|
from fastapi import APIRouter, status, Depends, UploadFile, File, Query, BackgroundTasks |
|
from typing_extensions import Annotated |
|
from .Schemas import ( |
|
UserDetails, |
|
TranscriptionMetadata, |
|
TranscriptionResult, |
|
BaseTranscription, |
|
) |
|
from App import bot |
|
import aiofiles, os, re |
|
import tempfile |
|
from celery.result import AsyncResult |
|
from App.Worker import transcription_task, downloadfile |
|
from App.Users.Model import User |
|
from App.Users.UserRoutes import get_token_owner |
|
from App.Users.Schemas import UserSchema |
|
from .Model import Transcriptions |
|
from .Utils.fastapi_tasks import perform_background_task |
|
import yt_dlp |
|
from fastapi_jwt_auth import AuthJWT |
|
|
|
|
|
|
|
|
|
|
|
# Router that collects the transcription endpoints defined in this module;
# its routes are grouped under the "Transcription" tag.
transcription_router = APIRouter(tags=["Transcription"])
|
|
|
|
|
@transcription_router.get("/download-audio")
async def download_audio(
    url: str,
    model: str = Query(
        "tiny",
        enum=["tiny", "small", "medium", "base", "large-v2"],
        description="Whisper model Sizes",
    ),
    user: UserSchema = Depends(get_token_owner),
):
    """Queue an audio download for ``url`` and record it for ``user``.

    The media title is looked up with yt_dlp (metadata only, nothing is
    downloaded here), a filesystem-safe output path is derived from it, and
    the ``downloadfile`` task is dispatched with ``.delay``.  A
    ``Transcriptions`` row linking the task to the user is then created.

    Args:
        url: Address of the media to fetch.
        model: Whisper model size forwarded to the task.
        user: Token owner resolved by ``get_token_owner``.

    Returns:
        ``{"task_id": ..., "file_name": ...}`` on success, or a
        ``{"code": 400, ...}`` payload when no user was resolved.
    """
    if user is None:
        return {"code": 400, "message": "doesn't exist", "payload": None}

    # NOTE(review): extract_info is a synchronous call made inside a
    # coroutine; presumably it performs network I/O and holds the event loop
    # while it runs -- consider moving it to a worker thread.
    with yt_dlp.YoutubeDL({"quiet": True}) as ydl:
        info_dict = ydl.extract_info(url, download=False)

    # The title can be absent; fall back to a fixed name rather than passing
    # None to re.sub (which raises TypeError).
    video_title = (info_dict or {}).get("title") or "untitled"

    # Keep only word characters, dashes and dots so the title is usable as a
    # file name; a title made solely of stripped characters gets the fallback.
    sanitized_title = re.sub(r"(?u)[^-\w.]", "", video_title) or "untitled"
    file_path = os.path.join("/srv/App/", "Downloads", f"{sanitized_title}.mp3")

    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": file_path,
    }

    task = downloadfile.delay(url, ydl_opts, model)

    # The reported/stored file name uses the raw title, while the file on
    # disk uses the sanitized one.
    response = {"task_id": task.id, "file_name": f"{video_title}.mp3"}
    await Transcriptions.objects.create(user=user, youtubeLink=url, **response)

    return response
|
|
|
|
|
@transcription_router.get("/transcriptions")
async def get_user_transcriptions(
    user: UserSchema = Depends(get_token_owner),
):
    """List the transcription rows stored for the authenticated user.

    Each row returned by the query is converted into a ``BaseTranscription``
    built from the row's instance attributes.
    """
    rows = await Transcriptions.objects.filter(user=user.id).all()
    return [BaseTranscription(**vars(row)) for row in rows]
|
|
|
|
|
@transcription_router.post("/delete/{task_id}")
async def delete_transcription(
    task_id: str,
    user: UserSchema = Depends(get_token_owner),
):
    """Delete the caller's transcription identified by ``task_id``.

    The lookup is restricted to rows owned by ``user`` so that one user
    cannot delete another user's transcription by guessing a task id.

    Args:
        task_id: Task identifier stored on the transcription row.
        user: Token owner resolved by ``get_token_owner``.

    Returns:
        A ``{"code", "message", "payload"}`` dict: 200 when the row was
        deleted, 404 when the caller has no such row, 400 when no user was
        resolved.
    """
    if user is None:
        return {"code": 400, "message": "doesn't exist", "payload": None}

    transcript = await Transcriptions.objects.filter(
        task_id=task_id, user=user.id
    ).first()

    # NOTE(review): depending on the ORM, first() may raise instead of
    # returning None when nothing matches -- this guard covers the None case.
    if transcript is None:
        return {"code": 404, "message": f"{task_id} not found", "payload": None}

    await transcript.delete()
    return {"code": 200, "message": f"deleted {task_id}", "payload": None}
