[project @ 1996-06-27 16:13:29 by partain]
[ghc-hetmet.git] / ghc / runtime / gum / HLComms.lc
1 /****************************************************************
2 *                                                               *
3 *       High Level Communications Routines (HLComms.lc)         *
4 *                                                               *
5 *  Contains the high-level routines (i.e. communication         *
6 *  subsystem independent) used by GUM                           *
7 *  (c) The Parade/AQUA Projects, Glasgow University, 1995       *
8 *  Phil Trinder, Glasgow University, 12 December 1994           *
9 *                                                               *
10 *****************************************************************/
11 \begin{code}
12 #ifdef PAR /* whole file */
13
14 #define NON_POSIX_SOURCE /* so says Solaris */
15
16 #include "rtsdefs.h"
17 #include "HLC.h"
18 \end{code}
19
20 \section{GUM Message Sending and Unpacking Functions}
21
22
23 @SendFetch@ packs the two global addresses and a load into a message +
24 sends it.  
25
26 \begin{code}
27 static W_ *gumPackBuffer;
28
29 void 
30 InitMoreBuffers(STG_NO_ARGS)
31 {
32     gumPackBuffer
33       = (W_ *) stgMallocWords(RTSflags.ParFlags.packBufferSize, "initMoreBuffers");
34 }
35
36 void
37 sendFetch(rga, lga, load)
38 globalAddr *rga, *lga;
39 int load;
40 {
41     CostCentre Save_CCC = CCC;
42
43     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
44     CCC->scc_count++;
45
46     ASSERT(rga->weight > 0 && lga->weight > 0);
47 #ifdef FETCH_DEBUG    
48     fprintf(stderr, "Sending Fetch (%x, %d, 0), load = %d\n", 
49       rga->loc.gc.gtid, rga->loc.gc.slot, load);
50 #endif
51     SendOpV(PP_FETCH, rga->loc.gc.gtid, 6,
52       (W_) rga->loc.gc.gtid, (W_) rga->loc.gc.slot, 
53       (W_) lga->weight, (W_) lga->loc.gc.gtid, (W_) lga->loc.gc.slot, (W_) load);
54
55     CCC = Save_CCC;
56 }
57 \end{code}
58
59 @unpackFetch@ unpacks a FETCH message into two Global addresses and a load figure.
60
61 \begin{code}
62
63 static void
64 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
65 {
66     long buf[6];
67
68     GetArgs(buf, 6); 
69     lga->weight = 1;
70     lga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[0];
71     lga->loc.gc.slot = (int) buf[1];
72
73     rga->weight = (unsigned) buf[2];
74     rga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[3];
75     rga->loc.gc.slot = (int) buf[4];
76
77     *load = (int) buf[5];
78
79     ASSERT(rga->weight > 0);
80 }
81 \end{code}
82
83 @SendResume@ packs the remote blocking queue's GA and data into a message 
84 and sends it.
85
86 \begin{code}
87 void
88 sendResume(rga, nelem, data)
89 globalAddr *rga;
90 int nelem;
91 P_ data;
92 {
93     CostCentre Save_CCC = CCC;
94
95     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
96     CCC->scc_count++;
97
98 #ifdef RESUME_DEBUG
99     PrintPacket(data);
100     fprintf(stderr, "Sending Resume for (%x, %d, %x)\n", 
101       rga->loc.gc.gtid, rga->loc.gc.slot, rga->weight);
102 #endif
103
104     SendOpNV(PP_RESUME, rga->loc.gc.gtid, nelem, data, 2,
105       (W_) rga->weight, (W_) rga->loc.gc.slot);
106
107     CCC = Save_CCC;
108 }
109 \end{code}
110
111 @blockFetch@ blocks a @BlockedFetch@ node on some kind of black hole.
112
113 \begin{code}
114 static void
115 blockFetch(P_ bf, P_ bh)
116 {
117     switch (INFO_TYPE(INFO_PTR(bh))) {
118     case INFO_BH_TYPE:
119         BF_LINK(bf) = Prelude_Z91Z93_closure;
120         SET_INFO_PTR(bh, BQ_info);
121         BQ_ENTRIES(bh) = (W_) bf;
122
123 #ifdef GC_MUT_REQUIRED
124         /*
125          * If we modify a black hole in the old generation, we have to
126          * make sure it goes on the mutables list
127          */
128
129         if (bh <= StorageMgrInfo.OldLim) {
130             MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
131             StorageMgrInfo.OldMutables = bh;
132         } else
133             MUT_LINK(bh) = MUT_NOT_LINKED;
134 #endif
135         break;
136     case INFO_BQ_TYPE:
137         BF_LINK(bf) = (P_) BQ_ENTRIES(bh);
138         BQ_ENTRIES(bh) = (W_) bf;
139         break;
140     case INFO_FMBQ_TYPE:
141         BF_LINK(bf) = (P_) FMBQ_ENTRIES(bh);
142         FMBQ_ENTRIES(bh) = (W_) bf;
143         break;
144     case INFO_SPEC_RBH_TYPE:
145         BF_LINK(bf) = (P_) SPEC_RBH_BQ(bh);
146         SPEC_RBH_BQ(bh) = (W_) bf;
147         break;
148     case INFO_GEN_RBH_TYPE:
149         BF_LINK(bf) = (P_) GEN_RBH_BQ(bh);
150         GEN_RBH_BQ(bh) = (W_) bf;
151         break;
152     default:
153         fprintf(stderr, "Panic: thought %#lx was a black hole (IP %#lx)\n",
154           (W_) bh, INFO_PTR(bh));
155         EXIT(EXIT_FAILURE);
156     }
157 }
158 \end{code}
159
160 @processFetches@ constructs and sends resume messages for every
161 @BlockedFetch@ which is ready to be awakened.
162
163 \begin{code}
164 extern P_ PendingFetches;
165
166 void
167 processFetches()
168 {
169     P_ bf;
170     P_ next;
171     P_ closure;
172     P_ ip;
173     globalAddr rga;
174     
175     for (bf = PendingFetches; bf != Prelude_Z91Z93_closure; bf = next) {
176         next = BF_LINK(bf);
177
178         /*
179          * Find the target at the end of the indirection chain, and
180          * process it in much the same fashion as the original target
181          * of the fetch.  Though we hope to find graph here, we could
182          * find a black hole (of any flavor) or even a FetchMe.
183          */
184         closure = BF_NODE(bf);
185         while (IS_INDIRECTION(INFO_PTR(closure)))
186             closure = (P_) IND_CLOSURE_PTR(closure);
187         ip = (P_) INFO_PTR(closure);
188
189         if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
190             /* Forward the Fetch to someone else */
191             rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
192             rga.loc.gc.slot = (int) BF_SLOT(bf);
193             rga.weight = (unsigned) BF_WEIGHT(bf);
194
195             sendFetch(FETCHME_GA(closure), &rga, 0 /* load */);
196         } else if (IS_BLACK_HOLE(ip)) {
197             BF_NODE(bf) = closure;
198             blockFetch(bf, closure);
199         } else {
200             /* We now have some local graph to send back */
201             W_ size;
202             P_ graph;
203
204             if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
205                 PendingFetches = bf;
206                 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
207                 SAVE_Hp -= PACK_HEAP_REQUIRED;
208                 bf = PendingFetches;
209                 next = BF_LINK(bf);
210                 closure = BF_NODE(bf);
211                 graph = PackNearbyGraph(closure, &size);
212                 ASSERT(graph != NULL);
213             }
214             rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
215             rga.loc.gc.slot = (int) BF_SLOT(bf);
216             rga.weight = (unsigned) BF_WEIGHT(bf);
217
218             sendResume(&rga, size, graph);
219         }
220     }
221     PendingFetches = Prelude_Z91Z93_closure;
222 }
223
224 \end{code}
225
226 @unpackResume@ unpacks a Resume message into two Global addresses and a data array.
227
228 \begin{code}
229
230 static void
231 unpackResume(globalAddr *lga, int *nelem, W_ *data)
232 {
233     long buf[3];
234
235     GetArgs(buf, 3); 
236     lga->weight = (unsigned) buf[0];
237     lga->loc.gc.gtid = mytid;
238     lga->loc.gc.slot = (int) buf[1];
239
240     *nelem = (int) buf[2];
241     GetArgs(data, *nelem);
242 }
243 \end{code}
244
245 @SendAck@ packs the global address being acknowledged, together with
246 an array of global addresses for any closures shipped and sends them.
247 \begin{code}
248
249 void
250 sendAck(task, ngas, gagamap)
251 GLOBAL_TASK_ID task;
252 int ngas;
253 globalAddr *gagamap;
254 {
255     static long *buffer;
256     long *p;
257     int i;
258     CostCentre Save_CCC = CCC;
259
260     buffer = (long *) gumPackBuffer;
261
262     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
263     CCC->scc_count++;
264
265     for(i = 0, p = buffer; i < ngas; i++, p += 6) {
266         ASSERT(gagamap[1].weight > 0);
267         p[0] = (long) gagamap->weight;
268         p[1] = (long) gagamap->loc.gc.gtid;
269         p[2] = (long) gagamap->loc.gc.slot;
270         gagamap++;
271         p[3] = (long) gagamap->weight;
272         p[4] = (long) gagamap->loc.gc.gtid;
273         p[5] = (long) gagamap->loc.gc.slot;
274         gagamap++;
275     }
276 #ifdef ACK_DEBUG    
277     fprintf(stderr,"Sending Ack (%d pairs) to %x\n", ngas, task);
278 #endif
279     SendOpN(PP_ACK, task, p - buffer, buffer);
280
281     CCC = Save_CCC;
282 }
283 \end{code}
284
285 @unpackAck@ unpacks an Acknowledgement message into a Global address,
286 a count of the number of global addresses following and a map of 
287 Global addresses
288
289 \begin{code}
290
291 static void
292 unpackAck(int *ngas, globalAddr *gagamap)
293 {
294     long GAarraysize;
295     long buf[6];
296
297     GetArgs(&GAarraysize, 1);
298
299     *ngas = GAarraysize / 6;
300
301     while (GAarraysize > 0) {
302         GetArgs(buf, 6);
303         gagamap->weight = (unsigned) buf[0];
304         gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[1];
305         gagamap->loc.gc.slot = (int) buf[2];
306         gagamap++;
307         gagamap->weight = (unsigned) buf[3];
308         gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[4];
309         gagamap->loc.gc.slot = (int) buf[5];
310         ASSERT(gagamap->weight > 0);
311         gagamap++;
312         GAarraysize -= 6;
313     }
314 }
315 \end{code}
316
317 @SendFish@ packs the global address being acknowledged, together with
318 an array of global addresses for any closures shipped and sends them.
319 \begin{code}
320
321 void
322 sendFish(destPE, origPE, age, history, hunger)
323 GLOBAL_TASK_ID destPE, origPE;
324 int age, history, hunger;
325 {
326     CostCentre Save_CCC = CCC;
327
328     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
329     CCC->scc_count++;
330
331 #ifdef FISH_DEBUG
332     fprintf(stderr,"Sending Fish to %lx\n", destPE);
333 #endif
334     SendOpV(PP_FISH, destPE, 4, (W_) origPE, (W_) age, (W_) history, (W_) hunger);
335     if (origPE == mytid)
336         fishing = rtsTrue;
337
338     CCC = Save_CCC;
339 }
340 \end{code}
341
342 @unpackFish@ unpacks a FISH message into the global task id of the
343 originating PE and 3 data fields: the age, history and hunger of the
344 fish. The history + hunger are not currently used.
345
346 \begin{code}
347
348 static void
349 unpackFish(GLOBAL_TASK_ID *origPE, int *age, int *history, int *hunger)
350 {
351     long buf[4];
352
353     GetArgs(buf, 4);
354
355     *origPE = (GLOBAL_TASK_ID) buf[0];
356     *age = (int) buf[1];
357     *history = (int) buf[2];
358     *hunger = (int) buf[3];
359 }
360 \end{code}
361
362 @SendFree@ sends (weight, slot) pairs for GAs that we no longer need references
363 to.
364
365 \begin{code}
366 void
367 sendFree(pe, nelem, data)
368 GLOBAL_TASK_ID pe;
369 int nelem;
370 P_ data;
371 {
372     CostCentre Save_CCC = CCC;
373
374     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
375     CCC->scc_count++;
376
377 #ifdef FREE_DEBUG
378     fprintf(stderr, "Sending Free (%d GAs) to %x\n", nelem / 2, pe);
379 #endif
380     SendOpN(PP_FREE, pe, nelem, data);
381
382     CCC = Save_CCC;
383 }
384
385 \end{code}
386
387 @unpackFree@ unpacks a FREE message into the amount of data shipped and
388 a data block.
389
390 \begin{code}
391
392 static void
393 unpackFree(int *nelem, W_ *data)
394 {
395     long buf[1];
396
397     GetArgs(buf, 1);
398     *nelem = (int) buf[0];
399     GetArgs(data, *nelem);
400 }
401 \end{code}
402
403 @SendSchedule@ sends a closure to be evaluated in response to a Fish
404 message. The message is directed to the PE that originated the Fish
405 (origPE), and includes the packed closure (data) along with its size
406 (nelem).
407
408 \begin{code}
409
410 void
411 sendSchedule(origPE, nelem, data)
412 GLOBAL_TASK_ID origPE;
413 int nelem;
414 P_ data;
415 {
416
417     CostCentre Save_CCC = CCC;
418
419     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
420     CCC->scc_count++;
421
422 #ifdef SCHEDULE_DEBUG
423     PrintPacket(data);
424     fprintf(stderr, "Sending Schedule to %x\n", origPE);
425 #endif
426
427     SendOpN(PP_SCHEDULE, origPE, nelem, data);
428
429     CCC = Save_CCC;
430 }
431 \end{code}
432
433 @unpackSchedule@ unpacks a SCHEDULE message into the Global address of
434 the closure shipped, the amount of data shipped (nelem) and the data
435 block (data).
436
437 \begin{code}
438
439 static void
440 unpackSchedule(int *nelem, W_ *data)
441 {
442     long buf[1];
443
444     GetArgs(buf, 1);
445     *nelem = (int) buf[0];
446     GetArgs(data, *nelem);
447 }
448 \end{code}
449
450 \section{Message-Processing Functions}
451
452 The following routines process incoming GUM messages. Often reissuing
453 messages in response.
454
455 @processFish@ unpacks a fish message, reissuing it if it's our own,
456 sending work if we have it or sending it onwards otherwise.
457
458 \begin{code}
459 static void
460 processFish(STG_NO_ARGS)
461 {
462     GLOBAL_TASK_ID origPE;
463     int age, history, hunger;
464
465     unpackFish(&origPE, &age, &history, &hunger);
466
467     if (origPE == mytid) {
468         fishing = rtsFalse;
469     } else {
470         P_ spark;
471
472         while ((spark = FindLocalSpark(rtsTrue)) != NULL) {
473             W_ size;
474             P_ graph;
475
476             if ((graph = PackNearbyGraph(spark, &size)) == NULL) {
477                 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
478                 SAVE_Hp -= PACK_HEAP_REQUIRED;
479                 /* Now go back and try again */
480             } else {
481                 sendSchedule(origPE, size, graph);
482                 DisposeSpark(spark);
483                 break;
484             }
485         }
486         if (spark == NULL) {
487             /* We have no sparks to give */
488             if (age < FISH_LIFE_EXPECTANCY)
489                 sendFish(choosePE(), origPE,
490                   (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
491
492             /* Send it home to die */
493             else
494                 sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
495         }
496     }
497 }                               /* processFish */
498 \end{code}
499
500 @processFetch@ either returns the requested data (if available) 
501 or blocks the remote blocking queue on a black hole (if not).
502
503 \begin{code}
504 static void
505 processFetch(STG_NO_ARGS)
506 {
507     globalAddr ga, rga;
508     int load;
509
510     P_ closure;
511     P_ ip;
512
513     unpackFetch(&ga, &rga, &load);
514 #ifdef FETCH_DEBUG
515     fprintf(stderr, "Rcvd Fetch for (%x, %d, 0), Resume (%x, %d, %x) (load %d) \n",
516       ga.loc.gc.gtid, ga.loc.gc.slot,
517       rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, load);
518 #endif
519
520     closure = GALAlookup(&ga);
521     ip = (P_) INFO_PTR(closure);
522
523     if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
524         /* Forward the Fetch to someone else */
525         sendFetch(FETCHME_GA(closure), &rga, load);
526     } else if (rga.loc.gc.gtid == mytid) {
527         /* Our own FETCH forwarded back around to us */
528         P_ fmbq = GALAlookup(&rga);
529
530         /* We may have already discovered that the fetch target is our own. */
531         if (fmbq != closure) 
532             CommonUp(fmbq, closure);
533         (void) addWeight(&rga);
534     } else if (IS_BLACK_HOLE(ip)) {
535         /* This includes RBH's and FMBQ's */
536         P_ bf;
537
538         if ((bf = AllocateHeap(FIXED_HS + BF_CLOSURE_SIZE(dummy))) == NULL) {
539             ReallyPerformThreadGC(FIXED_HS + BF_CLOSURE_SIZE(dummy), rtsFalse);
540             closure = GALAlookup(&ga);
541             bf = SAVE_Hp - (FIXED_HS + BF_CLOSURE_SIZE(dummy)) + 1;
542         }
543         ASSERT(GALAlookup(&rga) == NULL);
544
545         SET_BF_HDR(bf, BF_info, bogosity);
546         BF_NODE(bf) = closure;
547         BF_GTID(bf) = (W_) rga.loc.gc.gtid;
548         BF_SLOT(bf) = (W_) rga.loc.gc.slot;
549         BF_WEIGHT(bf) = (W_) rga.weight;
550         blockFetch(bf, closure);
551
552 #ifdef FETCH_DEBUG
553         fprintf(stderr, "Blocking Fetch (%x, %d, %x) on %#lx\n",
554           rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, closure);
555 #endif
556
557     } else {                    
558         /* The target of the FetchMe is some local graph */
559         W_ size;
560         P_ graph;
561
562         if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
563             ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
564             SAVE_Hp -= PACK_HEAP_REQUIRED;
565             closure = GALAlookup(&ga);
566             graph = PackNearbyGraph(closure, &size);
567             ASSERT(graph != NULL);
568         }
569         sendResume(&rga, size, graph);
570     }
571 }
572 \end{code}
573
574 @processFree@ unpacks a FREE message and adds the weights to our GAs.
575
576 \begin{code}
577 static void
578 processFree(STG_NO_ARGS)
579 {
580     int nelem;
581     static W_ *freeBuffer;
582     int i;
583     globalAddr ga;
584
585     freeBuffer = gumPackBuffer;
586     unpackFree(&nelem, freeBuffer);
587 #ifdef FREE_DEBUG
588     fprintf(stderr, "Rcvd Free (%d GAs)\n", nelem / 2);
589 #endif
590     ga.loc.gc.gtid = mytid;
591     for (i = 0; i < nelem;) {
592         ga.weight = (unsigned) freeBuffer[i++];
593         ga.loc.gc.slot = (int) freeBuffer[i++];
594 #ifdef FREE_DEBUG
595         fprintf(stderr,"Processing free (%x, %d, %x)\n", ga.loc.gc.gtid, 
596           ga.loc.gc.slot, ga.weight);
597 #endif
598         (void) addWeight(&ga);
599     }
600 }
601 \end{code}
602
603 @processResume@ unpacks a RESUME message into the graph, filling in
604 the LA -> GA, and GA -> LA tables. Threads blocked on the original
605 @FetchMe@ (now a blocking queue) are awakened, and the blocking queue
606 is converted into an indirection.  Finally it sends an ACK in response
607 which contains any newly allocated GAs.
608
609 \begin{code}
610
611 static void
612 processResume(GLOBAL_TASK_ID sender)
613 {
614     int nelem;
615     W_ nGAs;
616     static W_ *packBuffer;
617     P_ newGraph;
618     P_ old;
619     globalAddr lga;
620     globalAddr *gagamap;
621
622     packBuffer = gumPackBuffer;
623     unpackResume(&lga, &nelem, packBuffer);
624
625 #ifdef RESUME_DEBUG
626     fprintf(stderr, "Rcvd Resume for (%x, %d, %x)\n",
627       lga.loc.gc.gtid, lga.loc.gc.slot, lga.weight);
628     PrintPacket(packBuffer);
629 #endif
630
631     /* 
632      * We always unpack the incoming graph, even if we've received the
633      * requested node in some other data packet (and already awakened
634      * the blocking queue).
635      */
636     if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
637         ReallyPerformThreadGC(packBuffer[0], rtsFalse);
638         SAVE_Hp -= packBuffer[0];
639     }
640
641     /* Do this *after* GC; we don't want to release the object early! */
642
643     if (lga.weight > 0)
644         (void) addWeight(&lga);
645
646     old = GALAlookup(&lga);
647
648     if (RTSflags.ParFlags.granSimStats) {
649         P_ tso = NULL;
650
651         if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE) {
652             for(tso = (P_) FMBQ_ENTRIES(old); 
653               TSO_LINK(tso) != Prelude_Z91Z93_closure; 
654               tso = TSO_LINK(tso))
655                 ;
656         }
657         /* DumpGranEventAndNode(GR_REPLY, tso, old, taskIDtoPE(sender)); */
658         DumpRawGranEvent(CURRENT_PROC,taskIDtoPE(sender),GR_REPLY,
659                          tso,old,0);
660     }
661
662     newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
663     ASSERT(newGraph != NULL);
664
665     /* 
666      * Sometimes, unpacking will common up the resumee with the
667      * incoming graph, but if it hasn't, we'd better do so now.
668      */
669    
670     if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE)
671         CommonUp(old, newGraph);
672
673 #ifdef RESUME_DEBUG
674     DebugPrintGAGAMap(gagamap, nGAs);
675 #endif
676
677     sendAck(sender, nGAs, gagamap);
678 }
679 \end{code}
680
681 @processSchedule@ unpacks a SCHEDULE message into the graph, filling
682 in the LA -> GA, and GA -> LA tables. The root of the graph is added to
683 the local spark queue.  Finally it sends an ACK in response
684 which contains any newly allocated GAs.
685
686 \begin{code}
687 static void
688 processSchedule(GLOBAL_TASK_ID sender)
689 {
690     int nelem;
691     int space_required;
692     rtsBool success;
693     static W_ *packBuffer;
694     W_ nGAs;
695     P_ newGraph;
696     globalAddr *gagamap;
697
698     packBuffer = gumPackBuffer;         /* HWL */
699     unpackSchedule(&nelem, packBuffer);
700
701 #ifdef SCHEDULE_DEBUG
702     fprintf(stderr, "Rcvd Schedule\n");
703     PrintPacket(packBuffer);
704 #endif
705
706     /*
707      * For now, the graph is a closure to be sparked as an advisory
708      * spark, but in future it may be a complete spark with
709      * required/advisory status, priority etc.
710      */
711
712     space_required = packBuffer[0];
713     if (SAVE_Hp + space_required >= SAVE_HpLim) {
714         ReallyPerformThreadGC(space_required, rtsFalse);
715         SAVE_Hp -= space_required;
716     }
717     newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
718     ASSERT(newGraph != NULL);
719     success = Spark(newGraph, rtsFalse);
720     ASSERT(success);
721
722 #ifdef SCHEDULE_DEBUG
723     DebugPrintGAGAMap(gagamap, nGAs);
724 #endif
725
726     if (nGAs > 0)
727         sendAck(sender, nGAs, gagamap);
728
729     fishing = rtsFalse;
730 }
731 \end{code}
732
733 @processAck@ unpacks an ACK, and uses the GAGA map to convert RBH's
734 (which represent shared thunks that have been shipped) into fetch-mes
735 to remote GAs.
736
737 \begin{code}
738 static void
739 processAck(STG_NO_ARGS)
740 {
741     int nGAs;
742     globalAddr *gaga;
743
744     globalAddr gagamap[MAX_GAS * 2];
745
746     unpackAck(&nGAs, gagamap);
747
748 #ifdef ACK_DEBUG
749     fprintf(stderr, "Rcvd Ack (%d pairs)\n", nGAs);
750     DebugPrintGAGAMap(gagamap, nGAs);
751 #endif
752
753     /*
754      * For each (oldGA, newGA) pair, set the GA of the corresponding
755      * thunk to the newGA, convert the thunk to a FetchMe, and return
756      * the weight from the oldGA.
757      */
758     for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
759         P_ old = GALAlookup(gaga);
760         P_ new = GALAlookup(gaga + 1);
761
762         if (new == NULL) {
763             /* We don't have this closure, so we make a fetchme for it */
764             globalAddr *ga = setRemoteGA(old, gaga + 1, rtsTrue);
765
766             convertToFetchMe(old, ga);
767         } else {
768             /* 
769              * Oops...we've got this one already; update the RBH to
770              * point to the object we already know about, whatever it
771              * happens to be.
772              */
773             CommonUp(old, new);
774
775             /* 
776              * Increase the weight of the object by the amount just
777              * received in the second part of the ACK pair.
778              */
779             (void) addWeight(gaga + 1);
780         }
781         (void) addWeight(gaga);
782     }
783 }
784 \end{code}
785
786 \section{GUM Message Processor}
787
788 @processMessages@ processes any messages that have arrived, calling
789 appropriate routines depending on the message tag
790 (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
791 present and performs a blocking receive! During profiling it
792 busy-waits in order to record idle time.
793
794 \begin{code}
795 void
796 processMessages(STG_NO_ARGS)
797 {
798     PACKET packet;
799     OPCODE opcode;
800     CostCentre Save_CCC;
801
802     /* Temporary Test Definitions */
803     GLOBAL_TASK_ID task;
804
805     Save_CCC = CCC;
806     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
807     
808     do {
809         if (RTSflags.CcFlags.doCostCentres) {
810             CCC = (CostCentre)STATIC_CC_REF(CC_IDLE);
811             CCC->scc_count++;
812
813             while (!PacketsWaiting())
814                 /*busy-wait loop*/;
815
816             CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
817         }
818
819         packet = GetPacket();   /* Get next message; block until one available */
820         CCC->scc_count++;
821
822         get_opcode_and_sender(packet, &opcode, &task);
823
824         switch (opcode) {
825
826         case PP_FINISH:
827             EXIT(EXIT_SUCCESS); /* The computation has been completed by someone
828                                  * else */
829             break;
830
831         case PP_FETCH:
832             processFetch();
833             break;
834
835         case PP_RESUME:
836             processResume(task);
837             break;
838
839         case PP_ACK:
840             processAck();
841             break;
842
843         case PP_FISH:
844             processFish();
845             break;
846
847         case PP_FREE:
848             processFree();
849             break;
850
851         case PP_SCHEDULE:
852             processSchedule(task);
853             break;
854
855         default:
856             /* Anything we're not prepared to deal with. */
857             fprintf(stderr, "Task %x: Unexpected opcode %x from %x\n",
858               mytid, opcode, task);
859
860             EXIT(EXIT_FAILURE);
861         }                       /* switch */
862
863     } while (PacketsWaiting()); /* While there are messages: process them */
864     CCC = Save_CCC;
865 }                               /* processMessages */
866 \end{code}
867
868 \section{Exception Handlers}
869
870
871 @Comms_Harness_Exception@ is an exception handler that tests out the different 
872 GUM messages. 
873
874 \begin{code}
875 void
876 Comms_Harness_Exception(packet)
877 PACKET packet;
878 {
879     int i, load;
880     globalAddr ga,bqga;
881 /*  GLOBAL_TASK_ID sender = Sender_Task(packet); */
882     OPCODE opcode = Opcode(packet);
883     GLOBAL_TASK_ID task;
884     
885 /*    fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender); */
886
887     switch (opcode) {
888
889     case PP_FINISH:
890         EXIT(EXIT_SUCCESS);
891         break;
892
893     case PP_FETCH:
894         {
895             W_ data[11];
896             get_opcode_and_sender(packet,&opcode,&task);
897             fprintf(stderr,"Task %x: Got Fetch from %x\n", mytid, task );
898             unpackFetch(&ga,&bqga,&load);
899             fprintf(stderr,"In PE, Fetch = (%x, %d, %x) (%x, %d, %x) %d \n",
900                             ga.loc.gc.gtid, ga.loc.gc.slot, ga.weight, 
901                             bqga.loc.gc.gtid, bqga.loc.gc.slot, bqga.weight, load);
902             /*Send Resume in Response*/
903             for (i=0; i <= 10; ++i) data[i] = i;
904             sendResume(&bqga,11,data);      
905         }
906         break;
907
908     case PP_ACK:
909         {
910             int nGAs;
911             globalAddr gagamap[MAX_GAS*2];
912
913             get_opcode_and_sender(packet,&opcode,&task);
914             fprintf(stderr,"Task %x: Got Ack from %x\n", mytid, task );
915             unpackAck(&nGAs,gagamap);
916 #ifdef DEBUG
917             DebugPrintGAGAMap(gagamap,nGAs);
918 #endif
919         }
920         break;
921
922     case PP_FISH:
923         {
924             GLOBAL_TASK_ID origPE;
925             int age, history, hunger;
926             globalAddr testGA;
927             StgWord testData[6];
928
929             get_opcode_and_sender(packet,&opcode,&task);
930             fprintf(stderr,"Task %x: Got FISH from %x\n", mytid, task );
931             unpackFish(&origPE, &age, &history, &hunger);
932             fprintf(stderr,"In PE, FISH.origPE = %x age = %d history = %d hunger = %d\n",
933                             origPE, age, history, hunger);
934
935             testGA.loc.gc.gtid = mytid; testGA.loc.gc.slot = 52; testGA.weight = 1024;
936             for (i=0; i <= 5; ++i) testData[i] = 40+i;
937             sendSchedule(origPE,6,testData);        
938         }
939         break;
940
941     case PP_SCHEDULE:
942         {                               /* Test variables */
943             int nelem;
944             int testData[6];
945
946             get_opcode_and_sender(packet,&opcode,&task);
947             fprintf(stderr,"Task %x: Got SCHEDULE from %x\n", mytid, task );
948             unpackSchedule(&nelem, &testData);
949             fprintf(stderr,"In PE, nelem = %d \n", nelem);
950             for (i=0; i <= 5; ++i) fprintf(stderr,"tData[%d] = %d ",i,testData[i]);
951             fprintf(stderr,"\n");
952         }
953         break;
954
955       /* Anything we're not prepared to deal with.  Note that ALL
956        * opcodes are discarded during termination -- this helps
957        * prevent bizarre race conditions.
958        */
959       default:
960         if (!GlobalStopPending) 
961           {
962             GLOBAL_TASK_ID ErrorTask;
963             int opcode;
964
965             get_opcode_and_sender(packet,&opcode,&ErrorTask);
966             fprintf(stderr,"Task %x: Unexpected opcode %x from %x in Comms Harness\n",
967                     mytid, opcode, ErrorTask );
968             
969             PEShutDown();
970             
971             EXIT(EXIT_FAILURE);
972           }
973     }
974 }
975 \end{code}
976
977 @STG_Exception@ handles real communication exceptions
978
979 \begin{code}
980 void
981 STG_Exception(packet)
982 PACKET packet;
983 {
984     GLOBAL_TASK_ID sender = Sender_Task(packet); 
985     OPCODE opcode = Opcode(packet);
986 #if 0    
987     fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender); 
988 #endif
989     switch (opcode) {
990
991     case PP_FINISH:
992         EXIT(EXIT_SUCCESS);
993         break;
994
995       /* Anything we're not prepared to deal with.  Note that ALL opcodes are discarded
996          during termination -- this helps prevent bizarre race conditions.
997       */
998       default:
999         if (!GlobalStopPending) 
1000           {
1001             GLOBAL_TASK_ID ErrorTask;
1002             int opcode;
1003
1004             get_opcode_and_sender(packet,&opcode,&ErrorTask);
1005             fprintf(stderr,"Task %x: Unexpected opcode %x from %x in STG_Exception\n",
1006                     mytid, opcode, ErrorTask );
1007             
1008             EXIT(EXIT_FAILURE);
1009           }
1010     }
1011 }
1012 \end{code}
1013
1014 \section{Miscellaneous Functions}
1015
1016 @ChoosePE@ selects a GlobalTaskId from the array of PEs 'at random'.
1017 Important properties:
1018 o it varies during execution, even if the PE is idle
1019 o it's different for each PE
1020 o we never send a fish to ourselves
1021
1022 \begin{code}
1023 extern long lrand48 (STG_NO_ARGS);
1024
1025 GLOBAL_TASK_ID
1026 choosePE(STG_NO_ARGS)
1027 {
1028     long temp;
1029
1030     temp = lrand48() % nPEs;
1031     if (PEs[temp] == mytid) {   /* Never send a FISH to yourself */
1032         temp = (temp + 1) % nPEs;
1033     }
1034     return PEs[temp];
1035 }
1036 \end{code}
1037
1038 @WaitForTermination@ enters an infinite loop waiting for the
1039 termination sequence to be completed.
1040
1041 \begin{code}
1042 void
1043 WaitForTermination(STG_NO_ARGS)
1044 {
1045   do {
1046     PACKET p = GetPacket();
1047     HandleException(p);
1048   } while (rtsTrue);
1049 }
1050 \end{code}
1051
1052 \begin{code}
1053 #ifdef DEBUG
1054 void
1055 DebugPrintGAGAMap(gagamap, nGAs)
1056 globalAddr *gagamap;
1057 int nGAs;
1058 {
1059     int i;
1060
1061     for (i = 0; i < nGAs; ++i, gagamap += 2)
1062         fprintf(stderr, "gagamap[%d] = (%x, %d, %x) -> (%x, %d, %x)\n", i,
1063           gagamap[0].loc.gc.gtid, gagamap[0].loc.gc.slot, gagamap[0].weight,
1064           gagamap[1].loc.gc.gtid, gagamap[1].loc.gc.slot, gagamap[1].weight);
1065 }
1066 #endif
1067 \end{code}
1068
1069 \begin{code}
1070
1071 static PP_ freeMsgBuffer = NULL;
1072 static int *freeMsgIndex = NULL;
1073
1074 void
1075 prepareFreeMsgBuffers(STG_NO_ARGS)
1076 {
1077     int i;
1078
1079     /* Allocate the freeMsg buffers just once and then hang onto them. */
1080
1081     if (freeMsgIndex == NULL) {
1082
1083         freeMsgIndex = (int *) stgMallocBytes(nPEs * sizeof(int), "prepareFreeMsgBuffers (Index)");
1084         freeMsgBuffer = (PP_)  stgMallocBytes(nPEs * sizeof(long *), "prepareFreeMsgBuffers (Buffer)");
1085
1086         for(i = 0; i < nPEs; i++) {
1087             if (i != thisPE) {
1088               freeMsgBuffer[i] = (P_) stgMallocWords(RTSflags.ParFlags.packBufferSize,
1089                                         "prepareFreeMsgBuffers (Buffer #i)");
1090             }
1091         }
1092     }
1093
1094     /* Initialize the freeMsg buffer pointers to point to the start of their buffers */
1095     for (i = 0; i < nPEs; i++)
1096         freeMsgIndex[i] = 0;
1097 }
1098
1099 void
1100 freeRemoteGA(pe, ga)
1101 int pe;
1102 globalAddr *ga;
1103 {
1104     int i;
1105
1106     ASSERT(GALAlookup(ga) == NULL);
1107
1108     if ((i = freeMsgIndex[pe]) + 2 >= RTSflags.ParFlags.packBufferSize) {
1109 #ifdef FREE_DEBUG
1110         fprintf(stderr, "Filled a free message buffer\n");      
1111 #endif
1112         sendFree(ga->loc.gc.gtid, i, freeMsgBuffer[pe]);
1113         i = 0;
1114     }
1115     freeMsgBuffer[pe][i++] = (W_) ga->weight;
1116     freeMsgBuffer[pe][i++] = (W_) ga->loc.gc.slot;
1117     freeMsgIndex[pe] = i;
1118 #ifdef DEBUG
1119     ga->weight = 0x0f0f0f0f;
1120     ga->loc.gc.gtid = 0x666;
1121     ga->loc.gc.slot = 0xdeaddead;
1122 #endif
1123 }
1124
1125 void
1126 sendFreeMessages(STG_NO_ARGS)
1127 {
1128     int i;
1129
1130     for (i = 0; i < nPEs; i++) {
1131         if (freeMsgIndex[i] > 0)
1132             sendFree(PEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
1133     }
1134 }
1135
1136 #endif /* PAR -- whole file */
1137 \end{code}