X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=rts%2FThreads.c;h=7344134a7d5a0d54e6a1d6bc60aa66c7c8277842;hp=3b209ea95b51ccc5b86c795be574f5b835153456;hb=83d563cb9ede0ba792836e529b1e2929db926355;hpb=a5288c551349a0adab0d931a429b10a096d9444d diff --git a/rts/Threads.c b/rts/Threads.c index 3b209ea..7344134 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -9,11 +9,16 @@ #include "PosixSource.h" #include "Rts.h" +#include "Capability.h" +#include "Updates.h" #include "Threads.h" #include "STM.h" #include "Schedule.h" #include "Trace.h" #include "ThreadLabels.h" +#include "Updates.h" +#include "Messages.h" +#include "sm/Storage.h" /* Next thread ID to allocate. * LOCK: sched_mutex @@ -63,7 +68,7 @@ createThread(Capability *cap, nat size) } size = round_to_mblocks(size); - tso = (StgTSO *)allocateLocal(cap, size); + tso = (StgTSO *)allocate(cap, size); stack_size = size - TSO_STRUCT_SIZEW; TICK_ALLOC_TSO(stack_size, 0); @@ -74,7 +79,9 @@ createThread(Capability *cap, nat size) tso->what_next = ThreadRunGHC; tso->why_blocked = NotBlocked; - tso->blocked_exceptions = END_TSO_QUEUE; + tso->block_info.closure = (StgClosure *)END_TSO_QUEUE; + tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE; + tso->bq = (StgBlockingQueue *)END_TSO_QUEUE; tso->flags = 0; tso->dirty = 1; @@ -102,12 +109,12 @@ createThread(Capability *cap, nat size) */ ACQUIRE_LOCK(&sched_mutex); tso->id = next_thread_id++; // while we have the mutex - tso->global_link = g0s0->threads; - g0s0->threads = tso; + tso->global_link = g0->threads; + g0->threads = tso; RELEASE_LOCK(&sched_mutex); // ToDo: report the stack size in the event? - traceSchedEvent (cap, EVENT_CREATE_THREAD, tso, tso->stack_size); + traceEventCreateThread(cap, tso); return tso; } @@ -146,7 +153,7 @@ rts_getThreadId(StgPtr tso) Fails fatally if the TSO is not on the queue. -------------------------------------------------------------------------- */ -void +rtsBool // returns True if we modified queue removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso) { StgTSO *t, *prev; @@ -156,28 +163,32 @@ removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso) if (t == tso) { if (prev) { setTSOLink(cap,prev,t->_link); + return rtsFalse; } else { *queue = t->_link; + return rtsTrue; } - return; } } barf("removeThreadFromQueue: not found"); } -void +rtsBool // returns True if we modified head or tail removeThreadFromDeQueue (Capability *cap, StgTSO **head, StgTSO **tail, StgTSO *tso) { StgTSO *t, *prev; + rtsBool flag = rtsFalse; prev = NULL; for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) { if (t == tso) { if (prev) { setTSOLink(cap,prev,t->_link); + flag = rtsFalse; } else { *head = t->_link; + flag = rtsTrue; } if (*tail == tso) { if (prev) { @@ -185,78 +196,109 @@ removeThreadFromDeQueue (Capability *cap, } else { *tail = END_TSO_QUEUE; } - } - return; + return rtsTrue; + } else { + return flag; + } } } barf("removeThreadFromMVarQueue: not found"); } -void -removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso) -{ - removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso); -} - /* ---------------------------------------------------------------------------- - unblockOne() + tryWakeupThread() + + Attempt to wake up a thread. tryWakeupThread is idempotent: it is + always safe to call it too many times, but it is not safe in + general to omit a call. - unblock a single thread. ------------------------------------------------------------------------- */ -StgTSO * -unblockOne (Capability *cap, StgTSO *tso) +void +tryWakeupThread (Capability *cap, StgTSO *tso) { - return unblockOne_(cap,tso,rtsTrue); // allow migration + tryWakeupThread_(cap, deRefTSO(tso)); } -StgTSO * -unblockOne_ (Capability *cap, StgTSO *tso, - rtsBool allow_migrate USED_IF_THREADS) +void +tryWakeupThread_ (Capability *cap, StgTSO *tso) { - StgTSO *next; + traceEventThreadWakeup (cap, tso, tso->cap->no); - // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO); - ASSERT(tso->why_blocked != NotBlocked); +#ifdef THREADED_RTS + if (tso->cap != cap) + { + MessageWakeup *msg; + msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup)); + SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM); + msg->tso = tso; + sendMessage(cap, tso->cap, (Message*)msg); + debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d", + (lnat)tso->id, tso->cap->no); + return; + } +#endif + + switch (tso->why_blocked) + { + case BlockedOnMVar: + { + if (tso->_link == END_TSO_QUEUE) { + tso->block_info.closure = (StgClosure*)END_TSO_QUEUE; + goto unblock; + } else { + return; + } + } - tso->why_blocked = NotBlocked; - next = tso->_link; - tso->_link = END_TSO_QUEUE; + case BlockedOnMsgThrowTo: + { + const StgInfoTable *i; + + i = lockClosure(tso->block_info.closure); + unlockClosure(tso->block_info.closure, i); + if (i != &stg_MSG_NULL_info) { + debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)", + (lnat)tso->id, tso->block_info.throwto->header.info); + return; + } -#if defined(THREADED_RTS) - if (tso->cap == cap || (!tsoLocked(tso) && - allow_migrate && - RtsFlags.ParFlags.wakeupMigrate)) { - // We are waking up this thread on the current Capability, which - // might involve migrating it from the Capability it was last on. - if (tso->bound) { - ASSERT(tso->bound->cap == tso->cap); - tso->bound->cap = cap; - } + // remove the block frame from the stack + ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info); + tso->sp += 3; + goto unblock; + } - tso->cap = cap; - appendToRunQueue(cap,tso); + case BlockedOnBlackHole: + case BlockedOnSTM: + case ThreadMigrating: + goto unblock; - // context-switch soonish so we can migrate the new thread if - // necessary. NB. not contextSwitchCapability(cap), which would - // force a context switch immediately. - cap->context_switch = 1; - } else { - // we'll try to wake it up on the Capability it was last on. - wakeupThreadOnCapability(cap, tso->cap, tso); - } -#else - appendToRunQueue(cap,tso); + default: + // otherwise, do nothing + return; + } - // context-switch soonish so we can migrate the new thread if - // necessary. NB. not contextSwitchCapability(cap), which would - // force a context switch immediately. - cap->context_switch = 1; -#endif +unblock: + // just run the thread now, if the BH is not really available, + // we'll block again. + tso->why_blocked = NotBlocked; + appendToRunQueue(cap,tso); +} - traceSchedEvent (cap, EVENT_THREAD_WAKEUP, tso, tso->cap->no); +/* ---------------------------------------------------------------------------- + migrateThread + ------------------------------------------------------------------------- */ - return next; +void +migrateThread (Capability *from, StgTSO *tso, Capability *to) +{ + traceEventMigrateThread (from, tso, to->no); + // ThreadMigrating tells the target cap that it needs to be added to + // the run queue when it receives the MSG_TRY_WAKEUP. + tso->why_blocked = ThreadMigrating; + tso->cap = to; + tryWakeupThread(from, tso); } /* ---------------------------------------------------------------------------- @@ -266,10 +308,117 @@ unblockOne_ (Capability *cap, StgTSO *tso, ------------------------------------------------------------------------- */ void -awakenBlockedQueue(Capability *cap, StgTSO *tso) +wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq) +{ + MessageBlackHole *msg; + const StgInfoTable *i; + + ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info || + bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info ); + + for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE; + msg = msg->link) { + i = msg->header.info; + if (i != &stg_IND_info) { + ASSERT(i == &stg_MSG_BLACKHOLE_info); + tryWakeupThread(cap,msg->tso); + } + } + + // overwrite the BQ with an indirection so it will be + // collected at the next GC. +#if defined(DEBUG) && !defined(THREADED_RTS) + // XXX FILL_SLOP, but not if THREADED_RTS because in that case + // another thread might be looking at this BLOCKING_QUEUE and + // checking the owner field at the same time. + bq->bh = 0; bq->queue = 0; bq->owner = 0; +#endif + OVERWRITE_INFO(bq, &stg_IND_info); +} + +// If we update a closure that we know we BLACKHOLE'd, and the closure +// no longer points to the current TSO as its owner, then there may be +// an orphaned BLOCKING_QUEUE closure with blocked threads attached to +// it. We therefore traverse the BLOCKING_QUEUEs attached to the +// current TSO to see if any can now be woken up. +void +checkBlockingQueues (Capability *cap, StgTSO *tso) +{ + StgBlockingQueue *bq, *next; + StgClosure *p; + + debugTraceCap(DEBUG_sched, cap, + "collision occurred; checking blocking queues for thread %ld", + (lnat)tso->id); + + for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) { + next = bq->link; + + if (bq->header.info == &stg_IND_info) { + // ToDo: could short it out right here, to avoid + // traversing this IND multiple times. + continue; + } + + p = bq->bh; + + if (p->header.info != &stg_BLACKHOLE_info || + ((StgInd *)p)->indirectee != (StgClosure*)bq) + { + wakeBlockingQueue(cap,bq); + } + } +} + +/* ---------------------------------------------------------------------------- + updateThunk + + Update a thunk with a value. In order to do this, we need to know + which TSO owns (or is evaluating) the thunk, in case we need to + awaken any threads that are blocked on it. + ------------------------------------------------------------------------- */ + +void +updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val) { - while (tso != END_TSO_QUEUE) { - tso = unblockOne(cap,tso); + StgClosure *v; + StgTSO *owner; + const StgInfoTable *i; + + i = thunk->header.info; + if (i != &stg_BLACKHOLE_info && + i != &stg_CAF_BLACKHOLE_info && + i != &__stg_EAGER_BLACKHOLE_info && + i != &stg_WHITEHOLE_info) { + updateWithIndirection(cap, thunk, val); + return; + } + + v = ((StgInd*)thunk)->indirectee; + + updateWithIndirection(cap, thunk, val); + + i = v->header.info; + if (i == &stg_TSO_info) { + owner = deRefTSO((StgTSO*)v); + if (owner != tso) { + checkBlockingQueues(cap, tso); + } + return; + } + + if (i != &stg_BLOCKING_QUEUE_CLEAN_info && + i != &stg_BLOCKING_QUEUE_DIRTY_info) { + checkBlockingQueues(cap, tso); + return; + } + + owner = deRefTSO(((StgBlockingQueue*)v)->owner); + + if (owner != tso) { + checkBlockingQueues(cap, tso); + } else { + wakeBlockingQueue(cap, (StgBlockingQueue*)v); } } @@ -327,21 +476,24 @@ printThreadBlockage(StgTSO *tso) case BlockedOnMVar: debugBelch("is blocked on an MVar @ %p", tso->block_info.closure); break; - case BlockedOnException: - debugBelch("is blocked on delivering an exception to thread %lu", - (unsigned long)tso->block_info.tso->id); - break; case BlockedOnBlackHole: - debugBelch("is blocked on a black hole"); + debugBelch("is blocked on a black hole %p", + ((StgBlockingQueue*)tso->block_info.bh->bh)); + break; + case BlockedOnMsgThrowTo: + debugBelch("is blocked on a throwto message"); break; case NotBlocked: debugBelch("is not blocked"); break; + case ThreadMigrating: + debugBelch("is runnable, but not on the run queue"); + break; case BlockedOnCCall: debugBelch("is blocked on an external call"); break; - case BlockedOnCCall_NoUnblockExc: - debugBelch("is blocked on an external call (exceptions were already blocked)"); + case BlockedOnCCall_Interruptible: + debugBelch("is blocked on an external call (but may be interrupted)"); break; case BlockedOnSTM: debugBelch("is blocked on an STM operation"); @@ -387,24 +539,10 @@ void printAllThreads(void) { StgTSO *t, *next; - nat i, s; + nat i, g; Capability *cap; -# if defined(GRAN) - char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN]; - ullong_format_string(TIME_ON_PROC(CurrentProc), - time_string, rtsFalse/*no commas!*/); - - debugBelch("all threads at [%s]:\n", time_string); -# elif defined(PARALLEL_HASKELL) - char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN]; - ullong_format_string(CURRENT_TIME, - time_string, rtsFalse/*no commas!*/); - - debugBelch("all threads at [%s]:\n", time_string); -# else debugBelch("all threads:\n"); -# endif for (i = 0; i < n_capabilities; i++) { cap = &capabilities[i]; @@ -415,8 +553,8 @@ printAllThreads(void) } debugBelch("other threads:\n"); - for (s = 0; s < total_steps; s++) { - for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) { + for (g = 0; g < RtsFlags.GcFlags.generations; g++) { + for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) { if (t->why_blocked != NotBlocked) { printThreadStatus(t); }