Spaces:
Runtime error
Runtime error
import gradio as gr | |
import os | |
import random | |
import string | |
from PIL import Image | |
from src.GameMaster import GameMaster | |
from collections import deque | |
import time | |
import threading | |
import queue | |
from src.Founder import Founder | |
import os | |
# os.environ['HTTP_PROXY'] = 'http://localhost:8234' | |
# os.environ['HTTPS_PROXY'] = 'http://localhost:8234' | |
# Shared singletons used by every handler in this module.
game_master = GameMaster()
founder_base = Founder()

# Path of a placeholder picture shipped with the app data.
blank_image_path = "datas/blank_item.jpg"

# Number of entries kept in the "recent items" buffer.
RECENT_ITEMS_CAPACITY = 12

# Seed the buffer with sample entries from the game master; each entry is
# consumed elsewhere in this file as an (image_path, description) pair.
# The deque is bounded, so appending a new entry drops the oldest one.
random_data = game_master.random_image_text_data(RECENT_ITEMS_CAPACITY)
recent_generated_items = deque(random_data, maxlen=RECENT_ITEMS_CAPACITY)
def refresh_contribution_ladder():
    """Build the text entries displayed on the contribution ladder tab.

    For every ``(founder_name, items)`` pair returned by
    ``founder_base.get_top_rank(top_k=20)``, up to five of the founder's
    items are sampled at random and looked up through
    ``game_master.remote.search_by_en_keyword``. When the lookup yields a
    record containing ``name_in_cultivation`` that name is shown; otherwise
    the raw item key is shown instead.

    Returns:
        list[str]: One entry per contributor, padded with empty strings so
        that at least 20 entries are returned.
    """
    ladder_entries = []
    for founder_name, items in founder_base.get_top_rank(top_k=20):
        sampled_items = random.sample(items, min(5, len(items)))

        display_names = []
        for keyword in sampled_items:
            record = game_master.remote.search_by_en_keyword(keyword)
            resolved = record is not None and "name_in_cultivation" in record
            display_names.append(record['name_in_cultivation'] if resolved else keyword)

        items_description = ', '.join(display_names)
        ladder_entries.append(f"""道友姓名: {founder_name}\n发现了{items_description}等物品""")

    # Pad with blanks so every ladder slot receives a value.
    missing = 20 - len(ladder_entries)
    if missing > 0:
        ladder_entries.extend([""] * missing)
    return ladder_entries
def expensive_generating(save_path, image_feature, backup_results, game_master, founder_name = None):
    """Identify an uploaded image and publish the result to the recent-items list.

    Steps:
      1. Re-query the image index with ``image_feature``; if the best hit is
         already above ``game_master.minimal_image_threshold`` nothing is done
         (avoids generating the same item twice).
      2. Append a "being identified" placeholder for ``save_path`` to
         ``recent_generated_items``.
      3. Search again by path; generate new cultivation data when the search
         returns ``None``, otherwise reuse the found record.
      4. When ``founder_name`` is given, record or look up the item's founder
         in ``founder_base`` and mention the founder in the description.
      5. Replace the placeholder (or append, if it is no longer present) with
         the final description.

    Args:
        save_path: Path of the saved upload; also the key of its list entry.
        image_feature: Feature passed to ``game_master.remote.top_k_search``.
        backup_results: Candidate results from the caller. Replaced by the
            fresh values returned from ``game_master.search_with_path``.
        game_master: Object providing the search/generation calls used here.
        founder_name: Name of the uploader, or ``None`` to skip founder
            bookkeeping. A blank name is never recorded as a founder.

    Returns:
        None. The result is written to ``recent_generated_items``.
    """
    # Re-search in case an equivalent item was generated while this job was queued.
    image_search_result = game_master.remote.top_k_search(image_feature, top_k=1)
    if image_search_result and image_search_result[0]['similarity'] > game_master.minimal_image_threshold:
        return

    # Placeholder shown while the (slow) identification is running.
    recent_generated_items.append((save_path, "天机阁长老正在鉴定,道友请耐心等待。。"))

    search_result, backup_results, image_feature = game_master.search_with_path(save_path)
    if search_result is None:
        # NOTE(review): the original comment says this call also updates the
        # database; that is not visible from this file.
        cultivation_data = game_master.generate_cultivation_data(
            save_path, image_feature, backup_results)
        description = cultivation_data["description_in_cultivation"]
        print("鉴定得到新物品:", description)
    else:
        cultivation_data = search_result
        description = cultivation_data["description_in_cultivation"]
        print("鉴定这是记载过的物品,但是第一次发现:", description)

    suffix = ""
    if founder_name is not None:
        if "translated_word" in cultivation_data:
            translated_word = cultivation_data["translated_word"]
        else:
            translated_word = cultivation_data["name_in_cultivation"]

        existing_founder = founder_base.get_founder(translated_word)
        if existing_founder is not None:
            suffix = f" 由道友 {existing_founder} 发现"
        elif founder_name.strip() != "":
            founder_base.set_founder(translated_word, founder_name)
            suffix = f" 由道友 {founder_name} 发现"

    # Build the final text exactly once. Prefixing inside the loop below
    # would stack the prefix if several entries shared the same path.
    if "name_in_cultivation" in cultivation_data:
        description = cultivation_data["name_in_cultivation"] + suffix + "--" + description

    # Replace the placeholder entry for this path; append if it is gone.
    replaced = False
    for index, item in enumerate(recent_generated_items):
        if item[0] == save_path:
            recent_generated_items[index] = (save_path, description)
            replaced = True
    if not replaced:
        recent_generated_items.append((save_path, description))
    return
