
Testing latest pari + WASM + node.js... and it works?! Wow.

License: GPL3
Compute image: ubuntu2004
/* Copyright (C) 2013 The PARI group.

This file is part of the PARI/GP package.

PARI/GP is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 2 of the License, or (at your option) any later
version. It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY WHATSOEVER.

Check the License for details. You should have received a copy of it, along
with the package; see the file 'COPYING'. If not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
#include <pthread.h>
#include "pari.h"
#include "paripriv.h"
#include "mt.h"
#if defined(_WIN32)
# include "../systems/mingw/mingw.h"
#endif

#define DEBUGLEVEL DEBUGLEVEL_mt

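/* Per-worker mailbox. `input` receives the next task (a vector of
 * arguments for `worker`), `output` the corresponding result or a copied
 * error. Each queue has its own cond/mut pair for task hand-off and
 * points to the pool-wide pcond/pmut used to report completions back to
 * the main thread. */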
struct mt_queue
{
  long no;
  pari_sp avma;
  struct pari_mainstack *mainstack;
  GEN input, output;
  GEN worker;
  long workid;
  pthread_cond_t cond;
  pthread_mutex_t mut;
  pthread_cond_t *pcond;
  pthread_mutex_t *pmut;
};

struct mt_pstate
{
  pthread_t *th;
  struct pari_thread *pth;
  struct mt_queue *mq;
  long n, nbint, last;
  long pending;
  pthread_cond_t pcond;
  pthread_mutex_t pmut;
};

static THREAD long mt_thread_no = -1, mt_issingle = 0;
static struct mt_pstate *pari_mt;

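/* LOCK/UNLOCK are used as a bracketed pair around a critical section:
 *   LOCK(&m) { ... } UNLOCK(&m);
 * the do ... while(0) they expand to ties the two halves together
 * syntactically. */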
#define LOCK(x) pthread_mutex_lock(x); do
#define UNLOCK(x) while(0); pthread_mutex_unlock(x)

void
mt_sigint_block(void)
{
  if (mt_thread_no>=0)
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,NULL);
}

void
mt_sigint_unblock(void)
{
  if (mt_thread_no>=0)
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL);
}

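/* Error recovery inside a worker: publish the error (copied off the
 * thread's stack, except for stack overflows) as this queue's output,
 * wake the main thread, and terminate the worker. In the sequential
 * fallback engine, delegate to mtsingle_err_recover(). */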
void
mt_err_recover(long er)
{
  (void) er;
  if (mt_thread_no>=0)
  {
    struct mt_pstate *mt = pari_mt;
    struct mt_queue *mq = mt->mq+mt_thread_no;
    GEN err = pari_err_last();
    err = err_get_num(err)==e_STACK ? err_e_STACK: bin_copy(copy_bin(err));
    LOCK(mq->pmut)
    {
      mq->output = err;
      pthread_cond_signal(mq->pcond);
    } UNLOCK(mq->pmut);
    pthread_exit((void*)1);
  }
  else if (mt_issingle) mtsingle_err_recover(er);
}

void
mt_sigint(void)
{
  if (pari_mt) pthread_cond_broadcast(&pari_mt->pcond);
}

int
mt_is_parallel(void)
{
  return !!pari_mt;
}

int
mt_is_thread(void)
{
  return mt_thread_no>=0 ? 1: mt_issingle ? mtsingle_is_thread(): 0;
}

long
mt_nbthreads(void)
{
  return pari_mt ? 1: pari_mt_nbthreads;
}

void
mt_export_add(const char *str, GEN val)
{
  if (pari_mt)
    pari_err(e_MISC,"export() not allowed during parallel sections");
  export_add(str, val);
}

void
mt_export_del(const char *str)
{
  if (pari_mt)
    pari_err(e_MISC,"unexport() not allowed during parallel sections");
  export_del(str);
}

void mt_broadcast(GEN code) {(void) code;}

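/* Global initialisation: default pari_mt_nbthreads to the number of
 * configured processors when it has not been set explicitly. */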
void pari_mt_init(void)
{
  pari_mt = NULL;
  mt_issingle = 0;
#ifdef _SC_NPROCESSORS_CONF
  if (!pari_mt_nbthreads) pari_mt_nbthreads = sysconf(_SC_NPROCESSORS_CONF);
#elif defined(_WIN32)
  if (!pari_mt_nbthreads) pari_mt_nbthreads = win32_nbthreads();
#else
  pari_mt_nbthreads = 1;
#endif
}

void pari_mt_close(void) { }

static void
mt_queue_cleanup(void *arg)
{
  (void) arg;
  pari_thread_close();
}

static void
mt_queue_unlock(void *arg)
{ pthread_mutex_unlock((pthread_mutex_t*) arg); }

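/* Worker thread body. After publishing its PARI stack (mainstack/avma)
 * to the submitter as a startup handshake, it loops forever: wait on its
 * private condition for mq->input, evaluate the worker closure on it with
 * asynchronous cancellation enabled, then post the result in mq->output
 * and signal the pool condition. */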
static void*
mt_queue_run(void *arg)
{
  GEN args = pari_thread_start((struct pari_thread*) arg);
  pari_sp av = avma;
  struct mt_queue *mq = (struct mt_queue *) args;
  mt_thread_no = mq->no;
  pthread_cleanup_push(mt_queue_cleanup,NULL);
  LOCK(mq->pmut)
  {
    mq->mainstack = pari_mainstack;
    mq->avma = av;
    pthread_cond_signal(mq->pcond);
  } UNLOCK(mq->pmut);
  for(;;)
  {
    GEN work, done;
    LOCK(&mq->mut)
    {
      pthread_cleanup_push(mt_queue_unlock, &mq->mut);
      while(!mq->input)
        pthread_cond_wait(&mq->cond, &mq->mut);
      pthread_cleanup_pop(0);
    } UNLOCK(&mq->mut);
    pari_mainstack = mq->mainstack;
    set_avma(mq->avma);
    work = mq->input;
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL);
    done = closure_callgenvec(mq->worker,work);
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,NULL);
    LOCK(mq->pmut)
    {
      mq->mainstack = pari_mainstack;
      mq->avma = av;
      mq->input = NULL;
      mq->output = done;
      pthread_cond_signal(mq->pcond);
    } UNLOCK(mq->pmut);
  }
  pthread_cleanup_pop(1);
#ifdef __GNUC__
  return NULL; /* LCOV_EXCL_LINE */
#endif
}

static long
mt_queue_check(struct mt_pstate *mt)
{
  long i;
  for(i=0; i<mt->n; i++)
  {
    struct mt_queue *mq = mt->mq+i;
    if (mq->output) return i;
  }
  return -1;
}

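/* Consumer side, run by the main thread. While the pool is still being
 * filled it returns NULL so the caller keeps submitting; afterwards it
 * waits on the pool condition until some queue has an output (re-raising
 * any pending SIGINT while waiting), copies the result to the caller's
 * stack, and rethrows it if the worker returned a t_ERROR. */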
static GEN
mtpthread_queue_get(struct mt_state *junk, long *workid, long *pending)
{
  struct mt_pstate *mt = pari_mt;
  struct mt_queue *mq;
  GEN done = NULL;
  long last;
  (void) junk;
  if (mt->nbint<mt->n)
  {
    mt->last = mt->nbint;
    *pending = mt->pending;
    return NULL;
  }
  BLOCK_SIGINT_START
  LOCK(&mt->pmut)
  {
    while ((last = mt_queue_check(mt)) < 0)
    {
      pthread_cond_wait(&mt->pcond, &mt->pmut);
      if (PARI_SIGINT_pending)
      {
        int sig = PARI_SIGINT_pending;
        PARI_SIGINT_pending = 0;
        pthread_mutex_unlock(&mt->pmut);
        PARI_SIGINT_block = 0;
        raise(sig);
        PARI_SIGINT_block = 1;
        pthread_mutex_lock(&mt->pmut);
      }
    }
  } UNLOCK(&mt->pmut);
  BLOCK_SIGINT_END
  mq = mt->mq+last;
  done = gcopy(mq->output);
  mq->output = NULL;
  if (workid) *workid = mq->workid;
  if (typ(done) == t_ERROR)
  {
    if (err_get_num(done)==e_STACK)
      pari_err(e_STACKTHREAD);
    else
      pari_err(0,done);
  }
  mt->last = last;
  mt->pending--;
  *pending = mt->pending;
  return done;
}

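/* Producer side, run by the main thread. A NULL `work` marks the end of
 * submissions. During ramp-up it waits for the target thread's startup
 * handshake (mq->avma set); the task itself is copied onto the worker's
 * own stack before its private condition is signalled. */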
static void
mtpthread_queue_submit(struct mt_state *junk, long workid, GEN work)
{
  struct mt_pstate *mt = pari_mt;
  struct mt_queue *mq = mt->mq+mt->last;
  (void) junk;
  if (!work) { mt->nbint=mt->n; return; }
  BLOCK_SIGINT_START
  if (mt->nbint<mt->n)
  {
    mt->nbint++;
    LOCK(mq->pmut)
    {
      while(!mq->avma)
        pthread_cond_wait(mq->pcond, mq->pmut);
    } UNLOCK(mq->pmut);
  }
  LOCK(&mq->mut)
  {
    mq->output = NULL;
    mq->workid = workid;
    BLOCK_SIGINT_START
    {
      pari_sp av = avma;
      struct pari_mainstack *st = pari_mainstack;
      pari_mainstack = mq->mainstack;
      set_avma(mq->avma);
      mq->input = gcopy(work);
      mq->avma = avma;
      mq->mainstack = pari_mainstack;
      pari_mainstack = st;
      set_avma(av);
    }
    BLOCK_SIGINT_END
    pthread_cond_signal(&mq->cond);
  } UNLOCK(&mq->mut);
  mt->pending++;
  BLOCK_SIGINT_END
}

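/* Tear the pool down: cancel and join every worker thread, then free the
 * per-queue synchronisation objects, the per-thread PARI stacks and the
 * pool state. */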
void
mt_queue_reset(void)
{
  struct mt_pstate *mt = pari_mt;
  long i;
  BLOCK_SIGINT_START
  for (i=0; i<mt->n; i++)
    pthread_cancel(mt->th[i]);
  for (i=0; i<mt->n; i++)
    pthread_join(mt->th[i],NULL);
  pari_mt = NULL;
  BLOCK_SIGINT_END
  if (DEBUGLEVEL) pari_warn(warner,"stopping %ld threads", mt->n);
  for (i=0;i<mt->n;i++)
  {
    struct mt_queue *mq = mt->mq+i;
    pthread_cond_destroy(&mq->cond);
    pthread_mutex_destroy(&mq->mut);
    pari_thread_free(&mt->pth[i]);
  }
  pari_free(mt->mq);
  pari_free(mt->pth);
  pari_free(mt->th);
  pari_free(mt);
}

static long
closure_has_clone(GEN fun)
{
  if (isclone(fun)) return 1;
  if (lg(fun) >= 8)
  {
    GEN f = closure_get_frame(fun);
    long i, l = lg(f);
    for (i = 1; i < l; i++)
      if (isclone(gel(f,i))) return 1;
  }
  return 0;
}

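/* Start a pool of at most `lim` worker threads (lim = 0 means
 * pari_mt_nbthreads). Nested parallel sections, or an effective limit of
 * one, fall back to the sequential "single" engine; otherwise allocate one
 * queue, one pthread and one PARI thread stack per worker and install the
 * pthread implementations as the struct pari_mt callbacks. */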
void
mt_queue_start_lim(struct pari_mt *pt, GEN worker, long lim)
{
  if (lim==0) lim = pari_mt_nbthreads;
  else lim = minss(pari_mt_nbthreads, lim);
  if (pari_mt || lim <= 1)
  {
    mt_issingle = 1;
    mtsingle_queue_start(pt, worker);
  }
  else
  {
    struct mt_pstate *mt =
      (struct mt_pstate*) pari_malloc(sizeof(struct mt_pstate));
    long mtparisize = GP_DATA->threadsize? GP_DATA->threadsize: pari_mainstack->rsize;
    long mtparisizemax = GP_DATA->threadsizemax;
    long i;
    if (closure_has_clone(worker))
      worker = gcopy(worker); /* to avoid clone_lock race */
    mt->mq = (struct mt_queue *) pari_malloc(sizeof(*mt->mq)*lim);
    mt->th = (pthread_t *) pari_malloc(sizeof(*mt->th)*lim);
    mt->pth = (struct pari_thread *) pari_malloc(sizeof(*mt->pth)*lim);
    mt->pending = 0;
    mt->n = lim;
    mt->nbint = 0;
    mt->last = 0;
    pthread_cond_init(&mt->pcond,NULL);
    pthread_mutex_init(&mt->pmut,NULL);
    for (i=0;i<lim;i++)
    {
      struct mt_queue *mq = mt->mq+i;
      mq->no = i;
      mq->avma = 0;
      mq->mainstack = NULL;
      mq->worker = worker;
      mq->input = NULL;
      mq->output = NULL;
      mq->pcond = &mt->pcond;
      mq->pmut = &mt->pmut;
      pthread_cond_init(&mq->cond,NULL);
      pthread_mutex_init(&mq->mut,NULL);
      if (mtparisizemax)
        pari_thread_valloc(&mt->pth[i],mtparisize,mtparisizemax,(GEN)mq);
      else
        pari_thread_alloc(&mt->pth[i],mtparisize,(GEN)mq);
    }
    if (DEBUGLEVEL) pari_warn(warner,"starting %ld threads", lim);
    BLOCK_SIGINT_START
    for (i=0;i<lim;i++)
      pthread_create(&mt->th[i],NULL, &mt_queue_run, (void*)&mt->pth[i]);
    pari_mt = mt;
    mt_issingle = 0;
    BLOCK_SIGINT_END
    pt->get=&mtpthread_queue_get;
    pt->submit=&mtpthread_queue_submit;
    pt->end=&mt_queue_reset;
  }
}
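/*
 * Not part of the file above: a minimal usage sketch of the queue
 * interface that mt_queue_start_lim() installs. It assumes the public
 * wrappers mt_queue_submit(), mt_queue_get() and mt_queue_end() from
 * libpari's mt interface, and a worker closure (t_CLOSURE taking one
 * argument vector) built elsewhere; names such as parallel_apply below
 * are illustrative only.
 */
GEN
parallel_apply(GEN worker, GEN v)
{
  long i, l = lg(v), workid, pending = 0;
  GEN done, res = cgetg(l, t_VEC);
  struct pari_mt pt;

  mt_queue_start_lim(&pt, worker, l-1);
  for (i = 1; i < l || pending; i++)
  { /* keep submitting while tasks remain; NULL marks end of submissions */
    mt_queue_submit(&pt, i, i < l ? mkvec(gel(v, i)) : NULL);
    done = mt_queue_get(&pt, &workid, &pending);
    if (done) gel(res, workid) = done; /* results may arrive out of order */
  }
  mt_queue_end(&pt);
  return res;
}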