"""HTTP routes for creating, listing, deleting and polling transcriptions."""

import os
import re
import tempfile
import uuid
from urllib.parse import urlparse, parse_qs

import aiofiles
import yt_dlp
from celery.result import AsyncResult
from fastapi import APIRouter, status, Depends, UploadFile, File, Query, BackgroundTasks
from fastapi_jwt_auth import AuthJWT
from typing_extensions import Annotated

from .Schemas import (
    UserDetails,
    TranscriptionMetadata,
    TranscriptionResult,
    BaseTranscription,
)
from App import bot
from App.Worker import transcription_task, downloadfile, downloadUrl
from App.Users.Model import User
from App.Users.UserRoutes import get_token_owner
from App.Users.Schemas import UserSchema
from .Model import Transcriptions
from .Utils.fastapi_tasks import perform_background_task
from App.Embedding.utils.Initialize import delete_documents

transcription_router = APIRouter(tags=["Transcription"])

# Whisper model sizes offered by every endpoint that starts a transcription.
_WHISPER_MODEL_SIZES = ("tiny", "small", "medium", "base", "large-v2")

# Bytes pulled from an upload per read; 1 MiB keeps memory bounded without
# paying for an await round-trip on every kilobyte.
_UPLOAD_CHUNK_SIZE = 1024 * 1024


def _model_query():
    """Build the shared ``model`` query parameter (defaults to ``"tiny"``)."""
    return Query(
        "tiny",
        enum=list(_WHISPER_MODEL_SIZES),
        description="Whisper model Sizes",
    )


def _extract_video_id(url):
    """Return the first ``v`` query-parameter value of *url*, or ``None``."""
    values = parse_qs(urlparse(url).query).get("v")
    return values[0] if values else None


def _safe_extension(filename):
    """Return the text after the last dot of *filename*, letters/digits only.

    The upload filename is chosen by the client, so anything that could act
    as a path component (separators, dots) is stripped before the extension
    is used to build a path on disk. A missing filename yields ``""``.
    """
    base_name = os.path.basename(filename or "")
    raw_extension = base_name.rsplit(".", 1)[-1]
    return re.sub(r"[^A-Za-z0-9]", "", raw_extension)


def genUUID():
    """Return the first six characters of a random UUID4 string."""
    return str(uuid.uuid4())[:6]


@transcription_router.get("/download-audio")
async def download_audio(
    url: str,
    model: str = _model_query(),
    user: UserSchema = Depends(get_token_owner),
):
    """Queue a download + transcription job for a YouTube watch URL.

    The incoming URL is reduced to ``https://www.youtube.com/watch?v=<id>``
    using its ``v`` query parameter, the video title is looked up with
    yt_dlp, a ``downloadfile`` task is queued, and a ``Transcriptions`` row
    is created for the caller.

    Returns a ``BaseTranscription`` built from the new row, or a
    ``{"code": 400, ...}`` payload when the user is missing or the URL has
    no ``v`` parameter.
    """
    if user is None:
        return {"code": 400, "message": "doesn't exist", "payload": None}

    video_id = _extract_video_id(url)
    if video_id is None:
        # Without this guard the job would be queued for ".../watch?v=None".
        return {
            "code": 400,
            "message": "url is missing the 'v' video id parameter",
            "payload": None,
        }
    url = f"https://www.youtube.com/watch?v={video_id}"

    # NOTE(review): extract_info is a synchronous call inside an async
    # handler; consider moving it to a worker thread if it proves slow.
    with yt_dlp.YoutubeDL({"quiet": True}) as ydl:
        info_dict = ydl.extract_info(url, download=False)
        video_title = info_dict.get("title")

    # The file on disk is named after a short random id, not the video
    # title; the substitution keeps only characters safe in a file name.
    sanitized_name = re.sub(r"(?u)[^-\w.]", "", genUUID())
    file_path = os.path.join("./", "Downloads", f"{sanitized_name}.mp3")
    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": file_path,
    }

    task = downloadfile.delay(url=url, ydl_opts=ydl_opts, model_size=model)
    transcription_entry = await Transcriptions.objects.create(
        user=user,
        youtubeLink=url,
        task_id=task.id,
        file_name=video_title,
    )
    return BaseTranscription(**transcription_entry.__dict__)


@transcription_router.get("/transcriptions")
async def get_user_transcriptions(
    user: UserSchema = Depends(get_token_owner),
):
    """Return every transcription row of the caller as ``BaseTranscription``."""
    transcriptions = await Transcriptions.objects.filter(user=user.id).all()
    return [
        BaseTranscription(**row.__dict__)
        for row in transcriptions
        if row is not None
    ]


