Pràctica 4 — Workflow Multi-Agent amb LangGraph¶
📋 Fitxa de la Pràctica
🎯 Objectius¶
- Dissenyar una arquitectura multi-agent amb agents especialitzats
- Implementar el flux amb LangGraph (nodes, edges, estat compartit)
- Implementar human-in-the-loop per a accions crítiques
- Avaluar el rendiment i el cost del sistema
🏗️ L'Arquitectura: Sistema d'Anàlisi de Seguretat¶
Implementarem un sistema de 3 agents especialitzats per a anàlisi de logs de seguretat:
┌───────────────┐
INPUT │ SUPERVISOR │ OUTPUT
(fitxer log) ──►│ │──► Informe de seguretat
└──────┬────────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────┐ ┌──────────────┐
│ ANALITZADOR │ │ CERCADOR│ │ REDACTOR │
│ de Logs │ │ de CVEs │ │ d'Informe │
└──────────────┘ └──────────┘ └──────────────┘
Llegeix i classifica Busca CVEs Genera l'informe
events sospitosos per als final en Markdown
del log patrons trobats
💻 Implementació Completa¶
"""
Pràctica 4 — Sistema Multi-Agent: Anàlisi de Seguretat
pip install langgraph langchain langchain-openai langchain-community duckduckgo-search
"""
from typing import TypedDict, Annotated, Sequence, Optional
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain.tools import tool, DuckDuckGoSearchResults
from langchain.memory import ConversationBufferMemory
from dotenv import load_dotenv
import operator
import json
import re
load_dotenv()
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# ── ESTAT COMPARTIT ───────────────────────────────────────────
class SecurityAnalysisState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
log_content: str # Contingut del log analitzat
events_sospitosos: list # Llista d'events detectats
cves_rellevants: list # CVEs relacionades
next_agent: str # Pròxim agent a actuar
iteration: int # Comptador d'iteracions
final_report: Optional[str] # Informe final generat
# ── EINES ─────────────────────────────────────────────────────
search_tool = DuckDuckGoSearchResults(num_results=3)
@tool
def analitzar_log_auth(log_text: str) -> str:
"""
Analitza un log de autenticació (SSH, sudo, auth.log) i detecta
events sospitosos: brute force, escalada de privilegis, accesos no autoritzats.
Args:
log_text: Contingut del fitxer de log (màxim 5000 caràcters)
"""
log_text = log_text[:5000] # Limitar per seguretat
patterns = {
"brute_force_ssh": r"Failed password for .+ from (\d+\.\d+\.\d+\.\d+)",
"auth_success": r"Accepted \w+ for .+ from (\d+\.\d+\.\d+\.\d+)",
"sudo_usage": r"sudo: .+ COMMAND=(.+)",
"invalid_user": r"Invalid user (\w+) from (\d+\.\d+\.\d+\.\d+)",
"root_login": r"session opened for user root",
}
findings = {}
for name, pattern in patterns.items():
matches = re.findall(pattern, log_text)
if matches:
findings[name] = matches
if not findings:
return "No s'han detectat patterns sospitosos al log"
result = ["Events detectats:"]
for event_type, matches in findings.items():
result.append(f"\n{event_type.upper().replace('_',' ')}:")
for m in matches[:5]: # Limitar a 5 per tipus
result.append(f" → {m}")
if len(matches) > 5:
result.append(f" ... i {len(matches)-5} més")
return "\n".join(result)
@tool
def buscar_cve(tecnologia_i_versio: str) -> str:
"""
Busca CVEs recents per a una tecnologia i versió específiques.
Args:
tecnologia_i_versio: Ex: "OpenSSH 8.9", "Apache 2.4.51", "Linux kernel 5.15"
"""
query = f"CVE {tecnologia_i_versio} vulnerability 2024 security"
results = search_tool.invoke(query)
return f"CVEs trobades per '{tecnologia_i_versio}':\n{results}"
# ── NODE SUPERVISOR ───────────────────────────────────────────
SUPERVISOR_PROMPT = """Ets el supervisor d'un equip d'anàlisi de seguretat.
El teu equip:
- "analitzador": Analitza logs cercant events sospitosos
- "cercador_cve": Busca CVEs relacionades amb les amenaces detectades
- "redactor": Genera l'informe final de seguretat
Basant-te en l'estat actual, quin agent ha d'actuar ara?
- Si no s'ha analitzat el log → "analitzador"
- Si hi ha events però no s'han cercat CVEs → "cercador_cve"
- Si hi ha CVEs i volem l'informe → "redactor"
- Si l'informe ja està generat → "FINAL"
Respon ÚNICAMENT amb: analitzador | cercador_cve | redactor | FINAL"""
def supervisor_node(state: SecurityAnalysisState) -> SecurityAnalysisState:
events = state.get("events_sospitosos", [])
cves = state.get("cves_rellevants", [])
final = state.get("final_report")
iteration = state.get("iteration", 0)
if iteration > 6: # Safeguard contra bucles infinits
return {**state, "next_agent": "FINAL", "iteration": iteration + 1}
context = f"""
Log analitzat: {'Sí' if state.get('log_content') else 'No'}
Events sospitosos trobats: {len(events)}
CVEs cercades: {len(cves)}
Informe generat: {'Sí' if final else 'No'}
"""
response = llm.invoke([
{"role": "system", "content": SUPERVISOR_PROMPT},
{"role": "user", "content": context}
])
next_agent = response.content.strip().lower()
if next_agent not in {"analitzador", "cercador_cve", "redactor"}:
next_agent = "FINAL"
return {
**state,
"next_agent": next_agent,
"iteration": iteration + 1,
"messages": [AIMessage(content=f"Supervisor → {next_agent}")]
}
# ── NODE ANALITZADOR ──────────────────────────────────────────
def analitzador_node(state: SecurityAnalysisState) -> SecurityAnalysisState:
log = state.get("log_content", "")
if not log:
return {**state, "events_sospitosos": [],
"messages": [AIMessage(content="Analitzador: No hi ha log per analitzar")]}
# Usar el LLM + l'eina per analitzar
llm_tools = llm.bind_tools([analitzar_log_auth])
response = llm_tools.invoke([
{"role": "system", "content": "Analitza el log cercant events de seguretat."},
{"role": "user", "content": f"Analitza aquest log:\n{log[:3000]}"}
])
events = []
if response.tool_calls:
for tc in response.tool_calls:
if tc["name"] == "analitzar_log_auth":
result = analitzar_log_auth.invoke(tc["args"]["log_text"])
events.append(result)
else:
events.append(response.content)
return {
**state,
"events_sospitosos": events,
"messages": [AIMessage(content=f"Analitzador: {len(events)} tipus d'events detectats")]
}
# ── NODE CERCADOR CVE ─────────────────────────────────────────
def cercador_cve_node(state: SecurityAnalysisState) -> SecurityAnalysisState:
events = state.get("events_sospitosos", [])
if not events:
return {**state, "cves_rellevants": [],
"messages": [AIMessage(content="Cercador CVE: Sense events per analitzar")]}
# Identificar tecnologies a cercar
events_text = "\n".join(str(e) for e in events)
response = llm.invoke([
{"role": "system", "content": "Identifica les tecnologies i versions dels events de seguretat. "
"Retorna un JSON: [{\"tecnologia\": \"...\", \"versio\": \"...\"}]"},
{"role": "user", "content": f"Events:\n{events_text[:1000]}"}
])
cves = []
try:
tecnologies = json.loads(response.content)
for tech in tecnologies[:2]: # Limitar a 2 per cost
query = f"{tech.get('tecnologia', '')} {tech.get('versio', '')}".strip()
if query:
result = buscar_cve.invoke(query)
cves.append({"tecnologia": query, "cves": result})
except (json.JSONDecodeError, Exception):
cves.append({"tecnologia": "general", "cves": str(response.content)})
return {
**state,
"cves_rellevants": cves,
"messages": [AIMessage(content=f"Cercador CVE: {len(cves)} cerques completades")]
}
# ── NODE REDACTOR ─────────────────────────────────────────────
def redactor_node(state: SecurityAnalysisState) -> SecurityAnalysisState:
events = state.get("events_sospitosos", [])
cves = state.get("cves_rellevants", [])
context = f"""
EVENTS SOSPITOSOS:
{json.dumps(events, ensure_ascii=False, indent=2)[:2000]}
CVEs RELLEVANTS:
{json.dumps(cves, ensure_ascii=False, indent=2)[:2000]}
"""
response = llm.invoke([
{"role": "system", "content": """Ets un expert en ciberseguretat.
Genera un informe de seguretat en Markdown amb:
# Informe d'Anàlisi de Seguretat
## Resum Executiu
## Events Detectats
## Vulnerabilitats (CVEs)
## Recomanacions i Pla d'Acció
## Prioritat de Resposta (Alta/Mitja/Baixa)
Redacta en català."""},
{"role": "user", "content": context}
])
return {
**state,
"final_report": response.content,
"messages": [AIMessage(content="Redactor: Informe generat")]
}
# ── CONSTRUIR EL GRAF ─────────────────────────────────────────
def route_supervisor(state: SecurityAnalysisState) -> str:
return state.get("next_agent", "FINAL")
workflow = StateGraph(SecurityAnalysisState)
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("analitzador", analitzador_node)
workflow.add_node("cercador_cve", cercador_cve_node)
workflow.add_node("redactor", redactor_node)
workflow.set_entry_point("supervisor")
workflow.add_conditional_edges("supervisor", route_supervisor, {
"analitzador": "analitzador",
"cercador_cve": "cercador_cve",
"redactor": "redactor",
"FINAL": END
})
workflow.add_edge("analitzador", "supervisor")
workflow.add_edge("cercador_cve", "supervisor")
workflow.add_edge("redactor", "supervisor")
app = workflow.compile()
# ── EXECUTAR AMB UN LOG DE PROVA ──────────────────────────────
LOG_PROVA = """
Jan 15 10:23:15 server sshd[1234]: Failed password for root from 192.168.1.100 port 22342 ssh2
Jan 15 10:23:16 server sshd[1234]: Failed password for root from 192.168.1.100 port 22343 ssh2
Jan 15 10:23:17 server sshd[1234]: Failed password for root from 192.168.1.100 port 22344 ssh2
Jan 15 10:23:18 server sshd[1234]: Failed password for admin from 192.168.1.100 port 22345 ssh2
Jan 15 10:30:45 server sshd[1245]: Accepted password for deploy from 10.0.0.5 port 56789 ssh2
Jan 15 10:31:02 server sudo: deploy : TTY=pts/1 ; PWD=/home/deploy ; USER=root ; COMMAND=/bin/bash
Jan 15 10:31:05 server sudo: deploy : TTY=pts/1 ; PWD=/ ; USER=root ; COMMAND=/usr/bin/cat /etc/shadow
"""
if __name__ == "__main__":
print("🚀 Iniciant anàlisi multi-agent de seguretat...")
initial_state = {
"messages": [HumanMessage(content="Analitza el log de seguretat i genera un informe")],
"log_content": LOG_PROVA,
"events_sospitosos": [],
"cves_rellevants": [],
"next_agent": "",
"iteration": 0,
"final_report": None
}
result = app.invoke(initial_state)
print("\n" + "="*60)
print("📄 INFORME FINAL:")
print("="*60)
print(result["final_report"])
# Guardar l'informe
with open("informe_seguretat.md", "w", encoding="utf-8") as f:
f.write(result["final_report"])
print("\n✅ Informe guardat a informe_seguretat.md")
📊 Criteris d'Avaluació¶
| Criteri | Pes | Indicadors |
|---|---|---|
| Arquitectura multi-agent implementada | 30% | Supervisor + ≥3 workers, comunicació via estat |
| LangGraph complet (graf, nodes, edges) | 25% | Graf visualitzat, edges condicionals, cicles |
| Qualitat dels agents especialitzats | 25% | Cada agent fa bé la seva tasca |
| Human-in-the-loop (opcional +10%) | 10% | Almenys un punt d'aprovació humana |
| Documentació i diagrama | 10% | Diagrama del graf, README, informe de resultats |
Bonus
Afegeix interrupt_before=["redactor"] i implementa un pas d'aprovació humana abans de generar l'informe final, on l'humà pot afegir notes addicionals.