# NOTE: the web file viewer's page header (file size 9,044 bytes, revision f92fc66) and its line-number gutter were captured with this file; they are not part of the program.
from huggingface_hub import InferenceClient
import gradio as gr
import json
from datetime import datetime
import os
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from google.oauth2 import service_account
from googleapiclient.http import MediaFileUpload
from pathlib import Path
import argparse
from huggingface_hub import snapshot_download
# TODO: Cleanup and comment

# Model to run locally: a single GGUF file fetched from the Hugging Face Hub.
repo_name = 'TheBloke/OpenHermes-2.5-Mistral-7B-GGUF'
model_file = "openhermes-2.5-mistral-7b.Q4_K_M.gguf"
print('Fetching model:', repo_name, model_file)
# Only files matching `model_file` are downloaded, into the current directory.
snapshot_download(repo_id=repo_name, local_dir=".", allow_patterns=model_file)
print('Done fetching model:')
DEFAULT_MODEL_PATH = model_file

# Conversation state shared by the chat callbacks defined further down.
chat_history = []  # [prompt, reply] pairs; format_prompt() rebuilds the prompt from these
chat_log_history = []  # {"user": ..., "bot": ...} dicts written into the JSON chat log
isFirstRun = True  # set to False by format_prompt() once the first prompt has been built

# Start-up diagnostics: report whether the app directory exists and write a
# small test file into it.
dir_path = os.path.dirname(os.path.realpath(__file__))
directory = '/home/user/app'
if not os.path.exists(directory):
    print("directory dont exist")
else: print("path exists")
# NOTE(review): the write below runs regardless of the outcome of the
# directory check above.
file_path = '/home/user/app/test.txt'
with open(file_path, 'w') as file:
    # Write "Hello World" to the file
    file.write("Hello World")
print("file written")
print(dir_path)

# Google Drive access: requested OAuth scope and the service-account key file.
SCOPES = ['https://www.googleapis.com/auth/drive']
SERVICE_ACCOUNT_FILE = '/home/user/app/service_account_creds.json'

# Earlier version of the role-play prompt, kept for reference:
#initContext = "You are a hospital patient nameed Barry Lawson. You have an injured arm and are in great pain. You are frustrated because you have been waiting for a long time. Every time I prompt you you get more agressive and violent in your responses. Your responses should use excessive profanity. You will provide only one response to each of my prompts. I am a nurse and you are angry at me. Nurse: "
# Role-play prompt; generate() prefixes it to the first user message.
initContext = """<|im_start|>You are playing the role of an aggressive patient called Barry Lawson. Barry has been waiting for hours at the hospital with severe pain in his chest and hasn't been seen yet. You are incredibly frustrated and angry that things are taking so long, you will only calm down if your concerns are addressed. Waiting will only make you more angry and aggressive towards the staff. At any moment you could break into a RAGE! Make sure you don't censor swear words. Make sure to explain your emotional state and thinking before responding, for example, Barry: (tired of waiting and very angry) What the fuck do I have to do around here to get some treatment!"""
# Import-time timestamp; name_interface() computes its own when naming the log.
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# Google Drive folder that chat logs are searched for and created in.
folder_id = '1Hjbu7FmuVs0Yz8y_veo6SzY_2tc48OWt'

# Participant details and the chat-log file name; all are filled in by
# name_interface() and stay empty until the User Info form is submitted.
Name = ""
Occupation = ""
Ethnicity = ""
Gender = ""
Age=""
chat_log_name =""

# Load the downloaded GGUF file with llama-cpp-python.
from llama_cpp import Llama
llm = Llama(model_path=model_file, model_type="mistral")
def get_drive_service():
    """Build a Google Drive v3 client authenticated as the service account.

    The credentials are read from the key file at SERVICE_ACCOUNT_FILE and
    requested with the scopes listed in SCOPES.

    Returns:
        The client object returned by googleapiclient's ``build``.
    """
    creds = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE,
        scopes=SCOPES,
    )
    drive = build('drive', 'v3', credentials=creds)
    print("Google Service Created")
    return drive


# Module-wide Drive client used by search_file() and upload_to_google_drive().
service = get_drive_service()
def search_file():
    """Look up the current chat log by name in the configured Drive folder.

    Searches the folder identified by ``folder_id`` for a non-trashed file
    whose name equals ``chat_log_name`` and prints whether one was found.

    Returns:
        The list of matching file entries (each carrying ``id`` and ``name``);
        empty when the chat log has not been uploaded yet.
    """

    def _quote(value):
        # Drive query values are wrapped in single quotes, so a backslash or
        # an apostrophe inside the value (e.g. a participant called O'Brien)
        # must be backslash-escaped or the query is rejected as malformed.
        return value.replace("\\", "\\\\").replace("'", "\\'")

    query = (
        f"name = '{_quote(chat_log_name)}' "
        f"and '{_quote(folder_id)}' in parents and trashed = false"
    )
    response = service.files().list(q=query, spaces='drive', fields='files(id, name)').execute()
    files = response.get('files', [])
    if not files:
        print(f"Chat log {chat_log_name} does not exist")
    else:
        print(f"Chat log {chat_log_name} exist")
    return files
