# Book a Demo!
# CoCalc Logo Icon
# Store Features Docs Share Support News About Policies Sign Up Sign In
# iperov
# GitHub Repository: iperov/deepfacelab
# Path: blob/master/core/joblib/SubprocessorBase.py
# 628 views
1
import traceback
2
import multiprocessing
3
import time
4
import sys
5
from core.interact import interact as io
6
7
8
class Subprocessor(object):
    """Host-side driver that distributes data items to a pool of worker subprocesses.

    Usage: subclass ``Subprocessor`` and ``Subprocessor.Cli``, override the
    hooks marked "overridable" below, then call ``run()``.  Each worker talks
    to the host over two queues (``s2c`` host->worker, ``c2s`` worker->host)
    carrying dicts keyed by ``'op'``.

    Worker state as tracked on the host (``cli.state``):
        1 -- busy (spawned but not yet initialized, or processing an item),
        0 -- free (ready for the next item),
        2 -- finalized and killed during shutdown.
    """

    class SilenceException(Exception):
        """Raise inside a worker to report failure without sending a traceback.

        The item being processed (or ``None``) is still reported to the host
        in an 'error' message.
        """
        pass

    class Cli(object):
        """Worker base class: one instance per subprocess, created on the host.

        Constructing it spawns a daemon ``multiprocessing.Process`` whose
        target is ``_subprocess_run``.
        """

        def __init__(self, client_dict):
            s2c = multiprocessing.Queue()
            c2s = multiprocessing.Queue()
            self.p = multiprocessing.Process(target=self._subprocess_run, args=(client_dict, s2c, c2s))
            self.s2c = s2c
            self.c2s = c2s
            self.p.daemon = True
            self.p.start()

            # Host-side bookkeeping; Subprocessor.run() fills these in.
            self.state = None
            self.sent_time = None
            self.sent_data = None
            self.name = None
            self.host_dict = None

        def kill(self):
            """Terminate the worker process and wait for it to exit."""
            self.p.terminate()
            self.p.join()

        #overridable optional
        def on_initialize(self, client_dict):
            """Initialize the worker (called in the subprocess) using ``client_dict``."""
            pass

        #overridable optional
        def on_finalize(self):
            """Finalize the worker (called in the subprocess) after 'close' is received."""
            pass

        #overridable
        def process_data(self, data):
            """Process one ``data`` item sent by the host and return its result."""
            raise NotImplementedError

        #overridable optional
        def get_data_name(self, data):
            """Return a string identifier for ``data``."""
            return "undefined"

        # Helpers for worker code: they enqueue messages the host forwards to ``io``.
        def log_info(self, msg):
            self.c2s.put({'op': 'log_info', 'msg': msg})

        def log_err(self, msg):
            self.c2s.put({'op': 'log_err', 'msg': msg})

        def progress_bar_inc(self, c):
            self.c2s.put({'op': 'progress_bar_inc', 'c': c})

        def _subprocess_run(self, client_dict, s2c, c2s):
            """Worker main loop (the Process target).

            Sends 'init_ok' after ``on_initialize``.  It then answers each 'data'
            message with ``process_data`` and a 'success' message.  On 'close'
            it calls ``on_finalize`` and sends 'finalized'.  Any exception is
            reported as an 'error' message that carries the item being
            processed, or ``None`` when the failure happened outside
            ``process_data``.  Unless the exception is a ``SilenceException``,
            the message also carries the formatted traceback.
            """
            # Re-attach the outgoing queue so the log/progress helpers work even if
            # this object reached the child without state (see __getstate__).
            self.c2s = c2s
            data = None
            try:
                self.on_initialize(client_dict)

                c2s.put({'op': 'init_ok'})

                while True:
                    msg = s2c.get()
                    op = msg.get('op', '')
                    if op == 'data':
                        data = msg['data']
                        result = self.process_data(data)
                        c2s.put({'op': 'success', 'data': data, 'result': result})
                        data = None
                    elif op == 'close':
                        break

                    time.sleep(0.001)

                self.on_finalize()
                c2s.put({'op': 'finalized'})
            except Subprocessor.SilenceException:
                c2s.put({'op': 'error', 'data': data})
            except Exception:
                err_msg = traceback.format_exc()
                c2s.put({'op': 'error', 'data': data, 'err_msg': err_msg})

            c2s.close()
            s2c.close()
            self.c2s = None

        # disable pickling: a pickled Cli carries no state, so the Process handle
        # and queues are never serialized along with it.
        def __getstate__(self):
            return dict()

        def __setstate__(self, d):
            self.__dict__.update(d)

    #overridable
    def __init__(self, name, SubprocessorCli_class, no_response_time_sec = 0, io_loop_sleep_time=0.005, initialize_subprocesses_in_serial=False):
        """Configure the pool.

        Args:
            name: label used in the "Unable to start Subprocessor" error.
            SubprocessorCli_class: worker class; must subclass ``Subprocessor.Cli``.
            no_response_time_sec: if non-zero, a worker busy with one item for
                longer than this is killed and its item passed to ``on_data_return``.
            io_loop_sleep_time: passed to ``io.process_messages`` on every
                main-loop iteration; 0 skips that call.
            initialize_subprocesses_in_serial: wait for each worker to finish
                initializing before spawning the next one.

        Raises:
            ValueError: if ``SubprocessorCli_class`` is not a ``Subprocessor.Cli`` subclass.
        """
        if not issubclass(SubprocessorCli_class, Subprocessor.Cli):
            raise ValueError("SubprocessorCli_class must be subclass of Subprocessor.Cli")

        self.name = name
        self.SubprocessorCli_class = SubprocessorCli_class
        self.no_response_time_sec = no_response_time_sec
        self.io_loop_sleep_time = io_loop_sleep_time
        self.initialize_subprocesses_in_serial = initialize_subprocesses_in_serial

    #overridable
    def process_info_generator(self):
        """Yield one ``(name, host_dict, client_dict)`` tuple per worker to spawn."""
        raise NotImplementedError

    #overridable optional
    def on_clients_initialized(self):
        """Called once all surviving workers have initialized."""
        pass

    #overridable optional
    def on_clients_finalized(self):
        """Called after all workers have been shut down."""
        pass

    #overridable
    def get_data(self, host_dict):
        """Return the next item for the worker owning ``host_dict``, or ``None`` if none."""
        raise NotImplementedError

    #overridable
    def on_data_return(self, host_dict, data):
        """Take back an item a worker failed to process; you must re-queue it yourself."""
        raise NotImplementedError

    #overridable
    def on_result(self, host_dict, data, result):
        """Handle ``result`` produced by a worker for ``data``."""
        raise NotImplementedError

    #overridable
    def get_result(self):
        """Return the value ``run()`` returns."""
        return None

    #overridable
    def on_tick(self):
        """Called once per main-loop iteration.

        Return True if the run may finish once all workers are free and
        ``get_data`` yields nothing; return False to keep looping.
        """
        return True

    #overridable
    def on_check_run(self):
        """Return False to skip processing; ``run()`` then returns ``get_result()`` at once."""
        return True

    def run(self):
        """Spawn the workers, feed them all data, shut them down and return ``get_result()``.

        Raises:
            Exception: if a worker cannot be constructed, or if no worker
                survives initialization.
        """
        if not self.on_check_run():
            return self.get_result()

        self.clis = []

        def cli_init_dispatcher(cli):
            # Drain init-phase messages of one worker: 'init_ok' marks it free,
            # 'error' kills it and drops it from self.clis.
            while not cli.c2s.empty():
                obj = cli.c2s.get()
                op = obj.get('op', '')
                if op == 'init_ok':
                    cli.state = 0
                elif op == 'log_info':
                    io.log_info(obj['msg'])
                elif op == 'log_err':
                    io.log_err(obj['msg'])
                elif op == 'error':
                    err_msg = obj.get('err_msg', None)
                    if err_msg is not None:
                        io.log_info(f'Error while subprocess initialization: {err_msg}')
                    cli.kill()
                    self.clis.remove(cli)
                    break

        #getting info about name of subprocesses, host and client dicts, and spawning them
        for name, host_dict, client_dict in self.process_info_generator():
            try:
                cli = self.SubprocessorCli_class(client_dict)
                cli.state = 1
                cli.sent_time = 0
                cli.sent_data = None
                cli.name = name
                cli.host_dict = host_dict

                self.clis.append(cli)

                if self.initialize_subprocesses_in_serial:
                    # Wait until this worker is ready, or until the dispatcher has
                    # dropped it after an init error. Without the membership check
                    # a failed worker (state stays 1) made this loop spin forever.
                    while cli in self.clis:
                        cli_init_dispatcher(cli)
                        if cli.state == 0:
                            break
                        io.process_messages(0.005)
            except Exception as e:
                raise Exception (f"Unable to start subprocess {name}. Error: {traceback.format_exc()}") from e

        if not self.clis:
            raise Exception ("Unable to start Subprocessor '%s' " % (self.name))

        #waiting subprocesses their success(or not) initialization
        while True:
            for cli in self.clis[:]:
                cli_init_dispatcher(cli)
            if all(cli.state == 0 for cli in self.clis):
                break
            io.process_messages(0.005)

        if not self.clis:
            raise Exception ( "Unable to start subprocesses." )

        #ok some processes survived, initialize host logic
        self.on_clients_initialized()

        #main loop of data processing
        while True:
            for cli in self.clis[:]:
                while not cli.c2s.empty():
                    obj = cli.c2s.get()
                    op = obj.get('op', '')
                    if op == 'success':
                        #success processed data, return data and result to on_result
                        self.on_result(cli.host_dict, obj['data'], obj['result'])
                        cli.sent_data = None
                        cli.state = 0
                    elif op == 'error':
                        #some error occured while process data, returning chunk to on_data_return
                        err_msg = obj.get('err_msg', None)
                        if err_msg is not None:
                            io.log_info(f'Error while processing data: {err_msg}')

                        # data is None when the worker failed outside process_data;
                        # get_data() results are only sent when not None, so there
                        # is nothing to hand back in that case.
                        if obj.get('data') is not None:
                            self.on_data_return(cli.host_dict, obj['data'])
                        #and killing process
                        cli.kill()
                        self.clis.remove(cli)
                        break  # the worker sends nothing after 'error'
                    elif op == 'log_info':
                        io.log_info(obj['msg'])
                    elif op == 'log_err':
                        io.log_err(obj['msg'])
                    elif op == 'progress_bar_inc':
                        io.progress_bar_inc(obj['c'])

            for cli in self.clis[:]:
                if cli.state == 1:
                    if cli.sent_time != 0 and self.no_response_time_sec != 0 and (time.time() - cli.sent_time) > self.no_response_time_sec:
                        #subprocess busy too long
                        print ( '%s doesnt response, terminating it.' % (cli.name) )
                        self.on_data_return(cli.host_dict, cli.sent_data)
                        cli.kill()
                        self.clis.remove(cli)

            for cli in self.clis[:]:
                if cli.state == 0:
                    #free state of subprocess, get some data from get_data
                    data = self.get_data(cli.host_dict)
                    if data is not None:
                        #and send it to subprocess
                        cli.s2c.put({'op': 'data', 'data': data})
                        cli.sent_time = time.time()
                        cli.sent_data = data
                        cli.state = 1

            if self.io_loop_sleep_time != 0:
                io.process_messages(self.io_loop_sleep_time)

            if self.on_tick() and all(cli.state == 0 for cli in self.clis):
                #all subprocesses free and no more data available to process, ending loop
                break

        #gracefully terminating subprocesses
        for cli in self.clis[:]:
            cli.s2c.put({'op': 'close'})
            cli.sent_time = time.time()

        while True:
            for cli in self.clis[:]:
                if cli.state == 2:
                    continue  # already finalized and killed
                terminate_it = False
                while not cli.c2s.empty():
                    obj = cli.c2s.get()
                    obj_op = obj.get('op', '')
                    if obj_op == 'finalized':
                        terminate_it = True
                        break
                    elif obj_op == 'error':
                        # on_finalize raised and the worker has exited; no need to
                        # wait for the 30 s timeout below.
                        err_msg = obj.get('err_msg', None)
                        if err_msg is not None:
                            io.log_info(f'Error while finalizing subprocess: {err_msg}')
                        terminate_it = True
                        break
                    elif obj_op == 'log_info':
                        io.log_info(obj['msg'])
                    elif obj_op == 'log_err':
                        io.log_err(obj['msg'])

                if (time.time() - cli.sent_time) > 30:
                    terminate_it = True

                if terminate_it:
                    cli.state = 2
                    cli.kill()

            if all(cli.state == 2 for cli in self.clis):
                break
            time.sleep(0.005)  # avoid busy-spinning while workers finalize

        #finalizing host logic and return result
        self.on_clients_finalized()

        return self.get_result()