Salta el contingut

Spark JDBC: connexió a bases de dades relacionals

Introducció

Spark pot llegir i escriure dades directament des de qualsevol base de dades relacional que disposi d'un connector JDBC: PostgreSQL, MySQL, MariaDB, Oracle, SQL Server, SQLite, etc. Això permet integrar pipelines Spark amb fonts de dades corporatives existents sense necessitat d'exportar fitxers intermedis.

El connector JDBC de Spark és especialment útil per a: - ETL des de bases de dades transaccionals cap a un Data Lake - Escriure resultats analítics de tornada a una base de dades operacional - Sincronització incremental de taules entre sistemes


Prerequisits: drivers JDBC

El driver JDBC del motor de base de dades ha d'estar disponible al classpath de Spark. En Docker, la forma habitual és afegir-lo com a paquet Maven:

# Iniciar PySpark amb el driver de PostgreSQL
pyspark --packages org.postgresql:postgresql:42.7.3

# Per a MySQL
pyspark --packages com.mysql.cj:mysql-connector-j:8.3.0

# Per a SQL Server
pyspark --packages com.microsoft.sqlserver:mssql-jdbc:12.4.2.jre11

En una SparkSession programàtica:

spark = SparkSession.builder \
    .appName("SparkJDBC") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3") \
    .getOrCreate()

Llegir des d'una base de dades

Lectura simple d'una taula

# Paràmetres de connexió PostgreSQL
jdbc_url = "jdbc:postgresql://localhost:5432/vendes_db"
connexio = {
    "user": "spark_user",
    "password": "secret123",
    "driver": "org.postgresql.Driver"
}

# Llegir una taula sencera
df_clients = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "public.clients") \
    .options(**connexio) \
    .load()

df_clients.printSchema()
df_clients.show(5)

Llegir amb una query personalitzada

# Llegir el resultat d'una consulta SQL (no una taula sencera)
query = """
    (SELECT
        c.id_client,
        c.nom,
        c.email,
        COUNT(v.id_comanda) AS num_comandes,
        SUM(v.import_total) AS total_gastat
     FROM clients c
     LEFT JOIN vendes v ON c.id_client = v.id_client
     WHERE v.data_comanda >= '2024-01-01'
     GROUP BY c.id_client, c.nom, c.email
    ) AS subquery
"""

df_resum = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", query) \
    .options(**connexio) \
    .load()

Parentèsis i àlies obligatori

Quan uses una subquery com a dbtable, ha d'anar entre parèntesis i tenir un àlies. Alguns motors (PostgreSQL, MySQL) ho requereixen explícitament.


Particionament per a lectures paral·leles

Per defecte, Spark llegeix una taula JDBC amb una sola connexió i un sol executor (un únic partition). Amb taules grans, això és un coll d'ampolla important.

La solució és especificar una columna numèrica per a partir la lectura en múltiples particions paral·leles:

# Llegir la taula de vendes en 10 particions paral·leles
# basant-se en el rang de la columna 'id_comanda'
df_vendes = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "public.vendes") \
    .option("partitionColumn", "id_comanda") \
    .option("lowerBound", "1") \
    .option("upperBound", "5000000") \
    .option("numPartitions", "10") \
    .options(**connexio) \
    .load()

# Verificar que s'han creat 10 particions
print(f"Nombre de particions: {df_vendes.rdd.getNumPartitions()}")
graph LR
    DB[(PostgreSQL\nvendes)] -->|id 1-500k| P1[Partició 1\nExecutor 1]
    DB -->|id 500k-1M| P2[Partició 2\nExecutor 2]
    DB -->|id 1M-1.5M| P3[Partició 3\nExecutor 3]
    DB -->|...| PN[Partició N\nExecutor N]
    P1 --> DF[DataFrame Spark]
    P2 --> DF
    P3 --> DF
    PN --> DF

Tria una bona columna de particionament

La columna de particionament ha de ser numèrica, tenir una distribució uniforme, i tenir un índex a la base de dades. Un ID seqüencial o una data convertida a timestamp numèric solen funcionar bé. Evita columnes amb molts valors nuls.


Particionament per predicats personalitzats

Quan la columna numèrica no té una distribució uniforme, pots definir predicats SQL personalitzats:

