File size: 4,128 Bytes
d6eb336
 
e2f2906
d6eb336
 
 
f5b91c0
 
 
 
 
 
 
 
 
 
 
 
 
 
d6eb336
 
 
 
ca95c8a
d6eb336
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ca95c8a
d6eb336
 
 
 
 
 
 
 
 
 
 
 
 
 
ca95c8a
d6eb336
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from typing import Dict, Any
import os
from aiflows.base_flows import CircularFlow
from flow_modules.Tachi67.PlanWriterFlowModule import PlanWriterFlow

class ReplanningFlow(PlanWriterFlow):
    """Rewrite an existing plan by reusing the PlanWriterFlow loop.

    This flow inherits from PlanWriterFlow. By changing prompts and injecting the
    proper information into the controller and the PlanGenerator, the same
    machinery is used to replan.

    *Input Interface*:
    - `goal` (str): information on the old plan (e.g. what is wrong)
    - `plan` (str): the old plan
    - `plan_file_location` (str): the location of the old plan file

    *Output Interface*:
    - `plan` (str): the new plan
    - `status`: "finished" or "unfinished"
    - `summary` (str): summary of the flow, will be written to the log file of the caller flow.
    - `result` (str): result of the flow, will be passed to the controller of the caller flow.
    """

    def _on_reach_max_round(self) -> None:
        """Store an "unfinished" outcome in the flow state once the round limit is hit."""
        # NOTE(review): no `result` entry is set here although the output interface
        # documents one -- confirm whether the caller expects it on this path.
        self._state_update_dict({
            "plan": "The maximum amount of rounds was reached before the model generated the plan.",
            "status": "unfinished",
            "summary": "Replanning: The maximum amount of rounds was reached before the model generated the plan.",
        })

    @staticmethod
    def _discard_temp_plan_file(temp_plan_file_location) -> None:
        """Delete the temporary plan file if a location is known and the file exists.

        A missing or empty location is skipped instead of being handed to
        `os.path.exists`, so a flow that never produced a temp file can still
        terminate cleanly.
        """
        if temp_plan_file_location and os.path.exists(temp_plan_file_location):
            os.remove(temp_plan_file_location)

    @CircularFlow.output_msg_payload_processor
    def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]:
        """Post-process a payload according to its `command` entry.

        - `finish`: remove the temp plan file, overwrite the plan file with the
          `new_plan` held in the flow state and return a "finished" result.
        - `manual_finish`: remove the temp plan file and return an "unfinished" result.
        - `write_plan`: add the old plan and the plan file location from the flow
          state to `command_args` and pass the payload on.
        - anything else: pass the payload on untouched.

        :param output_payload: payload carrying `command` and, for `finish` and
            `write_plan`, a `command_args` dict.
        :param src_flow: the flow that produced the payload (unused here).
        :return: either a terminal dict flagged with `EARLY_EXIT` or the
            (possibly enriched) `output_payload`.
        """
        command = output_payload["command"]

        if command == "finish":
            # ~~~ fetch temp file location, new plan content and plan file location from flow state ~~~
            fetched_state = self._fetch_state_attributes_by_keys(
                keys=["temp_plan_file_location", "new_plan", "plan_file_location"]
            )
            temp_plan_file_location = fetched_state["temp_plan_file_location"]
            new_plan_content = fetched_state["new_plan"]
            plan_file_location = fetched_state["plan_file_location"]

            # Build the summary before touching the filesystem so that a payload
            # without `command_args.summary` fails before any file is changed.
            summary = (
                "ReplanningFlow: "
                + output_payload["command_args"]["summary"]
                + f" new plan is written to {plan_file_location}"
            )

            # ~~~ delete the temp plan file ~~~
            self._discard_temp_plan_file(temp_plan_file_location)

            # ~~~ write plan content to plan file ~~~
            with open(plan_file_location, 'w') as file:
                file.write(new_plan_content)

            # ~~~ return the plan content ~~~
            return {
                "EARLY_EXIT": True,
                "plan": new_plan_content,
                "result": "New plan was generated and has overriden the previous plan",
                "summary": summary,
                "status": "finished"
            }

        if command == "manual_finish":
            # ~~~ delete the temp plan file ~~~
            fetched_state = self._fetch_state_attributes_by_keys(keys=["temp_plan_file_location"])
            self._discard_temp_plan_file(fetched_state["temp_plan_file_location"])

            # ~~~ return the manual quit status ~~~
            # NOTE(review): this branch reports `new_plan`, whereas the documented
            # output interface and the `finish` branch use `plan` -- confirm which
            # key the caller relies on before renaming.
            return {
                "EARLY_EXIT": True,
                "new_plan": "no new plan was generated",
                "result": "The replanner was explicitly terminated by the user, no new plan was generated.",
                "summary": "ReplanningFlow: Replanner was terminated explicitly by the user, process is unfinished",
                "status": "unfinished"
            }

        if command == "write_plan":
            # ~~~ inject the old plan and its file location into the command arguments ~~~
            fetched_state = self._fetch_state_attributes_by_keys(keys=["plan", "plan_file_location"])
            command_args = output_payload["command_args"]
            command_args["old_plan"] = fetched_state["plan"]
            command_args["plan_file_location"] = fetched_state["plan_file_location"]

        return output_payload