|
|
|
import io |
|
from fastapi import FastAPI, File, UploadFile |
|
|
|
import subprocess |
|
import os |
|
import requests |
|
import random |
|
|
|
import shutil |
|
import json |
|
|
|
from pydantic import BaseModel |
|
from typing import Annotated |
|
|
|
from fastapi import Form |
|
|
|
|
|
import selenium |
|
|
|
from selenium import webdriver |
|
from selenium.webdriver import ChromeOptions |
|
from selenium.webdriver.chrome.service import Service |
|
import threading |
|
import random |
|
import string |
|
import time |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Query(BaseModel): |
|
text: str |
|
host:str |
|
|
|
|
|
|
|
|
|
from fastapi import FastAPI, Request, Depends, UploadFile, File |
|
from fastapi.exceptions import HTTPException |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from fastapi.responses import JSONResponse |
|
|
|
|
|
app = FastAPI() |
|
|
|
app.add_middleware( |
|
CORSMiddleware, |
|
allow_origins=['*'], |
|
allow_credentials=True, |
|
allow_methods=['*'], |
|
allow_headers=['*'], |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.on_event("startup") |
|
async def startup_event(): |
|
print("on startup") |
|
|
|
|
|
|
|
from selenium.webdriver.common.by import By |
|
|
|
@app.post("/") |
|
async def get_answer(q: Query ): |
|
|
|
text = q.text |
|
host= q.host |
|
|
|
N = 20 |
|
res = ''.join(random.choices(string.ascii_uppercase + |
|
string.digits, k=N)) |
|
res= res+ str(time.time()) |
|
|
|
id= res |
|
|
|
t = threading.Thread(target=do_ML, args=(id,text,host,0)) |
|
t.start() |
|
|
|
|
|
dict= {"id":id} |
|
|
|
|
|
return JSONResponse(dict) |
|
|
|
|
|
|
|
|
|
|
|
|
|
def do_ML(id:str,text:str,host:str, trycount:int): |
|
options = ChromeOptions() |
|
options.add_argument('--no-sandbox') |
|
options.add_argument('-headless') |
|
service = Service() |
|
driver = webdriver.Chrome(options= options,service=service) |
|
try: |
|
starttime=time.time() |
|
driver.get("https://talkai.info/chat/") |
|
time.sleep(2+trycount) |
|
textarea = driver.find_element(By.CSS_SELECTOR, "textarea") |
|
textarea.send_keys(text) |
|
time.sleep(0.1) |
|
button = driver.find_element(By.CLASS_NAME, "sectionChatFormButton") |
|
button.click() |
|
prev ="" |
|
|
|
|
|
while True: |
|
time.sleep(2+trycount) |
|
currtime= time.time() |
|
if(currtime>starttime+600): |
|
data=data={"id":id,"result":"","status":"Error"} |
|
x= requests.post(host,data= data) |
|
return |
|
|
|
messages = driver.find_elements(By.CLASS_NAME, 'messageContain') |
|
last_message_contain = messages[len(messages)-2] |
|
value = last_message_contain.text |
|
value = value[8:len(value)] |
|
print(value) |
|
if value=="Please, wait...": |
|
continue |
|
|
|
if prev!="": |
|
if value==prev: |
|
data={"id":id,"result":value,"status":"Generated"} |
|
|
|
driver.delete_all_cookies() |
|
driver.quit() |
|
requests.post(host, data=data) |
|
break |
|
prev= value |
|
|
|
data=data={"id":id,"result":value,"status":"Generating --keep refreshing"} |
|
x= requests.post(host,data= data) |
|
print(x.text) |
|
|
|
except: |
|
print("Error") |
|
driver.delete_all_cookies() |
|
if trycount>3: |
|
data=data={"id":id,"result":"","status":"Error"} |
|
x= requests.post(host,data= data) |
|
return |
|
driver.quit() |
|
do_ML(id,text,host,trycount+1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/image") |
|
async def get_answer(q: Query ): |
|
|
|
text = q.text |
|
host= q.host |
|
|
|
N = 20 |
|
res = ''.join(random.choices(string.ascii_uppercase + |
|
string.digits, k=N)) |
|
res= res+ str(time.time()) |
|
|
|
id= res |
|
|
|
t = threading.Thread(target=do_ML2, args=(id,text,host,0)) |
|
t.start() |
|
|
|
|
|
dict= {"id":id} |
|
|
|
|
|
return JSONResponse(dict) |
|
|
|
|
|
|
|
|
|
|
|
def do_ML2(id:str,text:str,host:str, trycount:int): |
|
options = ChromeOptions() |
|
options.add_argument('--no-sandbox') |
|
options.add_argument('-headless') |
|
service = Service() |
|
driver = webdriver.Chrome(options= options,service=service) |
|
try: |
|
starttime=time.time() |
|
driver.get("https://talkai.info/image/") |
|
time.sleep(2+trycount) |
|
textarea = driver.find_element(By.CSS_SELECTOR, "textarea") |
|
textarea.send_keys(text) |
|
time.sleep(0.1) |
|
button = driver.find_element(By.CLASS_NAME, "sectionChatFormButton") |
|
button.click() |
|
|
|
while True: |
|
time.sleep(1+trycount) |
|
currtime= time.time() |
|
if(currtime>starttime+600): |
|
data=data={"id":id,"result":"","status":"Error"} |
|
x= requests.post(host,data= data) |
|
return |
|
try: |
|
messages = driver.find_elements(By.XPATH, "//div[@class='messageContain']/p/img") |
|
last_message_contain = messages[len(messages)-2] |
|
src = last_message_contain.get_attribute("src") |
|
print(src) |
|
data={"id":id,"result":src,"status":"Generated"} |
|
requests.post(host, data=data) |
|
driver.delete_all_cookies() |
|
driver.quit() |
|
break |
|
except: |
|
continue |
|
|
|
except: |
|
print("Error") |
|
driver.delete_all_cookies() |
|
if trycount>3: |
|
data=data={"id":id,"result":"","status":"Error"} |
|
x= requests.post(host,data= data) |
|
return |
|
driver.quit() |
|
do_ML2(id,text,host,trycount+1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|