File size: 4,211 Bytes
a35fa4d c4df480 a35fa4d c121218 a35fa4d c121218 a35fa4d c121218 a35fa4d 9f68e0d c121218 a35fa4d c121218 a35fa4d c121218 a35fa4d c4df480 78dfff8 c4df480 78dfff8 c4df480 78dfff8 c4df480 cffb5aa 71586bc fb97b78 71586bc fb97b78 71586bc fb97b78 71586bc c4df480 71586bc a35fa4d 78dfff8 c4df480 a35fa4d ef06a8c c4df480 71586bc c4df480 a35fa4d ef06a8c 71586bc ef06a8c 71586bc ef06a8c cffb5aa 31918dc ef06a8c 71586bc ef06a8c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
import re
import gradio as gr
from components import MAX_TASKS, all_tasks, Task
def add_task(*visibilities):
for i, visible in enumerate(visibilities, 1):
if not bool(visible):
return (
[gr.Box.update(visible=True)] * i
+ [gr.Box.update(visible=False)] * (MAX_TASKS - i)
+ [1] * i
+ [0] * (MAX_TASKS - i)
)
return [gr.Box.update()] * MAX_TASKS + [gr.Number.update()] * MAX_TASKS
def remove_task(*visibilities):
for i, visible in reversed(list(enumerate(visibilities))):
if bool(visible):
return (
[gr.Box.update(visible=True)] * i
+ [gr.Box.update(visible=False)] * (MAX_TASKS - i)
+ [1] * i
+ [0] * (MAX_TASKS - i)
)
return [gr.Box.update()] * MAX_TASKS + [gr.Number.update()] * MAX_TASKS
def execute_task(task_id: int, active_index: int, error_value, *args):
"""
Params:
- task_id: This will tell us which task to execute.
- active_index: The index of the actual task that is visible.
- prev_error_value: I carry around whether there is an error in the execution, to be displayed at the end.
- args: Other variables that will be decomposed.
"""
n_avail_tasks = len(Task.available_tasks)
# We need to return outputs for all tasks in the row.
outputs = [""] * n_avail_tasks
if (
active_index is None or error_value
): # Active index could be 0 == not active_index
return outputs + [
gr.HighlightedText.update(
value=error_value, visible=error_value is not None
)
]
task_id = int(task_id)
active_index = int(active_index)
inner_n_inputs = all_tasks[task_id].inner_n_inputs
start_inputs = 0
end_inputs = 0
end_all_inputs = sum(inner_n_inputs)
for i, n in enumerate(inner_n_inputs):
if i == active_index:
end_inputs = start_inputs + n
break
start_inputs += n
task_inputs = args[start_inputs:end_inputs]
prev_active_indexes = args[end_all_inputs : end_all_inputs + task_id]
prev_task_outputs = args[end_all_inputs + task_id :]
non_empty_inputs = [i for i in task_inputs if i]
if len(non_empty_inputs) < len(task_inputs):
return outputs + [
gr.HighlightedText.update(
value=[(f"Missing inputs for Task: {task_id}", "ERROR")],
visible=True,
)
]
vars_in_scope = {}
for i, prev_active_index in enumerate(prev_active_indexes):
vars_in_scope[f"{Task.vname}{i}"] = prev_task_outputs[
i * n_avail_tasks + int(prev_active_index)
]
# Get all variables referenced within the task input
prompt_vars = [v for ti in non_empty_inputs for v in re.findall("{(.*?)}", ti)]
# If there is an undefined variable referenced, HighlightedText will signal the error.
undefined_vars = prompt_vars - vars_in_scope.keys()
if len(undefined_vars) > 0:
outputs[active_index] = "ERROR"
return outputs + [
gr.HighlightedText.update(
value=[
(
f"The variables in Task :: {task_id} are being used before being defined :: {undefined_vars}. Please check your tasks.",
"ERROR",
)
],
visible=True,
)
]
try:
# Task logic gets inserted into the right index
outputs[active_index] = all_tasks[task_id].execute(
active_index, *non_empty_inputs, vars_in_scope=vars_in_scope
)
return outputs + [
gr.HighlightedText.update(
value=error_value, visible=error_value is not None
)
]
except Exception as e:
import traceback
print(traceback.format_tb(e.__traceback__))
outputs[active_index] = "ERROR"
return outputs + [
gr.HighlightedText.update(
value=[(f"Error in Task {task_id} :: {e}", "ERROR")],
visible=True,
)
]
|