Salta el contingut

Pràctica PR5075/01: Anàlisi de Dades Massives amb PySpark i Docker

Objectius

  • Configurar un clúster Spark local amb Docker i accedir a la Spark UI
  • Carregar i explorar el dataset NYC Yellow Taxi (format Parquet, ~500 MB)
  • Realitzar transformacions amb la DataFrame API i Spark SQL
  • Aplicar Window Functions per a anàlisi avançada
  • Optimitzar el processament amb reparticionament i cache
  • Exportar els resultats en format Parquet
  • Visualitzar els resultats clau amb Plotly

Prerequisits

  • Temps estimat: 8 hores
  • RAM mínima: 8 GB (6 GB assignats a Docker)
  • Docker Desktop instal·lat i en funcionament
  • Coneixements bàsics de Python i SQL
  • Connexió a Internet per descarregar el dataset (~500 MB)

Introducció

El NYC Yellow Taxi Dataset és un dels datasets públics de referència per aprendre Big Data. El Departament de Transport de Nova York (NYC TLC) publica cada mes les dades de tots els trajectes de taxi groc de la ciutat: origen, destí, distància, tarifa, propina, mètode de pagament i molts altres camps. Un únic mes de 2024 conté al voltant de 3,5 milions de registres i ocupa uns 50 MB en format Parquet comprimit.

Treballar amb aquest dataset és representatiu de casos reals d'analítica de transport, optimització de flotes, predicció de demanda i detecció d'anomalies tarifàries. Les habilitats que practiquem en aquesta pràctica —càrrega de Parquet, Spark SQL, Window Functions, reparticionament i cache— es traslladen directament a entorns professionals.

flowchart LR
    A[NYC Taxi\nParquet] --> B[SparkSession\nDocker]
    B --> C[Exploració\ni neteja]
    C --> D[Spark SQL\nAgregacions]
    D --> E[Window\nFunctions]
    E --> F[Optimització\ncache + repart.]
    F --> G[Export\nParquet]
    G --> H[Visualitzacions\nPlotly]

Dataset NYC TLC

El dataset és públic i gratuït. La URL oficial és https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page. En aquesta pràctica usarem el mes de gener de 2024 (yellow_tripdata_2024-01.parquet).


Part 1: Configuració de l'entorn Docker

1.1 Estructura del projecte

Crea la carpeta del projecte amb el teu nom:

# Substitueix joan-garcia pel teu nom real
mkdir spark-joan-garcia
cd spark-joan-garcia
mkdir notebooks data resultats

1.2 Arrencada del contenidor Jupyter + PySpark

docker run --name spark-joan-garcia \
  -p 8888:8888 \
  -p 4040:4040 \
  -v $(pwd)/notebooks:/home/jovyan/work \
  -v $(pwd)/data:/home/jovyan/data \
  -v $(pwd)/resultats:/home/jovyan/resultats \
  --memory=6g \
  --memory-swap=8g \
  jupyter/pyspark-notebook:spark-3.5.0

Spark UI al port 4040

El port 4040 exposa la Spark UI, on pots monitoritzar jobs, stages, tasques i el pla d'execució en temps real. Mentre executes codi PySpark, obre http://localhost:4040 per veure el progrés.

Windows i els paths

A Windows PowerShell, substitueix $(pwd) per ${PWD}. A cmd, usa la ruta absoluta: -v C:/Users/joan/spark-joan-garcia/notebooks:/home/jovyan/work.

1.3 Verificació de l'arrencada

# Comprova que el contenidor funciona
docker logs spark-joan-garcia 2>&1 | grep token

# Si necessites reiniciar
docker stop spark-joan-garcia
docker start spark-joan-garcia

Obre el navegador a http://localhost:8888 i introdueix el token que apareix als logs. Crea un nou notebook Python 3 amb el nom spark_joan_garcia.ipynb.


Part 2: Sessió Spark amb configuració optimitzada

2.1 Capçalera del notebook

