Mbonea commited on
Commit
07fd3f6
1 Parent(s): 5e865c5

added streaming

Browse files
App/Streaming/StreamingRoutes.py ADDED
@@ -0,0 +1,12 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, Request
2
+ from .Utils.Handler import Handler
3
+ from App import bot
4
+
5
+ streaming_router = APIRouter(tags=["streaming"])
6
+
7
+
8
+ @streaming_router.get("/stream/")
9
+ async def create_file(id: int, req: Request):
10
+ hander = Handler(req=req, client=bot, id=id)
11
+ hander.route = "inline"
12
+ return await hander.process_request()
App/Streaming/Utils/Download.py ADDED
@@ -0,0 +1,79 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import math, asyncio, subprocess
2
+ from telethon import TelegramClient
3
+ from fastapi.responses import StreamingResponse
4
+
5
+ import logging
6
+
7
+ logging.getLogger(__name__)
8
+ logging.basicConfig(level=logging.INFO)
9
+
10
+
11
+ class Download:
12
+ client: TelegramClient
13
+ route: str
14
+ offset: int
15
+ handler: None
16
+ file: None
17
+ limit: int
18
+ file_size: float
19
+
20
+ def __init__(self, handler):
21
+ self.handler = handler
22
+ self.file = handler.message.media
23
+ self.file_size = handler.message.file.size
24
+ self.limit = handler.sanity.limit
25
+ self.offset = handler.sanity.offset
26
+ self.client = handler.client
27
+ self.mime_type = handler.message.file.mime_type
28
+
29
+ async def download(self):
30
+ part_size = int(512 * 1024) * 2
31
+ first_part_cut = self.offset % part_size
32
+ first_part = math.floor(self.offset / part_size)
33
+ last_part_cut = part_size - (self.limit % part_size)
34
+ last_part = math.ceil(self.limit / part_size)
35
+ part_count = math.ceil(self.file_size / part_size)
36
+ part = first_part
37
+ try:
38
+ async for chunk in self.client.iter_download(
39
+ self.file, offset=first_part * part_size, request_size=part_size
40
+ ):
41
+ if part == first_part:
42
+ yield bytes(chunk[first_part_cut:])
43
+ elif part == last_part:
44
+ yield bytes(chunk[:last_part_cut])
45
+ else:
46
+ yield bytes(chunk)
47
+ logging.debug(f"Part {part}/{last_part} (total {part_count}) served!")
48
+ part += 1
49
+ logging.debug("serving finished")
50
+ except (GeneratorExit, StopAsyncIteration, asyncio.CancelledError):
51
+ logging.debug("file serve interrupted")
52
+
53
+ raise
54
+ except Exception as e:
55
+ print(e)
56
+ logging.debug("file serve errored", exc_info=True)
57
+
58
+ async def handle_request(self):
59
+ headers = {
60
+ "content-type": self.mime_type,
61
+ "content-range": f"bytes {self.offset}-{self.limit-1}/{self.file_size}",
62
+ "content-length": str(self.limit - self.offset),
63
+ "accept-ranges": "bytes",
64
+ "content-transfer-encoding": "Binary",
65
+ "content-disposition": f'{self.handler.route}; filename="{self.handler.message.file.name}"',
66
+ }
67
+ logging.info(
68
+ f"Serving file in {self.handler.message.file.name}) ; Range: {self.offset} - {self.limit}"
69
+ )
70
+ if self.handler.head:
71
+ body = None
72
+ else:
73
+ body = self.download()
74
+ return StreamingResponse(
75
+ media_type=self.mime_type,
76
+ content=body,
77
+ headers=headers,
78
+ status_code=206 if self.offset else 200,
79
+ )
App/Streaming/Utils/Handler.py ADDED
@@ -0,0 +1,56 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from .Sanity import Sanity
2
+ from fastapi import Request
3
+ from telethon import TelegramClient
4
+ from telethon.tl.types import Message
5
+ from .Download import Download
6
+ from fastapi.responses import JSONResponse
7
+
8
+
9
+ class Handler:
10
+ req: Request
11
+ client: TelegramClient
12
+ chat_id = -1001925049183
13
+ message: Message
14
+ route: str
15
+ head = False
16
+ sanity: Sanity
17
+
18
+ def __init__(self, id, req: Request, client, route=None, head=False):
19
+ self.head = head
20
+ self.req = req
21
+ self.client = client
22
+ self.sanity = Sanity()
23
+ self.sanity.client = self.client
24
+ self.sanity.chat_id = self.chat_id
25
+ self.sanity.req = self.req
26
+ self.sanity.file_id = id
27
+
28
+ async def sanity_checks(self):
29
+ self.message = await self.sanity.file_exists()
30
+
31
+ try:
32
+ if not self.message.media:
33
+ return JSONResponse(
34
+ status_code=404,
35
+ content={"Error": "File Does not Exist"},
36
+ )
37
+ except:
38
+ return JSONResponse(
39
+ status=404,
40
+ content={"Error": "File Does not Exist", "route": self.route},
41
+ )
42
+
43
+ if self.sanity.check_ranges() == False:
44
+ return JSONResponse(
45
+ status=416,
46
+ content={"Error": "416: Range Not Satisfiable"},
47
+ headers={"Content-Range": f"bytes */{self.message.file.size}"},
48
+ )
49
+
50
+ async def process_request(self):
51
+ response = await self.sanity_checks()
52
+ if type(response) is JSONResponse:
53
+ return response
54
+
55
+ # download/stream
56
+ return await Download(self).handle_request()
App/Streaming/Utils/Sanity.py ADDED
@@ -0,0 +1,41 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from telethon import TelegramClient
2
+ from fastapi import Request
3
+
4
+
5
+ class Sanity:
6
+ client: TelegramClient
7
+ media = None
8
+ chat_id: int = -1001925049183
9
+ file_id: int
10
+ req: Request
11
+ limit: int
12
+ offset: int
13
+
14
+ async def file_exists(self):
15
+ try:
16
+ self.media = await self.client.get_messages(
17
+ entity=self.chat_id, ids=self.file_id
18
+ )
19
+ return self.media
20
+ except Exception as e:
21
+ pass
22
+
23
+ def check_ranges(self):
24
+ range_header = self.req.headers.get("Range")
25
+ if range_header:
26
+ offset = range_header.split("=")[1].split("-")[0] or 0
27
+ limit = range_header.split("=")[1].split("-")[1] or self.media.file.size
28
+ else:
29
+ offset = 0
30
+ limit = self.media.file.size
31
+ self.offset = int(offset)
32
+ self.limit = int(limit)
33
+
34
+ if (
35
+ (limit > self.media.file.size)
36
+ or (self.offset < 0)
37
+ or (self.limit < self.offset)
38
+ ):
39
+ return False
40
+ else:
41
+ return True
App/Transcription/Model.py CHANGED
@@ -1,10 +1,5 @@
1
- import asyncio
2
  import orm
