# Created by Leandro Carneiro at 19/01/2024
# Description:
# ------------------------------------------------
#from langchain.embeddings import OpenAIEmbeddings
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain_community.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
import os
import csv
def read_csv_to_dict(filename):
    """Read a semicolon-separated CSV and return it as a {key: value} dict."""
    data_dict = {}
    with open(filename, mode='r', encoding='utf-8') as file:
        csv_reader = csv.reader(file, delimiter=';')
        for row in csv_reader:
            key, value = row[0], row[1]
            data_dict[key] = value
    return data_dict
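
# A minimal usage sketch. It assumes filename_url.csv maps each downloaded
# article file to the URL it came from, one semicolon-separated pair per line
# (the file name and URL below are hypothetical examples):
#
#   noticia_001.txt;https://example.com/noticia-001
#
#   mapping = read_csv_to_dict('./local_base/filename_url.csv')
#   mapping['noticia_001.txt']  # -> 'https://example.com/noticia-001'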
def generate_embeddings_and_vectorstore(path):
    """Load the .txt corpus, split it into chunks, and index it in Chroma."""
    try:
        loader = DirectoryLoader(path=path, glob="**/*.txt")
        corpus = loader.load()
        print(f'Total documents before text_split = {len(corpus)}')

        text_splitter = RecursiveCharacterTextSplitter(chunk_size=4000, chunk_overlap=400)
        docs = text_splitter.split_documents(corpus)
        num_total_characters = sum(len(x.page_content) for x in docs)
        print(f"Total chunks after text_split = {len(docs)}")
        print(f"Average characters per chunk = {num_total_characters / len(docs):,.0f}")

        # Attach the original URL of each source file to the chunk metadata,
        # so the generated news can cite its sources.
        dict_filename_url = read_csv_to_dict('./local_base/filename_url.csv')
        for doc in docs:
            filename = os.path.basename(doc.metadata["source"])
            # Default to '' because Chroma rejects None as a metadata value.
            doc.metadata["link"] = dict_filename_url.get(filename, '')

        fc_embeddings = OpenAIEmbeddings(openai_api_key=os.environ['OPENAI_KEY'])
        vectorstore = Chroma.from_documents(docs, fc_embeddings)
        print('Total docs in vectorstore =', len(vectorstore.get()['documents']))

        return vectorstore
    except Exception as e:
        print(str(e))
        return str(e)
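
# A minimal sketch of how this function is meant to be called, assuming the
# corpus lives in ./local_base (the directory name is taken from the CSV path
# above) and that OPENAI_KEY is set in the environment:
#
#   vectorstore = generate_embeddings_and_vectorstore('./local_base')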
class Rag:
    def __init__(self, vectorstore, min_words, max_words):
        self.text = None
        self.vectorstore = vectorstore
        self.memory = ConversationBufferMemory(memory_key="chat_history",
                                               return_messages=True,
                                               output_key="answer")

        prompt_template = """Your task is to create news for a newspaper based on pieces of text delimited by <> and a question delimited by <>.
Do not use only your own knowledge to write the news. Write the news based on the question, but using the pieces of text.
If the pieces of text do not contain enough information about the question to create the news, just say that you need more sources of information, nothing more.
The news should have a title.
The news should be written in formal language.
The news should have between {min_words} and {max_words} words and should be written in Portuguese.
The news should be about the following context: <{context}>

Question: <{question}>
Answer here:"""
        self.prompt = PromptTemplate(template=prompt_template,
                                     input_variables=["context", "question"],
                                     partial_variables={"min_words": min_words, "max_words": max_words})
        self.qa = ConversationalRetrievalChain.from_llm(
            llm=ChatOpenAI(model_name="gpt-3.5-turbo-0125",
                           temperature=0,
                           openai_api_key=os.environ['OPENAI_KEY'],
                           # Maximum number of tokens for the answer: roughly 1.5x
                           # the word limit, since a word is often more than one token.
                           max_tokens=int(int(max_words) + (int(max_words) / 2))),
            memory=self.memory,
            # retriever=vectorstore.as_retriever(search_type='similarity_score_threshold',
            #                                    search_kwargs={'k': 4, 'score_threshold': 0.8}),
            retriever=vectorstore.as_retriever(),
            combine_docs_chain_kwargs={"prompt": self.prompt},
            chain_type="stuff",  # alternatives: map_reduce, refine, map_rerank
            return_source_documents=True,
        )
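
    # Note: with chain_type="stuff", all retrieved chunks are placed verbatim
    # into the {context} slot of the prompt above, so chunk_size and the
    # retriever's k together bound the final prompt length.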
    def generate_text(self, subject):
        try:
            # The query stays in Portuguese because the news itself is generated in Portuguese.
            query = f"Elabore uma nova notícia sobre {subject}."
            result_text = self.qa.invoke({"question": query})

            # Deduplicate the source links and number them for display.
            list_result_sources = [doc.metadata['link'] for doc in result_text["source_documents"]]
            result_sources = list(set(list_result_sources))
            str_result_sources = ''
            for i, source in enumerate(result_sources, start=1):
                str_result_sources += f'{i}) {source}\n'

            self.vectorstore.delete_collection()
            return (result_text["answer"], str_result_sources)
        except Exception as e:
            self.vectorstore.delete_collection()
            return (str(e), '')
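
# A minimal end-to-end sketch under the same assumptions as above (.txt corpus
# plus filename_url.csv in ./local_base, OPENAI_KEY set; the subject and word
# limits below are hypothetical):
#
#   vectorstore = generate_embeddings_and_vectorstore('./local_base')
#   rag = Rag(vectorstore, min_words=300, max_words=500)
#   answer, sources = rag.generate_text('a economia brasileira')
#   print(answer)
#   print(sources)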