"""Utilities for evaluating citation-structure generation.

Contents:
- tree edit distance over nested-list "citation trees",
- helpers that sanitize/execute model-generated code and diff its output,
- NLG metric wrappers (ROUGE-L, BLEU, METEOR, BERTScore, BLEURT),
- a small Okapi BM25 retriever,
- clustering metrics (NMI/AMI) and set-based precision/recall/F1,
- an ar5iv introduction scraper, file I/O helpers,
- an Azure OpenAI chat wrapper with retry and token counting.
"""

import sklearn
import sklearn.metrics  # FIX: `import sklearn` alone does not expose the `metrics` submodule
from tenacity import retry, stop_after_attempt, wait_random_exponential
from tqdm import tqdm
import sys
import time
import random
import csv
import os
import pickle
import json
from langchain.chat_models import AzureChatOpenAI
from langchain.schema import HumanMessage, SystemMessage
from langchain.callbacks import get_openai_callback
from langchain.llms import OpenAI
import tiktoken
from sklearn.feature_extraction.text import CountVectorizer
from collections import Counter
import math
import io
import contextlib
import requests
from bs4 import BeautifulSoup
import ast
import nltk

# One-time corpora needed by the nltk tokenizers/stopword list below.
nltk.download('punkt')
nltk.download('stopwords')

from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords
import string
import numpy as np
import evaluate


def tree_edit_distance(tree1, tree2):
    """Compute the edit distance between two trees encoded as nested lists.

    Leaves (non-list nodes) are compared with a 0/1 substitution cost;
    inserting or deleting a subtree costs its node count.

    Note: recomputes subtree distances recursively (exponential in depth) —
    fine for the small citation trees this is used on.
    """

    def cost(node1, node2):
        """Cost to transform leaf node1 into leaf node2."""
        if node1 == node2:
            return 0
        return 1

    def tree_size(tree):
        """Number of nodes in `tree`; a leaf (or empty list) counts as 1."""
        if not isinstance(tree, list) or not tree:
            return 1
        return 1 + sum(tree_size(child) for child in tree)

    def ted(tree1, tree2):
        """Tree edit distance between two trees / forests of children."""
        # Both leaves: plain substitution cost.
        if not isinstance(tree1, list) and not isinstance(tree2, list):
            return cost(tree1, tree2)
        # One side is a leaf: replace it with the whole other subtree.
        if not isinstance(tree1, list):
            return tree_size(tree2)
        if not isinstance(tree2, list):
            return tree_size(tree1)
        if not tree1 and not tree2:
            return 0
        # One forest empty: insert/delete every child of the other.
        if not tree1:
            return sum(tree_size(child) for child in tree2)
        if not tree2:
            return sum(tree_size(child) for child in tree1)

        # Sequence alignment of the two child forests (edit-distance DP).
        dp = [[0] * (len(tree2) + 1) for _ in range(len(tree1) + 1)]
        for i in range(1, len(tree1) + 1):
            dp[i][0] = dp[i - 1][0] + tree_size(tree1[i - 1])
        for j in range(1, len(tree2) + 1):
            dp[0][j] = dp[0][j - 1] + tree_size(tree2[j - 1])
        for i in range(1, len(tree1) + 1):
            for j in range(1, len(tree2) + 1):
                dp[i][j] = min(
                    dp[i - 1][j] + tree_size(tree1[i - 1]),        # delete child
                    dp[i][j - 1] + tree_size(tree2[j - 1]),        # insert child
                    dp[i - 1][j - 1] + ted(tree1[i - 1], tree2[j - 1]),  # match
                )
        return dp[len(tree1)][len(tree2)]

    return ted(tree1, tree2)


def preprocess_code_str(code_str):
    """Reduce generated code to `citation_bracket[...]` / `sentence[...]`
    assignments and wrap it so executing the result prints `sentence`.

    All spaces are stripped first so the left-hand side can be matched
    regardless of the model's formatting.
    """
    prefix = "citation_bracket = {}\nsentence = {}\n"
    code_str = code_str.replace(" ", "")
    code_line_list = []
    for line in code_str.split("\n"):
        lhs = line.split("=")[0]  # text left of the first '=' (whole line if none)
        if "citation_bracket[" in lhs:
            code_line_list.append(line)
        if "sentence[" in lhs:
            code_line_list.append(line)
    return prefix + "\n".join(code_line_list) + "\nprint(sentence)"


