*/
rtsBool blackholes_need_checking = rtsFalse;
-/* Linked list of all threads.
- * Used for detecting garbage collected threads.
- * LOCK: sched_mutex+capability, or all capabilities
- */
-StgTSO *all_threads = NULL;
-
/* flag set by signal handler to precipitate a context switch
* LOCK: none (just an advisory flag)
*/
#if defined(PAR) || defined(GRAN)
static void scheduleGranParReport(void);
#endif
-static void schedulePostRunThread(void);
+static void schedulePostRunThread(StgTSO *t);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task,
StgTSO *t);
}
#endif
- cap->r.rCurrentTSO = t;
-
/* context switches are initiated by the timer signal, unless
* the user specified "context switch as often as possible", with
* +RTS -C0
run_thread:
+ // CurrentTSO is the thread to run. t might be different if we
+ // loop back to run_thread, so make sure to set CurrentTSO after
+ // that.
+ cap->r.rCurrentTSO = t;
+
debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
(long)t->id, whatNext_strs[t->what_next]);
CCCS = CCS_SYSTEM;
#endif
- schedulePostRunThread();
+ schedulePostRunThread(t);
t = threadStackUnderflow(task,t);
// either we have threads to run, or we were interrupted:
ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
+
+ return;
}
#endif
case BlockedOnException:
case BlockedOnMVar:
throwToSingleThreaded(cap, task->tso,
- (StgClosure *)NonTermination_closure);
+ (StgClosure *)nonTermination_closure);
return;
default:
barf("deadlock: main thread blocked in a strange way");
* ------------------------------------------------------------------------- */
static void
-schedulePostRunThread(void)
+schedulePostRunThread (StgTSO *t)
{
+ // We have to be able to catch transactions that are in an
+ // infinite loop as a result of seeing an inconsistent view of
+ // memory, e.g.
+ //
+ // atomically $ do
+ // [a,b] <- mapM readTVar [ta,tb]
+ // when (a == b) loop
+ //
+ // and a is never equal to b given a consistent view of memory.
+ //
+ if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+ if (!stmValidateNestOfTransactions (t -> trec)) {
+ debugTrace(DEBUG_sched | DEBUG_stm,
+ "trec %p found wasting its time", t);
+
+ // strip the stack back to the
+ // ATOMICALLY_FRAME, aborting the (nested)
+ // transaction, and saving the stack of any
+ // partially-evaluated thunks on the heap.
+ throwToSingleThreaded_(&capabilities[0], t,
+ NULL, rtsTrue, NULL);
+
+ ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+ }
+ }
+
#if defined(PAR)
/* HACK 675: if the last thread didn't yield, make sure to print a
SCHEDULE event to the log file when StgRunning the next thread, even
}
debugTrace(DEBUG_sched,
- "--<< thread %ld (%s) stopped: HeapOverflow\n",
+ "--<< thread %ld (%s) stopped: HeapOverflow",
(long)t->id, whatNext_strs[t->what_next]);
#if defined(GRAN)
// point where we can deal with this. Leaving it on the run
// queue also ensures that the garbage collector knows about
// this thread and its return value (it gets dropped from the
- // all_threads list so there's no other way to find it).
+ // step->threads list so there's no other way to find it).
appendToRunQueue(cap,t);
return rtsFalse;
#else
waiting_for_gc = rtsFalse;
#endif
- /* Kick any transactions which are invalid back to their
- * atomically frames. When next scheduled they will try to
- * commit, this commit will fail and they will retry.
- */
- {
- StgTSO *next;
-
- for (t = all_threads; t != END_TSO_QUEUE; t = next) {
- if (t->what_next == ThreadRelocated) {
- next = t->_link;
- } else {
- next = t->global_link;
-
- // This is a good place to check for blocked
- // exceptions. It might be the case that a thread is
- // blocked on delivering an exception to a thread that
- // is also blocked - we try to ensure that this
- // doesn't happen in throwTo(), but it's too hard (or
- // impossible) to close all the race holes, so we
- // accept that some might get through and deal with
- // them here. A GC will always happen at some point,
- // even if the system is otherwise deadlocked.
- maybePerformBlockedException (&capabilities[0], t);
-
- if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
- if (!stmValidateNestOfTransactions (t -> trec)) {
- debugTrace(DEBUG_sched | DEBUG_stm,
- "trec %p found wasting its time", t);
-
- // strip the stack back to the
- // ATOMICALLY_FRAME, aborting the (nested)
- // transaction, and saving the stack of any
- // partially-evaluated thunks on the heap.
- throwToSingleThreaded_(&capabilities[0], t,
- NULL, rtsTrue, NULL);
-
-#ifdef REG_R1
- ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
-#endif
- }
- }
- }
- }
- }
-
// so this happens periodically:
if (cap) scheduleCheckBlackHoles(cap);
pid_t pid;
StgTSO* t,*next;
Capability *cap;
+ nat s;
#if defined(THREADED_RTS)
if (RtsFlags.ParFlags.nNodes > 1) {
// all Tasks, because they correspond to OS threads that are
// now gone.
- for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ for (s = 0; s < total_steps; s++) {
+ for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
if (t->what_next == ThreadRelocated) {
next = t->_link;
} else {
// threads may be evaluating thunks that we need later.
deleteThread_(cap,t);
}
+ }
}
// Empty the run queue. It seems tempting to let all the
// don't exist now:
cap->suspended_ccalling_tasks = NULL;
- // Empty the all_threads list. Otherwise, the garbage
+ // Empty the threads lists. Otherwise, the garbage
// collector may attempt to resurrect some of these threads.
- all_threads = END_TSO_QUEUE;
+ for (s = 0; s < total_steps; s++) {
+ all_steps[s].threads = END_TSO_QUEUE;
+ }
// Wipe the task list, except the current Task.
ACQUIRE_LOCK(&sched_mutex);
// NOTE: only safe to call if we own all capabilities.
StgTSO* t, *next;
+ nat s;
+
debugTrace(DEBUG_sched,"deleting all threads");
- for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ for (s = 0; s < total_steps; s++) {
+ for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
if (t->what_next == ThreadRelocated) {
next = t->_link;
} else {
next = t->global_link;
deleteThread(cap,t);
}
+ }
}
// The run queue now contains a bunch of ThreadKilled threads. We
#endif
blackhole_queue = END_TSO_QUEUE;
- all_threads = END_TSO_QUEUE;
context_switch = 0;
sched_state = SCHED_RUNNING;
bd = Bdescr((StgPtr)tso);
new_bd = splitLargeBlock(bd, new_tso_size_w / BLOCK_SIZE_W);
+ new_bd->free = bd->free;
+ bd->free = bd->start + TSO_STRUCT_SIZEW;
new_tso = (StgTSO *)new_bd->start;
memcpy(new_tso,tso,TSO_STRUCT_SIZE);
{
StgTSO *tso, *next;
Capability *cap;
+ step *step;
for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
next = tso->global_link;
- tso->global_link = all_threads;
- all_threads = tso;
+
+ step = Bdescr((P_)tso)->step;
+ tso->global_link = step->threads;
+ step->threads = tso;
+
debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
// Wake up the thread on the Capability it was last on
case BlockedOnException:
/* Called by GC - sched_mutex lock is currently held. */
throwToSingleThreaded(cap, tso,
- (StgClosure *)BlockedOnDeadMVar_closure);
+ (StgClosure *)blockedOnDeadMVar_closure);
break;
case BlockedOnBlackHole:
throwToSingleThreaded(cap, tso,
- (StgClosure *)NonTermination_closure);
+ (StgClosure *)nonTermination_closure);
break;
case BlockedOnSTM:
throwToSingleThreaded(cap, tso,
- (StgClosure *)BlockedIndefinitely_closure);
+ (StgClosure *)blockedIndefinitely_closure);
break;
case NotBlocked:
/* This might happen if the thread was blocked on a black hole
}
}
}
+
+/* -----------------------------------------------------------------------------
+ performPendingThrowTos is called after garbage collection, and
+ passed a list of threads that were found to have pending throwTos
+ (tso->blocked_exceptions was not empty), and were blocked.
+ Normally this doesn't happen, because we would deliver the
+ exception directly if the target thread is blocked, but there are
+ small windows where it might occur on a multiprocessor (see
+ throwTo()).
+
+ NB. we must be holding all the capabilities at this point, just
+ like resurrectThreads().
+ -------------------------------------------------------------------------- */
+
+void
+performPendingThrowTos (StgTSO *threads)
+{
+ StgTSO *tso, *next;
+ Capability *cap;
+ step *step;
+
+ // 'threads' is a list chained through tso->global_link; grab the
+ // successor first because we overwrite global_link below.
+ for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
+ next = tso->global_link;
+
+ // Put the TSO back on its step's threads list (global_link was
+ // repurposed as the pending-throwTo list link, same scheme as
+ // resurrectThreads()).
+ step = Bdescr((P_)tso)->step;
+ tso->global_link = step->threads;
+ step->threads = tso;
+
+ debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
+
+ // Deliver the pending blocked exception on the Capability the
+ // thread last ran on; safe because the caller holds all
+ // capabilities (see header comment).
+ cap = tso->cap;
+ maybePerformBlockedException(cap, tso);
+ }
+}