Implementation of RAG Fusion using LangChain, Qdrant, and Athina

Retrieval-augmented generation (RAG) improves large language models (LLMs) by integrating external data, enhancing the relevance and accuracy of outputs. Instead of relying solely on pre-trained knowledge, RAG fetches and uses information from external sources like vector databases, making it ideal for domain-specific or up-to-date tasks. However, traditional RAG has limitations, which RAG Fusion addresses by improving retrieval and ranking methods.

RAG Fusion is a retrieval-augmented generation technique that enhances traditional RAG by generating sub-queries from an input query. These sub-queries help explore different aspects of the query, retrieve diverse documents, and rank them using Reciprocal Rank Fusion (RRF) to ensure relevance and accuracy.

How Does RAG Fusion Work?

When a query is received, the model generates related sub-queries using a large language model (LLM). These sub-queries improve document retrieval by exploring different aspects of the query. After retrieving documents, RAG Fusion uses Reciprocal Rank Fusion (RRF) to score and reorder them based on relevance. The highest-ranked documents are then passed to the model to generate an accurate and well-informed response.
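
To make the scoring step concrete, here is a minimal, self-contained sketch of Reciprocal Rank Fusion over plain document IDs. The IDs and the three result lists are hypothetical, and the sketch ranks from 1, a common convention; some implementations start at 0, which shifts every score slightly but follows the same idea.

```python
from collections import defaultdict

def rrf_scores(ranked_lists, k=60):
    """Fuse several ranked lists of document IDs into one score per ID."""
    scores = defaultdict(float)
    for ranking in ranked_lists:
        # rank is 1-based here: the top hit of each list contributes 1 / (k + 1)
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)

# Hypothetical retrieval results for three sub-queries
results = [
    ["doc_a", "doc_b", "doc_c"],
    ["doc_b", "doc_a", "doc_d"],
    ["doc_b", "doc_c", "doc_a"],
]

fused = rrf_scores(results)
for doc_id, score in fused:
    print(f"{doc_id}: {score:.4f}")
```

A document that appears near the top of several lists (here `doc_b`) ends up ahead of one that ranks first in only a single list, which is the behavior RAG Fusion relies on.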

RAG Fusion Over Traditional RAG

Traditional RAG has limitations when dealing with complex queries. It primarily relies on a single query to retrieve documents, which may not adequately address all aspects of the user's intent. Additionally, traditional ranking mechanisms may not always surface the most relevant information for nuanced queries. These challenges can limit its effectiveness in scenarios requiring diverse and contextually rich responses.

RAG Fusion addresses these issues by generating sub-queries, using RRF for result aggregation, and efficiently handling large datasets. It ensures improved diversity, accurate ranking, and scalable performance for complex queries.

Now that we understand the limitations of traditional RAG and how RAG Fusion addresses them, let's move on to the implementation.

Step-by-Step Implementation

Implementing RAG Fusion involves setting up a pipeline that integrates several tools and libraries. Below is a code walkthrough showing how to build a RAG Fusion pipeline using LangChain and Qdrant, with Athina for evaluation.

For the complete code and step-by-step guidance, check out this Google Colab Notebook. Also, if you are interested in learning advanced RAG techniques, check out the GitHub repository we created to support developers and researchers.

1. Initial Setup

Start by installing the necessary libraries and setting up your environment variables to handle API keys securely.

!pip install -q athina langsmith langchain langchain-openai langchain-community qdrant-client datasets pandas
import os
from google.colab import userdata

os.environ["OPENAI_API_KEY"] = userdata.get('OPENAI_API_KEY')
os.environ['ATHINA_API_KEY'] = userdata.get('ATHINA_API_KEY')
os.environ['QDRANT_API_KEY'] = userdata.get('QDRANT_API_KEY')

2. Data Indexing

Load and process your data to prepare it for vector-based retrieval.

from langchain_openai import OpenAIEmbeddings
from langchain_community.document_loaders import CSVLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Embedding model used to vectorize the chunks
embeddings = OpenAIEmbeddings()

# Load each CSV row as a document
loader = CSVLoader("./context.csv")
documents = loader.load()

# Split documents into chunks of at most 500 characters, with no overlap
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=0)
documents = text_splitter.split_documents(documents)
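
To illustrate what `chunk_size` and `chunk_overlap` control, here is a simplified fixed-window splitter. It is not the algorithm `RecursiveCharacterTextSplitter` uses (that one prefers to split on separators such as paragraphs and lines); the function name `split_text` is hypothetical and only shows how the two parameters interact.

```python
def split_text(text, chunk_size=500, chunk_overlap=0):
    """Cut text into windows of chunk_size characters that share chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        # Stop once the current window already reaches the end of the text
        if start + chunk_size >= len(text):
            break
        start += step
    return chunks

sample = "a" * 1200
print([len(chunk) for chunk in split_text(sample)])
print([len(chunk) for chunk in split_text("a" * 900, chunk_size=500, chunk_overlap=100)])
```

With no overlap, each character lands in exactly one chunk; a non-zero overlap repeats the tail of one chunk at the head of the next, which can help when an answer straddles a chunk boundary.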

3. Vector Database with Qdrant

Set up a vector database using Qdrant to store and manage your document embeddings.

from langchain_community.vectorstores import Qdrant

vectorstore = Qdrant.from_documents(
    documents,
    embeddings,
    url="your_qdrant_url",
    prefer_grpc=True,
    collection_name="documents",
    api_key=os.environ["QDRANT_API_KEY"],
)

4. Retrieval Setup

Configure the retriever to retrieve information from the vector database.

retriever = vectorstore.as_retriever()
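
Conceptually, a similarity retriever embeds the query and returns the stored vectors closest to it. The sketch below shows that idea with toy two-dimensional vectors, assuming cosine similarity as the metric; the vectors, document IDs, and the `top_k` helper are hypothetical and stand in for what the vector database does at scale.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def top_k(query_vec, doc_vecs, k=2):
    """Return the k (doc_id, similarity) pairs most similar to the query."""
    scored = [
        (doc_id, cosine_similarity(query_vec, vec))
        for doc_id, vec in doc_vecs.items()
    ]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]

# Toy "embeddings" for three documents
toy_index = {
    "doc_a": [1.0, 0.0],
    "doc_b": [0.7, 0.7],
    "doc_c": [0.0, 1.0],
}

hits = top_k([1.0, 0.1], toy_index, k=2)
for doc_id, similarity in hits:
    print(f"{doc_id}: {similarity:.3f}")
```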

5. RRF Chain Creation

Create the RRF logic to aggregate and rerank documents based on multiple sub-queries.

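from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langsmith import Client

# LLM used later to generate the final answer
llm = ChatOpenAI()

# Pull the query-generation prompt from the LangSmith hub
client = Client()
prompt = client.pull_prompt("langchain-ai/rag-fusion-query-generation")

# Generate sub-queries, one per line, and drop blank lines
generate_queries = (
    prompt
    | ChatOpenAI(temperature=0)
    | StrOutputParser()
    | (lambda x: [q.strip() for q in x.split("\n") if q.strip()])
)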


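from langchain_core.load import dumps, loads

def reciprocal_rank_fusion(results: list[list], k=60):
    """Fuse the ranked lists from each sub-query into one list of (doc, score) pairs."""
    fused_scores = {}
    for docs in results:
        # rank is 0-based: the top document of each list contributes 1 / k
        for rank, doc in enumerate(docs):
            doc_str = dumps(doc)  # serialize so the document can be used as a dict key
            fused_scores.setdefault(doc_str, 0)
            fused_scores[doc_str] += 1 / (rank + k)

    # Sort by fused score, highest first, and deserialize back to documents
    reranked_results = [
        (loads(doc), score)
        for doc, score in sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
    ]
    return reranked_results

# Sub-queries -> one retrieval per sub-query -> fused ranking
chain = generate_queries | retriever.map() | reciprocal_rank_fusion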
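
Generated queries often arrive with list numbering, stray blank lines, or duplicates. A slightly more thorough cleanup than a plain newline split might look like the sketch below; `clean_queries` is a hypothetical helper, and the sample output string is made up for illustration.

```python
import re

def clean_queries(raw_output, original_query=None):
    """Turn the LLM's newline-separated text into a list of unique, non-empty queries."""
    queries = []
    for line in raw_output.split("\n"):
        # Remove leading list markers such as "1. ", "2) ", "- " or "* "
        text = re.sub(r"^\s*(?:\d+[.)]|[-*•])\s*", "", line).strip()
        if text and text not in queries:
            queries.append(text)
    # Optionally search with the user's own wording as well
    if original_query and original_query not in queries:
        queries.insert(0, original_query)
    return queries

raw = (
    "1. benefits of MEMS microphones\n"
    "\n"
    "2. MEMS vs condenser microphones\n"
    "- MEMS microphone applications\n"
)
print(clean_queries(raw, original_query="Tell me about MEMS microphones"))
```

Because LangChain pipelines accept plain callables, a function like this could take the place of the splitting lambda at the end of `generate_queries`.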

6. Assemble the RAG Fusion Pipeline

Integrate all components to form the complete RAG Fusion pipeline.

from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate

template = """Answer the question based only on the following context.
If you don't find the answer in the context, just say that you don't know.

Context: {context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

rag_fusion_chain = (
    {
        "context": chain,
        "question": RunnablePassthrough()
    }
    | prompt
    | llm
    | StrOutputParser()
)

response = rag_fusion_chain.invoke("Tell me about the benefits of MEMS microphones")
print(response)
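
In the chain as assembled here, `context` receives the raw list of (document, score) tuples from `reciprocal_rank_fusion`, which is converted to a string when the prompt is filled in. One option is to keep only the top few documents and pass their text. The sketch below assumes each document exposes a `page_content` attribute; the `Doc` class is a hypothetical stand-in for a LangChain document, and `format_context` and `top_n` are illustrative names.

```python
from dataclasses import dataclass

@dataclass
class Doc:
    """Hypothetical stand-in for a LangChain Document (only page_content is used)."""
    page_content: str

def format_context(fused_results, top_n=4):
    """Keep the top_n fused documents and join their text into one context string."""
    top_docs = [doc for doc, _score in fused_results[:top_n]]
    return "\n\n".join(doc.page_content for doc in top_docs)

# Made-up fused results, already sorted by score
fused_results = [
    (Doc("MEMS microphones are small and low power."), 0.049),
    (Doc("They tolerate reflow soldering."), 0.032),
    (Doc("Unrelated passage."), 0.016),
]
print(format_context(fused_results, top_n=2))
```

A helper like this could be piped after `chain` in the `"context"` entry, so the prompt sees clean text rather than serialized tuples and scores.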

7. Evaluation with Athina

Once your RAG pipeline is set up, you can evaluate its performance using Athina AI. This step is optional but helpful for testing and validating your pipeline. Athina AI provides automated tools to measure accuracy and ensure the pipeline meets your requirements.

First, prepare your data by generating queries, capturing the pipeline's responses, and organizing the context for each query:

import pandas as pd
from datasets import Dataset

question = ["what are points on a mortgage"]
response = []
contexts = []
ground_truths = ["Points, sometimes also called a 'discount point', are a form of pre-paid interest."]


for query in question:
    response.append(rag_fusion_chain.invoke(query))
    # Record the documents the base retriever returns for this query
    contexts.append([doc.page_content for doc in retriever.invoke(query)])


data = {
    "query": question,
    "response": response,
    "context": contexts,
    "ground_truth": ground_truths
}

dataset = Dataset.from_dict(data)
df = pd.DataFrame(dataset)

df_dict = df.to_dict(orient='records')

for record in df_dict:
    if not isinstance(record.get('context'), list):
        if record.get('context') is None:
            record['context'] = []
        else:
            record['context'] = [record['context']]

Now, set up the API keys for both OpenAI and Athina:

from athina.keys import AthinaApiKey, OpenAiApiKey
OpenAiApiKey.set_key(os.getenv('OPENAI_API_KEY'))
AthinaApiKey.set_key(os.getenv('ATHINA_API_KEY'))

Then, use the Loader class from Athina to load your dataset in dictionary format:

from athina.loaders import Loader
dataset = Loader().load_dict(df_dict)

Finally, run the RagasAnswerRelevancy evaluation, which measures how pertinent the generated response is to the given prompt. For further details, refer to the Athina AI documentation.

from athina.evals import RagasAnswerRelevancy
RagasAnswerRelevancy(model="gpt-4o").run_batch(data=dataset).to_df()

The results will be converted into a data frame, and you can click on the generated link to open the Athina IDE, where you can explore detailed evaluation results and refine your pipeline further.

Conclusion

RAG Fusion improves traditional retrieval-augmented generation systems by using sub-query generation and Reciprocal Rank Fusion (RRF). These enhancements make it better at handling complex queries and retrieving data from large datasets. By combining smarter retrieval methods with effective ranking, it provides accurate and diverse responses.

This guide outlines how to implement RAG Fusion with LangChain, Qdrant, and Athina, making it easier to adopt for various applications. RAG Fusion is a scalable and efficient solution to enhance retrieval-based workflows.

I hope this guide helps you implement and refine your RAG workflows. If you want to explore more advanced RAG techniques, check out the GitHub Repository.
