Streaming RAG

Implement streaming responses for better user experience in RAG applications.

Overview

Streaming provides immediate feedback to users instead of making them wait for the complete response. This is critical for production RAG systems, where retrieval and generation can add up to several seconds of latency.

Basic Streaming

from openai import OpenAI

# Assumes OPENAI_API_KEY is set in the environment
client = OpenAI()

def stream_rag_response(query, context):
    """Yield answer tokens as the model produces them."""
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "Answer based on the context."},
            {"role": "user", "content": f"Context: {context}\n\nQuestion: {query}"}
        ],
        stream=True
    )

    for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
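
Consumed directly, the generator can drive a quick local check; the context string below is just a stand-in for whatever your retriever returns:

# Quick sanity check with a hard-coded context (illustrative only)
context = "RAG combines retrieval with generation."
for token in stream_rag_response("What is RAG?", context):
    print(token, end="", flush=True)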

FastAPI Implementation

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class QueryRequest(BaseModel):
    query: str

@app.post("/query")
async def query_stream(request: QueryRequest):
    async def generate():
        # Retrieve context (retrieve() is your own async retriever)
        docs = await retrieve(request.query)
        context = "\n".join(d.text for d in docs)

        # Stream the LLM response, framed as Server-Sent Events
        for chunk in stream_rag_response(request.query, context):
            yield f"data: {chunk}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

Client-Side Handling

async function streamQuery(query) {
    const response = await fetch('/query', {
        method: 'POST',
        body: JSON.stringify({ query }),
        headers: { 'Content-Type': 'application/json' }
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    const answer = document.getElementById('answer');

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // Strip the SSE "data: " framing and append as plain text
        // (textContent avoids injecting model output as HTML).
        // A production client should also buffer partial events across reads.
        const chunk = decoder.decode(value, { stream: true });
        for (const event of chunk.split('\n\n')) {
            if (event.startsWith('data: ')) {
                answer.textContent += event.slice('data: '.length);
            }
        }
    }
}

Progressive Context Loading

async def progressive_rag(query):
    # Start with fast, lower-quality retrieval (fast_retrieve, deep_retrieve and
    # stream_response are your own helpers)
    quick_docs = await fast_retrieve(query, k=3)

    # Stream an initial answer immediately
    async for chunk in stream_response(query, quick_docs):
        yield chunk

    # Then run the slower, higher-quality retrieval (it could also be started
    # concurrently, e.g. with asyncio.create_task, while the first answer streams)
    better_docs = await deep_retrieve(query, k=10)

    # Optionally refine the answer if the deeper pass changed the context
    if better_docs != quick_docs:
        yield "\n\n[Refining answer with additional context...]\n"
        async for chunk in stream_response(query, better_docs):
            yield chunk

The "Thinking" Pattern (Reasoning Models)

Reasoning models such as OpenAI o1 and DeepSeek R1 emit "reasoning tokens" before producing the final answer.

UX Challenge: Users might see a spinner for 10-30 seconds. Solution: Stream the thought process (or a summary of it) to keep the user engaged.

From Production: "Stream thinking tokens to improve perceived latency by ~45%. Even if the total time is the same, seeing 'Thinking: Analyzing documents...' makes the wait feel shorter."
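
One way to surface this is to interleave named SSE events in the stream: a status event for thinking updates and token events for the answer. The event names and the two-step status flow below are illustrative choices, not part of any particular API:

async def generate_with_thinking(query):
    # "event: status" frames carry progress/thinking updates,
    # "event: token" frames carry answer text
    yield "event: status\ndata: Thinking: analyzing documents...\n\n"

    docs = await retrieve(query)
    context = "\n".join(d.text for d in docs)

    yield "event: status\ndata: Thinking: drafting answer...\n\n"

    for chunk in stream_rag_response(query, context):
        yield f"event: token\ndata: {chunk}\n\n"

On the client, status events can drive a progress indicator while token events append to the visible answer.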

UX Pattern: "Think Harder" Button

Don't force deep reasoning on every query; let the user opt in, as in the routing sketch after this list.

  • Default: Fast RAG (Standard LLM)
  • Opt-in: "Deep Research" button that enables reasoning models + multi-step retrieval.
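
A minimal routing sketch, extending the QueryRequest model from the FastAPI example above; fast_rag_stream and deep_research_stream are hypothetical async generators wrapping the two pipelines:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class QueryRequest(BaseModel):
    query: str
    deep_research: bool = False  # set to True by the "Deep Research" button in the UI

@app.post("/query")
async def query_stream(request: QueryRequest):
    if request.deep_research:
        # Opt-in path: reasoning model + multi-step retrieval (slower)
        generate = deep_research_stream(request.query)
    else:
        # Default path: standard fast RAG
        generate = fast_rag_stream(request.query)
    return StreamingResponse(generate, media_type="text/event-stream")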

Common Questions

"How do I handle citations in a stream?"

Stream them as they are generated.

  • Don't wait for the full response to append citations.
  • Use a structured stream format (e.g., XML tags or JSON lines) to send metadata alongside text chunks, as sketched below.
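
A JSON-lines sketch, assuming each retrieved document exposes id and source attributes (the field names are illustrative):

import json

async def stream_with_citations(query):
    docs = await retrieve(query)
    context = "\n".join(d.text for d in docs)

    # Emit citation metadata as soon as retrieval finishes
    for d in docs:
        yield json.dumps({"type": "citation", "id": d.id, "source": d.source}) + "\n"

    # Then emit answer tokens as separate JSON lines
    for chunk in stream_rag_response(query, context):
        yield json.dumps({"type": "token", "text": chunk}) + "\n"

Serve this with media_type="application/x-ndjson" and have the client parse one JSON object per line, rendering tokens and citations as they arrive.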

"What if the stream fails mid-way?"

Retry with a non-streaming fallback (see the sketch after this list).

  • If the connection drops, the client should automatically retry the request (idempotency keys help here).
  • Consider caching the partial response if possible, though usually it's better to just restart for consistency.
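
A client-side sketch using httpx; the single-retry policy, the /query_sync fallback endpoint, and its "answer" field are assumptions, not part of the server code above:

import httpx

async def query_with_fallback(query: str) -> str:
    payload = {"query": query}
    try:
        # First attempt: consume the streaming endpoint line by line
        async with httpx.AsyncClient(timeout=None) as client:
            parts = []
            async with client.stream("POST", "http://localhost:8000/query", json=payload) as resp:
                async for line in resp.aiter_lines():
                    if line.startswith("data: "):
                        parts.append(line[len("data: "):])
            return "".join(parts)
    except httpx.HTTPError:
        # Fallback: hypothetical non-streaming endpoint returning the full answer at once
        async with httpx.AsyncClient(timeout=60) as client:
            resp = await client.post("http://localhost:8000/query_sync", json=payload)
            return resp.json()["answer"]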

Next Steps