Argo Workflows: Orquestación de Jobs en Kubernetes

Argo Workflows es un motor de orquestación de flujos de trabajo nativo de Kubernetes que permite definir pipelines complejos mediante DAGs, plantillas reutilizables y gestión de artefactos. Esta guía cubre la instalación, la creación de plantillas de workflow, la ejecución de DAGs, los cron workflows y los patrones de integración con CI/CD.

Requisitos Previos

  • Clúster Kubernetes 1.23+
  • kubectl configurado
  • Mínimo 2 GB de RAM disponibles en el clúster
  • Storage class con soporte para PVCs (para artefactos)

Instalación de Argo Workflows

# Crear el namespace de Argo Workflows
kubectl create namespace argo

# Instalar Argo Workflows (versión estable)
kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.5.2/install.yaml

# Verificar que los pods están corriendo
kubectl get pods -n argo

# Instalar la CLI de Argo
curl -sLO https://github.com/argoproj/argo-workflows/releases/download/v3.5.2/argo-linux-amd64.gz
gunzip argo-linux-amd64.gz
chmod +x argo-linux-amd64
sudo mv argo-linux-amd64 /usr/local/bin/argo

# Verificar la CLI
argo version

# Configurar permisos para el namespace de trabajo
kubectl create rolebinding argo-default-binding \
  --clusterrole=argo-role \
  --serviceaccount=default:default \
  --namespace=default

# Acceder a la UI de Argo Workflows
kubectl -n argo port-forward deployment/argo-server 2746:2746
# Abrir https://localhost:2746 en el navegador

Conceptos Fundamentales

  • Workflow: una instancia de ejecución de un flujo de trabajo
  • WorkflowTemplate: plantilla reutilizable que define el flujo
  • Template: bloque de construcción básico (container, script, dag, steps)
  • Artifact: datos producidos o consumidos por los steps
  • Parameter: variables que se pasan entre steps

Plantillas de Workflow

# Workflow básico con múltiples steps secuenciales
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: pipeline-datos-
  namespace: default
spec:
  entrypoint: pipeline-principal
  templates:
    # Template principal que orquesta los pasos
    - name: pipeline-principal
      steps:
        - - name: extraer-datos
            template: extraccion
        - - name: transformar-datos
            template: transformacion
            arguments:
              parameters:
                - name: fuente
                  value: "{{steps.extraer-datos.outputs.parameters.ruta}}"
        - - name: cargar-datos
            template: carga

    # Template de extracción de datos
    - name: extraccion
      container:
        image: python:3.11-slim
        command: [python, -c]
        args:
          - |
            # Script de extracción de datos
            import json
            datos = {"registros": 1000, "ruta": "/datos/salida.json"}
            print(json.dumps(datos))
      outputs:
        parameters:
          - name: ruta
            valueFrom:
              path: /tmp/ruta.txt

    # Template de transformación
    - name: transformacion
      inputs:
        parameters:
          - name: fuente
      container:
        image: python:3.11-slim
        command: [python, -c]
        args:
          - |
            fuente = "{{inputs.parameters.fuente}}"
            print(f"Transformando datos desde: {fuente}")

    # Template de carga de datos
    - name: carga
      script:
        image: python:3.11-slim
        command: [python]
        source: |
          # Script inline para cargar datos al destino
          print("Cargando datos al destino...")
          # Aquí iría la lógica de carga real
          print("Carga completada con éxito")

WorkflowTemplate para reutilización:

# WorkflowTemplate reutilizable para tareas comunes
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: utilidades-plataforma
  namespace: default
spec:
  templates:
    # Template genérico para ejecutar tests
    - name: ejecutar-tests
      inputs:
        parameters:
          - name: imagen
          - name: comando
            value: "pytest"
          - name: directorio
            value: "/app"
      container:
        image: "{{inputs.parameters.imagen}}"
        command: [sh, -c]
        args:
          - "cd {{inputs.parameters.directorio}} && {{inputs.parameters.comando}}"

    # Template para notificar en Slack
    - name: notificar-slack
      inputs:
        parameters:
          - name: mensaje
          - name: canal
            value: "#deployments"
      container:
        image: curlimages/curl:latest
        command: [sh, -c]
        args:
          - |
            curl -X POST $SLACK_WEBHOOK_URL \
              -H 'Content-type: application/json' \
              -d '{"channel": "{{inputs.parameters.canal}}", "text": "{{inputs.parameters.mensaje}}"}'
        env:
          - name: SLACK_WEBHOOK_URL
            valueFrom:
              secretKeyRef:
                name: slack-secret
                key: webhook-url

