File size: 4,830 Bytes
256a159 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
import json
import re
from datasets import Dataset
from opencompass.openicl.icl_evaluator import BaseEvaluator
from opencompass.registry import ICL_EVALUATORS, LOAD_DATASET
from .base import BaseDataset
@LOAD_DATASET.register_module()
class GaokaoBenchDataset(BaseDataset):
@staticmethod
def load(path: str):
with open(path, encoding='utf-8') as f:
data = json.load(f)
return Dataset.from_list(data['example'])
valid_gaokao_bench_question_types = [
'single_choice', 'multi_choice', 'multi_question_choice',
'five_out_of_seven', 'cloze', 'subjective', 'correction'
]
class GaokaoBenchEvaluator(BaseEvaluator):
def __init__(self, question_type) -> None:
super().__init__()
assert question_type in valid_gaokao_bench_question_types
self.question_type = question_type
def do_predictions_postprocess(self, model_output, answer_lenth=None):
if self.question_type == 'single_choice':
model_answer = []
temp = re.findall(r'[A-D]', model_output[::-1])
if len(temp) != 0:
model_answer.append(temp[0])
elif self.question_type == 'multi_question_choice':
model_answer = []
temp = re.findall(r'【答案】\s*[::]*\s*[A-Z]', model_output)
if len(temp) == answer_lenth:
for t in temp:
model_answer.append(re.findall(r'[A-Z]', t)[0])
else:
temp = re.findall(r'[A-Z]', model_output)
if len(temp) > 0:
for k in range(min(len(temp), answer_lenth)):
model_answer.append(temp[k])
elif self.question_type == 'multi_choice':
model_answer = []
answer = ''
content = re.sub(r'\s+', '', model_output)
answer_index = content.find('【答案】')
if answer_index > 0:
temp = content[answer_index:]
if len(re.findall(r'[A-D]', temp)) > 0:
for t in re.findall(r'[A-D]', temp):
answer += t
else:
temp = content[-10:]
if len(re.findall(r'[A-D]', temp)) > 0:
for t in re.findall(r'[A-D]', temp):
answer += t
if len(answer) != 0:
model_answer.append(answer)
elif self.question_type == 'five_out_of_seven':
model_answer = []
temp = re.findall(r'[A-G]', model_output)
if len(temp) > 0:
for k in range(min(5, len(temp))):
model_answer.append(temp[k])
return model_answer
def ensure_same_length(self, pred, refr):
if len(pred) == len(refr):
return pred
return ['Z'] * len(refr)
def score(self, predictions, references):
if self.question_type not in [
'single_choice', 'multi_choice', 'multi_question_choice',
'five_out_of_seven'
]:
return {'score': 0}
elif self.question_type == 'multi_choice':
correct_score, total_score = 0, 0
for pred, refr in zip(predictions, references):
pred = self.do_predictions_postprocess(pred)
pred = self.ensure_same_length(pred, refr)
for p, r in zip(pred, refr):
if p == r:
correct_score += 2
else:
for i in p:
if i not in r:
break
else:
correct_score += 1
total_score += 2
return {'score': correct_score / total_score * 100}
else:
correct_score, total_score = 0, 0
for pred, refr in zip(predictions, references):
if self.question_type == 'multi_question_choice':
pred = self.do_predictions_postprocess(pred, len(refr))
else:
pred = self.do_predictions_postprocess(pred)
pred = self.ensure_same_length(pred, refr)
for p, r in zip(pred, refr):
if p == r:
correct_score += 1
total_score += 1
return {'score': correct_score / total_score * 100}
for question_type in valid_gaokao_bench_question_types:
# fix classic closure problem
def _gaokao_register(question_type):
ICL_EVALUATORS.register_module(
name='GaokaoBenchEvaluator' + '_' + question_type,
module=lambda *args, **kwargs: GaokaoBenchEvaluator(
question_type=question_type, *args, **kwargs))
_gaokao_register(question_type)
|