N8N Syllabus - Units 4 and 5
Advanced Nodes and ETL with N8N
Unit 4: Advanced Nodes
4.1. Transformation nodes
Function Node - Custom JavaScript
The Function Node runs custom JavaScript code for complex transformations.
// Example: computing user metrics
const items = $input.all();
return items.map(item => {
  const user = item.json;
  // Guard against users with no orders to avoid dividing by zero
  const orders = user.orders ?? [];
  const totalSpent = orders.reduce((sum, order) => sum + order.total, 0);
  const avgOrderValue = orders.length > 0 ? totalSpent / orders.length : 0;
  const isVIP = totalSpent > 1000;
  return {
    json: {
      user_id: user.id,
      name: user.name,
      // toFixed returns a string, which suits display formatting
      total_spent: totalSpent.toFixed(2),
      avg_order_value: avgOrderValue.toFixed(2),
      order_count: orders.length,
      customer_tier: isVIP ? 'VIP' : 'Regular',
      // Assumes orders are sorted by date, oldest first
      last_order_date: orders.length > 0 ? orders[orders.length - 1].date : null
    }
  };
});
Code Node - JavaScript or Python
More powerful than the Function Node; it can import libraries when the n8n instance allows them.
# Python for data science
import pandas as pd
# Convert items to a DataFrame (Python code uses _input rather than $input)
data = [item['json'] for item in _input.all()]
df = pd.DataFrame(data)
# Analysis with pandas
df['date'] = pd.to_datetime(df['date'])
df['month'] = df['date'].dt.month
monthly_sales = df.groupby('month')['sales'].sum()
# Statistical analysis: flag sales more than 2 standard deviations from the mean
df['z_score'] = (df['sales'] - df['sales'].mean()) / df['sales'].std()
outliers = df[df['z_score'].abs() > 2]
# Return one item per month, converting NumPy types to plain Python values
results = []
for month, total in monthly_sales.items():
    results.append({
        'json': {
            'month': int(month),
            'total_sales': float(total),
            'outlier_count': int((outliers['month'] == month).sum())
        }
    })
return results
Set Node - Quick manipulation
Ideal for simple transformations without code.
Configuration:
Keep Only Set: false
Values to Set:
full_name: {{$json.first_name}} {{$json.last_name}}
email_domain: {{$json.email.split('@')[1]}}
age_group: {{$json.age >= 18 ? 'Adult' : 'Minor'}}
total_with_tax: {{$json.subtotal * 1.21}}
created_date: {{$now.toISO()}}
Merge Node - Combining data
Three main modes:
Mode 1: Append (concatenate)
Input 1: [A, B, C]
Input 2: [D, E, F]
Output: [A, B, C, D, E, F]
Mode 2: Keep Key Matches (JOIN)
Input 1: [{id:1, name:'Joan'}, {id:2, name:'Maria'}]
Input 2: [{id:1, age:30}, {id:2, age:25}]
Key: 'id'
Output: [{id:1, name:'Joan', age:30}, {id:2, name:'Maria', age:25}]
Mode 3: Remove Key Matches (ANTI-JOIN)
Input 1: [{id:1}, {id:2}, {id:3}]
Input 2: [{id:2}]
Key: 'id'
Output: [{id:1}, {id:3}]
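The three modes above can be illustrated in plain JavaScript. This is a minimal sketch of the semantics only, not the Merge Node's implementation; the function names `append`, `keepKeyMatches`, and `removeKeyMatches` are hypothetical and simply mirror the mode names.

```javascript
// Sketch of the three merge modes on plain arrays of objects.
// Function names are illustrative; they are not part of n8n.

// Mode 1: concatenate both inputs, preserving order.
function append(input1, input2) {
  return [...input1, ...input2];
}

// Mode 2: keep rows of input1 whose key exists in input2, merging the fields (inner join).
function keepKeyMatches(input1, input2, key) {
  const byKey = new Map(input2.map(row => [row[key], row]));
  return input1
    .filter(row => byKey.has(row[key]))
    .map(row => ({ ...row, ...byKey.get(row[key]) }));
}

// Mode 3: keep rows of input1 whose key does NOT exist in input2 (anti-join).
function removeKeyMatches(input1, input2, key) {
  const keys = new Set(input2.map(row => row[key]));
  return input1.filter(row => !keys.has(row[key]));
}

const joined = keepKeyMatches(
  [{ id: 1, name: 'Joan' }, { id: 2, name: 'Maria' }],
  [{ id: 1, age: 30 }, { id: 2, age: 25 }],
  'id'
);
console.log(joined);
```

When both inputs carry the same field, this sketch lets the second input win; the real node offers its own options for resolving such clashes.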
Split In Batches - Batch processing
Configuration:
Batch Size: 100
Reset: true
Flow:
[Source: 1000 items]
↓
[Split In Batches: 100]
↓ (Iteration 1: items 1-100)
[Process Batch]
↓
[Loop back] → (Iteration 2: items 101-200)
↓
[Repeat until item 1000]
Practical example:
[Get All Users: 10,000 users]
↓
[Split In Batches: 50]
↓
[Enrich Each User Data] (50 at a time)
↓
[PostgreSQL: Bulk Insert] (50 at a time)
↓
[Loop] → Repeat 200 times
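The batching arithmetic in the example above (10,000 users in batches of 50 gives 200 iterations) can be checked with a small sketch. `splitIntoBatches` is a hypothetical helper written for illustration; the node itself handles this internally.

```javascript
// Split an array into consecutive batches of at most batchSize items.
// Illustrative helper, not an n8n API.
function splitIntoBatches(items, batchSize) {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError('batchSize must be a positive integer');
  }
  const batches = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}

const users = Array.from({ length: 10000 }, (_, i) => ({ id: i + 1 }));
const batches = splitIntoBatches(users, 50);
console.log(batches.length); // 200
```

The last batch is simply shorter when the item count is not a multiple of the batch size.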
4.2. Flow control nodes
IF Node - Conditionals
Configuration:
Condition 1:
Field: {{$json.amount}}
Operation: Larger
Value: 1000
Condition 2:
Field: {{$json.status}}
Operation: Equal
Value: pending
Combine: AND
Output:
true → High value pending orders
false → Normal processing
Switch Node - Multiple routes
Mode: Rules
Rule 0: {{$json.country}} equals 'ES' → Route 0 (Spain)
Rule 1: {{$json.country}} equals 'FR' → Route 1 (France)
Rule 2: {{$json.country}} equals 'DE' → Route 2 (Germany)
Default: → Route 3 (Others)
Each route can apply different processing.
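The routing rules above amount to a lookup with a default branch. A minimal sketch, assuming items expose a `country` field as in the rules; `routeItems` and the output layout are hypothetical and only mimic the node's four outputs.

```javascript
// Map each country code to an output index; anything else goes to the default route.
const ROUTES = new Map([['ES', 0], ['FR', 1], ['DE', 2]]);
const DEFAULT_ROUTE = 3;

// Distribute items over four output arrays, one per route.
function routeItems(items) {
  const outputs = [[], [], [], []];
  for (const item of items) {
    const route = ROUTES.get(item.country) ?? DEFAULT_ROUTE;
    outputs[route].push(item);
  }
  return outputs;
}

const outputs = routeItems([
  { id: 1, country: 'ES' },
  { id: 2, country: 'DE' },
  { id: 3, country: 'IT' }
]);
console.log(outputs.map(o => o.length)); // [ 1, 0, 1, 1 ]
```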
Loop Node - Iterations
Loop Over / Loop Items pair:
[Start] → [Loop Over Items]
↓
[Process Each]
↓
[Check Condition]
↓
[Loop Back if needed]
↓
[Loop Done] → [Continue]
Wait Node - Delays and throttling
Use case 1: Rate limiting
[HTTP Request to API]
↓
[Wait: 1 second]
↓
[Next HTTP Request]
Use case 2: Wait for an async response
[Trigger Job]
↓
[Wait: Until webhook received]
↓
[Process Result]
Use case 3: Retry with backoff
[Try API Call]
↓
[IF: Failed?]
↓ true
[Wait: 5 seconds]
↓
[Retry]
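The retry flow above waits a fixed 5 seconds. A common variant doubles the wait after each failure (exponential backoff). The sketch below is one way to express that in code under stated assumptions: `backoffDelays` and `retryWithBackoff` are hypothetical helpers, and the base delay and attempt count are illustrative defaults.

```javascript
// Delays between attempts: baseMs, baseMs * factor, baseMs * factor^2, ...
// There is one delay fewer than there are attempts.
function backoffDelays(baseMs, maxAttempts, factor = 2) {
  return Array.from({ length: Math.max(0, maxAttempts - 1) }, (_, i) => baseMs * factor ** i);
}

// Call operation() until it succeeds or maxAttempts is reached,
// sleeping for the next backoff delay after each failure.
async function retryWithBackoff(operation, { baseMs = 5000, maxAttempts = 4 } = {}) {
  const delays = backoffDelays(baseMs, maxAttempts);
  for (let attempt = 0; ; attempt++) {
    try {
      return await operation();
    } catch (error) {
      if (attempt >= delays.length) throw error; // out of attempts
      await new Promise(resolve => setTimeout(resolve, delays[attempt]));
    }
  }
}

console.log(backoffDelays(5000, 4)); // [ 5000, 10000, 20000 ]
```

In a workflow, the same effect can be approximated by computing the delay in an expression and feeding it to the Wait Node.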
Error Trigger - Global error handling
Main Workflow:
[Any node can fail]
Workflow Error Handler:
[Error Trigger]
↓
[Log Error Details]
↓
[Send Alert to Slack]
↓
[Create Ticket in Jira]
↓
[Store in Error Database]
4.3. Database nodes
PostgreSQL Node
Available operations:
1. Execute Query (most flexible)
Query:
SELECT u.name, u.email, COUNT(o.id) as order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.created_at > '{{$json.since_date}}'
GROUP BY u.id
HAVING COUNT(o.id) > {{$json.min_orders}}
2. Insert
Table: customers
Columns:
- name: {{$json.name}}
- email: {{$json.email}}
- created_at: {{$now}}
3. Update
Table: orders
Column to Match On: id
Values to Set:
- status: {{$json.new_status}}
- updated_at: {{$now}}
4. Delete
Table: temp_data
Delete By Column: created_at
Value: < NOW() - INTERVAL '7 days'
MySQL Node
Similar to PostgreSQL, but with MySQL syntax:
Parameterized query:
SELECT * FROM products
WHERE category = ?
AND price BETWEEN ? AND ?
AND stock > ?
Parameters:
- {{$json.category}}
- {{$json.min_price}}
- {{$json.max_price}}
- 0
MongoDB Node
Operation: Find
Collection: users
Query:
{
"age": { "$gte": {{$json.min_age}} },
"city": "{{$json.city}}",
"status": "active"
}
Projection:
{
"name": 1,
"email": 1,
"orders": 1,
"_id": 0
}
Options:
Limit: {{$json.limit || 100}}
Sort: { "created_at": -1 }
Redis Node
Use cases:
1. Caching
[Check Redis]
↓ miss
[Fetch from DB]
↓
[Store in Redis: TTL 3600s]
2. Rate Limiting
Key: api_calls:{{$json.user_id}}
[INCR key]
[IF > limit → Reject]
[EXPIRE key 60]
3. Pub/Sub
[PUBLISH channel "message"]
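The rate-limiting use case above relies on a counter that expires. The following in-memory sketch imitates that behaviour without Redis, assuming the expiry is set only when the counter is first created so the window does not slide on every call. `FixedWindowLimiter` is a hypothetical class for illustration; a real deployment would issue INCR and EXPIRE against Redis.

```javascript
// Fixed-window rate limiter: at most `limit` calls per key per window.
// The clock is injectable so the behaviour can be exercised without waiting.
class FixedWindowLimiter {
  constructor(limit, windowSeconds, now = () => Date.now()) {
    this.limit = limit;
    this.windowMs = windowSeconds * 1000;
    this.now = now;
    this.counters = new Map();
  }

  // Returns true if the call is allowed, false if it should be rejected.
  allow(key) {
    const t = this.now();
    let entry = this.counters.get(key);
    if (!entry || t >= entry.expiresAt) {
      // First call in a new window: create the counter with its expiry (like EXPIRE).
      entry = { count: 0, expiresAt: t + this.windowMs };
      this.counters.set(key, entry);
    }
    entry.count += 1; // like INCR
    return entry.count <= this.limit;
  }
}

let fakeTime = 0;
const limiter = new FixedWindowLimiter(2, 60, () => fakeTime);
const firstWindow = [limiter.allow('api_calls:42'), limiter.allow('api_calls:42'), limiter.allow('api_calls:42')];
fakeTime = 60000; // the window has expired
const afterExpiry = limiter.allow('api_calls:42');
console.log(firstWindow, afterExpiry);
```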
4.4. HTTP Request Node
Full configuration:
Method: POST
URL: https://api.company.com/v2/users
Authentication:
Type: Generic Credential Type
Generic Auth Type: OAuth2
Headers:
Content-Type: application/json
X-API-Version: 2.0
X-Request-ID: {{$now.toUnixInteger()}}
Accept-Language: {{$json.locale || 'es'}}
Body:
Body Content Type: JSON
JSON:
{
"email": "{{$json.email}}",
"name": "{{$json.name}}",
"metadata": {
"source": "n8n",
"workflow_id": "{{$workflow.id}}",
"execution_id": "{{$execution.id}}"
}
}
Options:
Timeout: 30000 (30 seconds)
Redirect: Follow Redirects
Response Format: JSON
Ignore SSL Issues: false
Batching: Combine with items
Pagination handling:
// Function Node before the HTTP Request
let allResults = [];
let page = 1;
const perPage = 100;
while (true) {
  // Request one page
  const response = await this.helpers.httpRequest({
    method: 'GET',
    url: `https://api.company.com/users?page=${page}&per_page=${perPage}`,
    headers: {
      // `credentials.token` is a placeholder; supply the token from your own credential source
      'Authorization': `Bearer ${credentials.token}`
    }
  });
  allResults = allResults.concat(response.data);
  // Stop when a page comes back shorter than a full page
  if (response.data.length < perPage) break;
  page++;
}
return allResults.map(item => ({json: item}));
Rate Limiting:
Pattern 1: Wait between requests
[HTTP Request 1]
↓
[Wait: 1000ms]
↓
[HTTP Request 2]
Pattern 2: Batching with Split In Batches
[Get 1000 IDs]
↓
[Split In Batches: 10]
↓
[HTTP Requests for batch]
↓
[Wait: 2000ms]
↓
[Loop]
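Pattern 3: Token Bucket (Function Node)
// Keep the bucket in workflow static data; a plain local object would be
// re-created full on every execution and would never limit anything
const staticData = $getWorkflowStaticData('global');
const rateLimiter = staticData.rateLimiter ?? {
  tokens: 10,
  maxTokens: 10,
  refillRate: 1, // tokens per second
  lastRefill: Date.now()
};
// Refill tokens in proportion to the time elapsed since the last refill
const now = Date.now();
const timePassed = (now - rateLimiter.lastRefill) / 1000;
rateLimiter.tokens = Math.min(
  rateLimiter.maxTokens,
  rateLimiter.tokens + timePassed * rateLimiter.refillRate
);
rateLimiter.lastRefill = now;
// Check if we can proceed
if (rateLimiter.tokens < 1) {
  throw new Error('Rate limit exceeded');
}
rateLimiter.tokens--;
// Persist the updated bucket and pass the items through unchanged
staticData.rateLimiter = rateLimiter;
return $input.all();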
Unit 5: ETL and Data Pipelines with N8N
5.1. ETL fundamentals
Extract, Transform, Load:
EXTRACT (E):
├── APIs (REST, GraphQL)
├── Databases (PostgreSQL, MySQL, MongoDB)
├── Files (CSV, JSON, XML, Excel)
├── Web scraping
└── Streaming (Webhooks, Kafka)
TRANSFORM (T):
├── Cleaning (nulls, duplicates, outliers)
├── Validation (formats, ranges, types)
├── Enrichment (lookup, API calls)
├── Aggregation (group by, sum, avg)
└── Normalization (consistent formats)
LOAD (L):
├── Data Warehouses (BigQuery, Snowflake)
├── Databases (PostgreSQL, MongoDB)
├── Data Lakes (S3, GCS)
├── APIs (send processed data)
└── Cache (Redis, Memcached)
N8N vs traditional tools:
Apache Airflow:
✅ Better for very complex pipelines
✅ More scalable for massive big data
❌ Code only (Python DAGs)
❌ Complex configuration
N8N:
✅ Visual + code
✅ Fast setup (minutes vs days)
✅ Ideal for SMB and mid-market
✅ Excellent for prototyping
❌ Less suitable for petabyte-scale data
N8N use cases in Big Data:
✅ Excellent for:
- Pipelines up to 10M records/day
- Integrations between 3-10 systems
- ETL with moderate transformations
- Rapid pipeline prototyping
⚠️ Acceptable with optimizations:
- 10M-50M records/day (with batching)
- Heavy transformations (Code Node + external processing)
❌ Not recommended:
- >100M records/day
- Massive real-time streaming (Kafka+Flink is a better fit)
- In-memory processing of files >1GB
5.2. Data extraction
REST APIs:
Complete example: Extracting data from Salesforce
[Schedule Trigger: Daily 2AM]
↓
[HTTP Request: Salesforce Auth]
Method: POST
URL: https://login.salesforce.com/services/oauth2/token
Body:
grant_type: password
client_id: {{$env.SF_CLIENT_ID}}
client_secret: {{$env.SF_CLIENT_SECRET}}
username: {{$env.SF_USERNAME}}
password: {{$env.SF_PASSWORD}}
↓
[Set: Extract Token]
access_token: {{$json.access_token}}
↓
[HTTP Request: Get Accounts]
URL: https://{{$env.SF_INSTANCE}}.salesforce.com/services/data/v58.0/query
Query Params:
q: SELECT Id, Name, Industry, AnnualRevenue FROM Account WHERE LastModifiedDate > {{$json.last_sync_date}}
Headers:
Authorization: Bearer {{$node["Extract Token"].json.access_token}}
GraphQL APIs:
[HTTP Request]
Method: POST
URL: https://api.github.com/graphql
Headers:
Authorization: Bearer {{$credentials.github_token}}
Body:
{
"query": "query {
repository(owner: \"n8n-io\", name: \"n8n\") {
issues(first: 100, states: OPEN) {
edges {
node {
title
createdAt
author { login }
labels(first: 10) {
edges { node { name } }
}
}
}
}
}
}"
}
Databases:
Incremental load strategy:
[PostgreSQL: Get Last Sync Time]
Query: SELECT MAX(sync_timestamp) as last_sync FROM sync_log
↓
[PostgreSQL: Get New/Modified Records]
Query:
SELECT * FROM customers
WHERE updated_at > '{{$node["Get Last Sync Time"].json.last_sync}}'
ORDER BY updated_at
LIMIT 10000
↓
[Transform]
↓
[Load to DWH]
↓
[PostgreSQL: Update Sync Log]
Query: INSERT INTO sync_log (sync_timestamp, records_processed)
VALUES ('{{$now}}', {{$json.count}})
CSV/Excel files:
[Read Binary File]
File Path: /data/uploads/sales_{{$json.date}}.csv
↓
[Spreadsheet File]
Operation: Read from File
File Format: CSV
Options:
- Header Row: Yes
- Delimiter: ,
- Encoding: UTF-8
↓
[Function: Parse and Validate]
// Validate date formats, numbers, etc.
Web Scraping:
[HTTP Request]
URL: https://example.com/products
↓
[HTML Extract]
Extraction Values:
- Selector: .product-title
Attribute: text
Key: title
- Selector: .product-price
Attribute: text
Key: price
- Selector: .product-link
Attribute: href
Key: url
↓
[Function: Clean Data]
price: parseFloat(price.replace('€', ''))
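Scraped prices often use a decimal comma and a thousands dot, which a plain `parseFloat` on the cleaned string would misread. A minimal sketch of a more tolerant parser follows; `parsePrice` is a hypothetical helper, and it assumes that any comma in the text is a decimal separator (so US-style "1,234.56" is out of scope).

```javascript
// Parse a scraped price string into a number, or null if nothing numeric remains.
// Assumption: a comma, when present, is the decimal separator and dots are thousands separators.
function parsePrice(text) {
  const cleaned = text.replace(/[^0-9.,-]/g, ''); // drop currency symbols and spaces
  const normalized = cleaned.includes(',')
    ? cleaned.replace(/\./g, '').replace(',', '.')
    : cleaned;
  const value = Number.parseFloat(normalized);
  return Number.isNaN(value) ? null : value;
}

console.log(parsePrice('19,99 €'));    // 19.99
console.log(parsePrice('1.234,56 €')); // 1234.56
console.log(parsePrice('€12.50'));     // 12.5
```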
5.3. Data transformation
Data cleaning:
// Function Node: Data Cleaning
const items = $input.all();
return items
  .map(item => {
    const data = item.json;
    return {
      json: {
        // Replace missing values with null or a default
        id: data.id ?? null,
        name: data.name?.trim() || 'Unknown',
        email: data.email?.toLowerCase().trim() || null,
        // Normalize phone numbers
        phone: data.phone?.replace(/[^0-9+]/g, '') || null,
        // Fix dates
        created_at: data.created_at ?
          new Date(data.created_at).toISOString() :
          null,
        // Drop implausible amounts (fixed threshold, not a z-score)
        amount: Math.abs(data.amount) < 1000000 ? data.amount : null,
        // Standardize categories
        category: data.category?.toLowerCase()
          .replace(/[^a-z0-9]/g, '_') || 'uncategorized'
      }
    };
  })
  .filter(item => {
    // Remove records without an id or email
    return item.json.id !== null && Boolean(item.json.email);
  });
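The amount check above uses a fixed cutoff. A z-score based outlier filter, closer to the pandas example in Unit 4, could be sketched as follows. `zScores` and `removeOutliers` are hypothetical helpers; the sample standard deviation (n - 1) is used to match the pandas default, and the threshold of 2 is taken from that earlier example.

```javascript
// Compute the z-score of each value using the sample standard deviation.
function zScores(values) {
  const n = values.length;
  if (n < 2) return values.map(() => 0); // not enough data to estimate spread
  const mean = values.reduce((sum, v) => sum + v, 0) / n;
  const variance = values.reduce((sum, v) => sum + (v - mean) ** 2, 0) / (n - 1);
  const std = Math.sqrt(variance);
  return values.map(v => (std === 0 ? 0 : (v - mean) / std));
}

// Keep only values whose absolute z-score does not exceed the threshold.
function removeOutliers(values, threshold = 2) {
  const scores = zScores(values);
  return values.filter((_, i) => Math.abs(scores[i]) <= threshold);
}

const amounts = [10, 12, 11, 13, 12, 11, 12, 10, 13, 100];
console.log(removeOutliers(amounts)); // the value 100 is dropped
```

With small samples a single extreme value inflates the standard deviation, so the threshold may need tuning for real data.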
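Normalization:
// Normalize addresses
function normalizeAddress(addr) {
  return {
    street: addr.street?.trim(),
    city: addr.city?.trim().toUpperCase(),
    postal_code: addr.postal_code?.replace(/\s/g, ''),
    country: addr.country?.trim().toUpperCase() || 'ES'
  };
}
// Normalize currencies; rates are EUR per unit of each currency (illustrative values)
function normalizeCurrency(amount, from, to = 'EUR') {
  const rates = {
    'USD': 0.92,
    'GBP': 1.17,
    'EUR': 1.00
  };
  // Fail loudly instead of returning NaN for an unknown currency
  if (!(from in rates) || !(to in rates)) {
    throw new Error(`Unsupported currency: ${from} -> ${to}`);
  }
  return amount * rates[from] / rates[to];
}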
Validation:
// Validation rules
const validationRules = {
email: /^[^\s@]+@[^\s@]+\.[^\s@]+$/,
phone: /^\+?[0-9]{9,15}$/,
postalCode: /^[0-9]{5}$/,
iban: /^ES[0-9]{22}$/
};
function validate(data, rules) {
const errors = [];
for (const [field, regex] of Object.entries(rules)) {
if (data[field] && !regex.test(data[field])) {
errors.push(`Invalid ${field}: ${data[field]}`);
}
}
return {
isValid: errors.length === 0,
errors: errors
};
}
// Apply validation
const items = $input.all();
return items.map(item => {
const validation = validate(item.json, validationRules);
return {
json: {
...item.json,
_validation: validation,
_is_valid: validation.isValid
}
};
});
Enrichment:
[Base Data]
↓
[HTTP Request: Enrich with Geolocation]
URL: https://api.ipgeolocation.io/ipgeo
Params:
apiKey: {{$env.GEO_API_KEY}}
ip: {{$json.ip_address}}
↓
[PostgreSQL: Enrich with Customer Tier]
Query:
SELECT tier, discount_rate
FROM customer_tiers
WHERE customer_id = {{$json.customer_id}}
↓
[Merge: Combine All Data]
Mode: Keep Key Matches
Key: customer_id
↓
[Enriched Data]
Aggregations:
// Group by and aggregate
const items = $input.all();
const grouped = {};
items.forEach(item => {
const category = item.json.category;
if (!grouped[category]) {
grouped[category] = {
category: category,
count: 0,
total_amount: 0,
items: []
};
}
grouped[category].count++;
grouped[category].total_amount += item.json.amount;
grouped[category].items.push(item.json);
});
// Calculate aggregates
return Object.values(grouped).map(group => ({
json: {
category: group.category,
total_count: group.count,
total_amount: group.total_amount,
avg_amount: group.total_amount / group.count,
min_amount: Math.min(...group.items.map(i => i.amount)),
max_amount: Math.max(...group.items.map(i => i.amount))
}
}));
5.4. Data loading
PostgreSQL/MySQL:
[Transform] (10,000 records)
↓
[Split In Batches: 500]
↓
[PostgreSQL: Bulk Insert]
Operation: Insert
Table: staging.sales_data
Columns: Auto-map
Options:
- On Conflict: Update
- Returning: id, created_at
↓
[Loop until all loaded]
BigQuery (Data Warehouse):
[Prepare Data]
↓
[HTTP Request: BigQuery API]
Method: POST
URL: https://bigquery.googleapis.com/bigquery/v2/projects/{{$env.GCP_PROJECT}}/datasets/{{$env.DATASET}}/tables/{{$env.TABLE}}/insertAll
Headers:
Authorization: Bearer {{$credentials.gcp_token}}
Body:
{
"rows": {{ JSON.stringify($json.rows.map(r => ({ json: r }))) }},
"skipInvalidRows": false,
"ignoreUnknownValues": false
}
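The request body above wraps each record in a `json` field. A minimal sketch of building that body in code, for instance in a Function Node placed before the HTTP Request, is shown below. `buildInsertAllBody` is a hypothetical helper; the optional `insertId` per row is included on the assumption that each record has a unique `id` that can serve for best-effort de-duplication.

```javascript
// Build a streaming-insert request body from plain records.
// idField names the record property used as insertId (assumed unique per record).
function buildInsertAllBody(records, idField = 'id') {
  return {
    rows: records.map(record => ({
      insertId: String(record[idField]),
      json: record
    })),
    skipInvalidRows: false,
    ignoreUnknownValues: false
  };
}

const body = buildInsertAllBody([
  { id: 1, amount: 10.5 },
  { id: 2, amount: 7 }
]);
console.log(JSON.stringify(body, null, 2));
```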
S3/Cloud Storage:
[Transform to CSV]
↓
[Convert to Binary]
↓
[S3: Upload File]
Bucket: company-data-lake
Path: /raw/sales/{{$now.toFormat('yyyy-MM-dd')}}/sales_data.csv
ACL: private
Metadata:
- source: n8n-workflow
- execution_id: {{$execution.id}}
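The "Transform to CSV" step above needs proper quoting so that commas, quotes, and line breaks inside values do not corrupt the file. A minimal sketch, with `escapeCsv` and `toCsv` as hypothetical helpers and the column list supplied by the caller:

```javascript
// Quote a value if it contains a comma, a double quote, or a line break;
// embedded double quotes are doubled, following common CSV conventions.
function escapeCsv(value) {
  const text = value === null || value === undefined ? '' : String(value);
  return /[",\r\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text;
}

// Serialize records to CSV text with a header row, using the given column order.
function toCsv(records, columns) {
  const header = columns.map(escapeCsv).join(',');
  const lines = records.map(record => columns.map(col => escapeCsv(record[col])).join(','));
  return [header, ...lines].join('\n');
}

const csv = toCsv(
  [{ id: 1, name: 'Joan, Jr.' }, { id: 2, name: 'Maria "M"' }],
  ['id', 'name']
);
console.log(csv);
```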
Webhooks (notifying external systems):
[ETL Complete]
↓
[HTTP Request: Notify Data Warehouse]
Method: POST
URL: {{$env.DWH_WEBHOOK_URL}}
Body:
{
"event": "etl_complete",
"workflow": "{{$workflow.name}}",
"records_processed": {{$json.count}},
"execution_time_ms": {{$json.duration}},
"timestamp": "{{$now}}"
}
5.5. Pipeline design patterns
Incremental Loading:
[Get Last Watermark]
Query: SELECT MAX(updated_at) as watermark FROM staging.sync_metadata
↓
[Extract New/Modified]
Query: SELECT * FROM source
WHERE updated_at > '{{$json.watermark}}'
↓
[Transform]
↓
[Load]
↓
[Update Watermark]
Query: UPDATE staging.sync_metadata
SET updated_at = '{{$now}}'
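The watermark logic above can be sketched on in-memory data. One variant worth considering advances the watermark to the latest `updated_at` actually extracted rather than to the current time, so that rows modified while the run is in progress are picked up next time. `extractSince` and `nextWatermark` are hypothetical helpers, and timestamps are assumed to be ISO 8601 strings in UTC with identical formatting, which makes them comparable as plain strings.

```javascript
// Return records modified after the watermark, oldest first.
function extractSince(records, watermark) {
  return records
    .filter(r => r.updated_at > watermark)
    .sort((a, b) => (a.updated_at < b.updated_at ? -1 : a.updated_at > b.updated_at ? 1 : 0));
}

// The new watermark is the latest updated_at that was extracted;
// if nothing was extracted, the watermark stays where it was.
function nextWatermark(extracted, currentWatermark) {
  return extracted.length === 0
    ? currentWatermark
    : extracted[extracted.length - 1].updated_at;
}

const source = [
  { id: 1, updated_at: '2024-01-01T10:00:00Z' },
  { id: 2, updated_at: '2024-01-03T09:30:00Z' },
  { id: 3, updated_at: '2024-01-02T08:00:00Z' }
];
const extracted = extractSince(source, '2024-01-01T12:00:00Z');
console.log(extracted.map(r => r.id));                       // [ 3, 2 ]
console.log(nextWatermark(extracted, '2024-01-01T12:00:00Z')); // 2024-01-03T09:30:00Z
```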
Change Data Capture (CDC):
[PostgreSQL: Read WAL/Replication Slot]
or
[Webhook: Receive CDC Events]
↓
[Parse Change Event]
event_type: INSERT | UPDATE | DELETE
table: customers
data: {...}
↓
[Switch by Event Type]
├─ INSERT → [Load New Record]
├─ UPDATE → [Update Existing]
└─ DELETE → [Soft Delete]
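The event routing above can be illustrated against an in-memory table. This is a minimal sketch: `applyChange` is a hypothetical function, the target table is a `Map` keyed by `id`, and the soft delete is represented by a `deleted` flag, which is an assumption about how the target models deletions.

```javascript
// Apply one change event to an in-memory table (a Map keyed by record id).
function applyChange(table, event) {
  const { event_type, data } = event;
  switch (event_type) {
    case 'INSERT':
      table.set(data.id, { ...data, deleted: false });
      break;
    case 'UPDATE':
      // Merge the changed fields into the existing record, if any.
      table.set(data.id, { ...(table.get(data.id) ?? {}), ...data });
      break;
    case 'DELETE': {
      // Soft delete: keep the row but mark it as deleted.
      const existing = table.get(data.id);
      if (existing) table.set(data.id, { ...existing, deleted: true });
      break;
    }
    default:
      throw new Error(`Unknown event type: ${event_type}`);
  }
  return table;
}

const customers = new Map();
applyChange(customers, { event_type: 'INSERT', table: 'customers', data: { id: 1, name: 'Joan' } });
applyChange(customers, { event_type: 'UPDATE', table: 'customers', data: { id: 1, name: 'Joana' } });
applyChange(customers, { event_type: 'DELETE', table: 'customers', data: { id: 1 } });
console.log(customers.get(1));
```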
Checkpointing:
// Checkpoint pattern for resilience
const checkpoint = {
batch_id: Date.now(),
last_processed_id: 0,
total_processed: 0
};
// Load checkpoint if exists
try {
const lastCheckpoint = await loadCheckpoint();
if (lastCheckpoint) {
checkpoint.last_processed_id = lastCheckpoint.last_processed_id;
}
} catch (e) {}
// Process with checkpoints
while (true) {
const batch = await fetchBatch(checkpoint.last_processed_id);
if (batch.length === 0) break;
await processBatch(batch);
checkpoint.last_processed_id = batch[batch.length - 1].id;
checkpoint.total_processed += batch.length;
// Save a checkpoint after every batch; a modulo test on the running total
// would skip saves whenever batch sizes do not divide 1000 evenly
await saveCheckpoint(checkpoint);
}
Idempotency:
-- Pattern 1: UPSERT with ON CONFLICT
INSERT INTO customers (id, name, email, updated_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (id)
DO UPDATE SET
name = EXCLUDED.name,
email = EXCLUDED.email,
updated_at = EXCLUDED.updated_at;
-- Pattern 2: Delete-then-insert per batch, inside a transaction
BEGIN;
DELETE FROM staging.temp_data WHERE batch_id = '{{$json.batch_id}}';
INSERT INTO staging.temp_data ...;
COMMIT;
-- Pattern 3: Idempotent processing key
INSERT INTO processed_events (event_id, processed_at)
VALUES ('{{$json.event_id}}', NOW())
ON CONFLICT (event_id) DO NOTHING
RETURNING event_id;
-- If no row is returned, the event was already processed
Handling large volumes:
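The processing-key idea in Pattern 3 can be sketched in memory: an event is claimed by recording its id, and a second delivery of the same id is skipped. `createIdempotentProcessor` is a hypothetical helper, and the `Set` stands in for the `processed_events` table; in production the claim would be the database insert itself.

```javascript
// Wrap a handler so that each event_id is processed at most once.
// Returns a function that reports whether the event was processed (true) or skipped (false).
function createIdempotentProcessor(handler) {
  const processed = new Set(); // stands in for the processed_events table
  return function process(event) {
    if (processed.has(event.event_id)) return false; // already seen: do nothing
    processed.add(event.event_id); // claim the event before handling it
    handler(event);
    return true;
  };
}

let handledCount = 0;
const processEvent = createIdempotentProcessor(() => { handledCount += 1; });
const results = [
  processEvent({ event_id: 'evt-1' }),
  processEvent({ event_id: 'evt-1' }), // duplicate delivery
  processEvent({ event_id: 'evt-2' })
];
console.log(results, handledCount);
```

Claiming before handling means a failed handler leaves the event marked as processed; whether to release the claim on failure depends on the pipeline's delivery guarantees.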
Strategy 1: Pagination + Batching
[Get Total Count]
↓
[Calculate Pages] (100,000 records / 1,000 per page = 100 pages)
↓
[Loop: Page 1 to 100]
↓
[Fetch Page]
↓
[Split In Batches: 100]
↓
[Process Batch]
↓
[Load Batch]
↓
[Next Page]
Strategy 2: Parallel Processing
[Source Data]
↓
[Split by Key Range]
├─ Range 1: IDs 1-10000 → Worker 1
├─ Range 2: IDs 10001-20000 → Worker 2
└─ Range 3: IDs 20001-30000 → Worker 3
↓
[Merge Results]
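The key-range split in Strategy 2 reduces to simple arithmetic. A minimal sketch, with `splitKeyRanges` as a hypothetical helper that assumes contiguous integer IDs:

```javascript
// Split the inclusive ID range [minId, maxId] into at most `workers` contiguous ranges.
function splitKeyRanges(minId, maxId, workers) {
  if (!Number.isInteger(workers) || workers <= 0) {
    throw new RangeError('workers must be a positive integer');
  }
  const total = maxId - minId + 1;
  const size = Math.ceil(total / workers);
  const ranges = [];
  for (let start = minId; start <= maxId; start += size) {
    ranges.push({ from: start, to: Math.min(start + size - 1, maxId) });
  }
  return ranges;
}

console.log(splitKeyRanges(1, 30000, 3));
// [ { from: 1, to: 10000 }, { from: 10001, to: 20000 }, { from: 20001, to: 30000 } ]
```

If IDs are sparse or skewed, equal-width ranges give uneven workloads, and splitting by row count is usually a better fit.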
Strategy 3: Streaming
[Webhook Trigger: Continuous]
↓
[Buffer: Collect for 60s or 1000 items]
↓
[Process Buffer]
↓
[Load Batch]
↓
[Repeat]
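The buffering step in Strategy 3 (flush after 60 seconds or 1000 items, whichever comes first) can be sketched as a small class. `Batcher` is a hypothetical name, and the clock is injectable so the logic can be exercised without waiting; something external still has to call `flushIfReady` periodically for the time-based flush to fire when no new items arrive.

```javascript
// Collect items and release them as a batch when either limit is reached.
class Batcher {
  constructor({ maxItems = 1000, maxAgeMs = 60000, now = () => Date.now() } = {}) {
    this.maxItems = maxItems;
    this.maxAgeMs = maxAgeMs;
    this.now = now;
    this.items = [];
    this.startedAt = 0;
  }

  // Add an item; returns the flushed batch if a limit was hit, otherwise null.
  add(item) {
    if (this.items.length === 0) this.startedAt = this.now(); // the buffer's age starts here
    this.items.push(item);
    return this.flushIfReady();
  }

  // Flush when the buffer is full or its oldest item is too old.
  flushIfReady() {
    const full = this.items.length >= this.maxItems;
    const expired = this.items.length > 0 && this.now() - this.startedAt >= this.maxAgeMs;
    if (!full && !expired) return null;
    const batch = this.items;
    this.items = [];
    return batch;
  }
}

let clock = 0;
const batcher = new Batcher({ maxItems: 3, maxAgeMs: 60000, now: () => clock });
batcher.add('a');
batcher.add('b');
const bySize = batcher.add('c'); // third item fills the buffer
batcher.add('d');
clock = 60000;                   // 60 seconds later
const byAge = batcher.flushIfReady();
console.log(bySize, byAge);
```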
Summary of Units 4 and 5:
We have covered:
- Advanced nodes for complex transformations
- Sophisticated flow control
- Database operations
- Advanced HTTP Request usage with authentication and pagination
- ETL concepts and implementation with N8N
- Strategies for extracting, transforming, and loading data
- Design patterns for robust, scalable pipelines
Key points:
- Function/Code nodes give maximum flexibility
- Split In Batches is essential for large volumes
- ETL with N8N is ideal for SMB and mid-market
- Patterns such as checkpointing and idempotency are critical
- Rate limiting and error handling are mandatory
Next steps: The following units will cover AI integrations, scalability, security, and specific Big Data use cases.