[project @ 2003-08-20 12:55:14 by simonmar]
[ghc-hetmet.git] / ghc / rts / parallel / HLComms.c
index e4cb026..9435536 100644 (file)
@@ -1,16 +1,15 @@
 /* ----------------------------------------------------------------------------
- * Time-stamp: <Wed Mar 29 2000 19:35:36 Stardate: [-30]4578.87 hwloidl>
- * $Id: HLComms.c,v 1.3 2000/03/31 03:09:37 hwloidl Exp $
+ * Time-stamp: <Wed Mar 21 2001 16:34:41 Stardate: [-30]6363.45 hwloidl>
+ * $Id: HLComms.c,v 1.6 2001/08/14 13:40:10 sewardj Exp $
  *
  * High Level Communications Routines (HLComms.lc)
  *
  * Contains the high-level routines (i.e. communication
  * subsystem independent) used by GUM
  * 
- * Phil Trinder, Glasgow University, 12 December 1994
- * Adapted for new RTS
- * Phil Trinder, Simon Marlow July 1998
- * H-W. Loidl, Heriot-Watt University, November 1999
+ * GUM 0.2x: Phil Trinder, Glasgow University, 12 December 1994
+ * GUM 3.xx: Phil Trinder, Simon Marlow July 1998
+ * GUM 4.xx: H-W. Loidl, Heriot-Watt University, November 1999 -
  * 
  * ------------------------------------------------------------------------- */
 
@@ -32,9 +31,8 @@
 //@node Macros etc, Includes, High Level Communications Routines, High Level Communications Routines
 //@subsection Macros etc
 
-# ifndef _AIX
-# define NON_POSIX_SOURCE /* so says Solaris */
-# endif
+/* Evidently not Posix */
+/* #include "PosixSource.h" */
 
 //@node Includes, GUM Message Sending and Unpacking Functions, Macros etc, High Level Communications Routines
 //@subsection Includes
 #include "Parallel.h"
 #include "GranSimRts.h"
 #include "ParallelRts.h"
+#include "Sparks.h"
 #include "FetchMe.h"     // for BLOCKED_FETCH_info etc
 #if defined(DEBUG)
 # include "ParallelDebug.h"
 #endif
 #include "StgMacros.h" // inlined IS_... fcts
 
+#ifdef DIST
+#include "SchedAPI.h" //for createIOThread
+extern unsigned int context_switch; 
+#endif /* DIST */
+
 //@node GUM Message Sending and Unpacking Functions, Message-Processing Functions, Includes, High Level Communications Routines
 //@subsection GUM Message Sending and Unpacking Functions
 
