|
"""Functions for computing metrics. |
|
|
|
Parts of the following code are modified from `https://github.com/THUDM/LongBench`.
|
""" |
|
|
|
import re |
|
import string |
|
from collections import Counter |
|
from typing import List |
|
|
|
import jieba |
|
from rouge import Rouge |
|
|
|
from opencompass.openicl.icl_evaluator import BaseEvaluator |
|
from opencompass.registry import ICL_EVALUATORS |
|
|
|
# English function words that are ignored when counting keyword/token overlap
# in the LVEval "OPT" evaluators below.
ABANDON_WORDS_EN = [
    'and', 'to', 'of', 'in', 'her', 'was', 'with', 'for', 'it', 'from',
    'is', 'that', 'his', 'he', 'by', 'she', 'they', 'or', 'at', 'because',
    'be', 'on', 'are', 'their', 'what', 'as', 'had', 'were', 'about',
    'being', 'this', 'who', 'but', 'have', 'has', 'when', 'which', 'does',
]
|
|
|
# Chinese function words, units and symbols that are ignored when counting
# keyword/token overlap in the LVEval "OPT" evaluators below.
ABANDON_WORDS_ZH = [
    '的', '和', '是', '等', '在', '年', '可以', '为', '与', '‰',
    '了', '或', '一种', '月', 'c', '至', '日', '有', '进行', '于',
    '不', '中', '×', '根据', '小', '由', '亩', '也', '要', '指',
    '法', '会', '元', '主要', '以及', '通过', '首先', '对', '然后', '号',
    '以', '所', '后', '丁', '包括', '无', '将', '用', '能', '形',
    '方面', '因素', '位于', '而', '从', '到', '一定', '用于', '但', '使用',
    '让', '具有', '并', '亿元', '万元', '上', '类', '基于', '才', '来',
    '地', '片', '其他', '个', '或者', '变得', '时', '给', '你', '使',
    '条', '受', '已经', '带', '度',
]
|
|
|
|
|
def normalize_answer(s):
    """Normalize an English answer string for token-level comparison.

    The steps run in this order: lowercase the text, delete every character
    in ``string.punctuation``, replace the standalone articles ``a``, ``an``
    and ``the`` with a space, then collapse all whitespace runs into single
    spaces (leading/trailing whitespace is dropped).
    """
    lowered = s.lower()
    # One pass that deletes all ASCII punctuation characters.
    punctuation_free = lowered.translate(
        str.maketrans('', '', string.punctuation))
    article_free = re.sub(r'\b(a|an|the)\b', ' ', punctuation_free)
    words = article_free.split()
    return ' '.join(words)
|
|
|
|
|
def _build_zh_punctuation():
    """Return the frozenset of characters removed by ``normalize_zh_answer``.

    The set is the union of ASCII punctuation, the fullwidth forms of the
    ASCII marks listed below, the halfwidth CJK marks, and a list of CJK /
    typographic punctuation.
    """
    # ASCII marks whose *fullwidth* counterparts must also be stripped.
    # '.' is deliberately not in this list; the plain ASCII '.' is covered
    # by ``string.punctuation``.
    ascii_marks = '!?"#$%&\'()*+,-/:;<=>@[\\]^_`{|}~'
    # Fullwidth forms live at a fixed offset (0xFEE0) from their ASCII
    # counterparts (U+0021..U+007E -> U+FF01..U+FF5E). Deriving them by code
    # point keeps them intact even if this file is Unicode-normalized.
    fullwidth_marks = ''.join(chr(ord(ch) + 0xFEE0) for ch in ascii_marks)
    # Fullwidth white parentheses and halfwidth CJK marks:
    # U+FF5F U+FF60 (white parens), U+FF61 (full stop),
    # U+FF62 U+FF63 (corner brackets), U+FF64 (comma).
    halfwidth_cjk_marks = '\uff5f\uff60\uff61\uff62\uff63\uff64'
    cjk_marks = ('⦅⦆「」、。〃》『』【】〔〕〖〗〘〙〚〛〜〝〞〟〰〾〿'
                 '–—‘’‛“”„‟…‧﹏.')
    return frozenset(string.punctuation + ascii_marks + fullwidth_marks +
                     halfwidth_cjk_marks + cjk_marks)


# Built once at import time: normalize_zh_answer is called once per token.
_ZH_PUNCTUATION = _build_zh_punctuation()


def normalize_zh_answer(s: str) -> str:
    """Lowercase text and remove punctuation and all whitespace.

    Args:
        s: Text (typically a single segmented Chinese token) to normalize.

    Returns:
        ``s`` lowercased, with every ASCII, fullwidth and CJK punctuation
        character removed and every whitespace character deleted (the
        remaining pieces are concatenated without separators).
    """
    lowered = s.lower()
    without_punctuation = ''.join(
        ch for ch in lowered if ch not in _ZH_PUNCTUATION)
    # Chinese text is not space-delimited, so whitespace is dropped entirely
    # rather than collapsed to single spaces.
    return ''.join(without_punctuation.split())
