---
name: langgraph-workflows
description: Design and implement multi-agent workflows with LangGraph 0.2+ - state management, supervisor-worker patterns, conditional routing, and fault-tolerant checkpointing
version: 2.0.0
author: YG Starter Template
tags: [langgraph, workflows, multi-agent, state-management, checkpointing, 2025]
---

# LangGraph Workflows

**Master multi-agent workflow orchestration with LangGraph 0.2+**

## Overview

LangGraph is a library for building stateful, multi-agent workflows as directed graphs. This skill covers production patterns for building complex AI workflows with fault tolerance, checkpointing, and observability.

**Real-World Use Cases:**
- **Multi-Agent Code Review**: Security, performance, style, and test coverage agents
- **E-commerce Product Enrichment**: Image classification, attribute extraction, SEO optimization
- **Customer Support Routing**: Intent classification, priority scoring, agent assignment
- **Document Processing Pipeline**: OCR, entity extraction, summarization, QA validation
- **Research Assistant**: Query expansion, retrieval, synthesis, fact-checking

**When to use this skill:**
- Building multi-step AI workflows with agent coordination
- Implementing supervisor-worker patterns (one agent routes to specialists)
- Creating fault-tolerant workflows with checkpointing
- Managing complex state across multiple LLM calls
- Conditional routing based on workflow state

**When NOT to use this skill:**
- Single-agent tasks (use simple LangChain chains)
- Stateless API calls (no need for graph complexity)
- Simple sequential pipelines (LangChain LCEL is simpler)

---

## Core Concepts

### 1. State Management

LangGraph workflows operate on **shared state** passed between nodes.

**Two State Approaches:**

```python
# Approach 1: TypedDict (simple, type-safe)
from typing import TypedDict, Annotated
from operator import add

class WorkflowState(TypedDict):
    input: str
    output: str
    agent_responses: Annotated[list[dict], add]  # List accumulates
    metadata: dict

# Approach 2: Pydantic (validation, complex logic)
from pydantic import BaseModel, Field

class WorkflowState(BaseModel):
    input: str = Field(description="User input")
    output: str = ""
    agent_responses: list[dict] = Field(default_factory=list)

    def add_response(self, agent: str, result: str):
        self.agent_responses.append({"agent": agent, "result": result})
```

**Real-World Example: Code Review Pipeline**
```python
class CodeReviewState(TypedDict):
    repository: str
    pull_request_id: int
    code_diff: str

    # Agent outputs (each agent adds to these)
    security_findings: Annotated[list[SecurityIssue], add]
    performance_issues: Annotated[list[PerformanceWarning], add]
    style_violations: Annotated[list[StyleViolation], add]
    test_coverage_gaps: Annotated[list[CoverageGap], add]

    # Control flow
    current_agent: str
    agents_completed: list[str]
    quality_passed: bool
    requires_human_review: bool
```

**Key Pattern: `Annotated[list[T], add]`**
- Without `add`: Each node replaces the list
- With `add`: Each node appends to the list
- Critical for multi-agent accumulation!

---

### 2. Supervisor-Worker Pattern

The most common multi-agent pattern: **one supervisor routes to specialized workers**.

```python
from langgraph.graph import StateGraph, END

# Define nodes
def supervisor(state: WorkflowState) -> WorkflowState:
    """Route to next worker based on state."""
    if state["needs_analysis"]:
        state["next"] = "analyzer"
    elif state["needs_validation"]:
        state["next"] = "validator"
    else:
        state["next"] = END
    return state

def analyzer(state: WorkflowState) -> WorkflowState:
    """Specialized analysis worker."""
    result = analyze(state["input"])
    state["results"].append(result)
    return state

# Build graph
workflow = StateGraph(WorkflowState)
workflow.add_node("supervisor", supervisor)
workflow.add_node("analyzer", analyzer)
workflow.add_node("validator", validator)

# Supervisor routes dynamically
workflow.add_conditional_edges(
    "supervisor",
    lambda s: s["next"],  # Route based on state
    {
        "analyzer": "analyzer",
        "validator": "validator",
        END: END
    }
)

# Workers return to supervisor
workflow.add_edge("analyzer", "supervisor")
workflow.add_edge("validator", "supervisor")

workflow.set_entry_point("supervisor")
app = workflow.compile()
```

