Salta el contingut

Gestió i Emmagatzematge Distribuït de Dades

Introducció

L'emmagatzematge de dades ha evolucionat més en els darrers cinc anys que en les dues dècades anteriors. El 2020, la majoria d'empreses guardaven les seves dades en HDFS o en S3 en format CSV. El 2025, els estàndards de facto del sector —Delta Lake, Apache Iceberg i Apache Hudi— han transformat el Data Lake en un sistema transaccional de primera classe, capaç de garanties ACID, auditoria de canvis i recuperació de versions anteriors.

En aquest tema aprofundim en tots els aspectes de l'emmagatzematge distribuït: des de les entranyes del HDFS (NameNode HA, Federation, quotas) fins al format Delta Lake que gestiona el lakehouse de Databricks, passant per les estratègies d'ingestió de dades amb Debezium, Kafka Connect i Apache NiFi.


1. HDFS en profunditat

1.1 Arquitectura bàsica recordatori

HDFS (Hadoop Distributed File System) divideix els fitxers en blocs (per defecte 128 MB) i els reparteix entre els DataNodes. El NameNode manté el mapa complet del sistema de fitxers: quins blocs componen cada fitxer i en quin DataNode resideix cada còpia. Per defecte, cada bloc té un factor de replicació de 3: una còpia "primària" i dues de seguretat.

1.2 NameNode en Alta Disponibilitat (HA)

El punt únic de fallada del HDFS original era el NameNode: si queia, tot el clúster quedava inaccessible. L'arquitectura NameNode HA resol aquest problema:

graph TD
    ZK[Apache ZooKeeper\nQuorum 3 nodes] --> ACTIVE
    ZK --> STANDBY

    subgraph NameNodes
        ACTIVE[NameNode\nActiu]
        STANDBY[NameNode\nStandby]
    end

    subgraph JournalNodes
        JN1[JournalNode 1]
        JN2[JournalNode 2]
        JN3[JournalNode 3]
    end

    ACTIVE --> JN1
    ACTIVE --> JN2
    ACTIVE --> JN3
    STANDBY --> JN1
    STANDBY --> JN2
    STANDBY --> JN3

    DN1[DataNode 1] --> ACTIVE
    DN2[DataNode 2] --> ACTIVE
    DN3[DataNode 3] --> ACTIVE
    DN1 --> STANDBY
    DN2 --> STANDBY
    DN3 --> STANDBY

Mecanisme de failover automàtic:

  1. El ZooKeeper Failover Controller (ZKFC) monitora constantment la salut del NameNode actiu.
  2. Si el NameNode actiu cau, el ZKFC detecta la fallada i ordena al NameNode Standby que prengui el control.
  3. Els JournalNodes (quorum imparell, mínim 3) mantenen el log de canvis (edit log) compartit entre actiu i standby, garantint que el standby estigui sempre al dia.
  4. El failover es completa en menys de 30 segons (configurable).

1.3 HDFS Federation

Quan el clúster creix molt (milers de milions de fitxers), un únic NameNode pot convertir-se en un coll d'ampolla de memòria: cada fitxer ocupa uns 150 bytes de RAM al NameNode. HDFS Federation permet múltiples NameNodes independents, cadascun gestionant un namespace diferent:

# Exemple de configuració Federation (hdfs-site.xml)
# NameNode 1 gestiona /user i /tmp
# NameNode 2 gestiona /data/warehouse i /data/raw

Cada NameNode manté el seu propi namespace i block pool, però tots comparteixen els mateixos DataNodes.

1.4 Operacions HDFS essencials

# Llistar fitxers
hdfs dfs -ls /data/raw/

# Crear directori
hdfs dfs -mkdir -p /data/joan-garcia/vendes/2024

# Copiar fitxer local a HDFS
hdfs dfs -put vendes_2024.csv /data/joan-garcia/vendes/2024/

