static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread,
Capability *cap, StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
-static void scheduleDoGC(Capability *cap);
+static void scheduleDoGC(rtsBool force_major);
static void unblockThread(StgTSO *tso);
static rtsBool checkBlackHoles(void);
static void printThreadBlockage(StgTSO *tso);
static void printThreadStatus(StgTSO *tso);
+void printThreadQueue(StgTSO *tso);
#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
CurrentTSO = event->tso;
#endif
- IF_DEBUG(scheduler, printAllThreads());
-
#if defined(RTS_SUPPORTS_THREADS)
// Yield the capability to higher-priority tasks if necessary.
//
if (cap != NULL) {
- yieldCapability(&cap);
+ yieldCapability(&cap,
+ mainThread ? &mainThread->bound_thread_cond : NULL );
}
// If we do not currently hold a capability, we wait for one
// We now have a capability...
#endif
+#if 0 /* extra sanity checking */
+ {
+ StgMainThread *m;
+ for (m = main_threads; m != NULL; m = m->link) {
+ ASSERT(get_itbl(m->tso)->type == TSO);
+ }
+ }
+#endif
+
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
// call).
sched_belch("### thread %d bound to another OS thread", t->id));
// no, bound to a different Haskell thread: pass to that thread
PUSH_ON_RUN_QUEUE(t);
- passCapability(&m->bound_thread_cond);
continue;
}
}
else
{
if(mainThread != NULL)
- // The thread we want to run is bound.
+ // The thread we want to run is unbound.
{
IF_DEBUG(scheduler,
sched_belch("### this OS thread cannot run thread %d", t->id));
// no, the current native thread is bound to a different
// Haskell thread, so pass it to any worker thread
PUSH_ON_RUN_QUEUE(t);
- passCapabilityToWorker();
continue;
}
}
barf("schedule: invalid what_next field");
}
+#if defined(SMP)
+ // in SMP mode, we might return with a different capability than
+ // we started with, if the Haskell thread made a foreign call. So
+ // let's find out what our current Capability is:
+ cap = myCapability();
+#endif
+
// We have run some Haskell code: there might be blackhole-blocked
// threads to wake up now.
if ( blackhole_queue != END_TSO_QUEUE ) {
case ThreadBlocked:
scheduleHandleThreadBlocked(t);
- threadPaused(t);
break;
case ThreadFinished:
}
if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
- if (ready_to_gc) { scheduleDoGC(cap); }
+ if (ready_to_gc) { scheduleDoGC(rtsFalse); }
} /* end of while() */
IF_PAR_DEBUG(verbose,
#if defined(RTS_SUPPORTS_THREADS)
// We shouldn't be here...
barf("schedule: awaitEvent() in threaded RTS");
-#endif
+#else
awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
+#endif
}
}
* ------------------------------------------------------------------------- */
static void
-scheduleDetectDeadlock(void)
+scheduleDetectDeadlock()
{
#if defined(PARALLEL_HASKELL)
// they are unreachable and will therefore be sent an
// exception. Any threads thus released will be immediately
// runnable.
- GarbageCollect(GetRoots,rtsTrue);
+
+ scheduleDoGC( rtsTrue/*force major GC*/ );
recent_activity = ACTIVITY_DONE_GC;
if ( !EMPTY_RUN_QUEUE() ) return;
debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
(long)t->id, whatNext_strs[t->what_next], blocks));
- // don't do this if it would push us over the
- // alloc_blocks_lim limit; we'll GC first.
- if (alloc_blocks + blocks < alloc_blocks_lim) {
+ // don't do this if the nursery is (nearly) full, we'll GC first.
+ if (cap->r.rCurrentNursery->link != NULL ||
+ cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
+ // if the nursery has only one block.
- alloc_blocks += blocks;
bd = allocGroup( blocks );
+ cap->r.rNursery->n_blocks += blocks;
// link the new group into the list
bd->link = cap->r.rCurrentNursery;
#if !defined(SMP)
ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
g0s0 == cap->r.rNursery);
- g0s0->blocks = bd;
#endif
cap->r.rNursery->blocks = bd;
}
{
bdescr *x;
for (x = bd; x < bd + blocks; x++) {
- x->step = g0s0;
+ x->step = cap->r.rNursery;
x->gen_no = 0;
x->flags = 0;
}
}
-#if !defined(SMP)
- // don't forget to update the block count in g0s0.
- g0s0->n_blocks += blocks;
-
// This assert can be a killer if the app is doing lots
// of large block allocations.
- ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
-#endif
+ IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
// now update the nursery to point to the new block
cap->r.rCurrentNursery = bd;
}
}
- /* make all the running tasks block on a condition variable,
- * maybe set context_switch and wait till they all pile in,
- * then have them wait on a GC condition variable.
- */
IF_DEBUG(scheduler,
debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
(long)t->id, whatNext_strs[t->what_next]));
- threadPaused(t);
#if defined(GRAN)
ASSERT(!is_on_queue(t,CurrentProc));
#elif defined(PARALLEL_HASKELL)
/* just adjust the stack for this thread, then pop it back
* on the run queue.
*/
- threadPaused(t);
{
/* enlarge the stack */
StgTSO *new_t = threadStackOverflow(t);
return rtsTrue;
}
- threadPaused(t);
-
#if defined(GRAN)
ASSERT(!is_on_queue(t,CurrentProc));
emitSchedule = rtsTrue;
#else /* !GRAN */
- /* don't need to do anything. Either the thread is blocked on
- * I/O, in which case we'll have called addToBlockedQueue
- * previously, or it's blocked on an MVar or Blackhole, in which
- * case it'll be on the relevant queue already.
- */
+
+ // We don't need to do anything. The thread is blocked, and it
+ // has tidied up its stack and placed itself on whatever queue
+ // it needs to be on.
+
+#if !defined(SMP)
ASSERT(t->why_blocked != NotBlocked);
+ // This might not be true under SMP: we don't have
+ // exclusive access to this TSO, so someone might have
+ // woken it up by now. This actually happens: try
+ // conc023 +RTS -N2.
+#endif
+
IF_DEBUG(scheduler,
debugBelch("--<< thread %d (%s) stopped: ",
t->id, whatNext_strs[t->what_next]);
removeThreadLabel((StgWord)mainThread->tso->id);
#endif
if (mainThread->prev == NULL) {
+ ASSERT(mainThread == main_threads);
main_threads = mainThread->link;
} else {
mainThread->prev->link = mainThread->link;
* -------------------------------------------------------------------------- */
static void
-scheduleDoGC( Capability *cap STG_UNUSED )
+scheduleDoGC( rtsBool force_major )
{
StgTSO *t;
#ifdef SMP
+ Capability *cap;
static rtsBool waiting_for_gc;
int n_capabilities = RtsFlags.ParFlags.nNodes - 1;
// subtract one because we're already holding one.
if (waiting_for_gc) return;
waiting_for_gc = rtsTrue;
- caps[n_capabilities] = cap;
while (n_capabilities > 0) {
IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
waitForReturnCapability(&sched_mutex, &cap);
*/
for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
- if (!stmValidateTransaction (t -> trec)) {
+ if (!stmValidateNestOfTransactions (t -> trec)) {
IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
// strip the stack back to the ATOMICALLY_FRAME, aborting
// so this happens periodically:
scheduleCheckBlackHoles();
+ IF_DEBUG(scheduler, printAllThreads());
+
/* everybody back, start the GC.
* Could do it in this thread, or signal a condition var
* to do it in another thread. Either way, we need to
#if defined(RTS_SUPPORTS_THREADS)
IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
- GarbageCollect(GetRoots,rtsFalse);
+ GarbageCollect(GetRoots, force_major);
#if defined(SMP)
{
StgTSO* t, *next;
IF_DEBUG(scheduler,sched_belch("deleting all threads"));
for (t = all_threads; t != END_TSO_QUEUE; t = next) {
- next = t->global_link;
- deleteThread(t);
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ deleteThread(t);
+ }
}
// The run queue now contains a bunch of ThreadKilled threads. We
#endif
#if defined(GRAN)
-static StgBlockingQueueElement *
+StgBlockingQueueElement *
unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
{
StgTSO *tso;
tso->id, tso));
}
#elif defined(PARALLEL_HASKELL)
-static StgBlockingQueueElement *
+StgBlockingQueueElement *
unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
{
StgBlockingQueueElement *next;
}
#else /* !GRAN && !PARALLEL_HASKELL */
-static StgTSO *
+StgTSO *
unblockOneLocked(StgTSO *tso)
{
StgTSO *next;
{
switch (tso->why_blocked) {
case BlockedOnRead:
- debugBelch("is blocked on read from fd %ld", tso->block_info.fd);
+ debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
break;
case BlockedOnWrite:
- debugBelch("is blocked on write to fd %ld", tso->block_info.fd);
+ debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
break;
#if defined(mingw32_HOST_OS)
case BlockedOnDoProc:
break;
#endif
case BlockedOnDelay:
- debugBelch("is blocked until %ld", tso->block_info.target);
+ debugBelch("is blocked until %ld", (long)(tso->block_info.target));
break;
case BlockedOnMVar:
- debugBelch("is blocked on an MVar");
+ debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
break;
case BlockedOnException:
debugBelch("is blocked on delivering an exception to thread %d",
debugBelch("all threads:\n");
# endif
- for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
- debugBelch("\tthread %d @ %p ", t->id, (void *)t);
+ for (t = all_threads; t != END_TSO_QUEUE; ) {
+ debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
#if defined(DEBUG)
{
void *label = lookupThreadLabel(t->id);
if (label) debugBelch("[\"%s\"] ",(char *)label);
}
#endif
- printThreadStatus(t);
- debugBelch("\n");
+ if (t->what_next == ThreadRelocated) {
+ debugBelch("has been relocated...\n");
+ t = t->link;
+ } else {
+ printThreadStatus(t);
+ debugBelch("\n");
+ t = t->global_link;
+ }
}
}
-
+
#ifdef DEBUG
+// useful from gdb
+// Debug-only helper (guarded by the surrounding #ifdef DEBUG): walk a
+// queue of TSOs via the t->link chain and print one status line per
+// thread, then the total count.  Takes the queue head; stops at
+// END_TSO_QUEUE.
+void
+printThreadQueue(StgTSO *t)
+{
+    nat i = 0;
+    for (; t != END_TSO_QUEUE; t = t->link) {
+        debugBelch("\tthread %d @ %p ", t->id, (void *)t);
+        if (t->what_next == ThreadRelocated) {
+            // Relocated TSOs have no meaningful status; note it and keep
+            // walking the link chain.
+            debugBelch("has been relocated...\n");
+        } else {
+            printThreadStatus(t);
+            debugBelch("\n");
+        }
+        i++;
+    }
+    debugBelch("%d threads on queue\n", i);
+}
+
/*
Print a whole blocking queue attached to node (debugging only).
*/