Streaming RAG
Implement streaming responses for better user experience in RAG applications.
Overview
Streaming gives users immediate feedback instead of making them wait for the complete response, which is critical in production RAG systems where retrieval and generation together can add noticeable latency.
Basic Streaming
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def stream_rag_response(query, context):
    """Yield answer tokens as they arrive from the model."""
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "Answer based on the context."},
            {"role": "user", "content": f"Context: {context}\n\nQuestion: {query}"},
        ],
        stream=True,
    )
    for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            yield delta.content
FastAPI Implementation
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class QueryRequest(BaseModel):
    query: str

@app.post("/query")
async def query_stream(request: QueryRequest):
    async def generate():
        # Retrieve context (retrieve() is your own retrieval function)
        docs = await retrieve(request.query)
        context = "\n".join(d.text for d in docs)
        # Stream the answer as server-sent events
        for chunk in stream_rag_response(request.query, context):
            yield f"data: {chunk}\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")
Client-Side Handling
async function streamQuery(query) {
  const response = await fetch('/query', {
    method: 'POST',
    body: JSON.stringify({ query }),
    headers: { 'Content-Type': 'application/json' }
  });
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  const answer = document.getElementById('answer');
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // Strip the SSE "data: " framing and append only the text payload
    const chunk = decoder.decode(value, { stream: true });
    for (const line of chunk.split('\n')) {
      if (line.startsWith('data: ')) {
        answer.textContent += line.slice(6);
      }
    }
  }
}
Progressive Context Loading
import asyncio

async def progressive_rag(query):
    # Start with fast, lower-quality retrieval
    quick_docs = await fast_retrieve(query, k=3)
    # Kick off the deeper retrieval in the background so it runs while we stream
    deep_task = asyncio.create_task(deep_retrieve(query, k=10))
    # Stream an initial answer from the quick results
    async for chunk in stream_response(query, quick_docs):
        yield chunk
    # Pick up the deeper results (they had the whole first pass to finish)
    better_docs = await deep_task
    # Optionally refine the answer with the additional context
    if better_docs != quick_docs:
        yield "\n\n[Refining answer with additional context...]\n"
        async for chunk in stream_response(query, better_docs):
            yield chunk
The "Thinking" Pattern (Reasoning Models)
Reasoning models like OpenAI o1 and DeepSeek R1 emit "reasoning tokens" before producing the final answer.
UX Challenge: Users might see a spinner for 10-30 seconds. Solution: Stream the thought process (or a summary of it) to keep the user engaged.
From Production: "Stream thinking tokens to improve perceived latency by ~45%. Even if the total time is the same, seeing 'Thinking: Analyzing documents...' makes the wait feel shorter."
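A minimal sketch of the pattern, reusing the retrieve() and stream_rag_response() helpers from the examples above; the {"type": ..., "text": ...} event shape and the status strings are illustrative conventions, not a vendor API:

import json

async def stream_with_status(query):
    # Emit lightweight status events so the user sees progress instead of a spinner
    yield json.dumps({"type": "status", "text": "Thinking: retrieving documents..."}) + "\n"
    docs = await retrieve(query)
    context = "\n".join(d.text for d in docs)
    yield json.dumps({"type": "status", "text": f"Thinking: analyzing {len(docs)} documents..."}) + "\n"
    # Stream the actual answer tokens as separate "answer" events
    for chunk in stream_rag_response(query, context):
        yield json.dumps({"type": "answer", "text": chunk}) + "\n"

The client can render "status" events in a muted style and swap them out once "answer" events start arriving.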
UX Pattern: "Think Harder" Button
Don't force deep reasoning on every query.
- Default: Fast RAG (Standard LLM)
- Opt-in: a "Deep Research" button that enables reasoning models plus multi-step retrieval (see the routing sketch after this list).
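One way to wire up the opt-in, assuming the FastAPI app from above plus a hypothetical deep_research_rag() pipeline; the /ask path and the deep flag are illustrative, not a fixed API:

from pydantic import BaseModel
from fastapi.responses import StreamingResponse

class AskRequest(BaseModel):
    query: str
    deep: bool = False  # set to True when the user clicks "Deep Research"

@app.post("/ask")
async def ask(request: AskRequest):
    if request.deep:
        # Opt-in: hypothetical reasoning-model pipeline with multi-step retrieval
        generator = deep_research_rag(request.query)
    else:
        # Default: fast, standard streaming RAG
        generator = progressive_rag(request.query)
    return StreamingResponse(generator, media_type="text/event-stream")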
Common Questions
"How do I handle citations in a stream?"
Stream them as they are generated.
- Don't wait for the full response to append citations.
- Use a structured stream format (e.g., XML tags or JSON lines) to send metadata alongside text chunks, as sketched after this list.
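A JSON Lines sketch: citation metadata for the retrieved documents goes out immediately, and answer text follows as separate events; the doc.source attribute is an assumption about your document objects:

import json

def stream_with_citations(query, docs):
    # Send citation metadata first so the client can render sources right away
    for i, doc in enumerate(docs, start=1):
        yield json.dumps({"type": "citation", "id": i, "source": doc.source}) + "\n"
    # Then stream the answer text as separate events
    context = "\n".join(d.text for d in docs)
    for chunk in stream_rag_response(query, context):
        yield json.dumps({"type": "text", "content": chunk}) + "\n"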
"What if the stream fails mid-way?"
Retry with a non-streaming fallback.
- If the connection drops, the client should automatically retry the request (idempotency keys help here).
- Consider caching the partial response if possible, though it is usually better to restart from scratch for consistency (see the fallback sketch after this list).
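A minimal client-side sketch using httpx, assuming a hypothetical non-streaming /query/sync endpoint as the fallback; the idempotency header is illustrative:

import uuid
import httpx

async def query_with_fallback(query: str, base_url: str = "http://localhost:8000"):
    # Reuse the same key on retries so the server can deduplicate the request
    headers = {"Idempotency-Key": str(uuid.uuid4())}
    try:
        async with httpx.AsyncClient(timeout=None) as client:
            async with client.stream("POST", f"{base_url}/query",
                                     json={"query": query}, headers=headers) as response:
                answer = ""
                async for chunk in response.aiter_text():
                    answer += chunk
                return answer
    except httpx.HTTPError:
        # The stream failed mid-way: restart with a plain, non-streaming request
        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.post(f"{base_url}/query/sync",
                                         json={"query": query}, headers=headers)
            return response.json()["answer"]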
Next Steps
- Observability - Monitor streaming performance
- Production Deployment - Deploy streaming APIs