AI developers are focused on building the next frontier of AI systems called LLM agents (LLM-based AI agents). These systems will likely automate long-horizon tasks, whereas current AI systems mainly handle simple questions and short-horizon tasks.
An example of a long-horizon task could be a typical 30-minute ChatGPT session, where you ask several questions with follow-ups, copy and paste good responses elsewhere (for example, a spreadsheet, document, or IDE), ignore the bad responses, synthesize and edit the responses, use the internet to fill in the gaps, plot some data, draw a diagram, and so on.
These intermediate tasks (let's call them "glue tasks"), which you perform to piece together LLM outputs, are fully tractable with today's best AI models.
In areas where LLMs struggle (such as math), we can now connect to external tools (for example, a calculator) via APIs. The newest class of LLMs (such as Llama 3.1) was deliberately aligned for tool-calling skills to enable an ecosystem of agentic application development.
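For example, a calculator can be exposed to a model as a LangChain tool. The snippet below is a minimal, hypothetical illustration of this idea (the multiply tool is my own example and is not part of the agent built later in this tutorial).
# A minimal, hypothetical LangChain tool: a calculator function the LLM can call
from langchain_core.tools import tool

@tool
def multiply(a: float, b: float) -> float:
    """Multiply two numbers."""
    return a * b

# A tool-calling chat model can then be bound to this tool, for example with
# chat_model.bind_tools([multiply]), so the model decides when to invoke it
# instead of doing the arithmetic itself.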
At work, we are rarely asked to produce a deliverable instantly and without any external help (the way ChatGPT does). Instead, we usually seek feedback, collaborate with colleagues, make iterative edits, use various tools, and reflect on our work before submitting. LLM agent systems embrace this same core philosophy.
In the long term, these LLM agents might look like AI colleagues that you interact with via Slack. Imagine delegating work to your AI colleague via Slack, jumping on a meeting, and then checking back 30 minutes later to a fully completed deliverable.
For now, I built a RAG-based LLM agent (hereafter called a RAG agent) to handle complex questions and interact with external information sources such as vector databases and the internet. In this tutorial, I introduce you to my RAG agent (with diagrams and full Python implementation), and share an example question that is answered correctly by my agent and incorrectly by ChatGPT 4o.
Designing a RAG agent
Imagine you are designing a RAG system to handle complex, multi-step questions.
We will use the following example question, developed by my brilliant colleague Deniz Askin, who often uses it as a "litmus test" for assessing new AI systems:
Which David Fincher film that stars Edward Norton does not star Brad Pitt?
The correct answer to this question is "None," and most AI systems cannot get it right. Here is the incorrect response from GPT-4o on August 2, 2024:
This answer is completely false. ChatGPT 4o is trying to answer this complex, multi-hop, logical question in one feed-forward pass of a neural network. Current autoregressive LLMs are not designed to do this effectively.
Let’s investigate an alternate approach using a RAG agent.
To answer this question the way a human would, we can break it down into the following sub-questions and answer them sequentially:
Which David Fincher films star Edward Norton?
Which David Fincher films star Brad Pitt?
Which of the David Fincher films that star Edward Norton do not star Brad Pitt?
This sequential process is illustrated in the following diagram, where the blue circle nodes are LLM-based tasks, and the purple circle nodes consist of information retrieval and LLM tasks for in-context learning (that is, RAG). The blue circle nodes (query decomposition and answer consolidation) are examples of the glue tasks that I mentioned earlier.
Let’s dive deeper into one of the RAG nodes by trying to answer the first sub-question, “Which David Fincher films star Edward Norton?”
Perhaps we have a vector database containing IMDB information about David Fincher, or maybe we don't have that vector database and instead need to find this information on the internet.
Rather than using a naive one-shot RAG approach, I implemented a more robust RAG architecture inspired by the Corrective RAG work. This algorithm goes as follows:
Perform retrieval against a vector database using the sub-query “Which David Fincher films star Edward Norton?"
Use an LLM to grade the relevance of each document retrieved.
If all documents retrieved are relevant, generate an answer to the question.
If any of the retrieved documents are irrelevant, store only the relevant ones, and query the internet to retrieve additional information. Generate a response using the retrieved documents and/or internet information.
This process is illustrated in the following diagram, where blue circle nodes symbolize LLM-based tasks, green circle nodes describe information-retrieval tasks, and the orange diamond is a logical decision edge. For this example question, each corrective RAG invocation will route to the internet because I do not have a vector database containing information on David Fincher films.
Remember, this diagram only represents the process for handling one of three subqueries (“Which David Fincher films star Edward Norton?”).
Now, let's illustrate the entire algorithm, which you could extend to n sub-queries to handle even more complex questions.
Implementing my RAG agent
So, how do we build this type of system?
We can easily build granular, multi-step LLM applications (that is, LLM agents) by formulating the system as a cyclical graph using the LangGraph framework. We can connect graph nodes to external tools, such as web search, vector databases, calculators, and so on, by using LangChain tool integrations.
First, we load a vector database (let's assume the vector database is already built).
from elasticsearch import Elasticsearch
from langchain_ibm import WatsonxEmbeddings, WatsonxLLM
from ibm_watsonx_ai.metanames import GenTextParamsMetaNames
from langchain_elasticsearch import ElasticsearchStore

# Initialize the Elasticsearch client
es_client = Elasticsearch(
    url,
    basic_auth=(username, password),
    verify_certs=False,
    request_timeout=3600,
)

# Define the embeddings model
embeddings = WatsonxEmbeddings(
    model_id="ibm/slate-125m-english-rtrvr",
    apikey=credentials.get("apikey"),
    url=credentials.get("url"),
    project_id=project_id,
)

# Load the existing vector store
vectorstore = ElasticsearchStore(
    es_connection=es_client,
    embedding=embeddings,
    index_name="<insert index name>",
)
Next, we need to define the Retriever.
from langchain.schema import Document
retriever = vectorstore.as_retriever()
And then define the Tavily Web Search tool.
from langchain_community.tools.tavily_search import TavilySearchResults
web_search_tool = TavilySearchResults()
We need to choose an LLM. We use IBM watsonx.ai, a hybrid, multi-cloud AI and data platform that brings together generative AI capabilities powered by a suite of foundation models. Let's use Meta's llama-3.1-405b model, which is available on watsonx.ai; however, feel free to experiment with other generative foundation models.
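The exact model-initialization snippet isn't reproduced here, but a minimal sketch with WatsonxLLM might look like the following (the model ID and generation parameters are illustrative assumptions; check the watsonx.ai model catalog and SDK documentation for the values available in your instance).
# A minimal sketch (not the original snippet): instantiate a watsonx.ai LLM with LangChain
# The model_id and generation parameters below are illustrative assumptions
parameters = {
    GenTextParamsMetaNames.DECODING_METHOD: "greedy",
    GenTextParamsMetaNames.MAX_NEW_TOKENS: 500,
    GenTextParamsMetaNames.MIN_NEW_TOKENS: 1,
}

llm = WatsonxLLM(
    model_id="meta-llama/llama-3-405b-instruct",  # illustrative; check the catalog for the exact Llama 3.1 405B ID
    apikey=credentials.get("apikey"),
    url=credentials.get("url"),
    project_id=project_id,
    params=parameters,
)
With the LLM defined, we can build the RAG generation chain.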
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser

prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|>You are an assistant for question-answering tasks.
{{Below is some context from different sources followed by a user's question. Please answer the question based on the context.
Documents: {documents}}} <|eot_id|><|start_header_id|>user<|end_header_id|>
{{ Question: {question} }}<|eot_id|><|start_header_id|>assistant<|end_header_id|>
Answer:
""",
    input_variables=["question", "documents"],
)

# Define the RAG chain
rag_chain = prompt | llm | StrOutputParser()
Then, we define the Retrieval Grader chain.
from langchain_core.output_parsers import JsonOutputParser

prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a grader assessing relevance of a retrieved document to a user question. \n
Here is the retrieved document: \n\n {document} \n\n
<|eot_id|><|start_header_id|>user<|end_header_id|>
Here is the user question: {question} \n
Give a binary score 'yes' or 'no' to indicate whether the document is useful for resolving the question. \n
Provide the binary score as a JSON with a single key 'score' and no preamble or explanation. <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question", "document"],
)

# Define the retrieval grader chain
retrieval_grader = prompt | llm | JsonOutputParser()
Then, we define the Query Decomposition chain.
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are an assistant for question-answering tasks.
Perform query decomposition. Given a user question, break it down into distinct sub questions that \
you need to answer in order to answer the original question. Respond with \"The question needs no decomposition\" when no decomposition is needed.
Generate questions that explicitly mention the subject by name, avoiding pronouns like 'these', 'they', 'he', 'she', 'it', etc. Each question should clearly state the subject to ensure no ambiguity.
Example 1:
Question: Is Hamlet more common on IMDB than Comedy of Errors?
Decompositions:
How many listings of Hamlet are there on IMDB?
How many listings of Comedy of Errors are there on IMDB?
Example 2:
Question: What is the capital city of Japan?
Decompositions:
The question needs no decomposition
<|eot_id|><|start_header_id|>user<|end_header_id|>
Question: {user_query} <|eot_id|><|start_header_id|>assistant<|end_header_id|>
Decompositions:""",
    input_variables=["user_query"],
)

# Define the query decomposition chain
query_decompose = prompt | llm | StrOutputParser()
Now, we're getting to the good stuff. Let's define our Graph class and the 'nodes' in our corrective RAG graph. These 'nodes', defined as Python functions, correspond to the blocks in the diagrams at the beginning of this tutorial.
from typing import List
from typing_extensions import TypedDict
from IPython.display import Image, display
from langgraph.graph import START, END, StateGraph

class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question to be used as input in LLM chain
        generation: LLM generation response
        search: "yes" or "no" string acting as boolean for whether to invoke web search
        documents: list of documents for in-context learning
        steps: list of steps taken in agent flow
        user_query: original user query, stored here for persistence during consolidation stage
        sub_answers: list of answers to decomposed questions
        sub_questions: list of decomposed sub-questions
        final_response: consolidated final answer to the original user query
        intermediate_qa: list of sub-question/sub-answer pairs used as consolidation context
    """
    question: str
    generation: str
    search: str
    documents: List[str]
    steps: List[str]
    user_query: str
    sub_answers: List[str]
    sub_questions: List[str]
    # These two keys are written by the consolidate node of the outer graph defined later
    final_response: str
    intermediate_qa: List[dict]
def retrieve(state):
    """
    Retrieve documents.

    This is the first node invoked in CRAG_graph.
    CRAG_graph is invoked in the CRAG_loop node:
        response = CRAG_graph.invoke({"question": q, "steps": steps})["generation"]
    We initialize the state with a sub-question and a list of steps.

    Args:
        state (dict): The current graph state
    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---Retrieving Documents---")
    """-----------inputs-----------"""
    question = state["question"]
    steps = state["steps"]
    """-----------actions-----------"""
    steps.append("retrieve_documents")
    documents = retriever.invoke(question)
    """-----------outputs-----------"""
    return {
        "documents": documents,
        "question": question,
        "steps": steps,
    }
def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question and stores only the
    relevant documents. If even one retrieved document is irrelevant, web search will be invoked.

    Args:
        state (dict): The current graph state
    Returns:
        state (dict): Updates documents key with only filtered relevant documents
    """
    print("---Grading Retrieved Documents---")
    """-----------inputs-----------"""
    documents = state["documents"]
    question = state["question"]
    steps = state["steps"]
    """-----------actions-----------"""
    steps.append("grade_document_retrieval")
    relevant_docs = []
    search = "No"
    for d in documents:
        score = retrieval_grader.invoke(
            {"question": question, "document": d.page_content}
        )
        grade = score["score"]
        if grade == "yes":
            relevant_docs.append(d)
        else:
            search = "Yes"
            continue
    """-----------outputs-----------"""
    return {
        "documents": relevant_docs,
        "question": question,
        "search": search,
        "steps": steps,
    }
def decide_to_generate(state):
    """
    Determines whether to generate an answer or to search the web for more information.

    Args:
        state (dict): The current graph state
    Returns:
        str: Binary decision for next node to call
    """
    print("---At decision Edge---")
    """-----------inputs-----------"""
    search = state["search"]
    """-----------actions & outputs-----------"""
    if search == "Yes":
        return "search"
    else:
        return "generate"

def web_search(state):
    """
    Web search based on the sub-question.

    Args:
        state (dict): The current graph state
    Returns:
        state (dict): Updates documents key with appended web results
    """
    print("---Searching the Web---")
    """-----------inputs-----------"""
    documents = state.get("documents", [])
    question = state["question"]
    steps = state["steps"]
    """-----------actions-----------"""
    steps.append("web_search")
    web_results = web_search_tool.invoke({"query": question})
    documents.extend(
        [
            Document(page_content=d["content"], metadata={"url": d["url"]})
            for d in web_results
        ]
    )
    """-----------outputs-----------"""
    return {
        "documents": documents,
        "question": question,
        "steps": steps,
    }
def generate(state):
    """
    Generate an answer to the sub-question.

    Args:
        state (dict): The current graph state
    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---Generating Response---")
    """-----------inputs-----------"""
    documents = state["documents"]
    question = state["question"]
    steps = state["steps"]
    """-----------actions-----------"""
    steps.append("generating sub-answer")
    generation = rag_chain.invoke({"documents": documents, "question": question})
    print("Response to subquestion:", generation)
    """-----------outputs-----------"""
    return {
        "documents": documents,
        "question": question,
        "generation": generation,
        "steps": steps,
    }
Next, we need to define and compile our corrective RAG graph.
# Initialize the graph
CRAG = StateGraph(GraphState)

# Define the nodes
CRAG.add_node("retrieve", retrieve)                 # retrieve documents
CRAG.add_node("grade_documents", grade_documents)   # grade documents
CRAG.add_node("generate", generate)                 # generate answer
CRAG.add_node("web_search", web_search)             # web search

# Build the graph
CRAG.set_entry_point("retrieve")
CRAG.add_edge("retrieve", "grade_documents")
CRAG.add_conditional_edges(
    "grade_documents",  # at the grade_documents node, invoke the decide_to_generate function
    decide_to_generate,
    {
        "search": "web_search",  # if "search" is returned, invoke the "web_search" node
        "generate": "generate",  # if "generate" is returned, invoke the "generate" node
    },
)
CRAG.add_edge("web_search", "generate")
CRAG.add_edge("generate", END)

CRAG_graph = CRAG.compile()
display(Image(CRAG_graph.get_graph(xray=True).draw_mermaid_png()))
We can visualize the compiled CRAG_graph instance using Python, as seen in the following image.
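As a quick sanity check, you could also invoke the compiled graph directly on a single sub-question, following the same invocation pattern used later in the CRAG_loop node (the sub-question string below is just the first one from our running example):
# Invoke the corrective RAG graph on one sub-question and print the generated sub-answer
sub_question = "Which David Fincher films star Edward Norton?"
result = CRAG_graph.invoke({"question": sub_question, "steps": []})
print(result["generation"])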
Next, we need to define the nodes in our Query Decomposition and Consolidation graph.
def transform_query(state: dict) -> dict:
    """
    Transform the user_query to produce a list of simple questions.

    This is the first node invoked in the graph, with the input user question and an empty steps list:
        response = agentic_rag.invoke({"user_query": question3, "steps": []})

    Args:
        state (dict): The current graph state
    Returns:
        state (dict): Updates the sub_questions key with a list of re-phrased questions
    """
    """-----------inputs-----------"""
    user_query = state["user_query"]
    steps = state["steps"]
    print("User Query:", user_query)
    print("---Decomposing the QUERY---")
    """-----------actions-----------"""
    steps.append("transform_query")
    # Re-write the question
    sub_questions = query_decompose.invoke({"user_query": user_query})
    # Parse the sub-questions as a list
    list_of_questions = [question.strip() for question in sub_questions.strip().split('\n')]
    if list_of_questions[0] == 'The question needs no decomposition':
        # No query decomposition required; return the original question as a single-item list
        """-----------outputs-----------"""
        return {
            "sub_questions": [user_query],
            "steps": steps,
            "user_query": user_query
        }
    else:
        print("Decomposed into the following queries:", list_of_questions)
        return {
            "sub_questions": list_of_questions,
            "steps": steps,
            "user_query": user_query
        }
def CRAG_loop(state: dict) -> dict:
    """
    Iterates over the decomposed sub-questions and invokes the CRAG graph for each one.

    Args:
        state (dict): The current graph state
    Returns:
        state (dict): Updates the sub_answers key with the answer to each sub-question
    """
    """-----------inputs-----------"""
    questions = state["sub_questions"]  # list of decomposed questions
    steps = state["steps"]
    user_query = state["user_query"]
    """-----------actions-----------"""
    sub_answers = []
    steps.append("entering iterative CRAG for sub questions")
    # Loop through the list of decomposed questions
    for q in questions:
        print("Handling subquestion:", q)
        # Enters the beginning of the CRAG graph -- the retrieve node -- with the state (question, steps)
        response = CRAG_graph.invoke({"question": q, "steps": steps})["generation"]
        sub_answers.append(response)
    """-----------outputs-----------"""
    return {
        "sub_answers": sub_answers,
        "sub_questions": questions,
        "user_query": user_query
    }
def consolidate(state: dict) -> dict:
    """
    Generate a consolidated final answer to the original question, given 1. the original question
    and 2. the sub_questions with their corresponding sub_answers.

    Args:
        state (dict): The current graph state
    Returns:
        state (dict): New key added to state, final_response, that contains the LLM generation
    """
    print("---Consolidating Response---")
    """-----------inputs-----------"""
    answers = state['sub_answers']
    questions = state['sub_questions']
    user_query = state['user_query']
    steps = state["steps"]
    """-----------actions-----------"""
    steps.append("generating final answer")
    qa_pairs = []
    # Create a list of the decomposed questions with their corresponding answers.
    # This intermediary information is used as context to answer the original user_query
    # via an in-context learning / RAG approach.
    for i in range(min(len(questions), len(answers))):
        qa_pairs.append({questions[i]: answers[i].strip()})
    print("multi hop context", qa_pairs)
    final_response = rag_chain.invoke({"documents": qa_pairs, "question": user_query})
    print("Final Response to Original Query:", final_response)
    """-----------outputs-----------"""
    return {
        "user_query": user_query,
        "final_response": final_response,
        "steps": steps,
        "intermediate_qa": qa_pairs,
    }
Now, let's compile the Query Decomposition and Consolidation graph.
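The compile step isn't reproduced here; the following is a minimal sketch that mirrors the CRAG graph construction above, wiring the three nodes together in sequence (the node names are my own labels).
# A minimal sketch of compiling the outer graph, mirroring the CRAG graph construction above
# The node names below are illustrative assumptions
agentic_rag_builder = StateGraph(GraphState)

agentic_rag_builder.add_node("transform_query", transform_query)  # decompose the user query
agentic_rag_builder.add_node("CRAG_loop", CRAG_loop)              # answer each sub-question with CRAG
agentic_rag_builder.add_node("consolidate", consolidate)          # consolidate the final answer

agentic_rag_builder.set_entry_point("transform_query")
agentic_rag_builder.add_edge("transform_query", "CRAG_loop")
agentic_rag_builder.add_edge("CRAG_loop", "consolidate")
agentic_rag_builder.add_edge("consolidate", END)

agentic_rag = agentic_rag_builder.compile()
display(Image(agentic_rag.get_graph(xray=True).draw_mermaid_png()))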
We can visualize the compiled agentic_rag graph instance using Python, as seen in the following image.
Finally, let's test our RAG agent!
question = "Which David Fincher film that stars Edward Norton does not star Brad Pitt?"
response = agentic_rag.invoke({"user_query": question, "steps": []})
Here is sample output:
Summary and next steps
In this tutorial, we demonstrated the value of building customized, modular agentic AI systems as opposed to generic systems like ChatGPT 4o.