Testing latest pari + WASM + node.js... and it works?! Wow.
License: GPL3
ubuntu2004
/* Copyright (C) 2013 The PARI group.12This file is part of the PARI/GP package.34PARI/GP is free software; you can redistribute it and/or modify it under the5terms of the GNU General Public License as published by the Free Software6Foundation; either version 2 of the License, or (at your option) any later7version. It is distributed in the hope that it will be useful, but WITHOUT8ANY WARRANTY WHATSOEVER.910Check the License for details. You should have received a copy of it, along11with the package; see the file 'COPYING'. If not, write to the Free Software12Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */13#include <pthread.h>14#include "pari.h"15#include "paripriv.h"16#include "mt.h"17#if defined(_WIN32)18# include "../systems/mingw/mingw.h"19#endif2021#define DEBUGLEVEL DEBUGLEVEL_mt2223struct mt_queue24{25long no;26pari_sp avma;27struct pari_mainstack *mainstack;28GEN input, output;29GEN worker;30long workid;31pthread_cond_t cond;32pthread_mutex_t mut;33pthread_cond_t *pcond;34pthread_mutex_t *pmut;35};3637struct mt_pstate38{39pthread_t *th;40struct pari_thread *pth;41struct mt_queue *mq;42long n, nbint, last;43long pending;44pthread_cond_t pcond;45pthread_mutex_t pmut;46};4748static THREAD long mt_thread_no = -1, mt_issingle = 0;49static struct mt_pstate *pari_mt;5051#define LOCK(x) pthread_mutex_lock(x); do52#define UNLOCK(x) while(0); pthread_mutex_unlock(x)5354void55mt_sigint_block(void)56{57if (mt_thread_no>=0)58pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,NULL);59}6061void62mt_sigint_unblock(void)63{64if (mt_thread_no>=0)65pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL);66}6768void69mt_err_recover(long er)70{71(void) er;72if (mt_thread_no>=0)73{74struct mt_pstate *mt = pari_mt;75struct mt_queue *mq = mt->mq+mt_thread_no;76GEN err = pari_err_last();77err = err_get_num(err)==e_STACK ? err_e_STACK: bin_copy(copy_bin(err));78LOCK(mq->pmut)79{80mq->output = err;81pthread_cond_signal(mq->pcond);82} UNLOCK(mq->pmut);83pthread_exit((void*)1);84}85else if (mt_issingle) mtsingle_err_recover(er);86}8788void89mt_sigint(void)90{91if (pari_mt) pthread_cond_broadcast(&pari_mt->pcond);92}9394int95mt_is_parallel(void)96{97return !!pari_mt;98}99100int101mt_is_thread(void)102{103return mt_thread_no>=0 ? 1: mt_issingle ? mtsingle_is_thread(): 0;104}105106long107mt_nbthreads(void)108{109return pari_mt ? 1: pari_mt_nbthreads;110}111112void113mt_export_add(const char *str, GEN val)114{115if (pari_mt)116pari_err(e_MISC,"export() not allowed during parallel sections");117export_add(str, val);118}119120void121mt_export_del(const char *str)122{123if (pari_mt)124pari_err(e_MISC,"unexport() not allowed during parallel sections");125export_del(str);126}127128void mt_broadcast(GEN code) {(void) code;}129130void pari_mt_init(void)131{132pari_mt = NULL;133mt_issingle = 0;134#ifdef _SC_NPROCESSORS_CONF135if (!pari_mt_nbthreads) pari_mt_nbthreads = sysconf(_SC_NPROCESSORS_CONF);136#elif defined(_WIN32)137if (!pari_mt_nbthreads) pari_mt_nbthreads = win32_nbthreads();138#else139pari_mt_nbthreads = 1;140#endif141}142143void pari_mt_close(void) { }144145static void146mt_queue_cleanup(void *arg)147{148(void) arg;149pari_thread_close();150}151152static void153mt_queue_unlock(void *arg)154{ pthread_mutex_unlock((pthread_mutex_t*) arg); }155156static void*157mt_queue_run(void *arg)158{159GEN args = pari_thread_start((struct pari_thread*) arg);160pari_sp av = avma;161struct mt_queue *mq = (struct mt_queue *) args;162mt_thread_no = mq->no;163pthread_cleanup_push(mt_queue_cleanup,NULL);164LOCK(mq->pmut)165{166mq->mainstack = pari_mainstack;167mq->avma = av;168pthread_cond_signal(mq->pcond);169} UNLOCK(mq->pmut);170for(;;)171{172GEN work, done;173LOCK(&mq->mut)174{175pthread_cleanup_push(mt_queue_unlock, &mq->mut);176while(!mq->input)177pthread_cond_wait(&mq->cond, &mq->mut);178pthread_cleanup_pop(0);179} UNLOCK(&mq->mut);180pari_mainstack = mq->mainstack;181set_avma(mq->avma);182work = mq->input;183pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL);184done = closure_callgenvec(mq->worker,work);185pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,NULL);186LOCK(mq->pmut)187{188mq->mainstack = pari_mainstack;189mq->avma = av;190mq->input = NULL;191mq->output = done;192pthread_cond_signal(mq->pcond);193} UNLOCK(mq->pmut);194}195pthread_cleanup_pop(1);196#ifdef __GNUC__197return NULL; /* LCOV_EXCL_LINE */198#endif199}200201static long202mt_queue_check(struct mt_pstate *mt)203{204long i;205for(i=0; i<mt->n; i++)206{207struct mt_queue *mq = mt->mq+i;208if (mq->output) return i;209}210return -1;211}212213static GEN214mtpthread_queue_get(struct mt_state *junk, long *workid, long *pending)215{216struct mt_pstate *mt = pari_mt;217struct mt_queue *mq;218GEN done = NULL;219long last;220(void) junk;221if (mt->nbint<mt->n)222{223mt->last = mt->nbint;224*pending = mt->pending;225return NULL;226}227BLOCK_SIGINT_START228LOCK(&mt->pmut)229{230while ((last = mt_queue_check(mt)) < 0)231{232pthread_cond_wait(&mt->pcond, &mt->pmut);233if (PARI_SIGINT_pending)234{235int sig = PARI_SIGINT_pending;236PARI_SIGINT_pending = 0;237pthread_mutex_unlock(&mt->pmut);238PARI_SIGINT_block = 0;239raise(sig);240PARI_SIGINT_block = 1;241pthread_mutex_lock(&mt->pmut);242}243}244} UNLOCK(&mt->pmut);245BLOCK_SIGINT_END246mq = mt->mq+last;247done = gcopy(mq->output);248mq->output = NULL;249if (workid) *workid = mq->workid;250if (typ(done) == t_ERROR)251{252if (err_get_num(done)==e_STACK)253pari_err(e_STACKTHREAD);254else255pari_err(0,done);256}257mt->last = last;258mt->pending--;259*pending = mt->pending;260return done;261}262263static void264mtpthread_queue_submit(struct mt_state *junk, long workid, GEN work)265{266struct mt_pstate *mt = pari_mt;267struct mt_queue *mq = mt->mq+mt->last;268(void) junk;269if (!work) { mt->nbint=mt->n; return; }270BLOCK_SIGINT_START271if (mt->nbint<mt->n)272{273mt->nbint++;274LOCK(mq->pmut)275{276while(!mq->avma)277pthread_cond_wait(mq->pcond, mq->pmut);278} UNLOCK(mq->pmut);279}280LOCK(&mq->mut)281{282mq->output = NULL;283mq->workid = workid;284BLOCK_SIGINT_START285{286pari_sp av = avma;287struct pari_mainstack *st = pari_mainstack;288pari_mainstack = mq->mainstack;289set_avma(mq->avma);290mq->input = gcopy(work);291mq->avma = avma;292mq->mainstack = pari_mainstack;293pari_mainstack = st;294set_avma(av);295}296BLOCK_SIGINT_END297pthread_cond_signal(&mq->cond);298} UNLOCK(&mq->mut);299mt->pending++;300BLOCK_SIGINT_END301}302303void304mt_queue_reset(void)305{306struct mt_pstate *mt = pari_mt;307long i;308BLOCK_SIGINT_START309for (i=0; i<mt->n; i++)310pthread_cancel(mt->th[i]);311for (i=0; i<mt->n; i++)312pthread_join(mt->th[i],NULL);313pari_mt = NULL;314BLOCK_SIGINT_END315if (DEBUGLEVEL) pari_warn(warner,"stopping %ld threads", mt->n);316for (i=0;i<mt->n;i++)317{318struct mt_queue *mq = mt->mq+i;319pthread_cond_destroy(&mq->cond);320pthread_mutex_destroy(&mq->mut);321pari_thread_free(&mt->pth[i]);322}323pari_free(mt->mq);324pari_free(mt->pth);325pari_free(mt->th);326pari_free(mt);327}328329static long330closure_has_clone(GEN fun)331{332if (isclone(fun)) return 1;333if (lg(fun) >= 8)334{335GEN f = closure_get_frame(fun);336long i, l = lg(f);337for (i = 1; i < l; i++)338if (isclone(gel(f,i))) return 1;339}340return 0;341}342343void344mt_queue_start_lim(struct pari_mt *pt, GEN worker, long lim)345{346if (lim==0) lim = pari_mt_nbthreads;347else lim = minss(pari_mt_nbthreads, lim);348if (pari_mt || lim <= 1)349{350mt_issingle = 1;351mtsingle_queue_start(pt, worker);352}353else354{355struct mt_pstate *mt =356(struct mt_pstate*) pari_malloc(sizeof(struct mt_pstate));357long mtparisize = GP_DATA->threadsize? GP_DATA->threadsize: pari_mainstack->rsize;358long mtparisizemax = GP_DATA->threadsizemax;359long i;360if (closure_has_clone(worker))361worker = gcopy(worker); /* to avoid clone_lock race */362mt->mq = (struct mt_queue *) pari_malloc(sizeof(*mt->mq)*lim);363mt->th = (pthread_t *) pari_malloc(sizeof(*mt->th)*lim);364mt->pth = (struct pari_thread *) pari_malloc(sizeof(*mt->pth)*lim);365mt->pending = 0;366mt->n = lim;367mt->nbint = 0;368mt->last = 0;369pthread_cond_init(&mt->pcond,NULL);370pthread_mutex_init(&mt->pmut,NULL);371for (i=0;i<lim;i++)372{373struct mt_queue *mq = mt->mq+i;374mq->no = i;375mq->avma = 0;376mq->mainstack = NULL;377mq->worker = worker;378mq->input = NULL;379mq->output = NULL;380mq->pcond = &mt->pcond;381mq->pmut = &mt->pmut;382pthread_cond_init(&mq->cond,NULL);383pthread_mutex_init(&mq->mut,NULL);384if (mtparisizemax)385pari_thread_valloc(&mt->pth[i],mtparisize,mtparisizemax,(GEN)mq);386else387pari_thread_alloc(&mt->pth[i],mtparisize,(GEN)mq);388}389if (DEBUGLEVEL) pari_warn(warner,"starting %ld threads", lim);390BLOCK_SIGINT_START391for (i=0;i<lim;i++)392pthread_create(&mt->th[i],NULL, &mt_queue_run, (void*)&mt->pth[i]);393pari_mt = mt;394mt_issingle = 0;395BLOCK_SIGINT_END396pt->get=&mtpthread_queue_get;397pt->submit=&mtpthread_queue_submit;398pt->end=&mt_queue_reset;399}400}401402403