File size: 4,004 Bytes
2faefa9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import os
import sys
import importlib
import psutil
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
from types import ModuleType
from typing import Any, List, Callable
from tqdm import tqdm

import DeepFakeAI.globals
from DeepFakeAI import wording

FRAME_PROCESSORS_MODULES : List[ModuleType] = []
FRAME_PROCESSORS_METHODS =\
[
	'get_frame_processor',
	'clear_frame_processor',
	'pre_check',
	'pre_process',
	'process_frame',
	'process_frames',
	'process_image',
	'process_video',
	'post_process'
]


def load_frame_processor_module(frame_processor : str) -> Any:
	try:
		frame_processor_module = importlib.import_module('DeepFakeAI.processors.frame.modules.' + frame_processor)
		for method_name in FRAME_PROCESSORS_METHODS:
			if not hasattr(frame_processor_module, method_name):
				raise NotImplementedError
	except ModuleNotFoundError:
		sys.exit(wording.get('frame_processor_not_loaded').format(frame_processor = frame_processor))
	except NotImplementedError:
		sys.exit(wording.get('frame_processor_not_implemented').format(frame_processor = frame_processor))
	return frame_processor_module


def get_frame_processors_modules(frame_processors : List[str]) -> List[ModuleType]:
	global FRAME_PROCESSORS_MODULES

	if not FRAME_PROCESSORS_MODULES:
		for frame_processor in frame_processors:
			frame_processor_module = load_frame_processor_module(frame_processor)
			FRAME_PROCESSORS_MODULES.append(frame_processor_module)
	return FRAME_PROCESSORS_MODULES


def clear_frame_processors_modules() -> None:
	global FRAME_PROCESSORS_MODULES

	for frame_processor_module in get_frame_processors_modules(DeepFakeAI.globals.frame_processors):
		frame_processor_module.clear_frame_processor()
	FRAME_PROCESSORS_MODULES = []


def multi_process_frame(source_path : str, temp_frame_paths : List[str], process_frames: Callable[[str, List[str], Any], None], update: Callable[[], None]) -> None:
	with ThreadPoolExecutor(max_workers = DeepFakeAI.globals.execution_thread_count) as executor:
		futures = []
		queue = create_queue(temp_frame_paths)
		queue_per_future = max(len(temp_frame_paths) // DeepFakeAI.globals.execution_thread_count * DeepFakeAI.globals.execution_queue_count, 1)
		while not queue.empty():
			future = executor.submit(process_frames, source_path, pick_queue(queue, queue_per_future), update)
			futures.append(future)
		for future in as_completed(futures):
			future.result()


def create_queue(temp_frame_paths : List[str]) -> Queue[str]:
	queue: Queue[str] = Queue()
	for frame_path in temp_frame_paths:
		queue.put(frame_path)
	return queue


def pick_queue(queue : Queue[str], queue_per_future : int) -> List[str]:
	queues = []
	for _ in range(queue_per_future):
		if not queue.empty():
			queues.append(queue.get())
	return queues


def process_video(source_path : str, frame_paths : List[str], process_frames : Callable[[str, List[str], Any], None]) -> None:
	progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
	total = len(frame_paths)
	with tqdm(total = total, desc = wording.get('processing'), unit = 'frame', dynamic_ncols = True, bar_format = progress_bar_format) as progress:
		multi_process_frame(source_path, frame_paths, process_frames, lambda: update_progress(progress))


def update_progress(progress : Any = None) -> None:
	process = psutil.Process(os.getpid())
	memory_usage = process.memory_info().rss / 1024 / 1024 / 1024
	progress.set_postfix(
	{
		'memory_usage': '{:.2f}'.format(memory_usage).zfill(5) + 'GB',
		'execution_providers': DeepFakeAI.globals.execution_providers,
		'execution_thread_count': DeepFakeAI.globals.execution_thread_count,
		'execution_queue_count': DeepFakeAI.globals.execution_queue_count
	})
	progress.refresh()
	progress.update(1)


def get_device() -> str:
	if 'CUDAExecutionProvider' in DeepFakeAI.globals.execution_providers:
		return 'cuda'
	if 'CoreMLExecutionProvider' in DeepFakeAI.globals.execution_providers:
		return 'mps'
	return 'cpu'