PR507405 — Pipeline ETL amb Airflow i Docker
Tipus: Pràctica tècnica de programació (orquestació de pipelines)
Durada estimada: 10-12 hores (3-4 sessions de treball, alineades amb S22-S25)
Lliurament: Campus Virtual — fitxers DAG .py + captures de la UI d'Airflow + informe breu
Objectius
Al finalitzar aquesta pràctica, l'alumne serà capaç de:
- Instal·lar Apache Airflow amb Docker Compose i explorar la interfície web (DAGs, Graph view, logs, Variables, Connections).
- Crear un DAG bàsic amb
PythonOperatoriBashOperatorper a un pipeline d'extracció de dades des d'una API pública o un fitxer CSV. - Implementar un DAG complex amb branching condicionat (
BranchPythonOperator), sensors (FileSensor) i gestió d'errors (retries,retry_delay,on_failure_callback). - Configurar un pipeline de càrrega incremental amb idempotència garantida (upsert per clau primària o watermark de timestamp).
- Implementar checks de qualitat de dades dins el DAG amb Great Expectations o dbt tests, separant registres vàlids de registres en quarantena.
- Llegir i interpretar els logs i l'estat d'execució de les tasques des de la UI d'Airflow per diagnosticar errors.
Materials necessaris
- Docker Desktop instal·lat i en funcionament (mínim 4 GB de RAM assignats a Docker).
- Docker Compose (inclòs a Docker Desktop).
- Editor de codi (VS Code recomanat, amb extensió Python).
- Python 3.10+ instal·lat localment (per provar scripts abans de copiar-los a la carpeta
dags/). - Connexió a internet (per descarregar imatges Docker i consultar l'API pública d'extracció).
- Materials teòrics del Bloc 5: ETL vs ELT, Apache Airflow, Qualitat de dades.
Descripció de la pràctica
Part 1 — Instal·lació d'Airflow amb Docker Compose (1,5-2 hores)
Crea una carpeta de projecte pr507405_cognom_nom/ amb l'estructura següent:
pr507405_cognom_nom/
├── docker-compose.yaml
├── dags/
├── logs/
├── plugins/
├── config/
└── dades/
Descarrega el docker-compose.yaml oficial d'Airflow i inicialitza l'entorn:
# Descarregar el fitxer docker-compose oficial d'Airflow 2.9
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.0/docker-compose.yaml'
# Crear les carpetes necessàries (si no existeixen)
mkdir -p ./dags ./logs ./plugins ./config ./dades
# A Linux/Mac cal fixar l'UID; a Windows es pot ometre
echo -e "AIRFLOW_UID=50000" > .env
# Inicialitzar la base de dades i crear l'usuari admin
docker compose up airflow-init
# Arrancar tots els serveis en segon pla
docker compose up -d
Afegeix les dependències Python que necessitaràs durant la pràctica (pandas, requests, pandera, great-expectations) editant la secció x-airflow-common del docker-compose.yaml:
x-airflow-common:
&airflow-common
environment:
&airflow-common-env
_PIP_ADDITIONAL_REQUIREMENTS: "pandas==2.2.0 requests==2.31.0 pandera==0.18.0 great-expectations==0.18.0"
Després de modificar el docker-compose.yaml, reinicia els serveis (docker compose down && docker compose up -d) perquè s'instal·lin les noves dependències als workers.
Comprovacions a fer i documentar (amb captura de pantalla):
- Accedeix a
http://localhost:8080(usuariairflow, contrasenyaairflow) i fes una captura de la llista de DAGs. - Executa
docker compose psi comprova que els serveisairflow-scheduler,airflow-webserver,airflow-workeripostgresestan en estathealthyorunning. - Explora la UI: pestanyes DAGs, Variables i Connections. Crea una Variable anomenada
entornamb valordesenvolupamentdes de la UI (Admin → Variables) i fes-ne una captura.
Part 2 — DAG bàsic: extracció de dades meteorològiques (2-2,5 hores)
Crea el fitxer dags/dag_extraccio_meteo.py amb un DAG que extregui dades de l'API pública d'Open-Meteo i les guardi en un CSV diari:
from airflow.sdk import DAG, task
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
RUTA_DADES = "/opt/airflow/dades"
default_args = {
"owner": "alumne",
"retries": 1,
"retry_delay": timedelta(minutes=2),
}
with DAG(
dag_id="dag_extraccio_meteo",
description="Extreu dades meteorologiques de Barcelona des d'Open-Meteo",
schedule="@daily",
start_date=datetime(2026, 3, 1),
catchup=False,
default_args=default_args,
tags=["m5074", "pr507405", "extraccio"],
) as dag:
# PythonOperator (via TaskFlow API): crida l'API i guarda el CSV
@task(task_id="extreu_dades_meteo")
def extreu_dades_meteo(ds=None):
import requests
import csv
url = (
"https://api.open-meteo.com/v1/forecast"
"?latitude=41.39&longitude=2.15"
"¤t=temperature_2m,wind_speed_10m"
)
resposta = requests.get(url, timeout=15)
resposta.raise_for_status()
dades = resposta.json()["current"]
ruta = f"{RUTA_DADES}/meteo_{ds}.csv"
with open(ruta, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["data", "temperatura", "vent"])
writer.writerow([ds, dades["temperature_2m"], dades["wind_speed_10m"]])
print(f"Fitxer generat: {ruta}")
return ruta
# BashOperator: comprova que el fitxer s'ha creat i mostra'n el contingut
comprova_fitxer = BashOperator(
task_id="comprova_fitxer",
bash_command=(
f"test -f {RUTA_DADES}/meteo_{{{{ ds }}}}.csv "
f"&& cat {RUTA_DADES}/meteo_{{{{ ds }}}}.csv"
),
)
extreu_dades_meteo() >> comprova_fitxer
Tasques a fer:
- Copia el fitxer a la carpeta
dags/del projecte (Airflow el detectarà automàticament en 30-60 segons). - Activa el DAG des de la UI i dispara'n una execució manual (botó "Trigger DAG").
- Comprova els logs de cada tasca des de la Graph view. Fes una captura de l'execució correcta (totes les tasques en verd).
- Modifica el DAG per afegir una tercera tasca
BashOperatorque comprimeixi el CSV generat ambgzipi documenta el canvi.
Part 3 — DAG complex: branching, sensors i gestió d'errors (3-3,5 hores)
Crea el fitxer dags/dag_pipeline_complex.py amb un pipeline que:
- Esperi l'arribada d'un fitxer CSV de vendes mitjançant
FileSensor. - Decideixi amb
BranchPythonOperatorsi el volum de dades és suficient per continuar o si cal aturar el pipeline i alertar. - Gestioni errors amb
retries,retry_delayi unon_failure_callbackque registri l'incident.
from airflow.sdk import DAG, task
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime, timedelta
import logging
log = logging.getLogger(__name__)
RUTA_CSV = "/opt/airflow/dades/vendes_{{ ds }}.csv"
def notifica_fallada(context):
"""on_failure_callback: registra l'incident al log d'Airflow."""
tasca = context["task_instance"].task_id
execucio = context["ds"]
log.error("ALERTA: la tasca '%s' ha fallat per a l'execucio del %s", tasca, execucio)
default_args = {
"owner": "alumne",
"retries": 3,
"retry_delay": timedelta(minutes=2),
"on_failure_callback": notifica_fallada,
}
with DAG(
dag_id="dag_pipeline_complex",
description="Pipeline amb sensor, branching i gestio d'errors",
schedule="@daily",
start_date=datetime(2026, 3, 1),
catchup=False,
default_args=default_args,
tags=["m5074", "pr507405", "complex"],
) as dag:
espera_csv = FileSensor(
task_id="espera_csv_vendes",
filepath=RUTA_CSV,
poke_interval=30,
timeout=600,
mode="reschedule",
)
@task(task_id="compta_registres")
def compta_registres(ds=None):
import pandas as pd
ruta = f"/opt/airflow/dades/vendes_{ds}.csv"
df = pd.read_csv(ruta)
log.info("Registres llegits: %d", len(df))
return len(df)
def decideix_branca(ti=None, **_):
n = ti.xcom_pull(task_ids="compta_registres")
return "processa_dades" if n >= 10 else "alerta_volum_baix"
branca = BranchPythonOperator(
task_id="decideix_branca",
python_callable=decideix_branca,
)
@task(task_id="processa_dades")
def processa_dades(ds=None):
import pandas as pd
ruta = f"/opt/airflow/dades/vendes_{ds}.csv"
df = pd.read_csv(ruta)
df_net = df.dropna(subset=["id_venda", "import"])
log.info("Registres nets: %d de %d", len(df_net), len(df))
return len(df_net)
alerta_volum_baix = EmptyOperator(task_id="alerta_volum_baix")
fi_pipeline = EmptyOperator(
task_id="fi_pipeline",
trigger_rule="none_failed_min_one_success",
)
n_registres = compta_registres()
espera_csv >> n_registres >> branca
branca >> processa_dades() >> fi_pipeline
branca >> alerta_volum_baix >> fi_pipeline
Tasques a fer:
- Genera dos fitxers de prova:
vendes_2026-03-20.csvamb 15 files (branca normal) i un altre dia amb només 3 files (branca d'alerta). Adaptastart_date/execucions per provar ambdós camins. - Comprova a la Graph view que les tasques no executades de la branca alternativa apareixen com a
skipped(no com a error). - Provoca deliberadament un error (per exemple, esborrant temporalment el fitxer abans del timeout del sensor, o llançant una excepció a
processa_dades) i comprova als logs que es registren els 3 reintents (retries=3) abans que la tasca acabi enfailed. - Documenta a l'informe què mostren els logs del
on_failure_callbackquan una tasca falla definitivament.
Part 4 — Càrrega incremental idempotent (2-2,5 hores)
Amplia el pipeline anterior (o crea un nou DAG dag_carrega_incremental.py) per carregar les dades validades a una base de dades amb upsert idempotent basat en un watermark de timestamp:
from airflow.sdk import DAG, task
from airflow.models import Variable
from datetime import datetime, timedelta
import logging
log = logging.getLogger(__name__)
with DAG(
dag_id="dag_carrega_incremental",
description="Carrega incremental idempotent amb watermark",
schedule="@daily",
start_date=datetime(2026, 3, 1),
catchup=False,
default_args={"retries": 2, "retry_delay": timedelta(minutes=3)},
tags=["m5074", "pr507405", "incremental"],
) as dag:
@task(task_id="carrega_incremental")
def carrega_incremental(ds=None):
import sqlite3
import pandas as pd
ruta_db = "/opt/airflow/dades/dw_vendes.db"
ruta_csv = f"/opt/airflow/dades/vendes_{ds}.csv"
conn = sqlite3.connect(ruta_db)
conn.execute("""
CREATE TABLE IF NOT EXISTS vendes (
id_venda INTEGER PRIMARY KEY,
data TEXT,
id_client INTEGER,
import REAL,
carregat_el TEXT
)
""")
# Watermark: darrera data carregada amb exit (guardat com a Variable d'Airflow)
last_run = Variable.get("watermark_vendes", default_var="1970-01-01")
log.info("Watermark actual: %s", last_run)
df = pd.read_csv(ruta_csv)
df_nous = df[df["data"] > last_run]
if df_nous.empty:
log.info("Cap registre nou des del watermark %s.", last_run)
conn.close()
return 0
cur = conn.cursor()
for _, fila in df_nous.iterrows():
# Upsert idempotent: si id_venda existeix, actualitza; si no, insereix
cur.execute("""
INSERT INTO vendes (id_venda, data, id_client, import, carregat_el)
VALUES (?, ?, ?, ?, datetime('now'))
ON CONFLICT(id_venda) DO UPDATE SET
import = excluded.import,
carregat_el = excluded.carregat_el
""", (fila["id_venda"], fila["data"], fila["id_client"], fila["import"]))
conn.commit()
nou_watermark = df_nous["data"].max()
Variable.set("watermark_vendes", nou_watermark)
log.info("Carregats %d registres. Nou watermark: %s", len(df_nous), nou_watermark)
conn.close()
return len(df_nous)
carrega_incremental()
Tasques a fer:
- Executa el DAG dues vegades seguides sobre el mateix fitxer CSV i comprova (amb una consulta
SELECT COUNT(*) FROM vendes) que no hi ha duplicats: aquesta és la prova d'idempotència. - Modifica l'import d'un registre existent al CSV, torna a executar el DAG i comprova que el registre s'ha actualitzat, no duplicat.
- Documenta a l'informe quin mecanisme garanteix la idempotència en aquest DAG (watermark +
ON CONFLICT DO UPDATE) i què passaria si s'eliminés el watermark.
Part 5 — Checks de qualitat de dades integrats al DAG (1,5-2 hores)
Afegeix una tasca de validació de qualitat abans de la càrrega, fent servir Great Expectations o Pandera (ja inclòs a etl_conceptes.md i qualitat_dades.md):
from airflow.sdk import task
import pandera as pa
from pandera import Column, DataFrameSchema, Check
esquema_vendes = DataFrameSchema(
columns={
"id_venda": Column(int, checks=Check.greater_than(0), unique=True, nullable=False),
"data": Column(str, nullable=False),
"id_client": Column(int, checks=Check.greater_than(0), nullable=False),
"import": Column(float, checks=Check.greater_than(0), nullable=False),
},
coerce=True,
strict=False,
)
@task(task_id="valida_qualitat")
def valida_qualitat(ds=None):
import pandas as pd
ruta = f"/opt/airflow/dades/vendes_{ds}.csv"
df = pd.read_csv(ruta)
try:
df_valid = esquema_vendes.validate(df, lazy=True)
print(f"Validacio correcta: {len(df_valid)} registres.")
return len(df_valid)
except pa.errors.SchemaErrors as e:
print("Errors de qualitat trobats:")
print(e.failure_cases)
ruta_quarantena = f"/opt/airflow/dades/quarantena_{ds}.csv"
e.failure_cases.to_csv(ruta_quarantena, index=False)
raise ValueError(
f"Validacio fallida: {len(e.failure_cases)} errors. "
f"Detall a {ruta_quarantena}"
) from e
Tasques a fer:
- Insereix aquesta tasca
valida_qualitatentre el sensor i la càrrega del DAG complex de la Part 3 (espera_csv >> valida_qualitat() >> branca). - Crea un CSV de prova amb almenys 2 errors deliberats (un import negatiu, un
id_vendaduplicat) i comprova que la tasca falla i genera el fitxer de quarantena. - Si optes per Great Expectations en lloc de Pandera, adapta l'exemple de Qualitat de dades (secció "Validació amb Great Expectations") substituint l'esquema Pandera per una Expectation Suite equivalent.
- Documenta a l'informe les dimensions de qualitat comprovades (completitud, exactitud, unicitat) i com es correspon cada
Checkamb una dimensió.
Lliurament
Puja al Campus Virtual un fitxer comprimit PR507405_cognom_nom.zip amb:
| Contingut | Detall |
|---|---|
dags/ |
Tots els fitxers .py dels DAGs desenvolupats (Parts 2-5), amb comentaris explicant les decisions de disseny |
captures/ |
Captures de pantalla de la UI d'Airflow: llista de DAGs, Graph view de cada DAG amb execució correcta, logs d'un error gestionat, Variable watermark_vendes a la UI |
dades/ |
CSV de prova utilitzats (inclòs el de quarantena generat a la Part 5) |
informe_cognom_nom.pdf |
Informe breu (màxim 5 pàgines) que respongui les preguntes de reflexió i documenti les decisions tècniques de cada part |
Estructura recomanada de l'informe:
- Captura de pantalla de
docker compose psamb tots els serveis en marxa. - Resum de cada DAG (què fa, quines tasques té, com s'ha provat).
- Evidència de la idempotència (resultat de les dues execucions de la Part 4).
- Resultats de la validació de qualitat (registres vàlids vs. quarantena).
- Respostes a les preguntes de reflexió final.
Consulta la Rúbrica PR507405 per als criteris detallats d'avaluació.
Data límit de lliurament: consulta el calendari del Campus Virtual (coincideix amb la sessió S25).
Prova escrita del Bloc 5
La sessió S25 inclou també una prova escrita sobre els continguts teòrics del Bloc 5 (ETL vs ELT, arquitectura d'Airflow, idempotència, dimensions de qualitat de dades). Repassa ETL vs ELT, Apache Airflow i Qualitat de dades abans de la sessió.
Preguntes de reflexió final
Un cop completada la pràctica, reflexiona breument sobre:
- Per què Airflow gestiona millor aquest pipeline que un script Python llançat per cron? Posa un exemple concret extret de la teva pròpia experiència fent la pràctica (un error, un reintent, una dependència).
- En quin punt del DAG complex (Part 3) decideixes posar el
BranchPythonOperator? Per què aquest disseny és millor que fer la comprovació de volum dins la mateixa tasca de processament? - Explica amb les teves paraules per què el patró
ON CONFLICT DO UPDATEcombinat amb un watermark garanteix la idempotència del pipeline de la Part 4. Què passaria si el watermark es guardés després d'un error de càrrega en lloc d'abans? - Quina dimensió de qualitat de dades (completitud, exactitud, consistència, unicitat, temporalitat, validesa) et sembla més difícil de validar automàticament i per què?
Inclou les respostes a l'informe (3-5 línies per pregunta).
Pràctica PR507405 | Mòdul M5074 Sistemes de Big Data | Institut Sa Palomera (Blanes) | Curs CEIABD 2026-2027