# =============================================================
# Pràctica PR5075/01 - Anàlisi de Dades Massives amb PySpark
# Alumne: Joan Garcia
# Institut Sa Palomera (Blanes) - Curs IABD 2026-2027
# Mòdul: 5074 Sistemes de Big Data
# =============================================================

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
import warnings
warnings.filterwarnings('ignore')

# Dades de l'alumne — modifica aquests valors
ALUMNE = "Joan Garcia"
ALUMNE_ID = "joan_garcia"
MODUL = "5074"
PRACTICA = "PR5075/01"
DATASET = "NYC Yellow Taxi 2024-01"

print("=" * 60)
print(f"Pràctica {PRACTICA} - Anàlisi amb PySpark")
print(f"Alumne: {ALUMNE}")
print(f"Dataset: {DATASET}")
print("=" * 60)

2.2 Creació de la SparkSession

spark = SparkSession.builder \
    .appName(f"Analisi_Taxis_NYC_{ALUMNE.replace(' ', '_')}") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

# Silencia logs de nivell INFO per a una sortida més neta
spark.sparkContext.setLogLevel("WARN")

print(f"Spark {spark.version} iniciat correctament per {ALUMNE}")
print(f"Spark UI disponible a: http://localhost:4040")
print(f"Adaptive Query Execution (AQE): activat")

Adaptive Query Execution (AQE)

L'AQE, introduïda a Spark 3.0, optimitza el pla d'execució en temps d'execució basant-se en estadístiques reals de les dades. La configuració coalescePartitions redueix automàticament el nombre de particions després d'un shuffle si les dades són menors del previst, evitant el problema de les "petites particions".


Part 3: Càrrega del dataset NYC Yellow Taxi

3.1 Descàrrega del dataset

import urllib.request
import os

URL_DATASET = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet"
RUTA_LOCAL = "/home/jovyan/data/taxis_2024_01.parquet"

if not os.path.exists(RUTA_LOCAL):
    print(f"Descarregant dataset ({DATASET})...")
    print("Mida aproximada: 50 MB. Pot trigar uns minuts.")

    def progress_bar(block_num, block_size, total_size):
        downloaded = block_num * block_size
        percent = min(100, downloaded * 100 / total_size)
        if block_num % 100 == 0:
            print(f"  {percent:.1f}% ({downloaded / 1024 / 1024:.1f} MB)", end='\r')

    urllib.request.urlretrieve(URL_DATASET, RUTA_LOCAL, reporthook=progress_bar)
    print(f"\nDataset descarregat: {RUTA_LOCAL}")
    mida_mb = os.path.getsize(RUTA_LOCAL) / 1024 / 1024
    print(f"Mida del fitxer: {mida_mb:.1f} MB")
else:
    mida_mb = os.path.getsize(RUTA_LOCAL) / 1024 / 1024
    print(f"Dataset ja existent: {RUTA_LOCAL} ({mida_mb:.1f} MB)")

3.2 Càrrega amb Spark

# Spark llegeix Parquet de forma columnar, molt eficient
df = spark.read.parquet(RUTA_LOCAL)

# Estadístiques bàsiques
n_registres = df.count()
n_columnes = len(df.columns)

print(f"\n--- INFORMACIÓ DEL DATASET ---")
print(f"Registres totals: {n_registres:,}")
print(f"Nombre de columnes: {n_columnes}")
print(f"Particions inicials: {df.rdd.getNumPartitions()}")

print(f"\nEsquema del dataset:")
df.printSchema()

print(f"\nPrimeres 5 files:")
df.show(5, truncate=False)

3.3 Estadístiques descriptives

# Estadístiques descriptives de les columnes numèriques clau
cols_numeriques = ["trip_distance", "fare_amount", "tip_amount",
                   "total_amount", "passenger_count"]

print("Estadístiques descriptives:")
df.describe(cols_numeriques).show()

# Distribució de la columna de pagament
print("\nDistribució per tipus de pagament:")
df.groupBy("payment_type") \
  .count() \
  .withColumn("percentatge", F.round(F.col("count") * 100 / n_registres, 2)) \
  .orderBy(F.desc("count")) \
  .show()

Part 4: Neteja i control de qualitat de les dades

4.1 Detecció de valors nuls

