import streamlit as st
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW
from transformers import AutoTokenizer, AutoModel, get_cosine_schedule_with_warmup
import random
import math
import re
from bs4 import BeautifulSoup
import pytorch_lightning as pl

device = "cuda:0" if torch.cuda.is_available() else "cpu"

train_path = "train.csv"
test_path = "test.csv"
test_labels_path = "test_labels.csv"

# Rebuild a labeled test set: the comment text from test.csv (column 0 is the id)
# joined with the label columns from test_labels.csv.
# Note: the Jigsaw test_labels.csv marks unscored rows with -1; those rows end up
# counted as unhealthy by healthy_filter below.
test_df = pd.read_csv(test_path)
test_labels_df = pd.read_csv(test_labels_path)
test_df = pd.concat([test_df.iloc[:, 1], test_labels_df.iloc[:, 1:]], axis=1)
test_df.to_csv("test-dataset.csv", index=False)
test_dataset_path = "test-dataset.csv"

# Let's make a new column labeled "healthy": 1 when none of the toxicity flags are set.
def healthy_filter(df):
    if (
        (df["toxic"] == 0)
        and (df["severe_toxic"] == 0)
        and (df["obscene"] == 0)
        and (df["threat"] == 0)
        and (df["insult"] == 0)
        and (df["identity_hate"] == 0)
    ):
        return 1
    else:
        return 0

attributes = ['toxic', 'severe_toxic', 'obscene', 'threat', 'insult', 'identity_hate', 'healthy']

class Comments_Dataset(Dataset):
    def __init__(self, data_path, tokenizer, attributes, max_token_len=128, sample=5000):
        self.data_path = data_path
        self.tokenizer = tokenizer
        self.attributes = attributes
        self.max_token_len = max_token_len
        self.sample = sample
        self._prepare_data()

    def _prepare_data(self):
        data = pd.read_csv(self.data_path)
        data["healthy"] = data.apply(healthy_filter, axis=1)
        data["unhealthy"] = np.where(data["healthy"] == 1, 0, 1)
        if self.sample is not None:
            # Downsample the (much larger) healthy class to balance the training data.
            unhealthy = data.loc[data["healthy"] == 0]
            healthy = data.loc[data["healthy"] == 1]
            self.data = pd.concat([unhealthy, healthy.sample(self.sample, random_state=42)])
        else:
            self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        item = self.data.iloc[index]
        comment = str(item.comment_text)
        attributes = torch.FloatTensor(item[self.attributes])
        tokens = self.tokenizer.encode_plus(
            comment,
            add_special_tokens=True,
            return_tensors='pt',
            truncation=True,
            padding='max_length',
            max_length=self.max_token_len,
            return_attention_mask=True,
        )
        return {
            'input_ids': tokens.input_ids.flatten(),
            'attention_mask': tokens.attention_mask.flatten(),
            'labels': attributes,
        }

class Comments_Data_Module(pl.LightningDataModule):
    def __init__(self, train_path, val_path, attributes, batch_size: int = 16,
                 max_token_length: int = 128, model_name='roberta-base'):
        super().__init__()
        self.train_path = train_path
        self.val_path = val_path
        self.attributes = attributes
        self.batch_size = batch_size
        self.max_token_length = max_token_length
        self.model_name = model_name
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)

    def setup(self, stage=None):
        if stage in (None, "fit"):
            self.train_dataset = Comments_Dataset(self.train_path, attributes=self.attributes, tokenizer=self.tokenizer)
            self.val_dataset = Comments_Dataset(self.val_path, attributes=self.attributes, tokenizer=self.tokenizer, sample=None)
        if stage == 'predict':
            self.val_dataset = Comments_Dataset(self.val_path, attributes=self.attributes, tokenizer=self.tokenizer, sample=None)

    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size, num_workers=4, shuffle=True)

    def val_dataloader(self):
        return DataLoader(self.val_dataset, batch_size=self.batch_size, num_workers=4, shuffle=False)

    def predict_dataloader(self):
        return DataLoader(self.val_dataset, batch_size=self.batch_size, num_workers=4, shuffle=False)
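# A quick sanity check (not part of the original pipeline): pull one batch from
# the data module and confirm the tensor shapes before any training or inference.
# The function name is ours; everything it touches is defined above. It is never
# called by the app, so defining it here is harmless.
def inspect_one_batch():
    dm = Comments_Data_Module(train_path, test_dataset_path, attributes=attributes)
    dm.setup()
    batch = next(iter(dm.train_dataloader()))
    # Expected with the defaults: input_ids and attention_mask of shape
    # (16, 128), labels of shape (16, len(attributes)).
    print(batch["input_ids"].shape, batch["attention_mask"].shape, batch["labels"].shape)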
comments_data_module = Comments_Data_Module(train_path, test_dataset_path, attributes=attributes)
comments_data_module.setup()
comments_data_module.train_dataloader()

class Comment_Classifier(pl.LightningModule):
    # the config dict holds the Hugging Face model name and training hyperparameters
    def __init__(self, config: dict):
        super().__init__()
        self.config = config
        self.pretrained_model = AutoModel.from_pretrained(config['model_name'], return_dict=True)
        self.hidden = torch.nn.Linear(self.pretrained_model.config.hidden_size, self.pretrained_model.config.hidden_size)
        self.classifier = torch.nn.Linear(self.pretrained_model.config.hidden_size, self.config['n_labels'])
        torch.nn.init.xavier_uniform_(self.classifier.weight)
        # CrossEntropyLoss treats the multi-hot label vector as a soft target
        # distribution; the softmax at inference time matches this choice.
        self.loss_func = nn.CrossEntropyLoss()
        self.dropout = nn.Dropout()

    def forward(self, input_ids, attention_mask, labels=None):
        # roberta layer
        output = self.pretrained_model(input_ids=input_ids, attention_mask=attention_mask)
        # mean-pool the token embeddings into a single sentence vector
        pooled_output = torch.mean(output.last_hidden_state, 1)
        # final logits / classification layers
        pooled_output = self.dropout(pooled_output)
        pooled_output = self.hidden(pooled_output)
        pooled_output = F.relu(pooled_output)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)
        # calculate loss
        loss = 0
        if labels is not None:
            loss = self.loss_func(logits.view(-1, self.config['n_labels']), labels.view(-1, self.config['n_labels']))
        return loss, logits

    def training_step(self, batch, batch_index):
        loss, outputs = self(**batch)
        self.log("train_loss", loss, prog_bar=True, logger=True)
        return {"loss": loss, "predictions": outputs, "labels": batch["labels"]}

    def validation_step(self, batch, batch_index):
        loss, outputs = self(**batch)
        self.log("val_loss", loss, prog_bar=True, logger=True)
        return {"val_loss": loss, "predictions": outputs, "labels": batch["labels"]}

    def predict_step(self, batch, batch_index):
        loss, outputs = self(**batch)
        return outputs

    def configure_optimizers(self):
        optimizer = AdamW(self.parameters(), lr=self.config['lr'], weight_decay=self.config['weight_decay'])
        # train_size is the number of training samples, so steps per epoch is
        # train_size / batch_size; the schedule's units are optimizer steps.
        steps_per_epoch = math.ceil(self.config['train_size'] / self.config['batch_size'])
        total_steps = steps_per_epoch * self.config['n_epochs']
        warmup_steps = math.floor(total_steps * self.config['warmup'])
        scheduler = get_cosine_schedule_with_warmup(optimizer, warmup_steps, total_steps)
        # step the schedule every batch rather than every epoch
        return [optimizer], [{"scheduler": scheduler, "interval": "step"}]

config = {
    'model_name': 'distilroberta-base',
    'n_labels': len(attributes),
    'batch_size': 128,
    'lr': 1.5e-6,
    'warmup': 0.2,
    'train_size': len(comments_data_module.train_dataset),  # number of training samples
    'weight_decay': 0.001,
    'n_epochs': 100,
}

model_name = 'distilroberta-base'
tokenizer = AutoTokenizer.from_pretrained(model_name)

model = Comment_Classifier(config=config)
model.load_state_dict(torch.load("model_state_dict.pt", map_location=device))
model.to(device)
model.eval()

def prepare_tokenized_review(raw_review):
    # Remove HTML tags with BeautifulSoup
    review_text = BeautifulSoup(raw_review, "html.parser").get_text()
    # Remove non-letters (keeping ! and ?) using a regular expression
    review_text = re.sub("[^a-zA-Z!?]", " ", review_text)
    # Convert words to lower case and split them
    words = review_text.lower().split()
    return " ".join(words)

def get_encodings(text):
    MAX_LEN = 256
    encodings = tokenizer.encode_plus(
        text,
        None,
        add_special_tokens=True,
        max_length=MAX_LEN,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',
    )
    return encodings

def run_inference(encoding):
    with torch.no_grad():
        input_ids = encoding['input_ids'].to(device, dtype=torch.long)
        attention_mask = encoding['attention_mask'].to(device, dtype=torch.long)
        output = model(input_ids, attention_mask)
        # output is (loss, logits); softmax the single row of logits into
        # per-attribute scores
        final_output = torch.softmax(output[1][0], dim=0).cpu()
        return final_output.numpy().tolist()
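# How the checkpoint could have been produced (a hedged sketch, not run by the
# app): the script only loads "model_state_dict.pt", and a standard PyTorch
# Lightning fit like the one below is one plausible way to create it. The
# function name and Trainer arguments are assumptions (recent Lightning,
# single device), not something the original code specifies.
def train_and_save(checkpoint_path="model_state_dict.pt"):
    fresh_model = Comment_Classifier(config=config)
    trainer = pl.Trainer(max_epochs=config['n_epochs'], accelerator="auto", devices=1)
    trainer.fit(fresh_model, comments_data_module)
    torch.save(fresh_model.state_dict(), checkpoint_path)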
test_tweets = test_df["comment_text"].values

# streamlit section
models = ["distilroberta-base"]
model_pointers = ["default: distilroberta-base"]

st.write("1. Hit the button to view the analysis of a random tweet")

with st.form(key="init_form"):
    # randrange keeps the index in bounds (randint's upper bound is inclusive)
    current_random_tweet = test_tweets[random.randrange(len(test_tweets))]
    current_random_tweet = prepare_tokenized_review(current_random_tweet)

    choice = st.selectbox("Choose Model", model_pointers)
    user_picked_model = models[model_pointers.index(choice)]

    with st.spinner("Analyzing..."):
        text_encoding = get_encodings(current_random_tweet)
        result = run_inference(text_encoding)
        df = pd.DataFrame({"Tweet": current_random_tweet}, index=[0])
        df["Highest Toxicity Class"] = attributes[result.index(max(result))]
        df["Class Probability"] = max(result)
        st.table(df)

    next_tweet = st.form_submit_button("Next Tweet")

    if next_tweet:
        with st.spinner("Analyzing..."):
            st.write("")
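# Optional improvement (a sketch, not in the original): Streamlit reruns the
# whole script on every interaction, which reloads the tokenizer and model each
# time. If your Streamlit version provides st.cache_resource (1.18+), wrapping
# the load avoids that. The helper name is hypothetical and the app above does
# not call it; adopting it would mean replacing the module-level model setup
# with model = load_model().
@st.cache_resource
def load_model():
    m = Comment_Classifier(config=config)
    m.load_state_dict(torch.load("model_state_dict.pt", map_location=device))
    m.to(device)
    m.eval()
    return m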