"""Gradio demo serving six GRU sequence-to-sequence chatbots.

Three vocabularies/checkpoint families are loaded at import time ("188M",
"219M" and "219M-SW"), each in two variants: a decoder with attention and a
decoder without.  Every model is exposed through its own ``gr.ChatInterface``
tab.
"""
import json
import random
import re
import unicodedata
from typing import Tuple

import gradio as gr
import spacy
import torch
import torch.nn as nn
import torch.nn.functional as F

# Loaded once and shared by `tokenize` and `tokenize_context`.
nlp = spacy.load('en_core_web_sm')

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NOTE(review): SOURCE shows *empty-string* keys wherever a special token is
# looked up (`word2idx['']`).  They look like angle-bracketed token names
# (unknown-word / start-of-sequence / end-of-sequence markers) that were lost
# when the file was extracted.  The literals are preserved as found; confirm
# the real names against vocab/word2idx.json and set these three constants.
_UNK_TOKEN = ''
_BOS_TOKEN = ''
_EOS_TOKEN = ''


def greet(name):
    """Return a greeting string for `name`."""
    return "Hello " + name + "!!"


def _load_vocab(directory):
    """Read `word2idx.json` and `idx2word.json` from `directory`.

    :param directory: folder containing the two JSON files
    :return: tuple (word2idx, idx2word) of the decoded JSON objects
    """
    with open(f'{directory}/word2idx.json', 'r', encoding='utf-8') as f:
        w2i = json.load(f)
    with open(f'{directory}/idx2word.json', 'r', encoding='utf-8') as f:
        i2w = json.load(f)
    return w2i, i2w


# read word2idx and idx2word from json file
word2idx, idx2word = _load_vocab('vocab')


def unicodetoascii(text):
    """
    Strip combining marks (accents) from a Unicode string.

    The text is NFKD-normalised and every character in Unicode category
    'Mn' is dropped.  Characters that have no ASCII decomposition are left
    untouched here.
    :param text: text to be converted (coerced with `str`)
    :return: text without combining marks
    """
    normalized_text = unicodedata.normalize('NFKD', str(text))
    return ''.join(char for char in normalized_text
                   if unicodedata.category(char) != 'Mn')


def preprocess_text(text, fn=unicodetoascii):
    """
    Normalise raw user text before tokenisation.

    :param text: raw input text
    :param fn: callable applied first to the text (accent stripping by default)
    :return: lower-cased text without URLs, non-ASCII characters,
             '!'/'?' runs between word characters, or repeated whitespace
    """
    text = fn(text)
    text = text.lower()
    text = re.sub(r'http\S+', '', text)  # Remove URLs
    text = re.sub(r'[^\x00-\x7F]+', "", text)  # Remove non-ASCII characters
    text = re.sub(r"(\w)[!?]+(\w)", r'\1\2', text)  # Remove !? between words
    text = re.sub(r"\s\s+", r" ", text).strip()  # Remove extra spaces
    return text


def tokenize(text, nlp=nlp):
    """
    Tokenize text
    :param text: text to be tokenized
    :param nlp: spaCy pipeline whose tokenizer is used
    :return: list of tokens
    """
    return [tok.text for tok in nlp.tokenizer(text)]


def tokenize_context(text, nlp=nlp):
    """
    Tokenize text and remove stop words
    :param text: text to be tokenized
    :param nlp: spaCy pipeline whose tokenizer is used
    :return: list of tokens
    """
    return [tok.text for tok in nlp.tokenizer(text) if not tok.is_stop]


def lookup_words(idx2word, indices):
    """
    Lookup words from indices
    :param idx2word: index to word mapping (keys are strings, as loaded from JSON)
    :param indices: indices to be converted
    :return: list of words
    """
    return [idx2word[str(idx)] for idx in indices]


