Salta el contingut

Pràctica 4 — Workflow Multi-Agent amb LangGraph

📋 Fitxa de la Pràctica

⏱️ Durada: 5 hores 🎯 RA: RA3 (CA 3.1, CA 3.2, CA 3.3, CA 3.4) 📊 Pes: 25% de la nota pràctica 🕸️ LangGraph 0.2 + LangChain 0.3

🎯 Objectius

  • Dissenyar una arquitectura multi-agent amb agents especialitzats
  • Implementar el flux amb LangGraph (nodes, edges, estat compartit)
  • Implementar human-in-the-loop per a accions crítiques
  • Avaluar el rendiment i el cost del sistema

🏗️ L'Arquitectura: Sistema d'Anàlisi de Seguretat

Implementarem un sistema de 3 agents especialitzats per a anàlisi de logs de seguretat:

                    ┌───────────────┐
    INPUT           │   SUPERVISOR  │          OUTPUT
    (fitxer log) ──►│               │──► Informe de seguretat
                    └──────┬────────┘
              ┌────────────┼────────────┐
              ▼            ▼            ▼
     ┌──────────────┐  ┌──────────┐  ┌──────────────┐
     │  ANALITZADOR │  │  CERCADOR│  │   REDACTOR   │
     │  de Logs     │  │  de CVEs │  │   d'Informe  │
     └──────────────┘  └──────────┘  └──────────────┘
     Llegeix i classifica  Busca CVEs     Genera l'informe
     events sospitosos     per als        final en Markdown
     del log               patrons trobats

💻 Implementació Completa

"""
Pràctica 4 — Sistema Multi-Agent: Anàlisi de Seguretat
pip install langgraph langchain langchain-openai langchain-community duckduckgo-search
"""

from typing import TypedDict, Annotated, Sequence, Optional
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain.tools import tool, DuckDuckGoSearchResults
from langchain.memory import ConversationBufferMemory
from dotenv import load_dotenv
import operator
import json
import re

load_dotenv()

llm = ChatOpenAI(model="gpt-4o", temperature=0)

# ── ESTAT COMPARTIT ───────────────────────────────────────────

class SecurityAnalysisState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    log_content: str              # Contingut del log analitzat
    events_sospitosos: list       # Llista d'events detectats
    cves_rellevants: list         # CVEs relacionades
    next_agent: str               # Pròxim agent a actuar
    iteration: int                # Comptador d'iteracions
    final_report: Optional[str]   # Informe final generat

# ── EINES ─────────────────────────────────────────────────────

search_tool = DuckDuckGoSearchResults(num_results=3)

@tool
def analitzar_log_auth(log_text: str) -> str:
    """
    Analitza un log de autenticació (SSH, sudo, auth.log) i detecta
    events sospitosos: brute force, escalada de privilegis, accesos no autoritzats.

    Args:
        log_text: Contingut del fitxer de log (màxim 5000 caràcters)
    """
    log_text = log_text[:5000]  # Limitar per seguretat

    patterns = {
        "brute_force_ssh": r"Failed password for .+ from (\d+\.\d+\.\d+\.\d+)",
        "auth_success":    r"Accepted \w+ for .+ from (\d+\.\d+\.\d+\.\d+)",
        "sudo_usage":      r"sudo: .+ COMMAND=(.+)",
        "invalid_user":    r"Invalid user (\w+) from (\d+\.\d+\.\d+\.\d+)",
        "root_login":      r"session opened for user root",
    }

    findings = {}
    for name, pattern in patterns.items():
        matches = re.findall(pattern, log_text)
        if matches:
            findings[name] = matches

    if not findings:
        return "No s'han detectat patterns sospitosos al log"

    result = ["Events detectats:"]
    for event_type, matches in findings.items():
        result.append(f"\n{event_type.upper().replace('_',' ')}:")
        for m in matches[:5]:  # Limitar a 5 per tipus
            result.append(f"  → {m}")
        if len(matches) > 5:
            result.append(f"  ... i {len(matches)-5} més")

    return "\n".join(result)


@tool
def buscar_cve(tecnologia_i_versio: str) -> str:
    """
    Busca CVEs recents per a una tecnologia i versió específiques.

    Args:
        tecnologia_i_versio: Ex: "OpenSSH 8.9", "Apache 2.4.51", "Linux kernel 5.15"
    """
    query = f"CVE {tecnologia_i_versio} vulnerability 2024 security"
    results = search_tool.invoke(query)
    return f"CVEs trobades per '{tecnologia_i_versio}':\n{results}"