DAGs y Ejecución Paralela

Los DAGs permiten definir dependencias complejas entre tareas:

# Workflow con DAG para procesamiento paralelo
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: pipeline-ml-
spec:
  entrypoint: dag-entrenamiento
  templates:
    - name: dag-entrenamiento
      dag:
        tasks:
          # Preparar datos (sin dependencias, empieza primero)
          - name: preparar-datos
            template: preparacion

          # Tres experimentos en paralelo, esperando la preparación
          - name: experimento-lr
            template: entrenar-modelo
            dependencies: [preparar-datos]
            arguments:
              parameters:
                - name: algoritmo
                  value: "logistic-regression"

          - name: experimento-rf
            template: entrenar-modelo
            dependencies: [preparar-datos]
            arguments:
              parameters:
                - name: algoritmo
                  value: "random-forest"

          - name: experimento-xgb
            template: entrenar-modelo
            dependencies: [preparar-datos]
            arguments:
              parameters:
                - name: algoritmo
                  value: "xgboost"

          # Seleccionar el mejor modelo cuando los tres terminen
          - name: seleccionar-modelo
            template: evaluacion
            dependencies: [experimento-lr, experimento-rf, experimento-xgb]

          # Desplegar solo si la evaluación fue exitosa
          - name: desplegar-modelo
            template: despliegue
            dependencies: [seleccionar-modelo]

    - name: preparacion
      container:
        image: python:3.11-slim
        command: [python, -c]
        args: ["print('Preparando datos de entrenamiento...')"]

    - name: entrenar-modelo
      inputs:
        parameters:
          - name: algoritmo
      container:
        image: python:3.11-slim
        command: [python, -c]
        args: ["print(f'Entrenando modelo: {{inputs.parameters.algoritmo}}')"]
      resources:
        requests:
          memory: 2Gi
          cpu: "1"

    - name: evaluacion
      container:
        image: python:3.11-slim
        command: [python, -c]
        args: ["print('Evaluando y seleccionando el mejor modelo...')"]

    - name: despliegue
      container:
        image: bitnami/kubectl:latest
        command: [kubectl, apply, -f, /manifiestos/modelo.yaml]

Gestión de Artefactos

# Configurar S3 como almacén de artefactos
apiVersion: v1
kind: ConfigMap
metadata:
  name: artifact-repositories
  namespace: argo
data:
  default-v1: |
    archiveLogs: true
    s3:
      bucket: mi-bucket-artefactos
      endpoint: s3.amazonaws.com
      insecure: false
      accessKeySecret:
        name: s3-credentials
        key: accessKey
      secretKeySecret:
        name: s3-credentials
        key: secretKey
# Workflow que produce y consume artefactos
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: workflow-artefactos-
spec:
  entrypoint: pipeline-con-artefactos
  templates:
    - name: pipeline-con-artefactos
      steps:
        - - name: generar-reporte
            template: generador
        - - name: procesar-reporte
            template: procesador
            arguments:
              artifacts:
                - name: reporte-entrada
                  from: "{{steps.generar-reporte.outputs.artifacts.reporte}}"

    - name: generador
      container:
        image: python:3.11-slim
        command: [sh, -c]
        args:
          - |
            # Generar datos de ejemplo
            echo '{"total": 1500, "errores": 3, "fecha": "2024-01-15"}' > /tmp/reporte.json
            echo "Reporte generado en /tmp/reporte.json"
      outputs:
        artifacts:
          - name: reporte
            path: /tmp/reporte.json
            s3:
              key: reportes/{{workflow.name}}/reporte.json

    - name: procesador
      inputs:
        artifacts:
          - name: reporte-entrada
            path: /tmp/reporte-entrada.json
      container:
        image: python:3.11-slim
        command: [python, -c]
        args:
          - |
            import json
            with open('/tmp/reporte-entrada.json') as f:
                datos = json.load(f)
            print(f"Procesando reporte: {datos['total']} registros")

Cron Workflows

# Cron Workflow para ejecución programada
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: backup-diario
  namespace: default
