Spaces:
Sleeping
Sleeping
import torchaudio | |
import gradio as gr | |
import soundfile as sf | |
import tempfile | |
import os | |
import io | |
import librosa | |
import numpy as np | |
import pandas as pd | |
from transformers import ASTFeatureExtractor, AutoModelForAudioClassification, Trainer, Wav2Vec2FeatureExtractor, HubertForSequenceClassification, pipeline | |
from datasets import Dataset, DatasetDict | |
import torch.nn.functional as F | |
import torch | |
from collections import Counter | |
from scipy.stats import kurtosis | |
from huggingface_hub import InferenceClient | |
import os | |
import time | |
''' | |
Predictor | |
''' | |
#Obtenemos el token para traernos el modelo: | |
access_token_mod_1 = os.getenv('HF_Access_Personal') | |
#Cargamos procesador y modelo: | |
processor = ASTFeatureExtractor.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593") | |
model = AutoModelForAudioClassification.from_pretrained("Robertomarting/tmp_trainer",token=access_token_mod_1) | |
#Definimos una función para eliminar segmentos de audio con un determinado porcentaje de ruido blanco: | |
def is_white_noise(audio, threshold=0.75): | |
kurt = kurtosis(audio) | |
return np.abs(kurt) < 0.1 and np.mean(np.abs(audio)) < threshold | |
#Función de procesado de audio, permite particionar en fragmentos de 1 segundo, hacer un trim, volverlo mono si está en estéreo, resamplearlo | |
#al sampling rate que admite el modelo, etc. | |
def process_audio(audio_tuple, target_sr=16000, target_duration=1.0): | |
data = [] | |
target_length = int(target_sr * target_duration) | |
wav_buffer = io.BytesIO() | |
sf.write(wav_buffer, audio_tuple[1], audio_tuple[0], format='wav') | |
wav_buffer.seek(0) | |
audio_data, sample_rate = sf.read(wav_buffer) | |
audio_data = audio_data.astype(np.float32) | |
if len(audio_data.shape) > 1: | |
audio_data = np.mean(audio_data, axis=1) | |
if sample_rate != target_sr: | |
audio_data = librosa.resample(audio_data, orig_sr=sample_rate, target_sr=target_sr) | |
audio_data, _ = librosa.effects.trim(audio_data) | |
if len(audio_data) > target_length: | |
for i in range(0, len(audio_data), target_length): | |
segment = audio_data[i:i + target_length] | |
if len(segment) == target_length and not is_white_noise(segment): | |
data.append(segment) | |
else: | |
if not is_white_noise(audio_data): | |
data.append(audio_data) | |
return data | |
#Se aplica al extractor de características del modelo: | |
def preprocess_audio(audio_segments): | |
inputs = processor( | |
audio_segments, | |
padding=True, | |
sampling_rate=processor.sampling_rate, | |
max_length=int(processor.sampling_rate * 1), | |
truncation=True, | |
return_tensors="pt" | |
) | |
return inputs | |
#Se hace la predicción para cada audio: | |
def predict_audio(audio): | |
audio_segments = process_audio(audio) | |
inputs = preprocess_audio(audio_segments) | |
with torch.no_grad(): | |
outputs = model(**inputs) | |
logits = outputs.logits | |
probabilities = torch.nn.functional.softmax(logits, dim=-1).numpy() | |
predicted_classes = probabilities.argmax(axis=1) | |
most_common_predicted_label = Counter(predicted_classes).most_common(1)[0][0] | |
replace_dict = {0: 'Hambre', 1: 'Problemas para respirar', 2: 'Dolor', 3: 'Cansancio/Incomodidad'} | |
most_common_predicted_label = replace_dict[most_common_predicted_label] | |
return most_common_predicted_label | |
def display_prediction(audio): | |
prediction = predict_audio(audio) | |
return f"<h3 style='text-align: center; font-size: 1.5em;'>Tu bebé llora por: {prediction}</h3>" | |
def clear_audio_input(audio): | |
return "" | |
''' | |
Monitor | |
''' | |
def process_audio_monitor(audio_tuple, target_sr=16000, target_duration=1.0): | |
data = [] | |
target_length = int(target_sr * target_duration) | |
wav_buffer = io.BytesIO() | |
sf.write(wav_buffer, audio_tuple[1], audio_tuple[0], format='wav') | |
wav_buffer.seek(0) | |
audio_data, sample_rate = sf.read(wav_buffer) | |
audio_data = audio_data.astype(np.float32) | |
if len(audio_data.shape) > 1: | |
audio_data = np.mean(audio_data, axis=1) | |
if sample_rate != target_sr: | |
audio_data = librosa.resample(audio_data, orig_sr=sample_rate, target_sr=target_sr) | |
audio_data, _ = librosa.effects.trim(audio_data) | |
if len(audio_data) > target_length: | |
for i in range(0, len(audio_data), target_length): | |
segment = audio_data[i:i + target_length] | |
if len(segment) == target_length: | |
data.append(segment) | |
else: | |
data.append(audio_data) | |
return data | |
#Sacamos extractor de características: | |
FEATURE_EXTRACTOR = Wav2Vec2FeatureExtractor.from_pretrained("ntu-spml/distilhubert") | |
#Y nuestro modelo: | |
model_monitor = HubertForSequenceClassification.from_pretrained("A-POR-LOS-8000/distilhubert-finetuned-cry-detector") | |
#Calculamos decibelios de lo que llega al gradio: | |
def compute_db(audio_data): | |
rms = np.sqrt(np.mean(np.square(audio_data))) | |
db = 20 * np.log10(rms + 1e-6) | |
db = round(db,2) | |
return db | |
#Función de extracción de características para el monitor: | |
def preprocess_audio_monitor(audio_segments): | |
inputs = FEATURE_EXTRACTOR( | |
audio_segments, | |
padding=True, | |
sampling_rate=16000, | |
max_length=int(16000*1), | |
return_tensors="pt" | |
) | |
return inputs | |
#Función de predicción en streaming: | |
def predict_audio_stream(audio_data, sample_rate): | |
audio_segments = process_audio_monitor(audio_data) | |
inputs = preprocess_audio_monitor(audio_segments) | |
with torch.no_grad(): | |
outputs = model_monitor(**inputs) | |
logits = outputs.logits | |
probabilities = torch.nn.functional.softmax(logits, dim=-1).numpy() | |
crying_probabilities = probabilities[:, 1] | |
avg_crying_probability = crying_probabilities.mean() | |
if avg_crying_probability < 0.25: | |
inputs = preprocess_audio(audio_segments) | |
with torch.no_grad(): | |
outputs = model(**inputs) | |
logits = outputs.logits | |
probabilities = torch.nn.functional.softmax(logits, dim=-1).numpy() | |
predicted_classes = probabilities.argmax(axis=1) | |
most_common_predicted_label = Counter(predicted_classes).most_common(1)[0][0] | |
replace_dict = {0: 'Hambre', 1: 'Problemas para respirar', 2: 'Dolor', 3: 'Cansancio/Incomodidad'} | |
most_common_predicted_label = replace_dict[most_common_predicted_label] | |
return "Está llorando", 1-avg_crying_probability, most_common_predicted_label | |
else: | |
return "No está llorando", 1-avg_crying_probability, "" | |
#Función que se encarga de indicarle al usuario si se ha pasado el umbral: | |
def update_status_to_predicting(audio, visual_threshold): | |
sample_rate, audio_data = audio | |
audio_data = np.array(audio_data, dtype=np.float32) | |
db_level = compute_db(audio_data) | |
db_level = round(db_level, 2) | |
if db_level < visual_threshold: | |
return f"Esperando... Decibelios: {db_level}" | |
else: | |
return f"Prediciendo... Decibelios: {db_level}" | |
#Función que realiza la predicción | |
def capture_and_predict(audio,visual_threshold, sample_rate=16000, duration=5): | |
sample_rate, audio_data = audio | |
audio_data = np.array(audio_data, dtype=np.float32) | |
db_level = compute_db(audio_data) | |
if db_level > visual_threshold: | |
max_samples = sample_rate * duration | |
audio_data = audio[:max_samples] | |
if len(audio_data) != 0: | |
result, probabilidad, result_2 = predict_audio_stream(audio_data, sample_rate) | |
if result == "Está llorando": | |
return f"{result}, por {result_2}" | |
else: | |
return "No está llorando" | |
''' | |
Asistente | |
''' | |
#Traemos el token: | |
access_token = os.getenv('HF_ACCESS_TOKEN') | |
#Generamos el cliente: | |
client = InferenceClient("mistralai/Mistral-Nemo-Instruct-2407", token=access_token) | |
#Generamos una función de respuesta: | |
def respond( | |
message, | |
history: list[tuple[str, str]], | |
system_message, | |
max_tokens, | |
temperature, | |
top_p, | |
): | |
messages = [{"role": "system", "content": system_message}] | |
for val in history: | |
if val[0]: | |
messages.append({"role": "user", "content": val[0]}) | |
if val[1]: | |
messages.append({"role": "assistant", "content": val[1]}) | |
messages.append({"role": "user", "content": message}) | |
response = "" | |
for message in client.chat_completion( | |
messages, | |
max_tokens=max_tokens, | |
stream=True, | |
temperature=temperature, | |
top_p=top_p, | |
): | |
token = message.choices[0].delta.content | |
response += token | |
yield response | |
''' | |
Interfaz | |
''' | |
#Generamos un theme con parámetros personalizados: | |
my_theme = gr.themes.Soft( | |
primary_hue="emerald", | |
secondary_hue="green", | |
neutral_hue="slate", | |
text_size="sm", | |
spacing_size="sm", | |
font=[gr.themes.GoogleFont('Nunito'), 'ui-sans-serif', 'system-ui', 'sans-serif'], | |
font_mono=[gr.themes.GoogleFont('Nunito'), 'ui-monospace', 'Consolas', 'monospace'], | |
).set( | |
body_background_fill='*neutral_50', | |
body_text_color='*neutral_600', | |
body_text_size='*text_sm', | |
embed_radius='*radius_md', | |
shadow_drop='*shadow_spread', | |
shadow_spread='*button_shadow_active' | |
) | |
#Función para mostrar la página del Predictor | |
def mostrar_pagina_1(): | |
return gr.update(visible=False), gr.update(visible=True) | |
#Función para regresar a la pantalla inicial | |
def redirigir_a_pantalla_inicial(): | |
return gr.update(visible=True), gr.update(visible=False) | |
#Generamos el gradio: | |
with gr.Blocks(theme = my_theme) as demo: | |
with gr.Column() as pantalla_inicial: | |
gr.HTML( | |
""" | |
<style> | |
@import url('https://fonts.googleapis.com/css2?family=Lobster&display=swap'); | |
@import url('https://fonts.googleapis.com/css2?family=Roboto&display=swap'); | |
h1 { | |
font-family: 'Lobster', cursive; | |
font-size: 5em !important; | |
text-align: center; | |
margin: 0; | |
} | |
h2 { | |
font-family: 'Lobster', cursive; | |
font-size: 3em !important; | |
text-align: center; | |
margin: 0; | |
} | |
h3 { | |
font-family: 'Roboto', sans-serif; | |
text-align: center; | |
font-size: 1.5em !important; | |
} | |
p.slogan, h4, p { | |
font-family: 'Roboto', sans-serif; | |
text-align: center; | |
} | |
</style> | |
<h1>Iremia</h1> | |
<h4 style='text-align: center; font-size: 1.5em'>El mejor aliado para el bienestar de tu bebé</h4> | |
""" | |
) | |
gr.Markdown("<h4 style='text-align: left; font-size: 1.5em;'>¿Qué es Iremia?</h4>") | |
gr.Markdown("<p style='text-align: left'>Iremia es un proyecto llevado a cabo por un grupo de estudiantes interesados en el desarrollo de modelos de inteligencia artificial, enfocados específicamente en casos de uso relevantes para ayudar a cuidar a los más pequeños de la casa.</p>") | |
gr.Markdown("<h4 style='text-align: left; font-size: 1.5em;'>Nuestra misión</h4>") | |
gr.Markdown("<p style='text-align: left'>Sabemos que la paternidad puede suponer un gran desafío. Nuestra misión es brindarles a todos los padres unas herramientas de última tecnología que los ayuden a navegar esos primeros meses de vida tan cruciales en el desarrollo de sus pequeños.</p>") | |
gr.Markdown("<h4 style='text-align: left; font-size: 1.5em;'>¿Qué ofrece Iremia?</h4>") | |
gr.Markdown("<p style='text-align: left'>Iremia ofrece dos funcionalidades muy interesantes:</p>") | |
gr.Markdown("<p style='text-align: left'>Predictor: Con nuestro modelo de inteligencia artificial, somos capaces de predecir por qué tu hijo de menos de 2 años está llorando. Además, tendrás acceso a un asistente personal para consultar cualquier duda que tengas sobre el cuidado de tu pequeño.</p>") | |
gr.Markdown("<p style='text-align: left'>Monitor: Nuestro monitor no es como otros que hay en el mercado, ya que es capaz de reconocer si un sonido es un llanto del bebé o no, y si está llorando, predice automáticamente la causa, lo cual te brindará la tranquilidad de saber siempre qué pasa con tu pequeño y te ahorrará tiempo y muchas horas de sueño.</p>") | |
gr.Markdown("<p style='text-align: left'>Asistente: Contamos con un chatbot que podrá responder cualquier duda que tengas sobre el cuidado de tu bebé.</p>") | |
with gr.Row(): | |
with gr.Column(): | |
boton_pagina_1 = gr.Button("¡Prueba nuestros modelos!") | |
gr.Markdown("<p>Descubre por qué llora tu bebé, prueba nuestro monitor inteligente y resuelve dudas sobre el cuidado de tu pequeño con nuestras herramientas de última tecnología</p>") | |
with gr.Column(visible=False) as pagina_1: | |
with gr.Row(): | |
with gr.Column(): | |
gr.Markdown("<h2>Predictor</h2>") | |
gr.Markdown("<h4 style='text-align: center; font-size: 1.5em'>Descubre por qué tu bebé está llorando</h4>") | |
audio_input = gr.Audio(type="numpy", label="Baby recorder") | |
classify_btn = gr.Button("¿Por qué llora?") | |
classification_output = gr.Markdown() | |
classify_btn.click(display_prediction, inputs=audio_input, outputs=classification_output) | |
audio_input.change(fn=clear_audio_input, inputs=audio_input, outputs=classification_output) | |
with gr.Column(): | |
gr.Markdown("<h2>Monitor</h2>") | |
gr.Markdown("<h4 style='text-align: center; font-size: 1.5em'>Detecta en tiempo real si tu bebé está llorando</h4>") | |
audio_stream = gr.Audio(sources=["microphone"], streaming=True) | |
threshold_db = gr.Slider(minimum=0, maximum=200, step=1, value=50, label="Umbral de dB para activar la predicción") | |
status_label = gr.Textbox(label="Estado") | |
prediction_label = gr.Textbox(label="Tu bebé:") | |
audio_stream.stream( | |
fn=update_status_to_predicting, | |
inputs=[audio_stream, threshold_db], | |
outputs=status_label | |
) | |
# Captura el audio y realiza la predicción si se supera el umbral | |
audio_stream.stream( | |
fn=capture_and_predict, | |
inputs=[audio_stream,threshold_db], | |
outputs=prediction_label | |
) | |
with gr.Row(): | |
with gr.Column(): | |
gr.Markdown("<h2>Asistente</h2>") | |
gr.Markdown("<h4 style='text-align: center; font-size: 1.5em'>Pregunta a nuestro asistente cualquier duda que tengas sobre tu pequeño</h4>") | |
system_message = "Eres un chatbot especializado en el cuidado y la salud de los bebés. Estás dispuesto a ayudar amablemente a cualquier padre que tenga dudas o preocupaciones sobre su hijo o hija." | |
max_tokens = 512 | |
temperature = 0.7 | |
top_p = 0.95 | |
chatbot = gr.ChatInterface( | |
respond, | |
additional_inputs=[ | |
gr.State(value=system_message), | |
gr.State(value=max_tokens), | |
gr.State(value=temperature), | |
gr.State(value=top_p) | |
], | |
) | |
gr.Markdown("Este chatbot no sustituye a un profesional de la salud. Ante cualquier preocupación o duda, consulta con tu pediatra.") | |
boton_volver_inicio_1 = gr.Button("Volver a la pantalla inicial") | |
boton_volver_inicio_1.click(redirigir_a_pantalla_inicial, inputs=None, outputs=[pantalla_inicial, pagina_1]) | |
boton_pagina_1.click(mostrar_pagina_1, inputs=None, outputs=[pantalla_inicial, pagina_1]) | |
demo.launch() |