"""HTTP routes for creating, listing, deleting and polling transcriptions."""

import os
import re
import tempfile

import aiofiles
import yt_dlp
from celery.result import AsyncResult
from fastapi import APIRouter, status, Depends, UploadFile, File, Query, BackgroundTasks
from fastapi.concurrency import run_in_threadpool
from fastapi_jwt_auth import AuthJWT
from typing_extensions import Annotated

from .Schemas import (
    UserDetails,
    TranscriptionMetadata,
    TranscriptionResult,
    BaseTranscription,
)
from App import bot
from App.Worker import transcription_task, downloadfile
from App.Users.Model import User
from App.Users.UserRoutes import get_token_owner
from App.Users.Schemas import UserSchema
from .Model import Transcriptions
from .Utils.fastapi_tasks import perform_background_task

# from .Model import User
# from sqlalchemy import and_

transcription_router = APIRouter(tags=["Transcription"])

# Whisper model sizes offered by every endpoint that starts a transcription.
_WHISPER_MODELS = ["tiny", "small", "medium", "base", "large-v2"]

# Directory handed to the download task as the yt-dlp output location.
_DOWNLOAD_DIR = os.path.join("/srv/App/", "Downloads")

# Everything except word characters, "-" and "." is stripped from titles.
_UNSAFE_FILENAME_CHARS = re.compile(r"(?u)[^-\w.]")

# Uploads are copied to disk in chunks of this many bytes.
_UPLOAD_CHUNK_SIZE = 1024 * 1024


def _unknown_user_response() -> dict:
    """Return the payload sent when the token does not resolve to a user."""
    return {"code": 400, "message": "doesn't exist", "payload": None}


def _fetch_video_info(url: str) -> dict:
    """Return yt-dlp's metadata for *url* without downloading the media."""
    with yt_dlp.YoutubeDL({"quiet": True}) as ydl:
        return ydl.extract_info(url, download=False) or {}


async def _find_transcription(task_id: str, user_id=None):
    """Return the transcription row for *task_id*, or ``None`` if absent.

    When *user_id* is given the lookup is restricted to rows of that user.
    ``.all()`` is used rather than ``.first()`` so that a missing row shows
    up as an empty list whether the ORM's ``first()`` returns ``None`` or
    raises on no match.
    """
    filters = {"task_id": task_id}
    if user_id is not None:
        filters["user"] = user_id
    matches = await Transcriptions.objects.filter(**filters).all()
    return matches[0] if matches else None


@transcription_router.get("/download-audio")
async def download_audio(
    url: str,
    model: str = Query(
        "tiny",
        enum=_WHISPER_MODELS,
        description="Whisper model Sizes",
    ),
    user: UserSchema = Depends(get_token_owner),
):
    """Queue a download task for the audio at *url* and record it.

    Returns a dict with the queued ``task_id`` and a ``file_name`` built
    from the video title, or a ``code: 400`` payload when no user is
    resolved from the token.
    """
    if user is None:
        return _unknown_user_response()

    # extract_info performs blocking network I/O; run it off the event loop.
    info_dict = await run_in_threadpool(_fetch_video_info, url)

    # Fall back to the video id (or a fixed name) when no title is reported,
    # so the sanitising step below never receives None.
    video_title = info_dict.get("title") or info_dict.get("id") or "audio"

    # Ensure the title is file-friendly; a title made only of stripped
    # characters would otherwise produce the file name ".mp3".
    sanitized_title = _UNSAFE_FILENAME_CHARS.sub("", video_title) or "audio"
    file_path = os.path.join(_DOWNLOAD_DIR, f"{sanitized_title}.mp3")

    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": file_path,
    }
    task = downloadfile.delay(url, ydl_opts, model)

    response = {"task_id": task.id, "file_name": f"{video_title}.mp3"}
    await Transcriptions.objects.create(user=user, youtubeLink=url, **response)
    return response