def run_code(code_str):
    """Execute the sanitized code string and return the printed `sentence`
    dict parsed back into Python objects.

    SECURITY NOTE: this `exec`s model-generated text. `preprocess_code_str`
    restricts it to plain dict-assignment lines, and execution happens in an
    isolated empty namespace, but do not feed untrusted input here.
    """
    f = io.StringIO()
    with contextlib.redirect_stdout(f):
        # Empty globals: the preprocessed code is self-contained (the prefix
        # defines both dicts it assigns to), so nothing leaks in or out.
        exec(preprocess_code_str(code_str), {})
    output = f.getvalue()
    return ast.literal_eval(output)


def replace_with_char(input_list, char='a'):
    """Return a copy of the nested list with every leaf replaced by `char`,
    preserving only the tree shape."""

    def replace_in_nested_list(nested_list):
        if isinstance(nested_list, list):
            return [replace_in_nested_list(item) for item in nested_list]
        return char

    return replace_in_nested_list(input_list)


def top_k_keys(input_dict, k):
    """Keys of the k largest values, sorted by value descending."""
    sorted_keys = sorted(input_dict, key=input_dict.get, reverse=True)
    return sorted_keys[:k]


def keys_with_least_k_values(d, k):
    """Keys of the k smallest values, sorted by value ascending.

    Returns [] when k <= 0.
    """
    if k <= 0:
        return []
    sorted_items = sorted(d.items(), key=lambda item: item[1])
    return [item[0] for item in sorted_items[:k]]


def edit_distance_code_str(code1, code2, just_tree_structure=False):
    """Tree edit distance between the `sentence` structures produced by two
    generated code strings.

    With `just_tree_structure=True`, leaf values are replaced by a constant
    character so only the tree shapes are compared.
    """
    sentence1 = run_code(code1)
    list_1 = [sentence1[key] for key in sentence1]
    sentence2 = run_code(code2)
    list_2 = [sentence2[key] for key in sentence2]
    if just_tree_structure:
        list_1 = replace_with_char(list_1)
        list_2 = replace_with_char(list_2)
    return tree_edit_distance(list_1, list_2)


class eval_metrics:
    """Thin wrappers around Hugging Face `evaluate` text-generation metrics.

    Each method takes parallel lists of predictions and references and
    returns a single scalar score.
    """

    def __init__(self):
        pass

    def get_rouge_l(self, pred, refs):
        """Corpus ROUGE-L F-measure."""
        rouge = evaluate.load('rouge')
        results = rouge.compute(predictions=pred, references=refs)
        return results['rougeL']

    def get_bleu(self, pred, refs):
        """Corpus BLEU; each reference is wrapped as a singleton list since
        BLEU expects multiple references per prediction."""
        bleu = evaluate.load('bleu')
        tmp_refs = [[item] for item in refs]
        results = bleu.compute(predictions=pred, references=tmp_refs)
        return results['bleu']

    def get_meteor(self, pred, refs):
        """Corpus METEOR score."""
        meteor = evaluate.load('meteor')
        results = meteor.compute(predictions=pred, references=refs)
        return results['meteor']

    def get_bertscore(self, pred, refs):
        """Mean per-example BERTScore F1 (English models)."""
        bertscore = evaluate.load('bertscore')
        results = bertscore.compute(predictions=pred, references=refs, lang="en")
        return np.mean(results['f1'])

    def get_bleurt(self, pred, refs):
        """Mean per-example BLEURT score."""
        bleurt = evaluate.load('bleurt', module_type="metric")
        results = bleurt.compute(predictions=pred, references=refs)
        return np.mean(results['scores'])


