Salta el contingut

Windowing i Joins

Introducció

Les funcions de finestra (window functions) i els joins entre DataFrames són dues de les operacions més potents i freqüents en pipelines analítics amb Spark. Les funcions de finestra permeten càlculs sobre grups de files sense col·lapsar el resultat en una sola fila (a diferència del groupBy). Els joins permeten combinar múltiples fonts de dades per enriquir l'anàlisi.


Funcions de finestra (Window Functions)

Concepte de finestra

Una finestra (window) defineix un conjunt de files relacionades amb la fila actual, sobre les quals s'aplica una funció. El resultat s'afegeix com una nova columna, mantenint totes les files originals.

graph LR
    subgraph "groupBy + agg"
        A[10 files] -->|agregar| B[3 files\nResultat col·lapsat]
    end
    subgraph "Window Function"
        C[10 files] -->|calcular finestra| D[10 files\nResultat enriquit]
    end

Definir una finestra

from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import (
    row_number, rank, dense_rank,
    lag, lead,
    sum, avg, min, max, count,
    percent_rank, ntile, cume_dist,
    col, round as spark_round
)

spark = SparkSession.builder \
    .appName("WindowFunctions") \
    .master("local[*]") \
    .getOrCreate()

df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/dades/vendes_2024.csv")

# Definir una especificació de finestra: particionar per categoria, ordenar per import
window_categoria = Window \
    .partitionBy("categoria") \
    .orderBy(col("import_total").desc())

Funcions de rànquing

row_number, rank i dense_rank

df_ranking = df \
    .withColumn("row_num",    row_number().over(window_categoria)) \
    .withColumn("rank",       rank().over(window_categoria)) \
    .withColumn("dense_rank", dense_rank().over(window_categoria))

df_ranking.select("producte", "categoria", "import_total",
                  "row_num", "rank", "dense_rank") \
          .filter(col("categoria") == "Electrònica") \
          .show(10)

# Si dues files empaten en import:
# - row_number: sempre únic (1, 2, 3, 4...)
# - rank: salta números (1, 1, 3, 4...)      → 'deixa forat'
# - dense_rank: no salta (1, 1, 2, 3...)     → 'sense forat'

Filtrar el top N per grup

Un patró molt freqüent: obtenir els 3 productes més venuts per categoria:

# Top 3 productes per categoria
top3_per_categoria = df \
    .withColumn("rk", dense_rank().over(window_categoria)) \
    .filter(col("rk") <= 3) \
    .select("categoria", "producte", "import_total", "rk") \
    .orderBy("categoria", "rk")

top3_per_categoria.show(20)

percent_rank, ntile i cume_dist

window_global = Window.orderBy(col("import_total").desc())

df_percentils = df \
    .withColumn("percentil",   spark_round(percent_rank().over(window_global) * 100, 1)) \
    .withColumn("quartil",     ntile(4).over(window_global)) \
    .withColumn("cume_dist",   spark_round(cume_dist().over(window_global), 4))

df_percentils.select("producte", "import_total", "percentil", "quartil", "cume_dist").show(10)

Funcions d'accés a files adjacents: lag i lead

lag i lead permeten accedir al valor d'una fila anterior o posterior dins de la finestra:

window_temps = Window \
    .partitionBy("producte") \
    .orderBy("mes_comanda")

df_tendencia = df \
    .groupBy("producte", "mes_comanda") \
    .agg(sum("import_total").alias("facturacio_mes")) \
    .withColumn("facturacio_mes_anterior", lag("facturacio_mes", 1).over(window_temps)) \
    .withColumn("facturacio_mes_seguent",  lead("facturacio_mes", 1).over(window_temps)) \
    .withColumn("variacio_pct",
        spark_round(
            (col("facturacio_mes") - col("facturacio_mes_anterior"))
            / col("facturacio_mes_anterior") * 100,
            2
        )
    )

df_tendencia.show(20)

Funcions d'agregació sobre finestres

Les funcions d'agregació (sum, avg, min, max) sobre finestres permeten calcular acumulats i mitjanes mòbils:

# Finestra acumulada (des del principi fins a la fila actual)
window_acumulat = Window \
    .partitionBy("producte") \
    .orderBy("mes_comanda") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Finestra de 3 mesos (mes anterior, actual, mes seguent)
window_rolling = Window \
    .partitionBy("producte") \
    .orderBy("mes_comanda") \
    .rowsBetween(-1, 1)

df_acumulats = df_tendencia \
    .withColumn("facturacio_acumulada",
                sum("facturacio_mes").over(window_acumulat)) \
    .withColumn("mitjana_movil_3m",
                spark_round(avg("facturacio_mes").over(window_rolling), 2)) \
    .withColumn("maxim_historic",
                max("facturacio_mes").over(window_acumulat))

df_acumulats.select(
    "producte", "mes_comanda", "facturacio_mes",
    "facturacio_acumulada", "mitjana_movil_3m"
).show(24)

