[project @ 1999-02-15 14:30:56 by simonm]
[ghc-hetmet.git] / ghc / rts / gum / HLComms.c
1 /* -----------------------------------------------------------------------------
2  * 
3  * $Id: HLComms.c,v 1.3 1999/02/15 14:30:56 simonm Exp $
4  *
5  * High Level Communications Routines (HLComms.lc)
6  *
7  *  Contains the high-level routines (i.e. communication
8  *  subsystem independent) used by GUM
9  *
10  *  Phil Trinder, Glasgow University, 12 December 1994
11  *  Adapted for new RTS
12  *  Phil Trinder, Simon Marlow July 1998
13  * 
14  * -------------------------------------------------------------------------- */
15
16 #ifdef PAR /* whole file */
17
18 #ifndef _AIX
19 #define NON_POSIX_SOURCE /* so says Solaris */
20 #endif
21
22 #include "Rts.h"
23 #include "RtsUtils.h"
24 #include "RtsFlags.h"
25
26 #include "HLC.h"
27 #include "Parallel.h"
28
29 /*
30  * GUM Message Sending and Unpacking Functions
31  * ********************************************
32  */
33
34 /*
35  * Allocate space for message processing
36  */
37
38 static W_ *gumPackBuffer;
39
40 void 
41 InitMoreBuffers(void)
42 {
43     gumPackBuffer
44       = (W_ *) stgMallocWords(RtsFlags.ParFlags.packBufferSize, "initMoreBuffers");
45 }
46
47 /*
48  *SendFetch packs the two global addresses and a load into a message +
49  *sends it.  
50  */
51
52 void
53 sendFetch(globalAddr *rga, globalAddr *lga, int load)
54 {
55
56     ASSERT(rga->weight > 0 && lga->weight > 0);
57 #ifdef FETCH_DEBUG    
58     fprintf(stderr, "Sending Fetch (%x, %d, 0), load = %d\n", 
59       rga->loc.gc.gtid, rga->loc.gc.slot, load);
60 #endif
61     SendOpV(PP_FETCH, rga->loc.gc.gtid, 6,
62       (W_) rga->loc.gc.gtid, (W_) rga->loc.gc.slot, 
63       (W_) lga->weight, (W_) lga->loc.gc.gtid, (W_) lga->loc.gc.slot, (W_) load);
64 }
65
66 /*
67  *unpackFetch unpacks a FETCH message into two Global addresses and a load figure.
68  */
69
70 static void
71 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
72 {
73     long buf[6];
74
75     GetArgs(buf, 6); 
76     lga->weight = 1;
77     lga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[0];
78     lga->loc.gc.slot = (int) buf[1];
79
80     rga->weight = (unsigned) buf[2];
81     rga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[3];
82     rga->loc.gc.slot = (int) buf[4];
83
84     *load = (int) buf[5];
85
86     ASSERT(rga->weight > 0);
87 }
88
89 /*
90  * SendResume packs the remote blocking queue's GA and data into a message 
91  * and sends it.
92  */
93
94 void
95 sendResume(globalAddr *rga, int nelem, StgPtr data)
96 {
97
98 #ifdef RESUME_DEBUG
99     PrintPacket(data);
100     fprintf(stderr, "Sending Resume for (%x, %d, %x)\n", 
101       rga->loc.gc.gtid, rga->loc.gc.slot, rga->weight);
102 #endif
103
104     SendOpNV(PP_RESUME, rga->loc.gc.gtid, nelem, data, 2,
105       (W_) rga->weight, (W_) rga->loc.gc.slot);
106
107 }
108
109 /*
110  * blockFetch blocks a BlockedFetch node on some kind of black hole.
111  */
112 static void
113 blockFetch(StgPtr bf, StgPtr bh)
114 {}
115
116 #if 0 
117  Empty until Blocked fetches etc defined 
118     switch (INFO_TYPE(INFO_PTR(bh))) {
119     case INFO_BH_TYPE:
120         BF_LINK(bf) = PrelBase_Z91Z93_closure;
121         SET_INFO_PTR(bh, BQ_info);
122         BQ_ENTRIES(bh) = (W_) bf;
123
124 #ifdef GC_MUT_REQUIRED
125         /*
126          * If we modify a black hole in the old generation, we have to
127          * make sure it goes on the mutables list
128          */
129
130         if (bh <= StorageMgrInfo.OldLim) {
131             MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
132             StorageMgrInfo.OldMutables = bh;
133         } else
134             MUT_LINK(bh) = MUT_NOT_LINKED;
135 #endif
136         break;
137     case INFO_BQ_TYPE:
138         BF_LINK(bf) = (P_) BQ_ENTRIES(bh);
139         BQ_ENTRIES(bh) = (W_) bf;
140         break;
141     case INFO_FMBQ_TYPE:
142         BF_LINK(bf) = (P_) FMBQ_ENTRIES(bh);
143         FMBQ_ENTRIES(bh) = (W_) bf;
144         break;
145     case INFO_SPEC_RBH_TYPE:
146         BF_LINK(bf) = (P_) SPEC_RBH_BQ(bh);
147         SPEC_RBH_BQ(bh) = (W_) bf;
148         break;
149     case INFO_GEN_RBH_TYPE:
150         BF_LINK(bf) = (P_) GEN_RBH_BQ(bh);
151         GEN_RBH_BQ(bh) = (W_) bf;
152         break;
153     default:
154         fprintf(stderr, "Panic: thought %#lx was a black hole (IP %#lx)\n",
155           (W_) bh, INFO_PTR(bh));
156         EXIT(EXIT_FAILURE);
157     }
158 }
159 #endif
160
161 /*
162  * processFetches constructs and sends resume messages for every
163  * BlockedFetch which is ready to be awakened.
164  */
165 extern P_ PendingFetches;
166
167 void
168 processFetches()
169 {}
170  
171 #if 0
172  Empty till closure defined 
173     P_ bf;
174     P_ next;
175     P_ closure;
176     P_ ip;
177     globalAddr rga;
178     
179     for (bf = PendingFetches; bf != PrelBase_Z91Z93_closure; bf = next) {
180         next = BF_LINK(bf);
181
182         /*
183          * Find the target at the end of the indirection chain, and
184          * process it in much the same fashion as the original target
185          * of the fetch.  Though we hope to find graph here, we could
186          * find a black hole (of any flavor) or even a FetchMe.
187          */
188         closure = BF_NODE(bf);
189         while (IS_INDIRECTION(INFO_PTR(closure)))
190             closure = (P_) IND_CLOSURE_PTR(closure);
191         ip = (P_) INFO_PTR(closure);
192
193         if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
194             /* Forward the Fetch to someone else */
195             rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
196             rga.loc.gc.slot = (int) BF_SLOT(bf);
197             rga.weight = (unsigned) BF_WEIGHT(bf);
198
199             sendFetch(FETCHME_GA(closure), &rga, 0 /* load */);
200         } else if (IS_BLACK_HOLE(ip)) {
201             BF_NODE(bf) = closure;
202             blockFetch(bf, closure);
203         } else {
204             /* We now have some local graph to send back */
205             W_ size;
206             P_ graph;
207
208             if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
209                 PendingFetches = bf;
210                 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
211                 SAVE_Hp -= PACK_HEAP_REQUIRED;
212                 bf = PendingFetches;
213                 next = BF_LINK(bf);
214                 closure = BF_NODE(bf);
215                 graph = PackNearbyGraph(closure, &size);
216                 ASSERT(graph != NULL);
217             }
218             rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
219             rga.loc.gc.slot = (int) BF_SLOT(bf);
220             rga.weight = (unsigned) BF_WEIGHT(bf);
221
222             sendResume(&rga, size, graph);
223         }
224     }
225     PendingFetches = PrelBase_Z91Z93_closure;
226 }
227 #endif
228
229 /*
230  * unpackResume unpacks a Resume message into two Global addresses and
231  * a data array.
232  */
233
234 static void
235 unpackResume(globalAddr *lga, int *nelem, W_ *data)
236 {
237     long buf[3];
238
239     GetArgs(buf, 3); 
240     lga->weight = (unsigned) buf[0];
241     lga->loc.gc.gtid = mytid;
242     lga->loc.gc.slot = (int) buf[1];
243
244     *nelem = (int) buf[2];
245     GetArgs(data, *nelem);
246 }
247
248 /*
249  *SendAck packs the global address being acknowledged, together with
250  *an array of global addresses for any closures shipped and sends them.
251  */
252
253 void
254 sendAck(GLOBAL_TASK_ID task, int ngas, globalAddr *gagamap)
255 {
256     static long *buffer;
257     long *p;
258     int i;
259
260     buffer = (long *) gumPackBuffer;
261
262     for(i = 0, p = buffer; i < ngas; i++, p += 6) {
263         ASSERT(gagamap[1].weight > 0);
264         p[0] = (long) gagamap->weight;
265         p[1] = (long) gagamap->loc.gc.gtid;
266         p[2] = (long) gagamap->loc.gc.slot;
267         gagamap++;
268         p[3] = (long) gagamap->weight;
269         p[4] = (long) gagamap->loc.gc.gtid;
270         p[5] = (long) gagamap->loc.gc.slot;
271         gagamap++;
272     }
273 #ifdef ACK_DEBUG    
274     fprintf(stderr,"Sending Ack (%d pairs) to %x\n", ngas, task);
275 #endif
276     SendOpN(PP_ACK, task, p - buffer, buffer);
277
278 }
279
280 /*
281  *unpackAck unpacks an Acknowledgement message into a Global address,
282  *a count of the number of global addresses following and a map of 
283  *Global addresses
284  */
285
286 static void
287 unpackAck(int *ngas, globalAddr *gagamap)
288 {
289     long GAarraysize;
290     long buf[6];
291
292     GetArgs(&GAarraysize, 1);
293
294     *ngas = GAarraysize / 6;
295
296     while (GAarraysize > 0) {
297         GetArgs(buf, 6);
298         gagamap->weight = (unsigned) buf[0];
299         gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[1];
300         gagamap->loc.gc.slot = (int) buf[2];
301         gagamap++;
302         gagamap->weight = (unsigned) buf[3];
303         gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[4];
304         gagamap->loc.gc.slot = (int) buf[5];
305         ASSERT(gagamap->weight > 0);
306         gagamap++;
307         GAarraysize -= 6;
308     }
309 }
310
311 /*
312  *SendFish packs the global address being acknowledged, together with
313  *an array of global addresses for any closures shipped and sends them.
314  */
315
316 void
317 sendFish(GLOBAL_TASK_ID destPE, GLOBAL_TASK_ID origPE, 
318          int age, int history, int hunger)
319 {
320
321 #ifdef FISH_DEBUG
322     fprintf(stderr,"Sending Fish to %lx\n", destPE);
323 #endif
324     SendOpV(PP_FISH, destPE, 4, (W_) origPE, (W_) age, (W_) history, (W_) hunger);
325     if (origPE == mytid)
326         fishing = rtsTrue;
327
328 }
329
330 /*
331  *unpackFish unpacks a FISH message into the global task id of the
332  *originating PE and 3 data fields: the age, history and hunger of the
333  *fish. The history + hunger are not currently used.
334  */
335
336 static void
337 unpackFish(GLOBAL_TASK_ID *origPE, int *age, int *history, int *hunger)
338 {
339     long buf[4];
340
341     GetArgs(buf, 4);
342
343     *origPE = (GLOBAL_TASK_ID) buf[0];
344     *age = (int) buf[1];
345     *history = (int) buf[2];
346     *hunger = (int) buf[3];
347 }
348
349 /*
350  *SendFree sends (weight, slot) pairs for GAs that we no longer need references
351  *to.
352  */
353 void
354 sendFree(GLOBAL_TASK_ID pe, int nelem, StgPtr data)
355 {
356 #ifdef FREE_DEBUG
357     fprintf(stderr, "Sending Free (%d GAs) to %x\n", nelem / 2, pe);
358 #endif
359     SendOpN(PP_FREE, pe, nelem, data);
360
361 }
362
363
364 /*
365  *unpackFree unpacks a FREE message into the amount of data shipped and
366  *a data block.
367  */
368
369 static void
370 unpackFree(int *nelem, W_ *data)
371 {
372     long buf[1];
373
374     GetArgs(buf, 1);
375     *nelem = (int) buf[0];
376     GetArgs(data, *nelem);
377 }
378
379 /*
380  *SendSchedule sends a closure to be evaluated in response to a Fish
381  *message. The message is directed to the PE that originated the Fish
382  *(origPE), and includes the packed closure (data) along with its size
383  *(nelem).
384  */
385
386 void
387 sendSchedule(GLOBAL_TASK_ID origPE, int nelem, StgPtr data)
388 {
389 #ifdef SCHEDULE_DEBUG
390     PrintPacket(data);
391     fprintf(stderr, "Sending Schedule to %x\n", origPE);
392 #endif
393
394     SendOpN(PP_SCHEDULE, origPE, nelem, data);
395 }
396
397 /*
398  *unpackSchedule unpacks a SCHEDULE message into the Global address of
399  *the closure shipped, the amount of data shipped (nelem) and the data
400  *block (data).
401  */
402
403 static void
404 unpackSchedule(int *nelem, W_ *data)
405 {
406     long buf[1];
407
408     GetArgs(buf, 1);
409     *nelem = (int) buf[0];
410     GetArgs(data, *nelem);
411 }
412
413 /*
414  *Message-Processing Functions
415  *
416  *The following routines process incoming GUM messages. Often reissuing
417  *messages in response.
418  *
419  *processFish unpacks a fish message, reissuing it if it's our own,
420  *sending work if we have it or sending it onwards otherwise.
421  *
422  * Only stubs now. Real stuff in HLCommsRest PWT
423  */
424 static void
425 processFish(void)
426 {}                              /* processFish */
427
428 /*
429  * processFetch either returns the requested data (if available) 
430  * or blocks the remote blocking queue on a black hole (if not).
431  */
432 static void
433 processFetch(void)
434 {}
435
436 /*
437  * processFree unpacks a FREE message and adds the weights to our GAs.
438  */
439 static void
440 processFree(void)
441 {}
442
443 /*
444  * processResume unpacks a RESUME message into the graph, filling in
445  * the LA -> GA, and GA -> LA tables. Threads blocked on the original
446  * FetchMe (now a blocking queue) are awakened, and the blocking queue
447  * is converted into an indirection.  Finally it sends an ACK in response
448  * which contains any newly allocated GAs.
449  */
450
451 static void
452 processResume(GLOBAL_TASK_ID sender)
453 {}
454
455 /*
456  * processSchedule unpacks a SCHEDULE message into the graph, filling
457  * in the LA -> GA, and GA -> LA tables. The root of the graph is added to
458  * the local spark queue.  Finally it sends an ACK in response
459  * which contains any newly allocated GAs.
460  */
461 static void
462 processSchedule(GLOBAL_TASK_ID sender)
463 {
464 }
465
466 /*
467  * processAck unpacks an ACK, and uses the GAGA map to convert RBH's
468  * (which represent shared thunks that have been shipped) into fetch-mes
469  * to remote GAs.
470  */
471 static void
472 processAck(void)
473 {}
474
475 /*
476  * GUM Message Processor
477
478  * processMessages processes any messages that have arrived, calling
479  * appropriate routines depending on the message tag
480  * (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
481  * present and performs a blocking receive! During profiling it
482  * busy-waits in order to record idle time.
483  */
484
485 void
486 processMessages(void)
487 {
488     PACKET packet;
489     OPCODE opcode;
490     GLOBAL_TASK_ID task;
491     
492     do {
493
494         packet = GetPacket();   /* Get next message; block until one available */
495
496         get_opcode_and_sender(packet, &opcode, &task);
497
498         switch (opcode) {
499
500         case PP_FINISH:
501             stg_exit(EXIT_SUCCESS);     /* The computation has been completed by someone
502                                  * else */
503             break;
504
505         case PP_FETCH:
506             processFetch();
507             break;
508
509         case PP_RESUME:
510             processResume(task);
511             break;
512
513         case PP_ACK:
514             processAck();
515             break;
516
517         case PP_FISH:
518             processFish();
519             break;
520
521         case PP_FREE:
522             processFree();
523             break;
524
525         case PP_SCHEDULE:
526             processSchedule(task);
527             break;
528
529         default:
530             /* Anything we're not prepared to deal with. */
531             fprintf(stderr, "Task %x: Unexpected opcode %x from %x\n",
532               mytid, opcode, task);
533
534             stg_exit(EXIT_FAILURE);
535         }                       /* switch */
536
537     } while (PacketsWaiting()); /* While there are messages: process them */
538 }                               /* processMessages */
539
540 /*
541  * Miscellaneous Functions
542  * 
543  *
544  * ChoosePE selects a GlobalTaskId from the array of PEs 'at random'.
545  * Important properties:
546  *   - it varies during execution, even if the PE is idle
547  *   - it's different for each PE
548  *   - we never send a fish to ourselves
549  */
550 extern long lrand48 (void);
551
552 GLOBAL_TASK_ID
553 choosePE(void)
554 {
555     long temp;
556
557     temp = lrand48() % nPEs;
558     if (PEs[temp] == mytid) {   /* Never send a FISH to yourself */
559         temp = (temp + 1) % nPEs;
560     }
561     return PEs[temp];
562 }
563
564 /*
565  *WaitForTermination enters a loop ignoring spurious messages while waiting for the
566  *termination sequence to be completed.
567  */
568 void
569 WaitForTermination(void)
570 {
571   do {
572     PACKET p = GetPacket();
573     ProcessUnexpected(p);
574   } while (rtsTrue);
575 }
576
577 #ifdef DEBUG
578 void
579 DebugPrintGAGAMap(globalAddr *gagamap, int nGAs)
580 {
581     int i;
582
583     for (i = 0; i < nGAs; ++i, gagamap += 2)
584         fprintf(stderr, "gagamap[%d] = (%x, %d, %x) -> (%x, %d, %x)\n", i,
585           gagamap[0].loc.gc.gtid, gagamap[0].loc.gc.slot, gagamap[0].weight,
586           gagamap[1].loc.gc.gtid, gagamap[1].loc.gc.slot, gagamap[1].weight);
587 }
588 #endif
589
590 static PP_ freeMsgBuffer = NULL;
591 static int *freeMsgIndex = NULL;
592
593 void
594 prepareFreeMsgBuffers(void)
595 {
596     int i;
597
598     /* Allocate the freeMsg buffers just once and then hang onto them. */
599
600     if (freeMsgIndex == NULL) {
601
602         freeMsgIndex = (int *) stgMallocBytes(nPEs * sizeof(int), "prepareFreeMsgBuffers (Index)");
603         freeMsgBuffer = (PP_)  stgMallocBytes(nPEs * sizeof(long *), "prepareFreeMsgBuffers (Buffer)");
604
605         for(i = 0; i < nPEs; i++) {
606             if (i != thisPE) {
607               freeMsgBuffer[i] = (P_) stgMallocWords(RtsFlags.ParFlags.packBufferSize,
608                                         "prepareFreeMsgBuffers (Buffer #i)");
609             }
610         }
611     }
612
613     /* Initialize the freeMsg buffer pointers to point to the start of their buffers */
614     for (i = 0; i < nPEs; i++)
615         freeMsgIndex[i] = 0;
616 }
617
618 void
619 freeRemoteGA(int pe, globalAddr *ga)
620 {
621     int i;
622
623     ASSERT(GALAlookup(ga) == NULL);
624
625     if ((i = freeMsgIndex[pe]) + 2 >= RtsFlags.ParFlags.packBufferSize) {
626 #ifdef FREE_DEBUG
627         fprintf(stderr, "Filled a free message buffer\n");      
628 #endif
629         sendFree(ga->loc.gc.gtid, i, freeMsgBuffer[pe]);
630         i = 0;
631     }
632     freeMsgBuffer[pe][i++] = (W_) ga->weight;
633     freeMsgBuffer[pe][i++] = (W_) ga->loc.gc.slot;
634     freeMsgIndex[pe] = i;
635 #ifdef DEBUG
636     ga->weight = 0x0f0f0f0f;
637     ga->loc.gc.gtid = 0x666;
638     ga->loc.gc.slot = 0xdeaddead;
639 #endif
640 }
641
642 void
643 sendFreeMessages(void)
644 {
645     int i;
646
647     for (i = 0; i < nPEs; i++) {
648         if (freeMsgIndex[i] > 0)
649             sendFree(PEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
650     }
651 }
652
653 /* Process messaging code ripped out for the time being -- SDM & PWT */
654
655 #ifdef 0
656 /* These are the remaining message-processing functions from HLComms*/
657
658
659 /*
660  *Message-Processing Functions
661  *
662  *The following routines process incoming GUM messages. Often reissuing
663  *messages in response.
664  *
665  *processFish unpacks a fish message, reissuing it if it's our own,
666  *sending work if we have it or sending it onwards otherwise.
667  */
668 static void
669 processFish(void)
670 {
671     GLOBAL_TASK_ID origPE;
672     int age, history, hunger;
673
674     unpackFish(&origPE, &age, &history, &hunger);
675
676     if (origPE == mytid) {
677         fishing = rtsFalse;
678     } else {
679         P_ spark;
680
681         while ((spark = FindLocalSpark(rtsTrue)) != NULL) {
682             W_ size;
683             P_ graph;
684
685             if ((graph = PackNearbyGraph(spark, &size)) == NULL) {
686                 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
687                 SAVE_Hp -= PACK_HEAP_REQUIRED;
688                 /* Now go back and try again */
689             } else {
690                 sendSchedule(origPE, size, graph);
691                 DisposeSpark(spark);
692                 break;
693             }
694         }
695         if (spark == NULL) {
696             /* We have no sparks to give */
697             if (age < FISH_LIFE_EXPECTANCY)
698                 sendFish(choosePE(), origPE,
699                   (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
700
701             /* Send it home to die */
702             else
703                 sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
704         }
705     }
706 }                               /* processFish */
707
708 /*
709  *processFetch either returns the requested data (if available) 
710  *or blocks the remote blocking queue on a black hole (if not).
711  */
712 static void
713 processFetch(void)
714 {
715     globalAddr ga, rga;
716     int load;
717
718     P_ closure;
719     P_ ip;
720
721     unpackFetch(&ga, &rga, &load);
722 #ifdef FETCH_DEBUG
723     fprintf(stderr, "Rcvd Fetch for (%x, %d, 0), Resume (%x, %d, %x) (load %d) \n",
724       ga.loc.gc.gtid, ga.loc.gc.slot,
725       rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, load);
726 #endif
727
728     closure = GALAlookup(&ga);
729     ip = (P_) INFO_PTR(closure);
730
731     if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
732         /* Forward the Fetch to someone else */
733         sendFetch(FETCHME_GA(closure), &rga, load);
734     } else if (rga.loc.gc.gtid == mytid) {
735         /* Our own FETCH forwarded back around to us */
736         P_ fmbq = GALAlookup(&rga);
737
738         /* We may have already discovered that the fetch target is our own. */
739         if (fmbq != closure) 
740             CommonUp(fmbq, closure);
741         (void) addWeight(&rga);
742     } else if (IS_BLACK_HOLE(ip)) {
743         /* This includes RBH's and FMBQ's */
744         P_ bf;
745
746         if ((bf = AllocateHeap(FIXED_HS + BF_CLOSURE_SIZE(dummy))) == NULL) {
747             ReallyPerformThreadGC(FIXED_HS + BF_CLOSURE_SIZE(dummy), rtsFalse);
748             closure = GALAlookup(&ga);
749             bf = SAVE_Hp - (FIXED_HS + BF_CLOSURE_SIZE(dummy)) + 1;
750         }
751         ASSERT(GALAlookup(&rga) == NULL);
752
753         SET_BF_HDR(bf, BF_info, bogosity);
754         BF_NODE(bf) = closure;
755         BF_GTID(bf) = (W_) rga.loc.gc.gtid;
756         BF_SLOT(bf) = (W_) rga.loc.gc.slot;
757         BF_WEIGHT(bf) = (W_) rga.weight;
758         blockFetch(bf, closure);
759
760 #ifdef FETCH_DEBUG
761         fprintf(stderr, "Blocking Fetch (%x, %d, %x) on %#lx\n",
762           rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, closure);
763 #endif
764
765     } else {                    
766         /* The target of the FetchMe is some local graph */
767         W_ size;
768         P_ graph;
769
770         if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
771             ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
772             SAVE_Hp -= PACK_HEAP_REQUIRED;
773             closure = GALAlookup(&ga);
774             graph = PackNearbyGraph(closure, &size);
775             ASSERT(graph != NULL);
776         }
777         sendResume(&rga, size, graph);
778     }
779 }
780
781 /*
782  *processFree unpacks a FREE message and adds the weights to our GAs.
783  */
784 static void
785 processFree(void)
786 {
787     int nelem;
788     static W_ *freeBuffer;
789     int i;
790     globalAddr ga;
791
792     freeBuffer = gumPackBuffer;
793     unpackFree(&nelem, freeBuffer);
794 #ifdef FREE_DEBUG
795     fprintf(stderr, "Rcvd Free (%d GAs)\n", nelem / 2);
796 #endif
797     ga.loc.gc.gtid = mytid;
798     for (i = 0; i < nelem;) {
799         ga.weight = (unsigned) freeBuffer[i++];
800         ga.loc.gc.slot = (int) freeBuffer[i++];
801 #ifdef FREE_DEBUG
802         fprintf(stderr,"Processing free (%x, %d, %x)\n", ga.loc.gc.gtid, 
803           ga.loc.gc.slot, ga.weight);
804 #endif
805         (void) addWeight(&ga);
806     }
807 }
808
809 /*
810  *processResume unpacks a RESUME message into the graph, filling in
811  *the LA -> GA, and GA -> LA tables. Threads blocked on the original
812  *FetchMe (now a blocking queue) are awakened, and the blocking queue
813  *is converted into an indirection.  Finally it sends an ACK in response
814  *which contains any newly allocated GAs.
815  */
816
817 static void
818 processResume(GLOBAL_TASK_ID sender)
819 {
820     int nelem;
821     W_ nGAs;
822     static W_ *packBuffer;
823     P_ newGraph;
824     P_ old;
825     globalAddr lga;
826     globalAddr *gagamap;
827
828     packBuffer = gumPackBuffer;
829     unpackResume(&lga, &nelem, packBuffer);
830
831 #ifdef RESUME_DEBUG
832     fprintf(stderr, "Rcvd Resume for (%x, %d, %x)\n",
833       lga.loc.gc.gtid, lga.loc.gc.slot, lga.weight);
834     PrintPacket(packBuffer);
835 #endif
836
837     /* 
838      * We always unpack the incoming graph, even if we've received the
839      * requested node in some other data packet (and already awakened
840      * the blocking queue).
841      */
842     if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
843         ReallyPerformThreadGC(packBuffer[0], rtsFalse);
844         SAVE_Hp -= packBuffer[0];
845     }
846
847     /* Do this *after* GC; we don't want to release the object early! */
848
849     if (lga.weight > 0)
850         (void) addWeight(&lga);
851
852     old = GALAlookup(&lga);
853
854     if (RtsFlags.ParFlags.granSimStats) {
855         P_ tso = NULL;
856
857         if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE) {
858             for(tso = (P_) FMBQ_ENTRIES(old); 
859               TSO_LINK(tso) != PrelBase_Z91Z93_closure; 
860               tso = TSO_LINK(tso))
861                 ;
862         }
863         /* DumpGranEventAndNode(GR_REPLY, tso, old, taskIDtoPE(sender)); */
864         DumpRawGranEvent(CURRENT_PROC,taskIDtoPE(sender),GR_REPLY,
865                          tso,old,0);
866     }
867
868     newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
869     ASSERT(newGraph != NULL);
870
871     /* 
872      * Sometimes, unpacking will common up the resumee with the
873      * incoming graph, but if it hasn't, we'd better do so now.
874      */
875    
876     if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE)
877         CommonUp(old, newGraph);
878
879 #ifdef RESUME_DEBUG
880     DebugPrintGAGAMap(gagamap, nGAs);
881 #endif
882
883     sendAck(sender, nGAs, gagamap);
884 }
885
886 /*
887  *processSchedule unpacks a SCHEDULE message into the graph, filling
888  *in the LA -> GA, and GA -> LA tables. The root of the graph is added to
889  *the local spark queue.  Finally it sends an ACK in response
890  *which contains any newly allocated GAs.
891  */
892 static void
893 processSchedule(GLOBAL_TASK_ID sender)
894 {
895     int nelem;
896     int space_required;
897     rtsBool success;
898     static W_ *packBuffer;
899     W_ nGAs;
900     P_ newGraph;
901     globalAddr *gagamap;
902
903     packBuffer = gumPackBuffer;         /* HWL */
904     unpackSchedule(&nelem, packBuffer);
905
906 #ifdef SCHEDULE_DEBUG
907     fprintf(stderr, "Rcvd Schedule\n");
908     PrintPacket(packBuffer);
909 #endif
910
911     /*
912      * For now, the graph is a closure to be sparked as an advisory
913      * spark, but in future it may be a complete spark with
914      * required/advisory status, priority etc.
915      */
916
917     space_required = packBuffer[0];
918     if (SAVE_Hp + space_required >= SAVE_HpLim) {
919         ReallyPerformThreadGC(space_required, rtsFalse);
920         SAVE_Hp -= space_required;
921     }
922     newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
923     ASSERT(newGraph != NULL);
924     success = Spark(newGraph, rtsFalse);
925     ASSERT(success);
926
927 #ifdef SCHEDULE_DEBUG
928     DebugPrintGAGAMap(gagamap, nGAs);
929 #endif
930
931     if (nGAs > 0)
932         sendAck(sender, nGAs, gagamap);
933
934     fishing = rtsFalse;
935 }
936
937 /*
938  *processAck unpacks an ACK, and uses the GAGA map to convert RBH's
939  *(which represent shared thunks that have been shipped) into fetch-mes
940  *to remote GAs.
941  */
942 static void
943 processAck(void)
944 {
945     int nGAs;
946     globalAddr *gaga;
947
948     globalAddr gagamap[MAX_GAS * 2];
949
950     unpackAck(&nGAs, gagamap);
951
952 #ifdef ACK_DEBUG
953     fprintf(stderr, "Rcvd Ack (%d pairs)\n", nGAs);
954     DebugPrintGAGAMap(gagamap, nGAs);
955 #endif
956
957     /*
958      * For each (oldGA, newGA) pair, set the GA of the corresponding
959      * thunk to the newGA, convert the thunk to a FetchMe, and return
960      * the weight from the oldGA.
961      */
962     for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
963         P_ old = GALAlookup(gaga);
964         P_ new = GALAlookup(gaga + 1);
965
966         if (new == NULL) {
967             /* We don't have this closure, so we make a fetchme for it */
968             globalAddr *ga = setRemoteGA(old, gaga + 1, rtsTrue);
969
970             convertToFetchMe(old, ga);
971         } else {
972             /* 
973              * Oops...we've got this one already; update the RBH to
974              * point to the object we already know about, whatever it
975              * happens to be.
976              */
977             CommonUp(old, new);
978
979             /* 
980              * Increase the weight of the object by the amount just
981              * received in the second part of the ACK pair.
982              */
983             (void) addWeight(gaga + 1);
984         }
985         (void) addWeight(gaga);
986     }
987 }
988
989 #endif
990
991 #endif /* PAR -- whole file */
992