**Production Example: Code Review Supervisor**
```python
# app/workflows/code_review_workflow.py
def supervisor_node(state: CodeReviewState) -> CodeReviewState:
    """Route to next available review agent."""
    completed = set(state["agents_completed"])
    available_agents = [a for a in ALL_REVIEW_AGENTS if a not in completed]

    if not available_agents:
        state["next"] = "quality_gate"
    else:
        # Priority-based routing (security first, then performance, etc.)
        state["next"] = available_agents[0]

    return state

# Specialist review agents
REVIEW_AGENTS = [
    "security_scanner",      # OWASP Top 10, CVE detection
    "performance_analyzer",  # N+1 queries, algorithmic complexity
    "style_checker",         # ESLint, Prettier, PEP8
    "test_coverage",         # Missing tests, assertions
    "documentation_review",  # Docstrings, READMEs
    "dependency_audit"       # Outdated libs, license compliance
]

for agent_name in REVIEW_AGENTS:
    workflow.add_node(agent_name, create_review_agent(agent_name))
    workflow.add_edge(agent_name, "supervisor")  # Return to supervisor
```

**Benefits:**
- Easy to add/remove agents (just modify routing logic)
- Centralized coordination (supervisor sees all state)
- Parallel execution possible (if agents independent)

---

### 3. Conditional Routing

**Conditional edges** let you route dynamically based on state.

```python
def route_based_on_quality(state: WorkflowState) -> str:
    """Decide next step based on quality score."""
    if state["quality_score"] >= 0.8:
        return "publish"
    elif state["retry_count"] < 3:
        return "retry"
    else:
        return "manual_review"

workflow.add_conditional_edges(
    "quality_check",
    route_based_on_quality,
    {
        "publish": "publish_node",
        "retry": "generator",
        "manual_review": "review_queue"
    }
)
```

**this project Example: Quality Gate**
```python
def route_after_quality_gate(state: AnalysisState) -> str:
    """Route based on quality gate result."""
    if state["quality_passed"]:
        return "compress_findings"  # Success path
    elif state["retry_count"] < 2:
        return "supervisor"  # Retry with more agents
    else:
        return END  # Failed, return partial results

workflow.add_conditional_edges(
    "quality_gate",
    route_after_quality_gate,
    {
        "compress_findings": "compress_findings",
        "supervisor": "supervisor",
        END: END
    }
)
```

**Routing Patterns:**
- **Sequential:** `A -> B -> C` (simple edges)
- **Branching:** `A -> (B or C)` (conditional edges)
- **Looping:** `A -> B -> A` (retry logic)
- **Convergence:** `(A or B) -> C` (multiple inputs, one output)

---

### 4. Checkpointing & Persistence

**Problem:** If a workflow crashes mid-execution, you lose all progress.

**Solution:** LangGraph checkpointing saves state after each node.

```python
from langgraph.checkpoint import MemorySaver, SqliteSaver

# In-memory (development)
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

# Persistent (production) - SQLite
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
app = workflow.compile(checkpointer=checkpointer)

# Persistent (production) - PostgreSQL
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string("postgresql://...")
app = workflow.compile(checkpointer=checkpointer)
```

**Using Checkpoints:**
```python
# Start new workflow
config = {"configurable": {"thread_id": "analysis-123"}}
result = app.invoke(initial_state, config=config)

# Resume interrupted workflow
config = {"configurable": {"thread_id": "analysis-123"}}
result = app.invoke(None, config=config)  # Resumes from last checkpoint
```

**this project Checkpointing:**
```python
# backend/app/workflows/checkpoints.py
from langgraph.checkpoint.postgres import PostgresSaver

def create_checkpointer():
    """Create PostgreSQL checkpointer for production."""
    return PostgresSaver.from_conn_string(
        settings.DATABASE_URL,
        # Save after each agent completes
        save_every=1
    )

# Compile with checkpointing
app = workflow.compile(
    checkpointer=create_checkpointer(),
    interrupt_before=["quality_gate"]  # Manual review point
)

# Resume after crash
result = app.invoke(
    None,
    config={"configurable": {"thread_id": analysis_id}}
)
```

**Benefits:**
- **Fault tolerance:** Resume after crashes
- **Human-in-the-loop:** Pause for approval (`interrupt_before`)
- **Debugging:** Inspect state at each checkpoint
- **Cost savings:** Don't re-run expensive LLM calls

---

## 5. Integration with Langfuse

**LangGraph + Langfuse = Full Observability**

