#include "Interpreter.h"
#include "Printer.h"
#include "RtsSignals.h"
-#include "Sanity.h"
+#include "sm/Sanity.h"
#include "Stats.h"
#include "STM.h"
#include "Prelude.h"
static rtsBool checkBlackHoles(Capability *cap);
static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
-static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
+static StgTSO *threadStackUnderflow(Capability *cap, Task *task, StgTSO *tso);
static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);
if (prev == ACTIVITY_DONE_GC) {
startTimer();
}
- } else {
+ } else if (recent_activity != ACTIVITY_INACTIVE) {
+ // If we reached ACTIVITY_INACTIVE, then don't reset it until
+ // we've done the GC. The thread running here might just be
+ // the IO manager thread that handle_tick() woke up via
+ // wakeUpRts().
recent_activity = ACTIVITY_YES;
}
#endif
- traceSchedEvent(cap, EVENT_RUN_THREAD, t, 0);
+ traceEventRunThread(cap, t);
switch (prev_what_next) {
t->saved_winerror = GetLastError();
#endif
- traceSchedEvent (cap, EVENT_STOP_THREAD, t, ret);
+ traceEventStopThread(cap, t, ret);
#if defined(THREADED_RTS)
// If ret is ThreadBlocked, and this Task is bound to the TSO that
schedulePostRunThread(cap,t);
if (ret != StackOverflow) {
- t = threadStackUnderflow(task,t);
+ t = threadStackUnderflow(cap,task,t);
}
ready_to_gc = rtsFalse;
Capability *free_caps[n_capabilities], *cap0;
nat i, n_free_caps;
- // migration can be turned off with +RTS -qg
+ // migration can be turned off with +RTS -qm
if (!RtsFlags.ParFlags.migrate) return;
// Check whether we have more threads on our run queue, or sparks
setTSOLink(cap, prev, t);
prev = t;
} else {
- debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
appendToRunQueue(free_caps[i],t);
- traceSchedEvent (cap, EVENT_MIGRATE_THREAD, t, free_caps[i]->no);
+ traceEventMigrateThread (cap, t, free_caps[i]->no);
if (t->bound) { t->bound->cap = free_caps[i]; }
t->cap = free_caps[i];
if (spark != NULL) {
debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
- traceSchedEvent(free_caps[i], EVENT_STEAL_SPARK, t, cap->no);
+ traceEventStealSpark(free_caps[i], t, cap->no);
newSpark(&(free_caps[i]->r), spark);
}
// partially-evaluated thunks on the heap.
throwToSingleThreaded_(cap, t, NULL, rtsTrue);
- ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+// ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
}
}
{
bdescr *x;
for (x = bd; x < bd + blocks; x++) {
- x->step = cap->r.rNursery;
- x->gen_no = 0;
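+ // initialise every sub-block as a generation-0 (nursery) block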
+ initBdescr(x,g0,g0);
+ x->free = x->start;
x->flags = 0;
}
}
}
#endif
- IF_DEBUG(sanity,
- //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
- checkTSO(t));
ASSERT(t->_link == END_TSO_QUEUE);
// Shortcut if we're just switching evaluators: don't bother
return rtsTrue;
}
+ IF_DEBUG(sanity,
+ //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
+ checkTSO(t));
+
addToRunQueue(cap,t);
return rtsFalse;
if (sched_state < SCHED_INTERRUPTING
&& RtsFlags.ParFlags.parGcEnabled
&& N >= RtsFlags.ParFlags.parGcGen
- && ! oldest_gen->steps[0].mark)
+ && ! oldest_gen->mark)
{
gc_type = PENDING_GC_PAR;
} else {
if (gc_type == PENDING_GC_SEQ)
{
- traceSchedEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0);
+ traceEventRequestSeqGc(cap);
+ }
+ else
+ {
+ traceEventRequestParGc(cap);
+ debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
+ }
+
+ // do this while the other Capabilities stop:
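+ // (this wakes up any threads blocked on black holes that have
+ // since been updated)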
+ if (cap) scheduleCheckBlackHoles(cap);
+
+ if (gc_type == PENDING_GC_SEQ)
+ {
// single-threaded GC: grab all the capabilities
for (i=0; i < n_capabilities; i++) {
debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
{
// multi-threaded GC: make sure all the Capabilities donate one
// GC thread each.
- traceSchedEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0);
- debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
-
waitForGcThreads(cap);
}
-#endif
- // so this happens periodically:
+#else /* !THREADED_RTS */
+
+ // do this while the other Capabilities stop:
if (cap) scheduleCheckBlackHoles(cap);
-
+
+#endif
+
IF_DEBUG(scheduler, printAllThreads());
delete_threads_and_gc:
heap_census = scheduleNeedHeapProfile(rtsTrue);
+ traceEventGcStart(cap);
#if defined(THREADED_RTS)
- traceSchedEvent(cap, EVENT_GC_START, 0, 0);
// reset waiting_for_gc *before* GC, so that when the GC threads
// emerge they don't immediately re-enter the GC.
waiting_for_gc = 0;
#else
GarbageCollect(force_major || heap_census, 0, cap);
#endif
- traceSchedEvent(cap, EVENT_GC_END, 0, 0);
+ traceEventGcEnd(cap);
if (recent_activity == ACTIVITY_INACTIVE && force_major)
{
pid_t pid;
StgTSO* t,*next;
Capability *cap;
- nat s;
+ nat g;
#if defined(THREADED_RTS)
if (RtsFlags.ParFlags.nNodes > 1) {
// all Tasks, because they correspond to OS threads that are
// now gone.
- for (s = 0; s < total_steps; s++) {
- for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
+ for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+ for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
if (t->what_next == ThreadRelocated) {
next = t->_link;
} else {
// Empty the threads lists. Otherwise, the garbage
// collector may attempt to resurrect some of these threads.
- for (s = 0; s < total_steps; s++) {
- all_steps[s].threads = END_TSO_QUEUE;
+ for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+ generations[g].threads = END_TSO_QUEUE;
}
// Wipe the task list, except the current Task.
initTimer();
startTimer();
+#if defined(THREADED_RTS)
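+ // only the thread that called fork() survives in the child, so the
+ // IO manager thread has to be restarted before we run any Haskell code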
+ cap = ioManagerStartCap(cap);
+#endif
+
cap = rts_evalStableIO(cap, entry, NULL); // run the action
rts_checkSchedStatus("forkProcess",cap);
// NOTE: only safe to call if we own all capabilities.
StgTSO* t, *next;
- nat s;
+ nat g;
debugTrace(DEBUG_sched,"deleting all threads");
- for (s = 0; s < total_steps; s++) {
- for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
- if (t->what_next == ThreadRelocated) {
- next = t->_link;
- } else {
- next = t->global_link;
- deleteThread(cap,t);
- }
- }
- }
+ for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+ for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
+ if (t->what_next == ThreadRelocated) {
+ next = t->_link;
+ } else {
+ next = t->global_link;
+ deleteThread(cap,t);
+ }
+ }
+ }
// The run queue now contains a bunch of ThreadKilled threads. We
// must not throw these away: the main thread(s) will be in there
task = cap->running_task;
tso = cap->r.rCurrentTSO;
- traceSchedEvent(cap, EVENT_STOP_THREAD, tso, THREAD_SUSPENDED_FOREIGN_CALL);
+ traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL);
// XXX this might not be necessary --SDM
tso->what_next = ThreadRunGHC;
task->suspended_tso = NULL;
tso->_link = END_TSO_QUEUE; // no write barrier reqd
- traceSchedEvent(cap, EVENT_RUN_THREAD, tso, tso->what_next);
+ traceEventRunThread(cap, tso);
if (tso->why_blocked == BlockedOnCCall) {
// avoid locking the TSO if we don't have to
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
- traceSchedEvent (cap, EVENT_MIGRATE_THREAD, tso, capabilities[cpu].no);
+ traceEventMigrateThread (cap, tso, capabilities[cpu].no);
wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
}
#else
for (i = 0; i < n_capabilities; i++) {
shutdownCapability(&capabilities[i], task, wait_foreign);
}
- boundTaskExiting(task);
}
#endif
+
+ boundTaskExiting(task);
}
void
// while we are moving the TSO:
lockClosure((StgClosure *)tso);
- if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
+ if (tso->stack_size >= tso->max_stack_size
+ && !(tso->flags & TSO_BLOCKEX)) {
// NB. never raise a StackOverflow exception if the thread is
// inside Control.Exception.block. It is impractical to protect
// against stack overflow exceptions, since virtually anything
// can raise one (even 'catch'), so this is the only sensible
// thing to do here. See bug #767.
+ //
+
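+ // if stack squeezing just released some stack, retry the thread
+ // before raising StackOverflow (see the #3677 comment below)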
+ if (tso->flags & TSO_SQUEEZED) {
+ unlockTSO(tso);
+ return tso;
+ }
+ // #3677: In a stack overflow situation, stack squeezing may
+ // reduce the stack size, but we don't know whether it has been
+ // reduced enough for the stack check to succeed if we try
+ // again. Fortunately stack squeezing is idempotent, so all we
+ // need to do is record whether *any* squeezing happened. If we
+ // are at the stack's absolute -K limit, and stack squeezing
+ // happened, then we try running the thread again. The
+ // TSO_SQUEEZED flag is set by threadPaused() to tell us whether
+ // squeezing happened or not.
debugTrace(DEBUG_gc,
"threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
return tso;
}
+
+ // We also want to avoid enlarging the stack if squeezing has
+ // already released some of it. However, we don't want to get into
+ // a pathological situation where a thread has a nearly full stack
+ // (near its current limit, but not near the absolute -K limit),
+ // keeps allocating a little bit, squeezing removes a little bit,
+ // and then it runs again. So to avoid this, if we squeezed *and*
+ // there is still less than BLOCK_SIZE_W words free, then we enlarge
+ // the stack anyway.
+ if ((tso->flags & TSO_SQUEEZED) &&
+ ((W_)(tso->sp - tso->stack) >= BLOCK_SIZE_W)) {
+ unlockTSO(tso);
+ return tso;
+ }
+
/* Try to double the current stack size. If that takes us over the
* maximum stack size for this thread, then use the maximum instead
* (that is, unless we're already at or over the max size and we
"increasing stack size from %ld words to %d.",
(long)tso->stack_size, new_stack_size);
- dest = (StgTSO *)allocateLocal(cap,new_tso_size);
+ dest = (StgTSO *)allocate(cap,new_tso_size);
TICK_ALLOC_TSO(new_stack_size,0);
/* copy the TSO block and the old stack into the new area */
}
static StgTSO *
-threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
+threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso)
{
bdescr *bd, *new_bd;
lnat free_w, tso_size_w;
memcpy(new_tso,tso,TSO_STRUCT_SIZE);
new_tso->stack_size = new_bd->free - new_tso->stack;
+ // The original TSO was dirty and probably on the mutable
+ // list. The new TSO is not yet on the mutable list, so we better
+ // put it there.
+ new_tso->dirty = 0;
+ new_tso->flags &= ~TSO_LINK_DIRTY;
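+ // clear the dirty flags first so that dirty_TSO() will actually put
+ // the new TSO on the mutable list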
+ dirty_TSO(cap, new_tso);
+
debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
(long)tso->id, tso_size_w, tso_sizeW(new_tso));
// Only create raise_closure if we need to.
if (raise_closure == NULL) {
raise_closure =
- (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
+ (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
SET_HDR(raise_closure, &stg_raise_info, CCCS);
raise_closure->payload[0] = exception;
}
- UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
+ UPD_IND(cap, ((StgUpdateFrame *)p)->updatee,
+ (StgClosure *)raise_closure);
p = next;
continue;
case CATCH_STM_FRAME: {
StgTRecHeader *trec = tso -> trec;
- StgTRecHeader *outer = stmGetEnclosingTRec(trec);
+ StgTRecHeader *outer = trec -> enclosing_trec;
debugTrace(DEBUG_stm,
"found CATCH_STM_FRAME at %p during retry", p);
debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
{
StgTSO *tso, *next;
Capability *cap;
- step *step;
+ generation *gen;
for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
next = tso->global_link;
- step = Bdescr((P_)tso)->step;
- tso->global_link = step->threads;
- step->threads = tso;
+ gen = Bdescr((P_)tso)->gen;
+ tso->global_link = gen->threads;
+ gen->threads = tso;
debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
StgTSO *tso, *next;
Capability *cap;
Task *task, *saved_task;
- step *step;
+ generation *gen;
task = myTask();
cap = task->cap;
for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
next = tso->global_link;
- step = Bdescr((P_)tso)->step;
- tso->global_link = step->threads;
- step->threads = tso;
+ gen = Bdescr((P_)tso)->gen;
+ tso->global_link = gen->threads;
+ gen->threads = tso;
debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);