# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import functools
import unittest
from typing import Any, Dict, Optional, Sequence

import fairseq
import fairseq.options
import fairseq.tasks
import torch

from tests.utils import dummy_dictionary

VOCAB_SIZE = 100


class FakeTask(fairseq.tasks.LegacyFairseqTask):
    def __init__(self, args):
        super().__init__(args)
        # dummy_dictionary adds 4 special symbols (bos/pad/eos/unk) on top of
        # the requested size, so the total comes back to VOCAB_SIZE.
        self.dictionary = dummy_dictionary(VOCAB_SIZE - 4)
        assert len(self.dictionary) == VOCAB_SIZE

    @property
    def source_dictionary(self):
        return self.dictionary

    @property
    def target_dictionary(self):
        return self.dictionary


# Cache the toy model so repeated calls with the same arguments reuse it
# instead of rebuilding it for every test.
@functools.lru_cache(maxsize=1)
def get_toy_model(
    device: str,
    architecture: str = "roberta_enc_dec",
    **extra_args: Any,
):
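    """Build a small task/model pair for unit tests.

    `extra_args` are forwarded into the model config, so individual tests can
    toggle flags such as `share_all_embeddings` or `untie_weights_roberta`.
    """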
    assert device in ("gpu", "cpu")
    kwargs = {
        "arch": architecture,
        # Use characteristic dimensions so that shape mismatches are easy to spot.
        "encoder_layers": 3,
        "encoder_embed_dim": 12,
        "encoder_ffn_embed_dim": 14,
        "encoder_attention_heads": 4,
        "decoder_layers": 3,
        "decoder_embed_dim": 12,
        "decoder_ffn_embed_dim": 14,
        "decoder_attention_heads": 4,
        # Disable dropout so that tests are deterministic and comparable.
        "dropout": 0,
        "attention_dropout": 0,
        "activation_dropout": 0,
        "encoder_layerdrop": 0,
        # Required args.
        "tokens_per_sample": 256,
        "data": "/tmp/test_roberta",
    }
    kwargs.update(extra_args)
    fake_task = FakeTask(kwargs)
    args = fairseq.options.get_args(
        task="online_backtranslation",
        mono_langs="en,ro",
        valid_lang_pairs="en-ro",
        **kwargs,
    )
    torch.manual_seed(0)
    model = fake_task.build_model(args)
    if device == "gpu":
        model.cuda()
    return fake_task, model


def mk_sample(
    lang: str, device: str, tok: Optional[Sequence[int]] = None, batch_size: int = 2
) -> Dict[str, Any]:
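    """Build a dummy batch in fairseq's `net_input` format.

    Every row repeats the same token sequence; token 2 is the EOS index of the
    default fairseq dictionary built by `dummy_dictionary`.
    """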
    assert device in ("gpu", "cpu")
    if not tok:
        if lang == "en":
            tok = [10, 11, 12, 13, 14, 15, 2]
        else:
            tok = [20, 21, 22, 23, 24, 25, 26, 27, 2]
    batch = torch.stack([torch.tensor(tok, dtype=torch.long)] * batch_size)
    if device == "gpu":
        batch = batch.cuda()
    sample = {
        "net_input": {
            "src_tokens": batch,
            "prev_output_tokens": batch,
            "src_lengths": torch.tensor(
                [len(tok)] * batch_size, dtype=torch.long, device=batch.device
            ),
        },
        "target": batch[:, 1:],
    }
    return sample
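

# Test decorator: runs the wrapped test on CPU first, then again on GPU when
# CUDA is available.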
def cpu_gpu(fn):
    def helper(self):
        fn(self, "cpu")
        if torch.cuda.is_available():
            fn(self, "gpu")

    return helper
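

# Test decorator: runs the wrapped test once per architecture under test.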
def architectures(fn):
    def helper(self):
        for arch in ["roberta_enc_dec", "transformer"]:
            fn(self, arch)

    return helper


class RobertaTest(unittest.TestCase):
    def assertTensorEqual(self, t1, t2, delta: float = 1e-6):
        self.assertEqual(t1.size(), t2.size(), "size mismatch")
        if delta == 0.0:
            self.assertEqual(t1.ne(t2).long().sum(), 0)
        else:
            self.assertEqual(((t2 - t1).abs() > delta).long().sum(), 0)

    def assertSharing(self, model, link_groups: Sequence[Sequence[str]]):
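        """Assert that every parameter in a group is the same tensor object,
        and that no tensor is shared across two different groups."""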
        ids = {}
        for group in link_groups:
            group_ids = {name: id(params(model, name)) for name in group}
            shared_id = group_ids[group[0]]
            self.assertEqual(group_ids, {name: shared_id for name in group})
            self.assertNotIn(shared_id, ids)
            ids[shared_id] = group

    def test_roberta_shared_params(self):
        _, roberta = get_toy_model("cpu", architecture="roberta")
        self.assertSharing(
            roberta,
            [
                [
                    "encoder.sentence_encoder.embed_tokens.weight",
                    "encoder.lm_head.weight",
                ]
            ],
        )

        _, roberta = get_toy_model(
            "cpu", architecture="roberta", untie_weights_roberta=True
        )
        self.assertSharing(
            roberta,
            [
                ["encoder.sentence_encoder.embed_tokens.weight"],
                ["encoder.lm_head.weight"],
            ],
        )

    def test_roberta_enc_dec_shared_params(self):
        # Three distinct embeddings.
        _, enc_dec = get_toy_model("cpu", architecture="roberta_enc_dec")
        self.assertSharing(
            enc_dec,
            [
                ["encoder.embed_tokens.weight"],
                ["decoder.embed_tokens.weight"],
                ["decoder.output_projection.weight"],
            ],
        )

        # Two distinct embeddings: one for the encoder, one for the decoder.
        _, enc_dec = get_toy_model(
            "cpu", architecture="roberta_enc_dec", share_decoder_input_output_embed=True
        )
        self.assertSharing(
            enc_dec,
            [
                ["encoder.embed_tokens.weight"],
                [
                    "decoder.embed_tokens.weight",
                    "decoder.output_projection.weight",
                ],
            ],
        )

        # All embeddings shared.
        _, enc_dec = get_toy_model(
            "cpu", architecture="roberta_enc_dec", share_all_embeddings=True
        )
        self.assertSharing(
            enc_dec,
            [
                [
                    "encoder.embed_tokens.weight",
                    "decoder.embed_tokens.weight",
                    "decoder.output_projection.weight",
                ]
            ],
        )

    def test_roberta_max_positions_is_correctly_set(self):
        device = "cpu"
        task, model = get_toy_model(device)
        max_pos = model.max_decoder_positions()

        self.assertEqual(max_pos, 256)
        self.assertEqual(max_pos, model.decoder.max_positions())
        self.assertEqual(max_pos, model.encoder.max_positions())
        self.assertEqual(max_pos, model.encoder.embed_positions.max_positions)

        sentence = [31 for _ in range(max_pos)]
        sample = mk_sample("en", device, sentence, batch_size=1)
        self.assertEqual(list(sample["net_input"]["src_lengths"]), [max_pos])
        self.assertEqual(len(sample["net_input"]["src_tokens"][0]), max_pos)
        x, _ = model.forward(**sample["net_input"])
        self.assertEqual(x.shape, (1, max_pos, VOCAB_SIZE))

    @cpu_gpu
    def test_roberta_forward_backward(self, device: str):
        _, model = get_toy_model(device)
        sample = mk_sample("en", device)
        en_tokens = sample["net_input"]["src_tokens"]
        (bs, l) = en_tokens.shape
        # Forward
        logits, _ = model(**sample["net_input"])
        self.assertEqual(logits.shape, (bs, l, VOCAB_SIZE))
        # Backward
        loss = logits.sum()
        loss.backward()

    @cpu_gpu
    def test_roberta_forward_backward_bs1(self, device: str):
        _, model = get_toy_model(device)
        sample = mk_sample("en", device, batch_size=1)
        o, _ = model.forward(**sample["net_input"])
        loss = o.sum()
        sample2 = mk_sample("ro", device, batch_size=1)
        o, _ = model.forward(**sample2["net_input"])
        loss += o.sum()
        loss.backward()

    @cpu_gpu
    def test_roberta_batching(self, device: str):
        """
        Checks that a batch of size 2 gives the same results, twice over, as a batch of size 1.
        """
        _, model = get_toy_model(device)
        sample = mk_sample("en", device, batch_size=1)
        slen = sample["net_input"]["src_lengths"][0]
        sample2 = mk_sample("en", device, batch_size=2)
        with torch.no_grad():
            z = model.encoder.forward(
                sample["net_input"]["src_tokens"], sample["net_input"]["src_lengths"]
            )
            z = z["encoder_out"][-1]
            logits, _ = model.forward(**sample["net_input"])

            z2 = model.encoder.forward(
                sample2["net_input"]["src_tokens"], sample2["net_input"]["src_lengths"]
            )
            z2 = z2["encoder_out"][-1]
            logits2, _ = model.forward(**sample2["net_input"])

        self.assertEqual(z.shape, (slen, 1, 12))
        self.assertEqual(z2.shape, (slen, 2, 12))
        self.assertTensorEqual(logits2[0], logits2[1])
        self.assertTensorEqual(logits[0], logits2[0])

    @cpu_gpu
    def test_roberta_incremental_decoder(self, device: str):
        """
        Checks that incremental decoding yields the same result as non-incremental decoding.
        """
        task, model = get_toy_model(device)

        en_sample = mk_sample("en", device)
        en_tokens = en_sample["net_input"]["src_tokens"]
        ro_sample = mk_sample("ro", device)
        ro_tokens = ro_sample["net_input"]["src_tokens"]

        en_enc = model.encoder.forward(
            en_tokens, src_lengths=en_sample["net_input"]["src_lengths"]
        )
        (bs, tgt_len) = ro_tokens.shape

        # Decode without incremental state
        ro_dec, _ = model.decoder.forward(ro_tokens, encoder_out=en_enc)
        self.assertEqual(ro_dec.shape, (bs, tgt_len, VOCAB_SIZE))
        self.assertTensorEqual(ro_dec[0], ro_dec[1])

        # Decode with incremental state: each step only returns the new position.
        inc_state = {}
        ro_dec_inc = []
        for l in range(tgt_len):
            ro, _ = model.decoder.forward(
                ro_tokens[:, : l + 1], encoder_out=en_enc, incremental_state=inc_state
            )
            self.assertEqual(ro.shape, (bs, 1, VOCAB_SIZE))
            ro_dec_inc.append(ro)

        for l in range(tgt_len):
            # Intra-batch
            self.assertTensorEqual(ro_dec_inc[l][0], ro_dec_inc[l][1])
            # Incremental vs non-incremental
            self.assertTensorEqual(ro_dec_inc[l][:, 0], ro_dec[:, l])
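

# Resolves a dotted parameter name such as "decoder.embed_tokens.weight" to the
# underlying tensor by recursively walking the model's attributes.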
def params(model, name):
    if "." not in name:
        return getattr(model, name)

    prefix, name = name.split(".", 1)
    return params(getattr(model, prefix), name)
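

# Standard unittest entry point, so the file can also be run directly.
if __name__ == "__main__":
    unittest.main()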