import os
from configs import (
    KB_ROOT_PATH,
    CHUNK_SIZE,
    OVERLAP_SIZE,
    ZH_TITLE_ENHANCE,
    logger,
    log_verbose,
    text_splitter_dict,
    LLM_MODELS,
    TEXT_SPLITTER_NAME,
)
import importlib
from text_splitter import zh_title_enhance as func_zh_title_enhance
import langchain.document_loaders
from langchain.docstore.document import Document
from langchain.text_splitter import TextSplitter
from pathlib import Path
from server.utils import run_in_thread_pool, get_model_worker_config
import json
from typing import List, Union, Dict, Tuple, Generator, Optional
import chardet


def validate_kb_name(knowledge_base_id: str) -> bool:
    """Return False when the knowledge-base name contains a parent-directory reference.

    The name is later joined onto KB_ROOT_PATH (see get_kb_path), so a ".."
    path component would let it escape the knowledge-base root. Backslashes
    are treated as separators too, and a bare or trailing ".." is rejected in
    addition to the "../" substring.
    """
    normalized = knowledge_base_id.replace("\\", "/")
    if "../" in normalized:
        return False
    # ".." alone, or as the last component, carries no trailing slash.
    if normalized == ".." or normalized.endswith("/.."):
        return False
    return True


def get_kb_path(knowledge_base_name: str):
    """Return the directory of a knowledge base under KB_ROOT_PATH."""
    return os.path.join(KB_ROOT_PATH, knowledge_base_name)


def get_doc_path(knowledge_base_name: str):
    """Return the "content" sub-directory of a knowledge base."""
    return os.path.join(get_kb_path(knowledge_base_name), "content")


def get_vs_path(knowledge_base_name: str, vector_name: str):
    """Return the path "<kb>/vector_store/<vector_name>" for a knowledge base."""
    return os.path.join(get_kb_path(knowledge_base_name), "vector_store", vector_name)


def get_file_path(knowledge_base_name: str, doc_name: str):
    """Return the path of ``doc_name`` inside the knowledge base's content directory."""
    return os.path.join(get_doc_path(knowledge_base_name), doc_name)


def list_kbs_from_folder():
    """Return the names of all directories directly under KB_ROOT_PATH."""
    return [f for f in os.listdir(KB_ROOT_PATH)
            if os.path.isdir(os.path.join(KB_ROOT_PATH, f))]


def list_files_from_folder(kb_name: str):
    """Recursively list the files in a knowledge base's content directory.

    Entries whose base name starts with "temp", "tmp", "." or "~$"
    (case-insensitive) are skipped. Symlinks to directories are followed via
    their resolved path; symlinks to files are listed under the link's own
    path; broken symlinks are ignored. Returned paths are relative to the
    content directory and use POSIX separators.
    """
    doc_path = get_doc_path(kb_name)
    result = []
    # Resolved paths of the directories currently being walked; used only to
    # break symlink cycles (a directory reachable twice is still listed twice).
    active_dirs = set()

    def is_skiped_path(path: str):
        tail = os.path.basename(path).lower()
        return tail.startswith(("temp", "tmp", ".", "~$"))

    def add_file(path: str):
        # Normalise to POSIX-style separators.
        result.append(Path(os.path.relpath(path, doc_path)).as_posix())

    def scan_dir(dir_path: str):
        real_path = os.path.realpath(dir_path)
        if real_path in active_dirs:
            return  # symlink cycle: this directory is already an ancestor
        active_dirs.add(real_path)
        try:
            with os.scandir(dir_path) as it:
                for entry in it:
                    process_entry(entry)
        finally:
            active_dirs.discard(real_path)

    def process_entry(entry):
        if is_skiped_path(entry.path):
            return

        if entry.is_symlink():
            target_path = os.path.realpath(entry.path)
            if os.path.isdir(target_path):
                scan_dir(target_path)
            elif os.path.isfile(target_path):
                add_file(entry.path)
            # Broken links (target missing) are silently skipped.
        elif entry.is_file():
            add_file(entry.path)
        elif entry.is_dir():
            scan_dir(entry.path)

    scan_dir(doc_path)
    return result


