X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=rts%2FThreads.c;h=7344134a7d5a0d54e6a1d6bc60aa66c7c8277842;hp=b7f62c8f07fad55734c6446fe3ebf42d63e4e8b6;hb=83d563cb9ede0ba792836e529b1e2929db926355;hpb=200c73fdfea734765c48309cc8dcbcf44b69c8c5 diff --git a/rts/Threads.c b/rts/Threads.c index b7f62c8..7344134 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -8,14 +8,17 @@ #include "PosixSource.h" #include "Rts.h" -#include "SchedAPI.h" -#include "Storage.h" + +#include "Capability.h" +#include "Updates.h" #include "Threads.h" -#include "RtsFlags.h" #include "STM.h" #include "Schedule.h" #include "Trace.h" #include "ThreadLabels.h" +#include "Updates.h" +#include "Messages.h" +#include "sm/Storage.h" /* Next thread ID to allocate. * LOCK: sched_mutex @@ -47,14 +50,8 @@ static StgThreadID next_thread_id = 1; currently pri (priority) is only used in a GRAN setup -- HWL ------------------------------------------------------------------------ */ -#if defined(GRAN) -/* currently pri (priority) is only used in a GRAN setup -- HWL */ -StgTSO * -createThread(nat size, StgInt pri) -#else StgTSO * createThread(Capability *cap, nat size) -#endif { StgTSO *tso; nat stack_size; @@ -62,20 +59,6 @@ createThread(Capability *cap, nat size) /* sched_mutex is *not* required */ /* First check whether we should create a thread at all */ -#if defined(PARALLEL_HASKELL) - /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */ - if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) { - threadsIgnored++; - debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n", - RtsFlags.ParFlags.maxThreads, advisory_thread_count); - return END_TSO_QUEUE; - } - threadsCreated++; -#endif - -#if defined(GRAN) - ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0); -#endif // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW @@ -84,22 +67,23 @@ createThread(Capability *cap, nat size) size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW; } + size = round_to_mblocks(size); + tso = (StgTSO *)allocate(cap, size); + stack_size = size - TSO_STRUCT_SIZEW; - - tso = (StgTSO *)allocateLocal(cap, size); TICK_ALLOC_TSO(stack_size, 0); SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM); -#if defined(GRAN) - SET_GRAN_HDR(tso, ThisPE); -#endif // Always start with the compiled code evaluator tso->what_next = ThreadRunGHC; tso->why_blocked = NotBlocked; - tso->blocked_exceptions = END_TSO_QUEUE; - tso->flags = TSO_DIRTY; + tso->block_info.closure = (StgClosure *)END_TSO_QUEUE; + tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE; + tso->bq = (StgBlockingQueue *)END_TSO_QUEUE; + tso->flags = 0; + tso->dirty = 1; tso->saved_errno = 0; tso->bound = NULL; @@ -121,141 +105,19 @@ createThread(Capability *cap, nat size) SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM); tso->_link = END_TSO_QUEUE; - // ToDo: check this -#if defined(GRAN) - /* uses more flexible routine in GranSim */ - insertThread(tso, CurrentProc); -#else - /* In a non-GranSim setup the pushing of a TSO onto the runq is separated - * from its creation - */ -#endif - -#if defined(GRAN) - if (RtsFlags.GranFlags.GranSimStats.Full) - DumpGranEvent(GR_START,tso); -#elif defined(PARALLEL_HASKELL) - if (RtsFlags.ParFlags.ParStats.Full) - DumpGranEvent(GR_STARTQ,tso); - /* HACk to avoid SCHEDULE - LastTSO = tso; */ -#endif - /* Link the new thread on the global thread list. */ ACQUIRE_LOCK(&sched_mutex); tso->id = next_thread_id++; // while we have the mutex - tso->global_link = g0s0->threads; - g0s0->threads = tso; + tso->global_link = g0->threads; + g0->threads = tso; RELEASE_LOCK(&sched_mutex); -#if defined(DIST) - tso->dist.priority = MandatoryPriority; //by default that is... -#endif - -#if defined(GRAN) - tso->gran.pri = pri; -# if defined(DEBUG) - tso->gran.magic = TSO_MAGIC; // debugging only -# endif - tso->gran.sparkname = 0; - tso->gran.startedat = CURRENT_TIME; - tso->gran.exported = 0; - tso->gran.basicblocks = 0; - tso->gran.allocs = 0; - tso->gran.exectime = 0; - tso->gran.fetchtime = 0; - tso->gran.fetchcount = 0; - tso->gran.blocktime = 0; - tso->gran.blockcount = 0; - tso->gran.blockedat = 0; - tso->gran.globalsparks = 0; - tso->gran.localsparks = 0; - if (RtsFlags.GranFlags.Light) - tso->gran.clock = Now; /* local clock */ - else - tso->gran.clock = 0; - - IF_DEBUG(gran,printTSO(tso)); -#elif defined(PARALLEL_HASKELL) -# if defined(DEBUG) - tso->par.magic = TSO_MAGIC; // debugging only -# endif - tso->par.sparkname = 0; - tso->par.startedat = CURRENT_TIME; - tso->par.exported = 0; - tso->par.basicblocks = 0; - tso->par.allocs = 0; - tso->par.exectime = 0; - tso->par.fetchtime = 0; - tso->par.fetchcount = 0; - tso->par.blocktime = 0; - tso->par.blockcount = 0; - tso->par.blockedat = 0; - tso->par.globalsparks = 0; - tso->par.localsparks = 0; -#endif - -#if defined(GRAN) - globalGranStats.tot_threads_created++; - globalGranStats.threads_created_on_PE[CurrentProc]++; - globalGranStats.tot_sq_len += spark_queue_len(CurrentProc); - globalGranStats.tot_sq_probes++; -#elif defined(PARALLEL_HASKELL) - // collect parallel global statistics (currently done together with GC stats) - if (RtsFlags.ParFlags.ParStats.Global && - RtsFlags.GcFlags.giveStats > NO_GC_STATS) { - //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); - globalParStats.tot_threads_created++; - } -#endif - -#if defined(GRAN) - debugTrace(GRAN_DEBUG_pri, - "==__ schedule: Created TSO %d (%p);", - CurrentProc, tso, tso->id); -#elif defined(PARALLEL_HASKELL) - debugTrace(PAR_DEBUG_verbose, - "==__ schedule: Created TSO %d (%p); %d threads active", - (long)tso->id, tso, advisory_thread_count); -#else - debugTrace(DEBUG_sched, - "created thread %ld, stack size = %lx words", - (long)tso->id, (long)tso->stack_size); -#endif - return tso; -} + // ToDo: report the stack size in the event? + traceEventCreateThread(cap, tso); -#if defined(PAR) -/* RFP: - all parallel thread creation calls should fall through the following routine. -*/ -StgTSO * -createThreadFromSpark(rtsSpark spark) -{ StgTSO *tso; - ASSERT(spark != (rtsSpark)NULL); -// JB: TAKE CARE OF THIS COUNTER! BUGGY - if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) - { threadsIgnored++; - barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)", - RtsFlags.ParFlags.maxThreads, advisory_thread_count); - return END_TSO_QUEUE; - } - else - { threadsCreated++; - tso = createThread(RtsFlags.GcFlags.initialStkSize); - if (tso==END_TSO_QUEUE) - barf("createSparkThread: Cannot create TSO"); -#if defined(DIST) - tso->priority = AdvisoryPriority; -#endif - pushClosure(tso,spark); - addToRunQueue(tso); - advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY - } - return tso; + return tso; } -#endif /* --------------------------------------------------------------------------- * Comparing Thread ids. @@ -291,7 +153,7 @@ rts_getThreadId(StgPtr tso) Fails fatally if the TSO is not on the queue. -------------------------------------------------------------------------- */ -void +rtsBool // returns True if we modified queue removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso) { StgTSO *t, *prev; @@ -301,28 +163,32 @@ removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso) if (t == tso) { if (prev) { setTSOLink(cap,prev,t->_link); + return rtsFalse; } else { *queue = t->_link; + return rtsTrue; } - return; } } barf("removeThreadFromQueue: not found"); } -void +rtsBool // returns True if we modified head or tail removeThreadFromDeQueue (Capability *cap, StgTSO **head, StgTSO **tail, StgTSO *tso) { StgTSO *t, *prev; + rtsBool flag = rtsFalse; prev = NULL; for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) { if (t == tso) { if (prev) { setTSOLink(cap,prev,t->_link); + flag = rtsFalse; } else { *head = t->_link; + flag = rtsTrue; } if (*tail == tso) { if (prev) { @@ -330,328 +196,231 @@ removeThreadFromDeQueue (Capability *cap, } else { *tail = END_TSO_QUEUE; } - } - return; + return rtsTrue; + } else { + return flag; + } } } barf("removeThreadFromMVarQueue: not found"); } -void -removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso) -{ - removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso); -} - /* ---------------------------------------------------------------------------- - unblockOne() + tryWakeupThread() + + Attempt to wake up a thread. tryWakeupThread is idempotent: it is + always safe to call it too many times, but it is not safe in + general to omit a call. - unblock a single thread. ------------------------------------------------------------------------- */ -#if defined(GRAN) -STATIC_INLINE void -unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node ) +void +tryWakeupThread (Capability *cap, StgTSO *tso) { + tryWakeupThread_(cap, deRefTSO(tso)); } -#elif defined(PARALLEL_HASKELL) -STATIC_INLINE void -unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node ) + +void +tryWakeupThread_ (Capability *cap, StgTSO *tso) { - /* write RESUME events to log file and - update blocked and fetch time (depending on type of the orig closure) */ - if (RtsFlags.ParFlags.ParStats.Full) { - DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, - GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure, - 0, 0 /* spark_queue_len(ADVISORY_POOL) */); - if (emptyRunQueue()) - emitSchedule = rtsTrue; - - switch (get_itbl(node)->type) { - case FETCH_ME_BQ: - ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat; - break; - case RBH: - case FETCH_ME: - case BLACKHOLE_BQ: - ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat; - break; -#ifdef DIST - case MVAR: - break; -#endif - default: - barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue"); - } - } -} + traceEventThreadWakeup (cap, tso, tso->cap->no); + +#ifdef THREADED_RTS + if (tso->cap != cap) + { + MessageWakeup *msg; + msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup)); + SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM); + msg->tso = tso; + sendMessage(cap, tso->cap, (Message*)msg); + debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d", + (lnat)tso->id, tso->cap->no); + return; + } #endif -#if defined(GRAN) -StgBlockingQueueElement * -unblockOne(StgBlockingQueueElement *bqe, StgClosure *node) -{ - StgTSO *tso; - PEs node_loc, tso_loc; - - node_loc = where_is(node); // should be lifted out of loop - tso = (StgTSO *)bqe; // wastes an assignment to get the type right - tso_loc = where_is((StgClosure *)tso); - if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local - /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */ - ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc); - CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime; - // insertThread(tso, node_loc); - new_event(tso_loc, tso_loc, CurrentTime[CurrentProc], - ResumeThread, - tso, node, (rtsSpark*)NULL); - tso->link = END_TSO_QUEUE; // overwrite link just to be sure - // len_local++; - // len++; - } else { // TSO is remote (actually should be FMBQ) - CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime + - RtsFlags.GranFlags.Costs.gunblocktime + - RtsFlags.GranFlags.Costs.latency; - new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc], - UnblockThread, - tso, node, (rtsSpark*)NULL); - tso->link = END_TSO_QUEUE; // overwrite link just to be sure - // len++; + switch (tso->why_blocked) + { + case BlockedOnMVar: + { + if (tso->_link == END_TSO_QUEUE) { + tso->block_info.closure = (StgClosure*)END_TSO_QUEUE; + goto unblock; + } else { + return; + } } - /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */ - IF_GRAN_DEBUG(bq, - debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,", - (node_loc==tso_loc ? "Local" : "Global"), - tso->id, tso, CurrentProc, tso->block_info.closure, tso->link)); - tso->block_info.closure = NULL; - debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)", - tso->id, tso)); -} -#elif defined(PARALLEL_HASKELL) -StgBlockingQueueElement * -unblockOne(StgBlockingQueueElement *bqe, StgClosure *node) -{ - StgBlockingQueueElement *next; - - switch (get_itbl(bqe)->type) { - case TSO: - ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked); - /* if it's a TSO just push it onto the run_queue */ - next = bqe->link; - ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging? - APPEND_TO_RUN_QUEUE((StgTSO *)bqe); - threadRunnable(); - unblockCount(bqe, node); - /* reset blocking status after dumping event */ - ((StgTSO *)bqe)->why_blocked = NotBlocked; - break; - - case BLOCKED_FETCH: - /* if it's a BLOCKED_FETCH put it on the PendingFetches list */ - next = bqe->link; - bqe->link = (StgBlockingQueueElement *)PendingFetches; - PendingFetches = (StgBlockedFetch *)bqe; - break; - -# if defined(DEBUG) - /* can ignore this case in a non-debugging setup; - see comments on RBHSave closures above */ - case CONSTR: - /* check that the closure is an RBHSave closure */ - ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info || - get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info || - get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info); - break; + + case BlockedOnMsgThrowTo: + { + const StgInfoTable *i; + + i = lockClosure(tso->block_info.closure); + unlockClosure(tso->block_info.closure, i); + if (i != &stg_MSG_NULL_info) { + debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)", + (lnat)tso->id, tso->block_info.throwto->header.info); + return; + } + + // remove the block frame from the stack + ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info); + tso->sp += 3; + goto unblock; + } + + case BlockedOnBlackHole: + case BlockedOnSTM: + case ThreadMigrating: + goto unblock; default: - barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n", - get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), - (StgClosure *)bqe); -# endif + // otherwise, do nothing + return; } - IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe))); - return next; + +unblock: + // just run the thread now, if the BH is not really available, + // we'll block again. + tso->why_blocked = NotBlocked; + appendToRunQueue(cap,tso); } -#endif -StgTSO * -unblockOne (Capability *cap, StgTSO *tso) +/* ---------------------------------------------------------------------------- + migrateThread + ------------------------------------------------------------------------- */ + +void +migrateThread (Capability *from, StgTSO *tso, Capability *to) { - return unblockOne_(cap,tso,rtsTrue); // allow migration + traceEventMigrateThread (from, tso, to->no); + // ThreadMigrating tells the target cap that it needs to be added to + // the run queue when it receives the MSG_TRY_WAKEUP. + tso->why_blocked = ThreadMigrating; + tso->cap = to; + tryWakeupThread(from, tso); } -StgTSO * -unblockOne_ (Capability *cap, StgTSO *tso, - rtsBool allow_migrate USED_IF_THREADS) -{ - StgTSO *next; +/* ---------------------------------------------------------------------------- + awakenBlockedQueue - // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO); - ASSERT(tso->why_blocked != NotBlocked); + wakes up all the threads on the specified queue. + ------------------------------------------------------------------------- */ - tso->why_blocked = NotBlocked; - next = tso->_link; - tso->_link = END_TSO_QUEUE; +void +wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq) +{ + MessageBlackHole *msg; + const StgInfoTable *i; + + ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info || + bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info ); + + for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE; + msg = msg->link) { + i = msg->header.info; + if (i != &stg_IND_info) { + ASSERT(i == &stg_MSG_BLACKHOLE_info); + tryWakeupThread(cap,msg->tso); + } + } -#if defined(THREADED_RTS) - if (tso->cap == cap || (!tsoLocked(tso) && - allow_migrate && - RtsFlags.ParFlags.wakeupMigrate)) { - // We are waking up this thread on the current Capability, which - // might involve migrating it from the Capability it was last on. - if (tso->bound) { - ASSERT(tso->bound->cap == tso->cap); - tso->bound->cap = cap; - } - tso->cap = cap; - appendToRunQueue(cap,tso); - // we're holding a newly woken thread, make sure we context switch - // quickly so we can migrate it if necessary. - context_switch = 1; - } else { - // we'll try to wake it up on the Capability it was last on. - wakeupThreadOnCapability_lock(tso->cap, tso); - } -#else - appendToRunQueue(cap,tso); - context_switch = 1; + // overwrite the BQ with an indirection so it will be + // collected at the next GC. +#if defined(DEBUG) && !defined(THREADED_RTS) + // XXX FILL_SLOP, but not if THREADED_RTS because in that case + // another thread might be looking at this BLOCKING_QUEUE and + // checking the owner field at the same time. + bq->bh = 0; bq->queue = 0; bq->owner = 0; #endif + OVERWRITE_INFO(bq, &stg_IND_info); +} - debugTrace(DEBUG_sched, - "waking up thread %ld on cap %d", - (long)tso->id, tso->cap->no); +// If we update a closure that we know we BLACKHOLE'd, and the closure +// no longer points to the current TSO as its owner, then there may be +// an orphaned BLOCKING_QUEUE closure with blocked threads attached to +// it. We therefore traverse the BLOCKING_QUEUEs attached to the +// current TSO to see if any can now be woken up. +void +checkBlockingQueues (Capability *cap, StgTSO *tso) +{ + StgBlockingQueue *bq, *next; + StgClosure *p; - return next; + debugTraceCap(DEBUG_sched, cap, + "collision occurred; checking blocking queues for thread %ld", + (lnat)tso->id); + + for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) { + next = bq->link; + + if (bq->header.info == &stg_IND_info) { + // ToDo: could short it out right here, to avoid + // traversing this IND multiple times. + continue; + } + + p = bq->bh; + + if (p->header.info != &stg_BLACKHOLE_info || + ((StgInd *)p)->indirectee != (StgClosure*)bq) + { + wakeBlockingQueue(cap,bq); + } + } } /* ---------------------------------------------------------------------------- - awakenBlockedQueue + updateThunk - wakes up all the threads on the specified queue. + Update a thunk with a value. In order to do this, we need to know + which TSO owns (or is evaluating) the thunk, in case we need to + awaken any threads that are blocked on it. ------------------------------------------------------------------------- */ -#if defined(GRAN) void -awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node) +updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val) { - StgBlockingQueueElement *bqe; - PEs node_loc; - nat len = 0; - - IF_GRAN_DEBUG(bq, - debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \ - node, CurrentProc, CurrentTime[CurrentProc], - CurrentTSO->id, CurrentTSO)); - - node_loc = where_is(node); - - ASSERT(q == END_BQ_QUEUE || - get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave - get_itbl(q)->type == CONSTR); // closure (type constructor) - ASSERT(is_unique(node)); - - /* FAKE FETCH: magically copy the node to the tso's proc; - no Fetch necessary because in reality the node should not have been - moved to the other PE in the first place - */ - if (CurrentProc!=node_loc) { - IF_GRAN_DEBUG(bq, - debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n", - node, node_loc, CurrentProc, CurrentTSO->id, - // CurrentTSO, where_is(CurrentTSO), - node->header.gran.procs)); - node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc); - IF_GRAN_DEBUG(bq, - debugBelch("## new bitmask of node %p is %#x\n", - node, node->header.gran.procs)); - if (RtsFlags.GranFlags.GranSimStats.Global) { - globalGranStats.tot_fake_fetches++; + StgClosure *v; + StgTSO *owner; + const StgInfoTable *i; + + i = thunk->header.info; + if (i != &stg_BLACKHOLE_info && + i != &stg_CAF_BLACKHOLE_info && + i != &__stg_EAGER_BLACKHOLE_info && + i != &stg_WHITEHOLE_info) { + updateWithIndirection(cap, thunk, val); + return; } - } + + v = ((StgInd*)thunk)->indirectee; - bqe = q; - // ToDo: check: ASSERT(CurrentProc==node_loc); - while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) { - //next = bqe->link; - /* - bqe points to the current element in the queue - next points to the next element in the queue - */ - //tso = (StgTSO *)bqe; // wastes an assignment to get the type right - //tso_loc = where_is(tso); - len++; - bqe = unblockOne(bqe, node); - } + updateWithIndirection(cap, thunk, val); - /* if this is the BQ of an RBH, we have to put back the info ripped out of - the closure to make room for the anchor of the BQ */ - if (bqe!=END_BQ_QUEUE) { - ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR); - /* - ASSERT((info_ptr==&RBH_Save_0_info) || - (info_ptr==&RBH_Save_1_info) || - (info_ptr==&RBH_Save_2_info)); - */ - /* cf. convertToRBH in RBH.c for writing the RBHSave closure */ - ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0]; - ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1]; - - IF_GRAN_DEBUG(bq, - debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n", - node, info_type(node))); - } + i = v->header.info; + if (i == &stg_TSO_info) { + owner = deRefTSO((StgTSO*)v); + if (owner != tso) { + checkBlockingQueues(cap, tso); + } + return; + } - /* statistics gathering */ - if (RtsFlags.GranFlags.GranSimStats.Global) { - // globalGranStats.tot_bq_processing_time += bq_processing_time; - globalGranStats.tot_bq_len += len; // total length of all bqs awakened - // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only - globalGranStats.tot_awbq++; // total no. of bqs awakened - } - IF_GRAN_DEBUG(bq, - debugBelch("## BQ Stats of %p: [%d entries] %s\n", - node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : "")); -} -#elif defined(PARALLEL_HASKELL) -void -awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node) -{ - StgBlockingQueueElement *bqe; - - IF_PAR_DEBUG(verbose, - debugBelch("##-_ AwBQ for node %p on [%x]: \n", - node, mytid)); -#ifdef DIST - //RFP - if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) { - IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n")); - return; - } -#endif - - ASSERT(q == END_BQ_QUEUE || - get_itbl(q)->type == TSO || - get_itbl(q)->type == BLOCKED_FETCH || - get_itbl(q)->type == CONSTR); - - bqe = q; - while (get_itbl(bqe)->type==TSO || - get_itbl(bqe)->type==BLOCKED_FETCH) { - bqe = unblockOne(bqe, node); - } -} + if (i != &stg_BLOCKING_QUEUE_CLEAN_info && + i != &stg_BLOCKING_QUEUE_DIRTY_info) { + checkBlockingQueues(cap, tso); + return; + } -#else /* !GRAN && !PARALLEL_HASKELL */ + owner = deRefTSO(((StgBlockingQueue*)v)->owner); -void -awakenBlockedQueue(Capability *cap, StgTSO *tso) -{ - while (tso != END_TSO_QUEUE) { - tso = unblockOne(cap,tso); + if (owner != tso) { + checkBlockingQueues(cap, tso); + } else { + wakeBlockingQueue(cap, (StgBlockingQueue*)v); } } -#endif - /* --------------------------------------------------------------------------- * rtsSupportsBoundThreads(): is the RTS built to support bound threads? @@ -698,7 +467,7 @@ printThreadBlockage(StgTSO *tso) break; #if defined(mingw32_HOST_OS) case BlockedOnDoProc: - debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID); + debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID); break; #endif case BlockedOnDelay: @@ -707,31 +476,24 @@ printThreadBlockage(StgTSO *tso) case BlockedOnMVar: debugBelch("is blocked on an MVar @ %p", tso->block_info.closure); break; - case BlockedOnException: - debugBelch("is blocked on delivering an exception to thread %lu", - (unsigned long)tso->block_info.tso->id); - break; case BlockedOnBlackHole: - debugBelch("is blocked on a black hole"); + debugBelch("is blocked on a black hole %p", + ((StgBlockingQueue*)tso->block_info.bh->bh)); + break; + case BlockedOnMsgThrowTo: + debugBelch("is blocked on a throwto message"); break; case NotBlocked: debugBelch("is not blocked"); break; -#if defined(PARALLEL_HASKELL) - case BlockedOnGA: - debugBelch("is blocked on global address; local FM_BQ is %p (%s)", - tso->block_info.closure, info_type(tso->block_info.closure)); + case ThreadMigrating: + debugBelch("is runnable, but not on the run queue"); break; - case BlockedOnGA_NoSend: - debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)", - tso->block_info.closure, info_type(tso->block_info.closure)); - break; -#endif case BlockedOnCCall: debugBelch("is blocked on an external call"); break; - case BlockedOnCCall_NoUnblockExc: - debugBelch("is blocked on an external call (exceptions were already blocked)"); + case BlockedOnCCall_Interruptible: + debugBelch("is blocked on an external call (but may be interrupted)"); break; case BlockedOnSTM: debugBelch("is blocked on an STM operation"); @@ -742,6 +504,7 @@ printThreadBlockage(StgTSO *tso) } } + void printThreadStatus(StgTSO *t) { @@ -763,6 +526,11 @@ printThreadStatus(StgTSO *t) default: printThreadBlockage(t); } + if (t->dirty) { + debugBelch(" (TSO_DIRTY)"); + } else if (t->flags & TSO_LINK_DIRTY) { + debugBelch(" (TSO_LINK_DIRTY)"); + } debugBelch("\n"); } } @@ -771,24 +539,10 @@ void printAllThreads(void) { StgTSO *t, *next; - nat i, s; + nat i, g; Capability *cap; -# if defined(GRAN) - char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN]; - ullong_format_string(TIME_ON_PROC(CurrentProc), - time_string, rtsFalse/*no commas!*/); - - debugBelch("all threads at [%s]:\n", time_string); -# elif defined(PARALLEL_HASKELL) - char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN]; - ullong_format_string(CURRENT_TIME, - time_string, rtsFalse/*no commas!*/); - - debugBelch("all threads at [%s]:\n", time_string); -# else debugBelch("all threads:\n"); -# endif for (i = 0; i < n_capabilities; i++) { cap = &capabilities[i]; @@ -799,8 +553,8 @@ printAllThreads(void) } debugBelch("other threads:\n"); - for (s = 0; s < total_steps; s++) { - for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) { + for (g = 0; g < RtsFlags.GcFlags.generations; g++) { + for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) { if (t->why_blocked != NotBlocked) { printThreadStatus(t); } @@ -825,153 +579,4 @@ printThreadQueue(StgTSO *t) debugBelch("%d threads on queue\n", i); } -/* - Print a whole blocking queue attached to node (debugging only). -*/ -# if defined(PARALLEL_HASKELL) -void -print_bq (StgClosure *node) -{ - StgBlockingQueueElement *bqe; - StgTSO *tso; - rtsBool end; - - debugBelch("## BQ of closure %p (%s): ", - node, info_type(node)); - - /* should cover all closures that may have a blocking queue */ - ASSERT(get_itbl(node)->type == BLACKHOLE_BQ || - get_itbl(node)->type == FETCH_ME_BQ || - get_itbl(node)->type == RBH || - get_itbl(node)->type == MVAR); - - ASSERT(node!=(StgClosure*)NULL); // sanity check - - print_bqe(((StgBlockingQueue*)node)->blocking_queue); -} - -/* - Print a whole blocking queue starting with the element bqe. -*/ -void -print_bqe (StgBlockingQueueElement *bqe) -{ - rtsBool end; - - /* - NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure; - */ - for (end = (bqe==END_BQ_QUEUE); - !end; // iterate until bqe points to a CONSTR - end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), - bqe = end ? END_BQ_QUEUE : bqe->link) { - ASSERT(bqe != END_BQ_QUEUE); // sanity check - ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check - /* types of closures that may appear in a blocking queue */ - ASSERT(get_itbl(bqe)->type == TSO || - get_itbl(bqe)->type == BLOCKED_FETCH || - get_itbl(bqe)->type == CONSTR); - /* only BQs of an RBH end with an RBH_Save closure */ - //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH); - - switch (get_itbl(bqe)->type) { - case TSO: - debugBelch(" TSO %u (%x),", - ((StgTSO *)bqe)->id, ((StgTSO *)bqe)); - break; - case BLOCKED_FETCH: - debugBelch(" BF (node=%p, ga=((%x, %d, %x)),", - ((StgBlockedFetch *)bqe)->node, - ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid, - ((StgBlockedFetch *)bqe)->ga.payload.gc.slot, - ((StgBlockedFetch *)bqe)->ga.weight); - break; - case CONSTR: - debugBelch(" %s (IP %p),", - (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" : - get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" : - get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" : - "RBH_Save_?"), get_itbl(bqe)); - break; - default: - barf("Unexpected closure type %s in blocking queue", // of %p (%s)", - info_type((StgClosure *)bqe)); // , node, info_type(node)); - break; - } - } /* for */ - debugBelch("\n"); -} -# elif defined(GRAN) -void -print_bq (StgClosure *node) -{ - StgBlockingQueueElement *bqe; - PEs node_loc, tso_loc; - rtsBool end; - - /* should cover all closures that may have a blocking queue */ - ASSERT(get_itbl(node)->type == BLACKHOLE_BQ || - get_itbl(node)->type == FETCH_ME_BQ || - get_itbl(node)->type == RBH); - - ASSERT(node!=(StgClosure*)NULL); // sanity check - node_loc = where_is(node); - - debugBelch("## BQ of closure %p (%s) on [PE %d]: ", - node, info_type(node), node_loc); - - /* - NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure; - */ - for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE); - !end; // iterate until bqe points to a CONSTR - end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) { - ASSERT(bqe != END_BQ_QUEUE); // sanity check - ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check - /* types of closures that may appear in a blocking queue */ - ASSERT(get_itbl(bqe)->type == TSO || - get_itbl(bqe)->type == CONSTR); - /* only BQs of an RBH end with an RBH_Save closure */ - ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH); - - tso_loc = where_is((StgClosure *)bqe); - switch (get_itbl(bqe)->type) { - case TSO: - debugBelch(" TSO %d (%p) on [PE %d],", - ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc); - break; - case CONSTR: - debugBelch(" %s (IP %p),", - (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" : - get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" : - get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" : - "RBH_Save_?"), get_itbl(bqe)); - break; - default: - barf("Unexpected closure type %s in blocking queue of %p (%s)", - info_type((StgClosure *)bqe), node, info_type(node)); - break; - } - } /* for */ - debugBelch("\n"); -} -# endif - -#if defined(PARALLEL_HASKELL) -nat -run_queue_len(void) -{ - nat i; - StgTSO *tso; - - for (i=0, tso=run_queue_hd; - tso != END_TSO_QUEUE; - i++, tso=tso->link) { - /* nothing */ - } - - return i; -} -#endif - #endif /* DEBUG */