# Spaces: Sleeping (page-header residue from the hosting site; not part of the program)
import copy
import json
import logging
import os
import random
import subprocess
from datetime import datetime

import gradio as gr
from huggingface_hub import InferenceClient, cached_download, hf_hub_url

from agent import (
    ACTION_PROMPT,
    ADD_PROMPT,
    COMPRESS_HISTORY_PROMPT,
    LOG_PROMPT,
    LOG_RESPONSE,
    MODIFY_PROMPT,
    PREFIX,
    SEARCH_QUERY,
    READ_PROMPT,
    TASK_PROMPT,
    UNDERSTAND_TEST_RESULTS_PROMPT,
)
from i_search import google, i_search as i_s
from safe_search import safe_search
from utils import parse_action, parse_file_content, read_python_module_structure
class App:
    """Interactive builder that keeps a list of UI components and a command log.

    Input starting with ``/`` is treated as a terminal-style command (see
    ``get_help_message``); any other input is sent to one of the configured
    NLP models and the reply is parsed into an update/remove action on an
    existing component.
    """

    def __init__(self):
        """Create empty app state, the component registry, and the NLP clients."""
        # Components are stored as plain dicts (see Component.to_dict).
        self.app_state = {"components": []}
        # Running transcript of slash commands and their output.
        self.terminal_history = ""
        # Per-type default properties plus a code template whose
        # ``{{name}}`` placeholders are filled in by Component.render.
        self.components_registry = {
            "Button": {
                "properties": {
                    "label": "Click Me",
                    "onclick": "",
                },
                "description": "A clickable button",
                "code_snippet": "gr.Button(value='{{label}}', variant='primary')",
            },
            "Text Input": {
                "properties": {
                    "value": "",
                    "placeholder": "Enter text",
                },
                "description": "A field for entering text",
                "code_snippet": "gr.Textbox(label='{{placeholder}}')",
            },
            "Image": {
                "properties": {
                    "src": "#",
                    "alt": "Image",
                },
                "description": "Displays an image",
                "code_snippet": "gr.Image(label='{{alt}}')",
            },
            "Dropdown": {
                "properties": {
                    "choices": ["Option 1", "Option 2"],
                    "value": "",
                },
                "description": "A dropdown menu for selecting options",
                "code_snippet": "gr.Dropdown(choices={{choices}}, label='Dropdown')",
            },
        }
        self.nlp_model_names = [
            "google/flan-t5-small",
            "Qwen/CodeQwen1.5-7B-Chat-GGUF",
            "bartowski/Codestral-22B-v0.1-GGUF",
            "bartowski/AutoCoder-GGUF",
        ]
        self.nlp_models = []
        self.initialize_nlp_models()

    def initialize_nlp_models(self):
        """Rebuild ``self.nlp_models`` with one entry per configured model name.

        An entry is an ``InferenceClient`` when setup succeeds and ``None``
        when it fails, so indexes stay aligned with ``self.nlp_model_names``.
        Failures are logged instead of being silently discarded.
        """
        clients = []
        for nlp_model_name in self.nlp_model_names:
            try:
                # NOTE(review): hf_hub_url is documented as requiring a
                # ``filename`` argument, so this call looks like it always
                # raises and leaves every model as None -- confirm which file
                # (if any) is meant to be pre-fetched before changing it.
                cached_download(hf_hub_url(nlp_model_name, revision="main"))
                client = InferenceClient(nlp_model_name)
            except Exception as exc:
                # ``Exception`` rather than a bare ``except`` so Ctrl-C and
                # SystemExit are not swallowed during start-up.
                logging.getLogger(__name__).warning(
                    "NLP model %s is unavailable: %s", nlp_model_name, exc
                )
                client = None
            clients.append(client)
        self.nlp_models = clients

    def get_nlp_response(self, input_text, model_index):
        """Return the text generated by the model at ``model_index``.

        Returns the fixed string ``"NLP model not available."`` when that
        slot holds no client. Raises ``IndexError`` for an out-of-range index.
        """
        model = self.nlp_models[model_index]
        if not model:
            return "NLP model not available."
        response = model.text_generation(input_text)
        # text_generation yields a plain string by default and an object
        # with ``generated_text`` only when details are requested.
        return getattr(response, "generated_text", response)

    class Component:
        """A single UI component: an id, a registry type name, and properties."""

        def __init__(self, type, properties=None, id=None, registry=None):
            """Build a component.

            Args:
                type: Key into the component registry (e.g. ``"Button"``).
                properties: Property dict; when empty or omitted, a deep copy
                    of the registry defaults for ``type`` is used.
                id: Explicit identifier; a random 4-digit number otherwise.
                registry: Component registry to read defaults and code
                    templates from. Required when ``properties`` is omitted
                    and whenever ``render`` is called.

            Raises:
                KeyError: if defaults are needed and ``type`` is not in
                    ``registry``.
            """
            self.id = id or random.randint(1000, 9999)
            self.type = type
            # The registry lives on App, not on Component, so it has to be
            # handed in explicitly.
            self.components_registry = registry if registry is not None else {}
            # Deep copy so mutable defaults (the "choices" list) are never
            # shared between components or with the registry itself.
            self.properties = properties or copy.deepcopy(
                self.components_registry[type]["properties"]
            )

        def to_dict(self):
            """Return the component as a plain ``{"id", "type", "properties"}`` dict."""
            return {
                "id": self.id,
                "type": self.type,
                "properties": self.properties,
            }

        def render(self):
            """Return the registry code template with ``{{name}}`` placeholders filled.

            String values are inserted with backslashes and single quotes
            escaped (the templates wrap them in single quotes); other values
            are inserted as their ``repr``. Properties are not modified.
            """
            snippet = self.components_registry[self.type]["code_snippet"]
            for name, value in self.properties.items():
                if isinstance(value, str):
                    text = value.replace("\\", "\\\\").replace("'", "\\'")
                else:
                    text = repr(value)
                # Plain replacement: str.format would treat "{{" as an
                # escaped brace and substitute nothing.
                snippet = snippet.replace("{{" + name + "}}", text)
            return snippet

    def update_app_canvas(self):
        """Return an HTML string with one ``<div>`` per component in app state."""
        return "".join(
            f"<div>Component ID: {component['id']}, Type: {component['type']}, "
            f"Properties: {component['properties']}</div>"
            for component in self.app_state["components"]
        )

    def add_component(self, component_type):
        """Append a new component of ``component_type`` to the app state.

        Returns:
            ``(canvas_html, system_message)`` on success, or
            ``(None, error_message)`` when the type is not in the registry.
        """
        if component_type not in self.components_registry:
            return None, f"Error: Invalid component type: {component_type}\n"
        new_component = self.Component(
            component_type, registry=self.components_registry
        )
        self.app_state["components"].append(new_component.to_dict())
        return (
            self.update_app_canvas(),
            f"System: Added component: {component_type}\n",
        )

    @staticmethod
    def _command_argument(command, prefix):
        """Return the text following ``prefix`` in ``command``, stripped."""
        # Slicing keeps arguments that themselves contain the prefix text,
        # which ``split(prefix)[1]`` would truncate.
        return command[len(prefix):].strip()

    @staticmethod
    def _ids_match(stored_id, requested_id):
        """Compare component ids, tolerating an int/str type mismatch."""
        # NOTE(review): parse_action presumably returns the id as text while
        # stored ids are ints -- confirm against utils.parse_action.
        return (
            stored_id == requested_id
            or str(stored_id) == str(requested_id).strip()
        )

    def run_terminal_command(self, command, history):
        """Execute one command (without the leading ``/``) and return its result.

        Args:
            command: Command text, e.g. ``"add Button"``.
            history: Terminal history text, used by ``compress history``.

        Returns:
            Whatever the matching handler returns: a ``(output, message)``
            tuple for ``add``, otherwise the handler's own value. Anything
            that is not a known command is run through the system shell and
            its stdout is returned (``"No output\\n"`` when empty). Errors
            are returned as their string form rather than raised.

        Raises:
            SystemExit: for the ``exit`` command.
        """
        output = ""
        try:
            if command.startswith("add "):
                return self.add_component(self._command_argument(command, "add "))
            elif command.startswith("search "):
                return google(self._command_argument(command, "search "))
            elif command.startswith("i search "):
                return i_s(self._command_argument(command, "i search "))
            elif command.startswith("safe search "):
                # Same prefix for the test and the extraction; the original
                # split on "safesearch " and always failed.
                return safe_search(self._command_argument(command, "safe search "))
            elif command.startswith("read "):
                return parse_file_content(self._command_argument(command, "read "))
            elif command == "task":
                return TASK_PROMPT
            elif command == "modify":
                return MODIFY_PROMPT
            elif command == "log":
                return LOG_PROMPT
            elif command.startswith("understand test results "):
                return self.understand_test_results(
                    self._command_argument(command, "understand test results ")
                )
            elif command.startswith("compress history"):
                return self.compress_history(history)
            elif command == "help":
                return self.get_help_message()
            elif command == "exit":
                # Same exception the ``exit()`` helper raises, without
                # depending on the ``site`` module having installed it.
                raise SystemExit
            else:
                # SECURITY: unrecognised input is handed verbatim to the
                # system shell. Only expose this to fully trusted users.
                output = subprocess.check_output(command, shell=True).decode(
                    "utf-8", errors="replace"
                )
        except Exception as e:
            output = str(e)
        return output or "No output\n"

    def compress_history(self, history):
        """Return ``history`` with lines starting with ``#`` removed.

        Every kept line is terminated with a newline. The result is only
        returned; ``self.terminal_history`` is not modified here.
        """
        return "".join(
            line + "\n"
            for line in history.strip().split("\n")
            if not line.strip().startswith("#")
        )

    def understand_test_results(self, test_results):
        """Return the test-results prompt; ``test_results`` is currently unused."""
        # Logic to understand test results
        return UNDERSTAND_TEST_RESULTS_PROMPT

    def get_help_message(self):
        """Return the multi-line list of supported commands."""
        return (
            "\n"
            "Available commands:\n"
            "- add [component_type]: Add a component to the app canvas\n"
            "- search [query]: Perform a Google search\n"
            "- i search [query]: Perform an intelligent search\n"
            "- safe search [query]: Perform a safe search\n"
            "- read [file_path]: Read and parse the content of a Python module\n"
            "- task: Prompt for a task to perform\n"
            "- modify: Prompt to modify a component property\n"
            "- log: Prompt to log a response\n"
            "- understand test results [test_results]: Understand test results\n"
            "- compress history: Compress the terminal history by removing comments\n"
            "- help: Show this help message\n"
            "- exit: Exit the program\n"
        )

    def process_input(self, input_text):
        """Handle one line of user input.

        Input starting with ``/`` is run as a terminal command and recorded
        in ``self.terminal_history``. Anything else is sent to a randomly
        chosen available NLP model and the parsed action is applied.

        Returns:
            Always a ``(output, system_message)`` pair; either element may
            be ``None``.
        """
        stripped = input_text.strip()
        if stripped.startswith("/"):
            command = stripped.lstrip("/")
            result = self.run_terminal_command(command, self.terminal_history)
            # Handlers return either a bare value or an (output, message)
            # pair; normalise so callers can always unpack two values.
            if isinstance(result, tuple) and len(result) == 2:
                output, system_message = result
                logged = "\n".join(str(part) for part in result if part)
            else:
                output, system_message = result, None
                logged = f"{result}"
            self.terminal_history += f"{input_text}\n{logged}\n"
            return output, system_message

        # Only consider slots that actually hold a client.
        available = [index for index, model in enumerate(self.nlp_models) if model]
        if not available:
            return None, "Error: NLP model not available.\n"
        model_index = random.choice(available)
        try:
            response = self.get_nlp_response(input_text, model_index)
            component_id, action, property_name, property_value = parse_action(response)
        except Exception as exc:
            # Report instead of raising, matching how command errors are
            # surfaced and keeping the interactive loop alive.
            return None, f"Error: Failed to process NLP response: {exc}\n"

        if not component_id:
            return None, "Error: Failed to parse action from NLP response\n"

        component = next(
            (
                comp
                for comp in self.app_state["components"]
                if self._ids_match(comp["id"], component_id)
            ),
            None,
        )
        if component is None:
            return None, f"Error: Component with ID {component_id} not found\n"

        if action == "update":
            component["properties"][property_name] = property_value
            return (
                self.update_app_canvas(),
                f"System: Updated property '{property_name}' of component with ID {component_id}\n",
            )
        if action == "remove":
            self.app_state["components"].remove(component)
            return (
                self.update_app_canvas(),
                f"System: Removed component with ID {component_id}\n",
            )
        return None, f"Error: Invalid action: {action}\n"

    def run(self):
        """Run the interactive prompt until Ctrl-C, end of input, or ``/exit``."""
        print("Welcome to the Python App Builder!")
        # Commands are only recognised with a leading slash.
        print("Type '/help' to see the available commands.")
        print("-" * 50)
        try:
            while True:
                try:
                    input_text = input("Enter input: ")
                except EOFError:
                    # stdin is closed; retrying would loop forever.
                    print("\nInput stream closed. Exiting.")
                    break
                if not input_text.strip():
                    continue
                output, system_message = self.process_input(input_text)
                if output:
                    print(output)
                if system_message:
                    print(system_message)
        except KeyboardInterrupt:
            print("\nApplication stopped by user.")
if __name__ == "__main__":
    # Script entry point: constructing App also runs its NLP client setup,
    # then run() starts the interactive prompt. The module-level name
    # ``app`` is kept as in the original.
    app = App()
    app.run()