RAG en Produccion
De Prototipo a Produccion
Un sistema RAG que funciona en un notebook es fundamentalmente diferente de uno que sirve a usuarios reales. En produccion, enfrentas presupuestos de latencia, restricciones de costos, usuarios concurrentes, datos obsoletos, entradas adversarias y requisitos de cumplimiento. Esta leccion cubre las practicas de ingenieria que hacen un sistema RAG confiable, rapido y asequible a escala.
Estrategias de Cache
El cache es la optimizacion mas efectiva para RAG en produccion. Muchos usuarios hacen preguntas similares o identicas, y calcular embeddings y respuestas LLM desde cero cada vez es un desperdicio.
Cache Semantico
Un cache semantico almacena pares consulta-respuesta y retorna respuestas en cache para consultas semanticamente similares. Si un usuario pregunta "Cual es la politica de PTO?" y otro pregunta "Cuantos dias de vacaciones tengo?", el cache semantico las reconoce como la misma pregunta.
from langchain_community.cache import InMemoryCache
from langchain.globals import set_llm_cache
import hashlib
import numpy as np
class SemanticCache:
def __init__(self, embeddings, threshold: float = 0.95):
self.embeddings = embeddings
self.threshold = threshold
self.cache = {} # {hash_embedding: {"embedding": [...], "answer": "..."}}
def get(self, query: str) -> str | None:
"""Verificar cache para consulta semanticamente similar."""
query_embedding = self.embeddings.embed_query(query)
for key, entry in self.cache.items():
similarity = np.dot(query_embedding, entry["embedding"])
if similarity >= self.threshold:
return entry["answer"]
return None
def set(self, query: str, answer: str):
"""Guardar en cache un par consulta-respuesta."""
embedding = self.embeddings.embed_query(query)
key = hashlib.md5(query.encode()).hexdigest()
self.cache[key] = {"embedding": embedding, "answer": answer}
# Uso
cache = SemanticCache(embeddings, threshold=0.95)
def query_with_cache(question: str, chain) -> str:
cached = cache.get(question)
if cached:
return cached
answer = chain.invoke(question)
cache.set(question, answer)
return answer
Cache de Embeddings
Embeber el mismo documento dos veces es puro desperdicio. Almacena embeddings en cache a nivel de documento y solo recalcula cuando la fuente cambia.
import json
import hashlib
class EmbeddingCache:
def __init__(self, cache_file: str = "embedding_cache.json"):
self.cache_file = cache_file
try:
with open(cache_file, "r") as f:
self.cache = json.load(f)
except FileNotFoundError:
self.cache = {}
def get_or_compute(self, text: str, embed_fn) -> list[float]:
"""Retornar embedding en cache o calcular y almacenar."""
text_hash = hashlib.sha256(text.encode()).hexdigest()
if text_hash in self.cache:
return self.cache[text_hash]
embedding = embed_fn(text)
self.cache[text_hash] = embedding
self._save()
return embedding
def _save(self):
with open(self.cache_file, "w") as f:
json.dump(self.cache, f)
Streaming de Respuestas
Los usuarios esperando una respuesta RAG completa experimentan un retraso doloroso -- a menudo 3-10 segundos. El streaming comienza a mostrar la respuesta inmediatamente mientras el LLM la genera.
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
llm = ChatOpenAI(model="gpt-4o", temperature=0, streaming=True)
# Transmitir la respuesta
async def stream_rag_response(question: str, chain):
"""Transmitir respuesta RAG token por token."""
async for chunk in chain.astream(question):
print(chunk, end="", flush=True)
print() # Nueva linea al final
# Para aplicaciones web (ejemplo FastAPI)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.get("/query")
async def query_endpoint(question: str):
async def generate():
async for chunk in chain.astream(question):
yield f"data: {chunk}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
Optimizacion de Costos
Los costos de LLM se acumulan rapido en produccion. Estas son las palancas principales:
Seleccion de Modelo
No toda consulta necesita GPT-4o. Usa una estrategia de enrutamiento:
def route_query(question: str) -> str:
"""Enrutar al modelo apropiado basandose en complejidad de la consulta."""
simple_patterns = ["que es", "cuando fue", "cuantos", "quien es"]
is_simple = any(question.lower().startswith(p) for p in simple_patterns)
if is_simple:
return "gpt-4o-mini" # ~10x mas barato
else:
return "gpt-4o"
Procesamiento por Lotes
Cuando indexas documentos, agrupa tus llamadas de embedding:
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# En lugar de embeber uno a la vez
# for doc in documents:
# embedding = embeddings.embed_query(doc.page_content)
# Embeber en lote (mucho mas barato por menos llamadas API)
texts = [doc.page_content for doc in documents]
batch_embeddings = embeddings.embed_documents(texts)
Gestion de Presupuesto de Tokens
Controla cuanto contexto envias al LLM:
def trim_context(docs: list, max_tokens: int = 3000) -> list:
"""Recortar documentos recuperados para ajustarse al presupuesto de tokens."""
trimmed = []
total_tokens = 0
for doc in docs:
# Estimacion aproximada: 1 token ~ 4 caracteres
doc_tokens = len(doc.page_content) // 4
if total_tokens + doc_tokens > max_tokens:
break
trimmed.append(doc)
total_tokens += doc_tokens
return trimmed
Monitoreo
No puedes mejorar lo que no mides. Monitorea estas metricas en produccion:
Metricas Clave a Rastrear
- Latencia -- Tiempo total de respuesta, desglosado por tiempo de recuperacion, tiempo de LLM y overhead.
- Relevancia de recuperacion -- Registra las puntuaciones de similitud de documentos recuperados. Una caida repentina indica drift de embeddings o problemas de calidad de datos.
- Feedback del usuario -- Pulgar arriba/abajo en respuestas. Esta es la senal mas directa de calidad.
- Uso de tokens -- Rastrea tokens de entrada y salida por solicitud para monitoreo de costos.
- Tasa de aciertos de cache -- Que porcentaje de consultas se sirven desde cache?
- Tasa de errores -- Fallos de API, timeouts, recuperaciones vacias.
import time
import logging
from dataclasses import dataclass, asdict
logger = logging.getLogger("rag_monitor")
@dataclass
class RAGMetrics:
query: str
retrieval_time_ms: float
generation_time_ms: float
total_time_ms: float
num_chunks_retrieved: int
top_similarity_score: float
input_tokens: int
output_tokens: int
cache_hit: bool
def monitored_query(question: str, retriever, chain) -> tuple[str, RAGMetrics]:
"""Ejecutar una consulta RAG con monitoreo completo."""
start = time.time()
# Recuperacion
retrieval_start = time.time()
docs = retriever.invoke(question)
retrieval_time = (time.time() - retrieval_start) * 1000
# Generacion
gen_start = time.time()
answer = chain.invoke(question)
gen_time = (time.time() - gen_start) * 1000
total_time = (time.time() - start) * 1000
metrics = RAGMetrics(
query=question,
retrieval_time_ms=retrieval_time,
generation_time_ms=gen_time,
total_time_ms=total_time,
num_chunks_retrieved=len(docs),
top_similarity_score=0.0,
input_tokens=0,
output_tokens=0,
cache_hit=False,
)
logger.info(f"Consulta RAG completada", extra=asdict(metrics))
return answer, metrics
Escalado
Procesamiento Asincrono
Maneja multiples consultas concurrentes sin bloqueo:
import asyncio
from langchain_openai import ChatOpenAI
async def process_queries(questions: list[str], chain) -> list[str]:
"""Procesar multiples consultas concurrentemente."""
tasks = [chain.ainvoke(q) for q in questions]
return await asyncio.gather(*tasks)
Sharding de Base de Datos Vectorial
Para conjuntos de datos muy grandes, divide tu base de datos vectorial por categoria:
# Crear colecciones separadas para diferentes tipos de documentos
collections = {
"policies": chromadb_client.get_or_create_collection("policies"),
"engineering": chromadb_client.get_or_create_collection("engineering"),
"product": chromadb_client.get_or_create_collection("product"),
}
def routed_retrieval(query: str, category: str) -> list:
"""Enrutar recuperacion a la coleccion apropiada."""
collection = collections.get(category, collections["policies"])
return collection.query(query_texts=[query], n_results=5)
Seguridad
Filtrado de PII
Previene que informacion de identificacion personal se filtre a traves de respuestas RAG:
import re
PII_PATTERNS = {
"email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
"phone": r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
"ssn": r'\b\d{3}-\d{2}-\d{4}\b',
}
def filter_pii(text: str) -> str:
"""Eliminar PII del texto antes de enviar al LLM o retornar al usuario."""
for pii_type, pattern in PII_PATTERNS.items():
text = re.sub(pattern, f"[REDACTADO_{pii_type.upper()}]", text)
return text
def safe_rag_query(question: str, chain) -> str:
"""Consulta RAG con filtrado de PII."""
safe_question = filter_pii(question)
answer = chain.invoke(safe_question)
return filter_pii(answer)
Control de Acceso
No todos los usuarios deberian acceder a todos los documentos. Implementa control de acceso a nivel de documento:
def filtered_retrieval(query: str, user_role: str, retriever) -> list:
"""Recuperar solo documentos que el usuario esta autorizado a ver."""
role_access = {
"employee": ["public", "internal"],
"manager": ["public", "internal", "management"],
"admin": ["public", "internal", "management", "confidential"],
}
allowed_levels = role_access.get(user_role, ["public"])
results = retriever.invoke(
query,
filter={"access_level": {"$in": allowed_levels}},
)
return results
Validacion de Entrada
Protege contra ataques de inyeccion de prompt:
def validate_input(question: str) -> tuple[bool, str]:
"""Validacion basica de entrada para consultas RAG."""
if len(question) > 1000:
return False, "Pregunta demasiado larga. Mantener bajo 1000 caracteres."
injection_patterns = [
"ignora instrucciones anteriores",
"ignora todas las instrucciones",
"ahora eres",
"system prompt",
]
lower_q = question.lower()
for pattern in injection_patterns:
if pattern in lower_q:
return False, "Consulta invalida detectada."
return True, question
Frescura de Datos
Manten tu base de conocimiento actualizada con re-indexacion automatizada:
from datetime import datetime, timedelta
def check_stale_documents(vectorstore, max_age_days: int = 30):
"""Encontrar documentos que pueden necesitar re-indexacion."""
cutoff = (datetime.now() - timedelta(days=max_age_days)).isoformat()
stale_docs = vectorstore.get(
where={"indexed_at": {"$lt": cutoff}},
)
if stale_docs:
print(f"Encontrados {len(stale_docs['ids'])} documentos mas antiguos que {max_age_days} dias")
return stale_docs
Lista de Verificacion para Produccion
Antes de ir en vivo, verifica estos elementos:
- Cache configurado tanto para embeddings como para resultados de consultas
- Streaming habilitado para la interfaz de chat
- Monitoreo rastrea latencia, relevancia, errores y costos
- Filtrado de PII previene fugas de datos tanto en entrada como en salida
- Control de acceso restringe visibilidad de documentos basado en roles de usuario
- Validacion de entrada protege contra inyeccion y abuso
- Manejo de errores proporciona fallbacks elegantes cuando los componentes fallan
- Limitacion de tasa previene abuso y controla costos
- Frescura de datos alertas se disparan cuando los documentos se vuelven obsoletos
- Suite de pruebas se ejecuta en un horario regular con alertas para regresiones
En la leccion final, pondras todo junto construyendo un sistema de base de conocimiento completo desde cero.