"""Recipe for training a sequence-to-sequence ASR system with CommonVoice.

The system employs a wav2vec2 encoder and a CTC decoder.
Decoding is performed with greedy decoding (will be extended to beam search).

To run this recipe, do the following:
> python train_with_wav2vec2.py hparams/train_with_wav2vec2.yaml

With the default hyperparameters, the system employs a pretrained wav2vec2
encoder. The wav2vec2 model is pretrained following the model given in the
hparams file and may be language-dependent.

The neural network is trained with CTC on sub-word units estimated with
Byte-Pair Encoding (BPE).

The experiment file is flexible enough to support a large variety of
different systems. By properly changing the parameter files, you can try
different encoders, decoders, tokens (e.g., characters instead of BPE),
training languages (all CommonVoice languages), and many other possible
variations.

Authors
 * Titouan Parcollet 2021
"""

import logging
import os
import sys
from pathlib import Path

import gradio as gr
import torch
import torchaudio
from hyperpyyaml import load_hyperpyyaml

import speechbrain as sb
from speechbrain.tokenizers.SentencePiece import SentencePiece
from speechbrain.utils.data_utils import undo_padding
from speechbrain.utils.distributed import run_on_main

logger = logging.getLogger(__name__)

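
# Brain subclass wiring the wav2vec2 encoder, the downstream encoder head,
# and the CTC output layer. The training/evaluation methods come from the
# original recipe; only `treat_wav` is exercised by the Gradio demo below.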
class ASR(sb.core.Brain):
    def compute_forward(self, batch, stage):
        """Forward computations from the waveform batches to the output probabilities."""
        batch = batch.to(self.device)
        wavs, wav_lens = batch.sig
        if stage == sb.Stage.TRAIN:
            if hasattr(self.hparams, "augmentation"):
                wavs = self.hparams.augmentation(wavs, wav_lens)

        # wav2vec2 features -> downstream encoder -> CTC logits -> log-probs
        feats = self.modules.wav2vec2(wavs, wav_lens)
        x = self.modules.enc(feats)
        logits = self.modules.ctc_lin(x)
        p_ctc = self.hparams.log_softmax(logits)

        return p_ctc, wav_lens
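
    # Single-utterance inference path used by the Gradio demo. The
    # relative-length tensor [1] marks the whole signal as valid speech;
    # `decoder` is the module-level pyctcdecode decoder built further down.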
    def treat_wav(self, sig):
        feats = self.modules.wav2vec2(sig.to("cpu"), torch.tensor([1]).to("cpu"))
        feats = self.modules.enc(feats)
        logits = self.modules.ctc_lin(feats)
        p_ctc = self.hparams.log_softmax(logits)
        predicted_words = []
        for logs in p_ctc:
            text = decoder.decode(logs.detach().cpu().numpy())
            predicted_words.append(text.split(" "))
        return " ".join(predicted_words[0])

    def compute_objectives(self, predictions, batch, stage):
        """Computes the loss (CTC) given predictions and targets."""
        p_ctc, wav_lens = predictions

        ids = batch.id
        tokens, tokens_lens = batch.tokens

        loss = self.hparams.ctc_cost(p_ctc, tokens, wav_lens, tokens_lens)

        if stage != sb.Stage.TRAIN:
            predicted_tokens = sb.decoders.ctc_greedy_decode(
                p_ctc, wav_lens, blank_id=self.hparams.blank_index
            )

            if self.hparams.use_language_modelling:
                # Beam search with the n-gram LM over the CTC log-probabilities
                predicted_words = []
                for logs in p_ctc:
                    text = decoder.decode(logs.detach().cpu().numpy())
                    predicted_words.append(text.split(" "))
            else:
                # Greedy decoding: map token ids back to characters
                predicted_words = [
                    "".join(self.tokenizer.decode_ndim(utt_seq)).split(" ")
                    for utt_seq in predicted_tokens
                ]

            target_words = [wrd.split(" ") for wrd in batch.wrd]

            self.wer_metric.append(ids, predicted_words, target_words)
            self.cer_metric.append(ids, predicted_words, target_words)

        return loss
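
    # One optimization step: forward pass + CTC loss, gradient accumulation
    # via `grad_accumulation_factor`, optional mixed precision through
    # `self.scaler`, and two optimizers (wav2vec2 and downstream model). The
    # wav2vec2 optimizer only steps once `optimizer_step` passes `warmup_steps`.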
    def fit_batch(self, batch):
        """Train the parameters given a single batch in input."""
        should_step = self.step % self.grad_accumulation_factor == 0

        if self.auto_mix_prec:
            with torch.cuda.amp.autocast():
                with self.no_sync():
                    outputs = self.compute_forward(batch, sb.Stage.TRAIN)
                loss = self.compute_objectives(outputs, batch, sb.Stage.TRAIN)
            with self.no_sync(not should_step):
                self.scaler.scale(
                    loss / self.grad_accumulation_factor
                ).backward()
            if should_step:
                if not self.hparams.wav2vec2.freeze:
                    self.scaler.unscale_(self.wav2vec_optimizer)
                self.scaler.unscale_(self.model_optimizer)
                if self.check_gradients(loss):
                    if not self.hparams.wav2vec2.freeze:
                        if self.optimizer_step >= self.hparams.warmup_steps:
                            self.scaler.step(self.wav2vec_optimizer)
                    self.scaler.step(self.model_optimizer)
                self.scaler.update()
                self.zero_grad()
                self.optimizer_step += 1
        else:
            with self.no_sync():
                outputs = self.compute_forward(batch, sb.Stage.TRAIN)
            loss = self.compute_objectives(outputs, batch, sb.Stage.TRAIN)
            with self.no_sync(not should_step):
                (loss / self.grad_accumulation_factor).backward()
            if should_step:
                if self.check_gradients(loss):
                    if not self.hparams.wav2vec2.freeze:
                        if self.optimizer_step >= self.hparams.warmup_steps:
                            self.wav2vec_optimizer.step()
                    self.model_optimizer.step()
                self.zero_grad()
                self.optimizer_step += 1

        self.on_fit_batch_end(batch, outputs, loss, should_step)
        return loss.detach().cpu()

    def evaluate_batch(self, batch, stage):
        """Computations needed for validation/test batches."""
        predictions = self.compute_forward(batch, stage=stage)
        with torch.no_grad():
            loss = self.compute_objectives(predictions, batch, stage=stage)
        return loss.detach()
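
    # Stage callbacks: fresh CER/WER trackers at the start of validation and
    # test stages; LR annealing, logging, and checkpointing at stage end.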
    def on_stage_start(self, stage, epoch):
        """Gets called at the beginning of each epoch."""
        if stage != sb.Stage.TRAIN:
            self.cer_metric = self.hparams.cer_computer()
            self.wer_metric = self.hparams.error_rate_computer()

    def on_stage_end(self, stage, stage_loss, epoch):
        """Gets called at the end of an epoch."""
        stage_stats = {"loss": stage_loss}
        if stage == sb.Stage.TRAIN:
            self.train_stats = stage_stats
        else:
            stage_stats["CER"] = self.cer_metric.summarize("error_rate")
            stage_stats["WER"] = self.wer_metric.summarize("error_rate")

        if stage == sb.Stage.VALID:
            old_lr_model, new_lr_model = self.hparams.lr_annealing_model(
                stage_stats["loss"]
            )
            old_lr_wav2vec, new_lr_wav2vec = self.hparams.lr_annealing_wav2vec(
                stage_stats["loss"]
            )
            sb.nnet.schedulers.update_learning_rate(
                self.model_optimizer, new_lr_model
            )
            if not self.hparams.wav2vec2.freeze:
                sb.nnet.schedulers.update_learning_rate(
                    self.wav2vec_optimizer, new_lr_wav2vec
                )
            self.hparams.train_logger.log_stats(
                stats_meta={
                    "epoch": epoch,
                    "lr_model": old_lr_model,
                    "lr_wav2vec": old_lr_wav2vec,
                },
                train_stats=self.train_stats,
                valid_stats=stage_stats,
            )
            # Keep only the checkpoint with the best (lowest) WER.
            self.checkpointer.save_and_keep_only(
                meta={"WER": stage_stats["WER"]}, min_keys=["WER"],
            )
        elif stage == sb.Stage.TEST:
            self.hparams.train_logger.log_stats(
                stats_meta={"Epoch loaded": self.hparams.epoch_counter.current},
                test_stats=stage_stats,
            )
            with open(self.hparams.wer_file, "w") as w:
                self.wer_metric.write_stats(w)
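
    # Two optimizers are used: one for the wav2vec2 encoder (skipped when it
    # is frozen) and one for the downstream model. Both are registered with
    # the checkpointer so their state is recovered together with the weights.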
    def init_optimizers(self):
        """Initializes the wav2vec2 optimizer and model optimizer."""
        if not self.hparams.wav2vec2.freeze:
            self.wav2vec_optimizer = self.hparams.wav2vec_opt_class(
                self.modules.wav2vec2.parameters()
            )
            if self.checkpointer is not None:
                self.checkpointer.add_recoverable(
                    "wav2vec_opt", self.wav2vec_optimizer
                )

        self.model_optimizer = self.hparams.model_opt_class(
            self.hparams.model.parameters()
        )

        if self.checkpointer is not None:
            self.checkpointer.add_recoverable("modelopt", self.model_optimizer)

    def zero_grad(self, set_to_none=False):
        if not self.hparams.wav2vec2.freeze:
            self.wav2vec_optimizer.zero_grad(set_to_none)
        self.model_optimizer.zero_grad(set_to_none)


def dataio_prepare(hparams):
    """This function prepares the datasets to be used in the brain class.
    It also defines the data processing pipeline through user-defined functions."""
    data_folder = hparams["data_folder"]

    train_data = sb.dataio.dataset.DynamicItemDataset.from_csv(
        csv_path=hparams["train_csv"], replacements={"data_root": data_folder},
    )

    if hparams["sorting"] == "ascending":
        # Sorting by duration minimizes zero-padding inside a batch; shuffling
        # is disabled so the sorted order is preserved.
        train_data = train_data.filtered_sorted(
            sort_key="duration",
            key_max_value={"duration": hparams["avoid_if_longer_than"]},
        )
        hparams["dataloader_options"]["shuffle"] = False

    elif hparams["sorting"] == "descending":
        train_data = train_data.filtered_sorted(
            sort_key="duration",
            reverse=True,
            key_max_value={"duration": hparams["avoid_if_longer_than"]},
        )
        hparams["dataloader_options"]["shuffle"] = False

    elif hparams["sorting"] == "random":
        pass

    else:
        raise NotImplementedError(
            "sorting must be random, ascending or descending"
        )

    valid_data = sb.dataio.dataset.DynamicItemDataset.from_csv(
        csv_path=hparams["valid_csv"], replacements={"data_root": data_folder},
    )
    valid_data = valid_data.filtered_sorted(sort_key="duration")

    test_datasets = {}
    for csv_file in hparams["test_csv"]:
        name = Path(csv_file).stem
        test_datasets[name] = sb.dataio.dataset.DynamicItemDataset.from_csv(
            csv_path=csv_file, replacements={"data_root": data_folder}
        )
        test_datasets[name] = test_datasets[name].filtered_sorted(
            sort_key="duration"
        )

    datasets = [train_data, valid_data] + list(test_datasets.values())
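
    # Audio pipeline: read each waveform and resample it on the fly to the
    # sample rate expected by the model.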
    @sb.utils.data_pipeline.takes("wav")
    @sb.utils.data_pipeline.provides("sig")
    def audio_pipeline(wav):
        info = torchaudio.info(wav)
        sig = sb.dataio.dataio.read_audio(wav)
        resampled = torchaudio.transforms.Resample(
            info.sample_rate, hparams["sample_rate"],
        )(sig)
        return resampled

    sb.dataio.dataset.add_dynamic_item(datasets, audio_pipeline)

    label_encoder = sb.dataio.encoder.CTCTextEncoder()
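
    # Text pipeline: successive yields must follow the order declared in
    # `provides` (word string, character list, encoded ids, id tensor).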
    @sb.utils.data_pipeline.takes("wrd")
    @sb.utils.data_pipeline.provides(
        "wrd", "char_list", "tokens_list", "tokens"
    )
    def text_pipeline(wrd):
        yield wrd
        char_list = list(wrd)
        yield char_list
        tokens_list = label_encoder.encode_sequence(char_list)
        yield tokens_list
        tokens = torch.LongTensor(tokens_list)
        yield tokens

    sb.dataio.dataset.add_dynamic_item(datasets, text_pipeline)

    lab_enc_file = os.path.join(hparams["save_folder"], "label_encoder.txt")
    special_labels = {
        "blank_label": hparams["blank_index"],
        "unk_label": hparams["unk_index"],
    }
    label_encoder.load_or_create(
        path=lab_enc_file,
        from_didatasets=[train_data],
        output_key="char_list",
        special_labels=special_labels,
        sequence_input=True,
    )

    sb.dataio.dataset.set_output_keys(
        datasets, ["id", "sig", "wrd", "char_list", "tokens"],
    )
    return train_data, valid_data, test_datasets, label_encoder
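

# Inference-only entry point: load the hyperparameters, rebuild the label
# encoder and the n-gram beam-search decoder, then serve the model with Gradio.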
# The Space always runs with the bundled hyperparameter file.
hparams_file, run_opts, overrides = sb.parse_arguments(["train_semi.yaml"])
with open(hparams_file) as fin:
    hparams = load_hyperpyyaml(fin, overrides)

sb.utils.distributed.ddp_init_group(run_opts)

sb.create_experiment_directory(
    experiment_directory=hparams["output_folder"],
    hyperparams_to_save=hparams_file,
    overrides=overrides,
)

label_encoder = sb.dataio.encoder.CTCTextEncoder()

lab_enc_file = os.path.join(hparams["save_folder"], "label_encoder.txt")
special_labels = {
    "blank_label": hparams["blank_index"],
    "unk_label": hparams["unk_index"],
}
# The label-encoder file ships with the checkpoint, so `load_or_create` only
# loads it here; no dataset is needed (hence the empty dataset list).
label_encoder.load_or_create(
    path=lab_enc_file,
    from_didatasets=[[]],
    output_key="char_list",
    special_labels=special_labels,
    sequence_input=True,
)
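
# Build a pyctcdecode beam-search decoder over the CTC alphabet, rescored with
# a KenLM n-gram model. pyctcdecode expects the CTC blank to be the empty string.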
from pyctcdecode import build_ctcdecoder

ind2lab = label_encoder.ind2lab
labels = [ind2lab[x] for x in range(len(ind2lab))]
# Index 0 (the CTC blank) becomes the empty string; the last entry is
# replaced with a placeholder symbol to match the decoder's alphabet.
labels = [""] + labels[1:-1] + ["1"]

decoder = build_ctcdecoder(
    labels,
    kenlm_model_path=hparams["ngram_lm_path"],
    alpha=0.5,  # LM weight
    beta=1.0,  # word-insertion score
)

# Run on CPU: this Space has no GPU.
run_opts["device"] = "cpu"
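
# Build the Brain once at startup, attach the tokenizer, restore the best
# checkpoint on CPU, and switch all modules to eval mode.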
asr_brain = ASR(
    modules=hparams["modules"],
    hparams=hparams,
    run_opts=run_opts,
    checkpointer=hparams["checkpointer"],
)
asr_brain.tokenizer = label_encoder
asr_brain.checkpointer.recover_if_possible(device="cpu")
asr_brain.modules.eval()

description = """This is a SpeechBrain-based Automatic Speech Recognition (ASR) model for Tunisian Arabic. It outputs Tunisian Arabic transcriptions written in Arabic characters.

This model outputs transcriptions in the Arabic alphabet only and performs poorly on sentences containing foreign words. If you need code-switching in your transcripts, i.e. foreign words rendered in the Latin alphabet, you should use the code-switched model, available in another Space from the same author: https://huggingface.co/SalahZa/Code_Switched_Tunisian_Speech_Recognition

Inference runs on CPU to keep this Space free, which leads to long running times on long recordings. If you want to transcribe long recordings for a project or for research, you should use the model directly from its page; instructions for inference on a test set are provided there: https://huggingface.co/SalahZa/Tunisian_Automatic_Speech_Recognition. If you need help, feel free to drop an email here: [email protected]

Authors:
* [Salah Zaiem](https://fr.linkedin.com/in/salah-zaiem)
* [Ahmed Amine Ben Aballah](https://www.linkedin.com/in/aabenz/)
* [Ata Kaboudi](https://www.linkedin.com/in/ata-kaboudi-63365b1a8)
* [Amir Kanoun](https://tn.linkedin.com/in/ahmed-amir-kanoun)

More in-depth details and insights are available in a released preprint. Please find the paper [here](https://arxiv.org/abs/2309.11327).
If you use or refer to this model, please cite:

```
@misc{abdallah2023leveraging,
      title={Leveraging Data Collection and Unsupervised Learning for Code-switched Tunisian Arabic Automatic Speech Recognition},
      author={Ahmed Amine Ben Abdallah and Ata Kabboudi and Amir Kanoun and Salah Zaiem},
      year={2023},
      eprint={2309.11327},
      archivePrefix={arXiv},
      primaryClass={eess.AS}
}
```
"""
title = "Tunisian Speech Recognition"


def treat_wav_file(file_mic, file_upload, asr=asr_brain, device="cpu"):
    warn_output = ""
    if (file_mic is not None) and (file_upload is not None):
        warn_output = (
            "WARNING: You've uploaded an audio file and used the microphone. "
            "The recorded file from the microphone will be used and the "
            "uploaded audio will be discarded.\n"
        )
        wav = file_mic
    elif (file_mic is None) and (file_upload is None):
        return "ERROR: You have to either use the microphone or upload an audio file"
    elif file_mic is not None:
        wav = file_mic
    else:
        wav = file_upload

    # Read, downmix to mono, and resample to the 16 kHz rate the model expects.
    info = torchaudio.info(wav)
    sr = info.sample_rate
    sig = sb.dataio.dataio.read_audio(wav)
    if len(sig.shape) > 1:
        sig = torch.mean(sig, dim=1)
    sig = torch.unsqueeze(sig, 0)
    tensor_wav = sig.to(device)
    resampled = torchaudio.functional.resample(tensor_wav, sr, 16000)
    sentence = asr.treat_wav(resampled)
    return warn_output + sentence
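

# NOTE: the `source=` and `optional=` arguments assume the Gradio 3.x Audio
# API; newer Gradio releases renamed or removed them.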
gr.Interface(
    title=title,
    description=description,
    fn=treat_wav_file,
    inputs=[
        gr.Audio(source="microphone", type="filepath", label="record", optional=True),
        gr.Audio(source="upload", type="filepath", label="filein", optional=True),
    ],
    outputs="text",
).launch()