|
|
|
|
|
@ICL_EVALUATORS.register_module()
class LVEvalF1Evaluator(BaseEvaluator):
    """Token-level F1 between each prediction and its first reference.

    English text is normalized with ``normalize_answer`` and split on
    whitespace; any other language setting segments the text with ``jieba``
    and normalizes every token with ``normalize_zh_answer``.

    Args:
        language: ``'en'`` or ``'zh'``. Defaults to ``'en'``.
    """

    def __init__(self, language: str = 'en') -> None:
        super().__init__()
        assert language in ['en', 'zh']
        self.language = language

    def _tokenize(self, text):
        """Turn ``text`` into the list of normalized tokens used for F1."""
        if self.language == 'en':
            return normalize_answer(text).split()
        normalized = (normalize_zh_answer(token)
                      for token in jieba.cut(text, cut_all=False))
        # Tokens that were pure punctuation/whitespace normalize to ''.
        return [token for token in normalized if len(token) > 0]

    @staticmethod
    def _f1(prediction_tokens, reference_tokens):
        """Return the bag-of-tokens F1 of two token lists (0.0 if disjoint)."""
        overlap = Counter(prediction_tokens) & Counter(reference_tokens)
        num_same = sum(overlap.values())
        if num_same == 0:
            return 0.0
        # num_same > 0 guarantees both lists are non-empty.
        precision = num_same / len(prediction_tokens)
        recall = num_same / len(reference_tokens)
        return (2 * precision * recall) / (precision + recall)

    def score(self, predictions: List, references: List) -> dict:
        """Compute the mean F1 over all samples, scaled to 0-100.

        Args:
            predictions: One predicted string per sample.
            references: One list of reference strings per sample, aligned
                with ``predictions``.

        Returns:
            ``{'f1': <mean F1 * 100>}``; ``{'f1': 0.0}`` when
            ``predictions`` is empty.

        Raises:
            IndexError: If ``references`` has fewer entries than
                ``predictions``.
        """
        num_samples = len(predictions)
        if num_samples == 0:
            # Nothing to average; avoid dividing by zero.
            return {'f1': 0.0}

        total = 0.0
        for idx, prediction in enumerate(predictions):
            task_score = 0.0
            for reference in references[idx]:
                task_score = max(
                    task_score,
                    self._f1(self._tokenize(prediction),
                             self._tokenize(reference)))
                # Only the first reference is scored.
                # NOTE(review): presumably because later entries hold answer
                # keywords rather than alternative answers (compare
                # LVEvalOPTF1Evaluator, which reads reference_list[-1] as a
                # keyword) -- confirm against the dataset loader.
                break
            total += task_score

        return {'f1': total / num_samples * 100}
|
|
|
|
|
@ICL_EVALUATORS.register_module()
class LVEvalOPTF1Evaluator(BaseEvaluator):
    """Token-level F1 gated by recall of an answer keyword.

    For every sample the last entry of the reference list is read as the
    answer keyword. The prediction only receives its F1 score (against the
    first reference) if enough keyword tokens, ignoring the words in
    ``ABANDON_WORDS_EN`` / ``ABANDON_WORDS_ZH``, appear in it; otherwise
    the sample scores 0.

    Args:
        language: ``'en'`` or ``'zh'``. Defaults to ``'en'``.
    """

    def __init__(self, language: str = 'en') -> None:
        super().__init__()
        assert language in ['en', 'zh']
        self.language = language

    def _tokenize(self, text):
        """Turn ``text`` into the list of normalized tokens used for F1."""
        if self.language == 'en':
            return normalize_answer(text).split()
        normalized = (normalize_zh_answer(token)
                      for token in jieba.cut(text, cut_all=False))
        # Tokens that were pure punctuation/whitespace normalize to ''.
        return [token for token in normalized if len(token) > 0]

    @staticmethod
    def _f1(prediction_tokens, reference_tokens):
        """Return the bag-of-tokens F1 of two token lists (0.0 if disjoint)."""
        overlap = Counter(prediction_tokens) & Counter(reference_tokens)
        num_same = sum(overlap.values())
        if num_same == 0:
            return 0.0
        # num_same > 0 guarantees both lists are non-empty.
        precision = num_same / len(prediction_tokens)
        recall = num_same / len(reference_tokens)
        return (2 * precision * recall) / (precision + recall)

    def _passes_keyword_gate(self, prediction_tokens, answer_keyword):
        """Return True if the prediction recalls enough of the keyword.

        Recall is the number of keyword tokens also present in the
        prediction (blacklisted words excluded from the overlap) divided by
        the total number of keyword tokens. The threshold is 0.2 for English
        and 0.4 otherwise. An empty keyword, or one that normalizes to no
        tokens at all, imposes no constraint.
        """
        if not answer_keyword:
            return True
        keyword_tokens = self._tokenize(answer_keyword)
        if not keyword_tokens:
            # E.g. a keyword made only of articles/punctuation: there is
            # nothing to recall, so skip the gate instead of dividing by 0.
            return True

        if self.language == 'en':
            blacklist, threshold = ABANDON_WORDS_EN, 0.2
        else:
            blacklist, threshold = ABANDON_WORDS_ZH, 0.4

        overlap = Counter(prediction_tokens) & Counter(keyword_tokens)
        num_same = sum(count for token, count in overlap.items()
                       if token not in blacklist)
        recall = num_same / len(keyword_tokens)
        return not recall < threshold

    def score(self, predictions: List, references: List) -> dict:
        """Compute the mean keyword-gated F1 over all samples (0-100).

        Args:
            predictions: One predicted string per sample.
            references: One non-empty list of strings per sample, aligned
                with ``predictions``; its last entry is used as the answer
                keyword.

        Returns:
            ``{'LVEval_f1': <mean score * 100>}``; ``{'LVEval_f1': 0.0}``
            when ``predictions`` is empty.

        Raises:
            IndexError: If ``references`` has fewer entries than
                ``predictions`` or a reference list is empty.
        """
        num_samples = len(predictions)
        if num_samples == 0:
            # Nothing to average; avoid dividing by zero.
            return {'LVEval_f1': 0.0}

        total = 0.0
        for idx, prediction in enumerate(predictions):
            reference_list = references[idx]
            answer_keyword = reference_list[-1]
            task_score = 0.0
            for reference in reference_list:
                prediction_tokens = self._tokenize(prediction)
                reference_tokens = self._tokenize(reference)

                keyword = answer_keyword
                if self.language != 'en' and not keyword:
                    # Non-English path: without a keyword, the reference
                    # itself serves as the keyword.
                    keyword = reference

                if self._passes_keyword_gate(prediction_tokens, keyword):
                    task_score = max(
                        task_score,
                        self._f1(prediction_tokens, reference_tokens))
                # Only the first reference is scored; the trailing entry is
                # the keyword read above, not an alternative answer.
                break
            total += task_score

        return {'LVEval_f1': total / num_samples * 100}