# Particions per trimestre (distribució més controlada)
predicats = [
    "data_comanda >= '2024-01-01' AND data_comanda < '2024-04-01'",
    "data_comanda >= '2024-04-01' AND data_comanda < '2024-07-01'",
    "data_comanda >= '2024-07-01' AND data_comanda < '2024-10-01'",
    "data_comanda >= '2024-10-01' AND data_comanda < '2025-01-01'",
]

df_vendes = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "public.vendes") \
    .options(**connexio) \
    .load(predicates=predicats)

print(f"Nombre de particions: {df_vendes.rdd.getNumPartitions()}")  # 4

Escriure a una base de dades

Modes d'escriptura

# Resultat de la transformació Spark que volem guardar
df_resultat = df_vendes \
    .groupBy("regio", "categoria") \
    .agg(
        count("id_comanda").alias("num_comandes"),
        sum("import_total").alias("facturacio")
    )

# Mode 'overwrite': esborra i recrea la taula (PERILLÓS en producció!)
df_resultat.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "public.resum_vendes") \
    .options(**connexio) \
    .mode("overwrite") \
    .save()

# Mode 'append': afegeix files sense esborrar (útil per a càrregues incrementals)
df_nous = spark.createDataFrame([...])
df_nous.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "public.resum_vendes") \
    .options(**connexio) \
    .mode("append") \
    .save()

# Mode 'ignore': no fa res si la taula ja existeix
# Mode 'error' (per defecte): llança error si la taula existeix

Escriptura per lots (batchsize)

Per a taules grans, ajustar batchsize millora notablement el rendiment:

df_gran.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "public.taula_gran") \
    .option("batchsize", "10000") \
    .option("numPartitions", "8") \
    .options(**connexio) \
    .mode("append") \
    .save()

Exemple complet: ETL des de PostgreSQL

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date, datediff

spark = SparkSession.builder \
    .appName("ETL_PostgreSQL_2024") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3") \
    .getOrCreate()

jdbc_url = "jdbc:postgresql://postgres:5432/crm_db"
conn_opts = {
    "user": "etl_user",
    "password": "etl_secret",
    "driver": "org.postgresql.Driver"
}

# 1. Llegir clients actius de CRM
df_clients = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "public.clients") \
    .option("partitionColumn", "id_client") \
    .option("lowerBound", "1") \
    .option("upperBound", "1000000") \
    .option("numPartitions", "8") \
    .options(**conn_opts) \
    .load() \
    .filter(col("actiu") == True)

# 2. Llegir comandes de l'últim mes
query_comandes = """
    (SELECT * FROM public.comandes
     WHERE data_comanda >= CURRENT_DATE - INTERVAL '30 days') AS t
"""
df_comandes = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", query_comandes) \
    .options(**conn_opts) \
    .load()

# 3. Transformació: enriquir comandes amb dades de client
df_enriquit = df_comandes \
    .join(df_clients.select("id_client", "segment", "regio"), on="id_client", how="left") \
    .withColumn("dies_desde_comanda", datediff(current_date(), col("data_comanda")))

# 4. Agregar per segment i regio
df_resum = df_enriquit \
    .groupBy("segment", "regio") \
    .agg(
        count("id_comanda").alias("comandes"),
        sum("import_total").alias("facturacio")
    )

# 5. Escriure resultats al data warehouse (PostgreSQL diferent)
dw_url = "jdbc:postgresql://datawarehouse:5432/dw_db"
df_resum.write \
    .format("jdbc") \
    .option("url", dw_url) \
    .option("dbtable", "analytics.resum_mensual") \
    .options(**{**conn_opts, "user": "dw_user", "password": "dw_secret"}) \
    .mode("append") \
    .save()

print("ETL completat correctament.")
spark.stop()

Miniactivitat: Connectar Spark a SQLite

SQLite és una bona opció per practicar JDBC localment sense instal·lar cap servidor:

  1. Descarrega el driver SQLite JDBC: sqlite-jdbc-3.x.jar
  2. Crea una base de dades SQLite amb sqlite3 vendes.db i una taula amb 100 files de prova.
  3. Llegeix la taula amb Spark JDBC (sense particionament, ja que SQLite és de fitxer únic).
  4. Fes una agregació i escriu el resultat en una taula nova de la mateixa base de dades.
  5. Verifica el resultat amb sqlite3 vendes.db "SELECT * FROM resum;".