Salta el contingut

Apache Spark

Introducció

Apache Spark va néixer el 2009 al laboratori AMPLab de la Universitat de Berkeley com a resposta a les limitacions de MapReduce. La idea central era deceptivament simple: i si en lloc d'escriure i llegir d'HDFS entre cada pas del pipeline, mantinguéssim les dades en memòria RAM? El resultat va ser un sistema de processament distribuït fins a 100 vegades més ràpid que MapReduce per a algoritmes iteratius.

El 2025, Apache Spark 3.5 és l'estàndard de facto per al processament de Big Data. El 90% de les ofertes de feina de Data Engineer mencionen Spark. Databricks, la empresa fundada pels creadors de Spark, va assolir una valoració de 43.000 milions de dòlars. PySpark (l'API Python de Spark) domina àmpliament sobre Scala.


Arquitectura de Spark

graph TB
    subgraph Client
        DR[Driver Program\n- SparkContext/SparkSession\n- DAG Scheduler\n- Task Scheduler]
    end

    subgraph ClusterManager["Cluster Manager (YARN / Kubernetes / Standalone)"]
        CM[Gestor de recursos\nAssigna contenidors]
    end

    subgraph Workers
        subgraph Worker1["Worker Node 1"]
            EX1[Executor 1\n- Tasks\n- Memoria cache\n- Disk spill]
        end
        subgraph Worker2["Worker Node 2"]
            EX2[Executor 2\n- Tasks\n- Memoria cache\n- Disk spill]
        end
        subgraph Worker3["Worker Node 3"]
            EX3[Executor 3\n- Tasks\n- Memoria cache\n- Disk spill]
        end
    end

    DR -->|demana recursos| CM
    CM -->|assigna| EX1
    CM -->|assigna| EX2
    CM -->|assigna| EX3
    DR -->|envia tasks| EX1
    DR -->|envia tasks| EX2
    DR -->|envia tasks| EX3
    EX1 -->|resultat final| DR

Driver: el procés principal de l'aplicació Spark. Crea la SparkSession, construeix el pla d'execució (DAG) i coordina els executors. Si el Driver mor, l'aplicació falla.

Executors: els processos que executen les tasques reals. Cada executor té una memòria assignada i un conjunt de CPU cores. Llegeixen i escriuen les dades, realitzen les transformacions i comuniquen els resultats al Driver.

Cluster Manager: el sistema que gestiona els recursos del clúster. Spark suporta YARN (Hadoop), Kubernetes, Mesos i el seu propi Standalone mode.


RDD, DataFrame i Dataset

RDD: Resilient Distributed Dataset

L'RDD és la estructura de dades primitiva de Spark. Un RDD és una col·lecció distribuïda i tolerant a fallades de registres que pot ser processada en paral·lel.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ExempleRDD") \
    .master("local[*]") \
    .getOrCreate()

sc = spark.sparkContext

# Crear un RDD des d'una llista local
nombres = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], numSlices=3)

# Transformacions (lazy - no s'executen fins a una acció)
parells = nombres.filter(lambda x: x % 2 == 0)    # [2, 4, 6, 8, 10]
quadrats = parells.map(lambda x: x ** 2)            # [4, 16, 36, 64, 100]

# Acció (dispara l'execució)
resultat = quadrats.collect()   # porta les dades al Driver
print(resultat)  # [4, 16, 36, 64, 100]

# Agregació
total = nombres.reduce(lambda a, b: a + b)   # 55
print(f"Suma: {total}")

# RDD de parells (key, value) per a operacions de group
vendes_rdd = sc.parallelize([
    ("Barcelona", 1500), ("Madrid", 2200), ("Barcelona", 800),
    ("Valencia", 950), ("Madrid", 1100), ("Valencia", 750)
])

vendes_per_ciutat = vendes_rdd \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], ascending=False)

print(vendes_per_ciutat.collect())
# [('Madrid', 3300), ('Barcelona', 2300), ('Valencia', 1700)]

Limitació dels RDD: sense informació de l'esquema, Spark no pot optimitzar l'execució. Tot s'executa com a codi Python (lent per la serialització Python ↔ JVM) i no hi ha inspecció del contingut.

DataFrame: la revolució de Spark 2.0

El DataFrame és l'abstracció de Spark que ha substituït pràcticament als RDD en tots els casos d'ús. Afegeix l'esquema (noms i tipus de columnes) i usa el motor d'optimització Catalyst per generar plans d'execució eficients.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, when, year, month
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
import datetime

spark = SparkSession.builder \
    .appName("ExempleDataFrame") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

# Llegir un CSV amb inferència automàtica d'esquema
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("encoding", "UTF-8") \
    .csv("/dades/vendes_2024.csv")

