Observability & Monitoring
Track, debug, and optimize RAG systems in production with comprehensive observability.
Overview
Production RAG systems require visibility into retrieval quality, latency, cost, and failure modes. This guide covers the core practices: metrics, structured logging, distributed tracing, dashboards, alerting, user feedback, and cost tracking.
Key Metrics
Retrieval Metrics
metrics = {
    'recall_at_k': 0.85,               # % of relevant docs found in top-k
    'mrr': 0.72,                       # Mean Reciprocal Rank
    'avg_retrieval_latency_ms': 150,
    'p95_retrieval_latency_ms': 300
}
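Recall@k and MRR are typically computed offline against a labelled evaluation set. A minimal sketch, assuming an eval_queries list of (query, relevant_ids) pairs and the retrieve() function used elsewhere in this guide:

def evaluate_retrieval(eval_queries, retrieve, k=5):
    recalls, reciprocal_ranks = [], []
    for query, relevant_ids in eval_queries:
        # relevant_ids is the set of doc ids judged relevant for this query
        retrieved_ids = [doc.id for doc in retrieve(query)[:k]]
        hits = [rank for rank, doc_id in enumerate(retrieved_ids) if doc_id in relevant_ids]
        recalls.append(len(hits) / max(len(relevant_ids), 1))
        reciprocal_ranks.append(1.0 / (hits[0] + 1) if hits else 0.0)
    return {
        'recall_at_k': sum(recalls) / len(recalls),
        'mrr': sum(reciprocal_ranks) / len(reciprocal_ranks),
    }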
LLM Metrics
llm_metrics = {
    'avg_generation_latency_ms': 1200,
    'tokens_per_second': 25,
    'total_tokens_used': 1_500_000,
    'estimated_cost_usd': 45.00
}
System Metrics
system_metrics = {
    'requests_per_minute': 120,
    'error_rate': 0.02,       # 2%
    'cache_hit_rate': 0.65    # 65%
}
Logging
Structured Logging
import logging
import json
from datetime import datetime

class RAGLogger:
    def __init__(self):
        self.logger = logging.getLogger('rag_system')

    def log_query(self, query, results, latency_ms, user_id=None):
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'event': 'query',
            'query': query,
            'num_results': len(results),
            'latency_ms': latency_ms,
            'user_id': user_id,
            'result_ids': [r.id for r in results]
        }
        self.logger.info(json.dumps(log_entry))
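A minimal wiring example (the basicConfig handler and the SimpleNamespace stand-ins for retrieved documents are assumptions) showing the one-JSON-object-per-line output that log shippers expect:

import logging
from types import SimpleNamespace

logging.basicConfig(level=logging.INFO, format='%(message)s')

rag_logger = RAGLogger()
docs = [SimpleNamespace(id='doc-1'), SimpleNamespace(id='doc-2')]  # stand-ins for retrieved documents
rag_logger.log_query('how do I rotate API keys?', results=docs, latency_ms=142, user_id='u-123')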
Distributed Tracing
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider

tracer = trace.get_tracer(__name__)

def rag_pipeline(query):
    with tracer.start_as_current_span("rag_pipeline") as span:
        span.set_attribute("query", query)

        # Retrieval
        with tracer.start_as_current_span("retrieval"):
            docs = retrieve(query)
            span.set_attribute("num_docs", len(docs))

        # Generation
        with tracer.start_as_current_span("generation"):
            answer = generate(query, docs)
            span.set_attribute("answer_length", len(answer))

        return answer
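Spans are only exported once a tracer provider is configured. One possible local-debugging setup with the opentelemetry-sdk console exporter is sketched below; in production you would typically swap in an OTLP exporter pointed at your collector.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)   # configure before creating spans so they are exported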
Dashboards
Grafana Dashboard Example
# Export metrics to Prometheus
from prometheus_client import Counter, Histogram, Gauge

query_counter = Counter('rag_queries_total', 'Total queries')
latency_histogram = Histogram('rag_latency_seconds', 'Query latency')
cost_gauge = Gauge('rag_cost_usd', 'Estimated cost')

@latency_histogram.time()
def process_query(query):
    query_counter.inc()
    result = rag_pipeline(query)
    cost_gauge.set(calculate_cost(result))  # calculate_cost: your own estimator (see Cost Tracking below)
    return result
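For Prometheus (and in turn Grafana) to read these metrics, the process has to expose them over HTTP. prometheus_client ships a built-in exposition server; the port below is an arbitrary example.

from prometheus_client import start_http_server

start_http_server(8000)   # metrics served at http://localhost:8000/metrics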
Alerting
class AlertManager:
    def __init__(self):
        self.thresholds = {
            'error_rate': 0.05,       # 5%
            'p95_latency_ms': 2000,
            'cost_per_hour': 10.00
        }

    def check_alerts(self, metrics):
        alerts = []
        if metrics['error_rate'] > self.thresholds['error_rate']:
            alerts.append({
                'severity': 'high',
                'message': f"Error rate {metrics['error_rate']} exceeds threshold"
            })
        if metrics['p95_latency_ms'] > self.thresholds['p95_latency_ms']:
            alerts.append({
                'severity': 'medium',
                'message': f"P95 latency {metrics['p95_latency_ms']}ms too high"
            })
        return alerts
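A usage sketch, assuming the metrics dict is aggregated elsewhere; the values are illustrative, and print stands in for whatever notifier (Slack, PagerDuty, email) you actually use.

alert_manager = AlertManager()

current_metrics = {'error_rate': 0.08, 'p95_latency_ms': 1500}  # illustrative values
for alert in alert_manager.check_alerts(current_metrics):
    print(alert)  # replace with your notification hook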
User Feedback Loop
class FeedbackCollector:
    def __init__(self, db):
        self.db = db  # any store exposing insert() and query()

    def collect_feedback(self, query_id, rating, comment=None):
        """Collect user feedback on answers"""
        feedback = {
            'query_id': query_id,
            'rating': rating,  # 1-5
            'comment': comment,
            'timestamp': datetime.utcnow()
        }
        self.db.insert('feedback', feedback)

    def analyze_feedback(self):
        """Identify low-quality responses"""
        low_rated = self.db.query(
            "SELECT * FROM feedback WHERE rating <= 2"
        )
        return low_rated
Cost Tracking
class CostTracker:
    def __init__(self):
        # Example per-1K-token prices; substitute your provider's actual rates
        self.costs = {
            'embedding': 0.0001,
            'llm_input': 0.0015,
            'llm_output': 0.002
        }

    def track_query_cost(self, query, docs, answer):
        # Embedding cost (rough estimate: ~1.3 tokens per whitespace-separated word)
        embedding_tokens = len(query.split()) * 1.3
        embedding_cost = (embedding_tokens / 1000) * self.costs['embedding']

        # LLM cost
        input_tokens = sum(len(d.split()) for d in docs) * 1.3
        output_tokens = len(answer.split()) * 1.3
        llm_cost = (
            (input_tokens / 1000) * self.costs['llm_input'] +
            (output_tokens / 1000) * self.costs['llm_output']
        )

        total = embedding_cost + llm_cost
        self.log_cost(total)
        return total

    def log_cost(self, total):
        # Placeholder: persist or export the per-query cost
        # (e.g. set the Prometheus cost gauge defined above)
        pass
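Illustrative usage; the per-1K-token prices and the ~1.3 tokens-per-word heuristic above are rough assumptions, not provider-accurate accounting.

tracker = CostTracker()
cost = tracker.track_query_cost(
    query='what is our refund policy?',
    docs=['Refunds are issued within 30 days of purchase ...',
          'Contact support to start a refund request ...'],
    answer='Refunds are available within 30 days of purchase.'
)
print(f"estimated cost: ${cost:.6f}")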
Next Steps
- Production Deployment - Deploy with monitoring
- Cost Optimization - Reduce costs