#include "PosixSource.h"
#define KEEP_LOCKCLOSURE
#include "Rts.h"
-#include "SchedAPI.h"
+
+#include "sm/Storage.h"
#include "RtsUtils.h"
-#include "RtsFlags.h"
-#include "OSThreads.h"
-#include "Storage.h"
#include "StgRun.h"
-#include "Hooks.h"
#include "Schedule.h"
-#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Printer.h"
#include "RtsSignals.h"
#include "Sanity.h"
#include "Stats.h"
#include "STM.h"
-#include "Timer.h"
#include "Prelude.h"
#include "ThreadLabels.h"
-#include "LdvProfile.h"
#include "Updates.h"
#include "Proftimer.h"
#include "ProfHeap.h"
-#include "GC.h"
#include "Weak.h"
-#include "EventLog.h"
-
-/* PARALLEL_HASKELL includes go here */
-
+#include "eventlog/EventLog.h"
+#include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
#include "Sparks.h"
#include "Capability.h"
#include "Task.h"
#include "Trace.h"
#include "RaiseAsync.h"
#include "Threads.h"
-#include "ThrIOManager.h"
+#include "Timer.h"
+#include "ThreadPaused.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#include <errno.h>
#endif
-// Turn off inlining when debugging - it obfuscates things
-#ifdef DEBUG
-# undef STATIC_INLINE
-# define STATIC_INLINE static
-#endif
-
/* -----------------------------------------------------------------------------
* Global variables
* -------------------------------------------------------------------------- */
static void schedulePreLoop (void);
static void scheduleFindWork (Capability *cap);
#if defined(THREADED_RTS)
-static void scheduleYield (Capability **pcap, Task *task);
+static void scheduleYield (Capability **pcap, Task *task, rtsBool);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
-#if defined(PARALLEL_HASKELL)
-static rtsBool scheduleGetRemoteWork(Capability *cap);
-static void scheduleSendPendingMessages(void);
-#endif
-#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
#ifdef DEBUG
static char *whatNext_strs[] = {
- "(unknown)",
- "ThreadRunGHC",
- "ThreadInterpret",
- "ThreadKilled",
- "ThreadRelocated",
- "ThreadComplete"
+ [0] = "(unknown)",
+ [ThreadRunGHC] = "ThreadRunGHC",
+ [ThreadInterpret] = "ThreadInterpret",
+ [ThreadKilled] = "ThreadKilled",
+ [ThreadRelocated] = "ThreadRelocated",
+ [ThreadComplete] = "ThreadComplete"
};
#endif
STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
-#if defined(PARALLEL_HASKELL)
- if (RtsFlags.ParFlags.doFairScheduling) {
- // this does round-robin scheduling; good for concurrency
- appendToRunQueue(cap,t);
- } else {
- // this does unfair scheduling; good for parallelism
- pushOnRunQueue(cap,t);
- }
-#else
// this does round-robin scheduling; good for concurrency
appendToRunQueue(cap,t);
-#endif
}
/* ---------------------------------------------------------------------------
StgTSO *t;
Capability *cap;
StgThreadReturnCode ret;
-#if defined(PARALLEL_HASKELL)
- rtsBool receivedFinish = rtsFalse;
-#endif
nat prev_what_next;
rtsBool ready_to_gc;
#if defined(THREADED_RTS)
rtsBool first = rtsTrue;
+ rtsBool force_yield = rtsFalse;
#endif
cap = initialCapability;
"### NEW SCHEDULER LOOP (task: %p, cap: %p)",
task, initialCapability);
- if (running_finalizers) {
- errorBelch("error: a C finalizer called back into Haskell.\n"
- " This was previously allowed, but is disallowed in GHC 6.10.2 and later.\n"
- " To create finalizers that may call back into Haskll, use\n"
- " Foreign.Concurrent.newForeignPtr instead of Foreign.newForeignPtr.");
- stg_exit(EXIT_FAILURE);
- }
-
schedulePreLoop();
// -----------------------------------------------------------
// Scheduler loop starts here:
-#if defined(PARALLEL_HASKELL)
-#define TERMINATION_CONDITION (!receivedFinish)
-#else
-#define TERMINATION_CONDITION rtsTrue
-#endif
-
- while (TERMINATION_CONDITION) {
+ while (1) {
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
(pushes threads, wakes up idle capabilities for stealing) */
schedulePushWork(cap,task);
-#if defined(PARALLEL_HASKELL)
- /* since we perform a blocking receive and continue otherwise,
- either we never reach here or we definitely have work! */
- // from here: non-empty run queue
- ASSERT(!emptyRunQueue(cap));
-
- if (PacketsWaiting()) { /* now process incoming messages, if any
- pending...
-
- CAUTION: scheduleGetRemoteWork called
- above, waits for messages as well! */
- processMessages(cap, &receivedFinish);
- }
-#endif // PARALLEL_HASKELL: non-empty run queue!
-
scheduleDetectDeadlock(cap,task);
#if defined(THREADED_RTS)
}
yield:
- scheduleYield(&cap,task);
+ scheduleYield(&cap,task,force_yield);
+ force_yield = rtsFalse;
+
if (emptyRunQueue(cap)) continue; // look for work again
#endif
debugTrace(DEBUG_sched,
"--<< thread %lu (%s) stopped: blocked",
(unsigned long)t->id, whatNext_strs[t->what_next]);
+ force_yield = rtsTrue;
goto yield;
}
#endif
schedulePostRunThread(cap,t);
- t = threadStackUnderflow(task,t);
+ if (ret != StackOverflow) {
+ t = threadStackUnderflow(task,t);
+ }
ready_to_gc = rtsFalse;
scheduleCheckBlockedThreads(cap);
-#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+#if defined(THREADED_RTS)
if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
#endif
-
-#if defined(PARALLEL_HASKELL)
- // if messages have been buffered...
- scheduleSendPendingMessages();
-#endif
-
-#if defined(PARALLEL_HASKELL)
- if (emptyRunQueue(cap)) {
- receivedFinish = scheduleGetRemoteWork(cap);
- continue; // a new round, (hopefully) with new work
- /*
- in GUM, this a) sends out a FISH and returns IF no fish is
- out already
- b) (blocking) awaits and receives messages
-
- in Eden, this is only the blocking receive, as b) in GUM.
- */
- }
-#endif
}
#if defined(THREADED_RTS)
// and also check the benchmarks in nofib/parallel for regressions.
static void
-scheduleYield (Capability **pcap, Task *task)
+scheduleYield (Capability **pcap, Task *task, rtsBool force_yield)
{
Capability *cap = *pcap;
// if we have work, and we don't need to give up the Capability, continue.
- if (!shouldYieldCapability(cap,task) &&
+ //
+ // The force_yield flag is used when a bound thread blocks. This
+ // is a particularly tricky situation: the current Task does not
+ // own the TSO any more, since it is on some queue somewhere, and
+ // might be woken up or manipulated by another thread at any time.
+ // The TSO and Task might be migrated to another Capability.
+ // Certain invariants might be in doubt, such as task->bound->cap
+ // == cap. We have to yield the current Capability immediately,
+ // no messing around.
+ //
+ if (!force_yield &&
+ !shouldYieldCapability(cap,task) &&
(!emptyRunQueue(cap) ||
!emptyWakeupQueue(cap) ||
blackholes_need_checking ||
static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{
-
-#if defined(PARALLEL_HASKELL)
- // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
- return;
-#endif
-
/*
* Detect deadlock: when we have no threads to run, there are no
* threads blocked, waiting for I/O, or sleeping, and all the
* Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
* ------------------------------------------------------------------------- */
-#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+#if defined(THREADED_RTS)
static void
scheduleActivateSpark(Capability *cap)
{
#endif // PARALLEL_HASKELL || THREADED_RTS
/* ----------------------------------------------------------------------------
- * Get work from a remote node (PARALLEL_HASKELL only)
- * ------------------------------------------------------------------------- */
-
-#if defined(PARALLEL_HASKELL)
-static rtsBool /* return value used in PARALLEL_HASKELL only */
-scheduleGetRemoteWork (Capability *cap STG_UNUSED)
-{
-#if defined(PARALLEL_HASKELL)
- rtsBool receivedFinish = rtsFalse;
-
- // idle() , i.e. send all buffers, wait for work
- if (RtsFlags.ParFlags.BufferTime) {
- IF_PAR_DEBUG(verbose,
- debugBelch("...send all pending data,"));
- {
- nat i;
- for (i=1; i<=nPEs; i++)
- sendImmediately(i); // send all messages away immediately
- }
- }
-
- /* this would be the place for fishing in GUM...
-
- if (no-earlier-fish-around)
- sendFish(choosePe());
- */
-
- // Eden:just look for incoming messages (blocking receive)
- IF_PAR_DEBUG(verbose,
- debugBelch("...wait for incoming messages...\n"));
- processMessages(cap, &receivedFinish); // blocking receive...
-
-
- return receivedFinish;
- // reenter scheduling look after having received something
-
-#else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
-
- return rtsFalse; /* return value unused in THREADED_RTS */
-
-#endif /* PARALLEL_HASKELL */
-}
-#endif // PARALLEL_HASKELL || THREADED_RTS
-
-/* ----------------------------------------------------------------------------
* After running a thread...
* ------------------------------------------------------------------------- */
if (cap->r.rCurrentNursery->u.back != NULL) {
cap->r.rCurrentNursery->u.back->link = bd;
} else {
-#if !defined(THREADED_RTS)
- ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
- g0s0 == cap->r.rNursery);
-#endif
cap->r.rNursery->blocks = bd;
}
cap->r.rCurrentNursery->u.back = bd;
static void
scheduleHandleThreadBlocked( StgTSO *t
-#if !defined(GRAN) && !defined(DEBUG)
+#if !defined(DEBUG)
STG_UNUSED
#endif
)
}
#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
barf("forkProcess#: primop not supported on this platform, sorry!\n");
- return -1;
#endif
}
initTaskManager();
-#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+#if defined(THREADED_RTS)
initSparkPools();
#endif
tso->sp = (P_)&(tso->stack[tso->stack_size]);
tso->why_blocked = NotBlocked;
- IF_PAR_DEBUG(verbose,
- debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
- tso->id, tso, tso->stack_size);
- /* If we're debugging, just print out the top of the stack */
- printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
- tso->sp+64)));
-
unlockTSO(dest);
unlockTSO(tso);
{
sched_state = SCHED_INTERRUPTING;
setContextSwitches();
+#if defined(THREADED_RTS)
wakeUpRts();
+#endif
}
/* -----------------------------------------------------------------------------
will have interrupted any blocking system call in progress anyway.
-------------------------------------------------------------------------- */
-void
-wakeUpRts(void)
-{
#if defined(THREADED_RTS)
+void wakeUpRts(void)
+{
// This forces the IO Manager thread to wakeup, which will
// in turn ensure that some OS thread wakes up and runs the
// scheduler loop, which will cause a GC and deadlock check.
ioManagerWakeup();
-#endif
}
+#endif
/* -----------------------------------------------------------------------------
* checkBlackHoles()
switch (tso->why_blocked) {
case BlockedOnMVar:
- case BlockedOnException:
/* Called by GC - sched_mutex lock is currently held. */
throwToSingleThreaded(cap, tso,
- (StgClosure *)blockedOnDeadMVar_closure);
+ (StgClosure *)blockedIndefinitelyOnMVar_closure);
break;
case BlockedOnBlackHole:
throwToSingleThreaded(cap, tso,
break;
case BlockedOnSTM:
throwToSingleThreaded(cap, tso,
- (StgClosure *)blockedIndefinitely_closure);
+ (StgClosure *)blockedIndefinitelyOnSTM_closure);
break;
case NotBlocked:
/* This might happen if the thread was blocked on a black hole
* can wake up threads, remember...).
*/
continue;
+ case BlockedOnException:
+ // throwTo should never block indefinitely: if the target
+ // thread dies or completes, throwTo returns.
+ barf("resurrectThreads: thread BlockedOnException");
+ break;
default:
barf("resurrectThreads: thread blocked in a strange way");
}
{
StgTSO *tso, *next;
Capability *cap;
+ Task *task, *saved_task;;
step *step;
+ task = myTask();
+ cap = task->cap;
+
for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
next = tso->global_link;
debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
- cap = tso->cap;
- maybePerformBlockedException(cap, tso);
- }
+ // We must pretend this Capability belongs to the current Task
+ // for the time being, as invariants will be broken otherwise.
+ // In fact the current Task has exclusive access to the systme
+ // at this point, so this is just bookkeeping:
+ task->cap = tso->cap;
+ saved_task = tso->cap->running_task;
+ tso->cap->running_task = task;
+ maybePerformBlockedException(tso->cap, tso);
+ tso->cap->running_task = saved_task;
+ }
+
+ // Restore our original Capability:
+ task->cap = cap;
}