# ── NODE SUPERVISOR ───────────────────────────────────────────

SUPERVISOR_PROMPT = """Ets el supervisor d'un equip d'anàlisi de seguretat.
El teu equip:
- "analitzador": Analitza logs cercant events sospitosos
- "cercador_cve": Busca CVEs relacionades amb les amenaces detectades
- "redactor": Genera l'informe final de seguretat

Basant-te en l'estat actual, quin agent ha d'actuar ara?
- Si no s'ha analitzat el log → "analitzador"
- Si hi ha events però no s'han cercat CVEs → "cercador_cve"
- Si hi ha CVEs i volem l'informe → "redactor"
- Si l'informe ja està generat → "FINAL"

Respon ÚNICAMENT amb: analitzador | cercador_cve | redactor | FINAL"""

def supervisor_node(state: SecurityAnalysisState) -> SecurityAnalysisState:
    events     = state.get("events_sospitosos", [])
    cves       = state.get("cves_rellevants", [])
    final      = state.get("final_report")
    iteration  = state.get("iteration", 0)

    if iteration > 6:  # Safeguard contra bucles infinits
        return {**state, "next_agent": "FINAL", "iteration": iteration + 1}

    context = f"""
    Log analitzat: {'Sí' if state.get('log_content') else 'No'}
    Events sospitosos trobats: {len(events)}
    CVEs cercades: {len(cves)}
    Informe generat: {'Sí' if final else 'No'}
    """

    response = llm.invoke([
        {"role": "system", "content": SUPERVISOR_PROMPT},
        {"role": "user",   "content": context}
    ])

    next_agent = response.content.strip().lower()
    if next_agent not in {"analitzador", "cercador_cve", "redactor"}:
        next_agent = "FINAL"

    return {
        **state,
        "next_agent": next_agent,
        "iteration": iteration + 1,
        "messages": [AIMessage(content=f"Supervisor → {next_agent}")]
    }


# ── NODE ANALITZADOR ──────────────────────────────────────────

def analitzador_node(state: SecurityAnalysisState) -> SecurityAnalysisState:
    log = state.get("log_content", "")
    if not log:
        return {**state, "events_sospitosos": [],
                "messages": [AIMessage(content="Analitzador: No hi ha log per analitzar")]}

    # Usar el LLM + l'eina per analitzar
    llm_tools = llm.bind_tools([analitzar_log_auth])
    response = llm_tools.invoke([
        {"role": "system", "content": "Analitza el log cercant events de seguretat."},
        {"role": "user",   "content": f"Analitza aquest log:\n{log[:3000]}"}
    ])

    events = []
    if response.tool_calls:
        for tc in response.tool_calls:
            if tc["name"] == "analitzar_log_auth":
                result = analitzar_log_auth.invoke(tc["args"]["log_text"])
                events.append(result)
    else:
        events.append(response.content)

    return {
        **state,
        "events_sospitosos": events,
        "messages": [AIMessage(content=f"Analitzador: {len(events)} tipus d'events detectats")]
    }


# ── NODE CERCADOR CVE ─────────────────────────────────────────

def cercador_cve_node(state: SecurityAnalysisState) -> SecurityAnalysisState:
    events = state.get("events_sospitosos", [])
    if not events:
        return {**state, "cves_rellevants": [],
                "messages": [AIMessage(content="Cercador CVE: Sense events per analitzar")]}

    # Identificar tecnologies a cercar
    events_text = "\n".join(str(e) for e in events)

    response = llm.invoke([
        {"role": "system", "content": "Identifica les tecnologies i versions dels events de seguretat. "
                                      "Retorna un JSON: [{\"tecnologia\": \"...\", \"versio\": \"...\"}]"},
        {"role": "user",   "content": f"Events:\n{events_text[:1000]}"}
    ])

    cves = []
    try:
        tecnologies = json.loads(response.content)
        for tech in tecnologies[:2]:  # Limitar a 2 per cost
            query = f"{tech.get('tecnologia', '')} {tech.get('versio', '')}".strip()
            if query:
                result = buscar_cve.invoke(query)
                cves.append({"tecnologia": query, "cves": result})
    except (json.JSONDecodeError, Exception):
        cves.append({"tecnologia": "general", "cves": str(response.content)})

    return {
        **state,
        "cves_rellevants": cves,
        "messages": [AIMessage(content=f"Cercador CVE: {len(cves)} cerques completades")]
    }


# ── NODE REDACTOR ─────────────────────────────────────────────