@transcription_router.get("/transcriptions")
async def get_user_transcriptions(
    user: UserSchema = Depends(get_token_owner),
):
    """List the transcriptions stored for the authenticated user."""
    if user is None:
        return _unknown_user_response()

    transcriptions = await Transcriptions.objects.filter(user=user.id).all()
    return [BaseTranscription(**obj.__dict__) for obj in transcriptions]


@transcription_router.post("/delete/{task_id}")
async def delete_transcription(
    task_id: str,
    user: UserSchema = Depends(get_token_owner),
):
    """Delete the caller's transcription identified by *task_id*.

    Returns a ``code: 200`` payload on success and a ``code: 404`` payload
    when the caller owns no transcription with that task id.
    """
    if user is None:
        return _unknown_user_response()

    # Restrict the lookup to the caller's rows so one user cannot delete
    # another user's transcription by guessing its task id.
    transcript = await _find_transcription(task_id, user_id=user.id)
    if transcript is None:
        return {
            "code": 404,
            "message": f"transcription {task_id} not found",
            "payload": None,
        }

    await transcript.delete()
    return {"code": 200, "message": f"deleted {task_id}", "payload": None}


@transcription_router.post("/uploadfile/")
async def create_file(
    background_tasks: BackgroundTasks,
    file: UploadFile,
    model: str = Query(
        "tiny",
        enum=_WHISPER_MODELS,
        description="Whisper model Sizes",
    ),
    user: UserSchema = Depends(get_token_owner),
):
    """Save an uploaded file, queue its transcription and record the task.

    Returns a dict with ``file_size``, ``file_name`` and ``task_id``; on an
    upload problem returns a dict holding only an error ``message``.
    """
    if user is None:
        return _unknown_user_response()

    # The file name is client-controlled: keep only its final component so
    # it cannot point outside the working directory (e.g. "../../x").
    safe_name = os.path.basename(file.filename or "")
    if not safe_name:
        await file.close()
        return {
            "message": "There was an error uploading the file, "
            "error message missing or invalid file name "
        }

    # Write the file to disk asynchronously
    try:
        async with aiofiles.open(safe_name, "wb") as f:
            while contents := await file.read(_UPLOAD_CHUNK_SIZE):
                await f.write(contents)
    except Exception as e:
        return {
            "message": f"There was an error uploading the file, error message {str(e)} "
        }
    finally:
        await file.close()

    # celery task
    task = transcription_task.delay(safe_name, model)

    # create a transcription entry
    await Transcriptions.objects.create(
        task_id=task.id, user=user, file_name=safe_name
    )

    # NOTE(review): `file` has already been closed here; confirm that
    # perform_background_task does not need to read from it.
    background_tasks.add_task(perform_background_task, file=file, task_id=task.id)

    return {
        "file_size": file.size,
        "file_name": safe_name,
        "task_id": task.id,
        # "message_id": data.id,
    }


@transcription_router.get("/tasks/{task_id}")
async def get_status(task_id: str):
    """Report the Celery state of *task_id* and persist its latest payload.

    While the task has no result yet, only the id, status and (empty)
    result are returned. Otherwise a mapping result is validated and
    written to the matching transcription row, and the row's
    ``tl_file_id`` is returned as ``id`` (``None`` when no row exists).
    """
    task_result = AsyncResult(task_id)
    task_status = task_result.status
    payload = task_result.result

    if payload is None:
        return {
            "task_id": task_id,
            "task_status": task_status,
            "task_result": payload,
        }

    entry = await _find_transcription(task_id)

    if not isinstance(payload, dict):
        # A failed task stores its exception as the result; it can neither
        # be unpacked into a schema nor serialised, so report it as text.
        payload = str(payload)
    elif entry is not None:
        if task_status == "SUCCESS":
            trans = TranscriptionResult(**payload)
            await entry.update(
                content=trans.content,
                **trans.dict(
                    exclude={"transcript"},
                ),
            )
        else:
            _trans = TranscriptionMetadata(**payload)
            await entry.update(**_trans.dict(exclude={"transcription"}))

    return {
        "task_id": task_id,
        "task_status": task_status,
        "task_result": payload,
        "id": entry.tl_file_id if entry is not None else None,
    }