@@ -97,8 +101,7 @@ sendFetch(globalAddr *rga, globalAddr *lga, int load)
 {
   ASSERT(rga->weight > 0 && lga->weight > 0);
   IF_PAR_DEBUG(fetch,
-              belch("** [%x] Sending Fetch for ((%x, %d, 0)); locally ((%x, %d, %x)), load = %d", 
-                    mytid,
+              belch("~^** Sending Fetch for ((%x, %d, 0)); locally ((%x, %d, %x)), load = %d", 
                     rga->payload.gc.gtid, rga->payload.gc.slot, 
                     lga->payload.gc.gtid, lga->payload.gc.slot, lga->weight,
                     load));
@@ -130,8 +133,7 @@ unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
   GetArgs(buf, 6); 
 
   IF_PAR_DEBUG(fetch,
-              belch("** [%x] Unpacking Fetch for ((%x, %d, 0)) to ((%x, %d, %x)), load = %d", 
-                    mytid,
+              belch("~^** Unpacking Fetch for ((%x, %d, 0)) to ((%x, %d, %x)), load = %d", 
                     (GlobalTaskId) buf[0], (int) buf[1], 
                     (GlobalTaskId) buf[3], (int) buf[4], buf[2], buf[5]));
 
@@ -166,16 +168,22 @@ unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
 
 //@cindex sendResume
 void
-sendResume(globalAddr *rga, int nelem, rtsPackBuffer *data) // StgPtr data)
+sendResume(globalAddr *rga, int nelem, rtsPackBuffer *packBuffer)
 {
-  IF_PAR_DEBUG(resume,
-              PrintPacket(data);
-              belch("[] [%x] Sending Resume for ((%x, %d, %x))", 
-                    mytid,
-                    rga->payload.gc.gtid, rga->payload.gc.slot, rga->weight));
+  IF_PAR_DEBUG(fetch,
+              belch("~^[] Sending Resume (packet <<%d>> with %d elems) for ((%x, %d, %x)) to [%x]", 
+                    packBuffer->id, nelem,
+                    rga->payload.gc.gtid, rga->payload.gc.slot, rga->weight,
+                    rga->payload.gc.gtid));
+  IF_PAR_DEBUG(packet,
+              PrintPacket(packBuffer));
+
+  ASSERT(nelem==packBuffer->size);
+  /* check for magic end-of-buffer word */
+  IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
 
   sendOpNV(PP_RESUME, rga->payload.gc.gtid, 
-          nelem + PACK_BUFFER_HDR_SIZE, (StgPtr)data, 
+          nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer, 
           2, (rtsWeight) rga->weight, (StgWord) rga->payload.gc.slot);
 }
 
@@ -186,17 +194,12 @@ sendResume(globalAddr *rga, int nelem, rtsPackBuffer *data) // StgPtr data)
 
 //@cindex unpackResume
 static void
-unpackResume(globalAddr *lga, int *nelem, rtsPackBuffer *data)
+unpackResume(globalAddr *lga, int *nelem, rtsPackBuffer *packBuffer)
 {
     long buf[3];
 
     GetArgs(buf, 3); 
 
-    IF_PAR_DEBUG(resume,
-                belch("[] [%x] Unpacking Resume for ((%x, %d, %x))", 
-                      mytid, mytid,
-                      (int) buf[1], (unsigned) buf[0]));
-
     /*
       RESUME event is written in awaken_blocked_queue
     DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(lga->payload.gc.gtid), 
@@ -207,9 +210,15 @@ unpackResume(globalAddr *lga, int *nelem, rtsPackBuffer *data)
     lga->payload.gc.gtid = mytid;
     lga->payload.gc.slot = (int) buf[1];
 
-    *nelem = (int) buf[2]; // includes PACK_BUFFER_HDR_SIZE;
-    GetArgs(data, *nelem);
-    *nelem -= PACK_BUFFER_HDR_SIZE;
+    *nelem = (int) buf[2] - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM;
+    GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM);
+
+    IF_PAR_DEBUG(fetch,
+                belch("~^[] Unpacking Resume (packet <<%d>> with %d elems) for ((%x, %d, %x))", 
+                      packBuffer->id, *nelem, mytid, (int) buf[1], (unsigned) buf[0]));
+
+    /* check for magic end-of-buffer word */
+    IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
 }
 
 /*
@@ -235,6 +244,9 @@ sendAck(GlobalTaskId task, int ngas, globalAddr *gagamap)
   long *p;
   int i;
 
+  if(ngas==0)
+    return; //don't send unnecessary messages!!
+  
   buffer = (long *) gumPackBuffer;
 
   for(i = 0, p = buffer; i < ngas; i++, p += 6) {
@@ -249,10 +261,10 @@ sendAck(GlobalTaskId task, int ngas, globalAddr *gagamap)
     gagamap++;
   }
   IF_PAR_DEBUG(schedule,
-              belch(",, [%x] Sending Ack (%d pairs) to PE %x\n", 
-                    mytid, ngas, task));
+              belch("~^,, Sending Ack (%d pairs) to [%x]\n", 
+                    ngas, task));
 
-  sendOpN(PP_ACK, task, p - buffer, buffer);
+  sendOpN(PP_ACK, task, p - buffer, (StgPtr)buffer);
 }
 
 /*
@@ -273,8 +285,8 @@ unpackAck(int *ngas, globalAddr *gagamap)
   *ngas = GAarraysize / 6;
   
   IF_PAR_DEBUG(schedule,
-              belch(",, [%x] Unpacking Ack (%d pairs) on %x\n", 
-                    mytid, *ngas, mytid));
+              belch("~^,, Unpacking Ack (%d pairs) on [%x]\n", 
+                    *ngas, mytid));
 
   while (GAarraysize > 0) {
     GetArgs(buf, 6);
@@ -310,8 +322,8 @@ sendFish(GlobalTaskId destPE, GlobalTaskId origPE,
         int age, int history, int hunger)
 {
   IF_PAR_DEBUG(fish,
-              belch("$$ [%x] Sending Fish to %x (%d outstanding fishes)", 
-                    mytid, destPE, outstandingFishes));
+              belch("~^$$ Sending Fish to [%x] (%d outstanding fishes)", 
+                    destPE, outstandingFishes));
 
   sendOpV(PP_FISH, destPE, 4, 
          (StgWord) origPE, (StgWord) age, (StgWord) history, (StgWord) hunger);
@@ -338,8 +350,8 @@ unpackFish(GlobalTaskId *origPE, int *age, int *history, int *hunger)
   GetArgs(buf, 4);
   
   IF_PAR_DEBUG(fish,
-              belch("$$ [%x] Unpacking Fish from PE %x (age=%d)", 
-                    mytid, (GlobalTaskId) buf[0], (int) buf[1]));
+              belch("~^$$ Unpacking Fish from [%x] (age=%d)", 
+                    (GlobalTaskId) buf[0], (int) buf[1]));
 
   *origPE = (GlobalTaskId) buf[0];
   *age = (int) buf[1];
@@ -364,8 +376,8 @@ void
 sendFree(GlobalTaskId pe, int nelem, StgPtr data)
 {
     IF_PAR_DEBUG(free,
-                belch("!! [%x] Sending Free (%d GAs) to %x", 
-                      mytid, nelem/2, pe));
+                belch("~^!! Sending Free (%d GAs) to [%x]", 
+                      nelem/2, pe));
 
     sendOpN(PP_FREE, pe, nelem, data);
 }
@@ -376,7 +388,7 @@ sendFree(GlobalTaskId pe, int nelem, StgPtr data)
  */
 //@cindex unpackFree
 static void
-unpackFree(int *nelem, rtsPackBuffer *data)
+unpackFree(int *nelem, StgWord *data)
 {
   long buf[1];
   
@@ -384,8 +396,8 @@ unpackFree(int *nelem, rtsPackBuffer *data)
   *nelem = (int) buf[0];
 
   IF_PAR_DEBUG(free,
-              belch("!! [%x] Unpacking Free (%d GAs)", 
-                    mytid, *nelem/2));
+              belch("~^!! Unpacking Free (%d GAs)", 
+                    *nelem/2));
 
   GetArgs(data, *nelem);
 }
@@ -406,14 +418,20 @@ unpackFree(int *nelem, rtsPackBuffer *data)
  */
 //@cindex sendSchedule
 void
-sendSchedule(GlobalTaskId origPE, int nelem, rtsPackBuffer *data) // StgPtr data)
+sendSchedule(GlobalTaskId origPE, int nelem, rtsPackBuffer *packBuffer) 
 {
   IF_PAR_DEBUG(schedule,
-              PrintPacket(data);
-              belch("-- [%x] Sending Schedule (%d elems) to %x\n", 
-                    mytid, nelem, origPE));
+              belch("~^-- Sending Schedule (packet <<%d>> with %d elems) to [%x]\n", 
+                    packBuffer->id, nelem, origPE));
+  IF_PAR_DEBUG(packet,
+              PrintPacket(packBuffer));
 
-  sendOpN(PP_SCHEDULE, origPE, nelem + PACK_BUFFER_HDR_SIZE, (StgPtr)data);
+  ASSERT(nelem==packBuffer->size);
+  /* check for magic end-of-buffer word */
+  IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
+
+  sendOpN(PP_SCHEDULE, origPE, 
+         nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer);
 }
 
 /*
@@ -424,22 +442,75 @@ sendSchedule(GlobalTaskId origPE, int nelem, rtsPackBuffer *data) // StgPtr data
 
 //@cindex unpackSchedule
 static void
-unpackSchedule(int *nelem, rtsPackBuffer *data)
+unpackSchedule(int *nelem, rtsPackBuffer *packBuffer)
 {
-    long buf[1];
+  long buf[1];
+
+  /* first, just unpack 1 word containing the total size (including header) */
+  GetArgs(buf, 1);
+  /* no. of elems, not counting the header of the pack buffer */
+  *nelem = (int) buf[0] - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM;
 
-    GetArgs(buf, 1);
-    /* no. of elems, not counting the header of the pack buffer */
-    *nelem = (int) buf[0] - PACK_BUFFER_HDR_SIZE;
+  /* automatic cast of flat pvm-data to rtsPackBuffer */
+  GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM);
 
-    IF_PAR_DEBUG(schedule,
-                belch("-- [%x] Unpacking Schedule (%d elems) on %x\n", 
-                      mytid, *nelem));
+  IF_PAR_DEBUG(schedule,
+              belch("~^-- Unpacking Schedule (packet <<%d>> with %d elems) on [%x]\n", 
+                    packBuffer->id, *nelem, mytid));
 
