chat-7 / app.py
rishi1985's picture
Update app.py
4848391
raw
history blame
11.4 kB
import io
from fastapi import FastAPI, File, UploadFile
import subprocess
import os
import requests
import random
import shutil
import json
# from transformers import AutoModelForQuestionAnswering, AutoTokenizer, pipeline
from pydantic import BaseModel
from typing import Annotated
from fastapi import Form
import selenium
from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.chrome.service import Service
import threading
import random
import string
import time
# from selenium.webdriver.firefox.options import Options
# options = FirefoxOptions()
# options.headless = True
# service = Service()
# driver = webdriver.Firefox(options= options,service=service)
# driver.get("https://yuntian-deng-chatgpt.hf.space/")
# driver.get("https://yuntian-deng-chatgpt.hf.space/")
class Query(BaseModel):
    """JSON body accepted by the ``POST /image`` endpoint."""

    # Prompt that is typed into the image page's textarea.
    text: str
    # Read by the handler and forwarded to do_ML2, which does not use it.
    host: str
from fastapi import FastAPI, Request, Depends, UploadFile, File
from fastapi.exceptions import HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
# Application instance that every route decorator in this file attaches to.
app = FastAPI()
# CORS is fully open: any origin, method and header is accepted, and
# credentials are allowed.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive — confirm this is intended before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)
# cred = credentials.Certificate('key.json')
# app1 = firebase_admin.initialize_app(cred)
# db = firestore.client()
# data_frame = pd.read_csv('data.csv')
from selenium.webdriver.common.by import By
from pymongo.mongo_client import MongoClient
@app.on_event("startup")
async def startup_event():
    """Announce on stdout that the application has started."""
    # Pre-filling the image driver queue in a background thread is disabled:
    # t = threading.Thread(target=makeqimg)
    # t.start()
    print("on startup")
# MongoDB collection handle that extract() writes the scraped key into. It
# stays None in this file because the code in get_url() that would assign it
# is commented out.
mycol = None
@app.post("/url")
async def get_url(request: Request):
    """Handle ``POST /url``; currently a stub that always answers ``"k"``.

    The request body is never read. The previous implementation, which used a
    posted MongoDB URL to initialise ``mycol`` and then ran ``extract()``, is
    kept below for reference only.
    """
    # Disabled implementation:
    # data = await request.json()
    # text = data['url']
    # mongo_url=text
    # print("mongo url ",text)
    # global mycol
    # if mycol==None:
    #     myclient = MongoClient(mongo_url)
    #     try:
    #         myclient.admin.command('ping')
    #         print("Pinged your deployment. You successfully connected to MongoDB!")
    #     except Exception as e:
    #         print(e)
    #     mydb = myclient['open-ai-api-keys']
    #     mycol= mydb['key']
    # extract()
    return "k"
def extract():
    """Scrape the ``data-api-key`` attribute from the chat page into ``mycol``.

    Opens https://talkai.info/chat/ in headless Chrome, reads the
    ``data-api-key`` attribute of the first element matching ``.chat`` and
    replaces the ``{"key": "open-ai"}`` document in the module-level ``mycol``
    collection with the freshly scraped value.

    Failures are reported on stdout and never raised (best-effort, as before).
    The browser is now always shut down, including on failure; previously any
    error left the Chrome process running.
    """
    # Without a collection there is nowhere to store the key, so do not pay
    # for a browser launch that is guaranteed to end in an error.
    if mycol is None:
        print('error in extract ', 'mycol is not initialised')
        return

    options = ChromeOptions()
    options.add_argument('--no-sandbox')
    options.add_argument('-headless')
    driver = webdriver.Chrome(options=options, service=Service())
    try:
        driver.get("https://talkai.info/chat/")
        element = driver.find_element(By.CSS_SELECTOR, ".chat")
        api_key = element.get_attribute("data-api-key")
        # Replace rather than update: drop the old record, then insert the new one.
        mycol.delete_one({"key": "open-ai"})
        mycol.insert_one({"key": "open-ai", "value": api_key})
        print(api_key)
        driver.delete_all_cookies()
    except Exception as e:
        print('error in extract ', e)
    finally:
        # Runs on success and on failure so the Chrome process cannot leak.
        driver.quit()
from queue import Queue
# Queues that makeqchat() / makeqimg() fill with headless Chrome drivers that
# have already loaded the chat / image page. Nothing visible in this file
# takes drivers back out of them.
chatq = Queue()
imgq= Queue()
def makeqchat():
    """Top up ``chatq`` until it holds two headless Chrome drivers on the chat page."""
    while True:
        if chatq.qsize() >= 2:
            break
        print("appending in chat queue")
        chrome_opts = ChromeOptions()
        for flag in ('--no-sandbox', '-headless'):
            chrome_opts.add_argument(flag)
        chrome_service = Service()
        browser = webdriver.Chrome(options=chrome_opts, service=chrome_service)
        # Load the page up front so a queued driver is ready for use.
        browser.get("https://talkai.info/chat/")
        chatq.put(browser)