# Llegir els primers 100 bytes d'un fitxer
hdfs dfs -cat /data/joan-garcia/vendes/2024/vendes_2024.csv | head -20

# Comprovar integritat de blocs
hdfs fsck /data/joan-garcia/ -files -blocks -locations

# Verificar el safe mode
hdfs dfsadmin -safemode get

# Mostrar l'ús d'espai
hdfs dfs -du -s -h /data/joan-garcia/

# Establir quota de fitxers (màxim 10.000 fitxers al directori)
hdfs dfsadmin -setQuota 10000 /data/joan-garcia/

# Establir quota d'espai (màxim 100 GB)
hdfs dfsadmin -setSpaceQuota 100g /data/joan-garcia/

# Balancejar el clúster si hi ha desbalanceig
hdfs balancer -threshold 10

1.5 Safe Mode

El NameNode arrenca en Safe Mode: un estat on el sistema de fitxers és de lectura i no permet modificacions. En Safe Mode, el NameNode recopila informació de tots els DataNodes per reconstruir el mapa de blocs. El Safe Mode s'abandona quan:

  • Un percentatge mínim de blocs (per defecte 99,9%) han estat reportats com a vàlids pels DataNodes.
  • Ha transcorregut un temps mínim d'estabilització (30 segons per defecte).
# Forçar sortida de safe mode (útil en entorns de test)
hdfs dfsadmin -safemode leave

# Forçar entrada en safe mode (per a manteniment)
hdfs dfsadmin -safemode enter

Safe Mode en producció

Mai forçis la sortida de safe mode en producció fins que no entenguis per qué el NameNode hi ha entrat. Pot indicar que molts blocs estan corruptes o no reportats, i forçar la sortida podria provocar pèrdua de dades.


2. Formats d'emmagatzematge

2.1 Comparativa de formats

Format Orientació Compressió Evolució d'esquema ACID Millor per a
CSV Fila No nativa No No Intercanvi, debugging, fonts simples
JSON / JSON Lines Fila No nativa Flexible No APIs, dades semiestructurades
Avro Fila Snappy, Gzip Sí (Schema Registry) No Kafka, ingestió streaming
Parquet Columna Snappy, Zstd, Gzip Limitada No Analítica, Spark, Hive, Presto
ORC Columna Zlib, Snappy Limitada No Hive optimitzat
Delta Lake Columna (Parquet) Snappy, Zstd Sí, forçada Lakehouse, ETL, ML
Apache Iceberg Columna (Parquet/ORC) Snappy, Zstd Multi-engine, cloud
Apache Hudi Columna (Parquet) Snappy Streaming, upserts

2.2 Parquet: el format columnar per excel·lència

Parquet organitza les dades per columnes en lloc de per files. Això és revolucionari per a l'analítica:

Format CSV (per files):
ID,Nom,Preu,Quantitat
1,Portàtil,699,2
2,Monitor,299,5
3,Teclat,89,10

Format Parquet (per columnes, lògicament):
IDs:       [1, 2, 3]
Noms:      [Portàtil, Monitor, Teclat]
Preus:     [699, 299, 89]
Quantitats:[2, 5, 10]

Si fas SELECT SUM(Preu) FROM productes, Parquet llegeix únicament la columna Preu, ignorant les altres. Un CSV llegeix totes les columnes. En un dataset de 100 columnes on n'uses 3, Parquet pot ser 30x més ràpid.

# Benchmark: CSV vs Parquet amb PySpark
import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("BenchmarkFormats").getOrCreate()

# Escriure el dataset en ambdós formats
df.write.mode("overwrite").csv("/data/test_csv")
df.write.mode("overwrite").parquet("/data/test_parquet")

# Llegir i agregar: CSV
t0 = time.time()
spark.read.csv("/data/test_csv", header=True, inferSchema=True) \
    .agg({"Preu": "sum"}).collect()
t_csv = time.time() - t0

