/* if this flag is set as well, give up execution */
rtsBool interrupted = rtsFalse;
-/* If this flag is set, we are running Haskell code. Used to detect
- * uses of 'foreign import unsafe' that should be 'safe'.
- */
-static rtsBool in_haskell = rtsFalse;
-
/* Next thread ID to allocate.
* Locks required: thread_id_mutex
*/
StgTSO dummy_tso;
-# if defined(SMP)
-static Condition gc_pending_cond = INIT_COND_VAR;
-# endif
-
-static rtsBool ready_to_gc;
-
/*
* Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
* in an MT setting, needed to signal that a worker thread shouldn't hang around
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread,
Capability *cap, StgTSO *t );
-static void scheduleDoHeapProfile(void);
-static void scheduleDoGC(void);
+static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
+static void scheduleDoGC(Capability *cap);
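+// scheduleDoGC now takes the Capability we currently hold: in the
+// SMP case it needs to grab every other Capability before collecting.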
static void unblockThread(StgTSO *tso);
static rtsBool checkBlackHoles(void);
* ------------------------------------------------------------------------- */
#if defined(RTS_SUPPORTS_THREADS)
-static rtsBool startingWorkerThread = rtsFalse;
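+// now a counter rather than a flag, so that several workers can be
+// starting up at once (see the startTasks() call below, which
+// pre-sets it eagerly).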
+static nat startingWorkerThread = 0;
static void
taskStart(void)
{
ACQUIRE_LOCK(&sched_mutex);
- startingWorkerThread = rtsFalse;
+ startingWorkerThread--;
schedule(NULL,NULL);
taskStop();
RELEASE_LOCK(&sched_mutex);
{
if ( !EMPTY_RUN_QUEUE()
&& !shutting_down_scheduler // not if we're shutting down
- && !startingWorkerThread )
+ && startingWorkerThread==0)
{
// we don't want to start another worker thread
// just because the last one hasn't yet reached the
// "waiting for capability" state
- startingWorkerThread = rtsTrue;
+ startingWorkerThread++;
if (!maybeStartNewWorker(taskStart)) {
- startingWorkerThread = rtsFalse;
+ startingWorkerThread--;
}
}
}
# endif
#endif
nat prev_what_next;
+ rtsBool ready_to_gc;
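+ // ready_to_gc is now local to schedule(); the global flag (and
+ // the SMP gc_pending_cond handshake) has been removed.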
// Pre-condition: sched_mutex is held.
// We might have a capability, passed in as initialCapability.
IF_DEBUG(scheduler, printAllThreads());
-#if defined(SMP)
- //
- // Wait until GC has completed, if necessary.
- //
- if (ready_to_gc) {
- if (cap != NULL) {
- releaseCapability(cap);
- IF_DEBUG(scheduler,sched_belch("waiting for GC"));
- waitCondition( &gc_pending_cond, &sched_mutex );
- }
- }
-#endif
-
#if defined(RTS_SUPPORTS_THREADS)
// Yield the capability to higher-priority tasks if necessary.
//
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
// call).
- if (in_haskell) {
+ if (cap->r.rInHaskell) {
errorBelch("schedule: re-entered unsafely.\n"
" Perhaps a 'foreign import unsafe' should be 'safe'?");
stg_exit(1);
prev_what_next = t->what_next;
errno = t->saved_errno;
- in_haskell = rtsTrue;
+ cap->r.rInHaskell = rtsTrue;
switch (prev_what_next) {
blackholes_need_checking = rtsTrue;
}
- in_haskell = rtsFalse;
+ cap->r.rInHaskell = rtsFalse;
// The TSO might have moved, eg. if it re-entered the RTS and a GC
// happened. So find the new location:
schedulePostRunThread();
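+ // reset ready_to_gc each time round the loop: the handlers in
+ // the switch below may set it again.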
+ ready_to_gc = rtsFalse;
+
switch (ret) {
case HeapOverflow:
ready_to_gc = scheduleHandleHeapOverflow(cap,t);
barf("schedule: invalid thread return code %d", (int)ret);
}
- scheduleDoHeapProfile();
- scheduleDoGC();
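+ // a heap census with +RTS -i0 performs its own GC (see
+ // scheduleDoHeapProfile() below), in which case we skip ours.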
+ if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
+ if (ready_to_gc) { scheduleDoGC(cap); }
} /* end of while() */
IF_PAR_DEBUG(verbose,
static void
scheduleStartSignalHandlers(void)
{
-#if defined(RTS_USER_SIGNALS)
+#if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
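+ // (in the threaded RTS, pending signals are dealt with outside
+ // the scheduler loop)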
if (signals_pending()) {
RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
startSignalHandlers();
StgMainThread *m;
m = main_threads;
switch (m->tso->why_blocked) {
+ case BlockedOnSTM:
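+ // a main thread deadlocked on STM is dealt with in the same
+ // way as one deadlocked on an MVar or a black hole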
case BlockedOnBlackHole:
case BlockedOnException:
case BlockedOnMVar:
if (cap->r.rCurrentNursery->u.back != NULL) {
cap->r.rCurrentNursery->u.back->link = bd;
} else {
+#if !defined(SMP)
ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
- g0s0->blocks == cap->r.rNursery);
- cap->r.rNursery = g0s0->blocks = bd;
+ g0s0 == cap->r.rNursery);
+#endif
+ cap->r.rNursery->blocks = bd;
}
cap->r.rCurrentNursery->u.back = bd;
}
}
+#if !defined(SMP)
// don't forget to update the block count in g0s0.
g0s0->n_blocks += blocks;
+
// This assert can be a killer if the app is doing lots
// of large block allocations.
ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
+#endif
// now update the nursery to point to the new block
cap->r.rCurrentNursery = bd;
* Perform a heap census, if PROFILING
* -------------------------------------------------------------------------- */
-static void
-scheduleDoHeapProfile(void)
+static rtsBool
+scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
{
-#ifdef PROFILING
+#if defined(PROFILING)
// When we have +RTS -i0 and we're heap profiling, do a census at
// every GC. This lets us get repeatable runs for debugging.
if (performHeapProfile ||
GarbageCollect(GetRoots, rtsTrue);
heapCensus();
performHeapProfile = rtsFalse;
- ready_to_gc = rtsFalse; // we already GC'd
+ return rtsTrue; // true <=> we already GC'd
}
#endif
+ return rtsFalse;
}
/* -----------------------------------------------------------------------------
* -------------------------------------------------------------------------- */
static void
-scheduleDoGC(void)
+scheduleDoGC( Capability *cap STG_UNUSED )
{
StgTSO *t;
+#ifdef SMP
+ static rtsBool waiting_for_gc;
+ int n_capabilities = RtsFlags.ParFlags.nNodes - 1;
+ // subtract one because we're already holding one.
+ Capability *caps[n_capabilities+1];
+ // (+1: caps[n_capabilities] stashes the capability we already
+ // hold, below)
+#endif
#ifdef SMP
- // The last task to stop actually gets to do the GC. The rest
- // of the tasks release their capabilities and wait gc_pending_cond.
- if (ready_to_gc && allFreeCapabilities())
-#else
- if (ready_to_gc)
+ // In order to GC, there must be no threads running Haskell code.
+ // Therefore, the GC thread needs to hold *all* the capabilities,
+ // and release them after the GC has completed.
+ //
+ // This seems to be the simplest way: previous attempts involved
+ // making all the threads with capabilities give up their
+ // capabilities and sleep except for the *last* one, which
+ // actually did the GC. But it's quite hard to arrange for all
+ // the other tasks to sleep and stay asleep.
+ //
+
+ // Someone else is already trying to GC
+ if (waiting_for_gc) return;
+ waiting_for_gc = rtsTrue;
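+ // (without this flag, two threads each holding one capability
+ // could both try to grab all the rest, and deadlock)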
+
+ caps[n_capabilities] = cap;
+ while (n_capabilities > 0) {
+ IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
+ waitForReturnCapability(&sched_mutex, &cap);
+ n_capabilities--;
+ caps[n_capabilities] = cap;
+ }
+
+ waiting_for_gc = rtsFalse;
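+ // safe to clear now: we hold every capability, so any other
+ // thread attempting a GC will block in waitForReturnCapability()
+ // until we release them.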
#endif
- {
- /* Kick any transactions which are invalid back to their
- * atomically frames. When next scheduled they will try to
- * commit, this commit will fail and they will retry.
- */
- for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
- if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
- if (!stmValidateTransaction (t -> trec)) {
- IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
-
- // strip the stack back to the ATOMICALLY_FRAME, aborting
- // the (nested) transaction, and saving the stack of any
- // partially-evaluated thunks on the heap.
- raiseAsync_(t, NULL, rtsTrue);
-
+
+ /* Kick any transactions which are invalid back to their
+ * atomically frames. When next scheduled they will try to
+ * commit, this commit will fail and they will retry.
+ */
+ for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
+ if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+ if (!stmValidateTransaction (t -> trec)) {
+ IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
+
+ // strip the stack back to the ATOMICALLY_FRAME, aborting
+ // the (nested) transaction, and saving the stack of any
+ // partially-evaluated thunks on the heap.
+ raiseAsync_(t, NULL, rtsTrue);
+
#ifdef REG_R1
- ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+ ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
#endif
- }
}
}
-
- // so this happens periodically:
- scheduleCheckBlackHoles();
-
- /* everybody back, start the GC.
- * Could do it in this thread, or signal a condition var
- * to do it in another thread. Either way, we need to
- * broadcast on gc_pending_cond afterward.
- */
+ }
+
+ // so this happens periodically:
+ scheduleCheckBlackHoles();
+
+ /* everybody back, start the GC. In the SMP case we hold all
+ * the capabilities at this point, so no Haskell code can be
+ * running and it is safe to collect; afterwards we release the
+ * capabilities we stashed.
+ */
#if defined(RTS_SUPPORTS_THREADS)
- IF_DEBUG(scheduler,sched_belch("doing GC"));
+ IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
- GarbageCollect(GetRoots,rtsFalse);
- ready_to_gc = rtsFalse;
+ GarbageCollect(GetRoots,rtsFalse);
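+ // (rtsFalse: we don't force a major collection here)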
+
#if defined(SMP)
- broadcastCondition(&gc_pending_cond);
+ {
+ // release our stash of capabilities.
+ nat i;
+ for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) {
+ releaseCapability(caps[i]);
+ }
+ }
#endif
+
#if defined(GRAN)
- /* add a ContinueThread event to continue execution of current thread */
- new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
- ContinueThread,
- t, (StgClosure*)NULL, (rtsSpark*)NULL);
- IF_GRAN_DEBUG(bq,
- debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
- G_EVENTQ(0);
- G_CURR_THREADQ(0));
+ /* add a ContinueThread event to continue execution of current thread */
+ new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
+ ContinueThread,
+ t, (StgClosure*)NULL, (rtsSpark*)NULL);
+ IF_GRAN_DEBUG(bq,
+ debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
+ G_EVENTQ(0);
+ G_CURR_THREADQ(0));
#endif /* GRAN */
- }
}
/* ---------------------------------------------------------------------------
tok = cap->r.rCurrentTSO->id;
/* Hand back capability */
+ cap->r.rInHaskell = rtsFalse;
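+ // clear rInHaskell *before* releasing the Capability: the flag
+ // now lives in the Capability, which another worker may pick up.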
releaseCapability(cap);
#if defined(RTS_SUPPORTS_THREADS)
IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
#endif
- in_haskell = rtsFalse;
RELEASE_LOCK(&sched_mutex);
errno = saved_errno;
tso->why_blocked = NotBlocked;
cap->r.rCurrentTSO = tso;
- in_haskell = rtsTrue;
+ cap->r.rInHaskell = rtsTrue;
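+ // about to run Haskell code again on this Capability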
RELEASE_LOCK(&sched_mutex);
errno = saved_errno;
return &cap->r;
#if defined(SMP)
/* eagerly start some extra workers */
+ startingWorkerThread = RtsFlags.ParFlags.nNodes;
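+ // pre-set the counter: each worker decrements it in taskStart(),
+ // and no more workers are started until it drops back to zero
+ // (see the startingWorkerThread==0 test above).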
startTasks(RtsFlags.ParFlags.nNodes, taskStart);
#endif