Spark JDBC: connexió a bases de dades relacionals
Introducció
Spark pot llegir i escriure dades directament des de qualsevol base de dades relacional que disposi d'un connector JDBC: PostgreSQL, MySQL, MariaDB, Oracle, SQL Server, SQLite, etc. Això permet integrar pipelines Spark amb fonts de dades corporatives existents sense necessitat d'exportar fitxers intermedis.
El connector JDBC de Spark és especialment útil per a: - ETL des de bases de dades transaccionals cap a un Data Lake - Escriure resultats analítics de tornada a una base de dades operacional - Sincronització incremental de taules entre sistemes
Prerequisits: drivers JDBC
El driver JDBC del motor de base de dades ha d'estar disponible al classpath de Spark. En Docker, la forma habitual és afegir-lo com a paquet Maven:
# Iniciar PySpark amb el driver de PostgreSQL
pyspark --packages org.postgresql:postgresql:42.7.3
# Per a MySQL
pyspark --packages com.mysql.cj:mysql-connector-j:8.3.0
# Per a SQL Server
pyspark --packages com.microsoft.sqlserver:mssql-jdbc:12.4.2.jre11
En una SparkSession programàtica:
spark = SparkSession.builder \
.appName("SparkJDBC") \
.config("spark.jars.packages", "org.postgresql:postgresql:42.7.3") \
.getOrCreate()
Llegir des d'una base de dades
Lectura simple d'una taula
# Paràmetres de connexió PostgreSQL
jdbc_url = "jdbc:postgresql://localhost:5432/vendes_db"
connexio = {
"user": "spark_user",
"password": "secret123",
"driver": "org.postgresql.Driver"
}
# Llegir una taula sencera
df_clients = spark.read \
.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", "public.clients") \
.options(**connexio) \
.load()
df_clients.printSchema()
df_clients.show(5)
Llegir amb una query personalitzada
# Llegir el resultat d'una consulta SQL (no una taula sencera)
query = """
(SELECT
c.id_client,
c.nom,
c.email,
COUNT(v.id_comanda) AS num_comandes,
SUM(v.import_total) AS total_gastat
FROM clients c
LEFT JOIN vendes v ON c.id_client = v.id_client
WHERE v.data_comanda >= '2024-01-01'
GROUP BY c.id_client, c.nom, c.email
) AS subquery
"""
df_resum = spark.read \
.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", query) \
.options(**connexio) \
.load()
Parentèsis i àlies obligatori
Quan uses una subquery com a dbtable, ha d'anar entre parèntesis i tenir un àlies. Alguns motors (PostgreSQL, MySQL) ho requereixen explícitament.
Particionament per a lectures paral·leles
Per defecte, Spark llegeix una taula JDBC amb una sola connexió i un sol executor (un únic partition). Amb taules grans, això és un coll d'ampolla important.
La solució és especificar una columna numèrica per a partir la lectura en múltiples particions paral·leles:
# Llegir la taula de vendes en 10 particions paral·leles
# basant-se en el rang de la columna 'id_comanda'
df_vendes = spark.read \
.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", "public.vendes") \
.option("partitionColumn", "id_comanda") \
.option("lowerBound", "1") \
.option("upperBound", "5000000") \
.option("numPartitions", "10") \
.options(**connexio) \
.load()
# Verificar que s'han creat 10 particions
print(f"Nombre de particions: {df_vendes.rdd.getNumPartitions()}")
graph LR
DB[(PostgreSQL\nvendes)] -->|id 1-500k| P1[Partició 1\nExecutor 1]
DB -->|id 500k-1M| P2[Partició 2\nExecutor 2]
DB -->|id 1M-1.5M| P3[Partició 3\nExecutor 3]
DB -->|...| PN[Partició N\nExecutor N]
P1 --> DF[DataFrame Spark]
P2 --> DF
P3 --> DF
PN --> DF
Tria una bona columna de particionament
La columna de particionament ha de ser numèrica, tenir una distribució uniforme, i tenir un índex a la base de dades. Un ID seqüencial o una data convertida a timestamp numèric solen funcionar bé. Evita columnes amb molts valors nuls.
Particionament per predicats personalitzats
Quan la columna numèrica no té una distribució uniforme, pots definir predicats SQL personalitzats:
# Particions per trimestre (distribució més controlada)
predicats = [
"data_comanda >= '2024-01-01' AND data_comanda < '2024-04-01'",
"data_comanda >= '2024-04-01' AND data_comanda < '2024-07-01'",
"data_comanda >= '2024-07-01' AND data_comanda < '2024-10-01'",
"data_comanda >= '2024-10-01' AND data_comanda < '2025-01-01'",
]
df_vendes = spark.read \
.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", "public.vendes") \
.options(**connexio) \
.load(predicates=predicats)
print(f"Nombre de particions: {df_vendes.rdd.getNumPartitions()}") # 4
Escriure a una base de dades
Modes d'escriptura
# Resultat de la transformació Spark que volem guardar
df_resultat = df_vendes \
.groupBy("regio", "categoria") \
.agg(
count("id_comanda").alias("num_comandes"),
sum("import_total").alias("facturacio")
)
# Mode 'overwrite': esborra i recrea la taula (PERILLÓS en producció!)
df_resultat.write \
.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", "public.resum_vendes") \
.options(**connexio) \
.mode("overwrite") \
.save()
# Mode 'append': afegeix files sense esborrar (útil per a càrregues incrementals)
df_nous = spark.createDataFrame([...])
df_nous.write \
.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", "public.resum_vendes") \
.options(**connexio) \
.mode("append") \
.save()
# Mode 'ignore': no fa res si la taula ja existeix
# Mode 'error' (per defecte): llança error si la taula existeix
Escriptura per lots (batchsize)
Per a taules grans, ajustar batchsize millora notablement el rendiment:
df_gran.write \
.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", "public.taula_gran") \
.option("batchsize", "10000") \
.option("numPartitions", "8") \
.options(**connexio) \
.mode("append") \
.save()
Exemple complet: ETL des de PostgreSQL
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date, datediff
spark = SparkSession.builder \
.appName("ETL_PostgreSQL_2024") \
.config("spark.jars.packages", "org.postgresql:postgresql:42.7.3") \
.getOrCreate()
jdbc_url = "jdbc:postgresql://postgres:5432/crm_db"
conn_opts = {
"user": "etl_user",
"password": "etl_secret",
"driver": "org.postgresql.Driver"
}
# 1. Llegir clients actius de CRM
df_clients = spark.read \
.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", "public.clients") \
.option("partitionColumn", "id_client") \
.option("lowerBound", "1") \
.option("upperBound", "1000000") \
.option("numPartitions", "8") \
.options(**conn_opts) \
.load() \
.filter(col("actiu") == True)
# 2. Llegir comandes de l'últim mes
query_comandes = """
(SELECT * FROM public.comandes
WHERE data_comanda >= CURRENT_DATE - INTERVAL '30 days') AS t
"""
df_comandes = spark.read \
.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", query_comandes) \
.options(**conn_opts) \
.load()
# 3. Transformació: enriquir comandes amb dades de client
df_enriquit = df_comandes \
.join(df_clients.select("id_client", "segment", "regio"), on="id_client", how="left") \
.withColumn("dies_desde_comanda", datediff(current_date(), col("data_comanda")))
# 4. Agregar per segment i regio
df_resum = df_enriquit \
.groupBy("segment", "regio") \
.agg(
count("id_comanda").alias("comandes"),
sum("import_total").alias("facturacio")
)
# 5. Escriure resultats al data warehouse (PostgreSQL diferent)
dw_url = "jdbc:postgresql://datawarehouse:5432/dw_db"
df_resum.write \
.format("jdbc") \
.option("url", dw_url) \
.option("dbtable", "analytics.resum_mensual") \
.options(**{**conn_opts, "user": "dw_user", "password": "dw_secret"}) \
.mode("append") \
.save()
print("ETL completat correctament.")
spark.stop()
Miniactivitat: Connectar Spark a SQLite
SQLite és una bona opció per practicar JDBC localment sense instal·lar cap servidor:
- Descarrega el driver SQLite JDBC:
sqlite-jdbc-3.x.jar - Crea una base de dades SQLite amb
sqlite3 vendes.dbi una taula amb 100 files de prova. - Llegeix la taula amb Spark JDBC (sense particionament, ja que SQLite és de fitxer únic).
- Fes una agregació i escriu el resultat en una taula nova de la mateixa base de dades.
- Verifica el resultat amb
sqlite3 vendes.db "SELECT * FROM resum;".