Aggregation Pipeline de MongoDB
L'Aggregation Pipeline és el mecanisme principal d'anàlisi de dades a MongoDB. Permet processar documents d'una col·lecció a través d'una seqüència d'etapes (stages) on la sortida d'una etapa és l'entrada de la següent. És l'equivalent al SQL amb GROUP BY, JOIN, subconsultes i funcions de finestra, tot en un sol framework consistent.
Concepte de pipeline
Un pipeline és un array d'etapes. Cada etapa és un objecte amb un operador ($match, $group, etc.) i els seus paràmetres:
graph LR
Col["Col·lecció\n(documents originals)"]
M["$match\n(filtra documents)"]
G["$group\n(agrupa i acumula)"]
S["$sort\n(ordena)"]
L["$limit\n(limita)"]
Out["Resultat\n(documents transformats)"]
Col --> M --> G --> S --> L --> Out
La diferència clau respecte a find(): find() retorna documents de la col·lecció (possiblement filtrats i projectats), però no pot calcular agregats ni combinar dades de múltiples col·leccions. L'Aggregation Pipeline pot fer tot això.
Dataset de pràctica: vendes e-commerce
Per a tots els exemples d'aquesta unitat, usarem una col·lecció vendes que generarem amb unes dades de mostra. En la pràctica PR507403 es generarà amb 1 milió de documents amb Python:
use bigdata_nom_cognom
db.vendes.insertMany([
{
_id: 1, data: ISODate("2025-01-15"), pais: "Espanya",
client: { id: "CLI-001", nom: "Joan Puig", segment: "premium" },
productes: [
{ id: "PRD-010", nom: "Portàtil", categoria: "informatica", preu: 899.00, qty: 1 },
{ id: "PRD-045", nom: "Ratolí", categoria: "peripherics", preu: 24.99, qty: 2 }
],
total: 948.98, estat: "completat"
},
{
_id: 2, data: ISODate("2025-01-22"), pais: "França",
client: { id: "CLI-002", nom: "Marie Dupont", segment: "basic" },
productes: [
{ id: "PRD-020", nom: "Monitor", categoria: "monitors", preu: 349.00, qty: 1 }
],
total: 349.00, estat: "completat"
},
{
_id: 3, data: ISODate("2025-02-05"), pais: "Espanya",
client: { id: "CLI-001", nom: "Joan Puig", segment: "premium" },
productes: [
{ id: "PRD-030", nom: "Teclat", categoria: "peripherics", preu: 129.00, qty: 1 },
{ id: "PRD-035", nom: "Auriculars", categoria: "audio", preu: 89.99, qty: 1 }
],
total: 218.99, estat: "completat"
},
{
_id: 4, data: ISODate("2025-02-18"), pais: "Alemanya",
client: { id: "CLI-003", nom: "Hans Müller", segment: "premium" },
productes: [
{ id: "PRD-010", nom: "Portàtil", categoria: "informatica", preu: 899.00, qty: 2 }
],
total: 1798.00, estat: "cancel·lat"
},
{
_id: 5, data: ISODate("2025-03-10"), pais: "Espanya",
client: { id: "CLI-004", nom: "Anna Mas", segment: "basic" },
productes: [
{ id: "PRD-045", nom: "Ratolí", categoria: "peripherics", preu: 24.99, qty: 3 },
{ id: "PRD-050", nom: "Disc extern", categoria: "emmagatzematge", preu: 69.99, qty: 1 }
],
total: 144.96, estat: "completat"
}
])
Stages principals
$match — filtrar documents
$match és l'equivalent al WHERE de SQL. Filtra els documents que passen al stage següent. Ha d'anar al principi del pipeline sempre que sigui possible, perquè redueix la quantitat de documents que han de processar els stages posteriors i, si existeix un índex al camp de filtre, MongoDB l'aprofitarà.
// Vendes completades d'Espanya
db.vendes.aggregate([
{ $match: { pais: "Espanya", estat: "completat" } }
])
// Vendes del primer trimestre de 2025 amb total superior a 200
db.vendes.aggregate([
{
$match: {
data: {
$gte: ISODate("2025-01-01"),
$lt: ISODate("2025-04-01")
},
total: { $gt: 200 }
}
}
])
$group — agrupació i acumuladors
$group és l'equivalent al GROUP BY de SQL. Agrupa els documents per un camp (o expressió) i calcula valors acumulats per a cada grup. El camp _id de $group defineix la clau d'agrupació.
// Total de vendes i import total per país
db.vendes.aggregate([
{
$group: {
_id: "$pais", // agrupar per país
nombre_vendes: { $sum: 1 }, // comptar documents
import_total: { $sum: "$total" }, // sumar el camp total
import_mig: { $avg: "$total" }, // mitjana
import_max: { $max: "$total" }, // màxim
import_min: { $min: "$total" } // mínim
}
}
])
Acumuladors disponibles en $group:
| Acumulador | SQL equivalent | Descripció |
|---|---|---|
$sum |
SUM() |
Suma. $sum: 1 compta documents |
$avg |
AVG() |
Mitjana aritmètica |
$max |
MAX() |
Valor màxim |
$min |
MIN() |
Valor mínim |
$count |
COUNT() |
Compta (des de MongoDB 5.0) |
$push |
— | Array amb tots els valors del grup |
$addToSet |
— | Array amb valors únics del grup |
$first |
FIRST_VALUE() |
Primer valor del grup |
$last |
LAST_VALUE() |
Últim valor del grup |
// Agrupar per segment de client i obtenir llista de clients únics
db.vendes.aggregate([
{
$group: {
_id: "$client.segment",
clients_unics: { $addToSet: "$client.id" },
total_acumulat: { $sum: "$total" }
}
}
])
$project — transformar i seleccionar camps
$project permet incloure, excloure i crear nous camps calculats. Similar a la clàusula SELECT de SQL però amb la potència d'expressions.
// Seleccionar camps i crear un camp calculat (any de la venda)
db.vendes.aggregate([
{
$project: {
_id: 0,
pais: 1,
total: 1,
any_venda: { $year: "$data" }, // extreure l'any de la data
mes_venda: { $month: "$data" }, // extreure el mes
total_arrodonit: { $round: ["$total", 0] }
}
}
])
// Calcular l'IVA (21%) i el total amb IVA
db.vendes.aggregate([
{
$project: {
nom_client: "$client.nom",
total_sense_iva: "$total",
iva: { $multiply: ["$total", 0.21] },
total_amb_iva: { $multiply: ["$total", 1.21] }
}
}
])
$addFields — afegir camps sense eliminar els existents
$addFields afegeix camps calculats al document sense eliminar els camps originals. Equivalent a $project però sense haver d'especificar tots els camps existents:
// Afegir mes i any a cada document de venda sense modificar res més
db.vendes.aggregate([
{
$addFields: {
any: { $year: "$data" },
mes: { $month: "$data" },
es_premium: { $eq: ["$client.segment", "premium"] }
}
}
])
$sort, $limit, $skip — ordenació i paginació
// Ordenar per total descendent i agafar el top 3
db.vendes.aggregate([
{ $match: { estat: "completat" } },
{ $sort: { total: -1 } },
{ $limit: 3 }
])
// Paginació: pàgina 2 amb 10 resultats per pàgina
db.vendes.aggregate([
{ $sort: { data: -1 } },
{ $skip: 10 },
{ $limit: 10 }
])
$unwind — expandir arrays en documents individuals
$unwind és un stage específic de MongoDB sense equivalent directe en SQL. Pren un camp que és un array i crea un document per cada element de l'array. Essencial per analitzar dades dins d'arrays.
// Expandir les línies de productes de cada venda
// Cada venda amb 2 productes es converteix en 2 documents separats
db.vendes.aggregate([
{ $unwind: "$productes" }
])
// Resultat (exemple per al document _id:1):
// { _id: 1, pais: "Espanya", ..., productes: { id: "PRD-010", nom: "Portàtil", ... } }
// { _id: 1, pais: "Espanya", ..., productes: { id: "PRD-045", nom: "Ratolí", ... } }
$unwind és molt poderós en combinació amb $group per analitzar els elements individuals d'arrays:
// Unitats venudes per categoria de producte
db.vendes.aggregate([
{ $match: { estat: "completat" } },
{ $unwind: "$productes" },
{
$group: {
_id: "$productes.categoria",
unitats_venudes: { $sum: "$productes.qty" },
ingressos: { $sum: { $multiply: ["$productes.preu", "$productes.qty"] } }
}
},
{ $sort: { ingressos: -1 } }
])
$lookup — JOIN entre col·leccions
$lookup permet combinar dades de dues col·leccions, equivalent al LEFT JOIN de SQL.
// Suposem una col·lecció "clients" separada
db.clients.insertMany([
{ _id: "CLI-001", nom: "Joan Puig", email: "joan@exemple.cat", ciutat: "Girona" },
{ _id: "CLI-002", nom: "Marie Dupont", email: "marie@exemple.fr", ciutat: "Lió" },
{ _id: "CLI-003", nom: "Hans Müller", email: "hans@exemple.de", ciutat: "Berlín" },
{ _id: "CLI-004", nom: "Anna Mas", email: "anna@exemple.cat", ciutat: "Barcelona" }
])
// JOIN: unir vendes amb les dades completes del client
db.vendes.aggregate([
{
$lookup: {
from: "clients", // col·lecció de destí
localField: "client.id", // camp de la col·lecció origen
foreignField: "_id", // camp de la col·lecció destí
as: "client_info" // nom del camp resultat (array)
}
},
{
$project: {
pais: 1, total: 1, estat: 1,
"client_info.email": 1,
"client_info.ciutat": 1
}
}
])
$lookup retorna un array
El camp especificat a as sempre és un array, fins i tot si el JOIN retorna un sol document. Per "aplanar" l'array i accedir directament als camps, pots fer $unwind sobre el camp as immediatament després del $lookup.
// $lookup + $unwind per accedir als camps del JOIN directament
db.vendes.aggregate([
{
$lookup: {
from: "clients",
localField: "client.id",
foreignField: "_id",
as: "client_info"
}
},
{ $unwind: "$client_info" }, // aplanar l'array de resultat
{
$project: {
pais: 1, total: 1,
nom_client: "$client_info.nom",
email: "$client_info.email"
}
}
])
$out i $merge — persistir resultats
$out escriu el resultat del pipeline en una nova col·lecció (sobreescriu si existeix). $merge permet combinar el resultat amb una col·lecció existent.
// Desar el resum mensual en una nova col·lecció
db.vendes.aggregate([
{ $match: { estat: "completat" } },
{
$group: {
_id: { any: { $year: "$data" }, mes: { $month: "$data" } },
total_mes: { $sum: "$total" },
nombre_vendes: { $sum: 1 }
}
},
{ $sort: { "_id.any": 1, "_id.mes": 1 } },
{ $out: "resum_mensual" } // crea la col·lecció resum_mensual
])
$facet — múltiples sub-pipelines en paral·lel
$facet permet executar múltiples pipelines independents sobre el mateix conjunt de documents d'entrada, retornant tots els resultats en un sol document. Molt útil per a pàgines de cerca amb facetes (filtres laterals):
// Obtenir estadístiques generals + distribució per país + top 3 clients
db.vendes.aggregate([
{ $match: { estat: "completat" } },
{
$facet: {
estadistiques_generals: [
{
$group: {
_id: null,
total_vendes: { $sum: 1 },
import_total: { $sum: "$total" },
import_mig: { $avg: "$total" }
}
}
],
per_pais: [
{ $group: { _id: "$pais", count: { $sum: 1 }, total: { $sum: "$total" } } },
{ $sort: { total: -1 } }
],
top_clients: [
{ $group: { _id: "$client.id", nom: { $first: "$client.nom" }, total: { $sum: "$total" } } },
{ $sort: { total: -1 } },
{ $limit: 3 }
]
}
}
])
Exemple complet: anàlisi de vendes
Aquest pipeline resol una pregunta analítica real: "Quins van ser els ingressos per categoria de producte al primer trimestre de 2025, ordenats de major a menor?"
db.vendes.aggregate([
// 1. Filtrar: només vendes completades del Q1 2025
{
$match: {
estat: "completat",
data: { $gte: ISODate("2025-01-01"), $lt: ISODate("2025-04-01") }
}
},
// 2. Expandir l'array de productes
{ $unwind: "$productes" },
// 3. Afegir camp de ingrés per línia
{
$addFields: {
ingres_linia: { $multiply: ["$productes.preu", "$productes.qty"] }
}
},
// 4. Agrupar per categoria
{
$group: {
_id: "$productes.categoria",
ingressos_totals: { $sum: "$ingres_linia" },
unitats_venudes: { $sum: "$productes.qty" },
nombre_transaccions: { $sum: 1 }
}
},
// 5. Reanomenar _id per llegibilitat
{
$project: {
_id: 0,
categoria: "$_id",
ingressos_totals: { $round: ["$ingressos_totals", 2] },
unitats_venudes: 1,
nombre_transaccions: 1
}
},
// 6. Ordenar per ingressos descendent
{ $sort: { ingressos_totals: -1 } }
])
Optimització de pipelines
L'ordre dels stages té un impacte directe en el rendiment. MongoDB aplica algunes optimitzacions automàticament, però el dissenyador del pipeline ha de conèixer les regles bàsiques:
Regla 1: $match el més aviat possible
Un $match al principi del pipeline redueix el nombre de documents que han de processar els stages posteriors. Si el camp de filtre té un índex, MongoDB l'aprofita per evitar un COLLSCAN.
Regla 2: $sort just després de $match
Si $sort va immediatament després de $match sobre un camp indexat, MongoDB pot usar l'índex per ordenar sense carregar tots els documents en memòria (evita l'ordenació en memòria que té un límit de 100 MB per defecte).
Límit de memòria del pipeline
Per defecte, cada stage del pipeline pot usar com a màxim 100 MB de RAM. Si un stage supera aquest límit, MongoDB lança un error. La solució és afegir { allowDiskUse: true } a les opcions de l'agregació: db.col.aggregate([...], { allowDiskUse: true }).
Regla 3: projectar aviat els camps no necessaris
Si el pipeline no necessita tots els camps dels documents, afegir un $project o $unset aviat redueix la mida dels documents intermedis que es passen entre stages.
Comparativa: Aggregation Pipeline vs SQL
| Operació | SQL | MongoDB Aggregation |
|---|---|---|
| Filtrar | WHERE |
$match |
| Agrupar | GROUP BY |
$group |
| Funció d'agregació | SUM(), AVG() |
$sum, $avg (dins de $group) |
| Seleccionar camps | SELECT col1, col2 |
$project |
| JOIN | JOIN ... ON |
$lookup |
| Ordenar | ORDER BY |
$sort |
| Limitar resultats | LIMIT N |
$limit |
| Saltar resultats | OFFSET N |
$skip |
| Subconsulta en FROM | FROM (SELECT ...) |
Stage addicional al pipeline |
| Expandir arrays | — (no existeix) | $unwind |
| Múltiples agregacions | Multiple queries | $facet |
| Desar resultats | CREATE TABLE AS SELECT |
$out / $merge |
AC5074/03/03 — Miniactivitat
Pipeline de vendes per categoria i mes.
Partint de la col·lecció vendes (ampliada amb almenys 20 documents que cobreixin 3 mesos i 3 categories de producte), escriu un pipeline d'agregació que:
- Filtri les vendes d'un rang de dates especificat (p. ex. gener-març de 2025) i que tinguin estat
"completat". - Expandeixi l'array de productes de cada venda.
- Calculi els ingressos totals i les unitats venudes per categoria de producte i per mes.
- Ordeni el resultat per mes ascendent i, dins de cada mes, per ingressos descendent.
- Projecti el resultat final amb els camps:
any,mes,categoria,ingressos_totals,unitats_venudes. Sense el camp_id.
A continuació, explica amb comentaris al codi quin índex crearies per optimitzar el $match inicial i per qué.