1 /****************************************************************
3 * High Level Communications Routines (HLComms.lc) *
5 * Contains the high-level routines (i.e. communication *
6 * subsystem independent) used by GUM *
7 * (c) The Parade/AQUA Projects, Glasgow University, 1995 *
8 * Phil Trinder, Glasgow University, 12 December 1994 *
10 *****************************************************************/
12 #ifdef PAR /* whole file */
14 #define NON_POSIX_SOURCE /* so says Solaris */
20 \section{GUM Message Sending and Unpacking Functions}
23 @SendFetch@ packs the two global addresses and a load into a message +
/*
 * Send a PP_FETCH message to the task named in rga.  The payload is six
 * words: rga's task id and slot, then lga's weight, task id and slot, then
 * load.  Both addresses must carry a non-zero weight.
 */
28 sendFetch(rga, lga, load)
29 globalAddr *rga, *lga;
32 CostCentre Save_CCC = CCC;
/* Switch to the CC_MSG cost centre; the previous one is kept in Save_CCC. */
34 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
37 ASSERT(rga->weight > 0 && lga->weight > 0);
39 fprintf(stderr, "Sending Fetch (%x, %d, 0), load = %d\n",
40 rga->loc.gc.gtid, rga->loc.gc.slot, load);
/* The message is addressed to rga's task; 6 is the number of words that follow. */
42 SendOpV(PP_FETCH, rga->loc.gc.gtid, 6,
43 (W_) rga->loc.gc.gtid, (W_) rga->loc.gc.slot,
44 (W_) lga->weight, (W_) lga->loc.gc.gtid, (W_) lga->loc.gc.slot, (W_) load);
50 @unpackFetch@ unpacks a FETCH message into two Global addresses and a load figure.
/*
 * Decode a FETCH message from buf.  Words 0-1 give lga's task id and slot;
 * words 2-4 give rga's weight, task id and slot.  This is the word order
 * sendFetch packs, with the roles swapped: the sender's rga arrives here as
 * lga, and the sender's lga arrives here as rga.
 */
55 unpackFetch(lga, rga, load)
56 globalAddr *lga, *rga;
63 lga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[0];
64 lga->loc.gc.slot = (int) buf[1];
66 rga->weight = (unsigned) buf[2];
67 rga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[3];
68 rga->loc.gc.slot = (int) buf[4];
/* A fetch must always carry some weight for the requesting address. */
72 ASSERT(rga->weight > 0);
76 @SendResume@ packs the remote blocking queue's GA and data into a message
/*
 * Send a PP_RESUME message to the task named in rga.  The message carries
 * the nelem-element data block plus two extra words: rga's weight and slot.
 */
81 sendResume(rga, nelem, data)
86 CostCentre Save_CCC = CCC;
/* Switch to the CC_MSG cost centre; the previous one is kept in Save_CCC. */
88 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
93 fprintf(stderr, "Sending Resume for (%x, %d, %x)\n",
94 rga->loc.gc.gtid, rga->loc.gc.slot, rga->weight);
97 SendOpNV(PP_RESUME, rga->loc.gc.gtid, nelem, data, 2,
98 (W_) rga->weight, (W_) rga->loc.gc.slot);
104 @blockFetch@ blocks a @BlockedFetch@ node on some kind of black hole.
/*
 * Dispatch on the closure type of bh.  Where bh has no queue yet it is given
 * the BQ_info info pointer and bf becomes its only entry (BF_LINK(bf) is
 * Nil_closure).  In the other arms bf is pushed on the front of the existing
 * queue: BF_LINK(bf) takes the current head and bf is stored as the new head.
 * Any other closure type is reported as a panic on stderr.
 */
