"""HTTP front-end that relays chat and image prompts to talkai.info.

The service keeps two small pools of headless Chrome sessions (one opened on
the chat page, one on the image page). Each request checks a session out of
the matching pool, types the prompt into the page, polls the DOM for the
answer, and then discards the session. A background thread tops the pool up
again after every checkout.
"""
import io
import json
import os
import random
import shutil
import string
import subprocess
import threading
import time
from queue import Empty, Queue
from typing import Annotated, Callable, Optional

import requests
import selenium
from fastapi import Depends, FastAPI, File, Form, Request, UploadFile
from fastapi.concurrency import run_in_threadpool
from fastapi.exceptions import HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver import ChromeOptions
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By

CHAT_URL = "https://talkai.info/chat/"
IMAGE_URL = "https://talkai.info/image/"

# Number of ready-to-use browser sessions kept in each pool.
POOL_SIZE = 2
# Seconds (measured from the start of an attempt) allowed for checking out a
# driver, adjusting settings and submitting the prompt.
SETUP_TIMEOUT = 10
# Seconds (measured from the start of an attempt) allowed for a chat reply.
CHAT_REPLY_TIMEOUT = 18.5
# Pause between two DOM polls.
POLL_INTERVAL = 0.2

# Strings returned to the client on failure. They are part of the HTTP
# contract and are therefore kept exactly as they always were.
TIMEOUT_MESSAGE = "Requested Could not be proceed"
IMAGE_RETRIES_EXHAUSTED_MESSAGE = "Request Could not be proceed"
NO_DRIVER_MESSAGE = "Error"
# Text shown in the reply bubble while the answer is still being produced.
PENDING_REPLY_TEXT = "Please, wait..."

# Private marker meaning "the deadline passed"; distinct from any value the
# page itself could yield (including None from a missing attribute).
_TIMED_OUT = object()


class Query(BaseModel):
    """Request body of the ``/image`` endpoint."""

    text: str
    host: str


app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)

# Pools of pre-opened browser sessions, one per target page.
chatq = Queue()
imgq = Queue()

# A filler thread is spawned on every checkout; these locks make sure only
# one filler per pool creates browsers at a time so the pool is not overshot.
_chat_fill_lock = threading.Lock()
_img_fill_lock = threading.Lock()


@app.on_event("startup")
async def startup_event():
    """Start filling both driver pools in the background at application start."""
    print("on startup")
    threading.Thread(target=makeqchat).start()
    threading.Thread(target=makeqimg).start()


def _new_driver(url: str) -> webdriver.Chrome:
    """Launch a headless Chrome session and navigate it to *url*.

    The browser is shut down again if the navigation fails, so a failed
    launch does not leave a stray Chrome process behind.
    """
    options = ChromeOptions()
    options.add_argument('--no-sandbox')
    options.add_argument('-headless')
    driver = webdriver.Chrome(options=options, service=Service())
    try:
        driver.get(url)
    except Exception:
        driver.quit()
        raise
    return driver


def _fill_pool(pool: Queue, url: str, label: str, lock: threading.Lock) -> None:
    """Add sessions opened on *url* to *pool* until it holds POOL_SIZE of them."""
    with lock:
        while pool.qsize() < POOL_SIZE:
            print(label)
            pool.put(_new_driver(url))


def makeqchat():
    """Top up the chat driver pool (intended to run in a background thread)."""
    _fill_pool(chatq, CHAT_URL, "appending in chat queue", _chat_fill_lock)


def makeqimg():
    """Top up the image driver pool (intended to run in a background thread)."""
    _fill_pool(imgq, IMAGE_URL, "appending in img queue", _img_fill_lock)


def _acquire_driver(
    pool: Queue, refill: Callable[[], None], deadline: float
) -> Optional[webdriver.Chrome]:
    """Check a driver out of *pool*, waiting until *deadline* at most.

    A refill thread is started after a successful checkout (to replace the
    driver that was taken) and also when the pool is found empty (so that a
    filler is guaranteed to be working while this call waits).

    Returns:
        The driver, or ``None`` when none became available in time.
    """
    try:
        driver = pool.get_nowait()
    except Empty:
        threading.Thread(target=refill).start()
        try:
            # A plain get() would block forever and make the deadline moot.
            driver = pool.get(timeout=max(deadline - time.time(), 0))
        except Empty as error:
            print("Error in popping ", error)
            return None
    threading.Thread(target=refill).start()
    return driver


