Structured Streaming
Introducció
Spark Structured Streaming és l'API de processament de dades en temps real de Spark. En lloc de pensar en "streams de dades", Structured Streaming et permet pensar en una taula que creix contínuament: cada cop que arriben noves dades, s'afegeixen com a noves files i el resultat s'actualitza automàticament.
graph LR
KAFKA[(Kafka\nTopic)] -->|micro-batch| SPARK[Structured\nStreaming]
FILES[/Directori\nde fitxers/] -->|nous fitxers| SPARK
SPARK -->|update| CONSOLE[Consola]
SPARK -->|update| DELTA[(Delta Lake)]
SPARK -->|update| KAFKA2[(Kafka\nOutput)]
Avantatges respecte al batch tradicional: - Latència molt baixa: resultats actualitzats en segons o fraccions de segon - Mateixa API que batch: el codi de transformació és idèntic al codi batch amb DataFrames - Fault tolerance: recuperació automàtica de fallades gràcies als checkpoints
Fonts de dades (sources)
Kafka (la font més habitual en producció)
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, count, sum, col, from_json
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType
spark = SparkSession.builder \
.appName("StreamingVendes") \
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
.getOrCreate()
# Llegir des de Kafka
df_stream_kafka = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "vendes") \
.option("startingOffsets", "latest") \
.load()
# El DataFrame de Kafka té columnes fixes: key, value, topic, partition, offset, timestamp
# El camp 'value' conté el missatge serialitzat (normalment JSON)
schema_vendes = StructType() \
.add("timestamp_event", TimestampType()) \
.add("id_usuari", StringType()) \
.add("producte", StringType()) \
.add("preu", DoubleType()) \
.add("pais", StringType())
df_parsed = df_stream_kafka \
.select(from_json(col("value").cast("string"), schema_vendes).alias("data")) \
.select("data.*")
Directori de fitxers (ideal per a proves)
# Llegir des d'un directori: Spark processa automàticament els fitxers nous
schema_vendes = StructType() \
.add("timestamp_event", TimestampType()) \
.add("id_usuari", StringType()) \
.add("producte", StringType()) \
.add("preu", DoubleType()) \
.add("pais", StringType())
df_stream_fitxers = spark \
.readStream \
.option("maxFilesPerTrigger", 10) \
.schema(schema_vendes) \
.json("/dades/streaming/vendes/")
Socket TCP (per a prototips i proves)
# Llegir des d'un socket TCP (NOMÉS per a proves - no usar en producció)
df_socket = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
Watermarking i finestres de temps
Watermark: gestionar dades tardanes
En streaming, les dades poden arribar fora d'ordre (un event del 14:00h pot arribar a les 14:10h). El watermark defineix quant temps esperem dades tardanes:
# Watermark de 10 minuts: acceptem events fins a 10 minuts de retard
df_amb_watermark = df_stream_fitxers \
.withWatermark("timestamp_event", "10 minutes")
Finestres temporals (tumbling i sliding)
# Finestra tumbling de 5 minuts (sense solapament)
vendes_tumbling = df_stream_fitxers \
.withWatermark("timestamp_event", "10 minutes") \
.groupBy(
window(col("timestamp_event"), "5 minutes"),
col("pais")
) \
.agg(
count("*").alias("num_transaccions"),
sum("preu").alias("import_total")
)
# Finestra sliding de 5 minuts, actualitzada cada minut (amb solapament)
vendes_sliding = df_stream_fitxers \
.withWatermark("timestamp_event", "10 minutes") \
.groupBy(
window(col("timestamp_event"), "5 minutes", "1 minute"), # finestra, slide
col("pais")
) \
.agg(
count("*").alias("num_transaccions"),
sum("preu").alias("import_total")
)
gantt
title Finestres de temps (exemple simplificat)
dateFormat mm:ss
axisFormat %M:%S
section Tumbling 5min
Finestra 00-05 :00:00, 5m
Finestra 05-10 :05:00, 5m
Finestra 10-15 :10:00, 5m
section Sliding 5min cada 1min
Finestra 00-05 :00:00, 5m
Finestra 01-06 :01:00, 5m
Finestra 02-07 :02:00, 5m
Transformacions sobre streams
Les transformacions sobre streams son idèntiques a les de batch: filter, select, withColumn, groupBy, etc.:
from pyspark.sql.functions import upper, round as spark_round
# Transformar el stream
df_transformat = df_stream_fitxers \
.filter(col("preu") > 0) \
.withColumn("preu_arrodonit", spark_round(col("preu"), 2)) \
.withColumn("pais_upper", upper(col("pais"))) \
.select("timestamp_event", "id_usuari", "producte", "preu_arrodonit", "pais_upper")
Join entre stream i dades estàtiques
# Taula de referència estàtica (s'actualitza amb cada micro-batch)
df_paisos = spark.read.parquet("/dades/referencia/paisos/")
# Join stream + estàtic
df_enriquit = df_stream_fitxers \
.join(df_paisos, on="pais", how="left") \
.select("timestamp_event", "producte", "preu", "nom_pais", "divisa")
Sinks de sortida (outputs)
Consola (per a proves i debugging)
query = df_transformat \
.writeStream \
.outputMode("append") \
.format("console") \
.option("truncate", "false") \
.option("numRows", 20) \
.trigger(processingTime="10 seconds") \
.start()
query.awaitTermination()
Delta Lake (el sink recomanat en producció)
query = vendes_sliding \
.writeStream \
.outputMode("update") \
.format("delta") \
.option("checkpointLocation", "/checkpoints/vendes_sliding/") \
.option("path", "/dades/delta/vendes_streaming/") \
.trigger(processingTime="30 seconds") \
.start()
Kafka (re-publicar resultats)
query = df_enriquit \
.select(
col("id_usuari").alias("key"),
to_json(struct("*")).alias("value")
) \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("topic", "vendes_enriquides") \
.option("checkpointLocation", "/checkpoints/vendes_kafka_out/") \
.start()
Output modes
| Mode | Descripció | Compatible amb |
|---|---|---|
append |
Escriu només les noves files | Transformacions sense agregació |
update |
Escriu les files que han canviat | Agregacions amb watermark |
complete |
Re-escriu la taula sencera | Agregacions sense watermark (ATENCIÓ: memoria) |
Exemple complet: pipeline de vendes en temps real
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
window, count, sum, col, from_json, to_json, struct
)
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType
spark = SparkSession.builder \
.appName("PipelineVendesRealTime") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Esquema dels events Kafka
schema = StructType() \
.add("ts", TimestampType()) \
.add("usuari", StringType()) \
.add("producte",StringType()) \
.add("preu", DoubleType()) \
.add("pais", StringType())
# 1. Llegir des de Kafka
df_raw = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "vendes-raw") \
.load() \
.select(from_json(col("value").cast("string"), schema).alias("e")) \
.select("e.*")
# 2. Filtrar i transformar
df_net = df_raw \
.filter(col("preu") > 0) \
.withWatermark("ts", "5 minutes")
# 3. Agregar per finestres de 1 minut
df_agg = df_net \
.groupBy(window(col("ts"), "1 minute"), col("pais")) \
.agg(
count("*").alias("transaccions"),
sum("preu").alias("import_eur")
)
# 4. Escriure a Delta Lake
query = df_agg \
.writeStream \
.format("delta") \
.outputMode("update") \
.option("checkpointLocation", "/checkpoints/vendes_agg/") \
.option("path", "/dades/delta/vendes_1min/") \
.trigger(processingTime="30 seconds") \
.start()
query.awaitTermination()
Gestió de queries i monitorització
# Llistar totes les queries actives
spark.streams.active
# Obtenir l'estat d'una query
print(query.status)
print(query.lastProgress)
# Aturar una query
query.stop()
# Esperar que acabi (o que hi hagi un error)
query.awaitTermination(timeout=300) # 5 minuts màxim
Miniactivitat: Word Count en streaming
- Obre un terminal i crea un servidor TCP senzill:
nc -lk 9999 - Configura Spark per llegir des de socket (
host=localhost, port=9999) - Implementa un word count en streaming: divideix les línies en paraules i compta-les
- Usa l'output mode
completei el sinkconsole - Escriu paraules al terminal i observa com les comptades s'actualitzen en temps real
Reflexiona: per quin motiu complete és perillós en un stream de llarga durada?