X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FSchedule.c;h=e3d025a180c4a62e1b59d7b63fdb83aad5c5d14f;hb=f125eb7e4b8079e79f41f55d60e5b4bf138284ba;hp=7b57c0db32c148432bf849e64b39a27c458df75a;hpb=34bb8868a99962ede85ab7fa67c43fb922d78fc6;p=ghc-hetmet.git diff --git a/rts/Schedule.c b/rts/Schedule.c index 7b57c0d..e3d025a 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -139,7 +139,7 @@ static void scheduleYield (Capability **pcap, Task *task, rtsBool); #endif static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); -static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS); +static void scheduleProcessInbox(Capability *cap); static void scheduleCheckBlackHoles (Capability *cap); static void scheduleDetectDeadlock (Capability *cap, Task *task); static void schedulePushWork(Capability *cap, Task *task); @@ -312,7 +312,7 @@ schedule (Capability *initialCapability, Task *task) // If we are a worker, just exit. If we're a bound thread // then we will exit below when we've removed our TSO from // the run queue. - if (task->tso == NULL && emptyRunQueue(cap)) { + if (!isBoundTask(task) && emptyRunQueue(cap)) { return cap; } break; @@ -378,10 +378,10 @@ schedule (Capability *initialCapability, Task *task) // Check whether we can run this thread in the current task. // If not, we have to pass our capability to the right task. { - Task *bound = t->bound; + InCall *bound = t->bound; if (bound) { - if (bound == task) { + if (bound->task == task) { // yes, the Haskell thread is bound to the current native thread } else { debugTrace(DEBUG_sched, @@ -393,7 +393,7 @@ schedule (Capability *initialCapability, Task *task) } } else { // The thread we want to run is unbound. - if (task->tso) { + if (task->incall->tso) { debugTrace(DEBUG_sched, "this OS thread cannot run thread %lu", (unsigned long)t->id); @@ -441,7 +441,7 @@ run_thread: ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); ASSERT(t->cap == cap); - ASSERT(t->bound ? t->bound->cap == cap : 1); + ASSERT(t->bound ? t->bound->task->cap == cap : 1); prev_what_next = t->what_next; @@ -463,12 +463,16 @@ run_thread: if (prev == ACTIVITY_DONE_GC) { startTimer(); } - } else { + } else if (recent_activity != ACTIVITY_INACTIVE) { + // If we reached ACTIVITY_INACTIVE, then don't reset it until + // we've done the GC. The thread running here might just be + // the IO manager thread that handle_tick() woke up via + // wakeUpRts(). recent_activity = ACTIVITY_YES; } #endif - traceSchedEvent(cap, EVENT_RUN_THREAD, t, 0); + traceEventRunThread(cap, t); switch (prev_what_next) { @@ -518,7 +522,7 @@ run_thread: t->saved_winerror = GetLastError(); #endif - traceSchedEvent (cap, EVENT_STOP_THREAD, t, ret); + traceEventStopThread(cap, t, ret); #if defined(THREADED_RTS) // If ret is ThreadBlocked, and this Task is bound to the TSO that @@ -614,7 +618,7 @@ scheduleFindWork (Capability *cap) // list each time around the scheduler. if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); } - scheduleCheckWakeupThreads(cap); + scheduleProcessInbox(cap); scheduleCheckBlockedThreads(cap); @@ -635,9 +639,9 @@ shouldYieldCapability (Capability *cap, Task *task) // and this task it bound). return (waiting_for_gc || cap->returning_tasks_hd != NULL || - (!emptyRunQueue(cap) && (task->tso == NULL + (!emptyRunQueue(cap) && (task->incall->tso == NULL ? 
cap->run_queue_hd->bound != NULL - : cap->run_queue_hd->bound != task))); + : cap->run_queue_hd->bound != task->incall))); } // This is the single place where a Task goes to sleep. There are @@ -669,7 +673,7 @@ scheduleYield (Capability **pcap, Task *task, rtsBool force_yield) if (!force_yield && !shouldYieldCapability(cap,task) && (!emptyRunQueue(cap) || - !emptyWakeupQueue(cap) || + !emptyInbox(cap) || blackholes_need_checking || sched_state >= SCHED_INTERRUPTING)) return; @@ -705,7 +709,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, Capability *free_caps[n_capabilities], *cap0; nat i, n_free_caps; - // migration can be turned off with +RTS -qg + // migration can be turned off with +RTS -qm if (!RtsFlags.ParFlags.migrate) return; // Check whether we have more threads on our run queue, or sparks @@ -721,7 +725,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS, for (i=0, n_free_caps=0; i < n_capabilities; i++) { cap0 = &capabilities[i]; if (cap != cap0 && tryGrabCapability(cap0,task)) { - if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) { + if (!emptyRunQueue(cap0) + || cap->returning_tasks_hd != NULL + || cap->inbox != (Message*)END_TSO_QUEUE) { // it already has some work, we just grabbed it at // the wrong moment. Or maybe it's deadlocked! releaseCapability(cap0); @@ -764,7 +770,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, next = t->_link; t->_link = END_TSO_QUEUE; if (t->what_next == ThreadRelocated - || t->bound == task // don't move my bound thread + || t->bound == task->incall // don't move my bound thread || tsoLocked(t)) { // don't move a locked thread setTSOLink(cap, prev, t); prev = t; @@ -775,12 +781,11 @@ schedulePushWork(Capability *cap USED_IF_THREADS, setTSOLink(cap, prev, t); prev = t; } else { - debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no); appendToRunQueue(free_caps[i],t); - traceSchedEvent (cap, EVENT_MIGRATE_THREAD, t, free_caps[i]->no); + traceEventMigrateThread (cap, t, free_caps[i]->no); - if (t->bound) { t->bound->cap = free_caps[i]; } + if (t->bound) { t->bound->task->cap = free_caps[i]; } t->cap = free_caps[i]; i++; } @@ -802,7 +807,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, if (spark != NULL) { debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no); - traceSchedEvent(free_caps[i], EVENT_STEAL_SPARK, t, cap->no); + traceEventStealSpark(free_caps[i], t, cap->no); newSpark(&(free_caps[i]->r), spark); } @@ -868,23 +873,89 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS) * Check for threads woken up by other Capabilities * ------------------------------------------------------------------------- */ +#if defined(THREADED_RTS) +static void +executeMessage (Capability *cap, Message *m) +{ + const StgInfoTable *i; + +loop: + write_barrier(); // allow m->header to be modified by another thread + i = m->header.info; + if (i == &stg_MSG_WAKEUP_info) + { + MessageWakeup *w = (MessageWakeup *)m; + StgTSO *tso = w->tso; + debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld", + (lnat)tso->id); + ASSERT(tso->cap == cap); + ASSERT(tso->why_blocked == BlockedOnMsgWakeup); + ASSERT(tso->block_info.closure == (StgClosure *)m); + tso->why_blocked = NotBlocked; + appendToRunQueue(cap, tso); + } + else if (i == &stg_MSG_THROWTO_info) + { + MessageThrowTo *t = (MessageThrowTo *)m; + nat r; + const StgInfoTable *i; + + i = lockClosure((StgClosure*)m); + if (i != &stg_MSG_THROWTO_info) { + unlockClosure((StgClosure*)m, i); + goto 
loop; + } + + debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld", + (lnat)t->source->id, (lnat)t->target->id); + + ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo); + ASSERT(t->source->block_info.closure == (StgClosure *)m); + + r = throwToMsg(cap, t); + + switch (r) { + case THROWTO_SUCCESS: + ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info); + t->source->sp += 3; + unblockOne(cap, t->source); + // this message is done + unlockClosure((StgClosure*)m, &stg_IND_info); + break; + case THROWTO_BLOCKED: + // unlock the message + unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info); + break; + } + } + else if (i == &stg_IND_info) + { + // message was revoked + return; + } + else if (i == &stg_WHITEHOLE_info) + { + goto loop; + } + else + { + barf("executeMessage: %p", i); + } +} +#endif + static void -scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS) +scheduleProcessInbox (Capability *cap USED_IF_THREADS) { #if defined(THREADED_RTS) - // Any threads that were woken up by other Capabilities get - // appended to our run queue. - if (!emptyWakeupQueue(cap)) { - ACQUIRE_LOCK(&cap->lock); - if (emptyRunQueue(cap)) { - cap->run_queue_hd = cap->wakeup_queue_hd; - cap->run_queue_tl = cap->wakeup_queue_tl; - } else { - setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd); - cap->run_queue_tl = cap->wakeup_queue_tl; - } - cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE; - RELEASE_LOCK(&cap->lock); + Message *m; + + while (!emptyInbox(cap)) { + ACQUIRE_LOCK(&cap->lock); + m = cap->inbox; + cap->inbox = m->link; + RELEASE_LOCK(&cap->lock); + executeMessage(cap, (Message *)m); } #endif } @@ -976,13 +1047,13 @@ scheduleDetectDeadlock (Capability *cap, Task *task) /* Probably a real deadlock. Send the current main thread the * Deadlock exception. */ - if (task->tso) { - switch (task->tso->why_blocked) { + if (task->incall->tso) { + switch (task->incall->tso->why_blocked) { case BlockedOnSTM: case BlockedOnBlackHole: - case BlockedOnException: + case BlockedOnMsgThrowTo: case BlockedOnMVar: - throwToSingleThreaded(cap, task->tso, + throwToSingleThreaded(cap, task->incall->tso, (StgClosure *)nonTermination_closure); return; default: @@ -1171,8 +1242,8 @@ scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t) /* The TSO attached to this Task may have moved, so update the * pointer to it. */ - if (task->tso == t) { - task->tso = new_t; + if (task->incall->tso == t) { + task->incall->tso = new_t; } pushOnRunQueue(cap,new_t); } @@ -1185,36 +1256,32 @@ scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t) static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next ) { - // Reset the context switch flag. We don't do this just before - // running the thread, because that would mean we would lose ticks - // during GC, which can lead to unfair scheduling (a thread hogs - // the CPU because the tick always arrives during GC). This way - // penalises threads that do a lot of allocation, but that seems - // better than the alternative. - cap->context_switch = 0; - /* put the thread back on the run queue. Then, if we're ready to * GC, check whether this is the last task to stop. If so, wake * up the GC thread. getThread will block during a GC until the * GC is finished. 
*/ -#ifdef DEBUG - if (t->what_next != prev_what_next) { - debugTrace(DEBUG_sched, - "--<< thread %ld (%s) stopped to switch evaluators", - (long)t->id, what_next_strs[t->what_next]); - } -#endif - + ASSERT(t->_link == END_TSO_QUEUE); // Shortcut if we're just switching evaluators: don't bother // doing stack squeezing (which can be expensive), just run the // thread. - if (t->what_next != prev_what_next) { + if (cap->context_switch == 0 && t->what_next != prev_what_next) { + debugTrace(DEBUG_sched, + "--<< thread %ld (%s) stopped to switch evaluators", + (long)t->id, what_next_strs[t->what_next]); return rtsTrue; } + // Reset the context switch flag. We don't do this just before + // running the thread, because that would mean we would lose ticks + // during GC, which can lead to unfair scheduling (a thread hogs + // the CPU because the tick always arrives during GC). This way + // penalises threads that do a lot of allocation, but that seems + // better than the alternative. + cap->context_switch = 0; + IF_DEBUG(sanity, //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id); checkTSO(t)); @@ -1269,9 +1336,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) */ // blocked exceptions can now complete, even if the thread was in - // blocked mode (see #2910). This unconditionally calls - // lockTSO(), which ensures that we don't miss any threads that - // are engaged in throwTo() with this thread as a target. + // blocked mode (see #2910). awakenBlockedExceptionQueue (cap, t); // @@ -1286,7 +1351,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) if (t->bound) { - if (t->bound != task) { + if (t->bound != task->incall) { #if !defined(THREADED_RTS) // Must be a bound thread that is not the topmost one. Leave // it on the run queue until the stack has unwound to the @@ -1303,12 +1368,12 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) #endif } - ASSERT(task->tso == t); + ASSERT(task->incall->tso == t); if (t->what_next == ThreadComplete) { if (task->ret) { // NOTE: return val is tso->sp[1] (see StgStartup.hc) - *(task->ret) = (StgClosure *)task->tso->sp[1]; + *(task->ret) = (StgClosure *)task->incall->tso->sp[1]; } task->stat = Success; } else { @@ -1326,8 +1391,19 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) } } #ifdef DEBUG - removeThreadLabel((StgWord)task->tso->id); + removeThreadLabel((StgWord)task->incall->tso->id); #endif + + // We no longer consider this thread and task to be bound to + // each other. The TSO lives on until it is GC'd, but the + // task is about to be released by the caller, and we don't + // want anyone following the pointer from the TSO to the + // defunct task (which might have already been + // re-used). This was a real bug: the GC updated + // tso->bound->tso which lead to a deadlock. 
+ t->bound = NULL; + task->incall->tso = NULL; + return rtsTrue; // tells schedule() to return } @@ -1418,11 +1494,11 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) if (gc_type == PENDING_GC_SEQ) { - traceSchedEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0); + traceEventRequestSeqGc(cap); } else { - traceSchedEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0); + traceEventRequestParGc(cap); debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads"); } @@ -1478,8 +1554,8 @@ delete_threads_and_gc: heap_census = scheduleNeedHeapProfile(rtsTrue); + traceEventGcStart(cap); #if defined(THREADED_RTS) - traceSchedEvent(cap, EVENT_GC_START, 0, 0); // reset waiting_for_gc *before* GC, so that when the GC threads // emerge they don't immediately re-enter the GC. waiting_for_gc = 0; @@ -1487,7 +1563,7 @@ delete_threads_and_gc: #else GarbageCollect(force_major || heap_census, 0, cap); #endif - traceSchedEvent(cap, EVENT_GC_END, 0, 0); + traceEventGcEnd(cap); if (recent_activity == ACTIVITY_INACTIVE && force_major) { @@ -1576,7 +1652,6 @@ forkProcess(HsStablePtr *entry ) { #ifdef FORKPROCESS_PRIMOP_SUPPORTED - Task *task; pid_t pid; StgTSO* t,*next; Capability *cap; @@ -1638,6 +1713,13 @@ forkProcess(HsStablePtr *entry // exception, but we do want to raiseAsync() because these // threads may be evaluating thunks that we need later. deleteThread_(cap,t); + + // stop the GC from updating the InCall to point to + // the TSO. This is only necessary because the + // OSThread bound to the TSO has been killed, and + // won't get a chance to exit in the usual way (see + // also scheduleHandleThreadFinished). + t->bound = NULL; } } } @@ -1651,7 +1733,7 @@ forkProcess(HsStablePtr *entry // Any suspended C-calling Tasks are no more, their OS threads // don't exist now: - cap->suspended_ccalling_tasks = NULL; + cap->suspended_ccalls = NULL; // Empty the threads lists. Otherwise, the garbage // collector may attempt to resurrect some of these threads. @@ -1659,17 +1741,7 @@ forkProcess(HsStablePtr *entry generations[g].threads = END_TSO_QUEUE; } - // Wipe the task list, except the current Task. - ACQUIRE_LOCK(&sched_mutex); - for (task = all_tasks; task != NULL; task=task->all_link) { - if (task != cap->running_task) { -#if defined(THREADED_RTS) - initMutex(&task->lock); // see #1391 -#endif - discardTask(task); - } - } - RELEASE_LOCK(&sched_mutex); + discardTasksExcept(cap->running_task); #if defined(THREADED_RTS) // Wipe our spare workers list, they no longer exist. New @@ -1737,35 +1809,41 @@ deleteAllThreads ( Capability *cap ) } /* ----------------------------------------------------------------------------- - Managing the suspended_ccalling_tasks list. + Managing the suspended_ccalls list. 
Locks required: sched_mutex -------------------------------------------------------------------------- */ STATIC_INLINE void suspendTask (Capability *cap, Task *task) { - ASSERT(task->next == NULL && task->prev == NULL); - task->next = cap->suspended_ccalling_tasks; - task->prev = NULL; - if (cap->suspended_ccalling_tasks) { - cap->suspended_ccalling_tasks->prev = task; - } - cap->suspended_ccalling_tasks = task; + InCall *incall; + + incall = task->incall; + ASSERT(incall->next == NULL && incall->prev == NULL); + incall->next = cap->suspended_ccalls; + incall->prev = NULL; + if (cap->suspended_ccalls) { + cap->suspended_ccalls->prev = incall; + } + cap->suspended_ccalls = incall; } STATIC_INLINE void recoverSuspendedTask (Capability *cap, Task *task) { - if (task->prev) { - task->prev->next = task->next; + InCall *incall; + + incall = task->incall; + if (incall->prev) { + incall->prev->next = incall->next; } else { - ASSERT(cap->suspended_ccalling_tasks == task); - cap->suspended_ccalling_tasks = task->next; + ASSERT(cap->suspended_ccalls == incall); + cap->suspended_ccalls = incall->next; } - if (task->next) { - task->next->prev = task->prev; + if (incall->next) { + incall->next->prev = incall->prev; } - task->next = task->prev = NULL; + incall->next = incall->prev = NULL; } /* --------------------------------------------------------------------------- @@ -1806,7 +1884,7 @@ suspendThread (StgRegTable *reg) task = cap->running_task; tso = cap->r.rCurrentTSO; - traceSchedEvent(cap, EVENT_STOP_THREAD, tso, THREAD_SUSPENDED_FOREIGN_CALL); + traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL); // XXX this might not be necessary --SDM tso->what_next = ThreadRunGHC; @@ -1822,7 +1900,8 @@ suspendThread (StgRegTable *reg) } // Hand back capability - task->suspended_tso = tso; + task->incall->suspended_tso = tso; + task->incall->suspended_cap = cap; ACQUIRE_LOCK(&cap->lock); @@ -1843,6 +1922,7 @@ StgRegTable * resumeThread (void *task_) { StgTSO *tso; + InCall *incall; Capability *cap; Task *task = task_; int saved_errno; @@ -1855,25 +1935,29 @@ resumeThread (void *task_) saved_winerror = GetLastError(); #endif - cap = task->cap; + incall = task->incall; + cap = incall->suspended_cap; + task->cap = cap; + // Wait for permission to re-enter the RTS with the result. waitForReturnCapability(&cap,task); // we might be on a different capability now... but if so, our - // entry on the suspended_ccalling_tasks list will also have been + // entry on the suspended_ccalls list will also have been // migrated. 
// Remove the thread from the suspended list recoverSuspendedTask(cap,task); - tso = task->suspended_tso; - task->suspended_tso = NULL; + tso = incall->suspended_tso; + incall->suspended_tso = NULL; + incall->suspended_cap = NULL; tso->_link = END_TSO_QUEUE; // no write barrier reqd - traceSchedEvent(cap, EVENT_RUN_THREAD, tso, tso->what_next); + traceEventRunThread(cap, tso); if (tso->why_blocked == BlockedOnCCall) { // avoid locking the TSO if we don't have to - if (tso->blocked_exceptions != END_TSO_QUEUE) { + if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) { awakenBlockedExceptionQueue(cap,tso); } tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE); @@ -1925,7 +2009,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso) if (cpu == cap->no) { appendToRunQueue(cap,tso); } else { - traceSchedEvent (cap, EVENT_MIGRATE_THREAD, tso, capabilities[cpu].no); + traceEventMigrateThread (cap, tso, capabilities[cpu].no); wakeupThreadOnCapability(cap, &capabilities[cpu], tso); } #else @@ -1937,29 +2021,31 @@ Capability * scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) { Task *task; + StgThreadID id; // We already created/initialised the Task task = cap->running_task; // This TSO is now a bound thread; make the Task and TSO // point to each other. - tso->bound = task; + tso->bound = task->incall; tso->cap = cap; - task->tso = tso; + task->incall->tso = tso; task->ret = ret; task->stat = NoStatus; appendToRunQueue(cap,tso); - debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id); + id = tso->id; + debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id); cap = schedule(cap,task); ASSERT(task->stat != NoStatus); ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); - debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id); + debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id); return cap; } @@ -1968,23 +2054,8 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) * ------------------------------------------------------------------------- */ #if defined(THREADED_RTS) -void OSThreadProcAttr -workerStart(Task *task) +void scheduleWorker (Capability *cap, Task *task) { - Capability *cap; - - // See startWorkerTask(). - ACQUIRE_LOCK(&task->lock); - cap = task->cap; - RELEASE_LOCK(&task->lock); - - if (RtsFlags.ParFlags.setAffinity) { - setThreadAffinity(cap->no, n_capabilities); - } - - // set the thread-local pointer to the Task: - taskEnter(task); - // schedule() runs without a lock. 
cap = schedule(cap,task); @@ -2050,6 +2121,8 @@ initScheduler(void) initSparkPools(); #endif + RELEASE_LOCK(&sched_mutex); + #if defined(THREADED_RTS) /* * Eagerly start one worker to run each Capability, except for @@ -2063,13 +2136,11 @@ initScheduler(void) for (i = 1; i < n_capabilities; i++) { cap = &capabilities[i]; ACQUIRE_LOCK(&cap->lock); - startWorkerTask(cap, workerStart); + startWorkerTask(cap); RELEASE_LOCK(&cap->lock); } } #endif - - RELEASE_LOCK(&sched_mutex); } void @@ -2089,7 +2160,8 @@ exitScheduler( if (sched_state < SCHED_SHUTTING_DOWN) { sched_state = SCHED_INTERRUPTING; waitForReturnCapability(&task->cap,task); - scheduleDoGC(task->cap,task,rtsFalse); + scheduleDoGC(task->cap,task,rtsFalse); + ASSERT(task->incall->tso == NULL); releaseCapability(task->cap); } sched_state = SCHED_SHUTTING_DOWN; @@ -2099,6 +2171,7 @@ exitScheduler( nat i; for (i = 0; i < n_capabilities; i++) { + ASSERT(task->incall->tso == NULL); shutdownCapability(&capabilities[i], task, wait_foreign); } } @@ -2147,7 +2220,7 @@ performGC_(rtsBool force_major) // We must grab a new Task here, because the existing Task may be // associated with a particular Capability, and chained onto the - // suspended_ccalling_tasks queue. + // suspended_ccalls queue. task = newBoundTask(); waitForReturnCapability(&task->cap,task); @@ -2187,10 +2260,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) IF_DEBUG(sanity,checkTSO(tso)); - // don't allow throwTo() to modify the blocked_exceptions queue - // while we are moving the TSO: - lockClosure((StgClosure *)tso); - if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) { // NB. never raise a StackOverflow exception if the thread is @@ -2222,7 +2291,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) tso->sp+64))); // Send this thread the StackOverflow exception - unlockTSO(tso); throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure); return tso; } @@ -2238,7 +2306,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) // the stack anyway. if ((tso->flags & TSO_SQUEEZED) && ((W_)(tso->sp - tso->stack) >= BLOCK_SIZE_W)) { - unlockTSO(tso); return tso; } @@ -2288,9 +2355,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) tso->sp = (P_)&(tso->stack[tso->stack_size]); tso->why_blocked = NotBlocked; - unlockTSO(dest); - unlockTSO(tso); - IF_DEBUG(sanity,checkTSO(dest)); #if 0 IF_DEBUG(scheduler,printTSO(dest)); @@ -2323,10 +2387,6 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso) return tso; } - // don't allow throwTo() to modify the blocked_exceptions queue - // while we are moving the TSO: - lockClosure((StgClosure *)tso); - // this is the number of words we'll free free_w = round_to_mblocks(tso_size_w/2); @@ -2353,13 +2413,10 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso) // The TSO attached to this Task may have moved, so update the // pointer to it. 
- if (task->tso == tso) { - task->tso = new_tso; + if (task->incall->tso == tso) { + task->incall->tso = new_tso; } - unlockTSO(new_tso); - unlockTSO(tso); - IF_DEBUG(sanity,checkTSO(new_tso)); return new_tso; @@ -2545,7 +2602,8 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception) SET_HDR(raise_closure, &stg_raise_info, CCCS); raise_closure->payload[0] = exception; } - UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure); + UPD_IND(cap, ((StgUpdateFrame *)p)->updatee, + (StgClosure *)raise_closure); p = next; continue; @@ -2689,61 +2747,9 @@ resurrectThreads (StgTSO *threads) * can wake up threads, remember...). */ continue; - case BlockedOnException: - // throwTo should never block indefinitely: if the target - // thread dies or completes, throwTo returns. - barf("resurrectThreads: thread BlockedOnException"); - break; default: - barf("resurrectThreads: thread blocked in a strange way"); + barf("resurrectThreads: thread blocked in a strange way: %d", + tso->why_blocked); } } } - -/* ----------------------------------------------------------------------------- - performPendingThrowTos is called after garbage collection, and - passed a list of threads that were found to have pending throwTos - (tso->blocked_exceptions was not empty), and were blocked. - Normally this doesn't happen, because we would deliver the - exception directly if the target thread is blocked, but there are - small windows where it might occur on a multiprocessor (see - throwTo()). - - NB. we must be holding all the capabilities at this point, just - like resurrectThreads(). - -------------------------------------------------------------------------- */ - -void -performPendingThrowTos (StgTSO *threads) -{ - StgTSO *tso, *next; - Capability *cap; - Task *task, *saved_task;; - generation *gen; - - task = myTask(); - cap = task->cap; - - for (tso = threads; tso != END_TSO_QUEUE; tso = next) { - next = tso->global_link; - - gen = Bdescr((P_)tso)->gen; - tso->global_link = gen->threads; - gen->threads = tso; - - debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id); - - // We must pretend this Capability belongs to the current Task - // for the time being, as invariants will be broken otherwise. - // In fact the current Task has exclusive access to the systme - // at this point, so this is just bookkeeping: - task->cap = tso->cap; - saved_task = tso->cap->running_task; - tso->cap->running_task = task; - maybePerformBlockedException(tso->cap, tso); - tso->cap->running_task = saved_task; - } - - // Restore our original Capability: - task->cap = cap; -}
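
The receiving half of the new message-passing scheme appears above: scheduleProcessInbox() drains cap->inbox under cap->lock and hands each Message to executeMessage(), which understands MSG_WAKEUP and MSG_THROWTO. The patch does not show the sending half, so the following is only a minimal sketch of how another Capability might post a message onto a target inbox; the function name sendMessageSketch, the headers, and the final prod-the-target step are assumptions for illustration, not code from this patch.

#include "Rts.h"           /* assumed RTS headers: Capability, Message, lock macros */
#include "Capability.h"

/* Illustrative only: push a message onto another Capability's inbox.
 * scheduleProcessInbox() above pops from cap->inbox under the same lock,
 * so a plain LIFO push is enough.  Note that (as the schedulePushWork()
 * hunk above checks) an empty inbox is END_TSO_QUEUE, not NULL. */
static void
sendMessageSketch (Capability *to_cap, Message *msg)
{
    ACQUIRE_LOCK(&to_cap->lock);
    msg->link    = to_cap->inbox;   /* inbox is a singly-linked Message list */
    to_cap->inbox = msg;
    RELEASE_LOCK(&to_cap->lock);

    /* The target may be idle in scheduleYield(), so in the real RTS it
     * would also have to be prodded to re-run scheduleFindWork(); that
     * wake-up step is elided here. */
}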
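
Most of the mechanical churn in this patch replaces the direct Task/TSO coupling (task->tso, tso->bound pointing at a Task) with an indirection through an InCall: task->incall->tso, tso->bound->task, and cap->suspended_ccalls is now a list of InCalls rather than Tasks. The sketch below is for orientation only: the field names are taken from the hunks above, but the struct layout is a simplified assumption rather than the real rts/Task.h definition, and the isBoundTask() body is a guess at what the new test used in schedule() presumably does.

/* Simplified, assumed layout -- not the real Task.h definitions. */
typedef struct InCall_ {
    StgTSO         *tso;            /* bound TSO, or NULL for a worker          */
    StgTSO         *suspended_tso;  /* stashed here across a safe foreign call  */
    Capability     *suspended_cap;  /* ...together with the Capability it left  */
    struct Task_   *task;           /* back-pointer: tso->bound->task->cap etc. */
    struct InCall_ *prev, *next;    /* links on cap->suspended_ccalls           */
} InCall;

/* schedule() now asks isBoundTask(task) instead of (task->tso != NULL);
 * presumably it just follows the new indirection: */
INLINE_HEADER rtsBool
isBoundTask (Task *task)
{
    return (task->incall->tso != NULL);
}

Keeping the suspended state on the InCall rather than on the Task is what lets resumeThread() above recover the right Capability (incall->suspended_cap) even if the Task has migrated while the foreign call was in progress.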