X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=ecd47aa41b9d9edf7729dfa1cf71c1ee3f6e609a;hb=153b9cb9b11e05c4edb1b6bc0a7b972660e41f70;hp=9e2a5d053a481490f2859708d317994387854dc6;hpb=95ca6bff6fc9918203173b442192d9298ef9757a;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 9e2a5d0..ecd47aa 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -55,6 +55,7 @@ #include "Signals.h" #include "Sanity.h" #include "Stats.h" +#include "STM.h" #include "Timer.h" #include "Prelude.h" #include "ThreadLabels.h" @@ -162,7 +163,7 @@ static StgTSO *threadStackOverflow(StgTSO *tso); */ /* flag set by signal handler to precipitate a context switch */ -nat context_switch = 0; +int context_switch = 0; /* if this flag is set as well, give up execution */ rtsBool interrupted = rtsFalse; @@ -216,7 +217,11 @@ void addToBlockedQueue ( StgTSO *tso ); static void schedule ( StgMainThread *mainThread, Capability *initialCapability ); void interruptStgRts ( void ); +#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS) static void detectBlackHoles ( void ); +#endif + +static void raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically); #if defined(RTS_SUPPORTS_THREADS) /* ToDo: carefully document the invariants that go together @@ -451,13 +456,13 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, // run queue is empty, and there are no other tasks running, we // can wait indefinitely for something to happen. // - if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) + if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) + { #if defined(RTS_SUPPORTS_THREADS) - || EMPTY_RUN_QUEUE() + // We shouldn't be here... + barf("schedule: awaitEvent() in threaded RTS"); #endif - ) - { - awaitEvent( EMPTY_RUN_QUEUE() ); + awaitEvent( EMPTY_RUN_QUEUE() ); } // we can be interrupted while waiting for I/O... if (interrupted) continue; @@ -477,18 +482,13 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, if ( EMPTY_THREAD_QUEUES() ) { IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC...")); + // Garbage collection can release some new threads due to // either (a) finalizers or (b) threads resurrected because - // they are about to be send BlockedOnDeadMVar. Any threads - // thus released will be immediately runnable. + // they are unreachable and will therefore be sent an + // exception. Any threads thus released will be immediately + // runnable. GarbageCollect(GetRoots,rtsTrue); - - if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; } - - IF_DEBUG(scheduler, - sched_belch("still deadlocked, checking for black holes...")); - detectBlackHoles(); - if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; } #if defined(RTS_USER_SIGNALS) @@ -542,7 +542,10 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL #endif -#if defined(RTS_SUPPORTS_THREADS) +#if defined(RTS_SUPPORTS_THREADS) || defined(mingw32_HOST_OS) + /* win32: might be back here due to awaitEvent() being abandoned + * as a result of a console event having been delivered. + */ if ( EMPTY_RUN_QUEUE() ) { continue; // nothing to do } @@ -1202,6 +1205,7 @@ run_thread: * previously, or it's blocked on an MVar or Blackhole, in which * case it'll be on the relevant queue already. */ + ASSERT(t->why_blocked != NotBlocked); IF_DEBUG(scheduler, debugBelch("--<< thread %d (%s) stopped: ", t->id, whatNext_strs[t->what_next]); @@ -1329,6 +1333,26 @@ run_thread: #endif if (ready_to_gc) { + /* Kick any transactions which are invalid back to their atomically frames. + * When next scheduled they will try to commit, this commit will fail and + * they will retry. */ + for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) { + if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) { + if (!stmValidateTransaction (t -> trec)) { + IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t)); + + // strip the stack back to the ATOMICALLY_FRAME, aborting + // the (nested) transaction, and saving the stack of any + // partially-evaluated thunks on the heap. + raiseAsync_(t, NULL, rtsTrue); + +#ifdef REG_R1 + ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); +#endif + } + } + } + /* everybody back, start the GC. * Could do it in this thread, or signal a condition var * to do it in another thread. Either way, we need to @@ -1400,7 +1424,7 @@ isThreadBound(StgTSO* tso USED_IN_THREADED_RTS) * Singleton fork(). Do not copy any running threads. * ------------------------------------------------------------------------- */ -#ifndef mingw32_TARGET_OS +#ifndef mingw32_HOST_OS #define FORKPROCESS_PRIMOP_SUPPORTED #endif @@ -1455,12 +1479,6 @@ forkProcess(HsStablePtr *entry stgFree(m); } -# ifdef RTS_SUPPORTS_THREADS - resetTaskManagerAfterFork(); // tell startTask() and friends that - startingWorkerThread = rtsFalse; // we have no worker threads any more - resetWorkerWakeupPipeAfterFork(); -# endif - rc = rts_evalStableIO(entry, NULL); // run the action rts_checkSchedStatus("forkProcess",rc); @@ -1566,8 +1584,6 @@ suspendThread( StgRegTable *reg ) IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok)); #endif - /* Other threads _might_ be available for execution; signal this */ - THREAD_RUNNABLE(); RELEASE_LOCK(&sched_mutex); errno = saved_errno; @@ -1745,6 +1761,8 @@ createThread(nat size) - TSO_STRUCT_SIZEW; tso->sp = (P_)&(tso->stack) + stack_size; + tso->trec = NO_TREC; + #ifdef PROFILING tso->prof.CCCS = CCS_MAIN; #endif @@ -1931,11 +1949,10 @@ static void scheduleThread_ (StgTSO* tso); void scheduleThread_(StgTSO *tso) { - // Precondition: sched_mutex must be held. // The thread goes at the *end* of the run-queue, to avoid possible // starvation of any threads already on the queue. APPEND_TO_RUN_QUEUE(tso); - THREAD_RUNNABLE(); + threadRunnable(); } void @@ -1995,7 +2012,7 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id)); APPEND_TO_RUN_QUEUE(tso); - // NB. Don't call THREAD_RUNNABLE() here, because the thread is + // NB. Don't call threadRunnable() here, because the thread is // bound and only runnable by *this* OS thread, so waking up other // workers will just slow things down. @@ -2426,7 +2443,7 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node) next = bqe->link; ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging? APPEND_TO_RUN_QUEUE((StgTSO *)bqe); - THREAD_RUNNABLE(); + threadRunnable(); unblockCount(bqe, node); /* reset blocking status after dumping event */ ((StgTSO *)bqe)->why_blocked = NotBlocked; @@ -2471,7 +2488,7 @@ unblockOneLocked(StgTSO *tso) next = tso->link; tso->link = END_TSO_QUEUE; APPEND_TO_RUN_QUEUE(tso); - THREAD_RUNNABLE(); + threadRunnable(); IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id)); return next; } @@ -2642,9 +2659,6 @@ interruptStgRts(void) { interrupted = 1; context_switch = 1; -#ifdef RTS_SUPPORTS_THREADS - wakeBlockedWorkerThread(); -#endif } /* ----------------------------------------------------------------------------- @@ -2674,6 +2688,14 @@ unblockThread(StgTSO *tso) case NotBlocked: return; /* not blocked */ + case BlockedOnSTM: + // Be careful: nothing to do here! We tell the scheduler that the thread + // is runnable and we leave it to the stack-walking code to abort the + // transaction while unwinding the stack. We should perhaps have a debugging + // test to make sure that this really happens and that the 'zombie' transaction + // does not get committed. + goto done; + case BlockedOnMVar: ASSERT(get_itbl(tso->block_info.closure)->type == MVAR); { @@ -2740,7 +2762,7 @@ unblockThread(StgTSO *tso) case BlockedOnRead: case BlockedOnWrite: -#if defined(mingw32_TARGET_OS) +#if defined(mingw32_HOST_OS) case BlockedOnDoProc: #endif { @@ -2807,6 +2829,14 @@ unblockThread(StgTSO *tso) switch (tso->why_blocked) { + case BlockedOnSTM: + // Be careful: nothing to do here! We tell the scheduler that the thread + // is runnable and we leave it to the stack-walking code to abort the + // transaction while unwinding the stack. We should perhaps have a debugging + // test to make sure that this really happens and that the 'zombie' transaction + // does not get committed. + goto done; + case BlockedOnMVar: ASSERT(get_itbl(tso->block_info.closure)->type == MVAR); { @@ -2870,7 +2900,7 @@ unblockThread(StgTSO *tso) case BlockedOnRead: case BlockedOnWrite: -#if defined(mingw32_TARGET_OS) +#if defined(mingw32_HOST_OS) case BlockedOnDoProc: #endif { @@ -2961,7 +2991,10 @@ unblockThread(StgTSO *tso) void deleteThread(StgTSO *tso) { - raiseAsync(tso,NULL); + if (tso->why_blocked != BlockedOnCCall && + tso->why_blocked != BlockedOnCCall_NoUnblockExc) { + raiseAsync(tso,NULL); + } } #ifdef FORKPROCESS_PRIMOP_SUPPORTED @@ -2996,6 +3029,12 @@ raiseAsyncWithLock(StgTSO *tso, StgClosure *exception) void raiseAsync(StgTSO *tso, StgClosure *exception) { + raiseAsync_(tso, exception, rtsFalse); +} + +static void +raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) +{ StgRetInfoTable *info; StgPtr sp; @@ -3039,6 +3078,10 @@ raiseAsync(StgTSO *tso, StgClosure *exception) // top of the stack applied to the exception. // // 5. If it's a STOP_FRAME, then kill the thread. + // + // NB: if we pass an ATOMICALLY_FRAME then abort the associated + // transaction + StgPtr frame; @@ -3047,13 +3090,45 @@ raiseAsync(StgTSO *tso, StgClosure *exception) while (info->i.type != UPDATE_FRAME && (info->i.type != CATCH_FRAME || exception == NULL) - && info->i.type != STOP_FRAME) { + && info->i.type != STOP_FRAME + && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse)) + { + if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) { + // IF we find an ATOMICALLY_FRAME then we abort the + // current transaction and propagate the exception. In + // this case (unlike ordinary exceptions) we do not care + // whether the transaction is valid or not because its + // possible validity cannot have caused the exception + // and will not be visible after the abort. + IF_DEBUG(stm, + debugBelch("Found atomically block delivering async exception\n")); + stmAbortTransaction(tso -> trec); + tso -> trec = stmGetEnclosingTRec(tso -> trec); + } frame += stack_frame_sizeW((StgClosure *)frame); info = get_ret_itbl((StgClosure *)frame); } switch (info->i.type) { + case ATOMICALLY_FRAME: + ASSERT(stop_at_atomically); + ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC); + stmCondemnTransaction(tso -> trec); +#ifdef REG_R1 + tso->sp = frame; +#else + // R1 is not a register: the return convention for IO in + // this case puts the return value on the stack, so we + // need to set up the stack to return to the atomically + // frame properly... + tso->sp = frame - 2; + tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not? + tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info; +#endif + tso->what_next = ThreadRunGHC; + return; + case CATCH_FRAME: // If we find a CATCH_FRAME, and we've got an exception to raise, // then build the THUNK raise(exception), and leave it on @@ -3217,15 +3292,26 @@ raiseExceptionHelper (StgTSO *tso, StgClosure *exception) UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure); p = next; continue; + + case ATOMICALLY_FRAME: + IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p)); + tso->sp = p; + return ATOMICALLY_FRAME; case CATCH_FRAME: tso->sp = p; return CATCH_FRAME; + + case CATCH_STM_FRAME: + IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p)); + tso->sp = p; + return CATCH_STM_FRAME; case STOP_FRAME: tso->sp = p; return STOP_FRAME; + case CATCH_RETRY_FRAME: default: p = next; continue; @@ -3233,6 +3319,55 @@ raiseExceptionHelper (StgTSO *tso, StgClosure *exception) } } + +/* ----------------------------------------------------------------------------- + findRetryFrameHelper + + This function is called by the retry# primitive. It traverses the stack + leaving tso->sp referring to the frame which should handle the retry. + + This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) + or should be a ATOMICALLY_FRAME (if the retry# reaches the top level). + + We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions, + despite the similar implementation. + + We should not expect to see CATCH_FRAME or STOP_FRAME because those should + not be created within memory transactions. + -------------------------------------------------------------------------- */ + +StgWord +findRetryFrameHelper (StgTSO *tso) +{ + StgPtr p, next; + StgRetInfoTable *info; + + p = tso -> sp; + while (1) { + info = get_ret_itbl((StgClosure *)p); + next = p + stack_frame_sizeW((StgClosure *)p); + switch (info->i.type) { + + case ATOMICALLY_FRAME: + IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p)); + tso->sp = p; + return ATOMICALLY_FRAME; + + case CATCH_RETRY_FRAME: + IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p)); + tso->sp = p; + return CATCH_RETRY_FRAME; + + case CATCH_STM_FRAME: + default: + ASSERT(info->i.type != CATCH_FRAME); + ASSERT(info->i.type != STOP_FRAME); + p = next; + continue; + } + } +} + /* ----------------------------------------------------------------------------- resurrectThreads is called after garbage collection on the list of threads found to be garbage. Each of these threads will be woken @@ -3263,6 +3398,9 @@ resurrectThreads( StgTSO *threads ) case BlockedOnBlackHole: raiseAsync(tso,(StgClosure *)NonTermination_closure); break; + case BlockedOnSTM: + raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure); + break; case NotBlocked: /* This might happen if the thread was blocked on a black hole * belonging to a thread that we've just woken up (raiseAsync @@ -3275,68 +3413,6 @@ resurrectThreads( StgTSO *threads ) } } -/* ----------------------------------------------------------------------------- - * Blackhole detection: if we reach a deadlock, test whether any - * threads are blocked on themselves. Any threads which are found to - * be self-blocked get sent a NonTermination exception. - * - * This is only done in a deadlock situation in order to avoid - * performance overhead in the normal case. - * - * Locks: sched_mutex is held upon entry and exit. - * -------------------------------------------------------------------------- */ - -static void -detectBlackHoles( void ) -{ - StgTSO *tso = all_threads; - StgPtr frame; - StgClosure *blocked_on; - StgRetInfoTable *info; - - for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) { - - while (tso->what_next == ThreadRelocated) { - tso = tso->link; - ASSERT(get_itbl(tso)->type == TSO); - } - - if (tso->why_blocked != BlockedOnBlackHole) { - continue; - } - blocked_on = tso->block_info.closure; - - frame = tso->sp; - - while(1) { - info = get_ret_itbl((StgClosure *)frame); - switch (info->i.type) { - case UPDATE_FRAME: - if (((StgUpdateFrame *)frame)->updatee == blocked_on) { - /* We are blocking on one of our own computations, so - * send this thread the NonTermination exception. - */ - IF_DEBUG(scheduler, - sched_belch("thread %d is blocked on itself", tso->id)); - raiseAsync(tso, (StgClosure *)NonTermination_closure); - goto done; - } - - frame = (StgPtr)((StgUpdateFrame *)frame + 1); - continue; - - case STOP_FRAME: - goto done; - - // normal stack frames; do nothing except advance the pointer - default: - frame += stack_frame_sizeW((StgClosure *)frame); - } - } - done: ; - } -} - /* ---------------------------------------------------------------------------- * Debugging: why is a thread blocked * [Also provides useful information when debugging threaded programs @@ -3354,7 +3430,7 @@ printThreadBlockage(StgTSO *tso) case BlockedOnWrite: debugBelch("is blocked on write to fd %d", tso->block_info.fd); break; -#if defined(mingw32_TARGET_OS) +#if defined(mingw32_HOST_OS) case BlockedOnDoProc: debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID); break; @@ -3391,6 +3467,9 @@ printThreadBlockage(StgTSO *tso) case BlockedOnCCall_NoUnblockExc: debugBelch("is blocked on an external call (exceptions were already blocked)"); break; + case BlockedOnSTM: + debugBelch("is blocked on an STM operation"); + break; default: barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)", tso->why_blocked, tso->id, tso); @@ -3417,7 +3496,6 @@ void printAllThreads(void) { StgTSO *t; - void *label; # if defined(GRAN) char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN]; @@ -3437,8 +3515,12 @@ printAllThreads(void) for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) { debugBelch("\tthread %d @ %p ", t->id, (void *)t); - label = lookupThreadLabel(t->id); - if (label) debugBelch("[\"%s\"] ",(char *)label); +#if defined(DEBUG) + { + void *label = lookupThreadLabel(t->id); + if (label) debugBelch("[\"%s\"] ",(char *)label); + } +#endif printThreadStatus(t); debugBelch("\n"); }