1 /* ----------------------------------------------------------------------------
2 * Time-stamp: <Wed Mar 29 2000 19:35:36 Stardate: [-30]4578.87 hwloidl>
3 * $Id: HLComms.c,v 1.3 2000/03/31 03:09:37 hwloidl Exp $
5 * High Level Communications Routines (HLComms.lc)
7 * Contains the high-level routines (i.e. communication
8 * subsystem independent) used by GUM
10 * Phil Trinder, Glasgow University, 12 December 1994
12 * Phil Trinder, Simon Marlow July 1998
13 * H-W. Loidl, Heriot-Watt University, November 1999
15 * ------------------------------------------------------------------------- */
17 #ifdef PAR /* whole file */
19 //@node High Level Communications Routines, , ,
20 //@section High Level Communications Routines
25 //* GUM Message Sending and Unpacking Functions::
26 //* Message-Processing Functions::
27 //* GUM Message Processor::
28 //* Miscellaneous Functions::
32 //@node Macros etc, Includes, High Level Communications Routines, High Level Communications Routines
33 //@subsection Macros etc
36 # define NON_POSIX_SOURCE /* so says Solaris */
39 //@node Includes, GUM Message Sending and Unpacking Functions, Macros etc, High Level Communications Routines
40 //@subsection Includes
45 #include "Storage.h" // for recordMutable
48 #include "GranSimRts.h"
49 #include "ParallelRts.h"
50 #include "FetchMe.h" // for BLOCKED_FETCH_info etc
52 # include "ParallelDebug.h"
54 #include "StgMacros.h" // inlined IS_... fcts
56 //@node GUM Message Sending and Unpacking Functions, Message-Processing Functions, Includes, High Level Communications Routines
57 //@subsection GUM Message Sending and Unpacking Functions
60 * GUM Message Sending and Unpacking Functions
64 * Allocate space for message processing
/* Scratch pack buffer, sized by RtsFlags.ParFlags.packBufferSize and
   reused by the message handlers below (processFetches, processFish,
   processFree, processResume, processSchedule) to avoid repeated
   allocation per message. */
67 //@cindex gumPackBuffer
68 static rtsPackBuffer *gumPackBuffer;

/* NOTE(review): the initMoreBuffers function header and the failure
   branch are not visible in this extract; only the allocation check
   survives. The buffer is allocated once at start-up. */
70 //@cindex initMoreBuffers
74 if ((gumPackBuffer = (rtsPackBuffer *)stgMallocWords(RtsFlags.ParFlags.packBufferSize,
75 "initMoreBuffers")) == NULL)
81 * SendFetch packs the two global addresses and a load into a message +
86 Structure of a FETCH message:
89 +------------------------------------+------+
90 | gtid | slot | weight | gtid | slot | load |
91 +------------------------------------+------+
/* Send a FETCH for the remote closure identified by rga to its owning
   PE; lga identifies the local closure (typically an FMBQ) where the
   reply should be delivered. Both GAs must carry positive weight. */
96 sendFetch(globalAddr *rga, globalAddr *lga, int load)
98 ASSERT(rga->weight > 0 && lga->weight > 0);
100 belch("** [%x] Sending Fetch for ((%x, %d, 0)); locally ((%x, %d, %x)), load = %d",
102 rga->payload.gc.gtid, rga->payload.gc.slot,
103 lga->payload.gc.gtid, lga->payload.gc.slot, lga->weight,
/* Log a GR_FETCH event for profiling (only when stats are on —
   enclosing conditional not visible in this extract). */
108 DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(rga->payload.gc.gtid),
109 GR_FETCH, CurrentTSO, (StgClosure *)(lga->payload.gc.slot),
110 0, spark_queue_len(ADVISORY_POOL));
/* 6 data words, matching the FETCH layout pictured above. */
113 sendOpV(PP_FETCH, rga->payload.gc.gtid, 6,
114 (StgWord) rga->payload.gc.gtid, (StgWord) rga->payload.gc.slot,
115 (StgWord) lga->weight, (StgWord) lga->payload.gc.gtid,
116 (StgWord) lga->payload.gc.slot, (StgWord) load);
120 * unpackFetch unpacks a FETCH message into two Global addresses and a load
124 //@cindex unpackFetch
/* Inverse of sendFetch: buf[0..5] = lga.gtid, lga.slot, rga.weight,
   rga.gtid, rga.slot, load. The GetArgs call filling buf is not
   visible in this extract. */
126 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
133 belch("** [%x] Unpacking Fetch for ((%x, %d, 0)) to ((%x, %d, %x)), load = %d",
135 (GlobalTaskId) buf[0], (int) buf[1],
136 (GlobalTaskId) buf[3], (int) buf[4], buf[2], buf[5]));
139 lga->payload.gc.gtid = (GlobalTaskId) buf[0];
140 lga->payload.gc.slot = (int) buf[1];
142 rga->weight = (unsigned) buf[2];
143 rga->payload.gc.gtid = (GlobalTaskId) buf[3];
144 rga->payload.gc.slot = (int) buf[4];
146 *load = (int) buf[5];
/* A FETCH must carry weight for the reply address. */
148 ASSERT(rga->weight > 0);
152 * SendResume packs the remote blocking queue's GA and data into a message
157 Structure of a RESUME message:
159 -------------------------------
160 | weight | slot | n | data ...
161 -------------------------------
163 data is a packed graph represented as an rtsPackBuffer
164 n is the size of the graph (as returned by PackNearbyGraph) + packet hdr size
/* Reply to a FETCH: ship the packed graph back to the requesting PE,
   addressed to the remote blocking queue identified by rga. */
169 sendResume(globalAddr *rga, int nelem, rtsPackBuffer *data) // StgPtr data)
173 belch("[] [%x] Sending Resume for ((%x, %d, %x))",
175 rga->payload.gc.gtid, rga->payload.gc.slot, rga->weight));
/* NV = N data words (the pack buffer incl. header) + 2 scalar args. */
177 sendOpNV(PP_RESUME, rga->payload.gc.gtid,
178 nelem + PACK_BUFFER_HDR_SIZE, (StgPtr)data,
179 2, (rtsWeight) rga->weight, (StgWord) rga->payload.gc.slot);
183 * unpackResume unpacks a Resume message into two Global addresses and
187 //@cindex unpackResume
/* Inverse of sendResume: buf[0] = weight, buf[1] = slot, buf[2] = size
   of the following pack buffer (incl. header). lga is reconstructed
   with our own task id since the RESUME targets a local closure. */
