"""FastAPI front-end that proxies prompts to talkai.info through headless Chrome.

Each request spawns a worker thread that drives a Selenium session, polls the
page for the generated answer (text or image URL) and reports progress and the
final result by POSTing form data to the callback URL supplied by the caller.
"""
import io
import json
import os
import random
import shutil
import string
import subprocess
import threading
import time
from typing import Annotated, Callable

import requests
import selenium
from fastapi import Depends, FastAPI, File, Form, Request, UploadFile
from fastapi.exceptions import HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By

# from transformers import AutoModelForQuestionAnswering, AutoTokenizer, pipeline
# from selenium.webdriver.firefox.options import Options
# options = FirefoxOptions()
# options.headless = True
# service = Service()
# driver = webdriver.Firefox(options= options,service=service)
# driver.get("https://yuntian-deng-chatgpt.hf.space/")

CHAT_URL = "https://talkai.info/chat/"
IMAGE_URL = "https://talkai.info/image/"

# Upper bound (seconds) on one browser session before the job is reported as failed.
SESSION_TIMEOUT_SECONDS = 600
# Upper bound (seconds) on a single callback POST so a dead host cannot hang a worker.
CALLBACK_TIMEOUT_SECONDS = 30


class Query(BaseModel):
    """Request body: the prompt and the URL that receives status callbacks."""

    text: str
    # NOTE(review): `host` is caller-controlled and the server POSTs to it, which
    # allows server-side request forgery. Consider validating it against an allow-list.
    host: str


class _JobTimeout(Exception):
    """Raised by a poller when the session exceeded SESSION_TIMEOUT_SECONDS."""


app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)

# cred = credentials.Certificate('key.json')
# app1 = firebase_admin.initialize_app(cred)
# db = firestore.client()
# data_frame = pd.read_csv('data.csv')


@app.on_event("startup")
async def startup_event():
    """Log that the application has started."""
    print("on startup")


def _new_job_id() -> str:
    """Return a job id: 20 random uppercase/digit characters plus the current timestamp."""
    token = ''.join(random.choices(string.ascii_uppercase + string.digits, k=20))
    return token + str(time.time())


def _start_job(worker: Callable[[str, str, str, int], None], q: Query) -> JSONResponse:
    """Launch `worker` in a background thread and return the new job id as JSON."""
    job_id = _new_job_id()
    threading.Thread(target=worker, args=(job_id, q.text, q.host, 0)).start()
    return JSONResponse({"id": job_id})


def _post_status(host: str, job_id: str, result, status: str):
    """POST one status update to the callback URL.

    Returns the `requests.Response`, or None when the callback could not be
    delivered. Delivery is best-effort: an unreachable callback host is logged
    instead of restarting the whole browser session.
    """
    payload = {"id": job_id, "result": result, "status": status}
    try:
        return requests.post(host, data=payload, timeout=CALLBACK_TIMEOUT_SECONDS)
    except requests.RequestException as exc:
        print(f"Callback to {host} failed: {exc}")
        return None


def _new_driver():
    """Create a headless Chrome driver with the sandbox disabled."""
    options = ChromeOptions()
    options.add_argument('--no-sandbox')
    options.add_argument('-headless')
    return webdriver.Chrome(options=options, service=Service())


def _close_driver(driver) -> None:
    """Tear down the browser; each step is best-effort so quit() is always attempted."""
    try:
        driver.delete_all_cookies()
    except Exception as exc:
        print(f"Cookie cleanup failed: {exc}")
    try:
        driver.quit()
    except Exception as exc:
        print(f"Driver quit failed: {exc}")


def _submit_prompt(driver, url: str, text: str, settle_seconds: float) -> None:
    """Open `url`, type `text` into the page's textarea and click the send button."""
    driver.get(url)
    time.sleep(settle_seconds)  # give the page time to render before locating elements
    driver.find_element(By.CSS_SELECTOR, "textarea").send_keys(text)
    time.sleep(0.1)
    driver.find_element(By.CLASS_NAME, "sectionChatFormButton").click()


