Spaces:
Runtime error
Runtime error
File size: 5,479 Bytes
0117cec |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
import pandas as pd
import os
from tqdm import tqdm
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
class Imagebase:
    """Parquet-backed table of images searchable by CLIP feature similarity.

    Records are kept in ``self.datas`` (a pandas DataFrame, or ``None`` while
    nothing has been added or loaded). ``add_image`` requires the fields
    ``image_name``, ``keyword`` and ``translated_word`` and stores a
    ``clip_feature`` vector next to them.
    """

    def __init__(self, parquet_path=None):
        """Create an empty image base.

        Args:
            parquet_path: File used for persistence. Falls back to
                ``datas/imagebase.parquet`` when omitted.
        """
        self.default_parquet_path = 'datas/imagebase.parquet'
        self.parquet_path = parquet_path or self.default_parquet_path
        self.datas = None
        # NOTE: an existing parquet file is not loaded here (the call was
        # commented out in the original code); use load_from_parquet().
        # if os.path.exists(self.parquet_path):
        #     self.load_from_parquet(self.parquet_path)
        self.clip_extractor = None

    def random_sample(self, num_samples=12):
        """Return up to ``num_samples`` randomly chosen records as dicts.

        Returns an empty list when no data is present. When fewer rows than
        ``num_samples`` are stored, all of them are returned instead of
        letting ``DataFrame.sample`` raise.
        """
        if self.datas is None:
            return []
        count = min(num_samples, len(self.datas))
        return self.datas.sample(count).to_dict(orient='records')

    def load_from_parquet(self, parquet_path):
        """Replace the in-memory table with the contents of ``parquet_path``."""
        self.datas = pd.read_parquet(parquet_path)

    def save_to_parquet(self, parquet_path=None):
        """Write the table to disk; do nothing when there is no data.

        Args:
            parquet_path: Destination file. Defaults to the path this
                instance was configured with.
        """
        # Honor the path given to the constructor rather than always
        # writing to the hard-coded default location.
        parquet_path = parquet_path or self.parquet_path
        if self.datas is None:
            return
        parent_dir = os.path.dirname(parquet_path)
        if parent_dir:
            # The target directory may not exist on a fresh checkout.
            os.makedirs(parent_dir, exist_ok=True)
        self.datas.to_parquet(parquet_path)

    def init_clip_extractor(self):
        """Create the CLIP extractor on first use (lazy initialisation)."""
        if self.clip_extractor is not None:
            return
        # The module location differs depending on where the script is run
        # from, so try the flat import first and the package import second.
        try:
            from CLIPExtractor import CLIPExtractor
        except ImportError:
            from src.CLIPExtractor import CLIPExtractor
        cache_dir = "models"
        self.clip_extractor = CLIPExtractor(
            model_name="openai/clip-vit-large-patch14", cache_dir=cache_dir
        )

    def top_k_search(self, query_feature, top_k=15):
        """Return the ``top_k`` records most similar to ``query_feature``.

        Args:
            query_feature: Feature vector of the query; it is flattened to a
                single row before comparison.
            top_k: Maximum number of records to return.

        Returns:
            A list of record dicts ordered by decreasing cosine similarity.
            Each dict has a ``similarity`` entry and no ``clip_feature``.
            Empty when there is no data, no usable feature, or
            ``top_k <= 0``.

        Raises:
            ValueError: If the table has no ``clip_feature`` column.
        """
        if self.datas is None:
            return []
        if 'clip_feature' not in self.datas.columns:
            raise ValueError("clip_feature column not found in the data.")

        features = self.datas['clip_feature']
        # Remember which row positions actually carry a feature so that the
        # similarity ranking can be mapped back to the right records even
        # when some rows have a missing feature.
        valid_positions = np.flatnonzero(features.notna().to_numpy())
        if top_k <= 0 or valid_positions.size == 0:
            return []

        query_feature = np.array(query_feature).reshape(1, -1)
        attribute_features = np.stack(features.iloc[valid_positions].values)
        similarities = cosine_similarity(query_feature, attribute_features)[0]

        # argsort is ascending: keep the last top_k entries, best first.
        order = np.argsort(similarities)[-top_k:][::-1]
        top_k_results = self.datas.iloc[valid_positions[order]].copy()
        top_k_results['similarity'] = similarities[order]
        # The raw feature vectors are not part of the search result.
        top_k_results = top_k_results.drop(columns=['clip_feature'])
        return top_k_results.to_dict(orient='records')

    def search_with_image_name(self, image_name):
        """Search with the CLIP feature extracted from the file ``image_name``."""
        self.init_clip_extractor()
        img_feature = self.clip_extractor.extract_image_from_file(image_name)
        return self.top_k_search(img_feature)

    def search_with_image(self, image, if_opencv=False):
        """Search with the CLIP feature of an in-memory image.

        ``if_opencv`` is forwarded unchanged to the extractor.
        """
        self.init_clip_extractor()
        img_feature = self.clip_extractor.extract_image(image, if_opencv=if_opencv)
        return self.top_k_search(img_feature)

    def add_image(self, data, if_save = True, image_feature = None):
        """Append one record to the table.

        Args:
            data: Mapping with at least ``image_name``, ``keyword`` and
                ``translated_word``. It is copied, not modified.
            if_save: Persist the table to parquet after adding.
            image_feature: Precomputed feature; when ``None`` it is
                extracted from the file named by ``data['image_name']``.

        Raises:
            ValueError: If a required field is missing from ``data``.
        """
        required_fields = ['image_name', 'keyword', 'translated_word']
        if not all(field in data for field in required_fields):
            raise ValueError(f"Data must contain the following fields: {required_fields}")

        # Work on a copy so the caller's dict is not silently extended.
        record = dict(data)
        if image_feature is None:
            self.init_clip_extractor()
            image_feature = self.clip_extractor.extract_image_from_file(record['image_name'])
        record['clip_feature'] = image_feature

        new_row = pd.DataFrame([record])
        if self.datas is None:
            self.datas = new_row
        else:
            self.datas = pd.concat([self.datas, new_row], ignore_index=True)

        if if_save:
            self.save_to_parquet()

    def add_images(self, datas):
        """Append several records, saving once after all of them are added."""
        for data in datas:
            self.add_image(data, if_save=False)
        self.save_to_parquet()
