JustinLin610
update
8437114
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import csv
from pathlib import Path
import zipfile
from functools import reduce
from multiprocessing import cpu_count
from typing import Any, Dict, List, Optional, Union
import io
import numpy as np
import pandas as pd
import sentencepiece as sp
from fairseq.data.audio.audio_utils import (
convert_waveform, _get_kaldi_fbank, _get_torchaudio_fbank, is_npy_data,
is_sf_audio_data
)
import torch
import soundfile as sf
from tqdm import tqdm
# Special symbols and the ids reserved for them. gen_vocab() passes these ids
# to the SentencePiece trainer and asserts that the trained model maps each
# id back to the matching symbol.
UNK_TOKEN = "<unk>"
UNK_TOKEN_ID = 3
BOS_TOKEN = "<s>"
BOS_TOKEN_ID = 0
EOS_TOKEN = "</s>"
EOS_TOKEN_ID = 2
PAD_TOKEN = "<pad>"
PAD_TOKEN_ID = 1
def gen_vocab(
    input_path: Path, output_path_prefix: Path, model_type="bpe",
    vocab_size=1000, special_symbols: Optional[List[str]] = None
):
    """Train a SentencePiece model and export a fairseq dictionary from it.

    Two artifacts are produced next to ``output_path_prefix``: the
    SentencePiece model (loaded back from ``<prefix>.model``) and a text
    dictionary ``<prefix>.txt`` with one ``"<piece> 1"`` line per piece, in
    piece-id order, excluding the unk/bos/eos/pad symbols.

    Args:
        input_path: text file used as SentencePiece training input.
        output_path_prefix: prefix for the model and dictionary files.
        model_type: value forwarded as ``--model_type``.
        vocab_size: value forwarded as ``--vocab_size``.
        special_symbols: optional symbols forwarded, comma-joined, as
            ``--user_defined_symbols``.

    Raises:
        AssertionError: if the trained model does not map the reserved ids
            to the expected unk/pad/bos/eos symbols.
    """
    # Train SentencePiece Model
    # NOTE(review): the flags are joined with spaces into one string, so a
    # path containing whitespace would presumably be mis-parsed -- confirm.
    arguments = [
        f"--input={input_path.as_posix()}",
        f"--model_prefix={output_path_prefix.as_posix()}",
        f"--model_type={model_type}",
        f"--vocab_size={vocab_size}",
        "--character_coverage=1.0",
        f"--num_threads={cpu_count()}",
        f"--unk_id={UNK_TOKEN_ID}",
        f"--bos_id={BOS_TOKEN_ID}",
        f"--eos_id={EOS_TOKEN_ID}",
        f"--pad_id={PAD_TOKEN_ID}",
    ]
    if special_symbols is not None:
        _special_symbols = ",".join(special_symbols)
        arguments.append(f"--user_defined_symbols={_special_symbols}")
    sp.SentencePieceTrainer.Train(" ".join(arguments))

    # Export fairseq dictionary
    spm = sp.SentencePieceProcessor()
    spm.Load(output_path_prefix.as_posix() + ".model")
    vocab = {i: spm.IdToPiece(i) for i in range(spm.GetPieceSize())}
    assert (
        vocab.get(UNK_TOKEN_ID) == UNK_TOKEN
        and vocab.get(PAD_TOKEN_ID) == PAD_TOKEN
        and vocab.get(BOS_TOKEN_ID) == BOS_TOKEN
        and vocab.get(EOS_TOKEN_ID) == EOS_TOKEN
    )
    reserved = {UNK_TOKEN, BOS_TOKEN, EOS_TOKEN, PAD_TOKEN}
    vocab = {i: s for i, s in vocab.items() if s not in reserved}
    # Pieces may contain non-ASCII characters, so the dictionary is written
    # as UTF-8 explicitly instead of relying on the locale's default encoding.
    dict_path = output_path_prefix.as_posix() + ".txt"
    with open(dict_path, "w", encoding="utf-8") as f_out:
        for _, s in sorted(vocab.items(), key=lambda x: x[0]):
            f_out.write(f"{s} 1\n")
