|
import json |
|
import re |
|
import traceback |
|
from concurrent.futures import ThreadPoolExecutor |
|
from abc import ABC, abstractmethod |
|
from typing import Any, Dict, List, Optional, Union |
|
|
|
import gradio as gr |
|
|
|
import ai |
|
|
|
|
|
class Component(ABC):
    """Abstract base for a UI element identified by an integer id.

    Subclasses implement :meth:`_render` to build their own Gradio widgets;
    :meth:`render` creates the shared hidden widgets first and then delegates
    to it.
    """

    def __init__(self, id_: int):
        """Store the component id; the Gradio widgets are created in render()."""
        self._source = type(self).__name__
        self._id = id_
        # Declared here, assigned as a class attribute by the subclasses.
        self.vname: str

        # Declared here, assigned by render() / _render().
        self.component_id: gr.Number
        self.gr_component: Union[gr.Box, gr.Textbox]
        self.output: gr.Textbox
        self.visible: gr.Number

    def render(self) -> None:
        """Create the hidden bookkeeping widgets, then the component itself."""
        # Hidden Number carrying this component's id.
        self.component_id = gr.Number(value=self._id, visible=False)
        # Hidden Number initialised to 0.
        # NOTE(review): presumably a visibility flag toggled by the app - confirm.
        self.visible = gr.Number(0, visible=False)
        self.gr_component = self._render()

    @abstractmethod
    def _render(self) -> Union[gr.Box, gr.Textbox]:
        """Build and return the Gradio widget(s) for this component."""
|
|
|
|
|
class Input(Component):
    """A single user-supplied variable, rendered as an initially hidden textbox."""

    vname = "v"

    def _render(self) -> gr.Textbox:
        """Create the textbox; it doubles as this component's output widget."""
        textbox = gr.Textbox(
            # Label shows the placeholder users can reference, e.g. "{v0}".
            label=f"Input: {{{self.vname}{self._id}}}",
            interactive=True,
            placeholder="Variable value",
            visible=False,
        )
        self.output = textbox
        return textbox
|
|
|
|
|
class TaskComponent(Component, ABC):
    """Abstract base for a task that takes textual inputs and can be executed."""

    vname = "t"

    def __init__(self, id_: int, value: str = "", visible: bool = False):
        """Remember the initial textbox value and visibility for _render()."""
        super().__init__(id_)
        self._initial_value = value
        # The attribute name is misspelled, but subclasses read it under this
        # exact name, so it is kept as is.
        self._initial_visbility = visible
        self.name: str
        self.input: gr.Textbox

    def format_input(self, input: str, vars_in_scope: Dict[str, Any]) -> str:
        """Substitute ``{name}`` placeholders in *input* with in-scope values.

        Text that parses as JSON is returned untouched, so that the braces of
        a JSON document are not mistaken for placeholders.

        Args:
            input: Raw text from the task's input widget.
            vars_in_scope: Mapping from variable name to its current value.

        Returns:
            The original text if it is valid JSON, otherwise the stripped text
            with every placeholder replaced.

        Raises:
            KeyError: If the text references a variable that is not in
                *vars_in_scope*.
        """
        try:
            json.loads(input)
        except (TypeError, ValueError):
            # Not JSON (json.JSONDecodeError is a ValueError): treat the text
            # as a template below.
            pass
        else:
            return input

        input = input.strip()
        # Match each brace pair on its own. A greedy "{(.*)}" would read
        # "{v0} and {v1}" as one variable named "v0} and {v1".
        prompt_vars = set(re.findall(r"\{([^{}]*)\}", input))
        undefined_vars = prompt_vars - vars_in_scope.keys()
        if undefined_vars:
            raise KeyError(
                f"The variables :: {undefined_vars} in task :: {self._id} are being used before being defined."
            )
        return input.format(**vars_in_scope)

    @property
    def n_inputs(self) -> int:
        """Number of input widgets this task exposes."""
        return len(self.inputs)

    @property
    @abstractmethod
    def inputs(self) -> List[gr.Textbox]:
        """The input widgets whose values are passed to execute()."""

    @abstractmethod
    def execute(self, *args, vars_in_scope: Dict[str, Any]):
        """Run the task with the values of its inputs and the variables in scope."""
