1 /****************************************************************
3 * High Level Communications Routines (HLComms.lc) *
5 * Contains the high-level routines (i.e. communication *
6 * subsystem independent) used by GUM *
7 * (c) The Parade/AQUA Projects, Glasgow University, 1995 *
8 * Phil Trinder, Glasgow University, 12 December 1994 *
10 *****************************************************************/
12 #ifdef PAR /* whole file */
14 #define NON_POSIX_SOURCE /* so says Solaris */
20 \section{GUM Message Sending and Unpacking Functions}
23 @SendFetch@ packs the two global addresses and a load into a message +
/*
 * gumPackBuffer is the file-local packing buffer that sendAck, processFree,
 * processResume and processSchedule in this file use as scratch space.
 * InitMoreBuffers allocates RTSflags.ParFlags.packBufferSize words with
 * stgMallocWords. NOTE(review): the left-hand side of the assignment is
 * not visible in this excerpt; presumably it is gumPackBuffer -- confirm.
 */
27 static W_ *gumPackBuffer;
30 InitMoreBuffers(STG_NO_ARGS)
33 = (W_ *) stgMallocWords(RTSflags.ParFlags.packBufferSize, "initMoreBuffers");
/*
 * sendFetch: sends a PP_FETCH message to the task named by rga->loc.gc.gtid.
 * The message carries six words, in this order: gtid and slot of rga, then
 * weight, gtid and slot of lga, then load. Both weights are asserted to be
 * positive before sending. The current cost centre is saved in Save_CCC and
 * CCC is switched to CC_MSG while the message is built (the restore is not
 * visible in this excerpt). The fprintf is a debug trace; its guard is not
 * visible here.
 */
37 sendFetch(rga, lga, load)
38 globalAddr *rga, *lga;
41 CostCentre Save_CCC = CCC;
43 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
46 ASSERT(rga->weight > 0 && lga->weight > 0);
48 fprintf(stderr, "Sending Fetch (%x, %d, 0), load = %d\n",
49 rga->loc.gc.gtid, rga->loc.gc.slot, load);
51 SendOpV(PP_FETCH, rga->loc.gc.gtid, 6,
52 (W_) rga->loc.gc.gtid, (W_) rga->loc.gc.slot,
53 (W_) lga->weight, (W_) lga->loc.gc.gtid, (W_) lga->loc.gc.slot, (W_) load);
59 @unpackFetch@ unpacks a FETCH message into two Global addresses and a load figure.
/*
 * unpackFetch: decodes a FETCH message from buf. Words 0-1 give the gtid
 * and slot of lga (no weight is filled in for lga on the visible lines);
 * words 2-4 give the weight, gtid and slot of rga, whose weight is then
 * asserted to be positive. This mirrors the word order written by
 * sendFetch. NOTE(review): the declaration and filling of buf, and the
 * assignment to *load, are not visible in this excerpt.
 */
64 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
70 lga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[0];
71 lga->loc.gc.slot = (int) buf[1];
73 rga->weight = (unsigned) buf[2];
74 rga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[3];
75 rga->loc.gc.slot = (int) buf[4];
79 ASSERT(rga->weight > 0);
83 @SendResume@ packs the remote blocking queue's GA and data into a message
/*
 * sendResume: sends a PP_RESUME message to the task rga->loc.gc.gtid via
 * SendOpNV, passing nelem and data together with two extra words: the
 * weight and the slot of rga. The gtid itself is not sent as a payload
 * word; unpackResume fills it in with mytid on the receiving side.
 * CCC is switched to CC_MSG with the previous value kept in Save_CCC
 * (the restore is not visible in this excerpt).
 */
88 sendResume(rga, nelem, data)
93 CostCentre Save_CCC = CCC;
95 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
100 fprintf(stderr, "Sending Resume for (%x, %d, %x)\n",
101 rga->loc.gc.gtid, rga->loc.gc.slot, rga->weight);
104 SendOpNV(PP_RESUME, rga->loc.gc.gtid, nelem, data, 2,
105 (W_) rga->weight, (W_) rga->loc.gc.slot);
111 @blockFetch@ blocks a @BlockedFetch@ node on some kind of black hole.
/*
 * blockFetch: queues the BlockedFetch node bf on the closure bh, choosing
 * the queue field by INFO_TYPE(INFO_PTR(bh)).
 *  - First visible arm: bh becomes a blocking queue (info pointer set to
 *    BQ_info) whose only entry is bf. Under GC_MUT_REQUIRED, a bh at or
 *    below StorageMgrInfo.OldLim is pushed on the OldMutables list,
 *    otherwise its MUT_LINK is set to MUT_NOT_LINKED.
 *  - Later arms push bf on the front of the existing queue (BQ_ENTRIES,
 *    FMBQ_ENTRIES, SPEC_RBH_BQ or GEN_RBH_BQ).
 *  - The final arm prints a panic message for a closure that is not a
 *    black hole.
 * NOTE(review): several case labels and the statements after the panic
 * message are not visible in this excerpt.
 */
