#include "Proftimer.h"
#include "ProfHeap.h"
#include "Weak.h"
-#include "eventlog/EventLog.h"
#include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
#include "Sparks.h"
#include "Capability.h"
static void schedulePreLoop (void);
static void scheduleFindWork (Capability *cap);
#if defined(THREADED_RTS)
-static void scheduleYield (Capability **pcap, Task *task);
+static void scheduleYield (Capability **pcap, Task *task, rtsBool);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif
-#ifdef DEBUG
-static char *whatNext_strs[] = {
- [0] = "(unknown)",
- [ThreadRunGHC] = "ThreadRunGHC",
- [ThreadInterpret] = "ThreadInterpret",
- [ThreadKilled] = "ThreadKilled",
- [ThreadRelocated] = "ThreadRelocated",
- [ThreadComplete] = "ThreadComplete"
-};
-#endif
-
/* -----------------------------------------------------------------------------
* Putting a thread on the run queue: different scheduling policies
* -------------------------------------------------------------------------- */
rtsBool ready_to_gc;
#if defined(THREADED_RTS)
rtsBool first = rtsTrue;
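+  // force_yield: when set, scheduleYield() gives up the Capability
+  // unconditionally (see the comment in scheduleYield() below).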
+ rtsBool force_yield = rtsFalse;
#endif
cap = initialCapability;
// The sched_mutex is *NOT* held
// NB. on return, we still hold a capability.
- debugTrace (DEBUG_sched,
- "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
- task, initialCapability);
+ debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
schedulePreLoop();
}
yield:
- scheduleYield(&cap,task);
+ scheduleYield(&cap,task,force_yield);
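+  // force_yield is a one-shot request: clear it now that
+  // scheduleYield() has consumed it.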
+ force_yield = rtsFalse;
+
if (emptyRunQueue(cap)) continue; // look for work again
#endif
if (bound) {
if (bound == task) {
- debugTrace(DEBUG_sched,
- "### Running thread %lu in bound thread", (unsigned long)t->id);
// yes, the Haskell thread is bound to the current native thread
} else {
debugTrace(DEBUG_sched,
- "### thread %lu bound to another OS thread", (unsigned long)t->id);
+ "thread %lu bound to another OS thread",
+ (unsigned long)t->id);
// no, bound to a different Haskell thread: pass to that thread
pushOnRunQueue(cap,t);
continue;
// The thread we want to run is unbound.
if (task->tso) {
debugTrace(DEBUG_sched,
- "### this OS thread cannot run thread %lu", (unsigned long)t->id);
+ "this OS thread cannot run thread %lu",
+ (unsigned long)t->id);
// no, the current native thread is bound to a different
// Haskell thread, so pass it to any worker thread
pushOnRunQueue(cap,t);
// CurrentTSO is the thread to run.  t might be different if we
// loop back to run_thread, so make sure to set CurrentTSO after
// that.
cap->r.rCurrentTSO = t;
- debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
- (long)t->id, whatNext_strs[t->what_next]);
-
startHeapProfTimer();
// Check for exceptions blocked on this thread
}
#endif
- postEvent(cap, EVENT_RUN_THREAD, t->id, 0);
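+ // traceSchedEvent subsumes the old postEvent/debugTrace pairs: it
+ // posts to the eventlog and/or emits a DEBUG_sched trace, depending
+ // on which tracing the RTS was started with.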
+ traceSchedEvent(cap, EVENT_RUN_THREAD, t, 0);
switch (prev_what_next) {
t->saved_winerror = GetLastError();
#endif
- postEvent (cap, EVENT_STOP_THREAD, t->id, ret);
+ traceSchedEvent (cap, EVENT_STOP_THREAD, t, ret);
#if defined(THREADED_RTS)
// If ret is ThreadBlocked, and this Task is bound to the TSO that
// just blocked, the TSO is now owned by whatever it is blocked on and
// might already have been woken up, perhaps on another Capability, so
// it may be the case that task->cap != cap.  We better yield this
// Capability immediately and return to normality.
if (ret == ThreadBlocked) {
- debugTrace(DEBUG_sched,
- "--<< thread %lu (%s) stopped: blocked",
- (unsigned long)t->id, whatNext_strs[t->what_next]);
+ force_yield = rtsTrue;
goto yield;
}
#endif
// and also check the benchmarks in nofib/parallel for regressions.
static void
-scheduleYield (Capability **pcap, Task *task)
+scheduleYield (Capability **pcap, Task *task, rtsBool force_yield)
{
Capability *cap = *pcap;
// if we have work, and we don't need to give up the Capability, continue.
- if (!shouldYieldCapability(cap,task) &&
+ //
+ // The force_yield flag is used when a bound thread blocks. This
+ // is a particularly tricky situation: the current Task does not
+ // own the TSO any more, since it is on some queue somewhere, and
+ // might be woken up or manipulated by another thread at any time.
+ // The TSO and Task might be migrated to another Capability.
+ // Certain invariants might be in doubt, such as task->bound->cap
+ // == cap. We have to yield the current Capability immediately,
+ // no messing around.
+ //
+ if (!force_yield &&
+ !shouldYieldCapability(cap,task) &&
(!emptyRunQueue(cap) ||
!emptyWakeupQueue(cap) ||
blackholes_need_checking ||
debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
appendToRunQueue(free_caps[i],t);
- postEvent (cap, EVENT_MIGRATE_THREAD, t->id, free_caps[i]->no);
+ traceSchedEvent (cap, EVENT_MIGRATE_THREAD, t, free_caps[i]->no);
if (t->bound) { t->bound->cap = free_caps[i]; }
t->cap = free_caps[i];
if (spark != NULL) {
debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
- postEvent(free_caps[i], EVENT_STEAL_SPARK, t->id, cap->no);
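+            // the event is attributed to the receiving Capability;
+            // cap->no records which Capability the spark came from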
+ traceSchedEvent(free_caps[i], EVENT_STEAL_SPARK, t, cap->no);
newSpark(&(free_caps[i]->r), spark);
}
// partially-evaluated thunks on the heap.
throwToSingleThreaded_(cap, t, NULL, rtsTrue);
- ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+// ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
}
}
debugTrace(DEBUG_sched,
"--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
- (long)t->id, whatNext_strs[t->what_next], blocks);
+ (long)t->id, what_next_strs[t->what_next], blocks);
// don't do this if the nursery is (nearly) full, we'll GC first.
if (cap->r.rCurrentNursery->link != NULL ||
if (cap->r.rCurrentNursery->u.back != NULL) {
cap->r.rCurrentNursery->u.back->link = bd;
} else {
-#if !defined(THREADED_RTS)
- ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
- g0s0 == cap->r.rNursery);
-#endif
cap->r.rNursery->blocks = bd;
}
cap->r.rCurrentNursery->u.back = bd;
}
}
- debugTrace(DEBUG_sched,
- "--<< thread %ld (%s) stopped: HeapOverflow",
- (long)t->id, whatNext_strs[t->what_next]);
-
if (cap->r.rHpLim == NULL || cap->context_switch) {
// Sometimes we miss a context switch, e.g. when calling
// primitives in a tight loop, MAYBE_GC() doesn't check the
static void
scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
{
- debugTrace (DEBUG_sched,
- "--<< thread %ld (%s) stopped, StackOverflow",
- (long)t->id, whatNext_strs[t->what_next]);
-
/* just adjust the stack for this thread, then pop it back
* on the run queue.
*/
if (t->what_next != prev_what_next) {
debugTrace(DEBUG_sched,
"--<< thread %ld (%s) stopped to switch evaluators",
- (long)t->id, whatNext_strs[t->what_next]);
- } else {
- debugTrace(DEBUG_sched,
- "--<< thread %ld (%s) stopped, yielding",
- (long)t->id, whatNext_strs[t->what_next]);
+ (long)t->id, what_next_strs[t->what_next]);
}
#endif
// exception, see maybePerformBlockedException().
#ifdef DEBUG
- if (traceClass(DEBUG_sched)) {
- debugTraceBegin("--<< thread %lu (%s) stopped: ",
- (unsigned long)t->id, whatNext_strs[t->what_next]);
- printThreadBlockage(t);
- debugTraceEnd();
- }
+ traceThreadStatus(DEBUG_sched, t);
#endif
}
* We also end up here if the thread kills itself with an
* uncaught exception, see Exception.cmm.
*/
- debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
- (unsigned long)t->id, whatNext_strs[t->what_next]);
// blocked exceptions can now complete, even if the thread was in
// blocked mode (see #2910). This unconditionally calls
if (gc_type == PENDING_GC_SEQ)
{
- postEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0);
+ traceSchedEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0);
+ }
+ else
+ {
+ traceSchedEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0);
+ debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
+ }
+
+ // do this while the other Capabilities stop:
+ if (cap) scheduleCheckBlackHoles(cap);
+
+ if (gc_type == PENDING_GC_SEQ)
+ {
// single-threaded GC: grab all the capabilities
for (i=0; i < n_capabilities; i++) {
debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
{
// multi-threaded GC: make sure all the Capabilities donate one
// GC thread each.
- postEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0);
- debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
-
waitForGcThreads(cap);
}
-#endif
- // so this happens periodically:
+#else /* !THREADED_RTS */
+
+ // do this while the other Capabilities stop:
if (cap) scheduleCheckBlackHoles(cap);
-
+
+#endif
+
IF_DEBUG(scheduler, printAllThreads());
delete_threads_and_gc:
heap_census = scheduleNeedHeapProfile(rtsTrue);
#if defined(THREADED_RTS)
- postEvent(cap, EVENT_GC_START, 0, 0);
- debugTrace(DEBUG_sched, "doing GC");
+ traceSchedEvent(cap, EVENT_GC_START, 0, 0);
// reset waiting_for_gc *before* GC, so that when the GC threads
// emerge they don't immediately re-enter the GC.
waiting_for_gc = 0;
#else
GarbageCollect(force_major || heap_census, 0, cap);
#endif
- postEvent(cap, EVENT_GC_END, 0, 0);
+ traceSchedEvent(cap, EVENT_GC_END, 0, 0);
if (recent_activity == ACTIVITY_INACTIVE && force_major)
{
task = cap->running_task;
tso = cap->r.rCurrentTSO;
- postEvent(cap, EVENT_STOP_THREAD, tso->id, THREAD_SUSPENDED_FOREIGN_CALL);
- debugTrace(DEBUG_sched,
- "thread %lu did a safe foreign call",
- (unsigned long)cap->r.rCurrentTSO->id);
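+  // THREAD_SUSPENDED_FOREIGN_CALL is a pseudo-status carried in the
+  // stop event so that eventlog consumers can tell a safe foreign
+  // call apart from an ordinary block.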
+ traceSchedEvent(cap, EVENT_STOP_THREAD, tso, THREAD_SUSPENDED_FOREIGN_CALL);
// XXX this might not be necessary --SDM
tso->what_next = ThreadRunGHC;
RELEASE_LOCK(&cap->lock);
-#if defined(THREADED_RTS)
- /* Preparing to leave the RTS, so ensure there's a native thread/task
- waiting to take over.
- */
- debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
-#endif
-
errno = saved_errno;
#if mingw32_HOST_OS
SetLastError(saved_winerror);
task->suspended_tso = NULL;
tso->_link = END_TSO_QUEUE; // no write barrier reqd
- postEvent(cap, EVENT_RUN_THREAD, tso->id, 0);
- debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
+ traceSchedEvent(cap, EVENT_RUN_THREAD, tso, tso->what_next);
if (tso->why_blocked == BlockedOnCCall) {
// avoid locking the TSO if we don't have to
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
- postEvent (cap, EVENT_MIGRATE_THREAD, tso->id, capabilities[cpu].no);
+ traceSchedEvent (cap, EVENT_MIGRATE_THREAD, tso, capabilities[cpu].no);
wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
}
#else
case CATCH_STM_FRAME: {
StgTRecHeader *trec = tso -> trec;
- StgTRecHeader *outer = stmGetEnclosingTRec(trec);
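+      // enclosing_trec is now a field of the TRec header itself, so
+      // the stmGetEnclosingTRec() call is no longer needed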
+ StgTRecHeader *outer = trec -> enclosing_trec;
debugTrace(DEBUG_stm,
"found CATCH_STM_FRAME at %p during retry", p);
debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
switch (tso->why_blocked) {
case BlockedOnMVar:
- case BlockedOnException:
/* Called by GC - sched_mutex lock is currently held. */
throwToSingleThreaded(cap, tso,
- (StgClosure *)blockedOnDeadMVar_closure);
+ (StgClosure *)blockedIndefinitelyOnMVar_closure);
break;
case BlockedOnBlackHole:
throwToSingleThreaded(cap, tso,
break;
case BlockedOnSTM:
throwToSingleThreaded(cap, tso,
- (StgClosure *)blockedIndefinitely_closure);
+ (StgClosure *)blockedIndefinitelyOnSTM_closure);
break;
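+      // the blockedIndefinitelyOn{MVar,STM} closures correspond to the
+      // BlockedIndefinitelyOnMVar and BlockedIndefinitelyOnSTM
+      // exceptions raised in the blocked thread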
case NotBlocked:
/* This might happen if the thread was blocked on a black hole
 * belonging to a thread that we've just woken up (raiseAsync
 * can wake up threads, remember...).
*/
continue;
+ case BlockedOnException:
+ // throwTo should never block indefinitely: if the target
+ // thread dies or completes, throwTo returns.
+ barf("resurrectThreads: thread BlockedOnException");
+ break;
default:
barf("resurrectThreads: thread blocked in a strange way");
}
{
StgTSO *tso, *next;
Capability *cap;
+ Task *task, *saved_task;
step *step;
+ task = myTask();
+ cap = task->cap;
+
for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
next = tso->global_link;
debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
- cap = tso->cap;
- maybePerformBlockedException(cap, tso);
- }
+ // We must pretend this Capability belongs to the current Task
+ // for the time being, as invariants will be broken otherwise.
+ // In fact the current Task has exclusive access to the system
+ // at this point, so this is just bookkeeping:
+ task->cap = tso->cap;
+ saved_task = tso->cap->running_task;
+ tso->cap->running_task = task;
+ maybePerformBlockedException(tso->cap, tso);
+ tso->cap->running_task = saved_task;
+ }
+
+ // Restore our original Capability:
+ task->cap = cap;
}