Spaces:
Sleeping
Sleeping
import gradio as gr | |
import inseq | |
import captum | |
import torch | |
import os | |
# import nltk | |
import argparse | |
import random | |
import numpy as np | |
import pandas as pd | |
from argparse import Namespace | |
from tqdm.notebook import tqdm | |
from torch.utils.data import DataLoader | |
from functools import partial | |
from transformers import AutoTokenizer, MarianTokenizer, AutoModel, AutoModelForSeq2SeqLM, MarianMTModel | |
# Hugging Face checkpoint name for every supported translation direction.
# Insertion order (es, fr, zh, sw) is the order in which everything is loaded.
_CHECKPOINT_BY_PAIR = {
    'en-es': "Helsinki-NLP/opus-mt-en-es",
    'en-fr': "Helsinki-NLP/opus-mt-en-fr",
    'en-zh': "Helsinki-NLP/opus-mt-en-zh",
    'en-sw': "Helsinki-NLP/opus-mt-en-sw",
}

# Tokenizers, keyed by language pair.
dict_tokenizer_tr = {
    pair: AutoTokenizer.from_pretrained(checkpoint)
    for pair, checkpoint in _CHECKPOINT_BY_PAIR.items()
}

# Plain MarianMT models used for generation, keyed by language pair.
dict_models_tr = {
    pair: MarianMTModel.from_pretrained(checkpoint)
    for pair, checkpoint in _CHECKPOINT_BY_PAIR.items()
}

# inseq-wrapped models configured with the "input_x_gradient" attribution method.
dict_models = {
    pair: inseq.load_model(checkpoint, "input_x_gradient")
    for pair, checkpoint in _CHECKPOINT_BY_PAIR.items()
}

# Per-language module-level aliases, kept so existing references keep working.
tokenizer_es, tokenizer_fr, tokenizer_zh, tokenizer_sw = dict_tokenizer_tr.values()
model_tr_es, model_tr_fr, model_tr_zh, model_tr_sw = dict_models_tr.values()
model_es, model_fr, model_zh, model_sw = dict_models.values()
# English source sentences offered as ready-made inputs.
saliency_examples = [
    "Peace of Mind: Protection for consumers.",
    "The sustainable development goals report: towards a rescue plan for people and planet",
    "We will leave no stone unturned to hold those responsible to account.",
    (
        "The clock is now ticking on our work to finalise the remaining key legislative "
        "proposals presented by this Commission to ensure that citizens and businesses "
        "can reap the benefits of our policy actions."
    ),
    "Pumpkins, squash and gourds, fresh or chilled, excluding courgettes",
    (
        "The labour market participation of mothers with infants has even deteriorated "
        "over the past two decades, often impacting their career and incomes for years."
    ),
]

# Each entry is [English source, first translation, alternative translation].
contrastive_examples = [
    # English -> Spanish
    [
        "Peace of Mind: Protection for consumers.",
        "Paz mental: protección de los consumidores",
        "Paz de la mente: protección de los consumidores",
    ],
    # English -> French
    [
        "the slaughterer has finished his work.",
        "l'abatteur a terminé son travail.",
        "l'abatteuse a terminé son travail.",
    ],
    # English -> Chinese
    [
        "A fundamental shift is needed - in commitment, solidarity, financing and action - to put the world on a better path.",
        "需要在承诺、团结、筹资和行动方面进行根本转变,使世界走上更美好的道路。",
        "我们需要从根本上转变承诺、团结、资助和行动,使世界走上更美好的道路。",
    ],
]
# Load the challenge-set examples. Only the values at column positions 2-5 of
# each CSV row are kept, in that order, as one 4-item list per row.
df_challenge_set = pd.read_csv("challenge_sets.csv")
arr_challenge_set = [
    [record[column] for column in (2, 3, 4, 5)]
    for record in df_challenge_set.values
]
def get_k_prob_tokens(transition_scores, result, model, k_values=5):
    """Collect the ``k_values`` best-scoring candidate tokens at every generation step.

    Only the first returned sequence is inspected.

    Args:
        transition_scores: Indexable of per-sequence step scores. Only the
            length of its first row is used, to bound the number of steps.
        result: Generation output exposing ``sequences``, ``scores`` and
            ``beam_indices``.
        model: Language-pair key into ``dict_tokenizer_tr``.
        k_values: Number of candidates kept per step.

    Returns:
        A list with one ``[token_ids, exp_scores, decoded_tokens]`` entry per
        step, where ``token_ids`` and ``exp_scores`` are built with
        ``np.array`` and ``decoded_tokens`` is a list of strings.
    """
    tokenizer_tr = dict_tokenizer_tr[model]
    # The first column of every sequence is dropped
    # (presumably the decoder start token -- TODO confirm).
    first_sequence = result.sequences[:, 1:][0]
    n_steps = min(len(first_sequence), len(transition_scores[0]))

    result_output = []
    for i_step in range(n_steps):
        beam_i = result.beam_indices[0][i_step]
        # Negative beam indices fall back to beam 0
        # (presumably they mark padded steps -- TODO confirm).
        if beam_i < 0:
            beam_i = 0
        # One top-k per step; its indices and values are reused below.
        top_k = result.scores[i_step][beam_i].topk(k_values)
        candidate_tokens = [tokenizer_tr.decode(tok) for tok in top_k.indices]
        # NOTE(review): exp() assumes the scores are log-probabilities -- confirm.
        candidate_scores = np.exp(top_k.values)
        result_output.append(
            [np.array(top_k.indices), np.array(candidate_scores), candidate_tokens]
        )
    return result_output
