Spaces:
Running
Running
import traceback | |
import gradio as gr | |
import requests | |
import time | |
import os | |
import re | |
from typing import Dict, Tuple, Optional, Union | |
# Read from environment variables with default values if not set | |
LETTERCAST_API_BASE = os.environ.get("LETTERCAST_API_BASE", "https://app.lettercast.ai/v1") | |
WAIT_TIME_S = 20 | |
DEFAULT_API_TOKEN = os.environ.get("LETTERCAST_DEFAULT_API_TOKEN") # Replace with your default token | |
def is_valid_url(url: str) -> bool: | |
url_pattern = re.compile( | |
r'^https?://' # http:// or https:// | |
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain... | |
r'localhost|' # localhost... | |
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip | |
r'(?::\d+)?' # optional port | |
r'(?:/?|[/?]\S+)$', re.IGNORECASE) | |
return url_pattern.match(url) is not None | |
def create_pod(url: Optional[str], file: Optional[str], | |
html_content: Optional[str], detail_level: str, create_video: bool, | |
progress: gr.Progress = gr.Progress()) -> Tuple[Dict, str, Optional[str], Optional[str]]: | |
# Validate that only one input method is provided | |
input_count = sum(bool(x) for x in (url, file, html_content)) | |
if input_count != 1: | |
return {"error": "Please provide exactly one of: URL, PDF file, or HTML content."}, "Error", None, None | |
if url and not is_valid_url(url): | |
return {"error": "Invalid URL format. Please enter a valid URL."}, "Error", None, None | |
token = DEFAULT_API_TOKEN | |
headers = {"Authorization": f"Bearer {token}"} | |
data = { | |
"detail_level": detail_level, | |
"create_video": create_video, | |
} | |
if url: | |
data["url"] = url | |
elif html_content: | |
data["html_content"] = html_content | |
endpoint_url = f"{LETTERCAST_API_BASE}/pods" | |
try: | |
if file is not None: | |
with open(file, "rb") as f: | |
files = {"file": (os.path.basename(file), f, "application/pdf")} | |
response = requests.post(endpoint_url, headers=headers, data=data, files=files) | |
else: | |
headers["Content-Type"] = "application/json" | |
response = requests.post(endpoint_url, headers=headers, json=data) | |
if response.status_code == 200: | |
resp_data = response.json() | |
pod_id = resp_data.get("pod_id") | |
if pod_id: | |
# Trigger the status check immediately if we have a valid pod_id | |
status, _, audio_file, video_file = check_pod_status(pod_id, progress) | |
return resp_data, status, audio_file, video_file | |
else: | |
return resp_data, "Pod created, but no pod_id received.", None, None | |
else: | |
error_message = f"Error {response.status_code}: {response.text}" | |
return {"error": error_message}, "Pod creation failed", None, None | |
except Exception as e: | |
error_message = f"Error creating pod: {str(e)}" | |
return {"error": error_message}, "Error", None, None | |
def check_pod_status(pod_id: str, | |
progress: gr.Progress = gr.Progress()) -> Tuple[str, Optional[str], Optional[str], Optional[str]]: | |
token = DEFAULT_API_TOKEN | |
headers = {"Authorization": f"Bearer {token}"} | |
max_retries = 30 # Max wait time of 10 minutes | |
for i in range(max_retries): | |
time.sleep(WAIT_TIME_S) | |
progress(i / max_retries, desc="Checking pod status...") | |
try: | |
response = requests.get(f"{LETTERCAST_API_BASE}/pods/{pod_id}", headers=headers) | |
if response.status_code == 200: | |
pod = response.json() | |
if pod['generation_status'] == 'done': | |
audio_file = download_media(pod_id, 'audio') | |
video_file = download_media(pod_id, 'video') | |
return "Pod is ready!", pod_id, audio_file, video_file | |
elif pod['generation_status'] == 'failed': | |
return "Pod creation failed.", None, None, None | |
else: | |
continue | |
except Exception as e: | |
return f"Error checking pod status: {str(e)}", None, None, None | |
return "Pod generation timed out.", None, None, None | |
def download_media(pod_id: str, media_type: str) -> Optional[str]: | |
token = DEFAULT_API_TOKEN | |
headers = {"Authorization": f"Bearer {token}"} | |
url = f"{LETTERCAST_API_BASE}/pods/{pod_id}/{media_type}" | |
response = requests.get(url, headers=headers) | |
if response.status_code == 200: | |
file_extension = 'mp3' if media_type == 'audio' else 'mp4' | |
file_path = f'pod_{media_type}_{pod_id}.{file_extension}' | |
with open(file_path, 'wb') as file: | |
file.write(response.content) | |
return file_path | |
else: | |
return None | |
with gr.Blocks() as demo: | |
gr.Markdown("## Try the [Lettercast.ai](https://lettercast.ai) API") | |
gr.Markdown(""" | |
Provide exactly one of the following: | |
- Enter a URL | |
- Upload a PDF file | |
- Enter HTML content | |
The API home is at our [Developer Hub](https://developer.lettercast.ai). | |
""") | |
with gr.Row(): | |
url = gr.Textbox(label="URL", placeholder="Enter a URL") | |
detail_level = gr.Dropdown( | |
choices=["summary_discussion", "summary_single_host"], | |
label="Pod Type", | |
value="summary_discussion" | |
) | |
with gr.Row(): | |
file = gr.File(label="Upload a PDF file", type="filepath") | |
create_video = gr.Checkbox(label="Create Video (PDF only)", value=False) | |
with gr.Row(): | |
html_content = gr.Textbox(label="HTML Content", placeholder="Enter HTML content", lines=5) | |
create_pod_btn = gr.Button("Create Pod") | |
output = gr.JSON(label="Response") | |
status_output = gr.Textbox(label="Status") | |
audio_player = gr.Audio(label="Generated Audio", visible=False, autoplay=True) | |
video_player = gr.Video(label="Generated Video", visible=False) | |
def update_ui(result, status, audio_file, video_file): | |
audio_visible = audio_file is not None | |
video_visible = video_file is not None | |
return ( | |
result, | |
status, | |
audio_file, | |
gr.update(visible=audio_visible), | |
video_file, | |
gr.update(visible=video_visible) | |
) | |
create_pod_btn.click( | |
fn=lambda *args: update_ui(*create_pod(*args)), | |
inputs=[url, file, html_content, detail_level, create_video], | |
outputs=[output, status_output, audio_player, audio_player, video_player, video_player], | |
show_progress="full", | |
show_api=False | |
) | |
demo.launch() |