DataFrames i Spark SQL
Introducció
El DataFrame és l'abstracció de Spark que ha substituït pràcticament als RDD en tots els casos d'ús. Afegeix l'esquema (noms i tipus de columnes) i usa el motor d'optimització Catalyst per generar plans d'execució eficients.
A diferència dels RDD, Spark "entén" l'estructura de les dades d'un DataFrame i pot aplicar optimitzacions automàtiques: eliminació de columnes innecessàries, pushdown de filtres al sistema de fitxers, reordenació d'operacions, etc.
Crear i llegir DataFrames
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, when, year, month
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
spark = SparkSession.builder \
.appName("ExempleDataFrame") \
.master("local[*]") \
.config("spark.sql.shuffle.partitions", "8") \
.getOrCreate()
# Llegir un CSV amb inferència automàtica d'esquema
df = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.option("encoding", "UTF-8") \
.csv("/dades/vendes_2024.csv")
# Veure l'esquema
df.printSchema()
# root
# |-- id_comanda: integer (nullable = true)
# |-- data_comanda: date (nullable = true)
# |-- id_client: integer (nullable = true)
# |-- producte: string (nullable = true)
# |-- categoria: string (nullable = true)
# |-- quantitat: integer (nullable = true)
# |-- preu_unitari: double (nullable = true)
# Estadístiques bàsiques
df.describe("quantitat", "preu_unitari").show()
Definir l'esquema explícitament
Per a millor rendiment i robustesa, definir l'esquema manualment és preferible a la inferència automàtica:
schema_vendes = StructType([
StructField("id_comanda", IntegerType(), nullable=False),
StructField("data_comanda", StringType(), nullable=True),
StructField("id_client", IntegerType(), nullable=True),
StructField("producte", StringType(), nullable=True),
StructField("categoria", StringType(), nullable=True),
StructField("quantitat", IntegerType(), nullable=True),
StructField("preu_unitari", DoubleType(), nullable=True),
])
df = spark.read \
.option("header", "true") \
.schema(schema_vendes) \
.csv("/dades/vendes_2024.csv")
Transformacions bàsiques
# Afegir columnes calculades
df_enriquit = df \
.withColumn("import_total", col("quantitat") * col("preu_unitari")) \
.withColumn("any_comanda", year(col("data_comanda"))) \
.withColumn("mes_comanda", month(col("data_comanda"))) \
.withColumn("segment_preu",
when(col("preu_unitari") < 50, "baix")
.when(col("preu_unitari") < 200, "mig")
.otherwise("alt")
)
# Filtrar, seleccionar i renomenar
df_net = df_enriquit \
.filter(col("import_total") > 0) \
.filter(col("any_comanda") == 2024) \
.select(
col("id_comanda"),
col("producte"),
col("categoria"),
col("import_total"),
col("segment_preu")
) \
.withColumnRenamed("import_total", "import_eur")
# Eliminar duplicats
df_deduplicat = df.dropDuplicates(["id_comanda"])
# Gestionar nuls
df_net = df \
.dropna(subset=["producte", "preu_unitari"]) \
.fillna({"quantitat": 1, "categoria": "Desconeguda"})
Guardar DataFrames
# Guardar en format Parquet, particionat per any i mes
df_enriquit.write \
.mode("overwrite") \
.partitionBy("any_comanda", "mes_comanda") \
.parquet("/resultats/vendes_enriquides/")
# Guardar com a CSV
df_enriquit.write \
.mode("overwrite") \
.option("header", "true") \
.csv("/resultats/vendes.csv")
# Guardar com a JSON
df_enriquit.coalesce(1).write \
.mode("overwrite") \
.json("/resultats/vendes.json")
Parquet és el format recomanat
Parquet és el format estàndard per a Big Data: comprimit per columnes, compatible amb particionament, i compatible amb Hive, Spark, Athena, Presto i DuckDB.
Spark SQL: consultes SQL sobre DataFrames
Spark SQL permet escriure consultes SQL directament sobre DataFrames registrats com a vistes temporals:
# Registrar el DataFrame com a vista SQL temporal
df_enriquit.createOrReplaceTempView("vendes")
# Fer consultes SQL directament
top_productes = spark.sql("""
SELECT
producte,
categoria,
COUNT(*) AS num_comandes,
ROUND(SUM(import_total), 2) AS facturacio_total,
ROUND(AVG(quantitat), 1) AS quantitat_mitja
FROM vendes
WHERE any_comanda = 2024
AND import_total > 0
GROUP BY producte, categoria
HAVING COUNT(*) >= 10
ORDER BY facturacio_total DESC
LIMIT 20
""")
top_productes.show(truncate=False)
Subqueries i funcions de finestra en SQL
# Subquery i window functions
spark.sql("""
SELECT
producte,
mes_comanda,
facturacio,
SUM(facturacio) OVER (
PARTITION BY producte
ORDER BY mes_comanda
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS facturacio_acumulada,
facturacio / SUM(facturacio) OVER (PARTITION BY producte) * 100
AS percentatge_anual
FROM (
SELECT producte, mes_comanda, SUM(import_total) AS facturacio
FROM vendes
WHERE any_comanda = 2024
GROUP BY producte, mes_comanda
) t
ORDER BY producte, mes_comanda
""").show(30)
L'optimitzador Catalyst i AQE
Catalyst
L'optimitzador Catalyst és un dels components més importants de Spark SQL. Quan escrius una consulta (en SQL o amb l'API DataFrame), Catalyst:
- Anàlisi: resol els noms de les columnes i les taules
- Optimització lògica: aplica regles d'optimització (eliminació de projeccions, predicat pushdown, eliminació de subqueries...)
- Pla físic: genera múltiples plans físics possibles i en tria el millor usant estadístiques
- Generació de codi: genera bytecode Java optimitzat (Tungsten engine)
# Veure el pla d'execució d'una consulta
query = spark.sql("""
SELECT c.nom_client, SUM(v.import_total) AS total_gastat
FROM vendes v
JOIN clients c ON v.id_client = c.id_client
WHERE v.any_comanda = 2024
GROUP BY c.nom_client
HAVING SUM(v.import_total) > 10000
""")
# Pla simplificat
query.explain()
# Pla complet amb tots els detalls
query.explain("extended")
# Pla en format visual (millor per a debugging)
query.explain("formatted")
Adaptive Query Execution (AQE)
Introduït a Spark 3.0 i millorat a 3.5, AQE permet que el pla d'execució s'adapti dinàmicament en temps d'execució basant-se en estadístiques reals de les dades:
# Habilitar AQE (ja activat per defecte a Spark 3.2+)
spark = SparkSession.builder \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.sql.adaptive.skewJoin.enabled", "true") \
.getOrCreate()
AQE resol automàticament tres problemes clàssics: - Skew joins: si una partició és molt més gran que les altres (dades desequilibrades), AQE la divideix automàticament - Particions buides: elimina les particions buides que Spark crearia per defecte - Canvi de join strategy: si una taula resulta ser petita (després de filtrar), AQE pot convertir un sort-merge join en un broadcast join
Miniactivitat: Comparar plans d'execució
Crea un DataFrame amb dades de vendes fictícies i escriu la mateixa consulta de dues maneres: amb l'API DataFrame i amb Spark SQL. Crida .explain("formatted") en ambdós casos i compara els plans d'execució generats.
Afegeix un filtre sobre una columna i observa com Catalyst aplica el predicat pushdown (el filtre apareix just sobre la lectura del fitxer, no al final).