"""Fine-tune a HuBERT-style audio classifier on a folder-per-class dataset.

The dataset directory is expected to contain one sub-directory per class;
every file inside a class directory is treated as one audio sample.
Training arguments, the output directory and the dataset path are read from
the JSON file named by ``config_file``.
"""

import os
import json
import random

import torch
import torchaudio
from torch.utils.data import Dataset, DataLoader
from huggingface_hub import login, upload_folder
from transformers.integrations import TensorBoardCallback
from transformers import (
    Wav2Vec2FeatureExtractor,
    HubertConfig,
    HubertForSequenceClassification,
    Trainer,
    TrainingArguments,
    EarlyStoppingCallback,
)

MODEL = "ntu-spml/distilhubert"  # base checkpoint; change this to use another model
FEATURE_EXTRACTOR = Wav2Vec2FeatureExtractor.from_pretrained(MODEL)
seed = 123
MAX_DURATION = 1.00  # seconds of audio kept per sample
SAMPLING_RATE = FEATURE_EXTRACTOR.sampling_rate  # 16000
token = os.getenv('MODEL_REPO_ID')  # value passed to the Hugging Face Hub as the token
config_file = "models_config.json"
clasificador = "class"  # key of the classifier model inside config_file
monitor = "mon"  # key of the monitor model inside config_file
batch_size = 4096


