/* ---------------------------------------------------------------------------
*
- * (c) The GHC Team, 1998-2004
+ * (c) The GHC Team, 1998-2005
*
- * Scheduler
- *
- * Different GHC ways use this scheduler quite differently (see comments below)
- * Here is the global picture:
- *
- * WAY Name CPP flag What's it for
- * --------------------------------------
- * mp GUM PARALLEL_HASKELL Parallel execution on a distrib. memory machine
- * s SMP SMP Parallel execution on a shared memory machine
- * mg GranSim GRAN Simulation of parallel execution
- * md GUM/GdH DIST Distributed execution (based on GUM)
+ * The scheduler and thread-related functionality
*
* --------------------------------------------------------------------------*/
-/*
- * Version with support for distributed memory parallelism aka GUM (WAY=mp):
-
- The main scheduling loop in GUM iterates until a finish message is received.
- In that case a global flag @receivedFinish@ is set and this instance of
- the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
- for the handling of incoming messages, such as PP_FINISH.
- Note that in the parallel case we have a system manager that coordinates
- different PEs, each of which are running one instance of the RTS.
- See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
- From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
-
- * Version with support for simulating parallel execution aka GranSim (WAY=mg):
-
- The main scheduling code in GranSim is quite different from that in std
- (concurrent) Haskell: while concurrent Haskell just iterates over the
- threads in the runnable queue, GranSim is event driven, i.e. it iterates
- over the events in the global event queue. -- HWL
-*/
-
#include "PosixSource.h"
#include "Rts.h"
#include "SchedAPI.h"
#include "Storage.h"
#include "StgRun.h"
#include "Hooks.h"
-#define COMPILING_SCHEDULER
#include "Schedule.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Exception.h"
#include "Printer.h"
-#include "Signals.h"
+#include "RtsSignals.h"
#include "Sanity.h"
#include "Stats.h"
#include "STM.h"
#endif
#include "Sparks.h"
#include "Capability.h"
-#include "Task.h"
+#include "Task.h"
+#include "AwaitEvent.h"
+#if defined(mingw32_HOST_OS)
+#include "win32/IOManager.h"
+#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
# define STATIC_INLINE static
#endif
-#ifdef THREADED_RTS
-#define USED_IN_THREADED_RTS
-#else
-#define USED_IN_THREADED_RTS STG_UNUSED
-#endif
-
-#ifdef RTS_SUPPORTS_THREADS
-#define USED_WHEN_RTS_SUPPORTS_THREADS
-#else
-#define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
-#endif
-
-/* Main thread queue.
- * Locks required: sched_mutex.
- */
-StgMainThread *main_threads = NULL;
+/* -----------------------------------------------------------------------------
+ * Global variables
+ * -------------------------------------------------------------------------- */
#if defined(GRAN)
#else /* !GRAN */
-/* Thread queues.
- * Locks required: sched_mutex.
- */
-StgTSO *run_queue_hd = NULL;
-StgTSO *run_queue_tl = NULL;
+#if !defined(THREADED_RTS)
+// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
-StgTSO *blackhole_queue = NULL;
-StgTSO *sleeping_queue = NULL; /* perhaps replace with a hash table? */
+StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
+#endif
+/* Threads blocked on blackholes.
+ * LOCK: sched_mutex+capability, or all capabilities
+ */
+StgTSO *blackhole_queue = NULL;
#endif
/* The blackhole_queue should be checked for threads to wake up. See
* Schedule.h for more thorough comment.
+ * LOCK: none (doesn't matter if we miss an update)
*/
rtsBool blackholes_need_checking = rtsFalse;
/* Linked list of all threads.
* Used for detecting garbage collected threads.
+ * LOCK: sched_mutex+capability, or all capabilities
*/
StgTSO *all_threads = NULL;
-/* When a thread performs a safe C call (_ccall_GC, using old
- * terminology), it gets put on the suspended_ccalling_threads
- * list. Used by the garbage collector.
+/* flag set by signal handler to precipitate a context switch
+ * LOCK: none (just an advisory flag)
*/
-static StgTSO *suspended_ccalling_threads;
-
-/* KH: The following two flags are shared memory locations. There is no need
- to lock them, since they are only unset at the end of a scheduler
- operation.
-*/
-
-/* flag set by signal handler to precipitate a context switch */
int context_switch = 0;
-/* if this flag is set as well, give up execution */
+/* flag that tracks whether we have done any execution in this time slice.
+ * LOCK: currently none, perhaps we should lock (but needs to be
+ * updated in the fast path of the scheduler).
+ */
+nat recent_activity = ACTIVITY_YES;
+
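+/* A sketch of the producer side of these two flags (an assumption: the
+ * real handler lives in Timer.c and does more bookkeeping, e.g. counting
+ * ticks between switches and scheduling idle GCs; the intermediate
+ * ACTIVITY_MAYBE_NO state is also assumed). The timer only sets advisory
+ * flags: a running thread notices context_switch at its next heap check
+ * and returns to the scheduler with ThreadYielding, and a recent_activity
+ * that has decayed to ACTIVITY_INACTIVE tells scheduleDetectDeadlock()
+ * below that a whole time slice went by with no Haskell code running.
+ */
+#if 0 /* illustrative only */
+static void
+tick_handler (int unused STG_UNUSED)
+{
+    context_switch = 1;    // advisory; a missed update merely delays
+                           // the context switch by one time slice
+    switch (recent_activity) {
+    case ACTIVITY_YES:      recent_activity = ACTIVITY_MAYBE_NO; break;
+    case ACTIVITY_MAYBE_NO: recent_activity = ACTIVITY_INACTIVE; break;
+    default:                break;  // INACTIVE/DONE_GC: leave alone
+    }
+}
+#endif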
+/* if this flag is set as well, give up execution
+ * LOCK: none (changes once, from false->true)
+ */
rtsBool interrupted = rtsFalse;
/* Next thread ID to allocate.
- * Locks required: thread_id_mutex
+ * LOCK: sched_mutex
*/
static StgThreadID next_thread_id = 1;
-/*
- * Pointers to the state of the current thread.
- * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
- * thread. If CurrentTSO == NULL, then we're at the scheduler level.
- */
-
/* The smallest stack size that makes any sense is:
* RESERVED_STACK_WORDS (so we can get back from the stack overflow)
* + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
* A thread with this stack will bomb immediately with a stack
* overflow, which will increase its stack size.
*/
-
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
-
#if defined(GRAN)
StgTSO *CurrentTSO;
#endif
* in an MT setting, needed to signal that a worker thread shouldn't hang around
* in the scheduler when it is out of work.
*/
-static rtsBool shutting_down_scheduler = rtsFalse;
+rtsBool shutting_down_scheduler = rtsFalse;
-#if defined(RTS_SUPPORTS_THREADS)
-/* ToDo: carefully document the invariants that go together
- * with these synchronisation objects.
+/*
+ * This mutex protects most of the global scheduler data in
+ * the THREADED_RTS runtime.
*/
-Mutex sched_mutex = INIT_MUTEX_VAR;
-Mutex term_mutex = INIT_MUTEX_VAR;
-
-#endif /* RTS_SUPPORTS_THREADS */
+#if defined(THREADED_RTS)
+Mutex sched_mutex;
+#endif
#if defined(PARALLEL_HASKELL)
StgTSO *LastTSO;
rtsBool emitSchedule = rtsTrue;
#endif
-#if DEBUG
-static char *whatNext_strs[] = {
- "(unknown)",
- "ThreadRunGHC",
- "ThreadInterpret",
- "ThreadKilled",
- "ThreadRelocated",
- "ThreadComplete"
-};
-#endif
-
/* -----------------------------------------------------------------------------
* static function prototypes
* -------------------------------------------------------------------------- */
-#if defined(RTS_SUPPORTS_THREADS)
-static void taskStart(void);
-#endif
-
-static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
- Capability *initialCapability );
+static Capability *schedule (Capability *initialCapability, Task *task);
//
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
-static void schedulePreLoop(void);
-static void scheduleStartSignalHandlers(void);
-static void scheduleCheckBlockedThreads(void);
-static void scheduleCheckBlackHoles(void);
-static void scheduleDetectDeadlock(void);
+static void schedulePreLoop (void);
+#if defined(THREADED_RTS)
+static void schedulePushWork(Capability *cap, Task *task);
+#endif
+static void scheduleStartSignalHandlers (Capability *cap);
+static void scheduleCheckBlockedThreads (Capability *cap);
+static void scheduleCheckBlackHoles (Capability *cap);
+static void scheduleDetectDeadlock (Capability *cap, Task *task);
#if defined(GRAN)
static StgTSO *scheduleProcessEvent(rtsEvent *event);
#endif
#endif
static void schedulePostRunThread(void);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
-static void scheduleHandleStackOverflow( StgTSO *t);
-static rtsBool scheduleHandleYield( StgTSO *t, nat prev_what_next );
+static void scheduleHandleStackOverflow( Capability *cap, Task *task,
+ StgTSO *t);
+static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
+ nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
-static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread,
- Capability *cap, StgTSO *t );
+static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
+ StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
-static void scheduleDoGC(Capability *cap);
-
-static void unblockThread(StgTSO *tso);
-static rtsBool checkBlackHoles(void);
-static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
- Capability *initialCapability
- );
-static void scheduleThread_ (StgTSO* tso);
+static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major,
+ void (*get_roots)(evac_fn));
+
+static void unblockThread(Capability *cap, StgTSO *tso);
+static rtsBool checkBlackHoles(Capability *cap);
static void AllRoots(evac_fn evac);
-static StgTSO *threadStackOverflow(StgTSO *tso);
+static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
+
+static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
+ rtsBool stop_at_atomically, StgPtr stop_here);
-static void raiseAsync_(StgTSO *tso, StgClosure *exception,
- rtsBool stop_at_atomically);
+static void deleteThread (Capability *cap, StgTSO *tso);
+static void deleteRunQueue (Capability *cap);
+#ifdef DEBUG
static void printThreadBlockage(StgTSO *tso);
static void printThreadStatus(StgTSO *tso);
+void printThreadQueue(StgTSO *tso);
+#endif
#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);
#endif
-/* ----------------------------------------------------------------------------
- * Starting Tasks
- * ------------------------------------------------------------------------- */
-
-#if defined(RTS_SUPPORTS_THREADS)
-static nat startingWorkerThread = 0;
-
-static void
-taskStart(void)
-{
- ACQUIRE_LOCK(&sched_mutex);
- startingWorkerThread--;
- schedule(NULL,NULL);
- taskStop();
- RELEASE_LOCK(&sched_mutex);
-}
-
-void
-startSchedulerTaskIfNecessary(void)
-{
- if ( !EMPTY_RUN_QUEUE()
- && !shutting_down_scheduler // not if we're shutting down
- && startingWorkerThread==0)
- {
- // we don't want to start another worker thread
- // just because the last one hasn't yet reached the
- // "waiting for capability" state
- startingWorkerThread++;
- if (!maybeStartNewWorker(taskStart)) {
- startingWorkerThread--;
- }
- }
-}
+#ifdef DEBUG
+static char *whatNext_strs[] = {
+ "(unknown)",
+ "ThreadRunGHC",
+ "ThreadInterpret",
+ "ThreadKilled",
+ "ThreadRelocated",
+ "ThreadComplete"
+};
#endif
/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */
STATIC_INLINE void
-addToRunQueue( StgTSO *t )
+addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
if (RtsFlags.ParFlags.doFairScheduling) {
// this does round-robin scheduling; good for concurrency
- APPEND_TO_RUN_QUEUE(t);
+ appendToRunQueue(cap,t);
} else {
// this does unfair scheduling; good for parallelism
- PUSH_ON_RUN_QUEUE(t);
+ pushOnRunQueue(cap,t);
}
#else
// this does round-robin scheduling; good for concurrency
- APPEND_TO_RUN_QUEUE(t);
+ appendToRunQueue(cap,t);
#endif
}
-
+
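+/* For reference, a sketch of the two queue primitives used above (an
+ * assumption: the real definitions live in Schedule.h and link the
+ * queue through tso->link). Appending gives FIFO (round-robin) order;
+ * pushing gives LIFO order, which keeps a parent thread and the spark
+ * it just created together on one capability.
+ */
+#if 0 /* illustrative only */
+STATIC_INLINE void
+appendToRunQueue (Capability *cap, StgTSO *tso)   // FIFO: fair
+{
+    ASSERT(tso->link == END_TSO_QUEUE);
+    if (cap->run_queue_hd == END_TSO_QUEUE) {
+        cap->run_queue_hd = tso;
+    } else {
+        cap->run_queue_tl->link = tso;
+    }
+    cap->run_queue_tl = tso;
+}
+
+STATIC_INLINE void
+pushOnRunQueue (Capability *cap, StgTSO *tso)     // LIFO: runs next
+{
+    tso->link = cap->run_queue_hd;
+    cap->run_queue_hd = tso;
+    if (cap->run_queue_tl == END_TSO_QUEUE) {
+        cap->run_queue_tl = tso;
+    }
+}
+#endif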
/* ---------------------------------------------------------------------------
Main scheduling loop.
* thread ends
* stack overflow
- Locking notes: we acquire the scheduler lock once at the beginning
- of the scheduler loop, and release it when
-
- * running a thread, or
- * waiting for work, or
- * waiting for a GC to complete.
-
GRAN version:
In a GranSim setup this loop iterates over the global event queue.
This revolves around the global event queue, which determines what
------------------------------------------------------------------------ */
-static void
-schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
- Capability *initialCapability )
+static Capability *
+schedule (Capability *initialCapability, Task *task)
{
StgTSO *t;
Capability *cap;
#endif
nat prev_what_next;
rtsBool ready_to_gc;
+#if defined(THREADED_RTS)
+ rtsBool first = rtsTrue;
+#endif
- // Pre-condition: sched_mutex is held.
- // We might have a capability, passed in as initialCapability.
cap = initialCapability;
-#if !defined(RTS_SUPPORTS_THREADS)
- // simply initialise it in the non-threaded case
- grabCapability(&cap);
-#endif
+ // Pre-condition: this task owns initialCapability.
+ // The sched_mutex is *NOT* held
+ // NB. on return, we still hold a capability.
IF_DEBUG(scheduler,
- sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
- mainThread, initialCapability);
+ sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
+ task, initialCapability);
);
schedulePreLoop();
CurrentTSO = event->tso;
#endif
- IF_DEBUG(scheduler, printAllThreads());
-
-#if defined(RTS_SUPPORTS_THREADS)
- // Yield the capability to higher-priority tasks if necessary.
- //
- if (cap != NULL) {
- yieldCapability(&cap);
- }
-
- // If we do not currently hold a capability, we wait for one
- //
- if (cap == NULL) {
- waitForCapability(&sched_mutex, &cap,
- mainThread ? &mainThread->bound_thread_cond : NULL);
+#if defined(THREADED_RTS)
+ if (first) {
+ // don't yield the first time, we want a chance to run this
+ // thread for a bit, even if there are others banging at the
+ // door.
+ first = rtsFalse;
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+ } else {
+ // Yield the capability to higher-priority tasks if necessary.
+ yieldCapability(&cap, task);
}
-
- // We now have a capability...
+#endif
+
+#if defined(THREADED_RTS)
+ schedulePushWork(cap,task);
#endif
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
// call).
- if (cap->r.rInHaskell) {
+ if (cap->in_haskell) {
errorBelch("schedule: re-entered unsafely.\n"
" Perhaps a 'foreign import unsafe' should be 'safe'?");
- stg_exit(1);
+ stg_exit(EXIT_FAILURE);
}
//
// the threaded RTS.
//
if (interrupted) {
+ deleteRunQueue(cap);
+#if defined(THREADED_RTS)
+ discardSparksCap(cap);
+#endif
if (shutting_down_scheduler) {
IF_DEBUG(scheduler, sched_belch("shutting down"));
- releaseCapability(cap);
- if (mainThread) {
- mainThread->stat = Interrupted;
- mainThread->ret = NULL;
+ // If we are a worker, just exit. If we're a bound thread
+ // then we will exit below when we've removed our TSO from
+ // the run queue.
+ if (task->tso == NULL && emptyRunQueue(cap)) {
+ return cap;
}
- return;
} else {
IF_DEBUG(scheduler, sched_belch("interrupted"));
- deleteAllThreads();
}
}
-#if defined(not_yet) && defined(SMP)
- //
- // Top up the run queue from our spark pool. We try to make the
- // number of threads in the run queue equal to the number of
- // free capabilities.
- //
+#if defined(THREADED_RTS)
+ // If the run queue is empty, take a spark and turn it into a thread.
{
- StgClosure *spark;
- if (EMPTY_RUN_QUEUE()) {
- spark = findSpark(rtsFalse);
- if (spark == NULL) {
- break; /* no more sparks in the pool */
- } else {
- createSparkThread(spark);
+ if (emptyRunQueue(cap)) {
+ StgClosure *spark;
+ spark = findSpark(cap);
+ if (spark != NULL) {
IF_DEBUG(scheduler,
- sched_belch("==^^ turning spark of closure %p into a thread",
+ sched_belch("turning spark of closure %p into a thread",
(StgClosure *)spark));
+ createSparkThread(cap,spark);
}
}
}
-#endif // SMP
+#endif // THREADED_RTS
- scheduleStartSignalHandlers();
+ scheduleStartSignalHandlers(cap);
// Only check the black holes here if we've nothing else to do.
// During normal execution, the black hole list only gets checked
// at GC time, to avoid repeatedly traversing this possibly long
// list each time around the scheduler.
- if (EMPTY_RUN_QUEUE()) { scheduleCheckBlackHoles(); }
+ if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
- scheduleCheckBlockedThreads();
+ scheduleCheckBlockedThreads(cap);
- scheduleDetectDeadlock();
+ scheduleDetectDeadlock(cap,task);
+#if defined(THREADED_RTS)
+ cap = task->cap; // reload cap, it might have changed
+#endif
// Normally, the only way we can get here with no threads to
// run is if a keyboard interrupt was received during
// scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
//
// win32: might be here due to awaitEvent() being abandoned
// as a result of a console event having been delivered.
- if ( EMPTY_RUN_QUEUE() ) {
-#if !defined(RTS_SUPPORTS_THREADS) && !defined(mingw32_HOST_OS)
+ if ( emptyRunQueue(cap) ) {
+#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
ASSERT(interrupted);
#endif
continue; // nothing to do
#if defined(PARALLEL_HASKELL)
scheduleSendPendingMessages();
- if (EMPTY_RUN_QUEUE() && scheduleActivateSpark())
+ if (emptyRunQueue(cap) && scheduleActivateSpark())
continue;
#if defined(SPARKS)
/* If we still have no work we need to send a FISH to get a spark
from another PE */
- if (EMPTY_RUN_QUEUE()) {
+ if (emptyRunQueue(cap)) {
if (!scheduleGetRemoteWork(&receivedFinish)) continue;
ASSERT(rtsFalse); // should not happen at the moment
}
//
// Get a thread to run
//
- ASSERT(run_queue_hd != END_TSO_QUEUE);
- POP_RUN_QUEUE(t);
+ t = popRunQueue(cap);
#if defined(GRAN) || defined(PAR)
scheduleGranParReport(); // some kind of debuging output
IF_DEBUG(sanity,checkTSO(t));
#endif
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
// Check whether we can run this thread in the current task.
// If not, we have to pass our capability to the right task.
{
- StgMainThread *m = t->main;
+ Task *bound = t->bound;
- if(m)
- {
- if(m == mainThread)
- {
- IF_DEBUG(scheduler,
- sched_belch("### Running thread %d in bound thread", t->id));
- // yes, the Haskell thread is bound to the current native thread
- }
- else
- {
- IF_DEBUG(scheduler,
- sched_belch("### thread %d bound to another OS thread", t->id));
- // no, bound to a different Haskell thread: pass to that thread
- PUSH_ON_RUN_QUEUE(t);
- passCapability(&m->bound_thread_cond);
- continue;
- }
- }
- else
- {
- if(mainThread != NULL)
- // The thread we want to run is bound.
- {
- IF_DEBUG(scheduler,
- sched_belch("### this OS thread cannot run thread %d", t->id));
- // no, the current native thread is bound to a different
- // Haskell thread, so pass it to any worker thread
- PUSH_ON_RUN_QUEUE(t);
- passCapabilityToWorker();
- continue;
+ if (bound) {
+ if (bound == task) {
+ IF_DEBUG(scheduler,
+ sched_belch("### Running thread %d in bound thread",
+ t->id));
+ // yes, the Haskell thread is bound to the current native thread
+ } else {
+ IF_DEBUG(scheduler,
+ sched_belch("### thread %d bound to another OS thread",
+ t->id));
+ // no, bound to a different Haskell thread: pass to that thread
+ pushOnRunQueue(cap,t);
+ continue;
+ }
+ } else {
+ // The thread we want to run is unbound.
+ if (task->tso) {
+ IF_DEBUG(scheduler,
+ sched_belch("### this OS thread cannot run thread %d", t->id));
+ // no, the current native thread is bound to a different
+ // Haskell thread, so pass it to any worker thread
+ pushOnRunQueue(cap,t);
+ continue;
+ }
}
- }
}
#endif
cap->r.rCurrentTSO = t;
- /* context switches are now initiated by the timer signal, unless
+ /* context switches are initiated by the timer signal, unless
* the user specified "context switch as often as possible", with
* +RTS -C0
*/
- if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
- && (run_queue_hd != END_TSO_QUEUE
- || blocked_queue_hd != END_TSO_QUEUE
- || sleeping_queue != END_TSO_QUEUE)))
+ if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
+ && !emptyThreadQueues(cap)) {
context_switch = 1;
-
+ }
+
run_thread:
- RELEASE_LOCK(&sched_mutex);
-
IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
(long)t->id, whatNext_strs[t->what_next]));
prev_what_next = t->what_next;
errno = t->saved_errno;
- cap->r.rInHaskell = rtsTrue;
+ cap->in_haskell = rtsTrue;
- switch (prev_what_next) {
+ dirtyTSO(t);
+ recent_activity = ACTIVITY_YES;
+
+ switch (prev_what_next) {
+
case ThreadKilled:
case ThreadComplete:
/* Thread already finished, return to scheduler. */
ret = ThreadFinished;
break;
-
+
case ThreadRunGHC:
- ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
+ {
+ StgRegTable *r;
+ r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
+ cap = regTableToCapability(r);
+ ret = r->rRet;
break;
-
+ }
+
case ThreadInterpret:
- ret = interpretBCO(cap);
+ cap = interpretBCO(cap);
+ ret = cap->r.rRet;
break;
-
+
default:
- barf("schedule: invalid what_next field");
- }
-
- // We have run some Haskell code: there might be blackhole-blocked
- // threads to wake up now.
- if ( blackhole_queue != END_TSO_QUEUE ) {
- blackholes_need_checking = rtsTrue;
+ barf("schedule: invalid what_next field");
}
- cap->r.rInHaskell = rtsFalse;
+ cap->in_haskell = rtsFalse;
// The TSO might have moved, eg. if it re-entered the RTS and a GC
// happened. So find the new location:
t = cap->r.rCurrentTSO;
+ // We have run some Haskell code: there might be blackhole-blocked
+ // threads to wake up now.
+ // Lock-free test here should be ok, we're just setting a flag.
+ if ( blackhole_queue != END_TSO_QUEUE ) {
+ blackholes_need_checking = rtsTrue;
+ }
+
// And save the current errno in this thread.
+ // XXX: possibly bogus for SMP because this thread might already
+ // be running again, see code below.
t->saved_errno = errno;
+#ifdef SMP
+ // If ret is ThreadBlocked, and this Task is bound to the TSO that
+ // blocked, we are in limbo - the TSO is now owned by whatever it
+ // is blocked on, and may in fact already have been woken up,
+ // perhaps even on a different Capability. It may be the case
+ // that task->cap != cap. We better yield this Capability
+ // immediately and return to normality.
+ if (ret == ThreadBlocked) {
+ IF_DEBUG(scheduler,
+ sched_belch("--<< thread %d (%s) stopped: blocked\n",
+ t->id, whatNext_strs[t->what_next]));
+ continue;
+ }
+#endif
+
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+
// ----------------------------------------------------------------------
- /* Costs for the scheduler are assigned to CCS_SYSTEM */
+ // Costs for the scheduler are assigned to CCS_SYSTEM
#if defined(PROFILING)
stopHeapProfTimer();
CCCS = CCS_SYSTEM;
#endif
- ACQUIRE_LOCK(&sched_mutex);
-
-#if defined(RTS_SUPPORTS_THREADS)
- IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
+#if defined(THREADED_RTS)
+ IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
#elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
IF_DEBUG(scheduler,debugBelch("sched: "););
#endif
break;
case StackOverflow:
- scheduleHandleStackOverflow(t);
+ scheduleHandleStackOverflow(cap,task,t);
break;
case ThreadYielding:
- if (scheduleHandleYield(t, prev_what_next)) {
+ if (scheduleHandleYield(cap, t, prev_what_next)) {
// shortcut for switching between compiler/interpreter:
goto run_thread;
}
case ThreadBlocked:
scheduleHandleThreadBlocked(t);
- threadPaused(t);
break;
case ThreadFinished:
- if (scheduleHandleThreadFinished(mainThread, cap, t)) return;;
+ if (scheduleHandleThreadFinished(cap, task, t)) return cap;
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
break;
default:
}
if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
- if (ready_to_gc) { scheduleDoGC(cap); }
+ if (ready_to_gc) {
+ scheduleDoGC(cap,task,rtsFalse,GetRoots);
+#if defined(THREADED_RTS)
+ cap = task->cap; // reload cap, it might have changed
+#endif
+ }
} /* end of while() */
IF_PAR_DEBUG(verbose,
/* ----------------------------------------------------------------------------
* Setting up the scheduler loop
- * ASSUMES: sched_mutex
* ------------------------------------------------------------------------- */
static void
#endif
}
+/* -----------------------------------------------------------------------------
+ * schedulePushWork()
+ *
+ * Push work to other Capabilities if we have some.
+ * -------------------------------------------------------------------------- */
+
+#if defined(THREADED_RTS)
+static void
+schedulePushWork(Capability *cap USED_IF_THREADS,
+ Task *task USED_IF_THREADS)
+{
+ Capability *free_caps[n_capabilities], *cap0;
+ nat i, n_free_caps;
+
+ // Check whether we have more threads on our run queue, or sparks
+ // in our pool, that we could hand to another Capability.
+ if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
+ && sparkPoolSizeCap(cap) < 2) {
+ return;
+ }
+
+ // First grab as many free Capabilities as we can.
+ for (i=0, n_free_caps=0; i < n_capabilities; i++) {
+ cap0 = &capabilities[i];
+ if (cap != cap0 && tryGrabCapability(cap0,task)) {
+ if (!emptyRunQueue(cap0) || cap0->returning_tasks_hd != NULL) {
+ // it already has some work, we just grabbed it at
+ // the wrong moment. Or maybe it's deadlocked!
+ releaseCapability(cap0);
+ } else {
+ free_caps[n_free_caps++] = cap0;
+ }
+ }
+ }
+
+ // we now have n_free_caps free capabilities stashed in
+ // free_caps[]. Share our run queue equally with them. This is
+ // probably the simplest thing we could do; improvements we might
+ // want to do include:
+ //
+ // - giving high priority to moving relatively new threads, on
+ // the gournds that they haven't had time to build up a
+ // working set in the cache on this CPU/Capability.
+ //
+ // - giving low priority to moving long-lived threads
+
+ if (n_free_caps > 0) {
+ StgTSO *prev, *t, *next;
+ rtsBool pushed_to_all;
+
+ IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
+
+ i = 0;
+ pushed_to_all = rtsFalse;
+
+ if (cap->run_queue_hd != END_TSO_QUEUE) {
+ prev = cap->run_queue_hd;
+ t = prev->link;
+ prev->link = END_TSO_QUEUE;
+ for (; t != END_TSO_QUEUE; t = next) {
+ next = t->link;
+ t->link = END_TSO_QUEUE;
+ if (t->what_next == ThreadRelocated
+ || t->bound == task) { // don't move my bound thread
+ prev->link = t;
+ prev = t;
+ } else if (i == n_free_caps) {
+ pushed_to_all = rtsTrue;
+ i = 0;
+ // keep one for us
+ prev->link = t;
+ prev = t;
+ } else {
+ IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
+ appendToRunQueue(free_caps[i],t);
+ if (t->bound) { t->bound->cap = free_caps[i]; }
+ i++;
+ }
+ }
+ cap->run_queue_tl = prev;
+ }
+
+ // If there are some free capabilities that we didn't push any
+ // threads to, then try to push a spark to each one.
+ if (!pushed_to_all) {
+ StgClosure *spark;
+ // i is the next free capability to push to
+ for (; i < n_free_caps; i++) {
+ if (emptySparkPoolCap(free_caps[i])) {
+ spark = findSpark(cap);
+ if (spark != NULL) {
+ IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
+ newSpark(&(free_caps[i]->r), spark);
+ }
+ }
+ }
+ }
+
+ // release the capabilities
+ for (i = 0; i < n_free_caps; i++) {
+ task->cap = free_caps[i];
+ releaseCapability(free_caps[i]);
+ }
+ }
+ task->cap = cap; // reset to point to our Capability.
+}
+#endif
+
/* ----------------------------------------------------------------------------
* Start any pending signal handlers
- * ASSUMES: sched_mutex
* ------------------------------------------------------------------------- */
+#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
static void
-scheduleStartSignalHandlers(void)
+scheduleStartSignalHandlers(Capability *cap)
{
-#if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
- if (signals_pending()) {
- RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
- startSignalHandlers();
- ACQUIRE_LOCK(&sched_mutex);
+ if (signals_pending()) { // safe outside the lock
+ startSignalHandlers(cap);
}
-#endif
}
+#else
+static void
+scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
+{
+}
+#endif
/* ----------------------------------------------------------------------------
* Check for blocked threads that can be woken up.
- * ASSUMES: sched_mutex
* ------------------------------------------------------------------------- */
static void
-scheduleCheckBlockedThreads(void)
+scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
+#if !defined(THREADED_RTS)
//
// Check whether any waiting threads need to be woken up. If the
// run queue is empty, and there are no other tasks running, we
// can wait indefinitely for something to happen.
//
- if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
+ if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
{
-#if defined(RTS_SUPPORTS_THREADS)
- // We shouldn't be here...
- barf("schedule: awaitEvent() in threaded RTS");
-#endif
- awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
+ awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
}
+#endif
}
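+/* The queue predicates used throughout are simple pointer tests; a
+ * sketch, assuming the real versions are inline functions in Schedule.h:
+ */
+#if 0 /* illustrative only */
+STATIC_INLINE rtsBool
+emptyQueue (StgTSO *q) { return (q == END_TSO_QUEUE); }
+
+STATIC_INLINE rtsBool
+emptyRunQueue (Capability *cap) { return emptyQueue(cap->run_queue_hd); }
+#endif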
/* ----------------------------------------------------------------------------
* Check for threads blocked on BLACKHOLEs that can be woken up
- * ASSUMES: sched_mutex
* ------------------------------------------------------------------------- */
static void
-scheduleCheckBlackHoles( void )
+scheduleCheckBlackHoles (Capability *cap)
{
- if ( blackholes_need_checking )
+ if ( blackholes_need_checking ) // check without the lock first
{
- checkBlackHoles();
- blackholes_need_checking = rtsFalse;
+ ACQUIRE_LOCK(&sched_mutex);
+ if ( blackholes_need_checking ) {
+ checkBlackHoles(cap);
+ blackholes_need_checking = rtsFalse;
+ }
+ RELEASE_LOCK(&sched_mutex);
}
}
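+/* A sketch of what checkBlackHoles() does (its body appears in a later
+ * hunk; the details here are an assumption): walk the global
+ * blackhole_queue and wake any TSO whose blackhole has since been
+ * overwritten with a value.
+ */
+#if 0 /* illustrative only */
+static rtsBool
+checkBlackHoles (Capability *cap)
+{
+    StgTSO **prev, *t, *next;
+    rtsBool any_woke_up = rtsFalse;
+
+    prev = &blackhole_queue;
+    for (t = blackhole_queue; t != END_TSO_QUEUE; t = next) {
+        next = t->link;
+        ASSERT(t->why_blocked == BlockedOnBlackHole);
+        if (get_itbl(t->block_info.closure)->type != BLACKHOLE) {
+            *prev = next;                 // unlink it ...
+            t->link = END_TSO_QUEUE;
+            appendToRunQueue(cap, t);     // ... and make it runnable
+            any_woke_up = rtsTrue;
+        } else {
+            prev = &t->link;
+        }
+    }
+    return any_woke_up;
+}
+#endif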
/* ----------------------------------------------------------------------------
* Detect deadlock conditions and attempt to resolve them.
- * ASSUMES: sched_mutex
* ------------------------------------------------------------------------- */
static void
-scheduleDetectDeadlock(void)
+scheduleDetectDeadlock (Capability *cap, Task *task)
{
+
+#if defined(PARALLEL_HASKELL)
+ // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
+ return;
+#endif
+
/*
* Detect deadlock: when we have no threads to run, there are no
* threads blocked, waiting for I/O, or sleeping, and all the
* other tasks are waiting for work, we must have a deadlock of
* some description.
*/
- if ( EMPTY_THREAD_QUEUES() )
+ if ( emptyThreadQueues(cap) )
{
-#if !defined(PARALLEL_HASKELL) && !defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
+ /*
+ * In the threaded RTS, we only check for deadlock if there
+ * has been no activity in a complete timeslice. This means
+ * we won't eagerly start a full GC just because we don't have
+ * any threads to run currently.
+ */
+ if (recent_activity != ACTIVITY_INACTIVE) return;
+#endif
+
IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
// Garbage collection can release some new threads: because they
// are unreachable, they will be sent an exception. Any threads
// thus released will be immediately runnable.
- GarbageCollect(GetRoots,rtsTrue);
- if ( !EMPTY_RUN_QUEUE() ) return;
+ scheduleDoGC( cap, task, rtsTrue/*force major GC*/, GetRoots );
+#if defined(THREADED_RTS)
+ cap = task->cap; // reload cap, it might have changed
+#endif
-#if defined(RTS_USER_SIGNALS)
+ recent_activity = ACTIVITY_DONE_GC;
+
+ if ( !emptyRunQueue(cap) ) return;
+
+#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
/* If we have user-installed signal handlers, then wait
* for signals to arrive rather than bombing out with a
* deadlock.
awaitUserSignals();
if (signals_pending()) {
- RELEASE_LOCK(&sched_mutex);
- startSignalHandlers();
- ACQUIRE_LOCK(&sched_mutex);
+ startSignalHandlers(cap);
}
// either we have threads to run, or we were interrupted:
- ASSERT(!EMPTY_RUN_QUEUE() || interrupted);
+ ASSERT(!emptyRunQueue(cap) || interrupted);
}
#endif
+#if !defined(THREADED_RTS)
/* Probably a real deadlock. Send the current main thread the
- * Deadlock exception (or in the SMP build, send *all* main
- * threads the deadlock exception, since none of them can make
- * progress).
+ * Deadlock exception.
*/
- {
- StgMainThread *m;
- m = main_threads;
- switch (m->tso->why_blocked) {
+ if (task->tso) {
+ switch (task->tso->why_blocked) {
case BlockedOnSTM:
case BlockedOnBlackHole:
case BlockedOnException:
case BlockedOnMVar:
- raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
+ raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
return;
default:
barf("deadlock: main thread blocked in a strange way");
}
}
-
-#elif defined(RTS_SUPPORTS_THREADS)
- // ToDo: add deadlock detection in threaded RTS
-#elif defined(PARALLEL_HASKELL)
- // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
+ return;
#endif
}
}
scheduleActivateSpark(void)
{
#if defined(SPARKS)
- ASSERT(EMPTY_RUN_QUEUE());
+ ASSERT(emptyRunQueue());
/* We get here if the run queue is empty and we want some work.
We try to turn a spark into a thread, and add it to the run queue,
from where it will be picked up in the next iteration of the scheduler
static rtsBool
scheduleGetRemoteWork(rtsBool *receivedFinish)
{
- ASSERT(EMPTY_RUN_QUEUE());
+ ASSERT(emptyRunQueue());
if (RtsFlags.ParFlags.BufferTime) {
IF_PAR_DEBUG(verbose,
/* ----------------------------------------------------------------------------
* After running a thread...
- * ASSUMES: sched_mutex
* ------------------------------------------------------------------------- */
static void
/* -----------------------------------------------------------------------------
* Handle a thread that returned to the scheduler with ThreadHeapOverflow
- * ASSUMES: sched_mutex
* -------------------------------------------------------------------------- */
static rtsBool
debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
(long)t->id, whatNext_strs[t->what_next], blocks));
- // don't do this if it would push us over the
- // alloc_blocks_lim limit; we'll GC first.
- if (alloc_blocks + blocks < alloc_blocks_lim) {
+ // don't do this if the nursery is (nearly) full, we'll GC first.
+ if (cap->r.rCurrentNursery->link != NULL ||
+ cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
+ // if the nursery has only one block.
- alloc_blocks += blocks;
+ ACQUIRE_SM_LOCK
bd = allocGroup( blocks );
+ RELEASE_SM_LOCK
+ cap->r.rNursery->n_blocks += blocks;
// link the new group into the list
bd->link = cap->r.rCurrentNursery;
if (cap->r.rCurrentNursery->u.back != NULL) {
cap->r.rCurrentNursery->u.back->link = bd;
} else {
-#if !defined(SMP)
+#if !defined(THREADED_RTS)
ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
g0s0 == cap->r.rNursery);
- g0s0->blocks = bd;
#endif
cap->r.rNursery->blocks = bd;
}
{
bdescr *x;
for (x = bd; x < bd + blocks; x++) {
- x->step = g0s0;
+ x->step = cap->r.rNursery;
x->gen_no = 0;
x->flags = 0;
}
}
-#if !defined(SMP)
- // don't forget to update the block count in g0s0.
- g0s0->n_blocks += blocks;
-
// This assert can be a killer if the app is doing lots
// of large block allocations.
- ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
-#endif
+ IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
// now update the nursery to point to the new block
cap->r.rCurrentNursery = bd;
// run queue before us and steal the large block, but in that
// case the thread will just end up requesting another large
// block.
- PUSH_ON_RUN_QUEUE(t);
+ pushOnRunQueue(cap,t);
return rtsFalse; /* not actually GC'ing */
}
}
- /* make all the running tasks block on a condition variable,
- * maybe set context_switch and wait till they all pile in,
- * then have them wait on a GC condition variable.
- */
IF_DEBUG(scheduler,
debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
(long)t->id, whatNext_strs[t->what_next]));
- threadPaused(t);
#if defined(GRAN)
ASSERT(!is_on_queue(t,CurrentProc));
#elif defined(PARALLEL_HASKELL)
}
#endif
- PUSH_ON_RUN_QUEUE(t);
+ pushOnRunQueue(cap,t);
return rtsTrue;
/* actual GC is done at the end of the while loop in schedule() */
}
/* -----------------------------------------------------------------------------
* Handle a thread that returned to the scheduler with ThreadStackOverflow
- * ASSUMES: sched_mutex
* -------------------------------------------------------------------------- */
static void
-scheduleHandleStackOverflow( StgTSO *t)
+scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
{
IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n",
(long)t->id, whatNext_strs[t->what_next]));
/* just adjust the stack for this thread, then pop it back
* on the run queue.
*/
- threadPaused(t);
{
/* enlarge the stack */
- StgTSO *new_t = threadStackOverflow(t);
+ StgTSO *new_t = threadStackOverflow(cap, t);
- /* This TSO has moved, so update any pointers to it from the
- * main thread stack. It better not be on any other queues...
- * (it shouldn't be).
+ /* The TSO attached to this Task may have moved, so update the
+ * pointer to it.
*/
- if (t->main != NULL) {
- t->main->tso = new_t;
+ if (task->tso == t) {
+ task->tso = new_t;
}
- PUSH_ON_RUN_QUEUE(new_t);
+ pushOnRunQueue(cap,new_t);
}
}
/* -----------------------------------------------------------------------------
* Handle a thread that returned to the scheduler with ThreadYielding
- * ASSUMES: sched_mutex
* -------------------------------------------------------------------------- */
static rtsBool
-scheduleHandleYield( StgTSO *t, nat prev_what_next )
+scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
{
// Reset the context switch flag. We don't do this just before
// running the thread, because that would mean we would lose ticks
return rtsTrue;
}
- threadPaused(t);
-
#if defined(GRAN)
ASSERT(!is_on_queue(t,CurrentProc));
#endif
- addToRunQueue(t);
+ addToRunQueue(cap,t);
#if defined(GRAN)
/* add a ContinueThread event to actually process the thread */
/* -----------------------------------------------------------------------------
* Handle a thread that returned to the scheduler with ThreadBlocked
- * ASSUMES: sched_mutex
* -------------------------------------------------------------------------- */
static void
emitSchedule = rtsTrue;
#else /* !GRAN */
- /* don't need to do anything. Either the thread is blocked on
- * I/O, in which case we'll have called addToBlockedQueue
- * previously, or it's blocked on an MVar or Blackhole, in which
- * case it'll be on the relevant queue already.
- */
+
+ // We don't need to do anything. The thread is blocked, and it
+ // has tidied up its stack and placed itself on whatever queue
+ // it needs to be on.
+
+#if !defined(THREADED_RTS)
ASSERT(t->why_blocked != NotBlocked);
+ // This might not be true under THREADED_RTS: we don't have
+ // exclusive access to this TSO, so someone might have
+ // woken it up by now. This actually happens: try
+ // conc023 +RTS -N2.
+#endif
+
IF_DEBUG(scheduler,
debugBelch("--<< thread %d (%s) stopped: ",
t->id, whatNext_strs[t->what_next]);
/* -----------------------------------------------------------------------------
* Handle a thread that returned to the scheduler with ThreadFinished
- * ASSUMES: sched_mutex
* -------------------------------------------------------------------------- */
static rtsBool
-scheduleHandleThreadFinished( StgMainThread *mainThread
- USED_WHEN_RTS_SUPPORTS_THREADS,
- Capability *cap,
- StgTSO *t )
+scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
{
/* Need to check whether this was a main thread, and if so,
* return with the return value.
#endif // PARALLEL_HASKELL
//
- // Check whether the thread that just completed was a main
+ // Check whether the thread that just completed was a bound
// thread, and if so return with the result.
//
// There is an assumption here that all thread completion goes
// through this point; we need to make sure that if a thread
// ends up in the ThreadKilled state, that it stays on the run
// queue so it can be dealt with here.
//
- if (
-#if defined(RTS_SUPPORTS_THREADS)
- mainThread != NULL
+
+ if (t->bound) {
+
+ if (t->bound != task) {
+#if !defined(THREADED_RTS)
+ // Must be a bound thread that is not the topmost one. Leave
+ // it on the run queue until the stack has unwound to the
+ // point where we can deal with this. Leaving it on the run
+ // queue also ensures that the garbage collector knows about
+ // this thread and its return value (it gets dropped from the
+ // all_threads list so there's no other way to find it).
+ appendToRunQueue(cap,t);
+ return rtsFalse;
#else
- mainThread->tso == t
+ // this cannot happen in the threaded RTS, because a
+ // bound thread can only be run by the appropriate Task.
+ barf("finished bound thread that isn't mine");
#endif
- )
- {
- // We are a bound thread: this must be our thread that just
- // completed.
- ASSERT(mainThread->tso == t);
+ }
+
+ ASSERT(task->tso == t);
if (t->what_next == ThreadComplete) {
- if (mainThread->ret) {
+ if (task->ret) {
// NOTE: return val is tso->sp[1] (see StgStartup.hc)
- *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1];
+ *(task->ret) = (StgClosure *)task->tso->sp[1];
}
- mainThread->stat = Success;
+ task->stat = Success;
} else {
- if (mainThread->ret) {
- *(mainThread->ret) = NULL;
+ if (task->ret) {
+ *(task->ret) = NULL;
}
if (interrupted) {
- mainThread->stat = Interrupted;
+ task->stat = Interrupted;
} else {
- mainThread->stat = Killed;
+ task->stat = Killed;
}
}
#ifdef DEBUG
- removeThreadLabel((StgWord)mainThread->tso->id);
+ removeThreadLabel((StgWord)task->tso->id);
#endif
- if (mainThread->prev == NULL) {
- main_threads = mainThread->link;
- } else {
- mainThread->prev->link = mainThread->link;
- }
- if (mainThread->link != NULL) {
- mainThread->link->prev = mainThread->prev;
- }
- releaseCapability(cap);
return rtsTrue; // tells schedule() to return
}
-#ifdef RTS_SUPPORTS_THREADS
- ASSERT(t->main == NULL);
-#else
- if (t->main != NULL) {
- // Must be a main thread that is not the topmost one. Leave
- // it on the run queue until the stack has unwound to the
- // point where we can deal with this. Leaving it on the run
- // queue also ensures that the garbage collector knows about
- // this thread and its return value (it gets dropped from the
- // all_threads list so there's no other way to find it).
- APPEND_TO_RUN_QUEUE(t);
- }
-#endif
return rtsFalse;
}
if (performHeapProfile ||
(RtsFlags.ProfFlags.profileInterval==0 &&
RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
+
+ // checking black holes is necessary before GC, otherwise
+ // there may be threads that are unreachable except by the
+ // blackhole queue, which the GC will consider to be
+ // deadlocked.
+ scheduleCheckBlackHoles(&MainCapability);
+
+ IF_DEBUG(scheduler, sched_belch("garbage collecting before heap census"));
GarbageCollect(GetRoots, rtsTrue);
+
+ IF_DEBUG(scheduler, sched_belch("performing heap census"));
heapCensus();
+
performHeapProfile = rtsFalse;
return rtsTrue; // true <=> we already GC'd
}
/* -----------------------------------------------------------------------------
* Perform a garbage collection if necessary
- * ASSUMES: sched_mutex
* -------------------------------------------------------------------------- */
static void
-scheduleDoGC( Capability *cap STG_UNUSED )
+scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
+ rtsBool force_major, void (*get_roots)(evac_fn))
{
StgTSO *t;
-#ifdef SMP
- static rtsBool waiting_for_gc;
- int n_capabilities = RtsFlags.ParFlags.nNodes - 1;
- // subtract one because we're already holding one.
- Capability *caps[n_capabilities];
+#ifdef THREADED_RTS
+ static volatile StgWord waiting_for_gc;
+ rtsBool was_waiting;
+ nat i;
#endif
-#ifdef SMP
+#ifdef THREADED_RTS
// In order to GC, there must be no threads running Haskell code.
// Therefore, the GC thread needs to hold *all* the capabilities,
// and release them after the GC has completed.
// the other tasks to sleep and stay asleep.
//
- // Someone else is already trying to GC
- if (waiting_for_gc) return;
- waiting_for_gc = rtsTrue;
-
- caps[n_capabilities] = cap;
- while (n_capabilities > 0) {
- IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
- waitForReturnCapability(&sched_mutex, &cap);
- n_capabilities--;
- caps[n_capabilities] = cap;
+ was_waiting = cas(&waiting_for_gc, 0, 1);
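+ // cas() returns the previous value of waiting_for_gc: zero means we
+ // won the race and this task does the GC; non-zero means another task
+ // is already collecting, so we just yield until it has finished.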
+ if (was_waiting) {
+ do {
+ IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
+ if (cap) yieldCapability(&cap,task);
+ } while (waiting_for_gc);
+ return; // NOTE: task->cap might have changed here
+ }
+
+ for (i=0; i < n_capabilities; i++) {
+ IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
+ if (cap != &capabilities[i]) {
+ Capability *pcap = &capabilities[i];
+ // we better hope this task doesn't get migrated to
+ // another Capability while we're waiting for this one.
+ // It won't, because load balancing happens while we have
+ // all the Capabilities, but even so it's a slightly
+ // unsavoury invariant.
+ task->cap = pcap;
+ context_switch = 1;
+ waitForReturnCapability(&pcap, task);
+ if (pcap != &capabilities[i]) {
+ barf("scheduleDoGC: got the wrong capability");
+ }
+ }
}
waiting_for_gc = rtsFalse;
* atomically frames. When next scheduled they will try to
* commit, this commit will fail and they will retry.
*/
- for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
- if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
- if (!stmValidateTransaction (t -> trec)) {
- IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
-
- // strip the stack back to the ATOMICALLY_FRAME, aborting
- // the (nested) transaction, and saving the stack of any
- // partially-evaluated thunks on the heap.
- raiseAsync_(t, NULL, rtsTrue);
-
+ {
+ StgTSO *next;
+
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+ if (!stmValidateNestOfTransactions (t -> trec)) {
+ IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
+
+ // strip the stack back to the
+ // ATOMICALLY_FRAME, aborting the (nested)
+ // transaction, and saving the stack of any
+ // partially-evaluated thunks on the heap.
+ raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
+
#ifdef REG_R1
- ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+ ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
#endif
+ }
+ }
}
}
}
// so this happens periodically:
- scheduleCheckBlackHoles();
+ if (cap) scheduleCheckBlackHoles(cap);
+ IF_DEBUG(scheduler, printAllThreads());
+
/* everybody back, start the GC.
* Could do it in this thread, or signal a condition var
* to do it in another thread. Either way, we need to
* broadcast on gc_pending_cond afterward.
*/
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
- GarbageCollect(GetRoots,rtsFalse);
+ GarbageCollect(get_roots, force_major);
-#if defined(SMP)
- {
- // release our stash of capabilities.
- nat i;
- for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) {
- releaseCapability(caps[i]);
+#if defined(THREADED_RTS)
+ // release our stash of capabilities.
+ for (i = 0; i < n_capabilities; i++) {
+ if (cap != &capabilities[i]) {
+ task->cap = &capabilities[i];
+ releaseCapability(&capabilities[i]);
}
}
+ if (cap) {
+ task->cap = cap;
+ } else {
+ task->cap = NULL;
+ }
#endif
#if defined(GRAN)
StgBool
rtsSupportsBoundThreads(void)
{
-#ifdef THREADED_RTS
+#if defined(THREADED_RTS)
return rtsTrue;
#else
return rtsFalse;
* ------------------------------------------------------------------------- */
StgBool
-isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
+isThreadBound(StgTSO* tso USED_IF_THREADS)
{
-#ifdef THREADED_RTS
- return (tso->main != NULL);
+#if defined(THREADED_RTS)
+ return (tso->bound != NULL);
#endif
return rtsFalse;
}
* Singleton fork(). Do not copy any running threads.
* ------------------------------------------------------------------------- */
-#ifndef mingw32_HOST_OS
+#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
-deleteThreadImmediately(StgTSO *tso);
+deleteThreadImmediately(Capability *cap, StgTSO *tso);
#endif
StgInt
forkProcess(HsStablePtr *entry
)
{
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
- pid_t pid;
- StgTSO* t,*next;
- StgMainThread *m;
- SchedulerStatus rc;
-
- IF_DEBUG(scheduler,sched_belch("forking!"));
- rts_lock(); // This not only acquires sched_mutex, it also
- // makes sure that no other threads are running
-
- pid = fork();
-
- if (pid) { /* parent */
-
- /* just return the pid */
- rts_unlock();
- return pid;
-
- } else { /* child */
+ Task *task;
+ pid_t pid;
+ StgTSO* t,*next;
+ Capability *cap;
-
- // delete all threads
- run_queue_hd = run_queue_tl = END_TSO_QUEUE;
-
- for (t = all_threads; t != END_TSO_QUEUE; t = next) {
- next = t->link;
-
- // don't allow threads to catch the ThreadKilled exception
- deleteThreadImmediately(t);
- }
-
- // wipe the main thread list
- while((m = main_threads) != NULL) {
- main_threads = m->link;
-# ifdef THREADED_RTS
- closeCondition(&m->bound_thread_cond);
-# endif
- stgFree(m);
+#if defined(THREADED_RTS)
+ if (RtsFlags.ParFlags.nNodes > 1) {
+ errorBelch("forking not supported with +RTS -N<n> greater than 1");
+ stg_exit(EXIT_FAILURE);
}
+#endif
+
+ IF_DEBUG(scheduler,sched_belch("forking!"));
- rc = rts_evalStableIO(entry, NULL); // run the action
- rts_checkSchedStatus("forkProcess",rc);
+ // ToDo: for SMP, we should probably acquire *all* the capabilities
+ cap = rts_lock();
- rts_unlock();
+ pid = fork();
- hs_exit(); // clean up and exit
- stg_exit(0);
- }
+ if (pid) { // parent
+
+ // just return the pid
+ rts_unlock(cap);
+ return pid;
+
+ } else { // child
+
+ // delete all threads
+ cap->run_queue_hd = END_TSO_QUEUE;
+ cap->run_queue_tl = END_TSO_QUEUE;
+
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ next = t->link;
+
+ // don't allow threads to catch the ThreadKilled exception
+ deleteThreadImmediately(cap,t);
+ }
+
+ // wipe the task list
+ ACQUIRE_LOCK(&sched_mutex);
+ for (task = all_tasks; task != NULL; task=task->all_link) {
+ if (task != cap->running_task) discardTask(task);
+ }
+ RELEASE_LOCK(&sched_mutex);
+
+ cap->suspended_ccalling_tasks = NULL;
+
+#if defined(THREADED_RTS)
+ // wipe our spare workers list.
+ cap->spare_workers = NULL;
+ cap->returning_tasks_hd = NULL;
+ cap->returning_tasks_tl = NULL;
+#endif
+
+ cap = rts_evalStableIO(cap, entry, NULL); // run the action
+ rts_checkSchedStatus("forkProcess",cap);
+
+ rts_unlock(cap);
+ hs_exit(); // clean up and exit
+ stg_exit(EXIT_SUCCESS);
+ }
#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
- barf("forkProcess#: primop not supported, sorry!\n");
- return -1;
+ barf("forkProcess#: primop not supported on this platform, sorry!\n");
+ return -1;
#endif
}
/* ---------------------------------------------------------------------------
- * deleteAllThreads(): kill all the live threads.
- *
- * This is used when we catch a user interrupt (^C), before performing
- * any necessary cleanups and running finalizers.
- *
- * Locks: sched_mutex held.
+ * Delete the threads on the run queue of the current capability.
* ------------------------------------------------------------------------- */
-void
-deleteAllThreads ( void )
+static void
+deleteRunQueue (Capability *cap)
{
- StgTSO* t, *next;
- IF_DEBUG(scheduler,sched_belch("deleting all threads"));
- for (t = all_threads; t != END_TSO_QUEUE; t = next) {
- next = t->global_link;
- deleteThread(t);
- }
-
- // The run queue now contains a bunch of ThreadKilled threads. We
- // must not throw these away: the main thread(s) will be in there
- // somewhere, and the main scheduler loop has to deal with it.
- // Also, the run queue is the only thing keeping these threads from
- // being GC'd, and we don't want the "main thread has been GC'd" panic.
-
- ASSERT(blocked_queue_hd == END_TSO_QUEUE);
- ASSERT(blackhole_queue == END_TSO_QUEUE);
- ASSERT(sleeping_queue == END_TSO_QUEUE);
+ StgTSO *t, *next;
+ for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
+ ASSERT(t->what_next != ThreadRelocated);
+ next = t->link;
+ deleteThread(cap, t);
+ }
}
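+/* For reference, a sketch of deleteThread() itself (the body is in a
+ * later hunk; an assumption): a thread is killed by raising an
+ * asynchronous exception with a NULL closure, unless it is away in a
+ * foreign call, in which case it cannot be interrupted here.
+ */
+#if 0 /* illustrative only */
+static void
+deleteThread (Capability *cap, StgTSO *tso)
+{
+    if (tso->why_blocked != BlockedOnCCall &&
+        tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
+        raiseAsync(cap, tso, NULL);   // NULL exception == kill the thread
+    }
+}
+#endif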
/* startThread and insertThread are now in GranSim.c -- HWL */
+/* -----------------------------------------------------------------------------
+ Managing the suspended_ccalling_tasks list.
+ Locks required: sched_mutex
+ -------------------------------------------------------------------------- */
+
+STATIC_INLINE void
+suspendTask (Capability *cap, Task *task)
+{
+ ASSERT(task->next == NULL && task->prev == NULL);
+ task->next = cap->suspended_ccalling_tasks;
+ task->prev = NULL;
+ if (cap->suspended_ccalling_tasks) {
+ cap->suspended_ccalling_tasks->prev = task;
+ }
+ cap->suspended_ccalling_tasks = task;
+}
+
+STATIC_INLINE void
+recoverSuspendedTask (Capability *cap, Task *task)
+{
+ if (task->prev) {
+ task->prev->next = task->next;
+ } else {
+ ASSERT(cap->suspended_ccalling_tasks == task);
+ cap->suspended_ccalling_tasks = task->next;
+ }
+ if (task->next) {
+ task->next->prev = task->prev;
+ }
+ task->next = task->prev = NULL;
+}
+
/* ---------------------------------------------------------------------------
* Suspending & resuming Haskell threads.
*
* on return from the C function.
* ------------------------------------------------------------------------- */
-StgInt
-suspendThread( StgRegTable *reg )
+void *
+suspendThread (StgRegTable *reg)
{
- nat tok;
Capability *cap;
int saved_errno = errno;
+ StgTSO *tso;
+ Task *task;
- /* assume that *reg is a pointer to the StgRegTable part
- * of a Capability.
+ /* assume that *reg is a pointer to the StgRegTable part of a Capability.
*/
- cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
+ cap = regTableToCapability(reg);
- ACQUIRE_LOCK(&sched_mutex);
+ task = cap->running_task;
+ tso = cap->r.rCurrentTSO;
IF_DEBUG(scheduler,
- sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
+ sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
// XXX this might not be necessary --SDM
- cap->r.rCurrentTSO->what_next = ThreadRunGHC;
+ tso->what_next = ThreadRunGHC;
- threadPaused(cap->r.rCurrentTSO);
- cap->r.rCurrentTSO->link = suspended_ccalling_threads;
- suspended_ccalling_threads = cap->r.rCurrentTSO;
+ threadPaused(cap,tso);
- if(cap->r.rCurrentTSO->blocked_exceptions == NULL) {
- cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
- cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
+ if(tso->blocked_exceptions == NULL) {
+ tso->why_blocked = BlockedOnCCall;
+ tso->blocked_exceptions = END_TSO_QUEUE;
} else {
- cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
+ tso->why_blocked = BlockedOnCCall_NoUnblockExc;
}
- /* Use the thread ID as the token; it should be unique */
- tok = cap->r.rCurrentTSO->id;
+ // Hand back capability
+ task->suspended_tso = tso;
- /* Hand back capability */
- cap->r.rInHaskell = rtsFalse;
- releaseCapability(cap);
+ ACQUIRE_LOCK(&cap->lock);
+
+ suspendTask(cap,task);
+ cap->in_haskell = rtsFalse;
+ releaseCapability_(cap);
-#if defined(RTS_SUPPORTS_THREADS)
+ RELEASE_LOCK(&cap->lock);
+
+#if defined(THREADED_RTS)
/* Preparing to leave the RTS, so ensure there's a native thread/task
waiting to take over.
*/
- IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
+ IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
#endif
- RELEASE_LOCK(&sched_mutex);
-
errno = saved_errno;
- return tok;
+ return task;
}
StgRegTable *
-resumeThread( StgInt tok )
+resumeThread (void *task_)
{
- StgTSO *tso, **prev;
- Capability *cap;
- int saved_errno = errno;
-
-#if defined(RTS_SUPPORTS_THREADS)
- /* Wait for permission to re-enter the RTS with the result. */
- ACQUIRE_LOCK(&sched_mutex);
- waitForReturnCapability(&sched_mutex, &cap);
-
- IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
-#else
- grabCapability(&cap);
-#endif
-
- /* Remove the thread off of the suspended list */
- prev = &suspended_ccalling_threads;
- for (tso = suspended_ccalling_threads;
- tso != END_TSO_QUEUE;
- prev = &tso->link, tso = tso->link) {
- if (tso->id == (StgThreadID)tok) {
- *prev = tso->link;
- break;
+ StgTSO *tso;
+ Capability *cap;
+ int saved_errno = errno;
+ Task *task = task_;
+
+ cap = task->cap;
+ // Wait for permission to re-enter the RTS with the result.
+ waitForReturnCapability(&cap,task);
+ // we might be on a different capability now... but if so, our
+ // entry on the suspended_ccalling_tasks list will also have been
+ // migrated.
+
+ // Remove the thread from the suspended list
+ recoverSuspendedTask(cap,task);
+
+ tso = task->suspended_tso;
+ task->suspended_tso = NULL;
+ tso->link = END_TSO_QUEUE;
+ IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
+
+ if (tso->why_blocked == BlockedOnCCall) {
+ awakenBlockedQueue(cap,tso->blocked_exceptions);
+ tso->blocked_exceptions = NULL;
}
- }
- if (tso == END_TSO_QUEUE) {
- barf("resumeThread: thread not found");
- }
- tso->link = END_TSO_QUEUE;
-
- if(tso->why_blocked == BlockedOnCCall) {
- awakenBlockedQueueNoLock(tso->blocked_exceptions);
- tso->blocked_exceptions = NULL;
- }
-
- /* Reset blocking status */
- tso->why_blocked = NotBlocked;
+
+ /* Reset blocking status */
+ tso->why_blocked = NotBlocked;
+
+ cap->r.rCurrentTSO = tso;
+ cap->in_haskell = rtsTrue;
+ errno = saved_errno;
- cap->r.rCurrentTSO = tso;
- cap->r.rInHaskell = rtsTrue;
- RELEASE_LOCK(&sched_mutex);
- errno = saved_errno;
- return &cap->r;
+ /* We might have GC'd, mark the TSO dirty again */
+ dirtyTSO(tso);
+
+ return &cap->r;
}
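+/* For reference, a minimal sketch of how the code generated for a safe
+ * foreign call is expected to bracket the call with the two functions
+ * above (my_blocking_call is a hypothetical example; the token handed
+ * back by suspendThread is just the Task):
+ *
+ *     void *tok = suspendThread(reg);   // release the Capability
+ *     r = my_blocking_call(args);       // may block; other Haskell
+ *                                       // threads keep running
+ *     reg = resumeThread(tok);          // re-acquire a Capability,
+ *                                       // possibly a different one
+ */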
/* ---------------------------------------------------------------------------
createThread(nat size, StgInt pri)
#else
StgTSO *
-createThread(nat size)
+createThread(Capability *cap, nat size)
#endif
{
-
StgTSO *tso;
nat stack_size;
+ /* sched_mutex is *not* required */
+
/* First check whether we should create a thread at all */
#if defined(PARALLEL_HASKELL)
- /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
- if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
- threadsIgnored++;
- debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
- RtsFlags.ParFlags.maxThreads, advisory_thread_count);
- return END_TSO_QUEUE;
- }
- threadsCreated++;
+ /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
+ if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
+ threadsIgnored++;
+ debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
+ RtsFlags.ParFlags.maxThreads, advisory_thread_count);
+ return END_TSO_QUEUE;
+ }
+ threadsCreated++;
#endif
#if defined(GRAN)
- ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
+ ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
#endif
- // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
-
- /* catch ridiculously small stack sizes */
- if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
- size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
- }
+ // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
- stack_size = size - TSO_STRUCT_SIZEW;
+ /* catch ridiculously small stack sizes */
+ if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
+ size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
+ }
- tso = (StgTSO *)allocate(size);
- TICK_ALLOC_TSO(stack_size, 0);
+ stack_size = size - TSO_STRUCT_SIZEW;
+
+ tso = (StgTSO *)allocateLocal(cap, size);
+ TICK_ALLOC_TSO(stack_size, 0);
- SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
+ SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
#if defined(GRAN)
- SET_GRAN_HDR(tso, ThisPE);
+ SET_GRAN_HDR(tso, ThisPE);
#endif
- // Always start with the compiled code evaluator
- tso->what_next = ThreadRunGHC;
-
- tso->id = next_thread_id++;
- tso->why_blocked = NotBlocked;
- tso->blocked_exceptions = NULL;
-
- tso->saved_errno = 0;
- tso->main = NULL;
-
- tso->stack_size = stack_size;
- tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
- - TSO_STRUCT_SIZEW;
- tso->sp = (P_)&(tso->stack) + stack_size;
+ // Always start with the compiled code evaluator
+ tso->what_next = ThreadRunGHC;
- tso->trec = NO_TREC;
+ tso->why_blocked = NotBlocked;
+ tso->blocked_exceptions = NULL;
+ tso->flags = TSO_DIRTY;
+
+ tso->saved_errno = 0;
+ tso->bound = NULL;
+
+ tso->stack_size = stack_size;
+ tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
+ - TSO_STRUCT_SIZEW;
+ tso->sp = (P_)&(tso->stack) + stack_size;
+ tso->trec = NO_TREC;
+
#ifdef PROFILING
- tso->prof.CCCS = CCS_MAIN;
+ tso->prof.CCCS = CCS_MAIN;
#endif
-
+
/* put a stop frame on the stack */
- tso->sp -= sizeofW(StgStopFrame);
- SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
- tso->link = END_TSO_QUEUE;
-
+ tso->sp -= sizeofW(StgStopFrame);
+ SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
+ tso->link = END_TSO_QUEUE;
+
// ToDo: check this
#if defined(GRAN)
- /* uses more flexible routine in GranSim */
- insertThread(tso, CurrentProc);
+ /* uses more flexible routine in GranSim */
+ insertThread(tso, CurrentProc);
#else
- /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
- * from its creation
- */
+ /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
+ * from its creation
+ */
#endif
-
+
#if defined(GRAN)
- if (RtsFlags.GranFlags.GranSimStats.Full)
- DumpGranEvent(GR_START,tso);
+ if (RtsFlags.GranFlags.GranSimStats.Full)
+ DumpGranEvent(GR_START,tso);
#elif defined(PARALLEL_HASKELL)
- if (RtsFlags.ParFlags.ParStats.Full)
- DumpGranEvent(GR_STARTQ,tso);
- /* HACk to avoid SCHEDULE
- LastTSO = tso; */
+ if (RtsFlags.ParFlags.ParStats.Full)
+ DumpGranEvent(GR_STARTQ,tso);
+    /* HACK to avoid SCHEDULE
+ LastTSO = tso; */
#endif
-
- /* Link the new thread on the global thread list.
- */
- tso->global_link = all_threads;
- all_threads = tso;
-
+
+ /* Link the new thread on the global thread list.
+ */
+ ACQUIRE_LOCK(&sched_mutex);
+ tso->id = next_thread_id++; // while we have the mutex
+ tso->global_link = all_threads;
+ all_threads = tso;
+ RELEASE_LOCK(&sched_mutex);
+
#if defined(DIST)
- tso->dist.priority = MandatoryPriority; //by default that is...
+ tso->dist.priority = MandatoryPriority; //by default that is...
#endif
-
+
#if defined(GRAN)
- tso->gran.pri = pri;
+ tso->gran.pri = pri;
# if defined(DEBUG)
- tso->gran.magic = TSO_MAGIC; // debugging only
+ tso->gran.magic = TSO_MAGIC; // debugging only
# endif
- tso->gran.sparkname = 0;
- tso->gran.startedat = CURRENT_TIME;
- tso->gran.exported = 0;
- tso->gran.basicblocks = 0;
- tso->gran.allocs = 0;
- tso->gran.exectime = 0;
- tso->gran.fetchtime = 0;
- tso->gran.fetchcount = 0;
- tso->gran.blocktime = 0;
- tso->gran.blockcount = 0;
- tso->gran.blockedat = 0;
- tso->gran.globalsparks = 0;
- tso->gran.localsparks = 0;
- if (RtsFlags.GranFlags.Light)
- tso->gran.clock = Now; /* local clock */
- else
- tso->gran.clock = 0;
-
- IF_DEBUG(gran,printTSO(tso));
+ tso->gran.sparkname = 0;
+ tso->gran.startedat = CURRENT_TIME;
+ tso->gran.exported = 0;
+ tso->gran.basicblocks = 0;
+ tso->gran.allocs = 0;
+ tso->gran.exectime = 0;
+ tso->gran.fetchtime = 0;
+ tso->gran.fetchcount = 0;
+ tso->gran.blocktime = 0;
+ tso->gran.blockcount = 0;
+ tso->gran.blockedat = 0;
+ tso->gran.globalsparks = 0;
+ tso->gran.localsparks = 0;
+ if (RtsFlags.GranFlags.Light)
+ tso->gran.clock = Now; /* local clock */
+ else
+ tso->gran.clock = 0;
+
+ IF_DEBUG(gran,printTSO(tso));
#elif defined(PARALLEL_HASKELL)
# if defined(DEBUG)
- tso->par.magic = TSO_MAGIC; // debugging only
+ tso->par.magic = TSO_MAGIC; // debugging only
# endif
- tso->par.sparkname = 0;
- tso->par.startedat = CURRENT_TIME;
- tso->par.exported = 0;
- tso->par.basicblocks = 0;
- tso->par.allocs = 0;
- tso->par.exectime = 0;
- tso->par.fetchtime = 0;
- tso->par.fetchcount = 0;
- tso->par.blocktime = 0;
- tso->par.blockcount = 0;
- tso->par.blockedat = 0;
- tso->par.globalsparks = 0;
- tso->par.localsparks = 0;
+ tso->par.sparkname = 0;
+ tso->par.startedat = CURRENT_TIME;
+ tso->par.exported = 0;
+ tso->par.basicblocks = 0;
+ tso->par.allocs = 0;
+ tso->par.exectime = 0;
+ tso->par.fetchtime = 0;
+ tso->par.fetchcount = 0;
+ tso->par.blocktime = 0;
+ tso->par.blockcount = 0;
+ tso->par.blockedat = 0;
+ tso->par.globalsparks = 0;
+ tso->par.localsparks = 0;
#endif
-
+
#if defined(GRAN)
- globalGranStats.tot_threads_created++;
- globalGranStats.threads_created_on_PE[CurrentProc]++;
- globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
- globalGranStats.tot_sq_probes++;
+ globalGranStats.tot_threads_created++;
+ globalGranStats.threads_created_on_PE[CurrentProc]++;
+ globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
+ globalGranStats.tot_sq_probes++;
#elif defined(PARALLEL_HASKELL)
- // collect parallel global statistics (currently done together with GC stats)
- if (RtsFlags.ParFlags.ParStats.Global &&
- RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
- //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
- globalParStats.tot_threads_created++;
- }
+ // collect parallel global statistics (currently done together with GC stats)
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
+ globalParStats.tot_threads_created++;
+ }
#endif
-
+
#if defined(GRAN)
- IF_GRAN_DEBUG(pri,
- sched_belch("==__ schedule: Created TSO %d (%p);",
- CurrentProc, tso, tso->id));
+ IF_GRAN_DEBUG(pri,
+ sched_belch("==__ schedule: Created TSO %d (%p);",
+ CurrentProc, tso, tso->id));
#elif defined(PARALLEL_HASKELL)
- IF_PAR_DEBUG(verbose,
- sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
- (long)tso->id, tso, advisory_thread_count));
+ IF_PAR_DEBUG(verbose,
+ sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
+ (long)tso->id, tso, advisory_thread_count));
#else
- IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
- (long)tso->id, (long)tso->stack_size));
+ IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
+ (long)tso->id, (long)tso->stack_size));
#endif
- return tso;
+ return tso;
}
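+/* A minimal usage sketch (the pushClosure step is assumed, along the
+ * lines of the helpers in RtsAPI.c): allocate the TSO, set up its stack
+ * with the closure to evaluate, then make it runnable on this
+ * Capability via scheduleThread() below:
+ *
+ *     StgTSO *t = createThread(cap, RtsFlags.GcFlags.initialStkSize);
+ *     pushClosure(t, (W_)closure);   // assumed helper, see RtsAPI.c
+ *     scheduleThread(cap, t);
+ */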
#if defined(PAR)
/* ---------------------------------------------------------------------------
* scheduleThread()
*
- * scheduleThread puts a thread on the head of the runnable queue.
+ * scheduleThread puts a thread on the end of the runnable queue.
* This will usually be done immediately after a thread is created.
* The caller of scheduleThread must create the thread using e.g.
* createThread and push an appropriate closure
* on this thread's stack before the scheduler is invoked.
* ------------------------------------------------------------------------ */
-static void
-scheduleThread_(StgTSO *tso)
+void
+scheduleThread(Capability *cap, StgTSO *tso)
{
- // The thread goes at the *end* of the run-queue, to avoid possible
- // starvation of any threads already on the queue.
- APPEND_TO_RUN_QUEUE(tso);
- threadRunnable();
+ // The thread goes at the *end* of the run-queue, to avoid possible
+ // starvation of any threads already on the queue.
+ appendToRunQueue(cap,tso);
}
-void
-scheduleThread(StgTSO* tso)
+Capability *
+scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
{
- ACQUIRE_LOCK(&sched_mutex);
- scheduleThread_(tso);
- RELEASE_LOCK(&sched_mutex);
-}
+ Task *task;
+
+ // We already created/initialised the Task
+ task = cap->running_task;
+
+ // This TSO is now a bound thread; make the Task and TSO
+ // point to each other.
+ tso->bound = task;
+
+ task->tso = tso;
+ task->ret = ret;
+ task->stat = NoStatus;
+
+ appendToRunQueue(cap,tso);
-#if defined(RTS_SUPPORTS_THREADS)
-static Condition bound_cond_cache;
-static int bound_cond_cache_full = 0;
+ IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
+
+#if defined(GRAN)
+ /* GranSim specific init */
+ CurrentTSO = m->tso; // the TSO to run
+ procStatus[MainProc] = Busy; // status of main PE
+ CurrentProc = MainProc; // PE to run it on
#endif
+ cap = schedule(cap,task);
+
+ ASSERT(task->stat != NoStatus);
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+
+ IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
+ return cap;
+}
+
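+/* Sketch of the intended caller, along the lines of rts_evalIO() in
+ * RtsAPI.c (details assumed, not part of this patch): create the TSO,
+ * then block in the scheduler until the bound thread finishes, keeping
+ * hold of whichever Capability comes back:
+ *
+ *     HaskellObj ret;
+ *     tso = createThread(cap, stack_size);
+ *     // push the closure to evaluate on tso's stack here
+ *     cap = scheduleWaitThread(tso, &ret, cap);
+ *     // cap->running_task->stat now says how the thread ended
+ */
+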
+/* ----------------------------------------------------------------------------
+ * Starting Tasks
+ * ------------------------------------------------------------------------- */
-SchedulerStatus
-scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
- Capability *initialCapability)
+#if defined(THREADED_RTS)
+void
+workerStart(Task *task)
{
- // Precondition: sched_mutex must be held
- StgMainThread *m;
-
- m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
- m->tso = tso;
- tso->main = m;
- m->ret = ret;
- m->stat = NoStatus;
- m->link = main_threads;
- m->prev = NULL;
- if (main_threads != NULL) {
- main_threads->prev = m;
- }
- main_threads = m;
-
-#if defined(RTS_SUPPORTS_THREADS)
- // Allocating a new condition for each thread is expensive, so we
- // cache one. This is a pretty feeble hack, but it helps speed up
- // consecutive call-ins quite a bit.
- if (bound_cond_cache_full) {
- m->bound_thread_cond = bound_cond_cache;
- bound_cond_cache_full = 0;
- } else {
- initCondition(&m->bound_thread_cond);
- }
-#endif
+ Capability *cap;
- /* Put the thread on the main-threads list prior to scheduling the TSO.
- Failure to do so introduces a race condition in the MT case (as
- identified by Wolfgang Thaller), whereby the new task/OS thread
- created by scheduleThread_() would complete prior to the thread
- that spawned it managed to put 'itself' on the main-threads list.
- The upshot of it all being that the worker thread wouldn't get to
- signal the completion of the its work item for the main thread to
- see (==> it got stuck waiting.) -- sof 6/02.
- */
- IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
-
- APPEND_TO_RUN_QUEUE(tso);
- // NB. Don't call threadRunnable() here, because the thread is
- // bound and only runnable by *this* OS thread, so waking up other
- // workers will just slow things down.
+ // See startWorkerTask().
+ ACQUIRE_LOCK(&task->lock);
+ cap = task->cap;
+ RELEASE_LOCK(&task->lock);
- return waitThread_(m, initialCapability);
+ // set the thread-local pointer to the Task:
+ taskEnter(task);
+
+ // schedule() runs without a lock.
+ cap = schedule(cap,task);
+
+ // On exit from schedule(), we have a Capability.
+ releaseCapability(cap);
+ taskStop(task);
}
+#endif
/* ---------------------------------------------------------------------------
* initScheduler()
{
#if defined(GRAN)
nat i;
-
for (i=0; i<=MAX_PROC; i++) {
run_queue_hds[i] = END_TSO_QUEUE;
run_queue_tls[i] = END_TSO_QUEUE;
blackhole_queue[i] = END_TSO_QUEUE;
sleeping_queue = END_TSO_QUEUE;
}
-#else
- run_queue_hd = END_TSO_QUEUE;
- run_queue_tl = END_TSO_QUEUE;
+#elif !defined(THREADED_RTS)
blocked_queue_hd = END_TSO_QUEUE;
blocked_queue_tl = END_TSO_QUEUE;
- blackhole_queue = END_TSO_QUEUE;
sleeping_queue = END_TSO_QUEUE;
-#endif
-
- suspended_ccalling_threads = END_TSO_QUEUE;
+#endif
- main_threads = NULL;
- all_threads = END_TSO_QUEUE;
+ blackhole_queue = END_TSO_QUEUE;
+ all_threads = END_TSO_QUEUE;
context_switch = 0;
interrupted = 0;
RtsFlags.ConcFlags.ctxtSwitchTicks =
RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
/* Initialise the mutex and condition variables used by
* the scheduler. */
initMutex(&sched_mutex);
- initMutex(&term_mutex);
#endif
ACQUIRE_LOCK(&sched_mutex);
/* A capability holds the state a native thread needs in
* order to execute STG code. At least one capability is
- * floating around (only SMP builds have more than one).
+ * floating around (only THREADED_RTS builds have more than one).
*/
initCapabilities();
-
-#if defined(RTS_SUPPORTS_THREADS)
+
initTaskManager();
-#endif
-#if defined(SMP)
- /* eagerly start some extra workers */
- startingWorkerThread = RtsFlags.ParFlags.nNodes;
- startTasks(RtsFlags.ParFlags.nNodes, taskStart);
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+ initSparkPools();
#endif
-#if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
- initSparkPools();
+#if defined(THREADED_RTS)
+ /*
+ * Eagerly start one worker to run each Capability, except for
+ * Capability 0. The idea is that we're probably going to start a
+ * bound thread on Capability 0 pretty soon, so we don't want a
+ * worker task hogging it.
+ */
+ {
+ nat i;
+ Capability *cap;
+ for (i = 1; i < n_capabilities; i++) {
+ cap = &capabilities[i];
+ ACQUIRE_LOCK(&cap->lock);
+ startWorkerTask(cap, workerStart);
+ RELEASE_LOCK(&cap->lock);
+ }
+ }
#endif
RELEASE_LOCK(&sched_mutex);
{
interrupted = rtsTrue;
shutting_down_scheduler = rtsTrue;
-#if defined(RTS_SUPPORTS_THREADS)
- if (threadIsTask(osThreadId())) { taskStop(); }
- stopTaskManager();
-#endif
-}
-
-/* ----------------------------------------------------------------------------
- Managing the per-task allocation areas.
-
- Each capability comes with an allocation area. These are
- fixed-length block lists into which allocation can be done.
-
- ToDo: no support for two-space collection at the moment???
- ------------------------------------------------------------------------- */
-
-static SchedulerStatus
-waitThread_(StgMainThread* m, Capability *initialCapability)
-{
- SchedulerStatus stat;
-
- // Precondition: sched_mutex must be held.
- IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
-
-#if defined(GRAN)
- /* GranSim specific init */
- CurrentTSO = m->tso; // the TSO to run
- procStatus[MainProc] = Busy; // status of main PE
- CurrentProc = MainProc; // PE to run it on
- schedule(m,initialCapability);
-#else
- schedule(m,initialCapability);
- ASSERT(m->stat != NoStatus);
-#endif
- stat = m->stat;
+#if defined(THREADED_RTS)
+ {
+ Task *task;
+ nat i;
+
+ ACQUIRE_LOCK(&sched_mutex);
+ task = newBoundTask();
+ RELEASE_LOCK(&sched_mutex);
-#if defined(RTS_SUPPORTS_THREADS)
- // Free the condition variable, returning it to the cache if possible.
- if (!bound_cond_cache_full) {
- bound_cond_cache = m->bound_thread_cond;
- bound_cond_cache_full = 1;
- } else {
- closeCondition(&m->bound_thread_cond);
- }
+ for (i = 0; i < n_capabilities; i++) {
+ shutdownCapability(&capabilities[i], task);
+ }
+ boundTaskExiting(task);
+ stopTaskManager();
+ }
#endif
-
- IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
- stgFree(m);
-
- // Postcondition: sched_mutex still held
- return stat;
}
/* ---------------------------------------------------------------------------
void
GetRoots( evac_fn evac )
{
-#if defined(GRAN)
- {
nat i;
+ Capability *cap;
+ Task *task;
+
+#if defined(GRAN)
for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
- if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
- evac((StgClosure **)&run_queue_hds[i]);
- if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
- evac((StgClosure **)&run_queue_tls[i]);
-
- if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
- evac((StgClosure **)&blocked_queue_hds[i]);
- if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
- evac((StgClosure **)&blocked_queue_tls[i]);
- if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
- evac((StgClosure **)&ccalling_threads[i]);
+ if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
+ evac((StgClosure **)&run_queue_hds[i]);
+ if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
+ evac((StgClosure **)&run_queue_tls[i]);
+
+ if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
+ evac((StgClosure **)&blocked_queue_hds[i]);
+ if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
+ evac((StgClosure **)&blocked_queue_tls[i]);
+ if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
+ evac((StgClosure **)&ccalling_threads[i]);
}
- }
- markEventQueue();
+ markEventQueue();
#else /* !GRAN */
- if (run_queue_hd != END_TSO_QUEUE) {
- ASSERT(run_queue_tl != END_TSO_QUEUE);
- evac((StgClosure **)&run_queue_hd);
- evac((StgClosure **)&run_queue_tl);
- }
-
- if (blocked_queue_hd != END_TSO_QUEUE) {
- ASSERT(blocked_queue_tl != END_TSO_QUEUE);
- evac((StgClosure **)&blocked_queue_hd);
- evac((StgClosure **)&blocked_queue_tl);
- }
-
- if (sleeping_queue != END_TSO_QUEUE) {
- evac((StgClosure **)&sleeping_queue);
- }
-#endif
- if (blackhole_queue != END_TSO_QUEUE) {
- evac((StgClosure **)&blackhole_queue);
- }
+ for (i = 0; i < n_capabilities; i++) {
+ cap = &capabilities[i];
+ evac((StgClosure **)&cap->run_queue_hd);
+ evac((StgClosure **)&cap->run_queue_tl);
+
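+	// TSOs suspended during safe foreign calls are on no run queue;
+	// they are reachable only via their Task, so mark them here.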
+ for (task = cap->suspended_ccalling_tasks; task != NULL;
+ task=task->next) {
+ evac((StgClosure **)&task->suspended_tso);
+ }
+ }
+
+#if !defined(THREADED_RTS)
+ evac((StgClosure **)(void *)&blocked_queue_hd);
+ evac((StgClosure **)(void *)&blocked_queue_tl);
+ evac((StgClosure **)(void *)&sleeping_queue);
+#endif
+#endif
- if (suspended_ccalling_threads != END_TSO_QUEUE) {
- evac((StgClosure **)&suspended_ccalling_threads);
- }
+ // evac((StgClosure **)&blackhole_queue);
-#if defined(PARALLEL_HASKELL) || defined(GRAN)
- markSparkQueue(evac);
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
+ markSparkQueue(evac);
#endif
-
+
#if defined(RTS_USER_SIGNALS)
- // mark the signal handlers (signals should be already blocked)
- markSignalHandlers(evac);
+ // mark the signal handlers (signals should be already blocked)
+ markSignalHandlers(evac);
#endif
}
static void (*extra_roots)(evac_fn);
+static void
+performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
+{
+ Task *task = myTask();
+
+ if (task == NULL) {
+ ACQUIRE_LOCK(&sched_mutex);
+ task = newBoundTask();
+ RELEASE_LOCK(&sched_mutex);
+ scheduleDoGC(NULL,task,force_major, get_roots);
+ boundTaskExiting(task);
+ } else {
+ scheduleDoGC(NULL,task,force_major, get_roots);
+ }
+}
+
void
performGC(void)
{
- /* Obligated to hold this lock upon entry */
- ACQUIRE_LOCK(&sched_mutex);
- GarbageCollect(GetRoots,rtsFalse);
- RELEASE_LOCK(&sched_mutex);
+ performGC_(rtsFalse, GetRoots);
}
void
performMajorGC(void)
{
- ACQUIRE_LOCK(&sched_mutex);
- GarbageCollect(GetRoots,rtsTrue);
- RELEASE_LOCK(&sched_mutex);
+ performGC_(rtsTrue, GetRoots);
}
static void
void
performGCWithRoots(void (*get_roots)(evac_fn))
{
- ACQUIRE_LOCK(&sched_mutex);
- extra_roots = get_roots;
- GarbageCollect(AllRoots,rtsFalse);
- RELEASE_LOCK(&sched_mutex);
+ extra_roots = get_roots;
+ performGC_(rtsFalse, AllRoots);
}
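+/* A minimal sketch of a client-supplied root marker (my_root is a
+ * hypothetical global closure the caller wants kept alive across GCs):
+ *
+ *     static StgClosure *my_root;
+ *
+ *     static void
+ *     markMyRoots (evac_fn evac)
+ *     {
+ *         if (my_root != NULL) evac(&my_root);
+ *     }
+ *
+ *     ...
+ *     performGCWithRoots(markMyRoots);
+ */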
/* -----------------------------------------------------------------------------
-------------------------------------------------------------------------- */
static StgTSO *
-threadStackOverflow(StgTSO *tso)
+threadStackOverflow(Capability *cap, StgTSO *tso)
{
nat new_stack_size, stack_words;
lnat new_tso_size;
tso->sp+64)));
/* Send this thread the StackOverflow exception */
- raiseAsync(tso, (StgClosure *)stackOverflow_closure);
+ raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
return tso;
}
new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
- IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
+ IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
dest = (StgTSO *)allocate(new_tso_size);
TICK_ALLOC_TSO(new_stack_size,0);
DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
0, 0 /* spark_queue_len(ADVISORY_POOL) */);
- if (EMPTY_RUN_QUEUE())
+ if (emptyRunQueue())
emitSchedule = rtsTrue;
switch (get_itbl(node)->type) {
break;
#endif
default:
- barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
+ barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
}
}
}
#endif
#if defined(GRAN)
-static StgBlockingQueueElement *
-unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
+StgBlockingQueueElement *
+unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
{
StgTSO *tso;
PEs node_loc, tso_loc;
tso->id, tso));
}
#elif defined(PARALLEL_HASKELL)
-static StgBlockingQueueElement *
-unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
+StgBlockingQueueElement *
+unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
{
StgBlockingQueueElement *next;
break;
default:
- barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
+ barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
(StgClosure *)bqe);
# endif
IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
return next;
}
+#endif
-#else /* !GRAN && !PARALLEL_HASKELL */
-static StgTSO *
-unblockOneLocked(StgTSO *tso)
+StgTSO *
+unblockOne(Capability *cap, StgTSO *tso)
{
StgTSO *next;
tso->why_blocked = NotBlocked;
next = tso->link;
tso->link = END_TSO_QUEUE;
- APPEND_TO_RUN_QUEUE(tso);
- threadRunnable();
+
+ // We might have just migrated this TSO to our Capability:
+ if (tso->bound) {
+ tso->bound->cap = cap;
+ }
+
+ appendToRunQueue(cap,tso);
+
+ // we're holding a newly woken thread, make sure we context switch
+ // quickly so we can migrate it if necessary.
+ context_switch = 1;
IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
return next;
}
-#endif
-#if defined(GRAN) || defined(PARALLEL_HASKELL)
-INLINE_ME StgBlockingQueueElement *
-unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
-{
- ACQUIRE_LOCK(&sched_mutex);
- bqe = unblockOneLocked(bqe, node);
- RELEASE_LOCK(&sched_mutex);
- return bqe;
-}
-#else
-INLINE_ME StgTSO *
-unblockOne(StgTSO *tso)
-{
- ACQUIRE_LOCK(&sched_mutex);
- tso = unblockOneLocked(tso);
- RELEASE_LOCK(&sched_mutex);
- return tso;
-}
-#endif
#if defined(GRAN)
void
//tso = (StgTSO *)bqe; // wastes an assignment to get the type right
//tso_loc = where_is(tso);
len++;
- bqe = unblockOneLocked(bqe, node);
+ bqe = unblockOne(bqe, node);
}
/* if this is the BQ of an RBH, we have to put back the info ripped out of
{
StgBlockingQueueElement *bqe;
- ACQUIRE_LOCK(&sched_mutex);
-
IF_PAR_DEBUG(verbose,
debugBelch("##-_ AwBQ for node %p on [%x]: \n",
node, mytid));
bqe = q;
while (get_itbl(bqe)->type==TSO ||
get_itbl(bqe)->type==BLOCKED_FETCH) {
- bqe = unblockOneLocked(bqe, node);
+ bqe = unblockOne(bqe, node);
}
- RELEASE_LOCK(&sched_mutex);
}
#else /* !GRAN && !PARALLEL_HASKELL */
void
-awakenBlockedQueueNoLock(StgTSO *tso)
+awakenBlockedQueue(Capability *cap, StgTSO *tso)
{
- while (tso != END_TSO_QUEUE) {
- tso = unblockOneLocked(tso);
- }
-}
-
-void
-awakenBlockedQueue(StgTSO *tso)
-{
- ACQUIRE_LOCK(&sched_mutex);
- while (tso != END_TSO_QUEUE) {
- tso = unblockOneLocked(tso);
- }
- RELEASE_LOCK(&sched_mutex);
+ if (tso == NULL) return; // hack; see bug #1235728, and comments in
+ // Exception.cmm
+ while (tso != END_TSO_QUEUE) {
+ tso = unblockOne(cap,tso);
+ }
}
#endif
{
interrupted = 1;
context_switch = 1;
+#if defined(THREADED_RTS)
+ prodAllCapabilities();
+#endif
}
/* -----------------------------------------------------------------------------
*/
static void
-unblockThread(StgTSO *tso)
+unblockThread(Capability *cap, StgTSO *tso)
{
StgBlockingQueueElement *t, **last;
blocked_queue_tl = (StgTSO *)prev;
}
}
+#if defined(mingw32_HOST_OS)
+ /* (Cooperatively) signal that the worker thread should abort
+ * the request.
+ */
+ abandonWorkRequest(tso->block_info.async_result->reqID);
+#endif
goto done;
}
}
tso->link = END_TSO_QUEUE;
tso->why_blocked = NotBlocked;
tso->block_info.closure = NULL;
- PUSH_ON_RUN_QUEUE(tso);
+ pushOnRunQueue(cap,tso);
}
#else
static void
-unblockThread(StgTSO *tso)
+unblockThread(Capability *cap, StgTSO *tso)
{
StgTSO *t, **last;
barf("unblockThread (Exception): TSO not found");
}
+#if !defined(THREADED_RTS)
case BlockedOnRead:
case BlockedOnWrite:
#if defined(mingw32_HOST_OS)
blocked_queue_tl = prev;
}
}
+#if defined(mingw32_HOST_OS)
+ /* (Cooperatively) signal that the worker thread should abort
+ * the request.
+ */
+ abandonWorkRequest(tso->block_info.async_result->reqID);
+#endif
goto done;
}
}
}
barf("unblockThread (delay): TSO not found");
}
+#endif
default:
barf("unblockThread");
tso->link = END_TSO_QUEUE;
tso->why_blocked = NotBlocked;
tso->block_info.closure = NULL;
- APPEND_TO_RUN_QUEUE(tso);
+ appendToRunQueue(cap,tso);
}
#endif
* -------------------------------------------------------------------------- */
static rtsBool
-checkBlackHoles( void )
+checkBlackHoles (Capability *cap)
{
StgTSO **prev, *t;
rtsBool any_woke_up = rtsFalse;
StgHalfWord type;
+ // blackhole_queue is global:
+ ASSERT_LOCK_HELD(&sched_mutex);
+
IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
// ASSUMES: sched_mutex
ASSERT(t->why_blocked == BlockedOnBlackHole);
type = get_itbl(t->block_info.closure)->type;
if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
- t = unblockOneLocked(t);
+ IF_DEBUG(sanity,checkTSO(t));
+ t = unblockOne(cap, t);
+ // urk, the threads migrate to the current capability
+ // here, but we'd like to keep them on the original one.
*prev = t;
any_woke_up = rtsTrue;
} else {
* CATCH_FRAME on the stack. In either case, we strip the entire
* stack and replace the thread with a zombie.
*
- * Locks: sched_mutex held upon entry nor exit.
+ * ToDo: in THREADED_RTS mode, this function is only safe if either
+ * (a) we hold all the Capabilities (eg. in GC, or if there is only
+ * one Capability), or (b) we own the Capability that the TSO is
+ * currently blocked on or on the run queue of.
*
* -------------------------------------------------------------------------- */
-void
-deleteThread(StgTSO *tso)
-{
- if (tso->why_blocked != BlockedOnCCall &&
- tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
- raiseAsync(tso,NULL);
- }
-}
-
-#ifdef FORKPROCESS_PRIMOP_SUPPORTED
-static void
-deleteThreadImmediately(StgTSO *tso)
-{ // for forkProcess only:
- // delete thread without giving it a chance to catch the KillThread exception
-
- if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
- return;
- }
-
- if (tso->why_blocked != BlockedOnCCall &&
- tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
- unblockThread(tso);
- }
-
- tso->what_next = ThreadKilled;
-}
-#endif
-
void
-raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
+raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
{
- /* When raising async exs from contexts where sched_mutex isn't held;
- use raiseAsyncWithLock(). */
- ACQUIRE_LOCK(&sched_mutex);
- raiseAsync(tso,exception);
- RELEASE_LOCK(&sched_mutex);
+ raiseAsync_(cap, tso, exception, rtsFalse, NULL);
}
void
-raiseAsync(StgTSO *tso, StgClosure *exception)
+suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
{
- raiseAsync_(tso, exception, rtsFalse);
+ raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
}
static void
-raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
+raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
+ rtsBool stop_at_atomically, StgPtr stop_here)
{
StgRetInfoTable *info;
- StgPtr sp;
+ StgPtr sp, frame;
+ nat i;
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
sched_belch("raising exception in thread %ld.", (long)tso->id));
// Remove it from any blocking queues
- unblockThread(tso);
+ unblockThread(cap,tso);
+
+ // mark it dirty; we're about to change its stack.
+ dirtyTSO(tso);
sp = tso->sp;
sp[0] = (W_)&stg_dummy_ret_closure;
}
- while (1) {
- nat i;
+ frame = sp + 1;
+ while (stop_here == NULL || frame < stop_here) {
// 1. Let the top of the stack be the "current closure"
//
// NB: if we pass an ATOMICALLY_FRAME then abort the associated
// transaction
-
- StgPtr frame;
-
- frame = sp + 1;
info = get_ret_itbl((StgClosure *)frame);
-
- while (info->i.type != UPDATE_FRAME
- && (info->i.type != CATCH_FRAME || exception == NULL)
- && info->i.type != STOP_FRAME
- && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
- {
- if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
- // IF we find an ATOMICALLY_FRAME then we abort the
- // current transaction and propagate the exception. In
- // this case (unlike ordinary exceptions) we do not care
- // whether the transaction is valid or not because its
- // possible validity cannot have caused the exception
- // and will not be visible after the abort.
- IF_DEBUG(stm,
- debugBelch("Found atomically block delivering async exception\n"));
- stmAbortTransaction(tso -> trec);
- tso -> trec = stmGetEnclosingTRec(tso -> trec);
- }
- frame += stack_frame_sizeW((StgClosure *)frame);
- info = get_ret_itbl((StgClosure *)frame);
- }
-
+
switch (info->i.type) {
-
- case ATOMICALLY_FRAME:
- ASSERT(stop_at_atomically);
- ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
- stmCondemnTransaction(tso -> trec);
-#ifdef REG_R1
- tso->sp = frame;
-#else
- // R1 is not a register: the return convention for IO in
- // this case puts the return value on the stack, so we
- // need to set up the stack to return to the atomically
- // frame properly...
- tso->sp = frame - 2;
- tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
- tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
-#endif
- tso->what_next = ThreadRunGHC;
- return;
- case CATCH_FRAME:
- // If we find a CATCH_FRAME, and we've got an exception to raise,
- // then build the THUNK raise(exception), and leave it on
- // top of the CATCH_FRAME ready to enter.
- //
- {
-#ifdef PROFILING
- StgCatchFrame *cf = (StgCatchFrame *)frame;
-#endif
- StgClosure *raise;
-
- // we've got an exception to raise, so let's pass it to the
- // handler in this frame.
- //
- raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
- TICK_ALLOC_SE_THK(1,0);
- SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
- raise->payload[0] = exception;
-
- // throw away the stack from Sp up to the CATCH_FRAME.
- //
- sp = frame - 1;
-
- /* Ensure that async excpetions are blocked now, so we don't get
- * a surprise exception before we get around to executing the
- * handler.
- */
- if (tso->blocked_exceptions == NULL) {
- tso->blocked_exceptions = END_TSO_QUEUE;
- }
-
- /* Put the newly-built THUNK on top of the stack, ready to execute
- * when the thread restarts.
- */
- sp[0] = (W_)raise;
- sp[-1] = (W_)&stg_enter_info;
- tso->sp = sp-1;
- tso->what_next = ThreadRunGHC;
- IF_DEBUG(sanity, checkTSO(tso));
- return;
- }
-
case UPDATE_FRAME:
{
StgAP_STACK * ap;
// fun field.
//
words = frame - sp - 1;
- ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
+ ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
ap->size = words;
ap->fun = (StgClosure *)sp[0];
printObj((StgClosure *)ap);
);
- // Replace the updatee with an indirection - happily
- // this will also wake up any threads currently
- // waiting on the result.
+ // Replace the updatee with an indirection
//
// Warning: if we're in a loop, more than one update frame on
// the stack may point to the same object. Be careful not to
}
sp += sizeofW(StgUpdateFrame) - 1;
sp[0] = (W_)ap; // push onto stack
- break;
+ frame = sp + 1;
+ continue; //no need to bump frame
}
-
+
case STOP_FRAME:
// We've stripped the entire stack, the thread is now dead.
- sp += sizeofW(StgStopFrame);
tso->what_next = ThreadKilled;
- tso->sp = sp;
+ tso->sp = frame + sizeofW(StgStopFrame);
return;
+
+ case CATCH_FRAME:
+ // If we find a CATCH_FRAME, and we've got an exception to raise,
+ // then build the THUNK raise(exception), and leave it on
+ // top of the CATCH_FRAME ready to enter.
+ //
+ {
+#ifdef PROFILING
+ StgCatchFrame *cf = (StgCatchFrame *)frame;
+#endif
+ StgThunk *raise;
+
+ if (exception == NULL) break;
+
+ // we've got an exception to raise, so let's pass it to the
+ // handler in this frame.
+ //
+ raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
+ TICK_ALLOC_SE_THK(1,0);
+ SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
+ raise->payload[0] = exception;
+
+ // throw away the stack from Sp up to the CATCH_FRAME.
+ //
+ sp = frame - 1;
+
+	    /* Ensure that async exceptions are blocked now, so we don't get
+ * a surprise exception before we get around to executing the
+ * handler.
+ */
+ if (tso->blocked_exceptions == NULL) {
+ tso->blocked_exceptions = END_TSO_QUEUE;
+ }
+
+ /* Put the newly-built THUNK on top of the stack, ready to execute
+ * when the thread restarts.
+ */
+ sp[0] = (W_)raise;
+ sp[-1] = (W_)&stg_enter_info;
+ tso->sp = sp-1;
+ tso->what_next = ThreadRunGHC;
+ IF_DEBUG(sanity, checkTSO(tso));
+ return;
+ }
+
+ case ATOMICALLY_FRAME:
+ if (stop_at_atomically) {
+ ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
+ stmCondemnTransaction(cap, tso -> trec);
+#ifdef REG_R1
+ tso->sp = frame;
+#else
+ // R1 is not a register: the return convention for IO in
+ // this case puts the return value on the stack, so we
+ // need to set up the stack to return to the atomically
+ // frame properly...
+ tso->sp = frame - 2;
+ tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
+ tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
+#endif
+ tso->what_next = ThreadRunGHC;
+ return;
+ }
+ // Not stop_at_atomically... fall through and abort the
+ // transaction.
+
+ case CATCH_RETRY_FRAME:
+ // IF we find an ATOMICALLY_FRAME then we abort the
+ // current transaction and propagate the exception. In
+ // this case (unlike ordinary exceptions) we do not care
+ // whether the transaction is valid or not because its
+ // possible validity cannot have caused the exception
+ // and will not be visible after the abort.
+ IF_DEBUG(stm,
+ debugBelch("Found atomically block delivering async exception\n"));
+ StgTRecHeader *trec = tso -> trec;
+ StgTRecHeader *outer = stmGetEnclosingTRec(trec);
+ stmAbortTransaction(cap, trec);
+ tso -> trec = outer;
+ break;
default:
- barf("raiseAsync");
+ break;
}
+
+ // move on to the next stack frame
+ frame += stack_frame_sizeW((StgClosure *)frame);
}
- barf("raiseAsync");
+
+ // if we got here, then we stopped at stop_here
+ ASSERT(stop_here != NULL);
}
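+/* To summarise the two entry points above:
+ *
+ *    raiseAsync(cap,tso,exn)  unwinds the whole stack, leaving the thunk
+ *        raise(exn) ready to enter on top of the innermost CATCH_FRAME
+ *        (or killing the thread if there is none);
+ *
+ *    suspendComputation(cap,tso,stop_here)  unwinds only as far as
+ *        stop_here, updating thunks along the way but raising nothing.
+ */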
/* -----------------------------------------------------------------------------
+ Deleting threads
+
+ This is used for interruption (^C) and forking, and corresponds to
+ raising an exception but without letting the thread catch the
+ exception.
+ -------------------------------------------------------------------------- */
+
+static void
+deleteThread (Capability *cap, StgTSO *tso)
+{
+ if (tso->why_blocked != BlockedOnCCall &&
+ tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
+ raiseAsync(cap,tso,NULL);
+ }
+}
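+
+/* A sketch of how the interruption path uses this (cf. deleteAllThreads,
+ * assumed elsewhere in this file): read global_link before deleting,
+ * since raiseAsync may leave the TSO a zombie:
+ *
+ *     StgTSO *t, *next;
+ *     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ *         next = t->global_link;
+ *         deleteThread(cap, t);
+ *     }
+ */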
+
+#ifdef FORKPROCESS_PRIMOP_SUPPORTED
+static void
+deleteThreadImmediately(Capability *cap, StgTSO *tso)
+{ // for forkProcess only:
+ // delete thread without giving it a chance to catch the KillThread exception
+
+ if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
+ return;
+ }
+
+ if (tso->why_blocked != BlockedOnCCall &&
+ tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
+ unblockThread(cap,tso);
+ }
+
+ tso->what_next = ThreadKilled;
+}
+#endif
+
+/* -----------------------------------------------------------------------------
raiseExceptionHelper
This function is called by the raise# primitve, just so that we can
-------------------------------------------------------------------------- */
StgWord
-raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
+raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
{
- StgClosure *raise_closure = NULL;
+ Capability *cap = regTableToCapability(reg);
+ StgThunk *raise_closure = NULL;
StgPtr p, next;
StgRetInfoTable *info;
//
    // thunks which are currently under evaluation.
//
- //
+ // OLD COMMENT (we don't have MIN_UPD_SIZE now):
// LDV profiling: stg_raise_info has THUNK as its closure
// type. Since a THUNK takes at least MIN_UPD_SIZE words in its
    // payload, MIN_UPD_SIZE is more appropriate than 1.  It seems that
// Only create raise_closure if we need to.
if (raise_closure == NULL) {
raise_closure =
- (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
+ (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
SET_HDR(raise_closure, &stg_raise_info, CCCS);
raise_closure->payload[0] = exception;
}
- UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
+ UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
p = next;
continue;
on an MVar, or NonTermination if the thread was blocked on a Black
Hole.
- Locks: sched_mutex isn't held upon entry nor exit.
+ Locks: assumes we hold *all* the capabilities.
-------------------------------------------------------------------------- */
void
-resurrectThreads( StgTSO *threads )
+resurrectThreads (StgTSO *threads)
{
- StgTSO *tso, *next;
-
- for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
- next = tso->global_link;
- tso->global_link = all_threads;
- all_threads = tso;
- IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
-
- switch (tso->why_blocked) {
- case BlockedOnMVar:
- case BlockedOnException:
- /* Called by GC - sched_mutex lock is currently held. */
- raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
- break;
- case BlockedOnBlackHole:
- raiseAsync(tso,(StgClosure *)NonTermination_closure);
- break;
- case BlockedOnSTM:
- raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
- break;
- case NotBlocked:
- /* This might happen if the thread was blocked on a black hole
- * belonging to a thread that we've just woken up (raiseAsync
- * can wake up threads, remember...).
- */
- continue;
- default:
- barf("resurrectThreads: thread blocked in a strange way");
+ StgTSO *tso, *next;
+ Capability *cap;
+
+ for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
+ next = tso->global_link;
+ tso->global_link = all_threads;
+ all_threads = tso;
+ IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
+
+ // Wake up the thread on the Capability it was last on for a
+ // bound thread, or last_free_capability otherwise.
+ if (tso->bound) {
+ cap = tso->bound->cap;
+ } else {
+ cap = last_free_capability;
+ }
+
+ switch (tso->why_blocked) {
+ case BlockedOnMVar:
+ case BlockedOnException:
+	    /* Called by GC - we hold all the capabilities (see Locks above). */
+ raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
+ break;
+ case BlockedOnBlackHole:
+ raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
+ break;
+ case BlockedOnSTM:
+ raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
+ break;
+ case NotBlocked:
+ /* This might happen if the thread was blocked on a black hole
+ * belonging to a thread that we've just woken up (raiseAsync
+ * can wake up threads, remember...).
+ */
+ continue;
+ default:
+ barf("resurrectThreads: thread blocked in a strange way");
+ }
}
- }
}
/* ----------------------------------------------------------------------------
* at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
------------------------------------------------------------------------- */
+#if DEBUG
static void
printThreadBlockage(StgTSO *tso)
{
switch (tso->why_blocked) {
case BlockedOnRead:
- debugBelch("is blocked on read from fd %ld", tso->block_info.fd);
+ debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
break;
case BlockedOnWrite:
- debugBelch("is blocked on write to fd %ld", tso->block_info.fd);
+ debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
break;
#if defined(mingw32_HOST_OS)
case BlockedOnDoProc:
break;
#endif
case BlockedOnDelay:
- debugBelch("is blocked until %ld", tso->block_info.target);
+ debugBelch("is blocked until %ld", (long)(tso->block_info.target));
break;
case BlockedOnMVar:
- debugBelch("is blocked on an MVar");
+ debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
break;
case BlockedOnException:
debugBelch("is blocked on delivering an exception to thread %d",
}
}
-static void
-printThreadStatus(StgTSO *tso)
+void
+printThreadStatus(StgTSO *t)
{
- switch (tso->what_next) {
- case ThreadKilled:
- debugBelch("has been killed");
- break;
- case ThreadComplete:
- debugBelch("has completed");
- break;
- default:
- printThreadBlockage(tso);
- }
+ debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
+ {
+ void *label = lookupThreadLabel(t->id);
+ if (label) debugBelch("[\"%s\"] ",(char *)label);
+ }
+ if (t->what_next == ThreadRelocated) {
+ debugBelch("has been relocated...\n");
+ } else {
+ switch (t->what_next) {
+ case ThreadKilled:
+ debugBelch("has been killed");
+ break;
+ case ThreadComplete:
+ debugBelch("has completed");
+ break;
+ default:
+ printThreadBlockage(t);
+ }
+ debugBelch("\n");
+ }
}
void
printAllThreads(void)
{
- StgTSO *t;
+ StgTSO *t, *next;
+ nat i;
+ Capability *cap;
# if defined(GRAN)
char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
debugBelch("all threads:\n");
# endif
- for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
- debugBelch("\tthread %d @ %p ", t->id, (void *)t);
-#if defined(DEBUG)
- {
- void *label = lookupThreadLabel(t->id);
- if (label) debugBelch("[\"%s\"] ",(char *)label);
- }
-#endif
- printThreadStatus(t);
- debugBelch("\n");
+ for (i = 0; i < n_capabilities; i++) {
+ cap = &capabilities[i];
+ debugBelch("threads on capability %d:\n", cap->no);
+ for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
+ printThreadStatus(t);
+ }
+ }
+
+ debugBelch("other threads:\n");
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ if (t->why_blocked != NotBlocked) {
+ printThreadStatus(t);
+ }
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ }
}
}
-
-#ifdef DEBUG
+
+// useful from gdb
+void
+printThreadQueue(StgTSO *t)
+{
+ nat i = 0;
+ for (; t != END_TSO_QUEUE; t = t->link) {
+ printThreadStatus(t);
+ i++;
+ }
+ debugBelch("%d threads on queue\n", i);
+}
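+
+/* e.g. from gdb:
+ *     (gdb) call printThreadQueue(capabilities[0].run_queue_hd)
+ */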
/*
Print a whole blocking queue attached to node (debugging only).
static nat
run_queue_len(void)
{
- nat i;
- StgTSO *tso;
-
- for (i=0, tso=run_queue_hd;
- tso != END_TSO_QUEUE;
- i++, tso=tso->link)
- /* nothing */
-
- return i;
+ nat i;
+ StgTSO *tso;
+
+ for (i=0, tso=run_queue_hd;
+ tso != END_TSO_QUEUE;
+ i++, tso=tso->link) {
+ /* nothing */
+ }
+
+ return i;
}
#endif
void
sched_belch(char *s, ...)
{
- va_list ap;
- va_start(ap,s);
-#ifdef RTS_SUPPORTS_THREADS
- debugBelch("sched (task %p): ", osThreadId());
+ va_list ap;
+ va_start(ap,s);
+#ifdef THREADED_RTS
+ debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
#elif defined(PARALLEL_HASKELL)
- debugBelch("== ");
+ debugBelch("== ");
#else
- debugBelch("sched: ");
+ debugBelch("sched: ");
#endif
- vdebugBelch(s, ap);
- debugBelch("\n");
- va_end(ap);
+ vdebugBelch(s, ap);
+ debugBelch("\n");
+ va_end(ap);
}
#endif /* DEBUG */