def makeqimg():
    """Top up ``imgq`` until it holds two headless Chrome drivers on the image page."""
    while True:
        if imgq.qsize() >= 2:
            break
        print("appending in img queue")
        chrome_opts = ChromeOptions()
        for flag in ('--no-sandbox', '-headless'):
            chrome_opts.add_argument(flag)
        chrome_service = Service()
        browser = webdriver.Chrome(options=chrome_opts, service=chrome_service)
        # Load the page up front so a queued driver is ready for use.
        browser.get("https://talkai.info/image/")
        imgq.put(browser)
@app.post("/")
async def get_answer(request: Request):
    """Handle ``POST /``: forward a prompt to the chat page and return its reply.

    Expects a JSON object with:
      * ``text`` (required) - the prompt to submit.
      * ``temperature`` (optional) - anything ``float()`` accepts; it is
        rounded to one decimal. When absent or unparsable, ``-1`` is passed
        on, which ``do_ML`` treats as "leave the page's setting alone".

    Returns a JSON object ``{"ChatGPT": <reply>}``.

    Raises:
        HTTPException: 400 when the body is not a JSON object or has no
            ``text`` field (previously an unhandled KeyError/TypeError).
    """
    data = await request.json()
    if not isinstance(data, dict) or 'text' not in data:
        raise HTTPException(status_code=400, detail="'text' field is required")
    text = data['text']

    temperature = -1
    try:
        temperature = round(float(data['temperature']), 1)
    except (KeyError, TypeError, ValueError, OverflowError):
        # Missing key, non-numeric value, or an integer too large for float.
        print("No temperature")

    # NOTE: do_ML drives a browser synchronously, so this call blocks until
    # the reply has been scraped. The id and host arguments are unused
    # placeholders.
    res = do_ML('', text, '', 0, temperature)
    return JSONResponse({"ChatGPT": res})
def do_ML(id: str, text: str, host: str, trycount: int, temperature: float):
    """Submit ``text`` to https://talkai.info/chat/ and return the scraped reply.

    Args:
        id: Unused; only carried through to retries.
        text: Prompt typed into the page's textarea.
        host: Unused; only carried through to retries.
        trycount: Number of attempts already made. Callers pass 0; the function
            retries itself with ``trycount + 1`` after an unexpected error.
        temperature: When within [0, 2], the page's settings dialog is opened
            and the slider value is set to this number before submitting.
            Any other value leaves the page's setting alone.

    Returns:
        The reply text, ``"Requested Could not be proceed"`` when the page
        controls could not be operated within 10 seconds of the attempt
        starting (browser start-up and page load count against that budget),
        or ``None`` when an attempt with ``trycount > 3`` also fails.

    Fixes over the previous version: the retry now passes ``temperature`` (it
    used to raise TypeError), a failed browser start no longer crashes the
    error handler, and the browser is shut down on every exit path.
    """
    driver = None
    try:
        starttime = time.time()
        options = ChromeOptions()
        options.add_argument('--no-sandbox')
        options.add_argument('-headless')
        driver = webdriver.Chrome(options=options, service=Service())
        driver.get("https://talkai.info/chat/")
        # One shared budget for every UI step below.
        deadline = starttime + 10

        def open_settings():
            driver.find_element(By.ID, "openSettings").click()

        def set_slider():
            slider = driver.find_element(By.CLASS_NAME, "styled-slider")
            driver.execute_script("arguments[0].value = arguments[1]", slider, temperature)

        def confirm_settings():
            driver.find_element(By.CLASS_NAME, "settingsButtonConfirm").click()

        def submit_prompt():
            driver.find_element(By.CSS_SELECTOR, "textarea").send_keys(text)
            driver.find_element(By.CLASS_NAME, "sectionChatFormButton").click()

        steps = []
        if 0 <= temperature <= 2:
            print("setting temperature ", temperature)
            steps += [open_settings, set_slider, confirm_settings]
        steps.append(submit_prompt)

        for step in steps:
            if not _chat_retry_until(step, deadline):
                return "Requested Could not be proceed"

        # NOTE: there is deliberately no deadline on waiting for the reply
        # (the former 18.5 s limit was disabled), so this polls until the
        # placeholder is replaced.
        while True:
            time.sleep(0.2)
            try:
                messages = driver.find_elements(By.CLASS_NAME, 'messageContain')
                # Second-to-last message; the first 8 characters are dropped
                # (presumably a speaker label — confirm against the page).
                value = messages[len(messages) - 2].text[8:]
            except Exception:
                continue
            print(value)
            if value == "Please, wait...":
                continue
            return value
    except Exception:
        print("Error")
        if trycount > 3:
            return None
    finally:
        # Runs before every return above and before the retry below.
        _chat_close_driver(driver)
    return do_ML(id, text, host, trycount + 1, temperature)


