Salta el contingut

Aggregation Pipeline de MongoDB

L'Aggregation Pipeline és el mecanisme principal d'anàlisi de dades a MongoDB. Permet processar documents d'una col·lecció a través d'una seqüència d'etapes (stages) on la sortida d'una etapa és l'entrada de la següent. És l'equivalent al SQL amb GROUP BY, JOIN, subconsultes i funcions de finestra, tot en un sol framework consistent.

Concepte de pipeline

Un pipeline és un array d'etapes. Cada etapa és un objecte amb un operador ($match, $group, etc.) i els seus paràmetres:

db.col·leccio.aggregate([
  { $etapa1: { ... } },
  { $etapa2: { ... } },
  { $etapa3: { ... } }
])
graph LR
    Col["Col·lecció\n(documents originals)"]
    M["$match\n(filtra documents)"]
    G["$group\n(agrupa i acumula)"]
    S["$sort\n(ordena)"]
    L["$limit\n(limita)"]
    Out["Resultat\n(documents transformats)"]
    Col --> M --> G --> S --> L --> Out

La diferència clau respecte a find(): find() retorna documents de la col·lecció (possiblement filtrats i projectats), però no pot calcular agregats ni combinar dades de múltiples col·leccions. L'Aggregation Pipeline pot fer tot això.

Dataset de pràctica: vendes e-commerce

Per a tots els exemples d'aquesta unitat, usarem una col·lecció vendes que generarem amb unes dades de mostra. En la pràctica PR507403 es generarà amb 1 milió de documents amb Python:

use bigdata_nom_cognom

db.vendes.insertMany([
  {
    _id: 1, data: ISODate("2025-01-15"), pais: "Espanya",
    client: { id: "CLI-001", nom: "Joan Puig", segment: "premium" },
    productes: [
      { id: "PRD-010", nom: "Portàtil", categoria: "informatica", preu: 899.00, qty: 1 },
      { id: "PRD-045", nom: "Ratolí", categoria: "peripherics", preu: 24.99, qty: 2 }
    ],
    total: 948.98, estat: "completat"
  },
  {
    _id: 2, data: ISODate("2025-01-22"), pais: "França",
    client: { id: "CLI-002", nom: "Marie Dupont", segment: "basic" },
    productes: [
      { id: "PRD-020", nom: "Monitor", categoria: "monitors", preu: 349.00, qty: 1 }
    ],
    total: 349.00, estat: "completat"
  },
  {
    _id: 3, data: ISODate("2025-02-05"), pais: "Espanya",
    client: { id: "CLI-001", nom: "Joan Puig", segment: "premium" },
    productes: [
      { id: "PRD-030", nom: "Teclat", categoria: "peripherics", preu: 129.00, qty: 1 },
      { id: "PRD-035", nom: "Auriculars", categoria: "audio", preu: 89.99, qty: 1 }
    ],
    total: 218.99, estat: "completat"
  },
  {
    _id: 4, data: ISODate("2025-02-18"), pais: "Alemanya",
    client: { id: "CLI-003", nom: "Hans Müller", segment: "premium" },
    productes: [
      { id: "PRD-010", nom: "Portàtil", categoria: "informatica", preu: 899.00, qty: 2 }
    ],
    total: 1798.00, estat: "cancel·lat"
  },
  {
    _id: 5, data: ISODate("2025-03-10"), pais: "Espanya",
    client: { id: "CLI-004", nom: "Anna Mas", segment: "basic" },
    productes: [
      { id: "PRD-045", nom: "Ratolí", categoria: "peripherics", preu: 24.99, qty: 3 },
      { id: "PRD-050", nom: "Disc extern", categoria: "emmagatzematge", preu: 69.99, qty: 1 }
    ],
    total: 144.96, estat: "completat"
  }
])

Stages principals

$match — filtrar documents

$match és l'equivalent al WHERE de SQL. Filtra els documents que passen al stage següent. Ha d'anar al principi del pipeline sempre que sigui possible, perquè redueix la quantitat de documents que han de processar els stages posteriors i, si existeix un índex al camp de filtre, MongoDB l'aprofitarà.