115 blockFetch(P_ bf, P_ bh)
117 switch (INFO_TYPE(INFO_PTR(bh))) {
119 BF_LINK(bf) = Nil_closure;
120 SET_INFO_PTR(bh, BQ_info);
121 BQ_ENTRIES(bh) = (W_) bf;
123 #ifdef GC_MUT_REQUIRED
125 * If we modify a black hole in the old generation, we have to
126 * make sure it goes on the mutables list
129 if (bh <= StorageMgrInfo.OldLim) {
130 MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
131 StorageMgrInfo.OldMutables = bh;
133 MUT_LINK(bh) = MUT_NOT_LINKED;
137 BF_LINK(bf) = (P_) BQ_ENTRIES(bh);
138 BQ_ENTRIES(bh) = (W_) bf;
141 BF_LINK(bf) = (P_) FMBQ_ENTRIES(bh);
142 FMBQ_ENTRIES(bh) = (W_) bf;
144 case INFO_SPEC_RBH_TYPE:
145 BF_LINK(bf) = (P_) SPEC_RBH_BQ(bh);
146 SPEC_RBH_BQ(bh) = (W_) bf;
148 case INFO_GEN_RBH_TYPE:
149 BF_LINK(bf) = (P_) GEN_RBH_BQ(bh);
150 GEN_RBH_BQ(bh) = (W_) bf;
153 fprintf(stderr, "Panic: thought %#lx was a black hole (IP %#lx)\n",
154 (W_) bh, INFO_PTR(bh));
160 @processFetches@ constructs and sends resume messages for every
161 @BlockedFetch@ which is ready to be awakened.
/*
 * processFetches (function header not visible in this excerpt): walks the
 * PendingFetches list. For each BlockedFetch bf it follows indirections
 * from BF_NODE(bf) to the final closure and then:
 *  - if that closure is a FetchMe, forwards the fetch with sendFetch using
 *    the GA stored in bf (gtid, slot, weight) and a load of 0;
 *  - if it is a black hole, re-targets bf at it and blocks it again with
 *    blockFetch;
 *  - otherwise packs the graph with PackNearbyGraph and replies with
 *    sendResume. If packing fails, a GC is run for PACK_HEAP_REQUIRED
 *    words, SAVE_Hp is moved back, the closure is re-read from BF_NODE(bf)
 *    and packing is retried (asserted to succeed).
 * The list is reset to Nil_closure at the end. NOTE(review): the
 * computation of next and the local declarations are not visible here.
 */
164 extern P_ PendingFetches;
175 for (bf = PendingFetches; bf != Nil_closure; bf = next) {
179 * Find the target at the end of the indirection chain, and
180 * process it in much the same fashion as the original target
181 * of the fetch. Though we hope to find graph here, we could
182 * find a black hole (of any flavor) or even a FetchMe.
184 closure = BF_NODE(bf);
185 while (IS_INDIRECTION(INFO_PTR(closure)))
186 closure = (P_) IND_CLOSURE_PTR(closure);
187 ip = (P_) INFO_PTR(closure);
189 if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
190 /* Forward the Fetch to someone else */
191 rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
192 rga.loc.gc.slot = (int) BF_SLOT(bf);
193 rga.weight = (unsigned) BF_WEIGHT(bf);
195 sendFetch(FETCHME_GA(closure), &rga, 0 /* load */);
196 } else if (IS_BLACK_HOLE(ip)) {
197 BF_NODE(bf) = closure;
198 blockFetch(bf, closure);
200 /* We now have some local graph to send back */
204 if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
206 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
207 SAVE_Hp -= PACK_HEAP_REQUIRED;
210 closure = BF_NODE(bf);
211 graph = PackNearbyGraph(closure, &size);
212 ASSERT(graph != NULL);
214 rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
215 rga.loc.gc.slot = (int) BF_SLOT(bf);
216 rga.weight = (unsigned) BF_WEIGHT(bf);
218 sendResume(&rga, size, graph);
221 PendingFetches = Nil_closure;
226 @unpackResume@ unpacks a Resume message into a Global address, an element count and a data array.
/*
 * unpackResume: decodes a RESUME message. The weight and slot of lga come
 * from buf[0] and buf[1]; the gtid is set to mytid rather than read from
 * the message. *nelem comes from buf[2], and GetArgs is then called with
 * data and *nelem (presumably reading that many words into data --
 * confirm). NOTE(review): how buf is declared and filled is not visible
 * in this excerpt.
 */
231 unpackResume(globalAddr *lga, int *nelem, W_ *data)
236 lga->weight = (unsigned) buf[0];
237 lga->loc.gc.gtid = mytid;
238 lga->loc.gc.slot = (int) buf[1];
240 *nelem = (int) buf[2];
241 GetArgs(data, *nelem);
245 @SendAck@ packs the global address being acknowledged, together with
246 an array of global addresses for any closures shipped and sends them.
/*
 * sendAck: sends a PP_ACK message to task. The payload is built in
 * gumPackBuffer (viewed as an array of long); each of the ngas loop
 * iterations writes six words as two (weight, gtid, slot) triples and
 * advances p by 6. The message length passed to SendOpN is p - buffer.
 * NOTE(review): on the visible lines both triples read through gagamap
 * and the assertion inspects gagamap[1]; the statements that advance
 * gagamap between and after the triples are not visible in this excerpt
 * -- confirm they exist in the full file.
 */
250 sendAck(task, ngas, gagamap)
258 CostCentre Save_CCC = CCC;
260 buffer = (long *) gumPackBuffer;
262 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
265 for(i = 0, p = buffer; i < ngas; i++, p += 6) {
266 ASSERT(gagamap[1].weight > 0);
267 p[0] = (long) gagamap->weight;
268 p[1] = (long) gagamap->loc.gc.gtid;
269 p[2] = (long) gagamap->loc.gc.slot;
271 p[3] = (long) gagamap->weight;
272 p[4] = (long) gagamap->loc.gc.gtid;
273 p[5] = (long) gagamap->loc.gc.slot;
277 fprintf(stderr,"Sending Ack (%d pairs) to %x\n", ngas, task);
279 SendOpN(PP_ACK, task, p - buffer, buffer);
285 @unpackAck@ unpacks an Acknowledgement message into
286 a count of the number of global address pairs following and a map of
/*
 * unpackAck: decodes an ACK message. The first word read with GetArgs is
 * the payload size in words (GAarraysize); *ngas is that size divided by
 * 6, matching the six words per pair written by sendAck. The loop then
 * fills gagamap entries from buf[0..2] and buf[3..5] as (weight, gtid,
 * slot) triples and asserts a positive weight. NOTE(review): the reads
 * into buf, the advance of gagamap between triples and the decrement of
 * GAarraysize are not visible in this excerpt.
 */
292 unpackAck(int *ngas, globalAddr *gagamap)
297 GetArgs(&GAarraysize, 1);
299 *ngas = GAarraysize / 6;
301 while (GAarraysize > 0) {
303 gagamap->weight = (unsigned) buf[0];
304 gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[1];
305 gagamap->loc.gc.slot = (int) buf[2];
307 gagamap->weight = (unsigned) buf[3];
308 gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[4];
309 gagamap->loc.gc.slot = (int) buf[5];
310 ASSERT(gagamap->weight > 0);
317 @SendFish@ packs the originating PE of the fish, together with its age,
318 history and hunger, and sends them to the destination PE.
/*
 * sendFish: sends a PP_FISH message to destPE carrying four words:
 * origPE, age, history and hunger. CCC is switched to CC_MSG with the
 * previous value kept in Save_CCC (the restore is not visible in this
 * excerpt). NOTE(review): the trace prints destPE with %lx while the other
 * routines in this file print task ids with %x -- confirm the width of
 * GLOBAL_TASK_ID and make the format specifiers agree.
 */
322 sendFish(destPE, origPE, age, history, hunger)
323 GLOBAL_TASK_ID destPE, origPE;
324 int age, history, hunger;
326 CostCentre Save_CCC = CCC;
328 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
332 fprintf(stderr,"Sending Fish to %lx\n", destPE);
334 SendOpV(PP_FISH, destPE, 4, (W_) origPE, (W_) age, (W_) history, (W_) hunger);
342 @unpackFish@ unpacks a FISH message into the global task id of the
343 originating PE and 3 data fields: the age, history and hunger of the
344 fish. The history + hunger are not currently used.
/*
 * unpackFish: decodes a FISH message from buf: word 0 is the originating
 * PE, word 2 the history and word 3 the hunger, matching the order used
 * by sendFish. NOTE(review): the filling of buf and the assignment to
 * *age (presumably from buf[1]) are not visible in this excerpt.
 */
349 unpackFish(GLOBAL_TASK_ID *origPE, int *age, int *history, int *hunger)
355 *origPE = (GLOBAL_TASK_ID) buf[0];
357 *history = (int) buf[2];
358 *hunger = (int) buf[3];
362 @SendFree@ sends (weight, slot) pairs for GAs that we no longer need references
/*
 * sendFree: sends a PP_FREE message to pe containing nelem words taken
 * from data. The trace reports nelem / 2 GAs, i.e. two words per GA.
 * CCC is switched to CC_MSG with the previous value kept in Save_CCC
 * (the restore is not visible in this excerpt).
 */
367 sendFree(pe, nelem, data)
372 CostCentre Save_CCC = CCC;
374 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
378 fprintf(stderr, "Sending Free (%d GAs) to %x\n", nelem / 2, pe);
380 SendOpN(PP_FREE, pe, nelem, data);
387 @unpackFree@ unpacks a FREE message into the amount of data shipped and
/*
 * unpackFree: decodes a FREE message: *nelem is taken from buf[0] and
 * GetArgs is then called with data and *nelem. NOTE(review): how buf is
 * declared and filled is not visible in this excerpt.
 */
393 unpackFree(int *nelem, W_ *data)
398 *nelem = (int) buf[0];
399 GetArgs(data, *nelem);
403 @SendSchedule@ sends a closure to be evaluated in response to a Fish
404 message. The message is directed to the PE that originated the Fish
405 (origPE), and includes the packed closure (data) along with its size
/*
 * sendSchedule: sends a PP_SCHEDULE message to origPE containing nelem
 * words taken from data. The trace line sits under SCHEDULE_DEBUG. CCC is
 * switched to CC_MSG with the previous value kept in Save_CCC (the restore
 * is not visible in this excerpt).
 */
411 sendSchedule(origPE, nelem, data)
412 GLOBAL_TASK_ID origPE;
417 CostCentre Save_CCC = CCC;
419 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
422 #ifdef SCHEDULE_DEBUG
424 fprintf(stderr, "Sending Schedule to %x\n", origPE);
427 SendOpN(PP_SCHEDULE, origPE, nelem, data);
433 @unpackSchedule@ unpacks a SCHEDULE message into the amount of data
434 shipped (nelem) and the data
/*
 * unpackSchedule: decodes a SCHEDULE message: *nelem is taken from buf[0]
 * and GetArgs is then called with data and *nelem. NOTE(review): how buf
 * is declared and filled is not visible in this excerpt.
 */
440 unpackSchedule(int *nelem, W_ *data)
445 *nelem = (int) buf[0];
446 GetArgs(data, *nelem);
450 \section{Message-Processing Functions}
452 The following routines process incoming GUM messages, often reissuing
453 messages in response.
455 @processFish@ unpacks a fish message, reissuing it if it's our own,
456 sending work if we have it or sending it onwards otherwise.
/*
 * processFish: handles an incoming FISH message. After unpacking it, the
 * visible lines show three outcomes:
 *  - origPE == mytid: the fish is our own (handling not visible here);
 *  - a local spark is found with FindLocalSpark: its graph is packed with
 *    PackNearbyGraph and shipped to origPE with sendSchedule; if packing
 *    fails a GC is run for PACK_HEAP_REQUIRED words, SAVE_Hp is moved
 *    back and the loop tries again;
 *  - no spark available: the fish is passed to choosePE() with age + 1
 *    while age < FISH_LIFE_EXPECTANCY, otherwise it is sent back to
 *    origPE.
 * NOTE(review): the exact branch nesting and loop exit are not visible in
 * this excerpt.
 */
460 processFish(STG_NO_ARGS)
462 GLOBAL_TASK_ID origPE;
463 int age, history, hunger;
465 unpackFish(&origPE, &age, &history, &hunger);
467 if (origPE == mytid) {
472 while ((spark = FindLocalSpark(rtsTrue)) != NULL) {
476 if ((graph = PackNearbyGraph(spark, &size)) == NULL) {
477 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
478 SAVE_Hp -= PACK_HEAP_REQUIRED;
479 /* Now go back and try again */
481 sendSchedule(origPE, size, graph);
487 /* We have no sparks to give */
488 if (age < FISH_LIFE_EXPECTANCY)
489 sendFish(choosePE(), origPE,
490 (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
492 /* Send it home to die */
494 sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
500 @processFetch@ either returns the requested data (if available)
501 or blocks the remote blocking queue on a black hole (if not).
/*
 * processFetch: handles an incoming FETCH message. The target closure is
 * found with GALAlookup(&ga) and then:
 *  - FetchMe: the fetch is forwarded to FETCHME_GA(closure) with the same
 *    rga and load;
 *  - rga names this task (our own fetch came back): the local closure
 *    found for rga is commoned up with the target and the weight of rga
 *    is returned with addWeight;
 *  - black hole (including RBH and FMBQ per the comment): a BlockedFetch
 *    closure is allocated (with a GC and a fresh GALAlookup if the first
 *    AllocateHeap fails), filled in with the closure and the fields of
 *    rga, and queued with blockFetch;
 *  - otherwise: the graph is packed with PackNearbyGraph (GC and retry on
 *    failure) and returned with sendResume.
 * NOTE(review): local declarations and some closing braces are not visible
 * in this excerpt.
 */
505 processFetch(STG_NO_ARGS)
513 unpackFetch(&ga, &rga, &load);
515 fprintf(stderr, "Rcvd Fetch for (%x, %d, 0), Resume (%x, %d, %x) (load %d) \n",
516 ga.loc.gc.gtid, ga.loc.gc.slot,
517 rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, load);
520 closure = GALAlookup(&ga);
521 ip = (P_) INFO_PTR(closure);
523 if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
524 /* Forward the Fetch to someone else */
525 sendFetch(FETCHME_GA(closure), &rga, load);
526 } else if (rga.loc.gc.gtid == mytid) {
527 /* Our own FETCH forwarded back around to us */
528 P_ fmbq = GALAlookup(&rga);
530 /* We may have already discovered that the fetch target is our own. */
532 CommonUp(fmbq, closure);
533 (void) addWeight(&rga);
534 } else if (IS_BLACK_HOLE(ip)) {
535 /* This includes RBH's and FMBQ's */
538 if ((bf = AllocateHeap(FIXED_HS + BF_CLOSURE_SIZE(dummy))) == NULL) {
539 ReallyPerformThreadGC(FIXED_HS + BF_CLOSURE_SIZE(dummy), rtsFalse);
540 closure = GALAlookup(&ga);
541 bf = SAVE_Hp - (FIXED_HS + BF_CLOSURE_SIZE(dummy)) + 1;
543 ASSERT(GALAlookup(&rga) == NULL);
545 SET_BF_HDR(bf, BF_info, bogosity);
546 BF_NODE(bf) = closure;
547 BF_GTID(bf) = (W_) rga.loc.gc.gtid;
548 BF_SLOT(bf) = (W_) rga.loc.gc.slot;
549 BF_WEIGHT(bf) = (W_) rga.weight;
550 blockFetch(bf, closure);
553 fprintf(stderr, "Blocking Fetch (%x, %d, %x) on %#lx\n",
554 rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, closure);
558 /* The target of the FetchMe is some local graph */
562 if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
563 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
564 SAVE_Hp -= PACK_HEAP_REQUIRED;
565 closure = GALAlookup(&ga);
566 graph = PackNearbyGraph(closure, &size);
567 ASSERT(graph != NULL);
569 sendResume(&rga, size, graph);
574 @processFree@ unpacks a FREE message and adds the weights to our GAs.
/*
 * processFree: handles an incoming FREE message. The payload is unpacked
 * into gumPackBuffer (aliased as freeBuffer) and consumed two words at a
 * time as (weight, slot); the gtid is always mytid. Each reconstructed GA
 * is handed to addWeight. The fprintf calls are debug traces whose guards
 * are not visible in this excerpt.
 */
578 processFree(STG_NO_ARGS)
581 static W_ *freeBuffer;
585 freeBuffer = gumPackBuffer;
586 unpackFree(&nelem, freeBuffer);
588 fprintf(stderr, "Rcvd Free (%d GAs)\n", nelem / 2);
590 ga.loc.gc.gtid = mytid;
591 for (i = 0; i < nelem;) {
592 ga.weight = (unsigned) freeBuffer[i++];
593 ga.loc.gc.slot = (int) freeBuffer[i++];
595 fprintf(stderr,"Processing free (%x, %d, %x)\n", ga.loc.gc.gtid,
596 ga.loc.gc.slot, ga.weight);
598 (void) addWeight(&ga);
603 @processResume@ unpacks a RESUME message into the graph, filling in
604 the LA -> GA, and GA -> LA tables. Threads blocked on the original
605 @FetchMe@ (now a blocking queue) are awakened, and the blocking queue
606 is converted into an indirection. Finally it sends an ACK in response
607 which contains any newly allocated GAs.
/*
 * processResume: handles an incoming RESUME message from sender.
 * Steps on the visible lines:
 *  1. unpack the message into gumPackBuffer (aliased as packBuffer);
 *  2. if SAVE_Hp + packBuffer[0] would reach SAVE_HpLim, run a GC for
 *     packBuffer[0] words and move SAVE_Hp back by the same amount;
 *  3. only after the GC, return the weight of lga with addWeight and look
 *     up the resumed closure (old) with GALAlookup;
 *  4. when RTSflags.ParFlags.granSimStats is set and old is an FMBQ, walk
 *     its TSO queue and emit a GR_REPLY event;
 *  5. unpack the graph with UnpackGraph (asserted non-NULL) and, if old
 *     is still an FMBQ, common it up with the new graph;
 *  6. acknowledge with sendAck(sender, nGAs, gagamap).
 * NOTE(review): debug guards and the body of the TSO loop are not visible
 * in this excerpt.
 */
612 processResume(GLOBAL_TASK_ID sender)
616 static W_ *packBuffer;
622 packBuffer = gumPackBuffer;
623 unpackResume(&lga, &nelem, packBuffer);
626 fprintf(stderr, "Rcvd Resume for (%x, %d, %x)\n",
627 lga.loc.gc.gtid, lga.loc.gc.slot, lga.weight);
628 PrintPacket(packBuffer);
632 * We always unpack the incoming graph, even if we've received the
633 * requested node in some other data packet (and already awakened
634 * the blocking queue).
636 if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
637 ReallyPerformThreadGC(packBuffer[0], rtsFalse);
638 SAVE_Hp -= packBuffer[0];
641 /* Do this *after* GC; we don't want to release the object early! */
644 (void) addWeight(&lga);
646 old = GALAlookup(&lga);
648 if (RTSflags.ParFlags.granSimStats) {
651 if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE) {
652 for(tso = (P_) FMBQ_ENTRIES(old);
653 TSO_LINK(tso) != Nil_closure;
657 DumpGranEventAndNode(GR_REPLY, tso, old, taskIDtoPE(sender));
660 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
661 ASSERT(newGraph != NULL);
664 * Sometimes, unpacking will common up the resumee with the
665 * incoming graph, but if it hasn't, we'd better do so now.
668 if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE)
669 CommonUp(old, newGraph);
672 DebugPrintGAGAMap(gagamap, nGAs);
675 sendAck(sender, nGAs, gagamap);
679 @processSchedule@ unpacks a SCHEDULE message into the graph, filling
680 in the LA -> GA, and GA -> LA tables. The root of the graph is added to
681 the local spark queue. Finally it sends an ACK in response
682 which contains any newly allocated GAs.
/*
 * processSchedule: handles an incoming SCHEDULE message from sender.
 * The message is unpacked into gumPackBuffer (aliased as packBuffer);
 * packBuffer[0] is taken as the heap space required. If SAVE_Hp plus that
 * amount would reach SAVE_HpLim, a GC is run and SAVE_Hp is moved back.
 * The graph is then unpacked with UnpackGraph (asserted non-NULL), handed
 * to Spark with rtsFalse as the second argument, and the sender is
 * acknowledged with sendAck. Trace output sits under SCHEDULE_DEBUG.
 * NOTE(review): what is done with the result of Spark (success) is not
 * visible in this excerpt.
 */
686 processSchedule(GLOBAL_TASK_ID sender)
691 static W_ *packBuffer;
696 packBuffer = gumPackBuffer; /* HWL */
697 unpackSchedule(&nelem, packBuffer);
699 #ifdef SCHEDULE_DEBUG
700 fprintf(stderr, "Rcvd Schedule\n");
701 PrintPacket(packBuffer);
705 * For now, the graph is a closure to be sparked as an advisory
706 * spark, but in future it may be a complete spark with
707 * required/advisory status, priority etc.
710 space_required = packBuffer[0];
711 if (SAVE_Hp + space_required >= SAVE_HpLim) {
712 ReallyPerformThreadGC(space_required, rtsFalse);
713 SAVE_Hp -= space_required;
715 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
716 ASSERT(newGraph != NULL);
717 success = Spark(newGraph, rtsFalse);
720 #ifdef SCHEDULE_DEBUG
721 DebugPrintGAGAMap(gagamap, nGAs);
725 sendAck(sender, nGAs, gagamap);
731 @processAck@ unpacks an ACK, and uses the GAGA map to convert RBH's
732 (which represent shared thunks that have been shipped) into fetch-mes
/*
 * processAck: handles an incoming ACK message. The GA pairs are unpacked
 * into a local array of MAX_GAS * 2 entries and visited two at a time
 * (gaga is the old GA, gaga + 1 the new GA). For each pair both GAs are
 * looked up with GALAlookup; one branch installs the new GA on the old
 * closure with setRemoteGA and converts it with convertToFetchMe, the
 * other returns the weight of the new GA with addWeight. The weight of the
 * old GA is returned with addWeight as well.
 * NOTE(review): the branch condition and the handling of an already-known
 * closure are not visible in this excerpt; unpackAck is not given the
 * capacity of gagamap, so the sender is trusted to stay within MAX_GAS
 * pairs -- confirm.
 */
737 processAck(STG_NO_ARGS)
742 globalAddr gagamap[MAX_GAS * 2];
744 unpackAck(&nGAs, gagamap);
747 fprintf(stderr, "Rcvd Ack (%d pairs)\n", nGAs);
748 DebugPrintGAGAMap(gagamap, nGAs);
752 * For each (oldGA, newGA) pair, set the GA of the corresponding
753 * thunk to the newGA, convert the thunk to a FetchMe, and return
754 * the weight from the oldGA.
756 for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
757 P_ old = GALAlookup(gaga);
758 P_ new = GALAlookup(gaga + 1);
761 /* We don't have this closure, so we make a fetchme for it */
762 globalAddr *ga = setRemoteGA(old, gaga + 1, rtsTrue);
764 convertToFetchMe(old, ga);
767 * Oops...we've got this one already; update the RBH to
768 * point to the object we already know about, whatever it
774 * Increase the weight of the object by the amount just
775 * received in the second part of the ACK pair.
777 (void) addWeight(gaga + 1);
779 (void) addWeight(gaga);
784 \section{GUM Message Processor}
786 @processMessages@ processes any messages that have arrived, calling
787 appropriate routines depending on the message tag
788 (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
789 present and performs a blocking receive! During profiling it
790 busy-waits in order to record idle time.
/*
 * processMessages: message dispatch loop. Each iteration fetches a packet
 * with GetPacket, splits it into opcode and sending task with
 * get_opcode_and_sender, and dispatches on the opcode; the loop repeats
 * while PacketsWaiting() reports more messages. When
 * RTSflags.CcFlags.doCostCentres is set, CCC is switched to CC_IDLE and
 * the code spins until a packet is waiting, then switches back to CC_MSG.
 * Visible dispatch arms: one calls EXIT(EXIT_SUCCESS), one calls
 * processSchedule(task), and the fallback reports an unexpected opcode on
 * stderr. NOTE(review): the remaining case labels and the preprocessor
 * guards around the cost-centre code are not visible in this excerpt.
 */
794 processMessages(STG_NO_ARGS)
800 /* Temporary Test Definitions */
804 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
807 if (RTSflags.CcFlags.doCostCentres) {
808 CCC = (CostCentre)STATIC_CC_REF(CC_IDLE);
811 while (!PacketsWaiting())
814 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
817 packet = GetPacket(); /* Get next message; block until one available */
820 get_opcode_and_sender(packet, &opcode, &task);
825 EXIT(EXIT_SUCCESS); /* The computation has been completed by someone
850 processSchedule(task);
854 /* Anything we're not prepared to deal with. */
855 fprintf(stderr, "Task %x: Unexpected opcode %x from %x\n",
856 mytid, opcode, task);
861 } while (PacketsWaiting()); /* While there are messages: process them */
863 } /* processMessages */
866 \section{Exception Handlers}
869 @Comms_Harness_Exception@ is an exception handler that tests out the different
/*
 * Comms_Harness_Exception: test harness handler that exercises the
 * unpack and send routines for each message kind found in packet.
 * Visible arms:
 *  - Fetch: unpacks the two GAs and the load, prints them, then answers
 *    with a Resume carrying eleven test words (0..10);
 *  - Ack: unpacks the GA pairs into a local array and prints them;
 *  - Fish: unpacks and prints the fish, then answers with a Schedule
 *    carrying six test words (40..45);
 *  - Schedule: unpacks and prints six words of test data;
 *  - anything else: reported on stderr unless GlobalStopPending is set.
 * NOTE(review): the Schedule arm passes &testData to unpackSchedule while
 * the Fish arm passes testData to sendSchedule; the declaration of
 * testData is not visible here -- confirm the intended argument type.
 * The case labels themselves are also not visible in this excerpt.
 */
874 Comms_Harness_Exception(packet)
879 /* GLOBAL_TASK_ID sender = Sender_Task(packet); */
880 OPCODE opcode = Opcode(packet);
883 /* fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender); */
894 get_opcode_and_sender(packet,&opcode,&task);
895 fprintf(stderr,"Task %x: Got Fetch from %x\n", mytid, task );
896 unpackFetch(&ga,&bqga,&load);
897 fprintf(stderr,"In PE, Fetch = (%x, %d, %x) (%x, %d, %x) %d \n",
898 ga.loc.gc.gtid, ga.loc.gc.slot, ga.weight,
899 bqga.loc.gc.gtid, bqga.loc.gc.slot, bqga.weight, load);
900 /*Send Resume in Response*/
901 for (i=0; i <= 10; ++i) data[i] = i;
902 sendResume(&bqga,11,data);
909 globalAddr gagamap[MAX_GAS*2];
911 get_opcode_and_sender(packet,&opcode,&task);
912 fprintf(stderr,"Task %x: Got Ack from %x\n", mytid, task );
913 unpackAck(&nGAs,gagamap);
915 DebugPrintGAGAMap(gagamap,nGAs);
922 GLOBAL_TASK_ID origPE;
923 int age, history, hunger;
927 get_opcode_and_sender(packet,&opcode,&task);
928 fprintf(stderr,"Task %x: Got FISH from %x\n", mytid, task );
929 unpackFish(&origPE, &age, &history, &hunger);
930 fprintf(stderr,"In PE, FISH.origPE = %x age = %d history = %d hunger = %d\n",
931 origPE, age, history, hunger);
933 testGA.loc.gc.gtid = mytid; testGA.loc.gc.slot = 52; testGA.weight = 1024;
934 for (i=0; i <= 5; ++i) testData[i] = 40+i;
935 sendSchedule(origPE,6,testData);
940 { /* Test variables */
944 get_opcode_and_sender(packet,&opcode,&task);
945 fprintf(stderr,"Task %x: Got SCHEDULE from %x\n", mytid, task );
946 unpackSchedule(&nelem, &testData);
947 fprintf(stderr,"In PE, nelem = %d \n", nelem);
948 for (i=0; i <= 5; ++i) fprintf(stderr,"tData[%d] = %d ",i,testData[i]);
949 fprintf(stderr,"\n");
953 /* Anything we're not prepared to deal with. Note that ALL
954 * opcodes are discarded during termination -- this helps
955 * prevent bizarre race conditions.
958 if (!GlobalStopPending)
960 GLOBAL_TASK_ID ErrorTask;
963 get_opcode_and_sender(packet,&opcode,&ErrorTask);
964 fprintf(stderr,"Task %x: Unexpected opcode %x from %x in Comms Harness\n",
965 mytid, opcode, ErrorTask );
975 @STG_Exception@ handles real communication exceptions
/*
 * STG_Exception: handler for real communication exceptions. It reads the
 * sender and opcode from packet, prints them with the opcode name from
 * GetOpName, and dispatches on the opcode. The fallback arm reports an
 * unexpected opcode on stderr unless GlobalStopPending is set, in which
 * case the packet is silently dropped. NOTE(review): the other dispatch
 * arms are not visible in this excerpt.
 */
979 STG_Exception(packet)
982 GLOBAL_TASK_ID sender = Sender_Task(packet);
983 OPCODE opcode = Opcode(packet);
985 fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender);
993 /* Anything we're not prepared to deal with. Note that ALL opcodes are discarded
994 during termination -- this helps prevent bizarre race conditions.
997 if (!GlobalStopPending)
999 GLOBAL_TASK_ID ErrorTask;
1002 get_opcode_and_sender(packet,&opcode,&ErrorTask);
1003 fprintf(stderr,"Task %x: Unexpected opcode %x from %x in STG_Exception\n",
1004 mytid, opcode, ErrorTask );
1012 \section{Miscellaneous Functions}
1014 @ChoosePE@ selects a GlobalTaskId from the array of PEs 'at random'.
1015 Important properties:
1016 o it varies during execution, even if the PE is idle
1017 o it's different for each PE
1018 o we never send a fish to ourselves
/*
 * choosePE: picks an index into PEs as lrand48() % nPEs; if that entry is
 * this task (mytid) it moves on to the next index, wrapping with % nPEs.
 * NOTE(review): with nPEs == 1 the fallback index equals the original one,
 * so the chosen PE is still mytid; with nPEs == 0 the modulo divides by
 * zero. Confirm that callers only run with at least two PEs. The return
 * statement is not visible in this excerpt (presumably PEs[temp]).
 */
1021 extern long lrand48 (STG_NO_ARGS);
1024 choosePE(STG_NO_ARGS)
1028 temp = lrand48() % nPEs;
1029 if (PEs[temp] == mytid) { /* Never send a FISH to yourself */
1030 temp = (temp + 1) % nPEs;
1036 @WaitForTermination@ enters an infinite loop waiting for the
1037 termination sequence to be completed.
/*
 * WaitForTermination: receives a packet with GetPacket. NOTE(review): the
 * surrounding loop and the handling of the received packet are not visible
 * in this excerpt.
 */
1041 WaitForTermination(STG_NO_ARGS)
1044 PACKET p = GetPacket();
/*
 * DebugPrintGAGAMap: prints nGAs pairs from gagamap to stderr, one line
 * per pair, as (gtid, slot, weight) -> (gtid, slot, weight). The pointer
 * advances by two entries per iteration, so gagamap must hold 2 * nGAs
 * entries.
 */
1053 DebugPrintGAGAMap(gagamap, nGAs)
1054 globalAddr *gagamap;
1059 for (i = 0; i < nGAs; ++i, gagamap += 2)
1060 fprintf(stderr, "gagamap[%d] = (%x, %d, %x) -> (%x, %d, %x)\n", i,
1061 gagamap[0].loc.gc.gtid, gagamap[0].loc.gc.slot, gagamap[0].weight,
1062 gagamap[1].loc.gc.gtid, gagamap[1].loc.gc.slot, gagamap[1].weight);
/*
 * prepareFreeMsgBuffers: sets up the per-PE buffers used to batch FREE
 * messages. On the first call (freeMsgIndex still NULL) it allocates an
 * index array of nPEs ints, an array of nPEs buffer pointers, and one
 * buffer of RTSflags.ParFlags.packBufferSize words per PE. On every call
 * it resets all indices to 0, i.e. marks every buffer as empty.
 * NOTE(review): the pointer array is sized with sizeof(long *) while its
 * elements are assigned as P_; confirm the two have the same size.
 */
1069 static PP_ freeMsgBuffer = NULL;
1070 static int *freeMsgIndex = NULL;
1073 prepareFreeMsgBuffers(STG_NO_ARGS)
1077 /* Allocate the freeMsg buffers just once and then hang onto them. */
1079 if (freeMsgIndex == NULL) {
1081 freeMsgIndex = (int *) stgMallocBytes(nPEs * sizeof(int), "prepareFreeMsgBuffers (Index)");
1082 freeMsgBuffer = (PP_) stgMallocBytes(nPEs * sizeof(long *), "prepareFreeMsgBuffers (Buffer)");
1084 for(i = 0; i < nPEs; i++) {
1086 freeMsgBuffer[i] = (P_) stgMallocWords(RTSflags.ParFlags.packBufferSize,
1087 "prepareFreeMsgBuffers (Buffer #i)");
1092 /* Initialize the freeMsg buffer pointers to point to the start of their buffers */
1093 for (i = 0; i < nPEs; i++)
1094 freeMsgIndex[i] = 0;
/*
 * freeRemoteGA: records that the remote GA ga is no longer referenced by
 * appending its (weight, slot) pair to the FREE buffer of PE index pe.
 * ga must have no local closure (asserted with GALAlookup). If adding two
 * more words would reach RTSflags.ParFlags.packBufferSize, the buffer is
 * first flushed with sendFree to ga->loc.gc.gtid. After the pair is
 * stored, the fields of ga are overwritten with the marker values
 * 0x0f0f0f0f, 0x666 and 0xdeaddead. NOTE(review): the reset of i after
 * the flush and any guard around the marker writes are not visible in
 * this excerpt.
 */
1098 freeRemoteGA(pe, ga)
1104 ASSERT(GALAlookup(ga) == NULL);
1106 if ((i = freeMsgIndex[pe]) + 2 >= RTSflags.ParFlags.packBufferSize) {
1108 fprintf(stderr, "Filled a free message buffer\n");
1110 sendFree(ga->loc.gc.gtid, i, freeMsgBuffer[pe]);
1113 freeMsgBuffer[pe][i++] = (W_) ga->weight;
1114 freeMsgBuffer[pe][i++] = (W_) ga->loc.gc.slot;
1115 freeMsgIndex[pe] = i;
1117 ga->weight = 0x0f0f0f0f;
1118 ga->loc.gc.gtid = 0x666;
1119 ga->loc.gc.slot = 0xdeaddead;
/*
 * sendFreeMessages: flushes the batched FREE data. For every PE index i
 * whose buffer holds at least one word (freeMsgIndex[i] > 0), the buffer
 * contents are sent to PEs[i] with sendFree. The indices are not reset on
 * the visible lines; prepareFreeMsgBuffers does that.
 */
1124 sendFreeMessages(STG_NO_ARGS)
1128 for (i = 0; i < nPEs; i++) {
1129 if (freeMsgIndex[i] > 0)
1130 sendFree(PEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
1134 #endif /* PAR -- whole file */