Spaces:
Sleeping
Sleeping
import pandas as pd | |
from transformers import AutoTokenizer, AutoModel | |
from sentence_transformers import SentenceTransformer, util | |
import numpy as np | |
import torch | |
### Functions needed for Classification
def addCategories(df, df_all):
    """Append the rows of ``df`` whose topic is not yet in ``df_all``.

    Both arguments are DataFrames that are read through ``to_dict``; the
    'topic', 'description' and 'experts' values of each new row are appended
    to the matching columns of ``df_all``'s data. Neither input frame is
    modified: the merge happens on the dict produced by ``to_dict("list")``.

    Returns a tuple ``(gr.update(choices=<merged topics>), <merged DataFrame>)``.
    NOTE(review): ``gr`` is not imported in the visible part of the file —
    presumably the Gradio module; confirm it is in scope.
    """
    merged = df_all.to_dict("list")
    for row in df.to_dict("records"):
        # Topics already present are left alone (first definition wins).
        if row['topic'] in merged['topic']:
            continue
        for column in ('topic', 'description', 'experts'):
            merged[column].append(row[column])
    print(f"AFTER ADDINGS Those are the categories_all : {merged}")
    return gr.update(choices=merged['topic']), pd.DataFrame.from_dict(merged)
# Values of the "topic" column of df_cate, in row order, as a plain list.
# NOTE(review): df_cate is not defined in the visible part of the file —
# presumably a module-level DataFrame of default categories; confirm.
df_cat_filter = df_cate.to_dict(orient="list")["topic"]
def filterByTopics(filters, categories):
    """Build a ``gr.DataFrame`` holding only the categories whose topic is selected.

    Args:
        filters: container of selected topic names; ``None`` is treated as an
            empty selection instead of raising ``TypeError``.
        categories: DataFrame with at least a 'topic' column.

    Returns:
        A ``gr.DataFrame`` component (label 'categories', interactive) whose
        value is the filtered table. The source column order is preserved even
        when no row matches, so the table keeps its header.

    NOTE(review): ``gr`` is not imported in the visible part of the file —
    presumably the Gradio module; confirm it is in scope.
    """
    selected = [] if filters is None else filters
    matching = [
        row for row in categories.to_dict("records") if row['topic'] in selected
    ]
    # Without explicit columns an empty match list yields a frame with no
    # columns at all, which drops the table header.
    filtered = pd.DataFrame(matching, columns=categories.columns)
    return gr.DataFrame(label='categories', value=filtered, interactive=True)
### End | |
def reset_cate(df_categories):
    """Toggle the category table between the defaults and a single blank row.

    If ``df_categories`` equals the module-level ``df_cate``, a one-row frame
    of empty strings is returned; otherwise a copy of ``df_cate`` is returned.

    NOTE(review): ``df_cate`` is defined outside the visible part of the file.
    """
    if df_categories.equals(df_cate):
        # The third column is named 'experts' (not 'expert') to match the key
        # that addCategories and match_categories read from category tables.
        return pd.DataFrame([['', '', '']], columns=['topic', 'description', 'experts'])
    return df_cate.copy()
def load_data(file_obj):
    """Read an Excel workbook into a DataFrame.

    ``file_obj`` is forwarded to ``pd.read_excel`` unchanged, so it may be
    anything that function accepts (the original comment assumes a file-like
    object uploaded via Gradio).
    """
    frame = pd.read_excel(file_obj)
    return frame
def initialize_models(model_name="all-mpnet-base-v2"):
    """Instantiate and return the SentenceTransformer used for embeddings.

    Args:
        model_name: identifier passed to ``SentenceTransformer``. Defaults to
            "all-mpnet-base-v2", the model this function previously hard-coded,
            so existing zero-argument calls behave as before.

    Returns:
        The constructed ``SentenceTransformer`` instance.
    """
    return SentenceTransformer(model_name)
def generate_embeddings(df, model, Column):
    """Encode one text column of ``df`` and store the result in 'Embeddings'.

    For every row whose ``Column`` value is a string, the text is encoded with
    ``model.encode(..., convert_to_tensor=True)``. When the frame also has a
    'Title' column holding a string for that row, the title and a newline are
    prepended to the text first. Rows whose ``Column`` value is not a string
    get ``np.nan``.

    Args:
        df: input DataFrame; it is modified in place (an 'Embeddings' column is
            added or overwritten) and also returned.
        model: object exposing ``encode(text, convert_to_tensor=True)``.
        Column: name of the column to encode.

    Returns:
        The same ``df`` object with the 'Embeddings' column set.
    """
    # Decided once for the whole frame. When the encoded column *is* 'Title',
    # prepending it would only duplicate the same text, so it is skipped.
    use_title = 'Title' in df.columns and Column != 'Title'

    embeddings_list = []
    for index, row in df.iterrows():
        text = row[Column]
        if not isinstance(text, str):
            embeddings_list.append(np.nan)
            continue
        print(index)  # progress trace, kept from the original implementation
        if use_title and isinstance(row["Title"], str):
            text = row["Title"] + "\n" + text
        embeddings_list.append(model.encode(text, convert_to_tensor=True))

    df['Embeddings'] = embeddings_list
    return df
