# NOTE(review): removed stray VCS residue that had been pasted above the
# imports ("atereoyinn" / "removed timeout" / commit f80f102) — it was not
# valid Python and would raise NameError at import time.
import gradio as gr
from faster_whisper import WhisperModel
from pydantic import BaseModel, Field, AliasChoices, field_validator, ValidationError
from typing import List
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
import csv
import json
import tempfile
import torch
import os
# Set environment variables for gradio
# NOTE(review): COMMANDLINE_ARGS looks like a stable-diffusion-webui flag, not
# something gradio itself reads — confirm this has any effect here.
os.environ["COMMANDLINE_ARGS"] = "--no-gradio-queue"

# Initiate checkpoints for model loading.
numind_checkpoint = "numind/NuExtract-tiny"  # structured (person) entity extractor
llama_checkpoint = "Atereoyin/Llama3_finetuned_for_medical_entity_extraction"  # medical entity extractor
whisper_checkpoint = "base"  # faster-whisper model size

# 8-bit quantization to fit both LLMs on a single GPU.
quantization_config = BitsAndBytesConfig(
    load_in_8bit=True,
)

# Load models with the correct device (CUDA required — everything below
# assumes a GPU is present; .to("cuda") is also used in the predict helpers).
whisper_model = WhisperModel(whisper_checkpoint, device="cuda")
numind_model = AutoModelForCausalLM.from_pretrained(numind_checkpoint, quantization_config=quantization_config, torch_dtype=torch.float16, trust_remote_code=True)
numind_tokenizer = AutoTokenizer.from_pretrained(numind_checkpoint)
llama_model = AutoModelForCausalLM.from_pretrained(llama_checkpoint, quantization_config=quantization_config, trust_remote_code=True)
llama_tokenizer = AutoTokenizer.from_pretrained(llama_checkpoint)
# Function to transcribe audio
def transcribe_audio(audio_file_path):
    """Transcribe an audio file with the global faster-whisper model.

    Returns the concatenated transcript text; on any failure, returns the
    exception message as a string (best-effort, never raises).
    """
    try:
        segments, _info = whisper_model.transcribe(audio_file_path, beam_size=5)
        pieces = [segment.text for segment in segments]
        return "".join(pieces)
    except Exception as err:
        # Surface the error text in place of a transcript rather than crashing.
        return str(err)
# Functions for Person entity extraction
def predict_NuExtract(model, tokenizer, text, schema, example=None):
    """Run NuExtract to fill ``schema`` (a JSON template string) from ``text``.

    Args:
        model: NuExtract causal LM (already on GPU).
        tokenizer: matching tokenizer.
        text: source text to extract from.
        schema: JSON string describing the fields to extract.
        example: optional list of example JSON strings; empty strings are skipped.

    Returns:
        The raw model output between the <|output|> and <|end-output|> markers
        (a JSON-formatted string; caller is responsible for parsing it).
    """
    # Fix: the original used a mutable default argument (example=["","",""]),
    # which is shared across calls. Use None as the sentinel instead.
    if example is None:
        example = ["", "", ""]
    # Re-serialize the schema so the prompt always shows normalized JSON.
    schema = json.dumps(json.loads(schema), indent=4)
    input_llm = "<|input|>\n### Template:\n" + schema + "\n"
    for i in example:
        if i != "":
            input_llm += "### Example:\n" + json.dumps(json.loads(i), indent=4) + "\n"
    input_llm += "### Text:\n" + text + "\n<|output|>\n"
    input_ids = tokenizer(input_llm, return_tensors="pt", truncation=True, max_length=4000).to("cuda")
    output = tokenizer.decode(model.generate(**input_ids)[0], skip_special_tokens=True)
    # NOTE: raises IndexError if the model emits no <|output|> marker — the
    # caller treats a malformed payload via json.JSONDecodeError instead.
    return output.split("<|output|>")[1].split("<|end-output|>")[0]
