Salta el contingut

Structured Streaming

Introducció

Spark Structured Streaming és l'API de processament de dades en temps real de Spark. En lloc de pensar en "streams de dades", Structured Streaming et permet pensar en una taula que creix contínuament: cada cop que arriben noves dades, s'afegeixen com a noves files i el resultat s'actualitza automàticament.

graph LR
    KAFKA[(Kafka\nTopic)] -->|micro-batch| SPARK[Structured\nStreaming]
    FILES[/Directori\nde fitxers/] -->|nous fitxers| SPARK
    SPARK -->|update| CONSOLE[Consola]
    SPARK -->|update| DELTA[(Delta Lake)]
    SPARK -->|update| KAFKA2[(Kafka\nOutput)]

Avantatges respecte al batch tradicional: - Latència molt baixa: resultats actualitzats en segons o fraccions de segon - Mateixa API que batch: el codi de transformació és idèntic al codi batch amb DataFrames - Fault tolerance: recuperació automàtica de fallades gràcies als checkpoints


Fonts de dades (sources)

Kafka (la font més habitual en producció)

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, count, sum, col, from_json
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("StreamingVendes") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .getOrCreate()

# Llegir des de Kafka
df_stream_kafka = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "vendes") \
    .option("startingOffsets", "latest") \
    .load()

# El DataFrame de Kafka té columnes fixes: key, value, topic, partition, offset, timestamp
# El camp 'value' conté el missatge serialitzat (normalment JSON)
schema_vendes = StructType() \
    .add("timestamp_event", TimestampType()) \
    .add("id_usuari", StringType()) \
    .add("producte", StringType()) \
    .add("preu", DoubleType()) \
    .add("pais", StringType())

df_parsed = df_stream_kafka \
    .select(from_json(col("value").cast("string"), schema_vendes).alias("data")) \
    .select("data.*")

Directori de fitxers (ideal per a proves)

# Llegir des d'un directori: Spark processa automàticament els fitxers nous
schema_vendes = StructType() \
    .add("timestamp_event", TimestampType()) \
    .add("id_usuari",       StringType()) \
    .add("producte",        StringType()) \
    .add("preu",            DoubleType()) \
    .add("pais",            StringType())

df_stream_fitxers = spark \
    .readStream \
    .option("maxFilesPerTrigger", 10) \
    .schema(schema_vendes) \
    .json("/dades/streaming/vendes/")

Socket TCP (per a prototips i proves)

# Llegir des d'un socket TCP (NOMÉS per a proves - no usar en producció)
df_socket = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

Watermarking i finestres de temps

Watermark: gestionar dades tardanes

En streaming, les dades poden arribar fora d'ordre (un event del 14:00h pot arribar a les 14:10h). El watermark defineix quant temps esperem dades tardanes:

# Watermark de 10 minuts: acceptem events fins a 10 minuts de retard
df_amb_watermark = df_stream_fitxers \
    .withWatermark("timestamp_event", "10 minutes")

Finestres temporals (tumbling i sliding)

# Finestra tumbling de 5 minuts (sense solapament)
vendes_tumbling = df_stream_fitxers \
    .withWatermark("timestamp_event", "10 minutes") \
    .groupBy(
        window(col("timestamp_event"), "5 minutes"),
        col("pais")
    ) \
    .agg(
        count("*").alias("num_transaccions"),
        sum("preu").alias("import_total")
    )

# Finestra sliding de 5 minuts, actualitzada cada minut (amb solapament)
vendes_sliding = df_stream_fitxers \
    .withWatermark("timestamp_event", "10 minutes") \
    .groupBy(
        window(col("timestamp_event"), "5 minutes", "1 minute"),  # finestra, slide
        col("pais")
    ) \
    .agg(
        count("*").alias("num_transaccions"),
        sum("preu").alias("import_total")
    )
gantt
    title Finestres de temps (exemple simplificat)
    dateFormat mm:ss
    axisFormat %M:%S

    section Tumbling 5min
    Finestra 00-05   :00:00, 5m
    Finestra 05-10   :05:00, 5m
    Finestra 10-15   :10:00, 5m

    section Sliding 5min cada 1min
    Finestra 00-05   :00:00, 5m
    Finestra 01-06   :01:00, 5m
    Finestra 02-07   :02:00, 5m

