4599972b59a899850909af9288c964bcec3e952d
[ghc-hetmet.git] / ghc / runtime / gum / HLComms.lc
1 /****************************************************************
2 *                                                               *
3 *       High Level Communications Routines (HLComms.lc)         *
4 *                                                               *
5 *  Contains the high-level routines (i.e. communication         *
6 *  subsystem independent) used by GUM                           *
7 *  (c) The Parade/AQUA Projects, Glasgow University, 1995       *
8 *  Phil Trinder, Glasgow University, 12 December 1994           *
9 *                                                               *
10 *****************************************************************/
11 \begin{code}
12 #ifdef PAR /* whole file */
13
14 #define NON_POSIX_SOURCE /* so says Solaris */
15
16 #include "rtsdefs.h"
17 #include "HLC.h"
18 \end{code}
19
20 \section{GUM Message Sending and Unpacking Functions}
21
22
23 @SendFetch@ packs the two global addresses and a load into a message +
24 sends it.  
25
26 \begin{code}
27 static W_ *gumPackBuffer;
28
29 void 
30 InitMoreBuffers(STG_NO_ARGS)
31 {
32     gumPackBuffer
33       = (W_ *) stgMallocWords(RTSflags.ParFlags.packBufferSize, "initMoreBuffers");
34 }
35
36 void
37 sendFetch(rga, lga, load)
38 globalAddr *rga, *lga;
39 int load;
40 {
41     CostCentre Save_CCC = CCC;
42
43     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
44     CCC->scc_count++;
45
46     ASSERT(rga->weight > 0 && lga->weight > 0);
47 #ifdef FETCH_DEBUG    
48     fprintf(stderr, "Sending Fetch (%x, %d, 0), load = %d\n", 
49       rga->loc.gc.gtid, rga->loc.gc.slot, load);
50 #endif
51     SendOpV(PP_FETCH, rga->loc.gc.gtid, 6,
52       (W_) rga->loc.gc.gtid, (W_) rga->loc.gc.slot, 
53       (W_) lga->weight, (W_) lga->loc.gc.gtid, (W_) lga->loc.gc.slot, (W_) load);
54
55     CCC = Save_CCC;
56 }
57 \end{code}
58
59 @unpackFetch@ unpacks a FETCH message into two Global addresses and a load figure.
60
61 \begin{code}
62
63 static void
64 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
65 {
66     long buf[6];
67
68     GetArgs(buf, 6); 
69     lga->weight = 1;
70     lga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[0];
71     lga->loc.gc.slot = (int) buf[1];
72
73     rga->weight = (unsigned) buf[2];
74     rga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[3];
75     rga->loc.gc.slot = (int) buf[4];
76
77     *load = (int) buf[5];
78
79     ASSERT(rga->weight > 0);
80 }
81 \end{code}
82
83 @SendResume@ packs the remote blocking queue's GA and data into a message 
84 and sends it.
85
86 \begin{code}
87 void
88 sendResume(rga, nelem, data)
89 globalAddr *rga;
90 int nelem;
91 P_ data;
92 {
93     CostCentre Save_CCC = CCC;
94
95     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
96     CCC->scc_count++;
97
98 #ifdef RESUME_DEBUG
99     PrintPacket(data);
100     fprintf(stderr, "Sending Resume for (%x, %d, %x)\n", 
101       rga->loc.gc.gtid, rga->loc.gc.slot, rga->weight);
102 #endif
103
104     SendOpNV(PP_RESUME, rga->loc.gc.gtid, nelem, data, 2,
105       (W_) rga->weight, (W_) rga->loc.gc.slot);
106
107     CCC = Save_CCC;
108 }
109 \end{code}
110
111 @blockFetch@ blocks a @BlockedFetch@ node on some kind of black hole.
112
113 \begin{code}
114 static void
115 blockFetch(P_ bf, P_ bh)
116 {
117     switch (INFO_TYPE(INFO_PTR(bh))) {
118     case INFO_BH_TYPE:
119         BF_LINK(bf) = Nil_closure;
120         SET_INFO_PTR(bh, BQ_info);
121         BQ_ENTRIES(bh) = (W_) bf;
122
123 #ifdef GC_MUT_REQUIRED
124         /*
125          * If we modify a black hole in the old generation, we have to
126          * make sure it goes on the mutables list
127          */
128
129         if (bh <= StorageMgrInfo.OldLim) {
130             MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
131             StorageMgrInfo.OldMutables = bh;
132         } else
133             MUT_LINK(bh) = MUT_NOT_LINKED;
134 #endif
135         break;
136     case INFO_BQ_TYPE:
137         BF_LINK(bf) = (P_) BQ_ENTRIES(bh);
138         BQ_ENTRIES(bh) = (W_) bf;
139         break;
140     case INFO_FMBQ_TYPE:
141         BF_LINK(bf) = (P_) FMBQ_ENTRIES(bh);
142         FMBQ_ENTRIES(bh) = (W_) bf;
143         break;
144     case INFO_SPEC_RBH_TYPE:
145         BF_LINK(bf) = (P_) SPEC_RBH_BQ(bh);
146         SPEC_RBH_BQ(bh) = (W_) bf;
147         break;
148     case INFO_GEN_RBH_TYPE:
149         BF_LINK(bf) = (P_) GEN_RBH_BQ(bh);
150         GEN_RBH_BQ(bh) = (W_) bf;
151         break;
152     default:
153         fprintf(stderr, "Panic: thought %#lx was a black hole (IP %#lx)\n",
154           (W_) bh, INFO_PTR(bh));
155         EXIT(EXIT_FAILURE);
156     }
157 }
158 \end{code}
159
160 @processFetches@ constructs and sends resume messages for every
161 @BlockedFetch@ which is ready to be awakened.
162
163 \begin{code}
164 extern P_ PendingFetches;
165
166 void
167 processFetches()
168 {
169     P_ bf;
170     P_ next;
171     P_ closure;
172     P_ ip;
173     globalAddr rga;
174     
175     for (bf = PendingFetches; bf != Nil_closure; bf = next) {
176         next = BF_LINK(bf);
177
178         /*
179          * Find the target at the end of the indirection chain, and
180          * process it in much the same fashion as the original target
181          * of the fetch.  Though we hope to find graph here, we could
182          * find a black hole (of any flavor) or even a FetchMe.
183          */
184         closure = BF_NODE(bf);
185         while (IS_INDIRECTION(INFO_PTR(closure)))
186             closure = (P_) IND_CLOSURE_PTR(closure);
187         ip = (P_) INFO_PTR(closure);
188
189         if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
190             /* Forward the Fetch to someone else */
191             rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
192             rga.loc.gc.slot = (int) BF_SLOT(bf);
193             rga.weight = (unsigned) BF_WEIGHT(bf);
194
195             sendFetch(FETCHME_GA(closure), &rga, 0 /* load */);
196         } else if (IS_BLACK_HOLE(ip)) {
197             BF_NODE(bf) = closure;
198             blockFetch(bf, closure);
199         } else {
200             /* We now have some local graph to send back */
201             W_ size;
202             P_ graph;
203
204             if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
205                 PendingFetches = bf;
206                 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
207                 SAVE_Hp -= PACK_HEAP_REQUIRED;
208                 bf = PendingFetches;
209                 next = BF_LINK(bf);
210                 closure = BF_NODE(bf);
211                 graph = PackNearbyGraph(closure, &size);
212                 ASSERT(graph != NULL);
213             }
214             rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
215             rga.loc.gc.slot = (int) BF_SLOT(bf);
216             rga.weight = (unsigned) BF_WEIGHT(bf);
217
218             sendResume(&rga, size, graph);
219         }
220     }
221     PendingFetches = Nil_closure;
222 }
223
224 \end{code}
225
226 @unpackResume@ unpacks a Resume message into two Global addresses and a data array.
227
228 \begin{code}
229
230 static void
231 unpackResume(globalAddr *lga, int *nelem, W_ *data)
232 {
233     long buf[3];
234
235     GetArgs(buf, 3); 
236     lga->weight = (unsigned) buf[0];
237     lga->loc.gc.gtid = mytid;
238     lga->loc.gc.slot = (int) buf[1];
239
240     *nelem = (int) buf[2];
241     GetArgs(data, *nelem);
242 }
243 \end{code}
244
245 @SendAck@ packs the global address being acknowledged, together with
246 an array of global addresses for any closures shipped and sends them.
247 \begin{code}
248
249 void
250 sendAck(task, ngas, gagamap)
251 GLOBAL_TASK_ID task;
252 int ngas;
253 globalAddr *gagamap;
254 {
255     static long *buffer;
256     long *p;
257     int i;
258     CostCentre Save_CCC = CCC;
259
260     buffer = (long *) gumPackBuffer;
261
262     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
263     CCC->scc_count++;
264
265     for(i = 0, p = buffer; i < ngas; i++, p += 6) {
266         ASSERT(gagamap[1].weight > 0);
267         p[0] = (long) gagamap->weight;
268         p[1] = (long) gagamap->loc.gc.gtid;
269         p[2] = (long) gagamap->loc.gc.slot;
270         gagamap++;
271         p[3] = (long) gagamap->weight;
272         p[4] = (long) gagamap->loc.gc.gtid;
273         p[5] = (long) gagamap->loc.gc.slot;
274         gagamap++;
275     }
276 #ifdef ACK_DEBUG    
277     fprintf(stderr,"Sending Ack (%d pairs) to %x\n", ngas, task);
278 #endif
279     SendOpN(PP_ACK, task, p - buffer, buffer);
280
281     CCC = Save_CCC;
282 }
283 \end{code}
284
285 @unpackAck@ unpacks an Acknowledgement message into a Global address,
286 a count of the number of global addresses following and a map of 
287 Global addresses
288
289 \begin{code}
290
291 static void
292 unpackAck(int *ngas, globalAddr *gagamap)
293 {
294     long GAarraysize;
295     long buf[6];
296
297     GetArgs(&GAarraysize, 1);
298
299     *ngas = GAarraysize / 6;
300
301     while (GAarraysize > 0) {
302         GetArgs(buf, 6);
303         gagamap->weight = (unsigned) buf[0];
304         gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[1];
305         gagamap->loc.gc.slot = (int) buf[2];
306         gagamap++;
307         gagamap->weight = (unsigned) buf[3];
308         gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[4];
309         gagamap->loc.gc.slot = (int) buf[5];
310         ASSERT(gagamap->weight > 0);
311         gagamap++;
312         GAarraysize -= 6;
313     }
314 }
315 \end{code}
316
317 @SendFish@ packs the global address being acknowledged, together with
318 an array of global addresses for any closures shipped and sends them.
319 \begin{code}
320
321 void
322 sendFish(destPE, origPE, age, history, hunger)
323 GLOBAL_TASK_ID destPE, origPE;
324 int age, history, hunger;
325 {
326     CostCentre Save_CCC = CCC;
327
328     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
329     CCC->scc_count++;
330
331 #ifdef FISH_DEBUG
332     fprintf(stderr,"Sending Fish to %lx\n", destPE);
333 #endif
334     SendOpV(PP_FISH, destPE, 4, (W_) origPE, (W_) age, (W_) history, (W_) hunger);
335     if (origPE == mytid)
336         fishing = rtsTrue;
337
338     CCC = Save_CCC;
339 }
340 \end{code}
341
342 @unpackFish@ unpacks a FISH message into the global task id of the
343 originating PE and 3 data fields: the age, history and hunger of the
344 fish. The history + hunger are not currently used.
345
346 \begin{code}
347
348 static void
349 unpackFish(GLOBAL_TASK_ID *origPE, int *age, int *history, int *hunger)
350 {
351     long buf[4];
352
353     GetArgs(buf, 4);
354
355     *origPE = (GLOBAL_TASK_ID) buf[0];
356     *age = (int) buf[1];
357     *history = (int) buf[2];
358     *hunger = (int) buf[3];
359 }
360 \end{code}
361
362 @SendFree@ sends (weight, slot) pairs for GAs that we no longer need references
363 to.
364
365 \begin{code}
366 void
367 sendFree(pe, nelem, data)
368 GLOBAL_TASK_ID pe;
369 int nelem;
370 P_ data;
371 {
372     CostCentre Save_CCC = CCC;
373
374     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
375     CCC->scc_count++;
376
377 #ifdef FREE_DEBUG
378     fprintf(stderr, "Sending Free (%d GAs) to %x\n", nelem / 2, pe);
379 #endif
380     SendOpN(PP_FREE, pe, nelem, data);
381
382     CCC = Save_CCC;
383 }
384
385 \end{code}
386
387 @unpackFree@ unpacks a FREE message into the amount of data shipped and
388 a data block.
389
390 \begin{code}
391
392 static void
393 unpackFree(int *nelem, W_ *data)
394 {
395     long buf[1];
396
397     GetArgs(buf, 1);
398     *nelem = (int) buf[0];
399     GetArgs(data, *nelem);
400 }
401 \end{code}
402
403 @SendSchedule@ sends a closure to be evaluated in response to a Fish
404 message. The message is directed to the PE that originated the Fish
405 (origPE), and includes the packed closure (data) along with its size
406 (nelem).
407
408 \begin{code}
409
410 void
411 sendSchedule(origPE, nelem, data)
412 GLOBAL_TASK_ID origPE;
413 int nelem;
414 P_ data;
415 {
416
417     CostCentre Save_CCC = CCC;
418
419     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
420     CCC->scc_count++;
421
422 #ifdef SCHEDULE_DEBUG
423     PrintPacket(data);
424     fprintf(stderr, "Sending Schedule to %x\n", origPE);
425 #endif
426
427     SendOpN(PP_SCHEDULE, origPE, nelem, data);
428
429     CCC = Save_CCC;
430 }
431 \end{code}
432
433 @unpackSchedule@ unpacks a SCHEDULE message into the Global address of
434 the closure shipped, the amount of data shipped (nelem) and the data
435 block (data).
436
437 \begin{code}
438
439 static void
440 unpackSchedule(int *nelem, W_ *data)
441 {
442     long buf[1];
443
444     GetArgs(buf, 1);
445     *nelem = (int) buf[0];
446     GetArgs(data, *nelem);
447 }
448 \end{code}
449
450 \section{Message-Processing Functions}
451
452 The following routines process incoming GUM messages. Often reissuing
453 messages in response.
454
455 @processFish@ unpacks a fish message, reissuing it if it's our own,
456 sending work if we have it or sending it onwards otherwise.
457
458 \begin{code}
459 static void
460 processFish(STG_NO_ARGS)
461 {
462     GLOBAL_TASK_ID origPE;
463     int age, history, hunger;
464
465     unpackFish(&origPE, &age, &history, &hunger);
466
467     if (origPE == mytid) {
468         fishing = rtsFalse;
469     } else {
470         P_ spark;
471
472         while ((spark = FindLocalSpark(rtsTrue)) != NULL) {
473             W_ size;
474             P_ graph;
475
476             if ((graph = PackNearbyGraph(spark, &size)) == NULL) {
477                 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
478                 SAVE_Hp -= PACK_HEAP_REQUIRED;
479                 /* Now go back and try again */
480             } else {
481                 sendSchedule(origPE, size, graph);
482                 DisposeSpark(spark);
483                 break;
484             }
485         }
486         if (spark == NULL) {
487             /* We have no sparks to give */
488             if (age < FISH_LIFE_EXPECTANCY)
489                 sendFish(choosePE(), origPE,
490                   (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
491
492             /* Send it home to die */
493             else
494                 sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
495         }
496     }
497 }                               /* processFish */
498 \end{code}
499
500 @processFetch@ either returns the requested data (if available) 
501 or blocks the remote blocking queue on a black hole (if not).
502
503 \begin{code}
504 static void
505 processFetch(STG_NO_ARGS)
506 {
507     globalAddr ga, rga;
508     int load;
509
510     P_ closure;
511     P_ ip;
512
513     unpackFetch(&ga, &rga, &load);
514 #ifdef FETCH_DEBUG
515     fprintf(stderr, "Rcvd Fetch for (%x, %d, 0), Resume (%x, %d, %x) (load %d) \n",
516       ga.loc.gc.gtid, ga.loc.gc.slot,
517       rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, load);
518 #endif
519
520     closure = GALAlookup(&ga);
521     ip = (P_) INFO_PTR(closure);
522
523     if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
524         /* Forward the Fetch to someone else */
525         sendFetch(FETCHME_GA(closure), &rga, load);
526     } else if (rga.loc.gc.gtid == mytid) {
527         /* Our own FETCH forwarded back around to us */
528         P_ fmbq = GALAlookup(&rga);
529
530         /* We may have already discovered that the fetch target is our own. */
531         if (fmbq != closure) 
532             CommonUp(fmbq, closure);
533         (void) addWeight(&rga);
534     } else if (IS_BLACK_HOLE(ip)) {
535         /* This includes RBH's and FMBQ's */
536         P_ bf;
537
538         if ((bf = AllocateHeap(FIXED_HS + BF_CLOSURE_SIZE(dummy))) == NULL) {
539             ReallyPerformThreadGC(FIXED_HS + BF_CLOSURE_SIZE(dummy), rtsFalse);
540             closure = GALAlookup(&ga);
541             bf = SAVE_Hp - (FIXED_HS + BF_CLOSURE_SIZE(dummy)) + 1;
542         }
543         ASSERT(GALAlookup(&rga) == NULL);
544
545         SET_BF_HDR(bf, BF_info, bogosity);
546         BF_NODE(bf) = closure;
547         BF_GTID(bf) = (W_) rga.loc.gc.gtid;
548         BF_SLOT(bf) = (W_) rga.loc.gc.slot;
549         BF_WEIGHT(bf) = (W_) rga.weight;
550         blockFetch(bf, closure);
551
552 #ifdef FETCH_DEBUG
553         fprintf(stderr, "Blocking Fetch (%x, %d, %x) on %#lx\n",
554           rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, closure);
555 #endif
556
557     } else {                    
558         /* The target of the FetchMe is some local graph */
559         W_ size;
560         P_ graph;
561
562         if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
563             ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
564             SAVE_Hp -= PACK_HEAP_REQUIRED;
565             closure = GALAlookup(&ga);
566             graph = PackNearbyGraph(closure, &size);
567             ASSERT(graph != NULL);
568         }
569         sendResume(&rga, size, graph);
570     }
571 }
572 \end{code}
573
574 @processFree@ unpacks a FREE message and adds the weights to our GAs.
575
576 \begin{code}
577 static void
578 processFree(STG_NO_ARGS)
579 {
580     int nelem;
581     static W_ *freeBuffer;
582     int i;
583     globalAddr ga;
584
585     freeBuffer = gumPackBuffer;
586     unpackFree(&nelem, freeBuffer);
587 #ifdef FREE_DEBUG
588     fprintf(stderr, "Rcvd Free (%d GAs)\n", nelem / 2);
589 #endif
590     ga.loc.gc.gtid = mytid;
591     for (i = 0; i < nelem;) {
592         ga.weight = (unsigned) freeBuffer[i++];
593         ga.loc.gc.slot = (int) freeBuffer[i++];
594 #ifdef FREE_DEBUG
595         fprintf(stderr,"Processing free (%x, %d, %x)\n", ga.loc.gc.gtid, 
596           ga.loc.gc.slot, ga.weight);
597 #endif
598         (void) addWeight(&ga);
599     }
600 }
601 \end{code}
602
603 @processResume@ unpacks a RESUME message into the graph, filling in
604 the LA -> GA, and GA -> LA tables. Threads blocked on the original
605 @FetchMe@ (now a blocking queue) are awakened, and the blocking queue
606 is converted into an indirection.  Finally it sends an ACK in response
607 which contains any newly allocated GAs.
608
609 \begin{code}
610
611 static void
612 processResume(GLOBAL_TASK_ID sender)
613 {
614     int nelem;
615     W_ nGAs;
616     static W_ *packBuffer;
617     P_ newGraph;
618     P_ old;
619     globalAddr lga;
620     globalAddr *gagamap;
621
622     packBuffer = gumPackBuffer;
623     unpackResume(&lga, &nelem, packBuffer);
624
625 #ifdef RESUME_DEBUG
626     fprintf(stderr, "Rcvd Resume for (%x, %d, %x)\n",
627       lga.loc.gc.gtid, lga.loc.gc.slot, lga.weight);
628     PrintPacket(packBuffer);
629 #endif
630
631     /* 
632      * We always unpack the incoming graph, even if we've received the
633      * requested node in some other data packet (and already awakened
634      * the blocking queue).
635      */
636     if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
637         ReallyPerformThreadGC(packBuffer[0], rtsFalse);
638         SAVE_Hp -= packBuffer[0];
639     }
640
641     /* Do this *after* GC; we don't want to release the object early! */
642
643     if (lga.weight > 0)
644         (void) addWeight(&lga);
645
646     old = GALAlookup(&lga);
647
648     if (RTSflags.ParFlags.granSimStats) {
649         P_ tso = NULL;
650
651         if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE) {
652             for(tso = (P_) FMBQ_ENTRIES(old); 
653               TSO_LINK(tso) != Nil_closure; 
654               tso = TSO_LINK(tso))
655                 ;
656         }
657         DumpGranEventAndNode(GR_REPLY, tso, old, taskIDtoPE(sender));
658     }
659
660     newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
661     ASSERT(newGraph != NULL);
662
663     /* 
664      * Sometimes, unpacking will common up the resumee with the
665      * incoming graph, but if it hasn't, we'd better do so now.
666      */
667    
668     if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE)
669         CommonUp(old, newGraph);
670
671 #ifdef RESUME_DEBUG
672     DebugPrintGAGAMap(gagamap, nGAs);
673 #endif
674
675     sendAck(sender, nGAs, gagamap);
676 }
677 \end{code}
678
679 @processSchedule@ unpacks a SCHEDULE message into the graph, filling
680 in the LA -> GA, and GA -> LA tables. The root of the graph is added to
681 the local spark queue.  Finally it sends an ACK in response
682 which contains any newly allocated GAs.
683
684 \begin{code}
685 static void
686 processSchedule(GLOBAL_TASK_ID sender)
687 {
688     int nelem;
689     int space_required;
690     rtsBool success;
691     static W_ *packBuffer;
692     W_ nGAs;
693     P_ newGraph;
694     globalAddr *gagamap;
695
696     packBuffer = gumPackBuffer;         /* HWL */
697     unpackSchedule(&nelem, packBuffer);
698
699 #ifdef SCHEDULE_DEBUG
700     fprintf(stderr, "Rcvd Schedule\n");
701     PrintPacket(packBuffer);
702 #endif
703
704     /*
705      * For now, the graph is a closure to be sparked as an advisory
706      * spark, but in future it may be a complete spark with
707      * required/advisory status, priority etc.
708      */
709
710     space_required = packBuffer[0];
711     if (SAVE_Hp + space_required >= SAVE_HpLim) {
712         ReallyPerformThreadGC(space_required, rtsFalse);
713         SAVE_Hp -= space_required;
714     }
715     newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
716     ASSERT(newGraph != NULL);
717     success = Spark(newGraph, rtsFalse);
718     ASSERT(success);
719
720 #ifdef SCHEDULE_DEBUG
721     DebugPrintGAGAMap(gagamap, nGAs);
722 #endif
723
724     if (nGAs > 0)
725         sendAck(sender, nGAs, gagamap);
726
727     fishing = rtsFalse;
728 }
729 \end{code}
730
731 @processAck@ unpacks an ACK, and uses the GAGA map to convert RBH's
732 (which represent shared thunks that have been shipped) into fetch-mes
733 to remote GAs.
734
735 \begin{code}
736 static void
737 processAck(STG_NO_ARGS)
738 {
739     int nGAs;
740     globalAddr *gaga;
741
742     globalAddr gagamap[MAX_GAS * 2];
743
744     unpackAck(&nGAs, gagamap);
745
746 #ifdef ACK_DEBUG
747     fprintf(stderr, "Rcvd Ack (%d pairs)\n", nGAs);
748     DebugPrintGAGAMap(gagamap, nGAs);
749 #endif
750
751     /*
752      * For each (oldGA, newGA) pair, set the GA of the corresponding
753      * thunk to the newGA, convert the thunk to a FetchMe, and return
754      * the weight from the oldGA.
755      */
756     for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
757         P_ old = GALAlookup(gaga);
758         P_ new = GALAlookup(gaga + 1);
759
760         if (new == NULL) {
761             /* We don't have this closure, so we make a fetchme for it */
762             globalAddr *ga = setRemoteGA(old, gaga + 1, rtsTrue);
763
764             convertToFetchMe(old, ga);
765         } else {
766             /* 
767              * Oops...we've got this one already; update the RBH to
768              * point to the object we already know about, whatever it
769              * happens to be.
770              */
771             CommonUp(old, new);
772
773             /* 
774              * Increase the weight of the object by the amount just
775              * received in the second part of the ACK pair.
776              */
777             (void) addWeight(gaga + 1);
778         }
779         (void) addWeight(gaga);
780     }
781 }
782 \end{code}
783
784 \section{GUM Message Processor}
785
786 @processMessages@ processes any messages that have arrived, calling
787 appropriate routines depending on the message tag
788 (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
789 present and performs a blocking receive! During profiling it
790 busy-waits in order to record idle time.
791
792 \begin{code}
793 void
794 processMessages(STG_NO_ARGS)
795 {
796     PACKET packet;
797     OPCODE opcode;
798     CostCentre Save_CCC;
799
800     /* Temporary Test Definitions */
801     GLOBAL_TASK_ID task;
802
803     Save_CCC = CCC;
804     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
805     
806     do {
807         if (RTSflags.CcFlags.doCostCentres) {
808             CCC = (CostCentre)STATIC_CC_REF(CC_IDLE);
809             CCC->scc_count++;
810
811             while (!PacketsWaiting())
812                 /*busy-wait loop*/;
813
814             CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
815         }
816
817         packet = GetPacket();   /* Get next message; block until one available */
818         CCC->scc_count++;
819
820         get_opcode_and_sender(packet, &opcode, &task);
821
822         switch (opcode) {
823
824         case PP_FINISH:
825             EXIT(EXIT_SUCCESS); /* The computation has been completed by someone
826                                  * else */
827             break;
828
829         case PP_FETCH:
830             processFetch();
831             break;
832
833         case PP_RESUME:
834             processResume(task);
835             break;
836
837         case PP_ACK:
838             processAck();
839             break;
840
841         case PP_FISH:
842             processFish();
843             break;
844
845         case PP_FREE:
846             processFree();
847             break;
848
849         case PP_SCHEDULE:
850             processSchedule(task);
851             break;
852
853         default:
854             /* Anything we're not prepared to deal with. */
855             fprintf(stderr, "Task %x: Unexpected opcode %x from %x\n",
856               mytid, opcode, task);
857
858             EXIT(EXIT_FAILURE);
859         }                       /* switch */
860
861     } while (PacketsWaiting()); /* While there are messages: process them */
862     CCC = Save_CCC;
863 }                               /* processMessages */
864 \end{code}
865
866 \section{Exception Handlers}
867
868
869 @Comms_Harness_Exception@ is an exception handler that tests out the different 
870 GUM messages. 
871
872 \begin{code}
873 void
874 Comms_Harness_Exception(packet)
875 PACKET packet;
876 {
877     int i, load;
878     globalAddr ga,bqga;
879 /*  GLOBAL_TASK_ID sender = Sender_Task(packet); */
880     OPCODE opcode = Opcode(packet);
881     GLOBAL_TASK_ID task;
882     
883 /*    fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender); */
884
885     switch (opcode) {
886
887     case PP_FINISH:
888         EXIT(EXIT_SUCCESS);
889         break;
890
891     case PP_FETCH:
892         {
893             W_ data[11];
894             get_opcode_and_sender(packet,&opcode,&task);
895             fprintf(stderr,"Task %x: Got Fetch from %x\n", mytid, task );
896             unpackFetch(&ga,&bqga,&load);
897             fprintf(stderr,"In PE, Fetch = (%x, %d, %x) (%x, %d, %x) %d \n",
898                             ga.loc.gc.gtid, ga.loc.gc.slot, ga.weight, 
899                             bqga.loc.gc.gtid, bqga.loc.gc.slot, bqga.weight, load);
900             /*Send Resume in Response*/
901             for (i=0; i <= 10; ++i) data[i] = i;
902             sendResume(&bqga,11,data);      
903         }
904         break;
905
906     case PP_ACK:
907         {
908             int nGAs;
909             globalAddr gagamap[MAX_GAS*2];
910
911             get_opcode_and_sender(packet,&opcode,&task);
912             fprintf(stderr,"Task %x: Got Ack from %x\n", mytid, task );
913             unpackAck(&nGAs,gagamap);
914 #ifdef DEBUG
915             DebugPrintGAGAMap(gagamap,nGAs);
916 #endif
917         }
918         break;
919
920     case PP_FISH:
921         {
922             GLOBAL_TASK_ID origPE;
923             int age, history, hunger;
924             globalAddr testGA;
925             StgWord testData[6];
926
927             get_opcode_and_sender(packet,&opcode,&task);
928             fprintf(stderr,"Task %x: Got FISH from %x\n", mytid, task );
929             unpackFish(&origPE, &age, &history, &hunger);
930             fprintf(stderr,"In PE, FISH.origPE = %x age = %d history = %d hunger = %d\n",
931                             origPE, age, history, hunger);
932
933             testGA.loc.gc.gtid = mytid; testGA.loc.gc.slot = 52; testGA.weight = 1024;
934             for (i=0; i <= 5; ++i) testData[i] = 40+i;
935             sendSchedule(origPE,6,testData);        
936         }
937         break;
938
939     case PP_SCHEDULE:
940         {                               /* Test variables */
941             int nelem;
942             int testData[6];
943
944             get_opcode_and_sender(packet,&opcode,&task);
945             fprintf(stderr,"Task %x: Got SCHEDULE from %x\n", mytid, task );
946             unpackSchedule(&nelem, &testData);
947             fprintf(stderr,"In PE, nelem = %d \n", nelem);
948             for (i=0; i <= 5; ++i) fprintf(stderr,"tData[%d] = %d ",i,testData[i]);
949             fprintf(stderr,"\n");
950         }
951         break;
952
953       /* Anything we're not prepared to deal with.  Note that ALL
954        * opcodes are discarded during termination -- this helps
955        * prevent bizarre race conditions.
956        */
957       default:
958         if (!GlobalStopPending) 
959           {
960             GLOBAL_TASK_ID ErrorTask;
961             int opcode;
962
963             get_opcode_and_sender(packet,&opcode,&ErrorTask);
964             fprintf(stderr,"Task %x: Unexpected opcode %x from %x in Comms Harness\n",
965                     mytid, opcode, ErrorTask );
966             
967             PEShutDown();
968             
969             EXIT(EXIT_FAILURE);
970           }
971     }
972 }
973 \end{code}
974
975 @STG_Exception@ handles real communication exceptions
976
977 \begin{code}
978 void
979 STG_Exception(packet)
980 PACKET packet;
981 {
982     GLOBAL_TASK_ID sender = Sender_Task(packet); 
983     OPCODE opcode = Opcode(packet);
984 #if 0    
985     fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender); 
986 #endif
987     switch (opcode) {
988
989     case PP_FINISH:
990         EXIT(EXIT_SUCCESS);
991         break;
992
993       /* Anything we're not prepared to deal with.  Note that ALL opcodes are discarded
994          during termination -- this helps prevent bizarre race conditions.
995       */
996       default:
997         if (!GlobalStopPending) 
998           {
999             GLOBAL_TASK_ID ErrorTask;
1000             int opcode;
1001
1002             get_opcode_and_sender(packet,&opcode,&ErrorTask);
1003             fprintf(stderr,"Task %x: Unexpected opcode %x from %x in STG_Exception\n",
1004                     mytid, opcode, ErrorTask );
1005             
1006             EXIT(EXIT_FAILURE);
1007           }
1008     }
1009 }
1010 \end{code}
1011
1012 \section{Miscellaneous Functions}
1013
1014 @ChoosePE@ selects a GlobalTaskId from the array of PEs 'at random'.
1015 Important properties:
1016 o it varies during execution, even if the PE is idle
1017 o it's different for each PE
1018 o we never send a fish to ourselves
1019
1020 \begin{code}
1021 extern long lrand48 (STG_NO_ARGS);
1022
1023 GLOBAL_TASK_ID
1024 choosePE(STG_NO_ARGS)
1025 {
1026     long temp;
1027
1028     temp = lrand48() % nPEs;
1029     if (PEs[temp] == mytid) {   /* Never send a FISH to yourself */
1030         temp = (temp + 1) % nPEs;
1031     }
1032     return PEs[temp];
1033 }
1034 \end{code}
1035
1036 @WaitForTermination@ enters an infinite loop waiting for the
1037 termination sequence to be completed.
1038
1039 \begin{code}
1040 void
1041 WaitForTermination(STG_NO_ARGS)
1042 {
1043   do {
1044     PACKET p = GetPacket();
1045     HandleException(p);
1046   } while (rtsTrue);
1047 }
1048 \end{code}
1049
1050 \begin{code}
1051 #ifdef DEBUG
1052 void
1053 DebugPrintGAGAMap(gagamap, nGAs)
1054 globalAddr *gagamap;
1055 int nGAs;
1056 {
1057     int i;
1058
1059     for (i = 0; i < nGAs; ++i, gagamap += 2)
1060         fprintf(stderr, "gagamap[%d] = (%x, %d, %x) -> (%x, %d, %x)\n", i,
1061           gagamap[0].loc.gc.gtid, gagamap[0].loc.gc.slot, gagamap[0].weight,
1062           gagamap[1].loc.gc.gtid, gagamap[1].loc.gc.slot, gagamap[1].weight);
1063 }
1064 #endif
1065 \end{code}
1066
1067 \begin{code}
1068
1069 static PP_ freeMsgBuffer = NULL;
1070 static int *freeMsgIndex = NULL;
1071
1072 void
1073 prepareFreeMsgBuffers(STG_NO_ARGS)
1074 {
1075     int i;
1076
1077     /* Allocate the freeMsg buffers just once and then hang onto them. */
1078
1079     if (freeMsgIndex == NULL) {
1080
1081         freeMsgIndex = (int *) stgMallocBytes(nPEs * sizeof(int), "prepareFreeMsgBuffers (Index)");
1082         freeMsgBuffer = (PP_)  stgMallocBytes(nPEs * sizeof(long *), "prepareFreeMsgBuffers (Buffer)");
1083
1084         for(i = 0; i < nPEs; i++) {
1085             if (i != thisPE) {
1086               freeMsgBuffer[i] = (P_) stgMallocWords(RTSflags.ParFlags.packBufferSize,
1087                                         "prepareFreeMsgBuffers (Buffer #i)");
1088             }
1089         }
1090     }
1091
1092     /* Initialize the freeMsg buffer pointers to point to the start of their buffers */
1093     for (i = 0; i < nPEs; i++)
1094         freeMsgIndex[i] = 0;
1095 }
1096
1097 void
1098 freeRemoteGA(pe, ga)
1099 int pe;
1100 globalAddr *ga;
1101 {
1102     int i;
1103
1104     ASSERT(GALAlookup(ga) == NULL);
1105
1106     if ((i = freeMsgIndex[pe]) + 2 >= RTSflags.ParFlags.packBufferSize) {
1107 #ifdef FREE_DEBUG
1108         fprintf(stderr, "Filled a free message buffer\n");      
1109 #endif
1110         sendFree(ga->loc.gc.gtid, i, freeMsgBuffer[pe]);
1111         i = 0;
1112     }
1113     freeMsgBuffer[pe][i++] = (W_) ga->weight;
1114     freeMsgBuffer[pe][i++] = (W_) ga->loc.gc.slot;
1115     freeMsgIndex[pe] = i;
1116 #ifdef DEBUG
1117     ga->weight = 0x0f0f0f0f;
1118     ga->loc.gc.gtid = 0x666;
1119     ga->loc.gc.slot = 0xdeaddead;
1120 #endif
1121 }
1122
1123 void
1124 sendFreeMessages(STG_NO_ARGS)
1125 {
1126     int i;
1127
1128     for (i = 0; i < nPEs; i++) {
1129         if (freeMsgIndex[i] > 0)
1130             sendFree(PEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
1131     }
1132 }
1133
1134 #endif /* PAR -- whole file */
1135 \end{code}