# Maps a loader class name to the file extensions it handles. Order matters:
# get_LoaderClass returns the first entry whose list contains the extension.
LOADER_DICT = {"UnstructuredHTMLLoader": ['.html'],
               "MHTMLLoader": ['.mhtml'],
               "UnstructuredMarkdownLoader": ['.md'],
               "JSONLoader": [".json"],
               "JSONLinesLoader": [".jsonl"],
               "CSVLoader": [".csv"],
               # "FilteredCSVLoader": [".csv"],  use this one for custom CSV splitting
               "RapidOCRPDFLoader": [".pdf"],
               "RapidOCRDocLoader": ['.docx', '.doc'],
               "RapidOCRPPTLoader": ['.ppt', '.pptx', ],
               "RapidOCRLoader": ['.png', '.jpg', '.jpeg', '.bmp'],
               "UnstructuredFileLoader": ['.eml', '.msg', '.rst',
                                          '.rtf', '.txt', '.xml',
                                          '.epub', '.odt','.tsv'],
               "UnstructuredEmailLoader": ['.eml', '.msg'],
               "UnstructuredEPubLoader": ['.epub'],
               "UnstructuredExcelLoader": ['.xlsx', '.xls', '.xlsd'],
               "NotebookLoader": ['.ipynb'],
               "UnstructuredODTLoader": ['.odt'],
               "PythonLoader": ['.py'],
               "UnstructuredRSTLoader": ['.rst'],
               "UnstructuredRTFLoader": ['.rtf'],
               "SRTLoader": ['.srt'],
               "TomlLoader": ['.toml'],
               "UnstructuredTSVLoader": ['.tsv'],
               "UnstructuredWordDocumentLoader": ['.docx', '.doc'],
               "UnstructuredXMLLoader": ['.xml'],
               "UnstructuredPowerPointLoader": ['.ppt', '.pptx'],
               "EverNoteLoader": ['.enex'],
               }
SUPPORTED_EXTS = [ext for sublist in LOADER_DICT.values() for ext in sublist]


# patch json.dumps to disable ensure_ascii
def _new_json_dumps(obj, **kwargs):
    kwargs["ensure_ascii"] = False
    return _origin_json_dumps(obj, **kwargs)


if json.dumps is not _new_json_dumps:
    _origin_json_dumps = json.dumps
    json.dumps = _new_json_dumps


class JSONLinesLoader(langchain.document_loaders.JSONLoader):
    '''
    JSON-lines loader (per the original author's note, files are expected to
    use the .jsonl extension). Same as JSONLoader but with _json_lines set.
    '''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._json_lines = True


# Register on the langchain module so get_loader can find it by name via getattr.
langchain.document_loaders.JSONLinesLoader = JSONLinesLoader


def get_LoaderClass(file_extension):
    """Return the name of the first loader in LOADER_DICT that lists
    ``file_extension``, or None when no loader lists it."""
    for LoaderClass, extensions in LOADER_DICT.items():
        if file_extension in extensions:
            return LoaderClass
    return None


def get_loader(loader_name: str, file_path: str, loader_kwargs: Dict = None):
    '''
    Build a document loader instance for ``file_path``.

    ``loader_name`` is looked up in the project's ``document_loaders`` module
    (for the RapidOCR*/FilteredCSV loaders) or in ``langchain.document_loaders``.
    If the lookup fails the error is logged and UnstructuredFileLoader is used
    instead. Loader-specific defaults are filled into a *copy* of
    ``loader_kwargs``, so the caller's dict is never modified.
    '''
    loader_kwargs = dict(loader_kwargs or {})
    try:
        if loader_name in ["RapidOCRPDFLoader", "RapidOCRLoader", "FilteredCSVLoader",
                           "RapidOCRDocLoader", "RapidOCRPPTLoader"]:
            document_loaders_module = importlib.import_module('document_loaders')
        else:
            document_loaders_module = importlib.import_module('langchain.document_loaders')
        DocumentLoader = getattr(document_loaders_module, loader_name)
    except Exception as e:
        msg = f"为文件{file_path}查找加载器{loader_name}时出错:{e}"
        logger.error(f'{e.__class__.__name__}: {msg}',
                     exc_info=e if log_verbose else None)
        document_loaders_module = importlib.import_module('langchain.document_loaders')
        DocumentLoader = getattr(document_loaders_module, "UnstructuredFileLoader")

    if loader_name == "UnstructuredFileLoader":
        loader_kwargs.setdefault("autodetect_encoding", True)
    elif loader_name == "CSVLoader":
        if not loader_kwargs.get("encoding"):
            # No encoding given: detect it from the file bytes so the langchain
            # loader does not fail with a decoding error.
            with open(file_path, 'rb') as struct_file:
                encode_detect = chardet.detect(struct_file.read())
            # chardet reports {"encoding": None} when it cannot decide
            # (e.g. an empty file); fall back to utf-8 in that case.
            detected = (encode_detect or {}).get("encoding")
            loader_kwargs["encoding"] = detected or "utf-8"
    elif loader_name in ("JSONLoader", "JSONLinesLoader"):
        loader_kwargs.setdefault("jq_schema", ".")
        loader_kwargs.setdefault("text_content", False)

    loader = DocumentLoader(file_path, **loader_kwargs)
    return loader


