1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2003-2005
 * A Capability represents the token required to execute STG code,
8 * and all the state an OS thread/task needs to run Haskell code:
9 * its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 * Only in an SMP build will there be multiple capabilities; for
 * the threaded RTS and other non-threaded builds, there is only
 * one global capability, namely MainCapability.
17 * --------------------------------------------------------------------------*/
19 #include "PosixSource.h"
24 #include "OSThreads.h"
25 #include "Capability.h"
30 Capability MainCapability; // for non-SMP, we have one global capability
34 Capability *capabilities = NULL;
36 // Holds the Capability which last became free. This is used so that
37 // an in-call has a chance of quickly finding a free Capability.
38 // Maintaining a global free list of Capabilities would require global
39 // locking, so we don't do that.
40 Capability *last_free_capability;
43 #define UNUSED_IF_NOT_SMP
45 #define UNUSED_IF_NOT_SMP STG_UNUSED
48 #ifdef RTS_USER_SIGNALS
49 #define UNUSED_IF_NOT_THREADS
51 #define UNUSED_IF_NOT_THREADS STG_UNUSED
58 return blackholes_need_checking
60 #if defined(RTS_USER_SIGNALS)
66 #if defined(THREADED_RTS)
// Decide whether there is any work on Capability 'cap' that 'task'
// is allowed to perform.  (THREADED_RTS only.)
anyWorkForMe( Capability *cap, Task *task )
    // If the run queue is not empty, then we only wake up the task who
    // can run the thread at the head, even if there is some other
    // reason for this task to run (eg. interrupted=rtsTrue).
    if (!emptyRunQueue(cap)) {
        if (cap->run_queue_hd->bound == NULL) {
            // head thread is unbound: only a worker (task->tso == NULL) may run it
            return (task->tso == NULL);
            // head thread is bound: only the Task it is bound to may run it
            return (cap->run_queue_hd->bound == task);
    } else if (task->tso == NULL && !emptySparkPoolCap(cap)) {
    // otherwise fall back to the global conditions (interrupts, signals, ...)
    return globalWorkToDo();
86 /* -----------------------------------------------------------------------------
87 * Manage the returning_tasks lists.
89 * These functions require cap->lock
90 * -------------------------------------------------------------------------- */
92 #if defined(THREADED_RTS)
// Append 'task' to cap's returning_tasks queue (a FIFO linked through
// task->return_link).  Requires cap->lock.
newReturningTask (Capability *cap, Task *task)
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->return_link == NULL);  // task must not already be queued
    if (cap->returning_tasks_hd) {
        // non-empty queue: link the new task after the current tail
        ASSERT(cap->returning_tasks_tl->return_link == NULL);
        cap->returning_tasks_tl->return_link = task;
        // empty queue: the new task becomes the head
        // NOTE(review): the 'else' for this branch appears elided in this copy
        cap->returning_tasks_hd = task;
    // in either case the new task is the new tail
    cap->returning_tasks_tl = task;
// Remove and return the Task at the head of cap's returning_tasks
// queue.  Requires cap->lock.
popReturningTask (Capability *cap)
    ASSERT_LOCK_HELD(&cap->lock);
    task = cap->returning_tasks_hd;
    cap->returning_tasks_hd = task->return_link;
    if (!cap->returning_tasks_hd) {
        // queue is now empty: clear the tail pointer too
        cap->returning_tasks_tl = NULL;
    // detach the popped task from the queue
    task->return_link = NULL;
123 /* ----------------------------------------------------------------------------
126 * The Capability is initially marked not free.
127 * ------------------------------------------------------------------------- */
// Initialise Capability number 'i': empty run queue, per-generation
// mutable lists, STM free lists, and (THREADED_RTS) its lock and task
// queues.  The Capability is initially marked not free.
initCapability( Capability *cap, nat i )
    cap->in_haskell = rtsFalse;
    cap->run_queue_hd = END_TSO_QUEUE;
    cap->run_queue_tl = END_TSO_QUEUE;
#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task = NULL; // indicates cap is free
    cap->spare_workers = NULL;
    cap->suspended_ccalling_tasks = NULL;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
    // pre-compiled GC entry points, kept in the capability for fast access
    cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
    cap->f.stgGCFun = (F_)__stg_gc_fun;
    // one mutable list per GC generation
    cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
                                    RtsFlags.GcFlags.generations,
    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        cap->mut_lists[g] = NULL;
    // STM per-capability free lists start empty
    cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
    cap->free_trec_chunks = END_STM_CHUNK_LIST;
    cap->free_trec_headers = NO_TREC;
    cap->transaction_tokens = 0;
166 /* ---------------------------------------------------------------------------
167 * Function: initCapabilities()
169 * Purpose: set up the Capability handling. For the SMP build,
170 * we keep a table of them, the size of which is
171 * controlled by the user via the RTS flag -N.
173 * ------------------------------------------------------------------------- */
// Set up the Capability handling.  For the SMP build we allocate a
// table of Capabilities sized by the -N RTS flag; otherwise the single
// global MainCapability is used.
// NOTE(review): the #if/#else separating the two cases appears elided
// in this copy of the file.
initCapabilities( void )
    // number of capabilities requested via +RTS -N
    n_capabilities = n = RtsFlags.ParFlags.nNodes;
    capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities");
    for (i = 0; i < n; i++) {
        initCapability(&capabilities[i], i);
    IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n));
    // non-SMP case: point 'capabilities' at the single global Capability
    capabilities = &MainCapability;
    initCapability(&MainCapability, 0);
    // There are no free capabilities to begin with.  We will start
    // a worker Task to each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
    last_free_capability = &capabilities[0];
200 /* ----------------------------------------------------------------------------
201 * Give a Capability to a Task. The task must currently be sleeping
202 * on its condition variable.
204 * Requires cap->lock (modifies cap->running_task).
206 * When migrating a Task, the migrater must take task->lock before
207 * modifying task->cap, to synchronise with the waking up Task.
208 * Additionally, the migrater should own the Capability (when
209 * migrating the run queue), or cap->lock (when migrating
210 * returning_workers).
212 * ------------------------------------------------------------------------- */
214 #if defined(THREADED_RTS)
// Hand the Capability to 'task' (which must already have task->cap ==
// cap) and wake it up.  Requires cap->lock; the task must be sleeping
// on its condition variable.
giveCapabilityToTask (Capability *cap, Task *task)
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    sched_belch("passing capability %d to %s %p",
                cap->no, task->tso ? "bound task" : "worker",
    ACQUIRE_LOCK(&task->lock);
    task->wakeup = rtsTrue;
    // the wakeup flag is needed because signalCondition() doesn't
    // flag the condition if the thread is already running, but we want
    // the wakeup to be sticky so the task notices it on its next wait
    signalCondition(&task->cond);
    RELEASE_LOCK(&task->lock);
234 /* ----------------------------------------------------------------------------
235 * Function: releaseCapability(Capability*)
237 * Purpose: Letting go of a capability. Causes a
238 * 'returning worker' thread or a 'waiting worker'
239 * to wake up, in that order.
240 * ------------------------------------------------------------------------- */
242 #if defined(THREADED_RTS)
// Let go of a Capability (lock already held).  Wakes, in priority
// order: a returning worker, the bound Task of the thread at the head
// of the run queue, or a spare/new worker if there is work to do.
// Otherwise the Capability is recorded as free.
releaseCapability_ (Capability* cap)
    task = cap->running_task;
    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
    cap->running_task = NULL;  // mark the capability as free
    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call..
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
        // Make sure we're not about to try to wake ourselves up
        ASSERT(task != cap->run_queue_hd->bound);
        task = cap->run_queue_hd->bound;
        giveCapabilityToTask(cap,task);
    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue;
        // Create a worker thread if we don't have one.  If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed.  If the system is
        // shutting down, we never create a new worker.
        if (!shutting_down_scheduler) {
            sched_belch("starting new worker on capability %d", cap->no));
            startWorkerTask(cap, workerStart);
    // nobody wants the capability: remember it as the last one freed
    last_free_capability = cap;
    IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
// Public wrapper: release a Capability, taking cap->lock around
// releaseCapability_().
releaseCapability (Capability* cap UNUSED_IF_NOT_THREADS)
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap);
    RELEASE_LOCK(&cap->lock);
// Release the Capability, first pushing the current (worker) Task onto
// the Capability's spare_workers list so it can be woken later.
releaseCapabilityAndQueueWorker (Capability* cap UNUSED_IF_NOT_THREADS)
    ACQUIRE_LOCK(&cap->lock);
    task = cap->running_task;
    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not replaced on the spare_worker queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    // Also, be careful to check that this task hasn't just exited
    // Haskell to do a foreign call (task->suspended_tso).
    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
        task->next = cap->spare_workers;
        cap->spare_workers = task;
    // Bound tasks just float around attached to their TSOs.
    releaseCapability_(cap);
    RELEASE_LOCK(&cap->lock);
333 /* ----------------------------------------------------------------------------
334 * waitForReturnCapability( Task *task )
336 * Purpose: when an OS thread returns from an external call,
337 * it calls waitForReturnCapability() (via Schedule.resumeThread())
338 * to wait for permission to enter the RTS & communicate the
339 * result of the external call back to the Haskell thread that
342 * ------------------------------------------------------------------------- */
// Called when an OS thread returns from an external call (via
// Schedule.resumeThread()): wait for permission to re-enter the RTS
// and communicate the call's result back to the Haskell thread.
// On success, *pCap is the Capability now owned by 'task'.
waitForReturnCapability (Capability **pCap,
                         Task *task UNUSED_IF_NOT_THREADS)
#if !defined(THREADED_RTS)
    // non-threaded RTS: there is only MainCapability and no contention
    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;
    Capability *cap = *pCap;
    // Try last_free_capability first
    cap = last_free_capability;
    if (!cap->running_task) {
        // otherwise, search for a free capability
        for (i = 0; i < n_capabilities; i++) {
            cap = &capabilities[i];
            if (!cap->running_task) {
        // Can't find a free one, use last_free_capability.
        cap = last_free_capability;
    // record the Capability as the one this Task is now associated with.
    ASSERT(task->cap == cap);
    ACQUIRE_LOCK(&cap->lock);
    sched_belch("returning; I want capability %d", cap->no));
    if (!cap->running_task) {
        // It's free; just grab it
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
        // busy: queue ourselves and sleep until the capability is handed over
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);
        ACQUIRE_LOCK(&task->lock);
        // task->lock held, cap->lock not held
        if (!task->wakeup) waitCondition(&task->cond, &task->lock);
        task->wakeup = rtsFalse;  // consume the (sticky) wakeup flag
        RELEASE_LOCK(&task->lock);
        // now check whether we should wake up...
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task == NULL) {
            if (cap->returning_tasks_hd != task) {
                // not our turn yet: pass the capability to the rightful
                // head of the returning queue and go back to sleep
                giveCapabilityToTask(cap,cap->returning_tasks_hd);
                RELEASE_LOCK(&cap->lock);
            // our turn: take ownership and dequeue ourselves
            cap->running_task = task;
            popReturningTask(cap);
            RELEASE_LOCK(&cap->lock);
        RELEASE_LOCK(&cap->lock);
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    sched_belch("returning; got capability %d", cap->no));
427 #if defined(THREADED_RTS)
428 /* ----------------------------------------------------------------------------
430 * ------------------------------------------------------------------------- */
// Give up the Capability if someone else needs it, and wait until we
// can usefully reacquire one.  On return, *pCap is owned by 'task'.
yieldCapability (Capability** pCap, Task *task)
    Capability *cap = *pCap;
    // The fast path has no locking: if we don't enter this while loop
    // we keep the capability we already hold
    while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
        IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));
        // We must now release the capability and wait to be woken up
        task->wakeup = rtsFalse;
        releaseCapabilityAndQueueWorker(cap);
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            task->wakeup = rtsFalse;  // consume the wakeup flag
            RELEASE_LOCK(&task->lock);
            IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task != NULL) {
                // lost the race: the capability was grabbed by someone else
                IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no));
                RELEASE_LOCK(&cap->lock);
            if (task->tso == NULL) {
                // we are a worker, so we queued ourselves on spare_workers
                ASSERT(cap->spare_workers != NULL);
                // if we're not at the front of the queue, release it
                // again.  This is unlikely to happen.
                if (cap->spare_workers != task) {
                    giveCapabilityToTask(cap,cap->spare_workers);
                    RELEASE_LOCK(&cap->lock);
                // pop ourselves off the spare_workers list
                cap->spare_workers = task->next;
            cap->running_task = task;  // take ownership
            RELEASE_LOCK(&cap->lock);
        IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
        ASSERT(cap->running_task == task);
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
491 /* ----------------------------------------------------------------------------
494 * Used to indicate that the interrupted flag is now set, or some
495 * other global condition that might require waking up a Task on each
497 * ------------------------------------------------------------------------- */
// Wake up a spare worker on free Capabilities so that some global
// condition (e.g. the interrupted flag) is noticed.  If 'all' is
// rtsFalse, we stop after prodding a single Capability.
prodCapabilities(rtsBool all)
    for (i=0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        ACQUIRE_LOCK(&cap->lock);
        if (!cap->running_task) {
            if (cap->spare_workers) {
                task = cap->spare_workers;
                ASSERT(!task->stopped);  // stopped workers should not be queued
                giveCapabilityToTask(cap,task);
            RELEASE_LOCK(&cap->lock);
        RELEASE_LOCK(&cap->lock);
// Wake up a Task on every free Capability (e.g. when the interrupted
// flag has been set).
prodAllCapabilities (void)
    prodCapabilities(rtsTrue);
530 /* ----------------------------------------------------------------------------
533 * Like prodAllCapabilities, but we only require a single Task to wake
534 * up in order to service some global event, such as checking for
535 * deadlock after some idle time has passed.
536 * ------------------------------------------------------------------------- */
// Like prodAllCapabilities, but only a single Task needs to wake up to
// service a global event (e.g. deadlock detection after idle time).
prodOneCapability (void)
    prodCapabilities(rtsFalse);
544 /* ----------------------------------------------------------------------------
547 * At shutdown time, we want to let everything exit as cleanly as
548 * possible. For each capability, we let its run queue drain, and
549 * allow the workers to stop.
551 * This function should be called when interrupted and
552 * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
553 * will exit the scheduler and call taskStop(), and any bound thread
554 * that wakes up will return to its caller. Runnable threads are
557 * ------------------------------------------------------------------------- */
// At shutdown time, acquire the Capability and let its run queue drain
// and its workers stop.  Must be called with interrupted and
// shutting_down_scheduler both set.  Retries up to 50 times, yielding
// between attempts.
shutdownCapability (Capability *cap, Task *task)
    ASSERT(interrupted && shutting_down_scheduler);
    for (i = 0; i < 50; i++) {
        IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            // someone else owns the capability: back off and retry
            RELEASE_LOCK(&cap->lock);
            IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
        cap->running_task = task;
        if (!emptyRunQueue(cap) || cap->spare_workers) {
            // still work or workers left: let them finish, then retry
            IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
            releaseCapability_(cap); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
        IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
        RELEASE_LOCK(&cap->lock);
    // we now have the Capability, its run queue and spare workers
    // list are both empty.
593 /* ----------------------------------------------------------------------------
596 * Attempt to gain control of a Capability if it is free.
598 * ------------------------------------------------------------------------- */
// Attempt to gain control of a Capability if it is free; returns
// rtsFalse without blocking if another Task owns it.
tryGrabCapability (Capability *cap, Task *task)
    // unlocked fast path: bail out immediately if the cap looks busy
    if (cap->running_task != NULL) return rtsFalse;
    ACQUIRE_LOCK(&cap->lock);
    // re-check under the lock: someone may have grabbed it meanwhile
    if (cap->running_task != NULL) {
        RELEASE_LOCK(&cap->lock);
    cap->running_task = task;  // take ownership
    RELEASE_LOCK(&cap->lock);
616 #endif /* THREADED_RTS */