Apache Airflow Installation and Configuration

Apache Airflow is the leading open-source workflow orchestration platform for scheduling, monitoring, and managing data pipelines defined as Python code in the form of Directed Acyclic Graphs (DAGs). Used extensively in data engineering and MLOps, Airflow supports multiple executors, from the single-machine LocalExecutor to distributed CeleryExecutor clusters, making it suitable for everything from a simple cron replacement to complex multi-step ETL pipelines on Linux.

Prerequisites

  • Ubuntu 20.04+, Debian 11+, or CentOS/Rocky 8+
  • Python 3.8+ and pip
  • PostgreSQL (recommended) or MySQL for metadata database
  • Root or sudo access
  • Minimum 4 GB RAM for production use

Installing Airflow

Set up a Python virtual environment:

# Install system dependencies
sudo apt update && sudo apt install -y \
  python3-pip python3-venv \
  libpq-dev python3-dev \
  gcc g++ libssl-dev

# Create a dedicated airflow user
sudo useradd -m -s /bin/bash airflow
sudo su - airflow

# Create virtual environment
python3 -m venv ~/airflow-venv
source ~/airflow-venv/bin/activate

Install Airflow with extras:

# Set Airflow version and Python version
AIRFLOW_VERSION=2.9.3
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

# Install with PostgreSQL and common providers
pip install "apache-airflow[postgres,celery,redis,amazon,google,slack]==${AIRFLOW_VERSION}" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

# Verify installation
airflow version

Set up the Airflow home directory:

# Set AIRFLOW_HOME (add to ~/.bashrc)
export AIRFLOW_HOME=~/airflow
echo 'export AIRFLOW_HOME=~/airflow' >> ~/.bashrc
mkdir -p ~/airflow/dags ~/airflow/logs ~/airflow/plugins

Initial Configuration

# Initialize the metadata database (also creates airflow.cfg on first run)
# Note: "airflow db init" is deprecated since Airflow 2.7 in favor of "db migrate"
airflow db migrate

# Edit the configuration
nano ~/airflow/airflow.cfg

Key airflow.cfg settings:

[core]
# Path to the DAG directory
dags_folder = /home/airflow/airflow/dags

# Number of task instances allowed to run concurrently across the installation
parallelism = 32
max_active_tasks_per_dag = 16
max_active_runs_per_dag = 16

# Executor type (LocalExecutor for single-node, CeleryExecutor for distributed)
executor = LocalExecutor

[database]
# Use PostgreSQL instead of SQLite; this option lives under [database] since Airflow 2.3
sql_alchemy_conn = postgresql+psycopg2://airflow:password@localhost/airflow

[webserver]
base_url = https://airflow.example.com
web_server_port = 8080
secret_key = your-secret-key-here  # Generate with: python -c "import os; print(os.urandom(24).hex())"

[scheduler]
dag_dir_list_interval = 30    # Rescan DAGs every 30 seconds

[logging]
base_log_folder = /home/airflow/airflow/logs
logging_level = INFO
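
The `sql_alchemy_conn` value is a SQLAlchemy-style URL of the form dialect+driver://user:password@host[:port]/dbname. As a sanity check of that anatomy, the pieces can be pulled apart with the standard library (illustration only, not an Airflow API; credentials are placeholders):

```python
from urllib.parse import urlsplit

# Same shape as the sql_alchemy_conn setting above (placeholder credentials)
uri = "postgresql+psycopg2://airflow:password@localhost/airflow"

parts = urlsplit(uri)
print(parts.scheme)            # dialect+driver: postgresql+psycopg2
print(parts.username)          # airflow
print(parts.hostname)          # localhost
print(parts.path.lstrip("/"))  # database name: airflow
```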

Set up PostgreSQL:

# Exit airflow user, back to root/sudo
exit

sudo apt install -y postgresql
sudo systemctl enable --now postgresql

# Making airflow the database owner avoids public-schema permission
# errors on PostgreSQL 15+
sudo -u postgres psql <<'EOF'
CREATE USER airflow WITH PASSWORD 'yourpassword';
CREATE DATABASE airflow OWNER airflow;
GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;
EOF

# Re-initialize with PostgreSQL
sudo su - airflow
source ~/airflow-venv/bin/activate
export AIRFLOW_HOME=~/airflow
airflow db migrate

Create the admin user:

airflow users create \
  --username admin \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email [email protected] \
  --password yourpassword

Creating Your First DAG

Create a DAG file in the dags/ directory:

cat > ~/airflow/dags/example_pipeline.py <<'EOF'
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# Default arguments applied to all tasks
default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["[email protected]"],
}

# Define the DAG
with DAG(
    dag_id="example_pipeline",
    default_args=default_args,
    description="A simple example pipeline",
    schedule="0 6 * * *",   # Daily at 6 AM (cron syntax)
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "data"],
) as dag:

    # Task 1: Extract data
    extract = BashOperator(
        task_id="extract_data",
        bash_command="echo 'Extracting data...' && sleep 2",
    )

    # Task 2: Transform (Python function)
    def transform_data(**context):
        print("Transforming data...")
        # "ds" is the run's logical date as a YYYY-MM-DD string
        ds = context["ds"]
        print(f"Processing date: {ds}")
        return {"records_processed": 1000}

    transform = PythonOperator(
        task_id="transform_data",
        python_callable=transform_data,
        # In Airflow 2, the task context is passed automatically;
        # provide_context is no longer needed
    )

    # Task 3: Load
    load = BashOperator(
        task_id="load_data",
        bash_command="echo 'Loading {{ ti.xcom_pull(task_ids=\"transform_data\") }}'",
    )

    # Set task dependencies
    extract >> transform >> load
EOF
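
The `extract >> transform >> load` line works because Airflow operators overload Python's right-shift operator to record dependencies. A toy stdlib sketch of the idea (not Airflow's actual implementation):

```python
class Task:
    """Toy stand-in for an Airflow operator, to illustrate >> chaining."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # "a >> b" records b as downstream of a and returns b,
        # which is what lets "a >> b >> c" chain left to right
        self.downstream.append(other)
        return other

extract, transform, load = Task("extract"), Task("transform"), Task("load")
extract >> transform >> load
print([t.task_id for t in extract.downstream])  # → ['transform']
```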

Trigger the DAG from CLI:

# Start the webserver (backgrounded here; the systemd services below are
# preferred for production)
airflow webserver --port 8080 &

# Start the scheduler
airflow scheduler &

# List DAGs
airflow dags list

# Trigger a DAG run
airflow dags trigger example_pipeline

# Check task status: find the run_id of the triggered run, then query it
airflow dags list-runs -d example_pipeline
airflow tasks states-for-dag-run example_pipeline <run_id>
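
DAG runs can also be triggered over Airflow 2's stable REST API (POST /api/v1/dags/{dag_id}/dagRuns), provided an API auth backend such as basic auth is enabled in airflow.cfg. A stdlib sketch that builds the request without sending it (base_url and credentials are assumptions for illustration):

```python
import json
import urllib.request

def build_trigger_request(base_url, dag_id, conf=None):
    """Build (but do not send) the POST that triggers a DAG run via the
    stable REST API. An Authorization header must be added per your
    configured auth backend before sending."""
    url = f"{base_url}/api/v1/dags/{dag_id}/dagRuns"
    body = json.dumps({"conf": conf or {}}).encode()
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )

req = build_trigger_request("http://localhost:8080", "example_pipeline")
print(req.full_url)  # http://localhost:8080/api/v1/dags/example_pipeline/dagRuns
# urllib.request.urlopen(req) would send it once auth headers are added
```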

Executor Configuration

LocalExecutor (single node, parallel execution):

# airflow.cfg
[core]
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:pass@localhost/airflow

CeleryExecutor (distributed, multi-worker):

# The celery and redis extras were already included in the pip install above;
# if missing, add them (quote the brackets for shells like zsh)
pip install "apache-airflow[celery,redis]==${AIRFLOW_VERSION}"
# airflow.cfg
[core]
executor = CeleryExecutor

[celery]
broker_url = redis://localhost:6379/0
result_backend = db+postgresql://airflow:pass@localhost/airflow
worker_concurrency = 16

# Start a Celery worker (on each worker node)
airflow celery worker --concurrency 8 --loglevel info &

# Start the Flower monitoring UI
airflow celery flower &

Connections and Variables

Store external service credentials securely in Airflow:

Add connections via CLI:

# Add a PostgreSQL connection
airflow connections add "my_postgres" \
  --conn-type postgres \
  --conn-host "db.example.com" \
  --conn-schema "mydb" \
  --conn-login "dbuser" \
  --conn-password "dbpassword" \
  --conn-port 5432

# Add an S3/HTTP connection
airflow connections add "my_s3" \
  --conn-type aws \
  --conn-extra '{"aws_access_key_id": "KEY", "aws_secret_access_key": "SECRET", "region_name": "us-east-1"}'
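
Connections can also be injected as environment variables named AIRFLOW_CONN_<CONN_ID> holding a connection URI, which is handy in containerized deployments. A sketch that builds such a URI with the password percent-encoded (all values are placeholders):

```python
import os
from urllib.parse import quote

user, password, host, db = "dbuser", "p@ss/word", "db.example.com", "mydb"

# Special characters in the password must be percent-encoded in the URI
uri = f"postgres://{user}:{quote(password, safe='')}@{host}:5432/{db}"
os.environ["AIRFLOW_CONN_MY_POSTGRES"] = uri
print(uri)  # postgres://dbuser:p%40ss%2Fword@db.example.com:5432/mydb
```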

Use connections in DAGs:

# The old airflow.hooks.postgres_hook path is deprecated in Airflow 2
from airflow.providers.postgres.hooks.postgres import PostgresHook

def extract_from_db(**context):
    hook = PostgresHook(postgres_conn_id="my_postgres")
    results = hook.get_records("SELECT * FROM events WHERE date = %s", [context["ds"]])
    return results

Variables for configuration values:

# Set variables
airflow variables set MAX_RECORDS 10000
airflow variables set ENVIRONMENT production

# Use in DAGs
from airflow.models import Variable
max_records = int(Variable.get("MAX_RECORDS"))

Monitoring with the Web UI

Start the webserver as a systemd service:

# Exit to root user, create services
sudo tee /etc/systemd/system/airflow-webserver.service <<'EOF'
[Unit]
Description=Airflow Webserver
After=network.target postgresql.service

[Service]
User=airflow
Group=airflow
Environment="AIRFLOW_HOME=/home/airflow/airflow"
Environment="PATH=/home/airflow/airflow-venv/bin:/usr/local/bin:/usr/bin:/bin"
ExecStart=/home/airflow/airflow-venv/bin/airflow webserver --port 8080
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
EOF

sudo tee /etc/systemd/system/airflow-scheduler.service <<'EOF'
[Unit]
Description=Airflow Scheduler
After=network.target postgresql.service

[Service]
User=airflow
Group=airflow
Environment="AIRFLOW_HOME=/home/airflow/airflow"
Environment="PATH=/home/airflow/airflow-venv/bin:/usr/local/bin:/usr/bin:/bin"
ExecStart=/home/airflow/airflow-venv/bin/airflow scheduler
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable --now airflow-webserver airflow-scheduler

Production Setup with Docker Compose

# Download the official Docker Compose file
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.3/docker-compose.yaml'

# Create required directories
mkdir -p ./dags ./logs ./plugins ./config

# Set the Airflow user UID
echo -e "AIRFLOW_UID=$(id -u)" > .env

# Initialize the database
docker compose up airflow-init

# Start all services
docker compose up -d

# Create an additional admin user (airflow-init already creates a default
# "airflow"/"airflow" account from the _AIRFLOW_WWW_USER_* variables)
docker compose run airflow-worker airflow users create \
  --username admin --password admin \
  --firstname Admin --lastname User \
  --role Admin --email [email protected]

Troubleshooting

DAGs not appearing in the UI:

# Check for Python syntax errors in DAG files
python3 ~/airflow/dags/my_dag.py

# Check the scheduler is scanning the DAG directory
airflow dags list
airflow dags show my_dag

# Review scheduler logs
sudo journalctl -u airflow-scheduler -f

Tasks stuck in "queued" state:

# Check if a worker is running
airflow celery status   # for CeleryExecutor
ps aux | grep airflow

# Verify broker connectivity (Redis broker assumed)
redis-cli ping

# Clear stuck tasks
airflow tasks clear my_dag -t my_task --start-date 2024-01-01

Database connection errors:

# Test PostgreSQL connectivity
sudo -u airflow psql -h localhost -U airflow -d airflow -c "SELECT 1;"

# Check Airflow DB version is current
airflow db check-migrations
airflow db migrate
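
Beyond psql, basic reachability of the metadata database can be checked with a stdlib TCP probe (PostgreSQL's default port 5432 assumed):

```python
import socket

def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# True if PostgreSQL is listening on the default port
print(port_open("localhost", 5432))
```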

Conclusion

Apache Airflow provides a production-grade workflow orchestration platform with Python-based DAG definitions, a comprehensive web UI for monitoring, and flexible executor options that scale from a single-server LocalExecutor setup to a distributed Celery cluster for high-throughput data pipelines. By combining DAG version control, connection management, and retry logic, Airflow brings reliability and observability to complex multi-step data workflows that would otherwise require custom cron job management and manual error handling.