def make_text_splitter(
        splitter_name: str = TEXT_SPLITTER_NAME,
        chunk_size: int = CHUNK_SIZE,
        chunk_overlap: int = OVERLAP_SIZE,
        llm_model: str = LLM_MODELS[0],
):
    """
    Build the text splitter named ``splitter_name``.

    The splitter class is looked up in the project's ``text_splitter`` module
    first and in ``langchain.text_splitter`` second; its construction follows
    the "source" entry of ``text_splitter_dict[splitter_name]`` ("tiktoken",
    "huggingface", or anything else for plain construction). An empty
    ``splitter_name`` means "SpacyTextSplitter". On any failure the error is
    logged and a RecursiveCharacterTextSplitter is returned instead.
    """
    splitter_name = splitter_name or "SpacyTextSplitter"
    try:
        if splitter_name == "MarkdownHeaderTextSplitter":
            # MarkdownHeaderTextSplitter takes its own constructor arguments.
            headers_to_split_on = text_splitter_dict[splitter_name]['headers_to_split_on']
            text_splitter = langchain.text_splitter.MarkdownHeaderTextSplitter(
                headers_to_split_on=headers_to_split_on)
        else:
            try:
                # Prefer a user-defined splitter from the project module ...
                text_splitter_module = importlib.import_module('text_splitter')
                splitter_cls = getattr(text_splitter_module, splitter_name)
            except Exception:
                # ... otherwise use the one shipped with langchain.
                text_splitter_module = importlib.import_module('langchain.text_splitter')
                splitter_cls = getattr(text_splitter_module, splitter_name)

            splitter_config = text_splitter_dict[splitter_name]
            source = splitter_config["source"]

            if source == "tiktoken":
                tokenizer_name = splitter_config["tokenizer_name_or_path"]
                try:
                    text_splitter = splitter_cls.from_tiktoken_encoder(
                        encoding_name=tokenizer_name,
                        pipeline="zh_core_web_sm",
                        chunk_size=chunk_size,
                        chunk_overlap=chunk_overlap
                    )
                except Exception:
                    # Retry without the "pipeline" argument.
                    text_splitter = splitter_cls.from_tiktoken_encoder(
                        encoding_name=tokenizer_name,
                        chunk_size=chunk_size,
                        chunk_overlap=chunk_overlap
                    )
            elif source == "huggingface":
                tokenizer_name = splitter_config["tokenizer_name_or_path"]
                if tokenizer_name == "":
                    # Resolve per call instead of writing the path back into the
                    # shared text_splitter_dict, so a later call with a different
                    # llm_model is not stuck with the first model's path.
                    tokenizer_name = get_model_worker_config(llm_model).get("model_path")

                if tokenizer_name == "gpt2":
                    from transformers import GPT2TokenizerFast
                    tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
                else:
                    from transformers import AutoTokenizer
                    # NOTE(review): trust_remote_code=True lets the tokenizer repo
                    # run its own code — confirm only trusted paths are configured.
                    tokenizer = AutoTokenizer.from_pretrained(
                        tokenizer_name,
                        trust_remote_code=True)
                text_splitter = splitter_cls.from_huggingface_tokenizer(
                    tokenizer=tokenizer,
                    chunk_size=chunk_size,
                    chunk_overlap=chunk_overlap
                )
            else:
                try:
                    text_splitter = splitter_cls(
                        pipeline="zh_core_web_sm",
                        chunk_size=chunk_size,
                        chunk_overlap=chunk_overlap
                    )
                except Exception:
                    # Retry without the "pipeline" argument.
                    text_splitter = splitter_cls(
                        chunk_size=chunk_size,
                        chunk_overlap=chunk_overlap
                    )
    except Exception as e:
        msg = f"创建分词器 {splitter_name} 时出错,已回退到 RecursiveCharacterTextSplitter:{e}"
        logger.error(f'{e.__class__.__name__}: {msg}',
                     exc_info=e if log_verbose else None)
        text_splitter_module = importlib.import_module('langchain.text_splitter')
        splitter_cls = getattr(text_splitter_module, "RecursiveCharacterTextSplitter")
        text_splitter = splitter_cls(chunk_size=chunk_size, chunk_overlap=chunk_overlap)

    # If you use SpacyTextSplitter you can use GPU to do split likes Issue #1287
    # text_splitter._tokenizer.max_length = 37016792
    # text_splitter._tokenizer.prefer_gpu()
    return text_splitter


