#include "Threads.h"
#include "Timer.h"
#include "ThreadPaused.h"
+#include "Messages.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
#endif
-/* Threads blocked on blackholes.
- * LOCK: sched_mutex+capability, or all capabilities
- */
-StgTSO *blackhole_queue = NULL;
-
-/* The blackhole_queue should be checked for threads to wake up. See
- * Schedule.h for more thorough comment.
- * LOCK: none (doesn't matter if we miss an update)
- */
-rtsBool blackholes_need_checking = rtsFalse;
-
/* Set to true when the latest garbage collection failed to reclaim
* enough space, and the runtime should proceed to shut itself down in
* an orderly fashion (emitting profiling info etc.)
static void schedulePreLoop (void);
static void scheduleFindWork (Capability *cap);
#if defined(THREADED_RTS)
-static void scheduleYield (Capability **pcap, Task *task, rtsBool);
+static void scheduleYield (Capability **pcap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability *cap);
-static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
static Capability *scheduleDoGC(Capability *cap, Task *task,
rtsBool force_major);
-static rtsBool checkBlackHoles(Capability *cap);
-
static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
static StgTSO *threadStackUnderflow(Capability *cap, Task *task, StgTSO *tso);
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif
-/* -----------------------------------------------------------------------------
- * Putting a thread on the run queue: different scheduling policies
- * -------------------------------------------------------------------------- */
-
-STATIC_INLINE void
-addToRunQueue( Capability *cap, StgTSO *t )
-{
- // this does round-robin scheduling; good for concurrency
- appendToRunQueue(cap,t);
-}
-
/* ---------------------------------------------------------------------------
Main scheduling loop.
rtsBool ready_to_gc;
#if defined(THREADED_RTS)
rtsBool first = rtsTrue;
- rtsBool force_yield = rtsFalse;
#endif
cap = initialCapability;
// ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
}
- yield:
- scheduleYield(&cap,task,force_yield);
- force_yield = rtsFalse;
+ scheduleYield(&cap,task);
if (emptyRunQueue(cap)) continue; // look for work again
#endif
startHeapProfTimer();
- // Check for exceptions blocked on this thread
- maybePerformBlockedException (cap, t);
-
// ----------------------------------------------------------------------
// Run the current thread
// happened. So find the new location:
t = cap->r.rCurrentTSO;
- // We have run some Haskell code: there might be blackhole-blocked
- // threads to wake up now.
- // Lock-free test here should be ok, we're just setting a flag.
- if ( blackhole_queue != END_TSO_QUEUE ) {
- blackholes_need_checking = rtsTrue;
- }
-
// And save the current errno in this thread.
// XXX: possibly bogus for SMP because this thread might already
// be running again, see code below.
traceEventStopThread(cap, t, ret);
-#if defined(THREADED_RTS)
- // If ret is ThreadBlocked, and this Task is bound to the TSO that
- // blocked, we are in limbo - the TSO is now owned by whatever it
- // is blocked on, and may in fact already have been woken up,
- // perhaps even on a different Capability. It may be the case
- // that task->cap != cap. We better yield this Capability
- // immediately and return to normaility.
- if (ret == ThreadBlocked) {
- force_yield = rtsTrue;
- goto yield;
- }
-#endif
-
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
ASSERT(t->cap == cap);
} /* end of while() */
}
+/* -----------------------------------------------------------------------------
+ * Run queue operations
+ * -------------------------------------------------------------------------- */
+
+// Unlink tso from an arbitrary position in cap's run queue.
+// The run queue is doubly linked: forward via tso->_link, backward via
+// tso->block_info.prev, with END_TSO_QUEUE terminating both directions.
+// NOTE(review): caller is assumed to hold the Capability — confirm; the
+// setTSOLink/setTSOPrev calls imply generational write barriers are needed.
+void
+removeFromRunQueue (Capability *cap, StgTSO *tso)
+{
+    // Splice out forwards: fix the predecessor's _link (or the queue head).
+    if (tso->block_info.prev == END_TSO_QUEUE) {
+        ASSERT(cap->run_queue_hd == tso);
+        cap->run_queue_hd = tso->_link;
+    } else {
+        setTSOLink(cap, tso->block_info.prev, tso->_link);
+    }
+    // Splice out backwards: fix the successor's prev (or the queue tail).
+    if (tso->_link == END_TSO_QUEUE) {
+        ASSERT(cap->run_queue_tl == tso);
+        cap->run_queue_tl = tso->block_info.prev;
+    } else {
+        setTSOPrev(cap, tso->_link, tso->block_info.prev);
+    }
+    // Fully detach tso so it can safely be linked onto another queue.
+    tso->_link = tso->block_info.prev = END_TSO_QUEUE;
+
+    IF_DEBUG(sanity, checkRunQueue(cap));
+}
+
+
/* ----------------------------------------------------------------------------
* Setting up the scheduler loop
* ------------------------------------------------------------------------- */
{
scheduleStartSignalHandlers(cap);
- // Only check the black holes here if we've nothing else to do.
- // During normal execution, the black hole list only gets checked
- // at GC time, to avoid repeatedly traversing this possibly long
- // list each time around the scheduler.
- if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
-
scheduleProcessInbox(cap);
scheduleCheckBlockedThreads(cap);
// and also check the benchmarks in nofib/parallel for regressions.
static void
-scheduleYield (Capability **pcap, Task *task, rtsBool force_yield)
+scheduleYield (Capability **pcap, Task *task)
{
Capability *cap = *pcap;
// if we have work, and we don't need to give up the Capability, continue.
//
- // The force_yield flag is used when a bound thread blocks. This
- // is a particularly tricky situation: the current Task does not
- // own the TSO any more, since it is on some queue somewhere, and
- // might be woken up or manipulated by another thread at any time.
- // The TSO and Task might be migrated to another Capability.
- // Certain invariants might be in doubt, such as task->bound->cap
- // == cap. We have to yield the current Capability immediately,
- // no messing around.
- //
- if (!force_yield &&
- !shouldYieldCapability(cap,task) &&
+ if (!shouldYieldCapability(cap,task) &&
(!emptyRunQueue(cap) ||
!emptyInbox(cap) ||
- blackholes_need_checking ||
sched_state >= SCHED_INTERRUPTING))
return;
|| t->bound == task->incall // don't move my bound thread
|| tsoLocked(t)) { // don't move a locked thread
setTSOLink(cap, prev, t);
+ setTSOPrev(cap, t, prev);
prev = t;
} else if (i == n_free_caps) {
pushed_to_all = rtsTrue;
i = 0;
// keep one for us
setTSOLink(cap, prev, t);
+ setTSOPrev(cap, t, prev);
prev = t;
} else {
appendToRunQueue(free_caps[i],t);
}
}
cap->run_queue_tl = prev;
+
+ IF_DEBUG(sanity, checkRunQueue(cap));
}
#ifdef SPARK_PUSHING
//
if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
{
- awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
- }
-#endif
-}
-
-
-/* ----------------------------------------------------------------------------
- * Check for threads woken up by other Capabilities
- * ------------------------------------------------------------------------- */
-
-#if defined(THREADED_RTS)
-static void
-executeMessage (Capability *cap, Message *m)
-{
- const StgInfoTable *i;
-
-loop:
- write_barrier(); // allow m->header to be modified by another thread
- i = m->header.info;
- if (i == &stg_MSG_WAKEUP_info)
- {
- MessageWakeup *w = (MessageWakeup *)m;
- StgTSO *tso = w->tso;
- debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld",
- (lnat)tso->id);
- ASSERT(tso->cap == cap);
- ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
- ASSERT(tso->block_info.closure == (StgClosure *)m);
- tso->why_blocked = NotBlocked;
- appendToRunQueue(cap, tso);
- }
- else if (i == &stg_MSG_THROWTO_info)
- {
- MessageThrowTo *t = (MessageThrowTo *)m;
- nat r;
- const StgInfoTable *i;
-
- i = lockClosure((StgClosure*)m);
- if (i != &stg_MSG_THROWTO_info) {
- unlockClosure((StgClosure*)m, i);
- goto loop;
- }
-
- debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld",
- (lnat)t->source->id, (lnat)t->target->id);
-
- ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
- ASSERT(t->source->block_info.closure == (StgClosure *)m);
-
- r = throwToMsg(cap, t);
-
- switch (r) {
- case THROWTO_SUCCESS:
- ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info);
- t->source->sp += 3;
- unblockOne(cap, t->source);
- // this message is done
- unlockClosure((StgClosure*)m, &stg_IND_info);
- break;
- case THROWTO_BLOCKED:
- // unlock the message
- unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
- break;
- }
- }
- else if (i == &stg_IND_info)
- {
- // message was revoked
- return;
- }
- else if (i == &stg_WHITEHOLE_info)
- {
- goto loop;
- }
- else
- {
- barf("executeMessage: %p", i);
- }
-}
-#endif
-
-static void
-scheduleProcessInbox (Capability *cap USED_IF_THREADS)
-{
-#if defined(THREADED_RTS)
- Message *m;
-
- while (!emptyInbox(cap)) {
- ACQUIRE_LOCK(&cap->lock);
- m = cap->inbox;
- cap->inbox = m->link;
- RELEASE_LOCK(&cap->lock);
- executeMessage(cap, (Message *)m);
+ awaitEvent (emptyRunQueue(cap));
}
#endif
}
/* ----------------------------------------------------------------------------
- * Check for threads blocked on BLACKHOLEs that can be woken up
- * ------------------------------------------------------------------------- */
-static void
-scheduleCheckBlackHoles (Capability *cap)
-{
- if ( blackholes_need_checking ) // check without the lock first
- {
- ACQUIRE_LOCK(&sched_mutex);
- if ( blackholes_need_checking ) {
- blackholes_need_checking = rtsFalse;
- // important that we reset the flag *before* checking the
- // blackhole queue, otherwise we could get deadlock. This
- // happens as follows: we wake up a thread that
- // immediately runs on another Capability, blocks on a
- // blackhole, and then we reset the blackholes_need_checking flag.
- checkBlackHoles(cap);
- }
- RELEASE_LOCK(&sched_mutex);
- }
-}
-
-/* ----------------------------------------------------------------------------
* Detect deadlock conditions and attempt to resolve them.
* ------------------------------------------------------------------------- */
#endif
/* ----------------------------------------------------------------------------
+ * Process message in the current Capability's inbox
+ * ------------------------------------------------------------------------- */
+
+// Drain cap->inbox, executing each pending Message in turn.
+// The inbox is filled by other Capabilities, so cap->lock protects it;
+// we hold the lock only while unlinking one message, never while
+// executing it (executeMessage may block or take other locks).
+// NOTE(review): emptyInbox(cap) is checked without the lock —
+// presumably safe because only the owning Capability removes entries;
+// confirm against Capability.h.
+static void
+scheduleProcessInbox (Capability *cap USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+    Message *m;
+
+    while (!emptyInbox(cap)) {
+        // Pop exactly one message under the lock.
+        ACQUIRE_LOCK(&cap->lock);
+        m = cap->inbox;
+        cap->inbox = m->link;
+        RELEASE_LOCK(&cap->lock);
+        executeMessage(cap, (Message *)m);
+    }
+#endif
+}
+
+
+/* ----------------------------------------------------------------------------
* Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
* ------------------------------------------------------------------------- */
// context switch flag, and we end up waiting for a GC.
// See #1984, and concurrent/should_run/1984
cap->context_switch = 0;
- addToRunQueue(cap,t);
+ appendToRunQueue(cap,t);
} else {
pushOnRunQueue(cap,t);
}
//debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
checkTSO(t));
- addToRunQueue(cap,t);
+ appendToRunQueue(cap,t);
return rtsFalse;
}
ASSERT(task->incall->tso == t);
if (t->what_next == ThreadComplete) {
- if (task->ret) {
+ if (task->incall->ret) {
// NOTE: return val is tso->sp[1] (see StgStartup.hc)
- *(task->ret) = (StgClosure *)task->incall->tso->sp[1];
+ *(task->incall->ret) = (StgClosure *)task->incall->tso->sp[1];
}
- task->stat = Success;
+ task->incall->stat = Success;
} else {
- if (task->ret) {
- *(task->ret) = NULL;
+ if (task->incall->ret) {
+ *(task->incall->ret) = NULL;
}
if (sched_state >= SCHED_INTERRUPTING) {
if (heap_overflow) {
- task->stat = HeapExhausted;
+ task->incall->stat = HeapExhausted;
} else {
- task->stat = Interrupted;
+ task->incall->stat = Interrupted;
}
} else {
- task->stat = Killed;
+ task->incall->stat = Killed;
}
}
#ifdef DEBUG
debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
}
- // do this while the other Capabilities stop:
- if (cap) scheduleCheckBlackHoles(cap);
-
if (gc_type == PENDING_GC_SEQ)
{
// single-threaded GC: grab all the capabilities
waitForGcThreads(cap);
}
-#else /* !THREADED_RTS */
-
- // do this while the other Capabilities stop:
- if (cap) scheduleCheckBlackHoles(cap);
-
#endif
IF_DEBUG(scheduler, printAllThreads());
ACQUIRE_LOCK(&cap->lock);
ACQUIRE_LOCK(&cap->running_task->lock);
+ stopTimer(); // See #4074
+
pid = fork();
if (pid) { // parent
+ startTimer(); // #4074
+
RELEASE_LOCK(&sched_mutex);
RELEASE_LOCK(&cap->lock);
RELEASE_LOCK(&cap->running_task->lock);
* the whole system.
*
* The Haskell thread making the C call is put to sleep for the
- * duration of the call, on the susepended_ccalling_threads queue. We
+ * duration of the call, on the suspended_ccalling_threads queue. We
* give out a token to the task, which it can use to resume the thread
* on return from the C function.
+ *
+ * If this is an interruptible C call, the FFI call may be
+ * unceremoniously terminated, and so it should be scheduled on an
+ * unbound worker thread.
* ------------------------------------------------------------------------- */
void *
-suspendThread (StgRegTable *reg)
+suspendThread (StgRegTable *reg, rtsBool interruptible)
{
Capability *cap;
int saved_errno;
threadPaused(cap,tso);
- if ((tso->flags & TSO_BLOCKEX) == 0) {
- tso->why_blocked = BlockedOnCCall;
- tso->flags |= TSO_BLOCKEX;
- tso->flags &= ~TSO_INTERRUPTIBLE;
+ if (interruptible) {
+ tso->why_blocked = BlockedOnCCall_Interruptible;
} else {
- tso->why_blocked = BlockedOnCCall_NoUnblockExc;
+ tso->why_blocked = BlockedOnCCall;
}
// Hand back capability
traceEventRunThread(cap, tso);
- if (tso->why_blocked == BlockedOnCCall) {
+ /* Reset blocking status */
+ tso->why_blocked = NotBlocked;
+
+ if ((tso->flags & TSO_BLOCKEX) == 0) {
// avoid locking the TSO if we don't have to
if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
- awakenBlockedExceptionQueue(cap,tso);
+ maybePerformBlockedException(cap,tso);
}
- tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
}
- /* Reset blocking status */
- tso->why_blocked = NotBlocked;
-
cap->r.rCurrentTSO = tso;
cap->in_haskell = rtsTrue;
errno = saved_errno;
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
- traceEventMigrateThread (cap, tso, capabilities[cpu].no);
- wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
+ migrateThread(cap, tso, &capabilities[cpu]);
}
#else
appendToRunQueue(cap,tso);
tso->cap = cap;
task->incall->tso = tso;
- task->ret = ret;
- task->stat = NoStatus;
+ task->incall->ret = ret;
+ task->incall->stat = NoStatus;
appendToRunQueue(cap,tso);
cap = schedule(cap,task);
- ASSERT(task->stat != NoStatus);
+ ASSERT(task->incall->stat != NoStatus);
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
sleeping_queue = END_TSO_QUEUE;
#endif
- blackhole_queue = END_TSO_QUEUE;
-
sched_state = SCHED_RUNNING;
recent_activity = ACTIVITY_YES;
}
void
-exitScheduler(
- rtsBool wait_foreign
-#if !defined(THREADED_RTS)
- __attribute__((unused))
-#endif
-)
+exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
/* see Capability.c, shutdownCapability() */
{
Task *task = NULL;
* of the stack, so we don't attempt to scavenge any part of the
* dead TSO's stack.
*/
- tso->what_next = ThreadRelocated;
setTSOLink(cap,tso,dest);
+ write_barrier(); // other threads seeing ThreadRelocated will look at _link
+ tso->what_next = ThreadRelocated;
tso->sp = (P_)&(tso->stack[tso->stack_size]);
tso->why_blocked = NotBlocked;
debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
(long)tso->id, tso_size_w, tso_sizeW(new_tso));
- tso->what_next = ThreadRelocated;
tso->_link = new_tso; // no write barrier reqd: same generation
+ write_barrier(); // other threads seeing ThreadRelocated will look at _link
+ tso->what_next = ThreadRelocated;
// The TSO attached to this Task may have moved, so update the
// pointer to it.
#endif
/* -----------------------------------------------------------------------------
- * checkBlackHoles()
- *
- * Check the blackhole_queue for threads that can be woken up. We do
- * this periodically: before every GC, and whenever the run queue is
- * empty.
- *
- * An elegant solution might be to just wake up all the blocked
- * threads with awakenBlockedQueue occasionally: they'll go back to
- * sleep again if the object is still a BLACKHOLE. Unfortunately this
- * doesn't give us a way to tell whether we've actually managed to
- * wake up any threads, so we would be busy-waiting.
- *
- * -------------------------------------------------------------------------- */
-
-static rtsBool
-checkBlackHoles (Capability *cap)
-{
- StgTSO **prev, *t;
- rtsBool any_woke_up = rtsFalse;
- StgHalfWord type;
-
- // blackhole_queue is global:
- ASSERT_LOCK_HELD(&sched_mutex);
-
- debugTrace(DEBUG_sched, "checking threads blocked on black holes");
-
- // ASSUMES: sched_mutex
- prev = &blackhole_queue;
- t = blackhole_queue;
- while (t != END_TSO_QUEUE) {
- if (t->what_next == ThreadRelocated) {
- t = t->_link;
- continue;
- }
- ASSERT(t->why_blocked == BlockedOnBlackHole);
- type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
- if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
- IF_DEBUG(sanity,checkTSO(t));
- t = unblockOne(cap, t);
- *prev = t;
- any_woke_up = rtsTrue;
- } else {
- prev = &t->_link;
- t = t->_link;
- }
- }
-
- return any_woke_up;
-}
-
-/* -----------------------------------------------------------------------------
Deleting threads
This is used for interruption (^C) and forking, and corresponds to
-------------------------------------------------------------------------- */
static void
-deleteThread (Capability *cap, StgTSO *tso)
+deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
{
// NOTE: must only be called on a TSO that we have exclusive
// access to, because we will call throwToSingleThreaded() below.
// we must own all Capabilities.
if (tso->why_blocked != BlockedOnCCall &&
- tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
- throwToSingleThreaded(cap,tso,NULL);
+ tso->why_blocked != BlockedOnCCall_Interruptible) {
+ throwToSingleThreaded(tso->cap,tso,NULL);
}
}
// like deleteThread(), but we delete threads in foreign calls, too.
if (tso->why_blocked == BlockedOnCCall ||
- tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
- unblockOne(cap,tso);
+ tso->why_blocked == BlockedOnCCall_Interruptible) {
tso->what_next = ThreadKilled;
+ appendToRunQueue(tso->cap, tso);
} else {
deleteThread(cap,tso);
}
SET_HDR(raise_closure, &stg_raise_info, CCCS);
raise_closure->payload[0] = exception;
}
- UPD_IND(cap, ((StgUpdateFrame *)p)->updatee,
- (StgClosure *)raise_closure);
+ updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
+ (StgClosure *)raise_closure);
p = next;
continue;
* can wake up threads, remember...).
*/
continue;
+ case BlockedOnMsgThrowTo:
+ // This can happen if the target is masking, blocks on a
+ // black hole, and then is found to be unreachable. In
+ // this case, we want to let the target wake up and carry
+ // on, and do nothing to this thread.
+ continue;
default:
barf("resurrectThreads: thread blocked in a strange way: %d",
tso->why_blocked);