import os
from typing import Any, Dict

from aiflows.base_flows import CircularFlow
from flow_modules.Tachi67.PlanWriterFlowModule import PlanWriterFlow


def _remove_file_if_exists(path: str) -> None:
    """Delete the file at ``path`` when it exists; do nothing otherwise."""
    if os.path.exists(path):
        os.remove(path)


class ReplanningFlow(PlanWriterFlow):
    """This flow inherits from PlanWriterFlow.

    By changing prompts and injecting proper information to the controller and the
    PlanGenerator, we are able to achieve the replanning.

    *Input Interface*:
    - `goal` (str): information on the old plan (e.g. what is wrong)
    - `plan` (str): the old plan
    - `plan_file_location` (str): the location of the old plan file

    *Output Interface*:
    - `plan` (str): the new plan
    - `status`: "finished" or "unfinished"
    - `summary` (str): summary of the flow, will be written to the log file of the caller flow.
    - `result` (str): result of the flow, will be passed to the controller of the caller flow.
    """

    def _on_reach_max_round(self):
        """Record an "unfinished" outcome in the flow state when the round limit is hit."""
        self._state_update_dict({
            "plan": "The maximum amount of rounds was reached before the model generated the plan.",
            "status": "unfinished",
            "summary": "Replanning: The maximum amount of rounds was reached before the model generated the plan.",
        })

    @CircularFlow.output_msg_payload_processor
    def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]:
        """Post-process an output payload and decide whether to finish or continue.

        The behavior depends on ``output_payload["command"]``:

        - ``"finish"``: remove the temporary plan file, write the new plan stored in the
          flow state to ``plan_file_location`` and return a "finished" result.
        - ``"manual_finish"``: remove the temporary plan file and return an "unfinished"
          result stating that the user terminated the replanner.
        - ``"write_plan"``: add the old plan and the plan file location from the flow
          state to ``output_payload["command_args"]`` and return the payload.
        - anything else: return the payload unchanged.

        :param output_payload: payload carrying ``command`` and, for the ``finish`` and
            ``write_plan`` commands, ``command_args``.
        :param src_flow: the flow that produced the payload (not used here).
        :return: either a terminal result dictionary (with ``EARLY_EXIT`` set to ``True``)
            or the, possibly augmented, ``output_payload``.
        """
        command = output_payload["command"]

        if command == "finish":
            # ~~~ fetch temp file location, new plan content and plan file location from flow state ~~~
            keys_to_fetch_from_state = ["temp_plan_file_location", "new_plan", "plan_file_location"]
            fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state)
            temp_plan_file_location = fetched_state["temp_plan_file_location"]
            new_plan_content = fetched_state["new_plan"]
            plan_file_location = fetched_state["plan_file_location"]

            # ~~~ delete the temp plan file ~~~
            _remove_file_if_exists(temp_plan_file_location)

            # ~~~ write plan content to plan file (overwrites the old plan) ~~~
            with open(plan_file_location, 'w') as file:
                file.write(new_plan_content)

            # ~~~ return the plan content ~~~
            return {
                "EARLY_EXIT": True,
                "plan": new_plan_content,
                "result": "New plan was generated and has overriden the previous plan",
                "summary": "ReplanningFlow: " + output_payload["command_args"]["summary"]
                           + f" new plan is written to {plan_file_location}",
                "status": "finished",
            }

        if command == "manual_finish":
            # ~~~ delete the temp plan file ~~~
            keys_to_fetch_from_state = ["temp_plan_file_location"]
            fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state)
            _remove_file_if_exists(fetched_state["temp_plan_file_location"])

            # ~~~ return the manual quit status ~~~
            no_plan_message = "no new plan was generated"
            return {
                "EARLY_EXIT": True,
                # The documented output interface (and every other exit path) reports the
                # plan under "plan"; "new_plan" is kept so existing readers keep working.
                "plan": no_plan_message,
                "new_plan": no_plan_message,
                "result": "The replanner was explicitly terminated by the user, no new plan was generated.",
                "summary": "ReplanningFlow: Replanner was terminated explicitly by the user, process is unfinished",
                "status": "unfinished",
            }

        if command == "write_plan":
            # ~~~ inject the old plan and its file location into the command arguments ~~~
            keys_to_fetch_from_state = ["plan", "plan_file_location"]
            fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state)
            output_payload["command_args"]["old_plan"] = fetched_state["plan"]
            output_payload["command_args"]["plan_file_location"] = fetched_state["plan_file_location"]
            return output_payload

        # Any other command passes through untouched.
        return output_payload