// Vendes completades d'Espanya
db.vendes.aggregate([
  { $match: { pais: "Espanya", estat: "completat" } }
])

// Vendes del primer trimestre de 2025 amb total superior a 200
db.vendes.aggregate([
  {
    $match: {
      data: {
        $gte: ISODate("2025-01-01"),
        $lt:  ISODate("2025-04-01")
      },
      total: { $gt: 200 }
    }
  }
])

$group — agrupació i acumuladors

$group és l'equivalent al GROUP BY de SQL. Agrupa els documents per un camp (o expressió) i calcula valors acumulats per a cada grup. El camp _id de $group defineix la clau d'agrupació.

// Total de vendes i import total per país
db.vendes.aggregate([
  {
    $group: {
      _id: "$pais",                           // agrupar per país
      nombre_vendes: { $sum: 1 },             // comptar documents
      import_total: { $sum: "$total" },       // sumar el camp total
      import_mig: { $avg: "$total" },         // mitjana
      import_max: { $max: "$total" },         // màxim
      import_min: { $min: "$total" }          // mínim
    }
  }
])

Acumuladors disponibles en $group:

Acumulador SQL equivalent Descripció
$sum SUM() Suma. $sum: 1 compta documents
$avg AVG() Mitjana aritmètica
$max MAX() Valor màxim
$min MIN() Valor mínim
$count COUNT() Compta (des de MongoDB 5.0)
$push Array amb tots els valors del grup
$addToSet Array amb valors únics del grup
$first FIRST_VALUE() Primer valor del grup
$last LAST_VALUE() Últim valor del grup
// Agrupar per segment de client i obtenir llista de clients únics
db.vendes.aggregate([
  {
    $group: {
      _id: "$client.segment",
      clients_unics: { $addToSet: "$client.id" },
      total_acumulat: { $sum: "$total" }
    }
  }
])

$project — transformar i seleccionar camps

$project permet incloure, excloure i crear nous camps calculats. Similar a la clàusula SELECT de SQL però amb la potència d'expressions.

// Seleccionar camps i crear un camp calculat (any de la venda)
db.vendes.aggregate([
  {
    $project: {
      _id: 0,
      pais: 1,
      total: 1,
      any_venda: { $year: "$data" },          // extreure l'any de la data
      mes_venda: { $month: "$data" },          // extreure el mes
      total_arrodonit: { $round: ["$total", 0] }
    }
  }
])

// Calcular l'IVA (21%) i el total amb IVA
db.vendes.aggregate([
  {
    $project: {
      nom_client: "$client.nom",
      total_sense_iva: "$total",
      iva: { $multiply: ["$total", 0.21] },
      total_amb_iva: { $multiply: ["$total", 1.21] }
    }
  }
])

$addFields — afegir camps sense eliminar els existents

$addFields afegeix camps calculats al document sense eliminar els camps originals. Equivalent a $project però sense haver d'especificar tots els camps existents:

// Afegir mes i any a cada document de venda sense modificar res més
db.vendes.aggregate([
  {
    $addFields: {
      any: { $year: "$data" },
      mes: { $month: "$data" },
      es_premium: { $eq: ["$client.segment", "premium"] }
    }
  }
])

$sort, $limit, $skip — ordenació i paginació

// Ordenar per total descendent i agafar el top 3
db.vendes.aggregate([
  { $match: { estat: "completat" } },
  { $sort: { total: -1 } },
  { $limit: 3 }
])

// Paginació: pàgina 2 amb 10 resultats per pàgina
db.vendes.aggregate([
  { $sort: { data: -1 } },
  { $skip: 10 },
  { $limit: 10 }
])

$unwind — expandir arrays en documents individuals

$unwind és un stage específic de MongoDB sense equivalent directe en SQL. Pren un camp que és un array i crea un document per cada element de l'array. Essencial per analitzar dades dins d'arrays.

// Expandir les línies de productes de cada venda
// Cada venda amb 2 productes es converteix en 2 documents separats
db.vendes.aggregate([
  { $unwind: "$productes" }
])
// Resultat (exemple per al document _id:1):
// { _id: 1, pais: "Espanya", ..., productes: { id: "PRD-010", nom: "Portàtil", ... } }
// { _id: 1, pais: "Espanya", ..., productes: { id: "PRD-045", nom: "Ratolí", ... } }

