[project @ 2005-02-02 12:41:50 by simonmar]
[ghc-hetmet.git] / ghc / rts / parallel / ParInit.c
index 1a3abb5..22c9119 100644 (file)
@@ -1,6 +1,5 @@
 /* --------------------------------------------------------------------------
-   Time-stamp: <Fri Mar 24 2000 17:42:24 Stardate: [-30]4553.68 hwloidl>
-   $Id: ParInit.c,v 1.3 2000/03/31 03:09:37 hwloidl Exp $
+   Time-stamp: <Wed Mar 21 2001 16:37:16 Stardate: [-30]6363.46 hwloidl>
 
    Initialising the parallel RTS
 
@@ -14,8 +13,6 @@
 
 #ifdef PAR /* whole file */
 
-#define NON_POSIX_SOURCE /* so says Solaris */
-
 //@menu
 //* Includes::                 
 //* Global variables::         
 //@node Includes, Global variables
 //@subsection Includes
 
+/* Evidently not Posix */
+/* #include "PosixSource.h" */
+
+#include <setjmp.h>
 #include "Rts.h"
 #include "RtsFlags.h"
 #include "RtsUtils.h"
 #include "ParallelRts.h"
-#include <setjmp.h>
+#include "Sparks.h"
 #include "LLC.h"
 #include "HLC.h"
 
@@ -38,8 +39,7 @@
 
 /* Global conditions defined here. */
 
-rtsBool        IAmMainThread = rtsFalse,       /* Set for the main thread      */
-       GlobalStopPending = rtsFalse;   /* Terminating                  */
+rtsBool        IAmMainThread = rtsFalse;       /* Set for the main thread      */
 
 /* Task identifiers for various interesting global tasks. */
 
@@ -86,6 +86,8 @@ nat sparksIgnored = 0, sparksCreated = 0,
 //@cindex advisory_thread_count
 nat advisory_thread_count = 0;
 
+globalAddr theGlobalFromGA;
+
 /* For flag handling see RtsFlags.h */
 
 //@node Prototypes
@@ -106,24 +108,30 @@ time_t time (time_t *);
   terminate, since STG_Exception will call myexit\tr{(0)} when
   it received a PP_FINISH from the system manager task.
 */
-//@cindex par_exit
+//@cindex shutdownParallelSystem
 void
 shutdownParallelSystem(StgInt n)
 {
-  belch("==== entered shutdownParallelSystem ...");
-  ASSERT(GlobalStopPending = rtsTrue);
-  sendOp(PP_FINISH, SysManTask);
-  if (n != 0) 
-    waitForTermination();
-  else
-    waitForPEOp(PP_FINISH, SysManTask);
-  shutDownPE();
+  /* use the file specified via -S */ 
+  FILE *sf = RtsFlags.GcFlags.statsFile;
+
   IF_PAR_DEBUG(verbose,
-              belch("--++ shutting down PE %lx, %ld sparks created, %ld sparks Ignored, %ld threads created, %ld threads Ignored", 
-                    (W_) mytid, sparksCreated, sparksIgnored,
-                    threadsCreated, threadsIgnored));
-  if (n!=0)
-    exit(n);
+              if (n==0)
+                belch("==== entered shutdownParallelSystem ...");
+               else
+                belch("==== entered shutdownParallelSystem (ERROR %d)...", n);
+              );
+  
+  stopPEComms(n);
+
+#if 0
+  if (sf!=(FILE*)NULL) 
+    fprintf(sf, "PE %x: %u sparks created, %u sparks Ignored, %u threads created, %u threads Ignored", 
+           (W_) mytid, sparksCreated, sparksIgnored,
+           threadsCreated, threadsIgnored);
+#endif
+
+  ShutdownEachPEHook();
 }
 
 //@cindex initParallelSystem
@@ -133,10 +141,9 @@ initParallelSystem(void)
   /* Don't buffer standard channels... */
   setbuf(stdout,NULL);
   setbuf(stderr,NULL);
-
-  srand48(time(NULL) * getpid());  /*Initialise Random-number generator seed*/
-                                   /* Used to select target of FISH message*/
-
+  
+  srand48(time(NULL) * getpid()); /* Initialise Random-number generator seed*/
+                                  /* used to select target of FISH message*/
   if (!InitPackBuffer())
     barf("InitPackBuffer");
 
@@ -152,28 +159,152 @@ initParallelSystem(void)
  * manager, and initialises the Global address tables (LAGA & GALA)
  */
 
-//@cindex SynchroniseSystem
+//@cindex synchroniseSystem
 void
-SynchroniseSystem(void)
+synchroniseSystem(void)
 {
-  int i;
+  /* Only in debug mode? */
+  fprintf(stderr, "==== Starting parallel execution on %d processors ...\n", nPEs);
 
-  fprintf(stderr, "==== SynchroniseSystem: nPEs=%d\n", nPEs); 
+  InitEachPEHook();                  /* HWL: hook to be execed on each PE */
 
-  initEachPEHook();                  /* HWL: hook to be execed on each PE */
+  /* Initialize global address tables */
+  initGAtables();
 
-  fprintf(stderr, "==== SynchroniseSystem: initParallelSystem\n");
   initParallelSystem();
-  allPEs = startUpPE(nPEs);
+  
+  startPEComms();
+}
 
