1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2003-2005
7 * A Capability represents the token required to execute STG code,
8 * and all the state an OS thread/task needs to run Haskell code:
9 * its STG registers, a pointer to its TSO, a nursery etc. During
10 * STG execution, a pointer to the capability is kept in a
11 * register (BaseReg; actually it is a pointer to cap->r).
13 * Only in an SMP build will there be multiple capabilities; in
14 * the threaded RTS and other non-threaded builds, there is only
15 * one global capability, namely MainCapability.
17 * --------------------------------------------------------------------------*/
19 #include "PosixSource.h"
23 #include "OSThreads.h"
24 #include "Capability.h"
28 Capability MainCapability; // for non-SMP, we have one global capability
32 Capability *capabilities = NULL;
34 // Holds the Capability which last became free. This is used so that
35 // an in-call has a chance of quickly finding a free Capability.
36 // Maintaining a global free list of Capabilities would require global
37 // locking, so we don't do that.
38 Capability *last_free_capability;
41 #define UNUSED_IF_NOT_SMP
43 #define UNUSED_IF_NOT_SMP STG_UNUSED
46 #ifdef RTS_USER_SIGNALS
47 #define UNUSED_IF_NOT_THREADS
49 #define UNUSED_IF_NOT_THREADS STG_UNUSED
56 return blackholes_need_checking
58 #if defined(RTS_USER_SIGNALS)
64 #if defined(THREADED_RTS)
66 anyWorkForMe( Capability *cap, Task *task )
68 // If the run queue is not empty, then we only wake up the guy who
69 // can run the thread at the head, even if there is some other
70 // reason for this task to run (eg. interrupted=rtsTrue).
71 if (!emptyRunQueue(cap)) {
72 if (cap->run_queue_hd->bound == NULL) {
73 return (task->tso == NULL);
75 return (cap->run_queue_hd->bound == task);
78 return globalWorkToDo();
82 /* -----------------------------------------------------------------------------
83 * Manage the returning_tasks lists.
85 * These functions require cap->lock
86 * -------------------------------------------------------------------------- */
88 #if defined(THREADED_RTS)
90 newReturningTask (Capability *cap, Task *task)
92 ASSERT_LOCK_HELD(&cap->lock);
93 ASSERT(task->return_link == NULL);
94 if (cap->returning_tasks_hd) {
95 ASSERT(cap->returning_tasks_tl->return_link == NULL);
96 cap->returning_tasks_tl->return_link = task;
98 cap->returning_tasks_hd = task;
100 cap->returning_tasks_tl = task;
104 popReturningTask (Capability *cap)
106 ASSERT_LOCK_HELD(&cap->lock);
108 task = cap->returning_tasks_hd;
110 cap->returning_tasks_hd = task->return_link;
111 if (!cap->returning_tasks_hd) {
112 cap->returning_tasks_tl = NULL;
114 task->return_link = NULL;
119 /* ----------------------------------------------------------------------------
122 * The Capability is initially marked not free.
123 * ------------------------------------------------------------------------- */
126 initCapability( Capability *cap, nat i )
131 cap->in_haskell = rtsFalse;
133 cap->run_queue_hd = END_TSO_QUEUE;
134 cap->run_queue_tl = END_TSO_QUEUE;
136 #if defined(THREADED_RTS)
137 initMutex(&cap->lock);
138 cap->running_task = NULL; // indicates cap is free
139 cap->spare_workers = NULL;
140 cap->suspended_ccalling_tasks = NULL;
141 cap->returning_tasks_hd = NULL;
142 cap->returning_tasks_tl = NULL;
145 cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
146 cap->f.stgGCFun = (F_)__stg_gc_fun;
148 cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
149 RtsFlags.GcFlags.generations,
152 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
153 cap->mut_lists[g] = NULL;
157 /* ---------------------------------------------------------------------------
158 * Function: initCapabilities()
160 * Purpose: set up the Capability handling. For the SMP build,
161 * we keep a table of them, the size of which is
162 * controlled by the user via the RTS flag -N.
164 * ------------------------------------------------------------------------- */
166 initCapabilities( void )
171 n_capabilities = n = RtsFlags.ParFlags.nNodes;
172 capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities");
174 for (i = 0; i < n; i++) {
175 initCapability(&capabilities[i], i);
178 IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n));
181 capabilities = &MainCapability;
182 initCapability(&MainCapability, 0);
185 // There are no free capabilities to begin with. We will start
186 // a worker Task to each Capability, which will quickly put the
187 // Capability on the free list when it finds nothing to do.
188 last_free_capability = &capabilities[0];
191 /* ----------------------------------------------------------------------------
192 * Give a Capability to a Task. The task must currently be sleeping
193 * on its condition variable.
195 * Requires cap->lock (modifies cap->running_task).
197 * When migrating a Task, the migrater must take task->lock before
198 * modifying task->cap, to synchronise with the waking up Task.
199 * Additionally, the migrater should own the Capability (when
200 * migrating the run queue), or cap->lock (when migrating
201 * returning_workers).
203 * ------------------------------------------------------------------------- */
205 #if defined(THREADED_RTS)
207 giveCapabilityToTask (Capability *cap, Task *task)
209 ASSERT_LOCK_HELD(&cap->lock);
210 ASSERT(task->cap == cap);
211 // We are not modifying task->cap, so we do not need to take task->lock.
213 sched_belch("passing capability %d to %s %p",
214 cap->no, task->tso ? "bound task" : "worker",
216 ACQUIRE_LOCK(&task->lock);
217 task->wakeup = rtsTrue;
218 // the wakeup flag is needed because signalCondition() doesn't
219 // flag the condition if the thread is already runniing, but we want
221 signalCondition(&task->cond);
222 RELEASE_LOCK(&task->lock);
226 /* ----------------------------------------------------------------------------
227 * Function: releaseCapability(Capability*)
229 * Purpose: Letting go of a capability. Causes a
230 * 'returning worker' thread or a 'waiting worker'
231 * to wake up, in that order.
232 * ------------------------------------------------------------------------- */
234 #if defined(THREADED_RTS)
236 releaseCapability_ (Capability* cap)
240 task = cap->running_task;
242 ASSERT_CAPABILITY_INVARIANTS(cap,task);
244 cap->running_task = NULL;
246 // Check to see whether a worker thread can be given
247 // the go-ahead to return the result of an external call..
248 if (cap->returning_tasks_hd != NULL) {
249 giveCapabilityToTask(cap,cap->returning_tasks_hd);
250 // The Task pops itself from the queue (see waitForReturnCapability())
254 // If the next thread on the run queue is a bound thread,
255 // give this Capability to the appropriate Task.
256 if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
257 // Make sure we're not about to try to wake ourselves up
258 ASSERT(task != cap->run_queue_hd->bound);
259 task = cap->run_queue_hd->bound;
260 giveCapabilityToTask(cap,task);
264 // If we have an unbound thread on the run queue, or if there's
265 // anything else to do, give the Capability to a worker thread.
266 if (!emptyRunQueue(cap) || globalWorkToDo()) {
267 if (cap->spare_workers) {
268 giveCapabilityToTask(cap,cap->spare_workers);
269 // The worker Task pops itself from the queue;
273 // Create a worker thread if we don't have one. If the system
274 // is interrupted, we only create a worker task if there
275 // are threads that need to be completed. If the system is
276 // shutting down, we never create a new worker.
277 if (!shutting_down_scheduler) {
279 sched_belch("starting new worker on capability %d", cap->no));
280 startWorkerTask(cap, workerStart);
285 last_free_capability = cap;
286 IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
290 releaseCapability (Capability* cap UNUSED_IF_NOT_THREADS)
292 ACQUIRE_LOCK(&cap->lock);
293 releaseCapability_(cap);
294 RELEASE_LOCK(&cap->lock);
298 releaseCapabilityAndQueueWorker (Capability* cap UNUSED_IF_NOT_THREADS)
302 ACQUIRE_LOCK(&cap->lock);
304 task = cap->running_task;
306 // If the current task is a worker, save it on the spare_workers
307 // list of this Capability. A worker can mark itself as stopped,
308 // in which case it is not replaced on the spare_worker queue.
309 // This happens when the system is shutting down (see
310 // Schedule.c:workerStart()).
311 // Also, be careful to check that this task hasn't just exited
312 // Haskell to do a foreign call (task->suspended_tso).
313 if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
314 task->next = cap->spare_workers;
315 cap->spare_workers = task;
317 // Bound tasks just float around attached to their TSOs.
319 releaseCapability_(cap);
321 RELEASE_LOCK(&cap->lock);
325 /* ----------------------------------------------------------------------------
326 * waitForReturnCapability( Task *task )
328 * Purpose: when an OS thread returns from an external call,
329 * it calls waitForReturnCapability() (via Schedule.resumeThread())
330 * to wait for permission to enter the RTS & communicate the
331 * result of the external call back to the Haskell thread that
334 * ------------------------------------------------------------------------- */
336 waitForReturnCapability (Capability **pCap,
337 Task *task UNUSED_IF_NOT_THREADS)
339 #if !defined(THREADED_RTS)
341 MainCapability.running_task = task;
342 task->cap = &MainCapability;
343 *pCap = &MainCapability;
346 Capability *cap = *pCap;
349 // Try last_free_capability first
350 cap = last_free_capability;
351 if (!cap->running_task) {
353 // otherwise, search for a free capability
354 for (i = 0; i < n_capabilities; i++) {
355 cap = &capabilities[i];
356 if (!cap->running_task) {
360 // Can't find a free one, use last_free_capability.
361 cap = last_free_capability;
364 // record the Capability as the one this Task is now assocated with.
368 ASSERT(task->cap == cap);
371 ACQUIRE_LOCK(&cap->lock);
374 sched_belch("returning; I want capability %d", cap->no));
376 if (!cap->running_task) {
377 // It's free; just grab it
378 cap->running_task = task;
379 RELEASE_LOCK(&cap->lock);
381 newReturningTask(cap,task);
382 RELEASE_LOCK(&cap->lock);
385 ACQUIRE_LOCK(&task->lock);
386 // task->lock held, cap->lock not held
387 if (!task->wakeup) waitCondition(&task->cond, &task->lock);
389 task->wakeup = rtsFalse;
390 RELEASE_LOCK(&task->lock);
392 // now check whether we should wake up...
393 ACQUIRE_LOCK(&cap->lock);
394 if (cap->running_task == NULL) {
395 if (cap->returning_tasks_hd != task) {
396 giveCapabilityToTask(cap,cap->returning_tasks_hd);
397 RELEASE_LOCK(&cap->lock);
400 cap->running_task = task;
401 popReturningTask(cap);
402 RELEASE_LOCK(&cap->lock);
405 RELEASE_LOCK(&cap->lock);
410 ASSERT_CAPABILITY_INVARIANTS(cap,task);
413 sched_belch("returning; got capability %d", cap->no));
419 #if defined(THREADED_RTS)
420 /* ----------------------------------------------------------------------------
422 * ------------------------------------------------------------------------- */
425 yieldCapability (Capability** pCap, Task *task)
427 Capability *cap = *pCap;
429 // The fast path; no locking
430 if ( cap->returning_tasks_hd == NULL && anyWorkForMe(cap,task) )
433 while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
434 IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));
436 // We must now release the capability and wait to be woken up
438 releaseCapabilityAndQueueWorker(cap);
441 ACQUIRE_LOCK(&task->lock);
442 // task->lock held, cap->lock not held
443 if (!task->wakeup) waitCondition(&task->cond, &task->lock);
445 task->wakeup = rtsFalse;
446 RELEASE_LOCK(&task->lock);
448 IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
449 ACQUIRE_LOCK(&cap->lock);
450 if (cap->running_task != NULL) {
451 RELEASE_LOCK(&cap->lock);
455 if (task->tso == NULL) {
456 ASSERT(cap->spare_workers != NULL);
457 // if we're not at the front of the queue, release it
458 // again. This is unlikely to happen.
459 if (cap->spare_workers != task) {
460 giveCapabilityToTask(cap,cap->spare_workers);
461 RELEASE_LOCK(&cap->lock);
464 cap->spare_workers = task->next;
467 cap->running_task = task;
468 RELEASE_LOCK(&cap->lock);
472 IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
473 ASSERT(cap->running_task == task);
478 ASSERT_CAPABILITY_INVARIANTS(cap,task);
483 /* ----------------------------------------------------------------------------
486 * Used to indicate that the interrupted flag is now set, or some
487 * other global condition that might require waking up a Task on each
489 * ------------------------------------------------------------------------- */
492 prodCapabilities(rtsBool all)
498 for (i=0; i < n_capabilities; i++) {
499 cap = &capabilities[i];
500 ACQUIRE_LOCK(&cap->lock);
501 if (!cap->running_task) {
502 if (cap->spare_workers) {
503 task = cap->spare_workers;
504 ASSERT(!task->stopped);
505 giveCapabilityToTask(cap,task);
507 RELEASE_LOCK(&cap->lock);
512 RELEASE_LOCK(&cap->lock);
517 prodAllCapabilities (void)
519 prodCapabilities(rtsTrue);
522 /* ----------------------------------------------------------------------------
525 * Like prodAllCapabilities, but we only require a single Task to wake
526 * up in order to service some global event, such as checking for
527 * deadlock after some idle time has passed.
528 * ------------------------------------------------------------------------- */
531 prodOneCapability (void)
533 prodCapabilities(rtsFalse);
536 /* ----------------------------------------------------------------------------
539 * At shutdown time, we want to let everything exit as cleanly as
540 * possible. For each capability, we let its run queue drain, and
541 * allow the workers to stop.
543 * This function should be called when interrupted and
544 * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
545 * will exit the scheduler and call taskStop(), and any bound thread
546 * that wakes up will return to its caller. Runnable threads are
549 * ------------------------------------------------------------------------- */
552 shutdownCapability (Capability *cap, Task *task)
556 ASSERT(interrupted && shutting_down_scheduler);
560 for (i = 0; i < 50; i++) {
561 IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
562 ACQUIRE_LOCK(&cap->lock);
563 if (cap->running_task) {
564 RELEASE_LOCK(&cap->lock);
565 IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
569 cap->running_task = task;
570 if (!emptyRunQueue(cap) || cap->spare_workers) {
571 IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
572 releaseCapability_(cap); // this will wake up a worker
573 RELEASE_LOCK(&cap->lock);
577 IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
578 RELEASE_LOCK(&cap->lock);
581 // we now have the Capability, its run queue and spare workers
582 // list are both empty.
585 /* ----------------------------------------------------------------------------
588 * Attempt to gain control of a Capability if it is free.
590 * ------------------------------------------------------------------------- */
593 tryGrabCapability (Capability *cap, Task *task)
595 if (cap->running_task != NULL) return rtsFalse;
596 ACQUIRE_LOCK(&cap->lock);
597 if (cap->running_task != NULL) {
598 RELEASE_LOCK(&cap->lock);
602 cap->running_task = task;
603 RELEASE_LOCK(&cap->lock);
608 #endif /* THREADED_RTS */