Formats d'evolució: Avro i Delta Lake
En entorns de producció, els formats de dades no son estàtics. Els esquemes canvien: s'afegeixen camps nous, se n'eliminen d'obsolets, se'n canvien els tipus. Els sistemes que no gestionen bé aquests canvis generen errors en producció, pipelines trencats i pèrdua de dades. Apache Avro resol l'evolució d'esquema per a missatgeria i streaming, mentre que Delta Lake afegeix transaccions ACID i time travel sobre fitxers Parquet estàndard, transformant un Data Lake en un Data Lakehouse transaccional.
Apache Avro
Origen i disseny
Avro va néixer el 2009 com a part del projecte Hadoop d'Apache, dissenyat específicament per a la serialització de dades en sistemes distribuïts. A diferència de Parquet i ORC, Avro és un format row-oriented (per fila), optimitzat per a streaming i missatgeria, no per a analítica columnar.
Les característiques que el diferencien son:
- Schema-driven: l'esquema és un document JSON que defineix tots els camps i els seus tipus. L'esquema pot viatjar amb les dades (embedded) o emmagatzemar-se en un Schema Registry central.
- Format binari compacte: sense capçaleres de columna repetides, els fitxers son molt compactes.
- Evolució d'esquema: mecanisme formal per a compatibilitat entre versions d'esquema.
- Suport de tipus ric: boolean, int, long, float, double, bytes, string, record, enum, array, map, union, fixed.
Esquema Avro
Un esquema Avro es defineix en JSON. Per exemple, per a una transacció financera:
{
"type": "record",
"name": "Transaccio",
"namespace": "cat.sapalomera.bigdata",
"fields": [
{"name": "id", "type": "long"},
{"name": "data", "type": {"type": "int", "logicalType": "date"}},
{"name": "origen", "type": "string"},
{"name": "desti", "type": "string"},
{"name": "import", "type": "double"},
{"name": "moneda", "type": {"type": "enum", "name": "Moneda",
"symbols": ["EUR", "USD", "GBP", "JPY"]}},
{"name": "estat", "type": ["null", "string"], "default": null},
{"name": "metadades", "type": {"type": "map", "values": "string"},
"default": {}}
]
}
Notes sobre el esquema:
- El tipus
["null", "string"]és un union: el camp pot ser nul o un string. Avro obliga a declarar explícitament la possibilitat de nuls. - El
"default": nullindica el valor que s'usa si el camp no apareix en dades anteriors a l'afegir-lo (evolució backward compatible). logicalType: "date"representa una data com a nombre de dies des de l'epoch Unix sobre un enter de 32 bits.
Serialitzar i deserialitzar amb fastavro
import fastavro
import io
from datetime import date
# Definir l'esquema com a dict Python
esquema_dict = {
"type": "record",
"name": "Transaccio",
"fields": [
{"name": "id", "type": "long"},
{"name": "origen", "type": "string"},
{"name": "desti", "type": "string"},
{"name": "import", "type": "double"},
{"name": "moneda", "type": "string"},
{"name": "estat", "type": ["null", "string"], "default": None},
]
}
esquema_parsejat = fastavro.parse_schema(esquema_dict)
# Dades a serialitzar
registres = [
{"id": 1, "origen": "ES12 3456", "desti": "DE89 3704",
"import": 1500.0, "moneda": "EUR", "estat": "COMPLETADA"},
{"id": 2, "origen": "ES12 3456", "desti": "US12 3456",
"import": 200.5, "moneda": "USD", "estat": None},
]
# Escriure a un fitxer Avro
with open('transaccions.avro', 'wb') as f:
fastavro.writer(f, esquema_parsejat, registres)
# Llegir des d'un fitxer Avro
with open('transaccions.avro', 'rb') as f:
lector = fastavro.reader(f)
print(f"Esquema escriptor: {lector.writer_schema['name']}")
for registre in lector:
print(registre)
Compatibilitat d'esquema
L'evolució d'esquema en Avro es regeix per regles de compatibilitat formals:
Compatibilitat backward (nous lectors, dades velles): el nou esquema pot llegir dades escrites amb l'esquema antic.
- Es pot afegir un camp amb valor per defecte: els lectors nous usaran el defecte per als registres antics que no en tinguin.
- Es pot eliminar un camp amb valor per defecte: els lectors nous ignoraran els camps extres en registres antics.
Compatibilitat forward (lectors vells, dades noves): l'esquema antic pot llegir dades escrites amb el nou esquema.
- Es pot afegir un camp amb valor per defecte: els lectors vells ignoraran el camp nou.
- Es pot eliminar un camp: els lectors vells usaran el valor per defecte per als camps absents.
Compatibilitat completa (full): compleix alhora backward i forward. El cas mes restrictiu i recomanat en sistemes Kafka amb Schema Registry.
| Canvi | Backward | Forward | Full |
|---|---|---|---|
| Afegir camp amb defecte | Si | Si | Si |
| Afegir camp sense defecte | No | Si | No |
| Eliminar camp amb defecte | Si | No | No |
| Canviar tipus incompatible | No | No | No |
| Renomenar camp | No | No | No |
Kafka i Schema Registry
En pipelines Kafka amb Confluent Schema Registry, cada missatge porta un identificador d'esquema de 4 bytes al principi del payload. El consumidor usa aquest ID per recuperar l'esquema exacte del Registry i deserialitzar el missatge. Sense Schema Registry, caldria que cada missatge portés l'esquema complet, cosa que infla enormement la mida dels missatges.
Quan usar Avro
- Missatgeria Kafka: és el format de serialització mes usat en ecosistemes Kafka, especialment amb Confluent Schema Registry.
- RPC i serialització de xarxa: quan cal serialitzar objectes per enviar-los entre serveis.
- Pipelines de streaming on l'esquema evoluciona i cal garantir compatibilitat.
- Fitxers de dades en Hadoop: per a casos on es necessita streaming de files (no columnar).
Delta Lake
Que és Delta Lake
Delta Lake és una capa d'emmagatzematge open-source que afegeix funcionalitats transaccionals sobre fitxers Parquet estàndard. Creat per Databricks i donat a l'Apache Software Foundation, resol el problema fonamental dels Data Lakes: la manca de garanties ACID sobre sistemes de fitxers d'objectes com S3, ADLS o GCS.
La clau de Delta Lake és el transaction log: un directori _delta_log/ al costat dels fitxers Parquet que conté un registre ordenat de totes les operacions sobre la taula.
taula_transaccions/
├── _delta_log/
│ ├── 00000000000000000000.json (versio 0: CREATE TABLE)
│ ├── 00000000000000000001.json (versio 1: INSERT batch 1)
│ ├── 00000000000000000002.json (versio 2: UPDATE registres)
│ ├── 00000000000000000003.json (versio 3: DELETE registres)
│ └── 00000000000000000010.checkpoint.parquet (checkpoint cada 10 commits)
├── part-00000-abc123.snappy.parquet
├── part-00001-def456.snappy.parquet
└── part-00002-ghi789.snappy.parquet
Propietats ACID en Delta Lake
Atomicitat: cada operació (INSERT, UPDATE, DELETE, MERGE) es registra com una transacció completa al log. Si falla a mig camí, la transacció no apareix al log i la taula queda intacta.
Consistencia: Delta Lake valida que les dades respecten l'esquema i les constraints definides abans d'escriure qualsevol fitxer.
Aïllament (Isolation): Delta Lake usa optimistic concurrency control. Cada escriptor llegeix la versió actual del log, escriu els fitxers Parquet i intenta afegir la seva entrada al log. Si un altre escriptor ha modificat el log entremig, la transacció és rebutjada i reintentada. Garanteix aïllament de tipus Serializable en la majoria dels casos.
Durabilitat: un cop el commit apareix al _delta_log/, és persistent. Els fitxers Parquet son immutables (mai es modifiquen, només s'afegeixen nous i el log marca els vells com a eliminats lògicament).
Time Travel
Time Travel permet llegir qualsevol versió anterior d'una taula Delta Lake, ja que el log preserva totes les versions.
from delta import DeltaTable
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("delta-time-travel") \
.config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
ruta = "taula_transaccions/"
# Llegir la versio actual
df_actual = spark.read.format("delta").load(ruta)
# Llegir una versio anterior per numero de versio
df_v5 = spark.read.format("delta") \
.option("versionAsOf", 5) \
.load(ruta)
# Llegir una versio anterior per timestamp
df_ahir = spark.read.format("delta") \
.option("timestampAsOf", "2024-03-14 00:00:00") \
.load(ruta)
# Veure l'historial de canvis
delta_taula = DeltaTable.forPath(spark, ruta)
historial = delta_taula.history()
historial.select("version", "timestamp", "operation", "operationParameters").show()
Casos d'us del Time Travel:
- Auditoria: qui va modificar que i quan.
- Rollback: si es detecta un error en un batch, tornar a la versio anterior.
- Reproducibilitat: garantir que dos entrenaments de model ML usen exactament les mateixes dades.
- Debug: investigar l'estat de la taula en un moment concret.
Schema enforcement i evolució
# Schema enforcement (activat per defecte):
# Si un DataFrame te columnes que no existeixen a la taula Delta, l'escriptura falla.
df_incorrecte.write.format("delta").mode("append").save(ruta)
# -> AnalysisException: A schema mismatch detected ...
# Schema evolution: permetre afegir columnes noves
df_amb_columna_nova.write.format("delta") \
.option("mergeSchema", "true") \
.mode("append") \
.save(ruta)
# Sobreescriure l'esquema complet (USE AMB PRECAUCIO)
df.write.format("delta") \
.option("overwriteSchema", "true") \
.mode("overwrite") \
.save(ruta)
VACUUM: netejar versions antigues
Delta Lake preserva tots els fitxers Parquet antics (els marcats com a eliminats al log) per a suportar Time Travel. VACUUM elimina els fitxers mes antics que un llindar de retencio:
# Eliminar fitxers mes antics de 7 dies (168 hores, valor per defecte)
delta_taula.vacuum()
# Eliminar fitxers mes antics de 30 dies
delta_taula.vacuum(retentionHours=720)
# ATENCIO: vacuum(0) elimina tot pero trenca el time travel.
# Delta Lake te un minim de 168h per seguretat.
# Per forcar valors inferiors (NO recomanat en produccio):
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
delta_taula.vacuum(retentionHours=0)
Z-Ordering: co-localitzar dades relacionades
Z-Ordering reordena fisicament els fitxers Parquet dins d'una taula Delta per co-localitzar files relacionades, millorant el predicate pushdown:
# Optimitzar i fer Z-ordering per les columnes mes usades en filtres
delta_taula.optimize().executeZOrderBy("moneda", "estat")
Despres del Z-Ordering, totes les files de moneda = 'EUR' tendiran a estar en els mateixos fitxers Parquet, minimitzant els fitxers que cal llegir per a un filtre per moneda.
MERGE / Upsert per a CDC
MERGE INTO permet fer upserts (insert si no existeix, update si existeix), essencial per a Change Data Capture (CDC):
from delta import DeltaTable
taula_delta = DeltaTable.forPath(spark, "taula_transaccions/")
df_actualitzacions = spark.createDataFrame([
(1, "COMPLETADA"), # actualitzar estat de la transaccio 1
(999, "PENDENT"), # inserir transaccio nova
], ["id", "nou_estat"])
taula_delta.alias("original") \
.merge(
df_actualitzacions.alias("actualitzacio"),
"original.id = actualitzacio.id"
) \
.whenMatchedUpdate(set={"estat": "actualitzacio.nou_estat"}) \
.whenNotMatchedInsert(values={
"id": "actualitzacio.id",
"estat": "actualitzacio.nou_estat",
}) \
.execute()
Apache Iceberg (breu)
Apache Iceberg és l'alternativa principal a Delta Lake, adoptada per AWS Athena, Snowflake, Dremio, Apache Flink i Apache Spark. Les seves característiques distintives respecte a Delta Lake son:
Hidden partitioning: Iceberg gestiona les particions internament. L'usuari no especifica la columna de particio en les queries; Iceberg la dedueix automàticament del filtre. Canviar l'estrategia de particio no requereix reescriure les dades.
Partition evolution: es pot canviar l'estrategia de particiÓ (p. ex., passar de particionar per mes a particionar per dia) sense reescriure les dades existents. Les noves particions usen la nova estrategia, les antigues mantenen l'anterior.
Row-level deletes sense reescriure fitxers: Iceberg pot marcar files com a eliminades en un fitxer separat (delete file) sense tocar el fitxer Parquet original. Delta Lake ha de reescriure el fitxer sencer per eliminar files.
Multi-engine: Iceberg esta dissenyat per a ser usat simultàniament per múltiples motors (Spark + Flink + Trino) sobre les mateixes dades, amb garanties d'aïllament.
Comparativa Delta Lake vs Iceberg vs Apache Hudi
| Dimensio | Delta Lake | Apache Iceberg | Apache Hudi |
|---|---|---|---|
| Creador | Databricks | Netflix/Apple | Uber |
| Transaccions ACID | Si | Si | Si |
| Time Travel | Si | Si | Si |
| Schema evolution | Si | Si | Si |
| Hidden partitioning | No | Si | No |
| Partition evolution | Limitada | Si | Si |
| Row-level deletes eficients | Reescriu fitxer | Delete files | Si (COW i MOR) |
| Integracio Spark | Excellent | Bona | Bona |
| Integracio Flink | Bona | Excel·lent | Excel·lent |
| Suport AWS Athena | Si (a traves Lake Formation) | Si (natiu) | Si |
| Suport Snowflake | No | Si (natiu) | No |
| Maduresa de l'ecosistema | Alta | Alta | Mitjana |
Taula resum de tots els formats
| Format | Orientacio | Tipus | Schema | Transaccions | Time Travel | Cas d'us principal |
|---|---|---|---|---|---|---|
| CSV | Fila | Cap | No | No | No | Intercanvi simple |
| JSON | Jerarquic | Parcial | No | No | No | APIs, configuracio |
| NDJSON | Fila (streaming) | Parcial | No | No | No | Logs, events, Kafka |
| Parquet | Columnar | Si | Si (static) | No | No | Analitica al data lake |
| ORC | Columnar | Si | Si (static) | No | No | Hive, Hadoop classic |
| Avro | Fila (streaming) | Si | Si (evolutiu) | No | No | Kafka, RPC, serialitzacio |
| Delta Lake | Columnar + log | Si | Si (evolutiu) | ACID | Si | Data Lakehouse, CDC |
| Iceberg | Columnar + log | Si | Si (evolutiu) | ACID | Si | Multi-engine Lakehouse |
AC5074/06/03 — Miniactivitat
Una plataforma de pagaments processa 2 milions de transaccions diaries i necessita:
- Auditar tots els canvis en les transaccions dels últims 30 dies (qui va modificar que i quan).
- Fer correccions massives d'errors (canviar l'estat de 50.000 transaccions d'un dia concret).
- Garantir que si un procés batch falla a mig camí, les dades no queden en un estat inconsistent.
- Permetre que els analistes de dades consultin l'estat de les transaccions tal com eren ahir al migdia per reconciliació.
Redacta un document tècnic (400-600 paraules) que:
- Justifiqui per que Delta Lake és la millor opció per a aquest cas d'ús (respecte a CSV, Parquet pur i Avro).
- Identifiqui quines característiques concretes de Delta Lake s'usarien per a cada requisit: Time Travel, MERGE, schema enforcement, VACUUM.
- Proposi una estrategia de particionament i retencio de versions (quant temps mantenir l'historial amb VACUUM).
- Descrigui breument si Iceberg podria ser una alternativa viable i per que.