spec:
  # Ejecutar todos los días a las 2:00 AM (UTC)
  schedule: "0 2 * * *"
  timezone: "Europe/Madrid"
  concurrencyPolicy: Forbid       # No ejecutar si ya hay una instancia corriendo
  startingDeadlineSeconds: 0
  successfulJobsHistoryLimit: 3   # Mantener los últimos 3 trabajos exitosos
  failedJobsHistoryLimit: 5       # Mantener los últimos 5 trabajos fallidos
  workflowSpec:
    entrypoint: backup-completo
    templates:
      - name: backup-completo
        steps:
          - - name: backup-base-datos
              template: pg-dump
          - - name: subir-a-s3
              template: upload-s3
          - - name: notificar-exito
              template: notificacion

      - name: pg-dump
        container:
          image: postgres:15
          command: [sh, -c]
          args:
            - pg_dump -h $DB_HOST -U $DB_USER $DB_NAME | gzip > /tmp/backup.sql.gz
          env:
            - name: DB_HOST
              valueFrom:
                secretKeyRef:
                  name: db-credentials
                  key: host
            - name: PGPASSWORD
              valueFrom:
                secretKeyRef:
                  name: db-credentials
                  key: password

      - name: upload-s3
        container:
          image: amazon/aws-cli:latest
          command: [aws, s3, cp, /tmp/backup.sql.gz]
          args: ["s3://mis-backups/$(date +%Y/%m/%d)/backup.sql.gz"]

      - name: notificacion
        container:
          image: curlimages/curl:latest
          command: [sh, -c]
          args: ["echo 'Backup completado exitosamente'"]

Patrones de CI/CD

# Enviar un workflow desde el pipeline de CI
argo submit -n argo --watch pipeline-deploy.yaml \
  --parameter "imagen=registry.empresa.com/mi-app:${CI_COMMIT_SHA}" \
  --parameter "entorno=staging"

# Esperar a que termine y verificar el resultado
STATUS=$(argo get -n argo @latest -o json | jq -r '.status.phase')
if [ "$STATUS" != "Succeeded" ]; then
    echo "El workflow falló con estado: $STATUS"
    exit 1
fi
# WorkflowTemplate para pipeline de CI/CD completo
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: pipeline-cicd
  namespace: argo
spec:
  templates:
    - name: pipeline-completo
      inputs:
        parameters:
          - name: repo-url
          - name: rama
          - name: entorno
      dag:
        tasks:
          - name: clonar-repo
            template: git-clone
            arguments:
              parameters:
                - name: url
                  value: "{{inputs.parameters.repo-url}}"
          - name: tests-unitarios
            template: run-tests
            dependencies: [clonar-repo]
          - name: construir-imagen
            template: docker-build
            dependencies: [tests-unitarios]
          - name: tests-integracion
            template: integration-tests
            dependencies: [construir-imagen]
          - name: desplegar
            template: kubectl-deploy
            dependencies: [tests-integracion]
            arguments:
              parameters:
                - name: entorno
                  value: "{{inputs.parameters.entorno}}"

Solución de Problemas

# Ver el estado de un workflow
argo get -n argo mi-workflow-abc123

# Ver los logs de un step específico
argo logs -n argo mi-workflow-abc123 -c main

# Listar todos los workflows con su estado
argo list -n argo

# Ver workflows fallidos
argo list -n argo --status Failed

# Reintentar un workflow fallido
argo retry -n argo mi-workflow-abc123

# Depurar un pod de workflow directamente
kubectl exec -it -n default \
  $(kubectl get pods -n default -l workflows.argoproj.io/workflow=mi-workflow -o name | head -1) \
  -- /bin/sh

# Ver los eventos del workflow
kubectl describe workflow mi-workflow-abc123 -n default

El workflow se queda en Pending:

# Verificar que hay nodos con recursos suficientes
kubectl describe workflow mi-workflow -n default | grep -A5 "Message"

# Verificar los recursos del namespace
kubectl describe resourcequota -n default

Conclusión

Argo Workflows proporciona una forma nativa y declarativa de orquestar jobs complejos en Kubernetes, con soporte para paralelismo, gestión de artefactos y ejecución programada. Los DAGs son especialmente potentes para pipelines de datos y machine learning donde las dependencias entre tareas son complejas. Integrado con Argo CD y el ecosistema Argo completo, se convierte en el motor de automatización definitivo para plataformas Kubernetes.