|
|
|
|
|
@ICL_EVALUATORS.register_module()
class LVEvalOPTRougeEvaluator(BaseEvaluator):
    """ROUGE-L F-score computed after removing blacklisted words.

    Prediction and first reference are tokenized (``jieba`` plus
    ``normalize_zh_answer`` for ``'zh'``; ``normalize_answer`` plus
    whitespace split otherwise), tokens listed in ``ABANDON_WORDS_ZH`` /
    ``ABANDON_WORDS_EN`` are dropped, and the remaining tokens are joined
    with single spaces before being passed to ``Rouge``.

    Args:
        language: ``'en'`` or ``'zh'``. Defaults to ``'en'``.
    """

    def __init__(self, language: str = 'en') -> None:
        super().__init__()
        assert language in ['en', 'zh']
        self.language = language

    def _prepare(self, text, blacklist):
        """Tokenize ``text``, drop blacklisted tokens, re-join with spaces."""
        if self.language == 'zh':
            tokens = [
                normalize_zh_answer(token)
                for token in jieba.cut(text, cut_all=False)
            ]
        else:
            tokens = normalize_answer(text).split()
        return ' '.join(token for token in tokens if token not in blacklist)

    def score(self, predictions: List, references: List) -> dict:
        """Compute the mean ROUGE-L F-score over all samples (0-100).

        Args:
            predictions: One predicted string per sample.
            references: One list of reference strings per sample, aligned
                with ``predictions``.

        Returns:
            ``{'LVEval_rouge': <mean score * 100>}``;
            ``{'LVEval_rouge': 0.0}`` when ``predictions`` is empty.

        Raises:
            IndexError: If ``references`` has fewer entries than
                ``predictions``.
        """
        num_samples = len(predictions)
        if num_samples == 0:
            # Nothing to average; avoid dividing by zero.
            return {'LVEval_rouge': 0.0}

        # Set lookup gives the same result as scanning the list, per token.
        if self.language == 'zh':
            blacklist = frozenset(ABANDON_WORDS_ZH)
        else:
            blacklist = frozenset(ABANDON_WORDS_EN)
        rouge = Rouge()

        total = 0.0
        for idx, prediction in enumerate(predictions):
            task_score = 0.0
            for reference in references[idx]:
                hypothesis = self._prepare(prediction, blacklist)
                target = self._prepare(reference, blacklist)
                try:
                    cur_score = rouge.get_scores(
                        [hypothesis], [target], avg=True)['rouge-l']['f']
                except Exception:
                    # Deliberate best-effort: a sample Rouge cannot score
                    # counts as 0 instead of aborting the whole evaluation.
                    # NOTE(review): presumably this covers empty text after
                    # filtering -- confirm which errors Rouge raises.
                    cur_score = 0.0
                task_score = max(task_score, cur_score)
                # Only the first reference is scored.
                # NOTE(review): presumably later entries hold answer
                # keywords, as in LVEvalOPTF1Evaluator -- confirm against
                # the dataset loader.
                break
            total += task_score

        return {'LVEval_rouge': total / num_samples * 100}
|
|