1 /* ----------------------------------------------------------------------------
2 * Time-stamp: <Wed Jan 12 2000 13:32:25 Stardate: [-30]4193.86 hwloidl>
3 * $Id: HLComms.c,v 1.2 2000/01/13 14:34:07 hwloidl Exp $
5 * High Level Communications Routines (HLComms.lc)
7 * Contains the high-level routines (i.e. communication
8 * subsystem independent) used by GUM
10 * Phil Trinder, Glasgow University, 12 December 1994
12 * Phil Trinder, Simon Marlow July 1998
13 * H-W. Loidl, Heriot-Watt University, November 1999
15 * ------------------------------------------------------------------------- */
17 #ifdef PAR /* whole file */
19 //@node High Level Communications Routines, , ,
20 //@section High Level Communications Routines
25 //* GUM Message Sending and Unpacking Functions::
26 //* Message-Processing Functions::
27 //* GUM Message Processor::
28 //* Miscellaneous Functions::
32 //@node Macros etc, Includes, High Level Communications Routines, High Level Communications Routines
33 //@subsection Macros etc
36 # define NON_POSIX_SOURCE /* so says Solaris */
39 //@node Includes, GUM Message Sending and Unpacking Functions, Macros etc, High Level Communications Routines
40 //@subsection Includes
45 #include "Storage.h" // for recordMutable
48 #include "GranSimRts.h"
49 #include "ParallelRts.h"
50 #include "FetchMe.h" // for BLOCKED_FETCH_info etc
52 # include "ParallelDebug.h"
54 #include "StgMacros.h" // inlined IS_... fcts
56 //@node GUM Message Sending and Unpacking Functions, Message-Processing Functions, Includes, High Level Communications Routines
57 //@subsection GUM Message Sending and Unpacking Functions
60 * GUM Message Sending and Unpacking Functions
64 * Allocate space for message processing
67 //@cindex gumPackBuffer
68 static rtsPackBuffer *gumPackBuffer;
70 //@cindex initMoreBuffers
74 if ((gumPackBuffer = (rtsPackBuffer *)stgMallocWords(RtsFlags.ParFlags.packBufferSize,
75 "initMoreBuffers")) == NULL)
81 * SendFetch packs the two global addresses and a load into a message +
86 Structure of a FETCH message:
89 +------------------------------------+------+
90 | gtid | slot | weight | gtid | slot | load |
91 +------------------------------------+------+
/* rga: GA of the remote closure we want; lga: local GA to which the
   RESUME reply should be delivered; load: advisory load figure. */
96 sendFetch(globalAddr *rga, globalAddr *lga, int load)
98 ASSERT(rga->weight > 0 && lga->weight > 0);
100 belch("** [%x] Sending Fetch for ((%x, %d, 0)); locally ((%x, %d, %x)), load = %d",
102 rga->payload.gc.gtid, rga->payload.gc.slot,
103 lga->payload.gc.gtid, lga->payload.gc.slot, lga->weight,
/* Record a GR_FETCH profiling event (guarding IF_PAR_DEBUG/ParStats
   wrapper elided in this view -- confirm against full source). */
108 DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(rga->payload.gc.gtid),
109 GR_FETCH, CurrentTSO, (StgClosure *)(lga->payload.gc.slot),
110 0, spark_queue_len(ADVISORY_POOL));
/* Ship the 6-word FETCH message to the PE that owns the remote GA. */
113 sendOpV(PP_FETCH, rga->payload.gc.gtid, 6,
114 (StgWord) rga->payload.gc.gtid, (StgWord) rga->payload.gc.slot,
115 (StgWord) lga->weight, (StgWord) lga->payload.gc.gtid,
116 (StgWord) lga->payload.gc.slot, (StgWord) load);
120 * unpackFetch unpacks a FETCH message into two Global addresses and a load
124 //@cindex unpackFetch
/* Inverse of sendFetch: fills lga (what to fetch here), rga (where the
   reply goes) and *load from the 6-word message in buf. */
126 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
133 belch("** [%x] Unpacking Fetch for ((%x, %d, 0)) to ((%x, %d, %x)), load = %d",
135 (GlobalTaskId) buf[0], (int) buf[1],
136 (GlobalTaskId) buf[3], (int) buf[4], buf[2], buf[5]));
139 lga->payload.gc.gtid = (GlobalTaskId) buf[0];
140 lga->payload.gc.slot = (int) buf[1];
/* NOTE(review): the assignment of lga->weight appears to be on an
   elided line -- confirm against the full source. */
142 rga->weight = (unsigned) buf[2];
143 rga->payload.gc.gtid = (GlobalTaskId) buf[3];
144 rga->payload.gc.slot = (int) buf[4];
146 *load = (int) buf[5];
148 ASSERT(rga->weight > 0);
152 * SendResume packs the remote blocking queue's GA and data into a message
157 Structure of a RESUME message:
159 -------------------------------
160 | weight | slot | n | data ...
161 -------------------------------
163 data is a packed graph represented as an rtsPackBuffer
164 n is the size of the graph (as returned by PackNearbyGraph) + packet hdr size
/* rga identifies the remote blocking queue to be awakened; nelem is the
   packed-graph size (header excluded -- it is added back below). */