-    /* automatic cast of flat pvm-data to rtsPackBuffer */
-    GetArgs(data, *nelem + PACK_BUFFER_HDR_SIZE);
+  ASSERT(*nelem==packBuffer->size);
+  /* check for magic end-of-buffer word */
+  IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
 }
 
+#ifdef DIST
+/* sendReval is almost identical to the Schedule version, so we can unpack with unpackSchedule */
+void
+sendReval(GlobalTaskId origPE, int nelem, rtsPackBuffer *packBuffer) 
+{  
+  IF_PAR_DEBUG(schedule,
+              belch("~^-- Sending Reval (packet <<%d>> with %d elems) to [%x]\n", 
+                    packBuffer->id, nelem, origPE));
+  IF_PAR_DEBUG(packet,
+              PrintPacket(packBuffer));
+
+  ASSERT(nelem==packBuffer->size);
+  /* check for magic end-of-buffer word */
+  IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
+
+  sendOpN(PP_REVAL, origPE, 
+         nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer);
+}
+
+void FinishReval(StgTSO *t)
+{ StgClosure *res;
+  globalAddr ga;
+  nat size;
+  rtsPackBuffer *buffer=NULL;
+  
+  ga.payload.gc.slot = t->revalSlot;
+  ga.payload.gc.gtid = t->revalTid;
+  ga.weight = 0; 
+  
+  //find where the reval result is
+  res = GALAlookup(&ga);
+  ASSERT(res);
+  
+  IF_PAR_DEBUG(schedule,
+    printGA(&ga);
+    belch(" needs the result %08x\n",res));       
+  
+  //send off the result
+  buffer = PackNearbyGraph(res, END_TSO_QUEUE, &size,ga.payload.gc.gtid);
+  ASSERT(buffer != (rtsPackBuffer *)NULL);
+  sendResume(&ga, size, buffer);
+
+  IF_PAR_DEBUG(schedule,
+    belch("@;~) Reval Finished"));
+}
+
+#endif /* DIST */
+
 //@node Message-Processing Functions, GUM Message Processor, GUM Message Sending and Unpacking Functions, High Level Communications Routines
 //@subsection Message-Processing Functions
 
@@ -522,6 +593,12 @@ processFetches(void) {
       
       sendFetch(((StgFetchMe *)closure)->ga, &rga, 0 /* load */);
 
+      // Global statistics: count no. of fetches
+      if (RtsFlags.ParFlags.ParStats.Global &&
+         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+       globalParStats.tot_fetch_mess++;
+      }
+
       IF_PAR_DEBUG(fetch,
                   belch("__-> processFetches: Forwarding fetch from %lx to %lx",
                         mytid, rga.payload.gc.gtid));
@@ -541,17 +618,17 @@ processFetches(void) {
                   belch("__*> processFetches: PackNearbyGraph of closure %p (%s)",
                         closure, info_type(closure)));
 
-      if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size)) == NULL) {
+      if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.payload.gc.gtid)) == NULL) {
        // Put current BF back on list
        bf->link = (StgBlockingQueueElement *)PendingFetches;
        PendingFetches = (StgBlockedFetch *)bf;
        // ToDo: check that nothing more has to be done to prepare for GC!
        barf("processFetches: out of heap while packing graph; ToDo: call GC here");
-       GarbageCollect(GetRoots); 
+       GarbageCollect(GetRoots, rtsFalse); 
        bf = PendingFetches;
        PendingFetches = (StgBlockedFetch *)(bf->link);
        closure = bf->node;
-       packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size);
+       packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.payload.gc.gtid);
        ASSERT(packBuffer != (rtsPackBuffer *)NULL);
       }
       rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
@@ -559,6 +636,12 @@ processFetches(void) {
       rga.weight = bf->ga.weight;
       
       sendResume(&rga, size, packBuffer);
+
+      // Global statistics: count no. of fetches
+      if (RtsFlags.ParFlags.ParStats.Global &&
+         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+       globalParStats.tot_resume_mess++;
+      }
     }
   }
   PendingFetches = END_BF_QUEUE;
@@ -593,6 +676,36 @@ processTheRealFetches(void) {
 #endif
 
 
+/* 
+   Way of dealing with unwanted fish.
+   Used during startup/shutdown, or from unknown PEs 
+*/
+void
+bounceFish(void) { 
+  GlobalTaskId origPE;
+  int age, history, hunger;
+  
+  /* IF_PAR_DEBUG(verbose, */
+              belch(".... [%x] Bouncing unwanted FISH",mytid);
+
+  unpackFish(&origPE, &age, &history, &hunger);
+         
+  if (origPE == mytid) {
+    //fishing = rtsFalse;                   // fish has come home
+    outstandingFishes--;
+    last_fish_arrived_at = CURRENT_TIME;  // remember time (see schedule fct)
+    return;                               // that's all
+  }
+
+  /* otherwise, send it home to die */
+  sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
+  // Global statistics: count no. of fetches
+      if (RtsFlags.ParFlags.ParStats.Global &&
+         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+       globalParStats.tot_fish_mess++;
+      }
+}
+   
 /*
  * processFish unpacks a fish message, reissuing it if it's our own,
  * sending work if we have it or sending it onwards otherwise.
@@ -619,26 +732,36 @@ processFish(void)
   IF_PAR_DEBUG(fish,
               belch("$$__ processing fish; %d sparks available",
                     spark_queue_len(&(MainRegTable.rSparks))));
-  while ((spark = findSpark()) != NULL) {
+  while ((spark = findSpark(rtsTrue/*for_export*/)) != NULL) {
     nat size;
     // StgClosure *graph;
 
     packBuffer = gumPackBuffer; 
     ASSERT(closure_SHOULD_SPARK((StgClosure *)spark));
-    if ((packBuffer = PackNearbyGraph(spark, END_TSO_QUEUE, &size)) == NULL) {
+    if ((packBuffer = PackNearbyGraph(spark, END_TSO_QUEUE, &size,origPE)) == NULL) {
       IF_PAR_DEBUG(fish,
                   belch("$$ GC while trying to satisfy FISH via PackNearbyGraph of node %p",
                         (StgClosure *)spark));
       barf("processFish: out of heap while packing graph; ToDo: call GC here");
-      GarbageCollect(GetRoots);
+      GarbageCollect(GetRoots, rtsFalse);
       /* Now go back and try again */
     } else {
+      IF_PAR_DEBUG(verbose,
+                  if (RtsFlags.ParFlags.ParStats.Sparks)
+                    belch("==== STEALING spark %x; sending to %x", spark, origPE));
+      
       IF_PAR_DEBUG(fish,
                   belch("$$-- Replying to FISH from %x by sending graph @ %p (%s)",
                         origPE, 
                         (StgClosure *)spark, info_type((StgClosure *)spark)));
       sendSchedule(origPE, size, packBuffer);
       disposeSpark(spark);
+      // Global statistics: count no. of fetches
+      if (RtsFlags.ParFlags.ParStats.Global &&
+         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+       globalParStats.tot_schedule_mess++;
+      }
+
       break;
     }
   }
@@ -647,15 +770,25 @@ processFish(void)
                 belch("$$^^ No sparks available for FISH from %x",
                       origPE));
     /* We have no sparks to give */
