[project @ 1998-12-02 13:17:09 by simonm]
[ghc-hetmet.git] / ghc / rts / gum / HLComms.c
1 /* -----------------------------------------------------------------------------
2  * 
3  * $Id: HLComms.c,v 1.2 1998/12/02 13:29:05 simonm Exp $
4  *
5  * High Level Communications Routines (HLComms.lc)
6  *
7  *  Contains the high-level routines (i.e. communication
8  *  subsystem independent) used by GUM
9  *
10  *  Phil Trinder, Glasgow University, 12 December 1994
11  *  Adapted for new RTS
12  *  Phil Trinder, Simon Marlow July 1998
13  * 
14  * -------------------------------------------------------------------------- */
15
16 #ifdef PAR /* whole file */
17
18 #ifndef _AIX
19 #define NON_POSIX_SOURCE /* so says Solaris */
20 #endif
21
22 #include "Rts.h"
23 #include "RtsUtils.h"
24 #include "RtsFlags.h"
25
26 #include "HLC.h"
27 #include "Parallel.h"
28
29 /*
30  * GUM Message Sending and Unpacking Functions
31  * ********************************************
32  */
33
34 /*
35  * Allocate space for message processing
36  */
37
38 static W_ *gumPackBuffer;
39
40 void 
41 InitMoreBuffers(void)
42 {
43     gumPackBuffer
44       = (W_ *) stgMallocWords(RtsFlags.ParFlags.packBufferSize, "initMoreBuffers");
45 }
46
47 /*
48  *SendFetch packs the two global addresses and a load into a message +
49  *sends it.  
50  */
51
52 void
53 sendFetch(globalAddr *rga, globalAddr *lga, int load)
54 {
55
56     ASSERT(rga->weight > 0 && lga->weight > 0);
57 #ifdef FETCH_DEBUG    
58     fprintf(stderr, "Sending Fetch (%x, %d, 0), load = %d\n", 
59       rga->loc.gc.gtid, rga->loc.gc.slot, load);
60 #endif
61     SendOpV(PP_FETCH, rga->loc.gc.gtid, 6,
62       (W_) rga->loc.gc.gtid, (W_) rga->loc.gc.slot, 
63       (W_) lga->weight, (W_) lga->loc.gc.gtid, (W_) lga->loc.gc.slot, (W_) load);
64 }
65
66 /*
67  *unpackFetch unpacks a FETCH message into two Global addresses and a load figure.
68  */
69
70 static void
71 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
72 {
73     long buf[6];
74
75     GetArgs(buf, 6); 
76     lga->weight = 1;
77     lga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[0];
78     lga->loc.gc.slot = (int) buf[1];
79
80     rga->weight = (unsigned) buf[2];
81     rga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[3];
82     rga->loc.gc.slot = (int) buf[4];
83
84     *load = (int) buf[5];
85
86     ASSERT(rga->weight > 0);
87 }
88
89 /*
90  * SendResume packs the remote blocking queue's GA and data into a message 
91  * and sends it.
92  */
93
94 void
95 sendResume(globalAddr *rga, int nelem, StgPtr data)
96 {
97
98 #ifdef RESUME_DEBUG
99     PrintPacket(data);
100     fprintf(stderr, "Sending Resume for (%x, %d, %x)\n", 
101       rga->loc.gc.gtid, rga->loc.gc.slot, rga->weight);
102 #endif
103
104     SendOpNV(PP_RESUME, rga->loc.gc.gtid, nelem, data, 2,
105       (W_) rga->weight, (W_) rga->loc.gc.slot);
106
107 }
108
109 /*
110  * blockFetch blocks a BlockedFetch node on some kind of black hole.
111  */
112 static void
113 blockFetch(StgPtr bf, StgPtr bh)
114 {}
115
116 /* 
117  * Empty until Blocked fetches etc defined 
118  *    switch (INFO_TYPE(INFO_PTR(bh))) {
119  *    case INFO_BH_TYPE:
120  *      BF_LINK(bf) = PrelBase_Z91Z93_closure;
121  *      SET_INFO_PTR(bh, BQ_info);
122  *      BQ_ENTRIES(bh) = (W_) bf;
123  *
124  *#ifdef GC_MUT_REQUIRED
125  *      /*
126  *       * If we modify a black hole in the old generation, we have to
127  *       * make sure it goes on the mutables list
128  *       *
129  *
130  *      if (bh <= StorageMgrInfo.OldLim) {
131  *          MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
132  *          StorageMgrInfo.OldMutables = bh;
133  *      } else
134  *          MUT_LINK(bh) = MUT_NOT_LINKED;
135  *#endif
136  *      break;
137  *    case INFO_BQ_TYPE:
138  *      BF_LINK(bf) = (P_) BQ_ENTRIES(bh);
139  *      BQ_ENTRIES(bh) = (W_) bf;
140  *      break;
141  *    case INFO_FMBQ_TYPE:
142  *      BF_LINK(bf) = (P_) FMBQ_ENTRIES(bh);
143  *      FMBQ_ENTRIES(bh) = (W_) bf;
144  *      break;
145  *    case INFO_SPEC_RBH_TYPE:
146  *      BF_LINK(bf) = (P_) SPEC_RBH_BQ(bh);
147  *      SPEC_RBH_BQ(bh) = (W_) bf;
148  *      break;
149  *    case INFO_GEN_RBH_TYPE:
150  *      BF_LINK(bf) = (P_) GEN_RBH_BQ(bh);
151  *      GEN_RBH_BQ(bh) = (W_) bf;
152  *      break;
153  *    default:
154  *      fprintf(stderr, "Panic: thought %#lx was a black hole (IP %#lx)\n",
155  *        (W_) bh, INFO_PTR(bh));
156  *      EXIT(EXIT_FAILURE);
157  *    }
158  *}
159  */
160
161 /*
162  * processFetches constructs and sends resume messages for every
163  * BlockedFetch which is ready to be awakened.
164  */
165 extern P_ PendingFetches;
166
167 void
168 processFetches()
169 {}
170 /* 
171  * Empty till closure defined 
172  *    P_ bf;
173  *    P_ next;
174  *    P_ closure;
175  *    P_ ip;
176  *    globalAddr rga;
177  *    
178  *    for (bf = PendingFetches; bf != PrelBase_Z91Z93_closure; bf = next) {
179  *      next = BF_LINK(bf);
180  *
181  *      /*
182  *       * Find the target at the end of the indirection chain, and
183  *       * process it in much the same fashion as the original target
184  *       * of the fetch.  Though we hope to find graph here, we could
185  *       * find a black hole (of any flavor) or even a FetchMe.
186  *       *
187  *      closure = BF_NODE(bf);
188  *      while (IS_INDIRECTION(INFO_PTR(closure)))
189  *          closure = (P_) IND_CLOSURE_PTR(closure);
190  *        ip = (P_) INFO_PTR(closure);
191  *
192  *      if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
193  *          /* Forward the Fetch to someone else *
194  *          rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
195  *          rga.loc.gc.slot = (int) BF_SLOT(bf);
196  *          rga.weight = (unsigned) BF_WEIGHT(bf);
197  *
198  *          sendFetch(FETCHME_GA(closure), &rga, 0 /* load *);
199  *      } else if (IS_BLACK_HOLE(ip)) {
200  *          BF_NODE(bf) = closure;
201  *          blockFetch(bf, closure);
202  *      } else {
203  *          /* We now have some local graph to send back *
204  *          W_ size;
205  *          P_ graph;
206  *
207  *          if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
208  *              PendingFetches = bf;
209  *              ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
210  *              SAVE_Hp -= PACK_HEAP_REQUIRED;
211  *              bf = PendingFetches;
212  *              next = BF_LINK(bf);
213  *              closure = BF_NODE(bf);
214  *              graph = PackNearbyGraph(closure, &size);
215  *              ASSERT(graph != NULL);
216  *          }
217  *          rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
218  *          rga.loc.gc.slot = (int) BF_SLOT(bf);
219  *          rga.weight = (unsigned) BF_WEIGHT(bf);
220  *
221  *          sendResume(&rga, size, graph);
222  *      }
223  *    }
224  *    PendingFetches = PrelBase_Z91Z93_closure;
225  *}
226  */
227
228 /*
229  * unpackResume unpacks a Resume message into two Global addresses and
230  * a data array.
231  */
232
233 static void
234 unpackResume(globalAddr *lga, int *nelem, W_ *data)
235 {
236     long buf[3];
237
238     GetArgs(buf, 3); 
239     lga->weight = (unsigned) buf[0];
240     lga->loc.gc.gtid = mytid;
241     lga->loc.gc.slot = (int) buf[1];
242
243     *nelem = (int) buf[2];
244     GetArgs(data, *nelem);
245 }
246
247 /*
248  *SendAck packs the global address being acknowledged, together with
249  *an array of global addresses for any closures shipped and sends them.
250  */
251
252 void
253 sendAck(GLOBAL_TASK_ID task, int ngas, globalAddr *gagamap)
254 {
255     static long *buffer;
256     long *p;
257     int i;
258
259     buffer = (long *) gumPackBuffer;
260
261     for(i = 0, p = buffer; i < ngas; i++, p += 6) {
262         ASSERT(gagamap[1].weight > 0);
263         p[0] = (long) gagamap->weight;
264         p[1] = (long) gagamap->loc.gc.gtid;
265         p[2] = (long) gagamap->loc.gc.slot;
266         gagamap++;
267         p[3] = (long) gagamap->weight;
268         p[4] = (long) gagamap->loc.gc.gtid;
269         p[5] = (long) gagamap->loc.gc.slot;
270         gagamap++;
271     }
272 #ifdef ACK_DEBUG    
273     fprintf(stderr,"Sending Ack (%d pairs) to %x\n", ngas, task);
274 #endif
275     SendOpN(PP_ACK, task, p - buffer, buffer);
276
277 }
278
279 /*
280  *unpackAck unpacks an Acknowledgement message into a Global address,
281  *a count of the number of global addresses following and a map of 
282  *Global addresses
283  */
284
285 static void
286 unpackAck(int *ngas, globalAddr *gagamap)
287 {
288     long GAarraysize;
289     long buf[6];
290
291     GetArgs(&GAarraysize, 1);
292
293     *ngas = GAarraysize / 6;
294
295     while (GAarraysize > 0) {
296         GetArgs(buf, 6);
297         gagamap->weight = (unsigned) buf[0];
298         gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[1];
299         gagamap->loc.gc.slot = (int) buf[2];
300         gagamap++;
301         gagamap->weight = (unsigned) buf[3];
302         gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[4];
303         gagamap->loc.gc.slot = (int) buf[5];
304         ASSERT(gagamap->weight > 0);
305         gagamap++;
306         GAarraysize -= 6;
307     }
308 }
309
310 /*
311  *SendFish packs the global address being acknowledged, together with
312  *an array of global addresses for any closures shipped and sends them.
313  */
314
315 void
316 sendFish(GLOBAL_TASK_ID destPE, GLOBAL_TASK_ID origPE, 
317          int age, int history, int hunger)
318 {
319
320 #ifdef FISH_DEBUG
321     fprintf(stderr,"Sending Fish to %lx\n", destPE);
322 #endif
323     SendOpV(PP_FISH, destPE, 4, (W_) origPE, (W_) age, (W_) history, (W_) hunger);
324     if (origPE == mytid)
325         fishing = rtsTrue;
326
327 }
328
329 /*
330  *unpackFish unpacks a FISH message into the global task id of the
331  *originating PE and 3 data fields: the age, history and hunger of the
332  *fish. The history + hunger are not currently used.
333  */
334
335 static void
336 unpackFish(GLOBAL_TASK_ID *origPE, int *age, int *history, int *hunger)
337 {
338     long buf[4];
339
340     GetArgs(buf, 4);
341
342     *origPE = (GLOBAL_TASK_ID) buf[0];
343     *age = (int) buf[1];
344     *history = (int) buf[2];
345     *hunger = (int) buf[3];
346 }
347
348 /*
349  *SendFree sends (weight, slot) pairs for GAs that we no longer need references
350  *to.
351  */
352 void
353 sendFree(GLOBAL_TASK_ID pe, int nelem, StgPtr data)
354 {
355 #ifdef FREE_DEBUG
356     fprintf(stderr, "Sending Free (%d GAs) to %x\n", nelem / 2, pe);
357 #endif
358     SendOpN(PP_FREE, pe, nelem, data);
359
360 }
361
362
363 /*
364  *unpackFree unpacks a FREE message into the amount of data shipped and
365  *a data block.
366  */
367
368 static void
369 unpackFree(int *nelem, W_ *data)
370 {
371     long buf[1];
372
373     GetArgs(buf, 1);
374     *nelem = (int) buf[0];
375     GetArgs(data, *nelem);
376 }
377
378 /*
379  *SendSchedule sends a closure to be evaluated in response to a Fish
380  *message. The message is directed to the PE that originated the Fish
381  *(origPE), and includes the packed closure (data) along with its size
382  *(nelem).
383  */
384
385 void
386 sendSchedule(GLOBAL_TASK_ID origPE, int nelem, StgPtr data)
387 {
388 #ifdef SCHEDULE_DEBUG
389     PrintPacket(data);
390     fprintf(stderr, "Sending Schedule to %x\n", origPE);
391 #endif
392
393     SendOpN(PP_SCHEDULE, origPE, nelem, data);
394 }
395
396 /*
397  *unpackSchedule unpacks a SCHEDULE message into the Global address of
398  *the closure shipped, the amount of data shipped (nelem) and the data
399  *block (data).
400  */
401
402 static void
403 unpackSchedule(int *nelem, W_ *data)
404 {
405     long buf[1];
406
407     GetArgs(buf, 1);
408     *nelem = (int) buf[0];
409     GetArgs(data, *nelem);
410 }
411
412 /*
413  *Message-Processing Functions
414  *
415  *The following routines process incoming GUM messages. Often reissuing
416  *messages in response.
417  *
418  *processFish unpacks a fish message, reissuing it if it's our own,
419  *sending work if we have it or sending it onwards otherwise.
420  *
421  * Only stubs now. Real stuff in HLCommsRest PWT
422  */
423 static void
424 processFish(void)
425 {}                              /* processFish */
426
427 /*
428  * processFetch either returns the requested data (if available) 
429  * or blocks the remote blocking queue on a black hole (if not).
430  */
431 static void
432 processFetch(void)
433 {}
434
435 /*
436  * processFree unpacks a FREE message and adds the weights to our GAs.
437  */
438 static void
439 processFree(void)
440 {}
441
442 /*
443  * processResume unpacks a RESUME message into the graph, filling in
444  * the LA -> GA, and GA -> LA tables. Threads blocked on the original
445  * FetchMe (now a blocking queue) are awakened, and the blocking queue
446  * is converted into an indirection.  Finally it sends an ACK in response
447  * which contains any newly allocated GAs.
448  */
449
450 static void
451 processResume(GLOBAL_TASK_ID sender)
452 {}
453
454 /*
455  * processSchedule unpacks a SCHEDULE message into the graph, filling
456  * in the LA -> GA, and GA -> LA tables. The root of the graph is added to
457  * the local spark queue.  Finally it sends an ACK in response
458  * which contains any newly allocated GAs.
459  */
460 static void
461 processSchedule(GLOBAL_TASK_ID sender)
462 {
463 }
464
465 /*
466  * processAck unpacks an ACK, and uses the GAGA map to convert RBH's
467  * (which represent shared thunks that have been shipped) into fetch-mes
468  * to remote GAs.
469  */
470 static void
471 processAck(void)
472 {}
473
474 /*
475  * GUM Message Processor
476
477  * processMessages processes any messages that have arrived, calling
478  * appropriate routines depending on the message tag
479  * (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
480  * present and performs a blocking receive! During profiling it
481  * busy-waits in order to record idle time.
482  */
483
484 void
485 processMessages(void)
486 {
487     PACKET packet;
488     OPCODE opcode;
489     GLOBAL_TASK_ID task;
490     
491     do {
492
493         packet = GetPacket();   /* Get next message; block until one available */
494
495         get_opcode_and_sender(packet, &opcode, &task);
496
497         switch (opcode) {
498
499         case PP_FINISH:
500             stg_exit(EXIT_SUCCESS);     /* The computation has been completed by someone
501                                  * else */
502             break;
503
504         case PP_FETCH:
505             processFetch();
506             break;
507
508         case PP_RESUME:
509             processResume(task);
510             break;
511
512         case PP_ACK:
513             processAck();
514             break;
515
516         case PP_FISH:
517             processFish();
518             break;
519
520         case PP_FREE:
521             processFree();
522             break;
523
524         case PP_SCHEDULE:
525             processSchedule(task);
526             break;
527
528         default:
529             /* Anything we're not prepared to deal with. */
530             fprintf(stderr, "Task %x: Unexpected opcode %x from %x\n",
531               mytid, opcode, task);
532
533             stg_exit(EXIT_FAILURE);
534         }                       /* switch */
535
536     } while (PacketsWaiting()); /* While there are messages: process them */
537 }                               /* processMessages */
538
539 /*
540  * Miscellaneous Functions
541  * 
542  *
543  * ChoosePE selects a GlobalTaskId from the array of PEs 'at random'.
544  * Important properties:
545  *   - it varies during execution, even if the PE is idle
546  *   - it's different for each PE
547  *   - we never send a fish to ourselves
548  */
549 extern long lrand48 (void);
550
551 GLOBAL_TASK_ID
552 choosePE(void)
553 {
554     long temp;
555
556     temp = lrand48() % nPEs;
557     if (PEs[temp] == mytid) {   /* Never send a FISH to yourself */
558         temp = (temp + 1) % nPEs;
559     }
560     return PEs[temp];
561 }
562
563 /*
564  *WaitForTermination enters a loop ignoring spurious messages while waiting for the
565  *termination sequence to be completed.
566  */
567 void
568 WaitForTermination(void)
569 {
570   do {
571     PACKET p = GetPacket();
572     ProcessUnexpected(p);
573   } while (rtsTrue);
574 }
575
576 #ifdef DEBUG
577 void
578 DebugPrintGAGAMap(globalAddr *gagamap, int nGAs)
579 {
580     int i;
581
582     for (i = 0; i < nGAs; ++i, gagamap += 2)
583         fprintf(stderr, "gagamap[%d] = (%x, %d, %x) -> (%x, %d, %x)\n", i,
584           gagamap[0].loc.gc.gtid, gagamap[0].loc.gc.slot, gagamap[0].weight,
585           gagamap[1].loc.gc.gtid, gagamap[1].loc.gc.slot, gagamap[1].weight);
586 }
587 #endif
588
589 static PP_ freeMsgBuffer = NULL;
590 static int *freeMsgIndex = NULL;
591
592 void
593 prepareFreeMsgBuffers(void)
594 {
595     int i;
596
597     /* Allocate the freeMsg buffers just once and then hang onto them. */
598
599     if (freeMsgIndex == NULL) {
600
601         freeMsgIndex = (int *) stgMallocBytes(nPEs * sizeof(int), "prepareFreeMsgBuffers (Index)");
602         freeMsgBuffer = (PP_)  stgMallocBytes(nPEs * sizeof(long *), "prepareFreeMsgBuffers (Buffer)");
603
604         for(i = 0; i < nPEs; i++) {
605             if (i != thisPE) {
606               freeMsgBuffer[i] = (P_) stgMallocWords(RtsFlags.ParFlags.packBufferSize,
607                                         "prepareFreeMsgBuffers (Buffer #i)");
608             }
609         }
610     }
611
612     /* Initialize the freeMsg buffer pointers to point to the start of their buffers */
613     for (i = 0; i < nPEs; i++)
614         freeMsgIndex[i] = 0;
615 }
616
617 void
618 freeRemoteGA(int pe, globalAddr *ga)
619 {
620     int i;
621
622     ASSERT(GALAlookup(ga) == NULL);
623
624     if ((i = freeMsgIndex[pe]) + 2 >= RtsFlags.ParFlags.packBufferSize) {
625 #ifdef FREE_DEBUG
626         fprintf(stderr, "Filled a free message buffer\n");      
627 #endif
628         sendFree(ga->loc.gc.gtid, i, freeMsgBuffer[pe]);
629         i = 0;
630     }
631     freeMsgBuffer[pe][i++] = (W_) ga->weight;
632     freeMsgBuffer[pe][i++] = (W_) ga->loc.gc.slot;
633     freeMsgIndex[pe] = i;
634 #ifdef DEBUG
635     ga->weight = 0x0f0f0f0f;
636     ga->loc.gc.gtid = 0x666;
637     ga->loc.gc.slot = 0xdeaddead;
638 #endif
639 }
640
641 void
642 sendFreeMessages(void)
643 {
644     int i;
645
646     for (i = 0; i < nPEs; i++) {
647         if (freeMsgIndex[i] > 0)
648             sendFree(PEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
649     }
650 }
651
652 /* Process messaging code ripped out for the time being -- SDM & PWT */
653
654 #ifdef 0
655 /* These are the remaining message-processing functions from HLComms*/
656
657
658 /*
659  *Message-Processing Functions
660  *
661  *The following routines process incoming GUM messages. Often reissuing
662  *messages in response.
663  *
664  *processFish unpacks a fish message, reissuing it if it's our own,
665  *sending work if we have it or sending it onwards otherwise.
666  */
667 static void
668 processFish(void)
669 {
670     GLOBAL_TASK_ID origPE;
671     int age, history, hunger;
672
673     unpackFish(&origPE, &age, &history, &hunger);
674
675     if (origPE == mytid) {
676         fishing = rtsFalse;
677     } else {
678         P_ spark;
679
680         while ((spark = FindLocalSpark(rtsTrue)) != NULL) {
681             W_ size;
682             P_ graph;
683
684             if ((graph = PackNearbyGraph(spark, &size)) == NULL) {
685                 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
686                 SAVE_Hp -= PACK_HEAP_REQUIRED;
687                 /* Now go back and try again */
688             } else {
689                 sendSchedule(origPE, size, graph);
690                 DisposeSpark(spark);
691                 break;
692             }
693         }
694         if (spark == NULL) {
695             /* We have no sparks to give */
696             if (age < FISH_LIFE_EXPECTANCY)
697                 sendFish(choosePE(), origPE,
698                   (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
699
700             /* Send it home to die */
701             else
702                 sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
703         }
704     }
705 }                               /* processFish */
706
707 /*
708  *processFetch either returns the requested data (if available) 
709  *or blocks the remote blocking queue on a black hole (if not).
710  */
711 static void
712 processFetch(void)
713 {
714     globalAddr ga, rga;
715     int load;
716
717     P_ closure;
718     P_ ip;
719
720     unpackFetch(&ga, &rga, &load);
721 #ifdef FETCH_DEBUG
722     fprintf(stderr, "Rcvd Fetch for (%x, %d, 0), Resume (%x, %d, %x) (load %d) \n",
723       ga.loc.gc.gtid, ga.loc.gc.slot,
724       rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, load);
725 #endif
726
727     closure = GALAlookup(&ga);
728     ip = (P_) INFO_PTR(closure);
729
730     if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
731         /* Forward the Fetch to someone else */
732         sendFetch(FETCHME_GA(closure), &rga, load);
733     } else if (rga.loc.gc.gtid == mytid) {
734         /* Our own FETCH forwarded back around to us */
735         P_ fmbq = GALAlookup(&rga);
736
737         /* We may have already discovered that the fetch target is our own. */
738         if (fmbq != closure) 
739             CommonUp(fmbq, closure);
740         (void) addWeight(&rga);
741     } else if (IS_BLACK_HOLE(ip)) {
742         /* This includes RBH's and FMBQ's */
743         P_ bf;
744
745         if ((bf = AllocateHeap(FIXED_HS + BF_CLOSURE_SIZE(dummy))) == NULL) {
746             ReallyPerformThreadGC(FIXED_HS + BF_CLOSURE_SIZE(dummy), rtsFalse);
747             closure = GALAlookup(&ga);
748             bf = SAVE_Hp - (FIXED_HS + BF_CLOSURE_SIZE(dummy)) + 1;
749         }
750         ASSERT(GALAlookup(&rga) == NULL);
751
752         SET_BF_HDR(bf, BF_info, bogosity);
753         BF_NODE(bf) = closure;
754         BF_GTID(bf) = (W_) rga.loc.gc.gtid;
755         BF_SLOT(bf) = (W_) rga.loc.gc.slot;
756         BF_WEIGHT(bf) = (W_) rga.weight;
757         blockFetch(bf, closure);
758
759 #ifdef FETCH_DEBUG
760         fprintf(stderr, "Blocking Fetch (%x, %d, %x) on %#lx\n",
761           rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, closure);
762 #endif
763
764     } else {                    
765         /* The target of the FetchMe is some local graph */
766         W_ size;
767         P_ graph;
768
769         if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
770             ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
771             SAVE_Hp -= PACK_HEAP_REQUIRED;
772             closure = GALAlookup(&ga);
773             graph = PackNearbyGraph(closure, &size);
774             ASSERT(graph != NULL);
775         }
776         sendResume(&rga, size, graph);
777     }
778 }
779
780 /*
781  *processFree unpacks a FREE message and adds the weights to our GAs.
782  */
783 static void
784 processFree(void)
785 {
786     int nelem;
787     static W_ *freeBuffer;
788     int i;
789     globalAddr ga;
790
791     freeBuffer = gumPackBuffer;
792     unpackFree(&nelem, freeBuffer);
793 #ifdef FREE_DEBUG
794     fprintf(stderr, "Rcvd Free (%d GAs)\n", nelem / 2);
795 #endif
796     ga.loc.gc.gtid = mytid;
797     for (i = 0; i < nelem;) {
798         ga.weight = (unsigned) freeBuffer[i++];
799         ga.loc.gc.slot = (int) freeBuffer[i++];
800 #ifdef FREE_DEBUG
801         fprintf(stderr,"Processing free (%x, %d, %x)\n", ga.loc.gc.gtid, 
802           ga.loc.gc.slot, ga.weight);
803 #endif
804         (void) addWeight(&ga);
805     }
806 }
807
808 /*
809  *processResume unpacks a RESUME message into the graph, filling in
810  *the LA -> GA, and GA -> LA tables. Threads blocked on the original
811  *FetchMe (now a blocking queue) are awakened, and the blocking queue
812  *is converted into an indirection.  Finally it sends an ACK in response
813  *which contains any newly allocated GAs.
814  */
815
816 static void
817 processResume(GLOBAL_TASK_ID sender)
818 {
819     int nelem;
820     W_ nGAs;
821     static W_ *packBuffer;
822     P_ newGraph;
823     P_ old;
824     globalAddr lga;
825     globalAddr *gagamap;
826
827     packBuffer = gumPackBuffer;
828     unpackResume(&lga, &nelem, packBuffer);
829
830 #ifdef RESUME_DEBUG
831     fprintf(stderr, "Rcvd Resume for (%x, %d, %x)\n",
832       lga.loc.gc.gtid, lga.loc.gc.slot, lga.weight);
833     PrintPacket(packBuffer);
834 #endif
835
836     /* 
837      * We always unpack the incoming graph, even if we've received the
838      * requested node in some other data packet (and already awakened
839      * the blocking queue).
840      */
841     if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
842         ReallyPerformThreadGC(packBuffer[0], rtsFalse);
843         SAVE_Hp -= packBuffer[0];
844     }
845
846     /* Do this *after* GC; we don't want to release the object early! */
847
848     if (lga.weight > 0)
849         (void) addWeight(&lga);
850
851     old = GALAlookup(&lga);
852
853     if (RtsFlags.ParFlags.granSimStats) {
854         P_ tso = NULL;
855
856         if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE) {
857             for(tso = (P_) FMBQ_ENTRIES(old); 
858               TSO_LINK(tso) != PrelBase_Z91Z93_closure; 
859               tso = TSO_LINK(tso))
860                 ;
861         }
862         /* DumpGranEventAndNode(GR_REPLY, tso, old, taskIDtoPE(sender)); */
863         DumpRawGranEvent(CURRENT_PROC,taskIDtoPE(sender),GR_REPLY,
864                          tso,old,0);
865     }
866
867     newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
868     ASSERT(newGraph != NULL);
869
870     /* 
871      * Sometimes, unpacking will common up the resumee with the
872      * incoming graph, but if it hasn't, we'd better do so now.
873      */
874    
875     if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE)
876         CommonUp(old, newGraph);
877
878 #ifdef RESUME_DEBUG
879     DebugPrintGAGAMap(gagamap, nGAs);
880 #endif
881
882     sendAck(sender, nGAs, gagamap);
883 }
884
885 /*
886  *processSchedule unpacks a SCHEDULE message into the graph, filling
887  *in the LA -> GA, and GA -> LA tables. The root of the graph is added to
888  *the local spark queue.  Finally it sends an ACK in response
889  *which contains any newly allocated GAs.
890  */
891 static void
892 processSchedule(GLOBAL_TASK_ID sender)
893 {
894     int nelem;
895     int space_required;
896     rtsBool success;
897     static W_ *packBuffer;
898     W_ nGAs;
899     P_ newGraph;
900     globalAddr *gagamap;
901
902     packBuffer = gumPackBuffer;         /* HWL */
903     unpackSchedule(&nelem, packBuffer);
904
905 #ifdef SCHEDULE_DEBUG
906     fprintf(stderr, "Rcvd Schedule\n");
907     PrintPacket(packBuffer);
908 #endif
909
910     /*
911      * For now, the graph is a closure to be sparked as an advisory
912      * spark, but in future it may be a complete spark with
913      * required/advisory status, priority etc.
914      */
915
916     space_required = packBuffer[0];
917     if (SAVE_Hp + space_required >= SAVE_HpLim) {
918         ReallyPerformThreadGC(space_required, rtsFalse);
919         SAVE_Hp -= space_required;
920     }
921     newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
922     ASSERT(newGraph != NULL);
923     success = Spark(newGraph, rtsFalse);
924     ASSERT(success);
925
926 #ifdef SCHEDULE_DEBUG
927     DebugPrintGAGAMap(gagamap, nGAs);
928 #endif
929
930     if (nGAs > 0)
931         sendAck(sender, nGAs, gagamap);
932
933     fishing = rtsFalse;
934 }
935
936 /*
937  *processAck unpacks an ACK, and uses the GAGA map to convert RBH's
938  *(which represent shared thunks that have been shipped) into fetch-mes
939  *to remote GAs.
940  */
941 static void
942 processAck(void)
943 {
944     int nGAs;
945     globalAddr *gaga;
946
947     globalAddr gagamap[MAX_GAS * 2];
948
949     unpackAck(&nGAs, gagamap);
950
951 #ifdef ACK_DEBUG
952     fprintf(stderr, "Rcvd Ack (%d pairs)\n", nGAs);
953     DebugPrintGAGAMap(gagamap, nGAs);
954 #endif
955
956     /*
957      * For each (oldGA, newGA) pair, set the GA of the corresponding
958      * thunk to the newGA, convert the thunk to a FetchMe, and return
959      * the weight from the oldGA.
960      */
961     for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
962         P_ old = GALAlookup(gaga);
963         P_ new = GALAlookup(gaga + 1);
964
965         if (new == NULL) {
966             /* We don't have this closure, so we make a fetchme for it */
967             globalAddr *ga = setRemoteGA(old, gaga + 1, rtsTrue);
968
969             convertToFetchMe(old, ga);
970         } else {
971             /* 
972              * Oops...we've got this one already; update the RBH to
973              * point to the object we already know about, whatever it
974              * happens to be.
975              */
976             CommonUp(old, new);
977
978             /* 
979              * Increase the weight of the object by the amount just
980              * received in the second part of the ACK pair.
981              */
982             (void) addWeight(gaga + 1);
983         }
984         (void) addWeight(gaga);
985     }
986 }
987
988 #endif
989
990 #endif /* PAR -- whole file */
991