from pyspark.sql.functions import col, sum as spark_sum, isnan, when, count

print("Valors nuls per columna:")
nuls = df.select([
    spark_sum(when(col(c).isNull() | isnan(c), 1).otherwise(0)).alias(c)
    for c in df.columns
])
nuls.show()

# Percentatge de nuls per a les columnes critiques
print("\nPercentatge de nuls en columnes clau:")
for c in cols_numeriques:
    n_nuls = df.filter(col(c).isNull()).count()
    pct = n_nuls * 100 / n_registres
    print(f"  {c}: {n_nuls:,} nuls ({pct:.2f}%)")

4.2 Filtres de qualitat

# Aplicació de filtres de qualitat raonables per al dataset de taxis
df_net = df.filter(
    (col("trip_distance") > 0) & (col("trip_distance") < 100) &
    (col("fare_amount") > 0) & (col("fare_amount") < 500) &
    (col("passenger_count") > 0) & (col("passenger_count") <= 6) &
    (col("tip_amount") >= 0) &
    (col("tpep_pickup_datetime").isNotNull()) &
    (col("tpep_dropoff_datetime").isNotNull())
)

n_nets = df_net.count()
pct_retinguts = n_nets * 100 / n_registres

print(f"Registres originals: {n_registres:,}")
print(f"Registres nets:      {n_nets:,} ({pct_retinguts:.1f}% retinguts)")
print(f"Registres eliminats: {n_registres - n_nets:,} ({100 - pct_retinguts:.1f}%)")

4.3 Enriquiment: columnes derivades

# Càlcul de la durada del viatge i de la velocitat mitjana
df_enriquit = df_net.withColumn(
    "durada_minuts",
    F.round(
        (F.unix_timestamp("tpep_dropoff_datetime") -
         F.unix_timestamp("tpep_pickup_datetime")) / 60,
        2
    )
).withColumn(
    "hora_recollida",
    F.hour("tpep_pickup_datetime")
).withColumn(
    "dia_setmana",
    F.dayofweek("tpep_pickup_datetime")
).withColumn(
    "velocitat_mph",
    F.round(col("trip_distance") / (col("durada_minuts") / 60), 2)
).withColumn(
    "categoria_viatge",
    F.when(col("trip_distance") < 1, "molt_curt")
     .when(col("trip_distance") < 3, "curt")
     .when(col("trip_distance") < 8, "mitja")
     .when(col("trip_distance") < 20, "llarg")
     .otherwise("molt_llarg")
).filter(
    (col("durada_minuts") > 0) &
    (col("durada_minuts") < 180) &
    (col("velocitat_mph") < 80)
)

print(f"Registres finals amb enriquiment: {df_enriquit.count():,}")
df_enriquit.select(
    "trip_distance", "durada_minuts", "hora_recollida",
    "velocitat_mph", "categoria_viatge"
).show(5)

Miniactivitat

Observa les estadístiques dels filtres. Respon al notebook:

  1. Quin percentatge de registres s'han eliminat i per quins motius?
  2. Hi ha algun filtre que trobes massa restrictiu o massa permissiu? Justifica-ho.
  3. Quina diferència hi ha entre isNull() i isnan() a Spark?

Part 5: Anàlisi amb Spark SQL

5.1 Registre de la vista temporal

# Registrem el DataFrame com a vista SQL temporal
df_enriquit.createOrReplaceTempView("taxis")

print("Vista 'taxis' registrada correctament.")
print(f"La vista conté {spark.sql('SELECT COUNT(*) FROM taxis').collect()[0][0]:,} registres.")

5.2 Anàlisi per hora del dia

# Distribució de viatges i tarifes per hora del dia
consulta_hores = spark.sql("""
    SELECT
        hora_recollida AS hora,
        COUNT(*) AS viatges,
        ROUND(AVG(trip_distance), 2) AS distancia_mitjana_milles,
        ROUND(AVG(durada_minuts), 1) AS durada_mitjana_min,
        ROUND(AVG(fare_amount), 2) AS tarifa_mitjana,
        ROUND(AVG(tip_amount / fare_amount) * 100, 1) AS propina_pct_mitja,
        ROUND(SUM(total_amount), 0) AS ingressos_totals
    FROM taxis
    GROUP BY hora_recollida
    ORDER BY hora_recollida
""")

