Salta el contingut

ETL vs ELT: conceptes i disseny de pipelines

Introducció

Qualsevol sistema de Big Data necessita moure dades d'un lloc a un altre: des de bases de dades operacionals, APIs, fitxers o fluxos d'events, fins a un Data Warehouse o Data Lake on es puguin analitzar. Els paradigmes ETL i ELT descriuen com s'organitza aquest moviment i on s'apliquen les transformacions. La tria entre l'un i l'altre depèn de l'arquitectura de dades, el volum, la latència requerida i les habilitats de l'equip.


ETL clàssic: Extract, Transform, Load

L'ETL neix als anys noranta, quan els Data Warehouses eren sistemes cars i amb capacitat de còmput limitada. La idea central és aplicar totes les transformacions abans de carregar les dades al sistema de destí, de manera que el DW només rep dades ja netes, validades i en el format final.

flowchart LR
    A[Sistema\norigen\nMySQL / API] -->|1. Extracció| B[Zona de\nestadi\nSTAGING]
    B -->|2. Transformació\npre-càrrega| C[Motor ETL\nPython / Spark]
    C -->|3. Càrrega\ndades netes| D[Data\nWarehouse]

Seqüència de l'ETL:

  1. Extract: es connecta al sistema origen (BD operacional, API REST, fitxers CSV/JSON) i s'extreuen les dades en brut. L'extracció ha de ser poc invasiva: es llegeix, no s'escriu.
  2. Transform: les dades es netegen, s'estandarditzen, es validen i es modelitzen. Aquesta fase inclou operacions com: normalitzar dates a ISO 8601, eliminar duplicats, fer joins entre múltiples fonts, calcular mètriques derivades o aplicar regles de negoci.
  3. Load: les dades transformades s'insereixen al DW en el format final (taules de fets, dimensions).

