ReplanningFlowModule / ReplanningFlow.py
Tachi67's picture
Update ReplanningFlow.py
f5b91c0
from typing import Dict, Any
import os
from aiflows.base_flows import CircularFlow
from flow_modules.Tachi67.PlanWriterFlowModule import PlanWriterFlow
class ReplanningFlow(PlanWriterFlow):
"""This flow inherits from PlanWriterFlow.
By changing prompts and injecting proper information to the controller and the PlanGenerator, we are able to achieve the replanning.
*Input Interface*:
- `goal` (str): information on the old plan (e.g. what is wrong)
- `plan` (str): the old plan
- `plan_file_location` (str): the location of the old plan file
*Output Interface*:
- `plan` (str): the new plan
- `status`: "finished" or "unfinished"
- `summary` (str): summary of the flow, will be written to the log file of the caller flow.
- `result` (str): result of the flow, will be passed to the controller of the caller flow.
"""
def _on_reach_max_round(self):
self._state_update_dict({
"plan": "The maximum amount of rounds was reached before the model generated the plan.",
"status": "unfinished",
"summary": "Replanning: The maximum amount of rounds was reached before the model generated the plan.",
})
@CircularFlow.output_msg_payload_processor
def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]:
command = output_payload["command"]
if command == "finish":
# ~~~ fetch temp file location, plan content, memory file (of upper level flow e.g. ExtLib) from flow state
keys_to_fetch_from_state = ["temp_plan_file_location", "new_plan", "plan_file_location"]
fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state)
temp_plan_file_location = fetched_state["temp_plan_file_location"]
new_plan_content = fetched_state["new_plan"]
plan_file_location = fetched_state["plan_file_location"]
# ~~~ delete the temp plan file ~~~
if os.path.exists(temp_plan_file_location):
os.remove(temp_plan_file_location)
# ~~~ write plan content to plan file ~~~
with open(plan_file_location, 'w') as file:
file.write(new_plan_content)
# ~~~ return the plan content ~~~
return {
"EARLY_EXIT": True,
"plan": new_plan_content,
"result": "New plan was generated and has overriden the previous plan",
"summary": "ReplanningFlow: " + output_payload["command_args"]["summary"] + f" new plan is written to {plan_file_location}",
"status": "finished"
}
elif command == "manual_finish":
# ~~~ delete the temp plan file ~~~
keys_to_fetch_from_state = ["temp_plan_file_location"]
fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state)
temp_plan_file_location = fetched_state["temp_plan_file_location"]
if os.path.exists(temp_plan_file_location):
os.remove(temp_plan_file_location)
# ~~~ return the manual quit status ~~~
return {
"EARLY_EXIT": True,
"new_plan": "no new plan was generated",
"result": "The replanner was explicitly terminated by the user, no new plan was generated.",
"summary": "ReplanningFlow: Replanner was terminated explicitly by the user, process is unfinished",
"status": "unfinished"
}
elif command == "write_plan":
keys_to_fetch_from_state = ["plan", "plan_file_location"]
fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state)
old_plan = fetched_state["plan"]
plan_file_location = fetched_state["plan_file_location"]
output_payload["command_args"]["old_plan"] = old_plan
output_payload["command_args"]["plan_file_location"] = plan_file_location
return output_payload
else:
return output_payload