
Testing latest pari + WASM + node.js... and it works?! Wow.

License: GPL3

/* Copyright (C) 2013 The PARI group.

This file is part of the PARI/GP package.

PARI/GP is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 2 of the License, or (at your option) any later
version. It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY WHATSOEVER.

Check the License for details. You should have received a copy of it, along
with the package; see the file 'COPYING'. If not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
#include <mpi.h>
#include "pari.h"
#include "paripriv.h"
#include "../src/language/anal.h"
#include "mt.h"

#define DEBUGLEVEL DEBUGLEVEL_mt

static THREAD int pari_MPI_size, pari_MPI_rank;
static THREAD long nbreq = 0, mt_issingle = 0;

enum PMPI_cmd { PMPI_close, PMPI_worker, PMPI_work, PMPI_parisizemax,
                PMPI_parisize, PMPI_precreal, PMPI_primetab,
                PMPI_varpriority, PMPI_eval,
                PMPI_exportadd, PMPI_exportdel};

struct mt_mstate
{
  long n;
  int source;
  long nbint;
  long *workid;
};

static struct mt_mstate pari_mt_data;
static struct mt_mstate *pari_mt;

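/* Low-level transport: every message between the master (rank 0) and a
   worker is either a single long (a PMPI_cmd code or a scalar argument), a
   vector of longs, or a GEN serialized by copy_bin_canon() and sent as a
   raw MPI_CHAR buffer; the BLOCK_SIGINT_START/END guards keep an interrupt
   from leaving a message half-transmitted. */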
static void
send_long(long a, long dest)
{
  BLOCK_SIGINT_START
  MPI_Send(&a, 1, MPI_LONG, dest, 0, MPI_COMM_WORLD);
  BLOCK_SIGINT_END
}

static void
send_vlong(long *a, long n, long dest)
{
  BLOCK_SIGINT_START
  MPI_Send(a, n, MPI_LONG, dest, 0, MPI_COMM_WORLD);
  BLOCK_SIGINT_END
}

static void
send_request(enum PMPI_cmd ecmd, long dest)
{
  send_long((long)ecmd, dest);
}

static void
send_GEN(GEN elt, int dest)
{
  pari_sp av = avma;
  int size;
  GEN reloc = copybin_unlink(elt);
  GENbin *buf = copy_bin_canon(mkvec2(elt,reloc));
  size = sizeof(GENbin) + buf->len*sizeof(ulong);
  {
    BLOCK_SIGINT_START
    MPI_Send(buf, size, MPI_CHAR, dest, 0, MPI_COMM_WORLD);
    BLOCK_SIGINT_END
  }
  pari_free(buf); set_avma(av);
}

static void
send_request_GEN(enum PMPI_cmd ecmd, GEN elt, int dest)
{
  send_request(ecmd, dest);
  send_GEN(elt, dest);
}

static void
send_request_long(enum PMPI_cmd ecmd, long elt, int dest)
{
  send_request(ecmd, dest);
  send_long(elt, dest);
}

static void
send_request_vlong(enum PMPI_cmd ecmd, long *a, long n, int dest)
{
  send_request(ecmd, dest);
  send_vlong(a, n, dest);
}

static long
recvfrom_long(int src)
{
  long a;
  BLOCK_SIGINT_START
  MPI_Recv(&a, 1, MPI_LONG, src, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  BLOCK_SIGINT_END
  return a;
}

static void
recvfrom_vlong(long *a, long n, int src)
{
  BLOCK_SIGINT_START
  MPI_Recv(a, n, MPI_LONG, src, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  BLOCK_SIGINT_END
}

static enum PMPI_cmd
recvfrom_request(int src)
{
  return (enum PMPI_cmd) recvfrom_long(src);
}

static GENbin *
recvstatus_buf(int source, MPI_Status *status)
{
  int size;
  GENbin *buf;
  BLOCK_SIGINT_START

  MPI_Get_count(status, MPI_CHAR, &size);
  buf = (GENbin *)pari_malloc(size);
  MPI_Recv(buf, size, MPI_CHAR, source, 0/* tag */,
           MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  BLOCK_SIGINT_END
  return buf;
}

static GEN
recvstatus_GEN(int source, MPI_Status *status)
{
  GEN res;
  GENbin *buf = recvstatus_buf(source, status);
  buf->rebase = &shiftaddress_canon;
  res = bin_copy(buf);
  bincopy_relink(gel(res,1),gel(res,2));
  return gel(res,1);
}

static void
recvstatus_void(int source, MPI_Status *status)
{
  GENbin *buf = recvstatus_buf(source, status);
  free(buf);
}

static GEN
recvfrom_GEN(int src)
{
  MPI_Status status;
  BLOCK_SIGINT_START
  MPI_Probe(src, 0, MPI_COMM_WORLD, &status);
  BLOCK_SIGINT_END
  return recvstatus_GEN(src, &status);
}

static GEN
recvany_GEN(int *source)
{
  MPI_Status status;
  BLOCK_SIGINT_START
  MPI_Probe(MPI_ANY_SOURCE, 0 /* tag */, MPI_COMM_WORLD, &status);
  *source = status.MPI_SOURCE;
  BLOCK_SIGINT_END
  return recvstatus_GEN(*source, &status);
}

static void
recvany_void(int *source)
{
  MPI_Status status;
  BLOCK_SIGINT_START
  MPI_Probe(MPI_ANY_SOURCE, 0 /* tag */, MPI_COMM_WORLD, &status);
  *source = status.MPI_SOURCE;
  BLOCK_SIGINT_END
  recvstatus_void(*source, &status);
}

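/* Worker side: every rank != 0 enters pari_MPI_child() at startup and never
   returns. The loop waits for commands from the master: install a worker
   closure (PMPI_worker), evaluate it on a work packet and send back the
   result (PMPI_work), update local state (stack sizes, real precision,
   primetab, variable priorities, exported bindings), or shut down
   (PMPI_close). Runtime errors longjmp back to child_env and are reported
   to the master as a t_ERROR object. */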
static jmp_buf child_env;

static void
pari_MPI_child(void)
{
  pari_sp av = avma;
  ulong rsize = 0, vsize = 0;
  GEN worker = NULL, work, done;
  struct gp_context rec;
  pari_mt_nbthreads = 1;
  gp_context_save(&rec);
  if (setjmp(child_env))
  {
    send_GEN(pari_err_last(), 0);
    gp_context_restore(&rec);
  }
  while (1)
    switch (recvfrom_request(0))
    {
    case PMPI_worker:
      paristack_setsize(rsize, vsize);
      gp_context_save(&rec);
      worker = recvfrom_GEN(0);
      av = avma;
      break;
    case PMPI_work:
      work = recvfrom_GEN(0);
      done = closure_callgenvec(worker, work);
      send_GEN(done, 0);
      set_avma(av);
      break;
    case PMPI_parisizemax:
      vsize = recvfrom_long(0);
      break;
    case PMPI_parisize:
      rsize = recvfrom_long(0);
      break;
    case PMPI_precreal:
      precreal = recvfrom_long(0);
      break;
    case PMPI_primetab:
      {
        pari_sp ltop = avma;
        GEN tab = recvfrom_GEN(0);
        if (!gequal(tab, primetab))
        {
          long i, l = lg(tab);
          GEN old = primetab, t = cgetg_block(l, t_VEC);
          for (i = 1; i < l; i++) gel(t,i) = gclone(gel(tab,i));
          primetab = t;
          gunclone_deep(old);
        }
        set_avma(ltop);
      }
      break;
    case PMPI_eval:
      (void) closure_evalgen(recvfrom_GEN(0));
      set_avma(av);
      break;
    case PMPI_varpriority:
      recvfrom_vlong(varpriority-1,MAXVARN+2,0);
      break;
    case PMPI_exportadd:
    {
      GEN str = recvfrom_GEN(0);
      GEN val = recvfrom_GEN(0);
      entree *ep = fetch_entry(GSTR(str));
      export_add(ep->name, val);
      set_avma(av);
      break;
    }
    case PMPI_exportdel:
    {
      GEN str = recvfrom_GEN(0);
      entree *ep = fetch_entry(GSTR(str));
      export_del(ep->name);
      set_avma(av);
      break;
    }
    case PMPI_close:
      MPI_Barrier(MPI_COMM_WORLD);
      MPI_Finalize();
      exit(0);
      break;
    }
}

void
mt_err_recover(long er)
{
  if (pari_MPI_rank) longjmp(child_env,er);
  else if (mt_issingle) mtsingle_err_recover(er);
}
void mt_sigint_block(void) { }
void mt_sigint_unblock(void) { }
void mt_sigint(void) {}

int
mt_is_parallel(void)
{
  return !!pari_mt;
}

int
mt_is_thread(void)
{
  return pari_MPI_rank ? 1 : mt_issingle ? mtsingle_is_thread(): 0;
}

long
mt_nbthreads(void)
{
  return pari_mt || pari_MPI_rank || pari_MPI_size <= 2 ? 1: pari_mt_nbthreads;
}

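/* export()/unexport(): bindings created on the master are mirrored on every
   worker via PMPI_exportadd / PMPI_exportdel; this is refused inside a
   parallel section. */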
void
mt_export_add(const char *str, GEN val)
{
  pari_sp av = avma;
  long i, n = pari_MPI_size-1;
  GEN s;
  if (pari_mt || pari_MPI_rank)
    pari_err(e_MISC,"export not allowed during parallel sections");
  export_add(str, val);
  s = strtoGENstr(str);
  for (i=1; i <= n; i++)
  {
    send_request(PMPI_exportadd, i);
    send_GEN(s, i);
    send_GEN(val, i);
  }
  set_avma(av);
}

void
mt_export_del(const char *str)
{
  pari_sp av = avma;
  long i, n = pari_MPI_size-1;
  GEN s;
  if (pari_MPI_rank)
    pari_err(e_MISC,"unexport not allowed during parallel sections");
  export_del(str);
  s = strtoGENstr(str);
  for (i=1; i <= n; i++)
    send_request_GEN(PMPI_exportdel, s, i);
  set_avma(av);
}

void
mt_broadcast(GEN code)
{
  long i;
  if (!pari_MPI_rank && !pari_mt)
    for (i=1;i<pari_MPI_size;i++)
      send_request_GEN(PMPI_eval, code, i);
}

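/* Engine setup: pari_mt_init() runs once at startup. If MPI_Init() succeeds,
   rank 0 carries on as the master and every other rank enters
   pari_MPI_child(); if it fails, the MPI engine is disabled
   (size = rank = 0) and a single thread is used. With fewer than two
   workers (pari_MPI_size <= 2) the queue code below falls back to the
   single-threaded engine. */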
void
pari_mt_init(void)
{
  int res = MPI_Init(0, NULL);
  if (res == MPI_SUCCESS)
  {
    MPI_Comm_size(MPI_COMM_WORLD, &pari_MPI_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &pari_MPI_rank);
    if (pari_MPI_rank) pari_MPI_child();
#ifdef _IOFBF
    /* HACK: most MPI implementations do not handle stdin well.
       stdinsize is sufficient for the largest test file to fit */
    setvbuf(stdin,pari_malloc(128*1024),_IOFBF,128*1024);
#endif
    if (!pari_mt_nbthreads)
      pari_mt_nbthreads = maxss(1, pari_MPI_size-1);
  }
  else
  {
    pari_MPI_size = 0;
    pari_MPI_rank = 0;
    pari_mt_nbthreads = 1;
  }
  pari_mt = NULL;
}

void
pari_mt_close(void)
{
  long i;
  if (!pari_MPI_rank)
    for (i = 1; i < pari_MPI_size; i++)
      send_request(PMPI_close, i);
  MPI_Barrier(MPI_COMM_WORLD);
  MPI_Finalize();
}

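/* Master-side scheduling: workers are engaged in order 1..n until all are
   busy; after that, new work goes to whichever worker just returned a
   result (recvany_GEN sets mt->source). nbreq counts requests still in
   flight, workid[] remembers which task each worker is running, and a
   t_ERROR coming back from a worker is re-raised on the master. */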
static GEN
mtmpi_queue_get(struct mt_state *junk, long *workid, long *pending)
{
  struct mt_mstate *mt = pari_mt;
  GEN done;
  (void) junk;
  if (mt->nbint<=mt->n) { mt->source=mt->nbint; *pending = nbreq; return NULL; }
  done = recvany_GEN(&mt->source);
  nbreq--; *pending = nbreq;
  if (workid) *workid = mt->workid[mt->source];
  if (typ(done) == t_ERROR)
  {
    if (err_get_num(done)==e_STACK)
      pari_err(e_STACKTHREAD);
    else
      pari_err(0,done);
  }
  return done;
}

static void
mtmpi_queue_submit(struct mt_state *junk, long workid, GEN work)
{
  struct mt_mstate *mt = pari_mt;
  (void) junk;
  if (!work) { mt->nbint=mt->n+1; return; }
  if (mt->nbint<=mt->n) mt->nbint++;
  nbreq++;
  mt->workid[mt->source] = workid;
  send_request_GEN(PMPI_work, work, mt->source);
}

void
mt_queue_reset(void)
{
  struct mt_mstate *mt = pari_mt;
  if (DEBUGLEVEL>0 && nbreq)
    pari_warn(warner,"%ld discarded threads (MPI)",nbreq);
  for( ;nbreq>0; nbreq--) recvany_void(&mt->source);
  pari_free(mt->workid);
  pari_mt = NULL;
}

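/* Starting a parallel queue: inside a parallel section, on a worker, or with
   at most one available worker, fall back to the sequential mtsingle engine;
   otherwise configure up to lim workers (stack sizes, precision, variable
   priorities, primetab, then the worker closure itself) and install the MPI
   get/submit/end callbacks. */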
void
mt_queue_start_lim(struct pari_mt *pt, GEN worker, long lim)
{
  if (lim==0) lim = pari_mt_nbthreads;
  else lim = minss(pari_mt_nbthreads, lim);
  if (pari_mt || pari_MPI_rank || pari_MPI_size <= 2 || lim <= 1)
  {
    mt_issingle = 1;
    mtsingle_queue_start(pt, worker);
  }
  else
  {
    struct mt_mstate *mt = &pari_mt_data;
    long i, n = minss(lim, pari_MPI_size-1);
    long mtparisize = GP_DATA->threadsize? GP_DATA->threadsize: pari_mainstack->rsize;
    long mtparisizemax = GP_DATA->threadsizemax;
    pari_mt = mt;
    mt_issingle = 0;
    mt->workid = (long*) pari_malloc(sizeof(long)*(n+1));
    for (i=1; i <= n; i++)
    {
      send_request_long(PMPI_parisize, mtparisize, i);
      send_request_long(PMPI_parisizemax, mtparisizemax, i);
      send_request_long(PMPI_precreal, get_localbitprec(), i);
      send_request_vlong(PMPI_varpriority,varpriority-1,MAXVARN+2, i);
      send_request_GEN(PMPI_primetab, primetab, i);
      send_request_GEN(PMPI_worker, worker, i);
    }
    mt->n = n;
    mt->nbint = 1;
    mt->source = 1;
    pt->get=&mtmpi_queue_get;
    pt->submit=&mtmpi_queue_submit;
    pt->end=&mt_queue_reset;
  }
}