fileuploader / app.py
coollsd's picture
Create app.py
6d7bfa0 verified
raw
history blame
5.15 kB
from fastapi import FastAPI, File, UploadFile, Request, Response
from fastapi.responses import HTMLResponse, RedirectResponse
import requests
from urllib.parse import urljoin
import time
from typing import Dict
app = FastAPI()
HTML_CONTENT = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>File Upload</title>
</head>
<body>
<h1>Upload File</h1>
<form action="/upload" method="post" enctype="multipart/form-data">
<input type="file" name="file" accept="*/*" required>
<button type="submit">Upload</button>
</form>
</body>
</html>
"""
@app.get("/", response_class=HTMLResponse)
async def index():
return HTML_CONTENT
@app.post("/upload")
async def handle_upload(file: UploadFile = File(...)):
if not file.filename:
return {"error": "No file selected."}, 400
cookies = await get_cookies()
if 'csrftoken' not in cookies or 'sessionid' not in cookies:
return {"error": "Failed to obtain necessary cookies"}, 500
upload_result = await initiate_upload(cookies, file.filename, file.content_type)
if not upload_result or 'upload_url' not in upload_result:
return {"error": "Failed to initiate upload"}, 500
file_content = await file.read()
upload_success = await retry_upload(upload_result['upload_url'], file_content, file.content_type)
if not upload_success:
return {"error": "File upload failed after multiple attempts"}, 500
original_url = upload_result['serving_url']
mirrored_url = f"/rbxg/{original_url.split('/pbxt/')[1]}"
return RedirectResponse(url=mirrored_url, status_code=302)
@app.get("/rbxg/{path:path}")
async def handle_video_stream(path: str, request: Request):
original_url = f'https://replicate.delivery/pbxt/{path}'
range_header = request.headers.get('Range')
headers = {'Range': range_header} if range_header else {}
response = requests.get(original_url, headers=headers, stream=True)
def generate():
for chunk in response.iter_content(chunk_size=8192):
yield chunk
headers = dict(response.headers)
headers['Access-Control-Allow-Origin'] = '*'
if response.status_code == 206:
headers['Content-Range'] = response.headers.get('Content-Range')
return Response(content=generate(), status_code=response.status_code, headers=headers)
async def get_cookies() -> Dict[str, str]:
try:
response = requests.get('https://replicate.com/levelsio/neon-tokyo', headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36'
})
return dict(response.cookies)
except Exception as e:
print(f'Error fetching the page: {e}')
return {}
async def initiate_upload(cookies: Dict[str, str], filename: str, content_type: str) -> Dict:
url = f'https://replicate.com/api/upload/{filename}?content_type={content_type}'
try:
response = requests.post(url, cookies=cookies, headers={
'X-CSRFToken': cookies.get('csrftoken'),
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
'Referer': 'https://replicate.com/levelsio/neon-tokyo',
'Origin': 'https://replicate.com',
'Accept': '*/*',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'identity',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'Sec-GPC': '1',
'Priority': 'u=1, i'
})
print(f'Initiate upload response status: {response.status_code}')
print(f'Initiate upload response headers: {response.headers}')
print(f'Response body: {response.text}')
return response.json()
except Exception as e:
print(f'Error initiating upload: {e}')
raise
async def upload_file(upload_url: str, file_content: bytes, content_type: str) -> bool:
try:
response = requests.put(upload_url, data=file_content, headers={'Content-Type': content_type})
print(f'File upload response status: {response.status_code}')
print(f'File upload response headers: {response.headers}')
return response.status_code == 200
except Exception as e:
print(f'Error uploading file: {e}')
return False
async def retry_upload(upload_url: str, file_content: bytes, content_type: str, max_retries: int = 5, delay: int = 1) -> bool:
for attempt in range(1, max_retries + 1):
print(f'Upload attempt {attempt} of {max_retries}')
success = await upload_file(upload_url, file_content, content_type)
if success:
print('Upload successful')
return True
if attempt < max_retries:
print(f'Upload failed, retrying in {delay}s...')
time.sleep(delay)
delay *= 2 # Exponential backoff
print('Upload failed after all retry attempts')
return False