#include "PosixSource.h"
#define KEEP_LOCKCLOSURE
#include "Rts.h"
-#include "SchedAPI.h"
+
+#include "sm/Storage.h"
#include "RtsUtils.h"
-#include "RtsFlags.h"
-#include "OSThreads.h"
-#include "Storage.h"
#include "StgRun.h"
-#include "Hooks.h"
#include "Schedule.h"
-#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Printer.h"
#include "RtsSignals.h"
#include "Sanity.h"
#include "Stats.h"
#include "STM.h"
-#include "Timer.h"
#include "Prelude.h"
#include "ThreadLabels.h"
-#include "LdvProfile.h"
#include "Updates.h"
#include "Proftimer.h"
#include "ProfHeap.h"
-#include "GC.h"
-
-/* PARALLEL_HASKELL includes go here */
-
+#include "Weak.h"
+#include "eventlog/EventLog.h"
+#include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
#include "Sparks.h"
#include "Capability.h"
#include "Task.h"
#include "Trace.h"
#include "RaiseAsync.h"
#include "Threads.h"
-#include "ThrIOManager.h"
+#include "Timer.h"
+#include "ThreadPaused.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#include <errno.h>
#endif
-// Turn off inlining when debugging - it obfuscates things
-#ifdef DEBUG
-# undef STATIC_INLINE
-# define STATIC_INLINE static
-#endif
-
/* -----------------------------------------------------------------------------
* Global variables
* -------------------------------------------------------------------------- */
static void schedulePreLoop (void);
static void scheduleFindWork (Capability *cap);
#if defined(THREADED_RTS)
-static void scheduleYield (Capability **pcap, Task *task);
+static void scheduleYield (Capability **pcap, Task *task, rtsBool);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
-#if defined(PARALLEL_HASKELL)
-static rtsBool scheduleGetRemoteWork(Capability *cap);
-static void scheduleSendPendingMessages(void);
-#endif
-#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
#ifdef DEBUG
static char *whatNext_strs[] = {
- "(unknown)",
- "ThreadRunGHC",
- "ThreadInterpret",
- "ThreadKilled",
- "ThreadRelocated",
- "ThreadComplete"
+ [0] = "(unknown)",
+ [ThreadRunGHC] = "ThreadRunGHC",
+ [ThreadInterpret] = "ThreadInterpret",
+ [ThreadKilled] = "ThreadKilled",
+ [ThreadRelocated] = "ThreadRelocated",
+ [ThreadComplete] = "ThreadComplete"
};
#endif
STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
-#if defined(PARALLEL_HASKELL)
- if (RtsFlags.ParFlags.doFairScheduling) {
- // this does round-robin scheduling; good for concurrency
- appendToRunQueue(cap,t);
- } else {
- // this does unfair scheduling; good for parallelism
- pushOnRunQueue(cap,t);
- }
-#else
// this does round-robin scheduling; good for concurrency
appendToRunQueue(cap,t);
-#endif
}
/* ---------------------------------------------------------------------------
StgTSO *t;
Capability *cap;
StgThreadReturnCode ret;
-#if defined(PARALLEL_HASKELL)
- rtsBool receivedFinish = rtsFalse;
-#endif
nat prev_what_next;
rtsBool ready_to_gc;
#if defined(THREADED_RTS)
rtsBool first = rtsTrue;
+ rtsBool force_yield = rtsFalse;
#endif
cap = initialCapability;
// -----------------------------------------------------------
// Scheduler loop starts here:
-#if defined(PARALLEL_HASKELL)
-#define TERMINATION_CONDITION (!receivedFinish)
-#else
-#define TERMINATION_CONDITION rtsTrue
-#endif
-
- while (TERMINATION_CONDITION) {
+ while (1) {
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
(pushes threads, wakes up idle capabilities for stealing) */
schedulePushWork(cap,task);
-#if defined(PARALLEL_HASKELL)
- /* since we perform a blocking receive and continue otherwise,
- either we never reach here or we definitely have work! */
- // from here: non-empty run queue
- ASSERT(!emptyRunQueue(cap));
-
- if (PacketsWaiting()) { /* now process incoming messages, if any
- pending...
-
- CAUTION: scheduleGetRemoteWork called
- above, waits for messages as well! */
- processMessages(cap, &receivedFinish);
- }
-#endif // PARALLEL_HASKELL: non-empty run queue!
-
scheduleDetectDeadlock(cap,task);
#if defined(THREADED_RTS)
}
yield:
- scheduleYield(&cap,task);
+ scheduleYield(&cap,task,force_yield);
+ force_yield = rtsFalse;
+
if (emptyRunQueue(cap)) continue; // look for work again
#endif
}
#endif
+ postEvent(cap, EVENT_RUN_THREAD, t->id, 0);
+
switch (prev_what_next) {
case ThreadKilled:
t->saved_winerror = GetLastError();
#endif
+ postEvent (cap, EVENT_STOP_THREAD, t->id, ret);
+
#if defined(THREADED_RTS)
// If ret is ThreadBlocked, and this Task is bound to the TSO that
// blocked, we are in limbo - the TSO is now owned by whatever it
debugTrace(DEBUG_sched,
"--<< thread %lu (%s) stopped: blocked",
(unsigned long)t->id, whatNext_strs[t->what_next]);
+ force_yield = rtsTrue;
goto yield;
}
#endif
schedulePostRunThread(cap,t);
- t = threadStackUnderflow(task,t);
+ if (ret != StackOverflow) {
+ t = threadStackUnderflow(task,t);
+ }
ready_to_gc = rtsFalse;
scheduleCheckBlockedThreads(cap);
-#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+#if defined(THREADED_RTS)
if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
#endif
-
-#if defined(PARALLEL_HASKELL)
- // if messages have been buffered...
- scheduleSendPendingMessages();
-#endif
-
-#if defined(PARALLEL_HASKELL)
- if (emptyRunQueue(cap)) {
- receivedFinish = scheduleGetRemoteWork(cap);
- continue; // a new round, (hopefully) with new work
- /*
- in GUM, this a) sends out a FISH and returns IF no fish is
- out already
- b) (blocking) awaits and receives messages
-
- in Eden, this is only the blocking receive, as b) in GUM.
- */
- }
-#endif
}
#if defined(THREADED_RTS)
// and also check the benchmarks in nofib/parallel for regressions.
static void
-scheduleYield (Capability **pcap, Task *task)
+scheduleYield (Capability **pcap, Task *task, rtsBool force_yield)
{
Capability *cap = *pcap;
// if we have work, and we don't need to give up the Capability, continue.
- if (!shouldYieldCapability(cap,task) &&
+ //
+ // The force_yield flag is used when a bound thread blocks. This
+ // is a particularly tricky situation: the current Task does not
+ // own the TSO any more, since it is on some queue somewhere, and
+ // might be woken up or manipulated by another thread at any time.
+ // The TSO and Task might be migrated to another Capability.
+ // Certain invariants might be in doubt, such as task->bound->cap
+ // == cap. We have to yield the current Capability immediately,
+ // no messing around.
+ //
+ if (!force_yield &&
+ !shouldYieldCapability(cap,task) &&
(!emptyRunQueue(cap) ||
+ !emptyWakeupQueue(cap) ||
blackholes_need_checking ||
sched_state >= SCHED_INTERRUPTING))
return;
} else {
debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
appendToRunQueue(free_caps[i],t);
+
+ postEvent (cap, EVENT_MIGRATE_THREAD, t->id, free_caps[i]->no);
+
if (t->bound) { t->bound->cap = free_caps[i]; }
t->cap = free_caps[i];
i++;
spark = tryStealSpark(cap->sparks);
if (spark != NULL) {
debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
+
+ postEvent(free_caps[i], EVENT_STEAL_SPARK, t->id, cap->no);
+
newSpark(&(free_caps[i]->r), spark);
}
}
static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{
-
-#if defined(PARALLEL_HASKELL)
- // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
- return;
-#endif
-
/*
* Detect deadlock: when we have no threads to run, there are no
* threads blocked, waiting for I/O, or sleeping, and all the
* Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
* ------------------------------------------------------------------------- */
-#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+#if defined(THREADED_RTS)
static void
scheduleActivateSpark(Capability *cap)
{
#endif // PARALLEL_HASKELL || THREADED_RTS
/* ----------------------------------------------------------------------------
- * Get work from a remote node (PARALLEL_HASKELL only)
- * ------------------------------------------------------------------------- */
-
-#if defined(PARALLEL_HASKELL)
-static rtsBool /* return value used in PARALLEL_HASKELL only */
-scheduleGetRemoteWork (Capability *cap STG_UNUSED)
-{
-#if defined(PARALLEL_HASKELL)
- rtsBool receivedFinish = rtsFalse;
-
- // idle() , i.e. send all buffers, wait for work
- if (RtsFlags.ParFlags.BufferTime) {
- IF_PAR_DEBUG(verbose,
- debugBelch("...send all pending data,"));
- {
- nat i;
- for (i=1; i<=nPEs; i++)
- sendImmediately(i); // send all messages away immediately
- }
- }
-
- /* this would be the place for fishing in GUM...
-
- if (no-earlier-fish-around)
- sendFish(choosePe());
- */
-
- // Eden:just look for incoming messages (blocking receive)
- IF_PAR_DEBUG(verbose,
- debugBelch("...wait for incoming messages...\n"));
- processMessages(cap, &receivedFinish); // blocking receive...
-
-
- return receivedFinish;
- // reenter scheduling look after having received something
-
-#else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
-
- return rtsFalse; /* return value unused in THREADED_RTS */
-
-#endif /* PARALLEL_HASKELL */
-}
-#endif // PARALLEL_HASKELL || THREADED_RTS
-
-/* ----------------------------------------------------------------------------
* After running a thread...
* ------------------------------------------------------------------------- */
if (cap->r.rCurrentNursery->u.back != NULL) {
cap->r.rCurrentNursery->u.back->link = bd;
} else {
-#if !defined(THREADED_RTS)
- ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
- g0s0 == cap->r.rNursery);
-#endif
cap->r.rNursery->blocks = bd;
}
cap->r.rCurrentNursery->u.back = bd;
"--<< thread %ld (%s) stopped: HeapOverflow",
(long)t->id, whatNext_strs[t->what_next]);
- if (cap->context_switch) {
+ if (cap->r.rHpLim == NULL || cap->context_switch) {
// Sometimes we miss a context switch, e.g. when calling
// primitives in a tight loop, MAYBE_GC() doesn't check the
// context switch flag, and we end up waiting for a GC.
static void
scheduleHandleThreadBlocked( StgTSO *t
-#if !defined(GRAN) && !defined(DEBUG)
+#if !defined(DEBUG)
STG_UNUSED
#endif
)
(unsigned long)t->id, whatNext_strs[t->what_next]);
// blocked exceptions can now complete, even if the thread was in
- // blocked mode (see #2910). The thread is already marked
- // ThreadComplete, so any further throwTos will complete
- // immediately and we don't need to worry about synchronising with
- // those.
+ // blocked mode (see #2910). This unconditionally calls
+ // lockTSO(), which ensures that we don't miss any threads that
+ // are engaged in throwTo() with this thread as a target.
awakenBlockedExceptionQueue (cap, t);
//
if (gc_type == PENDING_GC_SEQ)
{
+ postEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0);
// single-threaded GC: grab all the capabilities
for (i=0; i < n_capabilities; i++) {
debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
{
// multi-threaded GC: make sure all the Capabilities donate one
// GC thread each.
+ postEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0);
debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
waitForGcThreads(cap);
heap_census = scheduleNeedHeapProfile(rtsTrue);
+#if defined(THREADED_RTS)
+ postEvent(cap, EVENT_GC_START, 0, 0);
+ debugTrace(DEBUG_sched, "doing GC");
+ // reset waiting_for_gc *before* GC, so that when the GC threads
+ // emerge they don't immediately re-enter the GC.
+ waiting_for_gc = 0;
+ GarbageCollect(force_major || heap_census, gc_type, cap);
+#else
+ GarbageCollect(force_major || heap_census, 0, cap);
+#endif
+ postEvent(cap, EVENT_GC_END, 0, 0);
+
if (recent_activity == ACTIVITY_INACTIVE && force_major)
{
// We are doing a GC because the system has been idle for a
// timeslice and we need to check for deadlock. Record the
// fact that we've done a GC and turn off the timer signal;
// it will get re-enabled if we run any threads after the GC.
- //
- // Note: this is done before GC, because after GC there might
- // be threads already running (GarbageCollect() releases the
- // GC threads when it completes), so we risk turning off the
- // timer signal when it should really be on.
recent_activity = ACTIVITY_DONE_GC;
stopTimer();
}
+ else
+ {
+ // the GC might have taken long enough for the timer to set
+ // recent_activity = ACTIVITY_INACTIVE, but we aren't
+ // necessarily deadlocked:
+ recent_activity = ACTIVITY_YES;
+ }
#if defined(THREADED_RTS)
- debugTrace(DEBUG_sched, "doing GC");
- // reset waiting_for_gc *before* GC, so that when the GC threads
- // emerge they don't immediately re-enter the GC.
- waiting_for_gc = 0;
- GarbageCollect(force_major || heap_census, gc_type, cap);
-#else
- GarbageCollect(force_major || heap_census, 0, cap);
+ if (gc_type == PENDING_GC_PAR)
+ {
+ releaseGCThreads(cap);
+ }
#endif
if (heap_census) {
}
#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
barf("forkProcess#: primop not supported on this platform, sorry!\n");
- return -1;
#endif
}
task = cap->running_task;
tso = cap->r.rCurrentTSO;
+ postEvent(cap, EVENT_STOP_THREAD, tso->id, THREAD_SUSPENDED_FOREIGN_CALL);
debugTrace(DEBUG_sched,
"thread %lu did a safe foreign call",
(unsigned long)cap->r.rCurrentTSO->id);
tso = task->suspended_tso;
task->suspended_tso = NULL;
tso->_link = END_TSO_QUEUE; // no write barrier reqd
+
+ postEvent(cap, EVENT_RUN_THREAD, tso->id, 0);
debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
if (tso->why_blocked == BlockedOnCCall) {
- awakenBlockedExceptionQueue(cap,tso);
+ // avoid locking the TSO if we don't have to
+ if (tso->blocked_exceptions != END_TSO_QUEUE) {
+ awakenBlockedExceptionQueue(cap,tso);
+ }
tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
}
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
+ postEvent (cap, EVENT_MIGRATE_THREAD, tso->id, capabilities[cpu].no);
wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
}
#else
cap = task->cap;
RELEASE_LOCK(&task->lock);
+ if (RtsFlags.ParFlags.setAffinity) {
+ setThreadAffinity(cap->no, n_capabilities);
+ }
+
// set the thread-local pointer to the Task:
taskEnter(task);
initTaskManager();
-#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+#if defined(THREADED_RTS)
initSparkPools();
#endif
}
#endif
- trace(TRACE_sched, "start: %d capabilities", n_capabilities);
-
RELEASE_LOCK(&sched_mutex);
}
{
Task *task = NULL;
-#if defined(THREADED_RTS)
- ACQUIRE_LOCK(&sched_mutex);
task = newBoundTask();
- RELEASE_LOCK(&sched_mutex);
-#endif
// If we haven't killed all the threads yet, do it now.
if (sched_state < SCHED_SHUTTING_DOWN) {
sched_state = SCHED_INTERRUPTING;
-#if defined(THREADED_RTS)
waitForReturnCapability(&task->cap,task);
scheduleDoGC(task->cap,task,rtsFalse);
releaseCapability(task->cap);
-#else
- scheduleDoGC(&MainCapability,task,rtsFalse);
-#endif
}
sched_state = SCHED_SHUTTING_DOWN;
// We must grab a new Task here, because the existing Task may be
// associated with a particular Capability, and chained onto the
// suspended_ccalling_tasks queue.
- ACQUIRE_LOCK(&sched_mutex);
task = newBoundTask();
- RELEASE_LOCK(&sched_mutex);
waitForReturnCapability(&task->cap,task);
scheduleDoGC(task->cap,task,force_major);
tso->sp = (P_)&(tso->stack[tso->stack_size]);
tso->why_blocked = NotBlocked;
- IF_PAR_DEBUG(verbose,
- debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
- tso->id, tso, tso->stack_size);
- /* If we're debugging, just print out the top of the stack */
- printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
- tso->sp+64)));
-
unlockTSO(dest);
unlockTSO(tso);
tso_size_w = tso_sizeW(tso);
- if (tso_size_w < MBLOCK_SIZE_W ||
+ if (tso_size_w < MBLOCK_SIZE_W ||
+ // TSO is less than 2 mblocks (since the first mblock is
+ // shorter than MBLOCK_SIZE_W)
+ (tso_size_w - BLOCKS_PER_MBLOCK*BLOCK_SIZE_W) % MBLOCK_SIZE_W != 0 ||
+ // or TSO is not a whole number of megablocks (ensuring
+ // precondition of splitLargeBlock() below)
+ (tso_size_w <= round_up_to_mblocks(RtsFlags.GcFlags.initialStkSize)) ||
+ // or TSO is smaller than the minimum stack size (rounded up)
(nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4)
+ // or stack is using more than 1/4 of the available space
{
+ // then do nothing
return tso;
}
{
sched_state = SCHED_INTERRUPTING;
setContextSwitches();
+#if defined(THREADED_RTS)
wakeUpRts();
+#endif
}
/* -----------------------------------------------------------------------------
will have interrupted any blocking system call in progress anyway.
-------------------------------------------------------------------------- */
-void
-wakeUpRts(void)
-{
#if defined(THREADED_RTS)
+void wakeUpRts(void)
+{
// This forces the IO Manager thread to wakeup, which will
// in turn ensure that some OS thread wakes up and runs the
// scheduler loop, which will cause a GC and deadlock check.
ioManagerWakeup();
-#endif
}
+#endif
/* -----------------------------------------------------------------------------
* checkBlackHoles()
prev = &blackhole_queue;
t = blackhole_queue;
while (t != END_TSO_QUEUE) {
+ if (t->what_next == ThreadRelocated) {
+ t = t->_link;
+ continue;
+ }
ASSERT(t->why_blocked == BlockedOnBlackHole);
type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
if (type != BLACKHOLE && type != CAF_BLACKHOLE) {