#include "PosixSource.h"
#include "Rts.h"
-#include "SchedAPI.h"
-#include "Storage.h"
+
+#include "Capability.h"
+#include "Updates.h"
#include "Threads.h"
-#include "RtsFlags.h"
#include "STM.h"
#include "Schedule.h"
#include "Trace.h"
#include "ThreadLabels.h"
+#include "Updates.h"
+#include "Messages.h"
+#include "sm/Storage.h"
/* Next thread ID to allocate.
* LOCK: sched_mutex
currently pri (priority) is only used in a GRAN setup -- HWL
------------------------------------------------------------------------ */
-#if defined(GRAN)
-/* currently pri (priority) is only used in a GRAN setup -- HWL */
-StgTSO *
-createThread(nat size, StgInt pri)
-#else
StgTSO *
createThread(Capability *cap, nat size)
-#endif
{
StgTSO *tso;
nat stack_size;
/* sched_mutex is *not* required */
/* First check whether we should create a thread at all */
-#if defined(PARALLEL_HASKELL)
- /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
- if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
- threadsIgnored++;
- debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
- RtsFlags.ParFlags.maxThreads, advisory_thread_count);
- return END_TSO_QUEUE;
- }
- threadsCreated++;
-#endif
-
-#if defined(GRAN)
- ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
-#endif
// ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
}
+ size = round_to_mblocks(size);
+ tso = (StgTSO *)allocate(cap, size);
+
stack_size = size - TSO_STRUCT_SIZEW;
-
- tso = (StgTSO *)allocateLocal(cap, size);
TICK_ALLOC_TSO(stack_size, 0);
SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
-#if defined(GRAN)
- SET_GRAN_HDR(tso, ThisPE);
-#endif
// Always start with the compiled code evaluator
tso->what_next = ThreadRunGHC;
tso->why_blocked = NotBlocked;
- tso->blocked_exceptions = END_TSO_QUEUE;
- tso->flags = TSO_DIRTY;
+ tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
+ tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
+ tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
+ tso->flags = 0;
+ tso->dirty = 1;
tso->saved_errno = 0;
tso->bound = NULL;
SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
tso->_link = END_TSO_QUEUE;
- // ToDo: check this
-#if defined(GRAN)
- /* uses more flexible routine in GranSim */
- insertThread(tso, CurrentProc);
-#else
- /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
- * from its creation
- */
-#endif
-
-#if defined(GRAN)
- if (RtsFlags.GranFlags.GranSimStats.Full)
- DumpGranEvent(GR_START,tso);
-#elif defined(PARALLEL_HASKELL)
- if (RtsFlags.ParFlags.ParStats.Full)
- DumpGranEvent(GR_STARTQ,tso);
- /* HACk to avoid SCHEDULE
- LastTSO = tso; */
-#endif
-
/* Link the new thread on the global thread list.
*/
ACQUIRE_LOCK(&sched_mutex);
tso->id = next_thread_id++; // while we have the mutex
- tso->global_link = g0s0->threads;
- g0s0->threads = tso;
+ tso->global_link = g0->threads;
+ g0->threads = tso;
RELEASE_LOCK(&sched_mutex);
-#if defined(DIST)
- tso->dist.priority = MandatoryPriority; //by default that is...
-#endif
-
-#if defined(GRAN)
- tso->gran.pri = pri;
-# if defined(DEBUG)
- tso->gran.magic = TSO_MAGIC; // debugging only
-# endif
- tso->gran.sparkname = 0;
- tso->gran.startedat = CURRENT_TIME;
- tso->gran.exported = 0;
- tso->gran.basicblocks = 0;
- tso->gran.allocs = 0;
- tso->gran.exectime = 0;
- tso->gran.fetchtime = 0;
- tso->gran.fetchcount = 0;
- tso->gran.blocktime = 0;
- tso->gran.blockcount = 0;
- tso->gran.blockedat = 0;
- tso->gran.globalsparks = 0;
- tso->gran.localsparks = 0;
- if (RtsFlags.GranFlags.Light)
- tso->gran.clock = Now; /* local clock */
- else
- tso->gran.clock = 0;
-
- IF_DEBUG(gran,printTSO(tso));
-#elif defined(PARALLEL_HASKELL)
-# if defined(DEBUG)
- tso->par.magic = TSO_MAGIC; // debugging only
-# endif
- tso->par.sparkname = 0;
- tso->par.startedat = CURRENT_TIME;
- tso->par.exported = 0;
- tso->par.basicblocks = 0;
- tso->par.allocs = 0;
- tso->par.exectime = 0;
- tso->par.fetchtime = 0;
- tso->par.fetchcount = 0;
- tso->par.blocktime = 0;
- tso->par.blockcount = 0;
- tso->par.blockedat = 0;
- tso->par.globalsparks = 0;
- tso->par.localsparks = 0;
-#endif
-
-#if defined(GRAN)
- globalGranStats.tot_threads_created++;
- globalGranStats.threads_created_on_PE[CurrentProc]++;
- globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
- globalGranStats.tot_sq_probes++;
-#elif defined(PARALLEL_HASKELL)
- // collect parallel global statistics (currently done together with GC stats)
- if (RtsFlags.ParFlags.ParStats.Global &&
- RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
- //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
- globalParStats.tot_threads_created++;
- }
-#endif
-
-#if defined(GRAN)
- debugTrace(GRAN_DEBUG_pri,
- "==__ schedule: Created TSO %d (%p);",
- CurrentProc, tso, tso->id);
-#elif defined(PARALLEL_HASKELL)
- debugTrace(PAR_DEBUG_verbose,
- "==__ schedule: Created TSO %d (%p); %d threads active",
- (long)tso->id, tso, advisory_thread_count);
-#else
- debugTrace(DEBUG_sched,
- "created thread %ld, stack size = %lx words",
- (long)tso->id, (long)tso->stack_size);
-#endif
- return tso;
-}
+ // ToDo: report the stack size in the event?
+ traceEventCreateThread(cap, tso);
-#if defined(PAR)
-/* RFP:
- all parallel thread creation calls should fall through the following routine.
-*/
-StgTSO *
-createThreadFromSpark(rtsSpark spark)
-{ StgTSO *tso;
- ASSERT(spark != (rtsSpark)NULL);
-// JB: TAKE CARE OF THIS COUNTER! BUGGY
- if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
- { threadsIgnored++;
- barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
- RtsFlags.ParFlags.maxThreads, advisory_thread_count);
- return END_TSO_QUEUE;
- }
- else
- { threadsCreated++;
- tso = createThread(RtsFlags.GcFlags.initialStkSize);
- if (tso==END_TSO_QUEUE)
- barf("createSparkThread: Cannot create TSO");
-#if defined(DIST)
- tso->priority = AdvisoryPriority;
-#endif
- pushClosure(tso,spark);
- addToRunQueue(tso);
- advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
- }
- return tso;
+ return tso;
}
-#endif
/* ---------------------------------------------------------------------------
* Comparing Thread ids.
Fails fatally if the TSO is not on the queue.
-------------------------------------------------------------------------- */
-void
+rtsBool // returns True if we modified queue
removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
{
StgTSO *t, *prev;
if (t == tso) {
if (prev) {
setTSOLink(cap,prev,t->_link);
+ return rtsFalse;
} else {
*queue = t->_link;
+ return rtsTrue;
}
- return;
}
}
barf("removeThreadFromQueue: not found");
}
-void
+rtsBool // returns True if we modified head or tail
removeThreadFromDeQueue (Capability *cap,
StgTSO **head, StgTSO **tail, StgTSO *tso)
{
StgTSO *t, *prev;
+ rtsBool flag = rtsFalse;
prev = NULL;
for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
if (t == tso) {
if (prev) {
setTSOLink(cap,prev,t->_link);
+ flag = rtsFalse;
} else {
*head = t->_link;
+ flag = rtsTrue;
}
if (*tail == tso) {
if (prev) {
} else {
*tail = END_TSO_QUEUE;
}
- }
- return;
+ return rtsTrue;
+ } else {
+ return flag;
+ }
}
}
barf("removeThreadFromMVarQueue: not found");
}
-void
-removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
-{
- removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
-}
-
/* ----------------------------------------------------------------------------
- unblockOne()
+ tryWakeupThread()
+
+ Attempt to wake up a thread. tryWakeupThread is idempotent: it is
+ always safe to call it too many times, but it is not safe in
+ general to omit a call.
- unblock a single thread.
------------------------------------------------------------------------- */
-#if defined(GRAN)
-STATIC_INLINE void
-unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
+void
+tryWakeupThread (Capability *cap, StgTSO *tso)
{
+ tryWakeupThread_(cap, deRefTSO(tso));
}
-#elif defined(PARALLEL_HASKELL)
-STATIC_INLINE void
-unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
+
+void
+tryWakeupThread_ (Capability *cap, StgTSO *tso)
{
- /* write RESUME events to log file and
- update blocked and fetch time (depending on type of the orig closure) */
- if (RtsFlags.ParFlags.ParStats.Full) {
- DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
- GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
- 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
- if (emptyRunQueue())
- emitSchedule = rtsTrue;
-
- switch (get_itbl(node)->type) {
- case FETCH_ME_BQ:
- ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
- break;
- case RBH:
- case FETCH_ME:
- case BLACKHOLE_BQ:
- ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
- break;
-#ifdef DIST
- case MVAR:
- break;
-#endif
- default:
- barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
- }
- }
-}
+ traceEventThreadWakeup (cap, tso, tso->cap->no);
+
+#ifdef THREADED_RTS
+ if (tso->cap != cap)
+ {
+ MessageWakeup *msg;
+ msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
+ SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
+ msg->tso = tso;
+ sendMessage(cap, tso->cap, (Message*)msg);
+ debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
+ (lnat)tso->id, tso->cap->no);
+ return;
+ }
#endif
-#if defined(GRAN)
-StgBlockingQueueElement *
-unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
-{
- StgTSO *tso;
- PEs node_loc, tso_loc;
-
- node_loc = where_is(node); // should be lifted out of loop
- tso = (StgTSO *)bqe; // wastes an assignment to get the type right
- tso_loc = where_is((StgClosure *)tso);
- if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
- /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
- ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
- CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
- // insertThread(tso, node_loc);
- new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
- ResumeThread,
- tso, node, (rtsSpark*)NULL);
- tso->link = END_TSO_QUEUE; // overwrite link just to be sure
- // len_local++;
- // len++;
- } else { // TSO is remote (actually should be FMBQ)
- CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
- RtsFlags.GranFlags.Costs.gunblocktime +
- RtsFlags.GranFlags.Costs.latency;
- new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
- UnblockThread,
- tso, node, (rtsSpark*)NULL);
- tso->link = END_TSO_QUEUE; // overwrite link just to be sure
- // len++;
+ switch (tso->why_blocked)
+ {
+ case BlockedOnMVar:
+ {
+ if (tso->_link == END_TSO_QUEUE) {
+ tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
+ goto unblock;
+ } else {
+ return;
+ }
}
- /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
- IF_GRAN_DEBUG(bq,
- debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
- (node_loc==tso_loc ? "Local" : "Global"),
- tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
- tso->block_info.closure = NULL;
- debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)",
- tso->id, tso));
-}
-#elif defined(PARALLEL_HASKELL)
-StgBlockingQueueElement *
-unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
-{
- StgBlockingQueueElement *next;
-
- switch (get_itbl(bqe)->type) {
- case TSO:
- ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
- /* if it's a TSO just push it onto the run_queue */
- next = bqe->link;
- ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
- APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
- threadRunnable();
- unblockCount(bqe, node);
- /* reset blocking status after dumping event */
- ((StgTSO *)bqe)->why_blocked = NotBlocked;
- break;
-
- case BLOCKED_FETCH:
- /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
- next = bqe->link;
- bqe->link = (StgBlockingQueueElement *)PendingFetches;
- PendingFetches = (StgBlockedFetch *)bqe;
- break;
-
-# if defined(DEBUG)
- /* can ignore this case in a non-debugging setup;
- see comments on RBHSave closures above */
- case CONSTR:
- /* check that the closure is an RBHSave closure */
- ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
- get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
- get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
- break;
+
+ case BlockedOnMsgThrowTo:
+ {
+ const StgInfoTable *i;
+
+ i = lockClosure(tso->block_info.closure);
+ unlockClosure(tso->block_info.closure, i);
+ if (i != &stg_MSG_NULL_info) {
+ debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
+ (lnat)tso->id, tso->block_info.throwto->header.info);
+ return;
+ }
+
+ // remove the block frame from the stack
+ ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
+ tso->sp += 3;
+ goto unblock;
+ }
+
+ case BlockedOnBlackHole:
+ case BlockedOnSTM:
+ case ThreadMigrating:
+ goto unblock;
default:
- barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
- get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
- (StgClosure *)bqe);
-# endif
+ // otherwise, do nothing
+ return;
}
- IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
- return next;
+
+unblock:
+ // just run the thread now, if the BH is not really available,
+ // we'll block again.
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap,tso);
}
-#endif
-StgTSO *
-unblockOne (Capability *cap, StgTSO *tso)
+/* ----------------------------------------------------------------------------
+ migrateThread
+ ------------------------------------------------------------------------- */
+
+void
+migrateThread (Capability *from, StgTSO *tso, Capability *to)
{
- return unblockOne_(cap,tso,rtsTrue); // allow migration
+ traceEventMigrateThread (from, tso, to->no);
+ // ThreadMigrating tells the target cap that it needs to be added to
+ // the run queue when it receives the MSG_TRY_WAKEUP.
+ tso->why_blocked = ThreadMigrating;
+ tso->cap = to;
+ tryWakeupThread(from, tso);
}
-StgTSO *
-unblockOne_ (Capability *cap, StgTSO *tso,
- rtsBool allow_migrate USED_IF_THREADS)
-{
- StgTSO *next;
+/* ----------------------------------------------------------------------------
+ awakenBlockedQueue
- // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
- ASSERT(tso->why_blocked != NotBlocked);
+ wakes up all the threads on the specified queue.
+ ------------------------------------------------------------------------- */
- tso->why_blocked = NotBlocked;
- next = tso->_link;
- tso->_link = END_TSO_QUEUE;
+void
+wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
+{
+ MessageBlackHole *msg;
+ const StgInfoTable *i;
+
+ ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info ||
+ bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info );
+
+ for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE;
+ msg = msg->link) {
+ i = msg->header.info;
+ if (i != &stg_IND_info) {
+ ASSERT(i == &stg_MSG_BLACKHOLE_info);
+ tryWakeupThread(cap,msg->tso);
+ }
+ }
-#if defined(THREADED_RTS)
- if (tso->cap == cap || (!tsoLocked(tso) &&
- allow_migrate &&
- RtsFlags.ParFlags.wakeupMigrate)) {
- // We are waking up this thread on the current Capability, which
- // might involve migrating it from the Capability it was last on.
- if (tso->bound) {
- ASSERT(tso->bound->cap == tso->cap);
- tso->bound->cap = cap;
- }
- tso->cap = cap;
- appendToRunQueue(cap,tso);
- // we're holding a newly woken thread, make sure we context switch
- // quickly so we can migrate it if necessary.
- context_switch = 1;
- } else {
- // we'll try to wake it up on the Capability it was last on.
- wakeupThreadOnCapability(cap, tso->cap, tso);
- }
-#else
- appendToRunQueue(cap,tso);
- context_switch = 1;
+ // overwrite the BQ with an indirection so it will be
+ // collected at the next GC.
+#if defined(DEBUG) && !defined(THREADED_RTS)
+ // XXX FILL_SLOP, but not if THREADED_RTS because in that case
+ // another thread might be looking at this BLOCKING_QUEUE and
+ // checking the owner field at the same time.
+ bq->bh = 0; bq->queue = 0; bq->owner = 0;
#endif
+ OVERWRITE_INFO(bq, &stg_IND_info);
+}
- debugTrace(DEBUG_sched,
- "waking up thread %ld on cap %d",
- (long)tso->id, tso->cap->no);
+// If we update a closure that we know we BLACKHOLE'd, and the closure
+// no longer points to the current TSO as its owner, then there may be
+// an orphaned BLOCKING_QUEUE closure with blocked threads attached to
+// it. We therefore traverse the BLOCKING_QUEUEs attached to the
+// current TSO to see if any can now be woken up.
+void
+checkBlockingQueues (Capability *cap, StgTSO *tso)
+{
+ StgBlockingQueue *bq, *next;
+ StgClosure *p;
- return next;
+ debugTraceCap(DEBUG_sched, cap,
+ "collision occurred; checking blocking queues for thread %ld",
+ (lnat)tso->id);
+
+ for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
+ next = bq->link;
+
+ if (bq->header.info == &stg_IND_info) {
+ // ToDo: could short it out right here, to avoid
+ // traversing this IND multiple times.
+ continue;
+ }
+
+ p = bq->bh;
+
+ if (p->header.info != &stg_BLACKHOLE_info ||
+ ((StgInd *)p)->indirectee != (StgClosure*)bq)
+ {
+ wakeBlockingQueue(cap,bq);
+ }
+ }
}
/* ----------------------------------------------------------------------------
- awakenBlockedQueue
+ updateThunk
- wakes up all the threads on the specified queue.
+ Update a thunk with a value. In order to do this, we need to know
+ which TSO owns (or is evaluating) the thunk, in case we need to
+ awaken any threads that are blocked on it.
------------------------------------------------------------------------- */
-#if defined(GRAN)
void
-awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
+updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
{
- StgBlockingQueueElement *bqe;
- PEs node_loc;
- nat len = 0;
-
- IF_GRAN_DEBUG(bq,
- debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
- node, CurrentProc, CurrentTime[CurrentProc],
- CurrentTSO->id, CurrentTSO));
-
- node_loc = where_is(node);
-
- ASSERT(q == END_BQ_QUEUE ||
- get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
- get_itbl(q)->type == CONSTR); // closure (type constructor)
- ASSERT(is_unique(node));
-
- /* FAKE FETCH: magically copy the node to the tso's proc;
- no Fetch necessary because in reality the node should not have been
- moved to the other PE in the first place
- */
- if (CurrentProc!=node_loc) {
- IF_GRAN_DEBUG(bq,
- debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
- node, node_loc, CurrentProc, CurrentTSO->id,
- // CurrentTSO, where_is(CurrentTSO),
- node->header.gran.procs));
- node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
- IF_GRAN_DEBUG(bq,
- debugBelch("## new bitmask of node %p is %#x\n",
- node, node->header.gran.procs));
- if (RtsFlags.GranFlags.GranSimStats.Global) {
- globalGranStats.tot_fake_fetches++;
+ StgClosure *v;
+ StgTSO *owner;
+ const StgInfoTable *i;
+
+ i = thunk->header.info;
+ if (i != &stg_BLACKHOLE_info &&
+ i != &stg_CAF_BLACKHOLE_info &&
+ i != &__stg_EAGER_BLACKHOLE_info &&
+ i != &stg_WHITEHOLE_info) {
+ updateWithIndirection(cap, thunk, val);
+ return;
}
- }
+
+ v = ((StgInd*)thunk)->indirectee;
- bqe = q;
- // ToDo: check: ASSERT(CurrentProc==node_loc);
- while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
- //next = bqe->link;
- /*
- bqe points to the current element in the queue
- next points to the next element in the queue
- */
- //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
- //tso_loc = where_is(tso);
- len++;
- bqe = unblockOne(bqe, node);
- }
+ updateWithIndirection(cap, thunk, val);
- /* if this is the BQ of an RBH, we have to put back the info ripped out of
- the closure to make room for the anchor of the BQ */
- if (bqe!=END_BQ_QUEUE) {
- ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
- /*
- ASSERT((info_ptr==&RBH_Save_0_info) ||
- (info_ptr==&RBH_Save_1_info) ||
- (info_ptr==&RBH_Save_2_info));
- */
- /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
- ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
- ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
-
- IF_GRAN_DEBUG(bq,
- debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
- node, info_type(node)));
- }
+ i = v->header.info;
+ if (i == &stg_TSO_info) {
+ owner = deRefTSO((StgTSO*)v);
+ if (owner != tso) {
+ checkBlockingQueues(cap, tso);
+ }
+ return;
+ }
- /* statistics gathering */
- if (RtsFlags.GranFlags.GranSimStats.Global) {
- // globalGranStats.tot_bq_processing_time += bq_processing_time;
- globalGranStats.tot_bq_len += len; // total length of all bqs awakened
- // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
- globalGranStats.tot_awbq++; // total no. of bqs awakened
- }
- IF_GRAN_DEBUG(bq,
- debugBelch("## BQ Stats of %p: [%d entries] %s\n",
- node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
-}
-#elif defined(PARALLEL_HASKELL)
-void
-awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
-{
- StgBlockingQueueElement *bqe;
-
- IF_PAR_DEBUG(verbose,
- debugBelch("##-_ AwBQ for node %p on [%x]: \n",
- node, mytid));
-#ifdef DIST
- //RFP
- if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
- IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
- return;
- }
-#endif
-
- ASSERT(q == END_BQ_QUEUE ||
- get_itbl(q)->type == TSO ||
- get_itbl(q)->type == BLOCKED_FETCH ||
- get_itbl(q)->type == CONSTR);
-
- bqe = q;
- while (get_itbl(bqe)->type==TSO ||
- get_itbl(bqe)->type==BLOCKED_FETCH) {
- bqe = unblockOne(bqe, node);
- }
-}
+ if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
+ i != &stg_BLOCKING_QUEUE_DIRTY_info) {
+ checkBlockingQueues(cap, tso);
+ return;
+ }
-#else /* !GRAN && !PARALLEL_HASKELL */
+ owner = deRefTSO(((StgBlockingQueue*)v)->owner);
-void
-awakenBlockedQueue(Capability *cap, StgTSO *tso)
-{
- while (tso != END_TSO_QUEUE) {
- tso = unblockOne(cap,tso);
+ if (owner != tso) {
+ checkBlockingQueues(cap, tso);
+ } else {
+ wakeBlockingQueue(cap, (StgBlockingQueue*)v);
}
}
-#endif
-
/* ---------------------------------------------------------------------------
* rtsSupportsBoundThreads(): is the RTS built to support bound threads?
break;
#if defined(mingw32_HOST_OS)
case BlockedOnDoProc:
- debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
+ debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
break;
#endif
case BlockedOnDelay:
case BlockedOnMVar:
debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
break;
- case BlockedOnException:
- debugBelch("is blocked on delivering an exception to thread %lu",
- (unsigned long)tso->block_info.tso->id);
- break;
case BlockedOnBlackHole:
- debugBelch("is blocked on a black hole");
+ debugBelch("is blocked on a black hole %p",
+ ((StgBlockingQueue*)tso->block_info.bh->bh));
+ break;
+ case BlockedOnMsgThrowTo:
+ debugBelch("is blocked on a throwto message");
break;
case NotBlocked:
debugBelch("is not blocked");
break;
-#if defined(PARALLEL_HASKELL)
- case BlockedOnGA:
- debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
- tso->block_info.closure, info_type(tso->block_info.closure));
+ case ThreadMigrating:
+ debugBelch("is runnable, but not on the run queue");
break;
- case BlockedOnGA_NoSend:
- debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
- tso->block_info.closure, info_type(tso->block_info.closure));
- break;
-#endif
case BlockedOnCCall:
debugBelch("is blocked on an external call");
break;
}
}
+
void
printThreadStatus(StgTSO *t)
{
default:
printThreadBlockage(t);
}
+ if (t->dirty) {
+ debugBelch(" (TSO_DIRTY)");
+ } else if (t->flags & TSO_LINK_DIRTY) {
+ debugBelch(" (TSO_LINK_DIRTY)");
+ }
debugBelch("\n");
}
}
printAllThreads(void)
{
StgTSO *t, *next;
- nat i, s;
+ nat i, g;
Capability *cap;
-# if defined(GRAN)
- char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
- ullong_format_string(TIME_ON_PROC(CurrentProc),
- time_string, rtsFalse/*no commas!*/);
-
- debugBelch("all threads at [%s]:\n", time_string);
-# elif defined(PARALLEL_HASKELL)
- char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
- ullong_format_string(CURRENT_TIME,
- time_string, rtsFalse/*no commas!*/);
-
- debugBelch("all threads at [%s]:\n", time_string);
-# else
debugBelch("all threads:\n");
-# endif
for (i = 0; i < n_capabilities; i++) {
cap = &capabilities[i];
}
debugBelch("other threads:\n");
- for (s = 0; s < total_steps; s++) {
- for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
+ for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+ for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
if (t->why_blocked != NotBlocked) {
printThreadStatus(t);
}
debugBelch("%d threads on queue\n", i);
}
-/*
- Print a whole blocking queue attached to node (debugging only).
-*/
-# if defined(PARALLEL_HASKELL)
-void
-print_bq (StgClosure *node)
-{
- StgBlockingQueueElement *bqe;
- StgTSO *tso;
- rtsBool end;
-
- debugBelch("## BQ of closure %p (%s): ",
- node, info_type(node));
-
- /* should cover all closures that may have a blocking queue */
- ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
- get_itbl(node)->type == FETCH_ME_BQ ||
- get_itbl(node)->type == RBH ||
- get_itbl(node)->type == MVAR);
-
- ASSERT(node!=(StgClosure*)NULL); // sanity check
-
- print_bqe(((StgBlockingQueue*)node)->blocking_queue);
-}
-
-/*
- Print a whole blocking queue starting with the element bqe.
-*/
-void
-print_bqe (StgBlockingQueueElement *bqe)
-{
- rtsBool end;
-
- /*
- NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
- */
- for (end = (bqe==END_BQ_QUEUE);
- !end; // iterate until bqe points to a CONSTR
- end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
- bqe = end ? END_BQ_QUEUE : bqe->link) {
- ASSERT(bqe != END_BQ_QUEUE); // sanity check
- ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
- /* types of closures that may appear in a blocking queue */
- ASSERT(get_itbl(bqe)->type == TSO ||
- get_itbl(bqe)->type == BLOCKED_FETCH ||
- get_itbl(bqe)->type == CONSTR);
- /* only BQs of an RBH end with an RBH_Save closure */
- //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
-
- switch (get_itbl(bqe)->type) {
- case TSO:
- debugBelch(" TSO %u (%x),",
- ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
- break;
- case BLOCKED_FETCH:
- debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
- ((StgBlockedFetch *)bqe)->node,
- ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
- ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
- ((StgBlockedFetch *)bqe)->ga.weight);
- break;
- case CONSTR:
- debugBelch(" %s (IP %p),",
- (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
- get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
- get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
- "RBH_Save_?"), get_itbl(bqe));
- break;
- default:
- barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
- info_type((StgClosure *)bqe)); // , node, info_type(node));
- break;
- }
- } /* for */
- debugBelch("\n");
-}
-# elif defined(GRAN)
-void
-print_bq (StgClosure *node)
-{
- StgBlockingQueueElement *bqe;
- PEs node_loc, tso_loc;
- rtsBool end;
-
- /* should cover all closures that may have a blocking queue */
- ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
- get_itbl(node)->type == FETCH_ME_BQ ||
- get_itbl(node)->type == RBH);
-
- ASSERT(node!=(StgClosure*)NULL); // sanity check
- node_loc = where_is(node);
-
- debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
- node, info_type(node), node_loc);
-
- /*
- NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
- */
- for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
- !end; // iterate until bqe points to a CONSTR
- end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
- ASSERT(bqe != END_BQ_QUEUE); // sanity check
- ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
- /* types of closures that may appear in a blocking queue */
- ASSERT(get_itbl(bqe)->type == TSO ||
- get_itbl(bqe)->type == CONSTR);
- /* only BQs of an RBH end with an RBH_Save closure */
- ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
-
- tso_loc = where_is((StgClosure *)bqe);
- switch (get_itbl(bqe)->type) {
- case TSO:
- debugBelch(" TSO %d (%p) on [PE %d],",
- ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
- break;
- case CONSTR:
- debugBelch(" %s (IP %p),",
- (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
- get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
- get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
- "RBH_Save_?"), get_itbl(bqe));
- break;
- default:
- barf("Unexpected closure type %s in blocking queue of %p (%s)",
- info_type((StgClosure *)bqe), node, info_type(node));
- break;
- }
- } /* for */
- debugBelch("\n");
-}
-# endif
-
-#if defined(PARALLEL_HASKELL)
-nat
-run_queue_len(void)
-{
- nat i;
- StgTSO *tso;
-
- for (i=0, tso=run_queue_hd;
- tso != END_TSO_QUEUE;
- i++, tso=tso->link) {
- /* nothing */
- }
-
- return i;
-}
-#endif
-
#endif /* DEBUG */