Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
iperov
GitHub Repository: iperov/deepfacelab
Path: blob/master/mainscripts/VideoEd.py
628 views
1
import subprocess
2
import numpy as np
3
import ffmpeg
4
from pathlib import Path
5
from core import pathex
6
from core.interact import interact as io
7
8
def extract_video(input_file, output_dir, output_ext=None, fps=None):
9
input_file_path = Path(input_file)
10
output_path = Path(output_dir)
11
12
if not output_path.exists():
13
output_path.mkdir(exist_ok=True)
14
15
16
if input_file_path.suffix == '.*':
17
input_file_path = pathex.get_first_file_by_stem (input_file_path.parent, input_file_path.stem)
18
else:
19
if not input_file_path.exists():
20
input_file_path = None
21
22
if input_file_path is None:
23
io.log_err("input_file not found.")
24
return
25
26
if fps is None:
27
fps = io.input_int ("Enter FPS", 0, help_message="How many frames of every second of the video will be extracted. 0 - full fps")
28
29
if output_ext is None:
30
output_ext = io.input_str ("Output image format", "png", ["png","jpg"], help_message="png is lossless, but extraction is x10 slower for HDD, requires x10 more disk space than jpg.")
31
32
for filename in pathex.get_image_paths (output_path, ['.'+output_ext]):
33
Path(filename).unlink()
34
35
job = ffmpeg.input(str(input_file_path))
36
37
kwargs = {'pix_fmt': 'rgb24'}
38
if fps != 0:
39
kwargs.update ({'r':str(fps)})
40
41
if output_ext == 'jpg':
42
kwargs.update ({'q:v':'2'}) #highest quality for jpg
43
44
job = job.output( str (output_path / ('%5d.'+output_ext)), **kwargs )
45
46
try:
47
job = job.run()
48
except:
49
io.log_err ("ffmpeg fail, job commandline:" + str(job.compile()) )
50
51
def cut_video ( input_file, from_time=None, to_time=None, audio_track_id=None, bitrate=None):
52
input_file_path = Path(input_file)
53
if input_file_path is None:
54
io.log_err("input_file not found.")
55
return
56
57
output_file_path = input_file_path.parent / (input_file_path.stem + "_cut" + input_file_path.suffix)
58
59
if from_time is None:
60
from_time = io.input_str ("From time", "00:00:00.000")
61
62
if to_time is None:
63
to_time = io.input_str ("To time", "00:00:00.000")
64
65
if audio_track_id is None:
66
audio_track_id = io.input_int ("Specify audio track id.", 0)
67
68
if bitrate is None:
69
bitrate = max (1, io.input_int ("Bitrate of output file in MB/s", 25) )
70
71
kwargs = {"c:v": "libx264",
72
"b:v": "%dM" %(bitrate),
73
"pix_fmt": "yuv420p",
74
}
75
76
job = ffmpeg.input(str(input_file_path), ss=from_time, to=to_time)
77
78
job_v = job['v:0']
79
job_a = job['a:' + str(audio_track_id) + '?' ]
80
81
job = ffmpeg.output(job_v, job_a, str(output_file_path), **kwargs).overwrite_output()
82
83
try:
84
job = job.run()
85
except:
86
io.log_err ("ffmpeg fail, job commandline:" + str(job.compile()) )
87
88
def denoise_image_sequence( input_dir, ext=None, factor=None ):
89
input_path = Path(input_dir)
90
91
if not input_path.exists():
92
io.log_err("input_dir not found.")
93
return
94
95
image_paths = [ Path(filepath) for filepath in pathex.get_image_paths(input_path) ]
96
97
# Check extension of all images
98
image_paths_suffix = None
99
for filepath in image_paths:
100
if image_paths_suffix is None:
101
image_paths_suffix = filepath.suffix
102
else:
103
if filepath.suffix != image_paths_suffix:
104
io.log_err(f"All images in {input_path.name} should be with the same extension.")
105
return
106
107
if factor is None:
108
factor = np.clip ( io.input_int ("Denoise factor?", 7, add_info="1-20"), 1, 20 )
109
110
# Rename to temporary filenames
111
for i,filepath in io.progress_bar_generator( enumerate(image_paths), "Renaming", leave=False):
112
src = filepath
113
dst = filepath.parent / ( f'{i+1:06}_{filepath.name}' )
114
try:
115
src.rename (dst)
116
except:
117
io.log_error ('fail to rename %s' % (src.name) )
118
return
119
120
# Rename to sequental filenames
121
for i,filepath in io.progress_bar_generator( enumerate(image_paths), "Renaming", leave=False):
122
123
src = filepath.parent / ( f'{i+1:06}_{filepath.name}' )
124
dst = filepath.parent / ( f'{i+1:06}{filepath.suffix}' )
125
try:
126
src.rename (dst)
127
except:
128
io.log_error ('fail to rename %s' % (src.name) )
129
return
130
131
# Process image sequence in ffmpeg
132
kwargs = {}
133
if image_paths_suffix == '.jpg':
134
kwargs.update ({'q:v':'2'})
135
136
job = ( ffmpeg
137
.input(str ( input_path / ('%6d'+image_paths_suffix) ) )
138
.filter("hqdn3d", factor, factor, 5,5)
139
.output(str ( input_path / ('%6d'+image_paths_suffix) ), **kwargs )
140
)
141
142
try:
143
job = job.run()
144
except:
145
io.log_err ("ffmpeg fail, job commandline:" + str(job.compile()) )
146
147
# Rename to temporary filenames
148
for i,filepath in io.progress_bar_generator( enumerate(image_paths), "Renaming", leave=False):
149
src = filepath.parent / ( f'{i+1:06}{filepath.suffix}' )
150
dst = filepath.parent / ( f'{i+1:06}_{filepath.name}' )
151
try:
152
src.rename (dst)
153
except:
154
io.log_error ('fail to rename %s' % (src.name) )
155
return
156
157
# Rename to initial filenames
158
for i,filepath in io.progress_bar_generator( enumerate(image_paths), "Renaming", leave=False):
159
src = filepath.parent / ( f'{i+1:06}_{filepath.name}' )
160
dst = filepath
161
162
try:
163
src.rename (dst)
164
except:
165
io.log_error ('fail to rename %s' % (src.name) )
166
return
167
168
def video_from_sequence( input_dir, output_file, reference_file=None, ext=None, fps=None, bitrate=None, include_audio=False, lossless=None ):
169
input_path = Path(input_dir)
170
output_file_path = Path(output_file)
171
reference_file_path = Path(reference_file) if reference_file is not None else None
172
173
if not input_path.exists():
174
io.log_err("input_dir not found.")
175
return
176
177
if not output_file_path.parent.exists():
178
output_file_path.parent.mkdir(parents=True, exist_ok=True)
179
return
180
181
out_ext = output_file_path.suffix
182
183
if ext is None:
184
ext = io.input_str ("Input image format (extension)", "png")
185
186
if lossless is None:
187
lossless = io.input_bool ("Use lossless codec", False)
188
189
video_id = None
190
audio_id = None
191
ref_in_a = None
192
if reference_file_path is not None:
193
if reference_file_path.suffix == '.*':
194
reference_file_path = pathex.get_first_file_by_stem (reference_file_path.parent, reference_file_path.stem)
195
else:
196
if not reference_file_path.exists():
197
reference_file_path = None
198
199
if reference_file_path is None:
200
io.log_err("reference_file not found.")
201
return
202
203
#probing reference file
204
probe = ffmpeg.probe (str(reference_file_path))
205
206
#getting first video and audio streams id with fps
207
for stream in probe['streams']:
208
if video_id is None and stream['codec_type'] == 'video':
209
video_id = stream['index']
210
fps = stream['r_frame_rate']
211
212
if audio_id is None and stream['codec_type'] == 'audio':
213
audio_id = stream['index']
214
215
if audio_id is not None:
216
#has audio track
217
ref_in_a = ffmpeg.input (str(reference_file_path))[str(audio_id)]
218
219
if fps is None:
220
#if fps not specified and not overwritten by reference-file
221
fps = max (1, io.input_int ("Enter FPS", 25) )
222
223
if not lossless and bitrate is None:
224
bitrate = max (1, io.input_int ("Bitrate of output file in MB/s", 16) )
225
226
input_image_paths = pathex.get_image_paths(input_path)
227
228
i_in = ffmpeg.input('pipe:', format='image2pipe', r=fps)
229
230
output_args = [i_in]
231
232
if include_audio and ref_in_a is not None:
233
output_args += [ref_in_a]
234
235
output_args += [str (output_file_path)]
236
237
output_kwargs = {}
238
239
if lossless:
240
output_kwargs.update ({"c:v": "libx264",
241
"crf": "0",
242
"pix_fmt": "yuv420p",
243
})
244
else:
245
output_kwargs.update ({"c:v": "libx264",
246
"b:v": "%dM" %(bitrate),
247
"pix_fmt": "yuv420p",
248
})
249
250
if include_audio and ref_in_a is not None:
251
output_kwargs.update ({"c:a": "aac",
252
"b:a": "192k",
253
"ar" : "48000",
254
"strict": "experimental"
255
})
256
257
job = ( ffmpeg.output(*output_args, **output_kwargs).overwrite_output() )
258
259
try:
260
job_run = job.run_async(pipe_stdin=True)
261
262
for image_path in input_image_paths:
263
with open (image_path, "rb") as f:
264
image_bytes = f.read()
265
job_run.stdin.write (image_bytes)
266
267
job_run.stdin.close()
268
job_run.wait()
269
except:
270
io.log_err ("ffmpeg fail, job commandline:" + str(job.compile()) )
271
272