#include "Signals.h"
#include "Sanity.h"
#include "Stats.h"
+#include "STM.h"
#include "Timer.h"
#include "Prelude.h"
#include "ThreadLabels.h"
*/
/* flag set by signal handler to precipitate a context switch */
-nat context_switch = 0;
+int context_switch = 0;
/* if this flag is set as well, give up execution */
rtsBool interrupted = rtsFalse;
static void schedule ( StgMainThread *mainThread, Capability *initialCapability );
void interruptStgRts ( void );
+#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
static void detectBlackHoles ( void );
+#endif
+
+static void raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically);
#if defined(RTS_SUPPORTS_THREADS)
/* ToDo: carefully document the invariants that go together
// run queue is empty, and there are no other tasks running, we
// can wait indefinitely for something to happen.
//
- if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
+ if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
+ {
#if defined(RTS_SUPPORTS_THREADS)
- || EMPTY_RUN_QUEUE()
+ // We shouldn't be here...
+ barf("schedule: awaitEvent() in threaded RTS");
#endif
- )
- {
- awaitEvent( EMPTY_RUN_QUEUE() );
+ awaitEvent( EMPTY_RUN_QUEUE() );
}
// we can be interrupted while waiting for I/O...
if (interrupted) continue;
if ( EMPTY_THREAD_QUEUES() )
{
IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
+
// Garbage collection can release some new threads due to
// either (a) finalizers or (b) threads resurrected because
- // they are about to be send BlockedOnDeadMVar. Any threads
- // thus released will be immediately runnable.
+ // they are unreachable and will therefore be sent an
+ // exception. Any threads thus released will be immediately
+ // runnable.
GarbageCollect(GetRoots,rtsTrue);
-
- if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
-
- IF_DEBUG(scheduler,
- sched_belch("still deadlocked, checking for black holes..."));
- detectBlackHoles();
-
if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
#if defined(RTS_USER_SIGNALS)
// ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
#endif
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(RTS_SUPPORTS_THREADS) || defined(mingw32_HOST_OS)
+ /* win32: might be back here due to awaitEvent() being abandoned
+ * as a result of a console event having been delivered.
+ */
if ( EMPTY_RUN_QUEUE() ) {
continue; // nothing to do
}
* previously, or it's blocked on an MVar or Blackhole, in which
* case it'll be on the relevant queue already.
*/
+ ASSERT(t->why_blocked != NotBlocked);
IF_DEBUG(scheduler,
debugBelch("--<< thread %d (%s) stopped: ",
t->id, whatNext_strs[t->what_next]);
#endif
if (ready_to_gc) {
+ /* Kick any transactions which are invalid back to their atomically frames.
+ * When next scheduled they will try to commit, this commit will fail and
+ * they will retry. */
+ for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
+ if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+ if (!stmValidateTransaction (t -> trec)) {
+ IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
+
+ // strip the stack back to the ATOMICALLY_FRAME, aborting
+ // the (nested) transaction, and saving the stack of any
+ // partially-evaluated thunks on the heap.
+ raiseAsync_(t, NULL, rtsTrue);
+
+#ifdef REG_R1
+ ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+#endif
+ }
+ }
+ }
+
/* everybody back, start the GC.
* Could do it in this thread, or signal a condition var
* to do it in another thread. Either way, we need to
* Singleton fork(). Do not copy any running threads.
* ------------------------------------------------------------------------- */
-#ifndef mingw32_TARGET_OS
+#ifndef mingw32_HOST_OS
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif
stgFree(m);
}
-# ifdef RTS_SUPPORTS_THREADS
- resetTaskManagerAfterFork(); // tell startTask() and friends that
- startingWorkerThread = rtsFalse; // we have no worker threads any more
- resetWorkerWakeupPipeAfterFork();
-# endif
-
rc = rts_evalStableIO(entry, NULL); // run the action
rts_checkSchedStatus("forkProcess",rc);
IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
#endif
- /* Other threads _might_ be available for execution; signal this */
- THREAD_RUNNABLE();
RELEASE_LOCK(&sched_mutex);
errno = saved_errno;
- TSO_STRUCT_SIZEW;
tso->sp = (P_)&(tso->stack) + stack_size;
+ tso->trec = NO_TREC;
+
#ifdef PROFILING
tso->prof.CCCS = CCS_MAIN;
#endif
void
scheduleThread_(StgTSO *tso)
{
- // Precondition: sched_mutex must be held.
// The thread goes at the *end* of the run-queue, to avoid possible
// starvation of any threads already on the queue.
APPEND_TO_RUN_QUEUE(tso);
- THREAD_RUNNABLE();
+ threadRunnable();
}
void
IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
APPEND_TO_RUN_QUEUE(tso);
- // NB. Don't call THREAD_RUNNABLE() here, because the thread is
+ // NB. Don't call threadRunnable() here, because the thread is
// bound and only runnable by *this* OS thread, so waking up other
// workers will just slow things down.
next = bqe->link;
((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
- THREAD_RUNNABLE();
+ threadRunnable();
unblockCount(bqe, node);
/* reset blocking status after dumping event */
((StgTSO *)bqe)->why_blocked = NotBlocked;
next = tso->link;
tso->link = END_TSO_QUEUE;
APPEND_TO_RUN_QUEUE(tso);
- THREAD_RUNNABLE();
+ threadRunnable();
IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
return next;
}
{
interrupted = 1;
context_switch = 1;
-#ifdef RTS_SUPPORTS_THREADS
- wakeBlockedWorkerThread();
-#endif
}
/* -----------------------------------------------------------------------------
case NotBlocked:
return; /* not blocked */
+ case BlockedOnSTM:
+ // Be careful: nothing to do here! We tell the scheduler that the thread
+ // is runnable and we leave it to the stack-walking code to abort the
+ // transaction while unwinding the stack. We should perhaps have a debugging
+ // test to make sure that this really happens and that the 'zombie' transaction
+ // does not get committed.
+ goto done;
+
case BlockedOnMVar:
ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
{
case BlockedOnRead:
case BlockedOnWrite:
-#if defined(mingw32_TARGET_OS)
+#if defined(mingw32_HOST_OS)
case BlockedOnDoProc:
#endif
{
switch (tso->why_blocked) {
+ case BlockedOnSTM:
+ // Be careful: nothing to do here! We tell the scheduler that the thread
+ // is runnable and we leave it to the stack-walking code to abort the
+ // transaction while unwinding the stack. We should perhaps have a debugging
+ // test to make sure that this really happens and that the 'zombie' transaction
+ // does not get committed.
+ goto done;
+
case BlockedOnMVar:
ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
{
case BlockedOnRead:
case BlockedOnWrite:
-#if defined(mingw32_TARGET_OS)
+#if defined(mingw32_HOST_OS)
case BlockedOnDoProc:
#endif
{
void
deleteThread(StgTSO *tso)
{
- raiseAsync(tso,NULL);
+ if (tso->why_blocked != BlockedOnCCall &&
+ tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
+ raiseAsync(tso,NULL);
+ }
}
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
void
raiseAsync(StgTSO *tso, StgClosure *exception)
{
+ raiseAsync_(tso, exception, rtsFalse);
+}
+
+static void
+raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
+{
StgRetInfoTable *info;
StgPtr sp;
// top of the stack applied to the exception.
//
// 5. If it's a STOP_FRAME, then kill the thread.
+ //
+ // NB: if we pass an ATOMICALLY_FRAME then abort the associated
+ // transaction
+
StgPtr frame;
while (info->i.type != UPDATE_FRAME
&& (info->i.type != CATCH_FRAME || exception == NULL)
- && info->i.type != STOP_FRAME) {
+ && info->i.type != STOP_FRAME
+ && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
+ {
+ if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
+ // IF we find an ATOMICALLY_FRAME then we abort the
+ // current transaction and propagate the exception. In
+ // this case (unlike ordinary exceptions) we do not care
+ // whether the transaction is valid or not because its
+ // possible validity cannot have caused the exception
+ // and will not be visible after the abort.
+ IF_DEBUG(stm,
+ debugBelch("Found atomically block delivering async exception\n"));
+ stmAbortTransaction(tso -> trec);
+ tso -> trec = stmGetEnclosingTRec(tso -> trec);
+ }
frame += stack_frame_sizeW((StgClosure *)frame);
info = get_ret_itbl((StgClosure *)frame);
}
switch (info->i.type) {
+ case ATOMICALLY_FRAME:
+ ASSERT(stop_at_atomically);
+ ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
+ stmCondemnTransaction(tso -> trec);
+#ifdef REG_R1
+ tso->sp = frame;
+#else
+ // R1 is not a register: the return convention for IO in
+ // this case puts the return value on the stack, so we
+ // need to set up the stack to return to the atomically
+ // frame properly...
+ tso->sp = frame - 2;
+ tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
+ tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
+#endif
+ tso->what_next = ThreadRunGHC;
+ return;
+
case CATCH_FRAME:
// If we find a CATCH_FRAME, and we've got an exception to raise,
// then build the THUNK raise(exception), and leave it on
UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
p = next;
continue;
+
+ case ATOMICALLY_FRAME:
+ IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
+ tso->sp = p;
+ return ATOMICALLY_FRAME;
case CATCH_FRAME:
tso->sp = p;
return CATCH_FRAME;
+
+ case CATCH_STM_FRAME:
+ IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
+ tso->sp = p;
+ return CATCH_STM_FRAME;
case STOP_FRAME:
tso->sp = p;
return STOP_FRAME;
+ case CATCH_RETRY_FRAME:
default:
p = next;
continue;
}
}
+
+/* -----------------------------------------------------------------------------
+ findRetryFrameHelper
+
+ This function is called by the retry# primitive. It traverses the stack
+ leaving tso->sp referring to the frame which should handle the retry.
+
+ This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
+ or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).
+
+ We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
+ despite the similar implementation.
+
+ We should not expect to see CATCH_FRAME or STOP_FRAME because those should
+ not be created within memory transactions.
+ -------------------------------------------------------------------------- */
+
+StgWord
+findRetryFrameHelper (StgTSO *tso)
+{
+ StgPtr p, next;
+ StgRetInfoTable *info;
+
+ p = tso -> sp;
+ while (1) {
+ info = get_ret_itbl((StgClosure *)p);
+ next = p + stack_frame_sizeW((StgClosure *)p);
+ switch (info->i.type) {
+
+ case ATOMICALLY_FRAME:
+ IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
+ tso->sp = p;
+ return ATOMICALLY_FRAME;
+
+ case CATCH_RETRY_FRAME:
+ IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
+ tso->sp = p;
+ return CATCH_RETRY_FRAME;
+
+ case CATCH_STM_FRAME:
+ default:
+ ASSERT(info->i.type != CATCH_FRAME);
+ ASSERT(info->i.type != STOP_FRAME);
+ p = next;
+ continue;
+ }
+ }
+}
+
/* -----------------------------------------------------------------------------
resurrectThreads is called after garbage collection on the list of
threads found to be garbage. Each of these threads will be woken
case BlockedOnBlackHole:
raiseAsync(tso,(StgClosure *)NonTermination_closure);
break;
+ case BlockedOnSTM:
+ raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
+ break;
case NotBlocked:
/* This might happen if the thread was blocked on a black hole
* belonging to a thread that we've just woken up (raiseAsync
}
}
-/* -----------------------------------------------------------------------------
- * Blackhole detection: if we reach a deadlock, test whether any
- * threads are blocked on themselves. Any threads which are found to
- * be self-blocked get sent a NonTermination exception.
- *
- * This is only done in a deadlock situation in order to avoid
- * performance overhead in the normal case.
- *
- * Locks: sched_mutex is held upon entry and exit.
- * -------------------------------------------------------------------------- */
-
-static void
-detectBlackHoles( void )
-{
- StgTSO *tso = all_threads;
- StgPtr frame;
- StgClosure *blocked_on;
- StgRetInfoTable *info;
-
- for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
-
- while (tso->what_next == ThreadRelocated) {
- tso = tso->link;
- ASSERT(get_itbl(tso)->type == TSO);
- }
-
- if (tso->why_blocked != BlockedOnBlackHole) {
- continue;
- }
- blocked_on = tso->block_info.closure;
-
- frame = tso->sp;
-
- while(1) {
- info = get_ret_itbl((StgClosure *)frame);
- switch (info->i.type) {
- case UPDATE_FRAME:
- if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
- /* We are blocking on one of our own computations, so
- * send this thread the NonTermination exception.
- */
- IF_DEBUG(scheduler,
- sched_belch("thread %d is blocked on itself", tso->id));
- raiseAsync(tso, (StgClosure *)NonTermination_closure);
- goto done;
- }
-
- frame = (StgPtr)((StgUpdateFrame *)frame + 1);
- continue;
-
- case STOP_FRAME:
- goto done;
-
- // normal stack frames; do nothing except advance the pointer
- default:
- frame += stack_frame_sizeW((StgClosure *)frame);
- }
- }
- done: ;
- }
-}
-
/* ----------------------------------------------------------------------------
* Debugging: why is a thread blocked
* [Also provides useful information when debugging threaded programs
case BlockedOnWrite:
debugBelch("is blocked on write to fd %d", tso->block_info.fd);
break;
-#if defined(mingw32_TARGET_OS)
+#if defined(mingw32_HOST_OS)
case BlockedOnDoProc:
debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
break;
case BlockedOnCCall_NoUnblockExc:
debugBelch("is blocked on an external call (exceptions were already blocked)");
break;
+ case BlockedOnSTM:
+ debugBelch("is blocked on an STM operation");
+ break;
default:
barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
tso->why_blocked, tso->id, tso);
printAllThreads(void)
{
StgTSO *t;
- void *label;
# if defined(GRAN)
char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
debugBelch("\tthread %d @ %p ", t->id, (void *)t);
- label = lookupThreadLabel(t->id);
- if (label) debugBelch("[\"%s\"] ",(char *)label);
+#if defined(DEBUG)
+ {
+ void *label = lookupThreadLabel(t->id);
+ if (label) debugBelch("[\"%s\"] ",(char *)label);
+ }
+#endif
printThreadStatus(t);
debugBelch("\n");
}