X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=ecd47aa41b9d9edf7729dfa1cf71c1ee3f6e609a;hb=153b9cb9b11e05c4edb1b6bc0a7b972660e41f70;hp=04e70dae731327d6028fb771b1cdbdd423f75ec6;hpb=bb01a96bea6bd7808332d43a5bed78d1aff4a3fd;p=ghc-hetmet.git

diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c
index 04e70da..ecd47aa 100644
--- a/ghc/rts/Schedule.c
+++ b/ghc/rts/Schedule.c
@@ -55,6 +55,7 @@
 #include "Signals.h"
 #include "Sanity.h"
 #include "Stats.h"
+#include "STM.h"
 #include "Timer.h"
 #include "Prelude.h"
 #include "ThreadLabels.h"
@@ -162,7 +163,7 @@ static StgTSO *threadStackOverflow(StgTSO *tso);
  */
 
 /* flag set by signal handler to precipitate a context switch */
-nat context_switch = 0;
+int context_switch = 0;
 
 /* if this flag is set as well, give up execution */
 rtsBool interrupted = rtsFalse;
@@ -220,6 +221,8 @@ static void schedule ( StgMainThread *mainThread, Capability *initi
 static void detectBlackHoles ( void );
 #endif
 
+static void raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically);
+
 #if defined(RTS_SUPPORTS_THREADS)
 /* ToDo: carefully document the invariants that go together
  *       with these synchronisation objects.
@@ -539,7 +542,10 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
 #endif
 
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(RTS_SUPPORTS_THREADS) || defined(mingw32_HOST_OS)
+    /* win32: might be back here due to awaitEvent() being abandoned
+     * as a result of a console event having been delivered.
+     */
     if ( EMPTY_RUN_QUEUE() ) {
         continue; // nothing to do
     }
@@ -1199,6 +1205,7 @@ run_thread:
      * previously, or it's blocked on an MVar or Blackhole, in which
      * case it'll be on the relevant queue already.
      */
+    ASSERT(t->why_blocked != NotBlocked);
     IF_DEBUG(scheduler,
              debugBelch("--<< thread %d (%s) stopped: ",
                         t->id, whatNext_strs[t->what_next]);
@@ -1326,6 +1333,26 @@ run_thread:
 #endif
 
     if (ready_to_gc) {
+      /* Kick any transactions which are invalid back to their atomically frames.
+       * When next scheduled they will try to commit; the commit will fail and
+       * they will retry. */
+      for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
+        if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+          if (!stmValidateTransaction (t -> trec)) {
+            IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
+
+            // strip the stack back to the ATOMICALLY_FRAME, aborting
+            // the (nested) transaction, and saving the stack of any
+            // partially-evaluated thunks on the heap.
+            raiseAsync_(t, NULL, rtsTrue);
+
+#ifdef REG_R1
+            ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+#endif
+          }
+        }
+      }
+
       /* everybody back, start the GC.
        * Could do it in this thread, or signal a condition var
        * to do it in another thread.  Either way, we need to
@@ -1397,7 +1424,7 @@ isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
  * Singleton fork(). Do not copy any running threads.
  * ------------------------------------------------------------------------- */
 
-#ifndef mingw32_TARGET_OS
+#ifndef mingw32_HOST_OS
 #define FORKPROCESS_PRIMOP_SUPPORTED
 #endif
 
@@ -1734,6 +1761,8 @@ createThread(nat size)
     - TSO_STRUCT_SIZEW;
   tso->sp = (P_)&(tso->stack) + stack_size;
 
+  tso->trec = NO_TREC;
+
 #ifdef PROFILING
   tso->prof.CCCS = CCS_MAIN;
 #endif
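A minimal sketch, not part of the patch itself, of the pre-GC sweep introduced above: every runnable thread that has an open transaction is validated, and any thread whose transaction has become invalid is unwound back to its ATOMICALLY_FRAME so the transaction re-runs when the thread is next scheduled. The helper name kickInvalidTransactions is hypothetical; everything it touches (all_threads, NO_TREC, stmValidateTransaction, raiseAsync_) is what the hunk above already uses.

static void
kickInvalidTransactions (void)
{
    StgTSO *t;

    for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
        /* Only runnable threads with an open transaction are considered:
         * forwarding TSOs and blocked threads are left alone, exactly as
         * in the loop added to schedule() above. */
        if (t -> what_next == ThreadRelocated) continue;
        if (t -> trec == NO_TREC)              continue;
        if (t -> why_blocked != NotBlocked)    continue;

        if (!stmValidateTransaction(t -> trec)) {
            /* The transaction has seen an inconsistent view of memory:
             * strip the stack back to the ATOMICALLY_FRAME (aborting any
             * nested transaction) so it restarts on the next schedule. */
            raiseAsync_(t, NULL, rtsTrue);
        }
    }
}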
@@ -2659,6 +2688,14 @@ unblockThread(StgTSO *tso)
     case NotBlocked:
         return;  /* not blocked */
 
+  case BlockedOnSTM:
+    // Be careful: nothing to do here!  We tell the scheduler that the thread
+    // is runnable and we leave it to the stack-walking code to abort the
+    // transaction while unwinding the stack. We should perhaps have a debugging
+    // test to make sure that this really happens and that the 'zombie' transaction
+    // does not get committed.
+    goto done;
+
   case BlockedOnMVar:
       ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
       {
@@ -2725,7 +2762,7 @@ unblockThread(StgTSO *tso)
 
   case BlockedOnRead:
   case BlockedOnWrite:
-#if defined(mingw32_TARGET_OS)
+#if defined(mingw32_HOST_OS)
   case BlockedOnDoProc:
 #endif
       {
@@ -2792,6 +2829,14 @@ unblockThread(StgTSO *tso)
 
   switch (tso->why_blocked) {
 
+  case BlockedOnSTM:
+    // Be careful: nothing to do here!  We tell the scheduler that the thread
+    // is runnable and we leave it to the stack-walking code to abort the
+    // transaction while unwinding the stack. We should perhaps have a debugging
+    // test to make sure that this really happens and that the 'zombie' transaction
+    // does not get committed.
+    goto done;
+
   case BlockedOnMVar:
       ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
       {
@@ -2855,7 +2900,7 @@ unblockThread(StgTSO *tso)
 
   case BlockedOnRead:
   case BlockedOnWrite:
-#if defined(mingw32_TARGET_OS)
+#if defined(mingw32_HOST_OS)
   case BlockedOnDoProc:
 #endif
       {
@@ -2946,7 +2991,10 @@ unblockThread(StgTSO *tso)
 void
 deleteThread(StgTSO *tso)
 {
-  raiseAsync(tso,NULL);
+  if (tso->why_blocked != BlockedOnCCall &&
+      tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
+    raiseAsync(tso,NULL);
+  }
 }
 
 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
@@ -2981,6 +3029,12 @@ raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
 void
 raiseAsync(StgTSO *tso, StgClosure *exception)
 {
+  raiseAsync_(tso, exception, rtsFalse);
+}
+
+static void
+raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
+{
     StgRetInfoTable *info;
     StgPtr sp;
 
@@ -3024,6 +3078,10 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
     //    top of the stack applied to the exception.
     //
     // 5. If it's a STOP_FRAME, then kill the thread.
+    //
+    // NB: if we pass an ATOMICALLY_FRAME then abort the associated
+    // transaction
+
 
     StgPtr frame;
 
@@ -3032,13 +3090,45 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
 
     while (info->i.type != UPDATE_FRAME
            && (info->i.type != CATCH_FRAME || exception == NULL)
-           && info->i.type != STOP_FRAME) {
+           && info->i.type != STOP_FRAME
+           && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
+    {
+      if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
+        // If we find a CATCH_RETRY_FRAME or an ATOMICALLY_FRAME then we
+        // abort the current transaction and propagate the exception.  In
+        // this case (unlike ordinary exceptions) we do not care
+        // whether the transaction is valid or not because its
+        // possible validity cannot have caused the exception
+        // and will not be visible after the abort.
+        IF_DEBUG(stm,
+                 debugBelch("Found atomically block delivering async exception\n"));
+        stmAbortTransaction(tso -> trec);
+        tso -> trec = stmGetEnclosingTRec(tso -> trec);
+      }
        frame += stack_frame_sizeW((StgClosure *)frame);
        info = get_ret_itbl((StgClosure *)frame);
     }
 
     switch (info->i.type) {
 
+    case ATOMICALLY_FRAME:
+      ASSERT(stop_at_atomically);
+      ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
+      stmCondemnTransaction(tso -> trec);
+#ifdef REG_R1
+      tso->sp = frame;
+#else
+      // R1 is not a register: the return convention for IO in
+      // this case puts the return value on the stack, so we
+      // need to set up the stack to return to the atomically
+      // frame properly...
+      tso->sp = frame - 2;
+      tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
+      tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
+#endif
+      tso->what_next = ThreadRunGHC;
+      return;
+
     case CATCH_FRAME:
        // If we find a CATCH_FRAME, and we've got an exception to raise,
        // then build the THUNK raise(exception), and leave it on
@@ -3202,15 +3292,26 @@ raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
            UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
            p = next;
            continue;
+
+        case ATOMICALLY_FRAME:
+            IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
+            tso->sp = p;
+            return ATOMICALLY_FRAME;
 
        case CATCH_FRAME:
            tso->sp = p;
            return CATCH_FRAME;
+
+        case CATCH_STM_FRAME:
+            IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
+            tso->sp = p;
+            return CATCH_STM_FRAME;
 
        case STOP_FRAME:
            tso->sp = p;
            return STOP_FRAME;
 
+        case CATCH_RETRY_FRAME:
        default:
            p = next;
            continue;
@@ -3218,6 +3319,55 @@ raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
     }
 }
 
+
+/* -----------------------------------------------------------------------------
+   findRetryFrameHelper
+
+   This function is called by the retry# primitive.  It traverses the stack
+   leaving tso->sp referring to the frame which should handle the retry.
+
+   This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
+   or an ATOMICALLY_FRAME (if the retry# reaches the top level).
+
+   We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
+   despite the similar implementation.
+
+   We should not expect to see CATCH_FRAME or STOP_FRAME because those should
+   not be created within memory transactions.
+   -------------------------------------------------------------------------- */
+
+StgWord
+findRetryFrameHelper (StgTSO *tso)
+{
+  StgPtr p, next;
+  StgRetInfoTable *info;
+
+  p = tso -> sp;
+  while (1) {
+    info = get_ret_itbl((StgClosure *)p);
+    next = p + stack_frame_sizeW((StgClosure *)p);
+    switch (info->i.type) {
+
+    case ATOMICALLY_FRAME:
+      IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retry\n", p));
+      tso->sp = p;
+      return ATOMICALLY_FRAME;
+
+    case CATCH_RETRY_FRAME:
+      IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retry\n", p));
+      tso->sp = p;
+      return CATCH_RETRY_FRAME;
+
+    case CATCH_STM_FRAME:
+    default:
+      ASSERT(info->i.type != CATCH_FRAME);
+      ASSERT(info->i.type != STOP_FRAME);
+      p = next;
+      continue;
+    }
+  }
+}
+
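findRetryFrameHelper only locates the frame; the decision about what to do next is taken by its caller, the retry# primitive, which lives in the primop code rather than in Schedule.c. The sketch below is not part of the patch: it only shows the shape of that decision as described in the header comment above. handleRetry and the "block the thread" / "start the alternative" steps are placeholders, while findRetryFrameHelper, the frame-type constants and barf are the real names used in this file.

static void
handleRetry (StgTSO *tso)
{
    /* Walk the stack to the frame that has to deal with the retry. */
    StgWord frame_type = findRetryFrameHelper(tso);

    switch (frame_type) {
    case ATOMICALLY_FRAME:
        /* retry# reached the top of the transaction: there is no
         * alternative left to run, so the thread has to wait until
         * another transaction writes one of the TVars it has read
         * (conceptually, why_blocked becomes BlockedOnSTM). */
        /* ... block the thread here ... */
        break;

    case CATCH_RETRY_FRAME:
        /* retry# happened inside orElse#: abandon the first branch and
         * run the alternative stored in the CATCH_RETRY_FRAME instead. */
        /* ... start the orElse alternative here ... */
        break;

    default:
        /* The helper only ever returns the two frame types above. */
        barf("handleRetry: unexpected frame type %d", (int)frame_type);
    }
}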
 /* -----------------------------------------------------------------------------
    resurrectThreads is called after garbage collection on the list of threads found to be garbage.
    Each of these threads will be woken
@@ -3248,6 +3398,9 @@ resurrectThreads( StgTSO *threads )
     case BlockedOnBlackHole:
         raiseAsync(tso,(StgClosure *)NonTermination_closure);
         break;
+    case BlockedOnSTM:
+      raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
+      break;
     case NotBlocked:
         /* This might happen if the thread was blocked on a black hole
          * belonging to a thread that we've just woken up (raiseAsync
@@ -3277,7 +3430,7 @@ printThreadBlockage(StgTSO *tso)
   case BlockedOnWrite:
     debugBelch("is blocked on write to fd %d", tso->block_info.fd);
     break;
-#if defined(mingw32_TARGET_OS)
+#if defined(mingw32_HOST_OS)
   case BlockedOnDoProc:
     debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
     break;
@@ -3314,6 +3467,9 @@ printThreadBlockage(StgTSO *tso)
   case BlockedOnCCall_NoUnblockExc:
     debugBelch("is blocked on an external call (exceptions were already blocked)");
     break;
+  case BlockedOnSTM:
+    debugBelch("is blocked on an STM operation");
+    break;
   default:
     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
          tso->why_blocked, tso->id, tso);
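The resurrectThreads hunk above gives STM-blocked threads the same treatment as other unreachable blocked threads: no live transaction can ever write the TVars they are waiting on, so they are woken with an exception rather than left asleep forever. The following is a compact restatement, not part of the patch; chooseResurrectionException is a hypothetical helper, and only the cases visible in the hunk above are covered.

static StgClosure *
chooseResurrectionException (StgTSO *tso)
{
    switch (tso->why_blocked) {
    case BlockedOnBlackHole:
        /* Waiting on a computation that is itself dead: report
         * non-termination. */
        return (StgClosure *)NonTermination_closure;
    case BlockedOnSTM:
        /* Waiting on TVars that no live transaction can ever write:
         * the thread is blocked indefinitely. */
        return (StgClosure *)BlockedIndefinitely_closure;
    default:
        /* Other blocking reasons keep their existing treatment, which
         * this hunk does not change. */
        return NULL;
    }
}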