Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
iperov
GitHub Repository: iperov/deepfacelab
Path: blob/master/mainscripts/FacesetResizer.py
628 views
1
import multiprocessing
2
import shutil
3
4
import cv2
5
from core import pathex
6
from core.cv2ex import *
7
from core.interact import interact as io
8
from core.joblib import Subprocessor
9
from DFLIMG import *
10
from facelib import FaceType, LandmarksProcessor
11
12
13
class FacesetResizerSubprocessor(Subprocessor):
    """Resize DFL face images using a pool of CPU worker processes.

    Every input image is written to the output directory as a square image of
    the requested size, and the DFL metadata stored in it (landmarks,
    segmentation polygons, image-to-face matrix, and the XSeg mask when the
    face type is changed) is adjusted to the new geometry.

    get_result() returns a list of (source_path, output_path) tuples, one for
    each image that was processed successfully.
    """

    #override
    def __init__(self, image_paths, output_dirpath, image_size, face_type=None):
        """
        image_paths     paths of the images to process
        output_dirpath  directory the processed images are written to
        image_size      side length in pixels of the square output images
        face_type       FaceType to re-align the faces to, or None to keep
                        the face type already stored in each image
        """
        # Copy the input: get_data() consumes this queue, and the caller's
        # own list should not be emptied as a side effect.
        self.image_paths = list(image_paths)
        self.output_dirpath = output_dirpath
        self.image_size = image_size
        self.face_type = face_type
        self.result = []

        # NOTE(review): 600 is presumably the worker no-response timeout in
        # seconds - confirm against Subprocessor.__init__.
        super().__init__('FacesetResizer', FacesetResizerSubprocessor.Cli, 600)

    #override
    def on_clients_initialized(self):
        """Open a progress bar sized to the number of queued images."""
        io.progress_bar(None, len(self.image_paths))

    #override
    def on_clients_finalized(self):
        """Close the progress bar."""
        io.progress_bar_close()

    #override
    def process_info_generator(self):
        """Yield (device_name, {}, client_dict) for each worker to start.

        At most 8 workers are requested, and never more than the CPU count.
        """
        base_dict = {'output_dirpath': self.output_dirpath,
                     'image_size': self.image_size,
                     'face_type': self.face_type}

        for device_idx in range(min(8, multiprocessing.cpu_count())):
            client_dict = base_dict.copy()
            device_name = f'CPU #{device_idx}'
            client_dict['device_name'] = device_name
            yield device_name, {}, client_dict

    #override
    def get_data(self, host_dict):
        """Return the next queued image path, or None when the queue is empty."""
        if len(self.image_paths) > 0:
            return self.image_paths.pop(0)
        return None

    #override
    def on_data_return(self, host_dict, data):
        """Put a path handed back by the framework at the front of the queue."""
        self.image_paths.insert(0, data)

    #override
    def on_result(self, host_dict, data, result):
        """Advance the progress bar and record the result if it succeeded.

        `result` is the tuple returned by Cli.process_data(); its first
        element is 1 on success.
        """
        io.progress_bar_inc(1)
        if result[0] == 1:
            self.result.append((result[1], result[2]))

    #override
    def get_result(self):
        """Return the list of (source_path, output_path) tuples collected so far."""
        return self.result

    class Cli(Subprocessor.Cli):
        """Worker side: resizes one image per process_data() call."""

        #override
        def on_initialize(self, client_dict):
            """Store the job settings passed from the host in `client_dict`."""
            self.output_dirpath = client_dict['output_dirpath']
            self.image_size = client_dict['image_size']
            self.face_type = client_dict['face_type']
            self.log_info(f"Running on { client_dict['device_name'] }")

        #override
        def process_data(self, filepath):
            """Resize a single DFL image.

            Returns (1, filepath, output_filepath) on success and
            (0, filepath, None) when the file is not a DFL image or any
            error occurs; errors are logged and never propagated.
            """
            try:
                dflimg = DFLIMG.load(filepath)
                if dflimg is None or not dflimg.has_data():
                    self.log_err(f"{filepath.name} is not a dfl image file")
                    return (0, filepath, None)

                img = cv2_imread(filepath)
                if img is None:
                    # Fail with a clear message instead of an AttributeError
                    # on `.shape` below.
                    raise Exception(f'unable to read image {filepath}')

                h, w = img.shape[:2]
                if h != w:
                    raise Exception(f'w != h in {filepath}')

                output_filepath = self.output_dirpath / filepath.name

                if self.face_type is not None:
                    self._realign_to_face_type(dflimg, img, w, output_filepath)
                else:
                    self._rescale_keep_face_type(dflimg, img, w, output_filepath)

                return (1, filepath, output_filepath)
            except Exception:
                # `Exception` rather than a bare except, so KeyboardInterrupt
                # and SystemExit are not swallowed and reported as file errors.
                self.log_err(f"Exception occured while processing file {filepath}. Error: {traceback.format_exc()}")

            return (0, filepath, None)

        def _realign_to_face_type(self, dflimg, img, src_size, output_filepath):
            """Warp `img` to self.face_type at self.image_size and save it with updated metadata.

            `src_size` is the side length of the (square) source image.
            """
            image_size = self.image_size
            face_type = self.face_type

            lmrks = dflimg.get_landmarks()
            mat = LandmarksProcessor.get_transform_mat(lmrks, image_size, face_type)

            img = cv2.warpAffine(img, mat, (image_size, image_size), flags=cv2.INTER_LANCZOS4)
            img = np.clip(img, 0, 255).astype(np.uint8)

            cv2_imwrite(str(output_filepath), img, [int(cv2.IMWRITE_JPEG_QUALITY), 100])

            # Carry the source metadata over to the freshly written file.
            dfl_dict = dflimg.get_dict()
            dflimg = DFLIMG.load(output_filepath)
            dflimg.set_dict(dfl_dict)

            xseg_mask = dflimg.get_xseg_mask()
            if xseg_mask is not None:
                xseg_res = 256

                # Landmarks are in source-image pixels; scale them to the
                # mask resolution before computing the mask transform.
                # NOTE(review): assumes the stored mask covers the whole
                # source image at xseg_res - confirm against DFLIMG.
                xseg_lmrks = lmrks.copy()
                xseg_lmrks *= (xseg_res / src_size)
                xseg_mat = LandmarksProcessor.get_transform_mat(xseg_lmrks, xseg_res, face_type)

                xseg_mask = cv2.warpAffine(xseg_mask, xseg_mat, (xseg_res, xseg_res), flags=cv2.INTER_LANCZOS4)
                # Re-binarize the mask after interpolation.
                xseg_mask[xseg_mask < 0.5] = 0
                xseg_mask[xseg_mask >= 0.5] = 1

                dflimg.set_xseg_mask(xseg_mask)

            seg_ie_polys = dflimg.get_seg_ie_polys()
            for poly in seg_ie_polys.get_polys():
                poly_pts = poly.get_pts()
                poly_pts = LandmarksProcessor.transform_points(poly_pts, mat)
                poly.set_points(poly_pts)
            dflimg.set_seg_ie_polys(seg_ie_polys)

            lmrks = LandmarksProcessor.transform_points(lmrks, mat)
            dflimg.set_landmarks(lmrks)

            # Only recompute the image-to-face matrix when the file has one.
            image_to_face_mat = dflimg.get_image_to_face_mat()
            if image_to_face_mat is not None:
                image_to_face_mat = LandmarksProcessor.get_transform_mat(dflimg.get_source_landmarks(), image_size, face_type)
                dflimg.set_image_to_face_mat(image_to_face_mat)
            dflimg.set_face_type(FaceType.toString(face_type))
            dflimg.save()

        def _rescale_keep_face_type(self, dflimg, img, src_size, output_filepath):
            """Resize `img` to self.image_size without re-aligning and save it with scaled metadata.

            `src_size` is the side length of the (square) source image.
            """
            image_size = self.image_size

            dfl_dict = dflimg.get_dict()

            # Factor by which source coordinates shrink (>1) or grow (<1).
            scale = src_size / image_size

            img = cv2.resize(img, (image_size, image_size), interpolation=cv2.INTER_LANCZOS4)

            cv2_imwrite(str(output_filepath), img, [int(cv2.IMWRITE_JPEG_QUALITY), 100])

            # Carry the source metadata over to the freshly written file.
            dflimg = DFLIMG.load(output_filepath)
            dflimg.set_dict(dfl_dict)

            lmrks = dflimg.get_landmarks()
            lmrks /= scale
            dflimg.set_landmarks(lmrks)

            seg_ie_polys = dflimg.get_seg_ie_polys()
            seg_ie_polys.mult_points(1.0 / scale)
            dflimg.set_seg_ie_polys(seg_ie_polys)

            # Only recompute the image-to-face matrix when the file has one,
            # using the face type already stored in the image.
            image_to_face_mat = dflimg.get_image_to_face_mat()
            if image_to_face_mat is not None:
                face_type = FaceType.fromString(dflimg.get_face_type())
                image_to_face_mat = LandmarksProcessor.get_transform_mat(dflimg.get_source_landmarks(), image_size, face_type)
                dflimg.set_image_to_face_mat(image_to_face_mat)
            dflimg.save()
