Salta el contingut

Delta Lake

Introducció

Delta Lake afegeix transaccions ACID, time travel i schema evolution sobre fitxers Parquet. És la tecnologia clau del paradigma Lakehouse: combina l'escalabilitat i el baix cost d'un Data Lake amb les garanties transaccionals d'un Data Warehouse.

Sense Delta Lake, un Data Lake basat en fitxers Parquet té problemes fonamentals: - No hi ha transaccions: si un job falla a meitat d'escriptura, les dades queden en un estat inconsistent - No hi ha updates ni deletes: Parquet és immutable; modificar un registre requereix reescriure el fitxer sencer - No hi ha control de concurrència: dos jobs escrivint simultàniament poden corrompre les dades

Delta Lake resol tots aquests problemes mantenint un transaction log (un directori _delta_log/) que registra cada operació com una entrada JSON.


Configuració de Delta Lake

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp

spark = SparkSession.builder \
    .appName("DeltaLakeDemo") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

Crear i llegir taules Delta

RUTA_DELTA = "/dades/delta/clients"

# Escriure dades en format Delta
df_clients = spark.createDataFrame([
    (1, "Anna Garcia",  "anna@email.com",  "Barcelona"),
    (2, "Joan Puig",    "joan@email.com",  "Girona"),
    (3, "Maria Costa",  "maria@email.com", "Tarragona"),
], ["id_client", "nom", "email", "ciutat"])

df_clients.write \
    .format("delta") \
    .mode("overwrite") \
    .save(RUTA_DELTA)

# Llegir una taula Delta
df = spark.read.format("delta").load(RUTA_DELTA)
df.show()

Operacions ACID

Update i Delete

delta_table = DeltaTable.forPath(spark, RUTA_DELTA)

# Actualitzar un registre
delta_table.update(
    condition=col("id_client") == 2,
    set={
        "email":  "'joan.puig@noudomini.com'",
        "ciutat": "'Lleida'"
    }
)

# Eliminar un registre
delta_table.delete(condition=col("id_client") == 3)

MERGE (Upsert)

L'operació MERGE és la més poderosa de Delta Lake. Permet inserir nous registres, actualitzar els existents i eliminar els obsolets en una sola operació atòmica:

df_nous_clients = spark.createDataFrame([
    (2, "Joan Puig Actualitzat", "joan.nou@email.com", "Lleida"),  # existeix, actualitzar
    (4, "Pere Vidal",            "pere@email.com",     "Manresa"), # nou, inserir
], ["id_client", "nom", "email", "ciutat"])

delta_table.alias("original") \
    .merge(
        df_nous_clients.alias("nous"),
        "original.id_client = nous.id_client"
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

MERGE amb lògica personalitzada

# MERGE amb condicions específiques
delta_table.alias("dest") \
    .merge(
        df_incremental.alias("src"),
        "dest.id = src.id"
    ) \
    .whenMatchedUpdate(
        condition="src.data_actualitzacio > dest.data_actualitzacio",
        set={
            "nom":               "src.nom",
            "email":             "src.email",
            "data_actualitzacio": "src.data_actualitzacio"
        }
    ) \
    .whenNotMatchedInsert(
        condition="src.actiu = true",
        values={
            "id":                "src.id",
            "nom":               "src.nom",
            "email":             "src.email",
            "data_actualitzacio": "src.data_actualitzacio",
            "actiu":             "src.actiu"
        }
    ) \
    .whenMatchedDelete(condition="src.actiu = false") \
    .execute()

Time Travel: llegir versions anteriors

Delta Lake registra totes les versions de la taula. Pots llegir qualsevol versió anterior o tornar a un estat anterior:

# Llegir la versió original (versió 0)
df_v0 = spark.read \
    .format("delta") \
    .option("versionAsOf", 0) \
    .load(RUTA_DELTA)

print("Versió original:")
df_v0.show()

# Llegir per timestamp (l'estat de la taula fa 2 hores)
df_ahir = spark.read \
    .format("delta") \
    .option("timestampAsOf", "2024-11-01 12:00:00") \
    .load(RUTA_DELTA)

# Restaurar la taula a una versió anterior
DeltaTable.restoreToVersion(spark, RUTA_DELTA, version=0)

Historial i manteniment

# Veure el log complet d'operacions
delta_table.history().show(truncate=False)
# +-------+------------------+----------+----------+-------------+...
# |version|timestamp         |userId    |operation |operationPar.|
# +-------+------------------+----------+----------+-------------+
# |3      |2024-11-15 10:35..|spark     |MERGE     |...          |
# |2      |2024-11-15 10:34..|spark     |DELETE    |...          |
# |1      |2024-11-15 10:33..|spark     |UPDATE    |...          |
# |0      |2024-11-15 10:32..|spark     |WRITE     |...          |

# Compactar fitxers petits (Z-ordering per a cerca eficient)
delta_table.optimize().executeZOrderBy("id_client")

# Eliminar versions antigues (mantenir les darreres 7 dies = 168 hores)
delta_table.vacuum(168)

vacuum i time travel

Si executes vacuum(0), perds la capacitat de time travel ja que s'eliminen tots els fitxers antics. Deixa sempre un marge raonable (7 dies per defecte és l'estàndard).


Schema Evolution

Delta Lake permet afegir noves columnes sense reescriure les dades existents:

# Afegir una nova columna al DataFrame
df_amb_telefon = df_nous.withColumn("telefon", lit(None).cast(StringType()))

# Escriure amb mergeSchema per permetre l'evolució de l'esquema
df_amb_telefon.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save(RUTA_DELTA)

# Les files antigues tindran NULL a la nova columna 'telefon'
spark.read.format("delta").load(RUTA_DELTA).show()

Delta Lake amb Spark SQL

Delta Lake s'integra completament amb Spark SQL, permetent crear taules gestionades:

# Crear una taula Delta gestionada (amb metastore)
spark.sql("""
    CREATE TABLE IF NOT EXISTS vendes_delta (
        id_comanda   INT,
        data_comanda DATE,
        id_client    INT,
        producte     STRING,
        import_total DOUBLE
    )
    USING DELTA
    PARTITIONED BY (data_comanda)
    LOCATION '/dades/delta/vendes'
""")

# Inserir, actualitzar i consultar amb SQL pur
spark.sql("INSERT INTO vendes_delta VALUES (1001, '2024-11-01', 42, 'Portàtil', 999.99)")

spark.sql("""
    MERGE INTO vendes_delta AS dest
    USING nous_registres AS src ON dest.id_comanda = src.id_comanda
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

# Time travel des de SQL
spark.sql("SELECT * FROM vendes_delta VERSION AS OF 2").show()
spark.sql("SELECT * FROM vendes_delta TIMESTAMP AS OF '2024-11-01'").show()

# Veure historial des de SQL
spark.sql("DESCRIBE HISTORY vendes_delta").show(truncate=False)

Miniactivitat: Simular un pipeline incremental

  1. Crea una taula Delta amb 100 clients ficticis.
  2. Simula una càrrega incremental: prepara un DataFrame amb 20 clients nous i 10 actualitzacions de clients existents.
  3. Usa MERGE per sincronitzar el DataFrame incremental amb la taula Delta.
  4. Verifica amb history() que l'operació ha quedat registrada.
  5. Usa time travel per recuperar l'estat de la taula abans del MERGE i compara el nombre de files.