Spaces:
Sleeping
Sleeping
import os | |
import re | |
import time | |
from typing import List | |
from dotenv import load_dotenv | |
import edgedb | |
from openai import OpenAI | |
from qdrant_client import QdrantClient | |
from utils.cli import \ | |
print_execution_time, start_loading_animation, stop_loading_animation | |
from splitter.splitter import Splitter | |
from qdrant_client.models import Distance, VectorParams | |
from qdrant_client.models import PointStruct | |
class Rag():
    """Rebuild the Qdrant vector collection from websites stored in EdgeDB.

    The pipeline (see ``run``) recreates the Qdrant collection, reads every
    ``Website`` from EdgeDB, fills in missing image alt texts through OpenAI,
    splits the texts into parent/child chunks with ``Splitter``, stores the
    parent chunks in EdgeDB and the embedded child chunks in Qdrant.
    """

    def __init__(self):
        # Environment chosen interactively in _initial_config ("dev"/"prod").
        self.type_env = None

        # Handles returned by start_loading_animation; safe_join treats them
        # as threads (it calls is_alive()/join() on them).
        self.loading_split = None
        self.loading_image_description = None
        self.loading_embedding = None

        # One-element lists used as mutable "done" flags shared with the
        # loading animations.
        self.done_split = [False]
        self.done_image_description = [False]
        self.done_embeding = [False]

        # Service clients, created in _initial_config.
        self.edgedb_client = None
        self.qdrant_client = None
        self.oa_client = None

    def run(self):
        """Execute the whole pipeline and print a final report.

        Any exception is caught and printed (this is the top-level boundary
        of the script); the loading animations are always stopped in the
        ``finally`` clause so no spinner keeps printing after a failure.
        """
        try:
            self._initial_config()
            start_time = time.time()
            print("➗ Começando o RAG...")

            self._create_collection()
            print("✅ Collection criada.")

            web_sites = self._fetch_websites()
            print("✅ Websites capturados.")

            self.loading_image_description = start_loading_animation(
                self.done_image_description,
                "Gerando Descrições de imagens caso não tenha..."
            )
            texts_with_images_descriptions, error_images_description = \
                self._generate_image_description_with_empty_description(web_sites)
            stop_loading_animation(
                self.done_image_description,
                self.loading_image_description
            )
            print("✅ Descrição de imagens geradas.")

            self.loading_split = start_loading_animation(
                self.done_split,
                "Dividindo textos e salvando textos pai no bd..."
            )
            splitter = Splitter(texts_with_images_descriptions)
            splitter.split()
            self._save_parent_texts(splitter.texts_parent)
            stop_loading_animation(self.done_split, self.loading_split)
            print("✅ Textos dividos e textos pai salvo no Edgedb.")

            self._embed_child_texts(splitter.texts_child)
            print("✅ Texto filho salvo no qdrant.")

            print("✅ RAG finalizado.")
            print(
                f"""
                📊 Relatório:\n
                \t Tempo de execução: {print_execution_time(start_time)}\n
                \t Textos Filhos e Embedding gerados: {len(splitter.texts_child)}\n
                \t Textos pai gerados: {len(splitter.texts_parent)}\n
                \t WebSites recuperados da base: {len(web_sites)}\n
                \t Erros ao gerar descrição de imagens: {len(error_images_description)}\n
                """)  # noqa
        except Exception as error:
            print(f"❌ Erro: {error}")
        finally:
            # Make sure no animation is left running, whatever happened above.
            Rag.safe_join(self.loading_split, self.done_split)
            Rag.safe_join(self.loading_image_description, self.done_image_description)
            Rag.safe_join(self.loading_embedding, self.done_embeding)

    def _fetch_websites(self):
        """Return every ``Website`` from EdgeDB with its text, videos and images."""
        return self.edgedb_client.query('''
            SELECT Website {
                url,
                text: {
                    id,
                    content
                },
                videos: {
                    url
                },
                images: {
                    path
                }
            }
        ''')

    def _save_parent_texts(self, texts_parent):
        """Insert each parent chunk produced by the splitter into EdgeDB.

        Each item is a mapping with the keys ``id``, ``content`` (an object
        exposing ``page_content``), ``parent_id`` and ``url``.
        """
        for text_parent in texts_parent:
            self.edgedb_client.query(
                """
                INSERT Pattern {
                    id := <uuid>$id,
                    content := <str>$content,
                    parent_id := <uuid>$parent_id,
                    url := <str>$url
                }
                """,
                id=text_parent["id"],
                content=text_parent["content"].page_content,
                parent_id=text_parent["parent_id"],
                url=text_parent["url"]
            )

    def _embed_child_texts(self, texts_child):
        """Embed each child chunk with OpenAI and upsert it into Qdrant.

        A fresh loading animation (showing "<done> de <total>") is started
        for every chunk and stopped once the point has been stored.
        """
        # Loop-invariant configuration: read the environment only once.
        collection_name = os.environ.get("COLLECTION_NAME")
        embedding_model = os.environ.get("OPENAI_MODEL_EMBEDDING")
        total_child = len(texts_child)

        for count_embeddings, text_child in enumerate(texts_child):
            # A new flag is needed each time: stopping the previous animation
            # is what flips the old one.
            self.done_embeding = [False]
            self.loading_embedding = start_loading_animation(
                self.done_embeding,
                f"""Gerando embeddings e salvando no qdrant: {count_embeddings} de {total_child}"""  # noqa
            )
            content = text_child["content"].page_content
            embedding = self.oa_client.embeddings.create(
                input=[content],
                model=embedding_model
            ).data[0].embedding
            self.qdrant_client.upsert(
                collection_name=collection_name,
                points=[PointStruct(
                    id=str(text_child["id"]),
                    vector=embedding,
                    payload={
                        "content": content,
                        "parent_id": text_child["parent_id"],
                        "type": "text"
                    }
                )]
            )
            stop_loading_animation(self.done_embeding, self.loading_embedding)

    def _initial_config(self):
        """Load the environment variables for the chosen environment and connect.

        Asks on stdin which environment to use, loads the matching ``.env``
        file and creates the EdgeDB, Qdrant and OpenAI clients.

        Raises:
            ValueError: if the answer is neither ``dev`` nor ``prod``.
        """
        # Tolerate surrounding whitespace and capitalisation in the answer.
        self.type_env = input("Qual o tipo de ambiente? (dev, prod): ").strip().lower()

        if self.type_env == "prod":
            print("Carregando váriaveis de ambiente de produção...")
            load_dotenv(dotenv_path="../../.env", override=True)
            self.edgedb_client = edgedb.create_client()
            print("✅ Conectado ao edgedb.")
            self.qdrant_client = QdrantClient(
                url=os.environ.get("QDRANT_URL"),
                api_key=os.environ.get("QDRANT_KEY")
            )
            print("✅ Conectado ao qdrant.")
        elif self.type_env == "dev":
            print("Carregando váriaveis de ambiente de desenvolvimento...")
            load_dotenv(dotenv_path="../../.env.development", override=True)
            # NOTE(review): ports are passed as the raw strings read from the
            # environment — confirm both clients accept that.
            self.edgedb_client = edgedb.create_client(
                host=os.environ.get("EDGEDB_HOST"),
                user=os.environ.get("EDGEDB_USER"),
                password=os.environ.get("EDGEDB_PASSWORD"),
                port=os.environ.get("EDGEDB_PORT"),
                tls_security="insecure"
            )
            print("✅ Conectado ao edgedb.")
            self.qdrant_client = QdrantClient(
                host=os.environ.get("QDRANT_HOST"),
                port=os.environ.get("QDRANT_PORT")
            )
            print("✅ Conectado ao qdrant.")
        else:
            raise ValueError("Tipo de ambiente inválido.")

        self.oa_client = OpenAI(
            api_key=os.environ.get("OPENAI_API_KEY")
        )
        print("✅ Conectado a OpenAI.")

    def _create_collection(self):
        """Create the Qdrant collection, deleting it first if it already exists."""
        collection_name = os.environ.get("COLLECTION_NAME")
        if self.qdrant_client.collection_exists(collection_name):
            self.qdrant_client.delete_collection(
                collection_name=collection_name
            )
        self.qdrant_client.create_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(size=1536, distance=Distance.COSINE)
        )

    def _generate_image_description_with_empty_description(self, websites):
        """Fill in the alt text of every markdown image that has none.

        Args:
            websites: iterable of objects exposing ``url``, ``text.id`` and
                ``text.content``.

        Returns:
            A ``(texts, errors)`` tuple. ``texts`` has one
            ``{"id", "content", "url"}`` dict per website; ``errors`` has one
            ``"<id>: <content>"`` string per website whose descriptions could
            not be generated (its content is then kept unchanged).
        """
        # Matches markdown images with an empty alt text: ![](<link>)
        pattern = re.compile(r'!\[\]\(([^)]+)\)')

        def replace_description_image(match):
            link = match.group(1)
            new_description = self._get_description_of_image(link)
            return f'![{new_description}]({link})'

        texts_with_images_descriptions = []
        error_images_upload = []
        for website in websites:
            content = website.text.content
            try:
                content = pattern.sub(replace_description_image, content)
            except Exception:
                # Best effort: record the failure and keep the original text.
                error_images_upload.append(
                    f"{website.text.id}: {website.text.content}")
            texts_with_images_descriptions.append({
                "id": website.text.id,
                "content": content,
                "url": website.url
            })
        return texts_with_images_descriptions, error_images_upload

    def _get_description_of_image(self, image_url):
        """Ask OpenAI (``gpt-4o-mini``) for a description of the image.

        Args:
            image_url: URL of the image, sent as an ``image_url`` content part.

        Returns:
            The content of the first completion choice.
        """
        completion = self.oa_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": """
                    You are an image descriptor who will describe images using the Brazilian Portuguese language.
                    This description should be short and to the point, with a maximum of 50 characters, to be placed in the alt tag of an HTML.
                    """  # noqa
                },
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": "What's in this image?"
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"{image_url}"
                            }
                        }
                    ]
                }
            ],
            max_tokens=300
        )
        return completion.choices[0].message.content

    @staticmethod
    def safe_join(thread, done: List[bool]):
        """Stop a loading animation by joining its thread if it is still alive.

        Args:
            thread: the animation thread, or ``None`` if it was never started.
            done: one-element flag list; ``done[0]`` is set to ``True`` before
                joining so the thread can finish.
        """
        if thread is not None and thread.is_alive():
            done[0] = True
            thread.join()
# Script entry point: build the pipeline and run it once.
if __name__ == "__main__":
    Rag().run()