static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread,
Capability *cap, StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
-static void scheduleDoGC(Capability *cap);
+static void scheduleDoGC(rtsBool force_major);
static void unblockThread(StgTSO *tso);
static rtsBool checkBlackHoles(void);
static void printThreadBlockage(StgTSO *tso);
static void printThreadStatus(StgTSO *tso);
+void printThreadQueue(StgTSO *tso);
#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
// Yield the capability to higher-priority tasks if necessary.
//
if (cap != NULL) {
- yieldCapability(&cap);
+ yieldCapability(&cap,
+ mainThread ? &mainThread->bound_thread_cond : NULL );
}
// If we do not currently hold a capability, we wait for one
// We now have a capability...
#endif
-
+
#if 0 /* extra sanity checking */
{
StgMainThread *m;
sched_belch("### thread %d bound to another OS thread", t->id));
// no, bound to a different Haskell thread: pass to that thread
PUSH_ON_RUN_QUEUE(t);
- passCapability(&m->bound_thread_cond);
continue;
}
}
else
{
if(mainThread != NULL)
- // The thread we want to run is bound.
+ // The thread we want to run is unbound.
{
IF_DEBUG(scheduler,
sched_belch("### this OS thread cannot run thread %d", t->id));
// no, the current native thread is bound to a different
// Haskell thread, so pass it to any worker thread
PUSH_ON_RUN_QUEUE(t);
- passCapabilityToWorker();
continue;
}
}
cap = myCapability();
#endif
- // We have run some Haskell code: there might be blackhole-blocked
- // threads to wake up now.
- if ( blackhole_queue != END_TSO_QUEUE ) {
- blackholes_need_checking = rtsTrue;
- }
-
cap->r.rInHaskell = rtsFalse;
// The TSO might have moved, eg. if it re-entered the RTS and a GC
#endif
ACQUIRE_LOCK(&sched_mutex);
+
+ // We have run some Haskell code: there might be blackhole-blocked
+ // threads to wake up now.
+ if ( blackhole_queue != END_TSO_QUEUE ) {
+ blackholes_need_checking = rtsTrue;
+ }
#if defined(RTS_SUPPORTS_THREADS)
IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
}
if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
- if (ready_to_gc) { scheduleDoGC(cap); }
+ if (ready_to_gc) { scheduleDoGC(rtsFalse); }
} /* end of while() */
IF_PAR_DEBUG(verbose,
#if defined(RTS_SUPPORTS_THREADS)
// We shouldn't be here...
barf("schedule: awaitEvent() in threaded RTS");
-#endif
+#else
awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
+#endif
}
}
* ------------------------------------------------------------------------- */
static void
-scheduleDetectDeadlock(void)
+scheduleDetectDeadlock()
{
#if defined(PARALLEL_HASKELL)
// exception. Any threads thus released will be immediately
// runnable.
- GarbageCollect(GetRoots,rtsTrue);
+ scheduleDoGC( rtsTrue/*force major GC*/ );
recent_activity = ACTIVITY_DONE_GC;
if ( !EMPTY_RUN_QUEUE() ) return;
cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
// if the nursery has only one block.
+ ACQUIRE_SM_LOCK
bd = allocGroup( blocks );
+ RELEASE_SM_LOCK
cap->r.rNursery->n_blocks += blocks;
// link the new group into the list
* -------------------------------------------------------------------------- */
static void
-scheduleDoGC( Capability *cap STG_UNUSED )
+scheduleDoGC( rtsBool force_major )
{
StgTSO *t;
#ifdef SMP
+ Capability *cap;
static rtsBool waiting_for_gc;
int n_capabilities = RtsFlags.ParFlags.nNodes - 1;
// subtract one because we're already holding one.
// actually did the GC. But it's quite hard to arrange for all
// the other tasks to sleep and stay asleep.
//
+ // This does mean that there will be multiple entries in the
+ // thread->capability hash table for the current thread, but
+ // they will be removed as normal when the capabilities are
+ // released again.
+ //
// Someone else is already trying to GC
if (waiting_for_gc) return;
waiting_for_gc = rtsTrue;
- caps[n_capabilities] = cap;
while (n_capabilities > 0) {
IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
waitForReturnCapability(&sched_mutex, &cap);
* atomically frames. When next scheduled they will try to
* commit, this commit will fail and they will retry.
*/
- for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
- if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
- if (!stmValidateTransaction (t -> trec)) {
- IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
-
- // strip the stack back to the ATOMICALLY_FRAME, aborting
- // the (nested) transaction, and saving the stack of any
- // partially-evaluated thunks on the heap.
- raiseAsync_(t, NULL, rtsTrue);
-
+ {
+ StgTSO *next;
+
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+ if (!stmValidateNestOfTransactions (t -> trec)) {
+ IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
+
+ // strip the stack back to the ATOMICALLY_FRAME, aborting
+ // the (nested) transaction, and saving the stack of any
+ // partially-evaluated thunks on the heap.
+ raiseAsync_(t, NULL, rtsTrue);
+
#ifdef REG_R1
- ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+ ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
#endif
+ }
+ }
}
}
}
#if defined(RTS_SUPPORTS_THREADS)
IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
- GarbageCollect(GetRoots,rtsFalse);
+ GarbageCollect(GetRoots, force_major);
#if defined(SMP)
{
createThread(nat size)
#endif
{
-
StgTSO *tso;
nat stack_size;
* on this thread's stack before the scheduler is invoked.
* ------------------------------------------------------------------------ */
-static void
-scheduleThread_(StgTSO *tso)
+void
+scheduleThreadLocked(StgTSO *tso)
{
// The thread goes at the *end* of the run-queue, to avoid possible
// starvation of any threads already on the queue.
scheduleThread(StgTSO* tso)
{
ACQUIRE_LOCK(&sched_mutex);
- scheduleThread_(tso);
+ scheduleThreadLocked(tso);
RELEASE_LOCK(&sched_mutex);
}
{
interrupted = rtsTrue;
shutting_down_scheduler = rtsTrue;
+
#if defined(RTS_SUPPORTS_THREADS)
if (threadIsTask(osThreadId())) { taskStop(); }
stopTaskManager();
+ //
+ // What can we do here? There are a bunch of worker threads, it
+ // might be nice to let them exit cleanly. There may be some main
+ // threads in the run queue; we should let them return to their
+ // callers with an Interrupted state. We can't in general wait
+ // for all the running Tasks to stop, because some might be off in
+ // a C call that is blocked.
+ //
+ // Letting the run queue drain is the safest thing. That lets any
+ // main threads return that can return, and cleans up all the
+ // runnable threads. Then we grab all the Capabilities to stop
+ // anything unexpected happening while we shut down.
+ //
+ // ToDo: this doesn't let us get the time stats from the worker
+ // tasks, because they haven't called taskStop().
+ //
+ ACQUIRE_LOCK(&sched_mutex);
+ {
+ nat i;
+ for (i = 1000; i > 0; i--) {
+ if (EMPTY_RUN_QUEUE()) {
+ IF_DEBUG(scheduler, sched_belch("run queue is empty"));
+ break;
+ }
+ IF_DEBUG(scheduler, sched_belch("yielding"));
+ RELEASE_LOCK(&sched_mutex);
+ prodWorker();
+ yieldThread();
+ ACQUIRE_LOCK(&sched_mutex);
+ }
+ }
+
+#ifdef SMP
+ {
+ Capability *cap;
+ int n_capabilities = RtsFlags.ParFlags.nNodes;
+ Capability *caps[n_capabilities];
+ nat i;
+
+ while (n_capabilities > 0) {
+ IF_DEBUG(scheduler, sched_belch("exitScheduler: grabbing all the capabilies (%d left)", n_capabilities));
+ waitForReturnCapability(&sched_mutex, &cap);
+ n_capabilities--;
+ caps[n_capabilities] = cap;
+ }
+ }
+#else
+ {
+ Capability *cap;
+ waitForReturnCapability(&sched_mutex, &cap);
+ }
+#endif
#endif
}
void
awakenBlockedQueueNoLock(StgTSO *tso)
{
+ if (tso == NULL) return; // hack; see bug #1235728, and comments in
+ // Exception.cmm
while (tso != END_TSO_QUEUE) {
tso = unblockOneLocked(tso);
}
void
awakenBlockedQueue(StgTSO *tso)
{
+ if (tso == NULL) return; // hack; see bug #1235728, and comments in
+ // Exception.cmm
ACQUIRE_LOCK(&sched_mutex);
while (tso != END_TSO_QUEUE) {
tso = unblockOneLocked(tso);
ASSERT(t->why_blocked == BlockedOnBlackHole);
type = get_itbl(t->block_info.closure)->type;
if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
+ IF_DEBUG(sanity,checkTSO(t));
t = unblockOneLocked(t);
*prev = t;
any_woke_up = rtsTrue;
// we've got an exception to raise, so let's pass it to the
// handler in this frame.
//
- raise = (StgThunk *)allocate(sizeofW(StgThunk)+1);
+ raise = (StgThunk *)allocate(sizeofW(StgThunk)+MIN_UPD_SIZE);
TICK_ALLOC_SE_THK(1,0);
SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
raise->payload[0] = exception;
{
switch (tso->why_blocked) {
case BlockedOnRead:
- debugBelch("is blocked on read from fd %ld", tso->block_info.fd);
+ debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
break;
case BlockedOnWrite:
- debugBelch("is blocked on write to fd %ld", tso->block_info.fd);
+ debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
break;
#if defined(mingw32_HOST_OS)
case BlockedOnDoProc:
break;
#endif
case BlockedOnDelay:
- debugBelch("is blocked until %ld", tso->block_info.target);
+ debugBelch("is blocked until %ld", (long)(tso->block_info.target));
break;
case BlockedOnMVar:
- debugBelch("is blocked on an MVar");
+ debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
break;
case BlockedOnException:
debugBelch("is blocked on delivering an exception to thread %d",
# endif
for (t = all_threads; t != END_TSO_QUEUE; ) {
- debugBelch("\tthread %d @ %p ", t->id, (void *)t);
+ debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
#if defined(DEBUG)
{
void *label = lookupThreadLabel(t->id);
}
}
}
-
+
#ifdef DEBUG
+// useful from gdb
+void
+printThreadQueue(StgTSO *t)
+{
+ nat i = 0;
+ for (; t != END_TSO_QUEUE; t = t->link) {
+ debugBelch("\tthread %d @ %p ", t->id, (void *)t);
+ if (t->what_next == ThreadRelocated) {
+ debugBelch("has been relocated...\n");
+ } else {
+ printThreadStatus(t);
+ debugBelch("\n");
+ }
+ i++;
+ }
+ debugBelch("%d threads on queue\n", i);
+}
+
/*
Print a whole blocking queue attached to node (debugging only).
*/