tranny / App /Transcription /TranscriptionRoutes.py
Mbonea's picture
changed permissions
5600a35
raw
history blame
4.8 kB
from fastapi import APIRouter, status, Depends, UploadFile, File, Query, BackgroundTasks
from typing_extensions import Annotated
from .Schemas import (
UserDetails,
TranscriptionMetadata,
TranscriptionResult,
BaseTranscription,
)
from App import bot
import aiofiles, os, re
import tempfile
from celery.result import AsyncResult
from App.Worker import transcription_task, downloadfile
from App.Users.Model import User
from App.Users.UserRoutes import get_token_owner
from App.Users.Schemas import UserSchema
from .Model import Transcriptions
from .Utils.fastapi_tasks import perform_background_task
import yt_dlp
from fastapi_jwt_auth import AuthJWT
# from .Model import User
# from sqlalchemy import and_
transcription_router = APIRouter(tags=["Transcription"])
@transcription_router.get("/download-audio")
async def download_audio(
url: str,
model: str = Query(
"tiny",
enum=["tiny", "small", "medium", "base", "large-v2"],
description="Whisper model Sizes",
),
user: UserSchema = Depends(get_token_owner),
):
if user == None:
return {"code": 400, "message": "doesn't exist", "payload": None}
ydl_opts_info = {
"quiet": True,
}
with yt_dlp.YoutubeDL(ydl_opts_info) as ydl:
info_dict = ydl.extract_info(url, download=False)
video_title = info_dict.get("title", None)
sanitized_title = re.sub(
r"(?u)[^-\w.]", "", video_title
) # Ensure the title is file-friendly
filename = f"{sanitized_title}.mp3"
file_path = os.path.join("/srv/App/", "Downloads", filename)
ydl_opts = {
"format": "bestaudio/best",
"outtmpl": file_path,
}
task = downloadfile.delay(url, ydl_opts, model)
response = {"task_id": task.id, "file_name": f"{video_title}.mp3"}
transcription_enrty = await Transcriptions.objects.create(
user=user, youtubeLink=url, **response
)
return response
@transcription_router.get("/transcriptions")
async def get_user_transcriptions(
user: UserSchema = Depends(get_token_owner),
):
transcriptions = await Transcriptions.objects.filter(user=user.id).all()
objects = [BaseTranscription(**obj.__dict__) for obj in transcriptions]
return objects
@transcription_router.post("/delete/{task_id}")
async def delete_transcription(
task_id: str,
user: UserSchema = Depends(get_token_owner),
):
transcript = await Transcriptions.objects.filter(task_id=task_id).first()
await transcript.delete()
return {"code": 200, "message": f"deleted {task_id}", "payload": None}
@transcription_router.post("/uploadfile/")
async def create_file(
background_tasks: BackgroundTasks,
file: UploadFile,
model: str = Query(
"tiny",
enum=["tiny", "small", "medium", "base", "large-v2"],
description="Whisper model Sizes",
),
user: UserSchema = Depends(get_token_owner),
):
# Write the file to disk asynchronously
Upload_dir = ""
try:
async with aiofiles.open(file.filename, "wb") as f:
while contents := await file.read(1024 * 1):
await f.write(contents)
except Exception as e:
return {
"message": f"There was an error uploading the file, error message {str(e)} "
}
finally:
await file.close()
# celery task
task = transcription_task.delay(file.filename, model)
# create a transcription entry
transcription_enrty = await Transcriptions.objects.create(
task_id=task.id, user=user, file_name=file.filename
)
background_tasks.add_task(perform_background_task, file=file, task_id=task.id)
return {
"file_size": file.size,
"file_name": file.filename,
"task_id": task.id,
# "message_id": data.id,
}
@transcription_router.get("/tasks/{task_id}")
async def get_status(task_id):
task_result = AsyncResult(task_id)
if task_result.result == None:
return {
"task_id": task_id,
"task_status": task_result.status,
"task_result": task_result.result,
}
entry: Transcriptions = await Transcriptions.objects.filter(task_id=task_id).first()
if task_result.status == "SUCCESS":
trans = TranscriptionResult(**task_result.result)
trans
await entry.update(
content=trans.content,
**trans.dict(
exclude={"transcript"},
),
)
else:
_trans = TranscriptionMetadata(**task_result.result)
await entry.update(**_trans.dict(exclude={"transcription"}))
result = {
"task_id": task_id,
"task_status": task_result.status,
"task_result": task_result.result,
"id": entry.tl_file_id,
}
return result