def split_token_from_sequences(sequences, model) -> list:
    """Turn the generated sequences into a flat list of tree nodes.

    Every sequence is decoded token by token. Sequences sharing the same
    token prefix share the same nodes, so the result describes a prefix tree
    rooted at a synthetic ``bos`` node.

    Args:
        sequences: 2-D collection of generated token ids, one row per
            returned sequence. It must support ``sequences[:, 1:]``.
        model: Language-pair key into ``dict_tokenizer_tr``.

    Returns:
        A list of dicts with keys ``id``, ``parentId``, ``text``, ``name``
        and ``prob``. The first entry is the root (``id`` ``'bos--1'``,
        ``parentId`` ``None``). Remaining entries are ordered by step, then
        by sequence index. ``prob`` is always 0.
    """
    tokenizer = dict_tokenizer_tr[model]
    # The first column of every sequence is dropped
    # (presumably the decoder start token -- TODO confirm).
    beam_tokens = [
        [tokenizer.decode(token, skip_special_tokens=True) for token in beam]
        for beam in sequences[:, 1:]
    ]

    score = 0  # placeholder: no probability is attached to the nodes yet
    root_id = 'bos--1'
    dict_parent = [
        {'id': root_id, 'parentId': None, 'text': 'bos', 'name': 'bos', 'prob': score}
    ]
    seen_ids = set()
    skipped_words = ('<empty_word>', '<pad>')

    # paths[b] is the id of the node reached by sequence b at the previous
    # step: "<tok0>-0-<tok1>-1-...". Extending it step by step avoids
    # re-joining the whole prefix at every step.
    paths = [''] * len(beam_tokens)
    max_steps = max((len(tokens) for tokens in beam_tokens), default=0)

    for step_i in range(max_steps):
        for i_bs, tokens in enumerate(beam_tokens):
            if step_i >= len(tokens):
                continue
            step_w = tokens[step_i]
            if step_i == 0:
                parent_id = root_id
                new_id = f"{step_w}-{step_i}"
            else:
                parent_id = paths[i_bs]
                new_id = f"{parent_id}-{step_w}-{step_i}"
            # The path advances even for skipped words so that later ids
            # still contain every position of the sequence.
            paths[i_bs] = new_id
            if step_w in skipped_words or new_id in seen_ids:
                continue
            dict_parent.append(
                {'id': new_id, 'parentId': parent_id, 'text': step_w, 'name': step_w, 'prob': score}
            )
            seen_ids.add(new_id)
    return dict_parent
