Observability & Monitoring

Track, debug, and optimize RAG systems in production with comprehensive observability.

Overview

Production RAG systems need visibility into retrieval quality, latency, cost, and failure modes. This guide covers the essential practices: metrics, structured logging, distributed tracing, dashboards, alerting, user feedback, and cost tracking.

Key Metrics

Retrieval Metrics

metrics = {
    'recall_at_k': 0.85,  # % of relevant docs in top-k
    'mrr': 0.72,  # Mean Reciprocal Rank
    'avg_retrieval_latency_ms': 150,
    'p95_retrieval_latency_ms': 300
}
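
These values can be computed offline against a labeled evaluation set. A minimal sketch of the standard formulas, assuming per-query lists of retrieved and ground-truth relevant document IDs (names here are illustrative, not from a particular library):

def recall_at_k(retrieved_ids, relevant_ids, k):
    """Fraction of relevant docs that appear in the top-k results.
    Assumes at least one relevant doc exists for the query."""
    top_k = set(retrieved_ids[:k])
    return len(top_k & set(relevant_ids)) / len(relevant_ids)

def reciprocal_rank(retrieved_ids, relevant_ids):
    """1/rank of the first relevant doc; 0 if none was retrieved."""
    for rank, doc_id in enumerate(retrieved_ids, start=1):
        if doc_id in relevant_ids:
            return 1 / rank
    return 0.0

# MRR is the mean of reciprocal_rank over the whole query set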

LLM Metrics

llm_metrics = {
    'avg_generation_latency_ms': 1200,
    'tokens_per_second': 25,
    'total_tokens_used': 1_500_000,
    'estimated_cost_usd': 45.00
}
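
One way to populate these numbers is to time each generation call and read token counts from the provider's response. A sketch, where `llm_call` is a hypothetical client that returns the text plus input/output token counts:

import time

def measure_generation(llm_call, prompt):
    """Wrap an LLM call and derive latency and throughput metrics.
    `llm_call` is a placeholder for your client; it is assumed to
    return (text, input_tokens, output_tokens)."""
    start = time.perf_counter()
    text, input_tokens, output_tokens = llm_call(prompt)
    elapsed = time.perf_counter() - start
    return text, {
        'generation_latency_ms': elapsed * 1000,
        'tokens_per_second': output_tokens / elapsed if elapsed > 0 else 0.0,
        'total_tokens': input_tokens + output_tokens,
    }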

System Metrics

system_metrics = {
    'requests_per_minute': 120,
    'error_rate': 0.02,  # 2%
    'cache_hit_rate': 0.65  # 65%
}
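
Rates like error rate and cache hit rate are normally computed over a sliding window rather than all-time history. A minimal in-process sketch (in practice you would usually let Prometheus compute these, as shown later):

from collections import deque

class RollingRate:
    """Track a boolean outcome (error, cache hit) over the last N requests."""
    def __init__(self, window=1000):
        self.events = deque(maxlen=window)  # old entries fall off automatically

    def record(self, outcome: bool):
        self.events.append(outcome)

    @property
    def rate(self):
        return sum(self.events) / len(self.events) if self.events else 0.0

error_rate = RollingRate()
cache_hits = RollingRate()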

Logging

Structured Logging

import logging
import json
from datetime import datetime, timezone

class RAGLogger:
    """Emit one structured JSON log line per query for downstream analysis."""

    def __init__(self):
        self.logger = logging.getLogger('rag_system')

    def log_query(self, query, results, latency_ms, user_id=None):
        log_entry = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'event': 'query',
            'query': query,
            'num_results': len(results),
            'latency_ms': latency_ms,
            'user_id': user_id,
            'result_ids': [r.id for r in results]  # assumes results expose an `id`
        }
        self.logger.info(json.dumps(log_entry))
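
With a basic handler attached, each query becomes one JSON line that log aggregators (ELK, Loki, CloudWatch) can index. `retrieved_docs` below is a placeholder for the result objects from your retriever:

logging.basicConfig(level=logging.INFO, format='%(message)s')

rag_logger = RAGLogger()
rag_logger.log_query(
    query="What is our refund policy?",
    results=retrieved_docs,  # placeholder: objects exposing an `id` attribute
    latency_ms=142,
    user_id="user-42",
)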

Distributed Tracing

from opentelemetry import trace

# Assumes the SDK has been configured (see the setup sketch below);
# without a configured TracerProvider, spans are no-ops.
tracer = trace.get_tracer(__name__)

def rag_pipeline(query):
    with tracer.start_as_current_span("rag_pipeline") as span:
        span.set_attribute("query", query)

        # Retrieval: record how many documents came back on the child span
        with tracer.start_as_current_span("retrieval") as retrieval_span:
            docs = retrieve(query)
            retrieval_span.set_attribute("num_docs", len(docs))

        # Generation: record the size of the produced answer
        with tracer.start_as_current_span("generation") as generation_span:
            answer = generate(query, docs)
            generation_span.set_attribute("answer_length", len(answer))

        return answer
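
For spans to be exported, the SDK needs a one-time setup at startup. A sketch using the console exporter for local debugging; swap in an OTLP exporter to ship spans to Jaeger, Tempo, or similar:

from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter

# Configure once, before the pipeline starts handling traffic
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)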

Dashboards

Prometheus Metrics for Grafana

# Export metrics to Prometheus
from prometheus_client import Counter, Histogram, Gauge

query_counter = Counter('rag_queries_total', 'Total queries')
latency_histogram = Histogram('rag_latency_seconds', 'Query latency')
cost_gauge = Gauge('rag_cost_usd', 'Estimated cost')

@latency_histogram.time()  # observes the wall-clock duration of each call
def process_query(query):
    query_counter.inc()
    result = rag_pipeline(query)
    cost_gauge.set(calculate_cost(result))  # calculate_cost: your own estimator
    return result
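
Prometheus pulls metrics over HTTP, so the process must also expose a scrape endpoint; the port here is an arbitrary choice. Grafana panels can then graph queries such as rate(rag_queries_total[5m]):

from prometheus_client import start_http_server

start_http_server(8000)  # metrics served at http://localhost:8000/metrics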

Alerting

class AlertManager:
    def __init__(self):
        # Thresholds are illustrative; tune them to your traffic and budget
        self.thresholds = {
            'error_rate': 0.05,  # 5%
            'p95_latency_ms': 2000,
            'cost_per_hour': 10.00  # USD
        }
    
    def check_alerts(self, metrics):
        alerts = []
        
        if metrics['error_rate'] > self.thresholds['error_rate']:
            alerts.append({
                'severity': 'high',
                'message': f"Error rate {metrics['error_rate']:.1%} exceeds threshold"
            })
        
        if metrics['p95_latency_ms'] > self.thresholds['p95_latency_ms']:
            alerts.append({
                'severity': 'medium',
                'message': f"P95 latency {metrics['p95_latency_ms']}ms too high"
            })
        
        if metrics.get('cost_per_hour', 0) > self.thresholds['cost_per_hour']:
            alerts.append({
                'severity': 'medium',
                'message': f"Hourly cost ${metrics['cost_per_hour']:.2f} over budget"
            })
        
        return alerts
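
A hypothetical check loop; routing alerts to a real channel is left as a placeholder:

alert_manager = AlertManager()
current = {'error_rate': 0.07, 'p95_latency_ms': 1450, 'cost_per_hour': 4.20}

for alert in alert_manager.check_alerts(current):
    # replace print with your notification channel (Slack, PagerDuty, email)
    print(f"[{alert['severity']}] {alert['message']}")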

User Feedback Loop

class FeedbackCollector:
    def __init__(self, db):
        # `db` is any storage client exposing insert()/query()
        self.db = db

    def collect_feedback(self, query_id, rating, comment=None):
        """Collect user feedback on answers"""
        feedback = {
            'query_id': query_id,
            'rating': rating,  # 1-5
            'comment': comment,
            'timestamp': datetime.now(timezone.utc)
        }
        self.db.insert('feedback', feedback)
    
    def analyze_feedback(self):
        """Identify low-quality responses"""
        low_rated = self.db.query(
            "SELECT * FROM feedback WHERE rating <= 2"
        )
        return low_rated
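
Usage sketch, assuming `db` is any storage client that exposes the insert()/query() interface used above:

collector = FeedbackCollector(db)  # `db` is a placeholder client
collector.collect_feedback(query_id='q-123', rating=2, comment='Answer missed the point')

# Feed low-rated queries back into your evaluation set
for row in collector.analyze_feedback():
    print(row)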

Cost Tracking

class CostTracker:
    def __init__(self):
        # Illustrative per-1K-token prices in USD; substitute your provider's rates
        self.costs = {
            'embedding': 0.0001,
            'llm_input': 0.0015,
            'llm_output': 0.002
        }
        self.total_cost = 0.0

    def track_query_cost(self, query, docs, answer):
        # ~1.3 tokens per word is a rough heuristic; use your tokenizer for accuracy
        embedding_tokens = len(query.split()) * 1.3
        embedding_cost = (embedding_tokens / 1000) * self.costs['embedding']
        
        # LLM cost: retrieved docs count as input, the answer as output
        input_tokens = sum(len(d.split()) for d in docs) * 1.3
        output_tokens = len(answer.split()) * 1.3
        
        llm_cost = (
            (input_tokens / 1000) * self.costs['llm_input'] +
            (output_tokens / 1000) * self.costs['llm_output']
        )
        
        total = embedding_cost + llm_cost
        self.total_cost += total  # running total for reporting and alerting
        return total
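
Usage sketch; the inputs are illustrative, and the word-count heuristic means the result is an estimate, not a billing figure:

tracker = CostTracker()
cost = tracker.track_query_cost(
    query="What is our refund policy?",
    docs=["Refunds are issued within 30 days of purchase...",
          "Exceptions require manager approval..."],
    answer="Refunds are available within 30 days of purchase.",
)
print(f"Estimated cost: ${cost:.6f} (running total: ${tracker.total_cost:.6f})")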

Next Steps