def process_categories(categories, model):
    """Return a category table with an added 'Embeddings' column.

    Args:
        categories: anything ``pd.DataFrame`` accepts (a DataFrame, a list of
            records, ...) that provides a 'description' column.
        model: object exposing ``encode(text, convert_to_tensor=True)``.

    Returns:
        A new DataFrame holding the category data plus one embedding per row,
        obtained by encoding that row's 'description'. The ``categories``
        argument itself is never modified.
    """
    # Copy explicitly: wrapping an existing DataFrame may share its data, and
    # the caller's table must not gain an 'Embeddings' column.
    category_table = pd.DataFrame(categories).copy()
    encoded = [
        model.encode(text, convert_to_tensor=True)
        for text in category_table['description']
    ]
    # An object-dtype Series keeps each embedding as one cell and also works
    # for a table with zero rows.
    category_table['Embeddings'] = pd.Series(
        encoded, index=category_table.index, dtype=object
    )
    return category_table
def match_categories(df, category_df, treshold=0.45):
    """Attach to each row of ``df`` the categories whose similarity exceeds a threshold.

    Args:
        df: DataFrame with an 'Embeddings' column. Cells that are
            ``torch.Tensor`` are compared against every category; any other
            cell is treated as "no embedding".
        category_df: DataFrame with 'description', 'experts', 'topic' and
            'Embeddings' columns (the latter holding tensors).
        treshold: a category matches when its cosine similarity is strictly
            greater than this value. (The spelling is part of the interface.)

    Returns:
        The same ``df`` object, modified in place with four columns:
        'Description', 'Expert', 'Topic' (lists of the matching categories'
        values, or ``np.nan`` when the row has no embedding) and 'Score'
        (list of float similarities, or the string 'pas interessant' when the
        row has no embedding).
    """
    # Read the category columns once as plain lists. The match indices are
    # positions in the similarity vector, so they must be used positionally
    # rather than as index labels of category_df.
    descriptions = category_df['description'].tolist()
    experts = category_df['experts'].tolist()
    topics = category_df['topic'].tolist()

    # Loop-invariant: stack the category embeddings a single time.
    # torch.stack rejects an empty list, so an empty table means "no matches".
    category_vectors = list(category_df['Embeddings'])
    category_matrix = torch.stack(category_vectors, dim=0) if category_vectors else None

    categories_list, experts_list, topic_list, scores_list = [], [], [], []
    for ebd_content in df['Embeddings']:
        if not isinstance(ebd_content, torch.Tensor):
            categories_list.append(np.nan)
            experts_list.append(np.nan)
            topic_list.append(np.nan)
            scores_list.append('pas interessant')
            continue

        if category_matrix is None:
            cos_scores, hits = None, []
        else:
            cos_scores = util.cos_sim(ebd_content, category_matrix)[0]
            hits = torch.nonzero(cos_scores > treshold).flatten().tolist()

        categories_list.append([descriptions[i] for i in hits])
        experts_list.append([experts[i] for i in hits])
        topic_list.append([topics[i] for i in hits])
        scores_list.append([float(cos_scores[i]) for i in hits])

    df["Description"] = categories_list
    df["Expert"] = experts_list
    df["Topic"] = topic_list
    df["Score"] = scores_list
    return df
def flatten_nested_lists(nested_list):
    """Return the leaves of arbitrarily nested lists as one flat list.

    Only ``list`` instances are descended into; every other item (including
    tuples and strings) is kept as a single element, in encounter order.
    """
    def _leaves(items):
        for element in items:
            if isinstance(element, list):
                # Descend into the sub-list and emit its leaves in place.
                yield from _leaves(element)
            else:
                yield element

    return list(_leaves(nested_list))
def save_data(df, filename):
    """Write the classified table to an Excel file next to ``filename``.

    The list-valued cells of 'Expert', 'Description', 'Topic' and 'Score' are
    turned into comma-separated strings; non-list cells are left as they are.
    'Expert' cells are flattened with ``flatten_nested_lists`` first. These
    four columns are rewritten on ``df`` itself, so the caller's frame shows
    the joined strings afterwards. The 'Embeddings' column is dropped only
    from the exported copy.

    Args:
        df: DataFrame with 'Expert', 'Description', 'Topic', 'Score' and
            'Embeddings' columns.
        filename: path of the source workbook; the output path is the same
            path with '_classified' inserted before the extension.

    Returns:
        The path of the written file.
    """
    import os  # local import: only this function needs it

    def _join(values):
        # str() each item so non-string entries cannot break the join.
        return ', '.join(map(str, values))

    df['Expert'] = df['Expert'].apply(
        lambda x: _join(flatten_nested_lists(x)) if isinstance(x, list) else x
    )
    for column in ('Description', 'Topic', 'Score'):
        df[column] = df[column].apply(
            lambda x: _join(x) if isinstance(x, list) else x
        )

    exportable = df.drop(columns=['Embeddings'])

    # Split off only the final extension; replacing every "." would also
    # rewrite dots in directory names or elsewhere in the file name.
    root, extension = os.path.splitext(filename)
    new_filename = f"{root}_classified{extension}"
    exportable.to_excel(new_filename, index=False)
    return new_filename
def classification(column, file_path, categories, treshold):
    """Run the full pipeline: load, embed, match against categories, export.

    Steps, in order: ``load_data(file_path)``, ``initialize_models()`` (a model
    is created on every call), ``generate_embeddings`` on ``column``,
    ``process_categories`` on ``categories``, ``match_categories`` with
    ``treshold``, then ``save_data``.

    Returns:
        A tuple ``(output_path, frame)``. ``save_data`` runs before the tuple
        is built, and ``frame`` is the same object that was passed to it.
    """
    encoder = initialize_models()

    frame = generate_embeddings(load_data(file_path), encoder, column)
    category_table = process_categories(categories, encoder)
    frame = match_categories(frame, category_table, treshold=treshold)

    output_path = save_data(frame, file_path)
    return output_path, frame