class KnowledgeFile:
    def __init__(
            self,
            filename: str,
            knowledge_base_name: str,
            loader_kwargs: Optional[Dict] = None,
    ):
        '''
        A file inside a knowledge base's content directory. Per the original
        author's note, the file must exist on disk before it can be vectorized.

        Raises ValueError when the file extension is not in SUPPORTED_EXTS.
        '''
        self.kb_name = knowledge_base_name
        self.filename = str(Path(filename).as_posix())
        self.ext = os.path.splitext(filename)[-1].lower()
        if self.ext not in SUPPORTED_EXTS:
            raise ValueError(f"暂未支持的文件格式 {self.filename}")
        # A fresh dict per instance; a shared mutable default would leak
        # settings between instances.
        self.loader_kwargs = {} if loader_kwargs is None else loader_kwargs
        self.filepath = get_file_path(knowledge_base_name, filename)
        self.docs = None  # cache filled by file2docs
        self.splited_docs = None  # cache filled by docs2texts / file2text
        self.document_loader_name = get_LoaderClass(self.ext)
        self.text_splitter_name = TEXT_SPLITTER_NAME

    def file2docs(self, refresh: bool = False):
        """Load the file with its loader and cache the result in ``self.docs``.

        The cached documents are returned unless ``refresh`` is true.
        """
        if self.docs is None or refresh:
            logger.info(f"{self.document_loader_name} used for {self.filepath}")
            loader = get_loader(loader_name=self.document_loader_name,
                                file_path=self.filepath,
                                loader_kwargs=self.loader_kwargs)
            self.docs = loader.load()
        return self.docs

    def docs2texts(
            self,
            docs: List[Document] = None,
            zh_title_enhance: bool = ZH_TITLE_ENHANCE,
            refresh: bool = False,
            chunk_size: int = CHUNK_SIZE,
            chunk_overlap: int = OVERLAP_SIZE,
            text_splitter: TextSplitter = None,
    ):
        """Split documents into chunks and cache them in ``self.splited_docs``.

        When ``docs`` is empty or None the file is loaded via file2docs.
        ``.csv`` files are not split. Returns [] when there is nothing to
        split or the splitter produces nothing.
        """
        docs = docs or self.file2docs(refresh=refresh)
        if not docs:
            return []
        if self.ext not in [".csv"]:
            if text_splitter is None:
                text_splitter = make_text_splitter(splitter_name=self.text_splitter_name,
                                                   chunk_size=chunk_size,
                                                   chunk_overlap=chunk_overlap)
            if self.text_splitter_name == "MarkdownHeaderTextSplitter":
                # This splitter works on raw text; only the first document is used.
                docs = text_splitter.split_text(docs[0].page_content)
            else:
                docs = text_splitter.split_documents(docs)

        if not docs:
            return []

        print(f"文档切分示例:{docs[0]}")
        if zh_title_enhance:
            docs = func_zh_title_enhance(docs)
        self.splited_docs = docs
        return self.splited_docs

    def file2text(
            self,
            zh_title_enhance: bool = ZH_TITLE_ENHANCE,
            refresh: bool = False,
            chunk_size: int = CHUNK_SIZE,
            chunk_overlap: int = OVERLAP_SIZE,
            text_splitter: TextSplitter = None,
    ):
        """Load and split the file, returning the cached chunks when available.

        With ``refresh=True`` the file is re-read from disk and re-split.
        """
        if self.splited_docs is None or refresh:
            # Forward ``refresh`` so a refresh re-reads the file instead of
            # re-splitting the stale ``self.docs`` cache.
            docs = self.file2docs(refresh=refresh)
            self.splited_docs = self.docs2texts(docs=docs,
                                                zh_title_enhance=zh_title_enhance,
                                                refresh=refresh,
                                                chunk_size=chunk_size,
                                                chunk_overlap=chunk_overlap,
                                                text_splitter=text_splitter)
        return self.splited_docs

    def file_exist(self):
        """Return True when ``self.filepath`` is an existing regular file."""
        return os.path.isfile(self.filepath)

    def get_mtime(self):
        """Return the file's modification time (os.path.getmtime)."""
        return os.path.getmtime(self.filepath)

    def get_size(self):
        """Return the file's size in bytes (os.path.getsize)."""
        return os.path.getsize(self.filepath)


