import os
import uuid
from pathlib import Path

import streamlit as st
import pandas as pd
import fitz  # PyMuPDF
import chromadb
import spacy

from annotated_text import annotated_text, annotation
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain.schema import Document
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from FlagEmbedding import FlagReranker

st.set_page_config(layout="wide")

# The OpenAI key is stored under the secret/environment variable OPEN_API_KEY
# and is mapped to the name the openai client expects.
os.environ['OPENAI_API_KEY'] = os.environ['OPEN_API_KEY']

st.title("Contracts Multiple File Search")

# Dense embedding model for retrieval and cross-encoder reranker for scoring.
embedding = HuggingFaceEmbeddings(model_name='BAAI/bge-base-en-v1.5')
reranker = FlagReranker('BAAI/bge-reranker-base')

# spaCy pipeline used for sentence segmentation.
nlp = spacy.load("en_core_web_md")

def util_upload_file_and_return_list_docs(uploaded_files):
    """Save each uploaded file to the working directory and open it with PyMuPDF."""
    list_docs = []
    list_save_path = []
    for uploaded_file in uploaded_files:
        save_path = Path(os.getcwd(), uploaded_file.name)
        with open(save_path, mode='wb') as w:
            w.write(uploaded_file.getvalue())
        docs = fitz.open(save_path)
        list_docs.append(docs)
        list_save_path.append(save_path)
    return (list_docs, list_save_path)

def split_txt_file_synthetic_sentence_rolling(ctxt, sentence_size_in_chars, sliding_size_in_chars, debug=False):
    """Split ctxt into fixed-size character windows whose starts advance by sliding_size_in_chars."""
    # Re-purpose the variable as the distance from a window's end back to the next
    # window's start, so each new window begins sliding_size_in_chars after the previous one.
    sliding_size_in_chars = sentence_size_in_chars - sliding_size_in_chars
    pos_start = 0
    pos_end = len(ctxt)
    final_return = []
    if debug:
        print('pos_start : ', pos_start)
        print('pos_end : ', pos_end)
    # Text shorter than one window: return it as a single section.
    if pos_end < sentence_size_in_chars:
        return [{'section_org_text': ctxt[pos_start:pos_end],
                 'section_char_start': pos_start,
                 'section_char_end': pos_end}]
    if sentence_size_in_chars < sliding_size_in_chars:
        return None
    stop_condition = False
    start = pos_start
    end = start + sentence_size_in_chars
    final_return.append({'section_org_text': ctxt[start:end],
                         'section_char_start': start,
                         'section_char_end': end})

    while not stop_condition:
        start = end - sliding_size_in_chars
        end = start + sentence_size_in_chars
        if end > pos_end:
            # Last, possibly shorter window that runs to the end of the text.
            if start < pos_end:
                end = pos_end
                final_return.append({'section_org_text': ctxt[start:end],
                                     'section_char_start': start,
                                     'section_char_end': end})
            stop_condition = True
        else:
            final_return.append({'section_org_text': ctxt[start:end],
                                 'section_char_start': start,
                                 'section_char_end': end})
        if debug:
            print('start : ', start)
            print('end : ', end)
    return final_return

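# Illustrative example (not executed): for a 250-character string,
# split_txt_file_synthetic_sentence_rolling(text, 100, 80) returns sections covering
# characters 0-100, 80-180 and 160-250, i.e. each window starts 80 characters after
# the previous one and the final window is shortened to fit the end of the text.
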
def split_into_sentences_with_offsets(text):
    """
    Split a paragraph into sentences and return them along with their start and end offsets.

    :param text: The input text to be split into sentences.
    :return: A list of tuples, each containing a sentence and its start and end character offsets.
    """
    doc = nlp(text)
    return [(sent.text, sent.start_char, sent.end_char) for sent in doc.sents]

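# Illustrative example (not executed; exact boundaries depend on the spaCy model):
# split_into_sentences_with_offsets("First sentence. Second one.")
# -> [("First sentence.", 0, 15), ("Second one.", 16, 27)]
# The character offsets make it possible to map a sentence back to its position on the page.
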
def util_get_list_page_and_passage(list_docs, list_save_path):
    """Turn every page of every PDF into sentence-level Documents with traceable metadata."""
    passage_documents = []
    for ind_doc, docs in enumerate(list_docs):
        for txt_index, txt_page in enumerate(docs):
            page_document = txt_page.get_text()
            sections = split_into_sentences_with_offsets(page_document)
            for sub_sub_index, sub_sub_item in enumerate(sections):
                sub_text = sub_sub_item[0]
                # Keep the full page text, the 0-based page index and the source file name
                # so that a retrieved sentence can be traced back to its PDF page.
                passage_document = Document(page_content=sub_text,
                                            metadata={"page_content": page_document,
                                                      "page_index": txt_index,
                                                      "file_name": str(list_save_path[ind_doc])})
                passage_documents.append(passage_document)
    return passage_documents

def util_get_only_content_inside_loop(page_no, page_documents):
    """Return the text of the first Document on the given page (unused helper).

    Aligned with the metadata produced by util_get_list_page_and_passage, which stores
    the page number under 'page_index'.
    """
    for index, item in enumerate(page_documents):
        if page_documents[index].metadata['page_index'] == page_no:
            return page_documents[index].page_content
    return None

def util_get_list_pageno_and_contents(some_query_passage, passage_documents, passage_nodes):
    """Rerank the retrieved passages against the query; page numbers start at 1."""
    # Cross-encoder reranking: one score per (query, passage) pair.
    rescore = reranker.compute_score([[some_query_passage, x.page_content] for x in passage_nodes])
    # compute_score may return a single float rather than a list when only one pair is passed.
    if not isinstance(rescore, list):
        rescore = [rescore]
    print('rescore :: ', rescore)
    tmp_array = []
    for i, x in enumerate(passage_nodes):
        tmp_dict = {"passage_content": x.page_content,
                    "page_no": int(x.metadata['page_index']) + 1,
                    "page_content": str(x.metadata['page_content']),
                    "file_name": str(x.metadata['file_name']),
                    "score": float(rescore[i])}
        tmp_array.append(tmp_dict)
    df = pd.DataFrame(tmp_array)
    df = df.sort_values(by='score', ascending=False)
    df = df.drop_duplicates(subset=['file_name'], keep='first')
    df = df[["passage_content", "file_name", "page_no", "page_content", "score"]]
    return df

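# Note: higher reranker scores mean a closer match between the query and the passage.
# Sorting by score and de-duplicating on file_name keeps only the single best-matching
# passage (with its page number and full page text) per uploaded contract.
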
def util_openai_extract_clause(example_prompt, page_content):
    """Ask the model to extract the answer to example_prompt from page_content."""
    # Uses the legacy (pre-1.0) openai client interface.
    import openai
    openai.api_key = os.environ['OPENAI_API_KEY']
    content = example_prompt
    content = content + "\n Answer precisely; do not add anything extra, and try to locate the answer in the below context \n context: "
    return_value = openai.ChatCompletion.create(model="gpt-3.5-turbo",
                                                temperature=0.0001,
                                                messages=[{"role": "user", "content": content + "\n" + page_content}])
    return str(return_value['choices'][0]['message']['content'])

def util_openai_hyde(example_prompt):
    """Generate a summary of the text (HyDE-style) that can be embedded instead of the raw query."""
    import openai
    openai.api_key = os.environ['OPENAI_API_KEY']
    return_value = openai.ChatCompletion.create(model="gpt-3.5-turbo",
                                                temperature=0.0001,
                                                messages=[
                                                    {"role": "system", "content": "You are a legal contract lawyer. Generate a summary from the below text." + "\n"},
                                                    {"role": "user", "content": example_prompt + "\n"},
                                                ])
    return str(return_value['choices'][0]['message']['content'])

def util_openai_format(example_passage, page_content):
    """Highlight the extracted clause inside the page text using annotated_text."""
    found_value = util_openai_extract_clause(example_passage, page_content)
    if len(found_value) > 0:
        found_value = found_value.strip()
        first_index = page_content.find(found_value)
        if first_index != -1:
            print('first_index : ', first_index)
            print('found_value : ', found_value)
            # Render the text before the match, the match itself as an annotation,
            # and the text after the match.
            return annotated_text(page_content[0:first_index],
                                  annotation(found_value, " FOUND ENTITY "),
                                  page_content[first_index + len(found_value):])
    return annotated_text(page_content)

def util_openai_modify_prompt(example_prompt, page_content):
    """Expand a terse query into a fuller one that is easier to match against contract text."""
    import openai
    openai.api_key = os.environ['OPENAI_API_KEY']
    my_prompt = """Expand the original Query to show exact results for extraction\n
Query: """ + example_prompt
    return_value = openai.ChatCompletion.create(model="gpt-4",
                                                temperature=0.0001,
                                                messages=[{"role": "user", "content": my_prompt}])
    return str(return_value['choices'][0]['message']['content'])

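# Note: util_openai_hyde and util_openai_modify_prompt (and util_openai_format above)
# are helpers that are not wired into the form below; they could be used to rewrite
# the query before retrieval or to highlight the extracted clause in the results.
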
passage_documents = []

with st.form("my_form"):
    multi = '''1. Download and upload multiple contracts (PDF)

e.g. https://www.barc.gov.in/tenders/GCC-LPS.pdf

e.g. https://www.montrosecounty.net/DocumentCenter/View/823/Sample-Construction-Contract
'''
    st.markdown(multi)
    multi = '''2. Enter a query to search, or a passage to find similar language.'''
    st.markdown(multi)
    multi = '''3. Press "Index and Answer".'''
    st.markdown(multi)
    multi = '''**An attempt is made to retrieve the most appropriate page and passage.**'''
    st.markdown(multi)

    list_docs = []
    list_save_path = []
    uploaded_files = st.file_uploader("Choose file(s)", accept_multiple_files=True)
    print('uploaded_files ', uploaded_files)

    single_example_passage = st.text_area('Enter a query or a similar passage here and press "Index and Answer"', "What is Governing Law?")
    submitted = st.form_submit_button("Index and Answer")

if submitted and uploaded_files:
    # Save the PDFs, split every page into sentences and keep page/file metadata.
    list_docs, list_save_path = util_upload_file_and_return_list_docs(uploaded_files)
    passage_documents = util_get_list_page_and_passage(list_docs, list_save_path)

    # Hybrid retrieval: sparse BM25 plus dense BGE embeddings in Chroma, combined
    # with a 0.25 / 0.75 weighting, then reranked with the cross-encoder.
    bm25_retriever = BM25Retriever.from_documents(passage_documents)
    bm25_retriever.k = 2
    chroma_vectorstore = Chroma.from_documents(passage_documents, embedding)
    chroma_retriever = chroma_vectorstore.as_retriever(search_kwargs={"k": 2})
    ensemble_retriever = EnsembleRetriever(retrievers=[bm25_retriever, chroma_retriever],
                                           weights=[0.25, 0.75])
    passage_nodes = ensemble_retriever.get_relevant_documents(single_example_passage)
    print('len(passage_nodes):', len(passage_nodes))
    df = util_get_list_pageno_and_contents(single_example_passage, passage_documents, passage_nodes)
    st.write(df)
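# To run the app locally (assuming this file is saved as app.py and the required
# packages plus the en_core_web_md spaCy model are installed):
#   streamlit run app.py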