print("Anàlisi per hora del dia:")
consulta_hores.show(24, truncate=False)

5.3 Top zones de recollida

consulta_zones = spark.sql("""
    SELECT
        PULocationID AS zona_recollida,
        COUNT(*) AS total_viatges,
        ROUND(AVG(fare_amount), 2) AS tarifa_mitjana,
        ROUND(AVG(tip_amount), 2) AS propina_mitjana,
        ROUND(AVG(trip_distance), 2) AS distancia_mitjana,
        ROUND(SUM(total_amount), 0) AS ingressos_totals
    FROM taxis
    GROUP BY PULocationID
    HAVING COUNT(*) > 1000
    ORDER BY total_viatges DESC
    LIMIT 20
""")

print("Top 20 zones de recollida (per nombre de viatges):")
consulta_zones.show(20)

5.4 Anàlisi per dia de la setmana

consulta_dies = spark.sql("""
    SELECT
        CASE dia_setmana
            WHEN 1 THEN '1-Diumenge'
            WHEN 2 THEN '2-Dilluns'
            WHEN 3 THEN '3-Dimarts'
            WHEN 4 THEN '4-Dimecres'
            WHEN 5 THEN '5-Dijous'
            WHEN 6 THEN '6-Divendres'
            WHEN 7 THEN '7-Dissabte'
        END AS dia,
        COUNT(*) AS viatges,
        ROUND(AVG(total_amount), 2) AS import_mitja,
        ROUND(SUM(total_amount), 0) AS ingressos_totals
    FROM taxis
    GROUP BY dia_setmana
    ORDER BY dia_setmana
""")

print("Anàlisi per dia de la setmana:")
consulta_dies.show()

Part 6: Transformacions avançades amb Window Functions

6.1 Ranking de tarifes per zona

# Window Function: rang de tarifes dins de cada zona de recollida
windowSpec_zona = Window.partitionBy("PULocationID").orderBy(F.desc("fare_amount"))

df_ranked = df_enriquit.withColumn(
    "rang_tarifa_zona",
    F.rank().over(windowSpec_zona)
)

# Mostrem les 5 tarifes més altes de les 10 zones amb més viatges
top_zones = [row[0] for row in
             df_enriquit.groupBy("PULocationID").count()
             .orderBy(F.desc("count")).limit(10).collect()]

print("Top 5 tarifes per zona (de les 10 zones més actives):")
df_ranked.filter(
    (col("PULocationID").isin(top_zones)) & (col("rang_tarifa_zona") <= 5)
).select(
    "PULocationID", "rang_tarifa_zona", "fare_amount",
    "trip_distance", "tip_amount"
).orderBy("PULocationID", "rang_tarifa_zona").show(50)

6.2 Mediana mòbil de tarifes per hora

# Window Function: media mòbil de les tarifes per hora (finestra de ±1 hora)
windowSpec_hora = Window.orderBy("hora_recollida") \
    .rowsBetween(-1, 1)

stats_hora = df_enriquit.groupBy("hora_recollida").agg(
    F.count("*").alias("viatges"),
    F.avg("fare_amount").alias("tarifa_mitjana")
).withColumn(
    "tarifa_media_mobil_3h",
    F.round(F.avg("tarifa_mitjana").over(windowSpec_hora), 2)
).orderBy("hora_recollida")

print("Tarifes per hora amb mitjana mòbil de 3h:")
stats_hora.show(24)

6.3 Percentils de distància per categoria de viatge

# Percentils 25, 50, 75 i 95 de la distància per categoria
from pyspark.sql.functions import percentile_approx

percentils = df_enriquit.groupBy("categoria_viatge").agg(
    F.count("*").alias("total"),
    F.round(percentile_approx("trip_distance", 0.25), 2).alias("p25"),
    F.round(percentile_approx("trip_distance", 0.50), 2).alias("p50_mediana"),
    F.round(percentile_approx("trip_distance", 0.75), 2).alias("p75"),
    F.round(percentile_approx("trip_distance", 0.95), 2).alias("p95"),
    F.round(F.avg("fare_amount"), 2).alias("tarifa_avg")
).orderBy("p50_mediana")

