SalahZa's picture
first commit
0d1350d
raw
history blame
12.5 kB
"""
Data preparation.
Download: https://voice.mozilla.org/en/datasets
Author
------
Titouan Parcollet
Luca Della Libera 2022
Pooneh Mousavi 2022
"""
from dataclasses import dataclass
import os
import csv
import re
import logging
import torchaudio
from tqdm import tqdm
import unicodedata
import functools
torchaudio.set_audio_backend("soundfile")
from speechbrain.utils.parallel import parallel_map
from speechbrain.dataio.dataio import read_audio_info
logger = logging.getLogger(__name__)
def prepare_common_voice(
    data_folder,
    save_folder,
    train_tsv_file=None,
    dev_tsv_file=None,
    test_tsv_file=None,
    accented_letters=False,
    language="en",
    skip_prep=False,
):
    """
    Prepares the csv files for the Mozilla Common Voice dataset.
    Download: https://voice.mozilla.org/en/datasets

    Arguments
    ---------
    data_folder : str
        Path to the folder where the original Common Voice dataset is stored.
        This path should include the lang: /datasets/CommonVoice/<language>/
    save_folder : str
        The directory where to store the csv files. It is created if it
        does not exist yet.
    train_tsv_file : str, optional
        Path to the Train Common Voice .tsv file.
        Defaults to ``<data_folder>/train.tsv``.
    dev_tsv_file : str, optional
        Path to the Dev Common Voice .tsv file.
        Defaults to ``<data_folder>/dev.tsv``.
    test_tsv_file : str, optional
        Path to the Test Common Voice .tsv file.
        Defaults to ``<data_folder>/test.tsv``.
    accented_letters : bool, optional
        Defines if accented letters will be kept as individual letters or
        transformed to the closest non-accented letters.
    language: str
        Specify the language for text normalization.
    skip_prep: bool
        If True, skip data preparation.

    Returns
    -------
    None

    Example
    -------
    >>> from recipes.CommonVoice.common_voice_prepare import prepare_common_voice
    >>> data_folder = '/datasets/CommonVoice/en'
    >>> save_folder = 'exp/CommonVoice_exp'
    >>> train_tsv_file = '/datasets/CommonVoice/en/train.tsv'
    >>> dev_tsv_file = '/datasets/CommonVoice/en/dev.tsv'
    >>> test_tsv_file = '/datasets/CommonVoice/en/test.tsv'
    >>> accented_letters = False
    >>> prepare_common_voice(
    ...     data_folder,
    ...     save_folder,
    ...     train_tsv_file,
    ...     dev_tsv_file,
    ...     test_tsv_file,
    ...     accented_letters,
    ...     language="en",
    ... )
    """
    if skip_prep:
        return

    # If not specified, point toward the standard location w.r.t. the
    # CommonVoice tree.
    if train_tsv_file is None:
        train_tsv_file = data_folder + "/train.tsv"
    if dev_tsv_file is None:
        dev_tsv_file = data_folder + "/dev.tsv"
    if test_tsv_file is None:
        test_tsv_file = data_folder + "/test.tsv"

    # Setting the save folder. exist_ok avoids the check-then-create race
    # when several processes prepare the data at the same time.
    os.makedirs(save_folder, exist_ok=True)

    # Setting output files
    save_csv_train = save_folder + "/train.csv"
    save_csv_dev = save_folder + "/dev.csv"
    save_csv_test = save_folder + "/test.csv"
    save_csv_files = [save_csv_train, save_csv_dev, save_csv_test]

    # If the csv files already exist, we skip the data preparation
    if skip(save_csv_train, save_csv_dev, save_csv_test):
        for save_csv in save_csv_files:
            msg = "%s already exists, skipping data preparation!" % (save_csv)
            logger.info(msg)
        return

    # Additional checks to make sure the data folder contains Common Voice
    check_commonvoice_folders(data_folder)

    # Creating csv files for {train, dev, test} data
    tsv_files = [train_tsv_file, dev_tsv_file, test_tsv_file]
    for tsv_file, save_csv in zip(tsv_files, save_csv_files):
        create_csv(
            tsv_file, save_csv, data_folder, accented_letters, language,
        )
