import os import time from datetime import datetime import logging from pathlib import Path import requests import json # import numpy as np import pandas as pd import spacy from sentence_transformers import CrossEncoder import litellm # from litellm import completion from tqdm import tqdm from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, AutoConfig, pipeline # from accelerate import PartialState # from accelerate.inference import prepare_pippy import torch # import cohere # from openai import OpenAI # # import google import google.generativeai as genai import src.backend.util as util import src.envs as envs # # # import pandas as pd # import scipy from scipy.spatial.distance import jensenshannon from scipy.stats import bootstrap import numpy as np import spacy_transformers import subprocess # Run the command to download the spaCy model # subprocess.run(["python", "-m", "spacy", "download", "en_core_web_lg"], check=True) # subprocess.run(["python", "-m", "spacy", "download", "en_core_web_sm"], check=True) # subprocess.run(["pip", "install", "spacy-transformers"], check=True) # subprocess.run(["pip", "install", "curated-transformers"], check=True) # Load spacy model for word tokenization # nlp = spacy.load("en_core_web_sm") try: nlp1 = spacy.load("en_core_web_sm") except OSError: print("Can not load spacy model") # litellm.set_verbose=False litellm.set_verbose=True # Set up basic configuration for logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') class ModelLoadingException(Exception): """Exception raised for errors in loading a model. Attributes: model_id (str): The model identifier. revision (str): The model revision. """ def __init__(self, model_id, revision, messages="Error initializing model"): self.model_id = model_id self.revision = revision super().__init__(f"{messages} id={model_id} revision={revision}") class ResponseGenerator: """A class to generate responses using a causal language model. Attributes: model (str): huggingface/{model_id} api_base (str): https://api-inference.huggingface.co/models/{model_id} responses_df (DataFrame): DataFrame to store generated responses. revision (str): Model revision. avg_length (float): Average length of responses. answer_rate (float): Rate of non-empty responses. """ def __init__(self, model_id, revision): """ Initializes the ResponseGenerator with a model. Args: model_id (str): Identifier for the model. revision (str): Revision of the model. """ self.model_id = model_id self.model = f"huggingface/{model_id}" self.api_base = f"https://api-inference.huggingface.co/models/{model_id}" self.responses_df = pd.DataFrame() self.revision = revision self.avg_length = None self.answer_rate = None self.exceptions = None self.local_model = None def generate_response(self, dataset, df_prompt, save_path=None): """Generate responses for a given DataFrame of source docs. Args: dataset (DataFrame): DataFrame containing source docs. Returns: responses_df (DataFrame): Generated responses by the model. """ exceptions = [] if (save_path is not None) and os.path.exists(save_path): '''已存在文件,可以读取已经存在的测试文本''' self.responses_df = pd.read_csv(save_path) # print(self.responses_df['Experiment']) print(f'Loaded generated responses from {save_path}') else: '''测试文件不存在,则需要调用指定的模型来进行测试''' # prompt = {} # for index, row in tqdm(df_prompt.iterrows(), total=df_prompt.shape[0]): # prompt['E' + row['Item']] = row['Prompt'] xls = pd.ExcelFile(dataset) sheet_names = xls.sheet_names # sheet_names = df.sheetnames print(f"Total: {len(sheet_names)}") print(sheet_names) Experiment_ID, Questions_ID, Item_ID, Condition, User_prompt, Response, Factor_2, Stimuli_1 = [], [], [], [], [] ,[], [], [] exit_outer_loop = False # bad model for i, sheet_name in enumerate(sheet_names, start=1): if exit_outer_loop: break # 读取每个工作表 # if i > 2 and i ==1: # continue print(i, sheet_name) df_sheet = pd.read_excel(xls, sheet_name=sheet_name) # 假设第一列是'Prompt0',但这里我们使用列名来避免硬编码 if 'Prompt0' in df_sheet.columns: prompt_column = df_sheet['Prompt0'] else: # 如果'Prompt0'列不存在,则跳过该工作表或进行其他处理 continue if i == 3 : word1_list = df_sheet['Stimuli-2'] word2_list = df_sheet['Stimuli-3'] V2_column = [] for jj in range(len(word1_list)): V2_column.append(word1_list[jj] + '_' + word2_list[jj]) # print(V2_column) elif i == 9: V2_column = df_sheet['V2'] #SL, LS elif i == 4 or i == 6 : V2_column = df_sheet['Stimuli-2'] #Stimuli-2 else: V2_column = [""] * len(prompt_column) q_column = df_sheet["ID"] Item_column = df_sheet["Item"] Condition_column = df_sheet["Condition"] Stimuli_1_column = df_sheet["Stimuli-1"] if 'Stimuli-2' in df_sheet.columns: Stimuli_2_column = df_sheet["Stimuli-2"] for j, prompt_value in enumerate(tqdm(prompt_column, desc=f"Processing {sheet_name}"), start=0): if exit_outer_loop: break ID = 'E' + str(i) # q_ID = ID + '_' + str(j) # print(ID, q_ID, prompt_value) system_prompt = envs.SYSTEM_PROMPT _user_prompt = prompt_value print(_user_prompt) for ii in range(100): # user_prompt = f"{envs.USER_PROMPT}\nPassage:\n{_source}" while True: try: '''调用''' print(self.model_id.lower(),'-',ID,'-',j,'-',ii) _response = self.send_request(system_prompt, _user_prompt) # print(_response) # print(f"Finish index {index}") break except Exception as e: if 'Rate limit reached' in str(e): wait_time = 3660 current_time = datetime.now().strftime('%H:%M:%S') print(f"Rate limit hit at {current_time}. Waiting for 1 hour before retrying...") time.sleep(wait_time) elif 'is currently loading' in str(e): wait_time = 200 print(f"Model is loading, wait for {wait_time}") time.sleep(wait_time) elif '429 Resource has been exhausted' in str(e): # for gemini models wait_time = 60 print(f"Quota has reached, wait for {wait_time}") time.sleep(wait_time) else: max_retries = 30 retries = 0 wait_time = 120 while retries < max_retries: print(f"Error at index {i}: {e}") time.sleep(wait_time) try: _response = self.send_request(system_prompt, _user_prompt) break except Exception as ee: exceptions.append(ee) retries += 1 print(f"Retry {retries}/{max_retries} failed at index {i}: {ee}") if retries >= max_retries: exit_outer_loop = True break if exit_outer_loop: break if i == 5: #print(_response) # For E5, the responses might be in the following formats: # "Sure\n\nThe first sentence of the response\n\nThe second sentence of the response" # "The first sentence of the response\n\nThe second sentence of the response" # "XXX: The first sentence of the response\n\nXXX: The second sentence of the response" # "Sure\n\nXXX: The first sentence of the response\n\nXXX: The second sentence of the response" # "Sure\n\nThe first sentence of the response\n\nThe second sentence of the response\n\n" def extract_responses(text, trigger_words=None): if trigger_words is None: trigger_words = ["sure", "okay", "yes"] try: # Split the text into sentences sentences = text.split('\n') # Remove empty sentences sentences = [sentence.strip() for sentence in sentences if sentence.strip()] # Remove the first sentence if it has a : in it, sentences = [sentence.split(':', 1)[-1].strip() if ':' in sentence else sentence for sentence in sentences] # Remove empty sentences sentences = [sentence.strip() for sentence in sentences if sentence.strip()] # Remove the first sentence if it is a trigger word if any(sentences[0].lower().startswith(word) for word in trigger_words) and len( sentences) > 2: _response1 = sentences[1].strip() if len(sentences) > 1 else None _response2 = sentences[2].strip() if len(sentences) > 2 else None else: _response1 = sentences[0].strip() if len(sentences) > 0 else None _response2 = sentences[1].strip() if len(sentences) > 1 else None except Exception as e: print(f"Error occurred: {e}") _response1, _response2 = None, None print(_response1), print(_response2) return _response1, _response2 _response1, _response2 = extract_responses(_response) Experiment_ID.append(ID) Questions_ID.append(q_column[j]) User_prompt.append(_user_prompt) Response.append(_response2) Factor_2.append(_response) Stimuli_1.append(Stimuli_2_column[j]) Item_ID.append(Item_column[j]) Condition.append(Condition_column[j]) # the first sentence in the response is saved as E51 Experiment_ID.append(ID + '1') Questions_ID.append(str(q_column[j]) + '1') User_prompt.append(_user_prompt) Response.append(_response1) Factor_2.append(_response) Stimuli_1.append(Stimuli_1_column[j]) Item_ID.append(Item_column[j]) Condition.append(Condition_column[j]) else: Experiment_ID.append(ID) Questions_ID.append(q_column[j]) User_prompt.append(_user_prompt) Response.append(_response) if i == 6: Factor_2.append(Condition_column[j]) Stimuli_1.append(V2_column[j]) else: Factor_2.append(V2_column[j]) Stimuli_1.append(Stimuli_1_column[j]) Item_ID.append(Item_column[j]) Condition.append(Condition_column[j]) # Sleep to prevent hitting rate limits too frequently time.sleep(1) self.responses_df = pd.DataFrame(list(zip(Experiment_ID, Questions_ID, Item_ID, Condition, User_prompt, Response, Factor_2, Stimuli_1)), columns=["Experiment", "Question_ID", "Item", "Condition", "User_prompt", "Response","Factor 2","Stimuli 1"]) if save_path is not None: print(f'Save responses to {save_path}') fpath = Path(save_path) fpath.parent.mkdir(parents=True, exist_ok=True) self.responses_df.to_csv(fpath) self.exceptions = exceptions # self._compute_avg_length() # self._compute_answer_rate() return self.responses_df def send_request(self, system_prompt: str, user_prompt: str): # Using Together AI API using_together_api = False together_ai_api_models = ['mixtral', 'dbrx', 'wizardlm'] for together_ai_api_model in together_ai_api_models: if together_ai_api_model in self.model_id.lower(): #using_together_api = True break # print('适用哪一种LLM',together_ai_api_model , using_together_api) # print(self.model_id.lower()) #meta-llama/llama-2-7b-chat-hf # print('local',self.local_model) $None # exit() # if 'mixtral' in self.model_id.lower() or 'dbrx' in self.model_id.lower() or 'wizardlm' in self.model_id.lower(): # For mixtral and dbrx models, use Together AI API if using_together_api: # suffix = "completions" if ('mixtral' in self.model_id.lower() or 'base' in self.model_id.lower()) else "chat/completions" suffix = "chat/completions" url = f"https://api.together.xyz/v1/{suffix}" payload = { "model": self.model_id, # "max_tokens": 4096, 'max_new_tokens': 100, # "a": 0.0, # 'repetition_penalty': 1.1 if 'mixtral' in self.model_id.lower() else 1 } payload['messages'] = [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}] headers = { "accept": "application/json", "content-type": "application/json", "Authorization": f"Bearer {os.environ['TOGETHER_API_KEY']}" } response = requests.post(url, json=payload, headers=headers) try: result = json.loads(response.text) # print(result) result = result["choices"][0] if 'message' in result: result = result["message"]["content"].strip() else: result = result["text"] result_candidates = [result_cancdidate for result_cancdidate in result.split('\n\n') if len(result_cancdidate) > 0] result = result_candidates[0] print(result) except: print(response) result = '' print(result) return result if self.local_model: # cannot call API. using local model messages=[ {"role": "system", "content": system_prompt}, # gemma-1.1 does not accept system role {"role": "user", "content": user_prompt} ] try: # some models support pipeline pipe = pipeline( "text-generation", model=self.local_model, tokenizer=self.tokenizer, ) generation_args = { "max_new_tokens": 100, "return_full_text": False, #"temperature": 0.0, "do_sample": False, } output = pipe(messages, **generation_args) result = output[0]['generated_text'] print(result) except: prompt = self.tokenizer.apply_chat_template(messages,add_generation_prompt=True, tokenize=False) print(prompt) input_ids = self.tokenizer(prompt, return_tensors="pt").to('cuda') with torch.no_grad(): outputs = self.local_model.generate(**input_ids, max_new_tokens=100, do_sample=True, pad_token_id=self.tokenizer.eos_token_id) result = self.tokenizer.decode(outputs[0], skip_special_tokens=True) result = result.replace(prompt[0], '') print(result) return result # Using OpenAI API elif 'gpt' in self.model_id.lower(): response = litellm.completion( model=self.model_id.replace('openai/', ''), messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}], # temperature=0.0, max_tokens=100, api_key=os.getenv('OpenAI_key') ) result = response['choices'][0]['message']['content'] # print() # print(result) return result elif self.local_model is None: import random def get_random_token(): i = random.randint(1, 20) token = getattr(envs, f"TOKEN{i}") return token, i tokens_tried = set() while len(tokens_tried) < 10: token,i = get_random_token() if token in tokens_tried: continue tokens_tried.add(token) print(f"Trying with token: TOKEN{i}") try: from huggingface_hub import InferenceClient client = InferenceClient(self.model_id, api_key=token, headers={"X-use-cache": "false"}) messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}] result = None while result is None: outputs = client.chat_completion(messages, max_tokens=100) result = outputs['choices'][0]['message']['content'] if result is None: time.sleep(1) # Optional: Add a small delay before retrying return result except Exception as e: print(f"Error with token: {token}, trying another token...") continue raise Exception("All tokens failed.") elif 'gemini' in self.model_id.lower(): genai.configure(api_key=os.getenv('GOOGLE_AI_API_KEY')) generation_config = { # "temperature": 0, # "top_p": 0.95, # cannot change # "top_k": 0, "max_output_tokens": 100, # "response_mime_type": "application/json", } safety_settings = [ { "category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE" }, { "category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE" }, { "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE" }, { "category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE" }, ] model = genai.GenerativeModel( model_name="gemini-1.5-pro-latest" if "gemini-1.5-pro" in self.model_id.lower() else self.model_id.lower().split('google/')[-1], generation_config=generation_config, system_instruction=system_prompt, safety_settings=safety_settings) convo = model.start_chat(history=[]) convo.send_message(user_prompt) # print(convo.last) result = convo.last.text print(result) return result # exit() # Using local model class EvaluationModel: """A class to evaluate generated responses. Attributes: model (CrossEncoder): The evaluation model. scores (list): List of scores for the responses. humanlike_score (float): Human-likeness score """ def __init__(self): """ Initializes the EvaluationModel. """ self.scores = [] self.humanlike_score = None def code_results_llm_cleaned(self, responses_df): '''code results from LLM's response''' output = [] '''database for Exp4''' item4 = pd.read_csv(envs.ITEM_4_DATA) wordpair2code = {} for j in range(len(item4['Coding'])): wordpair2code[item4['Pair'][j]] = item4['Coding'][j] '''verb for Exp5''' item5 = pd.read_csv(envs.ITEM_5_DATA) # item corresponding to verb, same item id corresponding to verb pair item2verb2 = {} item2verb1 = {} Stimuli1, Stimuli2 = {}, {} for j in range(len(item5['Item'])): item2verb1[item5['Item'][j]] = item5['Verb1'][j] item2verb2[item5['Item'][j]] = item5['Verb2'][j] Stimuli1[item5['ID'][j]] = item5['Stimuli-1'][j] Stimuli2[item5['ID'][j]] = item5['Stimuli-2'][j] male_keyword = ["he", "his", "himself"] female_keyword = ["she", "her", "herself"] #print(len(responses_df["Experiment"])) for i in range(len(responses_df["Experiment"])): print(i, "/", len(responses_df["Experiment"])) # vote_1_1, vote_1_2, vote_1_3 = 0, 0, 0 # print() if pd.isna(responses_df["Response"][i]): output.append("Other") continue rs = responses_df["Response"][i].strip().lower() rs = rs.replace('"', '').replace(" ", " ").replace('.', '') lines = rs.split("\n") filtered_lines = [line for line in lines if line and not (line.endswith(":") or line.endswith(":"))] filtered_lines = [r.split(':', 1)[-1].strip() if ':' in r else r for r in filtered_lines] rs = "\n".join(filtered_lines) rs = rs.strip() '''Exp1''' if responses_df["Experiment"][i] == "E1": #print("E1", rs) if rs == "round": # vote_1_1 += 1 output.append("Round") elif rs == "spiky": output.append("Spiky") else: output.append("Other") '''Exp2''' elif responses_df["Experiment"][i] == "E2": # rs = responses_df["Response"][i].strip() rs = rs.split(' ') #print("E2", rs) male, female = 0, 0 for word in rs: if word in female_keyword and male == 0: female = 1 output.append("Female") break if word in male_keyword and female == 0: male = 1 output.append("Male") break if male == 0 and female == 0: output.append("Other") '''Exp3''' elif responses_df["Experiment"][i] == "E3": # rs = responses_df["Response"][i].strip() #print("E3", rs) pair = responses_df["Factor 2"][i] word1, word2 = pair.replace(".", "").split('_') if responses_df["Item"][i] == 12: output.append("Other") else: words = rs.split() # split the response into words if any(word == word1 for word in words) and any(word == word2 for word in words): output.append("Other") else: if any(word.lower() == word1.lower() for word in words): if len(word1) > len(word2): output.append("Long") else: output.append("Short") elif any(word.lower() == word2.lower() for word in words): if len(word1) > len(word2): output.append("Short") else: output.append("Long") else: if len(words) > 1: # joint the words using " " word = " ".join(words) if word.lower() == word1.lower(): if len(word1) > len(word2): output.append("Long") else: output.append("Short") elif word.lower() == word2.lower(): if len(word1) > len(word2): output.append("Short") else: output.append("Long") else: output.append("Other") else: output.append("Other") '''Exp4''' elif responses_df["Experiment"][i] == "E4": lines = rs.split("\n") filtered_lines = [] if len(lines) > 1: for r in lines[1:]: if ':' in r: filtered_lines.append(r.split(':', 1)[-1].strip()) else: filtered_lines.append(r) filtered_lines.insert(0, lines[0]) else: filtered_lines = lines # print(filtered_lines) filtered_lines = [r.split('-', 1)[-1].strip() if '-' in r else r for r in filtered_lines] rs = "\n".join(filtered_lines) filtered_lines = [r.split(':', 1)[-1].strip() if ':' in r else r for r in rs.split(";")] filtered_lines = [r.split('-', 1)[-1].strip() if '-' in r else r for r in filtered_lines] rs = ";".join(filtered_lines).strip() try: meaning_word = rs.split(";")[4].replace(" ", '') except IndexError: try: meaning_word = rs.split("\n")[4].replace(" ", '') except IndexError: output.append("Other") continue except Exception as e: print(f"Unexpected error: {e}") output.append("Other") continue target = responses_df["Factor 2"][i].strip().lower() pair = target + "_" + meaning_word #print("E4:", pair) if pair in wordpair2code.keys(): output.append(wordpair2code[pair]) else: output.append("Other") '''Exp5''' elif responses_df["Experiment"][i] == "E5" or responses_df["Experiment"][i] == "E51": # sentence = responses_df["Response"][i].strip() item_id = responses_df["Item"][i] question_id = responses_df["Question_ID"][i] sti1, sti2 = "", "" if responses_df["Experiment"][i] == "E51": sti1 = Stimuli1[question_id[0:-1]].lower().replace("...", "") #sti2 = Stimuli2[question_id[0:-1]].lower().replace("...", "") verb = item2verb1[item_id].lower() sentence = sti1 + " " + rs.replace(sti1, "") #print("E5", verb, sentence) if responses_df["Experiment"][i] == "E5": #sti1 = Stimuli1[question_id].lower().replace("...", "") # print(sti1) sti2 = Stimuli2[question_id].lower().replace("...", "") verb = item2verb2[item_id].lower() sentence = sti2 + " " + rs.replace(sti2, "") #print("E5", verb, sentence) doc = nlp1(sentence.replace(" ", " ")) # print(doc) # print() verb_token = None for token in doc: # print(token.lemma_) if token.lemma_ == verb: verb_token = token break # exit() pobj, dative = None, None # print(verb_token.children) # exit() if verb_token is not None: for child in verb_token.children: # print(child) if (child.dep_ == 'dative' and child.pos_ == "ADP") or ( child.text == "to" and child.dep_ == 'prep' and child.pos_ == "ADP"): pobj = child.text if child.dep_ == 'dative': dative = child.text # print("E5", pobj, dative) # exit() if pobj: output.append("PO") elif dative: output.append("DO") else: # print("Other", sentence, pobj, dative) # exit() output.append("Other") '''Exp6''' elif responses_df["Experiment"][i] == "E6": sentence = responses_df["Stimuli 1"][i].strip().lower() #print("E6", sentence) doc = nlp1(sentence) subject = "None" obj = "None" pobj_list = [] # To collect all prepositional objects for token in doc: if token.dep_ == "nsubj": subject = token.text elif token.dep_ == "dobj": obj = token.text elif token.dep_ == "pobj": pobj_list.append(token.text) # Collect prepositional objects rs_list = rs.lower().split() if subject in rs_list and (obj in rs_list or any(pobj == r for pobj in pobj_list for r in rs_list)): output.append("Other") elif subject in rs_list: output.append("VP") elif obj in rs_list or any(pobj == r for pobj in pobj_list for r in rs_list): output.append("NP") else: output.append("Other") '''Exp7''' elif responses_df["Experiment"][i] == "E7": # rs = responses_df["Response"][i].strip().lower() rs = rs.replace(".", "").replace(",", "").lower() #print("E7", rs) if "yes" in rs and "no" in rs: output.append("Other") elif "no" in rs: output.append("0") elif "yes" in rs: output.append("1") else: output.append("Other") '''Exp8''' elif responses_df["Experiment"][i] == "E8": # rs = responses_df["Response"][i].strip() #print("E8", rs) if "something is wrong with the question" in rs: output.append("1") else: output.append("0") '''Exp9''' elif responses_df["Experiment"][i] == "E9": male, female = 0, 0 # rs = responses_df["Response"][i].strip() if "because" in rs: rs = rs.replace("because because", "because").split("because")[1] else: rs = rs condition = responses_df["Factor 2"][i].strip() rs = rs.split(" ") for w in rs: if w in male_keyword and female != 1: male = 1 break if w in female_keyword and male != 1: female = 1 break #print("E9", "condition", condition, "male", male, "female", female) if male == 0 and female == 0: output.append('Other') else: if male == 1 and female == 0: if condition == "MF": output.append("Subject") elif condition == "FM": output.append("Object") else: output.append("Other") elif female == 1 and male == 0: if condition == "MF": output.append("Object") elif condition == "FM": output.append("Subject") else: output.append("Other") '''Exp10''' elif responses_df["Experiment"][i] == "E10": # rs = responses_df["Response"][i].strip() rs = rs.replace(".", "") if rs == "yes": output.append("1") else: output.append("0") else: #print("can;t find the Exp:", responses_df["Experiment"][i]) output.append("NA") # print(output) # exit() '''LLM''' print(len(output)) import re def clean_text(text): if isinstance(text, str): return re.sub(r'[^\x00-\x7F]+', '', text) return text responses_df["Experiment"] = responses_df["Experiment"].apply(clean_text) responses_df["Question_ID"] = responses_df["Question_ID"].apply(clean_text) responses_df["Item"] = responses_df["Item"].apply(clean_text) responses_df["Response"] = responses_df["Response"].apply(clean_text) output = [str(item) for item in output] self.data = pd.DataFrame(list( zip(responses_df["Experiment"], responses_df["Question_ID"], responses_df["Item"], responses_df["Response"],output)), columns=["Experiment", "Question_ID", "Item", "Response","Coding"]) return self.data def code_results_llm(self, responses_df): '''code results from LLM's response''' output = [] '''database for Exp4''' item4 = pd.read_csv(envs.ITEM_4_DATA) wordpair2code = {} for j in range(len(item4['Coding'])): wordpair2code[item4['Pair'][j]] = item4['Coding'][j] '''verb for Exp5''' item5 = pd.read_csv(envs.ITEM_5_DATA) # item corresponding to verb, same item id corresponding to verb pair item2verb2 = {} item2verb1 = {} Stimuli1, Stimuli2 = {}, {} for j in range(len(item5['Item'])): item2verb1[item5['Item'][j]] = item5['Verb1'][j] item2verb2[item5['Item'][j]] = item5['Verb2'][j] Stimuli1[item5['ID'][j]] = item5['Stimuli-1'][j] Stimuli2[item5['ID'][j]] = item5['Stimuli-2'][j] male_keyword = ["he", "his", "himself"] female_keyword = ["she", "her", "herself"] #print(len(responses_df["Experiment"])) for i in range(len(responses_df["Experiment"])): print(i, "/", len(responses_df["Experiment"])) # vote_1_1, vote_1_2, vote_1_3 = 0, 0, 0 # print() if pd.isna(responses_df["Response"][i]): output.append("Other") continue rs = responses_df["Response"][i].strip().lower() print(rs) rs = rs.replace('"', '').replace(" ", " ").replace('.', '') #lines = rs.split("\n") #filtered_lines = [line for line in lines if line and not (line.endswith(":") or line.endswith(":"))] # filtered_lines = [r.split(':', 1)[-1].strip() if ':' in r else r for # r in filtered_lines] # rs = "\n".join(filtered_lines) # rs = rs.strip() '''Exp1''' if responses_df["Experiment"][i] == "E1": rs_lower = rs.lower() if "round" in rs_lower and "spiky" in rs_lower: output.append("Other") elif "round" in rs_lower: output.append("Round") elif "spiky" in rs_lower: output.append("Spiky") else: output.append("Other") '''Exp2''' elif responses_df["Experiment"][i] == "E2": # rs = responses_df["Response"][i].strip() rs = rs.split(' ') #print("E2", rs) male, female = 0, 0 for word in rs: if word in female_keyword and male == 0: female = 1 output.append("Female") break if word in male_keyword and female == 0: male = 1 output.append("Male") break if male == 0 and female == 0: output.append("Other") '''Exp3''' elif responses_df["Experiment"][i] == "E3": # rs = responses_df["Response"][i].strip() #print("E3", rs) pair = responses_df["Factor 2"][i] word1, word2 = pair.replace(".", "").split('_') if responses_df["Item"][i] == 12: output.append("Other") else: words = rs.split() # split the response into words if any(word == word1 for word in words) and any(word == word2 for word in words): output.append("Other") else: if any(word.lower() == word1.lower() for word in words): if len(word1) > len(word2): output.append("Long") else: output.append("Short") elif any(word.lower() == word2.lower() for word in words): if len(word1) > len(word2): output.append("Short") else: output.append("Long") else: if len(words) > 1: # joint the words using " " word = " ".join(words) if word.lower() == word1.lower(): if len(word1) > len(word2): output.append("Long") else: output.append("Short") elif word.lower() == word2.lower(): if len(word1) > len(word2): output.append("Short") else: output.append("Long") else: output.append("Other") else: output.append("Other") '''Exp4''' elif responses_df["Experiment"][i] == "E4": lines = rs.split("\n") filtered_lines = [] if len(lines) > 1: for r in lines[1:]: if ':' in r: filtered_lines.append(r.split(':', 1)[-1].strip()) else: filtered_lines.append(r) filtered_lines.insert(0, lines[0]) else: filtered_lines = lines # print(filtered_lines) #filtered_lines = [r.split('-', 1)[-1].strip() if '-' in r else r for r in filtered_lines] #rs = "\n".join(filtered_lines) #filtered_lines = [r.split(':', 1)[-1].strip() if ':' in r else r for r in rs.split(";")] #filtered_lines = [r.split('-', 1)[-1].strip() if '-' in r else r for r in filtered_lines] rs = ";".join(filtered_lines).strip() try: meaning_word = rs.split(";")[4].replace(" ", '') except IndexError: try: meaning_word = rs.split("\n")[4].replace(" ", '') except IndexError: output.append("Other") continue except Exception as e: print(f"Unexpected error: {e}") output.append("Other") continue target = responses_df["Factor 2"][i].strip().lower() pair = target + "_" + meaning_word #print("E4:", pair) if pair in wordpair2code.keys(): output.append(wordpair2code[pair]) else: output.append("Other") '''Exp5''' elif responses_df["Experiment"][i] == "E5" or responses_df["Experiment"][i] == "E51": # sentence = responses_df["Response"][i].strip() item_id = responses_df["Item"][i] question_id = responses_df["Question_ID"][i] if responses_df["Experiment"][i] == "E51": sti1 = Stimuli1[question_id[0:-1]].lower().replace("...", "") #sti2 = Stimuli2[question_id[0:-1]].lower().replace("...", "") verb = item2verb1[item_id].lower() sentence = sti1 + " " + rs.replace(sti1, "") #print("E5", verb, sentence) if responses_df["Experiment"][i] == "E5": #sti1 = Stimuli1[question_id].lower().replace("...", "") # print(sti1) sti2 = Stimuli2[question_id].lower().replace("...", "") verb = item2verb2[item_id].lower() sentence = sti2 + " " + rs.replace(sti2, "") #print("E5", verb, sentence) doc = nlp1(sentence.replace(" ", " ")) # print(doc) # print() verb_token = None for token in doc: # print(token.lemma_) if token.lemma_ == verb: verb_token = token break # exit() pobj, dative = None, None # print(verb_token.children) # exit() if verb_token is not None: for child in verb_token.children: # print(child) if (child.dep_ == 'dative' and child.pos_ == "ADP") or ( child.text == "to" and child.dep_ == 'prep' and child.pos_ == "ADP"): pobj = child.text if child.dep_ == 'dative': dative = child.text # print("E5", pobj, dative) # exit() if pobj: output.append("PO") elif dative: output.append("DO") else: # print("Other", sentence, pobj, dative) # exit() output.append("Other") '''Exp6''' elif responses_df["Experiment"][i] == "E6": sentence = responses_df["Stimuli 1"][i].strip().lower() #print("E6", sentence) doc = nlp1(sentence) subject = "None" obj = "None" pobj_list = [] # To collect all prepositional objects for token in doc: if token.dep_ == "nsubj": subject = token.text elif token.dep_ == "dobj": obj = token.text elif token.dep_ == "pobj": pobj_list.append(token.text) # Collect prepositional objects rs_list = rs.lower().split() if subject in rs_list and (obj in rs_list or any(pobj == r for pobj in pobj_list for r in rs_list)): output.append("Other") elif subject in rs_list: output.append("VP") elif obj in rs_list or any(pobj == r for pobj in pobj_list for r in rs_list): output.append("NP") else: output.append("Other") '''Exp7''' elif responses_df["Experiment"][i] == "E7": # rs = responses_df["Response"][i].strip().lower() rs = rs.replace(".", "").replace(",", "").lower() #print("E7", rs) if "yes" in rs and "no" in rs: output.append("Other") elif "no" in rs: output.append("0") elif "yes" in rs: output.append("1") else: output.append("Other") '''Exp8''' elif responses_df["Experiment"][i] == "E8": # rs = responses_df["Response"][i].strip() #print("E8", rs) if "something is wrong with the question" in rs: output.append("1") else: output.append("0") '''Exp9''' elif responses_df["Experiment"][i] == "E9": male, female = 0, 0 # rs = responses_df["Response"][i].strip() if "because" in rs: rs = rs.replace("because because", "because").split("because")[1] else: rs = rs condition = responses_df["Factor 2"][i].strip() rs = rs.split(" ") for w in rs: if w in male_keyword and female != 1: male = 1 break if w in female_keyword and male != 1: female = 1 break #print("E9", "condition", condition, "male", male, "female", female) if male == 0 and female == 0: output.append('Other') else: if male == 1 and female == 0: if condition == "MF": output.append("Subject") elif condition == "FM": output.append("Object") else: output.append("Other") elif female == 1 and male == 0: if condition == "MF": output.append("Object") elif condition == "FM": output.append("Subject") else: output.append("Other") '''Exp10''' elif responses_df["Experiment"][i] == "E10": # rs = responses_df["Response"][i].strip() rs = rs.replace(".", "") if rs == "yes": output.append("1") else: output.append("0") else: #print("can;t find the Exp:", responses_df["Experiment"][i]) output.append("NA") # print(output) # exit() '''LLM''' print(len(output)) import re def clean_text(text): if isinstance(text, str): return re.sub(r'[^\x00-\x7F]+', '', text) return text responses_df["Experiment"] = responses_df["Experiment"].apply(clean_text) responses_df["Question_ID"] = responses_df["Question_ID"].apply(clean_text) responses_df["Item"] = responses_df["Item"].apply(clean_text) responses_df["Response"] = responses_df["Response"].apply(clean_text) output = [str(item) for item in output] self.data = pd.DataFrame(list( zip(responses_df["Experiment"], responses_df["Question_ID"], responses_df["Item"], responses_df["Response"],output)), columns=["Experiment", "Question_ID", "Item", "Response","Coding"]) return self.data def calculate_js_divergence(self, file_path_1, file_path_2): """ Calculate the Jensen-Shannon divergence for response distributions between two datasets. - Extracts E5 and E51 pairs, creates new data based on comparison, removes the original E5 and E51, and then calculates the JS divergence between the datasets. Parameters: file_path_1 (str): Path to the first dataset file (CSV format). file_path_2 (str): Path to the second dataset file (CSV format). Returns: float: The average JS divergence across all common Question_IDs. """ # Load the datasets human_df = pd.read_csv(file_path_1, encoding='ISO-8859-1') llm_df = pd.read_csv(file_path_2) def create_e5_entries(df): new_entries = [] for i in range(len(df) - 1): if 'E51' in df.iloc[i]['Experiment']: priming_id = df.iloc[i][0]-1 priming_row_id = df[df.iloc[:, 0] == priming_id].index[0] new_question_id = df.iloc[priming_row_id]['Question_ID'] label = 1 if df.iloc[i]['Coding'] == df.iloc[priming_row_id]['Coding'] else 0 new_entries.append({ 'Question_ID': new_question_id, 'Response': f'{df.iloc[i]["Coding"]}-{df.iloc[priming_row_id]["Coding"]}', 'Coding': label }) return pd.DataFrame(new_entries) # Create new E5 entries for both datasets human_e5 = create_e5_entries(human_df) llm_e5 = create_e5_entries(llm_df) # Remove E5 and E51 entries from both datasets human_df = human_df[~human_df['Question_ID'].str.contains('E5')] llm_df = llm_df[~llm_df['Question_ID'].str.contains('E5')] # Append new E5 entries to the cleaned dataframes human_df = pd.concat([human_df, human_e5], ignore_index=True) llm_df = pd.concat([llm_df, llm_e5], ignore_index=True) ### Calculate Average JS Divergence ### # Extract the relevant columns for JS divergence calculation human_responses = human_df[['Question_ID', 'Coding']] llm_responses = llm_df[['Question_ID', 'Coding']] # Remove 'Other' responses #human_responses = human_responses[human_responses['Coding'] != 'Other'] #llm_responses = llm_responses[llm_responses['Coding'] != 'Other'] # Get unique Question_IDs present in both datasets common_question_ids = set(human_responses['Question_ID']).intersection(set(llm_responses['Question_ID'])) # Initialize a dictionary to store JS divergence for each experiment js_divergence = {} # Calculate JS divergence for each common Question_ID for q_id in common_question_ids: # Get response distributions for the current Question_ID in both datasets human_dist = human_responses[human_responses['Question_ID'] == q_id]['Coding'].value_counts( normalize=True) llm_dist = llm_responses[llm_responses['Question_ID'] == q_id]['Coding'].value_counts(normalize=True) # Reindex the distributions to have the same index, filling missing values with 0 all_responses = set(human_dist.index).union(set(llm_dist.index)) human_dist = human_dist.reindex(all_responses, fill_value=0) llm_dist = llm_dist.reindex(all_responses, fill_value=0) # Calculate JS divergence js_div = jensenshannon(human_dist, llm_dist, base=2) experiment_id = q_id.split('_')[1] if experiment_id not in js_divergence: js_divergence[experiment_id] = [] js_divergence[experiment_id].append(js_div) # Calculate the average JS divergence per experiment and the confidence interval results = {} experiment_averages = [] for exp, divs in js_divergence.items(): avg_js_divergence = 1 - np.nanmean(divs) ci_lower, ci_upper = bootstrap((divs,), np.nanmean, confidence_level=0.95, n_resamples=1000).confidence_interval results[exp] = { 'average_js_divergence': avg_js_divergence, 'confidence_interval': (1 - ci_upper, 1 - ci_lower) # Adjust for 1 - score } experiment_averages.append(avg_js_divergence) # Calculate the weighted average JS divergence across all experiments weighted_js_divergence = np.mean(experiment_averages) # Simple average over experiments # Calculate the confidence interval for the overall JS divergence using bootstrap overall_ci_lower, overall_ci_upper = bootstrap( (experiment_averages,), np.nanmean, confidence_level=0.95, n_resamples=1000 ).confidence_interval # Combine all results into one dictionary all_results = { 'overall': { 'average_js_divergence': weighted_js_divergence, 'confidence_interval': (overall_ci_lower, overall_ci_upper) }, 'per_experiment': results } return all_results def evaluate_humanlike(self, responses_df: pd.DataFrame, human_data_path: object, result_save_path: str) -> object: ''' evaluate humanlike score 1. code the result 2. comput the similaritirs between human and model process model responses''' '''coding human data''' # self.huamn_df = pd.read_csv(human_data_path) # self.data = self.code_results(self.huamn_df) #save_path = human_data_path.replace('.csv','_coding.csv') #human_save_path = "./src/datasets/coding_human.xlsx" # if save_path is not None: # print(f'Save human coding results to {save_path}') # fpath = Path(save_path) # fpath.parent.mkdir(parents=True, exist_ok=True) # self.data.to_csv(fpath) '''coding llm data''' save_path = result_save_path.replace('.csv','_coding.csv') self.llm_df = self.code_results_llm(responses_df) if save_path is not None: print(f'Save LLM coding results to {save_path}') fpath = Path(save_path) fpath.parent.mkdir(parents=True, exist_ok=True) self.llm_df.to_csv(fpath) envs.API.upload_file( path_or_fileobj=save_path,#./generation_results/meta-llama/Llama-2-13b-chat-hf_coding.csv path_in_repo=f"{save_path.replace('generation_results/','')}",# repo_id=envs.RESULTS_REPO, repo_type="dataset", ) # file_path_1 = '/Users/simon/Downloads/coding_human.xlsx' # file_path_2 = '/Users/simon/Downloads/Meta-Llama-3.1-70B-Instruct_coding.csv' avg_js_divergence = self.calculate_js_divergence(human_data_path, save_path) return avg_js_divergence