Spaces:
Running
on
Zero
Running
on
Zero
from typing import List | |
import json | |
TOOL_SYSTEM_PROMPT_RUBRA = ( | |
"You have access to the following tools: {tool_text}\n" | |
"You can choose to respond with one or more tool calls at once, or with a chat message back to the user. " | |
"Ensure you have all necessary details before making tool calls. If additional information is needed, " | |
"ask the user appropriately. Any tool call you make must correspond to the functions listed above.\n" | |
"If you decide to call a tool, format it like this: " | |
'starttoolcall{{"name": "<function_name>", "arguments": {{"<arg1_name>": "<arg1_value>", "<arg2_name>": "<arg2_value>", ...}}}}endtoolcall ' | |
"where the JSON wrapped between starttoolcall and endtoolcall represents the function call.\n" | |
) | |
def json_schema_to_typescript_type(schema, param_name): | |
ts_type = "any" # default type | |
enum_comment = "" | |
integer_comment = "" | |
description_comment = "" | |
if isinstance(schema, dict) and "type" in schema: | |
json_type = schema["type"] | |
if json_type == "array": | |
item_type = ( | |
"any" | |
if "items" not in schema | |
else json_schema_to_typescript_type(schema["items"], param_name)[0] | |
) | |
ts_type = f"{item_type}[]" | |
elif json_type == "number": | |
ts_type = "number" | |
elif json_type == "integer": | |
ts_type = ( | |
"number" # TypeScript doesn't differentiate between number and integer | |
) | |
integer_comment = f" * @param {param_name} - Integer" | |
elif json_type == "object": | |
ts_type, _ = generate_typescript_interface(schema, param_name) | |
elif json_type == "boolean": | |
ts_type = "boolean" | |
elif json_type == "null": | |
ts_type = "null" | |
elif json_type == "string": | |
ts_type = "string" | |
if "enum" in schema: | |
enum_comment = f" * @enum {param_name} - Possible values: " + ", ".join( | |
[f'"{enum_value}"' for enum_value in schema["enum"]] | |
) | |
ts_type = "string" | |
if "description" in schema: | |
description_comment = f' * @param {param_name} - {schema["description"]}' | |
# Return only the type for nested objects to avoid duplicating comments | |
if isinstance(schema, dict) and schema.get("type") == "object": | |
return ts_type, "", "", "" | |
return ts_type, enum_comment, integer_comment, description_comment | |
def generate_typescript_interface(schema, interface_name): | |
properties = schema.get("properties", {}) | |
required = schema.get("required", []) | |
interface_body = [] | |
descriptions = [] | |
for prop_name, prop_schema in properties.items(): | |
prop_type, enum_comment, integer_comment, description_comment = ( | |
json_schema_to_typescript_type(prop_schema, prop_name) | |
) | |
is_optional = prop_name not in required | |
interface_body.append( | |
f' {prop_name}{"?" if is_optional else ""}: {prop_type};' | |
) | |
if description_comment: | |
descriptions.append(description_comment) | |
if enum_comment: | |
descriptions.append(enum_comment) | |
if integer_comment: | |
descriptions.append(integer_comment) | |
comments = "\n".join(descriptions) | |
interface_definition = ( | |
f"interface {interface_name} {{\n" + "\n".join(interface_body) + "\n}" | |
) | |
return interface_definition, comments | |
def convert_parameters_list_to_dict(parameters): | |
properties = {} | |
required = [] | |
for param in parameters: | |
properties[param["name"]] = param | |
if "default" not in param: | |
required.append(param["name"]) | |
return {"properties": properties, "required": required} | |
def generate_typescript_function(function_schema) -> str: | |
func_name = function_schema["name"] | |
description = function_schema.get("description", "") | |
# Check if parameters is a list and convert if necessary | |
parameters_info = function_schema.get("parameters", {}) | |
if isinstance(parameters_info, list): | |
parameters_info = convert_parameters_list_to_dict(parameters_info) | |
if parameters_info is None: | |
parameters_info = {} | |
parameters_schema = parameters_info.get("properties", {}) | |
required_params = parameters_info.get("required", []) | |
args_list = [] | |
comments_list = [] | |
interfaces = [] | |
for param_name, param_schema in parameters_schema.items(): | |
ts_type, enum_comment, integer_comment, description_comment = ( | |
json_schema_to_typescript_type(param_schema, param_name) | |
) | |
if ts_type.startswith("interface"): | |
interface_definition, nested_comments = generate_typescript_interface( | |
param_schema, f"{func_name}_{param_name.capitalize()}Params" | |
) | |
interfaces.append(interface_definition) | |
comments_list.append(nested_comments) | |
ts_type = f"{func_name}_{param_name.capitalize()}Params" | |
else: | |
if description_comment: | |
comments_list.append(description_comment) | |
if enum_comment: | |
comments_list.append(enum_comment) | |
if integer_comment: | |
comments_list.append(integer_comment) | |
is_optional = param_name not in required_params | |
args_list.append(f'{param_name}{"?" if is_optional else ""}: {ts_type}') | |
args_str = ", ".join(args_list) | |
comments_str = "\n".join(comments_list) | |
interfaces_str = "\n\n".join(interfaces) | |
description_comment = f" * {description}\n" if description else "" | |
typescript_func_declaration = ( | |
"/**\n" | |
+ description_comment | |
+ (comments_str + "\n" if comments_str else "") | |
+ " */\n" | |
+ (interfaces_str + "\n\n" if interfaces_str else "") | |
+ f"function {func_name}({args_str}): any {{}}" | |
) | |
return typescript_func_declaration | |
def format_tools(tools: List[dict]) -> str: | |
func_defs = [] | |
for t in tools: | |
tool_schema = t["function"] if "function" in t else t | |
func_defs.append(generate_typescript_function(tool_schema)) | |
typescript_functions_str = "\n\n".join(func_defs) | |
res = TOOL_SYSTEM_PROMPT_RUBRA.format(tool_text=typescript_functions_str) | |
return res | |
def preprocess_input(msgs: List[dict], tools: List[dict]): | |
tool_system_prompt = format_tools(tools) | |
processed_msgs = process_messages(msgs, tool_system_prompt) | |
return processed_msgs | |
def process_messages(messages: List[dict], function_str: str): | |
func_observation_map = {} | |
processed_msg = [] | |
for i in range(len(messages)): | |
if messages[i]["role"] != "tool" and len(func_observation_map) > 0: | |
# func_observation_array = [f'{k}: {func_observation_map[k] if func_observation_map[k] != "" else "done"}' for k in func_observation_map] | |
func_observation_array = [f'{func_observation_map[k] if func_observation_map[k] != "" else "done"}' for k in func_observation_map] | |
observation_str = json.dumps(func_observation_array) | |
observation_call = {"role": "user", "content": "start observation " + observation_str + " end observation"} | |
processed_msg.append(observation_call) | |
func_observation_map.clear() | |
if i == 0: | |
if messages[0]["role"] == "system": | |
old_content = messages[0]["content"] | |
sys_msg = {"role": "system", "content": old_content + "\n" + function_str} | |
processed_msg.append(sys_msg) | |
else: | |
# Insert a system message of tool definition before the first message | |
sys_msg = {"role": "system", "content": "You are a helpful assistant.\n" + function_str} | |
processed_msg.append(sys_msg) | |
processed_msg.append(messages[0]) # first message is always either system or user msg | |
elif messages[i]["role"] == "assistant" and "tool_calls" in messages[i]: | |
# Convert OpenAI function call format to Rubra format | |
tool_call_str = construct_tool_call_str(messages[i]["tool_calls"], func_observation_map) | |
function_call = {"role": "assistant", "content": tool_call_str} | |
processed_msg.append(function_call) | |
elif messages[i]["role"] == "tool": | |
tool_call_id = messages[i]["tool_call_id"] | |
if tool_call_id in func_observation_map: | |
func_observation_map[tool_call_id] = messages[i]["content"] | |
else: | |
print(func_observation_map) | |
print(f"Tool call id not found in the map: {tool_call_id}") | |
# TODO: the input is not valid in this case, should return an error | |
else: | |
processed_msg.append(messages[i]) | |
if len(func_observation_map) > 0: | |
# func_observation_array = [f'{k}: {func_observation_map[k] if func_observation_map[k] != "" else "done"}' for k in func_observation_map] | |
func_observation_array = [f'{func_observation_map[k] if func_observation_map[k] != "" else "done"}' for k in func_observation_map] | |
observation_str = json.dumps(func_observation_array) | |
observation_call = {"role": "user", "content": "start observation " + observation_str + " end observation"} | |
processed_msg.append(observation_call) | |
func_observation_map.clear() | |
return processed_msg | |
def construct_tool_call_str(tool_calls, func_observation_map) -> str: | |
tool_list = [] | |
for tool_call in tool_calls: | |
tool_call_id = tool_call["id"] | |
func_observation_map[tool_call_id] = "" # Initialize with empty value, updated later from the message with tool role | |
if type(tool_call["function"]["arguments"]) == str: | |
tool_call["function"]["arguments"] = json.loads(tool_call["function"]["arguments"]) | |
tool_list.append("starttoolcall"+str(tool_call["function"]) + "endtoolcall") | |
# Converting the Python dictionary to a YAML formatted string | |
tool_call_str = "".join(tool_list) | |
return tool_call_str | |
if __name__ == "__main__": | |
tools = [{ | |
"type": "function", | |
"function": { | |
"name": "dummy", | |
"description": "just to say hi", | |
"parameters": None, | |
} | |
},{"type": "function","function":{"name":"calculate_distance","description":"Calculate the distance between two locations","parameters":{"type":"object","properties":{"origin":{"type":"string","description":"The starting location"},"destination":{"type":"string","description":"The destination location"},"mode":{"type":"string","description":"The mode of transportation"}},"required":["origin","destination","mode"]}}},{"type": "function","function":{"name":"generate_password","description":"Generate a random password","parameters":{"type":"object","properties":{"length":{"type":"integer","description":"The length of the password"}},"required":["length"]}}}] | |
# msgs = [{'role': 'system', 'content': 'You are a helpful assistant.'}, {'role': 'user', 'content': 'What is the distance between San Francisco and Cupertino by driving and by air from both directions?'}, {'role': 'assistant', 'tool_calls': [{'id': '0', 'function': {'name': 'calculate_distance', 'arguments': '{"origin":"San Francisco","destination":"Cupertino","mode":"drive"}'}, 'type': 'function'},{'id': '1', 'function': {'name': 'calculate_distance', 'arguments': '{"origin":"San Francisco","destination":"Cupertino","mode":"air"}'}, 'type': 'function'}]}, {'role': 'tool', 'tool_call_id': '0', 'name': 'calculate_distance', 'content': 'Distance is 50 miles.'}, {'role': 'tool', 'tool_call_id': '1', 'name': 'calculate_distance', 'content': ''}] | |
msgs = [{'role': 'user', 'content': "\nYou are task oriented system.\nYou receive input from a user, process the input from the given instructions, and then output the result.\nYour objective is to provide consistent and correct results.\nYou do not need to explain the steps taken, only provide the result to the given instructions.\nYou are referred to as a tool.\nYou don't move to the next step until you have a result.\n\nDownload https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip to a\nrandom file. Then expand the archive to a temporary location as there is a sqlite\ndatabase in it.\n\nFirst inspect the schema of the database to understand the table structure.\n\nForm and run a SQL query to find the artist with the most number of albums and output\nthe result of that.\n\nWhen done remove the database file and the downloaded content."}, {'role': 'assistant', 'content': '', 'tool_calls': [{'id': 'cdf7eb2e', 'type': 'function', 'function': {'name': 'download', 'arguments': '{"url":"https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip"}'}}]}, {'role': 'tool', 'content': '/tmp/gpt-download2815959664.zip', 'name': 'download', 'tool_call_id': 'cdf7eb2e'}, {'role': 'assistant', 'content': '', 'tool_calls': [{'id': '07cdee86', 'type': 'function', 'function': {'name': 'exec', 'arguments': '{"command":"unzip /tmp/gpt-download2815959664.zip","directory":"."}'}}]}, {'role': 'tool', 'content': 'Archive: /tmp/gpt-download2815959664.zip\n inflating: chinook.db \n', 'name': 'exec', 'tool_call_id': '07cdee86'}, {'role': 'assistant', 'content': '', 'tool_calls': [{'id': 'e8042027', 'type': 'function', 'function': {'name': 'download', 'arguments': '{"url":"https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip"}'}}]}, {'role': 'tool', 'content': '/tmp/gpt-download3736408077.zip', 'name': 'download', 'tool_call_id': 'e8042027'}] | |
new_msgs = preprocess_input(msgs, tools) | |
print(json.dumps(new_msgs, indent=2)) | |