Production RAG
From Prototype to Production
A RAG system that works in a notebook is fundamentally different from one that serves real users. In production, you face latency budgets, cost constraints, concurrent users, stale data, adversarial inputs, and compliance requirements. This lesson covers the engineering practices that make a RAG system reliable, fast, and affordable at scale.
Caching Strategies
Caching is the single most effective optimization for production RAG. Many users ask similar or identical questions, and computing embeddings and LLM responses from scratch every time is wasteful.
Semantic Cache
A semantic cache stores query-answer pairs and returns cached answers for semantically similar queries. If a user asks "What is the PTO policy?" and another asks "How many vacation days do I get?", the semantic cache recognizes them as the same question.
from langchain_community.cache import InMemoryCache
from langchain.globals import set_llm_cache
import hashlib
import numpy as np
class SemanticCache:
def __init__(self, embeddings, threshold: float = 0.95):
self.embeddings = embeddings
self.threshold = threshold
self.cache = {} # {embedding_hash: {"embedding": [...], "answer": "..."}}
def get(self, query: str) -> str | None:
"""Check cache for semantically similar query."""
query_embedding = self.embeddings.embed_query(query)
for key, entry in self.cache.items():
similarity = np.dot(query_embedding, entry["embedding"])
if similarity >= self.threshold:
return entry["answer"]
return None
def set(self, query: str, answer: str):
"""Cache a query-answer pair."""
embedding = self.embeddings.embed_query(query)
key = hashlib.md5(query.encode()).hexdigest()
self.cache[key] = {"embedding": embedding, "answer": answer}
# Usage
cache = SemanticCache(embeddings, threshold=0.95)
def query_with_cache(question: str, chain) -> str:
cached = cache.get(question)
if cached:
return cached
answer = chain.invoke(question)
cache.set(question, answer)
return answer
Embedding Cache
Embedding the same document twice is pure waste. Cache embeddings at the document level and only recompute when the source changes.
import json
import hashlib
class EmbeddingCache:
def __init__(self, cache_file: str = "embedding_cache.json"):
self.cache_file = cache_file
try:
with open(cache_file, "r") as f:
self.cache = json.load(f)
except FileNotFoundError:
self.cache = {}
def get_or_compute(self, text: str, embed_fn) -> list[float]:
"""Return cached embedding or compute and cache it."""
text_hash = hashlib.sha256(text.encode()).hexdigest()
if text_hash in self.cache:
return self.cache[text_hash]
embedding = embed_fn(text)
self.cache[text_hash] = embedding
self._save()
return embedding
def _save(self):
with open(self.cache_file, "w") as f:
json.dump(self.cache, f)
Streaming Responses
Users waiting for a complete RAG response experience a painful delay -- often 3-10 seconds. Streaming starts showing the answer immediately as the LLM generates it.
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
llm = ChatOpenAI(model="gpt-4o", temperature=0, streaming=True)
# Stream the response
async def stream_rag_response(question: str, chain):
"""Stream RAG response token by token."""
async for chunk in chain.astream(question):
print(chunk, end="", flush=True)
print() # Newline at end
# For web applications (FastAPI example)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.get("/query")
async def query_endpoint(question: str):
async def generate():
async for chunk in chain.astream(question):
yield f"data: {chunk}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
Cost Optimization
LLM costs add up fast in production. Here are the key levers:
Model Selection
Not every query needs GPT-4o. Use a routing strategy:
def route_query(question: str) -> str:
"""Route to appropriate model based on query complexity."""
# Simple factual queries -> cheap model
# Complex reasoning queries -> expensive model
simple_patterns = ["what is", "when was", "how many", "who is"]
is_simple = any(question.lower().startswith(p) for p in simple_patterns)
if is_simple:
return "gpt-4o-mini" # ~10x cheaper
else:
return "gpt-4o"
Batch Processing
When indexing documents, batch your embedding calls:
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# Instead of embedding one at a time
# for doc in documents:
# embedding = embeddings.embed_query(doc.page_content)
# Batch embed (much cheaper due to fewer API calls)
texts = [doc.page_content for doc in documents]
batch_embeddings = embeddings.embed_documents(texts)
Token Budget Management
Control how much context you send to the LLM:
def trim_context(docs: list, max_tokens: int = 3000) -> list:
"""Trim retrieved documents to fit within token budget."""
trimmed = []
total_tokens = 0
for doc in docs:
# Rough estimate: 1 token ~ 4 characters
doc_tokens = len(doc.page_content) // 4
if total_tokens + doc_tokens > max_tokens:
break
trimmed.append(doc)
total_tokens += doc_tokens
return trimmed
Monitoring
You cannot improve what you do not measure. Monitor these metrics in production:
Key Metrics to Track
- Latency -- Total response time, broken down by retrieval time, LLM time, and overhead.
- Retrieval relevance -- Log the similarity scores of retrieved documents. A sudden drop indicates embedding drift or data quality issues.
- User feedback -- Thumbs up/down on answers. This is the most direct signal of quality.
- Token usage -- Track input and output tokens per request for cost monitoring.
- Cache hit rate -- What percentage of queries are served from cache?
- Error rate -- API failures, timeouts, empty retrievals.
import time
import logging
from dataclasses import dataclass, asdict
logger = logging.getLogger("rag_monitor")
@dataclass
class RAGMetrics:
query: str
retrieval_time_ms: float
generation_time_ms: float
total_time_ms: float
num_chunks_retrieved: int
top_similarity_score: float
input_tokens: int
output_tokens: int
cache_hit: bool
def monitored_query(question: str, retriever, chain) -> tuple[str, RAGMetrics]:
"""Run a RAG query with full monitoring."""
start = time.time()
# Retrieval
retrieval_start = time.time()
docs = retriever.invoke(question)
retrieval_time = (time.time() - retrieval_start) * 1000
# Generation
gen_start = time.time()
answer = chain.invoke(question)
gen_time = (time.time() - gen_start) * 1000
total_time = (time.time() - start) * 1000
metrics = RAGMetrics(
query=question,
retrieval_time_ms=retrieval_time,
generation_time_ms=gen_time,
total_time_ms=total_time,
num_chunks_retrieved=len(docs),
top_similarity_score=0.0,
input_tokens=0,
output_tokens=0,
cache_hit=False,
)
logger.info(f"RAG query completed", extra=asdict(metrics))
return answer, metrics
Scaling
Async Processing
Handle multiple concurrent queries without blocking:
import asyncio
from langchain_openai import ChatOpenAI
async def process_queries(questions: list[str], chain) -> list[str]:
"""Process multiple queries concurrently."""
tasks = [chain.ainvoke(q) for q in questions]
return await asyncio.gather(*tasks)
Vector Database Sharding
For very large datasets, shard your vector database by category:
# Create separate collections for different document types
collections = {
"policies": chromadb_client.get_or_create_collection("policies"),
"engineering": chromadb_client.get_or_create_collection("engineering"),
"product": chromadb_client.get_or_create_collection("product"),
}
def routed_retrieval(query: str, category: str) -> list:
"""Route retrieval to the appropriate collection."""
collection = collections.get(category, collections["policies"])
return collection.query(query_texts=[query], n_results=5)
Security
PII Filtering
Prevent personally identifiable information from leaking through RAG responses:
import re
PII_PATTERNS = {
"email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
"phone": r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
"ssn": r'\b\d{3}-\d{2}-\d{4}\b',
}
def filter_pii(text: str) -> str:
"""Remove PII from text before sending to LLM or returning to user."""
for pii_type, pattern in PII_PATTERNS.items():
text = re.sub(pattern, f"[REDACTED_{pii_type.upper()}]", text)
return text
def safe_rag_query(question: str, chain) -> str:
"""RAG query with PII filtering."""
safe_question = filter_pii(question)
answer = chain.invoke(safe_question)
return filter_pii(answer)
Access Control
Not all users should access all documents. Implement document-level access control:
def filtered_retrieval(query: str, user_role: str, retriever) -> list:
"""Retrieve only documents the user is authorized to see."""
role_access = {
"employee": ["public", "internal"],
"manager": ["public", "internal", "management"],
"admin": ["public", "internal", "management", "confidential"],
}
allowed_levels = role_access.get(user_role, ["public"])
results = retriever.invoke(
query,
filter={"access_level": {"$in": allowed_levels}},
)
return results
Input Validation
Protect against prompt injection attacks:
def validate_input(question: str) -> tuple[bool, str]:
"""Basic input validation for RAG queries."""
if len(question) > 1000:
return False, "Question too long. Please keep it under 1000 characters."
injection_patterns = [
"ignore previous instructions",
"ignore all instructions",
"you are now",
"system prompt",
]
lower_q = question.lower()
for pattern in injection_patterns:
if pattern in lower_q:
return False, "Invalid query detected."
return True, question
Data Freshness
Keep your knowledge base current with automated re-indexing:
from datetime import datetime, timedelta
def check_stale_documents(vectorstore, max_age_days: int = 30):
"""Find documents that may need re-indexing."""
cutoff = (datetime.now() - timedelta(days=max_age_days)).isoformat()
stale_docs = vectorstore.get(
where={"indexed_at": {"$lt": cutoff}},
)
if stale_docs:
print(f"Found {len(stale_docs['ids'])} documents older than {max_age_days} days")
return stale_docs
Production Checklist
Before going live, verify these items:
- Caching is configured for both embeddings and query results
- Streaming is enabled for the chat interface
- Monitoring tracks latency, relevance, errors, and costs
- PII filtering prevents data leaks in both input and output
- Access control restricts document visibility based on user roles
- Input validation protects against injection and abuse
- Error handling provides graceful fallbacks when components fail
- Rate limiting prevents abuse and controls costs
- Data freshness alerts fire when documents become stale
- Assessment suite runs on a regular schedule with alerts for regressions
In the final lesson, you will put everything together by building a complete knowledge base system from scratch.