def skip(save_csv_train, save_csv_dev, save_csv_test):
    """
    Tells whether the Common Voice csv files have already been generated.

    Arguments
    ---------
    save_csv_train : str
        Path of the train csv file.
    save_csv_dev : str
        Path of the dev csv file.
    save_csv_test : str
        Path of the test csv file.

    Returns
    -------
    bool
        True when all three csv files exist (preparation can be skipped),
        False as soon as one of them is missing.
    """
    expected_outputs = (save_csv_train, save_csv_dev, save_csv_test)
    return all(os.path.isfile(csv_path) for csv_path in expected_outputs)
@dataclass
class CVRow:
    """One processed Common Voice utterance, as produced by process_line."""

    # Utterance ID; process_line derives it from the clip file name.
    snt_id: str
    # Clip duration, computed by process_line as num_frames / sample_rate.
    duration: float
    # Path to the audio clip inside the dataset "clips" folder.
    mp3_path: str
    # Speaker ID, read from the first column of the tsv line.
    spk_id: str
    # Transcript after the text cleaning done in process_line.
    words: str
def process_line(line, data_folder, language, accented_letters):
    """
    Processes one line of a Common Voice tsv file.

    Arguments
    ---------
    line : str
        A tab-separated line of the tsv file. The speaker ID, the clip
        file name and the transcript are read from columns 0, 1 and 2.
    data_folder : str
        Path of the CommonVoice dataset; the clip is looked up in
        ``<data_folder>/clips/``.
    language : str
        Language code used to select the text cleaning rules.
    accented_letters : bool
        If False, accents are stripped and apostrophes are replaced by
        spaces.

    Returns
    -------
    CVRow or None
        The processed row, or None when the clip file is missing or the
        transcript is too short to be kept.
    """
    fields = line.split("\t")

    # Path is at index 1 in Common Voice tsv files. And .mp3 files
    # are located in datasets/lang/clips/
    mp3_path = data_folder + "/clips/" + fields[1]
    spk_id = fields[0]

    # The utterance ID is the clip file name without its extension.
    # basename/splitext stay correct when the folder or the file name
    # contains extra dots.
    snt_id = os.path.splitext(os.path.basename(mp3_path))[0]

    # Reading the signal (to retrieve duration in seconds)
    if not os.path.isfile(mp3_path):
        # Report the path that could not be loaded.
        msg = "\tError loading: %s" % (mp3_path)
        logger.info(msg)
        return None
    info = read_audio_info(mp3_path)
    duration = info.num_frames / info.sample_rate

    # Getting transcript
    words = fields[2]

    # Unicode Normalization
    words = unicode_normalisation(words)

    # !! Language specific cleaning !!
    words = language_specific_preprocess(language, words)

    # Remove accents if specified
    if not accented_letters:
        words = strip_accents(words)
        words = words.replace("'", " ")
        words = words.replace("’", " ")

    # Remove multiple spaces
    words = re.sub(" +", " ", words)

    # Remove spaces at the beginning and the end of the sentence
    words = words.strip()

    # Remove too short sentences (or empty):
    if language in ["ja", "ch"]:
        # Getting chars: spaces become "_" and characters are space-joined.
        # NOTE(review): the length is measured on the space-joined string,
        # so this only rejects transcripts of fewer than 2 characters.
        # Confirm that this threshold is the intended one.
        chars = " ".join(words.replace(" ", "_"))
        if len(chars) < 3:
            return None
    else:
        if len(words.split(" ")) < 3:
            return None

    # Composition of the csv_line
    return CVRow(snt_id, duration, mp3_path, spk_id, words)
