"""Functions for computing metrics. Part of following code are modified from ` https://github.com/THUDM/LongBench` """ import re import string from collections import Counter from typing import List import jieba from rouge import Rouge from opencompass.openicl.icl_evaluator import BaseEvaluator from opencompass.registry import ICL_EVALUATORS ABANDON_WORDS_EN = [ 'and', 'to', 'of', 'in', 'her', 'was', 'with', 'for', 'it', 'from', 'is', 'that', 'his', 'he', 'by', 'she', 'they', 'or', 'at', 'because', 'be', 'on', 'are', 'their', 'what', 'as', 'had', 'were', 'about', 'being', 'this', 'who', 'but', 'have', 'has', 'when', 'which', 'does', ] ABANDON_WORDS_ZH = [ '的', '和', '是', '等', '在', '年', '可以', '为', '与', '‰', '了', '或', '一种', '月', 'c', '至', '日', '有', '进行', '于', '不', '中', '×', '根据', '小', '由', '亩', '也', '要', '指', '法', '会', '元', '主要', '以及', '通过', '首先', '对', '然后', '号', '以', '所', '后', '丁', '包括', '无', '将', '用', '能', '形', '方面', '因素', '位于', '而', '从', '到', '一定', '用于', '但', '使用', '让', '具有', '并', '亿元', '万元', '上', '类', '基于', '才', '来', '地', '片', '其他', '个', '或者', '变得', '时', '给', '你', '使', '条', '受', '已经', '带', '度', ] def normalize_answer(s): """Lower text and remove punctuation, articles and extra whitespace.""" def remove_articles(text): return re.sub(r'\b(a|an|the)\b', ' ', text) def white_space_fix(text): return ' '.join(text.split()) def remove_punc(text): exclude = set(string.punctuation) return ''.join(ch for ch in text if ch not in exclude) def lower(text): return text.lower() return white_space_fix(remove_articles(remove_punc(lower(s)))) def normalize_zh_answer(s): """Lower text and remove punctuation, extra whitespace.""" def white_space_fix(text): return ''.join(text.split()) def remove_punc(text): cn_punctuation = '!?。。"#$%&'()*+,-/:;<=>@[\]^_`\ {|}~⦅⦆「」、、〃》「」『』【】〔〕〖〗〘〙〚〛〜〝〞〟〰〾〿–—‘’‛“”„‟…‧﹏.' 

@ICL_EVALUATORS.register_module()
class LVEvalF1Evaluator(BaseEvaluator):

    def __init__(self, language: str = 'en') -> None:
        super().__init__()
        assert language in ['en', 'zh']
        self.language = language

    def score(self, predictions: List, references: List) -> dict:

        def f1_score(prediction, reference, **kwargs):
            # Token-level F1 over bag-of-words counts.
            common = Counter(prediction) & Counter(reference)
            num_same = sum(common.values())
            if num_same == 0:
                return 0
            precision = 1.0 * num_same / len(prediction)
            recall = 1.0 * num_same / len(reference)
            f1 = (2 * precision * recall) / (precision + recall)
            return f1

        score = 0.0
        for prediction, reference_list in zip(predictions, references):
            task_score = 0.0
            for reference in reference_list:
                if self.language == 'en':
                    normalized_prediction = normalize_answer(prediction)
                    normalized_reference = normalize_answer(reference)
                    prediction_tokens = normalized_prediction.split()
                    reference_tokens = normalized_reference.split()
                else:
                    prediction_tokens = list(
                        jieba.cut(prediction, cut_all=False))
                    reference_tokens = list(
                        jieba.cut(reference, cut_all=False))
                    prediction_tokens = [
                        normalize_zh_answer(token)
                        for token in prediction_tokens
                    ]
                    reference_tokens = [
                        normalize_zh_answer(token)
                        for token in reference_tokens
                    ]
                    prediction_tokens = [
                        token for token in prediction_tokens if len(token) > 0
                    ]
                    reference_tokens = [
                        token for token in reference_tokens if len(token) > 0
                    ]
                task_score = max(
                    task_score, f1_score(prediction_tokens, reference_tokens))
                # Only the first reference (the gold answer) is scored; the
                # remaining entries are keyword hints used by the OPT
                # evaluators below.
                break

            score += task_score
        score = score / len(predictions) * 100

        return {'f1': score}


@ICL_EVALUATORS.register_module()
class LVEvalOPTF1Evaluator(BaseEvaluator):

    def __init__(self, language: str = 'en') -> None:
        super().__init__()
        assert language in ['en', 'zh']
        self.language = language

    def score(self, predictions: List, references: List) -> dict:

        def f1_score(prediction, reference, **kwargs):
            common = Counter(prediction) & Counter(reference)
            num_same = sum(common.values())
            if num_same == 0:
                return 0
            precision = 1.0 * num_same / len(prediction)
            recall = 1.0 * num_same / len(reference)
            f1 = (2 * precision * recall) / (precision + recall)
            return f1

        score = 0.0
        for prediction, reference_list in zip(predictions, references):
            # By convention, the last entry of the reference list holds the
            # answer keywords used for the recall gate below.
            answer_keyword = reference_list[-1]
            task_score = 0.0
            for reference in reference_list:
                if self.language == 'en':
                    normalized_prediction = normalize_answer(prediction)
                    normalized_reference = normalize_answer(reference)
                    prediction_tokens = normalized_prediction.split()
                    reference_tokens = normalized_reference.split()
                    # Answer-keyword recall gate: if the prediction recalls
                    # fewer than 20% of the (non-stopword) keyword tokens,
                    # the sample scores zero.
                    if answer_keyword:
                        answer_keyword_tokens = normalize_answer(
                            answer_keyword).split()
                        common = Counter(prediction_tokens) & Counter(
                            answer_keyword_tokens)
                        filtered_common = {
                            key: value
                            for key, value in common.items()
                            if key not in ABANDON_WORDS_EN
                        }
                        num_same = sum(filtered_common.values())
                        # Guard against an empty keyword token list, which
                        # would otherwise raise ZeroDivisionError.
                        if answer_keyword_tokens:
                            recall = num_same / len(answer_keyword_tokens)
                            if recall < 0.2:
                                break
                else:
                    prediction_tokens = list(
                        jieba.cut(prediction, cut_all=False))
                    reference_tokens = list(
                        jieba.cut(reference, cut_all=False))
                    prediction_tokens = [
                        normalize_zh_answer(token)
                        for token in prediction_tokens
                    ]
                    reference_tokens = [
                        normalize_zh_answer(token)
                        for token in reference_tokens
                    ]
                    prediction_tokens = [
                        token for token in prediction_tokens if len(token) > 0
                    ]
                    reference_tokens = [
                        token for token in reference_tokens if len(token) > 0
                    ]
                    if not answer_keyword:
                        answer_keyword = reference
                    # Same recall gate as the English branch, with a 40%
                    # threshold for Chinese.
                    if answer_keyword:
                        answer_keyword_tokens = list(
                            jieba.cut(answer_keyword, cut_all=False))
                        answer_keyword_tokens = [
                            normalize_zh_answer(token)
                            for token in answer_keyword_tokens
                        ]
                        answer_keyword_tokens = [
                            token for token in answer_keyword_tokens
                            if len(token) > 0
                        ]
                        common = Counter(prediction_tokens) & Counter(
                            answer_keyword_tokens)
                        filtered_common = {
                            key: value
                            for key, value in common.items()
                            if key not in ABANDON_WORDS_ZH
                        }
                        num_same = sum(filtered_common.values())
                        # Guard against an empty keyword token list.
                        if answer_keyword_tokens:
                            recall = num_same / len(answer_keyword_tokens)
                            if recall < 0.4:
                                break
                task_score = max(
                    task_score, f1_score(prediction_tokens, reference_tokens))
                # As above, only the first reference is scored.
                break

            score += task_score
        score = score / len(predictions) * 100

        return {'LVEval_f1': score}
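
# Worked example of the keyword recall gate above (hand-computed, for
# illustration only): with answer_keyword = 'Paris France' and a prediction
# containing 'paris' but not 'france', the filtered keyword overlap is 1 of
# 2 tokens, so recall = 0.5 >= 0.2 and the sample is scored normally; a
# prediction matching neither keyword token gives recall = 0.0 < 0.2 and the
# sample scores zero regardless of its F1 against the gold answer.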

@ICL_EVALUATORS.register_module()
class LVEvalOPTRougeEvaluator(BaseEvaluator):

    def __init__(self, language: str = 'en') -> None:
        super().__init__()
        assert language in ['en', 'zh']
        self.language = language

    def score(self, predictions: List, references: List) -> dict:
        rouge = Rouge()
        score = 0.0
        for prediction, reference_list in zip(predictions, references):
            task_score = 0.0
            for reference in reference_list:
                if self.language == 'zh':
                    word_blacklist = ABANDON_WORDS_ZH
                    prediction_tokens = list(
                        jieba.cut(prediction, cut_all=False))
                    reference_tokens = list(
                        jieba.cut(reference, cut_all=False))
                    prediction_tokens = [
                        normalize_zh_answer(token)
                        for token in prediction_tokens
                    ]
                    reference_tokens = [
                        normalize_zh_answer(token)
                        for token in reference_tokens
                    ]
                else:
                    word_blacklist = ABANDON_WORDS_EN
                    prediction_tokens = normalize_answer(prediction).split()
                    reference_tokens = normalize_answer(reference).split()

                # Drop high-frequency stopwords before computing ROUGE-L.
                filtered_prediction = ' '.join(
                    token for token in prediction_tokens
                    if token not in word_blacklist)
                filtered_reference = ' '.join(
                    token for token in reference_tokens
                    if token not in word_blacklist)

                try:
                    cur_score = rouge.get_scores([filtered_prediction],
                                                 [filtered_reference],
                                                 avg=True)['rouge-l']['f']
                except Exception:
                    # rouge raises on empty hypotheses or references; treat
                    # such cases as a zero score.
                    cur_score = 0.0
                task_score = max(task_score, cur_score)
                # As in the F1 evaluators, only the first reference is scored.
                break

            score += task_score
        score = score / len(predictions) * 100

        return {'LVEval_rouge': score}
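
if __name__ == '__main__':
    # Minimal smoke test (illustrative only; assumes opencompass, jieba and
    # rouge are installed). The toy predictions and references are made up.
    evaluator = LVEvalF1Evaluator(language='en')
    result = evaluator.score(
        predictions=['Paris is the capital of France.'],
        references=[['Paris']])
    # One shared token out of 5 prediction / 1 reference tokens, so
    # precision = 0.2, recall = 1.0, F1 = 1/3 -> {'f1': 33.33...}.
    print(result)

    opt_evaluator = LVEvalOPTF1Evaluator(language='en')
    # The last entry of each reference list is treated as answer keywords.
    print(opt_evaluator.score(
        predictions=['Paris is the capital of France.'],
        references=[['Paris', 'Paris']]))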