Build a Resilient Multi-Agent Task Queue with Python and Redis: A Step-by-Step Developer Tutorial

(Developer Tutorials) - Learn how to build a fault-tolerant multi-agent task queue using Python, Redis, and the ECOA AI Platform ACP. This tutorial covers retry logic, circuit breakers, and state recovery — patterns that our Vietnamese engineering team uses daily to deliver 5x efficiency.

You’ve built a multi-agent system. It works in dev. Then production hits, and everything falls apart. Agents hang, tasks get lost, and you’re debugging at 2 AM.

This is the reality most tutorials conveniently skip.

Here’s the thing: agents will fail. Your LLM API can time out. A downstream service might be slow. The network could hiccup. If your agent orchestration treats these as edge cases, you’re in for a world of pain.

In this tutorial, I’ll show you how to build a resilient multi-agent task queue using Python and Redis — the exact patterns we use at ECOAAI with our Vietnamese engineering team. We’ll implement retry logic with exponential backoff, circuit breakers, and state recovery. By the end, you’ll have a queue that survives real-world chaos.

Let’s dive in.

Why a Task Queue for Multi-Agent Systems?

Most multi-agent architectures use direct function calls or HTTP requests. That works for prototypes, but in production, you need asynchronous, decoupled communication. A task queue gives you:

  • Backpressure handling – when agents are busy, tasks queue up.
  • Retry mechanisms – without blocking the caller.
  • Observability – you see exactly what’s pending, running, or failed.
  • Scalability – spin up more workers without touching orchestration code.

Redis is perfect here. It’s fast, persistent (with RDB/AOF), and supports atomic list operations. We use it at ECOAAI as the backbone for our ACP agent orchestration, combined with PostgreSQL for durable state. Today we’ll focus on the queue.

The Architecture

We’ll build three components:

  1. A dispatcher – pushes tasks into a Redis stream.
  2. Worker agents – pop tasks, execute them, and handle failures.
  3. A circuit breaker – prevents hammering a failing service.

Tasks will have a JSON payload with a unique ID, agent type, and data. Workers will pull from a stream rather than a plain list, because Redis Streams support consumer groups. Why does that matter? Within a consumer group, each entry is delivered to only one consumer, and entries that have not been acknowledged stay in a pending list. That gives you multi-worker consumption without race conditions, which is exactly what you need when multiple agents pick up tasks.

Step 1: Setting Up Redis and Dependencies

Install the required packages:

bash
pip install redis python-dotenv pydantic

Create a `.env` file with your Redis connection string:


REDIS_URL=redis://localhost:6379/0

Let’s initialize our Redis client and define a task model:

python
import json
import os
import uuid
from typing import Any, Dict

import redis.asyncio as aioredis
from dotenv import load_dotenv
from pydantic import BaseModel, Field

load_dotenv()  # read REDIS_URL from the .env file, if present

REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# decode_responses=True makes the client return str instead of bytes
redis_client = aioredis.from_url(REDIS_URL, decode_responses=True)

class Task(BaseModel):
    # A UUID stays unique even when two tasks are created in the same instant
    task_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    agent_type: str  # e.g., "analyzer", "summarizer"
    payload: Dict[str, Any]
    retries: int = 0
    max_retries: int = 3
    status: str = "pending"  # pending, running, completed, failed

Step 2: The Dispatcher — Pushing Tasks

We’ll use `XADD` to add tasks to a Redis stream. Streams support consumer groups, meaning you can have multiple workers processing different tasks without duplicates.

python
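async def dispatch_task(task: Task, stream: str = "agent_tasks"):
    task_dict = task.model_dump()
    # Stream fields must be flat strings or numbers, so JSON-encode the nested
    # payload; the consumer has to json.loads() it again before building a Task.
    task_dict["payload"] = json.dumps(task_dict["payload"])
    await redis_client.xadd(stream, task_dict)
    print(f"Dispatched task {task.task_id} to stream {stream}")
    return task.task_id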

That’s it. Redis assigns each entry an ID of the form `<milliseconds>-<sequence>`, which `XADD` returns. Workers later use that ID to acknowledge processed tasks.
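Stream entries are flat field-value maps, so a nested `payload` needs an encoding step on the way in and a decoding step on the way out. The following is a minimal sketch of that round trip. It uses a stdlib dataclass as a stand-in for the pydantic `Task` model, and `SketchTask`, `to_stream_fields`, and `from_stream_fields` are hypothetical names introduced only for illustration.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from typing import Any, Dict


@dataclass
class SketchTask:
    """Stdlib stand-in for the tutorial's Task model (an assumption of this sketch)."""
    agent_type: str
    payload: Dict[str, Any]
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    retries: int = 0
    max_retries: int = 3
    status: str = "pending"


def to_stream_fields(task: SketchTask) -> Dict[str, str]:
    """Flatten a task into the string-to-string map a stream entry can hold."""
    fields = asdict(task)
    fields["payload"] = json.dumps(fields["payload"])  # nested dict -> JSON text
    return {key: str(value) for key, value in fields.items()}


def from_stream_fields(fields: Dict[str, str]) -> SketchTask:
    """Rebuild a task from the strings a stream read hands back."""
    return SketchTask(
        task_id=fields["task_id"],
        agent_type=fields["agent_type"],
        payload=json.loads(fields["payload"]),
        retries=int(fields["retries"]),
        max_retries=int(fields["max_retries"]),
        status=fields["status"],
    )
```

Because both directions live side by side, a unit test can check that a task survives the trip unchanged before any Redis server is involved.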

Step 3: The Worker — With Retry Logic

Workers create a consumer group and read new tasks. We’ll add a retry mechanism: if the task fails, it’s re-added to the stream with an incremented `retries` counter and an exponential backoff delay.

python
import asyncio
import random

async def worker(worker_id: str, stream: str = "agent_tasks", group: str = "agent_group"):
    # Create the consumer group; id="0" also delivers tasks queued before the group existed
    try:
        await redis_client.xgroup_create(stream, group, id="0", mkstream=True)
    except aioredis.ResponseError as exc:
        if "BUSYGROUP" not in str(exc):
            raise  # anything other than "group already exists" is a real error

    consumer_name = f"worker_{worker_id}"

    while True:
        # ">" asks only for new messages, never delivered to another consumer
        results = await redis_client.xreadgroup(
            group, consumer_name, {stream: ">"}, count=1, block=5000
        )

        if not results:
            continue  # block=5000 already waited up to 5 seconds

        for stream_name, messages in results:
            for msg_id, msg_data in messages:
                fields = dict(msg_data)
                if isinstance(fields.get("payload"), str):
                    # The payload travels through the stream as JSON text
                    fields["payload"] = json.loads(fields["payload"])
                task = Task(**fields)
                print(f"[{consumer_name}] Processing task {task.task_id}")

                # Mark as running (in memory only)
                task.status = "running"

                # Execute; treat an exception like a failed attempt
                try:
                    success = await execute_task(task)
                except Exception as exc:
                    print(f"Task {task.task_id} raised {exc!r}")
                    success = False

                if success:
                    # Optionally store result in another stream or Redis hash
                    await redis_client.hset(f"task_result:{task.task_id}", mapping={"status": "completed"})
                else:
                    # Retry logic
                    task.retries += 1
                    if task.retries <= task.max_retries:
                        backoff = 2 ** task.retries  # exponential: 2, 4, 8 seconds
                        print(f"Retrying task {task.task_id} in {backoff}s (attempt {task.retries})")
                        await asyncio.sleep(backoff)
                        await dispatch_task(task)
                    else:
                        print(f"Task {task.task_id} failed after {task.max_retries} retries")
                        await redis_client.hset(f"task_result:{task.task_id}", mapping={"status": "failed"})

                # Acknowledge only after the outcome is recorded, so a crash
                # mid-task leaves the message in the pending list for recovery
                await redis_client.xack(stream, group, msg_id)

async def execute_task(task: Task) -> bool:
    # Simulate agent processing
    # In real code, call an LLM or a microservice
    await asyncio.sleep(1)
    # Simulate random failure (50% chance)
    if random.random() < 0.5:
        print(f"Task {task.task_id} failed")
        return False
    return True

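Notice the exponential backoff. Don’t retry immediately, or you’ll swamp a service that is already struggling. Wait 2 seconds, then 4, then 8. Once `max_retries` is exhausted, the task is marked as failed. Keep in mind that the worker itself waits during the backoff, so it picks up no other tasks in that time. We store results in a Redis hash so other agents can check them.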
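The delay calculation can be pulled into its own function, which makes it easy to test and to tune. The sketch below reproduces the 2, 4, 8 progression and adds two optional extras that are not part of the worker above: an upper cap and a random jitter so that many workers do not retry at the same instant. `backoff_delay` and its parameters are hypothetical names and defaults chosen for illustration.

```python
import random


def backoff_delay(attempt: int, base: float = 2.0, cap: float = 60.0,
                  jitter: float = 0.25, rng=random) -> float:
    """Return the delay in seconds before retry number `attempt` (1-based)."""
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    delay = min(cap, base ** attempt)  # 2, 4, 8, ... up to the cap
    spread = delay * jitter            # jitter=0.25 means plus or minus 25%
    return delay + rng.uniform(-spread, spread)
```

With `jitter=0` the function returns exactly the delays used in the worker; with the default it spreads each delay by up to a quarter in either direction.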

Step 4: Circuit Breaker — Protecting Downstream Services

A circuit breaker prevents your agents from calling a failing service repeatedly. Here’s a simple implementation using Redis counters:

python
class CircuitBreaker:
    def __init__(self, redis_client, name: str, failure_threshold: int = 5, recovery_timeout: int = 30):
        self.redis = redis_client
        self.name = f"circuit_breaker:{name}"
        self.threshold = failure_threshold
        self.timeout = recovery_timeout

    async def is_open(self) -> bool:
        # The "open" key expires after recovery_timeout, which closes the circuit again.
        # Comparing with a str relies on the client using decode_responses=True.
        return await self.redis.get(self.name) == "open"

    async def record_failure(self):
        failures_key = f"{self.name}:failures"
        count = await self.redis.incr(failures_key)
        if count == 1:
            # Start the 60-second window on the first failure only; refreshing
            # the expiry on every failure would stretch the window indefinitely
            await self.redis.expire(failures_key, 60)
        if count >= self.threshold:
            await self.redis.set(self.name, "open", ex=self.timeout)

    async def record_success(self):
        # A success resets the failure count and closes the circuit
        await self.redis.delete(f"{self.name}:failures", self.name)

In your `execute_task` function, wrap the external call:

python
async def execute_task(task: Task) -> bool:
    breaker = CircuitBreaker(redis_client, "analyzer_service")
    if await breaker.is_open():
        print("Circuit open, skipping")
        return False

    try:
        # call_external_service is a placeholder for your LLM or microservice call
        await call_external_service(task.payload)
    except Exception as exc:
        await breaker.record_failure()
        print(f"Task {task.task_id} failed: {exc}")
        return False  # report failure so the worker's retry logic takes over

    await breaker.record_success()
    return True

This way, if the service fails 5 times in 60 seconds, all subsequent calls are short-circuited for 30 seconds. Your agents don’t waste time and resources. Note that a short-circuited call returns `False`, so it still uses up one of the task’s retries.
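The breaker's transitions are easier to reason about, and to unit test, when the same rules run in process memory with a controllable clock. The sketch below is an approximation of the Redis version rather than a drop-in replacement: `InMemoryBreaker` and the `clock` parameter are hypothetical, and unlike the Redis keys it clears the failure count when the recovery timeout elapses.

```python
import time
from typing import Callable, Optional


class InMemoryBreaker:
    """Threshold, window, and timeout rules kept in memory for testing."""

    def __init__(self, failure_threshold: int = 5, window: float = 60.0,
                 recovery_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.threshold = failure_threshold
        self.window = window
        self.timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.window_start = 0.0
        self.opened_at: Optional[float] = None  # None means the circuit is closed

    def is_open(self) -> bool:
        if self.opened_at is None:
            return False
        if self.clock() - self.opened_at >= self.timeout:
            # Recovery timeout elapsed: close again, like the "open" key expiring
            self.opened_at = None
            self.failures = 0
            return False
        return True

    def record_failure(self) -> None:
        now = self.clock()
        if self.failures == 0 or now - self.window_start >= self.window:
            # Start a fresh counting window
            self.failures = 0
            self.window_start = now
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = now

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None
```

Passing a fake clock (for example `clock=lambda: now[0]`) lets a test step through the 30-second recovery timeout without actually waiting.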

Step 5: State Recovery — Handling Crashes

What if a worker crashes mid-task? The unacknowledged message remains in the stream’s pending list. On restart, the worker can claim pending messages.

Add this recovery at the start of your worker:

python
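async def recover_pending(worker_id: str, stream: str, group: str):
    """Claim pending messages idle for more than 10 seconds and return them."""
    consumer_name = f"worker_{worker_id}"
    pending = await redis_client.xpending_range(stream, group, min="-", max="+", count=10)
    recovered = []
    for msg in pending:
        # XCLAIM only succeeds once the message has been idle for min_idle_time ms,
        # so tasks that another worker is still processing are left alone.
        # Our own stale messages from before a restart are claimed as well.
        claimed = await redis_client.xclaim(
            stream, group, consumer_name, min_idle_time=10000, message_ids=[msg["message_id"]]
        )
        for msg_id, fields in claimed:
            if msg_id is None:
                continue  # the entry was deleted from the stream in the meantime
            print(f"Recovered message {msg_id}")
            recovered.append((msg_id, fields))
    return recovered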

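Run this before entering the main loop. Claiming only transfers ownership, so the claimed messages still have to be executed and acknowledged like new ones. The 10-second idle threshold avoids stealing tasks that are still being processed; set it above your longest expected task duration, backoff included.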
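Deciding which pending entries to claim is plain filtering, so it can be tested without a Redis server. The sketch below assumes entries shaped like the dictionaries redis-py returns from `xpending_range` (keys `message_id`, `consumer`, `time_since_delivered`, `times_delivered`). `claimable_ids` and the `max_deliveries` cutoff are hypothetical additions, the latter to stop re-claiming a message that keeps crashing its worker.

```python
from typing import Any, Dict, List


def claimable_ids(pending: List[Dict[str, Any]], min_idle_ms: int = 10_000,
                  max_deliveries: int = 5) -> List[str]:
    """Pick message IDs that look abandoned and are still worth retrying."""
    ids = []
    for entry in pending:
        idle_long_enough = entry["time_since_delivered"] >= min_idle_ms
        below_delivery_limit = entry["times_delivered"] < max_deliveries
        if idle_long_enough and below_delivery_limit:
            ids.append(entry["message_id"])
    return ids
```

The returned IDs could then be passed to `XCLAIM` in one call; entries over the delivery limit are better routed to a failed-task record than retried again.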

Putting It All Together

Here’s how you start the workers:

python
async def main():
    # Start 3 workers
    workers = [worker(str(i)) for i in range(3)]  # worker_id is annotated as str
    await asyncio.gather(*workers)

asyncio.run(main())

And to dispatch a few tasks, run this from a separate script or process, since `main()` above never returns:

python
async def seed():
    tasks = [
        Task(agent_type="analyzer", payload={"url": "https://example.com"}),
        Task(agent_type="summarizer", payload={"text": "AI agents..."}),
    ]
    for t in tasks:
        await dispatch_task(t)

asyncio.run(seed())
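Before pointing workers at a real Redis instance, it can help to dry-run the retry flow in memory. The sketch below swaps the stream for an `asyncio.Queue` and leaves out backoff, acknowledgment, and the circuit breaker, so it only illustrates the requeue-until-`max_retries` logic. `run_in_memory` and the `handler` signature are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


async def run_in_memory(task_ids: List[str],
                        handler: Callable[[str, int], Awaitable[bool]],
                        max_retries: int = 3, workers: int = 2) -> Dict[str, str]:
    """Process tasks from an asyncio.Queue, requeueing failures up to max_retries."""
    queue: asyncio.Queue = asyncio.Queue()
    results: Dict[str, str] = {}
    for task_id in task_ids:
        queue.put_nowait((task_id, 0))  # (task_id, retries so far)

    async def worker() -> None:
        while True:
            task_id, retries = await queue.get()
            try:
                if await handler(task_id, retries):
                    results[task_id] = "completed"
                elif retries < max_retries:
                    queue.put_nowait((task_id, retries + 1))  # try again later
                else:
                    results[task_id] = "failed"
            finally:
                queue.task_done()

    pool = [asyncio.create_task(worker()) for _ in range(workers)]
    await queue.join()  # returns once every item, retries included, is done
    for w in pool:
        w.cancel()
    await asyncio.gather(*pool, return_exceptions=True)
    return results
```

A handler that fails a fixed number of times per task makes the outcome deterministic, which is convenient for checking that tasks end up either completed or failed and never lost.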

Why This Matters

We’ve seen countless teams in Ho Chi Minh City and Can Tho build multi-agent systems that work in staging but fall over in production. The difference is always resilience. At ECOAAI, our Vietnamese developers apply these patterns using our ACP orchestration platform to deliver 5x efficiency — not because the agents are smarter, but because they don’t stop when something goes wrong.

You don’t need a massive framework. A few hundred lines of Python with Redis and a circuit breaker can handle production loads. The beauty? It scales horizontally. Add more workers to the consumer group and each one takes its share of the tasks, so no single worker is a point of failure.

Frequently Asked Questions

Q: Why Redis Streams over simple Redis lists?

A: Streams support consumer groups, which let multiple workers process tasks without duplication. With lists, you’d need `BRPOP` and manual handling of failures — streams give you built-in message acknowledgment and pending list management.

Q: How do I persist task results long-term?

A: Use Redis hashes for short-term storage (results that other agents need within seconds). For long-term persistence, write results to PostgreSQL or a data lake. In our ECOA AI Platform ACP, we use PostgreSQL for durable state and Redis for fast queue operations.

Q: Can I run this with multiple Python processes?

A: Absolutely. Each worker runs independently. Redis handles concurrency through the stream’s consumer group mechanism. Just ensure all workers use the same group name and different consumer names.
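One way to get distinct consumer names across processes and machines is to derive them from the hostname and process ID. This is a small sketch under that assumption; `make_consumer_name` is a hypothetical helper, not part of the worker above.

```python
import os
import socket


def make_consumer_name(prefix: str = "worker") -> str:
    """Build a consumer name that differs per host and per process."""
    return f"{prefix}_{socket.gethostname()}_{os.getpid()}"
```

The trade-off is that a restarted process gets a new name, so messages left pending under the old name are only picked up through the claim-based recovery from Step 5.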

Q: What about monitoring?

A: Use `XLEN` to check queue depth, `XPENDING` for stalled messages, and track circuit breaker states in Redis. Integrate with Prometheus for alerts when circuit breakers open or queue sizes grow.
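As a rough sketch of the Prometheus side, the numbers gathered from `XLEN`, `XPENDING`, and the breaker keys can be rendered as gauges in the Prometheus text exposition format. The function name and the metric names (`agent_queue_depth`, `agent_queue_pending`, `agent_circuit_open`) are hypothetical; collecting the values from Redis is left out.

```python
from typing import Dict


def render_metrics(queue_depth: int, pending: int, open_breakers: Dict[str, bool]) -> str:
    """Render queue and breaker gauges in the Prometheus text exposition format."""
    lines = [
        "# TYPE agent_queue_depth gauge",
        f"agent_queue_depth {queue_depth}",
        "# TYPE agent_queue_pending gauge",
        f"agent_queue_pending {pending}",
        "# TYPE agent_circuit_open gauge",
    ]
    for name in sorted(open_breakers):
        # 1 means the breaker for this service is open, 0 means closed
        lines.append(f'agent_circuit_open{{service="{name}"}} {int(open_breakers[name])}')
    return "\n".join(lines) + "\n"
```

Serving this string from a small HTTP endpoint would let Prometheus scrape it, and alert rules can then fire when `agent_circuit_open` is 1 or the depth keeps growing.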