# Veure l'esquema
df.printSchema()
# root
#  |-- id_comanda: integer (nullable = true)
#  |-- data_comanda: date (nullable = true)
#  |-- id_client: integer (nullable = true)
#  |-- producte: string (nullable = true)
#  |-- categoria: string (nullable = true)
#  |-- quantitat: integer (nullable = true)
#  |-- preu_unitari: double (nullable = true)

# Estadístiques bàsiques
df.describe("quantitat", "preu_unitari").show()

# Transformació: afegir columna calculada
df_enriquit = df \
    .withColumn("import_total", col("quantitat") * col("preu_unitari")) \
    .withColumn("any_comanda", year(col("data_comanda"))) \
    .withColumn("mes_comanda", month(col("data_comanda"))) \
    .withColumn("segment_preu",
        when(col("preu_unitari") < 50, "baix")
        .when(col("preu_unitari") < 200, "mig")
        .otherwise("alt")
    )

# Agregació: vendes per categoria i mes
vendes_mensuals = df_enriquit \
    .groupBy("any_comanda", "mes_comanda", "categoria") \
    .agg(
        count("id_comanda").alias("num_comandes"),
        sum("quantitat").alias("unitats_venudes"),
        sum("import_total").alias("facturacio"),
        avg("preu_unitari").alias("preu_mig")
    ) \
    .orderBy("any_comanda", "mes_comanda", col("facturacio").desc())

vendes_mensuals.show(20)

# Join entre dos DataFrames
df_clients = spark.read.parquet("/dades/clients/")

df_complet = df_enriquit \
    .join(df_clients, on="id_client", how="left") \
    .select(
        "id_comanda", "data_comanda",
        "nom_client", "pais_client",
        "producte", "categoria",
        "import_total"
    )

# Guardar en format Parquet, particionat per any i mes
df_enriquit.write \
    .mode("overwrite") \
    .partitionBy("any_comanda", "mes_comanda") \
    .parquet("/resultats/vendes_enriquides/")

Spark SQL: SQL natiu en Spark

# Registrar el DataFrame com a vista SQL temporal
df_enriquit.createOrReplaceTempView("vendes")

# Fer consultes SQL directament
top_productes = spark.sql("""
    SELECT
        producte,
        categoria,
        COUNT(*) AS num_comandes,
        ROUND(SUM(import_total), 2) AS facturacio_total,
        ROUND(AVG(quantitat), 1) AS quantitat_mitja
    FROM vendes
    WHERE any_comanda = 2024
      AND import_total > 0
    GROUP BY producte, categoria
    HAVING COUNT(*) >= 10
    ORDER BY facturacio_total DESC
    LIMIT 20
""")

top_productes.show(truncate=False)

# Subquery i window functions
spark.sql("""
    SELECT
        producte,
        mes_comanda,
        facturacio,
        SUM(facturacio) OVER (
            PARTITION BY producte
            ORDER BY mes_comanda
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS facturacio_acumulada,
        facturacio / SUM(facturacio) OVER (PARTITION BY producte) * 100
            AS percentatge_anual
    FROM (
        SELECT producte, mes_comanda, SUM(import_total) AS facturacio
        FROM vendes
        WHERE any_comanda = 2024
        GROUP BY producte, mes_comanda
    ) t
    ORDER BY producte, mes_comanda
""").show(30)

L'optimitzador Catalyst i AQE

Catalyst

L'optimitzador Catalyst és un dels components més importants de Spark SQL. Quan escrius una consulta (en SQL o amb l'API DataFrame), Catalyst:

  1. Analisi: resol els noms de les columnes i les taules
  2. Optimització lògica: aplica regles d'optimització (eliminació de projeccions, predicat pushdown, eliminació de subqueries...)
  3. Pla físic: genera múltiples plans físics possibles i en tria el millor usant estadístiques
  4. Generació de codi: genera bytecode Java optimitzat (Tungsten engine)
# Veure el pla d'execució d'una consulta
query = spark.sql("""
    SELECT c.nom_client, SUM(v.import_total) AS total_gastat
    FROM vendes v
    JOIN clients c ON v.id_client = c.id_client
    WHERE v.any_comanda = 2024
    GROUP BY c.nom_client
    HAVING SUM(v.import_total) > 10000
""")

# Pla simplificat
query.explain()

# Pla complet amb tots els detalls
query.explain("extended")

# Pla en format visual (millor per a debugging)
query.explain("formatted")

Adaptive Query Execution (AQE)

Introduït a Spark 3.0 i millorat a 3.5, AQE permet que el pla d'execució s'adapti dinàmicament en temps d'execució basant-se en estadístiques reals de les dades:

# Habilitar AQE (ja activat per defecte a Spark 3.2+)
spark = SparkSession.builder \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .getOrCreate()