-    if (age < FISH_LIFE_EXPECTANCY)
+    if (age < FISH_LIFE_EXPECTANCY) {
       /* and the fish is atill young, send it to another PE to look for work */
       sendFish(choosePE(), origPE,
               (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
 
-    /* otherwise, send it home to die */
-    else
+      // Global statistics: count no. of fetches
+      if (RtsFlags.ParFlags.ParStats.Global &&
+         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+       globalParStats.tot_fish_mess++;
+      }
+    } else { /* otherwise, send it home to die */
       sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
+      // Global statistics: count no. of fetches
+      if (RtsFlags.ParFlags.ParStats.Global &&
+         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+       globalParStats.tot_fish_mess++;
+      }
     }
+  }
 }  /* processFish */
 
 /*
@@ -685,13 +818,19 @@ processFetch(void)
   if (ip->type == FETCH_ME) {
     /* Forward the Fetch to someone else */
     sendFetch(((StgFetchMe *)closure)->ga, &rga, load);
+
+    // Global statistics: count no. of fetches
+    if (RtsFlags.ParFlags.ParStats.Global &&
+       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+      globalParStats.tot_fetch_mess++;
+    }
   } else if (rga.payload.gc.gtid == mytid) {
     /* Our own FETCH forwarded back around to us */
     StgFetchMeBlockingQueue *fmbq = (StgFetchMeBlockingQueue *)GALAlookup(&rga);
     
     IF_PAR_DEBUG(fetch,
                 belch("%%%%== Fetch returned to sending PE; closure=%p (%s); receiver=%p (%s)",
-                      closure, info_type(closure), fmbq, info_type(fmbq)));
+                      closure, info_type(closure), fmbq, info_type((StgClosure*)fmbq)));
     /* We may have already discovered that the fetch target is our own. */
     if ((StgClosure *)fmbq != closure) 
       CommonUp((StgClosure *)fmbq, closure);
@@ -700,33 +839,65 @@ processFetch(void)
     /* This includes RBH's and FMBQ's */
     StgBlockedFetch *bf;
 
+    /* Can we assert something on the remote GA? */
     ASSERT(GALAlookup(&rga) == NULL);
 
     /* If we're hitting a BH or RBH or FMBQ we have to put a BLOCKED_FETCH
        closure into the BQ in order to denote that when updating this node
        the result should be sent to the originator of this fetch message. */
     bf = (StgBlockedFetch *)createBlockedFetch(ga, rga);
-    blockFetch(bf, closure);
-
     IF_PAR_DEBUG(fetch,
                 belch("%%++ Blocking Fetch ((%x, %d, %x)) on %p (%s)",
                       rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight, 
                       closure, info_type(closure)));
-    } else {                   
-      /* The target of the FetchMe is some local graph */
-      nat size;
-      // StgClosure *graph;
-      rtsPackBuffer *buffer = (rtsPackBuffer *)NULL;
-
-      if ((buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size)) == NULL) {
-       barf("processFetch: out of heap while packing graph; ToDo: call GC here");
-       GarbageCollect(GetRoots); 
-       closure = GALAlookup(&ga);
-       buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size);
-       ASSERT(buffer != (rtsPackBuffer *)NULL);
-      }
-      sendResume(&rga, size, buffer);
+    blockFetch(bf, closure);
+  } else {                     
+    /* The target of the FetchMe is some local graph */
+    nat size;
+    // StgClosure *graph;
+    rtsPackBuffer *buffer = (rtsPackBuffer *)NULL;
+
+    if ((buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.payload.gc.gtid)) == NULL) {
+      barf("processFetch: out of heap while packing graph; ToDo: call GC here");
+      GarbageCollect(GetRoots, rtsFalse); 
+      closure = GALAlookup(&ga);
+      buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.payload.gc.gtid);
+      ASSERT(buffer != (rtsPackBuffer *)NULL);
     }
