# HumanLikeness / src /backend /model_operations.py
# XufengDuan's picture
# update scripts
# 9acf026
# raw
# history blame
# 43.3 kB
import os
import time
from datetime import datetime
import logging
from pathlib import Path
import requests
import json
# import numpy as np
import pandas as pd
import spacy
from sentence_transformers import CrossEncoder
import litellm
# from litellm import completion
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, AutoConfig, pipeline
# from accelerate import PartialState
# from accelerate.inference import prepare_pippy
import torch
# import cohere
# from openai import OpenAI
# # import google
import google.generativeai as genai
import src.backend.util as util
import src.envs as envs
#
# # import pandas as pd
# import scipy
from scipy.spatial.distance import jensenshannon
from scipy.stats import bootstrap
import numpy as np
import spacy_transformers
import subprocess
# Load the spaCy pipeline that EvaluationModel uses for dependency parsing.
# The installed model is tried first; the download only runs when the model is
# missing, so it is not repeated on every import of this module.
# subprocess.run(["python", "-m", "spacy", "download", "en_core_web_sm"], check=True)
# subprocess.run(["pip", "install", "spacy-transformers"], check=True)
# subprocess.run(["pip", "install", "curated-transformers"], check=True)
# nlp = spacy.load("en_core_web_sm")
try:
    nlp1 = spacy.load("en_core_web_lg")
except OSError:
    import importlib

    # Run the command to download the spaCy model. check=True keeps the
    # original behaviour of raising when the download itself fails.
    subprocess.run(["python", "-m", "spacy", "download", "en_core_web_lg"], check=True)
    # Make the freshly installed package visible to this running interpreter.
    importlib.invalidate_caches()
    try:
        nlp1 = spacy.load("en_core_web_lg")
    except OSError:
        # Keep the name bound so a missing model does not show up later as a
        # NameError far away from its cause.
        nlp1 = None
        print("Can not load spacy model")

# litellm.set_verbose=False
litellm.set_verbose=True

# Set up basic configuration for logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
class ModelLoadingException(Exception):
    """Raised when a model cannot be initialised.

    The exception text has the form
    ``"<messages> id=<model_id> revision=<revision>"``.

    Attributes:
        model_id (str): The model identifier.
        revision (str): The model revision.
    """

    def __init__(self, model_id, revision, messages="Error initializing model"):
        description = f"{messages} id={model_id} revision={revision}"
        super().__init__(description)
        # Keep both identifiers available to handlers that catch the error.
        self.model_id, self.revision = model_id, revision
