import ast
import networkx as nx
from datasets import Dataset
from opencompass.openicl.icl_evaluator import BaseEvaluator
from opencompass.registry import ICL_EVALUATORS, LOAD_DATASET
from ..base import BaseDataset
from .prompts import gcp_dPrompts
def q2text(q, p=gcp_dPrompts):
number_of_colors = q.split('\n')[0].split()[-2] # last character of the first line
number_of_vertices = q.split('\n')[1].split(' ')[2] # third word of the second line
prompt_text = p['Intro'] + '\n' + \
p['Initial_question'].format(total_vertices=number_of_vertices, number_of_colors=number_of_colors) + '\n' + \
p['Output_content'] + '\n' + \
p['Output_format'] + '\n' + \
'\n The graph is below: \n'
for line in q.split('\n')[2:]:
vertex_list = line.split(' ')
this_line = 'Vertex {} is connected to vertex {}.'.format(
vertex_list[1], vertex_list[2])
prompt_text += this_line + '\n'
return prompt_text
@LOAD_DATASET.register_module(force=True)
class cmp_GCP_D_Dataset(BaseDataset):
@staticmethod
def load(path: str):
raw_data = []
data_path = path
all_data = []
for file_num in range(10):
with open(data_path + 'decision_data_GCP_{}.txt'.format(file_num)) as f:
data = f.read()
sample = data.split('\n\n')[:-1]
all_data += zip([file_num + 1] * len(sample), sample)
for (level, q) in all_data:
prompt = q2text(q)
raw_data.append({
'prompt': prompt,
'q': str(level) + '####\n' + q,
'level': level
})
dataset = Dataset.from_list(raw_data)
return dataset
@ICL_EVALUATORS.register_module(force=True)
class cmp_GCP_D_Evaluator(BaseEvaluator):
def score(self, predictions, references):
assert len(predictions) == len(references)
result = {'pass': 0, 'fail': 0}
details = {}
for index, (q, output) in enumerate(zip(references, predictions)):
output_dict = {}
level = int(q.split('####\n')[0])
q = q.split('####\n')[-1]
try:
number_of_colors = int(q.split('\n')[0].split()[-2])
output, reasoning = self.parse_xml_to_dict(output)
output_dict['output'] = output
output_dict['correctness'], _ = self.gcp_decision_check(q, output, number_of_colors)
except Exception as e:
print(f'Attempt failed: {e}')
output_dict['correctness'] = False
output_dict['reasoning'] = reasoning
if output_dict['correctness']:
r = 'pass'
else:
r = 'fail'
result[r] += level
details[str(index)] = {'q': q, 'output': output, 'result': r}
result['score'] = result['pass'] / (result['pass'] + result['fail']) * 100
result['details'] = details
final_result = {'Weighted Accuracy': result['score']}
return final_result
def parse_xml_to_dict(self, xml_string):
try:
assert '' in xml_string
assert '' in xml_string
assert '' in xml_string
assert '' in xml_string
final_answer_start = xml_string.index('') + len('')
final_answer_end = xml_string.index('')
reasoning_start = xml_string.index('') + len('')
reasoning_end = xml_string.index('')
final_answer_element = xml_string[final_answer_start:final_answer_end].rstrip().strip().rstrip()
reasoning_element = xml_string[reasoning_start:reasoning_end].rstrip().strip().rstrip()
try:
final_answer_element = ast.literal_eval(final_answer_element)
except Exception:
final_answer_element = ''
except Exception:
final_answer_element = ''
reasoning_element = ''
return final_answer_element, reasoning_element
def read_dimacs_format(self, dimacs_str):
lines = dimacs_str.strip().split('\n')
p_line = next(line for line in lines if line.startswith('p'))
_, _, num_vertices, num_edges = p_line.split()
num_vertices, num_edges = int(num_vertices), int(num_edges)
adjacency_list = {i: set() for i in range(1, num_vertices + 1)}
for line in lines:
if line.startswith('e'):
_, vertex1, vertex2 = line.split()
vertex1, vertex2 = int(vertex1), int(vertex2)
if vertex1 in adjacency_list and vertex2 in adjacency_list:
adjacency_list[vertex1].add(vertex2)
adjacency_list[vertex2].add(vertex1)
return num_vertices, adjacency_list
def gcp_greedy_solution(self, adjacency_list):
"""Provides a greedy solution to the GCP problem.
:param adjacency_list: A dictionary of the adjacency list.
:return: A tuple of (num_colors, coloring).
"""
G = nx.Graph()
G.add_nodes_from(adjacency_list.keys())
for vertex, neighbors in adjacency_list.items():
for neighbor in neighbors:
G.add_edge(vertex, neighbor)
coloring = nx.coloring.greedy_color(G, strategy='largest_first')
num_colors = max(coloring.values()) + 1
return num_colors, coloring
def gcp_decision_check(self, dimacs_str, answer, k_colors):
"""Check if the given GCP instance is feasible with k_colors.
:param dimacs_str: The DIMACS format string of the GCP instance.
:param answer: The answer returned by the model.
:param k_colors: The target number of colors.
:return: A tuple of (is_correct, message).
"""
num_vertices, adjacency_list = self.read_dimacs_format(dimacs_str)
try:
is_feasible = answer.get('Feasible', 'no').lower() == 'yes'
except Exception:
return False, 'Feasible key not found'
num_colors, coloring = self.gcp_greedy_solution(adjacency_list)
exist_optimal = num_colors <= k_colors
if is_feasible != exist_optimal:
if exist_optimal:
return False, f'Feasibility mismatch: {coloring}'
else:
return False, f'Feasibility mismatch: {is_feasible} vs {exist_optimal}'
return True, 'Feasible' if is_feasible else 'Infeasible'