Salta el contingut

Apache Airflow

Per qué Airflow i no un cron?

Abans d'Airflow, la manera habitual d'executar pipelines periòdics era un crontab. Un cron és senzill i eficaç per a tasques simples, però falla en qualsevol escenari real de dades:

Problema Cron Airflow
El pipeline falla per error de xarxa El cron no ho detecta, no reintenta Reintenta automàticament N vegades amb backoff
Pipeline B depèn que acabi el pipeline A Cal gestionar-ho manualment (sleep, fitxers de lock) Dependències explícites entre tasques
Vull saber quant ha trigat cada pas No hi ha registre estructurat UI web amb durada, logs i historial per execució
El pipeline ha fallat en silenci durant 3 dies Ningú s'assabenta Alertes per email/Slack en fallida
Necessito re-executar el pipeline per al 15 de gener Impossible sense modificar el cron Backfill amb una sola comanda
Vull pausar el pipeline durant el cap de setmana Cal comentar el cron Toggle a la UI en un clic

Airflow és la plataforma d'orquestació de workflows de dades més estesa al sector. El 2025, pràcticament tota oferta de feina de Data Engineer menciona Airflow o un equivalent (Prefect, Dagster). Els pipelines es defineixen com a codi Python, s'executen de forma programada i es monitoritzen a través d'una interfície web.


Arquitectura d'Airflow

flowchart TD
    subgraph Airflow
        SCH[Scheduler\nDecideix quan executar cada tasca]
        WEB[Webserver\nUI de monitoratge]
        META[(Metadatabase\nPostgreSQL)]
        WORK[Worker\nExecuta les tasques]
    end
    SCH -->|llegeix DAGs| DAGS[Carpeta /dags]
    SCH -->|escriu estat| META
    WEB -->|llegeix estat| META
    SCH -->|envia tasques| WORK
    WORK -->|escriu logs| LOGS[Carpeta /logs]

Components principals:

  • Scheduler: el cor d'Airflow. Analitza les definicions de DAG, determina quines tasques estan llestes per executar-se (totes les dependències completes) i les encua per als workers. S'executa de forma contínua en segon pla.
  • Webserver: la interfície web (per defecte al port 8080). Permet veure l'estat de cada DAG run, les tasques individuals, els logs, el gràfic de dependències i disparar execucions manuals.
  • Metadatabase: base de dades (PostgreSQL en producció, SQLite per a desenvolupament local) on l'Scheduler guarda l'estat de cada DAG run i cada tasca. No s'hi emmagatzemen les dades del pipeline, només l'estat d'execució.
  • Workers: els processos que executen el codi real de cada tasca. Depenent de l'executor configurat, poden ser processos locals o workers distribuïts (Celery, Kubernetes).

Conceptes fonamentals

DAG: Directed Acyclic Graph

