Lesson 11 of 12

Production RAG

From Prototype to Production

A RAG system that works in a notebook is fundamentally different from one that serves real users. In production, you face latency budgets, cost constraints, concurrent users, stale data, adversarial inputs, and compliance requirements. This lesson covers the engineering practices that make a RAG system reliable, fast, and affordable at scale.

Caching Strategies

Caching is often the most effective optimization for production RAG. Many users ask similar or identical questions, and recomputing embeddings and LLM responses from scratch every time wastes both latency and money.

Semantic Cache

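A semantic cache stores query-answer pairs and returns cached answers for semantically similar queries. If a user asks "What is the PTO policy?" and another asks "How many vacation days do I get?", the semantic cache can treat them as the same question, provided the similarity threshold is tuned for your embedding model. Set it too high and paraphrases miss the cache; set it too low and users receive answers to questions they did not ask.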

import hashlib
import numpy as np

class SemanticCache:
    def __init__(self, embeddings, threshold: float = 0.95):
        self.embeddings = embeddings
        self.threshold = threshold
        self.cache = {}  # {embedding_hash: {"embedding": [...], "answer": "..."}}

    def get(self, query: str) -> str | None:
        """Check cache for semantically similar query."""
        query_embedding = self.embeddings.embed_query(query)

        for key, entry in self.cache.items():
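            # Cosine similarity; dividing by the norms keeps this correct
            # even when the embedding model does not return unit-length vectors
            a, b = np.asarray(query_embedding), np.asarray(entry["embedding"])
            similarity = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))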
            if similarity >= self.threshold:
                return entry["answer"]
        return None

    def set(self, query: str, answer: str):
        """Cache a query-answer pair."""
        embedding = self.embeddings.embed_query(query)
        key = hashlib.md5(query.encode()).hexdigest()
        self.cache[key] = {"embedding": embedding, "answer": answer}

# Usage
cache = SemanticCache(embeddings, threshold=0.95)

def query_with_cache(question: str, chain) -> str:
    cached = cache.get(question)
    if cached is not None:  # an empty-string answer is still a cache hit
        return cached

    answer = chain.invoke(question)
    cache.set(question, answer)
    return answer
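
The cache above returns the first entry that clears the threshold and never expires anything, so a cached answer can outlive the document it was generated from. The following is a minimal sketch of one way to address both points, not a definitive implementation. `TTLSemanticCache`, `embed_fn`, `ttl_seconds`, and the `now` parameter are hypothetical names introduced for illustration, and the similarity is computed in pure Python to keep the example dependency-free.

```python
import math
import time
from typing import Callable, List, Optional


def cosine(a: List[float], b: List[float]) -> float:
    """Cosine similarity that does not assume unit-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class TTLSemanticCache:
    """Hypothetical variant: best-match lookup plus time-based expiry."""

    def __init__(self, embed_fn: Callable[[str], List[float]],
                 threshold: float = 0.95, ttl_seconds: float = 3600.0):
        self.embed_fn = embed_fn
        self.threshold = threshold
        self.ttl_seconds = ttl_seconds
        self.entries = []  # list of (embedding, answer, stored_at)

    def get(self, query: str, now: Optional[float] = None) -> Optional[str]:
        now = time.time() if now is None else now
        # Drop expired entries before searching
        self.entries = [e for e in self.entries if now - e[2] < self.ttl_seconds]
        query_embedding = self.embed_fn(query)
        best_score, best_answer = 0.0, None
        for embedding, answer, _ in self.entries:
            score = cosine(query_embedding, embedding)
            if score > best_score:
                best_score, best_answer = score, answer
        # Only the single closest entry is considered, and only if it clears the threshold
        return best_answer if best_score >= self.threshold else None

    def set(self, query: str, answer: str, now: Optional[float] = None) -> None:
        now = time.time() if now is None else now
        self.entries.append((self.embed_fn(query), answer, now))
```

The linear scan is fine for a few thousand entries. Beyond that, storing the cached queries in the vector database itself is a common alternative.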

Embedding Cache

Embedding the same document twice is pure waste. Cache embeddings at the document level and only recompute when the source changes.

import json
import hashlib

class EmbeddingCache:
    def __init__(self, cache_file: str = "embedding_cache.json"):
        self.cache_file = cache_file
        try:
            with open(cache_file, "r") as f:
                self.cache = json.load(f)
        except FileNotFoundError:
            self.cache = {}

    def get_or_compute(self, text: str, embed_fn) -> list[float]:
        """Return cached embedding or compute and cache it."""
        text_hash = hashlib.sha256(text.encode()).hexdigest()

        if text_hash in self.cache:
            return self.cache[text_hash]

        embedding = embed_fn(text)
        self.cache[text_hash] = embedding
        self._save()
        return embedding

    def _save(self):
        with open(self.cache_file, "w") as f:
            json.dump(self.cache, f)
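
The JSON-backed cache above rewrites the entire file on every miss, which gets slower as the cache grows and is not safe with several writers. As a hedged sketch, the same idea can be backed by SQLite from the standard library. `SQLiteEmbeddingCache`, `db_path`, and the choice to include the model name in the key are assumptions made for this example.

```python
import hashlib
import json
import sqlite3


class SQLiteEmbeddingCache:
    """Hypothetical SQLite-backed embedding cache; one row per (model, text) pair."""

    def __init__(self, db_path: str = ":memory:", model: str = "text-embedding-3-small"):
        self.model = model
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS embeddings ("
            "key TEXT PRIMARY KEY, vector TEXT NOT NULL)"
        )

    def _key(self, text: str) -> str:
        # Include the model name so switching models never returns old vectors
        return hashlib.sha256(f"{self.model}\x00{text}".encode()).hexdigest()

    def get_or_compute(self, text: str, embed_fn) -> list:
        key = self._key(text)
        row = self.conn.execute(
            "SELECT vector FROM embeddings WHERE key = ?", (key,)
        ).fetchone()
        if row is not None:
            return json.loads(row[0])

        embedding = list(embed_fn(text))
        with self.conn:  # commits on success, rolls back on error
            self.conn.execute(
                "INSERT OR REPLACE INTO embeddings (key, vector) VALUES (?, ?)",
                (key, json.dumps(embedding)),
            )
        return embedding
```

Each miss now writes a single row instead of the whole cache. Pass a file path as `db_path` to persist across restarts.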

Streaming Responses

Users waiting for a complete RAG response experience a painful delay -- often 3-10 seconds. Streaming starts showing the answer immediately as the LLM generates it.

from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

llm = ChatOpenAI(model="gpt-4o", temperature=0, streaming=True)

# Stream the response
async def stream_rag_response(question: str, chain):
    """Stream RAG response token by token."""
    async for chunk in chain.astream(question):
        print(chunk, end="", flush=True)
    print()  # Newline at end

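# For web applications (FastAPI example)
# Assumes `chain` is defined at module level and ends in StrOutputParser,
# so each streamed chunk is a plain string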
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/query")
async def query_endpoint(question: str):
    async def generate():
        async for chunk in chain.astream(question):
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
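
One detail worth handling in the endpoint above: a Server-Sent Events message ends at a blank line, so a chunk that itself contains a newline would be cut short if written as `f"data: {chunk}\n\n"`. The SSE format sends multi-line payloads as several `data:` lines, which the client rejoins with newlines. A small sketch of that framing follows; `sse_event` is a hypothetical helper name.

```python
def sse_event(data: str) -> str:
    """Format one Server-Sent Events message, keeping embedded newlines intact."""
    # Each line of the payload gets its own "data:" field;
    # the blank line at the end terminates the event
    lines = data.split("\n")
    return "".join(f"data: {line}\n" for line in lines) + "\n"
```

Inside `generate()`, this would replace the f-string: `yield sse_event(chunk)`.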

Cost Optimization

LLM costs add up fast in production. Here are the key levers:

Model Selection

Not every query needs GPT-4o. Use a routing strategy:

def route_query(question: str) -> str:
    """Route to appropriate model based on query complexity."""
    # Simple factual queries -> cheap model
    # Complex reasoning queries -> expensive model

    simple_patterns = ["what is", "when was", "how many", "who is"]
    is_simple = any(question.lower().startswith(p) for p in simple_patterns)

    if is_simple:
        return "gpt-4o-mini"  # ~10x cheaper
    else:
        return "gpt-4o"

Batch Processing

When indexing documents, batch your embedding calls:

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

# Instead of embedding one at a time
# for doc in documents:
#     embedding = embeddings.embed_query(doc.page_content)

# Batch embed (much faster: fewer API round trips and less rate-limit pressure;
# the per-token price is the same either way)
texts = [doc.page_content for doc in documents]
batch_embeddings = embeddings.embed_documents(texts)

Token Budget Management

Control how much context you send to the LLM:

def trim_context(docs: list, max_tokens: int = 3000) -> list:
    """Trim retrieved documents to fit within token budget."""
    trimmed = []
    total_tokens = 0

    for doc in docs:
        # Rough estimate: 1 token ~ 4 characters
        doc_tokens = len(doc.page_content) // 4

        if total_tokens + doc_tokens > max_tokens:
            break

        trimmed.append(doc)
        total_tokens += doc_tokens

    return trimmed

Monitoring

You cannot improve what you do not measure. Monitor these metrics in production:

Key Metrics to Track

  • Latency -- Total response time, broken down by retrieval time, LLM time, and overhead.
  • Retrieval relevance -- Log the similarity scores of retrieved documents. A sudden drop indicates embedding drift or data quality issues.
  • User feedback -- Thumbs up/down on answers. This is the most direct signal of quality.
  • Token usage -- Track input and output tokens per request for cost monitoring.
  • Cache hit rate -- What percentage of queries are served from cache?
  • Error rate -- API failures, timeouts, empty retrievals.

import time
import logging
from dataclasses import dataclass, asdict

logger = logging.getLogger("rag_monitor")

@dataclass
class RAGMetrics:
    query: str
    retrieval_time_ms: float
    generation_time_ms: float
    total_time_ms: float
    num_chunks_retrieved: int
    top_similarity_score: float
    input_tokens: int
    output_tokens: int
    cache_hit: bool

def monitored_query(question: str, retriever, chain) -> tuple[str, RAGMetrics]:
    """Run a RAG query with full monitoring."""
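    start = time.perf_counter()  # monotonic clock, better suited to measuring durations

    # Retrieval, timed on its own. If `chain` contains its own retriever, documents
    # are fetched twice here; in production, pass `docs` into the chain instead.
    retrieval_start = time.perf_counter()
    docs = retriever.invoke(question)
    retrieval_time = (time.perf_counter() - retrieval_start) * 1000

    # Generation
    gen_start = time.perf_counter()
    answer = chain.invoke(question)
    gen_time = (time.perf_counter() - gen_start) * 1000

    total_time = (time.perf_counter() - start) * 1000

    metrics = RAGMetrics(
        query=question,
        retrieval_time_ms=retrieval_time,
        generation_time_ms=gen_time,
        total_time_ms=total_time,
        num_chunks_retrieved=len(docs),
        top_similarity_score=0.0,  # placeholder: fill from a scored similarity search
        input_tokens=0,            # placeholder: fill from the LLM response's usage data
        output_tokens=0,           # placeholder: fill from the LLM response's usage data
        cache_hit=False,
    )

    logger.info("RAG query completed", extra=asdict(metrics))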
    return answer, metrics
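
Per-request records only become actionable once they are aggregated. As a rough sketch, the function below turns a batch of metric dictionaries (shaped like `asdict(RAGMetrics)`) into a cache hit rate and latency percentiles. The name `summarize` and the choice of p50/p95 are assumptions for illustration; a real deployment would more likely send these values to a metrics backend.

```python
import statistics


def summarize(records: list) -> dict:
    """Aggregate per-request metric dicts into dashboard-level numbers."""
    if not records:
        return {"count": 0}

    latencies = sorted(r["total_time_ms"] for r in records)
    hits = sum(1 for r in records if r["cache_hit"])

    # quantiles() needs at least two data points; with n=20 the last cut point is p95.
    # method="inclusive" keeps the result within the observed range.
    if len(latencies) > 1:
        p95 = statistics.quantiles(latencies, n=20, method="inclusive")[-1]
    else:
        p95 = latencies[0]

    return {
        "count": len(records),
        "cache_hit_rate": hits / len(records),
        "p50_ms": statistics.median(latencies),
        "p95_ms": p95,
    }
```

Tracking a high percentile alongside the median matters because a small share of slow requests can dominate how users perceive the system.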

Scaling

Async Processing

Handle multiple concurrent queries without blocking:

import asyncio

async def process_queries(questions: list[str], chain) -> list[str]:
    """Process multiple queries concurrently."""
    tasks = [chain.ainvoke(q) for q in questions]
    return await asyncio.gather(*tasks)
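
Launching every query at once can run into provider rate limits when the batch is large. A common refinement, sketched here under assumptions, is to cap the number of in-flight requests with a semaphore. `process_queries_bounded`, `ainvoke`, and `max_concurrency` are hypothetical names; `ainvoke` stands in for `chain.ainvoke`.

```python
import asyncio


async def process_queries_bounded(questions, ainvoke, max_concurrency: int = 5):
    """Run queries concurrently, but never more than max_concurrency at once."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(question):
        async with semaphore:  # waits here while max_concurrency calls are in flight
            return await ainvoke(question)

    # gather preserves input order; return_exceptions=True keeps one failure
    # from discarding the results of the other queries
    return await asyncio.gather(
        *(run_one(q) for q in questions), return_exceptions=True
    )
```

Callers should check each result with `isinstance(result, Exception)` before using it, since failures are returned in place rather than raised.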

Vector Database Sharding

For very large datasets, shard your vector database by category:

# Create separate collections for different document types
# (assumes `chromadb_client` was created earlier, e.g. with chromadb.Client())
collections = {
    "policies": chromadb_client.get_or_create_collection("policies"),
    "engineering": chromadb_client.get_or_create_collection("engineering"),
    "product": chromadb_client.get_or_create_collection("product"),
}

def routed_retrieval(query: str, category: str) -> list:
    """Route retrieval to the appropriate collection."""
    collection = collections.get(category, collections["policies"])
    return collection.query(query_texts=[query], n_results=5)

Security

PII Filtering

Prevent personally identifiable information from leaking through RAG responses:

import re

PII_PATTERNS = {
    # Note: inside a character class "|" is a literal pipe, so the TLD part is [A-Za-z]
    "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
    "phone": r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
    "ssn": r'\b\d{3}-\d{2}-\d{4}\b',
}

def filter_pii(text: str) -> str:
    """Remove PII from text before sending to LLM or returning to user."""
    for pii_type, pattern in PII_PATTERNS.items():
        text = re.sub(pattern, f"[REDACTED_{pii_type.upper()}]", text)
    return text

def safe_rag_query(question: str, chain) -> str:
    """RAG query with PII filtering."""
    safe_question = filter_pii(question)
    answer = chain.invoke(safe_question)
    return filter_pii(answer)

Access Control

Not all users should access all documents. Implement document-level access control:

def filtered_retrieval(query: str, user_role: str, retriever) -> list:
    """Retrieve only documents the user is authorized to see."""
    role_access = {
        "employee": ["public", "internal"],
        "manager": ["public", "internal", "management"],
        "admin": ["public", "internal", "management", "confidential"],
    }

    allowed_levels = role_access.get(user_role, ["public"])

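    # Filter syntax is specific to the vector store. If your LangChain version does not
    # forward call-time keyword arguments, set the filter in the retriever's
    # search_kwargs when creating it instead.
    results = retriever.invoke(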
        query,
        filter={"access_level": {"$in": allowed_levels}},
    )
    return results

Input Validation

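Add a first line of defense against prompt injection attacks. Keyword matching like this only catches naive attempts, so treat it as one layer alongside strict prompt templates and output checks, not as complete protection: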

def validate_input(question: str) -> tuple[bool, str]:
    """Basic input validation for RAG queries."""
    if len(question) > 1000:
        return False, "Question too long. Please keep it under 1000 characters."

    injection_patterns = [
        "ignore previous instructions",
        "ignore all instructions",
        "you are now",
        "system prompt",
    ]
    lower_q = question.lower()
    for pattern in injection_patterns:
        if pattern in lower_q:
            return False, "Invalid query detected."

    return True, question

Data Freshness

Keep your knowledge base current with automated re-indexing:

from datetime import datetime, timedelta

def check_stale_documents(vectorstore, max_age_days: int = 30):
    """Find documents that may need re-indexing."""
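    # Chroma's $lt/$gt operators compare numbers, so this assumes `indexed_at`
    # was stored as a Unix timestamp (float) in each document's metadata
    cutoff = (datetime.now() - timedelta(days=max_age_days)).timestamp()

    stale_docs = vectorstore.get(
        where={"indexed_at": {"$lt": cutoff}},
    )

    # get() returns a dict, which is truthy even with no matches, so check the ids
    if stale_docs["ids"]:
        print(f"Found {len(stale_docs['ids'])} documents older than {max_age_days} days")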

    return stale_docs
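
Age alone does not say whether a document actually changed. A complementary approach, sketched here with hypothetical names, records a content hash at indexing time and compares it against the current source, so only new or modified documents get re-embedded. `find_changed`, `documents`, and `indexed_hashes` are assumptions for this example; both are plain dictionaries keyed by document ID.

```python
import hashlib


def find_changed(documents: dict, indexed_hashes: dict) -> dict:
    """Compare current document text against the hashes recorded at indexing time.

    documents:      {doc_id: current text}
    indexed_hashes: {doc_id: sha256 hex digest stored when the doc was indexed}
    """
    current = {
        doc_id: hashlib.sha256(text.encode()).hexdigest()
        for doc_id, text in documents.items()
    }
    return {
        # Present in the source but never indexed
        "new": sorted(d for d in current if d not in indexed_hashes),
        # Indexed before, but the content hash no longer matches
        "modified": sorted(
            d for d in current
            if d in indexed_hashes and current[d] != indexed_hashes[d]
        ),
        # Still in the index but gone from the source; remove these vectors
        "deleted": sorted(d for d in indexed_hashes if d not in current),
    }
```

Running this on a schedule keeps re-indexing cost proportional to what changed, and the "deleted" list helps prevent answers based on documents that no longer exist.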

Production Checklist

Before going live, verify these items:

  • Caching is configured for both embeddings and query results
  • Streaming is enabled for the chat interface
  • Monitoring tracks latency, relevance, errors, and costs
  • PII filtering prevents data leaks in both input and output
  • Access control restricts document visibility based on user roles
  • Input validation protects against injection and abuse
  • Error handling provides graceful fallbacks when components fail
  • Rate limiting prevents abuse and controls costs
  • Data freshness alerts fire when documents become stale
  • Evaluation suite runs on a regular schedule with alerts for regressions
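
Two items on this checklist, graceful fallbacks and rate limiting, have no example elsewhere in this lesson. The sketch below shows one simple way to approach each using only the standard library. `TokenBucket`, `answer_with_fallback`, and all of their parameters are hypothetical names, and the defaults are placeholders to tune rather than recommendations.

```python
import time


class TokenBucket:
    """Allow `rate` requests per second on average, with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: int, now=None):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated = time.monotonic() if now is None else now

    def allow(self, now=None) -> bool:
        now = time.monotonic() if now is None else now
        # Refill in proportion to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


def answer_with_fallback(question, primary,
                         fallback_message="Sorry, I can't answer right now.",
                         retries=2, base_delay=0.5, sleep=time.sleep):
    """Call primary(question), retrying with exponential backoff before giving up."""
    for attempt in range(retries + 1):
        try:
            return primary(question)
        except Exception:
            if attempt < retries:
                sleep(base_delay * 2 ** attempt)  # 0.5s, 1s, 2s, ...
    # All attempts failed: degrade gracefully instead of surfacing a stack trace
    return fallback_message
```

In practice you would keep one `TokenBucket` per user or API key and return an HTTP 429 when `allow()` is false. The `primary` callable would typically be `chain.invoke`.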

In the final lesson, you will put everything together by building a complete knowledge base system from scratch.