1 /* -----------------------------------------------------------------------------
3 * $Id: HLComms.c,v 1.3 1999/02/15 14:30:56 simonm Exp $
5 * High Level Communications Routines (HLComms.lc)
7 * Contains the high-level routines (i.e. communication
8 * subsystem independent) used by GUM
10 * Phil Trinder, Glasgow University, 12 December 1994
12 * Phil Trinder, Simon Marlow July 1998
14 * -------------------------------------------------------------------------- */
16 #ifdef PAR /* whole file */
19 #define NON_POSIX_SOURCE /* so says Solaris */
30 * GUM Message Sending and Unpacking Functions
31 * ********************************************
35 * Allocate space for message processing
/* Scratch buffer shared by the pack/unpack routines in this file.
 * NOTE(review): this extract is missing lines -- the stgMallocWords call
 * below is the tail of an initialisation routine whose header is not
 * visible here; confirm against the full source. */
38 static W_ *gumPackBuffer;
44 = (W_ *) stgMallocWords(RtsFlags.ParFlags.packBufferSize, "initMoreBuffers");
48 *SendFetch packs the two global addresses and a load into a message +
/* sendFetch: send a PP_FETCH message (6 words: remote GA, local GA, load)
 * to the PE that owns the remote global address rga.
 * NOTE(review): braces and debug-flag guards are among the elided lines. */
53 sendFetch(globalAddr *rga, globalAddr *lga, int load)
/* Both GAs must still carry positive weight or reference counting breaks. */
56 ASSERT(rga->weight > 0 && lga->weight > 0);
/* Trace of the outgoing fetch -- presumably guarded by a debug flag in
 * the elided lines; verify against the full source. */
58 fprintf(stderr, "Sending Fetch (%x, %d, 0), load = %d\n",
59 rga->loc.gc.gtid, rga->loc.gc.slot, load);
61 SendOpV(PP_FETCH, rga->loc.gc.gtid, 6,
62 (W_) rga->loc.gc.gtid, (W_) rga->loc.gc.slot,
63 (W_) lga->weight, (W_) lga->loc.gc.gtid, (W_) lga->loc.gc.slot, (W_) load);
67 *unpackFetch unpacks a FETCH message into two Global addresses and a load figure.
/* unpackFetch: mirror of sendFetch -- reads the 6-word FETCH payload from
 * buf (buf and the GetArgs call are among the elided lines) into the
 * local GA, the remote GA, and the load figure. */
71 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
77 lga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[0];
78 lga->loc.gc.slot = (int) buf[1];
80 rga->weight = (unsigned) buf[2];
81 rga->loc.gc.gtid = (GLOBAL_TASK_ID) buf[3];
82 rga->loc.gc.slot = (int) buf[4];
/* A fetch with zero weight on the reply address would be unanswerable. */
86 ASSERT(rga->weight > 0);
90 * SendResume packs the remote blocking queue's GA and data into a message
/* sendResume: ship nelem words of packed graph (data) back to the PE that
 * issued a FETCH, addressed by the remote blocking queue's GA.  Only the
 * weight and slot go in the fixed part; the gtid is the destination. */
95 sendResume(globalAddr *rga, int nelem, StgPtr data)
/* Trace -- presumably debug-guarded in the elided lines; confirm. */
100 fprintf(stderr, "Sending Resume for (%x, %d, %x)\n",
101 rga->loc.gc.gtid, rga->loc.gc.slot, rga->weight);
104 SendOpNV(PP_RESUME, rga->loc.gc.gtid, nelem, data, 2,
105 (W_) rga->weight, (W_) rga->loc.gc.slot);
110 * blockFetch blocks a BlockedFetch node on some kind of black hole.
/* blockFetch: attach BlockedFetch node bf to the blocking queue of bh,
 * dispatching on bh's closure type.  NOTE(review): the case labels and
 * break statements of this switch are among the elided lines, so the
 * exact case boundaries below cannot be confirmed from this extract. */