189 unpackResume(globalAddr *lga, int *nelem, rtsPackBuffer *data)
196 belch("[] [%x] Unpacking Resume for ((%x, %d, %x))",
198 (int) buf[1], (unsigned) buf[0]));
201 RESUME event is written in awaken_blocked_queue
202 DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(lga->payload.gc.gtid),
203 GR_RESUME, END_TSO_QUEUE, (StgClosure *)NULL, 0, 0);
206 lga->weight = (unsigned) buf[0];
207 lga->payload.gc.gtid = mytid;
208 lga->payload.gc.slot = (int) buf[1];
210 *nelem = (int) buf[2]; // includes PACK_BUFFER_HDR_SIZE;
211 GetArgs(data, *nelem);
/* Report the graph size to the caller *without* the header. */
212 *nelem -= PACK_BUFFER_HDR_SIZE;
216 * SendAck packs the global address being acknowledged, together with
217 * an array of global addresses for any closures shipped and sends them.
221 Structure of an ACK message:
224 +---------------------------------------------+-------
225 | weight | gtid | slot | weight | gtid | slot | ..... ngas times
226 + --------------------------------------------+-------
/* Send an ACK carrying the (oldGA, newGA) pair array built during
   unpacking; each pair occupies 6 longs in the staging buffer.
   NOTE(review): the gagamap++ advances between the two triples are on
   lines not visible in this extract — gagamap walks forward one
   globalAddr per triple, 2 per iteration. */