```python
from langfuse.decorators import observe, langfuse_context
from langfuse import Langfuse

langfuse = Langfuse()

@observe()  # Traces entire workflow
def run_analysis_workflow(url: str):
    """Run LangGraph workflow with Langfuse tracing."""

    # Set trace metadata
    langfuse_context.update_current_trace(
        name="content_analysis",
        metadata={"url": url},
        tags=["langgraph", "multi-agent"]
    )

    # Compile workflow
    app = workflow.compile(checkpointer=checkpointer)

    # Each node is automatically traced as a span
    result = app.invoke({"url": url})

    # Log final metrics
    langfuse_context.update_current_observation(
        output=result,
        metadata={"agents_used": len(result["agents_completed"])}
    )

    return result

# Node-level tracing
@observe(as_type="generation")  # Mark as LLM call
def security_agent_node(state: AnalysisState):
    """Security analysis agent."""
    langfuse_context.update_current_observation(
        name="security_agent",
        input=state["raw_content"][:200]  # First 200 chars
    )

    result = security_agent.analyze(state["raw_content"])

    langfuse_context.update_current_observation(
        output=result,
        usage={
            "input_tokens": result["usage"]["input_tokens"],
            "output_tokens": result["usage"]["output_tokens"]
        }
    )

    state["findings"].append(result)
    state["agents_completed"].append("security")
    return state
```

**Langfuse Dashboard Shows:**
- Full workflow execution graph
- Per-node latency and costs
- Token usage by agent
- Failed nodes and retry attempts
- State at each checkpoint

---

## this project's 8-Agent Analysis Pipeline

**Architecture:**
```
User Content
    ↓
[Supervisor] → Routes to 8 specialist agents
    ↓
[Security Agent]  ──┐
[Tech Comparator] ──┤
[Implementation]  ──┤
[Tutorial]        ──┼→ [Supervisor] → [Quality Gate]
[Depth Analyzer]  ──┤                        ↓
[Prerequisites]   ──┤                   Pass: Compress
[Best Practices]  ──┤                   Fail: Retry or END
[Code Examples]   ──┘
```

**State Schema:**
```python
class Finding(BaseModel):
    agent: str
    category: str
    content: str
    confidence: float

class AnalysisState(TypedDict):
    # Input
    url: str
    raw_content: str

    # Agent outputs
    findings: Annotated[list[Finding], add]
    embeddings: Annotated[list[Embedding], add]

    # Control flow
    current_agent: str
    agents_completed: list[str]
    next: str

    # Quality control
    quality_score: float
    quality_passed: bool
    retry_count: int

    # Final output
    compressed_summary: str
    artifact: dict
```

**Key Design Decisions:**
1. **Supervisor pattern:** Centralized routing, easy to modify agent list
2. **Accumulating state:** `Annotated[list[T], add]` ensures all findings preserved
3. **Quality gate:** Validates before compression (prevents bad outputs)
4. **Checkpointing:** Resume expensive multi-agent workflows after failures
5. **Langfuse tracing:** Track costs and latency per agent

---

## Common Patterns

### Pattern 1: Map-Reduce (Parallel Agents)

```python
from langgraph.graph import StateGraph

def fan_out(state):
    """Split work into parallel tasks."""
    state["tasks"] = [{"id": 1}, {"id": 2}, {"id": 3}]
    return state

def worker(state):
    """Process one task."""
    # LangGraph handles parallel execution
    task = state["current_task"]
    result = process(task)
    return {"results": [result]}

def fan_in(state):
    """Combine parallel results."""
    combined = aggregate(state["results"])
    return {"final": combined}

workflow = StateGraph(State)
workflow.add_node("fan_out", fan_out)
workflow.add_node("worker", worker)
workflow.add_node("fan_in", fan_in)

workflow.add_edge("fan_out", "worker")
workflow.add_edge("worker", "fan_in")  # Waits for all workers
```

### Pattern 2: Human-in-the-Loop

```python
workflow = StateGraph(State)
workflow.add_node("draft", generate_draft)
workflow.add_node("review", human_review)
workflow.add_node("publish", publish_content)

# Interrupt before review (wait for human)
app = workflow.compile(interrupt_before=["review"])

# Step 1: Generate draft (stops at review)
result = app.invoke({"topic": "AI"}, config=config)

# Step 2: Human reviews, modifies state
state = app.get_state(config)
state["approved"] = True  # Human decision
app.update_state(config, state)

# Step 3: Resume workflow
result = app.invoke(None, config=config)  # Continues to publish
```

