naptha-module / app.py
arshy's picture
initial commit
9f012c2
import os
import asyncio
import gradio as gr
from PIL import Image
from dotenv import load_dotenv, find_dotenv
from naptha_sdk.client.naptha import Naptha
from naptha_sdk.user import generate_user
from naptha_sdk.schemas import ModuleRun
load_dotenv(find_dotenv())
async def run_module(module_name, prompt, selected_node):
user, _ = generate_user(os.getenv("PRIVATE_KEY"))
hub_url = os.getenv("HUB_URL")
hub_username = os.getenv("HUB_USER")
hub_password = os.getenv("HUB_PASS")
# Use the selected node
if selected_node == "node":
node_url = 'http://node.naptha.ai:7001'
elif selected_node == "node1":
node_url = 'http://node1.naptha.ai:7001'
else:
return None, f"Error: Invalid node selection: {selected_node}"
routing_url = os.getenv("ROUTING_URL", None)
indirect_node_id = os.getenv("INDIRECT_NODE_ID", None)
output_dir = './naptha_output'
if module_name == 'generate_image':
os.makedirs(output_dir, exist_ok=True)
for f in os.listdir(output_dir):
os.remove(os.path.join(output_dir, f))
naptha = await Naptha(
user=user,
hub_username=hub_username,
hub_password=hub_password,
hub_url=hub_url,
node_url=node_url,
routing_url=routing_url,
indirect_node_id=indirect_node_id,
)
module_run_input = {
'consumer_id': naptha.user["id"],
"module_name": module_name,
'worker_nodes': None,
"module_params": {
"prompt": prompt
},
}
print(f"Running module {module_name} on {selected_node} with parameters: {module_run_input}")
print("Checking user...")
user = await naptha.node.check_user(user_input=naptha.user)
if user["is_registered"]:
print("Found user...", user)
else:
print("No user found. Registering user...")
user = await naptha.node.register_user(user_input=user)
print(f"User registered: {user}.")
print("Running...")
module_run = await naptha.node.run_task(module_run_input)
if isinstance(module_run, dict):
module_run = ModuleRun(**module_run)
print(f"Module Run ID: {module_run.id}")
current_results_len = 0
while True:
module_run = await naptha.node.check_task(module_run)
if isinstance(module_run, dict):
module_run = ModuleRun(**module_run)
output = f"{module_run.status} {module_run.module_type} {module_run.module_name}"
if len(module_run.child_runs) > 0:
output += f", task {len(module_run.child_runs)} {module_run.child_runs[-1].module_name} (node: {module_run.child_runs[-1].worker_nodes[0]})"
print(output)
if len(module_run.results) > current_results_len:
print("Output: ", module_run.results[-1])
current_results_len += 1
if module_run.status in ['completed', 'error']:
break
await asyncio.sleep(3)
if module_run.status == 'completed':
results = module_run.results
print(f"Results: {results}")
if module_name == 'generate_image':
folder_id = module_run.id.split(":")[1]
image_path = f"{output_dir}/output.png"
await naptha.node.read_storage(folder_id.strip(), output_dir, ipfs=None)
if os.path.exists(image_path):
return image_path, None
else:
return None, "Image generation failed or image not found."
else: # chat
return None, results[-1] if results else "No output generated."
else:
error_message = module_run.error_message
print(f"Error: {error_message}")
return None, f"Error: {error_message}"
def gradio_wrapper(module, prompt, node):
image_path, text = asyncio.run(run_module(module, prompt, node))
if image_path:
return image_path, gr.update(visible=False)
elif text:
return gr.update(visible=False), text
else:
return gr.update(visible=False), gr.update(visible=False)
with gr.Blocks() as iface:
gr.Markdown("# Naptha Module Runner")
gr.Markdown("Run either the 'generate_image' or 'chat' module using Naptha on selected node.")
with gr.Row():
module = gr.Dropdown(["generate_image", "chat"], label="Module")
node = gr.Dropdown(["node", "node1"], label="Node")
prompt = gr.Textbox(label="Prompt")
with gr.Row():
image_output = gr.Image(label="Generated Image")
text_output = gr.Textbox(label="Chat Output")
submit_btn = gr.Button("Run")
submit_btn.click(
fn=gradio_wrapper,
inputs=[module, prompt, node],
outputs=[image_output, text_output]
)
iface.launch()