Salta el contingut

PR507405 — Pipeline ETL amb Airflow i Docker

Tipus: Pràctica tècnica de programació (orquestació de pipelines) Durada estimada: 10-12 hores (3-4 sessions de treball, alineades amb S22-S25) Lliurament: Campus Virtual — fitxers DAG .py + captures de la UI d'Airflow + informe breu


Objectius

Al finalitzar aquesta pràctica, l'alumne serà capaç de:

  1. Instal·lar Apache Airflow amb Docker Compose i explorar la interfície web (DAGs, Graph view, logs, Variables, Connections).
  2. Crear un DAG bàsic amb PythonOperator i BashOperator per a un pipeline d'extracció de dades des d'una API pública o un fitxer CSV.
  3. Implementar un DAG complex amb branching condicionat (BranchPythonOperator), sensors (FileSensor) i gestió d'errors (retries, retry_delay, on_failure_callback).
  4. Configurar un pipeline de càrrega incremental amb idempotència garantida (upsert per clau primària o watermark de timestamp).
  5. Implementar checks de qualitat de dades dins el DAG amb Great Expectations o dbt tests, separant registres vàlids de registres en quarantena.
  6. Llegir i interpretar els logs i l'estat d'execució de les tasques des de la UI d'Airflow per diagnosticar errors.

Materials necessaris

  • Docker Desktop instal·lat i en funcionament (mínim 4 GB de RAM assignats a Docker).
  • Docker Compose (inclòs a Docker Desktop).
  • Editor de codi (VS Code recomanat, amb extensió Python).
  • Python 3.10+ instal·lat localment (per provar scripts abans de copiar-los a la carpeta dags/).
  • Connexió a internet (per descarregar imatges Docker i consultar l'API pública d'extracció).
  • Materials teòrics del Bloc 5: ETL vs ELT, Apache Airflow, Qualitat de dades.

Descripció de la pràctica

Part 1 — Instal·lació d'Airflow amb Docker Compose (1,5-2 hores)

Crea una carpeta de projecte pr507405_cognom_nom/ amb l'estructura següent:

pr507405_cognom_nom/
├── docker-compose.yaml
├── dags/
├── logs/
├── plugins/
├── config/
└── dades/

Descarrega el docker-compose.yaml oficial d'Airflow i inicialitza l'entorn:

# Descarregar el fitxer docker-compose oficial d'Airflow 2.9
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.0/docker-compose.yaml'

# Crear les carpetes necessàries (si no existeixen)
mkdir -p ./dags ./logs ./plugins ./config ./dades

# A Linux/Mac cal fixar l'UID; a Windows es pot ometre
echo -e "AIRFLOW_UID=50000" > .env

# Inicialitzar la base de dades i crear l'usuari admin
docker compose up airflow-init

# Arrancar tots els serveis en segon pla
docker compose up -d

Afegeix les dependències Python que necessitaràs durant la pràctica (pandas, requests, pandera, great-expectations) editant la secció x-airflow-common del docker-compose.yaml:

x-airflow-common:
  &airflow-common
  environment:
    &airflow-common-env
    _PIP_ADDITIONAL_REQUIREMENTS: "pandas==2.2.0 requests==2.31.0 pandera==0.18.0 great-expectations==0.18.0"

Després de modificar el docker-compose.yaml, reinicia els serveis (docker compose down && docker compose up -d) perquè s'instal·lin les noves dependències als workers.

Comprovacions a fer i documentar (amb captura de pantalla):

  • Accedeix a http://localhost:8080 (usuari airflow, contrasenya airflow) i fes una captura de la llista de DAGs.
  • Executa docker compose ps i comprova que els serveis airflow-scheduler, airflow-webserver, airflow-worker i postgres estan en estat healthy o running.
  • Explora la UI: pestanyes DAGs, Variables i Connections. Crea una Variable anomenada entorn amb valor desenvolupament des de la UI (Admin → Variables) i fes-ne una captura.

Part 2 — DAG bàsic: extracció de dades meteorològiques (2-2,5 hores)

Crea el fitxer dags/dag_extraccio_meteo.py amb un DAG que extregui dades de l'API pública d'Open-Meteo i les guardi en un CSV diari:

from airflow.sdk import DAG, task
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

RUTA_DADES = "/opt/airflow/dades"

default_args = {
    "owner": "alumne",
    "retries": 1,
    "retry_delay": timedelta(minutes=2),
}