class ResponseGenerator:
    """Generate model responses for the prompts of every experiment sheet.

    Attributes:
        model_id (str): Model identifier as passed to the constructor.
        model (str): huggingface/{model_id}
        api_base (str): https://api-inference.huggingface.co/models/{model_id}
        responses_df (DataFrame): DataFrame to store generated responses.
        revision (str): Model revision.
        avg_length: Initialised to None; not computed in this class.
        answer_rate: Initialised to None; not computed in this class.
        exceptions (list | None): Exceptions collected by the last call to
            ``generate_response`` (None before the first call).
        local_model: Locally loaded model, or None to use a remote API.
        tokenizer: Tokenizer paired with ``local_model``; only read when
            ``local_model`` is set.
    """

    # Number of times every prompt is sent to the model.
    _REPEATS_PER_PROMPT = 10
    # Retry budget for errors that are not recognised as rate limits or
    # model-loading delays, and the pause (seconds) before each such retry.
    _MAX_RETRIES = 30
    _RETRY_WAIT_SECONDS = 120
    _RESULT_COLUMNS = ["Experiment", "Question_ID", "Item", "Condition",
                       "User_prompt", "Response", "Factor 2", "Stimuli 1"]

    def __init__(self, model_id, revision):
        """
        Initializes the ResponseGenerator with a model.

        Args:
            model_id (str): Identifier for the model.
            revision (str): Revision of the model.
        """
        self.model_id = model_id
        self.model = f"huggingface/{model_id}"
        self.api_base = f"https://api-inference.huggingface.co/models/{model_id}"
        self.responses_df = pd.DataFrame()
        self.revision = revision
        self.avg_length = None
        self.answer_rate = None
        self.exceptions = None
        self.local_model = None
        # send_request reads this whenever local_model is set; define it so the
        # attribute always exists.
        self.tokenizer = None

    def generate_response(self, dataset, df_prompt, save_path=None):
        """Generate responses for every prompt in an Excel workbook.

        If ``save_path`` points to an existing file, the responses stored there
        are loaded and no request is sent. Otherwise each sheet that has a
        ``Prompt0`` column is processed, every prompt is sent to the model
        ``_REPEATS_PER_PROMPT`` times, and the collected rows are written to
        ``save_path`` (when given).

        Args:
            dataset: Path (or file object) of the Excel workbook with one sheet
                per experiment.
            df_prompt: Not used by this method.
            save_path (str | None): CSV file used to cache the responses.

        Returns:
            responses_df (DataFrame): Generated responses by the model.
        """
        exceptions = []
        if (save_path is not None) and os.path.exists(save_path):
            # A results file already exists: reuse the stored responses.
            self.responses_df = pd.read_csv(save_path)
            print(f'Loaded generated responses from {save_path}')
        else:
            # No stored results: query the model for every prompt.
            xls = pd.ExcelFile(dataset)
            sheet_names = xls.sheet_names
            print(f"Total: {len(sheet_names)}")
            print(sheet_names)

            rows = []
            exit_outer_loop = False  # bad model
            for i, sheet_name in enumerate(sheet_names, start=1):
                if exit_outer_loop:
                    break
                # Read one worksheet per experiment.
                print(i, sheet_name)
                df_sheet = pd.read_excel(xls, sheet_name=sheet_name)
                # Sheets without a 'Prompt0' column are skipped.
                if 'Prompt0' not in df_sheet.columns:
                    continue
                prompt_column = df_sheet['Prompt0']

                # Sheet-specific value that ends up in "Factor 2" (or in
                # "Stimuli 1" for sheet 6).
                if i == 3:
                    V2_column = [
                        word1 + '_' + word2
                        for word1, word2 in zip(df_sheet['Stimuli-2'], df_sheet['Stimuli-3'])
                    ]
                elif i == 9:
                    V2_column = df_sheet['V2']  # SL, LS
                elif i == 4 or i == 6:
                    V2_column = df_sheet['Stimuli-2']  # Stimuli-2
                else:
                    V2_column = [""] * len(prompt_column)

                q_column = df_sheet["ID"]
                Item_column = df_sheet["Item"]
                Condition_column = df_sheet["Condition"]
                Stimuli_1_column = df_sheet["Stimuli-1"]
                # Reset per sheet so a sheet without this column can never pick
                # up the previous sheet's values.
                Stimuli_2_column = df_sheet["Stimuli-2"] if 'Stimuli-2' in df_sheet.columns else None

                for j, prompt_value in enumerate(tqdm(prompt_column, desc=f"Processing {sheet_name}"), start=0):
                    if exit_outer_loop:
                        break
                    ID = 'E' + str(i)
                    system_prompt = envs.SYSTEM_PROMPT
                    _user_prompt = prompt_value

                    for ii in range(self._REPEATS_PER_PROMPT):
                        print(self.model_id.lower(), '-', ID, '-', j, '-', ii)
                        succeeded, _response = self._request_with_retries(
                            system_prompt, _user_prompt, exceptions, i)
                        if not succeeded:
                            # Retry budget exhausted: stop the whole run.
                            exit_outer_loop = True
                            break

                        if i == 5:
                            # Sheet 5 responses hold two sentences; each one is
                            # stored as its own row.
                            _response1, _response2 = self._extract_responses(_response)
                            rows.append((ID, q_column[j], Item_column[j], Condition_column[j],
                                         _user_prompt, _response2, _response, Stimuli_2_column[j]))
                            # the first sentence in the response is saved as E51
                            rows.append((ID + '1', str(q_column[j]) + '1', Item_column[j],
                                         Condition_column[j], _user_prompt, _response1, _response,
                                         Stimuli_1_column[j]))
                        else:
                            if i == 6:
                                factor_2, stimuli_1 = Condition_column[j], V2_column[j]
                            else:
                                factor_2, stimuli_1 = V2_column[j], Stimuli_1_column[j]
                            rows.append((ID, q_column[j], Item_column[j], Condition_column[j],
                                         _user_prompt, _response, factor_2, stimuli_1))

                        # Sleep to prevent hitting rate limits too frequently
                        time.sleep(1)

            self.responses_df = pd.DataFrame(rows, columns=self._RESULT_COLUMNS)
            if save_path is not None:
                print(f'Save responses to {save_path}')
                fpath = Path(save_path)
                fpath.parent.mkdir(parents=True, exist_ok=True)
                # The index is written on purpose: it becomes the first column
                # when the file is read back.
                self.responses_df.to_csv(fpath)

        self.exceptions = exceptions
        return self.responses_df

    def _request_with_retries(self, system_prompt, user_prompt, exceptions, index):
        """Send one request, waiting and retrying when it fails.

        Recognised rate-limit / loading errors are retried indefinitely after a
        fixed wait. Any other error is retried up to ``_MAX_RETRIES`` times;
        the exceptions raised by those retries are appended to ``exceptions``.

        Returns:
            tuple: ``(True, response)`` on success, ``(False, None)`` when the
            retry budget is exhausted.
        """
        while True:
            try:
                return True, self.send_request(system_prompt, user_prompt)
            except Exception as e:
                message = str(e)
                if 'Rate limit reached' in message:
                    wait_time = 3660
                    current_time = datetime.now().strftime('%H:%M:%S')
                    print(f"Rate limit hit at {current_time}. Waiting for 1 hour before retrying...")
                    time.sleep(wait_time)
                elif 'is currently loading' in message:
                    wait_time = 200
                    print(f"Model is loading, wait for {wait_time}")
                    time.sleep(wait_time)
                elif '429 Resource has been exhausted' in message:  # for gemini models
                    wait_time = 60
                    print(f"Quota has reached, wait for {wait_time}")
                    time.sleep(wait_time)
                else:
                    for retries in range(1, self._MAX_RETRIES + 1):
                        print(f"Error at index {index}: {e}")
                        time.sleep(self._RETRY_WAIT_SECONDS)
                        try:
                            # Return straight away so a successful retry is not
                            # followed by a second, redundant request.
                            return True, self.send_request(system_prompt, user_prompt)
                        except Exception as ee:
                            exceptions.append(ee)
                            print(f"Retry {retries}/{self._MAX_RETRIES} failed at index {index}: {ee}")
                    return False, None

    @staticmethod
    def _extract_responses(text, trigger_words=None):
        """Split a two-sentence response into ``(first, second)``.

        Handles responses such as::

            "Sure\\n\\nThe first sentence\\n\\nThe second sentence"
            "The first sentence\\n\\nThe second sentence"
            "XXX: The first sentence\\n\\nXXX: The second sentence"
            "Sure\\n\\nXXX: The first sentence\\n\\nXXX: The second sentence"

        A missing sentence is returned as None; any parsing error yields
        ``(None, None)``.
        """
        if trigger_words is None:
            trigger_words = ["sure", "okay", "yes"]
        try:
            # Non-empty lines, with any "label:" prefix removed.
            sentences = [line.strip() for line in text.split('\n') if line.strip()]
            sentences = [s.split(':', 1)[-1].strip() if ':' in s else s for s in sentences]
            sentences = [s.strip() for s in sentences if s.strip()]
            # Skip a leading acknowledgement ("Sure", ...) when two sentences
            # still follow it.
            starts_with_trigger = any(sentences[0].lower().startswith(word) for word in trigger_words)
            offset = 1 if (starts_with_trigger and len(sentences) > 2) else 0
            _response1 = sentences[offset].strip() if len(sentences) > offset else None
            _response2 = sentences[offset + 1].strip() if len(sentences) > offset + 1 else None
        except Exception as e:
            print(f"Error occurred: {e}")
            _response1, _response2 = None, None
        print(_response1)
        print(_response2)
        return _response1, _response2

    def send_request(self, system_prompt: str, user_prompt: str):
        """Send one system/user prompt pair to the model and return its reply.

        Routing: a local model is used when ``self.local_model`` is set;
        otherwise the Hugging Face Inference API is called with one of the
        ``envs.TOKEN1`` .. ``envs.TOKEN20`` credentials.

        Raises:
            Exception: "All tokens failed." when no Inference API token worked.
        """
        # Together AI routing is currently disabled (the flag is never set);
        # the branch is kept so it can be switched back on for these families:
        # ['mixtral', 'dbrx', 'wizardlm'].
        using_together_api = False
        if using_together_api:
            suffix = "chat/completions"
            url = f"https://api.together.xyz/v1/{suffix}"
            payload = {
                "model": self.model_id,
                'max_new_tokens': 100,
            }
            payload['messages'] = [{"role": "system", "content": system_prompt},
                                   {"role": "user", "content": user_prompt}]
            headers = {
                "accept": "application/json",
                "content-type": "application/json",
                "Authorization": f"Bearer {os.environ['TOGETHER_API_KEY']}"
            }
            response = requests.post(url, json=payload, headers=headers)
            try:
                result = json.loads(response.text)
                result = result["choices"][0]
                if 'message' in result:
                    result = result["message"]["content"].strip()
                else:
                    result = result["text"]
                    result_candidates = [candidate for candidate in result.split('\n\n') if len(candidate) > 0]
                    result = result_candidates[0]
                print(result)
            except Exception:
                # Unexpected payload: log the raw response, return empty text.
                print(response)
                result = ''
            print(result)
            return result

        if self.local_model:  # cannot call API. using local model
            messages = [
                {"role": "system", "content": system_prompt},  # gemma-1.1 does not accept system role
                {"role": "user", "content": user_prompt}
            ]
            try:  # some models support pipeline
                pipe = pipeline(
                    "text-generation",
                    model=self.local_model,
                    tokenizer=self.tokenizer,
                )
                generation_args = {
                    "max_new_tokens": 100,
                    "return_full_text": False,
                    "do_sample": False,
                }
                output = pipe(messages, **generation_args)
                result = output[0]['generated_text']
                print(result)
            except Exception:
                # Pipeline not usable for this model: call generate() directly.
                prompt = self.tokenizer.apply_chat_template(messages, add_generation_prompt=True, tokenize=False)
                print(prompt)
                input_ids = self.tokenizer(prompt, return_tensors="pt").to('cuda')
                with torch.no_grad():
                    outputs = self.local_model.generate(**input_ids, max_new_tokens=100, do_sample=True,
                                                        pad_token_id=self.tokenizer.eos_token_id)
                # Decode only the newly generated tokens. The previous
                # `result.replace(prompt[0], '')` removed every occurrence of
                # the prompt's first character instead of the prompt.
                prompt_length = input_ids['input_ids'].shape[-1]
                result = self.tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)
                print(result)
            return result

        elif self.local_model is None:
            import random
            from huggingface_hub import InferenceClient

            messages = [{"role": "system", "content": system_prompt},
                        {"role": "user", "content": user_prompt}]
            tokens_tried = set()
            last_error = None
            # Visit the token slots in random order, trying at most 10 distinct
            # tokens. Walking a shuffled list (instead of drawing at random
            # until 10 distinct values were seen) always terminates, even when
            # fewer than 10 distinct tokens are configured.
            for i in random.sample(range(1, 21), 20):
                if len(tokens_tried) >= 10:
                    break
                token = getattr(envs, f"TOKEN{i}", None)
                if token is None or token in tokens_tried:
                    continue
                tokens_tried.add(token)
                print(f"Trying with token: TOKEN{i}")
                try:
                    client = InferenceClient(self.model_id, api_key=token, headers={"X-use-cache": "false"})
                    result = None
                    while result is None:
                        outputs = client.chat_completion(messages, max_tokens=100)
                        result = outputs['choices'][0]['message']['content']
                        if result is None:
                            time.sleep(1)  # Optional: Add a small delay before retrying
                    return result
                except Exception as e:
                    last_error = e
                    # Log the slot name only, never the secret token value.
                    print(f"Error with TOKEN{i}: {e}; trying another token...")
                    continue
            raise Exception("All tokens failed.") from last_error

        # NOTE(review): in the code visible here `local_model` is either set or
        # None, so the two branches below are never reached. Confirm whether
        # Gemini/OpenAI models are meant to be routed before the Inference API
        # fallback above.
        elif 'gemini' in self.model_id.lower():
            genai.configure(api_key=os.getenv('GOOGLE_AI_API_KEY'))
            generation_config = {
                "max_output_tokens": 100,
            }
            safety_settings = [
                {
                    "category": "HARM_CATEGORY_HARASSMENT",
                    "threshold": "BLOCK_NONE"
                },
                {
                    "category": "HARM_CATEGORY_HATE_SPEECH",
                    "threshold": "BLOCK_NONE"
                },
                {
                    "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
                    "threshold": "BLOCK_NONE"
                },
                {
                    "category": "HARM_CATEGORY_DANGEROUS_CONTENT",
                    "threshold": "BLOCK_NONE"
                },
            ]
            model = genai.GenerativeModel(
                model_name="gemini-1.5-pro-latest" if "gemini-1.5-pro" in self.model_id.lower() else
                self.model_id.lower().split('google/')[-1],
                generation_config=generation_config,
                system_instruction=system_prompt,
                safety_settings=safety_settings)
            convo = model.start_chat(history=[])
            convo.send_message(user_prompt)
            result = convo.last.text
            print(result)
            return result

        # Using OpenAI API
        elif 'gpt' in self.model_id.lower():
            response = litellm.completion(
                model=self.model_id.replace('openai/', ''),
                messages=[{"role": "system", "content": system_prompt},
                          {"role": "user", "content": user_prompt}],
                max_tokens=100,
                api_key=os.getenv('OpenAI_key')
            )
            result = response['choices'][0]['message']['content']
            print(result)
            return result
