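"""Streamlit app: sentiment analysis on a Cantonese song.

Pipeline (as implemented below): separate the vocals from the music with
Spleeter, crop the vocal track to a fixed window, transcribe the lyrics with
a fine-tuned Whisper model, then run sentiment analysis on the transcription.
"""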
import streamlit as st
from transformers import WhisperForConditionalGeneration, WhisperProcessor
from transformers import pipeline
import librosa
import torch
from spleeter.separator import Separator
from pydub import AudioSegment
import os
import accelerate  # needed by low_cpu_mem_usage=True in asr_model
import numpy as np
#import pyaudio  # only needed for the experimental py_audio() path below

# Constants for the (experimental) real-time streaming path
CHUNK_SIZE = 1024       # bytes per playback chunk
SAMPLING_RATE = 16000   # Hz, matches Whisper's expected input rate
vocals_data = bytes()   # shared buffer of separated vocal audio
# Preprocess: separate the vocals from the music, then crop to a fixed window
def audio_preprocess(input_file):
    # Separate music and vocals with Spleeter's 2-stem model
    separator = Separator('spleeter:2stems')
    separator.separate_to_file(input_file, 'output')
    # Spleeter writes the stems to output/<track_name>/vocals.wav
    track_name = os.path.splitext(os.path.basename(input_file))[0]
    separated_audio = os.path.join('output', track_name, 'vocals.wav')

    # Crop the vocal track (times in milliseconds)
    start_time = 60000   # e.g. 60 seconds
    end_time = 110000    # e.g. 110 seconds
    audio = AudioSegment.from_file(separated_audio)
    cropped_audio = audio[start_time:end_time]
    processed_audio = 'cropped_vocals.wav'
    cropped_audio.export(processed_audio, format='wav')  # save vocal audio file
    return processed_audio


# Experimental real-time path: record and play back buffered vocal audio with
# PyAudio. Not wired into the Streamlit flow; requires uncommenting the
# pyaudio import above.
def py_audio():
    global vocals_data
    # Create PyAudio object
    p = pyaudio.PyAudio()

    # Callback invoked by PyAudio for each recorded chunk; it passes the raw
    # input through unchanged (live vocal separation is not implemented here)
    def process_audio(in_data, frame_count, time_info, status):
        return in_data, pyaudio.paContinue

    # Open a duplex stream for recording
    stream = p.open(format=pyaudio.paInt16, channels=1, rate=SAMPLING_RATE,
                    input=True, output=True,
                    frames_per_buffer=CHUNK_SIZE, stream_callback=process_audio)
    # Start stream
    stream.start_stream()
    # Create a second stream for playback
    playback_stream = p.open(format=pyaudio.paInt16, channels=1,
                             rate=SAMPLING_RATE, output=True)
    # Play buffered vocal data in real time, chunk by chunk
    while stream.is_active():
        if len(vocals_data) >= CHUNK_SIZE:
            playback_stream.write(vocals_data[:CHUNK_SIZE])
            vocals_data = vocals_data[CHUNK_SIZE:]
    # Stop and close both streams
    stream.stop_stream()
    stream.close()
    playback_stream.stop_stream()
    playback_stream.close()
    # Terminate PyAudio object
    p.terminate()
# ASR: transcribe the cropped vocals with a fine-tuned Cantonese Whisper model
def asr_model(processed_audio):
    # Load the audio file at Whisper's expected 16 kHz sampling rate
    y, sr = librosa.load(processed_audio, sr=16000)

    MODEL_NAME = "RexChan/ISOM5240-whisper-small-zhhk_1"
    processor = WhisperProcessor.from_pretrained(MODEL_NAME)
    model = WhisperForConditionalGeneration.from_pretrained(MODEL_NAME, low_cpu_mem_usage=True)
    model.config.forced_decoder_ids = None
    model.config.suppress_tokens = []
    model.config.use_cache = False

    processed_in = processor(y, sampling_rate=sr, return_tensors="pt")
    gout = model.generate(
        input_features=processed_in.input_features,
        output_scores=True, return_dict_in_generate=True,
    )
    transcription = processor.batch_decode(gout.sequences, skip_special_tokens=True)[0]
    # print result
    print(f"Song lyrics = {transcription}")
    return transcription
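
# Note: with return_dict_in_generate=True, generate() returns an output object
# whose .sequences attribute holds the generated token ids; batch_decode above
# turns those ids back into text.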
# Sentiment analysis on the transcribed lyrics
def senti_model(transcription):
    pipe = pipeline("text-classification", model="lxyuan/distilbert-base-multilingual-cased-sentiments-student")
    final_result = pipe(transcription)
    display = (f"Sentiment analysis shows that this song is {final_result[0]['label']}. "
               f"Confidence level of this analysis is {final_result[0]['score']*100:.1f}%.")
    print(display)
    return display
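
# A text-classification pipeline returns a list of {'label', 'score'} dicts,
# e.g. [{'label': 'positive', 'score': 0.93}] (the score here is illustrative).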
# Main pipeline: preprocess -> transcribe -> sentiment
def main(input_file):
    processed_audio = audio_preprocess(input_file)
    transcription = asr_model(processed_audio)
    final_result = senti_model(transcription)
    st.write(final_result)
    if st.button("Play Audio"):
        st.audio(processed_audio, format="audio/wav", start_time=0)
if __name__ == '__main__':
    # Streamlit setup
    st.set_page_config(page_title="Sentiment Analysis on Your Cantonese Song")
    st.header("Cantonese Song Sentiment Analyzer")

    # Use the bundled demo file if present; the uploader is the alternative path
    input_file = "test1.mp3" if os.path.isfile("test1.mp3") else None
    #input_file = st.file_uploader("Upload a song in mp3 format", type="mp3")
    if input_file is not None:
        st.write("File uploaded successfully!")
        st.write(input_file)
    else:
        st.write("No file uploaded.")

    button_click = st.button("Run Analysis", type="primary")
    if button_click:
        main(input_file=input_file)
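
# To run locally (assuming this file is saved as app.py and the dependencies
# above are installed): streamlit run app.py
# A sample file named test1.mp3 in the working directory enables the demo path.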