with DAG(
    dag_id="dag_extraccio_meteo",
    description="Extreu dades meteorologiques de Barcelona des d'Open-Meteo",
    schedule="@daily",
    start_date=datetime(2026, 3, 1),
    catchup=False,
    default_args=default_args,
    tags=["m5074", "pr507405", "extraccio"],
) as dag:

    # PythonOperator (via TaskFlow API): crida l'API i guarda el CSV
    @task(task_id="extreu_dades_meteo")
    def extreu_dades_meteo(ds=None):
        import requests
        import csv

        url = (
            "https://api.open-meteo.com/v1/forecast"
            "?latitude=41.39&longitude=2.15"
            "&current=temperature_2m,wind_speed_10m"
        )
        resposta = requests.get(url, timeout=15)
        resposta.raise_for_status()
        dades = resposta.json()["current"]

        ruta = f"{RUTA_DADES}/meteo_{ds}.csv"
        with open(ruta, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["data", "temperatura", "vent"])
            writer.writerow([ds, dades["temperature_2m"], dades["wind_speed_10m"]])

        print(f"Fitxer generat: {ruta}")
        return ruta

    # BashOperator: comprova que el fitxer s'ha creat i mostra'n el contingut
    comprova_fitxer = BashOperator(
        task_id="comprova_fitxer",
        bash_command=(
            f"test -f {RUTA_DADES}/meteo_{{{{ ds }}}}.csv "
            f"&& cat {RUTA_DADES}/meteo_{{{{ ds }}}}.csv"
        ),
    )

    extreu_dades_meteo() >> comprova_fitxer

Tasques a fer:

  1. Copia el fitxer a la carpeta dags/ del projecte (Airflow el detectarà automàticament en 30-60 segons).
  2. Activa el DAG des de la UI i dispara'n una execució manual (botó "Trigger DAG").
  3. Comprova els logs de cada tasca des de la Graph view. Fes una captura de l'execució correcta (totes les tasques en verd).
  4. Modifica el DAG per afegir una tercera tasca BashOperator que comprimeixi el CSV generat amb gzip i documenta el canvi.

Part 3 — DAG complex: branching, sensors i gestió d'errors (3-3,5 hores)

Crea el fitxer dags/dag_pipeline_complex.py amb un pipeline que:

  1. Esperi l'arribada d'un fitxer CSV de vendes mitjançant FileSensor.
  2. Decideixi amb BranchPythonOperator si el volum de dades és suficient per continuar o si cal aturar el pipeline i alertar.
  3. Gestioni errors amb retries, retry_delay i un on_failure_callback que registri l'incident.
from airflow.sdk import DAG, task
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime, timedelta
import logging

log = logging.getLogger(__name__)

RUTA_CSV = "/opt/airflow/dades/vendes_{{ ds }}.csv"


def notifica_fallada(context):
    """on_failure_callback: registra l'incident al log d'Airflow."""
    tasca = context["task_instance"].task_id
    execucio = context["ds"]
    log.error("ALERTA: la tasca '%s' ha fallat per a l'execucio del %s", tasca, execucio)


default_args = {
    "owner": "alumne",
    "retries": 3,
    "retry_delay": timedelta(minutes=2),
    "on_failure_callback": notifica_fallada,
}

with DAG(
    dag_id="dag_pipeline_complex",
    description="Pipeline amb sensor, branching i gestio d'errors",
    schedule="@daily",
    start_date=datetime(2026, 3, 1),
    catchup=False,
    default_args=default_args,
    tags=["m5074", "pr507405", "complex"],
) as dag:

    espera_csv = FileSensor(
        task_id="espera_csv_vendes",
        filepath=RUTA_CSV,
        poke_interval=30,
        timeout=600,
        mode="reschedule",
    )

    @task(task_id="compta_registres")
    def compta_registres(ds=None):
        import pandas as pd
        ruta = f"/opt/airflow/dades/vendes_{ds}.csv"
        df = pd.read_csv(ruta)
        log.info("Registres llegits: %d", len(df))
        return len(df)

    def decideix_branca(ti=None, **_):
        n = ti.xcom_pull(task_ids="compta_registres")
        return "processa_dades" if n >= 10 else "alerta_volum_baix"

    branca = BranchPythonOperator(
        task_id="decideix_branca",
        python_callable=decideix_branca,
    )

    @task(task_id="processa_dades")
    def processa_dades(ds=None):
        import pandas as pd
        ruta = f"/opt/airflow/dades/vendes_{ds}.csv"
        df = pd.read_csv(ruta)
        df_net = df.dropna(subset=["id_venda", "import"])
        log.info("Registres nets: %d de %d", len(df_net), len(df))
        return len(df_net)

    alerta_volum_baix = EmptyOperator(task_id="alerta_volum_baix")

    fi_pipeline = EmptyOperator(
        task_id="fi_pipeline",
        trigger_rule="none_failed_min_one_success",
    )

    n_registres = compta_registres()
    espera_csv >> n_registres >> branca
    branca >> processa_dades() >> fi_pipeline
    branca >> alerta_volum_baix >> fi_pipeline

