|
import io |
|
from fastapi import FastAPI, File, UploadFile |
|
|
|
import subprocess |
|
import os |
|
import requests |
|
import random |
|
|
|
import shutil |
|
import json |
|
|
|
from pydantic import BaseModel |
|
from typing import Annotated |
|
|
|
from fastapi import Form |
|
|
|
|
|
import selenium |
|
|
|
from selenium import webdriver |
|
from selenium.webdriver import ChromeOptions |
|
from selenium.webdriver.chrome.service import Service |
|
import threading |
|
import random |
|
import string |
|
import time |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Query(BaseModel):
    # Request body model; the POST /image handler in this file takes it as `q`.

    # Prompt text; the handler forwards it to do_ML2.
    text: str

    # Also forwarded to do_ML2 by the handler.
    host: str
|
|
|
|
|
|
|
|
|
from fastapi import FastAPI, Request, Depends, UploadFile, File |
|
from fastapi.exceptions import HTTPException |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from fastapi.responses import JSONResponse |
|
|
|
|
|
# Application object that the route and startup decorators in this file attach to.
app = FastAPI()

# CORS policy: any origin, method and header is accepted, with credentials allowed.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive -- confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from selenium.webdriver.common.by import By |
|
|
|
@app.on_event("startup")
async def startup_event():
    """Start one background thread per browser pool when the app starts.

    The chat filler is started first, then the image filler; neither thread is
    joined, so this handler returns as soon as both have been started.
    """
    print("on startup")
    for pool_filler in (makeqchat, makeqimg):
        worker = threading.Thread(target=pool_filler)
        worker.start()
|
|
|
from queue import Queue |
|
# Pools of ready-to-use browser sessions: makeqchat()/makeqimg() put drivers in,
# do_ML()/do_ML2() take one out per request.
chatq = Queue()  # drivers already opened on the chat page
imgq = Queue()  # drivers already opened on the image page
|
|
|
|
|
def makeqchat():
    """Top up ``chatq`` with headless Chrome sessions opened on the chat page.

    Keeps launching browsers while ``chatq.qsize()`` is below 2. Each browser is
    started with ``--no-sandbox`` and ``-headless``, navigated to
    https://talkai.info/chat/ and then put on the queue.

    Raises:
        Exception: whatever browser start-up or navigation raises. If the
            navigation fails, the browser that was just launched is quit first
            so its process is not left running.
    """
    while chatq.qsize() < 2:
        print("appending in chat queue")
        options = ChromeOptions()
        options.add_argument('--no-sandbox')
        options.add_argument('-headless')
        driver = webdriver.Chrome(options=options, service=Service())
        try:
            driver.get("https://talkai.info/chat/")
        except Exception:
            # The session never reached the queue, so nobody else can close it.
            driver.quit()
            raise
        chatq.put(driver)
|
|
|
|
|
|
|
def makeqimg():
    """Top up ``imgq`` with headless Chrome sessions opened on the image page.

    Keeps launching browsers while ``imgq.qsize()`` is below 2. Each browser is
    started with ``--no-sandbox`` and ``-headless``, navigated to
    https://talkai.info/image/ and then put on the queue.

    Raises:
        Exception: whatever browser start-up or navigation raises. If the
            navigation fails, the browser that was just launched is quit first
            so its process is not left running.
    """
    while imgq.qsize() < 2:
        print("appending in img queue")
        options = ChromeOptions()
        options.add_argument('--no-sandbox')
        options.add_argument('-headless')
        driver = webdriver.Chrome(options=options, service=Service())
        try:
            driver.get("https://talkai.info/image/")
        except Exception:
            # The session never reached the queue, so nobody else can close it.
            driver.quit()
            raise
        imgq.put(driver)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/")
async def get_answer(request: Request):
    """Handle POST /: forward the prompt to do_ML and return its reply as JSON.

    The JSON body must contain ``text``. An optional ``temperature`` is
    converted to float and rounded to one decimal; if it is missing or cannot
    be converted, -1 is passed on, which do_ML treats as "leave the setting
    untouched" because it only applies values between 0 and 2.

    Returns:
        JSONResponse of the form ``{"ChatGPT": <whatever do_ML returned>}``.

    Raises:
        HTTPException: 400 when the body is not a JSON object with ``text``.

    NOTE: a later handler in this file reuses the name ``get_answer``.
    """
    import asyncio  # function-scope import: asyncio is not imported at file level

    data = await request.json()
    if not isinstance(data, dict) or 'text' not in data:
        raise HTTPException(status_code=400, detail="'text' is required")
    text = data['text']

    # Assign only after the whole conversion succeeded, so a bad value can
    # never leak through as a half-parsed temperature.
    temperature = -1
    try:
        temperature = round(float(data['temperature']), 1)
    except (KeyError, TypeError, ValueError, OverflowError):
        print("No temperature")

    # do_ML sleeps and polls a browser; run it in a worker thread so the event
    # loop stays free for other requests.
    res = await asyncio.to_thread(do_ML, '', text, '', 0, temperature)

    return JSONResponse({"ChatGPT": res})
