import os
import re
from typing import Any, Dict, List, Tuple

import chromadb
from chromadb.utils import embedding_functions

from config import CHUNK_SIZE, CHUNK_OVERLAP, DATABASE_DIR, EMBEDDING_MODEL


class KodeksProcessor:
    """Parse Polish statute text files and index their provisions in ChromaDB.

    A statute is split into chapters ("Rozdział"), articles ("Art.") and
    paragraphs ("§").  Every article, or every paragraph of an article that
    has paragraphs, becomes one document in the ``kodeksy`` collection,
    together with metadata describing where it came from.
    """

    # Name of the ChromaDB collection that stores every indexed provision.
    COLLECTION_NAME = "kodeksy"

    # Number of chunks sent to ChromaDB in a single ``add`` call.
    ADD_BATCH_SIZE = 100

    # Journal of Laws identifier: "Dz.U.<year>.<number>.<position>"; the last
    # field is optional so two-field identifiers ("Dz.U.2023.1610") match too.
    _DZ_U_RE = re.compile(r'Dz\.U\.(\d{4})\.(\d+)(?:\.(\d+))?')

    # "USTAWA z dnia <date>" followed by the first non-empty line (the name).
    # Blank lines between the date line and the name line are tolerated.
    _TITLE_RE = re.compile(r'USTAWA\s+z\s+dnia[ \t]*([^\n]*)\n\s*(\S[^\n]*)')

    # Chapter heading at the start of a line, optionally followed by a title
    # line.  A following line that starts an article is never taken as title.
    _CHAPTER_RE = re.compile(
        r'^(Rozdział[ \t]+[0-9IVXLCDM]+[a-z]?[ \t]*'
        r'(?:\n(?!\s*Art\.)\s*[^\n]+)?)(?:\n|\Z)',
        re.MULTILINE,
    )

    # Zero-width position of an article header that begins a line.
    _ARTICLE_START_RE = re.compile(r'^[ \t]*(?=Art\.\s*\d)', re.MULTILINE)

    # Article header with its number, including a letter suffix ("Art. 3a.").
    _ARTICLE_HEADER_RE = re.compile(r'Art\.\s*(\d+[a-z]*)\.?[ \t]*')

    # Paragraph marker at the start of a line.  Anchoring to the line start
    # keeps in-sentence cross-references ("... w § 1 ...") from being split.
    _PARAGRAPH_START_RE = re.compile(
        r'^[ \t]*§\s*(\d+[a-z]*)[.\s]+', re.MULTILINE
    )

    def __init__(self):
        """Open the persistent database and get or create the collection."""
        self.client = chromadb.PersistentClient(path=DATABASE_DIR)
        embedding_function = (
            embedding_functions.SentenceTransformerEmbeddingFunction(
                model_name=EMBEDDING_MODEL
            )
        )
        # The embedding function is passed on every start, not only when the
        # collection is first created, so an existing collection is queried
        # with the same model that was used to index it.
        self.collection = self.client.get_or_create_collection(
            name=self.COLLECTION_NAME,
            embedding_function=embedding_function,
        )

    def extract_metadata(self, text: str) -> Dict[str, str]:
        """Extract statute-level metadata from *text*.

        Returns a dict that may contain ``dz_u`` and ``rok`` (when a Journal
        of Laws identifier is found) and ``data_ustawy`` and ``nazwa`` (when
        the "USTAWA z dnia ..." heading is found).  Missing pieces are simply
        absent from the result.
        """
        metadata: Dict[str, str] = {}

        dz_u_match = self._DZ_U_RE.search(text)
        if dz_u_match:
            metadata['dz_u'] = dz_u_match.group(0)
            metadata['rok'] = dz_u_match.group(1)

        title_match = self._TITLE_RE.search(text)
        if title_match:
            metadata['data_ustawy'] = title_match.group(1).strip()
            metadata['nazwa'] = title_match.group(2).strip()

        return metadata

    def split_header_and_content(self, text: str) -> Tuple[str, str]:
        """Split *text* at the first occurrence of "USTAWA".

        Returns ``(header, content)`` where ``content`` starts with "USTAWA".
        When the marker is absent the header is empty and the whole text is
        returned as content.
        """
        header, marker, rest = text.partition("USTAWA")
        if not marker:
            return "", text
        return header, marker + rest

    def process_article(self, article_text: str) -> Dict[str, Any]:
        """Break a single article into its paragraphs, if it has any.

        Returns a dict with ``article_num`` and ``has_paragraphs``.  When the
        article has paragraphs, ``paragraphs`` holds ``(number, text)`` tuples
        in document order; otherwise ``content`` holds the stripped article
        text.
        """
        header = self._ARTICLE_HEADER_RE.match(article_text)
        article_num = header.group(1) if header else ""
        # Drop the "Art. N." header so that a first paragraph written on the
        # same line ("Art. 2. § 1. ...") starts the body.
        body = article_text[header.end():] if header else article_text

        markers = list(self._PARAGRAPH_START_RE.finditer(body))
        if not markers:
            return {
                "article_num": article_num,
                "content": article_text.strip(),
                "has_paragraphs": False,
            }

        # Each paragraph runs from the end of its marker to the next marker.
        ends = [marker.start() for marker in markers[1:]] + [len(body)]
        paragraphs = [
            (marker.group(1), body[marker.end():end].strip())
            for marker, end in zip(markers, ends)
        ]
        return {
            "article_num": article_num,
            "paragraphs": paragraphs,
            "has_paragraphs": True,
        }

    def _split_articles(self, section: str) -> List[str]:
        """Return the articles of *section*, each starting with "Art."."""
        starts = [
            match.end() for match in self._ARTICLE_START_RE.finditer(section)
        ]
        bounds = starts + [len(section)]
        return [section[a:b] for a, b in zip(bounds, bounds[1:])]

    def split_into_chunks(
        self, text: str, metadata: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Split statute *text* into indexable chunks.

        Each chunk is a dict with ``text`` and ``metadata``.  The chunk
        metadata extends *metadata* with ``chapter``, ``article`` and, for
        paragraph-level chunks, ``paragraph``.  Text that precedes the first
        article of a section is not indexed.
        """
        chunks: List[Dict[str, Any]] = []
        current_chapter = ""

        # With one capturing group, ``split`` returns the captured chapter
        # headings at odd indices and the text between them at even indices.
        for index, section in enumerate(self._CHAPTER_RE.split(text)):
            if index % 2 == 1:
                current_chapter = section.strip()
                continue

            for article in self._split_articles(section):
                processed = self.process_article(article)
                chunk_metadata = {
                    **metadata,
                    "chapter": current_chapter,
                    "article": processed["article_num"],
                }

                if not processed["has_paragraphs"]:
                    chunks.append({
                        "text": processed["content"],
                        "metadata": chunk_metadata,
                    })
                    continue

                for par_num, par_content in processed["paragraphs"]:
                    chunks.append({
                        "text": (
                            f"Art. {processed['article_num']} "
                            f"§ {par_num}. {par_content}"
                        ),
                        "metadata": {**chunk_metadata, "paragraph": par_num},
                    })

        return chunks

    def process_file(self, filepath: str) -> None:
        """Read one statute file, chunk it and add the chunks to the collection."""
        print(f"Przetwarzanie pliku: {filepath}")

        with open(filepath, 'r', encoding='utf-8') as file:
            content = file.read()

        _, main_content = self.split_header_and_content(content)
        # Metadata is read from the whole file: the Journal of Laws identifier
        # usually sits in the header, before the "USTAWA" marker.
        metadata = self.extract_metadata(content)
        metadata['filename'] = os.path.basename(filepath)

        chunks = self.split_into_chunks(main_content, metadata)

        # Add in batches so the embedding model is not invoked once per chunk.
        for start in range(0, len(chunks), self.ADD_BATCH_SIZE):
            batch = chunks[start:start + self.ADD_BATCH_SIZE]
            self.collection.add(
                documents=[chunk["text"] for chunk in batch],
                metadatas=[chunk["metadata"] for chunk in batch],
                ids=[
                    f"{metadata['filename']}_"
                    f"{chunk['metadata']['article']}_{start + offset}"
                    for offset, chunk in enumerate(batch)
                ],
            )

        print(f"Dodano {len(chunks)} chunków z pliku {metadata['filename']}")

    def process_all_files(self, directory: str) -> None:
        """Process every ``.txt`` file found directly in *directory*."""
        # Sorted so files are indexed in a deterministic order.
        for filename in sorted(os.listdir(directory)):
            if filename.endswith('.txt'):
                self.process_file(os.path.join(directory, filename))

    def search(self, query: str, n_results: int = 3) -> Dict[str, Any]:
        """Return the collection's query result for *query*.

        *n_results* is the number of results requested from the collection.
        """
        return self.collection.query(
            query_texts=[query],
            n_results=n_results,
        )