Saltar al contenido
Lección 11 de 12

RAG en Produccion

8 min read

De Prototipo a Produccion

Un sistema RAG que funciona en un notebook es fundamentalmente diferente de uno que sirve a usuarios reales. En produccion, enfrentas presupuestos de latencia, restricciones de costos, usuarios concurrentes, datos obsoletos, entradas adversarias y requisitos de cumplimiento. Esta leccion cubre las practicas de ingenieria que hacen un sistema RAG confiable, rapido y asequible a escala.

Estrategias de Cache

El cache es la optimizacion mas efectiva para RAG en produccion. Muchos usuarios hacen preguntas similares o identicas, y calcular embeddings y respuestas LLM desde cero cada vez es un desperdicio.

Cache Semantico

Un cache semantico almacena pares consulta-respuesta y retorna respuestas en cache para consultas semanticamente similares. Si un usuario pregunta "Cual es la politica de PTO?" y otro pregunta "Cuantos dias de vacaciones tengo?", el cache semantico las reconoce como la misma pregunta.

from langchain_community.cache import InMemoryCache
from langchain.globals import set_llm_cache
import hashlib
import numpy as np

class SemanticCache:
    def __init__(self, embeddings, threshold: float = 0.95):
        self.embeddings = embeddings
        self.threshold = threshold
        self.cache = {}  # {hash_embedding: {"embedding": [...], "answer": "..."}}

    def get(self, query: str) -> str | None:
        """Verificar cache para consulta semanticamente similar."""
        query_embedding = self.embeddings.embed_query(query)

        for key, entry in self.cache.items():
            similarity = np.dot(query_embedding, entry["embedding"])
            if similarity >= self.threshold:
                return entry["answer"]
        return None

    def set(self, query: str, answer: str):
        """Guardar en cache un par consulta-respuesta."""
        embedding = self.embeddings.embed_query(query)
        key = hashlib.md5(query.encode()).hexdigest()
        self.cache[key] = {"embedding": embedding, "answer": answer}

# Uso
cache = SemanticCache(embeddings, threshold=0.95)

def query_with_cache(question: str, chain) -> str:
    cached = cache.get(question)
    if cached:
        return cached

    answer = chain.invoke(question)
    cache.set(question, answer)
    return answer

Cache de Embeddings

Embeber el mismo documento dos veces es puro desperdicio. Almacena embeddings en cache a nivel de documento y solo recalcula cuando la fuente cambia.

import json
import hashlib

class EmbeddingCache:
    def __init__(self, cache_file: str = "embedding_cache.json"):
        self.cache_file = cache_file
        try:
            with open(cache_file, "r") as f:
                self.cache = json.load(f)
        except FileNotFoundError:
            self.cache = {}

    def get_or_compute(self, text: str, embed_fn) -> list[float]:
        """Retornar embedding en cache o calcular y almacenar."""
        text_hash = hashlib.sha256(text.encode()).hexdigest()

        if text_hash in self.cache:
            return self.cache[text_hash]

        embedding = embed_fn(text)
        self.cache[text_hash] = embedding
        self._save()
        return embedding

    def _save(self):
        with open(self.cache_file, "w") as f:
            json.dump(self.cache, f)

Streaming de Respuestas

Los usuarios esperando una respuesta RAG completa experimentan un retraso doloroso -- a menudo 3-10 segundos. El streaming comienza a mostrar la respuesta inmediatamente mientras el LLM la genera.

from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

llm = ChatOpenAI(model="gpt-4o", temperature=0, streaming=True)

# Transmitir la respuesta
async def stream_rag_response(question: str, chain):
    """Transmitir respuesta RAG token por token."""
    async for chunk in chain.astream(question):
        print(chunk, end="", flush=True)
    print()  # Nueva linea al final

# Para aplicaciones web (ejemplo FastAPI)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/query")
async def query_endpoint(question: str):
    async def generate():
        async for chunk in chain.astream(question):
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

Optimizacion de Costos

Los costos de LLM se acumulan rapido en produccion. Estas son las palancas principales:

Seleccion de Modelo

