[project @ 1996-01-08 20:28:12 by partain]
[ghc-hetmet.git] / ghc / runtime / gum / HLComms.lc
1 /****************************************************************
2 *                                                               *
3 *       High Level Communications Routines (HLComms.lc)         *
4 *                                                               *
5 *  Contains the high-level routines (i.e. communication         *
6 *  subsystem independent) used by GUM                           *
7 *  (c) The Parade/AQUA Projects, Glasgow University, 1995       *
8 *  Phil Trinder, Glasgow University, 12 December 1994           *
9 *                                                               *
10 *****************************************************************/
11 \begin{code}
12 #ifdef PAR /* whole file */
13
14 #define NON_POSIX_SOURCE /* so says Solaris */
15
16 #include "rtsdefs.h"
17 #include "HLC.h"
18 \end{code}
19
20 \section{GUM Message Sending and Unpacking Functions}
21
22
23 @SendFetch@ packs the two global addresses and a load into a message +
24 sends it.  
25
26 \begin{code}
27 void
28 sendFetch(rga, lga, load)
29 globalAddr *rga, *lga;
30 int load;
31 {
32     CostCentre Save_CCC = CCC;
33
34     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
35     CCC->scc_count++;
36
37     ASSERT(rga->weight > 0 && lga->weight > 0);
38 #ifdef FETCH_DEBUG    
39     fprintf(stderr, "Sending Fetch (%x, %d, 0), load = %d\n", 
40       rga->loc.gc.gtid, rga->loc.gc.slot, load);
41 #endif
42     SendOpV(PP_FETCH, rga->loc.gc.gtid, 6,
43       (W_) rga->loc.gc.gtid, (W_) rga->loc.gc.slot, 
44       (W_) lga->weight, (W_) lga->loc.gc.gtid, (W_) lga->loc.gc.slot, (W_) load);
45
46     CCC = Save_CCC;
47 }
48 \end{code}
49
50 @unpackFetch@ unpacks a FETCH message into two Global addresses and a load figure.
51
52 \begin{code}
53
54 static void
55 unpackFetch(lga, rga, load)
56 globalAddr *lga, *rga;
57 int *load;
58 {
59     long buf[6];
60
61     GetArgs(buf, 6); 
62     lga->weight = 1;
63     lga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[0];
64     lga->loc.gc.slot = (int) buf[1];
65
66     rga->weight = (unsigned) buf[2];
67     rga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[3];
68     rga->loc.gc.slot = (int) buf[4];
69
70     *load = (int) buf[5];
71
72     ASSERT(rga->weight > 0);
73 }
74 \end{code}
75
76 @SendResume@ packs the remote blocking queue's GA and data into a message 
77 and sends it.
78
79 \begin{code}
80 void
81 sendResume(rga, nelem, data)
82 globalAddr *rga;
83 int nelem;
84 P_ data;
85 {
86     CostCentre Save_CCC = CCC;
87
88     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
89     CCC->scc_count++;
90
91 #ifdef RESUME_DEBUG
92     PrintPacket(data);
93     fprintf(stderr, "Sending Resume for (%x, %d, %x)\n", 
94       rga->loc.gc.gtid, rga->loc.gc.slot, rga->weight);
95 #endif
96
97     SendOpNV(PP_RESUME, rga->loc.gc.gtid, nelem, data, 2,
98       (W_) rga->weight, (W_) rga->loc.gc.slot);
99
100     CCC = Save_CCC;
101 }
102 \end{code}
103
104 @blockFetch@ blocks a @BlockedFetch@ node on some kind of black hole.
105
106 \begin{code}
107 static void
108 blockFetch(bf, bh)
109 P_ bf;
110 P_ bh;
111 {
112     switch (INFO_TYPE(INFO_PTR(bh))) {
113     case INFO_BH_TYPE:
114         BF_LINK(bf) = Nil_closure;
115         SET_INFO_PTR(bh, BQ_info);
116         BQ_ENTRIES(bh) = (W_) bf;
117
118 #ifdef GC_MUT_REQUIRED
119         /*
120          * If we modify a black hole in the old generation, we have to make sure it
121          * goes on the mutables list
122          */
123
124         if (bh <= StorageMgrInfo.OldLim) {
125             MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
126             StorageMgrInfo.OldMutables = bh;
127         } else
128             MUT_LINK(bh) = MUT_NOT_LINKED;
129 #endif
130         break;
131     case INFO_BQ_TYPE:
132         BF_LINK(bf) = (P_) BQ_ENTRIES(bh);
133         BQ_ENTRIES(bh) = (W_) bf;
134         break;
135     case INFO_FMBQ_TYPE:
136         BF_LINK(bf) = (P_) FMBQ_ENTRIES(bh);
137         FMBQ_ENTRIES(bh) = (W_) bf;
138         break;
139     case INFO_SPEC_RBH_TYPE:
140         BF_LINK(bf) = (P_) SPEC_RBH_BQ(bh);
141         SPEC_RBH_BQ(bh) = (W_) bf;
142         break;
143     case INFO_GEN_RBH_TYPE:
144         BF_LINK(bf) = (P_) GEN_RBH_BQ(bh);
145         GEN_RBH_BQ(bh) = (W_) bf;
146         break;
147     default:
148         fprintf(stderr, "Panic: thought %#lx was a black hole (IP %#lx)\n",
149           (W_) bh, INFO_PTR(bh));
150         EXIT(EXIT_FAILURE);
151     }
152 }
153 \end{code}
154
155 @processFetches@ constructs and sends resume messages for every
156 @BlockedFetch@ which is ready to be awakened.
157
158 \begin{code}
159 extern P_ PendingFetches;
160
161 void
162 processFetches()
163 {
164     P_ bf;
165     P_ next;
166     P_ closure;
167     P_ ip;
168     globalAddr rga;
169     
170     for (bf = PendingFetches; bf != Nil_closure; bf = next) {
171         next = BF_LINK(bf);
172
173         /*
174          * Find the target at the end of the indirection chain, and process it in
175          * much the same fashion as the original target of the fetch.  Though we
176          * hope to find graph here, we could find a black hole (of any flavor) or
177          * even a FetchMe.
178          */
179         closure = BF_NODE(bf);
180         while (IS_INDIRECTION(INFO_PTR(closure)))
181             closure = (P_) IND_CLOSURE_PTR(closure);
182         ip = (P_) INFO_PTR(closure);
183
184         if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
185             /* Forward the Fetch to someone else */
186             rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
187             rga.loc.gc.slot = (int) BF_SLOT(bf);
188             rga.weight = (unsigned) BF_WEIGHT(bf);
189
190             sendFetch(FETCHME_GA(closure), &rga, 0 /* load */);
191         } else if (IS_BLACK_HOLE(ip)) {
192             BF_NODE(bf) = closure;
193             blockFetch(bf, closure);
194         } else {
195             /* We now have some local graph to send back */
196             W_ size;
197             P_ graph;
198
199             if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
200                 PendingFetches = bf;
201                 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
202                 SAVE_Hp -= PACK_HEAP_REQUIRED;
203                 bf = PendingFetches;
204                 next = BF_LINK(bf);
205                 closure = BF_NODE(bf);
206                 graph = PackNearbyGraph(closure, &size);
207                 ASSERT(graph != NULL);
208             }
209             rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
210             rga.loc.gc.slot = (int) BF_SLOT(bf);
211             rga.weight = (unsigned) BF_WEIGHT(bf);
212
213             sendResume(&rga, size, graph);
214         }
215     }
216     PendingFetches = Nil_closure;
217 }
218
219 \end{code}
220
221 @unpackResume@ unpacks a Resume message into two Global addresses and a data array.
222
223 \begin{code}
224
225 static void
226 unpackResume(lga, nelem, data)
227 globalAddr *lga;
228 int *nelem;
229 StgWord *data;
230 {
231     long buf[3];
232
233     GetArgs(buf, 3); 
234     lga->weight = (unsigned) buf[0];
235     lga->loc.gc.gtid = mytid;
236     lga->loc.gc.slot = (int) buf[1];
237
238     *nelem = (int) buf[2];
239     GetArgs(data, *nelem);
240 }
241 \end{code}
242
243 @SendAck@ packs the global address being acknowledged, together with
244 an array of global addresses for any closures shipped and sends them.
245 \begin{code}
246
247 void
248 sendAck(task, ngas, gagamap)
249 GLOBAL_TASK_ID task;
250 int ngas;
251 globalAddr *gagamap;
252 {
253     long buffer[PACK_BUFFER_SIZE - PACK_HDR_SIZE];
254     long *p;
255     int i;
256
257     CostCentre Save_CCC = CCC;
258
259     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
260     CCC->scc_count++;
261
262     for(i = 0, p = buffer; i < ngas; i++, p += 6) {
263         ASSERT(gagamap[1].weight > 0);
264         p[0] = (long) gagamap->weight;
265         p[1] = (long) gagamap->loc.gc.gtid;
266         p[2] = (long) gagamap->loc.gc.slot;
267         gagamap++;
268         p[3] = (long) gagamap->weight;
269         p[4] = (long) gagamap->loc.gc.gtid;
270         p[5] = (long) gagamap->loc.gc.slot;
271         gagamap++;
272     }
273 #ifdef ACK_DEBUG    
274     fprintf(stderr,"Sending Ack (%d pairs) to %x\n", ngas, task);
275 #endif
276     SendOpN(PP_ACK, task, p - buffer, buffer);
277
278     CCC = Save_CCC;
279 }
280 \end{code}
281
282 @unpackAck@ unpacks an Acknowledgement message into a Global address,
283 a count of the number of global addresses following and a map of 
284 Global addresses
285
286 \begin{code}
287
288 static void
289 unpackAck(ngas, gagamap)
290 int *ngas;
291 globalAddr *gagamap;
292 {
293     long GAarraysize;
294     long buf[6];
295
296     GetArgs(&GAarraysize, 1);
297
298     *ngas = GAarraysize / 6;
299
300     while (GAarraysize > 0) {
301         GetArgs(buf, 6);
302         gagamap->weight = (unsigned) buf[0];
303         gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[1];
304         gagamap->loc.gc.slot = (int) buf[2];
305         gagamap++;
306         gagamap->weight = (unsigned) buf[3];
307         gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[4];
308         gagamap->loc.gc.slot = (int) buf[5];
309         ASSERT(gagamap->weight > 0);
310         gagamap++;
311         GAarraysize -= 6;
312     }
313 }
314 \end{code}
315
316 @SendFish@ packs the global address being acknowledged, together with
317 an array of global addresses for any closures shipped and sends them.
318 \begin{code}
319
320 void
321 sendFish(destPE, origPE, age, history, hunger)
322 GLOBAL_TASK_ID destPE, origPE;
323 int age, history, hunger;
324 {
325     CostCentre Save_CCC = CCC;
326
327     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
328     CCC->scc_count++;
329
330 #ifdef FISH_DEBUG
331     fprintf(stderr,"Sending Fish to %lx\n", destPE);
332 #endif
333     SendOpV(PP_FISH, destPE, 4, (W_) origPE, (W_) age, (W_) history, (W_) hunger);
334     if (origPE == mytid)
335         fishing = rtsTrue;
336
337     CCC = Save_CCC;
338 }
339 \end{code}
340
341 @unpackFish@ unpacks a FISH message into the global task id of the
342 originating PE and 3 data fields: the age, history and hunger of the
343 fish. The history + hunger are not currently used.
344
345 \begin{code}
346
347 static void
348 unpackFish(origPE, age, history, hunger)
349 GLOBAL_TASK_ID *origPE;
350 int *age, *history, *hunger;
351 {
352     long buf[4];
353
354     GetArgs(buf, 4);
355
356     *origPE = (GLOBAL_TASK_ID) buf[0];
357     *age = (int) buf[1];
358     *history = (int) buf[2];
359     *hunger = (int) buf[3];
360 }
361 \end{code}
362
363 @SendFree@ sends (weight, slot) pairs for GAs that we no longer need references
364 to.
365
366 \begin{code}
367 void
368 sendFree(pe, nelem, data)
369 GLOBAL_TASK_ID pe;
370 int nelem;
371 P_ data;
372 {
373     CostCentre Save_CCC = CCC;
374
375     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
376     CCC->scc_count++;
377
378 #ifdef FREE_DEBUG
379     fprintf(stderr, "Sending Free (%d GAs) to %x\n", nelem / 2, pe);
380 #endif
381     SendOpN(PP_FREE, pe, nelem, data);
382
383     CCC = Save_CCC;
384 }
385
386 \end{code}
387
388 @unpackFree@ unpacks a FREE message into the amount of data shipped and
389 a data block.
390
391 \begin{code}
392
393 static void
394 unpackFree(nelem, data)
395 int *nelem;
396 W_ *data;
397 {
398     long buf[1];
399
400     GetArgs(buf, 1);
401     *nelem = (int) buf[0];
402     GetArgs(data, *nelem);
403 }
404 \end{code}
405
406 @SendSchedule@ sends a closure to be evaluated in response to a Fish
407 message. The message is directed to the PE that originated the Fish
408 (origPE), and includes the packed closure (data) along with its size
409 (nelem).
410
411 \begin{code}
412
413 void
414 sendSchedule(origPE, nelem, data)
415 GLOBAL_TASK_ID origPE;
416 int nelem;
417 P_ data;
418 {
419
420     CostCentre Save_CCC = CCC;
421
422     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
423     CCC->scc_count++;
424
425 #ifdef SCHEDULE_DEBUG
426     PrintPacket(data);
427     fprintf(stderr, "Sending Schedule to %x\n", origPE);
428 #endif
429
430     SendOpN(PP_SCHEDULE, origPE, nelem, data);
431
432     CCC = Save_CCC;
433 }
434 \end{code}
435
436 @unpackSchedule@ unpacks a SCHEDULE message into the Global address of
437 the closure shipped, the amount of data shipped (nelem) and the data
438 block (data).
439
440 \begin{code}
441
442 static void
443 unpackSchedule(nelem, data)
444 int *nelem;
445 W_ *data;
446 {
447     long buf[1];
448
449     GetArgs(buf, 1);
450     *nelem = (int) buf[0];
451     GetArgs(data, *nelem);
452 }
453 \end{code}
454
455 \section{Message-Processing Functions}
456
457 The following routines process incoming GUM messages. Often reissuing
458 messages in response.
459
460 @processFish@ unpacks a fish message, reissuing it if it's our own,
461 sending work if we have it or sending it onwards otherwise.
462
463 \begin{code}
464 static void
465 processFish(STG_NO_ARGS)
466 {
467     GLOBAL_TASK_ID origPE;
468     int age, history, hunger;
469
470     unpackFish(&origPE, &age, &history, &hunger);
471
472     /* Ignore our own fish if we're busy; otherwise send it out after a delay */
473     if (origPE == mytid) {
474         fishing = rtsFalse;
475     } else {
476         P_ spark;
477
478         while ((spark = FindLocalSpark(rtsTrue)) != NULL) {
479             W_ size;
480             P_ graph;
481
482             if ((graph = PackNearbyGraph(spark, &size)) == NULL) {
483                 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
484                 SAVE_Hp -= PACK_HEAP_REQUIRED;
485                 /* Now go back and try again */
486             } else {
487                 sendSchedule(origPE, size, graph);
488                 DisposeSpark(spark);
489                 break;
490             }
491         }
492         if (spark == NULL) {
493             /* We have no sparks to give */
494             if (age < FISH_LIFE_EXPECTANCY)
495                 sendFish(choosePE(), origPE,
496                   (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
497
498             /* Send it home to die */
499             else
500                 sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
501         }
502     }
503 }                               /* processFish */
504 \end{code}
505
506 @processFetch@ either returns the requested data (if available) 
507 or blocks the remote blocking queue on a black hole (if not).
508
509 \begin{code}
510 static void
511 processFetch(STG_NO_ARGS)
512 {
513     globalAddr ga, rga;
514     int load;
515
516     P_ closure;
517     P_ ip;
518
519     unpackFetch(&ga, &rga, &load);
520 #ifdef FETCH_DEBUG
521     fprintf(stderr, "Rcvd Fetch for (%x, %d, 0), Resume (%x, %d, %x) (load %d) \n",
522       ga.loc.gc.gtid, ga.loc.gc.slot,
523       rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, load);
524 #endif
525
526     closure = GALAlookup(&ga);
527     ip = (P_) INFO_PTR(closure);
528
529     if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
530         /* Forward the Fetch to someone else */
531         sendFetch(FETCHME_GA(closure), &rga, load);
532     } else if (rga.loc.gc.gtid == mytid) {
533         /* Our own FETCH forwarded back around to us */
534         P_ fmbq = GALAlookup(&rga);
535
536         /* We may have already discovered that the fetch target is our own. */
537         if (fmbq != closure) 
538             CommonUp(fmbq, closure);
539         (void) addWeight(&rga);
540     } else if (IS_BLACK_HOLE(ip)) {
541         /* This includes RBH's and FMBQ's */
542         P_ bf;
543
544         if ((bf = AllocateHeap(FIXED_HS + BF_CLOSURE_SIZE(dummy))) == NULL) {
545             ReallyPerformThreadGC(FIXED_HS + BF_CLOSURE_SIZE(dummy), rtsFalse);
546             closure = GALAlookup(&ga);
547             bf = SAVE_Hp - (FIXED_HS + BF_CLOSURE_SIZE(dummy)) + 1;
548         }
549         ASSERT(GALAlookup(&rga) == NULL);
550
551         SET_BF_HDR(bf, BF_info, bogosity);
552         BF_NODE(bf) = closure;
553         BF_GTID(bf) = (W_) rga.loc.gc.gtid;
554         BF_SLOT(bf) = (W_) rga.loc.gc.slot;
555         BF_WEIGHT(bf) = (W_) rga.weight;
556         blockFetch(bf, closure);
557
558 #ifdef FETCH_DEBUG
559         fprintf(stderr, "Blocking Fetch (%x, %d, %x) on %#lx\n",
560           rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, closure);
561 #endif
562
563     } else {                    
564         /* The target of the FetchMe is some local graph */
565         W_ size;
566         P_ graph;
567
568         if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
569             ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
570             SAVE_Hp -= PACK_HEAP_REQUIRED;
571             closure = GALAlookup(&ga);
572             graph = PackNearbyGraph(closure, &size);
573             ASSERT(graph != NULL);
574         }
575         sendResume(&rga, size, graph);
576     }
577 }
578 \end{code}
579
580 @processFree@ unpacks a FREE message and adds the weights to our GAs.
581
582 \begin{code}
583 static void
584 processFree(STG_NO_ARGS)
585 {
586     int nelem;
587     W_ freeBuffer[PACK_BUFFER_SIZE];
588     int i;
589     globalAddr ga;
590
591     unpackFree(&nelem, freeBuffer);
592 #ifdef FREE_DEBUG
593     fprintf(stderr, "Rcvd Free (%d GAs)\n", nelem / 2);
594 #endif
595     ga.loc.gc.gtid = mytid;
596     for (i = 0; i < nelem;) {
597         ga.weight = (unsigned) freeBuffer[i++];
598         ga.loc.gc.slot = (int) freeBuffer[i++];
599 #ifdef FREE_DEBUG
600         fprintf(stderr,"Processing free (%x, %d, %x)\n", ga.loc.gc.gtid, 
601           ga.loc.gc.slot, ga.weight);
602 #endif
603         (void) addWeight(&ga);
604     }
605 }
606 \end{code}
607
608 @processResume@ unpacks a RESUME message into the graph, filling in
609 the LA -> GA, and GA -> LA tables. Threads blocked on the original
610 @FetchMe@ (now a blocking queue) are awakened, and the blocking queue
611 is converted into an indirection.  Finally it sends an ACK in response
612 which contains any newly allocated GAs.
613
614 \begin{code}
615
616 static void
617 processResume(sender)
618 GLOBAL_TASK_ID sender;
619 {
620     int nelem;
621     W_ packBuffer[PACK_BUFFER_SIZE], nGAs;
622     P_ newGraph;
623     P_ old;
624     globalAddr lga;
625     globalAddr *gagamap;
626
627     unpackResume(&lga, &nelem, packBuffer);
628
629 #ifdef RESUME_DEBUG
630     fprintf(stderr, "Rcvd Resume for (%x, %d, %x)\n",
631       lga.loc.gc.gtid, lga.loc.gc.slot, lga.weight);
632     PrintPacket(packBuffer);
633 #endif
634
635     /* 
636      * We always unpack the incoming graph, even if we've received the
637      * requested node in some other data packet (and already awakened the
638      * blocking queue).
639      */
640     if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
641         ReallyPerformThreadGC(packBuffer[0], rtsFalse);
642         SAVE_Hp -= packBuffer[0];
643     }
644
645     /* Do this *after* GC; we don't want to release the object early! */
646
647     if (lga.weight > 0)
648         (void) addWeight(&lga);
649
650     old = GALAlookup(&lga);
651
652     if (do_gr_profile) {
653         P_ tso = NULL;
654
655         if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE) {
656             for(tso = (P_) FMBQ_ENTRIES(old); 
657               TSO_LINK(tso) != Nil_closure; 
658               tso = TSO_LINK(tso))
659                 ;
660         }
661         DumpGranEventAndNode(GR_REPLY, tso, old, taskIDtoPE(sender));
662     }
663
664     newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
665     ASSERT(newGraph != NULL);
666
667     /* 
668      * Sometimes, unpacking will common up the resumee with the incoming graph,
669      * but if it hasn't, we'd better do so now.
670      */
671    
672     if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE)
673         CommonUp(old, newGraph);
674
675 #ifdef RESUME_DEBUG
676     DebugPrintGAGAMap(gagamap, nGAs);
677 #endif
678
679     sendAck(sender, nGAs, gagamap);
680 }
681 \end{code}
682
683 @processSchedule@ unpacks a SCHEDULE message into the graph, filling
684 in the LA -> GA, and GA -> LA tables. The root of the graph is added to
685 the local spark queue.  Finally it sends an ACK in response
686 which contains any newly allocated GAs.
687
688 \begin{code}
689 static void
690 processSchedule(sender)
691 GLOBAL_TASK_ID sender;
692 {
693     int nelem;
694     int space_required;
695     rtsBool success;
696     W_ packBuffer[PACK_BUFFER_SIZE], nGAs;
697     P_ newGraph;
698     globalAddr *gagamap;
699
700     unpackSchedule(&nelem, packBuffer);
701
702 #ifdef SCHEDULE_DEBUG
703     fprintf(stderr, "Rcvd Schedule\n");
704     PrintPacket(packBuffer);
705 #endif
706
707     /*
708      * For now, the graph is a closure to be sparked as an advisory spark, but in
709      * future it may be a complete spark with required/advisory status, priority
710      * etc.
711      */
712
713     space_required = packBuffer[0];
714     if (SAVE_Hp + space_required >= SAVE_HpLim) {
715         ReallyPerformThreadGC(space_required, rtsFalse);
716         SAVE_Hp -= space_required;
717     }
718     newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
719     ASSERT(newGraph != NULL);
720     success = Spark(newGraph, rtsFalse);
721     ASSERT(success);
722
723 #ifdef SCHEDULE_DEBUG
724     DebugPrintGAGAMap(gagamap, nGAs);
725 #endif
726
727     if (nGAs > 0)
728         sendAck(sender, nGAs, gagamap);
729
730     fishing = rtsFalse;
731 }
732 \end{code}
733
734 @processAck@ unpacks an ACK, and uses the GAGA map to convert RBH's
735 (which represent shared thunks that have been shipped) into fetch-mes
736 to remote GAs.
737
738 \begin{code}
739 static void
740 processAck(STG_NO_ARGS)
741 {
742     int nGAs;
743     globalAddr *gaga;
744
745     globalAddr gagamap[MAX_GAS * 2];
746
747     unpackAck(&nGAs, gagamap);
748
749 #ifdef ACK_DEBUG
750     fprintf(stderr, "Rcvd Ack (%d pairs)\n", nGAs);
751     DebugPrintGAGAMap(gagamap, nGAs);
752 #endif
753
754     /*
755      * For each (oldGA, newGA) pair, set the GA of the corresponding thunk to the
756      * newGA, convert the thunk to a FetchMe, and return the weight from the oldGA.
757      */
758     for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
759         P_ old = GALAlookup(gaga);
760         P_ new = GALAlookup(gaga + 1);
761
762         if (new == NULL) {
763             /* We don't have this closure, so we make a fetchme for it */
764             globalAddr *ga = setRemoteGA(old, gaga + 1, rtsTrue);
765
766             convertToFetchMe(old, ga);
767         } else {
768             /* 
769              * Oops...we've got this one already; update the RBH to point to
770              * the object we already know about, whatever it happens to be.
771              */
772             CommonUp(old, new);
773
774             /* 
775              * Increase the weight of the object by the amount just received 
776              * in the second part of the ACK pair.
777              */
778             (void) addWeight(gaga + 1);
779         }
780         (void) addWeight(gaga);
781     }
782 }
783 \end{code}
784
785 \section{GUM Message Processor}
786
787 @processMessages@ processes any messages that have arrived, calling
788 appropriate routines depending on the message tag
789 (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
790 present and performs a blocking receive! During profiling it
791 busy-waits in order to record idle time.
792
793 \begin{code}
794 void
795 processMessages(STG_NO_ARGS)
796 {
797     PACKET packet;
798     OPCODE opcode;
799     CostCentre Save_CCC;
800
801     /* Temporary Test Definitions */
802     GLOBAL_TASK_ID task;
803
804     Save_CCC = CCC;
805     CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
806     
807     do {
808         if (cc_profiling) {
809             CCC = (CostCentre)STATIC_CC_REF(CC_IDLE);
810             CCC->scc_count++;
811
812             while (!PacketsWaiting())
813                 /*busy-wait loop*/;
814
815             CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
816         }
817
818         packet = GetPacket();   /* Get next message; block until one available */
819         CCC->scc_count++;
820
821         get_opcode_and_sender(packet, &opcode, &task);
822
823         switch (opcode) {
824
825         case PP_FINISH:
826             EXIT(EXIT_SUCCESS); /* The computation has been completed by someone
827                                  * else */
828             break;
829
830         case PP_FETCH:
831             processFetch();
832             break;
833
834         case PP_RESUME:
835             processResume(task);
836             break;
837
838         case PP_ACK:
839             processAck();
840             break;
841
842         case PP_FISH:
843             processFish();
844             break;
845
846         case PP_FREE:
847             processFree();
848             break;
849
850         case PP_SCHEDULE:
851             processSchedule(task);
852             break;
853
854         default:
855             /* Anything we're not prepared to deal with. */
856             fprintf(stderr, "Task %x: Unexpected opcode %x from %x\n",
857               mytid, opcode, task);
858
859             EXIT(EXIT_FAILURE);
860         }                       /* switch */
861
862     } while (PacketsWaiting()); /* While there are messages: process them */
863     CCC = Save_CCC;
864 }                               /* processMessages */
865 \end{code}
866
867 \section{Exception Handlers}
868
869
870 @Comms_Harness_Exception@ is an exception handler that tests out the different 
871 GUM messages. 
872
873 \begin{code}
874 void
875 Comms_Harness_Exception(packet)
876 PACKET packet;
877 {
878     int i, load;
879     globalAddr ga,bqga;
880 /*  GLOBAL_TASK_ID sender = Sender_Task(packet); */
881     OPCODE opcode = Opcode(packet);
882     GLOBAL_TASK_ID task;
883     
884 /*    fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender); */
885
886     switch (opcode) {
887
888     case PP_IO_INIT:
889         IAmMainThread = rtsTrue;        /* This processor is the IO task */
890 /*        fprintf(stderr,"I am Main Thread\n"); */
891         break;
892
893     case PP_FINISH:
894         EXIT(EXIT_SUCCESS);
895         break;
896
897     case PP_FETCH:
898         {
899             W_ data[11];
900             get_opcode_and_sender(packet,&opcode,&task);
901             fprintf(stderr,"Task %x: Got Fetch from %x\n", mytid, task );
902             unpackFetch(&ga,&bqga,&load);
903             fprintf(stderr,"In PE, Fetch = (%x, %d, %x) (%x, %d, %x) %d \n",
904                             ga.loc.gc.gtid, ga.loc.gc.slot, ga.weight, 
905                             bqga.loc.gc.gtid, bqga.loc.gc.slot, bqga.weight, load);
906             /*Send Resume in Response*/
907             for (i=0; i <= 10; ++i) data[i] = i;
908             sendResume(&bqga,11,data);      
909         }
910         break;
911
912     case PP_ACK:
913         {
914             int nGAs;
915             globalAddr gagamap[MAX_GAS*2];
916
917             get_opcode_and_sender(packet,&opcode,&task);
918             fprintf(stderr,"Task %x: Got Ack from %x\n", mytid, task );
919             unpackAck(&nGAs,gagamap);
920 #ifdef DEBUG
921             DebugPrintGAGAMap(gagamap,nGAs);
922 #endif
923         }
924         break;
925
926     case PP_FISH:
927         {
928             GLOBAL_TASK_ID origPE;
929             int age, history, hunger;
930             globalAddr testGA;
931             StgWord testData[6];
932
933             get_opcode_and_sender(packet,&opcode,&task);
934             fprintf(stderr,"Task %x: Got FISH from %x\n", mytid, task );
935             unpackFish(&origPE, &age, &history, &hunger);
936             fprintf(stderr,"In PE, FISH.origPE = %x age = %d history = %d hunger = %d\n",
937                             origPE, age, history, hunger);
938
939             testGA.loc.gc.gtid = mytid; testGA.loc.gc.slot = 52; testGA.weight = 1024;
940             for (i=0; i <= 5; ++i) testData[i] = 40+i;
941             sendSchedule(origPE,6,testData);        
942         }
943         break;
944
945     case PP_SCHEDULE:
946         {                               /* Test variables */
947             int nelem;
948             int testData[6];
949
950             get_opcode_and_sender(packet,&opcode,&task);
951             fprintf(stderr,"Task %x: Got SCHEDULE from %x\n", mytid, task );
952             unpackSchedule(&nelem, &testData);
953             fprintf(stderr,"In PE, nelem = %d \n", nelem);
954             for (i=0; i <= 5; ++i) fprintf(stderr,"tData[%d] = %d ",i,testData[i]);
955             fprintf(stderr,"\n");
956         }
957         break;
958
959       /* Anything we're not prepared to deal with.  Note that ALL opcodes are discarded
960          during termination -- this helps prevent bizarre race conditions.
961       */
962       default:
963         if (!GlobalStopPending) 
964           {
965             GLOBAL_TASK_ID ErrorTask;
966             int opcode;
967
968             get_opcode_and_sender(packet,&opcode,&ErrorTask);
969             fprintf(stderr,"Task %x: Unexpected opcode %x from %x in Comms Harness\n",
970                     mytid, opcode, ErrorTask );
971             
972             PEShutDown();
973             
974             EXIT(EXIT_FAILURE);
975           }
976     }
977 }
978 \end{code}
979
980 @STG_Exception@ handles real communication exceptions
981
982 \begin{code}
983 void
984 STG_Exception(packet)
985 PACKET packet;
986 {
987 /*  GLOBAL_TASK_ID sender = Sender_Task(packet); */
988     OPCODE opcode = Opcode(packet);
989     
990 /*    fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender); */
991
992     switch (opcode) {
993
994     case PP_IO_INIT:
995         IAmMainThread = rtsTrue;        /* This processor is the IO task */
996 /*        fprintf(stderr,"I am Main Thread\n"); */
997         break;
998
999     case PP_FINISH:
1000         EXIT(EXIT_SUCCESS);
1001         break;
1002
1003       /* Anything we're not prepared to deal with.  Note that ALL opcodes are discarded
1004          during termination -- this helps prevent bizarre race conditions.
1005       */
1006       default:
1007         if (!GlobalStopPending) 
1008           {
1009             GLOBAL_TASK_ID ErrorTask;
1010             int opcode;
1011
1012             get_opcode_and_sender(packet,&opcode,&ErrorTask);
1013             fprintf(stderr,"Task %x: Unexpected opcode %x from %x in STG_Exception\n",
1014                     mytid, opcode, ErrorTask );
1015             
1016             EXIT(EXIT_FAILURE);
1017           }
1018     }
1019 }
1020 \end{code}
1021
1022 \section{Miscellaneous Functions}
1023
1024 @ChoosePE@ selects a GlobalTaskId from the array of PEs 'at random'.
1025 Important properties:
1026 o it varies during execution, even if the PE is idle
1027 o it's different for each PE
1028 o we never send a fish to ourselves
1029
1030 \begin{code}
1031 extern long lrand48 (STG_NO_ARGS);
1032
1033 GLOBAL_TASK_ID
1034 choosePE(STG_NO_ARGS)
1035 {
1036     long temp;
1037
1038     temp = lrand48() % nPEs;
1039     if (PEs[temp] == mytid) {   /* Never send a FISH to yourself */
1040         temp = (temp + 1) % nPEs;
1041     }
1042     return PEs[temp];
1043 }
1044 \end{code}
1045
1046 @WaitForTermination@ enters an infinite loop waiting for the
1047 termination sequence to be completed.
1048
1049 \begin{code}
1050 void
1051 WaitForTermination(STG_NO_ARGS)
1052 {
1053   do {
1054     PACKET p = GetPacket();
1055     HandleException(p);
1056   } while (rtsTrue);
1057 }
1058 \end{code}
1059
1060 \begin{code}
1061 #ifdef DEBUG
1062 void
1063 DebugPrintGAGAMap(gagamap, nGAs)
1064 globalAddr *gagamap;
1065 int nGAs;
1066 {
1067     int i;
1068
1069     for (i = 0; i < nGAs; ++i, gagamap += 2)
1070         fprintf(stderr, "gagamap[%d] = (%x, %d, %x) -> (%x, %d, %x)\n", i,
1071           gagamap[0].loc.gc.gtid, gagamap[0].loc.gc.slot, gagamap[0].weight,
1072           gagamap[1].loc.gc.gtid, gagamap[1].loc.gc.slot, gagamap[1].weight);
1073 }
1074 #endif
1075 \end{code}
1076
1077 \begin{code}
1078
1079 static PP_ freeMsgBuffer = NULL;
1080 static int *freeMsgIndex = NULL;
1081
1082 void
1083 prepareFreeMsgBuffers(STG_NO_ARGS)
1084 {
1085     int i;
1086
1087     /* Allocate the freeMsg buffers just once and then hang onto them. */
1088
1089     if (freeMsgIndex == NULL) {
1090         freeMsgIndex = (int *) malloc(nPEs * sizeof(int));
1091         freeMsgBuffer = (PP_) malloc(nPEs * sizeof(long *));
1092         if (freeMsgIndex == NULL || freeMsgBuffer == NULL) {
1093             fflush(stdout);
1094             fprintf(stderr, "VM exhausted\n");
1095             EXIT(EXIT_FAILURE);
1096         }
1097         for(i = 0; i < nPEs; i++) {
1098             if(i != thisPE &&
1099               (freeMsgBuffer[i] = (P_) malloc(PACK_BUFFER_SIZE * sizeof(W_))) == NULL) {
1100                 fflush(stdout);
1101                 fprintf(stderr, "VM exhausted\n");
1102                 EXIT(EXIT_FAILURE);
1103             }
1104         }
1105     }
1106
1107     /* Initialize the freeMsg buffer pointers to point to the start of their buffers */
1108     for (i = 0; i < nPEs; i++)
1109         freeMsgIndex[i] = 0;
1110 }
1111
1112 void
1113 freeRemoteGA(pe, ga)
1114 int pe;
1115 globalAddr *ga;
1116 {
1117     int i;
1118
1119     ASSERT(GALAlookup(ga) == NULL);
1120
1121     if ((i = freeMsgIndex[pe]) + 2 >= PACK_BUFFER_SIZE) {
1122 #ifdef FREE_DEBUG
1123         fprintf(stderr, "Filled a free message buffer\n");      
1124 #endif
1125         sendFree(ga->loc.gc.gtid, i, freeMsgBuffer[pe]);
1126         i = 0;
1127     }
1128     freeMsgBuffer[pe][i++] = (W_) ga->weight;
1129     freeMsgBuffer[pe][i++] = (W_) ga->loc.gc.slot;
1130     freeMsgIndex[pe] = i;
1131 #ifdef DEBUG
1132     ga->weight = 0x0f0f0f0f;
1133     ga->loc.gc.gtid = 0x666;
1134     ga->loc.gc.slot = 0xdeaddead;
1135 #endif
1136 }
1137
1138 void
1139 sendFreeMessages(STG_NO_ARGS)
1140 {
1141     int i;
1142
1143     for (i = 0; i < nPEs; i++) {
1144         if (freeMsgIndex[i] > 0)
1145             sendFree(PEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
1146     }
1147 }
1148
1149 #endif /* PAR -- whole file */
1150 \end{code}