Spaces:
Running
Running
import gradio as gr | |
import pandas as pd | |
import numpy as np | |
from df.enhance import enhance, init_df, load_audio, save_audio | |
import time | |
import os | |
import gradio as gr | |
import re | |
from gradio.themes.base import Base | |
from datasets import load_dataset | |
from datasets import Dataset,DatasetDict | |
import librosa | |
import torch | |
model_enhance, df_state, _ = init_df() | |
def Read_DataSet(link): | |
dataset = load_dataset(link,token=os.environ.get("auth_acess_data")) | |
df = dataset["train"].to_pandas() | |
return df | |
def remove_nn(wav, sample_rate=16000): | |
audio=librosa.resample(wav,orig_sr=sample_rate,target_sr=df_state.sr(),) | |
audio=torch.tensor([audio]) | |
# audio, _ = load_audio('full_generation.wav', sr=df_state.sr()) | |
print(audio) | |
enhanced = enhance(model_enhance, df_state, audio) | |
print(enhanced) | |
# save_audio("enhanced.wav", enhanced, df_state.sr()) | |
audiodata=librosa.resample(enhanced[0].numpy(),orig_sr=df_state.sr(),target_sr=sample_rate) | |
return 16000, audiodata/np.max(audiodata) | |
class DataViewerApp: | |
def __init__(self,df): | |
#df=Read_DataSet(link) | |
self.df=df | |
# self.df1=df | |
self.data =self.df[['text','speaker_id','secs','flag']] | |
self.dataa =self.df[['text','speaker_id','secs','flag']] | |
self.sdata =self.df['audio'].to_list() # Separate audio data storage | |
self.current_page = 0 | |
self.current_selected = -1 | |
self.speaker_id= -1 | |
class Seafoam(Base): | |
pass | |
self.seafoam = Seafoam() | |
#self.data =df[['text','speaker_id']] | |
#self.sdata = df['audio'].to_list() # Separate audio data storage | |
#self.current_page = 0 | |
#self.current_selected = -1 | |
def set1(self,df): | |
self.data =df[['text','speaker_id','secs','flag']] | |
self.sdata =df['audio'].to_list() | |
return self.get_page_data(self.current_page) | |
def settt(self,df): | |
self.df=pd.DataFrame() | |
self.data =pd.DataFrame() | |
self.sdata =[] | |
self.df=df | |
self.data =df[['text','speaker_id','secs','flag']] | |
self.dataa =df[['text','speaker_id','secs','flag']] | |
self.sdata =df['audio'].to_list() | |
self.current_page = 0 | |
self.current_selected =1 | |
self.speaker_id= -1 | |
return self.data | |
def clear(self,text): | |
text=re.sub(r'[a-zA-Z]', '', text) | |
return text | |
def clearenglish(self): | |
for i in range(len(self.df)): | |
x=self.clear(self.df['text'][i]) | |
x1=self.df['text'][i] | |
if x!=x1: | |
self.df.drop(i, inplace=True) | |
self.df.reset_index(drop=True, inplace=True) | |
return self.settt(self.df) | |
def splitt(self,link,num): | |
df=download_youtube_video(link,num) | |
v=self.settt(df) | |
return self.get_page_data(self.current_page),len(v) | |
def getdataset(self,link): | |
self.link_dataset=link | |
df=Read_DataSet(link) | |
v=self.settt(df) | |
return self.get_page_data(self.current_page),len(v),self.link_dataset | |
def remove_hamza_from_alif_and_symbols(self,text): | |
text = re.sub(r"[أإآ]", "ا", text) | |
text = re.sub(r"ٱ", "ا", text) | |
text = re.sub(r"[_\-\+\,\(\)]", " ", text) | |
text = re.sub(r"\d", " ", text) | |
return text | |
def save_row(self, text,data_oudio): | |
if text!="" : | |
row = self.data.iloc[self.current_selected] | |
row['text'] = text | |
row['flag']=1 | |
self.data.iloc[self.current_selected] = row | |
sr,audio=data_oudio | |
if sr!=16000: | |
audio=audio.astype(np.float32) | |
audio/=np.max(np.abs(audio)) | |
audio=librosa.resample(audio,orig_sr=sr,target_sr=16000) | |
self.sdata[self.current_selected] = audio | |
self.df['text'][self.current_selected] =text | |
self.df['audio'][self.current_selected] = audio | |
self.df['flag'][self.current_selected] =1 | |
return self.get_page_data(self.current_page),None,"" | |
def GetDataset_2(self,filename,ds=1.5): | |
audios_data = [] | |
audios_samplerate = [] | |
num_specker=[] | |
texts=[] | |
secs=[] | |
audiodata,samplerate = librosa.load(filename, sr=16000) # Removed extra indent here | |
audios_data.append(audiodata*ds) | |
audios_samplerate.append(samplerate) | |
texts.append(filename.replace('.wav','')) | |
secs.append(round(len(audiodata)/samplerate,2)) | |
df = pd.DataFrame() | |
df['secs'] = secs | |
df['audio'] = audios_data | |
df['samplerate'] = audios_samplerate | |
df['text'] =os.path.splitext(os.path.basename(filename))[0] | |
df['speaker_id'] =self.speaker_id | |
df['_speaker_id'] =self.speaker_id | |
df['flag']=1 | |
df = df[['text','audio','samplerate','secs','speaker_id','_speaker_id','flag']] | |
self.df = pd.concat([self.df, df], axis=0, ignore_index=True) | |
self.data =self.df[['text','speaker_id','secs','flag']] | |
self.sdata =self.df['audio'].to_list() | |
return self.get_page_data(self.current_page) | |
def trim_audio(self, text,data_oudio): | |
if text!="" : | |
audios_data = [] | |
audios_samplerate = [] | |
sr,audio=data_oudio | |
audio=audio.astype(np.float32) | |
audio/=np.max(np.abs(audio)) | |
audio=librosa.resample(audio,orig_sr=sr,target_sr=16000) | |
audios_data.append(audio) | |
secs=round(len(audios_data)/16000,2) | |
audios_samplerate.append(16000) | |
df = pd.DataFrame() | |
df['secs'] = secs | |
df['audio'] =[ audio] | |
df['samplerate'] = 16000 | |
df['text'] =text | |
df['speaker_id'] =self.speaker_id | |
df['_speaker_id'] =self.speaker_id | |
df['flag']=1 | |
df = df[['text','audio','samplerate','secs','speaker_id','_speaker_id','flag']] | |
self.df = pd.concat([self.df, df], axis=0, ignore_index=True) | |
self.data =self.df[['text','speaker_id','secs','flag']] | |
self.sdata =self.df['audio'].to_list() | |
return self.get_page_data(self.current_page),None,"" | |
def order_data(self): | |
self.df[['text','speaker_id','secs','flag']]=self.data | |
self.df=self.df.sort_values(by=['flag'], ascending=False) | |
vv=self.settt(self.df) | |
return vv | |
def connect_drive(self): | |
from google.colab import drive | |
drive.mount('/content/drive') | |
def get_page_data(self, page_number): | |
start_index = page_number * 10 | |
end_index = start_index + 10 | |
return self.data.iloc[start_index:end_index] | |
def update_page(self, new_page): | |
self.current_page = new_page | |
return ( | |
self.get_page_data(self.current_page), | |
self.current_page > 0, | |
self.current_page < len(self.data) // 10 - 1, | |
self.current_page | |
) | |
def clear_txt(self): | |
self.data['text'] =self.data['text'].apply(self.remove_hamza_from_alif_and_symbols) | |
return self.get_page_data(self.current_page) | |
def get_text_from_audio(self,audio): | |
if len(audio)!=0: | |
sf.write("temp.wav", audio, 16000,format='WAV') | |
client = Client("MohamedRashad/Arabic-Whisper-CodeSwitching-Edition") | |
result = client.predict( | |
inputs=handle_file('temp.wav'), | |
api_name="/predict_1" | |
) | |
return result | |
else: | |
return "" | |
def on_column_dropdown_change_operater(self,selected_column,selected_column1): | |
if selected_column1==">": | |
return self.data[self.data['secs'] > selected_column ] | |
elif selected_column1=="<": | |
return self.data[self.data['secs'] < selected_column] | |
elif selected_column1=="=": | |
return self.data[self.data['secs'] == selected_column] | |
else: | |
return self.data | |
# Perform actions based on the selected column | |
def on_column_dropdown_change(self,selected_column): | |
data=self.df | |
if selected_column=="all": | |
return self.set1(data),len(data) | |
elif selected_column=="0": | |
data=data[data['flag'] ==0] | |
return self.set1(data),len(data) | |
else : | |
data=data[data['flag'] ==1] | |
return self.set1(data),len(data) | |
def on_select(self,evt:gr.SelectData): | |
index_now = evt.index[0] | |
self.current_selected = (self.current_page * 10) + index_now | |
row = self.data.iloc[self.current_selected] | |
row_audio = self.sdata[self.current_selected] | |
self.speaker_id=row['speaker_id'] | |
return (16000, row_audio), row['text'] | |
def finsh_data(self): | |
self.df['audio'] = self.sdata | |
self.df[['text','speaker_id','secs','flag']]=self.data | |
return self.df | |
def All_enhance(self): | |
for i in range(0,len(self.sdata)): | |
_,y=remove_nn(self.sdata[i]) | |
self.sdata[i]=y | |
return self.data | |
return self.get_page_data(self.current_page) | |
def get_output_audio(self): | |
return self.sdata[self.current_selected] if self.current_selected >= 0 else None | |
def Convert_DataFreme_To_DataSet(self,namedata): | |
df=self.df | |
df['audio'] = df['audio'].apply(lambda x: np.array(x, dtype=np.float32)) | |
if "__index_level_0__" in df.columns: | |
df =df.drop(columns=["__index_level_0__"]) | |
train_df =df | |
ds = { | |
"train": Dataset.from_pandas(train_df) | |
} | |
dataset = DatasetDict(ds) | |
dataset.push_to_hub(namedata,token=os.environ.get("auth_acess_data"),private=True) | |
return namedata | |
def delete_row(self): | |
if len(self.data)!=0 or self.current_selected != -1 : | |
self.data.drop(self.current_selected, inplace=True) | |
self.data.reset_index(drop=True, inplace=True) | |
self.df.drop(self.current_selected, inplace=True) | |
self.df.reset_index(drop=True, inplace=True) | |
self.sdata.pop(self.current_selected) | |
self.current_selected = -1 | |
# self.audio_player.update(None) # Clear audio player | |
# self.txt_audio.update("") # Clear text input | |
return self.get_page_data(self.current_page),None,"" | |
def login(self, token): | |
# Your actual login logic here (e.g., database check) | |
if token == os.environ.get("token_login") : | |
return gr.update(visible=False),gr.update(visible=True),True | |
else: | |
return gr.update(visible=True), gr.update(visible=False),None | |
def load_demo(self,sesion): | |
if sesion: | |
return gr.update(visible=False),gr.update(visible=True) | |
return gr.update(visible=True), gr.update(visible=False) | |
def start_tab1(self): | |
with gr.Blocks(theme=self.seafoam, css=""" | |
table.svelte-82jkx.svelte-82jkx{ | |
font-size: x-small; | |
} | |
.checkbox-group label { | |
background-color: #f0f0f5; /* لون خلفية فاتح */ | |
padding: 10px; | |
border-radius: 5px; /* زوايا دائرية */ | |
} | |
const textbox = document.querySelector('.txt_audio'); // تحديد المكون النصي | |
textbox.style.direction = 'ltr'; | |
.checkbox-group input:checked + label { | |
background-color: #e0f0ff; /* لون خلفية عند التحديد */ | |
font-weight: bold; | |
} | |
""") as demo: | |
sesion_state = gr.State() | |
with gr.Column(scale=1, min_width=200,visible=True) as login_panal: # Login panel | |
gr.Markdown("## auth acess page") | |
token_login = gr.Textbox(label="token") | |
login_button = gr.Button("Login") | |
with gr.Column(scale=1, visible=False) as main_panel: | |
with gr.Row(equal_height=False): | |
with gr.Tabs(): | |
with gr.TabItem("Processing Data "): | |
self.data_Processing() | |
login_button.click(self.login, inputs=[token_login], outputs=[login_panal,main_panel,sesion_state]) | |
demo.load(self.load_demo, [sesion_state], [login_panal,main_panel]) | |
return demo | |
def create_Tabs(self): # fix: method was missing | |
#with gr.Blocks() as interface: | |
with gr.Tabs(): | |
with gr.TabItem("Excel"): | |
with gr.Row(): | |
txt_filepath_excel=gr.Text("NameFile") | |
txt_text_excel=gr.Text("Text" ) | |
but_send_excel=gr.Button("Send",size="sm") | |
with gr.TabItem("CVC"): | |
with gr.Row(): | |
txt_filepath_cvc=gr.Text("File") | |
txt_text_cvc=gr.Text("Text" ) | |
but_send_cvc=gr.Button("Send",size="sm") | |
with gr.TabItem("DateSet"): | |
self.txt_filepath_dir=gr.Text(placeholder="link dir",interactive=True) | |
#self.txt_text=gr.Text("Text" ) | |
self.but_send_dir=gr.Button("Send",size="sm") | |
with gr.TabItem("Dir"): | |
txt_filepath_dateSet=gr.Text("link DateSet") | |
#self.txt_text=gr.Text("Text" ) | |
but_send_dateSet=gr.Button("Send",size="sm") | |
with gr.TabItem("Cut Video"): | |
self.txt_filepath_dateSet=gr.Text("رابط الفيديو",interactive=True) | |
self.num = gr.Number(label=" ادخل رقم طبيعي") | |
self.but_send_dateSet_cut=gr.Button("Send",size="sm") | |
def Convert_DataFrame_to_Bitch(self): | |
with gr.Row(): | |
self.txt_output_dir=gr.Text("output Name dir",interactive=True) | |
self.txt_train_batch_size=gr.Text("train_batch_size",interactive=True) | |
self.txt_eval_batch_size=gr.Text("eval_batch_size",interactive=True ) | |
self.but_convert_bitch=gr.Button("Convert Bitch",size="sm") | |
with gr.Row(): | |
self.label_Bitch=gr.Label("Dir Output Bitch :") | |
def data_Processing(self): | |
#with gr.Column(scale=2,min_width=40): | |
#with gr.Row(): | |
#with gr.Accordion("Open Data", open=False): | |
#with gr.Row(): | |
# self.txt_filepath_dateSet=gr.Text("link DateSet",interactive=True) | |
#self.txt_text=gr.Text("Text" ) | |
#self.but_send_dateSet=gr.Button("Send",size="sm") | |
with gr.Accordion("Install Data", open=False): | |
with gr.Row(): | |
self.create_Tabs() | |
with gr.Row(): | |
columns = [] | |
columns1 = [] | |
columns =["all","0","1"] | |
columns.append("all") | |
self.labell=gr.Label("count:") | |
self.column_dropdown = gr.Dropdown(choices=columns, label="speaker_id") | |
with gr.Row(): | |
columns1=unique_speaker_ids =self.df['secs'].unique().tolist() | |
columns1.append("all") | |
self.column_dropdown1 = gr.Dropdown(choices=columns1 , label="secs") | |
self.column_dropdown11 = gr.Dropdown(choices=["all","<",">","="], label="operater") | |
with gr.Row(): | |
with gr.Column(scale=5): | |
gr.Markdown("## Data Viewer") | |
#d=self.get_page_data(self.current_page) | |
# Correct the indentation here: | |
self.data_table = gr.DataFrame( # Notice 'self.' here | |
value=self.get_page_data(self.current_page), | |
headers=["Text","speaker_id"]) | |
# interactive=True | |
#self.data_table1 = gr.DataFrame(headers=[ "Text","Id_spiker"]) | |
with gr.Row(equal_height=False): | |
self.prev_button = gr.Button("<",scale=1, size="sm",min_width=30) | |
self.page_number = gr.Number(value=self.current_page + 1, label="Page",scale=1,min_width=100) | |
self.next_button = gr.Button(">",scale=1, size="sm",min_width=30) | |
with gr.Row(equal_height=False): | |
#inputs=gr.CheckboxGroup(["John", "Mary", "Peter", "Susan"]) | |
self.but_cleartxt=gr.Button("clear Text",variant="primary",size="sm",min_width=30) | |
self.btn_all_enhance=gr.Button("All enhance",size="sm",variant="primary",min_width=30) | |
self.btn_ClearEnglish=gr.Button("ClearEnglish",size="sm",variant="primary",min_width=30) | |
with gr.Column(scale=4): | |
gr.Markdown("## Row Data") | |
self.txt_audio = gr.Textbox(label="Text", interactive=True,rtl=True) | |
with gr.Row(equal_height=False): | |
self.audio_player = gr.Audio(label="Audio") | |
with gr.Row(equal_height=False): | |
self.btn_del = gr.Button("Delete ", size="sm",variant="primary",min_width=50) | |
self.btn_save = gr.Button("Save", size="sm",variant="primary",min_width=50) | |
self.totext=gr.Button("to text",size="sm" ,variant="primary",min_width=50) | |
# with gr.Row(equal_height=False): | |
with gr.Row(equal_height=False): | |
self.btn_newsave=gr.Button("New Save Cut",size="sm",variant="primary",min_width=50) | |
self.btn_enhance = gr.Button("enhance ", size="sm",variant="primary",min_width=50) | |
self.order= gr.Button("order ", size="sm",variant="primary",min_width=50) | |
with gr.Row(equal_height=False,variant="heading-1"): | |
with gr.Accordion("Save Bitch", open=False): | |
self.txt_dataset=gr.Text("save dataset",interactive=True) | |
self.btn_convertDataset=gr.Button("Dir Output Bitch :",variant="primary") | |
self.label_dataset=gr.Label("count:") | |
self.order.click(self.order_data,[],[self.data_table]) | |
self.btn_ClearEnglish.click(self.clearenglish,[],[self.data_table]) | |
self.but_send_dir.click(self.getdataset, [self.txt_filepath_dir],[self.data_table,self.labell,self.txt_dataset]) | |
#self.but_send_dateSet_cut.click(self.splitt, [self.txt_filepath_dateSet,self.num],[self.data_table,self.labell]) | |
#self.txt_audio.Style(container=False, css=".txt_audio { direction: rtl; }") | |
#self.but_send_dateSet.click(self.Read_DataSet, [self.txt_filepath_dateSet],[self.data_table ]) | |
self.data_table.select(self.on_select, None, [self.audio_player, self.txt_audio]) | |
self.prev_button.click(lambda page: self.update_page(page - 1), [self.page_number], [self.data_table, self.prev_button, self.next_button, self.page_number]) | |
#self.btn_save.click(self.save_row, [self.txt_audio,self.audio_player], [self.data_table]) | |
self.next_button.click(lambda page: self.update_page(page + 1), [self.page_number], [self.data_table, self.prev_button, self.next_button, self.page_number]) | |
self.column_dropdown.change(self.on_column_dropdown_change,[self.column_dropdown], [self.data_table,self.labell]) | |
self.column_dropdown11.change(self.on_column_dropdown_change_operater,[self.column_dropdown1,self.column_dropdown11], [self.data_table]) | |
self.btn_convertDataset.click(self.Convert_DataFreme_To_DataSet,[self.txt_dataset],[self.label_dataset]) | |
self.totext.click(lambda:self.get_text_from_audio(self.get_output_audio()), [], self.txt_audio) | |
self.btn_newsave.click(self.trim_audio,[self.txt_audio,self.audio_player],[self.data_table,self.audio_player,self.txt_audio]) | |
self.btn_save.click(self.save_row, [self.txt_audio,self.audio_player], [self.data_table,self.audio_player,self.txt_audio]) | |
#self.btn_save.click(self.save_row, [self.txt_audio,self.audio_player], [self.data_table]) | |
self.btn_all_enhance.click(self.All_enhance,[],[self.data_table]) | |
#self.btn_enhance.click(remove_nn, [self.audio_player], [self.audio_player]) | |
self.but_cleartxt.click(self.clear_txt,[],[self.data_table]) | |
self.btn_del.click(self.delete_row,[], [self.data_table,self.audio_player,self.txt_audio]) | |
self.btn_enhance.click(lambda: remove_nn(self.get_output_audio()), [], self.audio_player) | |
#self.column_dropdown.change(lambda selected_column:self.settt(self.on_column_dropdown_change(selected_column)), [self.column_dropdown], [self.data_table]) | |
#self.column_dropdown.change(lambda selected_column:self.settt(x.on_column_dropdown_change(selected_column)), [x.column_dropdown], [self.data_table]) | |
#self.btn_denoise.click(self.remove_nn, [self.audio_player], [self.audio_player]) | |
dff=pd.DataFrame(columns=['text', 'audio', 'samplerate', 'secs', 'speaker_id', '_speaker_id','flag']) | |
app=DataViewerApp(dff) | |
s=app.start_tab1() | |
s.launch() |