/* ----------------------------------------------------------------------------
 * Time-stamp: <Wed Mar 21 2001 16:34:41 Stardate: [-30]6363.45 hwloidl>
 * $Id: HLComms.c,v 1.6 2001/08/14 13:40:10 sewardj Exp $
 *
 * High Level Communications Routines (HLComms.lc)
 *
 * Contains the high-level routines (i.e. communication
 * subsystem independent) used by GUM
 *
 * GUM 0.2x: Phil Trinder, Glasgow University, 12 December 1994
 * GUM 3.xx: Phil Trinder, Simon Marlow July 1998
 * GUM 4.xx: H-W. Loidl, Heriot-Watt University, November 1999 -
 * ------------------------------------------------------------------------- */
#ifdef PAR /* whole file */

//@node High Level Communications Routines, , ,
//@section High Level Communications Routines

//* GUM Message Sending and Unpacking Functions::
//* Message-Processing Functions::
//* GUM Message Processor::
//* Miscellaneous Functions::

//@node Macros etc, Includes, High Level Communications Routines, High Level Communications Routines
//@subsection Macros etc
/* Evidently not Posix */
/* #include "PosixSource.h" */

//@node Includes, GUM Message Sending and Unpacking Functions, Macros etc, High Level Communications Routines
//@subsection Includes

#include "Storage.h"       // for recordMutable
#include "GranSimRts.h"
#include "ParallelRts.h"
#include "FetchMe.h"       // for BLOCKED_FETCH_info etc
#include "ParallelDebug.h"
#include "StgMacros.h"     // inlined IS_... fcts
#include "SchedAPI.h"      // for createIOThread
extern unsigned int context_switch;
//@node GUM Message Sending and Unpacking Functions, Message-Processing Functions, Includes, High Level Communications Routines
//@subsection GUM Message Sending and Unpacking Functions

/*
 * GUM Message Sending and Unpacking Functions
 */

/*
 * Allocate space for message processing
 */

//@cindex gumPackBuffer
static rtsPackBuffer *gumPackBuffer;

//@cindex initMoreBuffers
rtsBool
initMoreBuffers(void)
{
  if ((gumPackBuffer = (rtsPackBuffer *)stgMallocWords(RtsFlags.ParFlags.packBufferSize,
                                                       "initMoreBuffers")) == NULL)
    return rtsFalse;
  return rtsTrue;
}
/*
 * SendFetch packs the two global addresses and a load into a message
 * and sends it.

   Structure of a FETCH message:

         +------------------------------------+------+
         | gtid | slot | weight | gtid | slot | load |
         +------------------------------------+------+
 */
//@cindex sendFetch
void
sendFetch(globalAddr *rga, globalAddr *lga, int load)
{
  ASSERT(rga->weight > 0 && lga->weight > 0);
  IF_PAR_DEBUG(fetch,
               belch("~^** Sending Fetch for ((%x, %d, 0)); locally ((%x, %d, %x)), load = %d",
                     rga->payload.gc.gtid, rga->payload.gc.slot,
                     lga->payload.gc.gtid, lga->payload.gc.slot, lga->weight,
                     load));

  DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(rga->payload.gc.gtid),
                   GR_FETCH, CurrentTSO, (StgClosure *)(lga->payload.gc.slot),
                   0, spark_queue_len(ADVISORY_POOL));

  sendOpV(PP_FETCH, rga->payload.gc.gtid, 6,
          (StgWord) rga->payload.gc.gtid, (StgWord) rga->payload.gc.slot,
          (StgWord) lga->weight, (StgWord) lga->payload.gc.gtid,
          (StgWord) lga->payload.gc.slot, (StgWord) load);
}
/*
 * unpackFetch unpacks a FETCH message into two Global addresses and a load
 */

//@cindex unpackFetch
static void
unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
{
  long buf[6];

  GetArgs(buf, 6);
  IF_PAR_DEBUG(fetch,
               belch("~^** Unpacking Fetch for ((%x, %d, 0)) to ((%x, %d, %x)), load = %d",
                     (GlobalTaskId) buf[0], (int) buf[1],
                     (GlobalTaskId) buf[3], (int) buf[4], buf[2], buf[5]));

  lga->payload.gc.gtid = (GlobalTaskId) buf[0];
  lga->payload.gc.slot = (int) buf[1];

  rga->weight = (unsigned) buf[2];
  rga->payload.gc.gtid = (GlobalTaskId) buf[3];
  rga->payload.gc.slot = (int) buf[4];

  *load = (int) buf[5];

  ASSERT(rga->weight > 0);
}
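
/* A hedged illustration (not part of GUM itself): how the six words of a
   FETCH message line up with sendFetch/unpackFetch above. The helper name
   'exampleFetchWords' and the flat StgWord array are assumptions made for
   exposition only; sendOpV/GetArgs do the real marshalling.
*/
#if 0
static void
exampleFetchWords(StgWord msg[6], globalAddr *rga, globalAddr *lga, int load)
{
  msg[0] = (StgWord) rga->payload.gc.gtid;  /* where the data lives          */
  msg[1] = (StgWord) rga->payload.gc.slot;
  msg[2] = (StgWord) lga->weight;           /* weight donated with the fetch */
  msg[3] = (StgWord) lga->payload.gc.gtid;  /* where the RESUME must be sent */
  msg[4] = (StgWord) lga->payload.gc.slot;
  msg[5] = (StgWord) load;                  /* load of the fetching PE       */
}
#endif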
/*
 * SendResume packs the remote blocking queue's GA and data into a message
 * and sends it.

   Structure of a RESUME message:

      -------------------------------
      | weight | slot | n | data ...
      -------------------------------

   data is a packed graph represented as an rtsPackBuffer
   n is the size of the graph (as returned by PackNearbyGraph) + packet hdr size
 */
//@cindex sendResume
void
sendResume(globalAddr *rga, int nelem, rtsPackBuffer *packBuffer)
{
  IF_PAR_DEBUG(fetch,
               belch("~^[] Sending Resume (packet <<%d>> with %d elems) for ((%x, %d, %x)) to [%x]",
                     packBuffer->id, nelem,
                     rga->payload.gc.gtid, rga->payload.gc.slot, rga->weight,
                     rga->payload.gc.gtid));
  IF_PAR_DEBUG(packet,
               PrintPacket(packBuffer));

  ASSERT(nelem==packBuffer->size);
  /* check for magic end-of-buffer word */
  IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));

  sendOpNV(PP_RESUME, rga->payload.gc.gtid,
           nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer,
           2, (rtsWeight) rga->weight, (StgWord) rga->payload.gc.slot);
}
/*
 * unpackResume unpacks a RESUME message into a Global address and
 * a data array.
 */
//@cindex unpackResume
static void
unpackResume(globalAddr *lga, int *nelem, rtsPackBuffer *packBuffer)
{
  long buf[3];

  GetArgs(buf, 3);

  /*
    RESUME event is written in awaken_blocked_queue
    DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(lga->payload.gc.gtid),
                     GR_RESUME, END_TSO_QUEUE, (StgClosure *)NULL, 0, 0);
  */

  lga->weight = (unsigned) buf[0];
  lga->payload.gc.gtid = mytid;
  lga->payload.gc.slot = (int) buf[1];

  *nelem = (int) buf[2] - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM;
  GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM);

  IF_PAR_DEBUG(fetch,
               belch("~^[] Unpacking Resume (packet <<%d>> with %d elems) for ((%x, %d, %x))",
                     packBuffer->id, *nelem, mytid, (int) buf[1], (unsigned) buf[0]));

  /* check for magic end-of-buffer word */
  IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
}
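
/* A hedged sketch (not part of GUM itself) of the size bookkeeping shared
   by sendResume/unpackResume: the word count on the wire always includes
   the pack-buffer header and the debug headroom, and the receiver subtracts
   them back out. The names 'wire_words' and 'graph_elems' are invented for
   this illustration.
*/
#if 0
static void
exampleResumeSizes(int nelem)
{
  int wire_words  = nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM;      /* as sent     */
  int graph_elems = wire_words - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM; /* as received */
  ASSERT(graph_elems == nelem);  /* both ends agree by construction */
}
#endif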
/*
 * SendAck packs the global address being acknowledged, together with
 * an array of global addresses for any closures shipped and sends them.

   Structure of an ACK message:

      +---------------------------------------------+-------
      | weight | gtid | slot | weight | gtid | slot | ..... ngas times
      +---------------------------------------------+-------
 */
//@cindex sendAck
void
sendAck(GlobalTaskId task, int ngas, globalAddr *gagamap)
{
  long *buffer, *p;
  int i;

  if (ngas == 0)
    return; // don't send unnecessary messages!!

  buffer = (long *) gumPackBuffer;

  for(i = 0, p = buffer; i < ngas; i++, p += 6) {
    ASSERT(gagamap[1].weight > 0);
    p[0] = (long) gagamap->weight;
    p[1] = (long) gagamap->payload.gc.gtid;
    p[2] = (long) gagamap->payload.gc.slot;
    gagamap++;
    p[3] = (long) gagamap->weight;
    p[4] = (long) gagamap->payload.gc.gtid;
    p[5] = (long) gagamap->payload.gc.slot;
    gagamap++;
  }

  IF_PAR_DEBUG(schedule,
               belch("~^,, Sending Ack (%d pairs) to [%x]\n",
                     ngas, task));

  sendOpN(PP_ACK, task, p - buffer, (StgPtr)buffer);
}
/*
 * unpackAck unpacks an Acknowledgement message into a Global address,
 * a count of the number of global addresses following and a map of
 * Global addresses.
 */
//@cindex unpackAck
static void
unpackAck(int *ngas, globalAddr *gagamap)
{
  long GAarraysize;
  long buf[6];

  GetArgs(&GAarraysize, 1);

  *ngas = GAarraysize / 6;

  IF_PAR_DEBUG(schedule,
               belch("~^,, Unpacking Ack (%d pairs) on [%x]\n",
                     *ngas, mytid));

  while (GAarraysize > 0) {
    GetArgs(buf, 6);
    gagamap->weight = (rtsWeight) buf[0];
    gagamap->payload.gc.gtid = (GlobalTaskId) buf[1];
    gagamap->payload.gc.slot = (int) buf[2];
    gagamap++;
    gagamap->weight = (rtsWeight) buf[3];
    gagamap->payload.gc.gtid = (GlobalTaskId) buf[4];
    gagamap->payload.gc.slot = (int) buf[5];
    ASSERT(gagamap->weight > 0);
    gagamap++;
    GAarraysize -= 6;
  }
}
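
/* A hedged illustration (not part of GUM itself): one (old GA, new GA)
   pair of an ACK as it sits in the six-long-word groups built by sendAck
   and decoded by unpackAck above. The struct name 'ExampleGaPair' is
   invented for exposition; the real code works directly on the flat buffer.
*/
#if 0
typedef struct {
  long old_weight, old_gtid, old_slot;  /* GA under which the node was shipped   */
  long new_weight, new_gtid, new_slot;  /* GA it now lives under on the receiver */
} ExampleGaPair;  /* an ACK carries 'ngas' of these, back to back */
#endif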
/*
 * SendFish packs the global task id of the originating PE, together with
 * the age, history and hunger of the fish, into a message and sends it.

   Structure of a FISH message:

      +----------------------------------+
      | orig PE | age | history | hunger |
      +----------------------------------+
 */
//@cindex sendFish
void
sendFish(GlobalTaskId destPE, GlobalTaskId origPE,
         int age, int history, int hunger)
{
  IF_PAR_DEBUG(fish,
               belch("~^$$ Sending Fish to [%x] (%d outstanding fishes)",
                     destPE, outstandingFishes));

  sendOpV(PP_FISH, destPE, 4,
          (StgWord) origPE, (StgWord) age, (StgWord) history, (StgWord) hunger);

  if (origPE == mytid) {
    outstandingFishes++;
  }
}
/*
 * unpackFish unpacks a FISH message into the global task id of the
 * originating PE and 3 data fields: the age, history and hunger of the
 * fish. The history and hunger are not currently used.
 */
//@cindex unpackFish
static void
unpackFish(GlobalTaskId *origPE, int *age, int *history, int *hunger)
{
  long buf[4];

  GetArgs(buf, 4);

  IF_PAR_DEBUG(fish,
               belch("~^$$ Unpacking Fish from [%x] (age=%d)",
                     (GlobalTaskId) buf[0], (int) buf[1]));

  *origPE = (GlobalTaskId) buf[0];
  *age = (int) buf[1];
  *history = (int) buf[2];
  *hunger = (int) buf[3];
}
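
/* A hedged sketch (not part of GUM itself) of the fish life cycle that
   processFish below implements: a fish that comes home just resets the
   timestamp, a young fish is forwarded to another PE, and an old one is
   sent home to die. 'exampleFishStep' is an invented name; the real
   decisions live in processFish.
*/
#if 0
static void
exampleFishStep(GlobalTaskId origPE, int age)
{
  if (origPE == mytid) {
    last_fish_arrived_at = CURRENT_TIME;  /* fish came home; note the time */
  } else if (age < FISH_LIFE_EXPECTANCY) {
    sendFish(choosePE(), origPE,          /* still young: keep looking     */
             (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
  } else {
    sendFish(origPE, origPE,              /* too old: send it home to die  */
             (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
  }
}
#endif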
/*
 * SendFree sends (weight, slot) pairs for GAs that we no longer need
 * references to.

   Structure of a FREE message:

      +-----------------------------
      | n | weight_1 | slot_1 | ...
      +-----------------------------
 */
//@cindex sendFree
void
sendFree(GlobalTaskId pe, int nelem, StgPtr data)
{
  IF_PAR_DEBUG(free,
               belch("~^!! Sending Free (%d GAs) to [%x]",
                     nelem/2, pe));

  sendOpN(PP_FREE, pe, nelem, data);
}

/*
 * unpackFree unpacks a FREE message into the amount of data shipped and
 * a data block.
 */
//@cindex unpackFree
static void
unpackFree(int *nelem, StgWord *data)
{
  long buf[1];

  GetArgs(buf, 1);
  *nelem = (int) buf[0];

  IF_PAR_DEBUG(free,
               belch("~^!! Unpacking Free (%d GAs)",
                     *nelem/2));

  GetArgs(data, *nelem);
}
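
/* A hedged illustration (not part of GUM itself): a FREE payload is just a
   run of (weight, slot) pairs destined for one PE; compare freeRemoteGA and
   sendFreeMessages near the end of this file, which batch such pairs per PE
   before flushing them. 'exampleQueueFree' is an invented name.
*/
#if 0
static void
exampleQueueFree(StgWord *buffer, nat *index, globalAddr *ga)
{
  buffer[(*index)++] = (StgWord) ga->weight;           /* weight being returned */
  buffer[(*index)++] = (StgWord) ga->payload.gc.slot;  /* slot it belongs to    */
  /* a later sendFree(pe, *index, buffer) ships all queued pairs at once */
}
#endif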
/*
 * SendSchedule sends a closure to be evaluated in response to a Fish
 * message. The message is directed to the PE that originated the Fish
 * (origPE), and includes the packed closure (data) along with its size
 * (nelem).

   Structure of a SCHEDULE message:

      +------------------------------------
      | PE | n | pack buffer of a graph ...
      +------------------------------------
 */
//@cindex sendSchedule
void
sendSchedule(GlobalTaskId origPE, int nelem, rtsPackBuffer *packBuffer)
{
  IF_PAR_DEBUG(schedule,
               belch("~^-- Sending Schedule (packet <<%d>> with %d elems) to [%x]\n",
                     packBuffer->id, nelem, origPE));
  IF_PAR_DEBUG(packet,
               PrintPacket(packBuffer));

  ASSERT(nelem==packBuffer->size);
  /* check for magic end-of-buffer word */
  IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));

  sendOpN(PP_SCHEDULE, origPE,
          nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer);
}
/*
 * unpackSchedule unpacks a SCHEDULE message into the Global address of
 * the closure shipped, the amount of data shipped (nelem) and the data
 * block (data).
 */
//@cindex unpackSchedule
static void
unpackSchedule(int *nelem, rtsPackBuffer *packBuffer)
{
  long buf[1];

  /* first, just unpack 1 word containing the total size (including header) */
  GetArgs(buf, 1);
  /* no. of elems, not counting the header of the pack buffer */
  *nelem = (int) buf[0] - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM;

  /* automatic cast of flat pvm-data to rtsPackBuffer */
  GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM);

  IF_PAR_DEBUG(schedule,
               belch("~^-- Unpacking Schedule (packet <<%d>> with %d elems) on [%x]\n",
                     packBuffer->id, *nelem, mytid));

  ASSERT(*nelem==packBuffer->size);
  /* check for magic end-of-buffer word */
  IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
}
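
/* A hedged usage sketch (not part of GUM itself): replying to a FISH by
   packing a spark and shipping it with sendSchedule, the same sequence
   that processFish performs below. GC and retry handling are elided;
   'exampleReplyToFish' is an invented name.
*/
#if 0
static void
exampleReplyToFish(GlobalTaskId origPE)
{
  nat size;
  rtsSpark spark;
  rtsPackBuffer *buffer;

  if ((spark = findSpark(rtsTrue/*for_export*/)) != NULL) {
    buffer = PackNearbyGraph(spark, END_TSO_QUEUE, &size, origPE);
    if (buffer != NULL)
      sendSchedule(origPE, size, buffer);  /* the graph becomes a spark there */
  }
}
#endif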
/* sendReval is almost identical to the Schedule version, so we can unpack
   with unpackSchedule */
void
sendReval(GlobalTaskId origPE, int nelem, rtsPackBuffer *packBuffer)
{
  IF_PAR_DEBUG(schedule,
               belch("~^-- Sending Reval (packet <<%d>> with %d elems) to [%x]\n",
                     packBuffer->id, nelem, origPE));
  IF_PAR_DEBUG(packet,
               PrintPacket(packBuffer));

  ASSERT(nelem==packBuffer->size);
  /* check for magic end-of-buffer word */
  IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));

  sendOpN(PP_REVAL, origPE,
          nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer);
}

void FinishReval(StgTSO *t)
{
  StgClosure *res;
  globalAddr ga;
  nat size;
  rtsPackBuffer *buffer=NULL;

  ga.payload.gc.slot = t->revalSlot;
  ga.payload.gc.gtid = t->revalTid;

  // find where the reval result is
  res = GALAlookup(&ga);

  IF_PAR_DEBUG(schedule,
               printGA(&ga);
               belch(" needs the result %08x\n", res));

  // send off the result
  buffer = PackNearbyGraph(res, END_TSO_QUEUE, &size, ga.payload.gc.gtid);
  ASSERT(buffer != (rtsPackBuffer *)NULL);
  sendResume(&ga, size, buffer);

  IF_PAR_DEBUG(schedule,
               belch("@;~) Reval Finished"));
}
//@node Message-Processing Functions, GUM Message Processor, GUM Message Sending and Unpacking Functions, High Level Communications Routines
//@subsection Message-Processing Functions

/*
 * Message-Processing Functions
 *
 * The following routines process incoming GUM messages, often reissuing
 * messages in response.
 *
 * processFish unpacks a fish message, reissuing it if it's our own,
 * sending work if we have it or sending it onwards otherwise.
 */

/*
 * processFetches constructs and sends resume messages for every
 * BlockedFetch which is ready to be awakened.
 * awaken_blocked_queue (in Schedule.c) is responsible for moving
 * BlockedFetches from a blocking queue to the PendingFetches queue.
 */
extern StgBlockedFetch *PendingFetches;

nat
pending_fetches_len(void)
{
  StgBlockedFetch *bf;
  nat n;

  for (n=0, bf=PendingFetches; bf != END_BF_QUEUE; n++, bf = (StgBlockedFetch *)(bf->link)) {
    ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
  }
  return n;
}

//@cindex processFetches
void
processFetches(void) {
  StgBlockedFetch *bf, *next;
  StgClosure *closure;
  StgInfoTable *ip;
  globalAddr rga;
  static rtsPackBuffer *packBuffer;

  IF_PAR_DEBUG(verbose,
               belch("____ processFetches: %d pending fetches (root @ %p)",
                     pending_fetches_len(), PendingFetches));

  for (bf = PendingFetches;
       bf != END_BF_QUEUE;
       bf = next) {
    /* the PendingFetches list contains only BLOCKED_FETCH closures */
    ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
    /* store link (we might overwrite it via blockFetch later on) */
    next = (StgBlockedFetch *)(bf->link);
    /*
     * Find the target at the end of the indirection chain, and
     * process it in much the same fashion as the original target
     * of the fetch. Though we hope to find graph here, we could
     * find a black hole (of any flavor) or even a FetchMe.
     */
    closure = bf->node;

    /*
      We evacuate BQs and update the node fields where necessary in GC.c.
      So, if we find an EVACUATED closure, something has gone Very Wrong
      (and therefore we let the RTS crash most ungracefully).
    */
    ASSERT(get_itbl(closure)->type != EVACUATED);
    // closure = ((StgEvacuated *)closure)->evacuee;

    closure = UNWIND_IND(closure);
    //while ((ind = IS_INDIRECTION(closure)) != NULL) { closure = ind; }

    ip = get_itbl(closure);
    if (ip->type == FETCH_ME) {
      /* Forward the Fetch to someone else */
      rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
      rga.payload.gc.slot = bf->ga.payload.gc.slot;
      rga.weight = bf->ga.weight;

      sendFetch(((StgFetchMe *)closure)->ga, &rga, 0 /* load */);

      // Global statistics: count no. of fetches
      if (RtsFlags.ParFlags.ParStats.Global &&
          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
        globalParStats.tot_fetch_mess++;
      }

      IF_PAR_DEBUG(fetch,
                   belch("__-> processFetches: Forwarding fetch from %lx to %lx",
                         mytid, rga.payload.gc.gtid));
    } else if (IS_BLACK_HOLE(closure)) {
      IF_PAR_DEBUG(verbose,
                   belch("__++ processFetches: trying to send a BLACK_HOLE => doing a blockFetch on closure %p (%s)",
                         closure, info_type(closure)));
      blockFetch(bf, closure);
    } else {
      /* We now have some local graph to send back */
      nat size;

      packBuffer = gumPackBuffer;
      IF_PAR_DEBUG(verbose,
                   belch("__*> processFetches: PackNearbyGraph of closure %p (%s)",
                         closure, info_type(closure)));

      if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.payload.gc.gtid)) == NULL) {
        // Put current BF back on list
        bf->link = (StgBlockingQueueElement *)PendingFetches;
        PendingFetches = (StgBlockedFetch *)bf;
        // ToDo: check that nothing more has to be done to prepare for GC!
        barf("processFetches: out of heap while packing graph; ToDo: call GC here");
        GarbageCollect(GetRoots, rtsFalse);
        PendingFetches = (StgBlockedFetch *)(bf->link);
        packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.payload.gc.gtid);
        ASSERT(packBuffer != (rtsPackBuffer *)NULL);
      }

      rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
      rga.payload.gc.slot = bf->ga.payload.gc.slot;
      rga.weight = bf->ga.weight;

      sendResume(&rga, size, packBuffer);

      // Global statistics: count no. of resume messages
      if (RtsFlags.ParFlags.ParStats.Global &&
          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
        globalParStats.tot_resume_mess++;
      }
    }
  }
  PendingFetches = END_BF_QUEUE;
}
/*
  As an alternative to sending fetch messages directly from the FETCH_ME
  entry code, we could just store the data about the remote closure in a
  global variable and send the fetch request from the main scheduling loop
  (similar to processFetches above). This would save an expensive STGCALL
  in the entry code, because we have to go back to the scheduler anyway.
*/
//@cindex processTheRealFetches
void
processTheRealFetches(void) {
  StgClosure *closure, *next;

  IF_PAR_DEBUG(verbose,
               belch("__ processTheRealFetches: ");
               printGA(&theGlobalFromGA);
               printGA(&theGlobalToGA));

  ASSERT(theGlobalFromGA.payload.gc.gtid != 0 &&
         theGlobalToGA.payload.gc.gtid != 0);

  /* the old version did this in the FETCH_ME entry code */
  sendFetch(&theGlobalFromGA, &theGlobalToGA, 0/*load*/);
}
/*
  Way of dealing with unwanted fish; used during startup/shutdown, or for
  fish from unknown PEs.
*/
  GlobalTaskId origPE;
  int age, history, hunger;

  /* IF_PAR_DEBUG(verbose, */
  belch(".... [%x] Bouncing unwanted FISH", mytid);

  unpackFish(&origPE, &age, &history, &hunger);

  if (origPE == mytid) {
    //fishing = rtsFalse;                 // fish has come home
    last_fish_arrived_at = CURRENT_TIME;  // remember time (see schedule fct)
    return; // that's all
  }

  /* otherwise, send it home to die */
  sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
  // Global statistics: count no. of fish messages
  if (RtsFlags.ParFlags.ParStats.Global &&
      RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
    globalParStats.tot_fish_mess++;
  }
/*
 * processFish unpacks a fish message, reissuing it if it's our own,
 * sending work if we have it or sending it onwards otherwise.
 */
//@cindex processFish
static void
processFish(void)
{
  GlobalTaskId origPE;
  int age, history, hunger;
  rtsSpark spark;
  static rtsPackBuffer *packBuffer;

  unpackFish(&origPE, &age, &history, &hunger);

  if (origPE == mytid) {
    //fishing = rtsFalse;                 // fish has come home
    last_fish_arrived_at = CURRENT_TIME;  // remember time (see schedule fct)
    return; // that's all
  }

  ASSERT(origPE != mytid);
  IF_PAR_DEBUG(fish,
               belch("$$__ processing fish; %d sparks available",
                     spark_queue_len(&(MainRegTable.rSparks))));
  while ((spark = findSpark(rtsTrue/*for_export*/)) != NULL) {
    nat size;
    // StgClosure *graph;

    packBuffer = gumPackBuffer;
    ASSERT(closure_SHOULD_SPARK((StgClosure *)spark));
    if ((packBuffer = PackNearbyGraph(spark, END_TSO_QUEUE, &size, origPE)) == NULL) {
      IF_PAR_DEBUG(fish,
                   belch("$$ GC while trying to satisfy FISH via PackNearbyGraph of node %p",
                         (StgClosure *)spark));
      barf("processFish: out of heap while packing graph; ToDo: call GC here");
      GarbageCollect(GetRoots, rtsFalse);
      /* Now go back and try again */
    } else {
      IF_PAR_DEBUG(verbose,
                   if (RtsFlags.ParFlags.ParStats.Sparks)
                     belch("==== STEALING spark %x; sending to %x", spark, origPE));

      IF_PAR_DEBUG(fish,
                   belch("$$-- Replying to FISH from %x by sending graph @ %p (%s)",
                         origPE,
                         (StgClosure *)spark, info_type((StgClosure *)spark)));
      sendSchedule(origPE, size, packBuffer);

      // Global statistics: count no. of schedule messages
      if (RtsFlags.ParFlags.ParStats.Global &&
          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
        globalParStats.tot_schedule_mess++;
      }
      break;
    }
  }
  if (spark == (rtsSpark)NULL) {
    IF_PAR_DEBUG(fish,
                 belch("$$^^ No sparks available for FISH from %x",
                       origPE));
    /* We have no sparks to give */
    if (age < FISH_LIFE_EXPECTANCY) {
      /* and the fish is still young, send it to another PE to look for work */
      sendFish(choosePE(), origPE,
               (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);

      // Global statistics: count no. of fish messages
      if (RtsFlags.ParFlags.ParStats.Global &&
          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
        globalParStats.tot_fish_mess++;
      }
    } else { /* otherwise, send it home to die */
      sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
      // Global statistics: count no. of fish messages
      if (RtsFlags.ParFlags.ParStats.Global &&
          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
        globalParStats.tot_fish_mess++;
      }
    }
  }
}  /* processFish */
/*
 * processFetch either returns the requested data (if available)
 * or blocks the remote blocking queue on a black hole (if not).
 */
//@cindex processFetch
static void
processFetch(void)
{
  globalAddr ga, rga;
  int load;
  StgClosure *closure;
  StgInfoTable *ip;

  unpackFetch(&ga, &rga, &load);
  IF_PAR_DEBUG(fetch,
               belch("%%%%__ Rcvd Fetch for ((%x, %d, 0)), Resume ((%x, %d, %x)) (load %d) from %x",
                     ga.payload.gc.gtid, ga.payload.gc.slot,
                     rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight, load,
                     rga.payload.gc.gtid));
  closure = GALAlookup(&ga);
  ASSERT(closure != (StgClosure *)NULL);
  ip = get_itbl(closure);
  if (ip->type == FETCH_ME) {
    /* Forward the Fetch to someone else */
    sendFetch(((StgFetchMe *)closure)->ga, &rga, load);

    // Global statistics: count no. of fetches
    if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
      globalParStats.tot_fetch_mess++;
    }
  } else if (rga.payload.gc.gtid == mytid) {
    /* Our own FETCH forwarded back around to us */
    StgFetchMeBlockingQueue *fmbq = (StgFetchMeBlockingQueue *)GALAlookup(&rga);

    IF_PAR_DEBUG(fetch,
                 belch("%%%%== Fetch returned to sending PE; closure=%p (%s); receiver=%p (%s)",
                       closure, info_type(closure), fmbq, info_type((StgClosure*)fmbq)));
    /* We may have already discovered that the fetch target is our own. */
    if ((StgClosure *)fmbq != closure)
      CommonUp((StgClosure *)fmbq, closure);
    (void) addWeight(&rga);
  } else if (IS_BLACK_HOLE(closure)) {
    /* This includes RBH's and FMBQ's */
    StgBlockedFetch *bf;

    /* Can we assert something on the remote GA? */
    ASSERT(GALAlookup(&rga) == NULL);

    /* If we're hitting a BH or RBH or FMBQ we have to put a BLOCKED_FETCH
       closure into the BQ in order to denote that when updating this node
       the result should be sent to the originator of this fetch message. */
    bf = (StgBlockedFetch *)createBlockedFetch(ga, rga);
    IF_PAR_DEBUG(fetch,
                 belch("%%++ Blocking Fetch ((%x, %d, %x)) on %p (%s)",
                       rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight,
                       closure, info_type(closure)));
    blockFetch(bf, closure);
  } else {
    /* The target of the FetchMe is some local graph */
    nat size;
    // StgClosure *graph;
    rtsPackBuffer *buffer = (rtsPackBuffer *)NULL;

    if ((buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.payload.gc.gtid)) == NULL) {
      barf("processFetch: out of heap while packing graph; ToDo: call GC here");
      GarbageCollect(GetRoots, rtsFalse);
      closure = GALAlookup(&ga);
      buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.payload.gc.gtid);
      ASSERT(buffer != (rtsPackBuffer *)NULL);
    }
    sendResume(&rga, size, buffer);

    // Global statistics: count no. of resume messages
    if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
      globalParStats.tot_resume_mess++;
    }
  }
}  /* processFetch */
/*
  The list of pending fetches must be a root-list for GC.
  This routine is called from GC.c (same as marking GAs etc).
*/
void
markPendingFetches(rtsBool major_gc) {

  /* No need to traverse the list; this is done via the scavenge code
     for a BLOCKED_FETCH closure, which evacuates the link field */

  if (PendingFetches != END_BF_QUEUE ) {
    IF_PAR_DEBUG(verbose,
                 fprintf(stderr, "@@@@ PendingFetches is root; evaced from %p to",
                         PendingFetches));

    PendingFetches = MarkRoot((StgClosure*)PendingFetches);

    IF_PAR_DEBUG(verbose,
                 fprintf(stderr, " %p\n", PendingFetches));

  } else {
    IF_PAR_DEBUG(verbose,
                 fprintf(stderr, "@@@@ PendingFetches is empty; no need to mark it\n"));
  }
}
/*
 * processFree unpacks a FREE message and adds the weights to our GAs.
 */
//@cindex processFree
static void
processFree(void)
{
  int nelem;
  int i;
  static StgWord *buffer;
  globalAddr ga;

  buffer = (StgWord *)gumPackBuffer;
  unpackFree(&nelem, buffer);
  IF_PAR_DEBUG(free,
               belch("!!__ Rcvd Free (%d GAs)", nelem / 2));

  ga.payload.gc.gtid = mytid;
  for (i = 0; i < nelem;) {
    ga.weight = (rtsWeight) buffer[i++];
    ga.payload.gc.slot = (int) buffer[i++];
    IF_PAR_DEBUG(free,
                 fprintf(stderr, "!!-- Processing free ");
                 printGA(&ga);
                 fputc('\n', stderr));
    (void) addWeight(&ga);
  }
}
/*
 * processResume unpacks a RESUME message into the graph, filling in
 * the LA -> GA, and GA -> LA tables. Threads blocked on the original
 * FetchMe (now a blocking queue) are awakened, and the blocking queue
 * is converted into an indirection. Finally it sends an ACK in response
 * which contains any newly allocated GAs.
 */
//@cindex processResume
static void
processResume(GlobalTaskId sender)
{
  int nelem;
  nat nGAs;
  static rtsPackBuffer *packBuffer;
  StgClosure *newGraph, *old;
  globalAddr lga;
  globalAddr *gagamap;

  packBuffer = (rtsPackBuffer *)gumPackBuffer;
  unpackResume(&lga, &nelem, packBuffer);

  IF_PAR_DEBUG(fetch,
               fprintf(stderr, "[]__ Rcvd Resume for ");
               printGA(&lga);
               fputc('\n', stderr));
  IF_PAR_DEBUG(packet,
               PrintPacket((rtsPackBuffer *)packBuffer));
  /*
   * We always unpack the incoming graph, even if we've received the
   * requested node in some other data packet (and already awakened
   * the blocking queue).
   */
  if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
    ReallyPerformThreadGC(packBuffer[0], rtsFalse);
    SAVE_Hp -= packBuffer[0];
  }

  // ToDo: Check for GC here !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

  /* Do this *after* GC; we don't want to release the object early! */
  (void) addWeight(&lga);

  old = GALAlookup(&lga);

  /* ToDo: The closure that requested this graph must be one of these two? */
  ASSERT(get_itbl(old)->type == FETCH_ME_BQ ||
         get_itbl(old)->type == RBH);

  if (RtsFlags.ParFlags.ParStats.Full) {
    StgBlockingQueueElement *bqe, *last_bqe;

    IF_PAR_DEBUG(fetch,
                 belch("[]-- Resume is REPLY to closure %lx", old));

    /* Write REPLY events to the log file, indicating that the remote
       data has arrived.
       NB: we emit a REPLY only for the *last* elem in the queue; this is
       the one that triggered the fetch message; all other entries
       have just added themselves to the queue, waiting for the data
       they know has been requested (see entry code for FETCH_ME_BQ)
    */
    if ((get_itbl(old)->type == FETCH_ME_BQ ||
         get_itbl(old)->type == RBH)) {
      for (bqe = ((StgFetchMeBlockingQueue *)old)->blocking_queue,
           last_bqe = END_BQ_QUEUE;
           get_itbl(bqe)->type==TSO ||
           get_itbl(bqe)->type==BLOCKED_FETCH;
           last_bqe = bqe, bqe = bqe->link) { /* nothing */ }

      ASSERT(last_bqe==END_BQ_QUEUE ||
             get_itbl((StgClosure *)last_bqe)->type == TSO);

      /* last_bqe now points to the TSO that triggered the FETCH */
      if (get_itbl((StgClosure *)last_bqe)->type == TSO)
        DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(sender),
                         GR_REPLY, ((StgTSO *)last_bqe), ((StgTSO *)last_bqe)->block_info.closure,
                         0, spark_queue_len(&(MainRegTable.rSparks)));
    }
  }
  newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
  ASSERT(newGraph != NULL);

  /*
   * Sometimes, unpacking will common up the resumee with the
   * incoming graph, but if it hasn't, we'd better do so now.
   */
  if (get_itbl(old)->type == FETCH_ME_BQ)
    CommonUp(old, newGraph);

  IF_PAR_DEBUG(fetch,
               belch("[]-- Ready to resume unpacked graph at %p (%s)",
                     newGraph, info_type(newGraph)));

  IF_PAR_DEBUG(tables,
               DebugPrintGAGAMap(gagamap, nGAs));

  sendAck(sender, nGAs, gagamap);
}
/*
 * processSchedule unpacks a SCHEDULE message into the graph, filling
 * in the LA -> GA, and GA -> LA tables. The root of the graph is added to
 * the local spark queue. Finally it sends an ACK in response
 * which contains any newly allocated GAs.
 */
//@cindex processSchedule
static void
processSchedule(GlobalTaskId sender)
{
  nat nelem, space_required, nGAs;
  rtsBool success;
  static rtsPackBuffer *packBuffer;
  StgClosure *newGraph;
  globalAddr *gagamap;

  packBuffer = gumPackBuffer;		/* HWL */
  unpackSchedule(&nelem, packBuffer);

  IF_PAR_DEBUG(schedule,
               belch("--__ Rcvd Schedule (%d elems)", nelem));
  IF_PAR_DEBUG(packet,
               PrintPacket(packBuffer));
  /*
   * For now, the graph is a closure to be sparked as an advisory
   * spark, but in future it may be a complete spark with
   * required/advisory status, priority etc.
   */

  space_required = packBuffer[0];
  if (SAVE_Hp + space_required >= SAVE_HpLim) {
    ReallyPerformThreadGC(space_required, rtsFalse);
    SAVE_Hp -= space_required;
  }

  // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!
  newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
  ASSERT(newGraph != NULL);
  success = add_to_spark_queue(newGraph, &(MainRegTable.rSparks));

  if (RtsFlags.ParFlags.ParStats.Full &&
      RtsFlags.ParFlags.ParStats.Sparks &&
      success)
    DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                     GR_STOLEN, ((StgTSO *)NULL), newGraph,
                     0, 0 /* spark_queue_len(ADVISORY_POOL) */);
  IF_PAR_DEBUG(schedule,
               if (success)
                 belch("--^^ added spark to unpacked graph %p (%s); %d sparks available on [%x]",
                       newGraph, info_type(newGraph), spark_queue_len(&(MainRegTable.rSparks)), mytid);
               else
                 belch("--^^ received non-sparkable closure %p (%s); nothing added to spark pool; %d sparks available on [%x]",
                       newGraph, info_type(newGraph), spark_queue_len(&(MainRegTable.rSparks)), mytid));
  IF_PAR_DEBUG(packet,
               belch("*< Unpacked graph with root at %p (%s):",
                     newGraph, info_type(newGraph));
               PrintGraph(newGraph, 0));

  IF_PAR_DEBUG(tables,
               DebugPrintGAGAMap(gagamap, nGAs));

  sendAck(sender, nGAs, gagamap);

  //fishing = rtsFalse;
  ASSERT(outstandingFishes > 0);
  outstandingFishes--;
}
/*
 * processAck unpacks an ACK, and uses the GAGA map to convert RBH's
 * (which represent shared thunks that have been shipped) into fetch-mes
 * to remote copies.
 */
//@cindex processAck
static void
processAck(void)
{
  int nGAs;
  globalAddr *gaga;
  globalAddr gagamap[256]; // ToDo: eliminate this magic constant!! (MAX_GAS * 2?)

  unpackAck(&nGAs, gagamap);

  IF_PAR_DEBUG(tables,
               belch(",,,, Rcvd Ack (%d pairs)", nGAs);
               DebugPrintGAGAMap(gagamap, nGAs));

  IF_DEBUG(sanity,
           checkGAGAMap(gagamap, nGAs));

  /*
   * For each (oldGA, newGA) pair, set the GA of the corresponding
   * thunk to the newGA, convert the thunk to a FetchMe, and return
   * the weight from the oldGA.
   */
  for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
    StgClosure *old_closure = GALAlookup(gaga);
    StgClosure *new_closure = GALAlookup(gaga + 1);

    ASSERT(old_closure != NULL);
    if (new_closure == NULL) {
      /* We don't have this closure, so we make a fetchme for it */
      globalAddr *ga = setRemoteGA(old_closure, gaga + 1, rtsTrue);

      /* convertToFetchMe should be done unconditionally here.
         Currently, we assign GAs to CONSTRs, too, (a bit of a hack),
         so we have to check whether it is an RBH before converting

         ASSERT(get_itbl(old_closure)->type==RBH);
      */
      if (get_itbl(old_closure)->type==RBH)
        convertToFetchMe((StgRBH *)old_closure, ga);
    } else {
      /*
       * Oops...we've got this one already; update the RBH to
       * point to the object we already know about, whatever it
       * happens to be.
       */
      CommonUp(old_closure, new_closure);

      /*
       * Increase the weight of the object by the amount just
       * received in the second part of the ACK pair.
       */
      (void) addWeight(gaga + 1);
    }
    (void) addWeight(gaga);
  }
  /* check the sanity of the LAGA and GALA tables after mincing them */
  IF_DEBUG(sanity, checkLAGAtable(rtsFalse));
}

  barf("Task %x: TODO: should send NACK in response to REVAL", mytid);
static void
processReval(GlobalTaskId sender) // similar to processSchedule ...
{
  nat nelem, space_required, nGAs;
  static rtsPackBuffer *packBuffer;
  StgClosure *newGraph;
  globalAddr *gagamap;
  StgTSO *tso;

  packBuffer = gumPackBuffer;		/* HWL */
  unpackSchedule(&nelem, packBuffer);	/* okay, since the structure is the same */

  IF_PAR_DEBUG(packet,
               belch("@;~) [%x] Rcvd Reval (%d elems)", mytid, nelem);
               PrintPacket(packBuffer));
  space_required = packBuffer[0];
  if (SAVE_Hp + space_required >= SAVE_HpLim) {
    ReallyPerformThreadGC(space_required, rtsFalse);
    SAVE_Hp -= space_required;
  }

  // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!
  newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
  ASSERT(newGraph != NULL);

  IF_PAR_DEBUG(packet,
               belch("@;~) Unpacked graph with root at %p (%s):",
                     newGraph, info_type(newGraph));
               PrintGraph(newGraph, 0));
  IF_PAR_DEBUG(tables,
               DebugPrintGAGAMap(gagamap, nGAs));
  // We don't send an Ack to the head!!!!
  sendAck(sender, nGAs-1, gagamap+2);
  IF_PAR_DEBUG(verbose,
               belch("@;~) About to create Reval thread on behalf of %x",
                     sender));

  tso = createGenThread(RtsFlags.GcFlags.initialStkSize, newGraph);
  tso->priority = RevalPriority;
  tso->revalSlot = gagamap->payload.gc.slot; // record who sent the reval
  tso->revalTid  = gagamap->payload.gc.gtid;
  scheduleThread(tso);
  context_switch = 1; // switch at the earliest opportunity
}
//@node GUM Message Processor, Miscellaneous Functions, Message-Processing Functions, High Level Communications Routines
//@subsection GUM Message Processor

/*
 * GUM Message Processor
 *
 * processMessages processes any messages that have arrived, calling
 * appropriate routines depending on the message tag
 * (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
 * present and performs a blocking receive! During profiling it
 * busy-waits in order to record idle time.
 */

//@cindex processMessages
rtsBool
processMessages(void)
{
  rtsPacket packet;
  OpCode opcode;
  GlobalTaskId task;
  rtsBool receivedFinish = rtsFalse;
  do {
    packet = GetPacket();  /* Get next message; block until one available */
    getOpcodeAndSender(packet, &opcode, &task);

    if (task==SysManTask) {
      switch (opcode) {
      case PP_FINISH:
        IF_PAR_DEBUG(verbose,
                     belch("==== received FINISH [%p]", mytid));
        /* this boolean value is returned and propagated to the main
           scheduling loop, thus shutting down this PE */
        receivedFinish = rtsTrue;
        break;

      default:
        barf("Task %x: received unknown opcode %x from SysMan", mytid, opcode);
      }
    } else if (taskIDtoPE(task)==0) {
      /* When a new PE joins, FISH & REVAL messages may potentially reach
         PEs before they are notified of the new PE's existence. The only
         solution is to bounce/fail these messages back to the sender. But
         we will worry about it once we start seeing these race conditions.
      */
      belch("Task %x: Ignoring PVM session opened by another SysMan %x", mytid, task);

      belch("Task %x: Ignoring opcode %x from unknown PE %x", mytid, opcode, task);
    } else {
      switch (opcode) {
      case PP_FETCH:
        processFetch();
        // Global statistics: count no. of fetches
        if (RtsFlags.ParFlags.ParStats.Global &&
            RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
          globalParStats.rec_fetch_mess++;
        }
        break;

      case PP_RESUME:
        processResume(task);
        // Global statistics: count no. of resume messages
        if (RtsFlags.ParFlags.ParStats.Global &&
            RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
          globalParStats.rec_resume_mess++;
        }
        break;

      case PP_ACK:
        processAck();
        break;

      case PP_FISH:
        processFish();
        // Global statistics: count no. of fish messages
        if (RtsFlags.ParFlags.ParStats.Global &&
            RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
          globalParStats.rec_fish_mess++;
        }
        break;

      case PP_FREE:
        processFree();
        break;

      case PP_SCHEDULE:
        processSchedule(task);
        // Global statistics: count no. of schedule messages
        if (RtsFlags.ParFlags.ParStats.Global &&
            RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
          globalParStats.rec_schedule_mess++;
        }
        break;

      case PP_REVAL:
        processReval(task);
        // Global statistics: count no. of reval messages
        if (RtsFlags.ParFlags.ParStats.Global &&
            RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
          globalParStats.rec_reval_mess++;
        }
        break;

      default:
        /* Anything we're not prepared to deal with. */
        barf("Task %x: Unexpected opcode %x from %x",
             mytid, opcode, task);
      }
    }
  } while (PacketsWaiting());	/* While there are messages: process them */
  return receivedFinish;
} /* processMessages */
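
/* A hedged usage sketch (not part of GUM itself): because processMessages
   performs a blocking GetPacket(), a caller in the scheduler would
   typically guard it with PacketsWaiting(), along these lines.
   'exampleDrainMessages' is an invented name.
*/
#if 0
static rtsBool
exampleDrainMessages(void)
{
  rtsBool finished = rtsFalse;

  if (PacketsWaiting())             /* only enter if a message has arrived */
    finished = processMessages();   /* drains every queued message         */
  return finished;                  /* rtsTrue once FINISH has been seen   */
}
#endif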
//@node Miscellaneous Functions, Index, GUM Message Processor, High Level Communications Routines
//@subsection Miscellaneous Functions

/*
 * blockFetch blocks a BlockedFetch node on some kind of black hole.
 */
//@cindex blockFetch
void
blockFetch(StgBlockedFetch *bf, StgClosure *bh) {

  switch (get_itbl(bh)->type) {
  case BLACKHOLE:
    bf->link = END_BQ_QUEUE;
    //((StgBlockingQueue *)bh)->header.info = &stg_BLACKHOLE_BQ_info;
    SET_INFO(bh, &stg_BLACKHOLE_BQ_info);  // turn closure into a blocking queue
    ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;

    // put bh on the mutables list
    recordMutable((StgMutClosure *)bh);
    break;
  case BLACKHOLE_BQ:
    /* enqueue bf on blocking queue of closure bh */
    bf->link = ((StgBlockingQueue *)bh)->blocking_queue;
    ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;

    // put bh on the mutables list; ToDo: check
    recordMutable((StgMutClosure *)bh);
    break;

  case FETCH_ME_BQ:
    /* enqueue bf on blocking queue of closure bh */
    bf->link = ((StgFetchMeBlockingQueue *)bh)->blocking_queue;
    ((StgFetchMeBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;

    // put bh on the mutables list; ToDo: check
    recordMutable((StgMutClosure *)bh);
    break;

  case RBH:
    /* enqueue bf on blocking queue of closure bh */
    bf->link = ((StgRBH *)bh)->blocking_queue;
    ((StgRBH *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;

    // put bh on the mutables list; ToDo: check
    recordMutable((StgMutClosure *)bh);
    break;

  default:
    barf("blockFetch: thought %p was a black hole (IP %#lx, %s)",
         (StgClosure *)bh, get_itbl((StgClosure *)bh),
         info_type((StgClosure *)bh));
  }

  IF_PAR_DEBUG(verbose,
               belch("##++ blockFetch: after blocking, the BQ of %p (%s) is:",
                     bh, info_type(bh));
               print_bq(bh));
}
/*
  @blockThread@ is called from the main scheduler whenever tso returns with
  a ThreadBlocked return code; tso has already been added to a blocking
  queue (that's done in the entry code of the closure, because it is a
  cheap operation we have to do in any case); the main purpose of this
  routine is to send a Fetch message in case we are blocking on a FETCHME(_BQ)
  closure, which is indicated by the tso.why_blocked field;
  we also write an entry into the log file if we are generating one.

  We should update exectime etc. in the entry code already; but we don't
  have something like ``system time'' in the log file anyway, so this
  should even out the inaccuracies.
*/

//@cindex blockThread
void
blockThread(StgTSO *tso)
{
  globalAddr *remote_ga=NULL;
  globalAddr *local_ga;
  globalAddr fmbq_ga;

  // ASSERT(we are on some blocking queue)
  ASSERT(tso->block_info.closure != (StgClosure *)NULL);
  /*
    We have to check why this thread has been blocked.
  */
  switch (tso->why_blocked) {
  case BlockedOnGA:
    /* the closure must be a FETCH_ME_BQ; tso came in here via
       FETCH_ME entry code */
    ASSERT(get_itbl(tso->block_info.closure)->type==FETCH_ME_BQ);

    /* HACK: the link field is used to hold the GA between FETCH_ME_entry
       and this point; if something (eg. GC) happens in between, the whole
       thing is broken. The problem is that the ga field of the FETCH_ME
       has been overwritten with the head of the blocking queue (which is tso).
    */
    ASSERT(looks_like_ga(&theGlobalFromGA));
    // ASSERT(tso->link!=END_TSO_QUEUE && tso->link!=NULL);
    remote_ga = &theGlobalFromGA; //tso->link;
    tso->link = (StgTSO*)END_BQ_QUEUE;

    /* it was tso which turned node from FETCH_ME into FETCH_ME_BQ =>
       we have to send a Fetch message here! */
    if (RtsFlags.ParFlags.ParStats.Full) {
      /* Note that CURRENT_TIME may perform an unsafe call */
      tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
      tso->par.fetchcount++;
      tso->par.blockedat = CURRENT_TIME;
      /* we are about to send off a FETCH message, so dump a FETCH event */
      DumpRawGranEvent(CURRENT_PROC,
                       taskIDtoPE(remote_ga->payload.gc.gtid),
                       GR_FETCH, tso, tso->block_info.closure, 0, 0);
    }

    /* Phil T. claims that this was a workaround for a hard-to-find
     * bug, hence I'm leaving it out for now --SDM
     */
    /* Assign a brand-new global address to the newly created FMBQ */
    local_ga = makeGlobal(tso->block_info.closure, rtsFalse);
    splitWeight(&fmbq_ga, local_ga);
    ASSERT(fmbq_ga.weight == 1U << (BITS_IN(unsigned) - 1));

    sendFetch(remote_ga, &fmbq_ga, 0/*load*/);

    // Global statistics: count no. of fetches
    if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
      globalParStats.tot_fetch_mess++;
    }

    IF_DEBUG(sanity,
             theGlobalFromGA.payload.gc.gtid = (GlobalTaskId)0);
    break;
  case BlockedOnGA_NoSend:
    /* the closure must be a FETCH_ME_BQ; tso came in here via
       FETCH_ME_BQ entry code */
    ASSERT(get_itbl(tso->block_info.closure)->type==FETCH_ME_BQ);

    /* Fetch message has been sent already */
    if (RtsFlags.ParFlags.ParStats.Full) {
      /* Note that CURRENT_TIME may perform an unsafe call */
      tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
      tso->par.blockcount++;
      tso->par.blockedat = CURRENT_TIME;
      /* dump a block event, because fetch has been sent already */
      DumpRawGranEvent(CURRENT_PROC, thisPE,
                       GR_BLOCK, tso, tso->block_info.closure, 0, 0);
    }
    break;
  case BlockedOnBlackHole:
    /* the closure must be a BLACKHOLE_BQ or an RBH; tso came in here via
       BLACKHOLE(_BQ) or CAF_BLACKHOLE or RBH entry code */
    ASSERT(get_itbl(tso->block_info.closure)->type==MVAR ||
           get_itbl(tso->block_info.closure)->type==BLACKHOLE_BQ ||
           get_itbl(tso->block_info.closure)->type==RBH);

    /* if collecting stats update the execution time etc */
    if (RtsFlags.ParFlags.ParStats.Full) {
      /* Note that CURRENT_TIME may perform an unsafe call */
      tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
      tso->par.blockcount++;
      tso->par.blockedat = CURRENT_TIME;
      DumpRawGranEvent(CURRENT_PROC, thisPE,
                       GR_BLOCK, tso, tso->block_info.closure, 0, 0);
    }
    break;
  case BlockedOnDelay:
    /* What sort of stats shall we collect for an explicit threadDelay? */
    IF_PAR_DEBUG(verbose,
                 belch("##++ blockThread: TSO %d blocked on ThreadDelay",
                       tso->id));
    break;

  /* Check that the following are impossible to happen here, indeed:
     case BlockedOnException:
     case BlockedOnWrite:
  */
  default:
    barf("blockThread: impossible why_blocked code %d for TSO %d",
         tso->why_blocked, tso->id);
  }
  IF_PAR_DEBUG(verbose,
               belch("##++ blockThread: TSO %d blocked on closure %p (%s); %s",
                     tso->id, tso->block_info.closure, info_type(tso->block_info.closure),
                     (tso->why_blocked==BlockedOnGA) ? "Sent FETCH for GA" : ""));

  IF_PAR_DEBUG(verbose,
               print_bq(tso->block_info.closure));
}
/*
 * ChoosePE selects a GlobalTaskId from the array of PEs 'at random'.
 * Important properties:
 *   - it varies during execution, even if the PE is idle
 *   - it's different for each PE
 *   - we never send a fish to ourselves
 */
extern long lrand48 (void);

//@cindex choosePE
GlobalTaskId
choosePE(void)
{
  long temp;

  temp = lrand48() % nPEs;
  if (allPEs[temp] == mytid) {	/* Never send a FISH to yourself */
    temp = (temp + 1) % nPEs;
  }
  return allPEs[temp];
}
/*
 * allocate a BLOCKED_FETCH closure and fill it with the relevant fields
 * of the ga argument; called from processFetch when the local closure is
 * a black hole of some kind.
 */
//@cindex createBlockedFetch
StgClosure *
createBlockedFetch (globalAddr ga, globalAddr rga)
{
  StgBlockedFetch *bf;
  StgClosure *closure;

  closure = GALAlookup(&ga);
  if ((bf = (StgBlockedFetch *)allocate(_HS + sizeofW(StgBlockedFetch))) == NULL) {
    barf("createBlockedFetch: out of heap while allocating heap for a BlockedFetch; ToDo: call GC here");
    GarbageCollect(GetRoots, rtsFalse);
    closure = GALAlookup(&ga);
    bf = (StgBlockedFetch *)allocate(_HS + sizeofW(StgBlockedFetch));
    // ToDo: check whether really guaranteed to succeed 2nd time around
  }
  ASSERT(bf != (StgBlockedFetch *)NULL);
  SET_INFO((StgClosure *)bf, &stg_BLOCKED_FETCH_info);
  // ToDo: check whether other header info is needed
  bf->node = closure;
  bf->ga.payload.gc.gtid = rga.payload.gc.gtid;
  bf->ga.payload.gc.slot = rga.payload.gc.slot;
  bf->ga.weight = rga.weight;
  // bf->link = NULL;  debugging

  IF_PAR_DEBUG(schedule,
               fprintf(stderr, "%%%%// created BF: bf=%p (%s) of closure , GA: ",
                       bf, info_type((StgClosure*)bf));
               printGA(&(bf->ga));
               fputc('\n',stderr));
  return (StgClosure *)bf;
}
/*
 * waitForTermination enters a loop ignoring spurious messages while
 * waiting for the termination sequence to be completed.
 */
//@cindex waitForTermination
void
waitForTermination(void)
{
  do {
    rtsPacket p = GetPacket();
    processUnexpectedMessage(p);
  } while (rtsTrue);
}
//@cindex DebugPrintGAGAMap
void
DebugPrintGAGAMap(globalAddr *gagamap, int nGAs)
{
  int i;

  for (i = 0; i < nGAs; ++i, gagamap += 2)
    fprintf(stderr, "__ gagamap[%d] = ((%x, %d, %x)) -> ((%x, %d, %x))\n", i,
            gagamap[0].payload.gc.gtid, gagamap[0].payload.gc.slot, gagamap[0].weight,
            gagamap[1].payload.gc.gtid, gagamap[1].payload.gc.slot, gagamap[1].weight);
}

//@cindex checkGAGAMap
void
checkGAGAMap(globalAddr *gagamap, int nGAs)
{
  nat i;

  for (i = 0; i < (nat)nGAs; ++i, gagamap += 2) {
    ASSERT(looks_like_ga(gagamap));
    ASSERT(looks_like_ga(gagamap+1));
  }
}
//@cindex freeMsgBuffer
static StgWord **freeMsgBuffer = NULL;
//@cindex freeMsgIndex
static nat *freeMsgIndex = NULL;

//@cindex prepareFreeMsgBuffers
void
prepareFreeMsgBuffers(void)
{
  nat i;

  /* Allocate the freeMsg buffers just once and then hang onto them. */
  if (freeMsgIndex == NULL) {
    freeMsgIndex = (nat *) stgMallocBytes(nPEs * sizeof(nat),
                                          "prepareFreeMsgBuffers (Index)");
    freeMsgBuffer = (StgWord **) stgMallocBytes(nPEs * sizeof(long *),
                                                "prepareFreeMsgBuffers (Buffer)");

    for(i = 0; i < nPEs; i++)
      if (i != (thisPE-1))
        freeMsgBuffer[i] = (StgPtr) stgMallocWords(RtsFlags.ParFlags.packBufferSize,
                                                   "prepareFreeMsgBuffers (Buffer #i)");
      else
        freeMsgBuffer[i] = 0;
  }

  /* Initialize the freeMsg buffer pointers to point to the start of their
     buffers */
  for (i = 0; i < nPEs; i++)
    freeMsgIndex[i] = 0;
}
//@cindex freeRemoteGA
void
freeRemoteGA(int pe, globalAddr *ga)
{
  nat i;

  ASSERT(GALAlookup(ga) == NULL);

  if ((i = freeMsgIndex[pe]) + 2 >= RtsFlags.ParFlags.packBufferSize) {
    IF_PAR_DEBUG(free,
                 belch("!! Filled a free message buffer (sending remaining messages individually)"));

    sendFree(ga->payload.gc.gtid, i, freeMsgBuffer[pe]);
    i = 0;
  }
  freeMsgBuffer[pe][i++] = (StgWord) ga->weight;
  freeMsgBuffer[pe][i++] = (StgWord) ga->payload.gc.slot;
  freeMsgIndex[pe] = i;

  IF_DEBUG(sanity,
           ga->weight = 0xdead0add;
           ga->payload.gc.gtid = 0xbbbbbbbb;
           ga->payload.gc.slot = 0xbbbbbbbb;);
}
//@cindex sendFreeMessages
void
sendFreeMessages(void)
{
  nat i;

  for (i = 0; i < nPEs; i++)
    if (freeMsgIndex[i] > 0)
      sendFree(allPEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
}
/* synchronises with the other PEs. Receives and records in a global
 * variable the task-id of SysMan. If this is the main thread (discovered
 * in main.lc), identifies itself to SysMan. Finally it receives
 * from SysMan an array of the Global Task Ids of each PE, which is
 * returned as the value of the function.
 */

#if defined(PAR_TICKY)
/* Has to see freeMsgIndex, so must be defined here not in ParTicky.c */
//@cindex stats_CntFreeGA
void
stats_CntFreeGA (void) {  // stats only

  // Global statistics: residency of thread and spark pool
  if (RtsFlags.ParFlags.ParStats.Global &&
      RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
    nat i, s;

    globalParStats.cnt_free_GA++;
    for (i = 0, s = 0; i < nPEs; i++)
      s += globalParStats.tot_free_GA += freeMsgIndex[i]/2;

    if ( s > globalParStats.res_free_GA )
      globalParStats.res_free_GA = s;
  }
}
#endif /* PAR_TICKY */
#endif /* PAR -- whole file */
//@node Index, , Miscellaneous Functions, High Level Communications Routines
//@subsection Index

//@index
//* ACK:: @cindex\s-+ACK
//* DebugPrintGAGAMap:: @cindex\s-+DebugPrintGAGAMap
//* FETCH:: @cindex\s-+FETCH
//* FISH:: @cindex\s-+FISH
//* FREE:: @cindex\s-+FREE
//* RESUME:: @cindex\s-+RESUME
//* SCHEDULE:: @cindex\s-+SCHEDULE
//* blockFetch:: @cindex\s-+blockFetch
//* choosePE:: @cindex\s-+choosePE
//* freeMsgBuffer:: @cindex\s-+freeMsgBuffer
//* freeMsgIndex:: @cindex\s-+freeMsgIndex
//* freeRemoteGA:: @cindex\s-+freeRemoteGA
//* gumPackBuffer:: @cindex\s-+gumPackBuffer
//* initMoreBuffers:: @cindex\s-+initMoreBuffers
//* prepareFreeMsgBuffers:: @cindex\s-+prepareFreeMsgBuffers
//* processAck:: @cindex\s-+processAck
//* processFetch:: @cindex\s-+processFetch
//* processFetches:: @cindex\s-+processFetches
//* processFish:: @cindex\s-+processFish
//* processFree:: @cindex\s-+processFree
//* processMessages:: @cindex\s-+processMessages
//* processResume:: @cindex\s-+processResume
//* processSchedule:: @cindex\s-+processSchedule
//* sendAck:: @cindex\s-+sendAck
//* sendFetch:: @cindex\s-+sendFetch
//* sendFish:: @cindex\s-+sendFish
//* sendFree:: @cindex\s-+sendFree
//* sendFreeMessages:: @cindex\s-+sendFreeMessages
//* sendResume:: @cindex\s-+sendResume
//* sendSchedule:: @cindex\s-+sendSchedule
//* unpackAck:: @cindex\s-+unpackAck
//* unpackFetch:: @cindex\s-+unpackFetch
//* unpackFish:: @cindex\s-+unpackFish
//* unpackFree:: @cindex\s-+unpackFree
//* unpackResume:: @cindex\s-+unpackResume
//* unpackSchedule:: @cindex\s-+unpackSchedule
//* waitForTermination:: @cindex\s-+waitForTermination