def create_csv(
    orig_tsv_file, csv_file, data_folder, accented_letters=False, language="en"
):
    """
    Creates the csv file corresponding to a Common Voice tsv file.

    Arguments
    ---------
    orig_tsv_file : str
        Path to the Common Voice tsv file (standard file).
    csv_file : str
        Path of the csv file to create. Rows are first streamed into
        ``csv_file + ".tmp"``, which is renamed to ``csv_file`` at the end.
    data_folder : str
        Path of the CommonVoice dataset.
    accented_letters : bool, optional
        Defines if accented letters will be kept as individual letters or
        transformed to the closest non-accented letters.
    language : str, optional
        Language code used to select the text normalization.

    Returns
    -------
    None

    Raises
    ------
    FileNotFoundError
        If ``orig_tsv_file`` does not exist.
    """
    # Check if the given file exists
    if not os.path.isfile(orig_tsv_file):
        msg = "\t%s doesn't exist, verify your dataset!" % (orig_tsv_file)
        logger.info(msg)
        raise FileNotFoundError(msg)

    # We load and skip the header. The context manager closes the file
    # handle, and the explicit encoding matches the utf-8 csv output.
    with open(orig_tsv_file, "r", encoding="utf-8") as tsv_f:
        loaded_csv = tsv_f.readlines()[1:]
    nb_samples = len(loaded_csv)

    msg = "Preparing CSV files for %s samples ..." % (str(nb_samples))
    logger.info(msg)

    # Adding some Prints
    msg = "Creating csv lists in %s ..." % (csv_file)
    logger.info(msg)

    # Process and write lines
    total_duration = 0.0
    nb_written = 0
    line_processor = functools.partial(
        process_line,
        data_folder=data_folder,
        language=language,
        accented_letters=accented_letters,
    )

    # Stream into a .tmp file, and rename it to the real path at the end.
    csv_file_tmp = csv_file + ".tmp"
    # newline="" lets the csv module control the line terminators itself.
    with open(
        csv_file_tmp, mode="w", encoding="utf-8", newline=""
    ) as csv_f:
        csv_writer = csv.writer(
            csv_f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL
        )
        csv_writer.writerow(["ID", "duration", "wav", "spk_id", "wrd"])
        for line in tqdm(loaded_csv):
            row = line_processor(line)
            # Lines rejected by process_line come back as None.
            if row is None:
                continue
            total_duration += row.duration
            nb_written += 1
            csv_writer.writerow(
                [
                    row.snt_id,
                    str(row.duration),
                    row.mp3_path,
                    row.spk_id,
                    row.words,
                ]
            )
    os.replace(csv_file_tmp, csv_file)

    # Final prints. The sample count is the number of rows actually
    # written, so that it is consistent with the reported duration.
    msg = "%s successfully created!" % (csv_file)
    logger.info(msg)
    msg = "Number of samples: %s " % (str(nb_written))
    logger.info(msg)
    msg = "Total duration: %s Hours" % (str(round(total_duration / 3600, 2)))
    logger.info(msg)