class BM25:
    """Okapi BM25 ranking over a fixed list of raw-text documents."""

    def __init__(self, documents, k1=1.5, b=0.75):
        """Fit the vocabulary and precompute per-document statistics.

        k1: term-frequency saturation parameter.
        b:  document-length normalization parameter.
        """
        self.documents = documents
        self.k1 = k1
        self.b = b
        self.vectorizer = CountVectorizer().fit(documents)
        self.doc_term_matrix = self.vectorizer.transform(documents)
        self.doc_lengths = np.array(self.doc_term_matrix.sum(axis=1)).flatten()
        self.avg_doc_length = np.mean(self.doc_lengths)
        # Document frequency per term, read off the CSC column pointers.
        self.df = np.diff(self.doc_term_matrix.tocsc().indptr)
        self.idf = self.compute_idf()

    def compute_idf(self):
        """Smoothed BM25 inverse document frequency for every term."""
        N = len(self.documents)
        idf = np.log((N - self.df + 0.5) / (self.df + 0.5) + 1)
        return idf

    def compute_bm25(self, query):
        """BM25 score of `query` against every document.

        Returns a list of floats, one per document, in corpus order.
        """
        query_vec = self.vectorizer.transform([query])
        scores = []
        for doc_idx in range(self.doc_term_matrix.shape[0]):
            score = 0
            for term_idx in query_vec.indices:
                # Only terms present in this document contribute.
                if term_idx in self.doc_term_matrix[doc_idx].indices:
                    tf = self.doc_term_matrix[doc_idx, term_idx]
                    idf = self.idf[term_idx]
                    numerator = tf * (self.k1 + 1)
                    denominator = tf + self.k1 * (
                        1 - self.b + self.b * (self.doc_lengths[doc_idx] / self.avg_doc_length)
                    )
                    score += idf * numerator / denominator
            scores.append(score)
        return scores

    def get_top_k(self, query, k=5):
        """Top-k documents for `query`, highest score first.

        Returns (documents, indices).
        """
        scores = self.compute_bm25(query)
        top_k_indices = np.argsort(scores)[::-1][:k]
        top_k_docs = [self.documents[i] for i in top_k_indices]
        return top_k_docs, top_k_indices


def get_nmis(true_dict, pred_dict):
    """Normalized and adjusted mutual information between two clusterings.

    Both arguments map item id -> cluster label. Items present in
    `true_dict` but missing from `pred_dict` are each given a fresh
    singleton label so they count as their own predicted cluster.

    Returns (nmi, ami).
    """
    labels_true = []
    labels_pred = []
    for key in true_dict:
        labels_true.append(true_dict[key])
        labels_pred.append(pred_dict.get(key, -1))  # -1 marks "missing"

    # Replace each -1 placeholder with a distinct unused label.
    if len(labels_pred) == 0:
        max_label_pred = 0
    else:
        max_label_pred = np.max(labels_pred) + 1
    for label_idx, item in enumerate(labels_pred):
        if item == -1:
            labels_pred[label_idx] = max_label_pred
            max_label_pred += 1

    return (
        sklearn.metrics.normalized_mutual_info_score(labels_true=labels_true, labels_pred=labels_pred),
        sklearn.metrics.adjusted_mutual_info_score(labels_true=labels_true, labels_pred=labels_pred),
    )


def calculate_precision_recall_f1(predicted, ground_truth):
    """Set-based precision, recall, and F1 between two item lists.

    Duplicates are collapsed via set conversion; empty inputs yield 0 for
    the corresponding score instead of dividing by zero.
    """
    predicted_set = set(predicted)
    ground_truth_set = set(ground_truth)
    true_positives = predicted_set.intersection(ground_truth_set)

    precision = len(true_positives) / len(predicted_set) if predicted_set else 0
    recall = len(true_positives) / len(ground_truth_set) if ground_truth_set else 0
    if precision + recall == 0:
        f1_score = 0
    else:
        f1_score = 2 * (precision * recall) / (precision + recall)
    return precision, recall, f1_score


def get_introduction(arxiv_id):
    """Scrape the Introduction section of an arXiv paper from ar5iv.

    Heuristic: find the first h2/h3 heading containing "introduction" and
    collect sibling nodes until the next h2/h3. Returns "" when no such
    heading exists. Raises on a non-200 HTTP response.
    """
    url = f"https://ar5iv.org/html/{arxiv_id}"
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception(f"Failed to fetch the page: Status code {response.status_code}")

    soup = BeautifulSoup(response.content, 'html.parser')
    introduction_text = ""
    # h2/h3 tags usually denote (sub)sections in ar5iv HTML.
    for tag in soup.find_all(['h2', 'h3']):
        if 'introduction' in tag.text.lower():
            introduction_text += tag.text.strip() + "\n\n"
            next_node = tag.find_next_sibling()
            while next_node and next_node.name not in ['h2', 'h3']:
                introduction_text += next_node.get_text().strip() + "\n\n"
                next_node = next_node.find_next_sibling()
            break
    return introduction_text


