Argo Workflows: Orquestación de Jobs en Kubernetes
Argo Workflows es un motor de orquestación de flujos de trabajo nativo de Kubernetes que permite definir pipelines complejos mediante DAGs, plantillas reutilizables y gestión de artefactos. Esta guía cubre la instalación, la creación de plantillas de workflow, la ejecución de DAGs, los cron workflows y los patrones de integración con CI/CD.
Requisitos Previos
- Clúster Kubernetes 1.23+
kubectlconfigurado- Mínimo 2 GB de RAM disponibles en el clúster
- Storage class con soporte para PVCs (para artefactos)
Instalación de Argo Workflows
# Crear el namespace de Argo Workflows
kubectl create namespace argo
# Instalar Argo Workflows (versión estable)
kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.5.2/install.yaml
# Verificar que los pods están corriendo
kubectl get pods -n argo
# Instalar la CLI de Argo
curl -sLO https://github.com/argoproj/argo-workflows/releases/download/v3.5.2/argo-linux-amd64.gz
gunzip argo-linux-amd64.gz
chmod +x argo-linux-amd64
sudo mv argo-linux-amd64 /usr/local/bin/argo
# Verificar la CLI
argo version
# Configurar permisos para el namespace de trabajo
kubectl create rolebinding argo-default-binding \
--clusterrole=argo-role \
--serviceaccount=default:default \
--namespace=default
# Acceder a la UI de Argo Workflows
kubectl -n argo port-forward deployment/argo-server 2746:2746
# Abrir https://localhost:2746 en el navegador
Conceptos Fundamentales
- Workflow: una instancia de ejecución de un flujo de trabajo
- WorkflowTemplate: plantilla reutilizable que define el flujo
- Template: bloque de construcción básico (container, script, dag, steps)
- Artifact: datos producidos o consumidos por los steps
- Parameter: variables que se pasan entre steps
Plantillas de Workflow
# Workflow básico con múltiples steps secuenciales
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: pipeline-datos-
namespace: default
spec:
entrypoint: pipeline-principal
templates:
# Template principal que orquesta los pasos
- name: pipeline-principal
steps:
- - name: extraer-datos
template: extraccion
- - name: transformar-datos
template: transformacion
arguments:
parameters:
- name: fuente
value: "{{steps.extraer-datos.outputs.parameters.ruta}}"
- - name: cargar-datos
template: carga
# Template de extracción de datos
- name: extraccion
container:
image: python:3.11-slim
command: [python, -c]
args:
- |
# Script de extracción de datos
import json
datos = {"registros": 1000, "ruta": "/datos/salida.json"}
print(json.dumps(datos))
outputs:
parameters:
- name: ruta
valueFrom:
path: /tmp/ruta.txt
# Template de transformación
- name: transformacion
inputs:
parameters:
- name: fuente
container:
image: python:3.11-slim
command: [python, -c]
args:
- |
fuente = "{{inputs.parameters.fuente}}"
print(f"Transformando datos desde: {fuente}")
# Template de carga de datos
- name: carga
script:
image: python:3.11-slim
command: [python]
source: |
# Script inline para cargar datos al destino
print("Cargando datos al destino...")
# Aquí iría la lógica de carga real
print("Carga completada con éxito")
WorkflowTemplate para reutilización:
# WorkflowTemplate reutilizable para tareas comunes
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: utilidades-plataforma
namespace: default
spec:
templates:
# Template genérico para ejecutar tests
- name: ejecutar-tests
inputs:
parameters:
- name: imagen
- name: comando
value: "pytest"
- name: directorio
value: "/app"
container:
image: "{{inputs.parameters.imagen}}"
command: [sh, -c]
args:
- "cd {{inputs.parameters.directorio}} && {{inputs.parameters.comando}}"
# Template para notificar en Slack
- name: notificar-slack
inputs:
parameters:
- name: mensaje
- name: canal
value: "#deployments"
container:
image: curlimages/curl:latest
command: [sh, -c]
args:
- |
curl -X POST $SLACK_WEBHOOK_URL \
-H 'Content-type: application/json' \
-d '{"channel": "{{inputs.parameters.canal}}", "text": "{{inputs.parameters.mensaje}}"}'
env:
- name: SLACK_WEBHOOK_URL
valueFrom:
secretKeyRef:
name: slack-secret
key: webhook-url
DAGs y Ejecución Paralela
Los DAGs permiten definir dependencias complejas entre tareas:
# Workflow con DAG para procesamiento paralelo
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: pipeline-ml-
spec:
entrypoint: dag-entrenamiento
templates:
- name: dag-entrenamiento
dag:
tasks:
# Preparar datos (sin dependencias, empieza primero)
- name: preparar-datos
template: preparacion
# Tres experimentos en paralelo, esperando la preparación
- name: experimento-lr
template: entrenar-modelo
dependencies: [preparar-datos]
arguments:
parameters:
- name: algoritmo
value: "logistic-regression"
- name: experimento-rf
template: entrenar-modelo
dependencies: [preparar-datos]
arguments:
parameters:
- name: algoritmo
value: "random-forest"
- name: experimento-xgb
template: entrenar-modelo
dependencies: [preparar-datos]
arguments:
parameters:
- name: algoritmo
value: "xgboost"
# Seleccionar el mejor modelo cuando los tres terminen
- name: seleccionar-modelo
template: evaluacion
dependencies: [experimento-lr, experimento-rf, experimento-xgb]
# Desplegar solo si la evaluación fue exitosa
- name: desplegar-modelo
template: despliegue
dependencies: [seleccionar-modelo]
- name: preparacion
container:
image: python:3.11-slim
command: [python, -c]
args: ["print('Preparando datos de entrenamiento...')"]
- name: entrenar-modelo
inputs:
parameters:
- name: algoritmo
container:
image: python:3.11-slim
command: [python, -c]
args: ["print(f'Entrenando modelo: {{inputs.parameters.algoritmo}}')"]
resources:
requests:
memory: 2Gi
cpu: "1"
- name: evaluacion
container:
image: python:3.11-slim
command: [python, -c]
args: ["print('Evaluando y seleccionando el mejor modelo...')"]
- name: despliegue
container:
image: bitnami/kubectl:latest
command: [kubectl, apply, -f, /manifiestos/modelo.yaml]
Gestión de Artefactos
# Configurar S3 como almacén de artefactos
apiVersion: v1
kind: ConfigMap
metadata:
name: artifact-repositories
namespace: argo
data:
default-v1: |
archiveLogs: true
s3:
bucket: mi-bucket-artefactos
endpoint: s3.amazonaws.com
insecure: false
accessKeySecret:
name: s3-credentials
key: accessKey
secretKeySecret:
name: s3-credentials
key: secretKey
# Workflow que produce y consume artefactos
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: workflow-artefactos-
spec:
entrypoint: pipeline-con-artefactos
templates:
- name: pipeline-con-artefactos
steps:
- - name: generar-reporte
template: generador
- - name: procesar-reporte
template: procesador
arguments:
artifacts:
- name: reporte-entrada
from: "{{steps.generar-reporte.outputs.artifacts.reporte}}"
- name: generador
container:
image: python:3.11-slim
command: [sh, -c]
args:
- |
# Generar datos de ejemplo
echo '{"total": 1500, "errores": 3, "fecha": "2024-01-15"}' > /tmp/reporte.json
echo "Reporte generado en /tmp/reporte.json"
outputs:
artifacts:
- name: reporte
path: /tmp/reporte.json
s3:
key: reportes/{{workflow.name}}/reporte.json
- name: procesador
inputs:
artifacts:
- name: reporte-entrada
path: /tmp/reporte-entrada.json
container:
image: python:3.11-slim
command: [python, -c]
args:
- |
import json
with open('/tmp/reporte-entrada.json') as f:
datos = json.load(f)
print(f"Procesando reporte: {datos['total']} registros")
Cron Workflows
# Cron Workflow para ejecución programada
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: backup-diario
namespace: default
spec:
# Ejecutar todos los días a las 2:00 AM (UTC)
schedule: "0 2 * * *"
timezone: "Europe/Madrid"
concurrencyPolicy: Forbid # No ejecutar si ya hay una instancia corriendo
startingDeadlineSeconds: 0
successfulJobsHistoryLimit: 3 # Mantener los últimos 3 trabajos exitosos
failedJobsHistoryLimit: 5 # Mantener los últimos 5 trabajos fallidos
workflowSpec:
entrypoint: backup-completo
templates:
- name: backup-completo
steps:
- - name: backup-base-datos
template: pg-dump
- - name: subir-a-s3
template: upload-s3
- - name: notificar-exito
template: notificacion
- name: pg-dump
container:
image: postgres:15
command: [sh, -c]
args:
- pg_dump -h $DB_HOST -U $DB_USER $DB_NAME | gzip > /tmp/backup.sql.gz
env:
- name: DB_HOST
valueFrom:
secretKeyRef:
name: db-credentials
key: host
- name: PGPASSWORD
valueFrom:
secretKeyRef:
name: db-credentials
key: password
- name: upload-s3
container:
image: amazon/aws-cli:latest
command: [aws, s3, cp, /tmp/backup.sql.gz]
args: ["s3://mis-backups/$(date +%Y/%m/%d)/backup.sql.gz"]
- name: notificacion
container:
image: curlimages/curl:latest
command: [sh, -c]
args: ["echo 'Backup completado exitosamente'"]
Patrones de CI/CD
# Enviar un workflow desde el pipeline de CI
argo submit -n argo --watch pipeline-deploy.yaml \
--parameter "imagen=registry.empresa.com/mi-app:${CI_COMMIT_SHA}" \
--parameter "entorno=staging"
# Esperar a que termine y verificar el resultado
STATUS=$(argo get -n argo @latest -o json | jq -r '.status.phase')
if [ "$STATUS" != "Succeeded" ]; then
echo "El workflow falló con estado: $STATUS"
exit 1
fi
# WorkflowTemplate para pipeline de CI/CD completo
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: pipeline-cicd
namespace: argo
spec:
templates:
- name: pipeline-completo
inputs:
parameters:
- name: repo-url
- name: rama
- name: entorno
dag:
tasks:
- name: clonar-repo
template: git-clone
arguments:
parameters:
- name: url
value: "{{inputs.parameters.repo-url}}"
- name: tests-unitarios
template: run-tests
dependencies: [clonar-repo]
- name: construir-imagen
template: docker-build
dependencies: [tests-unitarios]
- name: tests-integracion
template: integration-tests
dependencies: [construir-imagen]
- name: desplegar
template: kubectl-deploy
dependencies: [tests-integracion]
arguments:
parameters:
- name: entorno
value: "{{inputs.parameters.entorno}}"
Solución de Problemas
# Ver el estado de un workflow
argo get -n argo mi-workflow-abc123
# Ver los logs de un step específico
argo logs -n argo mi-workflow-abc123 -c main
# Listar todos los workflows con su estado
argo list -n argo
# Ver workflows fallidos
argo list -n argo --status Failed
# Reintentar un workflow fallido
argo retry -n argo mi-workflow-abc123
# Depurar un pod de workflow directamente
kubectl exec -it -n default \
$(kubectl get pods -n default -l workflows.argoproj.io/workflow=mi-workflow -o name | head -1) \
-- /bin/sh
# Ver los eventos del workflow
kubectl describe workflow mi-workflow-abc123 -n default
El workflow se queda en Pending:
# Verificar que hay nodos con recursos suficientes
kubectl describe workflow mi-workflow -n default | grep -A5 "Message"
# Verificar los recursos del namespace
kubectl describe resourcequota -n default
Conclusión
Argo Workflows proporciona una forma nativa y declarativa de orquestar jobs complejos en Kubernetes, con soporte para paralelismo, gestión de artefactos y ejecución programada. Los DAGs son especialmente potentes para pipelines de datos y machine learning donde las dependencias entre tareas son complejas. Integrado con Argo CD y el ecosistema Argo completo, se convierte en el motor de automatización definitivo para plataformas Kubernetes.