AQE resol automàticament tres problemes clàssics: - Skew joins: si una partició és molt més gran que les altres (dades desequilibrades), AQE la divideix automàticament - Particions buides: elimina les particions buides que Spark crearia per defecte - Canvi de join strategy: si una taula resulta ser petita (després de filtrar), AQE pot convertir un sort-merge join en un broadcast join


Structured Streaming

Spark Structured Streaming és l'API de processament de dades en streaming de Spark. En lloc de pensar en "streams de dades", Structured Streaming et permet pensar en una taula que creix contínuament:

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, count, sum, col
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("StreamingVendes") \
    .getOrCreate()

# Definir l'esquema del stream
schema_vendes = StructType() \
    .add("timestamp_event", TimestampType()) \
    .add("id_usuari", StringType()) \
    .add("producte", StringType()) \
    .add("preu", DoubleType()) \
    .add("pais", StringType())

# Llegir des de Kafka (stream continu)
df_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "vendes") \
    .option("startingOffsets", "latest") \
    .load() \
    .selectExpr("CAST(value AS STRING) AS json_value") \
    .select(
        col("json_value").cast("string")
    )

# Alternativa: llegir des d'un directori (útil per a proves)
df_stream_fitxers = spark \
    .readStream \
    .option("maxFilesPerTrigger", 10) \
    .schema(schema_vendes) \
    .json("/dades/streaming/vendes/")

# Processar: agregació per finestra de temps i país
vendes_per_finestra = df_stream_fitxers \
    .withWatermark("timestamp_event", "10 minutes") \
    .groupBy(
        window(col("timestamp_event"), "5 minutes", "1 minute"),
        col("pais")
    ) \
    .agg(
        count("*").alias("num_transaccions"),
        sum("preu").alias("import_total")
    )

# Escriure els resultats
query = vendes_per_finestra \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime="30 seconds") \
    .start()

query.awaitTermination()

Spark MLlib

MLlib és la biblioteca de Machine Learning distribuïda de Spark. La seva Pipeline API segueix el patró de scikit-learn:

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Exemple: classificació de ressenyes de productes
df_ressenyes = spark.createDataFrame([
    ("El producte és excel·lent, molt recomanable!", "positiu"),
    ("Terribles. No compreu mai aquest producte.", "negatiu"),
    ("Qualitat mediocre, no val el preu que demanen.", "negatiu"),
    ("Impressionant! Supera totes les expectatives.", "positiu"),
], ["text", "etiqueta"])

# Pipeline de NLP
tokenizer = Tokenizer(inputCol="text", outputCol="paraules")
remover = StopWordsRemover(
    inputCol="paraules",
    outputCol="paraules_netes",
    stopWords=StopWordsRemover.loadDefaultStopWords("catalan")
)
hashingTF = HashingTF(inputCol="paraules_netes", outputCol="tf", numFeatures=10000)
idf = IDF(inputCol="tf", outputCol="features")
indexer = StringIndexer(inputCol="etiqueta", outputCol="label")
lr = LogisticRegression(maxIter=100, regParam=0.01)

pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, indexer, lr])

# Divisió train/test
train, test = df_ressenyes.randomSplit([0.8, 0.2], seed=42)

# Entrenar
model = pipeline.fit(train)

# Avaluar
prediccions = model.transform(test)
evaluador = MulticlassClassificationEvaluator(metricName="accuracy")
print(f"Precisió: {evaluador.evaluate(prediccions):.2%}")

Delta Lake

Delta Lake afegeix transaccions ACID, time travel i schema evolution sobre fitxers Parquet. És la tecnologia clau del paradigma Lakehouse.

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp

spark = SparkSession.builder \
    .appName("DeltaLakeDemo") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

RUTA_DELTA = "/dades/delta/clients"

# Escriure dades en format Delta
df_clients = spark.createDataFrame([
    (1, "Anna Garcia", "anna@email.com", "Barcelona"),
    (2, "Joan Puig", "joan@email.com", "Girona"),
    (3, "Maria Costa", "maria@email.com", "Tarragona"),
], ["id_client", "nom", "email", "ciutat"])

df_clients.write.format("delta").mode("overwrite").save(RUTA_DELTA)

# ACID: actualitzar un registre
delta_table = DeltaTable.forPath(spark, RUTA_DELTA)

delta_table.update(
    condition=col("id_client") == 2,
    set={"email": "'joan.puig@noudomini.com'",
         "ciutat": "'Lleida'"}
)

# ACID: eliminar un registre
delta_table.delete(condition=col("id_client") == 3)

# ACID: MERGE (upsert) - inserir o actualitzar
df_nous_clients = spark.createDataFrame([
    (2, "Joan Puig Actualitzat", "joan.nou@email.com", "Lleida"),  # existeix, actualitzar
    (4, "Pere Vidal", "pere@email.com", "Manresa"),                 # nou, inserir
], ["id_client", "nom", "email", "ciutat"])

