from fastapi import APIRouter, status, Form, UploadFile, File, Query, BackgroundTasks from typing_extensions import Annotated from .Schemas import UserDetails from App import bot import aiofiles, os, re import tempfile from celery.result import AsyncResult from App.Worker import transcription_task, downloadfile from App.Users.Model import User from .Model import Transcriptions from .Utils.fastapi_tasks import perform_background_task import yt_dlp # from .Model import User # from sqlalchemy import and_ transcription_router = APIRouter(tags=["Transcription"]) @transcription_router.get("/download-audio") async def download_audio( url: str, userId: int = 1, model: str = Query( "tiny", enum=["tiny", "small", "medium", "base", "large-v2"], description="Whisper model Sizes", ), ): user = await User.objects.filter(id=userId).first() if user == None: return {"code": 400, "message": "doesn't exist", "payload": None} ydl_opts_info = { "quiet": True, } with yt_dlp.YoutubeDL(ydl_opts_info) as ydl: info_dict = ydl.extract_info(url, download=False) video_title = info_dict.get("title", None) sanitized_title = re.sub( r"(?u)[^-\w.]", "", video_title ) # Ensure the title is file-friendly filename = f"{sanitized_title}.mp3" file_path = os.path.join("./", "Downloads", filename) ydl_opts = { "format": "bestaudio/best", "outtmpl": file_path, } task = downloadfile.delay(url, ydl_opts, model) transcription_enrty = await Transcriptions.objects.create( task_id=task.id, user=user ) return { "task_id": task.id, } @transcription_router.post("/uploadfile/") async def create_file( background_tasks: BackgroundTasks, file: UploadFile, userId: int = 1, model: str = Query( "tiny", enum=["tiny", "small", "medium", "base", "large-v2"], description="Whisper model Sizes", ), ): user = await User.objects.filter(id=userId).first() if user == None: return {"code": 400, "message": "doesn't exist", "payload": None} # Write the file to disk asynchronously try: async with aiofiles.open(file.filename, "wb") as f: while contents := await file.read(1024 * 1): await f.write(contents) except Exception: return {"message": "There was an error uploading the file"} finally: await file.close() # celery task task = transcription_task.delay(file.filename, model) # create a transcription entry transcription_enrty = await Transcriptions.objects.create( task_id=task.id, user=user ) background_tasks.add_task(perform_background_task, file=file, task_id=task.id) return { "file_size": file.size, "file_name": file.filename, "task_id": task.id, # "message_id": data.id, } @transcription_router.get("/tasks/{task_id}") async def get_status(task_id): task_result = AsyncResult(task_id) entry: Transcriptions = await Transcriptions.objects.filter(task_id=task_id).first() if task_result.status == "SUCCESS": await entry.update( content=task_result.result, transcription_state=task_result.status ) else: await entry.update(transcription_state=task_result.status) result = { "task_id": task_id, "task_status": task_result.status, "task_result": task_result.result, "id": entry.tl_file_id, } return result