|
|
|
|
|
class AITask(TaskComponent):
    """Task that sends the formatted instructions to the language model."""

    name = "AI Task"

    def _render(self) -> gr.Box:
        """Build the box holding the instructions textbox and the output textbox."""
        with gr.Box(visible=self._initial_visbility) as box:
            with gr.Row():
                self.input = gr.Textbox(
                    label="Instructions",
                    lines=10,
                    interactive=True,
                    placeholder="What would you like ChatGPT to do?",
                    value=self._initial_value,
                )
                self.output = gr.Textbox(
                    # Label shows the placeholder for this task's output, e.g. "{t0}".
                    label=f"Output: {{{self.vname}{self._id}}}",
                    lines=10,
                    interactive=True,
                )
        return box

    @property
    def inputs(self) -> List[gr.Textbox]:
        """The single instructions textbox."""
        return [self.input]

    def execute(self, prompt: str, vars_in_scope: Dict[str, Any]) -> Optional[str]:
        """Format *prompt* and send it to the model as a single user message.

        Returns None when the formatted prompt is empty.
        """
        formatted_prompt = self.format_input(prompt, vars_in_scope)
        if not formatted_prompt:
            return None
        message = {"role": "user", "content": formatted_prompt}
        return ai.llm.next([message])
|
|
|
|
|
class CodeTask(TaskComponent):
    """Task that asks the model to write a Python function and then runs it."""

    name = "Code Task"

    def __init__(
        self, id_: int, value: str = "", visible: bool = False, code_value: str = ""
    ):
        """Also remember the initial text of the "what should the code do" box."""
        super().__init__(id_, value, visible)
        self._initial_code_value = code_value

    def _render(self) -> gr.Box:
        """Build the code-generation widgets, the input box and the output box."""
        with gr.Box(visible=self._initial_visbility) as gr_component:
            with gr.Row():
                with gr.Column():
                    self.code_prompt = gr.Textbox(
                        label="What would you like the code to do?",
                        interactive=True,
                        value=self._initial_code_value,
                        lines=3,
                    )
                    generate_code = gr.Button("Generate code")
                    with gr.Accordion(label="Generated code", open=False) as accordion:
                        self.accordion = accordion
                        self.raw_output = gr.Textbox(
                            label="Raw output",
                            lines=5,
                            interactive=False,
                        )
                        self.packages = gr.Textbox(
                            label="The following packages will be installed",
                            interactive=True,
                        )
                        self.script = gr.Textbox(
                            label="Code to be executed",
                            lines=10,
                            interactive=True,
                        )
                        self.error_message = gr.HighlightedText(
                            value=None, visible=False
                        )

                    self.input = gr.Textbox(
                        label="Input", interactive=True, value=self._initial_value
                    )
                with gr.Column():
                    self.output = gr.Textbox(
                        label=f"Output: {{{self.vname}{self._id}}}",
                        lines=14,
                        interactive=True,
                    )

            # The order of `outputs` must match the tuple returned by generate_code.
            generate_code.click(
                self.generate_code,
                inputs=[self.code_prompt],
                outputs=[
                    self.raw_output,
                    self.packages,
                    self.script,
                    self.error_message,
                    self.accordion,
                ],
            )

        return gr_component

    @staticmethod
    def generate_code(code_prompt: str):
        """Ask the model for a function, then extract its packages and its code.

        Returns:
            A 5-tuple ``(raw_output, packages, script, error_message_update,
            accordion_update)`` matching the outputs wired up in _render().
            Any exception is printed and reported through the error-message
            update instead of being raised.
        """
        raw_output = ""
        packages = ""
        script = ""
        error_message = gr.HighlightedText.update(None, visible=False)
        accordion = gr.Accordion.update()

        if not code_prompt:
            return (raw_output, packages, script, error_message, accordion)

        def llm_call(prompt):
            return ai.llm.next([{"role": "user", "content": prompt}], temperature=0)

        print("Generating code.")
        try:
            raw_output = llm_call(
                f"""Write a python function to:
{code_prompt}

Write the code for the function. Name the function toolkit.
Use pip packages where available.
Include the necessary imports.
Instead of printing or saving to disk, the function should return the data."""
            )
            # The two follow-up prompts are independent, so run them concurrently.
            with ThreadPoolExecutor() as executor:
                packages, script = tuple(
                    executor.map(
                        llm_call,
                        [
                            f"""The following text has some python code:
{raw_output}

Find the pip packages that need to be installed and get their corresponsing names in pip.
Package names in the imports and in pip might be different. Use the correct pip names.
Include only the packages that need to be installed with pip.

Put them in a valid JSON:
```
{{
"packages": List of packages. If no packages, empty list.
}}
```""",
                            f"""The following text has some python code:
{raw_output}

Extract it. Remove anything after the function definition.""",
                        ],
                    )
                )
            # Greedy + DOTALL spans from the first "{" to the last "}", so there
            # is at most one candidate JSON object. When nothing matches, the
            # raw model answer is kept as is.
            json_match = re.search(r"{.*}", packages, re.DOTALL)
            if json_match is not None:
                json_text = json_match.group(0)
                try:
                    packages = json.loads(json_text)["packages"]
                except (ValueError, KeyError, TypeError):
                    print(json_text)
                    traceback.print_exc()
                    packages = "ERROR"
        except Exception as e:
            traceback.print_exc()
            error_message = gr.HighlightedText.update(
                value=[(str(e), "ERROR")], visible=True
            )
            # Open the accordion so the error message is shown.
            accordion = gr.Accordion.update(open=True)
        return (
            raw_output,
            packages,
            # Drop the markdown code fences the model tends to wrap code in.
            script.replace("```python", "").replace("```", "").strip(),
            error_message,
            accordion,
        )

    @property
    def inputs(self) -> List[gr.Textbox]:
        """Widgets passed to execute(), in order: packages, script, input."""
        return [self.packages, self.script, self.input]

    @staticmethod
    def _parse_packages(packages) -> List[str]:
        """Turn the content of the packages textbox into a list of pip names."""
        import ast

        if isinstance(packages, (list, tuple)):
            return list(packages)
        if not packages or not packages.strip():
            # An empty box means nothing to install.
            return []
        # literal_eval only accepts Python literals, so text typed into the box
        # cannot run arbitrary code the way eval() would.
        parsed = ast.literal_eval(packages)
        if isinstance(parsed, str):
            return [parsed]
        return list(parsed)

    def execute(
        self, packages: str, script: str, input: str, vars_in_scope: Dict[str, Any]
    ):
        """Install *packages*, run *script* and call the function it defines.

        The function named ``toolkit`` is preferred. Otherwise every callable
        the script defined is tried, most recently defined first.

        Args:
            packages: Text form of a list of pip package names.
            script: Source code expected to define the function to run.
            input: Raw input text; formatted with *vars_in_scope* before use.
            vars_in_scope: Mapping from variable name to its current value.

        Returns:
            Whatever the function returns, or None when *script* is empty.

        Raises:
            ValueError: If ``toolkit`` takes arguments but no input was given.
            RuntimeError: If no callable from the script could be run.
        """
        if not script:
            return None
        script = script.strip()

        formatted_input = self.format_input(input, vars_in_scope)

        import inspect
        import subprocess
        import sys

        def run(toolkit_func):
            if len(inspect.getfullargspec(toolkit_func).args) > 0:
                if formatted_input:
                    try:
                        # NOTE(security): eval() on user-provided text; it turns
                        # e.g. "[1, 2]" into a list before calling the function.
                        return toolkit_func(eval(formatted_input))
                    except Exception:
                        # Best effort: retry with the plain string.
                        return toolkit_func(formatted_input)
                raise ValueError(f"Code for task :: {self._id} needs an input.")
            return toolkit_func()

        for p in self._parse_packages(packages):
            subprocess.check_call([sys.executable, "-m", "pip", "install", p])

        # NOTE(review): the prefix rebinds os.environ on the shared os module, so
        # the environment stays empty for the rest of this process - confirm
        # this is intended.
        script = f"import os\nos.environ = {{}}\n\n{script}"
        # NOTE(security): exec() runs the generated code in this process.
        # An explicit namespace is used because reading exec'd names back
        # through locals() is not guaranteed and fails on Python 3.13+.
        namespace: Dict[str, Any] = {}
        exec(script, namespace)

        if "toolkit" in namespace:
            return run(namespace["toolkit"])
        for var in reversed(list(namespace.values())):
            if callable(var):
                try:
                    return run(var)
                except Exception:
                    continue
        raise RuntimeError(f"Unable to run the code for task :: {self._id}")
