Apache Airflow
Per qué Airflow i no un cron?
Abans d'Airflow, la manera habitual d'executar pipelines periòdics era un crontab. Un cron és senzill i eficaç per a tasques simples, però falla en qualsevol escenari real de dades:
| Problema | Cron | Airflow |
|---|---|---|
| El pipeline falla per error de xarxa | El cron no ho detecta, no reintenta | Reintenta automàticament N vegades amb backoff |
| Pipeline B depèn que acabi el pipeline A | Cal gestionar-ho manualment (sleep, fitxers de lock) | Dependències explícites entre tasques |
| Vull saber quant ha trigat cada pas | No hi ha registre estructurat | UI web amb durada, logs i historial per execució |
| El pipeline ha fallat en silenci durant 3 dies | Ningú s'assabenta | Alertes per email/Slack en fallida |
| Necessito re-executar el pipeline per al 15 de gener | Impossible sense modificar el cron | Backfill amb una sola comanda |
| Vull pausar el pipeline durant el cap de setmana | Cal comentar el cron | Toggle a la UI en un clic |
Airflow és la plataforma d'orquestació de workflows de dades més estesa al sector. El 2025, pràcticament tota oferta de feina de Data Engineer menciona Airflow o un equivalent (Prefect, Dagster). Els pipelines es defineixen com a codi Python, s'executen de forma programada i es monitoritzen a través d'una interfície web.
Arquitectura d'Airflow
flowchart TD
subgraph Airflow
SCH[Scheduler\nDecideix quan executar cada tasca]
WEB[Webserver\nUI de monitoratge]
META[(Metadatabase\nPostgreSQL)]
WORK[Worker\nExecuta les tasques]
end
SCH -->|llegeix DAGs| DAGS[Carpeta /dags]
SCH -->|escriu estat| META
WEB -->|llegeix estat| META
SCH -->|envia tasques| WORK
WORK -->|escriu logs| LOGS[Carpeta /logs]
Components principals:
- Scheduler: el cor d'Airflow. Analitza les definicions de DAG, determina quines tasques estan llestes per executar-se (totes les dependències completes) i les encua per als workers. S'executa de forma contínua en segon pla.
- Webserver: la interfície web (per defecte al port 8080). Permet veure l'estat de cada DAG run, les tasques individuals, els logs, el gràfic de dependències i disparar execucions manuals.
- Metadatabase: base de dades (PostgreSQL en producció, SQLite per a desenvolupament local) on l'Scheduler guarda l'estat de cada DAG run i cada tasca. No s'hi emmagatzemen les dades del pipeline, només l'estat d'execució.
- Workers: els processos que executen el codi real de cada tasca. Depenent de l'executor configurat, poden ser processos locals o workers distribuïts (Celery, Kubernetes).
Conceptes fonamentals
DAG: Directed Acyclic Graph
Un DAG és un graf dirigit sense cicles. A Airflow, cada node del graf és una tasca i les arestes representen dependències d'execució. "Dirigit" vol dir que les dependències tenen una direcció (A ha d'acabar abans que B). "Acíclic" vol dir que no hi pot haver bucles (A no pot dependre de si mateix ni d'una cadena circular).
# Exemple mínim d'un DAG a Airflow 2.8+
from airflow.sdk import DAG, task
from datetime import datetime, timedelta
with DAG(
dag_id="exemple_basic",
description="DAG d'exemple per al curs M5074",
schedule="0 2 * * *", # cada dia a les 02:00
start_date=datetime(2026, 1, 1),
catchup=False, # no re-executar dates passades
tags=["m5074", "exemple"],
) as dag:
pass # aquí s'afegeixen les tasques
Paràmetres importants del DAG:
schedule: expressió cron ("0 2 * * *"= les 2:00 AM cada dia) o predefinits ("@daily","@hourly",Noneper a execució manual).start_date: primera data d'execució possible. Airflow calcularà les execucions des d'aquesta data.catchup: si ésTrue, Airflow re-executarà totes les dates passades entrestart_datei avui en iniciar el DAG. En producció, quasi sempreFalse.retries: nombre de reintents automàtics si una tasca falla.retry_delay: temps d'espera entre reintents (timedelta(minutes=5)).depends_on_past: si ésTrue, una execució no comença fins que la del dia anterior ha acabat correctament.
Task: unitat de treball
Cada tasca és una unitat de treball atòmica. Airflow proporciona diversos tipus de tasques predefinits (Operators):
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.sensors.filesystem import FileSensor
# PythonOperator: executa una funció Python
def processa_dades():
import pandas as pd
df = pd.read_csv("/data/vendes.csv")
print(f"Registres processats: {len(df)}")
tasca_python = PythonOperator(
task_id="processa_dades",
python_callable=processa_dades,
dag=dag,
)
# BashOperator: executa una comanda shell
tasca_bash = BashOperator(
task_id="comprova_espai_disc",
bash_command="df -h /data && echo 'Espai comprovat'",
dag=dag,
)
# SQLExecuteQueryOperator: executa SQL sobre una connexió configurada
tasca_sql = SQLExecuteQueryOperator(
task_id="inserta_resum_diari",
conn_id="postgres_dw",
sql="INSERT INTO resum_diari SELECT CURRENT_DATE, COUNT(*) FROM vendes;",
dag=dag,
)
Operator vs Sensor
Un Operator executa una acció (processa, insereix, envia). Un Sensor espera que una condició es compleixi abans de continuar el pipeline:
from airflow.sensors.filesystem import FileSensor
from airflow.providers.http.sensors.http import HttpSensor
# FileSensor: espera que existeixi un fitxer
espera_fitxer = FileSensor(
task_id="espera_csv_vendes",
filepath="/data/vendes_{{ ds }}.csv", # ds = data d'execució (YYYY-MM-DD)
poke_interval=60, # comprova cada 60 segons
timeout=3600, # falla si no apareix en 1 hora
mode="reschedule", # allibera el worker mentre espera
dag=dag,
)
# HttpSensor: espera que una API respongui amb èxit
espera_api = HttpSensor(
task_id="espera_api_disponible",
http_conn_id="api_meteo",
endpoint="/health",
poke_interval=30,
timeout=300,
dag=dag,
)
La diferència clau entre mode="poke" i mode="reschedule": en mode poke, el worker queda bloquejat mentre espera (consumeix recursos); en mode reschedule, allibera el worker i el scheduler el reprèn periòdicament.
Executors
L'executor determina com Airflow distribueix les tasques als workers:
- SequentialExecutor: executa una tasca cada vegada, de forma seqüencial. Només per a provar localment.
- LocalExecutor: executa tasques en paral·lel com a processos locals. Adequat per a instal·lacions single-node.
- CeleryExecutor: distribueix tasques a workers externs a través d'una cua de missatges (Redis o RabbitMQ). Per a clústers multi-node en producció.
- KubernetesExecutor: crea un pod Kubernetes per a cada tasca. Excel·lent per a aïllament i escalat elàstic.
XCom: comunicació entre tasques
XCom (cross-communication) permet que una tasca passi un valor petit a una altra:
from airflow.sdk import task
@task
def extreu_dades():
registres = 42_350
return registres # Airflow guarda aquest valor a la metadatabase
@task
def valida_volum(n_registres: int):
if n_registres < 1000:
raise ValueError(f"Volum insuficient: {n_registres} registres")
print(f"Volum correcte: {n_registres:,} registres")
# La connexió es fa automàticament amb el TaskFlow API
n = extreu_dades()
valida_volum(n)
Limitació important: XCom emmagatzema els valors a la metadatabase. No és adequat per a DataFrames ni fitxers grans. Per a passar dades grans entre tasques, usar un sistema extern (S3, base de dades) i passar-ne la referència (un path o un ID) via XCom.
Variables i Connections
Variables: parells clau-valor globals configurables des de la UI o via CLI, sense necessitat de modificar el codi del DAG:
from airflow.models import Variable
# Llegir una variable configurada a la UI d'Airflow
bucket_s3 = Variable.get("bucket_s3_dades")
entorn = Variable.get("entorn", default_var="dev")
Connections: credencials de connexió a sistemes externs (PostgreSQL, S3, APIs) gestionades de forma centralitzada i encriptada:
from airflow.hooks.base import BaseHook
# La connexió "postgres_dw" es configura a la UI, no al codi
conn = BaseHook.get_connection("postgres_dw")
print(f"Host: {conn.host}, Port: {conn.port}, DB: {conn.schema}")
DAG complet: pipeline de vendes
A continuació, un DAG complet que implementa un pipeline real de quatre passos: detecció del fitxer, validació, inserció a PostgreSQL i notificació:
from airflow.sdk import DAG, task
from airflow.sensors.filesystem import FileSensor
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.operators.email import EmailOperator
from airflow.models import Variable
from datetime import datetime, timedelta
import logging
log = logging.getLogger(__name__)
RUTA_CSV = "/opt/airflow/dades/vendes_{{ ds }}.csv"
CONN_PG = "postgres_dw"
default_args = {
"owner": "equip-dades",
"retries": 3,
"retry_delay": timedelta(minutes=5),
"email_on_failure": True,
"email": ["alertes@empresa.cat"],
}
with DAG(
dag_id="pipeline_vendes_diari",
description="Carrega diària de vendes: CSV -> validació -> PostgreSQL",
schedule="30 6 * * 1-5", # feiners a les 6:30 AM
start_date=datetime(2026, 1, 1),
catchup=False,
default_args=default_args,
tags=["m5074", "vendes", "produccio"],
) as dag:
# Pas 1: esperar que el fitxer CSV estigui disponible
espera_csv = FileSensor(
task_id="espera_csv",
filepath=RUTA_CSV,
poke_interval=120, # comprova cada 2 minuts
timeout=7200, # falla si no apareix en 2 hores
mode="reschedule",
)
# Pas 2: validar el CSV amb pandas
@task(task_id="valida_csv")
def valida_csv(data_interval_start=None):
import pandas as pd
from pathlib import Path
ruta = f"/opt/airflow/dades/vendes_{data_interval_start.date()}.csv"
if not Path(ruta).exists():
raise FileNotFoundError(f"Fitxer no trobat: {ruta}")
df = pd.read_csv(ruta)
# Validacions bàsiques
camps_obligatoris = ["id_venda", "data", "id_client", "import"]
for camp in camps_obligatoris:
if camp not in df.columns:
raise ValueError(f"Camp obligatori absent: {camp}")
n_nuls = df[camps_obligatoris].isnull().sum().sum()
if n_nuls > 0:
raise ValueError(f"S'han trobat {n_nuls} valors nuls en camps obligatoris")
invalids = df[df["import"] <= 0]
if not invalids.empty:
log.warning("Registres amb import <= 0: %d (s'ignoraran)", len(invalids))
n_valids = len(df[df["import"] > 0])
log.info("Validació completada: %d registres valids", n_valids)
return n_valids # passat via XCom al pas següent
# Pas 3: inserir a PostgreSQL (upsert idempotent)
inserta_pg = SQLExecuteQueryOperator(
task_id="inserta_vendes_pg",
conn_id=CONN_PG,
sql="""
INSERT INTO dw.vendes (id_venda, data, id_client, import, carregat_el)
SELECT
id_venda::INTEGER,
data::DATE,
id_client::INTEGER,
import::DECIMAL(10,2),
NOW()
FROM staging.vendes_tmp
ON CONFLICT (id_venda)
DO UPDATE SET
import = EXCLUDED.import,
carregat_el = EXCLUDED.carregat_el;
""",
)
# Pas 4: notificació per email
notifica = EmailOperator(
task_id="envia_confirmacio",
to=["analisi@empresa.cat"],
subject="Pipeline vendes completat - {{ ds }}",
html_content="""
<p>El pipeline de vendes del dia <strong>{{ ds }}</strong>
ha finalitzat correctament.</p>
<p>Registres processats: {{ ti.xcom_pull(task_ids='valida_csv') }}</p>
""",
)
# Definir les dependències
espera_csv >> valida_csv() >> inserta_pg >> notifica
Backfill: re-executar per dates passades
Si el pipeline ha fallat durant una setmana i cal recuperar les dades d'aquells dies:
# Re-executar el DAG per a un rang de dates
airflow dags backfill \
--dag-id pipeline_vendes_diari \
--start-date 2026-06-01 \
--end-date 2026-06-07
# Veure l'estat de les execucions d'un DAG
airflow dags list-runs --dag-id pipeline_vendes_diari --limit 20
El backfill respecta les dependències entre tasques i els paràmetres catchup i depends_on_past. Si una execució anterior falla durant el backfill, les execucions posteriors que depenguin d'ella no s'iniciaran (si depends_on_past=True).
TaskGroups: organitzar tasques visuals
Quan un DAG té moltes tasques, els TaskGroups permeten agrupar-les visualment a la UI:
from airflow.utils.task_group import TaskGroup
with DAG(dag_id="pipeline_complex", ...) as dag:
with TaskGroup("extraccio", tooltip="Extracció de dades d'origen") as grp_extraccio:
extreu_vendes = PythonOperator(task_id="extreu_vendes", ...)
extreu_clients = PythonOperator(task_id="extreu_clients", ...)
extreu_productes = PythonOperator(task_id="extreu_productes", ...)
with TaskGroup("transformacio", tooltip="Neteja i transformació") as grp_transf:
neteja = PythonOperator(task_id="neteja", ...)
enriquiment = PythonOperator(task_id="enriquiment", ...)
with TaskGroup("carrega", tooltip="Càrrega al Data Warehouse") as grp_carrega:
carrega_fets = SQLExecuteQueryOperator(task_id="carrega_fets", ...)
carrega_dims = SQLExecuteQueryOperator(task_id="carrega_dims", ...)
grp_extraccio >> grp_transf >> grp_carrega
Instal·lació amb Docker Compose
La forma més ràpida de tenir Airflow funcionant localment és amb el Docker Compose oficial:
# Descarregar el fitxer docker-compose oficial d'Airflow 2.9
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.0/docker-compose.yaml'
# Crear les carpetes necessàries
mkdir -p ./dags ./logs ./plugins ./config
# Inicialitzar la base de dades i crear l'usuari admin
docker compose up airflow-init
# Arrancar tots els serveis en segon pla
docker compose up -d
Airflow queda disponible a http://localhost:8080 (usuari: airflow, contrasenya: airflow).
Per afegir dependències Python addicionals als workers, crear un fitxer requirements.txt a la carpeta del projecte i afegir-lo a l'entorn:
# Al docker-compose.yaml, afegir a la secció x-airflow-common -> environment:
_PIP_ADDITIONAL_REQUIREMENTS: "pandas==2.2.0 pandera==0.18.0 great-expectations==0.18.0"
AC5074/05/02 — Miniactivitat
Escriu un DAG d'Airflow (fitxer Python complet, executable) que faci el següent:
-
Usant un
PythonOperator, extregui les dades meteorològiques actuals de Barcelona de l'API oberta d'Open-Meteo (https://api.open-meteo.com/v1/forecast?latitude=41.39&longitude=2.15¤t=temperature_2m,wind_speed_10m). Guarda la resposta en un fitxer CSV a/opt/airflow/dades/meteo_{{ ds }}.csv. -
Usant un segon
PythonOperator, llegeix el CSV i valida que la temperatura és un número entre -10 i 50 graus i que la velocitat del vent és positiva. Si la validació falla, la tasca ha de fallar amb un missatge d'error descriptiu. -
Usant un
SQLExecuteQueryOperator, insereix les dades a una taula PostgreSQLmeteo_barcelona (data DATE, temperatura DECIMAL, vent DECIMAL, inserit_el TIMESTAMP). La inserció ha de ser idempotent (upsert per data).
El DAG ha de tenir: schedule="@daily", catchup=False, retries=2, retry_delay=timedelta(minutes=3).
Lliura el fitxer dag_meteo_nom_cognom.py amb comentaris que expliquin les decisions de disseny.
Mòdul M5074 Sistemes de Big Data | Institut Sa Palomera (Blanes) | Curs CEIABD 2026-2027