### | |
# Queue for managing tasks to process expensive operations.
# Each job is a tuple (save_path, image_feature, backup_results, founder_name);
# a job whose save_path is None asks the worker to stop.
task_queue = queue.Queue()


# Function that processes the tasks in the queue
def worker():
    """Process identification jobs from ``task_queue`` until told to stop.

    A failing job is reported on stderr and then skipped, so one bad upload
    cannot kill the only worker thread and stall every later job.
    ``task_done()`` is called for every item taken from the queue, including
    the stop sentinel, so ``task_queue.join()`` cannot hang.
    """
    import traceback

    while True:
        job = task_queue.get()
        try:
            save_path, image_feature, backup_results, founder_name = job
            if save_path is None:
                break
            expensive_generating(save_path, image_feature, backup_results, game_master, founder_name)
        except Exception:
            # Keep the worker alive; the traceback is the only failure report.
            traceback.print_exc()
        finally:
            task_queue.task_done()


# Start the worker thread (daemon, so it never blocks interpreter exit)
thread = threading.Thread(target=worker, daemon=True)
thread.start()
### | |
def similarity2level( sim, max_val , min_val ):
    """Map a similarity score onto an integer level between 0 and 3.

    The score is rescaled linearly so that ``min_val`` maps to 0 and
    ``max_val`` maps to 3, truncated to an integer, then clamped to the
    range 0..3.

    Args:
        sim: Similarity score to convert.
        max_val: Score that corresponds to the top level.
        min_val: Score that corresponds to level 0.

    Returns:
        int: The level, always within 0..3.
    """
    top_level = 3
    span = max_val - min_val
    bucket = int((sim - min_val) / span * top_level)
    if bucket < 0:
        return 0
    if bucket > top_level:
        return top_level
    return bucket
from src.get_comments_from_level import get_comments_from_level | |
def is_empty_name( founder_name ):
    """Tell whether ``founder_name`` should be treated as "no name given".

    A name counts as empty when it is ``None``, blank after stripping
    whitespace, or one of the placeholder names listed below.

    Args:
        founder_name: Name typed by the user, or ``None``.

    Returns:
        bool: ``True`` for an empty/placeholder name, ``False`` otherwise.
    """
    if founder_name is None:
        return True
    empty_names = ("", "测试", "鲁鲁道祖")
    # Always return a real bool rather than falling through to None.
    return founder_name.strip() in empty_names
