Salta el contingut

RDD: Resilient Distributed Dataset

Introducció

L'RDD (Resilient Distributed Dataset) és l'estructura de dades primitiva i fonamental de Spark. Un RDD és una col·lecció distribuïda i tolerant a fallades de registres que pot ser processada en paral·lel sobre els nodes del clúster.

El nom resumeix les seves tres propietats clau: - Resilient: tolerant a fallades gràcies a la replicació del llinatge (lineage) - Distributed: les dades es distribueixen automàticament entre els nodes del clúster - Dataset: col·lecció de registres (de qualsevol tipus: enters, cadenes, tuples, objectes)


Conceptes clau: transformacions i accions

Spark segueix el model d'execució lazy evaluation: les transformacions sobre un RDD no s'executen immediatament, sinó que es registren com a pla d'execució (DAG). L'execució real es dispara quan es crida una acció.

graph LR
    A[RDD original] -->|filter| B[RDD filtrat]
    B -->|map| C[RDD transformat]
    C -->|collect| D[Resultat al Driver]
    style D fill:#f96,stroke:#333
Tipus Operació Descripció
Transformació filter, map, flatMap, reduceByKey Genera un nou RDD, execució lazy
Acció collect, count, reduce, saveAsTextFile Dispara l'execució i retorna resultats

Exemple bàsic d'RDD

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ExempleRDD") \
    .master("local[*]") \
    .getOrCreate()

sc = spark.sparkContext

# Crear un RDD des d'una llista local
nombres = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], numSlices=3)

# Transformacions (lazy - no s'executen fins a una acció)
parells = nombres.filter(lambda x: x % 2 == 0)    # [2, 4, 6, 8, 10]
quadrats = parells.map(lambda x: x ** 2)            # [4, 16, 36, 64, 100]

# Acció (dispara l'execució)
resultat = quadrats.collect()   # porta les dades al Driver
print(resultat)  # [4, 16, 36, 64, 100]

# Agregació
total = nombres.reduce(lambda a, b: a + b)   # 55
print(f"Suma: {total}")

RDD de parells (key-value)

Els RDD de parells (clau, valor) permeten operacions d'agrupació i reducció distribuïdes:

# RDD de parells (key, value) per a operacions de group
vendes_rdd = sc.parallelize([
    ("Barcelona", 1500), ("Madrid", 2200), ("Barcelona", 800),
    ("Valencia", 950), ("Madrid", 1100), ("Valencia", 750)
])

# reduceByKey agrupa per clau i aplica la funció de reducció
vendes_per_ciutat = vendes_rdd \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], ascending=False)

print(vendes_per_ciutat.collect())
# [('Madrid', 3300), ('Barcelona', 2300), ('Valencia', 1700)]

Transformacions freqüents sobre RDD de parells:

Operació Descripció Exemple
groupByKey() Agrupa valors per clau rdd.groupByKey()
reduceByKey(f) Redueix valors per clau (més eficient que groupByKey) rdd.reduceByKey(lambda a,b: a+b)
sortByKey() Ordena per clau rdd.sortByKey(ascending=False)
join(other) Join entre dos RDD de parells rdd1.join(rdd2)
mapValues(f) Aplica funció només als valors rdd.mapValues(str.upper)
countByKey() Compte elements per clau rdd.countByKey()

Llegir fitxers amb RDD

# Llegir un fitxer de text (cada línia és un element de l'RDD)
linies = sc.textFile("/dades/logs/app.log")

# Word count clàssic amb RDD
paraules = linies \
    .flatMap(lambda linia: linia.split(" ")) \
    .map(lambda paraula: (paraula, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], ascending=False)

print(paraules.take(10))  # les 10 paraules més freqüents

# Guardar resultats
paraules.saveAsTextFile("/resultats/word_count/")

Persistència i cache

Quan un RDD es reutilitza en múltiples operacions, és molt recomanable persistir-lo per evitar recalcular-lo:

from pyspark import StorageLevel

# Cache en memòria (comportament per defecte de .cache())
rdd_net = linies \
    .filter(lambda l: "ERROR" in l) \
    .map(lambda l: l.split("\t"))

rdd_net.cache()  # equivalent a persist(StorageLevel.MEMORY_ONLY)

# Opció recomanada en producció: memòria + disc per si no hi cap a RAM
rdd_net.persist(StorageLevel.MEMORY_AND_DISK)

# Comptar (primera acció: calcula i guarda a cache)
n_errors = rdd_net.count()

# Processar (segona acció: usa la cache, no recalcula)
errors_crítics = rdd_net.filter(lambda r: r[2] == "CRITICAL").collect()

# Alliberar la cache quan ja no cal
rdd_net.unpersist()

Limitacions dels RDD i per què usar DataFrames

Tot i que els RDD són la base de Spark, l'API modern prefereix els DataFrames per a la majoria de casos:

Aspecte RDD DataFrame
Optimització Cap — Spark no coneix l'estructura de les dades Catalyst optimizer + Tungsten engine
Rendiment Python ↔ JVM serialization overhead Execució nativa JVM, operacions vectoritzades
Expressivitat Lambdes genèriques API declarativa + SQL
Depuració Difícil (codi Python opac) explain(), Spark UI, plans d'execució
Quan usar-los Dades no estructurades, lògica complexa no expressable amb DataFrames Gairebé tots els casos d'ús

RDD en producció

Avui dia, usar RDD en producció és una senyal que alguna cosa pot estar malament en el disseny. Gairebé tot el que es pot fer amb RDD es pot fer millor amb DataFrames. Reserva els RDD per a casos molt específics on necessites control directe sobre la distribució o per treballar amb dades completament no estructurades.


Miniactivitat: Llinatge d'un RDD

Crea una SparkSession local i construeix un RDD de 5 transformacions encadenades (parallelize → filter → map → flatMap → reduceByKey). Abans de executar cap acció, crida rdd.toDebugString() per veure el llinatge complet.

Observa com Spark registra cada transformació sense executar-la. Afegeix .collect() al final i observa com el Spark UI mostra el DAG complet de l'execució.