/* ----------------------------------------------------------------------------
 * Time-stamp: <Wed Mar 21 2001 16:34:41 Stardate: [-30]6363.45 hwloidl>
 * $Id: HLComms.c,v 1.4 2001/03/22 03:51:11 hwloidl Exp $
 *
 * High Level Communications Routines (HLComms.lc)
 *
 * Contains the high-level routines (i.e. communication
 * subsystem independent) used by GUM
 *
 * GUM 0.2x: Phil Trinder, Glasgow University, 12 December 1994
 * GUM 3.xx: Phil Trinder, Simon Marlow July 1998
 * GUM 4.xx: H-W. Loidl, Heriot-Watt University, November 1999 -
 * ------------------------------------------------------------------------- */
#ifdef PAR /* whole file */

//@node High Level Communications Routines, , ,
//@section High Level Communications Routines

//* GUM Message Sending and Unpacking Functions::
//* Message-Processing Functions::
//* GUM Message Processor::
//* Miscellaneous Functions::

//@node Macros etc, Includes, High Level Communications Routines, High Level Communications Routines
//@subsection Macros etc
# define NON_POSIX_SOURCE /* so says Solaris */

//@node Includes, GUM Message Sending and Unpacking Functions, Macros etc, High Level Communications Routines
//@subsection Includes

#include "Storage.h"       // for recordMutable
#include "GranSimRts.h"
#include "ParallelRts.h"
#include "FetchMe.h"       // for BLOCKED_FETCH_info etc
# include "ParallelDebug.h"
#include "StgMacros.h"     // inlined IS_... fcts
#include "SchedAPI.h"      // for createIOThread

extern unsigned int context_switch;
//@node GUM Message Sending and Unpacking Functions, Message-Processing Functions, Includes, High Level Communications Routines
//@subsection GUM Message Sending and Unpacking Functions

 * GUM Message Sending and Unpacking Functions

 * Allocate space for message processing

//@cindex gumPackBuffer
static rtsPackBuffer *gumPackBuffer;

//@cindex initMoreBuffers
  if ((gumPackBuffer = (rtsPackBuffer *)stgMallocWords(RtsFlags.ParFlags.packBufferSize,
                                                       "initMoreBuffers")) == NULL)
 * sendFetch packs the two global addresses and a load into a message
 * and sends it

   Structure of a FETCH message:

         +------------------------------------+------+
         | gtid | slot | weight | gtid | slot | load |
         +------------------------------------+------+
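
/* A minimal sketch of that wire layout as a C struct, assuming one StgWord
   per field (the code below actually ships the six words individually via
   sendOpV, and unpackFetch reads them back out of buf[0..5] in this order):

     typedef struct {
       StgWord fetch_gtid, fetch_slot;  // GA of the closure being fetched
                                        //  (a local GA on the receiving PE)
       StgWord reply_weight;            // weight of the GA to reply to
       StgWord reply_gtid, reply_slot;  // GA to send the RESUME back to
       StgWord load;                    // load information, piggy-backed
     } FetchPayload;                    // hypothetical name, for exposition
*/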
sendFetch(globalAddr *rga, globalAddr *lga, int load)
  ASSERT(rga->weight > 0 && lga->weight > 0);
  belch("~^** Sending Fetch for ((%x, %d, 0)); locally ((%x, %d, %x)), load = %d",
        rga->payload.gc.gtid, rga->payload.gc.slot,
        lga->payload.gc.gtid, lga->payload.gc.slot, lga->weight,

    DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(rga->payload.gc.gtid),
                     GR_FETCH, CurrentTSO, (StgClosure *)(lga->payload.gc.slot),
                     0, spark_queue_len(ADVISORY_POOL));

  sendOpV(PP_FETCH, rga->payload.gc.gtid, 6,
          (StgWord) rga->payload.gc.gtid, (StgWord) rga->payload.gc.slot,
          (StgWord) lga->weight, (StgWord) lga->payload.gc.gtid,
          (StgWord) lga->payload.gc.slot, (StgWord) load);
 * unpackFetch unpacks a FETCH message into two Global addresses and a load

//@cindex unpackFetch
unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
  belch("~^** Unpacking Fetch for ((%x, %d, 0)) to ((%x, %d, %x)), load = %d",
        (GlobalTaskId) buf[0], (int) buf[1],
        (GlobalTaskId) buf[3], (int) buf[4], buf[2], buf[5]));

  lga->payload.gc.gtid = (GlobalTaskId) buf[0];
  lga->payload.gc.slot = (int) buf[1];

  rga->weight = (unsigned) buf[2];
  rga->payload.gc.gtid = (GlobalTaskId) buf[3];
  rga->payload.gc.slot = (int) buf[4];

  *load = (int) buf[5];

  ASSERT(rga->weight > 0);
 * sendResume packs the remote blocking queue's GA and data into a message

   Structure of a RESUME message:

      -------------------------------
      | weight | slot | n | data ...
      -------------------------------

   data is a packed graph represented as an rtsPackBuffer
   n is the size of the graph (as returned by PackNearbyGraph) + packet hdr size
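
/* Size bookkeeping, as a sketch: if the packed graph proper occupies nelem
   words, both ends add/subtract the same two constants, i.e.

     words on the wire = nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM
     nelem (receiver)  = n     - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM

   (cf. sendResume/unpackResume below). For example, with an illustrative
   PACK_BUFFER_HDR_SIZE of 4 and DEBUG_HEADROOM of 1, a 10-word graph
   travels as n = 15. */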
sendResume(globalAddr *rga, int nelem, rtsPackBuffer *packBuffer)
  belch("~^[] Sending Resume (packet <<%d>> with %d elems) for ((%x, %d, %x)) to [%x]",
        packBuffer->id, nelem,
        rga->payload.gc.gtid, rga->payload.gc.slot, rga->weight,
        rga->payload.gc.gtid));

    PrintPacket(packBuffer));

  ASSERT(nelem==packBuffer->size);
  /* check for magic end-of-buffer word */
  IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));

  sendOpNV(PP_RESUME, rga->payload.gc.gtid,
           nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer,
           2, (rtsWeight) rga->weight, (StgWord) rga->payload.gc.slot);
 * unpackResume unpacks a RESUME message into a Global address, an element
 * count and a data buffer

//@cindex unpackResume
unpackResume(globalAddr *lga, int *nelem, rtsPackBuffer *packBuffer)

     RESUME event is written in awaken_blocked_queue
     DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(lga->payload.gc.gtid),
                      GR_RESUME, END_TSO_QUEUE, (StgClosure *)NULL, 0, 0);

  lga->weight = (unsigned) buf[0];
  lga->payload.gc.gtid = mytid;
  lga->payload.gc.slot = (int) buf[1];

  *nelem = (int) buf[2] - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM;
  GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM);

  belch("~^[] Unpacking Resume (packet <<%d>> with %d elems) for ((%x, %d, %x))",
        packBuffer->id, *nelem, mytid, (int) buf[1], (unsigned) buf[0]));

  /* check for magic end-of-buffer word */
  IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
 * sendAck packs the global address being acknowledged, together with
 * an array of global addresses for any closures shipped and sends them.

   Structure of an ACK message:

      +---------------------------------------------+-------
      | weight | gtid | slot | weight | gtid | slot | ..... ngas times
      +---------------------------------------------+-------
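
/* Flattening sketch: each (old GA, new GA) pair of the gagamap occupies six
   longs on the wire, in the field order used by sendAck below:

     pair k:  [ old.weight, old.gtid, old.slot,
                new.weight, new.gtid, new.slot ]

   so an ACK for ngas pairs carries 6*ngas data words, and unpackAck
   recovers ngas by dividing the received array size by 6. */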
sendAck(GlobalTaskId task, int ngas, globalAddr *gagamap)
    return;  // don't send unnecessary messages!!

  buffer = (long *) gumPackBuffer;

  for(i = 0, p = buffer; i < ngas; i++, p += 6) {
    ASSERT(gagamap[1].weight > 0);
    p[0] = (long) gagamap->weight;
    p[1] = (long) gagamap->payload.gc.gtid;
    p[2] = (long) gagamap->payload.gc.slot;
    gagamap++;
    p[3] = (long) gagamap->weight;
    p[4] = (long) gagamap->payload.gc.gtid;
    p[5] = (long) gagamap->payload.gc.slot;
    gagamap++;

  IF_PAR_DEBUG(schedule,
               belch("~^,, Sending Ack (%d pairs) to [%x]\n",

  sendOpN(PP_ACK, task, p - buffer, (StgPtr)buffer);
 * unpackAck unpacks an Acknowledgement message into a count of the
 * number of global addresses following and a map of (old GA, new GA)
 * pairs

unpackAck(int *ngas, globalAddr *gagamap)
  GetArgs(&GAarraysize, 1);

  *ngas = GAarraysize / 6;

  IF_PAR_DEBUG(schedule,
               belch("~^,, Unpacking Ack (%d pairs) on [%x]\n",

  while (GAarraysize > 0) {
    gagamap->weight = (rtsWeight) buf[0];
    gagamap->payload.gc.gtid = (GlobalTaskId) buf[1];
    gagamap->payload.gc.slot = (int) buf[2];
    gagamap++;
    gagamap->weight = (rtsWeight) buf[3];
    gagamap->payload.gc.gtid = (GlobalTaskId) buf[4];
    gagamap->payload.gc.slot = (int) buf[5];
    ASSERT(gagamap->weight > 0);
 * sendFish sends out a FISH message (a request for work), recording the
 * originating PE together with the age, history and hunger of the fish.

   Structure of a FISH message:

      +----------------------------------+
      | orig PE | age | history | hunger |
      +----------------------------------+
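
/* Life cycle of a fish, sketched from processFish below: a fish carries the
   id of the PE that spawned it plus an age that grows by one per hop.

     if (origPE == mytid)      -> fish came home empty; note arrival time
     else if (spark available) -> pack it and sendSchedule(origPE, ...)
     else if (age < FISH_LIFE_EXPECTANCY)
                               -> sendFish(choosePE(), origPE, age+1, ...)
     else                      -> sendFish(origPE, origPE, age+1, ...),
                                  i.e. send it home to die
*/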
sendFish(GlobalTaskId destPE, GlobalTaskId origPE,
         int age, int history, int hunger)
  belch("~^$$ Sending Fish to [%x] (%d outstanding fishes)",
        destPE, outstandingFishes));

  sendOpV(PP_FISH, destPE, 4,
          (StgWord) origPE, (StgWord) age, (StgWord) history, (StgWord) hunger);

  if (origPE == mytid) {
 * unpackFish unpacks a FISH message into the global task id of the
 * originating PE and 3 data fields: the age, history and hunger of the
 * fish. The history + hunger are not currently used.

unpackFish(GlobalTaskId *origPE, int *age, int *history, int *hunger)
  belch("~^$$ Unpacking Fish from [%x] (age=%d)",
        (GlobalTaskId) buf[0], (int) buf[1]));

  *origPE = (GlobalTaskId) buf[0];
  *age = (int) buf[1];
  *history = (int) buf[2];
  *hunger = (int) buf[3];
 * sendFree sends (weight, slot) pairs for GAs that we no longer need

   Structure of a FREE message:

      +-----------------------------
      | n | weight_1 | slot_1 | ...
      +-----------------------------
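
/* Layout sketch: the n data words form n/2 (weight, slot) pairs; no gtid is
   shipped because the slots are interpreted relative to the receiving PE.
   freeRemoteGA (below) shows the producer side:

     freeMsgBuffer[pe][i++] = (StgWord) ga->weight;
     freeMsgBuffer[pe][i++] = (StgWord) ga->payload.gc.slot;
*/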
sendFree(GlobalTaskId pe, int nelem, StgPtr data)
  belch("~^!! Sending Free (%d GAs) to [%x]",

  sendOpN(PP_FREE, pe, nelem, data);
 * unpackFree unpacks a FREE message into the amount of data shipped and
 * the data itself

unpackFree(int *nelem, StgWord *data)
  *nelem = (int) buf[0];

  belch("~^!! Unpacking Free (%d GAs)",

  GetArgs(data, *nelem);
 * sendSchedule sends a closure to be evaluated in response to a Fish
 * message. The message is directed to the PE that originated the Fish
 * (origPE), and includes the packed closure (data) along with its size

   Structure of a SCHEDULE message:

      +------------------------------------
      | PE | n | pack buffer of a graph ...
      +------------------------------------
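
/* Sketch of the size handshake (cf. unpackSchedule below): the receiver
   first pulls out the total size n (which includes the pack buffer header),
   computes

     *nelem = n - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM

   and then reads the packet into an rtsPackBuffer. REVAL messages are laid
   out identically and reuse this unpacking. */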
//@cindex sendSchedule
sendSchedule(GlobalTaskId origPE, int nelem, rtsPackBuffer *packBuffer)
  IF_PAR_DEBUG(schedule,
               belch("~^-- Sending Schedule (packet <<%d>> with %d elems) to [%x]\n",
                     packBuffer->id, nelem, origPE));

    PrintPacket(packBuffer));

  ASSERT(nelem==packBuffer->size);
  /* check for magic end-of-buffer word */
  IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));

  sendOpN(PP_SCHEDULE, origPE,
          nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer);
 * unpackSchedule unpacks a SCHEDULE message into the amount of data
 * shipped (nelem) and the data itself (packBuffer)

//@cindex unpackSchedule
unpackSchedule(int *nelem, rtsPackBuffer *packBuffer)
  /* first, just unpack 1 word containing the total size (including header) */

  /* no. of elems, not counting the header of the pack buffer */
  *nelem = (int) buf[0] - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM;

  /* automatic cast of flat pvm-data to rtsPackBuffer */
  GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM);

  IF_PAR_DEBUG(schedule,
               belch("~^-- Unpacking Schedule (packet <<%d>> with %d elems) on [%x]\n",
                     packBuffer->id, *nelem, mytid));

  ASSERT(*nelem==packBuffer->size);
  /* check for magic end-of-buffer word */
  IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
/* sendReval is almost identical to the Schedule version, so we can unpack with unpackSchedule */
sendReval(GlobalTaskId origPE, int nelem, rtsPackBuffer *packBuffer)
  IF_PAR_DEBUG(schedule,
               belch("~^-- Sending Reval (packet <<%d>> with %d elems) to [%x]\n",
                     packBuffer->id, nelem, origPE));

    PrintPacket(packBuffer));

  ASSERT(nelem==packBuffer->size);
  /* check for magic end-of-buffer word */
  IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));

  sendOpN(PP_REVAL, origPE,
          nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer);

void FinishReval(StgTSO *t)
  rtsPackBuffer *buffer = NULL;

  ga.payload.gc.slot = t->revalSlot;
  ga.payload.gc.gtid = t->revalTid;

  // find where the reval result is
  res = GALAlookup(&ga);

  IF_PAR_DEBUG(schedule,
               belch(" needs the result %08x\n", res));

  // send off the result
  buffer = PackNearbyGraph(res, END_TSO_QUEUE, &size, ga.payload.gc.gtid);
  ASSERT(buffer != (rtsPackBuffer *)NULL);
  sendResume(&ga, size, buffer);

  IF_PAR_DEBUG(schedule,
               belch("@;~) Reval Finished"));
//@node Message-Processing Functions, GUM Message Processor, GUM Message Sending and Unpacking Functions, High Level Communications Routines
//@subsection Message-Processing Functions

 * Message-Processing Functions

 * The following routines process incoming GUM messages, often issuing
 * new messages in response.

 * processFish unpacks a fish message, reissuing it if it's our own,
 * sending work if we have it or sending it onwards otherwise.

 * processFetches constructs and sends resume messages for every
 * BlockedFetch which is ready to be awakened.
 * awaken_blocked_queue (in Schedule.c) is responsible for moving
 * BlockedFetches from a blocking queue to the PendingFetches queue.
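
/* In outline, the loop below classifies the target closure of each
   BlockedFetch bf (after squeezing out indirections) into three cases:

     FETCH_ME    -> the graph has moved on; forward the FETCH to the
                    PE named in the FetchMe's GA
     black hole  -> no data yet; re-block via blockFetch(bf, closure)
     otherwise   -> local graph; PackNearbyGraph + sendResume
*/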
extern StgBlockedFetch *PendingFetches;

pending_fetches_len(void)
  for (n=0, bf=PendingFetches; bf != END_BF_QUEUE; n++, bf = (StgBlockedFetch *)(bf->link)) {
    ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
//@cindex processFetches
processFetches(void) {
  StgBlockedFetch *bf, *next;
  static rtsPackBuffer *packBuffer;

  IF_PAR_DEBUG(verbose,
               belch("____ processFetches: %d pending fetches (root @ %p)",
                     pending_fetches_len(), PendingFetches));

  for (bf = PendingFetches;
    /* the PendingFetches list contains only BLOCKED_FETCH closures */
    ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
    /* store link (we might overwrite it via blockFetch later on) */
    next = (StgBlockedFetch *)(bf->link);

    /*
     * Find the target at the end of the indirection chain, and
     * process it in much the same fashion as the original target
     * of the fetch. Though we hope to find graph here, we could
     * find a black hole (of any flavor) or even a FetchMe.
     */

    /* We evacuate BQs and update the node fields where necessary in GC.c
       So, if we find an EVACUATED closure, something has gone Very Wrong
       (and therefore we let the RTS crash most ungracefully). */
    ASSERT(get_itbl(closure)->type != EVACUATED);
    // closure = ((StgEvacuated *)closure)->evacuee;

    closure = UNWIND_IND(closure);
    //while ((ind = IS_INDIRECTION(closure)) != NULL) { closure = ind; }

    ip = get_itbl(closure);
    if (ip->type == FETCH_ME) {
      /* Forward the Fetch to someone else */
      rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
      rga.payload.gc.slot = bf->ga.payload.gc.slot;
      rga.weight = bf->ga.weight;

      sendFetch(((StgFetchMe *)closure)->ga, &rga, 0 /* load */);

      // Global statistics: count no. of fetch messages
      if (RtsFlags.ParFlags.ParStats.Global &&
          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
        globalParStats.tot_fetch_mess++;

                   belch("__-> processFetches: Forwarding fetch from %lx to %lx",
                         mytid, rga.payload.gc.gtid));
    } else if (IS_BLACK_HOLE(closure)) {
      IF_PAR_DEBUG(verbose,
                   belch("__++ processFetches: trying to send a BLACK_HOLE => doing a blockFetch on closure %p (%s)",
                         closure, info_type(closure)));
      blockFetch(bf, closure);

      /* We now have some local graph to send back */
      packBuffer = gumPackBuffer;
      IF_PAR_DEBUG(verbose,
                   belch("__*> processFetches: PackNearbyGraph of closure %p (%s)",
                         closure, info_type(closure)));

      if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.payload.gc.gtid)) == NULL) {
        // Put current BF back on list
        bf->link = (StgBlockingQueueElement *)PendingFetches;
        PendingFetches = (StgBlockedFetch *)bf;
        // ToDo: check that nothing more has to be done to prepare for GC!
        barf("processFetches: out of heap while packing graph; ToDo: call GC here");
        GarbageCollect(GetRoots, rtsFalse);

      PendingFetches = (StgBlockedFetch *)(bf->link);
      packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.payload.gc.gtid);
      ASSERT(packBuffer != (rtsPackBuffer *)NULL);

      rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
      rga.payload.gc.slot = bf->ga.payload.gc.slot;
      rga.weight = bf->ga.weight;

      sendResume(&rga, size, packBuffer);

      // Global statistics: count no. of resume messages
      if (RtsFlags.ParFlags.ParStats.Global &&
          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
        globalParStats.tot_resume_mess++;

  PendingFetches = END_BF_QUEUE;
/* As an alternative to sending fetch messages directly from the FETCH_ME_entry
   code we could just store the data about the remote closure in a global
   variable and send the fetch request from the main scheduling loop (similar
   to processFetches above). This would save an expensive STGCALL in the entry
   code because we have to go back to the scheduler anyway. */
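
/* A sketch of that handoff, using the two globals consumed below: the
   FETCH_ME entry code would record the GAs instead of sending,

     theGlobalFromGA = <GA of the remote closure>;
     theGlobalToGA   = <GA the reply should resume>;

   and the scheduler then calls processTheRealFetches to emit the actual
   sendFetch. */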
//@cindex processTheRealFetches
processTheRealFetches(void) {
  StgClosure *closure, *next;

  IF_PAR_DEBUG(verbose,
               belch("__ processTheRealFetches: ");
               printGA(&theGlobalFromGA);
               printGA(&theGlobalToGA));

  ASSERT(theGlobalFromGA.payload.gc.gtid != 0 &&
         theGlobalToGA.payload.gc.gtid != 0);

  /* the old version did this in the FETCH_ME entry code */
  sendFetch(&theGlobalFromGA, &theGlobalToGA, 0/*load*/);
/* Way of dealing with unwanted fish; used during startup/shutdown, or for
   fish from unknown PEs */
  int age, history, hunger;

  /* IF_PAR_DEBUG(verbose, */
  belch(".... [%x] Bouncing unwanted FISH", mytid);

  unpackFish(&origPE, &age, &history, &hunger);

  if (origPE == mytid) {
    //fishing = rtsFalse;                  // fish has come home
    last_fish_arrived_at = CURRENT_TIME;   // remember time (see schedule fct)
    return;                                // that's all

  /* otherwise, send it home to die */
  sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
  // Global statistics: count no. of fish messages
  if (RtsFlags.ParFlags.ParStats.Global &&
      RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
    globalParStats.tot_fish_mess++;
/*
 * processFish unpacks a fish message, reissuing it if it's our own,
 * sending work if we have it or sending it onwards otherwise.
 */
//@cindex processFish
  int age, history, hunger;
  static rtsPackBuffer *packBuffer;

  unpackFish(&origPE, &age, &history, &hunger);

  if (origPE == mytid) {
    //fishing = rtsFalse;                  // fish has come home
    last_fish_arrived_at = CURRENT_TIME;   // remember time (see schedule fct)
    return;                                // that's all

  ASSERT(origPE != mytid);
               belch("$$__ processing fish; %d sparks available",
                     spark_queue_len(&(MainRegTable.rSparks))));
  while ((spark = findSpark(rtsTrue/*for_export*/)) != NULL) {
    // StgClosure *graph;

    packBuffer = gumPackBuffer;
    ASSERT(closure_SHOULD_SPARK((StgClosure *)spark));
    if ((packBuffer = PackNearbyGraph(spark, END_TSO_QUEUE, &size, origPE)) == NULL) {
                   belch("$$ GC while trying to satisfy FISH via PackNearbyGraph of node %p",
                         (StgClosure *)spark));
      barf("processFish: out of heap while packing graph; ToDo: call GC here");
      GarbageCollect(GetRoots, rtsFalse);
      /* Now go back and try again */

      IF_PAR_DEBUG(verbose,
                   if (RtsFlags.ParFlags.ParStats.Sparks)
                     belch("==== STEALING spark %x; sending to %x", spark, origPE));

                   belch("$$-- Replying to FISH from %x by sending graph @ %p (%s)",
                         (StgClosure *)spark, info_type((StgClosure *)spark)));
      sendSchedule(origPE, size, packBuffer);

      // Global statistics: count no. of schedule messages
      if (RtsFlags.ParFlags.ParStats.Global &&
          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
        globalParStats.tot_schedule_mess++;

  if (spark == (rtsSpark)NULL) {
                 belch("$$^^ No sparks available for FISH from %x",

    /* We have no sparks to give */
    if (age < FISH_LIFE_EXPECTANCY) {
      /* and the fish is still young, send it to another PE to look for work */
      sendFish(choosePE(), origPE,
               (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);

      // Global statistics: count no. of fish messages
      if (RtsFlags.ParFlags.ParStats.Global &&
          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
        globalParStats.tot_fish_mess++;

    } else { /* otherwise, send it home to die */
      sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
      // Global statistics: count no. of fish messages
      if (RtsFlags.ParFlags.ParStats.Global &&
          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
        globalParStats.tot_fish_mess++;
/*
 * processFetch either returns the requested data (if available)
 * or blocks the remote blocking queue on a black hole (if not).
 */
//@cindex processFetch
  unpackFetch(&ga, &rga, &load);
  belch("%%%%__ Rcvd Fetch for ((%x, %d, 0)), Resume ((%x, %d, %x)) (load %d) from %x",
        ga.payload.gc.gtid, ga.payload.gc.slot,
        rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight, load,
        rga.payload.gc.gtid));

  closure = GALAlookup(&ga);
  ASSERT(closure != (StgClosure *)NULL);
  ip = get_itbl(closure);
  if (ip->type == FETCH_ME) {
    /* Forward the Fetch to someone else */
    sendFetch(((StgFetchMe *)closure)->ga, &rga, load);

    // Global statistics: count no. of fetch messages
    if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
      globalParStats.tot_fetch_mess++;

  } else if (rga.payload.gc.gtid == mytid) {
    /* Our own FETCH forwarded back around to us */
    StgFetchMeBlockingQueue *fmbq = (StgFetchMeBlockingQueue *)GALAlookup(&rga);

    belch("%%%%== Fetch returned to sending PE; closure=%p (%s); receiver=%p (%s)",
          closure, info_type(closure), fmbq, info_type((StgClosure*)fmbq)));

    /* We may have already discovered that the fetch target is our own. */
    if ((StgClosure *)fmbq != closure)
      CommonUp((StgClosure *)fmbq, closure);
    (void) addWeight(&rga);
  } else if (IS_BLACK_HOLE(closure)) {
    /* This includes RBH's and FMBQ's */

    /* Can we assert something on the remote GA? */
    ASSERT(GALAlookup(&rga) == NULL);

    /* If we're hitting a BH or RBH or FMBQ we have to put a BLOCKED_FETCH
       closure into the BQ in order to denote that when updating this node
       the result should be sent to the originator of this fetch message. */
    bf = (StgBlockedFetch *)createBlockedFetch(ga, rga);
    belch("%%++ Blocking Fetch ((%x, %d, %x)) on %p (%s)",
          rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight,
          closure, info_type(closure)));
    blockFetch(bf, closure);

    /* The target of the FetchMe is some local graph */
    // StgClosure *graph;
    rtsPackBuffer *buffer = (rtsPackBuffer *)NULL;

    if ((buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.payload.gc.gtid)) == NULL) {
      barf("processFetch: out of heap while packing graph; ToDo: call GC here");
      GarbageCollect(GetRoots, rtsFalse);
      closure = GALAlookup(&ga);
      buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.payload.gc.gtid);
      ASSERT(buffer != (rtsPackBuffer *)NULL);

    sendResume(&rga, size, buffer);

    // Global statistics: count no. of resume messages
    if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
      globalParStats.tot_resume_mess++;
/* The list of pending fetches must be a root-list for GC.
   This routine is called from GC.c (same as marking GAs etc). */
markPendingFetches(rtsBool major_gc) {

  /* No need to traverse the list; this is done via the scavenge code
     for a BLOCKED_FETCH closure, which evacuates the link field */

  if (PendingFetches != END_BF_QUEUE ) {
    fprintf(stderr, "@@@@ PendingFetches is root; evaced from %p to",

    PendingFetches = MarkRoot((StgClosure*)PendingFetches);

    IF_PAR_DEBUG(verbose,
                 fprintf(stderr, " %p\n", PendingFetches));

    fprintf(stderr, "@@@@ PendingFetches is empty; no need to mark it\n"));
/*
 * processFree unpacks a FREE message and adds the weights to our GAs.
 */
//@cindex processFree
  static StgWord *buffer;

  buffer = (StgWord *)gumPackBuffer;
  unpackFree(&nelem, buffer);
  belch("!!__ Rcvd Free (%d GAs)", nelem / 2));

  ga.payload.gc.gtid = mytid;
  for (i = 0; i < nelem;) {
    ga.weight = (rtsWeight) buffer[i++];
    ga.payload.gc.slot = (int) buffer[i++];
    fprintf(stderr, "!!-- Processing free ");

    (void) addWeight(&ga);
/*
 * processResume unpacks a RESUME message into the graph, filling in
 * the LA -> GA, and GA -> LA tables. Threads blocked on the original
 * FetchMe (now a blocking queue) are awakened, and the blocking queue
 * is converted into an indirection. Finally it sends an ACK in response
 * which contains any newly allocated GAs.
 */
//@cindex processResume
processResume(GlobalTaskId sender)
  static rtsPackBuffer *packBuffer;
  StgClosure *newGraph, *old;

  packBuffer = (rtsPackBuffer *)gumPackBuffer;
  unpackResume(&lga, &nelem, packBuffer);

  fprintf(stderr, "[]__ Rcvd Resume for ");
  fputc('\n', stderr));

  PrintPacket((rtsPackBuffer *)packBuffer));

  /* We always unpack the incoming graph, even if we've received the
   * requested node in some other data packet (and already awakened
   * the blocking queue). */
  if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
    ReallyPerformThreadGC(packBuffer[0], rtsFalse);
    SAVE_Hp -= packBuffer[0];

  // ToDo: check for GC here !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

  /* Do this *after* GC; we don't want to release the object early! */
  (void) addWeight(&lga);

  old = GALAlookup(&lga);

  /* ToDo: The closure that requested this graph must be one of these two? */
  ASSERT(get_itbl(old)->type == FETCH_ME_BQ ||
         get_itbl(old)->type == RBH);

  if (RtsFlags.ParFlags.ParStats.Full) {
    StgBlockingQueueElement *bqe, *last_bqe;

    belch("[]-- Resume is REPLY to closure %lx", old));

    /* Write REPLY events to the log file, indicating that the remote
       NB: we emit a REPLY only for the *last* elem in the queue; this is
       the one that triggered the fetch message; all other entries
       have just added themselves to the queue, waiting for the data
       they know has been requested (see entry code for FETCH_ME_BQ) */
    if ((get_itbl(old)->type == FETCH_ME_BQ ||
         get_itbl(old)->type == RBH)) {
      for (bqe = ((StgFetchMeBlockingQueue *)old)->blocking_queue,
             last_bqe = END_BQ_QUEUE;
           get_itbl(bqe)->type==TSO ||
             get_itbl(bqe)->type==BLOCKED_FETCH;
           last_bqe = bqe, bqe = bqe->link) { /* nothing */ }

      ASSERT(last_bqe==END_BQ_QUEUE ||
             get_itbl((StgClosure *)last_bqe)->type == TSO);

      /* last_bqe now points to the TSO that triggered the FETCH */
      if (get_itbl((StgClosure *)last_bqe)->type == TSO)
        DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(sender),
                         GR_REPLY, ((StgTSO *)last_bqe), ((StgTSO *)last_bqe)->block_info.closure,
                         0, spark_queue_len(&(MainRegTable.rSparks)));

  newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
  ASSERT(newGraph != NULL);

  /* Sometimes, unpacking will common up the resumee with the
   * incoming graph, but if it hasn't, we'd better do so now. */
  if (get_itbl(old)->type == FETCH_ME_BQ)
    CommonUp(old, newGraph);

  belch("[]-- Ready to resume unpacked graph at %p (%s)",
        newGraph, info_type(newGraph)));

  IF_PAR_DEBUG(tables,
               DebugPrintGAGAMap(gagamap, nGAs));

  sendAck(sender, nGAs, gagamap);
/*
 * processSchedule unpacks a SCHEDULE message into the graph, filling
 * in the LA -> GA, and GA -> LA tables. The root of the graph is added to
 * the local spark queue. Finally it sends an ACK in response
 * which contains any newly allocated GAs.
 */
//@cindex processSchedule
processSchedule(GlobalTaskId sender)
  static rtsPackBuffer *packBuffer;
  StgClosure *newGraph;
  globalAddr *gagamap;

  packBuffer = gumPackBuffer;           /* HWL */
  unpackSchedule(&nelem, packBuffer);

  IF_PAR_DEBUG(schedule,
               belch("--__ Rcvd Schedule (%d elems)", nelem));
  IF_PAR_DEBUG(packet,
               PrintPacket(packBuffer));

  /*
   * For now, the graph is a closure to be sparked as an advisory
   * spark, but in future it may be a complete spark with
   * required/advisory status, priority etc.
   */

  space_required = packBuffer[0];
  if (SAVE_Hp + space_required >= SAVE_HpLim) {
    ReallyPerformThreadGC(space_required, rtsFalse);
    SAVE_Hp -= space_required;

  // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!
  newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
  ASSERT(newGraph != NULL);
  success = add_to_spark_queue(newGraph, &(MainRegTable.rSparks));

  if (RtsFlags.ParFlags.ParStats.Full &&
      RtsFlags.ParFlags.ParStats.Sparks &&
    DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                     GR_STOLEN, ((StgTSO *)NULL), newGraph,
                     0, 0 /* spark_queue_len(ADVISORY_POOL) */);

  IF_PAR_DEBUG(schedule,
    belch("--^^ added spark to unpacked graph %p (%s); %d sparks available on [%x] (%s)",
          newGraph, info_type(newGraph), spark_queue_len(&(MainRegTable.rSparks)), mytid);

    belch("--^^ received non-sparkable closure %p (%s); nothing added to spark pool; %d sparks available on [%x]",
          newGraph, info_type(newGraph), spark_queue_len(&(MainRegTable.rSparks)), mytid));
  IF_PAR_DEBUG(packet,
               belch("*< Unpacked graph with root at %p (%s):",
                     newGraph, info_type(newGraph));
               PrintGraph(newGraph, 0));

  IF_PAR_DEBUG(tables,
               DebugPrintGAGAMap(gagamap, nGAs));

  sendAck(sender, nGAs, gagamap);

  //fishing = rtsFalse;
  ASSERT(outstandingFishes>0);
  outstandingFishes--;
/*
 * processAck unpacks an ACK, and uses the GAGA map to convert RBH's
 * (which represent shared thunks that have been shipped) into fetch-mes
 */
//@cindex processAck
  globalAddr gagamap[256];  // ToDo: eliminate magic constant (MAX_GAS * 2?)

  unpackAck(&nGAs, gagamap);

  IF_PAR_DEBUG(tables,
               belch(",,,, Rcvd Ack (%d pairs)", nGAs);
               DebugPrintGAGAMap(gagamap, nGAs));

    checkGAGAMap(gagamap, nGAs));

  /*
   * For each (oldGA, newGA) pair, set the GA of the corresponding
   * thunk to the newGA, convert the thunk to a FetchMe, and return
   * the weight from the oldGA.
   */
  for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
    StgClosure *old_closure = GALAlookup(gaga);
    StgClosure *new_closure = GALAlookup(gaga + 1);

    ASSERT(old_closure != NULL);
    if (new_closure == NULL) {
      /* We don't have this closure, so we make a fetchme for it */
      globalAddr *ga = setRemoteGA(old_closure, gaga + 1, rtsTrue);

      /* convertToFetchMe should be done unconditionally here.
         Currently, we assign GAs to CONSTRs, too, (a bit of a hack),
         so we have to check whether it is an RBH before converting

         ASSERT(get_itbl(old_closure)->type==RBH);
      */
      if (get_itbl(old_closure)->type==RBH)
        convertToFetchMe((StgRBH *)old_closure, ga);

      /*
       * Oops...we've got this one already; update the RBH to
       * point to the object we already know about, whatever it
       */
      CommonUp(old_closure, new_closure);

      /*
       * Increase the weight of the object by the amount just
       * received in the second part of the ACK pair.
       */
      (void) addWeight(gaga + 1);

    (void) addWeight(gaga);

  /* check the sanity of the LAGA and GALA tables after mincing them */
  IF_DEBUG(sanity, checkLAGAtable(rtsFalse));
  barf("Task %x: TODO: should send NACK in response to REVAL", mytid);

processReval(GlobalTaskId sender)  // similar to processSchedule
{
  nat nelem, space_required, nGAs;
  static rtsPackBuffer *packBuffer;
  StgClosure *newGraph;
  globalAddr *gagamap;

  packBuffer = gumPackBuffer;           /* HWL */
  unpackSchedule(&nelem, packBuffer);   /* okay, since the structure is the same */

  IF_PAR_DEBUG(packet,
               belch("@;~) [%x] Rcvd Reval (%d elems)", mytid, nelem);
               PrintPacket(packBuffer));

  space_required = packBuffer[0];
  if (SAVE_Hp + space_required >= SAVE_HpLim) {
    ReallyPerformThreadGC(space_required, rtsFalse);
    SAVE_Hp -= space_required;

  // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!
  newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
  ASSERT(newGraph != NULL);

  IF_PAR_DEBUG(packet,
               belch("@;~) Unpacked graph with root at %p (%s):",
                     newGraph, info_type(newGraph));
               PrintGraph(newGraph, 0));

  IF_PAR_DEBUG(tables,
               DebugPrintGAGAMap(gagamap, nGAs));
  // We don't send an Ack for the head (the first GA pair)!
  sendAck(sender, nGAs-1, gagamap+2);

  IF_PAR_DEBUG(verbose,
               belch("@;~) About to create Reval thread on behalf of %x",

  tso = createGenThread(RtsFlags.GcFlags.initialStkSize, newGraph);
  tso->priority = RevalPriority;
  tso->revalSlot = gagamap->payload.gc.slot;  // record who sent the reval
  tso->revalTid  = gagamap->payload.gc.gtid;
  scheduleThread(tso);
  context_switch = 1;  // switch at the earliest opportunity
//@node GUM Message Processor, Miscellaneous Functions, Message-Processing Functions, High Level Communications Routines
//@subsection GUM Message Processor

 * GUM Message Processor

 * processMessages processes any messages that have arrived, calling
 * appropriate routines depending on the message tag
 * (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
 * present and performs a blocking receive! During profiling it
 * busy-waits in order to record idle time.
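
/* Dispatch sketch for the loop below: getOpcodeAndSender splits off the
   opcode and origin of each packet; FINISH from SysMan flags shutdown via
   receivedFinish, each PP_* opcode is routed to its handler defined above
   (processFetch, processResume, processFish, processSchedule, processReval,
   ...), messages from unknown PEs are bounced or ignored, and any other
   opcode hits barf. */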
//@cindex processMessages
processMessages(void)
  rtsBool receivedFinish = rtsFalse;

    packet = GetPacket();  /* Get next message; block until one available */
    getOpcodeAndSender(packet, &opcode, &task);

    if (task==SysManTask) {
        IF_PAR_DEBUG(verbose,
                     belch("==== received FINISH [%p]", mytid));
        /* this boolean value is returned and propagated to the main
           scheduling loop, thus shutting down this PE */
        receivedFinish = rtsTrue;

        barf("Task %x: received unknown opcode %x from SysMan", mytid, opcode);

    } else if (taskIDtoPE(task)==0) {
      /* When a new PE joins, FISH and REVAL messages may potentially reach
         PEs before they are notified of the new PE's existence. The only
         solution is to bounce/fail these messages back to the sender. But
         we will worry about it once we start seeing these races. */

        belch("Task %x: Ignoring PVM session opened by another SysMan %x", mytid, task);

        belch("Task %x: Ignoring opcode %x from unknown PE %x", mytid, opcode, task);

        // Global statistics: count no. of fetch messages
        if (RtsFlags.ParFlags.ParStats.Global &&
            RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
          globalParStats.rec_fetch_mess++;

        processResume(task);
        // Global statistics: count no. of resume messages
        if (RtsFlags.ParFlags.ParStats.Global &&
            RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
          globalParStats.rec_resume_mess++;

        // Global statistics: count no. of fish messages
        if (RtsFlags.ParFlags.ParStats.Global &&
            RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
          globalParStats.rec_fish_mess++;

        processSchedule(task);
        // Global statistics: count no. of schedule messages
        if (RtsFlags.ParFlags.ParStats.Global &&
            RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
          globalParStats.rec_schedule_mess++;

        // Global statistics: count no. of reval messages
        if (RtsFlags.ParFlags.ParStats.Global &&
            RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
          globalParStats.rec_reval_mess++;

        /* Anything we're not prepared to deal with. */
        barf("Task %x: Unexpected opcode %x from %x",
             mytid, opcode, task);

  } while (PacketsWaiting());  /* While there are messages: process them */
  return receivedFinish;
} /* processMessages */
//@node Miscellaneous Functions, Index, GUM Message Processor, High Level Communications Routines
//@subsection Miscellaneous Functions

/*
 * blockFetch blocks a BlockedFetch node on some kind of black hole.
 */
//@cindex blockFetch
blockFetch(StgBlockedFetch *bf, StgClosure *bh) {
  switch (get_itbl(bh)->type) {
    bf->link = END_BQ_QUEUE;
    //((StgBlockingQueue *)bh)->header.info = &stg_BLACKHOLE_BQ_info;
    SET_INFO(bh, &stg_BLACKHOLE_BQ_info);  // turn closure into a blocking queue
    ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;

    // put bh on the mutables list
    recordMutable((StgMutClosure *)bh);

    /* enqueue bf on blocking queue of closure bh */
    bf->link = ((StgBlockingQueue *)bh)->blocking_queue;
    ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;

    // put bh on the mutables list; ToDo: check
    recordMutable((StgMutClosure *)bh);

    /* enqueue bf on blocking queue of closure bh */
    bf->link = ((StgFetchMeBlockingQueue *)bh)->blocking_queue;
    ((StgFetchMeBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;

    // put bh on the mutables list; ToDo: check
    recordMutable((StgMutClosure *)bh);

    /* enqueue bf on blocking queue of closure bh */
    bf->link = ((StgRBH *)bh)->blocking_queue;
    ((StgRBH *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;

    // put bh on the mutables list; ToDo: check
    recordMutable((StgMutClosure *)bh);

    barf("blockFetch: thought %p was a black hole (IP %#lx, %s)",
         (StgClosure *)bh, get_itbl((StgClosure *)bh),
         info_type((StgClosure *)bh));

  belch("##++ blockFetch: after blocking, the BQ of %p (%s) is:",
/*
  @blockThread@ is called from the main scheduler whenever tso returns with
  a ThreadBlocked return code; tso has already been added to a blocking
  queue (that's done in the entry code of the closure, because it is a
  cheap operation we have to do in any case); the main purpose of this
  routine is to send a Fetch message in case we are blocking on a FETCHME(_BQ)
  closure, which is indicated by the tso.why_blocked field;
  we also write an entry into the log file if we are generating one

  Should update exectime etc in the entry code already; but we don't have
  something like ``system time'' in the log file anyway, so this should
  even out the inaccuracies.
*/
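
/* Case map, sketched from the switch below:

     BlockedOnGA         -> send a FETCH now (plus a GR_FETCH event if
                            we are generating a log)
     BlockedOnGA_NoSend  -> a FETCH is already on its way; GR_BLOCK event
     BlockedOnBlackHole  -> local black hole / RBH; GR_BLOCK event only
     BlockedOnDelay      -> explicit threadDelay; nothing to send
     anything else       -> barf
*/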
//@cindex blockThread
blockThread(StgTSO *tso)
  globalAddr *remote_ga = NULL;
  globalAddr *local_ga;

  // ASSERT(we are on some blocking queue)
  ASSERT(tso->block_info.closure != (StgClosure *)NULL);

  /* We have to check why this thread has been blocked. */
  switch (tso->why_blocked) {
    /* the closure must be a FETCH_ME_BQ; tso came in here via
       FETCH_ME entry code */
    ASSERT(get_itbl(tso->block_info.closure)->type==FETCH_ME_BQ);

    /* HACK: the link field is used to hold the GA between FETCH_ME_entry
       and this point; if something (e.g. GC) happens in between, the whole
       The problem is that the ga field of the FETCH_ME has been overwritten
       with the head of the blocking queue (which is tso). */
    ASSERT(looks_like_ga(&theGlobalFromGA));
    // ASSERT(tso->link!=END_TSO_QUEUE && tso->link!=NULL);
    remote_ga = &theGlobalFromGA; //tso->link;
    tso->link = (StgTSO*)END_BQ_QUEUE;

    /* it was tso which turned node from FETCH_ME into FETCH_ME_BQ =>
       we have to send a Fetch message here! */
    if (RtsFlags.ParFlags.ParStats.Full) {
      /* Note that CURRENT_TIME may perform an unsafe call */
      tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
      tso->par.fetchcount++;
      tso->par.blockedat = CURRENT_TIME;
      /* we are about to send off a FETCH message, so dump a FETCH event */
      DumpRawGranEvent(CURRENT_PROC,
                       taskIDtoPE(remote_ga->payload.gc.gtid),
                       GR_FETCH, tso, tso->block_info.closure, 0, 0);

    /* Phil T. claims that this was a workaround for a hard-to-find
     * bug, hence I'm leaving it out for now --SDM */
    /* Assign a brand-new global address to the newly created FMBQ */
    local_ga = makeGlobal(tso->block_info.closure, rtsFalse);
    splitWeight(&fmbq_ga, local_ga);
    ASSERT(fmbq_ga.weight == 1U << (BITS_IN(unsigned) - 1));

    sendFetch(remote_ga, &fmbq_ga, 0/*load*/);

    // Global statistics: count no. of fetch messages
    if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
      globalParStats.tot_fetch_mess++;

    theGlobalFromGA.payload.gc.gtid = (GlobalTaskId)0);
  case BlockedOnGA_NoSend:
    /* the closure must be a FETCH_ME_BQ; tso came in here via
       FETCH_ME_BQ entry code */
    ASSERT(get_itbl(tso->block_info.closure)->type==FETCH_ME_BQ);

    /* Fetch message has been sent already */
    if (RtsFlags.ParFlags.ParStats.Full) {
      /* Note that CURRENT_TIME may perform an unsafe call */
      tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
      tso->par.blockcount++;
      tso->par.blockedat = CURRENT_TIME;
      /* dump a block event, because fetch has been sent already */
      DumpRawGranEvent(CURRENT_PROC, thisPE,
                       GR_BLOCK, tso, tso->block_info.closure, 0, 0);

  case BlockedOnBlackHole:
    /* the closure must be a BLACKHOLE_BQ or an RBH; tso came in here via
       BLACKHOLE(_BQ) or CAF_BLACKHOLE or RBH entry code */
    ASSERT(get_itbl(tso->block_info.closure)->type==MVAR ||
           get_itbl(tso->block_info.closure)->type==BLACKHOLE_BQ ||
           get_itbl(tso->block_info.closure)->type==RBH);

    /* if collecting stats update the execution time etc */
    if (RtsFlags.ParFlags.ParStats.Full) {
      /* Note that CURRENT_TIME may perform an unsafe call */
      tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
      tso->par.blockcount++;
      tso->par.blockedat = CURRENT_TIME;
      DumpRawGranEvent(CURRENT_PROC, thisPE,
                       GR_BLOCK, tso, tso->block_info.closure, 0, 0);

  case BlockedOnDelay:
    /* What sort of stats should we collect for an explicit threadDelay? */
    IF_PAR_DEBUG(verbose,
                 belch("##++ blockThread: TSO %d blocked on ThreadDelay",

  /* Check that the following cases are indeed impossible:
     case BlockedOnException:
     case BlockedOnWrite:
  */
    barf("blockThread: impossible why_blocked code %d for TSO %d",
         tso->why_blocked, tso->id);

  IF_PAR_DEBUG(verbose,
               belch("##++ blockThread: TSO %d blocked on closure %p (%s); %s",
                     tso->id, tso->block_info.closure, info_type(tso->block_info.closure),
                     (tso->why_blocked==BlockedOnGA) ? "Sent FETCH for GA" : ""));

    print_bq(tso->block_info.closure));
/*
 * choosePE selects a GlobalTaskId from the array of PEs 'at random'.
 * Important properties:
 *   - it varies during execution, even if the PE is idle
 *   - it's different for each PE
 *   - we never send a fish to ourselves
 */
extern long lrand48 (void);

  temp = lrand48() % nPEs;
  if (allPEs[temp] == mytid) {  /* Never send a FISH to yourself */
    temp = (temp + 1) % nPEs;

  return allPEs[temp];
/*
 * createBlockedFetch allocates a BLOCKED_FETCH closure and fills it with
 * the relevant fields of the ga argument; called from processFetch when
 * the local closure is
 */
//@cindex createBlockedFetch
createBlockedFetch (globalAddr ga, globalAddr rga)
  StgBlockedFetch *bf;
  StgClosure *closure;

  closure = GALAlookup(&ga);
  if ((bf = (StgBlockedFetch *)allocate(_HS + sizeofW(StgBlockedFetch))) == NULL) {
    barf("createBlockedFetch: out of heap while allocating heap for a BlockedFetch; ToDo: call GC here");
    GarbageCollect(GetRoots, rtsFalse);
    closure = GALAlookup(&ga);
    bf = (StgBlockedFetch *)allocate(_HS + sizeofW(StgBlockedFetch));
    // ToDo: check whether really guaranteed to succeed 2nd time around

  ASSERT(bf != (StgBlockedFetch *)NULL);
  SET_INFO((StgClosure *)bf, &stg_BLOCKED_FETCH_info);
  // ToDo: check whether other header info is needed
  bf->ga.payload.gc.gtid = rga.payload.gc.gtid;
  bf->ga.payload.gc.slot = rga.payload.gc.slot;
  bf->ga.weight = rga.weight;
  // bf->link = NULL;  debugging

  IF_PAR_DEBUG(schedule,
               fprintf(stderr, "%%%%// created BF: bf=%p (%s) of closure, GA: ",
                       bf, info_type((StgClosure*)bf));
               fputc('\n', stderr));
  return (StgClosure *)bf;
/*
 * waitForTermination enters a loop ignoring spurious messages while
 * waiting for the termination sequence to be completed.
 */
//@cindex waitForTermination
waitForTermination(void)
    rtsPacket p = GetPacket();
    processUnexpectedMessage(p);

//@cindex DebugPrintGAGAMap
DebugPrintGAGAMap(globalAddr *gagamap, int nGAs)
  for (i = 0; i < nGAs; ++i, gagamap += 2)
    fprintf(stderr, "__ gagamap[%d] = ((%x, %d, %x)) -> ((%x, %d, %x))\n", i,
            gagamap[0].payload.gc.gtid, gagamap[0].payload.gc.slot, gagamap[0].weight,
            gagamap[1].payload.gc.gtid, gagamap[1].payload.gc.slot, gagamap[1].weight);

//@cindex checkGAGAMap
checkGAGAMap(globalAddr *gagamap, int nGAs)
  for (i = 0; i < (nat)nGAs; ++i, gagamap += 2) {
    ASSERT(looks_like_ga(gagamap));
    ASSERT(looks_like_ga(gagamap+1));
//@cindex freeMsgBuffer
static StgWord **freeMsgBuffer = NULL;
//@cindex freeMsgIndex
static nat *freeMsgIndex = NULL;

//@cindex prepareFreeMsgBuffers
prepareFreeMsgBuffers(void)
  /* Allocate the freeMsg buffers just once and then hang onto them. */
  if (freeMsgIndex == NULL) {
    freeMsgIndex = (nat *) stgMallocBytes(nPEs * sizeof(nat),
                                          "prepareFreeMsgBuffers (Index)");
    freeMsgBuffer = (StgWord **) stgMallocBytes(nPEs * sizeof(long *),
                                                "prepareFreeMsgBuffers (Buffer)");

    for(i = 0; i < nPEs; i++)
      if (i != (thisPE-1))
        freeMsgBuffer[i] = (StgPtr) stgMallocWords(RtsFlags.ParFlags.packBufferSize,
                                                   "prepareFreeMsgBuffers (Buffer #i)");
      else
        freeMsgBuffer[i] = 0;

  /* Initialize the freeMsg buffer pointers to point to the start of their
     buffers */
  for (i = 0; i < nPEs; i++)
    freeMsgIndex[i] = 0;
//@cindex freeRemoteGA
freeRemoteGA(int pe, globalAddr *ga)
  ASSERT(GALAlookup(ga) == NULL);

  if ((i = freeMsgIndex[pe]) + 2 >= RtsFlags.ParFlags.packBufferSize) {
    belch("!! Filled a free message buffer (sending remaining messages individually)"));

    sendFree(ga->payload.gc.gtid, i, freeMsgBuffer[pe]);

  freeMsgBuffer[pe][i++] = (StgWord) ga->weight;
  freeMsgBuffer[pe][i++] = (StgWord) ga->payload.gc.slot;
  freeMsgIndex[pe] = i;

  ga->weight = 0xdead0add;
  ga->payload.gc.gtid = 0xbbbbbbbb;
  ga->payload.gc.slot = 0xbbbbbbbb;);
//@cindex sendFreeMessages
sendFreeMessages(void)
  for (i = 0; i < nPEs; i++)
    if (freeMsgIndex[i] > 0)
      sendFree(allPEs[i], freeMsgIndex[i], freeMsgBuffer[i]);

/* synchronises with the other PEs. Receives and records in a global
 * variable the task-id of SysMan. If this is the main thread (discovered
 * in main.lc), identifies itself to SysMan. Finally it receives
 * from SysMan an array of the Global Task Ids of each PE, which is
 * returned as the value of the function.
 */
#if defined(PAR_TICKY)
/* Has to see freeMsgIndex, so must be defined here not in ParTicky.c */
//@cindex stats_CntFreeGA
stats_CntFreeGA (void) {  // stats only

  // Global statistics: residency of thread and spark pool
  if (RtsFlags.ParFlags.ParStats.Global &&
      RtsFlags.GcFlags.giveStats > NO_GC_STATS) {

    globalParStats.cnt_free_GA++;
    for (i = 0, s = 0; i < nPEs; i++)
      s += globalParStats.tot_free_GA += freeMsgIndex[i]/2;

    if ( s > globalParStats.res_free_GA )
      globalParStats.res_free_GA = s;

#endif /* PAR_TICKY */
#endif /* PAR -- whole file */

//@node Index, , Miscellaneous Functions, High Level Communications Routines

//* ACK:: @cindex\s-+ACK
//* DebugPrintGAGAMap:: @cindex\s-+DebugPrintGAGAMap
//* FETCH:: @cindex\s-+FETCH
//* FISH:: @cindex\s-+FISH
//* FREE:: @cindex\s-+FREE
//* RESUME:: @cindex\s-+RESUME
//* SCHEDULE:: @cindex\s-+SCHEDULE
//* blockFetch:: @cindex\s-+blockFetch
//* choosePE:: @cindex\s-+choosePE
//* freeMsgBuffer:: @cindex\s-+freeMsgBuffer
//* freeMsgIndex:: @cindex\s-+freeMsgIndex
//* freeRemoteGA:: @cindex\s-+freeRemoteGA
//* gumPackBuffer:: @cindex\s-+gumPackBuffer
//* initMoreBuffers:: @cindex\s-+initMoreBuffers
//* prepareFreeMsgBuffers:: @cindex\s-+prepareFreeMsgBuffers
//* processAck:: @cindex\s-+processAck
//* processFetch:: @cindex\s-+processFetch
//* processFetches:: @cindex\s-+processFetches
//* processFish:: @cindex\s-+processFish
//* processFree:: @cindex\s-+processFree
//* processMessages:: @cindex\s-+processMessages
//* processResume:: @cindex\s-+processResume
//* processSchedule:: @cindex\s-+processSchedule
//* sendAck:: @cindex\s-+sendAck
//* sendFetch:: @cindex\s-+sendFetch
//* sendFish:: @cindex\s-+sendFish
//* sendFree:: @cindex\s-+sendFree
//* sendFreeMessages:: @cindex\s-+sendFreeMessages
//* sendResume:: @cindex\s-+sendResume
//* sendSchedule:: @cindex\s-+sendSchedule
//* unpackAck:: @cindex\s-+unpackAck
//* unpackFetch:: @cindex\s-+unpackFetch
//* unpackFish:: @cindex\s-+unpackFish
//* unpackFree:: @cindex\s-+unpackFree
//* unpackResume:: @cindex\s-+unpackResume
//* unpackSchedule:: @cindex\s-+unpackSchedule
//* waitForTermination:: @cindex\s-+waitForTermination