import copy
import csv
import json
import os
from typing import List

from datasets import Dataset

from opencompass.datasets.circular import (CircularDatasetMeta,
                                           CircularEvaluator)
from opencompass.openicl.icl_evaluator import AccEvaluator, BaseEvaluator
from opencompass.openicl.icl_inferencer import GenInferencer, PPLInferencer
from opencompass.openicl.icl_prompt_template import PromptTemplate
from opencompass.openicl.icl_retriever import ZeroRetriever
from opencompass.registry import LOAD_DATASET

from .base import BaseDataset


class OptionSimAccEvaluator(BaseEvaluator):

    def __init__(self, options) -> None:
        super().__init__()
        if not all((isinstance(i, str) and i.isupper() and len(i) == 1)
                   for i in options):
            raise ValueError('Each option should be a single uppercase '
                             f'letter, got {options}')

        self.options = options

    def match_any_label(self, pred, test_item):
        from rapidfuzz.distance import Levenshtein as L

        from opencompass.utils.text_postprocessors import \
            first_option_postprocess

        pred = pred.strip()
        if any([pred == i for i in self.options]):
            parsed = pred
        else:
            parsed = ''
        if parsed == '':
            parsed = first_option_postprocess(pred,
                                              ''.join(self.options),
                                              cushion=False)
        if parsed == '':
            possible_options = []
            for opt in self.options:
                opt_str = test_item[opt]
                if opt_str is not None and opt_str.lower() in pred.lower():
                    possible_options.append(opt)
            if len(possible_options) == 1:
                parsed = possible_options[0]
        if parsed == '':
            dists = []
            for opt in self.options:
                opt_str = test_item[opt]
                if opt_str is None:
                    continue
                cands = [opt, opt_str, opt + '. ' + opt_str]
                d = min(L.distance(pred, cand) for cand in cands)
                dists.append((d, opt))
            if len(dists) > 0:
                parsed = min(dists)[1]
        return parsed

    def score(self, predictions: List, references: List, test_set) -> dict:
        assert len(predictions) == len(references)
        num_correct, num_total = 0, 0
        details = {}
        for index in range(len(predictions)):
            pred = predictions[index]
            refr = references[index]
            parsed = self.match_any_label(pred, test_set[index])
            num_correct += 1 if parsed == refr else 0
            num_total += 1
            details[str(index)] = {}
            details[str(index)]['pred'] = pred
            details[str(index)]['parsed'] = parsed
            details[str(index)]['refr'] = refr
            details[str(index)]['correct'] = parsed == refr
        return {'accuracy': num_correct / num_total * 100, 'details': details}
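# Illustrative sketch (not part of the original module; the item below is
# made up, and the regex fallback's exact behaviour depends on
# ``first_option_postprocess``): how the fallback chain in
# ``match_any_label`` is expected to resolve free-form predictions.
#
#   evaluator = OptionSimAccEvaluator(['A', 'B', 'C', 'D'])
#   item = {'A': 'Paris', 'B': 'London', 'C': 'Rome', 'D': 'Berlin'}
#   evaluator.match_any_label('B', item)                # exact label -> 'B'
#   evaluator.match_any_label('The answer is B', item)  # regex extraction -> 'B'
#   evaluator.match_any_label('It is London.', item)    # unique option text -> 'B'
#   evaluator.match_any_label('Lodnon', item)           # smallest edit distance -> 'B'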
# TODO: DO NOT COPY YOURSELF!!!
class CircularOptionSimAccEvaluator(OptionSimAccEvaluator):

    def __init__(self, options, circular_pattern='circular'):
        super().__init__(options)
        self.circular_pattern = circular_pattern

    def score(self, predictions, references, test_set):
        from opencompass.datasets.circular import (get_all_possible_patterns,
                                                   get_circular_patterns,
                                                   get_origin_patterns)

        circular_patterns = {}
        circular_patterns['origin'] = get_origin_patterns(
            test_set[0]['circular_pattern'])
        circular_patterns['circular'] = get_circular_patterns(
            test_set[0]['circular_pattern'])
        if self.circular_pattern == 'all_possible':
            circular_patterns['all_possible'] = get_all_possible_patterns(
                test_set[0]['circular_pattern'])

        metrics = {}
        tmp_metrics = {}
        tmp_metrics.update({f'correct_{k}': 0 for k in circular_patterns})
        tmp_metrics.update({f'count_{k}': 0 for k in circular_patterns})
        # calculate the original accuracy
        for pred, refr, origin_item in zip(predictions, references, test_set):
            parsed = self.match_any_label(pred, origin_item)
            circular_pattern = origin_item['circular_pattern']
            for k in circular_patterns:
                if tuple(circular_pattern) in circular_patterns[k]:
                    tmp_metrics[f'correct_{k}'] += (1 if parsed == refr else 0)
                    tmp_metrics[f'count_{k}'] += 1

        for k in circular_patterns:
            metrics[f'acc_{k}'] = (tmp_metrics[f'correct_{k}'] /
                                   tmp_metrics[f'count_{k}'] * 100)

        # calculate the circular accuracy
        _details = {k: {} for k in circular_patterns}
        for pred, refr, origin_item in zip(predictions, references, test_set):
            index = origin_item['qid']
            parsed = self.match_any_label(pred, origin_item)
            circular_pattern = origin_item['circular_pattern']
            for k in circular_patterns:
                if tuple(circular_pattern) in circular_patterns[k]:
                    _details[k].setdefault(
                        index, []).append(True if parsed == refr else False)
        for k in _details:
            _details[k] = {
                index: sum(_details[k][index])
                for index in _details[k]
            }
        for k in _details:
            for j in range(1, len(circular_patterns[k]) + 1):
                count = sum([_details[k][index] >= j for index in _details[k]])
                total = len(_details[k])
                if j != len(circular_patterns[k]):
                    metrics[f'more_{j}_{k}'] = count / total * 100
                else:
                    metrics[f'perf_{k}'] = count / total * 100

        # make details
        details = {}
        for index in range(len(predictions)):
            parsed = self.match_any_label(predictions[index], test_set[index])
            details[str(index)] = {}
            if 'question' in test_set[index]:
                details[str(index)]['question'] = test_set[index]['question']
            details[str(index)]['pred'] = predictions[index]
            details[str(index)]['parsed'] = parsed
            details[str(index)]['refr'] = references[index]
            details[str(index)]['correct'] = parsed == references[index]
        metrics['details'] = details
        return metrics


@LOAD_DATASET.register_module()
class CustomDataset(BaseDataset):

    @staticmethod
    def load(path):
        if path.endswith('.jsonl'):
            with open(path, 'r', encoding='utf-8-sig') as f:
                data = [json.loads(line) for line in f]
        elif path.endswith('.csv'):
            with open(path, 'r', encoding='utf-8-sig') as f:
                reader = csv.reader(f)
                header = next(reader)
                data = [dict(zip(header, row)) for row in reader]
        else:
            raise ValueError(f'Unsupported file format: {path}')

        return Dataset.from_list(data)


class CircularCustomDataset(CustomDataset, metaclass=CircularDatasetMeta):
    dataset_class = CustomDataset


def stringfy_types(obj):
    for k, v in obj.items():
        if k == 'type':
            obj[k] = f'{v.__module__}.{v.__name__}'
        elif isinstance(v, dict):
            stringfy_types(v)
    return obj
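# Illustrative sketch (assumption, not from the original file): the ``meta``
# dict consumed by the make_*_config factories below is typically shaped like
# the output of ``parse_example_dataset`` at the bottom of this module, e.g.:
#
#   meta = {
#       'abbr': 'my_mcq',
#       'path': 'data/my_mcq.jsonl',
#       'input_columns': ['question', 'A', 'B', 'C', 'D'],
#       'output_column': 'answer',
#       'options': ['A', 'B', 'C', 'D'],
#       'data_type': 'mcq',
#       'infer_method': 'gen',
#   }
#
# Optional keys such as 'template', 'human_prompt', 'bot_prompt', 'evaluator'
# and 'evaluator_kwargs' override the defaults used by the factories.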
def make_mcq_gen_config(meta):
    if meta.get('template', None) is None:
        _human_prompt = 'Question: {question}' + ''.join(
            [f'\n{item}. {{{item}}}' for item in meta['options']])
        human_prompt = meta.get('human_prompt', _human_prompt)
        _bot_prompt = f'Answer: {{{meta["output_column"]}}}'
        bot_prompt = meta.get('bot_prompt', _bot_prompt)
        template = dict(round=[
            dict(role='HUMAN', prompt=human_prompt),
            dict(role='BOT', prompt=bot_prompt),
        ])
    else:
        template = meta['template']

    reader_cfg = dict(
        input_columns=meta['input_columns'],
        output_column=meta['output_column'],
    )

    infer_cfg = dict(
        prompt_template=dict(
            type=PromptTemplate,
            template=template,
        ),
        retriever=dict(type=ZeroRetriever),
        inferencer=dict(type=GenInferencer),
    )

    eval_cfg = dict(
        evaluator=dict(type=meta.get('evaluator', OptionSimAccEvaluator),
                       **meta.get('evaluator_kwargs',
                                  {'options': meta['options']})),
        pred_role='BOT',
    )

    dataset = dict(
        abbr=meta['abbr'],
        type=CustomDataset,
        path=meta['path'],
        reader_cfg=reader_cfg,
        infer_cfg=infer_cfg,
        eval_cfg=eval_cfg,
    )
    return dataset


def make_circular_mcq_gen_config(meta):
    if meta.get('template', None) is None:
        _human_prompt = 'Question: {question}' + ''.join(
            [f'\n{item}. {{{item}}}' for item in meta['options']])
        human_prompt = meta.get('human_prompt', _human_prompt)
        _bot_prompt = f'Answer: {{{meta["output_column"]}}}'
        bot_prompt = meta.get('bot_prompt', _bot_prompt)
        template = dict(round=[
            dict(role='HUMAN', prompt=human_prompt),
            dict(role='BOT', prompt=bot_prompt),
        ])
    else:
        template = meta['template']

    reader_cfg = dict(
        input_columns=meta['input_columns'],
        output_column=meta['output_column'],
    )

    infer_cfg = dict(
        prompt_template=dict(
            type=PromptTemplate,
            template=template,
        ),
        retriever=dict(type=ZeroRetriever),
        inferencer=dict(type=GenInferencer),
    )

    eval_cfg = dict(
        evaluator=dict(type=meta.get('evaluator',
                                     CircularOptionSimAccEvaluator),
                       **meta.get('evaluator_kwargs',
                                  {'options': meta['options']})),
        pred_role='BOT',
    )

    dataset = dict(
        abbr=meta['abbr'],
        type=CircularCustomDataset,
        option_keys=meta['options'],
        answer_key=meta['output_column'],
        path=meta['path'],
        reader_cfg=reader_cfg,
        infer_cfg=infer_cfg,
        eval_cfg=eval_cfg,
    )
    return dataset


def make_qa_gen_config(meta):
    if meta.get('template', None) is None:
        human_prompt = meta.get('human_prompt', '{question}')
        if meta['output_column'] is None:
            template = dict(round=[
                dict(role='HUMAN', prompt=human_prompt),
            ])
        else:
            bot_prompt = meta.get('bot_prompt',
                                  f'{{{meta["output_column"]}}}')
            template = dict(round=[
                dict(role='HUMAN', prompt=human_prompt),
                dict(role='BOT', prompt=bot_prompt),
            ])
    else:
        template = meta['template']

    reader_cfg = dict(
        input_columns=meta['input_columns'],
        output_column=meta['output_column'],
    )

    infer_cfg = dict(
        prompt_template=dict(
            type=PromptTemplate,
            template=template,
        ),
        retriever=dict(type=ZeroRetriever),
        inferencer=dict(type=GenInferencer),
    )

    eval_cfg = dict(
        evaluator=dict(type=meta.get('evaluator', AccEvaluator),
                       **meta.get('evaluator_kwargs', {})),
        pred_role='BOT',
    )

    dataset = dict(
        abbr=meta['abbr'],
        type=CustomDataset,
        path=meta['path'],
        reader_cfg=reader_cfg,
        infer_cfg=infer_cfg,
        eval_cfg=eval_cfg,
    )
    return dataset
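# Illustrative sketch (assumption): for PPL-style evaluation the factories
# below build one prompt per candidate label, keyed by that label, so each
# completion can be scored separately. With options ['A', 'B'] and
# output_column 'answer', the generated template would look like:
#
#   template = {
#       'A': dict(round=[
#           dict(role='HUMAN', prompt='Question: {question}\nA. {A}\nB. {B}'),
#           dict(role='BOT', prompt='Answer: A'),
#       ]),
#       'B': dict(round=[
#           dict(role='HUMAN', prompt='Question: {question}\nA. {A}\nB. {B}'),
#           dict(role='BOT', prompt='Answer: B'),
#       ]),
#   }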
def make_mcq_ppl_config(meta):
    if meta.get('template', None) is None:
        _human_prompt = 'Question: {question}' + ''.join(
            [f'\n{item}. {{{item}}}' for item in meta['options']])
        human_prompt = meta.get('human_prompt', _human_prompt)
        _bot_prompt = f'Answer: {{{meta["output_column"]}}}'
        bot_prompt = meta.get('bot_prompt', _bot_prompt)
        template = {
            answer: dict(round=[
                dict(role='HUMAN', prompt=human_prompt),
                dict(role='BOT',
                     prompt=bot_prompt.format(
                         **{meta['output_column']: answer})),
            ], )
            for answer in meta['options']
        }
    else:
        template = meta['template']

    reader_cfg = dict(
        input_columns=meta['input_columns'],
        output_column=meta['output_column'],
    )

    infer_cfg = dict(
        prompt_template=dict(
            type=PromptTemplate,
            template=template,
        ),
        retriever=dict(type=ZeroRetriever),
        inferencer=dict(type=PPLInferencer),
    )

    eval_cfg = dict(evaluator=dict(type=meta.get('evaluator', AccEvaluator),
                                   **meta.get('evaluator_kwargs', {})))

    dataset = dict(
        abbr=meta['abbr'],
        type=CustomDataset,
        path=meta['path'],
        reader_cfg=reader_cfg,
        infer_cfg=infer_cfg,
        eval_cfg=eval_cfg,
    )
    return dataset


def make_circular_mcq_ppl_config(meta):
    if meta.get('template', None) is None:
        _human_prompt = 'Question: {question}' + ''.join(
            [f'\n{item}. {{{item}}}' for item in meta['options']])
        human_prompt = meta.get('human_prompt', _human_prompt)
        _bot_prompt = f'Answer: {{{meta["output_column"]}}}'
        bot_prompt = meta.get('bot_prompt', _bot_prompt)
        template = {
            answer: dict(round=[
                dict(role='HUMAN', prompt=human_prompt),
                dict(role='BOT',
                     prompt=bot_prompt.format(
                         **{meta['output_column']: answer})),
            ], )
            for answer in meta['options']
        }
    else:
        template = meta['template']

    reader_cfg = dict(
        input_columns=meta['input_columns'],
        output_column=meta['output_column'],
    )

    infer_cfg = dict(
        prompt_template=dict(
            type=PromptTemplate,
            template=template,
        ),
        retriever=dict(type=ZeroRetriever),
        inferencer=dict(type=PPLInferencer),
    )

    eval_cfg = dict(
        evaluator=dict(type=meta.get('evaluator', CircularEvaluator),
                       **meta.get('evaluator_kwargs', {})))

    dataset = dict(
        abbr=meta['abbr'],
        type=CircularCustomDataset,
        option_keys=meta['options'],
        answer_key=meta['output_column'],
        path=meta['path'],
        reader_cfg=reader_cfg,
        infer_cfg=infer_cfg,
        eval_cfg=eval_cfg,
    )
    return dataset
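# Illustrative sketch (assumption; file names and contents are made up): the
# parser below expects one record per line/row, with option columns named by
# consecutive uppercase letters and the label in an 'answer' column, e.g. a
# line of ``my_mcq.jsonl``:
#
#   {"question": "What is 1 + 1?", "A": "1", "B": "2", "C": "3", "answer": "B"}
#
# Defaults inferred from the data can be overridden by a sidecar
# ``my_mcq.jsonl.meta.json`` file, e.g.:
#
#   {"abbr": "my_mcq", "data_type": "circular-mcq", "infer_method": "ppl"}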
def parse_example_dataset(config):
    # config -> .meta.json -> parsed_results
    path = config['path']

    # load sample and get parsed_meta
    parsed_meta = {}
    if path.endswith('.jsonl'):
        with open(path, 'r', encoding='utf-8') as f:
            data_item = json.loads(f.readline())
    elif path.endswith('.csv'):
        with open(path, 'r', encoding='utf-8') as f:
            reader = csv.reader(f)
            header = next(reader)
            row = next(reader)
            data_item = dict(zip(header, row))
    else:
        raise ValueError(f'Unsupported ext: {path}, .jsonl or .csv required')

    parsed_meta['path'] = path
    input_columns = [i for i in data_item.keys() if i != 'answer']
    parsed_meta['input_columns'] = input_columns
    output_column = 'answer' if 'answer' in data_item else None
    parsed_meta['output_column'] = output_column
    options = []
    for i in range(26):
        i = chr(ord('A') + i)
        if i in data_item:
            options.append(i)
        else:
            break
    parsed_meta['options'] = options
    abbr = os.path.basename(path).split('.')[0]
    parsed_meta['abbr'] = abbr
    parsed_meta['data_type'] = 'mcq' if len(options) > 1 else 'qa'
    parsed_meta['infer_method'] = 'gen'

    # try to read meta json
    meta_path = config.get('meta_path', path + '.meta.json')
    if os.path.exists(meta_path):
        with open(meta_path, 'r', encoding='utf-8') as f:
            read_from_file_meta = json.load(f)
    else:
        read_from_file_meta = {}

    # get config meta
    config_meta = copy.deepcopy(config)

    # merge meta
    meta = {}
    meta.update(parsed_meta)
    meta.update(read_from_file_meta)
    meta.update(config_meta)

    return meta


def make_custom_dataset_config(config):
    # the path is treated as a custom dataset; build its config from the
    # parsed meta
    meta = parse_example_dataset(config)
    make_config_func = {
        ('mcq', 'gen'): make_mcq_gen_config,
        ('mcq', 'ppl'): make_mcq_ppl_config,
        ('qa', 'gen'): make_qa_gen_config,
        ('circular-mcq', 'gen'): make_circular_mcq_gen_config,
        ('circular-mcq', 'ppl'): make_circular_mcq_ppl_config,
    }.get((meta['data_type'], meta['infer_method']), None)
    if make_config_func is None:
        raise ValueError(f'Unsupported dataset data_type: {meta["data_type"]}'
                         f' and infer_method: {meta["infer_method"]}')
    dataset = make_config_func(meta)
    dataset = stringfy_types(dataset)
    return dataset
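

if __name__ == '__main__':
    # Usage sketch (assumption, not part of the original module): write a tiny
    # throwaway MCQ file and print the dataset config derived from it. The
    # file name and contents are invented for demonstration; run this as a
    # module (``python -m ...``) so the relative import above resolves.
    import tempfile

    with tempfile.TemporaryDirectory() as tmp_dir:
        demo_path = os.path.join(tmp_dir, 'demo_mcq.jsonl')
        with open(demo_path, 'w', encoding='utf-8') as f:
            f.write(
                json.dumps({
                    'question': 'What is 1 + 1?',
                    'A': '1',
                    'B': '2',
                    'C': '3',
                    'D': '4',
                    'answer': 'B',
                }) + '\n')
        # parse_example_dataset infers mcq/gen from the columns, so this
        # resolves to make_mcq_gen_config with OptionSimAccEvaluator.
        demo_cfg = make_custom_dataset_config({'path': demo_path})
        print(json.dumps(demo_cfg, indent=2))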