/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2006
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in a THREADED_RTS build will there be multiple capabilities,
 * for non-threaded builds there is only one global capability, namely
 * MainCapability.
 *
 * --------------------------------------------------------------------------*/
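
/* An illustrative sketch (not part of this file's logic): because
 * BaseReg points at cap->r, the owning Capability can be recovered
 * from a pointer to its register table. Capability.h provides a
 * helper along these lines; the version shown here is a simplified
 * assumption for exposition, not the canonical definition.
 */
#if 0
STATIC_INLINE Capability *
exampleRegTableToCapability (StgRegTable *reg)
{
    // cap->r sits just after the function table at the start of a
    // Capability, so step back over it to find the enclosing struct.
    return (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
}
#endif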
#include "PosixSource.h"
#include "Rts.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "STM.h"
#include "OSThreads.h"
#include "Capability.h"
#include "Schedule.h"
#include "Sparks.h"
// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
Capability MainCapability;

nat n_capabilities;
Capability *capabilities = NULL;
// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability;
#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
    return blackholes_need_checking
        || interrupted
        ;
}
#endif

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
anyWorkForMe( Capability *cap, Task *task )
{
    // If the run queue is not empty, then we only wake up the guy who
    // can run the thread at the head, even if there is some other
    // reason for this task to run (eg. interrupted=rtsTrue).
    if (!emptyRunQueue(cap)) {
        if (cap->run_queue_hd->bound == NULL) {
            return (task->tso == NULL);
        } else {
            return (cap->run_queue_hd->bound == task);
        }
    } else if (task->tso == NULL && !emptySparkPoolCap(cap)) {
        return rtsTrue;
    }
    return globalWorkToDo();
}
#endif
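
/* Summarising the policy above (informal, for exposition only):
 *
 *   run queue non-empty, head unbound  -> wake only a worker (task->tso == NULL)
 *   run queue non-empty, head bound    -> wake only the Task it is bound to
 *   run queue empty, sparks available  -> any worker will do
 *   otherwise                          -> run only if there is global work to do
 */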
/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
 *
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */
#if defined(THREADED_RTS)
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->return_link == NULL);
    if (cap->returning_tasks_hd) {
        ASSERT(cap->returning_tasks_tl->return_link == NULL);
        cap->returning_tasks_tl->return_link = task;
    } else {
        cap->returning_tasks_hd = task;
    }
    cap->returning_tasks_tl = task;
}
STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    ASSERT_LOCK_HELD(&cap->lock);
    Task *task;
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->return_link;
    if (!cap->returning_tasks_hd) {
        cap->returning_tasks_tl = NULL;
    }
    task->return_link = NULL;
    return task;
}
#endif
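
#if 0
/* Illustrative sketch (not compiled): the intended FIFO discipline of
 * the returning_tasks list. The caller and the enqueue order shown
 * here are hypothetical; both operations require cap->lock, as
 * asserted above.
 */
static void
exampleReturningQueue (Capability *cap, Task *t1, Task *t2)
{
    ACQUIRE_LOCK(&cap->lock);
    newReturningTask(cap, t1);           // t1 joins at the tail
    newReturningTask(cap, t2);           // t2 queues behind t1
    ASSERT(popReturningTask(cap) == t1); // tasks leave from the head
    ASSERT(popReturningTask(cap) == t2);
    RELEASE_LOCK(&cap->lock);
}
#endif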
/* ----------------------------------------------------------------------------
 * Initialisation
 *
 * The Capability is initially marked not free.
 * ------------------------------------------------------------------------- */
static void
initCapability( Capability *cap, nat i )
{
    nat g;

    cap->no = i;
    cap->in_haskell        = rtsFalse;

    cap->run_queue_hd      = END_TSO_QUEUE;
    cap->run_queue_tl      = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task      = NULL; // indicates cap is free
    cap->spare_workers     = NULL;
    cap->suspended_ccalling_tasks = NULL;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
#endif

    cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
    cap->f.stgGCFun        = (F_)__stg_gc_fun;

    cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
                                    RtsFlags.GcFlags.generations,
                                    "initCapability");

    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        cap->mut_lists[g] = NULL;
    }

    cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
    cap->free_trec_chunks = END_STM_CHUNK_LIST;
    cap->free_trec_headers = NO_TREC;
    cap->transaction_tokens = 0;
}
/* ---------------------------------------------------------------------------
 * Function:  initCapabilities()
 *
 * Purpose:   set up the Capability handling. For the THREADED_RTS build,
 *            we keep a table of them, the size of which is
 *            controlled by the user via the RTS flag -N.
 *
 * ------------------------------------------------------------------------- */
void
initCapabilities( void )
{
#if defined(THREADED_RTS)
    nat i;

#ifndef REG_Base
    // We can't support multiple CPUs if BaseReg is not a register
    if (RtsFlags.ParFlags.nNodes > 1) {
        errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
        RtsFlags.ParFlags.nNodes = 1;
    }
#endif

    n_capabilities = RtsFlags.ParFlags.nNodes;

    if (n_capabilities == 1) {
        capabilities = &MainCapability;
        // THREADED_RTS must work on builds that don't have a mutable
        // BaseReg (eg. unregisterised), so in this case
        // capabilities[0] must coincide with &MainCapability.
    } else {
        capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
                                      "initCapabilities");
    }

    for (i = 0; i < n_capabilities; i++) {
        initCapability(&capabilities[i], i);
    }

    IF_DEBUG(scheduler, sched_belch("allocated %d capabilities",
                                    n_capabilities));

#else /* !THREADED_RTS */

    n_capabilities = 1;
    capabilities = &MainCapability;
    initCapability(&MainCapability, 0);

#endif

    // There are no free capabilities to begin with.  We will start
    // a worker Task on each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
    last_free_capability = &capabilities[0];
}
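
// For example (assuming a registerised THREADED_RTS build): running a
// program with "+RTS -N2" sets RtsFlags.ParFlags.nNodes to 2, so the
// code above allocates a fresh array of two Capabilities rather than
// reusing &MainCapability.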
/* ----------------------------------------------------------------------------
 * Give a Capability to a Task.  The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrator must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrator should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
 *
 * ------------------------------------------------------------------------- */
#if defined(THREADED_RTS)
STATIC_INLINE void
giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    IF_DEBUG(scheduler,
             sched_belch("passing capability %d to %s %p",
                         cap->no, task->tso ? "bound task" : "worker",
                         (void *)task->id));
    ACQUIRE_LOCK(&task->lock);
    task->wakeup = rtsTrue;
    // the wakeup flag is needed because signalCondition() doesn't
    // flag the condition if the thread is already running, but we want
    // it to be sticky.
    signalCondition(&task->cond);
    RELEASE_LOCK(&task->lock);
}
#endif
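
#if 0
/* Illustrative sketch (not compiled): the matching wait side of this
 * handshake, as it appears in waitForReturnCapability() and
 * yieldCapability() below. Because task->wakeup stays set until the
 * sleeper clears it, a signal sent before the sleeper reaches
 * waitCondition() is not lost.
 */
    ACQUIRE_LOCK(&task->lock);
    if (!task->wakeup) waitCondition(&task->cond, &task->lock);
    task->wakeup = rtsFalse;
    RELEASE_LOCK(&task->lock);
#endif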
/* ----------------------------------------------------------------------------
 * Function:  releaseCapability(Capability*)
 *
 * Purpose:   Letting go of a capability. Causes a
 *            'returning worker' thread or a 'waiting worker'
 *            to wake up, in that order.
 * ------------------------------------------------------------------------- */
#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap)
{
    Task *task;

    task = cap->running_task;

    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);

    cap->running_task = NULL;

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call.
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
        // Make sure we're not about to try to wake ourselves up
        ASSERT(task != cap->run_queue_hd->bound);
        task = cap->run_queue_hd->bound;
        giveCapabilityToTask(cap,task);
        return;
    }

    if (!cap->spare_workers) {
        // Create a worker thread if we don't have one.  If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed.  If the system is
        // shutting down, we never create a new worker.
        if (!shutting_down_scheduler) {
            IF_DEBUG(scheduler,
                     sched_belch("starting new worker on capability %d", cap->no));
            startWorkerTask(cap, workerStart);
            return;
        }
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue (see yieldCapability())
            return;
        }
    }

    last_free_capability = cap;
    IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
}
void
releaseCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap);
    RELEASE_LOCK(&cap->lock);
}
static void
releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
{
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not replaced on the spare_worker queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    // Also, be careful to check that this task hasn't just exited
    // Haskell to do a foreign call (task->suspended_tso).
    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
        task->next = cap->spare_workers;
        cap->spare_workers = task;
    }
    // Bound tasks just float around attached to their TSOs.

    releaseCapability_(cap);

    RELEASE_LOCK(&cap->lock);
}
#endif
/* ----------------------------------------------------------------------------
 * waitForReturnCapability( Task *task )
 *
 * Purpose:  when an OS thread returns from an external call,
 * it calls waitForReturnCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
 * result of the external call back to the Haskell thread that
 * made it.
 *
 * ------------------------------------------------------------------------- */
void
waitForReturnCapability (Capability **pCap, Task *task)
{
#if !defined(THREADED_RTS)

    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;

#else
    Capability *cap = *pCap;

    if (cap == NULL) {
        // Try last_free_capability first
        cap = last_free_capability;
        if (cap->running_task) {
            nat i;
            // otherwise, search for a free capability
            for (i = 0; i < n_capabilities; i++) {
                cap = &capabilities[i];
                if (!cap->running_task) {
                    break;
                }
            }
            if (cap->running_task) {
                // Can't find a free one, use last_free_capability.
                cap = last_free_capability;
            }
        }

        // record the Capability as the one this Task is now associated with.
        task->cap = cap;

    } else {
        ASSERT(task->cap == cap);
    }

    ACQUIRE_LOCK(&cap->lock);

    IF_DEBUG(scheduler,
             sched_belch("returning; I want capability %d", cap->no));

    if (!cap->running_task) {
        // It's free; just grab it
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
    } else {
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            // now check whether we should wake up...
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task == NULL) {
                if (cap->returning_tasks_hd != task) {
                    giveCapabilityToTask(cap,cap->returning_tasks_hd);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->running_task = task;
                popReturningTask(cap);
                RELEASE_LOCK(&cap->lock);
                break;
            }
            RELEASE_LOCK(&cap->lock);
        }
    }

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    IF_DEBUG(scheduler,
             sched_belch("returning; got capability %d", cap->no));

    *pCap = cap;
#endif
}
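
#if 0
/* Illustrative sketch (not compiled): roughly how a caller such as
 * Schedule.c:resumeThread() is expected to use this function when an
 * in-call returns. The surrounding details here are assumptions for
 * exposition, not the actual resumeThread() code.
 */
    Capability *cap = task->cap;   // NULL for a task with no Capability yet
    waitForReturnCapability(&cap, task);
    // on return, cap is owned by this task and it is safe to enter the RTS
#endif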
#if defined(THREADED_RTS)
/* ----------------------------------------------------------------------------
 * yieldCapability
 * ------------------------------------------------------------------------- */
void
yieldCapability (Capability** pCap, Task *task)
{
    Capability *cap = *pCap;

    // The fast path has no locking: if we never enter this while loop,
    // we keep the Capability without touching cap->lock.
    while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
        IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));

        // We must now release the capability and wait to be woken up
        // again.
        task->wakeup = rtsFalse;
        releaseCapabilityAndQueueWorker(cap);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task != NULL) {
                IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no));
                RELEASE_LOCK(&cap->lock);
                continue;
            }

            if (task->tso == NULL) {
                ASSERT(cap->spare_workers != NULL);
                // if we're not at the front of the queue, release the
                // Capability again.  This is unlikely to happen.
                if (cap->spare_workers != task) {
                    giveCapabilityToTask(cap,cap->spare_workers);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->spare_workers = task->next;
                task->next = NULL;
            }
            cap->running_task = task;
            RELEASE_LOCK(&cap->lock);
            break;
        }

        IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
        ASSERT(cap->running_task == task);
    }

    *pCap = cap;

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    return;
}
/* ----------------------------------------------------------------------------
 * prodCapabilities
 *
 * Used to indicate that the interrupted flag is now set, or some
 * other global condition that might require waking up a Task on each
 * Capability.
 * ------------------------------------------------------------------------- */
static void
prodCapabilities(rtsBool all)
{
    nat i;
    Capability *cap;
    Task *task;

    for (i=0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        ACQUIRE_LOCK(&cap->lock);
        if (!cap->running_task) {
            if (cap->spare_workers) {
                task = cap->spare_workers;
                ASSERT(!task->stopped);
                giveCapabilityToTask(cap,task);
                if (!all) {
                    RELEASE_LOCK(&cap->lock);
                    return;
                }
            }
        }
        RELEASE_LOCK(&cap->lock);
    }
}
void
prodAllCapabilities (void)
{
    prodCapabilities(rtsTrue);
}
/* ----------------------------------------------------------------------------
 * prodOneCapability
 *
 * Like prodAllCapabilities, but we only require a single Task to wake
 * up in order to service some global event, such as checking for
 * deadlock after some idle time has passed.
 * ------------------------------------------------------------------------- */
void
prodOneCapability (void)
{
    prodCapabilities(rtsFalse);
}
/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible.  For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted and
 * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
 * will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller.  Runnable threads are
 * killed.
 *
 * ------------------------------------------------------------------------- */
void
shutdownCapability (Capability *cap, Task *task)
{
    nat i;

    ASSERT(interrupted && shutting_down_scheduler);

    task->cap = cap;

    for (i = 0; i < 50; i++) {
        IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            RELEASE_LOCK(&cap->lock);
            IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
            yieldThread();
            continue;
        }
        cap->running_task = task;
        if (!emptyRunQueue(cap) || cap->spare_workers) {
            IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
            releaseCapability_(cap); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }
        IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // we now have the Capability; its run queue and spare workers
    // list are both empty.
}
/* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
 * Attempt to gain control of a Capability if it is free.
 *
 * ------------------------------------------------------------------------- */
rtsBool
tryGrabCapability (Capability *cap, Task *task)
{
    // unlocked check first, then confirm while holding cap->lock
    if (cap->running_task != NULL) return rtsFalse;
    ACQUIRE_LOCK(&cap->lock);
    if (cap->running_task != NULL) {
        RELEASE_LOCK(&cap->lock);
        return rtsFalse;
    }
    task->cap = cap;
    cap->running_task = task;
    RELEASE_LOCK(&cap->lock);
    return rtsTrue;
}
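
#if 0
/* Illustrative sketch (not compiled): a hypothetical caller probing
 * each Capability in turn and claiming the first free one.
 */
static Capability *
exampleGrabAnyCapability (Task *task)
{
    nat i;
    for (i = 0; i < n_capabilities; i++) {
        if (tryGrabCapability(&capabilities[i], task)) {
            return &capabilities[i];
        }
    }
    return NULL; // every Capability was busy
}
#endif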
#endif /* THREADED_RTS */