X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FSchedule.c;h=4343a149cc87aaf5c3dd2656889e9e0b7ed27a05;hb=784e214dd44eba39f4c34936a27e6cc82948205c;hp=b350ade5fae58f1a17615653d70f2e00dabdd3ff;hpb=58b2c6dfaf64a06eff317235f8ac9b7f73b5bf5a;p=ghc-hetmet.git diff --git a/rts/Schedule.c b/rts/Schedule.c index b350ade..4343a14 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -56,6 +56,9 @@ #include #endif +#ifdef TRACING +#include "eventlog/EventLog.h" +#endif /* ----------------------------------------------------------------------------- * Global variables * -------------------------------------------------------------------------- */ @@ -137,9 +140,7 @@ static void scheduleActivateSpark(Capability *cap); #endif static void schedulePostRunThread(Capability *cap, StgTSO *t); static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ); -static void scheduleHandleStackOverflow( Capability *cap, Task *task, - StgTSO *t); -static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, +static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next ); static void scheduleHandleThreadBlocked( StgTSO *t ); static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task, @@ -148,9 +149,6 @@ static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc); static Capability *scheduleDoGC(Capability *cap, Task *task, rtsBool force_major); -static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso); -static StgTSO *threadStackUnderflow(Capability *cap, Task *task, StgTSO *tso); - static void deleteThread (Capability *cap, StgTSO *tso); static void deleteAllThreads (Capability *cap); @@ -423,6 +421,7 @@ run_thread: cap->in_haskell = rtsTrue; dirty_TSO(cap,t); + dirty_STACK(cap,t->stackobj); #if defined(THREADED_RTS) if (recent_activity == ACTIVITY_DONE_GC) { @@ -485,7 +484,17 @@ run_thread: t->saved_winerror = GetLastError(); #endif - traceEventStopThread(cap, t, ret); + if (ret == ThreadBlocked) { + if (t->why_blocked == BlockedOnBlackHole) { + StgTSO *owner = blackHoleOwner(t->block_info.bh->bh); + traceEventStopThread(cap, t, t->why_blocked + 6, + owner != NULL ? owner->id : 0); + } else { + traceEventStopThread(cap, t, t->why_blocked + 6, 0); + } + } else { + traceEventStopThread(cap, t, ret, 0); + } ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); ASSERT(t->cap == cap); @@ -500,10 +509,6 @@ run_thread: schedulePostRunThread(cap,t); - if (ret != StackOverflow) { - t = threadStackUnderflow(cap,task,t); - } - ready_to_gc = rtsFalse; switch (ret) { @@ -512,8 +517,11 @@ run_thread: break; case StackOverflow: - scheduleHandleStackOverflow(cap,task,t); - break; + // just adjust the stack for this thread, then pop it back + // on the run queue. + threadStackOverflow(cap, t); + pushOnRunQueue(cap,t); + break; case ThreadYielding: if (scheduleHandleYield(cap, t, prev_what_next)) { @@ -726,8 +734,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, for (; t != END_TSO_QUEUE; t = next) { next = t->_link; t->_link = END_TSO_QUEUE; - if (t->what_next == ThreadRelocated - || t->bound == task->incall // don't move my bound thread + if (t->bound == task->incall // don't move my bound thread || tsoLocked(t)) { // don't move a locked thread setTSOLink(cap, prev, t); setTSOPrev(cap, t, prev); @@ -1022,6 +1029,10 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE; + if (blocks > BLOCKS_PER_MBLOCK) { + barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc); + } + debugTrace(DEBUG_sched, "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", (long)t->id, what_next_strs[t->what_next], blocks); @@ -1031,10 +1042,8 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop // if the nursery has only one block. - ACQUIRE_SM_LOCK - bd = allocGroup( blocks ); - RELEASE_SM_LOCK - cap->r.rNursery->n_blocks += blocks; + bd = allocGroup_lock(blocks); + cap->r.rNursery->n_blocks += blocks; // link the new group into the list bd->link = cap->r.rCurrentNursery; @@ -1093,30 +1102,6 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) } /* ----------------------------------------------------------------------------- - * Handle a thread that returned to the scheduler with ThreadStackOverflow - * -------------------------------------------------------------------------- */ - -static void -scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t) -{ - /* just adjust the stack for this thread, then pop it back - * on the run queue. - */ - { - /* enlarge the stack */ - StgTSO *new_t = threadStackOverflow(cap, t); - - /* The TSO attached to this Task may have moved, so update the - * pointer to it. - */ - if (task->incall->tso == t) { - task->incall->tso = new_t; - } - pushOnRunQueue(cap,new_t); - } -} - -/* ----------------------------------------------------------------------------- * Handle a thread that returned to the scheduler with ThreadYielding * -------------------------------------------------------------------------- */ @@ -1235,23 +1220,23 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) ASSERT(task->incall->tso == t); if (t->what_next == ThreadComplete) { - if (task->ret) { - // NOTE: return val is tso->sp[1] (see StgStartup.hc) - *(task->ret) = (StgClosure *)task->incall->tso->sp[1]; + if (task->incall->ret) { + // NOTE: return val is stack->sp[1] (see StgStartup.hc) + *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1]; } - task->stat = Success; + task->incall->stat = Success; } else { - if (task->ret) { - *(task->ret) = NULL; + if (task->incall->ret) { + *(task->incall->ret) = NULL; } if (sched_state >= SCHED_INTERRUPTING) { if (heap_overflow) { - task->stat = HeapExhausted; + task->incall->stat = HeapExhausted; } else { - task->stat = Interrupted; + task->incall->stat = Interrupted; } } else { - task->stat = Killed; + task->incall->stat = Killed; } } #ifdef DEBUG @@ -1533,10 +1518,18 @@ forkProcess(HsStablePtr *entry ACQUIRE_LOCK(&cap->lock); ACQUIRE_LOCK(&cap->running_task->lock); + stopTimer(); // See #4074 + +#if defined(TRACING) + flushEventLog(); // so that child won't inherit dirty file buffers +#endif + pid = fork(); if (pid) { // parent + startTimer(); // #4074 + RELEASE_LOCK(&sched_mutex); RELEASE_LOCK(&cap->lock); RELEASE_LOCK(&cap->running_task->lock); @@ -1553,7 +1546,11 @@ forkProcess(HsStablePtr *entry initMutex(&cap->running_task->lock); #endif - // Now, all OS threads except the thread that forked are +#ifdef TRACING + resetTracing(); +#endif + + // Now, all OS threads except the thread that forked are // stopped. We need to stop all Haskell threads, including // those involved in foreign calls. Also we need to delete // all Tasks, because they correspond to OS threads that are @@ -1561,10 +1558,7 @@ forkProcess(HsStablePtr *entry for (g = 0; g < RtsFlags.GcFlags.generations; g++) { for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) { - if (t->what_next == ThreadRelocated) { - next = t->_link; - } else { - next = t->global_link; + next = t->global_link; // don't allow threads to catch the ThreadKilled // exception, but we do want to raiseAsync() because these // threads may be evaluating thunks that we need later. @@ -1576,7 +1570,6 @@ forkProcess(HsStablePtr *entry // won't get a chance to exit in the usual way (see // also scheduleHandleThreadFinished). t->bound = NULL; - } } } @@ -1603,7 +1596,8 @@ forkProcess(HsStablePtr *entry // Wipe our spare workers list, they no longer exist. New // workers will be created if necessary. cap->spare_workers = NULL; - cap->returning_tasks_hd = NULL; + cap->n_spare_workers = 0; + cap->returning_tasks_hd = NULL; cap->returning_tasks_tl = NULL; #endif @@ -1643,12 +1637,8 @@ deleteAllThreads ( Capability *cap ) debugTrace(DEBUG_sched,"deleting all threads"); for (g = 0; g < RtsFlags.GcFlags.generations; g++) { for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) { - if (t->what_next == ThreadRelocated) { - next = t->_link; - } else { next = t->global_link; deleteThread(cap,t); - } } } @@ -1712,13 +1702,17 @@ recoverSuspendedTask (Capability *cap, Task *task) * the whole system. * * The Haskell thread making the C call is put to sleep for the - * duration of the call, on the susepended_ccalling_threads queue. We + * duration of the call, on the suspended_ccalling_threads queue. We * give out a token to the task, which it can use to resume the thread * on return from the C function. + * + * If this is an interruptible C call, this means that the FFI call may be + * unceremoniously terminated and should be scheduled on an + * unbound worker thread. * ------------------------------------------------------------------------- */ void * -suspendThread (StgRegTable *reg) +suspendThread (StgRegTable *reg, rtsBool interruptible) { Capability *cap; int saved_errno; @@ -1740,19 +1734,17 @@ suspendThread (StgRegTable *reg) task = cap->running_task; tso = cap->r.rCurrentTSO; - traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL); + traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0); // XXX this might not be necessary --SDM tso->what_next = ThreadRunGHC; threadPaused(cap,tso); - if ((tso->flags & TSO_BLOCKEX) == 0) { - tso->why_blocked = BlockedOnCCall; - tso->flags |= TSO_BLOCKEX; - tso->flags &= ~TSO_INTERRUPTIBLE; + if (interruptible) { + tso->why_blocked = BlockedOnCCall_Interruptible; } else { - tso->why_blocked = BlockedOnCCall_NoUnblockExc; + tso->why_blocked = BlockedOnCCall; } // Hand back capability @@ -1811,17 +1803,16 @@ resumeThread (void *task_) traceEventRunThread(cap, tso); - if (tso->why_blocked == BlockedOnCCall) { + /* Reset blocking status */ + tso->why_blocked = NotBlocked; + + if ((tso->flags & TSO_BLOCKEX) == 0) { // avoid locking the TSO if we don't have to if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) { - awakenBlockedExceptionQueue(cap,tso); + maybePerformBlockedException(cap,tso); } - tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE); } - /* Reset blocking status */ - tso->why_blocked = NotBlocked; - cap->r.rCurrentTSO = tso; cap->in_haskell = rtsTrue; errno = saved_errno; @@ -1831,6 +1822,7 @@ resumeThread (void *task_) /* We might have GC'd, mark the TSO dirty again */ dirty_TSO(cap,tso); + dirty_STACK(cap,tso->stackobj); IF_DEBUG(sanity, checkTSO(tso)); @@ -1887,8 +1879,8 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) tso->cap = cap; task->incall->tso = tso; - task->ret = ret; - task->stat = NoStatus; + task->incall->ret = ret; + task->incall->stat = NoStatus; appendToRunQueue(cap,tso); @@ -1897,7 +1889,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) cap = schedule(cap,task); - ASSERT(task->stat != NoStatus); + ASSERT(task->incall->stat != NoStatus); ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id); @@ -2089,189 +2081,6 @@ performMajorGC(void) performGC_(rtsTrue); } -/* ----------------------------------------------------------------------------- - Stack overflow - - If the thread has reached its maximum stack size, then raise the - StackOverflow exception in the offending thread. Otherwise - relocate the TSO into a larger chunk of memory and adjust its stack - size appropriately. - -------------------------------------------------------------------------- */ - -static StgTSO * -threadStackOverflow(Capability *cap, StgTSO *tso) -{ - nat new_stack_size, stack_words; - lnat new_tso_size; - StgPtr new_sp; - StgTSO *dest; - - IF_DEBUG(sanity,checkTSO(tso)); - - if (tso->stack_size >= tso->max_stack_size - && !(tso->flags & TSO_BLOCKEX)) { - // NB. never raise a StackOverflow exception if the thread is - // inside Control.Exceptino.block. It is impractical to protect - // against stack overflow exceptions, since virtually anything - // can raise one (even 'catch'), so this is the only sensible - // thing to do here. See bug #767. - // - - if (tso->flags & TSO_SQUEEZED) { - return tso; - } - // #3677: In a stack overflow situation, stack squeezing may - // reduce the stack size, but we don't know whether it has been - // reduced enough for the stack check to succeed if we try - // again. Fortunately stack squeezing is idempotent, so all we - // need to do is record whether *any* squeezing happened. If we - // are at the stack's absolute -K limit, and stack squeezing - // happened, then we try running the thread again. The - // TSO_SQUEEZED flag is set by threadPaused() to tell us whether - // squeezing happened or not. - - debugTrace(DEBUG_gc, - "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)", - (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size); - IF_DEBUG(gc, - /* If we're debugging, just print out the top of the stack */ - printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, - tso->sp+64))); - - // Send this thread the StackOverflow exception - throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure); - return tso; - } - - - // We also want to avoid enlarging the stack if squeezing has - // already released some of it. However, we don't want to get into - // a pathalogical situation where a thread has a nearly full stack - // (near its current limit, but not near the absolute -K limit), - // keeps allocating a little bit, squeezing removes a little bit, - // and then it runs again. So to avoid this, if we squeezed *and* - // there is still less than BLOCK_SIZE_W words free, then we enlarge - // the stack anyway. - if ((tso->flags & TSO_SQUEEZED) && - ((W_)(tso->sp - tso->stack) >= BLOCK_SIZE_W)) { - return tso; - } - - /* Try to double the current stack size. If that takes us over the - * maximum stack size for this thread, then use the maximum instead - * (that is, unless we're already at or over the max size and we - * can't raise the StackOverflow exception (see above), in which - * case just double the size). Finally round up so the TSO ends up as - * a whole number of blocks. - */ - if (tso->stack_size >= tso->max_stack_size) { - new_stack_size = tso->stack_size * 2; - } else { - new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size); - } - new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + - TSO_STRUCT_SIZE)/sizeof(W_); - new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */ - new_stack_size = new_tso_size - TSO_STRUCT_SIZEW; - - debugTrace(DEBUG_sched, - "increasing stack size from %ld words to %d.", - (long)tso->stack_size, new_stack_size); - - dest = (StgTSO *)allocate(cap,new_tso_size); - TICK_ALLOC_TSO(new_stack_size,0); - - /* copy the TSO block and the old stack into the new area */ - memcpy(dest,tso,TSO_STRUCT_SIZE); - stack_words = tso->stack + tso->stack_size - tso->sp; - new_sp = (P_)dest + new_tso_size - stack_words; - memcpy(new_sp, tso->sp, stack_words * sizeof(W_)); - - /* relocate the stack pointers... */ - dest->sp = new_sp; - dest->stack_size = new_stack_size; - - /* Mark the old TSO as relocated. We have to check for relocated - * TSOs in the garbage collector and any primops that deal with TSOs. - * - * It's important to set the sp value to just beyond the end - * of the stack, so we don't attempt to scavenge any part of the - * dead TSO's stack. - */ - setTSOLink(cap,tso,dest); - write_barrier(); // other threads seeing ThreadRelocated will look at _link - tso->what_next = ThreadRelocated; - tso->sp = (P_)&(tso->stack[tso->stack_size]); - tso->why_blocked = NotBlocked; - - IF_DEBUG(sanity,checkTSO(dest)); -#if 0 - IF_DEBUG(scheduler,printTSO(dest)); -#endif - - return dest; -} - -static StgTSO * -threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso) -{ - bdescr *bd, *new_bd; - lnat free_w, tso_size_w; - StgTSO *new_tso; - - tso_size_w = tso_sizeW(tso); - - if (tso_size_w < MBLOCK_SIZE_W || - // TSO is less than 2 mblocks (since the first mblock is - // shorter than MBLOCK_SIZE_W) - (tso_size_w - BLOCKS_PER_MBLOCK*BLOCK_SIZE_W) % MBLOCK_SIZE_W != 0 || - // or TSO is not a whole number of megablocks (ensuring - // precondition of splitLargeBlock() below) - (tso_size_w <= round_up_to_mblocks(RtsFlags.GcFlags.initialStkSize)) || - // or TSO is smaller than the minimum stack size (rounded up) - (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) - // or stack is using more than 1/4 of the available space - { - // then do nothing - return tso; - } - - // this is the number of words we'll free - free_w = round_to_mblocks(tso_size_w/2); - - bd = Bdescr((StgPtr)tso); - new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W); - bd->free = bd->start + TSO_STRUCT_SIZEW; - - new_tso = (StgTSO *)new_bd->start; - memcpy(new_tso,tso,TSO_STRUCT_SIZE); - new_tso->stack_size = new_bd->free - new_tso->stack; - - // The original TSO was dirty and probably on the mutable - // list. The new TSO is not yet on the mutable list, so we better - // put it there. - new_tso->dirty = 0; - new_tso->flags &= ~TSO_LINK_DIRTY; - dirty_TSO(cap, new_tso); - - debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu", - (long)tso->id, tso_size_w, tso_sizeW(new_tso)); - - tso->_link = new_tso; // no write barrier reqd: same generation - write_barrier(); // other threads seeing ThreadRelocated will look at _link - tso->what_next = ThreadRelocated; - - // The TSO attached to this Task may have moved, so update the - // pointer to it. - if (task->incall->tso == tso) { - task->incall->tso = new_tso; - } - - IF_DEBUG(sanity,checkTSO(new_tso)); - - return new_tso; -} - /* --------------------------------------------------------------------------- Interrupt execution - usually called inside a signal handler so it mustn't do anything fancy. @@ -2318,7 +2127,7 @@ void wakeUpRts(void) exception. -------------------------------------------------------------------------- */ -static void +static void deleteThread (Capability *cap STG_UNUSED, StgTSO *tso) { // NOTE: must only be called on a TSO that we have exclusive @@ -2327,19 +2136,19 @@ deleteThread (Capability *cap STG_UNUSED, StgTSO *tso) // we must own all Capabilities. if (tso->why_blocked != BlockedOnCCall && - tso->why_blocked != BlockedOnCCall_NoUnblockExc) { - throwToSingleThreaded(tso->cap,tso,NULL); + tso->why_blocked != BlockedOnCCall_Interruptible) { + throwToSingleThreaded(tso->cap,tso,NULL); } } #ifdef FORKPROCESS_PRIMOP_SUPPORTED -static void +static void deleteThread_(Capability *cap, StgTSO *tso) { // for forkProcess only: // like deleteThread(), but we delete threads in foreign calls, too. if (tso->why_blocked == BlockedOnCCall || - tso->why_blocked == BlockedOnCCall_NoUnblockExc) { + tso->why_blocked == BlockedOnCCall_Interruptible) { tso->what_next = ThreadKilled; appendToRunQueue(tso->cap, tso); } else { @@ -2387,7 +2196,7 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception) // we update any closures pointed to from update frames with the // raise closure that we just built. // - p = tso->sp; + p = tso->stackobj->sp; while(1) { info = get_ret_itbl((StgClosure *)p); next = p + stack_frame_sizeW((StgClosure *)p); @@ -2408,20 +2217,26 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception) case ATOMICALLY_FRAME: debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p); - tso->sp = p; + tso->stackobj->sp = p; return ATOMICALLY_FRAME; case CATCH_FRAME: - tso->sp = p; + tso->stackobj->sp = p; return CATCH_FRAME; case CATCH_STM_FRAME: debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p); - tso->sp = p; + tso->stackobj->sp = p; return CATCH_STM_FRAME; - case STOP_FRAME: - tso->sp = p; + case UNDERFLOW_FRAME: + tso->stackobj->sp = p; + threadStackUnderflow(cap,tso); + p = tso->stackobj->sp; + continue; + + case STOP_FRAME: + tso->stackobj->sp = p; return STOP_FRAME; case CATCH_RETRY_FRAME: @@ -2451,12 +2266,12 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception) -------------------------------------------------------------------------- */ StgWord -findRetryFrameHelper (StgTSO *tso) +findRetryFrameHelper (Capability *cap, StgTSO *tso) { StgPtr p, next; StgRetInfoTable *info; - p = tso -> sp; + p = tso->stackobj->sp; while (1) { info = get_ret_itbl((StgClosure *)p); next = p + stack_frame_sizeW((StgClosure *)p); @@ -2465,13 +2280,13 @@ findRetryFrameHelper (StgTSO *tso) case ATOMICALLY_FRAME: debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p during retry", p); - tso->sp = p; + tso->stackobj->sp = p; return ATOMICALLY_FRAME; case CATCH_RETRY_FRAME: debugTrace(DEBUG_stm, "found CATCH_RETRY_FRAME at %p during retrry", p); - tso->sp = p; + tso->stackobj->sp = p; return CATCH_RETRY_FRAME; case CATCH_STM_FRAME: { @@ -2480,13 +2295,17 @@ findRetryFrameHelper (StgTSO *tso) debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p during retry", p); debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer); - stmAbortTransaction(tso -> cap, trec); - stmFreeAbortedTRec(tso -> cap, trec); + stmAbortTransaction(cap, trec); + stmFreeAbortedTRec(cap, trec); tso -> trec = outer; p = next; continue; } + case UNDERFLOW_FRAME: + threadStackUnderflow(cap,tso); + p = tso->stackobj->sp; + continue; default: ASSERT(info->i.type != CATCH_FRAME);