tranny / App /Transcription /TranscriptionRoutes.py
Mbonea's picture
temp dir
59681d8
raw
history blame
7.02 kB
from fastapi import APIRouter, status, Depends, UploadFile, File, Query, BackgroundTasks
from typing_extensions import Annotated
from urllib.parse import urlparse, parse_qs
from .Schemas import (
UserDetails,
TranscriptionMetadata,
TranscriptionResult,
BaseTranscription,
)
from App import bot
import aiofiles, os, re
import uuid
import tempfile
from celery.result import AsyncResult
from App.Worker import transcription_task, downloadfile,downloadUrl
from App.Users.Model import User
from App.Users.UserRoutes import get_token_owner
from App.Users.Schemas import UserSchema
from .Model import Transcriptions
from .Utils.fastapi_tasks import perform_background_task
import yt_dlp
from fastapi_jwt_auth import AuthJWT
from App.Embedding.utils.Initialize import delete_documents
# from .Model import User
# from sqlalchemy import and_
# Router that every endpoint in this module registers on; its routes carry
# the "Transcription" tag.
transcription_router = APIRouter(tags=["Transcription"])
def genUUID():
    """Return a short random identifier: the first six hex digits of a UUID4."""
    # The first eight characters of a UUID's text form are plain hex digits,
    # so slicing the dash-free ``hex`` form yields the same six characters.
    return uuid.uuid4().hex[:6]
@transcription_router.get("/download-audio")
async def download_audio(
    url: str,
    model: str = Query(
        "tiny",
        enum=["tiny", "small", "medium", "base", "large-v2"],
        description="Whisper model Sizes",
    ),
    user: UserSchema = Depends(get_token_owner),
):
    """Queue the download and transcription of a YouTube video's audio.

    Args:
        url: A YouTube watch URL; only its ``v`` query parameter is used.
        model: Whisper model size forwarded to the worker task.
        user: Owner of the request, resolved by ``get_token_owner``.

    Returns:
        A ``BaseTranscription`` built from the newly created database entry,
        or a ``{"code": 400, ...}`` dict when the user is missing or the URL
        carries no video id.
    """
    if user is None:
        return {"code": 400, "message": "doesn't exist", "payload": None}

    # Rebuild a canonical watch URL from the video id so that extra query
    # parameters (playlist, timestamp, ...) are not passed on to yt-dlp.
    video_ids = parse_qs(urlparse(url).query).get("v")
    if not video_ids:
        # Without this guard the URL below would become ".../watch?v=None".
        return {
            "code": 400,
            "message": "url does not contain a video id",
            "payload": None,
        }
    url = f"https://www.youtube.com/watch?v={video_ids[0]}"

    # Metadata-only lookup (download=False) to obtain the title stored as
    # file_name. NOTE(review): this call is synchronous inside an async handler.
    with yt_dlp.YoutubeDL({"quiet": True}) as ydl:
        info_dict = ydl.extract_info(url, download=False)
    video_title = info_dict.get("title", None)

    # The on-disk name is a short random id, not the title, so it needs no
    # further sanitising.
    filename = f"{genUUID()}.mp3"
    file_path = os.path.join("./", "Downloads", filename)
    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": file_path,
    }

    task = downloadfile.delay(url=url, ydl_opts=ydl_opts, model_size=model)
    response = {"task_id": task.id, "file_name": video_title}
    transcription_entry = await Transcriptions.objects.create(
        user=user, youtubeLink=url, **response
    )
    return BaseTranscription(**transcription_entry.__dict__)
@transcription_router.get("/transcriptions")
async def get_user_transcriptions(
    user: UserSchema = Depends(get_token_owner),
):
    """List the transcriptions that belong to the requesting user.

    Returns:
        A list of ``BaseTranscription`` objects, or a ``{"code": 400, ...}``
        dict when no user could be resolved for the request.
    """
    # Same guard as download_audio applies to this dependency; without it
    # ``user.id`` below raises AttributeError.
    if user is None:
        return {"code": 400, "message": "doesn't exist", "payload": None}

    transcriptions = await Transcriptions.objects.filter(user=user.id).all()
    return [
        BaseTranscription(**record.__dict__)
        for record in transcriptions
        if record is not None
    ]
@transcription_router.post("/delete/{task_id}")
async def delete_transcription(
    task_id: str,
    user: UserSchema = Depends(get_token_owner),
):
    """Delete one of the requesting user's transcriptions by task id.

    The database entry is deleted, the task with that id is revoked
    (``terminate=True``) and ``delete_documents`` is called for the task id.

    Returns:
        A dict with ``code`` 200 on success, 404 when the user has no
        transcription with that task id, or 400 when no user was resolved.
    """
    # Same guard as download_audio applies to this dependency; without it
    # ``user.id`` below raises AttributeError.
    if user is None:
        return {"code": 400, "message": "doesn't exist", "payload": None}

    # Filtering by owner as well as task id keeps users from deleting each
    # other's entries.
    transcript = (
        await Transcriptions.objects.filter(user=user.id)
        .filter(task_id=task_id)
        .first()
    )
    if not transcript:
        return {
            "code": 404,
            "message": f"task {task_id} does not exist",
            "payload": None,
        }

    await transcript.delete()
    AsyncResult(task_id).revoke(terminate=True)
    await delete_documents(task_id=task_id)
    return {"code": 200, "message": f"deleted {task_id}", "payload": None}