@transcription_router.post("/delete/{task_id}")
async def delete_transcription(
    task_id: str,
    user: UserSchema = Depends(get_token_owner),
):
    """Delete one of the caller's transcriptions and stop its task.

    Removes the row, revokes the Celery task (``terminate=True``) and calls
    ``delete_documents`` for the task id. Returns a 404-style payload when
    the caller has no row with that ``task_id``.
    """
    transcript = (
        await Transcriptions.objects.filter(user=user.id)
        .filter(task_id=task_id)
        .first()
    )
    if not transcript:
        return {
            "code": 404,
            "message": f"task {task_id} does not exist",
            "payload": None,
        }

    await transcript.delete()
    AsyncResult(task_id).revoke(terminate=True)
    await delete_documents(task_id=task_id)
    return {"code": 200, "message": f"deleted {task_id}", "payload": None}


@transcription_router.post("/url/")
async def url_task(
    url: str,
    model: str = _model_query(),
    user: UserSchema = Depends(get_token_owner),
):
    """Queue a ``downloadUrl`` task for *url* and record it for the caller.

    Returns the generated file name and the Celery task id.
    """
    file_name = f"{genUUID()}.wav"
    file_dir = os.path.join("/tmp/", "Downloads")

    # NOTE(review): url is forwarded to the worker as received; confirm the
    # worker restricts which schemes/hosts it is willing to fetch.
    task = downloadUrl.delay(
        link=url, download_dir=file_dir, filename=file_name, model_size=model
    )

    await Transcriptions.objects.create(
        task_id=task.id, user=user, file_name=file_name
    )
    print(task.id)
    return {
        "file_name": file_name,
        "task_id": task.id,
    }


@transcription_router.post("/uploadfile/")
async def create_file(
    background_tasks: BackgroundTasks,
    file: UploadFile,
    model: str = _model_query(),
    user: UserSchema = Depends(get_token_owner),
):
    """Store an uploaded file and queue it for transcription.

    The upload is streamed to ``<short id>.<extension>`` in the working
    directory, a ``transcription_task`` is queued for it, a
    ``Transcriptions`` row is created, and ``perform_background_task`` is
    scheduled to run after the response.

    Returns the upload's size, its original name and the task id, or a
    ``{"message": ...}`` payload if writing the file failed.
    """
    file_name = f"{genUUID()}.{_safe_extension(file.filename)}"

    try:
        async with aiofiles.open(file_name, "wb") as destination:
            while contents := await file.read(_UPLOAD_CHUNK_SIZE):
                await destination.write(contents)
    except Exception as e:
        # Best effort: do not leave a half-written upload behind.
        try:
            os.remove(file_name)
        except OSError:
            pass
        return {
            "message": f"There was an error uploading the file, error message {str(e)} "
        }
    finally:
        await file.close()

    task = transcription_task.delay(file_name, model)

    await Transcriptions.objects.create(
        task_id=task.id, user=user, file_name=file_name
    )

    background_tasks.add_task(
        perform_background_task, file_name, file=file, task_id=task.id
    )
    return {
        "file_size": file.size,
        "file_name": file.filename,
        "task_id": task.id,
    }


@transcription_router.get("/tasks/{task_id}")
async def get_status(task_id):
    """Report the progress of a transcription task.

    Rows already stored with status ``"SUCCESS"`` are returned directly with
    percentage ``"100"``. Otherwise the Celery result is consulted: with no
    result yet the raw task status is returned; with a result, the row is
    updated from it and returned together with the current percentage.
    A result that cannot be parsed as ``TranscriptionMetadata`` while the
    task is not successful yields the "Nothing found" payload.
    """
    entry: Transcriptions = await Transcriptions.objects.filter(task_id=task_id).first()
    if entry is None:
        return {"payload": None, "message": "Nothing found", "code": 200}

    result = BaseTranscription(**entry.__dict__)
    if result.status == "SUCCESS":
        result.percentage = "100"
        return result

    # Read state and payload once each so every branch below acts on the
    # same values instead of going back to the result backend repeatedly.
    task_result = AsyncResult(task_id)
    task_state = task_result.status
    task_payload = task_result.result

    if task_payload is None:
        return {
            "task_id": task_id,
            "task_status": task_state,
            "task_result": task_payload,
        }

    if task_state == "SUCCESS":
        metadata = TranscriptionMetadata(**task_payload)
        percentage = "100"
    else:
        # While the task is not successful the payload may be progress
        # metadata or something else entirely (e.g. an error), so parsing
        # is allowed to fail.
        try:
            metadata = TranscriptionMetadata(**task_payload)
            percentage = metadata.percentage
        except Exception as e:
            print(e)
            return {"payload": None, "message": "Nothing found", "code": 200}

    await entry.update(**metadata.dict())

    result = BaseTranscription(**entry.__dict__)
    result.percentage = percentage
    return result