import os
import cv2
import pdb
import sys
import time
import numpy as np

from transformers import logging
logging.set_verbosity_error()

from models.kts_model import VideoSegmentor
from models.clip_model import FeatureExtractor
from models.blip2_model import ImageCaptioner
from models.grit_model import DenseCaptioner
from models.whisper_model import AudioTranslator
from models.gpt_model import LlmReasoner
from utils.utils import logger_creator, format_time
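
# Vlogger converts a video into a written "vlog" and makes it queryable:
# CLIP features (FeatureExtractor) feed the KTS-based VideoSegmentor to split
# the video into scenes, each scene's middle frame is described by the BLIP-2
# ImageCaptioner and the GRiT DenseCaptioner, the audio track is transcribed
# by the Whisper-based AudioTranslator, and the resulting log is indexed into
# a vectorstore so the LlmReasoner can answer questions about the video.
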
class Vlogger:
    def __init__(self, args):
        self.args = args
        self.alpha = args.alpha
        self.beta = args.beta
        self.data_dir = args.data_dir
        self.tmp_dir = args.tmp_dir
        # Heavy vision/audio models are loaded lazily in init_models().
        self.models_flag = False
        # self.init_llm()
        if not os.path.exists(self.tmp_dir):
            os.makedirs(self.tmp_dir)
    def init_models(self):
        print('\033[1;34m' + "Welcome to our Vlog toolbox...".center(50, '-') + '\033[0m')
        print('\033[1;33m' + "Initializing models...".center(50, '-') + '\033[0m')
        print('\033[1;31m' + "This may be time-consuming, please wait...".center(50, '-') + '\033[0m')
        self.feature_extractor = FeatureExtractor(self.args)
        self.video_segmenter = VideoSegmentor(alpha=self.alpha, beta=self.beta)
        self.image_captioner = ImageCaptioner(model_name=self.args.captioner_base_model, device=self.args.image_captioner_device)
        self.dense_captioner = DenseCaptioner(device=self.args.dense_captioner_device)
        self.audio_translator = AudioTranslator(model=self.args.audio_translator, device=self.args.audio_translator_device)
        print('\033[1;32m' + "Model initialization finished!".center(50, '-') + '\033[0m')
    def init_llm_with_api_key(self, api_key):
        print('\033[1;33m' + "Initializing LLM Reasoner...".center(50, '-') + '\033[0m')
        os.environ["OPENAI_API_KEY"] = api_key
        self.llm_reasoner = LlmReasoner(self.args)
        print('\033[1;32m' + "LLM initialization finished!".center(50, '-') + '\033[0m')
    def init_llm(self):
        print('\033[1;33m' + "Initializing LLM Reasoner...".center(50, '-') + '\033[0m')
        os.environ["OPENAI_API_KEY"] = self.args.openai_api_key
        self.llm_reasoner = LlmReasoner(self.args)
        print('\033[1;32m' + "LLM initialization finished!".center(50, '-') + '\033[0m')
    def video2log(self, video_path):
        video_id = os.path.basename(video_path).split('.')[0]

        # Reuse a previously built vectorstore for this video if one exists.
        if self.llm_reasoner.exist_vectorstore(video_id):
            return self.printlog(video_id)
        try:
            # A log file may already exist without a vectorstore; try to index it.
            self.llm_reasoner.create_vectorstore(video_id)
            return self.printlog(video_id)
        except Exception:
            # No usable log yet; fall through and process the video from scratch.
            pass

        if not self.models_flag:
            self.init_models()
            self.models_flag = True

        logger = logger_creator(video_id)
        clip_features, video_length = self.feature_extractor(video_path, video_id)
        seg_windows = self.video_segmenter(clip_features, video_length)

        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        audio_results = self.audio_translator(video_path)

        for start_sec, end_sec in seg_windows:
            # Use the middle frame of each segment as its visual summary.
            middle_sec = (start_sec + end_sec) // 2
            middle_frame_idx = int(middle_sec * fps)
            cap.set(cv2.CAP_PROP_POS_FRAMES, middle_frame_idx)
            ret, frame = cap.read()

            if ret:
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                image_caption = self.image_captioner.image_caption(frame)
                dense_caption = self.dense_captioner.image_dense_caption(frame)
                audio_transcript = self.audio_translator.match(audio_results, start_sec, end_sec)

                logger.info(f"When {format_time(start_sec)} - {format_time(end_sec)}")
                logger.info(f"I saw {image_caption}.")
                logger.info(f"I found {dense_caption}.")
                if len(audio_transcript) > 0:
                    logger.info(f"I heard someone say \"{audio_transcript}\"")
                logger.info("\n")

        cap.release()
        self.llm_reasoner.create_vectorstore(video_id)
        return self.printlog(video_id)
    def printlog(self, video_id):
        log_list = []
        log_path = os.path.join(self.data_dir, video_id + '.log')
        with open(log_path, 'r') as f:
            for line in f:
                log_list.append(line.strip())
        return log_list
    def chat2video(self, user_input):
        response = self.llm_reasoner(user_input)
        return response

    def clean_history(self):
        self.llm_reasoner.clean_history()
        return
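
# A minimal usage sketch, assuming the project's own argument parser exposes the
# fields read above (alpha, beta, data_dir, tmp_dir, captioner_base_model, the
# device fields, audio_translator, openai_api_key). The default values and the
# --video_path argument below are illustrative placeholders, not the project's
# official defaults.
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--video_path", default="examples/demo.mp4")    # placeholder path
    parser.add_argument("--alpha", type=int, default=10)                # assumed KTS hyper-parameter
    parser.add_argument("--beta", type=int, default=1)                  # assumed KTS hyper-parameter
    parser.add_argument("--data_dir", default="./examples")
    parser.add_argument("--tmp_dir", default="./tmp")
    parser.add_argument("--captioner_base_model", default="blip2")      # assumed model id
    parser.add_argument("--image_captioner_device", default="cuda")
    parser.add_argument("--dense_captioner_device", default="cuda")
    parser.add_argument("--audio_translator", default="large")          # assumed Whisper size
    parser.add_argument("--audio_translator_device", default="cuda")
    parser.add_argument("--openai_api_key", default="")                 # supply your own key
    args = parser.parse_args()

    vlogger = Vlogger(args)
    vlogger.init_llm_with_api_key(args.openai_api_key)

    # Build (or reuse) the video log, then ask a question about the video.
    for line in vlogger.video2log(args.video_path):
        print(line)
    print(vlogger.chat2video("What happens in this video?"))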