def _penultimate(elements):
    """Return the second-to-last element (the only element when there is exactly one).

    Deliberately `len - 2` rather than `[-2]`: with a single element the index
    is -1, so that element is returned instead of raising. An empty list raises
    IndexError.
    """
    return elements[len(elements) - 2]


def _poll_chat_reply(driver, id: str, host: str, attempt: int, deadline: float) -> str:
    """Poll the chat page until the reply text stops changing, then return it.

    Every changed intermediate value is pushed to the callback with status
    "Generating --keep refreshing". Raises `_JobTimeout` once `deadline` passes.
    """
    prev = ""
    while True:
        time.sleep(2 + attempt)
        if time.time() > deadline:
            raise _JobTimeout()
        messages = driver.find_elements(By.CLASS_NAME, 'messageContain')
        # Drop the first 8 characters of the message text
        # (presumably a fixed label prefix rendered by the site; verify).
        value = _penultimate(messages).text[8:]
        print(value)
        if value == "Please, wait...":
            continue
        # Two identical consecutive non-empty reads mean generation has finished.
        if prev != "" and value == prev:
            return value
        prev = value
        response = _post_status(host, id, value, "Generating --keep refreshing")
        if response is not None:
            print(response.text)


def _poll_image_src(driver, id: str, host: str, attempt: int, deadline: float):
    """Poll the image page until a generated <img> is present and return its `src`.

    Raises `_JobTimeout` once `deadline` passes.
    """
    while True:
        time.sleep(1 + attempt)
        if time.time() > deadline:
            raise _JobTimeout()
        try:
            images = driver.find_elements(
                By.XPATH, "//div[@class='messageContain']/p/img"
            )
            src = _penultimate(images).get_attribute("src")
        except Exception:
            # Image not rendered yet (or a transient lookup failure): keep polling.
            continue
        print(src)
        return src


def _run_job(id: str, text: str, host: str, trycount: int, url: str, poll) -> None:
    """Run one scraping job, retrying failed browser sessions.

    A session is retried while the attempt counter is <= 3 (five attempts when
    starting from 0); each retry waits longer between steps. A timeout is
    reported as "Error" immediately and is not retried. The browser is always
    shut down before the next attempt or before returning.
    """
    attempt = trycount
    while True:
        try:
            driver = _new_driver()
            try:
                deadline = time.time() + SESSION_TIMEOUT_SECONDS
                _submit_prompt(driver, url, text, 2 + attempt)
                result = poll(driver, id, host, attempt, deadline)
            finally:
                _close_driver(driver)
        except _JobTimeout:
            _post_status(host, id, "", "Error")
            return
        except Exception as exc:
            print("Error")
            print(f"Attempt {attempt} failed: {exc!r}")
            if attempt > 3:
                _post_status(host, id, "", "Error")
                return
            attempt += 1
            continue
        _post_status(host, id, result, "Generated")
        return


@app.post("/")
async def get_answer(q: Query):
    """Start a text-generation job and return its id; results arrive via callbacks to `q.host`."""
    return _start_job(do_ML, q)


def do_ML(id: str, text: str, host: str, trycount: int):
    """Generate a chat answer for `text` and report it to `host`.

    Args:
        id: Job identifier echoed in every callback payload.
        text: Prompt typed into the chat page.
        host: Callback URL that receives form-encoded status updates.
        trycount: Attempt counter to start from; also lengthens the waits.
    """
    _run_job(id, text, host, trycount, CHAT_URL, _poll_chat_reply)


# NOTE(review): this handler reuses the name `get_answer`; both routes still work
# because FastAPI registers the function at decoration time, but the module-level
# name ends up bound to this second handler. Kept as-is to avoid renaming.
@app.post("/image")
async def get_answer(q: Query):
    """Start an image-generation job and return its id; results arrive via callbacks to `q.host`."""
    return _start_job(do_ML2, q)


def do_ML2(id: str, text: str, host: str, trycount: int):
    """Generate an image for `text` and report its URL to `host`.

    Args:
        id: Job identifier echoed in every callback payload.
        text: Prompt typed into the image page.
        host: Callback URL that receives form-encoded status updates.
        trycount: Attempt counter to start from; also lengthens the waits.
    """
    _run_job(id, text, host, trycount, IMAGE_URL, _poll_image_src)