StgTSO *run_queue_tl = NULL;
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
+StgTSO *blackhole_queue = NULL;
StgTSO *sleeping_queue = NULL; /* perhaps replace with a hash table? */
#endif
+/* The blackhole_queue should be checked for threads to wake up.  The
+ * flag below is set after running Haskell code, since an update may
+ * have unblocked some of the queued threads.  See Schedule.h for a
+ * more thorough comment.
+ */
+rtsBool blackholes_need_checking = rtsFalse;
+
/* Linked list of all threads.
* Used for detecting garbage-collected threads.
*/
static void scheduleHandleInterrupt(void);
static void scheduleStartSignalHandlers(void);
static void scheduleCheckBlockedThreads(void);
+static void scheduleCheckBlackHoles(void);
static void scheduleDetectDeadlock(void);
#if defined(GRAN)
static StgTSO *scheduleProcessEvent(rtsEvent *event);
static void scheduleDoGC(void);
static void unblockThread(StgTSO *tso);
+static rtsBool checkBlackHoles(void);
static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
Capability *initialCapability
);
scheduleStartSignalHandlers();
+ // Only check the black holes here if we've nothing else to do.
+ // During normal execution, the black hole list only gets checked
+ // at GC time, to avoid repeatedly traversing this possibly long
+ // list each time around the scheduler.
+ if (EMPTY_RUN_QUEUE()) { scheduleCheckBlackHoles(); }
+
scheduleCheckBlockedThreads();
scheduleDetectDeadlock();
startHeapProfTimer();
#endif
- /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
- /* Run the current thread
- */
+ // ----------------------------------------------------------------------
+ // Run the current thread
+
prev_what_next = t->what_next;
errno = t->saved_errno;
barf("schedule: invalid what_next field");
}
+ // We have run some Haskell code: there might be blackhole-blocked
+ // threads to wake up now.
+ if ( blackhole_queue != END_TSO_QUEUE ) {
+ blackholes_need_checking = rtsTrue;
+ }
+
in_haskell = rtsFalse;
// The TSO might have moved, e.g. if it re-entered the RTS and a GC happened.
// And save the current errno in this thread.
t->saved_errno = errno;
- /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
+ // ----------------------------------------------------------------------
/* Costs for the scheduler are assigned to CCS_SYSTEM */
#if defined(PROFILING)
// We shouldn't be here...
barf("schedule: awaitEvent() in threaded RTS");
#endif
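+    // Don't go to sleep in awaitEvent() if there might be
+    // blackhole-blocked threads to wake: we need another pass
+    // through the scheduler so that scheduleCheckBlackHoles() runs.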
- awaitEvent( EMPTY_RUN_QUEUE() );
+ awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
+ }
+}
+
+
+/* ----------------------------------------------------------------------------
+ * Check for threads blocked on BLACKHOLEs that can be woken up
+ * ASSUMES: sched_mutex
+ * ------------------------------------------------------------------------- */
+static void
+scheduleCheckBlackHoles( void )
+{
+ if ( blackholes_need_checking )
+ {
+ checkBlackHoles();
+ blackholes_need_checking = rtsFalse;
}
}
{
/*
* Detect deadlock: when we have no threads to run, there are no
- * threads waiting on I/O or sleeping, and all the other tasks are
- * waiting for work, we must have a deadlock of some description.
- *
- * We first try to find threads blocked on themselves (ie. black
- * holes), and generate NonTermination exceptions where necessary.
- *
- * If no threads are black holed, we have a deadlock situation, so
- * inform all the main threads.
+ * threads blocked, waiting for I/O, or sleeping, and all the
+ * other tasks are waiting for work, then we must have a deadlock
+ * of some description.
*/
-#if !defined(PARALLEL_HASKELL) && !defined(RTS_SUPPORTS_THREADS)
if ( EMPTY_THREAD_QUEUES() )
{
+#if !defined(PARALLEL_HASKELL) && !defined(RTS_SUPPORTS_THREADS)
IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
// Garbage collection can release some new threads due to
barf("deadlock: main thread blocked in a strange way");
}
}
- }
#elif defined(RTS_SUPPORTS_THREADS)
// ToDo: add deadlock detection in threaded RTS
#elif defined(PARALLEL_HASKELL)
// ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
#endif
+ }
}
/* ----------------------------------------------------------------------------
if (cap->r.rHpAlloc > BLOCK_SIZE) {
// if so, get one and push it on the front of the nursery.
bdescr *bd;
- nat blocks;
+ lnat blocks;
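+	// lnat, not nat: BLOCK_ROUND_UP yields a word-sized value,
+	// which nat (an unsigned int) could truncate on 64-bit platforms.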
- blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
+ blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
IF_DEBUG(scheduler,
debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n",
}
}
+    // Check the black hole queue before every GC, so that this
+    // happens periodically:
+ scheduleCheckBlackHoles();
+
/* everybody back, start the GC.
* Could do it in this thread, or signal a condition var
* to do it in another thread. Either way, we need to
// being GC'd, and we don't want the "main thread has been GC'd" panic.
ASSERT(blocked_queue_hd == END_TSO_QUEUE);
+ ASSERT(blackhole_queue == END_TSO_QUEUE);
ASSERT(sleeping_queue == END_TSO_QUEUE);
}
blocked_queue_hds[i] = END_TSO_QUEUE;
blocked_queue_tls[i] = END_TSO_QUEUE;
ccalling_threadss[i] = END_TSO_QUEUE;
+    blackhole_queue       = END_TSO_QUEUE;  /* single global queue, not per-PE */
sleeping_queue = END_TSO_QUEUE;
}
#else
run_queue_tl = END_TSO_QUEUE;
blocked_queue_hd = END_TSO_QUEUE;
blocked_queue_tl = END_TSO_QUEUE;
+ blackhole_queue = END_TSO_QUEUE;
sleeping_queue = END_TSO_QUEUE;
#endif
}
#endif
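+    // The global blackhole queue is a GC root: threads blocked on
+    // black holes must be kept alive, and the queue pointer updated
+    // if they move.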
+ if (blackhole_queue != END_TSO_QUEUE) {
+ evac((StgClosure **)&blackhole_queue);
+ }
+
if (suspended_ccalling_threads != END_TSO_QUEUE) {
evac((StgClosure **)&suspended_ccalling_threads);
}
static StgTSO *
threadStackOverflow(StgTSO *tso)
{
- nat new_stack_size, new_tso_size, stack_words;
+ nat new_stack_size, stack_words;
+ lnat new_tso_size;
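+    // new_tso_size is word-sized (lnat), again so that the result of
+    // BLOCK_ROUND_UP is not truncated.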
StgPtr new_sp;
StgTSO *dest;
* Finally round up so the TSO ends up as a whole number of blocks.
*/
new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
- new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
+ new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
TSO_STRUCT_SIZE)/sizeof(W_);
new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
}
case BlockedOnBlackHole:
- ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
{
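+      // Threads blocked on a BLACKHOLE now live on the global
+      // blackhole_queue, rather than on a blocking queue attached
+      // to the closure, so unlink the thread from there.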
- StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
-
- last = &bq->blocking_queue;
- for (t = bq->blocking_queue; t != END_TSO_QUEUE;
+ last = &blackhole_queue;
+ for (t = blackhole_queue; t != END_TSO_QUEUE;
last = &t->link, t = t->link) {
if (t == tso) {
*last = tso->link;
#endif
/* -----------------------------------------------------------------------------
+ * checkBlackHoles()
+ *
+ * Check the blackhole_queue for threads that can be woken up. We do
+ * this periodically: before every GC, and whenever the run queue is
+ * empty.
+ *
+ * An elegant solution might be to just wake up all the blocked
+ * threads with awakenBlockedQueue occasionally: they'll go back to
+ * sleep again if the object is still a BLACKHOLE. Unfortunately this
+ * doesn't give us a way to tell whether we've actually managed to
+ * wake up any threads, so we would be busy-waiting.
+ *
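+ * Returns rtsTrue if any threads were woken up.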
+ * -------------------------------------------------------------------------- */
+
+static rtsBool
+checkBlackHoles( void )
+{
+ StgTSO **prev, *t;
+ rtsBool any_woke_up = rtsFalse;
+ StgHalfWord type;
+
+ IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
+
+ // ASSUMES: sched_mutex
+ prev = &blackhole_queue;
+ t = blackhole_queue;
+ while (t != END_TSO_QUEUE) {
+ ASSERT(t->why_blocked == BlockedOnBlackHole);
+ type = get_itbl(t->block_info.closure)->type;
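+	// The closure may have been updated since the thread blocked:
+	// if it is no longer a BLACKHOLE or CAF_BLACKHOLE, wake the thread.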
+ if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
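+	    // unblockOneLocked() puts t on the run queue and returns the
+	    // next thread in the queue; point *prev at it to unlink t.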
+ t = unblockOneLocked(t);
+ *prev = t;
+ any_woke_up = rtsTrue;
+ } else {
+ prev = &t->link;
+ t = t->link;
+ }
+ }
+
+ return any_woke_up;
+}
+
+/* -----------------------------------------------------------------------------
* raiseAsync()
*
* The following function implements the magic for raising an
{
switch (tso->why_blocked) {
case BlockedOnRead:
- debugBelch("is blocked on read from fd %d", tso->block_info.fd);
+ debugBelch("is blocked on read from fd %ld", tso->block_info.fd);
break;
case BlockedOnWrite:
- debugBelch("is blocked on write to fd %d", tso->block_info.fd);
+ debugBelch("is blocked on write to fd %ld", tso->block_info.fd);
break;
#if defined(mingw32_HOST_OS)
case BlockedOnDoProc:
- debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
+ debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
break;
#endif
case BlockedOnDelay:
- debugBelch("is blocked until %d", tso->block_info.target);
+ debugBelch("is blocked until %ld", tso->block_info.target);
break;
case BlockedOnMVar:
debugBelch("is blocked on an MVar");
} /* for */
debugBelch("\n");
}
-#else
-/*
- Nice and easy: only TSOs on the blocking queue
-*/
-void
-print_bq (StgClosure *node)
-{
- StgTSO *tso;
-
- ASSERT(node!=(StgClosure*)NULL); // sanity check
- for (tso = ((StgBlockingQueue*)node)->blocking_queue;
- tso != END_TSO_QUEUE;
- tso=tso->link) {
- ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
- ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
- debugBelch(" TSO %d (%p),", tso->id, tso);
- }
- debugBelch("\n");
-}
# endif
#if defined(PARALLEL_HASKELL)