3
- import psycopg2
4
  import datetime
5
- import ujson
6
- import pydantic
7
- from passlib.context import CryptContext
8
  from App.modelInit import database, models
9
 
10
 
 
 
1
  import orm
 
2
  import datetime
 
 
 
3
  from App.modelInit import database, models
4
 
5
 
App/Transcription/Schemas.py CHANGED
@@ -1,7 +1,4 @@
1
- from typing import List, Optional
2
- from pydantic import EmailStr, BaseModel
3
- from passlib.context import CryptContext
4
- from fastapi import UploadFile
5
 
6
 
7
  class TranscriptionMetadata(BaseModel):
 
1
+ from pydantic import BaseModel
 
 
 
2
 
3
 
4
  class TranscriptionMetadata(BaseModel):
App/Transcription/TranscriptionRoutes.py CHANGED
@@ -1,6 +1,7 @@
1
  from fastapi import APIRouter, status, Form, UploadFile, File, Query
2
  from typing_extensions import Annotated
3
  from .Schemas import UserDetails
 
4
  import aiofiles
5
  from celery.result import AsyncResult
6
  from App.Worker import transcription_task
@@ -27,15 +28,28 @@ async def create_file(
27
  async with aiofiles.open(file.filename, "wb") as f:
28
  while contents := await file.read(1024 * 1024):
29
  await f.write(contents)
 
 
 
 
 
 
30
  except Exception:
31
  return {"message": "There was an error uploading the file"}
32
  finally:
33
  await file.close()
34
 
 
 
35
  # celery task
36
- task = transcription_task.delay(file.filename, model)
37
 
38
- return {"file_size": file.size, "file_name": file.filename, "task_id": task.id}
 
 
 
 
 
39
 
40
 
41
  @transcription_router.get("/tasks/{task_id}")
 
1
  from fastapi import APIRouter, status, Form, UploadFile, File, Query
2
  from typing_extensions import Annotated
3
  from .Schemas import UserDetails
4
+ from App import bot
5
  import aiofiles
6
  from celery.result import AsyncResult
7
  from App.Worker import transcription_task
 
28
  async with aiofiles.open(file.filename, "wb") as f:
29
  while contents := await file.read(1024 * 1024):
30
  await f.write(contents)
31
+ data = await bot.send_file(
32
+ -1001925049183,
33
+ file_size=file.size,
34
+ caption=file.filename,
35
+ file=contents,
36
+ )
37
  except Exception:
38
  return {"message": "There was an error uploading the file"}
39
  finally:
40
  await file.close()
41
 
42
+ # telegram
43
+
44
  # celery task
45
+ # task = transcription_task.delay(file.filename, model)
46
 
47
+ return {
48
+ "file_size": file.size,
49
+ "file_name": file.filename,
50
+ "task_id": task.id,
51
+ "message_id": data.id,
52
+ }
53
 
54
 
55
  @transcription_router.get("/tasks/{task_id}")
App/__init__.py CHANGED
@@ -0,0 +1,9 @@
 
 
 
 
 
 
 
 
 
 
1
+ import uuid, contextvars
2
+ from telethon import TelegramClient
3
+
4
+
5
+ bot = TelegramClient(
6
+ "transcription_bot",
7
+ api_id=870972,
8
+ api_hash="ce2efaca02dfcd110941be6025e9ac0d",
9
+ )
App/app.py CHANGED
@@ -1,15 +1,17 @@
1
  from fastapi import FastAPI
 
2
  from .Users.UserRoutes import user_router
3
  from .modelInit import models, database
4
  from .Transcription.TranscriptionRoutes import transcription_router
 
5
  from fastapi.middleware.cors import CORSMiddleware
6
  import logging
7
 
8
  # Configure logging
9
  logging.basicConfig(
10
  level=logging.DEBUG,
11
- format='%(asctime)s - %(levelname)s - %(message)s',
12
- datefmt='%Y-%m-%d %H:%M:%S'
13
  )
14
 
15
  app = FastAPI()
@@ -27,6 +29,7 @@ app.add_middleware(
27
 
28
  @app.on_event("startup")
29
  async def startup_event():
 
30
  await models.create_all()
31
  if not database.is_connected:
32
  await database.connect()
@@ -35,6 +38,7 @@ async def startup_event():
35
 
36
  @app.on_event("shutdown")
37
  async def shutdown_event():
 
38
  if not database.is_connected:
39
  await database.disconnect()
40
  print("shutting down!")
@@ -47,3 +51,4 @@ async def landing_page():
47
 
48
  app.include_router(user_router)
49
  app.include_router(transcription_router)
 
 
1
  from fastapi import FastAPI
2
+ from App import bot
3
  from .Users.UserRoutes import user_router
4
  from .modelInit import models, database
5
  from .Transcription.TranscriptionRoutes import transcription_router
6
+ from .Streaming.StreamingRoutes import streaming_router
7
  from fastapi.middleware.cors import CORSMiddleware
8
  import logging
9
 
10
  # Configure logging
11
  logging.basicConfig(
12
  level=logging.DEBUG,
13
+ format="%(asctime)s - %(levelname)s - %(message)s",
14
+ datefmt="%Y-%m-%d %H:%M:%S",
15
  )
16
 
17
  app = FastAPI()
 
29
 
30
  @app.on_event("startup")
31
  async def startup_event():
32
+ await bot.start()
33
  await models.create_all()
34
  if not database.is_connected:
35
  await database.connect()
 
38
 
39
  @app.on_event("shutdown")
40
  async def shutdown_event():
41
+ await bot.stop()
42
  if not database.is_connected:
43
  await database.disconnect()
44
  print("shutting down!")
 
51
 
52
  app.include_router(user_router)
53
  app.include_router(transcription_router)
54
+ app.include_router(streaming_router)
App/celery_config.py CHANGED
@@ -8,8 +8,6 @@ timezone = "Europe/Oslo"
8
  enable_utc = True
9
 
10
  broker_url = f"rediss://default:59c9ffda43f61cc18b44a3407a2a7793@master.transcription--legal-stuff--a96n-7tyr.addon.code.run:6379?ssl_cert_reqs=none"
11
-
12
-
13
  result_backend = f"rediss://default:59c9ffda43f61cc18b44a3407a2a7793@master.transcription--legal-stuff--a96n-7tyr.addon.code.run:6379?ssl_cert_reqs=none"
14
 
15
  # SSL/TLS and SNI configuration
 
8
  enable_utc = True
9
 
10
  broker_url = f"rediss://default:59c9ffda43f61cc18b44a3407a2a7793@master.transcription--legal-stuff--a96n-7tyr.addon.code.run:6379?ssl_cert_reqs=none"
 
 
11
  result_backend = f"rediss://default:59c9ffda43f61cc18b44a3407a2a7793@master.transcription--legal-stuff--a96n-7tyr.addon.code.run:6379?ssl_cert_reqs=none"
12
 
13
  # SSL/TLS and SNI configuration
transcription_bot.session ADDED
Binary file (28.7 kB). View file
 
transcription_bot.session-journal ADDED
Binary file (4.62 kB). View file