112 switch (INFO_TYPE(INFO_PTR(bh))) {
114 BF_LINK(bf) = Nil_closure;
115 SET_INFO_PTR(bh, BQ_info);
116 BQ_ENTRIES(bh) = (W_) bf;
118 #ifdef GC_MUT_REQUIRED
120 * If we modify a black hole in the old generation, we have to make sure it
121 * goes on the mutables list
124 if (bh <= StorageMgrInfo.OldLim) {
125 MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
126 StorageMgrInfo.OldMutables = bh;
128 MUT_LINK(bh) = MUT_NOT_LINKED;
132 BF_LINK(bf) = (P_) BQ_ENTRIES(bh);
133 BQ_ENTRIES(bh) = (W_) bf;
136 BF_LINK(bf) = (P_) FMBQ_ENTRIES(bh);
137 FMBQ_ENTRIES(bh) = (W_) bf;
139 case INFO_SPEC_RBH_TYPE:
140 BF_LINK(bf) = (P_) SPEC_RBH_BQ(bh);
141 SPEC_RBH_BQ(bh) = (W_) bf;
143 case INFO_GEN_RBH_TYPE:
144 BF_LINK(bf) = (P_) GEN_RBH_BQ(bh);
145 GEN_RBH_BQ(bh) = (W_) bf;
148 fprintf(stderr, "Panic: thought %#lx was a black hole (IP %#lx)\n",
149 (W_) bh, INFO_PTR(bh));
155 @processFetches@ constructs and sends resume messages for every
156 @BlockedFetch@ which is ready to be awakened.
/*
 * PendingFetches heads the list of BlockedFetch nodes walked below.  Each
 * node is forwarded (target is a FetchMe), re-blocked (target is a black
 * hole) or answered with a RESUME; the list is then reset to Nil_closure.
 */
159 extern P_ PendingFetches;
170 for (bf = PendingFetches; bf != Nil_closure; bf = next) {
174 * Find the target at the end of the indirection chain, and process it in
175 * much the same fashion as the original target of the fetch. Though we
176 * hope to find graph here, we could find a black hole (of any flavor) or
179 closure = BF_NODE(bf);
/* Skip over any indirections to reach the real target. */
180 while (IS_INDIRECTION(INFO_PTR(closure)))
181 closure = (P_) IND_CLOSURE_PTR(closure);
182 ip = (P_) INFO_PTR(closure);
184 if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
185 /* Forward the Fetch to someone else */
/* Rebuild the requester's global address from the fields saved in bf. */
186 rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
187 rga.loc.gc.slot = (int) BF_SLOT(bf);
188 rga.weight = (unsigned) BF_WEIGHT(bf);
190 sendFetch(FETCHME_GA(closure), &rga, 0 /* load */);
191 } else if (IS_BLACK_HOLE(ip)) {
/* Re-point bf at the end-of-chain closure and queue it there again. */
192 BF_NODE(bf) = closure;
193 blockFetch(bf, closure);
195 /* We now have some local graph to send back */
199 if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
/*
 * Packing failed: run a GC asking for PACK_HEAP_REQUIRED words, hand the
 * words back to SAVE_Hp, re-read the node from bf and retry.  The retry is
 * asserted to succeed.  NOTE(review): the re-read is presumably because the
 * GC may move the closure -- confirm.
 */
201 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
202 SAVE_Hp -= PACK_HEAP_REQUIRED;
205 closure = BF_NODE(bf);
206 graph = PackNearbyGraph(closure, &size);
207 ASSERT(graph != NULL);
209 rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
210 rga.loc.gc.slot = (int) BF_SLOT(bf);
211 rga.weight = (unsigned) BF_WEIGHT(bf);
213 sendResume(&rga, size, graph);
/* Every node has been dealt with; empty the pending list. */
216 PendingFetches = Nil_closure;
221 @unpackResume@ unpacks a Resume message into a Global address, an element count and a data array.
/*
 * Decode a RESUME message.  buf[0] and buf[1] supply lga's weight and slot,
 * and lga's task id is set to this task (mytid).  buf[2] is the element
 * count, stored in *nelem; GetArgs is then asked for that many elements,
 * which it places in data.
 */
226 unpackResume(lga, nelem, data)
234 lga->weight = (unsigned) buf[0];
235 lga->loc.gc.gtid = mytid;
236 lga->loc.gc.slot = (int) buf[1];
238 *nelem = (int) buf[2];
239 GetArgs(data, *nelem);
243 @SendAck@ packs the global address being acknowledged, together with
244 an array of global addresses for any closures shipped and sends them.
/*
 * Send a PP_ACK message to task.  Each of the ngas map entries is flattened
 * into six longs in a local buffer: weight, task id and slot, twice over.
 * The message length is the number of longs written (p - buffer).
 *
 * NOTE(review): nothing visible here checks that ngas * 6 fits in buffer --
 * confirm that callers bound ngas.
 */
248 sendAck(task, ngas, gagamap)
253 long buffer[PACK_BUFFER_SIZE - PACK_HDR_SIZE];
257 CostCentre Save_CCC = CCC;
259 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
/* p advances six longs per map entry. */
262 for(i = 0, p = buffer; i < ngas; i++, p += 6) {
263 ASSERT(gagamap[1].weight > 0);
264 p[0] = (long) gagamap->weight;
265 p[1] = (long) gagamap->loc.gc.gtid;
266 p[2] = (long) gagamap->loc.gc.slot;
268 p[3] = (long) gagamap->weight;
269 p[4] = (long) gagamap->loc.gc.gtid;
270 p[5] = (long) gagamap->loc.gc.slot;
274 fprintf(stderr,"Sending Ack (%d pairs) to %x\n", ngas, task);
276 SendOpN(PP_ACK, task, p - buffer, buffer);
282 @unpackAck@ unpacks an Acknowledgement message into a count of the
283 global address pairs that follow and a map of
/*
 * Decode an ACK message.  The first word read is the payload length in
 * words; *ngas is set to that length divided by six (six words per pair).
 * The payload is then decoded into gagamap, weight / task id / slot at a
 * time, for as long as words remain.
 */
289 unpackAck(ngas, gagamap)
296 GetArgs(&GAarraysize, 1);
298 *ngas = GAarraysize / 6;
300 while (GAarraysize > 0) {
302 gagamap->weight = (unsigned) buf[0];
303 gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[1];
304 gagamap->loc.gc.slot = (int) buf[2];
306 gagamap->weight = (unsigned) buf[3];
307 gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[4];
308 gagamap->loc.gc.slot = (int) buf[5];
309 ASSERT(gagamap->weight > 0);
316 @SendFish@ packs the originating PE's task id together with the age,
317 history and hunger of the fish, and sends them to the destination PE.
/*
 * Send a PP_FISH message to destPE.  The payload is four words: origPE,
 * age, history and hunger, in that order.
 */
321 sendFish(destPE, origPE, age, history, hunger)
322 GLOBAL_TASK_ID destPE, origPE;
323 int age, history, hunger;
325 CostCentre Save_CCC = CCC;
/* Switch to the CC_MSG cost centre; the previous one is kept in Save_CCC. */
327 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
331 fprintf(stderr,"Sending Fish to %lx\n", destPE);
333 SendOpV(PP_FISH, destPE, 4, (W_) origPE, (W_) age, (W_) history, (W_) hunger);
341 @unpackFish@ unpacks a FISH message into the global task id of the
342 originating PE and 3 data fields: the age, history and hunger of the
343 fish. The history + hunger are not currently used.
/*
 * Decode a FISH message from buf into the caller's variables.  buf[0] is the
 * originating task id, buf[2] the history and buf[3] the hunger, matching
 * the order in which sendFish packs origPE, age, history and hunger.
 */
348 unpackFish(origPE, age, history, hunger)
349 GLOBAL_TASK_ID *origPE;
350 int *age, *history, *hunger;
356 *origPE = (GLOBAL_TASK_ID) buf[0];
358 *history = (int) buf[2];
359 *hunger = (int) buf[3];
363 @SendFree@ sends (weight, slot) pairs for GAs that we no longer need references
/*
 * Send a PP_FREE message to pe carrying nelem words of data.  The debug
 * message reports nelem / 2 GAs.
 */
368 sendFree(pe, nelem, data)
373 CostCentre Save_CCC = CCC;
/* Switch to the CC_MSG cost centre; the previous one is kept in Save_CCC. */
375 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
379 fprintf(stderr, "Sending Free (%d GAs) to %x\n", nelem / 2, pe);
381 SendOpN(PP_FREE, pe, nelem, data);
388 @unpackFree@ unpacks a FREE message into the amount of data shipped and
/*
 * Decode a FREE message: buf[0] is the element count, stored in *nelem;
 * GetArgs is then asked for that many elements, which it places in data.
 */
394 unpackFree(nelem, data)
401 *nelem = (int) buf[0];
402 GetArgs(data, *nelem);
406 @SendSchedule@ sends a closure to be evaluated in response to a Fish
407 message. The message is directed to the PE that originated the Fish
408 (origPE), and includes the packed closure (data) along with its size
/*
 * Send a PP_SCHEDULE message to origPE carrying nelem words of data.
 */
414 sendSchedule(origPE, nelem, data)
415 GLOBAL_TASK_ID origPE;
420 CostCentre Save_CCC = CCC;
/* Switch to the CC_MSG cost centre; the previous one is kept in Save_CCC. */
422 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
425 #ifdef SCHEDULE_DEBUG
427 fprintf(stderr, "Sending Schedule to %x\n", origPE);
430 SendOpN(PP_SCHEDULE, origPE, nelem, data);
436 @unpackSchedule@ unpacks a SCHEDULE message into
437 the amount of data shipped (nelem) and the data
/*
 * Decode a SCHEDULE message: buf[0] is the element count, stored in *nelem;
 * GetArgs is then asked for that many elements, which it places in data.
 */
443 unpackSchedule(nelem, data)
450 *nelem = (int) buf[0];
451 GetArgs(data, *nelem);
455 \section{Message-Processing Functions}
457 The following routines process incoming GUM messages, often reissuing
458 messages in response.
460 @processFish@ unpacks a fish message, reissuing it if it's our own,
461 sending work if we have it or sending it onwards otherwise.
/*
 * Handle an incoming FISH.  A fish that originated here (origPE == mytid)
 * is treated specially.  Otherwise local sparks are tried in turn: the graph
 * near a spark is packed and shipped to the fish's originator with
 * sendSchedule.  With no spark to give, the fish goes to another PE chosen
 * by choosePE() with its age incremented, or back to its originator once
 * age has reached FISH_LIFE_EXPECTANCY.
 */
465 processFish(STG_NO_ARGS)
467 GLOBAL_TASK_ID origPE;
468 int age, history, hunger;
470 unpackFish(&origPE, &age, &history, &hunger);
472 /* Ignore our own fish if we're busy; otherwise send it out after a delay */
473 if (origPE == mytid) {
478 while ((spark = FindLocalSpark(rtsTrue)) != NULL) {
482 if ((graph = PackNearbyGraph(spark, &size)) == NULL) {
/* Packing failed: run a GC for PACK_HEAP_REQUIRED words and hand them back. */
483 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
484 SAVE_Hp -= PACK_HEAP_REQUIRED;
485 /* Now go back and try again */
487 sendSchedule(origPE, size, graph);
493 /* We have no sparks to give */
494 if (age < FISH_LIFE_EXPECTANCY)
495 sendFish(choosePE(), origPE,
496 (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
498 /* Send it home to die */
500 sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
506 @processFetch@ either returns the requested data (if available)
507 or blocks the remote blocking queue on a black hole (if not).
/*
 * Handle an incoming FETCH.  ga names the local closure being asked for and
 * rga names the requester.  There are four outcomes:
 *   - the target is itself a FetchMe: forward the fetch to where it points;
 *   - the requester is this task: common up the requester's closure with
 *     the target and return the weight carried in rga;
 *   - the target is a black hole: build a BlockedFetch node and queue it on
 *     the black hole;
 *   - otherwise: pack the graph near the target and send it back in a RESUME.
 */
511 processFetch(STG_NO_ARGS)
519 unpackFetch(&ga, &rga, &load);
521 fprintf(stderr, "Rcvd Fetch for (%x, %d, 0), Resume (%x, %d, %x) (load %d) \n",
522 ga.loc.gc.gtid, ga.loc.gc.slot,
523 rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, load);
/* Find the local closure the fetch is addressed to. */
526 closure = GALAlookup(&ga);
527 ip = (P_) INFO_PTR(closure);
529 if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
530 /* Forward the Fetch to someone else */
531 sendFetch(FETCHME_GA(closure), &rga, load);
532 } else if (rga.loc.gc.gtid == mytid) {
533 /* Our own FETCH forwarded back around to us */
534 P_ fmbq = GALAlookup(&rga);
536 /* We may have already discovered that the fetch target is our own. */
538 CommonUp(fmbq, closure);
539 (void) addWeight(&rga);
540 } else if (IS_BLACK_HOLE(ip)) {
541 /* This includes RBH's and FMBQ's */
544 if ((bf = AllocateHeap(FIXED_HS + BF_CLOSURE_SIZE(dummy))) == NULL) {
/*
 * Allocation failed: run a GC for the same number of words, look the target
 * up again, and point bf at the block of that size ending at SAVE_Hp.
 */
545 ReallyPerformThreadGC(FIXED_HS + BF_CLOSURE_SIZE(dummy), rtsFalse);
546 closure = GALAlookup(&ga);
547 bf = SAVE_Hp - (FIXED_HS + BF_CLOSURE_SIZE(dummy)) + 1;
549 ASSERT(GALAlookup(&rga) == NULL);
/* Record the target and the requester's address in bf, then queue it. */
551 SET_BF_HDR(bf, BF_info, bogosity);
552 BF_NODE(bf) = closure;
553 BF_GTID(bf) = (W_) rga.loc.gc.gtid;
554 BF_SLOT(bf) = (W_) rga.loc.gc.slot;
555 BF_WEIGHT(bf) = (W_) rga.weight;
556 blockFetch(bf, closure);
559 fprintf(stderr, "Blocking Fetch (%x, %d, %x) on %#lx\n",
560 rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, closure);
564 /* The target of the FetchMe is some local graph */
568 if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
/* Packing failed: GC, hand the words back, look the target up again, retry. */
569 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
570 SAVE_Hp -= PACK_HEAP_REQUIRED;
571 closure = GALAlookup(&ga);
572 graph = PackNearbyGraph(closure, &size);
573 ASSERT(graph != NULL);
575 sendResume(&rga, size, graph);
580 @processFree@ unpacks a FREE message and adds the weights to our GAs.
/*
 * Handle a FREE message.  The payload is read as consecutive (weight, slot)
 * word pairs.  Each pair becomes a global address owned by this task
 * (gtid = mytid) and is passed to addWeight.
 *
 * NOTE(review): nelem comes from the message and is not checked against
 * PACK_BUFFER_SIZE in the code visible here -- confirm it is bounded.
 */
584 processFree(STG_NO_ARGS)
587 W_ freeBuffer[PACK_BUFFER_SIZE];
591 unpackFree(&nelem, freeBuffer);
593 fprintf(stderr, "Rcvd Free (%d GAs)\n", nelem / 2);
595 ga.loc.gc.gtid = mytid;
/* i is advanced inside the body, two words per iteration. */
596 for (i = 0; i < nelem;) {
597 ga.weight = (unsigned) freeBuffer[i++];
598 ga.loc.gc.slot = (int) freeBuffer[i++];
600 fprintf(stderr,"Processing free (%x, %d, %x)\n", ga.loc.gc.gtid,
601 ga.loc.gc.slot, ga.weight);
603 (void) addWeight(&ga);
608 @processResume@ unpacks a RESUME message into the graph, filling in
609 the LA -> GA, and GA -> LA tables. Threads blocked on the original
610 @FetchMe@ (now a blocking queue) are awakened, and the blocking queue
611 is converted into an indirection. Finally it sends an ACK in response
612 which contains any newly allocated GAs.
/*
 * Handle a RESUME message from sender.  Steps, in order:
 *   - unpack the header into lga and the packed graph into packBuffer;
 *   - make sure the heap has room for packBuffer[0] words;
 *   - return the weight carried in lga and look up the closure it names;
 *   - unpack the graph, and common the old closure up with it if the old
 *     closure is still a FetchMe blocking queue;
 *   - send the sender an ACK with the GA map produced by unpacking.
 */
617 processResume(sender)
618 GLOBAL_TASK_ID sender;
621 W_ packBuffer[PACK_BUFFER_SIZE], nGAs;
627 unpackResume(&lga, &nelem, packBuffer);
630 fprintf(stderr, "Rcvd Resume for (%x, %d, %x)\n",
631 lga.loc.gc.gtid, lga.loc.gc.slot, lga.weight);
632 PrintPacket(packBuffer);
636 * We always unpack the incoming graph, even if we've received the
637 * requested node in some other data packet (and already awakened the
640 if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
/* Not enough room: run a GC for packBuffer[0] words and hand them back. */
641 ReallyPerformThreadGC(packBuffer[0], rtsFalse);
642 SAVE_Hp -= packBuffer[0];
645 /* Do this *after* GC; we don't want to release the object early! */
648 (void) addWeight(&lga);
650 old = GALAlookup(&lga);
655 if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE) {
656 for(tso = (P_) FMBQ_ENTRIES(old);
657 TSO_LINK(tso) != Nil_closure;
661 DumpGranEventAndNode(GR_REPLY, tso, old, taskIDtoPE(sender));
664 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
665 ASSERT(newGraph != NULL);
668 * Sometimes, unpacking will common up the resumee with the incoming graph,
669 * but if it hasn't, we'd better do so now.
672 if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE)
673 CommonUp(old, newGraph);
676 DebugPrintGAGAMap(gagamap, nGAs);
679 sendAck(sender, nGAs, gagamap);
683 @processSchedule@ unpacks a SCHEDULE message into the graph, filling
684 in the LA -> GA, and GA -> LA tables. The root of the graph is added to
685 the local spark queue. Finally it sends an ACK in response
686 which contains any newly allocated GAs.
/*
 * Handle a SCHEDULE message from sender.  The packed graph is read into
 * packBuffer, whose first word is taken as the heap space needed.  After
 * making sure that much heap is available, the graph is unpacked and its
 * root is handed to Spark.  The sender is then sent an ACK with the GA map
 * produced by unpacking.
 */
690 processSchedule(sender)
691 GLOBAL_TASK_ID sender;
696 W_ packBuffer[PACK_BUFFER_SIZE], nGAs;
700 unpackSchedule(&nelem, packBuffer);
702 #ifdef SCHEDULE_DEBUG
703 fprintf(stderr, "Rcvd Schedule\n");
704 PrintPacket(packBuffer);
708 * For now, the graph is a closure to be sparked as an advisory spark, but in
709 * future it may be a complete spark with required/advisory status, priority
713 space_required = packBuffer[0];
714 if (SAVE_Hp + space_required >= SAVE_HpLim) {
/* Not enough room: run a GC for space_required words and hand them back. */
715 ReallyPerformThreadGC(space_required, rtsFalse);
716 SAVE_Hp -= space_required;
718 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
719 ASSERT(newGraph != NULL);
720 success = Spark(newGraph, rtsFalse);
723 #ifdef SCHEDULE_DEBUG
724 DebugPrintGAGAMap(gagamap, nGAs);
728 sendAck(sender, nGAs, gagamap);
734 @processAck@ unpacks an ACK, and uses the GAGA map to convert RBH's
735 (which represent shared thunks that have been shipped) into fetch-mes
/*
 * Handle an ACK message.  The message is decoded into a local gagamap that
 * holds up to MAX_GAS pairs, which is then walked pair by pair.  Within a
 * pair, gaga[0] is the old GA and gaga[1] the new one, and both are looked
 * up locally.
 *
 * NOTE(review): the pair count from unpackAck is not checked against MAX_GAS
 * in the code visible here -- confirm the sender bounds it.
 */
740 processAck(STG_NO_ARGS)
745 globalAddr gagamap[MAX_GAS * 2];
747 unpackAck(&nGAs, gagamap);
750 fprintf(stderr, "Rcvd Ack (%d pairs)\n", nGAs);
751 DebugPrintGAGAMap(gagamap, nGAs);
755 * For each (oldGA, newGA) pair, set the GA of the corresponding thunk to the
756 * newGA, convert the thunk to a FetchMe, and return the weight from the oldGA.
758 for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
759 P_ old = GALAlookup(gaga);
760 P_ new = GALAlookup(gaga + 1);
763 /* We don't have this closure, so we make a fetchme for it */
764 globalAddr *ga = setRemoteGA(old, gaga + 1, rtsTrue);
766 convertToFetchMe(old, ga);
769 * Oops...we've got this one already; update the RBH to point to
770 * the object we already know about, whatever it happens to be.
775 * Increase the weight of the object by the amount just received
776 * in the second part of the ACK pair.
778 (void) addWeight(gaga + 1);
780 (void) addWeight(gaga);
785 \section{GUM Message Processor}
787 @processMessages@ processes any messages that have arrived, calling
788 appropriate routines depending on the message tag
789 (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
790 present and performs a blocking receive! During profiling it
791 busy-waits in order to record idle time.
/*
 * Main message loop.  Each iteration takes a packet from GetPacket(), splits
 * it into opcode and sending task, and dispatches on the opcode.  The loop
 * repeats while PacketsWaiting() reports more messages.  An opcode with no
 * handler is reported on stderr.  CCC is switched between the CC_MSG and
 * CC_IDLE cost centres around the wait for packets.
 */
795 processMessages(STG_NO_ARGS)
801 /* Temporary Test Definitions */
805 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
809 CCC = (CostCentre)STATIC_CC_REF(CC_IDLE);
812 while (!PacketsWaiting())
815 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
818 packet = GetPacket(); /* Get next message; block until one available */
821 get_opcode_and_sender(packet, &opcode, &task);
826 EXIT(EXIT_SUCCESS); /* The computation has been completed by someone
851 processSchedule(task);
855 /* Anything we're not prepared to deal with. */
856 fprintf(stderr, "Task %x: Unexpected opcode %x from %x\n",
857 mytid, opcode, task);
862 } while (PacketsWaiting()); /* While there are messages: process them */
864 } /* processMessages */
867 \section{Exception Handlers}
870 @Comms_Harness_Exception@ is an exception handler that tests out the different
/*
 * Test-harness handler.  It reads the packet's opcode, runs the matching
 * unpack routine and prints what it received.  Two cases send a canned
 * reply: a FETCH is answered with an 11-element RESUME (values 0..10), and
 * a FISH with a 6-element SCHEDULE (values 40..45).  An opcode with no
 * handler is reported on stderr unless GlobalStopPending is set.
 */
875 Comms_Harness_Exception(packet)
880 /* GLOBAL_TASK_ID sender = Sender_Task(packet); */
881 OPCODE opcode = Opcode(packet);
884 /* fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender); */
889 IAmMainThread = rtsTrue; /* This processor is the IO task */
890 /* fprintf(stderr,"I am Main Thread\n"); */
900 get_opcode_and_sender(packet,&opcode,&task);
901 fprintf(stderr,"Task %x: Got Fetch from %x\n", mytid, task );
902 unpackFetch(&ga,&bqga,&load);
903 fprintf(stderr,"In PE, Fetch = (%x, %d, %x) (%x, %d, %x) %d \n",
904 ga.loc.gc.gtid, ga.loc.gc.slot, ga.weight,
905 bqga.loc.gc.gtid, bqga.loc.gc.slot, bqga.weight, load);
906 /*Send Resume in Response*/
907 for (i=0; i <= 10; ++i) data[i] = i;
908 sendResume(&bqga,11,data);
915 globalAddr gagamap[MAX_GAS*2];
917 get_opcode_and_sender(packet,&opcode,&task);
918 fprintf(stderr,"Task %x: Got Ack from %x\n", mytid, task );
919 unpackAck(&nGAs,gagamap);
921 DebugPrintGAGAMap(gagamap,nGAs);
928 GLOBAL_TASK_ID origPE;
929 int age, history, hunger;
933 get_opcode_and_sender(packet,&opcode,&task);
934 fprintf(stderr,"Task %x: Got FISH from %x\n", mytid, task );
935 unpackFish(&origPE, &age, &history, &hunger);
936 fprintf(stderr,"In PE, FISH.origPE = %x age = %d history = %d hunger = %d\n",
937 origPE, age, history, hunger);
/* NOTE(review): testGA is filled in here but is not passed to the sendSchedule call below. */
939 testGA.loc.gc.gtid = mytid; testGA.loc.gc.slot = 52; testGA.weight = 1024;
940 for (i=0; i <= 5; ++i) testData[i] = 40+i;
941 sendSchedule(origPE,6,testData);
946 { /* Test variables */
950 get_opcode_and_sender(packet,&opcode,&task);
951 fprintf(stderr,"Task %x: Got SCHEDULE from %x\n", mytid, task );
/* NOTE(review): passes &testData, whereas processSchedule passes its buffer itself -- confirm the intended type. */
952 unpackSchedule(&nelem, &testData);
953 fprintf(stderr,"In PE, nelem = %d \n", nelem);
954 for (i=0; i <= 5; ++i) fprintf(stderr,"tData[%d] = %d ",i,testData[i]);
955 fprintf(stderr,"\n");
959 /* Anything we're not prepared to deal with. Note that ALL opcodes are discarded
960 during termination -- this helps prevent bizarre race conditions.
963 if (!GlobalStopPending)
965 GLOBAL_TASK_ID ErrorTask;
968 get_opcode_and_sender(packet,&opcode,&ErrorTask);
969 fprintf(stderr,"Task %x: Unexpected opcode %x from %x in Comms Harness\n",
970 mytid, opcode, ErrorTask );
980 @STG_Exception@ handles real communication exceptions
/*
 * Handler for real communication exceptions.  It reads the opcode from the
 * packet and acts on it; one visible action marks this processor as the IO
 * task by setting IAmMainThread.  An opcode with no handler is reported on
 * stderr, together with its sender, unless GlobalStopPending is set.
 */
984 STG_Exception(packet)
987 /* GLOBAL_TASK_ID sender = Sender_Task(packet); */
988 OPCODE opcode = Opcode(packet);
990 /* fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender); */
995 IAmMainThread = rtsTrue; /* This processor is the IO task */
996 /* fprintf(stderr,"I am Main Thread\n"); */
1003 /* Anything we're not prepared to deal with. Note that ALL opcodes are discarded
1004 during termination -- this helps prevent bizarre race conditions.
1007 if (!GlobalStopPending)
1009 GLOBAL_TASK_ID ErrorTask;
1012 get_opcode_and_sender(packet,&opcode,&ErrorTask);
1013 fprintf(stderr,"Task %x: Unexpected opcode %x from %x in STG_Exception\n",
1014 mytid, opcode, ErrorTask );
1022 \section{Miscellaneous Functions}
1024 @ChoosePE@ selects a GlobalTaskId from the array of PEs 'at random'.
1025 Important properties:
1026 o it varies during execution, even if the PE is idle
1027 o it's different for each PE
1028 o we never send a fish to ourselves
/*
 * choosePE picks an index into PEs as lrand48() % nPEs.  If that entry is
 * this task (mytid) it moves on to the next index, wrapping at nPEs.
 *
 * NOTE(review): with nPEs == 1 the "next" index is the same entry, so this
 * task would still be chosen -- confirm that case cannot arise.
 */
1031 extern long lrand48 (STG_NO_ARGS);
1034 choosePE(STG_NO_ARGS)
1038 temp = lrand48() % nPEs;
1039 if (PEs[temp] == mytid) { /* Never send a FISH to yourself */
1040 temp = (temp + 1) % nPEs;
1046 @WaitForTermination@ enters an infinite loop waiting for the
1047 termination sequence to be completed.
/*
 * Takes packets from GetPacket(); meant to keep doing so until the
 * termination sequence has completed.
 */
1051 WaitForTermination(STG_NO_ARGS)
1054 PACKET p = GetPacket();
/*
 * Print nGAs pairs from gagamap to stderr, one line per pair.  Each pair is
 * two consecutive entries, shown as (task id, slot, weight) -> (task id,
 * slot, weight); gagamap advances by two entries per iteration.
 */
1063 DebugPrintGAGAMap(gagamap, nGAs)
1064 globalAddr *gagamap;
1069 for (i = 0; i < nGAs; ++i, gagamap += 2)
1070 fprintf(stderr, "gagamap[%d] = (%x, %d, %x) -> (%x, %d, %x)\n", i,
1071 gagamap[0].loc.gc.gtid, gagamap[0].loc.gc.slot, gagamap[0].weight,
1072 gagamap[1].loc.gc.gtid, gagamap[1].loc.gc.slot, gagamap[1].weight);
/*
 * Buffers for pending FREE messages.  freeMsgBuffer[i] is a buffer of
 * PACK_BUFFER_SIZE words and freeMsgIndex[i] is the next unused position in
 * it.  Both arrays have nPEs entries.
 */
1079 static PP_ freeMsgBuffer = NULL;
1080 static int *freeMsgIndex = NULL;
/*
 * Allocate the two arrays and the per-entry buffers on the first call
 * (detected by freeMsgIndex still being NULL), printing "VM exhausted" if a
 * malloc fails.  Every call resets all the indices to zero.
 */
1083 prepareFreeMsgBuffers(STG_NO_ARGS)
1087 /* Allocate the freeMsg buffers just once and then hang onto them. */
1089 if (freeMsgIndex == NULL) {
1090 freeMsgIndex = (int *) malloc(nPEs * sizeof(int));
1091 freeMsgBuffer = (PP_) malloc(nPEs * sizeof(long *));
1092 if (freeMsgIndex == NULL || freeMsgBuffer == NULL) {
1094 fprintf(stderr, "VM exhausted\n");
1097 for(i = 0; i < nPEs; i++) {
1099 (freeMsgBuffer[i] = (P_) malloc(PACK_BUFFER_SIZE * sizeof(W_))) == NULL) {
1101 fprintf(stderr, "VM exhausted\n");
1107 /* Initialize the freeMsg buffer pointers to point to the start of their buffers */
1108 for (i = 0; i < nPEs; i++)
1109 freeMsgIndex[i] = 0;
/*
 * Queue ga's (weight, slot) pair in the free-message buffer indexed by pe.
 * If two more words would reach PACK_BUFFER_SIZE, the words buffered so far
 * are first sent with sendFree to ga's task.  ga must have no local closure
 * (GALAlookup(ga) == NULL).  The final three assignments overwrite ga's
 * fields with marker values.
 *
 * NOTE(review): the buffer is selected by pe but the flush is addressed to
 * ga->loc.gc.gtid; presumably pe is the index of that task in PEs -- confirm.
 */
1113 freeRemoteGA(pe, ga)
1119 ASSERT(GALAlookup(ga) == NULL);
1121 if ((i = freeMsgIndex[pe]) + 2 >= PACK_BUFFER_SIZE) {
1123 fprintf(stderr, "Filled a free message buffer\n");
1125 sendFree(ga->loc.gc.gtid, i, freeMsgBuffer[pe]);
/* Append the pair and remember the new fill position. */
1128 freeMsgBuffer[pe][i++] = (W_) ga->weight;
1129 freeMsgBuffer[pe][i++] = (W_) ga->loc.gc.slot;
1130 freeMsgIndex[pe] = i;
1132 ga->weight = 0x0f0f0f0f;
1133 ga->loc.gc.gtid = 0x666;
1134 ga->loc.gc.slot = 0xdeaddead;
/*
 * Flush the free-message buffers: for every index i whose buffer holds at
 * least one word, send the buffered words to PEs[i] with sendFree.
 */
1139 sendFreeMessages(STG_NO_ARGS)
1143 for (i = 0; i < nPEs; i++) {
1144 if (freeMsgIndex[i] > 0)
1145 sendFree(PEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
1149 #endif /* PAR -- whole file */