113 blockFetch(StgPtr bf, StgPtr bh)
117 Empty until Blocked fetches etc defined
118 switch (INFO_TYPE(INFO_PTR(bh))) {
/* Plain black hole: bf becomes the sole entry of a fresh blocking queue. */
120 BF_LINK(bf) = PrelBase_Z91Z93_closure;
121 SET_INFO_PTR(bh, BQ_info);
122 BQ_ENTRIES(bh) = (W_) bf;
124 #ifdef GC_MUT_REQUIRED
126 * If we modify a black hole in the old generation, we have to
127 * make sure it goes on the mutables list
130 if (bh <= StorageMgrInfo.OldLim) {
131 MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
132 StorageMgrInfo.OldMutables = bh;
134 MUT_LINK(bh) = MUT_NOT_LINKED;
/* Existing blocking queue: push bf on the front. */
138 BF_LINK(bf) = (P_) BQ_ENTRIES(bh);
139 BQ_ENTRIES(bh) = (W_) bf;
/* FetchMe blocking queue: same push, different accessor. */
142 BF_LINK(bf) = (P_) FMBQ_ENTRIES(bh);
143 FMBQ_ENTRIES(bh) = (W_) bf;
145 case INFO_SPEC_RBH_TYPE:
146 BF_LINK(bf) = (P_) SPEC_RBH_BQ(bh);
147 SPEC_RBH_BQ(bh) = (W_) bf;
149 case INFO_GEN_RBH_TYPE:
150 BF_LINK(bf) = (P_) GEN_RBH_BQ(bh);
151 GEN_RBH_BQ(bh) = (W_) bf;
/* Any other closure type means the caller's IS_BLACK_HOLE test and this
 * switch disagree -- treat as fatal. */
154 fprintf(stderr, "Panic: thought %#lx was a black hole (IP %#lx)\n",
155 (W_) bh, INFO_PTR(bh));
162 * processFetches constructs and sends resume messages for every
163 * BlockedFetch which is ready to be awakened.
165 extern P_ PendingFetches;
172 Empty till closure defined
/* Walk the PendingFetches list; for each BlockedFetch either forward the
 * fetch, re-block it, or pack and send the requested graph.
 * NOTE(review): declarations of bf/next/closure/ip/rga/graph/size and the
 * loop's closing brace are among the elided lines. */
179 for (bf = PendingFetches; bf != PrelBase_Z91Z93_closure; bf = next) {
183 * Find the target at the end of the indirection chain, and
184 * process it in much the same fashion as the original target
185 * of the fetch. Though we hope to find graph here, we could
186 * find a black hole (of any flavor) or even a FetchMe.
188 closure = BF_NODE(bf);
189 while (IS_INDIRECTION(INFO_PTR(closure)))
190 closure = (P_) IND_CLOSURE_PTR(closure);
191 ip = (P_) INFO_PTR(closure);
193 if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
194 /* Forward the Fetch to someone else */
/* Reconstruct the requester's GA from the BlockedFetch fields. */
195 rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
196 rga.loc.gc.slot = (int) BF_SLOT(bf);
197 rga.weight = (unsigned) BF_WEIGHT(bf);
199 sendFetch(FETCHME_GA(closure), &rga, 0 /* load */);
200 } else if (IS_BLACK_HOLE(ip)) {
/* Target still not evaluated: re-block on the (possibly new) closure. */
201 BF_NODE(bf) = closure;
202 blockFetch(bf, closure);
204 /* We now have some local graph to send back */
/* Packing can fail for lack of heap; GC, re-chase the node (GC may have
 * moved/updated it), then retry -- the retry is asserted to succeed. */
208 if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
210 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
211 SAVE_Hp -= PACK_HEAP_REQUIRED;
214 closure = BF_NODE(bf);
215 graph = PackNearbyGraph(closure, &size);
216 ASSERT(graph != NULL);
218 rga.loc.gc.gtid = (GLOBAL_TASK_ID) BF_GTID(bf);
219 rga.loc.gc.slot = (int) BF_SLOT(bf);
220 rga.weight = (unsigned) BF_WEIGHT(bf);
222 sendResume(&rga, size, graph);
/* All pending fetches handled; reset the list to "[]". */
225 PendingFetches = PrelBase_Z91Z93_closure;
230 * unpackResume unpacks a Resume message into two Global addresses and
/* unpackResume: read (weight, slot, nelem) then nelem words of packed
 * graph into data.  The gtid is ours by construction -- a RESUME is
 * always addressed to the fetching PE. */
235 unpackResume(globalAddr *lga, int *nelem, W_ *data)
240 lga->weight = (unsigned) buf[0];
241 lga->loc.gc.gtid = mytid;
242 lga->loc.gc.slot = (int) buf[1];
244 *nelem = (int) buf[2];
245 GetArgs(data, *nelem);
249 *SendAck packs the global address being acknowledged, together with
250 *an array of global addresses for any closures shipped and sends them.
/* sendAck: flatten ngas (old GA -> new GA) pairs, 6 longs per pair, into
 * gumPackBuffer and send as PP_ACK.  NOTE(review): the gagamap advance
 * (gagamap++) between the two triples is among the elided lines -- as
 * shown, both triples would read the same element; confirm against the
 * full source before concluding there is a bug. */
254 sendAck(GLOBAL_TASK_ID task, int ngas, globalAddr *gagamap)
260 buffer = (long *) gumPackBuffer;
262 for(i = 0, p = buffer; i < ngas; i++, p += 6) {
/* The "new" half of each pair must carry positive weight. */
263 ASSERT(gagamap[1].weight > 0);
264 p[0] = (long) gagamap->weight;
265 p[1] = (long) gagamap->loc.gc.gtid;
266 p[2] = (long) gagamap->loc.gc.slot;
268 p[3] = (long) gagamap->weight;
269 p[4] = (long) gagamap->loc.gc.gtid;
270 p[5] = (long) gagamap->loc.gc.slot;
274 fprintf(stderr,"Sending Ack (%d pairs) to %x\n", ngas, task);
276 SendOpN(PP_ACK, task, p - buffer, buffer);
281 *unpackAck unpacks an Acknowledgement message into a Global address,
282 *a count of the number of global addresses following and a map of
/* unpackAck: mirror of sendAck -- reads the word count, derives the pair
 * count (6 words per pair), and fills gagamap.  NOTE(review): the
 * gagamap++ advances and the GetArgs/loop-decrement lines are elided
 * from this extract. */
287 unpackAck(int *ngas, globalAddr *gagamap)
292 GetArgs(&GAarraysize, 1);
294 *ngas = GAarraysize / 6;
296 while (GAarraysize > 0) {
298 gagamap->weight = (unsigned) buf[0];
299 gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[1];
300 gagamap->loc.gc.slot = (int) buf[2];
302 gagamap->weight = (unsigned) buf[3];
303 gagamap->loc.gc.gtid = (GLOBAL_TASK_ID) buf[4];
304 gagamap->loc.gc.slot = (int) buf[5];
305 ASSERT(gagamap->weight > 0);
312 *SendFish packs the global address being acknowledged, together with
313 *an array of global addresses for any closures shipped and sends them.
/* sendFish: send a 4-word PP_FISH (work-stealing probe) to destPE,
 * recording the originating PE plus the fish's age/history/hunger.
 * (The block comment above looks copied from sendAck -- it does not
 * describe this function.) */
317 sendFish(GLOBAL_TASK_ID destPE, GLOBAL_TASK_ID origPE,
318 int age, int history, int hunger)
322 fprintf(stderr,"Sending Fish to %lx\n", destPE);
324 SendOpV(PP_FISH, destPE, 4, (W_) origPE, (W_) age, (W_) history, (W_) hunger);
331 *unpackFish unpacks a FISH message into the global task id of the
332 *originating PE and 3 data fields: the age, history and hunger of the
333 *fish. The history + hunger are not currently used.
/* unpackFish: mirror of sendFish.  NOTE(review): the line assigning
 * *age (presumably from buf[1]) is among the elided lines. */
337 unpackFish(GLOBAL_TASK_ID *origPE, int *age, int *history, int *hunger)
343 *origPE = (GLOBAL_TASK_ID) buf[0];
345 *history = (int) buf[2];
346 *hunger = (int) buf[3];
350 *SendFree sends (weight, slot) pairs for GAs that we no longer need references
/* sendFree: ship nelem words of (weight, slot) pairs to pe as PP_FREE so
 * it can reclaim the corresponding weight.  nelem/2 is the pair count. */
354 sendFree(GLOBAL_TASK_ID pe, int nelem, StgPtr data)
357 fprintf(stderr, "Sending Free (%d GAs) to %x\n", nelem / 2, pe);
359 SendOpN(PP_FREE, pe, nelem, data);
365 *unpackFree unpacks a FREE message into the amount of data shipped and
/* unpackFree: read the element count then that many words into data. */
370 unpackFree(int *nelem, W_ *data)
375 *nelem = (int) buf[0];
376 GetArgs(data, *nelem);
380 *SendSchedule sends a closure to be evaluated in response to a Fish
381 *message. The message is directed to the PE that originated the Fish
382 *(origPE), and includes the packed closure (data) along with its size
/* sendSchedule: reply to a FISH with nelem words of packed work. */
387 sendSchedule(GLOBAL_TASK_ID origPE, int nelem, StgPtr data)
389 #ifdef SCHEDULE_DEBUG
391 fprintf(stderr, "Sending Schedule to %x\n", origPE);
394 SendOpN(PP_SCHEDULE, origPE, nelem, data);
398 *unpackSchedule unpacks a SCHEDULE message into the Global address of
399 *the closure shipped, the amount of data shipped (nelem) and the data
/* unpackSchedule: read the element count then the packed graph itself. */
404 unpackSchedule(int *nelem, W_ *data)
409 *nelem = (int) buf[0];
410 GetArgs(data, *nelem);
414 *Message-Processing Functions
416 *The following routines process incoming GUM messages. Often reissuing
417 *messages in response.
419 *processFish unpacks a fish message, reissuing it if it's our own,
420 *sending work if we have it or sending it onwards otherwise.
422 * Only stubs now. Real stuff in HLCommsRest PWT
429 * processFetch either returns the requested data (if available)
430 * or blocks the remote blocking queue on a black hole (if not).
437 * processFree unpacks a FREE message and adds the weights to our GAs.
444 * processResume unpacks a RESUME message into the graph, filling in
445 * the LA -> GA, and GA -> LA tables. Threads blocked on the original
446 * FetchMe (now a blocking queue) are awakened, and the blocking queue
447 * is converted into an indirection. Finally it sends an ACK in response
448 * which contains any newly allocated GAs.
/* NOTE(review): these two lines appear to be stub declarations/prototypes
 * for processResume and processSchedule; the full definitions follow
 * later in this file (the "stubs now, real stuff in HLCommsRest" comment
 * above refers to this split).  Confirm against the full source. */
452 processResume(GLOBAL_TASK_ID sender)
456 * processSchedule unpacks a SCHEDULE message into the graph, filling
457 * in the LA -> GA, and GA -> LA tables. The root of the graph is added to
458 * the local spark queue. Finally it sends an ACK in response
459 * which contains any newly allocated GAs.
462 processSchedule(GLOBAL_TASK_ID sender)
467 * processAck unpacks an ACK, and uses the GAGA map to convert RBH's
468 * (which represent shared thunks that have been shipped) into fetch-mes
476 * GUM Message Processor
478 * processMessages processes any messages that have arrived, calling
479 * appropriate routines depending on the message tag
480 * (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
481 * present and performs a blocking receive! During profiling it
482 * busy-waits in order to record idle time.
/* processMessages: main incoming-message dispatch loop.  Blocks for the
 * first packet, then drains all waiting packets, switching on the opcode.
 * NOTE(review): most case labels (FETCH, RESUME, ACK, FISH, FREE, FINISH)
 * are among the elided lines; only the SCHEDULE case and the default are
 * visible here. */
486 processMessages(void)
494 packet = GetPacket(); /* Get next message; block until one available */
496 get_opcode_and_sender(packet, &opcode, &task);
/* A FINISH-style opcode, presumably -- the case label is elided. */
501 stg_exit(EXIT_SUCCESS); /* The computation has been completed by someone
526 processSchedule(task);
530 /* Anything we're not prepared to deal with. */
531 fprintf(stderr, "Task %x: Unexpected opcode %x from %x\n",
532 mytid, opcode, task);
534 stg_exit(EXIT_FAILURE);
537 } while (PacketsWaiting()); /* While there are messages: process them */
538 } /* processMessages */
541 * Miscellaneous Functions
544 * ChoosePE selects a GlobalTaskId from the array of PEs 'at random'.
545 * Important properties:
546 * - it varies during execution, even if the PE is idle
547 * - it's different for each PE
548 * - we never send a fish to ourselves
550 extern long lrand48 (void);
/* choosePE body fragment: pick a pseudo-random PE index, skipping
 * ourselves by stepping to the next slot (mod nPEs).  NOTE(review): the
 * function header and return statement are among the elided lines. */
557 temp = lrand48() % nPEs;
558 if (PEs[temp] == mytid) { /* Never send a FISH to yourself */
559 temp = (temp + 1) % nPEs;
565 *WaitForTermination enters a loop ignoring spurious messages while waiting for the
566 *termination sequence to be completed.
/* WaitForTermination: drain and discard (via ProcessUnexpected) any
 * stray packets until the system FINISH sequence completes.  The
 * enclosing loop and exit condition are among the elided lines. */
569 WaitForTermination(void)
572 PACKET p = GetPacket();
573 ProcessUnexpected(p);
/* DebugPrintGAGAMap: dump nGAs (old GA -> new GA) pairs; the map is laid
 * out as consecutive globalAddr pairs, hence the stride of 2. */
579 DebugPrintGAGAMap(globalAddr *gagamap, int nGAs)
583 for (i = 0; i < nGAs; ++i, gagamap += 2)
584 fprintf(stderr, "gagamap[%d] = (%x, %d, %x) -> (%x, %d, %x)\n", i,
585 gagamap[0].loc.gc.gtid, gagamap[0].loc.gc.slot, gagamap[0].weight,
586 gagamap[1].loc.gc.gtid, gagamap[1].loc.gc.slot, gagamap[1].weight);
/* Per-PE buffers for batching outgoing FREE messages, plus the current
 * fill index for each.  Allocated once, on first use. */
590 static PP_ freeMsgBuffer = NULL;
591 static int *freeMsgIndex = NULL;
/* prepareFreeMsgBuffers: lazily allocate one pack-buffer-sized free-msg
 * buffer per PE, then reset every fill index for a new batch.
 * NOTE(review): the index-reset assignment after the final loop header
 * is among the elided lines. */
594 prepareFreeMsgBuffers(void)
598 /* Allocate the freeMsg buffers just once and then hang onto them. */
600 if (freeMsgIndex == NULL) {
602 freeMsgIndex = (int *) stgMallocBytes(nPEs * sizeof(int), "prepareFreeMsgBuffers (Index)");
603 freeMsgBuffer = (PP_) stgMallocBytes(nPEs * sizeof(long *), "prepareFreeMsgBuffers (Buffer)");
605 for(i = 0; i < nPEs; i++) {
607 freeMsgBuffer[i] = (P_) stgMallocWords(RtsFlags.ParFlags.packBufferSize,
608 "prepareFreeMsgBuffers (Buffer #i)");
613 /* Initialize the freeMsg buffer pointers to point to the start of their buffers */
614 for (i = 0; i < nPEs; i++)
/* freeRemoteGA: queue a (weight, slot) pair for ga into pe's free-message
 * buffer, flushing with sendFree first if the buffer is full.  The GA is
 * then poisoned with sentinel values to catch accidental reuse. */
619 freeRemoteGA(int pe, globalAddr *ga)
/* We must not still have a local mapping for a GA we are releasing. */
623 ASSERT(GALAlookup(ga) == NULL);
624 if ((i = freeMsgIndex[pe]) + 2 >= RtsFlags.ParFlags.packBufferSize) {
627 fprintf(stderr, "Filled a free message buffer\n");
629 sendFree(ga->loc.gc.gtid, i, freeMsgBuffer[pe]);
632 freeMsgBuffer[pe][i++] = (W_) ga->weight;
633 freeMsgBuffer[pe][i++] = (W_) ga->loc.gc.slot;
634 freeMsgIndex[pe] = i;
/* Poison the released GA so stale uses are recognisable in a debugger. */
636 ga->weight = 0x0f0f0f0f;
637 ga->loc.gc.gtid = 0x666;
638 ga->loc.gc.slot = 0xdeaddead;
/* sendFreeMessages: flush every non-empty per-PE free-message buffer. */
643 sendFreeMessages(void)
647 for (i = 0; i < nPEs; i++) {
648 if (freeMsgIndex[i] > 0)
649 sendFree(PEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
653 /* Process messaging code ripped out for the time being -- SDM & PWT */
656 /* These are the remaining message-processing functions from HLComms*/
660 *Message-Processing Functions
662 *The following routines process incoming GUM messages. Often reissuing
663 *messages in response.
665 *processFish unpacks a fish message, reissuing it if it's our own,
666 *sending work if we have it or sending it onwards otherwise.
/* processFish: handle an incoming FISH.  If we have sparks, pack one and
 * SCHEDULE it back to the originator; otherwise forward the fish (aged by
 * one) to a random PE, or send it home to die once it exceeds
 * FISH_LIFE_EXPECTANCY.  NOTE(review): the function header, several
 * braces, and the "own fish returned" branch body are elided here. */
671 GLOBAL_TASK_ID origPE;
672 int age, history, hunger;
674 unpackFish(&origPE, &age, &history, &hunger);
/* Our own fish came back around: elided handling presumably re-issues or
 * retires it -- confirm against the full source. */
676 if (origPE == mytid) {
681 while ((spark = FindLocalSpark(rtsTrue)) != NULL) {
/* Packing may need heap: GC and retry (the retry/continue is elided). */
685 if ((graph = PackNearbyGraph(spark, &size)) == NULL) {
686 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
687 SAVE_Hp -= PACK_HEAP_REQUIRED;
688 /* Now go back and try again */
690 sendSchedule(origPE, size, graph);
696 /* We have no sparks to give */
697 if (age < FISH_LIFE_EXPECTANCY)
698 sendFish(choosePE(), origPE,
699 (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
701 /* Send it home to die */
703 sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
709 *processFetch either returns the requested data (if available)
710 *or blocks the remote blocking queue on a black hole (if not).
/* processFetch: handle an incoming FETCH for a GA we own.  Four cases:
 * the target moved on (forward the fetch); the fetch looped back to our
 * own FMBQ (common up and return the weight); the target is a black hole
 * (build a BlockedFetch and queue it); or the target is real local graph
 * (pack it and RESUME the requester).  NOTE(review): the function header,
 * local declarations (ga, rga, load, closure, ip, bf, graph, size) and
 * several braces are among the elided lines. */
721 unpackFetch(&ga, &rga, &load);
723 fprintf(stderr, "Rcvd Fetch for (%x, %d, 0), Resume (%x, %d, %x) (load %d) \n",
724 ga.loc.gc.gtid, ga.loc.gc.slot,
725 rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, load);
728 closure = GALAlookup(&ga);
729 ip = (P_) INFO_PTR(closure);
731 if (INFO_TYPE(ip) == INFO_FETCHME_TYPE) {
732 /* Forward the Fetch to someone else */
733 sendFetch(FETCHME_GA(closure), &rga, load);
734 } else if (rga.loc.gc.gtid == mytid) {
735 /* Our own FETCH forwarded back around to us */
736 P_ fmbq = GALAlookup(&rga);
738 /* We may have already discovered that the fetch target is our own. */
740 CommonUp(fmbq, closure);
741 (void) addWeight(&rga);
742 } else if (IS_BLACK_HOLE(ip)) {
743 /* This includes RBH's and FMBQ's */
/* Allocate the BlockedFetch; on heap exhaustion GC, re-look-up the
 * (possibly moved) closure, and carve bf out of the freed space. */
746 if ((bf = AllocateHeap(FIXED_HS + BF_CLOSURE_SIZE(dummy))) == NULL) {
747 ReallyPerformThreadGC(FIXED_HS + BF_CLOSURE_SIZE(dummy), rtsFalse);
748 closure = GALAlookup(&ga);
749 bf = SAVE_Hp - (FIXED_HS + BF_CLOSURE_SIZE(dummy)) + 1;
751 ASSERT(GALAlookup(&rga) == NULL);
753 SET_BF_HDR(bf, BF_info, bogosity);
754 BF_NODE(bf) = closure;
755 BF_GTID(bf) = (W_) rga.loc.gc.gtid;
756 BF_SLOT(bf) = (W_) rga.loc.gc.slot;
757 BF_WEIGHT(bf) = (W_) rga.weight;
758 blockFetch(bf, closure);
761 fprintf(stderr, "Blocking Fetch (%x, %d, %x) on %#lx\n",
762 rga.loc.gc.gtid, rga.loc.gc.slot, rga.weight, closure);
766 /* The target of the FetchMe is some local graph */
/* Same GC-and-retry dance as processFetches above. */
770 if ((graph = PackNearbyGraph(closure, &size)) == NULL) {
771 ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
772 SAVE_Hp -= PACK_HEAP_REQUIRED;
773 closure = GALAlookup(&ga);
774 graph = PackNearbyGraph(closure, &size);
775 ASSERT(graph != NULL);
777 sendResume(&rga, size, graph);
782 *processFree unpacks a FREE message and adds the weights to our GAs.
788 static W_ *freeBuffer;
/* processFree: unpack a FREE message (pairs of weight, slot for GAs we
 * own) and return each weight to the corresponding local GA.
 * NOTE(review): the function header and the declarations of nelem/i/ga
 * are among the elided lines. */
792 freeBuffer = gumPackBuffer;
793 unpackFree(&nelem, freeBuffer);
795 fprintf(stderr, "Rcvd Free (%d GAs)\n", nelem / 2);
/* All freed GAs are local to us, so the gtid is fixed; only weight and
 * slot vary per pair. */
797 ga.loc.gc.gtid = mytid;
798 for (i = 0; i < nelem;) {
799 ga.weight = (unsigned) freeBuffer[i++];
800 ga.loc.gc.slot = (int) freeBuffer[i++];
802 fprintf(stderr,"Processing free (%x, %d, %x)\n", ga.loc.gc.gtid,
803 ga.loc.gc.slot, ga.weight);
805 (void) addWeight(&ga);
810 *processResume unpacks a RESUME message into the graph, filling in
811 *the LA -> GA, and GA -> LA tables. Threads blocked on the original
812 *FetchMe (now a blocking queue) are awakened, and the blocking queue
813 *is converted into an indirection. Finally it sends an ACK in response
814 *which contains any newly allocated GAs.
/* processResume: handle an incoming RESUME.  Ensure heap for the packed
 * graph (first word of the pack buffer is its size), unpack it, common
 * the old FMBQ up with the new graph so blocked threads wake, and ACK
 * the sender with any newly created GAs.  NOTE(review): local
 * declarations, brace lines, and parts of the granSim block are elided. */
818 processResume(GLOBAL_TASK_ID sender)
822 static W_ *packBuffer;
828 packBuffer = gumPackBuffer;
829 unpackResume(&lga, &nelem, packBuffer);
832 fprintf(stderr, "Rcvd Resume for (%x, %d, %x)\n",
833 lga.loc.gc.gtid, lga.loc.gc.slot, lga.weight);
834 PrintPacket(packBuffer);
838 * We always unpack the incoming graph, even if we've received the
839 * requested node in some other data packet (and already awakened
840 * the blocking queue).
842 if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
843 ReallyPerformThreadGC(packBuffer[0], rtsFalse);
844 SAVE_Hp -= packBuffer[0];
847 /* Do this *after* GC; we don't want to release the object early! */
850 (void) addWeight(&lga);
852 old = GALAlookup(&lga);
/* Profiling: record a GR_REPLY event for each thread blocked on the
 * FMBQ (loop body partially elided). */
854 if (RtsFlags.ParFlags.granSimStats) {
857 if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE) {
858 for(tso = (P_) FMBQ_ENTRIES(old);
859 TSO_LINK(tso) != PrelBase_Z91Z93_closure;
863 /* DumpGranEventAndNode(GR_REPLY, tso, old, taskIDtoPE(sender)); */
864 DumpRawGranEvent(CURRENT_PROC,taskIDtoPE(sender),GR_REPLY,
868 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
869 ASSERT(newGraph != NULL);
872 * Sometimes, unpacking will common up the resumee with the
873 * incoming graph, but if it hasn't, we'd better do so now.
876 if (INFO_TYPE(INFO_PTR(old)) == INFO_FMBQ_TYPE)
877 CommonUp(old, newGraph);
880 DebugPrintGAGAMap(gagamap, nGAs);
883 sendAck(sender, nGAs, gagamap);
887 *processSchedule unpacks a SCHEDULE message into the graph, filling
888 *in the LA -> GA, and GA -> LA tables. The root of the graph is added to
889 *the local spark queue. Finally it sends an ACK in response
890 *which contains any newly allocated GAs.
/* processSchedule: handle incoming work sent in reply to one of our
 * FISH messages.  Ensure heap, unpack the graph, add its root as an
 * advisory spark, and ACK the sender with any new GAs.
 * NOTE(review): local declarations and brace lines are elided; the
 * Spark() success result does not appear to be checked in the visible
 * lines -- confirm against the full source. */
893 processSchedule(GLOBAL_TASK_ID sender)
898 static W_ *packBuffer;
903 packBuffer = gumPackBuffer; /* HWL */
904 unpackSchedule(&nelem, packBuffer);
906 #ifdef SCHEDULE_DEBUG
907 fprintf(stderr, "Rcvd Schedule\n");
908 PrintPacket(packBuffer);
912 * For now, the graph is a closure to be sparked as an advisory
913 * spark, but in future it may be a complete spark with
914 * required/advisory status, priority etc.
/* First word of the pack buffer is the heap space the graph needs. */
917 space_required = packBuffer[0];
918 if (SAVE_Hp + space_required >= SAVE_HpLim) {
919 ReallyPerformThreadGC(space_required, rtsFalse);
920 SAVE_Hp -= space_required;
922 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
923 ASSERT(newGraph != NULL);
924 success = Spark(newGraph, rtsFalse);
927 #ifdef SCHEDULE_DEBUG
928 DebugPrintGAGAMap(gagamap, nGAs);
932 sendAck(sender, nGAs, gagamap);
938 *processAck unpacks an ACK, and uses the GAGA map to convert RBH's
939 *(which represent shared thunks that have been shipped) into fetch-mes
/* processAck: handle an incoming ACK.  For each (oldGA, newGA) pair:
 * if we don't already know the new closure, record the remote GA and
 * convert our shipped thunk (RBH) into a FetchMe; otherwise fold the
 * received weight back in.  In both cases the oldGA's weight is
 * returned.  NOTE(review): the function header, the if/else structure
 * around GALAlookup results, and closing braces are elided here. */
948 globalAddr gagamap[MAX_GAS * 2];
950 unpackAck(&nGAs, gagamap);
953 fprintf(stderr, "Rcvd Ack (%d pairs)\n", nGAs);
954 DebugPrintGAGAMap(gagamap, nGAs);
958 * For each (oldGA, newGA) pair, set the GA of the corresponding
959 * thunk to the newGA, convert the thunk to a FetchMe, and return
960 * the weight from the oldGA.
962 for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
963 P_ old = GALAlookup(gaga);
964 P_ new = GALAlookup(gaga + 1);
967 /* We don't have this closure, so we make a fetchme for it */
968 globalAddr *ga = setRemoteGA(old, gaga + 1, rtsTrue);
970 convertToFetchMe(old, ga);
973 * Oops...we've got this one already; update the RBH to
974 * point to the object we already know about, whatever it
980 * Increase the weight of the object by the amount just
981 * received in the second part of the ACK pair.
983 (void) addWeight(gaga + 1);
985 (void) addWeight(gaga);
991 #endif /* PAR -- whole file */