Instalación y Configuración de Apache Airflow
Apache Airflow es la plataforma de orquestación de flujos de trabajo más popular en el ecosistema de datos, que permite programar, monitorizar y gestionar pipelines de datos complejos mediante DAGs (Directed Acyclic Graphs) definidos en Python. Con soporte para múltiples ejecutores (Local, Celery, Kubernetes), más de 1.000 operadores e integraciones y un potente panel web de monitorización, Airflow es la herramienta estándar para ingeniería de datos en producción. Esta guía cubre la instalación y configuración de Airflow en Linux.
Requisitos Previos
- Ubuntu 20.04+, Debian 11+ o CentOS 8+
- Python 3.8-3.11
- Docker y Docker Compose (para instalación containerizada)
- Mínimo 4 GB de RAM (8 GB para producción con Celery)
- PostgreSQL o MySQL para la base de datos de metadatos
- Redis para el broker de mensajes (con ejecutor Celery)
Instalación con Docker Compose
La forma más rápida y recomendada para comenzar:
# Crear directorio de trabajo
mkdir -p ~/airflow/{dags,logs,plugins,config}
# Establecer el UID del usuario para los volumenes
export AIRFLOW_UID=$(id -u)
# Descargar el docker-compose oficial de Airflow
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
# Crear el archivo .env con el UID
echo "AIRFLOW_UID=$(id -u)" > .env
# Inicializar la base de datos de Airflow
docker compose up airflow-init
# Iniciar todos los servicios
docker compose up -d
# Ver el estado de los servicios
docker compose ps
Los servicios incluidos en el compose oficial:
airflow-webserver: Panel web en puerto 8080airflow-scheduler: Programador de DAGsairflow-worker: Trabajadores de tareas (con CeleryExecutor)airflow-triggerer: Triggerer para tareas diferidaspostgres: Base de datos de metadatosredis: Broker de mensajes para Celery
Acceder al panel web en http://TU-SERVIDOR:8080:
- Usuario:
airflow - Contraseña:
airflow
# Cambiar la contraseña del administrador
docker compose exec airflow-webserver airflow users reset-password \
--username airflow --password nueva-contrasena-segura
Instalación con pip en Ubuntu
Para instalaciones más personalizadas en sistemas bare-metal:
# Instalar dependencias del sistema
sudo apt update
sudo apt install -y python3 python3-pip python3-venv \
libpq-dev python3-dev build-essential
# Instalar PostgreSQL (base de datos de metadatos)
sudo apt install -y postgresql postgresql-client
# Crear base de datos y usuario
sudo -u postgres psql << 'EOF'
CREATE DATABASE airflow;
CREATE USER airflow WITH PASSWORD 'contrasena-airflow';
GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;
ALTER DATABASE airflow OWNER TO airflow;
EOF
# Instalar Redis (broker para Celery)
sudo apt install -y redis-server
sudo systemctl enable --now redis-server
# Crear entorno virtual para Airflow
python3 -m venv /opt/airflow-venv
source /opt/airflow-venv/bin/activate
# Establecer la versión de Airflow y los constraints de compatibilidad
AIRFLOW_VERSION=2.9.3
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# Instalar Airflow con dependencias para PostgreSQL y Celery
pip install "apache-airflow[postgres,celery,redis]==${AIRFLOW_VERSION}" \
--constraint "${CONSTRAINT_URL}"
# Configurar variables de entorno
export AIRFLOW_HOME=/opt/airflow
export AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:contrasena-airflow@localhost/airflow
export AIRFLOW__CELERY__BROKER_URL=redis://localhost:6379/0
export AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:contrasena-airflow@localhost/airflow
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
# Inicializar la base de datos
airflow db migrate
# Crear usuario administrador
airflow users create \
--username admin \
--password contrasena-admin \
--firstname Admin \
--lastname Admin \
--role Admin \
--email [email protected]
Creación de DAGs en Python
Los DAGs definen los flujos de trabajo en Airflow:
# ~/airflow/dags/mi_primer_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
# Argumentos por defecto para todas las tareas del DAG
argumentos_por_defecto = {
'owner': 'equipo-datos',
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# Definición del DAG
with DAG(
'pipeline_etl_diario',
default_args=argumentos_por_defecto,
description='Pipeline ETL que se ejecuta diariamente',
schedule='@daily', # También acepta cron: '0 6 * * *'
start_date=datetime(2024, 1, 1),
catchup=False, # No ejecutar fechas pasadas al activar el DAG
tags=['etl', 'produccion'],
) as dag:
# Tarea 1: Extraer datos de la fuente
extraer = BashOperator(
task_id='extraer_datos',
bash_command='python3 /opt/scripts/extraer.py --fecha {{ ds }}',
)
# Tarea 2: Transformar datos (Python operator)
def transformar_datos(fecha, **context):
"""Transformar los datos extraídos."""
print(f"Transformando datos del {fecha}")
# Aquí iría la lógica de transformación
return f"datos-transformados-{fecha}"
transformar = PythonOperator(
task_id='transformar_datos',
python_callable=transformar_datos,
op_kwargs={'fecha': '{{ ds }}'},
)
# Tarea 3: Cargar en destino
cargar = BashOperator(
task_id='cargar_en_destino',
bash_command='python3 /opt/scripts/cargar.py --fecha {{ ds }}',
)
# Tarea 4: Notificación de éxito
notificar = BashOperator(
task_id='notificar_exito',
bash_command='echo "Pipeline completado para {{ ds }}"',
)
# Definir el orden de ejecución (dependencias)
extraer >> transformar >> cargar >> notificar
DAG más avanzado con ramificación y grupos de tareas:
# ~/airflow/dags/pipeline_avanzado.py
from airflow.operators.branch import BranchPythonOperator
from airflow.utils.task_group import TaskGroup
with DAG('pipeline_avanzado', ...):
with TaskGroup("preparacion") as preparacion:
verificar_fuente = BashOperator(
task_id='verificar_fuente',
bash_command='curl -s https://api.fuente.com/health',
)
descargar_datos = BashOperator(
task_id='descargar_datos',
bash_command='wget -q https://api.fuente.com/datos -O /tmp/datos.json',
)
verificar_fuente >> descargar_datos
# Bifurcación según la calidad de datos
def elegir_proceso(**context):
# Lógica para decidir qué rama ejecutar
if datos_validos():
return 'proceso_normal'
else:
return 'proceso_reparacion'
bifurcar = BranchPythonOperator(
task_id='verificar_calidad',
python_callable=elegir_proceso,
)
preparacion >> bifurcar
Configuración del Ejecutor Celery
Para escalar Airflow con múltiples workers:
# Ver la configuración actual
airflow config get-value core executor
# Iniciar el worker de Celery (en un servidor separado o el mismo)
source /opt/airflow-venv/bin/activate
export AIRFLOW_HOME=/opt/airflow
airflow celery worker &
# Iniciar el Flower para monitorización de Celery (opcional)
airflow celery flower &
# Disponible en http://TU-SERVIDOR:5555
# En producción, usar systemd para gestionar los procesos
sudo tee /etc/systemd/system/airflow-worker.service << 'EOF'
[Unit]
Description=Airflow Celery Worker
After=network.target
[Service]
User=airflow
Group=airflow
Type=simple
EnvironmentFile=/etc/airflow/airflow.env
ExecStart=/opt/airflow-venv/bin/airflow celery worker
Restart=on-failure
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF
sudo systemctl enable --now airflow-worker
Conexiones y Variables
Gestionar credenciales y configuración centralizada:
# Agregar conexión a base de datos via CLI
airflow connections add postgres_produccion \
--conn-type postgres \
--conn-host db.tu-dominio.com \
--conn-schema mi_base_datos \
--conn-login usuario_bd \
--conn-password contrasena_bd \
--conn-port 5432
# Agregar conexión S3
airflow connections add s3_principal \
--conn-type aws \
--conn-extra '{"aws_access_key_id":"AKID...", "aws_secret_access_key":"secretkey"}'
# Listar todas las conexiones
airflow connections list
# Agregar variables de configuración
airflow variables set ENV_NAME produccion
airflow variables set MAX_WORKERS 10
airflow variables set API_BASE_URL https://api.tu-servicio.com
# Leer una variable en un DAG
from airflow.models import Variable
env = Variable.get("ENV_NAME")
Monitorización y Alertas
# Configurar email para alertas (en airflow.cfg o variables de entorno)
export AIRFLOW__SMTP__SMTP_HOST=smtp.tu-dominio.com
export AIRFLOW__SMTP__SMTP_PORT=587
export [email protected]
export AIRFLOW__SMTP__SMTP_PASSWORD=contrasena-smtp
export [email protected]
# Ver el estado de los DAGs desde CLI
airflow dags list
airflow dags list-runs -d pipeline_etl_diario
# Ver el estado de las tareas
airflow tasks list pipeline_etl_diario
airflow tasks states-for-dag-run pipeline_etl_diario 2024-01-01T00:00:00+00:00
# Verificar la salud del sistema
airflow jobs check
Configuración para Producción
# Variables de entorno críticas para producción
export AIRFLOW__CORE__FERNET_KEY=$(python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())")
export AIRFLOW__WEBSERVER__SECRET_KEY=$(openssl rand -hex 32)
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=3
export AIRFLOW__CORE__PARALLELISM=32
export AIRFLOW__CELERY__WORKER_CONCURRENCY=8
# Proxy inverso con Nginx
sudo tee /etc/nginx/sites-available/airflow << 'EOF'
server {
listen 443 ssl http2;
server_name airflow.tu-dominio.com;
ssl_certificate /etc/letsencrypt/live/airflow.tu-dominio.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/airflow.tu-dominio.com/privkey.pem;
location / {
proxy_pass http://127.0.0.1:8080;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto https;
}
}
EOF
Solución de Problemas
El scheduler no ejecuta los DAGs:
# Verificar que el scheduler está activo
airflow jobs check --job-type SchedulerJob
# Docker:
docker compose ps airflow-scheduler
# Ver los logs del scheduler
airflow scheduler --log-level DEBUG
# Docker:
docker compose logs airflow-scheduler --tail 50
# Verificar que el DAG está activo (no pausado)
airflow dags list | grep mi_dag
airflow dags unpause mi_dag
Tareas bloqueadas en estado "queued":
# Verificar que hay workers disponibles
airflow celery inspect active
# Comprobar la conexión con Redis
redis-cli ping
# Ver workers activos
airflow workers list
Error de base de datos:
# Verificar la conexión a la base de datos
airflow db check
# Si la BD tiene esquema obsoleto, actualizarla
airflow db migrate
Conclusión
Apache Airflow es la solución estándar para orquestar pipelines de datos complejos con dependencias, reintento automático y monitorización centralizada. Su arquitectura extensible mediante operadores y ejecutores permite escalar desde un servidor único con LocalExecutor hasta clusters distribuidos con CeleryExecutor o KubernetesExecutor, adaptándose a cualquier escala de operaciones de datos.