# Function to handle image upload and search
def process_image(image, founder_name):
    """Identify an uploaded image and build the reply for the first tab.

    The image is saved under ``temp_images`` with a random name and searched
    through ``game_master.search_with_path``. If the best image match is above
    ``game_master.minimal_image_threshold`` that record is returned together
    with a note about who discovered it. Otherwise the first backup result (if
    any) is shown as a preliminary answer and the image is queued on
    ``task_queue`` for the background worker.

    Args:
        image: Uploaded image as an array accepted by ``Image.fromarray``.
            Anything that cannot be converted and saved (for example ``None``
            when nothing was uploaded) produces the default greeting.
        founder_name: Name typed by the user; ``None`` is treated as empty.

    Returns:
        tuple[str, str, str]: ``(name, description, comments)``. ``name`` and
        ``description`` are empty strings when there is nothing to show yet.
    """
    # Ensure temp_images directory exists
    os.makedirs('temp_images', exist_ok=True)
    prefix = ""
    # Treat a missing value like an empty name instead of crashing on .strip().
    founder_name = (founder_name or "").strip()
    anonymous = is_empty_name(founder_name)
    if anonymous:
        prefix = "道友,请在下面留下您的尊姓大名,提供大量珍宝鉴定的道友,天机阁将在后面送出灵泉玉液(珍珠奶茶)。"

    try:
        # Generate a random hash name for the image
        random_name = ''.join(random.choices(string.ascii_lowercase + string.digits, k=12)) + '.jpg'
        save_path = os.path.join('temp_images', random_name)
        # Convert numpy.ndarray to PIL.Image
        img = Image.fromarray(image)
        # Resize image to height not exceeding 480 pixels while maintaining aspect ratio
        img.thumbnail((img.width, 480))
        img.save(save_path)
    except Exception:
        # Deliberately best-effort: any unusable upload falls back to the
        # greeting. `Exception` (not a bare except) lets KeyboardInterrupt
        # and SystemExit propagate.
        return "","",prefix + "道友你有什么物品要来鉴定吗?"

    search_result, backup_results, image_feature = game_master.search_with_path(save_path, threshold = 0)
    # search_with_path may yield no image match; treat that as "no similarity".
    has_image_match = search_result is not None
    image_similarity = search_result['similarity'] if has_image_match else 0.0
    print("image_similarity:", image_similarity)
    inbase_similarity_level = similarity2level(image_similarity, max_val=0.99, min_val=0.80)

    suffix = ""
    if has_image_match and image_similarity > game_master.minimal_image_threshold:
        result = search_result
        # NOTE(review): the original comment here said the temp image should
        # be removed, but nothing deletes save_path.
        # At this point inbase_similarity_level is usually 2 or 3.
        inlibrary_similarity_level = inbase_similarity_level
        if "translated_word" in search_result:
            translated_word = search_result["translated_word"]
        else:
            translated_word = search_result["name_in_cultivation"]
        find_founder = founder_base.get_founder(translated_word)
        if find_founder is None:
            suffix = "这个物品很早就在天机阁中记载了,道友还有什么物品要鉴定吗?"
        elif find_founder == founder_name:
            suffix = "感谢道友送来这个物品进行鉴定,天机阁的天梯上已经记录了你的贡献。"
        else:
            suffix = f"这个物品是由道友{find_founder}发现的,道友还有什么物品要鉴定吗?"
    else:
        # Defaults keep `result` and the level bound when there is no usable
        # backup result (previously an unbound-variable crash).
        result = None
        inlibrary_similarity_level = 0
        if backup_results:
            result = backup_results[0]
            if "similarity" in result:
                text_similarity = result['similarity']
                print("text_similarity:", text_similarity)
                inlibrary_similarity_level = similarity2level(text_similarity, max_val=0.79, min_val=0.35)
        # Hand the slow identification over to the background worker.
        task_queue.put((save_path, image_feature, backup_results, founder_name))
        suffix = "道友请移步天机阁内阁查询长老的鉴定结果。"
        if anonymous:
            suffix += "另外,只有留下姓名的道友,才能最终被记录在天梯获取天机阁奖励。"

    if result is None:
        # Nothing to preview yet; the queued job will publish the result.
        return "", "", prefix + suffix

    # Get the name and description from the chosen result
    name = result["name_in_cultivation"]
    description_in_cultivation = result["description_in_cultivation"]
    comments = get_comments_from_level( inbase_similarity_level, inlibrary_similarity_level )
    # Fill the {name} placeholder of the comment template with the item name.
    comments = prefix + comments.format(name=name) + suffix
    return name, description_in_cultivation, comments
