import zhipuai
import traceback
import pandas as pd
from tqdm import tqdm
import re
import torch
import os
os.environ["CUDA_VISIBLE_DEVICES"] = '0,1,2'
import random
import time
from transformers import (
    AutoModelForSeq2SeqLM,
    AutoModelForCausalLM,
    AutoTokenizer,
    PreTrainedModel,
    PreTrainedTokenizerBase,
)
from transformers.generation.utils import GenerationConfig
class GLM3_6B_API():
    '''
    Wrapper around a local ChatGLM3-6B checkpoint; define your own API here
    and point model_name_or_path at your model directory.
    '''
    def __init__(self) -> None:
        self.model_name_or_path = "your_model_path"
        self.init = True

    def chat(self, prompt) -> str:
        # Retry up to 5 times; the model and tokenizer are loaded lazily on the first call.
        for _ in range(5):
            if self.init:
                self.model = AutoModelForCausalLM.from_pretrained(
                    self.model_name_or_path,
                    trust_remote_code=True,
                    device_map="auto",
                    torch_dtype=(
                        torch.bfloat16
                        if torch.cuda.is_bf16_supported()
                        else torch.float32
                    ),
                ).eval()
                self.tokenizer = AutoTokenizer.from_pretrained(
                    self.model_name_or_path,
                    trust_remote_code=True,
                    use_fast=True,
                    add_bos_token=False,
                    add_eos_token=False,
                    padding_side="left",
                )
                self.init = False
            try:
                print(prompt)
                # ChatGLM3's chat() returns (response, history); decode greedily with do_sample=False
                response, history = self.model.chat(self.tokenizer, prompt, history=[], do_sample=False)
                print(response)
                return response
            except Exception:
                traceback.print_exc()
                time.sleep(5)
                continue
        return None
glm3_6b = GLM3_6B_API()
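# Optional remote alternative with the same chat() interface, since zhipuai is
# imported above. A minimal sketch assuming the zhipuai v1 SDK's
# model_api.invoke call; the model name, key handling, and response shape are
# assumptions -- check the SDK documentation before use.
class GLM_Remote_API():
    def __init__(self, api_key="your_api_key") -> None:
        zhipuai.api_key = api_key  # assumed v1-style module-level key
    def chat(self, prompt) -> str:
        for _ in range(5):
            try:
                response = zhipuai.model_api.invoke(
                    model="chatglm_turbo",  # assumed model name
                    prompt=[{"role": "user", "content": prompt}],
                )
                return response["data"]["choices"][0]["content"]  # assumed response shape
            except Exception:
                traceback.print_exc()
                time.sleep(5)
        return None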
def parse_num(res, min_score, max_score):
    """
    Extract a score within [min_score, max_score] from the evaluation result.
    Input: a string.
    Output: a score within the specified range, or -1 on failure.
    Strategy:
      1. If no number appears at all, return -1.
      2. If a fraction such as "4/5" appears, try its numerator first.
      3. If "x out of y" appears, try the number preceding "out of" next.
      4. Otherwise, return the first number that falls within the range.
      5. If no number falls within the range, return -1.
    """
    all_nums = re.findall(r"-?\d+(?:\.\d+)?", res)
    probs1_nums = re.finditer(r"\b(\d+(\.\d+)?)/\d+\b", res)  # fractions such as "4/5"
    probs2_nums = re.finditer(r"\b(\d+(\.\d+)?)\s+out\s+of\s+\d+\b", res)  # "4 out of 5"
    if len(all_nums) == 0:
        print("this res doesn't have num! \n", res)
        return -1
    for match in probs1_nums:
        answer = match.group(1)
        if min_score <= float(answer) <= max_score:
            return answer
    for match in probs2_nums:
        answer = match.group(1)
        if min_score <= float(answer) <= max_score:
            return answer
    for num in all_nums:
        if min_score <= float(num) <= max_score:  # the specified range
            return num
    print("this res doesn't have right num! ", res)
    return -1
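# Illustrative behaviour of parse_num with min_score=1, max_score=5:
#   parse_num("I would rate this 4/5.", 1, 5)    -> "4"  (fraction numerator wins)
#   parse_num("Score: 3 out of 5.", 1, 5)        -> "3"  ("out of" pattern)
#   parse_num("The answer deserves a 2.", 1, 5)  -> "2"  (first in-range number)
#   parse_num("No digits here.", 1, 5)           -> -1
# Note that successful matches are returned as strings, so callers convert them.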
def get_prompt(taskId):
    """
    Load the prompt template corresponding to the taskId.
    """
    prompt_files = {
        0: "prompt/prompt_Dialog.txt",
        1: "prompt/prompt_Story.txt",
        2: "prompt/prompt_Xsum.txt",
        3: "prompt/prompt_NFCATS.txt",
    }
    prompt = ""
    if taskId in prompt_files:
        with open(prompt_files[taskId], encoding='utf-8') as f:  # close the handle properly
            prompt = f.read().strip()
    return prompt
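# Illustrative template shape (the real templates live in the prompt/ files);
# each template is expected to carry the two placeholders substituted below:
#   Score the following answer on a scale of 1 to 5.
#   Question: {{question_text}}
#   Answer: {{answer_text}}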
def get_model_score(taskId, question, answer, model):
    """
    Pointwise 5-level grading as an example.
    """
    prompt = get_prompt(taskId)
    prompt = prompt.replace("{{question_text}}", question)
    prompt = prompt.replace("{{answer_text}}", answer)
    result = model.chat(prompt)
    score = -1
    if result is not None:  # chat() returns None when all retries fail
        # parse_num may return a numeric string such as "3.5", so convert via float
        score = int(float(parse_num(result, 1, 5)))
    if score == -1:
        score = random.randint(1, 5)  # random fallback when no valid score is parsed
    return score
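# Example call (illustrative question/answer text, requires a loaded model):
#   get_model_score(0, "What causes tides?", "Mainly the Moon's gravity.", glm3_6b)
# always yields an int in [1, 5]; unparseable output degrades to a random score.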
def get_rank(data):
    """
    Calculate the rankings in descending order, and for ties, assign the lowest rank.
    For example, the ranking for [1, 1, 2] would be [2, 2, 1].
    """
    series = pd.Series(data)
    ranks = series.rank(method='min', ascending=False)
    return list(map(int, ranks.tolist()))
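# Example with seven scores for one question (illustrative values):
#   get_rank([5, 3, 4, 3, 1, 2, 2])  ->  [1, 3, 2, 3, 7, 5, 5]
# The tied 3s share rank 3 (method='min') and the next distinct score drops to rank 5.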
def get_output(path, model):
    """
    Score the test set at the specified path and append the results to the output file.
    """
    df = pd.read_csv(path)
    row_labels = df.index
    # CSV columns: taskId,taskName,questionId,question,answerId,answer,score,rank
    model_scores = []
    with open("output/baseline1_chatglm3_6B.txt", 'a') as f:
        for row in tqdm(row_labels):
            taskId = df.loc[row, "taskId"]
            questionId = df.loc[row, "questionId"]
            question = df.loc[row, "question"]
            answer = df.loc[row, "answer"]
            model_score = get_model_score(taskId, question, answer, model)
            model_scores.append(model_score)
            # each question has 7 candidate answers; rank them once all 7 are scored
            if len(model_scores) == 7:
                ranks = get_rank(model_scores)
                for i in range(7):
                    answerId = i
                    f.write(f"{taskId} {questionId} {answerId} {model_scores[i]} {ranks[i]}\n")
                model_scores = []
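# Each output line is "taskId questionId answerId score rank"; e.g. a line such
# as "1 12 0 4 2" (illustrative) means answer 0 of question 12 in task 1 scored
# 4 and ranked 2nd among that question's 7 candidates.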
if __name__ == '__main__':
    paths = ['test/test_dialog.csv', 'test/test_NFCATS.csv', 'test/test_story.csv', 'test/test_Xsum.csv']
    for path in paths[1:]:  # note: paths[1:] skips the first file, test/test_dialog.csv
        get_output(path, glm3_6b)