import torch
import pytube as pt
from transformers import pipeline
import json
import whisper_timestamped as whispertime
from pydub import AudioSegment
from spleeter.separator import Separator
import os
from profanity_check import predict
import sys
import tempfile
import uuid
import shutil

import streamlit as st


MODEL_NAME = "openai/whisper-large-v2"

# Words (and masked variants) that get silenced in the vocal stem.
PROFANE_WORDS = [
    "fuck", "fucking", "fuckin", "f***", "f*****", "s***", "b****", "c***",
    "h**", "n*****", "p****", "dick", "slit", "slut", "pussy", "ass",
]

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
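
# Processing pipeline, in brief:
#   1. Spleeter splits the input into a vocal stem and an accompaniment stem.
#   2. whisper_timestamped transcribes the vocal stem with word-level timestamps.
#   3. Words matching PROFANE_WORDS are replaced with silence in the vocal stem.
#   4. The muted vocals are overlaid back onto the accompaniment and exported.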


def create_tmp_copy_of_file(file, dir=None):
    """
    Creates a temporary copy of the file and returns the path to the copy.

    :param file: the path to the file (or a dict with a "path" key)
    :param dir: optional directory to place the copy in
    :return: path to the temporary copy
    """
    if isinstance(file, dict):
        file_path = file["path"]
    else:
        file_path = file

    if dir is None:
        dir = tempfile.gettempdir()

    file_name = os.path.basename(file_path)
    tmp_path = os.path.join(dir, f"{uuid.uuid4()}_{file_name}")
    shutil.copy2(file_path, tmp_path)

    return tmp_path


def source_separation(input_file, output_folder="separated_audio"):
    # Split the input into vocal and accompaniment stems with Spleeter's 2-stem model.
    separator = Separator('spleeter:2stems')
    separator.separate_to_file(input_file, output_folder)
    return f"{output_folder}/{os.path.splitext(os.path.basename(input_file))[0]}"
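
# separate_to_file writes vocals.wav and accompaniment.wav into
# <output_folder>/<input basename>/, which is the directory returned above and
# consumed by process_audio below.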


def process_audio(input_file, model_size='tiny', verbose=False, play_output=False):
    if not os.path.isfile(input_file):
        print('Error: input file not found')
        sys.exit()

    # Separate the input into stems and locate the vocal / instrumental tracks.
    stems_dir = source_separation(input_file)
    vocal_stem = os.path.join(stems_dir, 'vocals.wav')
    instr_stem = os.path.join(stems_dir, 'accompaniment.wav')

    # Transcribe the vocal stem with word-level timestamps.
    model = whispertime.load_model(model_size, device=device)
    result = whispertime.transcribe(model, vocal_stem, language="en")

    if verbose:
        print('\nTranscribed text:')
        print(result['text'] + '\n')

    print(result["text"])

    # Flag profane words in the transcript with the profanity_check classifier.
    profane_indices = predict(result["text"].split())
    profanities = [word for word, is_profane in zip(result["text"].split(), profane_indices) if is_profane]

    if not profanities:
        print(f'No profanities detected in {input_file} - returning the original file')
        return input_file

    if verbose:
        print('Profanities found in text:')
        print(profanities)

    vocals = AudioSegment.from_wav(vocal_stem)

    # Replace every flagged word in the vocal stem with silence of equal length.
    for segment in result["segments"]:
        for word in segment["words"]:
            if word["text"].lower().strip('.,!?') in PROFANE_WORDS:
                start_time = int(word["start"] * 1000)
                end_time = int(word["end"] * 1000)
                silence = AudioSegment.silent(duration=(end_time - start_time))
                vocals = vocals[:start_time] + silence + vocals[end_time:]

    # Overlay the muted vocals back onto the instrumental stem.
    mix = AudioSegment.from_wav(instr_stem).overlay(vocals)
    outpath = input_file.replace('.mp3', '_masked.mp3').replace('.wav', '_masked.wav')

    # Pass the export format explicitly to pydub (mp3 export requires ffmpeg).
    out_format = os.path.splitext(outpath)[1].lstrip('.')
    mix.export(outpath, format=out_format)

    print(f'Mixed file written to: {outpath}')
    return outpath
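

# Example (illustrative only): assuming "song.mp3" exists locally and the Spleeter
# model weights plus ffmpeg are available, the masking step can be run on its own:
#     masked_path = process_audio("song.mp3", model_size="tiny", verbose=True)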


def transcribe(microphone=None, file_upload=None):
    if (microphone is not None) and (file_upload is not None):
        warn_output = (
            "WARNING: You've uploaded an audio file and used the microphone. "
            "The recorded file from the microphone will be used and the uploaded audio will be discarded.\n"
        )
        print(warn_output)
    elif (microphone is None) and (file_upload is None):
        return "ERROR: You have to either use the microphone or upload an audio file"

    file = microphone if microphone is not None else file_upload
    processed_file = process_audio(file)
    print('File successfully processed:', processed_file)

    return str(processed_file)


def _return_yt_html_embed(yt_url):
    video_id = yt_url.split("?v=")[-1]
    HTML_str = (
        f'<center> <iframe width="500" height="320" src="https://www.youtube.com/embed/{video_id}"> </iframe>'
        " </center>"
    )
    return HTML_str
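
# For example, _return_yt_html_embed("https://www.youtube.com/watch?v=EuEe3WKpbCo")
# returns an <iframe> pointing at https://www.youtube.com/embed/EuEe3WKpbCo
# (the placeholder link used in the UI below).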


def yt_transcribe(yt_url):
    yt = pt.YouTube(yt_url)
    html_embed_str = _return_yt_html_embed(yt_url)
    stream = yt.streams.filter(only_audio=True)[0]
    stream.download(filename="audio.mp3")

    processed_file = process_audio("audio.mp3")

    # Return the processed file's path; the Streamlit code below reads it back
    # from disk before playing it.
    return html_embed_str, processed_file
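
# Note (assumption, not asserted by the original code): pytube's audio-only
# streams are usually MP4/WebM audio even when saved under an .mp3 name;
# Spleeter loads the file through ffmpeg, which inspects the actual container
# rather than trusting the extension, so the mismatch is tolerated in practice.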


# Streamlit app layout.
st.title("Saylss - remove profane audio from uploaded content")

"""
Saylss censors profane audio inputs with the click of a button! Saylss uses a
custom Whisper model to timestamp the transcript at the word level, then several
audio manipulation libraries to mask the flagged words and return a clean version.

by olmec.ai & batman.
"""
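# The bare triple-quoted string above is rendered by Streamlit's "magic" feature,
# which displays standalone string literals in the script as Markdown.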

tab1, tab2 = st.tabs(["Transcribe Audio", "Transcribe YouTube"])

with tab1:
    uploaded_files = st.file_uploader(
        "Upload your audio file here",
        type=["mp3", "wav"],
        help="Drag and drop or click to choose file",
    )
    if uploaded_files is not None:
        bytes_data = uploaded_files.read()

        st.write("Your uploaded file")
        st.audio(bytes_data)

        st.markdown("---")
        st.write("## Your processed file")
        with st.spinner("...is being processed"):
            # Save the upload to disk so process_audio can read it by path.
            with open(uploaded_files.name, "wb") as f:
                f.write(uploaded_files.getbuffer())

            processed_audio = transcribe(microphone=None, file_upload=uploaded_files.name)

            with open(processed_audio, 'rb') as audio_file:
                audio_bytes2 = audio_file.read()
            st.audio(audio_bytes2)

with tab2:
    link = st.text_input(
        "Paste your YouTube link",
        placeholder="https://www.youtube.com/watch?v=EuEe3WKpbCo",
    )
    if link != "":
        try:
            st.video(link)
        except Exception:
            st.warning("Not a video")
            st.stop()

        with st.spinner("YouTube link is being processed"):
            html_embed_str, audio = yt_transcribe(link)

            with open(audio, 'rb') as audio_file:
                audio_bytes_yt = audio_file.read()
            st.audio(audio_bytes_yt)
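

# To try the app locally (assuming the dependencies above plus ffmpeg are
# installed), save this script as e.g. app.py and run:
#     streamlit run app.py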