File size: 4,323 Bytes
a35fa4d
9f68e0d
a35fa4d
 
 
c4df480
a35fa4d
 
9f68e0d
 
 
 
 
 
 
 
 
0905f7c
9f68e0d
c4df480
9f68e0d
 
 
 
 
 
a35fa4d
c4df480
 
 
 
9f68e0d
 
 
 
 
a35fa4d
c4df480
 
 
 
 
a35fa4d
 
 
9f68e0d
0905f7c
9f68e0d
c4df480
9f68e0d
 
 
 
 
 
a35fa4d
9f68e0d
 
 
 
a35fa4d
0905f7c
 
 
 
 
 
a35fa4d
 
c4df480
78dfff8
 
c4df480
 
78dfff8
c4df480
78dfff8
c4df480
 
 
 
 
 
 
 
 
 
4bd8f86
c4df480
 
 
 
 
 
 
 
 
 
 
 
 
a35fa4d
78dfff8
c4df480
a35fa4d
c4df480
 
 
 
 
 
 
 
 
 
 
a35fa4d
c4df480
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import re
from typing import List

import gradio as gr

from components import MAX_TASKS, all_tasks, Task


def _is_task_row_fully_invisible(row: List[int]) -> bool:
    for visible in row:
        if bool(visible):
            return False
    return True


def add_task(index, *visibility):
    visibility = list(visibility)
    n_avail_tasks = len(Task.available_tasks)

    for i in range(MAX_TASKS):
        start_row = i * n_avail_tasks
        is_row_invisible = _is_task_row_fully_invisible(
            visibility[start_row : start_row + n_avail_tasks]
        )
        if is_row_invisible:
            unchanged_up_to = start_row + index
            return (
                [gr.Number.update()] * i
                + [index]
                + [gr.Number.update()] * (MAX_TASKS - i - 1)
                + [gr.Box.update()] * unchanged_up_to
                + [gr.Box.update(visible=True)]
                + [gr.Box.update()] * (len(visibility) - unchanged_up_to - 1)
                + [gr.Number.update()] * unchanged_up_to
                + [1]
                + [gr.Number.update()] * (len(visibility) - unchanged_up_to - 1)
            )
    return (
        [gr.Number.update()] * MAX_TASKS
        + [gr.Box.update()] * len(visibility)
        + [gr.Number.update()] * len(visibility)
    )


def remove_task(*visibility):
    visibility = list(visibility)
    n_avail_tasks = len(Task.available_tasks)

    for i in range(MAX_TASKS):
        start_row = i * n_avail_tasks
        is_row_invisible = _is_task_row_fully_invisible(
            visibility[start_row : start_row + n_avail_tasks]
        )
        if is_row_invisible:
            unchanged_up_to = start_row - n_avail_tasks
            return (
                [gr.Box.update()] * unchanged_up_to
                + [gr.Box.update(visible=False)] * (len(visibility) - unchanged_up_to)
                + [gr.Number.update()] * unchanged_up_to
                + [0] * (len(visibility) - unchanged_up_to)
            )
    return (
        [gr.Box.update()] * (len(visibility) - n_avail_tasks)
        + [gr.Box.update(visible=False)] * n_avail_tasks
        + [gr.Number.update()] * (len(visibility) - n_avail_tasks)
        + [0] * (len(visibility) - n_avail_tasks)
    )


def execute_task(task_id: int, active_index: int, error_value, *args):
    """
    Params:
        - task_id: This will tell us which task to execute.
        - active_index: The index of the actual task that is visible.
        - prev_error_value: I carry around whether there is an error in the execution, to be displayed at the end.
        - args: Other variables that will be decomposed.
    """
    task_id = int(task_id)
    active_index = int(active_index)
    n_avail_tasks = len(Task.available_tasks)

    task_input = args[:n_avail_tasks][active_index]
    prev_active_indexes = args[n_avail_tasks : n_avail_tasks + task_id]
    prev_task_outputs = args[n_avail_tasks + task_id :]

    error_update = gr.HighlightedText.update(
        value=error_value, visible=error_value is not None
    )
    # We need to return outputs for all tasks in the row.
    outputs = [""] * n_avail_tasks

    if not task_input:
        return outputs + [error_update]

    vars_in_scope = {}
    for i, prev_active_index in enumerate(prev_active_indexes):
        vars_in_scope[f"{Task.vname}{i}"] = prev_task_outputs[
            i * n_avail_tasks + int(prev_active_index)
        ]
    # Get all variables referenced within the task input
    prompt_vars = re.findall("{(.*?)}", task_input)

    # If there is an undefined variable referenced, HighlightedText will signal the error.
    undefined_vars = prompt_vars - vars_in_scope.keys()
    if len(undefined_vars) > 0:
        return outputs + [
            gr.HighlightedText.update(
                value=[
                    (
                        f"The following variables are being used before being defined :: {undefined_vars}. Please check your tasks.",
                        "ERROR",
                    )
                ],
                visible=True,
            )
        ]

    formatted_input = task_input.format(**vars_in_scope)
    # Task logic gets inserted into the right index
    outputs[active_index] = all_tasks[task_id].execute(active_index, formatted_input)
    return outputs + [error_update]