### Pattern 3: Retry with Backoff

```python
def llm_call_with_retry(state):
    """Retry failed LLM calls."""
    try:
        result = call_llm(state["input"])
        state["output"] = result
        state["retry_count"] = 0
        return state
    except Exception as e:
        state["retry_count"] += 1
        state["error"] = str(e)
        return state

def should_retry(state) -> str:
    if state["retry_count"] == 0:
        return "success"
    elif state["retry_count"] < 3:
        return "retry"
    else:
        return "failed"

workflow.add_conditional_edges(
    "llm_call",
    should_retry,
    {
        "success": "next_step",
        "retry": "llm_call",  # Loop back
        "failed": "error_handler"
    }
)
```

---

## Best Practices

### 1. State Design
- **Keep state flat:** Avoid deeply nested dicts (hard to debug)
- **Use TypedDict:** Type safety catches errors early
- **Annotated accumulators:** Use `Annotated[list, add]` for multi-agent outputs
- **Immutable inputs:** Don't modify input fields (helps with checkpointing)

### 2. Node Design
- **Pure functions:** Nodes should not have side effects (except I/O)
- **Idempotent:** Safe to re-run (important for checkpointing)
- **Single responsibility:** One agent = one node
- **Return new state:** Don't mutate in place (use `state.copy()`)

### 3. Error Handling
- **Wrap nodes:** Try/catch to prevent workflow crash
- **Dead letter queue:** Send failed items to error handler
- **Retry logic:** Exponential backoff for transient errors
- **Checkpoints:** Enable recovery without losing progress

### 4. Performance
- **Parallel execution:** Use `Send` API for independent tasks
- **Lazy loading:** Don't load heavy data until needed
- **Streaming:** Stream LLM responses for better UX
- **Caching:** Cache expensive operations (embeddings, API calls)

### 5. Observability
- **Trace everything:** Use `@observe()` on all nodes
- **Log state changes:** Before/after state for debugging
- **Cost tracking:** Record token usage per node
- **Alerting:** Set up alerts for workflow failures

---

## Debugging LangGraph Workflows

### Visualize the Graph
```python
from IPython.display import Image

# Generate graph visualization
image = app.get_graph().draw_mermaid_png()
Image(image)
```

### Inspect Checkpoints
```python
# Get all checkpoints for a workflow
checkpoints = app.get_state_history(config)

for checkpoint in checkpoints:
    print(f"Step: {checkpoint.metadata['step']}")
    print(f"Node: {checkpoint.metadata['source']}")
    print(f"State: {checkpoint.values}")
```

### Step-by-Step Execution
```python
# Execute one node at a time
for step in app.stream(initial_state, config):
    print(f"After {step['node']}: {step['state']}")
    input("Press Enter to continue...")
```

---

## Migration from LangChain Chains

**Old Way (LCEL Chain):**
```python
chain = (
    load_content
    | analyze
    | summarize
    | format_output
)
result = chain.invoke({"url": url})
```

**New Way (LangGraph):**
```python
workflow = StateGraph(State)
workflow.add_node("load", load_content)
workflow.add_node("analyze", analyze)
workflow.add_node("summarize", summarize)
workflow.add_node("format", format_output)

workflow.add_edge("load", "analyze")
workflow.add_edge("analyze", "summarize")
workflow.add_edge("summarize", "format")

app = workflow.compile()
result = app.invoke({"url": url})
```

**When to use LangGraph over LCEL:**
- Need state persistence (checkpointing)
- Conditional routing based on results
- Multi-agent coordination
- Human-in-the-loop approval
- Fault tolerance required

---

## References

### LangGraph Documentation
- [LangGraph Docs](https://langchain-ai.github.io/langgraph/)
- [StateGraph API](https://langchain-ai.github.io/langgraph/reference/graphs/)
- [Checkpointing Guide](https://langchain-ai.github.io/langgraph/how-tos/persistence/)

### this project Examples
- `backend/app/workflows/content_analysis_workflow.py` - Main analysis pipeline
- `backend/app/workflows/nodes/` - Individual agent nodes
- `backend/app/workflows/state.py` - State schema definitions

### Related Skills
- `ai-native-development` - LLM integration patterns
- `langfuse-observability` - Workflow tracing and monitoring
- `performance-optimization` - Optimize multi-agent execution

---

**Version:** 1.0.0 (December 2025)
**Status:** Production-ready patterns from this project's multi-agent pipeline