def _dispose_driver(driver: Optional[webdriver.Chrome]) -> None:
    """Best-effort teardown of a checked-out driver; never raises.

    Each step is attempted independently so that a failing cookie wipe on a
    dead session does not prevent the browser process from being quit.
    """
    if driver is None:
        return
    for step in (driver.delete_all_cookies, driver.quit):
        try:
            step()
        except Exception as error:  # cleanup boundary: log and carry on
            print("Error while disposing driver ", error)


def _poll_until(action: Callable[[], None], deadline: float) -> bool:
    """Call *action* repeatedly until it stops raising WebDriverException.

    Returns:
        ``True`` once *action* succeeded, ``False`` if *deadline* passed first.
    """
    while time.time() <= deadline:
        try:
            action()
        except WebDriverException:
            time.sleep(POLL_INTERVAL)
        else:
            return True
    return False


def _apply_temperature(
    driver: webdriver.Chrome, temperature: float, deadline: float
) -> bool:
    """Set the temperature slider in the page's settings dialog.

    Opens the settings, writes *temperature* into the slider element via
    JavaScript and confirms the dialog. Each step is polled until it works.

    Returns:
        ``True`` when all three steps succeeded before *deadline*.
    """

    def open_settings():
        driver.find_element(By.ID, "openSettings").click()

    def set_slider():
        slider = driver.find_element(By.CLASS_NAME, "styled-slider")
        driver.execute_script(
            "arguments[0].value = arguments[1]", slider, temperature
        )

    def confirm():
        driver.find_element(By.CLASS_NAME, "settingsButtonConfirm").click()

    # all() short-circuits, so later steps are skipped after a timeout.
    return all(
        _poll_until(step, deadline)
        for step in (open_settings, set_slider, confirm)
    )


def _submit_prompt(driver: webdriver.Chrome, text: str, settle: float = 0.0) -> None:
    """Type *text* into the page's textarea and press the send button.

    Both elements are located before anything is typed, and the textarea is
    cleared first, so that a retry after a partial failure cannot leave the
    prompt in the box twice.

    Args:
        driver: Session opened on the chat or image page.
        text: Prompt to send.
        settle: Seconds to wait between typing and clicking.
    """
    textarea = driver.find_element(By.CSS_SELECTOR, "textarea")
    button = driver.find_element(By.CLASS_NAME, "sectionChatFormButton")
    textarea.clear()
    textarea.send_keys(text)
    if settle:
        time.sleep(settle)
    button.click()


def _await_chat_reply(driver: webdriver.Chrome, deadline: float):
    """Poll the chat page until the reply is no longer the pending placeholder.

    Returns:
        The reply text, or ``_TIMED_OUT`` when *deadline* passed first.
    """
    while True:
        time.sleep(POLL_INTERVAL)
        if time.time() > deadline:
            return _TIMED_OUT
        try:
            messages = driver.find_elements(By.CLASS_NAME, 'messageContain')
            # The reply is read from the second-to-last bubble. The index is
            # written as len - 2 on purpose: with a single bubble it wraps to
            # that bubble instead of raising, as it always has.
            # The first 8 characters are dropped -- presumably a sender label
            # the page puts in front of the message; TODO confirm.
            value = messages[len(messages) - 2].text[8:]
        except (IndexError, WebDriverException):
            continue
        print(value)
        if value != PENDING_REPLY_TEXT:
            return value


def _await_image_src(driver: webdriver.Chrome, deadline: float):
    """Poll the image page until an ``<img>`` appears in the message list.

    Returns:
        The ``src`` attribute of the selected image, or ``_TIMED_OUT`` when
        *deadline* passed first.
    """
    while True:
        if time.time() > deadline:
            return _TIMED_OUT
        time.sleep(POLL_INTERVAL)
        try:
            images = driver.find_elements(
                By.XPATH, "//div[@class='messageContain']/p/img"
            )
            # Same len - 2 indexing as the chat page (see _await_chat_reply).
            src = images[len(images) - 2].get_attribute("src")
        except (IndexError, WebDriverException):
            continue
        print(src)
        return src


def _parse_temperature(raw) -> float:
    """Convert a client-supplied temperature to a float rounded to 1 decimal.

    Returns -1 (meaning "leave the page default") when *raw* is missing or
    not numeric, instead of leaking the unparsed value to the caller.
    """
    try:
        return round(float(raw), 1)
    except (TypeError, ValueError, OverflowError):
        print("No temperature")
        return -1


