X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FSchedule.c;h=c6fb1d885e61cb23bcf8b7b986fcc993eaf8cd6a;hb=0ba8b0bed96c78570bc750e0413d555b6939c16c;hp=5fa949cd4a0689fe252650b4405fb100873d4e27;hpb=fac738e582dcaca1575f5291c83910db01d25284;p=ghc-hetmet.git diff --git a/rts/Schedule.c b/rts/Schedule.c index 5fa949c..c6fb1d8 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -118,12 +118,6 @@ StgTSO *blackhole_queue = NULL; */ rtsBool blackholes_need_checking = rtsFalse; -/* Linked list of all threads. - * Used for detecting garbage collected threads. - * LOCK: sched_mutex+capability, or all capabilities - */ -StgTSO *all_threads = NULL; - /* flag set by signal handler to precipitate a context switch * LOCK: none (just an advisory flag) */ @@ -206,7 +200,7 @@ static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish); #if defined(PAR) || defined(GRAN) static void scheduleGranParReport(void); #endif -static void schedulePostRunThread(void); +static void schedulePostRunThread(StgTSO *t); static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ); static void scheduleHandleStackOverflow( Capability *cap, Task *task, StgTSO *t); @@ -556,8 +550,6 @@ schedule (Capability *initialCapability, Task *task) } #endif - cap->r.rCurrentTSO = t; - /* context switches are initiated by the timer signal, unless * the user specified "context switch as often as possible", with * +RTS -C0 @@ -569,6 +561,11 @@ schedule (Capability *initialCapability, Task *task) run_thread: + // CurrentTSO is the thread to run. t might be different if we + // loop back to run_thread, so make sure to set CurrentTSO after + // that. + cap->r.rCurrentTSO = t; + debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", (long)t->id, whatNext_strs[t->what_next]); @@ -592,7 +589,7 @@ run_thread: cap->in_haskell = rtsTrue; - dirtyTSO(t); + dirty_TSO(cap,t); #if defined(THREADED_RTS) if (recent_activity == ACTIVITY_DONE_GC) { @@ -682,7 +679,7 @@ run_thread: CCCS = CCS_SYSTEM; #endif - schedulePostRunThread(); + schedulePostRunThread(t); t = threadStackUnderflow(task,t); @@ -768,7 +765,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, // Check whether we have more threads on our run queue, or sparks // in our pool, that we could hand to another Capability. - if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) + if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE) && sparkPoolSizeCap(cap) < 2) { return; } @@ -809,21 +806,21 @@ schedulePushWork(Capability *cap USED_IF_THREADS, if (cap->run_queue_hd != END_TSO_QUEUE) { prev = cap->run_queue_hd; - t = prev->link; - prev->link = END_TSO_QUEUE; + t = prev->_link; + prev->_link = END_TSO_QUEUE; for (; t != END_TSO_QUEUE; t = next) { - next = t->link; - t->link = END_TSO_QUEUE; + next = t->_link; + t->_link = END_TSO_QUEUE; if (t->what_next == ThreadRelocated || t->bound == task // don't move my bound thread || tsoLocked(t)) { // don't move a locked thread - prev->link = t; + setTSOLink(cap, prev, t); prev = t; } else if (i == n_free_caps) { pushed_to_all = rtsTrue; i = 0; // keep one for us - prev->link = t; + setTSOLink(cap, prev, t); prev = t; } else { debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no); @@ -919,7 +916,7 @@ scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS) cap->run_queue_hd = cap->wakeup_queue_hd; cap->run_queue_tl = cap->wakeup_queue_tl; } else { - cap->run_queue_tl->link = cap->wakeup_queue_hd; + setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd); cap->run_queue_tl = cap->wakeup_queue_tl; } cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE; @@ -1008,6 +1005,8 @@ scheduleDetectDeadlock (Capability *cap, Task *task) // either we have threads to run, or we were interrupted: ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING); + + return; } #endif @@ -1456,8 +1455,36 @@ JB: TODO: investigate wether state change field could be nuked * ------------------------------------------------------------------------- */ static void -schedulePostRunThread(void) +schedulePostRunThread (StgTSO *t) { + // We have to be able to catch transactions that are in an + // infinite loop as a result of seeing an inconsistent view of + // memory, e.g. + // + // atomically $ do + // [a,b] <- mapM readTVar [ta,tb] + // when (a == b) loop + // + // and a is never equal to b given a consistent view of memory. + // + if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) { + if (!stmValidateNestOfTransactions (t -> trec)) { + debugTrace(DEBUG_sched | DEBUG_stm, + "trec %p found wasting its time", t); + + // strip the stack back to the + // ATOMICALLY_FRAME, aborting the (nested) + // transaction, and saving the stack of any + // partially-evaluated thunks on the heap. + throwToSingleThreaded_(&capabilities[0], t, + NULL, rtsTrue, NULL); + +#ifdef REG_R1 + ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); +#endif + } + } + #if defined(PAR) /* HACK 675: if the last thread didn't yield, make sure to print a SCHEDULE event to the log file when StgRunning the next thread, even @@ -1618,7 +1645,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) } debugTrace(DEBUG_sched, - "--<< thread %ld (%s) stopped: HeapOverflow\n", + "--<< thread %ld (%s) stopped: HeapOverflow", (long)t->id, whatNext_strs[t->what_next]); #if defined(GRAN) @@ -1711,7 +1738,7 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next ) IF_DEBUG(sanity, //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id); checkTSO(t)); - ASSERT(t->link == END_TSO_QUEUE); + ASSERT(t->_link == END_TSO_QUEUE); // Shortcut if we're just switching evaluators: don't bother // doing stack squeezing (which can be expensive), just run the @@ -1898,7 +1925,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) // point where we can deal with this. Leaving it on the run // queue also ensures that the garbage collector knows about // this thread and its return value (it gets dropped from the - // all_threads list so there's no other way to find it). + // step->threads list so there's no other way to find it). appendToRunQueue(cap,t); return rtsFalse; #else @@ -2010,51 +2037,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) waiting_for_gc = rtsFalse; #endif - /* Kick any transactions which are invalid back to their - * atomically frames. When next scheduled they will try to - * commit, this commit will fail and they will retry. - */ - { - StgTSO *next; - - for (t = all_threads; t != END_TSO_QUEUE; t = next) { - if (t->what_next == ThreadRelocated) { - next = t->link; - } else { - next = t->global_link; - - // This is a good place to check for blocked - // exceptions. It might be the case that a thread is - // blocked on delivering an exception to a thread that - // is also blocked - we try to ensure that this - // doesn't happen in throwTo(), but it's too hard (or - // impossible) to close all the race holes, so we - // accept that some might get through and deal with - // them here. A GC will always happen at some point, - // even if the system is otherwise deadlocked. - maybePerformBlockedException (&capabilities[0], t); - - if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) { - if (!stmValidateNestOfTransactions (t -> trec)) { - debugTrace(DEBUG_sched | DEBUG_stm, - "trec %p found wasting its time", t); - - // strip the stack back to the - // ATOMICALLY_FRAME, aborting the (nested) - // transaction, and saving the stack of any - // partially-evaluated thunks on the heap. - throwToSingleThreaded_(&capabilities[0], t, - NULL, rtsTrue, NULL); - -#ifdef REG_R1 - ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); -#endif - } - } - } - } - } - // so this happens periodically: if (cap) scheduleCheckBlackHoles(cap); @@ -2133,6 +2115,7 @@ forkProcess(HsStablePtr *entry pid_t pid; StgTSO* t,*next; Capability *cap; + nat s; #if defined(THREADED_RTS) if (RtsFlags.ParFlags.nNodes > 1) { @@ -2180,9 +2163,10 @@ forkProcess(HsStablePtr *entry // all Tasks, because they correspond to OS threads that are // now gone. - for (t = all_threads; t != END_TSO_QUEUE; t = next) { + for (s = 0; s < total_steps; s++) { + for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) { if (t->what_next == ThreadRelocated) { - next = t->link; + next = t->_link; } else { next = t->global_link; // don't allow threads to catch the ThreadKilled @@ -2190,6 +2174,7 @@ forkProcess(HsStablePtr *entry // threads may be evaluating thunks that we need later. deleteThread_(cap,t); } + } } // Empty the run queue. It seems tempting to let all the @@ -2203,9 +2188,11 @@ forkProcess(HsStablePtr *entry // don't exist now: cap->suspended_ccalling_tasks = NULL; - // Empty the all_threads list. Otherwise, the garbage + // Empty the threads lists. Otherwise, the garbage // collector may attempt to resurrect some of these threads. - all_threads = END_TSO_QUEUE; + for (s = 0; s < total_steps; s++) { + all_steps[s].threads = END_TSO_QUEUE; + } // Wipe the task list, except the current Task. ACQUIRE_LOCK(&sched_mutex); @@ -2255,14 +2242,18 @@ deleteAllThreads ( Capability *cap ) // NOTE: only safe to call if we own all capabilities. StgTSO* t, *next; + nat s; + debugTrace(DEBUG_sched,"deleting all threads"); - for (t = all_threads; t != END_TSO_QUEUE; t = next) { + for (s = 0; s < total_steps; s++) { + for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) { if (t->what_next == ThreadRelocated) { - next = t->link; + next = t->_link; } else { next = t->global_link; deleteThread(cap,t); } + } } // The run queue now contains a bunch of ThreadKilled threads. We @@ -2417,7 +2408,7 @@ resumeThread (void *task_) tso = task->suspended_tso; task->suspended_tso = NULL; - tso->link = END_TSO_QUEUE; + tso->_link = END_TSO_QUEUE; // no write barrier reqd debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id); if (tso->why_blocked == BlockedOnCCall) { @@ -2436,7 +2427,7 @@ resumeThread (void *task_) #endif /* We might have GC'd, mark the TSO dirty again */ - dirtyTSO(tso); + dirty_TSO(cap,tso); IF_DEBUG(sanity, checkTSO(tso)); @@ -2572,7 +2563,6 @@ initScheduler(void) #endif blackhole_queue = END_TSO_QUEUE; - all_threads = END_TSO_QUEUE; context_switch = 0; sched_state = SCHED_RUNNING; @@ -2786,7 +2776,7 @@ threadStackOverflow(Capability *cap, StgTSO *tso) * dead TSO's stack. */ tso->what_next = ThreadRelocated; - tso->link = dest; + setTSOLink(cap,tso,dest); tso->sp = (P_)&(tso->stack[tso->stack_size]); tso->why_blocked = NotBlocked; @@ -2834,6 +2824,8 @@ threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso) bd = Bdescr((StgPtr)tso); new_bd = splitLargeBlock(bd, new_tso_size_w / BLOCK_SIZE_W); + new_bd->free = bd->free; + bd->free = bd->start + TSO_STRUCT_SIZEW; new_tso = (StgTSO *)new_bd->start; memcpy(new_tso,tso,TSO_STRUCT_SIZE); @@ -2934,8 +2926,8 @@ checkBlackHoles (Capability *cap) *prev = t; any_woke_up = rtsTrue; } else { - prev = &t->link; - t = t->link; + prev = &t->_link; + t = t->_link; } } @@ -3143,11 +3135,15 @@ resurrectThreads (StgTSO *threads) { StgTSO *tso, *next; Capability *cap; + step *step; for (tso = threads; tso != END_TSO_QUEUE; tso = next) { next = tso->global_link; - tso->global_link = all_threads; - all_threads = tso; + + step = Bdescr((P_)tso)->step; + tso->global_link = step->threads; + step->threads = tso; + debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id); // Wake up the thread on the Capability it was last on @@ -3179,3 +3175,37 @@ resurrectThreads (StgTSO *threads) } } } + +/* ----------------------------------------------------------------------------- + performPendingThrowTos is called after garbage collection, and + passed a list of threads that were found to have pending throwTos + (tso->blocked_exceptions was not empty), and were blocked. + Normally this doesn't happen, because we would deliver the + exception directly if the target thread is blocked, but there are + small windows where it might occur on a multiprocessor (see + throwTo()). + + NB. we must be holding all the capabilities at this point, just + like resurrectThreads(). + -------------------------------------------------------------------------- */ + +void +performPendingThrowTos (StgTSO *threads) +{ + StgTSO *tso, *next; + Capability *cap; + step *step; + + for (tso = threads; tso != END_TSO_QUEUE; tso = next) { + next = tso->global_link; + + step = Bdescr((P_)tso)->step; + tso->global_link = step->threads; + step->threads = tso; + + debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id); + + cap = tso->cap; + maybePerformBlockedException(cap, tso); + } +}