-  /* Initialize global address tables */
-  initGAtables();
+/* 
+  Do the startup stuff (this is PVM specific!).
+  Determines global vars: mytid, IAmMainThread, SysManTask, nPEs
+  Called at the beginning of RtsStartup.startupHaskell
+*/
+void 
+startupParallelSystem(char *argv[]) { 
+ mytid = pvm_mytid();          /* Connect to PVM */
+
+ if (*argv[0] == '-') {         /* Look to see whether we're the Main Thread */
+  IAmMainThread = rtsTrue;
+  sscanf(argv[0],"-%0X",&SysManTask);  /* extract SysMan task ID*/     
+  argv++;                             /* Strip off flag argument */
+ } else {
+  SysManTask = pvm_parent();
+ }
+
+ IF_PAR_DEBUG(verbose,
+              fprintf(stderr, "==== [%x] %s PE located SysMan at %x\n",
+                      mytid, IAmMainThread?"Main":"Remote", SysManTask));
+
+ nPEs = atoi(argv[1]);
+}
 
-  /* Record the shortened the PE identifiers for LAGA etc. tables */
-  for (i = 0; i < nPEs; ++i) {
-    fprintf(stderr, "==== [%x] registering %d-th PE as %x\n", mytid, i, allPEs[i]);
-    registerTask(allPEs[i]);
+/* 
+   Exception handler during startup.
+*/
+void *
+processUnexpectedMessageDuringStartup(rtsPacket p) {
+  OpCode opCode;
+  GlobalTaskId sender_id;
+
+  getOpcodeAndSender(p, &opCode, &sender_id);
+
+  switch(opCode) { 
+      case PP_FISH:
+        bounceFish();
+       break;
+#if defined(DIST)
+      case PP_REVAL:
+       bounceReval();
+       break;
+#endif
+      case PP_FINISH:
+        stg_exit(EXIT_SUCCESS);        
+       break;
+      default:
+       fprintf(stderr,"== Task %x: Unexpected OpCode %x (%s) from %x in startPEComms\n",
+               mytid, opCode, getOpName(opCode), sender_id);
+    }
+}
+
+void 
+startPEComms(void){ 
+
+  startUpPE(); 
+  allPEs = (GlobalTaskId *) stgMallocBytes(sizeof(GlobalTaskId) * MAX_PES,
+                                          "(PEs)");
+  
+  /* Send our tid and IAmMainThread flag back to SysMan */
+  sendOp1(PP_READY, SysManTask, (StgWord)IAmMainThread);  
+  /* Wait until we get the PE-Id table from Sysman */    
+  waitForPEOp(PP_PETIDS, SysManTask, processUnexpectedMessageDuringStartup); 
+
+  IF_PAR_DEBUG(verbose,
+               belch("==-- startPEComms: methinks we just received a PP_PETIDS message"));
+
+  /* Digest the PE table we received */
+  processPEtids();
+}
+
+void
+processPEtids(void) { 
+  long newPE;
+  nat i, sentPEs, currentPEs;
+
+  nPEs=0;
+         
+  currentPEs = nPEs;
+
+  IF_PAR_DEBUG(verbose,
+               belch("==-- processPEtids: starting to iterate over a PVM buffer"));
+  /* ToDo: this has to go into LLComms !!! */
+  GetArgs(&sentPEs,1);
+
+  ASSERT(sentPEs > currentPEs);
+  ASSERT(sentPEs < MAX_PES); /* enforced by SysMan too*/  
+  
+  for (i = 0; i < sentPEs; i++) { 
+    GetArgs(&newPE,1);
+    if (i<currentPEs) { 
+      ASSERT(newPE == allPEs[i]);
+    } else { 
+#if defined(DIST)
+      // breaks with PAR && !DEBUG
+      IF_PAR_DEBUG(verbose,
+       fprintf(stderr, "[%x] registering %d'th %x\n", mytid, i, newPE)); 
+      if(!looks_like_tid(newPE))
+         barf("unacceptable taskID %x\n",newPE);
+#endif
+      allPEs[i] = newPE;      
+      nPEs++;
+      registerTask(newPE); 
+    }
   }
+
+  IF_PAR_DEBUG(verbose,
+              /* debugging */
+              belch("++++ [%x] PE table as I see it:", mytid);
+              for (i = 0; i < sentPEs; i++) { 
+                belch("++++ allPEs[%d] = %x", i, allPEs[i]);
+               });
+}
+
+void 
+stopPEComms(StgInt n) { 
+  if (n != 0) { 
+    /* In case sysman doesn't know about us yet...
+    pvm_initsend(PvmDataDefault);
+    PutArgs(&IAmMainThread,1);
+    pvm_send(SysManTask, PP_READY);
+     */
+    sendOp(PP_READY, SysManTask);  
+  } 
+  
+  sendOp2(PP_FINISH, SysManTask, n, n);  
+  waitForPEOp(PP_FINISH, SysManTask, NULL);
+  fflush(gr_file);
+  shutDownPE();
 }
 
 #endif /* PAR -- whole file */
@@ -188,3 +319,4 @@ SynchroniseSystem(void)
 //* spark queue::  @cindex\s-+spark queue
 //* sparksIgnored::  @cindex\s-+sparksIgnored
 //@end index
+