@app.post("/")
async def get_answer(request: Request):
    """Answer a chat prompt.

    Expects a JSON object with a ``text`` field and an optional
    ``temperature`` field. Responds with ``{"ChatGPT": <reply or error text>}``.

    Raises:
        HTTPException: 400 when the body is not a JSON object with ``text``.
    """
    data = await request.json()
    if not isinstance(data, dict) or 'text' not in data:
        raise HTTPException(
            status_code=400,
            detail="Request body must be a JSON object with a 'text' field",
        )
    temperature = _parse_temperature(data.get('temperature'))
    # do_ML blocks for many seconds; keep it off the event loop.
    res = await run_in_threadpool(do_ML, '', data['text'], '', 0, temperature)
    return JSONResponse({"ChatGPT": res})


def do_ML(id: str, text: str, host: str, trycount: int, temperature: float = -1):
    """Send *text* to the chat page and return the reply.

    Args:
        id: Unused; kept for call compatibility.
        text: Prompt to send.
        host: Unused; kept for call compatibility.
        trycount: Number of attempts already made; the call retries itself
            with a fresh driver after an unexpected error while this is <= 3.
        temperature: Applied through the page's settings dialog when within
            [0, 2]; any other value leaves the page default untouched.

    Returns:
        The reply text, or one of the fixed error strings when no driver was
        available, a step timed out, or all retries failed.
    """
    driver = None
    try:
        starttime = time.time()
        setup_deadline = starttime + SETUP_TIMEOUT

        driver = _acquire_driver(chatq, makeqchat, setup_deadline)
        if driver is None:
            return NO_DRIVER_MESSAGE

        if 0 <= temperature <= 2:
            print("setting temperature ", temperature)
            if not _apply_temperature(driver, temperature, setup_deadline):
                print("could not set temperature")
                return TIMEOUT_MESSAGE

        if not _poll_until(lambda: _submit_prompt(driver, text), setup_deadline):
            return TIMEOUT_MESSAGE

        reply = _await_chat_reply(driver, starttime + CHAT_REPLY_TIMEOUT)
        return TIMEOUT_MESSAGE if reply is _TIMED_OUT else reply
    except Exception as error:
        print("Error", error)
    finally:
        # Runs on every exit path, so timeouts no longer leak a browser.
        _dispose_driver(driver)

    # Only reached after an unexpected error in the attempt above.
    if trycount > 3:
        return TIMEOUT_MESSAGE
    return do_ML(id, text, host, trycount + 1, temperature)


# NOTE: this handler deliberately keeps the same function name as the "/"
# handler above; both routes are already registered by their decorators.
@app.post("/image")
async def get_answer(q: Query):
    """Generate an image for a prompt.

    Responds with ``{"url": <image src or error text>}``.
    """
    # do_ML2 blocks for many seconds; keep it off the event loop.
    url = await run_in_threadpool(do_ML2, '', q.text, q.host, 0)
    return JSONResponse({"url": url})


def do_ML2(id: str, text: str, host: str, trycount: int):
    """Send *text* to the image page and return the resulting image ``src``.

    The whole attempt (driver checkout, prompt submission and waiting for
    the image) shares a single SETUP_TIMEOUT-second budget.

    Args:
        id: Unused; kept for call compatibility.
        text: Prompt to send.
        host: Unused; kept for call compatibility.
        trycount: Number of attempts already made; the call retries itself
            with a fresh driver after an unexpected error while this is <= 1.

    Returns:
        The image ``src``, or one of the fixed error strings when a step
        timed out or all retries failed.
    """
    driver = None
    try:
        deadline = time.time() + SETUP_TIMEOUT

        driver = _acquire_driver(imgq, makeqimg, deadline)
        if driver is None:
            return TIMEOUT_MESSAGE

        if not _poll_until(
            lambda: _submit_prompt(driver, text, settle=0.1), deadline
        ):
            return TIMEOUT_MESSAGE

        src = _await_image_src(driver, deadline)
        return TIMEOUT_MESSAGE if src is _TIMED_OUT else src
    except Exception as error:
        print("Error", error)
    finally:
        # Runs on every exit path, so timeouts no longer leak a browser.
        _dispose_driver(driver)

    # Only reached after an unexpected error in the attempt above.
    if trycount > 1:
        return IMAGE_RETRIES_EXHAUSTED_MESSAGE
    return do_ML2(id, text, host, trycount + 1)