+    sendResume(&rga, size, buffer);
+
+    // Global statistics: count no. of fetches
+    if (RtsFlags.ParFlags.ParStats.Global &&
+       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+      globalParStats.tot_resume_mess++;
+    }
+  }
+}
+
+/* 
+   The list of pending fetches must be a root-list for GC.
+   This routine is called from GC.c (same as marking GAs etc).
+*/
+void
+markPendingFetches(rtsBool major_gc) {
+
+  /* No need to traverse the list; this is done via the scavenge code
+     for a BLOCKED_FETCH closure, which evacuates the link field */
+
+  if (PendingFetches != END_BF_QUEUE ) {
+    IF_PAR_DEBUG(tables,
+                fprintf(stderr, "@@@@ PendingFetches is root; evaced from %p to",
+                        PendingFetches));
+
+    PendingFetches = MarkRoot((StgClosure*)PendingFetches);
+
+    IF_PAR_DEBUG(verbose,
+                fprintf(stderr, " %p\n", PendingFetches));
+
+  } else {
+    IF_PAR_DEBUG(tables,
+                fprintf(stderr, "@@@@ PendingFetches is empty; no need to mark it\n"));
+  }
 }
 
 /*
@@ -778,13 +949,14 @@ processResume(GlobalTaskId sender)
   globalAddr lga;
   globalAddr *gagamap;
   
-  packBuffer = gumPackBuffer;
-  unpackResume(&lga, &nelem, (StgPtr)packBuffer);
+  packBuffer = (rtsPackBuffer *)gumPackBuffer;
+  unpackResume(&lga, &nelem, packBuffer);
 
-  IF_PAR_DEBUG(resume,
+  IF_PAR_DEBUG(fetch,
               fprintf(stderr, "[]__ Rcvd Resume for "); 
               printGA(&lga);
-              fputc('\n', stderr);
+              fputc('\n', stderr));
+  IF_PAR_DEBUG(packet,
               PrintPacket((rtsPackBuffer *)packBuffer));
   
   /* 
@@ -806,21 +978,40 @@ processResume(GlobalTaskId sender)
 
   old = GALAlookup(&lga);
 
+  /* ToDo:  The closure that requested this graph must be one of these two?*/
+  ASSERT(get_itbl(old)->type == FETCH_ME_BQ || 
+        get_itbl(old)->type == RBH);
+
   if (RtsFlags.ParFlags.ParStats.Full) {
-    // StgTSO *tso = END_TSO_QUEUE;
-    StgBlockingQueueElement *bqe;
+    StgBlockingQueueElement *bqe, *last_bqe;
+
+    IF_PAR_DEBUG(fetch,
+                belch("[]-- Resume is REPLY to closure %lx", old));
 
     /* Write REPLY events to the log file, indicating that the remote
-       data has arrived */
-    if (get_itbl(old)->type == FETCH_ME_BQ ||
-       get_itbl(old)->type == RBH) 
-      for (bqe = ((StgFetchMeBlockingQueue *)old)->blocking_queue;
-          bqe->link != END_BQ_QUEUE;
-          bqe = bqe->link)
-       if (get_itbl((StgClosure *)bqe)->type == TSO)
-         DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(sender), 
-                          GR_REPLY, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
-                          0, spark_queue_len(&(MainRegTable.rSparks)));
+       data has arrived 
+       NB: we emit a REPLY only for the *last* elem in the queue; this is
+           the one that triggered the fetch message; all other entries
+          have just added themselves to the queue, waiting for the data 
+          they know that has been requested (see entry code for FETCH_ME_BQ)
+    */
+    if ((get_itbl(old)->type == FETCH_ME_BQ ||
+        get_itbl(old)->type == RBH)) {
+      for (bqe = ((StgFetchMeBlockingQueue *)old)->blocking_queue,
+          last_bqe = END_BQ_QUEUE;
+            get_itbl(bqe)->type==TSO || 
+            get_itbl(bqe)->type==BLOCKED_FETCH;
+          last_bqe = bqe, bqe = bqe->link) { /* nothing */ }
+
+      ASSERT(last_bqe==END_BQ_QUEUE || 
+            get_itbl((StgClosure *)last_bqe)->type == TSO);
+
+      /* last_bqe now points to the TSO that triggered the FETCH */ 
+      if (get_itbl((StgClosure *)last_bqe)->type == TSO)
+       DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(sender), 
+                        GR_REPLY, ((StgTSO *)last_bqe), ((StgTSO *)last_bqe)->block_info.closure,
+                        0, spark_queue_len(&(MainRegTable.rSparks)));
+    }
   }
 
   newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
@@ -834,6 +1025,10 @@ processResume(GlobalTaskId sender)
   if (get_itbl(old)->type == FETCH_ME_BQ)
     CommonUp(old, newGraph);
 
+  IF_PAR_DEBUG(fetch,
+              belch("[]-- Ready to resume unpacked graph at %p (%s)",
+                    newGraph, info_type(newGraph)));
+
   IF_PAR_DEBUG(tables,
               DebugPrintGAGAMap(gagamap, nGAs));
   
@@ -850,7 +1045,7 @@ processResume(GlobalTaskId sender)
 static void
 processSchedule(GlobalTaskId sender)
 {
-  nat nelem, space_required, nGAs;
+  nat nelem, nGAs;
   rtsBool success;
   static rtsPackBuffer *packBuffer;
   StgClosure *newGraph;
@@ -859,8 +1054,9 @@ processSchedule(GlobalTaskId sender)
   packBuffer = gumPackBuffer;          /* HWL */
   unpackSchedule(&nelem, packBuffer);
 
+  IF_PAR_DEBUG(schedule,
+              belch("--__ Rcvd Schedule (%d elems)", nelem));
   IF_PAR_DEBUG(packet,
-              belch("--__ Rcvd Schedule (%d elems)", nelem);
               PrintPacket(packBuffer));
 
   /*
@@ -881,13 +1077,21 @@ processSchedule(GlobalTaskId sender)
   ASSERT(newGraph != NULL);
   success = add_to_spark_queue(newGraph, &(MainRegTable.rSparks));
 
-  IF_PAR_DEBUG(packet,
+  if (RtsFlags.ParFlags.ParStats.Full && 
+      RtsFlags.ParFlags.ParStats.Sparks && 
+      success) 
+    DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
+                    GR_STOLEN, ((StgTSO *)NULL), newGraph, 
+                    0, 0 /* spark_queue_len(ADVISORY_POOL) */);
+
+  IF_PAR_DEBUG(schedule,
               if (success)
-                belch("--^^ added spark to unpacked graph %p; %d sparks available on [%x]", 
-                    newGraph, spark_queue_len(&(MainRegTable.rSparks)), mytid);
+                belch("--^^  added spark to unpacked graph %p (%s); %d sparks available on [%x] (%s)", 
+                    newGraph, info_type(newGraph), spark_queue_len(&(MainRegTable.rSparks)), mytid);
               else
-                 belch("--^^ received non-sparkable closure %p; nothing added to spark pool; %d sparks available on [%x]", 
-                    newGraph, spark_queue_len(&(MainRegTable.rSparks)), mytid);
+                 belch("--^^  received non-sparkable closure %p (%s); nothing added to spark pool; %d sparks available on [%x]", 
+                    newGraph, info_type(newGraph), spark_queue_len(&(MainRegTable.rSparks)), mytid));
+  IF_PAR_DEBUG(packet,
               belch("*<    Unpacked graph with root at %p (%s):", 
                     newGraph, info_type(newGraph));
               PrintGraph(newGraph, 0));
@@ -895,8 +1099,7 @@ processSchedule(GlobalTaskId sender)
   IF_PAR_DEBUG(tables,
               DebugPrintGAGAMap(gagamap, nGAs));
 
-  if (nGAs > 0)
-    sendAck(sender, nGAs, gagamap);
+  sendAck(sender, nGAs, gagamap);
 
   //fishing = rtsFalse;
   ASSERT(outstandingFishes>0);
@@ -946,7 +1149,7 @@ processAck(void)
         ASSERT(get_itbl(old_closure)==RBH);
       */
       if (get_itbl(old_closure)->type==RBH)
