1 /****************************************************************
3 * High Level Communications Routines (HLComms.lc) *
5 * Contains the high-level routines (i.e. communication *
6 * subsystem independent) used by GUM *
7 * (c) The Parade/AQUA Projects, Glasgow University, 1995 *
8 * Phil Trinder, Glasgow University, 12 December 1994 *
10 *****************************************************************/
12 #ifdef PAR /* whole file */
14 #define NON_POSIX_SOURCE /* so says Solaris */
20 \section{GUM Message Sending and Unpacking Functions}
23 @SendFetch@ packs the two global addresses and a load into a message +
27 static W_ *gumPackBuffer;
30 InitMoreBuffers(STG_NO_ARGS)
33 = (W_ *) stgMallocWords(RTSflags.ParFlags.packBufferSize, "initMoreBuffers");
37 sendFetch(rga, lga, load)
38 globalAddr *rga, *lga;
41 CostCentre Save_CCC = CCC;
43 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
46 ASSERT(rga->weight > 0 && lga->weight > 0);
48 fprintf(stderr, "Sending Fetch (%x, %d, 0), load = %d\n",
49 rga->loc.gc.gtid, rga->loc.gc.slot, load);
51 SendOpV(PP_FETCH, rga->loc.gc.gtid, 6,
52 (W_) rga->loc.gc.gtid, (W_) rga->loc.gc.slot,
53 (W_) lga->weight, (W_) lga->loc.gc.gtid, (W_) lga->loc.gc.slot, (W_) load);
59 @unpackFetch@ unpacks a FETCH message into two Global addresses and a load figure.
64 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
70 lga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[0];
71 lga->loc.gc.slot = (int) buf[1];
73 rga->weight = (unsigned) buf[2];
74 rga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[3];
75 rga->loc.gc.slot = (int) buf[4];
79 ASSERT(rga->weight > 0);
83 @SendResume@ packs the remote blocking queue's GA and data into a message
88 sendResume(rga, nelem, data)
93 CostCentre Save_CCC = CCC;
95 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
100 fprintf(stderr, "Sending Resume for (%x, %d, %x)\n",
101 rga->loc.gc.gtid, rga->loc.gc.slot, rga->weight);
104 SendOpNV(PP_RESUME, rga->loc.gc.gtid, nelem, data, 2,
105 (W_) rga->weight, (W_) rga->loc.gc.slot);
111 @blockFetch@ blocks a @BlockedFetch@ node on some kind of black hole.
115 blockFetch(P_ bf, P_ bh)
117 switch (INFO_TYPE(INFO_PTR(bh))) {
119 BF_LINK(bf) = Nil_closure;
120 SET_INFO_PTR(bh, BQ_info);
121 BQ_ENTRIES(bh) = (W_) bf;
123 #ifdef GC_MUT_REQUIRED
125 * If we modify a black hole in the old generation, we have to
126 * make sure it goes on the mutables list
129 if (bh <= StorageMgrInfo.OldLim) {
130 MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
131 StorageMgrInfo.OldMutables = bh;
133 MUT_LINK(bh) = MUT_NOT_LINKED;
137 BF_LINK(bf) = (P_) BQ_ENTRIES(bh);
138 BQ_ENTRIES(bh) = (W_) bf;
141 BF_LINK(bf) = (P_) FMBQ_ENTRIES(bh);
142 FMBQ_ENTRIES(bh) = (W_) bf;
144 case INFO_SPEC_RBH_TYPE:
145 BF_LINK(bf) = (P_) SPEC_RBH_BQ(bh);
146 SPEC_RBH_BQ(bh) = (W_) bf;
148 case INFO_GEN_RBH_TYPE:
149 BF_LINK(bf) = (P_) GEN_RBH_BQ(bh);
150 GEN_RBH_BQ(bh) = (W_) bf;
153 fprintf(stderr, "Panic: thought %#lx was a black hole (IP %#lx)\n",
154 (W_) bh, INFO_PTR(bh));
160 @processFetches@ constructs and sends resume messages for every
161 @BlockedFetch@ which is ready to be awakened.
164 extern P_ PendingFetches;
175 for (bf = PendingFetches; bf != Nil_closure; bf = next) {
179 * Find the target at the end of the indirection chain, and
180 * process it in much the same fashion as the original target
181 * of the fetch. Though we hope to find graph here, we could
182 * find a black hole (of any flavor) or even a FetchMe.
184 closure = BF_NODE(bf);
185 while (IS_INDIRECTION(INFO_PTR(closure)))
186 closure = (P_) IND_CLOSURE_PTR(closure);
187 ip = (P_) INFO_PTR(closure);
189 if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
190 /* Forward the Fetch to someone else */
191 rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
192 rga.loc.gc.slot = (int) BF_SLOT(bf);
193 rga.weight = (unsigned) BF_WEIGHT(bf);
195 sendFetch(FETCHME_GA(closure), &rga, 0 /* load */);
196 } else if (IS_BLACK_HOLE(ip)) {
197 BF_NODE(bf) = closure;
198 blockFetch(bf, closure);
200 /* We now have some local graph to send back */
204 if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
206 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
207 SAVE_Hp -= PACK_HEAP_REQUIRED;
210 closure = BF_NODE(bf);
211 graph = PackNearbyGraph(closure, &size);
212 ASSERT(graph != NULL);
214 rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
215 rga.loc.gc.slot = (int) BF_SLOT(bf);
216 rga.weight = (unsigned) BF_WEIGHT(bf);
218 sendResume(&rga, size, graph);
221 PendingFetches = Nil_closure;
226 @unpackResume@ unpacks a Resume message into a Global address, an element count and a data array.
231 unpackResume(globalAddr *lga, int *nelem, W_ *data)
236 lga->weight = (unsigned) buf[0];
237 lga->loc.gc.gtid = mytid;
238 lga->loc.gc.slot = (int) buf[1];
240 *nelem = (int) buf[2];
241 GetArgs(data, *nelem);
245 @SendAck@ packs an array of global address pairs (the GA -> GA map) for
246 any closures shipped into an ACK message and sends it to the given task.
250 sendAck(task, ngas, gagamap)
258 CostCentre Save_CCC = CCC;
260 buffer = (long *) gumPackBuffer;
262 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
265 for(i = 0, p = buffer; i < ngas; i++, p += 6) {
266 ASSERT(gagamap[1].weight > 0);
267 p[0] = (long) gagamap->weight;
268 p[1] = (long) gagamap->loc.gc.gtid;
269 p[2] = (long) gagamap->loc.gc.slot;
271 p[3] = (long) gagamap->weight;
272 p[4] = (long) gagamap->loc.gc.gtid;
273 p[5] = (long) gagamap->loc.gc.slot;
277 fprintf(stderr,"Sending Ack (%d pairs) to %x\n", ngas, task);
279 SendOpN(PP_ACK, task, p - buffer, buffer);
285 @unpackAck@ unpacks an Acknowledgement message into a Global address,
286 a count of the number of global addresses following and a map of
292 unpackAck(int *ngas, globalAddr *gagamap)
297 GetArgs(&GAarraysize, 1);
299 *ngas = GAarraysize / 6;
301 while (GAarraysize > 0) {
303 gagamap->weight = (unsigned) buf[0];
304 gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[1];
305 gagamap->loc.gc.slot = (int) buf[2];
307 gagamap->weight = (unsigned) buf[3];
308 gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[4];
309 gagamap->loc.gc.slot = (int) buf[5];
310 ASSERT(gagamap->weight > 0);
317 @SendFish@ packs the task id of the originating PE together with the
318 age, history and hunger of the fish into a FISH message and sends it to destPE.
322 sendFish(destPE, origPE, age, history, hunger)
323 GLOBAL_TASK_ID destPE, origPE;
324 int age, history, hunger;
326 CostCentre Save_CCC = CCC;
328 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
332 fprintf(stderr,"Sending Fish to %lx\n", destPE);
334 SendOpV(PP_FISH, destPE, 4, (W_) origPE, (W_) age, (W_) history, (W_) hunger);
342 @unpackFish@ unpacks a FISH message into the global task id of the
343 originating PE and 3 data fields: the age, history and hunger of the
344 fish. The history + hunger are not currently used.
349 unpackFish(GLOBAL_TASK_ID *origPE, int *age, int *history, int *hunger)
355 *origPE = (GLOBAL_TASK_ID) buf[0];
357 *history = (int) buf[2];
358 *hunger = (int) buf[3];
362 @SendFree@ sends (weight, slot) pairs for GAs that we no longer need references
367 sendFree(pe, nelem, data)
372 CostCentre Save_CCC = CCC;
374 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
378 fprintf(stderr, "Sending Free (%d GAs) to %x\n", nelem / 2, pe);
380 SendOpN(PP_FREE, pe, nelem, data);
387 @unpackFree@ unpacks a FREE message into the amount of data shipped and
393 unpackFree(int *nelem, W_ *data)
398 *nelem = (int) buf[0];
399 GetArgs(data, *nelem);
403 @SendSchedule@ sends a closure to be evaluated in response to a Fish
404 message. The message is directed to the PE that originated the Fish
405 (origPE), and includes the packed closure (data) along with its size
411 sendSchedule(origPE, nelem, data)
412 GLOBAL_TASK_ID origPE;
417 CostCentre Save_CCC = CCC;
419 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
422 #ifdef SCHEDULE_DEBUG
424 fprintf(stderr, "Sending Schedule to %x\n", origPE);
427 SendOpN(PP_SCHEDULE, origPE, nelem, data);
433 @unpackSchedule@ unpacks a SCHEDULE message into the Global address of
434 the closure shipped, the amount of data shipped (nelem) and the data
440 unpackSchedule(int *nelem, W_ *data)
445 *nelem = (int) buf[0];
446 GetArgs(data, *nelem);
450 \section{Message-Processing Functions}
452 The following routines process incoming GUM messages, often reissuing
453 messages in response.
455 @processFish@ unpacks a fish message, reissuing it if it's our own,
456 sending work if we have it or sending it onwards otherwise.
460 processFish(STG_NO_ARGS)
462 GLOBAL_TASK_ID origPE;
463 int age, history, hunger;
465 unpackFish(&origPE, &age, &history, &hunger);
467 if (origPE == mytid) {
472 while ((spark = FindLocalSpark(rtsTrue)) != NULL) {
476 if ((graph = PackNearbyGraph(spark, &size)) == NULL) {
477 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
478 SAVE_Hp -= PACK_HEAP_REQUIRED;
479 /* Now go back and try again */
481 sendSchedule(origPE, size, graph);
487 /* We have no sparks to give */
488 if (age < FISH_LIFE_EXPECTANCY)
489 sendFish(choosePE(), origPE,
490 (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
492 /* Send it home to die */
494 sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
500 @processFetch@ either returns the requested data (if available)
501 or blocks the remote blocking queue on a black hole (if not).
505 processFetch(STG_NO_ARGS)
513 unpackFetch(&ga, &rga, &load);
515 fprintf(stderr, "Rcvd Fetch for (%x, %d, 0), Resume (%x, %d, %x) (load %d) \n",
516 ga.loc.gc.gtid, ga.loc.gc.slot,
517 rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, load);
520 closure = GALAlookup(&ga);
521 ip = (P_) INFO_PTR(closure);
523 if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
524 /* Forward the Fetch to someone else */
525 sendFetch(FETCHME_GA(closure), &rga, load);
526 } else if (rga.loc.gc.gtid == mytid) {
527 /* Our own FETCH forwarded back around to us */
528 P_ fmbq = GALAlookup(&rga);
530 /* We may have already discovered that the fetch target is our own. */
532 CommonUp(fmbq, closure);
533 (void) addWeight(&rga);
534 } else if (IS_BLACK_HOLE(ip)) {
535 /* This includes RBH's and FMBQ's */
538 if ((bf = AllocateHeap(FIXED_HS + BF_CLOSURE_SIZE(dummy))) == NULL) {
539 ReallyPerformThreadGC(FIXED_HS + BF_CLOSURE_SIZE(dummy), rtsFalse);
540 closure = GALAlookup(&ga);
541 bf = SAVE_Hp - (FIXED_HS + BF_CLOSURE_SIZE(dummy)) + 1;
543 ASSERT(GALAlookup(&rga) == NULL);
545 SET_BF_HDR(bf, BF_info, bogosity);
546 BF_NODE(bf) = closure;
547 BF_GTID(bf) = (W_) rga.loc.gc.gtid;
548 BF_SLOT(bf) = (W_) rga.loc.gc.slot;
549 BF_WEIGHT(bf) = (W_) rga.weight;
550 blockFetch(bf, closure);
553 fprintf(stderr, "Blocking Fetch (%x, %d, %x) on %#lx\n",
554 rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, closure);
558 /* The target of the FetchMe is some local graph */
562 if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
563 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
564 SAVE_Hp -= PACK_HEAP_REQUIRED;
565 closure = GALAlookup(&ga);
566 graph = PackNearbyGraph(closure, &size);
567 ASSERT(graph != NULL);
569 sendResume(&rga, size, graph);
574 @processFree@ unpacks a FREE message and adds the weights to our GAs.
578 processFree(STG_NO_ARGS)
581 static W_ *freeBuffer;
585 freeBuffer = gumPackBuffer;
586 unpackFree(&nelem, freeBuffer);
588 fprintf(stderr, "Rcvd Free (%d GAs)\n", nelem / 2);
590 ga.loc.gc.gtid = mytid;
591 for (i = 0; i < nelem;) {
592 ga.weight = (unsigned) freeBuffer[i++];
593 ga.loc.gc.slot = (int) freeBuffer[i++];
595 fprintf(stderr,"Processing free (%x, %d, %x)\n", ga.loc.gc.gtid,
596 ga.loc.gc.slot, ga.weight);
598 (void) addWeight(&ga);
603 @processResume@ unpacks a RESUME message into the graph, filling in
604 the LA -> GA, and GA -> LA tables. Threads blocked on the original
605 @FetchMe@ (now a blocking queue) are awakened, and the blocking queue
606 is converted into an indirection. Finally it sends an ACK in response
607 which contains any newly allocated GAs.
612 processResume(GLOBAL_TASK_ID sender)
616 static W_ *packBuffer;
622 packBuffer = gumPackBuffer;
623 unpackResume(&lga, &nelem, packBuffer);
626 fprintf(stderr, "Rcvd Resume for (%x, %d, %x)\n",
627 lga.loc.gc.gtid, lga.loc.gc.slot, lga.weight);
628 PrintPacket(packBuffer);
632 * We always unpack the incoming graph, even if we've received the
633 * requested node in some other data packet (and already awakened
634 * the blocking queue).
636 if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
637 ReallyPerformThreadGC(packBuffer[0], rtsFalse);
638 SAVE_Hp -= packBuffer[0];
641 /* Do this *after* GC; we don't want to release the object early! */
644 (void) addWeight(&lga);
646 old = GALAlookup(&lga);
648 if (RTSflags.ParFlags.granSimStats) {
651 if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE) {
652 for(tso = (P_) FMBQ_ENTRIES(old);
653 TSO_LINK(tso) != Nil_closure;
657 DumpGranEventAndNode(GR_REPLY, tso, old, taskIDtoPE(sender));
660 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
661 ASSERT(newGraph != NULL);
664 * Sometimes, unpacking will common up the resumee with the
665 * incoming graph, but if it hasn't, we'd better do so now.
668 if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE)
669 CommonUp(old, newGraph);
672 DebugPrintGAGAMap(gagamap, nGAs);
675 sendAck(sender, nGAs, gagamap);
679 @processSchedule@ unpacks a SCHEDULE message into the graph, filling
680 in the LA -> GA, and GA -> LA tables. The root of the graph is added to
681 the local spark queue. Finally it sends an ACK in response
682 which contains any newly allocated GAs.
686 processSchedule(GLOBAL_TASK_ID sender)
691 static W_ *packBuffer;
696 packBuffer = gumPackBuffer; /* HWL */
697 unpackSchedule(&nelem, packBuffer);
699 #ifdef SCHEDULE_DEBUG
700 fprintf(stderr, "Rcvd Schedule\n");
701 PrintPacket(packBuffer);
705 * For now, the graph is a closure to be sparked as an advisory
706 * spark, but in future it may be a complete spark with
707 * required/advisory status, priority etc.
710 space_required = packBuffer[0];
711 if (SAVE_Hp + space_required >= SAVE_HpLim) {
712 ReallyPerformThreadGC(space_required, rtsFalse);
713 SAVE_Hp -= space_required;
715 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
716 ASSERT(newGraph != NULL);
717 success = Spark(newGraph, rtsFalse);
720 #ifdef SCHEDULE_DEBUG
721 DebugPrintGAGAMap(gagamap, nGAs);
725 sendAck(sender, nGAs, gagamap);
731 @processAck@ unpacks an ACK, and uses the GAGA map to convert RBH's
732 (which represent shared thunks that have been shipped) into fetch-mes
737 processAck(STG_NO_ARGS)
742 globalAddr gagamap[MAX_GAS * 2];
744 unpackAck(&nGAs, gagamap);
747 fprintf(stderr, "Rcvd Ack (%d pairs)\n", nGAs);
748 DebugPrintGAGAMap(gagamap, nGAs);
752 * For each (oldGA, newGA) pair, set the GA of the corresponding
753 * thunk to the newGA, convert the thunk to a FetchMe, and return
754 * the weight from the oldGA.
756 for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
757 P_ old = GALAlookup(gaga);
758 P_ new = GALAlookup(gaga + 1);
761 /* We don't have this closure, so we make a fetchme for it */
762 globalAddr *ga = setRemoteGA(old, gaga + 1, rtsTrue);
764 convertToFetchMe(old, ga);
767 * Oops...we've got this one already; update the RBH to
768 * point to the object we already know about, whatever it
774 * Increase the weight of the object by the amount just
775 * received in the second part of the ACK pair.
777 (void) addWeight(gaga + 1);
779 (void) addWeight(gaga);
784 \section{GUM Message Processor}
786 @processMessages@ processes any messages that have arrived, calling
787 appropriate routines depending on the message tag
788 (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
789 present and performs a blocking receive! During profiling it
790 busy-waits in order to record idle time.
794 processMessages(STG_NO_ARGS)
800 /* Temporary Test Definitions */
804 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
807 if (RTSflags.CcFlags.doCostCentres) {
808 CCC = (CostCentre)STATIC_CC_REF(CC_IDLE);
811 while (!PacketsWaiting())
814 CCC = (CostCentre)STATIC_CC_REF(CC_MSG);
817 packet = GetPacket(); /* Get next message; block until one available */
820 get_opcode_and_sender(packet, &opcode, &task);
825 EXIT(EXIT_SUCCESS); /* The computation has been completed by someone
850 processSchedule(task);
854 /* Anything we're not prepared to deal with. */
855 fprintf(stderr, "Task %x: Unexpected opcode %x from %x\n",
856 mytid, opcode, task);
861 } while (PacketsWaiting()); /* While there are messages: process them */
863 } /* processMessages */
866 \section{Exception Handlers}
869 @Comms_Harness_Exception@ is an exception handler that tests out the different
874 Comms_Harness_Exception(packet)
879 /* GLOBAL_TASK_ID sender = Sender_Task(packet); */
880 OPCODE opcode = Opcode(packet);
883 /* fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender); */
888 IAmMainThread = rtsTrue; /* This processor is the IO task */
889 /* fprintf(stderr,"I am Main Thread\n"); */
899 get_opcode_and_sender(packet,&opcode,&task);
900 fprintf(stderr,"Task %x: Got Fetch from %x\n", mytid, task );
901 unpackFetch(&ga,&bqga,&load);
902 fprintf(stderr,"In PE, Fetch = (%x, %d, %x) (%x, %d, %x) %d \n",
903 ga.loc.gc.gtid, ga.loc.gc.slot, ga.weight,
904 bqga.loc.gc.gtid, bqga.loc.gc.slot, bqga.weight, load);
905 /*Send Resume in Response*/
906 for (i=0; i <= 10; ++i) data[i] = i;
907 sendResume(&bqga,11,data);
914 globalAddr gagamap[MAX_GAS*2];
916 get_opcode_and_sender(packet,&opcode,&task);
917 fprintf(stderr,"Task %x: Got Ack from %x\n", mytid, task );
918 unpackAck(&nGAs,gagamap);
920 DebugPrintGAGAMap(gagamap,nGAs);
927 GLOBAL_TASK_ID origPE;
928 int age, history, hunger;
932 get_opcode_and_sender(packet,&opcode,&task);
933 fprintf(stderr,"Task %x: Got FISH from %x\n", mytid, task );
934 unpackFish(&origPE, &age, &history, &hunger);
935 fprintf(stderr,"In PE, FISH.origPE = %x age = %d history = %d hunger = %d\n",
936 origPE, age, history, hunger);
938 testGA.loc.gc.gtid = mytid; testGA.loc.gc.slot = 52; testGA.weight = 1024;
939 for (i=0; i <= 5; ++i) testData[i] = 40+i;
940 sendSchedule(origPE,6,testData);
945 { /* Test variables */
949 get_opcode_and_sender(packet,&opcode,&task);
950 fprintf(stderr,"Task %x: Got SCHEDULE from %x\n", mytid, task );
951 unpackSchedule(&nelem, &testData);
952 fprintf(stderr,"In PE, nelem = %d \n", nelem);
953 for (i=0; i <= 5; ++i) fprintf(stderr,"tData[%d] = %d ",i,testData[i]);
954 fprintf(stderr,"\n");
958 /* Anything we're not prepared to deal with. Note that ALL
959 * opcodes are discarded during termination -- this helps
960 * prevent bizarre race conditions.
963 if (!GlobalStopPending)
965 GLOBAL_TASK_ID ErrorTask;
968 get_opcode_and_sender(packet,&opcode,&ErrorTask);
969 fprintf(stderr,"Task %x: Unexpected opcode %x from %x in Comms Harness\n",
970 mytid, opcode, ErrorTask );
980 @STG_Exception@ handles real communication exceptions
984 STG_Exception(packet)
987 /* GLOBAL_TASK_ID sender = Sender_Task(packet); */
988 OPCODE opcode = Opcode(packet);
990 /* fprintf(stderr,"STG_Exception: Received %s (%x), sender %x\n",GetOpName(opcode),opcode,sender); */
995 IAmMainThread = rtsTrue; /* This processor is the IO task */
996 /* fprintf(stderr,"I am Main Thread\n"); */
1003 /* Anything we're not prepared to deal with. Note that ALL opcodes are discarded
1004 during termination -- this helps prevent bizarre race conditions.
1007 if (!GlobalStopPending)
1009 GLOBAL_TASK_ID ErrorTask;
1012 get_opcode_and_sender(packet,&opcode,&ErrorTask);
1013 fprintf(stderr,"Task %x: Unexpected opcode %x from %x in STG_Exception\n",
1014 mytid, opcode, ErrorTask );
1022 \section{Miscellaneous Functions}
1024 @ChoosePE@ selects a GlobalTaskId from the array of PEs 'at random'.
1025 Important properties:
1026 o it varies during execution, even if the PE is idle
1027 o it's different for each PE
1028 o we never send a fish to ourselves
1031 extern long lrand48 (STG_NO_ARGS);
1034 choosePE(STG_NO_ARGS)
1038 temp = lrand48() % nPEs;
1039 if (PEs[temp] == mytid) { /* Never send a FISH to yourself */
1040 temp = (temp + 1) % nPEs;
1046 @WaitForTermination@ enters an infinite loop waiting for the
1047 termination sequence to be completed.
1051 WaitForTermination(STG_NO_ARGS)
1054 PACKET p = GetPacket();
1063 DebugPrintGAGAMap(gagamap, nGAs)
1064 globalAddr *gagamap;
1069 for (i = 0; i < nGAs; ++i, gagamap += 2)
1070 fprintf(stderr, "gagamap[%d] = (%x, %d, %x) -> (%x, %d, %x)\n", i,
1071 gagamap[0].loc.gc.gtid, gagamap[0].loc.gc.slot, gagamap[0].weight,
1072 gagamap[1].loc.gc.gtid, gagamap[1].loc.gc.slot, gagamap[1].weight);
1079 static PP_ freeMsgBuffer = NULL;
1080 static int *freeMsgIndex = NULL;
1083 prepareFreeMsgBuffers(STG_NO_ARGS)
1087 /* Allocate the freeMsg buffers just once and then hang onto them. */
1089 if (freeMsgIndex == NULL) {
1091 freeMsgIndex = (int *) stgMallocBytes(nPEs * sizeof(int), "prepareFreeMsgBuffers (Index)");
1092 freeMsgBuffer = (PP_) stgMallocBytes(nPEs * sizeof(long *), "prepareFreeMsgBuffers (Buffer)");
1094 for(i = 0; i < nPEs; i++) {
1096 freeMsgBuffer[i] = (P_) stgMallocWords(RTSflags.ParFlags.packBufferSize,
1097 "prepareFreeMsgBuffers (Buffer #i)");
1102 /* Initialize the freeMsg buffer pointers to point to the start of their buffers */
1103 for (i = 0; i < nPEs; i++)
1104 freeMsgIndex[i] = 0;
1108 freeRemoteGA(pe, ga)
1114 ASSERT(GALAlookup(ga) == NULL);
1116 if ((i = freeMsgIndex[pe]) + 2 >= RTSflags.ParFlags.packBufferSize) {
1118 fprintf(stderr, "Filled a free message buffer\n");
1120 sendFree(ga->loc.gc.gtid, i, freeMsgBuffer[pe]);
1123 freeMsgBuffer[pe][i++] = (W_) ga->weight;
1124 freeMsgBuffer[pe][i++] = (W_) ga->loc.gc.slot;
1125 freeMsgIndex[pe] = i;
1127 ga->weight = 0x0f0f0f0f;
1128 ga->loc.gc.gtid = 0x666;
1129 ga->loc.gc.slot = 0xdeaddead;
1134 sendFreeMessages(STG_NO_ARGS)
1138 for (i = 0; i < nPEs; i++) {
1139 if (freeMsgIndex[i] > 0)
1140 sendFree(PEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
1144 #endif /* PAR -- whole file */