Salta el contingut

Formats d'evolució: Avro i Delta Lake

En entorns de producció, els formats de dades no son estàtics. Els esquemes canvien: s'afegeixen camps nous, se n'eliminen d'obsolets, se'n canvien els tipus. Els sistemes que no gestionen bé aquests canvis generen errors en producció, pipelines trencats i pèrdua de dades. Apache Avro resol l'evolució d'esquema per a missatgeria i streaming, mentre que Delta Lake afegeix transaccions ACID i time travel sobre fitxers Parquet estàndard, transformant un Data Lake en un Data Lakehouse transaccional.


Apache Avro

Origen i disseny

Avro va néixer el 2009 com a part del projecte Hadoop d'Apache, dissenyat específicament per a la serialització de dades en sistemes distribuïts. A diferència de Parquet i ORC, Avro és un format row-oriented (per fila), optimitzat per a streaming i missatgeria, no per a analítica columnar.

Les característiques que el diferencien son:

  • Schema-driven: l'esquema és un document JSON que defineix tots els camps i els seus tipus. L'esquema pot viatjar amb les dades (embedded) o emmagatzemar-se en un Schema Registry central.
  • Format binari compacte: sense capçaleres de columna repetides, els fitxers son molt compactes.
  • Evolució d'esquema: mecanisme formal per a compatibilitat entre versions d'esquema.
  • Suport de tipus ric: boolean, int, long, float, double, bytes, string, record, enum, array, map, union, fixed.

Esquema Avro

Un esquema Avro es defineix en JSON. Per exemple, per a una transacció financera:

{
  "type": "record",
  "name": "Transaccio",
  "namespace": "cat.sapalomera.bigdata",
  "fields": [
    {"name": "id",      "type": "long"},
    {"name": "data",    "type": {"type": "int", "logicalType": "date"}},
    {"name": "origen",  "type": "string"},
    {"name": "desti",   "type": "string"},
    {"name": "import",  "type": "double"},
    {"name": "moneda",  "type": {"type": "enum", "name": "Moneda",
                                "symbols": ["EUR", "USD", "GBP", "JPY"]}},
    {"name": "estat",   "type": ["null", "string"], "default": null},
    {"name": "metadades", "type": {"type": "map", "values": "string"},
                          "default": {}}
  ]
}

Notes sobre el esquema:

  • El tipus ["null", "string"] és un union: el camp pot ser nul o un string. Avro obliga a declarar explícitament la possibilitat de nuls.
  • El "default": null indica el valor que s'usa si el camp no apareix en dades anteriors a l'afegir-lo (evolució backward compatible).
  • logicalType: "date" representa una data com a nombre de dies des de l'epoch Unix sobre un enter de 32 bits.

Serialitzar i deserialitzar amb fastavro

import fastavro
import io
from datetime import date

# Definir l'esquema com a dict Python
esquema_dict = {
    "type": "record",
    "name": "Transaccio",
    "fields": [
        {"name": "id",     "type": "long"},
        {"name": "origen", "type": "string"},
        {"name": "desti",  "type": "string"},
        {"name": "import", "type": "double"},
        {"name": "moneda", "type": "string"},
        {"name": "estat",  "type": ["null", "string"], "default": None},
    ]
}

esquema_parsejat = fastavro.parse_schema(esquema_dict)

# Dades a serialitzar
registres = [
    {"id": 1, "origen": "ES12 3456", "desti": "DE89 3704",
     "import": 1500.0, "moneda": "EUR", "estat": "COMPLETADA"},
    {"id": 2, "origen": "ES12 3456", "desti": "US12 3456",
     "import": 200.5, "moneda": "USD", "estat": None},
]

# Escriure a un fitxer Avro
with open('transaccions.avro', 'wb') as f:
    fastavro.writer(f, esquema_parsejat, registres)