-       convertToFetchMe(old_closure, ga);
+       convertToFetchMe((StgRBH *)old_closure, ga);
     } else {
       /* 
        * Oops...we've got this one already; update the RBH to
@@ -968,6 +1171,71 @@ processAck(void)
   IF_DEBUG(sanity, checkLAGAtable(rtsFalse));
 }
 
+#ifdef DIST
+
+void
+bounceReval(void) {  
+  barf("Task %x: TODO: should send NACK in response to REVAL",mytid);    
+}
+
+static void
+processReval(GlobalTaskId sender) //similar to schedule...
+{ nat nelem, space_required, nGAs;
+  static rtsPackBuffer *packBuffer;
+  StgClosure *newGraph;
+  globalAddr *gagamap;
+  StgTSO*     tso;
+  globalAddr *ga;
+  
+  packBuffer = gumPackBuffer;          /* HWL */
+  unpackSchedule(&nelem, packBuffer); /* okay, since the structure is the same */
+
+  IF_PAR_DEBUG(packet,
+              belch("@;~) [%x] Rcvd Reval (%d elems)", mytid, nelem);
+              PrintPacket(packBuffer));
+
+  /*
+  space_required = packBuffer[0];
+  if (SAVE_Hp + space_required >= SAVE_HpLim) {
+    ReallyPerformThreadGC(space_required, rtsFalse);
+    SAVE_Hp -= space_required;
+  }
+  */
+  
+  // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!
+  newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
+  ASSERT(newGraph != NULL);
+  
+  IF_PAR_DEBUG(packet,
+              belch("@;~)  Unpacked graph with root at %p (%s):", 
+                    newGraph, info_type(newGraph));
+              PrintGraph(newGraph, 0));
+
+  IF_PAR_DEBUG(tables,
+              DebugPrintGAGAMap(gagamap, nGAs));
+
+  IF_PAR_DEBUG(tables, 
+    printLAGAtable();   
+    DebugPrintGAGAMap(gagamap, nGAs));   
+
+  //We don't send an Ack to the head!!!!
+  ASSERT(nGAs>0);  
+  sendAck(sender, nGAs-1, gagamap+2);
+  
+  IF_PAR_DEBUG(verbose,
+              belch("@;~)  About to create Reval thread on behalf of %x", 
+                    sender));
+  
+  tso=createGenThread(RtsFlags.GcFlags.initialStkSize,newGraph);
+  tso->priority=RevalPriority;
+  tso->revalSlot=gagamap->payload.gc.slot;//record who sent the reval
+  tso->revalTid =gagamap->payload.gc.gtid;
+  scheduleThread(tso);
+  context_switch = 1; // switch at the earliest opportunity
+} 
+#endif
+
+
 //@node GUM Message Processor, Miscellaneous Functions, Message-Processing Functions, High Level Communications Routines
 //@subsection GUM Message Processor
 
@@ -982,58 +1250,125 @@ processAck(void)
  */
 
 //@cindex processMessages
-void
+rtsBool
 processMessages(void)
 {
   rtsPacket packet;
   OpCode opcode;
   GlobalTaskId task;
-    
+  rtsBool receivedFinish = rtsFalse;
+
   do {
     packet = GetPacket();  /* Get next message; block until one available */
     getOpcodeAndSender(packet, &opcode, &task);
 
-    switch (opcode) {
-    case PP_FINISH:
-      IF_PAR_DEBUG(verbose,
-                  belch("==== received FINISH [%p]", mytid));
-      /* setting this global variables eventually terminates the main
-         scheduling loop for this PE and causes a shut-down, sending 
-        PP_FINISH to SysMan */
-      GlobalStopPending = rtsTrue;
-      break;
-
-    case PP_FETCH:
-      processFetch();
-      break;
-
-    case PP_RESUME:
-      processResume(task);
-      break;
-
-    case PP_ACK:
-      processAck();
-      break;
-
-    case PP_FISH:
-      processFish();
-      break;
-
-    case PP_FREE:
-      processFree();
-      break;
+    if (task==SysManTask) { 
+      switch (opcode) { 
+      case PP_PETIDS:
+       processPEtids();
+       break;
+         
+      case PP_FINISH:
+       IF_PAR_DEBUG(verbose,
+                    belch("==== received FINISH [%p]", mytid));
+       /* this boolean value is returned and propagated to the main 
+          scheduling loop, thus shutting-down this PE */
+       receivedFinish = rtsTrue;
+       break;  
+         
+      default:  
+       barf("Task %x: received unknown opcode %x from SysMan",mytid, opcode);
+      }
+    } else if (taskIDtoPE(task)==0) { 
+      /* When a new PE joins then potentially FISH & REVAL message may
+        reach PES before they are notified of the new PEs existance.  The
+        only solution is to bounce/fail these messages back to the sender.
+        But we will worry about it once we start seeing these race
+        conditions!  */
+      switch (opcode) { 
+      case PP_FISH:
+       bounceFish();
+       break;
+#ifdef DIST      
+      case PP_REVAL:
+       bounceReval();
+       break;    
+#endif          
+      case PP_PETIDS:
+       belch("Task %x: Ignoring PVM session opened by another SysMan %x",mytid,task);
+       break;
+        
+      case PP_FINISH:   
+       break;
+       
+      default:  
+       belch("Task %x: Ignoring opcode %x from unknown PE %x",mytid, opcode, task);
+      }
+    } else
+      switch (opcode) {
+      case PP_FETCH:
+       processFetch();
+       // Global statistics: count no. of fetches
+       if (RtsFlags.ParFlags.ParStats.Global &&
+           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+         globalParStats.rec_fetch_mess++;
+       }
+       break;
+
+      case PP_RESUME:
+       processResume(task);
+       // Global statistics: count no. of fetches
+       if (RtsFlags.ParFlags.ParStats.Global &&
+           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+         globalParStats.rec_resume_mess++;
+       }
+       break;
+
+      case PP_ACK:
+       processAck();
+       break;
+
+      case PP_FISH:
+       processFish();
+       // Global statistics: count no. of fetches
+       if (RtsFlags.ParFlags.ParStats.Global &&
+           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+         globalParStats.rec_fish_mess++;
+       }
+       break;
+
+      case PP_FREE:
+       processFree();
+       break;
       
-    case PP_SCHEDULE:
-      processSchedule(task);
-      break;
-    
-    default:
-      /* Anything we're not prepared to deal with. */
-      barf("Task %x: Unexpected opcode %x from %x",
-          mytid, opcode, task);
-    } /* switch */
+      case PP_SCHEDULE:
+       processSchedule(task);
+       // Global statistics: count no. of fetches
+       if (RtsFlags.ParFlags.ParStats.Global &&
+           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+         globalParStats.rec_schedule_mess++;
+       }
+       break;
+      
+#ifdef DIST      
+      case PP_REVAL:
+       processReval(task);
+       // Global statistics: count no. of fetches
+       if (RtsFlags.ParFlags.ParStats.Global &&
+           RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+         globalParStats.rec_reval_mess++;
+       }
+       break;
+#endif
+      
+      default:
+       /* Anything we're not prepared to deal with. */
+       barf("Task %x: Unexpected opcode %x from %x",
+            mytid, opcode, task);
+      } /* switch */
 
   } while (PacketsWaiting());  /* While there are messages: process them */
+  return receivedFinish;
 }                              /* processMessages */
 
 //@node Miscellaneous Functions, Index, GUM Message Processor, High Level Communications Routines
