1 /****************************************************************
3 * High Level Communications Routines (HLComms.lc) *
5 * Contains the high-level routines (i.e. communication *
6 * subsystem independent) used by GUM *
7 * (c) The Parade/AQUA Projects, Glasgow University, 1995 *
8 * Phil Trinder, Glasgow University, 12 December 1994 *
10 *****************************************************************/
12 #ifdef PAR /* whole file */
15 #define NON_POSIX_SOURCE /* so says Solaris */
22 \section{GUM Message Sending and Unpacking Functions}
25 @SendFetch@ packs the two global addresses and a load into a message and sends it.
29 static W_ *gumPackBuffer;
32 InitMoreBuffers(STG_NO_ARGS)
35 = (W_ *) stgMallocWords(RTSflags.ParFlags.packBufferSize, "initMoreBuffers");
39 sendFetch(rga, lga, load)
40 globalAddr *rga, *lga;
43 CostCentre Save_CCC = CCC;
45 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
48 ASSERT(rga->weight > 0 && lga->weight > 0);
50 fprintf(stderr, "Sending Fetch (%x, %d, 0), load = %d\n",
51 rga->loc.gc.gtid, rga->loc.gc.slot, load);
53 SendOpV(PP_FETCH, rga->loc.gc.gtid, 6,
54 (W_) rga->loc.gc.gtid, (W_) rga->loc.gc.slot,
55 (W_) lga->weight, (W_) lga->loc.gc.gtid, (W_) lga->loc.gc.slot, (W_) load);
61 @unpackFetch@ unpacks a FETCH message into two Global addresses and a load figure.
66 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
72 lga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[0];
73 lga->loc.gc.slot = (int) buf[1];
75 rga->weight = (unsigned) buf[2];
76 rga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[3];
77 rga->loc.gc.slot = (int) buf[4];
81 ASSERT(rga->weight > 0);
85 @SendResume@ packs the remote blocking queue's GA and data into a message and sends it.
90 sendResume(rga, nelem, data)
95 CostCentre Save_CCC = CCC;
97 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
102 fprintf(stderr, "Sending Resume for (%x, %d, %x)\n",
103 rga->loc.gc.gtid, rga->loc.gc.slot, rga->weight);
106 SendOpNV(PP_RESUME, rga->loc.gc.gtid, nelem, data, 2,
107 (W_) rga->weight, (W_) rga->loc.gc.slot);
113 @blockFetch@ blocks a @BlockedFetch@ node on some kind of black hole.
117 blockFetch(P_ bf, P_ bh)
119 switch (INFO_TYPE(INFO_PTR(bh))) {
121 BF_LINK(bf) = PrelBase_Z91Z93_closure;
122 SET_INFO_PTR(bh, BQ_info);
123 BQ_ENTRIES(bh) = (W_) bf;
125 #ifdef GC_MUT_REQUIRED
127 * If we modify a black hole in the old generation, we have to
128 * make sure it goes on the mutables list
131 if (bh <= StorageMgrInfo.OldLim) {
132 MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
133 StorageMgrInfo.OldMutables = bh;
135 MUT_LINK(bh) = MUT_NOT_LINKED;
139 BF_LINK(bf) = (P_) BQ_ENTRIES(bh);
140 BQ_ENTRIES(bh) = (W_) bf;
143 BF_LINK(bf) = (P_) FMBQ_ENTRIES(bh);
144 FMBQ_ENTRIES(bh) = (W_) bf;
146 case INFO_SPEC_RBH_TYPE:
147 BF_LINK(bf) = (P_) SPEC_RBH_BQ(bh);
148 SPEC_RBH_BQ(bh) = (W_) bf;
150 case INFO_GEN_RBH_TYPE:
151 BF_LINK(bf) = (P_) GEN_RBH_BQ(bh);
152 GEN_RBH_BQ(bh) = (W_) bf;
155 fprintf(stderr, "Panic: thought %#lx was a black hole (IP %#lx)\n",
156 (W_) bh, INFO_PTR(bh));
162 @processFetches@ constructs and sends resume messages for every
163 @BlockedFetch@ which is ready to be awakened.
166 extern P_ PendingFetches;
177 for (bf = PendingFetches; bf != PrelBase_Z91Z93_closure; bf = next) {
181 * Find the target at the end of the indirection chain, and
182 * process it in much the same fashion as the original target
183 * of the fetch. Though we hope to find graph here, we could
184 * find a black hole (of any flavor) or even a FetchMe.
186 closure = BF_NODE(bf);
187 while (IS_INDIRECTION(INFO_PTR(closure)))
188 closure = (P_) IND_CLOSURE_PTR(closure);
189 ip = (P_) INFO_PTR(closure);
191 if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
192 /* Forward the Fetch to someone else */
193 rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
194 rga.loc.gc.slot = (int) BF_SLOT(bf);
195 rga.weight = (unsigned) BF_WEIGHT(bf);
197 sendFetch(FETCHME_GA(closure), &rga, 0 /* load */);
198 } else if (IS_BLACK_HOLE(ip)) {
199 BF_NODE(bf) = closure;
200 blockFetch(bf, closure);
202 /* We now have some local graph to send back */
206 if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
208 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
209 SAVE_Hp -= PACK_HEAP_REQUIRED;
212 closure = BF_NODE(bf);
213 graph = PackNearbyGraph(closure, &size);
214 ASSERT(graph != NULL);
216 rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
217 rga.loc.gc.slot = (int) BF_SLOT(bf);
218 rga.weight = (unsigned) BF_WEIGHT(bf);
220 sendResume(&rga, size, graph);
223 PendingFetches = PrelBase_Z91Z93_closure;
228 @unpackResume@ unpacks a Resume message into a Global address, an element count and a data array.
233 unpackResume(globalAddr *lga, int *nelem, W_ *data)
238 lga->weight = (unsigned) buf[0];
239 lga->loc.gc.gtid = mytid;
240 lga->loc.gc.slot = (int) buf[1];
242 *nelem = (int) buf[2];
243 GetArgs(data, *nelem);
247 @SendAck@ packs the global address being acknowledged, together with
248 an array of global addresses for any closures shipped and sends them.
252 sendAck(task, ngas, gagamap)
260 CostCentre Save_CCC = CCC;
262 buffer = (long *) gumPackBuffer;
264 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
267 for(i = 0, p = buffer; i < ngas; i++, p += 6) {
268 ASSERT(gagamap[1].weight > 0);
269 p[0] = (long) gagamap->weight;
270 p[1] = (long) gagamap->loc.gc.gtid;
271 p[2] = (long) gagamap->loc.gc.slot;
273 p[3] = (long) gagamap->weight;
274 p[4] = (long) gagamap->loc.gc.gtid;
275 p[5] = (long) gagamap->loc.gc.slot;
279 fprintf(stderr,"Sending Ack (%d pairs) to %x\n", ngas, task);
281 SendOpN(PP_ACK, task, p - buffer, buffer);
287 @unpackAck@ unpacks an Acknowledgement message into a count of the
288 global address pairs it carries (ngas) and a map of those pairs (gagamap).
294 unpackAck(int *ngas, globalAddr *gagamap)
299 GetArgs(&GAarraysize, 1);
301 *ngas = GAarraysize / 6;
303 while (GAarraysize > 0) {
305 gagamap->weight = (unsigned) buf[0];
306 gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[1];
307 gagamap->loc.gc.slot = (int) buf[2];
309 gagamap->weight = (unsigned) buf[3];
310 gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[4];
311 gagamap->loc.gc.slot = (int) buf[5];
312 ASSERT(gagamap->weight > 0);
319 @SendFish@ packs the originating PE's task id together with the fish's age,
320 history and hunger into a FISH message and sends it to the destination PE.
324 sendFish(destPE, origPE, age, history, hunger)
325 GLOBAL_TASK_ID destPE, origPE;
326 int age, history, hunger;
328 CostCentre Save_CCC = CCC;
330 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
334 fprintf(stderr,"Sending Fish to %lx\n", destPE);
336 SendOpV(PP_FISH, destPE, 4, (W_) origPE, (W_) age, (W_) history, (W_) hunger);
344 @unpackFish@ unpacks a FISH message into the global task id of the
345 originating PE and 3 data fields: the age, history and hunger of the
346 fish. The history + hunger are not currently used.
351 unpackFish(GLOBAL_TASK_ID *origPE, int *age, int *history, int *hunger)
357 *origPE = (GLOBAL_TASK_ID) buf[0];
359 *history = (int) buf[2];
360 *hunger = (int) buf[3];
364 @SendFree@ sends (weight, slot) pairs for GAs that we no longer need references to.
369 sendFree(pe, nelem, data)
374 CostCentre Save_CCC = CCC;
376 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
380 fprintf(stderr, "Sending Free (%d GAs) to %x\n", nelem / 2, pe);
382 SendOpN(PP_FREE, pe, nelem, data);
389 @unpackFree@ unpacks a FREE message into the amount of data shipped (nelem) and the data itself.
395 unpackFree(int *nelem, W_ *data)
400 *nelem = (int) buf[0];
401 GetArgs(data, *nelem);
405 @SendSchedule@ sends a closure to be evaluated in response to a Fish
406 message. The message is directed to the PE that originated the Fish
407 (origPE), and includes the packed closure (data) along with its size (nelem).
413 sendSchedule(origPE, nelem, data)
414 GLOBAL_TASK_ID origPE;
419 CostCentre Save_CCC = CCC;
421 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
424 #ifdef SCHEDULE_DEBUG
426 fprintf(stderr, "Sending Schedule to %x\n", origPE);
429 SendOpN(PP_SCHEDULE, origPE, nelem, data);
435 @unpackSchedule@ unpacks a SCHEDULE message into the amount of data
436 shipped (nelem) and the data block itself (data).
442 unpackSchedule(int *nelem, W_ *data)
447 *nelem = (int) buf[0];
448 GetArgs(data, *nelem);
452 \section{Message-Processing Functions}
454 The following routines process incoming GUM messages, often reissuing
455 messages in response.
457 @processFish@ unpacks a fish message, reissuing it if it's our own,
458 sending work if we have it or sending it onwards otherwise.
462 processFish(STG_NO_ARGS)
464 GLOBAL_TASK_ID origPE;
465 int age, history, hunger;
467 unpackFish(&origPE, &age, &history, &hunger);
469 if (origPE == mytid) {
474 while ((spark = FindLocalSpark(rtsTrue)) != NULL) {
478 if ((graph = PackNearbyGraph(spark, &size)) == NULL) {
479 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
480 SAVE_Hp -= PACK_HEAP_REQUIRED;
481 /* Now go back and try again */
483 sendSchedule(origPE, size, graph);
489 /* We have no sparks to give */
490 if (age < FISH_LIFE_EXPECTANCY)
491 sendFish(choosePE(), origPE,
492 (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
494 /* Send it home to die */
496 sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
502 @processFetch@ either returns the requested data (if available)
503 or blocks the remote blocking queue on a black hole (if not).
507 processFetch(STG_NO_ARGS)
515 unpackFetch(&ga, &rga, &load);
517 fprintf(stderr, "Rcvd Fetch for (%x, %d, 0), Resume (%x, %d, %x) (load %d) \n",
518 ga.loc.gc.gtid, ga.loc.gc.slot,
519 rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, load);
522 closure = GALAlookup(&ga);
523 ip = (P_) INFO_PTR(closure);
525 if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
526 /* Forward the Fetch to someone else */
527 sendFetch(FETCHME_GA(closure), &rga, load);
528 } else if (rga.loc.gc.gtid == mytid) {
529 /* Our own FETCH forwarded back around to us */
530 P_ fmbq = GALAlookup(&rga);
532 /* We may have already discovered that the fetch target is our own. */
534 CommonUp(fmbq, closure);
535 (void) addWeight(&rga);
536 } else if (IS_BLACK_HOLE(ip)) {
537 /* This includes RBH's and FMBQ's */
540 if ((bf = AllocateHeap(FIXED_HS + BF_CLOSURE_SIZE(dummy))) == NULL) {
541 ReallyPerformThreadGC(FIXED_HS + BF_CLOSURE_SIZE(dummy), rtsFalse);
542 closure = GALAlookup(&ga);
543 bf = SAVE_Hp - (FIXED_HS + BF_CLOSURE_SIZE(dummy)) + 1;
545 ASSERT(GALAlookup(&rga) == NULL);
547 SET_BF_HDR(bf, BF_info, bogosity);
548 BF_NODE(bf) = closure;
549 BF_GTID(bf) = (W_) rga.loc.gc.gtid;
550 BF_SLOT(bf) = (W_) rga.loc.gc.slot;
551 BF_WEIGHT(bf) = (W_) rga.weight;
552 blockFetch(bf, closure);
555 fprintf(stderr, "Blocking Fetch (%x, %d, %x) on %#lx\n",
556 rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, closure);
560 /* The target of the FetchMe is some local graph */
564 if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
565 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
566 SAVE_Hp -= PACK_HEAP_REQUIRED;
567 closure = GALAlookup(&ga);
568 graph = PackNearbyGraph(closure, &size);
569 ASSERT(graph != NULL);
571 sendResume(&rga, size, graph);
576 @processFree@ unpacks a FREE message and adds the weights to our GAs.
580 processFree(STG_NO_ARGS)
583 static W_ *freeBuffer;
587 freeBuffer = gumPackBuffer;
588 unpackFree(&nelem, freeBuffer);
590 fprintf(stderr, "Rcvd Free (%d GAs)\n", nelem / 2);
592 ga.loc.gc.gtid = mytid;
593 for (i = 0; i < nelem;) {
594 ga.weight = (unsigned) freeBuffer[i++];
595 ga.loc.gc.slot = (int) freeBuffer[i++];
597 fprintf(stderr,"Processing free (%x, %d, %x)\n", ga.loc.gc.gtid,
598 ga.loc.gc.slot, ga.weight);
600 (void) addWeight(&ga);
605 @processResume@ unpacks a RESUME message into the graph, filling in
606 the LA -> GA, and GA -> LA tables. Threads blocked on the original
607 @FetchMe@ (now a blocking queue) are awakened, and the blocking queue
608 is converted into an indirection. Finally it sends an ACK in response
609 which contains any newly allocated GAs.
614 processResume(GLOBAL_TASK_ID sender)
618 static W_ *packBuffer;
624 packBuffer = gumPackBuffer;
625 unpackResume(&lga, &nelem, packBuffer);
628 fprintf(stderr, "Rcvd Resume for (%x, %d, %x)\n",
629 lga.loc.gc.gtid, lga.loc.gc.slot, lga.weight);
630 PrintPacket(packBuffer);
634 * We always unpack the incoming graph, even if we've received the
635 * requested node in some other data packet (and already awakened
636 * the blocking queue).
638 if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
639 ReallyPerformThreadGC(packBuffer[0], rtsFalse);
640 SAVE_Hp -= packBuffer[0];
643 /* Do this *after* GC; we don't want to release the object early! */
646 (void) addWeight(&lga);
648 old = GALAlookup(&lga);
650 if (RTSflags.ParFlags.granSimStats) {
653 if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE) {
654 for(tso = (P_) FMBQ_ENTRIES(old);
655 TSO_LINK(tso) != PrelBase_Z91Z93_closure;
659 /* DumpGranEventAndNode(GR_REPLY, tso, old, taskIDtoPE(sender)); */
660 DumpRawGranEvent(CURRENT_PROC,taskIDtoPE(sender),GR_REPLY,
664 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
665 ASSERT(newGraph != NULL);
668 * Sometimes, unpacking will common up the resumee with the
669 * incoming graph, but if it hasn't, we'd better do so now.
672 if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE)
673 CommonUp(old, newGraph);
676 DebugPrintGAGAMap(gagamap, nGAs);
679 sendAck(sender, nGAs, gagamap);
683 @processSchedule@ unpacks a SCHEDULE message into the graph, filling
684 in the LA -> GA, and GA -> LA tables. The root of the graph is added to
685 the local spark queue. Finally it sends an ACK in response
686 which contains any newly allocated GAs.
690 processSchedule(GLOBAL_TASK_ID sender)
695 static W_ *packBuffer;
700 packBuffer = gumPackBuffer; /* HWL */
701 unpackSchedule(&nelem, packBuffer);
703 #ifdef SCHEDULE_DEBUG
704 fprintf(stderr, "Rcvd Schedule\n");
705 PrintPacket(packBuffer);
709 * For now, the graph is a closure to be sparked as an advisory
710 * spark, but in future it may be a complete spark with
711 * required/advisory status, priority etc.
714 space_required = packBuffer[0];
715 if (SAVE_Hp + space_required >= SAVE_HpLim) {
716 ReallyPerformThreadGC(space_required, rtsFalse);
717 SAVE_Hp -= space_required;
719 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
720 ASSERT(newGraph != NULL);
721 success = Spark(newGraph, rtsFalse);
724 #ifdef SCHEDULE_DEBUG
725 DebugPrintGAGAMap(gagamap, nGAs);
729 sendAck(sender, nGAs, gagamap);
735 @processAck@ unpacks an ACK, and uses the GAGA map to convert RBH's
736 (which represent shared thunks that have been shipped) into fetch-mes
741 processAck(STG_NO_ARGS)
746 globalAddr gagamap[MAX_GAS * 2];
748 unpackAck(&nGAs, gagamap);
751 fprintf(stderr, "Rcvd Ack (%d pairs)\n", nGAs);
752 DebugPrintGAGAMap(gagamap, nGAs);
756 * For each (oldGA, newGA) pair, set the GA of the corresponding
757 * thunk to the newGA, convert the thunk to a FetchMe, and return
758 * the weight from the oldGA.
760 for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
761 P_ old = GALAlookup(gaga);
762 P_ new = GALAlookup(gaga + 1);
765 /* We don't have this closure, so we make a fetchme for it */
766 globalAddr *ga = setRemoteGA(old, gaga + 1, rtsTrue);
768 convertToFetchMe(old, ga);
771 * Oops...we've got this one already; update the RBH to
772 * point to the object we already know about, whatever it
778 * Increase the weight of the object by the amount just
779 * received in the second part of the ACK pair.
781 (void) addWeight(gaga + 1);
783 (void) addWeight(gaga);
788 \section{GUM Message Processor}
790 @processMessages@ processes any messages that have arrived, calling
791 appropriate routines depending on the message tag
792 (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
793 present and performs a blocking receive! During profiling it
794 busy-waits in order to record idle time.
798 processMessages(STG_NO_ARGS)
804 /* Temporary Test Definitions */
808 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
811 if (RTSflags.CcFlags.doCostCentres) {
812 CCC = (CostCentre)STATIC_CC_REF(CC_IDLE);
815 while (!PacketsWaiting())
818 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
821 packet = GetPacket(); /* Get next message; block until one available */
824 get_opcode_and_sender(packet, &opcode, &task);
829 EXIT(EXIT_SUCCESS); /* The computation has been completed by someone
854 processSchedule(task);
858 /* Anything we're not prepared to deal with. */
859 fprintf(stderr, "Task %x: Unexpected opcode %x from %x\n",
860 mytid, opcode, task);
865 } while (PacketsWaiting()); /* While there are messages: process them */
867 } /* processMessages */
870 \section{Exception Handlers}
873 @Comms_Harness_Exception@ is an exception handler that tests out the different GUM message types.
878 Comms_Harness_Exception(packet)
883 /* GLOBAL_TASK_ID sender = Sender_Task(packet); */
884 OPCODE opcode = Opcode(packet);
887 /* fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender); */
898 get_opcode_and_sender(packet,&opcode,&task);
899 fprintf(stderr,"Task %x: Got Fetch from %x\n", mytid, task );
900 unpackFetch(&ga,&bqga,&load);
901 fprintf(stderr,"In PE, Fetch = (%x, %d, %x) (%x, %d, %x) %d \n",
902 ga.loc.gc.gtid, ga.loc.gc.slot, ga.weight,
903 bqga.loc.gc.gtid, bqga.loc.gc.slot, bqga.weight, load);
904 /*Send Resume in Response*/
905 for (i=0; i <= 10; ++i) data[i] = i;
906 sendResume(&bqga,11,data);
913 globalAddr gagamap[MAX_GAS*2];
915 get_opcode_and_sender(packet,&opcode,&task);
916 fprintf(stderr,"Task %x: Got Ack from %x\n", mytid, task );
917 unpackAck(&nGAs,gagamap);
919 DebugPrintGAGAMap(gagamap,nGAs);
926 GLOBAL_TASK_ID origPE;
927 int age, history, hunger;
931 get_opcode_and_sender(packet,&opcode,&task);
932 fprintf(stderr,"Task %x: Got FISH from %x\n", mytid, task );
933 unpackFish(&origPE, &age, &history, &hunger);
934 fprintf(stderr,"In PE, FISH.origPE = %x age = %d history = %d hunger = %d\n",
935 origPE, age, history, hunger);
937 testGA.loc.gc.gtid = mytid; testGA.loc.gc.slot = 52; testGA.weight = 1024;
938 for (i=0; i <= 5; ++i) testData[i] = 40+i;
939 sendSchedule(origPE,6,testData);
944 { /* Test variables */
948 get_opcode_and_sender(packet,&opcode,&task);
949 fprintf(stderr,"Task %x: Got SCHEDULE from %x\n", mytid, task );
950 unpackSchedule(&nelem, &testData);
951 fprintf(stderr,"In PE, nelem = %d \n", nelem);
952 for (i=0; i <= 5; ++i) fprintf(stderr,"tData[%d] = %d ",i,testData[i]);
953 fprintf(stderr,"\n");
957 /* Anything we're not prepared to deal with. Note that ALL
958 * opcodes are discarded during termination -- this helps
959 * prevent bizarre race conditions.
962 if (!GlobalStopPending)
964 GLOBAL_TASK_ID ErrorTask;
967 get_opcode_and_sender(packet,&opcode,&ErrorTask);
968 fprintf(stderr,"Task %x: Unexpected opcode %x from %x in Comms Harness\n",
969 mytid, opcode, ErrorTask );
979 @STG_Exception@ handles real communication exceptions
983 STG_Exception(packet)
986 GLOBAL_TASK_ID sender = Sender_Task(packet);
987 OPCODE opcode = Opcode(packet);
989 fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender);
997 /* Anything we're not prepared to deal with. Note that ALL opcodes are discarded
998 during termination -- this helps prevent bizarre race conditions.
1001 if (!GlobalStopPending)
1003 GLOBAL_TASK_ID ErrorTask;
1006 get_opcode_and_sender(packet,&opcode,&ErrorTask);
1007 fprintf(stderr,"Task %x: Unexpected opcode %x from %x in STG_Exception\n",
1008 mytid, opcode, ErrorTask );
1016 \section{Miscellaneous Functions}
1018 @ChoosePE@ selects a GlobalTaskId from the array of PEs 'at random'.
1019 Important properties:
1020 o it varies during execution, even if the PE is idle
1021 o it's different for each PE
1022 o we never send a fish to ourselves
1025 extern long lrand48 (STG_NO_ARGS);
1028 choosePE(STG_NO_ARGS)
1032 temp = lrand48() % nPEs;
1033 if (PEs[temp] == mytid) { /* Never send a FISH to yourself */
1034 temp = (temp + 1) % nPEs;
1040 @WaitForTermination@ enters an infinite loop waiting for the
1041 termination sequence to be completed.
1045 WaitForTermination(STG_NO_ARGS)
1048 PACKET p = GetPacket();
1057 DebugPrintGAGAMap(gagamap, nGAs)
1058 globalAddr *gagamap;
1063 for (i = 0; i < nGAs; ++i, gagamap += 2)
1064 fprintf(stderr, "gagamap[%d] = (%x, %d, %x) -> (%x, %d, %x)\n", i,
1065 gagamap[0].loc.gc.gtid, gagamap[0].loc.gc.slot, gagamap[0].weight,
1066 gagamap[1].loc.gc.gtid, gagamap[1].loc.gc.slot, gagamap[1].weight);
1073 static PP_ freeMsgBuffer = NULL;
1074 static int *freeMsgIndex = NULL;
1077 prepareFreeMsgBuffers(STG_NO_ARGS)
1081 /* Allocate the freeMsg buffers just once and then hang onto them. */
1083 if (freeMsgIndex == NULL) {
1085 freeMsgIndex = (int *) stgMallocBytes(nPEs * sizeof(int), "prepareFreeMsgBuffers (Index)");
1086 freeMsgBuffer = (PP_) stgMallocBytes(nPEs * sizeof(long *), "prepareFreeMsgBuffers (Buffer)");
1088 for(i = 0; i < nPEs; i++) {
1090 freeMsgBuffer[i] = (P_) stgMallocWords(RTSflags.ParFlags.packBufferSize,
1091 "prepareFreeMsgBuffers (Buffer #i)");
1096 /* Initialize the freeMsg buffer pointers to point to the start of their buffers */
1097 for (i = 0; i < nPEs; i++)
1098 freeMsgIndex[i] = 0;
1102 freeRemoteGA(pe, ga)
1108 ASSERT(GALAlookup(ga) == NULL);
1110 if ((i = freeMsgIndex[pe]) + 2 >= RTSflags.ParFlags.packBufferSize) {
1112 fprintf(stderr, "Filled a free message buffer\n");
1114 sendFree(ga->loc.gc.gtid, i, freeMsgBuffer[pe]);
1117 freeMsgBuffer[pe][i++] = (W_) ga->weight;
1118 freeMsgBuffer[pe][i++] = (W_) ga->loc.gc.slot;
1119 freeMsgIndex[pe] = i;
1121 ga->weight = 0x0f0f0f0f;
1122 ga->loc.gc.gtid = 0x666;
1123 ga->loc.gc.slot = 0xdeaddead;
1128 sendFreeMessages(STG_NO_ARGS)
1132 for (i = 0; i < nPEs; i++) {
1133 if (freeMsgIndex[i] > 0)
1134 sendFree(PEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
1138 #endif /* PAR -- whole file */