def language_specific_preprocess(language, words):
    """
    Applies the text cleaning rules attached to a language.

    Arguments
    ---------
    language : str
        Language code selecting the rules. Rules exist for "en", "fr",
        "it", "rw", "de", "ar", "fa", "ga-IE" and "es"; for any other
        value the text is returned untouched.
    words : str
        Transcript to clean.

    Returns
    -------
    str
        The cleaned transcript.
    """
    # !! Language specific cleaning !!
    # Important: feel free to specify the text normalization
    # corresponding to your alphabet.
    if language in ("en", "fr", "it", "rw"):
        filtered = re.sub(
            "[^’'A-Za-z0-9À-ÖØ-öø-ÿЀ-ӿéæœâçèàûî]+", " ", words
        )
        words = filtered.upper()
        if language == "fr":
            # Replace J'y D'hui etc by J_ D_hui
            for apostrophe in ("'", "’"):
                words = words.replace(apostrophe, " ")
        return words

    if language == "de":
        # Python's upper() converts ß to SS. Wrapping every ß in a marker
        # beforehand makes it possible to restore it afterwards while
        # leaving genuine occurrences of SS alone.
        marked = words.replace("ß", "0000ß0000")
        upper_cased = re.sub("[^’'A-Za-z0-9öÖäÄüÜß]+", " ", marked).upper()
        for apostrophe in ("'", "’"):
            upper_cased = upper_cased.replace(apostrophe, " ")
        # The marker was upper-cased to 0000SS0000: turn it back into ß.
        return upper_cased.replace("0000SS0000", "ß")

    if language == "ar" or language == "fa":
        if language == "ar":
            base_letters = "ابتةثجحخدذرزژشسصضطظعغفقكلمنهويىءآأؤإئ"
        else:
            base_letters = "ابپتةثجحخچدذرزژسشصضطظعغفقگکلمنهویىءآأؤإئ"
        # HAMZA, ALEF_MADDA and ALEF_HAMZA_ABOVE are appended explicitly.
        alphabet = base_letters + "\u0621" + "\u0622" + "\u0623"
        # Everything that is neither a listed letter nor a space is dropped.
        return re.sub("[^" + alphabet + " ]+", "", words).upper()

    if language == "ga-IE":
        # Irish lower() is complicated, but upper() is nondeterministic,
        # so use lowercase
        def lower_irish(token):
            # A leading t/n followed by an upper-case vowel is a prefix:
            # keep it and separate it from the word with a hyphen.
            prefixed = (
                len(token) >= 2
                and token[0] in "tn"
                and token[1] in "AEIOUÁÉÍÓÚ"
            )
            if prefixed:
                return token[0] + "-" + token[1:].lower()
            return token.lower()

        spaced = re.sub("[^-A-Za-z'ÁÉÍÓÚáéíóú]+", " ", words)
        return " ".join(lower_irish(token) for token in spaced.split(" "))

    if language == "es":
        # Fix the following error in dataset large:
        # KeyError: 'The item En noviembre lanzaron Queen Elizabeth , coproducida por Foreign Noi$e . requires replacements which were not supplied.'
        return words.replace("$", "s")

    return words
def check_commonvoice_folders(data_folder):
    """
    Verifies that the data folder looks like a Common Voice dataset,
    i.e. that it contains a ``clips`` entry.

    Arguments
    ---------
    data_folder : str
        Path of the CommonVoice dataset.

    Returns
    -------
    None

    Raises
    ------
    FileNotFoundError
        If ``<data_folder>/clips`` does not exist.
    """
    clips_path = data_folder + "/clips"

    # Nothing to report when the clips folder is there.
    if os.path.exists(clips_path):
        return

    raise FileNotFoundError(
        "the folder %s does not exist (it is expected in "
        "the Common Voice dataset)" % (clips_path)
    )
def unicode_normalisation(text):
    """
    Returns the input converted to ``str``.

    Arguments
    ---------
    text : object
        Value to convert.

    Returns
    -------
    str
        ``str(text)``; no Unicode normalization form is applied here.
    """
    as_string = str(text)
    return as_string
def strip_accents(text):
    """
    Reduces a text to its ASCII characters.

    The text is decomposed with the NFD normalization form, so that an
    accented letter becomes a base letter followed by combining marks,
    then every non-ASCII character is discarded. Characters without an
    ASCII base letter are therefore removed entirely.

    Arguments
    ---------
    text : str
        Text to process.

    Returns
    -------
    str
        The ASCII-only text.
    """
    decomposed = unicodedata.normalize("NFD", text)
    ascii_bytes = decomposed.encode("ascii", "ignore")
    return str(ascii_bytes.decode("utf-8"))