def extract_fbank_features(
    waveform: torch.FloatTensor,
    sample_rate: int,
    output_path: Optional[Path] = None,
    n_mel_bins: int = 80,
    overwrite: bool = False,
):
    """Compute filterbank features for a waveform and optionally save them.

    Args:
        waveform: input audio, handed to ``convert_waveform`` with
            ``to_mono=True``.
        sample_rate: sample rate passed to the conversion and the extractors.
        output_path: if given, the features are saved there with ``np.save``.
        n_mel_bins: number of mel bins passed to the extractors.
        overwrite: when False and ``output_path`` is already a file, nothing
            is computed.

    Returns:
        The extracted features, or ``None`` when extraction was skipped
        because ``output_path`` already exists.

    Raises:
        ImportError: if neither extractor produced features.
    """
    already_saved = output_path is not None and output_path.is_file()
    if already_saved and not overwrite:
        return

    # NOTE(review): this assumes convert_waveform returns the waveform tensor
    # itself rather than a (waveform, sample_rate) tuple -- confirm against
    # the installed fairseq version.
    mono = convert_waveform(waveform, sample_rate, to_mono=True)
    # Kaldi compliance: 16-bit signed integers
    scaled = (mono * (2 ** 15)).numpy()

    # Try the Kaldi-based extractor first, then the torchaudio one; each
    # returns None when it cannot produce features.
    for extractor in (_get_kaldi_fbank, _get_torchaudio_fbank):
        features = extractor(scaled, sample_rate, n_mel_bins)
        if features is not None:
            break
    else:
        raise ImportError(
            "Please install pyKaldi or torchaudio to enable fbank feature extraction"
        )

    if output_path is not None:
        np.save(output_path.as_posix(), features)
    return features
def create_zip(data_root: Path, zip_path: Path):
    """Pack every ``*.npy`` file directly under ``data_root`` into a zip.

    Members are stored uncompressed (``ZIP_STORED``) under their bare file
    names, i.e. without any directory component.
    """
    npy_files = list(data_root.glob("*.npy"))
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_STORED) as archive:
        for npy_file in tqdm(npy_files):
            archive.write(npy_file, arcname=npy_file.name)
def get_zip_manifest(
    zip_path: Path, zip_root: Optional[Path] = None, is_audio=False
):
    """Index the members of a zip archive by byte offset and length.

    Args:
        zip_path: archive path; it is joined onto ``zip_root`` for reading,
            but recorded as given in the manifest entries.
        zip_root: optional directory that ``zip_path`` is relative to.
        is_audio: when True members are validated and measured as audio via
            soundfile; otherwise as ``.npy`` arrays.

    Returns:
        ``(paths, lengths)``: two dicts keyed by member stem. ``paths`` holds
        ``"<zip_path>:<offset>:<size>"`` strings pointing at the raw member
        bytes; ``lengths`` holds the frame count reported by ``sf.info`` for
        audio, or the first dimension of the array otherwise.

    Raises:
        AssertionError: if a member is too short or is not recognised as
            audio / npy data.
    """
    _zip_path = Path.joinpath(zip_root or Path(""), zip_path)
    with zipfile.ZipFile(_zip_path, mode="r") as f:
        info = f.infolist()
    paths, lengths = {}, {}
    # Open the archive once for raw reads instead of once per member.
    with open(_zip_path, "rb") as raw:
        for i in tqdm(info):
            utt_id = Path(i.filename).stem
            # The payload starts after the 30-byte fixed local file header
            # plus the file name and extra field. Their byte lengths are
            # read from the header (little-endian at bytes 26-27 and 28-29)
            # rather than assumed, so non-ASCII names and non-empty extra
            # fields yield the right offset.
            raw.seek(i.header_offset)
            local_header = raw.read(30)
            assert len(local_header) == 30, i
            name_len = int.from_bytes(local_header[26:28], "little")
            extra_len = int.from_bytes(local_header[28:30], "little")
            offset = i.header_offset + 30 + name_len + extra_len
            # file_size is the uncompressed size, so reading it raw is only
            # meaningful for stored (uncompressed) members.
            file_size = i.file_size
            paths[utt_id] = f"{zip_path.as_posix()}:{offset}:{file_size}"
            raw.seek(offset)
            byte_data = raw.read(file_size)
            assert len(byte_data) > 1
            if is_audio:
                assert is_sf_audio_data(byte_data), i
            else:
                assert is_npy_data(byte_data), i
            byte_data_fp = io.BytesIO(byte_data)
            if is_audio:
                lengths[utt_id] = sf.info(byte_data_fp).frames
            else:
                lengths[utt_id] = np.load(byte_data_fp).shape[0]
    return paths, lengths