Transformacions sobre streams

Les transformacions sobre streams son idèntiques a les de batch: filter, select, withColumn, groupBy, etc.:

from pyspark.sql.functions import upper, round as spark_round

# Transformar el stream
df_transformat = df_stream_fitxers \
    .filter(col("preu") > 0) \
    .withColumn("preu_arrodonit", spark_round(col("preu"), 2)) \
    .withColumn("pais_upper", upper(col("pais"))) \
    .select("timestamp_event", "id_usuari", "producte", "preu_arrodonit", "pais_upper")

Join entre stream i dades estàtiques

# Taula de referència estàtica (s'actualitza amb cada micro-batch)
df_paisos = spark.read.parquet("/dades/referencia/paisos/")

# Join stream + estàtic
df_enriquit = df_stream_fitxers \
    .join(df_paisos, on="pais", how="left") \
    .select("timestamp_event", "producte", "preu", "nom_pais", "divisa")

Sinks de sortida (outputs)

Consola (per a proves i debugging)

query = df_transformat \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .option("numRows", 20) \
    .trigger(processingTime="10 seconds") \
    .start()

query.awaitTermination()

Delta Lake (el sink recomanat en producció)

query = vendes_sliding \
    .writeStream \
    .outputMode("update") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/vendes_sliding/") \
    .option("path", "/dades/delta/vendes_streaming/") \
    .trigger(processingTime="30 seconds") \
    .start()

Kafka (re-publicar resultats)

query = df_enriquit \
    .select(
        col("id_usuari").alias("key"),
        to_json(struct("*")).alias("value")
    ) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "vendes_enriquides") \
    .option("checkpointLocation", "/checkpoints/vendes_kafka_out/") \
    .start()

Output modes

Mode Descripció Compatible amb
append Escriu només les noves files Transformacions sense agregació
update Escriu les files que han canviat Agregacions amb watermark
complete Re-escriu la taula sencera Agregacions sense watermark (ATENCIÓ: memoria)

Exemple complet: pipeline de vendes en temps real

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    window, count, sum, col, from_json, to_json, struct
)
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("PipelineVendesRealTime") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Esquema dels events Kafka
schema = StructType() \
    .add("ts",      TimestampType()) \
    .add("usuari",  StringType()) \
    .add("producte",StringType()) \
    .add("preu",    DoubleType()) \
    .add("pais",    StringType())

# 1. Llegir des de Kafka
df_raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "vendes-raw") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("e")) \
    .select("e.*")

# 2. Filtrar i transformar
df_net = df_raw \
    .filter(col("preu") > 0) \
    .withWatermark("ts", "5 minutes")

# 3. Agregar per finestres de 1 minut
df_agg = df_net \
    .groupBy(window(col("ts"), "1 minute"), col("pais")) \
    .agg(
        count("*").alias("transaccions"),
        sum("preu").alias("import_eur")
    )

# 4. Escriure a Delta Lake
query = df_agg \
    .writeStream \
    .format("delta") \
    .outputMode("update") \
    .option("checkpointLocation", "/checkpoints/vendes_agg/") \
    .option("path", "/dades/delta/vendes_1min/") \
    .trigger(processingTime="30 seconds") \
    .start()

query.awaitTermination()

Gestió de queries i monitorització

# Llistar totes les queries actives
spark.streams.active

# Obtenir l'estat d'una query
print(query.status)
print(query.lastProgress)

# Aturar una query
query.stop()

# Esperar que acabi (o que hi hagi un error)
query.awaitTermination(timeout=300)  # 5 minuts màxim

Miniactivitat: Word Count en streaming

  1. Obre un terminal i crea un servidor TCP senzill: nc -lk 9999
  2. Configura Spark per llegir des de socket (host=localhost, port=9999)
  3. Implementa un word count en streaming: divideix les línies en paraules i compta-les
  4. Usa l'output mode complete i el sink console
  5. Escriu paraules al terminal i observa com les comptades s'actualitzen en temps real

Reflexiona: per quin motiu complete és perillós en un stream de llarga durada?