Un DAG és un graf dirigit sense cicles. A Airflow, cada node del graf és una tasca i les arestes representen dependències d'execució. "Dirigit" vol dir que les dependències tenen una direcció (A ha d'acabar abans que B). "Acíclic" vol dir que no hi pot haver bucles (A no pot dependre de si mateix ni d'una cadena circular).

# Exemple mínim d'un DAG a Airflow 2.8+
from airflow.sdk import DAG, task
from datetime import datetime, timedelta

with DAG(
    dag_id="exemple_basic",
    description="DAG d'exemple per al curs M5074",
    schedule="0 2 * * *",        # cada dia a les 02:00
    start_date=datetime(2026, 1, 1),
    catchup=False,                # no re-executar dates passades
    tags=["m5074", "exemple"],
) as dag:
    pass  # aquí s'afegeixen les tasques

Paràmetres importants del DAG:

  • schedule: expressió cron ("0 2 * * *" = les 2:00 AM cada dia) o predefinits ("@daily", "@hourly", None per a execució manual).
  • start_date: primera data d'execució possible. Airflow calcularà les execucions des d'aquesta data.
  • catchup: si és True, Airflow re-executarà totes les dates passades entre start_date i avui en iniciar el DAG. En producció, quasi sempre False.
  • retries: nombre de reintents automàtics si una tasca falla.
  • retry_delay: temps d'espera entre reintents (timedelta(minutes=5)).
  • depends_on_past: si és True, una execució no comença fins que la del dia anterior ha acabat correctament.

Task: unitat de treball

Cada tasca és una unitat de treball atòmica. Airflow proporciona diversos tipus de tasques predefinits (Operators):

from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.sensors.filesystem import FileSensor

# PythonOperator: executa una funció Python
def processa_dades():
    import pandas as pd
    df = pd.read_csv("/data/vendes.csv")
    print(f"Registres processats: {len(df)}")

tasca_python = PythonOperator(
    task_id="processa_dades",
    python_callable=processa_dades,
    dag=dag,
)

# BashOperator: executa una comanda shell
tasca_bash = BashOperator(
    task_id="comprova_espai_disc",
    bash_command="df -h /data && echo 'Espai comprovat'",
    dag=dag,
)

# SQLExecuteQueryOperator: executa SQL sobre una connexió configurada
tasca_sql = SQLExecuteQueryOperator(
    task_id="inserta_resum_diari",
    conn_id="postgres_dw",
    sql="INSERT INTO resum_diari SELECT CURRENT_DATE, COUNT(*) FROM vendes;",
    dag=dag,
)

Operator vs Sensor

Un Operator executa una acció (processa, insereix, envia). Un Sensor espera que una condició es compleixi abans de continuar el pipeline:

from airflow.sensors.filesystem import FileSensor
from airflow.providers.http.sensors.http import HttpSensor

# FileSensor: espera que existeixi un fitxer
espera_fitxer = FileSensor(
    task_id="espera_csv_vendes",
    filepath="/data/vendes_{{ ds }}.csv",   # ds = data d'execució (YYYY-MM-DD)
    poke_interval=60,       # comprova cada 60 segons
    timeout=3600,           # falla si no apareix en 1 hora
    mode="reschedule",      # allibera el worker mentre espera
    dag=dag,
)

# HttpSensor: espera que una API respongui amb èxit
espera_api = HttpSensor(
    task_id="espera_api_disponible",
    http_conn_id="api_meteo",
    endpoint="/health",
    poke_interval=30,
    timeout=300,
    dag=dag,
)

La diferència clau entre mode="poke" i mode="reschedule": en mode poke, el worker queda bloquejat mentre espera (consumeix recursos); en mode reschedule, allibera el worker i el scheduler el reprèn periòdicament.

Executors

L'executor determina com Airflow distribueix les tasques als workers:

  • SequentialExecutor: executa una tasca cada vegada, de forma seqüencial. Només per a provar localment.
  • LocalExecutor: executa tasques en paral·lel com a processos locals. Adequat per a instal·lacions single-node.
  • CeleryExecutor: distribueix tasques a workers externs a través d'una cua de missatges (Redis o RabbitMQ). Per a clústers multi-node en producció.
  • KubernetesExecutor: crea un pod Kubernetes per a cada tasca. Excel·lent per a aïllament i escalat elàstic.

XCom: comunicació entre tasques

XCom (cross-communication) permet que una tasca passi un valor petit a una altra:

from airflow.sdk import task

@task
def extreu_dades():
    registres = 42_350
    return registres   # Airflow guarda aquest valor a la metadatabase

@task
def valida_volum(n_registres: int):
    if n_registres < 1000:
        raise ValueError(f"Volum insuficient: {n_registres} registres")
    print(f"Volum correcte: {n_registres:,} registres")

# La connexió es fa automàticament amb el TaskFlow API
n = extreu_dades()
valida_volum(n)

Limitació important: XCom emmagatzema els valors a la metadatabase. No és adequat per a DataFrames ni fitxers grans. Per a passar dades grans entre tasques, usar un sistema extern (S3, base de dades) i passar-ne la referència (un path o un ID) via XCom.

Variables i Connections

Variables: parells clau-valor globals configurables des de la UI o via CLI, sense necessitat de modificar el codi del DAG:

from airflow.models import Variable

# Llegir una variable configurada a la UI d'Airflow
bucket_s3 = Variable.get("bucket_s3_dades")
entorn = Variable.get("entorn", default_var="dev")

Connections: credencials de connexió a sistemes externs (PostgreSQL, S3, APIs) gestionades de forma centralitzada i encriptada:

from airflow.hooks.base import BaseHook

# La connexió "postgres_dw" es configura a la UI, no al codi
conn = BaseHook.get_connection("postgres_dw")
print(f"Host: {conn.host}, Port: {conn.port}, DB: {conn.schema}")

DAG complet: pipeline de vendes

A continuació, un DAG complet que implementa un pipeline real de quatre passos: detecció del fitxer, validació, inserció a PostgreSQL i notificació:

from airflow.sdk import DAG, task
from airflow.sensors.filesystem import FileSensor
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.operators.email import EmailOperator
from airflow.models import Variable
from datetime import datetime, timedelta
import logging

log = logging.getLogger(__name__)

RUTA_CSV = "/opt/airflow/dades/vendes_{{ ds }}.csv"
CONN_PG = "postgres_dw"

default_args = {
    "owner": "equip-dades",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["alertes@empresa.cat"],
}

with DAG(
    dag_id="pipeline_vendes_diari",
    description="Carrega diària de vendes: CSV -> validació -> PostgreSQL",
    schedule="30 6 * * 1-5",    # feiners a les 6:30 AM
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args=default_args,
    tags=["m5074", "vendes", "produccio"],
) as dag:

    # Pas 1: esperar que el fitxer CSV estigui disponible
    espera_csv = FileSensor(
        task_id="espera_csv",
        filepath=RUTA_CSV,
        poke_interval=120,       # comprova cada 2 minuts
        timeout=7200,            # falla si no apareix en 2 hores
        mode="reschedule",
    )

    # Pas 2: validar el CSV amb pandas
    @task(task_id="valida_csv")
    def valida_csv(data_interval_start=None):
        import pandas as pd
        from pathlib import Path

        ruta = f"/opt/airflow/dades/vendes_{data_interval_start.date()}.csv"
        if not Path(ruta).exists():
            raise FileNotFoundError(f"Fitxer no trobat: {ruta}")

        df = pd.read_csv(ruta)

        # Validacions bàsiques
        camps_obligatoris = ["id_venda", "data", "id_client", "import"]
        for camp in camps_obligatoris:
            if camp not in df.columns:
                raise ValueError(f"Camp obligatori absent: {camp}")

        n_nuls = df[camps_obligatoris].isnull().sum().sum()
        if n_nuls > 0:
            raise ValueError(f"S'han trobat {n_nuls} valors nuls en camps obligatoris")

        invalids = df[df["import"] <= 0]
        if not invalids.empty:
            log.warning("Registres amb import <= 0: %d (s'ignoraran)", len(invalids))

        n_valids = len(df[df["import"] > 0])
        log.info("Validació completada: %d registres valids", n_valids)
        return n_valids   # passat via XCom al pas següent

    # Pas 3: inserir a PostgreSQL (upsert idempotent)
    inserta_pg = SQLExecuteQueryOperator(
        task_id="inserta_vendes_pg",
        conn_id=CONN_PG,
        sql="""
            INSERT INTO dw.vendes (id_venda, data, id_client, import, carregat_el)
            SELECT
                id_venda::INTEGER,
                data::DATE,
                id_client::INTEGER,
                import::DECIMAL(10,2),
                NOW()
            FROM staging.vendes_tmp
            ON CONFLICT (id_venda)
            DO UPDATE SET
                import = EXCLUDED.import,
                carregat_el = EXCLUDED.carregat_el;
        """,
    )

    # Pas 4: notificació per email
    notifica = EmailOperator(
        task_id="envia_confirmacio",
        to=["analisi@empresa.cat"],
        subject="Pipeline vendes completat - {{ ds }}",
        html_content="""
            <p>El pipeline de vendes del dia <strong>{{ ds }}</strong>
            ha finalitzat correctament.</p>
            <p>Registres processats: {{ ti.xcom_pull(task_ids='valida_csv') }}</p>
        """,
    )

    # Definir les dependències
    espera_csv >> valida_csv() >> inserta_pg >> notifica

Backfill: re-executar per dates passades

Si el pipeline ha fallat durant una setmana i cal recuperar les dades d'aquells dies:

# Re-executar el DAG per a un rang de dates
airflow dags backfill \
    --dag-id pipeline_vendes_diari \
    --start-date 2026-06-01 \
    --end-date 2026-06-07

# Veure l'estat de les execucions d'un DAG
airflow dags list-runs --dag-id pipeline_vendes_diari --limit 20

El backfill respecta les dependències entre tasques i els paràmetres catchup i depends_on_past. Si una execució anterior falla durant el backfill, les execucions posteriors que depenguin d'ella no s'iniciaran (si depends_on_past=True).


TaskGroups: organitzar tasques visuals

Quan un DAG té moltes tasques, els TaskGroups permeten agrupar-les visualment a la UI:

from airflow.utils.task_group import TaskGroup

with DAG(dag_id="pipeline_complex", ...) as dag:

    with TaskGroup("extraccio", tooltip="Extracció de dades d'origen") as grp_extraccio:
        extreu_vendes = PythonOperator(task_id="extreu_vendes", ...)
        extreu_clients = PythonOperator(task_id="extreu_clients", ...)
        extreu_productes = PythonOperator(task_id="extreu_productes", ...)

    with TaskGroup("transformacio", tooltip="Neteja i transformació") as grp_transf:
        neteja = PythonOperator(task_id="neteja", ...)
        enriquiment = PythonOperator(task_id="enriquiment", ...)

    with TaskGroup("carrega", tooltip="Càrrega al Data Warehouse") as grp_carrega:
        carrega_fets = SQLExecuteQueryOperator(task_id="carrega_fets", ...)
        carrega_dims = SQLExecuteQueryOperator(task_id="carrega_dims", ...)

    grp_extraccio >> grp_transf >> grp_carrega

Instal·lació amb Docker Compose

La forma més ràpida de tenir Airflow funcionant localment és amb el Docker Compose oficial:

# Descarregar el fitxer docker-compose oficial d'Airflow 2.9
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.0/docker-compose.yaml'

# Crear les carpetes necessàries
mkdir -p ./dags ./logs ./plugins ./config

# Inicialitzar la base de dades i crear l'usuari admin
docker compose up airflow-init

# Arrancar tots els serveis en segon pla
docker compose up -d

Airflow queda disponible a http://localhost:8080 (usuari: airflow, contrasenya: airflow).

Per afegir dependències Python addicionals als workers, crear un fitxer requirements.txt a la carpeta del projecte i afegir-lo a l'entorn:

# Al docker-compose.yaml, afegir a la secció x-airflow-common -> environment:
_PIP_ADDITIONAL_REQUIREMENTS: "pandas==2.2.0 pandera==0.18.0 great-expectations==0.18.0"

AC5074/05/02 — Miniactivitat

Escriu un DAG d'Airflow (fitxer Python complet, executable) que faci el següent:

  1. Usant un PythonOperator, extregui les dades meteorològiques actuals de Barcelona de l'API oberta d'Open-Meteo (https://api.open-meteo.com/v1/forecast?latitude=41.39&longitude=2.15&current=temperature_2m,wind_speed_10m). Guarda la resposta en un fitxer CSV a /opt/airflow/dades/meteo_{{ ds }}.csv.

  2. Usant un segon PythonOperator, llegeix el CSV i valida que la temperatura és un número entre -10 i 50 graus i que la velocitat del vent és positiva. Si la validació falla, la tasca ha de fallar amb un missatge d'error descriptiu.

  3. Usant un SQLExecuteQueryOperator, insereix les dades a una taula PostgreSQL meteo_barcelona (data DATE, temperatura DECIMAL, vent DECIMAL, inserit_el TIMESTAMP). La inserció ha de ser idempotent (upsert per data).

El DAG ha de tenir: schedule="@daily", catchup=False, retries=2, retry_delay=timedelta(minutes=3).

Lliura el fitxer dag_meteo_nom_cognom.py amb comentaris que expliquin les decisions de disseny.


Mòdul M5074 Sistemes de Big Data | Institut Sa Palomera (Blanes) | Curs CEIABD 2026-2027