STATIC_INLINE rtsBool
globalWorkToDo (void)
{
- return blackholes_need_checking
- || sched_state >= SCHED_INTERRUPTING
- ;
+ return sched_state >= SCHED_INTERRUPTING
+ || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
}
#endif
newReturningTask (Capability *cap, Task *task)
{
ASSERT_LOCK_HELD(&cap->lock);
- ASSERT(task->return_link == NULL);
+ ASSERT(task->next == NULL);
if (cap->returning_tasks_hd) {
- ASSERT(cap->returning_tasks_tl->return_link == NULL);
- cap->returning_tasks_tl->return_link = task;
+ ASSERT(cap->returning_tasks_tl->next == NULL);
+ cap->returning_tasks_tl->next = task;
} else {
cap->returning_tasks_hd = task;
}
Task *task;
task = cap->returning_tasks_hd;
ASSERT(task);
- cap->returning_tasks_hd = task->return_link;
+ cap->returning_tasks_hd = task->next;
if (!cap->returning_tasks_hd) {
cap->returning_tasks_tl = NULL;
}
- task->return_link = NULL;
+ task->next = NULL;
return task;
}
#endif
initMutex(&cap->lock);
cap->running_task = NULL; // indicates cap is free
cap->spare_workers = NULL;
- cap->suspended_ccalling_tasks = NULL;
+ cap->n_spare_workers = 0;
+ cap->suspended_ccalls = NULL;
cap->returning_tasks_hd = NULL;
cap->returning_tasks_tl = NULL;
- cap->wakeup_queue_hd = END_TSO_QUEUE;
- cap->wakeup_queue_tl = END_TSO_QUEUE;
+ cap->inbox = (Message*)END_TSO_QUEUE;
cap->sparks_created = 0;
+ cap->sparks_dud = 0;
cap->sparks_converted = 0;
- cap->sparks_pruned = 0;
+ cap->sparks_gcd = 0;
+ cap->sparks_fizzled = 0;
#endif
cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
ASSERT_LOCK_HELD(&cap->lock);
ASSERT(task->cap == cap);
debugTrace(DEBUG_sched, "passing capability %d to %s %p",
- cap->no, task->tso ? "bound task" : "worker",
+ cap->no, task->incall->tso ? "bound task" : "worker",
(void *)task->id);
ACQUIRE_LOCK(&task->lock);
task->wakeup = rtsTrue;
// assertion is false: in schedule() we force a yield after
// ThreadBlocked, but the thread may be back on the run queue
// by now.
- task = cap->run_queue_hd->bound;
+ task = cap->run_queue_hd->bound->task;
giveCapabilityToTask(cap,task);
return;
}
if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
debugTrace(DEBUG_sched,
"starting new worker on capability %d", cap->no);
- startWorkerTask(cap, workerStart);
+ startWorkerTask(cap);
return;
}
}
// If we have an unbound thread on the run queue, or if there's
// anything else to do, give the Capability to a worker thread.
if (always_wakeup ||
- !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
+ !emptyRunQueue(cap) || !emptyInbox(cap) ||
!emptySparkPoolCap(cap) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
task = cap->running_task;
+ // If the Task is stopped, we shouldn't be yielding, we should
+ // be just exiting.
+ ASSERT(!task->stopped);
+
// If the current task is a worker, save it on the spare_workers
// list of this Capability. A worker can mark itself as stopped,
// in which case it is not replaced on the spare_worker queue.
// This happens when the system is shutting down (see
// Schedule.c:workerStart()).
- // Also, be careful to check that this task hasn't just exited
- // Haskell to do a foreign call (task->suspended_tso).
- if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
- task->next = cap->spare_workers;
- cap->spare_workers = task;
+ if (!isBoundTask(task))
+ {
+ if (cap->n_spare_workers < MAX_SPARE_WORKERS)
+ {
+ task->next = cap->spare_workers;
+ cap->spare_workers = task;
+ cap->n_spare_workers++;
+ }
+ else
+ {
+ debugTrace(DEBUG_sched, "%d spare workers already, exiting",
+ cap->n_spare_workers);
+ releaseCapability_(cap,rtsFalse);
+ // hold the lock until after workerTaskStop; c.f. scheduleWorker()
+ workerTaskStop(task);
+ RELEASE_LOCK(&cap->lock);
+ shutdownThread();
+ }
}
// Bound tasks just float around attached to their TSOs.
continue;
}
- if (task->tso == NULL) {
+ if (task->incall->tso == NULL) {
ASSERT(cap->spare_workers != NULL);
// if we're not at the front of the queue, release it
// again. This is unlikely to happen.
}
cap->spare_workers = task->next;
task->next = NULL;
- }
+ cap->n_spare_workers--;
+ }
cap->running_task = task;
RELEASE_LOCK(&cap->lock);
break;
}
/* ----------------------------------------------------------------------------
- * Wake up a thread on a Capability.
- *
- * This is used when the current Task is running on a Capability and
- * wishes to wake up a thread on a different Capability.
- * ------------------------------------------------------------------------- */
-
-void
-wakeupThreadOnCapability (Capability *my_cap,
- Capability *other_cap,
- StgTSO *tso)
-{
- ACQUIRE_LOCK(&other_cap->lock);
-
- // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
- if (tso->bound) {
- ASSERT(tso->bound->cap == tso->cap);
- tso->bound->cap = other_cap;
- }
- tso->cap = other_cap;
-
- ASSERT(tso->bound ? tso->bound->cap == other_cap : 1);
-
- if (other_cap->running_task == NULL) {
- // nobody is running this Capability, we can add our thread
- // directly onto the run queue and start up a Task to run it.
-
- other_cap->running_task = myTask();
- // precond for releaseCapability_() and appendToRunQueue()
-
- appendToRunQueue(other_cap,tso);
-
- releaseCapability_(other_cap,rtsFalse);
- } else {
- appendToWakeupQueue(my_cap,other_cap,tso);
- other_cap->context_switch = 1;
- // someone is running on this Capability, so it cannot be
- // freed without first checking the wakeup queue (see
- // releaseCapability_).
- }
-
- RELEASE_LOCK(&other_cap->lock);
-}
-
-/* ----------------------------------------------------------------------------
* prodCapability
*
* If a Capability is currently idle, wake up a Task on it. Used to
if (!osThreadIsAlive(t->id)) {
debugTrace(DEBUG_sched,
"worker thread %p has died unexpectedly", (void *)t->id);
- if (!prev) {
- cap->spare_workers = t->next;
- } else {
- prev->next = t->next;
- }
- prev = t;
+ cap->n_spare_workers--;
+ if (!prev) {
+ cap->spare_workers = t->next;
+ } else {
+ prev->next = t->next;
+ }
+ prev = t;
}
}
}
// that will try to return to code that has been unloaded.
// We can be a bit more relaxed when this is a standalone
// program that is about to terminate, and let safe=false.
- if (cap->suspended_ccalling_tasks && safe) {
+ if (cap->suspended_ccalls && safe) {
debugTrace(DEBUG_sched,
"thread(s) are involved in foreign calls, yielding");
cap->running_task = NULL;
------------------------------------------------------------------------ */
void
-markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
- rtsBool prune_sparks USED_IF_THREADS)
+markCapability (evac_fn evac, void *user, Capability *cap,
+ rtsBool no_mark_sparks USED_IF_THREADS)
{
- nat i;
- Capability *cap;
- Task *task;
+ InCall *incall;
// Each GC thread is responsible for following roots from the
// Capability of the same number. There will usually be the same
// or fewer Capabilities as GC threads, but just in case there
// are more, we mark every Capability whose number is the GC
// thread's index plus a multiple of the number of GC threads.
- for (i = i0; i < n_capabilities; i += delta) {
- cap = &capabilities[i];
- evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
- evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
+ evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
+ evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
#if defined(THREADED_RTS)
- evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd);
- evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl);
+ evac(user, (StgClosure **)(void *)&cap->inbox);
#endif
- for (task = cap->suspended_ccalling_tasks; task != NULL;
- task=task->next) {
- evac(user, (StgClosure **)(void *)&task->suspended_tso);
- }
+ for (incall = cap->suspended_ccalls; incall != NULL;
+ incall=incall->next) {
+ evac(user, (StgClosure **)(void *)&incall->suspended_tso);
+ }
#if defined(THREADED_RTS)
- if (prune_sparks) {
- pruneSparkQueue (evac, user, cap);
- } else {
- traverseSparkQueue (evac, user, cap);
- }
-#endif
+ if (!no_mark_sparks) {
+ traverseSparkQueue (evac, user, cap);
}
+#endif
-#if !defined(THREADED_RTS)
- evac(user, (StgClosure **)(void *)&blocked_queue_hd);
- evac(user, (StgClosure **)(void *)&blocked_queue_tl);
- evac(user, (StgClosure **)(void *)&sleeping_queue);
-#endif
+ // Free STM structures for this Capability
+ stmPreGCHook(cap);
}
void
markCapabilities (evac_fn evac, void *user)
{
- markSomeCapabilities(evac, user, 0, 1, rtsFalse);
+ nat n;
+ for (n = 0; n < n_capabilities; n++) {
+ markCapability(evac, user, &capabilities[n], rtsFalse);
+ }
}