# Source: GitHub repository iperov/deepfacelab
# Path: blob/master/core/interact/interact.py
1
import multiprocessing
2
import os
3
import sys
4
import threading
5
import time
6
import types
7
8
import colorama
9
import cv2
10
import numpy as np
11
from tqdm import tqdm
12
13
from core import stdex
14
15
# Probe for an IPython/Colab environment: the imports below only all succeed
# when IPython, PIL and matplotlib are importable, in which case the module
# selects the Colab backend.
try:
    import IPython #if success we are in colab
    from IPython.display import display, clear_output
    import PIL
    import matplotlib.pyplot as plt
    is_colab = True
except Exception:
    # Any import-time failure means "not Colab". KeyboardInterrupt and
    # SystemExit are deliberately not swallowed here.
    is_colab = False

# Maps a boolean default to the letter shown in yes/no prompts.
yn_str = {True:'y',False:'n'}
25
26
class InteractBase(object):
    """Backend-independent bookkeeping for windows, input events, progress bars and console prompts.

    Concrete backends override the ``on_*`` hooks; the public methods of this
    class keep the per-window dictionaries consistent and then delegate to
    those hooks.
    """

    # Backend-neutral mouse event codes stored in the mouse event queues.
    EVENT_LBUTTONDOWN = 1
    EVENT_LBUTTONUP = 2
    EVENT_MBUTTONDOWN = 3
    EVENT_MBUTTONUP = 4
    EVENT_RBUTTONDOWN = 5
    EVENT_RBUTTONUP = 6
    EVENT_MOUSEWHEEL = 10

    def __init__(self):
        # wnd_name -> 0 (registered, not yet created) or 1 (created on first show_image)
        self.named_windows = {}
        # wnd_name -> True for windows with mouse / key capture requested
        self.capture_mouse_windows = {}
        self.capture_keys_windows = {}
        # wnd_name -> list of pending event tuples
        self.mouse_events = {}
        self.key_events = {}
        # Currently active tqdm progress bar, or None.
        self.pg_bar = None
        # Most recently registered window; destroy_window falls back to the
        # last remaining window when the focused one is destroyed.
        self.focus_wnd_name = None
        self.error_log_line_prefix = '/!\\ '

        # thread ident -> list of callables run by process_messages() on that thread
        self.process_messages_callbacks = {}

    def is_support_windows(self):
        """Return whether this backend can show windows (False in the base class)."""
        return False

    def is_colab(self):
        """Return whether this backend is the Colab one (False in the base class)."""
        return False

    # ---- backend hooks -------------------------------------------------
    # NotImplementedError is the exception type; the NotImplemented singleton
    # is not an exception and raising it fails with a TypeError instead.

    def on_destroy_all_windows(self):
        raise NotImplementedError

    def on_create_window (self, wnd_name):
        raise NotImplementedError

    def on_destroy_window (self, wnd_name):
        raise NotImplementedError

    def on_show_image (self, wnd_name, img):
        raise NotImplementedError

    def on_capture_mouse (self, wnd_name):
        raise NotImplementedError

    def on_capture_keys (self, wnd_name):
        raise NotImplementedError

    def on_process_messages(self, sleep_time=0):
        raise NotImplementedError

    def on_wait_any_key(self):
        raise NotImplementedError

    # ---- logging -------------------------------------------------------

    def log_info(self, msg, end='\n'):
        """Print ``msg``; emits a blank-line separator first while a progress bar is active."""
        if self.pg_bar is not None:
            print ("\n")
        print (msg, end=end)

    def log_err(self, msg, end='\n'):
        """Print ``msg`` prefixed with ``error_log_line_prefix``."""
        if self.pg_bar is not None:
            print ("\n")
        print (f'{self.error_log_line_prefix}{msg}', end=end)

    # ---- windows -------------------------------------------------------

    def named_window(self, wnd_name):
        """Register ``wnd_name`` and give it focus; the backend window is created lazily."""
        if wnd_name not in self.named_windows:
            #we will show window only on first show_image
            self.named_windows[wnd_name] = 0
            self.focus_wnd_name = wnd_name
        else:
            print("named_window: ", wnd_name, " already created.")

    def destroy_all_windows(self):
        """Destroy every registered window and reset all per-window state."""
        if len( self.named_windows ) != 0:
            self.on_destroy_all_windows()
            self.named_windows = {}
            self.capture_mouse_windows = {}
            self.capture_keys_windows = {}
            self.mouse_events = {}
            self.key_events = {}
            self.focus_wnd_name = None

    def destroy_window(self, wnd_name):
        """Destroy ``wnd_name`` (if registered) and drop its captures and queued events."""
        if wnd_name in self.named_windows:
            self.on_destroy_window(wnd_name)
            self.named_windows.pop(wnd_name)

            if wnd_name == self.focus_wnd_name:
                # Focus falls back to the most recently registered remaining window.
                self.focus_wnd_name = list(self.named_windows.keys())[-1] if len( self.named_windows ) != 0 else None

            self.capture_mouse_windows.pop(wnd_name, None)
            self.capture_keys_windows.pop(wnd_name, None)
            self.mouse_events.pop(wnd_name, None)
            self.key_events.pop(wnd_name, None)

    def show_image(self, wnd_name, img):
        """Show ``img`` in ``wnd_name``, creating the backend window on first use."""
        if wnd_name in self.named_windows:
            if self.named_windows[wnd_name] == 0:
                self.named_windows[wnd_name] = 1
                self.on_create_window(wnd_name)
                # Mouse capture requested before the window existed is applied now.
                if wnd_name in self.capture_mouse_windows:
                    self.capture_mouse(wnd_name)
            self.on_show_image(wnd_name,img)
        else:
            print("show_image: named_window ", wnd_name, " not found.")

    def capture_mouse(self, wnd_name):
        """Request mouse capture; the backend hook runs only once the window is created."""
        if wnd_name in self.named_windows:
            self.capture_mouse_windows[wnd_name] = True
            if self.named_windows[wnd_name] == 1:
                self.on_capture_mouse(wnd_name)
        else:
            print("capture_mouse: named_window ", wnd_name, " not found.")

    def capture_keys(self, wnd_name):
        """Request key capture for a registered window (once per window)."""
        if wnd_name in self.named_windows:
            if wnd_name not in self.capture_keys_windows:
                self.capture_keys_windows[wnd_name] = True
                self.on_capture_keys(wnd_name)
            else:
                print("capture_keys: already set for window ", wnd_name)
        else:
            print("capture_keys: named_window ", wnd_name, " not found.")

    # ---- progress bar --------------------------------------------------

    def progress_bar(self, desc, total, leave=True, initial=0):
        """Start a manual progress bar; only one may be active at a time."""
        if self.pg_bar is None:
            self.pg_bar = tqdm( total=total, desc=desc, leave=leave, ascii=True, initial=initial )
        else:
            print("progress_bar: already set.")

    def progress_bar_inc(self, c):
        """Advance the active progress bar by ``c`` and redraw it."""
        if self.pg_bar is not None:
            self.pg_bar.n += c
            self.pg_bar.refresh()
        else:
            print("progress_bar not set.")

    def progress_bar_close(self):
        """Close and forget the active progress bar."""
        if self.pg_bar is not None:
            self.pg_bar.close()
            self.pg_bar = None
        else:
            print("progress_bar not set.")

    def progress_bar_generator(self, data, desc=None, leave=True, initial=0):
        """Yield the items of ``data`` while displaying a progress bar.

        The bar is closed and ``self.pg_bar`` cleared even when the consumer
        stops iterating early (generator closed or garbage-collected).
        """
        bar = tqdm( data, desc=desc, leave=leave, ascii=True, initial=initial )
        self.pg_bar = bar
        try:
            for x in bar:
                yield x
        finally:
            bar.close()
            # Only clear the slot if it still refers to this generator's bar.
            if self.pg_bar is bar:
                self.pg_bar = None

    # ---- message pump --------------------------------------------------

    def add_process_messages_callback(self, func ):
        """Register ``func`` to run on every process_messages() call made from the current thread."""
        tid = threading.get_ident()
        callbacks = self.process_messages_callbacks.get(tid, None)
        if callbacks is None:
            callbacks = []
            self.process_messages_callbacks[tid] = callbacks

        callbacks.append ( func )

    def process_messages(self, sleep_time=0):
        """Run the current thread's callbacks, then let the backend process its messages."""
        callbacks = self.process_messages_callbacks.get(threading.get_ident(), None)
        if callbacks is not None:
            for func in callbacks:
                func()

        self.on_process_messages(sleep_time)

    def wait_any_key(self):
        """Delegate to the backend's on_wait_any_key hook."""
        self.on_wait_any_key()

    # ---- event queues --------------------------------------------------

    def add_mouse_event(self, wnd_name, x, y, ev, flags):
        """Queue a ``(x, y, ev, flags)`` tuple for ``wnd_name``."""
        if wnd_name not in self.mouse_events:
            self.mouse_events[wnd_name] = []
        self.mouse_events[wnd_name] += [ (x, y, ev, flags) ]

    def add_key_event(self, wnd_name, ord_key, ctrl_pressed, alt_pressed, shift_pressed):
        """Queue a ``(ord_key, chr_key, ctrl, alt, shift)`` tuple for ``wnd_name``.

        ``chr_key`` is ``chr(0)`` for key codes above 255.
        """
        if wnd_name not in self.key_events:
            self.key_events[wnd_name] = []
        self.key_events[wnd_name] += [ (ord_key, chr(ord_key) if ord_key <= 255 else chr(0), ctrl_pressed, alt_pressed, shift_pressed) ]

    def get_mouse_events(self, wnd_name):
        """Return and clear the queued mouse events of ``wnd_name``."""
        ar = self.mouse_events.get(wnd_name, [])
        self.mouse_events[wnd_name] = []
        return ar

    def get_key_events(self, wnd_name):
        """Return and clear the queued key events of ``wnd_name``."""
        ar = self.key_events.get(wnd_name, [])
        self.key_events[wnd_name] = []
        return ar

    # ---- console prompts -----------------------------------------------
    # The bare ``except:`` clauses below are kept as in the original: any
    # failure while reading or parsing (including an interrupted prompt)
    # falls back to the default value.

    def input(self, s):
        """Thin wrapper around the builtin ``input``."""
        return input(s)

    def input_number(self, s, default_value, valid_list=None, show_default_value=True, add_info=None, help_message=None):
        """Prompt for a float.

        Returns ``default_value`` on empty input, on unparsable input, or when
        the value is not in ``valid_list``. Typing ``?`` prints
        ``help_message`` (when given) and prompts again. The chosen result is
        echoed with ``print``.
        """
        if show_default_value and default_value is not None:
            s = f"[{default_value}] {s}"

        has_hint = add_info is not None or help_message is not None
        if has_hint:
            s += " ("

        if add_info is not None:
            s += f" {add_info}"
        if help_message is not None:
            s += " ?:help"

        if has_hint:
            s += " )"

        s += " : "

        while True:
            try:
                inp = input(s)
                if len(inp) == 0:
                    result = default_value
                    break

                if help_message is not None and inp == '?':
                    print (help_message)
                    continue

                i = float(inp)
                if (valid_list is not None) and (i not in valid_list):
                    result = default_value
                    break
                result = i
                break
            except:
                result = default_value
                break

        print(result)
        return result

    def input_int(self, s, default_value, valid_range=None, valid_list=None, add_info=None, show_default_value=True, help_message=None):
        """Prompt for an int.

        The value is clipped into ``valid_range`` (inclusive pair) when given,
        and replaced by ``default_value`` when it is not in ``valid_list``.
        Empty or unparsable input yields ``default_value``. Typing ``?``
        prints ``help_message`` (when given) and prompts again. The chosen
        result is echoed with ``print``.
        """
        if show_default_value:
            if len(s) != 0:
                s = f"[{default_value}] {s}"
            else:
                s = f"[{default_value}]"

        has_hint = add_info is not None or \
                   valid_range is not None or \
                   help_message is not None
        if has_hint:
            s += " ("

        if valid_range is not None:
            s += f" {valid_range[0]}-{valid_range[1]}"

        if add_info is not None:
            s += f" {add_info}"

        if help_message is not None:
            s += " ?:help"

        if has_hint:
            s += " )"

        s += " : "

        while True:
            try:
                inp = input(s)
                if len(inp) == 0:
                    # Routed through the except branch to pick the default.
                    raise ValueError("")

                if help_message is not None and inp == '?':
                    print (help_message)
                    continue

                i = int(inp)
                if valid_range is not None:
                    i = int(np.clip(i, valid_range[0], valid_range[1]))

                if (valid_list is not None) and (i not in valid_list):
                    i = default_value

                result = i
                break
            except:
                result = default_value
                break
        print (result)
        return result

    def input_bool(self, s, default_value, help_message=None):
        """Prompt for y/n.

        ``y``/``n`` (case-insensitive) map to True/False; any other non-empty
        answer yields ``bool(default_value)``. Empty input or a read failure
        echoes the default letter and returns ``default_value``.
        """
        s = f"[{yn_str[default_value]}] {s} ( y/n"

        if help_message is not None:
            s += " ?:help"
        s += " ) : "

        while True:
            try:
                inp = input(s)
                if len(inp) == 0:
                    # Routed through the except branch to pick the default.
                    raise ValueError("")

                if help_message is not None and inp == '?':
                    print (help_message)
                    continue

                return bool ( {"y":True,"n":False}.get(inp.lower(), default_value) )
            except:
                print ( "y" if default_value else "n" )
                return default_value

    def input_str(self, s, default_value=None, valid_list=None, show_default_value=True, help_message=None):
        """Prompt for a string.

        With ``valid_list`` the prompt repeats until the answer (lower-cased
        or verbatim) is one of its entries. Empty input returns
        ``default_value`` (``None`` when there is no default). Typing ``?``
        prints ``help_message`` (when given) and prompts again.
        """
        if show_default_value and default_value is not None:
            s = f"[{default_value}] {s}"

        has_hint = valid_list is not None or help_message is not None
        if has_hint:
            s += " ("

        if valid_list is not None:
            s += " " + "/".join(valid_list)

        if help_message is not None:
            s += " ?:help"

        if has_hint:
            s += " )"

        s += " : "

        while True:
            try:
                inp = input(s)

                if len(inp) == 0:
                    if default_value is None:
                        print("")
                        return None
                    result = default_value
                    break

                if help_message is not None and inp == '?':
                    print(help_message)
                    continue

                if valid_list is not None:
                    if inp.lower() in valid_list:
                        result = inp.lower()
                        break
                    if inp in valid_list:
                        result = inp
                        break
                    # Not an accepted value: ask again.
                    continue

                result = inp
                break
            except:
                result = default_value
                break

        print(result)
        return result

    # ---- timed / discarded input via helper processes ------------------

    def input_process(self, stdin_fd, sq, str):
        """Child-process body for input_in_time: report on ``sq`` whether ``input`` succeeded."""
        sys.stdin = os.fdopen(stdin_fd)
        try:
            inp = input (str)
            sq.put (True)
        except:
            sq.put (False)

    def input_in_time (self, str, max_time_sec):
        """Show prompt ``str`` from a child process and wait up to ``max_time_sec`` seconds.

        Returns the flag reported by the child (True when its ``input`` call
        returned, False when it raised), or False when the time limit passes
        first.
        """
        sq = multiprocessing.Queue()
        p = multiprocessing.Process(target=self.input_process, args=( sys.stdin.fileno(), sq, str))
        p.daemon = True
        p.start()
        t = time.time()
        inp = False
        while True:
            if not sq.empty():
                inp = sq.get()
                break
            if time.time() - t > max_time_sec:
                break
            # Yield the CPU between polls instead of spinning at full speed.
            time.sleep(0.01)

        p.terminate()
        p.join()

        # Replace sys.stdin with a fresh file object on a duplicated
        # descriptor and close the previous object.
        old_stdin = sys.stdin
        sys.stdin = os.fdopen( os.dup(sys.stdin.fileno()) )
        old_stdin.close()
        return inp

    def input_process_skip_pending(self, stdin_fd):
        """Child-process body for input_skip_pending: keep reading stdin until terminated."""
        sys.stdin = os.fdopen(stdin_fd)
        while True:
            try:
                if sys.stdin.isatty():
                    sys.stdin.read()
            except:
                pass

    def input_skip_pending(self):
        """
        skips unnecessary inputs between the dialogs
        """
        if is_colab:
            # currently it does not work on Colab
            return

        p = multiprocessing.Process(target=self.input_process_skip_pending, args=( sys.stdin.fileno(), ))
        p.daemon = True
        p.start()
        # Give the child half a second to consume stdin, then stop it.
        time.sleep(0.5)
        p.terminate()
        p.join()
        sys.stdin = os.fdopen( sys.stdin.fileno() )