@@ -1049,8 +1384,8 @@ blockFetch(StgBlockedFetch *bf, StgClosure *bh) {
   switch (get_itbl(bh)->type) {
   case BLACKHOLE:
     bf->link = END_BQ_QUEUE;
-    //((StgBlockingQueue *)bh)->header.info = &BLACKHOLE_BQ_info;
-    SET_INFO(bh, &BLACKHOLE_BQ_info);  // turn closure into a blocking queue
+    //((StgBlockingQueue *)bh)->header.info = &stg_BLACKHOLE_BQ_info;
+    SET_INFO(bh, &stg_BLACKHOLE_BQ_info); // turn closure into a blocking queue
     ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
     
     // put bh on the mutables list
@@ -1089,7 +1424,7 @@ blockFetch(StgBlockedFetch *bf, StgClosure *bh) {
         (StgClosure *)bh, get_itbl((StgClosure *)bh), 
         info_type((StgClosure *)bh));
   }
-  IF_PAR_DEBUG(schedule,
+  IF_PAR_DEBUG(bq,
               belch("##++ blockFetch: after block the BQ of %p (%s) is:",
                     bh, info_type(bh));
               print_bq(bh));
@@ -1097,7 +1432,7 @@ blockFetch(StgBlockedFetch *bf, StgClosure *bh) {
 
 
 /*
-  blockThread is called from the main scheduler whenever tso returns with
+  @blockThread@ is called from the main scheduler whenever tso returns with
   a ThreadBlocked return code; tso has already been added to a blocking
   queue (that's done in the entry code of the closure, because it is a 
   cheap operation we have to do in any case); the main purpose of this
@@ -1114,7 +1449,7 @@ blockFetch(StgBlockedFetch *bf, StgClosure *bh) {
 void
 blockThread(StgTSO *tso)
 {
-  globalAddr *remote_ga;
+  globalAddr *remote_ga=NULL;
   globalAddr *local_ga;
   globalAddr fmbq_ga;
 
@@ -1134,24 +1469,23 @@ blockThread(StgTSO *tso)
         end this point; if something (eg. GC) happens inbetween the whole
         thing will blow up 
         The problem is that the ga field of the FETCH_ME has been overwritten
-        with the head of the blocking (which is tso). 
+        with the head of the blocking queue (which is tso). 
       */
-      //ASSERT(looks_like_ga((globalAddr *)tso->link));
-      ASSERT(tso->link!=END_TSO_QUEUE && tso->link!=NULL);
-      remote_ga = (globalAddr *)tso->link; // ((StgFetchMe *)tso->block_info.closure)->ga;
-      tso->link = END_BQ_QUEUE;
+      ASSERT(looks_like_ga(&theGlobalFromGA));
+      // ASSERT(tso->link!=END_TSO_QUEUE && tso->link!=NULL);
+      remote_ga = &theGlobalFromGA; //tso->link;
+      tso->link = (StgTSO*)END_BQ_QUEUE;
       /* it was tso which turned node from FETCH_ME into FETCH_ME_BQ =>
         we have to send a Fetch message here! */
       if (RtsFlags.ParFlags.ParStats.Full) {
        /* Note that CURRENT_TIME may perform an unsafe call */
-       //rtsTime now = CURRENT_TIME; /* Now */
        tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
        tso->par.fetchcount++;
        tso->par.blockedat = CURRENT_TIME;
        /* we are about to send off a FETCH message, so dump a FETCH event */
        DumpRawGranEvent(CURRENT_PROC, 
                         taskIDtoPE(remote_ga->payload.gc.gtid),
-                        GR_FETCH, tso, tso->block_info.closure, 0);
+                        GR_FETCH, tso, tso->block_info.closure, 0, 0);
       }
       /* Phil T. claims that this was a workaround for a hard-to-find
        * bug, hence I'm leaving it out for now --SDM 
@@ -1159,10 +1493,18 @@ blockThread(StgTSO *tso)
       /* Assign a brand-new global address to the newly created FMBQ  */
       local_ga = makeGlobal(tso->block_info.closure, rtsFalse);
       splitWeight(&fmbq_ga, local_ga);
-      ASSERT(fmbq_ga.weight == 1L << (BITS_IN(unsigned) - 1));
+      ASSERT(fmbq_ga.weight == 1U << (BITS_IN(unsigned) - 1));
       
       sendFetch(remote_ga, &fmbq_ga, 0/*load*/);
 
+      // Global statistics: count no. of fetches
+      if (RtsFlags.ParFlags.ParStats.Global &&
+         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+       globalParStats.tot_fetch_mess++;
+      }
+
+      IF_DEBUG(sanity,
+              theGlobalFromGA.payload.gc.gtid = (GlobalTaskId)0);
       break;
 
     case BlockedOnGA_NoSend:
@@ -1173,42 +1515,58 @@ blockThread(StgTSO *tso)
       /* Fetch message has been sent already */
       if (RtsFlags.ParFlags.ParStats.Full) {
        /* Note that CURRENT_TIME may perform an unsafe call */
-       //rtsTime now = CURRENT_TIME; /* Now */
        tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
        tso->par.blockcount++;
        tso->par.blockedat = CURRENT_TIME;
        /* dump a block event, because fetch has been sent already */
        DumpRawGranEvent(CURRENT_PROC, thisPE,
-                        GR_BLOCK, tso, tso->block_info.closure, 0);
+                        GR_BLOCK, tso, tso->block_info.closure, 0, 0);
       }
       break;
 
+    case BlockedOnMVar:
     case BlockedOnBlackHole:
       /* the closure must be a BLACKHOLE_BQ or an RBH; tso came in here via 
         BLACKHOLE(_BQ) or CAF_BLACKHOLE or RBH entry code */
-      ASSERT(get_itbl(tso->block_info.closure)->type==BLACKHOLE_BQ ||
+      ASSERT(get_itbl(tso->block_info.closure)->type==MVAR ||
+            get_itbl(tso->block_info.closure)->type==BLACKHOLE_BQ ||
             get_itbl(tso->block_info.closure)->type==RBH);
 
       /* if collecting stats update the execution time etc */
       if (RtsFlags.ParFlags.ParStats.Full) {
        /* Note that CURRENT_TIME may perform an unsafe call */
-       //rtsTime now = CURRENT_TIME; /* Now */
        tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
        tso->par.blockcount++;
        tso->par.blockedat = CURRENT_TIME;
        DumpRawGranEvent(CURRENT_PROC, thisPE,
-                        GR_BLOCK, tso, tso->block_info.closure, 0);
+                        GR_BLOCK, tso, tso->block_info.closure, 0, 0);
       }
       break;
-      
+
+    case BlockedOnDelay:
+      /* Whats sort of stats shall we collect for an explicit threadDelay? */
+      IF_PAR_DEBUG(verbose,
+              belch("##++ blockThread: TSO %d blocked on ThreadDelay",
+                    tso->id));
+      break;
+
+    /* Check that the following is impossible to happen, indeed
+    case BlockedOnException:
+    case BlockedOnRead:
+    case BlockedOnWrite:
+    */
     default:
       barf("blockThread: impossible why_blocked code %d for TSO %d",
           tso->why_blocked, tso->id);
   }
 
