Build a Resilient Multi-Agent Task Queue with Python and Redis: A Step-by-Step Developer Tutorial
You’ve built a multi-agent system. It works in dev. Then production hits, and everything falls apart. Agents hang, tasks get lost, and you’re debugging at 2 AM.
This is the reality most tutorials conveniently skip.
Here’s the thing: agents will fail. Your LLM API can time out. A downstream service might be slow. The network could hiccup. If your agent orchestration treats these as edge cases, you’re in for a world of pain.
In this tutorial, I’ll show you how to build a resilient multi-agent task queue using Python and Redis — the exact patterns we use at ECOAAI with our Vietnamese engineering team. We’ll implement retry logic with exponential backoff, circuit breakers, and state recovery. By the end, you’ll have a queue that survives real-world chaos.
Let’s dive in.
Why a Task Queue for Multi-Agent Systems?
Most multi-agent architectures use direct function calls or HTTP requests. That works for prototypes, but in production, you need asynchronous, decoupled communication. A task queue gives you:
- Backpressure handling – when agents are busy, tasks queue up.
- Retry mechanisms – without blocking the caller.
- Observability – you see exactly what’s pending, running, or failed.
- Scalability – spin up more workers without touching orchestration code.
Redis is a good fit here. It’s fast, can persist data to disk (RDB snapshots or AOF), and supports atomic operations on lists and streams. We use it at ECOAAI as the backbone for our ACP agent orchestration, combined with PostgreSQL for durable state. Today we’ll focus on the queue.
The Architecture
We’ll build three components:
- A dispatcher – pushes tasks into a Redis stream.
- Worker agents – pop tasks, execute them, and handle failures.
- A circuit breaker – prevents hammering a failing service.
Tasks will have a JSON payload with a unique ID, agent type, and data. Workers will pull from a stream (we’ll use Redis Streams for their consumer group support, not just lists). Why streams? Because they allow multi-worker consumption without race conditions. Exactly what you need when multiple agents need to pick up tasks.
Step 1: Setting Up Redis and Dependencies
Install the required packages:
bash
pip install redis python-dotenv pydantic
Create a `.env` file with your Redis connection string:
REDIS_URL=redis://localhost:6379/0
Let’s initialize our Redis client and define a task model:
python
import json
import os
import uuid
from typing import Any, Dict

import redis.asyncio as aioredis
from dotenv import load_dotenv
from pydantic import BaseModel, Field

load_dotenv()  # read REDIS_URL from the .env file
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
redis_client = aioredis.from_url(REDIS_URL, decode_responses=True)


class Task(BaseModel):
    # uuid4 avoids the collisions a timestamp-based ID could produce
    task_id: str = Field(default_factory=lambda: uuid.uuid4().hex)
    agent_type: str  # e.g., "analyzer", "summarizer"
    payload: Dict[str, Any]
    retries: int = 0
    max_retries: int = 3
    status: str = "pending"  # pending, running, completed, failed
Step 2: The Dispatcher — Pushing Tasks
We’ll use `XADD` to add tasks to a Redis stream. Streams support consumer groups, meaning you can have multiple workers processing different tasks without duplicates.
python
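async def dispatch_task(task: Task, stream: str = "agent_tasks"):
    fields = task.model_dump()
    # Stream fields must be flat strings or numbers, so serialize the nested payload
    fields["payload"] = json.dumps(fields["payload"])
    await redis_client.xadd(stream, fields)
    print(f"Dispatched task {task.task_id} to stream {stream}")
    return task.task_id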
That’s it. Redis assigns each entry an ID of the form `<milliseconds>-<sequence>`, which `XADD` returns. Workers use that ID to acknowledge the entry once they have processed it.
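Stream entries are flat field-value maps, so nested data such as `payload` has to be serialized before `XADD` and parsed again after reading. Here is a minimal sketch of that round trip using plain dicts. The helper names `to_stream_fields` and `from_stream_fields` are hypothetical and not part of redis-py, and the sample task is made up for illustration.

```python
import json
from typing import Any, Dict


def to_stream_fields(task: Dict[str, Any]) -> Dict[str, str]:
    """Flatten a task dict into string fields suitable for a stream entry."""
    fields = {}
    for key, value in task.items():
        if isinstance(value, (dict, list)):
            # Nested structures are stored as JSON text
            fields[key] = json.dumps(value)
        else:
            fields[key] = str(value)
    return fields


def from_stream_fields(fields: Dict[str, str]) -> Dict[str, Any]:
    """Rebuild the task dict from stream fields (every value arrives as a string)."""
    task: Dict[str, Any] = dict(fields)
    task["payload"] = json.loads(fields["payload"])
    task["retries"] = int(fields["retries"])
    task["max_retries"] = int(fields["max_retries"])
    return task


# Hypothetical task used only to exercise the round trip
example = {
    "task_id": "demo-1",
    "agent_type": "analyzer",
    "payload": {"url": "https://example.com"},
    "retries": 0,
    "max_retries": 3,
    "status": "pending",
}
restored = from_stream_fields(to_stream_fields(example))
```

Keeping the conversion in two small functions makes it easy to unit-test the wire format without a running Redis server.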
Step 3: The Worker — With Retry Logic
Workers create a consumer group and read new tasks. We’ll add a retry mechanism: if the task fails, it’s re-added to the stream with an incremented `retries` counter and an exponential backoff delay.
python
import asyncio
import random


async def worker(worker_id: str, stream: str = "agent_tasks", group: str = "agent_group"):
    # Create the consumer group; id="0" also delivers tasks added before the group existed
    try:
        await redis_client.xgroup_create(stream, group, id="0", mkstream=True)
    except aioredis.ResponseError as exc:
        if "BUSYGROUP" not in str(exc):
            raise  # anything other than "group already exists" is a real error
    consumer_name = f"worker_{worker_id}"
    while True:
        # ">" returns only messages never delivered to another consumer
        results = await redis_client.xreadgroup(
            group, consumer_name, {stream: ">"}, count=1, block=5000
        )
        if not results:
            continue  # block=5000 already waited up to 5 seconds
        for stream_name, messages in results:
            for msg_id, msg_data in messages:
                # The payload travels through the stream as a JSON string
                msg_data["payload"] = json.loads(msg_data["payload"])
                task = Task(**msg_data)
                print(f"[{consumer_name}] Processing task {task.task_id}")
                task.status = "running"
                try:
                    success = await execute_task(task)
                except Exception as exc:
                    # Treat an unexpected error as a failed attempt
                    print(f"Task {task.task_id} raised {exc!r}")
                    success = False
                if success:
                    # Optionally store result in another stream or Redis hash
                    await redis_client.hset(f"task_result:{task.task_id}", mapping={"status": "completed"})
                else:
                    # Retry logic
                    task.retries += 1
                    if task.retries <= task.max_retries:
                        backoff = 2 ** task.retries  # exponential: 2, 4, 8 seconds
                        print(f"Retrying task {task.task_id} in {backoff}s (attempt {task.retries})")
                        await asyncio.sleep(backoff)
                        task.status = "pending"
                        await dispatch_task(task)
                    else:
                        print(f"Task {task.task_id} failed after {task.max_retries} retries")
                        await redis_client.hset(f"task_result:{task.task_id}", mapping={"status": "failed"})
                # Acknowledge only after the outcome is recorded, so a crash
                # mid-task leaves the message in the pending list
                await redis_client.xack(stream, group, msg_id)


async def execute_task(task: Task) -> bool:
    # Simulate agent processing; in real code, call an LLM or a microservice
    await asyncio.sleep(1)
    # Simulate random failure (50% chance)
    if random.random() < 0.5:
        print(f"Task {task.task_id} failed")
        return False
    return True
Notice the exponential backoff. Don’t just retry immediately — you’ll swamp your system. Start at 2 seconds, then 4, then 8. After `max_retries`, classify as dead. We store results in a Redis hash so other agents can check them.
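The delay schedule is easy to isolate into a pure function. The sketch below reproduces the 2, 4, 8 second sequence. The `cap` and `jitter` parameters are additions that are not in the worker above: they are a common way to bound the delay and to keep many workers from retrying at the same instant, and the function name `backoff_delay` is hypothetical.

```python
import random


def backoff_delay(attempt: int, base: float = 2.0, cap: float = 60.0, jitter: float = 0.0) -> float:
    """Delay in seconds before retry number `attempt` (1-based)."""
    delay = min(cap, base ** attempt)
    if jitter:
        # Spread retries by up to +/- `jitter`, expressed as a fraction of the delay
        delay *= 1 + random.uniform(-jitter, jitter)
    return delay


# Without jitter the schedule matches the worker: 2, 4, 8 seconds
schedule = [backoff_delay(n) for n in range(1, 4)]
```

With `jitter=0.5`, the first retry would land somewhere between 1 and 3 seconds instead of exactly 2.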
Step 4: Circuit Breaker — Protecting Downstream Services
A circuit breaker prevents your agents from calling a failing service repeatedly. Here’s a simple implementation using Redis counters:
python
class CircuitBreaker:
    def __init__(self, redis_client, name: str, failure_threshold: int = 5, recovery_timeout: int = 30):
        self.redis = redis_client
        self.name = f"circuit_breaker:{name}"
        self.threshold = failure_threshold
        self.timeout = recovery_timeout

    async def is_open(self) -> bool:
        # The "open" key expires after recovery_timeout, which closes the circuit
        return await self.redis.get(self.name) == "open"

    async def record_failure(self):
        key = f"{self.name}:failures"
        count = await self.redis.incr(key)
        if count == 1:
            # Start the 60-second window on the first failure only;
            # refreshing the TTL on every failure would keep extending it
            await self.redis.expire(key, 60)
        if count >= self.threshold:
            await self.redis.set(self.name, "open", ex=self.timeout)

    async def record_success(self):
        await self.redis.delete(f"{self.name}:failures")
        await self.redis.set(self.name, "closed")
In your `execute_task` function, wrap the external call:
python
async def execute_task(task: Task) -> bool:
    breaker = CircuitBreaker(redis_client, "analyzer_service")
    if await breaker.is_open():
        print("Circuit open, skipping")
        return False
    try:
        # call_external_service is a placeholder for your LLM or API client
        await call_external_service(task.payload)
    except Exception as exc:
        await breaker.record_failure()
        print(f"Task {task.task_id} failed: {exc!r}")
        return False  # the worker treats False as a failed attempt and retries
    await breaker.record_success()
    return True
This way, if the service fails 5 times in 60 seconds, all subsequent calls are short-circuited for 30 seconds. Your agents don’t waste time and resources.
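The open and close behavior can be checked without Redis. The sketch below mirrors the same thresholds in memory with an injectable clock. `InMemoryBreaker` and its `now` parameter are hypothetical names for illustration, and it counts failures in a sliding window, which is a slight variation on the Redis counter with a TTL.

```python
import time
from typing import Callable, List, Optional


class InMemoryBreaker:
    """Opens after `threshold` failures within `window` seconds."""

    def __init__(self, threshold: int = 5, window: float = 60.0,
                 recovery_timeout: float = 30.0,
                 now: Callable[[], float] = time.monotonic):
        self.threshold = threshold
        self.window = window
        self.recovery_timeout = recovery_timeout
        self.now = now
        self.failures: List[float] = []          # timestamps of recent failures
        self.opened_at: Optional[float] = None   # None means the circuit is closed

    def is_open(self) -> bool:
        if self.opened_at is None:
            return False
        if self.now() - self.opened_at >= self.recovery_timeout:
            # Timeout elapsed: close again and start counting from zero
            self.opened_at = None
            self.failures.clear()
            return False
        return True

    def record_failure(self) -> None:
        t = self.now()
        # Keep only failures that are still inside the window
        self.failures = [f for f in self.failures if t - f < self.window]
        self.failures.append(t)
        if len(self.failures) >= self.threshold:
            self.opened_at = t

    def record_success(self) -> None:
        self.failures.clear()
        self.opened_at = None


# Drive the breaker with a fake clock so the example runs instantly
clock = [0.0]
breaker = InMemoryBreaker(now=lambda: clock[0])
for _ in range(5):
    breaker.record_failure()
open_after_five = breaker.is_open()
clock[0] = 31.0
open_after_timeout = breaker.is_open()
```

Injecting the clock keeps the test deterministic. In production the Redis-backed version has the advantage that every worker process shares the same breaker state.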
Step 5: State Recovery — Handling Crashes
What if a worker crashes mid-task? The unacknowledged message remains in the stream’s pending list. On restart, the worker can claim pending messages.
Add this recovery at the start of your worker:
python
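async def recover_pending(worker_id: str, stream: str, group: str) -> list:
    """Claim messages that have been pending for more than 10 seconds.

    Returns the claimed (msg_id, fields) pairs so the caller can process
    them; claiming alone does not re-run a task.
    """
    consumer_name = f"worker_{worker_id}"
    recovered = []
    pending = await redis_client.xpending_range(stream, group, min="-", max="+", count=10)
    for msg in pending:
        # XCLAIM only succeeds if the message has been idle for at least
        # min_idle_time ms. Our own stale entries are claimed too, which
        # covers a worker restarting under the same name.
        claimed = await redis_client.xclaim(
            stream, group, consumer_name,
            min_idle_time=10000, message_ids=[msg["message_id"]],
        )
        for msg_id, fields in claimed:
            print(f"Recovered message {msg_id}")
            recovered.append((msg_id, fields))
    return recovered
Run this before entering the main loop, and process the messages it returns the same way the loop handles new ones: execute, record the result, then `XACK`. The main loop reads with `>`, which never redelivers pending entries, so claimed messages are not picked up on their own. The 10-second idle threshold avoids stealing tasks that are still being processed, so set it above your longest expected task duration, including any backoff sleep.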
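Because `XPENDING` already reports how long each entry has been idle, a worker can filter candidates locally before issuing any `XCLAIM`. The sketch below works on entries shaped like the dicts redis-py returns from `xpending_range`. The function name `stale_message_ids` and the sample values are made up for illustration.

```python
from typing import Dict, List


def stale_message_ids(pending: List[Dict], min_idle_ms: int = 10_000) -> List[str]:
    """Return IDs of pending entries idle for at least `min_idle_ms`."""
    return [
        entry["message_id"]
        for entry in pending
        if entry["time_since_delivered"] >= min_idle_ms
    ]


# Entries shaped like the dicts returned by redis-py's xpending_range
sample = [
    {"message_id": "1700000000000-0", "consumer": "worker_0",
     "time_since_delivered": 45_000, "times_delivered": 1},
    {"message_id": "1700000000001-0", "consumer": "worker_1",
     "time_since_delivered": 1_200, "times_delivered": 1},
]
stale = stale_message_ids(sample)
```

The resulting IDs could be passed to a single `xclaim` call as `message_ids`, which saves one round trip per message. `XCLAIM` still enforces `min_idle_time` on the server, so the local filter is an optimization and not a safety check.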
Putting It All Together
Here’s how you start the workers:
python
async def main():
    # Start 3 workers; worker() expects a string ID
    workers = [worker(str(i)) for i in range(3)]
    await asyncio.gather(*workers)


asyncio.run(main())
And to dispatch a few tasks, run this from a separate script or process, since `main()` above never returns:
python
async def seed():
    tasks = [
        Task(agent_type="analyzer", payload={"url": "https://example.com"}),
        Task(agent_type="summarizer", payload={"text": "AI agents..."}),
    ]
    for t in tasks:
        await dispatch_task(t)


asyncio.run(seed())
Why This Matters
We’ve seen countless teams in Ho Chi Minh City and Can Tho build multi-agent systems that work in staging but fall over in production. The difference is always resilience. At ECOAAI, our Vietnamese developers apply these patterns using our ACP orchestration platform to deliver 5x efficiency — not because the agents are smarter, but because they don’t stop when something goes wrong.
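You don’t need a massive framework. A few hundred lines of Python with Redis and a circuit breaker can handle production loads. The beauty? It scales horizontally: add more workers to the same consumer group and Redis spreads the tasks across them. No single worker is a point of failure, though Redis itself needs replication (Redis Sentinel or Redis Cluster) to avoid becoming one.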
Frequently Asked Questions
Q: Why Redis Streams over simple Redis lists?
A: Streams support consumer groups, which let multiple workers process tasks without duplication. With lists, you’d need `BRPOP` and manual handling of failures — streams give you built-in message acknowledgment and pending list management.
Q: How do I persist task results long-term?
A: Use Redis hashes for short-term storage (results that other agents need within seconds). For long-term persistence, write results to PostgreSQL or a data lake. In our ECOA AI Platform ACP, we use PostgreSQL for durable state and Redis for fast queue operations.
Q: Can I run this with multiple Python processes?
A: Absolutely. Each worker runs independently. Redis handles concurrency through the stream’s consumer group mechanism. Just ensure all workers use the same group name and different consumer names.
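One way to keep consumer names distinct across processes and machines is to derive them from the hostname and process ID. This is a sketch under that assumption, and the helper name `consumer_name` is hypothetical.

```python
import os
import socket


def consumer_name(prefix: str = "worker") -> str:
    """Build a consumer name that is unique per host and process."""
    return f"{prefix}_{socket.gethostname()}_{os.getpid()}"


name = consumer_name()
```

A restarted process gets a new PID and therefore a new name, so anything the old process left pending has to be recovered with `XCLAIM`, never by re-reading that consumer's own pending list.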
Q: What about monitoring?
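A: Use `XLEN` for the number of entries stored in the stream, `XPENDING` for messages that were delivered but not yet acknowledged, and track circuit breaker states in Redis. Note that `XLEN` still counts acknowledged entries until you trim or delete them, so trim the stream (for example with `XTRIM`) if you want it to reflect backlog. Integrate with Prometheus for alerts when circuit breakers open or queue sizes grow.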
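A minimal sketch of collecting those two numbers in one place follows. `queue_health` is a hypothetical helper, and `StubRedis` is a stand-in with made-up values so the example runs without a server. With a real `redis.asyncio` client, `xpending(stream, group)` returns a summary dict that includes a `pending` count.

```python
import asyncio
from typing import Any, Dict


async def queue_health(client: Any, stream: str, group: str) -> Dict[str, int]:
    """Collect stream length and pending count for one stream and group."""
    depth = await client.xlen(stream)
    summary = await client.xpending(stream, group)
    return {"depth": depth, "pending": summary["pending"]}


class StubRedis:
    """Stand-in for redis.asyncio.Redis so the sketch runs without a server."""

    async def xlen(self, stream: str) -> int:
        return 12

    async def xpending(self, stream: str, group: str) -> Dict[str, Any]:
        return {"pending": 3, "min": "1-0", "max": "3-0", "consumers": []}


snapshot = asyncio.run(queue_health(StubRedis(), "agent_tasks", "agent_group"))
```

The returned dict can be exported as gauges by whatever metrics client you already use.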