class Encoder(nn.Module):
    """
    Bidirectional single-layer GRU encoder.

    :param input_dim: size of the input vocabulary
    :param emb_dim: dimension of the embedding layer
    :param enc_hid_dim: hidden size of the encoder GRU (per direction)
    :param dec_hid_dim: hidden size expected by the decoder
    :param dropout: dropout probability applied to the embeddings
    """

    def __init__(self, input_dim: int, emb_dim: int, enc_hid_dim: int,
                 dec_hid_dim: int, dropout: float = 0):
        super(Encoder, self).__init__()
        self.input_dim = input_dim
        self.emb_dim = emb_dim
        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim
        # embedding layer used to learn representations of the corpus
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = nn.GRU(emb_dim, enc_hid_dim, bidirectional=True,
                          batch_first=False, num_layers=1)
        # projects the concatenated forward/backward state to the decoder size
        self.fc = nn.Linear(enc_hid_dim * 2, dec_hid_dim)
        # dropout helps produce a more generalisable model
        self.dropout = nn.Dropout(dropout)

    def forward(self, src: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Encode a batch of source sequences.

        :param src: token indices, sequence-first (the GRU is built with
                    batch_first=False)
        :return: (outputs, hidden) where `outputs` is the GRU output for every
                 position and `hidden` is the tanh-activated projection of the
                 two final directional hidden states
        """
        embedded = self.dropout(self.embedding(src))
        outputs, hidden = self.rnn(embedded)
        # hidden[-2] / hidden[-1] are the last two entries of the GRU's final
        # state, i.e. one per direction for this single-layer bidirectional GRU
        final_states = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        hidden = torch.tanh(self.fc(final_states))
        return outputs, hidden


class Attention(nn.Module):
    """
    Attention over the encoder outputs (labelled "Luong attention" by the
    original authors).

    Each source position is scored by summing a tanh-activated linear
    projection of [decoder hidden ; encoder output]; scores are softmaxed
    over the source positions.

    :param enc_hid_dim: hidden size of the encoder GRU (per direction)
    :param dec_hid_dim: hidden size of the decoder GRU
    :param attn_dim: output size of the scoring projection
    """

    def __init__(self, enc_hid_dim: int, dec_hid_dim: int, attn_dim: int):
        super(Attention, self).__init__()
        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim
        # width of [decoder hidden ; bidirectional encoder output]
        self.attn_in = (enc_hid_dim * 2) + dec_hid_dim
        self.attn = nn.Linear(self.attn_in, attn_dim)

    def forward(self, decoder_hidden: torch.Tensor,
                encoder_outputs: torch.Tensor) -> torch.Tensor:
        """
        :param decoder_hidden: current decoder hidden state, one row per batch item
        :param encoder_outputs: encoder outputs, sequence-first
        :return: attention weights, one row per batch item, summing to 1
                 over the source positions
        """
        src_len = encoder_outputs.shape[0]
        # copy the decoder state once per source position
        repeated_decoder_hidden = decoder_hidden.unsqueeze(1).repeat(1, src_len, 1)
        # make the encoder outputs batch-first so they line up with the copies
        encoder_outputs = encoder_outputs.permute(1, 0, 2)
        energy = torch.tanh(self.attn(
            torch.cat((repeated_decoder_hidden, encoder_outputs), dim=2)))
        attention = torch.sum(energy, dim=2)
        return F.softmax(attention, dim=1)


class AttnDecoder(nn.Module):
    """
    GRU RNN Decoder with attention

    :param output_dim: size of the output vocabulary
    :param emb_dim: dimension of the embedding layer
    :param enc_hid_dim: hidden size of the encoder GRU (per direction)
    :param dec_hid_dim: hidden size of the decoder GRU
    :param attention: module returning attention weights over source positions;
                      must expose an `attn_in` attribute
    :param dropout: dropout probability applied to the embeddings
    """

    def __init__(self, output_dim: int, emb_dim: int, enc_hid_dim: int,
                 dec_hid_dim: int, attention: nn.Module, dropout: float = 0):
        super(AttnDecoder, self).__init__()
        self.output_dim = output_dim
        self.emb_dim = emb_dim
        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim
        self.attention = attention
        # embedding layer used to learn representations of the corpus
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.GRU((enc_hid_dim * 2) + emb_dim, dec_hid_dim,
                          batch_first=False, num_layers=1)
        self.out = nn.Linear(self.attention.attn_in + emb_dim, output_dim)
        # (the original first stored the raw rate in `self.dropout` and then
        # overwrote it with this layer; only the layer is kept)
        self.dropout = nn.Dropout(dropout)

    def encode_attention(self, decoder_hidden: torch.Tensor,
                         encoder_outputs: torch.Tensor) -> torch.Tensor:
        """
        Return the attention-weighted sum of the encoder outputs.

        :param decoder_hidden: current decoder hidden state
        :param encoder_outputs: encoder outputs, sequence-first
        :return: weighted encoder representation with a leading length-1 axis
        """
        weights = self.attention(decoder_hidden, encoder_outputs).unsqueeze(1)
        batch_first_outputs = encoder_outputs.permute(1, 0, 2)
        weighted_encoder_rep = torch.bmm(weights, batch_first_outputs)
        return weighted_encoder_rep.permute(1, 0, 2)

    def forward(self, input: torch.Tensor, decoder_hidden: torch.Tensor,
                encoder_outputs: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Run one decoding step.

        :param input: previous token index for every batch item
        :param decoder_hidden: previous decoder hidden state
        :param encoder_outputs: encoder outputs, sequence-first
        :return: (vocabulary scores for the next token, new hidden state)
        """
        input = input.unsqueeze(0)  # add a length-1 sequence axis
        embedded = self.dropout(self.embedding(input))
        weighted_encoder = self.encode_attention(decoder_hidden, encoder_outputs)
        rnn_input = torch.cat((embedded, weighted_encoder), dim=2)
        output, decoder_hidden = self.rnn(rnn_input, decoder_hidden.unsqueeze(0))
        # drop the length-1 axis before the output projection
        embedded = embedded.squeeze(0)
        output = output.squeeze(0)
        weighted_encoder = weighted_encoder.squeeze(0)
        output = self.out(torch.cat((output, weighted_encoder, embedded), dim=1))
        return output, decoder_hidden.squeeze(0)


class Decoder(nn.Module):
    """
    GRU RNN Decoder without attention

    The context fed to the GRU is the encoder output at the last source
    position.

    :param output_dim: size of the output vocabulary
    :param emb_dim: dimension of the embedding layer
    :param enc_hid_dim: hidden size of the encoder GRU (per direction)
    :param dec_hid_dim: hidden size of the decoder GRU
    :param dropout: dropout probability applied to the embeddings
    """

    def __init__(self, output_dim: int, emb_dim: int, enc_hid_dim: int,
                 dec_hid_dim: int, dropout: float = 0):
        super(Decoder, self).__init__()
        self.output_dim = output_dim
        self.emb_dim = emb_dim
        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim
        # embedding layer used to learn representations of the corpus
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.GRU((enc_hid_dim * 2) + emb_dim, dec_hid_dim,
                          batch_first=False, num_layers=1)
        self.out = nn.Linear((enc_hid_dim * 2) + dec_hid_dim + emb_dim, output_dim)
        # (the original first stored the raw rate in `self.dropout` and then
        # overwrote it with this layer; only the layer is kept)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input: torch.Tensor, decoder_hidden: torch.Tensor,
                encoder_outputs: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Run one decoding step.

        :param input: previous token index for every batch item
        :param decoder_hidden: previous decoder hidden state
        :param encoder_outputs: encoder outputs, sequence-first
        :return: (vocabulary scores for the next token, new hidden state)
        """
        input = input.unsqueeze(0)  # add a length-1 sequence axis
        embedded = self.dropout(self.embedding(input))
        # fixed context: encoder output at the final source position
        context = encoder_outputs[-1, :, :]
        context = context.repeat(embedded.shape[0], 1, 1)
        embs_and_context = torch.cat((embedded, context), -1)
        output, decoder_hidden = self.rnn(embs_and_context, decoder_hidden.unsqueeze(0))
        # drop the length-1 axis before the output projection
        embedded = embedded.squeeze(0)
        output = output.squeeze(0)
        context = context.squeeze(0)
        output = self.out(torch.cat((output, embedded, context), -1))
        return output, decoder_hidden.squeeze(0)


class Seq2Seq(nn.Module):
    """
    Seq-2-Seq model combining RNN encoder and RNN decoder

    :param encoder: module returning (encoder_outputs, hidden) for a source batch
    :param decoder: module with an `output_dim` attribute that performs one
                    decoding step per call
    :param device: device on which the output buffer is allocated
    """

    def __init__(self, encoder: nn.Module, decoder: nn.Module, device: torch.device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src: torch.Tensor, trg: torch.Tensor,
                teacher_forcing_ratio: float = 0.5) -> torch.Tensor:
        """
        Decode `trg` step by step with optional teacher forcing.

        :param src: source token indices, batch-first
        :param trg: target token indices, batch-first
        :param teacher_forcing_ratio: probability of feeding the ground-truth
                                      token instead of the model's prediction
        :return: tensor (max_len, batch_size, vocab) of scores; row 0 stays zero
        """
        src = src.transpose(0, 1)  # (max_len, batch_size)
        trg = trg.transpose(0, 1)  # (max_len, batch_size)
        batch_size = src.shape[1]
        max_len = trg.shape[0]
        trg_vocab_size = self.decoder.output_dim
        outputs = torch.zeros(max_len, batch_size, trg_vocab_size).to(self.device)
        encoder_outputs, hidden = self.encoder(src)
        # first input to the decoder is the first target token of every
        # sequence (presumably the start-of-sequence marker)
        output = trg[0, :]
        for t in range(1, max_len):
            output, hidden = self.decoder(output, hidden, encoder_outputs)
            outputs[t] = output
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.max(1)[1]
            output = trg[t] if teacher_force else top1
        return outputs


def _build_model(hparams, weights_path, use_attention):
    """Construct a Seq2Seq model from `hparams` and load its checkpoint.

    :param hparams: dict with 'input_dim', 'emb_dim', 'enc_hid_dim',
                    'dec_hid_dim', 'dropout' and (for attention) 'attn_dim'
    :param weights_path: path of the state-dict file to load
    :param use_attention: build an `AttnDecoder` if true, else a plain `Decoder`
    :return: the model, moved to the module-level `device`
    """
    encoder = Encoder(input_dim=hparams['input_dim'],
                      emb_dim=hparams['emb_dim'],
                      enc_hid_dim=hparams['enc_hid_dim'],
                      dec_hid_dim=hparams['dec_hid_dim'],
                      dropout=hparams['dropout'])
    decoder_kwargs = dict(output_dim=hparams['input_dim'],
                          emb_dim=hparams['emb_dim'],
                          enc_hid_dim=hparams['enc_hid_dim'],
                          dec_hid_dim=hparams['dec_hid_dim'],
                          dropout=hparams['dropout'])
    if use_attention:
        attention = Attention(enc_hid_dim=hparams['enc_hid_dim'],
                              dec_hid_dim=hparams['dec_hid_dim'],
                              attn_dim=hparams['attn_dim'])
        decoder = AttnDecoder(attention=attention, **decoder_kwargs)
    else:
        decoder = Decoder(**decoder_kwargs)
    model = Seq2Seq(encoder=encoder, decoder=decoder, device=device)
    # NOTE: torch.load unpickles the file; only load checkpoints you trust.
    model.load_state_dict(torch.load(weights_path, map_location=torch.device('cpu')))
    model.to(device)
    return model


# ---- "188M" models ---------------------------------------------------------
params = {'input_dim': len(word2idx),
          'emb_dim': 128,
          'enc_hid_dim': 256,
          'dec_hid_dim': 256,
          'dropout': 0.5,
          'attn_dim': 32,
          'teacher_forcing_ratio': 0.5,
          'epochs': 35}

attn_model = _build_model(params, 'AttnSeq2Seq-188M_epoch35.pt', use_attention=True)
norm_model = _build_model(params, 'NormSeq2Seq-188M_epoch35.pt', use_attention=False)

# ---- "219M" models ---------------------------------------------------------
word2idx2, idx2word2 = _load_vocab('vocab219')

params219 = {'input_dim': len(word2idx2),
             'emb_dim': 192,
             'enc_hid_dim': 256,
             'dec_hid_dim': 256,
             'dropout': 0.5,
             'attn_dim': 64,
             'teacher_forcing_ratio': 0.5,
             'epochs': 35}

attn_model219 = _build_model(params219, 'AttnSeq2Seq-219M_epoch35.pt', use_attention=True)
norm_model219 = _build_model(params219, 'NormSeq2Seq-219M_epoch35.pt', use_attention=False)

# ---- "219M-SW" models (stop words removed from the input) -------------------
word2idx3, idx2word3 = _load_vocab('vocab219SW')

params219SW = {'input_dim': len(word2idx3),
               'emb_dim': 192,
               'enc_hid_dim': 256,
               'dec_hid_dim': 256,
               'dropout': 0.5,
               'attn_dim': 64,
               'teacher_forcing_ratio': 0.5,
               'epochs': 35}

# The original took the attention decoder's emb_dim from `params219` here;
# the value is the same (192) but `params219SW` is the consistent source.
attn_model219SW = _build_model(params219SW, 'AttnSeq2Seq-219M-SW_epoch35.pt',
                               use_attention=True)
norm_model219SW = _build_model(params219SW, 'NormSeq2Seq-219M-SW_epoch35.pt',
                               use_attention=False)

# Backward compatibility: the original script left these scratch names bound
# to the sub-modules of the last models it built.
enc, dec = norm_model219SW.encoder, norm_model219SW.decoder
attn = attn_model219SW.decoder.attention

models_dict = {'AttentionSeq2Seq-188M': attn_model,
               'NormalSeq2Seq-188M': norm_model,
               'AttentionSeq2Seq-219M': attn_model219,
               'NormalSeq2Seq-219M': norm_model219,
               'AttentionSeq2Seq-219M-SW': attn_model219SW,
               'NormalSeq2Seq-219M-SW': norm_model219SW}


def _generate_response(model_name, sentence, max_len, word2idx, idx2word, device,
                       tokenizer, preprocess_text, lookup_words, models_dict):
    """Greedy-decode a reply to `sentence` with the model named `model_name`.

    :param model_name: key of the model in `models_dict`
    :param sentence: raw user message
    :param max_len: maximum number of decoding steps
    :param word2idx: word to index mapping matching the model's vocabulary
    :param idx2word: index to word mapping matching the model's vocabulary
    :param device: device the input tensors are moved to
    :param tokenizer: callable turning preprocessed text into a list of tokens
    :param preprocess_text: callable normalising the raw message
    :param lookup_words: callable mapping (idx2word, indices) to words
    :param models_dict: mapping from model name to model
    :return: generated reply as a space-joined string
    :raises KeyError: if `model_name` or a special token is missing
    """
    model = models_dict[model_name]
    model.eval()  # disable dropout for inference

    unk_idx = word2idx[_UNK_TOKEN]
    bos_idx = word2idx[_BOS_TOKEN]
    eos_idx = word2idx[_EOS_TOKEN]

    words = tokenizer(preprocess_text(sentence))
    token_ids = [bos_idx] + [word2idx.get(word, unk_idx) for word in words] + [eos_idx]
    # column vector: one sequence-first example
    src = torch.tensor(token_ids, dtype=torch.long).unsqueeze(1).to(device)

    outputs = [bos_idx]
    with torch.no_grad():
        encoder_outputs, hidden = model.encoder(src)
        for _ in range(max_len):
            previous = torch.tensor([outputs[-1]], dtype=torch.long).to(device)
            output, hidden = model.decoder(previous, hidden, encoder_outputs)
            next_idx = output.max(1)[1].item()  # greedy: highest-scoring token
            outputs.append(next_idx)
            if next_idx == eos_idx:
                break

    response = lookup_words(idx2word, outputs)
    return ' '.join(response).replace(_BOS_TOKEN, '').replace(_EOS_TOKEN, '').strip()


def generateAttn188(sentence, history, max_len=12, word2idx=word2idx,
                    idx2word=idx2word, device=device, tokenize=tokenize,
                    preprocess_text=preprocess_text, lookup_words=lookup_words,
                    models_dict=models_dict):
    """
    Generate response with the 'AttentionSeq2Seq-188M' model
    :param sentence: sentence
    :param history: chat history supplied by gr.ChatInterface (unused)
    :param max_len: maximum length of sequence
    :param word2idx: word to index mapping
    :param idx2word: index to word mapping
    :return: response
    """
    return _generate_response('AttentionSeq2Seq-188M', sentence, max_len, word2idx,
                              idx2word, device, tokenize, preprocess_text,
                              lookup_words, models_dict)


def generateNorm188(sentence, history, max_len=12, word2idx=word2idx,
                    idx2word=idx2word, device=device, tokenize=tokenize,
                    preprocess_text=preprocess_text, lookup_words=lookup_words,
                    models_dict=models_dict):
    """
    Generate response with the 'NormalSeq2Seq-188M' model
    :param sentence: sentence
    :param history: chat history supplied by gr.ChatInterface (unused)
    :param max_len: maximum length of sequence
    :param word2idx: word to index mapping
    :param idx2word: index to word mapping
    :return: response
    """
    return _generate_response('NormalSeq2Seq-188M', sentence, max_len, word2idx,
                              idx2word, device, tokenize, preprocess_text,
                              lookup_words, models_dict)


def generateAttn219(sentence, history, max_len=12, word2idx=word2idx2,
                    idx2word=idx2word2, device=device, tokenize=tokenize,
                    preprocess_text=preprocess_text, lookup_words=lookup_words,
                    models_dict=models_dict):
    """
    Generate response with the 'AttentionSeq2Seq-219M' model
    :param sentence: sentence
    :param history: chat history supplied by gr.ChatInterface (unused)
    :param max_len: maximum length of sequence
    :param word2idx: word to index mapping
    :param idx2word: index to word mapping
    :return: response
    """
    return _generate_response('AttentionSeq2Seq-219M', sentence, max_len, word2idx,
                              idx2word, device, tokenize, preprocess_text,
                              lookup_words, models_dict)


def generateNorm219(sentence, history, max_len=12, word2idx=word2idx2,
                    idx2word=idx2word2, device=device, tokenize=tokenize,
                    preprocess_text=preprocess_text, lookup_words=lookup_words,
                    models_dict=models_dict):
    """
    Generate response with the 'NormalSeq2Seq-219M' model
    :param sentence: sentence
    :param history: chat history supplied by gr.ChatInterface (unused)
    :param max_len: maximum length of sequence
    :param word2idx: word to index mapping
    :param idx2word: index to word mapping
    :return: response
    """
    return _generate_response('NormalSeq2Seq-219M', sentence, max_len, word2idx,
                              idx2word, device, tokenize, preprocess_text,
                              lookup_words, models_dict)


def generateAttn219SW(sentence, history, max_len=12, word2idx=word2idx3,
                      idx2word=idx2word3, device=device,
                      tokenize_context=tokenize_context,
                      preprocess_text=preprocess_text, lookup_words=lookup_words,
                      models_dict=models_dict):
    """
    Generate response with the 'AttentionSeq2Seq-219M-SW' model

    Stop words are removed from the input before encoding.  (The original
    looked up 'AttentionSeq2Seq-219M', whose vocabulary does not match the
    SW word/index mappings used here.)
    :param sentence: sentence
    :param history: chat history supplied by gr.ChatInterface (unused)
    :param max_len: maximum length of sequence
    :param word2idx: word to index mapping
    :param idx2word: index to word mapping
    :return: response
    """
    return _generate_response('AttentionSeq2Seq-219M-SW', sentence, max_len, word2idx,
                              idx2word, device, tokenize_context, preprocess_text,
                              lookup_words, models_dict)


def generateNorm219SW(sentence, history, max_len=12, word2idx=word2idx3,
                      idx2word=idx2word3, device=device,
                      tokenize_context=tokenize_context,
                      preprocess_text=preprocess_text, lookup_words=lookup_words,
                      models_dict=models_dict):
    """
    Generate response with the 'NormalSeq2Seq-219M-SW' model

    Stop words are removed from the input before encoding.  (The original
    looked up 'NormalSeq2Seq-219M', whose vocabulary does not match the
    SW word/index mappings used here.)
    :param sentence: sentence
    :param history: chat history supplied by gr.ChatInterface (unused)
    :param max_len: maximum length of sequence
    :param word2idx: word to index mapping
    :param idx2word: index to word mapping
    :return: response
    """
    return _generate_response('NormalSeq2Seq-219M-SW', sentence, max_len, word2idx,
                              idx2word, device, tokenize_context, preprocess_text,
                              lookup_words, models_dict)


norm188 = gr.ChatInterface(
    generateNorm188,
    title="NormalSeq2Seq-188M",
    description=("Seq2Seq Generative Chatbot without Attention. "
                 "188,204,500 trainable parameters"))

norm219 = gr.ChatInterface(
    generateNorm219,
    title="NormalSeq2Seq-219M",
    description=("Seq2Seq Generative Chatbot without Attention. "
                 "219,456,724 trainable parameters"))

norm219sw = gr.ChatInterface(
    generateNorm219SW,
    title="NormalSeq2Seq-219M-SW",
    description=("Seq2Seq Generative Chatbot without Attention. "
                 "219,451,344 trainable parameters "
                 "Trained with stop words removed for context (input) and more data."))

attn188 = gr.ChatInterface(
    generateAttn188,
    title="AttentionSeq2Seq-188M",
    description=("Seq2Seq Generative Chatbot with Attention. "
                 "188,229,108 trainable parameters"))

attn219 = gr.ChatInterface(
    generateAttn219,
    title="AttentionSeq2Seq-219M",
    description=("Seq2Seq Generative Chatbot with Attention. "
                 "219,505,940 trainable parameters "))

attn219sw = gr.ChatInterface(
    generateAttn219SW,
    title="AttentionSeq2Seq-219M-SW",
    description=("Seq2Seq Generative Chatbot with Attention. "
                 "219,500,560 trainable parameters "
                 "Trained with stop words removed for context (input) and more data"))

with gr.Blocks() as demo:
    # One line per '>' marker so each notice renders as a Markdown blockquote.
    gr.Markdown("""
> This chatbot is created as part of the Group Project Practical Assessment for University of Liverpool's CSCK507 Natural Language Processing and Understanding (June 2023)
> Disclaimer: Please be advised that this chatbot is an AI language model designed to generate responses based on patterns in data it has been trained on (Ubuntu Dialogue Dataset). While efforts have been made to ensure that the responses generated are appropriate and respectful, there is a possibility that the chatbot may occasionally produce content that could be offensive, vulgar, or inappropriate.""")
    gr.TabbedInterface([norm188, norm219, norm219sw], ["188M", "219M", "219M-SW"])
    gr.TabbedInterface([attn188, attn219, attn219sw], ["188M", "219M", "219M-SW"])

if __name__ == "__main__":
    demo.launch()