-  IF_PAR_DEBUG(schedule,
-              belch("##++ blockThread: TSO %d blocked on closure %p (%s)",
-                    tso->id, tso->block_info.closure, info_type(tso->block_info.closure)));
+  IF_PAR_DEBUG(verbose,
+              belch("##++ blockThread: TSO %d blocked on closure %p (%s); %s",
+                    tso->id, tso->block_info.closure, info_type(tso->block_info.closure),
+                    (tso->why_blocked==BlockedOnGA) ? "Sent FETCH for GA" : ""));
+  
+  IF_PAR_DEBUG(bq,
+              print_bq(tso->block_info.closure));
 }
 
 /*
@@ -1246,16 +1604,16 @@ createBlockedFetch (globalAddr ga, globalAddr rga)
   StgClosure *closure;
 
   closure = GALAlookup(&ga);
-  if ((bf = (StgBlockedFetch *)allocate(FIXED_HS + sizeofW(StgBlockedFetch))) == NULL) {
+  if ((bf = (StgBlockedFetch *)allocate(_HS + sizeofW(StgBlockedFetch))) == NULL) {
     barf("createBlockedFetch: out of heap while allocating heap for a BlocekdFetch; ToDo: call GC here");
-    GarbageCollect(GetRoots); 
+    GarbageCollect(GetRoots, rtsFalse); 
     closure = GALAlookup(&ga);
-    bf = (StgBlockedFetch *)allocate(FIXED_HS + sizeofW(StgBlockedFetch));
+    bf = (StgBlockedFetch *)allocate(_HS + sizeofW(StgBlockedFetch));
     // ToDo: check whether really guaranteed to succeed 2nd time around
   }
 
-  ASSERT(bf != (StgClosure *)NULL);
-  SET_INFO((StgClosure *)bf, &BLOCKED_FETCH_info);
+  ASSERT(bf != (StgBlockedFetch *)NULL);
+  SET_INFO((StgClosure *)bf, &stg_BLOCKED_FETCH_info);
   // ToDo: check whether other header info is needed
   bf->node = closure;
   bf->ga.payload.gc.gtid = rga.payload.gc.gtid;
@@ -1265,10 +1623,10 @@ createBlockedFetch (globalAddr ga, globalAddr rga)
 
   IF_PAR_DEBUG(schedule,
               fprintf(stderr, "%%%%// created BF: bf=%p (%s) of closure , GA: ",
-                      bf, info_type(bf), closure);
+                      bf, info_type((StgClosure*)bf));
               printGA(&(bf->ga));
               fputc('\n',stderr));
-  return bf;
+  return (StgClosure *)bf;
 }
 
 /*
@@ -1281,7 +1639,7 @@ waitForTermination(void)
 {
   do {
     rtsPacket p = GetPacket();
-    processUnexpected(p);
+    processUnexpectedMessage(p);
   } while (rtsTrue);
 }
 
@@ -1304,7 +1662,7 @@ checkGAGAMap(globalAddr *gagamap, int nGAs)
 {
   nat i;
   
-  for (i = 0; i < nGAs; ++i, gagamap += 2) {
+  for (i = 0; i < (nat)nGAs; ++i, gagamap += 2) {
     ASSERT(looks_like_ga(gagamap));
     ASSERT(looks_like_ga(gagamap+1));
   }
@@ -1330,9 +1688,11 @@ prepareFreeMsgBuffers(void)
                                          "prepareFreeMsgBuffers (Buffer)");
     
     for(i = 0; i < nPEs; i++) 
-      if (i != thisPE) 
+      if (i != (thisPE-1)) 
        freeMsgBuffer[i] = (StgPtr) stgMallocWords(RtsFlags.ParFlags.packBufferSize,
                                               "prepareFreeMsgBuffers (Buffer #i)");
+      else
+       freeMsgBuffer[i] = 0;
   }
   
   /* Initialize the freeMsg buffer pointers to point to the start of their
@@ -1377,6 +1737,34 @@ sendFreeMessages(void)
       sendFree(allPEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
 }
 
+/* synchronises with the other PEs. Receives and records in a global
+ * variable the task-id of SysMan. If this is the main thread (discovered
+ * in main.lc), identifies itself to SysMan. Finally it receives
+ * from SysMan an array of the Global Task Ids of each PE, which is
+ * returned as the value of the function.
+ */
+
+#if defined(PAR_TICKY)
+/* Has to see freeMsgIndex, so must be defined here not in ParTicky.c */
+//@cindex stats_CntFreeGA
+void
+stats_CntFreeGA (void) {  // stats only
+
+  // Global statistics: residency of thread and spark pool
+  if (RtsFlags.ParFlags.ParStats.Global &&
+      RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+    nat i, s;
+  
+    globalParStats.cnt_free_GA++;
+    for (i = 0, s = 0; i < nPEs; i++) 
+      s += globalParStats.tot_free_GA += freeMsgIndex[i]/2;
+
+    if ( s > globalParStats.res_free_GA )
+      globalParStats.res_free_GA = s;
+  }
+}
+#endif /* PAR_TICKY */
+
 #endif /* PAR -- whole file */
 
 //@node Index,  , Miscellaneous Functions, High Level Communications Routines