import hashlib
import json
import logging
import re
from typing import Any, Dict, List, Optional, Tuple

from openai import OpenAI
from tqdm import tqdm
from langdetect import detect
from transformers import GPT2Tokenizer
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from App_Function_Libraries.Tokenization_Methods_Lib import openai_tokenize
from App_Function_Libraries.Utils.Utils import load_comprehensive_config


def ntlk_prep():
    # Download the NLTK Punkt data used by sent_tokenize/word_tokenize.
    nltk.download('punkt')


# Load the GPT-2 tokenizer once at import time for token-based chunking.
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

# Load configuration and build the default chunking options.
config = load_comprehensive_config()

chunk_options = {
    'method': config.get('Chunking', 'method', fallback='words'),
    'max_size': config.getint('Chunking', 'max_size', fallback=400),
    'overlap': config.getint('Chunking', 'overlap', fallback=200),
    'adaptive': config.getboolean('Chunking', 'adaptive', fallback=False),
    'multi_level': config.getboolean('Chunking', 'multi_level', fallback=False),
    'language': config.get('Chunking', 'language', fallback='english')
}

openai_api_key = config.get('API', 'openai_api_key')


def detect_language(text: str) -> str:
    try:
        return detect(text)
    except Exception:
        # Fall back to English if detection fails (e.g. on very short text).
        return 'en'


def load_document(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        text = file.read()
    # Collapse runs of whitespace into single spaces.
    return re.sub(r'\s+', ' ', text).strip()


def improved_chunking_process(text: str, custom_chunk_options: Dict[str, Any] = None) -> List[Dict[str, Any]]:
    logging.debug("Improved chunking process started...")

    # Extract JSON metadata if it is present at the very beginning of the text.
    json_content = {}
    try:
        json_end = text.index("}\n") + 1
        json_content = json.loads(text[:json_end])
        text = text[json_end:].strip()
        logging.debug(f"Extracted JSON metadata: {json_content}")
    except (ValueError, json.JSONDecodeError):
        logging.debug("No JSON metadata found at the beginning of the text")

    # Extract a transcription header if present.
    header_match = re.match(r"(This text was transcribed using.*?)\n\n", text, re.DOTALL)
    header_text = ""
    if header_match:
        header_text = header_match.group(1)
        text = text[len(header_text):].strip()
        logging.debug(f"Extracted header text: {header_text}")

    options = chunk_options.copy()
    if custom_chunk_options:
        options.update(custom_chunk_options)

    chunk_method = options.get('method', 'words')
    max_size = options.get('max_size', 2000)
    overlap = options.get('overlap', 0)
    language = options.get('language', None)

    if language is None:
        language = detect_language(text)

    chunks = chunk_text(text, chunk_method, max_size, overlap, language)

    chunks_with_metadata = []
    total_chunks = len(chunks)
    for i, chunk in enumerate(chunks):
        metadata = {
            'chunk_index': i,
            'total_chunks': total_chunks,
            'chunk_method': chunk_method,
            'max_size': max_size,
            'overlap': overlap,
            'language': language,
            'relative_position': i / total_chunks
        }
        metadata.update(json_content)
        metadata['header_text'] = header_text

        chunks_with_metadata.append({
            'text': chunk,
            'metadata': metadata
        })

    return chunks_with_metadata


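# Example usage (a minimal sketch; "transcript.txt" is a hypothetical plain-text file and the
# option values are illustrative, not the project defaults):
#
#     doc_text = load_document("transcript.txt")
#     doc_chunks = improved_chunking_process(doc_text, {'method': 'sentences', 'max_size': 10, 'overlap': 2})
#     for c in doc_chunks:
#         print(c['metadata']['chunk_index'], c['metadata']['relative_position'])

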
def multi_level_chunking(text: str, method: str, max_size: int, overlap: int, language: str) -> List[str]:
    logging.debug("Multi-level chunking process started...")

    # First level: split into paragraph groups roughly twice the target size.
    paragraphs = chunk_text_by_paragraphs(text, max_size * 2, overlap)

    # Second level: chunk each paragraph group with the requested method.
    chunks = []
    for para in paragraphs:
        if method == 'words':
            chunks.extend(chunk_text_by_words(para, max_size, overlap, language))
        elif method == 'sentences':
            chunks.extend(chunk_text_by_sentences(para, max_size, overlap, language))
        else:
            chunks.append(para)

    return chunks


def chunk_text(text: str, method: str, max_size: int, overlap: int, language: str = None) -> List[str]:
    if method == 'words':
        logging.debug("Chunking by words...")
        return chunk_text_by_words(text, max_size, overlap, language)
    elif method == 'sentences':
        logging.debug("Chunking by sentences...")
        return chunk_text_by_sentences(text, max_size, overlap, language)
    elif method == 'paragraphs':
        logging.debug("Chunking by paragraphs...")
        return chunk_text_by_paragraphs(text, max_size, overlap)
    elif method == 'tokens':
        logging.debug("Chunking by tokens...")
        return chunk_text_by_tokens(text, max_size, overlap)
    elif method == 'semantic':
        logging.debug("Chunking by semantic similarity...")
        return semantic_chunking(text, max_size)
    else:
        return [text]


def determine_chunk_position(relative_position: float) -> str:
    if relative_position < 0.33:
        return "This chunk is from the beginning of the document"
    elif relative_position < 0.66:
        return "This chunk is from the middle of the document"
    else:
        return "This chunk is from the end of the document"


def chunk_text_by_words(text: str, max_words: int = 300, overlap: int = 0, language: str = None) -> List[str]:
    logging.debug("chunk_text_by_words...")
    if language is None:
        language = detect_language(text)

    # Use language-appropriate word segmentation for Chinese and Japanese.
    if language.startswith('zh'):
        import jieba
        words = list(jieba.cut(text))
    elif language == 'ja':
        import fugashi
        tagger = fugashi.Tagger()
        words = [word.surface for word in tagger(text)]
    else:
        words = text.split()

    # Keep the step positive even if overlap >= max_words.
    step = max(1, max_words - overlap)
    chunks = []
    for i in range(0, len(words), step):
        chunk = ' '.join(words[i:i + max_words])
        chunks.append(chunk)
    return post_process_chunks(chunks)


def chunk_text_by_sentences(text: str, max_sentences: int = 10, overlap: int = 0, language: str = None) -> List[str]:
    logging.debug("chunk_text_by_sentences...")
    if language is None:
        language = detect_language(text)

    nltk.download('punkt', quiet=True)

    if language.startswith('zh') or language == 'ja':
        # NLTK's Punkt tokenizer has no Chinese or Japanese model, and jieba/fugashi segment
        # words rather than sentences, so split on CJK sentence-ending punctuation instead.
        sentences = [s.strip() for s in re.split(r'(?<=[。！？])', text) if s.strip()]
    else:
        # detect_language returns an ISO code (e.g. 'en'), while sent_tokenize expects a full
        # language name (e.g. 'english'); map the common cases and fall back to the default.
        nltk_language = {'en': 'english', 'fr': 'french', 'de': 'german', 'es': 'spanish',
                         'it': 'italian', 'pt': 'portuguese', 'nl': 'dutch', 'ru': 'russian'}.get(language, language)
        try:
            sentences = sent_tokenize(text, language=nltk_language)
        except LookupError:
            sentences = sent_tokenize(text)

    # Keep the step positive even if overlap >= max_sentences.
    step = max(1, max_sentences - overlap)
    chunks = []
    for i in range(0, len(sentences), step):
        chunk = ' '.join(sentences[i:i + max_sentences])
        chunks.append(chunk)
    return post_process_chunks(chunks)


def chunk_text_by_paragraphs(text: str, max_paragraphs: int = 5, overlap: int = 0) -> List[str]:
    logging.debug("chunk_text_by_paragraphs...")
    paragraphs = re.split(r'\n\s*\n', text)
    # Keep the step positive even if overlap >= max_paragraphs.
    step = max(1, max_paragraphs - overlap)
    chunks = []
    for i in range(0, len(paragraphs), step):
        chunk = '\n\n'.join(paragraphs[i:i + max_paragraphs])
        chunks.append(chunk)
    return post_process_chunks(chunks)


def chunk_text_by_tokens(text: str, max_tokens: int = 1000, overlap: int = 0) -> List[str]:
    logging.debug("chunk_text_by_tokens...")
    # Rough approximation: treat each word as (len(word) // 4 + 1) tokens so no tokenizer
    # call is needed.
    words = text.split()
    chunks = []
    current_chunk = []
    current_token_count = 0

    for word in words:
        word_token_count = len(word) // 4 + 1
        if current_token_count + word_token_count > max_tokens and current_chunk:
            chunks.append(' '.join(current_chunk))
            # Carry over the last `overlap` words as overlap into the next chunk.
            current_chunk = current_chunk[-overlap:] if overlap > 0 else []
            current_token_count = sum(len(w) // 4 + 1 for w in current_chunk)

        current_chunk.append(word)
        current_token_count += word_token_count

    if current_chunk:
        chunks.append(' '.join(current_chunk))

    return post_process_chunks(chunks)


def post_process_chunks(chunks: List[str]) -> List[str]:
    return [chunk.strip() for chunk in chunks if chunk.strip()]


def get_chunk_metadata(chunk: str, full_text: str, chunk_type: str = "generic",
                       chapter_number: Optional[int] = None,
                       chapter_pattern: Optional[str] = None,
                       language: str = None) -> Dict[str, Any]:
    try:
        logging.debug("get_chunk_metadata...")
        start_index = full_text.index(chunk)
        end_index = start_index + len(chunk)

        chunk_hash = hashlib.md5(chunk.encode()).hexdigest()

        metadata = {
            'start_index': start_index,
            'end_index': end_index,
            'word_count': len(chunk.split()),
            'char_count': len(chunk),
            'chunk_type': chunk_type,
            'language': language,
            'chunk_hash': chunk_hash,
            'relative_position': start_index / len(full_text)
        }

        if chunk_type == "chapter":
            metadata['chapter_number'] = chapter_number
            metadata['chapter_pattern'] = chapter_pattern

        return metadata
    except ValueError:
        logging.error(f"Chunk not found in full_text: {chunk[:50]}... Full text length: {len(full_text)}")
        raise


def process_document_with_metadata(text: str, chunk_options: Dict[str, Any],
                                   document_metadata: Dict[str, Any]) -> Dict[str, Any]:
    chunks = improved_chunking_process(text, chunk_options)

    return {
        'document_metadata': document_metadata,
        'chunks': chunks
    }


def chunk_text_hybrid(text, max_tokens=1000):
    logging.debug("chunk_text_hybrid...")
    sentences = nltk.tokenize.sent_tokenize(text)
    chunks = []
    current_chunk = []
    current_length = 0

    for sentence in sentences:
        tokens = tokenizer.encode(sentence)
        if current_length + len(tokens) <= max_tokens:
            current_chunk.append(sentence)
            current_length += len(tokens)
        else:
            # Close the current chunk (if any) and start a new one with this sentence.
            if current_chunk:
                chunks.append(' '.join(current_chunk))
            current_chunk = [sentence]
            current_length = len(tokens)

    if current_chunk:
        chunks.append(' '.join(current_chunk))

    return chunks


def chunk_on_delimiter(input_string: str,
                       max_tokens: int,
                       delimiter: str) -> List[str]:
    logging.debug("chunk_on_delimiter...")
    chunks = input_string.split(delimiter)
    combined_chunks, _, dropped_chunk_count = combine_chunks_with_no_minimum(
        chunks, max_tokens, chunk_delimiter=delimiter, add_ellipsis_for_overflow=True)
    if dropped_chunk_count > 0:
        logging.warning(f"{dropped_chunk_count} chunks were dropped due to exceeding the token limit.")
    # Re-append the delimiter so chunk boundaries read naturally.
    combined_chunks = [f"{chunk}{delimiter}" for chunk in combined_chunks]
    return combined_chunks


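# Example (a minimal sketch; the input string is purely illustrative): split on sentence-ending
# periods and re-pack the pieces into groups of at most ~50 tokens.
#
#     pieces = chunk_on_delimiter("First sentence. Second sentence. Third sentence.", 50, ".")
#     # -> a list of period-terminated strings, each within the token budget

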
def recursive_summarize_chunks(chunks, summarize_func, custom_prompt, temp=None, system_prompt=None):
    logging.debug("recursive_summarize_chunks...")
    summarized_chunks = []
    current_summary = ""

    logging.debug(f"recursive_summarize_chunks: Summarizing {len(chunks)} chunks recursively...")
    logging.debug(f"recursive_summarize_chunks: temperature is @ {temp}")
    for i, chunk in enumerate(chunks):
        if i == 0:
            current_summary = summarize_func(chunk, custom_prompt, temp, system_prompt)
        else:
            # Fold the running summary into the next chunk so each summary keeps prior context.
            combined_text = current_summary + "\n\n" + chunk
            current_summary = summarize_func(combined_text, custom_prompt, temp, system_prompt)

        summarized_chunks.append(current_summary)

    return summarized_chunks


sample_text = """
|
|
Natural language processing (NLP) is a subfield of linguistics, computer science, and artificial intelligence
|
|
concerned with the interactions between computers and human language, in particular how to program computers
|
|
to process and analyze large amounts of natural language data. The result is a computer capable of "understanding"
|
|
the contents of documents, including the contextual nuances of the language within them. The technology can then
|
|
accurately extract information and insights contained in the documents as well as categorize and organize the documents themselves.
|
|
|
|
Challenges in natural language processing frequently involve speech recognition, natural language understanding,
|
|
and natural language generation.
|
|
|
|
Natural language processing has its roots in the 1950s. Already in 1950, Alan Turing published an article titled
|
|
"Computing Machinery and Intelligence" which proposed what is now called the Turing test as a criterion of intelligence.
|
|
"""
def count_units(text, unit='words'):
    if unit == 'words':
        return len(text.split())
    elif unit == 'tokens':
        return len(word_tokenize(text))
    elif unit == 'characters':
        return len(text)
    else:
        raise ValueError("Invalid unit. Choose 'words', 'tokens', or 'characters'.")


def semantic_chunking(text, max_chunk_size=2000, unit='words'):
    logging.debug("semantic_chunking...")
    nltk.download('punkt', quiet=True)
    sentences = sent_tokenize(text)
    vectorizer = TfidfVectorizer()
    sentence_vectors = vectorizer.fit_transform(sentences)

    chunks = []
    current_chunk = []
    current_size = 0

    for i, sentence in enumerate(sentences):
        sentence_size = count_units(sentence, unit)
        # Hard split when the size limit is reached, keeping the last three sentences as overlap.
        if current_size + sentence_size > max_chunk_size and current_chunk:
            chunks.append(' '.join(current_chunk))
            overlap_size = count_units(' '.join(current_chunk[-3:]), unit)
            current_chunk = current_chunk[-3:]
            current_size = overlap_size

        current_chunk.append(sentence)
        current_size += sentence_size

        # Soft split at a topic boundary: low TF-IDF cosine similarity to the next sentence,
        # but only once the chunk is at least half the maximum size.
        if i + 1 < len(sentences):
            current_vector = sentence_vectors[i]
            next_vector = sentence_vectors[i + 1]
            similarity = cosine_similarity(current_vector, next_vector)[0][0]
            if similarity < 0.5 and current_size >= max_chunk_size // 2:
                chunks.append(' '.join(current_chunk))
                overlap_size = count_units(' '.join(current_chunk[-3:]), unit)
                current_chunk = current_chunk[-3:]
                current_size = overlap_size

    if current_chunk:
        chunks.append(' '.join(current_chunk))

    return chunks


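# Example (a minimal sketch, reusing the module-level sample_text defined above): chunk on
# TF-IDF similarity with a small word budget so the splits are visible.
#
#     demo_chunks = semantic_chunking(sample_text, max_chunk_size=60, unit='words')
#     print(len(demo_chunks))

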
def semantic_chunk_long_file(file_path, max_chunk_size=1000, overlap=100, unit='words'):
    logging.debug("semantic_chunk_long_file...")
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()

        chunks = semantic_chunking(content, max_chunk_size, unit)
        return chunks
    except Exception as e:
        logging.error(f"Error chunking text file: {str(e)}")
        return None


def chunk_for_embedding(text: str, file_name: str, full_summary: str, custom_chunk_options: Dict[str, Any] = None) -> List[Dict[str, Any]]:
    options = chunk_options.copy()
    if custom_chunk_options:
        options.update(custom_chunk_options)

    chunks = improved_chunking_process(text, options)
    total_chunks = len(chunks)

    chunked_text_with_headers = []
    for i, chunk in enumerate(chunks, 1):
        chunk_text = chunk['text']
        chunk_position = determine_chunk_position(chunk['metadata']['relative_position'])

        chunk_header = f"""
Original Document: {file_name}
Full Document Summary: {full_summary or "Full document summary not available."}
Chunk: {i} of {total_chunks}
Position: {chunk_position}

--- Chunk Content ---
"""

        full_chunk_text = chunk_header + chunk_text
        chunk['text'] = full_chunk_text
        chunk['metadata']['file_name'] = file_name
        chunked_text_with_headers.append(chunk)

    return chunked_text_with_headers


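# Example (a minimal sketch; "transcript.txt" and the summary string are illustrative):
#
#     text = load_document("transcript.txt")
#     embed_ready = chunk_for_embedding(text, "transcript.txt", "One-paragraph summary of the document.")
#     # Each returned chunk's 'text' now starts with a header naming the file, chunk number, and position.

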
client = OpenAI(api_key=openai_api_key)


def get_chat_completion(messages, model='gpt-4-turbo'):
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=0,
    )
    return response.choices[0].message.content


def combine_chunks_with_no_minimum(
        chunks: List[str],
        max_tokens: int,
        chunk_delimiter="\n\n",
        header: Optional[str] = None,
        add_ellipsis_for_overflow=False,
) -> Tuple[List[str], List[List[int]], int]:
    dropped_chunk_count = 0
    output = []
    output_indices = []
    candidate = (
        [] if header is None else [header]
    )
    candidate_indices = []
    for chunk_i, chunk in enumerate(chunks):
        chunk_with_header = [chunk] if header is None else [header, chunk]

        # A chunk that does not fit on its own is dropped, optionally marked with an ellipsis.
        if len(openai_tokenize(chunk_delimiter.join(chunk_with_header))) > max_tokens:
            logging.warning("Chunk overflow: a single chunk exceeds the token limit and will be dropped.")
            if (
                    add_ellipsis_for_overflow
                    and len(openai_tokenize(chunk_delimiter.join(candidate + ["..."]))) <= max_tokens
            ):
                candidate.append("...")
            dropped_chunk_count += 1
            continue

        # Would adding this chunk to the current candidate exceed the token limit?
        extended_candidate_token_count = len(openai_tokenize(chunk_delimiter.join(candidate + [chunk])))

        if extended_candidate_token_count > max_tokens:
            output.append(chunk_delimiter.join(candidate))
            output_indices.append(candidate_indices)
            candidate = chunk_with_header
            candidate_indices = [chunk_i]
        else:
            candidate.append(chunk)
            candidate_indices.append(chunk_i)

    # Flush the final candidate if it contains anything beyond the optional header.
    if (header is not None and len(candidate) > 1) or (header is None and len(candidate) > 0):
        output.append(chunk_delimiter.join(candidate))
        output_indices.append(candidate_indices)
    return output, output_indices, dropped_chunk_count


def rolling_summarize(text: str,
                      detail: float = 0,
                      model: str = 'gpt-4-turbo',
                      additional_instructions: Optional[str] = None,
                      minimum_chunk_size: Optional[int] = 500,
                      chunk_delimiter: str = ".",
                      summarize_recursively=False,
                      verbose=False):
    """
    Summarizes a given text by splitting it into chunks, each of which is summarized individually.
    The level of detail in the summary can be adjusted, and the process can optionally be made recursive.

    Parameters:
    - text (str): The text to be summarized.
    - detail (float, optional): A value between 0 and 1 indicating the desired level of detail in the summary.
      0 leads to a higher-level summary, and 1 results in a more detailed summary. Defaults to 0.
    - model (str, optional): The chat model used for summarization. Defaults to 'gpt-4-turbo'.
    - additional_instructions (Optional[str], optional): Additional instructions to provide to the model for
      customizing summaries.
    - minimum_chunk_size (Optional[int], optional): The minimum size for text chunks. Defaults to 500.
    - chunk_delimiter (str, optional): The delimiter used to split the text into chunks. Defaults to ".".
    - summarize_recursively (bool, optional): If True, summaries are generated recursively, using previous summaries for context.
    - verbose (bool, optional): If True, prints detailed information about the chunking process.

    Returns:
    - str: The final compiled summary of the text.

    The function first determines the number of chunks by interpolating between a minimum and a maximum chunk count
    based on the `detail` parameter. It then splits the text into chunks and summarizes each chunk. If
    `summarize_recursively` is True, each summary is based on the previous summaries, adding more context to the
    summarization process. The function returns a compiled summary of all chunks.
    """

    # Check that detail is set within range.
    assert 0 <= detail <= 1

    # Interpolate the number of chunks based on the detail parameter.
    max_chunks = len(chunk_on_delimiter(text, minimum_chunk_size, chunk_delimiter))
    min_chunks = 1
    num_chunks = int(min_chunks + detail * (max_chunks - min_chunks))

    # Adjust chunk_size based on the interpolated number of chunks.
    document_length = len(openai_tokenize(text))
    chunk_size = max(minimum_chunk_size, document_length // num_chunks)
    text_chunks = chunk_on_delimiter(text, chunk_size, chunk_delimiter)
    if verbose:
        print(f"Splitting the text into {len(text_chunks)} chunks to be summarized.")
        print(f"Chunk lengths are {[len(openai_tokenize(x)) for x in text_chunks]}")

    # Build the system message.
    system_message_content = "Rewrite this text in summarized form."
    if additional_instructions is not None:
        system_message_content += f"\n\n{additional_instructions}"

    accumulated_summaries = []
    for i, chunk in enumerate(tqdm(text_chunks)):
        if summarize_recursively and accumulated_summaries:
            # Combine the previous summary with the current chunk for recursive summarization.
            combined_text = accumulated_summaries[-1] + "\n\n" + chunk
            user_message_content = f"Previous summary and new content to summarize:\n\n{combined_text}"
        else:
            user_message_content = chunk

        messages = [
            {"role": "system", "content": system_message_content},
            {"role": "user", "content": user_message_content}
        ]

        response = get_chat_completion(messages, model=model)
        accumulated_summaries.append(response)

    final_summary = '\n\n'.join(accumulated_summaries)
    return final_summary


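# Example (a minimal sketch; `long_text` is a hypothetical long string, and the call makes real
# OpenAI API requests, so a valid API key must be configured):
#
#     summary = rolling_summarize(long_text, detail=0.3, summarize_recursively=True, verbose=True)
#     print(summary)

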
def chunk_ebook_by_chapters(text: str, chunk_options: Dict[str, Any]) -> List[Dict[str, Any]]:
    logging.debug("chunk_ebook_by_chapters")
    max_chunk_size = chunk_options.get('max_size', 300)
    overlap = chunk_options.get('overlap', 0)
    custom_pattern = chunk_options.get('custom_chapter_pattern', None)

    # Candidate chapter-heading patterns, tried in order; a custom pattern takes priority.
    chapter_patterns = [
        custom_pattern,
        r'^#{1,2}\s+',
        r'^Chapter\s+\d+',
        r'^\d+\.\s+',
        r'^[A-Z\s]+$'
    ]

    chapter_positions = []
    used_pattern = None

    for pattern in chapter_patterns:
        if pattern is None:
            continue
        chapter_regex = re.compile(pattern, re.MULTILINE | re.IGNORECASE)
        chapter_positions = [match.start() for match in chapter_regex.finditer(text)]
        if chapter_positions:
            used_pattern = pattern
            break

    # If no chapter structure is found, return the entire book as a single chunk.
    if not chapter_positions:
        return [{'text': text, 'metadata': get_chunk_metadata(text, text, chunk_type="whole_document")}]

    # Split the text at chapter positions, optionally extending each chapter backwards by `overlap` characters.
    chunks = []
    for i in range(len(chapter_positions)):
        start = chapter_positions[i]
        end = chapter_positions[i + 1] if i + 1 < len(chapter_positions) else None
        chapter = text[start:end]

        if overlap > 0 and i > 0:
            overlap_start = max(0, start - overlap)
            chapter = text[overlap_start:end]

        chunks.append(chapter)

    processed_chunks = post_process_chunks(chunks)

    return [{'text': chunk, 'metadata': get_chunk_metadata(chunk, text, chunk_type="chapter", chapter_number=i + 1,
                                                           chapter_pattern=used_pattern)}
            for i, chunk in enumerate(processed_chunks)]


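# Example (a minimal sketch; "book.txt" and the custom pattern are illustrative):
#
#     book_text = load_document("book.txt")
#     chapters = chunk_ebook_by_chapters(book_text, {'max_size': 300, 'overlap': 0,
#                                                    'custom_chapter_pattern': r'^CHAPTER [IVXLC]+'})
#     print([c['metadata'].get('chapter_number') for c in chapters])

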
def adaptive_chunk_size(text: str, base_size: int = 1000, min_size: int = 500, max_size: int = 2000) -> int:
    # Ensure the Punkt sentence tokenizer data is available.
    nltk.download('punkt', quiet=True)

    sentences = sent_tokenize(text)
    if not sentences:
        return base_size

    avg_sentence_length = sum(len(s.split()) for s in sentences) / len(sentences)

    # Shorter sentences allow larger chunks; longer sentences call for smaller ones.
    if avg_sentence_length < 10:
        size_factor = 1.2
    elif avg_sentence_length > 20:
        size_factor = 0.8
    else:
        size_factor = 1.0

    adaptive_size = int(base_size * size_factor)

    return max(min_size, min(adaptive_size, max_size))


def adaptive_chunk_size_non_punkt(text: str, base_size: int, min_size: int = 100, max_size: int = 2000) -> int:
    words = text.split()
    if not words:
        return base_size

    avg_word_length = sum(len(word) for word in words) / len(words)

    if avg_word_length > 6:
        # Longer average words: shrink the chunk size.
        adjusted_size = int(base_size * 0.8)
    elif avg_word_length < 4:
        # Shorter average words: grow the chunk size.
        adjusted_size = int(base_size * 1.2)
    else:
        adjusted_size = base_size

    return max(min_size, min(adjusted_size, max_size))


def adaptive_chunking(text: str, base_size: int = 1000, min_size: int = 500, max_size: int = 2000) -> List[str]:
    logging.debug("adaptive_chunking...")
    chunk_size = adaptive_chunk_size(text, base_size, min_size, max_size)
    words = text.split()
    chunks = []
    current_chunk = []
    current_length = 0

    for word in words:
        if current_length + len(word) > chunk_size and current_chunk:
            chunks.append(' '.join(current_chunk))
            current_chunk = []
            current_length = 0
        current_chunk.append(word)
        current_length += len(word) + 1  # +1 for the space between words

    if current_chunk:
        chunks.append(' '.join(current_chunk))

    return chunks


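# Example (a minimal sketch, reusing the module-level sample_text): pick a chunk size from the
# average sentence length, then chunk by words.
#
#     adaptive_chunks = adaptive_chunking(sample_text, base_size=80, min_size=40, max_size=160)
#     print([len(c.split()) for c in adaptive_chunks])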