def gen_config_yaml(
    manifest_root: Path,
    spm_filename: Optional[str] = None,
    vocab_name: Optional[str] = None,
    yaml_filename: str = "config.yaml",
    specaugment_policy: Optional[str] = "lb",
    prepend_tgt_lang_tag: bool = False,
    sampling_alpha: Optional[float] = None,
    input_channels: Optional[int] = 1,
    input_feat_per_channel: Optional[int] = 80,
    audio_root: str = "",
    cmvn_type: str = "utterance",
    gcmvn_path: Optional[Path] = None,
    extra=None
):
    """Write the data config YAML into ``manifest_root``.

    Args:
        manifest_root: directory receiving ``yaml_filename``; made absolute.
        spm_filename: SentencePiece model file name, relative to
            ``manifest_root``. Also used to derive the vocabulary file name
            (``.model`` -> ``.txt``) when ``vocab_name`` is not given.
        vocab_name: vocabulary file name; takes precedence when given.
        yaml_filename: name of the YAML file to write.
        specaugment_policy: one of ``"lb"``, ``"ld"``, ``"sm"``, ``"ss"``,
            or ``None`` to leave SpecAugment out entirely.
        prepend_tgt_lang_tag: recorded in the config only when True.
        sampling_alpha: recorded in the config when not ``None``.
        input_channels: recorded in the config when not ``None``.
        input_feat_per_channel: recorded in the config when not ``None``.
        audio_root: recorded in the config when non-empty.
        cmvn_type: ``"global"`` or ``"utterance"``.
        gcmvn_path: statistics file, required when ``cmvn_type`` is
            ``"global"``.
        extra: optional mapping merged into the config last.

    Raises:
        AssertionError: if both ``spm_filename`` and ``vocab_name`` are None.
        NotImplementedError: for an unsupported ``cmvn_type``.
        ValueError: if ``cmvn_type`` is ``"global"`` without ``gcmvn_path``.
    """
    manifest_root = manifest_root.absolute()
    writer = S2TDataConfigWriter(manifest_root / yaml_filename)

    assert spm_filename is not None or vocab_name is not None
    if vocab_name is None:
        vocab_name = spm_filename.replace(".model", ".txt")
    writer.set_vocab_filename(vocab_name)

    if input_channels is not None:
        writer.set_input_channels(input_channels)
    if input_feat_per_channel is not None:
        writer.set_input_feat_per_channel(input_feat_per_channel)

    # NOTE(review): an unrecognised policy string writes no SpecAugment
    # settings here, yet "specaugment" is still added to the "_train"
    # transforms further down -- confirm whether that is intended.
    policy_setters = {
        "lb": writer.set_specaugment_lb_policy,
        "ld": writer.set_specaugment_ld_policy,
        "sm": writer.set_specaugment_sm_policy,
        "ss": writer.set_specaugment_ss_policy,
    }
    apply_policy = policy_setters.get(specaugment_policy, None)
    if apply_policy is not None:
        apply_policy()

    if spm_filename is not None:
        tokenizer_config = {
            "bpe": "sentencepiece",
            "sentencepiece_model": (manifest_root / spm_filename).as_posix(),
        }
        writer.set_bpe_tokenizer(tokenizer_config)

    if prepend_tgt_lang_tag:
        writer.set_prepend_tgt_lang_tag(True)
    if sampling_alpha is not None:
        writer.set_sampling_alpha(sampling_alpha)

    if cmvn_type not in ["global", "utterance"]:
        raise NotImplementedError

    # Training split gets CMVN plus SpecAugment; every split gets CMVN.
    cmvn_transform = f"{cmvn_type}_cmvn"
    if specaugment_policy is not None:
        writer.set_feature_transforms("_train", [cmvn_transform, "specaugment"])
    writer.set_feature_transforms("*", [cmvn_transform])

    if cmvn_type == "global":
        if gcmvn_path is None:
            raise ValueError("Please provide path of global cmvn file.")
        writer.set_global_cmvn(gcmvn_path.as_posix())

    if len(audio_root) > 0:
        writer.set_audio_root(audio_root)
    if extra is not None:
        writer.set_extra(extra)

    writer.flush()
def load_df_from_tsv(path: Union[str, Path]) -> pd.DataFrame:
    """Read a UTF-8, tab-separated manifest into a DataFrame.

    The first row is the header. Quoting is disabled, backslash is the
    escape character, and NA detection is turned off so empty fields are
    not converted to NaN.
    """
    if not isinstance(path, str):
        path = path.as_posix()
    read_options = dict(
        sep="\t",
        header=0,
        encoding="utf-8",
        escapechar="\\",
        quoting=csv.QUOTE_NONE,
        na_filter=False,
    )
    return pd.read_csv(path, **read_options)
