[project @ 2002-12-05 23:49:43 by mthomas]
[ghc-hetmet.git] / ghc / rts / parallel / Pack.c
index 72c66bf..75e12e0 100644 (file)
@@ -1,6 +1,6 @@
 /* 
-   Time-stamp: <Thu Mar 30 2000 22:53:32 Stardate: [-30]4584.56 hwloidl>
-   $Id: Pack.c,v 1.5 2000/08/07 23:37:24 qrczak Exp $
+   Time-stamp: <Wed Mar 21 2001 16:32:47 Stardate: [-30]6363.44 hwloidl>
+   $Id: Pack.c,v 1.8 2001/07/24 05:04:59 ken Exp $
 
    Graph packing and unpacking code for sending it to another processor
    and retrieving the original graph structure from the packet.
@@ -51,6 +51,8 @@
 #include "GranSimRts.h"
 #include "ParallelRts.h"
 # if defined(DEBUG)
+# include "Sanity.h"
+# include "Printer.h"
 # include "ParallelDebug.h"
 # endif
 #include "FetchMe.h"
@@ -147,10 +149,11 @@ static void       DonePacking(void);
 static void    AmPacking(StgClosure *closure);
 static int     OffsetFor(StgClosure *closure);
 static rtsBool  NotYetPacking(int offset);
-static rtsBool  RoomToPack (nat size, nat ptrs);
-       rtsBool  isOffset(globalAddr *ga);
-       rtsBool  isFixed(globalAddr *ga);
-       rtsBool  isConstr(globalAddr *ga);
+static inline rtsBool  RoomToPack (nat size, nat ptrs);
+static inline rtsBool  isOffset(globalAddr *ga);
+static inline rtsBool  isFixed(globalAddr *ga);
+static inline rtsBool  isConstr(globalAddr *ga);
+static inline rtsBool  isUnglobalised(globalAddr *ga);
 # elif defined(GRAN)
 static void     DonePacking(void);
 static rtsBool  NotYetPacking(StgClosure *closure);
@@ -167,6 +170,9 @@ static nat     pack_locn,           /* ptr to first free loc in pack buffer */
                buf_id = 1;          /* identifier for buffer */
 static nat     unpacked_size;
 static rtsBool roomInBuffer;
+#if defined(PAR)
+static GlobalTaskId dest_gtid=0;    /* destination for message to send */
+#endif
 
 /* 
    The pack buffer
@@ -220,6 +226,10 @@ static nat clq_pos, clq_size;
 
 static StgClosure **ClosureQueue = NULL;   /* HWL: init in main */
 
+#if defined(DEBUG)
+static char graphFingerPrint[MAX_FINGER_PRINT_LEN];
+#endif
+
 //@node Init routines, Basic routines, Closure Queues, ADT of Closure Queues
 //@subsubsection Init routines
 
@@ -236,7 +246,7 @@ InitClosureQueue(void)
                                                 "InitClosureQueue");
 }
 
-//@node Basic routines,  , Init routines, ADT of Closure Queues
+//@node Basic routines, Types of Global Addresses, Init routines, ADT of Closure Queues
 //@subsubsection Basic routines
 
 /*
@@ -257,11 +267,15 @@ static inline void
 QueueClosure(closure)
 StgClosure *closure;
 {
-  if(clq_size < RTS_PACK_BUFFER_SIZE )
+  if(clq_size < RTS_PACK_BUFFER_SIZE ) {
+    IF_PAR_DEBUG(paranoia,
+                belch(">__> <<%d>> Q: %p (%s); %d elems in q",
+                      globalPackBuffer->id, closure, info_type(closure), clq_size-clq_pos));
     ClosureQueue[clq_size++] = closure;
-  else
+  } else { 
     barf("Closure Queue Overflow (EnQueueing %p (%s))", 
         closure, info_type(closure));
+  }
 }
 
 /* DeQueueClosure returns the head of the closure queue. */
@@ -270,16 +284,22 @@ StgClosure *closure;
 static inline StgClosure * 
 DeQueueClosure(void)
 {
-  if(!QueueEmpty())
+  if(!QueueEmpty()) {
+    IF_PAR_DEBUG(paranoia,
+                belch(">__> <<%d>> DeQ: %p (%s); %d elems in q",
+                      globalPackBuffer->id, ClosureQueue[clq_pos], info_type(ClosureQueue[clq_pos]), 
+                      clq_size-clq_pos));
     return(ClosureQueue[clq_pos++]);
-  else
+  } else {
     return((StgClosure*)NULL);
+  }
 }
 
 /* DeQueueClosure returns the head of the closure queue. */
 
-//@cindex DeQueueClosure
-static inline StgClosure * 
+#if defined(DEBUG)
+//@cindex PrintQueueClosure
+static void
 PrintQueueClosure(void)
 {
   nat i;
@@ -287,9 +307,51 @@ PrintQueueClosure(void)
   fputs("Closure queue:", stderr);
   for (i=clq_pos; i < clq_size; i++)
     fprintf(stderr, "%p (%s), ", 
-           ClosureQueue[clq_pos++], info_type(ClosureQueue[clq_pos++]));
+           (StgClosure *)ClosureQueue[clq_pos++], 
+           info_type(ClosureQueue[clq_pos++]));
   fputc('\n', stderr);
 }
+#endif
+
+//@node Types of Global Addresses,  , Basic routines, ADT of Closure Queues
+//@subsubsection Types of Global Addresses
+
+/*
+  Types of Global Addresses
+
+  These routines determine whether a GA is one of a number of special types
+  of GA.
+*/
+
+# if defined(PAR)
+//@cindex isOffset
+static inline rtsBool 
+isOffset(globalAddr *ga)
+{
+    return (ga->weight == 1U && ga->payload.gc.gtid == (GlobalTaskId)0);
+}
+
+//@cindex isFixed
+static inline rtsBool
+isFixed(globalAddr *ga)
+{
+    return (ga->weight == 0U);
+}
+
+//@cindex isConstr
+static inline rtsBool
+isConstr(globalAddr *ga)
+{
+    return (ga->weight == 2U);
+}
+
+//@cindex isUnglobalised
+static inline rtsBool
+isUnglobalised(globalAddr *ga)
+{
+    return (ga->weight == 2U);
+}
+# endif
 
 //@node Initialisation for packing, Packing Functions, ADT of Closure Queues, Graph packing
 //@subsection Initialisation for packing
@@ -360,7 +422,7 @@ InitPackBuffer(void)
 {
   if (globalPackBuffer==(rtsPackBuffer*)NULL) {
     if ((globalPackBuffer = (rtsPackBuffer *) 
-        stgMallocWords(sizeofW(rtsPackBuffer)+RtsFlags.ParFlags.packBufferSize,
+        stgMallocWords(sizeofW(rtsPackBuffer)+RtsFlags.ParFlags.packBufferSize+DEBUG_HEADROOM,
                        "InitPackBuffer")) == NULL)
       return rtsFalse;
   }
@@ -427,21 +489,31 @@ InitPacking(rtsBool unpack)
 /* NB: this code is shared between GranSim and GUM;
        tso only used in GranSim */
 rtsPackBuffer *
-PackNearbyGraph(closure, tso, packBufferSize)
+PackNearbyGraph(closure, tso, packBufferSize, dest)
 StgClosure* closure;
 StgTSO* tso;
 nat *packBufferSize;
+GlobalTaskId dest;
 {
+  IF_PAR_DEBUG(resume,
+              graphFingerPrint[0] = '\0');
+
   ASSERT(RTS_PACK_BUFFER_SIZE > 0);
+  ASSERT(_HS==1);  // HWL HACK; compile time constant
+
+#if defined(PAR_TICKY) // HWL HAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACK
+  PAR_TICKY_PACK_NEARBY_GRAPH_START();
+#endif
 
   /* ToDo: check that we have enough heap for the packet
      ngoq ngo'
      if (Hp + PACK_HEAP_REQUIRED > HpLim) 
      return NULL;
   */
-
   InitPacking(rtsFalse);
-# if defined(GRAN)
+# if defined(PAR)
+  dest_gtid=dest; //-1 to disable
+# elif defined(GRAN)
   graph_root = closure;
 # endif
 
@@ -454,10 +526,12 @@ nat *packBufferSize;
                belch("** PrintGraph of %p is:", closure); 
                PrintGraph(closure,0));
 
-  IF_PAR_DEBUG(packet,
-              belch(">>> Packing <<%d>> (buffer @ %p); graph root @ %p [%x]\n    demanded by TSO %d (%p)",
+  IF_PAR_DEBUG(resume,
+              GraphFingerPrint(closure, graphFingerPrint);
+              ASSERT(strlen(graphFingerPrint)<=MAX_FINGER_PRINT_LEN);
+              belch(">>> Packing <<%d>> (buffer @ %p); graph root @ %p [%x]\n    demanded by TSO %d (%p); Finger-print is\n    {%s}",
                     globalPackBuffer->id, globalPackBuffer, closure, mytid,
-                    tso->id, tso)); 
+                    tso->id, tso, graphFingerPrint)); 
 
   IF_PAR_DEBUG(packet,
               belch("** PrintGraph of %p is:", closure); 
@@ -471,13 +545,15 @@ nat *packBufferSize;
   
 # if defined(PAR)
 
-  /* Record how much space is needed to unpack the graph */
-  globalPackBuffer->tso = tso; // ToDo: check: used in GUM or only for debugging?
+  /* Record how much space the graph needs in packet and in heap */
+  globalPackBuffer->tso = tso;       // currently unused, I think (debugging?)
   globalPackBuffer->unpacked_size = unpacked_size;
   globalPackBuffer->size = pack_locn;
 
-  /* Set the size parameter */
-  ASSERT(pack_locn <= RtsFlags.ParFlags.packBufferSize);
+  /* Check for buffer overflow (again) */
+  ASSERT(pack_locn <= RtsFlags.ParFlags.packBufferSize+DEBUG_HEADROOM);
+  IF_DEBUG(sanity,                           // write magic end-of-buffer word
+          globalPackBuffer->buffer[pack_locn] = END_OF_BUFFER_MARKER);
   *packBufferSize = pack_locn;
 
 # else  /* GRAN */
@@ -511,13 +587,18 @@ nat *packBufferSize;
   IF_GRAN_DEBUG(pack, PrintPacket(globalPackBuffer));
 # elif defined(PAR)
   IF_PAR_DEBUG(packet,
-               belch("** Finished <<%d>> packing graph %p; closures packed: %d; thunks packed: %d; size of graph: %d",
-                     globalPackBuffer->id, closure, globalPackBuffer->size, packed_thunks, globalPackBuffer->unpacked_size);
-               PrintPacket(globalPackBuffer));
+               belch("** Finished <<%d>> packing graph %p (%s); closures packed: %d; thunks packed: %d; size of graph: %d",
+                     globalPackBuffer->id, closure, info_type(closure),
+                     globalPackBuffer->size, packed_thunks, 
+                     globalPackBuffer->unpacked_size));;
 
   IF_DEBUG(sanity, // do a sanity check on the packet just constructed 
           checkPacket(globalPackBuffer));
 # endif   /* GRAN */
+
+#if defined(PAR_TICKY) // HWL HAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACK
+  PAR_TICKY_PACK_NEARBY_GRAPH_END(globalPackBuffer->size, packed_thunks);
+#endif
   
   return (globalPackBuffer);
 }
@@ -635,6 +716,9 @@ PackFetchMe(StgClosure *closure)
   StgInfoTable *ip;
   nat i;
   int offset;
+#if defined(DEBUG)
+  nat x = pack_locn;
+#endif
 
 #if defined(GRAN)
   barf("{PackFetchMe}Daq Qagh: no FetchMe closures in GRAN!");
@@ -645,12 +729,13 @@ PackFetchMe(StgClosure *closure)
                 belch("*>.. Packing FETCH_ME for closure %p (s) as offset to %d",
                       closure, info_type(closure), offset));
     PackOffset(offset);