class EvaluationModel:
    """Code model responses and compare them with human response data.

    Attributes:
        scores (list): Initialised to an empty list.
        humanlike_score (float): Human-likeness score (initialised to None).
        data (DataFrame): Result of the last ``code_results_llm`` call.
        llm_df (DataFrame): Coded responses stored by ``evaluate_humanlike``.
    """

    _MALE_KEYWORDS = ("he", "his", "himself")
    _FEMALE_KEYWORDS = ("she", "her", "herself")

    def __init__(self):
        """
        Initializes the EvaluationModel.
        """
        self.scores = []
        self.humanlike_score = None

    @staticmethod
    def _normalize_response(raw):
        """Lower-case a response and strip quotes, periods and label prefixes."""
        rs = str(raw).strip().lower()
        # NOTE(review): as written, replace(" ", " ") swaps a space for a space
        # (a no-op); presumably it once collapsed a double or non-breaking
        # space - confirm against the upstream file.
        rs = rs.replace('"', '').replace(" ", " ").replace('.', '')
        # Drop empty lines and lines that are only a "label:" header.
        # NOTE(review): the two endswith checks are identical here; one may
        # originally have tested a different colon character - confirm.
        kept = [line for line in rs.split("\n")
                if line and not (line.endswith(":") or line.endswith(":"))]
        # Keep only the text after the first ':' on each remaining line.
        kept = [r.split(':', 1)[-1].strip() if ':' in r else r for r in kept]
        return "\n".join(kept).strip()

    @classmethod
    def _first_gendered_keyword(cls, words):
        """Return "Female"/"Male" for the first gendered pronoun, else None."""
        for word in words:
            if word in cls._FEMALE_KEYWORDS:
                return "Female"
            if word in cls._MALE_KEYWORDS:
                return "Male"
        return None

    @staticmethod
    def _code_e3(rs, pair, item):
        """Code Exp3: which word of a ``word1_word2`` pair the response used."""
        word1, word2 = pair.replace(".", "").split('_')
        if item == 12:
            return "Other"
        words = rs.split()  # split the response into words
        # A response that contains both words is not codable.
        if word1 in words and word2 in words:
            return "Other"
        first_is_longer = len(word1) > len(word2)
        label_word1 = "Long" if first_is_longer else "Short"
        label_word2 = "Short" if first_is_longer else "Long"
        lowered = [word.lower() for word in words]
        if word1.lower() in lowered:
            return label_word1
        if word2.lower() in lowered:
            return label_word2
        if len(words) > 1:
            # Multi-word answers: compare the re-joined words as one phrase.
            phrase = " ".join(words).lower()
            if phrase == word1.lower():
                return label_word1
            if phrase == word2.lower():
                return label_word2
        return "Other"

    @staticmethod
    def _code_e4(rs, raw_target, wordpair2code):
        """Code Exp4: look up ``<target>_<fifth answer>`` in the coding table."""
        lines = rs.split("\n")
        if len(lines) > 1:
            # Strip "label:" prefixes from every line except the first.
            filtered_lines = [lines[0]] + [
                r.split(':', 1)[-1].strip() if ':' in r else r for r in lines[1:]
            ]
        else:
            filtered_lines = lines
        print(filtered_lines)
        # Remove list markers / prefixes that end in '-'.
        filtered_lines = [r.split('-', 1)[-1].strip() if '-' in r else r for r in filtered_lines]
        rs = "\n".join(filtered_lines)
        # Repeat the same clean-up on ';'-separated items.
        filtered_lines = [r.split(':', 1)[-1].strip() if ':' in r else r for r in rs.split(";")]
        filtered_lines = [r.split('-', 1)[-1].strip() if '-' in r else r for r in filtered_lines]
        rs = ";".join(filtered_lines).strip()
        # The word of interest is the fifth item, separated by ';' or newlines.
        try:
            meaning_word = rs.split(";")[4].replace(" ", '')
        except IndexError:
            try:
                meaning_word = rs.split("\n")[4].replace(" ", '')
            except IndexError:
                return "Other"
        except Exception as e:
            print(f"Unexpected error: {e}")
            return "Other"
        target = raw_target.strip().lower()
        return wordpair2code.get(target + "_" + meaning_word, "Other")

    @staticmethod
    def _code_e5(sentence, verb):
        """Code Exp5: "PO", "DO" or "Other" from the parse around ``verb``."""
        # NOTE(review): replace(" ", " ") is a no-op as written; see the note in
        # _normalize_response.
        doc = nlp1(sentence.replace(" ", " "))
        verb_token = None
        for token in doc:
            if token.lemma_ == verb:
                verb_token = token
                break
        pobj, dative = None, None
        if verb_token is not None:
            for child in verb_token.children:
                if (child.dep_ == 'dative' and child.pos_ == "ADP") or (
                        child.text == "to" and child.dep_ == 'prep' and child.pos_ == "ADP"):
                    pobj = child.text
                if child.dep_ == 'dative':
                    dative = child.text
        if pobj:
            return "PO"
        if dative:
            return "DO"
        return "Other"

    @staticmethod
    def _code_e6(rs, stimulus):
        """Code Exp6: "VP" if only the stimulus subject is in the response,
        "NP" if only its direct object is, otherwise "Other"."""
        doc = nlp1(stimulus)
        subject = "None"
        obj = "None"
        for token in doc:
            if token.dep_ == "nsubj":
                subject = token.text
            elif token.dep_ == "dobj":
                obj = token.text
        has_subject, has_object = subject in rs, obj in rs
        if has_subject and has_object:
            return "Other"
        if has_subject:
            return "VP"
        if has_object:
            return "NP"
        return "Other"

    @classmethod
    def _code_e9(cls, rs, condition):
        """Code Exp9: "Subject"/"Object" from the first pronoun after "because"."""
        if "because" in rs:
            rs = rs.replace("because because", "because").split("because")[1]
        gender = cls._first_gendered_keyword(rs.split(" "))
        if gender is None:
            return "Other"
        if condition == "MF":
            return "Subject" if gender == "Male" else "Object"
        if condition == "FM":
            return "Object" if gender == "Male" else "Subject"
        return "Other"

    def code_results_llm(self, responses_df):
        """Code every LLM response into a categorical label.

        Reads the lookup tables ``envs.ITEM_4_DATA`` (Exp4 word-pair codes) and
        ``envs.ITEM_5_DATA`` (Exp5 verbs and stimuli), then labels each row
        according to its "Experiment" value. Missing responses are coded
        "Other"; unknown experiments are coded "NA".

        Args:
            responses_df (DataFrame): Needs the columns "Experiment",
                "Question_ID", "Item", "Response", "Factor 2" and "Stimuli 1".

        Returns:
            DataFrame: The input columns listed above (except none dropped)
            plus a "Coding" column; also stored in ``self.data``.
        """
        output = []

        # Lookup table for Exp4: "<target>_<meaning>" -> code.
        item4 = pd.read_csv(envs.ITEM_4_DATA)
        wordpair2code = dict(zip(item4['Pair'], item4['Coding']))

        # Lookup tables for Exp5: one verb pair per item, two stimuli per ID.
        item5 = pd.read_csv(envs.ITEM_5_DATA)
        item2verb1 = dict(zip(item5['Item'], item5['Verb1']))
        item2verb2 = dict(zip(item5['Item'], item5['Verb2']))
        Stimuli1 = dict(zip(item5['ID'], item5['Stimuli-1']))
        Stimuli2 = dict(zip(item5['ID'], item5['Stimuli-2']))

        total = len(responses_df["Experiment"])
        for i in range(total):
            print(i, "/", total)
            if pd.isna(responses_df["Response"][i]):
                output.append("Other")
                continue
            rs = self._normalize_response(responses_df["Response"][i])
            experiment = responses_df["Experiment"][i]

            if experiment == "E1":
                if rs == "round":
                    output.append("Round")
                elif rs == "spiky":
                    output.append("Spiky")
                else:
                    output.append("Other")

            elif experiment == "E2":
                output.append(self._first_gendered_keyword(rs.split(' ')) or "Other")

            elif experiment == "E3":
                output.append(self._code_e3(rs, responses_df["Factor 2"][i], responses_df["Item"][i]))

            elif experiment == "E4":
                output.append(self._code_e4(rs, responses_df["Factor 2"][i], wordpair2code))

            elif experiment == "E5" or experiment == "E51":
                item_id = responses_df["Item"][i]
                question_id = responses_df["Question_ID"][i]
                if experiment == "E51":
                    # E51 question ids carry an extra trailing character.
                    stimulus = Stimuli1[question_id[0:-1]].lower().replace("...", "")
                    verb = item2verb1[item_id].lower()
                else:
                    stimulus = Stimuli2[question_id].lower().replace("...", "")
                    verb = item2verb2[item_id].lower()
                # Put the stimulus in front of the response exactly once.
                sentence = stimulus + " " + rs.replace(stimulus, "")
                output.append(self._code_e5(sentence, verb))

            elif experiment == "E6":
                stimulus = responses_df["Stimuli 1"][i].strip().lower()
                output.append(self._code_e6(rs, stimulus))

            elif experiment == "E7":
                rs = rs.replace(".", "").replace(",", "").lower()
                if "yes" in rs and "no" in rs:
                    output.append("Other")
                elif "no" in rs:
                    output.append("0")
                elif "yes" in rs:
                    output.append("1")
                else:
                    output.append("Other")

            elif experiment == "E8":
                output.append("1" if "something is wrong with the question" in rs else "0")

            elif experiment == "E9":
                condition = responses_df["Factor 2"][i].strip()
                output.append(self._code_e9(rs, condition))

            elif experiment == "E10":
                output.append("1" if rs.replace(".", "") == "yes" else "0")

            else:
                output.append("NA")

        print(len(output))
        self.data = pd.DataFrame(list(
            zip(responses_df["Experiment"], responses_df["Question_ID"], responses_df["Item"],
                responses_df["Response"], responses_df["Factor 2"], responses_df["Stimuli 1"], output)),
            columns=["Experiment", "Question_ID", "Item", "Response", "Factor 2", "Stimuli 1",
                     "Coding"])
        return self.data

    @staticmethod
    def _create_e5_entries(df):
        """Build one row per E51 row that says whether it matches its E5 prime.

        The prime of an E51 row is the row whose first-column value is one less
        than that of the E51 row. "Coding" is 1 when both rows share the same
        coding, else 0; the new row takes the prime's Question_ID.
        """
        new_entries = []
        first_column = df.iloc[:, 0]
        # Visit every row, including the last one, so a trailing E51 row is
        # paired as well.
        for i in range(len(df)):
            row = df.iloc[i]
            if 'E51' in str(row['Experiment']):
                priming_id = first_column.iloc[i] - 1
                priming_row = df[first_column == priming_id].iloc[0]
                label = 1 if row['Coding'] == priming_row['Coding'] else 0
                new_entries.append({
                    'Question_ID': priming_row['Question_ID'],
                    'Response': f'{row["Coding"]}-{priming_row["Coding"]}',
                    'Coding': label
                })
        return pd.DataFrame(new_entries)

    @staticmethod
    def _bootstrap_ci(values):
        """Return the 95% bootstrap CI ``(low, high)`` of the nan-mean.

        ``scipy.stats.bootstrap`` needs at least two observations; with fewer
        the interval is reported as ``(nan, nan)``.
        """
        if len(values) < 2:
            return float("nan"), float("nan")
        interval = bootstrap((values,), np.nanmean, confidence_level=0.95,
                             n_resamples=1000).confidence_interval
        return interval.low, interval.high

    def calculate_js_divergence(self, file_path_1, file_path_2):
        """
        Score how closely two coded datasets agree, per question.

        - E51 rows are paired with their E5 prime and replaced (together with
          the original E5/E51 rows) by match / no-match entries.
        - For every Question_ID present in both files, the two "Coding"
          distributions are compared with ``scipy``'s ``jensenshannon`` (the
          Jensen-Shannon distance, base 2).
        - Scores are reported as ``1 - mean distance``, so higher means closer.

        Parameters:
            file_path_1 (str): Path to the first (human) dataset, CSV read with
                ISO-8859-1 encoding.
            file_path_2 (str): Path to the second (LLM) dataset, CSV.

        Returns:
            dict: ``{'overall': {...}, 'per_experiment': {exp: {...}}}`` where
            each inner dict has 'average_js_divergence' and
            'confidence_interval' (a ``(low, high)`` tuple).

        Raises:
            ValueError: If the two files share no Question_ID.
        """
        # Load the datasets
        human_df = pd.read_csv(file_path_1, encoding='ISO-8859-1')
        llm_df = pd.read_csv(file_path_2)

        # Create new E5 entries for both datasets
        human_e5 = self._create_e5_entries(human_df)
        llm_e5 = self._create_e5_entries(llm_df)

        # Remove E5 and E51 entries from both datasets
        human_df = human_df[~human_df['Question_ID'].str.contains('E5')]
        llm_df = llm_df[~llm_df['Question_ID'].str.contains('E5')]

        # Append new E5 entries to the cleaned dataframes
        human_df = pd.concat([human_df, human_e5], ignore_index=True)
        llm_df = pd.concat([llm_df, llm_e5], ignore_index=True)

        human_responses = human_df[['Question_ID', 'Coding']]
        llm_responses = llm_df[['Question_ID', 'Coding']]

        # Question_IDs present in both datasets
        common_question_ids = set(human_responses['Question_ID']).intersection(set(llm_responses['Question_ID']))

        # experiment id -> list of per-question distances
        js_divergence = {}
        for q_id in common_question_ids:
            human_dist = human_responses[human_responses['Question_ID'] == q_id]['Coding'].value_counts(
                normalize=True)
            llm_dist = llm_responses[llm_responses['Question_ID'] == q_id]['Coding'].value_counts(normalize=True)

            # Align both distributions on the union of observed codes.
            all_responses = list(set(human_dist.index).union(set(llm_dist.index)))
            human_dist = human_dist.reindex(all_responses, fill_value=0)
            llm_dist = llm_dist.reindex(all_responses, fill_value=0)

            js_div = jensenshannon(human_dist, llm_dist, base=2)
            # The experiment id is the second '_'-separated field of the id.
            experiment_id = q_id.split('_')[1]
            js_divergence.setdefault(experiment_id, []).append(js_div)

        if not js_divergence:
            raise ValueError("No common Question_ID between the two datasets")

        # Average score per experiment with its confidence interval
        results = {}
        for exp, divs in js_divergence.items():
            ci_lower, ci_upper = self._bootstrap_ci(divs)
            results[exp] = {
                'average_js_divergence': 1 - np.nanmean(divs),
                'confidence_interval': (1 - ci_upper, 1 - ci_lower)  # Adjust for 1 - score
            }

        # Overall average score and confidence interval
        flattened_js_divergence = np.array([js for divs in js_divergence.values() for js in divs])
        overall_ci_lower, overall_ci_upper = self._bootstrap_ci(flattened_js_divergence)

        return {
            'overall': {
                'average_js_divergence': 1 - np.nanmean(flattened_js_divergence),
                'confidence_interval': (1 - overall_ci_upper, 1 - overall_ci_lower)
            },
            'per_experiment': results
        }

    def evaluate_humanlike(self, responses_df: pd.DataFrame, human_data_path: object, result_save_path: str) -> object:
        """Code the model responses, store them, and score them against humans.

        1. Code the responses with ``code_results_llm``.
        2. Save the coded table next to ``result_save_path`` (".csv" replaced
           by "_coding.csv") and upload it via ``envs.API.upload_file``.
        3. Return the result of ``calculate_js_divergence`` for the human data
           file and the saved coding file.
        """
        save_path = result_save_path.replace('.csv', '_coding.csv')
        self.llm_df = self.code_results_llm(responses_df)

        if save_path is not None:
            print(f'Save LLM coding results to {save_path}')
            fpath = Path(save_path)
            fpath.parent.mkdir(parents=True, exist_ok=True)
            self.llm_df.to_csv(fpath)
            envs.API.upload_file(
                path_or_fileobj=save_path,  # ./generation_results/meta-llama/Llama-2-13b-chat-hf_coding.csv
                path_in_repo=f"{save_path.replace('generation_results/', '')}",
                repo_id=envs.RESULTS_REPO,
                repo_type="dataset",
            )

        avg_js_divergence = self.calculate_js_divergence(human_data_path, save_path)
        return avg_js_divergence