Performance Optimization

Reduce latency, improve throughput, and optimize caching in RAG systems.

Overview

Production RAG systems must be fast. This guide covers latency reduction, caching, and throughput optimization.

Latency Breakdown

A typical RAG query breaks down roughly as follows; a timing sketch for measuring each stage appears after the list:

  • Embedding: 50-100ms
  • Vector search: 50-200ms
  • LLM generation: 1000-3000ms
  • Total: 1100-3300ms
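LLM generation dominates the total, so measure before optimizing anything else. A minimal timing sketch, where embed, vector_search, and generate stand in for your actual pipeline stages:

import time

def timed_rag(query):
    timings = {}

    start = time.perf_counter()
    query_vector = embed(query)  # your embedding call
    timings["embedding_ms"] = (time.perf_counter() - start) * 1000

    start = time.perf_counter()
    docs = vector_search(query_vector)  # your retrieval call
    timings["search_ms"] = (time.perf_counter() - start) * 1000

    start = time.perf_counter()
    answer = generate(query, docs)  # your LLM call
    timings["generation_ms"] = (time.perf_counter() - start) * 1000

    return answer, timings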

Caching Strategies

Query Caching

import hashlib

class RAGCache:
    """Exact-match cache keyed on a hash of the query string."""

    def __init__(self, rag_pipeline):
        self.rag_pipeline = rag_pipeline  # callable: query -> answer
        self.cache = {}

    def get_cache_key(self, query):
        # md5 is fine here: we need a stable key, not cryptographic strength
        return hashlib.md5(query.encode()).hexdigest()

    def query(self, query):
        key = self.get_cache_key(query)

        if key in self.cache:
            return self.cache[key]  # cache hit: skip the whole pipeline

        result = self.rag_pipeline(query)
        self.cache[key] = result
        return result
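Usage is a drop-in wrapper around the pipeline (my_pipeline is a placeholder for your own query function):

cache = RAGCache(rag_pipeline=my_pipeline)
answer = cache.query("What is the refund policy?")

An exact-match cache only helps when the identical string repeats; paraphrased queries miss, which is what the semantic cache below addresses.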

Embedding Caching

from functools import lru_cache

@lru_cache(maxsize=10000)
def embed_cached(text):
    # lru_cache requires hashable arguments, so this works on raw strings
    return embedding_model.encode(text)

Semantic Caching

import numpy as np

class SemanticCache:
    """Returns a cached result when a new query is semantically close enough."""

    def __init__(self, embed_fn, similarity_threshold=0.95):
        self.embed = embed_fn  # callable: text -> vector
        self.cache_vectors = []
        self.cache_results = []
        self.threshold = similarity_threshold

    def get(self, query):
        query_vector = self.embed(query)

        # Linear scan over cached vectors; fine for small caches
        for cached_vector, result in zip(self.cache_vectors, self.cache_results):
            similarity = np.dot(query_vector, cached_vector) / (
                np.linalg.norm(query_vector) * np.linalg.norm(cached_vector)
            )
            if similarity > self.threshold:
                return result

        return None

    def put(self, query, result):
        self.cache_vectors.append(self.embed(query))
        self.cache_results.append(result)
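On a miss, compute the answer and store it for future queries; a minimal sketch, assuming rag_pipeline is your full query function:

cache = SemanticCache(embed_fn=embed, similarity_threshold=0.95)

def cached_rag(query):
    hit = cache.get(query)
    if hit is not None:
        return hit
    result = rag_pipeline(query)
    cache.put(query, result)
    return result

The linear scan costs O(n) per lookup, so for large caches consider storing the cached query vectors in the vector database itself.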

Parallel Processing

from concurrent.futures import ThreadPoolExecutor

def parallel_retrieve(queries):
    # Threads work well here because retrieval is I/O-bound (network calls)
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(executor.map(retrieve, queries))
    return results

Batch Operations

# Bad: one embedding call per query (N API round-trips)
embeddings = [embed(query) for query in queries]

# Good: one batched call
embeddings = embedding_model.encode(queries)
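If your embedding provider caps batch sizes, chunk the input instead (batch_size here is an assumed limit, not a library default):

def embed_in_batches(texts, batch_size=64):
    vectors = []
    for i in range(0, len(texts), batch_size):
        vectors.extend(embedding_model.encode(texts[i:i + batch_size]))
    return vectors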

Database Optimization

# Add an ANN index (illustrative call; the exact API depends on your vector DB)
vector_db.create_index(
    field="vector",
    index_type="HNSW",
    params={"M": 16, "efConstruction": 200}
)

# Reuse one long-lived client rather than reconnecting per request
from qdrant_client import QdrantClient

client = QdrantClient(
    url="localhost:6333",
    timeout=60,
    prefer_grpc=True  # 2-3x faster than HTTP
)
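That single client can then serve every request; a sketch of a search call, assuming a collection named "docs" (the exact search API varies by client version):

hits = client.search(
    collection_name="docs",  # assumed collection name
    query_vector=query_vector,
    limit=10
)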

Async/Await

import asyncio

async def async_rag(query):
    # Kick off independent steps concurrently (this assumes retrieval does
    # its own embedding, so the two tasks don't depend on each other)
    docs_task = asyncio.create_task(retrieve_async(query))
    embed_task = asyncio.create_task(embed_async(query))

    docs, query_embedding = await asyncio.gather(docs_task, embed_task)

    # Generate the answer once retrieval has finished
    answer = await generate_async(query, docs)
    return answer
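From synchronous code, drive it with asyncio.run; fanning several queries out concurrently is where the throughput gain shows up (queries is assumed to be a list of query strings):

async def main():
    return await asyncio.gather(*(async_rag(q) for q in queries))

answers = asyncio.run(main())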

Monitoring

import functools
import time

class PerformanceMonitor:
    def __init__(self):
        self.metrics = []

    def measure(self, func):
        @functools.wraps(func)  # preserve the wrapped function's name
        def wrapper(*args, **kwargs):
            start = time.perf_counter()  # monotonic clock, safe for intervals
            result = func(*args, **kwargs)
            latency = (time.perf_counter() - start) * 1000

            self.metrics.append({
                'function': func.__name__,
                'latency_ms': latency
            })

            return result
        return wrapper
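To track the P95 target from the checklist below, decorate the pipeline entry point and summarize the collected latencies; a sketch using the standard-library statistics module (rag_pipeline is assumed):

import statistics

monitor = PerformanceMonitor()

@monitor.measure
def rag_query(query):
    return rag_pipeline(query)

def p95_latency_ms(monitor):
    latencies = [m['latency_ms'] for m in monitor.metrics]
    # quantiles(n=20) returns 19 cut points; index 18 is the 95th percentile
    return statistics.quantiles(latencies, n=20)[18]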

Optimization Checklist

  • Cache frequent queries (aim for a 50%+ hit rate)
  • Batch embedding requests
  • Use an HNSW index for vector search
  • Enable gRPC for the vector DB
  • Implement async/await in the serving path
  • Monitor P95 latency (target under 2s)
