# define STATIC_INLINE static
#endif
-#ifdef THREADED_RTS
-#define USED_WHEN_THREADED_RTS
-#define USED_WHEN_NON_THREADED_RTS STG_UNUSED
-#else
-#define USED_WHEN_THREADED_RTS STG_UNUSED
-#define USED_WHEN_NON_THREADED_RTS
-#endif
-
-#ifdef SMP
-#define USED_WHEN_SMP
-#else
-#define USED_WHEN_SMP STG_UNUSED
-#endif
-
/* -----------------------------------------------------------------------------
* Global variables
* -------------------------------------------------------------------------- */
/*
* This mutex protects most of the global scheduler data in
- * the THREADED_RTS and (inc. SMP) runtime.
+ * the THREADED_RTS runtime.
*/
#if defined(THREADED_RTS)
Mutex sched_mutex;
// scheduler clearer.
//
static void schedulePreLoop (void);
-#if defined(SMP)
+#if defined(THREADED_RTS)
static void schedulePushWork(Capability *cap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
-static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
+static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major,
+ void (*get_roots)(evac_fn));
static void unblockThread(Capability *cap, StgTSO *tso);
static rtsBool checkBlackHoles(Capability *cap);
}
#endif
-#ifdef SMP
+#if defined(THREADED_RTS)
schedulePushWork(cap,task);
#endif
//
if (interrupted) {
deleteRunQueue(cap);
-#if defined(SMP)
+#if defined(THREADED_RTS)
discardSparksCap(cap);
#endif
if (shutting_down_scheduler) {
}
}
-#if defined(SMP)
+#if defined(THREADED_RTS)
// If the run queue is empty, take a spark and turn it into a thread.
{
if (emptyRunQueue(cap)) {
}
}
}
-#endif // SMP
+#endif // THREADED_RTS
scheduleStartSignalHandlers(cap);
scheduleCheckBlockedThreads(cap);
scheduleDetectDeadlock(cap,task);
+#if defined(THREADED_RTS)
+ cap = task->cap; // reload cap, it might have changed
+#endif
// Normally, the only way we can get here with no threads to
// run is if a keyboard interrupt received during
errno = t->saved_errno;
cap->in_haskell = rtsTrue;
+ dirtyTSO(t);
+
recent_activity = ACTIVITY_YES;
switch (prev_what_next) {
}
if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
- if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
+ if (ready_to_gc) {
+ scheduleDoGC(cap,task,rtsFalse,GetRoots);
+#if defined(THREADED_RTS)
+ cap = task->cap; // reload cap, it might have changed
+#endif
+ }
} /* end of while() */
IF_PAR_DEBUG(verbose,
* Push work to other Capabilities if we have some.
* -------------------------------------------------------------------------- */
-#ifdef SMP
+#if defined(THREADED_RTS)
static void
-schedulePushWork(Capability *cap USED_WHEN_SMP,
- Task *task USED_WHEN_SMP)
+schedulePushWork(Capability *cap USED_IF_THREADS,
+ Task *task USED_IF_THREADS)
{
Capability *free_caps[n_capabilities], *cap0;
nat i, n_free_caps;
* ------------------------------------------------------------------------- */
static void
-scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
+scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
//
{
#if defined(PARALLEL_HASKELL)
- // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
+ // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
return;
#endif
// they are unreachable and will therefore be sent an
// exception. Any threads thus released will be immediately
// runnable.
- scheduleDoGC( cap, task, rtsTrue/*force major GC*/ );
+ scheduleDoGC( cap, task, rtsTrue/*force major GC*/, GetRoots );
+#if defined(THREADED_RTS)
+ cap = task->cap; // reload cap, it might have changed
+#endif
+
recent_activity = ACTIVITY_DONE_GC;
if ( !emptyRunQueue(cap) ) return;
if (cap->r.rCurrentNursery->u.back != NULL) {
cap->r.rCurrentNursery->u.back->link = bd;
} else {
-#if !defined(SMP)
+#if !defined(THREADED_RTS)
ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
g0s0 == cap->r.rNursery);
#endif
// has tidied up its stack and placed itself on whatever queue
// it needs to be on.
-#if !defined(SMP)
+#if !defined(THREADED_RTS)
ASSERT(t->why_blocked != NotBlocked);
- // This might not be true under SMP: we don't have
+ // This might not be true under THREADED_RTS: we don't have
// exclusive access to this TSO, so someone might have
// woken it up by now. This actually happens: try
// conc023 +RTS -N2.
if (performHeapProfile ||
(RtsFlags.ProfFlags.profileInterval==0 &&
RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
+
+ // checking black holes is necessary before GC, otherwise
+ // there may be threads that are unreachable except by the
+ // blackhole queue, which the GC will consider to be
+ // deadlocked.
+ scheduleCheckBlackHoles(&MainCapability);
+
+ IF_DEBUG(scheduler, sched_belch("garbage collecting before heap census"));
GarbageCollect(GetRoots, rtsTrue);
+
+ IF_DEBUG(scheduler, sched_belch("performing heap census"));
heapCensus();
+
performHeapProfile = rtsFalse;
return rtsTrue; // true <=> we already GC'd
}
* -------------------------------------------------------------------------- */
static void
-scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
+scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
+ rtsBool force_major, void (*get_roots)(evac_fn))
{
StgTSO *t;
-#ifdef SMP
+#ifdef THREADED_RTS
static volatile StgWord waiting_for_gc;
rtsBool was_waiting;
nat i;
#endif
-#ifdef SMP
+#ifdef THREADED_RTS
// In order to GC, there must be no threads running Haskell code.
// Therefore, the GC thread needs to hold *all* the capabilities,
// and release them after the GC has completed.
if (was_waiting) {
do {
IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
- yieldCapability(&cap,task);
+ if (cap) yieldCapability(&cap,task);
} while (waiting_for_gc);
- return;
+ return; // NOTE: task->cap might have changed here
}
for (i=0; i < n_capabilities; i++) {
// ATOMICALLY_FRAME, aborting the (nested)
// transaction, and saving the stack of any
// partially-evaluated thunks on the heap.
- raiseAsync_(cap, t, NULL, rtsTrue, NULL);
+ raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
#ifdef REG_R1
ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
}
// so this happens periodically:
- scheduleCheckBlackHoles(cap);
+ if (cap) scheduleCheckBlackHoles(cap);
IF_DEBUG(scheduler, printAllThreads());
#if defined(THREADED_RTS)
IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
- GarbageCollect(GetRoots, force_major);
+ GarbageCollect(get_roots, force_major);
-#if defined(SMP)
+#if defined(THREADED_RTS)
// release our stash of capabilities.
for (i = 0; i < n_capabilities; i++) {
if (cap != &capabilities[i]) {
releaseCapability(&capabilities[i]);
}
}
- task->cap = cap;
+ if (cap) {
+ task->cap = cap;
+ } else {
+ task->cap = NULL;
+ }
#endif
#if defined(GRAN)
* ------------------------------------------------------------------------- */
StgBool
-isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
+isThreadBound(StgTSO* tso USED_IF_THREADS)
{
#if defined(THREADED_RTS)
return (tso->bound != NULL);
* Singleton fork(). Do not copy any running threads.
* ------------------------------------------------------------------------- */
-#if !defined(mingw32_HOST_OS) && !defined(SMP)
+#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif
StgTSO* t,*next;
Capability *cap;
+#if defined(THREADED_RTS)
+ if (RtsFlags.ParFlags.nNodes > 1) {
+ errorBelch("forking not supported with +RTS -N<n> greater than 1");
+ stg_exit(EXIT_FAILURE);
+ }
+#endif
+
IF_DEBUG(scheduler,sched_belch("forking!"));
// ToDo: for SMP, we should probably acquire *all* the capabilities
cap->in_haskell = rtsTrue;
errno = saved_errno;
+ /* We might have GC'd, mark the TSO dirty again */
+ dirtyTSO(tso);
+
return &cap->r;
}
tso->why_blocked = NotBlocked;
tso->blocked_exceptions = NULL;
+ tso->flags = TSO_DIRTY;
tso->saved_errno = 0;
tso->bound = NULL;
/* A capability holds the state a native thread needs in
* order to execute STG code. At least one capability is
- * floating around (only SMP builds have more than one).
+ * floating around (only THREADED_RTS builds have more than one).
*/
initCapabilities();
initTaskManager();
-#if defined(SMP) || defined(PARALLEL_HASKELL)
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
initSparkPools();
#endif
-#if defined(SMP)
+#if defined(THREADED_RTS)
/*
* Eagerly start one worker to run each Capability, except for
* Capability 0. The idea is that we're probably going to start a
}
#if !defined(THREADED_RTS)
- evac((StgClosure **)&blocked_queue_hd);
- evac((StgClosure **)&blocked_queue_tl);
- evac((StgClosure **)&sleeping_queue);
+ evac((StgClosure **)(void *)&blocked_queue_hd);
+ evac((StgClosure **)(void *)&blocked_queue_tl);
+ evac((StgClosure **)(void *)&sleeping_queue);
#endif
#endif
- evac((StgClosure **)&blackhole_queue);
+    // blackhole_queue is deliberately no longer marked as a GC root here;
+    // presumably its threads are now reached via scheduleCheckBlackHoles()
+    // before GC (see the heap-census path) — confirm before re-enabling.
+    // evac((StgClosure **)&blackhole_queue);
-#if defined(SMP) || defined(PARALLEL_HASKELL) || defined(GRAN)
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
markSparkQueue(evac);
#endif
static void (*extra_roots)(evac_fn);
+static void
+performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
+{
+ Task *task = myTask();
+
+ if (task == NULL) {
+ ACQUIRE_LOCK(&sched_mutex);
+ task = newBoundTask();
+ RELEASE_LOCK(&sched_mutex);
+ scheduleDoGC(NULL,task,force_major, get_roots);
+ boundTaskExiting(task);
+ } else {
+ scheduleDoGC(NULL,task,force_major, get_roots);
+ }
+}
+
void
performGC(void)
{
-#ifdef THREADED_RTS
- // ToDo: we have to grab all the capabilities here.
- errorBelch("performGC not supported in threaded RTS (yet)");
- stg_exit(EXIT_FAILURE);
-#endif
- /* Obligated to hold this lock upon entry */
- GarbageCollect(GetRoots,rtsFalse);
+ performGC_(rtsFalse, GetRoots);
}
void
performMajorGC(void)
{
-#ifdef THREADED_RTS
- errorBelch("performMayjorGC not supported in threaded RTS (yet)");
- stg_exit(EXIT_FAILURE);
-#endif
- GarbageCollect(GetRoots,rtsTrue);
+ performGC_(rtsTrue, GetRoots);
}
static void
void
performGCWithRoots(void (*get_roots)(evac_fn))
{
-#ifdef THREADED_RTS
- errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
- stg_exit(EXIT_FAILURE);
-#endif
extra_roots = get_roots;
- GarbageCollect(AllRoots,rtsFalse);
+ performGC_(rtsFalse, AllRoots);
}
/* -----------------------------------------------------------------------------
* CATCH_FRAME on the stack. In either case, we strip the entire
* stack and replace the thread with a zombie.
*
- * ToDo: in SMP mode, this function is only safe if either (a) we hold
- * all the Capabilities (eg. in GC), or (b) we own the Capability that
- * the TSO is currently blocked on or on the run queue of.
+ * ToDo: in THREADED_RTS mode, this function is only safe if either
+ * (a) we hold all the Capabilities (eg. in GC, or if there is only
+ * one Capability), or (b) we own the Capability that the TSO is
+ * currently blocked on or on the run queue of.
*
* -------------------------------------------------------------------------- */
// Remove it from any blocking queues
unblockThread(cap,tso);
+ // mark it dirty; we're about to change its stack.
+ dirtyTSO(tso);
+
sp = tso->sp;
// The stack freezing code assumes there's a closure pointer on
// we've got an exception to raise, so let's pass it to the
// handler in this frame.
//
- raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
+ raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
TICK_ALLOC_SE_THK(1,0);
SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
raise->payload[0] = exception;
// thunks which are currently under evaluataion.
//
- //
+ // OLD COMMENT (we don't have MIN_UPD_SIZE now):
// LDV profiling: stg_raise_info has THUNK as its closure
// type. Since a THUNK takes at least MIN_UPD_SIZE words in its
// payload, MIN_UPD_SIZE is more approprate than 1. It seems that
// Only create raise_closure if we need to.
if (raise_closure == NULL) {
raise_closure =
- (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
+ (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
SET_HDR(raise_closure, &stg_raise_info, CCCS);
raise_closure->payload[0] = exception;
}