Testing latest pari + WASM + node.js... and it works?! Wow.
License: GPL3
ubuntu2004
/* Copyright (C) 2013 The PARI group.12This file is part of the PARI/GP package.34PARI/GP is free software; you can redistribute it and/or modify it under the5terms of the GNU General Public License as published by the Free Software6Foundation; either version 2 of the License, or (at your option) any later7version. It is distributed in the hope that it will be useful, but WITHOUT8ANY WARRANTY WHATSOEVER.910Check the License for details. You should have received a copy of it, along11with the package; see the file 'COPYING'. If not, write to the Free Software12Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */13#include <mpi.h>14#include "pari.h"15#include "paripriv.h"16#include "../src/language/anal.h"17#include "mt.h"1819#define DEBUGLEVEL DEBUGLEVEL_mt2021static THREAD int pari_MPI_size, pari_MPI_rank;22static THREAD long nbreq = 0, mt_issingle = 0;2324enum PMPI_cmd { PMPI_close, PMPI_worker, PMPI_work, PMPI_parisizemax,25PMPI_parisize, PMPI_precreal, PMPI_primetab,26PMPI_varpriority, PMPI_eval,27PMPI_exportadd, PMPI_exportdel};2829struct mt_mstate30{31long n;32int source;33long nbint;34long *workid;35};3637static struct mt_mstate pari_mt_data;38static struct mt_mstate *pari_mt;3940static void41send_long(long a, long dest)42{43BLOCK_SIGINT_START44MPI_Send(&a, 1, MPI_LONG, dest, 0, MPI_COMM_WORLD);45BLOCK_SIGINT_END46}4748static void49send_vlong(long *a, long n, long dest)50{51BLOCK_SIGINT_START52MPI_Send(a, n, MPI_LONG, dest, 0, MPI_COMM_WORLD);53BLOCK_SIGINT_END54}5556static void57send_request(enum PMPI_cmd ecmd, long dest)58{59send_long((long)ecmd, dest);60}6162static void63send_GEN(GEN elt, int dest)64{65pari_sp av = avma;66int size;67GEN reloc = copybin_unlink(elt);68GENbin *buf = copy_bin_canon(mkvec2(elt,reloc));69size = sizeof(GENbin) + buf->len*sizeof(ulong);70{71BLOCK_SIGINT_START72MPI_Send(buf, size, MPI_CHAR, dest, 0, MPI_COMM_WORLD);73BLOCK_SIGINT_END74}75pari_free(buf); set_avma(av);76}7778static void79send_request_GEN(enum PMPI_cmd ecmd, GEN elt, int dest)80{81send_request(ecmd, dest);82send_GEN(elt, dest);83}8485static void86send_request_long(enum PMPI_cmd ecmd, long elt, int dest)87{88send_request(ecmd, dest);89send_long(elt, dest);90}9192static void93send_request_vlong(enum PMPI_cmd ecmd, long *a, long n, int dest)94{95send_request(ecmd, dest);96send_vlong(a, n, dest);97}9899static long100recvfrom_long(int src)101{102long a;103BLOCK_SIGINT_START104MPI_Recv(&a, 1, MPI_LONG, src, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);105BLOCK_SIGINT_END106return a;107}108109static void110recvfrom_vlong(long *a, long n, int src)111{112BLOCK_SIGINT_START113MPI_Recv(a, n, MPI_LONG, src, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);114BLOCK_SIGINT_END115}116117static enum PMPI_cmd118recvfrom_request(int src)119{120return (enum PMPI_cmd) recvfrom_long(src);121}122123static GENbin *124recvstatus_buf(int source, MPI_Status *status)125{126int size;127GENbin *buf;128BLOCK_SIGINT_START129130MPI_Get_count(status, MPI_CHAR, &size);131buf = (GENbin *)pari_malloc(size);132MPI_Recv(buf, size, MPI_CHAR, source, 0/* tag */,133MPI_COMM_WORLD, MPI_STATUS_IGNORE);134BLOCK_SIGINT_END135return buf;136}137138static GEN139recvstatus_GEN(int source, MPI_Status *status)140{141GEN res;142GENbin *buf = recvstatus_buf(source, status);143buf->rebase = &shiftaddress_canon;144res = bin_copy(buf);145bincopy_relink(gel(res,1),gel(res,2));146return gel(res,1);147}148149static void150recvstatus_void(int source, MPI_Status *status)151{152GENbin *buf = recvstatus_buf(source, status);153free(buf);154}155156static GEN157recvfrom_GEN(int src)158{159MPI_Status status;160BLOCK_SIGINT_START161MPI_Probe(src, 0, MPI_COMM_WORLD, &status);162BLOCK_SIGINT_END163return recvstatus_GEN(src, &status);164}165166static GEN167recvany_GEN(int *source)168{169MPI_Status status;170BLOCK_SIGINT_START171MPI_Probe(MPI_ANY_SOURCE, 0 /* tag */, MPI_COMM_WORLD, &status);172*source = status.MPI_SOURCE;173BLOCK_SIGINT_END174return recvstatus_GEN(*source, &status);175}176177static void178recvany_void(int *source)179{180MPI_Status status;181BLOCK_SIGINT_START182MPI_Probe(MPI_ANY_SOURCE, 0 /* tag */, MPI_COMM_WORLD, &status);183*source = status.MPI_SOURCE;184BLOCK_SIGINT_END185recvstatus_void(*source, &status);186}187188static jmp_buf child_env;189190static void191pari_MPI_child(void)192{193pari_sp av = avma;194ulong rsize = 0, vsize = 0;195GEN worker = NULL, work, done;196struct gp_context rec;197pari_mt_nbthreads = 1;198gp_context_save(&rec);199if (setjmp(child_env))200{201send_GEN(pari_err_last(), 0);202gp_context_restore(&rec);203}204while (1)205switch (recvfrom_request(0))206{207case PMPI_worker:208paristack_setsize(rsize, vsize);209gp_context_save(&rec);210worker = recvfrom_GEN(0);211av = avma;212break;213case PMPI_work:214work = recvfrom_GEN(0);215done = closure_callgenvec(worker, work);216send_GEN(done, 0);217set_avma(av);218break;219case PMPI_parisizemax:220vsize = recvfrom_long(0);221break;222case PMPI_parisize:223rsize = recvfrom_long(0);224break;225case PMPI_precreal:226precreal = recvfrom_long(0);227break;228case PMPI_primetab:229{230pari_sp ltop = avma;231GEN tab = recvfrom_GEN(0);232if (!gequal(tab, primetab))233{234long i, l = lg(tab);235GEN old = primetab, t = cgetg_block(l, t_VEC);236for (i = 1; i < l; i++) gel(t,i) = gclone(gel(tab,i));237primetab = t;238gunclone_deep(old);239}240set_avma(ltop);241}242break;243case PMPI_eval:244(void) closure_evalgen(recvfrom_GEN(0));245set_avma(av);246break;247case PMPI_varpriority:248recvfrom_vlong(varpriority-1,MAXVARN+2,0);249break;250case PMPI_exportadd:251{252GEN str = recvfrom_GEN(0);253GEN val = recvfrom_GEN(0);254entree *ep = fetch_entry(GSTR(str));255export_add(ep->name, val);256set_avma(av);257break;258}259case PMPI_exportdel:260{261GEN str = recvfrom_GEN(0);262entree *ep = fetch_entry(GSTR(str));263export_del(ep->name);264set_avma(av);265break;266}267case PMPI_close:268MPI_Barrier(MPI_COMM_WORLD);269MPI_Finalize();270exit(0);271break;272}273}274275void276mt_err_recover(long er)277{278if (pari_MPI_rank) longjmp(child_env,er);279else if (mt_issingle) mtsingle_err_recover(er);280}281void mt_sigint_block(void) { }282void mt_sigint_unblock(void) { }283void mt_sigint(void) {}284285int286mt_is_parallel(void)287{288return !!pari_mt;289}290291int292mt_is_thread(void)293{294return pari_MPI_rank ? 1 : mt_issingle ? mtsingle_is_thread(): 0;295}296297long298mt_nbthreads(void)299{300return pari_mt || pari_MPI_rank || pari_MPI_size <= 2 ? 1: pari_mt_nbthreads;301}302303void304mt_export_add(const char *str, GEN val)305{306pari_sp av = avma;307long i, n = pari_MPI_size-1;308GEN s;309if (pari_mt || pari_MPI_rank)310pari_err(e_MISC,"export not allowed during parallel sections");311export_add(str, val);312s = strtoGENstr(str);313for (i=1; i <= n; i++)314{315send_request(PMPI_exportadd, i);316send_GEN(s, i);317send_GEN(val, i);318}319set_avma(av);320}321322void323mt_export_del(const char *str)324{325pari_sp av = avma;326long i, n = pari_MPI_size-1;327GEN s;328if (pari_MPI_rank)329pari_err(e_MISC,"unexport not allowed during parallel sections");330export_del(str);331s = strtoGENstr(str);332for (i=1; i <= n; i++)333send_request_GEN(PMPI_exportdel, s, i);334set_avma(av);335}336337void338mt_broadcast(GEN code)339{340long i;341if (!pari_MPI_rank && !pari_mt)342for (i=1;i<pari_MPI_size;i++)343send_request_GEN(PMPI_eval, code, i);344}345346void347pari_mt_init(void)348{349int res = MPI_Init(0, NULL);350if (res == MPI_SUCCESS)351{352MPI_Comm_size(MPI_COMM_WORLD, &pari_MPI_size);353MPI_Comm_rank(MPI_COMM_WORLD, &pari_MPI_rank);354if (pari_MPI_rank) pari_MPI_child();355#ifdef _IOFBF356/* HACK: most MPI implementation does not handle stdin well.357stdinsize is sufficient for the largest test file to fit */358setvbuf(stdin,pari_malloc(128*1024),_IOFBF,128*1024);359#endif360if (!pari_mt_nbthreads)361pari_mt_nbthreads = maxss(1, pari_MPI_size-1);362}363else364{365pari_MPI_size = 0;366pari_MPI_rank = 0;367pari_mt_nbthreads = 1;368}369pari_mt = NULL;370}371372void373pari_mt_close(void)374{375long i;376if (!pari_MPI_rank)377for (i = 1; i < pari_MPI_size; i++)378send_request(PMPI_close, i);379MPI_Barrier(MPI_COMM_WORLD);380MPI_Finalize();381}382383static GEN384mtmpi_queue_get(struct mt_state *junk, long *workid, long *pending)385{386struct mt_mstate *mt = pari_mt;387GEN done;388(void) junk;389if (mt->nbint<=mt->n) { mt->source=mt->nbint; *pending = nbreq; return NULL; }390done = recvany_GEN(&mt->source);391nbreq--; *pending = nbreq;392if (workid) *workid = mt->workid[mt->source];393if (typ(done) == t_ERROR)394{395if (err_get_num(done)==e_STACK)396pari_err(e_STACKTHREAD);397else398pari_err(0,done);399}400return done;401}402403static void404mtmpi_queue_submit(struct mt_state *junk, long workid, GEN work)405{406struct mt_mstate *mt = pari_mt;407(void) junk;408if (!work) { mt->nbint=mt->n+1; return; }409if (mt->nbint<=mt->n) mt->nbint++;410nbreq++;411mt->workid[mt->source] = workid;412send_request_GEN(PMPI_work, work, mt->source);413}414415void416mt_queue_reset(void)417{418struct mt_mstate *mt = pari_mt;419if (DEBUGLEVEL>0 && nbreq)420pari_warn(warner,"%ld discarded threads (MPI)",nbreq);421for( ;nbreq>0; nbreq--) recvany_void(&mt->source);422pari_free(mt->workid);423pari_mt = NULL;424}425426void427mt_queue_start_lim(struct pari_mt *pt, GEN worker, long lim)428{429if (lim==0) lim = pari_mt_nbthreads;430else lim = minss(pari_mt_nbthreads, lim);431if (pari_mt || pari_MPI_rank || pari_MPI_size <= 2 || lim <= 1)432{433mt_issingle = 1;434mtsingle_queue_start(pt, worker);435}436else437{438struct mt_mstate *mt = &pari_mt_data;439long i, n = minss(lim, pari_MPI_size-1);440long mtparisize = GP_DATA->threadsize? GP_DATA->threadsize: pari_mainstack->rsize;441long mtparisizemax = GP_DATA->threadsizemax;442pari_mt = mt;443mt_issingle = 0;444mt->workid = (long*) pari_malloc(sizeof(long)*(n+1));445for (i=1; i <= n; i++)446{447send_request_long(PMPI_parisize, mtparisize, i);448send_request_long(PMPI_parisizemax, mtparisizemax, i);449send_request_long(PMPI_precreal, get_localbitprec(), i);450send_request_vlong(PMPI_varpriority,varpriority-1,MAXVARN+2, i);451send_request_GEN(PMPI_primetab, primetab, i);452send_request_GEN(PMPI_worker, worker, i);453}454mt->n = n;455mt->nbint = 1;456mt->source = 1;457pt->get=&mtmpi_queue_get;458pt->submit=&mtmpi_queue_submit;459pt->end=&mt_queue_reset;460}461}462463464