def files2docs_in_thread(
        files: List[Union[KnowledgeFile, Tuple[str, str], Dict]],
        chunk_size: int = CHUNK_SIZE,
        chunk_overlap: int = OVERLAP_SIZE,
        zh_title_enhance: bool = ZH_TITLE_ENHANCE,
) -> Generator:
    '''
    Convert files on disk into langchain Documents via run_in_thread_pool.

    Each item of ``files`` may be a KnowledgeFile, a ``(filename, kb_name)``
    tuple, or a dict with "filename" and "kb_name" keys whose remaining keys
    are passed to ``KnowledgeFile.file2text`` (the dict itself is not modified).
    Yields ``status, (kb_name, file_name, docs | error)``. Items that cannot be
    turned into a KnowledgeFile are reported with ``status=False`` before the
    thread-pool results.
    '''

    def file2docs(*, file: KnowledgeFile, **kwargs) -> Tuple[bool, Tuple[str, str, Union[List[Document], str]]]:
        try:
            return True, (file.kb_name, file.filename, file.file2text(**kwargs))
        except Exception as e:
            msg = f"从文件 {file.kb_name}/{file.filename} 加载文档时出错:{e}"
            logger.error(f'{e.__class__.__name__}: {msg}',
                         exc_info=e if log_verbose else None)
            return False, (file.kb_name, file.filename, msg)

    kwargs_list = []
    for file in files:
        kwargs = {}
        # Reset per item so the error report below never reads names that are
        # unbound or left over from the previous item.
        kb_name = getattr(file, "kb_name", None)
        filename = getattr(file, "filename", None)
        try:
            if isinstance(file, tuple) and len(file) >= 2:
                filename = file[0]
                kb_name = file[1]
                file = KnowledgeFile(filename=filename, knowledge_base_name=kb_name)
            elif isinstance(file, dict):
                extra = dict(file)  # work on a copy; leave the caller's dict intact
                filename = extra.pop("filename")
                kb_name = extra.pop("kb_name")
                kwargs.update(extra)
                file = KnowledgeFile(filename=filename, knowledge_base_name=kb_name)
            kwargs["file"] = file
            kwargs["chunk_size"] = chunk_size
            kwargs["chunk_overlap"] = chunk_overlap
            kwargs["zh_title_enhance"] = zh_title_enhance
            kwargs_list.append(kwargs)
        except Exception as e:
            yield False, (kb_name, filename, str(e))

    for result in run_in_thread_pool(func=file2docs, params=kwargs_list):
        yield result


if __name__ == "__main__":
    from pprint import pprint

    kb_file = KnowledgeFile(
        filename="/home/congyin/Code/Project_Langchain_0814/Langchain-Chatchat/knowledge_base/csv1/content/gm.csv",
        knowledge_base_name="samples")
    # kb_file.text_splitter_name = "RecursiveCharacterTextSplitter"
    docs = kb_file.file2docs()
    # pprint(docs[-1])