|
|
|
|
|
@transcription_router.post("/uploadfile/")
async def create_file(
    background_tasks: BackgroundTasks,
    file: UploadFile,
    model: str = Query(
        "tiny",
        enum=["tiny", "small", "medium", "base", "large-v2"],
        description="Whisper model Sizes",
    ),
    user: UserSchema = Depends(get_token_owner),
):
    """Store an uploaded file and queue it for transcription.

    The upload is copied to the working directory, ``transcription_task`` is
    dispatched with ``.delay``, a ``Transcriptions`` row is created for the
    user, and ``perform_background_task`` is scheduled to run after the
    response.

    Args:
        background_tasks: FastAPI background task queue for this request.
        file: The uploaded file.
        model: Whisper model size forwarded to the task.
        user: Token owner resolved by ``get_token_owner``.

    Returns:
        ``{"file_size", "file_name", "task_id"}`` on success, or a
        ``{"message": ...}`` dict when the upload could not be written.
    """
    # The file name is supplied by the client: keep only its last path
    # component so a name such as "../../x" cannot escape the working
    # directory.
    safe_name = os.path.basename(file.filename or "")
    if not safe_name:
        await file.close()
        return {
            "message": "There was an error uploading the file, error message missing file name "
        }

    chunk_size = 1024 * 1024  # copy in 1 MiB pieces instead of 1 KiB
    try:
        async with aiofiles.open(safe_name, "wb") as out:
            while contents := await file.read(chunk_size):
                await out.write(contents)
    except Exception as e:
        # Best-effort boundary: report the failure in the response body.
        return {
            "message": f"There was an error uploading the file, error message {str(e)} "
        }
    finally:
        await file.close()

    task = transcription_task.delay(safe_name, model)

    await Transcriptions.objects.create(
        task_id=task.id, user=user, file_name=safe_name
    )

    # NOTE(review): `file` has already been closed above when this background
    # task runs -- confirm perform_background_task does not read from it.
    background_tasks.add_task(perform_background_task, file=file, task_id=task.id)

    return {
        "file_size": file.size,
        "file_name": safe_name,
        "task_id": task.id,
    }
|
|
|
|
|
@transcription_router.get("/tasks/{task_id}")
async def get_status(task_id):
    """Report the state of a task and sync its result into the database.

    While the task has no result yet, only its status is returned.  Once a
    result is available, the matching ``Transcriptions`` row is updated from
    it (as a ``TranscriptionResult`` on ``SUCCESS``, otherwise as
    ``TranscriptionMetadata``) and the row's ``tl_file_id`` is included in
    the response.

    Args:
        task_id: Identifier of the task to inspect.

    Returns:
        A dict with ``task_id``, ``task_status`` and ``task_result``; plus
        ``id`` once a result is available and it is a mapping.
    """
    task_result = AsyncResult(task_id)

    # Read status and result once so the response describes a single,
    # consistent snapshot of the task.
    task_status = task_result.status
    outcome = task_result.result

    if outcome is None:
        return {
            "task_id": task_id,
            "task_status": task_status,
            "task_result": outcome,
        }

    # A result that is not a mapping cannot be unpacked into the schemas
    # below (presumably the exception object of a failed task); report it as
    # text instead of crashing.
    if not isinstance(outcome, dict):
        return {
            "task_id": task_id,
            "task_status": task_status,
            "task_result": str(outcome),
        }

    entry: Transcriptions = await Transcriptions.objects.filter(task_id=task_id).first()

    # NOTE(review): depending on the ORM, first() may raise instead of
    # returning None when nothing matches -- this guard covers the None case.
    if entry is None:
        return {
            "task_id": task_id,
            "task_status": task_status,
            "task_result": outcome,
            "id": None,
        }

    if task_status == "SUCCESS":
        trans = TranscriptionResult(**outcome)
        # Build the kwargs in one dict so `content` is passed exactly once
        # even if the schema dump already contains that key.
        fields = trans.dict(exclude={"transcript"})
        fields["content"] = trans.content
        await entry.update(**fields)
    else:
        meta = TranscriptionMetadata(**outcome)
        await entry.update(**meta.dict(exclude={"transcription"}))

    return {
        "task_id": task_id,
        "task_status": task_status,
        "task_result": outcome,
        "id": entry.tl_file_id,
    }
|
|