/* ----------------------------------------------------------------------------
- * Time-stamp: <Wed Mar 29 2000 19:35:36 Stardate: [-30]4578.87 hwloidl>
- * $Id: HLComms.c,v 1.3 2000/03/31 03:09:37 hwloidl Exp $
+ * Time-stamp: <Wed Mar 21 2001 16:34:41 Stardate: [-30]6363.45 hwloidl>
+ * $Id: HLComms.c,v 1.6 2001/08/14 13:40:10 sewardj Exp $
*
* High Level Communications Routines (HLComms.lc)
*
* Contains the high-level routines (i.e. communication
* subsystem independent) used by GUM
*
- * Phil Trinder, Glasgow University, 12 December 1994
- * Adapted for new RTS
- * Phil Trinder, Simon Marlow July 1998
- * H-W. Loidl, Heriot-Watt University, November 1999
+ * GUM 0.2x: Phil Trinder, Glasgow University, 12 December 1994
+ * GUM 3.xx: Phil Trinder, Simon Marlow July 1998
+ * GUM 4.xx: H-W. Loidl, Heriot-Watt University, November 1999 -
*
* ------------------------------------------------------------------------- */
//@node Macros etc, Includes, High Level Communications Routines, High Level Communications Routines
//@subsection Macros etc
-# ifndef _AIX
-# define NON_POSIX_SOURCE /* so says Solaris */
-# endif
+/* Evidently not Posix */
+/* #include "PosixSource.h" */
//@node Includes, GUM Message Sending and Unpacking Functions, Macros etc, High Level Communications Routines
//@subsection Includes
#include "Parallel.h"
#include "GranSimRts.h"
#include "ParallelRts.h"
+#include "Sparks.h"
#include "FetchMe.h" // for BLOCKED_FETCH_info etc
#if defined(DEBUG)
# include "ParallelDebug.h"
#endif
#include "StgMacros.h" // inlined IS_... fcts
+#ifdef DIST
+#include "SchedAPI.h" //for createIOThread
+extern unsigned int context_switch;
+#endif /* DIST */
+
//@node GUM Message Sending and Unpacking Functions, Message-Processing Functions, Includes, High Level Communications Routines
//@subsection GUM Message Sending and Unpacking Functions
{
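+  /* both weights must be positive: GAs carry weights for GUM's weighted
+     reference counting, so a zero-weight GA could never be shared out */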
ASSERT(rga->weight > 0 && lga->weight > 0);
IF_PAR_DEBUG(fetch,
- belch("** [%x] Sending Fetch for ((%x, %d, 0)); locally ((%x, %d, %x)), load = %d",
- mytid,
+ belch("~^** Sending Fetch for ((%x, %d, 0)); locally ((%x, %d, %x)), load = %d",
rga->payload.gc.gtid, rga->payload.gc.slot,
lga->payload.gc.gtid, lga->payload.gc.slot, lga->weight,
load));
GetArgs(buf, 6);
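+  /* apparent layout (cf. the belch below): buf[0..1] = (gtid,slot) of the
+     GA being fetched; buf[3..4] and buf[2] = (gtid,slot) and weight of the
+     reply GA; buf[5] = load */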
IF_PAR_DEBUG(fetch,
- belch("** [%x] Unpacking Fetch for ((%x, %d, 0)) to ((%x, %d, %x)), load = %d",
- mytid,
+ belch("~^** Unpacking Fetch for ((%x, %d, 0)) to ((%x, %d, %x)), load = %d",
(GlobalTaskId) buf[0], (int) buf[1],
(GlobalTaskId) buf[3], (int) buf[4], buf[2], buf[5]));
//@cindex sendResume
void
-sendResume(globalAddr *rga, int nelem, rtsPackBuffer *data) // StgPtr data)
+sendResume(globalAddr *rga, int nelem, rtsPackBuffer *packBuffer)
{
- IF_PAR_DEBUG(resume,
- PrintPacket(data);
- belch("[] [%x] Sending Resume for ((%x, %d, %x))",
- mytid,
- rga->payload.gc.gtid, rga->payload.gc.slot, rga->weight));
+ IF_PAR_DEBUG(fetch,
+ belch("~^[] Sending Resume (packet <<%d>> with %d elems) for ((%x, %d, %x)) to [%x]",
+ packBuffer->id, nelem,
+ rga->payload.gc.gtid, rga->payload.gc.slot, rga->weight,
+ rga->payload.gc.gtid));
+ IF_PAR_DEBUG(packet,
+ PrintPacket(packBuffer));
+
+ ASSERT(nelem==packBuffer->size);
+ /* check for magic end-of-buffer word */
+ IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
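+  /* we ship header + payload + DEBUG_HEADROOM extra word(s); the headroom
+     presumably holds the end-of-buffer marker tested just above */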
sendOpNV(PP_RESUME, rga->payload.gc.gtid,
- nelem + PACK_BUFFER_HDR_SIZE, (StgPtr)data,
+ nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer,
2, (rtsWeight) rga->weight, (StgWord) rga->payload.gc.slot);
}
//@cindex unpackResume
static void
-unpackResume(globalAddr *lga, int *nelem, rtsPackBuffer *data)
+unpackResume(globalAddr *lga, int *nelem, rtsPackBuffer *packBuffer)
{
long buf[3];
GetArgs(buf, 3);
- IF_PAR_DEBUG(resume,
- belch("[] [%x] Unpacking Resume for ((%x, %d, %x))",
- mytid, mytid,
- (int) buf[1], (unsigned) buf[0]));
-
/*
RESUME event is written in awaken_blocked_queue
DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(lga->payload.gc.gtid),
lga->payload.gc.gtid = mytid;
lga->payload.gc.slot = (int) buf[1];
- *nelem = (int) buf[2]; // includes PACK_BUFFER_HDR_SIZE;
- GetArgs(data, *nelem);
- *nelem -= PACK_BUFFER_HDR_SIZE;
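+  /* buf[2] is the size as transmitted, i.e. payload plus pack-buffer
+     header plus the (sanity-only) debug headroom */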
+ *nelem = (int) buf[2] - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM;
+ GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM);
+
+ IF_PAR_DEBUG(fetch,
+ belch("~^[] Unpacking Resume (packet <<%d>> with %d elems) for ((%x, %d, %x))",
+ packBuffer->id, *nelem, mytid, (int) buf[1], (unsigned) buf[0]));
+
+ /* check for magic end-of-buffer word */
+ IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
}
/*
long *p;
int i;
+ if(ngas==0)
+ return; //don't send unnecessary messages!!
+
buffer = (long *) gumPackBuffer;
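+  /* each GA-GA pair is flattened into 6 longs (presumably weight, gtid
+     and slot for both GAs), hence p advances by 6 per pair */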
for(i = 0, p = buffer; i < ngas; i++, p += 6) {
gagamap++;
}
IF_PAR_DEBUG(schedule,
- belch(",, [%x] Sending Ack (%d pairs) to PE %x\n",
- mytid, ngas, task));
+ belch("~^,, Sending Ack (%d pairs) to [%x]\n",
+ ngas, task));
- sendOpN(PP_ACK, task, p - buffer, buffer);
+ sendOpN(PP_ACK, task, p - buffer, (StgPtr)buffer);
}
/*
*ngas = GAarraysize / 6;
IF_PAR_DEBUG(schedule,
- belch(",, [%x] Unpacking Ack (%d pairs) on %x\n",
- mytid, *ngas, mytid));
+ belch("~^,, Unpacking Ack (%d pairs) on [%x]\n",
+ *ngas, mytid));
while (GAarraysize > 0) {
GetArgs(buf, 6);
int age, int history, int hunger)
{
IF_PAR_DEBUG(fish,
- belch("$$ [%x] Sending Fish to %x (%d outstanding fishes)",
- mytid, destPE, outstandingFishes));
+ belch("~^$$ Sending Fish to [%x] (%d outstanding fishes)",
+ destPE, outstandingFishes));
sendOpV(PP_FISH, destPE, 4,
(StgWord) origPE, (StgWord) age, (StgWord) history, (StgWord) hunger);
GetArgs(buf, 4);
IF_PAR_DEBUG(fish,
- belch("$$ [%x] Unpacking Fish from PE %x (age=%d)",
- mytid, (GlobalTaskId) buf[0], (int) buf[1]));
+ belch("~^$$ Unpacking Fish from [%x] (age=%d)",
+ (GlobalTaskId) buf[0], (int) buf[1]));
*origPE = (GlobalTaskId) buf[0];
*age = (int) buf[1];
sendFree(GlobalTaskId pe, int nelem, StgPtr data)
{
IF_PAR_DEBUG(free,
- belch("!! [%x] Sending Free (%d GAs) to %x",
- mytid, nelem/2, pe));
+ belch("~^!! Sending Free (%d GAs) to [%x]",
+ nelem/2, pe));
sendOpN(PP_FREE, pe, nelem, data);
}
*/
//@cindex unpackFree
static void
-unpackFree(int *nelem, rtsPackBuffer *data)
+unpackFree(int *nelem, StgWord *data)
{
  long buf[1];
  GetArgs(buf, 1);   /* first fetch the element count */
  *nelem = (int) buf[0];
IF_PAR_DEBUG(free,
- belch("!! [%x] Unpacking Free (%d GAs)",
- mytid, *nelem/2));
+ belch("~^!! Unpacking Free (%d GAs)",
+ *nelem/2));
GetArgs(data, *nelem);
}
*/
//@cindex sendSchedule
void
-sendSchedule(GlobalTaskId origPE, int nelem, rtsPackBuffer *data) // StgPtr data)
+sendSchedule(GlobalTaskId origPE, int nelem, rtsPackBuffer *packBuffer)
{
IF_PAR_DEBUG(schedule,
- PrintPacket(data);
- belch("-- [%x] Sending Schedule (%d elems) to %x\n",
- mytid, nelem, origPE));
+ belch("~^-- Sending Schedule (packet <<%d>> with %d elems) to [%x]\n",
+ packBuffer->id, nelem, origPE));
+ IF_PAR_DEBUG(packet,
+ PrintPacket(packBuffer));
- sendOpN(PP_SCHEDULE, origPE, nelem + PACK_BUFFER_HDR_SIZE, (StgPtr)data);
+ ASSERT(nelem==packBuffer->size);
+ /* check for magic end-of-buffer word */
+ IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
+
+ sendOpN(PP_SCHEDULE, origPE,
+ nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer);
}
/*
//@cindex unpackSchedule
static void
-unpackSchedule(int *nelem, rtsPackBuffer *data)
+unpackSchedule(int *nelem, rtsPackBuffer *packBuffer)
{
- long buf[1];
+ long buf[1];
+
+ /* first, just unpack 1 word containing the total size (including header) */
+ GetArgs(buf, 1);
+ /* no. of elems, not counting the header of the pack buffer */
+ *nelem = (int) buf[0] - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM;
- GetArgs(buf, 1);
- /* no. of elems, not counting the header of the pack buffer */
- *nelem = (int) buf[0] - PACK_BUFFER_HDR_SIZE;
+ /* automatic cast of flat pvm-data to rtsPackBuffer */
+ GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM);
- IF_PAR_DEBUG(schedule,
- belch("-- [%x] Unpacking Schedule (%d elems) on %x\n",
- mytid, *nelem));
+ IF_PAR_DEBUG(schedule,
+ belch("~^-- Unpacking Schedule (packet <<%d>> with %d elems) on [%x]\n",
+ packBuffer->id, *nelem, mytid));
- /* automatic cast of flat pvm-data to rtsPackBuffer */
- GetArgs(data, *nelem + PACK_BUFFER_HDR_SIZE);
+ ASSERT(*nelem==packBuffer->size);
+ /* check for magic end-of-buffer word */
+ IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
}
+#ifdef DIST
+/* sendReval is almost identical to the Schedule version, so we can unpack with unpackSchedule */
+void
+sendReval(GlobalTaskId origPE, int nelem, rtsPackBuffer *packBuffer)
+{
+ IF_PAR_DEBUG(schedule,
+ belch("~^-- Sending Reval (packet <<%d>> with %d elems) to [%x]\n",
+ packBuffer->id, nelem, origPE));
+ IF_PAR_DEBUG(packet,
+ PrintPacket(packBuffer));
+
+ ASSERT(nelem==packBuffer->size);
+ /* check for magic end-of-buffer word */
+ IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
+
+ sendOpN(PP_REVAL, origPE,
+ nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer);
+}
+
+void FinishReval(StgTSO *t)
+{ StgClosure *res;
+ globalAddr ga;
+ nat size;
+ rtsPackBuffer *buffer=NULL;
+
+ ga.payload.gc.slot = t->revalSlot;
+ ga.payload.gc.gtid = t->revalTid;
+ ga.weight = 0;
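+  /* weight 0: this GA is only used as a lookup key; GALAlookup presumably
+     matches on (gtid,slot) alone */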
+
+ //find where the reval result is
+ res = GALAlookup(&ga);
+ ASSERT(res);
+
+ IF_PAR_DEBUG(schedule,
+ printGA(&ga);
+ belch(" needs the result %08x\n",res));
+
+ //send off the result
+ buffer = PackNearbyGraph(res, END_TSO_QUEUE, &size,ga.payload.gc.gtid);
+ ASSERT(buffer != (rtsPackBuffer *)NULL);
+ sendResume(&ga, size, buffer);
+
+ IF_PAR_DEBUG(schedule,
+ belch("@;~) Reval Finished"));
+}
+
+#endif /* DIST */
+
//@node Message-Processing Functions, GUM Message Processor, GUM Message Sending and Unpacking Functions, High Level Communications Routines
//@subsection Message-Processing Functions
sendFetch(((StgFetchMe *)closure)->ga, &rga, 0 /* load */);
+ // Global statistics: count no. of fetches
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.tot_fetch_mess++;
+ }
+
IF_PAR_DEBUG(fetch,
belch("__-> processFetches: Forwarding fetch from %lx to %lx",
mytid, rga.payload.gc.gtid));
belch("__*> processFetches: PackNearbyGraph of closure %p (%s)",
closure, info_type(closure)));
- if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size)) == NULL) {
+ if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.payload.gc.gtid)) == NULL) {
// Put current BF back on list
bf->link = (StgBlockingQueueElement *)PendingFetches;
PendingFetches = (StgBlockedFetch *)bf;
// ToDo: check that nothing more has to be done to prepare for GC!
barf("processFetches: out of heap while packing graph; ToDo: call GC here");
- GarbageCollect(GetRoots);
+ GarbageCollect(GetRoots, rtsFalse);
bf = PendingFetches;
PendingFetches = (StgBlockedFetch *)(bf->link);
closure = bf->node;
- packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size);
+ packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.payload.gc.gtid);
ASSERT(packBuffer != (rtsPackBuffer *)NULL);
}
rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
rga.weight = bf->ga.weight;
sendResume(&rga, size, packBuffer);
+
+    // Global statistics: count no. of resume messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.tot_resume_mess++;
+ }
}
}
PendingFetches = END_BF_QUEUE;
#endif
+/*
+ Way of dealing with unwanted fish.
+ Used during startup/shutdown, or from unknown PEs
+*/
+void
+bounceFish(void) {
+ GlobalTaskId origPE;
+ int age, history, hunger;
+
+ /* IF_PAR_DEBUG(verbose, */
+ belch(".... [%x] Bouncing unwanted FISH",mytid);
+
+ unpackFish(&origPE, &age, &history, &hunger);
+
+ if (origPE == mytid) {
+ //fishing = rtsFalse; // fish has come home
+ outstandingFishes--;
+ last_fish_arrived_at = CURRENT_TIME; // remember time (see schedule fct)
+ return; // that's all
+ }
+
+ /* otherwise, send it home to die */
+ sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
+  // Global statistics: count no. of fish messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.tot_fish_mess++;
+ }
+}
+
/*
* processFish unpacks a fish message, reissuing it if it's our own,
* sending work if we have it or sending it onwards otherwise.
IF_PAR_DEBUG(fish,
belch("$$__ processing fish; %d sparks available",
spark_queue_len(&(MainRegTable.rSparks))));
- while ((spark = findSpark()) != NULL) {
+ while ((spark = findSpark(rtsTrue/*for_export*/)) != NULL) {
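+    /* rtsTrue asks findSpark for a spark that may be exported to another
+       PE (an assumption based on the for_export flag name) */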
nat size;
// StgClosure *graph;
packBuffer = gumPackBuffer;
ASSERT(closure_SHOULD_SPARK((StgClosure *)spark));
- if ((packBuffer = PackNearbyGraph(spark, END_TSO_QUEUE, &size)) == NULL) {
+ if ((packBuffer = PackNearbyGraph(spark, END_TSO_QUEUE, &size,origPE)) == NULL) {
IF_PAR_DEBUG(fish,
belch("$$ GC while trying to satisfy FISH via PackNearbyGraph of node %p",
(StgClosure *)spark));
barf("processFish: out of heap while packing graph; ToDo: call GC here");
- GarbageCollect(GetRoots);
+ GarbageCollect(GetRoots, rtsFalse);
/* Now go back and try again */
} else {
+ IF_PAR_DEBUG(verbose,
+ if (RtsFlags.ParFlags.ParStats.Sparks)
+ belch("==== STEALING spark %x; sending to %x", spark, origPE));
+
IF_PAR_DEBUG(fish,
belch("$$-- Replying to FISH from %x by sending graph @ %p (%s)",
origPE,
(StgClosure *)spark, info_type((StgClosure *)spark)));
sendSchedule(origPE, size, packBuffer);
disposeSpark(spark);
+      // Global statistics: count no. of schedule messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.tot_schedule_mess++;
+ }
+
break;
}
}
belch("$$^^ No sparks available for FISH from %x",
origPE));
/* We have no sparks to give */
- if (age < FISH_LIFE_EXPECTANCY)
+ if (age < FISH_LIFE_EXPECTANCY) {
      /* and the fish is still young, send it to another PE to look for work */
sendFish(choosePE(), origPE,
(age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
- /* otherwise, send it home to die */
- else
+      // Global statistics: count no. of fish messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.tot_fish_mess++;
+ }
+ } else { /* otherwise, send it home to die */
sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
+      // Global statistics: count no. of fish messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.tot_fish_mess++;
+ }
}
+ }
} /* processFish */
/*
if (ip->type == FETCH_ME) {
/* Forward the Fetch to someone else */
sendFetch(((StgFetchMe *)closure)->ga, &rga, load);
+
+ // Global statistics: count no. of fetches
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.tot_fetch_mess++;
+ }
} else if (rga.payload.gc.gtid == mytid) {
/* Our own FETCH forwarded back around to us */
StgFetchMeBlockingQueue *fmbq = (StgFetchMeBlockingQueue *)GALAlookup(&rga);
IF_PAR_DEBUG(fetch,
belch("%%%%== Fetch returned to sending PE; closure=%p (%s); receiver=%p (%s)",
- closure, info_type(closure), fmbq, info_type(fmbq)));
+ closure, info_type(closure), fmbq, info_type((StgClosure*)fmbq)));
/* We may have already discovered that the fetch target is our own. */
if ((StgClosure *)fmbq != closure)
CommonUp((StgClosure *)fmbq, closure);
/* This includes RBH's and FMBQ's */
StgBlockedFetch *bf;
+ /* Can we assert something on the remote GA? */
ASSERT(GALAlookup(&rga) == NULL);
/* If we're hitting a BH or RBH or FMBQ we have to put a BLOCKED_FETCH
closure into the BQ in order to denote that when updating this node
the result should be sent to the originator of this fetch message. */
bf = (StgBlockedFetch *)createBlockedFetch(ga, rga);
- blockFetch(bf, closure);
-
IF_PAR_DEBUG(fetch,
belch("%%++ Blocking Fetch ((%x, %d, %x)) on %p (%s)",
rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight,
closure, info_type(closure)));
- } else {
- /* The target of the FetchMe is some local graph */
- nat size;
- // StgClosure *graph;
- rtsPackBuffer *buffer = (rtsPackBuffer *)NULL;
-
- if ((buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size)) == NULL) {
- barf("processFetch: out of heap while packing graph; ToDo: call GC here");
- GarbageCollect(GetRoots);
- closure = GALAlookup(&ga);
- buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size);
- ASSERT(buffer != (rtsPackBuffer *)NULL);
- }
- sendResume(&rga, size, buffer);
+ blockFetch(bf, closure);
+ } else {
+ /* The target of the FetchMe is some local graph */
+ nat size;
+ // StgClosure *graph;
+ rtsPackBuffer *buffer = (rtsPackBuffer *)NULL;
+
+ if ((buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.payload.gc.gtid)) == NULL) {
+ barf("processFetch: out of heap while packing graph; ToDo: call GC here");
+ GarbageCollect(GetRoots, rtsFalse);
+ closure = GALAlookup(&ga);
+ buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.payload.gc.gtid);
+ ASSERT(buffer != (rtsPackBuffer *)NULL);
}
+ sendResume(&rga, size, buffer);
+
+    // Global statistics: count no. of resume messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.tot_resume_mess++;
+ }
+ }
+}
+
+/*
+ The list of pending fetches must be a root-list for GC.
+ This routine is called from GC.c (same as marking GAs etc).
+*/
+void
+markPendingFetches(rtsBool major_gc) {
+
+ /* No need to traverse the list; this is done via the scavenge code
+ for a BLOCKED_FETCH closure, which evacuates the link field */
+
+ if (PendingFetches != END_BF_QUEUE ) {
+ IF_PAR_DEBUG(tables,
+ fprintf(stderr, "@@@@ PendingFetches is root; evaced from %p to",
+ PendingFetches));
+
+ PendingFetches = MarkRoot((StgClosure*)PendingFetches);
+
+    IF_PAR_DEBUG(tables,
+ fprintf(stderr, " %p\n", PendingFetches));
+
+ } else {
+ IF_PAR_DEBUG(tables,
+ fprintf(stderr, "@@@@ PendingFetches is empty; no need to mark it\n"));
+ }
}
/*
globalAddr lga;
globalAddr *gagamap;
- packBuffer = gumPackBuffer;
- unpackResume(&lga, &nelem, (StgPtr)packBuffer);
+ packBuffer = (rtsPackBuffer *)gumPackBuffer;
+ unpackResume(&lga, &nelem, packBuffer);
- IF_PAR_DEBUG(resume,
+ IF_PAR_DEBUG(fetch,
fprintf(stderr, "[]__ Rcvd Resume for ");
printGA(&lga);
- fputc('\n', stderr);
+ fputc('\n', stderr));
+ IF_PAR_DEBUG(packet,
PrintPacket((rtsPackBuffer *)packBuffer));
/*
old = GALAlookup(&lga);
+  /* ToDo: the closure that requested this graph must be one of these two? */
+ ASSERT(get_itbl(old)->type == FETCH_ME_BQ ||
+ get_itbl(old)->type == RBH);
+
if (RtsFlags.ParFlags.ParStats.Full) {
- // StgTSO *tso = END_TSO_QUEUE;
- StgBlockingQueueElement *bqe;
+ StgBlockingQueueElement *bqe, *last_bqe;
+
+ IF_PAR_DEBUG(fetch,
+ belch("[]-- Resume is REPLY to closure %lx", old));
/* Write REPLY events to the log file, indicating that the remote
- data has arrived */
- if (get_itbl(old)->type == FETCH_ME_BQ ||
- get_itbl(old)->type == RBH)
- for (bqe = ((StgFetchMeBlockingQueue *)old)->blocking_queue;
- bqe->link != END_BQ_QUEUE;
- bqe = bqe->link)
- if (get_itbl((StgClosure *)bqe)->type == TSO)
- DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(sender),
- GR_REPLY, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
- 0, spark_queue_len(&(MainRegTable.rSparks)));
+ data has arrived
+ NB: we emit a REPLY only for the *last* elem in the queue; this is
+ the one that triggered the fetch message; all other entries
+     have just added themselves to the queue, waiting for data
+     they know has already been requested (see entry code for FETCH_ME_BQ)
+ */
+ if ((get_itbl(old)->type == FETCH_ME_BQ ||
+ get_itbl(old)->type == RBH)) {
+ for (bqe = ((StgFetchMeBlockingQueue *)old)->blocking_queue,
+ last_bqe = END_BQ_QUEUE;
+ get_itbl(bqe)->type==TSO ||
+ get_itbl(bqe)->type==BLOCKED_FETCH;
+ last_bqe = bqe, bqe = bqe->link) { /* nothing */ }
+
+ ASSERT(last_bqe==END_BQ_QUEUE ||
+ get_itbl((StgClosure *)last_bqe)->type == TSO);
+
+ /* last_bqe now points to the TSO that triggered the FETCH */
+ if (get_itbl((StgClosure *)last_bqe)->type == TSO)
+ DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(sender),
+ GR_REPLY, ((StgTSO *)last_bqe), ((StgTSO *)last_bqe)->block_info.closure,
+ 0, spark_queue_len(&(MainRegTable.rSparks)));
+ }
}
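+  /* UnpackGraph rebuilds the packed graph in the local heap and returns a
+     GA-GA map pairing each received GA with its new local counterpart */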
newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
if (get_itbl(old)->type == FETCH_ME_BQ)
CommonUp(old, newGraph);
+ IF_PAR_DEBUG(fetch,
+ belch("[]-- Ready to resume unpacked graph at %p (%s)",
+ newGraph, info_type(newGraph)));
+
IF_PAR_DEBUG(tables,
DebugPrintGAGAMap(gagamap, nGAs));
static void
processSchedule(GlobalTaskId sender)
{
- nat nelem, space_required, nGAs;
+ nat nelem, nGAs;
rtsBool success;
static rtsPackBuffer *packBuffer;
StgClosure *newGraph;
packBuffer = gumPackBuffer; /* HWL */
unpackSchedule(&nelem, packBuffer);
+ IF_PAR_DEBUG(schedule,
+ belch("--__ Rcvd Schedule (%d elems)", nelem));
IF_PAR_DEBUG(packet,
- belch("--__ Rcvd Schedule (%d elems)", nelem);
PrintPacket(packBuffer));
/*
ASSERT(newGraph != NULL);
success = add_to_spark_queue(newGraph, &(MainRegTable.rSparks));
- IF_PAR_DEBUG(packet,
+ if (RtsFlags.ParFlags.ParStats.Full &&
+ RtsFlags.ParFlags.ParStats.Sparks &&
+ success)
+ DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
+ GR_STOLEN, ((StgTSO *)NULL), newGraph,
+ 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
+
+ IF_PAR_DEBUG(schedule,
if (success)
- belch("--^^ added spark to unpacked graph %p; %d sparks available on [%x]",
- newGraph, spark_queue_len(&(MainRegTable.rSparks)), mytid);
+ belch("--^^ added spark to unpacked graph %p (%s); %d sparks available on [%x] (%s)",
+ newGraph, info_type(newGraph), spark_queue_len(&(MainRegTable.rSparks)), mytid);
else
- belch("--^^ received non-sparkable closure %p; nothing added to spark pool; %d sparks available on [%x]",
- newGraph, spark_queue_len(&(MainRegTable.rSparks)), mytid);
+ belch("--^^ received non-sparkable closure %p (%s); nothing added to spark pool; %d sparks available on [%x]",
+ newGraph, info_type(newGraph), spark_queue_len(&(MainRegTable.rSparks)), mytid));
+ IF_PAR_DEBUG(packet,
belch("*< Unpacked graph with root at %p (%s):",
newGraph, info_type(newGraph));
PrintGraph(newGraph, 0));
IF_PAR_DEBUG(tables,
DebugPrintGAGAMap(gagamap, nGAs));
- if (nGAs > 0)
- sendAck(sender, nGAs, gagamap);
+ sendAck(sender, nGAs, gagamap);
//fishing = rtsFalse;
ASSERT(outstandingFishes>0);
ASSERT(get_itbl(old_closure)==RBH);
*/
if (get_itbl(old_closure)->type==RBH)
- convertToFetchMe(old_closure, ga);
+ convertToFetchMe((StgRBH *)old_closure, ga);
} else {
/*
* Oops...we've got this one already; update the RBH to
IF_DEBUG(sanity, checkLAGAtable(rtsFalse));
}
+#ifdef DIST
+
+void
+bounceReval(void) {
+ barf("Task %x: TODO: should send NACK in response to REVAL",mytid);
+}
+
+static void
+processReval(GlobalTaskId sender) //similar to schedule...
+{ nat nelem, space_required, nGAs;
+ static rtsPackBuffer *packBuffer;
+ StgClosure *newGraph;
+ globalAddr *gagamap;
+ StgTSO* tso;
+ globalAddr *ga;
+
+ packBuffer = gumPackBuffer; /* HWL */
+ unpackSchedule(&nelem, packBuffer); /* okay, since the structure is the same */
+
+ IF_PAR_DEBUG(packet,
+ belch("@;~) [%x] Rcvd Reval (%d elems)", mytid, nelem);
+ PrintPacket(packBuffer));
+
+ /*
+ space_required = packBuffer[0];
+ if (SAVE_Hp + space_required >= SAVE_HpLim) {
+ ReallyPerformThreadGC(space_required, rtsFalse);
+ SAVE_Hp -= space_required;
+ }
+ */
+
+ // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!
+ newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
+ ASSERT(newGraph != NULL);
+
+ IF_PAR_DEBUG(packet,
+ belch("@;~) Unpacked graph with root at %p (%s):",
+ newGraph, info_type(newGraph));
+ PrintGraph(newGraph, 0));
+
+ IF_PAR_DEBUG(tables,
+ DebugPrintGAGAMap(gagamap, nGAs));
+
+ IF_PAR_DEBUG(tables,
+ printLAGAtable();
+ DebugPrintGAGAMap(gagamap, nGAs));
+
+  // We don't send an Ack for the head: skip the first GA pair and send one pair fewer
+ ASSERT(nGAs>0);
+ sendAck(sender, nGAs-1, gagamap+2);
+
+ IF_PAR_DEBUG(verbose,
+ belch("@;~) About to create Reval thread on behalf of %x",
+ sender));
+
+ tso=createGenThread(RtsFlags.GcFlags.initialStkSize,newGraph);
+ tso->priority=RevalPriority;
+ tso->revalSlot=gagamap->payload.gc.slot;//record who sent the reval
+ tso->revalTid =gagamap->payload.gc.gtid;
+ scheduleThread(tso);
+ context_switch = 1; // switch at the earliest opportunity
+}
+#endif
+
+
//@node GUM Message Processor, Miscellaneous Functions, Message-Processing Functions, High Level Communications Routines
//@subsection GUM Message Processor
*/
//@cindex processMessages
-void
+rtsBool
processMessages(void)
{
rtsPacket packet;
OpCode opcode;
GlobalTaskId task;
-
+ rtsBool receivedFinish = rtsFalse;
+
do {
packet = GetPacket(); /* Get next message; block until one available */
getOpcodeAndSender(packet, &opcode, &task);
- switch (opcode) {
- case PP_FINISH:
- IF_PAR_DEBUG(verbose,
- belch("==== received FINISH [%p]", mytid));
- /* setting this global variables eventually terminates the main
- scheduling loop for this PE and causes a shut-down, sending
- PP_FINISH to SysMan */
- GlobalStopPending = rtsTrue;
- break;
-
- case PP_FETCH:
- processFetch();
- break;
-
- case PP_RESUME:
- processResume(task);
- break;
-
- case PP_ACK:
- processAck();
- break;
-
- case PP_FISH:
- processFish();
- break;
-
- case PP_FREE:
- processFree();
- break;
+ if (task==SysManTask) {
+ switch (opcode) {
+ case PP_PETIDS:
+ processPEtids();
+ break;
+
+ case PP_FINISH:
+ IF_PAR_DEBUG(verbose,
+ belch("==== received FINISH [%p]", mytid));
+ /* this boolean value is returned and propagated to the main
+ scheduling loop, thus shutting-down this PE */
+ receivedFinish = rtsTrue;
+ break;
+
+ default:
+ barf("Task %x: received unknown opcode %x from SysMan",mytid, opcode);
+ }
+ } else if (taskIDtoPE(task)==0) {
+      /* When a new PE joins, FISH & REVAL messages may potentially reach
+         PEs before they have been notified of the new PE's existence. The
+         only solution is to bounce/fail these messages back to the sender.
+         But we will worry about it once we start seeing these race
+         conditions! */
+ switch (opcode) {
+ case PP_FISH:
+ bounceFish();
+ break;
+#ifdef DIST
+ case PP_REVAL:
+ bounceReval();
+ break;
+#endif
+ case PP_PETIDS:
+ belch("Task %x: Ignoring PVM session opened by another SysMan %x",mytid,task);
+ break;
+
+ case PP_FINISH:
+ break;
+
+ default:
+ belch("Task %x: Ignoring opcode %x from unknown PE %x",mytid, opcode, task);
+ }
+ } else
+ switch (opcode) {
+ case PP_FETCH:
+ processFetch();
+ // Global statistics: count no. of fetches
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.rec_fetch_mess++;
+ }
+ break;
+
+ case PP_RESUME:
+ processResume(task);
+      // Global statistics: count no. of resume messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.rec_resume_mess++;
+ }
+ break;
+
+ case PP_ACK:
+ processAck();
+ break;
+
+ case PP_FISH:
+ processFish();
+      // Global statistics: count no. of fish messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.rec_fish_mess++;
+ }
+ break;
+
+ case PP_FREE:
+ processFree();
+ break;
- case PP_SCHEDULE:
- processSchedule(task);
- break;
-
- default:
- /* Anything we're not prepared to deal with. */
- barf("Task %x: Unexpected opcode %x from %x",
- mytid, opcode, task);
- } /* switch */
+ case PP_SCHEDULE:
+ processSchedule(task);
+      // Global statistics: count no. of schedule messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.rec_schedule_mess++;
+ }
+ break;
+
+#ifdef DIST
+ case PP_REVAL:
+ processReval(task);
+      // Global statistics: count no. of reval messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.rec_reval_mess++;
+ }
+ break;
+#endif
+
+ default:
+ /* Anything we're not prepared to deal with. */
+ barf("Task %x: Unexpected opcode %x from %x",
+ mytid, opcode, task);
+ } /* switch */
} while (PacketsWaiting()); /* While there are messages: process them */
+ return receivedFinish;
} /* processMessages */
//@node Miscellaneous Functions, Index, GUM Message Processor, High Level Communications Routines
switch (get_itbl(bh)->type) {
case BLACKHOLE:
bf->link = END_BQ_QUEUE;
- //((StgBlockingQueue *)bh)->header.info = &BLACKHOLE_BQ_info;
- SET_INFO(bh, &BLACKHOLE_BQ_info); // turn closure into a blocking queue
+ //((StgBlockingQueue *)bh)->header.info = &stg_BLACKHOLE_BQ_info;
+ SET_INFO(bh, &stg_BLACKHOLE_BQ_info); // turn closure into a blocking queue
((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
// put bh on the mutables list
(StgClosure *)bh, get_itbl((StgClosure *)bh),
info_type((StgClosure *)bh));
}
- IF_PAR_DEBUG(schedule,
+ IF_PAR_DEBUG(bq,
belch("##++ blockFetch: after block the BQ of %p (%s) is:",
bh, info_type(bh));
print_bq(bh));
/*
- blockThread is called from the main scheduler whenever tso returns with
+ @blockThread@ is called from the main scheduler whenever tso returns with
a ThreadBlocked return code; tso has already been added to a blocking
queue (that's done in the entry code of the closure, because it is a
cheap operation we have to do in any case); the main purpose of this
void
blockThread(StgTSO *tso)
{
- globalAddr *remote_ga;
+ globalAddr *remote_ga=NULL;
globalAddr *local_ga;
globalAddr fmbq_ga;
     and this point; if something (e.g. GC) happens in between, the whole
     thing will blow up.
The problem is that the ga field of the FETCH_ME has been overwritten
- with the head of the blocking (which is tso).
+ with the head of the blocking queue (which is tso).
*/
- //ASSERT(looks_like_ga((globalAddr *)tso->link));
- ASSERT(tso->link!=END_TSO_QUEUE && tso->link!=NULL);
- remote_ga = (globalAddr *)tso->link; // ((StgFetchMe *)tso->block_info.closure)->ga;
- tso->link = END_BQ_QUEUE;
+ ASSERT(looks_like_ga(&theGlobalFromGA));
+ // ASSERT(tso->link!=END_TSO_QUEUE && tso->link!=NULL);
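+  /* theGlobalFromGA is apparently set up by the FETCH_ME entry code before
+     returning to the scheduler, replacing the old hack of smuggling the GA
+     through tso->link */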
+ remote_ga = &theGlobalFromGA; //tso->link;
+ tso->link = (StgTSO*)END_BQ_QUEUE;
/* it was tso which turned node from FETCH_ME into FETCH_ME_BQ =>
we have to send a Fetch message here! */
if (RtsFlags.ParFlags.ParStats.Full) {
/* Note that CURRENT_TIME may perform an unsafe call */
- //rtsTime now = CURRENT_TIME; /* Now */
tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
tso->par.fetchcount++;
tso->par.blockedat = CURRENT_TIME;
/* we are about to send off a FETCH message, so dump a FETCH event */
DumpRawGranEvent(CURRENT_PROC,
taskIDtoPE(remote_ga->payload.gc.gtid),
- GR_FETCH, tso, tso->block_info.closure, 0);
+ GR_FETCH, tso, tso->block_info.closure, 0, 0);
}
/* Phil T. claims that this was a workaround for a hard-to-find
* bug, hence I'm leaving it out for now --SDM
/* Assign a brand-new global address to the newly created FMBQ */
local_ga = makeGlobal(tso->block_info.closure, rtsFalse);
splitWeight(&fmbq_ga, local_ga);
- ASSERT(fmbq_ga.weight == 1L << (BITS_IN(unsigned) - 1));
+ ASSERT(fmbq_ga.weight == 1U << (BITS_IN(unsigned) - 1));
sendFetch(remote_ga, &fmbq_ga, 0/*load*/);
+ // Global statistics: count no. of fetches
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.tot_fetch_mess++;
+ }
+
+ IF_DEBUG(sanity,
+ theGlobalFromGA.payload.gc.gtid = (GlobalTaskId)0);
break;
case BlockedOnGA_NoSend:
/* Fetch message has been sent already */
if (RtsFlags.ParFlags.ParStats.Full) {
/* Note that CURRENT_TIME may perform an unsafe call */
- //rtsTime now = CURRENT_TIME; /* Now */
tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
tso->par.blockcount++;
tso->par.blockedat = CURRENT_TIME;
/* dump a block event, because fetch has been sent already */
DumpRawGranEvent(CURRENT_PROC, thisPE,
- GR_BLOCK, tso, tso->block_info.closure, 0);
+ GR_BLOCK, tso, tso->block_info.closure, 0, 0);
}
break;
+ case BlockedOnMVar:
case BlockedOnBlackHole:
/* the closure must be a BLACKHOLE_BQ or an RBH; tso came in here via
BLACKHOLE(_BQ) or CAF_BLACKHOLE or RBH entry code */
- ASSERT(get_itbl(tso->block_info.closure)->type==BLACKHOLE_BQ ||
+ ASSERT(get_itbl(tso->block_info.closure)->type==MVAR ||
+ get_itbl(tso->block_info.closure)->type==BLACKHOLE_BQ ||
get_itbl(tso->block_info.closure)->type==RBH);
/* if collecting stats update the execution time etc */
if (RtsFlags.ParFlags.ParStats.Full) {
/* Note that CURRENT_TIME may perform an unsafe call */
- //rtsTime now = CURRENT_TIME; /* Now */
tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
tso->par.blockcount++;
tso->par.blockedat = CURRENT_TIME;
DumpRawGranEvent(CURRENT_PROC, thisPE,
- GR_BLOCK, tso, tso->block_info.closure, 0);
+ GR_BLOCK, tso, tso->block_info.closure, 0, 0);
}
break;
-
+
+ case BlockedOnDelay:
+    /* What sort of stats should we collect for an explicit threadDelay? */
+ IF_PAR_DEBUG(verbose,
+ belch("##++ blockThread: TSO %d blocked on ThreadDelay",
+ tso->id));
+ break;
+
+    /* Check that the following cases cannot happen here:
+ case BlockedOnException:
+ case BlockedOnRead:
+ case BlockedOnWrite:
+ */
default:
barf("blockThread: impossible why_blocked code %d for TSO %d",
tso->why_blocked, tso->id);
}
- IF_PAR_DEBUG(schedule,
- belch("##++ blockThread: TSO %d blocked on closure %p (%s)",
- tso->id, tso->block_info.closure, info_type(tso->block_info.closure)));
+ IF_PAR_DEBUG(verbose,
+ belch("##++ blockThread: TSO %d blocked on closure %p (%s); %s",
+ tso->id, tso->block_info.closure, info_type(tso->block_info.closure),
+ (tso->why_blocked==BlockedOnGA) ? "Sent FETCH for GA" : ""));
+
+ IF_PAR_DEBUG(bq,
+ print_bq(tso->block_info.closure));
}
/*
StgClosure *closure;
closure = GALAlookup(&ga);
- if ((bf = (StgBlockedFetch *)allocate(FIXED_HS + sizeofW(StgBlockedFetch))) == NULL) {
+ if ((bf = (StgBlockedFetch *)allocate(_HS + sizeofW(StgBlockedFetch))) == NULL) {
barf("createBlockedFetch: out of heap while allocating heap for a BlocekdFetch; ToDo: call GC here");
- GarbageCollect(GetRoots);
+ GarbageCollect(GetRoots, rtsFalse);
closure = GALAlookup(&ga);
- bf = (StgBlockedFetch *)allocate(FIXED_HS + sizeofW(StgBlockedFetch));
+ bf = (StgBlockedFetch *)allocate(_HS + sizeofW(StgBlockedFetch));
// ToDo: check whether really guaranteed to succeed 2nd time around
}
- ASSERT(bf != (StgClosure *)NULL);
- SET_INFO((StgClosure *)bf, &BLOCKED_FETCH_info);
+ ASSERT(bf != (StgBlockedFetch *)NULL);
+ SET_INFO((StgClosure *)bf, &stg_BLOCKED_FETCH_info);
// ToDo: check whether other header info is needed
bf->node = closure;
bf->ga.payload.gc.gtid = rga.payload.gc.gtid;
IF_PAR_DEBUG(schedule,
	       fprintf(stderr, "%%%%// created BF: bf=%p (%s), GA: ",
-		       bf, info_type(bf), closure);
+		       bf, info_type((StgClosure*)bf));
printGA(&(bf->ga));
fputc('\n',stderr));
- return bf;
+ return (StgClosure *)bf;
}
/*
{
do {
rtsPacket p = GetPacket();
- processUnexpected(p);
+ processUnexpectedMessage(p);
} while (rtsTrue);
}
{
nat i;
- for (i = 0; i < nGAs; ++i, gagamap += 2) {
+ for (i = 0; i < (nat)nGAs; ++i, gagamap += 2) {
ASSERT(looks_like_ga(gagamap));
ASSERT(looks_like_ga(gagamap+1));
}
"prepareFreeMsgBuffers (Buffer)");
for(i = 0; i < nPEs; i++)
- if (i != thisPE)
+ if (i != (thisPE-1))
freeMsgBuffer[i] = (StgPtr) stgMallocWords(RtsFlags.ParFlags.packBufferSize,
"prepareFreeMsgBuffers (Buffer #i)");
+ else
+ freeMsgBuffer[i] = 0;
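+  /* no free-message buffer for ourselves; thisPE is evidently 1-based
+     while the buffer array is 0-based, hence (thisPE-1) above */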
}
/* Initialize the freeMsg buffer pointers to point to the start of their
sendFree(allPEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
}
+/* synchronises with the other PEs. Receives and records in a global
+ * variable the task-id of SysMan. If this is the main thread (discovered
+ * in main.lc), identifies itself to SysMan. Finally it receives
+ * from SysMan an array of the Global Task Ids of each PE, which is
+ * returned as the value of the function.
+ */
+
+#if defined(PAR_TICKY)
+/* Has to see freeMsgIndex, so must be defined here not in ParTicky.c */
+//@cindex stats_CntFreeGA
+void
+stats_CntFreeGA (void) { // stats only
+
+ // Global statistics: residency of thread and spark pool
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ nat i, s;
+
+ globalParStats.cnt_free_GA++;
+    for (i = 0, s = 0; i < nPEs; i++) {
+      globalParStats.tot_free_GA += freeMsgIndex[i]/2;  // running total over all calls
+      s += freeMsgIndex[i]/2;                           // current no. of free GAs
+    }
+
+ if ( s > globalParStats.res_free_GA )
+ globalParStats.res_free_GA = s;
+ }
+}
+#endif /* PAR_TICKY */
+
#endif /* PAR -- whole file */
//@node Index, , Miscellaneous Functions, High Level Communications Routines