def redactor_node(state: SecurityAnalysisState) -> SecurityAnalysisState:
    events = state.get("events_sospitosos", [])
    cves   = state.get("cves_rellevants", [])

    context = f"""
    EVENTS SOSPITOSOS:
    {json.dumps(events, ensure_ascii=False, indent=2)[:2000]}

    CVEs RELLEVANTS:
    {json.dumps(cves, ensure_ascii=False, indent=2)[:2000]}
    """

    response = llm.invoke([
        {"role": "system", "content": """Ets un expert en ciberseguretat. 
        Genera un informe de seguretat en Markdown amb:
        # Informe d'Anàlisi de Seguretat
        ## Resum Executiu
        ## Events Detectats
        ## Vulnerabilitats (CVEs)
        ## Recomanacions i Pla d'Acció
        ## Prioritat de Resposta (Alta/Mitja/Baixa)
        Redacta en català."""},
        {"role": "user", "content": context}
    ])

    return {
        **state,
        "final_report": response.content,
        "messages": [AIMessage(content="Redactor: Informe generat")]
    }


# ── CONSTRUIR EL GRAF ─────────────────────────────────────────

def route_supervisor(state: SecurityAnalysisState) -> str:
    return state.get("next_agent", "FINAL")

workflow = StateGraph(SecurityAnalysisState)
workflow.add_node("supervisor",    supervisor_node)
workflow.add_node("analitzador",   analitzador_node)
workflow.add_node("cercador_cve",  cercador_cve_node)
workflow.add_node("redactor",      redactor_node)

workflow.set_entry_point("supervisor")
workflow.add_conditional_edges("supervisor", route_supervisor, {
    "analitzador":  "analitzador",
    "cercador_cve": "cercador_cve",
    "redactor":     "redactor",
    "FINAL":        END
})
workflow.add_edge("analitzador",   "supervisor")
workflow.add_edge("cercador_cve",  "supervisor")
workflow.add_edge("redactor",      "supervisor")

app = workflow.compile()


# ── EXECUTAR AMB UN LOG DE PROVA ──────────────────────────────

LOG_PROVA = """
Jan 15 10:23:15 server sshd[1234]: Failed password for root from 192.168.1.100 port 22342 ssh2
Jan 15 10:23:16 server sshd[1234]: Failed password for root from 192.168.1.100 port 22343 ssh2
Jan 15 10:23:17 server sshd[1234]: Failed password for root from 192.168.1.100 port 22344 ssh2
Jan 15 10:23:18 server sshd[1234]: Failed password for admin from 192.168.1.100 port 22345 ssh2
Jan 15 10:30:45 server sshd[1245]: Accepted password for deploy from 10.0.0.5 port 56789 ssh2
Jan 15 10:31:02 server sudo: deploy : TTY=pts/1 ; PWD=/home/deploy ; USER=root ; COMMAND=/bin/bash
Jan 15 10:31:05 server sudo: deploy : TTY=pts/1 ; PWD=/ ; USER=root ; COMMAND=/usr/bin/cat /etc/shadow
"""

if __name__ == "__main__":
    print("🚀 Iniciant anàlisi multi-agent de seguretat...")

    initial_state = {
        "messages": [HumanMessage(content="Analitza el log de seguretat i genera un informe")],
        "log_content": LOG_PROVA,
        "events_sospitosos": [],
        "cves_rellevants": [],
        "next_agent": "",
        "iteration": 0,
        "final_report": None
    }

    result = app.invoke(initial_state)

    print("\n" + "="*60)
    print("📄 INFORME FINAL:")
    print("="*60)
    print(result["final_report"])

    # Guardar l'informe
    with open("informe_seguretat.md", "w", encoding="utf-8") as f:
        f.write(result["final_report"])
    print("\n✅ Informe guardat a informe_seguretat.md")

📊 Criteris d'Avaluació

Criteri Pes Indicadors
Arquitectura multi-agent implementada 30% Supervisor + ≥3 workers, comunicació via estat
LangGraph complet (graf, nodes, edges) 25% Graf visualitzat, edges condicionals, cicles
Qualitat dels agents especialitzats 25% Cada agent fa bé la seva tasca
Human-in-the-loop (opcional +10%) 10% Almenys un punt d'aprovació humana
Documentació i diagrama 10% Diagrama del graf, README, informe de resultats

Bonus

Afegeix interrupt_before=["redactor"] i implementa un pas d'aprovació humana abans de generar l'informe final, on l'humà pot afegir notes addicionals.