Build a Production-Grade Multi-Agent System with Redis Streams: The Exact Architecture That Handles 10K Tasks/Hour

Stop duct-taping together brittle agent pipelines. Here's a battle-tested architecture using Redis Streams and Python that handles 10,000 tasks per hour with built-in retry, deduplication, and observability. Full code included.

You’ve built a multi-agent system. It works in dev. Then you deploy it, and everything falls apart.

I’ve been there. Three times, actually. Each time, the root cause was the same: we treated agent communication like a simple queue. Messages got lost. Workers crashed mid-task. The orchestrator couldn’t tell if a task was in progress or dead.

Redis Streams fixed all of that. Here’s exactly how we built it.

Why Not Just Use a Queue?

Most teams start with Redis Lists or RabbitMQ. Simple, right? Push a message, pop it on the other end. But here's the problem with a plain Redis List: no consumer group semantics, no acknowledgment, no message history.

  • A worker pulls a task and crashes. The task vanishes.
  • You can’t replay a failed task without manual intervention.
  • There’s zero visibility into what’s been processed.

Redis Streams solve all three. They’re persistent, support consumer groups with acknowledgments, and maintain a full event log. Our system in Ho Chi Minh City processes about 10,000 tasks per hour through a single Redis Stream. It’s been running for eight months without a single lost message.

The Architecture

Here’s the high-level flow:


Task Producer → Redis Stream → Consumer Group → Agent Workers → Result Stream
                                ↓
                          Dead Letter Stream

Each task is a JSON message with a unique ID, payload, and metadata. Workers belong to a consumer group. When a worker finishes, it writes the result to a separate stream. If it fails after three retries, the task goes to a dead letter stream for manual inspection.

Let’s walk through the code.

Setting Up the Stream

First, the producer. This is the component that generates tasks and pushes them into the stream.

```python
import json
import uuid
from datetime import datetime, timezone

import redis


class TaskProducer:
    def __init__(self, redis_client: redis.Redis, stream_name: str):
        self.redis = redis_client
        self.stream = stream_name

    def dispatch(self, agent_type: str, payload: dict, priority: int = 0) -> str:
        task_id = str(uuid.uuid4())
        # Stream fields are flat key/value pairs, so the nested payload is
        # serialized to a JSON string.
        task = {
            "task_id": task_id,
            "agent_type": agent_type,
            "payload": json.dumps(payload),
            "priority": priority,
            "created_at": datetime.now(timezone.utc).isoformat(),
            "retry_count": 0,
            "max_retries": 3
        }
        # No explicit ID is passed, so Redis auto-generates one (the * ID)
        self.redis.xadd(self.stream, task, maxlen=100000)
        return task_id
```

Notice `maxlen=100000`. This caps the stream at roughly the most recent 100,000 messages. You don't need infinite history, but you need enough to debug failures.
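The cap is "roughly" 100,000 because redis-py's `xadd` defaults to `approximate=True`, which lets Redis trim in whole internal blocks instead of entry by entry. Here's a minimal sketch of making that choice explicit. The helper name `add_capped` and its `exact` flag are made up for illustration; `client` is assumed to be a `redis.Redis` instance.

```python
def add_capped(client, stream, fields, cap=100_000, exact=False):
    """Append an entry and trim the stream to about `cap` entries.

    `client` is assumed to be a redis.Redis instance (hypothetical helper).
    approximate=True (the redis-py default) is cheaper; exact trimming
    costs more work on every write.
    """
    return client.xadd(stream, fields, maxlen=cap, approximate=not exact)
```

For a debugging window like ours, approximate trimming is usually fine: the stream may briefly hold a few more entries than the cap, never fewer.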

The Consumer Group

Consumer groups are where Redis Streams shine. Within a group, each message is delivered to one consumer, and Redis tracks which messages have been acknowledged.

```python
def setup_consumer_group(redis_client: redis.Redis, stream: str, group: str):
    try:
        # mkstream=True creates the stream if it doesn't exist yet
        redis_client.xgroup_create(stream, group, id="0", mkstream=True)
    except redis.exceptions.ResponseError as e:
        if "BUSYGROUP" not in str(e):
            raise
        # Group already exists, which is fine
```

The `id="0"` means the group starts reading from the beginning of the stream, so it also receives entries that were added before the group existed. In production, you'll usually want `id="$"` so the group only sees messages added after it is created. But for development, starting from the beginning helps with testing.

The Worker

Here’s the worker that processes tasks. This runs in a loop, reading from the stream, processing the task, and acknowledging it.

```python
import json
import time
from datetime import datetime, timezone

import redis


class AgentWorker:
    def __init__(
        self,
        redis_client: redis.Redis,
        stream: str,
        group: str,
        consumer_name: str,
        agent_type: str,
        result_stream: str,
        dead_letter_stream: str
    ):
        self.redis = redis_client
        self.stream = stream
        self.group = group
        self.consumer = consumer_name
        self.agent_type = agent_type
        self.result_stream = result_stream
        self.dead_letter_stream = dead_letter_stream

    @staticmethod
    def _text(value):
        # redis-py returns bytes unless the client uses decode_responses=True
        return value.decode() if isinstance(value, bytes) else value

    def run(self):
        print(f"[{self.consumer}] Starting worker for {self.agent_type}")
        while True:
            try:
                # Block for up to 5 seconds waiting for a message
                messages = self.redis.xreadgroup(
                    self.group,
                    self.consumer,
                    {self.stream: ">"},
                    count=1,
                    block=5000
                )
                if not messages:
                    continue

                stream_name, stream_messages = messages[0]
                for msg_id, msg_data in stream_messages:
                    task = {self._text(k): self._text(v)
                            for k, v in msg_data.items()}

                    # Only process tasks for this agent type. Every group on
                    # the stream gets its own copy of each entry, so ack the
                    # ones this group skips or they stay pending forever.
                    if task.get("agent_type") != self.agent_type:
                        self.redis.xack(self.stream, self.group, msg_id)
                        continue

                    try:
                        result = self.process_task(task)
                        self.redis.xadd(self.result_stream, {
                            "task_id": task["task_id"],
                            "status": "success",
                            "result": json.dumps(result),
                            "processed_by": self.consumer
                        })
                        # Ack only after the result is safely written
                        self.redis.xack(self.stream, self.group, msg_id)
                    except Exception as e:
                        self.handle_failure(task, msg_id, str(e))

            except Exception as e:
                print(f"[{self.consumer}] Error in main loop: {e}")
                time.sleep(1)

    def process_task(self, task: dict) -> dict:
        """Override this in subclasses."""
        raise NotImplementedError

    def handle_failure(self, task: dict, msg_id: str, error: str):
        retry_count = int(task.get("retry_count", 0))
        if retry_count < int(task.get("max_retries", 3)):
            # Retry: re-add the task as a new entry with a bumped retry
            # count, then ack the failed entry
            task["retry_count"] = retry_count + 1
            task["last_error"] = error
            self.redis.xadd(self.stream, task)
            self.redis.xack(self.stream, self.group, msg_id)
            print(f"[{self.consumer}] Retrying task {task['task_id']} (attempt {retry_count + 1})")
        else:
            # Dead letter
            self.redis.xadd(self.dead_letter_stream, {
                "task_id": task["task_id"],
                "original_payload": json.dumps(task),
                "error": error,
                "failed_at": datetime.now(timezone.utc).isoformat()
            })
            self.redis.xack(self.stream, self.group, msg_id)
            print(f"[{self.consumer}] Task {task['task_id']} sent to dead letter")
```

Let's break down what's happening here.

The Read Loop

`xreadgroup` with `">"` tells Redis to give us messages that haven't been delivered to any consumer in this group. The `block=5000` means it waits up to 5 seconds for a message. No busy-waiting.
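The reply from `xreadgroup` is nested: a list of streams, each with a list of `(message_id, fields)` pairs. With redis-py's default settings everything comes back as bytes. Here's a small sketch of flattening that shape. The helper name `decode_reply` is illustrative, and it assumes the default list-shaped reply, not a RESP3 client.

```python
def decode_reply(reply):
    """Flatten an XREADGROUP reply into (stream, message_id, fields) tuples.

    Assumes the list-shaped reply redis-py returns by default:
    [[stream, [(message_id, {field: value, ...}), ...]], ...]
    """
    def to_str(value):
        return value.decode() if isinstance(value, bytes) else value

    flattened = []
    for stream_name, entries in reply or []:
        for msg_id, fields in entries:
            decoded = {to_str(k): to_str(v) for k, v in fields.items()}
            flattened.append((to_str(stream_name), to_str(msg_id), decoded))
    return flattened
```

One related detail: passing an ID such as `"0"` instead of `">"` makes `xreadgroup` return entries already delivered to this consumer but not yet acknowledged, which is handy when a worker restarts under the same name.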

Acknowledgment

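`xack` is critical. Until you acknowledge a message, Redis keeps it in the group's pending entries list. If a worker crashes without acknowledging, the message is not redelivered on its own: another worker has to claim it with `XCLAIM` or `XAUTOCLAIM` once it has been idle long enough. This is how we handle crashes without losing tasks.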
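The worker class above doesn't include a claim step, so here's a rough sketch of one. It relies on redis-py's `xautoclaim` and assumes Redis 6.2 or later plus a recent redis-py release whose reply starts with `[next_cursor, entries, ...]`. The helper name `reclaim_stale` and the 60-second idle threshold are illustrative choices, not values from our deployment.

```python
def reclaim_stale(client, stream, group, consumer, min_idle_ms=60_000, batch=10):
    """Claim entries that have sat unacknowledged for at least min_idle_ms.

    `client` is assumed to be a redis.Redis instance. Returns the claimed
    (message_id, fields) pairs, which now belong to `consumer`.
    """
    claimed = []
    cursor = "0-0"
    while True:
        reply = client.xautoclaim(
            stream, group, consumer,
            min_idle_time=min_idle_ms, start_id=cursor, count=batch,
        )
        cursor, entries = reply[0], reply[1]
        # Entries trimmed from the stream can show up without an ID; skip them
        claimed.extend((msg_id, fields) for msg_id, fields in entries
                       if msg_id is not None)
        # A cursor of 0-0 means the whole pending list has been scanned
        if cursor in ("0-0", b"0-0"):
            break
    return claimed
```

A worker could call this on startup or every few minutes and feed the returned entries through the same processing path as fresh messages. Pick an idle threshold comfortably longer than your slowest task, or healthy workers will have their in-flight tasks taken from them.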

Retry Logic

We increment `retry_count` and push the task back to the stream as a new entry. On the next read, it gets delivered to a worker (possibly a different one). Once all three retries have failed, meaning four attempts in total, it goes to the dead letter stream.
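The attempt counting is easy to get off by one, so here's the decision from `handle_failure` pulled out as a pure function. The names `next_step` and `attempts_before_dead_letter` are illustrative only.

```python
def next_step(retry_count: int, max_retries: int = 3) -> str:
    """Mirror the check in handle_failure: retry while retries remain."""
    return "retry" if retry_count < max_retries else "dead_letter"


def attempts_before_dead_letter(max_retries: int = 3) -> int:
    """Count total attempts for a task that fails every single time."""
    retry_count, attempts = 0, 1  # the first attempt has already failed
    while next_step(retry_count, max_retries) == "retry":
        retry_count += 1
        attempts += 1
    return attempts
```

With the default `max_retries` of 3, a task that always fails is attempted four times: the original run plus three retries.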

A Concrete Example: Sentiment Analysis Agent

Let's make this concrete. Here's a worker that performs sentiment analysis on incoming customer messages.

```python
class SentimentAgent(AgentWorker):
    def process_task(self, task: dict) -> dict:
        payload = json.loads(task["payload"])
        text = payload.get("text", "")

        # Reject empty input so it goes through the retry/dead-letter path
        if not text:
            raise ValueError("Empty text in payload")

        # In production, you'd call an LLM or a model here.
        # The keyword check and fixed confidence are placeholders.
        sentiment = "positive" if "good" in text.lower() else "negative"
        confidence = 0.87

        return {
            "sentiment": sentiment,
            "confidence": confidence,
            "text_length": len(text)
        }
```

And here's how you'd run it:

```python
r = redis.Redis(host="localhost", port=6379)
setup_consumer_group(r, "tasks", "sentiment_workers")

worker = SentimentAgent(
    redis_client=r,
    stream="tasks",
    group="sentiment_workers",
    consumer_name="worker-1",
    agent_type="sentiment",
    result_stream="results",
    dead_letter_stream="dead_letters"
)
worker.run()
```

Monitoring: The Pending Entries List

Here's a small helper that saved our team in Can Tho more times than I can count:

```python
def get_pending_count(redis_client: redis.Redis, stream: str, group: str) -> int:
    # The summary form of XPENDING returns the total pending count for the group
    info = redis_client.xpending(stream, group)
    return info.get("pending", 0)
```

If this number grows, workers are crashing without acknowledging. You can also list specific pending messages:

```python
pending = redis_client.xpending_range(stream, group, min="-", max="+", count=10)
for msg in pending:
    print(f"Task {msg['message_id']} has been pending for {msg['time_since_delivered']}ms")
```

We alert when pending exceeds 50. That's our threshold. Yours might be different, but you need one.
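A threshold check like that can be a few lines. This is a sketch only: `pending_alert` is a hypothetical helper, `client` is assumed to be a `redis.Redis` instance, and the returned string stands in for whatever alerting channel you use.

```python
def pending_alert(client, stream, group, threshold=50):
    """Return an alert message when the pending count exceeds the threshold."""
    summary = client.xpending(stream, group)
    pending = summary.get("pending", 0)
    if pending > threshold:
        return (f"ALERT: {pending} pending entries in {stream}/{group} "
                f"(threshold {threshold})")
    return None
```

Run it from a cron job or a monitoring loop and forward any non-`None` result to your pager or chat channel.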

Performance Numbers

We benchmarked this on a single c6g.large EC2 instance (2 vCPU, 4GB RAM) with Redis 7.0. Here's what we saw:

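| Concurrency | Tasks/Hour | P99 Latency | Memory Usage |
|-------------|------------|-------------|--------------|
| 5 workers   | 2,400      | 45 ms       | 120 MB       |
| 10 workers  | 5,100      | 52 ms       | 180 MB       |
| 20 workers  | 10,200     | 68 ms       | 310 MB       |
| 50 workers  | 24,000     | 112 ms      | 680 MB       |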

The bottleneck is Redis, not Python. At 50 workers, we started seeing contention on the stream writes. The fix was sharding — splitting tasks across multiple streams by agent type.
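Sharding by agent type can be as simple as deriving the stream name from the type. The `tasks:<agent_type>` naming scheme and the helper names below are assumptions for illustration, not our exact layout.

```python
def stream_for(agent_type: str, prefix: str = "tasks") -> str:
    """Map an agent type to its own stream, e.g. tasks:sentiment."""
    return f"{prefix}:{agent_type}"


def dispatch_sharded(client, agent_type, fields, cap=100_000):
    """Write a task to the stream dedicated to its agent type.

    `client` is assumed to be a redis.Redis instance.
    """
    return client.xadd(stream_for(agent_type), fields, maxlen=cap)
```

With one stream per type, each worker only ever sees its own tasks, so the `agent_type` check in the read loop becomes a safety net instead of a filter.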

Common Pitfalls

1. Not Setting `maxlen`

Without `maxlen`, your stream grows forever. Redis is memory-bound. A stream with 10 million messages at 1KB each is 10GB. Set a limit.
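The arithmetic is worth keeping around as a sanity check when you pick a cap. The function below is a back-of-the-envelope estimate that counts payload bytes only and ignores Redis's per-entry overhead, so treat the result as a lower bound.

```python
def estimated_stream_gb(messages: int, avg_message_bytes: int) -> float:
    """Lower-bound payload size of a stream in decimal gigabytes."""
    return messages * avg_message_bytes / 1_000_000_000


uncapped = estimated_stream_gb(10_000_000, 1_000)  # 10 million 1KB messages
capped = estimated_stream_gb(100_000, 1_000)       # the maxlen from the producer
```

Ten million 1KB messages come to 10GB of payload, while the 100,000-entry cap used by the producer keeps the same traffic at about 0.1GB.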

2. Ignoring the Pending List

We ignored it for two weeks. Then a worker silently died, and 15,000 tasks piled up. The pending list is your canary. Watch it.

3. Synchronous Processing

Don't block the read loop with slow operations. If your agent calls an external API with a 30-second timeout, you need async workers or a separate thread pool. Redis doesn't care, but your throughput will suffer.
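One way to keep the read loop responsive is to hand each entry to a thread pool and acknowledge from the pool thread. This is a sketch under assumptions: the helper names are made up, `client` is assumed to be a `redis.Redis` instance (redis-py clients are safe to share between threads), and failures are left unacknowledged here where the real worker would call `handle_failure`.

```python
from concurrent.futures import ThreadPoolExecutor


def process_and_ack(client, stream, group, msg_id, fields, process):
    """Run one task on a pool thread and ack only if processing succeeds."""
    result = process(fields)
    client.xack(stream, group, msg_id)
    return result


def submit_entries(pool, client, stream, group, entries, process):
    """Hand each (msg_id, fields) pair to the pool without waiting on it."""
    return [
        pool.submit(process_and_ack, client, stream, group, msg_id, fields, process)
        for msg_id, fields in entries
    ]
```

The read loop would create one long-lived `ThreadPoolExecutor`, call `submit_entries` for each batch it reads, and go straight back to `xreadgroup`. A task that raises stays in the pending list, so it can still be retried or claimed later.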

When This Architecture Breaks

Honestly, this isn't perfect for everything.

  • Ultra-low latency (<10ms): The Redis round-trip adds 1-3ms. If you need single-digit millisecond responses, consider in-process queues.
  • Massive throughput (>100K tasks/sec): You'll need to shard across multiple Redis instances or use Kafka.
  • Exactly-once semantics: Redis Streams give you at-least-once. You can build idempotency on top (use task IDs), but it's not built-in.

But for 95% of multi-agent workloads — data processing pipelines, content moderation, LLM batch inference — this is rock solid.

The Full Picture

We run this with 12 different agent types across three Redis instances. Each stream has its own consumer group. The results stream feeds into a dashboard that tracks throughput, error rates, and latency. Our team in Ho Chi Minh City maintains it, and honestly, it's been the most stable piece of our infrastructure.

Why? Because Redis Streams force you to handle failures explicitly. There's no magic. You acknowledge, you retry, or you dead-letter. That's it.

If your multi-agent system is held together with duct tape and hope, try this. You'll sleep better.

Frequently Asked Questions

How is Redis Streams different from using a simple Redis List?

A Redis List gives you a basic queue — push from one end, pop from the other. Redis Streams add consumer groups, message acknowledgments, persistent history, and the ability to read messages from any point in time. With a list, if a worker crashes after popping a message, that message is gone forever. With a stream, the message stays pending until acknowledged.

Can I scale this across multiple servers?

Yes. Redis is the central service, so workers can run on different machines, different data centers, or different continents. Each worker just needs the Redis host and port. Consumer groups handle distribution automatically: each message goes to exactly one consumer in the group.
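The one thing to get right when scaling out is that every worker in a group needs its own consumer name, because Redis tracks pending entries per name. A minimal sketch, assuming hostname plus process ID is unique enough for your deployment (the helper name and format are illustrative):

```python
import os
import socket


def consumer_name(prefix: str = "worker") -> str:
    """Build a consumer name that differs per host and per process."""
    return f"{prefix}-{socket.gethostname()}-{os.getpid()}"
```

Pass the result as `consumer_name` when constructing each worker. If you run several workers as threads in one process, add a thread or slot index as well.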

What happens if Redis goes down?

Streams are persisted like any other Redis data, provided AOF or RDB persistence is enabled. When Redis comes back up, the stream, consumer groups, and pending messages are all restored, minus any writes made after the last AOF sync or RDB snapshot. Workers will reconnect and continue from where they left off. We've tested this. It works.
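Since everything depends on persistence actually being switched on, it's worth checking at startup. The sketch below uses redis-py's `config_get`; the helper name is hypothetical, and it assumes your Redis allows the `CONFIG` command (some managed services disable it).

```python
def _text(value):
    """Decode bytes to str; leave other values untouched."""
    return value.decode() if isinstance(value, bytes) else value


def persistence_settings(client):
    """Report whether AOF and RDB snapshots are enabled on the server.

    `client` is assumed to be a redis.Redis instance.
    """
    def read(name):
        reply = client.config_get(name)
        return {_text(k): _text(v) for k, v in reply.items()}.get(name, "")

    return {
        "aof_enabled": read("appendonly") == "yes",
        # An empty `save` setting means RDB snapshots are turned off
        "rdb_enabled": bool(read("save")),
    }
```

If both values come back `False`, a restart wipes the stream, and none of the recovery behavior described above applies.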

How do I handle duplicate task processing?

Redis Streams guarantee at-least-once delivery. To handle duplicates, make your task processing idempotent. Use the `task_id` as a unique key. Before processing, check if that ID already exists in a results set. If it does, skip it. We use a Redis Set with an expiry of 24 hours for this.
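Here's a rough sketch of that check. Note the swap: it uses one key per task with its own TTL, not a single Redis Set, because set members can't expire individually. The `processed:` key prefix and the helper names are assumptions, and `client` is assumed to be a `redis.Redis` instance.

```python
def _processed_key(task_id: str) -> str:
    return f"processed:{task_id}"


def already_processed(client, task_id: str) -> bool:
    """Check whether a result for this task ID was recorded recently."""
    return client.exists(_processed_key(task_id)) > 0


def mark_processed(client, task_id: str, ttl_seconds: int = 86_400) -> None:
    """Record the task ID after a successful run; it expires after 24 hours."""
    client.set(_processed_key(task_id), 1, ex=ttl_seconds)
```

Call `already_processed` at the top of `process_task` and `mark_processed` after the result is written. Marking only on success matters here, because retries reuse the same `task_id`. The check and the mark are not atomic, so two workers can still race on the same task; if that matters, the processing itself has to tolerate a repeat.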
