YourMT3 / amt /src /utils /preprocess /preprocess_enstdrums.py
mimbres's picture
.
a03c9b4
raw
history blame
19.6 kB
""" preprocess_enstdrums.py """
import os
import re
import glob
import copy
import json
import numpy as np
from typing import Dict
from utils.note_event_dataclasses import Note
from utils.audio import get_audio_file_info, load_audio_file, write_wav_file
from utils.midi import note_event2midi
from utils.note2event import note2note_event, sort_notes, validate_notes, trim_overlapping_notes, mix_notes
from config.vocabulary import ENST_DRUM_NOTES
DRUM_OFFSET = 0.01
def create_enst_audio_stem(drum_audio_file, accomp_audio_file, enst_id) -> Dict:
program = [128, 129]
is_drum = [1, 0]
audio_tracks = [] # multi-channel audio array (C, T)
drum_audio = load_audio_file(drum_audio_file, dtype=np.int16) / 2**15 # returns bytes
audio_tracks.append(drum_audio.astype(np.float16))
accomp_audio = load_audio_file(accomp_audio_file, dtype=np.int16) / 2**15 # returns bytes
audio_tracks.append(accomp_audio.astype(np.float16))
max_length = max(len(drum_audio), len(accomp_audio))
# collate all the audio tracks into a single array
n_tracks = 2
audio_array = np.zeros((n_tracks, max_length), dtype=np.float16)
for j, audio in enumerate(audio_tracks):
audio_array[j, :len(audio)] = audio
stem_content = {
'enstdrums_id': enst_id,
'program': program,
'is_drum': is_drum,
'n_frames': max_length, # int
'audio_array': audio_array # (n_tracks, n_frames)
}
return stem_content
def create_note_note_event_midi_from_enst_annotation(ann_file, enst_id):
"""
Args:
ann_file: 'path/to/annotation.txt'
enst_id: str
Returns:
notes: List[Note]
note_events: List[NoteEvent]
midi: List[List[int]]
"""
# Read the text file and split each line into timestamp and drum instrument name
with open(ann_file, 'r') as f:
lines = f.readlines() # ignore additional annotations like rc2, sd-
anns = [(float(line.split()[0]), re.sub('[^a-zA-Z]', '', line.split()[1])) for line in lines]
# Convert ann to notes by ENST_DRUM_NOTES vocabulary
notes = []
for time, drum_name in anns:
if drum_name not in ENST_DRUM_NOTES.keys():
raise ValueError(f"Drum name {drum_name} is not in ENST_DRUM_NOTES")
notes.append(
Note(
is_drum=True,
program=128,
onset=float(time),
offset=float(time) + DRUM_OFFSET,
pitch=ENST_DRUM_NOTES[drum_name][0],
velocity=1))
notes = sort_notes(notes)
notes = validate_notes(notes)
notes = trim_overlapping_notes(notes)
note_events = note2note_event(notes)
# Write midi file
midi_file = ann_file.replace('.txt', '.mid')
note_event2midi(note_events, midi_file)
print(f"Created {midi_file}")
program = [128]
is_drum = [1]
return { # notes
'enstdrums_id': enst_id,
'program': program,
'is_drum': is_drum,
'duration_sec': note_events[-1].time,
'notes': notes,
}, { # note_events
'enstdrums_id': enst_id,
'program': program,
'is_drum': is_drum,
'duration_sec': note_events[-1].time,
'note_events': note_events,
}
def preprocess_enstdrums16k(data_home: os.PathLike, dataset_name='enstdrums') -> None:
"""
Some tracks ('minus-one' in the file name) of ENST-drums contain accompaniments.
'stem file' will contain these accompaniments. 'mix_audio_file' will contain the
mix of the drums and accompaniments.
Splits:
- drummer_1, drummer_2, drummer_3, all
- drummer3_dtd, drummer3_dtp, drummer3_dtm_r1, drummer3_dtm_r2 (for validation/test)
DTD means drum track only, DTP means drum track plus percussions, DTM means
drum track plus music.
DTM r1 and r2 are two different versions of the mixing tracks derived from listening
test in [Gillet 2008, Paulus 2009], and used in [Wu 2018].
r1 uses 1:3 ratio of accompaniment to drums, and r2 uses 2:3 ratio.
O. Gillet and G. Richard, “Transcription and separation of drum signals from polyphonic music,”
IEEE Trans. Audio, Speech, Lang. Process., vol. 16, no. 3, pp. 529–540, Mar. 2008.
J. Paulus and A. Klapuri, “Drum sound detection in polyphonic mu- sic with hidden Markov models,”
EURASIP J. Audio, Speech, Music Process., vol. 2009, no. 14, 2009, Art. no. 14.
C. -W. Wu et al., "A Review of Automatic Drum Transcription,"
IEEE/ACM TASLP, vol. 26, no. 9, pp. 1457-1483, Sept. 2018,
Writes:
- {dataset_name}_{split}_file_list.json: a dictionary with the following keys:
{
index:
{
'enstdrums_id': {drummer_id}_{3-digit-track-id}
'n_frames': (int),
'stem_file': Dict of stem audio file with metadata,
'mix_audio_file': 'path/to/mix.wav',
'notes_file': 'path/to/notes.npy',
'note_events_file': 'path/to/note_events.npy',
'midi_file': 'path/to/midi.mid',
'program': List[int], # 128 for drums, 129 for unannotated (accompaniment)
'is_drum': List[int], # 0 or 1
}
}
"""
# Directory and file path
base_dir = os.path.join(data_home, dataset_name + '_yourmt3_16k')
output_index_dir = os.path.join(data_home, 'yourmt3_indexes')
os.makedirs(output_index_dir, exist_ok=True)
# Gather info
enst_ids = []
enst_info = {}
for i in [1, 2, 3]:
drummer_files = sorted(
glob.glob(os.path.join(base_dir, f'drummer_{i}', 'annotation/*.txt')))
for file in drummer_files:
track_id = os.path.basename(file).split('_')[0]
enst_id = f'{i}_{track_id}'
enst_ids.append(enst_id)
# Create notes, note_events, and MIDI from annotation
ann_file = file
assert os.path.exists(ann_file), f'{ann_file} does not exist'
notes, note_events = create_note_note_event_midi_from_enst_annotation(ann_file, enst_id)
notes_file = ann_file.replace('.txt', '_notes.npy')
note_events_file = ann_file.replace('.txt', '_note_events.npy')
np.save(notes_file, notes, allow_pickle=True, fix_imports=False)
print(f"Created {notes_file}")
np.save(note_events_file, note_events, allow_pickle=True, fix_imports=False)
print(f"Created {note_events_file}")
# Create stem file from audio for accompaniment
drum_audio_file = os.path.join(base_dir, f'drummer_{i}', 'audio', 'wet_mix',
os.path.basename(file).replace('.txt', '.wav'))
assert os.path.exists(drum_audio_file), f'{drum_audio_file} does not exist'
if 'minus-one' in file: # unannotated accompaniment exists
# 129: Unannotated accompaniment exists
accomp_audio_file = os.path.join(base_dir, f'drummer_{i}', 'audio', 'accompaniment',
os.path.basename(file).replace('.txt', '.wav'))
assert os.path.exists(accomp_audio_file), f'{accomp_audio_file} does not exist'
os.makedirs(os.path.join(base_dir, f'drummer_{i}', 'audio', 'stem'), exist_ok=True)
stem_file = os.path.join(base_dir, f'drummer_{i}', 'audio', 'stem',
os.path.basename(file).replace('.txt', '_stem.npy'))
stem_content = create_enst_audio_stem(drum_audio_file, accomp_audio_file, enst_id)
# write audio stem
np.save(stem_file, stem_content, allow_pickle=True, fix_imports=False)
print(f"Created {stem_file}")
# create (drum + accompaniment) mix audio file. r1
os.makedirs(
os.path.join(base_dir, f'drummer_{i}', 'audio', 'accompaniment_mix_r1'),
exist_ok=True)
accomp_mix_audio_file_r1 = os.path.join(
base_dir, f'drummer_{i}', 'audio', 'accompaniment_mix_r1',
os.path.basename(file).replace('.txt', '.wav'))
accomp_mix_audio_r1 = stem_content['audio_array'][0] / np.max(
np.abs(stem_content['audio_array'][0])) * 0.75 + stem_content['audio_array'][
1] / np.max(np.abs(stem_content['audio_array'][1])) * 0.25
accomp_mix_audio_r1 = accomp_mix_audio_r1 / np.max(np.abs(accomp_mix_audio_r1))
write_wav_file(accomp_mix_audio_file_r1, accomp_mix_audio_r1, 16000)
print(f"Created {accomp_mix_audio_file_r1}")
# create (drum + accompaniment) mix audio file. r1
os.makedirs(
os.path.join(base_dir, f'drummer_{i}', 'audio', 'accompaniment_mix_r2'),
exist_ok=True)
accomp_mix_audio_file_r2 = os.path.join(
base_dir, f'drummer_{i}', 'audio', 'accompaniment_mix_r2',
os.path.basename(file).replace('.txt', '.wav'))
accomp_mix_audio_r2 = stem_content['audio_array'][0] / np.max(
np.abs(stem_content['audio_array'][0])) * 0.6 + stem_content['audio_array'][
1] / np.max(np.abs(stem_content['audio_array'][1])) * 0.4
accomp_mix_audio_r2 = accomp_mix_audio_r2 / np.max(np.abs(accomp_mix_audio_r2))
write_wav_file(accomp_mix_audio_file_r2, accomp_mix_audio_r2, 16000)
print(f"Created {accomp_mix_audio_file_r2}")
n_frames = len(accomp_mix_audio_r2)
# use r2 for training...
mix_audio_file = accomp_mix_audio_file_r2
else:
# No unannotated accompaniment
stem_file = None
mix_audio_file = drum_audio_file
n_frames = get_audio_file_info(drum_audio_file)[1]
# Create index, this is based on dtm setup
enst_info[enst_id] = {
'enstdrums_id': enst_id,
'n_frames': n_frames,
'stem_file': stem_file,
'mix_audio_file': mix_audio_file,
'notes_file': notes_file,
'note_events_file': note_events_file,
'midi_file': ann_file.replace('.txt', '.mid'),
'program': stem_content['program'] if 'minus-one' in file else notes['program'],
'is_drum': stem_content['is_drum'] if 'minus-one' in file else notes['is_drum'],
}
# Write index
for split in [
'drummer_1_dtm', 'drummer_2_dtm', 'all_dtm', 'drummer_1_dtp', 'drummer_2_dtp',
'all_dtp', 'drummer_3_dtd', 'drummer_3_dtp', 'drummer_3_dtm_r1', 'drummer_3_dtm_r2'
]:
# splits for training
file_list = {}
i = 0
if split == 'drummer_1_dtm':
for enst_id in enst_ids:
if enst_id.startswith('1_'):
file_list[str(i)] = enst_info[enst_id]
i += 1
assert len(file_list) == 97
elif split == 'drummer_2_dtm':
for enst_id in enst_ids:
if enst_id.startswith('2_'):
file_list[str(i)] = enst_info[enst_id]
i += 1
assert len(file_list) == 105
elif split == 'all_dtm':
for enst_id in enst_ids:
file_list[str(i)] = enst_info[enst_id]
i += 1
assert len(file_list) == 318
elif split == 'drummer_1_dtp':
for enst_id in enst_ids:
if enst_id.startswith('1_'):
file_list[str(i)] = copy.deepcopy(enst_info[enst_id])
file_list[str(i)]['stem_file'] = None
file_list[str(i)]['mix_audio_file'] = file_list[str(
i)]['mix_audio_file'].replace('accompaniment_mix_r2', 'wet_mix')
file_list[str(i)]['program'] = [128]
file_list[str(i)]['is_drum'] = [1]
i += 1
assert len(file_list) == 97
elif split == 'drummer_2_dtp':
for enst_id in enst_ids:
if enst_id.startswith('2_'):
file_list[str(i)] = copy.deepcopy(enst_info[enst_id])
file_list[str(i)]['stem_file'] = None
file_list[str(i)]['mix_audio_file'] = file_list[str(
i)]['mix_audio_file'].replace('accompaniment_mix_r2', 'wet_mix')
file_list[str(i)]['program'] = [128]
file_list[str(i)]['is_drum'] = [1]
i += 1
assert len(file_list) == 105
elif split == 'all_dtp':
for enst_id in enst_ids:
file_list[str(i)] = copy.deepcopy(enst_info[enst_id])
file_list[str(i)]['stem_file'] = None
file_list[str(i)]['mix_audio_file'] = file_list[str(i)]['mix_audio_file'].replace(
'accompaniment_mix_r2', 'wet_mix')
file_list[str(i)]['program'] = [128]
file_list[str(i)]['is_drum'] = [1]
i += 1
assert len(file_list) == 318
elif split == 'drummer_3_dtd':
for enst_id in enst_ids:
if enst_id.startswith('3_') and len(enst_info[enst_id]['program']) == 1:
assert enst_info[enst_id]['stem_file'] == None
file_list[str(i)] = enst_info[enst_id]
i += 1
assert len(file_list) == 95
elif split == 'drummer_3_dtp':
for enst_id in enst_ids:
if enst_id.startswith('3_') and len(enst_info[enst_id]['program']) == 2:
file_list[str(i)] = copy.deepcopy(enst_info[enst_id])
file_list[str(i)]['stem_file'] = None
# For DTP, we use the drum audio file as the mix audio file
file_list[str(i)]['mix_audio_file'] = file_list[str(
i)]['mix_audio_file'].replace('accompaniment_mix_r2', 'wet_mix')
file_list[str(i)]['program'] = [128]
file_list[str(i)]['is_drum'] = [1]
i += 1
assert len(file_list) == 21
elif split == 'drummer_3_dtm_r1':
for enst_id in enst_ids:
if enst_id.startswith('3_') and len(enst_info[enst_id]['program']) == 2:
file_list[str(i)] = copy.deepcopy(enst_info[enst_id])
file_list[str(i)]['stem_file'] = None
file_list[str(i)]['mix_audio_file'] = file_list[str(
i)]['mix_audio_file'].replace('accompaniment_mix_r2',
'accompaniment_mix_r1')
i += 1
assert len(file_list) == 21
elif split == 'drummer_3_dtm_r2':
for enst_id in enst_ids:
if enst_id.startswith('3_') and len(enst_info[enst_id]['program']) == 2:
file_list[str(i)] = copy.deepcopy(enst_info[enst_id])
file_list[str(i)]['stem_file'] = None
i += 1
assert len(file_list) == 21
# final check for file existence
for k, v in file_list.items():
if v['stem_file'] is not None:
assert os.path.exists(v['stem_file'])
assert os.path.exists(v['mix_audio_file'])
assert os.path.exists(v['notes_file'])
assert os.path.exists(v['note_events_file'])
assert os.path.exists(v['midi_file'])
# write json file
output_index_file = os.path.join(output_index_dir, f'{dataset_name}_{split}_file_list.json')
with open(output_index_file, 'w') as f:
json.dump(file_list, f, indent=4)
print(f"Created {output_index_file}")
def create_filelist_dtm_random_enstdrums16k(data_home: os.PathLike,
dataset_name: str = 'enstdrums') -> None:
# Directory and file paths
base_dir = os.path.join(data_home, dataset_name + '_yourmt3_16k')
output_index_dir = os.path.join(data_home, 'yourmt3_indexes')
os.makedirs(output_index_dir, exist_ok=True)
# Load all filelist
file_list_all_dtm_path = os.path.join(output_index_dir,
f'{dataset_name}_all_dtm_file_list.json')
file_list_all_dtp_path = os.path.join(output_index_dir,
f'{dataset_name}_all_dtp_file_list.json')
# Collect dtm tracks
with open(file_list_all_dtm_path, 'r') as f:
fl = json.load(f)
fl_dtm = {}
i = 0
for v in fl.values():
if 129 in v['program']:
fl_dtm[i] = copy.deepcopy(v)
i += 1
# Collect dtd tracks
fl_dtd = {}
i = 0
for v in fl.values():
if 129 not in v['program']:
fl_dtd[i] = copy.deepcopy(v)
i += 1
# Split: 70, 15, 15
# rand_idx = np.random.permutation(len(fl_dtm))
idx = {}
idx['train_dtm'] = [
47, 58, 14, 48, 60, 44, 34, 31, 5, 62, 46, 12, 9, 26, 57, 11, 16, 22, 33, 3, 6, 55, 50, 32,
52, 53, 10, 28, 24, 41, 63, 51, 43, 49, 54, 15, 20, 1, 27, 2, 23, 45, 38, 37
]
idx['validation_dtm'] = [39, 4, 19, 59, 61, 17, 56, 36, 29, 0]
idx['test_dtm'] = [18, 7, 42, 25, 40, 8, 30, 21, 13, 35]
idx['train_dtp'] = idx['train_dtm']
idx['validation_dtp'] = idx['validation_dtm']
idx['test_dtp'] = idx['test_dtm']
for split in [
'train_dtm',
'validation_dtm',
'test_dtm',
'train_dtp',
'validation_dtp',
'test_dtp',
'all_dtd',
]:
file_list = {}
i = 0
if 'dtm' in split:
for k, v in fl_dtm.items():
if int(k) in idx[split]:
file_list[i] = copy.deepcopy(v)
i += 1
if split == 'test_dtm' or split == 'validation_dtm':
# add r1 mix tracks
for k, v in fl_dtm.items():
if int(k) in idx[split]:
_v = copy.deepcopy(v)
_v['mix_audio_file'] = _v['mix_audio_file'].replace(
'accompaniment_mix_r2', 'accompaniment_mix_r1')
file_list[i] = _v
i += 1
elif 'dtp' in split:
for k, v in fl_dtm.items():
if int(k) in idx[split]:
_v = copy.deepcopy(v)
_v['stem_file'] = None
_v['mix_audio_file'] = _v['mix_audio_file'].replace(
'accompaniment_mix_r2', 'wet_mix')
_v['program'] = [128] # bug fixed..
_v['is_drum'] = [1] # bug fixed..
file_list[i] = _v
i += 1
elif 'dtd' in split:
for k, v in fl_dtd.items():
file_list[i] = copy.deepcopy(v)
i += 1
else:
raise ValueError(f'Unknown split: {split}')
output_file = os.path.join(output_index_dir, f'{dataset_name}_{split}_file_list.json')
with open(output_file, 'w') as f:
json.dump(file_list, f, indent=4)
print(f'Created {output_file}')