def format_prompt(message, history):
    """Assemble the text prompt sent to the model for the next turn.

    On the first call (``isFirstRun`` is True) the message is wrapped in
    ``[INST] ... [/INST]`` and ``isFirstRun`` is switched off. On later calls
    the prompt is rebuilt from the module-level ``chat_history``: the first
    stored turn is wrapped in ``[INST]...[/INST]``, later turns are labelled
    as the nurse, each followed by Barry's stored reply, and the new message
    is appended last.

    Args:
        message: Text of the new turn.
        history: Accepted for the caller's convenience; not read here.

    Returns:
        The assembled prompt string.
    """
    global isFirstRun

    if isFirstRun:
        # Opening turn: nothing stored yet, so only the message is wrapped.
        isFirstRun = False
        opening = "<s>" + f"[INST] {message} [/INST] Barry:</s>"
        print("init prompt")
        return opening

    print("reg prompt")
    pieces = ["<s>"]
    for turn_index, (nurse_text, barry_text) in enumerate(chat_history):
        if turn_index == 0:
            pieces.append(f"[INST]{nurse_text}[/INST]")
        else:
            pieces.append(f"Nurse : {nurse_text}")
        pieces.append(f" Barry: {barry_text}")
    pieces.append(f"Nurse: {message} Barry:</s>")
    return "".join(pieces)
def strip_special_tokens(text):
    """Return ``text`` with the prompt-control markers removed.

    The markers ``</s>``, ``<s>``, ``[INST]`` and ``[/INST]`` are deleted one
    after another, in that order; everything else is left untouched.
    """
    cleaned = text
    for marker in ("</s>", "<s>", "[INST]", "[/INST]"):
        # Splitting on the marker and re-joining drops every occurrence.
        cleaned = "".join(cleaned.split(marker))
    return cleaned
def upload_to_google_drive():
    """Write the chat log to a local JSON file and push it to Google Drive.

    The participant details and ``chat_log_history`` are dumped to a file
    named ``chat_log_name``. If search_file() finds a file of that name in the
    Drive folder its content is replaced; otherwise a new file is created
    under ``folder_id``.
    """
    matches = search_file()
    print(matches)

    payload = {
        "name": Name,
        "occupation": Occupation,
        "ethnicity": Ethnicity,
        "gender": Gender,
        "age": Age,
        "chat_history": chat_log_history
    }
    with open(chat_log_name, "w") as log_file:
        json.dump(payload, log_file, indent=4)

    if matches:
        # A log with this name is already on Drive: overwrite its content.
        print(f"File '{chat_log_name}' already exists.")
        drive_file_id = matches[0]['id']
        media = MediaFileUpload(chat_log_name, mimetype='application/json')
        updated = service.files().update(fileId=drive_file_id, media_body=media).execute()
        print(f"Updated existing file with ID: {updated.get('id')}")
        return

    # First upload for this log: create it inside the configured folder.
    metadata = {
        'name': chat_log_name,
        'parents': [folder_id],'mimeType': 'application/json'
    }
    media = MediaFileUpload(chat_log_name, mimetype='application/json')
    created = service.files().create(body=metadata, media_body=media, fields='id').execute()
    print(f"Uploaded new file with ID: {created.get('id')}")
def generate(prompt, history):
    """Stream Barry's reply to the user's message for the chat interface.

    If any participant detail is still empty, a single reminder message is
    yielded and nothing else happens. Otherwise the prompt is built with
    format_prompt() (the role-play context is prefixed on the first turn),
    the local model is run in streaming mode, and the growing reply is
    yielded after every piece. When generation ends the reply is stripped of
    special tokens, recorded in ``chat_history`` and ``chat_log_history``,
    uploaded to Google Drive, and yielded one last time in its cleaned form.

    Args:
        prompt: The user's message for this turn.
        history: Conversation history supplied by the UI; forwarded to
            format_prompt().

    Yields:
        The reply text accumulated so far, ending with the cleaned reply.
    """
    if not all((Name, Occupation, Ethnicity, Gender, Age)):
        yield "Did you forget to enter your Details? Please go to the User Info Tab and Input your data. "
        return

    # format_prompt() clears isFirstRun as a side effect, so the flag has to
    # be captured before calling it; testing it afterwards is always False.
    was_first_run = isFirstRun
    user_message = prompt
    if was_first_run:
        # The stored first turn carries the role-play context so that later
        # prompts rebuilt from chat_history still start with it.
        prompt = initContext + prompt
        formatted_prompt = format_prompt(prompt, history)
        print("init Context added")
    else:
        formatted_prompt = format_prompt(prompt, history)
    print(f"\n THE PROMPT IS,\n {formatted_prompt} \n PROMPT END")

    # No remote inference `client` is ever created in this file; generation
    # goes through the locally loaded `llm`, the same way predict() calls it.
    stream = llm(formatted_prompt, max_tokens=2048, repeat_penalty=1.4, temperature=0.8, stream=True)
    output = ""
    for chunk in stream:
        output += chunk['choices'][0]['text']
        yield output

    output = strip_special_tokens(output)
    chat_history.append([prompt, output])
    # The log stores what the user actually typed, not the context-prefixed
    # prompt used for the first turn.
    chat_log_history.append({"user": user_message, "bot": output})
    upload_to_google_drive()
    # A generator's return value never reaches the UI, so the cleaned reply
    # has to be yielded to replace the raw streamed text.
    yield output
