RDD: Resilient Distributed Dataset
Introducció
L'RDD (Resilient Distributed Dataset) és l'estructura de dades primitiva i fonamental de Spark. Un RDD és una col·lecció distribuïda i tolerant a fallades de registres que pot ser processada en paral·lel sobre els nodes del clúster.
El nom resumeix les seves tres propietats clau: - Resilient: tolerant a fallades gràcies a la replicació del llinatge (lineage) - Distributed: les dades es distribueixen automàticament entre els nodes del clúster - Dataset: col·lecció de registres (de qualsevol tipus: enters, cadenes, tuples, objectes)
Conceptes clau: transformacions i accions
Spark segueix el model d'execució lazy evaluation: les transformacions sobre un RDD no s'executen immediatament, sinó que es registren com a pla d'execució (DAG). L'execució real es dispara quan es crida una acció.
graph LR
A[RDD original] -->|filter| B[RDD filtrat]
B -->|map| C[RDD transformat]
C -->|collect| D[Resultat al Driver]
style D fill:#f96,stroke:#333
| Tipus | Operació | Descripció |
|---|---|---|
| Transformació | filter, map, flatMap, reduceByKey |
Genera un nou RDD, execució lazy |
| Acció | collect, count, reduce, saveAsTextFile |
Dispara l'execució i retorna resultats |
Exemple bàsic d'RDD
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("ExempleRDD") \
.master("local[*]") \
.getOrCreate()
sc = spark.sparkContext
# Crear un RDD des d'una llista local
nombres = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], numSlices=3)
# Transformacions (lazy - no s'executen fins a una acció)
parells = nombres.filter(lambda x: x % 2 == 0) # [2, 4, 6, 8, 10]
quadrats = parells.map(lambda x: x ** 2) # [4, 16, 36, 64, 100]
# Acció (dispara l'execució)
resultat = quadrats.collect() # porta les dades al Driver
print(resultat) # [4, 16, 36, 64, 100]
# Agregació
total = nombres.reduce(lambda a, b: a + b) # 55
print(f"Suma: {total}")
RDD de parells (key-value)
Els RDD de parells (clau, valor) permeten operacions d'agrupació i reducció distribuïdes:
# RDD de parells (key, value) per a operacions de group
vendes_rdd = sc.parallelize([
("Barcelona", 1500), ("Madrid", 2200), ("Barcelona", 800),
("Valencia", 950), ("Madrid", 1100), ("Valencia", 750)
])
# reduceByKey agrupa per clau i aplica la funció de reducció
vendes_per_ciutat = vendes_rdd \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda x: x[1], ascending=False)
print(vendes_per_ciutat.collect())
# [('Madrid', 3300), ('Barcelona', 2300), ('Valencia', 1700)]
Transformacions freqüents sobre RDD de parells:
| Operació | Descripció | Exemple |
|---|---|---|
groupByKey() |
Agrupa valors per clau | rdd.groupByKey() |
reduceByKey(f) |
Redueix valors per clau (més eficient que groupByKey) | rdd.reduceByKey(lambda a,b: a+b) |
sortByKey() |
Ordena per clau | rdd.sortByKey(ascending=False) |
join(other) |
Join entre dos RDD de parells | rdd1.join(rdd2) |
mapValues(f) |
Aplica funció només als valors | rdd.mapValues(str.upper) |
countByKey() |
Compte elements per clau | rdd.countByKey() |
Llegir fitxers amb RDD
# Llegir un fitxer de text (cada línia és un element de l'RDD)
linies = sc.textFile("/dades/logs/app.log")
# Word count clàssic amb RDD
paraules = linies \
.flatMap(lambda linia: linia.split(" ")) \
.map(lambda paraula: (paraula, 1)) \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda x: x[1], ascending=False)
print(paraules.take(10)) # les 10 paraules més freqüents
# Guardar resultats
paraules.saveAsTextFile("/resultats/word_count/")
Persistència i cache
Quan un RDD es reutilitza en múltiples operacions, és molt recomanable persistir-lo per evitar recalcular-lo:
from pyspark import StorageLevel
# Cache en memòria (comportament per defecte de .cache())
rdd_net = linies \
.filter(lambda l: "ERROR" in l) \
.map(lambda l: l.split("\t"))
rdd_net.cache() # equivalent a persist(StorageLevel.MEMORY_ONLY)
# Opció recomanada en producció: memòria + disc per si no hi cap a RAM
rdd_net.persist(StorageLevel.MEMORY_AND_DISK)
# Comptar (primera acció: calcula i guarda a cache)
n_errors = rdd_net.count()
# Processar (segona acció: usa la cache, no recalcula)
errors_crítics = rdd_net.filter(lambda r: r[2] == "CRITICAL").collect()
# Alliberar la cache quan ja no cal
rdd_net.unpersist()
Limitacions dels RDD i per què usar DataFrames
Tot i que els RDD són la base de Spark, l'API modern prefereix els DataFrames per a la majoria de casos:
| Aspecte | RDD | DataFrame |
|---|---|---|
| Optimització | Cap — Spark no coneix l'estructura de les dades | Catalyst optimizer + Tungsten engine |
| Rendiment | Python ↔ JVM serialization overhead | Execució nativa JVM, operacions vectoritzades |
| Expressivitat | Lambdes genèriques | API declarativa + SQL |
| Depuració | Difícil (codi Python opac) | explain(), Spark UI, plans d'execució |
| Quan usar-los | Dades no estructurades, lògica complexa no expressable amb DataFrames | Gairebé tots els casos d'ús |
RDD en producció
Avui dia, usar RDD en producció és una senyal que alguna cosa pot estar malament en el disseny. Gairebé tot el que es pot fer amb RDD es pot fer millor amb DataFrames. Reserva els RDD per a casos molt específics on necessites control directe sobre la distribució o per treballar amb dades completament no estructurades.
Miniactivitat: Llinatge d'un RDD
Crea una SparkSession local i construeix un RDD de 5 transformacions encadenades (parallelize → filter → map → flatMap → reduceByKey). Abans de executar cap acció, crida rdd.toDebugString() per veure el llinatge complet.
Observa com Spark registra cada transformació sense executar-la. Afegeix .collect() al final i observa com el Spark UI mostra el DAG complet de l'execució.