def _chat_retry_until(action, deadline):
    """Call ``action`` every 0.2 s until it succeeds; False once ``deadline`` has passed."""
    while time.time() <= deadline:
        try:
            action()
        except Exception:
            time.sleep(0.2)
        else:
            return True
    return False


def _chat_close_driver(driver):
    """Clear cookies and quit ``driver``, tolerating ``None`` or a dead session."""
    if driver is None:
        return
    try:
        driver.delete_all_cookies()
    except Exception as exc:
        print("could not clear cookies ", exc)
    try:
        driver.quit()
    except Exception as exc:
        print("could not quit driver ", exc)
@app.post("/image")
async def get_answer(q: Query):
    """Handle ``POST /image``: pass the prompt to do_ML2 and wrap its result.

    The response is the JSON object ``{"url": <value returned by do_ML2>}``.
    """
    prompt = q.text
    origin_host = q.host
    # The first argument (id) is always the empty string; 0 is the initial try count.
    image_url = do_ML2('', prompt, origin_host, 0)
    return JSONResponse({"url": image_url})
def do_ML2(id: str, text: str, host: str, trycount: int):
    """Submit ``text`` to https://talkai.info/image/ and return the image URL.

    Args:
        id: Unused; only carried through to retries.
        text: Prompt typed into the page's textarea.
        host: Unused; only carried through to retries.
        trycount: Number of attempts already made. Callers pass 0; the function
            retries itself with ``trycount + 1`` after a failure or timeout.

    Returns:
        The ``src`` attribute of the generated image,
        ``"The request could not be processed"`` when the page shows exactly
        that message, or ``"Request Could not be proceed"`` when an attempt
        with ``trycount > 2`` also fails.

    Timing is measured from the start of the attempt, including browser
    start-up: 5 s to submit the prompt and 40 s in total for an image.

    Fixes over the previous version: the browser is shut down on every exit
    path (it used to leak on the "could not be processed" return and when
    giving up), and a failed browser start or page load is retried like any
    other failure instead of propagating.
    """
    driver = None
    try:
        starttime = time.time()
        options = ChromeOptions()
        options.add_argument('--no-sandbox')
        options.add_argument('-headless')
        driver = webdriver.Chrome(options=options, service=Service())
        driver.get("https://talkai.info/image/")

        # Phase 1: keep trying to type the prompt and press the send button.
        while True:
            if time.time() > starttime + 5:
                raise ValueError("h")
            try:
                textarea = driver.find_element(By.CSS_SELECTOR, "textarea")
                textarea.send_keys(text)
                time.sleep(0.1)
                driver.find_element(By.CLASS_NAME, "sectionChatFormButton").click()
                break
            except Exception:
                time.sleep(0.2)

        # Phase 2: poll for either the page's failure message or an image.
        while True:
            if time.time() > starttime + 40:
                raise ValueError("j")
            time.sleep(0.2)
            try:
                messages = driver.find_elements(By.CLASS_NAME, "messageContain")
                # Text of the <p> inside the second-to-last message container.
                message_text = messages[len(messages) - 2].find_element(By.TAG_NAME, "p").text
            except Exception as e:
                print(e)
            else:
                if message_text == 'The request could not be processed':
                    return "The request could not be processed"
            try:
                images = driver.find_elements(By.XPATH, "//div[@class='messageContain']/p/img")
                src = images[len(images) - 2].get_attribute("src")
            except Exception:
                # No image yet; poll again.
                continue
            print(src)
            return src
    except Exception:
        print("Error")
        if trycount > 2:
            return "Request Could not be proceed"
    finally:
        # Runs before every return above and before the retry below.
        _image_close_driver(driver)
    return do_ML2(id, text, host, trycount + 1)


def _image_close_driver(driver):
    """Clear cookies and quit ``driver``, tolerating ``None`` or a dead session."""
    if driver is None:
        return
    try:
        driver.delete_all_cookies()
    except Exception as exc:
        print("could not clear cookies ", exc)
    try:
        driver.quit()
    except Exception as exc:
        print("could not quit driver ", exc)