delta_table.alias("original") \
    .merge(
        df_nous_clients.alias("nous"),
        "original.id_client = nous.id_client"
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

# Time travel: llegir la versió original (versió 0)
df_original = spark.read \
    .format("delta") \
    .option("versionAsOf", 0) \
    .load(RUTA_DELTA)
print("Versió original:")
df_original.show()

# Historial de canvis
delta_table.history().show(truncate=False)

# Compactar fitxers petits (Z-ordering per a cerca eficient)
delta_table.optimize().executeZOrderBy("id_client")

# Eliminar versions antigues (mantenir les darreres 7 dies)
delta_table.vacuum(168)  # 168 hores = 7 dies

PySpark vs Scala Spark el 2025

La pregunta de si usar PySpark o Spark amb Scala és recurrent. La resposta el 2025 és clara per a la majoria de casos:

Usa PySpark quan: - El teu equip domina Python (el 90% dels Data Scientists i Data Engineers) - Necessites integrar biblioteques Python (pandas, scikit-learn, PyTorch, Hugging Face) - Vols iteració ràpida i notebooks interactius - El rendiment és acceptable per al teu cas d'ús (Spark 3.x amb Pandas on Spark és molt ràpid)

Considera Scala quan: - El rendiment és crític i l'overhead Python → JVM importa (casos rars amb Spark 3.5+) - El teu equip ve d'un background Java/Scala - Construeixes extensions personalitzades de Spark (custom transformers, optimitzadors)


Configuració i tuning de Spark

# Configuració d'una SparkSession per a producció
spark = SparkSession.builder \
    .appName("Pipeline_Produccio_2025") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.instances", "10") \
    .config("spark.driver.memory", "4g") \
    .config("spark.driver.maxResultSize", "2g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.parquet.filterPushdown", "true") \
    .config("spark.sql.parquet.mergeSchema", "false") \
    .config("spark.default.parallelism", "40") \
    .getOrCreate()

Regles de tuning:

  1. Particions: spark.sql.shuffle.partitions per defecte és 200. Per a datasets petits (<10 GB), reduir a 8-50 millora el rendiment.
  2. Memòria: spark.executor.memory hauria de ser un 75% de la RAM del worker disponible per a Spark.
  3. Skew: si una partició és molt més gran, usar SKEW_HINT o AQE skew join.
  4. Cache: usar .cache() o .persist(StorageLevel.MEMORY_AND_DISK) per a DataFrames usats múltiples vegades.
  5. Broadcast: si un DataFrame és petit (<100 MB), forçar un broadcast join per evitar shuffles.
from pyspark.sql.functions import broadcast

# Forçar un broadcast join
df_resultat = df_gran \
    .join(broadcast(df_petit), on="id_categoria", how="left")

Databricks: la plataforma Spark empresarial

Databricks és la plataforma de Big Data i IA més usada a les empreses el 2025. Fundada pels creadors de Spark, ofereix una experiència molt superior al Spark "pur":

  • Databricks Runtime: versió de Spark optimitzada, típicament 5-10x més ràpida que open-source
  • Unity Catalog: governança unificada de dades, ML i IA generativa
  • Delta Sharing: compartir dades de forma segura entre organitzacions
  • Notebooks: entorn col·laboratiu en temps real (com Google Docs per a notebooks)
  • Workflows: orquestració de pipelines sense Airflow addicional
  • MLflow integrat: tracking d'experiments i desplegament de models

Databricks Community Edition

Databricks ofereix un account Community Edition gratuït a community.cloud.databricks.com que inclou un clúster de processament limitat però suficient per aprendre. Inclou exemples de datasets i notebooks de tutorial.


Exercici pràctic complet: Pipeline d'anàlisi de logs web

Consulta el fitxer practiques/practica_spark.md per a la pràctica completa amb Docker, que inclou:

  1. Configuració del contenidor PySpark + JupyterLab
  2. Càrrega del dataset NYC Taxi (Parquet, ~500 MB)
  3. Exploració amb Spark SQL
  4. Transformacions avançades (filtres, agregacions, joins)
  5. Optimització: repartition, cache, explain plan
  6. Exportació de resultats
  7. Visualització final amb Plotly

Miniactivitat: Explorar el Spark UI

Mentre executes qualsevol job Spark (local o en Docker), obre el navegador a http://localhost:4040. Aquí pots veure: - Tots els jobs en execució i completats - El DAG (grafo de dependències) de cada job - Les estadístiques de cada tasca (temps, bytes llegits/escrits, shuffle) - L'ús de memòria dels executors

Identifica quines transformacions causen un "shuffle" (redistribució de dades entre executors) i quines no. Els shuffles són els principals colls d'ampolla en Spark.