166
167
def process_folder ( dirpath):
    """Interactively resize every DFL face image found in `dirpath`.

    Asks for the new image size and an optional new face type, writes the
    processed images to a sibling directory named `<dirpath.name>_resized`
    (existing images in it are deleted first), then offers to merge the
    results back over the originals.

    When merging, each processed file is copied over its source file. The
    `_resized` directory is removed afterwards only if every copy succeeded;
    otherwise it is kept so that the processed files are not lost.

    dirpath     pathlib.Path of the faceset directory
    """
    image_size = io.input_int("New image size", 512, valid_range=[128,2048])

    face_type = io.input_str ("Change face type", 'same', ['h','mf','f','wf','head','same']).lower()
    if face_type == 'same':
        face_type = None
    else:
        face_type = {'h'    : FaceType.HALF,
                     'mf'   : FaceType.MID_FULL,
                     'f'    : FaceType.FULL,
                     'wf'   : FaceType.WHOLE_FACE,
                     'head' : FaceType.HEAD}[face_type]

    output_dirpath = dirpath.parent / (dirpath.name + '_resized')
    output_dirpath.mkdir (exist_ok=True, parents=True)

    # Short "parent/name" forms used only for log and prompt messages.
    dirpath_parts = '/'.join( dirpath.parts[-2:])
    output_dirpath_parts = '/'.join( output_dirpath.parts[-2:] )
    io.log_info (f"Resizing faceset in {dirpath_parts}")
    io.log_info ( f"Processing to {output_dirpath_parts}")

    # Start from an output directory that holds no images from a previous run.
    for filename in pathex.get_image_paths(output_dirpath):
        Path(filename).unlink()

    image_paths = [Path(x) for x in pathex.get_image_paths( dirpath )]
    result = FacesetResizerSubprocessor ( image_paths, output_dirpath, image_size, face_type).run()

    is_merge = io.input_bool (f"\r\nMerge {output_dirpath_parts} to {dirpath_parts} ?", True)
    if is_merge:
        io.log_info (f"Copying processed files to {dirpath_parts}")

        failed_count = 0
        for (filepath, output_filepath) in result:
            try:
                shutil.copy (output_filepath, filepath)
            except Exception as e:
                # Best effort: keep copying the remaining files, but report
                # the failure instead of silently dropping it.
                failed_count += 1
                io.log_err (f"Unable to copy {output_filepath} to {filepath}: {e}")

        if failed_count == 0:
            io.log_info (f"Removing {output_dirpath_parts}")
            shutil.rmtree(output_dirpath)
        else:
            # Deleting the directory now would discard the only resized copy
            # of every file that failed to merge.
            io.log_err (f"{failed_count} file(s) could not be copied, keeping {output_dirpath_parts}")
210
211