Joins entre DataFrames

Tipus de join

df_vendes  = spark.read.parquet("/dades/vendes/")
df_clients = spark.read.parquet("/dades/clients/")
df_productes = spark.read.parquet("/dades/productes/")

# Inner join: només files que coincideixen en ambdós DataFrames
df_inner = df_vendes.join(df_clients, on="id_client", how="inner")

# Left join: totes les vendes, els clients que no existeixin seran NULL
df_left = df_vendes.join(df_clients, on="id_client", how="left")

# Right join: tots els clients, inclosos els que no han comprat mai
df_right = df_vendes.join(df_clients, on="id_client", how="right")

# Full outer join: totes les files d'ambdós costats
df_full = df_vendes.join(df_clients, on="id_client", how="full")

# Left anti join: vendes de clients que NO estan a la taula de clients
df_anti = df_vendes.join(df_clients, on="id_client", how="left_anti")

# Left semi join: vendes de clients que SÍ estan a la taula (equivalent a EXISTS)
df_semi = df_vendes.join(df_clients, on="id_client", how="left_semi")

Join amb múltiples condicions

# Join amb condició composta
df_complet = df_vendes \
    .join(
        df_productes,
        (df_vendes["id_producte"] == df_productes["id"]) &
        (df_vendes["versio"]      == df_productes["versio_actual"]),
        how="left"
    ) \
    .select(
        df_vendes["id_comanda"],
        df_vendes["import_total"],
        df_productes["nom_producte"],
        df_productes["categoria"]
    )

Broadcast Join: optimitzar joins amb taules petites

Quan un dels DataFrames és petit (menys de 100–200 MB), el broadcast join és molt més eficient que el sort-merge join per defecte: Spark envia una còpia de la taula petita a cada executor, evitant el shuffle de la taula gran.

from pyspark.sql.functions import broadcast

# Forçar broadcast join (Spark ho fa automàticament amb AQE si la taula és prou petita)
df_resultat = df_vendes_gran \
    .join(
        broadcast(df_categories_petit),
        on="id_categoria",
        how="left"
    )

# Configurar el llindar per a broadcast automàtic (per defecte: 10 MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))  # 100 MB

Quan usar broadcast?

  • Taula de dimensió (categories, països, productes) unida a una taula de fets gran
  • Diccionaris o taules de lookup amb centenars de milers de files, no milions

Skew Joins: dades desequilibrades

El data skew (dades desequilibrades) és quan un valor de la clau de join apareix en moltes més files que els altres (p. ex., un client que té el 80% de les comandes). Provoca que un executor es sobrecarregui mentre els altres esperen.

# Diagnosi: comptar files per valor de la clau
df_vendes.groupBy("id_client").count() \
    .orderBy(col("count").desc()) \
    .show(10)

# Solució 1: AQE (automàtic a Spark 3.x)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")

# Solució 2: Salt (afegir un número aleatori a la clau per distribuir la càrrega)
from pyspark.sql.functions import concat_ws, lit, floor, rand

SALT_FACTOR = 10

df_vendes_salted = df_vendes \
    .withColumn("salt", (rand() * SALT_FACTOR).cast("int")) \
    .withColumn("id_client_salted", concat_ws("_", col("id_client"), col("salt")))

df_clients_salted = df_clients \
    .withColumn("salt_range",
        explode(array([lit(i) for i in range(SALT_FACTOR)]))
    ) \
    .withColumn("id_client_salted",
        concat_ws("_", col("id_client"), col("salt_range"))
    )

df_resultat = df_vendes_salted \
    .join(df_clients_salted, on="id_client_salted", how="left") \
    .drop("salt", "id_client_salted", "salt_range")

Join entre stream i batch

En Structured Streaming, és possible fer joins entre un stream i un DataFrame estàtic:

# Taula de referència (batch)
df_productes_ref = spark.read.parquet("/dades/productes/")

# Stream d'events
df_events = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "events") \
    .load()

# Join stream + estàtic: s'actualitza automàticament a cada micro-batch
df_enriquit = df_events \
    .join(broadcast(df_productes_ref), on="id_producte", how="left")

df_enriquit.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/events_enriquits/") \
    .start("/dades/delta/events_enriquits/")

Miniactivitat: Anàlisi de cohorts

Una anàlisi de cohorts agrupa clients pel mes de la seva primera compra i segueix el seu comportament al llarg del temps.

  1. Carrega un DataFrame de vendes amb id_client, data_comanda, import_total.
  2. Usa una window function per trobar la data_primera_compra de cada client (min sobre una finestra particionada per id_client).
  3. Calcula el mes_cohort (mes de la primera compra) i el mes_relatiu (quants mesos han passat des de la primera compra).
  4. Agrega: per a cada cohort i mes relatiu, calcula el nombre de clients actius i la facturació.
  5. Fes un pivot per mes relatiu i visualitza la retenció per cohort.