|
from fastapi import APIRouter, status, Form, UploadFile, File, Query, BackgroundTasks |
|
from typing_extensions import Annotated |
|
from .Schemas import UserDetails |
|
from App import bot |
|
import aiofiles, os, re |
|
import tempfile |
|
from celery.result import AsyncResult |
|
from App.Worker import transcription_task, downloadfile |
|
from App.Users.Model import User |
|
from .Model import Transcriptions |
|
from .Utils.fastapi_tasks import perform_background_task |
|
import yt_dlp |
|
|
|
|
|
|
|
|
|
|
|
# Router collecting every endpoint of this module under the "Transcription" tag.
transcription_router = APIRouter(
    tags=["Transcription"],
)
|
|
|
|
|
def _fetch_video_title(url):
    """Return the title yt-dlp reports for *url*, without downloading media.

    Falls back to the reported ``id`` when the metadata carries no title and
    returns ``None`` when neither field is present.
    """
    with yt_dlp.YoutubeDL({"quiet": True}) as ydl:
        info_dict = ydl.extract_info(url, download=False)
    # Defensive: treat a missing info dict like one without a title.
    info_dict = info_dict or {}
    return info_dict.get("title") or info_dict.get("id")


@transcription_router.get("/download-audio")
async def download_audio(
    url: str,
    userId: int = 1,
    model: str = Query(
        "tiny",
        enum=["tiny", "small", "medium", "base", "large-v2"],
        description="Whisper model Sizes",
    ),
):
    """Queue a download of the audio behind *url* and record the task.

    Args:
        url: Address of the media to fetch with yt-dlp.
        userId: Primary key of the ``User`` the transcription is recorded for.
        model: Whisper model size forwarded to the ``downloadfile`` task.

    Returns:
        ``{"task_id": ...}`` for the queued Celery task, or the
        ``{"code": 400, ...}`` payload when no user matches ``userId``.
    """
    # Function-scope import keeps this block self-contained.
    import asyncio

    user = await User.objects.filter(id=userId).first()
    if user is None:
        return {"code": 400, "message": "doesn't exist", "payload": None}

    # The metadata lookup is synchronous; run it in the default executor so
    # the event loop keeps serving other requests in the meantime.
    loop = asyncio.get_running_loop()
    video_title = await loop.run_in_executor(None, _fetch_video_title, url)

    # Keep only word characters, dashes and dots. A missing title, or one with
    # no permitted characters at all, falls back to a fixed stem so the file
    # name is never just ".mp3".
    sanitized_title = re.sub(r"(?u)[^-\w.]", "", video_title or "")
    filename = f"{sanitized_title or 'audio'}.mp3"
    file_path = os.path.join("./", "Downloads", filename)

    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": file_path,
    }

    task = downloadfile.delay(url, ydl_opts, model)
    await Transcriptions.objects.create(task_id=task.id, user=user)
    return {
        "task_id": task.id,
    }
|
|
|
|
|
@transcription_router.post("/uploadfile/")
async def create_file(
    background_tasks: BackgroundTasks,
    file: UploadFile,
    userId: int = 1,
    model: str = Query(
        "tiny",
        enum=["tiny", "small", "medium", "base", "large-v2"],
        description="Whisper model Sizes",
    ),
):
    """Store an uploaded file locally and queue it for transcription.

    Args:
        background_tasks: FastAPI hook used to schedule
            ``perform_background_task`` after the response is sent.
        file: The uploaded media file.
        userId: Primary key of the ``User`` the transcription is recorded for.
        model: Whisper model size forwarded to ``transcription_task``.

    Returns:
        A dict with ``file_size``, ``file_name`` and ``task_id`` on success;
        ``{"code": 400, ...}`` when no user matches ``userId``; or
        ``{"message": "There was an error uploading the file"}`` when the
        upload cannot be written to disk.
    """
    user = await User.objects.filter(id=userId).first()
    if user is None:
        return {"code": 400, "message": "doesn't exist", "payload": None}

    # The client controls ``file.filename``. Strip every directory component
    # (both separator styles) so the upload cannot be written outside the
    # working directory, and reject names that leave nothing usable.
    safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        await file.close()
        return {"message": "There was an error uploading the file"}

    try:
        async with aiofiles.open(safe_name, "wb") as f:
            # Copy in 1 MiB chunks instead of 1 KiB to cut per-chunk overhead.
            while contents := await file.read(1024 * 1024):
                await f.write(contents)
    except Exception:
        # Best effort: do not leave a truncated upload behind.
        try:
            os.remove(safe_name)
        except OSError:
            pass
        return {"message": "There was an error uploading the file"}
    finally:
        await file.close()

    task = transcription_task.delay(safe_name, model)
    await Transcriptions.objects.create(task_id=task.id, user=user)

    # NOTE(review): ``file`` has already been closed at this point; confirm
    # that perform_background_task does not need to read from it.
    background_tasks.add_task(perform_background_task, file=file, task_id=task.id)
    return {
        "file_size": file.size,
        "file_name": file.filename,
        "task_id": task.id,
    }
|
|
|
|
|
@transcription_router.get("/tasks/{task_id}")
async def get_status(task_id: str):
    """Report the state of a Celery task and sync it to its DB entry.

    Args:
        task_id: Identifier of the Celery task, as returned when it was queued.

    Returns:
        A dict with ``task_id``, ``task_status``, ``task_result`` and the
        entry's ``tl_file_id`` under ``id``; or a ``{"code": 404, ...}``
        payload when no ``Transcriptions`` row references ``task_id``.
    """
    task_result = AsyncResult(task_id)
    entry: Transcriptions = await Transcriptions.objects.filter(task_id=task_id).first()
    if entry is None:
        # Unknown task id: answer with the module's error payload shape
        # instead of failing on attribute access below.
        return {"code": 404, "message": "task doesn't exist", "payload": None}

    # Read the status once so the stored state and the reported state cannot
    # diverge if the task finishes while this request is being handled.
    task_status = task_result.status
    task_output = task_result.result

    if task_status == "SUCCESS":
        await entry.update(content=task_output, transcription_state=task_status)
    else:
        await entry.update(transcription_state=task_status)

    # A failed task exposes the raised exception instance as its result;
    # convert it to text so the response stays JSON-serialisable.
    if isinstance(task_output, BaseException):
        task_output = str(task_output)

    return {
        "task_id": task_id,
        "task_status": task_status,
        "task_result": task_output,
        "id": entry.tl_file_id,
    }
|
|