X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=rts%2FThreads.c;h=e86630e77eb49f4cd8ea369c14fef1d01739cb83;hp=0c3e5916659468ba64424e080248e7bb348b49f0;hb=4f37664780b85725ba3552b7de11c0e5e79d3fee;hpb=5d52d9b64c21dcf77849866584744722f8121389

diff --git a/rts/Threads.c b/rts/Threads.c
index 0c3e591..e86630e 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -18,8 +18,14 @@
 #include "ThreadLabels.h"
 #include "Updates.h"
 #include "Messages.h"
+#include "RaiseAsync.h"
+#include "Prelude.h"
+#include "Printer.h"
+#include "sm/Sanity.h"
 #include "sm/Storage.h"
+#include <string.h>
+
 
 /* Next thread ID to allocate.
  * LOCK: sched_mutex
  */
@@ -54,57 +60,67 @@ StgTSO *
 createThread(Capability *cap, nat size)
 {
     StgTSO *tso;
+    StgStack *stack;
     nat stack_size;
 
     /* sched_mutex is *not* required */
 
-    /* First check whether we should create a thread at all */
-
-    // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
 
     /* catch ridiculously small stack sizes */
-    if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
-        size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
+    if (size < MIN_STACK_WORDS + sizeofW(StgStack)) {
+        size = MIN_STACK_WORDS + sizeofW(StgStack);
     }
 
-    size = round_to_mblocks(size);
-    tso = (StgTSO *)allocate(cap, size);
-
-    stack_size = size - TSO_STRUCT_SIZEW;
-    TICK_ALLOC_TSO(stack_size, 0);
-
+    /* The size argument we are given includes all the per-thread
+     * overheads:
+     *
+     *    - The TSO structure
+     *    - The STACK header
+     *
+     * This is so that we can use a nice round power of 2 for the
+     * default stack size (e.g. 1k), and if we're allocating lots of
+     * threads back-to-back they'll fit nicely in a block.  It's a bit
+     * of a benchmark hack, but it doesn't do any harm.
+     */
+    stack_size = round_to_mblocks(size - sizeofW(StgTSO));
+    stack = (StgStack *)allocate(cap, stack_size);
+    TICK_ALLOC_STACK(stack_size);
+    SET_HDR(stack, &stg_STACK_info, CCS_SYSTEM);
+    stack->stack_size   = stack_size - sizeofW(StgStack);
+    stack->sp           = stack->stack + stack->stack_size;
+    stack->dirty        = 1;
+
+    tso = (StgTSO *)allocate(cap, sizeofW(StgTSO));
+    TICK_ALLOC_TSO();
     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
 
     // Always start with the compiled code evaluator
     tso->what_next = ThreadRunGHC;
-
     tso->why_blocked  = NotBlocked;
     tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
     tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
     tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
     tso->flags = 0;
     tso->dirty = 1;
-    
+    tso->_link = END_TSO_QUEUE;
+
     tso->saved_errno = 0;
     tso->bound = NULL;
     tso->cap = cap;
-    tso->stack_size     = stack_size;
-    tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
-                          - TSO_STRUCT_SIZEW;
-    tso->sp             = (P_)&(tso->stack) + stack_size;
+    tso->stackobj       = stack;
+    tso->tot_stack_size = stack->stack_size;
 
     tso->trec = NO_TREC;
-    
+
 #ifdef PROFILING
     tso->prof.CCCS = CCS_MAIN;
 #endif
 
-    /* put a stop frame on the stack */
-    tso->sp -= sizeofW(StgStopFrame);
-    SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
-    tso->_link = END_TSO_QUEUE;
-    
+    // put a stop frame on the stack
+    stack->sp -= sizeofW(StgStopFrame);
+    SET_HDR((StgClosure*)stack->sp,
+            (StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
+
     /* Link the new thread on the global thread list.
      */
     ACQUIRE_LOCK(&sched_mutex);
 
@@ -163,9 +179,11 @@ removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
     if (t == tso) {
         if (prev) {
             setTSOLink(cap,prev,t->_link);
+            t->_link = END_TSO_QUEUE;
             return rtsFalse;
         } else {
             *queue = t->_link;
+            t->_link = END_TSO_QUEUE;
             return rtsTrue;
         }
     }
@@ -190,7 +208,8 @@ removeThreadFromDeQueue (Capability *cap,
             *head = t->_link;
             flag = rtsTrue;
         }
-        if (*tail == tso) {
+        t->_link = END_TSO_QUEUE;
+        if (*tail == tso) {
             if (prev) {
                 *tail = prev;
             } else {
@@ -205,83 +224,20 @@ removeThreadFromDeQueue (Capability *cap,
     barf("removeThreadFromMVarQueue: not found");
 }
 
-void
-removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
-{
-    // caller must do the write barrier, because replacing the info
-    // pointer will unlock the MVar.
-    removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
-    tso->_link = END_TSO_QUEUE;
-}
-
 /* ----------------------------------------------------------------------------
-   unblockOne()
-
-   unblock a single thread.
-   ------------------------------------------------------------------------- */
-
-StgTSO *
-unblockOne (Capability *cap, StgTSO *tso)
-{
-    return unblockOne_(cap,tso,rtsTrue); // allow migration
-}
-
-StgTSO *
-unblockOne_ (Capability *cap, StgTSO *tso,
-             rtsBool allow_migrate USED_IF_THREADS)
-{
-    StgTSO *next;
-
-    // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
-    ASSERT(tso->why_blocked != NotBlocked);
-    ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
-           tso->block_info.closure->header.info == &stg_IND_info);
-
-    next = tso->_link;
-    tso->_link = END_TSO_QUEUE;
-
-#if defined(THREADED_RTS)
-    if (tso->cap == cap || (!tsoLocked(tso) &&
-                            allow_migrate &&
-                            RtsFlags.ParFlags.wakeupMigrate)) {
-        // We are waking up this thread on the current Capability, which
-        // might involve migrating it from the Capability it was last on.
-        if (tso->bound) {
-            ASSERT(tso->bound->task->cap == tso->cap);
-            tso->bound->task->cap = cap;
-        }
-
-        tso->cap = cap;
-        write_barrier();
-        tso->why_blocked = NotBlocked;
-        appendToRunQueue(cap,tso);
-
-        // context-switch soonish so we can migrate the new thread if
-        // necessary.  NB. not contextSwitchCapability(cap), which would
-        // force a context switch immediately.
-        cap->context_switch = 1;
-    } else {
-        // we'll try to wake it up on the Capability it was last on.
-        wakeupThreadOnCapability(cap, tso->cap, tso);
-    }
-#else
-    tso->why_blocked = NotBlocked;
-    appendToRunQueue(cap,tso);
+   tryWakeupThread()
 
-    // context-switch soonish so we can migrate the new thread if
-    // necessary.  NB. not contextSwitchCapability(cap), which would
-    // force a context switch immediately.
-    cap->context_switch = 1;
-#endif
-
-    traceEventThreadWakeup (cap, tso, tso->cap->no);
+   Attempt to wake up a thread.  tryWakeupThread is idempotent: it is
+   always safe to call it too many times, but it is not safe in
+   general to omit a call.
-    return next;
-}
+   ------------------------------------------------------------------------- */
 
 void
 tryWakeupThread (Capability *cap, StgTSO *tso)
 {
+    traceEventThreadWakeup (cap, tso, tso->cap->no);
+
 #ifdef THREADED_RTS
     if (tso->cap != cap)
     {
@@ -290,25 +246,85 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
         SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
         msg->tso = tso;
         sendMessage(cap, tso->cap, (Message*)msg);
+        debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
+                      (lnat)tso->id, tso->cap->no);
         return;
     }
 #endif
 
     switch (tso->why_blocked) {
-    case BlockedOnBlackHole:
-    case BlockedOnSTM:
+    case BlockedOnMVar:
     {
-        // just run the thread now, if the BH is not really available,
-        // we'll block again.
-        tso->why_blocked = NotBlocked;
-        appendToRunQueue(cap,tso);
-        break;
+        if (tso->_link == END_TSO_QUEUE) {
+            tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
+            goto unblock;
+        } else {
+            return;
+        }
     }
+
+    case BlockedOnMsgThrowTo:
+    {
+        const StgInfoTable *i;
+
+        i = lockClosure(tso->block_info.closure);
+        unlockClosure(tso->block_info.closure, i);
+        if (i != &stg_MSG_NULL_info) {
+            debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
+                          (lnat)tso->id, tso->block_info.throwto->header.info);
+            return;
+        }
+
+        // remove the block frame from the stack
+        ASSERT(tso->stackobj->sp[0] == (StgWord)&stg_block_throwto_info);
+        tso->stackobj->sp += 3;
+        goto unblock;
+    }
+
+    case BlockedOnBlackHole:
+    case BlockedOnSTM:
+    case ThreadMigrating:
+        goto unblock;
+
     default:
         // otherwise, do nothing
-        break;
+        return;
     }
+
+unblock:
+    // just run the thread now, if the BH is not really available,
+    // we'll block again.
+    tso->why_blocked = NotBlocked;
+    appendToRunQueue(cap,tso);
+
+    // We used to set the context switch flag here, which would
+    // trigger a context switch a short time in the future (at the end
+    // of the current nursery block).  The idea is that we have just
+    // woken up a thread, so we may need to load-balance and migrate
+    // threads to other CPUs.  On the other hand, setting the context
+    // switch flag here unfairly penalises the current thread by
+    // yielding its time slice too early.
+    //
+    // The synthetic benchmark nofib/smp/chan can be used to show the
+    // difference quite clearly.
+
+    // cap->context_switch = 1;
+}
+
+/* ----------------------------------------------------------------------------
+   migrateThread
+   ------------------------------------------------------------------------- */
+
+void
+migrateThread (Capability *from, StgTSO *tso, Capability *to)
+{
+    traceEventMigrateThread (from, tso, to->no);
+    // ThreadMigrating tells the target cap that it needs to be added to
+    // the run queue when it receives the MSG_TRY_WAKEUP.
+    tso->why_blocked = ThreadMigrating;
+    tso->cap = to;
+    tryWakeupThread(from, tso);
 }
 
 /* ----------------------------------------------------------------------------
@@ -398,6 +414,7 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
     i = thunk->header.info;
     if (i != &stg_BLACKHOLE_info &&
         i != &stg_CAF_BLACKHOLE_info &&
+        i != &__stg_EAGER_BLACKHOLE_info &&
         i != &stg_WHITEHOLE_info) {
         updateWithIndirection(cap, thunk, val);
         return;
@@ -409,7 +426,7 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
     i = v->header.info;
 
     if (i == &stg_TSO_info) {
-        owner = deRefTSO((StgTSO*)v);
+        owner = (StgTSO*)v;
         if (owner != tso) {
             checkBlockingQueues(cap, tso);
         }
@@ -422,7 +439,7 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
         return;
     }
 
-    owner = deRefTSO(((StgBlockingQueue*)v)->owner);
+    owner = ((StgBlockingQueue*)v)->owner;
 
     if (owner != tso) {
         checkBlockingQueues(cap, tso);
@@ -431,47 +448,6 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
     }
 }
 
-/* ----------------------------------------------------------------------------
- * Wake up a thread on a Capability.
- *
- * This is used when the current Task is running on a Capability and
- * wishes to wake up a thread on a different Capability.
- * ------------------------------------------------------------------------- */
-
-#ifdef THREADED_RTS
-
-void
-wakeupThreadOnCapability (Capability *cap,
-                          Capability *other_cap,
-                          StgTSO *tso)
-{
-    MessageWakeup *msg;
-
-    // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
-    if (tso->bound) {
-        ASSERT(tso->bound->task->cap == tso->cap);
-        tso->bound->task->cap = other_cap;
-    }
-    tso->cap = other_cap;
-
-    ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
-           tso->block_info.closure->header.info == &stg_IND_info);
-
-    ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
-
-    msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
-    SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM);
-    msg->tso = tso;
-    tso->block_info.closure = (StgClosure *)msg;
-    dirty_TSO(cap, tso);
-    write_barrier();
-    tso->why_blocked = BlockedOnMsgWakeup;
-
-    sendMessage(cap, other_cap, (Message*)msg);
-}
-
-#endif /* THREADED_RTS */
-
 /* ---------------------------------------------------------------------------
  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
  * used by Control.Concurrent for error checking.
@@ -500,6 +476,222 @@ isThreadBound(StgTSO* tso USED_IF_THREADS)
     return rtsFalse;
 }
 
+/* -----------------------------------------------------------------------------
+   Stack overflow
+
+   If the thread has reached its maximum stack size, then raise the
+   StackOverflow exception in the offending thread.  Otherwise
+   relocate the TSO into a larger chunk of memory and adjust its stack
+   size appropriately.
+   -------------------------------------------------------------------------- */
+
+void
+threadStackOverflow (Capability *cap, StgTSO *tso)
+{
+    StgStack *new_stack, *old_stack;
+    StgUnderflowFrame *frame;
+    lnat chunk_size;
+
+    IF_DEBUG(sanity,checkTSO(tso));
+
+    if (tso->tot_stack_size >= RtsFlags.GcFlags.maxStkSize
+        && !(tso->flags & TSO_BLOCKEX)) {
+        // NB. never raise a StackOverflow exception if the thread is
+        // inside Control.Exception.block.  It is impractical to protect
+        // against stack overflow exceptions, since virtually anything
+        // can raise one (even 'catch'), so this is the only sensible
+        // thing to do here.  See bug #767.
+        //
+
+        if (tso->flags & TSO_SQUEEZED) {
+            return;
+        }
+        // #3677: In a stack overflow situation, stack squeezing may
+        // reduce the stack size, but we don't know whether it has been
+        // reduced enough for the stack check to succeed if we try
+        // again.  Fortunately stack squeezing is idempotent, so all we
+        // need to do is record whether *any* squeezing happened.  If we
+        // are at the stack's absolute -K limit, and stack squeezing
+        // happened, then we try running the thread again.  The
+        // TSO_SQUEEZED flag is set by threadPaused() to tell us whether
+        // squeezing happened or not.
+
+        debugTrace(DEBUG_gc,
+                   "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
+                   (long)tso->id, tso, (long)tso->stackobj->stack_size,
+                   RtsFlags.GcFlags.maxStkSize);
+        IF_DEBUG(gc,
+                 /* If we're debugging, just print out the top of the stack */
+                 printStackChunk(tso->stackobj->sp,
+                                 stg_min(tso->stackobj->stack + tso->stackobj->stack_size,
+                                         tso->stackobj->sp+64)));
+
+        // Send this thread the StackOverflow exception
+        throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
+    }
+
+
+    // We also want to avoid enlarging the stack if squeezing has
+    // already released some of it.  However, we don't want to get into
+    // a pathological situation where a thread has a nearly full stack
+    // (near its current limit, but not near the absolute -K limit),
+    // keeps allocating a little bit, squeezing removes a little bit,
+    // and then it runs again.  So to avoid this, if we squeezed *and*
+    // there is still less than BLOCK_SIZE_W words free, then we enlarge
+    // the stack anyway.
+    if ((tso->flags & TSO_SQUEEZED) &&
+        ((W_)(tso->stackobj->sp - tso->stackobj->stack) >= BLOCK_SIZE_W)) {
+        return;
+    }
+
+    old_stack = tso->stackobj;
+
+    // If we used less than half of the previous stack chunk, then we
+    // must have failed a stack check for a large amount of stack.  In
+    // this case we allocate a double-sized chunk to try to
+    // accommodate the large stack request.  If that also fails, the
+    // next chunk will be 4x normal size, and so on.
+    //
+    // It would be better to have the mutator tell us how much stack
+    // was needed, as we do with heap allocations, but this works for
+    // now.
+    //
+    if (old_stack->sp > old_stack->stack + old_stack->stack_size / 2)
+    {
+        chunk_size = 2 * (old_stack->stack_size + sizeofW(StgStack));
+    }
+    else
+    {
+        chunk_size = RtsFlags.GcFlags.stkChunkSize;
+    }
+
+    debugTraceCap(DEBUG_sched, cap,
+                  "allocating new stack chunk of size %d bytes",
+                  chunk_size * sizeof(W_));
+
+    new_stack = (StgStack*) allocate(cap, chunk_size);
+    SET_HDR(new_stack, &stg_STACK_info, CCS_SYSTEM);
+    TICK_ALLOC_STACK(chunk_size);
+
+    new_stack->dirty = 0; // begin clean, we'll mark it dirty below
+    new_stack->stack_size = chunk_size - sizeofW(StgStack);
+    new_stack->sp = new_stack->stack + new_stack->stack_size;
+
+    tso->tot_stack_size += new_stack->stack_size;
+
+    new_stack->sp -= sizeofW(StgUnderflowFrame);
+    frame = (StgUnderflowFrame*)new_stack->sp;
+    frame->info = &stg_stack_underflow_frame_info;
+    frame->next_chunk = old_stack;
+
+    {
+        StgWord *sp;
+        nat chunk_words, size;
+
+        // find the boundary of the chunk of old stack we're going to
+        // copy to the new stack.  We skip over stack frames until we
+        // reach the smaller of
+        //
+        //   * the chunk buffer size (+RTS -kb)
+        //   * the end of the old stack
+        //
+        for (sp = old_stack->sp;
+             sp < stg_min(old_stack->sp + RtsFlags.GcFlags.stkChunkBufferSize,
+                          old_stack->stack + old_stack->stack_size); )
+        {
+            size = stack_frame_sizeW((StgClosure*)sp);
+
+            // if including this frame would exceed the size of the
+            // new stack (taking into account the underflow frame),
+            // then stop at the previous frame.
+            if (sp + size > old_stack->stack + (new_stack->stack_size -
+                                                sizeofW(StgUnderflowFrame))) {
+                break;
+            }
+            sp += size;
+        }
+
+        // copy the stack chunk between tso->sp and sp to
+        // new_tso->sp + (tso->sp - sp)
+        chunk_words = sp - old_stack->sp;
+
+        memcpy(/* dest   */ new_stack->sp - chunk_words,
+               /* source */ old_stack->sp,
+               /* size   */ chunk_words * sizeof(W_));
+
+        old_stack->sp += chunk_words;
+        new_stack->sp -= chunk_words;
+    }
+
+    // if the old stack chunk is now empty, discard it.  With the
+    // default settings, -ki1k -kb1k, this means the first stack chunk
+    // will be discarded after the first overflow, being replaced by a
+    // non-moving 32k chunk.
+    if (old_stack->sp == old_stack->stack + old_stack->stack_size) {
+        frame->next_chunk = new_stack;
+    }
+
+    tso->stackobj = new_stack;
+
+    // we're about to run it, better mark it dirty
+    dirty_STACK(cap, new_stack);
+
+    IF_DEBUG(sanity,checkTSO(tso));
+    // IF_DEBUG(scheduler,printTSO(new_tso));
+}
+
+
+/* ---------------------------------------------------------------------------
+   Stack underflow - called from the stg_stack_underflow_info frame
+   ------------------------------------------------------------------------ */
+
+nat // returns offset to the return address
+threadStackUnderflow (Capability *cap, StgTSO *tso)
+{
+    StgStack *new_stack, *old_stack;
+    StgUnderflowFrame *frame;
+    nat retvals;
+
+    debugTraceCap(DEBUG_sched, cap, "stack underflow");
+
+    old_stack = tso->stackobj;
+
+    frame = (StgUnderflowFrame*)(old_stack->stack + old_stack->stack_size
+                                 - sizeofW(StgUnderflowFrame));
+    ASSERT(frame->info == &stg_stack_underflow_frame_info);
+
+    new_stack = (StgStack*)frame->next_chunk;
+    tso->stackobj = new_stack;
+
+    retvals = (P_)frame - old_stack->sp;
+    if (retvals != 0)
+    {
+        // we have some return values to copy to the old stack
+        if ((new_stack->sp - new_stack->stack) < retvals)
+        {
+            barf("threadStackUnderflow: not enough space for return values");
+        }
+
+        new_stack->sp -= retvals;
+
+        memcpy(/* dest */ new_stack->sp,
+               /* src  */ old_stack->sp,
+               /* size */ retvals * sizeof(W_));
+    }
+
+    // empty the old stack.  The GC may still visit this object
+    // because it is on the mutable list.
+    old_stack->sp = old_stack->stack + old_stack->stack_size;
+
+    // restore the stack parameters, and update tot_stack_size
+    tso->tot_stack_size -= old_stack->stack_size;
+
+    // we're about to run it, better mark it dirty
+    dirty_STACK(cap, new_stack);
+
+    return retvals;
+}
+
 /* ----------------------------------------------------------------------------
  * Debugging: why is a thread blocked
  * ------------------------------------------------------------------------- */
@@ -530,20 +722,20 @@ printThreadBlockage(StgTSO *tso)
     debugBelch("is blocked on a black hole %p",
                ((StgBlockingQueue*)tso->block_info.bh->bh));
     break;
-  case BlockedOnMsgWakeup:
-      debugBelch("is blocked on a wakeup message");
-      break;
   case BlockedOnMsgThrowTo:
       debugBelch("is blocked on a throwto message");
       break;
   case NotBlocked:
       debugBelch("is not blocked");
       break;
+  case ThreadMigrating:
+      debugBelch("is runnable, but not on the run queue");
+      break;
   case BlockedOnCCall:
       debugBelch("is blocked on an external call");
       break;
-  case BlockedOnCCall_NoUnblockExc:
-      debugBelch("is blocked on an external call (exceptions were already blocked)");
+  case BlockedOnCCall_Interruptible:
+      debugBelch("is blocked on an external call (but may be interrupted)");
       break;
   case BlockedOnSTM:
       debugBelch("is blocked on an STM operation");
@@ -563,10 +755,7 @@ printThreadStatus(StgTSO *t)
       void *label = lookupThreadLabel(t->id);
       if (label) debugBelch("[\"%s\"] ",(char *)label);
     }
-  if (t->what_next == ThreadRelocated) {
-      debugBelch("has been relocated...\n");
-  } else {
-    switch (t->what_next) {
+  switch (t->what_next) {
     case ThreadKilled:
       debugBelch("has been killed");
       break;
@@ -578,11 +767,8 @@ printThreadStatus(StgTSO *t)
     }
     if (t->dirty) {
       debugBelch(" (TSO_DIRTY)");
-    } else if (t->flags & TSO_LINK_DIRTY) {
-      debugBelch(" (TSO_LINK_DIRTY)");
     }
     debugBelch("\n");
-  }
 }
 
 void
@@ -608,11 +794,7 @@ printAllThreads(void)
     if (t->why_blocked != NotBlocked) {
         printThreadStatus(t);
     }
-    if (t->what_next == ThreadRelocated) {
-        next = t->_link;
-    } else {
-        next = t->global_link;
-    }
+    next = t->global_link;
     }
   }
 }
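
The threadStackOverflow/threadStackUnderflow pair added by this patch maintains a singly-linked chain of stack chunks: the underflow frame at the bottom of each chunk points at the next older chunk, overflow allocates a fresh chunk in front, and underflow drops back to the older one. The standalone C program below is a minimal sketch of that scheme under simplified assumptions; the names (Chunk, push, pop) and the size-doubling heuristic are illustrative only and are not part of the RTS code in this patch. The real code additionally copies a bounded prefix of frames into the new chunk (the +RTS -kb buffer) so that returns do not immediately underflow; the sketch omits that refinement.

#include <stdio.h>
#include <stdlib.h>

/* One stack chunk.  StgStack plays this role in the RTS; the underflow
   frame at the bottom of each chunk points at the next older chunk. */
typedef struct Chunk_ {
    struct Chunk_ *next;   /* older chunk, reached on underflow */
    size_t size;           /* capacity of payload[] in words    */
    size_t sp;             /* index of the topmost live word    */
    long payload[];        /* the stack grows downwards         */
} Chunk;

static Chunk *new_chunk(size_t size, Chunk *next)
{
    Chunk *c = malloc(sizeof(Chunk) + size * sizeof(long));
    c->next = next;
    c->size = size;
    c->sp   = size;        /* empty: sp is one past the last word */
    return c;
}

/* Overflow: when the top chunk is full, chain a new, larger chunk
   in front of it (cf. the doubling heuristic in threadStackOverflow). */
static Chunk *push(Chunk *top, long word)
{
    if (top->sp == 0) {
        top = new_chunk(top->size * 2, top);
    }
    top->payload[--top->sp] = word;
    return top;
}

/* Underflow: when the top chunk empties, drop back to the older chunk. */
static Chunk *pop(Chunk *top, long *word)
{
    if (top->sp == top->size && top->next != NULL) {
        Chunk *dead = top;
        top = top->next;
        free(dead);
    }
    *word = top->payload[top->sp++];
    return top;
}

int main(void)
{
    Chunk *stk = new_chunk(4, NULL);
    long i, w;

    for (i = 0; i < 10; i++) stk = push(stk, i);      /* forces an overflow  */
    for (i = 0; i < 10; i++) {                        /* forces an underflow */
        stk = pop(stk, &w);
        printf("%ld ", w);
    }
    printf("\n");

    free(stk);
    return 0;
}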