GitHub Repository: iperov/deepfacelab
Path: blob/master/mainscripts/Merger.py
import math
import multiprocessing
import traceback
from pathlib import Path

import numpy as np
import numpy.linalg as npla

import samplelib
from core import pathex
from core.cv2ex import *
from core.interact import interact as io
from core.joblib import MPClassFuncOnDemand, MPFunc
from core.leras import nn
from DFLIMG import DFLIMG
from facelib import FaceEnhancer, FaceType, LandmarksProcessor, XSegNet
from merger import FrameInfo, InteractiveMergerSubprocessor, MergerConfig


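# Merger entry point: load the trained model, collect face alignments for every
# frame in input_path, optionally estimate per-frame motion, and hand the frames
# to InteractiveMergerSubprocessor, which writes merged frames to output_path and
# their masks to output_mask_path.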
def main (model_class_name=None,
          saved_models_path=None,
          training_data_src_path=None,
          force_model_name=None,
          input_path=None,
          output_path=None,
          output_mask_path=None,
          aligned_path=None,
          force_gpu_idxs=None,
          cpu_only=None):
    io.log_info ("Running merger.\r\n")

    try:
        if not input_path.exists():
            io.log_err('Input directory not found. Please ensure it exists.')
            return

        if not output_path.exists():
            output_path.mkdir(parents=True, exist_ok=True)

        if not output_mask_path.exists():
            output_mask_path.mkdir(parents=True, exist_ok=True)

        if not saved_models_path.exists():
            io.log_err('Model directory not found. Please ensure it exists.')
            return

        # Initialize model
        import models
        model = models.import_model(model_class_name)(is_training=False,
                                                      saved_models_path=saved_models_path,
                                                      force_gpu_idxs=force_gpu_idxs,
                                                      force_model_name=force_model_name,
                                                      cpu_only=cpu_only)

        predictor_func, predictor_input_shape, cfg = model.get_MergerConfig()

        # Preparing MP functions
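        # The predictor and the auxiliary networks are wrapped so the merger worker
        # processes can call them; MPClassFuncOnDemand presumably defers building
        # XSeg / FaceEnhancer until first use, keeps them on CPU, and runs them
        # fully on CPU when no GPU device is configured.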
        predictor_func = MPFunc(predictor_func)

        run_on_cpu = len(nn.getCurrentDeviceConfig().devices) == 0
        xseg_256_extract_func = MPClassFuncOnDemand(XSegNet, 'extract',
                                                    name='XSeg',
                                                    resolution=256,
                                                    weights_file_root=saved_models_path,
                                                    place_model_on_cpu=True,
                                                    run_on_cpu=run_on_cpu)

        face_enhancer_func = MPClassFuncOnDemand(FaceEnhancer, 'enhance',
                                                 place_model_on_cpu=True,
                                                 run_on_cpu=run_on_cpu)

        is_interactive = io.input_bool ("Use interactive merger?", True) if not io.is_colab() else False

        if not is_interactive:
            cfg.ask_settings()

        subprocess_count = io.input_int("Number of workers?", max(8, multiprocessing.cpu_count()),
                                        valid_range=[1, multiprocessing.cpu_count()], help_message="Specify the number of threads to process. A low value may affect performance. A high value may result in memory error. The value may not be greater than CPU cores." )

        input_path_image_paths = pathex.get_image_paths(input_path)

        if cfg.type == MergerConfig.TYPE_MASKED:
            if not aligned_path.exists():
                io.log_err('Aligned directory not found. Please ensure it exists.')
                return

            packed_samples = None
            try:
                packed_samples = samplelib.PackedFaceset.load(aligned_path)
            except:
                io.log_err(f"Error occurred while loading samplelib.PackedFaceset.load {str(aligned_path)}, {traceback.format_exc()}")

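            # If a packed faceset was loaded, aligned faces are read from it via
            # read_raw_file(); otherwise fall back to loading the individual image
            # files found in aligned_path.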
            if packed_samples is not None:
                io.log_info ("Using packed faceset.")
                def generator():
                    for sample in io.progress_bar_generator( packed_samples, "Collecting alignments"):
                        filepath = Path(sample.filename)
                        yield filepath, DFLIMG.load(filepath, loader_func=lambda x: sample.read_raw_file() )
            else:
                def generator():
                    for filepath in io.progress_bar_generator( pathex.get_image_paths(aligned_path), "Collecting alignments"):
                        filepath = Path(filepath)
                        yield filepath, DFLIMG.load(filepath)

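            # Group the source landmarks of every aligned face by the stem of its
            # source frame filename, so each input frame can later look up the faces
            # that were extracted from it.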
            alignments = {}
            multiple_faces_detected = False

            for filepath, dflimg in generator():
                if dflimg is None or not dflimg.has_data():
                    io.log_err (f"{filepath.name} is not a dfl image file")
                    continue

                source_filename = dflimg.get_source_filename()
                if source_filename is None:
                    continue

                source_filepath = Path(source_filename)
                source_filename_stem = source_filepath.stem

                if source_filename_stem not in alignments.keys():
                    alignments[ source_filename_stem ] = []

                alignments_ar = alignments[ source_filename_stem ]
                alignments_ar.append ( (dflimg.get_source_landmarks(), filepath, source_filepath ) )

                if len(alignments_ar) > 1:
                    multiple_faces_detected = True

            if multiple_faces_detected:
                io.log_info ("")
                io.log_info ("Warning: multiple faces detected. Only one alignment file should refer to one source file.")
                io.log_info ("")

            for a_key in list(alignments.keys()):
                a_ar = alignments[a_key]
                if len(a_ar) > 1:
                    for _, filepath, source_filepath in a_ar:
                        io.log_info (f"alignment {filepath.name} refers to {source_filepath.name} ")
                    io.log_info ("")

                alignments[a_key] = [ a[0] for a in a_ar]

            if multiple_faces_detected:
                io.log_info ("It is strongly recommended to process the faces separately.")
                io.log_info ("Use 'recover original filename' to determine the exact duplicates.")
                io.log_info ("")

            frames = [ InteractiveMergerSubprocessor.Frame( frame_info=FrameInfo(filepath=Path(p),
                                                                                 landmarks_list=alignments.get(Path(p).stem, None)
                                                                                 )
                                                            )
                       for p in input_path_image_paths ]
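            # One Frame is built per input image; frames whose stem has no entry in
            # `alignments` get landmarks_list=None, i.e. no face to swap for that frame.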

            if multiple_faces_detected:
                io.log_info ("Warning: multiple faces detected. Motion blur will not be used.")
                io.log_info ("")
            else:
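                # Estimate per-frame motion (used for motion blur): local_pts are the
                # center of the 256px aligned face space and a point straight above it.
                # transform_points(..., True) maps them back into original frame
                # coordinates, and the displacement of the face center from the previous
                # frame to the next gives the motion direction and magnitude.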
                s = 256
                local_pts = [ (s//2-1, s//2-1), (s//2-1,0) ] #center+up
                frames_len = len(frames)
                for i in io.progress_bar_generator( range(len(frames)) , "Computing motion vectors"):
                    fi_prev = frames[max(0, i-1)].frame_info
                    fi      = frames[i].frame_info
                    fi_next = frames[min(i+1, frames_len-1)].frame_info
                    if len(fi_prev.landmarks_list) == 0 or \
                       len(fi.landmarks_list) == 0 or \
                       len(fi_next.landmarks_list) == 0:
                        continue

                    mat_prev = LandmarksProcessor.get_transform_mat ( fi_prev.landmarks_list[0], s, face_type=FaceType.FULL)
                    mat      = LandmarksProcessor.get_transform_mat ( fi.landmarks_list[0]     , s, face_type=FaceType.FULL)
                    mat_next = LandmarksProcessor.get_transform_mat ( fi_next.landmarks_list[0], s, face_type=FaceType.FULL)

                    pts_prev = LandmarksProcessor.transform_points (local_pts, mat_prev, True)
                    pts      = LandmarksProcessor.transform_points (local_pts, mat     , True)
                    pts_next = LandmarksProcessor.transform_points (local_pts, mat_next, True)

                    prev_vector = pts[0]-pts_prev[0]
                    next_vector = pts_next[0]-pts[0]

                    motion_vector = pts_next[0] - pts_prev[0]
                    fi.motion_power = npla.norm(motion_vector)

                    motion_vector = motion_vector / fi.motion_power if fi.motion_power != 0 else np.array([0,0],dtype=np.float32)

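                    # motion_deg is the direction of that motion in degrees; the angle is
                    # negated (image y grows downward), e.g. a face moving straight down
                    # the frame (motion_vector ~ (0, 1)) gives motion_deg = -90.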
                    fi.motion_deg = -math.atan2(motion_vector[1],motion_vector[0])*180 / math.pi


        if len(frames) == 0:
            io.log_info ("No frames to merge in input_dir.")
        else:
            if False:
                pass
            else:
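                # The subprocessor spawns `subprocess_count` workers that run
                # predictor_func on every frame and, when is_interactive is True, is
                # expected to let the merge settings be tuned per frame interactively.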
                InteractiveMergerSubprocessor (
                            is_interactive          = is_interactive,
                            merger_session_filepath = model.get_strpath_storage_for_file('merger_session.dat'),
                            predictor_func          = predictor_func,
                            predictor_input_shape   = predictor_input_shape,
                            face_enhancer_func      = face_enhancer_func,
                            xseg_256_extract_func   = xseg_256_extract_func,
                            merger_config           = cfg,
                            frames                  = frames,
                            frames_root_path        = input_path,
                            output_path             = output_path,
                            output_mask_path        = output_mask_path,
                            model_iter              = model.get_iter(),
                            subprocess_count        = subprocess_count,
                        ).run()

        model.finalize()

    except Exception as e:
        print ( traceback.format_exc() )


"""
219
elif cfg.type == MergerConfig.TYPE_FACE_AVATAR:
220
filesdata = []
221
for filepath in io.progress_bar_generator(input_path_image_paths, "Collecting info"):
222
filepath = Path(filepath)
223
224
dflimg = DFLIMG.x(filepath)
225
if dflimg is None:
226
io.log_err ("%s is not a dfl image file" % (filepath.name) )
227
continue
228
filesdata += [ ( FrameInfo(filepath=filepath, landmarks_list=[dflimg.get_landmarks()] ), dflimg.get_source_filename() ) ]
229
230
filesdata = sorted(filesdata, key=operator.itemgetter(1)) #sort by source_filename
231
frames = []
232
filesdata_len = len(filesdata)
233
for i in range(len(filesdata)):
234
frame_info = filesdata[i][0]
235
236
prev_temporal_frame_infos = []
237
next_temporal_frame_infos = []
238
239
for t in range (cfg.temporal_face_count):
240
prev_frame_info = filesdata[ max(i -t, 0) ][0]
241
next_frame_info = filesdata[ min(i +t, filesdata_len-1 )][0]
242
243
prev_temporal_frame_infos.insert (0, prev_frame_info )
244
next_temporal_frame_infos.append ( next_frame_info )
245
246
frames.append ( InteractiveMergerSubprocessor.Frame(prev_temporal_frame_infos=prev_temporal_frame_infos,
247
frame_info=frame_info,
248
next_temporal_frame_infos=next_temporal_frame_infos) )
249
"""

#interpolate landmarks
#from facelib import LandmarksProcessor
#from facelib import FaceType
#a = sorted(alignments.keys())
#a_len = len(a)
#
#box_pts = 3
#box = np.ones(box_pts)/box_pts
#for i in range( a_len ):
#    if i >= box_pts and i <= a_len-box_pts-1:
#        af0 = alignments[ a[i] ][0] ##first face
#        m0 = LandmarksProcessor.get_transform_mat (af0, 256, face_type=FaceType.FULL)
#
#        points = []
#
#        for j in range(-box_pts, box_pts+1):
#            af = alignments[ a[i+j] ][0] ##first face
#            m = LandmarksProcessor.get_transform_mat (af, 256, face_type=FaceType.FULL)
#            p = LandmarksProcessor.transform_points (af, m)
#            points.append (p)
#
#        points = np.array(points)
#        points_len = len(points)
#        t_points = np.transpose(points, [1,0,2])
#
#        p1 = np.array ( [ int(np.convolve(x[:,0], box, mode='same')[points_len//2]) for x in t_points ] )
#        p2 = np.array ( [ int(np.convolve(x[:,1], box, mode='same')[points_len//2]) for x in t_points ] )
#
#        new_points = np.concatenate( [np.expand_dims(p1,-1),np.expand_dims(p2,-1)], -1 )
#
#        alignments[ a[i] ][0] = LandmarksProcessor.transform_points (new_points, m0, True).astype(np.int32)