#include "Capability.h"
#include "Schedule.h"
#include "Sparks.h"
+#include "Trace.h"
// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
// locking, so we don't do that.
Capability *last_free_capability;
+/* GC indicator, in scope for the scheduler, init'ed to false */
+volatile StgWord waiting_for_gc = 0;
+
#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
globalWorkToDo (void)
cap->mut_lists[g] = NULL;
}
- cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
+ cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
+ cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE;
cap->free_trec_chunks = END_STM_CHUNK_LIST;
cap->free_trec_headers = NO_TREC;
cap->transaction_tokens = 0;
initCapability(&capabilities[i], i);
}
- IF_DEBUG(scheduler, sched_belch("allocated %d capabilities",
- n_capabilities));
+ debugTrace(DEBUG_sched, "allocated %d capabilities", n_capabilities);
#else /* !THREADED_RTS */
{
ASSERT_LOCK_HELD(&cap->lock);
ASSERT(task->cap == cap);
- IF_DEBUG(scheduler,
- sched_belch("passing capability %d to %s %p",
- cap->no, task->tso ? "bound task" : "worker",
- (void *)task->id));
+ trace(TRACE_sched | DEBUG_sched,
+ "passing capability %d to %s %p",
+ cap->no, task->tso ? "bound task" : "worker",
+ (void *)task->id);
ACQUIRE_LOCK(&task->lock);
task->wakeup = rtsTrue;
// the wakeup flag is needed because signalCondition() doesn't
return;
}
+ /* if waiting_for_gc was the reason to release the cap: thread
+ comes from yieldCap->releaseAndQueueWorker. Unconditionally set
+ cap. free and return (see default after the if-protected other
+ special cases). Thread will wait on cond.var and re-acquire the
+ same cap after GC (GC-triggering cap. calls releaseCap and
+ enters the spare_workers case)
+ */
+ if (waiting_for_gc) {
+ last_free_capability = cap; // needed?
+ trace(TRACE_sched | DEBUG_sched,
+ "GC pending, set capability %d free", cap->no);
+ return;
+ }
+
+
// If the next thread on the run queue is a bound thread,
// give this Capability to the appropriate Task.
if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
// are threads that need to be completed. If the system is
// shutting down, we never create a new worker.
if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
- IF_DEBUG(scheduler,
- sched_belch("starting new worker on capability %d", cap->no));
+ debugTrace(DEBUG_sched,
+ "starting new worker on capability %d", cap->no);
startWorkerTask(cap, workerStart);
return;
}
}
last_free_capability = cap;
- IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
+ trace(TRACE_sched | DEBUG_sched, "freeing capability %d", cap->no);
}
void
ACQUIRE_LOCK(&cap->lock);
- IF_DEBUG(scheduler,
- sched_belch("returning; I want capability %d", cap->no));
+ debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);
if (!cap->running_task) {
// It's free; just grab it
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
- IF_DEBUG(scheduler,
- sched_belch("returning; got capability %d", cap->no));
+ trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
*pCap = cap;
#endif
// The fast path has no locking, if we don't enter this while loop
- while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
- IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));
+ while ( waiting_for_gc
+ /* i.e. another capability triggered HeapOverflow, is busy
+ getting capabilities (stopping their owning tasks) */
+ || cap->returning_tasks_hd != NULL
+ /* cap reserved for another task */
+ || !anyWorkForMe(cap,task)
+ /* cap/task have no work */
+ ) {
+ debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
// We must now release the capability and wait to be woken up
// again.
task->wakeup = rtsFalse;
RELEASE_LOCK(&task->lock);
- IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
+ debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);
+
ACQUIRE_LOCK(&cap->lock);
if (cap->running_task != NULL) {
- IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no));
+ debugTrace(DEBUG_sched,
+ "capability %d is owned by another task", cap->no);
RELEASE_LOCK(&cap->lock);
continue;
}
break;
}
- IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
+ trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
ASSERT(cap->running_task == task);
}
* ------------------------------------------------------------------------- */
void
-wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
+wakeupThreadOnCapability (Capability *my_cap,
+ Capability *other_cap,
+ StgTSO *tso)
{
- ASSERT(tso->cap == cap);
- ASSERT(tso->bound ? tso->bound->cap == cap : 1);
+ ACQUIRE_LOCK(&other_cap->lock);
- ACQUIRE_LOCK(&cap->lock);
- if (cap->running_task == NULL) {
+ // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
+ if (tso->bound) {
+ ASSERT(tso->bound->cap == tso->cap);
+ tso->bound->cap = other_cap;
+ }
+ tso->cap = other_cap;
+
+ ASSERT(tso->bound ? tso->bound->cap == other_cap : 1);
+
+ if (other_cap->running_task == NULL) {
// nobody is running this Capability, we can add our thread
// directly onto the run queue and start up a Task to run it.
- appendToRunQueue(cap,tso);
- // start it up
- cap->running_task = myTask(); // precond for releaseCapability_()
- releaseCapability_(cap);
+ other_cap->running_task = myTask();
+ // precond for releaseCapability_() and appendToRunQueue()
+
+ appendToRunQueue(other_cap,tso);
+
+ trace(TRACE_sched, "resuming capability %d", other_cap->no);
+ releaseCapability_(other_cap);
} else {
- appendToWakeupQueue(cap,tso);
+ appendToWakeupQueue(my_cap,other_cap,tso);
// someone is running on this Capability, so it cannot be
// freed without first checking the wakeup queue (see
// releaseCapability_).
}
- RELEASE_LOCK(&cap->lock);
+
+ RELEASE_LOCK(&other_cap->lock);
}
/* ----------------------------------------------------------------------------
ACQUIRE_LOCK(&cap->lock);
if (!cap->running_task) {
if (cap->spare_workers) {
+ trace(TRACE_sched, "resuming capability %d", cap->no);
task = cap->spare_workers;
ASSERT(!task->stopped);
giveCapabilityToTask(cap,task);
* ------------------------------------------------------------------------- */
void
-shutdownCapability (Capability *cap, Task *task)
+shutdownCapability (Capability *cap, Task *task, rtsBool safe)
{
nat i;
task->cap = cap;
- for (i = 0; i < 50; i++) {
- IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
+ // Loop indefinitely until all the workers have exited and there
+ // are no Haskell threads left. We used to bail out after 50
+ // iterations of this loop, but that occasionally left a worker
+ // running which caused problems later (the closeMutex() below
+ // isn't safe, for one thing).
+
+ for (i = 0; /* i < 50 */; i++) {
+ debugTrace(DEBUG_sched,
+ "shutting down capability %d, attempt %d", cap->no, i);
ACQUIRE_LOCK(&cap->lock);
if (cap->running_task) {
RELEASE_LOCK(&cap->lock);
- IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
+ debugTrace(DEBUG_sched, "not owner, yielding");
yieldThread();
continue;
}
cap->running_task = task;
+
+ if (cap->spare_workers) {
+ // Look for workers that have died without removing
+ // themselves from the list; this could happen if the OS
+ // summarily killed the thread, for example. This
+ // actually happens on Windows when the system is
+ // terminating the program, and the RTS is running in a
+ // DLL.
+ Task *t, *prev;
+ prev = NULL;
+ for (t = cap->spare_workers; t != NULL; t = t->next) {
+ if (!osThreadIsAlive(t->id)) {
+ debugTrace(DEBUG_sched,
+ "worker thread %p has died unexpectedly", (void *)t->id);
+ if (!prev) {
+ cap->spare_workers = t->next;
+ } else {
+ prev->next = t->next;
+ }
+ prev = t;
+ }
+ }
+ }
+
if (!emptyRunQueue(cap) || cap->spare_workers) {
- IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
+ debugTrace(DEBUG_sched,
+ "runnable threads or workers still alive, yielding");
releaseCapability_(cap); // this will wake up a worker
RELEASE_LOCK(&cap->lock);
yieldThread();
continue;
}
- IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
+
+ // If "safe", then busy-wait for any threads currently doing
+ // foreign calls. If we're about to unload this DLL, for
+ // example, we need to be sure that there are no OS threads
+ // that will try to return to code that has been unloaded.
+ // We can be a bit more relaxed when this is a standalone
+ // program that is about to terminate, and let safe=false.
+ if (cap->suspended_ccalling_tasks && safe) {
+ debugTrace(DEBUG_sched,
+ "thread(s) are involved in foreign calls, yielding");
+ cap->running_task = NULL;
+ RELEASE_LOCK(&cap->lock);
+ yieldThread();
+ continue;
+ }
+
+ debugTrace(DEBUG_sched, "capability %d is stopped.", cap->no);
+ freeCapability(cap);
RELEASE_LOCK(&cap->lock);
break;
}
// we now have the Capability, its run queue and spare workers
// list are both empty.
+
+ // ToDo: we can't drop this mutex, because there might still be
+ // threads performing foreign calls that will eventually try to
+ // return via resumeThread() and attempt to grab cap->lock.
+ // closeMutex(&cap->lock);
}
/* ----------------------------------------------------------------------------
#endif /* THREADED_RTS */
+void
+freeCapability (Capability *cap) {
+ stgFree(cap->mut_lists);
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+ freeSparkPool(&cap->r.rSparks);
+#endif
+}
+/* ---------------------------------------------------------------------------
+ Mark everything directly reachable from the Capabilities. When
+ using multiple GC threads, each GC thread marks all Capabilities
+ for which (c `mod` n == 0), for Capability c and thread n.
+ ------------------------------------------------------------------------ */
+
+void
+markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta)
+{
+ nat i;
+ Capability *cap;
+ Task *task;
+
+ // Each GC thread is responsible for following roots from the
+ // Capability of the same number. There will usually be the same
+ // or fewer Capabilities as GC threads, but just in case there
+ // are more, we mark every Capability whose number is the GC
+ // thread's index plus a multiple of the number of GC threads.
+ for (i = i0; i < n_capabilities; i += delta) {
+ cap = &capabilities[i];
+ evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
+ evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
+#if defined(THREADED_RTS)
+ evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd);
+ evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl);
+#endif
+ for (task = cap->suspended_ccalling_tasks; task != NULL;
+ task=task->next) {
+ debugTrace(DEBUG_sched,
+ "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
+ evac(user, (StgClosure **)(void *)&task->suspended_tso);
+ }
+
+#if defined(THREADED_RTS)
+ traverseSparkQueue (evac, user, cap);
+#endif
+ }
+
+#if !defined(THREADED_RTS)
+ evac(user, (StgClosure **)(void *)&blocked_queue_hd);
+ evac(user, (StgClosure **)(void *)&blocked_queue_tl);
+ evac(user, (StgClosure **)(void *)&sleeping_queue);
+#endif
+}
+
+// This function is used by the compacting GC to thread all the
+// pointers from spark queues.
+void
+traverseSparkQueues (evac_fn evac USED_IF_THREADS, void *user USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+ nat i;
+ for (i = 0; i < n_capabilities; i++) {
+ traverseSparkQueue (evac, user, &capabilities[i]);
+ }
+#endif // THREADED_RTS
+
+}
+
+void
+markCapabilities (evac_fn evac, void *user)
+{
+ markSomeCapabilities(evac, user, 0, 1);
+}