No toda consulta necesita GPT-4o. Usa una estrategia de enrutamiento:

def route_query(question: str) -> str:
    """Enrutar al modelo apropiado basandose en complejidad de la consulta."""
    simple_patterns = ["que es", "cuando fue", "cuantos", "quien es"]
    is_simple = any(question.lower().startswith(p) for p in simple_patterns)

    if is_simple:
        return "gpt-4o-mini"  # ~10x mas barato
    else:
        return "gpt-4o"

Procesamiento por Lotes

Cuando indexas documentos, agrupa tus llamadas de embedding:

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

# En lugar de embeber uno a la vez
# for doc in documents:
#     embedding = embeddings.embed_query(doc.page_content)

# Embeber en lote (mucho mas barato por menos llamadas API)
texts = [doc.page_content for doc in documents]
batch_embeddings = embeddings.embed_documents(texts)

Gestion de Presupuesto de Tokens

Controla cuanto contexto envias al LLM:

def trim_context(docs: list, max_tokens: int = 3000) -> list:
    """Recortar documentos recuperados para ajustarse al presupuesto de tokens."""
    trimmed = []
    total_tokens = 0

    for doc in docs:
        # Estimacion aproximada: 1 token ~ 4 caracteres
        doc_tokens = len(doc.page_content) // 4

        if total_tokens + doc_tokens > max_tokens:
            break

        trimmed.append(doc)
        total_tokens += doc_tokens

    return trimmed

Monitoreo

No puedes mejorar lo que no mides. Monitorea estas metricas en produccion:

Metricas Clave a Rastrear

  • Latencia -- Tiempo total de respuesta, desglosado por tiempo de recuperacion, tiempo de LLM y overhead.
  • Relevancia de recuperacion -- Registra las puntuaciones de similitud de documentos recuperados. Una caida repentina indica drift de embeddings o problemas de calidad de datos.
  • Feedback del usuario -- Pulgar arriba/abajo en respuestas. Esta es la senal mas directa de calidad.
  • Uso de tokens -- Rastrea tokens de entrada y salida por solicitud para monitoreo de costos.
  • Tasa de aciertos de cache -- Que porcentaje de consultas se sirven desde cache?
  • Tasa de errores -- Fallos de API, timeouts, recuperaciones vacias.
import time
import logging
from dataclasses import dataclass, asdict

logger = logging.getLogger("rag_monitor")

@dataclass
class RAGMetrics:
    query: str
    retrieval_time_ms: float
    generation_time_ms: float
    total_time_ms: float
    num_chunks_retrieved: int
    top_similarity_score: float
    input_tokens: int
    output_tokens: int
    cache_hit: bool

def monitored_query(question: str, retriever, chain) -> tuple[str, RAGMetrics]:
    """Ejecutar una consulta RAG con monitoreo completo."""
    start = time.time()

    # Recuperacion
    retrieval_start = time.time()
    docs = retriever.invoke(question)
    retrieval_time = (time.time() - retrieval_start) * 1000

    # Generacion
    gen_start = time.time()
    answer = chain.invoke(question)
    gen_time = (time.time() - gen_start) * 1000

    total_time = (time.time() - start) * 1000

    metrics = RAGMetrics(
        query=question,
        retrieval_time_ms=retrieval_time,
        generation_time_ms=gen_time,
        total_time_ms=total_time,
        num_chunks_retrieved=len(docs),
        top_similarity_score=0.0,
        input_tokens=0,
        output_tokens=0,
        cache_hit=False,
    )

    logger.info(f"Consulta RAG completada", extra=asdict(metrics))
    return answer, metrics

Escalado

Procesamiento Asincrono

Maneja multiples consultas concurrentes sin bloqueo:

import asyncio
from langchain_openai import ChatOpenAI

async def process_queries(questions: list[str], chain) -> list[str]:
    """Procesar multiples consultas concurrentemente."""
    tasks = [chain.ainvoke(q) for q in questions]
    return await asyncio.gather(*tasks)

Sharding de Base de Datos Vectorial

