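"""Custom dataset support for OpenCompass.

Provides fuzzy option-matching evaluators, a generic ``CustomDataset`` loader
for ``.jsonl``/``.csv`` files, and factory helpers that expand a minimal user
config into a full dataset config (MCQ, circular MCQ, or QA; gen or ppl).
"""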
import copy
import csv
import json
import os
from typing import List
from datasets import Dataset
from opencompass.datasets.circular import (CircularDatasetMeta,
CircularEvaluator)
from opencompass.openicl.icl_evaluator import AccEvaluator, BaseEvaluator
from opencompass.openicl.icl_inferencer import GenInferencer, PPLInferencer
from opencompass.openicl.icl_prompt_template import PromptTemplate
from opencompass.openicl.icl_retriever import ZeroRetriever
from opencompass.registry import LOAD_DATASET
from .base import BaseDataset
class OptionSimAccEvaluator(BaseEvaluator):
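    """Accuracy evaluator for multiple-choice questions.

    Free-form predictions are first mapped onto one of the option labels
    (e.g. ``A``-``D``) through a cascade of exact, heuristic, and fuzzy
    matching, then compared with the reference label.
    """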
def __init__(self, options) -> None:
super().__init__()
if not all((isinstance(i, str) and i.isupper() and len(i) == 1)
for i in options):
raise ValueError(
                f'Each option should be a single uppercase letter, '
                f'got {options}')
self.options = options
def match_any_label(self, pred, test_item):
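        """Map a free-form prediction onto one of ``self.options``.

        Tries, in order: an exact label match, ``first_option_postprocess``,
        a unique case-insensitive match of an option's text inside the
        prediction, and finally the smallest Levenshtein distance to the
        label, the option text, or "label. text". Returns ``''`` if every
        stage fails.
        """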
from rapidfuzz.distance import Levenshtein as L
from opencompass.utils.text_postprocessors import \
first_option_postprocess
pred = pred.strip()
        if pred in self.options:
parsed = pred
else:
parsed = ''
if parsed == '':
parsed = first_option_postprocess(pred,
''.join(self.options),
cushion=False)
if parsed == '':
possible_options = []
for opt in self.options:
opt_str = test_item[opt]
if opt_str is not None and opt_str.lower() in pred.lower():
possible_options.append(opt)
if len(possible_options) == 1:
parsed = possible_options[0]
if parsed == '':
dists = []
for opt in self.options:
opt_str = test_item[opt]
if opt_str is None:
continue
cands = [opt, opt_str, opt + '. ' + opt_str]
d = min(L.distance(pred, cand) for cand in cands)
dists.append((d, opt))
if len(dists) > 0:
parsed = min(dists)[1]
return parsed
def score(self, predictions: List, references: List, test_set) -> dict:
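        """Return accuracy (in percent) and per-sample matching details."""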
assert len(predictions) == len(references)
num_correct, num_total = 0, 0
details = {}
for index in range(len(predictions)):
pred = predictions[index]
refr = references[index]
parsed = self.match_any_label(pred, test_set[index])
num_correct += 1 if parsed == refr else 0
num_total += 1
details[str(index)] = {}
details[str(index)]['pred'] = pred
details[str(index)]['parsed'] = parsed
details[str(index)]['refr'] = refr
details[str(index)]['correct'] = parsed == refr
return {'accuracy': num_correct / num_total * 100, 'details': details}
# TODO: refactor to avoid duplicating this scoring logic (DRY)
class CircularOptionSimAccEvaluator(OptionSimAccEvaluator):
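    """``OptionSimAccEvaluator`` for circularly-augmented MCQ data.

    Besides per-pattern-group accuracy (``acc_*``), it reports the share of
    questions answered correctly under at least ``j`` option rotations
    (``more_{j}_*``) and under every rotation (``perf_*``).
    """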
def __init__(self, options, circular_pattern='circular'):
super().__init__(options)
self.circular_pattern = circular_pattern
def score(self, predictions, references, test_set):
from opencompass.datasets.circular import (get_all_possible_patterns,
get_circular_patterns,
get_origin_patterns)
circular_patterns = {}
circular_patterns['origin'] = get_origin_patterns(
test_set[0]['circular_pattern'])
circular_patterns['circular'] = get_circular_patterns(
test_set[0]['circular_pattern'])
if self.circular_pattern == 'all_possible':
circular_patterns['all_possible'] = get_all_possible_patterns(
test_set[0]['circular_pattern'])
metrics = {}
tmp_metrics = {}
tmp_metrics.update({f'correct_{k}': 0 for k in circular_patterns})
tmp_metrics.update({f'count_{k}': 0 for k in circular_patterns})
        # accuracy for each pattern group (acc_origin, acc_circular, ...)
for pred, refr, origin_item in zip(predictions, references, test_set):
parsed = self.match_any_label(pred, origin_item)
circular_pattern = origin_item['circular_pattern']
for k in circular_patterns:
if tuple(circular_pattern) in circular_patterns[k]:
tmp_metrics[f'correct_{k}'] += (1 if parsed == refr else 0)
tmp_metrics[f'count_{k}'] += 1
for k in circular_patterns:
metrics[f'acc_{k}'] = (tmp_metrics[f'correct_{k}'] /
tmp_metrics[f'count_{k}'] * 100)
        # circular consistency: more_{j}_{k} counts questions with at least j
        # correct rotations; perf_{k} requires every rotation to be correct
_details = {k: {} for k in circular_patterns}
for pred, refr, origin_item in zip(predictions, references, test_set):
index = origin_item['qid']
parsed = self.match_any_label(pred, origin_item)
circular_pattern = origin_item['circular_pattern']
for k in circular_patterns:
if tuple(circular_pattern) in circular_patterns[k]:
                    _details[k].setdefault(index, []).append(parsed == refr)
for k in _details:
_details[k] = {
index: sum(_details[k][index])
for index in _details[k]
}
for k in _details:
for j in range(1, len(circular_patterns[k]) + 1):
count = sum([_details[k][index] >= j for index in _details[k]])
total = len(_details[k])
if j != len(circular_patterns[k]):
metrics[f'more_{j}_{k}'] = count / total * 100
else:
metrics[f'perf_{k}'] = count / total * 100
# make details
details = {}
for index in range(len(predictions)):
parsed = self.match_any_label(predictions[index], test_set[index])
details[str(index)] = {}
if 'question' in test_set[index]:
details[str(index)]['question'] = test_set[index]['question']
details[str(index)]['pred'] = predictions[index]
details[str(index)]['parsed'] = parsed
details[str(index)]['refr'] = references[index]
details[str(index)]['correct'] = parsed == references[index]
metrics['details'] = details
return metrics
@LOAD_DATASET.register_module()
class CustomDataset(BaseDataset):
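    """Generic dataset loaded verbatim from a ``.jsonl`` or ``.csv`` file."""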
@staticmethod
def load(path):
if path.endswith('.jsonl'):
with open(path, 'r', encoding='utf-8-sig') as f:
data = [json.loads(line) for line in f]
elif path.endswith('.csv'):
with open(path, 'r', encoding='utf-8-sig') as f:
reader = csv.reader(f)
header = next(reader)
data = [dict(zip(header, row)) for row in reader]
else:
raise ValueError(f'Unsupported file format: {path}')
return Dataset.from_list(data)
class CircularCustomDataset(CustomDataset, metaclass=CircularDatasetMeta):
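    """``CustomDataset`` whose options are circularly rotated by
    ``CircularDatasetMeta`` at load time."""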
dataset_class = CustomDataset
def stringfy_types(obj):
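    """Recursively replace class-valued ``type`` entries in a config dict
    with their dotted import paths so the config can be serialized."""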
for k, v in obj.items():
if k == 'type':
obj[k] = f'{v.__module__}.{v.__name__}'
elif isinstance(v, dict):
stringfy_types(v)
return obj
def make_mcq_gen_config(meta):
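    """Build a generation-based config for a multiple-choice dataset."""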
if meta.get('template', None) is None:
_human_prompt = 'Question: {question}' + ''.join(
[f'\n{item}. {{{item}}}' for item in meta['options']])
human_prompt = meta.get('human_prompt', _human_prompt)
_bot_prompt = f'Answer: {{{meta["output_column"]}}}'
bot_prompt = meta.get('bot_prompt', _bot_prompt)
template = dict(round=[
dict(role='HUMAN', prompt=human_prompt),
dict(role='BOT', prompt=bot_prompt),
])
else:
template = meta['template']
reader_cfg = dict(
input_columns=meta['input_columns'],
output_column=meta['output_column'],
)
infer_cfg = dict(
prompt_template=dict(
type=PromptTemplate,
template=template,
),
retriever=dict(type=ZeroRetriever),
inferencer=dict(type=GenInferencer),
)
eval_cfg = dict(
evaluator=dict(type=meta.get('evaluator', OptionSimAccEvaluator),
**meta.get('evaluator_kwargs',
{'options': meta['options']})),
pred_role='BOT',
)
dataset = dict(
abbr=meta['abbr'],
type=CustomDataset,
path=meta['path'],
reader_cfg=reader_cfg,
infer_cfg=infer_cfg,
eval_cfg=eval_cfg,
)
return dataset
def make_circular_mcq_gen_config(meta):
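    """Build a gen-based config for a circular-evaluation MCQ dataset."""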
if meta.get('template', None) is None:
_human_prompt = 'Question: {question}' + ''.join(
[f'\n{item}. {{{item}}}' for item in meta['options']])
human_prompt = meta.get('human_prompt', _human_prompt)
_bot_prompt = f'Answer: {{{meta["output_column"]}}}'
bot_prompt = meta.get('bot_prompt', _bot_prompt)
template = dict(round=[
dict(role='HUMAN', prompt=human_prompt),
dict(role='BOT', prompt=bot_prompt),
])
else:
template = meta['template']
reader_cfg = dict(
input_columns=meta['input_columns'],
output_column=meta['output_column'],
)
infer_cfg = dict(
prompt_template=dict(
type=PromptTemplate,
template=template,
),
retriever=dict(type=ZeroRetriever),
inferencer=dict(type=GenInferencer),
)
eval_cfg = dict(
evaluator=dict(type=meta.get('evaluator',
CircularOptionSimAccEvaluator),
**meta.get('evaluator_kwargs',
{'options': meta['options']})),
pred_role='BOT',
)
dataset = dict(
abbr=meta['abbr'],
type=CircularCustomDataset,
option_keys=meta['options'],
answer_key=meta['output_column'],
path=meta['path'],
reader_cfg=reader_cfg,
infer_cfg=infer_cfg,
eval_cfg=eval_cfg,
)
return dataset
def make_qa_gen_config(meta):
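    """Build a generation-based config for a free-form QA dataset."""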
if meta.get('template', None) is None:
human_prompt = meta.get('human_prompt', '{question}')
if meta['output_column'] is None:
template = dict(round=[
dict(role='HUMAN', prompt=human_prompt),
])
else:
bot_prompt = meta.get('bot_prompt', f'{{{meta["output_column"]}}}')
template = dict(round=[
dict(role='HUMAN', prompt=human_prompt),
dict(role='BOT', prompt=bot_prompt),
])
else:
template = meta['template']
reader_cfg = dict(
input_columns=meta['input_columns'],
output_column=meta['output_column'],
)
infer_cfg = dict(
prompt_template=dict(
type=PromptTemplate,
template=template,
),
retriever=dict(type=ZeroRetriever),
inferencer=dict(type=GenInferencer),
)
eval_cfg = dict(
evaluator=dict(type=meta.get('evaluator', AccEvaluator),
**meta.get('evaluator_kwargs', {})),
pred_role='BOT',
)
dataset = dict(
abbr=meta['abbr'],
type=CustomDataset,
path=meta['path'],
reader_cfg=reader_cfg,
infer_cfg=infer_cfg,
eval_cfg=eval_cfg,
)
return dataset
def make_mcq_ppl_config(meta):
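    """Build a perplexity-based config for a multiple-choice dataset."""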
if meta.get('template', None) is None:
_human_prompt = 'Question: {question}' + ''.join(
[f'\n{item}. {{{item}}}' for item in meta['options']])
human_prompt = meta.get('human_prompt', _human_prompt)
_bot_prompt = f'Answer: {{{meta["output_column"]}}}'
bot_prompt = meta.get('bot_prompt', _bot_prompt)
template = {
answer: dict(round=[
dict(role='HUMAN', prompt=human_prompt),
dict(role='BOT',
prompt=bot_prompt.format(
**{meta['output_column']: answer})),
], )
for answer in meta['options']
}
else:
template = meta['template']
reader_cfg = dict(
input_columns=meta['input_columns'],
output_column=meta['output_column'],
)
infer_cfg = dict(
prompt_template=dict(
type=PromptTemplate,
template=template,
),
retriever=dict(type=ZeroRetriever),
inferencer=dict(type=PPLInferencer),
)
eval_cfg = dict(evaluator=dict(type=meta.get('evaluator', AccEvaluator),
**meta.get('evaluator_kwargs', {})))
dataset = dict(
abbr=meta['abbr'],
type=CustomDataset,
path=meta['path'],
reader_cfg=reader_cfg,
infer_cfg=infer_cfg,
eval_cfg=eval_cfg,
)
return dataset
def make_circular_mcq_ppl_config(meta):
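    """Build a ppl-based config for a circular-evaluation MCQ dataset."""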
if meta.get('template', None) is None:
_human_prompt = 'Question: {question}' + ''.join(
[f'\n{item}. {{{item}}}' for item in meta['options']])
human_prompt = meta.get('human_prompt', _human_prompt)
_bot_prompt = f'Answer: {{{meta["output_column"]}}}'
bot_prompt = meta.get('bot_prompt', _bot_prompt)
template = {
answer: dict(round=[
dict(role='HUMAN', prompt=human_prompt),
dict(role='BOT',
prompt=bot_prompt.format(
**{meta['output_column']: answer})),
], )
for answer in meta['options']
}
else:
template = meta['template']
reader_cfg = dict(
input_columns=meta['input_columns'],
output_column=meta['output_column'],
)
infer_cfg = dict(
prompt_template=dict(
type=PromptTemplate,
template=template,
),
retriever=dict(type=ZeroRetriever),
inferencer=dict(type=PPLInferencer),
)
eval_cfg = dict(
evaluator=dict(type=meta.get('evaluator', CircularEvaluator),
**meta.get('evaluator_kwargs', {})))
dataset = dict(
abbr=meta['abbr'],
type=CircularCustomDataset,
option_keys=meta['options'],
answer_key=meta['output_column'],
path=meta['path'],
reader_cfg=reader_cfg,
infer_cfg=infer_cfg,
eval_cfg=eval_cfg,
)
return dataset
def parse_example_dataset(config):
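    """Infer dataset meta (columns, options, abbr, data_type, infer_method)
    from the first row of the data file and merge it with an optional
    ``<path>.meta.json`` file and the user config.
    """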
    # merge precedence: config > <path>.meta.json > parsed defaults
path = config['path']
# load sample and get parsed_meta
parsed_meta = {}
if path.endswith('.jsonl'):
with open(path, 'r', encoding='utf-8') as f:
data_item = json.loads(f.readline())
elif path.endswith('.csv'):
with open(path, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
header = next(reader)
row = next(reader)
data_item = dict(zip(header, row))
else:
raise ValueError(f'Unsupported ext: {path}, .jsonl or .csv required')
parsed_meta['path'] = path
input_columns = [i for i in data_item.keys() if i != 'answer']
parsed_meta['input_columns'] = input_columns
output_column = 'answer' if 'answer' in data_item else None
parsed_meta['output_column'] = output_column
options = []
for i in range(26):
i = chr(ord('A') + i)
if i in data_item:
options.append(i)
else:
break
parsed_meta['options'] = options
abbr = os.path.basename(path).split('.')[0]
parsed_meta['abbr'] = abbr
parsed_meta['data_type'] = 'mcq' if len(options) > 1 else 'qa'
parsed_meta['infer_method'] = 'gen'
# try to read meta json
meta_path = config.get('meta_path', path + '.meta.json')
if os.path.exists(meta_path):
with open(meta_path, 'r', encoding='utf-8') as f:
read_from_file_meta = json.load(f)
else:
read_from_file_meta = {}
# get config meta
config_meta = copy.deepcopy(config)
# merge meta
meta = {}
meta.update(parsed_meta)
meta.update(read_from_file_meta)
meta.update(config_meta)
return meta
def make_custom_dataset_config(config):
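    """Expand a minimal user config into a full OpenCompass dataset config."""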
    # parse defaults from the data file and merge user-provided meta
meta = parse_example_dataset(config)
make_config_func = {
('mcq', 'gen'): make_mcq_gen_config,
('mcq', 'ppl'): make_mcq_ppl_config,
('qa', 'gen'): make_qa_gen_config,
('circular-mcq', 'gen'): make_circular_mcq_gen_config,
('circular-mcq', 'ppl'): make_circular_mcq_ppl_config,
}.get((meta['data_type'], meta['infer_method']), None)
if make_config_func is None:
raise ValueError(f'Unsupported dataset data_type: {meta["data_type"]}'
f' and infer_method: {meta["infer_method"]}')
dataset = make_config_func(meta)
dataset = stringfy_types(dataset)
return dataset
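

# Minimal usage sketch (illustrative only): the file name and columns below
# are hypothetical. Given a CSV with columns question,A,B,C,D,answer, a full
# dataset config can be produced from just the path; everything else is
# inferred by parse_example_dataset.
if __name__ == '__main__':
    example_cfg = make_custom_dataset_config({'path': 'data/my_mcq.csv'})
    print(json.dumps(example_cfg, indent=2, ensure_ascii=False))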