import gradio as gr import os import random import string from PIL import Image from src.GameMaster import GameMaster from collections import deque import time import threading import queue from src.Founder import Founder import os # os.environ['HTTP_PROXY'] = 'http://localhost:8234' # os.environ['HTTPS_PROXY'] = 'http://localhost:8234' game_master = GameMaster() founder_base = Founder() blank_image_path = "datas/blank_item.jpg" random_data = game_master.random_image_text_data( 12 ) recent_generated_items = deque(random_data , maxlen=12) def refresh_contribution_ladder(): top_contributors = founder_base.get_top_rank(top_k=20) contributions = [] for founder_name, items in top_contributors: random_items = random.sample(items, min(5, len(items))) cultivation_names = [] for item in random_items: result = game_master.remote.search_by_en_keyword(item) if result is not None and "name_in_cultivation" in result: cultivation_names.append(result['name_in_cultivation']) else: cultivation_names.append(item) items_description = ', '.join(cultivation_names) contribution = f"""道友姓名: {founder_name}\n发现了{items_description}等物品""" contributions.append(contribution) while len(contributions) < 20: contributions.append("") return contributions def expensive_generating(save_path, image_feature, backup_results, game_master, founder_name = None): # for now it's a idle implementation for debug # re-search incase redundant generate image_search_result = game_master.remote.top_k_search(image_feature, top_k=1) if image_search_result and len(image_search_result)>0 and image_search_result[0]['similarity'] > game_master.minimal_image_threshold: return global recent_generated_items recent_generated_items.append((save_path, "天机阁长老正在鉴定,道友请耐心等待。。")) search_result, backup_results, image_feature = game_master.search_with_path(save_path) if search_result is None: # real implementation should be cultivation_data = game_master.generate_cultivation_data( \ save_path, image_feature, backup_results ) description = cultivation_data["description_in_cultivation"] print("鉴定得到新物品:", description) else: cultivation_data = search_result description = cultivation_data["description_in_cultivation"] print("鉴定这是记载过的物品,但是第一次发现:", description) suffix = "" if founder_name is not None: global founder_base if "translated_word" in cultivation_data: translated_word = cultivation_data["translated_word"] else: translated_word = cultivation_data["name_in_cultivation"] if founder_base.get_founder(translated_word) is None: if founder_name.strip() != "": founder_base.set_founder(translated_word, founder_name) suffix = f" 由道友 {founder_name} 发现" else: suffix = f" 由道友 {founder_base.get_founder(translated_word)} 发现" # this function will automatically update the database # cultivation_data = backup_results[0] # recent_generated_items.append((save_path, description)) # replace the same save_path in recent_generated_items flag = True for index, item in enumerate(recent_generated_items): if item[0] == save_path: if "name_in_cultivation" in cultivation_data: description = cultivation_data["name_in_cultivation"] + suffix + "--" + description recent_generated_items[index] = (save_path, description) flag = False if flag: if "name_in_cultivation" in cultivation_data: description = cultivation_data["name_in_cultivation"] + suffix + "--" + description recent_generated_items.append((save_path, description)) return ### # Queue for managing tasks to process expensive operations task_queue = queue.Queue() # Function that processes the tasks in the queue def worker(): while True: save_path, image_feature, backup_results, founder_base = task_queue.get() if save_path is None: break expensive_generating(save_path, image_feature, backup_results, game_master, founder_base) task_queue.task_done() # Start the worker thread thread = threading.Thread(target=worker, daemon=True) thread.start() ### def similarity2level( sim, max_val , min_val ): level_num = 3 level = int( (sim - min_val) / (max_val - min_val) * level_num ) level = max(0, min(level_num, level)) return level from src.get_comments_from_level import get_comments_from_level def is_empty_name( founder_name ): empty_names = ["","测试","鲁鲁道祖"] if founder_name.strip() in empty_names: return True # Function to handle image upload and search def process_image(image, founder_name): # Ensure temp_images directory exists os.makedirs('temp_images', exist_ok=True) prefix = "" founder_name = founder_name.strip() if is_empty_name(founder_name): prefix = "道友,请在下面留下您的尊姓大名,提供大量珍宝鉴定的道友,天机阁将在后面送出灵泉玉液(珍珠奶茶)。" try: # Generate a random hash name for the image random_name = ''.join(random.choices(string.ascii_lowercase + string.digits, k=12)) + '.jpg' save_path = os.path.join('temp_images', random_name) # Convert numpy.ndarray to PIL.Image img = Image.fromarray(image) # Resize image to height not exceeding 480 pixels while maintaining aspect ratio img.thumbnail((img.width, 480)) img.save(save_path) except: return "","",prefix + "道友你有什么物品要来鉴定吗?" search_result, backup_results, image_feature = game_master.search_with_path(save_path, threshold = 0) image_similarity = search_result['similarity'] print("image_similarity:", image_similarity) inbase_similarity_level = similarity2level(image_similarity, max_val=0.99, min_val=0.80) suffix = "" if image_similarity > game_master.minimal_image_threshold: result = search_result # remove the temp image # 一般来说这个时候inbase_similarity_level是2或者3 inlibrary_similarity_level = inbase_similarity_level if "translated_word" in search_result: translated_word = search_result["translated_word"] else: translated_word = search_result["name_in_cultivation"] find_founder = founder_base.get_founder(translated_word) if find_founder is None: suffix = "这个物品很早就在天机阁中记载了,道友还有什么物品要鉴定吗?" else: if find_founder == founder_name: suffix = "感谢道友送来这个物品进行鉴定,天机阁的天梯上已经记录了你的贡献。" else: suffix = f"这个物品是由道友{find_founder}发现的,道友还有什么物品要鉴定吗?" else: if len(backup_results) > 0 and "similarity" in backup_results[0]: text_similarity = backup_results[0]['similarity'] print("text_similarity:", text_similarity) inlibrary_similarity_level = similarity2level(text_similarity, max_val=0.79, min_val=0.35) result = backup_results[0] # should call here # expensive_generating(save_path, image_feature, backup_results, game_master) task_queue.put((save_path, image_feature, backup_results, founder_name)) suffix = "道友请移步天机阁内阁查询长老的鉴定结果。" if is_empty_name( founder_name ): suffix += "另外,只有留下姓名的道友,才能最终被记录在天梯获取天机阁奖励。" # print(comments) # update to recent items # global recent_generated_items # recent_generated_items.append((save_path, result["description_in_cultivation"])) # Get the name and description from the first result name = result["name_in_cultivation"] description_in_cultivation = result["description_in_cultivation"] comments = get_comments_from_level( inbase_similarity_level, inlibrary_similarity_level ) # 将comments种的{name} format为name变量 comments = prefix + comments.format(name=name) + suffix # print(comments) return name, description_in_cultivation, comments # Function to refresh and display recent items def refresh_recent_items(): image_paths = [] descriptions = [] # for img_path, desc in recent_generated_items: # loop in inverse order for img_path, desc in reversed(recent_generated_items): img = Image.open(img_path) img.thumbnail((200, 200)) image_paths.append(img) descriptions.append(desc) return image_paths + descriptions # Prepare example images example_images_dir = "datas/example_images" example_images = [os.path.join(example_images_dir, img) for img in os.listdir(example_images_dir) if img.endswith('.jpg')] TODO_list = """ # TODO - [ ] 增加天梯匹配系统 - [ ] 增加readme - [ ] 部署到gitee """ # Gradio Interface with gr.Blocks() as demo: gr.Markdown("# 这个系统是智障,错把日常当修仙\n\nby [李鲁鲁](https://github.com/LC1332)") # Tab 1: Image processing and identification with gr.Tab("鉴定"): with gr.Row(): with gr.Column(scale = 1): image_input = gr.Image(label="上传图片") with gr.Column(scale = 2): comments = gr.Text("道友你有什么物品要来鉴定吗?", label = "") name_output = gr.Textbox(label="鉴定物品名称") description_output = gr.Textbox(label="物品描述") submit_button = gr.Button("鉴定") with gr.Row(): founder_name = gr.Textbox(label="道友尊姓大名?", interactive=True) image_input.upload(process_image, inputs=[image_input,founder_name], outputs=[name_output, description_output, comments]) submit_button.click(process_image, inputs=[image_input,founder_name], outputs=[name_output, description_output, comments]) gr.Examples(examples=example_images, inputs=image_input, label="选择一个示例图片") # Tab 2: Recent items with gr.Tab("天机阁最新鉴定"): refresh_button = gr.Button("询问长老最新鉴定") recent_images = [] recent_descriptions = [] for i in range(4): # Repeat for 4 rows, 3 columns each = 12 items # with gr.Column(): # Create a grid layout with 3 columns, each item having an image and a description below it with gr.Row(): with gr.Column(): recent_image = gr.Image(label=f"物品 { i * 4 + 1}") recent_description = gr.Textbox(label="描述", interactive=False) recent_images.append(recent_image) recent_descriptions.append(recent_description) with gr.Column(): recent_image = gr.Image(label=f"物品 { i * 4 + 2}") recent_description = gr.Textbox(label="描述", interactive=False) recent_images.append(recent_image) recent_descriptions.append(recent_description) with gr.Column(): recent_image = gr.Image(label=f"物品 { i * 4 + 3}") recent_description = gr.Textbox(label="描述", interactive=False) recent_images.append(recent_image) recent_descriptions.append(recent_description) # On clicking the refresh button, update the recent items refresh_button.click(refresh_recent_items, outputs=recent_images + recent_descriptions) with gr.Tab("贡献天梯"): refresh_ladder_button = gr.Button("刷新天梯") contribution_textboxes = [gr.Textbox(label=f"贡献者 {i + 1}", interactive=False) for i in range(20)] refresh_ladder_button.click(refresh_contribution_ladder, outputs=contribution_textboxes) # Launch the demo demo.launch(share=True)