/* --------------------------------------------------------------------------
- Time-stamp: <Sat Dec 04 1999 18:26:22 Stardate: [-30]3998.84 hwloidl>
- $Id: ParInit.c,v 1.2 2000/01/13 14:34:08 hwloidl Exp $
+ Time-stamp: <Wed Mar 21 2001 16:37:16 Stardate: [-30]6363.46 hwloidl>
Initialising the parallel RTS
#ifdef PAR /* whole file */
-#define NON_POSIX_SOURCE /* so says Solaris */
-
//@menu
//* Includes::
//* Global variables::
//@node Includes, Global variables
//@subsection Includes
+/* Evidently not Posix */
+/* #include "PosixSource.h" */
+
+#include <setjmp.h>
#include "Rts.h"
#include "RtsFlags.h"
#include "RtsUtils.h"
#include "ParallelRts.h"
-#include <setjmp.h>
+#include "Sparks.h"
#include "LLC.h"
#include "HLC.h"
/* Global conditions defined here. */
-rtsBool IAmMainThread = rtsFalse, /* Set for the main thread */
- GlobalStopPending = rtsFalse; /* Terminating */
+rtsBool IAmMainThread = rtsFalse; /* Set for the main thread */
/* Task identifiers for various interesting global tasks. */
see RtsFlags.ParFlags.maxLocalSparks */
nat spark_limit[SPARK_POOLS];
-globalAddr theGlobalFromGA, theGlobalToGA;
-/*
- HAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACK !! see FETCH_ME_entry
- Only used within FETCH_ME_entry as local vars, but they shouldn't
- be defined locally in there -- that would move %esp and you'll never
- return from STG land.
- -- HWL
-*/
-globalAddr *rga_GLOBAL;
-globalAddr *lga_GLOBAL;
-globalAddr fmbqga_GLOBAL;
-StgClosure *p_GLOBAL;
-
//@cindex PendingFetches
/* A list of fetch reply messages not yet processed; this list is filled
by awaken_blocked_queue and processed by processFetches */
//@cindex advisory_thread_count
nat advisory_thread_count = 0;
-/* Where to write the log file
- This is now in Parallel.c
-FILE *gr_file = NULL;
-char gr_filename[STATS_FILENAME_MAXLEN];
-*/
+globalAddr theGlobalFromGA;
-/* Flag handling. */
+/* For flag handling see RtsFlags.h */
-#if 0
-/* that's now all done via RtsFlags.ParFlags... */
-rtsBool TraceSparks = rtsFalse; /* Enable the spark trace mode */
-rtsBool SparkLocally = rtsFalse; /* Use local threads if possible */
-rtsBool DelaySparks = rtsFalse; /* Use delayed sparking */
-rtsBool LocalSparkStrategy = rtsFalse; /* Either delayed threads or local threads*/
-rtsBool GlobalSparkStrategy = rtsFalse; /* Export all threads */
-
-rtsBool DeferGlobalUpdates = rtsFalse; /* Defer updating of global nodes */
-#endif
+//@node Prototypes
+//@subsection Prototypes
+
+/* Needed for FISH messages (initialisation of random number generator) */
+void srand48 (long);
+time_t time (time_t *);
//@node Initialisation Routines, , Global variables
//@subsection Initialisation Routines
terminate, since STG_Exception will call myexit\tr{(0)} when
it received a PP_FINISH from the system manager task.
*/
-//@cindex par_exit
+//@cindex shutdownParallelSystem
void
shutdownParallelSystem(StgInt n)
{
- belch(" entered shutdownParallelSystem ...");
- ASSERT(GlobalStopPending = rtsTrue);
- sendOp(PP_FINISH, SysManTask);
- if (n != 0)
- waitForTermination();
- else
- waitForPEOp(PP_FINISH, SysManTask);
- shutDownPE();
+ /* use the file specified via -S */
+ FILE *sf = RtsFlags.GcFlags.statsFile;
+
IF_PAR_DEBUG(verbose,
- belch("--++ shutting down PE %lx, %ld sparks created, %ld sparks Ignored, %ld threads created, %ld threads Ignored",
- (W_) mytid, sparksCreated, sparksIgnored,
- threadsCreated, threadsIgnored));
- exit(n);
-}
+ if (n==0)
+ belch("==== entered shutdownParallelSystem ...");
+ else
+ belch("==== entered shutdownParallelSystem (ERROR %d)...", n);
+ );
+
+ stopPEComms(n);
-/* Some prototypes */
-void srand48 (long);
-time_t time (time_t *);
+#if 0
+ if (sf!=(FILE*)NULL)
+ fprintf(sf, "PE %x: %u sparks created, %u sparks Ignored, %u threads created, %u threads Ignored",
+ (W_) mytid, sparksCreated, sparksIgnored,
+ threadsCreated, threadsIgnored);
+#endif
+
+ ShutdownEachPEHook();
+}
//@cindex initParallelSystem
void
initParallelSystem(void)
{
- belch("entered initParallelSystem ...");
-
/* Don't buffer standard channels... */
setbuf(stdout,NULL);
setbuf(stderr,NULL);
+
+ srand48(time(NULL) * getpid()); /* Initialise Random-number generator seed*/
+ /* used to select target of FISH message*/
+ if (!InitPackBuffer())
+ barf("InitPackBuffer");
- srand48(time(NULL) * getpid()); /*Initialise Random-number generator seed*/
- /* Used to select target of FISH message*/
-
- theGlobalFromGA.payload.gc.gtid = 0;
- theGlobalToGA.payload.gc.gtid = 0;
-
- //IF_PAR_DEBUG(verbose,
- belch("initPackBuffer ...");
- if (!initPackBuffer())
- barf("initPackBuffer");
-
- // IF_PAR_DEBUG(verbose,
- belch("initMoreBuffers ...");
if (!initMoreBuffers())
barf("initMoreBuffers");
- // IF_PAR_DEBUG(verbose,
- belch("initSparkPools ...");
if (!initSparkPools())
barf("initSparkPools");
}
* manager, and initialises the Global address tables (LAGA & GALA)
*/
-//@cindex SynchroniseSystem
+//@cindex synchroniseSystem
void
-SynchroniseSystem(void)
+synchroniseSystem(void)
{
- int i;
+ /* Only in debug mode? */
+ fprintf(stderr, "==== Starting parallel execution on %d processors ...\n", nPEs);
- fprintf(stderr, "SynchroniseSystem: nPEs=%d\n", nPEs);
+ InitEachPEHook(); /* HWL: hook to be execed on each PE */
- initEachPEHook(); /* HWL: hook to be execed on each PE */
+ /* Initialize global address tables */
+ initGAtables();
- fprintf(stderr, "SynchroniseSystem: initParallelSystem\n");
initParallelSystem();
- allPEs = startUpPE(nPEs);
+
+ startPEComms();
+}
- /* Initialize global address tables */
- initGAtables();
+/*
+ Do the startup stuff (this is PVM specific!).
+ Determines global vars: mytid, IAmMainThread, SysManTask, nPEs
+ Called at the beginning of RtsStartup.startupHaskell
+*/
+void
+startupParallelSystem(char *argv[]) {
+ mytid = pvm_mytid(); /* Connect to PVM */
+
+ if (*argv[0] == '-') { /* Look to see whether we're the Main Thread */
+ IAmMainThread = rtsTrue;
+ sscanf(argv[0],"-%0X",&SysManTask); /* extract SysMan task ID*/
+ argv++; /* Strip off flag argument */
+ } else {
+ SysManTask = pvm_parent();
+ }
+
+ IF_PAR_DEBUG(verbose,
+ fprintf(stderr, "==== [%x] %s PE located SysMan at %x\n",
+ mytid, IAmMainThread?"Main":"Remote", SysManTask));
+
+ nPEs = atoi(argv[1]);
+}
+
+/*
+ Exception handler during startup.
+*/
+void *
+processUnexpectedMessageDuringStartup(rtsPacket p) {
+ OpCode opCode;
+ GlobalTaskId sender_id;
+
+ getOpcodeAndSender(p, &opCode, &sender_id);
+
+ switch(opCode) {
+ case PP_FISH:
+ bounceFish();
+ break;
+#if defined(DIST)
+ case PP_REVAL:
+ bounceReval();
+ break;
+#endif
+ case PP_FINISH:
+ stg_exit(EXIT_SUCCESS);
+ break;
+ default:
+ fprintf(stderr,"== Task %x: Unexpected OpCode %x (%s) from %x in startPEComms\n",
+ mytid, opCode, getOpName(opCode), sender_id);
+ }
+}
+
+void
+startPEComms(void){
+
+ startUpPE();
+ allPEs = (GlobalTaskId *) stgMallocBytes(sizeof(GlobalTaskId) * MAX_PES,
+ "(PEs)");
+
+ /* Send our tid and IAmMainThread flag back to SysMan */
+ sendOp1(PP_READY, SysManTask, (StgWord)IAmMainThread);
+ /* Wait until we get the PE-Id table from Sysman */
+ waitForPEOp(PP_PETIDS, SysManTask, processUnexpectedMessageDuringStartup);
+
+ IF_PAR_DEBUG(verbose,
+ belch("==-- startPEComms: methinks we just received a PP_PETIDS message"));
+
+ /* Digest the PE table we received */
+ processPEtids();
+}
- /* Record the shortened the PE identifiers for LAGA etc. tables */
- for (i = 0; i < nPEs; ++i) {
- fprintf(stderr, "[%x] registering %d-th PE as %x\n", mytid, i, allPEs[i]);
- registerTask(allPEs[i]);
+void
+processPEtids(void) {
+ long newPE;
+ nat i, sentPEs, currentPEs;
+
+ nPEs=0;
+
+ currentPEs = nPEs;
+
+ IF_PAR_DEBUG(verbose,
+ belch("==-- processPEtids: starting to iterate over a PVM buffer"));
+ /* ToDo: this has to go into LLComms !!! */
+ GetArgs(&sentPEs,1);
+
+ ASSERT(sentPEs > currentPEs);
+ ASSERT(sentPEs < MAX_PES); /* enforced by SysMan too*/
+
+ for (i = 0; i < sentPEs; i++) {
+ GetArgs(&newPE,1);
+ if (i<currentPEs) {
+ ASSERT(newPE == allPEs[i]);
+ } else {
+#if defined(DIST)
+ // breaks with PAR && !DEBUG
+ IF_PAR_DEBUG(verbose,
+ fprintf(stderr, "[%x] registering %d'th %x\n", mytid, i, newPE));
+ if(!looks_like_tid(newPE))
+ barf("unacceptable taskID %x\n",newPE);
+#endif
+ allPEs[i] = newPE;
+ nPEs++;
+ registerTask(newPE);
+ }
}
+
+ IF_PAR_DEBUG(verbose,
+ /* debugging */
+ belch("++++ [%x] PE table as I see it:", mytid);
+ for (i = 0; i < sentPEs; i++) {
+ belch("++++ allPEs[%d] = %x", i, allPEs[i]);
+ });
+}
+
+void
+stopPEComms(StgInt n) {
+ if (n != 0) {
+ /* In case sysman doesn't know about us yet...
+ pvm_initsend(PvmDataDefault);
+ PutArgs(&IAmMainThread,1);
+ pvm_send(SysManTask, PP_READY);
+ */
+ sendOp(PP_READY, SysManTask);
+ }
+
+ sendOp2(PP_FINISH, SysManTask, n, n);
+ waitForPEOp(PP_FINISH, SysManTask, NULL);
+ fflush(gr_file);
+ shutDownPE();
}
#endif /* PAR -- whole file */
//* spark queue:: @cindex\s-+spark queue
//* sparksIgnored:: @cindex\s-+sparksIgnored
//@end index
+