$unwind és molt poderós en combinació amb $group per analitzar els elements individuals d'arrays:

// Unitats venudes per categoria de producte
db.vendes.aggregate([
  { $match: { estat: "completat" } },
  { $unwind: "$productes" },
  {
    $group: {
      _id: "$productes.categoria",
      unitats_venudes: { $sum: "$productes.qty" },
      ingressos: { $sum: { $multiply: ["$productes.preu", "$productes.qty"] } }
    }
  },
  { $sort: { ingressos: -1 } }
])

$lookup — JOIN entre col·leccions

$lookup permet combinar dades de dues col·leccions, equivalent al LEFT JOIN de SQL.

// Suposem una col·lecció "clients" separada
db.clients.insertMany([
  { _id: "CLI-001", nom: "Joan Puig", email: "joan@exemple.cat", ciutat: "Girona" },
  { _id: "CLI-002", nom: "Marie Dupont", email: "marie@exemple.fr", ciutat: "Lió" },
  { _id: "CLI-003", nom: "Hans Müller", email: "hans@exemple.de", ciutat: "Berlín" },
  { _id: "CLI-004", nom: "Anna Mas", email: "anna@exemple.cat", ciutat: "Barcelona" }
])

// JOIN: unir vendes amb les dades completes del client
db.vendes.aggregate([
  {
    $lookup: {
      from: "clients",              // col·lecció de destí
      localField: "client.id",     // camp de la col·lecció origen
      foreignField: "_id",         // camp de la col·lecció destí
      as: "client_info"            // nom del camp resultat (array)
    }
  },
  {
    $project: {
      pais: 1, total: 1, estat: 1,
      "client_info.email": 1,
      "client_info.ciutat": 1
    }
  }
])

$lookup retorna un array

El camp especificat a as sempre és un array, fins i tot si el JOIN retorna un sol document. Per "aplanar" l'array i accedir directament als camps, pots fer $unwind sobre el camp as immediatament després del $lookup.

// $lookup + $unwind per accedir als camps del JOIN directament
db.vendes.aggregate([
  {
    $lookup: {
      from: "clients",
      localField: "client.id",
      foreignField: "_id",
      as: "client_info"
    }
  },
  { $unwind: "$client_info" },    // aplanar l'array de resultat
  {
    $project: {
      pais: 1, total: 1,
      nom_client: "$client_info.nom",
      email: "$client_info.email"
    }
  }
])

$out i $merge — persistir resultats

$out escriu el resultat del pipeline en una nova col·lecció (sobreescriu si existeix). $merge permet combinar el resultat amb una col·lecció existent.

// Desar el resum mensual en una nova col·lecció
db.vendes.aggregate([
  { $match: { estat: "completat" } },
  {
    $group: {
      _id: { any: { $year: "$data" }, mes: { $month: "$data" } },
      total_mes: { $sum: "$total" },
      nombre_vendes: { $sum: 1 }
    }
  },
  { $sort: { "_id.any": 1, "_id.mes": 1 } },
  { $out: "resum_mensual" }        // crea la col·lecció resum_mensual
])

$facet — múltiples sub-pipelines en paral·lel

$facet permet executar múltiples pipelines independents sobre el mateix conjunt de documents d'entrada, retornant tots els resultats en un sol document. Molt útil per a pàgines de cerca amb facetes (filtres laterals):

// Obtenir estadístiques generals + distribució per país + top 3 clients
db.vendes.aggregate([
  { $match: { estat: "completat" } },
  {
    $facet: {
      estadistiques_generals: [
        {
          $group: {
            _id: null,
            total_vendes: { $sum: 1 },
            import_total: { $sum: "$total" },
            import_mig: { $avg: "$total" }
          }
        }
      ],
      per_pais: [
        { $group: { _id: "$pais", count: { $sum: 1 }, total: { $sum: "$total" } } },
        { $sort: { total: -1 } }
      ],
      top_clients: [
        { $group: { _id: "$client.id", nom: { $first: "$client.nom" }, total: { $sum: "$total" } } },
        { $sort: { total: -1 } },
        { $limit: 3 }
      ]
    }
  }
])

