Spaces:
Runtime error
Runtime error
from typing import Dict, List | |
from pathlib import Path | |
from sqlite3 import Cursor | |
from utils import accepts_types, create_videos | |
from preprocessing.youtubevideopreprocessor import YoutubeVideoPreprocessor | |
from loading.loaderiterator import LoaderIterator | |
from transforming.batchtransformer import BatchTransformer | |
from storing.sqlitebatchvideostorer import SQLiteBatchVideoStorer | |
from storing.sqlitecontextmanager import SQLiteContextManager | |
from loading.serialization import JsonSerializer | |
from transforming.addtitletransform import AddTitleTransform | |
from transforming.adddescriptiontransform import AddDescriptionTransform | |
from transforming.whispertransform import WhisperTransform | |
class DataPipeline: | |
"""A class that wraps the different components of the system. It processes | |
data using these steps: load -> apply transform -> store. | |
""" | |
def __init__(self, | |
loader_iterator: LoaderIterator, | |
batch_transformer: BatchTransformer, | |
storer: SQLiteBatchVideoStorer, | |
sqlite_context_manager: SQLiteContextManager) -> None: | |
self.loader_iterator = loader_iterator | |
self.batch_transformer = batch_transformer | |
self.storer = storer | |
self.sqlite_context_manager = sqlite_context_manager | |
def process(self, load_paths: List[Path]) -> None: | |
"""Process files in batches: load -> transform -> store to db.""" | |
self.loader_iterator.load_paths = load_paths | |
with self.sqlite_context_manager as db_cursor: | |
for video_data_batch in self.loader_iterator: | |
self._process_video_batch(db_cursor, video_data_batch) | |
def _process_video_batch(self, | |
db_cursor: Cursor, | |
video_data_batch: List[Dict]) -> None: | |
videos = create_videos(video_data_batch) | |
transformed_videos = self.batch_transformer.apply(videos) | |
self.storer.store(db_cursor, transformed_videos) | |
def create_hardcoded_data_pipeline(db_path, whisper_model: str="base") -> DataPipeline: | |
"""Factory function to create a DataPipeline with | |
default arguments. | |
TODO: Create DataPipeline so users can pass the args. | |
""" | |
loader_iterator = LoaderIterator(JsonSerializer(), 2) | |
# Whisper transform using based model and timestamps | |
# TODO: Let user select this parameters. | |
batch_transformer = BatchTransformer([AddTitleTransform(), | |
AddDescriptionTransform(), | |
WhisperTransform(model=whisper_model)]) | |
video_storer = SQLiteBatchVideoStorer() | |
sqlite_context_manager = SQLiteContextManager(db_path) | |
return DataPipeline(loader_iterator, | |
batch_transformer, | |
video_storer, | |
sqlite_context_manager) |