Casos d'ús on ETL té sentit:

  • Sistemes de destí amb capacitat de còmput limitada (DW on-premises).
  • Casos on les dades originals no s'han d'emmagatzemar per motius legals (RGPD: s'anonimitzen abans de carregar).
  • Fluxos amb transformacions complexes que requereixen biblioteques Python específiques (NLP, validació de dades geoespacials).
  • Volums moderats on la transformació en un motor extern (Spark, pandas) és viable.

Limitacions de rendiment del ETL:

  • La transformació es produeix en un servidor intermedi que pot convertir-se en coll d'ampolla.
  • Cal mantenir una capa de staging separada.
  • Els canvis de schema al sistema origen poden trencar el pipeline de transformació.
  • Difícilment escalable per a volums de terabytes.

ELT modern: Extract, Load, Transform

Amb l'aparició dels Data Warehouses cloud (BigQuery, Snowflake, Amazon Redshift, Azure Synapse), la capacitat de còmput al costat de les dades s'ha disparat. Ara té sentit carregar les dades en brut primer i transformar-les directament dins el DW usant SQL.

flowchart LR
    A[Sistema\norigen] -->|1. Extracció| B[Zona de\nestadi\nraw / bronze]
    B -->|2. Càrrega\nen brut| C[Data\nWarehouse\nCloud]
    C -->|3. Transformació\nSQL dins el DW| D[Capa\ntransformada\nsilver / gold]

Per qué té sentit l'ELT quan el DW és potent:

  • BigQuery, Snowflake i Redshift processen petabytes amb SQL distribuït. Les transformacions SQL dins el DW s'executen en paral·lel sobre clústers de desenes de nodes.
  • El cost de còmput del cloud és elàstic: pagues per la consulta, no per la infraestructura mantinguda.
  • Les dades originals queden emmagatzemades (zona raw), cosa que permet re-transformar-les si canvien les regles de negoci.
  • Els equips de dades dominan SQL millor que Spark o Python per a transformacions analítiques.

Exemple pràctic: carregar CSV brut a Redshift i transformar amb SQL

-- Pas 1: carregar el CSV brut a la taula staging (des d'S3)
COPY staging.vendes_raw
FROM 's3://data-bucket/vendes/2026/06/'
IAM_ROLE 'arn:aws:iam::123456789:role/RedshiftS3Role'
FORMAT AS CSV
IGNOREHEADER 1
DATEFORMAT 'YYYY-MM-DD';

-- Pas 2: transformar i inserir a la taula final (SQL dins Redshift)
INSERT INTO dw.fets_vendes (id_venda, data_venda, id_producte, quantitat, import_net)
SELECT
    venda_id::INTEGER,
    data::DATE,
    producte_id::INTEGER,
    CAST(quantitat AS INTEGER),
    ROUND(preu_unitari::DECIMAL * quantitat::INTEGER, 2)
FROM staging.vendes_raw
WHERE preu_unitari > 0
  AND quantitat > 0
  AND venda_id IS NOT NULL;

Comparativa pràctica ETL vs ELT

Dimensió ETL clàssic ELT modern
On es transforma Motor extern (Spark, Python) Dins el DW (SQL)
Volum òptim Fins a centenars de GB Terabytes i més
Latència Pot ser alta (transformació pesada) Baixa (càrrega ràpida, transformació diferida)
Cost de còmput Servidor ETL dedicat Elàstic (pay-per-query)
Habilitats necessàries Python/Spark + SQL Principalment SQL
Dades originals Sovint no es conserven Sempre conservades (zona raw)
Flexibilitat Cal modificar pipeline si canvien regles Re-transforma amb nova SQL sense re-extreure
Eines habituals Apache Spark, pandas, Airbyte dbt, Dataform, SQL nativa al DW

Patrons de càrrega

Full Load (càrrega completa)

S'esborra tota la taula de destí i es torna a carregar des de zero. El patró més simple.

# Full load: truncar i reinsertar
def full_load(conn_origen, conn_dest, taula):
    # Llegir tot l'origen
    df = pd.read_sql(f"SELECT * FROM {taula}", conn_origen)

    # Esborrar destí i reinsertar
    with conn_dest.cursor() as cur:
        cur.execute(f"TRUNCATE TABLE {taula}")
    df.to_sql(taula, conn_dest, if_exists='append', index=False)
    print(f"Full load completat: {len(df):,} registres carregats.")

Quan usar-lo: taules de dimensions petites (menys de 100.000 registres), dades de referència que canvien rarament.

Limitació: per a taules grans (milions de registres), una càrrega completa diària pot trigar hores i sobrecarregar el sistema origen.

Incremental (basat en timestamp)

Només s'extreuen els registres modificats o creats des de l'última execució del pipeline. Requereix un camp updated_at o created_at a la taula origen.

import datetime

def incremental_load(conn_origen, conn_dest, taula, last_run: datetime.datetime):
    # Extreiem només els registres modificats des del darrer run
    query = f"""
        SELECT *
        FROM {taula}
        WHERE updated_at > '{last_run.isoformat()}'
        ORDER BY updated_at ASC
    """
    df = pd.read_sql(query, conn_origen)

    if df.empty:
        print(f"Cap registre nou des de {last_run}.")
        return

    # Upsert: inserir nous, actualitzar existents
    upsert_to_dest(df, conn_dest, taula)

    # Guardar el nou timestamp de darrera execució
    nou_last_run = df['updated_at'].max()
    save_checkpoint(taula, nou_last_run)
    print(f"Incremental load: {len(df):,} registres processats fins {nou_last_run}.")

Limitació: si un registre s'elimina a l'origen, la càrrega incremental no ho detecta (els deletes no modifiquen updated_at).

CDC: Change Data Capture

CDC captura tots els canvis (INSERT, UPDATE, DELETE) a nivell del log de transaccions de la base de dades (BinLog a MySQL, WAL a PostgreSQL). L'eina de referència és Debezium, que llegeix el BinLog i publica els canvis a Kafka.

flowchart LR
    A[MySQL\nBinLog] -->|Debezium\nConnector| B[Apache\nKafka]
    B -->|Kafka Connect\nSink| C[Data\nWarehouse]
    B -->|Kafka Connect\nSink| D[Data\nLake\nS3]

CDC permet capturar els deletes, cosa que la càrrega incremental per timestamp no pot fer. La latència pot ser de segons a pocs minuts, en lloc d'hores.

Upsert: INSERT ON CONFLICT / MERGE

L'upsert combina la inserció amb l'actualització en una sola operació atòmica: si el registre ja existeix (per clau primària), s'actualitza; si no existeix, s'insereix. Evita duplicats sense necessitat de truncar la taula.

-- PostgreSQL: INSERT ON CONFLICT DO UPDATE
INSERT INTO dw.clients (id_client, nom, email, updated_at)
VALUES (%s, %s, %s, NOW())
ON CONFLICT (id_client)
DO UPDATE SET
    nom = EXCLUDED.nom,
    email = EXCLUDED.email,
    updated_at = EXCLUDED.updated_at;
-- SQL estàndard: MERGE (suportat per Snowflake, BigQuery, Redshift, SQL Server)
MERGE INTO dw.clients AS dest
USING staging.clients_nous AS src
ON dest.id_client = src.id_client
WHEN MATCHED THEN
    UPDATE SET nom = src.nom, email = src.email, updated_at = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN
    INSERT (id_client, nom, email, updated_at)
    VALUES (src.id_client, src.nom, src.email, CURRENT_TIMESTAMP);

Idempotència: per qué és fonamental

Un pipeline és idempotent quan pot executar-se múltiples vegades sobre les mateixes dades i produir sempre el mateix resultat, sense duplicar dades ni corrompre l'estat del sistema.

La idempotència és essencial perquè els pipelines fallen: una connexió que cau a mitja inserció, un servidor que es reinicia, un timeout que tallada la transacció. Si el pipeline no és idempotent, una re-execució duplica les dades.

Patrons per garantir la idempotència:

  1. Truncate + insert atòmic: dins una transacció, esborrar la partició de temps del dia i reinsertar. Si falla, el rollback deixa la taula intacta.
  2. Upsert per clau primària: l'INSERT ON CONFLICT garanteix que executar-se dues vegades no duplica.
  3. Checkpoints: guardar l'estat de l'última execució exitosa. Si el pipeline falla, reprèn des del checkpoint.
  4. Taules de staging: carregar a una taula temporal i, un cop validada, fer un SWAP atòmic amb la taula de producció.
def load_idempotent(df, conn, taula, data_particiona: str):
    """Càrrega idempotent per partició de data."""
    with conn.begin() as trans:
        try:
            # Eliminar la partició d'aquesta data
            conn.execute(
                f"DELETE FROM {taula} WHERE data_particio = '{data_particiona}'"
            )
            # Inserir les dades noves
            df.to_sql(taula, conn, if_exists='append', index=False)
            trans.commit()
            print(f"Càrrega idempotent completada per {data_particiona}.")
        except Exception as e:
            trans.rollback()
            raise RuntimeError(f"Error en la càrrega: {e}") from e

Qualitat de dades en un pipeline

Un pipeline que carrega dades incorrectes és pitjor que cap pipeline: contamina el Data Warehouse amb dades errònies que alimenten decisions de negoci incorrectes. La validació de qualitat ha de ser un pas explícit del pipeline, no una esperança.

Etapes de validació recomanades:

  1. Validació d'esquema: el fitxer CSV té les columnes esperades? Els tipus de dades coincideixen?
  2. Validació de negoci: l'edat és positiva? El preu és major que zero? El codi postal és vàlid?
  3. Validació de volum: el nombre de registres és raonable (ni zero ni absurdament alt)?
  4. Validació de duplicats: hi ha claus primàries duplicades?
import pandas as pd

def validar_dataset(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Separa registres vàlids de registres invàlids."""
    mascara_valida = (
        df['preu'].notna() & (df['preu'] > 0) &
        df['quantitat'].notna() & (df['quantitat'] > 0) &
        df['email'].str.contains(r'^[^@]+@[^@]+\.[^@]+$', na=False)
    )
    valids = df[mascara_valida].copy()
    invalids = df[~mascara_valida].copy()

    print(f"Registres valids: {len(valids):,}")
    print(f"Registres invalids (quarantena): {len(invalids):,}")

    return valids, invalids

Els registres invàlids s'envien a una dead-letter queue o taula de quarantena per a revisió manual, en lloc de bloquejar tot el pipeline.


Orquestació: per qué cal un orquestrador

Un cron job pot executar un script Python a les 2:00 AM. Però no pot:

  • Reintentar automàticament si el script falla per un error de xarxa puntual.
  • Gestionar dependències entre pipelines: el pipeline de vendes no ha de córrer fins que el de productes hagi acabat.
  • Monitorar el temps d'execució i alertar si un pipeline tarda el doble de l'habitual.
  • Pausar el pipeline si la validació de qualitat falla.
  • Permetre re-executar un pipeline per a una data concreta del passat (backfill).

Un orquestrador de workflows com Apache Airflow, Prefect o Dagster resol tots aquests problemes. Els pipelines es defineixen com a grafs dirigits acíclics (DAG) on cada node és una tasca i les arestes representen dependències.

flowchart TD
    A[extreu_dades_mysql] --> B[valida_schema]
    B --> C[neteja_i_transforma]
    C --> D[carrega_a_redshift]
    D --> E[actualitza_dashboard]
    B -->|validació fallida| F[envia_alerta_slack]

El cron és adequat per a tasques simples i aïllades. Quan el pipeline té múltiples passos amb dependències, la gestió d'errors és crítica i cal auditoria, cal un orquestrador.


AC5074/05/01 — Miniactivitat

Cas d'ús: Una empresa d'e-commerce té una base de dades operacional MySQL amb les taules comandes, productes, clients i categories. L'equip analític vol construir un Data Warehouse a Amazon Redshift per analitzar les vendes per producte, categoria i regió geogràfica.

Contesta per escrit (entre 300 i 500 paraules):

  1. ETL o ELT? Justifica quina estratègia triaries per a aquest cas d'ús. Considera: quin és el volum esperable (e-commerce mitja: 50.000 comandes/dia), quines habilitats té l'equip (strong en SQL), si cal conservar les dades originals, i el cost de la solució.

  2. Disseny del pipeline: Descriu les etapes del pipeline (com a mínim 4 passos). Per a cada etapa, indica: quin component l'executa, quines dades llegeix i quines escriu.

  3. Patró de càrrega: Per a la taula comandes (on arriben comandes noves però les existents mai s'eliminen), quin patró de càrrega (full load, incremental, CDC, upsert) usaries? Per a la taula productes (que canvia rarament però pot tenir actualitzacions de preu i eliminar productes discontinuats), quin patró triaries? Justifica-ho.

  4. Idempotència: Com garantiries que el pipeline de comandes és idempotent? Descriu el mecanisme concret.


Mòdul M5074 Sistemes de Big Data | Institut Sa Palomera (Blanes) | Curs CEIABD 2026-2027