# Llegir i agregar: Parquet
t0 = time.time()
spark.read.parquet("/data/test_parquet") \
    .agg({"Preu": "sum"}).collect()
t_parquet = time.time() - t0

print(f"CSV:     {t_csv:.2f}s")
print(f"Parquet: {t_parquet:.2f}s")
print(f"Speedup: {t_csv/t_parquet:.1f}x")

2.3 Avro i Schema Registry

Avro és el format preferit per a Kafka perquè: 1. És orientat a files (ideal per a missatges individuals) 2. Inclou l'esquema dins del fitxer (autodescriptiu) 3. Permet evolució d'esquema compatible

El Schema Registry (Confluent) emmagatzema i versiona els esquemes Avro, garantint que productors i consumidors de Kafka sempre parlin el mateix "idioma":

# Productor Kafka amb Avro i Schema Registry
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

schema_str = """
{
    "type": "record",
    "name": "Venda",
    "fields": [
        {"name": "id_venda", "type": "string"},
        {"name": "producte", "type": "string"},
        {"name": "import", "type": "double"},
        {"name": "timestamp", "type": "long"}
    ]
}
"""

avro_producer = AvroProducer({
    'bootstrap.servers': 'kafka:9092',
    'schema.registry.url': 'http://schema-registry:8081'
}, default_value_schema=avro.loads(schema_str))

Miniactivitat

Busca a Internet el paper de Google sobre Parquet (o el document de Cloudera). Explica en 5 línies per qué el format columnar és millor per a consultes analítiques (SELECT d'algunes columnes) i pitjor per a transaccions (UPDATE d'una fila).


3. Delta Lake: el format lakehouse líder 2025

3.1 Qué és Delta Lake

Delta Lake és una capa de transaccions ACID sobre el sistema de fitxers (S3, HDFS, ADLS). Sota el capó, les taules Delta Lake són fitxers Parquet més un directori _delta_log que conté el historial de transaccions en format JSON.

s3://datalake/vendes/
├── _delta_log/
│   ├── 00000000000000000000.json   ← Transacció 0: CREATE TABLE
│   ├── 00000000000000000001.json   ← Transacció 1: INSERT 1M files
│   ├── 00000000000000000002.json   ← Transacció 2: DELETE fraudulentes
│   └── 00000000000000000003.json   ← Transacció 3: OPTIMIZE
├── part-00000-abc123.snappy.parquet
├── part-00001-def456.snappy.parquet
└── ...

3.2 Transaccions ACID sobre object storage

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("DeltaLakeDemo") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Crear una taula Delta Lake
df_vendes = spark.createDataFrame([
    ("V001", "2024-01-15", "Portàtil", 699.99, 2),
    ("V002", "2024-01-16", "Monitor", 299.99, 1),
    ("V003", "2024-01-17", "Teclat", 89.99, 5),
], ["id_venda", "data", "producte", "preu", "quantitat"])

df_vendes.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/data/delta/vendes")

print("Taula Delta Lake creada correctament.")

# Llegir la taula
df_delta = spark.read.format("delta").load("/data/delta/vendes")
df_delta.show()

3.3 Time Travel: auditoria i recuperació

# Llegir la versió 0 de la taula (estat inicial)
df_v0 = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("/data/delta/vendes")

# Llegir l'estat d'ahir (per timestamp)
df_ahir = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-14 00:00:00") \
    .load("/data/delta/vendes")

# Historial de transaccions
deltaTable = DeltaTable.forPath(spark, "/data/delta/vendes")
deltaTable.history().show(truncate=False)

# Restaurar a la versió 1
deltaTable.restoreToVersion(1)
print("Taula restaurada a la versió 1.")

# Restaurar a un timestamp concret
deltaTable.restoreToTimestamp("2024-01-15 12:00:00")

3.4 Operació MERGE (Upsert)

El MERGE és la operació més potent de Delta Lake: actualitza, insereix o elimina files en una sola operació atòmica. És essencial per a pipelines de CDC.