Tasques a fer:

  1. Genera dos fitxers de prova: vendes_2026-03-20.csv amb 15 files (branca normal) i un altre dia amb només 3 files (branca d'alerta). Adapta start_date/execucions per provar ambdós camins.
  2. Comprova a la Graph view que les tasques no executades de la branca alternativa apareixen com a skipped (no com a error).
  3. Provoca deliberadament un error (per exemple, esborrant temporalment el fitxer abans del timeout del sensor, o llançant una excepció a processa_dades) i comprova als logs que es registren els 3 reintents (retries=3) abans que la tasca acabi en failed.
  4. Documenta a l'informe què mostren els logs del on_failure_callback quan una tasca falla definitivament.

Part 4 — Càrrega incremental idempotent (2-2,5 hores)

Amplia el pipeline anterior (o crea un nou DAG dag_carrega_incremental.py) per carregar les dades validades a una base de dades amb upsert idempotent basat en un watermark de timestamp:

from airflow.sdk import DAG, task
from airflow.models import Variable
from datetime import datetime, timedelta
import logging

log = logging.getLogger(__name__)

with DAG(
    dag_id="dag_carrega_incremental",
    description="Carrega incremental idempotent amb watermark",
    schedule="@daily",
    start_date=datetime(2026, 3, 1),
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=3)},
    tags=["m5074", "pr507405", "incremental"],
) as dag:

    @task(task_id="carrega_incremental")
    def carrega_incremental(ds=None):
        import sqlite3
        import pandas as pd

        ruta_db = "/opt/airflow/dades/dw_vendes.db"
        ruta_csv = f"/opt/airflow/dades/vendes_{ds}.csv"

        conn = sqlite3.connect(ruta_db)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS vendes (
                id_venda INTEGER PRIMARY KEY,
                data TEXT,
                id_client INTEGER,
                import REAL,
                carregat_el TEXT
            )
        """)

        # Watermark: darrera data carregada amb exit (guardat com a Variable d'Airflow)
        last_run = Variable.get("watermark_vendes", default_var="1970-01-01")
        log.info("Watermark actual: %s", last_run)

        df = pd.read_csv(ruta_csv)
        df_nous = df[df["data"] > last_run]

        if df_nous.empty:
            log.info("Cap registre nou des del watermark %s.", last_run)
            conn.close()
            return 0

        cur = conn.cursor()
        for _, fila in df_nous.iterrows():
            # Upsert idempotent: si id_venda existeix, actualitza; si no, insereix
            cur.execute("""
                INSERT INTO vendes (id_venda, data, id_client, import, carregat_el)
                VALUES (?, ?, ?, ?, datetime('now'))
                ON CONFLICT(id_venda) DO UPDATE SET
                    import = excluded.import,
                    carregat_el = excluded.carregat_el
            """, (fila["id_venda"], fila["data"], fila["id_client"], fila["import"]))
        conn.commit()

        nou_watermark = df_nous["data"].max()
        Variable.set("watermark_vendes", nou_watermark)
        log.info("Carregats %d registres. Nou watermark: %s", len(df_nous), nou_watermark)

        conn.close()
        return len(df_nous)

    carrega_incremental()

Tasques a fer:

  1. Executa el DAG dues vegades seguides sobre el mateix fitxer CSV i comprova (amb una consulta SELECT COUNT(*) FROM vendes) que no hi ha duplicats: aquesta és la prova d'idempotència.
  2. Modifica l'import d'un registre existent al CSV, torna a executar el DAG i comprova que el registre s'ha actualitzat, no duplicat.
  3. Documenta a l'informe quin mecanisme garanteix la idempotència en aquest DAG (watermark + ON CONFLICT DO UPDATE) i què passaria si s'eliminés el watermark.

Part 5 — Checks de qualitat de dades integrats al DAG (1,5-2 hores)

Afegeix una tasca de validació de qualitat abans de la càrrega, fent servir Great Expectations o Pandera (ja inclòs a etl_conceptes.md i qualitat_dades.md):

from airflow.sdk import task
import pandera as pa
from pandera import Column, DataFrameSchema, Check

esquema_vendes = DataFrameSchema(
    columns={
        "id_venda": Column(int, checks=Check.greater_than(0), unique=True, nullable=False),
        "data": Column(str, nullable=False),
        "id_client": Column(int, checks=Check.greater_than(0), nullable=False),
        "import": Column(float, checks=Check.greater_than(0), nullable=False),
    },
    coerce=True,
    strict=False,
)

@task(task_id="valida_qualitat")
def valida_qualitat(ds=None):
    import pandas as pd

    ruta = f"/opt/airflow/dades/vendes_{ds}.csv"
    df = pd.read_csv(ruta)

    try:
        df_valid = esquema_vendes.validate(df, lazy=True)
        print(f"Validacio correcta: {len(df_valid)} registres.")
        return len(df_valid)
    except pa.errors.SchemaErrors as e:
        print("Errors de qualitat trobats:")
        print(e.failure_cases)
        ruta_quarantena = f"/opt/airflow/dades/quarantena_{ds}.csv"
        e.failure_cases.to_csv(ruta_quarantena, index=False)
        raise ValueError(
            f"Validacio fallida: {len(e.failure_cases)} errors. "
            f"Detall a {ruta_quarantena}"
        ) from e

Tasques a fer:

  1. Insereix aquesta tasca valida_qualitat entre el sensor i la càrrega del DAG complex de la Part 3 (espera_csv >> valida_qualitat() >> branca).
  2. Crea un CSV de prova amb almenys 2 errors deliberats (un import negatiu, un id_venda duplicat) i comprova que la tasca falla i genera el fitxer de quarantena.
  3. Si optes per Great Expectations en lloc de Pandera, adapta l'exemple de Qualitat de dades (secció "Validació amb Great Expectations") substituint l'esquema Pandera per una Expectation Suite equivalent.
  4. Documenta a l'informe les dimensions de qualitat comprovades (completitud, exactitud, unicitat) i com es correspon cada Check amb una dimensió.

Lliurament

Puja al Campus Virtual un fitxer comprimit PR507405_cognom_nom.zip amb:

Contingut Detall
dags/ Tots els fitxers .py dels DAGs desenvolupats (Parts 2-5), amb comentaris explicant les decisions de disseny
captures/ Captures de pantalla de la UI d'Airflow: llista de DAGs, Graph view de cada DAG amb execució correcta, logs d'un error gestionat, Variable watermark_vendes a la UI
dades/ CSV de prova utilitzats (inclòs el de quarantena generat a la Part 5)
informe_cognom_nom.pdf Informe breu (màxim 5 pàgines) que respongui les preguntes de reflexió i documenti les decisions tècniques de cada part

Estructura recomanada de l'informe:

  1. Captura de pantalla de docker compose ps amb tots els serveis en marxa.
  2. Resum de cada DAG (què fa, quines tasques té, com s'ha provat).
  3. Evidència de la idempotència (resultat de les dues execucions de la Part 4).
  4. Resultats de la validació de qualitat (registres vàlids vs. quarantena).
  5. Respostes a les preguntes de reflexió final.

Consulta la Rúbrica PR507405 per als criteris detallats d'avaluació.

Data límit de lliurament: consulta el calendari del Campus Virtual (coincideix amb la sessió S25).

Prova escrita del Bloc 5

La sessió S25 inclou també una prova escrita sobre els continguts teòrics del Bloc 5 (ETL vs ELT, arquitectura d'Airflow, idempotència, dimensions de qualitat de dades). Repassa ETL vs ELT, Apache Airflow i Qualitat de dades abans de la sessió.


Preguntes de reflexió final

Un cop completada la pràctica, reflexiona breument sobre:

  1. Per què Airflow gestiona millor aquest pipeline que un script Python llançat per cron? Posa un exemple concret extret de la teva pròpia experiència fent la pràctica (un error, un reintent, una dependència).
  2. En quin punt del DAG complex (Part 3) decideixes posar el BranchPythonOperator? Per què aquest disseny és millor que fer la comprovació de volum dins la mateixa tasca de processament?
  3. Explica amb les teves paraules per què el patró ON CONFLICT DO UPDATE combinat amb un watermark garanteix la idempotència del pipeline de la Part 4. Què passaria si el watermark es guardés després d'un error de càrrega en lloc d'abans?
  4. Quina dimensió de qualitat de dades (completitud, exactitud, consistència, unicitat, temporalitat, validesa) et sembla més difícil de validar automàticament i per què?

Inclou les respostes a l'informe (3-5 línies per pregunta).


Pràctica PR507405 | Mòdul M5074 Sistemes de Big Data | Institut Sa Palomera (Blanes) | Curs CEIABD 2026-2027