Para conjuntos de datos muy grandes, divide tu base de datos vectorial por categoria:

# Crear colecciones separadas para diferentes tipos de documentos
collections = {
    "policies": chromadb_client.get_or_create_collection("policies"),
    "engineering": chromadb_client.get_or_create_collection("engineering"),
    "product": chromadb_client.get_or_create_collection("product"),
}

def routed_retrieval(query: str, category: str) -> list:
    """Enrutar recuperacion a la coleccion apropiada."""
    collection = collections.get(category, collections["policies"])
    return collection.query(query_texts=[query], n_results=5)

Seguridad

Filtrado de PII

Previene que informacion de identificacion personal se filtre a traves de respuestas RAG:

import re

PII_PATTERNS = {
    "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
    "phone": r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
    "ssn": r'\b\d{3}-\d{2}-\d{4}\b',
}

def filter_pii(text: str) -> str:
    """Eliminar PII del texto antes de enviar al LLM o retornar al usuario."""
    for pii_type, pattern in PII_PATTERNS.items():
        text = re.sub(pattern, f"[REDACTADO_{pii_type.upper()}]", text)
    return text

def safe_rag_query(question: str, chain) -> str:
    """Consulta RAG con filtrado de PII."""
    safe_question = filter_pii(question)
    answer = chain.invoke(safe_question)
    return filter_pii(answer)

Control de Acceso

No todos los usuarios deberian acceder a todos los documentos. Implementa control de acceso a nivel de documento:

def filtered_retrieval(query: str, user_role: str, retriever) -> list:
    """Recuperar solo documentos que el usuario esta autorizado a ver."""
    role_access = {
        "employee": ["public", "internal"],
        "manager": ["public", "internal", "management"],
        "admin": ["public", "internal", "management", "confidential"],
    }

    allowed_levels = role_access.get(user_role, ["public"])

    results = retriever.invoke(
        query,
        filter={"access_level": {"$in": allowed_levels}},
    )
    return results

Validacion de Entrada

Protege contra ataques de inyeccion de prompt:

def validate_input(question: str) -> tuple[bool, str]:
    """Validacion basica de entrada para consultas RAG."""
    if len(question) > 1000:
        return False, "Pregunta demasiado larga. Mantener bajo 1000 caracteres."

    injection_patterns = [
        "ignora instrucciones anteriores",
        "ignora todas las instrucciones",
        "ahora eres",
        "system prompt",
    ]
    lower_q = question.lower()
    for pattern in injection_patterns:
        if pattern in lower_q:
            return False, "Consulta invalida detectada."

    return True, question

Frescura de Datos

Manten tu base de conocimiento actualizada con re-indexacion automatizada:

from datetime import datetime, timedelta

def check_stale_documents(vectorstore, max_age_days: int = 30):
    """Encontrar documentos que pueden necesitar re-indexacion."""
    cutoff = (datetime.now() - timedelta(days=max_age_days)).isoformat()

    stale_docs = vectorstore.get(
        where={"indexed_at": {"$lt": cutoff}},
    )

    if stale_docs:
        print(f"Encontrados {len(stale_docs['ids'])} documentos mas antiguos que {max_age_days} dias")

    return stale_docs

Lista de Verificacion para Produccion

Antes de ir en vivo, verifica estos elementos:

  • Cache configurado tanto para embeddings como para resultados de consultas
  • Streaming habilitado para la interfaz de chat
  • Monitoreo rastrea latencia, relevancia, errores y costos
  • Filtrado de PII previene fugas de datos tanto en entrada como en salida
  • Control de acceso restringe visibilidad de documentos basado en roles de usuario
  • Validacion de entrada protege contra inyeccion y abuso
  • Manejo de errores proporciona fallbacks elegantes cuando los componentes fallan
  • Limitacion de tasa previene abuso y controla costos
  • Frescura de datos alertas se disparan cuando los documentos se vuelven obsoletos
  • Suite de pruebas se ejecuta en un horario regular con alertas para regresiones

En la leccion final, pondras todo junto construyendo un sistema de base de conocimiento completo desde cero.