from fastapi import FastAPI, File, UploadFile, Request, Response
from fastapi.responses import HTMLResponse, RedirectResponse
import requests
from urllib.parse import urljoin
import time
from typing import Dict
app = FastAPI()
HTML_CONTENT = """
File Upload
Upload File
"""
@app.get("/", response_class=HTMLResponse)
async def index():
return HTML_CONTENT
@app.post("/upload")
async def handle_upload(file: UploadFile = File(...)):
if not file.filename:
return {"error": "No file selected."}, 400
cookies = await get_cookies()
if 'csrftoken' not in cookies or 'sessionid' not in cookies:
return {"error": "Failed to obtain necessary cookies"}, 500
upload_result = await initiate_upload(cookies, file.filename, file.content_type)
if not upload_result or 'upload_url' not in upload_result:
return {"error": "Failed to initiate upload"}, 500
file_content = await file.read()
upload_success = await retry_upload(upload_result['upload_url'], file_content, file.content_type)
if not upload_success:
return {"error": "File upload failed after multiple attempts"}, 500
original_url = upload_result['serving_url']
mirrored_url = f"/rbxg/{original_url.split('/pbxt/')[1]}"
return RedirectResponse(url=mirrored_url, status_code=302)
@app.get("/rbxg/{path:path}")
async def handle_video_stream(path: str, request: Request):
original_url = f'https://replicate.delivery/pbxt/{path}'
range_header = request.headers.get('Range')
headers = {'Range': range_header} if range_header else {}
response = requests.get(original_url, headers=headers, stream=True)
def generate():
for chunk in response.iter_content(chunk_size=8192):
yield chunk
headers = dict(response.headers)
headers['Access-Control-Allow-Origin'] = '*'
if response.status_code == 206:
headers['Content-Range'] = response.headers.get('Content-Range')
return Response(content=generate(), status_code=response.status_code, headers=headers)
async def get_cookies() -> Dict[str, str]:
try:
response = requests.get('https://replicate.com/levelsio/neon-tokyo', headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36'
})
return dict(response.cookies)
except Exception as e:
print(f'Error fetching the page: {e}')
return {}
async def initiate_upload(cookies: Dict[str, str], filename: str, content_type: str) -> Dict:
url = f'https://replicate.com/api/upload/{filename}?content_type={content_type}'
try:
response = requests.post(url, cookies=cookies, headers={
'X-CSRFToken': cookies.get('csrftoken'),
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
'Referer': 'https://replicate.com/levelsio/neon-tokyo',
'Origin': 'https://replicate.com',
'Accept': '*/*',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'identity',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'Sec-GPC': '1',
'Priority': 'u=1, i'
})
print(f'Initiate upload response status: {response.status_code}')
print(f'Initiate upload response headers: {response.headers}')
print(f'Response body: {response.text}')
return response.json()
except Exception as e:
print(f'Error initiating upload: {e}')
raise
async def upload_file(upload_url: str, file_content: bytes, content_type: str) -> bool:
try:
response = requests.put(upload_url, data=file_content, headers={'Content-Type': content_type})
print(f'File upload response status: {response.status_code}')
print(f'File upload response headers: {response.headers}')
return response.status_code == 200
except Exception as e:
print(f'Error uploading file: {e}')
return False
async def retry_upload(upload_url: str, file_content: bytes, content_type: str, max_retries: int = 5, delay: int = 1) -> bool:
for attempt in range(1, max_retries + 1):
print(f'Upload attempt {attempt} of {max_retries}')
success = await upload_file(upload_url, file_content, content_type)
if success:
print('Upload successful')
return True
if attempt < max_retries:
print(f'Upload failed, retrying in {delay}s...')
time.sleep(delay)
delay *= 2 # Exponential backoff
print('Upload failed after all retry attempts')
return False