Tutorial

Build a RAG agent to answer complex questions

Use Python, LangGraph, watsonx.ai, Elasticsearch, and Tavily to build a customized, modular agentic AI system.

AI developers are focused on building the next frontier of AI systems: LLM agents (LLM-based AI agents). These systems will likely automate long-horizon tasks, whereas current AI systems mainly handle simple questions and short-horizon tasks.

An example of a long-horizon task could be a typical 30-minute ChatGPT session, where you ask several questions with follow-ups, copy and paste good responses elsewhere (for example, a spreadsheet, document, or IDE), ignore the bad responses, synthesize and edit the responses, use the internet to fill in the gaps, plot some data, draw a diagram, and so on.

These intermediate tasks (let's call them "glue tasks"), which you perform to piece together LLM outputs, are fully tractable with today’s best AI models.

In areas where LLMs struggle (such as math), we can now connect to external tools (for example, a calculator) via APIs. The newest class of LLMs (such as Llama 3.1) was deliberately aligned for tool calling to enable an ecosystem of agentic application development.

At work, we are rarely asked to produce a deliverable instantly and without any external help (the way ChatGPT does). Instead, we usually seek feedback, collaborate with colleagues, make iterative edits, use various tools, and reflect on our work before submitting. LLM agent systems embrace this same core philosophy.

In the long term, these LLM agents might look like AI colleagues that you interact with via Slack. Imagine delegating work to your AI colleague via Slack, jumping on a meeting, and then checking back 30 minutes later to a fully completed deliverable.

For now, I built a RAG-based LLM agent (hereafter called a RAG agent) to handle complex questions and interact with external information sources such as vector databases and the internet. In this tutorial, I introduce you to my RAG agent (with diagrams and full Python implementation), and share an example question that is answered correctly by my agent and incorrectly by ChatGPT 4o.

Designing a RAG agent

Imagine you are designing a RAG system to handle complex, multi-step questions.

We will use the following example question, developed by my brilliant colleague Deniz Askin, who often uses it as a “litmus test” for assessing new AI systems:

  • Which David Fincher film that stars Edward Norton does not star Brad Pitt?

The correct answer to this question is “None,” and most AI systems cannot get it right. Here is the incorrect response from GPT-4o on August 2, 2024:

Screenshot of GPT-4o answering the question posed

This answer is completely false. ChatGPT 4o is trying to answer this complex, multi-hop, logical question in one feed-forward pass of a neural network. Current autoregressive LLMs are not designed to do this effectively.

Let’s investigate an alternate approach using a RAG agent.

To answer this question the way a human would, we can break it down into the following sub-questions and answer them sequentially:

  1. Which David Fincher films star Edward Norton?
  2. Which David Fincher films star Brad Pitt?
  3. Which of the David Fincher films that star Edward Norton do not star Brad Pitt?

This sequential process is illustrated in the following diagram, where the blue circle nodes are LLM-based tasks, and the purple circle nodes consist of information retrieval and LLM tasks for in-context learning (that is, RAG). The blue circle nodes (query decomposition and answer consolidation) are examples of the glue tasks that I mentioned earlier.

Diagram showing a query decomposition and consolidation architecture of a RAG system

Let’s dive deeper into one of the RAG nodes by trying to answer the first sub-question, “Which David Fincher films star Edward Norton?”

Perhaps we have a vector database containing IMDB information about David Fincher, or perhaps we don't and instead need to find this information on the internet.

Rather than using a naive one-shot RAG approach, I implemented a more robust RAG architecture inspired by the Corrective RAG work. The algorithm, with a minimal code sketch following the list, goes as follows:

  1. Perform retrieval against a vector database using the sub-query “Which David Fincher films star Edward Norton?”
  2. Use an LLM to grade the relevance of each document retrieved.
  3. If all documents retrieved are relevant, generate an answer to the question.
  4. If any of the retrieved documents are irrelevant, store only the relevant ones, and query the internet to retrieve additional information. Generate a response using the retrieved documents and/or internet information.
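
Here is a minimal sketch of this loop in plain Python, assuming retriever, retrieval_grader, web_search_tool, and rag_chain objects like the ones defined later in this tutorial:

from langchain.schema import Document

def corrective_rag(sub_query: str) -> str:
    # 1. retrieve candidate documents from the vector database
    docs = retriever.invoke(sub_query)
    # 2. grade each retrieved document for relevance with an LLM
    relevant = [
        d for d in docs
        if retrieval_grader.invoke({"question": sub_query, "document": d.page_content})["score"] == "yes"
    ]
    # 3./4. if any document was irrelevant, augment the context with a web search
    if len(relevant) < len(docs):
        results = web_search_tool.invoke({"query": sub_query})
        relevant += [Document(page_content=r["content"]) for r in results]
    # generate an answer from the assembled context
    return rag_chain.invoke({"documents": relevant, "question": sub_query})

The LangGraph implementation later in this tutorial expresses this same control flow as a graph of nodes and edges.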

This process is illustrated in the following diagram, where blue circle nodes symbolize LLM-based tasks, green circle nodes describe information-retrieval tasks, and the orange diamond is a logical decision edge. For this example question, each corrective RAG invocation will route to the internet because I do not have a vector database containing information on David Fincher films.

Diagram showing a corrective architecture of a RAG system

Remember, this diagram only represents the process for handling one of three subqueries (“Which David Fincher films star Edward Norton?”).

Now, let's illustrate the entire algorithm, which you could extend to n sub-queries to handle even more complex questions.

Diagram showing the architecture of a RAG agent

Implementing my RAG agent

So, how do we build this type of system?

We can easily build granular, multi-step LLM applications (that is, LLM agents) by formulating the system as a cyclical graph using the LangGraph framework. We can connect graph nodes to external tools, such as web search, vector databases, calculators, and so on, by using LangChain tool integrations.

What follows is my Python implementation.

First, we install the required libraries.

!pip install ibm-watsonx-ai
!pip install langchain
!pip install langchain-community
!pip install langchain-ibm
!pip install langgraph
!pip install elasticsearch
!pip install langchain-elasticsearch
!pip install elastic-transport
!pip install python-dotenv

Then, we load the API keys from environment variables (optionally from a local .env file).

import os
from dotenv import load_dotenv

# load credentials from a local .env file, if present
load_dotenv()

# watsonx.ai credentials
project_id = os.getenv("GENAI_PROJECT_ID")
api_key = os.getenv("GENAI_KEY")

# Elasticsearch credentials
url = os.getenv("WXD_URL", None)
username = os.getenv("WXD_USERNAME", None)
password = os.getenv("WXD_PASSWORD", None)

credentials = {
    "url": "https://us-south.ml.cloud.ibm.com",
    "apikey": api_key}

Then, we load a vector database (let’s assume the vectorDB is already built).

from elasticsearch import Elasticsearch
from langchain_ibm import WatsonxEmbeddings, WatsonxLLM
from ibm_watsonx_ai.metanames import GenTextParamsMetaNames
from langchain_elasticsearch import ElasticsearchStore

#initialize elasticsearch client
es_client = Elasticsearch(
    url,
    basic_auth=(username,password),
    verify_certs=False,
    request_timeout=3600)

#define embeddings model
embeddings = WatsonxEmbeddings(model_id='ibm/slate-125m-english-rtrvr',
                               apikey=credentials.get('apikey'),
                               url=credentials.get('url'),
                               project_id=project_id)
#load existing vectorstore
vectorstore = ElasticsearchStore(
    es_connection=es_client,
    embedding = embeddings,
    index_name="<insert index name>")
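
If you don't already have a populated index, here is a minimal sketch of how you might build one from a handful of documents using ElasticsearchStore.from_documents (the document contents and index name are placeholders):

from langchain.schema import Document

# hypothetical example corpus -- replace with your own documents
docs = [
    Document(page_content="Fight Club (1999) is a film directed by David Fincher starring Brad Pitt and Edward Norton."),
    Document(page_content="Se7en (1995) is a film directed by David Fincher starring Brad Pitt and Morgan Freeman."),
]

vectorstore = ElasticsearchStore.from_documents(
    documents=docs,
    embedding=embeddings,
    es_connection=es_client,
    index_name="<insert index name>",
)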

Next, we need to define the Retriever.

from langchain.schema import Document
retriever = vectorstore.as_retriever()

Then, we define the Tavily web search tool. Note that TavilySearchResults reads your Tavily API key from the TAVILY_API_KEY environment variable.

from langchain_community.tools.tavily_search import TavilySearchResults
web_search_tool = TavilySearchResults()

Next, we need to choose an LLM. We use IBM watsonx.ai, a hybrid, multi-cloud AI and data platform that brings together generative AI capabilities powered by a suite of foundation models. Let's use Meta's Llama 3.1 405B model, which is available on watsonx.ai. However, feel free to experiment with other generative foundation models.

llm = WatsonxLLM(
    model_id = "meta-llama/llama-3-405b-instruct",
    url=credentials.get("url"),
    apikey=credentials.get("apikey"),
    project_id=project_id,
    params = {  GenTextParamsMetaNames.DECODING_METHOD: "greedy",
                GenTextParamsMetaNames.MAX_NEW_TOKENS: 200,
                GenTextParamsMetaNames.MIN_NEW_TOKENS: 10})

Next, we define a RAG chain using LangChain.

from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser

prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|>You are an assistant for question-answering tasks.
    {{Below is some context from different sources followed by a user's question. Please answer the question based on the context.

    Documents: {documents}}} <|eot_id|><|start_header_id|>user<|end_header_id|>

    {{ Question: {question} }}<|eot_id|><|start_header_id|>assistant<|end_header_id|>

    Answer:
    """,
    input_variables=["question", "documents"],
)

#define rag chain
rag_chain = prompt | llm | StrOutputParser()
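
As a quick sanity check, you can invoke the chain directly (the context string here is just an illustrative placeholder):

print(rag_chain.invoke({
    "documents": ["Fight Club (1999) is a film directed by David Fincher."],
    "question": "Who directed Fight Club?"
}))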

Then, we define the Retrieval Grader chain.

from langchain_core.output_parsers import JsonOutputParser

prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
    You are a grader assessing relevance of a retrieved document to a user question. \n
    Here is the retrieved document: \n\n {document} \n\n

    <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is the user question: {question} \n

    Give a binary score 'yes' or 'no' to indicate whether the document is relevant to the question. \n
    Provide the binary score as a JSON with a single key 'score' and no preamble or explanation. <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question", "document"],
)

#define retrieval grader chain
retrieval_grader = prompt | llm | JsonOutputParser()
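
You can test the grader on a single hypothetical document string; it should return a small JSON object such as {'score': 'yes'}:

print(retrieval_grader.invoke({
    "question": "Which David Fincher films star Edward Norton?",
    "document": "Fight Club is a 1999 film directed by David Fincher, starring Brad Pitt and Edward Norton."
}))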

Then, we define the Query Decomposition chain.

prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
    You are an assistant for question-answering tasks.
    Perform query decomposition. Given a user question, break it down into distinct sub-questions that \
    you need to answer in order to answer the original question. Respond with \"The question needs no decomposition\" when no decomposition is needed.
    Generate questions that explicitly mention the subject by name, avoiding pronouns like 'these,' 'they,' 'he,' 'she,' 'it,' etc. Each question should clearly state the subject to ensure no ambiguity.

    Example 1:
    Question: Is Hamlet more common on IMDB than Comedy of Errors?
    Decompositions:
    How many listings of Hamlet are there on IMDB?
    How many listings of Comedy of Errors are there on IMDB?

    Example 2:
    Question: What is the Capital city of Japan?
    Decompositions:
    The question needs no decomposition

    <|eot_id|><|start_header_id|>user<|end_header_id|>
    Question: {user_query} <|eot_id|><|start_header_id|>assistant<|end_header_id|>
    Decompositions:
    """,
    input_variables=["user_query"],
)

#define query decomposition chain
query_decompose = prompt | llm | StrOutputParser()
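
As an illustrative check (the exact wording will vary by model), invoking this chain on our example question should return one sub-question per line, which the transform_query node defined later splits into a list:

print(query_decompose.invoke({
    "user_query": "Which David Fincher film that stars Edward Norton does not star Brad Pitt?"
}))
# expected shape of the output:
# Which David Fincher films star Edward Norton?
# Which David Fincher films star Brad Pitt?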

Now, we're getting to the good stuff. Let's define our Graph class and the 'nodes' in our corrective RAG graph. These 'nodes', defined as Python functions, correspond to the blocks in the diagrams at the beginning of this tutorial.

from typing import List
from typing_extensions import TypedDict
from IPython.display import Image, display
from langgraph.graph import START, END, StateGraph

class GraphState(TypedDict):
    """
    Represents the state of our graph.
    Attributes:
        question: question to be used as input in LLM chain
        generation: LLM generation response
        search: "yes" or "no" string acting as boolean for whether to invoke web search
        documents: list of documents for in-context learning
        steps: list of steps taken in agent flow
        user_query: original user query, stored here for persistence during consolidation stage
        sub_answers: list of answers to decomposed questions
        sub_questions: list of decomposed questions
        final_response: consolidated final answer to the original user query
        intermediate_qa: list of sub-question/sub-answer pairs used as context during consolidation
    """
    question: str
    generation: str
    search: str
    documents: List[str]
    steps: List[str]
    user_query: str
    sub_answers: List[str]
    sub_questions: List[str]
    final_response: str
    intermediate_qa: List[dict]

def retrieve(state):
    """
    Retrieve documents
    This is the first Node invoked in the CRAG_graph

    # CRAG_graph is invoked in the CRAG_loop node:
    #response = CRAG_graph.invoke({"question": q, "steps": steps})["generation"]
    #we initialize the state with a sub-question and list of steps

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---Retrieving Documents---")
    """-----------inputs-----------"""
    question = state["question"]
    steps = state["steps"]

    """-----------actions-----------"""
    steps.append("retrieve_documents")
    documents = retriever.invoke(question)

    """-----------outputs-----------"""
    return {
        "documents": documents,
        "question": question,
        "steps": steps
    }

def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question. Stores all relevant documents in the documents list.
    However, if even one retrieved document is irrelevant, web search will be invoked.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with only filtered relevant documents
    """
    print("---Grading Retrieved Documents---")
    """-----------inputs-----------"""
    documents = state["documents"]
    question = state["question"]
    steps = state["steps"]

    """-----------actions-----------"""
    steps.append("grade_document_retrieval")
    relevant_docs = []
    search = "No"

    for d in documents:
        score = retrieval_grader.invoke(
            {"question": question, "document": d.page_content}
        )
        grade = score["score"]
        if grade == "yes":
            relevant_docs.append(d)
        else:
            search = "Yes"
            continue
    """-----------outputs-----------"""
    return {
        "documents": relevant_docs,
        "question": question,
        "search": search,
        "steps": steps,
    }

def decide_to_generate(state):
    """
    Determines whether to generate an answer directly or first augment the context with a web search.

    Args:
        state (dict): The current graph state

    Returns:
        str: Binary decision for next node to call
    """
    print("---At decision Edge---")
    """-----------inputs-----------"""
    search = state["search"]

    """-----------actions & outputs-----------"""
    if search == "Yes":
        return "search"
    else:
        return "generate"

def web_search(state):
    """
    Web search based on the re-phrased question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with appended web results
    """
    print("---Searching the Web---")
    """-----------inputs-----------"""
    documents = state.get("documents", [])
    question = state["question"]
    steps = state["steps"]

    """-----------actions-----------"""
    steps.append("web_search")
    web_results = web_search_tool.invoke({"query": question})
    documents.extend(
        [
            Document(page_content=d["content"], metadata={"url": d["url"]})
            for d in web_results
        ]
    )
    """-----------outputs-----------"""
    return {
        "documents": documents, 
        "question": question, 
        "steps": steps
    }

def generate(state):
    """
    Generate answer

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---Generating Response---")
    """-----------inputs-----------"""
    documents = state["documents"]
    question = state["question"]
    steps = state["steps"]

    """-----------actions-----------"""
    steps.append("generating sub-answer")
    generation = rag_chain.invoke({"documents": documents, "question": question})
    print("Response to subquestion:", generation)

    """-----------outputs-----------"""
    return {
        "documents": documents,
        "question": question,
        "generation": generation,
        "steps": steps,
    }

Next, we need to define and compile our corrective RAG graph.

# initialize graph
CRAG = StateGraph(GraphState)

# Define the nodes
CRAG.add_node("retrieve", retrieve)  # retrieve
CRAG.add_node("grade_documents", grade_documents) # grade documents
CRAG.add_node("generate", generate)  # generatae
CRAG.add_node("web_search", web_search)  # web search

# Build graph
CRAG.set_entry_point("retrieve")
CRAG.add_edge("retrieve", "grade_documents")
CRAG.add_conditional_edges(
    "grade_documents",  #at grade_documents node, invoke decide_to_generate function
    decide_to_generate,
    {
        "search": "web_search", #if "search" is returned, invoke the "web_search" node
        "generate": "generate", #if "generate" is returned, invoke the "generate" node
    },
)
CRAG.add_edge("web_search", "generate")
CRAG.add_edge("generate", END)

CRAG_graph = CRAG.compile()

display(Image(CRAG_graph.get_graph(xray=True).draw_mermaid_png()))

We can visualize the compiled CRAG_graph instance using Python, as seen in the following image.

Flow chart of corrective RAG graph

Next, we need to define the nodes in our Query Decomposition and Consolidation graph.

def transform_query(state: dict) -> dict:
    """
    Transform the user_query to produce a list of simple questions.
    This is the first node invoked in the graph, with input user question and empty steps list
    response = agentic_rag.invoke({"user_query": question3, "steps": []})

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates the sub_questions key with a list of decomposed questions
    """
    """-----------inputs-----------"""
    user_query = state["user_query"]
    steps = state["steps"]
    print("User Query:", user_query)
    print("---Decomposing the QUERY---")

    """-----------actions-----------"""
    steps.append("transform_query")
    # Re-write question
    sub_questions = query_decompose.invoke({"user_query": user_query})

    #parse sub questions as a list
    list_of_questions = [question.strip() for question in sub_questions.strip().split('\n')]

    if list_of_questions[0] == 'The question needs no decomposition':
        #no query decomposition required
        #return question field as list
        """-----------outputs-----------"""
        return {
            "sub_questions": [user_query], 
            "steps": steps, 
            "user_query": user_query
        }
    else:
        print("Decomposed into the following queries:", list_of_questions)
        return {
            "sub_questions": list_of_questions, 
            "steps": steps, 
            "user_query": user_query
        }

def CRAG_loop(state: dict) -> dict:
    """
    Invokes the compiled CRAG_graph once for each decomposed sub-question and collects the resulting sub-answers.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, sub_answers, that contains an answer for each sub-question
    """
    """-----------inputs-----------"""
    questions = state["sub_questions"] #list of questions
    steps = state["steps"]
    user_query = state["user_query"]

    """-----------actions-----------"""
    sub_answers =[]
    steps.append("entering iterative CRAG for sub questions")

    #loop through list of decomposed questions
    for q in questions:
        print("Handling subquestion:", q)
        #enters beginning of CRAG graph -- retrieve node with the following state (question, step)
        response = CRAG_graph.invoke({"question": q, "steps": steps})["generation"]
        sub_answers.append(response)

    """-----------outputs-----------"""
    return {
            "sub_answers": sub_answers,
            "sub_questions": questions,
            "user_query": user_query
        }

def consolidate(state: dict) -> dict:
    """
    Generate consolidated final answer to the original question, given 1. the original question and 2. the sub_questions with corresponding sub_answers

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---Consolidating Response---")
    """-----------inputs-----------"""
    answers = state['sub_answers']
    questions = state['sub_questions']
    user_query = state['user_query']

    """-----------actions-----------"""
    steps = state["steps"]
    steps.append("generating final answer")
    qa_pairs = []

    #create a list of the decomposed questions with their corresponding answers
    #this intermediary information is used as context to answer the original user_query via in-context learning / RAG approach
    for i in range(min(len(questions), len(answers))):
        qa_pairs.append({questions[i]: answers[i].strip()})
    print("multi hop context", qa_pairs)
    final_response = rag_chain.invoke({"documents": qa_pairs, "question": user_query})
    print("Final Response to Original Query:", final_response)

    """-----------outputs-----------"""
    return {
        "user_query": user_query,
        "final_response": final_response,
        "steps": steps,
        "intermediate_qa": qa_pairs,
    }

Now, let's compile the Query Decomposition and Consolidation graph.

nested_CRAG = StateGraph(GraphState)
nested_CRAG.add_node("transform_query", transform_query)  # retrieve
nested_CRAG.add_node("CRAG_loop",CRAG_loop)
nested_CRAG.add_node("consolidate",consolidate)
nested_CRAG.set_entry_point("transform_query")
nested_CRAG.add_edge("transform_query", "CRAG_loop")
nested_CRAG.add_edge("CRAG_loop", "consolidate")
nested_CRAG.add_edge("consolidate", END)

agentic_rag = nested_CRAG.compile()

display(Image(agentic_rag.get_graph(xray=True).draw_mermaid_png()))

We can visualize the compiled agentic_rag graph instance using Python, as seen in the following image.

Flow chart of query decomposition and consolidation RAG graph

Finally, let's test our RAG agent!

question = "Which David Fincher film that stars Edward Norton does not star Brad Pitt?"

response = agentic_rag.invoke({"user_query": question, "steps": []})

Here is sample output:

agent output
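
Because the graph state includes a final_response key, you can also read the consolidated answer directly from the returned state:

print(response["final_response"])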

Summary and next steps

In this tutorial, we demonstrated the value of building customized, modular agentic AI systems as opposed to generic systems like ChatGPT 4o.

Explore more RAG tutorials on IBM Developer, in particular how to build a RAG application with watsonx.ai flows engine.

If you are passionate about LLMs, RAG, agents, or similar topics, feel free to contact me at dean.sacoransky@ibm.com.