import streamlit as st if not hasattr(st, "cache_resource"): st.cache_resource = st.experimental_singleton import torch import torch.nn as nn import torch.nn.functional as F import transformers import pandas as pd device = torch.device("cuda" if torch.cuda.is_available() else "cpu") from transformers import MarianMTModel, MarianTokenizer model_options = [ 'Helsinki-NLP/opus-mt-roa-en', 'Helsinki-NLP/opus-mt-en-roa', ] col1, col2 = st.columns(2) with col1: model_name = st.selectbox("Select a model", model_options + ['other']) if model_name == 'other': model_name = st.text_input("Enter model name", model_options[0]) @st.cache_resource def get_tokenizer(model_name): return MarianTokenizer.from_pretrained(model_name) @st.cache_resource def get_model(model_name): model = MarianMTModel.from_pretrained(model_name).to(device) print(f"Loaded model, {model.num_parameters():,d} parameters.") return model tokenizer = get_tokenizer(model_name) model = get_model(model_name) if tokenizer.supported_language_codes: lang_code = st.selectbox("Select a language", tokenizer.supported_language_codes) else: lang_code = None with col2: input_text = st.text_input("Enter text to translate", "Hola, mi nombre es Juan") input_text = input_text.strip() if not input_text: st.stop() # prepend the language code if necessary if lang_code: input_text = f"{lang_code} {input_text}" input_ids = tokenizer(input_text, return_tensors="pt").input_ids.to(device) example_generations = model.generate( input_ids, num_beams=4, num_return_sequences=4, ) col1, col2 = st.columns(2) with col1: st.write("Example generations:") st.write('\n'.join( '- ' + translation for translation in tokenizer.batch_decode(example_generations, skip_special_tokens=True))) with col2: example_first_word = tokenizer.decode(example_generations[0, 1]) output_so_far = st.text_input("Enter text translated so far", example_first_word) # tokenize the output so far with tokenizer.as_target_tokenizer(): output_tokens = tokenizer.tokenize(output_so_far) decoder_input_ids = tokenizer.convert_tokens_to_ids(output_tokens) # Add the start token decoder_input_ids = [model.config.decoder_start_token_id] + decoder_input_ids with torch.no_grad(): model_output = model( input_ids = input_ids, decoder_input_ids = torch.tensor([decoder_input_ids]).to(device)) last_token_logits = model_output.logits[0, -1].cpu() assert len(last_token_logits.shape) == 1 most_likely_tokens = last_token_logits.topk(k=20) probs = last_token_logits.softmax(dim=-1) probs_for_likely_tokens = probs[most_likely_tokens.indices] with tokenizer.as_target_tokenizer(): probs_table = pd.DataFrame({ 'token': [tokenizer.decode(token_id) for token_id in most_likely_tokens.indices], 'id': most_likely_tokens.indices, 'probability': probs_for_likely_tokens, 'logprob': probs_for_likely_tokens.log(), 'cumulative probability': probs_for_likely_tokens.cumsum(0) }) st.subheader("Most likely next tokens") st.table(probs_table.style.hide(axis='index')) if len(decoder_input_ids) > 1: st.subheader("Loss by already-generated token") loss_table = pd.DataFrame({ 'token': [tokenizer.decode(token_id) for token_id in decoder_input_ids[1:]], 'loss': F.cross_entropy(model_output.logits[0, :-1], torch.tensor(decoder_input_ids[1:]).to(device), reduction='none').cpu() }) st.write(loss_table) st.write("Total loss so far:", loss_table.loss.sum())