def save_df_to_tsv(dataframe, path: Union[str, Path]):
    """Write ``dataframe`` as a UTF-8, tab-separated file with a header row.

    The index is not written. Quoting is disabled and backslash is used as
    the escape character, mirroring the options of ``load_df_from_tsv``.
    """
    if not isinstance(path, str):
        path = path.as_posix()
    write_options = dict(
        sep="\t",
        header=True,
        index=False,
        encoding="utf-8",
        escapechar="\\",
        quoting=csv.QUOTE_NONE,
    )
    dataframe.to_csv(path, **write_options)
def load_tsv_to_dicts(path: Union[str, Path]) -> List[dict]:
    """Read a tab-separated file into a list of row dicts.

    The first line supplies the keys. Quoting is disabled, so quote
    characters are kept as literal text.

    Args:
        path: location of the TSV file.

    Returns:
        One ``dict`` per data row, mapping column name to string value.
    """
    # Decode as UTF-8 explicitly, matching the pandas-based TSV helpers in
    # this module, instead of depending on the locale's default encoding.
    # newline="" leaves line-ending handling to the csv module, as its
    # documentation requires.
    with open(path, "r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(
            f,
            delimiter="\t",
            quotechar=None,
            doublequote=False,
            lineterminator="\n",
            quoting=csv.QUOTE_NONE,
        )
        rows = [dict(e) for e in reader]
    return rows
def filter_manifest_df(
    df, is_train_split=False, extra_filters=None, min_n_frames=5, max_n_frames=3000
):
    """Drop invalid rows from a manifest DataFrame and report the counts.

    A row is removed when its ``audio`` is empty, its ``n_frames`` is below
    ``min_n_frames``, its ``tgt_text`` is empty, or (training split only)
    its ``n_frames`` exceeds ``max_n_frames``. ``extra_filters`` may add or
    override named boolean masks. A one-line summary is printed.

    Returns:
        The rows of ``df`` not matched by any filter.
    """
    filters = {}
    filters["no speech"] = df["audio"] == ""
    filters[f"short speech (<{min_n_frames} frames)"] = df["n_frames"] < min_n_frames
    filters["empty sentence"] = df["tgt_text"] == ""
    if is_train_split:
        filters[f"long speech (>{max_n_frames} frames)"] = df["n_frames"] > max_n_frames
    if extra_filters is not None:
        filters.update(extra_filters)

    # OR all masks together, left to right.
    masks = iter(filters.values())
    invalid = next(masks)
    for mask in masks:
        invalid = invalid | mask
    valid = ~invalid

    per_filter = ", ".join(f"{n}: {f.sum()}" for n, f in filters.items())
    summary = f", total {invalid.sum()} filtered, {valid.sum()} remained."
    print("| " + per_filter + summary)
    return df[valid]
def cal_gcmvn_stats(features_list):
    """Compute per-dimension mean and standard deviation over all features.

    Args:
        features_list: sequence of arrays that ``np.concatenate`` can join
            along the first axis.

    Returns:
        ``{"mean": ..., "std": ...}``, both reduced over axis 0 and cast to
        float32. The variance is floored at ``1e-8`` before the square root.
    """
    features = np.concatenate(features_list)
    mean = features.mean(axis=0)
    # Take the variance from the centered data. The E[x^2] - mean^2 form
    # subtracts two nearly equal large numbers and loses most of its
    # precision in float32 when the mean is large relative to the spread.
    centered = np.subtract(features, mean)
    var = np.square(centered).mean(axis=0)
    std = np.sqrt(np.maximum(var, 1e-8))
    return {"mean": mean.astype("float32"), "std": std.astype("float32")}
