static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread,
Capability *cap, StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
-static void scheduleDoGC(Capability *cap);
+static void scheduleDoGC(rtsBool force_major);
static void unblockThread(StgTSO *tso);
static rtsBool checkBlackHoles(void);
static void printThreadBlockage(StgTSO *tso);
static void printThreadStatus(StgTSO *tso);
+void printThreadQueue(StgTSO *tso);
#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
// Yield the capability to higher-priority tasks if necessary.
//
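+ // For a bound thread we also pass its condition variable, so the
+ // capability can later be handed back to this particular thread
+ // (cf. the passCapability calls removed below).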
if (cap != NULL) {
- yieldCapability(&cap);
+ yieldCapability(&cap,
+ mainThread ? &mainThread->bound_thread_cond : NULL );
}
// If we do not currently hold a capability, we wait for one
// We now have a capability...
#endif
-
+
#if 0 /* extra sanity checking */
{
StgMainThread *m;
sched_belch("### thread %d bound to another OS thread", t->id));
// no, bound to a different Haskell thread: pass to that thread
PUSH_ON_RUN_QUEUE(t);
- passCapability(&m->bound_thread_cond);
continue;
}
}
else
{
if(mainThread != NULL)
- // The thread we want to run is bound.
+ // The thread we want to run is unbound.
{
IF_DEBUG(scheduler,
sched_belch("### this OS thread cannot run thread %d", t->id));
// no, the current native thread is bound to a different
// Haskell thread, so pass it to any worker thread
PUSH_ON_RUN_QUEUE(t);
- passCapabilityToWorker();
continue;
}
}
}
if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
- if (ready_to_gc) { scheduleDoGC(cap); }
+ if (ready_to_gc) { scheduleDoGC(rtsFalse); }
} /* end of while() */
IF_PAR_DEBUG(verbose,
#if defined(RTS_SUPPORTS_THREADS)
// We shouldn't be here...
barf("schedule: awaitEvent() in threaded RTS");
-#endif
+#else
awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
+#endif
}
}
* ------------------------------------------------------------------------- */
static void
scheduleDetectDeadlock(void)
{
#if defined(PARALLEL_HASKELL)
// exception. Any threads thus released will be immediately
// runnable.
- GarbageCollect(GetRoots,rtsTrue);
+ scheduleDoGC( rtsTrue/*force major GC*/ );
recent_activity = ACTIVITY_DONE_GC;
if ( !EMPTY_RUN_QUEUE() ) return;
* -------------------------------------------------------------------------- */
static void
-scheduleDoGC( Capability *cap STG_UNUSED )
+scheduleDoGC( rtsBool force_major )
{
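+    // force_major: rtsTrue forces a full major collection (as the
+    // deadlock detector requests); rtsFalse lets the GC decide.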
StgTSO *t;
#ifdef SMP
+ Capability *cap;
static rtsBool waiting_for_gc;
int n_capabilities = RtsFlags.ParFlags.nNodes - 1;
// subtract one because we're already holding one.
// actually did the GC. But it's quite hard to arrange for all
// the other tasks to sleep and stay asleep.
//
+ // This does mean that there will be multiple entries in the
+ // thread->capability hash table for the current thread, but
+ // they will be removed as normal when the capabilities are
+ // released again.
+ //
// Someone else is already trying to GC
if (waiting_for_gc) return;
waiting_for_gc = rtsTrue;
- caps[n_capabilities] = cap;
while (n_capabilities > 0) {
IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
waitForReturnCapability(&sched_mutex, &cap);
* atomically frames. When next scheduled they will try to
* commit, this commit will fail and they will retry.
*/
- for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
- if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
- if (!stmValidateTransaction (t -> trec)) {
- IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
-
- // strip the stack back to the ATOMICALLY_FRAME, aborting
- // the (nested) transaction, and saving the stack of any
- // partially-evaluated thunks on the heap.
- raiseAsync_(t, NULL, rtsTrue);
-
+ {
+ StgTSO *next;
+
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+ if (!stmValidateNestOfTransactions (t -> trec)) {
+ IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
+
+ // strip the stack back to the ATOMICALLY_FRAME, aborting
+ // the (nested) transaction, and saving the stack of any
+ // partially-evaluated thunks on the heap.
+ raiseAsync_(t, NULL, rtsTrue);
+
#ifdef REG_R1
- ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+ ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
#endif
+ }
+ }
}
}
}
#if defined(RTS_SUPPORTS_THREADS)
IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
- GarbageCollect(GetRoots,rtsFalse);
+ GarbageCollect(GetRoots, force_major);
#if defined(SMP)
{
void
awakenBlockedQueueNoLock(StgTSO *tso)
{
+ if (tso == NULL) return; // hack; see bug #1235728, and comments in
+ // Exception.cmm
while (tso != END_TSO_QUEUE) {
tso = unblockOneLocked(tso);
}
void
awakenBlockedQueue(StgTSO *tso)
{
+ if (tso == NULL) return; // hack; see bug #1235728, and comments in
+ // Exception.cmm
ACQUIRE_LOCK(&sched_mutex);
while (tso != END_TSO_QUEUE) {
tso = unblockOneLocked(tso);
debugBelch("is blocked until %ld", (long)(tso->block_info.target));
break;
case BlockedOnMVar:
- debugBelch("is blocked on an MVar");
+ debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
break;
case BlockedOnException:
debugBelch("is blocked on delivering an exception to thread %d",
# endif
for (t = all_threads; t != END_TSO_QUEUE; ) {
- debugBelch("\tthread %d @ %p ", t->id, (void *)t);
+ debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
#if defined(DEBUG)
{
void *label = lookupThreadLabel(t->id);
}
}
}
-
+
#ifdef DEBUG
+// useful from gdb
+void
+printThreadQueue(StgTSO *t)
+{
+ nat i = 0;
+ for (; t != END_TSO_QUEUE; t = t->link) {
+ debugBelch("\tthread %d @ %p ", t->id, (void *)t);
+ if (t->what_next == ThreadRelocated) {
+ debugBelch("has been relocated...\n");
+ } else {
+ printThreadStatus(t);
+ debugBelch("\n");
+ }
+ i++;
+ }
+ debugBelch("%d threads on queue\n", i);
+}
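+// e.g. from gdb: call printThreadQueue(run_queue_hd)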
+
/*
Print a whole blocking queue attached to node (debugging only).
*/