@transcription_router.post("/url/")
async def url_task(
    url: str,
    model: str = Query(
        "tiny",
        enum=["tiny", "small", "medium", "base", "large-v2"],
        description="Whisper model Sizes",
    ),
    user: UserSchema = Depends(get_token_owner),
):
    """Queue a download-and-transcribe task for a file at an arbitrary URL.

    The target is always named ``<short id>.wav`` and the worker is told to
    place it under ``/tmp/Downloads``. A transcription entry linking the task
    to the user is created before returning.

    Returns:
        A dict with the generated ``file_name`` and the ``task_id``.
    """
    target_name = f"{genUUID()}.wav"
    target_dir = os.path.join("/tmp/", "Downloads")

    # Hand the download over to the worker.
    queued = downloadUrl.delay(
        link=url,
        download_dir=target_dir,
        filename=target_name,
        model_size=model,
    )

    # Record the task so it shows up in the user's transcription list.
    await Transcriptions.objects.create(
        task_id=queued.id, user=user, file_name=target_name
    )
    print(queued.id)
    return {"file_name": target_name, "task_id": queued.id}
def _upload_extension(filename):
    """Return a filesystem-safe extension (without the dot) for an upload name."""
    if not filename:
        return ""
    # The name comes from the client, so keep only ASCII letters and digits of
    # the part after the last dot; path separators and dots are dropped.
    return re.sub(r"[^A-Za-z0-9]", "", filename.rsplit(".", 1)[-1])


@transcription_router.post("/uploadfile/")
async def create_file(
    background_tasks: BackgroundTasks,
    file: UploadFile,
    model: str = Query(
        "tiny",
        enum=["tiny", "small", "medium", "base", "large-v2"],
        description="Whisper model Sizes",
    ),
    user: UserSchema = Depends(get_token_owner),
):
    """Store an uploaded file and queue it for transcription.

    The upload is written to the working directory under a generated
    ``<short id>.<extension>`` name, a transcription task is queued for it, a
    database entry is created, and ``perform_background_task`` is scheduled.

    Args:
        background_tasks: FastAPI background task queue for this request.
        file: The uploaded media file.
        model: Whisper model size forwarded to the transcription task.
        user: Owner of the request, resolved by ``get_token_owner``.

    Returns:
        A dict with ``file_size``, the original ``file_name`` and ``task_id``,
        or a dict with an error ``message`` when the upload could not be saved.
    """
    extension = _upload_extension(file.filename)
    file_name = f"{genUUID()}.{extension}"

    # Write the file to disk asynchronously, 1 MiB at a time.
    try:
        async with aiofiles.open(file_name, "wb") as f:
            while contents := await file.read(1024 * 1024):
                await f.write(contents)
    except Exception as e:
        # Best effort: do not leave a partially written upload behind.
        try:
            os.remove(file_name)
        except OSError:
            pass
        return {
            "message": f"There was an error uploading the file, error message {str(e)} "
        }
    finally:
        await file.close()

    # celery task
    task = transcription_task.delay(file_name, model)

    # create a transcription entry
    await Transcriptions.objects.create(
        task_id=task.id, user=user, file_name=file_name
    )

    # NOTE(review): `file` has already been closed when this task runs —
    # confirm perform_background_task does not read from it.
    background_tasks.add_task(
        perform_background_task, file_name, file=file, task_id=task.id
    )
    return {
        "file_size": file.size,
        "file_name": file.filename,
        "task_id": task.id,
    }
@transcription_router.get("/tasks/{task_id}")
async def get_status(task_id):
    """Report the progress of a transcription task.

    Resolution order:
      1. No database entry for ``task_id``: a "Nothing found" dict.
      2. Entry already stored as ``SUCCESS``: the entry with percentage "100".
      3. The task result is ``None``: the raw task id / status / result.
      4. Otherwise the task result is parsed as ``TranscriptionMetadata``,
         written back to the entry, and the refreshed entry is returned with
         the current percentage.

    Returns:
        A ``BaseTranscription`` or one of the plain dicts described above.
    """
    entry: Transcriptions = await Transcriptions.objects.filter(task_id=task_id).first()
    if entry is None:
        return {"payload": None, "message": "Nothing found", "code": 200}

    result = BaseTranscription(**entry.__dict__)
    if result.status == "SUCCESS":
        # Already finalised in the database; no need to look up the task.
        result.percentage = "100"
        return result

    task_result = AsyncResult(task_id)
    if task_result.result is None:
        return {
            "task_id": task_id,
            "task_status": task_result.status,
            "task_result": task_result.result,
        }

    if task_result.status == "SUCCESS":
        trans = TranscriptionMetadata(**task_result.result)
        percentage = "100"
        await entry.update(**trans.dict())
    else:
        # NOTE(review): any result that cannot be unpacked into
        # TranscriptionMetadata ends up in the "Nothing found" reply below;
        # this presumably includes failed tasks — confirm that is intended.
        try:
            _trans = TranscriptionMetadata(**task_result.result)
            percentage = _trans.percentage
        except Exception as e:
            print(e)
            return {"payload": None, "message": "Nothing found", "code": 200}
        await entry.update(**_trans.dict())

    # Rebuild the response from the entry after the update above.
    result = BaseTranscription(**entry.__dict__)
    result.percentage = percentage
    return result