import sklearn
import sklearn.metrics  # needed by get_nmis for normalized/adjusted mutual info scores
from tenacity import retry, stop_after_attempt, wait_random_exponential
from tqdm import tqdm
import sys
# import openai
import time
# import pandas as pd
import random
import csv
import os
import pickle
import json
from langchain.chat_models import AzureChatOpenAI
from langchain.schema import HumanMessage, SystemMessage
from langchain.callbacks import get_openai_callback
from langchain.llms import OpenAI
import tiktoken
from sklearn.feature_extraction.text import CountVectorizer
from collections import Counter
import math
import io
import contextlib
# os.system('pip install pandas reportlab')
# os.system('pip install openai==0.27.2')
# os.system('pip install tenacity')
import requests
from bs4 import BeautifulSoup
import ast
import nltk
nltk.download('punkt')
nltk.download('stopwords')
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords
import string
import numpy as np
import evaluate
def tree_edit_distance(tree1, tree2):
    def cost(node1, node2):
        """Cost to transform node1 into node2."""
        if node1 == node2:
            return 0
        return 1

    def tree_size(tree):
        """Calculate the size of the tree."""
        if not isinstance(tree, list) or not tree:
            return 1
        return 1 + sum(tree_size(child) for child in tree)

    def ted(tree1, tree2):
        """Compute the tree edit distance between two trees."""
        if not isinstance(tree1, list) and not isinstance(tree2, list):
            return cost(tree1, tree2)
        if not isinstance(tree1, list):
            return tree_size(tree2)
        if not isinstance(tree2, list):
            return tree_size(tree1)
        if not tree1 and not tree2:
            return 0
        if not tree1:
            return sum(tree_size(child) for child in tree2)
        if not tree2:
            return sum(tree_size(child) for child in tree1)
        dp = [[0] * (len(tree2) + 1) for _ in range(len(tree1) + 1)]
        for i in range(1, len(tree1) + 1):
            dp[i][0] = dp[i-1][0] + tree_size(tree1[i-1])
        for j in range(1, len(tree2) + 1):
            dp[0][j] = dp[0][j-1] + tree_size(tree2[j-1])
        for i in range(1, len(tree1) + 1):
            for j in range(1, len(tree2) + 1):
                dp[i][j] = min(dp[i-1][j] + tree_size(tree1[i-1]),
                               dp[i][j-1] + tree_size(tree2[j-1]),
                               dp[i-1][j-1] + ted(tree1[i-1], tree2[j-1]))
        return dp[len(tree1)][len(tree2)]

    return ted(tree1, tree2)
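# Minimal usage sketch for tree_edit_distance on nested lists (the example trees
# below are illustrative, not taken from the original pipeline):
# tree_a = ["a", ["b", "c"]]
# tree_b = ["a", ["b", "d"]]
# tree_edit_distance(tree_a, tree_b)  # expected 1, since only one leaf differs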
def preprocess_code_str(code_str):
    prefix = "citation_bracket = {}\nsentence = {}\n"
    code_str = code_str.replace(" ", "")
    code_lines = code_str.split("\n")
    code_line_list = []
    for line in code_lines:
        if "citation_bracket[" in line.split("=")[0]:
            code_line_list.append(line)
        if "sentence[" in line.split("=")[0]:
            code_line_list.append(line)
    return prefix + "\n".join(code_line_list) + "\nprint(sentence)"
def run_code(code_str):
    # Redirect stdout to capture print statements
    f = io.StringIO()
    with contextlib.redirect_stdout(f):
        exec(preprocess_code_str(code_str))
    # Get the standard output
    output = f.getvalue()
    return ast.literal_eval(output)
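# Hedged sketch of the code-string format preprocess_code_str/run_code assume:
# only assignments into `citation_bracket[...]` or `sentence[...]` are kept, all
# whitespace (including inside string literals) is stripped, and the `sentence`
# dict is parsed back from the captured print output. Hypothetical input:
# code = "sentence[0]=['a','b']\ncitation_bracket[0]=['[1]']"
# run_code(code)  # expected {0: ['a', 'b']}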
def replace_with_char(input_list, char='a'):
    def replace_in_nested_list(nested_list):
        if isinstance(nested_list, list):
            return [replace_in_nested_list(item) for item in nested_list]
        else:
            return char
    return replace_in_nested_list(input_list)
def top_k_keys(input_dict, k):
    # Sort the dictionary items by value in descending order and extract the keys
    sorted_keys = sorted(input_dict, key=input_dict.get, reverse=True)
    # Return the top-k keys
    return sorted_keys[:k]

def keys_with_least_k_values(d, k):
    if k <= 0:
        return []
    # Get the sorted list of (key, value) tuples based on the values
    sorted_items = sorted(d.items(), key=lambda item: item[1])
    # Extract the keys of the first k items
    least_k_keys = [item[0] for item in sorted_items[:k]]
    return least_k_keys
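# Illustrative (hypothetical) calls for the two ranking helpers above:
# top_k_keys({"a": 3, "b": 1, "c": 2}, 2)                # expected ['a', 'c']
# keys_with_least_k_values({"a": 3, "b": 1, "c": 2}, 2)   # expected ['b', 'c']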
def edit_distance_code_str(code1, code2, just_tree_structure=False):
    # code1 = preprocess_code_str(code1)
    # code2 = preprocess_code_str(code2)
    sentence1 = run_code(code1)
    list_1 = [sentence1[key] for key in sentence1]
    sentence2 = run_code(code2)
    list_2 = [sentence2[key] for key in sentence2]
    if just_tree_structure:
        list_1 = replace_with_char(list_1)
        list_2 = replace_with_char(list_2)
    return tree_edit_distance(list_1, list_2)
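# Hypothetical comparison of two generated code strings with edit_distance_code_str
# (illustrative inputs only):
# code_a = "sentence[0]=['a',['b']]\ncitation_bracket[0]=[]"
# code_b = "sentence[0]=['a',['c']]\ncitation_bracket[0]=[]"
# edit_distance_code_str(code_a, code_b)                            # expected 1
# edit_distance_code_str(code_a, code_b, just_tree_structure=True)  # expected 0 (same shape)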
class eval_metrics:
    def __init__(self):
        pass
        # if is_bertscore:
        #     pass

    def get_rouge_l(self, pred, refs):
        rouge = evaluate.load('rouge')
        results = rouge.compute(predictions=pred, references=refs)
        return results['rougeL']

    def get_bleu(self, pred, refs):
        bleu = evaluate.load('bleu')
        tmp_refs = [[item] for item in refs]
        results = bleu.compute(predictions=pred, references=tmp_refs)
        return results['bleu']

    def get_meteor(self, pred, refs):
        meteor = evaluate.load('meteor')
        results = meteor.compute(predictions=pred, references=refs)
        return results['meteor']

    def get_bertscore(self, pred, refs):
        bertscore = evaluate.load('bertscore')
        results = bertscore.compute(predictions=pred, references=refs, lang="en")
        return np.mean(results['f1'])

    def get_bleurt(self, pred, refs):
        bleurt = evaluate.load('bleurt', module_type="metric")
        # tmp_refs = [[item] for item in refs]
        results = bleurt.compute(predictions=pred, references=refs)
        return np.mean(results['scores'])
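# Hedged usage sketch for eval_metrics: each call loads the corresponding
# Hugging Face `evaluate` module on first use, so these illustrative calls
# need network access and model downloads for BERTScore/BLEURT.
# em = eval_metrics()
# em.get_rouge_l(["the cat sat on the mat"], ["a cat sat on the mat"])
# em.get_bleu(["the cat sat on the mat"], ["a cat sat on the mat"])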
class BM25:
    def __init__(self, documents, k1=1.5, b=0.75):
        self.documents = documents
        self.k1 = k1
        self.b = b
        self.vectorizer = CountVectorizer().fit(documents)
        self.doc_term_matrix = self.vectorizer.transform(documents)
        self.doc_lengths = np.array(self.doc_term_matrix.sum(axis=1)).flatten()
        self.avg_doc_length = np.mean(self.doc_lengths)
        self.df = np.diff(self.doc_term_matrix.tocsc().indptr)
        self.idf = self.compute_idf()

    def compute_idf(self):
        N = len(self.documents)
        idf = np.log((N - self.df + 0.5) / (self.df + 0.5) + 1)
        return idf

    def compute_bm25(self, query):
        query_vec = self.vectorizer.transform([query])
        scores = []
        for doc_idx in range(self.doc_term_matrix.shape[0]):
            score = 0
            for term_idx in query_vec.indices:
                if term_idx in self.doc_term_matrix[doc_idx].indices:
                    tf = self.doc_term_matrix[doc_idx, term_idx]
                    idf = self.idf[term_idx]
                    numerator = tf * (self.k1 + 1)
                    denominator = tf + self.k1 * (1 - self.b + self.b * (self.doc_lengths[doc_idx] / self.avg_doc_length))
                    score += idf * numerator / denominator
            scores.append(score)
        return scores

    def get_top_k(self, query, k=5):
        scores = self.compute_bm25(query)
        top_k_indices = np.argsort(scores)[::-1][:k]
        top_k_docs = [self.documents[i] for i in top_k_indices]
        return top_k_docs, top_k_indices
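# Illustrative BM25 retrieval over a toy corpus (hypothetical documents and query):
# bm25 = BM25(["the cat sat on the mat", "dogs bark loudly", "cats and mats"])
# top_docs, top_idx = bm25.get_top_k("cat on the mat", k=2)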
def get_nmis(true_dict, pred_dict):
    labels_true = []
    labels_pred = []
    # print(true_dict.keys())
    # print(pred_dict.keys())
    # print()
    for key in true_dict:
        labels_true.append(true_dict[key])
        if key not in pred_dict:
            labels_pred.append(-1)
        else:
            labels_pred.append(pred_dict[key])
    if len(labels_pred) == 0:
        max_label_pred = 0
    else:
        max_label_pred = np.max(labels_pred) + 1
    # Give every key missing from pred_dict its own singleton cluster label
    for label_idx, item in enumerate(labels_pred):
        if item == -1:
            labels_pred[label_idx] = max_label_pred
            max_label_pred += 1
    return (sklearn.metrics.normalized_mutual_info_score(labels_true=labels_true, labels_pred=labels_pred),
            sklearn.metrics.adjusted_mutual_info_score(labels_true=labels_true, labels_pred=labels_pred))
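# Hedged sketch for get_nmis with hypothetical cluster-assignment dicts mapping
# item ids to cluster labels; keys absent from the prediction become singletons:
# nmi, ami = get_nmis({"p1": 0, "p2": 0, "p3": 1}, {"p1": 0, "p3": 1})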
def calculate_precision_recall_f1(predicted, ground_truth):
    # print(predicted)
    # print()
    # print(ground_truth)
    # print("-------------")
    # Convert lists to sets to handle duplicates and perform set operations
    predicted_set = set(predicted)
    ground_truth_set = set(ground_truth)
    # Calculate true positives (intersection of predicted and ground truth)
    true_positives = predicted_set.intersection(ground_truth_set)
    # Calculate precision
    precision = len(true_positives) / len(predicted_set) if predicted_set else 0
    # Calculate recall
    recall = len(true_positives) / len(ground_truth_set) if ground_truth_set else 0
    # Calculate F1-score
    if precision + recall == 0:
        f1_score = 0
    else:
        f1_score = 2 * (precision * recall) / (precision + recall)
    return precision, recall, f1_score
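# Illustrative set-based matching for calculate_precision_recall_f1 on hypothetical keys:
# calculate_precision_recall_f1(["ref1", "ref2"], ["ref2", "ref3"])  # expected (0.5, 0.5, 0.5)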
def get_introduction(arxiv_id):
    # Step 1: Construct the URL
    url = f"https://ar5iv.org/html/{arxiv_id}"
    # Step 2: Fetch the HTML content of the page
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception(f"Failed to fetch the page: Status code {response.status_code}")
    # Step 3: Parse the HTML content
    soup = BeautifulSoup(response.content, 'html.parser')
    # Step 4: Locate the introduction section.
    # We assume the introduction is the first section after the abstract;
    # this heuristic might need adjustment depending on the exact structure of the paper.
    introduction_text = ""
    found_introduction = False
    # Look for h2/h3 tags, which usually denote sections
    for tag in soup.find_all(['h2', 'h3']):
        # print(tag.text.lower())
        if 'introduction' in tag.text.lower():
            # print(tag.text)
            introduction_text += tag.text.strip() + "\n\n"
            next_node = tag.find_next_sibling()
            while next_node and next_node.name not in ['h2', 'h3']:
                introduction_text += next_node.get_text().strip() + "\n\n"
                next_node = next_node.find_next_sibling()
            break
    return introduction_text
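# Illustrative call to get_introduction with a well-known arXiv id (requires network
# access; ar5iv page structure varies, so the heuristic can return an empty string):
# intro = get_introduction("1706.03762")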
def write_to_file(filepath, content):
    if '.txt' in filepath:
        with open(filepath, 'w') as fw:
            fw.write(content)
    elif '.json' in filepath:
        with open(filepath, 'w') as fw:
            json.dump(content, fw)
    elif '.pickle' in filepath or '.pkl' in filepath:
        with open(filepath, 'wb') as fw:
            pickle.dump(content, fw)
    elif '.npy' in filepath:
        np.save(filepath, content)

def read_from_file(filepath):
    if '.txt' in filepath:
        with open(filepath, 'r') as fr:
            return fr.read()
    elif '.json' in filepath:
        with open(filepath, 'r') as fr:
            return json.load(fr)
    elif '.pickle' in filepath or '.pkl' in filepath:
        with open(filepath, 'rb') as fr:
            return pickle.load(fr)
    elif '.npy' in filepath:
        return np.load(filepath)
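# Round-trip sketch for write_to_file/read_from_file with a hypothetical results
# file; the file extension selects the serialization format:
# write_to_file("scores.json", {"rougeL": 0.42})
# read_from_file("scores.json")  # expected {"rougeL": 0.42}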
def remove_stopwords_and_punctuation(text):
    # Get the list of stopwords
    stop_words = set(stopwords.words('english'))
    # Remove punctuation from text (keeping '_' and '@')
    text = text.translate(str.maketrans('', '', string.punctuation.replace('_', '').replace('@', '')))
    # Split the text into words
    words = text.split()
    # Remove stopwords
    filtered_words = [word for word in words if word.lower() not in stop_words]
    # Join the words back into a single string
    filtered_text = ' '.join(filtered_words)
    return filtered_text
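# Illustrative cleanup of a hypothetical sentence with remove_stopwords_and_punctuation:
# remove_stopwords_and_punctuation("This is a test, with some punctuation!")  # expected 'test punctuation'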
class AzureModels:
    def __init__(self, model_name):
        if model_name == "gpt4":
            DEPLOYMENT_NAME = "gentech-gpt4-research"
            BASE_URL = "https://gentechworkbench-stage.openai.azure.com/"
            API_KEY = "f074d7f2bfdf486783db5f4605b263a6"
            self.model = AzureChatOpenAI(
                openai_api_base=BASE_URL,
                openai_api_version="2023-03-15-preview",
                deployment_name=DEPLOYMENT_NAME,
                openai_api_key=API_KEY,
                openai_api_type="azure",
            )
            self.enc = tiktoken.encoding_for_model("gpt-4-0314")
        elif model_name == "gpt4o":
            DEPLOYMENT_NAME = "gpt-4o"
            BASE_URL = "https://docexpresearch.openai.azure.com/"
            API_KEY = "2d6dc256edd94e65a2fa4b5658651377"
            self.model = AzureChatOpenAI(
                openai_api_base=BASE_URL,
                openai_api_version="2023-07-01-preview",
                deployment_name=DEPLOYMENT_NAME,
                openai_api_key=API_KEY,
                openai_api_type="azure",
            )
            self.enc = tiktoken.encoding_for_model("gpt-4o")

    def get_completion(self, question, max_tokens, stop=None):
        gpt_answer = self.model(
            [
                HumanMessage(
                    content=question
                )
            ], max_tokens=max_tokens, stop=stop
        )
        gpt_answer_content = gpt_answer.content  # Access the content attribute
        # Convert the answer content to a string if it is not one already
        if isinstance(gpt_answer_content, str):
            gpt_answer_string = gpt_answer_content  # Already a string, use it directly
        else:
            gpt_answer_string = str(gpt_answer_content)  # Convert to string otherwise
        return gpt_answer_string

    def get_num_inp_tokens(self, inp):
        tokens = self.enc.encode(inp)
        return len(tokens)
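# Hedged usage sketch for AzureModels: assumes the Azure deployments and API keys
# configured above are valid and reachable (hypothetical prompt shown):
# azure_model = AzureModels("gpt4o")
# azure_model.get_num_inp_tokens("How many tokens is this prompt?")
# azure_model.get_completion("Summarize the contribution of attention mechanisms.", max_tokens=128)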