# Function to refresh and display recent items
def refresh_recent_items():
    """Return thumbnails and descriptions of the recent items, newest first.

    The result is a flat list: all images followed by all descriptions. It is
    padded to the capacity of ``recent_generated_items`` so the number of
    values stays constant even when the deque holds fewer entries.

    Returns:
        list: Image values (a PIL image, or ``None`` for an empty or
        unreadable slot) followed by the same number of description strings.
    """
    # Work on a snapshot: the worker thread appends to the deque, and
    # iterating it directly during slow image I/O can raise
    # "RuntimeError: deque mutated during iteration".
    snapshot = list(recent_generated_items)
    slot_count = recent_generated_items.maxlen or len(snapshot)

    thumbnails = []
    descriptions = []
    # Loop in inverse order so the newest entry comes first.
    for img_path, desc in reversed(snapshot):
        try:
            img = Image.open(img_path)
            img.thumbnail((200, 200))
        except OSError:
            # Missing or unreadable file: leave this slot without an image
            # instead of failing the whole refresh.
            img = None
        thumbnails.append(img)
        descriptions.append(desc)

    padding = slot_count - len(snapshot)
    if padding > 0:
        thumbnails.extend([None] * padding)
        descriptions.extend([""] * padding)
    return thumbnails + descriptions
# Collect the bundled example pictures (only files ending in ".jpg").
example_images_dir = "datas/example_images"
_example_jpg_names = [
    entry
    for entry in os.listdir(example_images_dir)
    if entry.endswith('.jpg')
]
example_images = [
    os.path.join(example_images_dir, entry)
    for entry in _example_jpg_names
]

# Project to-do notes kept as a markdown string.
TODO_list = """
# TODO
- [ ] 增加天梯匹配系统
- [ ] 增加readme
- [ ] 部署到gitee
"""
# Gradio Interface
with gr.Blocks() as demo:
    gr.Markdown("# 这个系统是智障,错把日常当修仙\n\nby [李鲁鲁](https://github.com/LC1332)")

    # Tab 1: Image processing and identification
    with gr.Tab("鉴定"):
        with gr.Row():
            with gr.Column(scale = 1):
                image_input = gr.Image(label="上传图片")
            with gr.Column(scale = 2):
                comments = gr.Text("道友你有什么物品要来鉴定吗?", label = "")
                name_output = gr.Textbox(label="鉴定物品名称")
                description_output = gr.Textbox(label="物品描述")
                submit_button = gr.Button("鉴定")
        with gr.Row():
            founder_name = gr.Textbox(label="道友尊姓大名?", interactive=True)
        # Both uploading a picture and pressing the button run the identification.
        image_input.upload(process_image, inputs=[image_input,founder_name], outputs=[name_output, description_output, comments])
        submit_button.click(process_image, inputs=[image_input,founder_name], outputs=[name_output, description_output, comments])
        gr.Examples(examples=example_images, inputs=image_input, label="选择一个示例图片")

    # Tab 2: Recent items
    with gr.Tab("天机阁最新鉴定"):
        refresh_button = gr.Button("询问长老最新鉴定")
        recent_images = []
        recent_descriptions = []
        # Grid of 4 rows x 3 columns = 12 items, each an image with its
        # description below it.
        grid_rows, grid_columns = 4, 3
        for row in range(grid_rows):
            with gr.Row():
                for column in range(grid_columns):
                    with gr.Column():
                        # Number items 1..12 consecutively (row-major order).
                        item_number = row * grid_columns + column + 1
                        recent_images.append(gr.Image(label=f"物品 {item_number}"))
                        recent_descriptions.append(gr.Textbox(label="描述", interactive=False))
        # On clicking the refresh button, update the recent items
        refresh_button.click(refresh_recent_items, outputs=recent_images + recent_descriptions)

    # Tab 3: Contribution ladder
    with gr.Tab("贡献天梯"):
        refresh_ladder_button = gr.Button("刷新天梯")
        contribution_textboxes = [gr.Textbox(label=f"贡献者 {i + 1}", interactive=False) for i in range(20)]
        refresh_ladder_button.click(refresh_contribution_ladder, outputs=contribution_textboxes)

# Launch the demo
demo.launch(share=True)