/* --------------------------------------------------------------------------
- Time-stamp: <Fri Mar 24 2000 17:42:24 Stardate: [-30]4553.68 hwloidl>
- $Id: ParInit.c,v 1.3 2000/03/31 03:09:37 hwloidl Exp $
+ Time-stamp: <Wed Mar 21 2001 16:37:16 Stardate: [-30]6363.46 hwloidl>
+ $Id: ParInit.c,v 1.5 2001/08/14 13:40:10 sewardj Exp $
Initialising the parallel RTS
#ifdef PAR /* whole file */
-#define NON_POSIX_SOURCE /* so says Solaris */
-
//@menu
//* Includes::
//* Global variables::
//@node Includes, Global variables
//@subsection Includes
+/* Evidently not Posix */
+/* #include "PosixSource.h" */
+
+#include <setjmp.h>
#include "Rts.h"
#include "RtsFlags.h"
#include "RtsUtils.h"
#include "ParallelRts.h"
-#include <setjmp.h>
+#include "Sparks.h"
#include "LLC.h"
#include "HLC.h"
/* Global conditions defined here. */
-rtsBool IAmMainThread = rtsFalse, /* Set for the main thread */
- GlobalStopPending = rtsFalse; /* Terminating */
+rtsBool IAmMainThread = rtsFalse; /* Set for the main thread */
/* Task identifiers for various interesting global tasks. */
//@cindex advisory_thread_count
nat advisory_thread_count = 0;
+globalAddr theGlobalFromGA;
+
/* For flag handling see RtsFlags.h */
//@node Prototypes
terminate, since STG_Exception will call myexit\tr{(0)} when
it received a PP_FINISH from the system manager task.
*/
-//@cindex par_exit
+//@cindex shutdownParallelSystem
void
shutdownParallelSystem(StgInt n)
{
- belch("==== entered shutdownParallelSystem ...");
- ASSERT(GlobalStopPending = rtsTrue);
- sendOp(PP_FINISH, SysManTask);
- if (n != 0)
- waitForTermination();
- else
- waitForPEOp(PP_FINISH, SysManTask);
- shutDownPE();
+ /* use the file specified via -S */
+ FILE *sf = RtsFlags.GcFlags.statsFile;
+
IF_PAR_DEBUG(verbose,
- belch("--++ shutting down PE %lx, %ld sparks created, %ld sparks Ignored, %ld threads created, %ld threads Ignored",
- (W_) mytid, sparksCreated, sparksIgnored,
- threadsCreated, threadsIgnored));
- if (n!=0)
- exit(n);
+ if (n==0)
+ belch("==== entered shutdownParallelSystem ...");
+ else
+ belch("==== entered shutdownParallelSystem (ERROR %d)...", n);
+ );
+
+ stopPEComms(n);
+
+#if 0
+ if (sf!=(FILE*)NULL)
+ fprintf(sf, "PE %x: %u sparks created, %u sparks Ignored, %u threads created, %u threads Ignored",
+ (W_) mytid, sparksCreated, sparksIgnored,
+ threadsCreated, threadsIgnored);
+#endif
+
+ ShutdownEachPEHook();
}
//@cindex initParallelSystem
/* Don't buffer standard channels... */
setbuf(stdout,NULL);
setbuf(stderr,NULL);
-
- srand48(time(NULL) * getpid()); /*Initialise Random-number generator seed*/
- /* Used to select target of FISH message*/
-
+
+ srand48(time(NULL) * getpid()); /* Initialise Random-number generator seed*/
+ /* used to select target of FISH message*/
if (!InitPackBuffer())
barf("InitPackBuffer");
* manager, and initialises the Global address tables (LAGA & GALA)
*/
-//@cindex SynchroniseSystem
+//@cindex synchroniseSystem
void
-SynchroniseSystem(void)
+synchroniseSystem(void)
{
- int i;
+ /* Only in debug mode? */
+ fprintf(stderr, "==== Starting parallel execution on %d processors ...\n", nPEs);
- fprintf(stderr, "==== SynchroniseSystem: nPEs=%d\n", nPEs);
+ InitEachPEHook(); /* HWL: hook to be execed on each PE */
- initEachPEHook(); /* HWL: hook to be execed on each PE */
+ /* Initialize global address tables */
+ initGAtables();
- fprintf(stderr, "==== SynchroniseSystem: initParallelSystem\n");
initParallelSystem();
- allPEs = startUpPE(nPEs);
+
+ startPEComms();
+}
- /* Initialize global address tables */
- initGAtables();
+/*
+ Do the startup stuff (this is PVM specific!).
+ Determines global vars: mytid, IAmMainThread, SysManTask, nPEs
+ Called at the beginning of RtsStartup.startupHaskell
+*/
+void
+startupParallelSystem(char *argv[]) {
+ mytid = pvm_mytid(); /* Connect to PVM */
+
+ if (*argv[0] == '-') { /* Look to see whether we're the Main Thread */
+ IAmMainThread = rtsTrue;
+ sscanf(argv[0],"-%0X",&SysManTask); /* extract SysMan task ID*/
+ argv++; /* Strip off flag argument */
+ } else {
+ SysManTask = pvm_parent();
+ }
+
+ IF_PAR_DEBUG(verbose,
+ fprintf(stderr, "==== [%x] %s PE located SysMan at %x\n",
+ mytid, IAmMainThread?"Main":"Remote", SysManTask));
+
+ nPEs = atoi(argv[1]);
+}
- /* Record the shortened the PE identifiers for LAGA etc. tables */
- for (i = 0; i < nPEs; ++i) {
- fprintf(stderr, "==== [%x] registering %d-th PE as %x\n", mytid, i, allPEs[i]);
- registerTask(allPEs[i]);
+/*
+ Exception handler during startup.
+*/
+void *
+processUnexpectedMessageDuringStartup(rtsPacket p) {
+ OpCode opCode;
+ GlobalTaskId sender_id;
+
+ getOpcodeAndSender(p, &opCode, &sender_id);
+
+ switch(opCode) {
+ case PP_FISH:
+ bounceFish();
+ break;
+#if defined(DIST)
+ case PP_REVAL:
+ bounceReval();
+ break;
+#endif
+ case PP_FINISH:
+ stg_exit(EXIT_SUCCESS);
+ break;
+ default:
+ fprintf(stderr,"== Task %x: Unexpected OpCode %x (%s) from %x in startPEComms\n",
+ mytid, opCode, getOpName(opCode), sender_id);
+ }
+}
+
+void
+startPEComms(void){
+
+ startUpPE();
+ allPEs = (GlobalTaskId *) stgMallocBytes(sizeof(GlobalTaskId) * MAX_PES,
+ "(PEs)");
+
+ /* Send our tid and IAmMainThread flag back to SysMan */
+ sendOp1(PP_READY, SysManTask, (StgWord)IAmMainThread);
+ /* Wait until we get the PE-Id table from Sysman */
+ waitForPEOp(PP_PETIDS, SysManTask, processUnexpectedMessageDuringStartup);
+
+ IF_PAR_DEBUG(verbose,
+ belch("==-- startPEComms: methinks we just received a PP_PETIDS message"));
+
+ /* Digest the PE table we received */
+ processPEtids();
+}
+
+void
+processPEtids(void) {
+ long newPE;
+ nat i, sentPEs, currentPEs;
+
+ nPEs=0;
+
+ currentPEs = nPEs;
+
+ IF_PAR_DEBUG(verbose,
+ belch("==-- processPEtids: starting to iterate over a PVM buffer"));
+ /* ToDo: this has to go into LLComms !!! */
+ GetArgs(&sentPEs,1);
+
+ ASSERT(sentPEs > currentPEs);
+ ASSERT(sentPEs < MAX_PES); /* enforced by SysMan too*/
+
+ for (i = 0; i < sentPEs; i++) {
+ GetArgs(&newPE,1);
+ if (i<currentPEs) {
+ ASSERT(newPE == allPEs[i]);
+ } else {
+#if defined(DIST)
+ // breaks with PAR && !DEBUG
+ IF_PAR_DEBUG(verbose,
+ fprintf(stderr, "[%x] registering %d'th %x\n", mytid, i, newPE));
+ if(!looks_like_tid(newPE))
+ barf("unacceptable taskID %x\n",newPE);
+#endif
+ allPEs[i] = newPE;
+ nPEs++;
+ registerTask(newPE);
+ }
}
+
+ IF_PAR_DEBUG(verbose,
+ /* debugging */
+ belch("++++ [%x] PE table as I see it:", mytid);
+ for (i = 0; i < sentPEs; i++) {
+ belch("++++ allPEs[%d] = %x", i, allPEs[i]);
+ });
+}
+
+void
+stopPEComms(StgInt n) {
+ if (n != 0) {
+ /* In case sysman doesn't know about us yet...
+ pvm_initsend(PvmDataDefault);
+ PutArgs(&IAmMainThread,1);
+ pvm_send(SysManTask, PP_READY);
+ */
+ sendOp(PP_READY, SysManTask);
+ }
+
+ sendOp2(PP_FINISH, SysManTask, n, n);
+ waitForPEOp(PP_FINISH, SysManTask, NULL);
+ fflush(gr_file);
+ shutDownPE();
}
#endif /* PAR -- whole file */
//* spark queue:: @cindex\s-+spark queue
//* sparksIgnored:: @cindex\s-+sparksIgnored
//@end index
+