* exclusive access to the RTS and all its data structures (that are not
* locked by the Scheduler's mutex).
*
- * thread_ready_cond is signalled whenever noCapabilities doesn't hold.
- *
+ * thread_ready_cond is signalled whenever
+ * !noCapabilities() && ANY_WORK_TO_DO().
*/
Condition thread_ready_cond = INIT_COND_VAR;
#define UNUSED_IF_NOT_SMP STG_UNUSED
#endif
+#if defined(RTS_USER_SIGNALS)
+#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted || signals_pending())
+#else
+#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted)
+#endif
+
/* ----------------------------------------------------------------------------
Initialisation
------------------------------------------------------------------------- */
rts_n_free_capabilities = 1;
#endif
// Signal that a capability is available
- if (rts_n_waiting_tasks > 0) {
+ if (rts_n_waiting_tasks > 0 && ANY_WORK_TO_DO()) {
signalCondition(&thread_ready_cond);
}
startSchedulerTaskIfNecessary();
if ( noCapabilities() || passingCapability ) {
rts_n_waiting_workers++;
- wakeBlockedWorkerThread();
context_switch = 1; // make sure it's our turn soon
waitCondition(&returning_worker_cond, pMutex);
#if defined(SMP)
// Pre-condition: pMutex is assumed held, the current thread
// holds the capability pointed to by pCap.
- if ( rts_n_waiting_workers > 0 || passingCapability ) {
- IF_DEBUG(scheduler, sched_belch("worker: giving up capability"));
+ if ( rts_n_waiting_workers > 0 || passingCapability || !ANY_WORK_TO_DO()) {
+ IF_DEBUG(scheduler,
+ if (rts_n_waiting_workers > 0) {
+ sched_belch("worker: giving up capability (returning wkr)");
+ } else if (passingCapability) {
+ sched_belch("worker: giving up capability (passing capability)");
+ } else {
+ sched_belch("worker: giving up capability (no threads to run)");
+ }
+ );
releaseCapability(*pCap);
*pCap = NULL;
}
* passed to this thread using passCapability.
* ------------------------------------------------------------------------- */
-void
+void
waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond )
{
// Pre-condition: pMutex is held.
- while ( noCapabilities() ||
- (passingCapability && passTarget != pThreadCond)) {
+ while ( noCapabilities() ||
+ (passingCapability && passTarget != pThreadCond) ||
+ !ANY_WORK_TO_DO()) {
IF_DEBUG(scheduler,
sched_belch("worker: wait for capability (cond: %p)",
pThreadCond));
#endif /* RTS_SUPPORTS_THREADS */
+/* ----------------------------------------------------------------------------
+ threadRunnable()
+
+ Signals that a thread has been placed on the run queue, so a worker
+ might need to be woken up to run it.
+
+ ToDo: should check whether the thread at the front of the queue is
+ bound, and if so wake up the appropriate worker.
+ -------------------------------------------------------------------------- */
+
+void
+threadRunnable ( void )
+{
+#if defined(RTS_SUPPORTS_THREADS)
+  // Only wake a worker when there is both a free capability and real
+  // work (runnable thread, interrupt, or pending signal), and somebody
+  // is actually waiting; otherwise the signal would be wasted.
+  // NB: noCapabilities must be *called* — testing the bare identifier
+  // (as the unpatched line did) never evaluates the predicate.
+  if ( !noCapabilities() && ANY_WORK_TO_DO() && rts_n_waiting_tasks > 0 ) {
+    signalCondition(&thread_ready_cond);
+  }
+  startSchedulerTaskIfNecessary();
+#endif
+}
+
/* ------------------------------------------------------------------------- */
#if defined(SMP)
//
extern void releaseCapability( Capability* cap );
+// Signal that a thread has become runnable
+//
+extern void threadRunnable ( void );
+
#ifdef RTS_SUPPORTS_THREADS
// Gives up the current capability IFF there is a higher-priority
// thread waiting for it. This happens in one of two ways:
waitReadzh_fast
{
/* args: R1 */
+#ifdef THREADED_RTS
+ foreign "C" barf("waitRead# on threaded RTS");
+#endif
+
ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
StgTSO_block_info(CurrentTSO) = R1;
waitWritezh_fast
{
/* args: R1 */
+#ifdef THREADED_RTS
+ foreign "C" barf("waitWrite# on threaded RTS");
+#endif
+
ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
StgTSO_block_info(CurrentTSO) = R1;
W_ t, prev, target;
#endif
+#ifdef THREADED_RTS
+ foreign "C" barf("delay# on threaded RTS");
+#endif
+
/* args: R1 (microsecond delay amount) */
ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
StgTSO_why_blocked(CurrentTSO) = BlockedOnDelay::I16;
W_ ares;
CInt reqID;
+#ifdef THREADED_RTS
+ foreign "C" barf("asyncRead# on threaded RTS");
+#endif
+
/* args: R1 = fd, R2 = isSock, R3 = len, R4 = buf */
ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
W_ ares;
CInt reqID;
+#ifdef THREADED_RTS
+ foreign "C" barf("asyncWrite# on threaded RTS");
+#endif
+
/* args: R1 = fd, R2 = isSock, R3 = len, R4 = buf */
ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
// run queue is empty, and there are no other tasks running, we
// can wait indefinitely for something to happen.
//
- if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
+ if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
+ {
#if defined(RTS_SUPPORTS_THREADS)
- || EMPTY_RUN_QUEUE()
+ // We shouldn't be here...
+ barf("schedule: awaitEvent() in threaded RTS");
#endif
- )
- {
- awaitEvent( EMPTY_RUN_QUEUE() );
+ awaitEvent( EMPTY_RUN_QUEUE() );
}
// we can be interrupted while waiting for I/O...
if (interrupted) continue;
if ( EMPTY_THREAD_QUEUES() )
{
IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
+
// Garbage collection can release some new threads due to
// either (a) finalizers or (b) threads resurrected because
- // they are about to be send BlockedOnDeadMVar. Any threads
- // thus released will be immediately runnable.
+ // they are unreachable and will therefore be sent an
+ // exception. Any threads thus released will be immediately
+ // runnable.
GarbageCollect(GetRoots,rtsTrue);
-
- if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
-
- IF_DEBUG(scheduler,
- sched_belch("still deadlocked, checking for black holes..."));
- detectBlackHoles();
-
if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
#if defined(RTS_USER_SIGNALS)
stgFree(m);
}
-# ifdef RTS_SUPPORTS_THREADS
- resetTaskManagerAfterFork(); // tell startTask() and friends that
- startingWorkerThread = rtsFalse; // we have no worker threads any more
- resetWorkerWakeupPipeAfterFork();
-# endif
-
rc = rts_evalStableIO(entry, NULL); // run the action
rts_checkSchedStatus("forkProcess",rc);
IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
#endif
- /* Other threads _might_ be available for execution; signal this */
- THREAD_RUNNABLE();
RELEASE_LOCK(&sched_mutex);
errno = saved_errno;
void
scheduleThread_(StgTSO *tso)
{
- // Precondition: sched_mutex must be held.
// The thread goes at the *end* of the run-queue, to avoid possible
// starvation of any threads already on the queue.
APPEND_TO_RUN_QUEUE(tso);
- THREAD_RUNNABLE();
+ threadRunnable();
}
void
IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
APPEND_TO_RUN_QUEUE(tso);
- // NB. Don't call THREAD_RUNNABLE() here, because the thread is
+ // NB. Don't call threadRunnable() here, because the thread is
// bound and only runnable by *this* OS thread, so waking up other
// workers will just slow things down.
next = bqe->link;
((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
- THREAD_RUNNABLE();
+ threadRunnable();
unblockCount(bqe, node);
/* reset blocking status after dumping event */
((StgTSO *)bqe)->why_blocked = NotBlocked;
next = tso->link;
tso->link = END_TSO_QUEUE;
APPEND_TO_RUN_QUEUE(tso);
- THREAD_RUNNABLE();
+ threadRunnable();
IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
return next;
}
{
interrupted = 1;
context_switch = 1;
-#ifdef RTS_SUPPORTS_THREADS
- wakeBlockedWorkerThread();
-#endif
}
/* -----------------------------------------------------------------------------
}
}
-/* -----------------------------------------------------------------------------
- * Blackhole detection: if we reach a deadlock, test whether any
- * threads are blocked on themselves. Any threads which are found to
- * be self-blocked get sent a NonTermination exception.
- *
- * This is only done in a deadlock situation in order to avoid
- * performance overhead in the normal case.
- *
- * Locks: sched_mutex is held upon entry and exit.
- * -------------------------------------------------------------------------- */
-
-#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
-static void
-detectBlackHoles( void )
-{
- StgTSO *tso = all_threads;
- StgPtr frame;
- StgClosure *blocked_on;
- StgRetInfoTable *info;
-
- for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
-
- while (tso->what_next == ThreadRelocated) {
- tso = tso->link;
- ASSERT(get_itbl(tso)->type == TSO);
- }
-
- if (tso->why_blocked != BlockedOnBlackHole) {
- continue;
- }
- blocked_on = tso->block_info.closure;
-
- frame = tso->sp;
-
- while(1) {
- info = get_ret_itbl((StgClosure *)frame);
- switch (info->i.type) {
- case UPDATE_FRAME:
- if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
- /* We are blocking on one of our own computations, so
- * send this thread the NonTermination exception.
- */
- IF_DEBUG(scheduler,
- sched_belch("thread %d is blocked on itself", tso->id));
- raiseAsync(tso, (StgClosure *)NonTermination_closure);
- goto done;
- }
-
- frame = (StgPtr)((StgUpdateFrame *)frame + 1);
- continue;
-
- case STOP_FRAME:
- goto done;
-
- // normal stack frames; do nothing except advance the pointer
- default:
- frame += stack_frame_sizeW((StgClosure *)frame);
- }
- }
- done: ;
- }
-}
-#endif
-
/* ----------------------------------------------------------------------------
* Debugging: why is a thread blocked
* [Also provides useful information when debugging threaded programs
extern nat RTS_VAR(rts_n_waiting_tasks);
#endif
-StgBool rtsSupportsBoundThreads(void);
StgBool isThreadBound(StgTSO *tso);
extern SchedulerStatus rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret);
} \
blocked_queue_tl = tso;
-/* Signal that a runnable thread has become available, in
- * case there are any waiting tasks to execute it.
- */
-#if defined(RTS_SUPPORTS_THREADS)
-#define THREAD_RUNNABLE() \
- wakeBlockedWorkerThread(); \
- context_switch = 1;
-#else
-#define THREAD_RUNNABLE() /* nothing */
-#endif
-
/* Check whether various thread queues are empty
*/
#define EMPTY_QUEUE(q) (q == END_TSO_QUEUE)
/* last timestamp */
nat timestamp = 0;
-#ifdef RTS_SUPPORTS_THREADS
-static rtsBool isWorkerBlockedInAwaitEvent = rtsFalse;
-static rtsBool workerWakeupPending = rtsFalse;
-static int workerWakeupPipe[2];
-static rtsBool workerWakeupInited = rtsFalse;
-#endif
-
/* There's a clever trick here to avoid problems when the time wraps
* around. Since our maximum delay is smaller than 31 bits of ticks
* (it's actually 31 bits of microseconds), we can safely check
}
}
-#ifdef RTS_SUPPORTS_THREADS
- if(!workerWakeupInited) {
- pipe(workerWakeupPipe);
- workerWakeupInited = rtsTrue;
- }
- FD_SET(workerWakeupPipe[0], &rfd);
- maxfd = workerWakeupPipe[0] > maxfd ? workerWakeupPipe[0] : maxfd;
-#endif
-
- /* Release the scheduler lock while we do the poll.
- * this means that someone might muck with the blocked_queue
- * while we do this, but it shouldn't matter:
- *
- * - another task might poll for I/O and remove one
- * or more threads from the blocked_queue.
- * - more I/O threads may be added to blocked_queue.
- * - more delayed threads may be added to blocked_queue. We'll
- * just subtract delta from their delays after the poll.
- *
- * I believe none of these cases lead to trouble --SDM.
- */
-
-#ifdef RTS_SUPPORTS_THREADS
- isWorkerBlockedInAwaitEvent = rtsTrue;
- workerWakeupPending = rtsFalse;
-#endif
- RELEASE_LOCK(&sched_mutex);
-
/* Check for any interesting events */
tv.tv_sec = min / 1000000;
barf("select failed");
}
}
- ACQUIRE_LOCK(&sched_mutex);
-#ifdef RTS_SUPPORTS_THREADS
- isWorkerBlockedInAwaitEvent = rtsFalse;
-#endif
/* We got a signal; could be one of ours. If so, we need
* to start up the signal handler straight away, otherwise
*/
#if defined(RTS_USER_SIGNALS)
if (signals_pending()) {
- RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
startSignalHandlers();
- ACQUIRE_LOCK(&sched_mutex);
return; /* still hold the lock */
}
#endif
if (run_queue_hd != END_TSO_QUEUE) {
return; /* still hold the lock */
}
-
-#ifdef RTS_SUPPORTS_THREADS
- /* If another worker thread wants to take over,
- * return to the scheduler
- */
- if (needToYieldToReturningWorker()) {
- return; /* still hold the lock */
- }
-#endif
-
-#ifdef RTS_SUPPORTS_THREADS
- isWorkerBlockedInAwaitEvent = rtsTrue;
-#endif
- RELEASE_LOCK(&sched_mutex);
}
- ACQUIRE_LOCK(&sched_mutex);
-
/* Step through the waiting queue, unblocking every thread that now has
* a file descriptor in a ready state.
*/
}
}
-#if defined(RTS_SUPPORTS_THREADS)
- // if we were woken up by wakeBlockedWorkerThread,
- // read the dummy byte from the pipe
- if(select_succeeded && FD_ISSET(workerWakeupPipe[0], &rfd)) {
- unsigned char dummy;
- wait = rtsFalse;
- read(workerWakeupPipe[0],&dummy,1);
- }
-#endif
} while (wait && !interrupted && run_queue_hd == END_TSO_QUEUE);
}
-
-
-#ifdef RTS_SUPPORTS_THREADS
-/* wakeBlockedWorkerThread
- *
- * If a worker thread is currently blocked within awaitEvent,
- * wake it.
- * Must be called with sched_mutex held.
- */
-void
-wakeBlockedWorkerThread()
-{
- if(isWorkerBlockedInAwaitEvent && !workerWakeupPending) {
- unsigned char dummy = 42; // Any value will do here
-
- // write something so that select() wakes up
- write(workerWakeupPipe[1],&dummy,1);
- workerWakeupPending = rtsTrue;
- }
-}
-
-/* resetWorkerWakeupPipeAfterFork
- *
- * To be called right after a fork().
- * After the fork(), the worker wakeup pipe will be shared
- * with the parent process, and that's something we don't want.
- */
-void
-resetWorkerWakeupPipeAfterFork()
-{
- if(workerWakeupInited) {
- close(workerWakeupPipe[0]);
- close(workerWakeupPipe[1]);
- }
- workerWakeupInited = rtsFalse;
-}
-#endif
StgPtr pending_handler_buf[N_PENDING_HANDLERS];
StgPtr *next_pending_handler = pending_handler_buf;
+/* -----------------------------------------------------------------------------
+ * Signal handling
+ * -------------------------------------------------------------------------- */
+
#ifdef RTS_SUPPORTS_THREADS
pthread_t signalHandlingThread;
#endif
- // Handle all signals in the current thread.
- // Called from Capability.c whenever the main capability is granted to a thread
- // and in installDefaultHandlers
+// Handle all signals in the current thread.
+// Called from Capability.c whenever the main capability is granted to a thread
+// and in installDefaultHandlers
void
-handleSignalsInThisThread()
+handleSignalsInThisThread(void)
{
#ifdef RTS_SUPPORTS_THREADS
signalHandlingThread = pthread_self();
#endif
}
-
/* -----------------------------------------------------------------------------
* Allocate/resize the table of signal handlers.
* -------------------------------------------------------------------------- */
extern void initDefaultHandlers(void);
extern void handleSignalsInThisThread(void);
+extern void handleSignalsInPrevThread(void);
#elif defined(mingw32_TARGET_OS)
#define RTS_USER_SIGNALS 1