Building a RAG Pipeline
From Components to System
In the previous five lessons, you learned each component of the RAG stack in isolation: embeddings, vector databases, document processing, chunking, and retrieval. Now it is time to assemble them into a complete, working pipeline that takes a user's question, retrieves relevant documents, and generates a grounded answer with source citations.
This is where RAG goes from theory to practice.
The End-to-End Pipeline
Most RAG systems follow the same basic flow:
- Load -- ingest documents from their source formats
- Chunk -- split documents into retrievable segments
- Embed -- convert chunks into vectors
- Store -- save vectors in a vector database
- Retrieve -- find relevant chunks for a query
- Generate -- produce an answer grounded in the retrieved context
Let's build each step.
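Before wiring up real libraries, it can help to see the six stages as plain functions. The following is a minimal sketch using only the standard library, not the pipeline built in the steps below: the bag-of-words `embed`, the in-memory `store`, and the two sample documents are all hypothetical stand-ins for a real embedding model, vector database, and corpus, and `generate` only builds the prompt instead of calling an LLM.

```python
import math
import re
from collections import Counter


def load(raw_docs):
    """Load: the 'documents' here are in-memory strings keyed by file name."""
    return [{"source": name, "text": text} for name, text in raw_docs.items()]


def chunk(docs, size=60):
    """Chunk: naive fixed-size character windows (no overlap, for brevity)."""
    return [
        {"source": d["source"], "text": d["text"][i:i + size]}
        for d in docs
        for i in range(0, len(d["text"]), size)
    ]


def embed(text):
    """Embed: a toy bag-of-words vector standing in for an embedding model."""
    return Counter(re.findall(r"[a-z]+", text.lower()))


def cosine(a, b):
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(a[t] * b[t] for t in a)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def store(chunks):
    """Store: a list of (vector, chunk) pairs instead of a vector database."""
    return [(embed(c["text"]), c) for c in chunks]


def retrieve(index, query, k=2):
    """Retrieve: rank stored chunks by similarity to the query."""
    q = embed(query)
    ranked = sorted(index, key=lambda pair: cosine(q, pair[0]), reverse=True)
    return [c for _, c in ranked[:k]]


def generate(query, retrieved):
    """Generate: build the grounded prompt a real pipeline would send to an LLM."""
    context = "\n".join(f"[Source: {c['source']}] {c['text']}" for c in retrieved)
    return f"Context:\n{context}\n\nQuestion: {query}"


# Hypothetical sample corpus
raw = {
    "vacation.md": "New employees receive fifteen vacation days per year.",
    "remote.md": "Remote work is allowed up to three days per week.",
}
question = "How many vacation days do new employees get?"
index = store(chunk(load(raw)))
top = retrieve(index, question, k=1)
prompt_text = generate(question, top)
print(prompt_text)
```

Each function maps to one stage in the list above, so swapping a toy stage for a real component changes one function at a time.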
Step 1: Load and Chunk Documents
from langchain_community.document_loaders import DirectoryLoader, PyPDFLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter


def load_documents(data_dir: str) -> list:
    """Load documents from multiple formats."""
    all_docs = []

    # Load PDFs
    pdf_loader = DirectoryLoader(
        data_dir, glob="**/*.pdf", loader_cls=PyPDFLoader
    )
    all_docs.extend(pdf_loader.load())

    # Load Markdown files
    md_loader = DirectoryLoader(
        data_dir, glob="**/*.md", loader_cls=TextLoader,
        loader_kwargs={"encoding": "utf-8"}
    )
    all_docs.extend(md_loader.load())

    return all_docs


def chunk_documents(docs: list, chunk_size: int = 1000, overlap: int = 200) -> list:
    """Split documents into chunks."""
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=overlap,
        separators=["\n\n", "\n", ". ", " ", ""],
    )
    chunks = splitter.split_documents(docs)
    print(f"Split {len(docs)} documents into {len(chunks)} chunks")
    return chunks


# Execute
docs = load_documents("./data")
chunks = chunk_documents(docs)
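To see how chunk_size and overlap interact, here is a simplified sketch of overlapping windows in plain Python. It is not how RecursiveCharacterTextSplitter works internally (that splitter also tries to break on the listed separators); `sliding_window_chunks` is a hypothetical helper that only illustrates the arithmetic: each new chunk starts `chunk_size - overlap` characters after the previous one.

```python
def sliding_window_chunks(text: str, chunk_size: int, overlap: int) -> list[str]:
    """Split text into fixed-size character windows that share `overlap` characters."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")

    step = chunk_size - overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the window already reached the end of the text
    return chunks


sample = "abcdefghijklmnopqrstuvwxyz"
pieces = sliding_window_chunks(sample, chunk_size=10, overlap=4)
for piece in pieces:
    print(piece)
```

The last four characters of each chunk reappear at the start of the next one, which is what keeps a sentence that straddles a boundary retrievable from either side.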
Step 2: Embed and Store
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings


def create_vectorstore(chunks: list, persist_dir: str = "./chroma_db") -> Chroma:
    """Embed chunks and store in ChromaDB."""
    embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
    vectorstore = Chroma.from_documents(
        documents=chunks,
        embedding=embeddings,
        persist_directory=persist_dir,
        collection_metadata={"hnsw:space": "cosine"},
    )
    print(f"Stored {len(chunks)} chunks in vector database")
    return vectorstore


vectorstore = create_vectorstore(chunks)
Step 3: Build the Retriever
def create_retriever(vectorstore, k: int = 5):
    """Create a retriever with MMR for diversity."""
    return vectorstore.as_retriever(
        search_type="mmr",
        search_kwargs={
            "k": k,               # chunks returned to the prompt
            "fetch_k": 20,        # candidates fetched before MMR re-ranking
            "lambda_mult": 0.7,   # 1.0 = pure relevance, 0.0 = maximum diversity
        },
    )


retriever = create_retriever(vectorstore)
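The three search parameters are easier to reason about with the selection rule in front of you. The sketch below implements the usual maximal marginal relevance (MMR) scoring in plain Python. It is an illustration under assumptions, not the library's code: `mmr_select` and the two-dimensional vectors are hypothetical, and a real retriever works on embedding vectors returned by the vector store.

```python
import math


def cosine(a, b):
    """Cosine similarity between two dense vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def mmr_select(query_vec, candidates, k=2, fetch_k=20, lambda_mult=0.7):
    """Return the indices of k candidates chosen by maximal marginal relevance."""
    # Stage 1: keep only the fetch_k candidates most similar to the query.
    by_relevance = sorted(
        range(len(candidates)),
        key=lambda i: cosine(query_vec, candidates[i]),
        reverse=True,
    )
    pool = by_relevance[:fetch_k]

    # Stage 2: greedily pick the candidate with the best trade-off between
    # relevance to the query and redundancy with what is already selected.
    selected = []
    while pool and len(selected) < k:
        def score(i):
            relevance = cosine(query_vec, candidates[i])
            redundancy = max(
                (cosine(candidates[i], candidates[j]) for j in selected),
                default=0.0,
            )
            return lambda_mult * relevance - (1 - lambda_mult) * redundancy

        best = max(pool, key=score)
        selected.append(best)
        pool.remove(best)
    return selected


query = [1.0, 1.0]
vectors = [
    [1.0, 0.25],  # index 0: most relevant
    [1.0, 0.20],  # index 1: near-duplicate of index 0
    [0.15, 1.0],  # index 2: slightly less relevant, but covers a different direction
]
diverse = mmr_select(query, vectors, k=2, lambda_mult=0.7)
relevance_only = mmr_select(query, vectors, k=2, lambda_mult=1.0)
print(diverse, relevance_only)
```

With lambda_mult at 1.0 the near-duplicate wins the second slot; lowering it penalizes the duplicate and lets the distinct chunk through. fetch_k bounds how many candidates the re-ranking step considers, so it should be comfortably larger than k.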
Step 4: Design the Prompt Template
The prompt template is the bridge between retrieval and generation. A well-designed prompt instructs the LLM to answer from the provided context, admit when it does not know, and cite its sources.
from langchain_core.prompts import ChatPromptTemplate
RAG_PROMPT = ChatPromptTemplate.from_messages([
("system", """You are a helpful assistant that answers questions based on
the provided context. Follow these rules strictly:
1. Answer ONLY based on the provided context.
2. If the context does not contain enough information to answer,
say "I don't have enough information to answer this question."
3. Cite the source document for each claim using [Source: filename].
4. Be concise and direct. Do not add information beyond what the
context provides.
5. If multiple sources provide relevant information, synthesize
them into a coherent answer.
Context:
{context}"""),
("human", "{question}"),
])
Key Prompt Design Principles
Ground the model. The instruction "Answer ONLY based on the provided context" is the most important line. Without it, the model tends to mix its parametric knowledge with the retrieved context, which defeats the purpose of RAG. The instruction reduces this behavior but does not guarantee it, so test it.
Handle unknowns gracefully. The "I don't know" instruction reduces the risk of hallucination when the context does not contain the answer. This is a feature, not a limitation.
Request citations. Asking for source references makes the answer verifiable. The user can check the original document if they need more detail or want to confirm the claim.
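Because the prompt asks for citations in a fixed `[Source: filename]` form, they can be checked mechanically. The sketch below is one possible post-processing step, not part of the chain built in this lesson: `check_citations` is a hypothetical helper, and the sample answer and file names are made up for illustration.

```python
import re

# Matches the citation format requested in the prompt: [Source: filename]
CITATION_PATTERN = re.compile(r"\[Source:\s*([^\]]+)\]")


def check_citations(answer: str, retrieved_sources: list[str]) -> dict:
    """Compare the sources cited in an answer with the sources actually retrieved."""
    cited = {match.strip() for match in CITATION_PATTERN.findall(answer)}
    known = set(retrieved_sources)
    return {
        "cited": sorted(cited),
        "unknown": sorted(cited - known),  # cited but never retrieved
        "has_citation": bool(cited),
    }


# Hypothetical model output and retrieved file names
sample_answer = (
    "Employees may work remotely three days per week [Source: remote.md]. "
    "Equipment is reimbursed [Source: perks.md]."
)
report = check_citations(sample_answer, ["remote.md", "vacation.md"])
print(report)
```

An entry in "unknown" means the model cited a file that was not in the retrieved context, which is worth flagging or logging.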
Step 5: Assemble the Chain
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
def format_docs(docs):
    """Format retrieved documents for the prompt."""
    formatted = []
    for doc in docs:
        source = doc.metadata.get("source", "Unknown")
        formatted.append(f"[Source: {source}]\n{doc.page_content}")
    return "\n\n---\n\n".join(formatted)


# Initialize the LLM
llm = ChatOpenAI(model="gpt-4o", temperature=0)

# Build the RAG chain
rag_chain = (
    {
        "context": retriever | format_docs,
        "question": RunnablePassthrough(),
    }
    | RAG_PROMPT
    | llm
    | StrOutputParser()
)

# Ask a question
answer = rag_chain.invoke("What is the company's remote work policy?")
print(answer)
Complete Working Example
Here is the entire pipeline in a single, runnable script:
import os

from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# --- Configuration ---
DATA_DIR = "./data"
CHROMA_DIR = "./chroma_db"
EMBEDDING_MODEL = "text-embedding-3-small"
LLM_MODEL = "gpt-4o"
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200

# The OpenAI clients read the API key from the OPENAI_API_KEY environment variable
if not os.environ.get("OPENAI_API_KEY"):
    raise SystemExit("Set the OPENAI_API_KEY environment variable before running.")

# --- Step 1: Load and chunk ---
loader = DirectoryLoader(DATA_DIR, glob="**/*.md", loader_cls=TextLoader,
                         loader_kwargs={"encoding": "utf-8"})
docs = loader.load()
splitter = RecursiveCharacterTextSplitter(
    chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP
)
chunks = splitter.split_documents(docs)
print(f"Loaded {len(docs)} docs -> {len(chunks)} chunks")

# --- Step 2: Embed and store ---
embeddings = OpenAIEmbeddings(model=EMBEDDING_MODEL)
vectorstore = Chroma.from_documents(chunks, embeddings, persist_directory=CHROMA_DIR)

# --- Step 3: Retriever ---
retriever = vectorstore.as_retriever(
    search_type="mmr", search_kwargs={"k": 5, "fetch_k": 20}
)

# --- Step 4: Prompt ---
prompt = ChatPromptTemplate.from_messages([
    ("system", """Answer based ONLY on the context below. If unsure, say so.
Cite sources as [Source: filename].

Context:
{context}"""),
    ("human", "{question}"),
])


# --- Step 5: Chain ---
def format_docs(docs):
    """Join retrieved chunks, each prefixed with its source, for the prompt."""
    return "\n\n---\n\n".join(
        f"[Source: {d.metadata.get('source', '?')}]\n{d.page_content}"
        for d in docs
    )


chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | ChatOpenAI(model=LLM_MODEL, temperature=0)
    | StrOutputParser()
)

# --- Use it ---
response = chain.invoke("What is the vacation policy for new employees?")
print(response)
Using LlamaIndex Instead
LlamaIndex provides a higher-level abstraction for the same pipeline:
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, Settings
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding

# Configure global settings
Settings.llm = OpenAI(model="gpt-4o", temperature=0)
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")

# Load, chunk, embed, and store in one step
documents = SimpleDirectoryReader("./data").load_data()
index = VectorStoreIndex.from_documents(documents)

# Query
query_engine = index.as_query_engine(similarity_top_k=5)
response = query_engine.query("What is the vacation policy?")
print(response.response)

print("\nSources:")
for node in response.source_nodes:
    print(f"  - {node.metadata.get('file_name', 'Unknown')} "
          f"(score: {node.score:.4f})")
LlamaIndex is more opinionated and requires less code for standard pipelines. LangChain gives you more control over each component. Choose based on your team's preferences and how much customization you need.
Tracking Sources and Citations
Source tracking is not optional in production RAG. Users need to verify answers and understand where information came from.
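from langchain_core.runnables import RunnableParallel, RunnablePassthrough

# Generate the answer from documents that were already retrieved, so the
# sources returned to the caller are exactly the ones the model saw.
answer_from_sources = (
    RunnablePassthrough.assign(context=lambda x: format_docs(x["sources"]))
    | RAG_PROMPT
    | llm
    | StrOutputParser()
)

# Retrieve once, then return both the answer and the source documents
rag_with_sources = RunnableParallel(
    sources=retriever,
    question=RunnablePassthrough(),
).assign(answer=answer_from_sources)

result = rag_with_sources.invoke("What is the PTO policy?")
print("Answer:", result["answer"])
print("\nSources:")
for doc in result["sources"]:
    print(f"  - {doc.metadata.get('source', 'Unknown')}")
    print(f"    Preview: {doc.page_content[:100]}...")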
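Several retrieved chunks often come from the same file, so a raw source listing repeats itself. The following sketch collapses the list to unique sources in retrieval order. `unique_sources` is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins for retrieved documents; it assumes only that each document exposes a `metadata` dict, as the documents above do.

```python
from types import SimpleNamespace


def unique_sources(docs) -> list[str]:
    """Return each source once, in the order it was first retrieved."""
    seen = {}
    for doc in docs:
        source = doc.metadata.get("source", "Unknown")
        seen.setdefault(source, None)  # dicts preserve insertion order
    return list(seen)


# Stand-ins for retrieved documents (hypothetical file names)
docs_stub = [
    SimpleNamespace(metadata={"source": "pto.md"}, page_content="..."),
    SimpleNamespace(metadata={"source": "handbook.md"}, page_content="..."),
    SimpleNamespace(metadata={"source": "pto.md"}, page_content="..."),
    SimpleNamespace(metadata={}, page_content="..."),
]
source_list = unique_sources(docs_stub)
print(source_list)
```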
Handling Edge Cases
When the Context Has No Answer
If the retriever returns chunks that are not relevant to the question, the LLM should acknowledge this. Test your system with questions that are clearly outside the knowledge base to make sure the "I don't know" behavior works.
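One way to run this check repeatedly is a small harness that measures how often out-of-scope questions get the refusal sentence. This is a sketch under assumptions: `refusal_rate` is a hypothetical helper, and `fake_ask` is a stub standing in for a call such as `rag_chain.invoke`, with a made-up in-scope answer, so the example runs without an API key.

```python
REFUSAL = "I don't have enough information to answer this question."


def refusal_rate(ask, questions, refusal_text=REFUSAL):
    """Fraction of questions whose answer contains the refusal sentence."""
    if not questions:
        return 0.0
    refused = sum(
        1 for question in questions
        if refusal_text.lower() in ask(question).lower()
    )
    return refused / len(questions)


def fake_ask(question: str) -> str:
    """Stub for the real chain: answers vacation questions, refuses the rest."""
    if "vacation" in question.lower():
        return "New employees receive fifteen days [Source: vacation.md]."
    return REFUSAL


out_of_scope = ["What is the capital of France?", "Who won the 1998 World Cup?"]
in_scope = ["What is the vacation policy?"]

out_rate = refusal_rate(fake_ask, out_of_scope)
in_rate = refusal_rate(fake_ask, in_scope)
print(out_rate, in_rate)
```

In a real run you would pass the chain's invoke function as `ask`; a low rate on the out-of-scope set, or a high rate on the in-scope set, both point to a problem.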
When Multiple Chunks Conflict
If different documents say different things, the LLM should note the discrepancy rather than choosing one arbitrarily. Add this to your prompt: "If sources contradict each other, mention the disagreement."
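A simple way to keep such rules manageable is to hold them in a list and number them when building the system prompt. The sketch below is one possible arrangement, not the template used earlier: `build_system_prompt` is a hypothetical helper, and the rule list is abbreviated from the Step 4 prompt.

```python
BASE_RULES = [
    "Answer ONLY based on the provided context.",
    "If the context does not contain enough information to answer, "
    "say \"I don't have enough information to answer this question.\"",
    "Cite the source document for each claim using [Source: filename].",
]
CONFLICT_RULE = "If sources contradict each other, mention the disagreement."


def build_system_prompt(rules: list[str]) -> str:
    """Number the rules and append the {context} placeholder for the template."""
    numbered = "\n".join(f"{i}. {rule}" for i, rule in enumerate(rules, start=1))
    return (
        "You are a helpful assistant that answers questions based on "
        "the provided context. Follow these rules strictly:\n"
        f"{numbered}\n\nContext:\n{{context}}"
    )


system_prompt = build_system_prompt(BASE_RULES + [CONFLICT_RULE])
print(system_prompt)
```

The resulting string keeps the literal {context} placeholder, so it can be used as the system message of a chat prompt template.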
Long Contexts
If you retrieve many chunks, the combined context may exceed the model's context window, or grow long enough that the model overlooks relevant passages buried in the middle. Strategies to handle this: reduce the number of retrieved chunks, use contextual compression, or use a model with a longer context window.
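The first strategy can be enforced with a token budget applied to the ranked chunks. The sketch below is a rough illustration: `fit_to_budget` is a hypothetical helper, and the four-characters-per-token estimate is an assumption that only approximates English text. A real pipeline would count tokens with the tokenizer of the model in use.

```python
def approx_tokens(text: str) -> int:
    """Rough token estimate; assumes about 4 characters per token."""
    return max(1, len(text) // 4)


def fit_to_budget(chunks: list[str], max_tokens: int) -> list[str]:
    """Keep chunks in ranked order until the approximate budget is used up."""
    kept, used = [], 0
    for chunk in chunks:
        cost = approx_tokens(chunk)
        if used + cost > max_tokens:
            break  # stop at the first chunk that would overflow the budget
        kept.append(chunk)
        used += cost
    return kept


# Three ranked chunks of roughly 100 tokens each (synthetic text)
ranked_chunks = ["a" * 400, "b" * 400, "c" * 400]
kept = fit_to_budget(ranked_chunks, max_tokens=250)
print(len(kept))
```

Because the chunks arrive ranked, stopping at the first overflow drops the least relevant material first.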
Tips for Production
- Use streaming. For chat interfaces, stream the response token-by-token instead of waiting for the complete answer. LangChain and LlamaIndex both support streaming natively (for example, calling stream() instead of invoke() on a LangChain chain).
- Log everything. Log the query, retrieved documents, and generated answer for every interaction. This data is invaluable for debugging and evaluation.
- Set temperature to 0. For factual RAG, you want consistent, grounded answers. Temperature 0 minimizes sampling variation, although hosted models may still not be perfectly deterministic.
- Test with adversarial questions. Ask questions that are slightly off-topic, use different phrasing, or reference things not in your knowledge base. These tests reveal where your pipeline breaks.
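For the logging tip, one lightweight option is an append-only JSON Lines file with one record per interaction. This is a minimal sketch, not a recommendation of a specific logging stack: `log_interaction`, the field names, and the sample values are all hypothetical, and the demo writes to a temporary directory so it leaves nothing behind.

```python
import json
import tempfile
import time
from pathlib import Path


def log_interaction(log_path, query, sources, answer):
    """Append one interaction to a JSON Lines file and return the record."""
    record = {
        "timestamp": time.time(),
        "query": query,
        "sources": list(sources),
        "answer": answer,
    }
    path = Path(log_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")
    return record


# Demo with made-up values, written to a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    log_file = Path(tmp) / "logs" / "rag.jsonl"
    log_interaction(log_file, "What is the PTO policy?", ["pto.md"], "See [Source: pto.md].")
    log_interaction(log_file, "Who is the CEO?", [], "I don't have enough information.")
    logged = [json.loads(line) for line in log_file.read_text(encoding="utf-8").splitlines()]

print(len(logged), logged[0]["query"])
```

One record per line keeps the log easy to append to concurrently and easy to load later for evaluation.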
You now have a working RAG pipeline. In the next lesson, you will learn advanced patterns that push beyond basic retrieve-and-generate.