X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=rts%2FSchedule.c;h=0850749b36e3d660cc9d57e9e2f82c91a259e217;hp=e3d025a180c4a62e1b59d7b63fdb83aad5c5d14f;hb=83d563cb9ede0ba792836e529b1e2929db926355;hpb=f125eb7e4b8079e79f41f55d60e5b4bf138284ba diff --git a/rts/Schedule.c b/rts/Schedule.c index e3d025a..0850749 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -39,6 +39,7 @@ #include "Threads.h" #include "Timer.h" #include "ThreadPaused.h" +#include "Messages.h" #ifdef HAVE_SYS_TYPES_H #include @@ -66,17 +67,6 @@ StgTSO *blocked_queue_tl = NULL; StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table? #endif -/* Threads blocked on blackholes. - * LOCK: sched_mutex+capability, or all capabilities - */ -StgTSO *blackhole_queue = NULL; - -/* The blackhole_queue should be checked for threads to wake up. See - * Schedule.h for more thorough comment. - * LOCK: none (doesn't matter if we miss an update) - */ -rtsBool blackholes_need_checking = rtsFalse; - /* Set to true when the latest garbage collection failed to reclaim * enough space, and the runtime should proceed to shut itself down in * an orderly fashion (emitting profiling info etc.) @@ -135,12 +125,11 @@ static Capability *schedule (Capability *initialCapability, Task *task); static void schedulePreLoop (void); static void scheduleFindWork (Capability *cap); #if defined(THREADED_RTS) -static void scheduleYield (Capability **pcap, Task *task, rtsBool); +static void scheduleYield (Capability **pcap, Task *task); #endif static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); static void scheduleProcessInbox(Capability *cap); -static void scheduleCheckBlackHoles (Capability *cap); static void scheduleDetectDeadlock (Capability *cap, Task *task); static void schedulePushWork(Capability *cap, Task *task); #if defined(THREADED_RTS) @@ -159,8 +148,6 @@ static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc); static Capability *scheduleDoGC(Capability *cap, Task *task, rtsBool force_major); -static rtsBool checkBlackHoles(Capability *cap); - static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso); static StgTSO *threadStackUnderflow(Capability *cap, Task *task, StgTSO *tso); @@ -171,17 +158,6 @@ static void deleteAllThreads (Capability *cap); static void deleteThread_(Capability *cap, StgTSO *tso); #endif -/* ----------------------------------------------------------------------------- - * Putting a thread on the run queue: different scheduling policies - * -------------------------------------------------------------------------- */ - -STATIC_INLINE void -addToRunQueue( Capability *cap, StgTSO *t ) -{ - // this does round-robin scheduling; good for concurrency - appendToRunQueue(cap,t); -} - /* --------------------------------------------------------------------------- Main scheduling loop. @@ -228,7 +204,6 @@ schedule (Capability *initialCapability, Task *task) rtsBool ready_to_gc; #if defined(THREADED_RTS) rtsBool first = rtsTrue; - rtsBool force_yield = rtsFalse; #endif cap = initialCapability; @@ -352,9 +327,7 @@ schedule (Capability *initialCapability, Task *task) // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); } - yield: - scheduleYield(&cap,task,force_yield); - force_yield = rtsFalse; + scheduleYield(&cap,task); if (emptyRunQueue(cap)) continue; // look for work again #endif @@ -433,9 +406,6 @@ run_thread: startHeapProfTimer(); - // Check for exceptions blocked on this thread - maybePerformBlockedException (cap, t); - // ---------------------------------------------------------------------- // Run the current thread @@ -506,13 +476,6 @@ run_thread: // happened. So find the new location: t = cap->r.rCurrentTSO; - // We have run some Haskell code: there might be blackhole-blocked - // threads to wake up now. - // Lock-free test here should be ok, we're just setting a flag. - if ( blackhole_queue != END_TSO_QUEUE ) { - blackholes_need_checking = rtsTrue; - } - // And save the current errno in this thread. // XXX: possibly bogus for SMP because this thread might already // be running again, see code below. @@ -524,19 +487,6 @@ run_thread: traceEventStopThread(cap, t, ret); -#if defined(THREADED_RTS) - // If ret is ThreadBlocked, and this Task is bound to the TSO that - // blocked, we are in limbo - the TSO is now owned by whatever it - // is blocked on, and may in fact already have been woken up, - // perhaps even on a different Capability. It may be the case - // that task->cap != cap. We better yield this Capability - // immediately and return to normaility. - if (ret == ThreadBlocked) { - force_yield = rtsTrue; - goto yield; - } -#endif - ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); ASSERT(t->cap == cap); @@ -591,6 +541,30 @@ run_thread: } /* end of while() */ } +/* ----------------------------------------------------------------------------- + * Run queue operations + * -------------------------------------------------------------------------- */ + +void +removeFromRunQueue (Capability *cap, StgTSO *tso) +{ + if (tso->block_info.prev == END_TSO_QUEUE) { + ASSERT(cap->run_queue_hd == tso); + cap->run_queue_hd = tso->_link; + } else { + setTSOLink(cap, tso->block_info.prev, tso->_link); + } + if (tso->_link == END_TSO_QUEUE) { + ASSERT(cap->run_queue_tl == tso); + cap->run_queue_tl = tso->block_info.prev; + } else { + setTSOPrev(cap, tso->_link, tso->block_info.prev); + } + tso->_link = tso->block_info.prev = END_TSO_QUEUE; + + IF_DEBUG(sanity, checkRunQueue(cap)); +} + /* ---------------------------------------------------------------------------- * Setting up the scheduler loop * ------------------------------------------------------------------------- */ @@ -612,12 +586,6 @@ scheduleFindWork (Capability *cap) { scheduleStartSignalHandlers(cap); - // Only check the black holes here if we've nothing else to do. - // During normal execution, the black hole list only gets checked - // at GC time, to avoid repeatedly traversing this possibly long - // list each time around the scheduler. - if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); } - scheduleProcessInbox(cap); scheduleCheckBlockedThreads(cap); @@ -655,26 +623,15 @@ shouldYieldCapability (Capability *cap, Task *task) // and also check the benchmarks in nofib/parallel for regressions. static void -scheduleYield (Capability **pcap, Task *task, rtsBool force_yield) +scheduleYield (Capability **pcap, Task *task) { Capability *cap = *pcap; // if we have work, and we don't need to give up the Capability, continue. // - // The force_yield flag is used when a bound thread blocks. This - // is a particularly tricky situation: the current Task does not - // own the TSO any more, since it is on some queue somewhere, and - // might be woken up or manipulated by another thread at any time. - // The TSO and Task might be migrated to another Capability. - // Certain invariants might be in doubt, such as task->bound->cap - // == cap. We have to yield the current Capability immediately, - // no messing around. - // - if (!force_yield && - !shouldYieldCapability(cap,task) && + if (!shouldYieldCapability(cap,task) && (!emptyRunQueue(cap) || !emptyInbox(cap) || - blackholes_need_checking || sched_state >= SCHED_INTERRUPTING)) return; @@ -773,12 +730,14 @@ schedulePushWork(Capability *cap USED_IF_THREADS, || t->bound == task->incall // don't move my bound thread || tsoLocked(t)) { // don't move a locked thread setTSOLink(cap, prev, t); + setTSOPrev(cap, t, prev); prev = t; } else if (i == n_free_caps) { pushed_to_all = rtsTrue; i = 0; // keep one for us setTSOLink(cap, prev, t); + setTSOPrev(cap, t, prev); prev = t; } else { appendToRunQueue(free_caps[i],t); @@ -791,6 +750,8 @@ schedulePushWork(Capability *cap USED_IF_THREADS, } } cap->run_queue_tl = prev; + + IF_DEBUG(sanity, checkRunQueue(cap)); } #ifdef SPARK_PUSHING @@ -863,126 +824,12 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS) // if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) ) { - awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking ); - } -#endif -} - - -/* ---------------------------------------------------------------------------- - * Check for threads woken up by other Capabilities - * ------------------------------------------------------------------------- */ - -#if defined(THREADED_RTS) -static void -executeMessage (Capability *cap, Message *m) -{ - const StgInfoTable *i; - -loop: - write_barrier(); // allow m->header to be modified by another thread - i = m->header.info; - if (i == &stg_MSG_WAKEUP_info) - { - MessageWakeup *w = (MessageWakeup *)m; - StgTSO *tso = w->tso; - debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld", - (lnat)tso->id); - ASSERT(tso->cap == cap); - ASSERT(tso->why_blocked == BlockedOnMsgWakeup); - ASSERT(tso->block_info.closure == (StgClosure *)m); - tso->why_blocked = NotBlocked; - appendToRunQueue(cap, tso); - } - else if (i == &stg_MSG_THROWTO_info) - { - MessageThrowTo *t = (MessageThrowTo *)m; - nat r; - const StgInfoTable *i; - - i = lockClosure((StgClosure*)m); - if (i != &stg_MSG_THROWTO_info) { - unlockClosure((StgClosure*)m, i); - goto loop; - } - - debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld", - (lnat)t->source->id, (lnat)t->target->id); - - ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo); - ASSERT(t->source->block_info.closure == (StgClosure *)m); - - r = throwToMsg(cap, t); - - switch (r) { - case THROWTO_SUCCESS: - ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info); - t->source->sp += 3; - unblockOne(cap, t->source); - // this message is done - unlockClosure((StgClosure*)m, &stg_IND_info); - break; - case THROWTO_BLOCKED: - // unlock the message - unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info); - break; - } - } - else if (i == &stg_IND_info) - { - // message was revoked - return; - } - else if (i == &stg_WHITEHOLE_info) - { - goto loop; - } - else - { - barf("executeMessage: %p", i); - } -} -#endif - -static void -scheduleProcessInbox (Capability *cap USED_IF_THREADS) -{ -#if defined(THREADED_RTS) - Message *m; - - while (!emptyInbox(cap)) { - ACQUIRE_LOCK(&cap->lock); - m = cap->inbox; - cap->inbox = m->link; - RELEASE_LOCK(&cap->lock); - executeMessage(cap, (Message *)m); + awaitEvent (emptyRunQueue(cap)); } #endif } /* ---------------------------------------------------------------------------- - * Check for threads blocked on BLACKHOLEs that can be woken up - * ------------------------------------------------------------------------- */ -static void -scheduleCheckBlackHoles (Capability *cap) -{ - if ( blackholes_need_checking ) // check without the lock first - { - ACQUIRE_LOCK(&sched_mutex); - if ( blackholes_need_checking ) { - blackholes_need_checking = rtsFalse; - // important that we reset the flag *before* checking the - // blackhole queue, otherwise we could get deadlock. This - // happens as follows: we wake up a thread that - // immediately runs on another Capability, blocks on a - // blackhole, and then we reset the blackholes_need_checking flag. - checkBlackHoles(cap); - } - RELEASE_LOCK(&sched_mutex); - } -} - -/* ---------------------------------------------------------------------------- * Detect deadlock conditions and attempt to resolve them. * ------------------------------------------------------------------------- */ @@ -1090,6 +937,26 @@ scheduleSendPendingMessages(void) #endif /* ---------------------------------------------------------------------------- + * Process message in the current Capability's inbox + * ------------------------------------------------------------------------- */ + +static void +scheduleProcessInbox (Capability *cap USED_IF_THREADS) +{ +#if defined(THREADED_RTS) + Message *m; + + while (!emptyInbox(cap)) { + ACQUIRE_LOCK(&cap->lock); + m = cap->inbox; + cap->inbox = m->link; + RELEASE_LOCK(&cap->lock); + executeMessage(cap, (Message *)m); + } +#endif +} + +/* ---------------------------------------------------------------------------- * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS) * ------------------------------------------------------------------------- */ @@ -1217,7 +1084,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) // context switch flag, and we end up waiting for a GC. // See #1984, and concurrent/should_run/1984 cap->context_switch = 0; - addToRunQueue(cap,t); + appendToRunQueue(cap,t); } else { pushOnRunQueue(cap,t); } @@ -1286,7 +1153,7 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next ) //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id); checkTSO(t)); - addToRunQueue(cap,t); + appendToRunQueue(cap,t); return rtsFalse; } @@ -1309,9 +1176,6 @@ scheduleHandleThreadBlocked( StgTSO *t // ASSERT(t->why_blocked != NotBlocked); // Not true: for example, - // - in THREADED_RTS, the thread may already have been woken - // up by another Capability. This actually happens: try - // conc023 +RTS -N2. // - the thread may have woken itself up already, because // threadPaused() might have raised a blocked throwTo // exception, see maybePerformBlockedException(). @@ -1371,23 +1235,23 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) ASSERT(task->incall->tso == t); if (t->what_next == ThreadComplete) { - if (task->ret) { + if (task->incall->ret) { // NOTE: return val is tso->sp[1] (see StgStartup.hc) - *(task->ret) = (StgClosure *)task->incall->tso->sp[1]; + *(task->incall->ret) = (StgClosure *)task->incall->tso->sp[1]; } - task->stat = Success; + task->incall->stat = Success; } else { - if (task->ret) { - *(task->ret) = NULL; + if (task->incall->ret) { + *(task->incall->ret) = NULL; } if (sched_state >= SCHED_INTERRUPTING) { if (heap_overflow) { - task->stat = HeapExhausted; + task->incall->stat = HeapExhausted; } else { - task->stat = Interrupted; + task->incall->stat = Interrupted; } } else { - task->stat = Killed; + task->incall->stat = Killed; } } #ifdef DEBUG @@ -1502,9 +1366,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads"); } - // do this while the other Capabilities stop: - if (cap) scheduleCheckBlackHoles(cap); - if (gc_type == PENDING_GC_SEQ) { // single-threaded GC: grab all the capabilities @@ -1532,11 +1393,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) waitForGcThreads(cap); } -#else /* !THREADED_RTS */ - - // do this while the other Capabilities stop: - if (cap) scheduleCheckBlackHoles(cap); - #endif IF_DEBUG(scheduler, printAllThreads()); @@ -1677,10 +1533,14 @@ forkProcess(HsStablePtr *entry ACQUIRE_LOCK(&cap->lock); ACQUIRE_LOCK(&cap->running_task->lock); + stopTimer(); // See #4074 + pid = fork(); if (pid) { // parent + startTimer(); // #4074 + RELEASE_LOCK(&sched_mutex); RELEASE_LOCK(&cap->lock); RELEASE_LOCK(&cap->running_task->lock); @@ -1856,13 +1716,17 @@ recoverSuspendedTask (Capability *cap, Task *task) * the whole system. * * The Haskell thread making the C call is put to sleep for the - * duration of the call, on the susepended_ccalling_threads queue. We + * duration of the call, on the suspended_ccalling_threads queue. We * give out a token to the task, which it can use to resume the thread * on return from the C function. + * + * If this is an interruptible C call, this means that the FFI call may be + * unceremoniously terminated and should be scheduled on an + * unbound worker thread. * ------------------------------------------------------------------------- */ void * -suspendThread (StgRegTable *reg) +suspendThread (StgRegTable *reg, rtsBool interruptible) { Capability *cap; int saved_errno; @@ -1891,12 +1755,10 @@ suspendThread (StgRegTable *reg) threadPaused(cap,tso); - if ((tso->flags & TSO_BLOCKEX) == 0) { - tso->why_blocked = BlockedOnCCall; - tso->flags |= TSO_BLOCKEX; - tso->flags &= ~TSO_INTERRUPTIBLE; + if (interruptible) { + tso->why_blocked = BlockedOnCCall_Interruptible; } else { - tso->why_blocked = BlockedOnCCall_NoUnblockExc; + tso->why_blocked = BlockedOnCCall; } // Hand back capability @@ -1955,12 +1817,11 @@ resumeThread (void *task_) traceEventRunThread(cap, tso); - if (tso->why_blocked == BlockedOnCCall) { + if ((tso->flags & TSO_BLOCKEX) == 0) { // avoid locking the TSO if we don't have to if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) { awakenBlockedExceptionQueue(cap,tso); } - tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE); } /* Reset blocking status */ @@ -2009,8 +1870,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso) if (cpu == cap->no) { appendToRunQueue(cap,tso); } else { - traceEventMigrateThread (cap, tso, capabilities[cpu].no); - wakeupThreadOnCapability(cap, &capabilities[cpu], tso); + migrateThread(cap, tso, &capabilities[cpu]); } #else appendToRunQueue(cap,tso); @@ -2032,8 +1892,8 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) tso->cap = cap; task->incall->tso = tso; - task->ret = ret; - task->stat = NoStatus; + task->incall->ret = ret; + task->incall->stat = NoStatus; appendToRunQueue(cap,tso); @@ -2042,7 +1902,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) cap = schedule(cap,task); - ASSERT(task->stat != NoStatus); + ASSERT(task->incall->stat != NoStatus); ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id); @@ -2096,8 +1956,6 @@ initScheduler(void) sleeping_queue = END_TSO_QUEUE; #endif - blackhole_queue = END_TSO_QUEUE; - sched_state = SCHED_RUNNING; recent_activity = ACTIVITY_YES; @@ -2144,12 +2002,7 @@ initScheduler(void) } void -exitScheduler( - rtsBool wait_foreign -#if !defined(THREADED_RTS) - __attribute__((unused)) -#endif -) +exitScheduler (rtsBool wait_foreign USED_IF_THREADS) /* see Capability.c, shutdownCapability() */ { Task *task = NULL; @@ -2350,8 +2203,9 @@ threadStackOverflow(Capability *cap, StgTSO *tso) * of the stack, so we don't attempt to scavenge any part of the * dead TSO's stack. */ - tso->what_next = ThreadRelocated; setTSOLink(cap,tso,dest); + write_barrier(); // other threads seeing ThreadRelocated will look at _link + tso->what_next = ThreadRelocated; tso->sp = (P_)&(tso->stack[tso->stack_size]); tso->why_blocked = NotBlocked; @@ -2408,8 +2262,9 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso) debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu", (long)tso->id, tso_size_w, tso_sizeW(new_tso)); - tso->what_next = ThreadRelocated; tso->_link = new_tso; // no write barrier reqd: same generation + write_barrier(); // other threads seeing ThreadRelocated will look at _link + tso->what_next = ThreadRelocated; // The TSO attached to this Task may have moved, so update the // pointer to it. @@ -2461,57 +2316,6 @@ void wakeUpRts(void) #endif /* ----------------------------------------------------------------------------- - * checkBlackHoles() - * - * Check the blackhole_queue for threads that can be woken up. We do - * this periodically: before every GC, and whenever the run queue is - * empty. - * - * An elegant solution might be to just wake up all the blocked - * threads with awakenBlockedQueue occasionally: they'll go back to - * sleep again if the object is still a BLACKHOLE. Unfortunately this - * doesn't give us a way to tell whether we've actually managed to - * wake up any threads, so we would be busy-waiting. - * - * -------------------------------------------------------------------------- */ - -static rtsBool -checkBlackHoles (Capability *cap) -{ - StgTSO **prev, *t; - rtsBool any_woke_up = rtsFalse; - StgHalfWord type; - - // blackhole_queue is global: - ASSERT_LOCK_HELD(&sched_mutex); - - debugTrace(DEBUG_sched, "checking threads blocked on black holes"); - - // ASSUMES: sched_mutex - prev = &blackhole_queue; - t = blackhole_queue; - while (t != END_TSO_QUEUE) { - if (t->what_next == ThreadRelocated) { - t = t->_link; - continue; - } - ASSERT(t->why_blocked == BlockedOnBlackHole); - type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type; - if (type != BLACKHOLE && type != CAF_BLACKHOLE) { - IF_DEBUG(sanity,checkTSO(t)); - t = unblockOne(cap, t); - *prev = t; - any_woke_up = rtsTrue; - } else { - prev = &t->_link; - t = t->_link; - } - } - - return any_woke_up; -} - -/* ----------------------------------------------------------------------------- Deleting threads This is used for interruption (^C) and forking, and corresponds to @@ -2520,7 +2324,7 @@ checkBlackHoles (Capability *cap) -------------------------------------------------------------------------- */ static void -deleteThread (Capability *cap, StgTSO *tso) +deleteThread (Capability *cap STG_UNUSED, StgTSO *tso) { // NOTE: must only be called on a TSO that we have exclusive // access to, because we will call throwToSingleThreaded() below. @@ -2528,8 +2332,8 @@ deleteThread (Capability *cap, StgTSO *tso) // we must own all Capabilities. if (tso->why_blocked != BlockedOnCCall && - tso->why_blocked != BlockedOnCCall_NoUnblockExc) { - throwToSingleThreaded(cap,tso,NULL); + tso->why_blocked != BlockedOnCCall_Interruptible) { + throwToSingleThreaded(tso->cap,tso,NULL); } } @@ -2540,9 +2344,9 @@ deleteThread_(Capability *cap, StgTSO *tso) // like deleteThread(), but we delete threads in foreign calls, too. if (tso->why_blocked == BlockedOnCCall || - tso->why_blocked == BlockedOnCCall_NoUnblockExc) { - unblockOne(cap,tso); + tso->why_blocked == BlockedOnCCall_Interruptible) { tso->what_next = ThreadKilled; + appendToRunQueue(tso->cap, tso); } else { deleteThread(cap,tso); } @@ -2602,8 +2406,8 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception) SET_HDR(raise_closure, &stg_raise_info, CCCS); raise_closure->payload[0] = exception; } - UPD_IND(cap, ((StgUpdateFrame *)p)->updatee, - (StgClosure *)raise_closure); + updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee, + (StgClosure *)raise_closure); p = next; continue; @@ -2747,6 +2551,12 @@ resurrectThreads (StgTSO *threads) * can wake up threads, remember...). */ continue; + case BlockedOnMsgThrowTo: + // This can happen if the target is masking, blocks on a + // black hole, and then is found to be unreachable. In + // this case, we want to let the target wake up and carry + // on, and do nothing to this thread. + continue; default: barf("resurrectThreads: thread blocked in a strange way: %d", tso->why_blocked);