|
import json
import random
import re
import unicodedata
from typing import Tuple

import gradio as gr
import torch
import torch.nn as nn
|
|
|
|
|
def greet(name):
    """
    Build a short greeting for the given name

    :param name: name to greet; expected to be a str because it is concatenated directly
    :return: the greeting "Hello <name>!!"
    """
    prefix = "Hello "
    suffix = "!!"
    return prefix + name + suffix
|
|
|
|
|
|
|
# Vocabulary lookup tables, read once at import time.
# The encoding is pinned to UTF-8 (the encoding JSON text is exchanged in)
# instead of depending on the platform's default text encoding.
with open('vocab/word2idx.json', 'r', encoding='utf-8') as f:
    word2idx = json.load(f)
# idx2word is looked up with str(index) by lookup_words, i.e. its keys are
# the string form of each index (JSON object keys are always strings).
with open('vocab/idx2word.json', 'r', encoding='utf-8') as f:
    idx2word = json.load(f)

# Use the GPU when PyTorch reports one as available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
|
|
def unicodetoascii(text):
    """
    Strip accents from text by decomposing it and dropping combining marks

    The input is passed through str(), normalized to NFKD, and every character
    whose Unicode category is 'Mn' (nonspacing mark) is removed. Characters in
    any other category are kept, even when they are outside the ASCII range.

    :param text: text to be converted (str() is applied first)
    :return: the normalized text without nonspacing marks
    """
    decomposed = unicodedata.normalize('NFKD', str(text))
    kept = []
    for char in decomposed:
        if unicodedata.category(char) == 'Mn':
            # combining accent produced by the NFKD decomposition
            continue
        kept.append(char)
    return ''.join(kept)
|
|
|
def preprocess_text(text, fn=unicodetoascii):
    """
    Normalize raw text before tokenization

    Steps, in order: apply fn, lowercase, then run the regex substitutions
    listed below, and finally strip leading/trailing whitespace.

    :param text: text to be cleaned
    :param fn: callable applied to text first (defaults to unicodetoascii)
    :return: cleaned text
    """
    cleaned = fn(text).lower()

    # (pattern, replacement) pairs, applied one after another
    substitutions = (
        (r'http\S+', ''),                   # drop anything starting with "http" up to the next whitespace
        (r'[^\x00-\x7F]+', ""),             # drop characters outside the ASCII range
        (r"(\w)[!?]+(\w)", r'\1\2'),        # drop runs of ! / ? sitting between two word characters
        (r"\s\s+", r" "),                   # collapse runs of two or more whitespace characters
    )
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)

    return cleaned.strip()
|
|
|
def tokenize(text):
    """
    Split text into whitespace-separated tokens

    :param text: text to be tokenized
    :return: list of tokens (empty when text contains no non-whitespace characters)
    """
    tokens = text.split()
    return tokens
|
|
|
def lookup_words(idx2word, indices):
    """
    Map a sequence of indices to their words

    :param idx2word: index to word mapping, keyed by the string form of each index
    :param indices: indices to be converted
    :return: list of words, in the same order as indices
    """
    words = []
    for idx in indices:
        # keys are strings, so the index is converted before the lookup
        words.append(idx2word[str(idx)])
    return words