# Llegir des d'un fitxer Avro
with open('transaccions.avro', 'rb') as f:
    lector = fastavro.reader(f)
    print(f"Esquema escriptor: {lector.writer_schema['name']}")
    for registre in lector:
        print(registre)

Compatibilitat d'esquema

L'evolució d'esquema en Avro es regeix per regles de compatibilitat formals:

Compatibilitat backward (nous lectors, dades velles): el nou esquema pot llegir dades escrites amb l'esquema antic.

  • Es pot afegir un camp amb valor per defecte: els lectors nous usaran el defecte per als registres antics que no en tinguin.
  • Es pot eliminar un camp amb valor per defecte: els lectors nous ignoraran els camps extres en registres antics.

Compatibilitat forward (lectors vells, dades noves): l'esquema antic pot llegir dades escrites amb el nou esquema.

  • Es pot afegir un camp amb valor per defecte: els lectors vells ignoraran el camp nou.
  • Es pot eliminar un camp: els lectors vells usaran el valor per defecte per als camps absents.

Compatibilitat completa (full): compleix alhora backward i forward. El cas mes restrictiu i recomanat en sistemes Kafka amb Schema Registry.

Canvi Backward Forward Full
Afegir camp amb defecte Si Si Si
Afegir camp sense defecte No Si No
Eliminar camp amb defecte Si No No
Canviar tipus incompatible No No No
Renomenar camp No No No

Kafka i Schema Registry

En pipelines Kafka amb Confluent Schema Registry, cada missatge porta un identificador d'esquema de 4 bytes al principi del payload. El consumidor usa aquest ID per recuperar l'esquema exacte del Registry i deserialitzar el missatge. Sense Schema Registry, caldria que cada missatge portés l'esquema complet, cosa que infla enormement la mida dels missatges.

Quan usar Avro

  • Missatgeria Kafka: és el format de serialització mes usat en ecosistemes Kafka, especialment amb Confluent Schema Registry.
  • RPC i serialització de xarxa: quan cal serialitzar objectes per enviar-los entre serveis.
  • Pipelines de streaming on l'esquema evoluciona i cal garantir compatibilitat.
  • Fitxers de dades en Hadoop: per a casos on es necessita streaming de files (no columnar).

Delta Lake

Que és Delta Lake

Delta Lake és una capa d'emmagatzematge open-source que afegeix funcionalitats transaccionals sobre fitxers Parquet estàndard. Creat per Databricks i donat a l'Apache Software Foundation, resol el problema fonamental dels Data Lakes: la manca de garanties ACID sobre sistemes de fitxers d'objectes com S3, ADLS o GCS.

La clau de Delta Lake és el transaction log: un directori _delta_log/ al costat dels fitxers Parquet que conté un registre ordenat de totes les operacions sobre la taula.

taula_transaccions/
├── _delta_log/
│   ├── 00000000000000000000.json   (versio 0: CREATE TABLE)
│   ├── 00000000000000000001.json   (versio 1: INSERT batch 1)
│   ├── 00000000000000000002.json   (versio 2: UPDATE registres)
│   ├── 00000000000000000003.json   (versio 3: DELETE registres)
│   └── 00000000000000000010.checkpoint.parquet  (checkpoint cada 10 commits)
├── part-00000-abc123.snappy.parquet
├── part-00001-def456.snappy.parquet
└── part-00002-ghi789.snappy.parquet

Propietats ACID en Delta Lake

Atomicitat: cada operació (INSERT, UPDATE, DELETE, MERGE) es registra com una transacció completa al log. Si falla a mig camí, la transacció no apareix al log i la taula queda intacta.

Consistencia: Delta Lake valida que les dades respecten l'esquema i les constraints definides abans d'escriure qualsevol fitxer.

Aïllament (Isolation): Delta Lake usa optimistic concurrency control. Cada escriptor llegeix la versió actual del log, escriu els fitxers Parquet i intenta afegir la seva entrada al log. Si un altre escriptor ha modificat el log entremig, la transacció és rebutjada i reintentada. Garanteix aïllament de tipus Serializable en la majoria dels casos.

