chat-1 / app.py
rishi1985's picture
Update app.py
bb8cfaa
import io
from fastapi import FastAPI, File, UploadFile
from fastapi import FastAPI, Request, Response, BackgroundTasks
import subprocess
import os
import requests
import random
# import undetected_chromedriver as uc
import shutil
import json
# from transformers import AutoModelForQuestionAnswering, AutoTokenizer, pipeline
from pydantic import BaseModel
from typing import Annotated
from fastapi import Form
import selenium
from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.chrome.service import Service
import threading
import random
import string
from selenium.webdriver.common.keys import Keys
import time
# from selenium.webdriver.firefox.options import Options
# options = FirefoxOptions()
# options.headless = True
# service = Service()
# driver = webdriver.Firefox(options= options,service=service)
# driver.get("https://yuntian-deng-chatgpt.hf.space/")
# driver.get("https://yuntian-deng-chatgpt.hf.space/")
class Query(BaseModel):
text: str
host:str
from fastapi import FastAPI, Request, Depends, UploadFile, File
from fastapi.exceptions import HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=['*'],
allow_credentials=True,
allow_methods=['*'],
allow_headers=['*'],
)
# cred = credentials.Certificate('key.json')
# app1 = firebase_admin.initialize_app(cred)
# db = firestore.client()
# data_frame = pd.read_csv('data.csv')
from selenium.webdriver.common.by import By
from pymongo.mongo_client import MongoClient
@app.on_event("startup")
async def startup_event():
x=requests.get('https://open-ai-ping-eight.vercel.app/proxy')
print("response ", x.text)
print("on startup")
# t = threading.Thread(target=makeqimg)
# t.start()
proxy =None
llama_last_error=0
# LLAMA
chrome_driver_path = '/usr/local/bin/chromedriver-linux64/chromedriver'
@app.post("/llama")
async def get_answer_llama(request: Request ):
data = await request.json()
text = data['text']
# print("recived ",text)
res= ""
res= do_ML_LLAMA70b(text,0)
dict={"LLAMA":res}
return JSONResponse(dict)
def do_ML_LLAMA70b(text:str, trycount:int):
starttime=time.time()
options = ChromeOptions()
global llama_last_error
options.add_argument('--no-sandbox')
options.add_argument('-headless')
options.add_argument("start-maximized")
service = Service(executable_path=chrome_driver_path)
driver = webdriver.Chrome(options= options,service=service)
driver.get("https://huggingface-projects-llama-2-7b-chat.hf.space")
try:
while True:
currtime= time.time()
if(currtime>starttime+30):
driver.delete_all_cookies()
driver.quit()
return "Requested Could not be proceed"
try:
# span_element = driver.find_element(By.CLASS_NAME, 'svelte-s1r2yt')
# span_element.click()
# input_element = driver.find_elements(By.CSS_SELECTOR, 'input[data-testid="number-input"]')
# new_value = "2000"
# input_element[1].clear()
# input_element[1].send_keys(new_value)
xpath_expression = '//textarea[@data-testid="textbox"]'
textarea_element = driver.find_element(By.XPATH, xpath_expression)
for line in text.split('\n'):
textarea_element.send_keys(line)
textarea_element.send_keys(Keys.SHIFT + Keys.ENTER)
textarea_element.send_keys('\n')
break
except Exception as e:
print(e)
continue
prev =""
time.sleep(2)
while True:
time.sleep(0.5)
currtime= time.time()
if(currtime>starttime+170):
driver.delete_all_cookies()
driver.quit()
return prev
element_selector = '.wrap.default.minimal.svelte-119qaqt.translucent.generating'
try:
element = driver.find_element(By.CSS_SELECTOR,element_selector)
print("generating")
continue
except:
print("Element is not present.")
parent_element = driver.find_elements(By.CLASS_NAME,'message')
extracted_text = parent_element[1].text
element_text= extracted_text
if element_text=="" or element_text=='Loading content ':
pritn("empty text exception")
continue
driver.quit()
return element_text
driver.delete_all_cookies()
driver.quit()
llama_last_error= time.time()
return " --Error Occurred-- "
except Exception as e:
print(e)
driver.delete_all_cookies()
driver.quit()
llama_last_error= time.time()
return "Error Occurred "
########## MPT
@app.post("/mpt")
async def get_answer_mpt(request: Request ):
data = await request.json()
text = data['text']
print("recived ",text)
res= do_ML_MPT(text,0)
dict={"MPT":res}
return JSONResponse(dict)
def do_ML_MPT(text:str, trycount:int):
starttime=time.time()
options = ChromeOptions()
options.add_argument('--no-sandbox')
options.add_argument('-headless')
options.add_argument("start-maximized")
service = Service(executable_path=chrome_driver_path)
driver = webdriver.Chrome(options= options,service=service)
driver.get("https://mosaicml-mpt-30b-chat.hf.space")
try:
while True:
currtime= time.time()
if(currtime>starttime+20):
driver.delete_all_cookies()
driver.quit()
return "Requested Could not be proceed"
try:
textarea_xpath = "//textarea[@data-testid='textbox' and @class='scroll-hide svelte-1pie7s6' and @placeholder='Chat Message Box']"
textarea_element = driver.find_element(By.XPATH,textarea_xpath)
for line in text.split('\n'):
textarea_element.send_keys(line)
# Simulate pressing the 'Shift + Enter' keys to add a newline without triggering submit
textarea_element.send_keys(Keys.SHIFT + Keys.ENTER)
break
except:
continue
while True:
currtime= time.time()
if(currtime>starttime+20):
driver.delete_all_cookies()
driver.quit()
return "Requested Could not be proceed"
try:
button_id = 'component-9' # Replace 'comonent-7' with the actual ID
button_element = driver.find_element(By.ID,button_id)
# Perform actions on the element (e.g., clicking the button)
button_element.click()
break
except Exception as e:
print(e)
time.sleep(0.2)
prev =""
# time.sleep(2)
while True:
time.sleep(0.5)
currtime= time.time()
if(currtime>starttime+120):
driver.delete_all_cookies()
driver.quit()
return "Requested Could not be proceed"
# value=""
try:
element = driver.find_element(By.XPATH,'//div[@data-testid="bot" and contains(@class, "message bot")]')
x=(element.text)
print("From text ",x)
if x=="":
raise ValueError(" k")
driver.quit()
return x
except Exception as e:
print(e)
continue
driver.quit()
return " --Error Occurred-- "
except:
print("Error")
if trycount>1:
driver.delete_all_cookies()
driver.quit()
return "Error"
driver.quit()
return do_ML_MPT(text,trycount+1)
'''Falcon 40 b intruct'''
'''Star Code'''
@app.post("/starcode")
async def get_answer_falcon(request: Request ):
data = await request.json()
text = data['text']
print("recived ",text)
res= do_ML_STARCODE(text,0)
dict={"RESULT":res}
return JSONResponse(dict)
def do_ML_STARCODE(text:str, trycount:int):
starttime=time.time()
options = ChromeOptions()
options.add_argument('--no-sandbox')
options.add_argument('-headless')
options.add_argument("start-maximized")
service = Service(executable_path=chrome_driver_path)
driver = webdriver.Chrome(options= options,service=service)
driver.get("https://huggingfaceh4-starchat-playground.hf.space/")
try:
while True:
currtime= time.time()
if(currtime>starttime+20):
driver.delete_all_cookies()
driver.quit()
return "Requested Could not be proceed"
try:
textarea_element = driver.find_element(By.CSS_SELECTOR,'textarea[placeholder="Enter your message here"]')
for line in text.split('\n'):
textarea_element.send_keys(line)
# Simulate pressing the 'Shift + Enter' keys to add a newline without triggering submit
textarea_element.send_keys(Keys.SHIFT + Keys.ENTER)
time.sleep(0.5)
textarea_element.send_keys('\n')
break
except:
continue
prev =""
# time.sleep(2)
while True:
time.sleep(0.5)
currtime= time.time()
if(currtime>starttime+170):
driver.delete_all_cookies()
driver.quit()
return "Requested Could not be proceed"
try:
div_element = driver.find_element(By.CLASS_NAME,'svelte-j1gjts.generating')
continue
except:
pass
# value=""
try:
div_element = driver.find_element(By.CSS_SELECTOR,'div[data-testid="bot"]')
text_content = div_element.text
print(text_content)
driver.delete_all_cookies()
driver.quit()
return text_content
except Exception as e:
print(e)
continue
driver.quit()
return " --Error Occurred-- "
except:
print("Error")
driver.delete_all_cookies()
driver.quit()
return "Error Occureed"
@app.post("/falcon180b")
async def get_answer_falcon180(request: Request, background_tasks: BackgroundTasks ):
data = await request.json()
text= data['text']
print("recievoed , ", text )
return StreamingResponse(do_falcon180(text), media_type="text/plain")
import asyncio
from fastapi.responses import StreamingResponse
async def do_falcon180(text):
starttime=time.time()
options = ChromeOptions()
options.add_argument('--no-sandbox')
options.add_argument('-headless')
options.add_argument("start-maximized")
service = Service(executable_path=chrome_driver_path)
driver = webdriver.Chrome(options= options,service=service)
# driver.get("https://tiiuae-falcon-180b-demo.hf.space")
# driver.get("https://ysharma-falcon-180b-demo.hf.space/")
driver.get("https://lunarflu-falcon-180b-demo-duplicate.hf.space")
try:
while True:
try:
curr= time.time()
if curr- starttime>20:
break
xpath_expression = "//span[@class='svelte-s1r2yt' and contains(text(), 'Additional Inputs')]"
element = driver.find_element(By.XPATH, xpath_expression)
element.click()
element = driver.find_elements(By.CSS_SELECTOR, 'input[data-testid="number-input"]')
element[1].clear() # Clear any existing value
element[1].send_keys('1900')
textarea_element = driver.find_element(By.XPATH,'//textarea[@placeholder="Type a message..."]')
for line in text.split('\n'):
textarea_element.send_keys(line)
textarea_element.send_keys(Keys.SHIFT + Keys.ENTER)
textarea_element.send_keys('\n')
break
except Exception as e:
print(e)
continue
old_text=""
while True:
# await asyncio.sleep(0.1)
curr= time.time()
if curr- starttime>180:
driver.quit()
break
done = False
try:
xpath_expression = "//span[contains(@class, 'chatbot')]"
element = driver.find_elements(By.XPATH, xpath_expression)
text = element[1].text
# print(text)
while True:
text = element[1].text
send_text= text[len(old_text):]
old_text= text
yield send_text.encode()
try:
element = driver.find_element(By.CLASS_NAME,"generating")
# print("generating")
except:
# print("generated")
driver.quit()
done= True
break
if done:
break
except Exception as e:
print(e)
continue
except:
driver.quit()
@app.post("/wizlm70b")
async def get_answer_falcon180(request: Request):
data = await request.json()
text= data['text']
print("recievoed , ", text )
return do_wizard70b(text)
def do_wizard70b(text):
starttime=time.time()
options = ChromeOptions()
options.add_argument('--no-sandbox')
options.add_argument('-headless')
options.add_argument("start-maximized")
service = Service(executable_path=chrome_driver_path)
driver = webdriver.Chrome(options= options,service=service)
driver.get("https://chat.lmsys.org/")
while True:
curr= time.time()
if curr- starttime>10:
driver.delete_all_cookies()
driver.quit()
return "Error-- 1"
try:
button = driver.find_element(By.XPATH, "//button[contains(@class, 'svelte-kqij2n') and text()='Single Model']")
button.click()
break
except Exception as e:
print(e)
time.sleep(0.5)
# return driver.page_source
# while True:
# curr= time.time()
# if curr- starttime>50:
# driver.delete_all_cookies()
# driver.quit()
# return "Error-- 2"
# try:
# input_element = driver.find_elements(By.CLASS_NAME,'border-none')
# # Send text to the <input> element
# input_element[2].clear()
# input_element[2].send_keys('wizardlm-70b\n')
# span_element = driver.find_elements(By.XPATH,'//span[text()="Parameters"]')
# span_element[2].click()
# input_element = driver.find_elements(By.XPATH ,'//input[@data-testid="number-input"]')
# input_element[8].clear() # Clear any existing value
# input_element[8].send_keys('1024')
# break
# except Exception as e:
# print(e)
# time.sleep(0.5)
time.sleep(1)
while True:
curr= time.time()
if curr- starttime>60:
driver.delete_all_cookies()
driver.quit()
return "Error-- 3"
try:
textarea = driver.find_elements(By.XPATH,'//textarea[@data-testid="textbox"]') # You can also use other locators like by_id, by_name, by_class_name, etc.
textarea[2].clear()
for line in text.split('\n'):
textarea[2].send_keys(line)
textarea[2].send_keys('\n')
except Exception as e:
print(e)
time.sleep(0.5)
time.sleep(2)
while True:
curr= time.time()
if curr- starttime>178:
driver.delete_all_cookies()
driver.quit()
return "Error-- 3 -- timeout"
try:
element = driver.find_element(By.CSS_SELECTOR, '.generating')
print("geberating")
time.sleep(0.5)
except:
print("Element with 'generating' keyword in class is not present.")
div_element = driver.find_element(By.XPATH,'//div[@data-testid="bot"]')
extracted_text = div_element.text
driver.delete_all_cookies()
driver.quit()
return extracted_text
#####PALM 2
@app.post("/palm2")
async def get_answer_llama(request: Request ):
data = await request.json()
text = data['text']
# print("recived ",text)
res= ""
res= do_ML_PALM(text,0)
dict={"BOT":res}
return JSONResponse(dict)
def do_ML_PALM(text:str, trycount:int):
starttime=time.time()
options = ChromeOptions()
options.add_argument('--no-sandbox')
options.add_argument('-headless')
options.add_argument("start-maximized")
service = Service(executable_path=chrome_driver_path)
driver = webdriver.Chrome(options= options,service=service)
driver.get('https://chansung-palm-with-gradio-chat.hf.space')
try:
while True:
curr = time.time()
if curr-starttime>178:
driver.delete_all_cookies()
driver.quit()
return 'Error -1'
try:
xpath = "//textarea[@data-testid='textbox' and @placeholder='Ask anything']"
textarea = driver.find_element(By.XPATH,xpath)
textarea.clear()
for line in text.split('\n'):
textarea.send_keys(line)
textarea.send_keys("\n")
break
except:
continue
time.sleep(1.5)
while True:
if curr-starttime>178:
driver.delete_all_cookies()
driver.quit()
return 'Error -2'
try:
div_element = driver.find_element(By.CSS_SELECTOR,'div.wrap.default.full.svelte-zlszon.generating')
continue
except:
print('fail')
try:
div_element = driver.find_elements(By.CLASS_NAME,'md.svelte-9tftx4.chatbot')
div_element= div_element[1]
# Extract and print the text content of the div element
div_text = div_element.text
if div_text=="":
continue
driver.delete_all_cookies()
driver.quit()
return div_text
except:
time.sleep(0.5)
continue
except Exception as e:
print(e)
driver.delete_all_cookies()
driver.quit()
return "Error 3"