Spaces:
Running
Running
File size: 3,474 Bytes
5e9cd1d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
from pathlib import Path
from pprint import pprint
import os
import shutil
import sys
root_path = Path(__file__).parent.parent
sys.path.append(str(root_path))
from server.knowledge_base.kb_service.base import KBServiceFactory
from server.knowledge_base.utils import get_kb_path, get_doc_path, KnowledgeFile
from server.knowledge_base.migrate import folder2db, prune_db_docs, prune_folder_files
# Set up the throwaway knowledge base that the tests in this module operate on.
kb_name = "test_kb_for_migrate"
# Maps the file name used inside the knowledge base to its source path on disk.
test_files = {
    "readme.md": str(root_path / "readme.md"),
}

kb_path = get_kb_path(kb_name)
doc_path = get_doc_path(kb_name)

# Make sure the document folder exists, then seed it with the sample files.
os.makedirs(doc_path, exist_ok=True)
for file_name, src_path in test_files.items():
    shutil.copy(src_path, os.path.join(doc_path, file_name))
def test_recreate_vs():
    """Rebuild the vector store from the doc folder and check every test file.

    Runs ``folder2db`` in ``"recreate_vs"`` mode, then verifies that the
    knowledge base exists, that it lists each entry of ``test_files``, and
    that docs can be listed for each file both by file name and by
    ``source`` metadata.
    """
    folder2db([kb_name], "recreate_vs")

    kb = KBServiceFactory.get_service_by_name(kb_name)
    assert kb and kb.exists()

    files = kb.list_files()
    print(files)
    for name in test_files:
        assert name in files

        # list docs based on file name
        docs = kb.list_docs(file_name=name)
        assert len(docs) > 0
        pprint(docs[0])
        for doc in docs:
            assert doc.metadata["source"] == name

        # list docs based on metadata
        docs = kb.list_docs(metadata={"source": name})
        assert len(docs) > 0
        for doc in docs:
            assert doc.metadata["source"] == name
def test_increment():
    """Clear the vector store, re-import in ``"increment"`` mode, and check each test file."""
    service = KBServiceFactory.get_service_by_name(kb_name)

    # Empty the vector store first and confirm nothing is left in it.
    service.clear_vs()
    assert service.list_files() == []
    assert service.list_docs() == []

    folder2db([kb_name], "increment")

    indexed = service.list_files()
    print(indexed)
    for file_name in test_files:
        assert file_name in indexed

        file_docs = service.list_docs(file_name=file_name)
        assert len(file_docs) > 0
        pprint(file_docs[0])
        # Every returned doc must point back at the file it was listed for.
        assert all(d.metadata["source"] == file_name for d in file_docs)
def test_prune_db():
    """Delete a file from the doc folder and verify ``prune_db_docs`` drops it.

    The first entry of ``test_files`` is removed from disk; every remaining
    entry (possibly none) must still be listed after the prune. The removed
    file is copied back afterwards, even when an assertion fails, because
    ``test_prune_folder`` expects it to be present on disk.
    """
    # Do not unpack exactly two names: ``test_files`` may hold a single
    # entry, in which case there is simply nothing that has to be kept.
    del_file, *keep_files = test_files
    deleted_path = os.path.join(doc_path, del_file)

    os.remove(deleted_path)
    try:
        prune_db_docs([kb_name])

        kb = KBServiceFactory.get_service_by_name(kb_name)
        files = kb.list_files()
        print(files)

        # the deleted file must be gone from the knowledge base
        assert del_file not in files
        assert len(kb.list_docs(file_name=del_file)) == 0

        # every other file must be untouched
        for keep_file in keep_files:
            assert keep_file in files
            docs = kb.list_docs(file_name=keep_file)
            assert len(docs) > 0
            pprint(docs[0])
    finally:
        # restore the deleted file for the tests that run afterwards
        shutil.copy(test_files[del_file], deleted_path)
def test_prune_folder():
    """Delete a file's docs from the KB and verify ``prune_folder_files`` removes the file.

    The first entry of ``test_files`` has its docs deleted from the knowledge
    base while the file stays on disk; after ``prune_folder_files`` the file
    must be gone from the doc folder and every remaining entry (possibly
    none) must still be there.
    """
    # Do not unpack exactly two names: ``test_files`` may hold a single
    # entry, in which case there is simply nothing that has to be kept.
    del_file, *keep_files = test_files
    deleted_path = os.path.join(doc_path, del_file)
    kb = KBServiceFactory.get_service_by_name(kb_name)

    # delete docs for file
    kb.delete_doc(KnowledgeFile(del_file, kb_name))
    files = kb.list_files()
    print(files)
    assert del_file not in files
    assert len(kb.list_docs(file_name=del_file)) == 0
    for keep_file in keep_files:
        assert keep_file in files
        assert len(kb.list_docs(file_name=keep_file)) > 0

    # deleting the docs must not have removed the file from disk yet
    assert os.path.isfile(deleted_path)

    # prune folder
    prune_folder_files([kb_name])

    # check result
    assert not os.path.isfile(deleted_path)
    for keep_file in keep_files:
        assert os.path.isfile(os.path.join(doc_path, keep_file))
def test_drop_kb():
    """Drop the test knowledge base and confirm that it can no longer be found."""
    service = KBServiceFactory.get_service_by_name(kb_name)
    service.drop_kb()

    # Neither the service nor its folder on disk may report as existing.
    assert not service.exists()
    assert not os.path.isdir(kb_path)

    # Looking the knowledge base up again must yield nothing.
    assert KBServiceFactory.get_service_by_name(kb_name) is None
|