1 /* ----------------------------------------------------------------------------
2 * Time-stamp: <Wed Mar 21 2001 16:34:41 Stardate: [-30]6363.45 hwloidl>
4 * High Level Communications Routines (HLComms.lc)
6 * Contains the high-level (i.e. communication-subsystem-independent)
7 * routines used by GUM
9 * GUM 0.2x: Phil Trinder, Glasgow University, 12 December 1994
10 * GUM 3.xx: Phil Trinder, Simon Marlow July 1998
11 * GUM 4.xx: H-W. Loidl, Heriot-Watt University, November 1999 -
13 * ------------------------------------------------------------------------- */
15 #ifdef PAR /* whole file */
17 //@node High Level Communications Routines, , ,
18 //@section High Level Communications Routines
23 //* GUM Message Sending and Unpacking Functions::
24 //* Message-Processing Functions::
25 //* GUM Message Processor::
26 //* Miscellaneous Functions::
30 //@node Macros etc, Includes, High Level Communications Routines, High Level Communications Routines
31 //@subsection Macros etc
33 /* Evidently not Posix */
34 /* #include "PosixSource.h" */
36 //@node Includes, GUM Message Sending and Unpacking Functions, Macros etc, High Level Communications Routines
37 //@subsection Includes
42 #include "Storage.h" // for recordMutable
45 #include "GranSimRts.h"
46 #include "ParallelRts.h"
48 #include "FetchMe.h" // for BLOCKED_FETCH_info etc
50 # include "ParallelDebug.h"
52 #include "StgMacros.h" // inlined IS_... fcts
55 #include "SchedAPI.h" //for createIOThread
56 extern unsigned int context_switch;
59 //@node GUM Message Sending and Unpacking Functions, Message-Processing Functions, Includes, High Level Communications Routines
60 //@subsection GUM Message Sending and Unpacking Functions
63 * GUM Message Sending and Unpacking Functions
67 * Allocate space for message processing
70 //@cindex gumPackBuffer
71 static rtsPackBuffer *gumPackBuffer;
73 //@cindex initMoreBuffers
77 if ((gumPackBuffer = (rtsPackBuffer *)stgMallocWords(RtsFlags.ParFlags.packBufferSize,
78 "initMoreBuffers")) == NULL)
84 * SendFetch packs the two global addresses and a load into a message +
89 Structure of a FETCH message:
92 +------------------------------------+------+
93 | gtid | slot | weight | gtid | slot | load |
94 +------------------------------------+------+
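    The six words correspond one-to-one to the arguments packed by sendFetch
    (via sendOpV) and read back by unpackFetch below: buf[0]/buf[1] name the
    closure being fetched, buf[2..4] give the weight, gtid and slot of the GA
    to which the RESUME should be sent, and buf[5] carries the load field.
    A sketch of the layout (field names are illustrative only; the real code
    marshals plain StgWords):

      StgWord fetched_gtid;   /* buf[0] */
      StgWord fetched_slot;   /* buf[1] */
      StgWord reply_weight;   /* buf[2] */
      StgWord reply_gtid;     /* buf[3] */
      StgWord reply_slot;     /* buf[4] */
      StgWord load;           /* buf[5] */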
99 sendFetch(globalAddr *rga, globalAddr *lga, int load)
101 ASSERT(rga->weight > 0 && lga->weight > 0);
103 belch("~^** Sending Fetch for ((%x, %d, 0)); locally ((%x, %d, %x)), load = %d",
104 rga->payload.gc.gtid, rga->payload.gc.slot,
105 lga->payload.gc.gtid, lga->payload.gc.slot, lga->weight,
110 DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(rga->payload.gc.gtid),
111 GR_FETCH, CurrentTSO, (StgClosure *)(lga->payload.gc.slot),
112 0, spark_queue_len(ADVISORY_POOL));
115 sendOpV(PP_FETCH, rga->payload.gc.gtid, 6,
116 (StgWord) rga->payload.gc.gtid, (StgWord) rga->payload.gc.slot,
117 (StgWord) lga->weight, (StgWord) lga->payload.gc.gtid,
118 (StgWord) lga->payload.gc.slot, (StgWord) load);
122 * unpackFetch unpacks a FETCH message into two Global addresses and a load
126 //@cindex unpackFetch
128 unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
135 belch("~^** Unpacking Fetch for ((%x, %d, 0)) to ((%x, %d, %x)), load = %d",
136 (GlobalTaskId) buf[0], (int) buf[1],
137 (GlobalTaskId) buf[3], (int) buf[4], buf[2], buf[5]));
140 lga->payload.gc.gtid = (GlobalTaskId) buf[0];
141 lga->payload.gc.slot = (int) buf[1];
143 rga->weight = (unsigned) buf[2];
144 rga->payload.gc.gtid = (GlobalTaskId) buf[3];
145 rga->payload.gc.slot = (int) buf[4];
147 *load = (int) buf[5];
149 ASSERT(rga->weight > 0);
153 * SendResume packs the remote blocking queue's GA and data into a message
158 Structure of a RESUME message:
160 -------------------------------
161 | weight | slot | n | data ...
162 -------------------------------
164 data is a packed graph represented as an rtsPackBuffer
165 n is the size of the graph (as returned by PackNearbyGraph) + packet hdr size
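    Worked example (a sketch, assuming the DEBUG_HEADROOM word is what holds
    the END_OF_BUFFER_MARKER checked below): for a packed graph of 10 words,
    sendResume ships 10 + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM message words,
    and unpackResume recovers *nelem = buf[2] - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM = 10.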
170 sendResume(globalAddr *rga, int nelem, rtsPackBuffer *packBuffer)
173 belch("~^[] Sending Resume (packet <<%d>> with %d elems) for ((%x, %d, %x)) to [%x]",
174 packBuffer->id, nelem,
175 rga->payload.gc.gtid, rga->payload.gc.slot, rga->weight,
176 rga->payload.gc.gtid));
178 PrintPacket(packBuffer));
180 ASSERT(nelem==packBuffer->size);
181 /* check for magic end-of-buffer word */
182 IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
184 sendOpNV(PP_RESUME, rga->payload.gc.gtid,
185 nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer,
186 2, (rtsWeight) rga->weight, (StgWord) rga->payload.gc.slot);
190 * unpackResume unpacks a Resume message into two Global addresses and
194 //@cindex unpackResume
196 unpackResume(globalAddr *lga, int *nelem, rtsPackBuffer *packBuffer)
203 RESUME event is written in awaken_blocked_queue
204 DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(lga->payload.gc.gtid),
205 GR_RESUME, END_TSO_QUEUE, (StgClosure *)NULL, 0, 0);
208 lga->weight = (unsigned) buf[0];
209 lga->payload.gc.gtid = mytid;
210 lga->payload.gc.slot = (int) buf[1];
212 *nelem = (int) buf[2] - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM;
213 GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM);
216 belch("~^[] Unpacking Resume (packet <<%d>> with %d elems) for ((%x, %d, %x))",
217 packBuffer->id, *nelem, mytid, (int) buf[1], (unsigned) buf[0]));
219 /* check for magic end-of-buffer word */
220 IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
224 * SendAck packs the global address being acknowledged, together with
225 * an array of global addresses for any closures shipped and sends them.
229 Structure of an ACK message:
232 +---------------------------------------------+-------
233 | weight | gtid | slot | weight | gtid | slot | ..... ngas times
234 +---------------------------------------------+-------
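    Each of the ngas entries is an (old GA, new GA) pair, so one pair occupies
    six words and the whole message is 6 * ngas words long; this matches the
    `p - buffer' length passed to sendOpN in sendAck and the `GAarraysize / 6'
    computation in unpackAck below.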
240 sendAck(GlobalTaskId task, int ngas, globalAddr *gagamap)
247 return; //don't send unnecessary messages!!
249 buffer = (long *) gumPackBuffer;
251 for(i = 0, p = buffer; i < ngas; i++, p += 6) {
252 ASSERT(gagamap[1].weight > 0);
253 p[0] = (long) gagamap->weight;
254 p[1] = (long) gagamap->payload.gc.gtid;
255 p[2] = (long) gagamap->payload.gc.slot;
257 p[3] = (long) gagamap->weight;
258 p[4] = (long) gagamap->payload.gc.gtid;
259 p[5] = (long) gagamap->payload.gc.slot;
262 IF_PAR_DEBUG(schedule,
263 belch("~^,, Sending Ack (%d pairs) to [%x]\n",
266 sendOpN(PP_ACK, task, p - buffer, (StgPtr)buffer);
270 * unpackAck unpacks an Acknowledgement message into a Global address,
271 * a count of the number of global addresses following and a map of
277 unpackAck(int *ngas, globalAddr *gagamap)
282 GetArgs(&GAarraysize, 1);
284 *ngas = GAarraysize / 6;
286 IF_PAR_DEBUG(schedule,
287 belch("~^,, Unpacking Ack (%d pairs) on [%x]\n",
290 while (GAarraysize > 0) {
292 gagamap->weight = (rtsWeight) buf[0];
293 gagamap->payload.gc.gtid = (GlobalTaskId) buf[1];
294 gagamap->payload.gc.slot = (int) buf[2];
296 gagamap->weight = (rtsWeight) buf[3];
297 gagamap->payload.gc.gtid = (GlobalTaskId) buf[4];
298 gagamap->payload.gc.slot = (int) buf[5];
299 ASSERT(gagamap->weight > 0);
306 * SendFish sends a FISH (work-stealing request) message to destPE, carrying
307 * the originating PE together with the age, history and hunger of the fish.
311 Structure of a FISH message:
313 +----------------------------------+
314 | orig PE | age | history | hunger |
315 +----------------------------------+
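    A fish that cannot be satisfied is passed on; a condensed sketch of the
    aging protocol implemented by processFish below:

      if (age < FISH_LIFE_EXPECTANCY)
        sendFish(choosePE(), origPE, age + 1, NEW_FISH_HISTORY, NEW_FISH_HUNGER);  /* keep looking */
      else
        sendFish(origPE, origPE, age + 1, NEW_FISH_HISTORY, NEW_FISH_HUNGER);      /* send it home to die */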
320 sendFish(GlobalTaskId destPE, GlobalTaskId origPE,
321 int age, int history, int hunger)
324 belch("~^$$ Sending Fish to [%x] (%d outstanding fishes)",
325 destPE, outstandingFishes));
327 sendOpV(PP_FISH, destPE, 4,
328 (StgWord) origPE, (StgWord) age, (StgWord) history, (StgWord) hunger);
330 if (origPE == mytid) {
337 * unpackFish unpacks a FISH message into the global task id of the
338 * originating PE and 3 data fields: the age, history and hunger of the
339 * fish. The history + hunger are not currently used.
345 unpackFish(GlobalTaskId *origPE, int *age, int *history, int *hunger)
352 belch("~^$$ Unpacking Fish from [%x] (age=%d)",
353 (GlobalTaskId) buf[0], (int) buf[1]));
355 *origPE = (GlobalTaskId) buf[0];
357 *history = (int) buf[2];
358 *hunger = (int) buf[3];
362 * SendFree sends (weight, slot) pairs for GAs that we no longer need
367 Structure of a FREE message:
369 +-----------------------------
370 | n | weight_1 | slot_1 | ...
371 +-----------------------------
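    Note that n counts words, not GAs: a FREE message releasing three GAs
    carries n = 6 followed by three (weight, slot) pairs (cf. the `nelem / 2'
    in processFree and the two words appended per GA in freeRemoteGA below).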
375 sendFree(GlobalTaskId pe, int nelem, StgPtr data)
378 belch("~^!! Sending Free (%d GAs) to [%x]",
381 sendOpN(PP_FREE, pe, nelem, data);
385 * unpackFree unpacks a FREE message into the amount of data shipped and
390 unpackFree(int *nelem, StgWord *data)
395 *nelem = (int) buf[0];
398 belch("~^!! Unpacking Free (%d GAs)",
401 GetArgs(data, *nelem);
405 * SendSchedule sends a closure to be evaluated in response to a Fish
406 * message. The message is directed to the PE that originated the Fish
407 * (origPE), and includes the packed closure (data) along with its size
412 Structure of a SCHEDULE message:
414 +------------------------------------
415 | PE | n | pack buffer of a graph ...
416 +------------------------------------
418 //@cindex sendSchedule
420 sendSchedule(GlobalTaskId origPE, int nelem, rtsPackBuffer *packBuffer)
422 IF_PAR_DEBUG(schedule,
423 belch("~^-- Sending Schedule (packet <<%d>> with %d elems) to [%x]\n",
424 packBuffer->id, nelem, origPE));
426 PrintPacket(packBuffer));
428 ASSERT(nelem==packBuffer->size);
429 /* check for magic end-of-buffer word */
430 IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
432 sendOpN(PP_SCHEDULE, origPE,
433 nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer);
437 * unpackSchedule unpacks a SCHEDULE message into the Global address of
438 * the closure shipped, the amount of data shipped (nelem) and the data
442 //@cindex unpackSchedule
444 unpackSchedule(int *nelem, rtsPackBuffer *packBuffer)
448 /* first, just unpack 1 word containing the total size (including header) */
450 /* no. of elems, not counting the header of the pack buffer */
451 *nelem = (int) buf[0] - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM;
453 /* automatic cast of flat pvm-data to rtsPackBuffer */
454 GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM);
456 IF_PAR_DEBUG(schedule,
457 belch("~^-- Unpacking Schedule (packet <<%d>> with %d elems) on [%x]\n",
458 packBuffer->id, *nelem, mytid));
460 ASSERT(*nelem==packBuffer->size);
461 /* check for magic end-of-buffer word */
462 IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
466 /* sendReval is almost identical to the Schedule version, so we can unpack with unpackSchedule */
468 sendReval(GlobalTaskId origPE, int nelem, rtsPackBuffer *packBuffer)
470 IF_PAR_DEBUG(schedule,
471 belch("~^-- Sending Reval (packet <<%d>> with %d elems) to [%x]\n",
472 packBuffer->id, nelem, origPE));
474 PrintPacket(packBuffer));
476 ASSERT(nelem==packBuffer->size);
477 /* check for magic end-of-buffer word */
478 IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
480 sendOpN(PP_REVAL, origPE,
481 nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer);
484 void FinishReval(StgTSO *t)
488 rtsPackBuffer *buffer=NULL;
490 ga.payload.gc.slot = t->revalSlot;
491 ga.payload.gc.gtid = t->revalTid;
494 //find where the reval result is
495 res = GALAlookup(&ga);
498 IF_PAR_DEBUG(schedule,
500 belch(" needs the result %08x\n",res));
502 //send off the result
503 buffer = PackNearbyGraph(res, END_TSO_QUEUE, &size,ga.payload.gc.gtid);
504 ASSERT(buffer != (rtsPackBuffer *)NULL);
505 sendResume(&ga, size, buffer);
507 IF_PAR_DEBUG(schedule,
508 belch("@;~) Reval Finished"));
513 //@node Message-Processing Functions, GUM Message Processor, GUM Message Sending and Unpacking Functions, High Level Communications Routines
514 //@subsection Message-Processing Functions
517 * Message-Processing Functions
519 * The following routines process incoming GUM messages, often issuing new
520 * messages in response.
522 * processFish unpacks a fish message, reissuing it if it's our own,
523 * sending work if we have it or sending it onwards otherwise.
527 * processFetches constructs and sends resume messages for every
528 * BlockedFetch which is ready to be awakened.
529 * awaken_blocked_queue (in Schedule.c) is responsible for moving
530 * BlockedFetches from a blocking queue to the PendingFetches queue.
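 * In outline (a condensed sketch; the real code below additionally retries
 * after a garbage collection if packing runs out of heap):
 *
 *   for each BLOCKED_FETCH bf on PendingFetches:
 *     chase indirections from bf's node to the target closure
 *     if it is a FETCH_ME:      sendFetch(...)          -- forward the request
 *     else if IS_BLACK_HOLE:    blockFetch(bf, closure) -- still being evaluated
 *     else:                     PackNearbyGraph(...) and sendResume(...)
 *   PendingFetches = END_BF_QUEUE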
533 extern StgBlockedFetch *PendingFetches;
536 pending_fetches_len(void)
541 for (n=0, bf=PendingFetches; bf != END_BF_QUEUE; n++, bf = (StgBlockedFetch *)(bf->link)) {
542 ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
547 //@cindex processFetches
549 processFetches(void) {
550 StgBlockedFetch *bf, *next;
554 static rtsPackBuffer *packBuffer;
556 IF_PAR_DEBUG(verbose,
557 belch("____ processFetches: %d pending fetches (root @ %p)",
558 pending_fetches_len(), PendingFetches));
560 for (bf = PendingFetches;
563 /* the PendingFetches list contains only BLOCKED_FETCH closures */
564 ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
565 /* store link (we might overwrite it via blockFetch later on) */
566 next = (StgBlockedFetch *)(bf->link);
569 * Find the target at the end of the indirection chain, and
570 * process it in much the same fashion as the original target
571 * of the fetch. Though we hope to find graph here, we could
572 * find a black hole (of any flavor) or even a FetchMe.
576 We evacuate BQs and update the node fields where necessary in GC.c.
577 So, if we find an EVACUATED closure, something has gone Very Wrong
578 (and therefore we let the RTS crash most ungracefully).
580 ASSERT(get_itbl(closure)->type != EVACUATED);
581 // closure = ((StgEvacuated *)closure)->evacuee;
583 closure = UNWIND_IND(closure);
584 //while ((ind = IS_INDIRECTION(closure)) != NULL) { closure = ind; }
586 ip = get_itbl(closure);
587 if (ip->type == FETCH_ME) {
588 /* Forward the Fetch to someone else */
589 rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
590 rga.payload.gc.slot = bf->ga.payload.gc.slot;
591 rga.weight = bf->ga.weight;
593 sendFetch(((StgFetchMe *)closure)->ga, &rga, 0 /* load */);
595 // Global statistics: count no. of fetches
596 if (RtsFlags.ParFlags.ParStats.Global &&
597 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
598 globalParStats.tot_fetch_mess++;
602 belch("__-> processFetches: Forwarding fetch from %lx to %lx",
603 mytid, rga.payload.gc.gtid));
605 } else if (IS_BLACK_HOLE(closure)) {
606 IF_PAR_DEBUG(verbose,
607 belch("__++ processFetches: trying to send a BLACK_HOLE => doing a blockFetch on closure %p (%s)",
608 closure, info_type(closure)));
610 blockFetch(bf, closure);
612 /* We now have some local graph to send back */
615 packBuffer = gumPackBuffer;
616 IF_PAR_DEBUG(verbose,
617 belch("__*> processFetches: PackNearbyGraph of closure %p (%s)",
618 closure, info_type(closure)));
620 if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.payload.gc.gtid)) == NULL) {
621 // Put current BF back on list
622 bf->link = (StgBlockingQueueElement *)PendingFetches;
623 PendingFetches = (StgBlockedFetch *)bf;
624 // ToDo: check that nothing more has to be done to prepare for GC!
625 barf("processFetches: out of heap while packing graph; ToDo: call GC here");
626 GarbageCollect(GetRoots, rtsFalse);
628 PendingFetches = (StgBlockedFetch *)(bf->link);
630 packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.payload.gc.gtid);
631 ASSERT(packBuffer != (rtsPackBuffer *)NULL);
633 rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
634 rga.payload.gc.slot = bf->ga.payload.gc.slot;
635 rga.weight = bf->ga.weight;
637 sendResume(&rga, size, packBuffer);
639 // Global statistics: count no. of resume messages
640 if (RtsFlags.ParFlags.ParStats.Global &&
641 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
642 globalParStats.tot_resume_mess++;
646 PendingFetches = END_BF_QUEUE;
651 As an alternative to sending fetch messages directly from the FETCH_ME entry
652 code, we could just store the details of the remote reference in a global
653 variable and send the fetch request from the main scheduling loop (similar
654 to processFetches above). This would save an expensive STGCALL in the entry
655 code, because we have to go back to the scheduler anyway.
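   A sketch of the idea (hypothetical; the FETCH_ME entry code itself is not in
   this file): the entry code would merely record the two GAs in the globals
   used below, roughly

     theGlobalFromGA = *((StgFetchMe *)node)->ga;   -- where the data lives
     theGlobalToGA   = <our own GA for the reply>;  -- filled in analogously

   and return to the scheduler, which would then call processTheRealFetches()
   instead of performing the expensive STGCALL in the entry code.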
657 //@cindex processTheRealFetches
659 processTheRealFetches(void) {
661 StgClosure *closure, *next;
663 IF_PAR_DEBUG(verbose,
664 belch("__ processTheRealFetches: ");
665 printGA(&theGlobalFromGA);
666 printGA(&theGlobalToGA));
668 ASSERT(theGlobalFromGA.payload.gc.gtid != 0 &&
669 theGlobalToGA.payload.gc.gtid != 0);
671 /* the old version did this in the FETCH_ME entry code */
672 sendFetch(&theGlobalFromGA, &theGlobalToGA, 0/*load*/);
679 Way of dealing with unwanted fish.
680 Used during startup/shutdown, or from unknown PEs
685 int age, history, hunger;
687 /* IF_PAR_DEBUG(verbose, */
688 belch(".... [%x] Bouncing unwanted FISH",mytid);
690 unpackFish(&origPE, &age, &history, &hunger);
692 if (origPE == mytid) {
693 //fishing = rtsFalse; // fish has come home
695 last_fish_arrived_at = CURRENT_TIME; // remember time (see schedule fct)
696 return; // that's all
699 /* otherwise, send it home to die */
700 sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
701 // Global statistics: count no. of fish messages
702 if (RtsFlags.ParFlags.ParStats.Global &&
703 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
704 globalParStats.tot_fish_mess++;
709 * processFish unpacks a fish message, reissuing it if it's our own,
710 * sending work if we have it or sending it onwards otherwise.
712 //@cindex processFish
717 int age, history, hunger;
719 static rtsPackBuffer *packBuffer;
721 unpackFish(&origPE, &age, &history, &hunger);
723 if (origPE == mytid) {
724 //fishing = rtsFalse; // fish has come home
726 last_fish_arrived_at = CURRENT_TIME; // remember time (see schedule fct)
727 return; // that's all
730 ASSERT(origPE != mytid);
732 belch("$$__ processing fish; %d sparks available",
733 spark_queue_len(&(MainRegTable.rSparks))));
734 while ((spark = findSpark(rtsTrue/*for_export*/)) != NULL) {
736 // StgClosure *graph;
738 packBuffer = gumPackBuffer;
739 ASSERT(closure_SHOULD_SPARK((StgClosure *)spark));
740 if ((packBuffer = PackNearbyGraph(spark, END_TSO_QUEUE, &size,origPE)) == NULL) {
742 belch("$$ GC while trying to satisfy FISH via PackNearbyGraph of node %p",
743 (StgClosure *)spark));
744 barf("processFish: out of heap while packing graph; ToDo: call GC here");
745 GarbageCollect(GetRoots, rtsFalse);
746 /* Now go back and try again */
748 IF_PAR_DEBUG(verbose,
749 if (RtsFlags.ParFlags.ParStats.Sparks)
750 belch("==== STEALING spark %x; sending to %x", spark, origPE));
753 belch("$$-- Replying to FISH from %x by sending graph @ %p (%s)",
755 (StgClosure *)spark, info_type((StgClosure *)spark)));
756 sendSchedule(origPE, size, packBuffer);
758 // Global statistics: count no. of schedule messages
759 if (RtsFlags.ParFlags.ParStats.Global &&
760 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
761 globalParStats.tot_schedule_mess++;
767 if (spark == (rtsSpark)NULL) {
769 belch("$$^^ No sparks available for FISH from %x",
771 /* We have no sparks to give */
772 if (age < FISH_LIFE_EXPECTANCY) {
773 /* and the fish is still young, send it to another PE to look for work */
774 sendFish(choosePE(), origPE,
775 (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
777 // Global statistics: count no. of fish messages
778 if (RtsFlags.ParFlags.ParStats.Global &&
779 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
780 globalParStats.tot_fish_mess++;
782 } else { /* otherwise, send it home to die */
783 sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
784 // Global statistics: count no. of fish messages
785 if (RtsFlags.ParFlags.ParStats.Global &&
786 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
787 globalParStats.tot_fish_mess++;
794 * processFetch either returns the requested data (if available)
795 * or blocks the remote blocking queue on a black hole (if not).
798 //@cindex processFetch
807 unpackFetch(&ga, &rga, &load);
809 belch("%%%%__ Rcvd Fetch for ((%x, %d, 0)), Resume ((%x, %d, %x)) (load %d) from %x",
810 ga.payload.gc.gtid, ga.payload.gc.slot,
811 rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight, load,
812 rga.payload.gc.gtid));
814 closure = GALAlookup(&ga);
815 ASSERT(closure != (StgClosure *)NULL);
816 ip = get_itbl(closure);
817 if (ip->type == FETCH_ME) {
818 /* Forward the Fetch to someone else */
819 sendFetch(((StgFetchMe *)closure)->ga, &rga, load);
821 // Global statistics: count no. of fetches
822 if (RtsFlags.ParFlags.ParStats.Global &&
823 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
824 globalParStats.tot_fetch_mess++;
826 } else if (rga.payload.gc.gtid == mytid) {
827 /* Our own FETCH forwarded back around to us */
828 StgFetchMeBlockingQueue *fmbq = (StgFetchMeBlockingQueue *)GALAlookup(&rga);
831 belch("%%%%== Fetch returned to sending PE; closure=%p (%s); receiver=%p (%s)",
832 closure, info_type(closure), fmbq, info_type((StgClosure*)fmbq)));
833 /* We may have already discovered that the fetch target is our own. */
834 if ((StgClosure *)fmbq != closure)
835 CommonUp((StgClosure *)fmbq, closure);
836 (void) addWeight(&rga);
837 } else if (IS_BLACK_HOLE(closure)) {
838 /* This includes RBH's and FMBQ's */
841 /* Can we assert something on the remote GA? */
842 ASSERT(GALAlookup(&rga) == NULL);
844 /* If we're hitting a BH or RBH or FMBQ we have to put a BLOCKED_FETCH
845 closure into the BQ in order to denote that when updating this node
846 the result should be sent to the originator of this fetch message. */
847 bf = (StgBlockedFetch *)createBlockedFetch(ga, rga);
849 belch("%%++ Blocking Fetch ((%x, %d, %x)) on %p (%s)",
850 rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight,
851 closure, info_type(closure)));
852 blockFetch(bf, closure);
854 /* The target of the FetchMe is some local graph */
856 // StgClosure *graph;
857 rtsPackBuffer *buffer = (rtsPackBuffer *)NULL;
859 if ((buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.payload.gc.gtid)) == NULL) {
860 barf("processFetch: out of heap while packing graph; ToDo: call GC here");
861 GarbageCollect(GetRoots, rtsFalse);
862 closure = GALAlookup(&ga);
863 buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.payload.gc.gtid);
864 ASSERT(buffer != (rtsPackBuffer *)NULL);
866 sendResume(&rga, size, buffer);
868 // Global statistics: count no. of resume messages
869 if (RtsFlags.ParFlags.ParStats.Global &&
870 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
871 globalParStats.tot_resume_mess++;
877 The list of pending fetches must be a root-list for GC.
878 This routine is called from GC.c (same as marking GAs etc).
881 markPendingFetches(rtsBool major_gc) {
883 /* No need to traverse the list; this is done via the scavenge code
884 for a BLOCKED_FETCH closure, which evacuates the link field */
886 if (PendingFetches != END_BF_QUEUE ) {
888 fprintf(stderr, "@@@@ PendingFetches is root; evaced from %p to",
891 PendingFetches = MarkRoot((StgClosure*)PendingFetches);
893 IF_PAR_DEBUG(verbose,
894 fprintf(stderr, " %p\n", PendingFetches));
898 fprintf(stderr, "@@@@ PendingFetches is empty; no need to mark it\n"));
903 * processFree unpacks a FREE message and adds the weights to our GAs.
905 //@cindex processFree
910 static StgWord *buffer;
914 buffer = (StgWord *)gumPackBuffer;
915 unpackFree(&nelem, buffer);
917 belch("!!__ Rcvd Free (%d GAs)", nelem / 2));
919 ga.payload.gc.gtid = mytid;
920 for (i = 0; i < nelem;) {
921 ga.weight = (rtsWeight) buffer[i++];
922 ga.payload.gc.slot = (int) buffer[i++];
924 fprintf(stderr, "!!-- Processing free ");
928 (void) addWeight(&ga);
933 * processResume unpacks a RESUME message into the graph, filling in
934 * the LA -> GA, and GA -> LA tables. Threads blocked on the original
935 * FetchMe (now a blocking queue) are awakened, and the blocking queue
936 * is converted into an indirection. Finally it sends an ACK in response
937 * which contains any newly allocated GAs.
940 //@cindex processResume
942 processResume(GlobalTaskId sender)
946 static rtsPackBuffer *packBuffer;
947 StgClosure *newGraph, *old;
951 packBuffer = (rtsPackBuffer *)gumPackBuffer;
952 unpackResume(&lga, &nelem, packBuffer);
955 fprintf(stderr, "[]__ Rcvd Resume for ");
957 fputc('\n', stderr));
959 PrintPacket((rtsPackBuffer *)packBuffer));
962 * We always unpack the incoming graph, even if we've received the
963 * requested node in some other data packet (and already awakened
964 * the blocking queue).
965 if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
966 ReallyPerformThreadGC(packBuffer[0], rtsFalse);
967 SAVE_Hp -= packBuffer[0];
971 // ToDo: Check for GC here !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
973 /* Do this *after* GC; we don't want to release the object early! */
976 (void) addWeight(&lga);
978 old = GALAlookup(&lga);
980 /* ToDo: The closure that requested this graph must be one of these two?*/
981 ASSERT(get_itbl(old)->type == FETCH_ME_BQ ||
982 get_itbl(old)->type == RBH);
984 if (RtsFlags.ParFlags.ParStats.Full) {
985 StgBlockingQueueElement *bqe, *last_bqe;
988 belch("[]-- Resume is REPLY to closure %lx", old));
990 /* Write REPLY events to the log file, indicating that the remote
992 NB: we emit a REPLY only for the *last* elem in the queue; this is
993 the one that triggered the fetch message; all other entries
994 have just added themselves to the queue, waiting for the data
995 they know has already been requested (see entry code for FETCH_ME_BQ)
997 if ((get_itbl(old)->type == FETCH_ME_BQ ||
998 get_itbl(old)->type == RBH)) {
999 for (bqe = ((StgFetchMeBlockingQueue *)old)->blocking_queue,
1000 last_bqe = END_BQ_QUEUE;
1001 get_itbl(bqe)->type==TSO ||
1002 get_itbl(bqe)->type==BLOCKED_FETCH;
1003 last_bqe = bqe, bqe = bqe->link) { /* nothing */ }
1005 ASSERT(last_bqe==END_BQ_QUEUE ||
1006 get_itbl((StgClosure *)last_bqe)->type == TSO);
1008 /* last_bqe now points to the TSO that triggered the FETCH */
1009 if (get_itbl((StgClosure *)last_bqe)->type == TSO)
1010 DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(sender),
1011 GR_REPLY, ((StgTSO *)last_bqe), ((StgTSO *)last_bqe)->block_info.closure,
1012 0, spark_queue_len(&(MainRegTable.rSparks)));
1016 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
1017 ASSERT(newGraph != NULL);
1020 * Sometimes, unpacking will common up the resumee with the
1021 * incoming graph, but if it hasn't, we'd better do so now.
1024 if (get_itbl(old)->type == FETCH_ME_BQ)
1025 CommonUp(old, newGraph);
1028 belch("[]-- Ready to resume unpacked graph at %p (%s)",
1029 newGraph, info_type(newGraph)));
1031 IF_PAR_DEBUG(tables,
1032 DebugPrintGAGAMap(gagamap, nGAs));
1034 sendAck(sender, nGAs, gagamap);
1038 * processSchedule unpacks a SCHEDULE message into the graph, filling
1039 * in the LA -> GA, and GA -> LA tables. The root of the graph is added to
1040 * the local spark queue. Finally it sends an ACK in response
1041 * which contains any newly allocated GAs.
1043 //@cindex processSchedule
1045 processSchedule(GlobalTaskId sender)
1049 static rtsPackBuffer *packBuffer;
1050 StgClosure *newGraph;
1051 globalAddr *gagamap;
1053 packBuffer = gumPackBuffer; /* HWL */
1054 unpackSchedule(&nelem, packBuffer);
1056 IF_PAR_DEBUG(schedule,
1057 belch("--__ Rcvd Schedule (%d elems)", nelem));
1058 IF_PAR_DEBUG(packet,
1059 PrintPacket(packBuffer));
1062 * For now, the graph is a closure to be sparked as an advisory
1063 * spark, but in future it may be a complete spark with
1064 * required/advisory status, priority etc.
1068 space_required = packBuffer[0];
1069 if (SAVE_Hp + space_required >= SAVE_HpLim) {
1070 ReallyPerformThreadGC(space_required, rtsFalse);
1071 SAVE_Hp -= space_required;
1074 // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!
1075 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
1076 ASSERT(newGraph != NULL);
1077 success = add_to_spark_queue(newGraph, &(MainRegTable.rSparks));
1079 if (RtsFlags.ParFlags.ParStats.Full &&
1080 RtsFlags.ParFlags.ParStats.Sparks &&
1082 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1083 GR_STOLEN, ((StgTSO *)NULL), newGraph,
1084 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
1086 IF_PAR_DEBUG(schedule,
1088 belch("--^^ added spark to unpacked graph %p (%s); %d sparks available on [%x] (%s)",
1089 newGraph, info_type(newGraph), spark_queue_len(&(MainRegTable.rSparks)), mytid);
1091 belch("--^^ received non-sparkable closure %p (%s); nothing added to spark pool; %d sparks available on [%x]",
1092 newGraph, info_type(newGraph), spark_queue_len(&(MainRegTable.rSparks)), mytid));
1093 IF_PAR_DEBUG(packet,
1094 belch("*< Unpacked graph with root at %p (%s):",
1095 newGraph, info_type(newGraph));
1096 PrintGraph(newGraph, 0));
1098 IF_PAR_DEBUG(tables,
1099 DebugPrintGAGAMap(gagamap, nGAs));
1101 sendAck(sender, nGAs, gagamap);
1103 //fishing = rtsFalse;
1104 ASSERT(outstandingFishes>0);
1105 outstandingFishes--;
1109 * processAck unpacks an ACK, and uses the GAGA map to convert RBH's
1110 * (which represent shared thunks that have been shipped) into fetch-mes
1113 //@cindex processAck
1119 globalAddr gagamap[256]; // ToDo: eliminate magic constant (should be MAX_GAS * 2?)
1121 unpackAck(&nGAs, gagamap);
1123 IF_PAR_DEBUG(tables,
1124 belch(",,,, Rcvd Ack (%d pairs)", nGAs);
1125 DebugPrintGAGAMap(gagamap, nGAs));
1128 checkGAGAMap(gagamap, nGAs));
1131 * For each (oldGA, newGA) pair, set the GA of the corresponding
1132 * thunk to the newGA, convert the thunk to a FetchMe, and return
1133 * the weight from the oldGA.
1135 for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
1136 StgClosure *old_closure = GALAlookup(gaga);
1137 StgClosure *new_closure = GALAlookup(gaga + 1);
1139 ASSERT(old_closure != NULL);
1140 if (new_closure == NULL) {
1141 /* We don't have this closure, so we make a fetchme for it */
1142 globalAddr *ga = setRemoteGA(old_closure, gaga + 1, rtsTrue);
1144 /* convertToFetchMe should be done unconditionally here.
1145 Currently, we assign GAs to CONSTRs, too, (a bit of a hack),
1146 so we have to check whether it is an RBH before converting
1148 ASSERT(get_itbl(old_closure)->type==RBH);
1150 if (get_itbl(old_closure)->type==RBH)
1151 convertToFetchMe((StgRBH *)old_closure, ga);
1154 * Oops...we've got this one already; update the RBH to
1155 * point to the object we already know about, whatever it
1158 CommonUp(old_closure, new_closure);
1161 * Increase the weight of the object by the amount just
1162 * received in the second part of the ACK pair.
1164 (void) addWeight(gaga + 1);
1166 (void) addWeight(gaga);
1169 /* check the sanity of the LAGA and GALA tables after mincing them */
1170 IF_DEBUG(sanity, checkLAGAtable(rtsFalse));
1177 barf("Task %x: TODO: should send NACK in response to REVAL",mytid);
1181 processReval(GlobalTaskId sender) //similar to schedule...
1182 { nat nelem, space_required, nGAs;
1183 static rtsPackBuffer *packBuffer;
1184 StgClosure *newGraph;
1185 globalAddr *gagamap;
1189 packBuffer = gumPackBuffer; /* HWL */
1190 unpackSchedule(&nelem, packBuffer); /* okay, since the structure is the same */
1192 IF_PAR_DEBUG(packet,
1193 belch("@;~) [%x] Rcvd Reval (%d elems)", mytid, nelem);
1194 PrintPacket(packBuffer));
1197 space_required = packBuffer[0];
1198 if (SAVE_Hp + space_required >= SAVE_HpLim) {
1199 ReallyPerformThreadGC(space_required, rtsFalse);
1200 SAVE_Hp -= space_required;
1204 // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!
1205 newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
1206 ASSERT(newGraph != NULL);
1208 IF_PAR_DEBUG(packet,
1209 belch("@;~) Unpacked graph with root at %p (%s):",
1210 newGraph, info_type(newGraph));
1211 PrintGraph(newGraph, 0));
1213 IF_PAR_DEBUG(tables,
1214 DebugPrintGAGAMap(gagamap, nGAs));
1216 IF_PAR_DEBUG(tables,
1218 DebugPrintGAGAMap(gagamap, nGAs));
1220 // We don't acknowledge the head (the first GA pair)!
1222 sendAck(sender, nGAs-1, gagamap+2);
1224 IF_PAR_DEBUG(verbose,
1225 belch("@;~) About to create Reval thread on behalf of %x",
1228 tso=createGenThread(RtsFlags.GcFlags.initialStkSize,newGraph);
1229 tso->priority=RevalPriority;
1230 tso->revalSlot=gagamap->payload.gc.slot;//record who sent the reval
1231 tso->revalTid =gagamap->payload.gc.gtid;
1232 scheduleThread(tso);
1233 context_switch = 1; // switch at the earliest opportunity
1238 //@node GUM Message Processor, Miscellaneous Functions, Message-Processing Functions, High Level Communications Routines
1239 //@subsection GUM Message Processor
1242 * GUM Message Processor
1244 * processMessages processes any messages that have arrived, calling
1245 * appropriate routines depending on the message tag
1246 * (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
1247 * present and performs a blocking receive! During profiling it
1248 * busy-waits in order to record idle time.
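 * In outline (a sketch only; the opcode dispatch below is abridged, and the
 * exact argument lists of the process* routines are as given earlier in this
 * file):
 *
 *   do {
 *     packet = GetPacket();                       -- blocking receive
 *     getOpcodeAndSender(packet, &opcode, &task);
 *     switch (opcode) {
 *       case PP_FETCH:    processFetch();        break;
 *       case PP_RESUME:   processResume(task);   break;
 *       case PP_ACK:      processAck();          break;
 *       case PP_FISH:     processFish();         break;
 *       case PP_FREE:     processFree();         break;
 *       case PP_SCHEDULE: processSchedule(task); break;
 *       case PP_REVAL:    processReval(task);    break;
 *       default:          barf(...);
 *     }
 *   } while (PacketsWaiting());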
1251 //@cindex processMessages
1253 processMessages(void)
1258 rtsBool receivedFinish = rtsFalse;
1261 packet = GetPacket(); /* Get next message; block until one available */
1262 getOpcodeAndSender(packet, &opcode, &task);
1264 if (task==SysManTask) {
1271 IF_PAR_DEBUG(verbose,
1272 belch("==== received FINISH [%p]", mytid));
1273 /* this boolean value is returned and propagated to the main
1274 scheduling loop, thus shutting down this PE */
1275 receivedFinish = rtsTrue;
1279 barf("Task %x: received unknown opcode %x from SysMan",mytid, opcode);
1281 } else if (taskIDtoPE(task)==0) {
1282 /* When a new PE joins, FISH & REVAL messages may potentially
1283 reach PEs before they have been notified of the new PE's existence. The
1284 only solution is to bounce/fail these messages back to the sender.
1285 But we will worry about it once we start seeing these race
1297 belch("Task %x: Ignoring PVM session opened by another SysMan %x",mytid,task);
1304 belch("Task %x: Ignoring opcode %x from unknown PE %x",mytid, opcode, task);
1310 // Global statistics: count no. of fetches
1311 if (RtsFlags.ParFlags.ParStats.Global &&
1312 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1313 globalParStats.rec_fetch_mess++;
1318 processResume(task);
1319 // Global statistics: count no. of resume messages
1320 if (RtsFlags.ParFlags.ParStats.Global &&
1321 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1322 globalParStats.rec_resume_mess++;
1332 // Global statistics: count no. of fish messages
1333 if (RtsFlags.ParFlags.ParStats.Global &&
1334 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1335 globalParStats.rec_fish_mess++;
1344 processSchedule(task);
1345 // Global statistics: count no. of schedule messages
1346 if (RtsFlags.ParFlags.ParStats.Global &&
1347 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1348 globalParStats.rec_schedule_mess++;
1355 // Global statistics: count no. of reval messages
1356 if (RtsFlags.ParFlags.ParStats.Global &&
1357 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1358 globalParStats.rec_reval_mess++;
1364 /* Anything we're not prepared to deal with. */
1365 barf("Task %x: Unexpected opcode %x from %x",
1366 mytid, opcode, task);
1369 } while (PacketsWaiting()); /* While there are messages: process them */
1370 return receivedFinish;
1371 } /* processMessages */
1373 //@node Miscellaneous Functions, Index, GUM Message Processor, High Level Communications Routines
1374 //@subsection Miscellaneous Functions
1377 * blockFetch blocks a BlockedFetch node on some kind of black hole.
1379 //@cindex blockFetch
1381 blockFetch(StgBlockedFetch *bf, StgClosure *bh) {
1383 switch (get_itbl(bh)->type) {
1385 bf->link = END_BQ_QUEUE;
1386 //((StgBlockingQueue *)bh)->header.info = &stg_BLACKHOLE_BQ_info;
1387 SET_INFO(bh, &stg_BLACKHOLE_BQ_info); // turn closure into a blocking queue
1388 ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1390 // put bh on the mutables list
1391 recordMutable((StgMutClosure *)bh);
1395 /* enqueue bf on blocking queue of closure bh */
1396 bf->link = ((StgBlockingQueue *)bh)->blocking_queue;
1397 ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1399 // put bh on the mutables list; ToDo: check
1400 recordMutable((StgMutClosure *)bh);
1404 /* enqueue bf on blocking queue of closure bh */
1405 bf->link = ((StgFetchMeBlockingQueue *)bh)->blocking_queue;
1406 ((StgFetchMeBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1408 // put bh on the mutables list; ToDo: check
1409 recordMutable((StgMutClosure *)bh);
1413 /* enqueue bf on blocking queue of closure bh */
1414 bf->link = ((StgRBH *)bh)->blocking_queue;
1415 ((StgRBH *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
1417 // put bh on the mutables list; ToDo: check
1418 recordMutable((StgMutClosure *)bh);
1422 barf("blockFetch: thought %p was a black hole (IP %#lx, %s)",
1423 (StgClosure *)bh, get_itbl((StgClosure *)bh),
1424 info_type((StgClosure *)bh));
1427 belch("##++ blockFetch: after blocking, the BQ of %p (%s) is:",
1434 @blockThread@ is called from the main scheduler whenever tso returns with
1435 a ThreadBlocked return code; tso has already been added to a blocking
1436 queue (that's done in the entry code of the closure, because it is a
1437 cheap operation we have to do in any case); the main purpose of this
1438 routine is to send a Fetch message in case we are blocking on a FETCHME(_BQ)
1439 closure, which is indicated by the tso.why_blocked field;
1440 we also write an entry into the log file if we are generating one
1442 Should update exectime etc in the entry code already; but we don't have
1443 something like ``system time'' in the log file anyway, so this should
1444 even out the inaccuracies.
1447 //@cindex blockThread
1449 blockThread(StgTSO *tso)
1451 globalAddr *remote_ga=NULL;
1452 globalAddr *local_ga;
1455 // ASSERT(we are on some blocking queue)
1456 ASSERT(tso->block_info.closure != (StgClosure *)NULL);
1459 We have to check why this thread has been blocked.
1461 switch (tso->why_blocked) {
1463 /* the closure must be a FETCH_ME_BQ; tso came in here via
1464 FETCH_ME entry code */
1465 ASSERT(get_itbl(tso->block_info.closure)->type==FETCH_ME_BQ);
1467 /* HACK: the link field is used to hold the GA between FETCH_ME entry
1468 and this point; if something (e.g. GC) happens in between, the whole
1470 The problem is that the ga field of the FETCH_ME has been overwritten
1471 with the head of the blocking queue (which is tso).
1473 ASSERT(looks_like_ga(&theGlobalFromGA));
1474 // ASSERT(tso->link!=END_TSO_QUEUE && tso->link!=NULL);
1475 remote_ga = &theGlobalFromGA; //tso->link;
1476 tso->link = (StgTSO*)END_BQ_QUEUE;
1477 /* it was tso which turned node from FETCH_ME into FETCH_ME_BQ =>
1478 we have to send a Fetch message here! */
1479 if (RtsFlags.ParFlags.ParStats.Full) {
1480 /* Note that CURRENT_TIME may perform an unsafe call */
1481 tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
1482 tso->par.fetchcount++;
1483 tso->par.blockedat = CURRENT_TIME;
1484 /* we are about to send off a FETCH message, so dump a FETCH event */
1485 DumpRawGranEvent(CURRENT_PROC,
1486 taskIDtoPE(remote_ga->payload.gc.gtid),
1487 GR_FETCH, tso, tso->block_info.closure, 0, 0);
1489 /* Phil T. claims that this was a workaround for a hard-to-find
1490 * bug, hence I'm leaving it out for now --SDM
1492 /* Assign a brand-new global address to the newly created FMBQ */
1493 local_ga = makeGlobal(tso->block_info.closure, rtsFalse);
1494 splitWeight(&fmbq_ga, local_ga);
1495 ASSERT(fmbq_ga.weight == 1U << (BITS_IN(unsigned) - 1));
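/* (Weight bookkeeping, as far as it can be read off the assertion above:
   makeGlobal hands out a fresh GA and splitWeight then divides its weight
   between fmbq_ga and local_ga, so fmbq_ga is left holding exactly half the
   maximum weight, i.e. the top bit of an unsigned word.) */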
1497 sendFetch(remote_ga, &fmbq_ga, 0/*load*/);
1499 // Global statistics: count no. of fetches
1500 if (RtsFlags.ParFlags.ParStats.Global &&
1501 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1502 globalParStats.tot_fetch_mess++;
1506 theGlobalFromGA.payload.gc.gtid = (GlobalTaskId)0);
1509 case BlockedOnGA_NoSend:
1510 /* the closure must be a FETCH_ME_BQ; tso came in here via
1511 FETCH_ME_BQ entry code */
1512 ASSERT(get_itbl(tso->block_info.closure)->type==FETCH_ME_BQ);
1514 /* Fetch message has been sent already */
1515 if (RtsFlags.ParFlags.ParStats.Full) {
1516 /* Note that CURRENT_TIME may perform an unsafe call */
1517 tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
1518 tso->par.blockcount++;
1519 tso->par.blockedat = CURRENT_TIME;
1520 /* dump a block event, because fetch has been sent already */
1521 DumpRawGranEvent(CURRENT_PROC, thisPE,
1522 GR_BLOCK, tso, tso->block_info.closure, 0, 0);
1527 case BlockedOnBlackHole:
1528 /* the closure must be a BLACKHOLE_BQ or an RBH; tso came in here via
1529 BLACKHOLE(_BQ) or CAF_BLACKHOLE or RBH entry code */
1530 ASSERT(get_itbl(tso->block_info.closure)->type==MVAR ||
1531 get_itbl(tso->block_info.closure)->type==BLACKHOLE_BQ ||
1532 get_itbl(tso->block_info.closure)->type==RBH);
1534 /* if collecting stats update the execution time etc */
1535 if (RtsFlags.ParFlags.ParStats.Full) {
1536 /* Note that CURRENT_TIME may perform an unsafe call */
1537 tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
1538 tso->par.blockcount++;
1539 tso->par.blockedat = CURRENT_TIME;
1540 DumpRawGranEvent(CURRENT_PROC, thisPE,
1541 GR_BLOCK, tso, tso->block_info.closure, 0, 0);
1545 case BlockedOnDelay:
1546 /* What sort of stats shall we collect for an explicit threadDelay? */
1547 IF_PAR_DEBUG(verbose,
1548 belch("##++ blockThread: TSO %d blocked on ThreadDelay",
1552 /* Check that the following cases cannot happen here; indeed
1553 case BlockedOnException:
1555 case BlockedOnWrite:
1558 barf("blockThread: impossible why_blocked code %d for TSO %d",
1559 tso->why_blocked, tso->id);
1562 IF_PAR_DEBUG(verbose,
1563 belch("##++ blockThread: TSO %d blocked on closure %p (%s); %s",
1564 tso->id, tso->block_info.closure, info_type(tso->block_info.closure),
1565 (tso->why_blocked==BlockedOnGA) ? "Sent FETCH for GA" : ""));
1568 print_bq(tso->block_info.closure));
1572 * ChoosePE selects a GlobalTaskId from the array of PEs 'at random'.
1573 * Important properties:
1574 * - it varies during execution, even if the PE is idle
1575 * - it's different for each PE
1576 * - we never send a fish to ourselves
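 * For example, with nPEs = 4 and allPEs = {A, B, C, D}, a draw of lrand48()
 * that happens to land on our own slot is simply advanced by one position
 * modulo nPEs, as in the code below.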
1578 extern long lrand48 (void);
1586 temp = lrand48() % nPEs;
1587 if (allPEs[temp] == mytid) { /* Never send a FISH to yourself */
1588 temp = (temp + 1) % nPEs;
1590 return allPEs[temp];
1594 * allocate a BLOCKED_FETCH closure and fill it with the relevant fields
1595 * of the ga argument; called from processFetch when the local closure is
1598 //@cindex createBlockedFetch
1600 createBlockedFetch (globalAddr ga, globalAddr rga)
1602 StgBlockedFetch *bf;
1603 StgClosure *closure;
1605 closure = GALAlookup(&ga);
1606 if ((bf = (StgBlockedFetch *)allocate(_HS + sizeofW(StgBlockedFetch))) == NULL) {
1607 barf("createBlockedFetch: out of heap while allocating a BlockedFetch; ToDo: call GC here");
1608 GarbageCollect(GetRoots, rtsFalse);
1609 closure = GALAlookup(&ga);
1610 bf = (StgBlockedFetch *)allocate(_HS + sizeofW(StgBlockedFetch));
1611 // ToDo: check whether really guaranteed to succeed 2nd time around
1614 ASSERT(bf != (StgBlockedFetch *)NULL);
1615 SET_INFO((StgClosure *)bf, &stg_BLOCKED_FETCH_info);
1616 // ToDo: check whether other header info is needed
1618 bf->ga.payload.gc.gtid = rga.payload.gc.gtid;
1619 bf->ga.payload.gc.slot = rga.payload.gc.slot;
1620 bf->ga.weight = rga.weight;
1621 // bf->link = NULL; debugging
1623 IF_PAR_DEBUG(schedule,
1624 fprintf(stderr, "%%%%// created BF: bf=%p (%s) of closure, GA: ",
1625 bf, info_type((StgClosure*)bf));
1627 fputc('\n',stderr));
1628 return (StgClosure *)bf;
1632 * waitForTermination enters a loop ignoring spurious messages while
1633 * waiting for the termination sequence to be completed.
1635 //@cindex waitForTermination
1637 waitForTermination(void)
1640 rtsPacket p = GetPacket();
1641 processUnexpectedMessage(p);
1646 //@cindex DebugPrintGAGAMap
1648 DebugPrintGAGAMap(globalAddr *gagamap, int nGAs)
1652 for (i = 0; i < nGAs; ++i, gagamap += 2)
1653 fprintf(stderr, "__ gagamap[%d] = ((%x, %d, %x)) -> ((%x, %d, %x))\n", i,
1654 gagamap[0].payload.gc.gtid, gagamap[0].payload.gc.slot, gagamap[0].weight,
1655 gagamap[1].payload.gc.gtid, gagamap[1].payload.gc.slot, gagamap[1].weight);
1658 //@cindex checkGAGAMap
1660 checkGAGAMap(globalAddr *gagamap, int nGAs)
1664 for (i = 0; i < (nat)nGAs; ++i, gagamap += 2) {
1665 ASSERT(looks_like_ga(gagamap));
1666 ASSERT(looks_like_ga(gagamap+1));
1671 //@cindex freeMsgBuffer
1672 static StgWord **freeMsgBuffer = NULL;
1673 //@cindex freeMsgIndex
1674 static nat *freeMsgIndex = NULL;
1676 //@cindex prepareFreeMsgBuffers
1678 prepareFreeMsgBuffers(void)
1682 /* Allocate the freeMsg buffers just once and then hang onto them. */
1683 if (freeMsgIndex == NULL) {
1684 freeMsgIndex = (nat *) stgMallocBytes(nPEs * sizeof(nat),
1685 "prepareFreeMsgBuffers (Index)");
1686 freeMsgBuffer = (StgWord **) stgMallocBytes(nPEs * sizeof(long *),
1687 "prepareFreeMsgBuffers (Buffer)");
1689 for(i = 0; i < nPEs; i++)
1690 if (i != (thisPE-1))
1691 freeMsgBuffer[i] = (StgPtr) stgMallocWords(RtsFlags.ParFlags.packBufferSize,
1692 "prepareFreeMsgBuffers (Buffer #i)");
1694 freeMsgBuffer[i] = 0;
1697 /* Initialize the freeMsg buffer pointers to point to the start of their
1699 for (i = 0; i < nPEs; i++)
1700 freeMsgIndex[i] = 0;
1703 //@cindex freeRemoteGA
1705 freeRemoteGA(int pe, globalAddr *ga)
1709 ASSERT(GALAlookup(ga) == NULL);
1711 if ((i = freeMsgIndex[pe]) + 2 >= RtsFlags.ParFlags.packBufferSize) {
1713 belch("!! Filled a free message buffer (sending remaining messages individually)"));
1715 sendFree(ga->payload.gc.gtid, i, freeMsgBuffer[pe]);
1718 freeMsgBuffer[pe][i++] = (StgWord) ga->weight;
1719 freeMsgBuffer[pe][i++] = (StgWord) ga->payload.gc.slot;
1720 freeMsgIndex[pe] = i;
1723 ga->weight = 0xdead0add;
1724 ga->payload.gc.gtid = 0xbbbbbbbb;
1725 ga->payload.gc.slot = 0xbbbbbbbb;);
1728 //@cindex sendFreeMessages
1730 sendFreeMessages(void)
1734 for (i = 0; i < nPEs; i++)
1735 if (freeMsgIndex[i] > 0)
1736 sendFree(allPEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
1739 /* synchronises with the other PEs. Receives and records in a global
1740 * variable the task-id of SysMan. If this is the main thread (discovered
1741 * in main.lc), identifies itself to SysMan. Finally it receives
1742 * from SysMan an array of the Global Task Ids of each PE, which is
1743 * returned as the value of the function.
1746 #if defined(PAR_TICKY)
1747 /* Has to see freeMsgIndex, so must be defined here not in ParTicky.c */
1748 //@cindex stats_CntFreeGA
1750 stats_CntFreeGA (void) { // stats only
1752 // Global statistics: number and residency of free GAs awaiting FREE messages
1753 if (RtsFlags.ParFlags.ParStats.Global &&
1754 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1757 globalParStats.cnt_free_GA++;
1758 for (i = 0, s = 0; i < nPEs; i++)
1759 s += globalParStats.tot_free_GA += freeMsgIndex[i]/2;
1761 if ( s > globalParStats.res_free_GA )
1762 globalParStats.res_free_GA = s;
1765 #endif /* PAR_TICKY */
1767 #endif /* PAR -- whole file */
1769 //@node Index, , Miscellaneous Functions, High Level Communications Routines
1773 //* ACK:: @cindex\s-+ACK
1774 //* DebugPrintGAGAMap:: @cindex\s-+DebugPrintGAGAMap
1775 //* FETCH:: @cindex\s-+FETCH
1776 //* FISH:: @cindex\s-+FISH
1777 //* FREE:: @cindex\s-+FREE
1778 //* RESUME:: @cindex\s-+RESUME
1779 //* SCHEDULE:: @cindex\s-+SCHEDULE
1780 //* blockFetch:: @cindex\s-+blockFetch
1781 //* choosePE:: @cindex\s-+choosePE
1782 //* freeMsgBuffer:: @cindex\s-+freeMsgBuffer
1783 //* freeMsgIndex:: @cindex\s-+freeMsgIndex
1784 //* freeRemoteGA:: @cindex\s-+freeRemoteGA
1785 //* gumPackBuffer:: @cindex\s-+gumPackBuffer
1786 //* initMoreBuffers:: @cindex\s-+initMoreBuffers
1787 //* prepareFreeMsgBuffers:: @cindex\s-+prepareFreeMsgBuffers
1788 //* processAck:: @cindex\s-+processAck
1789 //* processFetch:: @cindex\s-+processFetch
1790 //* processFetches:: @cindex\s-+processFetches
1791 //* processFish:: @cindex\s-+processFish
1792 //* processFree:: @cindex\s-+processFree
1793 //* processMessages:: @cindex\s-+processMessages
1794 //* processResume:: @cindex\s-+processResume
1795 //* processSchedule:: @cindex\s-+processSchedule
1796 //* sendAck:: @cindex\s-+sendAck
1797 //* sendFetch:: @cindex\s-+sendFetch
1798 //* sendFish:: @cindex\s-+sendFish
1799 //* sendFree:: @cindex\s-+sendFree
1800 //* sendFreeMessages:: @cindex\s-+sendFreeMessages
1801 //* sendResume:: @cindex\s-+sendResume
1802 //* sendSchedule:: @cindex\s-+sendSchedule
1803 //* unpackAck:: @cindex\s-+unpackAck
1804 //* unpackFetch:: @cindex\s-+unpackFetch
1805 //* unpackFish:: @cindex\s-+unpackFish
1806 //* unpackFree:: @cindex\s-+unpackFree
1807 //* unpackResume:: @cindex\s-+unpackResume
1808 //* unpackSchedule:: @cindex\s-+unpackSchedule
1809 //* waitForTermination:: @cindex\s-+waitForTermination