169 sendResume(globalAddr *rga, int nelem, rtsPackBuffer *data) // StgPtr data)
173 belch("[] [%x] Sending Resume for ((%x, %d, %x))",
175 rga->payload.gc.gtid, rga->payload.gc.slot, rga->weight));
/* Send weight+slot as scalar args, followed by the pack buffer. */
177 sendOpNV(PP_RESUME, rga->payload.gc.gtid,
178 nelem + PACK_BUFFER_HDR_SIZE, (StgPtr)data,
179 2, (rtsWeight) rga->weight, (StgWord) rga->payload.gc.slot);
183 * unpackResume unpacks a Resume message into two Global addresses and
187 //@cindex unpackResume
/* Fills lga with the local GA the resume is for (gtid is always us),
   copies the packed graph into data and returns its size in *nelem. */
189 unpackResume(globalAddr *lga, int *nelem, rtsPackBuffer *data)
196 belch("[] [%x] Unpacking Resume for ((%x, %d, %x))",
198 (int) buf[1], (unsigned) buf[0]));
201 RESUME event is written in awaken_blocked_queue
202 DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(lga->payload.gc.gtid),
203 GR_RESUME, END_TSO_QUEUE, (StgClosure *)NULL, 0, 0);
206 lga->weight = (unsigned) buf[0];
207 lga->payload.gc.gtid = mytid;
208 lga->payload.gc.slot = (int) buf[1];
210 *nelem = (int) buf[2]; // includes PACK_BUFFER_HDR_SIZE;
211 GetArgs(data, *nelem);
/* Report the graph size without the pack-buffer header to the caller. */
212 *nelem -= PACK_BUFFER_HDR_SIZE;
216 * SendAck packs the global address being acknowledged, together with
217 * an array of global addresses for any closures shipped and sends them.
221 Structure of an ACK message:
224 +---------------------------------------------+-------
225 | weight | gtid | slot | weight | gtid | slot | ..... ngas times
226 + --------------------------------------------+-------
/* gagamap holds ngas (old GA, new GA) pairs; each pair becomes six
   longs in the outgoing buffer. */
232 sendAck(GlobalTaskId task, int ngas, globalAddr *gagamap)
238 buffer = (long *) gumPackBuffer;
240 for(i = 0, p = buffer; i < ngas; i++, p += 6) {
241 ASSERT(gagamap[1].weight > 0);
242 p[0] = (long) gagamap->weight;
243 p[1] = (long) gagamap->payload.gc.gtid;
244 p[2] = (long) gagamap->payload.gc.slot;
/* NOTE(review): p[3..5] read the same gagamap entry as p[0..2] here;
   in the full source the map pointer advances (gagamap++) between the
   two triples and after the pair -- those lines are elided in this
   view. Confirm before relying on this listing. */
246 p[3] = (long) gagamap->weight;
247 p[4] = (long) gagamap->payload.gc.gtid;
248 p[5] = (long) gagamap->payload.gc.slot;
252 belch(",, [%x] Sending Ack (%d pairs) to PE %x\n",
255 sendOpN(PP_ACK, task, p - buffer, buffer);
259 * unpackAck unpacks an Acknowledgement message into a Global address,
260 * a count of the number of global addresses following and a map of
/* Reads the GA-pair count, then loops pulling 6-word (old GA, new GA)
   pairs into gagamap. */
266 unpackAck(int *ngas, globalAddr *gagamap)
271 GetArgs(&GAarraysize, 1);
/* Six words per (old, new) pair => pair count. */
273 *ngas = GAarraysize / 6;
276 belch(",, [%x] Unpacking Ack (%d pairs) on %x\n",
277 mytid, *ngas, mytid));
279 while (GAarraysize > 0) {
281 gagamap->weight = (rtsWeight) buf[0];
282 gagamap->payload.gc.gtid = (GlobalTaskId) buf[1];
283 gagamap->payload.gc.slot = (int) buf[2];
/* NOTE(review): the same gagamap entry is overwritten from buf[3..5];
   in the full source gagamap advances between and after the two GAs of
   a pair -- those increments are elided in this view. */
285 gagamap->weight = (rtsWeight) buf[3];
286 gagamap->payload.gc.gtid = (GlobalTaskId) buf[4];
287 gagamap->payload.gc.slot = (int) buf[5];
288 ASSERT(gagamap->weight > 0);
295 * sendFish sends a FISH (work-stealing request) message to destPE on
296 * behalf of origPE, carrying the fish's age, history and hunger.
300 Structure of a FISH message:
302 +----------------------------------+
303 | orig PE | age | history | hunger |
304 +----------------------------------+
309 sendFish(GlobalTaskId destPE, GlobalTaskId origPE,
310 int age, int history, int hunger)
313 belch("$$ [%x] Sending Fish to %x (%d outstanding fishes)",
314 mytid, destPE, outstandingFishes));
316 sendOpV(PP_FISH, destPE, 4,
317 (StgWord) origPE, (StgWord) age, (StgWord) history, (StgWord) hunger);
/* Only the originating PE tracks its own outstanding fish (body of this
   branch elided in this view). */
319 if (origPE == mytid) {
326 * unpackFish unpacks a FISH message into the global task id of the
327 * originating PE and 3 data fields: the age, history and hunger of the
328 * fish. The history + hunger are not currently used.
334 unpackFish(GlobalTaskId *origPE, int *age, int *history, int *hunger)
341 belch("$$ [%x] Unpacking Fish from PE %x (age=%d)",
342 mytid, (GlobalTaskId) buf[0], (int) buf[1]));
344 *origPE = (GlobalTaskId) buf[0];
/* NOTE(review): the assignment *age = (int) buf[1] appears to be on an
   elided line -- confirm against the full source. */
346 *history = (int) buf[2];
347 *hunger = (int) buf[3];
351 * SendFree sends (weight, slot) pairs for GAs that we no longer need
356 Structure of a FREE message:
358 +-----------------------------
359 | n | weight_1 | slot_1 | ...
360 +-----------------------------
/* nelem counts words in data (two per GA, hence nelem/2 GAs). */
364 sendFree(GlobalTaskId pe, int nelem, StgPtr data)
367 belch("!! [%x] Sending Free (%d GAs) to %x",
368 mytid, nelem/2, pe));
370 sendOpN(PP_FREE, pe, nelem, data);
374 * unpackFree unpacks a FREE message into the amount of data shipped and
/* Reads the word count into *nelem, then copies that many words of
   (weight, slot) pairs into data. */
