|
import json |
|
import random |
|
import re |
|
from dataclasses import dataclass, field |
|
from typing import Any, Dict, List, Optional, Union |
|
|
|
import numpy as np |
|
import pandas as pd |
|
import torch |
|
import torchaudio |
|
import transformers |
|
import datasets |
|
from datasets import ClassLabel, load_dataset, load_metric |
|
from transformers import (Trainer, TrainingArguments, Wav2Vec2CTCTokenizer, |
|
Wav2Vec2FeatureExtractor, Wav2Vec2ForCTC, |
|
Wav2Vec2Processor) |
|
|
|
import argparse |
|
parser = argparse.ArgumentParser() |
|
parser.add_argument('--model', type=str, default="facebook/wav2vec2-xls-r-300m") |
|
parser.add_argument('--unfreeze', action='store_true') |
|
parser.add_argument('--lr', type=float, default=3e-4) |
|
parser.add_argument('--warmup', type=float, default=500) |
|
args = parser.parse_args() |
|
|
|
|
|
print(f"args: {args}") |
|
|
|
common_voice_train = datasets.load_dataset("mozilla-foundation/common_voice_8_0", "zh-HK", split="train+validation", use_auth_token=True) |
|
common_voice_test = datasets.load_dataset("mozilla-foundation/common_voice_8_0", "zh-HK", split="test[:10%]", use_auth_token=True) |
|
|
|
|
|
|
|
|
|
unused_cols = ["accent", "age", "client_id", "down_votes", "gender", "locale", "segment", "up_votes"] |
|
common_voice_train = common_voice_train.remove_columns(unused_cols) |
|
common_voice_test = common_voice_test.remove_columns(unused_cols) |
|
|
|
chars_to_ignore_regex = '[\丶\,\?\.\!\-\;\:"\“\%\‘\”\�\.\⋯\!\-\:\–\。\》\,\)\,\?\;\~\~\…\︰\,\(\」\‧\《\﹔\、\—\/\,\「\﹖\·\']' |
|
|
|
import string |
|
def remove_special_characters(batch): |
|
sen = re.sub(chars_to_ignore_regex, '', batch["sentence"]).lower() + " " |
|
|
|
|
|
|
|
if "d" in sen: |
|
if len([c for c in sen if c in string.ascii_lowercase]) == 1: |
|
sen = sen.replace("d", "啲") |
|
batch["sentence"] = sen |
|
return batch |
|
|
|
common_voice_train = common_voice_train.map(remove_special_characters) |
|
common_voice_test = common_voice_test.map(remove_special_characters) |
|
|
|
def extract_all_chars(batch): |
|
all_text = " ".join(batch["sentence"]) |
|
vocab = list(set(all_text)) |
|
return {"vocab": [vocab], "all_text": [all_text]} |
|
|
|
vocab_train = common_voice_train.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=common_voice_train.column_names,) |
|
vocab_test = common_voice_test.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=common_voice_test.column_names,) |
|
vocab_list = list(set(vocab_train["vocab"][0]) | set(vocab_test["vocab"][0])) |
|
vocab_list = [char for char in vocab_list if not char.isascii()] |
|
vocab_list.append(" ") |
|
|
|
vocab_dict = {v: k for k, v in enumerate(vocab_list)} |
|
vocab_dict["|"] = vocab_dict[" "] |
|
del vocab_dict[" "] |
|
|
|
vocab_dict["[UNK]"] = len(vocab_dict) |
|
vocab_dict["[PAD]"] = len(vocab_dict) |
|
|
|
with open("vocab.json", "w") as vocab_file: |
|
json.dump(vocab_dict, vocab_file) |
|
|
|
tokenizer = Wav2Vec2CTCTokenizer("./vocab.json", unk_token="[UNK]", pad_token="[PAD]", word_delimiter_token="|") |
|
|
|
feature_extractor = Wav2Vec2FeatureExtractor(feature_size=1, sampling_rate=16000, padding_value=0.0, do_normalize=True, return_attention_mask=True,) |
|
|
|
processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer) |
|
processor.save_pretrained("./finetuned-wav2vec2-xls-r-300m-cantonese") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
common_voice_train = common_voice_train.cast_column('audio', datasets.features.Audio(sampling_rate=feature_extractor.sampling_rate)) |
|
common_voice_test = common_voice_test.cast_column('audio', datasets.features.Audio(sampling_rate=feature_extractor.sampling_rate)) |
|
|
|
|
|
def prepare_dataset(batch): |
|
batch["input_values"] = processor(batch["array"], sampling_rate=batch["sampling_rate"][0]).input_values |
|
with processor.as_target_processor(): |
|
batch["labels"] = processor(batch["target_text"]).input_ids |
|
return batch |
|
|
|
print(common_voice_train[0]['audio']) |
|
|
|
common_voice_train = common_voice_train.map(prepare_dataset, remove_columns=common_voice_train.column_names, batched=True,) |
|
common_voice_test = common_voice_test.map(prepare_dataset, remove_columns=common_voice_test.column_names, batched=True,) |
|
|
|
|
|
@dataclass |
|
class DataCollatorCTCWithPadding: |
|
""" |
|
Data collator that will dynamically pad the inputs received. |
|
Args: |
|
processor (:class:`~transformers.Wav2Vec2Processor`) |
|
The processor used for proccessing the data. |
|
padding (:obj:`bool`, :obj:`str` or :class:`~transformers.tokenization_utils_base.PaddingStrategy`, `optional`, defaults to :obj:`True`): |
|
Select a strategy to pad the returned sequences (according to the model's padding side and padding index) |
|
among: |
|
* :obj:`True` or :obj:`'longest'`: Pad to the longest sequence in the batch (or no padding if only a single |
|
sequence if provided). |
|
* :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the |
|
maximum acceptable input length for the model if that argument is not provided. |
|
* :obj:`False` or :obj:`'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of |
|
different lengths). |
|
max_length (:obj:`int`, `optional`): |
|
Maximum length of the ``input_values`` of the returned list and optionally padding length (see above). |
|
max_length_labels (:obj:`int`, `optional`): |
|
Maximum length of the ``labels`` returned list and optionally padding length (see above). |
|
pad_to_multiple_of (:obj:`int`, `optional`): |
|
If set will pad the sequence to a multiple of the provided value. |
|
This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >= |
|
7.5 (Volta). |
|
""" |
|
|
|
processor: Wav2Vec2Processor |
|
padding: Union[bool, str] = True |
|
max_length: Optional[int] = None |
|
max_length_labels: Optional[int] = None |
|
pad_to_multiple_of: Optional[int] = None |
|
pad_to_multiple_of_labels: Optional[int] = None |
|
|
|
def __call__( |
|
self, features: List[Dict[str, Union[List[int], torch.Tensor]]] |
|
) -> Dict[str, torch.Tensor]: |
|
|
|
|
|
input_features = [ |
|
{"input_values": feature["input_values"]} for feature in features |
|
] |
|
label_features = [{"input_ids": feature["labels"]} for feature in features] |
|
|
|
batch = self.processor.pad( |
|
input_features, |
|
padding=self.padding, |
|
max_length=self.max_length, |
|
pad_to_multiple_of=self.pad_to_multiple_of, |
|
return_tensors="pt", |
|
) |
|
with self.processor.as_target_processor(): |
|
labels_batch = self.processor.pad( |
|
label_features, |
|
padding=self.padding, |
|
max_length=self.max_length_labels, |
|
pad_to_multiple_of=self.pad_to_multiple_of_labels, |
|
return_tensors="pt", |
|
) |
|
|
|
|
|
labels = labels_batch["input_ids"].masked_fill( |
|
labels_batch.attention_mask.ne(1), -100 |
|
) |
|
|
|
batch["labels"] = labels |
|
|
|
return batch |
|
|
|
|
|
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def compute_metrics(pred): |
|
pred_logits = pred.predictions |
|
pred_ids = np.argmax(pred_logits, axis=-1) |
|
|
|
pred.label_ids[pred.label_ids == -100] = tokenizer.pad_token_id |
|
|
|
pred_str = tokenizer.batch_decode(pred_ids) |
|
|
|
label_str = tokenizer.batch_decode(pred.label_ids, group_tokens=False) |
|
|
|
metrics = {k: v.compute(predictions=pred_str, references=label_str) for k, v in eval_metrics.items()} |
|
|
|
return metrics |
|
|
|
model = Wav2Vec2ForCTC.from_pretrained( |
|
args.model, |
|
attention_dropout=0.1, |
|
hidden_dropout=0.1, |
|
feat_proj_dropout=0.0, |
|
mask_time_prob=0.05, |
|
layerdrop=0.1, |
|
gradient_checkpointing=True, |
|
ctc_loss_reduction="mean", |
|
pad_token_id=processor.tokenizer.pad_token_id, |
|
vocab_size=len(processor.tokenizer), |
|
) |
|
|
|
if not args.unfreeze: |
|
model.freeze_feature_extractor() |
|
|
|
training_args = TrainingArguments( |
|
output_dir="./finetuned-wav2vec2-xls-r-300m-cantonese/wav2vec2-xls-r-300m-cantonese", |
|
group_by_length=True, |
|
per_device_train_batch_size=8, |
|
gradient_accumulation_steps=2, |
|
|
|
evaluation_strategy="steps", |
|
|
|
eval_steps=400, |
|
|
|
num_train_epochs=1, |
|
fp16=True, |
|
fp16_backend="amp", |
|
logging_strategy="steps", |
|
logging_steps=400, |
|
|
|
learning_rate=args.lr, |
|
warmup_steps=100, |
|
save_steps=2376, |
|
|
|
save_total_limit=3, |
|
|
|
|
|
dataloader_num_workers=20, |
|
) |
|
|
|
trainer = Trainer( |
|
model=model, |
|
data_collator=data_collator, |
|
args=training_args, |
|
compute_metrics=compute_metrics, |
|
train_dataset=common_voice_train, |
|
eval_dataset=common_voice_test, |
|
tokenizer=processor.feature_extractor, |
|
) |
|
trainer.train() |