|
|
|
|
|
|
|
|
|
|
|
|
|
def do_ML(id: str, text: str, host: str, trycount: int, temperature: float = -1):
    """Send ``text`` through a pooled chat browser and return the reply text.

    Steps:
      1. Take a browser from ``chatq`` (waiting at most 10 s) and start a
         ``makeqchat`` thread to refill the pool.
      2. If ``temperature`` is a number between 0 and 2, open the settings
         dialog, set the slider value via JavaScript and confirm.
      3. Type ``text`` into the page's textarea and click the send button.
      4. Poll the ``messageContain`` elements every 0.2 s and return the text
         of the second-to-last one once it is no longer "Please, wait...".

    Steps 2-3 must finish within 10 s of the call, step 4 within 18.5 s. The
    browser is always closed before returning, on every exit path.

    Args:
        id: Unused here; only passed along on retry. (Shadows the builtin, but
            the name is part of the existing signature.)
        text: Prompt typed into the page.
        host: Unused here; only passed along on retry.
        trycount: Number of retries already made. After an unexpected error
            the call retries itself while ``trycount`` is 3 or less.
        temperature: Slider value to apply; values outside 0..2 (such as the
            default -1) or non-numeric values leave the setting untouched.

    Returns:
        The reply with its first 8 characters removed (presumably a speaker
        label -- confirm against the page markup); ``"Error"`` if no browser
        became available; ``"Requested Could not be proceed"`` on a deadline;
        ``None`` when an unexpected error occurs and retries are exhausted.
    """
    from queue import Empty  # function-scope import: the file only imports Queue

    started = time.time()
    setup_deadline = started + 10
    reply_deadline = started + 18.5
    timed_out = "Requested Could not be proceed"

    def retry_until(deadline, action):
        """Call action() every 0.2 s until it succeeds; False once past deadline."""
        while True:
            if time.time() > deadline:
                return False
            try:
                action()
                return True
            except Exception:
                # Typically the element is not rendered yet; try again shortly.
                time.sleep(0.2)

    def open_settings():
        driver.find_element(By.ID, "openSettings").click()

    def set_slider():
        slider = driver.find_element(By.CLASS_NAME, "styled-slider")
        driver.execute_script("arguments[0].value = arguments[1]", slider, temperature)

    def confirm_settings():
        driver.find_element(By.CLASS_NAME, "settingsButtonConfirm").click()

    def submit_prompt():
        driver.find_element(By.CSS_SELECTOR, "textarea").send_keys(text)
        driver.find_element(By.CLASS_NAME, "sectionChatFormButton").click()

    def release(browser):
        """Best-effort teardown; a broken session must not mask the result."""
        for cleanup in (browser.delete_all_cookies, browser.quit):
            try:
                cleanup()
            except Exception as error:
                print("Error while closing browser ", error)

    # A bounded wait keeps a dead pool from hanging the request forever.
    try:
        driver = chatq.get(timeout=10)
    except Empty as error:
        print("Error in popping ", error)
        threading.Thread(target=makeqchat).start()
        return "Error"
    # Replace the browser that was just taken.
    threading.Thread(target=makeqchat).start()

    try:
        wants_temperature = (
            isinstance(temperature, (int, float)) and 0 <= temperature <= 2
        )
        if wants_temperature:
            try:
                print("setting temperature ", temperature)
                for step in (open_settings, set_slider, confirm_settings):
                    if not retry_until(setup_deadline, step):
                        return timed_out
            except Exception:
                # Temperature is optional; carry on with the page default.
                print("could not set temperature")

        if not retry_until(setup_deadline, submit_prompt):
            return timed_out

        while True:
            time.sleep(0.2)
            if time.time() > reply_deadline:
                return timed_out
            try:
                messages = driver.find_elements(By.CLASS_NAME, 'messageContain')
                value = messages[len(messages) - 2].text[8:]
                print(value)
            except Exception:
                continue
            if value == "Please, wait...":
                continue
            return value
    except Exception:
        print("Error")
    finally:
        release(driver)

    # Only reached after an unexpected error above.
    if trycount > 3:
        return None
    return do_ML(id, text, host, trycount + 1, temperature)
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/image")
async def get_answer(q: Query):
    """Handle POST /image: forward the prompt to do_ML2 and return its result.

    Args:
        q: Request body; ``q.text`` and ``q.host`` are passed to do_ML2.

    Returns:
        JSONResponse of the form ``{"url": <whatever do_ML2 returned>}``.

    NOTE: an earlier handler in this file has the same function name.
    """
    import asyncio  # function-scope import: asyncio is not imported at file level

    # do_ML2 sleeps and polls a browser; run it in a worker thread so the event
    # loop stays free for other requests.
    url = await asyncio.to_thread(do_ML2, '', q.text, q.host, 0)

    return JSONResponse({"url": url})
