[project @ 2001-03-21 15:33:47 by simonmar]
[ghc-hetmet.git] / ghc / rts / parallel / HLComms.c
1 /* ----------------------------------------------------------------------------
2  * Time-stamp: <Wed Mar 29 2000 19:35:36 Stardate: [-30]4578.87 hwloidl>
3  * $Id: HLComms.c,v 1.3 2000/03/31 03:09:37 hwloidl Exp $
4  *
5  * High Level Communications Routines (HLComms.lc)
6  *
7  * Contains the high-level routines (i.e. communication
8  * subsystem independent) used by GUM
9  * 
10  * Phil Trinder, Glasgow University, 12 December 1994
11  * Adapted for new RTS
12  * Phil Trinder, Simon Marlow July 1998
13  * H-W. Loidl, Heriot-Watt University, November 1999
14  * 
15  * ------------------------------------------------------------------------- */
16
17 #ifdef PAR /* whole file */
18
19 //@node High Level Communications Routines, , ,
20 //@section High Level Communications Routines
21
22 //@menu
23 //* Macros etc::                
24 //* Includes::                  
25 //* GUM Message Sending and Unpacking Functions::  
26 //* Message-Processing Functions::  
27 //* GUM Message Processor::     
28 //* Miscellaneous Functions::   
29 //* Index::                     
30 //@end menu
31
32 //@node Macros etc, Includes, High Level Communications Routines, High Level Communications Routines
33 //@subsection Macros etc
34
35 # ifndef _AIX
36 # define NON_POSIX_SOURCE /* so says Solaris */
37 # endif
38
39 //@node Includes, GUM Message Sending and Unpacking Functions, Macros etc, High Level Communications Routines
40 //@subsection Includes
41
42 #include "Rts.h"
43 #include "RtsUtils.h"
44 #include "RtsFlags.h"
45 #include "Storage.h"   // for recordMutable
46 #include "HLC.h"
47 #include "Parallel.h"
48 #include "GranSimRts.h"
49 #include "ParallelRts.h"
50 #include "FetchMe.h"     // for BLOCKED_FETCH_info etc
51 #if defined(DEBUG)
52 # include "ParallelDebug.h"
53 #endif
54 #include "StgMacros.h" // inlined IS_... fcts
55
56 //@node GUM Message Sending and Unpacking Functions, Message-Processing Functions, Includes, High Level Communications Routines
57 //@subsection GUM Message Sending and Unpacking Functions
58
59 /*
60  * GUM Message Sending and Unpacking Functions
61  */
62
63 /*
64  * Allocate space for message processing
65  */
66
67 //@cindex gumPackBuffer
68 static rtsPackBuffer *gumPackBuffer;
69
70 //@cindex initMoreBuffers
71 rtsBool
72 initMoreBuffers(void)
73 {
74   if ((gumPackBuffer = (rtsPackBuffer *)stgMallocWords(RtsFlags.ParFlags.packBufferSize, 
75                                              "initMoreBuffers")) == NULL)
76     return rtsFalse;
77   return rtsTrue;
78 }
79
80 /*
81  * SendFetch packs the two global addresses and a load into a message +
82  * sends it.  
83
84 //@cindex FETCH
85
86    Structure of a FETCH message:
87
88          |    GA 1     |        GA 2          |
89          +------------------------------------+------+
90          | gtid | slot | weight | gtid | slot | load |
91          +------------------------------------+------+
92  */
93
94 //@cindex sendFetch
95 void
96 sendFetch(globalAddr *rga, globalAddr *lga, int load)
97 {
98   ASSERT(rga->weight > 0 && lga->weight > 0);
99   IF_PAR_DEBUG(fetch,
100                belch("** [%x] Sending Fetch for ((%x, %d, 0)); locally ((%x, %d, %x)), load = %d", 
101                      mytid,
102                      rga->payload.gc.gtid, rga->payload.gc.slot, 
103                      lga->payload.gc.gtid, lga->payload.gc.slot, lga->weight,
104                      load));
105
106
107   /* ToDo: Dump event
108   DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(rga->payload.gc.gtid), 
109                    GR_FETCH, CurrentTSO, (StgClosure *)(lga->payload.gc.slot),
110                    0, spark_queue_len(ADVISORY_POOL));
111   */
112
113   sendOpV(PP_FETCH, rga->payload.gc.gtid, 6,
114           (StgWord) rga->payload.gc.gtid, (StgWord) rga->payload.gc.slot, 
115           (StgWord) lga->weight, (StgWord) lga->payload.gc.gtid, 
116           (StgWord) lga->payload.gc.slot, (StgWord) load);
117 }
118
119 /*
120  * unpackFetch unpacks a FETCH message into two Global addresses and a load
121  * figure.  
122 */
123
124 //@cindex unpackFetch
125 static void
126 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
127 {
128   long buf[6];
129
130   GetArgs(buf, 6); 
131
132   IF_PAR_DEBUG(fetch,
133                belch("** [%x] Unpacking Fetch for ((%x, %d, 0)) to ((%x, %d, %x)), load = %d", 
134                      mytid,
135                      (GlobalTaskId) buf[0], (int) buf[1], 
136                      (GlobalTaskId) buf[3], (int) buf[4], buf[2], buf[5]));
137
138   lga->weight = 1;
139   lga->payload.gc.gtid = (GlobalTaskId) buf[0];
140   lga->payload.gc.slot = (int) buf[1];
141
142   rga->weight = (unsigned) buf[2];
143   rga->payload.gc.gtid = (GlobalTaskId) buf[3];
144   rga->payload.gc.slot = (int) buf[4];
145
146   *load = (int) buf[5];
147
148   ASSERT(rga->weight > 0);
149 }
150
151 /*
152  * SendResume packs the remote blocking queue's GA and data into a message 
153  * and sends it.
154
155 //@cindex RESUME
156
157    Structure of a RESUME message:
158
159       -------------------------------
160       | weight | slot | n | data ...
161       -------------------------------
162
163    data is a packed graph represented as an rtsPackBuffer
164    n is the size of the graph (as returned by PackNearbyGraph) + packet hdr size
165  */
166
167 //@cindex sendResume
168 void
169 sendResume(globalAddr *rga, int nelem, rtsPackBuffer *data) // StgPtr data)
170 {
171   IF_PAR_DEBUG(resume,
172                PrintPacket(data);
173                belch("[] [%x] Sending Resume for ((%x, %d, %x))", 
174                      mytid,
175                      rga->payload.gc.gtid, rga->payload.gc.slot, rga->weight));
176
177   sendOpNV(PP_RESUME, rga->payload.gc.gtid, 
178            nelem + PACK_BUFFER_HDR_SIZE, (StgPtr)data, 
179            2, (rtsWeight) rga->weight, (StgWord) rga->payload.gc.slot);
180 }
181
182 /*
183  * unpackResume unpacks a Resume message into two Global addresses and
184  * a data array.
185  */
186
187 //@cindex unpackResume
188 static void
189 unpackResume(globalAddr *lga, int *nelem, rtsPackBuffer *data)
190 {
191     long buf[3];
192
193     GetArgs(buf, 3); 
194
195     IF_PAR_DEBUG(resume,
196                  belch("[] [%x] Unpacking Resume for ((%x, %d, %x))", 
197                        mytid, mytid,
198                        (int) buf[1], (unsigned) buf[0]));
199
200     /*
201       RESUME event is written in awaken_blocked_queue
202     DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(lga->payload.gc.gtid), 
203                      GR_RESUME, END_TSO_QUEUE, (StgClosure *)NULL, 0, 0);
204     */
205
206     lga->weight = (unsigned) buf[0];
207     lga->payload.gc.gtid = mytid;
208     lga->payload.gc.slot = (int) buf[1];
209
210     *nelem = (int) buf[2]; // includes PACK_BUFFER_HDR_SIZE;
211     GetArgs(data, *nelem);
212     *nelem -= PACK_BUFFER_HDR_SIZE;
213 }
214
215 /*
216  * SendAck packs the global address being acknowledged, together with
217  * an array of global addresses for any closures shipped and sends them.
218
219 //@cindex ACK
220
221    Structure of an ACK message:
222
223       |        GA 1          |        GA 2          | 
224       +---------------------------------------------+-------
225       | weight | gtid | slot | weight | gtid | slot |  .....  ngas times
226       + --------------------------------------------+------- 
227
228  */
229
230 //@cindex sendAck
231 void
232 sendAck(GlobalTaskId task, int ngas, globalAddr *gagamap)
233 {
234   static long *buffer;
235   long *p;
236   int i;
237
238   buffer = (long *) gumPackBuffer;
239
240   for(i = 0, p = buffer; i < ngas; i++, p += 6) {
241     ASSERT(gagamap[1].weight > 0);
242     p[0] = (long) gagamap->weight;
243     p[1] = (long) gagamap->payload.gc.gtid;
244     p[2] = (long) gagamap->payload.gc.slot;
245     gagamap++;
246     p[3] = (long) gagamap->weight;
247     p[4] = (long) gagamap->payload.gc.gtid;
248     p[5] = (long) gagamap->payload.gc.slot;
249     gagamap++;
250   }
251   IF_PAR_DEBUG(schedule,
252                belch(",, [%x] Sending Ack (%d pairs) to PE %x\n", 
253                      mytid, ngas, task));
254
255   sendOpN(PP_ACK, task, p - buffer, buffer);
256 }
257
258 /*
259  * unpackAck unpacks an Acknowledgement message into a Global address,
260  * a count of the number of global addresses following and a map of 
261  * Global addresses
262  */
263
264 //@cindex unpackAck
265 static void
266 unpackAck(int *ngas, globalAddr *gagamap)
267 {
268   long GAarraysize;
269   long buf[6];
270   
271   GetArgs(&GAarraysize, 1);
272   
273   *ngas = GAarraysize / 6;
274   
275   IF_PAR_DEBUG(schedule,
276                belch(",, [%x] Unpacking Ack (%d pairs) on %x\n", 
277                      mytid, *ngas, mytid));
278
279   while (GAarraysize > 0) {
280     GetArgs(buf, 6);
281     gagamap->weight = (rtsWeight) buf[0];
282     gagamap->payload.gc.gtid = (GlobalTaskId) buf[1];
283     gagamap->payload.gc.slot = (int) buf[2];
284     gagamap++;
285     gagamap->weight = (rtsWeight) buf[3];
286     gagamap->payload.gc.gtid = (GlobalTaskId) buf[4];
287     gagamap->payload.gc.slot = (int) buf[5];
288     ASSERT(gagamap->weight > 0);
289     gagamap++;
290     GAarraysize -= 6;
291   }
292 }
293
294 /*
295  * SendFish packs the global address being acknowledged, together with
296  * an array of global addresses for any closures shipped and sends them.
297
298 //@cindex FISH
299
300  Structure of a FISH message:
301
302      +----------------------------------+
303      | orig PE | age | history | hunger |
304      +----------------------------------+
305  */
306
307 //@cindex sendFish
308 void
309 sendFish(GlobalTaskId destPE, GlobalTaskId origPE, 
310          int age, int history, int hunger)
311 {
312   IF_PAR_DEBUG(fish,
313                belch("$$ [%x] Sending Fish to %x (%d outstanding fishes)", 
314                      mytid, destPE, outstandingFishes));
315
316   sendOpV(PP_FISH, destPE, 4, 
317           (StgWord) origPE, (StgWord) age, (StgWord) history, (StgWord) hunger);
318
319   if (origPE == mytid) {
320     //fishing = rtsTrue;
321     outstandingFishes++;
322   }
323 }
324
325 /*
326  * unpackFish unpacks a FISH message into the global task id of the
327  * originating PE and 3 data fields: the age, history and hunger of the
328  * fish. The history + hunger are not currently used.
329
330  */
331
332 //@cindex unpackFish
333 static void
334 unpackFish(GlobalTaskId *origPE, int *age, int *history, int *hunger)
335 {
336   long buf[4];
337   
338   GetArgs(buf, 4);
339   
340   IF_PAR_DEBUG(fish,
341                belch("$$ [%x] Unpacking Fish from PE %x (age=%d)", 
342                      mytid, (GlobalTaskId) buf[0], (int) buf[1]));
343
344   *origPE = (GlobalTaskId) buf[0];
345   *age = (int) buf[1];
346   *history = (int) buf[2];
347   *hunger = (int) buf[3];
348 }
349
350 /*
351  * SendFree sends (weight, slot) pairs for GAs that we no longer need
352  * references to.  
353
354 //@cindex FREE
355
356    Structure of a FREE message:
357    
358        +-----------------------------
359        | n | weight_1 | slot_1 | ...
360        +-----------------------------
361  */
362 //@cindex sendFree
363 void
364 sendFree(GlobalTaskId pe, int nelem, StgPtr data)
365 {
366     IF_PAR_DEBUG(free,
367                  belch("!! [%x] Sending Free (%d GAs) to %x", 
368                        mytid, nelem/2, pe));
369
370     sendOpN(PP_FREE, pe, nelem, data);
371 }
372
373 /*
374  * unpackFree unpacks a FREE message into the amount of data shipped and
375  * a data block.
376  */
377 //@cindex unpackFree
378 static void
379 unpackFree(int *nelem, rtsPackBuffer *data)
380 {
381   long buf[1];
382   
383   GetArgs(buf, 1);
384   *nelem = (int) buf[0];
385
386   IF_PAR_DEBUG(free,
387                belch("!! [%x] Unpacking Free (%d GAs)", 
388                      mytid, *nelem/2));
389
390   GetArgs(data, *nelem);
391 }
392
393 /*
394  * SendSchedule sends a closure to be evaluated in response to a Fish
395  * message. The message is directed to the PE that originated the Fish
396  * (origPE), and includes the packed closure (data) along with its size
397  * (nelem).
398
399 //@cindex SCHEDULE
400
401    Structure of a SCHEDULE message:
402
403        +------------------------------------
404        | PE | n | pack buffer of a graph ...
405        +------------------------------------
406  */
407 //@cindex sendSchedule
408 void
409 sendSchedule(GlobalTaskId origPE, int nelem, rtsPackBuffer *data) // StgPtr data)
410 {
411   IF_PAR_DEBUG(schedule,
412                PrintPacket(data);
413                belch("-- [%x] Sending Schedule (%d elems) to %x\n", 
414                      mytid, nelem, origPE));
415
416   sendOpN(PP_SCHEDULE, origPE, nelem + PACK_BUFFER_HDR_SIZE, (StgPtr)data);
417 }
418
419 /*
420  * unpackSchedule unpacks a SCHEDULE message into the Global address of
421  * the closure shipped, the amount of data shipped (nelem) and the data
422  * block (data).
423  */
424
425 //@cindex unpackSchedule
426 static void
427 unpackSchedule(int *nelem, rtsPackBuffer *data)
428 {
429     long buf[1];
430
431     GetArgs(buf, 1);
432     /* no. of elems, not counting the header of the pack buffer */
433     *nelem = (int) buf[0] - PACK_BUFFER_HDR_SIZE;
434
435     IF_PAR_DEBUG(schedule,
436                  belch("-- [%x] Unpacking Schedule (%d elems) on %x\n", 
437                        mytid, *nelem));
438
439     /* automatic cast of flat pvm-data to rtsPackBuffer */
440     GetArgs(data, *nelem + PACK_BUFFER_HDR_SIZE);
441 }
442
443 //@node Message-Processing Functions, GUM Message Processor, GUM Message Sending and Unpacking Functions, High Level Communications Routines
444 //@subsection Message-Processing Functions
445
446 /*
447  * Message-Processing Functions
448  *
449  * The following routines process incoming GUM messages. Often reissuing
450  * messages in response.
451  *
452  * processFish unpacks a fish message, reissuing it if it's our own,
453  * sending work if we have it or sending it onwards otherwise.
454  */
455
456 /*
457  * processFetches constructs and sends resume messages for every
458  * BlockedFetch which is ready to be awakened.
459  * awaken_blocked_queue (in Schedule.c) is responsible for moving 
460  * BlockedFetches from a blocking queue to the PendingFetches queue.
461  */
462 void GetRoots(void);
463 extern StgBlockedFetch *PendingFetches;
464
465 nat
466 pending_fetches_len(void)
467 {
468   StgBlockedFetch *bf;
469   nat n;
470
471   for (n=0, bf=PendingFetches; bf != END_BF_QUEUE; n++, bf = (StgBlockedFetch *)(bf->link)) {
472     ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
473   }
474   return n;
475 }
476
477 //@cindex processFetches
478 void
479 processFetches(void) {
480   StgBlockedFetch *bf, *next;
481   StgClosure *closure;
482   StgInfoTable *ip;
483   globalAddr rga;
484   static rtsPackBuffer *packBuffer;
485     
486   IF_PAR_DEBUG(verbose,
487                belch("____ processFetches: %d pending fetches (root @ %p)",
488                      pending_fetches_len(), PendingFetches));
489   
490   for (bf = PendingFetches; 
491        bf != END_BF_QUEUE;
492        bf=next) {
493     /* the PendingFetches list contains only BLOCKED_FETCH closures */
494     ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
495     /* store link (we might overwrite it via blockFetch later on */
496     next = (StgBlockedFetch *)(bf->link);
497
498     /*
499      * Find the target at the end of the indirection chain, and
500      * process it in much the same fashion as the original target
501      * of the fetch.  Though we hope to find graph here, we could
502      * find a black hole (of any flavor) or even a FetchMe.
503      */
504     closure = bf->node;
505     /*
506       We evacuate BQs and update the node fields where necessary in GC.c
507       So, if we find an EVACUATED closure, something has gone Very Wrong
508       (and therefore we let the RTS crash most ungracefully).
509     */
510     ASSERT(get_itbl(closure)->type != EVACUATED);
511       //  closure = ((StgEvacuated *)closure)->evacuee;
512
513     closure = UNWIND_IND(closure);
514     //while ((ind = IS_INDIRECTION(closure)) != NULL) { closure = ind; }
515
516     ip = get_itbl(closure);
517     if (ip->type == FETCH_ME) {
518       /* Forward the Fetch to someone else */
519       rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
520       rga.payload.gc.slot = bf->ga.payload.gc.slot;
521       rga.weight = bf->ga.weight;
522       
523       sendFetch(((StgFetchMe *)closure)->ga, &rga, 0 /* load */);
524
525       IF_PAR_DEBUG(fetch,
526                    belch("__-> processFetches: Forwarding fetch from %lx to %lx",
527                          mytid, rga.payload.gc.gtid));
528
529     } else if (IS_BLACK_HOLE(closure)) {
530       IF_PAR_DEBUG(verbose,
531                    belch("__++ processFetches: trying to send a BLACK_HOLE => doing a blockFetch on closure %p (%s)",
532                          closure, info_type(closure)));
533       bf->node = closure;
534       blockFetch(bf, closure);
535     } else {
536       /* We now have some local graph to send back */
537       nat size;
538
539       packBuffer = gumPackBuffer;
540       IF_PAR_DEBUG(verbose,
541                    belch("__*> processFetches: PackNearbyGraph of closure %p (%s)",
542                          closure, info_type(closure)));
543
544       if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size)) == NULL) {
545         // Put current BF back on list
546         bf->link = (StgBlockingQueueElement *)PendingFetches;
547         PendingFetches = (StgBlockedFetch *)bf;
548         // ToDo: check that nothing more has to be done to prepare for GC!
549         barf("processFetches: out of heap while packing graph; ToDo: call GC here");
550         GarbageCollect(GetRoots); 
551         bf = PendingFetches;
552         PendingFetches = (StgBlockedFetch *)(bf->link);
553         closure = bf->node;
554         packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size);
555         ASSERT(packBuffer != (rtsPackBuffer *)NULL);
556       }
557       rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
558       rga.payload.gc.slot = bf->ga.payload.gc.slot;
559       rga.weight = bf->ga.weight;
560       
561       sendResume(&rga, size, packBuffer);
562     }
563   }
564   PendingFetches = END_BF_QUEUE;
565 }
566
567 #if 0
568 /*
569   Alternatively to sending fetch messages directly from the FETCH_ME_entry
570   code we could just store the data about the remote data in a global
571   variable and send the fetch request from the main scheduling loop (similar
572   to processFetches above). This would save an expensive STGCALL in the entry 
573   code because we have to go back to the scheduler anyway.
574 */
575 //@cindex processFetches
576 void
577 processTheRealFetches(void) {
578   StgBlockedFetch *bf;
579   StgClosure *closure, *next;
580     
581   IF_PAR_DEBUG(verbose,
582                belch("__ processTheRealFetches: ");
583                printGA(&theGlobalFromGA);
584                printGA(&theGlobalToGA));
585
586   ASSERT(theGlobalFromGA.payload.gc.gtid != 0 &&
587          theGlobalToGA.payload.gc.gtid != 0);
588
589   /* the old version did this in the FETCH_ME entry code */
590   sendFetch(&theGlobalFromGA, &theGlobalToGA, 0/*load*/);
591   
592 }
593 #endif
594
595
596 /*
597  * processFish unpacks a fish message, reissuing it if it's our own,
598  * sending work if we have it or sending it onwards otherwise.
599  */
600 //@cindex processFish
601 static void
602 processFish(void)
603 {
604   GlobalTaskId origPE;
605   int age, history, hunger;
606   rtsSpark spark;
607   static rtsPackBuffer *packBuffer; 
608
609   unpackFish(&origPE, &age, &history, &hunger);
610
611   if (origPE == mytid) {
612     //fishing = rtsFalse;                   // fish has come home
613     outstandingFishes--;
614     last_fish_arrived_at = CURRENT_TIME;  // remember time (see schedule fct)
615     return;                               // that's all
616   }
617
618   ASSERT(origPE != mytid);
619   IF_PAR_DEBUG(fish,
620                belch("$$__ processing fish; %d sparks available",
621                      spark_queue_len(&(MainRegTable.rSparks))));
622   while ((spark = findSpark()) != NULL) {
623     nat size;
624     // StgClosure *graph;
625
626     packBuffer = gumPackBuffer; 
627     ASSERT(closure_SHOULD_SPARK((StgClosure *)spark));
628     if ((packBuffer = PackNearbyGraph(spark, END_TSO_QUEUE, &size)) == NULL) {
629       IF_PAR_DEBUG(fish,
630                    belch("$$ GC while trying to satisfy FISH via PackNearbyGraph of node %p",
631                          (StgClosure *)spark));
632       barf("processFish: out of heap while packing graph; ToDo: call GC here");
633       GarbageCollect(GetRoots);
634       /* Now go back and try again */
635     } else {
636       IF_PAR_DEBUG(fish,
637                    belch("$$-- Replying to FISH from %x by sending graph @ %p (%s)",
638                          origPE, 
639                          (StgClosure *)spark, info_type((StgClosure *)spark)));
640       sendSchedule(origPE, size, packBuffer);
641       disposeSpark(spark);
642       break;
643     }
644   }
645   if (spark == (rtsSpark)NULL) {
646     IF_PAR_DEBUG(fish,
647                  belch("$$^^ No sparks available for FISH from %x",
648                        origPE));
649     /* We have no sparks to give */
650     if (age < FISH_LIFE_EXPECTANCY)
651       /* and the fish is atill young, send it to another PE to look for work */
652       sendFish(choosePE(), origPE,
653                (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
654
655     /* otherwise, send it home to die */
656     else
657       sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
658     }
659 }  /* processFish */
660
661 /*
662  * processFetch either returns the requested data (if available) 
663  * or blocks the remote blocking queue on a black hole (if not).
664  */
665
666 //@cindex processFetch
667 static void
668 processFetch(void)
669 {
670   globalAddr ga, rga;
671   int load;
672   StgClosure *closure;
673   StgInfoTable *ip;
674
675   unpackFetch(&ga, &rga, &load);
676   IF_PAR_DEBUG(fetch,
677                belch("%%%%__ Rcvd Fetch for ((%x, %d, 0)), Resume ((%x, %d, %x)) (load %d) from %x",
678                      ga.payload.gc.gtid, ga.payload.gc.slot,
679                      rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight, load,
680                      rga.payload.gc.gtid));
681
682   closure = GALAlookup(&ga);
683   ASSERT(closure != (StgClosure *)NULL);
684   ip = get_itbl(closure);
685   if (ip->type == FETCH_ME) {
686     /* Forward the Fetch to someone else */
687     sendFetch(((StgFetchMe *)closure)->ga, &rga, load);
688   } else if (rga.payload.gc.gtid == mytid) {
689     /* Our own FETCH forwarded back around to us */
690     StgFetchMeBlockingQueue *fmbq = (StgFetchMeBlockingQueue *)GALAlookup(&rga);
691     
692     IF_PAR_DEBUG(fetch,
693                  belch("%%%%== Fetch returned to sending PE; closure=%p (%s); receiver=%p (%s)",
694                        closure, info_type(closure), fmbq, info_type(fmbq)));
695     /* We may have already discovered that the fetch target is our own. */
696     if ((StgClosure *)fmbq != closure) 
697       CommonUp((StgClosure *)fmbq, closure);
698     (void) addWeight(&rga);
699   } else if (IS_BLACK_HOLE(closure)) {
700     /* This includes RBH's and FMBQ's */
701     StgBlockedFetch *bf;
702
703     ASSERT(GALAlookup(&rga) == NULL);
704
705     /* If we're hitting a BH or RBH or FMBQ we have to put a BLOCKED_FETCH
706        closure into the BQ in order to denote that when updating this node
707        the result should be sent to the originator of this fetch message. */
708     bf = (StgBlockedFetch *)createBlockedFetch(ga, rga);
709     blockFetch(bf, closure);
710
711     IF_PAR_DEBUG(fetch,
712                  belch("%%++ Blocking Fetch ((%x, %d, %x)) on %p (%s)",
713                        rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight, 
714                        closure, info_type(closure)));
715     } else {                    
716       /* The target of the FetchMe is some local graph */
717       nat size;
718       // StgClosure *graph;
719       rtsPackBuffer *buffer = (rtsPackBuffer *)NULL;
720
721       if ((buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size)) == NULL) {
722         barf("processFetch: out of heap while packing graph; ToDo: call GC here");
723         GarbageCollect(GetRoots); 
724         closure = GALAlookup(&ga);
725         buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size);
726         ASSERT(buffer != (rtsPackBuffer *)NULL);
727       }
728       sendResume(&rga, size, buffer);
729     }
730 }
731
732 /*
733  * processFree unpacks a FREE message and adds the weights to our GAs.
734  */
735 //@cindex processFree
736 static void
737 processFree(void)
738 {
739   int nelem;
740   static StgWord *buffer;
741   int i;
742   globalAddr ga;
743
744   buffer = (StgWord *)gumPackBuffer;
745   unpackFree(&nelem, buffer);
746   IF_PAR_DEBUG(free,
747                belch("!!__ Rcvd Free (%d GAs)", nelem / 2));
748
749   ga.payload.gc.gtid = mytid;
750   for (i = 0; i < nelem;) {
751     ga.weight = (rtsWeight) buffer[i++];
752     ga.payload.gc.slot = (int) buffer[i++];
753     IF_PAR_DEBUG(free,
754                  fprintf(stderr, "!!-- Processing free "); 
755                  printGA(&ga);
756                  fputc('\n', stderr);
757                  );
758     (void) addWeight(&ga);
759   }
760 }
761
762 /*
763  * processResume unpacks a RESUME message into the graph, filling in
764  * the LA -> GA, and GA -> LA tables. Threads blocked on the original
765  * FetchMe (now a blocking queue) are awakened, and the blocking queue
766  * is converted into an indirection.  Finally it sends an ACK in response
767  * which contains any newly allocated GAs.
768  */
769
770 //@cindex processResume
771 static void
772 processResume(GlobalTaskId sender)
773 {
774   int nelem;
775   nat nGAs;
776   static rtsPackBuffer *packBuffer;
777   StgClosure *newGraph, *old;
778   globalAddr lga;
779   globalAddr *gagamap;
780   
781   packBuffer = gumPackBuffer;
782   unpackResume(&lga, &nelem, (StgPtr)packBuffer);
783
784   IF_PAR_DEBUG(resume,
785                fprintf(stderr, "[]__ Rcvd Resume for "); 
786                printGA(&lga);
787                fputc('\n', stderr);
788                PrintPacket((rtsPackBuffer *)packBuffer));
789   
790   /* 
791    * We always unpack the incoming graph, even if we've received the
792    * requested node in some other data packet (and already awakened
793    * the blocking queue).
794   if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
795     ReallyPerformThreadGC(packBuffer[0], rtsFalse);
796     SAVE_Hp -= packBuffer[0];
797   }
798    */
799
800   // ToDo: Check for GC here !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
801
802   /* Do this *after* GC; we don't want to release the object early! */
803
804   if (lga.weight > 0)
805     (void) addWeight(&lga);
806
807   old = GALAlookup(&lga);
808
809   if (RtsFlags.ParFlags.ParStats.Full) {
810     // StgTSO *tso = END_TSO_QUEUE;
811     StgBlockingQueueElement *bqe;
812
813     /* Write REPLY events to the log file, indicating that the remote
814        data has arrived */
815     if (get_itbl(old)->type == FETCH_ME_BQ ||
816         get_itbl(old)->type == RBH) 
817       for (bqe = ((StgFetchMeBlockingQueue *)old)->blocking_queue;
818            bqe->link != END_BQ_QUEUE;
819            bqe = bqe->link)
820         if (get_itbl((StgClosure *)bqe)->type == TSO)
821           DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(sender), 
822                            GR_REPLY, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
823                            0, spark_queue_len(&(MainRegTable.rSparks)));
824   }
825
826   newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
827   ASSERT(newGraph != NULL);
828
829   /* 
830    * Sometimes, unpacking will common up the resumee with the
831    * incoming graph, but if it hasn't, we'd better do so now.
832    */
833    
834   if (get_itbl(old)->type == FETCH_ME_BQ)
835     CommonUp(old, newGraph);
836
837   IF_PAR_DEBUG(tables,
838                DebugPrintGAGAMap(gagamap, nGAs));
839   
840   sendAck(sender, nGAs, gagamap);
841 }
842
843 /*
844  * processSchedule unpacks a SCHEDULE message into the graph, filling
845  * in the LA -> GA, and GA -> LA tables. The root of the graph is added to
846  * the local spark queue.  Finally it sends an ACK in response
847  * which contains any newly allocated GAs.
848  */
849 //@cindex processSchedule
850 static void
851 processSchedule(GlobalTaskId sender)
852 {
853   nat nelem, space_required, nGAs;
854   rtsBool success;
855   static rtsPackBuffer *packBuffer;
856   StgClosure *newGraph;
857   globalAddr *gagamap;
858   
859   packBuffer = gumPackBuffer;           /* HWL */
860   unpackSchedule(&nelem, packBuffer);
861
862   IF_PAR_DEBUG(packet,
863                belch("--__ Rcvd Schedule (%d elems)", nelem);
864                PrintPacket(packBuffer));
865
866   /*
867    * For now, the graph is a closure to be sparked as an advisory
868    * spark, but in future it may be a complete spark with
869    * required/advisory status, priority etc.
870    */
871
872   /*
873   space_required = packBuffer[0];
874   if (SAVE_Hp + space_required >= SAVE_HpLim) {
875     ReallyPerformThreadGC(space_required, rtsFalse);
876     SAVE_Hp -= space_required;
877   }
878   */
879   // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!
880   newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
881   ASSERT(newGraph != NULL);
882   success = add_to_spark_queue(newGraph, &(MainRegTable.rSparks));
883
884   IF_PAR_DEBUG(packet,
885                if (success)
886                  belch("--^^ added spark to unpacked graph %p; %d sparks available on [%x]", 
887                      newGraph, spark_queue_len(&(MainRegTable.rSparks)), mytid);
888                else
889                  belch("--^^ received non-sparkable closure %p; nothing added to spark pool; %d sparks available on [%x]", 
890                      newGraph, spark_queue_len(&(MainRegTable.rSparks)), mytid);
891                belch("*<    Unpacked graph with root at %p (%s):", 
892                      newGraph, info_type(newGraph));
893                PrintGraph(newGraph, 0));
894
895   IF_PAR_DEBUG(tables,
896                DebugPrintGAGAMap(gagamap, nGAs));
897
898   if (nGAs > 0)
899     sendAck(sender, nGAs, gagamap);
900
901   //fishing = rtsFalse;
902   ASSERT(outstandingFishes>0);
903   outstandingFishes--;
904 }
905
906 /*
907  * processAck unpacks an ACK, and uses the GAGA map to convert RBH's
908  * (which represent shared thunks that have been shipped) into fetch-mes
909  * to remote GAs.
910  */
911 //@cindex processAck
912 static void
913 processAck(void)
914 {
915   nat nGAs;
916   globalAddr *gaga;
917   globalAddr gagamap[256]; // ToDo: elim magic constant!!   MAX_GAS * 2];??
918
919   unpackAck(&nGAs, gagamap);
920
921   IF_PAR_DEBUG(tables,
922                belch(",,,, Rcvd Ack (%d pairs)", nGAs);
923                DebugPrintGAGAMap(gagamap, nGAs));
924
925   IF_DEBUG(sanity,
926            checkGAGAMap(gagamap, nGAs));
927
928   /*
929    * For each (oldGA, newGA) pair, set the GA of the corresponding
930    * thunk to the newGA, convert the thunk to a FetchMe, and return
931    * the weight from the oldGA.
932    */
933   for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
934     StgClosure *old_closure = GALAlookup(gaga);
935     StgClosure *new_closure = GALAlookup(gaga + 1);
936
937     ASSERT(old_closure != NULL);
938     if (new_closure == NULL) {
939       /* We don't have this closure, so we make a fetchme for it */
940       globalAddr *ga = setRemoteGA(old_closure, gaga + 1, rtsTrue);
941       
942       /* convertToFetchMe should be done unconditionally here.
943          Currently, we assign GAs to CONSTRs, too, (a bit of a hack),
944          so we have to check whether it is an RBH before converting
945
946          ASSERT(get_itbl(old_closure)==RBH);
947       */
948       if (get_itbl(old_closure)->type==RBH)
949         convertToFetchMe(old_closure, ga);
950     } else {
951       /* 
952        * Oops...we've got this one already; update the RBH to
953        * point to the object we already know about, whatever it
954        * happens to be.
955        */
956       CommonUp(old_closure, new_closure);
957       
958       /* 
959        * Increase the weight of the object by the amount just
960        * received in the second part of the ACK pair.
961        */
962       (void) addWeight(gaga + 1);
963     }
964     (void) addWeight(gaga);
965   }
966
967   /* check the sanity of the LAGA and GALA tables after mincing them */
968   IF_DEBUG(sanity, checkLAGAtable(rtsFalse));
969 }
970
971 //@node GUM Message Processor, Miscellaneous Functions, Message-Processing Functions, High Level Communications Routines
972 //@subsection GUM Message Processor
973
974 /*
975  * GUM Message Processor
976
977  * processMessages processes any messages that have arrived, calling
978  * appropriate routines depending on the message tag
979  * (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
980  * present and performs a blocking receive! During profiling it
981  * busy-waits in order to record idle time.
982  */
983
984 //@cindex processMessages
985 void
986 processMessages(void)
987 {
988   rtsPacket packet;
989   OpCode opcode;
990   GlobalTaskId task;
991     
992   do {
993     packet = GetPacket();  /* Get next message; block until one available */
994     getOpcodeAndSender(packet, &opcode, &task);
995
996     switch (opcode) {
997     case PP_FINISH:
998       IF_PAR_DEBUG(verbose,
999                    belch("==== received FINISH [%p]", mytid));
1000       /* setting this global variables eventually terminates the main
1001          scheduling loop for this PE and causes a shut-down, sending 
1002          PP_FINISH to SysMan */
1003       GlobalStopPending = rtsTrue;
1004       break;
1005
1006     case PP_FETCH:
1007       processFetch();
1008       break;
1009
1010     case PP_RESUME:
1011       processResume(task);
1012       break;
1013
1014     case PP_ACK:
1015       processAck();
1016       break;
1017
1018     case PP_FISH:
1019       processFish();
1020       break;
1021
1022     case PP_FREE:
1023       processFree();
1024       break;
1025       
1026     case PP_SCHEDULE:
1027       processSchedule(task);
1028       break;
1029     
1030     default:
1031       /* Anything we're not prepared to deal with. */
1032       barf("Task %x: Unexpected opcode %x from %x",
1033            mytid, opcode, task);
1034     } /* switch */
1035
1036   } while (PacketsWaiting());   /* While there are messages: process them */
1037 }                               /* processMessages */
1038
1039 //@node Miscellaneous Functions, Index, GUM Message Processor, High Level Communications Routines
1040 //@subsection Miscellaneous Functions
1041
1042 /*
1043  * blockFetch blocks a BlockedFetch node on some kind of black hole.
1044  */
1045 //@cindex blockFetch
1046 void
1047 blockFetch(StgBlockedFetch *bf, StgClosure *bh) {
1048   bf->node = bh;
1049   switch (get_itbl(bh)->type) {
1050   case BLACKHOLE:
1051     bf->link = END_BQ_QUEUE;
1052     //((StgBlockingQueue *)bh)->header.info = &BLACKHOLE_BQ_info;
1053     SET_INFO(bh, &BLACKHOLE_BQ_info);  // turn closure into a blocking queue
1054     ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1055     
1056     // put bh on the mutables list
1057     recordMutable((StgMutClosure *)bh);
1058     break;
1059     
1060   case BLACKHOLE_BQ:
1061     /* enqueue bf on blocking queue of closure bh */
1062     bf->link = ((StgBlockingQueue *)bh)->blocking_queue;
1063     ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1064
1065     // put bh on the mutables list; ToDo: check
1066     recordMutable((StgMutClosure *)bh);
1067     break;
1068
1069   case FETCH_ME_BQ:
1070     /* enqueue bf on blocking queue of closure bh */
1071     bf->link = ((StgFetchMeBlockingQueue *)bh)->blocking_queue;
1072     ((StgFetchMeBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1073
1074     // put bh on the mutables list; ToDo: check
1075     recordMutable((StgMutClosure *)bh);
1076     break;
1077     
1078   case RBH:
1079     /* enqueue bf on blocking queue of closure bh */
1080     bf->link = ((StgRBH *)bh)->blocking_queue;
1081     ((StgRBH *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1082
1083     // put bh on the mutables list; ToDo: check
1084     recordMutable((StgMutClosure *)bh);
1085     break;
1086     
1087   default:
1088     barf("blockFetch: thought %p was a black hole (IP %#lx, %s)",
1089          (StgClosure *)bh, get_itbl((StgClosure *)bh), 
1090          info_type((StgClosure *)bh));
1091   }
1092   IF_PAR_DEBUG(schedule,
1093                belch("##++ blockFetch: after block the BQ of %p (%s) is:",
1094                      bh, info_type(bh));
1095                print_bq(bh));
1096 }
1097
1098
1099 /*
1100   blockThread is called from the main scheduler whenever tso returns with
1101   a ThreadBlocked return code; tso has already been added to a blocking
1102   queue (that's done in the entry code of the closure, because it is a 
1103   cheap operation we have to do in any case); the main purpose of this
1104   routine is to send a Fetch message in case we are blocking on a FETCHME(_BQ)
1105   closure, which is indicated by the tso.why_blocked field;
1106   we also write an entry into the log file if we are generating one
1107
1108   Should update exectime etc in the entry code already; but we don't have
1109   something like ``system time'' in the log file anyway, so this should
1110   even out the inaccuracies.
1111 */
1112
1113 //@cindex blockThread
1114 void
1115 blockThread(StgTSO *tso)
1116 {
1117   globalAddr *remote_ga;
1118   globalAddr *local_ga;
1119   globalAddr fmbq_ga;
1120
1121   // ASSERT(we are on some blocking queue)
1122   ASSERT(tso->block_info.closure != (StgClosure *)NULL);
1123
1124   /*
1125     We have to check why this thread has been blocked.
1126   */
1127   switch (tso->why_blocked) {
1128     case BlockedOnGA:
1129       /* the closure must be a FETCH_ME_BQ; tso came in here via 
1130          FETCH_ME entry code */
1131       ASSERT(get_itbl(tso->block_info.closure)->type==FETCH_ME_BQ);
1132
1133       /* HACK: the link field is used to hold the GA between FETCH_ME_entry
1134          end this point; if something (eg. GC) happens inbetween the whole
1135          thing will blow up 
1136          The problem is that the ga field of the FETCH_ME has been overwritten
1137          with the head of the blocking (which is tso). 
1138       */
1139       //ASSERT(looks_like_ga((globalAddr *)tso->link));
1140       ASSERT(tso->link!=END_TSO_QUEUE && tso->link!=NULL);
1141       remote_ga = (globalAddr *)tso->link; // ((StgFetchMe *)tso->block_info.closure)->ga;
1142       tso->link = END_BQ_QUEUE;
1143       /* it was tso which turned node from FETCH_ME into FETCH_ME_BQ =>
1144          we have to send a Fetch message here! */
1145       if (RtsFlags.ParFlags.ParStats.Full) {
1146         /* Note that CURRENT_TIME may perform an unsafe call */
1147         //rtsTime now = CURRENT_TIME; /* Now */
1148         tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
1149         tso->par.fetchcount++;
1150         tso->par.blockedat = CURRENT_TIME;
1151         /* we are about to send off a FETCH message, so dump a FETCH event */
1152         DumpRawGranEvent(CURRENT_PROC, 
1153                          taskIDtoPE(remote_ga->payload.gc.gtid),
1154                          GR_FETCH, tso, tso->block_info.closure, 0);
1155       }
1156       /* Phil T. claims that this was a workaround for a hard-to-find
1157        * bug, hence I'm leaving it out for now --SDM 
1158        */
1159       /* Assign a brand-new global address to the newly created FMBQ  */
1160       local_ga = makeGlobal(tso->block_info.closure, rtsFalse);
1161       splitWeight(&fmbq_ga, local_ga);
1162       ASSERT(fmbq_ga.weight == 1L << (BITS_IN(unsigned) - 1));
1163       
1164       sendFetch(remote_ga, &fmbq_ga, 0/*load*/);
1165
1166       break;
1167
1168     case BlockedOnGA_NoSend:
1169       /* the closure must be a FETCH_ME_BQ; tso came in here via 
1170          FETCH_ME_BQ entry code */
1171       ASSERT(get_itbl(tso->block_info.closure)->type==FETCH_ME_BQ);
1172
1173       /* Fetch message has been sent already */
1174       if (RtsFlags.ParFlags.ParStats.Full) {
1175         /* Note that CURRENT_TIME may perform an unsafe call */
1176         //rtsTime now = CURRENT_TIME; /* Now */
1177         tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
1178         tso->par.blockcount++;
1179         tso->par.blockedat = CURRENT_TIME;
1180         /* dump a block event, because fetch has been sent already */
1181         DumpRawGranEvent(CURRENT_PROC, thisPE,
1182                          GR_BLOCK, tso, tso->block_info.closure, 0);
1183       }
1184       break;
1185
1186     case BlockedOnBlackHole:
1187       /* the closure must be a BLACKHOLE_BQ or an RBH; tso came in here via 
1188          BLACKHOLE(_BQ) or CAF_BLACKHOLE or RBH entry code */
1189       ASSERT(get_itbl(tso->block_info.closure)->type==BLACKHOLE_BQ ||
1190              get_itbl(tso->block_info.closure)->type==RBH);
1191
1192       /* if collecting stats update the execution time etc */
1193       if (RtsFlags.ParFlags.ParStats.Full) {
1194         /* Note that CURRENT_TIME may perform an unsafe call */
1195         //rtsTime now = CURRENT_TIME; /* Now */
1196         tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
1197         tso->par.blockcount++;
1198         tso->par.blockedat = CURRENT_TIME;
1199         DumpRawGranEvent(CURRENT_PROC, thisPE,
1200                          GR_BLOCK, tso, tso->block_info.closure, 0);
1201       }
1202       break;
1203       
1204     default:
1205       barf("blockThread: impossible why_blocked code %d for TSO %d",
1206            tso->why_blocked, tso->id);
1207   }
1208
1209   IF_PAR_DEBUG(schedule,
1210                belch("##++ blockThread: TSO %d blocked on closure %p (%s)",
1211                      tso->id, tso->block_info.closure, info_type(tso->block_info.closure)));
1212 }
1213
1214 /*
1215  * ChoosePE selects a GlobalTaskId from the array of PEs 'at random'.
1216  * Important properties:
1217  *   - it varies during execution, even if the PE is idle
1218  *   - it's different for each PE
1219  *   - we never send a fish to ourselves
1220  */
1221 extern long lrand48 (void);
1222
1223 //@cindex choosePE
1224 GlobalTaskId
1225 choosePE(void)
1226 {
1227   long temp;
1228
1229   temp = lrand48() % nPEs;
1230   if (allPEs[temp] == mytid) {  /* Never send a FISH to yourself */
1231     temp = (temp + 1) % nPEs;
1232   }
1233   return allPEs[temp];
1234 }
1235
1236 /* 
1237  * allocate a BLOCKED_FETCH closure and fill it with the relevant fields
1238  * of the ga argument; called from processFetch when the local closure is
1239  * under evaluation
1240  */
1241 //@cindex createBlockedFetch
1242 StgClosure *
1243 createBlockedFetch (globalAddr ga, globalAddr rga)
1244 {
1245   StgBlockedFetch *bf;
1246   StgClosure *closure;
1247
1248   closure = GALAlookup(&ga);
1249   if ((bf = (StgBlockedFetch *)allocate(FIXED_HS + sizeofW(StgBlockedFetch))) == NULL) {
1250     barf("createBlockedFetch: out of heap while allocating heap for a BlocekdFetch; ToDo: call GC here");
1251     GarbageCollect(GetRoots); 
1252     closure = GALAlookup(&ga);
1253     bf = (StgBlockedFetch *)allocate(FIXED_HS + sizeofW(StgBlockedFetch));
1254     // ToDo: check whether really guaranteed to succeed 2nd time around
1255   }
1256
1257   ASSERT(bf != (StgClosure *)NULL);
1258   SET_INFO((StgClosure *)bf, &BLOCKED_FETCH_info);
1259   // ToDo: check whether other header info is needed
1260   bf->node = closure;
1261   bf->ga.payload.gc.gtid = rga.payload.gc.gtid;
1262   bf->ga.payload.gc.slot = rga.payload.gc.slot;
1263   bf->ga.weight = rga.weight;
1264   // bf->link = NULL;  debugging
1265
1266   IF_PAR_DEBUG(schedule,
1267                fprintf(stderr, "%%%%// created BF: bf=%p (%s) of closure , GA: ",
1268                        bf, info_type(bf), closure);
1269                printGA(&(bf->ga));
1270                fputc('\n',stderr));
1271   return bf;
1272 }
1273
1274 /*
1275  * waitForTermination enters a loop ignoring spurious messages while
1276  * waiting for the termination sequence to be completed.  
1277  */
1278 //@cindex waitForTermination
1279 void
1280 waitForTermination(void)
1281 {
1282   do {
1283     rtsPacket p = GetPacket();
1284     processUnexpected(p);
1285   } while (rtsTrue);
1286 }
1287
1288 #ifdef DEBUG
1289 //@cindex DebugPrintGAGAMap
1290 void
1291 DebugPrintGAGAMap(globalAddr *gagamap, int nGAs)
1292 {
1293   nat i;
1294   
1295   for (i = 0; i < nGAs; ++i, gagamap += 2)
1296     fprintf(stderr, "__ gagamap[%d] = ((%x, %d, %x)) -> ((%x, %d, %x))\n", i,
1297             gagamap[0].payload.gc.gtid, gagamap[0].payload.gc.slot, gagamap[0].weight,
1298             gagamap[1].payload.gc.gtid, gagamap[1].payload.gc.slot, gagamap[1].weight);
1299 }
1300
1301 //@cindex checkGAGAMap
1302 void
1303 checkGAGAMap(globalAddr *gagamap, int nGAs)
1304 {
1305   nat i;
1306   
1307   for (i = 0; i < nGAs; ++i, gagamap += 2) {
1308     ASSERT(looks_like_ga(gagamap));
1309     ASSERT(looks_like_ga(gagamap+1));
1310   }
1311 }
1312 #endif
1313
1314 //@cindex freeMsgBuffer
1315 static StgWord **freeMsgBuffer = NULL;
1316 //@cindex freeMsgIndex
1317 static nat      *freeMsgIndex  = NULL;
1318
1319 //@cindex prepareFreeMsgBuffers
1320 void
1321 prepareFreeMsgBuffers(void)
1322 {
1323   nat i;
1324   
1325   /* Allocate the freeMsg buffers just once and then hang onto them. */
1326   if (freeMsgIndex == NULL) {
1327     freeMsgIndex = (nat *) stgMallocBytes(nPEs * sizeof(nat), 
1328                                           "prepareFreeMsgBuffers (Index)");
1329     freeMsgBuffer = (StgWord **) stgMallocBytes(nPEs * sizeof(long *), 
1330                                           "prepareFreeMsgBuffers (Buffer)");
1331     
1332     for(i = 0; i < nPEs; i++) 
1333       if (i != thisPE) 
1334         freeMsgBuffer[i] = (StgPtr) stgMallocWords(RtsFlags.ParFlags.packBufferSize,
1335                                                "prepareFreeMsgBuffers (Buffer #i)");
1336   }
1337   
1338   /* Initialize the freeMsg buffer pointers to point to the start of their
1339      buffers */
1340   for (i = 0; i < nPEs; i++)
1341     freeMsgIndex[i] = 0;
1342 }
1343
1344 //@cindex freeRemoteGA
1345 void
1346 freeRemoteGA(int pe, globalAddr *ga)
1347 {
1348   nat i;
1349   
1350   ASSERT(GALAlookup(ga) == NULL);
1351   
1352   if ((i = freeMsgIndex[pe]) + 2 >= RtsFlags.ParFlags.packBufferSize) {
1353     IF_PAR_DEBUG(free,
1354                  belch("!! Filled a free message buffer (sending remaining messages indivisually)"));   
1355
1356     sendFree(ga->payload.gc.gtid, i, freeMsgBuffer[pe]);
1357     i = 0;
1358   }
1359   freeMsgBuffer[pe][i++] = (StgWord) ga->weight;
1360   freeMsgBuffer[pe][i++] = (StgWord) ga->payload.gc.slot;
1361   freeMsgIndex[pe] = i;
1362
1363   IF_DEBUG(sanity,
1364            ga->weight = 0xdead0add;
1365            ga->payload.gc.gtid = 0xbbbbbbbb;
1366            ga->payload.gc.slot = 0xbbbbbbbb;);
1367 }
1368
1369 //@cindex sendFreeMessages
1370 void
1371 sendFreeMessages(void)
1372 {
1373   nat i;
1374   
1375   for (i = 0; i < nPEs; i++) 
1376     if (freeMsgIndex[i] > 0)
1377       sendFree(allPEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
1378 }
1379
1380 #endif /* PAR -- whole file */
1381
1382 //@node Index,  , Miscellaneous Functions, High Level Communications Routines
1383 //@subsection Index
1384
1385 //@index
1386 //* ACK::  @cindex\s-+ACK
1387 //* DebugPrintGAGAMap::  @cindex\s-+DebugPrintGAGAMap
1388 //* FETCH::  @cindex\s-+FETCH
1389 //* FISH::  @cindex\s-+FISH
1390 //* FREE::  @cindex\s-+FREE
1391 //* RESUME::  @cindex\s-+RESUME
1392 //* SCHEDULE::  @cindex\s-+SCHEDULE
1393 //* blockFetch::  @cindex\s-+blockFetch
1394 //* choosePE::  @cindex\s-+choosePE
1395 //* freeMsgBuffer::  @cindex\s-+freeMsgBuffer
1396 //* freeMsgIndex::  @cindex\s-+freeMsgIndex
1397 //* freeRemoteGA::  @cindex\s-+freeRemoteGA
1398 //* gumPackBuffer::  @cindex\s-+gumPackBuffer
1399 //* initMoreBuffers::  @cindex\s-+initMoreBuffers
1400 //* prepareFreeMsgBuffers::  @cindex\s-+prepareFreeMsgBuffers
1401 //* processAck::  @cindex\s-+processAck
1402 //* processFetch::  @cindex\s-+processFetch
1403 //* processFetches::  @cindex\s-+processFetches
1404 //* processFish::  @cindex\s-+processFish
1405 //* processFree::  @cindex\s-+processFree
1406 //* processMessages::  @cindex\s-+processMessages
1407 //* processResume::  @cindex\s-+processResume
1408 //* processSchedule::  @cindex\s-+processSchedule
1409 //* sendAck::  @cindex\s-+sendAck
1410 //* sendFetch::  @cindex\s-+sendFetch
1411 //* sendFish::  @cindex\s-+sendFish
1412 //* sendFree::  @cindex\s-+sendFree
1413 //* sendFreeMessages::  @cindex\s-+sendFreeMessages
1414 //* sendResume::  @cindex\s-+sendResume
1415 //* sendSchedule::  @cindex\s-+sendSchedule
1416 //* unpackAck::  @cindex\s-+unpackAck
1417 //* unpackFetch::  @cindex\s-+unpackFetch
1418 //* unpackFish::  @cindex\s-+unpackFish
1419 //* unpackFree::  @cindex\s-+unpackFree
1420 //* unpackResume::  @cindex\s-+unpackResume
1421 //* unpackSchedule::  @cindex\s-+unpackSchedule
1422 //* waitForTermination::  @cindex\s-+waitForTermination
1423 //@end index