"""Streamlit app: sentiment analysis of a Cantonese song.

Pipeline: isolate the vocals (Spleeter) -> crop a fixed excerpt (pydub) ->
transcribe the lyrics (Whisper) -> classify the sentiment of the lyrics.
"""
import io
import os

import accelerate
import librosa
import numpy as np
import streamlit as st
import torch
from IPython.display import Audio
from pydub import AudioSegment
from spleeter.separator import Separator
from transformers import WhisperForConditionalGeneration, WhisperProcessor
from transformers import pipeline

# NOTE: real-time capture/playback through PyAudio is disabled in this file;
# the import and the PyAudio object below are intentionally left commented out.
#import pyaudio
# Create PyAudio object
#p = pyaudio.PyAudio()

CHUNK_SIZE = 1024
SAMPLING_RATE = 16000  # sample rate handed to librosa and the Whisper processor

# PCM layout of the separated vocals kept in `vocals_data`.
SEPARATION_RATE = 44100    # Hz; the audio is resampled to this before Spleeter
SEPARATION_CHANNELS = 2    # Spleeter is fed a (n_samples, 2) waveform
SAMPLE_WIDTH = 2           # bytes per sample (signed 16-bit)

# Excerpt of the song that gets transcribed, in milliseconds (pydub slices
# AudioSegments by milliseconds): from 1:00 to 1:50.
CROP_START_MS = 60000
CROP_END_MS = 110000

ASR_MODEL_NAME = "RexChan/ISOM5240-whisper-small-zhhk_1"
SENTIMENT_MODEL_NAME = "lxyuan/distilbert-base-multilingual-cased-sentiments-student"

# Interleaved little-endian int16 PCM of the most recently separated vocals.
vocals_data = bytes()


def _crop_vocals(pcm, start_ms=CROP_START_MS, end_ms=CROP_END_MS):
    """Wrap raw vocal PCM in an AudioSegment and cut out [start_ms, end_ms)."""
    if not pcm:
        raise ValueError(
            "no separated vocals available; run audio_preprocess() first"
        )
    track = AudioSegment(
        data=pcm,
        sample_width=SAMPLE_WIDTH,
        frame_rate=SEPARATION_RATE,
        channels=SEPARATION_CHANNELS,
    )
    excerpt = track[start_ms:end_ms]
    # A song shorter than start_ms yields an empty slice; transcribing the
    # whole track is more useful than transcribing nothing.
    return excerpt if len(excerpt) > 0 else track


# preprocess and crop audio file
def audio_preprocess(input_file):
    """Separate the vocals of a song and return a cropped excerpt as WAV.

    Args:
        input_file: Path to (or binary file-like object of) an audio file
            that pydub can decode, e.g. an MP3.

    Returns:
        io.BytesIO: WAV-encoded excerpt of the vocals, positioned at offset 0
        so it can be passed directly to ``asr_model``.

    Side effects:
        Stores the full separated vocal track as raw PCM in the module-level
        ``vocals_data``.
    """
    global vocals_data

    # Decode the compressed file into 16-bit stereo PCM. Reading the file's
    # raw bytes as int16 (as done previously) interprets MP3 data as samples.
    decoded = (
        AudioSegment.from_file(input_file)
        .set_frame_rate(SEPARATION_RATE)
        .set_channels(SEPARATION_CHANNELS)
        .set_sample_width(SAMPLE_WIDTH)
    )
    samples = np.array(decoded.get_array_of_samples(), dtype=np.float32)
    # De-interleave into (n_samples, channels) and normalise to [-1, 1).
    waveform = samples.reshape(-1, SEPARATION_CHANNELS) / 32768.0

    # Perform vocal removal on the audio input
    stems = Separator('spleeter:2stems').separate(waveform)

    # Convert vocals back to int16 PCM; scale before casting, otherwise the
    # float samples would all be truncated towards zero.
    vocals = np.clip(stems['vocals'], -1.0, 1.0)
    vocals_data = (vocals * 32767.0).astype('<i2').tobytes()

    excerpt = _crop_vocals(vocals_data)
    wav_stream = io.BytesIO()
    excerpt.export(wav_stream, format='wav')
    wav_stream.seek(0)
    return wav_stream


