Spaces:
Running
Running
import roop.globals | |
from threading import Thread | |
from chain_img_processor import ChainImgProcessor | |
class ThreadWithReturnValue(Thread): | |
def __init__(self, group=None, target=None, name=None, | |
args=(), kwargs={}, Verbose=None): | |
Thread.__init__(self, group, target, name, args, kwargs) | |
self._return = None | |
def run(self): | |
if self._target is not None: | |
self._return = self._target(*self._args, | |
**self._kwargs) | |
def join(self, *args): | |
Thread.join(self, *args) | |
return self._return | |
# in beta | |
class ChainVideoProcessor(ChainImgProcessor): | |
def __init__(self): | |
ChainImgProcessor.__init__(self) | |
self.video_save_codec = "libx264" | |
self.video_save_crf = 14 | |
def init_with_plugins(self): | |
self.init_plugins(["core","core_video"]) | |
self.display_init_info() | |
init_on_start_arr = self.init_on_start.split(",") | |
for proc_id in init_on_start_arr: | |
self.init_processor(proc_id) | |
def run_video_chain(self, source_video, target_video, fps, threads:int = 1, chain = None, params_frame_gen_func = None, video_audio = None): | |
import cv2 | |
from tqdm import tqdm | |
from chain_img_processor.ffmpeg_writer import FFMPEG_VideoWriter # ffmpeg install needed | |
cap = cv2.VideoCapture(source_video) | |
# width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
# height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
# first frame do manually - because upscale may happen, we need to estimate width/height | |
ret, frame = cap.read() | |
if params_frame_gen_func is not None: | |
params = params_frame_gen_func(self, frame) | |
else: | |
params = {} | |
params["original_frame"] = frame | |
frame_processed, params = self.run_chain(frame,params,chain) | |
height, width, channels = frame_processed.shape | |
self.fill_processors_for_thread_chains(threads,chain) | |
#print(self.processors_objects) | |
#import threading | |
#locks:list[threading.Lock] = [] | |
locks: list[bool] = [] | |
for i in range(threads): | |
#locks.append(threading.Lock()) | |
locks.append(False) | |
temp = [] | |
with FFMPEG_VideoWriter(target_video, (width, height), fps, codec=roop.globals.video_encoder, crf=roop.globals.video_quality, audiofile=video_audio) as output_video_ff: | |
with tqdm(total=frame_count, desc='Processing', unit="frame", dynamic_ncols=True, | |
bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]') as progress: | |
# do first frame | |
output_video_ff.write_frame(frame_processed) | |
progress.update(1) # | |
cnt_frames = 0 | |
# do rest frames | |
while True: | |
# getting frame | |
ret, frame = cap.read() | |
if not ret: | |
break | |
cnt_frames+=1 | |
thread_ind = cnt_frames % threads | |
# we are having an array of length %gpu_threads%, running in parallel | |
# so if array is equal or longer than gpu threads, waiting | |
#while len(temp) >= threads: | |
while locks[thread_ind]: | |
#print('WAIT', thread_ind) | |
# we are order dependent, so we are forced to wait for first element to finish. When finished removing thread from the list | |
frame_processed, params = temp.pop(0).join() | |
locks[params["_thread_index"]] = False | |
#print('OFF',cnt_frames,locks[params["_thread_index"]],locks) | |
# writing into output | |
output_video_ff.write_frame(frame_processed) | |
# updating the status | |
progress.update(1) | |
# calc params for frame | |
if params_frame_gen_func is not None: | |
params = params_frame_gen_func(self,frame) | |
else: | |
params = {} | |
# adding new frame to the list and starting it | |
locks[thread_ind] = True | |
#print('ON', cnt_frames, thread_ind, locks) | |
params["original_frame"] = frame | |
temp.append( | |
ThreadWithReturnValue(target=self.run_chain, args=(frame, params, chain, thread_ind))) | |
temp[-1].start() | |
while len(temp) > 0: | |
# we are order dependent, so we are forced to wait for first element to finish. When finished removing thread from the list | |
frame_processed, params = temp.pop(0).join() | |
locks[params["_thread_index"]] = False | |
# writing into output | |
output_video_ff.write_frame(frame_processed) | |
progress.update(1) | |
#print("FINAL", locks) | |
_video_processor:ChainVideoProcessor = None | |
def get_single_video_processor() -> ChainVideoProcessor: | |
global _video_processor | |
if _video_processor is None: | |
_video_processor = ChainVideoProcessor() | |
_video_processor.init_with_plugins() | |
return _video_processor | |