import json
import random
from pathlib import Path

import tiktoken
from datasets import Dataset

from opencompass.datasets.base import BaseDataset
from opencompass.openicl import BaseEvaluator
from opencompass.registry import LOAD_DATASET


def get_unique_entries(file_path,
                       n,
                       language,
                       unique_arg1=False,
                       unique_arg2=False,
                       unique_combination=False):
    """Sample up to ``n`` needle entries of the given ``language`` from a
    JSONL file, optionally enforcing uniqueness on ``arg1``, ``arg2``, or
    their combination. The file's lines are shuffled before filtering, so
    the returned sample is random."""
    seen_arg1 = set()
    seen_arg2 = set()
    seen_combinations = set()
    results = []

    with open(file_path, 'r', encoding='utf-8') as file:
        lines = file.readlines()
        random.shuffle(lines)

    for line in lines:
        try:
            entry = json.loads(line.strip())
        except json.JSONDecodeError:
            # Skip malformed lines instead of aborting the whole sample.
            continue

        if entry.get('language') != language:
            continue

        key1 = entry.get('arg1', '') if unique_arg1 else ''
        key2 = entry.get('arg2', '') if unique_arg2 else ''
        combination = (key1, key2) if unique_combination else ''

        if (key1 not in seen_arg1 or not unique_arg1) and \
           (key2 not in seen_arg2 or not unique_arg2) and \
           (combination not in seen_combinations or not unique_combination):
            seen_arg1.add(key1)
            seen_arg2.add(key2)
            seen_combinations.add(combination)
            results.append(entry)

        if len(results) == n:
            break

    return results
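

# NeedleBenchParallelDataset packs several needles into a single long context:
# one needle per entry in `depths`, inserted at that percentage of the
# haystack, so a single prompt probes retrieval at every depth at once.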
@LOAD_DATASET.register_module()
class NeedleBenchParallelDataset(BaseDataset):

    @staticmethod
    def load(
        path: str,
        needle_file_name: str,
        length: int,
        depths: list[int],
        tokenizer_model: str,
        file_list: list[str],
        num_repeats_per_file: int,
        length_buffer: int,
        guide: bool,
        language: str,
    ):
        """Build the parallel NeedleBench split: for each haystack file and
        each repeat, hide ``len(depths)`` needles in one context and emit a
        single multi-question prompt with its encoded gold answer."""
        data = {'prompt': [], 'answer': []}
        tokenizer = tiktoken.encoding_for_model(tokenizer_model)

        # Locate the needle file among the JSONL files under `path`.
        files = Path(path).glob('*.jsonl')
        for file in files:
            if file.name == needle_file_name:
                needle_file_path = file

        # Draw one needle per requested depth, unique in both arguments.
        predefined_needles_bak = get_unique_entries(needle_file_path,
                                                    len(depths),
                                                    language,
                                                    unique_arg1=True,
                                                    unique_arg2=True,
                                                    unique_combination=True)

        def _generate_context(tokens_context, depths, needles):
            # One insertion point per needle, at its depth percentage.
            insertion_points = [
                int(len(tokens_context) * (depth / 100)) for depth in depths
            ]

            cumulative_inserted_length = 0

            for i, needle in enumerate(needles):
                needle_tokens = _get_tokens_from_context(needle)
                # Offset by the tokens already inserted so this needle still
                # lands at its intended depth of the original context.
                current_insertion_point = min(
                    insertion_points[i] + cumulative_inserted_length,
                    len(tokens_context))

                tokens_context = tokens_context[:current_insertion_point] + \
                    needle_tokens + tokens_context[current_insertion_point:]
                cumulative_inserted_length += len(needle_tokens)

            new_context = _decode_tokens(tokens_context)
            return new_context

        def _get_tokens_from_context(context):
            if isinstance(context, list):
                return [tokenizer.encode(item) for item in context]
            else:
                return tokenizer.encode(context)

        def _decode_tokens(tokens):
            return tokenizer.decode(tokens)

        def _modify_retrieval_question(retrieval_question):
            # Insert a "think before you answer" hint just before the
            # answer-format instruction.
            if language == 'Chinese':
                parts = retrieval_question.split('请按照')
                guide_retrieval_question = (parts[0] + '在回答之前,请思考文档中与此问题'
                                            '最相关的内容是什么。请按照' + parts[1])
                return guide_retrieval_question
            elif language == 'English':
                parts = retrieval_question.split('Please answer in the format')
                guide_retrieval_question = (
                    parts[0] + 'Before answering, please consider'
                    ' what in the document is most relevant to this question.'
                    ' Please answer in the format' + parts[1])
                return guide_retrieval_question
            else:
                raise ValueError(f"Language '{language}' is not supported.")

        def _generate_prompt(context, retrieval_question):
            if guide:
                retrieval_question = _modify_retrieval_question(
                    retrieval_question)

            if language == 'Chinese':
                prompt = ('你是一个善于回答用户问题的智能AI助手\n'
                          '请保持你的回答简洁清楚。不要说和下面文档中的无关的话'
                          ',或重复你的回答\n请先仔细阅读下面的文档再依次回答'
                          f'最后提出的问题\n用户现在给你的文档是{context}\n\n'
                          f'现在请问:{retrieval_question}\n')
            elif language == 'English':
                prompt = (
                    'You are an intelligent AI assistant skilled in '
                    'answering user questions.\n'
                    'Please keep your answers concise and clear. Do not'
                    ' talk about irrelevant topics or repeat your '
                    'answers.\n'
                    f'The document given to you by the user is {context}'
                    f'\n\nNow, the questions are: {retrieval_question}\n')
            else:
                raise ValueError(f"Language '{language}' is not supported.")

            return prompt
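
        # For every haystack file in `file_list`, build `num_repeats_per_file`
        # samples: reshuffle the haystack lines and the needle order, fill the
        # token budget, insert the needles at the requested depths, and record
        # the prompt together with the encoded gold answer.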
        files = Path(path).glob('*.jsonl')
        for file in files:
            if file.name not in file_list:
                continue

            with open(file, 'r', encoding='utf-8') as f:
                lines_bak = [json.loads(line.strip()) for line in f]
            lines = lines_bak.copy()
            for counter in range(num_repeats_per_file):
                random.seed(counter)
                random.shuffle(lines)
                predefined_needles = predefined_needles_bak.copy()
                random.shuffle(predefined_needles)

                needles = [
                    '\n' + item['needle'] + '\n' for item in predefined_needles
                ]
                keywords = [item['arg2'] for item in predefined_needles]
                # Join every needle's question and expected answer format
                # into one multi-question instruction.
                if language == 'Chinese':
                    questions = '、'.join([
                        item['retrieval_question'].split('?')[0] + '?'
                        for item in predefined_needles
                    ])

                    answers_format = '、'.join([
                        item['retrieval_question'].split("'")[1].split('。')[0]
                        for item in predefined_needles
                    ])
                    retrieval_question = questions + "请按照'" + \
                        answers_format + "'的格式回答。"
                elif language == 'English':
                    questions = '、'.join([
                        item['retrieval_question'].split('?')[0] + '?'
                        for item in predefined_needles
                    ])

                    answers_format = '、'.join([
                        item['retrieval_question'].split("'")[1].split('.')[0]
                        for item in predefined_needles
                    ])
                    retrieval_question = questions + \
                        "Please answer in the format of '" + \
                        answers_format + "'"

                # Haystack token budget: total length minus the buffer and
                # the tokens the needles themselves will occupy.
                context_length = length - length_buffer
                target_length_per_record = context_length - \
                    sum(len(tokens) for tokens
                        in _get_tokens_from_context(needles))
                target_length_per_record = max(target_length_per_record, 0)
                accumulated_tokens = []
                for line in lines:
                    tokens_current_line = _get_tokens_from_context(
                        line['text'])
                    accumulated_tokens.extend(tokens_current_line)

                    if len(accumulated_tokens) >= target_length_per_record:
                        break

                processed_text = _generate_context(
                    accumulated_tokens[:target_length_per_record], depths,
                    needles)

                processed_prompt = _generate_prompt(processed_text,
                                                    retrieval_question)

                data['prompt'].append(processed_prompt)
                # Gold answer encodes the keywords and their depths,
                # e.g. 'kw1*kw2#10*50'.
                data['answer'].append('*'.join(keywords) + '#' +
                                      '*'.join(map(str, depths)))

        dataset = Dataset.from_dict({
            'prompt': data['prompt'],
            'answer': data['answer'],
        })
        return dataset
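

# The gold answer emitted by the loader has the form 'kw1*kw2#d1*d2': needle
# keywords joined by '*', then '#', then the matching depths joined by '*'.
# The evaluator below recovers this structure and credits each depth whenever
# its keyword appears verbatim in the prediction.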
class NeedleBenchParallelEvaluator(BaseEvaluator):

    def levenshtein_distance(self, s1, s2):
        """Classic dynamic-programming edit distance.

        Kept as a fuzzy-matching helper; the current ``score`` method only
        checks exact keyword containment and does not call it.
        """
        if len(s1) < len(s2):
            return self.levenshtein_distance(s2, s1)

        if len(s2) == 0:
            return len(s1)

        previous_row = range(len(s2) + 1)
        for i, c1 in enumerate(s1):
            current_row = [i + 1]
            for j, c2 in enumerate(s2):
                insertions = previous_row[j + 1] + 1
                deletions = current_row[j] + 1
                substitutions = previous_row[j] + (c1 != c2)
                current_row.append(min(insertions, deletions, substitutions))
            previous_row = current_row

        return previous_row[-1]

    def score(self, predictions, gold):
        if len(predictions) != len(gold):
            return {'error': 'predictions and gold have different lengths'}
        print('predictions:', predictions)
        print('gold:', gold)

        details = []
        # Every reference shares the same depth list, so parse it once.
        depths = [int(i) for i in gold[0].split('#')[1].split('*')]
        scores_by_depth = {depth: 0 for depth in depths}

        for prediction, reference in zip(predictions, gold):
            print(reference)
            keywords = reference.split('#')[0].split('*')
            print(keywords)
            for keyword, depth in zip(keywords, depths):
                print('iterating:', keyword, depth)
                if keyword in prediction:
                    print(f'{keyword} at depth {depth} is in {prediction}')
                    # Each hit contributes its share of a 0-100 scale,
                    # averaged over all predictions.
                    scores_by_depth[depth] += 100 / (len(predictions))

        average_score = sum(scores_by_depth.values()) / len(scores_by_depth)

        flattened_scores = {
            'Depth' + str(depth): score
            for depth, score in scores_by_depth.items()
        }

        result = {
            **flattened_scores, 'details': details,
            'average_score': average_score
        }
        return result
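

# Illustrative sketch (not executed) of calling the loader directly; every
# path and value below is a placeholder, not a shipped default:
#
#     dataset = NeedleBenchParallelDataset.load(
#         path='data/needlebench',            # hypothetical data directory
#         needle_file_name='needles.jsonl',   # hypothetical needle file
#         length=8000,
#         depths=[10, 30, 50, 70, 90],
#         tokenizer_model='gpt-4',
#         file_list=['essays.jsonl'],         # hypothetical haystack file
#         num_repeats_per_file=5,
#         length_buffer=200,
#         guide=True,
#         language='English',
#     )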