import os import hydra from aiflows.backends.api_info import ApiInfo from aiflows.messages import InputMessage from aiflows.utils.general_helpers import read_yaml_file from aiflows import logging from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache CACHING_PARAMETERS.do_caching = False # Set to True in order to disable caching # clear_cache() # Uncomment this line to clear the cache logging.set_verbosity_debug() logging.auto_set_dir() dependencies = [ {"url": "Tachi67/ReplanningFlowModule", "revision": "main"}, {"url": "Tachi67/PlanGeneratorFlowModule", "revision": "main"}, {"url": "Tachi67/InteractivePlanGenFlowModule", "revision": "main"}, {"url": "Tachi67/PlanWriterFlowModule", "revision": "main"}, {"url": "Tachi67/PlanFileEditFlowModule", "revision": "main"}, {"url": "Tachi67/ParseFeedbackFlowModule", "revision": "main"}, {"url": "aiflows/HumanStandardInputFlowModule", "revision": "4ff043522c89a964ea3a928ce09811c51a2b5b98"} ] from aiflows import flow_verse flow_verse.sync_dependencies(dependencies) if __name__ == "__main__": # ~~~ make sure to set the openai api key in the envs ~~~ key = os.getenv("OPENAI_API_KEY") api_information = [ApiInfo(backend_used="openai", api_key=os.getenv("OPENAI_API_KEY"))] path_to_output_file = None # ~~~ setting api information into config ~~~ current_dir = os.getcwd() cfg_path = os.path.join(current_dir, "ReplanningFlow.yaml") cfg = read_yaml_file(cfg_path) cfg["subflows_config"]["Controller"]["backend"]["api_infos"] = api_information cfg["subflows_config"]["Executor"]["subflows_config"]["write_plan"]["subflows_config"]["PlanGenerator"]["backend"]["api_infos"] = api_information # ~~~ instantiating the flow and input data ~~~ ReplanningFlow = hydra.utils.instantiate(cfg, _recursive_=False, _convert_="partial") # ~~~ creating the plan file location (of the upper level flow e.g. ExtLib) plan_file_location = os.path.join(current_dir, "ExtLib_plan.txt") with open(plan_file_location, 'w') as file: pass input_data = { "goal": "data source OpenWeatherMap is deprecated", "plan": "1. Write a function that fetches today's weather at Lausanne from OpenWeatherMap. \n 2. Write a funtion that prints the fetched data in a formatted way.", "plan_file_location": plan_file_location } input_message = InputMessage.build( data_dict=input_data, src_flow="Launcher", dst_flow=ReplanningFlow.name ) # ~~~ calling the flow ~~~ output_message = ReplanningFlow(input_message)