-    unpacked_size += 2;
+    // unpacked_size += 0;   // unpacked_size unchanged (closure is shared!!)
     return;
   }
 
   /* Need a GA even when packing a constructed FETCH_ME (cruel world!) */
   AmPacking(closure);
+  /* FMs must be always globalised */
   GlobaliseAndPackGA(closure);
 
   IF_PAR_DEBUG(pack,
@@ -661,18 +746,84 @@ PackFetchMe(StgClosure *closure)
                     globalPackBuffer->buffer[pack_locn-3]));
 
   /* Pack a FetchMe closure instead of closure */
-  ip = &FETCH_ME_info;
+  ip = &stg_FETCH_ME_info;
   /* this assumes that the info ptr is always the first word in a closure*/
   Pack((StgWord)ip);
   for (i = 1; i < _HS; ++i)               // pack rest of fixed header
     Pack((StgWord)*(((StgPtr)closure)+i));
   
-  unpacked_size += PACK_FETCHME_SIZE;
+  unpacked_size += sizeofW(StgFetchMe);
+  /* size of FETCHME in packed is the same as that constant */
+  ASSERT(pack_locn-x==PACK_FETCHME_SIZE);
+  /* In the pack buffer the pointer to a GA (in the FetchMe closure) 
+     is expanded to the full GA; this is a compile-time const */
+  //ASSERT(PACK_FETCHME_SIZE == sizeofW(StgFetchMe)-1+PACK_GA_SIZE);  
 #endif
 }
 
 #endif
 
+#ifdef DIST
+static void
+PackRemoteRef(StgClosure *closure)
+{
+  StgInfoTable *ip;
+  nat i;
+  int offset;
+
+  offset = OffsetFor(closure);
+  if (!NotYetPacking(offset)) {
+    PackOffset(offset);
+    unpacked_size += 2;
+    return;
+  }
+
+  /* Need a GA even when packing a constructed REMOTE_REF (cruel world!) */
+  AmPacking(closure);
+  
+  /* basically we just Globalise, but for sticky things we can't have multiple GAs,
+     so we must prevent the GAs being split.
+     
+     In returning things to the true sticky owner, this case is already handled, but for
+     anything else we just give up at the moment... This needs to be fixed! 
+  */
+  { globalAddr *ga;
+    ga = LAGAlookup(closure); // surely this ga must exist?
+    
+    // ***************************************************************************
+    // ***************************************************************************
+    // REMOTE_REF HACK - dual is in SetGAandCommonUp
+    // - prevents the weight from ever reaching zero
+    if(ga != NULL) 
+      ga->weight=0x06660666; //anything apart from 0 really...
+    // ***************************************************************************
+    // ***************************************************************************
+    
+    if((ga != NULL)&&(ga->weight / 2 <= 2))
+      barf("Cant split the weight any further when packing REMOTE_REF for closure %p (%s) with GA: ((%x, %d, %x))",
+               closure, info_type(closure), 
+               ga->payload.gc.gtid, ga->payload.gc.slot, ga->weight);                               
+  } 
+  GlobaliseAndPackGA(closure);
+      
+  IF_PAR_DEBUG(pack,
+              belch("*>.. Packing REMOTE_REF for closure %p (%s) with GA: ((%x, %d, %x))",
+                    closure, info_type(closure), 
+                    globalPackBuffer->buffer[pack_locn-2],
+                    globalPackBuffer->buffer[pack_locn-1],
+                    globalPackBuffer->buffer[pack_locn-3]));
+
+  /* Pack a REMOTE_REF closure instead of closure */
+  ip = &stg_REMOTE_REF_info;
+  /* this assumes that the info ptr is always the first word in a closure*/
+  Pack((StgWord)ip);
+  for (i = 1; i < _HS; ++i)               // pack rest of fixed header
+    Pack((StgWord)*(((StgPtr)closure)+i));
+  
+  unpacked_size += PACK_FETCHME_SIZE;
+}
+#endif /* DIST */
+
 //@node Packing Closures,  , Packing Sections of Nearby Graph, Packing Functions
 //@subsubsection Packing Closures
 /*
@@ -701,7 +852,6 @@ PackClosure(closure)
 StgClosure *closure;
 {
   StgInfoTable *info;
-  StgClosure *indirectee;
   nat clpack_locn;
 
   ASSERT(LOOKS_LIKE_GHC_INFO(get_itbl(closure)));
@@ -722,37 +872,32 @@ StgClosure *closure;
   switch (info->type) {
 
   case CONSTR_CHARLIKE:
-    {
-      StgChar val = ((StgIntCharlikeClosure*)closure)->data;
-      
-      if ((val <= MAX_CHARLIKE) && (val >= MIN_CHARLIKE)) {
-       IF_PAR_DEBUG(pack,
-                    belch("*>^^ Packing a small charlike %d as a PLC", val));
-       PackPLC((StgPtr)CHARLIKE_CLOSURE(val));
-      } else {
-       IF_PAR_DEBUG(pack,
-                    belch("*>^^ Packing a big charlike %d as a normal closure", 
-                          val));
-       PackGeneric(closure);
-      }
-      return;
-    }
+    IF_PAR_DEBUG(pack,
+                belch("*>^^ Packing a charlike closure %d", 
+                      ((StgIntCharlikeClosure*)closure)->data));
+    
+    PackPLC((StgPtr)CHARLIKE_CLOSURE(((StgIntCharlikeClosure*)closure)->data));
+    // NB: unpacked_size of a PLC is 0
+    return;
       
   case CONSTR_INTLIKE:
     {
       StgInt val = ((StgIntCharlikeClosure*)closure)->data;
-      
+
       if ((val <= MAX_INTLIKE) && (val >= MIN_INTLIKE)) {
        IF_PAR_DEBUG(pack,
-                    belch("*>^^ Packing a small intlike %d as a PLC", val));
+                    belch("*>^^ Packing a small intlike %d as a PLC", 
+                          val));
        PackPLC((StgPtr)INTLIKE_CLOSURE(val));
+       // NB: unpacked_size of a PLC is 0
+       return;
       } else {
        IF_PAR_DEBUG(pack,
                     belch("*>^^ Packing a big intlike %d as a normal closure", 
                           val));
        PackGeneric(closure);
+       return;
       }
-      return;
     }
 
   case CONSTR:
@@ -774,10 +919,11 @@ StgClosure *closure;
   case CONSTR_NOCAF_STATIC:// For now we ship indirections to CAFs: They are
                           // evaluated on each PE if needed
     IF_PAR_DEBUG(pack,
-      belch("*>~~ Packing a %p (%s) as a PLC", 
-           closure, info_type(closure)));
+                belch("*>~~ Packing a %p (%s) as a PLC", 
+                      closure, info_type(closure)));
 
     PackPLC((StgPtr)closure);
+    // NB: unpacked_size of a PLC is 0
     return;
 
   case THUNK_SELECTOR: 
@@ -830,8 +976,6 @@ StgClosure *closure;
     PackPAP((StgPAP *)closure);
     return;
 
-  case CAF_UNENTERED:
-  case CAF_ENTERED:
   case CAF_BLACKHOLE:
   case BLACKHOLE:
   case BLACKHOLE_BQ:
@@ -852,23 +996,30 @@ StgClosure *closure;
     PackFetchMe(closure);
     return;
 
-  case MVAR:
-    barf("*>   Pack: packing of MVARs not implemented",
-                      closure, info_type(closure));
-        
-    /* MVARs may not be copied; they are sticky objects in the new RTS */
-    /* therefore we treat them just as RBHs etc (what a great system!) 
-    IF_PAR_DEBUG(pack,
-                belch("** Found an MVar at %p (%s)", 
-                closure, info_type(closure))); */
+#ifdef DIST    
+  case REMOTE_REF:
     IF_PAR_DEBUG(pack,
-                belch("*>.. Packing an MVAR at %p (%s) as a FETCH_ME", 
+                belch("*>.. Packing %p (%s) as a REMOTE_REF", 
                       closure, info_type(closure)));
-    /* NB: in case of a FETCH_ME this might build up a chain of FETCH_MEs;
-           phps short-cut the GA here */
-    PackFetchMe(closure);
-    return;
+    PackRemoteRef(closure);
+    /* we hopefully don't end up with a chain of REMOTE_REFs!!!!!!!!!! */
 
+    return;
+#endif  
+    
+  case TSO:
+  case MVAR:
+#ifdef DIST
+          IF_PAR_DEBUG(pack,
+                belch("*>.. Packing %p (%s) as a RemoteRef", 
+                      closure, info_type(closure)));
+    PackRemoteRef(closure);
+#else
+    barf("{Pack}Daq Qagh: Only GdH can pack %p (%s)", 
+        closure, info_type(closure));
+#endif    
+    return;
+    
   case ARR_WORDS:
     PackArray(closure);
     return;
@@ -911,7 +1062,6 @@ StgClosure *closure;
         closure, info_type(closure));
     /* never reached */
 
-  case TSO:
   case BLOCKED_FETCH:
   case EVACUATED:
     /* something's very wrong */