# audio processing 2?
def py_audio():
    """Return the cropped vocal excerpt of the last separated song.

    The live PyAudio record/playback loop that used to live here relied on
    the disabled ``pyaudio`` import and on undefined names, so only the
    cropping step remains.

    Returns:
        AudioSegment: vocals between ``CROP_START_MS`` and ``CROP_END_MS``.

    Raises:
        ValueError: if ``audio_preprocess`` has not produced any vocals yet.
    """
    return _crop_vocals(vocals_data)


# ASR transcription
def asr_model(processed_audio):
    """Transcribe audio with the fine-tuned Cantonese Whisper model.

    Args:
        processed_audio: Anything ``librosa.load`` accepts (path or
            file-like object).

    Returns:
        str: The decoded transcription.
    """
    # load audio file
    y, sr = librosa.load(processed_audio, sr=SAMPLING_RATE)

    # ASR model
    processor = WhisperProcessor.from_pretrained(ASR_MODEL_NAME)
    model = WhisperForConditionalGeneration.from_pretrained(
        ASR_MODEL_NAME, low_cpu_mem_usage=True
    )
    model.config.forced_decoder_ids = None
    model.config.suppress_tokens = []
    model.config.use_cache = False

    # NOTE(review): Whisper's feature extractor pads/truncates to a 30 s
    # window by default, so a longer excerpt is likely transcribed only up
    # to 30 s -- confirm and chunk the input if full coverage is required.
    processed_in = processor(y, sampling_rate=sr, return_tensors="pt")
    gout = model.generate(
        input_features=processed_in.input_features,
        output_scores=True,
        return_dict_in_generate=True,
    )
    transcription = processor.batch_decode(
        gout.sequences, skip_special_tokens=True
    )[0]

    # print result
    print(f"Song lyrics = {transcription}")

    return transcription


# sentiment analysis
def senti_model(transcription):
    """Classify the sentiment of the lyrics and return a display sentence.

    Args:
        transcription: Lyrics text to classify.

    Returns:
        str: Human-readable summary with the top label and its score.
    """
    pipe = pipeline("text-classification", model=SENTIMENT_MODEL_NAME)
    final_result = pipe(transcription)
    display = f"Sentiment Analysis shows that this song is {final_result[0]['label']}. Confident level of this analysis is {final_result[0]['score']*100:.1f}%."
    print(display)
    return display


# main
def main(input_file):
    """Run the full pipeline on ``input_file`` and render the results."""
    processed_audio = audio_preprocess(input_file)
    # Snapshot the WAV bytes before librosa consumes the stream.
    excerpt_wav = processed_audio.getvalue()

    transcription = asr_model(processed_audio)
    final_result = senti_model(transcription)
    st.write(final_result)

    # Render the player directly: a button nested inside the "Run Analysis"
    # branch triggers a rerun in which that branch is no longer taken, so
    # the audio would never be shown.
    st.audio(excerpt_wav, format="audio/wav", start_time=0)


if __name__ == '__main__':
    # Streamlit setup
    st.set_page_config(page_title="Sentiment Analysis on Your Cantonese Song",)
    st.header("Cantonese Song Sentiment Analyzer")

    # os.path.isfile() returns a bool, so keep the path itself (or None).
    default_song = "test1.mp3"
    input_file = default_song if os.path.isfile(default_song) else None
    #input_file = st.file_uploader("upload a song in mp3 format", type="mp3") # upload song

    if input_file is not None:
        st.write("File uploaded successfully!")
        st.write(input_file)
    else:
        st.write("No file uploaded.")

    button_click = st.button("Run Analysis", type="primary")
    if button_click:
        if input_file is None:
            st.error("Cannot run the analysis without an audio file.")
        else:
            main(input_file=input_file)