toolkit / components.py
lgaleana's picture
Test
e919533
raw
history blame
11.6 kB
import json
import re
from concurrent.futures import ThreadPoolExecutor
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union
import gradio as gr
import ai
class Component(ABC):
def __init__(self, id_: int):
# Internal state
self._id = id_
self._source = self.__class__.__name__
self.vname: str
# Gradio state
self.component_id: gr.Number
self.gr_component: Union[gr.Box, gr.Textbox]
self.output: gr.Textbox
self.visible: gr.Number
def render(self) -> None:
self.component_id = gr.Number(value=self._id, visible=False)
self.visible = gr.Number(0, visible=False)
self.gr_component = self._render()
@abstractmethod
def _render(self) -> Union[gr.Box, gr.Textbox]:
...
class Input(Component):
vname = "v"
def _render(self) -> gr.Textbox:
self.output = gr.Textbox(
label=f"Input: {{{self.vname}{self._id}}}",
interactive=True,
placeholder="Variable value",
visible=False,
)
return self.output
class TaskComponent(Component, ABC):
vname = "t"
def __init__(self, id_: int, value: str = "", visible: bool = False):
super().__init__(id_)
self._initial_value = value
self._initial_visbility = visible
self.name: str
self.input: gr.Textbox
def format_input(self, input: str, vars_in_scope: Dict[str, Any]) -> str:
try:
return json.loads(input)
except:
input = input.strip()
prompt_vars = [v for v in re.findall("{(.*)}", input)]
undefined_vars = prompt_vars - vars_in_scope.keys()
if len(undefined_vars) > 0:
raise KeyError(
f"The variables :: {undefined_vars} are being used before being defined."
)
formatted_input = input.format(**vars_in_scope)
try:
return eval(formatted_input)
except:
return formatted_input
@property
def n_inputs(self) -> int:
return len(self.inputs)
@property
@abstractmethod
def inputs(self) -> List[gr.Textbox]:
...
@abstractmethod
def execute(self, *args, vars_in_scope: Dict[str, Any]):
...
class AITask(TaskComponent):
name = "AI Task"
def _render(self) -> gr.Box:
with gr.Box(visible=self._initial_visbility) as gr_component:
with gr.Row():
self.input = gr.Textbox(
label="Instructions",
lines=10,
interactive=True,
placeholder="What would you like ChatGPT to do?",
value=self._initial_value,
)
self.output = gr.Textbox(
label=f"Output: {{{self.vname}{self._id}}}",
lines=10,
interactive=True,
)
return gr_component
@property
def inputs(self) -> List[gr.Textbox]:
return [self.input]
def execute(self, prompt: str, vars_in_scope: Dict[str, Any]) -> Optional[str]:
formatted_prompt = self.format_input(prompt, vars_in_scope)
if formatted_prompt:
return ai.llm.next([{"role": "user", "content": formatted_prompt}])
class CodeTask(TaskComponent):
name = "Code Task"
def __init__(
self, id_: int, value: str = "", visible: bool = False, code_value: str = ""
):
super().__init__(id_, value, visible)
self._initial_code_value = code_value
def _render(self) -> gr.Column:
with gr.Column(visible=self._initial_visbility) as gr_component:
self.code_prompt = gr.Textbox(
label="What would you like to do?",
interactive=True,
value=self._initial_code_value,
)
generate_code = gr.Button("Generate code")
with gr.Row():
with gr.Column():
with gr.Accordion(label="Generated code", open=False) as accordion:
self.accordion = accordion
self.raw_output = gr.Textbox(
label="Raw output",
lines=5,
interactive=False,
)
self.packages = gr.Textbox(
label="The following packages will be installed",
interactive=True,
)
self.script = gr.Textbox(
label="Code to be executed",
lines=10,
interactive=True,
)
self.error_message = gr.HighlightedText(
value=None, visible=False
)
self.input = gr.Textbox(
label="Input", interactive=True, value=self._initial_value
)
with gr.Column():
self.output = gr.Textbox(
label=f"Output: {{{self.vname}{self._id}}}",
lines=10,
interactive=True,
)
generate_code.click(
self.generate_code,
inputs=[self.code_prompt],
outputs=[
self.raw_output,
self.packages,
self.script,
self.error_message,
self.accordion,
],
)
return gr_component
@staticmethod
def generate_code(code_prompt: str):
import traceback
raw_output = ""
packages = ""
script = ""
error_message = gr.HighlightedText.update(None, visible=False)
accordion = gr.Accordion.update()
if not code_prompt:
return (
raw_output,
packages,
script,
error_message,
accordion,
)
def llm_call(prompt):
return ai.llm.next([{"role": "user", "content": prompt}], temperature=0)
print(f"Generating code.")
try:
raw_output = llm_call(
f"""{code_prompt}
Write one python function for the request above.
Use pip packages where available.
For example, if you wanted to make a google search, use the googlesearch-python package instead of scraping google.
Include the necessary imports.
Instead of printing or saving to disk, the function should return the data."""
)
with ThreadPoolExecutor() as executor:
packages, script = tuple(
executor.map(
llm_call,
[
f"""The following text has some python code:
{raw_output}
Find the pip packages that need to be installed and get their corresponsing names in pip.
Package names in the imports and in pip might be different. Use the correct pip names.
Include only the packages that need to be installed with pip.
Put them in a valid JSON:
```
{{
"packages": Python list to be used with eval(). If no packages, empty list.
}}
```""",
f"""The following text has some python code:
{raw_output}
Extract it. Remove anything after the function definition.""",
],
)
)
for packages in re.findall("{.*}", packages, re.DOTALL):
try:
packages = json.loads(packages)
packages = packages["packages"]
except:
print(packages)
traceback.print_exc()
packages = "ERROR"
except Exception as e:
traceback.print_exc()
error_message = gr.HighlightedText.update(
value=[(str(e), "ERROR")], visible=True
)
accordion = gr.Accordion.update(open=True)
return (
raw_output,
packages,
script.replace("```python", "").replace("```", "").strip(),
error_message,
accordion,
)
@property
def inputs(self) -> List[gr.Textbox]:
return [self.packages, self.script, self.input]
def execute(
self, packages: str, function: str, input: str, vars_in_scope: Dict[str, Any]
):
if not function:
return None
function = function.strip()
import inspect
import subprocess
import sys
for p in eval(packages):
print(f"Installing")
subprocess.check_call([sys.executable, "-m", "pip", "install", p])
function = f"import os\nos.environ = {{}}\n\n{function}"
exec(function, locals())
# Looking for the last defined function
for var in reversed(locals().values()):
if callable(var):
self._toolkit_func = var
break
if len(inspect.getfullargspec(self._toolkit_func)[0]) > 0:
formatted_input = self.format_input(input, vars_in_scope)
if formatted_input:
return self._toolkit_func(formatted_input)
return None
return self._toolkit_func()
class Task(Component):
available_tasks = [AITask, CodeTask]
vname = "t"
def __init__(self, id_: int):
super().__init__(id_)
self._inner_tasks = [t(id_) for t in self.available_tasks]
self.gr_component: gr.Box
def _render(self) -> gr.Box:
with gr.Box(visible=False) as gr_component:
self.active_index = gr.Dropdown(
[AITask.name, CodeTask.name],
label="Pick a new Task",
type="index",
)
for t in self._inner_tasks:
t.render()
self.active_index.select(
self.pick_task,
inputs=[self.active_index],
outputs=[t.gr_component for t in self._inner_tasks],
)
return gr_component
@staticmethod
def pick_task(idx: int) -> List[Dict]:
update = [gr.Box.update(visible=False)] * len(Task.available_tasks)
update[idx] = gr.Box.update(visible=True)
return update
@property
def inputs(self) -> List[gr.Textbox]:
return [i for t in self._inner_tasks for i in t.inputs]
@property
def outputs(self) -> List[gr.Textbox]:
return [t.output for t in self._inner_tasks]
@property
def inner_n_inputs(self) -> List[int]:
return [t.n_inputs for t in self._inner_tasks]
def execute(self, active_index, *args, vars_in_scope: Dict[str, Any]):
inner_task = self._inner_tasks[active_index]
print(f"Executing {self._source}: {self._id}")
return inner_task.execute(*args, vars_in_scope)
MAX_TASKS = 10
all_tasks = {i: Task(i) for i in range(MAX_TASKS)}
class Tasks:
@classmethod
def visibilities(cls) -> List[gr.Number]:
return [t.visible for t in all_tasks.values()]
@classmethod
def active_indexes(cls) -> List[gr.Dropdown]:
return [t.active_index for t in all_tasks.values()]
@classmethod
def gr_components(cls) -> List[gr.Box]:
return [t.gr_component for t in all_tasks.values()]