Instalación y Configuración de Apache Airflow

Apache Airflow es la plataforma de orquestación de flujos de trabajo más popular en el ecosistema de datos, que permite programar, monitorizar y gestionar pipelines de datos complejos mediante DAGs (Directed Acyclic Graphs) definidos en Python. Con soporte para múltiples ejecutores (Local, Celery, Kubernetes), más de 1.000 operadores e integraciones y un potente panel web de monitorización, Airflow es la herramienta estándar para ingeniería de datos en producción. Esta guía cubre la instalación y configuración de Airflow en Linux.

Requisitos Previos

  • Ubuntu 20.04+, Debian 11+ o CentOS 8+
  • Python 3.8-3.11
  • Docker y Docker Compose (para instalación containerizada)
  • Mínimo 4 GB de RAM (8 GB para producción con Celery)
  • PostgreSQL o MySQL para la base de datos de metadatos
  • Redis para el broker de mensajes (con ejecutor Celery)

Instalación con Docker Compose

La forma más rápida y recomendada para comenzar:

# Crear directorio de trabajo
mkdir -p ~/airflow/{dags,logs,plugins,config}

# Establecer el UID del usuario para los volumenes
export AIRFLOW_UID=$(id -u)

# Descargar el docker-compose oficial de Airflow
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'

# Crear el archivo .env con el UID
echo "AIRFLOW_UID=$(id -u)" > .env

# Inicializar la base de datos de Airflow
docker compose up airflow-init

# Iniciar todos los servicios
docker compose up -d

# Ver el estado de los servicios
docker compose ps

Los servicios incluidos en el compose oficial:

  • airflow-webserver: Panel web en puerto 8080
  • airflow-scheduler: Programador de DAGs
  • airflow-worker: Trabajadores de tareas (con CeleryExecutor)
  • airflow-triggerer: Triggerer para tareas diferidas
  • postgres: Base de datos de metadatos
  • redis: Broker de mensajes para Celery

Acceder al panel web en http://TU-SERVIDOR:8080:

  • Usuario: airflow
  • Contraseña: airflow
# Cambiar la contraseña del administrador
docker compose exec airflow-webserver airflow users reset-password \
  --username airflow --password nueva-contrasena-segura

Instalación con pip en Ubuntu

Para instalaciones más personalizadas en sistemas bare-metal:

# Instalar dependencias del sistema
sudo apt update
sudo apt install -y python3 python3-pip python3-venv \
  libpq-dev python3-dev build-essential

# Instalar PostgreSQL (base de datos de metadatos)
sudo apt install -y postgresql postgresql-client

# Crear base de datos y usuario
sudo -u postgres psql << 'EOF'
CREATE DATABASE airflow;
CREATE USER airflow WITH PASSWORD 'contrasena-airflow';
GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;
ALTER DATABASE airflow OWNER TO airflow;
EOF

# Instalar Redis (broker para Celery)
sudo apt install -y redis-server
sudo systemctl enable --now redis-server

# Crear entorno virtual para Airflow
python3 -m venv /opt/airflow-venv
source /opt/airflow-venv/bin/activate

# Establecer la versión de Airflow y los constraints de compatibilidad
AIRFLOW_VERSION=2.9.3
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

# Instalar Airflow con dependencias para PostgreSQL y Celery
pip install "apache-airflow[postgres,celery,redis]==${AIRFLOW_VERSION}" \
  --constraint "${CONSTRAINT_URL}"

# Configurar variables de entorno
export AIRFLOW_HOME=/opt/airflow
export AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:contrasena-airflow@localhost/airflow
export AIRFLOW__CELERY__BROKER_URL=redis://localhost:6379/0
export AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:contrasena-airflow@localhost/airflow
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor

# Inicializar la base de datos
airflow db migrate

# Crear usuario administrador
airflow users create \
  --username admin \
  --password contrasena-admin \
  --firstname Admin \
  --lastname Admin \
  --role Admin \
  --email [email protected]

Creación de DAGs en Python

Los DAGs definen los flujos de trabajo en Airflow:

