N8N Syllabus - Units 4 and 5

Advanced Nodes and ETL with N8N


Unit 4: Advanced Nodes

4.1. Transformation nodes

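Function Node - Custom JavaScript

The Function node runs custom JavaScript for complex transformations. In recent n8n versions it has been superseded by the Code node, whose syntax (for example $input.all()) the example below uses.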

// Example: compute per-user metrics
const items = $input.all();

return items.map(item => {
  const user = item.json;
  const orders = user.orders || [];

  // Derived metrics (guarding against users with no orders)
  const totalSpent = orders.reduce((sum, order) => sum + order.total, 0);
  const avgOrderValue = orders.length > 0 ? totalSpent / orders.length : 0;
  const isVIP = totalSpent > 1000;

  return {
    json: {
      user_id: user.id,
      name: user.name,
      // toFixed() returns strings formatted with 2 decimals
      total_spent: totalSpent.toFixed(2),
      avg_order_value: avgOrderValue.toFixed(2),
      order_count: orders.length,
      customer_tier: isVIP ? 'VIP' : 'Regular',
      // Assumes orders are sorted by date, oldest first
      last_order_date: orders.length > 0 ? orders[orders.length - 1].date : null
    }
  };
});

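Code Node - JavaScript or Python

More powerful than the Function node: it runs JavaScript or Python and can import libraries (on self-hosted instances, external npm modules must first be allowed with the NODE_FUNCTION_ALLOW_EXTERNAL environment variable).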

# Python for data analysis
import pandas as pd

# Convert the input items into a DataFrame.
# In Python the input is exposed as _input (not $input); depending on the
# n8n version, item.json may need .to_py() to become a plain dict.
data = [item.json for item in _input.all()]
df = pd.DataFrame(data)

# Analysis with pandas
df['date'] = pd.to_datetime(df['date'])
df['month'] = df['date'].dt.month
monthly_sales = df.groupby('month')['sales'].sum()

# Statistical analysis: flag sales more than 2 standard deviations from the mean
df['z_score'] = (df['sales'] - df['sales'].mean()) / df['sales'].std()
outliers = df[df['z_score'].abs() > 2]

# Return one item per month (cast numpy types to plain Python for JSON output)
results = []
for month, total in monthly_sales.items():
    results.append({
        'json': {
            'month': int(month),
            'total_sales': float(total),
            'outlier_count': int((outliers['month'] == month).sum())
        }
    })

return results

Set Node - Quick field manipulation

Ideal for simple transformations without code.

Configuration:
  Keep Only Set: false

Values to Set:
  full_name: {{$json.first_name}} {{$json.last_name}}
  email_domain: {{$json.email.split('@')[1]}}
  age_group: {{$json.age >= 18 ? 'Adult' : 'Minor'}}
  total_with_tax: {{$json.subtotal * 1.21}}
  created_date: {{$now.toISO()}}

Merge Node - Combining data

Three main modes:

Mode 1: Append (concatenate)
  Input 1: [A, B, C]
  Input 2: [D, E, F]
  Output:  [A, B, C, D, E, F]

Mode 2: Keep Key Matches (JOIN)
  Input 1: [{id:1, name:'Joan'}, {id:2, name:'Maria'}]
  Input 2: [{id:1, age:30}, {id:2, age:25}]
  Key: 'id'
  Output: [{id:1, name:'Joan', age:30}, {id:2, name:'Maria', age:25}]

Mode 3: Remove Key Matches (ANTI-JOIN)
  Input 1: [{id:1}, {id:2}, {id:3}]
  Input 2: [{id:2}]
  Key: 'id'
  Output: [{id:1}, {id:3}]
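To check what each mode returns, the three behaviours can be mimicked in plain Python. A minimal sketch, assuming both inputs are lists of dicts and that the JOIN merges the fields of matching records; the function names are illustrative, not n8n APIs:

```python
def append(input1, input2):
    """Mode 1: concatenate both inputs, keeping their order."""
    return input1 + input2


def keep_key_matches(input1, input2, key):
    """Mode 2 (JOIN): keep input1 records whose key also appears in input2, merging their fields."""
    index = {row[key]: row for row in input2}
    return [{**row, **index[row[key]]} for row in input1 if row[key] in index]


def remove_key_matches(input1, input2, key):
    """Mode 3 (ANTI-JOIN): keep input1 records whose key does not appear in input2."""
    keys = {row[key] for row in input2}
    return [row for row in input1 if row[key] not in keys]


people = [{"id": 1, "name": "Joan"}, {"id": 2, "name": "Maria"}]
ages = [{"id": 1, "age": 30}, {"id": 2, "age": 25}]
print(keep_key_matches(people, ages, "id"))
print(remove_key_matches([{"id": 1}, {"id": 2}, {"id": 3}], [{"id": 2}], "id"))
```

If input 2 contains the same key more than once, this sketch keeps only the last occurrence, whereas a database-style join would emit one row per match.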

Split In Batches - Batch processing

Configuration:
  Batch Size: 100
  Reset: true

Flow:
[Source: 1000 items]
[Split In Batches: 100]
  ↓ (Iteration 1: items 1-100)
[Process Batch]
[Loop back] → (Iteration 2: items 101-200)
[Repeat until item 1000]

Practical example:

[Get All Users: 10,000 users]
[Split In Batches: 50]
[Enrich Each User Data] (50 at a time)
[PostgreSQL: Bulk Insert] (50 at a time)
[Loop] → Repeat 200 times
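The batching arithmetic (10,000 users in batches of 50 gives 200 iterations) can be checked with a small generator. A sketch of what each iteration receives, not the node's implementation:

```python
def split_in_batches(items, batch_size):
    """Yield consecutive slices of at most batch_size items, one per loop iteration."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


users = list(range(1, 10_001))  # stand-in for 10,000 user records
batches = list(split_in_batches(users, 50))
print(len(batches))  # 200 iterations
print(batches[1][0], batches[1][-1])  # the second batch covers users 51-100
```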


4.2. Flow-control nodes

IF Node - Conditionals

Configuration:
  Condition 1:
    Field: {{$json.amount}}
    Operation: Larger
    Value: 1000

  Condition 2:
    Field: {{$json.status}}
    Operation: Equal
    Value: pending

  Combine: AND

Output:
  true → High value pending orders
  false → Normal processing
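With Combine: AND, an item reaches the true output only when both conditions hold. A plain-Python sketch of the same routing, using the field names from the example (the function is illustrative):

```python
def route_if(items):
    """Split items into the IF node's outputs: amount > 1000 AND status == 'pending'."""
    true_branch, false_branch = [], []
    for item in items:
        matches = item.get("amount", 0) > 1000 and item.get("status") == "pending"
        (true_branch if matches else false_branch).append(item)
    return true_branch, false_branch


orders = [
    {"id": 1, "amount": 1500, "status": "pending"},
    {"id": 2, "amount": 1500, "status": "paid"},
    {"id": 3, "amount": 200, "status": "pending"},
]
high_value, normal = route_if(orders)
print([o["id"] for o in high_value], [o["id"] for o in normal])  # [1] [2, 3]
```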

Switch Node - Multiple routes

Mode: Rules

Rule 0: {{$json.country}} equals 'ES' → Route 0 (Spain)
Rule 1: {{$json.country}} equals 'FR' → Route 1 (France)
Rule 2: {{$json.country}} equals 'DE' → Route 2 (Germany)
Default: → Route 3 (Others)

Each route can have its own processing.
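The rules above amount to a lookup with a fallback. A sketch assuming one output per rule plus a default output (index 3), as in the example; the dictionary and function names are hypothetical:

```python
ROUTES = {"ES": 0, "FR": 1, "DE": 2}  # rule value -> output index
DEFAULT_ROUTE = 3


def route_switch(items):
    """Send each item to the output of its matching rule; unmatched items go to the default output."""
    outputs = {index: [] for index in range(DEFAULT_ROUTE + 1)}
    for item in items:
        outputs[ROUTES.get(item.get("country"), DEFAULT_ROUTE)].append(item)
    return outputs


result = route_switch([{"country": "ES"}, {"country": "IT"}, {"country": "DE"}])
print({route: len(items) for route, items in result.items()})  # {0: 1, 1: 0, 2: 1, 3: 1}
```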

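Loop Over Items - Iterations

In n8n 1.x, Split In Batches appears as the Loop Over Items node, with a "loop" output (the current batch) and a "done" output (all processed items):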

[Start] → [Loop Over Items]
          [Process Each]
          [Check Condition]
          [Loop Back if needed]
          [Loop Done] → [Continue]

Wait Node - Delays and throttling

Use case 1: Rate limiting
[HTTP Request to API]
[Wait: 1 second]
[Next HTTP Request]

Use case 2: Waiting for an async response
[Trigger Job]
[Wait: Until webhook received]
[Process Result]

Use case 3: Retry after a fixed delay
[Try API Call]
[IF: Failed?]
  ↓ true
[Wait: 5 seconds]
[Retry]
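The diagram waits a fixed 5 seconds before retrying. A common refinement is exponential backoff: double the delay after every failure and give up after a maximum number of attempts. (For the fixed-delay case, n8n nodes also offer a Retry On Fail option in their settings.) A minimal sketch, where call is any operation that may raise and sleep is injectable so the logic can be tested without real waiting; both are assumptions of the example:

```python
import time


def retry_with_backoff(call, max_attempts=5, base_delay=5.0, sleep=time.sleep):
    """Call call() until it succeeds, waiting base_delay * 2**n seconds after the n-th failure."""
    for attempt in range(max_attempts):
        try:
            return call()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: let the error reach the error workflow
            sleep(base_delay * 2 ** attempt)  # 5 s, 10 s, 20 s, ...


# Usage: a fake API call that fails twice, with sleep replaced by a recorder
delays = []
attempts = {"count": 0}


def flaky_api_call():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary failure")
    return {"status": "ok"}


print(retry_with_backoff(flaky_api_call, sleep=delays.append))  # {'status': 'ok'}
print(delays)  # [5.0, 10.0]
```

Adding a small random jitter to each delay is a common extra step when many workflows retry against the same API at once.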

Error Trigger - Global error handling

Main workflow (its Settings → Error Workflow points to the handler below):
  [Any node can fail]

Error-handler workflow:
  [Error Trigger]
  [Log Error Details]
  [Send Alert to Slack]
  [Create Ticket in Jira]
  [Store in Error Database]

4.3. Database nodes

PostgreSQL Node

Available operations:

1. Execute Query (most flexible)
Query:
  SELECT u.name, u.email, COUNT(o.id) as order_count
  FROM users u
  LEFT JOIN orders o ON u.id = o.user_id
  WHERE u.created_at > $1
  GROUP BY u.id
  HAVING COUNT(o.id) > $2
Query Parameters: {{$json.since_date}}, {{$json.min_orders}}
  (the values are passed as $1 and $2 instead of being pasted into the SQL text, which prevents SQL injection)

2. Insert
Table: customers
Columns:
  - name: {{$json.name}}
  - email: {{$json.email}}
  - created_at: {{$now}}

3. Update
Table: orders
Column to Match On: id
Values to Set:
  - status: {{$json.new_status}}
  - updated_at: {{$now}}

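4. Delete
Table: temp_data
Condition: created_at < NOW() - INTERVAL '7 days'
  (a range condition like this is simplest as an Execute Query:
   DELETE FROM temp_data WHERE created_at < NOW() - INTERVAL '7 days';)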

MySQL Node

Similar to PostgreSQL, but with MySQL syntax:

Parameterized query:
  SELECT * FROM products 
  WHERE category = ? 
  AND price BETWEEN ? AND ?
  AND stock > ?

Parameters:
  - {{$json.category}}
  - {{$json.min_price}}
  - {{$json.max_price}}
  - 0

MongoDB Node

Operation: Find

Collection: users
Query:
{
  "age": { "$gte": {{$json.min_age}} },
  "city": "{{$json.city}}",
  "status": "active"
}

Projection:
{
  "name": 1,
  "email": 1,
  "orders": 1,
  "_id": 0
}

Options:
  Limit: {{$json.limit || 100}}
  Sort: { "created_at": -1 }

Redis Node

Use cases:

1. Caching
  [Check Redis]
    ↓ miss
  [Fetch from DB]
  [Store in Redis: TTL 3600s]

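2. Rate Limiting
  Key: api_calls:{{$json.user_id}}
  [INCR key]
  [IF result == 1 → EXPIRE key 60]  (set the expiry only when the window starts; expiring on every call keeps pushing the window forward)
  [IF > limit → Reject]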

3. Pub/Sub
  [PUBLISH channel "message"]
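The caching and rate-limiting flows can be tried out against an in-memory stand-in before wiring up real Redis nodes. A sketch, assuming the FakeRedis class below (hypothetical, not a Redis client) imitates only GET, SET with TTL, INCR and EXPIRE, and that fetch_from_db is a placeholder loader; the clock is injectable so expiry can be simulated:

```python
import time


class FakeRedis:
    """In-memory stand-in for the Redis commands used above: GET, SET with TTL, INCR, EXPIRE."""

    def __init__(self, clock=time.monotonic):
        self.clock = clock
        self.data = {}  # key -> (value, expires_at or None)

    def get(self, key):
        value, expires_at = self.data.get(key, (None, None))
        if expires_at is not None and self.clock() >= expires_at:
            del self.data[key]  # expired keys behave like missing keys
            return None
        return value

    def set(self, key, value, ttl=None):
        self.data[key] = (value, self.clock() + ttl if ttl else None)

    def incr(self, key):
        value = int(self.get(key) or 0) + 1
        _, expires_at = self.data.get(key, (None, None))
        self.data[key] = (value, expires_at)  # like Redis, INCR keeps an existing TTL
        return value

    def expire(self, key, seconds):
        if self.get(key) is not None:
            value, _ = self.data[key]
            self.data[key] = (value, self.clock() + seconds)


def get_user(cache, user_id, fetch_from_db):
    """Cache-aside: read from the cache first; on a miss, load from the DB and cache for 3600 s."""
    key = f"user:{user_id}"
    cached = cache.get(key)
    if cached is not None:
        return cached
    user = fetch_from_db(user_id)
    cache.set(key, user, ttl=3600)
    return user


def allow_request(cache, user_id, limit=100, window=60):
    """Fixed-window limiter: at most `limit` calls per user per `window` seconds."""
    key = f"api_calls:{user_id}"
    count = cache.incr(key)
    if count == 1:
        cache.expire(key, window)  # set the expiry only when the window starts
    return count <= limit


# Usage with a simulated clock
now = [0.0]
cache = FakeRedis(clock=lambda: now[0])
db_calls = []


def fetch_from_db(user_id):
    db_calls.append(user_id)
    return {"id": user_id}


get_user(cache, 7, fetch_from_db)
get_user(cache, 7, fetch_from_db)
print(len(db_calls))  # 1: the second read is a cache hit
print([allow_request(cache, 7, limit=2) for _ in range(3)])  # [True, True, False]
now[0] = 61.0
print(allow_request(cache, 7, limit=2))  # True: a new window has started
```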

4.4. HTTP Request Node

Full configuration:

Method: POST
URL: https://api.company.com/v2/users

Authentication:
  Type: Generic Credential Type
  Generic Auth Type: OAuth2

Headers:
  Content-Type: application/json
  X-API-Version: 2.0
  X-Request-ID: {{$now.toUnixInteger()}}
  Accept-Language: {{$json.locale || 'es'}}

Body:
  Body Content Type: JSON
  JSON:
  {
    "email": "{{$json.email}}",
    "name": "{{$json.name}}",
    "metadata": {
      "source": "n8n",
      "workflow_id": "{{$workflow.id}}",
      "execution_id": "{{$execution.id}}"
    }
  }

Options:
  Timeout: 30000  (30 seconds)
  Redirect: Follow Redirects
  Response Format: JSON
  Ignore SSL Issues: false
  Batching: Items per Batch / Batch Interval (ms)

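Pagination handling:

Recent versions of the HTTP Request node also have a built-in Pagination option; the Code node loop below does the same by hand.

// Code node: request pages until a short page signals the end
// Assumptions: the API returns { data: [...] } and the token is stored in a
// hypothetical API_TOKEN environment variable
let allResults = [];
let page = 1;
const perPage = 100;
const maxPages = 1000; // safety stop so a misbehaving API cannot loop forever

while (page <= maxPages) {
  const response = await this.helpers.httpRequest({
    method: 'GET',
    url: `https://api.company.com/users?page=${page}&per_page=${perPage}`,
    headers: {
      Authorization: `Bearer ${$env.API_TOKEN}`
    },
    json: true
  });

  const pageData = response.data || [];
  allResults = allResults.concat(pageData);

  // A page shorter than perPage is the last one
  if (pageData.length < perPage) break;

  page++;
}

return allResults.map(item => ({ json: item }));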

Rate Limiting:

Pattern 1: Wait between requests
[HTTP Request 1]
[Wait: 1000ms]
[HTTP Request 2]

Pattern 2: Batches with Split In Batches
[Get 1000 IDs]
[Split In Batches: 10]
[HTTP Requests for batch]
[Wait: 2000ms]
[Loop]

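Pattern 3: Token Bucket (Code node)
// The bucket state lives in workflow static data so it survives between executions;
// a plain const object would start full on every run and never limit anything.
// Static data is only persisted for production (active-workflow) executions, not manual test runs.
const staticData = $getWorkflowStaticData('global');
if (!staticData.rateLimiter) {
  staticData.rateLimiter = {
    tokens: 10,
    maxTokens: 10,
    refillRate: 1, // tokens per second
    lastRefill: Date.now()
  };
}
const rateLimiter = staticData.rateLimiter;

// Refill tokens according to the time elapsed since the last refill
const now = Date.now();
const timePassed = (now - rateLimiter.lastRefill) / 1000;
rateLimiter.tokens = Math.min(
  rateLimiter.maxTokens,
  rateLimiter.tokens + timePassed * rateLimiter.refillRate
);
rateLimiter.lastRefill = now;

// Stop this execution if no token is available
if (rateLimiter.tokens < 1) {
  throw new Error('Rate limit exceeded');
}

rateLimiter.tokens--;

// Pass the input items through unchanged
return $input.all();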
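Stripped of n8n specifics, the refill arithmetic is easier to test in isolation. A Python sketch of the same token bucket, with the clock passed in so time can be simulated; the class and method names are illustrative, and it returns False instead of throwing when the bucket is empty:

```python
import time


class TokenBucket:
    """Holds up to max_tokens, refilled continuously at refill_rate tokens per second."""

    def __init__(self, max_tokens=10, refill_rate=1.0, clock=time.monotonic):
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate
        self.clock = clock
        self.tokens = float(max_tokens)
        self.last_refill = clock()

    def try_acquire(self):
        """Take one token if available; return False when the bucket is empty."""
        now = self.clock()
        elapsed = now - self.last_refill
        self.tokens = min(self.max_tokens, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
        if self.tokens < 1:
            return False
        self.tokens -= 1
        return True


fake_now = [0.0]
bucket = TokenBucket(max_tokens=3, refill_rate=1.0, clock=lambda: fake_now[0])
print([bucket.try_acquire() for _ in range(4)])  # [True, True, True, False]
fake_now[0] = 2.0  # two seconds later, two tokens have been refilled
print([bucket.try_acquire() for _ in range(3)])  # [True, True, False]
```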

Unit 5: ETL and Data Pipelines with N8N

5.1. ETL fundamentals

Extract, Transform, Load:

EXTRACT (E):
  ├── APIs (REST, GraphQL)
  ├── Databases (PostgreSQL, MySQL, MongoDB)
  ├── Files (CSV, JSON, XML, Excel)
  ├── Web scraping
  └── Streaming (Webhooks, Kafka)

TRANSFORM (T):
  ├── Cleaning (nulls, duplicates, outliers)
  ├── Validation (formats, ranges, types)
  ├── Enrichment (lookups, API calls)
  ├── Aggregation (group by, sum, avg)
  └── Normalization (consistent formats)

LOAD (L):
  ├── Data Warehouses (BigQuery, Snowflake)
  ├── Databases (PostgreSQL, MongoDB)
  ├── Data Lakes (S3, GCS)
  ├── APIs (sending processed data)
  └── Cache (Redis, Memcached)

N8N vs traditional tools:

Apache Airflow:
  ✅ Better for very complex pipelines
  ✅ More scalable for massive big-data workloads
  ❌ Code only (Python DAGs)
  ❌ Complex setup

N8N:
  ✅ Visual + code
  ✅ Fast setup (minutes vs days)
  ✅ Ideal for SMB and mid-market
  ✅ Excellent for prototyping
  ❌ Less suited to petabyte-scale data

N8N use cases in Big Data:

✅ Excellent for:
- Pipelines of up to 10M records/day
- Integrations between 3-10 systems
- ETL with moderate transformations
- Rapid pipeline prototyping

⚠️ Acceptable with optimizations:
- 10M-50M records/day (with batching)
- Heavy transformations (Code node + external processing)

❌ Not recommended:
- >100M records/day
- Massive real-time streaming (Kafka + Flink is a better fit)
- Processing files >1GB in memory

5.2. Data extraction

REST APIs:

Full example: extracting data from Salesforce

[Schedule Trigger: Daily 2AM]
[HTTP Request: Salesforce Auth]
  Method: POST
  URL: https://login.salesforce.com/services/oauth2/token
  Body (Form Urlencoded):
    grant_type: password
    client_id: {{$env.SF_CLIENT_ID}}
    client_secret: {{$env.SF_CLIENT_SECRET}}
    username: {{$env.SF_USERNAME}}
    password: {{$env.SF_PASSWORD}}
[Set: Extract Token]
  access_token: {{$json.access_token}}
[HTTP Request: Get Accounts]
  URL: https://{{$env.SF_INSTANCE}}.salesforce.com/services/data/v58.0/query
  Query Params:
    q: SELECT Id, Name, Industry, AnnualRevenue FROM Account WHERE LastModifiedDate > {{$json.last_sync_date}}
  Headers:
    Authorization: Bearer {{$node["Extract Token"].json.access_token}}

GraphQL APIs:

[HTTP Request]
  Method: POST
  URL: https://api.github.com/graphql
  Authentication: Header Auth credential (Name: Authorization, Value: Bearer + GitHub personal access token)
  Body (JSON; the whole query must be one JSON string, so it is written on a single line):
    {
      "query": "query { repository(owner: \"n8n-io\", name: \"n8n\") { issues(first: 100, states: OPEN) { edges { node { title createdAt author { login } labels(first: 10) { edges { node { name } } } } } } } }"
    }

Databases:

Incremental load strategy:

[PostgreSQL: Get Last Sync Time]
  Query: SELECT MAX(sync_timestamp) as last_sync FROM sync_log
[PostgreSQL: Get New/Modified Records]
  Query:
    SELECT * FROM customers
    WHERE updated_at > '{{$node["Get Last Sync Time"].json.last_sync}}'
    ORDER BY updated_at
    LIMIT 10000
[Transform]
[Load to DWH]
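[PostgreSQL: Update Sync Log]
  Query: INSERT INTO sync_log (sync_timestamp, records_processed) 
         VALUES ('{{$json.max_updated_at}}', {{$json.count}})
  (max_updated_at and count are assumed to be computed in the Transform step; storing the highest
   updated_at actually loaded, rather than $now, keeps rows beyond the LIMIT or modified during
   the run from being skipped on the next sync)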
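Two details make incremental loads lose rows: saving the current time as the new sync point (rows beyond the LIMIT or modified during the run are skipped) and rows that share the boundary updated_at value. A sketch of one common remedy, keyset pagination on (updated_at, id) with the watermark taken from the last row actually loaded; the in-memory customers list stands in for the source table and all names are hypothetical:

```python
def extract_incremental(source, watermark, limit):
    """Return up to `limit` rows strictly after `watermark`, ordered by (updated_at, id)."""
    ordered = sorted(source, key=lambda row: (row["updated_at"], row["id"]))
    newer = [row for row in ordered if (row["updated_at"], row["id"]) > watermark]
    return newer[:limit]


def run_sync(source, watermark, limit, load):
    """Load batches until the source is drained; return the new watermark to persist."""
    while True:
        batch = extract_incremental(source, watermark, limit)
        if not batch:
            return watermark
        load(batch)
        last = batch[-1]
        watermark = (last["updated_at"], last["id"])  # advance only past rows actually loaded


customers = [
    {"id": 1, "updated_at": "2024-05-01T10:00:00"},
    {"id": 2, "updated_at": "2024-05-01T10:00:00"},  # same timestamp as id 1
    {"id": 3, "updated_at": "2024-05-02T08:30:00"},
]
loaded = []
new_watermark = run_sync(customers, ("", 0), limit=2, load=loaded.extend)
print([row["id"] for row in loaded])  # [1, 2, 3]
print(new_watermark)  # ('2024-05-02T08:30:00', 3)
```

In PostgreSQL the same filter can be written as a row comparison, WHERE (updated_at, id) > ($1, $2) ORDER BY updated_at, id LIMIT 10000.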

CSV/Excel files:

[Read Binary File]
  File Path: /data/uploads/sales_{{$json.date}}.csv
[Spreadsheet File]
  Operation: Read from File
  File Format: CSV
  Options:
    - Header Row: Yes
    - Delimiter: ,
    - Encoding: UTF-8
[Function: Parse and Validate]
  // Validate date formats, numbers, etc.
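The parse-and-validate step could look like the following sketch, which uses Python's standard csv module; the column names (date, amount) and the ISO date format are assumptions, not part of the original workflow:

```python
import csv
import io
from datetime import datetime


def parse_sales_csv(text):
    """Parse CSV text with a header row; return (valid_records, errors)."""
    valid, errors = [], []
    reader = csv.DictReader(io.StringIO(text), delimiter=",")
    # start=2 because line 1 of the file is the header row
    for line_no, row in enumerate(reader, start=2):
        try:
            valid.append({
                # Dates must be ISO formatted (YYYY-MM-DD)
                "date": datetime.strptime(row["date"].strip(), "%Y-%m-%d").date().isoformat(),
                # Amounts must be numeric
                "amount": float(row["amount"]),
            })
        except (KeyError, TypeError, ValueError, AttributeError) as exc:
            errors.append({"line": line_no, "error": str(exc), "row": row})
    return valid, errors


sample = "date,amount\n2024-05-01,19.99\n2024-13-01,5\n2024-05-02,abc\n"
records, problems = parse_sales_csv(sample)
print(records)  # [{'date': '2024-05-01', 'amount': 19.99}]
print([p["line"] for p in problems])  # [3, 4]
```

When reading from disk, open the file with open(path, newline="", encoding="utf-8"), as the csv module documentation recommends, and pass its contents in.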

Web Scraping:

[HTTP Request]
  URL: https://example.com/products
[HTML Extract]
  Extraction Values:
    - Selector: .product-title
      Attribute: text
      Key: title
    - Selector: .product-price
      Attribute: text
      Key: price
    - Selector: .product-link
      Attribute: href
      Key: url
[Function: Clean Data]
  price: parseFloat(price.replace(/[€\s.]/g, '').replace(',', '.'))  // assumes European format, e.g. "1.234,56 €"
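Scraped prices often use European formatting ("1.234,56 €"), where a plain parseFloat stops at the comma. A Python sketch that accepts both conventions by treating whichever separator appears last as the decimal point; this heuristic is an assumption and misreads ambiguous inputs such as "1.234" or "1,234" written as thousands without decimals:

```python
import re


def parse_price(text):
    """Convert a scraped price string such as '1.234,56 €' or '$1,234.56' into a float."""
    cleaned = re.sub(r"[^0-9.,-]", "", text)  # drop currency symbols, spaces and letters
    if not cleaned:
        return None
    last_dot, last_comma = cleaned.rfind("."), cleaned.rfind(",")
    if last_comma > last_dot:
        # Comma is the decimal separator: drop thousands dots, then use '.' as the decimal point
        cleaned = cleaned.replace(".", "").replace(",", ".")
    else:
        # Dot is the decimal separator (or there is none): drop thousands commas
        cleaned = cleaned.replace(",", "")
    return float(cleaned)


print(parse_price("1.234,56 €"))  # 1234.56
print(parse_price("19,99€"))  # 19.99
print(parse_price("$1,234.56"))  # 1234.56
```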

5.3. Data transformation

Data cleaning:

// Function Node: Data Cleaning
const items = $input.all();

return items
  .map(item => {
    const data = item.json;

    return {
      json: {
        // Remove nulls and empty strings
        id: data.id || null,
        name: data.name?.trim() || 'Unknown',
        email: data.email?.toLowerCase().trim() || null,

        // Normalize phone numbers
        phone: data.phone?.replace(/[^0-9+]/g, '') || null,

        // Fix dates
        created_at: data.created_at ? 
          new Date(data.created_at).toISOString() : 
          null,

        // Discard implausible amounts (fixed threshold, not a z-score)
        amount: Math.abs(data.amount) < 1000000 ? data.amount : null,

        // Standardize categories
        category: data.category?.toLowerCase()
          .replace(/[^a-z0-9]/g, '_') || 'uncategorized'
      }
    };
  })
  .filter(item => {
    // Remove invalid records
    return item.json.id && item.json.email;
  });
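The cleaning code above caps amount at a fixed threshold. An actual z-score filter flags values more than a chosen number of standard deviations from the mean, the same rule as the pandas example in 4.1. A sketch with the standard statistics module, where the cutoff of 2 is an assumed default:

```python
import statistics


def zscore_outliers(values, threshold=2.0):
    """Return the indices of values whose absolute z-score exceeds `threshold`."""
    if len(values) < 2:
        return []  # the sample standard deviation needs at least two values
    mean = statistics.mean(values)
    stdev = statistics.stdev(values)  # sample standard deviation, like pandas .std()
    if stdev == 0:
        return []  # all values identical: nothing stands out
    return [i for i, v in enumerate(values) if abs(v - mean) / stdev > threshold]


amounts = [120, 95, 130, 110, 105, 99, 5000]
print(zscore_outliers(amounts))  # [6]
```

With small samples a single extreme value inflates the standard deviation: for n values no z-score can exceed (n - 1) / √n, which is only about 2.27 for the 7 values above. Median-based rules are a common alternative in that case.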

Normalization:

// Normalize addresses
function normalizeAddress(addr) {
  return {
    street: addr.street?.trim(),
    city: addr.city?.trim().toUpperCase(),
    postal_code: addr.postal_code?.replace(/\s/g, ''),
    country: addr.country?.trim().toUpperCase() || 'ES'
  };
}

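// Normalize currencies
// Illustrative fixed rates, expressed as EUR per unit of each currency; use a live FX source in production
function normalizeCurrency(amount, from, to = 'EUR') {
  const rates = {
    'USD': 0.92,
    'GBP': 1.17,
    'EUR': 1.00
  };

  // Fail loudly instead of silently returning NaN for unknown currencies
  if (!(from in rates) || !(to in rates)) {
    throw new Error(`Unsupported currency: ${from} -> ${to}`);
  }

  return amount * rates[from] / rates[to];
}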

Validation:

// Validation rules
const validationRules = {
  email: /^[^\s@]+@[^\s@]+\.[^\s@]+$/,
  phone: /^\+?[0-9]{9,15}$/,
  postalCode: /^[0-9]{5}$/,
  iban: /^ES[0-9]{22}$/
};

function validate(data, rules) {
  const errors = [];

  for (const [field, regex] of Object.entries(rules)) {
    if (data[field] && !regex.test(data[field])) {
      errors.push(`Invalid ${field}: ${data[field]}`);
    }
  }

  return {
    isValid: errors.length === 0,
    errors: errors
  };
}

// Apply validation
const items = $input.all();
return items.map(item => {
  const validation = validate(item.json, validationRules);

  return {
    json: {
      ...item.json,
      _validation: validation,
      _is_valid: validation.isValid
    }
  };
});

Enrichment:

[Base Data]
[HTTP Request: Enrich with Geolocation]
  URL: https://api.ipgeolocation.io/ipgeo
  Params:
    apiKey: {{$env.GEO_API_KEY}}
    ip: {{$json.ip_address}}
[PostgreSQL: Enrich with Customer Tier]
  Query:
    SELECT tier, discount_rate 
    FROM customer_tiers 
    WHERE customer_id = {{$json.customer_id}}
[Merge: Combine All Data]
  Mode: Keep Key Matches
  Key: customer_id
[Enriched Data]

Aggregations:

// Group by category and aggregate in a single pass
const items = $input.all();
const grouped = {};

for (const item of items) {
  const category = item.json.category;
  const amount = Number(item.json.amount) || 0; // non-numeric amounts count as 0

  if (!grouped[category]) {
    grouped[category] = {
      category: category,
      count: 0,
      total_amount: 0,
      min_amount: Infinity,
      max_amount: -Infinity
    };
  }

  const group = grouped[category];
  group.count++;
  group.total_amount += amount;
  // Track min/max incrementally: spreading a very large array into
  // Math.min/Math.max can exceed the engine's argument limit
  group.min_amount = Math.min(group.min_amount, amount);
  group.max_amount = Math.max(group.max_amount, amount);
}

// Emit one item per category
return Object.values(grouped).map(group => ({
  json: {
    category: group.category,
    total_count: group.count,
    total_amount: group.total_amount,
    avg_amount: group.total_amount / group.count,
    min_amount: group.min_amount,
    max_amount: group.max_amount
  }
}));

5.4. Data loading

PostgreSQL/MySQL:

[Transform] (10,000 records)
[Split In Batches: 500]
[PostgreSQL: Bulk Insert]
  Operation: Insert
  Table: staging.sales_data
  Columns: Auto-map
  Options:
    - On Conflict: Update
    - Returning: id, created_at
[Loop until all loaded]

BigQuery (Data Warehouse):

[Prepare Data]
[HTTP Request: BigQuery API]
  Method: POST
  URL: https://bigquery.googleapis.com/bigquery/v2/projects/{{$env.GCP_PROJECT}}/datasets/{{$env.DATASET}}/tables/{{$env.TABLE}}/insertAll
  Headers:
    Authorization: Bearer {{$credentials.gcp_token}}
  Body:
    {
      "rows": {{ JSON.stringify($json.rows.map(r => ({ json: r }))) }},
      "skipInvalidRows": false,
      "ignoreUnknownValues": false
    }
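The insertAll body wraps each row in a json field, and an optional insertId per row lets BigQuery drop duplicates on a best-effort basis when a request is retried. A sketch that builds that payload; deriving insertId from a hypothetical id column is an assumption:

```python
import json


def build_insert_all_body(rows, id_field="id"):
    """Build a tabledata.insertAll request body from a list of row dicts."""
    return {
        "rows": [
            # insertId enables best-effort de-duplication if the same request is retried
            {"insertId": str(row[id_field]), "json": row}
            for row in rows
        ],
        "skipInvalidRows": False,
        "ignoreUnknownValues": False,
    }


body = build_insert_all_body([{"id": 1, "amount": 19.99}, {"id": 2, "amount": 5.0}])
print(json.dumps(body["rows"][0]))  # {"insertId": "1", "json": {"id": 1, "amount": 19.99}}
```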

S3/Cloud Storage:

[Transform to CSV]
[Convert to Binary]
[S3: Upload File]
  Bucket: company-data-lake
  Path: /raw/sales/{{$now.toFormat('yyyy-MM-dd')}}/sales_data.csv
  ACL: private
  Metadata:
    - source: n8n-workflow
    - execution_id: {{$execution.id}}

Webhooks (notifying external systems):

[ETL Complete]
[HTTP Request: Notify Data Warehouse]
  Method: POST
  URL: {{$env.DWH_WEBHOOK_URL}}
  Body:
    {
      "event": "etl_complete",
      "workflow": "{{$workflow.name}}",
      "records_processed": {{$json.count}},
      "execution_time_ms": {{$json.duration}},
      "timestamp": "{{$now}}"
    }

5.5. Pipeline design patterns

Incremental Loading:

[Get Last Watermark]
  Query: SELECT MAX(updated_at) as watermark FROM staging.sync_metadata
[Extract New/Modified]
  Query: SELECT * FROM source 
         WHERE updated_at > '{{$json.watermark}}'
[Transform]
[Load]
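[Update Watermark]
  Query: UPDATE staging.sync_metadata 
         SET updated_at = '{{$json.max_updated_at}}'
  (max_updated_at: the highest updated_at among the rows just loaded, a field assumed to be computed
   in the Transform step; writing $now instead can skip rows modified while the run was in progress)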

Change Data Capture (CDC):

[PostgreSQL: Read WAL/Replication Slot]
  or
[Webhook: Receive CDC Events]
[Parse Change Event]
  event_type: INSERT | UPDATE | DELETE
  table: customers
  data: {...}
[Switch by Event Type]
  ├─ INSERT → [Load New Record]
  ├─ UPDATE → [Update Existing]
  └─ DELETE → [Soft Delete]
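Applying the events to the target is the core of the pattern. A sketch over an in-memory dict keyed by primary key; the event shape (event_type, data with an id) follows the parse step above, and the deleted flag used for soft deletes is an assumption:

```python
def apply_change_event(table, event):
    """Apply one CDC event to `table` (a dict keyed by primary key) and return the table."""
    record = event["data"]
    key = record["id"]
    kind = event["event_type"]
    if kind == "INSERT":
        table[key] = {**record, "deleted": False}
    elif kind == "UPDATE":
        # Merge into the existing row; an update for an unknown key is treated as an insert
        table[key] = {**table.get(key, {}), **record, "deleted": False}
    elif kind == "DELETE":
        if key in table:
            table[key]["deleted"] = True  # soft delete: keep the row, flag it
    else:
        raise ValueError(f"Unknown event type: {kind}")
    return table


customers = {}
events = [
    {"event_type": "INSERT", "table": "customers", "data": {"id": 1, "name": "Joan"}},
    {"event_type": "UPDATE", "table": "customers", "data": {"id": 1, "name": "Joan Puig"}},
    {"event_type": "DELETE", "table": "customers", "data": {"id": 1}},
]
for event in events:
    apply_change_event(customers, event)
print(customers)  # {1: {'id': 1, 'name': 'Joan Puig', 'deleted': True}}
```

Events must be applied in the order they were produced: in this sketch an UPDATE arriving before its INSERT still works (it acts as an upsert), but a DELETE arriving first is ignored.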

Checkpointing:

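// Checkpoint pattern for resilience
// loadCheckpoint, fetchBatch, processBatch and saveCheckpoint are placeholders
// for your own checkpoint storage and source/target logic
const SAVE_EVERY = 1000; // save progress roughly every 1000 records

const checkpoint = {
  batch_id: Date.now(),
  last_processed_id: 0,
  total_processed: 0
};

// Resume from the last saved checkpoint, if one exists
try {
  const lastCheckpoint = await loadCheckpoint();
  if (lastCheckpoint) {
    checkpoint.last_processed_id = lastCheckpoint.last_processed_id;
  }
} catch (e) {
  // No readable checkpoint: start from the beginning
}

// Process with checkpoints
let processedSinceSave = 0;
while (true) {
  const batch = await fetchBatch(checkpoint.last_processed_id);
  if (batch.length === 0) break;

  await processBatch(batch);

  checkpoint.last_processed_id = batch[batch.length - 1].id;
  checkpoint.total_processed += batch.length;
  processedSinceSave += batch.length;

  // Count records since the last save: a "total % 1000 === 0" test never fires
  // when batch sizes do not divide 1000 evenly
  if (processedSinceSave >= SAVE_EVERY) {
    await saveCheckpoint(checkpoint);
    processedSinceSave = 0;
  }
}

// Always persist the final position
await saveCheckpoint(checkpoint);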
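To see why the saved position matters, here is a Python sketch that simulates a crash mid-run and a second run resuming from the checkpoint. The store dict and the sorted list of ids are hypothetical stand-ins for a checkpoint table and a source database:

```python
def run_with_checkpoints(source_ids, store, process, batch_size=3, save_every=5):
    """Process sorted ids greater than the stored checkpoint, saving progress every `save_every` records."""
    last_id = store.get("last_processed_id", 0)
    since_save = 0
    while True:
        batch = [i for i in source_ids if i > last_id][:batch_size]
        if not batch:
            break
        process(batch)
        last_id = batch[-1]
        since_save += len(batch)
        if since_save >= save_every:
            store["last_processed_id"] = last_id
            since_save = 0
    store["last_processed_id"] = last_id  # final save
    return last_id


source = list(range(1, 11))
store = {}
processed = []


def crash_after_six(batch):
    if len(processed) >= 6:
        raise RuntimeError("worker crashed")
    processed.extend(batch)


try:
    run_with_checkpoints(source, store, crash_after_six)
except RuntimeError:
    pass
print(store)  # {'last_processed_id': 6}
run_with_checkpoints(source, store, processed.extend)
print(processed)  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```

Records processed after the last save are processed again on resume, so the processing step should be idempotent.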

Idempotency:

-- Pattern 1: UPSERT with ON CONFLICT
INSERT INTO customers (id, name, email, updated_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (id) 
DO UPDATE SET 
  name = EXCLUDED.name,
  email = EXCLUDED.email,
  updated_at = EXCLUDED.updated_at;

-- Pattern 2: Delete-then-insert per batch, in one transaction
BEGIN;
DELETE FROM staging.temp_data WHERE batch_id = '{{$json.batch_id}}';
INSERT INTO staging.temp_data ...;
COMMIT;

-- Pattern 3: Idempotent processing key
INSERT INTO processed_events (event_id, processed_at)
VALUES ('{{$json.event_id}}', NOW())
ON CONFLICT (event_id) DO NOTHING
RETURNING event_id;
-- If no row is returned, the event was already processed
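Pattern 3 can be tried locally with SQLite, which ships with Python. A sketch assuming a processed_events table with event_id as primary key; it uses SQLite's INSERT OR IGNORE and the cursor's rowcount in place of PostgreSQL's ON CONFLICT ... RETURNING, to stay compatible with older SQLite versions:

```python
import sqlite3


def mark_processed(conn, event_id):
    """Record event_id; return True the first time, False if it was already processed."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO processed_events (event_id, processed_at) "
        "VALUES (?, CURRENT_TIMESTAMP)",
        (event_id,),
    )
    conn.commit()
    return cur.rowcount == 1  # 0 rows inserted means the key already existed


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY, processed_at TEXT)")

for event_id in ["evt-1", "evt-2", "evt-1"]:
    if mark_processed(conn, event_id):
        print(f"processing {event_id}")
    else:
        print(f"skipping duplicate {event_id}")
```

In a real pipeline the mark and the work itself should be committed in the same transaction; otherwise a crash between them either loses or repeats the event.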

Handling large volumes:

Strategy 1: Pagination + Batching
[Get Total Count]
[Calculate Pages] (100,000 records / 1,000 per page = 100 pages)
[Loop: Page 1 to 100]
  [Fetch Page]
  [Split In Batches: 100]
  [Process Batch]
  [Load Batch]
[Next Page]

Strategy 2: Parallel Processing
[Source Data]
[Split by Key Range]
  ├─ Range 1: IDs 1-10000 → Worker 1
  ├─ Range 2: IDs 10001-20000 → Worker 2
  └─ Range 3: IDs 20001-30000 → Worker 3
[Merge Results]

Strategy 3: Streaming
[Webhook Trigger: Continuous]
[Buffer: Collect for 60s or 1000 items]
[Process Buffer]
[Load Batch]
[Repeat]
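Strategy 3's buffer flushes on whichever limit is reached first. A sketch of that logic with an injectable clock; the class name and the flush callback are hypothetical, and unlike a production implementation it only checks the age limit when a new item arrives (a background timer would also be needed to flush an idle buffer):

```python
import time


class MicroBatchBuffer:
    """Collect items and flush them when max_items is reached or max_age seconds have passed."""

    def __init__(self, flush, max_items=1000, max_age=60.0, clock=time.monotonic):
        self.flush = flush
        self.max_items = max_items
        self.max_age = max_age
        self.clock = clock
        self.items = []
        self.started_at = None  # arrival time of the first item in the current buffer

    def add(self, item):
        if not self.items:
            self.started_at = self.clock()
        self.items.append(item)
        if len(self.items) >= self.max_items or self.clock() - self.started_at >= self.max_age:
            self.drain()

    def drain(self):
        """Hand the buffered items to the flush callback and start a new buffer."""
        if self.items:
            self.flush(self.items)
            self.items = []
            self.started_at = None


now = [0.0]
batches = []
buffer = MicroBatchBuffer(batches.append, max_items=3, max_age=60.0, clock=lambda: now[0])
for event in ["a", "b", "c", "d"]:
    buffer.add(event)  # "c" fills the buffer (3 items) and triggers a flush
now[0] = 61.0
buffer.add("e")  # "d" has waited 61 s, so "d" and "e" are flushed together
print(batches)  # [['a', 'b', 'c'], ['d', 'e']]
```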

Summary of Units 4 and 5:

What we learned:
- Advanced nodes for complex transformations
- Sophisticated flow control
- Database operations
- Advanced HTTP Request usage with authentication and pagination
- ETL concepts and their implementation with N8N
- Strategies for extracting, transforming and loading data
- Design patterns for robust, scalable pipelines

Key points:
- Function/Code nodes give maximum flexibility
- Split In Batches is essential for large volumes
- ETL with N8N is ideal for SMB and mid-market
- Patterns such as checkpointing and idempotency are critical
- Rate limiting and error handling are mandatory

Next steps: the following units will cover AI integrations, scalability, security and Big Data-specific use cases.