[project @ 1998-11-26 09:17:22 by sof]
[ghc-hetmet.git] / ghc / runtime / gum / HLComms.lc
1 /****************************************************************
2 *                                                               *
3 *       High Level Communications Routines (HLComms.lc)         *
4 *                                                               *
5 *  Contains the high-level routines (i.e. communication         *
6 *  subsystem independent) used by GUM                           *
7 *  (c) The Parade/AQUA Projects, Glasgow University, 1995       *
8 *  Phil Trinder, Glasgow University, 12 December 1994           *
9 *                                                               *
10 *****************************************************************/
11 \begin{code}
12 #ifdef PAR /* whole file */
13
14 #ifndef _AIX
15 #define NON_POSIX_SOURCE /* so says Solaris */
16 #endif
17
18 #include "rtsdefs.h"
19 #include "HLC.h"
20 \end{code}
21
22 \section{GUM Message Sending and Unpacking Functions}
23
24
25 @SendFetch@ packs the two global addresses and a load into a message +
26 sends it.  
27
28 \begin{code}
29 static W_ *gumPackBuffer;
30
31 void 
32 InitMoreBuffers(STG_NO_ARGS)
33 {
34     gumPackBuffer
35       = (W_ *) stgMallocWords(RTSflags.ParFlags.packBufferSize, "initMoreBuffers");
36 }
37
38 void
39 sendFetch(rga, lga, load)
40 globalAddr *rga, *lga;
41 int load;
42 {
43     CostCentre Save_CCC = CCC;
44
45     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
46     CCC->scc_count++;
47
48     ASSERT(rga->weight > 0 && lga->weight > 0);
49 #ifdef FETCH_DEBUG    
50     fprintf(stderr, "Sending Fetch (%x, %d, 0), load = %d\n", 
51       rga->loc.gc.gtid, rga->loc.gc.slot, load);
52 #endif
53     SendOpV(PP_FETCH, rga->loc.gc.gtid, 6,
54       (W_) rga->loc.gc.gtid, (W_) rga->loc.gc.slot, 
55       (W_) lga->weight, (W_) lga->loc.gc.gtid, (W_) lga->loc.gc.slot, (W_) load);
56
57     CCC = Save_CCC;
58 }
59 \end{code}
60
61 @unpackFetch@ unpacks a FETCH message into two Global addresses and a load figure.
62
63 \begin{code}
64
65 static void
66 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
67 {
68     long buf[6];
69
70     GetArgs(buf, 6); 
71     lga->weight = 1;
72     lga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[0];
73     lga->loc.gc.slot = (int) buf[1];
74
75     rga->weight = (unsigned) buf[2];
76     rga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[3];
77     rga->loc.gc.slot = (int) buf[4];
78
79     *load = (int) buf[5];
80
81     ASSERT(rga->weight > 0);
82 }
83 \end{code}
84
85 @SendResume@ packs the remote blocking queue's GA and data into a message 
86 and sends it.
87
88 \begin{code}
89 void
90 sendResume(rga, nelem, data)
91 globalAddr *rga;
92 int nelem;
93 P_ data;
94 {
95     CostCentre Save_CCC = CCC;
96
97     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
98     CCC->scc_count++;
99
100 #ifdef RESUME_DEBUG
101     PrintPacket(data);
102     fprintf(stderr, "Sending Resume for (%x, %d, %x)\n", 
103       rga->loc.gc.gtid, rga->loc.gc.slot, rga->weight);
104 #endif
105
106     SendOpNV(PP_RESUME, rga->loc.gc.gtid, nelem, data, 2,
107       (W_) rga->weight, (W_) rga->loc.gc.slot);
108
109     CCC = Save_CCC;
110 }
111 \end{code}
112
113 @blockFetch@ blocks a @BlockedFetch@ node on some kind of black hole.
114
115 \begin{code}
116 static void
117 blockFetch(P_ bf, P_ bh)
118 {
119     switch (INFO_TYPE(INFO_PTR(bh))) {
120     case INFO_BH_TYPE:
121         BF_LINK(bf) = PrelBase_Z91Z93_closure;
122         SET_INFO_PTR(bh, BQ_info);
123         BQ_ENTRIES(bh) = (W_) bf;
124
125 #ifdef GC_MUT_REQUIRED
126         /*
127          * If we modify a black hole in the old generation, we have to
128          * make sure it goes on the mutables list
129          */
130
131         if (bh <= StorageMgrInfo.OldLim) {
132             MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
133             StorageMgrInfo.OldMutables = bh;
134         } else
135             MUT_LINK(bh) = MUT_NOT_LINKED;
136 #endif
137         break;
138     case INFO_BQ_TYPE:
139         BF_LINK(bf) = (P_) BQ_ENTRIES(bh);
140         BQ_ENTRIES(bh) = (W_) bf;
141         break;
142     case INFO_FMBQ_TYPE:
143         BF_LINK(bf) = (P_) FMBQ_ENTRIES(bh);
144         FMBQ_ENTRIES(bh) = (W_) bf;
145         break;
146     case INFO_SPEC_RBH_TYPE:
147         BF_LINK(bf) = (P_) SPEC_RBH_BQ(bh);
148         SPEC_RBH_BQ(bh) = (W_) bf;
149         break;
150     case INFO_GEN_RBH_TYPE:
151         BF_LINK(bf) = (P_) GEN_RBH_BQ(bh);
152         GEN_RBH_BQ(bh) = (W_) bf;
153         break;
154     default:
155         fprintf(stderr, "Panic: thought %#lx was a black hole (IP %#lx)\n",
156           (W_) bh, INFO_PTR(bh));
157         EXIT(EXIT_FAILURE);
158     }
159 }
160 \end{code}
161
162 @processFetches@ constructs and sends resume messages for every
163 @BlockedFetch@ which is ready to be awakened.
164
165 \begin{code}
166 extern P_ PendingFetches;
167
168 void
169 processFetches()
170 {
171     P_ bf;
172     P_ next;
173     P_ closure;
174     P_ ip;
175     globalAddr rga;
176     
177     for (bf = PendingFetches; bf != PrelBase_Z91Z93_closure; bf = next) {
178         next = BF_LINK(bf);
179
180         /*
181          * Find the target at the end of the indirection chain, and
182          * process it in much the same fashion as the original target
183          * of the fetch.  Though we hope to find graph here, we could
184          * find a black hole (of any flavor) or even a FetchMe.
185          */
186         closure = BF_NODE(bf);
187         while (IS_INDIRECTION(INFO_PTR(closure)))
188             closure = (P_) IND_CLOSURE_PTR(closure);
189         ip = (P_) INFO_PTR(closure);
190
191         if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
192             /* Forward the Fetch to someone else */
193             rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
194             rga.loc.gc.slot = (int) BF_SLOT(bf);
195             rga.weight = (unsigned) BF_WEIGHT(bf);
196
197             sendFetch(FETCHME_GA(closure), &rga, 0 /* load */);
198         } else if (IS_BLACK_HOLE(ip)) {
199             BF_NODE(bf) = closure;
200             blockFetch(bf, closure);
201         } else {
202             /* We now have some local graph to send back */
203             W_ size;
204             P_ graph;
205
206             if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
207                 PendingFetches = bf;
208                 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
209                 SAVE_Hp -= PACK_HEAP_REQUIRED;
210                 bf = PendingFetches;
211                 next = BF_LINK(bf);
212                 closure = BF_NODE(bf);
213                 graph = PackNearbyGraph(closure, &size);
214                 ASSERT(graph != NULL);
215             }
216             rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
217             rga.loc.gc.slot = (int) BF_SLOT(bf);
218             rga.weight = (unsigned) BF_WEIGHT(bf);
219
220             sendResume(&rga, size, graph);
221         }
222     }
223     PendingFetches = PrelBase_Z91Z93_closure;
224 }
225
226 \end{code}
227
228 @unpackResume@ unpacks a Resume message into two Global addresses and a data array.
229
230 \begin{code}
231
232 static void
233 unpackResume(globalAddr *lga, int *nelem, W_ *data)
234 {
235     long buf[3];
236
237     GetArgs(buf, 3); 
238     lga->weight = (unsigned) buf[0];
239     lga->loc.gc.gtid = mytid;
240     lga->loc.gc.slot = (int) buf[1];
241
242     *nelem = (int) buf[2];
243     GetArgs(data, *nelem);
244 }
245 \end{code}
246
247 @SendAck@ packs the global address being acknowledged, together with
248 an array of global addresses for any closures shipped and sends them.
249 \begin{code}
250
251 void
252 sendAck(task, ngas, gagamap)
253 GLOBAL_TASK_ID task;
254 int ngas;
255 globalAddr *gagamap;
256 {
257     static long *buffer;
258     long *p;
259     int i;
260     CostCentre Save_CCC = CCC;
261
262     buffer = (long *) gumPackBuffer;
263
264     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
265     CCC->scc_count++;
266
267     for(i = 0, p = buffer; i < ngas; i++, p += 6) {
268         ASSERT(gagamap[1].weight > 0);
269         p[0] = (long) gagamap->weight;
270         p[1] = (long) gagamap->loc.gc.gtid;
271         p[2] = (long) gagamap->loc.gc.slot;
272         gagamap++;
273         p[3] = (long) gagamap->weight;
274         p[4] = (long) gagamap->loc.gc.gtid;
275         p[5] = (long) gagamap->loc.gc.slot;
276         gagamap++;
277     }
278 #ifdef ACK_DEBUG    
279     fprintf(stderr,"Sending Ack (%d pairs) to %x\n", ngas, task);
280 #endif
281     SendOpN(PP_ACK, task, p - buffer, buffer);
282
283     CCC = Save_CCC;
284 }
285 \end{code}
286
287 @unpackAck@ unpacks an Acknowledgement message into a Global address,
288 a count of the number of global addresses following and a map of 
289 Global addresses
290
291 \begin{code}
292
293 static void
294 unpackAck(int *ngas, globalAddr *gagamap)
295 {
296     long GAarraysize;
297     long buf[6];
298
299     GetArgs(&GAarraysize, 1);
300
301     *ngas = GAarraysize / 6;
302
303     while (GAarraysize > 0) {
304         GetArgs(buf, 6);
305         gagamap->weight = (unsigned) buf[0];
306         gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[1];
307         gagamap->loc.gc.slot = (int) buf[2];
308         gagamap++;
309         gagamap->weight = (unsigned) buf[3];
310         gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[4];
311         gagamap->loc.gc.slot = (int) buf[5];
312         ASSERT(gagamap->weight > 0);
313         gagamap++;
314         GAarraysize -= 6;
315     }
316 }
317 \end{code}
318
319 @SendFish@ packs the global address being acknowledged, together with
320 an array of global addresses for any closures shipped and sends them.
321 \begin{code}
322
323 void
324 sendFish(destPE, origPE, age, history, hunger)
325 GLOBAL_TASK_ID destPE, origPE;
326 int age, history, hunger;
327 {
328     CostCentre Save_CCC = CCC;
329
330     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
331     CCC->scc_count++;
332
333 #ifdef FISH_DEBUG
334     fprintf(stderr,"Sending Fish to %lx\n", destPE);
335 #endif
336     SendOpV(PP_FISH, destPE, 4, (W_) origPE, (W_) age, (W_) history, (W_) hunger);
337     if (origPE == mytid)
338         fishing = rtsTrue;
339
340     CCC = Save_CCC;
341 }
342 \end{code}
343
344 @unpackFish@ unpacks a FISH message into the global task id of the
345 originating PE and 3 data fields: the age, history and hunger of the
346 fish. The history + hunger are not currently used.
347
348 \begin{code}
349
350 static void
351 unpackFish(GLOBAL_TASK_ID *origPE, int *age, int *history, int *hunger)
352 {
353     long buf[4];
354
355     GetArgs(buf, 4);
356
357     *origPE = (GLOBAL_TASK_ID) buf[0];
358     *age = (int) buf[1];
359     *history = (int) buf[2];
360     *hunger = (int) buf[3];
361 }
362 \end{code}
363
364 @SendFree@ sends (weight, slot) pairs for GAs that we no longer need references
365 to.
366
367 \begin{code}
368 void
369 sendFree(pe, nelem, data)
370 GLOBAL_TASK_ID pe;
371 int nelem;
372 P_ data;
373 {
374     CostCentre Save_CCC = CCC;
375
376     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
377     CCC->scc_count++;
378
379 #ifdef FREE_DEBUG
380     fprintf(stderr, "Sending Free (%d GAs) to %x\n", nelem / 2, pe);
381 #endif
382     SendOpN(PP_FREE, pe, nelem, data);
383
384     CCC = Save_CCC;
385 }
386
387 \end{code}
388
389 @unpackFree@ unpacks a FREE message into the amount of data shipped and
390 a data block.
391
392 \begin{code}
393
394 static void
395 unpackFree(int *nelem, W_ *data)
396 {
397     long buf[1];
398
399     GetArgs(buf, 1);
400     *nelem = (int) buf[0];
401     GetArgs(data, *nelem);
402 }
403 \end{code}
404
405 @SendSchedule@ sends a closure to be evaluated in response to a Fish
406 message. The message is directed to the PE that originated the Fish
407 (origPE), and includes the packed closure (data) along with its size
408 (nelem).
409
410 \begin{code}
411
412 void
413 sendSchedule(origPE, nelem, data)
414 GLOBAL_TASK_ID origPE;
415 int nelem;
416 P_ data;
417 {
418
419     CostCentre Save_CCC = CCC;
420
421     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
422     CCC->scc_count++;
423
424 #ifdef SCHEDULE_DEBUG
425     PrintPacket(data);
426     fprintf(stderr, "Sending Schedule to %x\n", origPE);
427 #endif
428
429     SendOpN(PP_SCHEDULE, origPE, nelem, data);
430
431     CCC = Save_CCC;
432 }
433 \end{code}
434
435 @unpackSchedule@ unpacks a SCHEDULE message into the Global address of
436 the closure shipped, the amount of data shipped (nelem) and the data
437 block (data).
438
439 \begin{code}
440
441 static void
442 unpackSchedule(int *nelem, W_ *data)
443 {
444     long buf[1];
445
446     GetArgs(buf, 1);
447     *nelem = (int) buf[0];
448     GetArgs(data, *nelem);
449 }
450 \end{code}
451
452 \section{Message-Processing Functions}
453
454 The following routines process incoming GUM messages. Often reissuing
455 messages in response.
456
457 @processFish@ unpacks a fish message, reissuing it if it's our own,
458 sending work if we have it or sending it onwards otherwise.
459
460 \begin{code}
461 static void
462 processFish(STG_NO_ARGS)
463 {
464     GLOBAL_TASK_ID origPE;
465     int age, history, hunger;
466
467     unpackFish(&origPE, &age, &history, &hunger);
468
469     if (origPE == mytid) {
470         fishing = rtsFalse;
471     } else {
472         P_ spark;
473
474         while ((spark = FindLocalSpark(rtsTrue)) != NULL) {
475             W_ size;
476             P_ graph;
477
478             if ((graph = PackNearbyGraph(spark, &size)) == NULL) {
479                 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
480                 SAVE_Hp -= PACK_HEAP_REQUIRED;
481                 /* Now go back and try again */
482             } else {
483                 sendSchedule(origPE, size, graph);
484                 DisposeSpark(spark);
485                 break;
486             }
487         }
488         if (spark == NULL) {
489             /* We have no sparks to give */
490             if (age < FISH_LIFE_EXPECTANCY)
491                 sendFish(choosePE(), origPE,
492                   (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
493
494             /* Send it home to die */
495             else
496                 sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
497         }
498     }
499 }                               /* processFish */
500 \end{code}
501
502 @processFetch@ either returns the requested data (if available) 
503 or blocks the remote blocking queue on a black hole (if not).
504
505 \begin{code}
506 static void
507 processFetch(STG_NO_ARGS)
508 {
509     globalAddr ga, rga;
510     int load;
511
512     P_ closure;
513     P_ ip;
514
515     unpackFetch(&ga, &rga, &load);
516 #ifdef FETCH_DEBUG
517     fprintf(stderr, "Rcvd Fetch for (%x, %d, 0), Resume (%x, %d, %x) (load %d) \n",
518       ga.loc.gc.gtid, ga.loc.gc.slot,
519       rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, load);
520 #endif
521
522     closure = GALAlookup(&ga);
523     ip = (P_) INFO_PTR(closure);
524
525     if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
526         /* Forward the Fetch to someone else */
527         sendFetch(FETCHME_GA(closure), &rga, load);
528     } else if (rga.loc.gc.gtid == mytid) {
529         /* Our own FETCH forwarded back around to us */
530         P_ fmbq = GALAlookup(&rga);
531
532         /* We may have already discovered that the fetch target is our own. */
533         if (fmbq != closure) 
534             CommonUp(fmbq, closure);
535         (void) addWeight(&rga);
536     } else if (IS_BLACK_HOLE(ip)) {
537         /* This includes RBH's and FMBQ's */
538         P_ bf;
539
540         if ((bf = AllocateHeap(FIXED_HS + BF_CLOSURE_SIZE(dummy))) == NULL) {
541             ReallyPerformThreadGC(FIXED_HS + BF_CLOSURE_SIZE(dummy), rtsFalse);
542             closure = GALAlookup(&ga);
543             bf = SAVE_Hp - (FIXED_HS + BF_CLOSURE_SIZE(dummy)) + 1;
544         }
545         ASSERT(GALAlookup(&rga) == NULL);
546
547         SET_BF_HDR(bf, BF_info, bogosity);
548         BF_NODE(bf) = closure;
549         BF_GTID(bf) = (W_) rga.loc.gc.gtid;
550         BF_SLOT(bf) = (W_) rga.loc.gc.slot;
551         BF_WEIGHT(bf) = (W_) rga.weight;
552         blockFetch(bf, closure);
553
554 #ifdef FETCH_DEBUG
555         fprintf(stderr, "Blocking Fetch (%x, %d, %x) on %#lx\n",
556           rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, closure);
557 #endif
558
559     } else {                    
560         /* The target of the FetchMe is some local graph */
561         W_ size;
562         P_ graph;
563
564         if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
565             ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
566             SAVE_Hp -= PACK_HEAP_REQUIRED;
567             closure = GALAlookup(&ga);
568             graph = PackNearbyGraph(closure, &size);
569             ASSERT(graph != NULL);
570         }
571         sendResume(&rga, size, graph);
572     }
573 }
574 \end{code}
575
576 @processFree@ unpacks a FREE message and adds the weights to our GAs.
577
578 \begin{code}
579 static void
580 processFree(STG_NO_ARGS)
581 {
582     int nelem;
583     static W_ *freeBuffer;
584     int i;
585     globalAddr ga;
586
587     freeBuffer = gumPackBuffer;
588     unpackFree(&nelem, freeBuffer);
589 #ifdef FREE_DEBUG
590     fprintf(stderr, "Rcvd Free (%d GAs)\n", nelem / 2);
591 #endif
592     ga.loc.gc.gtid = mytid;
593     for (i = 0; i < nelem;) {
594         ga.weight = (unsigned) freeBuffer[i++];
595         ga.loc.gc.slot = (int) freeBuffer[i++];
596 #ifdef FREE_DEBUG
597         fprintf(stderr,"Processing free (%x, %d, %x)\n", ga.loc.gc.gtid, 
598           ga.loc.gc.slot, ga.weight);
599 #endif
600         (void) addWeight(&ga);
601     }
602 }
603 \end{code}
604
605 @processResume@ unpacks a RESUME message into the graph, filling in
606 the LA -> GA, and GA -> LA tables. Threads blocked on the original
607 @FetchMe@ (now a blocking queue) are awakened, and the blocking queue
608 is converted into an indirection.  Finally it sends an ACK in response
609 which contains any newly allocated GAs.
610
611 \begin{code}
612
613 static void
614 processResume(GLOBAL_TASK_ID sender)
615 {
616     int nelem;
617     W_ nGAs;
618     static W_ *packBuffer;
619     P_ newGraph;
620     P_ old;
621     globalAddr lga;
622     globalAddr *gagamap;
623
624     packBuffer = gumPackBuffer;
625     unpackResume(&lga, &nelem, packBuffer);
626
627 #ifdef RESUME_DEBUG
628     fprintf(stderr, "Rcvd Resume for (%x, %d, %x)\n",
629       lga.loc.gc.gtid, lga.loc.gc.slot, lga.weight);
630     PrintPacket(packBuffer);
631 #endif
632
633     /* 
634      * We always unpack the incoming graph, even if we've received the
635      * requested node in some other data packet (and already awakened
636      * the blocking queue).
637      */
638     if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
639         ReallyPerformThreadGC(packBuffer[0], rtsFalse);
640         SAVE_Hp -= packBuffer[0];
641     }
642
643     /* Do this *after* GC; we don't want to release the object early! */
644
645     if (lga.weight > 0)
646         (void) addWeight(&lga);
647
648     old = GALAlookup(&lga);
649
650     if (RTSflags.ParFlags.granSimStats) {
651         P_ tso = NULL;
652
653         if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE) {
654             for(tso = (P_) FMBQ_ENTRIES(old); 
655               TSO_LINK(tso) != PrelBase_Z91Z93_closure; 
656               tso = TSO_LINK(tso))
657                 ;
658         }
659         /* DumpGranEventAndNode(GR_REPLY, tso, old, taskIDtoPE(sender)); */
660         DumpRawGranEvent(CURRENT_PROC,taskIDtoPE(sender),GR_REPLY,
661                          tso,old,0);
662     }
663
664     newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
665     ASSERT(newGraph != NULL);
666
667     /* 
668      * Sometimes, unpacking will common up the resumee with the
669      * incoming graph, but if it hasn't, we'd better do so now.
670      */
671    
672     if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE)
673         CommonUp(old, newGraph);
674
675 #ifdef RESUME_DEBUG
676     DebugPrintGAGAMap(gagamap, nGAs);
677 #endif
678
679     sendAck(sender, nGAs, gagamap);
680 }
681 \end{code}
682
683 @processSchedule@ unpacks a SCHEDULE message into the graph, filling
684 in the LA -> GA, and GA -> LA tables. The root of the graph is added to
685 the local spark queue.  Finally it sends an ACK in response
686 which contains any newly allocated GAs.
687
688 \begin{code}
689 static void
690 processSchedule(GLOBAL_TASK_ID sender)
691 {
692     int nelem;
693     int space_required;
694     rtsBool success;
695     static W_ *packBuffer;
696     W_ nGAs;
697     P_ newGraph;
698     globalAddr *gagamap;
699
700     packBuffer = gumPackBuffer;         /* HWL */
701     unpackSchedule(&nelem, packBuffer);
702
703 #ifdef SCHEDULE_DEBUG
704     fprintf(stderr, "Rcvd Schedule\n");
705     PrintPacket(packBuffer);
706 #endif
707
708     /*
709      * For now, the graph is a closure to be sparked as an advisory
710      * spark, but in future it may be a complete spark with
711      * required/advisory status, priority etc.
712      */
713
714     space_required = packBuffer[0];
715     if (SAVE_Hp + space_required >= SAVE_HpLim) {
716         ReallyPerformThreadGC(space_required, rtsFalse);
717         SAVE_Hp -= space_required;
718     }
719     newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
720     ASSERT(newGraph != NULL);
721     success = Spark(newGraph, rtsFalse);
722     ASSERT(success);
723
724 #ifdef SCHEDULE_DEBUG
725     DebugPrintGAGAMap(gagamap, nGAs);
726 #endif
727
728     if (nGAs > 0)
729         sendAck(sender, nGAs, gagamap);
730
731     fishing = rtsFalse;
732 }
733 \end{code}
734
735 @processAck@ unpacks an ACK, and uses the GAGA map to convert RBH's
736 (which represent shared thunks that have been shipped) into fetch-mes
737 to remote GAs.
738
739 \begin{code}
740 static void
741 processAck(STG_NO_ARGS)
742 {
743     int nGAs;
744     globalAddr *gaga;
745
746     globalAddr gagamap[MAX_GAS * 2];
747
748     unpackAck(&nGAs, gagamap);
749
750 #ifdef ACK_DEBUG
751     fprintf(stderr, "Rcvd Ack (%d pairs)\n", nGAs);
752     DebugPrintGAGAMap(gagamap, nGAs);
753 #endif
754
755     /*
756      * For each (oldGA, newGA) pair, set the GA of the corresponding
757      * thunk to the newGA, convert the thunk to a FetchMe, and return
758      * the weight from the oldGA.
759      */
760     for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
761         P_ old = GALAlookup(gaga);
762         P_ new = GALAlookup(gaga + 1);
763
764         if (new == NULL) {
765             /* We don't have this closure, so we make a fetchme for it */
766             globalAddr *ga = setRemoteGA(old, gaga + 1, rtsTrue);
767
768             convertToFetchMe(old, ga);
769         } else {
770             /* 
771              * Oops...we've got this one already; update the RBH to
772              * point to the object we already know about, whatever it
773              * happens to be.
774              */
775             CommonUp(old, new);
776
777             /* 
778              * Increase the weight of the object by the amount just
779              * received in the second part of the ACK pair.
780              */
781             (void) addWeight(gaga + 1);
782         }
783         (void) addWeight(gaga);
784     }
785 }
786 \end{code}
787
788 \section{GUM Message Processor}
789
790 @processMessages@ processes any messages that have arrived, calling
791 appropriate routines depending on the message tag
792 (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
793 present and performs a blocking receive! During profiling it
794 busy-waits in order to record idle time.
795
796 \begin{code}
797 void
798 processMessages(STG_NO_ARGS)
799 {
800     PACKET packet;
801     OPCODE opcode;
802     CostCentre Save_CCC;
803
804     /* Temporary Test Definitions */
805     GLOBAL_TASK_ID task;
806
807     Save_CCC = CCC;
808     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
809     
810     do {
811         if (RTSflags.CcFlags.doCostCentres) {
812             CCC = (CostCentre)STATIC_CC_REF(CC_IDLE);
813             CCC->scc_count++;
814
815             while (!PacketsWaiting())
816                 /*busy-wait loop*/;
817
818             CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
819         }
820
821         packet = GetPacket();   /* Get next message; block until one available */
822         CCC->scc_count++;
823
824         get_opcode_and_sender(packet, &opcode, &task);
825
826         switch (opcode) {
827
828         case PP_FINISH:
829             EXIT(EXIT_SUCCESS); /* The computation has been completed by someone
830                                  * else */
831             break;
832
833         case PP_FETCH:
834             processFetch();
835             break;
836
837         case PP_RESUME:
838             processResume(task);
839             break;
840
841         case PP_ACK:
842             processAck();
843             break;
844
845         case PP_FISH:
846             processFish();
847             break;
848
849         case PP_FREE:
850             processFree();
851             break;
852
853         case PP_SCHEDULE:
854             processSchedule(task);
855             break;
856
857         default:
858             /* Anything we're not prepared to deal with. */
859             fprintf(stderr, "Task %x: Unexpected opcode %x from %x\n",
860               mytid, opcode, task);
861
862             EXIT(EXIT_FAILURE);
863         }                       /* switch */
864
865     } while (PacketsWaiting()); /* While there are messages: process them */
866     CCC = Save_CCC;
867 }                               /* processMessages */
868 \end{code}
869
870 \section{Exception Handlers}
871
872
873 @Comms_Harness_Exception@ is an exception handler that tests out the different 
874 GUM messages. 
875
876 \begin{code}
877 void
878 Comms_Harness_Exception(packet)
879 PACKET packet;
880 {
881     int i, load;
882     globalAddr ga,bqga;
883 /*  GLOBAL_TASK_ID sender = Sender_Task(packet); */
884     OPCODE opcode = Opcode(packet);
885     GLOBAL_TASK_ID task;
886     
887 /*    fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender); */
888
889     switch (opcode) {
890
891     case PP_FINISH:
892         EXIT(EXIT_SUCCESS);
893         break;
894
895     case PP_FETCH:
896         {
897             W_ data[11];
898             get_opcode_and_sender(packet,&opcode,&task);
899             fprintf(stderr,"Task %x: Got Fetch from %x\n", mytid, task );
900             unpackFetch(&ga,&bqga,&load);
901             fprintf(stderr,"In PE, Fetch = (%x, %d, %x) (%x, %d, %x) %d \n",
902                             ga.loc.gc.gtid, ga.loc.gc.slot, ga.weight, 
903                             bqga.loc.gc.gtid, bqga.loc.gc.slot, bqga.weight, load);
904             /*Send Resume in Response*/
905             for (i=0; i <= 10; ++i) data[i] = i;
906             sendResume(&bqga,11,data);      
907         }
908         break;
909
910     case PP_ACK:
911         {
912             int nGAs;
913             globalAddr gagamap[MAX_GAS*2];
914
915             get_opcode_and_sender(packet,&opcode,&task);
916             fprintf(stderr,"Task %x: Got Ack from %x\n", mytid, task );
917             unpackAck(&nGAs,gagamap);
918 #ifdef DEBUG
919             DebugPrintGAGAMap(gagamap,nGAs);
920 #endif
921         }
922         break;
923
924     case PP_FISH:
925         {
926             GLOBAL_TASK_ID origPE;
927             int age, history, hunger;
928             globalAddr testGA;
929             StgWord testData[6];
930
931             get_opcode_and_sender(packet,&opcode,&task);
932             fprintf(stderr,"Task %x: Got FISH from %x\n", mytid, task );
933             unpackFish(&origPE, &age, &history, &hunger);
934             fprintf(stderr,"In PE, FISH.origPE = %x age = %d history = %d hunger = %d\n",
935                             origPE, age, history, hunger);
936
937             testGA.loc.gc.gtid = mytid; testGA.loc.gc.slot = 52; testGA.weight = 1024;
938             for (i=0; i <= 5; ++i) testData[i] = 40+i;
939             sendSchedule(origPE,6,testData);        
940         }
941         break;
942
943     case PP_SCHEDULE:
944         {                               /* Test variables */
945             int nelem;
946             int testData[6];
947
948             get_opcode_and_sender(packet,&opcode,&task);
949             fprintf(stderr,"Task %x: Got SCHEDULE from %x\n", mytid, task );
950             unpackSchedule(&nelem, &testData);
951             fprintf(stderr,"In PE, nelem = %d \n", nelem);
952             for (i=0; i <= 5; ++i) fprintf(stderr,"tData[%d] = %d ",i,testData[i]);
953             fprintf(stderr,"\n");
954         }
955         break;
956
957       /* Anything we're not prepared to deal with.  Note that ALL
958        * opcodes are discarded during termination -- this helps
959        * prevent bizarre race conditions.
960        */
961       default:
962         if (!GlobalStopPending) 
963           {
964             GLOBAL_TASK_ID ErrorTask;
965             int opcode;
966
967             get_opcode_and_sender(packet,&opcode,&ErrorTask);
968             fprintf(stderr,"Task %x: Unexpected opcode %x from %x in Comms Harness\n",
969                     mytid, opcode, ErrorTask );
970             
971             PEShutDown();
972             
973             EXIT(EXIT_FAILURE);
974           }
975     }
976 }
977 \end{code}
978
979 @STG_Exception@ handles real communication exceptions
980
981 \begin{code}
982 void
983 STG_Exception(packet)
984 PACKET packet;
985 {
986     GLOBAL_TASK_ID sender = Sender_Task(packet); 
987     OPCODE opcode = Opcode(packet);
988 #if 0    
989     fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender); 
990 #endif
991     switch (opcode) {
992
993     case PP_FINISH:
994         EXIT(EXIT_SUCCESS);
995         break;
996
997       /* Anything we're not prepared to deal with.  Note that ALL opcodes are discarded
998          during termination -- this helps prevent bizarre race conditions.
999       */
1000       default:
1001         if (!GlobalStopPending) 
1002           {
1003             GLOBAL_TASK_ID ErrorTask;
1004             int opcode;
1005
1006             get_opcode_and_sender(packet,&opcode,&ErrorTask);
1007             fprintf(stderr,"Task %x: Unexpected opcode %x from %x in STG_Exception\n",
1008                     mytid, opcode, ErrorTask );
1009             
1010             EXIT(EXIT_FAILURE);
1011           }
1012     }
1013 }
1014 \end{code}
1015
1016 \section{Miscellaneous Functions}
1017
1018 @ChoosePE@ selects a GlobalTaskId from the array of PEs 'at random'.
1019 Important properties:
1020 o it varies during execution, even if the PE is idle
1021 o it's different for each PE
1022 o we never send a fish to ourselves
1023
1024 \begin{code}
1025 extern long lrand48 (STG_NO_ARGS);
1026
1027 GLOBAL_TASK_ID
1028 choosePE(STG_NO_ARGS)
1029 {
1030     long temp;
1031
1032     temp = lrand48() % nPEs;
1033     if (PEs[temp] == mytid) {   /* Never send a FISH to yourself */
1034         temp = (temp + 1) % nPEs;
1035     }
1036     return PEs[temp];
1037 }
1038 \end{code}
1039
1040 @WaitForTermination@ enters an infinite loop waiting for the
1041 termination sequence to be completed.
1042
1043 \begin{code}
1044 void
1045 WaitForTermination(STG_NO_ARGS)
1046 {
1047   do {
1048     PACKET p = GetPacket();
1049     HandleException(p);
1050   } while (rtsTrue);
1051 }
1052 \end{code}
1053
1054 \begin{code}
1055 #ifdef DEBUG
1056 void
1057 DebugPrintGAGAMap(gagamap, nGAs)
1058 globalAddr *gagamap;
1059 int nGAs;
1060 {
1061     int i;
1062
1063     for (i = 0; i < nGAs; ++i, gagamap += 2)
1064         fprintf(stderr, "gagamap[%d] = (%x, %d, %x) -> (%x, %d, %x)\n", i,
1065           gagamap[0].loc.gc.gtid, gagamap[0].loc.gc.slot, gagamap[0].weight,
1066           gagamap[1].loc.gc.gtid, gagamap[1].loc.gc.slot, gagamap[1].weight);
1067 }
1068 #endif
1069 \end{code}
1070
1071 \begin{code}
1072
1073 static PP_ freeMsgBuffer = NULL;
1074 static int *freeMsgIndex = NULL;
1075
1076 void
1077 prepareFreeMsgBuffers(STG_NO_ARGS)
1078 {
1079     int i;
1080
1081     /* Allocate the freeMsg buffers just once and then hang onto them. */
1082
1083     if (freeMsgIndex == NULL) {
1084
1085         freeMsgIndex = (int *) stgMallocBytes(nPEs * sizeof(int), "prepareFreeMsgBuffers (Index)");
1086         freeMsgBuffer = (PP_)  stgMallocBytes(nPEs * sizeof(long *), "prepareFreeMsgBuffers (Buffer)");
1087
1088         for(i = 0; i < nPEs; i++) {
1089             if (i != thisPE) {
1090               freeMsgBuffer[i] = (P_) stgMallocWords(RTSflags.ParFlags.packBufferSize,
1091                                         "prepareFreeMsgBuffers (Buffer #i)");
1092             }
1093         }
1094     }
1095
1096     /* Initialize the freeMsg buffer pointers to point to the start of their buffers */
1097     for (i = 0; i < nPEs; i++)
1098         freeMsgIndex[i] = 0;
1099 }
1100
1101 void
1102 freeRemoteGA(pe, ga)
1103 int pe;
1104 globalAddr *ga;
1105 {
1106     int i;
1107
1108     ASSERT(GALAlookup(ga) == NULL);
1109
1110     if ((i = freeMsgIndex[pe]) + 2 >= RTSflags.ParFlags.packBufferSize) {
1111 #ifdef FREE_DEBUG
1112         fprintf(stderr, "Filled a free message buffer\n");      
1113 #endif
1114         sendFree(ga->loc.gc.gtid, i, freeMsgBuffer[pe]);
1115         i = 0;
1116     }
1117     freeMsgBuffer[pe][i++] = (W_) ga->weight;
1118     freeMsgBuffer[pe][i++] = (W_) ga->loc.gc.slot;
1119     freeMsgIndex[pe] = i;
1120 #ifdef DEBUG
1121     ga->weight = 0x0f0f0f0f;
1122     ga->loc.gc.gtid = 0x666;
1123     ga->loc.gc.slot = 0xdeaddead;
1124 #endif
1125 }
1126
1127 void
1128 sendFreeMessages(STG_NO_ARGS)
1129 {
1130     int i;
1131
1132     for (i = 0; i < nPEs; i++) {
1133         if (freeMsgIndex[i] > 0)
1134             sendFree(PEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
1135     }
1136 }
1137
1138 #endif /* PAR -- whole file */
1139 \end{code}