|
|
|
|
|
# Hyperparameters used to build the networks.
params = dict(
    input_dim=len(word2idx),      # vocabulary size; also passed as the decoders' output_dim
    emb_dim=128,
    enc_hid_dim=256,
    dec_hid_dim=256,
    dropout=0.5,
    attn_dim=32,
    teacher_forcing_ratio=0.5,    # NOTE(review): not read elsewhere in this file — presumably a training setting
    epochs=35,                    # NOTE(review): not read elsewhere in this file — presumably a training setting
)
|
|
|
class Encoder(nn.Module):
    """
    Bidirectional single-layer GRU encoder

    Embeds the source token indices, runs them through the GRU and projects
    the two final hidden states (concatenated) down to dec_hid_dim.
    """

    def __init__(self,
                 input_dim: int,
                 emb_dim: int,
                 enc_hid_dim: int,
                 dec_hid_dim: int,
                 dropout: float = 0):
        """
        :param input_dim: number of entries in the embedding table
        :param emb_dim: size of each embedding vector
        :param enc_hid_dim: hidden size of the GRU (per direction)
        :param dec_hid_dim: size of the projected hidden state that is returned
        :param dropout: dropout probability applied to the embeddings
        """
        super().__init__()

        self.input_dim, self.emb_dim = input_dim, emb_dim
        self.enc_hid_dim, self.dec_hid_dim = enc_hid_dim, dec_hid_dim

        self.embedding = nn.Embedding(num_embeddings=input_dim, embedding_dim=emb_dim)
        self.rnn = nn.GRU(input_size=emb_dim,
                          hidden_size=enc_hid_dim,
                          num_layers=1,
                          batch_first=False,
                          bidirectional=True)
        # both directions are concatenated before the projection, hence * 2
        self.fc = nn.Linear(in_features=enc_hid_dim * 2, out_features=dec_hid_dim)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, src: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Encode a batch of source sequences

        :param src: token indices; assumed (src_len, batch) since the GRU uses batch_first=False
        :return: tuple of (GRU outputs, tanh-projected final hidden state of width dec_hid_dim)
        """
        token_vectors = self.embedding(src)
        embedded = self.dropout(token_vectors)

        outputs, final_states = self.rnn(embedded)

        # last two entries of the GRU's final hidden state: one per direction
        merged = torch.cat((final_states[-2], final_states[-1]), dim=1)
        hidden = torch.tanh(self.fc(merged))
        return outputs, hidden
|
|
|
|
|
class Attention(nn.Module):
    """
    Attention scorer over encoder outputs

    For every source position the decoder hidden state is concatenated with
    the encoder output, passed through a linear layer and tanh, and the
    resulting attn_dim values are summed into a single score. The scores are
    normalized with softmax across the source positions.
    """

    def __init__(self,
                 enc_hid_dim: int,
                 dec_hid_dim: int,
                 attn_dim: int):
        """
        :param enc_hid_dim: encoder hidden size per direction (encoder outputs are 2 * enc_hid_dim wide)
        :param dec_hid_dim: decoder hidden size
        :param attn_dim: output size of the scoring linear layer
        """
        super().__init__()

        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim
        # width of [decoder hidden ; encoder output]; also read by AttnDecoder
        self.attn_in = (enc_hid_dim * 2) + dec_hid_dim

        self.attn = nn.Linear(self.attn_in, attn_dim)

    def forward(self,
                decoder_hidden: torch.Tensor,
                encoder_outputs: torch.Tensor) -> torch.Tensor:
        """
        Compute attention weights for one decoding step

        :param decoder_hidden: current decoder state, (batch, dec_hid_dim)
        :param encoder_outputs: encoder outputs, (src_len, batch, enc_hid_dim * 2)
        :return: attention weights, (batch, src_len); each row sums to 1
        """
        src_len = encoder_outputs.shape[0]

        # copy the decoder state once per source position: (batch, src_len, dec_hid_dim)
        repeated_decoder_hidden = decoder_hidden.unsqueeze(1).repeat(1, src_len, 1)
        # batch-major layout to line up with the repeated decoder state
        encoder_outputs = encoder_outputs.permute(1, 0, 2)

        energy = torch.tanh(self.attn(torch.cat((repeated_decoder_hidden, encoder_outputs), dim=2)))
        attention = torch.sum(energy, dim=2)

        # torch.softmax is used because this file does not import
        # torch.nn.functional (the previous F.softmax raised NameError).
        return torch.softmax(attention, dim=1)
|
|
|
|
|
class AttnDecoder(nn.Module):
    """
    GRU RNN Decoder with attention

    Decodes one token per call: the previous token is embedded, an
    attention-weighted summary of the encoder outputs is computed from the
    current hidden state, both are fed to a single-layer GRU, and the GRU
    output, the summary and the embedding are projected to vocabulary scores.
    """

    def __init__(self,
                 output_dim: int,
                 emb_dim: int,
                 enc_hid_dim: int,
                 dec_hid_dim: int,
                 attention: nn.Module,
                 dropout: float = 0):
        """
        :param output_dim: number of output classes (also the embedding table size)
        :param emb_dim: size of each embedding vector
        :param enc_hid_dim: encoder hidden size per direction
        :param dec_hid_dim: hidden size of the decoder GRU
        :param attention: callable scoring module; must expose an attn_in attribute
        :param dropout: dropout probability applied to the embeddings
        """
        super().__init__()

        self.output_dim = output_dim
        self.emb_dim = emb_dim
        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim
        self.attention = attention

        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.GRU((enc_hid_dim * 2) + emb_dim, dec_hid_dim, batch_first=False, num_layers=1)
        # attn_in is (enc_hid_dim * 2) + dec_hid_dim, i.e. the width of
        # [GRU output ; weighted encoder summary]; the embedding is appended too
        self.out = nn.Linear(self.attention.attn_in + emb_dim, output_dim)
        # self.dropout is the nn.Dropout module only; the raw probability is
        # no longer stored under the same name first and then overwritten
        self.dropout = nn.Dropout(dropout)

    def encode_attention(self,
                         decoder_hidden: torch.Tensor,
                         encoder_outputs: torch.Tensor) -> torch.Tensor:
        """
        Summarize the encoder outputs with the attention weights

        :param decoder_hidden: current decoder state passed to the attention module
        :param encoder_outputs: encoder outputs, (src_len, batch, features)
        :return: weighted sum of encoder outputs, (1, batch, features)
        """
        a = self.attention(decoder_hidden, encoder_outputs)
        # (batch, src_len) -> (batch, 1, src_len) so bmm yields one row per batch item
        a = a.unsqueeze(1)
        encoder_outputs = encoder_outputs.permute(1, 0, 2)
        weighted_encoder_rep = torch.bmm(a, encoder_outputs)
        # back to (1, batch, features) to match the GRU's sequence-first layout
        weighted_encoder_rep = weighted_encoder_rep.permute(1, 0, 2)
        return weighted_encoder_rep

    def forward(self,
                input: torch.Tensor,
                decoder_hidden: torch.Tensor,
                encoder_outputs: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Run one decoding step

        :param input: previous token indices, assumed (batch,)
        :param decoder_hidden: current decoder state, (batch, dec_hid_dim)
        :param encoder_outputs: encoder outputs, (src_len, batch, enc_hid_dim * 2)
        :return: tuple of (scores over output_dim classes, (batch, output_dim); new decoder state, (batch, dec_hid_dim))
        """
        # add the length-1 sequence axis expected by the GRU
        input = input.unsqueeze(0)

        embedded = self.dropout(self.embedding(input))
        weighted_encoder = self.encode_attention(decoder_hidden, encoder_outputs)

        rnn_input = torch.cat((embedded, weighted_encoder), dim=2)
        output, decoder_hidden = self.rnn(rnn_input, decoder_hidden.unsqueeze(0))

        # drop the length-1 sequence axis again before the projection
        embedded = embedded.squeeze(0)
        output = output.squeeze(0)
        weighted_encoder = weighted_encoder.squeeze(0)
        output = self.out(torch.cat((output, weighted_encoder, embedded), dim=1))
        return output, decoder_hidden.squeeze(0)
