|
from typing import Dict, Any |
|
import os |
|
from aiflows.base_flows import CircularFlow |
|
from flow_modules.Tachi67.PlanWriterFlowModule import PlanWriterFlow |
|
|
|
class ReplanningFlow(PlanWriterFlow): |
|
"""This flow inherits from PlanWriterFlow. |
|
By changing prompts and injecting proper information to the controller and the PlanGenerator, we are able to achieve the replanning. |
|
|
|
*Input Interface*: |
|
- `goal` (str): information on the old plan (e.g. what is wrong) |
|
- `plan` (str): the old plan |
|
- `plan_file_location` (str): the location of the old plan file |
|
|
|
*Output Interface*: |
|
- `plan` (str): the new plan |
|
- `status`: "finished" or "unfinished" |
|
- `summary` (str): summary of the flow, will be written to the log file of the caller flow. |
|
- `result` (str): result of the flow, will be passed to the controller of the caller flow. |
|
""" |
|
def _on_reach_max_round(self): |
|
self._state_update_dict({ |
|
"plan": "The maximum amount of rounds was reached before the model generated the plan.", |
|
"status": "unfinished", |
|
"summary": "Replanning: The maximum amount of rounds was reached before the model generated the plan.", |
|
}) |
|
|
|
|
|
@CircularFlow.output_msg_payload_processor |
|
def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]: |
|
command = output_payload["command"] |
|
if command == "finish": |
|
|
|
keys_to_fetch_from_state = ["temp_plan_file_location", "new_plan", "plan_file_location"] |
|
fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state) |
|
temp_plan_file_location = fetched_state["temp_plan_file_location"] |
|
new_plan_content = fetched_state["new_plan"] |
|
plan_file_location = fetched_state["plan_file_location"] |
|
|
|
|
|
if os.path.exists(temp_plan_file_location): |
|
os.remove(temp_plan_file_location) |
|
|
|
|
|
with open(plan_file_location, 'w') as file: |
|
file.write(new_plan_content) |
|
|
|
|
|
return { |
|
"EARLY_EXIT": True, |
|
"plan": new_plan_content, |
|
"result": "New plan was generated and has overriden the previous plan", |
|
"summary": "ReplanningFlow: " + output_payload["command_args"]["summary"] + f" new plan is written to {plan_file_location}", |
|
"status": "finished" |
|
} |
|
elif command == "manual_finish": |
|
|
|
keys_to_fetch_from_state = ["temp_plan_file_location"] |
|
fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state) |
|
temp_plan_file_location = fetched_state["temp_plan_file_location"] |
|
if os.path.exists(temp_plan_file_location): |
|
os.remove(temp_plan_file_location) |
|
|
|
return { |
|
"EARLY_EXIT": True, |
|
"new_plan": "no new plan was generated", |
|
"result": "The replanner was explicitly terminated by the user, no new plan was generated.", |
|
"summary": "ReplanningFlow: Replanner was terminated explicitly by the user, process is unfinished", |
|
"status": "unfinished" |
|
} |
|
elif command == "write_plan": |
|
keys_to_fetch_from_state = ["plan", "plan_file_location"] |
|
fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state) |
|
old_plan = fetched_state["plan"] |
|
plan_file_location = fetched_state["plan_file_location"] |
|
output_payload["command_args"]["old_plan"] = old_plan |
|
output_payload["command_args"]["plan_file_location"] = plan_file_location |
|
return output_payload |
|
else: |
|
return output_payload |
|
|