[project @ 2001-03-22 03:51:08 by hwloidl]
[ghc-hetmet.git] / ghc / rts / parallel / LLComms.c
index 3790890..84f5ff9 100644 (file)
@@ -1,14 +1,14 @@
 /* ----------------------------------------------------------------------------
- * Time-stamp: <Tue Mar 21 2000 20:23:41 Stardate: [-30]4539.24 hwloidl>
- * $Id: LLComms.c,v 1.3 2000/03/31 03:09:37 hwloidl Exp $
+ * Time-stamp: <Mon Mar 19 2001 22:10:38 Stardate: [-30]6354.62 hwloidl>
+ * $Id: LLComms.c,v 1.4 2001/03/22 03:51:11 hwloidl Exp $
  *
  * GUM Low-Level Inter-Task Communication
  *
  * This module defines PVM Routines for PE-PE  communication.
+ *
  * P. Trinder, December 5th. 1994.
- * Adapted for the new RTS 
  * P. Trinder, July 1998
- * H-W. Loidl, November 1999
+ * H-W. Loidl, November 1999 -
  --------------------------------------------------------------------------- */
 
 #ifdef PAR /* whole file */
@@ -227,7 +227,7 @@ sendOpNV(OpCode op, GlobalTaskId task, int nelem,
 
     traceSendOp(op, task, 0, 0);
     IF_PAR_DEBUG(trace,
-                fprintf(stderr,"sendOpNV: op = %x (%s), task = %x, narg = %d, nelem = %d",
+                fprintf(stderr,"~~ sendOpNV: op = %x (%s), task = %x, narg = %d, nelem = %d",
                       op, getOpName(op), task, narg, nelem));
 
     pvm_initsend(PvmDataRaw);
@@ -235,7 +235,7 @@ sendOpNV(OpCode op, GlobalTaskId task, int nelem,
     for (i = 0; i < narg; ++i) {
        arg = va_arg(ap, StgWord);
         IF_PAR_DEBUG(trace,
-                    fprintf(stderr,"sendOpNV: arg = %d\n",arg));
+                    fprintf(stderr,"~~ sendOpNV: arg = %d\n",arg));
        PutArgN(i, arg);
     }
     arg = (StgWord) nelem;
@@ -272,13 +272,32 @@ sendOpN(OpCode op, GlobalTaskId task, int n, StgPtr args)
     pvm_send(task, op);
 }
 
+/*    
+ * broadcastOpN is as sendOpN but broadcasts to all members of a group.
+ */
+
+void
+broadcastOpN(OpCode op, char *group, int n, StgPtr args)
+{
+  long arg;
+
+  //traceSendOp(op, task, 0, 0);
+  
+  pvm_initsend(PvmDataRaw);
+  arg = (long) n;
+  PutArgN(0, arg);
+  PutArgs(args, n);
+  pvm_bcast(group, op);
+}
+
 /*
- * waitForPEOp waits for a packet from global task {\em who} with the
- * OpCode {\em op}.  Other OpCodes are handled by processUnexpected.
+   waitForPEOp waits for a packet from global task who with the
+   OpCode op.  If ignore is true all other messages are simply ignored; 
+   otherwise they are handled by processUnexpected.
  */
 //@cindex waitForPEOp
 rtsPacket 
-waitForPEOp(OpCode op, GlobalTaskId who)
+waitForPEOp(OpCode op, GlobalTaskId who, void(*processUnexpected)(rtsPacket) )
 {
   rtsPacket p;
   int nbytes;
@@ -286,44 +305,52 @@ waitForPEOp(OpCode op, GlobalTaskId who)
   GlobalTaskId sender_id;
   rtsBool match;
 
-  do {
-    IF_PAR_DEBUG(verbose,
-                 fprintf(stderr,"waitForPEOp: op = %x (%s), who = %x\n", 
-                         op, getOpName(op), who)); 
+  IF_PAR_DEBUG(verbose,
+              fprintf(stderr,"~~ waitForPEOp: expecting op = %x (%s), who = [%x]\n", 
+                      op, getOpName(op), who)); 
 
+  do {
     while((p = pvm_recv(ANY_TASK,ANY_OPCODE)) < 0)
       pvm_perror("waitForPEOp: Waiting for PEOp");
       
     pvm_bufinfo( p, &nbytes, &opCode, &sender_id );
-    IF_PAR_DEBUG(verbose,
-                fprintf(stderr,"waitForPEOp: received: OpCode = %x, sender_id = %x",
-                      opCode, getOpName(opCode), sender_id)); 
-
     match = (op == ANY_OPCODE || op == opCode) && 
             (who == ANY_TASK || who == sender_id);
 
-    if (match)
+    if (match) {
+      IF_PAR_DEBUG(verbose,
+                  fprintf(stderr,
+                          "~~waitForPEOp: Qapla! received: OpCode = %#x (%s), sender_id = [%x]",
+                          opCode, getOpName(opCode), sender_id)); 
+
       return(p);
+    }
 
     /* Handle the unexpected OpCodes */
-    processUnexpected(p);
+    if (processUnexpected!=NULL) {
+      (*processUnexpected)(p);
+    } else {
+      IF_PAR_DEBUG(verbose,
+                  fprintf(stderr,
+                          "~~ waitForPEOp: ignoring OpCode = %#x (%s), sender_id = [%x]",
+                          opCode, getOpName(opCode), sender_id)); 
+    }
 
   } while(rtsTrue);
 }
 
 /*
- * processUnexpected processes unexpected messages. If the message is a
- * FINISH it exits the prgram, and PVM gracefully
+  processUnexpected processes unexpected messages. If the message is a
+  FINISH it exits the prgram, and PVM gracefully
  */
-//@cindex processUnexpected
+//@cindex processUnexpectedMessage
 void
-processUnexpected(rtsPacket packet)
-{
+processUnexpectedMessage(rtsPacket packet) {
     OpCode opCode = getOpcode(packet);
 
     IF_PAR_DEBUG(verbose,
                 GlobalTaskId sender = senderTask(packet); 
-                fprintf(stderr,"== [%x] processUnexpected: Received %x (%s), sender %x\n",
+                fprintf(stderr,"~~ [%x] processUnexpected: Received %x (%s), sender %x\n",
                       mytid, opCode, getOpName(opCode), sender)); 
 
     switch (opCode) {
@@ -335,12 +362,13 @@ processUnexpected(rtsPacket packet)
         are discarded during termination -- this helps prevent bizarre
         race conditions.  */
       default:
-       if (!GlobalStopPending) {
+       // if (!GlobalStopPending) 
+        {
          GlobalTaskId errorTask;
          OpCode opCode;
 
-         getOpcodeAndSender(packet,&opCode,&errorTask);
-         fprintf(stderr,"Task %x: Unexpected OpCode %x from %x in processUnexpected",
+         getOpcodeAndSender(packet, &opCode, &errorTask);
+         fprintf(stderr,"== Task %x: Unexpected OpCode %x from %x in processUnexpected",
                mytid, opCode, errorTask );
             
          stg_exit(EXIT_FAILURE);
@@ -355,7 +383,9 @@ getOpcode(rtsPacket p)
   int nbytes;
   OpCode OpCode;
   GlobalTaskId sender_id;
+  /* read PVM buffer */
   pvm_bufinfo(p, &nbytes, &OpCode, &sender_id);
+  /* return tag of the buffer as opcode */
   return(OpCode);
 }
 
@@ -364,6 +394,7 @@ void
 getOpcodeAndSender(rtsPacket p, OpCode *opCodep, GlobalTaskId *senderIdp)
 {
   int nbytes;
+  /* read PVM buffer */
   pvm_bufinfo(p, &nbytes, opCodep, senderIdp);
 }
 
@@ -374,66 +405,29 @@ senderTask(rtsPacket p)
   int nbytes;
   OpCode opCode;
   GlobalTaskId sender_id;
+  /* read PVM buffer */
   pvm_bufinfo(p, &nbytes, &opCode, &sender_id);
   return(sender_id);
 }
 
 /*
- * PEStartUp does the low-level comms specific startup stuff for a
- * PE. It initialises the comms system, joins the appropriate groups,
- * synchronises with the other PEs. Receives and records in a global
- * variable the task-id of SysMan. If this is the main thread (discovered
- * in main.lc), identifies itself to SysMan. Finally it receives
- * from SysMan an array of the Global Task Ids of each PE, which is
- * returned as the value of the function.
+ * startUpPE does the low-level comms specific startup stuff for a
+ * PE. It initialises the comms system, joins the appropriate groups
+ * allocates the PE buffer
  */
 
 //@cindex startUpPE
-GlobalTaskId *
-startUpPE(nat nPEs)
-{
-  int i;
-  rtsPacket addr;
-  long *buffer = (long *) stgMallocBytes(sizeof(long) * nPEs, 
-                                        "PEStartUp (buffer)");
-  GlobalTaskId *thePEs = (GlobalTaskId *) 
-    stgMallocBytes(sizeof(GlobalTaskId) * nPEs, 
-                  "PEStartUp (PEs)");
-
+void
+startUpPE(void)
+{ 
   mytid = _my_gtid;    /* Initialise PVM and get task id into global var.*/
-
+  
   IF_PAR_DEBUG(verbose,
               fprintf(stderr,"== [%x] PEStartup: Task id = [%x], No. PEs = %d \n", 
                       mytid, mytid, nPEs));
   checkComms(pvm_joingroup(PEGROUP), "PEStartup");
   IF_PAR_DEBUG(verbose,
               fprintf(stderr,"== [%x] PEStartup: Joined PEGROUP\n", mytid));
-  checkComms(pvm_joingroup(PECTLGROUP), "PEStartup");
-  IF_PAR_DEBUG(verbose,
-              fprintf(stderr,"== [%x] PEStartup: Joined PECTLGROUP\n", mytid));
-  checkComms(pvm_barrier(PECTLGROUP, nPEs+1), "PEStartup");
-  IF_PAR_DEBUG(verbose,
-              fprintf(stderr,"== [%x] PEStartup, Passed PECTLGROUP barrier\n", mytid));
-
-  addr = waitForPEOp(PP_SYSMAN_TID, ANY_GLOBAL_TASK);
-  SysManTask = senderTask(addr);
-  if (IAmMainThread) {         /* Main Thread Identifies itself to SysMan */
-    pvm_initsend(PvmDataDefault);
-    pvm_send(SysManTask, PP_MAIN_TASK);
-  } 
-  IF_PAR_DEBUG(verbose,
-              fprintf(stderr,"== [%x] Thread waits for %s\n", 
-                      mytid, getOpName(PP_PETIDS)));
-  addr = waitForPEOp(PP_PETIDS, ANY_GLOBAL_TASK);
-  GetArgs(buffer, nPEs);
-  for (i = 0; i < nPEs; ++i) {
-    thePEs[i] = (GlobalTaskId) buffer[i];
-    IF_PAR_DEBUG(verbose,
-                fprintf(stderr,"== [%x] PEStartup: PEs[%d] = %x \n", 
-                        mytid, i, thePEs[i])); 
-  }
-  free(buffer);
-  return thePEs;
 }
 
 /*
@@ -448,10 +442,28 @@ shutDownPE(void)
               fprintf(stderr, "== [%x] PEshutdown\n", mytid));
 
   checkComms(pvm_lvgroup(PEGROUP),"PEShutDown");
-  checkComms(pvm_lvgroup(PECTLGROUP),"PEShutDown");
   checkComms(pvm_exit(),"PEShutDown");
 }
 
+/* 
+   Extract the exit code out of a PP_FINISH packet (used in SysMan)
+*/
+int
+getExitCode(int nbytes, GlobalTaskId *sender_idp) {
+  int exitCode=0;
+
+  if (nbytes==4) {               // Notification from a task doing pvm_exit
+    GetArgs(sender_idp,1);       // Presumably this must be MainPE Id
+    exitCode = -1;
+  } else if (nbytes==8) {        // Doing a controlled shutdown
+    GetArgs(&exitCode,1);        // HACK: controlled shutdown == 2 values
+    GetArgs(&exitCode,1);
+  } else {
+    exitCode = -2;               // everything else
+  }
+  return exitCode;
+}
+
 #endif /* PAR -- whole file */
 
 //@node Index,  , Auxiliary functions, GUM Low-Level Inter-Task Communication
@@ -467,7 +479,7 @@ shutDownPE(void)
 //* sendOpNV::  @cindex\s-+sendOpNV
 //* sendOpN::  @cindex\s-+sendOpN
 //* waitForPEOp::  @cindex\s-+waitForPEOp
-//* processUnexpected::  @cindex\s-+processUnexpected
+//* processUnexpectedMessage::  @cindex\s-+processUnexpectedMessage
 //* getOpcode::  @cindex\s-+getOpcode
 //* getOpcodeAndSender::  @cindex\s-+getOpcodeAndSender
 //* senderTask::  @cindex\s-+senderTask