import os import asyncio import gradio as gr from PIL import Image from dotenv import load_dotenv, find_dotenv from naptha_sdk.client.naptha import Naptha from naptha_sdk.user import generate_user from naptha_sdk.schemas import ModuleRun load_dotenv(find_dotenv()) async def run_module(module_name, prompt, selected_node): user, _ = generate_user(os.getenv("PRIVATE_KEY")) hub_url = os.getenv("HUB_URL") hub_username = os.getenv("HUB_USER") hub_password = os.getenv("HUB_PASS") # Use the selected node if selected_node == "node": node_url = 'http://node.naptha.ai:7001' elif selected_node == "node1": node_url = 'http://node1.naptha.ai:7001' else: return None, f"Error: Invalid node selection: {selected_node}" routing_url = os.getenv("ROUTING_URL", None) indirect_node_id = os.getenv("INDIRECT_NODE_ID", None) output_dir = './naptha_output' if module_name == 'generate_image': os.makedirs(output_dir, exist_ok=True) for f in os.listdir(output_dir): os.remove(os.path.join(output_dir, f)) naptha = await Naptha( user=user, hub_username=hub_username, hub_password=hub_password, hub_url=hub_url, node_url=node_url, routing_url=routing_url, indirect_node_id=indirect_node_id, ) module_run_input = { 'consumer_id': naptha.user["id"], "module_name": module_name, 'worker_nodes': None, "module_params": { "prompt": prompt }, } print(f"Running module {module_name} on {selected_node} with parameters: {module_run_input}") print("Checking user...") user = await naptha.node.check_user(user_input=naptha.user) if user["is_registered"]: print("Found user...", user) else: print("No user found. Registering user...") user = await naptha.node.register_user(user_input=user) print(f"User registered: {user}.") print("Running...") module_run = await naptha.node.run_task(module_run_input) if isinstance(module_run, dict): module_run = ModuleRun(**module_run) print(f"Module Run ID: {module_run.id}") current_results_len = 0 while True: module_run = await naptha.node.check_task(module_run) if isinstance(module_run, dict): module_run = ModuleRun(**module_run) output = f"{module_run.status} {module_run.module_type} {module_run.module_name}" if len(module_run.child_runs) > 0: output += f", task {len(module_run.child_runs)} {module_run.child_runs[-1].module_name} (node: {module_run.child_runs[-1].worker_nodes[0]})" print(output) if len(module_run.results) > current_results_len: print("Output: ", module_run.results[-1]) current_results_len += 1 if module_run.status in ['completed', 'error']: break await asyncio.sleep(3) if module_run.status == 'completed': results = module_run.results print(f"Results: {results}") if module_name == 'generate_image': folder_id = module_run.id.split(":")[1] image_path = f"{output_dir}/output.png" await naptha.node.read_storage(folder_id.strip(), output_dir, ipfs=None) if os.path.exists(image_path): return image_path, None else: return None, "Image generation failed or image not found." else: # chat return None, results[-1] if results else "No output generated." else: error_message = module_run.error_message print(f"Error: {error_message}") return None, f"Error: {error_message}" def gradio_wrapper(module, prompt, node): image_path, text = asyncio.run(run_module(module, prompt, node)) if image_path: return image_path, gr.update(visible=False) elif text: return gr.update(visible=False), text else: return gr.update(visible=False), gr.update(visible=False) with gr.Blocks() as iface: gr.Markdown("# Naptha Module Runner") gr.Markdown("Run either the 'generate_image' or 'chat' module using Naptha on selected node.") with gr.Row(): module = gr.Dropdown(["generate_image", "chat"], label="Module") node = gr.Dropdown(["node", "node1"], label="Node") prompt = gr.Textbox(label="Prompt") with gr.Row(): image_output = gr.Image(label="Generated Image") text_output = gr.Textbox(label="Chat Output") submit_btn = gr.Button("Run") submit_btn.click( fn=gradio_wrapper, inputs=[module, prompt, node], outputs=[image_output, text_output] ) iface.launch()