class S2TDataConfigWriter(object):
    """Collect data-config entries in a dict and dump them to a YAML file.

    Each ``set_*`` method stores one entry in ``self.config``; nothing is
    written to disk until ``flush()`` is called.
    """

    DEFAULT_VOCAB_FILENAME = "dict.txt"
    DEFAULT_INPUT_FEAT_PER_CHANNEL = 80
    DEFAULT_INPUT_CHANNELS = 1

    def __init__(self, yaml_path: Path):
        """Create an empty config that ``flush()`` will write to ``yaml_path``.

        Raises:
            ImportError: if PyYAML is not installed.
        """
        try:
            import yaml
        except ImportError as err:
            # Fail here with an actionable message instead of printing and
            # then hitting an unbound ``yaml`` name on the next line.
            raise ImportError(
                "Please install PyYAML for S2T data config YAML files"
            ) from err
        self.yaml = yaml
        self.yaml_path = yaml_path
        self.config = {}

    def flush(self):
        """Dump the accumulated config to ``self.yaml_path``, overwriting it."""
        with open(self.yaml_path, "w") as f:
            self.yaml.dump(self.config, f)

    def set_audio_root(self, audio_root=""):
        """Store ``audio_root`` under the ``audio_root`` key."""
        self.config["audio_root"] = audio_root

    def set_vocab_filename(self, vocab_filename: str = DEFAULT_VOCAB_FILENAME):
        """Store the vocabulary file name under ``vocab_filename``."""
        self.config["vocab_filename"] = vocab_filename

    def set_specaugment(
        self,
        time_wrap_w: int,
        freq_mask_n: int,
        freq_mask_f: int,
        time_mask_n: int,
        time_mask_t: int,
        time_mask_p: float,
    ):
        """Store the six SpecAugment parameters under ``specaugment``."""
        # NOTE(review): the first key is spelled "time_wrap_W" (not "warp").
        # It is kept as-is because whatever reads this YAML lives outside
        # this file -- confirm which spelling that reader expects.
        self.config["specaugment"] = {
            "time_wrap_W": time_wrap_w,
            "freq_mask_N": freq_mask_n,
            "freq_mask_F": freq_mask_f,
            "time_mask_N": time_mask_n,
            "time_mask_T": time_mask_t,
            "time_mask_p": time_mask_p,
        }

    # The four presets below presumably correspond to the LB / LD / SM / SS
    # policies of the SpecAugment paper -- TODO confirm.

    def set_specaugment_lb_policy(self):
        """Apply the "lb" preset: one frequency mask and one time mask."""
        self.set_specaugment(
            time_wrap_w=0,
            freq_mask_n=1,
            freq_mask_f=27,
            time_mask_n=1,
            time_mask_t=100,
            time_mask_p=1.0,
        )

    def set_specaugment_ld_policy(self):
        """Apply the "ld" preset: two frequency masks and two time masks."""
        self.set_specaugment(
            time_wrap_w=0,
            freq_mask_n=2,
            freq_mask_f=27,
            time_mask_n=2,
            time_mask_t=100,
            time_mask_p=1.0,
        )

    def set_specaugment_sm_policy(self):
        """Apply the "sm" preset (frequency mask F=15, time mask T=70, p=0.2)."""
        self.set_specaugment(
            time_wrap_w=0,
            freq_mask_n=2,
            freq_mask_f=15,
            time_mask_n=2,
            time_mask_t=70,
            time_mask_p=0.2,
        )

    def set_specaugment_ss_policy(self):
        """Apply the "ss" preset (frequency mask F=27, time mask T=70, p=0.2)."""
        self.set_specaugment(
            time_wrap_w=0,
            freq_mask_n=2,
            freq_mask_f=27,
            time_mask_n=2,
            time_mask_t=70,
            time_mask_p=0.2,
        )

    def set_input_channels(self, input_channels: int = DEFAULT_INPUT_CHANNELS):
        """Store the channel count under ``input_channels``."""
        self.config["input_channels"] = input_channels

    def set_input_feat_per_channel(
        self, input_feat_per_channel: int = DEFAULT_INPUT_FEAT_PER_CHANNEL
    ):
        """Store the per-channel feature size under ``input_feat_per_channel``."""
        self.config["input_feat_per_channel"] = input_feat_per_channel

    def set_bpe_tokenizer(self, bpe_tokenizer: Dict[str, Any]):
        """Store the tokenizer settings dict under ``bpe_tokenizer``."""
        self.config["bpe_tokenizer"] = bpe_tokenizer

    def set_global_cmvn(self, stats_npz_path: str):
        """Store the statistics file path under ``global_cmvn``."""
        self.config["global_cmvn"] = {"stats_npz_path": stats_npz_path}

    def set_feature_transforms(self, split: str, transforms: List[str]):
        """Store the transform list for ``split`` under ``transforms``.

        The ``transforms`` mapping is created on first use; calling again
        with the same ``split`` replaces that split's list.
        """
        if "transforms" not in self.config:
            self.config["transforms"] = {}
        self.config["transforms"][split] = transforms

    def set_prepend_tgt_lang_tag(self, flag: bool = True):
        """Store ``flag`` under ``prepend_tgt_lang_tag``."""
        self.config["prepend_tgt_lang_tag"] = flag

    def set_sampling_alpha(self, sampling_alpha: float = 1.0):
        """Store ``sampling_alpha`` under the ``sampling_alpha`` key."""
        self.config["sampling_alpha"] = sampling_alpha

    def set_extra(self, data):
        """Merge ``data`` into the config, overriding keys that already exist."""
        self.config.update(data)