def write_to_file(filepath, content):
    """Serialize `content` to `filepath`, dispatching on the extension
    found in the path (.txt text, .json, .pickle/.pkl, .npy)."""
    if '.txt' in filepath:
        with open(filepath, 'w') as fw:
            fw.write(content)
    elif '.json' in filepath:
        with open(filepath, 'w') as fw:
            json.dump(content, fw)
    elif '.pickle' in filepath or '.pkl' in filepath:
        with open(filepath, 'wb') as fw:
            pickle.dump(content, fw)
    elif '.npy' in filepath:
        np.save(filepath, content)


def read_from_file(filepath):
    """Load and return the contents of `filepath`, dispatching on the
    extension found in the path (.txt text, .json, .pickle/.pkl, .npy).

    SECURITY NOTE: pickle deserialization executes arbitrary code — only
    read pickle files this project wrote itself.
    """
    if '.txt' in filepath:
        with open(filepath, 'r') as fr:
            return fr.read()
    elif '.json' in filepath:
        with open(filepath, 'r') as fr:
            return json.load(fr)
    elif '.pickle' in filepath or '.pkl' in filepath:
        with open(filepath, 'rb') as fr:
            return pickle.load(fr)
    elif '.npy' in filepath:
        return np.load(filepath)


def remove_stopwords_and_punctuation(text):
    """Strip punctuation (keeping '_' and '@') and English stopwords.

    Stopword matching is case-insensitive; original casing of kept words
    is preserved. Returns the filtered text as a single space-joined string.
    """
    stop_words = set(stopwords.words('english'))
    # Drop all punctuation except '_' and '@' (kept for identifiers/handles).
    text = text.translate(
        str.maketrans('', '', string.punctuation.replace('_', '').replace('@', ''))
    )
    words = text.split()
    filtered_words = [word for word in words if word.lower() not in stop_words]
    return ' '.join(filtered_words)


class AzureModels:
    """Azure OpenAI chat wrapper with retry/backoff and token counting.

    Supported model names: "gpt4", "gpt4o".
    """

    def __init__(self, model_name):
        # SECURITY: credentials belong in the environment, not in source
        # control. Env vars take precedence; the inline literals are kept
        # only as backward-compatible fallbacks and should be rotated.
        if model_name == "gpt4":
            DEPLOYMENT_NAME = "gentech-gpt4-research"
            BASE_URL = "https://gentechworkbench-stage.openai.azure.com/"
            API_KEY = os.environ.get(
                "AZURE_OPENAI_GPT4_KEY", "f074d7f2bfdf486783db5f4605b263a6"
            )
            self.model = AzureChatOpenAI(
                openai_api_base=BASE_URL,
                openai_api_version="2023-03-15-preview",
                deployment_name=DEPLOYMENT_NAME,
                openai_api_key=API_KEY,
                openai_api_type="azure",
            )
            self.enc = tiktoken.encoding_for_model("gpt-4-0314")
        elif model_name == "gpt4o":
            DEPLOYMENT_NAME = "gpt-4o"
            BASE_URL = "https://docexpresearch.openai.azure.com/"
            API_KEY = os.environ.get(
                "AZURE_OPENAI_GPT4O_KEY", "2d6dc256edd94e65a2fa4b5658651377"
            )
            self.model = AzureChatOpenAI(
                openai_api_base=BASE_URL,
                openai_api_version="2023-07-01-preview",
                deployment_name=DEPLOYMENT_NAME,
                openai_api_key=API_KEY,
                openai_api_type="azure",
            )
            self.enc = tiktoken.encoding_for_model("gpt-4o")

    @retry(wait=wait_random_exponential(min=30, max=80), stop=stop_after_attempt(5))
    def get_completion(self, question, max_tokens, stop=None):
        """Single-turn chat completion for `question`; returns the answer
        text. Retries up to 5 times with random exponential backoff."""
        gpt_answer = self.model(
            [HumanMessage(content=question)],
            max_tokens=max_tokens,
            stop=stop,
        )
        gpt_answer_content = gpt_answer.content
        # The content attribute is usually a str already; coerce otherwise.
        if isinstance(gpt_answer_content, str):
            return gpt_answer_content
        return str(gpt_answer_content)

    def get_num_inp_tokens(self, inp):
        """Number of tokens `inp` occupies under this model's encoding."""
        tokens = self.enc.encode(inp)
        return len(tokens)