class AudioDataset(Dataset):
    """Map-style dataset yielding fixed-length waveforms and integer labels."""

    def __init__(self, dataset_path, label2id):
        """Index every audio file found under the known label directories.

        Args:
            dataset_path: Root directory containing one folder per label.
            label2id: Mapping from label directory name to integer class id.
        """
        self.dataset_path = dataset_path
        self.label2id = label2id
        self.file_paths = []
        self.labels = []

        for label_dir, label_id in self.label2id.items():
            label_path = os.path.join(self.dataset_path, label_dir)
            if not os.path.isdir(label_path):
                continue
            # os.listdir returns entries in arbitrary order; sorting keeps the
            # sample order (and therefore the seeded split) reproducible.
            for file_name in sorted(os.listdir(label_path)):
                audio_path = os.path.join(label_path, file_name)
                if not os.path.isfile(audio_path):
                    continue  # nested directories cannot be loaded as audio
                self.file_paths.append(audio_path)
                self.labels.append(label_id)

    def __len__(self):
        """Return the number of indexed audio files."""
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Load and preprocess sample ``idx``.

        Returns:
            A dict with ``input_values`` (preprocessed waveform tensor) and
            ``labels`` (scalar tensor holding the class id).
        """
        input_values = self.preprocess_audio(self.file_paths[idx])
        return {
            "input_values": input_values,
            "labels": torch.tensor(self.labels[idx]),
        }

    def preprocess_audio(self, audio_path):
        """Load an audio file and turn it into a fixed-length model input.

        The waveform is resampled to ``SAMPLING_RATE``, down-mixed to mono,
        peak-normalised and then padded or truncated to ``MAX_DURATION``
        seconds by the feature extractor.

        Args:
            audio_path: Path of the audio file to load.

        Returns:
            A 1-D tensor with ``int(SAMPLING_RATE * MAX_DURATION)`` samples.
        """
        waveform, sample_rate = torchaudio.load(
            audio_path,
            normalize=True,  # converts to float32
            # num_frames=  # TODO: try this so the audio does not need trimming
        )

        if sample_rate != SAMPLING_RATE:  # resample when the file is not 16 kHz
            resampler = torchaudio.transforms.Resample(sample_rate, SAMPLING_RATE)
            waveform = resampler(waveform)

        # Always collapse the channel axis so the extractor receives a 1-D
        # signal; for mono input the mean over one channel leaves the samples
        # unchanged, for stereo it down-mixes.
        waveform = waveform.mean(dim=0)

        # Peak-normalise, skipping empty or silent clips to avoid a
        # division by zero that would fill the sample with NaNs.
        if waveform.numel() > 0:
            peak = torch.max(torch.abs(waveform))
            if peak > 0:
                waveform = waveform / peak

        inputs = FEATURE_EXTRACTOR(
            waveform.numpy(),
            sampling_rate=SAMPLING_RATE,
            return_tensors="pt",
            max_length=int(SAMPLING_RATE * MAX_DURATION),
            truncation=True,
            # "max_length" pads short clips up to max_length; padding=True
            # only pads to the longest item, which is a no-op for one sample.
            padding="max_length",
        )
        return inputs.input_values.squeeze(0)


def seed_everything():
    """Seed Python and PyTorch RNGs and request deterministic CUDA kernels."""
    random.seed(seed)  # random.shuffle drives the train/test split
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':16384:8'


def build_label_mappings(dataset_path):
    """Build label <-> id mappings from the sub-directories of ``dataset_path``.

    Directory names are sorted before ids are assigned so the mapping is
    the same on every run and machine.

    Returns:
        A ``(label2id, id2label)`` tuple of dicts.
    """
    label_names = sorted(
        entry
        for entry in os.listdir(dataset_path)
        if os.path.isdir(os.path.join(dataset_path, entry))
    )
    label2id = {name: index for index, name in enumerate(label_names)}
    id2label = {index: name for index, name in enumerate(label_names)}
    return label2id, id2label


def create_dataloader(dataset_path, test_size=0.2, num_workers=12, shuffle=True, pin_memory=True):
    """Split the dataset at random and wrap both parts in DataLoaders.

    Args:
        dataset_path: Root directory of the folder-per-class dataset.
        test_size: Fraction of the samples reserved for evaluation.
        num_workers: Worker processes used by each DataLoader.
        shuffle: Whether the training DataLoader reshuffles every epoch.
        pin_memory: Forwarded to both DataLoaders.

    Returns:
        ``(train_dataloader, test_dataloader, label2id, id2label)``.
    """
    label2id, id2label = build_label_mappings(dataset_path)
    dataset = AudioDataset(dataset_path, label2id)

    indices = list(range(len(dataset)))
    random.shuffle(indices)  # reproducible once seed_everything() has run

    split_idx = int(len(dataset) * (1 - test_size))
    train_dataset = torch.utils.data.Subset(dataset, indices[:split_idx])
    test_dataset = torch.utils.data.Subset(dataset, indices[split_idx:])

    train_dataloader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=num_workers,
        pin_memory=pin_memory,
    )
    test_dataloader = DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False,  # evaluation order does not need to be randomised
        num_workers=num_workers,
        pin_memory=pin_memory,
    )
    return train_dataloader, test_dataloader, label2id, id2label


def load_model(num_labels, label2id, id2label):
    """Load the pretrained checkpoint with a classification head.

    Args:
        num_labels: Number of output classes.
        label2id: Mapping from label name to class id.
        id2label: Mapping from class id to label name.

    Returns:
        The model, moved to CUDA when available and to CPU otherwise.
    """
    config = HubertConfig.from_pretrained(
        MODEL,
        num_labels=num_labels,
        label2id=label2id,
        id2label=id2label,
        finetuning_task="audio-classification",
    )
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = HubertForSequenceClassification.from_pretrained(  # TODO: review parameters; possible optimizations
        MODEL,
        config=config,
        torch_dtype=torch.float32,  # does not affect the first epoch; better to set it explicitly
    )
    model.to(device)
    return model


def model_params(dataset_path):
    """Create the data loaders and a model sized to the dataset's labels.

    Returns:
        ``(model, train_dataloader, test_dataloader, id2label)``.
    """
    train_dataloader, test_dataloader, label2id, id2label = create_dataloader(dataset_path)
    model = load_model(num_labels=len(id2label), label2id=label2id, id2label=id2label)
    return model, train_dataloader, test_dataloader, id2label


def compute_metrics(eval_pred):
    """Compute classification accuracy for the Trainer.

    Args:
        eval_pred: Object exposing ``predictions`` (class logits, or a tuple
            whose first element is the logits) and ``label_ids``.

    Returns:
        A dict with a single ``"accuracy"`` entry as a Python float.
    """
    logits = eval_pred.predictions
    if isinstance(logits, (tuple, list)):
        logits = logits[0]
    # argmax over the class axis; without an axis argmax flattens the batch.
    predictions = torch.as_tensor(logits).argmax(dim=-1)
    references = torch.as_tensor(eval_pred.label_ids)
    # The comparison yields a bool tensor, which must be cast before mean().
    accuracy = (predictions == references).float().mean().item()
    return {
        "accuracy": accuracy,
    }


def main(training_args, output_dir, dataset_path):
    """Train the model, save it to ``output_dir`` and push it to the Hub.

    Args:
        training_args: ``TrainingArguments`` instance for the Trainer.
        output_dir: Local directory where the final model is saved.
        dataset_path: Root directory of the folder-per-class dataset.
    """
    seed_everything()
    model, train_dataloader, test_dataloader, _ = model_params(dataset_path)

    # NOTE(review): EarlyStoppingCallback relies on settings supplied through
    # training_args (best-model tracking / evaluation schedule) - confirm the
    # JSON config provides them.
    trainer = Trainer(
        model=model,
        args=training_args,
        compute_metrics=compute_metrics,
        train_dataset=train_dataloader.dataset,
        eval_dataset=test_dataloader.dataset,
        callbacks=[TensorBoardCallback(), EarlyStoppingCallback(early_stopping_patience=3)],
    )

    os.makedirs(output_dir, exist_ok=True)  # make sure the model folder exists before saving
    torch.cuda.empty_cache()  # free GPU memory
    trainer.train()  # arguments can be changed here to resume training

    # Save locally first so a failed upload does not lose the trained model.
    trainer.save_model(output_dir)

    login(token, add_to_git_credential=True)
    trainer.push_to_hub(token=token)  # upload to the personal account; needed for prediction, reason unknown
    # upload_folder(repo_id=f"A-POR-LOS-8000/{output_dir}",folder_path=output_dir, token=token)  # upload model to the organization


def load_config(model_name):
    """Read the entry for ``model_name`` from the JSON configuration file.

    The ``"training_args"`` dict of the entry is replaced by a
    ``TrainingArguments`` instance built from it.

    Raises:
        KeyError: If ``model_name`` or ``"training_args"`` is missing.
    """
    with open(config_file, 'r', encoding="utf-8") as f:
        config = json.load(f)
    model_config = config[model_name]
    model_config["training_args"] = TrainingArguments(**model_config["training_args"])
    return model_config


if __name__ == "__main__":
    # config = load_config(clasificador)  # switch models here
    config = load_config(monitor)  # switch models here
    training_args = config["training_args"]
    output_dir = config["output_dir"]
    dataset_path = config["dataset_path"]
    main(training_args, output_dir, dataset_path)