232 sendAck(GlobalTaskId task, int ngas, globalAddr *gagamap)
/* Stage into the shared scratch buffer; not reentrant. */
238 buffer = (long *) gumPackBuffer;
240 for(i = 0, p = buffer; i < ngas; i++, p += 6) {
241 ASSERT(gagamap[1].weight > 0);
242 p[0] = (long) gagamap->weight;
243 p[1] = (long) gagamap->payload.gc.gtid;
244 p[2] = (long) gagamap->payload.gc.slot;
246 p[3] = (long) gagamap->weight;
247 p[4] = (long) gagamap->payload.gc.gtid;
248 p[5] = (long) gagamap->payload.gc.slot;
251 IF_PAR_DEBUG(schedule,
252 belch(",, [%x] Sending Ack (%d pairs) to PE %x\n",
/* p - buffer == 6 * ngas data words. */
255 sendOpN(PP_ACK, task, p - buffer, buffer);
259 * unpackAck unpacks an Acknowledgement message into a Global address,
260 * a count of the number of global addresses following and a map of
/* Inverse of sendAck: first word is the total data-word count, then
   6 words per GA pair. NOTE(review): the per-iteration GetArgs,
   gagamap++ advances and GAarraysize decrement are on lines not
   visible in this extract. */
266 unpackAck(int *ngas, globalAddr *gagamap)
271 GetArgs(&GAarraysize, 1);
273 *ngas = GAarraysize / 6;
275 IF_PAR_DEBUG(schedule,
276 belch(",, [%x] Unpacking Ack (%d pairs) on %x\n",
277 mytid, *ngas, mytid));
279 while (GAarraysize > 0) {
281 gagamap->weight = (rtsWeight) buf[0];
282 gagamap->payload.gc.gtid = (GlobalTaskId) buf[1];
283 gagamap->payload.gc.slot = (int) buf[2];
285 gagamap->weight = (rtsWeight) buf[3];
286 gagamap->payload.gc.gtid = (GlobalTaskId) buf[4];
287 gagamap->payload.gc.slot = (int) buf[5];
288 ASSERT(gagamap->weight > 0);
/* NOTE(review): this header comment looks copy-pasted from sendAck;
   a FISH actually carries a work-stealing request, not GAs. */
295 * SendFish packs the global address being acknowledged, together with
296 * an array of global addresses for any closures shipped and sends them.
300 Structure of a FISH message:
302 +----------------------------------+
303 | orig PE | age | history | hunger |
304 +----------------------------------+
/* Send a work-stealing FISH to destPE on behalf of origPE. history
   and hunger are carried but currently unused (see unpackFish). */
309 sendFish(GlobalTaskId destPE, GlobalTaskId origPE,
310 int age, int history, int hunger)
313 belch("$$ [%x] Sending Fish to %x (%d outstanding fishes)",
314 mytid, destPE, outstandingFishes));
316 sendOpV(PP_FISH, destPE, 4,
317 (StgWord) origPE, (StgWord) age, (StgWord) history, (StgWord) hunger);
/* If we originated this fish, count it as outstanding (increment on
   a line not visible in this extract). */
319 if (origPE == mytid) {
326 * unpackFish unpacks a FISH message into the global task id of the
327 * originating PE and 3 data fields: the age, history and hunger of the
328 * fish. The history + hunger are not currently used.
/* Inverse of sendFish: buf[0..3] = origPE, age, history, hunger.
   The GetArgs call and the *age assignment are on lines not visible
   in this extract. */
334 unpackFish(GlobalTaskId *origPE, int *age, int *history, int *hunger)
341 belch("$$ [%x] Unpacking Fish from PE %x (age=%d)",
342 mytid, (GlobalTaskId) buf[0], (int) buf[1]));
344 *origPE = (GlobalTaskId) buf[0];
346 *history = (int) buf[2];
347 *hunger = (int) buf[3];
351 * SendFree sends (weight, slot) pairs for GAs that we no longer need
356 Structure of a FREE message:
358 +-----------------------------
359 | n | weight_1 | slot_1 | ...
360 +-----------------------------
/* Return reference weight for dropped GAs to their owner pe.
   nelem counts words, i.e. 2 per GA (weight + slot). */
364 sendFree(GlobalTaskId pe, int nelem, StgPtr data)
367 belch("!! [%x] Sending Free (%d GAs) to %x",
368 mytid, nelem/2, pe));
370 sendOpN(PP_FREE, pe, nelem, data);
374 * unpackFree unpacks a FREE message into the amount of data shipped and
/* Inverse of sendFree: first word is the element count, then the
   (weight, slot) pairs themselves. */
379 unpackFree(int *nelem, rtsPackBuffer *data)
384 *nelem = (int) buf[0];
387 belch("!! [%x] Unpacking Free (%d GAs)",
390 GetArgs(data, *nelem);
394 * SendSchedule sends a closure to be evaluated in response to a Fish
395 * message. The message is directed to the PE that originated the Fish
396 * (origPE), and includes the packed closure (data) along with its size
401 Structure of a SCHEDULE message:
403 +------------------------------------
404 | PE | n | pack buffer of a graph ...
405 +------------------------------------
407 //@cindex sendSchedule
/* Ship a sparkable packed graph to the fishing PE; nelem is the graph
   size excluding the pack-buffer header, which is added here. */
409 sendSchedule(GlobalTaskId origPE, int nelem, rtsPackBuffer *data) // StgPtr data)
411 IF_PAR_DEBUG(schedule,
413 belch("-- [%x] Sending Schedule (%d elems) to %x\n",
414 mytid, nelem, origPE));
416 sendOpN(PP_SCHEDULE, origPE, nelem + PACK_BUFFER_HDR_SIZE, (StgPtr)data);
420 * unpackSchedule unpacks a SCHEDULE message into the Global address of
421 * the closure shipped, the amount of data shipped (nelem) and the data
425 //@cindex unpackSchedule
/* Inverse of sendSchedule: buf[0] holds the total size incl. header;
   the header is stripped for the caller but read back into data. */
427 unpackSchedule(int *nelem, rtsPackBuffer *data)
432 /* no. of elems, not counting the header of the pack buffer */
433 *nelem = (int) buf[0] - PACK_BUFFER_HDR_SIZE;
435 IF_PAR_DEBUG(schedule,
436 belch("-- [%x] Unpacking Schedule (%d elems) on %x\n",
439 /* automatic cast of flat pvm-data to rtsPackBuffer */
440 GetArgs(data, *nelem + PACK_BUFFER_HDR_SIZE);
443 //@node Message-Processing Functions, GUM Message Processor, GUM Message Sending and Unpacking Functions, High Level Communications Routines
444 //@subsection Message-Processing Functions
447 * Message-Processing Functions
449 * The following routines process incoming GUM messages. Often reissuing
450 * messages in response.
452 * processFish unpacks a fish message, reissuing it if it's our own,
453 * sending work if we have it or sending it onwards otherwise.
457 * processFetches constructs and sends resume messages for every
458 * BlockedFetch which is ready to be awakened.
459 * awaken_blocked_queue (in Schedule.c) is responsible for moving
460 * BlockedFetches from a blocking queue to the PendingFetches queue.
463 extern StgBlockedFetch *PendingFetches;
/* Debug helper: length of the PendingFetches list (the return of n
   is on a line not visible in this extract). */
466 pending_fetches_len(void)
471 for (n=0, bf=PendingFetches; bf != END_BF_QUEUE; n++, bf = (StgBlockedFetch *)(bf->link)) {
472 ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
/* Walk the PendingFetches queue and answer each BLOCKED_FETCH:
   - target is itself a FETCH_ME      -> forward the fetch to its owner;
   - target is (still) a black hole   -> re-block the fetch on it;
   - otherwise (local graph)          -> pack it and send a RESUME.
   On pack-buffer overflow the current BF is pushed back and GC is
   requested (currently barfs — see ToDo). */
477 //@cindex processFetches
479 processFetches(void) {
480 StgBlockedFetch *bf, *next;
484 static rtsPackBuffer *packBuffer;
486 IF_PAR_DEBUG(verbose,
487 belch("____ processFetches: %d pending fetches (root @ %p)",
488 pending_fetches_len(), PendingFetches));
490 for (bf = PendingFetches;
493 /* the PendingFetches list contains only BLOCKED_FETCH closures */
494 ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
495 /* store link (we might overwrite it via blockFetch later on */
496 next = (StgBlockedFetch *)(bf->link);
499 * Find the target at the end of the indirection chain, and
500 * process it in much the same fashion as the original target
501 * of the fetch. Though we hope to find graph here, we could
502 * find a black hole (of any flavor) or even a FetchMe.
506 We evacuate BQs and update the node fields where necessary in GC.c
507 So, if we find an EVACUATED closure, something has gone Very Wrong
508 (and therefore we let the RTS crash most ungracefully).
510 ASSERT(get_itbl(closure)->type != EVACUATED);
511 // closure = ((StgEvacuated *)closure)->evacuee;
513 closure = UNWIND_IND(closure);
514 //while ((ind = IS_INDIRECTION(closure)) != NULL) { closure = ind; }
516 ip = get_itbl(closure);
517 if (ip->type == FETCH_ME) {
518 /* Forward the Fetch to someone else */
519 rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
520 rga.payload.gc.slot = bf->ga.payload.gc.slot;
521 rga.weight = bf->ga.weight;
523 sendFetch(((StgFetchMe *)closure)->ga, &rga, 0 /* load */);
526 belch("__-> processFetches: Forwarding fetch from %lx to %lx",
527 mytid, rga.payload.gc.gtid));
529 } else if (IS_BLACK_HOLE(closure)) {
530 IF_PAR_DEBUG(verbose,
531 belch("__++ processFetches: trying to send a BLACK_HOLE => doing a blockFetch on closure %p (%s)",
532 closure, info_type(closure)));
534 blockFetch(bf, closure);
536 /* We now have some local graph to send back */
539 packBuffer = gumPackBuffer;
540 IF_PAR_DEBUG(verbose,
541 belch("__*> processFetches: PackNearbyGraph of closure %p (%s)",
542 closure, info_type(closure)));
544 if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size)) == NULL) {
545 // Put current BF back on list
546 bf->link = (StgBlockingQueueElement *)PendingFetches;
547 PendingFetches = (StgBlockedFetch *)bf;
548 // ToDo: check that nothing more has to be done to prepare for GC!
/* NOTE(review): barf aborts, so the GC/retry path below is dead
   until the ToDo is resolved. */
549 barf("processFetches: out of heap while packing graph; ToDo: call GC here");
550 GarbageCollect(GetRoots);
552 PendingFetches = (StgBlockedFetch *)(bf->link);
554 packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size);
555 ASSERT(packBuffer != (rtsPackBuffer *)NULL);
557 rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
558 rga.payload.gc.slot = bf->ga.payload.gc.slot;
559 rga.weight = bf->ga.weight;
561 sendResume(&rga, size, packBuffer);
/* All fetches answered (or re-blocked elsewhere): reset the queue. */
564 PendingFetches = END_BF_QUEUE;
569 Alternatively to sending fetch messages directly from the FETCH_ME_entry
570 code we could just store the data about the remote data in a global
571 variable and send the fetch request from the main scheduling loop (similar
572 to processFetches above). This would save an expensive STGCALL in the entry
573 code because we have to go back to the scheduler anyway.
/* NOTE(review): index entry duplicates "processFetches"; should
   presumably read processTheRealFetches. */
575 //@cindex processFetches
/* Experimental variant: issue the FETCH recorded in the globals
   theGlobalFromGA/theGlobalToGA by the FETCH_ME entry code. */
577 processTheRealFetches(void) {
579 StgClosure *closure, *next;
581 IF_PAR_DEBUG(verbose,
582 belch("__ processTheRealFetches: ");
583 printGA(&theGlobalFromGA);
584 printGA(&theGlobalToGA));
586 ASSERT(theGlobalFromGA.payload.gc.gtid != 0 &&
587 theGlobalToGA.payload.gc.gtid != 0);
589 /* the old version did this in the FETCH_ME entry code */
590 sendFetch(&theGlobalFromGA, &theGlobalToGA, 0/*load*/);
597 * processFish unpacks a fish message, reissuing it if it's our own,
598 * sending work if we have it or sending it onwards otherwise.
600 //@cindex processFish
/* NOTE(review): function signature line is not visible in this
   extract; locals below belong to processFish(void). */
605 int age, history, hunger;
607 static rtsPackBuffer *packBuffer;
609 unpackFish(&origPE, &age, &history, &hunger);
/* Our own fish came home: no work anywhere right now. */
611 if (origPE == mytid) {
612 //fishing = rtsFalse; // fish has come home
614 last_fish_arrived_at = CURRENT_TIME; // remember time (see schedule fct)
615 return; // that's all
618 ASSERT(origPE != mytid);
620 belch("$$__ processing fish; %d sparks available",
621 spark_queue_len(&(MainRegTable.rSparks))));
/* Try to satisfy the fish with one of our sparks. */
622 while ((spark = findSpark()) != NULL) {
624 // StgClosure *graph;
626 packBuffer = gumPackBuffer;
627 ASSERT(closure_SHOULD_SPARK((StgClosure *)spark));
628 if ((packBuffer = PackNearbyGraph(spark, END_TSO_QUEUE, &size)) == NULL) {
630 belch("$$ GC while trying to satisfy FISH via PackNearbyGraph of node %p",
631 (StgClosure *)spark));
632 barf("processFish: out of heap while packing graph; ToDo: call GC here");
633 GarbageCollect(GetRoots);
634 /* Now go back and try again */
637 belch("$$-- Replying to FISH from %x by sending graph @ %p (%s)",
639 (StgClosure *)spark, info_type((StgClosure *)spark)));
640 sendSchedule(origPE, size, packBuffer);
645 if (spark == (rtsSpark)NULL) {
647 belch("$$^^ No sparks available for FISH from %x",
649 /* We have no sparks to give */
650 if (age < FISH_LIFE_EXPECTANCY)
651 /* and the fish is still young, send it to another PE to look for work */
652 sendFish(choosePE(), origPE,
653 (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
655 /* otherwise, send it home to die */
657 sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
662 * processFetch either returns the requested data (if available)
663 * or blocks the remote blocking queue on a black hole (if not).
666 //@cindex processFetch
/* Handle an incoming FETCH: look the requested GA up in the GALA
   table and either forward (FETCH_ME), common-up (our own fetch came
   back), block (black hole / RBH / FMBQ), or pack & RESUME. */
675 unpackFetch(&ga, &rga, &load);
677 belch("%%%%__ Rcvd Fetch for ((%x, %d, 0)), Resume ((%x, %d, %x)) (load %d) from %x",
678 ga.payload.gc.gtid, ga.payload.gc.slot,
679 rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight, load,
680 rga.payload.gc.gtid));
682 closure = GALAlookup(&ga);
683 ASSERT(closure != (StgClosure *)NULL);
684 ip = get_itbl(closure);
685 if (ip->type == FETCH_ME) {
686 /* Forward the Fetch to someone else */
687 sendFetch(((StgFetchMe *)closure)->ga, &rga, load);
688 } else if (rga.payload.gc.gtid == mytid) {
689 /* Our own FETCH forwarded back around to us */
690 StgFetchMeBlockingQueue *fmbq = (StgFetchMeBlockingQueue *)GALAlookup(&rga);
693 belch("%%%%== Fetch returned to sending PE; closure=%p (%s); receiver=%p (%s)",
694 closure, info_type(closure), fmbq, info_type(fmbq)));
695 /* We may have already discovered that the fetch target is our own. */
696 if ((StgClosure *)fmbq != closure)
697 CommonUp((StgClosure *)fmbq, closure);
/* Return the weight carried by the round-tripped reply address. */
698 (void) addWeight(&rga);
699 } else if (IS_BLACK_HOLE(closure)) {
700 /* This includes RBH's and FMBQ's */
703 ASSERT(GALAlookup(&rga) == NULL);
705 /* If we're hitting a BH or RBH or FMBQ we have to put a BLOCKED_FETCH
706 closure into the BQ in order to denote that when updating this node
707 the result should be sent to the originator of this fetch message. */
708 bf = (StgBlockedFetch *)createBlockedFetch(ga, rga);
709 blockFetch(bf, closure);
712 belch("%%++ Blocking Fetch ((%x, %d, %x)) on %p (%s)",
713 rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight,
714 closure, info_type(closure)));
716 /* The target of the FetchMe is some local graph */
718 // StgClosure *graph;
719 rtsPackBuffer *buffer = (rtsPackBuffer *)NULL;
721 if ((buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size)) == NULL) {
/* NOTE(review): as in processFetches, barf aborts before the
   GC retry below can run. */
722 barf("processFetch: out of heap while packing graph; ToDo: call GC here");
723 GarbageCollect(GetRoots);
724 closure = GALAlookup(&ga);
725 buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size);
726 ASSERT(buffer != (rtsPackBuffer *)NULL);
728 sendResume(&rga, size, buffer);
733 * processFree unpacks a FREE message and adds the weights to our GAs.
735 //@cindex processFree
/* Handle an incoming FREE: each (weight, slot) pair refers to one of
   our own GAs, so reclaim the returned reference weight. */
740 static StgWord *buffer;
744 buffer = (StgWord *)gumPackBuffer;
745 unpackFree(&nelem, buffer);
747 belch("!!__ Rcvd Free (%d GAs)", nelem / 2));
/* All freed GAs are local to this PE by construction. */
749 ga.payload.gc.gtid = mytid;
750 for (i = 0; i < nelem;) {
751 ga.weight = (rtsWeight) buffer[i++];
752 ga.payload.gc.slot = (int) buffer[i++];
754 fprintf(stderr, "!!-- Processing free ");
758 (void) addWeight(&ga);
763 * processResume unpacks a RESUME message into the graph, filling in
764 * the LA -> GA, and GA -> LA tables. Threads blocked on the original
765 * FetchMe (now a blocking queue) are awakened, and the blocking queue
766 * is converted into an indirection. Finally it sends an ACK in response
767 * which contains any newly allocated GAs.
770 //@cindex processResume
772 processResume(GlobalTaskId sender)
776 static rtsPackBuffer *packBuffer;
777 StgClosure *newGraph, *old;
781 packBuffer = gumPackBuffer;
782 unpackResume(&lga, &nelem, (StgPtr)packBuffer);
785 fprintf(stderr, "[]__ Rcvd Resume for ");
788 PrintPacket((rtsPackBuffer *)packBuffer));
791 * We always unpack the incoming graph, even if we've received the
792 * requested node in some other data packet (and already awakened
793 * the blocking queue).
/* Ensure enough heap for the incoming graph before unpacking;
   packBuffer[0] is the graph size in words. */
794 if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
795 ReallyPerformThreadGC(packBuffer[0], rtsFalse);
796 SAVE_Hp -= packBuffer[0];
800 // ToDo: Check for GC here !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
802 /* Do this *after* GC; we don't want to release the object early! */
805 (void) addWeight(&lga);
807 old = GALAlookup(&lga);
809 if (RtsFlags.ParFlags.ParStats.Full) {
810 // StgTSO *tso = END_TSO_QUEUE;
811 StgBlockingQueueElement *bqe;
813 /* Write REPLY events to the log file, indicating that the remote
815 if (get_itbl(old)->type == FETCH_ME_BQ ||
816 get_itbl(old)->type == RBH)
817 for (bqe = ((StgFetchMeBlockingQueue *)old)->blocking_queue;
818 bqe->link != END_BQ_QUEUE;
/* Only TSOs on the queue produce GR_REPLY events; other BQ
   elements (e.g. blocked fetches) are skipped. */
820 if (get_itbl((StgClosure *)bqe)->type == TSO)
821 DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(sender),
822 GR_REPLY, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
823 0, spark_queue_len(&(MainRegTable.rSparks)));
826 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
827 ASSERT(newGraph != NULL);
830 * Sometimes, unpacking will common up the resumee with the
831 * incoming graph, but if it hasn't, we'd better do so now.
834 if (get_itbl(old)->type == FETCH_ME_BQ)
835 CommonUp(old, newGraph);
838 DebugPrintGAGAMap(gagamap, nGAs));
/* Acknowledge the new GAs created while unpacking. */
840 sendAck(sender, nGAs, gagamap);
844 * processSchedule unpacks a SCHEDULE message into the graph, filling
845 * in the LA -> GA, and GA -> LA tables. The root of the graph is added to
846 * the local spark queue. Finally it sends an ACK in response
847 * which contains any newly allocated GAs.
849 //@cindex processSchedule
851 processSchedule(GlobalTaskId sender)
853 nat nelem, space_required, nGAs;
855 static rtsPackBuffer *packBuffer;
856 StgClosure *newGraph;
859 packBuffer = gumPackBuffer; /* HWL */
860 unpackSchedule(&nelem, packBuffer);
863 belch("--__ Rcvd Schedule (%d elems)", nelem);
864 PrintPacket(packBuffer));
867 * For now, the graph is a closure to be sparked as an advisory
868 * spark, but in future it may be a complete spark with
869 * required/advisory status, priority etc.
/* Guarantee heap for the incoming graph before unpacking it. */
873 space_required = packBuffer[0];
874 if (SAVE_Hp + space_required >= SAVE_HpLim) {
875 ReallyPerformThreadGC(space_required, rtsFalse);
876 SAVE_Hp -= space_required;
879 // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!
880 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
881 ASSERT(newGraph != NULL);
/* Spark the received graph; success is only used for debug output. */
882 success = add_to_spark_queue(newGraph, &(MainRegTable.rSparks));
886 belch("--^^ added spark to unpacked graph %p; %d sparks available on [%x]",
887 newGraph, spark_queue_len(&(MainRegTable.rSparks)), mytid);
889 belch("--^^ received non-sparkable closure %p; nothing added to spark pool; %d sparks available on [%x]",
890 newGraph, spark_queue_len(&(MainRegTable.rSparks)), mytid);
891 belch("*< Unpacked graph with root at %p (%s):",
892 newGraph, info_type(newGraph));
893 PrintGraph(newGraph, 0));
896 DebugPrintGAGAMap(gagamap, nGAs));
899 sendAck(sender, nGAs, gagamap);
/* Work arrived: one of our outstanding fishes has been satisfied. */
901 //fishing = rtsFalse;
902 ASSERT(outstandingFishes>0);
907 * processAck unpacks an ACK, and uses the GAGA map to convert RBH's
908 * (which represent shared thunks that have been shipped) into fetch-mes
/* Handle an incoming ACK (function header lines not visible in this
   extract). gagamap holds pairs (oldGA, newGA). */
917 globalAddr gagamap[256]; // ToDo: elim magic constant!! MAX_GAS * 2];??
919 unpackAck(&nGAs, gagamap);
922 belch(",,,, Rcvd Ack (%d pairs)", nGAs);
923 DebugPrintGAGAMap(gagamap, nGAs);
926 checkGAGAMap(gagamap, nGAs));
929 * For each (oldGA, newGA) pair, set the GA of the corresponding
930 * thunk to the newGA, convert the thunk to a FetchMe, and return
931 * the weight from the oldGA.
933 for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
934 StgClosure *old_closure = GALAlookup(gaga);
935 StgClosure *new_closure = GALAlookup(gaga + 1);
937 ASSERT(old_closure != NULL);
938 if (new_closure == NULL) {
939 /* We don't have this closure, so we make a fetchme for it */
940 globalAddr *ga = setRemoteGA(old_closure, gaga + 1, rtsTrue);
942 /* convertToFetchMe should be done unconditionally here.
943 Currently, we assign GAs to CONSTRs, too, (a bit of a hack),
944 so we have to check whether it is an RBH before converting
/* NOTE(review): this ASSERT compares an info-table pointer to an
   enum value; presumably get_itbl(old_closure)->type==RBH was
   intended (cf. the if on the next line). */
946 ASSERT(get_itbl(old_closure)==RBH);
948 if (get_itbl(old_closure)->type==RBH)
949 convertToFetchMe(old_closure, ga);
952 * Oops...we've got this one already; update the RBH to
953 * point to the object we already know about, whatever it
956 CommonUp(old_closure, new_closure);
959 * Increase the weight of the object by the amount just
960 * received in the second part of the ACK pair.
962 (void) addWeight(gaga + 1);
/* In both branches, reclaim the weight carried by the old GA. */
964 (void) addWeight(gaga);
967 /* check the sanity of the LAGA and GALA tables after mincing them */
968 IF_DEBUG(sanity, checkLAGAtable(rtsFalse));
971 //@node GUM Message Processor, Miscellaneous Functions, Message-Processing Functions, High Level Communications Routines
972 //@subsection GUM Message Processor
975 * GUM Message Processor
977 * processMessages processes any messages that have arrived, calling
978 * appropriate routines depending on the message tag
979 * (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
980 * present and performs a blocking receive! During profiling it
981 * busy-waits in order to record idle time.
984 //@cindex processMessages
/* Main message dispatch loop: blocking-receive one packet, switch on
   its opcode, repeat while further packets are queued. Most case
   labels are on lines not visible in this extract. */
986 processMessages(void)
993 packet = GetPacket(); /* Get next message; block until one available */
994 getOpcodeAndSender(packet, &opcode, &task);
998 IF_PAR_DEBUG(verbose,
999 belch("==== received FINISH [%p]", mytid));
1000 /* setting this global variables eventually terminates the main
1001 scheduling loop for this PE and causes a shut-down, sending
1002 PP_FINISH to SysMan */
1003 GlobalStopPending = rtsTrue;
1011 processResume(task);
1027 processSchedule(task);
1031 /* Anything we're not prepared to deal with. */
1032 barf("Task %x: Unexpected opcode %x from %x",
1033 mytid, opcode, task);
1036 } while (PacketsWaiting()); /* While there are messages: process them */
1037 } /* processMessages */
1039 //@node Miscellaneous Functions, Index, GUM Message Processor, High Level Communications Routines
1040 //@subsection Miscellaneous Functions
1043 * blockFetch blocks a BlockedFetch node on some kind of black hole.
1045 //@cindex blockFetch
/* Enqueue bf on bh's blocking queue. A plain BLACKHOLE/CAF_BLACKHOLE
   is first converted into a BLACKHOLE_BQ; BLACKHOLE_BQ, FMBQ and RBH
   already have queues and are simply prepended to. In every case bh
   becomes mutable (its queue field is updated) and must be recorded.
   Case labels themselves are on lines not visible in this extract. */
1047 blockFetch(StgBlockedFetch *bf, StgClosure *bh) {
1049 switch (get_itbl(bh)->type) {
1051 bf->link = END_BQ_QUEUE;
1052 //((StgBlockingQueue *)bh)->header.info = &BLACKHOLE_BQ_info;
1053 SET_INFO(bh, &BLACKHOLE_BQ_info); // turn closure into a blocking queue
1054 ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1056 // put bh on the mutables list
1057 recordMutable((StgMutClosure *)bh);
1061 /* enqueue bf on blocking queue of closure bh */
1062 bf->link = ((StgBlockingQueue *)bh)->blocking_queue;
1063 ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1065 // put bh on the mutables list; ToDo: check
1066 recordMutable((StgMutClosure *)bh);
1070 /* enqueue bf on blocking queue of closure bh */
1071 bf->link = ((StgFetchMeBlockingQueue *)bh)->blocking_queue;
1072 ((StgFetchMeBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1074 // put bh on the mutables list; ToDo: check
1075 recordMutable((StgMutClosure *)bh);
1079 /* enqueue bf on blocking queue of closure bh */
1080 bf->link = ((StgRBH *)bh)->blocking_queue;
1081 ((StgRBH *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1083 // put bh on the mutables list; ToDo: check
1084 recordMutable((StgMutClosure *)bh);
1088 barf("blockFetch: thought %p was a black hole (IP %#lx, %s)",
1089 (StgClosure *)bh, get_itbl((StgClosure *)bh),
1090 info_type((StgClosure *)bh));
1092 IF_PAR_DEBUG(schedule,
1093 belch("##++ blockFetch: after block the BQ of %p (%s) is:",
1100 blockThread is called from the main scheduler whenever tso returns with
1101 a ThreadBlocked return code; tso has already been added to a blocking
1102 queue (that's done in the entry code of the closure, because it is a
1103 cheap operation we have to do in any case); the main purpose of this
1104 routine is to send a Fetch message in case we are blocking on a FETCHME(_BQ)
1105 closure, which is indicated by the tso.why_blocked field;
1106 we also write an entry into the log file if we are generating one
1108 Should update exectime etc in the entry code already; but we don't have
1109 something like ``system time'' in the log file anyway, so this should
1110 even out the inaccuracies.
1113 //@cindex blockThread
1115 blockThread(StgTSO *tso)
1117 globalAddr *remote_ga;
1118 globalAddr *local_ga;
1121 // ASSERT(we are on some blocking queue)
1122 ASSERT(tso->block_info.closure != (StgClosure *)NULL);
1125 We have to check why this thread has been blocked.
1127 switch (tso->why_blocked) {
/* Case BlockedOnGA (label on a line not visible in this extract):
   tso turned a FETCH_ME into an FMBQ, so *we* must send the FETCH. */
1129 /* the closure must be a FETCH_ME_BQ; tso came in here via
1130 FETCH_ME entry code */
1131 ASSERT(get_itbl(tso->block_info.closure)->type==FETCH_ME_BQ);
1133 /* HACK: the link field is used to hold the GA between FETCH_ME_entry
1134 and this point; if something (eg. GC) happens in between the whole
1136 The problem is that the ga field of the FETCH_ME has been overwritten
1137 with the head of the blocking (which is tso).
1139 //ASSERT(looks_like_ga((globalAddr *)tso->link));
1140 ASSERT(tso->link!=END_TSO_QUEUE && tso->link!=NULL);
1141 remote_ga = (globalAddr *)tso->link; // ((StgFetchMe *)tso->block_info.closure)->ga;
1142 tso->link = END_BQ_QUEUE;
1143 /* it was tso which turned node from FETCH_ME into FETCH_ME_BQ =>
1144 we have to send a Fetch message here! */
1145 if (RtsFlags.ParFlags.ParStats.Full) {
1146 /* Note that CURRENT_TIME may perform an unsafe call */
1147 //rtsTime now = CURRENT_TIME; /* Now */
1148 tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
1149 tso->par.fetchcount++;
1150 tso->par.blockedat = CURRENT_TIME;
1151 /* we are about to send off a FETCH message, so dump a FETCH event */
1152 DumpRawGranEvent(CURRENT_PROC,
1153 taskIDtoPE(remote_ga->payload.gc.gtid),
1154 GR_FETCH, tso, tso->block_info.closure, 0);
1156 /* Phil T. claims that this was a workaround for a hard-to-find
1157 * bug, hence I'm leaving it out for now --SDM
1159 /* Assign a brand-new global address to the newly created FMBQ */
1160 local_ga = makeGlobal(tso->block_info.closure, rtsFalse);
1161 splitWeight(&fmbq_ga, local_ga);
1162 ASSERT(fmbq_ga.weight == 1L << (BITS_IN(unsigned) - 1));
1164 sendFetch(remote_ga, &fmbq_ga, 0/*load*/);
1168 case BlockedOnGA_NoSend:
1169 /* the closure must be a FETCH_ME_BQ; tso came in here via
1170 FETCH_ME_BQ entry code */
1171 ASSERT(get_itbl(tso->block_info.closure)->type==FETCH_ME_BQ);
1173 /* Fetch message has been sent already */
1174 if (RtsFlags.ParFlags.ParStats.Full) {
1175 /* Note that CURRENT_TIME may perform an unsafe call */
1176 //rtsTime now = CURRENT_TIME; /* Now */
1177 tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
1178 tso->par.blockcount++;
1179 tso->par.blockedat = CURRENT_TIME;
1180 /* dump a block event, because fetch has been sent already */
1181 DumpRawGranEvent(CURRENT_PROC, thisPE,
1182 GR_BLOCK, tso, tso->block_info.closure, 0);
1186 case BlockedOnBlackHole:
1187 /* the closure must be a BLACKHOLE_BQ or an RBH; tso came in here via
1188 BLACKHOLE(_BQ) or CAF_BLACKHOLE or RBH entry code */
1189 ASSERT(get_itbl(tso->block_info.closure)->type==BLACKHOLE_BQ ||
1190 get_itbl(tso->block_info.closure)->type==RBH);
1192 /* if collecting stats update the execution time etc */
1193 if (RtsFlags.ParFlags.ParStats.Full) {
1194 /* Note that CURRENT_TIME may perform an unsafe call */
1195 //rtsTime now = CURRENT_TIME; /* Now */
1196 tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
1197 tso->par.blockcount++;
1198 tso->par.blockedat = CURRENT_TIME;
1199 DumpRawGranEvent(CURRENT_PROC, thisPE,
1200 GR_BLOCK, tso, tso->block_info.closure, 0);
1205 barf("blockThread: impossible why_blocked code %d for TSO %d",
1206 tso->why_blocked, tso->id);
1209 IF_PAR_DEBUG(schedule,
1210 belch("##++ blockThread: TSO %d blocked on closure %p (%s)",
1211 tso->id, tso->block_info.closure, info_type(tso->block_info.closure)));
1215 * ChoosePE selects a GlobalTaskId from the array of PEs 'at random'.
1216 * Important properties:
1217 * - it varies during execution, even if the PE is idle
1218 * - it's different for each PE
1219 * - we never send a fish to ourselves
1221 extern long lrand48 (void);
/* Function header lines are not visible in this extract; the body
   below is choosePE's: pick a random index and skip over ourselves. */
1229 temp = lrand48() % nPEs;
1230 if (allPEs[temp] == mytid) { /* Never send a FISH to yourself */
1231 temp = (temp + 1) % nPEs;
1233 return allPEs[temp];
1237 * allocate a BLOCKED_FETCH closure and fill it with the relevant fields
1238 * of the ga argument; called from processFetch when the local closure is
1241 //@cindex createBlockedFetch
/* Heap-allocate a BLOCKED_FETCH recording the reply address rga for
   the fetch of ga; returned to processFetch for enqueueing via
   blockFetch. (Typo "BlocekdFetch" in the barf string below is left
   as-is: it is a runtime message.) */
1243 createBlockedFetch (globalAddr ga, globalAddr rga)
1245 StgBlockedFetch *bf;
1246 StgClosure *closure;
1248 closure = GALAlookup(&ga);
1249 if ((bf = (StgBlockedFetch *)allocate(FIXED_HS + sizeofW(StgBlockedFetch))) == NULL) {
/* NOTE(review): barf aborts, so the GC retry below is dead code
   until the ToDo is resolved. */
1250 barf("createBlockedFetch: out of heap while allocating heap for a BlocekdFetch; ToDo: call GC here");
1251 GarbageCollect(GetRoots);
1252 closure = GALAlookup(&ga);
1253 bf = (StgBlockedFetch *)allocate(FIXED_HS + sizeofW(StgBlockedFetch));
1254 // ToDo: check whether really guaranteed to succeed 2nd time around
1257 ASSERT(bf != (StgClosure *)NULL);
1258 SET_INFO((StgClosure *)bf, &BLOCKED_FETCH_info);
1259 // ToDo: check whether other header info is needed
/* The BF's ga field stores the *reply* address (rga), i.e. where to
   send the RESUME once the node is evaluated. */
1261 bf->ga.payload.gc.gtid = rga.payload.gc.gtid;
1262 bf->ga.payload.gc.slot = rga.payload.gc.slot;
1263 bf->ga.weight = rga.weight;
1264 // bf->link = NULL; debugging
1266 IF_PAR_DEBUG(schedule,
1267 fprintf(stderr, "%%%%// created BF: bf=%p (%s) of closure , GA: ",
1268 bf, info_type(bf), closure);
1270 fputc('\n',stderr));
1275 * waitForTermination enters a loop ignoring spurious messages while
1276 * waiting for the termination sequence to be completed.
1278 //@cindex waitForTermination
// Drain and discard packets until the termination handshake finishes.
// NOTE(review): the enclosing loop construct and its exit condition are
// elided from this view -- presumably a do/while on a global finish flag;
// confirm against the full source.
1280 waitForTermination(void)
1283 rtsPacket p = GetPacket();
// processUnexpected handles (and logs/ignores) any message that arrives
// after we have decided to shut down.
1284 processUnexpected(p);
1289 //@cindex DebugPrintGAGAMap
// Debug helper: dump a GA-GA map (nGAs pairs of global addresses, laid out
// as old-GA followed by new-GA) to stderr, one "old -> new" line per pair.
1291 DebugPrintGAGAMap(globalAddr *gagamap, int nGAs)
// gagamap advances by 2 per iteration: element [0] is the old GA, [1] the
// new GA of each pair.
1295 for (i = 0; i < nGAs; ++i, gagamap += 2)
// NOTE(review): %x/%d assume gtid, slot and weight are int-sized; confirm
// against the globalAddr declaration (a mismatch here is printf UB).
1296 fprintf(stderr, "__ gagamap[%d] = ((%x, %d, %x)) -> ((%x, %d, %x))\n", i,
1297 gagamap[0].payload.gc.gtid, gagamap[0].payload.gc.slot, gagamap[0].weight,
1298 gagamap[1].payload.gc.gtid, gagamap[1].payload.gc.slot, gagamap[1].weight);
1301 //@cindex checkGAGAMap
// Debug-only sanity check: assert that every entry of a GA-GA map (nGAs
// pairs, stored consecutively) looks like a well-formed global address.
1303 checkGAGAMap(globalAddr *gagamap, int nGAs)
1307 for (i = 0; i < nGAs; ++i, gagamap += 2) {
// Check both halves of the pair: the old GA and the new GA.
1308 ASSERT(looks_like_ga(gagamap));
1309 ASSERT(looks_like_ga(gagamap+1));
// Per-PE staging area for batched FREE messages: freeMsgBuffer[pe] holds
// queued (weight, slot) word pairs destined for that PE, and
// freeMsgIndex[pe] is the number of words currently queued there.
// Both arrays are allocated lazily by prepareFreeMsgBuffers.
1314 //@cindex freeMsgBuffer
1315 static StgWord **freeMsgBuffer = NULL;
1316 //@cindex freeMsgIndex
1317 static nat *freeMsgIndex = NULL;
1319 //@cindex prepareFreeMsgBuffers
// Lazily allocate the per-PE FREE-message staging buffers (first call only),
// then reset every PE's queue index to empty. Called before batching GA
// frees for transmission.
1321 prepareFreeMsgBuffers(void)
1325 /* Allocate the freeMsg buffers just once and then hang onto them. */
1326 if (freeMsgIndex == NULL) {
1327 freeMsgIndex = (nat *) stgMallocBytes(nPEs * sizeof(nat),
1328 "prepareFreeMsgBuffers (Index)");
// NOTE(review): sizeof(long *) is used to size an array of StgWord * --
// same size on supported platforms, but it should read sizeof(StgWord *)
// for consistency with the declared type.
1329 freeMsgBuffer = (StgWord **) stgMallocBytes(nPEs * sizeof(long *),
1330 "prepareFreeMsgBuffers (Buffer)");
// One packBufferSize-word buffer per PE, kept for the life of the run.
1332 for(i = 0; i < nPEs; i++)
1334 freeMsgBuffer[i] = (StgPtr) stgMallocWords(RtsFlags.ParFlags.packBufferSize,
1335 "prepareFreeMsgBuffers (Buffer #i)");
1338 /* Initialize the freeMsg buffer pointers to point to the start of their
// Mark every per-PE queue empty (index 0 = no words queued).
1340 for (i = 0; i < nPEs; i++)
1341 freeMsgIndex[i] = 0;
1344 //@cindex freeRemoteGA
// Queue a FREE for the remote global address *ga into pe's staging buffer
// (two words: weight then slot). If the buffer is about to overflow, flush
// it immediately with sendFree before queueing.
1346 freeRemoteGA(int pe, globalAddr *ga)
// The GA must already have been removed from the local GALA table.
1350 ASSERT(GALAlookup(ga) == NULL);
// +2 because we are about to append a two-word (weight, slot) pair.
1352 if ((i = freeMsgIndex[pe]) + 2 >= RtsFlags.ParFlags.packBufferSize) {
// NOTE(review): typo "indivisually" (individually) in this debug message --
// a runtime string, deliberately left untouched in this pass.
1354 belch("!! Filled a free message buffer (sending remaining messages indivisually)"));
// Flush the full buffer; the reset of i to 0 is presumably on a line elided
// from this view -- confirm against the full source.
1356 sendFree(ga->payload.gc.gtid, i, freeMsgBuffer[pe]);
1359 freeMsgBuffer[pe][i++] = (StgWord) ga->weight;
1360 freeMsgBuffer[pe][i++] = (StgWord) ga->payload.gc.slot;
1361 freeMsgIndex[pe] = i;
// Poison the freed GA so any later use is recognisable in a debugger.
// NOTE(review): the trailing ");" on the last line closes a debug macro
// whose opening line is elided from this view.
1364 ga->weight = 0xdead0add;
1365 ga->payload.gc.gtid = 0xbbbbbbbb;
1366 ga->payload.gc.slot = 0xbbbbbbbb;);
1369 //@cindex sendFreeMessages
// Flush every non-empty per-PE FREE staging buffer: send the queued
// (weight, slot) word pairs to their owning PE in one FREE message each.
1371 sendFreeMessages(void)
1375 for (i = 0; i < nPEs; i++)
// Only PEs with queued words get a message; indices are left un-reset here
// (prepareFreeMsgBuffers clears them before the next batch).
1376 if (freeMsgIndex[i] > 0)
1377 sendFree(allPEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
1380 #endif /* PAR -- whole file */
1382 //@node Index, , Miscellaneous Functions, High Level Communications Routines
1386 //* ACK:: @cindex\s-+ACK
1387 //* DebugPrintGAGAMap:: @cindex\s-+DebugPrintGAGAMap
1388 //* FETCH:: @cindex\s-+FETCH
1389 //* FISH:: @cindex\s-+FISH
1390 //* FREE:: @cindex\s-+FREE
1391 //* RESUME:: @cindex\s-+RESUME
1392 //* SCHEDULE:: @cindex\s-+SCHEDULE
1393 //* blockFetch:: @cindex\s-+blockFetch
1394 //* choosePE:: @cindex\s-+choosePE
1395 //* freeMsgBuffer:: @cindex\s-+freeMsgBuffer
1396 //* freeMsgIndex:: @cindex\s-+freeMsgIndex
1397 //* freeRemoteGA:: @cindex\s-+freeRemoteGA
1398 //* gumPackBuffer:: @cindex\s-+gumPackBuffer
1399 //* initMoreBuffers:: @cindex\s-+initMoreBuffers
1400 //* prepareFreeMsgBuffers:: @cindex\s-+prepareFreeMsgBuffers
1401 //* processAck:: @cindex\s-+processAck
1402 //* processFetch:: @cindex\s-+processFetch
1403 //* processFetches:: @cindex\s-+processFetches
1404 //* processFish:: @cindex\s-+processFish
1405 //* processFree:: @cindex\s-+processFree
1406 //* processMessages:: @cindex\s-+processMessages
1407 //* processResume:: @cindex\s-+processResume
1408 //* processSchedule:: @cindex\s-+processSchedule
1409 //* sendAck:: @cindex\s-+sendAck
1410 //* sendFetch:: @cindex\s-+sendFetch
1411 //* sendFish:: @cindex\s-+sendFish
1412 //* sendFree:: @cindex\s-+sendFree
1413 //* sendFreeMessages:: @cindex\s-+sendFreeMessages
1414 //* sendResume:: @cindex\s-+sendResume
1415 //* sendSchedule:: @cindex\s-+sendSchedule
1416 //* unpackAck:: @cindex\s-+unpackAck
1417 //* unpackFetch:: @cindex\s-+unpackFetch
1418 //* unpackFish:: @cindex\s-+unpackFish
1419 //* unpackFree:: @cindex\s-+unpackFree
1420 //* unpackResume:: @cindex\s-+unpackResume
1421 //* unpackSchedule:: @cindex\s-+unpackSchedule
1422 //* waitForTermination:: @cindex\s-+waitForTermination