Performance Optimization
Reduce latency, improve throughput, and optimize caching in RAG systems.
Overview
Production RAG systems must be fast. This guide covers latency reduction, caching, and throughput optimization.
Latency Breakdown
A typical RAG query breaks down as:
- Embedding: 50-100ms
- Vector search: 50-200ms
- LLM generation: 1000-3000ms
- Total: 1100-3300ms
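LLM generation dominates the total, so caching whole answers yields the largest savings. Before optimizing, it helps to measure where time actually goes in your own pipeline. A minimal sketch, where embed, search, and generate are hypothetical placeholders for your embedding, vector search, and generation stages:

import time

def profile_query(query):
    timings = {}

    start = time.perf_counter()
    query_vector = embed(query)            # assumed embedding step
    timings['embedding_ms'] = (time.perf_counter() - start) * 1000

    start = time.perf_counter()
    docs = search(query_vector)            # assumed vector search step
    timings['search_ms'] = (time.perf_counter() - start) * 1000

    start = time.perf_counter()
    answer = generate(query, docs)         # assumed LLM generation step
    timings['generation_ms'] = (time.perf_counter() - start) * 1000

    return answer, timings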
Caching Strategies
Query Caching
import hashlib

class RAGCache:
    def __init__(self, rag_pipeline):
        # rag_pipeline is the callable that answers a query on a cache miss.
        self.rag_pipeline = rag_pipeline
        self.cache = {}

    def get_cache_key(self, query):
        return hashlib.md5(query.encode()).hexdigest()

    def query(self, query):
        key = self.get_cache_key(query)
        if key in self.cache:
            return self.cache[key]
        result = self.rag_pipeline(query)
        self.cache[key] = result
        return result
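Usage is a thin wrapper around the existing pipeline (my_rag_pipeline is a placeholder for your own pipeline function):

cache = RAGCache(rag_pipeline=my_rag_pipeline)
answer = cache.query("What is the refund policy?")   # miss: runs the pipeline
answer = cache.query("What is the refund policy?")   # hit: returned from cache

Hashing the raw query string makes this an exact-match cache: any change in wording is a miss, which is what motivates the semantic cache below.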
Embedding Caching
from functools import lru_cache

@lru_cache(maxsize=10000)
def embed_cached(text):
    # Assumes embedding_model (e.g., a SentenceTransformer) is defined elsewhere.
    return embedding_model.encode(text)
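lru_cache also tracks hit statistics, which is a quick way to check whether the cache is earning its memory:

embed_cached("What is the refund policy?")
embed_cached("What is the refund policy?")   # served from cache
print(embed_cached.cache_info())             # e.g. CacheInfo(hits=1, misses=1, maxsize=10000, currsize=1)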
Semantic Caching
import numpy as np

class SemanticCache:
    def __init__(self, similarity_threshold=0.95):
        self.cache_vectors = []
        self.cache_results = []
        self.threshold = similarity_threshold

    def get(self, query):
        # Assumes an embed(text) -> np.ndarray helper is defined elsewhere.
        query_vector = embed(query)
        for i, cached_vector in enumerate(self.cache_vectors):
            similarity = np.dot(query_vector, cached_vector) / (
                np.linalg.norm(query_vector) * np.linalg.norm(cached_vector)
            )
            if similarity > self.threshold:
                return self.cache_results[i]
        return None

    def set(self, query, result):
        # Store the result so later near-duplicate queries can reuse it.
        self.cache_vectors.append(embed(query))
        self.cache_results.append(result)
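A sketch of how the cache slots into the query path, falling back to the full pipeline on a miss (rag_pipeline is an assumed placeholder):

semantic_cache = SemanticCache(similarity_threshold=0.95)

def cached_rag(query):
    cached = semantic_cache.get(query)
    if cached is not None:
        return cached
    result = rag_pipeline(query)          # assumed full RAG pipeline
    semantic_cache.set(query, result)
    return result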
Parallel Processing
from concurrent.futures import ThreadPoolExecutor

def parallel_retrieve(queries):
    # Fan retrieval calls out across threads; assumes retrieve(query) is defined elsewhere.
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(executor.map(retrieve, queries))
    return results
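For example, issuing several retrieval calls concurrently instead of one after another (the queries here are illustrative):

sub_queries = [
    "What is the refund window?",
    "Which products are non-refundable?",
    "Who approves refunds over $500?",
]
results = parallel_retrieve(sub_queries)   # retrievals overlap instead of running sequentially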
Batch Operations
# Bad: one embedding call per query (e.g., 100 API calls for 100 queries)
for query in queries:
    embed(query)

# Good: a single batched call
embeddings = embedding_model.encode(queries)  # 1 API call
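If the input is larger than the provider's per-request limit, you can still batch in fixed-size chunks. A sketch, assuming a hypothetical limit of 100 texts per call (check your provider's actual limit):

def embed_in_batches(texts, batch_size=100):
    embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        embeddings.extend(embedding_model.encode(batch))
    return embeddings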
Database Optimization
# Add an HNSW index on the vector field
# (illustrative pseudocode; see the Qdrant example below for a concrete API)
vector_db.create_index(
    field="vector",
    index_type="HNSW",
    params={"M": 16, "efConstruction": 200}
)
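With Qdrant specifically (used in the connection example below), HNSW parameters are set when the collection is created. A sketch, assuming a 384-dimensional embedding model and a hypothetical collection named "documents":

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, HnswConfigDiff, VectorParams

client = QdrantClient(host="localhost", port=6333)
client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(size=384, distance=Distance.COSINE),
    hnsw_config=HnswConfigDiff(m=16, ef_construct=200),
)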
# Use connection pooling and gRPC
from qdrant_client import QdrantClient

client = QdrantClient(
    host="localhost",
    port=6333,
    timeout=60,
    prefer_grpc=True  # 2-3x faster than HTTP
)
Async/Await
import asyncio

async def async_rag(query):
    # Run retrieval and query embedding concurrently
    # (assumes retrieve_async, embed_async, and generate_async coroutines exist).
    docs_task = asyncio.create_task(retrieve_async(query))
    embed_task = asyncio.create_task(embed_async(query))
    docs, query_embedding = await asyncio.gather(docs_task, embed_task)

    # Generate the answer from the retrieved documents
    answer = await generate_async(query, docs)
    return answer
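Calling it from synchronous code is a one-liner:

answer = asyncio.run(async_rag("What is the refund policy?"))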
Monitoring
import time
from functools import wraps

class PerformanceMonitor:
    def __init__(self):
        self.metrics = []

    def measure(self, func):
        # Decorator that records wall-clock latency for each call.
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            result = func(*args, **kwargs)
            latency = (time.time() - start) * 1000
            self.metrics.append({
                'function': func.__name__,
                'latency_ms': latency
            })
            return result
        return wrapper
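A usage sketch, wiring the decorator onto the pipeline and reporting the P95 latency the checklist below refers to (rag_pipeline and the sample queries are placeholders):

import statistics

monitor = PerformanceMonitor()

@monitor.measure
def rag_pipeline(query):
    ...  # embed, retrieve, generate

for q in ["refund policy?", "shipping times?", "warranty terms?"]:
    rag_pipeline(q)

latencies = [m['latency_ms'] for m in monitor.metrics]
p95 = statistics.quantiles(latencies, n=20)[18]   # 95th percentile
print(f"P95 latency: {p95:.0f} ms")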
Optimization Checklist
- Cache frequent queries (target a 50%+ hit rate)
- Batch embedding requests
- Use HNSW index for vector search
- Enable gRPC for vector DB
- Implement async/await
- Monitor P95 latency (target <2s)
Next Steps
- Observability - Monitor performance
- Cost Optimization - Reduce costs