# https://github.com/langchain-ai/langchain/issues/8623 | |
import pandas as pd | |
from langchain_core.retrievers import BaseRetriever | |
from langchain_core.vectorstores import VectorStoreRetriever | |
from langchain_core.documents.base import Document | |
from langchain_core.vectorstores import VectorStore | |
from langchain_core.callbacks.manager import CallbackManagerForRetrieverRun | |
from typing import List | |
from pydantic import Field | |
class ClimateQARetriever(BaseRetriever):
    """Retrieve climate-report passages, mixing summary and full-report hits.

    Queries the vector store twice: first for up to ``k_summary`` passages
    from summary documents (``report_type == "SPM"``), keeping only those
    scoring above ``threshold``, then fills the remaining budget (up to
    ``k_total`` passages overall) from the full reports. Passages shorter
    than ``min_size`` characters are discarded from the final result.
    """

    # Backing vector store; must support similarity_search_with_score()
    # with a metadata `filter` argument.
    vectorstore: VectorStore
    # Allowed values of the "source" metadata field, used when no explicit
    # report list is given.
    sources: list = ["IPCC", "IPBES", "IPOS"]
    # Optional list of report short names; when non-empty it replaces the
    # source filter entirely.
    reports: list = []
    # Minimum similarity score a summary passage must reach to be kept.
    threshold: float = 0.6
    # Maximum number of summary (SPM) passages to retrieve.
    k_summary: int = 3
    # Total number of passages (summaries + full reports) to return.
    k_total: int = 10
    # Fix: the original lines ended with trailing commas, which turned the
    # defaults into the tuples ("vectors",) and (200,). The (200,) tuple
    # then made `len(...) > self.min_size` raise TypeError at query time.
    namespace: str = "vectors"
    # Passages with page_content shorter than this are dropped.
    min_size: int = 200

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Return up to ``k_total`` documents relevant to ``query``.

        Each returned document gains ``similarity_score``, ``content`` and a
        1-based ``page_number`` entry in its metadata.

        Raises:
            AssertionError: if ``sources`` is not a list of known sources or
                ``k_total <= k_summary``. (Kept as asserts to preserve the
                original exception contract; note asserts vanish under -O.)
        """
        # Validate configuration before touching the vector store.
        assert isinstance(self.sources, list)
        assert all(x in ["IPCC", "IPBES", "IPOS"] for x in self.sources)
        assert self.k_total > self.k_summary, "k_total should be greater than k_summary"

        # Base metadata filter: an explicit report list wins over sources.
        if self.reports:
            filters = {"short_name": {"$in": self.reports}}
        else:
            filters = {"source": {"$in": self.sources}}

        # Pass 1: summaries (SPM) — keep only hits above the score threshold.
        filters_summaries = {**filters, "report_type": {"$in": ["SPM"]}}
        docs_summaries = self.vectorstore.similarity_search_with_score(
            query=query, filter=filters_summaries, k=self.k_summary
        )
        docs_summaries = [x for x in docs_summaries if x[1] > self.threshold]

        # Pass 2: full reports — fill the remaining budget. Deliberately no
        # score cut here (see the commented-out filter below): low-scoring
        # full-report passages are still returned.
        filters_full = {**filters, "report_type": {"$nin": ["SPM"]}}
        k_full = self.k_total - len(docs_summaries)
        docs_full = self.vectorstore.similarity_search_with_score(
            query=query, filter=filters_full, k=k_full
        )

        # Merge, then drop passages that are too short to be useful.
        docs = docs_summaries + docs_full
        docs = [x for x in docs if len(x[0].page_content) > self.min_size]
        # docs = [x for x in docs if x[1] > self.threshold]

        # Copy score/content into metadata and convert pages to 1-based.
        results = []
        for doc, score in docs:
            doc.metadata["similarity_score"] = score
            doc.metadata["content"] = doc.page_content
            doc.metadata["page_number"] = int(doc.metadata["page_number"]) + 1
            results.append(doc)
        return results
# def filter_summaries(df,k_summary = 3,k_total = 10): | |
# # assert source in ["IPCC","IPBES","ALL"], "source arg should be in (IPCC,IPBES,ALL)" | |
# # # Filter by source | |
# # if source == "IPCC": | |
# # df = df.loc[df["source"]=="IPCC"] | |
# # elif source == "IPBES": | |
# # df = df.loc[df["source"]=="IPBES"] | |
# # else: | |
# # pass | |
# # Separate summaries and full reports | |
# df_summaries = df.loc[df["report_type"].isin(["SPM","TS"])] | |
# df_full = df.loc[~df["report_type"].isin(["SPM","TS"])] | |
# # Find passages from summaries dataset | |
# passages_summaries = df_summaries.head(k_summary) | |
# # Find passages from full reports dataset | |
# passages_fullreports = df_full.head(k_total - len(passages_summaries)) | |
# # Concatenate passages | |
# passages = pd.concat([passages_summaries,passages_fullreports],axis = 0,ignore_index = True) | |
# return passages | |
# def retrieve_with_summaries(query,retriever,k_summary = 3,k_total = 10,sources = ["IPCC","IPBES"],max_k = 100,threshold = 0.555,as_dict = True,min_length = 300): | |
# assert max_k > k_total | |
# validated_sources = ["IPCC","IPBES"] | |
# sources = [x for x in sources if x in validated_sources] | |
# filters = { | |
# "source": { "$in": sources }, | |
# } | |
# print(filters) | |
# # Retrieve documents | |
# docs = retriever.retrieve(query,top_k = max_k,filters = filters) | |
# # Filter by score | |
# docs = [{**x.meta,"score":x.score,"content":x.content} for x in docs if x.score > threshold] | |
# if len(docs) == 0: | |
# return [] | |
# res = pd.DataFrame(docs) | |
# passages_df = filter_summaries(res,k_summary,k_total) | |
# if as_dict: | |
# contents = passages_df["content"].tolist() | |
# meta = passages_df.drop(columns = ["content"]).to_dict(orient = "records") | |
# passages = [] | |
# for i in range(len(contents)): | |
# passages.append({"content":contents[i],"meta":meta[i]}) | |
# return passages | |
# else: | |
# return passages_df | |
# def retrieve(query,sources = ["IPCC"],threshold = 0.555,k = 10): | |
# print("hellooooo") | |
# # Reformulate queries | |
# reformulated_query,language = reformulate(query) | |
# print(reformulated_query) | |
# # Retrieve documents | |
# passages = retrieve_with_summaries(reformulated_query,retriever,k_total = k,k_summary = 3,as_dict = True,sources = sources,threshold = threshold) | |
# response = { | |
# "query":query, | |
# "reformulated_query":reformulated_query, | |
# "language":language, | |
# "sources":passages, | |
# "prompts":{"init_prompt":init_prompt,"sources_prompt":sources_prompt}, | |
# } | |
# return response | |