Durabilitat: un cop el commit apareix al _delta_log/, és persistent. Els fitxers Parquet son immutables (mai es modifiquen, només s'afegeixen nous i el log marca els vells com a eliminats lògicament).

Time Travel

Time Travel permet llegir qualsevol versió anterior d'una taula Delta Lake, ja que el log preserva totes les versions.

from delta import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("delta-time-travel") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

ruta = "taula_transaccions/"

# Llegir la versio actual
df_actual = spark.read.format("delta").load(ruta)

# Llegir una versio anterior per numero de versio
df_v5 = spark.read.format("delta") \
    .option("versionAsOf", 5) \
    .load(ruta)

# Llegir una versio anterior per timestamp
df_ahir = spark.read.format("delta") \
    .option("timestampAsOf", "2024-03-14 00:00:00") \
    .load(ruta)

# Veure l'historial de canvis
delta_taula = DeltaTable.forPath(spark, ruta)
historial = delta_taula.history()
historial.select("version", "timestamp", "operation", "operationParameters").show()

Casos d'us del Time Travel:

  • Auditoria: qui va modificar que i quan.
  • Rollback: si es detecta un error en un batch, tornar a la versio anterior.
  • Reproducibilitat: garantir que dos entrenaments de model ML usen exactament les mateixes dades.
  • Debug: investigar l'estat de la taula en un moment concret.

Schema enforcement i evolució

# Schema enforcement (activat per defecte):
# Si un DataFrame te columnes que no existeixen a la taula Delta, l'escriptura falla.
df_incorrecte.write.format("delta").mode("append").save(ruta)
# -> AnalysisException: A schema mismatch detected ...

# Schema evolution: permetre afegir columnes noves
df_amb_columna_nova.write.format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .save(ruta)

# Sobreescriure l'esquema complet (USE AMB PRECAUCIO)
df.write.format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .save(ruta)

VACUUM: netejar versions antigues

Delta Lake preserva tots els fitxers Parquet antics (els marcats com a eliminats al log) per a suportar Time Travel. VACUUM elimina els fitxers mes antics que un llindar de retencio:

# Eliminar fitxers mes antics de 7 dies (168 hores, valor per defecte)
delta_taula.vacuum()

# Eliminar fitxers mes antics de 30 dies
delta_taula.vacuum(retentionHours=720)

# ATENCIO: vacuum(0) elimina tot pero trenca el time travel.
# Delta Lake te un minim de 168h per seguretat.
# Per forcar valors inferiors (NO recomanat en produccio):
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
delta_taula.vacuum(retentionHours=0)

Z-Ordering: co-localitzar dades relacionades

Z-Ordering reordena fisicament els fitxers Parquet dins d'una taula Delta per co-localitzar files relacionades, millorant el predicate pushdown:

# Optimitzar i fer Z-ordering per les columnes mes usades en filtres
delta_taula.optimize().executeZOrderBy("moneda", "estat")

Despres del Z-Ordering, totes les files de moneda = 'EUR' tendiran a estar en els mateixos fitxers Parquet, minimitzant els fitxers que cal llegir per a un filtre per moneda.

MERGE / Upsert per a CDC

MERGE INTO permet fer upserts (insert si no existeix, update si existeix), essencial per a Change Data Capture (CDC):

from delta import DeltaTable

taula_delta = DeltaTable.forPath(spark, "taula_transaccions/")
df_actualitzacions = spark.createDataFrame([
    (1, "COMPLETADA"),   # actualitzar estat de la transaccio 1
    (999, "PENDENT"),    # inserir transaccio nova
], ["id", "nou_estat"])

taula_delta.alias("original") \
    .merge(
        df_actualitzacions.alias("actualitzacio"),
        "original.id = actualitzacio.id"
    ) \
    .whenMatchedUpdate(set={"estat": "actualitzacio.nou_estat"}) \
    .whenNotMatchedInsert(values={
        "id":    "actualitzacio.id",
        "estat": "actualitzacio.nou_estat",
    }) \
    .execute()

Apache Iceberg (breu)

Apache Iceberg és l'alternativa principal a Delta Lake, adoptada per AWS Athena, Snowflake, Dremio, Apache Flink i Apache Spark. Les seves característiques distintives respecte a Delta Lake son:

Hidden partitioning: Iceberg gestiona les particions internament. L'usuari no especifica la columna de particio en les queries; Iceberg la dedueix automàticament del filtre. Canviar l'estrategia de particio no requereix reescriure les dades.

Partition evolution: es pot canviar l'estrategia de particiÓ (p. ex., passar de particionar per mes a particionar per dia) sense reescriure les dades existents. Les noves particions usen la nova estrategia, les antigues mantenen l'anterior.

Row-level deletes sense reescriure fitxers: Iceberg pot marcar files com a eliminades en un fitxer separat (delete file) sense tocar el fitxer Parquet original. Delta Lake ha de reescriure el fitxer sencer per eliminar files.

Multi-engine: Iceberg esta dissenyat per a ser usat simultàniament per múltiples motors (Spark + Flink + Trino) sobre les mateixes dades, amb garanties d'aïllament.

Comparativa Delta Lake vs Iceberg vs Apache Hudi

Dimensio Delta Lake Apache Iceberg Apache Hudi
Creador Databricks Netflix/Apple Uber
Transaccions ACID Si Si Si
Time Travel Si Si Si
Schema evolution Si Si Si
Hidden partitioning No Si No
Partition evolution Limitada Si Si
Row-level deletes eficients Reescriu fitxer Delete files Si (COW i MOR)
Integracio Spark Excellent Bona Bona
Integracio Flink Bona Excel·lent Excel·lent
Suport AWS Athena Si (a traves Lake Formation) Si (natiu) Si
Suport Snowflake No Si (natiu) No
Maduresa de l'ecosistema Alta Alta Mitjana

Taula resum de tots els formats

Format Orientacio Tipus Schema Transaccions Time Travel Cas d'us principal
CSV Fila Cap No No No Intercanvi simple
JSON Jerarquic Parcial No No No APIs, configuracio
NDJSON Fila (streaming) Parcial No No No Logs, events, Kafka
Parquet Columnar Si Si (static) No No Analitica al data lake
ORC Columnar Si Si (static) No No Hive, Hadoop classic
Avro Fila (streaming) Si Si (evolutiu) No No Kafka, RPC, serialitzacio
Delta Lake Columnar + log Si Si (evolutiu) ACID Si Data Lakehouse, CDC
Iceberg Columnar + log Si Si (evolutiu) ACID Si Multi-engine Lakehouse

AC5074/06/03 — Miniactivitat

Una plataforma de pagaments processa 2 milions de transaccions diaries i necessita:

  • Auditar tots els canvis en les transaccions dels últims 30 dies (qui va modificar que i quan).
  • Fer correccions massives d'errors (canviar l'estat de 50.000 transaccions d'un dia concret).
  • Garantir que si un procés batch falla a mig camí, les dades no queden en un estat inconsistent.
  • Permetre que els analistes de dades consultin l'estat de les transaccions tal com eren ahir al migdia per reconciliació.

Redacta un document tècnic (400-600 paraules) que:

  1. Justifiqui per que Delta Lake és la millor opció per a aquest cas d'ús (respecte a CSV, Parquet pur i Avro).
  2. Identifiqui quines característiques concretes de Delta Lake s'usarien per a cada requisit: Time Travel, MERGE, schema enforcement, VACUUM.
  3. Proposi una estrategia de particionament i retencio de versions (quant temps mantenir l'historial amb VACUUM).
  4. Descrigui breument si Iceberg podria ser una alternativa viable i per que.