# Static HTML scaffold rendered through gr.HTML in the UI below. It loads
# require.js from a CDN and declares empty placeholder elements (ids "demo",
# "viz", "demo2", "d3_text_grid", "d3_beam_search").
# NOTE(review): these placeholders are presumably filled in by plotsjs.js -- confirm.
html = """
<html>
<script async src="https://cdnjs.cloudflare.com/ajax/libs/require.js/2.3.6/require.min.js"></script>
<body>
<p id="demo"></p>
<p id="viz"></p>
<p id="demo2"></p>
<h4> Exploring top-k probable tokens </h4>
<div id="d3_text_grid">... top 10 tokens generated at each step ...</div>
<h4> Exploring the Beam Search sequence generation</h4>
<div id="d3_beam_search">... top 4 generated sequences using Beam Search...</div>
</body>
</html>
"""
def sentence_maker(w1, model, var2=None):
    """Translate ``w1`` with beam search and gather data for the visualisations.

    Args:
        w1: Source text to translate.
        model: Language-pair key (for example ``'en-es'``) into
            ``dict_tokenizer_tr`` and ``dict_models_tr``.
        var2: Unused. Kept so existing callers that pass it keep working.

    Returns:
        ``[tgt_text, [beam_dict, prob_tokens]]`` where ``tgt_text`` is the
        decoded first returned sequence, ``beam_dict`` is the node list from
        ``split_token_from_sequences`` and ``prob_tokens`` holds the top-10
        candidates per step from ``get_k_prob_tokens``.

    Raises:
        KeyError: If ``model`` is not a known language pair.
    """
    tokenizer = dict_tokenizer_tr[model]
    translator = dict_models_tr[model]

    # Translate and keep the per-step scores needed below.
    inputs = tokenizer(w1, return_tensors="pt")
    num_ret_seq = 4
    # Attentions and hidden states are not requested: nothing here reads them.
    translated = translator.generate(
        **inputs,
        num_beams=4,
        num_return_sequences=num_ret_seq,
        return_dict_in_generate=True,
        output_scores=True,
    )

    beam_dict = split_token_from_sequences(translated.sequences, model)
    tgt_text = tokenizer.decode(translated.sequences[0], skip_special_tokens=True)

    transition_scores = translator.compute_transition_scores(
        translated.sequences,
        translated.scores,
        translated.beam_indices,
        normalize_logits=True,
    )
    prob_tokens = get_k_prob_tokens(transition_scores, translated, model, k_values=10)

    return [tgt_text, [beam_dict, prob_tokens]]
def sentence_maker2(w1, j2):
    """Placeholder callback: echo both arguments to stdout and return a fixed marker."""
    marker = "in sentence22..."
    print(w1, j2)
    return marker
# Build the Gradio interface. "plotsjs.js" is handed to gr.Blocks as custom JS.
# NOTE(review): the nesting of the layout below was reconstructed from a copy
# of the file that had lost its indentation -- confirm against the running app.
with gr.Blocks(js="plotsjs.js") as demo:
    gr.Markdown(
        """
        # MAKE NMT Workshop \t `BeamSearch`
        """)
    with gr.Row():
        # Left column: source text in, translated text out.
        with gr.Column(scale=2):
            gr.Markdown(
                """
                ### Translation
                """)
            in_text = gr.Textbox(label="source text")
            out_text = gr.Textbox(label="target text")
            # Hidden components: out_text2 receives the return value of
            # sentence_maker2, var2 receives the second output of sentence_maker.
            out_text2 = gr.Textbox(visible=False)
            var2 = gr.JSON(visible=False)
        # Right column: read-only details of the selected challenge example.
        with gr.Column(scale=4):
            gr.Markdown(
                """
                ### If challenge is selected from the challenge set list bellow
                """)
            challenge_ex = gr.Textbox(label="Challenge", interactive=False)
            category_minor = gr.Textbox(label="category_minor", interactive=False)
            category_major = gr.Textbox(label="category_major", interactive=False)
    with gr.Accordion("Challenge selection:"):
        # Each row of arr_challenge_set fills the four components listed here.
        gr.Examples(arr_challenge_set,[in_text, challenge_ex,category_minor,category_major], label="")
    # Language-pair selector; its value is the `model` argument of sentence_maker.
    radio_c = gr.Radio(choices=['en-zh', 'en-es', 'en-fr', 'en-sw'], value="en-zh", label= '', container=False)
    btn = gr.Button("Translate")
    # Despite its name, this holds the static HTML scaffold defined in `html`.
    input_mic = gr.HTML(html)
    # NOTE(review): testFn_out and testFn_out_json are presumably defined in plotsjs.js -- confirm.
    btn.click(sentence_maker, [in_text, radio_c], [out_text,var2], js="(in_text,radio_c) => testFn_out(in_text,radio_c)") #should return an output comp.
    # When the translated text changes, forward out_text and var2 to sentence_maker2.
    out_text.change(sentence_maker2, [out_text, var2], out_text2, js="(out_text,var2) => testFn_out_json(var2)") #
    # run script function on load,
    # demo.load(None,None,None,js="plotsjs.js")
# Start the app only when the file is run as a script.
if __name__ == "__main__":
    demo.launch()