Gestió i Emmagatzematge Distribuït de Dades
Introducció
L'emmagatzematge de dades ha evolucionat més en els darrers cinc anys que en les dues dècades anteriors. El 2020, la majoria d'empreses guardaven les seves dades en HDFS o en S3 en format CSV. El 2025, els estàndards de facto del sector —Delta Lake, Apache Iceberg i Apache Hudi— han transformat el Data Lake en un sistema transaccional de primera classe, capaç de garanties ACID, auditoria de canvis i recuperació de versions anteriors.
En aquest tema aprofundim en tots els aspectes de l'emmagatzematge distribuït: des de les entranyes del HDFS (NameNode HA, Federation, quotas) fins al format Delta Lake que gestiona el lakehouse de Databricks, passant per les estratègies d'ingestió de dades amb Debezium, Kafka Connect i Apache NiFi.
1. HDFS en profunditat
1.1 Arquitectura bàsica recordatori
HDFS (Hadoop Distributed File System) divideix els fitxers en blocs (per defecte 128 MB) i els reparteix entre els DataNodes. El NameNode manté el mapa complet del sistema de fitxers: quins blocs componen cada fitxer i en quin DataNode resideix cada còpia. Per defecte, cada bloc té un factor de replicació de 3: una còpia "primària" i dues de seguretat.
1.2 NameNode en Alta Disponibilitat (HA)
El punt únic de fallada del HDFS original era el NameNode: si queia, tot el clúster quedava inaccessible. L'arquitectura NameNode HA resol aquest problema:
graph TD
ZK[Apache ZooKeeper\nQuorum 3 nodes] --> ACTIVE
ZK --> STANDBY
subgraph NameNodes
ACTIVE[NameNode\nActiu]
STANDBY[NameNode\nStandby]
end
subgraph JournalNodes
JN1[JournalNode 1]
JN2[JournalNode 2]
JN3[JournalNode 3]
end
ACTIVE --> JN1
ACTIVE --> JN2
ACTIVE --> JN3
STANDBY --> JN1
STANDBY --> JN2
STANDBY --> JN3
DN1[DataNode 1] --> ACTIVE
DN2[DataNode 2] --> ACTIVE
DN3[DataNode 3] --> ACTIVE
DN1 --> STANDBY
DN2 --> STANDBY
DN3 --> STANDBY
Mecanisme de failover automàtic:
- El ZooKeeper Failover Controller (ZKFC) monitora constantment la salut del NameNode actiu.
- Si el NameNode actiu cau, el ZKFC detecta la fallada i ordena al NameNode Standby que prengui el control.
- Els JournalNodes (quorum imparell, mínim 3) mantenen el log de canvis (
edit log) compartit entre actiu i standby, garantint que el standby estigui sempre al dia. - El failover es completa en menys de 30 segons (configurable).
1.3 HDFS Federation
Quan el clúster creix molt (milers de milions de fitxers), un únic NameNode pot convertir-se en un coll d'ampolla de memòria: cada fitxer ocupa uns 150 bytes de RAM al NameNode. HDFS Federation permet múltiples NameNodes independents, cadascun gestionant un namespace diferent:
# Exemple de configuració Federation (hdfs-site.xml)
# NameNode 1 gestiona /user i /tmp
# NameNode 2 gestiona /data/warehouse i /data/raw
Cada NameNode manté el seu propi namespace i block pool, però tots comparteixen els mateixos DataNodes.
1.4 Operacions HDFS essencials
# Llistar fitxers
hdfs dfs -ls /data/raw/
# Crear directori
hdfs dfs -mkdir -p /data/joan-garcia/vendes/2024
# Copiar fitxer local a HDFS
hdfs dfs -put vendes_2024.csv /data/joan-garcia/vendes/2024/
# Llegir els primers 100 bytes d'un fitxer
hdfs dfs -cat /data/joan-garcia/vendes/2024/vendes_2024.csv | head -20
# Comprovar integritat de blocs
hdfs fsck /data/joan-garcia/ -files -blocks -locations
# Verificar el safe mode
hdfs dfsadmin -safemode get
# Mostrar l'ús d'espai
hdfs dfs -du -s -h /data/joan-garcia/
# Establir quota de fitxers (màxim 10.000 fitxers al directori)
hdfs dfsadmin -setQuota 10000 /data/joan-garcia/
# Establir quota d'espai (màxim 100 GB)
hdfs dfsadmin -setSpaceQuota 100g /data/joan-garcia/
# Balancejar el clúster si hi ha desbalanceig
hdfs balancer -threshold 10
1.5 Safe Mode
El NameNode arrenca en Safe Mode: un estat on el sistema de fitxers és de lectura i no permet modificacions. En Safe Mode, el NameNode recopila informació de tots els DataNodes per reconstruir el mapa de blocs. El Safe Mode s'abandona quan:
- Un percentatge mínim de blocs (per defecte 99,9%) han estat reportats com a vàlids pels DataNodes.
- Ha transcorregut un temps mínim d'estabilització (30 segons per defecte).
# Forçar sortida de safe mode (útil en entorns de test)
hdfs dfsadmin -safemode leave
# Forçar entrada en safe mode (per a manteniment)
hdfs dfsadmin -safemode enter
Safe Mode en producció
Mai forçis la sortida de safe mode en producció fins que no entenguis per qué el NameNode hi ha entrat. Pot indicar que molts blocs estan corruptes o no reportats, i forçar la sortida podria provocar pèrdua de dades.
2. Formats d'emmagatzematge
2.1 Comparativa de formats
| Format | Orientació | Compressió | Evolució d'esquema | ACID | Millor per a |
|---|---|---|---|---|---|
| CSV | Fila | No nativa | No | No | Intercanvi, debugging, fonts simples |
| JSON / JSON Lines | Fila | No nativa | Flexible | No | APIs, dades semiestructurades |
| Avro | Fila | Snappy, Gzip | Sí (Schema Registry) | No | Kafka, ingestió streaming |
| Parquet | Columna | Snappy, Zstd, Gzip | Limitada | No | Analítica, Spark, Hive, Presto |
| ORC | Columna | Zlib, Snappy | Limitada | No | Hive optimitzat |
| Delta Lake | Columna (Parquet) | Snappy, Zstd | Sí, forçada | Sí | Lakehouse, ETL, ML |
| Apache Iceberg | Columna (Parquet/ORC) | Snappy, Zstd | Sí | Sí | Multi-engine, cloud |
| Apache Hudi | Columna (Parquet) | Snappy | Sí | Sí | Streaming, upserts |
2.2 Parquet: el format columnar per excel·lència
Parquet organitza les dades per columnes en lloc de per files. Això és revolucionari per a l'analítica:
Format CSV (per files):
ID,Nom,Preu,Quantitat
1,Portàtil,699,2
2,Monitor,299,5
3,Teclat,89,10
Format Parquet (per columnes, lògicament):
IDs: [1, 2, 3]
Noms: [Portàtil, Monitor, Teclat]
Preus: [699, 299, 89]
Quantitats:[2, 5, 10]
Si fas SELECT SUM(Preu) FROM productes, Parquet llegeix únicament la columna Preu, ignorant les altres. Un CSV llegeix totes les columnes. En un dataset de 100 columnes on n'uses 3, Parquet pot ser 30x més ràpid.
# Benchmark: CSV vs Parquet amb PySpark
import time
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BenchmarkFormats").getOrCreate()
# Escriure el dataset en ambdós formats
df.write.mode("overwrite").csv("/data/test_csv")
df.write.mode("overwrite").parquet("/data/test_parquet")
# Llegir i agregar: CSV
t0 = time.time()
spark.read.csv("/data/test_csv", header=True, inferSchema=True) \
.agg({"Preu": "sum"}).collect()
t_csv = time.time() - t0
# Llegir i agregar: Parquet
t0 = time.time()
spark.read.parquet("/data/test_parquet") \
.agg({"Preu": "sum"}).collect()
t_parquet = time.time() - t0
print(f"CSV: {t_csv:.2f}s")
print(f"Parquet: {t_parquet:.2f}s")
print(f"Speedup: {t_csv/t_parquet:.1f}x")
2.3 Avro i Schema Registry
Avro és el format preferit per a Kafka perquè: 1. És orientat a files (ideal per a missatges individuals) 2. Inclou l'esquema dins del fitxer (autodescriptiu) 3. Permet evolució d'esquema compatible
El Schema Registry (Confluent) emmagatzema i versiona els esquemes Avro, garantint que productors i consumidors de Kafka sempre parlin el mateix "idioma":
# Productor Kafka amb Avro i Schema Registry
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
schema_str = """
{
"type": "record",
"name": "Venda",
"fields": [
{"name": "id_venda", "type": "string"},
{"name": "producte", "type": "string"},
{"name": "import", "type": "double"},
{"name": "timestamp", "type": "long"}
]
}
"""
avro_producer = AvroProducer({
'bootstrap.servers': 'kafka:9092',
'schema.registry.url': 'http://schema-registry:8081'
}, default_value_schema=avro.loads(schema_str))
Miniactivitat
Busca a Internet el paper de Google sobre Parquet (o el document de Cloudera). Explica en 5 línies per qué el format columnar és millor per a consultes analítiques (SELECT d'algunes columnes) i pitjor per a transaccions (UPDATE d'una fila).
3. Delta Lake: el format lakehouse líder 2025
3.1 Qué és Delta Lake
Delta Lake és una capa de transaccions ACID sobre el sistema de fitxers (S3, HDFS, ADLS). Sota el capó, les taules Delta Lake són fitxers Parquet més un directori _delta_log que conté el historial de transaccions en format JSON.
s3://datalake/vendes/
├── _delta_log/
│ ├── 00000000000000000000.json ← Transacció 0: CREATE TABLE
│ ├── 00000000000000000001.json ← Transacció 1: INSERT 1M files
│ ├── 00000000000000000002.json ← Transacció 2: DELETE fraudulentes
│ └── 00000000000000000003.json ← Transacció 3: OPTIMIZE
├── part-00000-abc123.snappy.parquet
├── part-00001-def456.snappy.parquet
└── ...
3.2 Transaccions ACID sobre object storage
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
.appName("DeltaLakeDemo") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Crear una taula Delta Lake
df_vendes = spark.createDataFrame([
("V001", "2024-01-15", "Portàtil", 699.99, 2),
("V002", "2024-01-16", "Monitor", 299.99, 1),
("V003", "2024-01-17", "Teclat", 89.99, 5),
], ["id_venda", "data", "producte", "preu", "quantitat"])
df_vendes.write \
.format("delta") \
.mode("overwrite") \
.save("/data/delta/vendes")
print("Taula Delta Lake creada correctament.")
# Llegir la taula
df_delta = spark.read.format("delta").load("/data/delta/vendes")
df_delta.show()
3.3 Time Travel: auditoria i recuperació
# Llegir la versió 0 de la taula (estat inicial)
df_v0 = spark.read.format("delta") \
.option("versionAsOf", 0) \
.load("/data/delta/vendes")
# Llegir l'estat d'ahir (per timestamp)
df_ahir = spark.read.format("delta") \
.option("timestampAsOf", "2024-01-14 00:00:00") \
.load("/data/delta/vendes")
# Historial de transaccions
deltaTable = DeltaTable.forPath(spark, "/data/delta/vendes")
deltaTable.history().show(truncate=False)
# Restaurar a la versió 1
deltaTable.restoreToVersion(1)
print("Taula restaurada a la versió 1.")
# Restaurar a un timestamp concret
deltaTable.restoreToTimestamp("2024-01-15 12:00:00")
3.4 Operació MERGE (Upsert)
El MERGE és la operació més potent de Delta Lake: actualitza, insereix o elimina files en una sola operació atòmica. És essencial per a pipelines de CDC.
# Upsert: actualitzar vendes existents o inserir noves
df_nous_registres = spark.createDataFrame([
("V001", "2024-01-15", "Portàtil", 749.99, 2), # Actualitzar preu
("V004", "2024-01-18", "Ratolí", 49.99, 3), # Nou registre
], ["id_venda", "data", "producte", "preu", "quantitat"])
deltaTable = DeltaTable.forPath(spark, "/data/delta/vendes")
deltaTable.alias("vendes_existents") \
.merge(
df_nous_registres.alias("nous"),
"vendes_existents.id_venda = nous.id_venda"
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
print("MERGE completat. Verificació:")
spark.read.format("delta").load("/data/delta/vendes").show()
3.5 OPTIMIZE i Z-ordering
Amb el temps, una taula Delta Lake s'omple de fitxers Parquet petits (el problema dels "small files"), especialment si es fan molts inserts incremental. OPTIMIZE consolida els fitxers petits en fitxers més grans. Z-ORDER BY reordena les dades per maximitzar la poda (data skipping) per als camps de filtre habituals.
# Optimitzar i Z-order per zona de recollida (camp habitual en filtres)
spark.sql("""
OPTIMIZE delta.`/data/delta/vendes`
ZORDER BY (data, producte)
""")
# Veure estadístiques de la taula
spark.sql("DESCRIBE DETAIL delta.`/data/delta/vendes`").show(truncate=False)
# Netejar fitxers antics (>7 dies) per a estalvi d'espai
deltaTable.vacuum(retentionHours=168) # 7 dies = 168 hores
VACUUM i Time Travel
VACUUM elimina fitxers Parquet que ja no formen part de la versió actual. Si fas un VACUUM(0), perdràs la capacitat de Time Travel. Per defecte, Delta Lake reté 7 dies d'historial. No redueixis aquest valor sense consultar amb l'equip.
3.6 Schema Evolution i Schema Enforcement
# Schema Enforcement: per defecte, Delta Lake rebutja esquemes que no coincideixen
try:
df_incorrecte = spark.createDataFrame([
("V005", "2024-01-19", "Auriculars", 79.99, 1, "vermell"),
], ["id_venda", "data", "producte", "preu", "quantitat", "color"])
df_incorrecte.write.format("delta").mode("append") \
.save("/data/delta/vendes")
except Exception as e:
print(f"Error d'esquema (esperat): {e}")
# Schema Evolution: permetre afegir noves columnes
df_incorrecte.write.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save("/data/delta/vendes")
print("Esquema evolucionat correctament.")
spark.read.format("delta").load("/data/delta/vendes").printSchema()
3.7 Change Data Feed
El Change Data Feed (CDF) permet capturar els canvis (inserts, updates, deletes) d'una taula Delta Lake i propagar-los a altres sistemes, de forma similar a Debezium però dins del lakehouse.
# Activar CDF a la taula
spark.sql("""
ALTER TABLE delta.`/data/delta/vendes`
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
# Llegir els canvis des de la versió 2
df_canvis = spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 2) \
.load("/data/delta/vendes")
# Filtrar per tipus de canvi
df_canvis.filter("_change_type = 'update_postimage'").show()
df_canvis.filter("_change_type = 'delete'").show()
4. Apache Iceberg
4.1 Qué és i per qué existeix
Apache Iceberg és un format de taula obert creat per Netflix i donat a la Apache Software Foundation. La seva filosofia és diferent de Delta Lake: mentre Delta Lake va ser dissenyat per Databricks per als seus casos d'ús, Iceberg va ser dissenyat com a format neutral que funciona amb Spark, Flink, Trino, Presto, Hive i qualsevol engine compatible.
4.2 Hidden Partitioning i Partition Evolution
El particionament tradicional de Hive o Delta Lake exposa la lògica de particionament a l'usuari: cal escriure WHERE year=2024 AND month=1 per beneficiar-se del partition pruning. Iceberg implementa Hidden Partitioning: l'engine dedueix automàticament les particions sense que l'usuari hagi d'escriure predicats especials.
# Iceberg amb Spark: hidden partitioning per data
spark.sql("""
CREATE TABLE catalog.vendes_iceberg (
id_venda STRING,
data TIMESTAMP,
producte STRING,
preu DOUBLE
) USING ICEBERG
PARTITIONED BY (days(data))
""")
-- La particio 'days(data)' s'aplica automaticament sense canviar les queries
Partition Evolution: Iceberg permet canviar l'esquema de particionament sense reescriure les dades existents. Pots canviar de days(data) a months(data) i les dades antigues seguiran sent llegibles amb el particionament antic.
4.3 Branching i Tagging: Git per a dades
Iceberg 1.x introdueix branches i tags inspirats en Git:
-- Crear una branca per a una transformació experimental
ALTER TABLE catalog.vendes_iceberg
CREATE BRANCH transformacio_experimental;
-- Fer canvis sobre la branca (no afecta main)
INSERT INTO catalog.vendes_iceberg.branch_transformacio_experimental
SELECT * FROM vendes_raw WHERE preu > 0;
-- Si tot és correcte, fusionar la branca a main (via Spark)
-- Si no, simplement eliminar la branca
ALTER TABLE catalog.vendes_iceberg DROP BRANCH transformacio_experimental;
4.4 Delta Lake vs Apache Iceberg: quan usar cada un
| Aspecte | Delta Lake | Apache Iceberg |
|---|---|---|
| Engine principal | Spark (Databricks) | Multi-engine (Spark, Flink, Trino) |
| Ecosistema | Databricks, AWS EMR | AWS Glue, Snowflake, Dremio |
| Community | Databricks-led | Apache Foundation |
| Time Travel | Sí | Sí |
| Branching | No (a partir de Liquid) | Sí (des de v1.x) |
| Hidden Partitioning | No | Sí |
| Hidden Compaction | Automatic (OPTIMIZE) | Via rewriteDataFiles |
Quan usar Delta Lake: equips que ja usen Databricks o Spark com a motor principal. Millor integració amb Unity Catalog.
Quan usar Iceberg: entorns multi-engine (Spark + Flink + Trino). AWS Glue. Projectes que necessiten portabilitat màxima.
Miniactivitat
Imagina que treballes per a una empresa que usa Spark per a ETL i Trino per a consultes de BI. Quin format triaries, Delta Lake o Iceberg? Argumenta la teva resposta en 5 línies.
5. Apache Hudi
5.1 Copy-on-Write vs Merge-on-Read
Apache Hudi (Hadoop Upserts Deletes and Incrementals) és el format especialitzat per a casos d'ús on es fan molts upserts i deletes —típicament pipelines de CDC o compliment del RGPD.
Hudi ofereix dos tipus de taules:
Copy-on-Write (CoW): - Quan actualitzes o elimines una fila, Hudi reescriu el fitxer Parquet complet que conté aquella fila. - Avantatge: Lectures molt ràpides (fitxers sempre complets i consolidats). - Inconvenient: Escriptures lentes i costoses (reescriure fitxers sencers).
Merge-on-Read (MoR): - Quan actualitzes o elimines, Hudi escriu un fitxer de delta (Avro) petit que registra el canvi. - Avantatge: Escriptures molt ràpides. - Inconvenient: Lectures més lentes (cal fusionar el fitxer base amb els deltas en temps de lectura). Compaction periòdic necessari.
# Escriure dades amb Hudi (MoR)
hudi_options = {
'hoodie.table.name': 'vendes_hudi',
'hoodie.datasource.write.recordkey.field': 'id_venda',
'hoodie.datasource.write.precombine.field': 'data_modificacio',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
'hoodie.upsert.shuffle.parallelism': 4,
}
df_vendes.write \
.format("hudi") \
.options(**hudi_options) \
.mode("append") \
.save("/data/hudi/vendes")
5.2 Deletes eficients per a compliment del RGPD
# Eliminar les dades d'un client concret (RGPD - dret a l'oblit)
# Primer creem un DataFrame amb les claus a eliminar
df_a_eliminar = spark.createDataFrame([
("C045",),
("C078",),
], ["id_client"])
# Hudi soft delete (marca com a eliminat sense reescriure físicament)
hudi_delete_options = {
'hoodie.table.name': 'clients_hudi',
'hoodie.datasource.write.recordkey.field': 'id_client',
'hoodie.datasource.write.operation': 'delete',
}
df_a_eliminar.write \
.format("hudi") \
.options(**hudi_delete_options) \
.mode("append") \
.save("/data/hudi/clients")
print("Clients eliminats per compliment del RGPD.")
6. Ingestió de dades
6.1 Kafka Connect
Kafka Connect és un framework per a connectar Kafka amb sistemes externs (bases de dades, sistemes de fitxers, APIs) sense escriure codi. Es configura amb fitxers JSON.
Connector JDBC (Source): llegir des de PostgreSQL
{
"name": "postgres-source-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/vendes_db",
"connection.user": "alumne",
"connection.password": "iabd2025",
"mode": "incrementing",
"incrementing.column.name": "id_venda",
"topic.prefix": "pg.",
"poll.interval.ms": "10000",
"table.whitelist": "vendes,clients,productes"
}
}
# Registrar el connector via API REST de Kafka Connect
curl -X POST http://kafka-connect:8083/connectors \
-H "Content-Type: application/json" \
-d @postgres-source-connector.json
# Verificar l'estat del connector
curl http://kafka-connect:8083/connectors/postgres-source-connector/status
6.2 Debezium: Change Data Capture des de PostgreSQL
Debezium captura cada operació INSERT, UPDATE i DELETE de la base de dades transaccional i la publica a Kafka com a un event en temps real. Funciona llegint el WAL (Write-Ahead Log) de PostgreSQL.
{
"name": "debezium-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "alumne",
"database.password": "iabd2025",
"database.dbname": "vendes_db",
"database.server.name": "pg-vendes",
"plugin.name": "pgoutput",
"table.include.list": "public.vendes,public.clients",
"publication.autocreate.mode": "filtered",
"snapshot.mode": "initial"
}
}
Estructura d'un missatge Debezium a Kafka:
{
"before": null,
"after": {
"id_venda": "V001",
"producte": "Portàtil",
"preu": 699.99,
"data": 1705276800000
},
"source": {
"version": "2.5.0.Final",
"connector": "postgresql",
"name": "pg-vendes",
"ts_ms": 1705276812345,
"db": "vendes_db",
"table": "vendes",
"txId": 42,
"lsn": 12345678
},
"op": "c",
"ts_ms": 1705276812789
}
El camp op indica el tipus d'operació: c (create/insert), u (update), d (delete), r (read/snapshot).
6.3 Apache NiFi
Apache NiFi ofereix una interfície visual per dissenyar pipelines de dades sense codi. Un pipeline típic:
O per a casos d'ús d'ETL senzills:
NiFi és especialment útil per a: - Ingestió de fonts heterogènies (FTP, HTTP, bases de dades, S3) - Transformacions lleugeres sense necessitat de Spark - Routing condicional: si les dades compleixen una condició, van per un camí; si no, per un altre
7. Patrons d'ingestió
7.1 Batch vs Micro-batch vs Streaming
| Patró | Eina | Latència | Ús típic |
|---|---|---|---|
| Batch | Spark, Hive, dbt | Hores/dies | Informes diaris, ETL nocturn |
| Micro-batch | Spark Structured Streaming | Segons/minuts | Near-real-time, Kafka → Delta Lake |
| Streaming | Apache Flink | Mil·lisegons | Detecció de frau, IoT en temps real |
7.2 Exemple complet: pipeline CDC amb Debezium + Kafka → Delta Lake
flowchart LR
PG[PostgreSQL\nvendes_db] -->|WAL| DEB[Debezium\nConnector]
DEB -->|Events CDC| KAF[Kafka\nTopic: pg-vendes.vendes]
KAF -->|Spark Structured Streaming| SS[Spark\nStreaming Job]
SS -->|MERGE / Upsert| DL[Delta Lake\n/data/delta/vendes]
DL -->|SQL| META[Metabase\nDashboard]
# Pipeline Spark Structured Streaming: Kafka → Delta Lake
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DoubleType, LongType, StructType, StructField
from delta.tables import DeltaTable
spark = SparkSession.builder \
.appName("CDC_Debezium_to_DeltaLake") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Esquema del missatge Debezium (camp "after")
schema_venda = StructType([
StructField("id_venda", StringType()),
StructField("producte", StringType()),
StructField("preu", DoubleType()),
StructField("quantitat", LongType()),
])
schema_debezium = StructType([
StructField("op", StringType()),
StructField("after", schema_venda),
StructField("before", schema_venda),
])
# Llegir de Kafka
df_kafka = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "pg-vendes.public.vendes") \
.option("startingOffsets", "latest") \
.load()
# Parsejar el JSON de Debezium
df_parsed = df_kafka \
.select(F.from_json(
F.col("value").cast("string"),
schema_debezium
).alias("data")) \
.select(
F.col("data.op").alias("operacio"),
F.col("data.after.*")
) \
.filter(F.col("operacio").isin(["c", "u", "r"]))
# Funció per a cada micro-batch: MERGE a Delta Lake
RUTA_DELTA = "/data/delta/vendes"
def upsert_to_delta(df_batch, batch_id):
if df_batch.count() == 0:
return
deltaTable = DeltaTable.forPath(spark, RUTA_DELTA)
deltaTable.alias("target") \
.merge(
df_batch.alias("source"),
"target.id_venda = source.id_venda"
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
print(f"Batch {batch_id}: {df_batch.count()} registres processats.")
# Iniciar el stream
query = df_parsed.writeStream \
.foreachBatch(upsert_to_delta) \
.option("checkpointLocation", "/data/checkpoints/vendes_cdc") \
.trigger(processingTime="30 seconds") \
.start()
query.awaitTermination()
Miniactivitat
Revisa el codi del pipeline CDC. Respon:
- Per qué filtrem
operacio.isin(["c", "u", "r"])i no processem els"d"(deletes)? - Qué fa el
checkpointLocation? Qué passaria si el job Spark cau i es reinicia sense checkpoint? - El
trigger(processingTime="30 seconds")fa que el job processi dades cada 30 segons. Quin és el trade-off entre un interval curt (5s) i un de llarg (5min)?
8. Exercici pràctic final
Implementa el pipeline complet CDC descrit a la secció 7.2 en local amb Docker:
- Levanta un PostgreSQL amb Docker
- Crea la taula
vendesi insereix 100 registres de prova - Configura Debezium per capturar els canvis
- Arrenca el job Spark Structured Streaming que llegeix de Kafka i escriu a Delta Lake
- Fes 10 inserts i 5 updates a PostgreSQL i verifica que els canvis arriben a Delta Lake en menys de 60 segons
- Usa Time Travel per veure l'estat inicial (versió 0) de la taula Delta Lake
Documenta cada pas amb captures de pantalla i respon: quina latència has observat entre l'insert a PostgreSQL i la disponibilitat a Delta Lake?
Tema 1 | Mòdul 5075 Big Data Aplicat | Institut Sa Palomera (Blanes) | Curs IABD 2026-2027