Build a Custom Python Agent Orchestration Engine with Dynamic Routing: A Step-by-Step Developer Tutorial
Most agent orchestration frameworks are overengineered. They promise the moon — dynamic graphs, runtime reflection, pluggable everything — and then you spend two weeks just trying to route a damn task to the right agent.
I’ve been there.
A few months ago, our team in Ho Chi Minh City was building a multi-agent pipeline for a logistics client. The task seemed simple: route incoming shipment events to the correct processing agent based on event type, priority, and current system load. We tried LangGraph. We tried a custom DAG framework. Both felt like using a cruise missile to kill a mosquito.
So I built a lightweight orchestration engine in about 200 lines of Python. It’s been running in production for three months, processing over 50,000 tasks daily. No crashes. No silent failures. Here’s exactly how it works.
What We’re Building
This engine does four things:
- Dynamic routing — picks the right agent based on task metadata, not a hardcoded chain
- Priority queuing — high-priority tasks jump the line, but low-priority ones don’t starve
- Retry with exponential backoff — because stuff fails, and you need to handle it gracefully
- Observability hooks — every routing decision gets logged so you can debug later
No external dependencies: everything here uses the Python standard library (3.9 or later, for the built-in generic type hints), mainly `asyncio`. You can drop this into any Python project.
The Core Architecture
Here’s the mental model. You have a registry of agents. Each agent declares what tasks it can handle. When a task arrives, the orchestrator evaluates all agents, picks the best match, and enqueues the task. A worker pool picks up tasks and executes them.
Task In -> Router -> Priority Queue -> Worker Pool -> Agent Execution -> Result Out
The key insight? Routing happens at runtime, not design time. You don’t hardcode “if event_type == ‘shipment’ then agent_a”. Instead, each agent scores itself on capability, and the router picks the highest scorer.
Step 1: Define the Agent Interface
Every agent needs to tell the system two things: “what can I handle?” and “how do I do the work?”
```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional


class BaseAgent(ABC):
    def __init__(self, name: str, max_concurrency: int = 3):
        self.name = name
        self.max_concurrency = max_concurrency
        self.active_tasks: int = 0

    @abstractmethod
    async def can_handle(self, task: Dict[str, Any]) -> float:
        """
        Return a confidence score between 0.0 and 1.0.
        0.0 = can't handle this task at all.
        1.0 = this task was made for me.
        """
        pass

    @abstractmethod
    async def execute(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the task and return results."""
        pass
```
Notice the `max_concurrency` field. That’s how we prevent overloading a single agent. The registry's `get_available` method compares it against `active_tasks` before asking an agent to score a task.
Step 2: Build the Agent Registry
The registry is just a dict with some convenience methods. Dead simple.
```python
from typing import Awaitable


class AgentRegistry:
    def __init__(self):
        self._agents: Dict[str, BaseAgent] = {}

    def register(self, agent: BaseAgent):
        self._agents[agent.name] = agent
        print(f" [Registry] Agent '{agent.name}' registered")

    def unregister(self, name: str):
        self._agents.pop(name, None)

    def get_available(self, task: Dict[str, Any]) -> list[tuple[str, Awaitable[float]]]:
        """Return (name, un-awaited can_handle coroutine) pairs for agents with spare capacity."""
        results = []
        for name, agent in self._agents.items():
            if agent.active_tasks < agent.max_concurrency:
                # can_handle is async, so this creates a coroutine; the router awaits it
                results.append((name, agent.can_handle(task)))
        return results
```
That `get_available` method is where the magic starts. It filters out agents that are at capacity, then asks each remaining agent to score the task. We get back a list of (agent_name, score_coroutine) tuples.
Why return coroutines instead of awaited values? Because we want to evaluate all agents concurrently, not one by one. That's the next step.
Step 3: The Dynamic Router
The router calls every agent's `can_handle` concurrently, picks the highest score above a threshold, and returns the winner.
```python
import asyncio


class DynamicRouter:
    def __init__(self, registry: AgentRegistry, min_score: float = 0.3):
        self.registry = registry
        self.min_score = min_score

    async def route(self, task: Dict[str, Any]) -> Optional[str]:
        candidates = self.registry.get_available(task)
        if not candidates:
            return None

        # Evaluate all agents concurrently
        names, coros = zip(*candidates)
        scores = await asyncio.gather(*coros)

        scored_agents = list(zip(names, scores))
        scored_agents.sort(key=lambda x: x[1], reverse=True)

        best_name, best_score = scored_agents[0]
        if best_score < self.min_score:
            return None
        return best_name
```
A few things to notice:
- `asyncio.gather` runs all the `can_handle` coroutines concurrently. When scoring awaits I/O, 5 agents cost roughly the latency of the slowest one instead of the sum of all five.
- The `min_score` threshold prevents routing to an agent that's only marginally qualified. Better to fail fast than to route a task to the wrong agent and waste compute.
- If an agent is at capacity, it never even gets asked. That's the `get_available` filter doing its job.
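To make the concurrency point concrete, here is a small self-contained sketch that times sequential versus concurrent scoring. The `slow_score` coroutine is a hypothetical stand-in for a `can_handle` that awaits I/O (say, a cache lookup); it is not part of the engine. Keep in mind the speedup only shows up when scorers actually await something. CPU-bound scoring still runs one coroutine at a time on the event loop.

```python
import asyncio
import time


async def slow_score(score: float, delay: float = 0.1) -> float:
    """Hypothetical scorer that waits on I/O before answering."""
    await asyncio.sleep(delay)
    return score


async def score_sequentially(scores: list[float]) -> tuple[list[float], float]:
    start = time.perf_counter()
    results = [await slow_score(s) for s in scores]  # one at a time
    return results, time.perf_counter() - start


async def score_concurrently(scores: list[float]) -> tuple[list[float], float]:
    start = time.perf_counter()
    results = await asyncio.gather(*(slow_score(s) for s in scores))
    return list(results), time.perf_counter() - start


async def compare() -> tuple[float, float]:
    scores = [0.2, 0.9, 0.5, 0.1, 0.7]
    seq_results, seq_time = await score_sequentially(scores)
    con_results, con_time = await score_concurrently(scores)
    assert seq_results == con_results  # gather preserves input order
    return seq_time, con_time


if __name__ == "__main__":
    seq_time, con_time = asyncio.run(compare())
    print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

Because `asyncio.gather` returns results in the order the awaitables were passed in, zipping names back onto scores stays safe.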
Step 4: Priority Queue with Anti-Starvation
A naive priority queue can starve low-priority tasks. If high-priority tasks keep arriving, the low-priority ones never reach the front of the queue. Here's a pattern that prevents that.
```python
import time
from dataclasses import dataclass, field


@dataclass(order=True)
class PrioritizedTask:
    priority: int  # lower number = more urgent
    enqueue_time: float = field(compare=False)
    task_id: str = field(compare=False)
    task_data: Dict[str, Any] = field(compare=False)
    target_agent: str = field(compare=False)


class AntiStarvationQueue:
    def __init__(self, max_priority: int = 5, aging_interval: float = 30.0):
        self._tasks: list[PrioritizedTask] = []
        self._max_priority = max_priority  # kept for callers; the aging logic does not use it
        self._aging_interval = aging_interval

    def _effective_priority(self, task: PrioritizedTask, now: float) -> int:
        # Aging: one level of boost per full aging interval spent waiting
        wait_time = now - task.enqueue_time
        age_boost = int(wait_time / self._aging_interval)
        return max(0, task.priority - age_boost)

    def push(self, task: PrioritizedTask):
        self._tasks.append(task)

    def pop(self) -> Optional[PrioritizedTask]:
        if not self._tasks:
            return None
        # Aging is evaluated at pop time. Applying it in push() would be a
        # no-op, because a freshly submitted task has waited ~0 seconds.
        # This is an O(n) scan, which is fine for queues of modest depth.
        now = time.time()
        best = min(
            range(len(self._tasks)),
            key=lambda i: (
                self._effective_priority(self._tasks[i], now),
                self._tasks[i].enqueue_time,  # FIFO within a priority level
            ),
        )
        return self._tasks.pop(best)

    def __len__(self):
        return len(self._tasks)
```
The aging mechanism is the star here. Every 30 seconds a task sits in the queue, its priority gets boosted by one level. A priority-3 task that's been waiting 90 seconds gets treated as priority 0 (the highest). This guarantees that even the lowest-priority task eventually gets processed.
I set `aging_interval` to 30 seconds based on our production data — it's the sweet spot between responsiveness and throughput. Your mileage may vary.
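If you want to sanity-check the aging arithmetic before trusting it with real traffic, the formula is easy to pull out on its own. The helper below, `effective_priority`, is a hypothetical standalone function for illustration. It assumes the rule described above: one level of boost per full `aging_interval` of waiting, clamped at 0.

```python
def effective_priority(priority: int, wait_seconds: float, aging_interval: float = 30.0) -> int:
    """Lower number = more urgent. One level of boost per full interval waited."""
    age_boost = int(wait_seconds / aging_interval)
    return max(0, priority - age_boost)


if __name__ == "__main__":
    # A priority-3 task at various wait times
    for wait in (0, 29, 30, 60, 90, 300):
        print(f"waited {wait:>3}s -> effective priority {effective_priority(3, wait)}")
```

Note that the boost is stepwise: at 29 seconds nothing has changed yet, and at 30 seconds the task moves up exactly one level.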
Step 5: The Orchestrator Tying It All Together
This is the main loop. It listens for incoming tasks, routes them, enqueues them, and dispatches them to workers.
```python
class OrchestrationEngine:
    def __init__(
        self,
        registry: AgentRegistry,
        router: DynamicRouter,
        queue: AntiStarvationQueue,
        num_workers: int = 4,
    ):
        self.registry = registry
        self.router = router
        self.queue = queue
        self.num_workers = num_workers
        self._task_counter = 0
        self._running = False

    async def submit_task(self, task_data: Dict[str, Any]) -> str:
        self._task_counter += 1
        task_id = f"task-{self._task_counter}"

        agent_name = await self.router.route(task_data)
        if agent_name is None:
            raise ValueError(f"No suitable agent for task {task_id}")

        prioritized = PrioritizedTask(
            priority=task_data.get("priority", 3),
            enqueue_time=time.time(),
            task_id=task_id,
            task_data=task_data,
            target_agent=agent_name,
        )
        self.queue.push(prioritized)
        return task_id

    async def _worker_loop(self, worker_id: int):
        while self._running:
            task = self.queue.pop()
            if task is None:
                await asyncio.sleep(0.1)
                continue

            agent = self.registry._agents.get(task.target_agent)
            if agent is None:
                print(f" [Worker {worker_id}] Agent '{task.target_agent}' not found, dropping task {task.task_id}")
                continue

            if agent.active_tasks >= agent.max_concurrency:
                # Routing ran at submit time, so re-check capacity here:
                # requeue the task and give the agent a moment to drain.
                self.queue.push(task)
                await asyncio.sleep(0.05)
                continue

            agent.active_tasks += 1
            try:
                result = await self._execute_with_retry(agent, task.task_data)
                # Log or emit result here
                print(f" [Worker {worker_id}] Task {task.task_id} completed by '{agent.name}'")
            except Exception as e:
                print(f" [Worker {worker_id}] Task {task.task_id} failed: {e}")
            finally:
                agent.active_tasks -= 1

    async def _execute_with_retry(self, agent: BaseAgent, task_data: Dict[str, Any], max_retries: int = 3):
        last_exception = None
        for attempt in range(max_retries):
            try:
                return await agent.execute(task_data)
            except Exception as e:
                last_exception = e
                if attempt < max_retries - 1:
                    # Exponential backoff: 1s after the first failure, 2s after the second
                    wait = 2 ** attempt
                    await asyncio.sleep(wait)
        raise last_exception

    async def start(self):
        self._running = True
        workers = [asyncio.create_task(self._worker_loop(i)) for i in range(self.num_workers)]
        await asyncio.gather(*workers)

    def stop(self):
        # Workers notice the flag on their next loop iteration and exit
        self._running = False
```
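One thing the plain `2 ** attempt` schedule does not cover: if a burst of tasks fails at the same moment, they all retry in lockstep. A common remedy is to cap the delay and add random jitter. The sketch below is one way you might extend the retry logic, not part of the engine above; `backoff_delay` and `retry_async` are hypothetical names, and the defaults are placeholders.

```python
import asyncio
import random
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0,
                  rng: Optional[random.Random] = None) -> float:
    """Full-jitter delay: uniform in [0, min(cap, base * 2**attempt)]."""
    rng = rng or random.Random()
    ceiling = min(cap, base * (2 ** attempt))
    return rng.uniform(0, ceiling)


async def retry_async(
    func: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base: float = 1.0,
    cap: float = 30.0,
) -> T:
    """Call func up to max_retries times, sleeping with jitter between failures."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            return await func()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(backoff_delay(attempt, base, cap))
    raise AssertionError("unreachable")
```

Usage would look like `await retry_async(lambda: agent.execute(task_data))`. Passing a seeded `random.Random` to `backoff_delay` keeps tests deterministic.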
Step 6: Wire Up Real Agents
Let's make this concrete. Here are two agents that a logistics client might use.
```python
class ShipmentTrackingAgent(BaseAgent):
    def __init__(self):
        super().__init__("shipment_tracker", max_concurrency=5)

    async def can_handle(self, task: Dict[str, Any]) -> float:
        if task.get("type") == "shipment_update":
            # Perfect match
            return 0.95
        if "shipment_id" in task:
            # Could handle, but not ideal
            return 0.5
        return 0.0

    async def execute(self, task: Dict[str, Any]) -> Dict[str, Any]:
        # Simulate some work
        await asyncio.sleep(0.2)
        return {"status": "tracked", "shipment_id": task["shipment_id"]}


class FraudDetectionAgent(BaseAgent):
    def __init__(self):
        super().__init__("fraud_detector", max_concurrency=2)

    async def can_handle(self, task: Dict[str, Any]) -> float:
        if task.get("type") in ("fraud_check", "high_value_transaction"):
            return 0.98
        if task.get("risk_score", 0) > 70:
            return 0.85
        return 0.1

    async def execute(self, task: Dict[str, Any]) -> Dict[str, Any]:
        await asyncio.sleep(0.5)  # heavier computation
        return {"flagged": False, "confidence": 0.99}
```
Notice the `max_concurrency` difference. The fraud detector is more resource-intensive, so we limit it to 2 concurrent tasks. The shipment tracker can handle 5. The orchestrator respects these limits automatically.
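If you would rather make the limit a hard guarantee than a counter the orchestrator has to remember to check, an `asyncio.Semaphore` is one option. The sketch below is an alternative approach, not what the engine above does. `LimitedAgent` is a hypothetical class, and it records peak concurrency only so the demo can show the cap holding.

```python
import asyncio


class LimitedAgent:
    """Hypothetical agent that caps concurrent executions with a semaphore."""

    def __init__(self, name: str, max_concurrency: int):
        self.name = name
        self._slots = asyncio.Semaphore(max_concurrency)
        self.active = 0
        self.peak = 0

    async def execute(self, task_id: int) -> int:
        async with self._slots:  # waits here while all slots are taken
            self.active += 1
            self.peak = max(self.peak, self.active)
            try:
                await asyncio.sleep(0.01)  # simulated work
                return task_id
            finally:
                self.active -= 1


async def run_demo(num_tasks: int = 10, max_concurrency: int = 2) -> tuple[list[int], int]:
    # Built inside the running loop so the semaphore binds to it on older Pythons
    agent = LimitedAgent("fraud_detector", max_concurrency)
    results = await asyncio.gather(*(agent.execute(i) for i in range(num_tasks)))
    return list(results), agent.peak


if __name__ == "__main__":
    results, peak = asyncio.run(run_demo())
    print(f"completed {len(results)} tasks, peak concurrency {peak}")
```

The trade-off: a semaphore blocks the caller until a slot frees up, while the counter approach lets the router skip a saturated agent and pick another one.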
Step 7: Run It
```python
async def main():
    # Setup
    registry = AgentRegistry()
    registry.register(ShipmentTrackingAgent())
    registry.register(FraudDetectionAgent())

    router = DynamicRouter(registry, min_score=0.3)
    queue = AntiStarvationQueue(aging_interval=30.0)
    engine = OrchestrationEngine(registry, router, queue, num_workers=4)

    # Submit some tasks
    tasks = [
        {"type": "shipment_update", "shipment_id": "SHP-001", "priority": 2},
        {"type": "fraud_check", "transaction_id": "TXN-042", "risk_score": 85, "priority": 0},
        {"type": "shipment_update", "shipment_id": "SHP-002", "priority": 4},
        {"type": "unknown_event", "priority": 3},  # This should fail
    ]
    for t in tasks:
        try:
            task_id = await engine.submit_task(t)
            print(f"Submitted {task_id} -> {t.get('type', 'unknown')}")
        except ValueError as e:
            print(f"Rejected task: {e}")

    # Start processing in the background, give the workers time to drain
    # the demo queue, then stop so the script exits instead of looping forever
    runner = asyncio.create_task(engine.start())
    await asyncio.sleep(2)
    engine.stop()
    await runner


if __name__ == "__main__":
    asyncio.run(main())
```
Run this, and you'll see the router correctly assign shipment updates to the `shipment_tracker`, fraud checks to the `fraud_detector`, and reject the unknown event because no agent scored at least 0.3 (the best it gets is the fraud detector's fallback score of 0.1).
What We Learned in Production
We've been running a variation of this engine in production for three months, processing logistics events for a US-based client. Here's what stood out.
Dynamic routing eliminated our agent selection bugs. Previously, we used a static if-else chain that grew to 15 conditions. Every time we added a new event type, we had to update the chain. With dynamic routing, we just register a new agent. That's it.
Priority aging is non-negotiable. In our first week, a batch of low-priority inventory sync tasks got stuck behind a firehose of high-priority shipment updates. Some waited over 4 hours. After adding aging, the max wait time for any task dropped to under 3 minutes.
Workers need backpressure. We started with 8 workers and no concurrency limits on agents. The fraud detector got overwhelmed with 6 concurrent tasks and started timing out. Adding `max_concurrency` per agent solved it. We now set this based on each agent's average execution time and memory footprint.
Testing the router is straightforward. Since each agent's `can_handle` is a pure function of the task data, we unit test routing decisions without spinning up the full engine. Here's a quick example from our test suite:
```python
async def test_routing():
    registry = AgentRegistry()
    registry.register(ShipmentTrackingAgent())
    registry.register(FraudDetectionAgent())
    router = DynamicRouter(registry)

    # Shipment update should route to shipment_tracker
    agent = await router.route({"type": "shipment_update", "shipment_id": "SHP-001"})
    assert agent == "shipment_tracker"

    # Fraud check should route to fraud_detector
    agent = await router.route({"type": "fraud_check", "risk_score": 85})
    assert agent == "fraud_detector"

    # Unknown event should return None
    agent = await router.route({"type": "nobody_can_handle_this"})
    assert agent is None
```
Extending the Engine
This is a foundation, not a finished product. Here are three extensions we've added since the initial build:
Observability with structured logging. Every routing decision, queue push, and execution result gets logged as JSON. We pipe this to a central ELK stack. When something goes wrong, we trace the exact path a task took through the system.
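For a rough idea of what JSON-per-line logging can look like with only the standard library, here is a minimal sketch. It is not our production logger: `JsonFormatter`, `build_logger`, `log_routing_decision`, and the field names (`task_id`, `chosen_agent`, `scores`) are all hypothetical, chosen for illustration.

```python
import json
import logging
from typing import Dict, Optional


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": record.created,
            "level": record.levelname,
            "event": record.getMessage(),
        }
        # Structured fields arrive through logging's `extra` mechanism
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload, sort_keys=True)


def build_logger(name: str = "orchestrator") -> logging.Logger:
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


def log_routing_decision(logger: logging.Logger, task_id: str,
                         chosen: Optional[str], scores: Dict[str, float]) -> None:
    """Emit one structured event describing which agent won and why."""
    logger.info(
        "routing_decision",
        extra={"fields": {"task_id": task_id, "chosen_agent": chosen, "scores": scores}},
    )


if __name__ == "__main__":
    log = build_logger()
    log_routing_decision(log, "task-1", "shipment_tracker",
                         {"shipment_tracker": 0.95, "fraud_detector": 0.1})
```

Logging every candidate's score, not just the winner, is what makes "why did this task go there?" answerable after the fact.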
Circuit breaker per agent. If an agent fails 5 consecutive tasks, the orchestrator stops sending it work for 60 seconds. This prevents a failing agent from dragging down the entire pipeline.
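A breaker like this fits in a few dozen lines. The sketch below is a simplified illustration under stated assumptions rather than our exact implementation: the `CircuitBreaker` class and its method names are hypothetical, and the clock is injectable so the cooldown can be tested without waiting a real minute.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Hypothetical per-agent breaker: opens after N consecutive failures,
    then lets work through again once the cooldown has elapsed."""

    def __init__(self, failure_threshold: int = 5, cooldown: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.cooldown = cooldown
        self._clock = clock
        self._consecutive_failures = 0
        self._opened_at: Optional[float] = None

    def allow(self) -> bool:
        """Return True if the agent may receive work right now."""
        if self._opened_at is None:
            return True
        if self._clock() - self._opened_at >= self.cooldown:
            # Cooldown elapsed: close the breaker and start counting afresh
            self._opened_at = None
            self._consecutive_failures = 0
            return True
        return False

    def record_success(self) -> None:
        self._consecutive_failures = 0

    def record_failure(self) -> None:
        self._consecutive_failures += 1
        if self._consecutive_failures >= self.failure_threshold:
            self._opened_at = self._clock()
```

In the engine, the natural hook points would be the registry filter (skip agents whose breaker says no) and the worker's `try`/`except` (record the outcome of each execution).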
Dynamic agent scaling. We're experimenting with spawning additional agent instances when the queue depth exceeds a threshold. This is trickier because agents often have shared state, but for stateless agents it works well.
Honestly, the most valuable part of this design is its simplicity. New developers on our team in Can Tho can understand the routing logic in an afternoon. No framework lock-in. No magic decorators. Just Python coroutines and a priority queue.
If you're building a multi-agent system and feel like the frameworks are getting in your way, try this approach.