# Function for generating prompts for Llama
def prompt_format(text):
    """Build the Alpaca-style prompt asking Llama to extract medical entities.

    The returned prompt embeds ``text`` as the input section and leaves the
    response section empty for the model to complete.
    """
    template = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.
### Instruction:
{}
### Input:
{}
### Response:
{}"""
    instruction = """Extract the following entities from the medical conversation:
* **Symptoms:** List all the symptoms the patient mentions.
* **Diagnosis:** List the doctor's diagnosis or potential diagnoses.
* **Medical History:** Summarize the patient's relevant medical history.
* **Action Plan:** List the recommended actions or treatment plan.
Provide the result in the following JSON format:
{
"Symptoms": [...],
"Diagnosis": [...],
"Medical history": [...],
"Action plan": [...]
}"""
    # Third slot is intentionally empty: the model completes the response.
    return template.format(instruction, text, "")
# Pydantic validator to validate Llama's response
def validate_medical_record(response):
    """Normalize Llama's extracted entities against a fixed 4-field schema.

    Accepts a dict (possibly with alias keys such as "Medical history" or
    "Plan of Action") and returns a dict with canonical keys Symptoms,
    Diagnosis, Medical_history, Action_plan — each a list of strings.
    If validation fails, the raw ``response`` is returned unchanged
    (best-effort; downstream code uses .get() with defaults).
    """
    class MedicalRecord(BaseModel):
        Symptoms: List[str] = Field(default_factory=list)
        Diagnosis: List[str] = Field(default_factory=list)
        Medical_history: List[str] = Field(
            default_factory=list,
            validation_alias=AliasChoices('Medical history', 'History of Patient'),
        )
        Action_plan: List[str] = Field(
            default_factory=list,
            validation_alias=AliasChoices('Action plan', 'Plan of Action'),
        )

        @field_validator('*', mode='before')
        def ensure_list(cls, v):
            # The model frequently emits comma-separated strings; split them
            # into a trimmed list before per-field validation.
            if isinstance(v, str):
                return [item.strip() for item in v.split(',')]
            return v

    try:
        validated_data = MedicalRecord(**response)
        # Fix: .dict() is the deprecated pydantic-v1 API; this file already
        # uses v2 features (AliasChoices, field_validator mode=...), so use
        # the v2 serializer.
        return validated_data.model_dump()
    except ValidationError:
        # Deliberate best-effort fallback: hand back the raw dict unmodified.
        return response
# Function to predict medical entities using Llama
def predict_Llama(model, tokenizer, text):
    """Extract medical entities from ``text`` with the fine-tuned Llama model.

    Returns a dict of entity lists (via validate_medical_record), or {} on
    any failure.
    """
    inputs = tokenizer(prompt_format(text), return_tensors="pt", truncation=True).to("cuda")
    try:
        # Low temperature for near-deterministic extraction output.
        outputs = model.generate(**inputs, max_new_tokens=128, temperature=0.2, use_cache=True)
        extracted_entities = tokenizer.decode(outputs[0], skip_special_tokens=True)
        # The decoded text echoes the whole prompt; keep only what follows the
        # final "### Response:" marker.
        response = extracted_entities.split("### Response:", 1)[-1].strip()
        # Parse "Key: value" lines into a dict. NOTE(review): the prompt asks
        # for JSON, but this parser assumes line-oriented "Key: value" output
        # instead — presumably matching what the fine-tuned model actually
        # emits; multi-line JSON values would be dropped. Confirm against
        # real model output.
        response_dict = {k.strip(): v.strip() for k, v in (line.split(': ', 1) for line in response.splitlines() if ': ' in line)}
        validated_response = validate_medical_record(response_dict)
        return validated_response
    except Exception as e:
        # Best-effort: log and return an empty result so the UI still renders.
        print(f"Error during Llama prediction: {str(e)}")
        return {}
# Control function that coordinates the other functions and maps entities to form fields
def process_audio(audio):
    """Transcribe ``audio`` and extract person + medical entities.

    Args:
        audio: a file path (gr.Audio type="filepath") or raw audio bytes.

    Returns:
        A 7-tuple of strings (Name, Age, Gender, Symptoms, Diagnosis,
        Medical History, Action Plan), or a single error string if the
        NuExtract output is not valid JSON.
    """
    # Normalize input to bytes regardless of how Gradio delivered it.
    if isinstance(audio, str):
        with open(audio, 'rb') as f:
            audio_bytes = f.read()
    else:
        audio_bytes = audio

    # Whisper wants a file on disk, so stage the bytes in a temp .wav.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio:
        temp_audio.write(audio_bytes)
        temp_audio.flush()
        audio_path = temp_audio.name
    try:
        transcription = transcribe_audio(audio_path)
    finally:
        # Fix: the original never deleted the delete=False temp file, leaking
        # one .wav per request.
        os.remove(audio_path)

    person_schema = """{"Name": "","Age": "","Gender": ""}"""
    person_entities_raw = predict_NuExtract(numind_model, numind_tokenizer, transcription, person_schema)
    try:
        person_entities = json.loads(person_entities_raw)
    except json.JSONDecodeError as e:
        # NOTE(review): this returns one string while success returns a
        # 7-tuple; Gradio will complain about the arity mismatch — kept as-is
        # to preserve the existing interface.
        return f"Error in NuExtract response: {str(e)}"

    medical_entities = predict_Llama(llama_model, llama_tokenizer, transcription)
    return (
        person_entities.get("Name", ""),
        person_entities.get("Age", ""),
        person_entities.get("Gender", ""),
        ", ".join(medical_entities.get("Symptoms", [])),
        ", ".join(medical_entities.get("Diagnosis", [])),
        ", ".join(medical_entities.get("Medical_history", [])),
        ", ".join(medical_entities.get("Action_plan", []))
    )
# Function that allows users to download information
def download_csv(name, age, gender, symptoms, diagnosis, medical_history, action_plan):
    """Write the form fields to a single-row CSV and return its file path.

    The header row matches the Gradio output labels; the caller (the download
    button) hands the path to gr.File.
    """
    # Fix: the original created a NamedTemporaryFile, leaked its open handle,
    # and reopened the file by name (which also fails on Windows). Write
    # through the handle directly instead.
    with tempfile.NamedTemporaryFile(
        mode='w', newline='', suffix=".csv", delete=False
    ) as file:
        writer = csv.writer(file)
        writer.writerow(["Name", "Age", "Gender", "Symptoms", "Diagnosis", "Medical History", "Plan of Action"])
        writer.writerow([name, age, gender, symptoms, diagnosis, medical_history, action_plan])
        return file.name
# Gradio interface to create a web-based form for users to input audio and fill the medical diagnostic form.
# The seven Textbox outputs line up positionally with the 7-tuple returned by
# process_audio.
demo = gr.Interface(
    fn=process_audio,
    inputs=[
        gr.Audio(type="filepath")
    ],
    outputs=[
        gr.Textbox(label="Name"),
        gr.Textbox(label="Age"),
        gr.Textbox(label="Gender"),
        gr.Textbox(label="Symptoms"),
        gr.Textbox(label="Diagnosis"),
        gr.Textbox(label="Medical History"),
        gr.Textbox(label="Plan of Action"),
    ],
    title="Medical Diagnostic Form Assistant",
    description="Upload an audio file or record audio to generate a medical diagnostic form."
)
# Attach a CSV-download button that reads the current values of the seven
# form fields and feeds them to download_csv.
with demo:
    download_button = gr.Button("Download CSV")
    download_button.click(
        # Fix: the original wrapped download_csv in a passthrough lambda with
        # the identical 7-argument signature; pass the function directly.
        fn=download_csv,
        inputs=demo.output_components,
        outputs=gr.File(label="Download CSV")
    )
demo.launch()