@@ -1019,9 +1169,21 @@ PackGeneric(StgClosure *closure)
   ASSERT(!IS_BLACK_HOLE(closure));
 
   IF_PAR_DEBUG(pack,
-              fprintf(stderr, "*>== generic packing of %p (%s) (size=%d, ptrs=%d, nonptrs=%d)\n",
+              fprintf(stderr, "*>== %p (%s): generic packing (size=%d, ptrs=%d, nonptrs=%d)\n",
                       closure, info_type(closure), size, ptrs, nonptrs));
 
+  /* packing strategies: how many thunks to add to a packet; 
+     default is infinity i.e. RtsFlags.ParFlags.thunksToPack==0 */
+  if (RtsFlags.ParFlags.thunksToPack &&
+      packed_thunks >= RtsFlags.ParFlags.thunksToPack &&
+      closure_THUNK(closure)) {
+    IF_PAR_DEBUG(pack,
+                belch("*>&& refusing to pack more than %d thunks per packet; packing FETCH_ME for closure %p (%s)",
+                      packed_thunks, closure, info_type(closure)));
+    PackFetchMe(closure);
+    return;
+  }
+
   /* Primitive arrays have gone; now we have (MUT_)ARR_WORDS etc */
 
   if (!RoomToPack(PACK_GA_SIZE + _HS + vhs + nonptrs, ptrs)) {
@@ -1035,14 +1197,21 @@ PackGeneric(StgClosure *closure)
   /* Record the location of the GA */
   AmPacking(closure);
   /* Allocate a GA for this closure and put it into the buffer */
-  GlobaliseAndPackGA(closure);
+  /* Checks for globalisation scheme; default: globalise everything thunks */
+  if ( RtsFlags.ParFlags.globalising == 0 || 
+       (closure_THUNK(closure) && !closure_UNPOINTED(closure)) )
+    GlobaliseAndPackGA(closure);
+  else
+    Pack((StgWord)2);  // marker for unglobalised closure
+
 
   ASSERT(!(info->type == ARR_WORDS || info->type == MUT_ARR_PTRS ||
           info->type == MUT_ARR_PTRS_FROZEN || info->type == MUT_VAR));
 
   /* At last! A closure we can actually pack! */
-  if (ip_MUTABLE(info) && (info->type != FETCH_ME))
-    barf("*>// PackClosure: trying to replicate a Mutable closure!");
+  if (ip_MUTABLE(info) && ((info->type != FETCH_ME)||(info->type != REMOTE_REF)))
+    barf("*>// %p (%s) PackClosure: trying to replicate a Mutable closure!",
+        closure, info_type(closure));
       
   /* 
      Remember, the generic closure layout is as follows:
@@ -1072,7 +1241,7 @@ PackGeneric(StgClosure *closure)
   }
 
   unpacked_size += size;
-  // unpacked_size += (size < MIN_UPD_SIZE) ? MIN_UPD_SIZE : size;
+  //unpacked_size += (size < MIN_UPD_SIZE) ? MIN_UPD_SIZE : size;
 
   /*
    * Record that this is a revertable black hole so that we can fill in
@@ -1081,11 +1250,13 @@ PackGeneric(StgClosure *closure)
    * ACK.
    */
       
-  // IS_UPDATABLE(closure) == !closure_UNPOINTED(closure) !? HWL
   if (closure_THUNK(closure) && !closure_UNPOINTED(closure)) { 
     rbh = convertToRBH(closure);
-    ASSERT(rbh == closure); // rbh at the same position (minced version)
+    ASSERT(size>=_HS+MIN_UPD_SIZE); // min size for any updatable closure
+    ASSERT(rbh == closure);         // rbh at the same position (minced version)
     packed_thunks++;
+  } else if ( closure==graph_root ) {
+    packed_thunks++;                // root of graph is counted as a thunk
   }
 }
 /*
@@ -1098,31 +1269,49 @@ static void
 PackArray(StgClosure *closure)
 {
   StgInfoTable *info;
-  nat size, ptrs, nonptrs, vhs, i, n;
+  nat size, ptrs, nonptrs, vhs;
+  nat i, n;
   char str[80];
 
-#if DEBUG
-  /* we don't really need all that get_closure_info delivers; however, for
-     debugging it's useful to have the stuff anyway */
-
   /* get info about basic layout of the closure */
   info = get_closure_info(closure, &size, &ptrs, &nonptrs, &vhs, str);
 
   ASSERT(info->type == ARR_WORDS || info->type == MUT_ARR_PTRS ||
         info->type == MUT_ARR_PTRS_FROZEN || info->type == MUT_VAR);
-#endif
-  /* record offset of the closure and allocate a GA */
-  AmPacking(closure);
-  GlobaliseAndPackGA(closure);
 
   n = ((StgArrWords *)closure)->words;
   // this includes the header!: arr_words_sizeW(stgCast(StgArrWords*,q)); 
 
   IF_PAR_DEBUG(pack,
-              belch("*>== packing an array of %d words %p (%s) (size=%d)\n",
-                    n, closure, info_type(closure), 
+              belch("*>== %p (%s): packing an array of %d words (size=%d)\n",
+                    closure, info_type(closure), n,
                     arr_words_sizeW((StgArrWords *)closure)));
 
+  /* check that we have enough room in the pack buffer */
+  if (!RoomToPack(PACK_GA_SIZE + _HS + vhs + nonptrs, ptrs)) {
+    IF_PAR_DEBUG(pack,
+                belch("*>&& pack buffer is full; packing FETCH_ME for closure %p (%s)",
+                      closure, info_type(closure)));
+    PackFetchMe(closure);
+    return;
+  }
+
+  /* global stats about arrays sent */
+  if (RtsFlags.ParFlags.ParStats.Global &&
+      RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+    globalParStats.tot_arrs++;
+    globalParStats.tot_arr_size += ((StgArrWords *)closure)->words;
+  }
+
+  /* record offset of the closure and allocate a GA */
+  AmPacking(closure);
+  /* Checks for globalisation scheme; default: globalise everything thunks */
+  if ( RtsFlags.ParFlags.globalising == 0 || 
+       (closure_THUNK(closure) && !closure_UNPOINTED(closure)) )
+    GlobaliseAndPackGA(closure);
+  else
+    Pack((StgWord)2);  // marker for unglobalised closure
+
   /* Pack the header (2 words: info ptr and the number of words to follow) */
   Pack((StgWord)*(StgPtr)closure);
   Pack(((StgArrWords *)closure)->words);
@@ -1147,33 +1336,52 @@ PackArray(StgClosure *closure)
 //@cindex PackPAP
 static void
 PackPAP(StgPAP *pap) {
-  nat m, n, i, j, pack_start;
-  StgPtr p, q,  end/*dbg*/;
+  nat n, i, j, pack_start;
+  StgPtr p, q;
   const StgInfoTable* info;
-  StgWord32 bitmap;
+  StgWord bitmap;
   /* debugging only */
+  StgPtr end;
   nat size, ptrs, nonptrs, vhs;
   char str[80];
+  nat unpacked_size_before_PAP, FMs_in_PAP=0; // debugging only
 
   /* This is actually a setup invariant; checked here 'cause it affects PAPs*/
-  ASSERT(PACK_FETCHME_SIZE == 1 + sizeofW(StgFetchMe));
+  //ASSERT(PACK_FETCHME_SIZE == sizeofW(StgFetchMe)-1+PACK_GA_SIZE);
   ASSERT(NotYetPacking(OffsetFor((StgClosure *)pap)));
+  IF_DEBUG(sanity,
+          unpacked_size_before_PAP = unpacked_size);
 
-  /* record offset of the closure and allocate a GA */
-  AmPacking((StgClosure *)pap);
-  GlobaliseAndPackGA((StgClosure *)pap);
+  n = (nat)(pap->n_args);
 
   /* get info about basic layout of the closure */
   info = get_closure_info((StgClosure *)pap, &size, &ptrs, &nonptrs, &vhs, str);
   ASSERT(ptrs==0 && nonptrs==0 && size==pap_sizeW(pap));
 
-  n = (nat)(pap->n_args);
-
   IF_PAR_DEBUG(pack,
-              belch("*>** PackPAP: packing PAP @ %p with %d words (size=%d; ptrs=%d; nonptrs=%d:", 
-                        (StgClosure *)pap, n,  size, ptrs, nonptrs);
+              belch("*>**  %p (%s): PackPAP: packing PAP with %d words (size=%d; ptrs=%d; nonptrs=%d:", 
+                    (StgClosure *)pap, info_type((StgClosure *)pap),
+                    n, size, ptrs, nonptrs);
                printClosure((StgClosure *)pap));
 
+  /* check that we have enough room in the pack buffer */
+  if (!RoomToPack(PACK_GA_SIZE + _HS + vhs + nonptrs, ptrs)) {
+    IF_PAR_DEBUG(pack,
+                belch("*>&& pack buffer is full; packing FETCH_ME for closure %p (%s)",
+                      (StgClosure *)pap, info_type((StgClosure *)pap)));
+    PackFetchMe((StgClosure *)pap);
+    return;
+  }
+
+  /* record offset of the closure and allocate a GA */
+  AmPacking((StgClosure *)pap);
+  /* Checks for globalisation scheme; default: globalise everything thunks */
+  if ( RtsFlags.ParFlags.globalising == 0 || 
+       (closure_THUNK(pap) && !closure_UNPOINTED(pap)) )
+    GlobaliseAndPackGA((StgClosure *)pap);
+  else
+    Pack((StgWord)2);  // marker for unglobalised closure
+
   /* Pack the PAP header */
   Pack((StgWord)(pap->header.info));
   Pack((StgWord)(pap->n_args));
@@ -1191,9 +1399,9 @@ PackPAP(StgPAP *pap) {
 
     /* If we've got a tag, pack all words in that block */
     if (IS_ARG_TAG((W_)q)) {   // q stands for the no. of non-ptrs to follow
-      nat m = ARG_TAG(q);      // first word after this block
+      nat m = ARG_TAG((W_)q);      // first word after this block
       IF_PAR_DEBUG(pack,
-                  belch("*>** PackPAP @ %p: packing %d words (tagged), starting @ %p", 
+                  belch("*>**    PackPAP @ %p: packing %d words (tagged), starting @ %p", 
                         p, m, p));
       for (i=0; i<m+1; i++)
        Pack((StgWord)*(p+i));
@@ -1208,23 +1416,13 @@ PackPAP(StgPAP *pap) {
       /* distinguish static closure (PLC) from other closures (FM) */
       switch (get_itbl((StgClosure*)q)->type) {
       case CONSTR_CHARLIKE:
-       {
-         StgChar val = ((StgIntCharlikeClosure*)q)->data;
-      
-         if ((val <= MAX_CHARLIKE) && (val >= MIN_CHARLIKE)) {
-           IF_PAR_DEBUG(pack,
-                        belch("*>** PackPAP: Packing ptr to a small charlike %d as a PLC", val));
-           PackPLC((StgPtr)CHARLIKE_CLOSURE(val));
-         } else {
-           IF_PAR_DEBUG(pack,
-                        belch("*>** PackPAP: Packing a ptr to a big charlike %d as a FM", 
-                              val));
-           Pack((StgWord)(ARGTAG_MAX+1));
-           PackFetchMe((StgClosure *)q);
-         }
-         p++;
-         break;
-       }
+       IF_PAR_DEBUG(pack,
+                    belch("*>**    PackPAP: packing a charlike closure %d", 
+                          ((StgIntCharlikeClosure*)q)->data));
+    
+       PackPLC((StgPtr)CHARLIKE_CLOSURE(((StgIntCharlikeClosure*)q)->data));
+       p++;
+       break;
       
       case CONSTR_INTLIKE:
        {
@@ -1232,17 +1430,20 @@ PackPAP(StgPAP *pap) {
       
          if ((val <= MAX_INTLIKE) && (val >= MIN_INTLIKE)) {
            IF_PAR_DEBUG(pack,
-                        belch("*>** PackPAP: Packing ptr to a small intlike %d as a PLC", val));
+                        belch("*>**    PackPAP: Packing ptr to a small intlike %d as a PLC", val));
            PackPLC((StgPtr)INTLIKE_CLOSURE(val));
+           p++;
+           break;
          } else {
            IF_PAR_DEBUG(pack,
-                        belch("*>** PackPAP: Packing a ptr to a big intlike %d as a FM", 
+                        belch("*>**    PackPAP: Packing a ptr to a big intlike %d as a FM", 
                               val));
            Pack((StgWord)(ARGTAG_MAX+1));
            PackFetchMe((StgClosure *)q);
+           p++;
+           IF_DEBUG(sanity, FMs_in_PAP++);
+           break;
          }
-         p++;
-         break;
        }
        case THUNK_STATIC:       // ToDo: check whether that's ok
        case FUN_STATIC:       // ToDo: check whether that's ok
@@ -1250,7 +1451,7 @@ PackPAP(StgPAP *pap) {
        case CONSTR_NOCAF_STATIC:
          {
            IF_PAR_DEBUG(pack,
-                        belch("*>** PackPAP: packing a ptr to a %p (%s) as a PLC", 
+                        belch("*>**    PackPAP: packing a ptr to a %p (%s) as a PLC", 
                               q, info_type((StgClosure *)q)));
            
            PackPLC((StgPtr)q);
@@ -1259,11 +1460,12 @@ PackPAP(StgPAP *pap) {
          }
       default:
          IF_PAR_DEBUG(pack,
-                      belch("*>** PackPAP @ %p: packing FM to %p (%s)", 
+                      belch("*>**    PackPAP @ %p: packing FM to %p (%s)", 
                             p, q, info_type((StgClosure*)q)));
          Pack((StgWord)(ARGTAG_MAX+1));
          PackFetchMe((StgClosure *)q);
          p++;
+         IF_DEBUG(sanity, FMs_in_PAP++);
          break;
       }
       continue;
@@ -1280,7 +1482,7 @@ PackPAP(StgPAP *pap) {
       /* Dynamic bitmap: the mask is stored on the stack */
     case RET_DYN:
       IF_PAR_DEBUG(pack,
-                  belch("*>** PackPAP @ %p: RET_DYN", 
+                  belch("*>**    PackPAP @ %p: RET_DYN", 
                         p));
 
       /* Pack the header as is */
@@ -1297,7 +1499,7 @@ PackPAP(StgPAP *pap) {
     case FUN_STATIC:
       {
       IF_PAR_DEBUG(pack,
-                  belch("*>** PackPAP @ %p: FUN or FUN_STATIC", 
+                  belch("*>**    PackPAP @ %p: FUN or FUN_STATIC", 
                         p));
 
       Pack((StgWord)(((StgClosure *)p)->header.info));
@@ -1315,7 +1517,7 @@ PackPAP(StgPAP *pap) {
        ASSERT(type==BLACKHOLE || type==CAF_BLACKHOLE || type==BLACKHOLE_BQ);
 
        IF_PAR_DEBUG(pack,
-                    belch("*>** PackPAP @ %p: UPDATE_FRAME (updatee=%p; link=%p)", 
+                    belch("*>**    PackPAP @ %p: UPDATE_FRAME (updatee=%p; link=%p)", 
                           p, frame->updatee, frame->link));
 
        Pack((StgWord)(frame->header.info));
@@ -1329,7 +1531,7 @@ PackPAP(StgPAP *pap) {
     case STOP_FRAME:
       {
        IF_PAR_DEBUG(pack,
-                    belch("*>** PackPAP @ %p: STOP_FRAME", 
+                    belch("*>**    PackPAP @ %p: STOP_FRAME", 
                           p));
        Pack((StgWord)((StgStopFrame *)p)->header.info);
        p++;
@@ -1338,7 +1540,7 @@ PackPAP(StgPAP *pap) {
     case CATCH_FRAME:
       {
        IF_PAR_DEBUG(pack,
-                    belch("*>** PackPAP @ %p: CATCH_FRAME (handler=%p)", 
+                    belch("*>**    PackPAP @ %p: CATCH_FRAME (handler=%p)", 
                           p, ((StgCatchFrame *)p)->handler));
 
        Pack((StgWord)((StgCatchFrame *)p)->header.info);
@@ -1351,7 +1553,7 @@ PackPAP(StgPAP *pap) {
     case SEQ_FRAME:
       {
        IF_PAR_DEBUG(pack,
-                    belch("*>** PackPAP @ %p: UPDATE_FRAME (link=%p)", 
+                    belch("*>**    PackPAP @ %p: UPDATE_FRAME (link=%p)", 
                           p, ((StgSeqFrame *)p)->link));
 
        Pack((StgWord)((StgSeqFrame *)p)->header.info);
@@ -1367,7 +1569,7 @@ PackPAP(StgPAP *pap) {
     case RET_SMALL:
     case RET_VEC_SMALL:
       IF_PAR_DEBUG(pack,
-                  belch("*>** PackPAP @ %p: RET_{BCO,SMALL,VEC_SMALL} (bitmap=%o)", 
+                  belch("*>**    PackPAP @ %p: RET_{BCO,SMALL,VEC_SMALL} (bitmap=%o)", 
                         p, info->layout.bitmap));
 
 
@@ -1382,6 +1584,7 @@ PackPAP(StgPAP *pap) {
        if ((bitmap & 1) == 0) {
          Pack((StgWord)(ARGTAG_MAX+1));
          PackFetchMe((StgClosure *)*p++); // pack a FetchMe to the closure
+         IF_DEBUG(sanity, FMs_in_PAP++);
        } else {
          Pack((StgWord)*p++);
        }
@@ -1389,7 +1592,8 @@ PackPAP(StgPAP *pap) {
       }
       
     follow_srt:
-      belch("*>-- PackPAP: nothing to do for follow_srt");
+       IF_PAR_DEBUG(pack,
+                    belch("*>--    PackPAP: nothing to do for follow_srt"));
       continue;
 
       /* large bitmap (> 32 entries) */
@@ -1400,7 +1604,7 @@ PackPAP(StgPAP *pap) {
        StgLargeBitmap *large_bitmap;
 
        IF_PAR_DEBUG(pack,
-                    belch("*>** PackPAP @ %p: RET_{BIG,VEC_BIG} (large_bitmap=%p)", 
+                    belch("*>**    PackPAP @ %p: RET_{BIG,VEC_BIG} (large_bitmap=%p)", 
                           p, info->layout.large_bitmap));
 
 
@@ -1411,11 +1615,12 @@ PackPAP(StgPAP *pap) {
 
        for (j=0; j<large_bitmap->size; j++) {
          bitmap = large_bitmap->bitmap[j];
-         q = p + sizeof(W_) * 8;
+         q = p + BITS_IN(W_);
          while (bitmap != 0) {
            if ((bitmap & 1) == 0) {
              Pack((StgWord)(ARGTAG_MAX+1));
              PackFetchMe((StgClosure *)*p++); // ToDo: pack pointer(StgClosure *)*p = evacuate((StgClosure *)*p);
+             IF_DEBUG(sanity, FMs_in_PAP++);
            } else {
              Pack((StgWord)*p++);
            }
@@ -1425,6 +1630,7 @@ PackPAP(StgPAP *pap) {
            while (p < q) {
              Pack((StgWord)(ARGTAG_MAX+1));
              PackFetchMe((StgClosure *)*p++); // ToDo: pack pointer (StgClosure *)*p = evacuate((StgClosure *)*p);
+             IF_DEBUG(sanity, FMs_in_PAP++);
            }
          }
        }
@@ -1440,10 +1646,18 @@ PackPAP(StgPAP *pap) {
   }
   // fill in size of the PAP (only the payload!) in buffer
   globalPackBuffer->buffer[pack_start] = (StgWord)(pack_locn - pack_start - 1*sizeofW(StgWord));
-  // add the size of the whole packed closure; this relies on the fact that
-  // the size of the unpacked PAP + size of all unpacked FMs is the same as
-  // the size of the packed PAP!!
-  unpacked_size += sizeofW(pap) + (nat)(globalPackBuffer->buffer[pack_start]);
+  /*
+    We can use the generic pap_sizeW macro to compute the size of the
+    unpacked PAP because whenever we pack a new FETCHME as part of the
+    PAP's payload we also adjust unpacked_size accordingly (smart, aren't we?)
+
+    NB: the current PAP (un-)packing code  relies on the fact that
+    the size of the unpacked PAP + size of all unpacked FMs is the same as
+    the size of the packed PAP!!
+  */
+  unpacked_size += pap_sizeW(pap); // sizeofW(pap) + (nat)(globalPackBuffer->buffer[pack_start]);
+  IF_DEBUG(sanity,
+          ASSERT(unpacked_size-unpacked_size_before_PAP==pap_sizeW(pap)+FMs_in_PAP*sizeofW(StgFetchMe)));
 }
 # else  /* GRAN */
 
@@ -1555,11 +1769,6 @@ StgClosure *closure;
       /* partial applications; special treatment necessary? */
       break;
 
-    case CAF_UNENTERED:    /* # of ptrs, nptrs: 1,3 */
-    case CAF_ENTERED:      /* # of ptrs, nptrs: 0,4  (allegedly bogus!!) */
-      /* CAFs; special treatment necessary? */
-      break;
-
     case MVAR:
       barf("{PackClosure}Daq Qagh: found an MVAR (%p, %s); ToDo: implement proper treatment of MVARs",
           closure, info_type(closure));
@@ -1665,14 +1874,14 @@ StgClosure *closure;
        P_ childInfo;
        W_ childSize, childPtrs, childNonPtrs, childVhs;
        
-       childInfo = get_closure_info(((StgPtrPtr) (closure))[i + FIXED_HS + vhs],
+       childInfo = get_closure_info(((StgPtrPtr) (closure))[i + _HS + vhs],
        &childSize, &childPtrs, &childNonPtrs,
        &childVhs, junk_str);
        if (IS_BIG_MOTHER(childInfo)) {
-       reservedPAsize += PACK_GA_SIZE + FIXED_HS + 
+       reservedPAsize += PACK_GA_SIZE + _HS + 
        childVhs + childNonPtrs +
        childPtrs * PACK_FETCHME_SIZE;
-       PAsize += PACK_GA_SIZE + FIXED_HS + childSize;
+       PAsize += PACK_GA_SIZE + _HS + childSize;
        PAptrs += childPtrs;
        }
        }
@@ -1685,7 +1894,7 @@ StgClosure *closure;
        /*
          ToDo: fix this code
          || 
-         !(RoomToPack(PACK_GA_SIZE + FIXED_HS + vhs + nonptrs, ptrs) 
+         !(RoomToPack(PACK_GA_SIZE + _HS + vhs + nonptrs, ptrs) 
          || IS_BIG_MOTHER(info))) 
          */
       return;
@@ -1716,7 +1925,9 @@ StgClosure *closure;
       QueueClosure((StgClosure *)(closure->payload[i]));
       IF_GRAN_DEBUG(pack,
                    belch("**    [%p (%s) (Queueing closure) ....]",
-                         closure->payload[i], info_type(payloadPtr(closure,i))));
+                         closure->payload[i], 
+                         info_type(*stgCast(StgPtr*,((closure)->payload+(i))))));
+                                  //^^^^^^^^^^^ payloadPtr(closure,i))));
     }
 
     /* 
@@ -1792,15 +2003,36 @@ StgClosure *closure;
   globalAddr *ga;
   globalAddr packGA;
 
-  if ((ga = LAGAlookup(closure)) == NULL)
+  if ((ga = LAGAlookup(closure)) == NULL) {
     ga = makeGlobal(closure, rtsTrue);
-  ASSERT(ga->weight==MAX_GA_WEIGHT || ga->weight > 2);
-  splitWeight(&packGA, ga);
-  ASSERT(packGA.weight > 0);
 
+    // Global statistics: increase amount of global data by closure-size
+    if (RtsFlags.ParFlags.ParStats.Global &&
+       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+      StgInfoTable *info;
+      nat size, ptrs, nonptrs, vhs, i, m; // stats only!!
+      char str[80]; // stats only!!
+
+      info = get_closure_info(closure, &size, &ptrs, &nonptrs, &vhs, str);
+      globalParStats.tot_global += size;
+    }
+  }
+  ASSERT(ga->weight==MAX_GA_WEIGHT || ga->weight > 2);
+  
+  if(dest_gtid==ga->payload.gc.gtid)
+  {  packGA.payload = ga->payload;
+     packGA.weight = 0xFFFFFFFF; // 0,1,2 are used already
+  }
+  else
+  { splitWeight(&packGA, ga);
+    ASSERT(packGA.weight > 0);
+  }  
   IF_PAR_DEBUG(pack,
-              fprintf(stderr, "*>## Globalising closure %p (%s) with GA ", 
-                      closure, info_type(closure));
+              fprintf(stderr, "*>## %p (%s): Globalising (%s) closure with GA ",
+                      closure, info_type(closure),
+                      ( (ga->payload.gc.gtid==dest_gtid)?"returning":
+                          ( (ga->payload.gc.gtid==mytid)?"creating":"sharing" ) ));
               printGA(&packGA);
               fputc('\n', stderr));
 
@@ -1859,7 +2091,7 @@ int offset;
   unpacking of closures as it is done in the parallel runtime system.
 */
 
-//@node GUM code, Local Definitions, Unpacking routines, Unpacking routines
+//@node GUM code, GranSim Code, Unpacking routines, Unpacking routines
 //@subsubsection GUM code
 
 #if defined(PAR) 
@@ -1887,12 +2119,20 @@ void
 CommonUp(StgClosure *src, StgClosure *dst)
 {
   StgBlockingQueueElement *bqe;
+#if defined(DEBUG)
+  StgInfoTable *info;
+  nat size, ptrs, nonptrs, vhs, i;
+  char str[80];
+
+  /* get info about basic layout of the closure */
+  info = get_closure_info(src, &size, &ptrs, &nonptrs, &vhs, str);
+#endif
 
   ASSERT(src != (StgClosure *)NULL && dst != (StgClosure *)NULL);
   ASSERT(src != dst);
 
   IF_PAR_DEBUG(pack,
-              belch("*___ CommonUp %p (%s) --> %p (%s)",
+              belch("*___  CommonUp %p (%s) --> %p (%s)",
                     src, info_type(src), dst, info_type(dst)));
   
   switch (get_itbl(src)->type) {
@@ -1913,12 +2153,16 @@ CommonUp(StgClosure *src, StgClosure *dst)
     bqe = END_BQ_QUEUE;
     break;
 
+    /* These closures are too small to be updated with an indirection!!! */
+  case CONSTR_1_0:
+  case CONSTR_0_1:
+    ASSERT(size<_HS+MIN_UPD_SIZE); // that's why we have to avoid UPD_IND
+    return;
+
     /* currently we also common up 2 CONSTRs; this should reduce heap 
      * consumption but also does more work; not sure whether it's worth doing 
      */ 
   case CONSTR:
-  case CONSTR_1_0:
-  case CONSTR_0_1:
   case CONSTR_2_0:
   case CONSTR_1_1:
   case CONSTR_0_2:
@@ -1932,14 +2176,11 @@ CommonUp(StgClosure *src, StgClosure *dst)
     /* Don't common up anything else */
     return;
   }
+
+  /* closure must be big enough to permit update with ind */
+  ASSERT(size>=_HS+MIN_UPD_SIZE);
   /* NB: this also awakens the blocking queue for src */
   UPD_IND(src, dst);
-  // updateWithIndirection(src, dst);
-  /*
-    ASSERT(!IS_BIG_MOTHER(INFO_PTR(dst)));
-    if (bqe != END_BQ_QUEUE)
-    awakenBlockedQueue(bqe, src);
-  */
 }
 
 /*
@@ -1956,30 +2197,59 @@ SetGAandCommonUp(globalAddr *ga, StgClosure *closure, rtsBool hasGA)
 
   if (!hasGA)
     return closure;
-
+  
+  /* should we already have a local copy? */
+  if (ga->weight==0xFFFFFFFF) { 
+    ASSERT(ga->payload.gc.gtid==mytid); //sanity
+    ga->weight=0;
+    /* probably should also ASSERT that a commonUp takes place...*/
+  }
+  
   ip = get_itbl(closure);
   if ((existing = GALAlookup(ga)) == NULL) {
     /* Just keep the new object */
     IF_PAR_DEBUG(pack,
-                belch("*<## Unpacking new GA ((%x, %d, %x))", 
-                      ga->payload.gc.gtid, ga->payload.gc.slot, ga->weight));
+                belch("*<##  New local object for GA ((%x, %d, %x)) is %p (%s)", 
+                      ga->payload.gc.gtid, ga->payload.gc.slot, ga->weight,
+                      closure, info_type(closure)));
 
     // make an entry binding closure to ga in the RemoteGA table
     newGA = setRemoteGA(closure, ga, rtsTrue);
-    if (ip->type == FETCH_ME)
+    // if local closure is a FETCH_ME etc fill in the global indirection
+    if (ip->type == FETCH_ME || ip->type == REMOTE_REF)
       ((StgFetchMe *)closure)->ga = newGA;
   } else {
+    
+
+#ifdef DIST 
+// ***************************************************************************
+// ***************************************************************************
+// REMOTE_REF HACK - dual is in PackRemoteRef  
+// - prevents the weight ever being updated
+  if (ip->type == REMOTE_REF)
+    ga->weight=0;
+// ***************************************************************************
+// ***************************************************************************
+#endif /* DIST */
+    
     /* Two closures, one global name.  Someone loses */
     oldip = get_itbl(existing);
     if ((oldip->type == FETCH_ME || 
-        /* If we pack GAs for CONSTRs we have to check for them, too */
-        IS_BLACK_HOLE(existing)) &&
+        IS_BLACK_HOLE(existing) ||
+        /* try to share evaluated closures */
+         oldip->type == CONSTR ||
+        oldip->type == CONSTR_1_0 ||
+        oldip->type == CONSTR_0_1 ||
+        oldip->type == CONSTR_2_0 ||
+        oldip->type == CONSTR_1_1 ||
+        oldip->type == CONSTR_0_2 
+       ) &&
        ip->type != FETCH_ME) 
     {
       IF_PAR_DEBUG(pack,
-                  belch("*<#- Unpacking old GA ((%x, %d, %x)); redirecting %p -> %p",
+                  belch("*<#-  Duplicate local object for GA ((%x, %d, %x)); redirecting %p (%s) -> %p (%s)",
                         ga->payload.gc.gtid, ga->payload.gc.slot, ga->weight,
-                        existing, closure));
+                        existing, info_type(existing), closure, info_type(closure)));
 
       /* 
        * What we had wasn't worth keeping, so make the old closure an
@@ -1989,8 +2259,20 @@ SetGAandCommonUp(globalAddr *ga, StgClosure *closure, rtsBool hasGA)
        */
       CommonUp(existing, closure);
       //GALAdeprecate(ga);
-      /* now ga indirectly refers to the new closure */
-      ASSERT(UNWIND_IND(GALAlookup(ga))==closure);
+#if defined(DEBUG)
+      { 
+        StgInfoTable *info;
+        nat size, ptrs, nonptrs, vhs, i;
+        char str[80];
+      
+        /* get info about basic layout of the closure */
+        info = get_closure_info(GALAlookup(ga), &size, &ptrs, &nonptrs, &vhs, str);
+      
+        /* now ga indirectly refers to the new closure */
+        ASSERT(size<_HS+MIN_UPD_SIZE || 
+               UNWIND_IND(GALAlookup(ga))==closure);
+      }
+#endif
     } else {
       /*
        * Either we already had something worthwhile by this name or
@@ -2001,10 +2283,17 @@ SetGAandCommonUp(globalAddr *ga, StgClosure *closure, rtsBool hasGA)
        * the same as when they were packed.
        */
       IF_PAR_DEBUG(pack,
-                  belch("*<#@ Unpacking old GA ((%x, %d, %x)); keeping %p (%s) nuking unpacked %p (%s)", 
+                  belch("*<#@  Duplicate local object for GA ((%x, %d, %x)); keeping %p (%s) nuking unpacked %p (%s)", 
                         ga->payload.gc.gtid, ga->payload.gc.slot, ga->weight,
                         existing, info_type(existing), closure, info_type(closure)));
 
+      /* overwrite 2nd word; indicates that the closure is garbage */
+      IF_DEBUG(sanity,
+              ((StgFetchMe*)closure)->ga = (globalAddr*)GARBAGE_MARKER;
+              IF_PAR_DEBUG(pack,
+                           belch("++++  unpacked closure %p (%s) is garbage: %p",
+                                 closure, info_type(closure), *(closure+1))));
+
       closure = existing;
 #if 0
       // HACK
@@ -2018,15 +2307,14 @@ SetGAandCommonUp(globalAddr *ga, StgClosure *closure, rtsBool hasGA)
        CommonUp(closure, graph);
 #endif
     }
-    /* Pool the total weight in the stored ga */
+    /* We don't use this GA after all, so give back the weight */
     (void) addWeight(ga);
   }
 
-  /* ToDo: check this assertion!!
-     if we have unpacked a FETCH_ME, we have a GA, too 
-  ASSERT(get_itbl(*closureP)->type!=FETCH_ME || 
-        looks_like_ga(((StgFetchMe *)*closureP)->ga));
-  */
+  /* if we have unpacked a FETCH_ME, we have a GA, too */
+  ASSERT(get_itbl(closure)->type!=FETCH_ME || 
+        looks_like_ga(((StgFetchMe*)closure)->ga));
+
   /* Sort out the global address mapping */
   if (ip_THUNK(ip)){ 
     // || // (ip_THUNK(ip) && !ip_UNPOINTED(ip)) || 
@@ -2044,7 +2332,7 @@ SetGAandCommonUp(globalAddr *ga, StgClosure *closure, rtsBool hasGA)
     newGA->weight = gaga->weight = 1L << (BITS_IN(unsigned) - 1);    
     gaga->payload = newGA->payload;
     */
-    ASSERT(gaga->weight == 1L << (BITS_IN(unsigned) - 1));
+    ASSERT(gaga->weight == 1U << (BITS_IN(unsigned) - 1));
     gaga++;
   }
   return closure;
@@ -2077,8 +2365,8 @@ FillInClosure(StgWord ***bufptrP, StgClosure *graph)
          
   /* Make sure that nothing sans the fixed header is filled in
      The ga field of the FETCH_ME is filled in in SetGAandCommonUp */
-  if (ip->type == FETCH_ME) {
-    ASSERT(size>=MIN_UPD_SIZE);    // size of the FM in the heap
+  if (ip->type == FETCH_ME || ip->type == REMOTE_REF) {
+    ASSERT(size>=_HS+MIN_UPD_SIZE);    // size of the FM in the heap
     ptrs = nonptrs = vhs = 0;      // i.e. only unpack FH from buffer
   }
   /* ToDo: check whether this is really needed */
@@ -2115,7 +2403,7 @@ FillInClosure(StgWord ***bufptrP, StgClosure *graph)
   // ASSERT(INFO_PTR(graph) != (W_) Ind_info_TO_USE);
   // return bufptr;
    *bufptrP = bufptr;
-   ASSERT((ip->type==FETCH_ME && sizeofW(StgFetchMe)==size) ||
+   ASSERT(((ip->type==FETCH_ME || ip->type==REMOTE_REF)&& sizeofW(StgFetchMe)==size) ||
          _HS+vhs+ptrs+nonptrs == size);
    return size; 
 }
@@ -2143,8 +2431,9 @@ nat *pptrP, *pptrsP, *sizeP;
   while (*pptrP + 1 > *pptrsP) {
     /* *parentP has been constructed (all pointer set); so check it now */
     IF_DEBUG(sanity,
-            if (*parentP!=(StgClosure*)NULL &&
-                get_itbl(*parentP)->type != FETCH_ME)
+            if ((*parentP!=(StgClosure*)NULL) &&         // not root
+                (*((StgPtr)(*parentP)+1)!=GARBAGE_MARKER) && // not commoned up
+                (get_itbl(*parentP)->type != FETCH_ME))
               checkClosure(*parentP));
 
     *parentP = DeQueueClosure();
@@ -2181,7 +2470,7 @@ static  StgClosure*
 UnpackClosure (StgWord ***bufptrP, StgClosure **graphP, globalAddr *ga) {
   StgClosure *closure;
   nat size;
-  rtsBool hasGA = rtsFalse;
+  rtsBool hasGA = rtsFalse, unglobalised = rtsFalse;
 
   /* Now unpack the closure body, if there is one; three cases:
      - PLC: closure is just a pointer to a static closure
@@ -2193,26 +2482,32 @@ UnpackClosure (StgWord ***bufptrP, StgClosure **graphP, globalAddr *ga) {
   } else if (isOffset(ga)) {
     closure = UnpackOffset(ga);
   } else {
-    ASSERT(LOOKS_LIKE_GA(ga));
+    /* if not PLC or Offset it must be a GA and then the closure */
+    ASSERT(RtsFlags.ParFlags.globalising!=0 || LOOKS_LIKE_GA(ga));
+    /* check whether this is an unglobalised closure */
+    unglobalised = isUnglobalised(ga);
     /* Now we have to build something. */
     hasGA = !isConstr(ga);
     /* the new closure will be built here */
     closure = *graphP;
-    
+
     /* fill in the closure from the buffer */
     size = FillInClosure(/*in/out*/bufptrP, /*in*/closure);
+    /* if it is unglobalised, it may not be a thunk!! */
+    ASSERT(!unglobalised || !closure_THUNK(closure));
     
-    /* Add to queue for processing */
+   /* Add to queue for processing */
     QueueClosure(closure);
-    
+
     /* common up with other graph if necessary */
-    closure = SetGAandCommonUp(ga, closure, hasGA);
+    if (!unglobalised)
+      closure = SetGAandCommonUp(ga, closure, hasGA);
 
     /* if we unpacked a THUNK, check that it is large enough to update */
-    ASSERT(!closure_THUNK(closure) || size>=MIN_UPD_SIZE);
+    ASSERT(!closure_THUNK(closure) || size>=_HS+MIN_UPD_SIZE);
     /* graph shall point to next free word in the heap */
     *graphP += size;
-    //graph += (size < MIN_UPD_SIZE) ? MIN_UPD_SIZE : size;
+    //*graphP += (size < _HS+MIN_UPD_SIZE) ? _HS+MIN_UPD_SIZE : size; // see ASSERT
   }
   return closure;
 }
@@ -2238,7 +2533,17 @@ nat *nGAs;
   StgClosure *closure, *graphroot, *graph, *parent;
   nat size, heapsize, bufsize, 
       pptr = 0, pptrs = 0, pvhs = 0;
+  nat unpacked_closures = 0, unpacked_thunks = 0; // stats only
+
+  IF_PAR_DEBUG(resume,
+              graphFingerPrint[0] = '\0');
+
+  ASSERT(_HS==1);  // HWL HACK; compile time constant
 
+#if defined(PAR_TICKY) // HWL HAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACK
+  PAR_TICKY_UNPACK_GRAPH_START();
+#endif
+  
   /* Initialisation */
   InitPacking(rtsTrue);      // same as in PackNearbyGraph
   globalUnpackBuffer = packBuffer;
@@ -2258,11 +2563,19 @@ nat *nGAs;
   if (heapsize > 0) {
     graph = (StgClosure *)allocate(heapsize);
     ASSERT(graph != NULL);
+    // parallel global statistics: increase amount of global data
+    if (RtsFlags.ParFlags.ParStats.Global &&
+       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+      globalParStats.tot_global += heapsize;
+    }
   }
 
   /* iterate over the buffer contents and unpack all closures */
   parent = (StgClosure *)NULL;
   do {
+    /* check that we aren't at the end of the buffer, yet */
+    IF_DEBUG(sanity, ASSERT(*bufptr != END_OF_BUFFER_MARKER));
+
     /* This is where we will ultimately save the closure's address */
     slotptr = bufptr;
 
@@ -2271,6 +2584,8 @@ nat *nGAs;
 
     /* this allocates heap space, updates LAGA tables etc */
     closure = UnpackClosure (/*in/out*/&bufptr, /*in/out*/&graph, /*in*/&gaS);
+    unpacked_closures++; // stats only; doesn't count FMs in PAP!!!
+    unpacked_thunks += (closure_THUNK(closure)) ? 1 : 0; // stats only
 
     /*
      * Set parent pointer to point to chosen closure.  If we're at the top of
@@ -2284,7 +2599,7 @@ nat *nGAs;
       ((StgPtr)parent)[_HS + pvhs + pptr] = (StgWord) closure;
 
     /* Save closure pointer for resolving offsets */
-    *slotptr = (StgWord) closure;
+    *slotptr = (StgWord*) closure;
 
     /* Locate next parent pointer */
     LocateNextParent(&parent, &pptr, &pptrs, &size);
@@ -2295,9 +2610,31 @@ nat *nGAs;
             gaS.payload.gc.slot = 0xdeadbeef;);
   } while (parent != NULL);
 
+  IF_PAR_DEBUG(resume,
+              GraphFingerPrint(graphroot, graphFingerPrint);
+              ASSERT(strlen(graphFingerPrint)<=MAX_FINGER_PRINT_LEN);
+              belch(">>> Fingerprint of graph rooted at %p (after unpacking <<%d>>:\n    {%s}",
+                    graphroot, packBuffer->id, graphFingerPrint));
+
   /* we unpacked exactly as many words as there are in the buffer */
-  ASSERT(bufsize == bufptr-(packBuffer->buffer) &&
-        heapsize >= graph-graphroot); // should be ==
+  ASSERT(bufsize == (nat) (bufptr-(packBuffer->buffer)));
+  /* we filled no more heap closure than we allocated at the beginning; 
+     ideally this should be a ==; 
+     NB: test is only valid if we unpacked anything at all (graphroot might
+         end up to be a PLC!), therfore the strange test for HEAP_ALLOCED 
+  */
+
+  /*
+  {
+   StgInfoTable *info = get_itbl(graphroot);
+   ASSERT(!HEAP_ALLOCED(graphroot) || heapsize >= (nat) (graph-graphroot) ||
+          // ToDo: check whether CAFs are really a special case here!!
+          info->type==CAF_BLACKHOLE || info->type==FETCH_ME || info->type==FETCH_ME_BQ); 
+  }
+  */
+
+  /* check for magic end-of-buffer word */
+  IF_DEBUG(sanity, ASSERT(*bufptr == END_OF_BUFFER_MARKER));
 
   *gamap = PendingGABuffer;
   *nGAs = (gaga - PendingGABuffer) / 2;
@@ -2315,15 +2652,20 @@ nat *nGAs;
             StgPtr p;
 
             /* check the unpacked graph */
-            checkHeapChunk(graphroot,graph-sizeof(StgWord));
+            //checkHeapChunk(graphroot,graph-sizeof(StgWord));
 
             // if we do sanity checks, then wipe the pack buffer after unpacking
-            for (p=packBuffer->buffer; p<(packBuffer->buffer)+(packBuffer->size); )
+            for (p=(StgPtr)packBuffer->buffer; p<(StgPtr)(packBuffer->buffer)+(packBuffer->size); )
               *p++ = 0xdeadbeef;
             });
 
   /* reset the global variable */
   globalUnpackBuffer = (rtsPackBuffer*)NULL;
+
+#if defined(PAR_TICKY) // HWL HAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACK
+  PAR_TICKY_UNPACK_GRAPH_END(unpacked_closures, unpacked_thunks);
+#endif
+
   return (graphroot);
 }
 
@@ -2334,7 +2676,9 @@ UnpackGA(StgWord **bufptr, globalAddr *ga)
   /* First, unpack the next GA or PLC */
   ga->weight = (rtsWeight) *bufptr++;
 
-  if (ga->weight > 0) {
+  if (ga->weight == 2) {  // unglobalised closure to follow
+    // nothing to do; closure starts at *bufptr
+  } else if (ga->weight > 0) { // fill in GA
     ga->payload.gc.gtid = (GlobalTaskId) *bufptr++;
     ga->payload.gc.slot = (int) *bufptr++;
   } else {
@@ -2350,7 +2694,7 @@ UnpackPLC(globalAddr *ga)
   /* No more to unpack; just set closure to local address */
   IF_PAR_DEBUG(pack,
               belch("*<^^ Unpacked PLC at %x", ga->payload.plc)); 
-  return ga->payload.plc;
+  return (StgClosure*)ga->payload.plc;
 }
 
 //@cindex UnpackOffset
@@ -2361,10 +2705,10 @@ UnpackOffset(globalAddr *ga)
   ASSERT(globalUnpackBuffer!=(rtsPackBuffer*)NULL);
   /* No more to unpack; just set closure to cached address */
   IF_PAR_DEBUG(pack,
-              belch("*<__ Unpacked indirection to %p (was offset %d)", 
+              belch("*<__ Unpacked indirection to %p (was OFFSET %d)", 
                     (StgClosure *)((globalUnpackBuffer->buffer)[ga->payload.gc.slot]),
                     ga->payload.gc.slot)); 
-return (StgClosure *)(globalUnpackBuffer->buffer)[ga->payload.gc.slot];
+  return (StgClosure *)(globalUnpackBuffer->buffer)[ga->payload.gc.slot];
 }
 
 /*
@@ -2418,12 +2762,13 @@ UnpackFetchMe (StgWord ***bufptrP, StgClosure **graphP) {
   IF_DEBUG(sanity,
           if (isFixed(&gaS)) 
           barf("*<   UnpackFetchMe: found PLC where FM was expected %p (%s)",
-               *bufptrP, info_type(*bufptrP)));
+               *bufptrP, info_type((StgClosure*)*bufptrP)));
 
   IF_PAR_DEBUG(pack,
               belch("*<_- Unpacked @ %p a FETCH_ME to GA ", 
                     *graphP);
-              printGA(&gaS));
+              printGA(&gaS);
+              fputc('\n', stderr));
 
   /* the next thing must be the IP to a FETCH_ME closure */
   ASSERT(get_itbl((StgClosure *)*bufptrP)->type == FETCH_ME);
@@ -2442,9 +2787,11 @@ UnpackFetchMe (StgWord ***bufptrP, StgClosure **graphP) {
   ASSERT(foo!=closure || LOOKS_LIKE_GA(((StgFetchMe*)closure)->ga));
   
   IF_PAR_DEBUG(pack,
-              belch("*<_- current FM @ %p next FM @ %p; unpacked FM @ %p is ", 
-                    *graphP, *graphP+sizeofW(StgFetchMe), closure);
-              printClosure(closure));
+              if (foo==closure) {  // only if not commoned up 
+                belch("*<_- current FM @ %p next FM @ %p; unpacked FM @ %p is ", 
+                      *graphP, *graphP+sizeofW(StgFetchMe), closure);
+                printClosure(closure);
+               });
   *graphP += sizeofW(StgFetchMe);
   return foo;
 }
@@ -2475,17 +2822,24 @@ UnpackArray(StgWord ***bufptrP, StgClosure *graph)
   // this includes the header!: arr_words_sizeW(stgCast(StgArrWords*,q)); 
 
   IF_PAR_DEBUG(pack,
-              belch("*<== unpacking an array of %d words %p (%s) (size=%d)\n",
+               if (n<100) 
+                belch("*<== unpacking an array of %d words %p (%s) (size=%d) |%s|\n",
+                    n, (StgClosure*)bufptr, info_type((StgClosure*)bufptr), 
+                    arr_words_sizeW((StgArrWords *)bufptr), 
+                      /* print array (string?) */
+                     ((StgArrWords *)graph)->payload);
+               else
+                belch("*<== unpacking an array of %d words %p (%s) (size=%d)\n",
                     n, (StgClosure*)bufptr, info_type((StgClosure*)bufptr), 
                     arr_words_sizeW((StgArrWords *)bufptr)));
 
   /* Unpack the header (2 words: info ptr and the number of words to follow) */
-  ((StgArrWords *)graph)->header.info = *bufptr++;  // assumes _HS==1; yuck!
-  ((StgArrWords *)graph)->words = *bufptr++;
+  ((StgArrWords *)graph)->header.info = (StgInfoTable*)*bufptr++;  // assumes _HS==1; yuck!
+  ((StgArrWords *)graph)->words = (StgWord)*bufptr++;
 
   /* unpack the payload of the closure (all non-ptrs) */
   for (i=0; i<n; i++)
-    ((StgArrWords *)graph)->payload[i] = *bufptr++;
+    ((StgArrWords *)graph)->payload[i] = (StgWord)*bufptr++;
 
   ASSERT(bufptr==*bufptrP+arr_words_sizeW((StgArrWords *)*bufptrP));
   *bufptrP = bufptr;
@@ -2519,18 +2873,22 @@ UnpackPAP(StgWord ***bufptrP, StgClosure *graph)
   nat n, i, j, packed_size = 0;
   StgPtr p, q, end, payload_start, p_FMs;
   const StgInfoTable* info;
-  StgWord32 bitmap;
+  StgWord bitmap;
   StgWord **bufptr = *bufptrP;
+#if defined(DEBUG)
+  nat FMs_in_PAP=0;
+  void checkPAPSanity(StgPAP *graph, StgPtr p_FM_begin, StgPtr p_FM_end);
+#endif
 
   IF_PAR_DEBUG(pack,
               belch("*<** UnpackPAP: unpacking PAP @ %p with %d words to closure %p", 
                         *bufptr, *(bufptr+1), graph));
 
   /* Unpack the PAP header (both fixed and variable) */
-  ((StgPAP *)graph)->header.info = *bufptr++;
-  n = ((StgPAP *)graph)->n_args = *bufptr++;
-  ((StgPAP *)graph)->fun = *bufptr++;
-  packed_size = *bufptr++;
+  ((StgPAP *)graph)->header.info = (StgInfoTable*)*bufptr++;
+  n = ((StgPAP *)graph)->n_args = (StgWord)*bufptr++;
+  ((StgPAP *)graph)->fun = (StgClosure*)*bufptr++;
+  packed_size = (nat)*bufptr++;
 
   IF_PAR_DEBUG(pack,
               belch("*<** UnpackPAP: PAP header is [%p, %d, %p] %d",
@@ -2539,20 +2897,20 @@ UnpackPAP(StgWord ***bufptrP, StgClosure *graph)
                     ((StgPAP *)graph)->fun,
                     packed_size));
 
-  payload_start = bufptr;
+  payload_start = (StgPtr)bufptr;
   /* p points to the current word in the heap */
-  p = ((StgPAP *)graph)->payload;      // payload of PAP will be unpacked here
-  p_FMs = graph+pap_sizeW((StgPAP*)graph);  // FMs will be unpacked here
+  p = (StgPtr)((StgPAP *)graph)->payload;      // payload of PAP will be unpacked here
+  p_FMs = (StgPtr)graph+pap_sizeW((StgPAP*)graph);  // FMs will be unpacked here
   end = (StgPtr) payload_start+packed_size;
   /*
     The main loop unpacks the PAP in *bufptr into *p, with *p_FMS as the
     FM area for unpacking all FETCHMEs encountered during unpacking.
   */
-  while (bufptr<end) {
+  while ((StgPtr)bufptr<end) {
     /* be sure that we don't write more than we allocated for this closure */
-    ASSERT(p_FMs <= graph+_HS+2+packed_size);
+    ASSERT(p_FMs <= (StgPtr)(graph+_HS+2+packed_size));
     /* be sure that the unpacked PAP doesn't run into the FM area */
-    ASSERT(p < graph+pap_sizeW((StgPAP*)graph));
+    ASSERT(p < (StgPtr)(graph+pap_sizeW((StgPAP*)graph)));
     /* the loop body has been borrowed from scavenge_stack */
     q = *bufptr; // let q be the contents of the current pointer into the buffer
 
@@ -2561,11 +2919,12 @@ UnpackPAP(StgWord ***bufptrP, StgClosure *graph)
     */
     if (q==(StgPtr)(ARGTAG_MAX+1)) {
       IF_PAR_DEBUG(pack,
-                  belch("*<** UnpackPAP @ %p: unpacking FM to %p", 
-                        p, q));
+                  belch("*<** UnpackPAP @ %p: unpacking FM; filling in ptr to FM area: %p", 
+                        p, p_FMs));
       bufptr++;         // skip ARGTAG_MAX+1 marker
       // Unpack a FM into the FM area after the PAP proper and insert pointer
-      *p++ = UnpackFetchMe(&bufptr, &p_FMs); 
+      *p++ = (StgWord)UnpackFetchMe(&bufptr, (StgClosure**)&p_FMs); 
+      IF_DEBUG(sanity, FMs_in_PAP++);
       continue;
     }
 
@@ -2575,18 +2934,18 @@ UnpackPAP(StgWord ***bufptrP, StgClosure *graph)
                   belch("*<** UnpackPAP @ %p: unpacking PLC to %p", 
                         p, *(bufptr+1)));
       bufptr++;          // skip 0 marker
-      *p++ = *bufptr++;
+      *p++ = (StgWord)*bufptr++;
       continue;
     }
 
     /* If we've got a tag, pack all words in that block */
     if (IS_ARG_TAG((W_)q)) {   // q stands for the no. of non-ptrs to follow
-      nat m = i+ARG_SIZE(q);   // first word after this block
+      nat m = ARG_SIZE(q);     // first word after this block
       IF_PAR_DEBUG(pack,
                   belch("*<** UnpackPAP @ %p: unpacking %d words (tagged), starting @ %p", 
                         p, m, p));
       for (i=0; i<m+1; i++)
-       *p++ = *bufptr++;
+       *p++ = (StgWord)*bufptr++;
       continue;
     }
 
@@ -2605,9 +2964,10 @@ UnpackPAP(StgWord ***bufptrP, StgClosure *graph)
                         p));
 
       /* Pack the header as is */
-      ((StgRetDyn *)p)->info = *bufptr++;
-      ((StgRetDyn *)p)->liveness = *bufptr;
-      ((StgRetDyn *)p)->ret_addr= *bufptr;
+      ((StgRetDyn *)p)->info     = (StgWord)*bufptr++;
+      ((StgRetDyn *)p)->liveness = (StgWord)*bufptr++;
+      ((StgRetDyn *)p)->ret_addr = (StgWord)*bufptr++;
+      p += 3;
 
       //bitmap = ((StgRetDyn *)p)->liveness;
       //p      = (P_)&((StgRetDyn *)p)->payload[0];
@@ -2621,7 +2981,7 @@ UnpackPAP(StgWord ***bufptrP, StgClosure *graph)
                   belch("*<** UnpackPAP @ %p: FUN or FUN_STATIC", 
                         p));
 
-      ((StgClosure *)p)->header.info = *bufptr;
+      ((StgClosure *)p)->header.info = (StgInfoTable*)*bufptr;
       p++;
 
       goto follow_srt; //??
@@ -2639,9 +2999,9 @@ UnpackPAP(StgWord ***bufptrP, StgClosure *graph)
                     belch("*<** UnackPAP @ %p: UPDATE_FRAME", 
                           p));
 
-       ((StgUpdateFrame *)p)->header.info = *bufptr;
-       ((StgUpdateFrame *)p)->link= *bufptr++;     // ToDo: fix intra-stack pointer
-       ((StgUpdateFrame *)p)->updatee = *bufptr;   // ToDo: follow link 
+       ((StgUpdateFrame *)p)->header.info = (StgInfoTable*)*bufptr++;
+       ((StgUpdateFrame *)p)->link        = (StgUpdateFrame*)*bufptr++;     // ToDo: fix intra-stack pointer
+       ((StgUpdateFrame *)p)->updatee     = (StgClosure*)*bufptr++;   // ToDo: follow link 
 
        p += 3;
       }
@@ -2652,7 +3012,7 @@ UnpackPAP(StgWord ***bufptrP, StgClosure *graph)
        IF_PAR_DEBUG(pack,
                     belch("*<** UnpackPAP @ %p: STOP_FRAME", 
                           p));
-       ((StgStopFrame *)p)->header.info = *bufptr;
+       ((StgStopFrame *)p)->header.info = (StgInfoTable*)*bufptr;
        p++;
       }
 
@@ -2662,10 +3022,10 @@ UnpackPAP(StgWord ***bufptrP, StgClosure *graph)
                     belch("*<** UnpackPAP @ %p: CATCH_FRAME",
                           p));
 
-       ((StgCatchFrame *)p)->header.info = *bufptr++;
-       ((StgCatchFrame *)p)->link = *bufptr++;
-       ((StgCatchFrame *)p)->exceptions_blocked = *bufptr++;
-       ((StgCatchFrame *)p)->handler = *bufptr++;
+       ((StgCatchFrame *)p)->header.info = (StgInfoTable*)*bufptr++;
+       ((StgCatchFrame *)p)->link        = (StgUpdateFrame*)*bufptr++;
+       ((StgCatchFrame *)p)->exceptions_blocked = (StgInt)*bufptr++;
+       ((StgCatchFrame *)p)->handler     = (StgClosure*)*bufptr++;
        p += 4;
       }
 
@@ -2675,8 +3035,8 @@ UnpackPAP(StgWord ***bufptrP, StgClosure *graph)
                     belch("*<** UnpackPAP @ %p: UPDATE_FRAME",
                           p));
 
-       ((StgSeqFrame *)p)->header.info = *bufptr++;
-       ((StgSeqFrame *)p)->link = *bufptr++;
+       ((StgSeqFrame *)p)->header.info = (StgInfoTable*)*bufptr++;
+       ((StgSeqFrame *)p)->link        = (StgUpdateFrame*)*bufptr++;
 
         // ToDo: handle bitmap
         bitmap = info->layout.bitmap;
@@ -2692,7 +3052,7 @@ UnpackPAP(StgWord ***bufptrP, StgClosure *graph)
                         p));
 
 
-      ((StgClosure *)p)->header.info = *bufptr++;
+      ((StgClosure *)p)->header.info = (StgInfoTable*)*bufptr++;
       p++;
       // ToDo: handle bitmap
       bitmap = info->layout.bitmap;
@@ -2701,9 +3061,10 @@ UnpackPAP(StgWord ***bufptrP, StgClosure *graph)
     small_bitmap:
       while (bitmap != 0) {
        if ((bitmap & 1) == 0) {
-         *p++ = UnpackFetchMe(&bufptr, &p_FMs);
+         *p++ = (StgWord)UnpackFetchMe(&bufptr, (StgClosure**)&p_FMs);
+         IF_DEBUG(sanity, FMs_in_PAP++);
        } else {
-         *p++ = *bufptr++;
+         *p++ = (StgWord)*bufptr++;
        }
        bitmap = bitmap >> 1;
       }
@@ -2718,32 +3079,33 @@ UnpackPAP(StgWord ***bufptrP, StgClosure *graph)
       {
        StgPtr q;
        StgLargeBitmap *large_bitmap;
-       nat i;
 
        IF_PAR_DEBUG(pack,
                     belch("*<** UnpackPAP @ %p: RET_{BIG,VEC_BIG} (large_bitmap=%p)", 
                           p, info->layout.large_bitmap));
 
 
-       ((StgClosure *)p)->header.info = *bufptr++;
+       ((StgClosure *)p)->header.info = (StgInfoTable*)*bufptr++;
        p++;
 
        large_bitmap = info->layout.large_bitmap;
 
        for (j=0; j<large_bitmap->size; j++) {
          bitmap = large_bitmap->bitmap[j];
-         q = p + sizeof(W_) * 8;
+         q = p + BITS_IN(W_);
          while (bitmap != 0) {
            if ((bitmap & 1) == 0) {
-             *p++ = UnpackFetchMe(&bufptr, &p_FMs);
+             *p++ = (StgWord)UnpackFetchMe(&bufptr, (StgClosure**)&p_FMs);
+             IF_DEBUG(sanity, FMs_in_PAP++);
            } else {
-             *p++ = *bufptr;
+             *p++ = (StgWord)*bufptr;
            }
            bitmap = bitmap >> 1;
          }
          if (j+1 < large_bitmap->size) {
            while (p < q) {
-             *p++ = UnpackFetchMe(&bufptr, &p_FMs);
+             *p++ = (StgWord)UnpackFetchMe(&bufptr, (StgClosure**)&p_FMs);
+             IF_DEBUG(sanity, FMs_in_PAP++);
            }
          }
        }
@@ -2766,12 +3128,53 @@ UnpackPAP(StgWord ***bufptrP, StgClosure *graph)
           checkClosure(graph));
 
   *bufptrP = bufptr;
-  return _HS+2+packed_size;
+  /* 
+     Now p points to the first word after the PAP proper and p_FMs points 
+     to the next free word in the heap; everything between p and p_FMs are 
+     FETCHMEs 
+  */
+  IF_DEBUG(sanity,
+          checkPAPSanity(graph, p, p_FMs));
+
+  /* we have to return the size of PAP + FMs as size of the unpacked thing */
+  ASSERT(graph+pap_sizeW((StgPAP*)graph)==p);
+  return (nat)((StgClosure*)p_FMs-graph);
 }
 
+#if defined(DEBUG)
+/* 
+   Check sanity of a PAP after unpacking the PAP.
+   This means that there is slice of heap after the PAP containing FETCHMEs
+*/
+void
+checkPAPSanity(StgPAP *graph, StgPtr p_FM_begin, StgPtr p_FM_end)
+{
+  StgPtr xx;
+
+  /* check that the main unpacked closure is a PAP */
+  ASSERT(graph->header.info = &stg_PAP_info);
+  checkClosure(graph);
+  /* check that all of the closures in the FM-area are FETCHMEs */
+  for (xx=p_FM_begin; xx<p_FM_end; xx += sizeofW(StgFetchMe)) {
+    /* must be a FETCHME closure */
+    ASSERT(((StgClosure*)xx)->header.info == &stg_FETCH_ME_info);
+    /* it might have been commoned up (=> marked as garbage);
+       otherwise it points to a GA */
+    ASSERT((((StgFetchMe*)xx)->ga)==GARBAGE_MARKER ||
+          LOOKS_LIKE_GA(((StgFetchMe*)xx)->ga));
+  }
+  /* traverse the payload of the PAP */
+  for (xx=graph->payload; xx-(StgPtr)(graph->payload)<graph->n_args; xx++) {
+    /* if the current elem is a pointer into the FM area, check that
+       the GA field is ok */
+    ASSERT(!(p_FM_begin<(StgPtr)*xx && (StgPtr)*xx<p_FM_end) ||
+          LOOKS_LIKE_GA(((StgFetchMe*)*xx)->ga));
+  }
+}
+#endif  /* DEBUG */
 #endif  /* PAR */
 
-//@node GranSim Code,  , Local Definitions, Unpacking routines
+//@node GranSim Code,  , GUM code, Unpacking routines
 //@subsubsection GranSim Code
 
 /*
@@ -2973,9 +3376,27 @@ StgClosure *closure;
 }
 # endif
 
-//@node Packet size, Types of Global Addresses, Offset table, Aux fcts for packing
+//@node Packet size, Closure Info, Offset table, Aux fcts for packing
 //@subsubsection Packet size
 
+/* 
+   The size needed if all currently queued closures are packed as FETCH_ME
+   closures. This represents the headroom we must have when packing the
+   buffer in order to maintain all links in the graphs.
+*/
+// ToDo: check and merge cases
+#if defined(PAR)
+static nat
+QueuedClosuresMinSize (nat ptrs) {
+  return ((clq_size - clq_pos) + ptrs) * PACK_FETCHME_SIZE;
+}
+#else /* GRAN */
+static nat
+QueuedClosuresMinSize (nat ptrs) {
+  return ((clq_size - clq_pos) + ptrs) * PACK_FETCHME_SIZE;
+}
+#endif 
+
 /*
   RoomToPack determines whether there's room to pack the closure into
   the pack buffer based on 
@@ -2996,63 +3417,30 @@ nat size, ptrs;
 {
 # if defined(PAR)
   if (roomInBuffer &&
-      (pack_locn + // where we are in the buffer right now
-       size +      // space needed for the current closure
-       ((clq_size - clq_pos) + ptrs) * PACK_FETCHME_SIZE // space for queued closures
+      (pack_locn +                 // where we are in the buffer right now
+       size +                      // space needed for the current closure
+       QueuedClosuresMinSize(ptrs) // space for queued closures as FETCH_MEs
+       + 1                         // headroom (DEBUGGING only)
        >= 
        RTS_PACK_BUFFER_SIZE))
     {
-      IF_PAR_DEBUG(pack,
-                  belch("*>** pack buffer full"));
       roomInBuffer = rtsFalse;
     }
 # else   /* GRAN */
   if (roomInBuffer &&
-      (unpacked_size + size +
-       ((clq_size - clq_pos) + ptrs) * PACK_FETCHME_SIZE >= RTS_PACK_BUFFER_SIZE))
+      (unpacked_size + 
+       size +
+       QueuedClosuresMinSize(ptrs)
+       >= 
+       RTS_PACK_BUFFER_SIZE))
     {
-      IF_GRAN_DEBUG(pack,
-                  belch("*>** pack buffer full"));
       roomInBuffer = rtsFalse;
     }
 # endif
   return (roomInBuffer);
 }
 
-//@node Types of Global Addresses, Closure Info, Packet size, Aux fcts for packing
-//@subsubsection Types of Global Addresses
-
-/*
-  Types of Global Addresses
-
-  These routines determine whether a GA is one of a number of special types
-  of GA.
-*/
-
-# if defined(PAR)
-//@cindex isOffset
-rtsBool inline
-isOffset(globalAddr *ga)
-{
-    return (ga->weight == 1 && ga->payload.gc.gtid == 0);
-}
-
-//@cindex isFixed
-rtsBool inline
-isFixed(globalAddr *ga)
-{
-    return (ga->weight == 0);
-}
-
-//@cindex isConstr
-rtsBool inline
-isConstr(globalAddr *ga)
-{
-    return (ga->weight == 2);
-}
-# endif
-
-//@node Closure Info,  , Types of Global Addresses, Aux fcts for packing
+//@node Closure Info,  , Packet size, Aux fcts for packing
 //@subsubsection Closure Info
 
 /*
@@ -3463,7 +3851,7 @@ rtsPackBuffer *packBuffer;
   StgClosure *parent, *graphroot, *closure_start;
   const StgInfoTable *ip;
   globalAddr ga;
-  StgWord **buffer, **bufptr, **slotptr;
+  StgWord **bufptr, **slotptr;
 
   nat bufsize;
   nat pptr = 0, pptrs = 0, pvhs;
@@ -3472,6 +3860,10 @@ rtsPackBuffer *packBuffer;
   nat size, ptrs, nonptrs, vhs;
   char str[80];
 
+  /* disable printing if a non-std globalisation scheme is used; ToDo: FIX */
+  if (RtsFlags.ParFlags.globalising != 0)
+    return;
+
   /* NB: this whole routine is more or less a copy of UnpackGraph with all
      unpacking components replaced by printing fcts
      Long live higher-order fcts!
@@ -3502,7 +3894,9 @@ rtsPackBuffer *packBuffer;
     /* First, unpack the next GA or PLC */
     ga.weight = (rtsWeight) *bufptr++;
 
-    if (ga.weight > 0) {
+    if (ga.weight == 2) {  // unglobalised closure to follow
+      // nothing to do; closure starts at *bufptr
+    } else if (ga.weight > 0) { // fill in GA
       ga.payload.gc.gtid = (GlobalTaskId) *bufptr++;
       ga.payload.gc.slot = (int) *bufptr++;
     } else
@@ -3523,12 +3917,12 @@ rtsPackBuffer *packBuffer;
       fprintf(stderr, "*. %u: ((%x, %d, %x)) ", locn,
               ga.payload.gc.gtid, ga.payload.gc.slot, ga.weight);
 
-      closure_start = bufptr;
+      closure_start = (StgClosure*)bufptr;
       ip = get_closure_info((StgClosure *)bufptr, 
                            &size, &ptrs, &nonptrs, &vhs, str);
          
       /* ToDo: check whether this is really needed */
-      if (ip->type == FETCH_ME) {
+      if (ip->type == FETCH_ME || ip->type == REMOTE_REF) {
        size = _HS;
        ptrs = nonptrs = vhs = 0;
       }
@@ -3543,7 +3937,7 @@ rtsPackBuffer *packBuffer;
       if (ip->type == PAP || ip->type == AP_UPD) {
         vhs = 3; 
        ptrs = 0;
-        nonptrs = ((StgPAP *)bufptr)->payload[0];
+        nonptrs = (nat)((StgPAP *)bufptr)->payload[0];
        size = _HS+vhs+ptrs+nonptrs;
       }
 
@@ -3558,7 +3952,7 @@ rtsPackBuffer *packBuffer;
       for (i = 0; i < _HS; i++)
        fprintf(stderr, " %p", *bufptr++);
 
-      if (ip->type == FETCH_ME)
+      if (ip->type == FETCH_ME || ip->type == REMOTE_REF)
        size = ptrs = nonptrs = vhs = 0;
 
       // VH is always empty in the new RTS
@@ -3628,7 +4022,7 @@ rtsPackBuffer *packBuffer;
   StgClosure *parent, *graphroot, *closure_start;
   const StgInfoTable *ip;
   globalAddr ga;
-  StgWord **buffer, **bufptr, **slotptr;
+  StgWord **bufptr, **slotptr;
 
   nat bufsize;
   nat pptr = 0, pptrs = 0, pvhs;
@@ -3652,6 +4046,9 @@ rtsPackBuffer *packBuffer;
   parent = (StgClosure *)NULL;
   ASSERT(bufsize > 0);
   do {
+    /* check that we are not at the end of the buffer, yet */
+    IF_DEBUG(sanity, ASSERT(*bufptr != END_OF_BUFFER_MARKER));
+
     /* This is where we will ultimately save the closure's address */
     slotptr = bufptr;
     locn = slotptr-(packBuffer->buffer); // index of closure in buffer
@@ -3659,7 +4056,10 @@ rtsPackBuffer *packBuffer;
   
     /* First, check whether we have a GA, a PLC, or an OFFSET at hand */
     ga.weight = (rtsWeight) *bufptr++;
-    if (ga.weight > 0) {
+
+    if (ga.weight == 2) {  // unglobalised closure to follow
+      // nothing to do; closure starts at *bufptr
+    } else if (ga.weight > 0) { // fill in GA
       ga.payload.gc.gtid = (GlobalTaskId) *bufptr++;
       ga.payload.gc.slot = (int) *bufptr++;
     } else
@@ -3670,18 +4070,18 @@ rtsPackBuffer *packBuffer;
       /* It's a PLC */
       ASSERT(LOOKS_LIKE_STATIC(ga.payload.plc));
     } else if (isOffset(&ga)) {
-      ASSERT(ga.payload.gc.slot<=bufsize);
+      ASSERT(ga.payload.gc.slot<=(int)bufsize);
     } else {
       /* normal closure */
-      ASSERT(LOOKS_LIKE_GA(&ga));
+      ASSERT(!RtsFlags.ParFlags.globalising==0 || LOOKS_LIKE_GA(&ga));
 
-      closure_start = bufptr;
+      closure_start = (StgClosure*)bufptr;
       ASSERT(LOOKS_LIKE_GHC_INFO((StgPtr)*bufptr));
       ip = get_closure_info((StgClosure *)bufptr, 
                            &size, &ptrs, &nonptrs, &vhs, str);
 
       /* ToDo: check whether this is really needed */
-      if (ip->type == FETCH_ME) {
+      if (ip->type == FETCH_ME || ip->type == REMOTE_REF) {
        size = _HS;
        ptrs = nonptrs = vhs = 0;
       }
@@ -3696,7 +4096,7 @@ rtsPackBuffer *packBuffer;
       if (ip->type == PAP || ip->type == AP_UPD) {
         vhs = 3; 
        ptrs = 0;
-        nonptrs = ((StgPAP *)bufptr)->payload[0];
+        nonptrs = (nat)((StgPAP *)bufptr)->payload[0];
        size = _HS+vhs+ptrs+nonptrs;
       }
 
@@ -3730,6 +4130,8 @@ rtsPackBuffer *packBuffer;
   } while (parent != NULL);
   /* we unpacked exactly as many words as there are in the buffer */
   ASSERT(packBuffer->size == bufptr-(packBuffer->buffer));
+  /* check for magic end-of-buffer word */  
+  IF_DEBUG(sanity, ASSERT(*bufptr == END_OF_BUFFER_MARKER));
 }
 #else  /* GRAN */
 void