---
name: agent-workflow-designer
description: >
  Design and implement multi-agent orchestration systems with workflow DAGs,
  agent routing, handoff protocols, state management, and cost optimization. Use
  when building AI pipelines with multiple specialized agents, designing
  fan-out/fan-in patterns, or implementing fault-tolerant agent workflows.
license: MIT + Commons Clause
metadata:
  version: 1.0.0
  author: borghei
  category: engineering
  domain: ai-orchestration
  tier: POWERFUL
  updated: 2026-03-09
  frameworks: langgraph, crewai, autogen, claude-agent-teams
---
# Agent Workflow Designer

The agent designs multi-agent orchestration systems using five core patterns: sequential pipeline, parallel fan-out/fan-in, hierarchical delegation, event-driven reactor, and consensus validation. It implements agent routing strategies, circuit breaker reliability patterns, context window budgeting, and cost optimization across LangGraph, CrewAI, AutoGen, and Claude Code agent teams.

## Core Capabilities

### 1. Pattern Selection and Design
- Sequential pipelines with typed handoffs
- Parallel fan-out/fan-in with merge strategies
- Hierarchical delegation with dynamic subtask discovery
- Event-driven reactors with pub/sub agent triggers
- Consensus validation with voting and arbitration

### 2. Agent Routing
- Intent-based routing with classifier agents
- Skill-based routing using capability matching
- Cost-aware routing (cheap models for simple tasks)
- Load-balanced routing across agent pools
- Fallback chains with graceful degradation

### 3. State and Context Management
- Persistent workflow state across agent hops
- Context window budgeting and summarization
- Checkpoint/resume for long-running workflows
- Conflict resolution for parallel state updates

### 4. Reliability Engineering
- Circuit breakers for failing agents
- Retry with exponential backoff and model fallback
- Dead letter queues for unprocessable tasks
- Timeout enforcement at every agent boundary
- Idempotent operations for safe retries

## When to Use

- Building multi-step AI pipelines that exceed one agent's capability
- Parallelizing research, analysis, or generation tasks
- Creating specialist agent teams with defined roles and contracts
- Designing fault-tolerant AI workflows for production deployment
- Optimizing cost across workflows with mixed model tiers

## Pattern Selection Decision Tree

```
What does the workflow look like?
│
├─ Linear: step A feeds step B feeds step C
│  └─ SEQUENTIAL PIPELINE
│     Best for: content pipelines, code review chains, data transformation
│
├─ Parallel: N independent tasks, then combine
│  └─ FAN-OUT / FAN-IN
│     Best for: competitive research, multi-source analysis, parallel code gen
│
├─ Tree: orchestrator breaks work into subtasks dynamically
│  └─ HIERARCHICAL DELEGATION
│     Best for: complex projects, open-ended research, code generation with planning
│
├─ Reactive: agents respond to events/triggers
│  └─ EVENT-DRIVEN REACTOR
│     Best for: monitoring, alerting, continuous integration, chat workflows
│
└─ Verification: multiple agents must agree on output
   └─ CONSENSUS VALIDATION
      Best for: high-stakes decisions, code review, fact checking, safety-critical output
```

## Pattern 1: Sequential Pipeline

Each stage transforms input and passes structured output to the next. Type-safe handoffs prevent data loss between stages.

### LangGraph Implementation

```python
from langgraph.graph import StateGraph, END
import operator
from typing import TypedDict, Annotated
from langchain_anthropic import ChatAnthropic

class PipelineState(TypedDict):
    topic: str
    research: str
    draft: str
    final: str
    stage_costs: Annotated[list[dict], operator.add]  # reducer appends each stage's cost entry

def research_stage(state: PipelineState) -> dict:
    model = ChatAnthropic(model="claude-sonnet-4-20250514", max_tokens=2048)
    result = model.invoke(
        f"Research the following topic thoroughly. Provide key facts, statistics, "
        f"and expert perspectives:\n\n{state['topic']}"
    )
    return {
        "research": result.content,
        "stage_costs": [{"stage": "research", "tokens": result.usage_metadata["total_tokens"]}],
    }

def writing_stage(state: PipelineState) -> dict:
    model = ChatAnthropic(model="claude-sonnet-4-20250514", max_tokens=4096)
    result = model.invoke(
        f"Using this research, write a compelling 800-word blog post with a hook, "
        f"3 main sections, and a CTA:\n\n{state['research']}"
    )
    return {
        "draft": result.content,
        "stage_costs": [{"stage": "writing", "tokens": result.usage_metadata["total_tokens"]}],
    }

def editing_stage(state: PipelineState) -> dict:
    model = ChatAnthropic(model="claude-haiku-4-20250514", max_tokens=4096)
    result = model.invoke(
        f"Edit this draft for clarity, flow, and grammar. Return only the improved "
        f"version:\n\n{state['draft']}"
    )
    return {
        "final": result.content,
        "stage_costs": [{"stage": "editing", "tokens": result.usage_metadata["total_tokens"]}],
    }

# Build the graph
graph = StateGraph(PipelineState)
graph.add_node("research", research_stage)
graph.add_node("write", writing_stage)
graph.add_node("edit", editing_stage)
graph.add_edge("research", "write")
graph.add_edge("write", "edit")
graph.add_edge("edit", END)
graph.set_entry_point("research")

pipeline = graph.compile()

# Execute
result = pipeline.invoke({"topic": "The future of AI agents in enterprise software"})
print(f"Total cost: {sum(s['tokens'] for s in result['stage_costs'])} tokens")
```

## Pattern 2: Parallel Fan-Out / Fan-In

Independent tasks run concurrently. A merge function combines results.

```python
import asyncio
from dataclasses import dataclass

@dataclass
class FanOutTask:
    name: str
    system_prompt: str
    user_message: str
    model: str = "claude-sonnet-4-20250514"

@dataclass
class FanOutResult:
    task_name: str
    output: str
    tokens_used: int
    success: bool
    error: str | None = None

async def fan_out_fan_in(
    tasks: list[FanOutTask],
    merge_prompt: str,
    max_concurrent: int = 5,
    timeout_seconds: float = 60.0,
) -> dict:
    """Execute tasks in parallel with concurrency limit and timeout."""
    import anthropic

    client = anthropic.AsyncAnthropic()
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(task: FanOutTask) -> FanOutResult:
        async with semaphore:
            try:
                response = await asyncio.wait_for(
                    client.messages.create(
                        model=task.model,
                        max_tokens=2048,
                        system=task.system_prompt,
                        messages=[{"role": "user", "content": task.user_message}],
                    ),
                    timeout=timeout_seconds,
                )
                return FanOutResult(
                    task_name=task.name,
                    output=response.content[0].text,
                    tokens_used=response.usage.input_tokens + response.usage.output_tokens,
                    success=True,
                )
            except Exception as e:
                return FanOutResult(
                    task_name=task.name, output="", tokens_used=0,
                    success=False, error=str(e),
                )

    # FAN-OUT: run all tasks concurrently
    results = await asyncio.gather(*[run_one(t) for t in tasks])
    successful = [r for r in results if r.success]
    failed = [r for r in results if not r.success]

    if not successful:
        raise RuntimeError(f"All {len(tasks)} fan-out tasks failed: {[f.error for f in failed]}")

    # FAN-IN: merge results
    combined = "\n\n---\n\n".join(
        f"## {r.task_name}\n{r.output}" for r in successful
    )

    merge_response = await client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        system="Synthesize the following parallel analyses into a unified report.",
        messages=[{"role": "user", "content": f"{merge_prompt}\n\n{combined}"}],
    )

    return {
        "synthesis": merge_response.content[0].text,
        "individual_results": successful,
        "failures": failed,
        "total_tokens": sum(r.tokens_used for r in results) + merge_response.usage.input_tokens + merge_response.usage.output_tokens,
    }
```
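A useful variant is early termination: when any k of N parallel branches are enough (for example, the first two validators that agree, or the first search branch that finds an answer), cancel the rest and save their cost. A minimal sketch:

```python
import asyncio

async def fan_out_first_n(coros, n: int) -> list:
    """Run coroutines concurrently, return once n have finished,
    and cancel the rest (the "early termination" cost lever)."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    done: list = []
    try:
        for fut in asyncio.as_completed(tasks):
            done.append(await fut)
            if len(done) >= n:
                break
    finally:
        # Cancel stragglers and await them so no task is left dangling
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
    return done
```

This drops straight into the fan-out step above in place of the plain `asyncio.gather` when a quorum of results is sufficient.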

## Pattern 3: Hierarchical Delegation

An orchestrator agent dynamically decomposes work and delegates to specialists.

```python
import asyncio
import json
from dataclasses import dataclass
from typing import Literal

SPECIALISTS = {
    "researcher": "Find accurate information with sources. Be thorough and cite evidence.",
    "coder": "Write clean, tested code. Include error handling and type hints.",
    "writer": "Create clear, engaging content. Match the requested tone and format.",
    "analyst": "Analyze data and produce evidence-backed conclusions with visualizations.",
    "reviewer": "Review work product for quality, accuracy, and completeness.",
}

@dataclass
class SubTask:
    id: str
    agent: Literal["researcher", "coder", "writer", "analyst", "reviewer"]
    task: str
    depends_on: list[str]
    priority: int = 0  # higher = run first when deps are equal

class HierarchicalOrchestrator:
    def __init__(self, client):
        self.client = client

    async def plan(self, request: str) -> list[SubTask]:
        """Orchestrator creates an execution plan with dependencies."""
        response = await self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=2048,
            system=f"""You are a task orchestrator. Break down the request into subtasks.
Available specialists: {', '.join(SPECIALISTS.keys())}
Respond with JSON: {{"subtasks": [{{"id": "1", "agent": "researcher", "task": "...", "depends_on": []}}]}}
Rules:
- Minimize the number of subtasks (prefer fewer, more substantial tasks)
- Only add dependencies when output is genuinely needed
- Independent tasks should have empty depends_on for parallel execution""",
            messages=[{"role": "user", "content": request}],
        )
        import json
        plan = json.loads(response.content[0].text)
        return [SubTask(**st) for st in plan["subtasks"]]

    async def execute(self, request: str) -> str:
        """Plan, execute with dependency resolution, and synthesize."""
        subtasks = await self.plan(request)
        results = {}

        # Execute in dependency order, parallelizing within each batch.
        # _run_specialist (not shown) invokes the specialist's system prompt
        # with the outputs of its resolved dependencies from `results`.
        for batch in self._batch_by_dependencies(subtasks):
            batch_results = await asyncio.gather(*[
                self._run_specialist(st, results) for st in batch
            ])
            for st, result in zip(batch, batch_results):
                results[st.id] = result

        # Final synthesis
        all_outputs = "\n\n".join(f"### {k}\n{v}" for k, v in results.items())
        synthesis = await self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            system="Synthesize specialist outputs into a coherent final response.",
            messages=[{"role": "user", "content": f"Request: {request}\n\nOutputs:\n{all_outputs}"}],
        )
        return synthesis.content[0].text

    def _batch_by_dependencies(self, subtasks: list[SubTask]) -> list[list[SubTask]]:
        """Group subtasks into batches that can run in parallel."""
        completed = set()
        remaining = list(subtasks)
        batches = []
        while remaining:
            batch = [t for t in remaining if all(d in completed for d in t.depends_on)]
            if not batch:
                raise ValueError("Circular dependency detected in subtask plan")
            batches.append(sorted(batch, key=lambda t: -t.priority))
            completed.update(t.id for t in batch)
            remaining = [t for t in remaining if t.id not in completed]
        return batches
```

## Pattern 4: Event-Driven Reactor

Agents react to events from a message bus. Decoupled and scalable.

```python
from collections import defaultdict
from typing import Callable, Any

class AgentEventBus:
    """Simple event bus for agent-to-agent communication."""

    def __init__(self):
        self._handlers: dict[str, list[Callable]] = defaultdict(list)
        self._history: list[dict] = []

    def subscribe(self, event_type: str, handler: Callable):
        self._handlers[event_type].append(handler)

    async def publish(self, event_type: str, payload: Any, source: str):
        event = {"type": event_type, "payload": payload, "source": source}
        self._history.append(event)
        handlers = self._handlers.get(event_type, [])
        results = await asyncio.gather(
            *[h(event) for h in handlers],
            return_exceptions=True,
        )
        errors = [(h, r) for h, r in zip(handlers, results) if isinstance(r, Exception)]
        if errors:
            for handler, error in errors:
                print(f"Handler {handler.__name__} failed: {error}")
        return results

# Usage: code review pipeline triggered by PR events
bus = AgentEventBus()

async def on_pr_opened(event):
    """Security agent scans PR for vulnerabilities."""
    diff = event["payload"]["diff"]
    findings = []  # ... run the security scan over `diff` and collect findings
    await bus.publish("security_scan_complete", {"findings": findings}, "security-agent")

async def on_security_complete(event):
    """Review agent incorporates security findings into review."""
    # ... generate review with security context

bus.subscribe("pr_opened", on_pr_opened)
bus.subscribe("security_scan_complete", on_security_complete)
```

## Pattern 5: Consensus Validation

Multiple agents independently evaluate the same input. A quorum determines the final output.

```python
import asyncio
from dataclasses import dataclass

@dataclass
class Vote:
    agent: str
    verdict: str  # "approve" | "reject" | "revise"
    confidence: float  # 0.0 - 1.0
    reasoning: str

async def consensus_validate(
    content: str,
    validators: list[dict],  # [{"name": "...", "system": "..."}]
    quorum: float = 0.66,
    confidence_threshold: float = 0.7,
) -> dict:
    """Run content through multiple validators and determine consensus."""
    # Collect independent votes (no agent sees another's vote).
    # get_agent_vote (not shown) prompts one validator and parses its
    # reply into a Vote, returning None on malformed output.
    vote_tasks = [get_agent_vote(v["name"], v["system"], content) for v in validators]
    raw_votes = await asyncio.gather(*vote_tasks)
    votes: list[Vote] = [v for v in raw_votes if v is not None]

    # Calculate consensus
    approvals = [v for v in votes if v.verdict == "approve"]
    approval_rate = len(approvals) / len(votes) if votes else 0
    avg_confidence = sum(v.confidence for v in votes) / len(votes) if votes else 0

    if approval_rate >= quorum and avg_confidence >= confidence_threshold:
        return {"decision": "approved", "approval_rate": approval_rate, "votes": votes}
    elif any(v.verdict == "reject" for v in votes):
        rejections = [v for v in votes if v.verdict == "reject"]
        return {"decision": "rejected", "reasons": [r.reasoning for r in rejections], "votes": votes}
    else:
        return {"decision": "needs_revision", "feedback": [v.reasoning for v in votes], "votes": votes}
```

## Agent Routing Strategies

### Intent-Based Router

```python
class IntentRouter:
    """Route requests to specialized agents based on intent classification."""

    def __init__(self, client):
        self.client = client

    ROUTING_TABLE = {
        "code_generation": {"agent": "coder", "model": "claude-sonnet-4-20250514"},
        "code_review": {"agent": "reviewer", "model": "claude-sonnet-4-20250514"},
        "research": {"agent": "researcher", "model": "claude-sonnet-4-20250514"},
        "simple_question": {"agent": "assistant", "model": "claude-haiku-4-20250514"},
        "creative_writing": {"agent": "writer", "model": "claude-sonnet-4-20250514"},
        "complex_analysis": {"agent": "analyst", "model": "claude-sonnet-4-20250514"},
    }

    async def route(self, message: str) -> dict:
        # Use a fast, cheap model for classification
        classification = await self.client.messages.create(
            model="claude-haiku-4-20250514",
            max_tokens=50,
            system="Classify the user intent. Respond with ONLY one of: code_generation, code_review, research, simple_question, creative_writing, complex_analysis",
            messages=[{"role": "user", "content": message}],
        )
        intent = classification.content[0].text.strip().lower()
        return self.ROUTING_TABLE.get(intent, self.ROUTING_TABLE["simple_question"])
```

## Context Window Budgeting

```python
MODEL_LIMITS = {
    "claude-sonnet-4-20250514": 200_000,
    "claude-haiku-4-20250514": 200_000,
    "claude-opus-4-20250514": 200_000,
    "gpt-4o": 128_000,
}

class ContextBudget:
    def __init__(self, model: str, pipeline_stages: int, reserve_pct: float = 0.15):
        self.total = MODEL_LIMITS.get(model, 128_000)
        self.reserve = int(self.total * reserve_pct)
        self.per_stage = (self.total - self.reserve) // pipeline_stages
        self.used = 0

    def allocate(self, stage: str) -> int:
        available = self.total - self.reserve - self.used
        allocation = min(self.per_stage, int(available * 0.6))
        return max(allocation, 1000)  # minimum 1000 tokens per stage

    def consume(self, tokens: int):
        self.used += tokens

    def summarize_if_needed(self, text: str, budget: int) -> str:
        estimated_tokens = len(text) // 4  # rough heuristic: ~4 characters per token
        if estimated_tokens <= budget:
            return text
        # Truncate to budget with marker
        char_limit = budget * 4
        return text[:char_limit] + "\n\n[Content truncated to fit context budget]"
```
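As a worked example of the allocation arithmetic above: a 5-stage pipeline on a 200k-token model with a 15% reserve splits what remains evenly across stages:

```python
total = 200_000                     # model context limit
reserve = int(total * 0.15)         # 30_000 tokens held back for synthesis and safety margin
per_stage = (total - reserve) // 5  # even split across the 5 stages

print(reserve, per_stage)  # 30000 34000
```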

## Cost Optimization Matrix

| Strategy | Cost Reduction | Quality Impact | When to Use |
|----------|---------------|----------------|-------------|
| Haiku for routing/classification | 85-90% | Minimal | Always for intent routing |
| Haiku for editing/formatting | 60-70% | Low | Mechanical tasks |
| Sonnet for most stages | Baseline | Baseline | Default choice |
| Opus only for final synthesis | +50% on that stage | Higher quality | High-stakes output |
| Prompt caching (system prompts) | 50-90% per call | None | Repeated system prompts |
| Truncate intermediate outputs | 20-40% | May lose detail | Long pipelines |
| Parallel + early termination | 30-50% | None if threshold met | Search/validation tasks |
| Batch similar requests | Up to 50% | Increased latency | Non-real-time workloads |

## Reliability Patterns

### Circuit Breaker

```python
import time

class CircuitBreaker:
    """Prevent cascading failures when an agent/model is down."""

    def __init__(self, failure_threshold: int = 5, recovery_time: float = 60.0):
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time
        self.failures = 0
        self.state = "closed"  # closed = healthy, open = failing, half-open = testing
        self.last_failure_time = 0.0

    def can_execute(self) -> bool:
        if self.state == "closed":
            return True
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_time:
                self.state = "half-open"
                return True
            return False
        return True  # half-open: allow one test request

    def record_success(self):
        self.failures = 0
        self.state = "closed"

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "open"
```
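The circuit breaker pairs naturally with the retry-and-model-fallback item from the Reliability Engineering list. A minimal sketch, where `call(model)` is an assumed caller-supplied coroutine performing one agent invocation:

```python
import asyncio
import random

async def call_with_fallback(call, models: list[str],
                             max_retries: int = 3, base_delay: float = 1.0):
    """Try each model in order; retry transient failures with jittered
    exponential backoff before falling back to the next model."""
    last_error: Exception | None = None
    for model in models:
        for attempt in range(max_retries):
            try:
                return await call(model)
            except Exception as e:
                last_error = e
                # 1s, 2s, 4s ... with +/-25% jitter to avoid thundering herds
                delay = base_delay * (2 ** attempt) * random.uniform(0.75, 1.25)
                await asyncio.sleep(delay)
    raise RuntimeError(f"All models exhausted after retries: {last_error}")
```

In production, wrap each `call(model)` in the model's circuit breaker (`can_execute` before, `record_success`/`record_failure` after) so a model that keeps failing is skipped immediately instead of retried.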

## Common Pitfalls

- **Over-orchestration** — if a single prompt can handle it, adding agents adds cost and latency, not value
- **Circular dependencies** in subtask graphs causing infinite loops; always validate DAG structure before execution
- **Context bleed** — passing entire previous outputs to every stage; summarize or extract only what is needed
- **No timeout enforcement** — a stuck agent blocks the entire pipeline; set wall-clock timeouts at every boundary
- **Silent failures** — agent returns plausible but incorrect output; add validation stages for critical paths
- **Ignoring cost** — 10 parallel Opus calls is expensive; model selection is a cost decision, not just a quality one
- **Stateless retries** on stateful operations — ensure idempotency before enabling automatic retries
- **Single point of failure** in orchestrator — if the orchestrator agent fails, the entire workflow fails

## Best Practices

1. **Start with a single prompt** — only add agents when you prove one cannot handle the task
2. **Type your handoffs** — use dataclasses or TypedDicts for inter-agent data, not raw strings
3. **Budget context upfront** — calculate token allocations before running the pipeline
4. **Use cheap models for routing** — Haiku handles classification at a fraction of Sonnet's per-token cost
5. **Validate DAG structure** at build time, not runtime
6. **Log every agent call** with input hash, output hash, tokens, latency, and cost
7. **Set SLAs per stage** — if research takes >30s, timeout and use cached results
8. **Test with production-scale inputs** — a pipeline that works on 100 words may fail on 10,000
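Practice 6 can be enforced with a thin decorator. A sketch that records input/output hashes and latency to an in-memory list (swap in your real logging sink; token and cost fields would come from the API response):

```python
import functools
import hashlib
import time

def log_agent_call(stage: str, sink: list):
    """Record input hash, output hash, and latency for every call to the
    wrapped agent function."""
    def decorate(func):
        @functools.wraps(func)
        def wrapper(prompt: str, *args, **kwargs):
            start = time.monotonic()
            output = func(prompt, *args, **kwargs)
            sink.append({
                "stage": stage,
                "input_hash": hashlib.sha256(prompt.encode()).hexdigest()[:12],
                "output_hash": hashlib.sha256(str(output).encode()).hexdigest()[:12],
                "latency_s": round(time.monotonic() - start, 4),
            })
            return output
        return wrapper
    return decorate
```

Hashing rather than storing raw prompts keeps logs small and avoids persisting sensitive content while still letting you detect duplicate inputs and nondeterministic outputs.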

## Troubleshooting

| Problem | Cause | Solution |
|---------|-------|----------|
| Pipeline hangs indefinitely | Missing timeout enforcement on one or more agent stages | Add `asyncio.wait_for()` with explicit `timeout_seconds` at every agent boundary; use the Circuit Breaker pattern to fail fast |
| Circular dependency error at runtime | Subtask graph contains a cycle (e.g., task A depends on B which depends on A) | Validate DAG structure at build time with topological sort; the `_batch_by_dependencies` method catches this but validation should happen earlier |
| Context window exceeded mid-pipeline | Intermediate outputs grow beyond the model's token limit | Use the `ContextBudget` class to allocate tokens per stage; summarize or truncate outputs before passing to the next stage |
| Fan-out tasks return inconsistent formats | Each parallel agent interprets the output schema differently | Define a shared `TypedDict` or `dataclass` for all fan-out results; add a validation step before the merge function |
| Orchestrator plan creates too many subtasks | The planning prompt does not constrain subtask count, leading to over-decomposition | Add explicit constraints in the planner system prompt (e.g., "maximum 5 subtasks"); review and approve plans before execution in high-stakes workflows |
| Consensus never reaches quorum | Validators disagree consistently or confidence scores are too low | Lower the `quorum` threshold, add a tiebreaker agent, or revise validator prompts to align on evaluation criteria |
| Cost spikes on parallel workflows | Expensive models (Opus) used for all fan-out branches instead of routing by complexity | Apply cost-aware routing: use Haiku for classification and simple tasks, Sonnet for most work, Opus only for final synthesis or high-stakes decisions |

## Success Criteria

- Pipeline end-to-end latency stays within the defined SLA (e.g., under 60 seconds for a 5-stage workflow) with no stage exceeding its individual timeout
- Agent routing accuracy exceeds 90% when measured against a labeled test set of at least 100 representative requests
- Fan-out/fan-in workflows complete with fewer than 5% task failures across all parallel branches under normal operating conditions
- Total token cost per workflow run decreases by at least 40% after applying model tiering (Haiku for routing, Sonnet for core work, Opus for synthesis)
- Circuit breakers trigger correctly within 5 consecutive failures and recover automatically after the defined recovery window
- Context window utilization stays below 85% of model limits at every pipeline stage, with no truncation-related quality degradation
- All inter-agent handoffs pass schema validation with zero type errors across 100 consecutive workflow executions

## Scope & Limitations

**This skill covers:**
- Design and implementation of five core multi-agent orchestration patterns (sequential, parallel, hierarchical, event-driven, consensus)
- Agent routing strategies including intent-based, skill-based, and cost-aware routing
- Reliability engineering patterns: circuit breakers, retries, timeouts, and dead letter queues
- Context window budgeting, cost optimization, and framework-specific implementations (LangGraph, CrewAI, AutoGen)

**This skill does NOT cover:**
- Training or fine-tuning the underlying LLMs used by agents (see `engineering/ml-pipeline-architect` for ML training workflows)
- Infrastructure provisioning, container orchestration, or deployment pipelines (see `engineering/cloud-infrastructure-designer` for cloud architecture)
- Human-in-the-loop approval workflows or UI design for agent dashboards (see `product-team/ux-researcher` for user-facing workflow design)
- Long-term agent memory, vector database setup, or RAG pipeline construction (see `engineering/rag-pipeline-architect` for retrieval-augmented generation)

## Integration Points

| Skill | Integration | Data Flow |
|-------|-------------|-----------|
| `engineering/ml-pipeline-architect` | Agent workflows that include ML inference stages use ML Pipeline Architect for model serving and batch prediction design | Workflow DAG exports stage specs to ML pipeline; ML pipeline returns inference endpoints for agent consumption |
| `engineering/rag-pipeline-architect` | Research and retrieval agents within workflows rely on RAG pipelines for grounded knowledge access | Agent sends queries to RAG pipeline; RAG returns ranked document chunks with citations for agent context |
| `engineering/cloud-infrastructure-designer` | Production deployment of agent workflows requires infrastructure design for scaling, queuing, and monitoring | Workflow resource requirements feed into infrastructure specs; infra returns endpoint URLs, queue ARNs, and scaling policies |
| `engineering/api-design-architect` | Inter-agent communication contracts and external API boundaries follow API design standards | Agent handoff schemas are validated against API design specs; API architect provides OpenAPI definitions for external integrations |
| `engineering/system-design-architect` | Overall system architecture decisions (sync vs async, monolith vs distributed) shape workflow topology choices | System design constraints (latency budgets, availability targets) inform pattern selection; workflow requirements feed back into system capacity planning |
| `project-management/technical-project-planning` | Complex multi-agent projects require structured planning for phased rollout, risk management, and milestone tracking | Workflow complexity estimates feed into project plans; PM skill provides sprint boundaries and dependency timelines for staged deployment |
