Spaces:
Running
on
Zero
Running
on
Zero
import gradio as gr | |
import torch | |
import spaces | |
import torchaudio | |
from whisperspeech.vq_stoks import RQBottleneckTransformer | |
from encodec.utils import convert_audio | |
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, pipeline | |
from transformers import StoppingCriteria, StoppingCriteriaList, TextIteratorStreamer | |
from threading import Thread | |
import logging | |
import os | |
from generate_audio import ( | |
TTSProcessor, | |
) | |
import uuid | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
vq_model = RQBottleneckTransformer.load_model( | |
"whisper-vq-stoks-v3-7lang-fixed.model" | |
).to(device) | |
# tts = TTSProcessor('cpu') | |
use_8bit = False | |
llm_path = "homebrewltd/Ichigo-llama3.1-s-instruct-v0.3-phase-3" | |
tokenizer = AutoTokenizer.from_pretrained(llm_path) | |
model_kwargs = {} | |
if use_8bit: | |
model_kwargs["quantization_config"] = BitsAndBytesConfig( | |
load_in_8bit=True, | |
llm_int8_enable_fp32_cpu_offload=False, | |
llm_int8_has_fp16_weight=False, | |
) | |
else: | |
model_kwargs["torch_dtype"] = torch.bfloat16 | |
model = AutoModelForCausalLM.from_pretrained(llm_path, **model_kwargs).to(device) | |
def audio_to_sound_tokens_whisperspeech(audio_path): | |
vq_model.ensure_whisper('cuda') | |
wav, sr = torchaudio.load(audio_path) | |
if sr != 16000: | |
wav = torchaudio.functional.resample(wav, sr, 16000) | |
with torch.no_grad(): | |
codes = vq_model.encode_audio(wav.to(device)) | |
codes = codes[0].cpu().tolist() | |
result = ''.join(f'<|sound_{num:04d}|>' for num in codes) | |
return f'<|sound_start|>{result}<|sound_end|>' | |
def audio_to_sound_tokens_whisperspeech_transcribe(audio_path): | |
vq_model.ensure_whisper('cuda') | |
wav, sr = torchaudio.load(audio_path) | |
if sr != 16000: | |
wav = torchaudio.functional.resample(wav, sr, 16000) | |
with torch.no_grad(): | |
codes = vq_model.encode_audio(wav.to(device)) | |
codes = codes[0].cpu().tolist() | |
result = ''.join(f'<|sound_{num:04d}|>' for num in codes) | |
return f'Transcribe the speech in this audio sample:<|sound_start|>{result}<|sound_end|>' | |
# print(tokenizer.encode("<|sound_0001|>", add_special_tokens=False))# return the audio tensor | |
# print(tokenizer.eos_token) | |
def text_to_audio_file(text): | |
# gen a random id for the audio file | |
id = str(uuid.uuid4()) | |
temp_file = f"./user_audio/{id}_temp_audio.wav" | |
text = text | |
text_split = "_".join(text.lower().split(" ")) | |
# remove the last character if it is a period | |
if text_split[-1] == ".": | |
text_split = text_split[:-1] | |
tts = TTSProcessor("cuda") | |
tts.convert_text_to_audio_file(text, temp_file) | |
# logging.info(f"Saving audio to {temp_file}") | |
# torchaudio.save(temp_file, audio.cpu(), sample_rate=24000) | |
print(f"Saved audio to {temp_file}") | |
return temp_file | |
def process_input(audio_file=None): | |
for partial_message in process_audio(audio_file): | |
yield partial_message | |
def process_transcribe_input(audio_file=None): | |
for partial_message in process_audio(audio_file, transcript=True): | |
yield partial_message | |
class StopOnTokens(StoppingCriteria): | |
def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool: | |
# encode </s> token | |
stop_ids = [tokenizer.eos_token_id, 128009] # Adjust this based on your model's tokenizer | |
for stop_id in stop_ids: | |
if input_ids[0][-1] == stop_id: | |
return True | |
return False | |
def process_audio(audio_file, transcript=False): | |
if audio_file is None: | |
raise ValueError("No audio file provided") | |
logging.info(f"Audio file received: {audio_file}") | |
logging.info(f"Audio file type: {type(audio_file)}") | |
sound_tokens = audio_to_sound_tokens_whisperspeech_transcribe(audio_file) if transcript else audio_to_sound_tokens_whisperspeech(audio_file) | |
logging.info("Sound tokens generated successfully") | |
# logging.info(f"audio_file: {audio_file.name}") | |
messages = [ | |
{"role": "user", "content": sound_tokens}, | |
] | |
stop = StopOnTokens() | |
input_str = tokenizer.apply_chat_template(messages, tokenize=False) | |
input_ids = tokenizer.encode(input_str, return_tensors="pt") | |
input_ids = input_ids.to(model.device) | |
streamer = TextIteratorStreamer(tokenizer, timeout=10., skip_prompt=True, skip_special_tokens=True) | |
generation_kwargs = dict( | |
input_ids=input_ids, | |
streamer=streamer, | |
max_new_tokens=1024, | |
do_sample=False, | |
stopping_criteria=StoppingCriteriaList([stop]) | |
) | |
thread = Thread(target=model.generate, kwargs=generation_kwargs) | |
thread.start() | |
partial_message = "" | |
for new_token in streamer: | |
partial_message += new_token | |
if tokenizer.eos_token in partial_message: | |
break | |
partial_message = partial_message.replace("assistant\n\n", "") | |
yield partial_message | |
# def stop_generation(): | |
# # This is a placeholder. Implement actual stopping logic here if needed. | |
# return "Generation stopped.", gr.Button.update(interactive=False) | |
# take all the examples from the examples folder | |
good_examples = [] | |
for file in os.listdir("./examples"): | |
if file.endswith(".wav"): | |
good_examples.append([f"./examples/{file}"]) | |
bad_examples = [] | |
for file in os.listdir("./bad_examples"): | |
if file.endswith(".wav"): | |
bad_examples.append([f"./bad_examples/{file}"]) | |
examples = [] | |
examples.extend(good_examples) | |
examples.extend(bad_examples) | |
with gr.Blocks() as iface: | |
gr.Markdown("# Ichigo-llama3-s: Llama3.1 with listening capabilities") | |
gr.Markdown("Record your voice or upload audio and send it to the model.") | |
gr.Markdown("Powered by [Homebrew Ltd](https://homebrew.ltd/) | [Read our blog post](https://homebrew.ltd/blog/llama3-just-got-ears)") | |
with gr.Row(): | |
input_type = gr.Radio(["text", "audio"], label="Input Type", value="audio") | |
text_input = gr.Textbox(label="Send", visible=False) | |
audio_input = gr.Audio(label="Audio", type="filepath", visible=True) | |
# audio_output = gr.Audio(label="Converted Audio", type="filepath", visible=False) | |
convert_button = gr.Button("Convert to Audio", visible=False) | |
submit_button = gr.Button("Send") | |
transcrip_button = gr.Button("Make Model Transcribe the audio") | |
text_output = gr.Textbox(label="Generated Text") | |
def update_visibility(input_type): | |
return (gr.update(visible=input_type == "text"), | |
gr.update(visible=input_type == "text")) | |
def convert_and_display(text): | |
audio_file = text_to_audio_file(text) | |
return audio_file | |
def process_example(file_path): | |
return update_visibility("audio") | |
input_type.change( | |
update_visibility, | |
inputs=[input_type], | |
outputs=[text_input, convert_button] | |
) | |
convert_button.click( | |
convert_and_display, | |
inputs=[text_input], | |
outputs=[audio_input] | |
) | |
submit_button.click( | |
process_input, | |
inputs=[audio_input], | |
outputs=[text_output] | |
) | |
transcrip_button.click( | |
process_transcribe_input, | |
inputs=[audio_input], | |
outputs=[text_output] | |
) | |
gr.Examples(examples, inputs=[audio_input]) | |
iface.queue() | |
iface.launch() | |
# launch locally | |
# iface.launch(server_name="0.0.0.0") | |