def predict(input, chatbot, max_length, top_p, temperature, history):
    """Stream a completion of ``input`` from the local model.

    Appends an ``(input, "")`` pair to ``chatbot`` and ``input`` to
    ``history``, then yields ``(chatbot, history)`` after every streamed
    piece with the last pair's reply extended by that piece. Once the stream
    ends, the full reply is appended to ``history`` and the pair is yielded a
    final time.

    Args:
        input: Text passed to the model as-is.
        chatbot: List of (user, reply) pairs; mutated in place.
        max_length: Forwarded to the model as ``max_tokens``.
        top_p: Forwarded to the model as ``top_p``.
        temperature: Forwarded to the model as ``temperature``.
        history: Flat list of texts; mutated in place.
    """
    chatbot.append((input, ""))
    reply = ""
    history.append(input)

    sampling = dict(stream=True, temperature=temperature, top_p=top_p, max_tokens=max_length)
    for chunk in llm(input, **sampling):
        reply = reply + chunk['choices'][0]['text']
        # Tuples are immutable, so the last pair is replaced wholesale.
        chatbot[-1] = (chatbot[-1][0], reply)
        yield chatbot, history

    history.append(reply)
    yield chatbot, history
# Chat tab: a chat interface whose replies are produced by generate().
chat_bot=gr.ChatInterface(
    fn=generate,
    chatbot=gr.Chatbot(show_label=False, show_share_button=False, show_copy_button=True, likeable=True, layout="panel"),
    title="""AI Chat Bot - ECU IVADE"""
)
def name_interface(name,occupation,ethnicity,gender,age):
    """Store the participant's details and name the chat log for the session.

    The five values are saved (with surrounding whitespace removed) in the
    module-level ``Name``, ``Occupation``, ``Ethnicity``, ``Gender`` and
    ``Age``. When all of them are non-empty, ``chat_log_name`` is set to a
    JSON file name built from the participant's name and the current time.

    Args:
        name: Participant's name.
        occupation: Participant's occupation.
        ethnicity: Participant's ethnicity.
        gender: Participant's gender.
        age: Participant's age.

    Returns:
        A greeting when every field is filled in, otherwise a request to
        complete the form.
    """
    global Name, Occupation,Ethnicity,Gender,Age,chat_log_name

    # Strip the inputs so a field holding only spaces counts as missing here
    # and in any later emptiness check on the stored values.
    Name, Occupation, Ethnicity, Gender, Age = (
        "" if value is None else str(value).strip()
        for value in (name, occupation, ethnicity, gender, age)
    )

    if not all((Name, Occupation, Ethnicity, Gender, Age)):
        return "Enter ALL the details to start chatting"

    # The name comes straight from the form and ends up in a file name that is
    # later opened for writing, so path separators, dots, quotes and the like
    # are replaced to keep the log inside the working directory.
    safe_name = "".join(ch if ch.isalnum() or ch in " -_" else "_" for ch in Name)
    stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    chat_log_name = f'chat_log_for_{safe_name}_{stamp}.json'
    return f"You can start chatting now {Name}"
# User Info tab: five text boxes whose values are passed to name_interface();
# the message it returns is shown as plain text.
# NOTE(review): this assignment rebinds `name_interface` from the callback
# function to the gr.Interface object built here.
# NOTE(review): the description mentions only name and occupation although
# five fields are requested.
name_interface = gr.Interface(
    fn=name_interface,
    inputs=[
        gr.Textbox(label="Name", placeholder="Enter your name here..."),
        gr.Textbox(label="Occupation", placeholder="Enter your occupation here..."),
        gr.Textbox(label="Ethnicity", placeholder="Enter your Ethnicity here..."),
        gr.Textbox(label="Gender", placeholder="Enter your Gender here..."),
        gr.Textbox(label="Age", placeholder="Enter your Age here...")
    ],outputs="text",
    title="ECU-IVADE : User Information",
    description="Please enter your name and occupation."
)
# Combine both interfaces into one tabbed app; the "User Info" tab is listed
# first because generate() refuses to chat until the details are filled in.
tabs = gr.TabbedInterface([name_interface, chat_bot], ["User Info", "Chat Bot"])

if __name__ == "__main__":
    # Start the web UI only when this file is executed as a script.
    tabs.launch(debug=True,share=False,inbrowser=True)