print("Percentils de distància per categoria de viatge:")
percentils.show()

Miniactivitat

Les Window Functions permeten calcular agregats sense col·lapsar les files. Respon:

  1. Quina és la diferència entre rank() i dense_rank()? Posa un exemple.
  2. En el cas de la mediana mòbil, per qué usem rowsBetween(-1, 1) i no rangeBetween?
  3. Quin avantatge té percentile_approx sobre el càlcul exacte de percentils?

Part 7: Optimització del processament

7.1 Reparticionament estratègic

# Abans d'operacions intensives per zona, reparticionem per zona de recollida
print(f"Particions abans del reparticionament: {df_enriquit.rdd.getNumPartitions()}")

df_opt = df_enriquit.repartition(16, "PULocationID")
print(f"Particions despres del reparticionament: {df_opt.rdd.getNumPartitions()}")

# El reparticionament per clau (PULocationID) garanteix que totes les files
# de la mateixa zona van a la mateixa particio, optimitzant els joins i
# les window functions per zona.

7.2 Cache per a consultes repetides

# Materialitzem el DataFrame en memòria per a consultes repetides
print("Materialitzant el DataFrame a la cache...")
df_opt.cache()

# La primera acció força la materialització
import time
t_inici = time.time()
n_cached = df_opt.count()
t_fi = time.time()

print(f"DataFrame materialitzat: {n_cached:,} registres en {t_fi - t_inici:.1f}s")
print("Pots veure l'ús de cache a: http://localhost:4040/storage/")

7.3 Anàlisi del pla d'execució

# Consulta representativa per analitzar el pla
consulta_top_zones = df_opt.groupBy("PULocationID").agg(
    F.count("*").alias("viatges"),
    F.round(F.sum("total_amount"), 0).alias("ingressos")
).orderBy(F.desc("ingressos"))

print("=== PLA D'EXECUCIÓ FÍSIC (mode formatted) ===")
consulta_top_zones.explain(mode="formatted")

Llegir el Explain Plan

Busca els operadors clau: FileScan (lectura del Parquet), HashAggregate (primera agregació parcial a cada executor), Exchange (shuffle de xarxa), HashAggregate (agregació final). Un bon pla minimitza l'Exchange. Si veus Sort innecessaris, pot indicar un problema de configuració.

7.4 Benchmark: amb cache vs sense cache

# Comparativa de rendiment: primera consulta (sense cache) vs segona (amb cache)
consulta_benchmark = df_opt.filter(col("categoria_viatge") == "llarg") \
    .groupBy("hora_recollida") \
    .agg(F.count("*").alias("viatges"), F.avg("fare_amount").alias("tarifa_avg"))

print("Execució 1 (cache activa - pot haver-hi dades ja en memòria):")
t0 = time.time()
r1 = consulta_benchmark.count()
t1 = time.time()
print(f"  {r1} grups calculats en {t1 - t0:.2f}s")

print("\nExecució 2 (des de cache):")
t2 = time.time()
r2 = consulta_benchmark.count()
t3 = time.time()
print(f"  {r2} grups calculats en {t3 - t2:.2f}s")

if (t1 - t0) > 0:
    speedup = (t1 - t0) / max(t3 - t2, 0.001)
    print(f"\nSpeedup de la cache: {speedup:.1f}x")

Part 8: Anàlisi final i exportació de resultats

8.1 Resum executiu per categoria de viatge

# Taula resum: la que lliurarem com a resultat final
resultat_categories = df_opt.groupBy("categoria_viatge").agg(
    F.count("*").alias("total_viatges"),
    F.round(F.avg("trip_distance"), 2).alias("distancia_mitjana_milles"),
    F.round(F.avg("durada_minuts"), 1).alias("durada_mitjana_min"),
    F.round(F.avg("fare_amount"), 2).alias("tarifa_mitjana"),
    F.round(F.avg("tip_amount"), 2).alias("propina_mitjana"),
    F.round(F.avg("tip_amount") / F.avg("fare_amount") * 100, 1).alias("propina_pct"),
    F.round(F.sum("total_amount"), 0).alias("ingressos_totals")
).orderBy("distancia_mitjana_milles")