import os
from glob import glob
def scan_and_update_imagebase(db, target_folder="temp_images", similarity_threshold=0.9):
    """Add every new ``.jpg`` in ``target_folder`` to ``db``, skipping duplicates.

    An image counts as a duplicate when the best match returned by
    ``db.search_with_image_name`` has a similarity strictly above
    ``similarity_threshold``.

    Args:
        db: Object providing ``search_with_image_name(path)`` and
            ``add_image(data)``.
        target_folder: Directory scanned (non-recursively) for ``*.jpg``.
        similarity_threshold: Similarity above which an image is treated
            as already present.

    Returns:
        A tuple ``(added_count, duplicate_count)``.
    """
    # Collect all .jpg files in the target folder. Sorting makes the
    # processing order (and therefore which of two near-identical files is
    # kept) deterministic, since glob gives no ordering guarantee.
    image_files = sorted(glob(os.path.join(target_folder, "*.jpg")))

    duplicate_count = 0
    added_count = 0

    for image_path in image_files:
        # The file name without its extension serves as the keyword.
        keyword = os.path.splitext(os.path.basename(image_path))[0]
        translated_word = keyword  # adjust translated_word here if needed

        # Look for a similar image that is already in the database.
        # NOTE: when `db` is an Imagebase, the image feature is extracted
        # here and again inside add_image() below.
        results = db.search_with_image_name(image_path)
        if results and results[0]['similarity'] > similarity_threshold:
            print(f"Image '{image_path}' is considered a duplicate.")
            duplicate_count += 1
            continue

        new_image_data = {
            'image_name': image_path,
            'keyword': keyword,
            'translated_word': translated_word,
        }
        db.add_image(new_image_data)
        print(f"Image '{image_path}' added to the database.")
        added_count += 1

    print(f"Total duplicate images found: {duplicate_count}")
    print(f"Total new images added to the database: {added_count}")
    return added_count, duplicate_count
if __name__ == '__main__':
    # Folder that is scanned for new images.
    target_folder = "temp_images"

    # Build the image base, then scan the folder and update it.
    img_db = Imagebase()
    scan_and_update_imagebase(img_db, target_folder)
# Usage example:
#
#     img_db = Imagebase()
#     new_image_data = {
#         'image_name': "datas/老虎.jpg",
#         'keyword': 'tiger',
#         'translated_word': '老虎'
#     }
#     img_db.add_image(new_image_data)
#
#     image_path = "datas/老虎.jpg"
#     results = img_db.search_with_image_name(image_path)
#     for result in results[:3]:
#         print(result)
|