[project @ 1996-01-11 14:06:51 by partain]
[ghc-hetmet.git] / ghc / runtime / gum / HLComms.lc
1 /****************************************************************
2 *                                                               *
3 *       High Level Communications Routines (HLComms.lc)         *
4 *                                                               *
5 *  Contains the high-level routines (i.e. communication         *
6 *  subsystem independent) used by GUM                           *
7 *  (c) The Parade/AQUA Projects, Glasgow University, 1995       *
8 *  Phil Trinder, Glasgow University, 12 December 1994           *
9 *                                                               *
10 *****************************************************************/
11 \begin{code}
12 #ifdef PAR /* whole file */
13
14 #define NON_POSIX_SOURCE /* so says Solaris */
15
16 #include "rtsdefs.h"
17 #include "HLC.h"
18 \end{code}
19
20 \section{GUM Message Sending and Unpacking Functions}
21
22
23 @SendFetch@ packs the two global addresses and a load into a message +
24 sends it.  
25
26 \begin{code}
27 static W_ *gumPackBuffer;
28
29 void 
30 InitMoreBuffers(STG_NO_ARGS)
31 {
32     gumPackBuffer
33       = (W_ *) stgMallocWords(RTSflags.ParFlags.packBufferSize, "initMoreBuffers");
34 }
35
36 void
37 sendFetch(rga, lga, load)
38 globalAddr *rga, *lga;
39 int load;
40 {
41     CostCentre Save_CCC = CCC;
42
43     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
44     CCC->scc_count++;
45
46     ASSERT(rga->weight > 0 && lga->weight > 0);
47 #ifdef FETCH_DEBUG    
48     fprintf(stderr, "Sending Fetch (%x, %d, 0), load = %d\n", 
49       rga->loc.gc.gtid, rga->loc.gc.slot, load);
50 #endif
51     SendOpV(PP_FETCH, rga->loc.gc.gtid, 6,
52       (W_) rga->loc.gc.gtid, (W_) rga->loc.gc.slot, 
53       (W_) lga->weight, (W_) lga->loc.gc.gtid, (W_) lga->loc.gc.slot, (W_) load);
54
55     CCC = Save_CCC;
56 }
57 \end{code}
58
59 @unpackFetch@ unpacks a FETCH message into two Global addresses and a load figure.
60
61 \begin{code}
62
63 static void
64 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
65 {
66     long buf[6];
67
68     GetArgs(buf, 6); 
69     lga->weight = 1;
70     lga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[0];
71     lga->loc.gc.slot = (int) buf[1];
72
73     rga->weight = (unsigned) buf[2];
74     rga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[3];
75     rga->loc.gc.slot = (int) buf[4];
76
77     *load = (int) buf[5];
78
79     ASSERT(rga->weight > 0);
80 }
81 \end{code}
82
83 @SendResume@ packs the remote blocking queue's GA and data into a message 
84 and sends it.
85
86 \begin{code}
87 void
88 sendResume(rga, nelem, data)
89 globalAddr *rga;
90 int nelem;
91 P_ data;
92 {
93     CostCentre Save_CCC = CCC;
94
95     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
96     CCC->scc_count++;
97
98 #ifdef RESUME_DEBUG
99     PrintPacket(data);
100     fprintf(stderr, "Sending Resume for (%x, %d, %x)\n", 
101       rga->loc.gc.gtid, rga->loc.gc.slot, rga->weight);
102 #endif
103
104     SendOpNV(PP_RESUME, rga->loc.gc.gtid, nelem, data, 2,
105       (W_) rga->weight, (W_) rga->loc.gc.slot);
106
107     CCC = Save_CCC;
108 }
109 \end{code}
110
111 @blockFetch@ blocks a @BlockedFetch@ node on some kind of black hole.
112
113 \begin{code}
114 static void
115 blockFetch(P_ bf, P_ bh)
116 {
117     switch (INFO_TYPE(INFO_PTR(bh))) {
118     case INFO_BH_TYPE:
119         BF_LINK(bf) = Nil_closure;
120         SET_INFO_PTR(bh, BQ_info);
121         BQ_ENTRIES(bh) = (W_) bf;
122
123 #ifdef GC_MUT_REQUIRED
124         /*
125          * If we modify a black hole in the old generation, we have to
126          * make sure it goes on the mutables list
127          */
128
129         if (bh <= StorageMgrInfo.OldLim) {
130             MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
131             StorageMgrInfo.OldMutables = bh;
132         } else
133             MUT_LINK(bh) = MUT_NOT_LINKED;
134 #endif
135         break;
136     case INFO_BQ_TYPE:
137         BF_LINK(bf) = (P_) BQ_ENTRIES(bh);
138         BQ_ENTRIES(bh) = (W_) bf;
139         break;
140     case INFO_FMBQ_TYPE:
141         BF_LINK(bf) = (P_) FMBQ_ENTRIES(bh);
142         FMBQ_ENTRIES(bh) = (W_) bf;
143         break;
144     case INFO_SPEC_RBH_TYPE:
145         BF_LINK(bf) = (P_) SPEC_RBH_BQ(bh);
146         SPEC_RBH_BQ(bh) = (W_) bf;
147         break;
148     case INFO_GEN_RBH_TYPE:
149         BF_LINK(bf) = (P_) GEN_RBH_BQ(bh);
150         GEN_RBH_BQ(bh) = (W_) bf;
151         break;
152     default:
153         fprintf(stderr, "Panic: thought %#lx was a black hole (IP %#lx)\n",
154           (W_) bh, INFO_PTR(bh));
155         EXIT(EXIT_FAILURE);
156     }
157 }
158 \end{code}
159
160 @processFetches@ constructs and sends resume messages for every
161 @BlockedFetch@ which is ready to be awakened.
162
163 \begin{code}
164 extern P_ PendingFetches;
165
166 void
167 processFetches()
168 {
169     P_ bf;
170     P_ next;
171     P_ closure;
172     P_ ip;
173     globalAddr rga;
174     
175     for (bf = PendingFetches; bf != Nil_closure; bf = next) {
176         next = BF_LINK(bf);
177
178         /*
179          * Find the target at the end of the indirection chain, and
180          * process it in much the same fashion as the original target
181          * of the fetch.  Though we hope to find graph here, we could
182          * find a black hole (of any flavor) or even a FetchMe.
183          */
184         closure = BF_NODE(bf);
185         while (IS_INDIRECTION(INFO_PTR(closure)))
186             closure = (P_) IND_CLOSURE_PTR(closure);
187         ip = (P_) INFO_PTR(closure);
188
189         if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
190             /* Forward the Fetch to someone else */
191             rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
192             rga.loc.gc.slot = (int) BF_SLOT(bf);
193             rga.weight = (unsigned) BF_WEIGHT(bf);
194
195             sendFetch(FETCHME_GA(closure), &rga, 0 /* load */);
196         } else if (IS_BLACK_HOLE(ip)) {
197             BF_NODE(bf) = closure;
198             blockFetch(bf, closure);
199         } else {
200             /* We now have some local graph to send back */
201             W_ size;
202             P_ graph;
203
204             if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
205                 PendingFetches = bf;
206                 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
207                 SAVE_Hp -= PACK_HEAP_REQUIRED;
208                 bf = PendingFetches;
209                 next = BF_LINK(bf);
210                 closure = BF_NODE(bf);
211                 graph = PackNearbyGraph(closure, &size);
212                 ASSERT(graph != NULL);
213             }
214             rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
215             rga.loc.gc.slot = (int) BF_SLOT(bf);
216             rga.weight = (unsigned) BF_WEIGHT(bf);
217
218             sendResume(&rga, size, graph);
219         }
220     }
221     PendingFetches = Nil_closure;
222 }
223
224 \end{code}
225
226 @unpackResume@ unpacks a Resume message into two Global addresses and a data array.
227
228 \begin{code}
229
230 static void
231 unpackResume(globalAddr *lga, int *nelem, W_ *data)
232 {
233     long buf[3];
234
235     GetArgs(buf, 3); 
236     lga->weight = (unsigned) buf[0];
237     lga->loc.gc.gtid = mytid;
238     lga->loc.gc.slot = (int) buf[1];
239
240     *nelem = (int) buf[2];
241     GetArgs(data, *nelem);
242 }
243 \end{code}
244
245 @SendAck@ packs the global address being acknowledged, together with
246 an array of global addresses for any closures shipped and sends them.
247 \begin{code}
248
249 void
250 sendAck(task, ngas, gagamap)
251 GLOBAL_TASK_ID task;
252 int ngas;
253 globalAddr *gagamap;
254 {
255     static long *buffer;
256     long *p;
257     int i;
258     CostCentre Save_CCC = CCC;
259
260     buffer = (long *) gumPackBuffer;
261
262     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
263     CCC->scc_count++;
264
265     for(i = 0, p = buffer; i < ngas; i++, p += 6) {
266         ASSERT(gagamap[1].weight > 0);
267         p[0] = (long) gagamap->weight;
268         p[1] = (long) gagamap->loc.gc.gtid;
269         p[2] = (long) gagamap->loc.gc.slot;
270         gagamap++;
271         p[3] = (long) gagamap->weight;
272         p[4] = (long) gagamap->loc.gc.gtid;
273         p[5] = (long) gagamap->loc.gc.slot;
274         gagamap++;
275     }
276 #ifdef ACK_DEBUG    
277     fprintf(stderr,"Sending Ack (%d pairs) to %x\n", ngas, task);
278 #endif
279     SendOpN(PP_ACK, task, p - buffer, buffer);
280
281     CCC = Save_CCC;
282 }
283 \end{code}
284
285 @unpackAck@ unpacks an Acknowledgement message into a Global address,
286 a count of the number of global addresses following and a map of 
287 Global addresses
288
289 \begin{code}
290
291 static void
292 unpackAck(int *ngas, globalAddr *gagamap)
293 {
294     long GAarraysize;
295     long buf[6];
296
297     GetArgs(&GAarraysize, 1);
298
299     *ngas = GAarraysize / 6;
300
301     while (GAarraysize > 0) {
302         GetArgs(buf, 6);
303         gagamap->weight = (unsigned) buf[0];
304         gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[1];
305         gagamap->loc.gc.slot = (int) buf[2];
306         gagamap++;
307         gagamap->weight = (unsigned) buf[3];
308         gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[4];
309         gagamap->loc.gc.slot = (int) buf[5];
310         ASSERT(gagamap->weight > 0);
311         gagamap++;
312         GAarraysize -= 6;
313     }
314 }
315 \end{code}
316
317 @SendFish@ packs the global address being acknowledged, together with
318 an array of global addresses for any closures shipped and sends them.
319 \begin{code}
320
321 void
322 sendFish(destPE, origPE, age, history, hunger)
323 GLOBAL_TASK_ID destPE, origPE;
324 int age, history, hunger;
325 {
326     CostCentre Save_CCC = CCC;
327
328     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
329     CCC->scc_count++;
330
331 #ifdef FISH_DEBUG
332     fprintf(stderr,"Sending Fish to %lx\n", destPE);
333 #endif
334     SendOpV(PP_FISH, destPE, 4, (W_) origPE, (W_) age, (W_) history, (W_) hunger);
335     if (origPE == mytid)
336         fishing = rtsTrue;
337
338     CCC = Save_CCC;
339 }
340 \end{code}
341
342 @unpackFish@ unpacks a FISH message into the global task id of the
343 originating PE and 3 data fields: the age, history and hunger of the
344 fish. The history + hunger are not currently used.
345
346 \begin{code}
347
348 static void
349 unpackFish(GLOBAL_TASK_ID *origPE, int *age, int *history, int *hunger)
350 {
351     long buf[4];
352
353     GetArgs(buf, 4);
354
355     *origPE = (GLOBAL_TASK_ID) buf[0];
356     *age = (int) buf[1];
357     *history = (int) buf[2];
358     *hunger = (int) buf[3];
359 }
360 \end{code}
361
362 @SendFree@ sends (weight, slot) pairs for GAs that we no longer need references
363 to.
364
365 \begin{code}
366 void
367 sendFree(pe, nelem, data)
368 GLOBAL_TASK_ID pe;
369 int nelem;
370 P_ data;
371 {
372     CostCentre Save_CCC = CCC;
373
374     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
375     CCC->scc_count++;
376
377 #ifdef FREE_DEBUG
378     fprintf(stderr, "Sending Free (%d GAs) to %x\n", nelem / 2, pe);
379 #endif
380     SendOpN(PP_FREE, pe, nelem, data);
381
382     CCC = Save_CCC;
383 }
384
385 \end{code}
386
387 @unpackFree@ unpacks a FREE message into the amount of data shipped and
388 a data block.
389
390 \begin{code}
391
392 static void
393 unpackFree(int *nelem, W_ *data)
394 {
395     long buf[1];
396
397     GetArgs(buf, 1);
398     *nelem = (int) buf[0];
399     GetArgs(data, *nelem);
400 }
401 \end{code}
402
403 @SendSchedule@ sends a closure to be evaluated in response to a Fish
404 message. The message is directed to the PE that originated the Fish
405 (origPE), and includes the packed closure (data) along with its size
406 (nelem).
407
408 \begin{code}
409
410 void
411 sendSchedule(origPE, nelem, data)
412 GLOBAL_TASK_ID origPE;
413 int nelem;
414 P_ data;
415 {
416
417     CostCentre Save_CCC = CCC;
418
419     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
420     CCC->scc_count++;
421
422 #ifdef SCHEDULE_DEBUG
423     PrintPacket(data);
424     fprintf(stderr, "Sending Schedule to %x\n", origPE);
425 #endif
426
427     SendOpN(PP_SCHEDULE, origPE, nelem, data);
428
429     CCC = Save_CCC;
430 }
431 \end{code}
432
433 @unpackSchedule@ unpacks a SCHEDULE message into the Global address of
434 the closure shipped, the amount of data shipped (nelem) and the data
435 block (data).
436
437 \begin{code}
438
439 static void
440 unpackSchedule(int *nelem, W_ *data)
441 {
442     long buf[1];
443
444     GetArgs(buf, 1);
445     *nelem = (int) buf[0];
446     GetArgs(data, *nelem);
447 }
448 \end{code}
449
450 \section{Message-Processing Functions}
451
452 The following routines process incoming GUM messages. Often reissuing
453 messages in response.
454
455 @processFish@ unpacks a fish message, reissuing it if it's our own,
456 sending work if we have it or sending it onwards otherwise.
457
458 \begin{code}
459 static void
460 processFish(STG_NO_ARGS)
461 {
462     GLOBAL_TASK_ID origPE;
463     int age, history, hunger;
464
465     unpackFish(&origPE, &age, &history, &hunger);
466
467     if (origPE == mytid) {
468         fishing = rtsFalse;
469     } else {
470         P_ spark;
471
472         while ((spark = FindLocalSpark(rtsTrue)) != NULL) {
473             W_ size;
474             P_ graph;
475
476             if ((graph = PackNearbyGraph(spark, &size)) == NULL) {
477                 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
478                 SAVE_Hp -= PACK_HEAP_REQUIRED;
479                 /* Now go back and try again */
480             } else {
481                 sendSchedule(origPE, size, graph);
482                 DisposeSpark(spark);
483                 break;
484             }
485         }
486         if (spark == NULL) {
487             /* We have no sparks to give */
488             if (age < FISH_LIFE_EXPECTANCY)
489                 sendFish(choosePE(), origPE,
490                   (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
491
492             /* Send it home to die */
493             else
494                 sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
495         }
496     }
497 }                               /* processFish */
498 \end{code}
499
500 @processFetch@ either returns the requested data (if available) 
501 or blocks the remote blocking queue on a black hole (if not).
502
503 \begin{code}
504 static void
505 processFetch(STG_NO_ARGS)
506 {
507     globalAddr ga, rga;
508     int load;
509
510     P_ closure;
511     P_ ip;
512
513     unpackFetch(&ga, &rga, &load);
514 #ifdef FETCH_DEBUG
515     fprintf(stderr, "Rcvd Fetch for (%x, %d, 0), Resume (%x, %d, %x) (load %d) \n",
516       ga.loc.gc.gtid, ga.loc.gc.slot,
517       rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, load);
518 #endif
519
520     closure = GALAlookup(&ga);
521     ip = (P_) INFO_PTR(closure);
522
523     if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
524         /* Forward the Fetch to someone else */
525         sendFetch(FETCHME_GA(closure), &rga, load);
526     } else if (rga.loc.gc.gtid == mytid) {
527         /* Our own FETCH forwarded back around to us */
528         P_ fmbq = GALAlookup(&rga);
529
530         /* We may have already discovered that the fetch target is our own. */
531         if (fmbq != closure) 
532             CommonUp(fmbq, closure);
533         (void) addWeight(&rga);
534     } else if (IS_BLACK_HOLE(ip)) {
535         /* This includes RBH's and FMBQ's */
536         P_ bf;
537
538         if ((bf = AllocateHeap(FIXED_HS + BF_CLOSURE_SIZE(dummy))) == NULL) {
539             ReallyPerformThreadGC(FIXED_HS + BF_CLOSURE_SIZE(dummy), rtsFalse);
540             closure = GALAlookup(&ga);
541             bf = SAVE_Hp - (FIXED_HS + BF_CLOSURE_SIZE(dummy)) + 1;
542         }
543         ASSERT(GALAlookup(&rga) == NULL);
544
545         SET_BF_HDR(bf, BF_info, bogosity);
546         BF_NODE(bf) = closure;
547         BF_GTID(bf) = (W_) rga.loc.gc.gtid;
548         BF_SLOT(bf) = (W_) rga.loc.gc.slot;
549         BF_WEIGHT(bf) = (W_) rga.weight;
550         blockFetch(bf, closure);
551
552 #ifdef FETCH_DEBUG
553         fprintf(stderr, "Blocking Fetch (%x, %d, %x) on %#lx\n",
554           rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, closure);
555 #endif
556
557     } else {                    
558         /* The target of the FetchMe is some local graph */
559         W_ size;
560         P_ graph;
561
562         if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
563             ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
564             SAVE_Hp -= PACK_HEAP_REQUIRED;
565             closure = GALAlookup(&ga);
566             graph = PackNearbyGraph(closure, &size);
567             ASSERT(graph != NULL);
568         }
569         sendResume(&rga, size, graph);
570     }
571 }
572 \end{code}
573
574 @processFree@ unpacks a FREE message and adds the weights to our GAs.
575
576 \begin{code}
577 static void
578 processFree(STG_NO_ARGS)
579 {
580     int nelem;
581     static W_ *freeBuffer;
582     int i;
583     globalAddr ga;
584
585     freeBuffer = gumPackBuffer;
586     unpackFree(&nelem, freeBuffer);
587 #ifdef FREE_DEBUG
588     fprintf(stderr, "Rcvd Free (%d GAs)\n", nelem / 2);
589 #endif
590     ga.loc.gc.gtid = mytid;
591     for (i = 0; i < nelem;) {
592         ga.weight = (unsigned) freeBuffer[i++];
593         ga.loc.gc.slot = (int) freeBuffer[i++];
594 #ifdef FREE_DEBUG
595         fprintf(stderr,"Processing free (%x, %d, %x)\n", ga.loc.gc.gtid, 
596           ga.loc.gc.slot, ga.weight);
597 #endif
598         (void) addWeight(&ga);
599     }
600 }
601 \end{code}
602
603 @processResume@ unpacks a RESUME message into the graph, filling in
604 the LA -> GA, and GA -> LA tables. Threads blocked on the original
605 @FetchMe@ (now a blocking queue) are awakened, and the blocking queue
606 is converted into an indirection.  Finally it sends an ACK in response
607 which contains any newly allocated GAs.
608
609 \begin{code}
610
611 static void
612 processResume(GLOBAL_TASK_ID sender)
613 {
614     int nelem;
615     W_ nGAs;
616     static W_ *packBuffer;
617     P_ newGraph;
618     P_ old;
619     globalAddr lga;
620     globalAddr *gagamap;
621
622     packBuffer = gumPackBuffer;
623     unpackResume(&lga, &nelem, packBuffer);
624
625 #ifdef RESUME_DEBUG
626     fprintf(stderr, "Rcvd Resume for (%x, %d, %x)\n",
627       lga.loc.gc.gtid, lga.loc.gc.slot, lga.weight);
628     PrintPacket(packBuffer);
629 #endif
630
631     /* 
632      * We always unpack the incoming graph, even if we've received the
633      * requested node in some other data packet (and already awakened
634      * the blocking queue).
635      */
636     if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
637         ReallyPerformThreadGC(packBuffer[0], rtsFalse);
638         SAVE_Hp -= packBuffer[0];
639     }
640
641     /* Do this *after* GC; we don't want to release the object early! */
642
643     if (lga.weight > 0)
644         (void) addWeight(&lga);
645
646     old = GALAlookup(&lga);
647
648     if (RTSflags.ParFlags.granSimStats) {
649         P_ tso = NULL;
650
651         if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE) {
652             for(tso = (P_) FMBQ_ENTRIES(old); 
653               TSO_LINK(tso) != Nil_closure; 
654               tso = TSO_LINK(tso))
655                 ;
656         }
657         DumpGranEventAndNode(GR_REPLY, tso, old, taskIDtoPE(sender));
658     }
659
660     newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
661     ASSERT(newGraph != NULL);
662
663     /* 
664      * Sometimes, unpacking will common up the resumee with the
665      * incoming graph, but if it hasn't, we'd better do so now.
666      */
667    
668     if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE)
669         CommonUp(old, newGraph);
670
671 #ifdef RESUME_DEBUG
672     DebugPrintGAGAMap(gagamap, nGAs);
673 #endif
674
675     sendAck(sender, nGAs, gagamap);
676 }
677 \end{code}
678
679 @processSchedule@ unpacks a SCHEDULE message into the graph, filling
680 in the LA -> GA, and GA -> LA tables. The root of the graph is added to
681 the local spark queue.  Finally it sends an ACK in response
682 which contains any newly allocated GAs.
683
684 \begin{code}
685 static void
686 processSchedule(GLOBAL_TASK_ID sender)
687 {
688     int nelem;
689     int space_required;
690     rtsBool success;
691     static W_ *packBuffer;
692     W_ nGAs;
693     P_ newGraph;
694     globalAddr *gagamap;
695
696     packBuffer = gumPackBuffer;         /* HWL */
697     unpackSchedule(&nelem, packBuffer);
698
699 #ifdef SCHEDULE_DEBUG
700     fprintf(stderr, "Rcvd Schedule\n");
701     PrintPacket(packBuffer);
702 #endif
703
704     /*
705      * For now, the graph is a closure to be sparked as an advisory
706      * spark, but in future it may be a complete spark with
707      * required/advisory status, priority etc.
708      */
709
710     space_required = packBuffer[0];
711     if (SAVE_Hp + space_required >= SAVE_HpLim) {
712         ReallyPerformThreadGC(space_required, rtsFalse);
713         SAVE_Hp -= space_required;
714     }
715     newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
716     ASSERT(newGraph != NULL);
717     success = Spark(newGraph, rtsFalse);
718     ASSERT(success);
719
720 #ifdef SCHEDULE_DEBUG
721     DebugPrintGAGAMap(gagamap, nGAs);
722 #endif
723
724     if (nGAs > 0)
725         sendAck(sender, nGAs, gagamap);
726
727     fishing = rtsFalse;
728 }
729 \end{code}
730
731 @processAck@ unpacks an ACK, and uses the GAGA map to convert RBH's
732 (which represent shared thunks that have been shipped) into fetch-mes
733 to remote GAs.
734
735 \begin{code}
736 static void
737 processAck(STG_NO_ARGS)
738 {
739     int nGAs;
740     globalAddr *gaga;
741
742     globalAddr gagamap[MAX_GAS * 2];
743
744     unpackAck(&nGAs, gagamap);
745
746 #ifdef ACK_DEBUG
747     fprintf(stderr, "Rcvd Ack (%d pairs)\n", nGAs);
748     DebugPrintGAGAMap(gagamap, nGAs);
749 #endif
750
751     /*
752      * For each (oldGA, newGA) pair, set the GA of the corresponding
753      * thunk to the newGA, convert the thunk to a FetchMe, and return
754      * the weight from the oldGA.
755      */
756     for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
757         P_ old = GALAlookup(gaga);
758         P_ new = GALAlookup(gaga + 1);
759
760         if (new == NULL) {
761             /* We don't have this closure, so we make a fetchme for it */
762             globalAddr *ga = setRemoteGA(old, gaga + 1, rtsTrue);
763
764             convertToFetchMe(old, ga);
765         } else {
766             /* 
767              * Oops...we've got this one already; update the RBH to
768              * point to the object we already know about, whatever it
769              * happens to be.
770              */
771             CommonUp(old, new);
772
773             /* 
774              * Increase the weight of the object by the amount just
775              * received in the second part of the ACK pair.
776              */
777             (void) addWeight(gaga + 1);
778         }
779         (void) addWeight(gaga);
780     }
781 }
782 \end{code}
783
784 \section{GUM Message Processor}
785
786 @processMessages@ processes any messages that have arrived, calling
787 appropriate routines depending on the message tag
788 (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
789 present and performs a blocking receive! During profiling it
790 busy-waits in order to record idle time.
791
792 \begin{code}
793 void
794 processMessages(STG_NO_ARGS)
795 {
796     PACKET packet;
797     OPCODE opcode;
798     CostCentre Save_CCC;
799
800     /* Temporary Test Definitions */
801     GLOBAL_TASK_ID task;
802
803     Save_CCC = CCC;
804     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
805     
806     do {
807         if (RTSflags.CcFlags.doCostCentres) {
808             CCC = (CostCentre)STATIC_CC_REF(CC_IDLE);
809             CCC->scc_count++;
810
811             while (!PacketsWaiting())
812                 /*busy-wait loop*/;
813
814             CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
815         }
816
817         packet = GetPacket();   /* Get next message; block until one available */
818         CCC->scc_count++;
819
820         get_opcode_and_sender(packet, &opcode, &task);
821
822         switch (opcode) {
823
824         case PP_FINISH:
825             EXIT(EXIT_SUCCESS); /* The computation has been completed by someone
826                                  * else */
827             break;
828
829         case PP_FETCH:
830             processFetch();
831             break;
832
833         case PP_RESUME:
834             processResume(task);
835             break;
836
837         case PP_ACK:
838             processAck();
839             break;
840
841         case PP_FISH:
842             processFish();
843             break;
844
845         case PP_FREE:
846             processFree();
847             break;
848
849         case PP_SCHEDULE:
850             processSchedule(task);
851             break;
852
853         default:
854             /* Anything we're not prepared to deal with. */
855             fprintf(stderr, "Task %x: Unexpected opcode %x from %x\n",
856               mytid, opcode, task);
857
858             EXIT(EXIT_FAILURE);
859         }                       /* switch */
860
861     } while (PacketsWaiting()); /* While there are messages: process them */
862     CCC = Save_CCC;
863 }                               /* processMessages */
864 \end{code}
865
866 \section{Exception Handlers}
867
868
869 @Comms_Harness_Exception@ is an exception handler that tests out the different 
870 GUM messages. 
871
872 \begin{code}
873 void
874 Comms_Harness_Exception(packet)
875 PACKET packet;
876 {
877     int i, load;
878     globalAddr ga,bqga;
879 /*  GLOBAL_TASK_ID sender = Sender_Task(packet); */
880     OPCODE opcode = Opcode(packet);
881     GLOBAL_TASK_ID task;
882     
883 /*    fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender); */
884
885     switch (opcode) {
886
887     case PP_IO_INIT:
888         IAmMainThread = rtsTrue;        /* This processor is the IO task */
889 /*        fprintf(stderr,"I am Main Thread\n"); */
890         break;
891
892     case PP_FINISH:
893         EXIT(EXIT_SUCCESS);
894         break;
895
896     case PP_FETCH:
897         {
898             W_ data[11];
899             get_opcode_and_sender(packet,&opcode,&task);
900             fprintf(stderr,"Task %x: Got Fetch from %x\n", mytid, task );
901             unpackFetch(&ga,&bqga,&load);
902             fprintf(stderr,"In PE, Fetch = (%x, %d, %x) (%x, %d, %x) %d \n",
903                             ga.loc.gc.gtid, ga.loc.gc.slot, ga.weight, 
904                             bqga.loc.gc.gtid, bqga.loc.gc.slot, bqga.weight, load);
905             /*Send Resume in Response*/
906             for (i=0; i <= 10; ++i) data[i] = i;
907             sendResume(&bqga,11,data);      
908         }
909         break;
910
911     case PP_ACK:
912         {
913             int nGAs;
914             globalAddr gagamap[MAX_GAS*2];
915
916             get_opcode_and_sender(packet,&opcode,&task);
917             fprintf(stderr,"Task %x: Got Ack from %x\n", mytid, task );
918             unpackAck(&nGAs,gagamap);
919 #ifdef DEBUG
920             DebugPrintGAGAMap(gagamap,nGAs);
921 #endif
922         }
923         break;
924
925     case PP_FISH:
926         {
927             GLOBAL_TASK_ID origPE;
928             int age, history, hunger;
929             globalAddr testGA;
930             StgWord testData[6];
931
932             get_opcode_and_sender(packet,&opcode,&task);
933             fprintf(stderr,"Task %x: Got FISH from %x\n", mytid, task );
934             unpackFish(&origPE, &age, &history, &hunger);
935             fprintf(stderr,"In PE, FISH.origPE = %x age = %d history = %d hunger = %d\n",
936                             origPE, age, history, hunger);
937
938             testGA.loc.gc.gtid = mytid; testGA.loc.gc.slot = 52; testGA.weight = 1024;
939             for (i=0; i <= 5; ++i) testData[i] = 40+i;
940             sendSchedule(origPE,6,testData);        
941         }
942         break;
943
944     case PP_SCHEDULE:
945         {                               /* Test variables */
946             int nelem;
947             int testData[6];
948
949             get_opcode_and_sender(packet,&opcode,&task);
950             fprintf(stderr,"Task %x: Got SCHEDULE from %x\n", mytid, task );
951             unpackSchedule(&nelem, &testData);
952             fprintf(stderr,"In PE, nelem = %d \n", nelem);
953             for (i=0; i <= 5; ++i) fprintf(stderr,"tData[%d] = %d ",i,testData[i]);
954             fprintf(stderr,"\n");
955         }
956         break;
957
958       /* Anything we're not prepared to deal with.  Note that ALL
959        * opcodes are discarded during termination -- this helps
960        * prevent bizarre race conditions.
961        */
962       default:
963         if (!GlobalStopPending) 
964           {
965             GLOBAL_TASK_ID ErrorTask;
966             int opcode;
967
968             get_opcode_and_sender(packet,&opcode,&ErrorTask);
969             fprintf(stderr,"Task %x: Unexpected opcode %x from %x in Comms Harness\n",
970                     mytid, opcode, ErrorTask );
971             
972             PEShutDown();
973             
974             EXIT(EXIT_FAILURE);
975           }
976     }
977 }
978 \end{code}
979
980 @STG_Exception@ handles real communication exceptions
981
982 \begin{code}
983 void
984 STG_Exception(packet)
985 PACKET packet;
986 {
987 /*  GLOBAL_TASK_ID sender = Sender_Task(packet); */
988     OPCODE opcode = Opcode(packet);
989     
990 /*    fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender); */
991
992     switch (opcode) {
993
994     case PP_IO_INIT:
995         IAmMainThread = rtsTrue;        /* This processor is the IO task */
996 /*        fprintf(stderr,"I am Main Thread\n"); */
997         break;
998
999     case PP_FINISH:
1000         EXIT(EXIT_SUCCESS);
1001         break;
1002
1003       /* Anything we're not prepared to deal with.  Note that ALL opcodes are discarded
1004          during termination -- this helps prevent bizarre race conditions.
1005       */
1006       default:
1007         if (!GlobalStopPending) 
1008           {
1009             GLOBAL_TASK_ID ErrorTask;
1010             int opcode;
1011
1012             get_opcode_and_sender(packet,&opcode,&ErrorTask);
1013             fprintf(stderr,"Task %x: Unexpected opcode %x from %x in STG_Exception\n",
1014                     mytid, opcode, ErrorTask );
1015             
1016             EXIT(EXIT_FAILURE);
1017           }
1018     }
1019 }
1020 \end{code}
1021
1022 \section{Miscellaneous Functions}
1023
1024 @ChoosePE@ selects a GlobalTaskId from the array of PEs 'at random'.
1025 Important properties:
1026 o it varies during execution, even if the PE is idle
1027 o it's different for each PE
1028 o we never send a fish to ourselves
1029
1030 \begin{code}
1031 extern long lrand48 (STG_NO_ARGS);
1032
1033 GLOBAL_TASK_ID
1034 choosePE(STG_NO_ARGS)
1035 {
1036     long temp;
1037
1038     temp = lrand48() % nPEs;
1039     if (PEs[temp] == mytid) {   /* Never send a FISH to yourself */
1040         temp = (temp + 1) % nPEs;
1041     }
1042     return PEs[temp];
1043 }
1044 \end{code}
1045
1046 @WaitForTermination@ enters an infinite loop waiting for the
1047 termination sequence to be completed.
1048
1049 \begin{code}
1050 void
1051 WaitForTermination(STG_NO_ARGS)
1052 {
1053   do {
1054     PACKET p = GetPacket();
1055     HandleException(p);
1056   } while (rtsTrue);
1057 }
1058 \end{code}
1059
1060 \begin{code}
1061 #ifdef DEBUG
1062 void
1063 DebugPrintGAGAMap(gagamap, nGAs)
1064 globalAddr *gagamap;
1065 int nGAs;
1066 {
1067     int i;
1068
1069     for (i = 0; i < nGAs; ++i, gagamap += 2)
1070         fprintf(stderr, "gagamap[%d] = (%x, %d, %x) -> (%x, %d, %x)\n", i,
1071           gagamap[0].loc.gc.gtid, gagamap[0].loc.gc.slot, gagamap[0].weight,
1072           gagamap[1].loc.gc.gtid, gagamap[1].loc.gc.slot, gagamap[1].weight);
1073 }
1074 #endif
1075 \end{code}
1076
1077 \begin{code}
1078
1079 static PP_ freeMsgBuffer = NULL;
1080 static int *freeMsgIndex = NULL;
1081
1082 void
1083 prepareFreeMsgBuffers(STG_NO_ARGS)
1084 {
1085     int i;
1086
1087     /* Allocate the freeMsg buffers just once and then hang onto them. */
1088
1089     if (freeMsgIndex == NULL) {
1090
1091         freeMsgIndex = (int *) stgMallocBytes(nPEs * sizeof(int), "prepareFreeMsgBuffers (Index)");
1092         freeMsgBuffer = (PP_)  stgMallocBytes(nPEs * sizeof(long *), "prepareFreeMsgBuffers (Buffer)");
1093
1094         for(i = 0; i < nPEs; i++) {
1095             if (i != thisPE) {
1096               freeMsgBuffer[i] = (P_) stgMallocWords(RTSflags.ParFlags.packBufferSize,
1097                                         "prepareFreeMsgBuffers (Buffer #i)");
1098             }
1099         }
1100     }
1101
1102     /* Initialize the freeMsg buffer pointers to point to the start of their buffers */
1103     for (i = 0; i < nPEs; i++)
1104         freeMsgIndex[i] = 0;
1105 }
1106
1107 void
1108 freeRemoteGA(pe, ga)
1109 int pe;
1110 globalAddr *ga;
1111 {
1112     int i;
1113
1114     ASSERT(GALAlookup(ga) == NULL);
1115
1116     if ((i = freeMsgIndex[pe]) + 2 >= RTSflags.ParFlags.packBufferSize) {
1117 #ifdef FREE_DEBUG
1118         fprintf(stderr, "Filled a free message buffer\n");      
1119 #endif
1120         sendFree(ga->loc.gc.gtid, i, freeMsgBuffer[pe]);
1121         i = 0;
1122     }
1123     freeMsgBuffer[pe][i++] = (W_) ga->weight;
1124     freeMsgBuffer[pe][i++] = (W_) ga->loc.gc.slot;
1125     freeMsgIndex[pe] = i;
1126 #ifdef DEBUG
1127     ga->weight = 0x0f0f0f0f;
1128     ga->loc.gc.gtid = 0x666;
1129     ga->loc.gc.slot = 0xdeaddead;
1130 #endif
1131 }
1132
1133 void
1134 sendFreeMessages(STG_NO_ARGS)
1135 {
1136     int i;
1137
1138     for (i = 0; i < nPEs; i++) {
1139         if (freeMsgIndex[i] > 0)
1140             sendFree(PEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
1141     }
1142 }
1143
1144 #endif /* PAR -- whole file */
1145 \end{code}