Salta el contingut

DataFrames i Spark SQL

Introducció

El DataFrame és l'abstracció de Spark que ha substituït pràcticament als RDD en tots els casos d'ús. Afegeix l'esquema (noms i tipus de columnes) i usa el motor d'optimització Catalyst per generar plans d'execució eficients.

A diferència dels RDD, Spark "entén" l'estructura de les dades d'un DataFrame i pot aplicar optimitzacions automàtiques: eliminació de columnes innecessàries, pushdown de filtres al sistema de fitxers, reordenació d'operacions, etc.


Crear i llegir DataFrames

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, when, year, month
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

spark = SparkSession.builder \
    .appName("ExempleDataFrame") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

# Llegir un CSV amb inferència automàtica d'esquema
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("encoding", "UTF-8") \
    .csv("/dades/vendes_2024.csv")

# Veure l'esquema
df.printSchema()
# root
#  |-- id_comanda: integer (nullable = true)
#  |-- data_comanda: date (nullable = true)
#  |-- id_client: integer (nullable = true)
#  |-- producte: string (nullable = true)
#  |-- categoria: string (nullable = true)
#  |-- quantitat: integer (nullable = true)
#  |-- preu_unitari: double (nullable = true)

# Estadístiques bàsiques
df.describe("quantitat", "preu_unitari").show()

Definir l'esquema explícitament

Per a millor rendiment i robustesa, definir l'esquema manualment és preferible a la inferència automàtica:

schema_vendes = StructType([
    StructField("id_comanda",   IntegerType(), nullable=False),
    StructField("data_comanda", StringType(),  nullable=True),
    StructField("id_client",    IntegerType(), nullable=True),
    StructField("producte",     StringType(),  nullable=True),
    StructField("categoria",    StringType(),  nullable=True),
    StructField("quantitat",    IntegerType(), nullable=True),
    StructField("preu_unitari", DoubleType(),  nullable=True),
])

df = spark.read \
    .option("header", "true") \
    .schema(schema_vendes) \
    .csv("/dades/vendes_2024.csv")

Transformacions bàsiques

# Afegir columnes calculades
df_enriquit = df \
    .withColumn("import_total", col("quantitat") * col("preu_unitari")) \
    .withColumn("any_comanda", year(col("data_comanda"))) \
    .withColumn("mes_comanda", month(col("data_comanda"))) \
    .withColumn("segment_preu",
        when(col("preu_unitari") < 50, "baix")
        .when(col("preu_unitari") < 200, "mig")
        .otherwise("alt")
    )

# Filtrar, seleccionar i renomenar
df_net = df_enriquit \
    .filter(col("import_total") > 0) \
    .filter(col("any_comanda") == 2024) \
    .select(
        col("id_comanda"),
        col("producte"),
        col("categoria"),
        col("import_total"),
        col("segment_preu")
    ) \
    .withColumnRenamed("import_total", "import_eur")

# Eliminar duplicats
df_deduplicat = df.dropDuplicates(["id_comanda"])

# Gestionar nuls
df_net = df \
    .dropna(subset=["producte", "preu_unitari"]) \
    .fillna({"quantitat": 1, "categoria": "Desconeguda"})

Guardar DataFrames

# Guardar en format Parquet, particionat per any i mes
df_enriquit.write \
    .mode("overwrite") \
    .partitionBy("any_comanda", "mes_comanda") \
    .parquet("/resultats/vendes_enriquides/")

# Guardar com a CSV
df_enriquit.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("/resultats/vendes.csv")

# Guardar com a JSON
df_enriquit.coalesce(1).write \
    .mode("overwrite") \
    .json("/resultats/vendes.json")

Parquet és el format recomanat

Parquet és el format estàndard per a Big Data: comprimit per columnes, compatible amb particionament, i compatible amb Hive, Spark, Athena, Presto i DuckDB.


Spark SQL: consultes SQL sobre DataFrames

Spark SQL permet escriure consultes SQL directament sobre DataFrames registrats com a vistes temporals:

# Registrar el DataFrame com a vista SQL temporal
df_enriquit.createOrReplaceTempView("vendes")

# Fer consultes SQL directament
top_productes = spark.sql("""
    SELECT
        producte,
        categoria,
        COUNT(*) AS num_comandes,
        ROUND(SUM(import_total), 2) AS facturacio_total,
        ROUND(AVG(quantitat), 1) AS quantitat_mitja
    FROM vendes
    WHERE any_comanda = 2024
      AND import_total > 0
    GROUP BY producte, categoria
    HAVING COUNT(*) >= 10
    ORDER BY facturacio_total DESC
    LIMIT 20
""")

top_productes.show(truncate=False)

Subqueries i funcions de finestra en SQL

# Subquery i window functions
spark.sql("""
    SELECT
        producte,
        mes_comanda,
        facturacio,
        SUM(facturacio) OVER (
            PARTITION BY producte
            ORDER BY mes_comanda
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS facturacio_acumulada,
        facturacio / SUM(facturacio) OVER (PARTITION BY producte) * 100
            AS percentatge_anual
    FROM (
        SELECT producte, mes_comanda, SUM(import_total) AS facturacio
        FROM vendes
        WHERE any_comanda = 2024
        GROUP BY producte, mes_comanda
    ) t
    ORDER BY producte, mes_comanda
""").show(30)

L'optimitzador Catalyst i AQE

Catalyst

L'optimitzador Catalyst és un dels components més importants de Spark SQL. Quan escrius una consulta (en SQL o amb l'API DataFrame), Catalyst:

  1. Anàlisi: resol els noms de les columnes i les taules
  2. Optimització lògica: aplica regles d'optimització (eliminació de projeccions, predicat pushdown, eliminació de subqueries...)
  3. Pla físic: genera múltiples plans físics possibles i en tria el millor usant estadístiques
  4. Generació de codi: genera bytecode Java optimitzat (Tungsten engine)
# Veure el pla d'execució d'una consulta
query = spark.sql("""
    SELECT c.nom_client, SUM(v.import_total) AS total_gastat
    FROM vendes v
    JOIN clients c ON v.id_client = c.id_client
    WHERE v.any_comanda = 2024
    GROUP BY c.nom_client
    HAVING SUM(v.import_total) > 10000
""")

# Pla simplificat
query.explain()

# Pla complet amb tots els detalls
query.explain("extended")

# Pla en format visual (millor per a debugging)
query.explain("formatted")

Adaptive Query Execution (AQE)

Introduït a Spark 3.0 i millorat a 3.5, AQE permet que el pla d'execució s'adapti dinàmicament en temps d'execució basant-se en estadístiques reals de les dades:

# Habilitar AQE (ja activat per defecte a Spark 3.2+)
spark = SparkSession.builder \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .getOrCreate()

AQE resol automàticament tres problemes clàssics: - Skew joins: si una partició és molt més gran que les altres (dades desequilibrades), AQE la divideix automàticament - Particions buides: elimina les particions buides que Spark crearia per defecte - Canvi de join strategy: si una taula resulta ser petita (després de filtrar), AQE pot convertir un sort-merge join en un broadcast join


Miniactivitat: Comparar plans d'execució

Crea un DataFrame amb dades de vendes fictícies i escriu la mateixa consulta de dues maneres: amb l'API DataFrame i amb Spark SQL. Crida .explain("formatted") en ambdós casos i compara els plans d'execució generats.

Afegeix un filtre sobre una columna i observa com Catalyst aplica el predicat pushdown (el filtre apareix just sobre la lectura del fitxer, no al final).