"Architecture is what keeps the agent running at 3 AM when nobody is watching."
Section 22.6: Deploying a Production-Hardened AI Agent
A production agent is far more than a model in a loop. The agent loop from Section 22.1 described the conceptual cycle of perceive, reason, and act. Deploying that loop in production requires surrounding it with infrastructure: a planner that decomposes tasks, a tool router that dispatches actions, a memory manager that persists context, an execution sandbox that isolates side effects, an evaluator that checks results, a recovery handler that manages failures, a permissions gate that enforces access control, and a cost controller that keeps spending within budget. This section presents a reference architecture that wires all of these components together, giving you a blueprint you can adapt to any agent deployment.
Prerequisites
This section assumes familiarity with the agent loop and cognitive architectures from Section 22.1, agent memory systems from Section 22.2, and planning strategies from Section 22.3. Practical experience with Python async programming and basic familiarity with web service patterns (rate limiting, circuit breakers) will help you follow the production-oriented examples.
1. The Eight Components of a Production Agent
Building a reliable agent system requires eight distinct components, each responsible for a specific concern. Separating these concerns makes the system easier to test, debug, and extend. When any component fails, the others can continue operating in a degraded mode rather than bringing down the entire agent.
The components are: (1) the Planner, which decomposes user requests into executable steps; (2) the Tool Router, which maps planned actions to available tools; (3) the Memory Manager, which reads and writes persistent state; (4) the Execution Sandbox, which runs tool calls in isolation; (5) the Evaluator, which validates results against expectations; (6) the Recovery Handler, which manages retries, rollbacks, and graceful degradation; (7) the Permissions Gate, which enforces access control at every boundary; and (8) the Cost Controller, which tracks and limits token and API spending.
The most common production failure mode is not a model hallucination; it is an unhandled edge case in the infrastructure surrounding the model. A tool times out and nobody catches the exception. A context window overflows because memory was not pruned. A user escalates to an expensive model and nobody notices the cost spike. The eight-component architecture exists to handle these failure modes systematically rather than through ad-hoc patches.
2. Reference Architecture: Request Flow
Every request to the agent system follows a predictable path through the eight components. Understanding this flow is essential for debugging production issues, because the failure mode tells you exactly which component to investigate. The sequence is as follows.
First, the user's input arrives at the Permissions Gate, which checks whether the user is authenticated and authorized to invoke the agent. If the request passes, the Cost Controller checks whether the user's budget allows another request. The Memory Manager then loads relevant context: the user's profile, recent conversation history, and any semantic memories retrieved via embedding search. This assembled context flows into the Planner, which calls the LLM to produce a step-by-step plan. Each planned step is dispatched to the Tool Router, which selects the appropriate tool and sends the call to the Execution Sandbox. After execution, the Evaluator checks whether the result meets the expected criteria. If the evaluation fails, the Recovery Handler decides whether to retry, try a different approach, or abort with a user-facing message. Once all steps complete, the Memory Manager persists any new information and the final response is returned to the user.
The following diagram describes the reference architecture.
3. Data Structures and the Planner
Each component communicates through typed messages, making the interfaces explicit and testable. The Planner component below defines these data structures and then uses them to decompose a user request into a sequence of tool calls via the OpenAI API. The plan-and-execute pattern from Section 22.3 provides the theoretical foundation; here we implement it concretely.
# Code Fragment 22.6.1: Data structures and Planner for the agent system
import json
from dataclasses import dataclass, field
from enum import Enum
from typing import Any

from openai import AsyncOpenAI


class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class AgentRequest:
    user_id: str
    session_id: str
    message: str
    permissions: list[str] = field(default_factory=list)
    max_budget_usd: float = 1.0


@dataclass
class PlanStep:
    step_id: int
    description: str
    tool_name: str
    tool_args: dict[str, Any] = field(default_factory=dict)
    status: StepStatus = StepStatus.PENDING
    result: Any = None
    cost_usd: float = 0.0


@dataclass
class AgentPlan:
    steps: list[PlanStep]
    reasoning: str
    estimated_cost_usd: float = 0.0


@dataclass
class AgentResponse:
    message: str
    plan: AgentPlan
    total_cost_usd: float = 0.0
    steps_completed: int = 0
    steps_failed: int = 0


class Planner:
    """Decomposes a user request into an ordered sequence of tool calls."""

    def __init__(self, client: AsyncOpenAI, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model

    async def create_plan(
        self,
        request: AgentRequest,
        memory_context: str,
        available_tools: list[dict],
    ) -> AgentPlan:
        tool_descriptions = "\n".join(
            f"- {t['name']}: {t['description']} "
            f"(requires: {', '.join(t.get('permissions', ['none']))})"
            for t in available_tools
        )
        system_prompt = f"""You are a planning agent. Decompose the user's
request into a sequence of tool calls.

Available tools:
{tool_descriptions}

User permissions: {', '.join(request.permissions)}
Budget limit: ${request.max_budget_usd:.2f}

Respond with JSON: {{"reasoning": "...", "steps": [
  {{"description": "...", "tool_name": "...", "tool_args": {{...}}}}
]}}"""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": (
                    f"Context:\n{memory_context}\n\n"
                    f"Request: {request.message}"
                )},
            ],
            response_format={"type": "json_object"},
            temperature=0.0,
        )
        data = json.loads(response.choices[0].message.content)
        steps = [
            PlanStep(step_id=i, description=s["description"],
                     tool_name=s["tool_name"],
                     tool_args=s.get("tool_args", {}))
            for i, s in enumerate(data["steps"])
        ]
        return AgentPlan(steps=steps, reasoning=data["reasoning"])
4. Tool Router and Execution Sandbox
The Tool Router maps a tool name from the plan to the actual callable implementation, while the Execution Sandbox provides isolation for running that tool. Isolation is important for several reasons: a misbehaving tool should not crash the agent, tool execution time should be bounded, and side effects should be trackable for audit logging. In practice, the sandbox can range from a simple try/except with a timeout (shown here) to a full container-based sandbox for code execution agents.
# Code Fragment 22.6.2: Tool Router and Execution Sandbox
import asyncio
from typing import Any, Awaitable, Callable


class ToolRouter:
    """Maps tool names to implementations and enforces permissions."""

    def __init__(self):
        self._tools: dict[str, Callable[..., Awaitable[Any]]] = {}
        self._permissions: dict[str, list[str]] = {}

    def register(
        self, name: str, fn: Callable[..., Awaitable[Any]],
        required_permissions: list[str] | None = None,
    ):
        self._tools[name] = fn
        self._permissions[name] = required_permissions or []

    def check_permission(self, tool_name: str, user_perms: list[str]) -> bool:
        required = self._permissions.get(tool_name, [])
        return all(p in user_perms for p in required)

    def get_tool(self, name: str) -> Callable[..., Awaitable[Any]] | None:
        return self._tools.get(name)


class ExecutionSandbox:
    """Runs tool calls with timeout and error isolation."""

    def __init__(self, timeout_seconds: float = 30.0):
        self.timeout = timeout_seconds

    async def execute(
        self, tool_fn: Callable[..., Awaitable[Any]],
        tool_args: dict[str, Any],
    ) -> tuple[bool, Any]:
        """Returns (success, result_or_error)."""
        try:
            result = await asyncio.wait_for(
                tool_fn(**tool_args), timeout=self.timeout
            )
            return True, result
        except asyncio.TimeoutError:
            return False, f"Tool timed out after {self.timeout}s"
        except Exception as e:
            return False, f"Tool error: {type(e).__name__}: {e}"
Never run untrusted tool code in the same process as your agent without sandboxing. A tool that enters an infinite loop, consumes all available memory, or raises an unhandled exception can take down the entire agent. For code execution tools specifically, use container-based isolation (Docker, gVisor, or a cloud sandbox service). The asyncio.wait_for pattern shown above handles timeouts but does not protect against memory exhaustion or malicious system calls.
5. Production Concerns: Rate Limiting, Circuit Breakers, and Graceful Degradation
Production agent systems must handle the reality that external services fail. APIs go down, rate limits are hit, and network connections drop. Three patterns from distributed systems engineering are essential for building resilient agents.
Rate Limiting
Every external API the agent calls has rate limits. Exceeding them causes cascading failures as retries pile up. A token bucket rate limiter, applied per tool, prevents the agent from overwhelming any single service. The rate limiter should be shared across all agent instances if you are running multiple replicas.
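A per-tool token bucket can be sketched in a few lines. This is a minimal single-process sketch, not part of the reference fragments above; the `TokenBucketLimiter` name and its parameters are illustrative:

```python
import time


class TokenBucketLimiter:
    """Per-tool token bucket: refills at `rate` tokens/sec up to `capacity`."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self._tokens: dict[str, float] = {}
        self._last: dict[str, float] = {}

    def try_acquire(self, tool_name: str) -> bool:
        """Take one token for this tool; return False if the bucket is empty."""
        now = time.monotonic()
        tokens = self._tokens.get(tool_name, float(self.capacity))
        last = self._last.get(tool_name, now)
        # Refill based on elapsed time, capped at capacity
        tokens = min(self.capacity, tokens + (now - last) * self.rate)
        self._last[tool_name] = now
        if tokens >= 1.0:
            self._tokens[tool_name] = tokens - 1.0
            return True
        self._tokens[tool_name] = tokens
        return False
```

For multiple replicas, the same bucket logic would move behind a shared store such as Redis so that all instances draw from one budget.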
Circuit Breakers
A circuit breaker tracks consecutive failures for each tool. After a configurable threshold (for example, five consecutive failures), the breaker "opens" and immediately rejects further calls to that tool for a cooldown period. This prevents the agent from wasting time and budget on a tool that is clearly down. After the cooldown, the breaker enters a "half-open" state and allows one test call through. If the test succeeds, the breaker closes and normal operation resumes.
Graceful Degradation
When a tool is unavailable, the agent should not simply fail. Instead, the Recovery Handler should check whether an alternative tool can accomplish the same goal, or whether the agent can provide a partial answer without the tool. For example, if a web search tool is down, the agent might fall back to its parametric knowledge and clearly disclose that it could not verify the information with a live search.
# Code Fragment 22.6.3: Circuit breaker implementation
import time


class CircuitBreaker:
    """Prevents repeated calls to a failing tool."""

    def __init__(
        self, failure_threshold: int = 5,
        cooldown_seconds: float = 60.0,
    ):
        self.failure_threshold = failure_threshold
        self.cooldown = cooldown_seconds
        self._failures: dict[str, int] = {}
        self._opened_at: dict[str, float] = {}

    def is_open(self, tool_name: str) -> bool:
        if self._failures.get(tool_name, 0) < self.failure_threshold:
            return False
        opened = self._opened_at.get(tool_name, 0)
        if time.time() - opened > self.cooldown:
            # Half-open: allow one attempt
            self._failures[tool_name] = self.failure_threshold - 1
            return False
        return True

    def record_success(self, tool_name: str):
        self._failures[tool_name] = 0
        self._opened_at.pop(tool_name, None)

    def record_failure(self, tool_name: str):
        self._failures[tool_name] = self._failures.get(tool_name, 0) + 1
        if self._failures[tool_name] >= self.failure_threshold:
            self._opened_at[tool_name] = time.time()
6. Cost Control: Token Budgeting and Cascade Routing
Agent systems can consume tokens at an alarming rate. A single complex request might trigger dozens of LLM calls across planning, tool selection, evaluation, and replanning. Without explicit cost control, a runaway agent loop can burn through an entire monthly API budget in minutes. The Cost Controller addresses this with two mechanisms: per-request token budgets and cascade routing.
Per-Request Token Budgets
Each AgentRequest carries a max_budget_usd field. The Cost Controller tracks cumulative spending across all LLM calls and tool invocations within a single request. Before each LLM call, the controller checks the remaining budget. If the remaining budget is too low for the planned call, the agent must either abort or switch to a cheaper strategy.
Cascade Routing
Cascade routing sends requests to the cheapest adequate model first and only escalates to more expensive models when the cheaper one cannot produce a satisfactory result. For example, simple tool selection might use gpt-4o-mini at $0.15 per million input tokens, while complex multi-step planning escalates to claude-sonnet-4-20250514 or o3. The evaluator's quality score determines whether escalation is needed.
# Code Fragment 22.6.4: Cost Controller with cascade routing
from dataclasses import dataclass


@dataclass
class ModelTier:
    name: str
    model_id: str
    cost_per_1k_input: float
    cost_per_1k_output: float


class CostController:
    """Tracks spending and routes to the cheapest adequate model."""

    TIERS = [
        ModelTier("fast", "gpt-4o-mini", 0.00015, 0.0006),
        ModelTier("balanced", "claude-sonnet-4-20250514", 0.003, 0.015),
        ModelTier("powerful", "o3", 0.01, 0.04),
    ]

    def __init__(self):
        self._spent: dict[str, float] = {}  # session_id -> total USD

    def get_spent(self, session_id: str) -> float:
        return self._spent.get(session_id, 0.0)

    def record_cost(self, session_id: str, cost_usd: float):
        self._spent[session_id] = self.get_spent(session_id) + cost_usd

    def can_afford(self, session_id: str, budget: float,
                   estimated_cost: float) -> bool:
        return self.get_spent(session_id) + estimated_cost <= budget

    def select_tier(self, session_id: str, budget: float,
                    min_tier: str = "fast") -> ModelTier | None:
        """Select the cheapest tier at or above min_tier within budget."""
        started = False
        for tier in self.TIERS:
            if tier.name == min_tier:
                started = True
            if not started:
                continue
            # Estimate cost for a typical call (~2k input, ~500 output tokens)
            estimated = (2 * tier.cost_per_1k_input
                         + 0.5 * tier.cost_per_1k_output)
            if self.can_afford(session_id, budget, estimated):
                return tier
        return None  # Budget exhausted
Cascade routing often saves 60 to 80% of API costs with minimal quality degradation. In practice, most agent steps (simple tool dispatch, formatting results, short follow-up questions) do not require the most capable model. Reserve expensive models for the planning phase and for recovery after evaluation failures. Measure your cascade hit rate (the percentage of calls handled by the cheapest tier) as a key operational metric.
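Measuring the cascade hit rate needs nothing more than a per-tier call counter. The `CascadeMetrics` helper below is a sketch for illustration, not part of the fragments above:

```python
class CascadeMetrics:
    """Counts LLM calls per model tier to compute the cascade hit rate."""

    def __init__(self):
        self.counts: dict[str, int] = {}

    def record(self, tier_name: str):
        self.counts[tier_name] = self.counts.get(tier_name, 0) + 1

    def hit_rate(self, cheapest_tier: str = "fast") -> float:
        """Fraction of all calls served by the cheapest tier."""
        total = sum(self.counts.values())
        return self.counts.get(cheapest_tier, 0) / total if total else 0.0
```

Calling `record(tier.name)` wherever the Cost Controller selects a tier is enough to feed a dashboard with this metric.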
7. Permissions Model
The Permissions Gate enforces security at three levels: user-level access (who can use the agent), tool-level access control lists (which tools a given user can invoke), and action-level constraints (what parameters are allowed). Every tool invocation passes through the gate before reaching the sandbox. This is especially important for agents that can execute code, send emails, modify databases, or access external services.
Audit logging is the companion to access control. Every permission check, whether it passes or fails, is logged with a timestamp, user ID, tool name, and the decision. This log serves both security auditing and debugging purposes. When an agent produces an unexpected result, the audit log reveals exactly which tools it invoked and in what order.
# Code Fragment 22.6.5: Permissions Gate with audit logging
import logging
from datetime import datetime, timezone

audit_log = logging.getLogger("agent.audit")


class PermissionsGate:
    """Enforces tool-level access control with audit trail."""

    def __init__(self, acl: dict[str, list[str]]):
        # acl maps tool_name -> list of required permission strings
        self._acl = acl

    def check(self, user_id: str, user_perms: list[str],
              tool_name: str, tool_args: dict) -> bool:
        required = self._acl.get(tool_name, [])
        allowed = all(p in user_perms for p in required)
        audit_log.info(
            "permission_check | %s | user=%s | tool=%s | "
            "required=%s | granted=%s | args_keys=%s",
            datetime.now(timezone.utc).isoformat(),
            user_id, tool_name, required, allowed,
            list(tool_args.keys()),
        )
        return allowed

    def filter_tools(self, user_perms: list[str],
                     tools: list[dict]) -> list[dict]:
        """Return only the tools the user is allowed to invoke."""
        return [
            t for t in tools
            if all(p in user_perms
                   for p in t.get("permissions", []))
        ]
8. Recovery Patterns
Failures in agent systems are inevitable. Tools time out, APIs return errors, and LLMs produce invalid outputs. The Recovery Handler implements three patterns that keep the agent operational despite failures.
Checkpoint and Resume
After each successful step, the agent saves a checkpoint containing the current plan state, all completed step results, and the accumulated context. If a later step fails and the agent must restart, it resumes from the last checkpoint rather than repeating all previous work. This is particularly valuable for long-running tasks that span many tool calls.
Idempotent Actions
Tools should be designed so that repeating the same call produces the same result without harmful side effects. For example, a "create file" tool should check whether the file already exists before creating it. When idempotent design is not possible (such as sending an email), the agent should track which actions have been executed and skip them on retry.
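For non-idempotent tools, a small executed-action log lets the agent skip repeats on retry. This is a sketch under the assumption that tool arguments are JSON-serializable; the class name is illustrative:

```python
import json


class ExecutedActionLog:
    """Records (tool, args) pairs so retries can skip already-run actions."""

    def __init__(self):
        self._seen: set[str] = set()

    def _key(self, tool_name: str, tool_args: dict) -> str:
        # sort_keys makes the key stable regardless of dict ordering
        return tool_name + "|" + json.dumps(tool_args, sort_keys=True)

    def already_done(self, tool_name: str, tool_args: dict) -> bool:
        return self._key(tool_name, tool_args) in self._seen

    def mark_done(self, tool_name: str, tool_args: dict):
        self._seen.add(self._key(tool_name, tool_args))
```

In production the set would live in durable storage so the log survives an agent restart, which is exactly when retries happen.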
Rollback
For operations that support undo (database transactions, file system changes, API operations with delete endpoints), the Recovery Handler can reverse completed steps when a later step makes the entire plan invalid. The key requirement is that each tool registers a rollback function alongside its forward function.
Design your tools with recovery in mind from the start. Every tool should answer three questions: (1) Is this action idempotent? (2) Can it be rolled back? (3) What does a partial failure look like? Documenting these properties in the tool's metadata allows the Recovery Handler to make informed decisions automatically rather than relying on hardcoded retry logic.
9. Wiring It All Together
The following code shows the complete agent system skeleton that wires all eight components into a single AgentSystem class. This is a minimal but functional implementation that you can use as a starting point for production systems. Each component is injected as a dependency, making it straightforward to swap implementations (for example, replacing the in-memory cost tracker with a Redis-backed one).
# Code Fragment 22.6.6: Complete agent system skeleton
class AgentSystem:
    """Wires all eight components into a production agent."""

    def __init__(
        self,
        planner: Planner,
        tool_router: ToolRouter,
        sandbox: ExecutionSandbox,
        cost_ctrl: CostController,
        perms_gate: PermissionsGate,
        circuit_breaker: CircuitBreaker,
        memory_manager=None,  # optional here; see Section 22.7
        max_retries: int = 2,
    ):
        self.planner = planner
        self.router = tool_router
        self.sandbox = sandbox
        self.cost = cost_ctrl
        self.perms = perms_gate
        self.breaker = circuit_breaker
        self.memory = memory_manager
        self.max_retries = max_retries

    async def handle(self, request: AgentRequest) -> AgentResponse:
        # 1. Permissions: authenticate the user
        if not request.permissions:
            return AgentResponse(
                message="Access denied: no permissions.",
                plan=AgentPlan(steps=[], reasoning=""),
            )
        # 2. Cost check: ensure budget is available
        if not self.cost.can_afford(
            request.session_id, request.max_budget_usd, 0.01
        ):
            return AgentResponse(
                message="Budget exhausted for this session.",
                plan=AgentPlan(steps=[], reasoning=""),
            )
        # 3. Memory: load relevant context (empty until Section 22.7
        # supplies a real memory manager)
        memory_ctx = ""
        if self.memory is not None:
            memory_ctx = await self.memory.load_context(
                request.user_id, request.session_id, request.message
            )
        # 4. Filter tools by user permissions. A fuller system would store
        # descriptions in the router rather than reaching into its
        # private dicts as this skeleton does.
        allowed_tools = self.perms.filter_tools(
            request.permissions,
            [{"name": n, "description": "",
              "permissions": self.router._permissions.get(n, [])}
             for n in self.router._tools],
        )
        # 5. Plan
        plan = await self.planner.create_plan(
            request, memory_ctx, allowed_tools
        )
        # 6. Execute each step
        completed, failed = 0, 0
        for step in plan.steps:
            # Check the circuit breaker
            if self.breaker.is_open(step.tool_name):
                step.status = StepStatus.SKIPPED
                failed += 1
                continue
            # Check permissions for this specific tool
            if not self.perms.check(
                request.user_id, request.permissions,
                step.tool_name, step.tool_args,
            ):
                step.status = StepStatus.SKIPPED
                failed += 1
                continue
            # Execute with retries
            tool_fn = self.router.get_tool(step.tool_name)
            if tool_fn is None:
                step.status = StepStatus.FAILED
                step.result = f"Unknown tool: {step.tool_name}"
                failed += 1
                continue
            success = False
            result = None
            for attempt in range(self.max_retries + 1):
                ok, result = await self.sandbox.execute(
                    tool_fn, step.tool_args
                )
                if ok:
                    step.status = StepStatus.SUCCESS
                    step.result = result
                    self.breaker.record_success(step.tool_name)
                    success = True
                    break
                self.breaker.record_failure(step.tool_name)
            if success:
                completed += 1
            else:
                step.status = StepStatus.FAILED
                step.result = result
                failed += 1
        # 7. Persist memory
        if self.memory is not None:
            await self.memory.save_context(
                request.user_id, request.session_id,
                request.message, plan,
            )
        # 8. Build the response
        total_cost = self.cost.get_spent(request.session_id)
        summary = "\n".join(
            f"Step {s.step_id}: {s.status.value} ({s.description})"
            for s in plan.steps
        )
        return AgentResponse(
            message=f"Completed.\n{summary}",
            plan=plan,
            total_cost_usd=total_cost,
            steps_completed=completed,
            steps_failed=failed,
        )
10. Putting It Into Practice
To see how the system skeleton comes together, consider a concrete scenario: a customer support agent that can look up orders, check inventory, issue refunds, and send emails. Each of these is a tool with specific permissions. The following snippet shows how to register tools and run the agent.
# Code Fragment 22.6.7: Registering tools and running the agent
import asyncio

from openai import AsyncOpenAI


# Define tool implementations
async def lookup_order(order_id: str) -> dict:
    # In production, query your order database
    return {"order_id": order_id, "status": "shipped", "total": 49.99}


async def check_inventory(product_id: str) -> dict:
    return {"product_id": product_id, "in_stock": True, "quantity": 42}


async def issue_refund(order_id: str, amount: float) -> dict:
    return {"order_id": order_id, "refunded": amount, "success": True}


# Wire up the system
client = AsyncOpenAI()
router = ToolRouter()
router.register("lookup_order", lookup_order)
router.register("check_inventory", check_inventory)
router.register("issue_refund", issue_refund,
                required_permissions=["refunds"])

agent = AgentSystem(
    planner=Planner(client),
    tool_router=router,
    sandbox=ExecutionSandbox(timeout_seconds=15.0),
    cost_ctrl=CostController(),
    perms_gate=PermissionsGate({
        "lookup_order": [],
        "check_inventory": [],
        "issue_refund": ["refunds"],
    }),
    circuit_breaker=CircuitBreaker(),
    memory_manager=None,  # replaced with real memory in Section 22.7
)

# Handle a request
request = AgentRequest(
    user_id="agent-42",
    session_id="sess-001",
    message="Check if order ORD-123 is shipped and refund it if so.",
    permissions=["read", "refunds"],
    max_budget_usd=0.50,
)
response = asyncio.run(agent.handle(request))
print(response.message)
Exercises
Modify the ExecutionSandbox to support per-tool timeout overrides. Some tools (like web scraping) legitimately need longer timeouts than others (like database lookups). Add a timeout_overrides: dict[str, float] parameter to the constructor and use it in execute().
Answer Sketch
Store the overrides dict in __init__. In execute(), accept a tool_name parameter. Look up self.timeout_overrides.get(tool_name, self.timeout) and pass that value to asyncio.wait_for(). This allows setting 60s for web_scrape while keeping the default at 15s for everything else.
Extend the CostController to track per-model-tier spending and per-tool spending separately. Add methods get_cost_breakdown(session_id) that returns a dictionary with keys "by_tier" and "by_tool", each mapping to sub-dictionaries of costs. This data is essential for building an operational cost dashboard.
Answer Sketch
Add two more dictionaries alongside _spent: _by_tier[session_id][tier_name] and _by_tool[session_id][tool_name]. Update record_cost() to accept tier_name and tool_name parameters and increment the appropriate counters. The breakdown method reads from these nested dictionaries.
Implement a CheckpointManager that saves agent state after each successful step and can resume from the last checkpoint. Use JSON serialization to persist the plan state and completed step results to a file. Add a resume(session_id) method to AgentSystem that loads the checkpoint and continues execution from where it left off.
Answer Sketch
Create a CheckpointManager class that writes {session_id}.json after each successful step, containing the serialized AgentPlan and the index of the last completed step. In AgentSystem.handle(), check for an existing checkpoint at the start. If found, deserialize the plan and skip steps that are already marked as SUCCESS. The key subtlety is that tool arguments may reference results from earlier steps, so the checkpoint must also store step results.
- Production agents need eight components: the Planner, Tool Router, Memory Manager, Execution Sandbox, Evaluator, Recovery Handler, Permissions Gate, and Cost Controller, supported by patterns such as rate limiting, circuit breakers, cascade routing, and checkpointing.
- Circuit breakers prevent cascading failures; graceful degradation keeps the agent functional with reduced capabilities.
- Per-request token budgets prevent runaway costs; cascade routing optimizes cost by trying cheaper models first.
- Idempotent actions ensure that retried operations do not produce duplicate side effects.
Examples: (1) Rate limiting prevents runaway API costs and respects provider quotas. (2) Circuit breakers stop cascading failures when a downstream service is unhealthy. (3) Checkpoint and resume enables long-running agents to recover from transient failures without restarting from scratch.
A circuit breaker stops making calls to a failing service entirely (fast-fail), while graceful degradation continues operating with reduced functionality, such as falling back to a cheaper model or returning cached results instead of live data.
What Comes Next
The next section, Section 22.7: Memory Architecture for Agents, dives deep into the memory manager component introduced here. You will learn how to design multi-tier memory systems with explicit write policies, read policies, and forgetting strategies that keep agents both informed and efficient.
References and Further Reading
Production Agent Architecture
Anthropic (2024). "Building Effective Agents."
Anthropic's practical guide to production agent patterns including routing, parallelization, orchestration, and evaluation. Essential reference for system architecture decisions.
Sumers, T., Yao, S., Narasimhan, K., and Griffiths, T. (2023). "Cognitive Architectures for Language Agents."
The CoALA framework provides a principled way to design agent system components including memory, action spaces, and decision-making procedures used in production deployments.
Wu, Q., Bansal, G., Zhang, J., et al. (2023). "AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation." arXiv preprint.
Introduces the AutoGen framework for building multi-agent systems with customizable conversation patterns, demonstrating production-grade agent orchestration.
Reliability and Deployment Patterns
Kapoor, S., Stroebl, B., Siber, Z.S., et al. (2024). "AI Agents That Matter." arXiv preprint.
Analyzes the gap between benchmark performance and production reliability, offering practical guidance on cost-performance trade-offs for deployed agent systems.
Demonstrates end-to-end agent deployment for GUI interaction, illustrating production concerns like action grounding, error recovery, and execution sandboxing.
Nelhage, N. (2024). "Transformers for Software Engineers."
Practical engineering perspective on building reliable systems with LLMs, covering rate limiting, circuit breakers, and other patterns essential for production agent deployments.
