1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2003-2006
7 * A Capability represents the token required to execute STG code,
8 * and all the state an OS thread/task needs to run Haskell code:
9 * its STG registers, a pointer to its TSO, a nursery etc. During
10 * STG execution, a pointer to the capability is kept in a
11 * register (BaseReg; actually it is a pointer to cap->r).
13 * Only in a THREADED_RTS build will there be multiple capabilities,
14 * for non-threaded builds there is only one global capability, namely
17 * --------------------------------------------------------------------------*/
19 #include "PosixSource.h"
24 #include "OSThreads.h"
25 #include "Capability.h"
29 // one global capability, this is the Capability for non-threaded
30 // builds, and for +RTS -N1
31 Capability MainCapability;
34 Capability *capabilities = NULL;
36 // Holds the Capability which last became free. This is used so that
37 // an in-call has a chance of quickly finding a free Capability.
38 // Maintaining a global free list of Capabilities would require global
39 // locking, so we don't do that.
40 Capability *last_free_capability;
42 #if defined(THREADED_RTS)
46 return blackholes_need_checking
52 #if defined(THREADED_RTS)
54 anyWorkForMe( Capability *cap, Task *task )
56 if (task->tso != NULL) {
57 // A bound task only runs if its thread is on the run queue of
58 // the capability on which it was woken up. Otherwise, we
59 // can't be sure that we have the right capability: the thread
60 // might be woken up on some other capability, and task->cap
61 // could change under our feet.
62 return (!emptyRunQueue(cap) && cap->run_queue_hd->bound == task);
64 // A vanilla worker task runs if either (a) there is a
65 // lightweight thread at the head of the run queue, or (b)
66 // there are sparks to execute, or (c) there is some other
67 // global condition to check, such as threads blocked on
69 return ((!emptyRunQueue(cap) && cap->run_queue_hd->bound == NULL)
70 || !emptySparkPoolCap(cap)
76 /* -----------------------------------------------------------------------------
77 * Manage the returning_tasks lists.
79 * These functions require cap->lock
80 * -------------------------------------------------------------------------- */
82 #if defined(THREADED_RTS)
84 newReturningTask (Capability *cap, Task *task)
86 ASSERT_LOCK_HELD(&cap->lock);
87 ASSERT(task->return_link == NULL);
88 if (cap->returning_tasks_hd) {
89 ASSERT(cap->returning_tasks_tl->return_link == NULL);
90 cap->returning_tasks_tl->return_link = task;
92 cap->returning_tasks_hd = task;
94 cap->returning_tasks_tl = task;
98 popReturningTask (Capability *cap)
100 ASSERT_LOCK_HELD(&cap->lock);
102 task = cap->returning_tasks_hd;
104 cap->returning_tasks_hd = task->return_link;
105 if (!cap->returning_tasks_hd) {
106 cap->returning_tasks_tl = NULL;
108 task->return_link = NULL;
113 /* ----------------------------------------------------------------------------
116 * The Capability is initially marked not free.
117 * ------------------------------------------------------------------------- */
120 initCapability( Capability *cap, nat i )
125 cap->in_haskell = rtsFalse;
127 cap->run_queue_hd = END_TSO_QUEUE;
128 cap->run_queue_tl = END_TSO_QUEUE;
130 #if defined(THREADED_RTS)
131 initMutex(&cap->lock);
132 cap->running_task = NULL; // indicates cap is free
133 cap->spare_workers = NULL;
134 cap->suspended_ccalling_tasks = NULL;
135 cap->returning_tasks_hd = NULL;
136 cap->returning_tasks_tl = NULL;
139 cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
140 cap->f.stgGCFun = (F_)__stg_gc_fun;
142 cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
143 RtsFlags.GcFlags.generations,
146 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
147 cap->mut_lists[g] = NULL;
150 cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
151 cap->free_trec_chunks = END_STM_CHUNK_LIST;
152 cap->free_trec_headers = NO_TREC;
153 cap->transaction_tokens = 0;
156 /* ---------------------------------------------------------------------------
157 * Function: initCapabilities()
159 * Purpose: set up the Capability handling. For the THREADED_RTS build,
160 * we keep a table of them, the size of which is
161 * controlled by the user via the RTS flag -N.
163 * ------------------------------------------------------------------------- */
165 initCapabilities( void )
167 #if defined(THREADED_RTS)
171 // We can't support multiple CPUs if BaseReg is not a register
172 if (RtsFlags.ParFlags.nNodes > 1) {
173 errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
174 RtsFlags.ParFlags.nNodes = 1;
178 n_capabilities = RtsFlags.ParFlags.nNodes;
180 if (n_capabilities == 1) {
181 capabilities = &MainCapability;
182 // THREADED_RTS must work on builds that don't have a mutable
183 // BaseReg (eg. unregisterised), so in this case
184 // capabilities[0] must coincide with &MainCapability.
186 capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
190 for (i = 0; i < n_capabilities; i++) {
191 initCapability(&capabilities[i], i);
194 IF_DEBUG(scheduler, sched_belch("allocated %d capabilities",
197 #else /* !THREADED_RTS */
200 capabilities = &MainCapability;
201 initCapability(&MainCapability, 0);
205 // There are no free capabilities to begin with. We will start
206 // a worker Task to each Capability, which will quickly put the
207 // Capability on the free list when it finds nothing to do.
208 last_free_capability = &capabilities[0];
211 /* ----------------------------------------------------------------------------
212 * Give a Capability to a Task. The task must currently be sleeping
213 * on its condition variable.
215 * Requires cap->lock (modifies cap->running_task).
217 * When migrating a Task, the migrator must take task->lock before
218 * modifying task->cap, to synchronise with the waking up Task.
219 * Additionally, the migrator should own the Capability (when
220 * migrating the run queue), or cap->lock (when migrating
221 * returning_workers).
223 * ------------------------------------------------------------------------- */
225 #if defined(THREADED_RTS)
227 giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
229 ASSERT_LOCK_HELD(&cap->lock);
230 ASSERT(task->cap == cap);
232 sched_belch("passing capability %d to %s %p",
233 cap->no, task->tso ? "bound task" : "worker",
235 ACQUIRE_LOCK(&task->lock);
236 task->wakeup = rtsTrue;
237 // the wakeup flag is needed because signalCondition() doesn't
238 // flag the condition if the thread is already runniing, but we want
240 signalCondition(&task->cond);
241 RELEASE_LOCK(&task->lock);
245 /* ----------------------------------------------------------------------------
246 * Function: releaseCapability(Capability*)
248 * Purpose: Letting go of a capability. Causes a
249 * 'returning worker' thread or a 'waiting worker'
250 * to wake up, in that order.
251 * ------------------------------------------------------------------------- */
253 #if defined(THREADED_RTS)
255 releaseCapability_ (Capability* cap)
259 task = cap->running_task;
261 ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
263 cap->running_task = NULL;
265 // Check to see whether a worker thread can be given
266 // the go-ahead to return the result of an external call..
267 if (cap->returning_tasks_hd != NULL) {
268 giveCapabilityToTask(cap,cap->returning_tasks_hd);
269 // The Task pops itself from the queue (see waitForReturnCapability())
273 // If the next thread on the run queue is a bound thread,
274 // give this Capability to the appropriate Task.
275 if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
276 // Make sure we're not about to try to wake ourselves up
277 ASSERT(task != cap->run_queue_hd->bound);
278 task = cap->run_queue_hd->bound;
279 giveCapabilityToTask(cap,task);
283 if (!cap->spare_workers) {
284 // Create a worker thread if we don't have one. If the system
285 // is interrupted, we only create a worker task if there
286 // are threads that need to be completed. If the system is
287 // shutting down, we never create a new worker.
288 if (!shutting_down_scheduler) {
290 sched_belch("starting new worker on capability %d", cap->no));
291 startWorkerTask(cap, workerStart);
296 // If we have an unbound thread on the run queue, or if there's
297 // anything else to do, give the Capability to a worker thread.
298 if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
299 if (cap->spare_workers) {
300 giveCapabilityToTask(cap,cap->spare_workers);
301 // The worker Task pops itself from the queue;
306 last_free_capability = cap;
307 IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
311 releaseCapability (Capability* cap USED_IF_THREADS)
313 ACQUIRE_LOCK(&cap->lock);
314 releaseCapability_(cap);
315 RELEASE_LOCK(&cap->lock);
319 releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
323 ACQUIRE_LOCK(&cap->lock);
325 task = cap->running_task;
327 // If the current task is a worker, save it on the spare_workers
328 // list of this Capability. A worker can mark itself as stopped,
329 // in which case it is not replaced on the spare_worker queue.
330 // This happens when the system is shutting down (see
331 // Schedule.c:workerStart()).
332 // Also, be careful to check that this task hasn't just exited
333 // Haskell to do a foreign call (task->suspended_tso).
334 if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
335 task->next = cap->spare_workers;
336 cap->spare_workers = task;
338 // Bound tasks just float around attached to their TSOs.
340 releaseCapability_(cap);
342 RELEASE_LOCK(&cap->lock);
346 /* ----------------------------------------------------------------------------
347 * waitForReturnCapability( Task *task )
349 * Purpose: when an OS thread returns from an external call,
350 * it calls waitForReturnCapability() (via Schedule.resumeThread())
351 * to wait for permission to enter the RTS & communicate the
352 * result of the external call back to the Haskell thread that
355 * ------------------------------------------------------------------------- */
357 waitForReturnCapability (Capability **pCap, Task *task)
359 #if !defined(THREADED_RTS)
361 MainCapability.running_task = task;
362 task->cap = &MainCapability;
363 *pCap = &MainCapability;
366 Capability *cap = *pCap;
369 // Try last_free_capability first
370 cap = last_free_capability;
371 if (!cap->running_task) {
373 // otherwise, search for a free capability
374 for (i = 0; i < n_capabilities; i++) {
375 cap = &capabilities[i];
376 if (!cap->running_task) {
380 // Can't find a free one, use last_free_capability.
381 cap = last_free_capability;
384 // record the Capability as the one this Task is now assocated with.
388 ASSERT(task->cap == cap);
391 ACQUIRE_LOCK(&cap->lock);
394 sched_belch("returning; I want capability %d", cap->no));
396 if (!cap->running_task) {
397 // It's free; just grab it
398 cap->running_task = task;
399 RELEASE_LOCK(&cap->lock);
401 newReturningTask(cap,task);
402 RELEASE_LOCK(&cap->lock);
405 ACQUIRE_LOCK(&task->lock);
406 // task->lock held, cap->lock not held
407 if (!task->wakeup) waitCondition(&task->cond, &task->lock);
409 task->wakeup = rtsFalse;
410 RELEASE_LOCK(&task->lock);
412 // now check whether we should wake up...
413 ACQUIRE_LOCK(&cap->lock);
414 if (cap->running_task == NULL) {
415 if (cap->returning_tasks_hd != task) {
416 giveCapabilityToTask(cap,cap->returning_tasks_hd);
417 RELEASE_LOCK(&cap->lock);
420 cap->running_task = task;
421 popReturningTask(cap);
422 RELEASE_LOCK(&cap->lock);
425 RELEASE_LOCK(&cap->lock);
430 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
433 sched_belch("returning; got capability %d", cap->no));
439 #if defined(THREADED_RTS)
440 /* ----------------------------------------------------------------------------
442 * ------------------------------------------------------------------------- */
445 yieldCapability (Capability** pCap, Task *task)
447 Capability *cap = *pCap;
449 // The fast path has no locking, if we don't enter this while loop
451 while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
452 IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));
454 // We must now release the capability and wait to be woken up
456 task->wakeup = rtsFalse;
457 releaseCapabilityAndQueueWorker(cap);
460 ACQUIRE_LOCK(&task->lock);
461 // task->lock held, cap->lock not held
462 if (!task->wakeup) waitCondition(&task->cond, &task->lock);
464 task->wakeup = rtsFalse;
465 RELEASE_LOCK(&task->lock);
467 IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
468 ACQUIRE_LOCK(&cap->lock);
469 if (cap->running_task != NULL) {
470 IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no));
471 RELEASE_LOCK(&cap->lock);
475 if (task->tso == NULL) {
476 ASSERT(cap->spare_workers != NULL);
477 // if we're not at the front of the queue, release it
478 // again. This is unlikely to happen.
479 if (cap->spare_workers != task) {
480 giveCapabilityToTask(cap,cap->spare_workers);
481 RELEASE_LOCK(&cap->lock);
484 cap->spare_workers = task->next;
487 cap->running_task = task;
488 RELEASE_LOCK(&cap->lock);
492 IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
493 ASSERT(cap->running_task == task);
498 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
503 /* ----------------------------------------------------------------------------
506 * Used to indicate that the interrupted flag is now set, or some
507 * other global condition that might require waking up a Task on each
509 * ------------------------------------------------------------------------- */
512 prodCapabilities(rtsBool all)
518 for (i=0; i < n_capabilities; i++) {
519 cap = &capabilities[i];
520 ACQUIRE_LOCK(&cap->lock);
521 if (!cap->running_task) {
522 if (cap->spare_workers) {
523 task = cap->spare_workers;
524 ASSERT(!task->stopped);
525 giveCapabilityToTask(cap,task);
527 RELEASE_LOCK(&cap->lock);
532 RELEASE_LOCK(&cap->lock);
538 prodAllCapabilities (void)
540 prodCapabilities(rtsTrue);
543 /* ----------------------------------------------------------------------------
546 * Like prodAllCapabilities, but we only require a single Task to wake
547 * up in order to service some global event, such as checking for
548 * deadlock after some idle time has passed.
549 * ------------------------------------------------------------------------- */
552 prodOneCapability (void)
554 prodCapabilities(rtsFalse);
557 /* ----------------------------------------------------------------------------
560 * At shutdown time, we want to let everything exit as cleanly as
561 * possible. For each capability, we let its run queue drain, and
562 * allow the workers to stop.
564 * This function should be called when interrupted and
565 * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
566 * will exit the scheduler and call taskStop(), and any bound thread
567 * that wakes up will return to its caller. Runnable threads are
570 * ------------------------------------------------------------------------- */
573 shutdownCapability (Capability *cap, Task *task)
577 ASSERT(interrupted && shutting_down_scheduler);
581 for (i = 0; i < 50; i++) {
582 IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
583 ACQUIRE_LOCK(&cap->lock);
584 if (cap->running_task) {
585 RELEASE_LOCK(&cap->lock);
586 IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
590 cap->running_task = task;
591 if (!emptyRunQueue(cap) || cap->spare_workers) {
592 IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
593 releaseCapability_(cap); // this will wake up a worker
594 RELEASE_LOCK(&cap->lock);
598 IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
599 RELEASE_LOCK(&cap->lock);
602 // we now have the Capability, its run queue and spare workers
603 // list are both empty.
606 /* ----------------------------------------------------------------------------
609 * Attempt to gain control of a Capability if it is free.
611 * ------------------------------------------------------------------------- */
614 tryGrabCapability (Capability *cap, Task *task)
616 if (cap->running_task != NULL) return rtsFalse;
617 ACQUIRE_LOCK(&cap->lock);
618 if (cap->running_task != NULL) {
619 RELEASE_LOCK(&cap->lock);
623 cap->running_task = task;
624 RELEASE_LOCK(&cap->lock);
629 #endif /* THREADED_RTS */