379 unpackFree(int *nelem, rtsPackBuffer *data)
384 *nelem = (int) buf[0];
387 belch("!! [%x] Unpacking Free (%d GAs)",
390 GetArgs(data, *nelem);
394 * SendSchedule sends a closure to be evaluated in response to a Fish
395 * message. The message is directed to the PE that originated the Fish
396 * (origPE), and includes the packed closure (data) along with its size
401 Structure of a SCHEDULE message:
403 +------------------------------------
404 | PE | n | pack buffer of a graph ...
405 +------------------------------------
407 //@cindex sendSchedule
/* nelem excludes the pack-buffer header; it is added back for the wire
   size below (mirrors unpackSchedule, which subtracts it). */
409 sendSchedule(GlobalTaskId origPE, int nelem, rtsPackBuffer *data) // StgPtr data)
411 IF_PAR_DEBUG(schedule,
413 belch("-- [%x] Sending Schedule (%d elems) to %x\n",
414 mytid, nelem, origPE));
416 sendOpN(PP_SCHEDULE, origPE, nelem + PACK_BUFFER_HDR_SIZE, (StgPtr)data);
420 * unpackSchedule unpacks a SCHEDULE message into the Global address of
421 * the closure shipped, the amount of data shipped (nelem) and the data
425 //@cindex unpackSchedule
427 unpackSchedule(int *nelem, rtsPackBuffer *data)
432 /* no. of elems, not counting the header of the pack buffer */
433 *nelem = (int) buf[0] - PACK_BUFFER_HDR_SIZE;
435 IF_PAR_DEBUG(schedule,
436 belch("-- [%x] Unpacking Schedule (%d elems) on %x\n",
439 /* automatic cast of flat pvm-data to rtsPackBuffer */
/* Copy header + graph into data (header size was subtracted from
   *nelem above, so add it back for the transfer). */
440 GetArgs(data, *nelem + PACK_BUFFER_HDR_SIZE);
443 //@node Message-Processing Functions, GUM Message Processor, GUM Message Sending and Unpacking Functions, High Level Communications Routines
444 //@subsection Message-Processing Functions
447 * Message-Processing Functions
449 * The following routines process incoming GUM messages. Often reissuing
450 * messages in response.
452 * processFish unpacks a fish message, reissuing it if it's our own,
453 * sending work if we have it or sending it onwards otherwise.
457 * blockFetch blocks a BlockedFetch node on some kind of black hole.
/* Dispatches on the closure type of bh; the case labels (BLACKHOLE,
   BLACKHOLE_BQ, FETCH_ME_BQ, RBH, default) are on elided lines in this
   view -- each arm below enqueues bf on the appropriate blocking queue. */
461 blockFetch(StgBlockedFetch *bf, StgClosure *bh) {
463 switch (get_itbl(bh)->type) {
/* Plain black hole: bf becomes the first (only) queue element and the
   closure is mutated into a BLACKHOLE_BQ. */
465 bf->link = END_BQ_QUEUE;
466 //((StgBlockingQueue *)bh)->header.info = &BLACKHOLE_BQ_info;
467 SET_INFO(bh, &BLACKHOLE_BQ_info); // turn closure into a blocking queue
468 ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
470 // put bh on the mutables list
471 recordMutable((StgMutClosure *)bh);
475 * If we modify a black hole in the old generation, we have to
476 * make sure it goes on the mutables list
479 if (bh <= StorageMgrInfo.OldLim) {
480 MUT_LINK(bh) = (StgWord) StorageMgrInfo.OldMutables;
481 StorageMgrInfo.OldMutables = bh;
483 MUT_LINK(bh) = MUT_NOT_LINKED;
/* Already a blocking queue: just push bf on the front. */
488 /* enqueue bf on blocking queue of closure bh */
489 bf->link = ((StgBlockingQueue *)bh)->blocking_queue;
490 ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
492 // put bh on the mutables list; ToDo: check
493 recordMutable((StgMutClosure *)bh);
/* FetchMe blocking queue: same push, different struct type. */
497 /* enqueue bf on blocking queue of closure bh */
498 bf->link = ((StgFetchMeBlockingQueue *)bh)->blocking_queue;
499 ((StgFetchMeBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
501 // put bh on the mutables list; ToDo: check
502 recordMutable((StgMutClosure *)bh);
/* Revertible black hole: same push again. */
506 /* enqueue bf on blocking queue of closure bh */
507 bf->link = ((StgRBH *)bh)->blocking_queue;
508 ((StgRBH *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
510 // put bh on the mutables list; ToDo: check
511 recordMutable((StgMutClosure *)bh);
/* Anything else is a bug in the caller. */
515 barf("Panic (blockFetch): thought %p was a black hole (IP %#lx, %s)",
516 (StgClosure *)bh, get_itbl((StgClosure *)bh),
517 info_type((StgClosure *)bh));
519 IF_PAR_DEBUG(verbose,
520 belch("## blockFetch: after block the BQ of %p (%s) is:",
527 * processFetches constructs and sends resume messages for every
528 * BlockedFetch which is ready to be awakened.
529 * awaken_blocked_queue (in Schedule.c) is responsible for moving
530 * BlockedFetches from a blocking queue to the PendingFetches queue.
533 extern StgBlockedFetch *PendingFetches;
/* Debug helper: length of the PendingFetches list. */
536 pending_fetches_len(void)
541 for (n=0, bf=PendingFetches; bf != END_BF_QUEUE; n++, bf = (StgBlockedFetch *)(bf->link)) {
542 ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
547 //@cindex processFetches
/* Walk PendingFetches and answer each BlockedFetch: forward it if the
   target is still a FetchMe, block it on a black hole, or pack the
   local graph and send a RESUME back to the requester. */
549 processFetches(void) {
551 StgClosure *closure, *next;
554 static rtsPackBuffer *packBuffer;
556 IF_PAR_DEBUG(verbose,
557 belch("__ processFetches: %d pending fetches",
558 pending_fetches_len()));
560 for (bf = PendingFetches;
562 bf=(StgBlockedFetch *)(bf->link)) {
563 /* the PendingFetches list contains only BLOCKED_FETCH closures */
564 ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
567 * Find the target at the end of the indirection chain, and
568 * process it in much the same fashion as the original target
569 * of the fetch. Though we hope to find graph here, we could
570 * find a black hole (of any flavor) or even a FetchMe.
574 HACK 312: bf->node may have been evacuated since filling it; follow
575 the evacuee in this case; the proper way to handle this is to
576 traverse the blocking queue and update the node fields of
577 BLOCKED_FETCH entries when evacuating an BLACKHOLE_BQ, FETCH_ME_BQ
578 or RBH (but it's late and I'm tired)
580 if (get_itbl(closure)->type == EVACUATED)
581 closure = ((StgEvacuated *)closure)->evacuee;
583 while ((next = IS_INDIRECTION(closure)) != NULL) { closure = next; }
585 ip = get_itbl(closure);
586 if (ip->type == FETCH_ME) {
587 /* Forward the Fetch to someone else */
588 rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
589 rga.payload.gc.slot = bf->ga.payload.gc.slot;
590 rga.weight = bf->ga.weight;
592 sendFetch(((StgFetchMe *)closure)->ga, &rga, 0 /* load */);
594 IF_PAR_DEBUG(forward,
595 belch("__ processFetches: Forwarding fetch from %lx to %lx",
596 mytid, rga.payload.gc.gtid));
598 } else if (IS_BLACK_HOLE(closure)) {
599 IF_PAR_DEBUG(verbose,
600 belch("__ processFetches: trying to send a BLACK_HOLE => doing a blockFetch on closure %p (%s)",
601 closure, info_type(closure)));
603 blockFetch(bf, closure);
605 /* We now have some local graph to send back */
608 packBuffer = gumPackBuffer;
609 IF_PAR_DEBUG(verbose,
610 belch("__ processFetches: PackNearbyGraph of closure %p (%s)",
611 closure, info_type(closure)));
/* Packing can fail for lack of heap: requeue bf, GC, then retry --
   the second pack is asserted to succeed. */
613 if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size)) == NULL) {
614 // Put current BF back on list
615 bf->link = (StgBlockingQueueElement *)PendingFetches;
616 PendingFetches = (StgBlockedFetch *)bf;
617 // ToDo: check that nothing more has to be done to prepare for GC!
618 GarbageCollect(GetRoots);
620 PendingFetches = (StgBlockedFetch *)(bf->link);
622 packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size);
623 ASSERT(packBuffer != (rtsPackBuffer *)NULL);
625 rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
626 rga.payload.gc.slot = bf->ga.payload.gc.slot;
627 rga.weight = bf->ga.weight;
629 sendResume(&rga, size, packBuffer);
632 PendingFetches = END_BF_QUEUE;
637 Alternatively to sending fetch messages directly from the FETCH_ME_entry
638 code we could just store the data about the remote data in a global
639 variable and send the fetch request from the main scheduling loop (similar
640 to processFetches above). This would save an expensive STGCALL in the entry
641 code because we have to go back to the scheduler anyway.
643 //@cindex processTheRealFetches
/* Send the fetch recorded in the globals theGlobalFromGA/theGlobalToGA
   by the FETCH_ME entry code, then reset those globals. */
645 processTheRealFetches(void) {
647 StgClosure *closure, *next;
649 IF_PAR_DEBUG(verbose,
650 belch("__ processTheRealFetches: ");
651 printGA(&theGlobalFromGA);
652 printGA(&theGlobalToGA));
/* Both globals must have been filled in before we are called. */
654 ASSERT(theGlobalFromGA.payload.gc.gtid != 0 &&
655 theGlobalToGA.payload.gc.gtid != 0);
657 /* the old version did this in the FETCH_ME entry code */
658 sendFetch(&theGlobalFromGA, &theGlobalToGA, 0/*load*/);
/* gtid == 0 marks the globals as empty again. */
661 theGlobalFromGA.payload.gc.gtid = 0;
662 theGlobalToGA.payload.gc.gtid = 0;
669 * processFish unpacks a fish message, reissuing it if it's our own,
670 * sending work if we have it or sending it onwards otherwise.
672 //@cindex processFish
677 int age, history, hunger;
679 static rtsPackBuffer *packBuffer;
681 unpackFish(&origPE, &age, &history, &hunger);
/* Our own fish came back: no work anywhere; just note the time. */
683 if (origPE == mytid) {
684 //fishing = rtsFalse; // fish has come home
686 last_fish_arrived_at = CURRENT_TIME; // remember time (see schedule fct)
687 return; // that's all
690 ASSERT(origPE != mytid);
692 belch("$$ [%x] processing fish; %d sparks available",
693 mytid, spark_queue_len(ADVISORY_POOL)));
/* Try to satisfy the fish with a local spark: pack the graph around it
   and SCHEDULE it to the fish's originator. */
694 while ((spark = findLocalSpark(rtsTrue)) != NULL) {
696 // StgClosure *graph;
698 packBuffer = gumPackBuffer;
699 ASSERT(closure_SHOULD_SPARK((StgClosure *)spark));
700 if ((packBuffer = PackNearbyGraph(spark, END_TSO_QUEUE, &size)) == NULL) {
702 belch("$$ GC while trying to satisfy FISH via PackNearbyGraph of node %p",
703 (StgClosure *)spark));
704 GarbageCollect(GetRoots);
705 /* Now go back and try again */
708 belch("$$ [%x] Replying to FISH from %x by sending graph @ %p (%s)",
710 (StgClosure *)spark, info_type((StgClosure *)spark)));
711 sendSchedule(origPE, size, packBuffer);
716 if (spark == (rtsSpark)NULL) {
718 belch("$$ [%x] No sparks available for FISH from %x",
720 /* We have no sparks to give */
721 if (age < FISH_LIFE_EXPECTANCY)
722 /* and the fish is still young, send it to another PE to look for work */
723 sendFish(choosePE(), origPE,
724 (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
726 /* otherwise, send it home to die */
728 sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
733 * processFetch either returns the requested data (if available)
734 * or blocks the remote blocking queue on a black hole (if not).
737 //@cindex processFetch
746 unpackFetch(&ga, &rga, &load);
748 belch("%% [%x] Rcvd Fetch for ((%x, %d, 0)), Resume ((%x, %d, %x)) (load %d) from %x",
750 ga.payload.gc.gtid, ga.payload.gc.slot,
751 rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight, load,
752 rga.payload.gc.gtid));
/* The requested GA must be in our GA->LA table. */
754 closure = GALAlookup(&ga);
755 ASSERT(closure != (StgClosure *)NULL);
756 ip = get_itbl(closure);
757 if (ip->type == FETCH_ME) {
758 /* Forward the Fetch to someone else */
759 sendFetch(((StgFetchMe *)closure)->ga, &rga, load);
760 } else if (rga.payload.gc.gtid == mytid) {
761 /* Our own FETCH forwarded back around to us */
762 StgFetchMeBlockingQueue *fmbq = (StgFetchMeBlockingQueue *)GALAlookup(&rga);
765 belch("%% [%x] Fetch returned to sending PE; closure=%p (%s); receiver=%p (%s)",
766 mytid, closure, info_type(closure), fmbq, info_type(fmbq)));
767 /* We may have already discovered that the fetch target is our own. */
768 if ((StgClosure *)fmbq != closure)
769 CommonUp((StgClosure *)fmbq, closure);
770 (void) addWeight(&rga);
771 } else if (IS_BLACK_HOLE(closure)) {
772 /* This includes RBH's and FMBQ's */
775 ASSERT(GALAlookup(&rga) == NULL);
777 /* If we're hitting a BH or RBH or FMBQ we have to put a BLOCKED_FETCH
778 closure into the BQ in order to denote that when updating this node
779 the result should be sent to the originator of this fetch message. */
780 bf = (StgBlockedFetch *)createBlockedFetch(ga, rga);
781 blockFetch(bf, closure);
784 belch("%% [%x] Blocking Fetch ((%x, %d, %x)) on %p (%s)",
786 rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight,
787 closure, info_type(closure)));
789 /* The target of the FetchMe is some local graph */
791 // StgClosure *graph;
792 rtsPackBuffer *buffer = (rtsPackBuffer *)NULL;
/* Pack may fail for lack of heap: GC, re-look-up the closure (it may
   have moved), then retry -- second attempt asserted to succeed. */
794 if ((buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size)) == NULL) {
795 GarbageCollect(GetRoots);
796 closure = GALAlookup(&ga);
797 buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size);
798 ASSERT(buffer != (rtsPackBuffer *)NULL);
800 sendResume(&rga, size, buffer);
805 * processFree unpacks a FREE message and adds the weights to our GAs.
807 //@cindex processFree
812 static StgWord *buffer;
816 buffer = (StgWord *)gumPackBuffer;
817 unpackFree(&nelem, buffer);
819 belch("!! [%x] Rcvd Free (%d GAs)", mytid, nelem / 2));
/* All freed GAs are local to us; walk the (weight, slot) pairs and
   return each weight to the owning GA. */
821 ga.payload.gc.gtid = mytid;
822 for (i = 0; i < nelem;) {
823 ga.weight = (rtsWeight) buffer[i++];
824 ga.payload.gc.slot = (int) buffer[i++];
826 fprintf(stderr, "!! [%x] Processing free ", mytid);
830 (void) addWeight(&ga);
835 * processResume unpacks a RESUME message into the graph, filling in
836 * the LA -> GA, and GA -> LA tables. Threads blocked on the original
837 * FetchMe (now a blocking queue) are awakened, and the blocking queue
838 * is converted into an indirection. Finally it sends an ACK in response
839 * which contains any newly allocated GAs.
842 //@cindex processResume
844 processResume(GlobalTaskId sender)
848 static rtsPackBuffer *packBuffer;
849 StgClosure *newGraph, *old;
853 packBuffer = gumPackBuffer;
854 unpackResume(&lga, &nelem, (StgPtr)packBuffer);
857 fprintf(stderr, "[] [%x] Rcvd Resume for ", mytid);
860 PrintPacket((rtsPackBuffer *)packBuffer));
863 * We always unpack the incoming graph, even if we've received the
864 * requested node in some other data packet (and already awakened
865 * the blocking queue).
/* Make room for the incoming graph before unpacking (packBuffer[0]
   is the required heap size). */
866 if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
867 ReallyPerformThreadGC(packBuffer[0], rtsFalse);
868 SAVE_Hp -= packBuffer[0];
872 // ToDo: Check for GC here !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
874 /* Do this *after* GC; we don't want to release the object early! */
877 (void) addWeight(&lga);
879 old = GALAlookup(&lga);
/* With full ParStats profiling, log a GR_REPLY event for every TSO
   blocked on the node being resumed. */
881 if (RtsFlags.ParFlags.ParStats.Full) {
882 // StgTSO *tso = END_TSO_QUEUE;
883 StgBlockingQueueElement *bqe;
885 /* Write REPLY events to the log file, indicating that the remote
887 if (get_itbl(old)->type == FETCH_ME_BQ ||
888 get_itbl(old)->type == RBH)
889 for (bqe = ((StgFetchMeBlockingQueue *)old)->blocking_queue;
890 bqe->link != END_BQ_QUEUE;
892 if (get_itbl((StgClosure *)bqe)->type == TSO)
893 DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(sender),
894 GR_REPLY, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
895 0, spark_queue_len(ADVISORY_POOL));
898 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
899 ASSERT(newGraph != NULL);
902 * Sometimes, unpacking will common up the resumee with the
903 * incoming graph, but if it hasn't, we'd better do so now.
906 if (get_itbl(old)->type == FETCH_ME_BQ)
907 CommonUp(old, newGraph);
910 DebugPrintGAGAMap(gagamap, nGAs));
/* Acknowledge, returning any GAs newly allocated during unpacking. */
912 sendAck(sender, nGAs, gagamap);
916 * processSchedule unpacks a SCHEDULE message into the graph, filling
917 * in the LA -> GA, and GA -> LA tables. The root of the graph is added to
918 * the local spark queue. Finally it sends an ACK in response
919 * which contains any newly allocated GAs.
921 //@cindex processSchedule
923 processSchedule(GlobalTaskId sender)
925 nat nelem, space_required, nGAs;
927 static rtsPackBuffer *packBuffer;
928 StgClosure *newGraph;
931 packBuffer = gumPackBuffer; /* HWL */
932 unpackSchedule(&nelem, packBuffer);
934 IF_PAR_DEBUG(schedule,
935 belch("-- [%x] Rcvd Schedule (%d elems)", mytid, nelem);
936 PrintPacket(packBuffer));
939 * For now, the graph is a closure to be sparked as an advisory
940 * spark, but in future it may be a complete spark with
941 * required/advisory status, priority etc.
/* Make room for the incoming graph before unpacking. */
945 space_required = packBuffer[0];
946 if (SAVE_Hp + space_required >= SAVE_HpLim) {
947 ReallyPerformThreadGC(space_required, rtsFalse);
948 SAVE_Hp -= space_required;
951 // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!1
952 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
953 ASSERT(newGraph != NULL);
/* Spark the root of the received graph (advisory). */
954 success = add_to_spark_queue(newGraph, rtsFalse);
958 belch("+* added spark to unpacked graph %p; %d sparks available on [%x]",
959 newGraph, spark_queue_len(ADVISORY_POOL), mytid);
961 belch("+* received non-sparkable closure %p; nothing added to spark pool; %d sparks available on [%x]",
962 newGraph, spark_queue_len(ADVISORY_POOL), mytid);
963 belch("-* Unpacked graph with root at %p (%s):",
964 newGraph, info_type(newGraph));
965 PrintGraph(newGraph, 0));
968 DebugPrintGAGAMap(gagamap, nGAs));
971 sendAck(sender, nGAs, gagamap);
/* A SCHEDULE answers one of our fish. */
973 //fishing = rtsFalse;
974 ASSERT(outstandingFishes>0);
979 * processAck unpacks an ACK, and uses the GAGA map to convert RBH's
980 * (which represent shared thunks that have been shipped) into fetch-mes
989 globalAddr gagamap[256]; // ToDo: elim magic constant!! MAX_GAS * 2];??
991 unpackAck(&nGAs, gagamap);
994 belch(",, [%x] Rcvd Ack (%d pairs)", mytid, nGAs);
995 DebugPrintGAGAMap(gagamap, nGAs));
998 * For each (oldGA, newGA) pair, set the GA of the corresponding
999 * thunk to the newGA, convert the thunk to a FetchMe, and return
1000 * the weight from the oldGA.
1002 for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
1003 StgClosure *old_closure = GALAlookup(gaga);
1004 StgClosure *new_closure = GALAlookup(gaga + 1);
1006 ASSERT(old_closure != NULL);
1007 if (new_closure == NULL) {
1008 /* We don't have this closure, so we make a fetchme for it */
1009 globalAddr *ga = setRemoteGA(old_closure, gaga + 1, rtsTrue);
1011 /* convertToFetchMe should be done unconditionally here.
1012 Currently, we assign GAs to CONSTRs, too, (a bit of a hack),
1013 so we have to check whether it is an RBH before converting
/* NOTE(review): this ASSERT compares an info-table POINTER to the
   type constant RBH (cf. the correct ->type test just below); the
   comment opened above has no visible close in this view, so the
   ASSERT may in fact be commented out in the full source -- confirm
   before treating it as live code. */
1015 ASSERT(get_itbl(old_closure)==RBH);
1017 if (get_itbl(old_closure)->type==RBH)
1018 convertToFetchMe(old_closure, ga);
1021 * Oops...we've got this one already; update the RBH to
1022 * point to the object we already know about, whatever it
1025 CommonUp(old_closure, new_closure);
1028 * Increase the weight of the object by the amount just
1029 * received in the second part of the ACK pair.
1031 (void) addWeight(gaga + 1);
/* Return the weight carried in the first (old) GA of the pair. */
1033 (void) addWeight(gaga);
1037 //@node GUM Message Processor, Miscellaneous Functions, Message-Processing Functions, High Level Communications Routines
1038 //@subsection GUM Message Processor
1041 * GUM Message Processor
1043 * processMessages processes any messages that have arrived, calling
1044 * appropriate routines depending on the message tag
1045 * (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
1046 * present and performs a blocking receive! During profiling it
1047 * busy-waits in order to record idle time.
1050 //@cindex processMessages
1052 processMessages(void)
/* Message loop: receive, dispatch on opcode, repeat while more
   messages are queued. (The switch's case labels for PP_FINISH,
   PP_RESUME, PP_SCHEDULE etc. are on elided lines in this view.) */
1059 packet = GetPacket(); /* Get next message; block until one available */
1060 getOpcodeAndSender(packet, &opcode, &task);
1064 IF_PAR_DEBUG(verbose,
1065 belch("== [%x] received FINISH", mytid));
1066 /* setting this global variables eventually terminates the main
1067 scheduling loop for this PE and causes a shut-down, sending
1068 PP_FINISH to SysMan */
1069 GlobalStopPending = rtsTrue;
1077 processResume(task);
1093 processSchedule(task);
1097 /* Anything we're not prepared to deal with. */
1098 barf("Task %x: Unexpected opcode %x from %x",
1099 mytid, opcode, task);
1102 } while (PacketsWaiting()); /* While there are messages: process them */
1103 } /* processMessages */
1105 //@node Miscellaneous Functions, Index, GUM Message Processor, High Level Communications Routines
1106 //@subsection Miscellaneous Functions
1109 * ChoosePE selects a GlobalTaskId from the array of PEs 'at random'.
1110 * Important properties:
1111 * - it varies during execution, even if the PE is idle
1112 * - it's different for each PE
1113 * - we never send a fish to ourselves
1115 extern long lrand48 (void);
/* Pick a random PE index; if it happens to be us, take the next one
   (mod nPEs) so a fish is never sent to its sender. */
1123 temp = lrand48() % nPEs;
1124 if (allPEs[temp] == mytid) { /* Never send a FISH to yourself */
1125 temp = (temp + 1) % nPEs;
1127 return allPEs[temp];
1131 * allocate a BLOCKED_FETCH closure and fill it with the relevant fields
1132 * of the ga argument; called from processFetch when the local closure is
1135 //@cindex createBlockedFetch
1137 createBlockedFetch (globalAddr ga, globalAddr rga)
1139 StgBlockedFetch *bf;
1140 StgClosure *closure;
1142 closure = GALAlookup(&ga);
/* If allocation fails, GC and retry once; re-look-up the closure since
   GC may have moved it. */
1143 if ((bf = (StgBlockedFetch *)allocate(FIXED_HS + sizeofW(StgBlockedFetch))) == NULL) {
1144 GarbageCollect(GetRoots);
1145 closure = GALAlookup(&ga);
1146 bf = (StgBlockedFetch *)allocate(FIXED_HS + sizeofW(StgBlockedFetch));
1147 // ToDo: check whether really guaranteed to succeed 2nd time around
1150 ASSERT(bf != (StgClosure *)NULL);
1151 SET_INFO((StgClosure *)bf, &BLOCKED_FETCH_info);
1152 // ToDo: check whether other header info is needed
/* The BlockedFetch records the REPLY address (rga), i.e. where the
   RESUME must eventually be sent. */
1154 bf->ga.payload.gc.gtid = rga.payload.gc.gtid;
1155 bf->ga.payload.gc.slot = rga.payload.gc.slot;
1156 bf->ga.weight = rga.weight;
1157 // bf->link = NULL; debugging
1160 fprintf(stderr, "%% [%x] created BF: closure=%p (%s), GA: ",
1161 mytid, closure, info_type(closure));
1163 fputc('\n',stderr));
1168 * waitForTermination enters a loop ignoring spurious messages while
1169 * waiting for the termination sequence to be completed.
1171 //@cindex waitForTermination
1173 waitForTermination(void)
/* Drain and discard messages until shut-down completes (loop control
   elided in this view). */
1176 rtsPacket p = GetPacket();
1177 processUnexpected(p);
1182 //@cindex DebugPrintGAGAMap
/* Debug helper: print each (old GA -> new GA) pair of a GAGA map. */
1184 DebugPrintGAGAMap(globalAddr *gagamap, int nGAs)
1188 for (i = 0; i < nGAs; ++i, gagamap += 2)
1189 fprintf(stderr, "gagamap[%d] = ((%x, %d, %x)) -> ((%x, %d, %x))\n", i,
1190 gagamap[0].payload.gc.gtid, gagamap[0].payload.gc.slot, gagamap[0].weight,
1191 gagamap[1].payload.gc.gtid, gagamap[1].payload.gc.slot, gagamap[1].weight);
1195 //@cindex freeMsgBuffer
/* Per-PE buffers accumulating (weight, slot) pairs for batched FREE
   messages, plus the current fill index of each buffer. */
1196 static StgWord **freeMsgBuffer = NULL;
1197 //@cindex freeMsgIndex
1198 static int *freeMsgIndex = NULL;
1200 //@cindex prepareFreeMsgBuffers
/* (Re)initialise the free-message buffers: allocate them lazily on
   first call, then reset every fill index to zero. */
1202 prepareFreeMsgBuffers(void)
1206 /* Allocate the freeMsg buffers just once and then hang onto them. */
1207 if (freeMsgIndex == NULL) {
1208 freeMsgIndex = (int *) stgMallocBytes(nPEs * sizeof(int),
1209 "prepareFreeMsgBuffers (Index)");
1210 freeMsgBuffer = (StgWord **) stgMallocBytes(nPEs * sizeof(long *),
1211 "prepareFreeMsgBuffers (Buffer)");
1213 for(i = 0; i < nPEs; i++)
1215 freeMsgBuffer[i] = (StgPtr) stgMallocWords(RtsFlags.ParFlags.packBufferSize,
1216 "prepareFreeMsgBuffers (Buffer #i)");
1219 /* Initialize the freeMsg buffer pointers to point to the start of their
1221 for (i = 0; i < nPEs; i++)
1222 freeMsgIndex[i] = 0;
1225 //@cindex freeRemoteGA
/* Queue a (weight, slot) pair for GA ga in the per-PE free-message
   buffer; if the buffer is full, flush it with an immediate sendFree.
   The GA must no longer be in our GA->LA table. */
1227 freeRemoteGA(int pe, globalAddr *ga)
1231 ASSERT(GALAlookup(ga) == NULL);
1233 if ((i = freeMsgIndex[pe]) + 2 >= RtsFlags.ParFlags.packBufferSize) {
1235 belch("Filled a free message buffer (sending remaining messages individually)"));
1237 sendFree(ga->payload.gc.gtid, i, freeMsgBuffer[pe]);
1240 freeMsgBuffer[pe][i++] = (StgWord) ga->weight;
1241 freeMsgBuffer[pe][i++] = (StgWord) ga->payload.gc.slot;
1242 freeMsgIndex[pe] = i;
/* Poison the freed GA so accidental reuse is easy to spot. */
1245 ga->weight = 0x0f0f0f0f;
1246 ga->payload.gc.gtid = 0x666;
1247 ga->payload.gc.slot = 0xdeaddead;
1251 //@cindex sendFreeMessages
/* Flush every non-empty per-PE free-message buffer as one FREE message. */
1253 sendFreeMessages(void)
1257 for (i = 0; i < nPEs; i++)
1258 if (freeMsgIndex[i] > 0)
1259 sendFree(allPEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
1262 #endif /* PAR -- whole file */
1264 //@node Index, , Miscellaneous Functions, High Level Communications Routines
1268 //* ACK:: @cindex\s-+ACK
1269 //* DebugPrintGAGAMap:: @cindex\s-+DebugPrintGAGAMap
1270 //* FETCH:: @cindex\s-+FETCH
1271 //* FISH:: @cindex\s-+FISH
1272 //* FREE:: @cindex\s-+FREE
1273 //* RESUME:: @cindex\s-+RESUME
1274 //* SCHEDULE:: @cindex\s-+SCHEDULE
1275 //* blockFetch:: @cindex\s-+blockFetch
1276 //* choosePE:: @cindex\s-+choosePE
1277 //* freeMsgBuffer:: @cindex\s-+freeMsgBuffer
1278 //* freeMsgIndex:: @cindex\s-+freeMsgIndex
1279 //* freeRemoteGA:: @cindex\s-+freeRemoteGA
1280 //* gumPackBuffer:: @cindex\s-+gumPackBuffer
1281 //* initMoreBuffers:: @cindex\s-+initMoreBuffers
1282 //* prepareFreeMsgBuffers:: @cindex\s-+prepareFreeMsgBuffers
1283 //* processAck:: @cindex\s-+processAck
1284 //* processFetch:: @cindex\s-+processFetch
1285 //* processFetches:: @cindex\s-+processFetches
1286 //* processFish:: @cindex\s-+processFish
1287 //* processFree:: @cindex\s-+processFree
1288 //* processMessages:: @cindex\s-+processMessages
1289 //* processResume:: @cindex\s-+processResume
1290 //* processSchedule:: @cindex\s-+processSchedule
1291 //* sendAck:: @cindex\s-+sendAck
1292 //* sendFetch:: @cindex\s-+sendFetch
1293 //* sendFish:: @cindex\s-+sendFish
1294 //* sendFree:: @cindex\s-+sendFree
1295 //* sendFreeMessages:: @cindex\s-+sendFreeMessages
1296 //* sendResume:: @cindex\s-+sendResume
1297 //* sendSchedule:: @cindex\s-+sendSchedule
1298 //* unpackAck:: @cindex\s-+unpackAck
1299 //* unpackFetch:: @cindex\s-+unpackFetch
1300 //* unpackFish:: @cindex\s-+unpackFish
1301 //* unpackFree:: @cindex\s-+unpackFree
1302 //* unpackResume:: @cindex\s-+unpackResume
1303 //* unpackSchedule:: @cindex\s-+unpackSchedule
1304 //* waitForTermination:: @cindex\s-+waitForTermination