|
import copy |
|
import csv |
|
import json |
|
import os |
|
from typing import List |
|
|
|
from datasets import Dataset |
|
|
|
from opencompass.datasets.circular import (CircularDatasetMeta, |
|
CircularEvaluator) |
|
from opencompass.openicl.icl_evaluator import AccEvaluator, BaseEvaluator |
|
from opencompass.openicl.icl_inferencer import GenInferencer, PPLInferencer |
|
from opencompass.openicl.icl_prompt_template import PromptTemplate |
|
from opencompass.openicl.icl_retriever import ZeroRetriever |
|
from opencompass.registry import LOAD_DATASET |
|
|
|
from .base import BaseDataset |
|
|
|
|
|
class OptionSimAccEvaluator(BaseEvaluator): |
|
|
|
def __init__(self, options) -> None: |
|
super().__init__() |
|
if not all((isinstance(i, str) and i.isupper() and len(i) == 1) |
|
for i in options): |
|
raise ValueError( |
|
f'Each options should be single upper letter, got {options}') |
|
|
|
self.options = options |
|
|
|
def match_any_label(self, pred, test_item): |
|
from rapidfuzz.distance import Levenshtein as L |
|
|
|
from opencompass.utils.text_postprocessors import \ |
|
first_option_postprocess |
|
|
|
pred = pred.strip() |
|
if any([pred == i for i in self.options]): |
|
parsed = pred |
|
else: |
|
parsed = '' |
|
if parsed == '': |
|
parsed = first_option_postprocess(pred, |
|
''.join(self.options), |
|
cushion=False) |
|
if parsed == '': |
|
possible_options = [] |
|
for opt in self.options: |
|
opt_str = test_item[opt] |
|
if opt_str is not None and opt_str.lower() in pred.lower(): |
|
possible_options.append(opt) |
|
if len(possible_options) == 1: |
|
parsed = possible_options[0] |
|
if parsed == '': |
|
dists = [] |
|
for opt in self.options: |
|
opt_str = test_item[opt] |
|
if opt_str is None: |
|
continue |
|
cands = [opt, opt_str, opt + '. ' + opt_str] |
|
d = min(L.distance(pred, cand) for cand in cands) |
|
dists.append((d, opt)) |
|
if len(dists) > 0: |
|
parsed = min(dists)[1] |
|
return parsed |
|
|
|
def score(self, predictions: List, references: List, test_set) -> dict: |
|
assert len(predictions) == len(references) |
|
|
|
num_correct, num_total = 0, 0 |
|
details = {} |
|
for index in range(len(predictions)): |
|
pred = predictions[index] |
|
refr = references[index] |
|
parsed = self.match_any_label(pred, test_set[index]) |
|
num_correct += 1 if parsed == refr else 0 |
|
num_total += 1 |
|
details[str(index)] = {} |
|
details[str(index)]['pred'] = pred |
|
details[str(index)]['parsed'] = parsed |
|
details[str(index)]['refr'] = refr |
|
details[str(index)]['correct'] = parsed == refr |
|
return {'accuracy': num_correct / num_total * 100, 'details': details} |
|
|
|
|
|
|
|
class CircularOptionSimAccEvaluator(OptionSimAccEvaluator): |
|
|
|
def __init__(self, options, circular_pattern='circular'): |
|
super().__init__(options) |
|
self.circular_pattern = circular_pattern |
|
|
|
def score(self, predictions, references, test_set): |
|
from opencompass.datasets.circular import (get_all_possible_patterns, |
|
get_circular_patterns, |
|
get_origin_patterns) |
|
|
|
circular_patterns = {} |
|
circular_patterns['origin'] = get_origin_patterns( |
|
test_set[0]['circular_pattern']) |
|
circular_patterns['circular'] = get_circular_patterns( |
|
test_set[0]['circular_pattern']) |
|
if self.circular_pattern == 'all_possible': |
|
circular_patterns['all_possible'] = get_all_possible_patterns( |
|
test_set[0]['circular_pattern']) |
|
|
|
metrics = {} |
|
tmp_metrics = {} |
|
tmp_metrics.update({f'correct_{k}': 0 for k in circular_patterns}) |
|
tmp_metrics.update({f'count_{k}': 0 for k in circular_patterns}) |
|
|
|
for pred, refr, origin_item in zip(predictions, references, test_set): |
|
parsed = self.match_any_label(pred, origin_item) |
|
circular_pattern = origin_item['circular_pattern'] |
|
for k in circular_patterns: |
|
if tuple(circular_pattern) in circular_patterns[k]: |
|
tmp_metrics[f'correct_{k}'] += (1 if parsed == refr else 0) |
|
tmp_metrics[f'count_{k}'] += 1 |
|
|
|
for k in circular_patterns: |
|
metrics[f'acc_{k}'] = (tmp_metrics[f'correct_{k}'] / |
|
tmp_metrics[f'count_{k}'] * 100) |
|
|
|
|
|
_details = {k: {} for k in circular_patterns} |
|
for pred, refr, origin_item in zip(predictions, references, test_set): |
|
index = origin_item['qid'] |
|
parsed = self.match_any_label(pred, origin_item) |
|
circular_pattern = origin_item['circular_pattern'] |
|
for k in circular_patterns: |
|
if tuple(circular_pattern) in circular_patterns[k]: |
|
_details[k].setdefault( |
|
index, []).append(True if parsed == refr else False) |
|
for k in _details: |
|
_details[k] = { |
|
index: sum(_details[k][index]) |
|
for index in _details[k] |
|
} |
|
for k in _details: |
|
for j in range(1, len(circular_patterns[k]) + 1): |
|
count = sum([_details[k][index] >= j for index in _details[k]]) |
|
total = len(_details[k]) |
|
if j != len(circular_patterns[k]): |
|
metrics[f'more_{j}_{k}'] = count / total * 100 |
|
else: |
|
metrics[f'perf_{k}'] = count / total * 100 |
|
|
|
|
|
details = {} |
|
for index in range(len(predictions)): |
|
parsed = self.match_any_label(predictions[index], test_set[index]) |
|
details[str(index)] = {} |
|
if 'question' in test_set[index]: |
|
details[str(index)]['question'] = test_set[index]['question'] |
|
details[str(index)]['pred'] = predictions[index] |
|
details[str(index)]['parsed'] = parsed |
|
details[str(index)]['refr'] = references[index] |
|
details[str(index)]['correct'] = parsed == references[index] |
|
metrics['details'] = details |
|
return metrics |
|
|
|
|
|
@LOAD_DATASET.register_module() |
|
class CustomDataset(BaseDataset): |
|
|
|
@staticmethod |
|
def load(path): |
|
if path.endswith('.jsonl'): |
|
with open(path, 'r', encoding='utf-8-sig') as f: |
|
data = [json.loads(line) for line in f] |
|
elif path.endswith('.csv'): |
|
with open(path, 'r', encoding='utf-8-sig') as f: |
|
reader = csv.reader(f) |
|
header = next(reader) |
|
data = [dict(zip(header, row)) for row in reader] |
|
else: |
|
raise ValueError(f'Unsupported file format: {path}') |
|
|
|
return Dataset.from_list(data) |
|
|
|
|
|
class CircularCustomDataset(CustomDataset, metaclass=CircularDatasetMeta): |
|
dataset_class = CustomDataset |
|
|
|
|
|
def stringfy_types(obj): |
|
for k, v in obj.items(): |
|
if k == 'type': |
|
obj[k] = f'{v.__module__}.{v.__name__}' |
|
elif isinstance(v, dict): |
|
stringfy_types(v) |
|
return obj |
|
|
|
|
|
def make_mcq_gen_config(meta): |
|
if meta.get('template', None) is None: |
|
_human_prompt = 'Question: {question}' + ''.join( |
|
[f'\n{item}. {{{item}}}' for item in meta['options']]) |
|
human_prompt = meta.get('human_prompt', _human_prompt) |
|
_bot_prompt = f'Answer: {{{meta["output_column"]}}}' |
|
bot_prompt = meta.get('bot_prompt', _bot_prompt) |
|
template = dict(round=[ |
|
dict(role='HUMAN', prompt=human_prompt), |
|
dict(role='BOT', prompt=bot_prompt), |
|
]) |
|
else: |
|
template = meta['template'] |
|
|
|
reader_cfg = dict( |
|
input_columns=meta['input_columns'], |
|
output_column=meta['output_column'], |
|
) |
|
infer_cfg = dict( |
|
prompt_template=dict( |
|
type=PromptTemplate, |
|
template=template, |
|
), |
|
retriever=dict(type=ZeroRetriever), |
|
inferencer=dict(type=GenInferencer), |
|
) |
|
|
|
eval_cfg = dict( |
|
evaluator=dict(type=meta.get('evaluator', OptionSimAccEvaluator), |
|
**meta.get('evaluator_kwargs', |
|
{'options': meta['options']})), |
|
pred_role='BOT', |
|
) |
|
|
|
dataset = dict( |
|
abbr=meta['abbr'], |
|
type=CustomDataset, |
|
path=meta['path'], |
|
reader_cfg=reader_cfg, |
|
infer_cfg=infer_cfg, |
|
eval_cfg=eval_cfg, |
|
) |
|
return dataset |
|
|
|
|
|
def make_circular_mcq_gen_config(meta): |
|
if meta.get('template', None) is None: |
|
_human_prompt = 'Question: {question}' + ''.join( |
|
[f'\n{item}. {{{item}}}' for item in meta['options']]) |
|
human_prompt = meta.get('human_prompt', _human_prompt) |
|
_bot_prompt = f'Answer: {{{meta["output_column"]}}}' |
|
bot_prompt = meta.get('bot_prompt', _bot_prompt) |
|
template = dict(round=[ |
|
dict(role='HUMAN', prompt=human_prompt), |
|
dict(role='BOT', prompt=bot_prompt), |
|
]) |
|
else: |
|
template = meta['template'] |
|
|
|
reader_cfg = dict( |
|
input_columns=meta['input_columns'], |
|
output_column=meta['output_column'], |
|
) |
|
infer_cfg = dict( |
|
prompt_template=dict( |
|
type=PromptTemplate, |
|
template=template, |
|
), |
|
retriever=dict(type=ZeroRetriever), |
|
inferencer=dict(type=GenInferencer), |
|
) |
|
|
|
eval_cfg = dict( |
|
evaluator=dict(type=meta.get('evaluator', |
|
CircularOptionSimAccEvaluator), |
|
**meta.get('evaluator_kwargs', |
|
{'options': meta['options']})), |
|
pred_role='BOT', |
|
) |
|
|
|
dataset = dict( |
|
abbr=meta['abbr'], |
|
type=CircularCustomDataset, |
|
option_keys=meta['options'], |
|
answer_key=meta['output_column'], |
|
path=meta['path'], |
|
reader_cfg=reader_cfg, |
|
infer_cfg=infer_cfg, |
|
eval_cfg=eval_cfg, |
|
) |
|
return dataset |
|
|
|
|
|
def make_qa_gen_config(meta): |
|
if meta.get('template', None) is None: |
|
human_prompt = meta.get('human_prompt', '{question}') |
|
if meta['output_column'] is None: |
|
template = dict(round=[ |
|
dict(role='HUMAN', prompt=human_prompt), |
|
]) |
|
else: |
|
bot_prompt = meta.get('bot_prompt', f'{{{meta["output_column"]}}}') |
|
template = dict(round=[ |
|
dict(role='HUMAN', prompt=human_prompt), |
|
dict(role='BOT', prompt=bot_prompt), |
|
]) |
|
else: |
|
template = meta['template'] |
|
reader_cfg = dict( |
|
input_columns=meta['input_columns'], |
|
output_column=meta['output_column'], |
|
) |
|
infer_cfg = dict( |
|
prompt_template=dict( |
|
type=PromptTemplate, |
|
template=template, |
|
), |
|
retriever=dict(type=ZeroRetriever), |
|
inferencer=dict(type=GenInferencer), |
|
) |
|
|
|
eval_cfg = dict( |
|
evaluator=dict(type=meta.get('evaluator', AccEvaluator), |
|
**meta.get('evaluator_kwargs', {})), |
|
pred_role='BOT', |
|
) |
|
|
|
dataset = dict( |
|
abbr=meta['abbr'], |
|
type=CustomDataset, |
|
path=meta['path'], |
|
reader_cfg=reader_cfg, |
|
infer_cfg=infer_cfg, |
|
eval_cfg=eval_cfg, |
|
) |
|
return dataset |
|
|
|
|
|
def make_mcq_ppl_config(meta): |
|
if meta.get('template', None) is None: |
|
_human_prompt = 'Question: {question}' + ''.join( |
|
[f'\n{item}. {{{item}}}' for item in meta['options']]) |
|
human_prompt = meta.get('human_prompt', _human_prompt) |
|
_bot_prompt = f'Answer: {{{meta["output_column"]}}}' |
|
bot_prompt = meta.get('bot_prompt', _bot_prompt) |
|
template = { |
|
answer: dict(round=[ |
|
dict(role='HUMAN', prompt=human_prompt), |
|
dict(role='BOT', |
|
prompt=bot_prompt.format( |
|
**{meta['output_column']: answer})), |
|
], ) |
|
for answer in meta['options'] |
|
} |
|
else: |
|
template = meta['template'] |
|
|
|
reader_cfg = dict( |
|
input_columns=meta['input_columns'], |
|
output_column=meta['output_column'], |
|
) |
|
infer_cfg = dict( |
|
prompt_template=dict( |
|
type=PromptTemplate, |
|
template=template, |
|
), |
|
retriever=dict(type=ZeroRetriever), |
|
inferencer=dict(type=PPLInferencer), |
|
) |
|
|
|
eval_cfg = dict(evaluator=dict(type=meta.get('evaluator', AccEvaluator), |
|
**meta.get('evaluator_kwargs', {}))) |
|
|
|
dataset = dict( |
|
abbr=meta['abbr'], |
|
type=CustomDataset, |
|
path=meta['path'], |
|
reader_cfg=reader_cfg, |
|
infer_cfg=infer_cfg, |
|
eval_cfg=eval_cfg, |
|
) |
|
return dataset |
|
|
|
|
|
def make_circular_mcq_ppl_config(meta): |
|
if meta.get('template', None) is None: |
|
_human_prompt = 'Question: {question}' + ''.join( |
|
[f'\n{item}. {{{item}}}' for item in meta['options']]) |
|
human_prompt = meta.get('human_prompt', _human_prompt) |
|
_bot_prompt = f'Answer: {{{meta["output_column"]}}}' |
|
bot_prompt = meta.get('bot_prompt', _bot_prompt) |
|
template = { |
|
answer: dict(round=[ |
|
dict(role='HUMAN', prompt=human_prompt), |
|
dict(role='BOT', |
|
prompt=bot_prompt.format( |
|
**{meta['output_column']: answer})), |
|
], ) |
|
for answer in meta['options'] |
|
} |
|
else: |
|
template = meta['template'] |
|
|
|
reader_cfg = dict( |
|
input_columns=meta['input_columns'], |
|
output_column=meta['output_column'], |
|
) |
|
infer_cfg = dict( |
|
prompt_template=dict( |
|
type=PromptTemplate, |
|
template=template, |
|
), |
|
retriever=dict(type=ZeroRetriever), |
|
inferencer=dict(type=PPLInferencer), |
|
) |
|
|
|
eval_cfg = dict( |
|
evaluator=dict(type=meta.get('evaluator', CircularEvaluator), |
|
**meta.get('evaluator_kwargs', {}))) |
|
|
|
dataset = dict( |
|
abbr=meta['abbr'], |
|
type=CircularCustomDataset, |
|
option_keys=meta['options'], |
|
answer_key=meta['output_column'], |
|
path=meta['path'], |
|
reader_cfg=reader_cfg, |
|
infer_cfg=infer_cfg, |
|
eval_cfg=eval_cfg, |
|
) |
|
return dataset |
|
|
|
|
|
def parse_example_dataset(config): |
|
|
|
path = config['path'] |
|
|
|
|
|
parsed_meta = {} |
|
if path.endswith('.jsonl'): |
|
with open(path, 'r', encoding='utf-8') as f: |
|
data_item = json.loads(f.readline()) |
|
elif path.endswith('.csv'): |
|
with open(path, 'r', encoding='utf-8') as f: |
|
reader = csv.reader(f) |
|
header = next(reader) |
|
row = next(reader) |
|
data_item = dict(zip(header, row)) |
|
else: |
|
raise ValueError(f'Unsupported ext: {path}, .jsonl or .csv required') |
|
|
|
parsed_meta['path'] = path |
|
input_columns = [i for i in data_item.keys() if i != 'answer'] |
|
parsed_meta['input_columns'] = input_columns |
|
output_column = 'answer' if 'answer' in data_item else None |
|
parsed_meta['output_column'] = output_column |
|
options = [] |
|
for i in range(26): |
|
i = chr(ord('A') + i) |
|
if i in data_item: |
|
options.append(i) |
|
else: |
|
break |
|
parsed_meta['options'] = options |
|
abbr = os.path.basename(path).split('.')[0] |
|
parsed_meta['abbr'] = abbr |
|
parsed_meta['data_type'] = 'mcq' if len(options) > 1 else 'qa' |
|
parsed_meta['infer_method'] = 'gen' |
|
|
|
|
|
meta_path = config.get('meta_path', path + '.meta.json') |
|
if os.path.exists(meta_path): |
|
with open(meta_path, 'r', encoding='utf-8') as f: |
|
read_from_file_meta = json.load(f) |
|
else: |
|
read_from_file_meta = {} |
|
|
|
|
|
config_meta = copy.deepcopy(config) |
|
|
|
|
|
meta = {} |
|
meta.update(parsed_meta) |
|
meta.update(read_from_file_meta) |
|
meta.update(config_meta) |
|
|
|
return meta |
|
|
|
|
|
def make_custom_dataset_config(config): |
|
|
|
meta = parse_example_dataset(config) |
|
make_config_func = { |
|
('mcq', 'gen'): make_mcq_gen_config, |
|
('mcq', 'ppl'): make_mcq_ppl_config, |
|
('qa', 'gen'): make_qa_gen_config, |
|
('circular-mcq', 'gen'): make_circular_mcq_gen_config, |
|
('circular-mcq', 'ppl'): make_circular_mcq_ppl_config, |
|
}.get((meta['data_type'], meta['infer_method']), None) |
|
if make_config_func is None: |
|
raise ValueError(f'Unsupported dataset data_type: {meta["data_type"]}' |
|
f' and infer_method: {meta["infer_method"]}') |
|
dataset = make_config_func(meta) |
|
dataset = stringfy_types(dataset) |
|
return dataset |
|
|