|
|
|
|
|
|
|
|
|
|
|
def do_ML2(id: str, text: str, host: str, trycount: int):
    """Send ``text`` through a pooled image browser and return an image ``src``.

    Steps:
      1. Take a browser from ``imgq`` (waiting at most 10 s) and start a
         ``makeqimg`` thread to refill the pool.
      2. Type ``text`` into the page's textarea, pause 0.1 s and click the
         send button, retrying every 0.2 s until it works.
      3. Poll every 0.2 s for ``<img>`` elements under ``messageContain``
         paragraphs and return the ``src`` of the second-to-last match.

    Everything must finish within 10 s of the call. The browser is always
    closed before returning, on every exit path.

    NOTE(review): the original also carried an 18.5 s check in step 3, but it
    sat behind the 10 s check and could never fire. The effective 10 s limit
    is kept here -- confirm which limit is intended for image generation.

    Args:
        id: Unused here; only passed along on retry. (Shadows the builtin, but
            the name is part of the existing signature.)
        text: Prompt typed into the page.
        host: Unused here; only passed along on retry.
        trycount: Number of retries already made. After an unexpected error
            the call retries itself while ``trycount`` is 1 or less.

    Returns:
        The ``src`` attribute value of the selected image;
        ``"Requested Could not be proceed"`` when the deadline passes or no
        browser became available; ``"Request Could not be proceed"`` when an
        unexpected error occurs and retries are exhausted.
    """
    from queue import Empty  # function-scope import: the file only imports Queue

    started = time.time()
    deadline = started + 10
    timed_out = "Requested Could not be proceed"

    def retry_until(limit, action):
        """Call action() every 0.2 s until it succeeds; False once past limit."""
        while True:
            if time.time() > limit:
                return False
            try:
                action()
                return True
            except Exception:
                # Typically the element is not rendered yet; try again shortly.
                time.sleep(0.2)

    def submit_prompt():
        driver.find_element(By.CSS_SELECTOR, "textarea").send_keys(text)
        time.sleep(0.1)
        driver.find_element(By.CLASS_NAME, "sectionChatFormButton").click()

    def release(browser):
        """Best-effort teardown; a broken session must not mask the result."""
        for cleanup in (browser.delete_all_cookies, browser.quit):
            try:
                cleanup()
            except Exception as error:
                print("Error while closing browser ", error)

    # A bounded wait keeps a dead pool from hanging the request forever.
    try:
        driver = imgq.get(timeout=10)
    except Empty as error:
        print("Error in popping ", error)
        threading.Thread(target=makeqimg).start()
        return timed_out
    # Replace the browser that was just taken.
    threading.Thread(target=makeqimg).start()

    try:
        if not retry_until(deadline, submit_prompt):
            return timed_out

        while True:
            if time.time() > deadline:
                return timed_out
            time.sleep(0.2)
            try:
                images = driver.find_elements(
                    By.XPATH, "//div[@class='messageContain']/p/img"
                )
                src = images[len(images) - 2].get_attribute("src")
                print(src)
            except Exception:
                # No image yet (or the page is mid-update); poll again.
                continue
            return src
    except Exception:
        print("Error")
    finally:
        release(driver)

    # Only reached after an unexpected error above.
    if trycount > 1:
        return "Request Could not be proceed"
    return do_ML2(id, text, host, trycount + 1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|