Build a Production-Ready Python Caching Layer with Redis: A Step-by-Step Developer Tutorial
You know that feeling. Your API response times creep up. The database CPU spikes during peak hours. Someone yells “it’s slow” in Slack.
Adding a caching layer is the obvious fix. But slapping `redis.set()` and `redis.get()` around your queries isn’t enough. In production, a poorly designed cache is worse than no cache at all.
I’ve seen teams in Ho Chi Minh City and Can Tho build caching layers that handle millions of requests daily. The difference between a cache that works and one that burns down your infrastructure comes down to a few critical patterns.
Let’s build a real one.
What We’re Building
A production-grade Python caching layer that handles:
- Connection pooling (no, you don’t open a new connection per request)
- Serialization (Redis stores bytes, not Python objects)
- TTL strategies (different data needs different expiration)
- Cache invalidation (famously one of the two hard things in computer science)
- Graceful degradation (Redis goes down? Your app shouldn’t)
Here’s the stack:
- Python 3.11+
- Redis 7.x
- `redis-py` (the official client)
- `pydantic` v2 for serialization (the code uses v2's `model_dump_json()` and `model_validate_json()`)
Step 1: The Connection Pool
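Don't open a new Redis connection, or build a client with its own connection pool, for every request. That's how you exhaust file descriptors and crash your app. Create one pool per process and let every client share it.
```python
import threading
from typing import Optional

import redis


class RedisPool:
    """Process-wide Redis connection pool with thread-safe lazy initialization."""

    _pool: Optional[redis.ConnectionPool] = None
    _lock = threading.Lock()

    @classmethod
    def get_pool(cls, host: str = "localhost", port: int = 6379,
                 db: int = 0, max_connections: int = 20) -> redis.ConnectionPool:
        # The pool is created once; arguments on later calls are ignored
        if cls._pool is None:
            with cls._lock:
                # Re-check inside the lock so two threads can't both create a pool
                if cls._pool is None:
                    cls._pool = redis.ConnectionPool(
                        host=host,
                        port=port,
                        db=db,
                        max_connections=max_connections,
                        decode_responses=False,  # We'll handle decoding ourselves
                        socket_connect_timeout=2,
                        socket_timeout=5,
                        retry_on_timeout=True,
                        health_check_interval=30,
                    )
        return cls._pool

    @classmethod
    def get_client(cls) -> redis.Redis:
        # Clients are cheap: they all borrow connections from the shared pool
        return redis.Redis(connection_pool=cls.get_pool())
```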
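Why this matters:
- `max_connections=20` caps how many connections this process can open, so a traffic spike or a leak can't exhaust file descriptors or the server's client limit. With the default `ConnectionPool`, asking for a 21st connection raises `ConnectionError` instead of opening one.
- `health_check_interval=30` sends a `PING` before using a connection that has been idle for more than 30 seconds, so dead connections are detected before your command goes out.
- `retry_on_timeout=True` retries a command after a socket timeout instead of failing immediately, which smooths over transient network issues.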
We’ve seen teams in Can Tho skip connection pooling entirely. Their apps crashed under 500 concurrent users. Don’t be that team.
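One nuance about the cap: with redis-py's default `ConnectionPool`, requesting more connections than `max_connections` fails immediately, while `redis.BlockingConnectionPool` makes the caller wait up to a timeout for a connection to be returned. The toy pool below is a stdlib-only sketch of those two behaviors, not redis-py's implementation; `ToyBoundedPool` and its methods are hypothetical names for illustration.

```python
import threading


class ToyBoundedPool:
    """Hypothetical stand-in that mimics a capped connection pool."""

    def __init__(self, max_connections: int, blocking: bool, timeout: float = 0.1):
        self._slots = threading.BoundedSemaphore(max_connections)
        self._blocking = blocking
        self._timeout = timeout

    def acquire(self) -> object:
        if self._blocking:
            # Like BlockingConnectionPool: wait up to `timeout` for a free slot
            ok = self._slots.acquire(timeout=self._timeout)
        else:
            # Like the default ConnectionPool: fail immediately at the cap
            ok = self._slots.acquire(blocking=False)
        if not ok:
            raise ConnectionError("Too many connections")
        return object()  # Placeholder for a real socket connection

    def release(self, conn: object) -> None:
        self._slots.release()


pool = ToyBoundedPool(max_connections=2, blocking=False)
a, b = pool.acquire(), pool.acquire()
try:
    pool.acquire()
except ConnectionError as exc:
    print("third acquire failed:", exc)
pool.release(a)
c = pool.acquire()  # A slot is free again
```

Failing fast surfaces pool exhaustion as an error you can log; blocking trades that for added latency under load. Which one you want depends on whether a short wait is acceptable for your requests.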
Step 2: Serialization with Pydantic
Redis stores strings and bytes. Your Python objects need to serialize cleanly.
```python
import json
from typing import Any, TypeVar

from pydantic import BaseModel

T = TypeVar('T', bound=BaseModel)


class CacheSerializer:
    """Handles serialization with type safety."""

    @staticmethod
    def serialize(data: Any) -> bytes:
        """Serialize a Pydantic model or any JSON-compatible data to bytes."""
        if isinstance(data, BaseModel):
            return data.model_dump_json().encode('utf-8')
        # default=str turns types json can't handle (e.g. datetime) into strings
        return json.dumps(data, default=str).encode('utf-8')

    @staticmethod
    def deserialize(data: bytes, model_class: type[T]) -> T:
        """Deserialize bytes back to a Pydantic model."""
        return model_class.model_validate_json(data.decode('utf-8'))
```
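The trick: we use `model_dump_json()` instead of `model_dump()`. `model_dump()` returns a dict that can still contain `datetime` objects, which `json.dumps` rejects; `model_dump_json()` emits JSON directly and handles datetimes and nested models automatically. No more `datetime is not JSON serializable` errors at 2 AM.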
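To see the error `model_dump_json()` avoids, and what the non-model branch of `CacheSerializer.serialize` (`json.dumps(..., default=str)`) does instead, here is a stdlib-only check. It shows that `default=str` makes the write succeed, but the datetime comes back as a plain string, so plain dicts need their datetimes rebuilt by hand after a cache hit.

```python
import json
from datetime import datetime

record = {"user_id": 42, "last_login": datetime(2025, 5, 15, 10, 30)}

# Plain json.dumps rejects datetime objects
try:
    json.dumps(record)
except TypeError as exc:
    print(exc)  # Object of type datetime is not JSON serializable

# default=str calls str() on anything json can't encode
encoded = json.dumps(record, default=str).encode("utf-8")
decoded = json.loads(encoded)
print(decoded["last_login"], type(decoded["last_login"]).__name__)  # 2025-05-15 10:30:00 str

# The type is lost on the way back; rebuild it explicitly if you need it
restored = datetime.fromisoformat(decoded["last_login"])
```

This is one reason to cache Pydantic models rather than loose dicts: `model_validate_json()` parses the string back into a `datetime` for you.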
Step 3: The Core Caching Class
Now let’s tie it together with a caching class that actually thinks about production.
```python
import hashlib
import logging
from functools import wraps
from typing import Any, Callable, Optional

import redis

logger = logging.getLogger(__name__)


class RedisCache:
    """Production-ready Redis cache with fallback."""

    def __init__(self, prefix: str = "app", default_ttl: int = 300):
        self.client = RedisPool.get_client()
        self.prefix = prefix
        self.default_ttl = default_ttl

    def _make_key(self, key: str) -> str:
        """Create a namespaced key to avoid collisions."""
        return f"{self.prefix}:{key}"

    def get(self, key: str) -> Optional[bytes]:
        """Return the raw cached bytes, or None on a miss or a Redis error."""
        try:
            return self.client.get(self._make_key(key))
        except redis.RedisError as e:
            logger.warning("Cache get failed for %s: %s", key, e)
            return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Serialize and store a value with a TTL in seconds."""
        try:
            serialized = CacheSerializer.serialize(value)
            ttl = self.default_ttl if ttl is None else ttl
            return bool(self.client.setex(self._make_key(key), ttl, serialized))
        except redis.RedisError as e:
            logger.warning("Cache set failed for %s: %s", key, e)
            return False

    def delete(self, key: str) -> bool:
        """Invalidate a cache key."""
        try:
            return bool(self.client.delete(self._make_key(key)))
        except redis.RedisError as e:
            logger.warning("Cache delete failed for %s: %s", key, e)
            return False

    def invalidate_pattern(self, pattern: str) -> int:
        """Delete all keys under this prefix that match a glob pattern."""
        try:
            cursor = 0
            deleted = 0
            while True:
                # SCAN walks the keyspace in batches instead of blocking like KEYS
                cursor, keys = self.client.scan(
                    cursor=cursor,
                    match=self._make_key(pattern),
                    count=100,
                )
                if keys:
                    deleted += self.client.delete(*keys)
                if cursor == 0:
                    break
            return deleted
        except redis.RedisError as e:
            logger.warning("Cache pattern invalidation failed: %s", e)
            return 0
```
Notice the `try/except` blocks? When Redis goes down, your app should degrade gracefully, not throw 500s. Log the error, return `None`, and let the caller fall back to the database.
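Here is the caller's side of that contract as a self-contained sketch. `DownClient`, `CacheUnavailable`, `safe_get`, and `load_user_from_db` are hypothetical stand-ins (a real `RedisCache` catches `redis.RedisError`); the point is that a cache error is treated exactly like a miss, so the request still succeeds from the database.

```python
import logging
from typing import Callable, Optional

logger = logging.getLogger("cache_demo")


class CacheUnavailable(Exception):
    """Hypothetical stand-in for redis.RedisError."""


class DownClient:
    """Hypothetical client that fails every call, as if Redis were down."""

    def get(self, key: str) -> Optional[bytes]:
        raise CacheUnavailable("connection refused")


def safe_get(client, key: str) -> Optional[bytes]:
    # Same contract as RedisCache.get(): log the failure and report a miss
    try:
        return client.get(key)
    except CacheUnavailable as exc:
        logger.warning("Cache get failed for %s: %s", key, exc)
        return None


def load_user_from_db(user_id: int) -> bytes:
    return b'{"user_id": %d}' % user_id  # Pretend this queried the database


def get_user(client, user_id: int,
             loader: Callable[[int], bytes] = load_user_from_db) -> bytes:
    cached = safe_get(client, f"user:{user_id}")
    if cached is not None:
        return cached
    return loader(user_id)  # Cache down or empty: fall back to the source of truth


print(get_user(DownClient(), 7))  # b'{"user_id": 7}'
```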
Step 4: The Decorator Pattern
This is where the magic happens. A decorator that caches function results transparently.
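```python
import hashlib
import json
from functools import wraps
from typing import Callable, Optional

from pydantic import BaseModel


def cached(ttl: Optional[int] = None,
           key_builder: Optional[Callable[..., str]] = None,
           model: Optional[type[BaseModel]] = None):
    """Decorator that caches function results in Redis.

    Results must be JSON-compatible or a Pydantic model. Pass `model` so cache
    hits are rebuilt into the same type the function returns on a miss.
    """
    def decorator(func):
        cache = RedisCache()  # One cache object per decorated function

        @wraps(func)
        def wrapper(*args, **kwargs):
            # Build cache key from function name and arguments
            if key_builder:
                cache_key = key_builder(*args, **kwargs)
            else:
                # Create a deterministic key from args; module + qualname
                # keep same-named functions in different modules apart
                key_parts = [func.__module__, func.__qualname__]
                key_parts.extend(str(a) for a in args)
                key_parts.extend(f"{k}:{v}" for k, v in sorted(kwargs.items()))
                digest = hashlib.md5(":".join(key_parts).encode()).hexdigest()
                cache_key = f"{func.__qualname__}:{digest}"

            # Try cache first, decoding so hits return the same type as misses
            cached_data = cache.get(cache_key)
            if cached_data is not None:
                if model is not None:
                    return CacheSerializer.deserialize(cached_data, model)
                return json.loads(cached_data)

            # Cache miss (or Redis unavailable): execute the function
            result = func(*args, **kwargs)
            cache.set(cache_key, result, ttl=ttl)
            return result

        return wrapper
    return decorator
```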
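One caveat with the default key builder: joining `str()` of each argument means `f(1)` and `f("1")` produce the same key, and so do `f("a:b")` and `f("a", "b")`. A stricter builder, assuming all arguments are JSON-serializable, can encode positional and keyword arguments as a single JSON document (keyword order normalized with `sort_keys=True`) before hashing. `make_cache_key` is a hypothetical helper; wrapping it with `functools.partial` gives a callable you could pass as `key_builder`.

```python
import hashlib
import json
from functools import partial


def make_cache_key(name: str, *args, **kwargs) -> str:
    """Hypothetical key builder: type-aware, with order-independent kwargs."""
    # JSON keeps 1 and "1" distinct and keeps argument boundaries explicit
    payload = json.dumps([args, kwargs], sort_keys=True, separators=(",", ":"))
    digest = hashlib.md5(payload.encode("utf-8")).hexdigest()
    return f"{name}:{digest}"


key_builder = partial(make_cache_key, "get_report")
print(key_builder(1) == key_builder("1"))              # False
print(key_builder("a:b") == key_builder("a", "b"))     # False
print(key_builder(x=1, y=2) == key_builder(y=2, x=1))  # True
```

It still treats `f(1)` and `f(x=1)` as different calls; normalizing that would require binding the arguments to the function's signature first.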
Step 5: Real-World Usage
Here’s how you’d use this in a production API:
```python
import time
from typing import Optional

from pydantic import BaseModel


class UserProfile(BaseModel):
    user_id: int
    name: str
    email: str
    last_login: str


cache = RedisCache(prefix="users", default_ttl=600)  # 10-minute default


def get_user_profile(user_id: int) -> Optional[UserProfile]:
    """Get user profile with caching (cache-aside)."""
    # Try cache
    cached_bytes = cache.get(f"profile:{user_id}")
    if cached_bytes is not None:
        return CacheSerializer.deserialize(cached_bytes, UserProfile)

    # Simulate database query (2 seconds - ouch)
    time.sleep(2)
    profile = UserProfile(
        user_id=user_id,
        name="Jane Doe",
        email="jane@example.com",
        last_login="2025-05-15T10:30:00",
    )

    # Cache for next time; the explicit ttl overrides the 10-minute default
    cache.set(f"profile:{user_id}", profile, ttl=300)  # 5 minutes
    return profile


# First call: ~2 seconds (cache miss plus simulated database query)
# Later calls within 5 minutes: one Redis round trip, typically around a millisecond on a local instance
```
Step 6: Cache Invalidation Strategies
Here's where most caching implementations fail. You need a strategy for keeping your cache fresh.
Time-Based Expiration (TTL)
The simplest approach. Set a TTL and let Redis handle cleanup.
```python
# Short TTL for frequently changing data
cache.set("leaderboard:today", data, ttl=60)  # 1 minute

# Long TTL for stable reference data
cache.set("country_codes", data, ttl=86400)  # 24 hours
```
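A common refinement, not part of the original setup: keys written at the same moment with the same TTL (a cache warm-up after a deploy, say) also expire together and send a burst of misses to the database. Adding a small random jitter to each TTL spreads those expirations out. `jittered_ttl` is a hypothetical helper, and the 10% spread is an arbitrary example value.

```python
import random
from typing import Optional


def jittered_ttl(base_ttl: int, spread: float = 0.1,
                 rng: Optional[random.Random] = None) -> int:
    """Return base_ttl shifted by up to +/- spread * base_ttl seconds (minimum 1)."""
    delta = int(base_ttl * spread)
    offset = (rng or random).randint(-delta, delta)  # Inclusive on both ends
    return max(1, base_ttl + offset)


# Hypothetical usage with the cache from Step 3:
# cache.set("country_codes", data, ttl=jittered_ttl(86400))
print([jittered_ttl(60) for _ in range(5)])  # Values between 54 and 66
```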
Write-Through Cache
Update the cache when you write to the database.
```python
def update_user_profile(user_id: int, name: str) -> UserProfile:
    # `db` and `get_user_profile_from_db` stand in for your data-access layer
    # Update database first
    db.execute("UPDATE users SET name = %s WHERE id = %s", (name, user_id))
    # Then overwrite the cache with the freshly committed row
    updated = get_user_profile_from_db(user_id)
    cache.set(f"profile:{user_id}", updated, ttl=300)
    return updated
```
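The ordering matters: write the source of truth first, then overwrite the cached copy, so a failed database write never leaves a value in the cache that the database doesn't have. A self-contained sketch with plain dicts standing in for the database and Redis (`fake_db`, `fake_cache`, `read_profile`, and `update_name` are all hypothetical) shows the next read after an update being a hit with fresh data.

```python
from typing import Dict

fake_db: Dict[int, dict] = {42: {"user_id": 42, "name": "Jane Doe"}}
fake_cache: Dict[str, dict] = {}


def read_profile(user_id: int) -> dict:
    key = f"profile:{user_id}"
    if key in fake_cache:
        return fake_cache[key]  # Hit
    row = dict(fake_db[user_id])  # Miss: load from the "database"
    fake_cache[key] = row
    return row


def update_name(user_id: int, name: str) -> dict:
    fake_db[user_id]["name"] = name  # 1. Write the source of truth first
    updated = dict(fake_db[user_id])  # 2. Re-read the committed row
    fake_cache[f"profile:{user_id}"] = updated  # 3. Overwrite the cached copy
    return updated


read_profile(42)  # Warms the cache with the old name
update_name(42, "Jane Smith")
print(read_profile(42)["name"])  # Jane Smith, served from the cache
```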
Pattern-Based Invalidation
Invalidate all related cache entries when data changes.
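```python
def update_user_email(user_id: int, new_email: str):
    # Update database (`db` stands in for your data-access layer)
    db.execute("UPDATE users SET email = %s WHERE id = %s", (new_email, user_id))
    # Drop the exact key written by get_user_profile(); "profile:42:*" won't match it
    cache.delete(f"profile:{user_id}")
    # Then invalidate any sub-keys for this user, e.g. "profile:42:avatar"
    cache.invalidate_pattern(f"profile:{user_id}:*")
    cache.invalidate_pattern(f"settings:{user_id}:*")
```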
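Redis `MATCH` patterns are globs, so it pays to check what a pattern actually covers before running it against production. Python's `fnmatch.fnmatchcase` applies similar rules for `*` and `?`, which makes it handy for a quick offline check (it is an approximation: character classes and escaping differ in the details). It shows that `profile:42:*` needs a further `:` segment, so it skips the bare `profile:42` key that `get_user_profile()` writes, while dropping the trailing colon over-matches another user.

```python
from fnmatch import fnmatchcase

keys = [
    "users:profile:42",         # Written by get_user_profile()
    "users:profile:42:avatar",  # A hypothetical sub-key
    "users:profile:420",        # A different user
]

for pattern in ("users:profile:42:*", "users:profile:42*"):
    matched = [k for k in keys if fnmatchcase(k, pattern)]
    print(pattern, "->", matched)
# users:profile:42:* -> ['users:profile:42:avatar']
# users:profile:42* -> ['users:profile:42', 'users:profile:42:avatar', 'users:profile:420']
```

Deleting the exact key and the `:*` sub-keys separately covers user 42 without touching user 420.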
Step 7: Monitoring and Observability
You can't improve what you don't measure.
```python
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Optional

import redis


@dataclass
class CacheMetrics:
    hits: int = 0
    misses: int = 0
    errors: int = 0
    # Bounded window so latency samples can't grow memory forever
    latency: deque[float] = field(default_factory=lambda: deque(maxlen=10_000))

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0.0

    @property
    def avg_latency_ms(self) -> float:
        return (sum(self.latency) / len(self.latency) * 1000) if self.latency else 0.0


class MonitoredCache(RedisCache):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.metrics = CacheMetrics()

    def get(self, key: str) -> Optional[bytes]:
        # Call the client directly: RedisCache.get() swallows Redis errors,
        # so wrapping it would silently count outages as misses
        start = time.perf_counter()
        try:
            data = self.client.get(self._make_key(key))
        except redis.RedisError as e:
            self.metrics.errors += 1
            logger.warning("Cache get failed for %s: %s", key, e)
            return None
        finally:
            self.metrics.latency.append(time.perf_counter() - start)
        if data is not None:
            self.metrics.hits += 1
        else:
            self.metrics.misses += 1
        return data
```
Rough production targets (the right numbers depend on your workload and network):
- Hit rate: > 85% for well-designed caches
- Average latency: < 1ms for cache reads from a Redis instance on the same network
- Error rate: < 0.1% (network blips happen)
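An average can look healthy while a small share of reads is slow, so it helps to track tail latency next to `avg_latency_ms`. A minimal sketch, assuming samples are in seconds as `CacheMetrics.latency` stores them, uses `statistics.quantiles` from the standard library; `latency_percentiles_ms` is a hypothetical helper.

```python
import statistics
from typing import Dict, Iterable


def latency_percentiles_ms(samples: Iterable[float]) -> Dict[str, float]:
    """Return p50/p95/p99 in milliseconds from latency samples in seconds."""
    data = list(samples)
    if len(data) < 2:
        return {}  # Older Pythons' quantiles() needs at least two data points
    cuts = statistics.quantiles(data, n=100)  # 99 cut points: cuts[k - 1] ~ p{k}
    return {"p50": cuts[49] * 1000, "p95": cuts[94] * 1000, "p99": cuts[98] * 1000}


# 98 fast reads and 2 slow ones
samples = [0.0005] * 98 + [0.050, 0.080]
print(f"avg={statistics.mean(samples) * 1000:.2f}ms", latency_percentiles_ms(samples))
```

Here the mean comes out around 1.8 ms, which hints that something is off but not what; p50 and p95 stay at 0.5 ms, while p99 lands near 80 ms and points straight at the slow outliers.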
Common Production Gotchas
1. Cache Stampede
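A popular cache key expires, and 100 concurrent requests all miss at once and hit the database simultaneously.
Fix: let a single worker regenerate the value while holding a Redis lock; the others wait briefly and re-read the cache.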
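```python
import json
import time
from typing import Any, Callable

import redis


def get_or_compute(key: str, compute_func: Callable[[], Any], ttl: int = 300,
                   max_wait: float = 5.0) -> Any:
    deadline = time.monotonic() + max_wait
    while True:
        data = cache.get(key)
        if data is not None:
            return json.loads(data)  # Values were stored as JSON by CacheSerializer
        # redis-py's Lock sets the key and its expiry atomically (auto-release after 10 s)
        lock = cache.client.lock(f"lock:{key}", timeout=10)
        try:
            acquired = lock.acquire(blocking=False)
        except redis.RedisError:
            return compute_func()  # Redis is down: compute directly
        if acquired:
            try:
                data = compute_func()
                cache.set(key, data, ttl=ttl)
                return data
            finally:
                try:
                    lock.release()  # Only releases if we still own the lock
                except redis.RedisError:
                    pass  # Lock already expired; nothing left to release
        if time.monotonic() >= deadline:
            return compute_func()  # Waited long enough; stop waiting
        time.sleep(0.1)  # Another worker is recomputing; re-check shortly
```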
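A lock-free alternative, swapped in here rather than taken from the setup above, is probabilistic early recomputation (sometimes called "XFetch"): each reader occasionally refreshes a value *before* it expires, with a probability that rises as expiry approaches, so one early refresher usually repopulates the key before the crowd arrives. The sketch assumes you store, next to each value, its expiry timestamp and how long the last recomputation took (`delta`); `should_recompute_early` is a hypothetical helper, and the injectable `rand` exists only for testing.

```python
import math
import random
import time
from typing import Callable, Optional


def should_recompute_early(expiry: float, delta: float, beta: float = 1.0,
                           now: Optional[float] = None,
                           rand: Callable[[], float] = random.random) -> bool:
    """Decide whether to refresh a cached value before it expires.

    expiry: Unix timestamp at which the cached value expires
    delta:  seconds the last recomputation took (slow values refresh earlier)
    beta:   > 1 favors earlier refreshes, < 1 later ones
    """
    now = time.time() if now is None else now
    u = 1.0 - rand()  # Map [0, 1) to (0, 1] so log() is defined
    # -log(u) is >= 0: a random head start, scaled by delta and beta
    return now - delta * beta * math.log(u) >= expiry


# Far from expiry an early refresh is rare; close to it, it becomes common
rng = random.Random(1)
for seconds_left in (30, 2):
    refreshes = sum(
        should_recompute_early(100.0, delta=1.0, now=100.0 - seconds_left, rand=rng.random)
        for _ in range(10_000)
    )
    print(f"{seconds_left}s before expiry: {refreshes} of 10000 reads refresh")
```

With `delta=1` and `beta=1`, the chance that a read `s` seconds before expiry triggers a refresh is `exp(-s)`: about 13.5% two seconds out and effectively zero thirty seconds out. When it returns `True`, the caller recomputes and rewrites the value even though it hasn't expired yet.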
2. Serialization Overhead
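Pydantic serialization adds overhead to every cache operation, and that overhead grows with payload size and nesting. For high-throughput systems, a compact binary format such as `msgpack` is worth benchmarking. `pickle` is also an option, but only if nothing untrusted can ever write to your Redis: unpickling attacker-controlled bytes can execute arbitrary code.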
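```python
from typing import Any

import msgpack
from pydantic import BaseModel


class FastSerializer:
    @staticmethod
    def serialize(data: Any) -> bytes:
        # Convert models first; default=str would otherwise pack their repr string
        if isinstance(data, BaseModel):
            data = data.model_dump(mode="json")
        return msgpack.packb(data, default=str)

    @staticmethod
    def deserialize(data: bytes) -> Any:
        # Returns plain dicts/lists; rebuild models with Model.model_validate()
        return msgpack.unpackb(data)
```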
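Rather than relying on a fixed per-operation figure, measure the round trip on a payload shaped like your real data. A stdlib-only sketch with `timeit` times the JSON path; `sample` is a hypothetical payload and `per_call_us` a hypothetical helper, and you would swap in `model_dump_json()`/`model_validate_json()` or `msgpack.packb()`/`msgpack.unpackb()` to compare the actual candidates.

```python
import json
import timeit
from typing import Any, Callable, Dict


def per_call_us(func: Callable[[], Any], number: int = 10_000) -> float:
    """Average wall time of one call to func, in microseconds."""
    return timeit.timeit(func, number=number) / number * 1e6


sample: Dict[str, Any] = {
    "user_id": 42,
    "name": "Jane Doe",
    "email": "jane@example.com",
    "last_login": "2025-05-15T10:30:00",
    "tags": ["admin"] * 20,
}
encoded = json.dumps(sample).encode("utf-8")

print(f"json encode: {per_call_us(lambda: json.dumps(sample).encode('utf-8')):.1f} us")
print(f"json decode: {per_call_us(lambda: json.loads(encoded)):.1f} us")
print(f"payload size: {len(encoded)} bytes")
```

Payload size is worth watching alongside speed: smaller values mean less network time per read and less RAM used in Redis.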
3. Memory Management
Redis runs in RAM. Monitor your memory usage.
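```bash
# Check memory usage
redis-cli INFO memory

# Set a memory cap first: the eviction policy only applies once maxmemory is set
# (2gb is an example value; size it for your host)
redis-cli CONFIG SET maxmemory 2gb

# Evict least-recently-used keys when the cap is reached
redis-cli CONFIG SET maxmemory-policy allkeys-lru

# CONFIG SET is lost on restart; persist it in redis.conf (or run CONFIG REWRITE)
```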
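From Python, redis-py's `Redis.info("memory")` returns the same section as a dict, including `used_memory` and `maxmemory` in bytes. A small sketch, assuming that dict shape, turns it into a ratio you can alert on; `memory_pressure` is a hypothetical helper, and the 80% threshold is an arbitrary example.

```python
from typing import Any, Dict, Optional


def memory_pressure(info: Dict[str, Any]) -> Optional[float]:
    """Fraction of maxmemory in use, or None when no limit is configured."""
    limit = int(info.get("maxmemory", 0))
    if limit == 0:
        return None  # maxmemory 0 means "no limit" on 64-bit builds: no eviction
    return int(info["used_memory"]) / limit


# With a live server: info = redis.Redis().info("memory")
info = {"used_memory": 1_900_000_000, "maxmemory": 2_147_483_648}
ratio = memory_pressure(info)
if ratio is not None and ratio > 0.8:
    print(f"Redis memory at {ratio:.0%} of maxmemory")
```

Returning `None` for an unlimited instance keeps that case distinct from "0% used", which is exactly the configuration where `allkeys-lru` never kicks in.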
The Complete Production Setup
Here's everything you need in one file: