Lesson 7 of 12

Building a RAG Pipeline

From Components to System

In the previous five lessons, you learned each component of the RAG stack in isolation: embeddings, vector databases, document processing, chunking, and retrieval. Now it is time to assemble them into a complete, working pipeline that takes a user's question, retrieves relevant documents, and generates a grounded answer with source citations.

This is where RAG goes from theory to practice.

The End-to-End Pipeline

Most RAG systems follow the same basic flow:

  1. Load -- ingest documents from their source formats
  2. Chunk -- split documents into retrievable segments
  3. Embed -- convert chunks into vectors
  4. Store -- save vectors in a vector database
  5. Retrieve -- find relevant chunks for a query
  6. Generate -- produce an answer grounded in the retrieved context
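
Before wiring in real components, the six stages above can be sketched with standard-library stand-ins. This is a toy illustration only: the bag-of-words `embed` function, the in-memory `store` list, and the two sample documents are assumptions made for the example, not part of the lesson's stack, and the "generate" step stops at building the prompt instead of calling a model.

```python
import math
import re
from collections import Counter

def embed(text: str) -> Counter:
    """Toy 'embedding': bag-of-words counts (stand-in for a real embedding model)."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))

def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0

# 1. Load: hypothetical in-memory documents instead of files on disk
documents = {
    "pto.md": "Employees accrue fifteen days of paid vacation per year.",
    "remote.md": "Remote work is allowed up to three days per week.",
}

# 2. Chunk: each document is short enough to be a single chunk here
chunks = list(documents.items())

# 3 and 4. Embed and store: keep (source, text, vector) rows in a list
store = [(source, text, embed(text)) for source, text in chunks]

# 5. Retrieve: rank stored chunks by similarity to the query
def retrieve(query: str, k: int = 1) -> list:
    q = embed(query)
    return sorted(store, key=lambda row: cosine(q, row[2]), reverse=True)[:k]

# 6. Generate: build the grounded prompt a real LLM would receive
def build_prompt(query: str) -> str:
    context = "\n\n".join(f"[Source: {s}]\n{t}" for s, t, _ in retrieve(query))
    return f"Answer ONLY from the context.\n\nContext:\n{context}\n\nQuestion: {query}"

print(build_prompt("How many days of remote work are allowed?"))
```

Each stand-in is replaced by a real component in the steps that follow.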

Let's build each step.

Step 1: Load and Chunk Documents

from langchain_community.document_loaders import DirectoryLoader, PyPDFLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

def load_documents(data_dir: str) -> list:
    """Load documents from multiple formats."""
    all_docs = []

    # Load PDFs
    pdf_loader = DirectoryLoader(
        data_dir, glob="**/*.pdf", loader_cls=PyPDFLoader
    )
    all_docs.extend(pdf_loader.load())

    # Load Markdown files
    md_loader = DirectoryLoader(
        data_dir, glob="**/*.md", loader_cls=TextLoader,
        loader_kwargs={"encoding": "utf-8"}
    )
    all_docs.extend(md_loader.load())

    return all_docs

def chunk_documents(docs: list, chunk_size: int = 1000, overlap: int = 200) -> list:
    """Split documents into chunks."""
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=overlap,
        separators=["\n\n", "\n", ". ", " ", ""],
    )
    chunks = splitter.split_documents(docs)
    print(f"Split {len(docs)} documents into {len(chunks)} chunks")
    return chunks

# Execute
docs = load_documents("./data")
chunks = chunk_documents(docs)
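
To build intuition for how `chunk_size` and `overlap` interact, here is a deliberately simplified fixed-width chunker. It is not how `RecursiveCharacterTextSplitter` works internally (that splitter tries the listed separators in order so that chunks break on paragraph and sentence boundaries where possible); the function name `sliding_window_chunks` is hypothetical and exists only for this sketch.

```python
def sliding_window_chunks(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
    """Cut text into fixed-width windows that share `overlap` characters."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    step = chunk_size - overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks

print(sliding_window_chunks("abcdefghij", chunk_size=4, overlap=2))
```

With the lesson's defaults of 1000 and 200, each window in this sketch advances 800 characters, so a sentence that straddles a boundary appears in full in at least one chunk as long as it is shorter than the overlap.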

Step 2: Embed and Store

from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

def create_vectorstore(chunks: list, persist_dir: str = "./chroma_db") -> Chroma:
    """Embed chunks and store in ChromaDB."""
    embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

    vectorstore = Chroma.from_documents(
        documents=chunks,
        embedding=embeddings,
        persist_directory=persist_dir,
        collection_metadata={"hnsw:space": "cosine"},
    )

    print(f"Stored {len(chunks)} chunks in vector database")
    return vectorstore

vectorstore = create_vectorstore(chunks)
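
The `"hnsw:space": "cosine"` setting asks the index to compare vectors by angle and to ignore their length. As a rough sketch of what that metric computes (assuming the common definition of cosine distance as one minus cosine similarity; the helper names are illustrative):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Dot product divided by the product of the vector lengths."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def cosine_distance(a: list[float], b: list[float]) -> float:
    """Smaller means more similar; 0 means the vectors point the same way."""
    return 1.0 - cosine_similarity(a, b)

print(cosine_distance([1.0, 0.0], [2.0, 0.0]))  # same direction, different length
print(cosine_distance([1.0, 0.0], [0.0, 1.0]))  # orthogonal vectors
```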

Step 3: Build the Retriever

def create_retriever(vectorstore, k: int = 5):
    """Create a retriever with MMR for diversity."""
    return vectorstore.as_retriever(
        search_type="mmr",
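        search_kwargs={
            "k": k,              # number of chunks returned to the prompt
            "fetch_k": 20,       # candidates fetched by similarity before MMR re-ranking
            "lambda_mult": 0.7,  # 1.0 = pure relevance, 0.0 = maximum diversity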
        },
    )

retriever = create_retriever(vectorstore)
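
To make the MMR parameters concrete, here is a minimal pure-Python sketch of maximal marginal relevance. It assumes the usual scoring rule, `lambda_mult * relevance - (1 - lambda_mult) * redundancy`; the function `mmr_select` and the toy vectors are hypothetical and only illustrate the idea. In the retriever above, `fetch_k` controls how many candidates enter this selection and `k` controls how many come out.

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

def mmr_select(query_vec, candidate_vecs, k=5, lambda_mult=0.7):
    """Greedily pick up to k candidate indices, trading relevance against redundancy."""
    remaining = list(range(len(candidate_vecs)))
    selected = []
    while remaining and len(selected) < k:
        best_idx, best_score = None, -math.inf
        for i in remaining:
            relevance = cosine(query_vec, candidate_vecs[i])
            # Redundancy: similarity to the closest already-selected candidate
            redundancy = max(
                (cosine(candidate_vecs[i], candidate_vecs[j]) for j in selected),
                default=0.0,
            )
            score = lambda_mult * relevance - (1 - lambda_mult) * redundancy
            if score > best_score:
                best_idx, best_score = i, score
        selected.append(best_idx)
        remaining.remove(best_idx)
    return selected

query = [1.0, 0.0]
candidates = [
    [1.0, 0.10],   # index 0: very relevant
    [1.0, 0.12],   # index 1: near-duplicate of index 0
    [0.8, -0.60],  # index 2: less relevant, but covers a different direction
]
print(mmr_select(query, candidates, k=2, lambda_mult=1.0))  # relevance only
print(mmr_select(query, candidates, k=2, lambda_mult=0.5))  # balanced
```

With `lambda_mult=1.0` the near-duplicate is picked second; lowering it to 0.5 makes the selection prefer the candidate that adds new information.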

Step 4: Design the Prompt Template

The prompt template is the bridge between retrieval and generation. A well-designed prompt instructs the LLM to answer from the provided context, admit when it does not know, and cite its sources.

from langchain_core.prompts import ChatPromptTemplate

RAG_PROMPT = ChatPromptTemplate.from_messages([
    ("system", """You are a helpful assistant that answers questions based on
the provided context. Follow these rules strictly:

1. Answer ONLY based on the provided context.
2. If the context does not contain enough information to answer,
   say "I don't have enough information to answer this question."
3. Cite the source document for each claim using [Source: filename].
4. Be concise and direct. Do not add information beyond what the
   context provides.
5. If multiple sources provide relevant information, synthesize
   them into a coherent answer.

Context:
{context}"""),
    ("human", "{question}"),
])

Key Prompt Design Principles

Ground the model. The instruction "Answer ONLY based on the provided context" is the most important line. Without it, the model tends to mix its parametric knowledge with the retrieved context, which defeats the purpose of RAG.

Handle unknowns gracefully. Rule 2, the explicit "I don't have enough information" fallback, discourages the model from hallucinating when the context does not contain the answer. This is a feature, not a limitation.

Request citations. Asking for source references makes the answer verifiable. The user can check the original document if they need more detail or want to confirm the claim.

Step 5: Assemble the Chain

from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

def format_docs(docs):
    """Format retrieved documents for the prompt."""
    formatted = []
    for doc in docs:
        source = doc.metadata.get("source", "Unknown")
        formatted.append(f"[Source: {source}]\n{doc.page_content}")
    return "\n\n---\n\n".join(formatted)

# Initialize the LLM
llm = ChatOpenAI(model="gpt-4o", temperature=0)

# Build the RAG chain
rag_chain = (
    {
        "context": retriever | format_docs,
        "question": RunnablePassthrough(),
    }
    | RAG_PROMPT
    | llm
    | StrOutputParser()
)

# Ask a question
answer = rag_chain.invoke("What is the company's remote work policy?")
print(answer)
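
If the pipe syntax feels opaque, the data flow of the chain can be written out as ordinary function calls. The sketch below is a plain-Python analogy, not LangChain code: `run_rag` and the four stand-in functions are hypothetical, and the "LLM" simply echoes its prompt so you can inspect what a model would receive.

```python
from typing import Callable

def run_rag(
    question: str,
    retrieve: Callable[[str], list],
    format_docs: Callable[[list], str],
    render_prompt: Callable[[str, str], str],
    call_llm: Callable[[str], str],
) -> str:
    docs = retrieve(question)                   # "context": retriever ...
    context = format_docs(docs)                 # ... | format_docs
    prompt = render_prompt(context, question)   # | RAG_PROMPT (question passed through unchanged)
    return call_llm(prompt)                     # | llm | StrOutputParser()

# Stand-ins so the sketch runs without any external service
def fake_retrieve(question: str) -> list:
    return [{"source": "policy.md", "text": "Remote work is allowed."}]

def fake_format(docs: list) -> str:
    return "\n\n---\n\n".join(f"[Source: {d['source']}]\n{d['text']}" for d in docs)

def fake_prompt(context: str, question: str) -> str:
    return f"Context:\n{context}\n\nQuestion: {question}"

def echo_llm(prompt: str) -> str:
    return prompt  # returns the prompt itself for inspection

result = run_rag("What is the remote work policy?", fake_retrieve, fake_format, fake_prompt, echo_llm)
print(result)
```

The same question string feeds both branches of the dictionary: one branch turns it into context, the other passes it through untouched.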

Complete Working Example

Here is the entire pipeline in a single script. For brevity, it loads only Markdown files and uses a shorter prompt:

# Requires the OPENAI_API_KEY environment variable to be set.
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# --- Configuration ---
DATA_DIR = "./data"
CHROMA_DIR = "./chroma_db"
EMBEDDING_MODEL = "text-embedding-3-small"
LLM_MODEL = "gpt-4o"
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200

# --- Step 1: Load and chunk ---
loader = DirectoryLoader(DATA_DIR, glob="**/*.md", loader_cls=TextLoader,
                         loader_kwargs={"encoding": "utf-8"})
docs = loader.load()

splitter = RecursiveCharacterTextSplitter(
    chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP
)
chunks = splitter.split_documents(docs)
print(f"Loaded {len(docs)} docs -> {len(chunks)} chunks")

# --- Step 2: Embed and store ---
embeddings = OpenAIEmbeddings(model=EMBEDDING_MODEL)
vectorstore = Chroma.from_documents(chunks, embeddings, persist_directory=CHROMA_DIR)

# --- Step 3: Retriever ---
retriever = vectorstore.as_retriever(
    search_type="mmr",
    search_kwargs={"k": 5, "fetch_k": 20, "lambda_mult": 0.7},
)

# --- Step 4: Prompt ---
prompt = ChatPromptTemplate.from_messages([
    ("system", """Answer based ONLY on the context below. If unsure, say so.
Cite sources as [Source: filename].

Context:
{context}"""),
    ("human", "{question}"),
])

# --- Step 5: Chain ---
def format_docs(docs):
    return "\n\n---\n\n".join(
        f"[Source: {d.metadata.get('source', '?')}]\n{d.page_content}"
        for d in docs
    )

chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | ChatOpenAI(model=LLM_MODEL, temperature=0)
    | StrOutputParser()
)

# --- Use it ---
response = chain.invoke("What is the vacation policy for new employees?")
print(response)

Using LlamaIndex Instead

LlamaIndex provides a higher-level abstraction for the same pipeline:

from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, Settings
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding

# Configure global settings
Settings.llm = OpenAI(model="gpt-4o", temperature=0)
Settings.embed_model = OpenAIEmbedding(model_name="text-embedding-3-small")

# Load, chunk, embed, and store in one step
documents = SimpleDirectoryReader("./data").load_data()
index = VectorStoreIndex.from_documents(documents)

# Query
query_engine = index.as_query_engine(similarity_top_k=5)
response = query_engine.query("What is the vacation policy?")

print(response.response)
print("\nSources:")
for node in response.source_nodes:
    print(f"  - {node.metadata.get('file_name', 'Unknown')} "
          f"(score: {node.score:.4f})")

LlamaIndex is more opinionated and requires less code for standard pipelines. LangChain gives you more control over each component. Choose based on your team's preferences and how much customization you need.

Tracking Sources and Citations

Source tracking is not optional in production RAG. Users need to verify answers and understand where information came from.

from langchain_core.runnables import RunnableParallel

# Return both the answer and the source documents.
# Note: retrieval runs twice here, once inside rag_chain and once for "sources".
rag_with_sources = RunnableParallel(
    answer=rag_chain,
    sources=retriever,
)

result = rag_with_sources.invoke("What is the PTO policy?")
print("Answer:", result["answer"])
print("\nSources:")
for doc in result["sources"]:
    print(f"  - {doc.metadata.get('source', 'Unknown')}")
    print(f"    Preview: {doc.page_content[:100]}...")
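
Returning sources is only half of verification: it also helps to check that the citations in the answer point at documents that were actually retrieved. The following is a minimal sketch, assuming the model follows the `[Source: filename]` format requested in the prompt; the helper names are hypothetical.

```python
import re

# Matches the citation format requested in the prompt: [Source: filename]
CITATION_RE = re.compile(r"\[Source:\s*([^\]]+)\]")

def extract_citations(answer: str) -> set[str]:
    """Collect every source name cited in the answer text."""
    return {match.strip() for match in CITATION_RE.findall(answer)}

def unsupported_citations(answer: str, retrieved_sources: list[str]) -> set[str]:
    """Return cited sources that were not among the retrieved documents."""
    return extract_citations(answer) - set(retrieved_sources)

sample_answer = (
    "New employees accrue 15 days per year [Source: pto.md]. "
    "Unused days roll over [Source: made_up.md]."
)
print(unsupported_citations(sample_answer, ["pto.md", "remote.md"]))
```

With the result above, the call would look like `unsupported_citations(result["answer"], [d.metadata.get("source") for d in result["sources"]])`; a non-empty set is a signal to log or flag the answer.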

Handling Edge Cases

When the Context Has No Answer

If the retriever returns chunks that are not relevant to the question, the LLM should say so rather than guess. Test your system with questions that are clearly outside the knowledge base to confirm that the "I don't have enough information" fallback from the prompt actually triggers.
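
One way to automate that check is a small harness that sends out-of-scope questions and reports any that did not produce the fallback. This is a sketch under assumptions: `ask` is any callable that maps a question to an answer string (for example `rag_chain.invoke`), the substring match is a simple heuristic, and `fake_ask` is a stand-in used only to demonstrate the harness.

```python
def is_refusal(answer: str) -> bool:
    """Heuristic: does the answer contain the fallback phrase from the prompt?"""
    normalized = " ".join(answer.replace("\u2019", "'").lower().split())
    return "don't have enough information" in normalized

def check_out_of_scope(ask, questions: list[str]) -> list[str]:
    """Return the questions that were answered instead of refused."""
    return [q for q in questions if not is_refusal(ask(q))]

# Stand-in for a real chain: answers one question it should have refused
def fake_ask(question: str) -> str:
    if "capital" in question:
        return "Paris."
    return "I don\u2019t have enough information to answer this question."

failures = check_out_of_scope(
    fake_ask,
    ["What is the capital of France?", "Who won the 1990 World Cup?"],
)
print(failures)
```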

When Multiple Chunks Conflict

If different documents say different things, the LLM should note the discrepancy rather than choosing one arbitrarily. Add this to your prompt: "If sources contradict each other, mention the disagreement."

Long Contexts

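If you retrieve many chunks, the combined context can exceed the model's context window or, well before that, grow long enough that the model overlooks relevant passages. Strategies to handle this: retrieve fewer chunks, use contextual compression to trim each chunk to the parts relevant to the query, or use a model with a longer context window.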
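
The simplest of these strategies, keeping fewer chunks, can be sketched as a budget check applied to the ranked results. The function name `fit_to_budget` is hypothetical, and the budget is counted in characters for simplicity; a real system would more likely count tokens with the model's tokenizer.

```python
def fit_to_budget(chunks: list[str], max_chars: int) -> list[str]:
    """Keep chunks in ranked order until adding the next one would exceed the budget."""
    kept, used = [], 0
    for chunk in chunks:
        if used + len(chunk) > max_chars:
            break  # stop at the first chunk that does not fit
        kept.append(chunk)
        used += len(chunk)
    return kept

ranked = ["aaaa", "bbb", "cc"]  # most relevant first
print(fit_to_budget(ranked, max_chars=7))
```

Stopping at the first chunk that does not fit preserves the ranking order, at the cost of sometimes leaving part of the budget unused.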

Tips for Production

  • Use streaming. For chat interfaces, stream the response token-by-token instead of waiting for the complete answer. LangChain and LlamaIndex both support streaming natively.
  • Log everything. Log the query, retrieved documents, and generated answer for every interaction. This data is invaluable for debugging and evaluation.
  • Set temperature to 0. For factual RAG, you want consistent, grounded answers. Temperature 0 minimizes sampling variation, though it does not guarantee identical outputs across runs.
  • Test with adversarial questions. Ask questions that are slightly off-topic, use different phrasing, or reference things not in your knowledge base. These tests reveal where your pipeline breaks.
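
As one possible shape for the logging tip, the sketch below writes one JSON object per interaction to a JSON Lines file. The field names, the `build_log_record` and `append_jsonl` helpers, and the file layout are assumptions made for illustration; adapt them to whatever logging stack you already use.

```python
import json
import time

def build_log_record(question: str, sources: list[str], answer: str, latency_s: float) -> dict:
    """Bundle everything needed to debug or evaluate one interaction."""
    return {
        "timestamp": time.time(),
        "question": question,
        "sources": list(sources),
        "answer": answer,
        "latency_s": round(latency_s, 3),
    }

def append_jsonl(path: str, record: dict) -> None:
    """Append one record as a single line of JSON."""
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")

record = build_log_record(
    question="What is the PTO policy?",
    sources=["pto.md"],
    answer="New employees accrue 15 days per year [Source: pto.md].",
    latency_s=0.12345,
)
print(json.dumps(record, indent=2))
```

Calling `append_jsonl("rag_log.jsonl", record)` after each request yields a file that can be replayed later as an evaluation set.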

You now have a working RAG pipeline. In the next lesson, you will learn advanced patterns that push beyond basic retrieve-and-generate.