# ~/airflow/dags/mi_primer_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Argumentos por defecto para todas las tareas del DAG
argumentos_por_defecto = {
    'owner': 'equipo-datos',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Definición del DAG
with DAG(
    'pipeline_etl_diario',
    default_args=argumentos_por_defecto,
    description='Pipeline ETL que se ejecuta diariamente',
    schedule='@daily',  # También acepta cron: '0 6 * * *'
    start_date=datetime(2024, 1, 1),
    catchup=False,  # No ejecutar fechas pasadas al activar el DAG
    tags=['etl', 'produccion'],
) as dag:

    # Tarea 1: Extraer datos de la fuente
    extraer = BashOperator(
        task_id='extraer_datos',
        bash_command='python3 /opt/scripts/extraer.py --fecha {{ ds }}',
    )

    # Tarea 2: Transformar datos (Python operator)
    def transformar_datos(fecha, **context):
        """Transformar los datos extraídos."""
        print(f"Transformando datos del {fecha}")
        # Aquí iría la lógica de transformación
        return f"datos-transformados-{fecha}"

    transformar = PythonOperator(
        task_id='transformar_datos',
        python_callable=transformar_datos,
        op_kwargs={'fecha': '{{ ds }}'},
    )

    # Tarea 3: Cargar en destino
    cargar = BashOperator(
        task_id='cargar_en_destino',
        bash_command='python3 /opt/scripts/cargar.py --fecha {{ ds }}',
    )

    # Tarea 4: Notificación de éxito
    notificar = BashOperator(
        task_id='notificar_exito',
        bash_command='echo "Pipeline completado para {{ ds }}"',
    )

    # Definir el orden de ejecución (dependencias)
    extraer >> transformar >> cargar >> notificar

DAG más avanzado con ramificación y grupos de tareas:

# ~/airflow/dags/pipeline_avanzado.py
from airflow.operators.branch import BranchPythonOperator
from airflow.utils.task_group import TaskGroup

with DAG('pipeline_avanzado', ...):

    with TaskGroup("preparacion") as preparacion:
        verificar_fuente = BashOperator(
            task_id='verificar_fuente',
            bash_command='curl -s https://api.fuente.com/health',
        )
        descargar_datos = BashOperator(
            task_id='descargar_datos',
            bash_command='wget -q https://api.fuente.com/datos -O /tmp/datos.json',
        )
        verificar_fuente >> descargar_datos

    # Bifurcación según la calidad de datos
    def elegir_proceso(**context):
        # Lógica para decidir qué rama ejecutar
        if datos_validos():
            return 'proceso_normal'
        else:
            return 'proceso_reparacion'

    bifurcar = BranchPythonOperator(
        task_id='verificar_calidad',
        python_callable=elegir_proceso,
    )

    preparacion >> bifurcar

Configuración del Ejecutor Celery

Para escalar Airflow con múltiples workers:

# Ver la configuración actual
airflow config get-value core executor

# Iniciar el worker de Celery (en un servidor separado o el mismo)
source /opt/airflow-venv/bin/activate
export AIRFLOW_HOME=/opt/airflow
airflow celery worker &

# Iniciar el Flower para monitorización de Celery (opcional)
airflow celery flower &
# Disponible en http://TU-SERVIDOR:5555

# En producción, usar systemd para gestionar los procesos
sudo tee /etc/systemd/system/airflow-worker.service << 'EOF'
[Unit]
Description=Airflow Celery Worker
After=network.target

[Service]
User=airflow
Group=airflow
Type=simple
EnvironmentFile=/etc/airflow/airflow.env
ExecStart=/opt/airflow-venv/bin/airflow celery worker
Restart=on-failure
RestartSec=10

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl enable --now airflow-worker

Conexiones y Variables

Gestionar credenciales y configuración centralizada:

# Agregar conexión a base de datos via CLI
airflow connections add postgres_produccion \
  --conn-type postgres \
  --conn-host db.tu-dominio.com \
  --conn-schema mi_base_datos \
  --conn-login usuario_bd \
  --conn-password contrasena_bd \
  --conn-port 5432

# Agregar conexión S3
airflow connections add s3_principal \
  --conn-type aws \
  --conn-extra '{"aws_access_key_id":"AKID...", "aws_secret_access_key":"secretkey"}'

# Listar todas las conexiones
airflow connections list

# Agregar variables de configuración
airflow variables set ENV_NAME produccion
airflow variables set MAX_WORKERS 10
airflow variables set API_BASE_URL https://api.tu-servicio.com

# Leer una variable en un DAG
from airflow.models import Variable
env = Variable.get("ENV_NAME")

Monitorización y Alertas

# Configurar email para alertas (en airflow.cfg o variables de entorno)
export AIRFLOW__SMTP__SMTP_HOST=smtp.tu-dominio.com
export AIRFLOW__SMTP__SMTP_PORT=587
export [email protected]
export AIRFLOW__SMTP__SMTP_PASSWORD=contrasena-smtp
export [email protected]

# Ver el estado de los DAGs desde CLI
airflow dags list
airflow dags list-runs -d pipeline_etl_diario

# Ver el estado de las tareas
airflow tasks list pipeline_etl_diario
airflow tasks states-for-dag-run pipeline_etl_diario 2024-01-01T00:00:00+00:00

# Verificar la salud del sistema
airflow jobs check

Configuración para Producción

# Variables de entorno críticas para producción
export AIRFLOW__CORE__FERNET_KEY=$(python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())")
export AIRFLOW__WEBSERVER__SECRET_KEY=$(openssl rand -hex 32)
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=3
export AIRFLOW__CORE__PARALLELISM=32
export AIRFLOW__CELERY__WORKER_CONCURRENCY=8

# Proxy inverso con Nginx
sudo tee /etc/nginx/sites-available/airflow << 'EOF'
server {
    listen 443 ssl http2;
    server_name airflow.tu-dominio.com;

    ssl_certificate /etc/letsencrypt/live/airflow.tu-dominio.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/airflow.tu-dominio.com/privkey.pem;

    location / {
        proxy_pass http://127.0.0.1:8080;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto https;
    }
}
EOF

Solución de Problemas

El scheduler no ejecuta los DAGs:

# Verificar que el scheduler está activo
airflow jobs check --job-type SchedulerJob
# Docker:
docker compose ps airflow-scheduler

# Ver los logs del scheduler
airflow scheduler --log-level DEBUG
# Docker:
docker compose logs airflow-scheduler --tail 50

# Verificar que el DAG está activo (no pausado)
airflow dags list | grep mi_dag
airflow dags unpause mi_dag

Tareas bloqueadas en estado "queued":

# Verificar que hay workers disponibles
airflow celery inspect active

# Comprobar la conexión con Redis
redis-cli ping

# Ver workers activos
airflow workers list

Error de base de datos:

# Verificar la conexión a la base de datos
airflow db check

# Si la BD tiene esquema obsoleto, actualizarla
airflow db migrate

Conclusión

Apache Airflow es la solución estándar para orquestar pipelines de datos complejos con dependencias, reintento automático y monitorización centralizada. Su arquitectura extensible mediante operadores y ejecutores permite escalar desde un servidor único con LocalExecutor hasta clusters distribuidos con CeleryExecutor o KubernetesExecutor, adaptándose a cualquier escala de operaciones de datos.