# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import unittest
from typing import Dict, List

import tests.utils as test_utils
import torch
from fairseq import utils
from fairseq.data import (
    Dictionary,
    LanguagePairDataset,
    TransformEosDataset,
    data_utils,
    noising,
)


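# The tests below exercise the word-level noising utilities in
# fairseq.data.noising (WordDropout, word blanking, WordShuffle) and
# end-to-end batching through NoisingDataset with UnsupervisedMTNoising.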
class TestDataNoising(unittest.TestCase):
    def _get_test_data_with_bpe_cont_marker(self, append_eos=True):
        """
        Args:
            append_eos: if True, each input sentence in the source tokens tensor
                will have an EOS appended to the end.

        Returns:
            vocab: BPE vocab with continuation markers as suffixes to denote
                non-end of word tokens. This is the standard BPE format used in
                fairseq's preprocessing.
            x: input tensor containing numberized source tokens, with EOS at the
                end if append_eos is True.
            src_lengths: lengths of each source sentence.
        """
        vocab = Dictionary()
        vocab.add_symbol("he@@")
        vocab.add_symbol("llo")
        vocab.add_symbol("how")
        vocab.add_symbol("are")
        vocab.add_symbol("y@@")
        vocab.add_symbol("ou")
        vocab.add_symbol("n@@")
        vocab.add_symbol("ew")
        vocab.add_symbol("or@@")
        vocab.add_symbol("k")
        src_tokens = [
            ["he@@", "llo", "n@@", "ew", "y@@", "or@@", "k"],
            ["how", "are", "y@@", "ou"],
        ]
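        # With the "@@" continuation markers applied, these BPE sequences
        # decode to "hello new york" and "how are you".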
        x, src_lengths = self._convert_src_tokens_to_tensor(
            vocab=vocab, src_tokens=src_tokens, append_eos=append_eos
        )
        return vocab, x, src_lengths

    def _get_test_data_with_bpe_end_marker(self, append_eos=True):
        """
        Args:
            append_eos: if True, each input sentence in the source tokens tensor
                will have an EOS appended to the end.

        Returns:
            vocab: BPE vocab with end-of-word markers as suffixes to denote
                tokens at the end of a word. This is an alternative to fairseq's
                standard preprocessing framework and is not generally supported
                within fairseq.
            x: input tensor containing numberized source tokens, with EOS at the
                end if append_eos is True.
            src_lengths: lengths of each source sentence.
        """
        vocab = Dictionary()
        vocab.add_symbol("he")
        vocab.add_symbol("llo_EOW")
        vocab.add_symbol("how_EOW")
        vocab.add_symbol("are_EOW")
        vocab.add_symbol("y")
        vocab.add_symbol("ou_EOW")
        vocab.add_symbol("n")
        vocab.add_symbol("ew_EOW")
        vocab.add_symbol("or")
        vocab.add_symbol("k_EOW")
        src_tokens = [
            ["he", "llo_EOW", "n", "ew_EOW", "y", "or", "k_EOW"],
            ["how_EOW", "are_EOW", "y", "ou_EOW"],
        ]
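        # Here the "_EOW" suffix marks the last BPE token of each word, so these
        # sequences again decode to "hello new york" and "how are you".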
        x, src_lengths = self._convert_src_tokens_to_tensor(
            vocab=vocab, src_tokens=src_tokens, append_eos=append_eos
        )
        return vocab, x, src_lengths

    def _get_test_data_with_word_vocab(self, append_eos=True):
        """
        Args:
            append_eos: if True, each input sentence in the source tokens tensor
                will have an EOS appended to the end.

        Returns:
            vocab: word vocab
            x: input tensor containing numberized source tokens, with EOS at the
                end if append_eos is True.
            src_lengths: lengths of each source sentence.
        """
        vocab = Dictionary()
        vocab.add_symbol("hello")
        vocab.add_symbol("how")
        vocab.add_symbol("are")
        vocab.add_symbol("you")
        vocab.add_symbol("new")
        vocab.add_symbol("york")
        src_tokens = [
            ["hello", "new", "york", "you"],
            ["how", "are", "you", "new", "york"],
        ]
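        # These are whole-word tokens, so no BPE continuation or end-of-word
        # markers are needed here.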
        x, src_lengths = self._convert_src_tokens_to_tensor(
            vocab=vocab, src_tokens=src_tokens, append_eos=append_eos
        )
        return vocab, x, src_lengths

    def _convert_src_tokens_to_tensor(
        self, vocab: Dictionary, src_tokens: List[List[str]], append_eos: bool
    ):
        src_len = [len(x) for x in src_tokens]
        # If we have to append EOS, we include EOS in counting src length
        if append_eos:
            src_len = [length + 1 for length in src_len]

        x = torch.LongTensor(len(src_tokens), max(src_len)).fill_(vocab.pad())
        for i in range(len(src_tokens)):
            for j in range(len(src_tokens[i])):
                x[i][j] = vocab.index(src_tokens[i][j])
            if append_eos:
                x[i][j + 1] = vocab.eos()
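        # x is built as (batch, max_len) with right padding; transpose it to
        # (T x B), since the noising helpers in this test operate on
        # time-major tensors (see assert_word_shuffle_matches_expected).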
        x = x.transpose(1, 0)
        return x, torch.LongTensor(src_len)

    def assert_eos_at_end(self, x, x_len, eos):
        """Asserts that the last token of every sentence in x is EOS."""
        for i in range(len(x_len)):
            self.assertEqual(
                x[x_len[i] - 1][i],
                eos,
                (
                    "Expected eos (token id {eos}) at the end of sentence {i} "
                    "but got {other} instead"
                ).format(i=i, eos=eos, other=x[x_len[i] - 1][i]),
            )

    def assert_word_dropout_correct(self, x, x_noised, x_len, l_noised):
        # Expect that only the first word (2 BPE tokens) of the first example
        # was dropped out.
        self.assertEqual(x_len[0] - 2, l_noised[0])
        for i in range(l_noised[0]):
            self.assertEqual(x_noised[i][0], x[i + 2][0])

    def test_word_dropout_with_eos(self):
        vocab, x, x_len = self._get_test_data_with_bpe_cont_marker(append_eos=True)
        with data_utils.numpy_seed(1234):
            noising_gen = noising.WordDropout(vocab)
            x_noised, l_noised = noising_gen.noising(x, x_len, 0.2)
        self.assert_word_dropout_correct(
            x=x, x_noised=x_noised, x_len=x_len, l_noised=l_noised
        )
        self.assert_eos_at_end(x=x_noised, x_len=l_noised, eos=vocab.eos())

    def assert_word_blanking_correct(self, x, x_noised, x_len, l_noised, unk):
        # Expect that only the first word (2 BPE tokens) of the first example
        # was blanked out (replaced with unk).
        self.assertEqual(x_len[0], l_noised[0])
        for i in range(l_noised[0]):
            if i < 2:
                self.assertEqual(x_noised[i][0], unk)
            else:
                self.assertEqual(x_noised[i][0], x[i][0])

    def test_word_blank_with_eos(self):
        vocab, x, x_len = self._get_test_data_with_bpe_cont_marker(append_eos=True)
        with data_utils.numpy_seed(1234):
            noising_gen = noising.WordDropout(vocab)
            x_noised, l_noised = noising_gen.noising(x, x_len, 0.2, vocab.unk())
        self.assert_word_blanking_correct(
            x=x, x_noised=x_noised, x_len=x_len, l_noised=l_noised, unk=vocab.unk()
        )
        self.assert_eos_at_end(x=x_noised, x_len=l_noised, eos=vocab.eos())

    def generate_unchanged_shuffle_map(self, length):
        return {i: i for i in range(length)}

    def assert_word_shuffle_matches_expected(
        self,
        x,
        x_len,
        max_shuffle_distance: int,
        vocab: Dictionary,
        expected_shuffle_maps: List[Dict[int, int]],
        expect_eos_at_end: bool,
        bpe_end_marker=None,
    ):
        """
        This verifies that with a given x, x_len, max_shuffle_distance, and
        vocab, we get the expected shuffle result.

        Args:
            x: Tensor of shape (T x B) = (sequence_length, batch_size)
            x_len: Tensor of length B = batch_size
            max_shuffle_distance: arg to pass to noising
            expected_shuffle_maps: List[mapping] where mapping is a
                Dict[old_index, new_index], mapping x's elements from their
                old positions in x to their new positions in x.
            expect_eos_at_end: if True, check the output to make sure there is
                an EOS at the end.
            bpe_end_marker: str denoting the BPE end token. If this is not None,
                we set the BPE cont token to None in the noising classes.
        """
        bpe_cont_marker = None
        if bpe_end_marker is None:
            bpe_cont_marker = "@@"

        with data_utils.numpy_seed(1234):
            word_shuffle = noising.WordShuffle(
                vocab, bpe_cont_marker=bpe_cont_marker, bpe_end_marker=bpe_end_marker
            )
            x_noised, l_noised = word_shuffle.noising(
                x, x_len, max_shuffle_distance=max_shuffle_distance
            )

        # For every example, we have a different expected shuffle map. We check
        # that each example is shuffled as expected according to each
        # corresponding shuffle map.
        for i in range(len(expected_shuffle_maps)):
            shuffle_map = expected_shuffle_maps[i]
            for k, v in shuffle_map.items():
                self.assertEqual(x[k][i], x_noised[v][i])

        # Shuffling should not affect the length of each example
        for pre_shuffle_length, post_shuffle_length in zip(x_len, l_noised):
            self.assertEqual(pre_shuffle_length, post_shuffle_length)
        if expect_eos_at_end:
            self.assert_eos_at_end(x=x_noised, x_len=l_noised, eos=vocab.eos())

    def test_word_shuffle_with_eos(self):
        vocab, x, x_len = self._get_test_data_with_bpe_cont_marker(append_eos=True)

        # Assert word shuffle with max shuffle distance 0 causes input to be
        # unchanged
        self.assert_word_shuffle_matches_expected(
            x=x,
            x_len=x_len,
            max_shuffle_distance=0,
            vocab=vocab,
            expected_shuffle_maps=[
                self.generate_unchanged_shuffle_map(example_len)
                for example_len in x_len
            ],
            expect_eos_at_end=True,
        )

        # Assert word shuffle with max shuffle distance 3 matches our expected
        # shuffle order
        self.assert_word_shuffle_matches_expected(
            x=x,
            x_len=x_len,
            vocab=vocab,
            max_shuffle_distance=3,
            expected_shuffle_maps=[
                self.generate_unchanged_shuffle_map(x_len[0]),
                {0: 0, 1: 3, 2: 1, 3: 2},
            ],
            expect_eos_at_end=True,
        )

    def test_word_shuffle_with_eos_nonbpe(self):
        """The purpose of this is to test shuffling logic with word vocabs."""
        vocab, x, x_len = self._get_test_data_with_word_vocab(append_eos=True)

        # Assert word shuffle with max shuffle distance 0 causes input to be
        # unchanged
        self.assert_word_shuffle_matches_expected(
            x=x,
            x_len=x_len,
            max_shuffle_distance=0,
            vocab=vocab,
            expected_shuffle_maps=[
                self.generate_unchanged_shuffle_map(example_len)
                for example_len in x_len
            ],
            expect_eos_at_end=True,
        )

        # Assert word shuffle with max shuffle distance 3 matches our expected
        # shuffle order
        self.assert_word_shuffle_matches_expected(
            x=x,
            x_len=x_len,
            vocab=vocab,
            max_shuffle_distance=3,
            expected_shuffle_maps=[
                {0: 0, 1: 1, 2: 3, 3: 2},
                {0: 0, 1: 2, 2: 1, 3: 3, 4: 4},
            ],
            expect_eos_at_end=True,
        )

    def test_word_shuffle_without_eos(self):
        """Same result as word shuffle with eos except no EOS at end."""
        vocab, x, x_len = self._get_test_data_with_bpe_cont_marker(append_eos=False)

        # Assert word shuffle with max shuffle distance 0 causes input to be
        # unchanged
        self.assert_word_shuffle_matches_expected(
            x=x,
            x_len=x_len,
            max_shuffle_distance=0,
            vocab=vocab,
            expected_shuffle_maps=[
                self.generate_unchanged_shuffle_map(example_len)
                for example_len in x_len
            ],
            expect_eos_at_end=False,
        )

        # Assert word shuffle with max shuffle distance 3 matches our expected
        # shuffle order
        self.assert_word_shuffle_matches_expected(
            x=x,
            x_len=x_len,
            vocab=vocab,
            max_shuffle_distance=3,
            expected_shuffle_maps=[
                self.generate_unchanged_shuffle_map(x_len[0]),
                {0: 0, 1: 3, 2: 1, 3: 2},
            ],
            expect_eos_at_end=False,
        )

    def test_word_shuffle_without_eos_with_bpe_end_marker(self):
        """Same result as word shuffle without eos except using BPE end token."""
        vocab, x, x_len = self._get_test_data_with_bpe_end_marker(append_eos=False)

        # Assert word shuffle with max shuffle distance 0 causes input to be
        # unchanged
        self.assert_word_shuffle_matches_expected(
            x=x,
            x_len=x_len,
            max_shuffle_distance=0,
            vocab=vocab,
            expected_shuffle_maps=[
                self.generate_unchanged_shuffle_map(example_len)
                for example_len in x_len
            ],
            expect_eos_at_end=False,
            bpe_end_marker="_EOW",
        )

        # Assert word shuffle with max shuffle distance 3 matches our expected
        # shuffle order
        self.assert_word_shuffle_matches_expected(
            x=x,
            x_len=x_len,
            vocab=vocab,
            max_shuffle_distance=3,
            expected_shuffle_maps=[
                self.generate_unchanged_shuffle_map(x_len[0]),
                {0: 0, 1: 3, 2: 1, 3: 2},
            ],
            expect_eos_at_end=False,
            bpe_end_marker="_EOW",
        )

    def assert_no_eos_at_end(self, x, x_len, eos):
        """Asserts that the last token of each sentence in x is not EOS."""
        for i in range(len(x_len)):
            self.assertNotEqual(
                x[x_len[i] - 1][i],
                eos,
                "Expected no eos (token id {eos}) at the end of sentence {i}.".format(
                    eos=eos, i=i
                ),
            )

    def test_word_dropout_without_eos(self):
        """Same result as word dropout with eos except no EOS at end."""
        vocab, x, x_len = self._get_test_data_with_bpe_cont_marker(append_eos=False)
        with data_utils.numpy_seed(1234):
            noising_gen = noising.WordDropout(vocab)
            x_noised, l_noised = noising_gen.noising(x, x_len, 0.2)
        self.assert_word_dropout_correct(
            x=x, x_noised=x_noised, x_len=x_len, l_noised=l_noised
        )
        self.assert_no_eos_at_end(x=x_noised, x_len=l_noised, eos=vocab.eos())

    def test_word_blank_without_eos(self):
        """Same result as word blank with eos except no EOS at end."""
        vocab, x, x_len = self._get_test_data_with_bpe_cont_marker(append_eos=False)
        with data_utils.numpy_seed(1234):
            noising_gen = noising.WordDropout(vocab)
            x_noised, l_noised = noising_gen.noising(x, x_len, 0.2, vocab.unk())
        self.assert_word_blanking_correct(
            x=x, x_noised=x_noised, x_len=x_len, l_noised=l_noised, unk=vocab.unk()
        )
        self.assert_no_eos_at_end(x=x_noised, x_len=l_noised, eos=vocab.eos())

    def _get_noising_dataset_batch(
        self,
        src_tokens_no_pad,
        src_dict,
        append_eos_to_tgt=False,
    ):
        """
        Constructs a NoisingDataset and the corresponding
        ``LanguagePairDataset(NoisingDataset(src), src)``. If
        *append_eos_to_tgt* is True, wrap the source dataset in
        :class:`TransformEosDataset` to append EOS to the clean source when
        using it as the target.
        """
        src_dataset = test_utils.TestDataset(data=src_tokens_no_pad)
        noising_dataset = noising.NoisingDataset(
            src_dataset=src_dataset,
            src_dict=src_dict,
            seed=1234,
            max_word_shuffle_distance=3,
            word_dropout_prob=0.2,
            word_blanking_prob=0.2,
            noising_class=noising.UnsupervisedMTNoising,
        )
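        # The clean (un-noised) source doubles as the target, so each batch
        # pairs a noisy source sentence with its original sentence.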
        tgt = src_dataset
        language_pair_dataset = LanguagePairDataset(
            src=noising_dataset, tgt=tgt, src_sizes=None, src_dict=src_dict
        )
        language_pair_dataset = TransformEosDataset(
            language_pair_dataset,
            src_dict.eos(),
            append_eos_to_tgt=append_eos_to_tgt,
        )

        dataloader = torch.utils.data.DataLoader(
            dataset=language_pair_dataset,
            batch_size=2,
            collate_fn=language_pair_dataset.collater,
        )
        denoising_batch_result = next(iter(dataloader))
        return denoising_batch_result

    def test_noising_dataset_with_eos(self):
        src_dict, src_tokens, _ = self._get_test_data_with_bpe_cont_marker(
            append_eos=True
        )

        # Format data for src_dataset
        src_tokens = torch.t(src_tokens)
        src_tokens_no_pad = []
        for src_sentence in src_tokens:
            src_tokens_no_pad.append(
                utils.strip_pad(tensor=src_sentence, pad=src_dict.pad())
            )
        denoising_batch_result = self._get_noising_dataset_batch(
            src_tokens_no_pad=src_tokens_no_pad, src_dict=src_dict
        )

        eos, pad = src_dict.eos(), src_dict.pad()
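        # Token ids 4-13 below correspond to the BPE symbols added in
        # _get_test_data_with_bpe_cont_marker, in insertion order (the lower
        # ids are taken by the dictionary's special symbols).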
        # Generated noisy source as source (left-padded)
        expected_src = torch.LongTensor(
            [[4, 5, 10, 11, 8, 12, 13, eos], [pad, pad, pad, 6, 8, 9, 7, eos]]
        )
        # Original clean source as target (right-padded)
        expected_tgt = torch.LongTensor(
            [[4, 5, 10, 11, 8, 12, 13, eos], [6, 7, 8, 9, eos, pad, pad, pad]]
        )
        generated_src = denoising_batch_result["net_input"]["src_tokens"]
        tgt_tokens = denoising_batch_result["target"]

        self.assertTensorEqual(expected_src, generated_src)
        self.assertTensorEqual(expected_tgt, tgt_tokens)

    def test_noising_dataset_without_eos(self):
        """
        Similar to test_noising_dataset_with_eos, except that we have to set
        *append_eos_to_tgt* to ``True``.
        """
        src_dict, src_tokens, _ = self._get_test_data_with_bpe_cont_marker(
            append_eos=False
        )

        # Format data for src_dataset
        src_tokens = torch.t(src_tokens)
        src_tokens_no_pad = []
        for src_sentence in src_tokens:
            src_tokens_no_pad.append(
                utils.strip_pad(tensor=src_sentence, pad=src_dict.pad())
            )
        denoising_batch_result = self._get_noising_dataset_batch(
            src_tokens_no_pad=src_tokens_no_pad,
            src_dict=src_dict,
            append_eos_to_tgt=True,
        )

        eos, pad = src_dict.eos(), src_dict.pad()

        # Generated noisy source as source (left-padded, no EOS)
        expected_src = torch.LongTensor(
            [[4, 5, 10, 11, 8, 12, 13], [pad, pad, pad, 6, 8, 9, 7]]
        )
        # Original clean source as target (right-padded, with EOS appended by
        # TransformEosDataset)
        expected_tgt = torch.LongTensor(
            [[4, 5, 10, 11, 8, 12, 13, eos], [6, 7, 8, 9, eos, pad, pad, pad]]
        )
        generated_src = denoising_batch_result["net_input"]["src_tokens"]
        tgt_tokens = denoising_batch_result["target"]

        self.assertTensorEqual(expected_src, generated_src)
        self.assertTensorEqual(expected_tgt, tgt_tokens)

    def assertTensorEqual(self, t1, t2):
        self.assertEqual(t1.size(), t2.size(), "size mismatch")
        self.assertEqual(t1.ne(t2).long().sum(), 0)


if __name__ == "__main__":
    unittest.main()