print("Resum per categoria de viatge:")
resultat_categories.show()

8.2 Exportació a Parquet

# Ruta de sortida personalitzada amb el nom de l'alumne
ruta_sortida = f"/home/jovyan/resultats/analisi_{ALUMNE_ID}"

# Exporta el resultat en Parquet (format estàndard de Big Data)
resultat_categories.write.mode("overwrite").parquet(ruta_sortida)

print(f"Resultats exportats a: {ruta_sortida}")
print(f"Fitxers generats:")

import os
for f in os.listdir(ruta_sortida):
    mida = os.path.getsize(os.path.join(ruta_sortida, f)) / 1024
    print(f"  {f} ({mida:.1f} KB)")

# Exporta també les estadístiques per hora com a CSV per a Plotly
stats_hora_local = stats_hora.toPandas()
stats_hora_local.to_csv(
    f"/home/jovyan/resultats/hores_{ALUMNE_ID}.csv",
    index=False
)
print(f"\nEstadístiques per hora exportades com a CSV.")

8.3 Verificació de la lectura

# Verifica que el Parquet és llegible
df_verificacio = spark.read.parquet(ruta_sortida)
print("Verificació del Parquet exportat:")
df_verificacio.show()
print(f"Esquema: {df_verificacio.dtypes}")

Part 9: Visualitzacions amb Plotly

9.1 Instal·lació de Plotly

# Instal·la Plotly si no és disponible al contenidor
import subprocess
subprocess.run(["pip", "install", "plotly", "kaleido", "-q"])
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import pandas as pd

# Convertir resultats a pandas per a Plotly
hores_pd = stats_hora.toPandas()
categories_pd = resultat_categories.toPandas()
zones_pd = consulta_zones.toPandas()

9.2 Viatges per hora del dia

fig_hores = go.Figure()

fig_hores.add_trace(go.Bar(
    x=hores_pd["hora_recollida"],
    y=hores_pd["viatges"],
    name="Viatges",
    marker_color="#3498db",
    opacity=0.8
))

fig_hores.add_trace(go.Scatter(
    x=hores_pd["hora_recollida"],
    y=hores_pd["tarifa_media_mobil_3h"],
    name="Tarifa mitjana (€)",
    yaxis="y2",
    line=dict(color="#e74c3c", width=2)
))

fig_hores.update_layout(
    title=f"Distribució de Viatges i Tarifes per Hora - {ALUMNE}",
    xaxis_title="Hora del dia",
    yaxis_title="Nombre de viatges",
    yaxis2=dict(title="Tarifa mitjana ($)", overlaying="y", side="right"),
    legend=dict(x=0.01, y=0.99),
    template="plotly_white",
    hovermode="x unified"
)

fig_hores.write_html(f"/home/jovyan/resultats/hores_{ALUMNE_ID}.html")
fig_hores.show()
print("Gràfic guardat.")

9.3 Ingressos per categoria de viatge

fig_cat = px.bar(
    categories_pd.sort_values("ingressos_totals", ascending=True),
    x="ingressos_totals",
    y="categoria_viatge",
    color="tarifa_mitjana",
    color_continuous_scale="Blues",
    orientation="h",
    title=f"Ingressos Totals per Categoria de Viatge - {ALUMNE}",
    labels={
        "ingressos_totals": "Ingressos totals ($)",
        "categoria_viatge": "Categoria",
        "tarifa_mitjana": "Tarifa mitjana ($)"
    },
    text="total_viatges"
)

fig_cat.update_traces(texttemplate="%{text:,}", textposition="outside")
fig_cat.update_layout(template="plotly_white")
fig_cat.write_html(f"/home/jovyan/resultats/categories_{ALUMNE_ID}.html")
fig_cat.show()

9.4 Top zones per ingressos

