Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
iperov
GitHub Repository: iperov/deepfacelab
Path: blob/master/mainscripts/FacesetEnhancer.py
628 views
1
import multiprocessing
2
import shutil
3
4
from DFLIMG import *
5
from core.interact import interact as io
6
from core.joblib import Subprocessor
7
from core.leras import nn
8
from core import pathex
9
from core.cv2ex import *
10
11
12
class FacesetEnhancerSubprocessor(Subprocessor):
13
14
#override
15
def __init__(self, image_paths, output_dirpath, device_config):
16
self.image_paths = image_paths
17
self.output_dirpath = output_dirpath
18
self.result = []
19
self.nn_initialize_mp_lock = multiprocessing.Lock()
20
self.devices = FacesetEnhancerSubprocessor.get_devices_for_config(device_config)
21
22
super().__init__('FacesetEnhancer', FacesetEnhancerSubprocessor.Cli, 600)
23
24
#override
25
def on_clients_initialized(self):
26
io.progress_bar (None, len (self.image_paths))
27
28
#override
29
def on_clients_finalized(self):
30
io.progress_bar_close()
31
32
#override
33
def process_info_generator(self):
34
base_dict = {'output_dirpath':self.output_dirpath,
35
'nn_initialize_mp_lock': self.nn_initialize_mp_lock,}
36
37
for (device_idx, device_type, device_name, device_total_vram_gb) in self.devices:
38
client_dict = base_dict.copy()
39
client_dict['device_idx'] = device_idx
40
client_dict['device_name'] = device_name
41
client_dict['device_type'] = device_type
42
yield client_dict['device_name'], {}, client_dict
43
44
#override
45
def get_data(self, host_dict):
46
if len (self.image_paths) > 0:
47
return self.image_paths.pop(0)
48
49
#override
50
def on_data_return (self, host_dict, data):
51
self.image_paths.insert(0, data)
52
53
#override
54
def on_result (self, host_dict, data, result):
55
io.progress_bar_inc(1)
56
if result[0] == 1:
57
self.result +=[ (result[1], result[2]) ]
58
59
#override
60
def get_result(self):
61
return self.result
62
63
@staticmethod
64
def get_devices_for_config (device_config):
65
devices = device_config.devices
66
cpu_only = len(devices) == 0
67
68
if not cpu_only:
69
return [ (device.index, 'GPU', device.name, device.total_mem_gb) for device in devices ]
70
else:
71
return [ (i, 'CPU', 'CPU%d' % (i), 0 ) for i in range( min(8, multiprocessing.cpu_count() // 2) ) ]
72
73
class Cli(Subprocessor.Cli):
74
75
#override
76
def on_initialize(self, client_dict):
77
device_idx = client_dict['device_idx']
78
cpu_only = client_dict['device_type'] == 'CPU'
79
self.output_dirpath = client_dict['output_dirpath']
80
nn_initialize_mp_lock = client_dict['nn_initialize_mp_lock']
81
82
if cpu_only:
83
device_config = nn.DeviceConfig.CPU()
84
device_vram = 99
85
else:
86
device_config = nn.DeviceConfig.GPUIndexes ([device_idx])
87
device_vram = device_config.devices[0].total_mem_gb
88
89
nn.initialize (device_config)
90
91
intro_str = 'Running on %s.' % (client_dict['device_name'])
92
93
self.log_info (intro_str)
94
95
from facelib import FaceEnhancer
96
self.fe = FaceEnhancer( place_model_on_cpu=(device_vram<=2 or cpu_only), run_on_cpu=cpu_only )
97
98
#override
99
def process_data(self, filepath):
100
try:
101
dflimg = DFLIMG.load (filepath)
102
if dflimg is None or not dflimg.has_data():
103
self.log_err (f"{filepath.name} is not a dfl image file")
104
else:
105
dfl_dict = dflimg.get_dict()
106
107
img = cv2_imread(filepath).astype(np.float32) / 255.0
108
img = self.fe.enhance(img)
109
img = np.clip (img*255, 0, 255).astype(np.uint8)
110
111
output_filepath = self.output_dirpath / filepath.name
112
113
cv2_imwrite ( str(output_filepath), img, [int(cv2.IMWRITE_JPEG_QUALITY), 100] )
114
115
dflimg = DFLIMG.load (output_filepath)
116
dflimg.set_dict(dfl_dict)
117
dflimg.save()
118
119
return (1, filepath, output_filepath)
120
except:
121
self.log_err (f"Exception occured while processing file {filepath}. Error: {traceback.format_exc()}")
122
123
return (0, filepath, None)
124
125
def process_folder ( dirpath, cpu_only=False, force_gpu_idxs=None ):
126
device_config = nn.DeviceConfig.GPUIndexes( force_gpu_idxs or nn.ask_choose_device_idxs(suggest_all_gpu=True) ) \
127
if not cpu_only else nn.DeviceConfig.CPU()
128
129
output_dirpath = dirpath.parent / (dirpath.name + '_enhanced')
130
output_dirpath.mkdir (exist_ok=True, parents=True)
131
132
dirpath_parts = '/'.join( dirpath.parts[-2:])
133
output_dirpath_parts = '/'.join( output_dirpath.parts[-2:] )
134
io.log_info (f"Enhancing faceset in {dirpath_parts}")
135
io.log_info ( f"Processing to {output_dirpath_parts}")
136
137
output_images_paths = pathex.get_image_paths(output_dirpath)
138
if len(output_images_paths) > 0:
139
for filename in output_images_paths:
140
Path(filename).unlink()
141
142
image_paths = [Path(x) for x in pathex.get_image_paths( dirpath )]
143
result = FacesetEnhancerSubprocessor ( image_paths, output_dirpath, device_config=device_config).run()
144
145
is_merge = io.input_bool (f"\r\nMerge {output_dirpath_parts} to {dirpath_parts} ?", True)
146
if is_merge:
147
io.log_info (f"Copying processed files to {dirpath_parts}")
148
149
for (filepath, output_filepath) in result:
150
try:
151
shutil.copy (output_filepath, filepath)
152
except:
153
pass
154
155
io.log_info (f"Removing {output_dirpath_parts}")
156
shutil.rmtree(output_dirpath)
157
158