Exemple complet: anàlisi de vendes

Aquest pipeline resol una pregunta analítica real: "Quins van ser els ingressos per categoria de producte al primer trimestre de 2025, ordenats de major a menor?"

db.vendes.aggregate([
  // 1. Filtrar: només vendes completades del Q1 2025
  {
    $match: {
      estat: "completat",
      data: { $gte: ISODate("2025-01-01"), $lt: ISODate("2025-04-01") }
    }
  },
  // 2. Expandir l'array de productes
  { $unwind: "$productes" },
  // 3. Afegir camp de ingrés per línia
  {
    $addFields: {
      ingres_linia: { $multiply: ["$productes.preu", "$productes.qty"] }
    }
  },
  // 4. Agrupar per categoria
  {
    $group: {
      _id: "$productes.categoria",
      ingressos_totals: { $sum: "$ingres_linia" },
      unitats_venudes: { $sum: "$productes.qty" },
      nombre_transaccions: { $sum: 1 }
    }
  },
  // 5. Reanomenar _id per llegibilitat
  {
    $project: {
      _id: 0,
      categoria: "$_id",
      ingressos_totals: { $round: ["$ingressos_totals", 2] },
      unitats_venudes: 1,
      nombre_transaccions: 1
    }
  },
  // 6. Ordenar per ingressos descendent
  { $sort: { ingressos_totals: -1 } }
])

Optimització de pipelines

L'ordre dels stages té un impacte directe en el rendiment. MongoDB aplica algunes optimitzacions automàticament, però el dissenyador del pipeline ha de conèixer les regles bàsiques:

Regla 1: $match el més aviat possible

Un $match al principi del pipeline redueix el nombre de documents que han de processar els stages posteriors. Si el camp de filtre té un índex, MongoDB l'aprofita per evitar un COLLSCAN.

Regla 2: $sort just després de $match

Si $sort va immediatament després de $match sobre un camp indexat, MongoDB pot usar l'índex per ordenar sense carregar tots els documents en memòria (evita l'ordenació en memòria que té un límit de 100 MB per defecte).

Límit de memòria del pipeline

Per defecte, cada stage del pipeline pot usar com a màxim 100 MB de RAM. Si un stage supera aquest límit, MongoDB lança un error. La solució és afegir { allowDiskUse: true } a les opcions de l'agregació: db.col.aggregate([...], { allowDiskUse: true }).

Regla 3: projectar aviat els camps no necessaris

Si el pipeline no necessita tots els camps dels documents, afegir un $project o $unset aviat redueix la mida dels documents intermedis que es passen entre stages.

Comparativa: Aggregation Pipeline vs SQL

Operació SQL MongoDB Aggregation
Filtrar WHERE $match
Agrupar GROUP BY $group
Funció d'agregació SUM(), AVG() $sum, $avg (dins de $group)
Seleccionar camps SELECT col1, col2 $project
JOIN JOIN ... ON $lookup
Ordenar ORDER BY $sort
Limitar resultats LIMIT N $limit
Saltar resultats OFFSET N $skip
Subconsulta en FROM FROM (SELECT ...) Stage addicional al pipeline
Expandir arrays — (no existeix) $unwind
Múltiples agregacions Multiple queries $facet
Desar resultats CREATE TABLE AS SELECT $out / $merge

AC5074/03/03 — Miniactivitat

Pipeline de vendes per categoria i mes.

Partint de la col·lecció vendes (ampliada amb almenys 20 documents que cobreixin 3 mesos i 3 categories de producte), escriu un pipeline d'agregació que:

  1. Filtri les vendes d'un rang de dates especificat (p. ex. gener-març de 2025) i que tinguin estat "completat".
  2. Expandeixi l'array de productes de cada venda.
  3. Calculi els ingressos totals i les unitats venudes per categoria de producte i per mes.
  4. Ordeni el resultat per mes ascendent i, dins de cada mes, per ingressos descendent.
  5. Projecti el resultat final amb els camps: any, mes, categoria, ingressos_totals, unitats_venudes. Sense el camp _id.

A continuació, explica amb comentaris al codi quin índex crearies per optimitzar el $match inicial i per qué.