# Upsert: actualitzar vendes existents o inserir noves
df_nous_registres = spark.createDataFrame([
    ("V001", "2024-01-15", "Portàtil", 749.99, 2),  # Actualitzar preu
    ("V004", "2024-01-18", "Ratolí", 49.99, 3),      # Nou registre
], ["id_venda", "data", "producte", "preu", "quantitat"])

deltaTable = DeltaTable.forPath(spark, "/data/delta/vendes")

deltaTable.alias("vendes_existents") \
    .merge(
        df_nous_registres.alias("nous"),
        "vendes_existents.id_venda = nous.id_venda"
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

print("MERGE completat. Verificació:")
spark.read.format("delta").load("/data/delta/vendes").show()

3.5 OPTIMIZE i Z-ordering

Amb el temps, una taula Delta Lake s'omple de fitxers Parquet petits (el problema dels "small files"), especialment si es fan molts inserts incremental. OPTIMIZE consolida els fitxers petits en fitxers més grans. Z-ORDER BY reordena les dades per maximitzar la poda (data skipping) per als camps de filtre habituals.

# Optimitzar i Z-order per zona de recollida (camp habitual en filtres)
spark.sql("""
    OPTIMIZE delta.`/data/delta/vendes`
    ZORDER BY (data, producte)
""")

# Veure estadístiques de la taula
spark.sql("DESCRIBE DETAIL delta.`/data/delta/vendes`").show(truncate=False)

# Netejar fitxers antics (>7 dies) per a estalvi d'espai
deltaTable.vacuum(retentionHours=168)  # 7 dies = 168 hores

VACUUM i Time Travel

VACUUM elimina fitxers Parquet que ja no formen part de la versió actual. Si fas un VACUUM(0), perdràs la capacitat de Time Travel. Per defecte, Delta Lake reté 7 dies d'historial. No redueixis aquest valor sense consultar amb l'equip.

3.6 Schema Evolution i Schema Enforcement

# Schema Enforcement: per defecte, Delta Lake rebutja esquemes que no coincideixen
try:
    df_incorrecte = spark.createDataFrame([
        ("V005", "2024-01-19", "Auriculars", 79.99, 1, "vermell"),
    ], ["id_venda", "data", "producte", "preu", "quantitat", "color"])

    df_incorrecte.write.format("delta").mode("append") \
        .save("/data/delta/vendes")
except Exception as e:
    print(f"Error d'esquema (esperat): {e}")

# Schema Evolution: permetre afegir noves columnes
df_incorrecte.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/data/delta/vendes")

print("Esquema evolucionat correctament.")
spark.read.format("delta").load("/data/delta/vendes").printSchema()

3.7 Change Data Feed

El Change Data Feed (CDF) permet capturar els canvis (inserts, updates, deletes) d'una taula Delta Lake i propagar-los a altres sistemes, de forma similar a Debezium però dins del lakehouse.

# Activar CDF a la taula
spark.sql("""
    ALTER TABLE delta.`/data/delta/vendes`
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Llegir els canvis des de la versió 2
df_canvis = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 2) \
    .load("/data/delta/vendes")

# Filtrar per tipus de canvi
df_canvis.filter("_change_type = 'update_postimage'").show()
df_canvis.filter("_change_type = 'delete'").show()

4. Apache Iceberg

4.1 Qué és i per qué existeix

Apache Iceberg és un format de taula obert creat per Netflix i donat a la Apache Software Foundation. La seva filosofia és diferent de Delta Lake: mentre Delta Lake va ser dissenyat per Databricks per als seus casos d'ús, Iceberg va ser dissenyat com a format neutral que funciona amb Spark, Flink, Trino, Presto, Hive i qualsevol engine compatible.

4.2 Hidden Partitioning i Partition Evolution

El particionament tradicional de Hive o Delta Lake exposa la lògica de particionament a l'usuari: cal escriure WHERE year=2024 AND month=1 per beneficiar-se del partition pruning. Iceberg implementa Hidden Partitioning: l'engine dedueix automàticament les particions sense que l'usuari hagi d'escriure predicats especials.

# Iceberg amb Spark: hidden partitioning per data
spark.sql("""
    CREATE TABLE catalog.vendes_iceberg (
        id_venda STRING,
        data TIMESTAMP,
        producte STRING,
        preu DOUBLE
    ) USING ICEBERG
    PARTITIONED BY (days(data))
""")
-- La particio 'days(data)' s'aplica automaticament sense canviar les queries

Partition Evolution: Iceberg permet canviar l'esquema de particionament sense reescriure les dades existents. Pots canviar de days(data) a months(data) i les dades antigues seguiran sent llegibles amb el particionament antic.

4.3 Branching i Tagging: Git per a dades

Iceberg 1.x introdueix branches i tags inspirats en Git:

-- Crear una branca per a una transformació experimental
ALTER TABLE catalog.vendes_iceberg
CREATE BRANCH transformacio_experimental;

-- Fer canvis sobre la branca (no afecta main)
INSERT INTO catalog.vendes_iceberg.branch_transformacio_experimental
SELECT * FROM vendes_raw WHERE preu > 0;

-- Si tot és correcte, fusionar la branca a main (via Spark)
-- Si no, simplement eliminar la branca
ALTER TABLE catalog.vendes_iceberg DROP BRANCH transformacio_experimental;

4.4 Delta Lake vs Apache Iceberg: quan usar cada un

Aspecte Delta Lake Apache Iceberg
Engine principal Spark (Databricks) Multi-engine (Spark, Flink, Trino)
Ecosistema Databricks, AWS EMR AWS Glue, Snowflake, Dremio
Community Databricks-led Apache Foundation
Time Travel
Branching No (a partir de Liquid) Sí (des de v1.x)
Hidden Partitioning No
Hidden Compaction Automatic (OPTIMIZE) Via rewriteDataFiles

Quan usar Delta Lake: equips que ja usen Databricks o Spark com a motor principal. Millor integració amb Unity Catalog.

Quan usar Iceberg: entorns multi-engine (Spark + Flink + Trino). AWS Glue. Projectes que necessiten portabilitat màxima.

Miniactivitat

Imagina que treballes per a una empresa que usa Spark per a ETL i Trino per a consultes de BI. Quin format triaries, Delta Lake o Iceberg? Argumenta la teva resposta en 5 línies.


5. Apache Hudi

5.1 Copy-on-Write vs Merge-on-Read

Apache Hudi (Hadoop Upserts Deletes and Incrementals) és el format especialitzat per a casos d'ús on es fan molts upserts i deletes —típicament pipelines de CDC o compliment del RGPD.

Hudi ofereix dos tipus de taules:

Copy-on-Write (CoW): - Quan actualitzes o elimines una fila, Hudi reescriu el fitxer Parquet complet que conté aquella fila. - Avantatge: Lectures molt ràpides (fitxers sempre complets i consolidats). - Inconvenient: Escriptures lentes i costoses (reescriure fitxers sencers).

Merge-on-Read (MoR): - Quan actualitzes o elimines, Hudi escriu un fitxer de delta (Avro) petit que registra el canvi. - Avantatge: Escriptures molt ràpides. - Inconvenient: Lectures més lentes (cal fusionar el fitxer base amb els deltas en temps de lectura). Compaction periòdic necessari.

# Escriure dades amb Hudi (MoR)
hudi_options = {
    'hoodie.table.name': 'vendes_hudi',
    'hoodie.datasource.write.recordkey.field': 'id_venda',
    'hoodie.datasource.write.precombine.field': 'data_modificacio',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.upsert.shuffle.parallelism': 4,
}

df_vendes.write \
    .format("hudi") \
    .options(**hudi_options) \
    .mode("append") \
    .save("/data/hudi/vendes")

5.2 Deletes eficients per a compliment del RGPD

# Eliminar les dades d'un client concret (RGPD - dret a l'oblit)
# Primer creem un DataFrame amb les claus a eliminar
df_a_eliminar = spark.createDataFrame([
    ("C045",),
    ("C078",),
], ["id_client"])

# Hudi soft delete (marca com a eliminat sense reescriure físicament)
hudi_delete_options = {
    'hoodie.table.name': 'clients_hudi',
    'hoodie.datasource.write.recordkey.field': 'id_client',
    'hoodie.datasource.write.operation': 'delete',
}

df_a_eliminar.write \
    .format("hudi") \
    .options(**hudi_delete_options) \
    .mode("append") \
    .save("/data/hudi/clients")

print("Clients eliminats per compliment del RGPD.")

6. Ingestió de dades

6.1 Kafka Connect

Kafka Connect és un framework per a connectar Kafka amb sistemes externs (bases de dades, sistemes de fitxers, APIs) sense escriure codi. Es configura amb fitxers JSON.

Connector JDBC (Source): llegir des de PostgreSQL

{
  "name": "postgres-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/vendes_db",
    "connection.user": "alumne",
    "connection.password": "iabd2025",
    "mode": "incrementing",
    "incrementing.column.name": "id_venda",
    "topic.prefix": "pg.",
    "poll.interval.ms": "10000",
    "table.whitelist": "vendes,clients,productes"
  }
}
# Registrar el connector via API REST de Kafka Connect
curl -X POST http://kafka-connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d @postgres-source-connector.json

# Verificar l'estat del connector
curl http://kafka-connect:8083/connectors/postgres-source-connector/status

6.2 Debezium: Change Data Capture des de PostgreSQL

Debezium captura cada operació INSERT, UPDATE i DELETE de la base de dades transaccional i la publica a Kafka com a un event en temps real. Funciona llegint el WAL (Write-Ahead Log) de PostgreSQL.

{
  "name": "debezium-postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "alumne",
    "database.password": "iabd2025",
    "database.dbname": "vendes_db",
    "database.server.name": "pg-vendes",
    "plugin.name": "pgoutput",
    "table.include.list": "public.vendes,public.clients",
    "publication.autocreate.mode": "filtered",
    "snapshot.mode": "initial"
  }
}

Estructura d'un missatge Debezium a Kafka:

{
  "before": null,
  "after": {
    "id_venda": "V001",
    "producte": "Portàtil",
    "preu": 699.99,
    "data": 1705276800000
  },
  "source": {
    "version": "2.5.0.Final",
    "connector": "postgresql",
    "name": "pg-vendes",
    "ts_ms": 1705276812345,
    "db": "vendes_db",
    "table": "vendes",
    "txId": 42,
    "lsn": 12345678
  },
  "op": "c",
  "ts_ms": 1705276812789
}

El camp op indica el tipus d'operació: c (create/insert), u (update), d (delete), r (read/snapshot).

6.3 Apache NiFi

Apache NiFi ofereix una interfície visual per dissenyar pipelines de dades sense codi. Un pipeline típic:

[GetFile] → [ConvertRecord CSV→JSON] → [PublishKafkaRecord] → [Kafka Topic]

O per a casos d'ús d'ETL senzills:

[QueryDatabaseTable] → [ConvertAvroToJSON] → [PutHDFS]

NiFi és especialment útil per a: - Ingestió de fonts heterogènies (FTP, HTTP, bases de dades, S3) - Transformacions lleugeres sense necessitat de Spark - Routing condicional: si les dades compleixen una condició, van per un camí; si no, per un altre


7. Patrons d'ingestió

7.1 Batch vs Micro-batch vs Streaming

Patró Eina Latència Ús típic
Batch Spark, Hive, dbt Hores/dies Informes diaris, ETL nocturn
Micro-batch Spark Structured Streaming Segons/minuts Near-real-time, Kafka → Delta Lake
Streaming Apache Flink Mil·lisegons Detecció de frau, IoT en temps real

7.2 Exemple complet: pipeline CDC amb Debezium + Kafka → Delta Lake

flowchart LR
    PG[PostgreSQL\nvendes_db] -->|WAL| DEB[Debezium\nConnector]
    DEB -->|Events CDC| KAF[Kafka\nTopic: pg-vendes.vendes]
    KAF -->|Spark Structured Streaming| SS[Spark\nStreaming Job]
    SS -->|MERGE / Upsert| DL[Delta Lake\n/data/delta/vendes]
    DL -->|SQL| META[Metabase\nDashboard]
# Pipeline Spark Structured Streaming: Kafka → Delta Lake
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DoubleType, LongType, StructType, StructField
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("CDC_Debezium_to_DeltaLake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Esquema del missatge Debezium (camp "after")
schema_venda = StructType([
    StructField("id_venda", StringType()),
    StructField("producte", StringType()),
    StructField("preu", DoubleType()),
    StructField("quantitat", LongType()),
])

schema_debezium = StructType([
    StructField("op", StringType()),
    StructField("after", schema_venda),
    StructField("before", schema_venda),
])

# Llegir de Kafka
df_kafka = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "pg-vendes.public.vendes") \
    .option("startingOffsets", "latest") \
    .load()

# Parsejar el JSON de Debezium
df_parsed = df_kafka \
    .select(F.from_json(
        F.col("value").cast("string"),
        schema_debezium
    ).alias("data")) \
    .select(
        F.col("data.op").alias("operacio"),
        F.col("data.after.*")
    ) \
    .filter(F.col("operacio").isin(["c", "u", "r"]))

# Funció per a cada micro-batch: MERGE a Delta Lake
RUTA_DELTA = "/data/delta/vendes"

def upsert_to_delta(df_batch, batch_id):
    if df_batch.count() == 0:
        return

    deltaTable = DeltaTable.forPath(spark, RUTA_DELTA)
    deltaTable.alias("target") \
        .merge(
            df_batch.alias("source"),
            "target.id_venda = source.id_venda"
        ) \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

    print(f"Batch {batch_id}: {df_batch.count()} registres processats.")

# Iniciar el stream
query = df_parsed.writeStream \
    .foreachBatch(upsert_to_delta) \
    .option("checkpointLocation", "/data/checkpoints/vendes_cdc") \
    .trigger(processingTime="30 seconds") \
    .start()

query.awaitTermination()

Miniactivitat

Revisa el codi del pipeline CDC. Respon:

  1. Per qué filtrem operacio.isin(["c", "u", "r"]) i no processem els "d" (deletes)?
  2. Qué fa el checkpointLocation? Qué passaria si el job Spark cau i es reinicia sense checkpoint?
  3. El trigger(processingTime="30 seconds") fa que el job processi dades cada 30 segons. Quin és el trade-off entre un interval curt (5s) i un de llarg (5min)?

8. Exercici pràctic final

Implementa el pipeline complet CDC descrit a la secció 7.2 en local amb Docker:

  1. Levanta un PostgreSQL amb Docker
  2. Crea la taula vendes i insereix 100 registres de prova
  3. Configura Debezium per capturar els canvis
  4. Arrenca el job Spark Structured Streaming que llegeix de Kafka i escriu a Delta Lake
  5. Fes 10 inserts i 5 updates a PostgreSQL i verifica que els canvis arriben a Delta Lake en menys de 60 segons
  6. Usa Time Travel per veure l'estat inicial (versió 0) de la taula Delta Lake

Documenta cada pas amb captures de pantalla i respon: quina latència has observat entre l'insert a PostgreSQL i la disponibilitat a Delta Lake?


Tema 1 | Mòdul 5075 Big Data Aplicat | Institut Sa Palomera (Blanes) | Curs IABD 2026-2027