|
|
|
class Decoder(nn.Module):
    """
    GRU RNN Decoder without attention

    Decodes one token per call, using the encoder output at the last source
    position as a fixed context vector for both the GRU input and the final
    projection.
    """

    def __init__(self,
                 output_dim: int,
                 emb_dim: int,
                 enc_hid_dim: int,
                 dec_hid_dim: int,
                 dropout: float = 0):
        """
        :param output_dim: number of output classes (also the embedding table size)
        :param emb_dim: size of each embedding vector
        :param enc_hid_dim: encoder hidden size per direction
        :param dec_hid_dim: hidden size of the decoder GRU
        :param dropout: dropout probability applied to the embeddings
        """
        super().__init__()

        self.output_dim = output_dim
        self.emb_dim = emb_dim
        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim

        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.GRU((enc_hid_dim * 2) + emb_dim, dec_hid_dim, batch_first=False, num_layers=1)
        self.out = nn.Linear((enc_hid_dim * 2) + dec_hid_dim + emb_dim, output_dim)
        # self.dropout is the nn.Dropout module only; the raw probability is
        # no longer stored under the same name first and then overwritten
        self.dropout = nn.Dropout(dropout)

    def forward(self,
                input: torch.Tensor,
                decoder_hidden: torch.Tensor,
                encoder_outputs: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Run one decoding step

        :param input: previous token indices, assumed (batch,)
        :param decoder_hidden: current decoder state, (batch, dec_hid_dim)
        :param encoder_outputs: encoder outputs, (src_len, batch, enc_hid_dim * 2)
        :return: tuple of (scores over output_dim classes, (batch, output_dim); new decoder state, (batch, dec_hid_dim))
        """
        # add the length-1 sequence axis expected by the GRU
        input = input.unsqueeze(0)

        embedded = self.dropout(self.embedding(input))
        # encoder output at the last source position serves as the context
        context = encoder_outputs[-1, :, :]
        # prepend a sequence axis matching embedded (length 1 after unsqueeze above)
        context = context.repeat(embedded.shape[0], 1, 1)
        embs_and_context = torch.cat((embedded, context), -1)

        output, decoder_hidden = self.rnn(embs_and_context, decoder_hidden.unsqueeze(0))

        # drop the length-1 sequence axis again before the projection
        embedded = embedded.squeeze(0)
        output = output.squeeze(0)
        context = context.squeeze(0)
        output = self.out(torch.cat((output, embedded, context), -1))
        return output, decoder_hidden.squeeze(0)
|
|
|
class Seq2Seq(nn.Module):
    """
    Seq-2-Seq model combining RNN encoder and RNN decoder
    """

    def __init__(self,
                 encoder: nn.Module,
                 decoder: nn.Module,
                 device: torch.device):
        """
        :param encoder: callable returning (encoder_outputs, hidden) for a source batch
        :param decoder: callable running one decoding step; must expose output_dim
        :param device: device on which the output buffer is allocated
        """
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self,
                src: torch.Tensor,
                trg: torch.Tensor,
                teacher_forcing_ratio: float = 0.5) -> torch.Tensor:
        """
        Decode the target sequence step by step with optional teacher forcing

        :param src: source token indices, (batch, src_len)
        :param trg: target token indices, (batch, trg_len)
        :param teacher_forcing_ratio: probability of feeding the ground-truth
            token (instead of the model's own prediction) into the next step
        :return: decoder scores, (trg_len, batch, decoder.output_dim); row 0 stays zero
        """
        # switch to sequence-first layout: (len, batch)
        src = src.transpose(0, 1)
        trg = trg.transpose(0, 1)
        batch_size = src.shape[1]
        max_len = trg.shape[0]
        trg_vocab_size = self.decoder.output_dim

        # allocated directly on the target device instead of CPU + copy
        outputs = torch.zeros(max_len, batch_size, trg_vocab_size, device=self.device)
        encoder_outputs, hidden = self.encoder(src)

        # the first target token of every sequence starts the decoding
        decoder_input = trg[0, :]

        for t in range(1, max_len):
            logits, hidden = self.decoder(decoder_input, hidden, encoder_outputs)
            outputs[t] = logits
            # requires the module-level `import random`
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = logits.max(1)[1]
            decoder_input = trg[t] if teacher_force else top1

        return outputs
|
|
|
|
|
|
|
# --- Attention model -------------------------------------------------------
enc = Encoder(input_dim=params['input_dim'],
              emb_dim=params['emb_dim'],
              enc_hid_dim=params['enc_hid_dim'],
              dec_hid_dim=params['dec_hid_dim'],
              dropout=params['dropout'])
attn = Attention(enc_hid_dim=params['enc_hid_dim'],
                 dec_hid_dim=params['dec_hid_dim'],
                 attn_dim=params['attn_dim'])
dec = AttnDecoder(output_dim=params['input_dim'],
                  emb_dim=params['emb_dim'],
                  enc_hid_dim=params['enc_hid_dim'],
                  dec_hid_dim=params['dec_hid_dim'],
                  attention=attn,
                  dropout=params['dropout'])
attn_model = Seq2Seq(encoder=enc, decoder=dec, device=device)
# map_location remaps the stored tensors onto the device selected above, so a
# checkpoint saved from a GPU still loads on a CPU-only host.
# NOTE(review): torch.load unpickles the file — only load trusted checkpoints.
attn_model.load_state_dict(torch.load('models/AttnSeq2Seq-188M_epoch35.pt', map_location=device))
attn_model.to(device)

# --- Model without attention -----------------------------------------------
# enc / dec are rebound to fresh modules; attn_model keeps its own instances.
enc = Encoder(input_dim=params['input_dim'],
              emb_dim=params['emb_dim'],
              enc_hid_dim=params['enc_hid_dim'],
              dec_hid_dim=params['dec_hid_dim'],
              dropout=params['dropout'])
dec = Decoder(output_dim=params['input_dim'],
              emb_dim=params['emb_dim'],
              enc_hid_dim=params['enc_hid_dim'],
              dec_hid_dim=params['dec_hid_dim'],
              dropout=params['dropout'])
norm_model = Seq2Seq(encoder=enc, decoder=dec, device=device)
norm_model.load_state_dict(torch.load('models/NormSeq2Seq-188M_epoch35.pt', map_location=device))
norm_model.to(device)

# Display name -> model; the keys are the choices offered in the UI.
models_dict = {'AttentionSeq2Seq-188M': attn_model, 'NormalSeq2Seq-188M': norm_model}
|
|
|
def generate(models_str, sentence, max_len=12, word2idx=word2idx, idx2word=idx2word,
             device=device, tokenize=tokenize, preprocess_text=preprocess_text,
             lookup_words=lookup_words, models_dict=models_dict):
    """
    Generate a response to a sentence with the selected model

    At every step the highest-scoring token is chosen; decoding stops after
    max_len steps or as soon as '<eos>' is produced.

    :param models_str: key of the model to use in models_dict
    :param sentence: input sentence
    :param max_len: maximum number of decoding steps
    :param word2idx: word to index mapping; must contain '<bos>' and '<eos>',
        and '<unk>' whenever the sentence has out-of-vocabulary tokens
    :param idx2word: index to word mapping
    :param device: device on which the input tensors are created
    :param tokenize: callable splitting the cleaned sentence into tokens
    :param preprocess_text: callable cleaning the raw sentence
    :param lookup_words: callable mapping (idx2word, indices) to words
    :param models_dict: mapping from model name to model; each model exposes
        .encoder and .decoder
    :return: response
    :raises KeyError: if models_str is not a key of models_dict (e.g. None
        when no model was selected)
    """
    try:
        model = models_dict[models_str]
    except KeyError:
        # same exception type as before, with a message naming the valid choices
        raise KeyError(
            f"Unknown model {models_str!r}; expected one of {list(models_dict)}"
        ) from None
    model.eval()

    bos_idx = word2idx['<bos>']
    eos_idx = word2idx['<eos>']

    sentence = preprocess_text(sentence)
    tokens = tokenize(sentence)
    # '<unk>' is looked up lazily, only for tokens missing from the vocabulary
    tokens = [word2idx[token] if token in word2idx else word2idx['<unk>'] for token in tokens]
    tokens = [bos_idx] + tokens + [eos_idx]
    # (src_len, 1): sequence-first layout with a batch of one
    tokens = torch.tensor(tokens, dtype=torch.long, device=device).unsqueeze(1)

    outputs = [bos_idx]
    with torch.no_grad():
        encoder_outputs, hidden = model.encoder(tokens)
        for _ in range(max_len):
            prev_token = torch.tensor([outputs[-1]], dtype=torch.long, device=device)
            output, hidden = model.decoder(prev_token, hidden, encoder_outputs)
            # highest-scoring token; read back from the tensor once per step
            next_idx = output.max(1)[1].item()
            outputs.append(next_idx)
            if next_idx == eos_idx:
                break

    response = lookup_words(idx2word, outputs)
    return ' '.join(response).replace('<bos>', '').replace('<eos>', '').strip()
|
|
|
|
|
|
|
# Gradio UI: a model selector and a text box feed generate(); the response is
# shown in a single output text box.
demo = gr.Interface(
    fn=generate,
    inputs=[
        gr.Radio(list(models_dict), label="Model"),
        gr.Textbox(lines=2, label="Input Text"),
    ],
    outputs=gr.Textbox(label="Output Text"),
)


# Start the app only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch()
|
|