from transformers import AutoTokenizer from rank_bm25 import BM25Okapi import json import re tokenizer = AutoTokenizer.from_pretrained("deepvk/USER-bge-m3") with open('search.json', 'r', encoding='utf-8') as file: data = json.load(file) def prepare_data(data=data): for item in data: corpus = [item['name'].lower()] + [x.lower() for x in item['synonyms']] corpus = " ".join(corpus) item['corpus'] = corpus item['tokenized_corpus'] = tokenizer.tokenize(corpus) return data def clean_text(text): """ Очищает текст от специальных символов, чтобы можно было искать по содержимому. """ return re.sub(r'[^\w\s]', '', text).lower() def search_bm25(query, data): """ Выполняет поиск по запросу с использованием BM25 + токенизатора. query: строка, поисковый запрос. data: список словарей, содержащих информацию о каждом элементе (name, id, synonyms, tokenized_corpus). return: список словарей с уникальными name, id и score. """ tokenized_corpus = [item['tokenized_corpus'] for item in data] bm25 = BM25Okapi(tokenized_corpus) tokenized_query = tokenizer.tokenize(query) doc_scores = bm25.get_scores(tokenized_query) results = [] for idx in reversed(doc_scores.argsort()): item = data[idx] if item['name'] not in [result['name'] for result in results] and doc_scores[idx] > 0: results.append({ 'name': item['name'], 'id': item['id'], 'score': float(doc_scores[idx]) }) return results def search_exact(query, data): """ Выполняет точный поиск подстроки в 'name' и 'synonyms'. Слово либо является началом слова, либо целым словом. """ results = [] cleaned_query = clean_text(query) query_regex = re.compile(r'\b' + re.escape(cleaned_query) + r'\b|\b' + re.escape(cleaned_query)) # Выбираем только целые слова или начала слов (чтобы не искать подстроки) # Сначала ищем подстроку в 'name' for item in data: cleaned_name = clean_text(item['name']) if query_regex.search(cleaned_name): results.append({ 'name': item['name'], 'id': item['id'], 'score': 1 # В данном случае score не важен, но можно оставить }) # Затем ищем подстроку в 'synonyms' for item in data: for synonym in item['synonyms']: cleaned_synonym = clean_text(synonym) if query_regex.search(cleaned_synonym): if not any(res['id'] == item['id'] for res in results): # Избегаем дублирования results.append({ 'name': item['name'], 'id': item['id'], 'score': 1 }) break # Достаточно найти одно совпадение среди синонимов return results #TODO: Добавить поиск подкатегорий def merge_results(results1, results2): """ Объединяет два списка результатов с чередованием элементов, избегая дубликатов. results1: первый список результатов (список словарей с ключами 'name', 'id', 'score') results2: второй список результатов (список словарей с ключами 'name', 'id', 'score') return: объединённый список результатов """ merged_results = [] seen_ids = set() # Набор для отслеживания уникальных id # Чередуем элементы из двух списков for res1, res2 in zip(results1, results2): # Добавляем элемент из первого списка, если его id ещё не было if res1['id'] not in seen_ids: merged_results.append(res1) seen_ids.add(res1['id']) # Добавляем элемент из второго списка, если его id ещё не было if res2['id'] not in seen_ids: merged_results.append(res2) seen_ids.add(res2['id']) for res in results1[len(results2):]: if res['id'] not in seen_ids: merged_results.append(res) seen_ids.add(res['id']) for res in results2[len(results1):]: if res['id'] not in seen_ids: merged_results.append(res) seen_ids.add(res['id']) return merged_results