1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2003-2006
7 * A Capability represents the token required to execute STG code,
8 * and all the state an OS thread/task needs to run Haskell code:
9 * its STG registers, a pointer to its TSO, a nursery etc. During
10 * STG execution, a pointer to the capability is kept in a
11 * register (BaseReg; actually it is a pointer to cap->r).
13 * Only in a THREADED_RTS build will there be multiple capabilities,
14 * for non-threaded builds there is only one global capability, namely
17 * --------------------------------------------------------------------------*/
19 #include "PosixSource.h"
24 #include "OSThreads.h"
25 #include "Capability.h"
29 // one global capability, this is the Capability for non-threaded
30 // builds, and for +RTS -N1
31 Capability MainCapability;
// Table of all Capabilities.  initCapabilities() points this at
// &MainCapability when there is exactly one capability, otherwise at a
// freshly allocated array of n_capabilities entries.
34 Capability *capabilities = NULL;
36 // Holds the Capability which last became free. This is used so that
37 // an in-call has a chance of quickly finding a free Capability.
38 // Maintaining a global free list of Capabilities would require global
39 // locking, so we don't do that.
40 Capability *last_free_capability;
42 #if defined(THREADED_RTS)
// NOTE(review): this listing is truncated -- the enclosing function header
// is not visible here.  The expression below is presumably the body of a
// predicate (globalWorkToDo?) reporting whether some capability-independent
// condition needs servicing: blackholes to re-check, or the scheduler in
// (or past) the SCHED_INTERRUPTING state.  TODO: confirm against the full
// source.
46 return blackholes_need_checking
47 || sched_state >= SCHED_INTERRUPTING
52 #if defined(THREADED_RTS)
// Does this Task have any work to do on this Capability?
// Called from the lock-free fast path of yieldCapability(), so the answer
// is only advisory and may be stale by the time the caller acts on it.
54 anyWorkForMe( Capability *cap, Task *task )
56 if (task->tso != NULL) {
57 // A bound task only runs if its thread is on the run queue of
58 // the capability on which it was woken up. Otherwise, we
59 // can't be sure that we have the right capability: the thread
60 // might be woken up on some other capability, and task->cap
61 // could change under our feet.
62 return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task;
64 // A vanilla worker task runs if either there is a lightweight
65 // thread at the head of the run queue, or the run queue is
66 // empty and (there are sparks to execute, or there is some
67 // other global condition to check, such as threads blocked on
69 if (emptyRunQueue(cap)) {
70 return !emptySparkPoolCap(cap) || globalWorkToDo();
// A worker must not steal a thread that is bound to some other task.
72 return cap->run_queue_hd->bound == NULL;
77 /* -----------------------------------------------------------------------------
78 * Manage the returning_tasks lists.
80 * These functions require cap->lock
81 * -------------------------------------------------------------------------- */
83 #if defined(THREADED_RTS)
// Append 'task' to the tail of cap's returning_tasks FIFO (hd/tl linked
// via task->return_link).  The task must not already be on a returning
// list (asserted below).  Requires cap->lock.
85 newReturningTask (Capability *cap, Task *task)
87 ASSERT_LOCK_HELD(&cap->lock);
88 ASSERT(task->return_link == NULL);
89 if (cap->returning_tasks_hd) {
// Non-empty queue: link the new task after the current tail.
90 ASSERT(cap->returning_tasks_tl->return_link == NULL);
91 cap->returning_tasks_tl->return_link = task;
// (else branch elided in this listing) empty queue: task becomes the head.
93 cap->returning_tasks_hd = task;
// In both cases the new task is now the tail.
95 cap->returning_tasks_tl = task;
// Remove and return the head of cap's returning_tasks FIFO, keeping the
// tail pointer consistent when the queue becomes empty.
// Requires cap->lock.
99 popReturningTask (Capability *cap)
101 ASSERT_LOCK_HELD(&cap->lock);
103 task = cap->returning_tasks_hd;
105 cap->returning_tasks_hd = task->return_link;
106 if (!cap->returning_tasks_hd) {
107 cap->returning_tasks_tl = NULL;
// Detach the task from the list so it can be re-queued later.
109 task->return_link = NULL;
114 /* ----------------------------------------------------------------------------
117 * The Capability is initially marked not free.
118 * ------------------------------------------------------------------------- */
// Initialise a single Capability: empty run queue, per-generation mutable
// lists, STM free lists, and (THREADED_RTS only) its lock and task queues.
// The Capability is initially owned by no task (running_task == NULL,
// i.e. free) in the threaded RTS.
121 initCapability( Capability *cap, nat i )
126 cap->in_haskell = rtsFalse;
128 cap->run_queue_hd = END_TSO_QUEUE;
129 cap->run_queue_tl = END_TSO_QUEUE;
131 #if defined(THREADED_RTS)
132 initMutex(&cap->lock);
133 cap->running_task = NULL; // indicates cap is free
134 cap->spare_workers = NULL;
135 cap->suspended_ccalling_tasks = NULL;
136 cap->returning_tasks_hd = NULL;
137 cap->returning_tasks_tl = NULL;
// Cache the standard GC entry points in the capability's register table.
140 cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
141 cap->f.stgGCFun = (F_)__stg_gc_fun;
// One mutable list per GC generation.
143 cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
144 RtsFlags.GcFlags.generations,
147 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
148 cap->mut_lists[g] = NULL;
// Per-capability STM free lists start empty.
151 cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
152 cap->free_trec_chunks = END_STM_CHUNK_LIST;
153 cap->free_trec_headers = NO_TREC;
154 cap->transaction_tokens = 0;
157 /* ---------------------------------------------------------------------------
158 * Function: initCapabilities()
160 * Purpose: set up the Capability handling. For the THREADED_RTS build,
161 * we keep a table of them, the size of which is
162 * controlled by the user via the RTS flag -N.
164 * ------------------------------------------------------------------------- */
// Set up the global capability table.  THREADED_RTS: allocate and
// initialise RtsFlags.ParFlags.nNodes capabilities (clamped to 1 on
// builds without a register BaseReg).  Non-threaded: the single
// MainCapability.  In both cases last_free_capability starts at
// capabilities[0].
166 initCapabilities( void )
168 #if defined(THREADED_RTS)
172 // We can't support multiple CPUs if BaseReg is not a register
173 if (RtsFlags.ParFlags.nNodes > 1) {
174 errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
175 RtsFlags.ParFlags.nNodes = 1;
179 n_capabilities = RtsFlags.ParFlags.nNodes;
181 if (n_capabilities == 1) {
182 capabilities = &MainCapability;
183 // THREADED_RTS must work on builds that don't have a mutable
184 // BaseReg (eg. unregisterised), so in this case
185 // capabilities[0] must coincide with &MainCapability.
// (else branch elided in this listing) N > 1: allocate a fresh array.
187 capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
191 for (i = 0; i < n_capabilities; i++) {
192 initCapability(&capabilities[i], i);
195 IF_DEBUG(scheduler, sched_belch("allocated %d capabilities",
198 #else /* !THREADED_RTS */
201 capabilities = &MainCapability;
202 initCapability(&MainCapability, 0);
206 // There are no free capabilities to begin with. We will start
207 // a worker Task to each Capability, which will quickly put the
208 // Capability on the free list when it finds nothing to do.
209 last_free_capability = &capabilities[0];
212 /* ----------------------------------------------------------------------------
213 * Give a Capability to a Task. The task must currently be sleeping
214 * on its condition variable.
216 * Requires cap->lock (modifies cap->running_task).
218 * When migrating a Task, the migrator must take task->lock before
219 * modifying task->cap, to synchronise with the waking up Task.
220 * Additionally, the migrator should own the Capability (when
221 * migrating the run queue), or cap->lock (when migrating
222 * returning_workers).
224 * ------------------------------------------------------------------------- */
226 #if defined(THREADED_RTS)
// Hand this Capability to 'task', which must be sleeping on its condition
// variable, by setting its wakeup flag and signalling it.
// Requires cap->lock; takes task->lock internally for the handoff.
228 giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
230 ASSERT_LOCK_HELD(&cap->lock);
231 ASSERT(task->cap == cap);
233 sched_belch("passing capability %d to %s %p",
234 cap->no, task->tso ? "bound task" : "worker",
236 ACQUIRE_LOCK(&task->lock);
237 task->wakeup = rtsTrue;
238 // the wakeup flag is needed because signalCondition() doesn't
239 // flag the condition if the thread is already running, but we want
241 signalCondition(&task->cond);
242 RELEASE_LOCK(&task->lock);
246 /* ----------------------------------------------------------------------------
247 * Function: releaseCapability(Capability*)
249 * Purpose: Letting go of a capability. Causes a
250 * 'returning worker' thread or a 'waiting worker'
251 * to wake up, in that order.
252 * ------------------------------------------------------------------------- */
254 #if defined(THREADED_RTS)
// Core of releaseCapability: give up ownership of 'cap' and pass it on,
// in priority order, to (1) a returning worker, (2) the bound Task of the
// thread at the head of the run queue, or (3) a spare worker if there is
// work to do; otherwise mark the Capability free.
// Requires cap->lock (see releaseCapability()).
256 releaseCapability_ (Capability* cap)
260 task = cap->running_task;
262 ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
264 cap->running_task = NULL;
266 // Check to see whether a worker thread can be given
267 // the go-ahead to return the result of an external call..
268 if (cap->returning_tasks_hd != NULL) {
269 giveCapabilityToTask(cap,cap->returning_tasks_hd);
270 // The Task pops itself from the queue (see waitForReturnCapability())
274 // If the next thread on the run queue is a bound thread,
275 // give this Capability to the appropriate Task.
276 if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
277 // Make sure we're not about to try to wake ourselves up
278 ASSERT(task != cap->run_queue_hd->bound);
279 task = cap->run_queue_hd->bound;
280 giveCapabilityToTask(cap,task);
284 if (!cap->spare_workers) {
285 // Create a worker thread if we don't have one. If the system
286 // is interrupted, we only create a worker task if there
287 // are threads that need to be completed. If the system is
288 // shutting down, we never create a new worker.
289 if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
291 sched_belch("starting new worker on capability %d", cap->no));
292 startWorkerTask(cap, workerStart);
297 // If we have an unbound thread on the run queue, or if there's
298 // anything else to do, give the Capability to a worker thread.
299 if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
300 if (cap->spare_workers) {
301 giveCapabilityToTask(cap,cap->spare_workers);
302 // The worker Task pops itself from the queue;
// Nobody wants it: record it as the most recently freed Capability so
// that in-calls can find it quickly (see last_free_capability).
307 last_free_capability = cap;
308 IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
// Public wrapper for releaseCapability_: takes cap->lock around the
// release, as required by releaseCapability_.
312 releaseCapability (Capability* cap USED_IF_THREADS)
314 ACQUIRE_LOCK(&cap->lock);
315 releaseCapability_(cap);
316 RELEASE_LOCK(&cap->lock);
// Release the Capability and, if the current task is an ordinary worker
// (not bound, not stopped, not suspended in a foreign call), push it onto
// cap->spare_workers so it can be handed the Capability again later.
320 releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
324 ACQUIRE_LOCK(&cap->lock);
326 task = cap->running_task;
328 // If the current task is a worker, save it on the spare_workers
329 // list of this Capability. A worker can mark itself as stopped,
330 // in which case it is not replaced on the spare_worker queue.
331 // This happens when the system is shutting down (see
332 // Schedule.c:workerStart()).
333 // Also, be careful to check that this task hasn't just exited
334 // Haskell to do a foreign call (task->suspended_tso).
335 if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
336 task->next = cap->spare_workers;
337 cap->spare_workers = task;
339 // Bound tasks just float around attached to their TSOs.
341 releaseCapability_(cap);
343 RELEASE_LOCK(&cap->lock);
347 /* ----------------------------------------------------------------------------
348 * waitForReturnCapability( Task *task )
350 * Purpose: when an OS thread returns from an external call,
351 * it calls waitForReturnCapability() (via Schedule.resumeThread())
352 * to wait for permission to enter the RTS & communicate the
353 * result of the external call back to the Haskell thread that
356 * ------------------------------------------------------------------------- */
// Called when an OS thread returns from an external call and needs a
// Capability to re-enter the RTS.  On entry *pCap is either the Capability
// the task last held or (presumably) NULL when it has none -- TODO confirm
// against the full source.  On exit *pCap points to a Capability owned by
// 'task'.  Non-threaded builds simply grab MainCapability.
358 waitForReturnCapability (Capability **pCap, Task *task)
360 #if !defined(THREADED_RTS)
362 MainCapability.running_task = task;
363 task->cap = &MainCapability;
364 *pCap = &MainCapability;
367 Capability *cap = *pCap;
370 // Try last_free_capability first
371 cap = last_free_capability;
372 if (!cap->running_task) {
374 // otherwise, search for a free capability
375 for (i = 0; i < n_capabilities; i++) {
376 cap = &capabilities[i];
377 if (!cap->running_task) {
381 // Can't find a free one, use last_free_capability.
382 cap = last_free_capability;
385 // record the Capability as the one this Task is now associated with.
389 ASSERT(task->cap == cap);
392 ACQUIRE_LOCK(&cap->lock);
395 sched_belch("returning; I want capability %d", cap->no));
397 if (!cap->running_task) {
398 // It's free; just grab it
399 cap->running_task = task;
400 RELEASE_LOCK(&cap->lock);
// (else branch elided in this listing) the Capability is busy: queue
// ourselves on its returning_tasks list and wait to be woken.
402 newReturningTask(cap,task);
403 RELEASE_LOCK(&cap->lock);
406 ACQUIRE_LOCK(&task->lock);
407 // task->lock held, cap->lock not held
408 if (!task->wakeup) waitCondition(&task->cond, &task->lock);
410 task->wakeup = rtsFalse;
411 RELEASE_LOCK(&task->lock);
413 // now check whether we should wake up...
414 ACQUIRE_LOCK(&cap->lock);
415 if (cap->running_task == NULL) {
416 if (cap->returning_tasks_hd != task) {
// Not our turn yet: wake the task that IS at the head and go
// back to waiting.
417 giveCapabilityToTask(cap,cap->returning_tasks_hd);
418 RELEASE_LOCK(&cap->lock);
// It's our turn: take ownership and dequeue ourselves.
421 cap->running_task = task;
422 popReturningTask(cap);
423 RELEASE_LOCK(&cap->lock);
426 RELEASE_LOCK(&cap->lock);
431 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
434 sched_belch("returning; got capability %d", cap->no));
440 #if defined(THREADED_RTS)
441 /* ----------------------------------------------------------------------------
 * yieldCapability: temporarily give up the Capability if some other task
 * should run instead (a returning worker, or no work for us).  Loops:
 * release + queue as spare worker, sleep on task->cond, then re-acquire
 * the Capability when woken and it is genuinely ours.  On exit the task
 * owns *pCap again.
443 * ------------------------------------------------------------------------- */
446 yieldCapability (Capability** pCap, Task *task)
448 Capability *cap = *pCap;
450 // The fast path has no locking, if we don't enter this while loop
452 while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
453 IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));
455 // We must now release the capability and wait to be woken up
457 task->wakeup = rtsFalse;
458 releaseCapabilityAndQueueWorker(cap);
461 ACQUIRE_LOCK(&task->lock);
462 // task->lock held, cap->lock not held
463 if (!task->wakeup) waitCondition(&task->cond, &task->lock);
465 task->wakeup = rtsFalse;
466 RELEASE_LOCK(&task->lock);
468 IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
469 ACQUIRE_LOCK(&cap->lock);
470 if (cap->running_task != NULL) {
// Someone else beat us to it; go back to sleep.
471 IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no));
472 RELEASE_LOCK(&cap->lock);
// Worker tasks (no bound TSO) must be at the front of spare_workers
// before they may take the Capability.
476 if (task->tso == NULL) {
477 ASSERT(cap->spare_workers != NULL);
478 // if we're not at the front of the queue, release it
479 // again. This is unlikely to happen.
480 if (cap->spare_workers != task) {
481 giveCapabilityToTask(cap,cap->spare_workers);
482 RELEASE_LOCK(&cap->lock);
// Pop ourselves off the spare_workers list.
485 cap->spare_workers = task->next;
488 cap->running_task = task;
489 RELEASE_LOCK(&cap->lock);
493 IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
494 ASSERT(cap->running_task == task);
499 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
504 /* ----------------------------------------------------------------------------
507 * Used to indicate that the interrupted flag is now set, or some
508 * other global condition that might require waking up a Task on each
510 * ------------------------------------------------------------------------- */
// Wake up a spare worker on each free Capability so it can notice a newly
// set global condition (e.g. interrupted).  With all==rtsFalse the loop
// presumably stops after waking one task (the early-exit lines are elided
// in this listing -- TODO confirm); with all==rtsTrue every free
// Capability is prodded.
513 prodCapabilities(rtsBool all)
519 for (i=0; i < n_capabilities; i++) {
520 cap = &capabilities[i];
521 ACQUIRE_LOCK(&cap->lock);
522 if (!cap->running_task) {
523 if (cap->spare_workers) {
524 task = cap->spare_workers;
525 ASSERT(!task->stopped);
526 giveCapabilityToTask(cap,task);
528 RELEASE_LOCK(&cap->lock);
533 RELEASE_LOCK(&cap->lock);
// Wake a worker on every free Capability (see prodCapabilities).
539 prodAllCapabilities (void)
541 prodCapabilities(rtsTrue);
544 /* ----------------------------------------------------------------------------
547 * Like prodAllCapabilities, but we only require a single Task to wake
548 * up in order to service some global event, such as checking for
549 * deadlock after some idle time has passed.
550 * ------------------------------------------------------------------------- */
// Wake a single Task to service some global event (see prodCapabilities).
553 prodOneCapability (void)
555 prodCapabilities(rtsFalse);
558 /* ----------------------------------------------------------------------------
561 * At shutdown time, we want to let everything exit as cleanly as
562 * possible. For each capability, we let its run queue drain, and
563 * allow the workers to stop.
565 * This function should be called when interrupted and
566 * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
567 * will exit the scheduler and call taskStop(), and any bound thread
568 * that wakes up will return to its caller. Runnable threads are
571 * ------------------------------------------------------------------------- */
// At shutdown, repeatedly (up to 50 attempts) try to acquire the
// Capability and wait for its run queue and spare_workers list to drain,
// waking workers via releaseCapability_ so they can exit.  Must only be
// called once sched_state == SCHED_SHUTTING_DOWN (asserted).  The yield
// between attempts is elided in this listing.
574 shutdownCapability (Capability *cap, Task *task)
578 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
582 for (i = 0; i < 50; i++) {
583 IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
584 ACQUIRE_LOCK(&cap->lock);
585 if (cap->running_task) {
586 RELEASE_LOCK(&cap->lock);
587 IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
591 cap->running_task = task;
592 if (!emptyRunQueue(cap) || cap->spare_workers) {
593 IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
594 releaseCapability_(cap); // this will wake up a worker
595 RELEASE_LOCK(&cap->lock);
599 IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
600 RELEASE_LOCK(&cap->lock);
603 // we now have the Capability, its run queue and spare workers
604 // list are both empty.
607 /* ----------------------------------------------------------------------------
610 * Attempt to gain control of a Capability if it is free.
612 * ------------------------------------------------------------------------- */
// Attempt to take ownership of 'cap' for 'task' without blocking.
// Lock-free pre-check first, then re-check under cap->lock (double-checked
// to avoid taking the lock when the Capability is obviously busy).
// Returns rtsFalse if another task owns it (the rtsTrue return is elided
// in this listing).
615 tryGrabCapability (Capability *cap, Task *task)
617 if (cap->running_task != NULL) return rtsFalse;
618 ACQUIRE_LOCK(&cap->lock);
619 if (cap->running_task != NULL) {
620 RELEASE_LOCK(&cap->lock);
624 cap->running_task = task;
625 RELEASE_LOCK(&cap->lock);
630 #endif /* THREADED_RTS */