Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
iperov
GitHub Repository: iperov/deepfacelab
Path: blob/master/core/qtex/QSubprocessor.py
628 views
1
import multiprocessing
2
import sys
3
import time
4
import traceback
5
6
from PyQt5.QtCore import *
7
from PyQt5.QtGui import *
8
from PyQt5.QtWidgets import *
9
10
from core.interact import interact as io
11
12
from .qtex import *
13
14
class QSubprocessor(object):
15
"""
16
17
"""
18
19
class Cli(object):
20
def __init__ ( self, client_dict ):
21
s2c = multiprocessing.Queue()
22
c2s = multiprocessing.Queue()
23
self.p = multiprocessing.Process(target=self._subprocess_run, args=(client_dict,s2c,c2s) )
24
self.s2c = s2c
25
self.c2s = c2s
26
self.p.daemon = True
27
self.p.start()
28
29
self.state = None
30
self.sent_time = None
31
self.sent_data = None
32
self.name = None
33
self.host_dict = None
34
35
def kill(self):
36
self.p.terminate()
37
self.p.join()
38
39
#overridable optional
40
def on_initialize(self, client_dict):
41
#initialize your subprocess here using client_dict
42
pass
43
44
#overridable optional
45
def on_finalize(self):
46
#finalize your subprocess here
47
pass
48
49
#overridable
50
def process_data(self, data):
51
#process 'data' given from host and return result
52
raise NotImplementedError
53
54
#overridable optional
55
def get_data_name (self, data):
56
#return string identificator of your 'data'
57
return "undefined"
58
59
def log_info(self, msg): self.c2s.put ( {'op': 'log_info', 'msg':msg } )
60
def log_err(self, msg): self.c2s.put ( {'op': 'log_err' , 'msg':msg } )
61
def progress_bar_inc(self, c): self.c2s.put ( {'op': 'progress_bar_inc' , 'c':c } )
62
63
def _subprocess_run(self, client_dict, s2c, c2s):
64
self.c2s = c2s
65
data = None
66
try:
67
self.on_initialize(client_dict)
68
c2s.put ( {'op': 'init_ok'} )
69
while True:
70
msg = s2c.get()
71
op = msg.get('op','')
72
if op == 'data':
73
data = msg['data']
74
result = self.process_data (data)
75
c2s.put ( {'op': 'success', 'data' : data, 'result' : result} )
76
data = None
77
elif op == 'close':
78
break
79
time.sleep(0.001)
80
self.on_finalize()
81
c2s.put ( {'op': 'finalized'} )
82
except Exception as e:
83
c2s.put ( {'op': 'error', 'data' : data} )
84
if data is not None:
85
print ('Exception while process data [%s]: %s' % (self.get_data_name(data), traceback.format_exc()) )
86
else:
87
print ('Exception: %s' % (traceback.format_exc()) )
88
c2s.close()
89
s2c.close()
90
self.c2s = None
91
92
# disable pickling
93
def __getstate__(self):
94
return dict()
95
def __setstate__(self, d):
96
self.__dict__.update(d)
97
98
#overridable
99
def __init__(self, name, SubprocessorCli_class, no_response_time_sec = 0, io_loop_sleep_time=0.005):
100
if not issubclass(SubprocessorCli_class, QSubprocessor.Cli):
101
raise ValueError("SubprocessorCli_class must be subclass of QSubprocessor.Cli")
102
103
self.name = name
104
self.SubprocessorCli_class = SubprocessorCli_class
105
self.no_response_time_sec = no_response_time_sec
106
self.io_loop_sleep_time = io_loop_sleep_time
107
108
self.clis = []
109
110
#getting info about name of subprocesses, host and client dicts, and spawning them
111
for name, host_dict, client_dict in self.process_info_generator():
112
try:
113
cli = self.SubprocessorCli_class(client_dict)
114
cli.state = 1
115
cli.sent_time = 0
116
cli.sent_data = None
117
cli.name = name
118
cli.host_dict = host_dict
119
120
self.clis.append (cli)
121
except:
122
raise Exception (f"Unable to start subprocess {name}. Error: {traceback.format_exc()}")
123
124
if len(self.clis) == 0:
125
raise Exception ("Unable to start QSubprocessor '%s' " % (self.name))
126
127
#waiting subprocesses their success(or not) initialization
128
while True:
129
for cli in self.clis[:]:
130
while not cli.c2s.empty():
131
obj = cli.c2s.get()
132
op = obj.get('op','')
133
if op == 'init_ok':
134
cli.state = 0
135
elif op == 'log_info':
136
io.log_info(obj['msg'])
137
elif op == 'log_err':
138
io.log_err(obj['msg'])
139
elif op == 'error':
140
cli.kill()
141
self.clis.remove(cli)
142
break
143
if all ([cli.state == 0 for cli in self.clis]):
144
break
145
io.process_messages(0.005)
146
147
if len(self.clis) == 0:
148
raise Exception ( "Unable to start subprocesses." )
149
150
#ok some processes survived, initialize host logic
151
self.on_clients_initialized()
152
153
self.q_timer = QTimer()
154
self.q_timer.timeout.connect(self.tick)
155
self.q_timer.start(5)
156
157
#overridable
158
def process_info_generator(self):
159
#yield per process (name, host_dict, client_dict)
160
for i in range(min(multiprocessing.cpu_count(), 8) ):
161
yield 'CPU%d' % (i), {}, {}
162
163
#overridable optional
164
def on_clients_initialized(self):
165
#logic when all subprocesses initialized and ready
166
pass
167
168
#overridable optional
169
def on_clients_finalized(self):
170
#logic when all subprocess finalized
171
pass
172
173
#overridable
174
def get_data(self, host_dict):
175
#return data for processing here
176
raise NotImplementedError
177
178
#overridable
179
def on_data_return (self, host_dict, data):
180
#you have to place returned 'data' back to your queue
181
raise NotImplementedError
182
183
#overridable
184
def on_result (self, host_dict, data, result):
185
#your logic what to do with 'result' of 'data'
186
raise NotImplementedError
187
188
def tick(self):
189
for cli in self.clis[:]:
190
while not cli.c2s.empty():
191
obj = cli.c2s.get()
192
op = obj.get('op','')
193
if op == 'success':
194
#success processed data, return data and result to on_result
195
self.on_result (cli.host_dict, obj['data'], obj['result'])
196
self.sent_data = None
197
cli.state = 0
198
elif op == 'error':
199
#some error occured while process data, returning chunk to on_data_return
200
if 'data' in obj.keys():
201
self.on_data_return (cli.host_dict, obj['data'] )
202
#and killing process
203
cli.kill()
204
self.clis.remove(cli)
205
elif op == 'log_info':
206
io.log_info(obj['msg'])
207
elif op == 'log_err':
208
io.log_err(obj['msg'])
209
elif op == 'progress_bar_inc':
210
io.progress_bar_inc(obj['c'])
211
212
for cli in self.clis[:]:
213
if cli.state == 1:
214
if cli.sent_time != 0 and self.no_response_time_sec != 0 and (time.time() - cli.sent_time) > self.no_response_time_sec:
215
#subprocess busy too long
216
io.log_info ( '%s doesnt response, terminating it.' % (cli.name) )
217
self.on_data_return (cli.host_dict, cli.sent_data )
218
cli.kill()
219
self.clis.remove(cli)
220
221
for cli in self.clis[:]:
222
if cli.state == 0:
223
#free state of subprocess, get some data from get_data
224
data = self.get_data(cli.host_dict)
225
if data is not None:
226
#and send it to subprocess
227
cli.s2c.put ( {'op': 'data', 'data' : data} )
228
cli.sent_time = time.time()
229
cli.sent_data = data
230
cli.state = 1
231
232
if all ([cli.state == 0 for cli in self.clis]):
233
#gracefully terminating subprocesses
234
for cli in self.clis[:]:
235
cli.s2c.put ( {'op': 'close'} )
236
cli.sent_time = time.time()
237
238
while True:
239
for cli in self.clis[:]:
240
terminate_it = False
241
while not cli.c2s.empty():
242
obj = cli.c2s.get()
243
obj_op = obj['op']
244
if obj_op == 'finalized':
245
terminate_it = True
246
break
247
248
if (time.time() - cli.sent_time) > 30:
249
terminate_it = True
250
251
if terminate_it:
252
cli.state = 2
253
cli.kill()
254
255
if all ([cli.state == 2 for cli in self.clis]):
256
break
257
258
#finalizing host logic
259
self.q_timer.stop()
260
self.q_timer = None
261
self.on_clients_finalized()
262
263
264