---
name: agent-orchestration
description: Provides best practices for AI agent orchestration including MCP servers, A2A protocol, multi-agent coordination, and swarm architectures. Use when designing agent systems, configuring MCP servers, setting up agent teams, or when user mentions 'MCP', 'A2A', 'agent orchestration', 'multi-agent', 'swarm', 'agent team', 'LangGraph', 'CrewAI', 'AutoGen'.
type: skill
category: orchestration
status: stable
origin: tibsfox
modified: false
first_seen: 2026-02-07
first_path: examples/agent-orchestration/SKILL.md
superseded_by: null
---
# Agent Orchestration

Best practices for designing, deploying, and coordinating AI agent systems using MCP servers, A2A protocol, and multi-agent patterns.

## Agent Orchestration Patterns

Orchestration determines how agents are coordinated, who makes decisions, and how work flows between them.

| Pattern | Description | Best For | Drawback |
|---------|------------|----------|----------|
| **Centralized** | Single orchestrator dispatches tasks to worker agents | Predictable workflows, clear task boundaries | Orchestrator is a bottleneck and single point of failure |
| **Hierarchical** | Manager agents delegate to specialist sub-agents | Complex multi-domain tasks | Deep hierarchies add latency and lose context |
| **Peer-to-peer** | Agents communicate directly, no central coordinator | Collaborative reasoning, brainstorming | Hard to debug, potential infinite loops |
| **Pipeline** | Agents process sequentially, output feeds next agent | Data transformation, multi-stage analysis | Slow for parallelizable work, rigid ordering |
| **Blackboard** | Shared state space that agents read from and write to | Problems requiring incremental refinement | Contention on shared state, ordering issues |
| **Auction/Market** | Agents bid on tasks based on capability and capacity | Dynamic workload distribution | Overhead of bidding, suboptimal for simple tasks |
| **Swarm** | Many lightweight agents with simple rules, emergent behavior | Exploration, search, large-scale parallel tasks | Unpredictable outcomes, hard to steer |

### Choosing the Right Pattern

```
Is the workflow predictable and linear?
  YES --> Pipeline or Centralized
  NO  --> Does it require specialized domain expertise?
            YES --> Hierarchical (domain managers + specialists)
            NO  --> Do agents need to collaborate on shared output?
                      YES --> Blackboard or Peer-to-peer
                      NO  --> Is the workload dynamic and variable?
                                YES --> Auction/Market
                                NO  --> Centralized (default safe choice)
```
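
The decision tree above can also be written as a small function, which is handy when pattern selection is part of a design review or scaffolding script. This is a minimal sketch: the `WorkloadTraits` fields and the `recommend_patterns` name are hypothetical and simply mirror the four questions in the tree.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkloadTraits:
    """Answers to the four questions in the decision tree (hypothetical names)."""
    predictable_and_linear: bool
    needs_domain_expertise: bool
    collaborates_on_shared_output: bool
    dynamic_workload: bool


def recommend_patterns(traits: WorkloadTraits) -> list[str]:
    """Walk the tree top to bottom and return the candidate patterns."""
    if traits.predictable_and_linear:
        return ["Pipeline", "Centralized"]
    if traits.needs_domain_expertise:
        return ["Hierarchical"]
    if traits.collaborates_on_shared_output:
        return ["Blackboard", "Peer-to-peer"]
    if traits.dynamic_workload:
        return ["Auction/Market"]
    return ["Centralized"]  # default safe choice


# Example: agents refine a shared incident report, no fixed order.
incident_triage = WorkloadTraits(
    predictable_and_linear=False,
    needs_domain_expertise=False,
    collaborates_on_shared_output=True,
    dynamic_workload=False,
)
print(recommend_patterns(incident_triage))  # ['Blackboard', 'Peer-to-peer']
```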

## MCP (Model Context Protocol) for DevOps

MCP provides a standardized way for AI agents to interact with external tools, services, and data sources. Each MCP server exposes capabilities that agents can discover and invoke.

### MCP Architecture

```
Agent (Claude, GPT, etc.)
  |
  +--> MCP Client (built into agent runtime)
         |
         +--> MCP Server: GitHub     (repos, PRs, issues)
         +--> MCP Server: Kubernetes (pods, deployments, services)
         +--> MCP Server: Database   (queries, schema inspection)
         +--> MCP Server: Monitoring (metrics, alerts, dashboards)
         +--> MCP Server: Cloud      (AWS/GCP/Azure resources)
```

### MCP Server Configuration for DevOps Tools

```json
{
  "mcpServers": {
    "github": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-github"],
      "env": {
        "GITHUB_PERSONAL_ACCESS_TOKEN": "${GITHUB_TOKEN}"
      }
    },
    "kubernetes": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-kubernetes"],
      "env": {
        "KUBECONFIG": "${HOME}/.kube/config"
      }
    },
    "postgres": {
      "command": "npx",
      "args": [
        "-y",
        "@modelcontextprotocol/server-postgres",
        "postgresql://readonly:${DB_PASSWORD}@db.internal:5432/production"
      ]
    },
    "filesystem": {
      "command": "npx",
      "args": [
        "-y",
        "@modelcontextprotocol/server-filesystem",
        "/opt/configs",
        "/var/log/apps"
      ]
    },
    "prometheus": {
      "command": "python",
      "args": ["-m", "mcp_prometheus"],
      "env": {
        "PROMETHEUS_URL": "http://prometheus.internal:9090"
      }
    }
  }
}
```

### MCP Server Security Rules

| Rule | Rationale |
|------|-----------|
| Use read-only credentials where possible | Agents should observe before acting; limit blast radius |
| Scope tokens to minimum required permissions | A GitHub token for reading PRs should not have admin access |
| Run MCP servers in isolated environments | Prevent lateral movement if an MCP server is compromised |
| Log all MCP tool invocations | Audit trail for agent actions, required for compliance |
| Set rate limits on MCP server endpoints | Prevent runaway agents from overwhelming external services |
| Validate agent inputs before execution | MCP servers must sanitize and validate all parameters |
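
Three of the rules above (validate inputs, rate-limit, log every invocation) can be combined in one wrapper around a tool handler. The sketch below is framework-agnostic and uses only the standard library; `GuardedTool`, the `mcp.audit` logger name, and the service-name pattern are assumptions for illustration, not part of any MCP SDK.

```python
import logging
import re
import time
from typing import Any, Callable

audit_log = logging.getLogger("mcp.audit")  # hypothetical logger name

SERVICE_NAME = re.compile(r"^[a-z0-9][a-z0-9-]{0,62}$")  # assumed naming rule
ENVIRONMENTS = {"staging", "production"}


class RateLimitExceeded(Exception):
    """Raised when a tool is called more often than its budget allows."""


def validate_params(params: dict[str, Any]) -> None:
    """Reject anything that is not a well-formed service/environment pair."""
    service = params.get("service")
    if not isinstance(service, str) or not SERVICE_NAME.match(service):
        raise ValueError(f"invalid service name: {service!r}")
    if params.get("environment") not in ENVIRONMENTS:
        raise ValueError(f"invalid environment: {params.get('environment')!r}")


class GuardedTool:
    """Wrap a handler with validation, a sliding-window rate limit, and audit logging."""

    def __init__(self, name: str, handler: Callable[..., Any],
                 max_calls: int, per_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.name = name
        self._handler = handler
        self._max_calls = max_calls
        self._per_seconds = per_seconds
        self._clock = clock
        self._recent: list[float] = []

    def __call__(self, agent_id: str, **params: Any) -> Any:
        validate_params(params)
        now = self._clock()
        # Keep only the timestamps that still fall inside the window.
        self._recent = [t for t in self._recent if now - t < self._per_seconds]
        if len(self._recent) >= self._max_calls:
            audit_log.warning("rate_limited tool=%s agent=%s", self.name, agent_id)
            raise RateLimitExceeded(self.name)
        self._recent.append(now)
        audit_log.info("invoke tool=%s agent=%s params=%s", self.name, agent_id, params)
        return self._handler(**params)
```

Validation runs before the rate-limit check, so malformed calls are rejected without consuming budget; whether rejected calls should count toward the limit is a policy choice.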

### Custom MCP Server Example

```typescript
// mcp-server-deploy.ts -- Custom MCP server for deployment operations
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { CallToolRequestSchema, ListToolsRequestSchema } from "@modelcontextprotocol/sdk/types.js";

const server = new Server(
  { name: "deploy-server", version: "1.0.0" },
  { capabilities: { tools: {} } }
);

server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [{
    name: "get_deployment_status",
    description: "Get current deployment status for a service",
    inputSchema: {
      type: "object",
      properties: {
        service: { type: "string" },
        environment: { type: "string", enum: ["staging", "production"] },
      },
      required: ["service", "environment"],
    },
  }],
}));

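// Placeholder: replace with a call to your deployment system's API.
async function queryDeploymentSystem(service: string, environment: string) {
  return { service, environment, status: "unknown" };
}

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;
  if (name === "get_deployment_status") {
    // Validate parameters before use; `arguments` is optional and untyped.
    const service = args?.service;
    const environment = args?.environment;
    if (typeof service !== "string" || (environment !== "staging" && environment !== "production")) {
      throw new Error("Invalid arguments for get_deployment_status");
    }
    const status = await queryDeploymentSystem(service, environment);
    return { content: [{ type: "text", text: JSON.stringify(status, null, 2) }] };
  }
  throw new Error(`Unknown tool: ${name}`);
});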

await server.connect(new StdioServerTransport());
```

## A2A (Agent-to-Agent) Protocol

A2A is an open protocol for agent interoperability, originally developed by Google and now hosted by the Linux Foundation. It enables agents built on different frameworks to discover each other, negotiate capabilities, and exchange tasks.

### A2A Core Concepts

| Concept | Description |
|---------|-------------|
| **Agent Card** | JSON metadata describing an agent's capabilities, endpoint, and auth |
| **Task** | A unit of work sent from one agent to another |
| **Message** | Communication within a task (text, files, structured data) |
| **Artifact** | Output produced by an agent (files, data, results) |
| **Push Notification** | Webhook callback the agent sends to a client-supplied URL when a long-running task changes state |

### A2A Agent Card

```json
{
  "name": "DevOps Deployment Agent",
  "description": "Handles deployments, rollbacks, and release management",
  "url": "https://agents.internal/deploy",
  "version": "1.0.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": true
  },
  "authentication": {
    "schemes": ["bearer"],
    "credentials": "oauth2_token"
  },
  "defaultInputModes": ["text/plain", "application/json"],
  "defaultOutputModes": ["text/plain", "application/json"],
  "skills": [
    {
      "id": "deploy-service",
      "name": "Deploy Service",
      "description": "Deploy a service to staging or production",
      "tags": ["deployment", "release"],
      "examples": [
        "Deploy payment-api v2.3.1 to staging",
        "Roll back auth-service in production to previous version"
      ]
    },
    {
      "id": "deployment-status",
      "name": "Check Deployment Status",
      "description": "Get current deployment status and history",
      "tags": ["monitoring", "status"]
    }
  ]
}
```
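
Once Agent Cards are collected, discovery reduces to matching skills against what a caller needs. The sketch below filters cards by skill tag; `find_agents_by_tag` is a hypothetical helper, and the card is a trimmed copy of the example above rather than a full schema.

```python
from typing import Any


def find_agents_by_tag(cards: list[dict[str, Any]], tag: str) -> list[tuple[str, str]]:
    """Return (agent name, skill id) pairs whose skill tags include `tag`."""
    matches: list[tuple[str, str]] = []
    for card in cards:
        for skill in card.get("skills", []):
            if tag in skill.get("tags", []):
                matches.append((card["name"], skill["id"]))
    return matches


# Trimmed version of the Agent Card shown above.
deploy_card = {
    "name": "DevOps Deployment Agent",
    "url": "https://agents.internal/deploy",
    "skills": [
        {"id": "deploy-service", "tags": ["deployment", "release"]},
        {"id": "deployment-status", "tags": ["monitoring", "status"]},
    ],
}

print(find_agents_by_tag([deploy_card], "deployment"))
# [('DevOps Deployment Agent', 'deploy-service')]
```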

### A2A Task Message Exchange

```json
{
  "jsonrpc": "2.0",
  "method": "tasks/send",
  "id": "req-001",
  "params": {
    "id": "task-deploy-2025-001",
    "message": {
      "role": "user",
      "parts": [
        {
          "type": "text",
          "text": "Deploy payment-api v2.3.1 to staging environment"
        },
        {
          "type": "data",
          "mimeType": "application/json",
          "data": {
            "service": "payment-api",
            "version": "v2.3.1",
            "environment": "staging",
            "strategy": "canary",
            "canary_percentage": 10,
            "rollback_on_error": true
          }
        }
      ]
    }
  }
}
```

### A2A Task Response

```json
{
  "jsonrpc": "2.0",
  "id": "req-001",
  "result": {
    "id": "task-deploy-2025-001",
    "status": {
      "state": "completed",
      "message": {
        "role": "agent",
        "parts": [{ "type": "text", "text": "Deployed payment-api v2.3.1 to staging, canary at 10%." }]
      }
    },
    "artifacts": [{
      "name": "deployment-report",
      "parts": [{
        "type": "data",
        "mimeType": "application/json",
        "data": {
          "deployment_id": "deploy-abc123",
          "status": "healthy",
          "canary_metrics": { "error_rate": 0.001, "p99_latency_ms": 245 }
        }
      }]
    }]
  }
}
```
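
A client typically needs two helpers around this exchange: one to build the request and one to pull structured data out of the response. The sketch below uses the field names from the two examples above (`tasks/send`, `type`, `artifacts`); these follow this document's examples, so check them against the A2A specification version you target. Both function names are hypothetical.

```python
from typing import Any


def build_task_request(request_id: str, task_id: str, text: str,
                       data: dict[str, Any]) -> dict[str, Any]:
    """Build a JSON-RPC request shaped like the tasks/send example above."""
    return {
        "jsonrpc": "2.0",
        "method": "tasks/send",
        "id": request_id,
        "params": {
            "id": task_id,
            "message": {
                "role": "user",
                "parts": [
                    {"type": "text", "text": text},
                    {"type": "data", "mimeType": "application/json", "data": data},
                ],
            },
        },
    }


def extract_data_artifacts(response: dict[str, Any]) -> dict[str, dict[str, Any]]:
    """Return {artifact name: merged data parts} for a completed task."""
    if "error" in response:
        raise RuntimeError(f"JSON-RPC error: {response['error']}")
    result = response["result"]
    state = result["status"]["state"]
    if state != "completed":
        raise RuntimeError(f"task {result['id']} is {state}, not completed")
    artifacts: dict[str, dict[str, Any]] = {}
    for artifact in result.get("artifacts", []):
        merged: dict[str, Any] = {}
        for part in artifact.get("parts", []):
            if part.get("type") == "data":  # skip text and file parts
                merged.update(part["data"])
        artifacts[artifact["name"]] = merged
    return artifacts
```

Raising on any state other than `completed` keeps callers from treating a partial or failed deployment as a success.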

## Agent Team Configuration

Agent teams assign distinct roles to specialized agents that collaborate on complex tasks.

### Claude Code Agent Team Configuration

```yaml
# agent-team.yaml -- DevOps agent team using Claude Code
# Illustrative schema for describing roles and workflows; adapt it to your runtime's own format.
team:
  name: devops-ops-team
  coordination: centralized

agents:
  - role: orchestrator
    model: claude-sonnet-4-20250514
    system_prompt: "Receive requests, delegate to specialists, synthesize results. Never act directly."
    tools: [dispatch_to_agent, check_agent_status, aggregate_results]

  - role: code-reviewer
    model: claude-sonnet-4-20250514
    system_prompt: "Review code for security, reliability, team standards. Actionable feedback with line refs."
    tools: [github_pr_read, github_pr_comment, run_static_analysis]

  - role: deployment-agent
    model: claude-sonnet-4-20250514
    system_prompt: "Handle deployments. Verify pre-conditions, canary for prod, confirm health checks."
    tools: [kubernetes_apply, deployment_status, rollback_deployment, run_smoke_tests]

  - role: incident-responder
    model: claude-sonnet-4-20250514
    system_prompt: "Gather metrics, correlate with changes, propose mitigations. No prod changes without approval."
    tools: [query_prometheus, query_logs, get_recent_deployments, create_incident_report]

workflows:
  deploy_request:
    - { agent: code-reviewer, action: review_changes, gate: approval_required }
    - { agent: deployment-agent, action: deploy_to_staging }
    - { agent: deployment-agent, action: run_smoke_tests, gate: tests_must_pass }
    - { agent: deployment-agent, action: deploy_to_production }
    - { agent: orchestrator, action: notify_team }
```

## Swarm Architecture Comparison

Swarm architectures use multiple lightweight agents that coordinate through simple rules or shared state. The frameworks below differ mainly in how they coordinate agents and where they keep state.

| Framework | Architecture | Coordination | State Management | Best For |
|-----------|-------------|-------------|-----------------|----------|
| **LangGraph** | Graph-based state machine (cycles allowed) | Explicit edges between nodes | Shared state object passed through graph | Complex workflows with conditional branching |
| **CrewAI** | Role-based crew | Sequential or parallel task execution | Shared memory + per-agent memory | Task-oriented teams with clear role separation |
| **AutoGen** | Conversational | Agent-to-agent messaging | Conversation history as shared context | Multi-turn collaborative reasoning |
| **OpenAI Agents SDK** | Handoff-based | Agent-to-agent handoffs with context transfer | Thread-level state with tool results | Production agent systems with tool use |
| **Claude Code** | Orchestrator + sub-agents | Parent spawns child agents via Task tool | File system + context passing | Developer tooling and code generation |

### LangGraph: Conditional Workflow

```python
# langgraph_deploy_workflow.py
from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal

class DeployState(TypedDict):
    service: str
    version: str
    review_result: str       # "approved" | "rejected"
    staging_healthy: bool

# code_review_agent and deploy_agent are assumed to be defined elsewhere;
# each exposes invoke() and returns an object with the attributes used below.

def review_code(state: DeployState) -> dict:
    result = code_review_agent.invoke(f"Review {state['service']} {state['version']}")
    return {"review_result": result.approval_status}  # return only the updated key

def deploy_staging(state: DeployState) -> dict:
    result = deploy_agent.invoke(f"Deploy {state['service']} {state['version']} to staging")
    return {"staging_healthy": result.healthy}

def should_deploy(state: DeployState) -> Literal["deploy_staging", "end"]:
    return "deploy_staging" if state["review_result"] == "approved" else "end"

# Build: review --> (approved?) --> staging --> END; rejected reviews go straight to END
workflow = StateGraph(DeployState)
workflow.add_node("review", review_code)
workflow.add_node("deploy_staging", deploy_staging)
workflow.set_entry_point("review")
# Map the router's return values to nodes; "end" is not a node name, so it must map to END.
workflow.add_conditional_edges(
    "review", should_deploy, {"deploy_staging": "deploy_staging", "end": END}
)
workflow.add_edge("deploy_staging", END)
graph = workflow.compile()
```

### OpenAI Agents SDK: Handoff Pattern

```python
# openai_agents_deploy.py
from agents import Agent, Runner

# The tools referenced below (deploy_to_staging, query_metrics, ...) are assumed
# to be defined elsewhere as functions decorated with @function_tool.

# Define agents in reverse order so each handoff target already exists;
# handoffs takes Agent objects, not names.
monitor = Agent(
    name="Monitor",
    instructions="""Monitor the deployment for 15 minutes. Check error
    rates, latency, and resource usage. Report any anomalies.""",
    tools=[query_metrics, check_error_rate, check_latency],
)

deployer = Agent(
    name="Deployer",
    instructions="""Deploy the approved changes. Use canary strategy
    for production. Hand off to Monitor after deployment.""",
    handoffs=[monitor],
    tools=[deploy_to_staging, deploy_to_production, run_smoke_tests],
)

code_reviewer = Agent(
    name="Code Reviewer",
    instructions="""Review code changes for security and reliability.
    If approved, hand off to Deployer. If rejected, explain why.""",
    handoffs=[deployer],
)

# Run the pipeline. Runner.run is a coroutine; run_sync is the blocking variant.
result = Runner.run_sync(
    code_reviewer,
    input="Deploy payment-api v2.3.1 -- changes include rate limiting middleware",
)
print(result.final_output)

## Agent Communication Patterns

### Message Types

| Message Type | Purpose | Example |
|-------------|---------|---------|
| **Task Request** | Ask an agent to perform work | "Deploy service X to staging" |
| **Status Update** | Report progress on ongoing work | "Deployment at 50%, canary healthy" |
| **Result** | Deliver completed work output | "Deployment complete, all health checks pass" |
| **Query** | Ask for information without action | "What is the current error rate for service X?" |
| **Escalation** | Report a problem requiring higher authority | "Canary error rate exceeds 5%, requesting rollback approval" |
| **Handoff** | Transfer responsibility to another agent | "Code review complete, handing off to deployment agent" |
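
Giving every message the same envelope makes the six types above easy to route, log, and correlate. The sketch below is one possible shape; `MessageType`, `AgentMessage`, and its field names are assumptions for illustration and do not come from A2A or any framework.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum


class MessageType(str, Enum):
    TASK_REQUEST = "task_request"
    STATUS_UPDATE = "status_update"
    RESULT = "result"
    QUERY = "query"
    ESCALATION = "escalation"
    HANDOFF = "handoff"


@dataclass
class AgentMessage:
    message_type: MessageType
    sender: str
    recipient: str
    body: str
    # Shared by every message in one exchange so logs can be stitched together.
    correlation_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    sent_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

    def reply(self, message_type: MessageType, body: str) -> "AgentMessage":
        """Create a response that swaps sender/recipient and keeps the correlation id."""
        return AgentMessage(
            message_type=message_type,
            sender=self.recipient,
            recipient=self.sender,
            body=body,
            correlation_id=self.correlation_id,
        )

    def to_json(self) -> str:
        payload = asdict(self)
        payload["message_type"] = self.message_type.value  # plain string on the wire
        return json.dumps(payload)


request = AgentMessage(
    MessageType.TASK_REQUEST, "orchestrator", "deployment-agent",
    "Deploy service X to staging",
)
update = request.reply(MessageType.STATUS_UPDATE, "Deployment at 50%, canary healthy")
```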

### Communication Topology

```
Centralized (Star):          Peer-to-peer (Mesh):
                              A --- B
    B   C                     |\ /|
     \ /                      | X |
      A  (orchestrator)       |/ \|
     / \                      C --- D
    D   E

Pipeline (Chain):            Hierarchical (Tree):
A --> B --> C --> D                A
                                /   \
                               B     C
                              / \     \
                             D   E     F
```

### Shared State Protocol

```python
# agent_state.py -- Thread-safe shared state (Blackboard pattern)
import threading
from datetime import datetime, timezone
from typing import Any

class SharedAgentState:
    """Shared state space for multi-agent coordination."""

    def __init__(self):
        self._state: dict[str, Any] = {}
        self._lock = threading.RLock()

    def write(self, key: str, value: Any, agent_id: str) -> None:
        with self._lock:
            self._state[key] = {
                "value": value, "updated_by": agent_id,
                "updated_at": datetime.now(timezone.utc).isoformat(),
            }

    def read(self, key: str) -> Any | None:
        with self._lock:
            entry = self._state.get(key)
            return entry["value"] if entry else None
```
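
A lock protects each individual read and write, but it does not stop two agents from reading the same value and then overwriting each other's update. Versioned writes (optimistic concurrency) close that gap: a write succeeds only if the key is still at the version the writer read. A minimal sketch, with `VersionedState` and `VersionConflict` as hypothetical names:

```python
import threading
from typing import Any


class VersionConflict(Exception):
    """Raised when a key changed between the caller's read and write."""


class VersionedState:
    """Blackboard variant with compare-and-set writes."""

    def __init__(self) -> None:
        self._entries: dict[str, tuple[int, Any]] = {}
        self._lock = threading.Lock()

    def read(self, key: str) -> tuple[int, Any]:
        """Return (version, value); a missing key reads as version 0."""
        with self._lock:
            return self._entries.get(key, (0, None))

    def write(self, key: str, value: Any, expected_version: int) -> int:
        """Store value only if the key is still at expected_version; return the new version."""
        with self._lock:
            current, _ = self._entries.get(key, (0, None))
            if current != expected_version:
                raise VersionConflict(
                    f"{key}: expected v{expected_version}, found v{current}"
                )
            self._entries[key] = (current + 1, value)
            return current + 1
```

On `VersionConflict`, the usual response is to re-read, re-apply the change to the fresh value, and retry a bounded number of times.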

## State Management Across Agents

### State Strategies by Pattern

| Strategy | Mechanism | Consistency | Scalability |
|----------|----------|-------------|-------------|
| **Pass-through** | State object passed as function argument | Strong (single owner) | Low (deep copying overhead) |
| **Shared memory** | In-process shared dict with locking | Strong (with locks) | Low (single process) |
| **Message queue** | Redis Streams, Kafka, RabbitMQ | Eventual | High |
| **Database** | PostgreSQL, DynamoDB | Strong or eventual (configurable) | High |
| **File system** | JSON/YAML files in shared volume | Weak (race conditions) | Low |
| **Event sourcing** | Append-only log of state changes | Strong (replayable) | High |

### State Persistence for Long-Running Agents

```yaml
# agent-state-config.yaml
state_management:
  backend: redis
  connection: "redis://state.internal:6379/0"
  key_prefix: "agent-state:"
  persistence:
    snapshot_interval: 60s
    snapshot_backend: s3
  isolation:
    strategy: namespace           # {team}:{workflow}:{run_id}
  recovery:
    on_agent_crash: restore_from_snapshot
    on_state_corruption: replay_from_event_log
```

## Multi-Agent Coordination Example

End-to-end example: an incident response pipeline with four coordinating agents using parallel data gathering and sequential analysis.

```python
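# incident_response_team.py
# The *_agent coroutines called below are assumed to be defined elsewhere;
# each takes the IncidentContext and returns its result.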
import asyncio
from dataclasses import dataclass

@dataclass
class IncidentContext:
    alert_id: str
    service: str
    severity: str
    metrics: dict | None = None
    recent_deploys: list | None = None
    root_cause: str | None = None
    mitigation: str | None = None

async def run_incident_response(alert_id: str, service: str, severity: str):
    ctx = IncidentContext(alert_id=alert_id, service=service, severity=severity)

    # Phase 1: Parallel data gathering (metrics + deploy history agents)
    ctx.metrics, ctx.recent_deploys = await asyncio.gather(
        gather_metrics_agent(ctx),
        gather_deploys_agent(ctx),
    )

    # Phase 2: Sequential analysis (needs data from phase 1)
    ctx.root_cause = await analyze_root_cause_agent(ctx)

    # Phase 3: Mitigation (needs root cause from phase 2)
    ctx.mitigation = await execute_mitigation_agent(ctx)

    # Phase 4 (not shown): a documentation agent can generate the postmortem from the returned context
    return ctx
```
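
As written, Phase 1 fails as a whole if either gathering agent raises, and waits indefinitely if one hangs, because `asyncio.gather` propagates the first exception and has no timeout of its own. One way to degrade gracefully is to give each agent call a timeout and substitute `None` on failure, which the dataclass fields already allow. A minimal sketch; `gather_with_fallback` is a hypothetical helper.

```python
import asyncio
from typing import Any, Awaitable


async def gather_with_fallback(
    calls: dict[str, Awaitable[Any]], timeout_s: float
) -> dict[str, Any]:
    """Await all calls concurrently; a call that fails or times out yields None."""

    async def guarded(call: Awaitable[Any]) -> Any:
        try:
            return await asyncio.wait_for(call, timeout=timeout_s)
        except Exception:  # includes timeouts; cancellation still propagates
            return None

    results = await asyncio.gather(*(guarded(call) for call in calls.values()))
    return dict(zip(calls, results))


# Stand-ins for real agents: one answers quickly, one hangs.
async def fast_metrics() -> dict[str, float]:
    return {"error_rate": 0.07}


async def slow_deploy_history() -> list[str]:
    await asyncio.sleep(1)
    return ["deploy-abc123"]


async def demo() -> dict[str, Any]:
    return await gather_with_fallback(
        {"metrics": fast_metrics(), "recent_deploys": slow_deploy_history()},
        timeout_s=0.05,
    )


print(asyncio.run(demo()))  # {'metrics': {'error_rate': 0.07}, 'recent_deploys': None}
```

Downstream agents then need to handle `None` explicitly, for example by stating in the root-cause analysis that deployment history was unavailable.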

## Anti-Patterns

| Anti-Pattern | Problem | Fix |
|-------------|---------|-----|
| Giving agents unrestricted production access | Single hallucinated command can cause outage | Use read-only access by default; require approval gates for writes |
| No audit trail for agent actions | Cannot determine what an agent did or why | Log all tool invocations, decisions, and state changes |
| Agents calling agents in unbounded loops | Infinite recursion, cost explosion, no convergence | Set max iteration limits, timeout budgets, and cycle detection |
| Single mega-agent instead of specialized team | Context window overflow, poor at every task | Split into focused agents with clear responsibilities |
| Shared state without concurrency control | Race conditions, lost updates, inconsistent state | Use locking, versioned writes, or event sourcing |
| No fallback when an agent fails | Entire pipeline stops on one agent error | Implement retries, circuit breakers, and graceful degradation |
| Hardcoding agent dependencies | Cannot swap implementations or scale independently | Use discovery (A2A Agent Cards) or dependency injection |
| Trusting agent output without validation | Hallucinated data propagates through the pipeline | Validate outputs against schemas; add human checkpoints for critical actions |
| Running all agents on the most expensive model | Unnecessary cost for simple tasks | Match model capability to task complexity (small model for routing, large for analysis) |
| No resource budgets per agent | One runaway agent consumes all API quota or compute | Set per-agent token limits, rate limits, and cost ceilings |
| Synchronous-only communication | Pipeline blocked waiting for slow agents | Use async messaging with status callbacks for long-running tasks |
| Ignoring agent context window limits | Agents receive truncated context and make poor decisions | Summarize and filter context before passing between agents |
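
The fix for unbounded agent-to-agent loops (iteration limits plus cycle detection) can be enforced in one place, before each delegation. The sketch below tracks the delegation chain and a shared call budget; `DelegationGuard` and its limits are hypothetical and would sit in whatever component dispatches calls between agents.

```python
from dataclasses import dataclass


class DelegationError(Exception):
    """Raised when a delegation would create a cycle or exceed a budget."""


@dataclass
class DelegationGuard:
    max_calls: int   # total delegations allowed for the whole run
    max_depth: int   # longest allowed chain of nested delegations
    calls: int = 0

    def check(self, chain: tuple[str, ...], callee: str) -> tuple[str, ...]:
        """Validate one delegation and return the extended chain."""
        if callee in chain:
            path = " -> ".join(chain + (callee,))
            raise DelegationError(f"cycle detected: {path}")
        if len(chain) >= self.max_depth:
            raise DelegationError(f"max depth {self.max_depth} reached")
        if self.calls >= self.max_calls:
            raise DelegationError(f"call budget {self.max_calls} exhausted")
        self.calls += 1
        return chain + (callee,)


guard = DelegationGuard(max_calls=10, max_depth=3)
chain = guard.check(("orchestrator",), "code-reviewer")
chain = guard.check(chain, "deployment-agent")
# guard.check(chain, "orchestrator") would raise: cycle detected
```

Passing the chain along with each request, rather than storing it globally, lets parallel branches of the same run be checked independently while still sharing one call budget.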

## Agent Orchestration Readiness Checklist

### Infrastructure

- [ ] MCP servers deployed for required external tools (GitHub, K8s, monitoring)
- [ ] MCP server credentials scoped to minimum required permissions
- [ ] Agent communication channel established (A2A, message queue, or direct)
- [ ] State management backend selected and configured (Redis, DB, or file)
- [ ] Logging and audit trail capturing all agent actions
- [ ] Rate limiting configured per agent and per MCP server

### Agent Design

- [ ] Each agent has a single, well-defined responsibility
- [ ] Agent system prompts include boundaries (what NOT to do)
- [ ] Model selection matches task complexity (not all tasks need the largest model)
- [ ] Input/output schemas defined for agent communication
- [ ] Error handling and retry logic implemented per agent
- [ ] Maximum iteration and token budgets set per agent

### Coordination

- [ ] Orchestration pattern selected and documented (centralized, hierarchical, etc.)
- [ ] Task routing logic tested with representative workloads
- [ ] Handoff protocols defined between agent pairs
- [ ] Shared state access patterns documented with concurrency controls
- [ ] Timeout and circuit breaker thresholds configured
- [ ] Escalation paths defined (agent to agent, agent to human)

### Safety and Governance

- [ ] Human-in-the-loop gates for destructive actions (deploy, delete, rollback)
- [ ] Agent outputs validated against schemas before downstream consumption
- [ ] Cost monitoring and alerting configured per agent team
- [ ] Kill switch available to halt all agent activity immediately
- [ ] Regular review of agent decision logs for quality and drift
- [ ] Incident response plan covers agent-caused failures
