|
import ast |
|
import json |
|
|
|
import networkx as nx |
|
from datasets import Dataset |
|
|
|
from opencompass.openicl.icl_evaluator import BaseEvaluator |
|
from opencompass.registry import ICL_EVALUATORS, LOAD_DATASET |
|
|
|
from ..base import BaseDataset |
|
from .prompts import sppPrompts |
|
|
|
|
|
def q2text(q, p=sppPrompts): |
|
|
|
|
|
|
|
start_node = q['nodes'][0] |
|
end_node = q['nodes'][-1] |
|
edges = q['edges'] |
|
prompt_text = p['Intro'] + '\n' + \ |
|
p['Initial_question'].format(start_node=start_node, end_node=end_node) + '\n' + \ |
|
p['Output_content'] + '\n' + \ |
|
p['Output_format'] + \ |
|
"\n The graph's edges and weights are as follows: \n" |
|
for edge in edges: |
|
this_line = f"Edge from {edge['from']} to {edge['to']} has a weight of {edge['weight']}." |
|
prompt_text += this_line + '\n' |
|
return prompt_text |
|
|
|
|
|
@LOAD_DATASET.register_module(force=True) |
|
class p_SPP_Dataset(BaseDataset): |
|
|
|
@staticmethod |
|
def load(path: str): |
|
raw_data = [] |
|
data_path = path |
|
all_data = [] |
|
with open(data_path + 'spp_instances.json', 'r') as f: |
|
data = json.load(f) |
|
all_data = zip([int(d['complexity_level']) for d in data], data) |
|
for level, q in all_data: |
|
prompt = q2text(q) |
|
raw_data.append({ |
|
'prompt': prompt, |
|
'q': str(level) + '####\n' + json.dumps(q), |
|
'level': level |
|
}) |
|
dataset = Dataset.from_list(raw_data) |
|
return dataset |
|
|
|
|
|
@ICL_EVALUATORS.register_module(force=True) |
|
class p_SPP_Evaluator(BaseEvaluator): |
|
|
|
def score(self, predictions, references): |
|
assert len(predictions) == len(references) |
|
|
|
result = {'pass': 0, 'fail': 0} |
|
for index, (q, output) in enumerate(zip(references, predictions)): |
|
output_dict = {} |
|
level = int(q.split('####\n')[0]) |
|
q = json.loads(q.split('####\n')[-1]) |
|
output, reasoning = self.parse_xml_to_dict(output) |
|
output_dict['output'] = output |
|
try: |
|
output_dict['correctness'], _ = self.spp_check(q, output) |
|
except Exception as e: |
|
print(f'Check failed: {e}') |
|
output_dict['correctness'] = False |
|
output_dict['reasoning'] = reasoning |
|
output_dict['level'] = level |
|
|
|
if output_dict['correctness']: |
|
r = 'pass' |
|
else: |
|
r = 'fail' |
|
result[r] += level |
|
|
|
result['score'] = result['pass'] / (result['pass'] + result['fail']) * 100 |
|
final_result = {'Weighted Accuracy': result['score']} |
|
return final_result |
|
|
|
def parse_xml_to_dict(self, xml_string): |
|
try: |
|
assert '<final_answer>' in xml_string |
|
assert '</final_answer>' in xml_string |
|
|
|
|
|
final_answer_start = xml_string.index('<final_answer>') + len('<final_answer>') |
|
final_answer_end = xml_string.index('</final_answer>') |
|
|
|
|
|
final_answer_element = xml_string[final_answer_start:final_answer_end].rstrip().strip().rstrip() |
|
assert '{' in final_answer_element |
|
assert '}' in final_answer_element |
|
dic_start = final_answer_element.index('{') |
|
dic_end = final_answer_element.index('}') |
|
final_answer_element = final_answer_element[dic_start:dic_end + 1].rstrip().strip().rstrip() |
|
|
|
try: |
|
final_answer_element = ast.literal_eval(final_answer_element) |
|
reasoning_element = xml_string |
|
except Exception: |
|
final_answer_element = '' |
|
reasoning_element = xml_string |
|
except Exception: |
|
final_answer_element = '' |
|
reasoning_element = '' |
|
|
|
return final_answer_element, reasoning_element |
|
|
|
def ssp_optimal_solution(self, instance, source, target): |
|
"""Provides the optimal solution for the SSP instance. |
|
|
|
:param instance: The SSP instance as a dictionary with 'nodes' and 'edges'. |
|
:param source: The source node. |
|
:param target: The destination node. |
|
:return: The optimal shortest path length and path. |
|
""" |
|
G = nx.Graph() |
|
G.add_nodes_from(instance['nodes']) |
|
G.add_weighted_edges_from([(edge['from'], edge['to'], edge['weight']) |
|
for edge in instance['edges']]) |
|
shortest_path_length = None |
|
shortest_path = None |
|
if nx.has_path(G, source=source, target=target): |
|
shortest_path_length = nx.shortest_path_length(G, source=source, target=target, weight='weight') |
|
shortest_path = nx.shortest_path(G, source=source, target=target, weight='weight') |
|
return shortest_path_length, shortest_path |
|
|
|
|
|
def spp_check(self, instance, solution, start_node=None, end_node=None): |
|
"""Validate the solution of the SPP problem. |
|
|
|
:param instance: The instance dictionary with nodes and edges. |
|
:param solution: The solution dictionary with the path and total distance. |
|
:param start_node: The start node. |
|
:param end_node: The end node. |
|
:return: A tuple of (is_correct, message). |
|
""" |
|
|
|
|
|
if start_node is None: |
|
start_node = instance['nodes'][0] |
|
if end_node is None: |
|
end_node = instance['nodes'][-1] |
|
|
|
|
|
try: |
|
path_string = solution.get('Path', '') |
|
cost_string = solution.get('TotalDistance', '') |
|
except Exception: |
|
return False, 'The solution is not a dictionary.' |
|
|
|
|
|
ssp_optimal_length, ssp_optimal_path = self.ssp_optimal_solution( |
|
instance, start_node, end_node) |
|
if ssp_optimal_length is None: |
|
if isinstance(cost_string, int) or cost_string.isdigit(): |
|
return False, f'No path between from node {start_node} to node {end_node}.' |
|
else: |
|
return True, 'No path found from node {start_node} to node {end_node}.' |
|
|
|
try: |
|
path = list(map(int, path_string.split('->'))) |
|
total_cost = int(cost_string) |
|
except Exception: |
|
return False, 'The solution is not a valid dictionary.' |
|
|
|
|
|
if not path or path[0] != start_node or path[-1] != end_node: |
|
return False, 'The path does not start or end at the correct nodes.' |
|
|
|
|
|
calculated_cost = 0 |
|
is_in_edge = lambda edge, from_node, to_node: (edge['from'] == from_node and edge['to'] == to_node) or (edge['from'] == to_node and edge['to'] == from_node) |
|
for i in range(len(path) - 1): |
|
from_node, to_node = path[i], path[i + 1] |
|
edge = next((edge for edge in instance['edges'] if is_in_edge(edge, from_node, to_node)), None) |
|
|
|
if not edge: |
|
return False, f'No edge found from node {from_node} to node {to_node}.' |
|
|
|
calculated_cost += edge['weight'] |
|
|
|
|
|
if calculated_cost != total_cost: |
|
return False, f'The calculated cost ({calculated_cost}) does not match the provided total cost ({total_cost}).' |
|
|
|
if calculated_cost != ssp_optimal_length: |
|
|
|
return False, f'The calculated cost ({calculated_cost}) does not match the optimal solution ({ssp_optimal_length}): {ssp_optimal_path}.' |
|
|
|
return True, 'The solution is valid.' |
|
|