fig_zones = px.bar(
    zones_pd.head(15).sort_values("ingressos_totals", ascending=True),
    x="ingressos_totals",
    y="zona_recollida",
    orientation="h",
    color="tarifa_mitjana",
    color_continuous_scale="Viridis",
    title=f"Top 15 Zones per Ingressos Totals - {ALUMNE}",
    labels={
        "ingressos_totals": "Ingressos totals ($)",
        "zona_recollida": "Zona (LocationID)",
        "tarifa_mitjana": "Tarifa mitjana ($)"
    }
)

fig_zones.update_layout(template="plotly_white", yaxis=dict(type="category"))
fig_zones.write_html(f"/home/jovyan/resultats/zones_{ALUMNE_ID}.html")
fig_zones.show()

Part 10: Tancament de la sessió

# Allibera la cache
df_opt.unpersist()
print("Cache alliberada.")

# Tanca la sessió Spark de forma ordenada
spark.stop()
print(f"\nSessió Spark tancada.")
print(f"\nFitxers generats per {ALUMNE}:")
print(f"  - {ruta_sortida}/ (Parquet)")
print(f"  - /home/jovyan/resultats/hores_{ALUMNE_ID}.csv")
print(f"  - /home/jovyan/resultats/hores_{ALUMNE_ID}.html")
print(f"  - /home/jovyan/resultats/categories_{ALUMNE_ID}.html")
print(f"  - /home/jovyan/resultats/zones_{ALUMNE_ID}.html")
print(f"\nPràctica {PRACTICA} completada per {ALUMNE}!")

Preguntes de reflexió

Preguntes de reflexió - PR5075/01

Respon per escrit com a cel·les Markdown al notebook:

PySpark i Spark SQL:

  1. Quina diferència hi ha entre una transformació i una acció a Spark? Per qué Spark utilitza lazy evaluation? Posa un exemple de cada una.

  2. Quan convé usar la DataFrame API i quan convé usar Spark SQL? Hi ha diferències de rendiment?

  3. Explica amb les teves paraules com funciona una Window Function. Quins casos d'ús no es podrien resoldre fàcilment sense elles?

Optimització:

  1. El reparticionament per PULocationID millora el rendiment de les Window Functions per zona. Explica per qué.

  2. Quins riscos té usar cache() de forma indiscriminada? En quins casos no compensa?

  3. Al Explain Plan, l'operador Exchange representa un shuffle. Per qué els shuffles són cars i com es poden minimitzar?

Big Data i negoci:

  1. Si fossis analista de dades de la NYC TLC, quines tres conclusions de negoci extrauries de l'anàlisi realitzada? Com les presentaries als directius?

  2. El dataset conté coordenades de zones (LocationID). Com amplaries l'anàlisi per incloure dades geogràfiques (mapes de calor de recollides)?


Lliurament

Puja els fitxers següents al Campus Virtual abans de la data límit:

Fitxer Descripció
spark_joan_garcia.ipynb Notebook Jupyter complet, executat de dalt a baix sense errors
analisi_joan_garcia/ Carpeta amb els Parquet dels resultats finals
hores_joan_garcia.html Visualització interactiva per hora
categories_joan_garcia.html Visualització per categoria de viatge
zones_joan_garcia.html Visualització de zones top

Requisits mínims per ser avaluat

  • El notebook ha d'executar-se de dalt a baix sense errors (Kernel > Restart & Run All).
  • La variable ALUMNE ha de tenir el teu nom real i ha d'aparèixer als noms de fitxers.
  • Han d'existir almenys 3 visualitzacions Plotly exportades.
  • Les 8 preguntes de reflexió han d'estar contestades.
  • Els resultats finals han d'estar exportats en format Parquet.

Consell per al lliurament

Comprova que els fitxers HTML s'obren correctament al navegador sense necessitar el servidor Jupyter. Les visualitzacions Plotly en HTML són autocontingudes i no requereixen connexió a Internet.

Consulta la rúbrica PR5075/01 per als criteris detallats d'avaluació.


Pràctica PR5075/01 | Mòdul 5074 Sistemes de Big Data | Institut Sa Palomera (Blanes) | Curs IABD 2026-2027