442
443
444
class InteractDesktop(InteractBase):
    """Backend that implements the interaction hooks with OpenCV windows."""

    def __init__(self):
        colorama.init()
        super().__init__()

    def color_red(self):
        """Placeholder; does nothing."""

    def is_support_windows(self):
        return True

    def on_destroy_all_windows(self):
        cv2.destroyAllWindows()

    def on_create_window (self, wnd_name):
        cv2.namedWindow(wnd_name)

    def on_destroy_window (self, wnd_name):
        cv2.destroyWindow(wnd_name)

    def on_show_image (self, wnd_name, img):
        cv2.imshow (wnd_name, img)

    def on_capture_mouse (self, wnd_name):
        """Install an OpenCV mouse callback that queues translated events for ``wnd_name``."""
        self.last_xy = (0,0)

        # OpenCV event code -> backend-neutral code; anything else maps to 0.
        translate = {
            cv2.EVENT_LBUTTONDOWN: InteractBase.EVENT_LBUTTONDOWN,
            cv2.EVENT_LBUTTONUP:   InteractBase.EVENT_LBUTTONUP,
            cv2.EVENT_RBUTTONDOWN: InteractBase.EVENT_RBUTTONDOWN,
            cv2.EVENT_RBUTTONUP:   InteractBase.EVENT_RBUTTONUP,
            cv2.EVENT_MBUTTONDOWN: InteractBase.EVENT_MBUTTONDOWN,
            cv2.EVENT_MBUTTONUP:   InteractBase.EVENT_MBUTTONUP,
            cv2.EVENT_MOUSEWHEEL:  InteractBase.EVENT_MOUSEWHEEL,
        }

        def onMouse(event, x, y, flags, param):
            target, target_wnd = param
            ev = translate.get(event, 0)
            if ev == InteractBase.EVENT_MOUSEWHEEL:
                #fix opencv bug when window size more than screen size
                x, y = self.last_xy

            self.last_xy = (x,y)
            target.add_mouse_event (target_wnd, x, y, ev, flags)

        cv2.setMouseCallback(wnd_name, onMouse, (self,wnd_name) )

    def on_capture_keys (self, wnd_name):
        """Nothing to install; keys are polled in on_process_messages."""

    def on_process_messages(self, sleep_time=0):
        """Poll OpenCV for one key press, or just sleep when there is nothing to poll.

        Upper-case letters and ``?``, ``<``, ``>`` are reported as their
        unshifted key with ``shift_pressed`` set. The event is queued for the
        focused window only when key capture is active.
        """
        keys_captured = len(self.capture_keys_windows) != 0
        windows_exist = len(self.named_windows) != 0

        if not (windows_exist or keys_captured):
            if sleep_time != 0:
                time.sleep(sleep_time)
            return

        # waitKeyEx is given at least 1 ms.
        ord_key = cv2.waitKeyEx( max(1, int(sleep_time*1000) ) )
        if ord_key == -1:
            return

        shift_pressed = False
        chr_key = chr(ord_key) if ord_key <= 255 else chr(0)

        if 'A' <= chr_key <= 'Z':
            shift_pressed = True
            ord_key += 32
        else:
            unshifted = {'?': '/', '<': ',', '>': '.'}.get(chr_key)
            if unshifted is not None:
                shift_pressed = True
                ord_key = ord(unshifted)

        if keys_captured:
            self.add_key_event ( self.focus_wnd_name, ord_key, False, False, shift_pressed)

    def on_wait_any_key(self):
        cv2.waitKey(0)
531
532
class InteractColab(InteractBase):
    """Backend used when running under Colab: every window/input hook is a no-op."""

    def is_support_windows(self):
        return False

    def is_colab(self):
        return True

    def on_destroy_all_windows(self):
        """No-op: there are no windows to destroy."""

    def on_create_window (self, wnd_name):
        """No-op: windows are not created."""

    def on_destroy_window (self, wnd_name):
        """No-op: windows are not created, so nothing is destroyed."""

    def on_show_image (self, wnd_name, img):
        """No-op: images are not displayed by this backend."""

    def on_capture_mouse (self, wnd_name):
        """No-op: mouse capture is not supported."""

    def on_capture_keys (self, wnd_name):
        """No-op: key capture is not supported."""

    def on_process_messages(self, sleep_time=0):
        """Only sleep for ``sleep_time`` seconds; there is no message queue to pump."""
        time.sleep(sleep_time)

    def on_wait_any_key(self):
        """No-op: returns immediately without waiting."""
577
578
# Module-level singleton: the Colab backend when the Colab imports succeeded,
# otherwise the desktop (OpenCV) backend.
interact = InteractColab() if is_colab else InteractDesktop()
582
583