1 /****************************************************************
3 * High Level Communications Routines (HLComms.lc) *
5 * Contains the high-level routines (i.e. communication *
6 * subsystem independent) used by GUM *
7 * (c) The Parade/AQUA Projects, Glasgow University, 1995 *
8 * Phil Trinder, Glasgow University, 12 December 1994 *
10 *****************************************************************/
12 #ifdef PAR /* whole file */
14 #define NON_POSIX_SOURCE /* so says Solaris */
20 \section{GUM Message Sending and Unpacking Functions}
23 @SendFetch@ packs the two global addresses and a load into a message +
27 static W_ *gumPackBuffer;
30 InitMoreBuffers(STG_NO_ARGS)
33 = (W_ *) stgMallocWords(RTSflags.ParFlags.packBufferSize, "initMoreBuffers");
37 sendFetch(rga, lga, load)
38 globalAddr *rga, *lga;
41 CostCentre Save_CCC = CCC;
43 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
46 ASSERT(rga->weight > 0 && lga->weight > 0);
48 fprintf(stderr, "Sending Fetch (%x, %d, 0), load = %d\n",
49 rga->loc.gc.gtid, rga->loc.gc.slot, load);
51 SendOpV(PP_FETCH, rga->loc.gc.gtid, 6,
52 (W_) rga->loc.gc.gtid, (W_) rga->loc.gc.slot,
53 (W_) lga->weight, (W_) lga->loc.gc.gtid, (W_) lga->loc.gc.slot, (W_) load);
59 @unpackFetch@ unpacks a FETCH message into two Global addresses and a load figure.
64 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
70 lga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[0];
71 lga->loc.gc.slot = (int) buf[1];
73 rga->weight = (unsigned) buf[2];
74 rga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[3];
75 rga->loc.gc.slot = (int) buf[4];
79 ASSERT(rga->weight > 0);
83 @SendResume@ packs the remote blocking queue's GA and data into a message
88 sendResume(rga, nelem, data)
93 CostCentre Save_CCC = CCC;
95 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
100 fprintf(stderr, "Sending Resume for (%x, %d, %x)\n",
101 rga->loc.gc.gtid, rga->loc.gc.slot, rga->weight);
104 SendOpNV(PP_RESUME, rga->loc.gc.gtid, nelem, data, 2,
105 (W_) rga->weight, (W_) rga->loc.gc.slot);
111 @blockFetch@ blocks a @BlockedFetch@ node on some kind of black hole.
115 blockFetch(P_ bf, P_ bh)
117 switch (INFO_TYPE(INFO_PTR(bh))) {
119 BF_LINK(bf) = Prelude_Z91Z93_closure;
120 SET_INFO_PTR(bh, BQ_info);
121 BQ_ENTRIES(bh) = (W_) bf;
123 #ifdef GC_MUT_REQUIRED
125 * If we modify a black hole in the old generation, we have to
126 * make sure it goes on the mutables list
129 if (bh <= StorageMgrInfo.OldLim) {
130 MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
131 StorageMgrInfo.OldMutables = bh;
133 MUT_LINK(bh) = MUT_NOT_LINKED;
137 BF_LINK(bf) = (P_) BQ_ENTRIES(bh);
138 BQ_ENTRIES(bh) = (W_) bf;
141 BF_LINK(bf) = (P_) FMBQ_ENTRIES(bh);
142 FMBQ_ENTRIES(bh) = (W_) bf;
144 case INFO_SPEC_RBH_TYPE:
145 BF_LINK(bf) = (P_) SPEC_RBH_BQ(bh);
146 SPEC_RBH_BQ(bh) = (W_) bf;
148 case INFO_GEN_RBH_TYPE:
149 BF_LINK(bf) = (P_) GEN_RBH_BQ(bh);
150 GEN_RBH_BQ(bh) = (W_) bf;
153 fprintf(stderr, "Panic: thought %#lx was a black hole (IP %#lx)\n",
154 (W_) bh, INFO_PTR(bh));
160 @processFetches@ constructs and sends resume messages for every
161 @BlockedFetch@ which is ready to be awakened.
164 extern P_ PendingFetches;
175 for (bf = PendingFetches; bf != Prelude_Z91Z93_closure; bf = next) {
179 * Find the target at the end of the indirection chain, and
180 * process it in much the same fashion as the original target
181 * of the fetch. Though we hope to find graph here, we could
182 * find a black hole (of any flavor) or even a FetchMe.
184 closure = BF_NODE(bf);
185 while (IS_INDIRECTION(INFO_PTR(closure)))
186 closure = (P_) IND_CLOSURE_PTR(closure);
187 ip = (P_) INFO_PTR(closure);
189 if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
190 /* Forward the Fetch to someone else */
191 rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
192 rga.loc.gc.slot = (int) BF_SLOT(bf);
193 rga.weight = (unsigned) BF_WEIGHT(bf);
195 sendFetch(FETCHME_GA(closure), &rga, 0 /* load */);
196 } else if (IS_BLACK_HOLE(ip)) {
197 BF_NODE(bf) = closure;
198 blockFetch(bf, closure);
200 /* We now have some local graph to send back */
204 if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
206 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
207 SAVE_Hp -= PACK_HEAP_REQUIRED;
210 closure = BF_NODE(bf);
211 graph = PackNearbyGraph(closure, &size);
212 ASSERT(graph != NULL);
214 rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
215 rga.loc.gc.slot = (int) BF_SLOT(bf);
216 rga.weight = (unsigned) BF_WEIGHT(bf);
218 sendResume(&rga, size, graph);
221 PendingFetches = Prelude_Z91Z93_closure;
226 @unpackResume@ unpacks a Resume message into two Global addresses and a data array.
231 unpackResume(globalAddr *lga, int *nelem, W_ *data)
236 lga->weight = (unsigned) buf[0];
237 lga->loc.gc.gtid = mytid;
238 lga->loc.gc.slot = (int) buf[1];
240 *nelem = (int) buf[2];
241 GetArgs(data, *nelem);
245 @SendAck@ packs the global address being acknowledged, together with
246 an array of global addresses for any closures shipped and sends them.
250 sendAck(task, ngas, gagamap)
258 CostCentre Save_CCC = CCC;
260 buffer = (long *) gumPackBuffer;
262 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
265 for(i = 0, p = buffer; i < ngas; i++, p += 6) {
266 ASSERT(gagamap[1].weight > 0);
267 p[0] = (long) gagamap->weight;
268 p[1] = (long) gagamap->loc.gc.gtid;
269 p[2] = (long) gagamap->loc.gc.slot;
271 p[3] = (long) gagamap->weight;
272 p[4] = (long) gagamap->loc.gc.gtid;
273 p[5] = (long) gagamap->loc.gc.slot;
277 fprintf(stderr,"Sending Ack (%d pairs) to %x\n", ngas, task);
279 SendOpN(PP_ACK, task, p - buffer, buffer);
285 @unpackAck@ unpacks an Acknowledgement message into a Global address,
286 a count of the number of global addresses following and a map of
292 unpackAck(int *ngas, globalAddr *gagamap)
297 GetArgs(&GAarraysize, 1);
299 *ngas = GAarraysize / 6;
301 while (GAarraysize > 0) {
303 gagamap->weight = (unsigned) buf[0];
304 gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[1];
305 gagamap->loc.gc.slot = (int) buf[2];
307 gagamap->weight = (unsigned) buf[3];
308 gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[4];
309 gagamap->loc.gc.slot = (int) buf[5];
310 ASSERT(gagamap->weight > 0);
317 @SendFish@ packs the global address being acknowledged, together with
318 an array of global addresses for any closures shipped and sends them.
322 sendFish(destPE, origPE, age, history, hunger)
323 GLOBAL_TASK_ID destPE, origPE;
324 int age, history, hunger;
326 CostCentre Save_CCC = CCC;
328 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
332 fprintf(stderr,"Sending Fish to %lx\n", destPE);
334 SendOpV(PP_FISH, destPE, 4, (W_) origPE, (W_) age, (W_) history, (W_) hunger);
342 @unpackFish@ unpacks a FISH message into the global task id of the
343 originating PE and 3 data fields: the age, history and hunger of the
344 fish. The history + hunger are not currently used.
349 unpackFish(GLOBAL_TASK_ID *origPE, int *age, int *history, int *hunger)
355 *origPE = (GLOBAL_TASK_ID) buf[0];
357 *history = (int) buf[2];
358 *hunger = (int) buf[3];
362 @SendFree@ sends (weight, slot) pairs for GAs that we no longer need references
367 sendFree(pe, nelem, data)
372 CostCentre Save_CCC = CCC;
374 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
378 fprintf(stderr, "Sending Free (%d GAs) to %x\n", nelem / 2, pe);
380 SendOpN(PP_FREE, pe, nelem, data);
387 @unpackFree@ unpacks a FREE message into the amount of data shipped and
393 unpackFree(int *nelem, W_ *data)
398 *nelem = (int) buf[0];
399 GetArgs(data, *nelem);
403 @SendSchedule@ sends a closure to be evaluated in response to a Fish
404 message. The message is directed to the PE that originated the Fish
405 (origPE), and includes the packed closure (data) along with its size
411 sendSchedule(origPE, nelem, data)
412 GLOBAL_TASK_ID origPE;
417 CostCentre Save_CCC = CCC;
419 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
422 #ifdef SCHEDULE_DEBUG
424 fprintf(stderr, "Sending Schedule to %x\n", origPE);
427 SendOpN(PP_SCHEDULE, origPE, nelem, data);
433 @unpackSchedule@ unpacks a SCHEDULE message into the Global address of
434 the closure shipped, the amount of data shipped (nelem) and the data
440 unpackSchedule(int *nelem, W_ *data)
445 *nelem = (int) buf[0];
446 GetArgs(data, *nelem);
450 \section{Message-Processing Functions}
452 The following routines process incoming GUM messages. Often reissuing
453 messages in response.
455 @processFish@ unpacks a fish message, reissuing it if it's our own,
456 sending work if we have it or sending it onwards otherwise.
460 processFish(STG_NO_ARGS)
462 GLOBAL_TASK_ID origPE;
463 int age, history, hunger;
465 unpackFish(&origPE, &age, &history, &hunger);
467 if (origPE == mytid) {
472 while ((spark = FindLocalSpark(rtsTrue)) != NULL) {
476 if ((graph = PackNearbyGraph(spark, &size)) == NULL) {
477 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
478 SAVE_Hp -= PACK_HEAP_REQUIRED;
479 /* Now go back and try again */
481 sendSchedule(origPE, size, graph);
487 /* We have no sparks to give */
488 if (age < FISH_LIFE_EXPECTANCY)
489 sendFish(choosePE(), origPE,
490 (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
492 /* Send it home to die */
494 sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
500 @processFetch@ either returns the requested data (if available)
501 or blocks the remote blocking queue on a black hole (if not).
505 processFetch(STG_NO_ARGS)
513 unpackFetch(&ga, &rga, &load);
515 fprintf(stderr, "Rcvd Fetch for (%x, %d, 0), Resume (%x, %d, %x) (load %d) \n",
516 ga.loc.gc.gtid, ga.loc.gc.slot,
517 rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, load);
520 closure = GALAlookup(&ga);
521 ip = (P_) INFO_PTR(closure);
523 if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
524 /* Forward the Fetch to someone else */
525 sendFetch(FETCHME_GA(closure), &rga, load);
526 } else if (rga.loc.gc.gtid == mytid) {
527 /* Our own FETCH forwarded back around to us */
528 P_ fmbq = GALAlookup(&rga);
530 /* We may have already discovered that the fetch target is our own. */
532 CommonUp(fmbq, closure);
533 (void) addWeight(&rga);
534 } else if (IS_BLACK_HOLE(ip)) {
535 /* This includes RBH's and FMBQ's */
538 if ((bf = AllocateHeap(FIXED_HS + BF_CLOSURE_SIZE(dummy))) == NULL) {
539 ReallyPerformThreadGC(FIXED_HS + BF_CLOSURE_SIZE(dummy), rtsFalse);
540 closure = GALAlookup(&ga);
541 bf = SAVE_Hp - (FIXED_HS + BF_CLOSURE_SIZE(dummy)) + 1;
543 ASSERT(GALAlookup(&rga) == NULL);
545 SET_BF_HDR(bf, BF_info, bogosity);
546 BF_NODE(bf) = closure;
547 BF_GTID(bf) = (W_) rga.loc.gc.gtid;
548 BF_SLOT(bf) = (W_) rga.loc.gc.slot;
549 BF_WEIGHT(bf) = (W_) rga.weight;
550 blockFetch(bf, closure);
553 fprintf(stderr, "Blocking Fetch (%x, %d, %x) on %#lx\n",
554 rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, closure);
558 /* The target of the FetchMe is some local graph */
562 if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
563 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
564 SAVE_Hp -= PACK_HEAP_REQUIRED;
565 closure = GALAlookup(&ga);
566 graph = PackNearbyGraph(closure, &size);
567 ASSERT(graph != NULL);
569 sendResume(&rga, size, graph);
574 @processFree@ unpacks a FREE message and adds the weights to our GAs.
578 processFree(STG_NO_ARGS)
581 static W_ *freeBuffer;
585 freeBuffer = gumPackBuffer;
586 unpackFree(&nelem, freeBuffer);
588 fprintf(stderr, "Rcvd Free (%d GAs)\n", nelem / 2);
590 ga.loc.gc.gtid = mytid;
591 for (i = 0; i < nelem;) {
592 ga.weight = (unsigned) freeBuffer[i++];
593 ga.loc.gc.slot = (int) freeBuffer[i++];
595 fprintf(stderr,"Processing free (%x, %d, %x)\n", ga.loc.gc.gtid,
596 ga.loc.gc.slot, ga.weight);
598 (void) addWeight(&ga);
603 @processResume@ unpacks a RESUME message into the graph, filling in
604 the LA -> GA, and GA -> LA tables. Threads blocked on the original
605 @FetchMe@ (now a blocking queue) are awakened, and the blocking queue
606 is converted into an indirection. Finally it sends an ACK in response
607 which contains any newly allocated GAs.
612 processResume(GLOBAL_TASK_ID sender)
616 static W_ *packBuffer;
622 packBuffer = gumPackBuffer;
623 unpackResume(&lga, &nelem, packBuffer);
626 fprintf(stderr, "Rcvd Resume for (%x, %d, %x)\n",
627 lga.loc.gc.gtid, lga.loc.gc.slot, lga.weight);
628 PrintPacket(packBuffer);
632 * We always unpack the incoming graph, even if we've received the
633 * requested node in some other data packet (and already awakened
634 * the blocking queue).
636 if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
637 ReallyPerformThreadGC(packBuffer[0], rtsFalse);
638 SAVE_Hp -= packBuffer[0];
641 /* Do this *after* GC; we don't want to release the object early! */
644 (void) addWeight(&lga);
646 old = GALAlookup(&lga);
648 if (RTSflags.ParFlags.granSimStats) {
651 if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE) {
652 for(tso = (P_) FMBQ_ENTRIES(old);
653 TSO_LINK(tso) != Prelude_Z91Z93_closure;
657 /* DumpGranEventAndNode(GR_REPLY, tso, old, taskIDtoPE(sender)); */
658 DumpRawGranEvent(CURRENT_PROC,taskIDtoPE(sender),GR_REPLY,
662 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
663 ASSERT(newGraph != NULL);
666 * Sometimes, unpacking will common up the resumee with the
667 * incoming graph, but if it hasn't, we'd better do so now.
670 if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE)
671 CommonUp(old, newGraph);
674 DebugPrintGAGAMap(gagamap, nGAs);
677 sendAck(sender, nGAs, gagamap);
681 @processSchedule@ unpacks a SCHEDULE message into the graph, filling
682 in the LA -> GA, and GA -> LA tables. The root of the graph is added to
683 the local spark queue. Finally it sends an ACK in response
684 which contains any newly allocated GAs.
688 processSchedule(GLOBAL_TASK_ID sender)
693 static W_ *packBuffer;
698 packBuffer = gumPackBuffer; /* HWL */
699 unpackSchedule(&nelem, packBuffer);
701 #ifdef SCHEDULE_DEBUG
702 fprintf(stderr, "Rcvd Schedule\n");
703 PrintPacket(packBuffer);
707 * For now, the graph is a closure to be sparked as an advisory
708 * spark, but in future it may be a complete spark with
709 * required/advisory status, priority etc.
712 space_required = packBuffer[0];
713 if (SAVE_Hp + space_required >= SAVE_HpLim) {
714 ReallyPerformThreadGC(space_required, rtsFalse);
715 SAVE_Hp -= space_required;
717 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
718 ASSERT(newGraph != NULL);
719 success = Spark(newGraph, rtsFalse);
722 #ifdef SCHEDULE_DEBUG
723 DebugPrintGAGAMap(gagamap, nGAs);
727 sendAck(sender, nGAs, gagamap);
733 @processAck@ unpacks an ACK, and uses the GAGA map to convert RBH's
734 (which represent shared thunks that have been shipped) into fetch-mes
739 processAck(STG_NO_ARGS)
744 globalAddr gagamap[MAX_GAS * 2];
746 unpackAck(&nGAs, gagamap);
749 fprintf(stderr, "Rcvd Ack (%d pairs)\n", nGAs);
750 DebugPrintGAGAMap(gagamap, nGAs);
754 * For each (oldGA, newGA) pair, set the GA of the corresponding
755 * thunk to the newGA, convert the thunk to a FetchMe, and return
756 * the weight from the oldGA.
758 for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
759 P_ old = GALAlookup(gaga);
760 P_ new = GALAlookup(gaga + 1);
763 /* We don't have this closure, so we make a fetchme for it */
764 globalAddr *ga = setRemoteGA(old, gaga + 1, rtsTrue);
766 convertToFetchMe(old, ga);
769 * Oops...we've got this one already; update the RBH to
770 * point to the object we already know about, whatever it
776 * Increase the weight of the object by the amount just
777 * received in the second part of the ACK pair.
779 (void) addWeight(gaga + 1);
781 (void) addWeight(gaga);
786 \section{GUM Message Processor}
788 @processMessages@ processes any messages that have arrived, calling
789 appropriate routines depending on the message tag
790 (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
791 present and performs a blocking receive! During profiling it
792 busy-waits in order to record idle time.
796 processMessages(STG_NO_ARGS)
802 /* Temporary Test Definitions */
806 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
809 if (RTSflags.CcFlags.doCostCentres) {
810 CCC = (CostCentre)STATIC_CC_REF(CC_IDLE);
813 while (!PacketsWaiting())
816 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
819 packet = GetPacket(); /* Get next message; block until one available */
822 get_opcode_and_sender(packet, &opcode, &task);
827 EXIT(EXIT_SUCCESS); /* The computation has been completed by someone
852 processSchedule(task);
856 /* Anything we're not prepared to deal with. */
857 fprintf(stderr, "Task %x: Unexpected opcode %x from %x\n",
858 mytid, opcode, task);
863 } while (PacketsWaiting()); /* While there are messages: process them */
865 } /* processMessages */
868 \section{Exception Handlers}
871 @Comms_Harness_Exception@ is an exception handler that tests out the different
876 Comms_Harness_Exception(packet)
881 /* GLOBAL_TASK_ID sender = Sender_Task(packet); */
882 OPCODE opcode = Opcode(packet);
885 /* fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender); */
896 get_opcode_and_sender(packet,&opcode,&task);
897 fprintf(stderr,"Task %x: Got Fetch from %x\n", mytid, task );
898 unpackFetch(&ga,&bqga,&load);
899 fprintf(stderr,"In PE, Fetch = (%x, %d, %x) (%x, %d, %x) %d \n",
900 ga.loc.gc.gtid, ga.loc.gc.slot, ga.weight,
901 bqga.loc.gc.gtid, bqga.loc.gc.slot, bqga.weight, load);
902 /*Send Resume in Response*/
903 for (i=0; i <= 10; ++i) data[i] = i;
904 sendResume(&bqga,11,data);
911 globalAddr gagamap[MAX_GAS*2];
913 get_opcode_and_sender(packet,&opcode,&task);
914 fprintf(stderr,"Task %x: Got Ack from %x\n", mytid, task );
915 unpackAck(&nGAs,gagamap);
917 DebugPrintGAGAMap(gagamap,nGAs);
924 GLOBAL_TASK_ID origPE;
925 int age, history, hunger;
929 get_opcode_and_sender(packet,&opcode,&task);
930 fprintf(stderr,"Task %x: Got FISH from %x\n", mytid, task );
931 unpackFish(&origPE, &age, &history, &hunger);
932 fprintf(stderr,"In PE, FISH.origPE = %x age = %d history = %d hunger = %d\n",
933 origPE, age, history, hunger);
935 testGA.loc.gc.gtid = mytid; testGA.loc.gc.slot = 52; testGA.weight = 1024;
936 for (i=0; i <= 5; ++i) testData[i] = 40+i;
937 sendSchedule(origPE,6,testData);
942 { /* Test variables */
946 get_opcode_and_sender(packet,&opcode,&task);
947 fprintf(stderr,"Task %x: Got SCHEDULE from %x\n", mytid, task );
948 unpackSchedule(&nelem, &testData);
949 fprintf(stderr,"In PE, nelem = %d \n", nelem);
950 for (i=0; i <= 5; ++i) fprintf(stderr,"tData[%d] = %d ",i,testData[i]);
951 fprintf(stderr,"\n");
955 /* Anything we're not prepared to deal with. Note that ALL
956 * opcodes are discarded during termination -- this helps
957 * prevent bizarre race conditions.
960 if (!GlobalStopPending)
962 GLOBAL_TASK_ID ErrorTask;
965 get_opcode_and_sender(packet,&opcode,&ErrorTask);
966 fprintf(stderr,"Task %x: Unexpected opcode %x from %x in Comms Harness\n",
967 mytid, opcode, ErrorTask );
977 @STG_Exception@ handles real communication exceptions
981 STG_Exception(packet)
984 GLOBAL_TASK_ID sender = Sender_Task(packet);
985 OPCODE opcode = Opcode(packet);
987 fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender);
995 /* Anything we're not prepared to deal with. Note that ALL opcodes are discarded
996 during termination -- this helps prevent bizarre race conditions.
999 if (!GlobalStopPending)
1001 GLOBAL_TASK_ID ErrorTask;
1004 get_opcode_and_sender(packet,&opcode,&ErrorTask);
1005 fprintf(stderr,"Task %x: Unexpected opcode %x from %x in STG_Exception\n",
1006 mytid, opcode, ErrorTask );
1014 \section{Miscellaneous Functions}
1016 @ChoosePE@ selects a GlobalTaskId from the array of PEs 'at random'.
1017 Important properties:
1018 o it varies during execution, even if the PE is idle
1019 o it's different for each PE
1020 o we never send a fish to ourselves
1023 extern long lrand48 (STG_NO_ARGS);
1026 choosePE(STG_NO_ARGS)
1030 temp = lrand48() % nPEs;
1031 if (PEs[temp] == mytid) { /* Never send a FISH to yourself */
1032 temp = (temp + 1) % nPEs;
1038 @WaitForTermination@ enters an infinite loop waiting for the
1039 termination sequence to be completed.
1043 WaitForTermination(STG_NO_ARGS)
1046 PACKET p = GetPacket();
1055 DebugPrintGAGAMap(gagamap, nGAs)
1056 globalAddr *gagamap;
1061 for (i = 0; i < nGAs; ++i, gagamap += 2)
1062 fprintf(stderr, "gagamap[%d] = (%x, %d, %x) -> (%x, %d, %x)\n", i,
1063 gagamap[0].loc.gc.gtid, gagamap[0].loc.gc.slot, gagamap[0].weight,
1064 gagamap[1].loc.gc.gtid, gagamap[1].loc.gc.slot, gagamap[1].weight);
1071 static PP_ freeMsgBuffer = NULL;
1072 static int *freeMsgIndex = NULL;
1075 prepareFreeMsgBuffers(STG_NO_ARGS)
1079 /* Allocate the freeMsg buffers just once and then hang onto them. */
1081 if (freeMsgIndex == NULL) {
1083 freeMsgIndex = (int *) stgMallocBytes(nPEs * sizeof(int), "prepareFreeMsgBuffers (Index)");
1084 freeMsgBuffer = (PP_) stgMallocBytes(nPEs * sizeof(long *), "prepareFreeMsgBuffers (Buffer)");
1086 for(i = 0; i < nPEs; i++) {
1088 freeMsgBuffer[i] = (P_) stgMallocWords(RTSflags.ParFlags.packBufferSize,
1089 "prepareFreeMsgBuffers (Buffer #i)");
1094 /* Initialize the freeMsg buffer pointers to point to the start of their buffers */
1095 for (i = 0; i < nPEs; i++)
1096 freeMsgIndex[i] = 0;
1100 freeRemoteGA(pe, ga)
1106 ASSERT(GALAlookup(ga) == NULL);
1108 if ((i = freeMsgIndex[pe]) + 2 >= RTSflags.ParFlags.packBufferSize) {
1110 fprintf(stderr, "Filled a free message buffer\n");
1112 sendFree(ga->loc.gc.gtid, i, freeMsgBuffer[pe]);
1115 freeMsgBuffer[pe][i++] = (W_) ga->weight;
1116 freeMsgBuffer[pe][i++] = (W_) ga->loc.gc.slot;
1117 freeMsgIndex[pe] = i;
1119 ga->weight = 0x0f0f0f0f;
1120 ga->loc.gc.gtid = 0x666;
1121 ga->loc.gc.slot = 0xdeaddead;
1126 sendFreeMessages(STG_NO_ARGS)
1130 for (i = 0; i < nPEs; i++) {
1131 if (freeMsgIndex[i] > 0)
1132 sendFree(PEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
1136 #endif /* PAR -- whole file */