|
|
|
|
|
class Task(Component):
    """A task slot: a dropdown plus one inner task per entry of available_tasks."""

    available_tasks = [AITask, CodeTask]
    vname = "t"

    def __init__(self, id_: int):
        """Create one inner task of every available kind, all sharing *id_*."""
        super().__init__(id_)
        self._inner_tasks = [t(id_) for t in self.available_tasks]
        self.gr_component: gr.Box

    def _render(self) -> gr.Box:
        """Build the hidden box with the task picker and every inner task."""
        with gr.Box(visible=False) as gr_component:
            self.active_index = gr.Dropdown(
                # Derived from available_tasks so that the dropdown index always
                # lines up with the position in _inner_tasks.
                [t.name for t in self.available_tasks],
                label="Pick a new Task",
                type="index",
            )
            for t in self._inner_tasks:
                t.render()

            self.active_index.select(
                self.pick_task,
                inputs=[self.active_index],
                outputs=[t.gr_component for t in self._inner_tasks],
            )
        return gr_component

    @staticmethod
    def pick_task(idx: int) -> List[Dict]:
        """Return one update per inner task, showing only the one at *idx*.

        An index that matches no task (including None) hides all of them.
        """
        return [
            gr.Box.update(visible=(position == idx))
            for position in range(len(Task.available_tasks))
        ]

    @property
    def inputs(self) -> List[gr.Textbox]:
        """Input widgets of all inner tasks, flattened in inner-task order."""
        return [i for t in self._inner_tasks for i in t.inputs]

    @property
    def outputs(self) -> List[gr.Textbox]:
        """Output widget of each inner task."""
        return [t.output for t in self._inner_tasks]

    @property
    def inner_n_inputs(self) -> List[int]:
        """Number of input widgets of each inner task."""
        return [t.n_inputs for t in self._inner_tasks]

    def execute(self, active_index, *args, vars_in_scope: Dict[str, Any]):
        """Run the inner task selected by *active_index* with *args*."""
        inner_task = self._inner_tasks[active_index]
        print(f"Executing {self._source}: {self._id}")
        # Passed by keyword because TaskComponent.execute declares
        # vars_in_scope as keyword-only.
        return inner_task.execute(*args, vars_in_scope=vars_in_scope)
|
|
|
|
|
# Number of Task instances created up front.
MAX_TASKS = 10

# Every Task, keyed by its id (0 .. MAX_TASKS - 1).
all_tasks: Dict[int, Task] = {
    task_id: Task(task_id) for task_id in range(MAX_TASKS)
}
|
|
|
|
|
class Tasks:
    """Accessors that gather one widget from every Task in ``all_tasks``."""

    @staticmethod
    def _collect(attribute: str) -> list:
        """Return the given attribute of each Task, in ``all_tasks`` order."""
        return [getattr(task, attribute) for task in all_tasks.values()]

    @classmethod
    def visibilities(cls) -> List[gr.Number]:
        """The ``visible`` widget of every Task."""
        return cls._collect("visible")

    @classmethod
    def active_indexes(cls) -> List[gr.Dropdown]:
        """The ``active_index`` dropdown of every Task."""
        return cls._collect("active_index")

    @classmethod
    def gr_components(cls) -> List[gr.Box]:
        """The ``gr_component`` box of every Task."""
        return cls._collect("gr_component")
|
|