1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2003-2005
7 * A Capability represents the token required to execute STG code,
8 * and all the state an OS thread/task needs to run Haskell code:
9 * its STG registers, a pointer to its TSO, a nursery etc. During
10 * STG execution, a pointer to the capability is kept in a
11 * register (BaseReg; actually it is a pointer to cap->r).
13 * Only in an SMP build will there be multiple capabilities, for
14 * the threaded RTS and other non-threaded builds, there is only
15 * one global capability, namely MainCapability.
17 * --------------------------------------------------------------------------*/
19 #include "PosixSource.h"
23 #include "OSThreads.h"
24 #include "Capability.h"
// Global Capability state.
// NOTE(review): this listing elides lines, so the surrounding
// #if THREADED_RTS / #else structure around these definitions is not visible.
29 Capability MainCapability; // for non-SMP, we have one global capability
// Table of all Capabilities in a threaded/SMP build; allocated and sized
// in initCapabilities() (presumably n_capabilities entries — see the loops
// over n_capabilities below).
33 Capability *capabilities = NULL;
35 // Holds the Capability which last became free. This is used so that
36 // an in-call has a chance of quickly finding a free Capability.
37 // Maintaining a global free list of Capabilities would require global
38 // locking, so we don't do that.
39 Capability *last_free_capability;
42 #define UNUSED_IF_NOT_SMP
44 #define UNUSED_IF_NOT_SMP STG_UNUSED
47 #ifdef RTS_USER_SIGNALS
48 #define UNUSED_IF_NOT_THREADS
50 #define UNUSED_IF_NOT_THREADS STG_UNUSED
57 return blackholes_need_checking
59 #if defined(RTS_USER_SIGNALS)
65 #if defined(THREADED_RTS)
// Is there work on this Capability that this particular Task could run?
// Boolean result (return-type line elided in this listing).
// Called without cap->lock on the yieldCapability() fast path, so the
// answer is advisory and may be stale.
67 anyWorkForMe( Capability *cap, Task *task )
69 // If the run queue is not empty, then we only wake up the guy who
70 // can run the thread at the head, even if there is some other
71 // reason for this task to run (eg. interrupted=rtsTrue).
72 if (!emptyRunQueue(cap)) {
// An unbound thread at the head can only be run by a worker
// (a Task with no TSO of its own).
73 if (cap->run_queue_hd->bound == NULL) {
74 return (task->tso == NULL);
// A bound thread may only be run by the Task it is bound to.
76 return (cap->run_queue_hd->bound == task);
// No runnable threads: a worker can still usefully run sparks.
78 } else if (task->tso == NULL && !emptySparkPoolCap(cap)) {
// Otherwise, only global conditions (GC, signals, ...) make work.
81 return globalWorkToDo();
85 /* -----------------------------------------------------------------------------
86 * Manage the returning_tasks lists.
88 * These functions require cap->lock
89 * -------------------------------------------------------------------------- */
91 #if defined(THREADED_RTS)
// Append task to the tail of cap's returning_tasks queue (a singly-linked
// list threaded through task->return_link).  Requires cap->lock.
93 newReturningTask (Capability *cap, Task *task)
95 ASSERT_LOCK_HELD(&cap->lock);
96 ASSERT(task->return_link == NULL);
// Non-empty queue: link the new task after the current tail.
97 if (cap->returning_tasks_hd) {
98 ASSERT(cap->returning_tasks_tl->return_link == NULL);
99 cap->returning_tasks_tl->return_link = task;
// Empty queue (else branch; directive lines elided): task becomes the head.
101 cap->returning_tasks_hd = task;
// In both cases the new task is now the tail.
103 cap->returning_tasks_tl = task;
// Remove and (presumably — return line elided in this listing) return the
// Task at the head of cap's returning_tasks queue.  Requires cap->lock.
107 popReturningTask (Capability *cap)
109 ASSERT_LOCK_HELD(&cap->lock);
111 task = cap->returning_tasks_hd;
// Advance the head past the popped task.
113 cap->returning_tasks_hd = task->return_link;
// Queue now empty: clear the tail pointer too.
114 if (!cap->returning_tasks_hd) {
115 cap->returning_tasks_tl = NULL;
// Detach the popped task from the list.
117 task->return_link = NULL;
122 /* ----------------------------------------------------------------------------
125 * The Capability is initially marked not free.
126 * ------------------------------------------------------------------------- */
// Initialise Capability number i.  The Capability is initially marked
// not free (see the comment block above, original lines 122-126).
129 initCapability( Capability *cap, nat i )
134 cap->in_haskell = rtsFalse;
// Empty run queue.
136 cap->run_queue_hd = END_TSO_QUEUE;
137 cap->run_queue_tl = END_TSO_QUEUE;
139 #if defined(THREADED_RTS)
140 initMutex(&cap->lock);
141 cap->running_task = NULL; // indicates cap is free
142 cap->spare_workers = NULL;
143 cap->suspended_ccalling_tasks = NULL;
// Empty returning_tasks queue (see newReturningTask/popReturningTask).
144 cap->returning_tasks_hd = NULL;
145 cap->returning_tasks_tl = NULL;
// Cache the GC entry points in the Capability's register table.
148 cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
149 cap->f.stgGCFun = (F_)__stg_gc_fun;
// One mutable list per generation for this Capability.
151 cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
152 RtsFlags.GcFlags.generations,
155 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
156 cap->mut_lists[g] = NULL;
160 /* ---------------------------------------------------------------------------
161 * Function: initCapabilities()
163 * Purpose: set up the Capability handling. For the SMP build,
164 * we keep a table of them, the size of which is
165 * controlled by the user via the RTS flag -N.
167 * ------------------------------------------------------------------------- */
// Set up Capability handling at RTS startup.  In the threaded build the
// number of Capabilities comes from the -N RTS flag; otherwise there is
// just the single global MainCapability.  (The #if THREADED_RTS / #else /
// #endif directives separating the two paths are elided in this listing.)
169 initCapabilities( void )
// Threaded path: allocate and initialise an array of n Capabilities.
174 n_capabilities = n = RtsFlags.ParFlags.nNodes;
175 capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities");
177 for (i = 0; i < n; i++) {
178 initCapability(&capabilities[i], i);
181 IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n));
// Non-threaded path: "capabilities" just points at MainCapability.
184 capabilities = &MainCapability;
185 initCapability(&MainCapability, 0);
188 // There are no free capabilities to begin with. We will start
189 // a worker Task to each Capability, which will quickly put the
190 // Capability on the free list when it finds nothing to do.
191 last_free_capability = &capabilities[0];
194 /* ----------------------------------------------------------------------------
195 * Give a Capability to a Task. The task must currently be sleeping
196 * on its condition variable.
198 * Requires cap->lock (modifies cap->running_task).
200 * When migrating a Task, the migrator must take task->lock before
201 * modifying task->cap, to synchronise with the waking up Task.
202 * Additionally, the migrator should own the Capability (when
203 * migrating the run queue), or cap->lock (when migrating
204 * returning_workers).
206 * ------------------------------------------------------------------------- */
208 #if defined(THREADED_RTS)
// Hand cap to a Task that is sleeping on its condition variable.
// Requires cap->lock; task->cap must already point at cap.
210 giveCapabilityToTask (Capability *cap, Task *task)
212 ASSERT_LOCK_HELD(&cap->lock);
213 ASSERT(task->cap == cap);
214 // We are not modifying task->cap, so we do not need to take task->lock.
216 sched_belch("passing capability %d to %s %p",
217 cap->no, task->tso ? "bound task" : "worker",
// Take task->lock so setting the flag and signalling the condition are
// atomic with respect to the sleeping Task's waitCondition().
219 ACQUIRE_LOCK(&task->lock);
220 task->wakeup = rtsTrue;
221 // the wakeup flag is needed because signalCondition() doesn't
222 // flag the condition if the thread is already running, but we want
224 signalCondition(&task->cond);
225 RELEASE_LOCK(&task->lock);
229 /* ----------------------------------------------------------------------------
230 * Function: releaseCapability(Capability*)
232 * Purpose: Letting go of a capability. Causes a
233 * 'returning worker' thread or a 'waiting worker'
234 * to wake up, in that order.
235 * ------------------------------------------------------------------------- */
237 #if defined(THREADED_RTS)
// Let go of cap, waking up (in priority order) a returning worker, the
// Task bound to the thread at the head of the run queue, or a spare
// worker.  Caller holds cap->lock (lock-free wrapper: releaseCapability()).
239 releaseCapability_ (Capability* cap)
243 task = cap->running_task;
245 ASSERT_CAPABILITY_INVARIANTS(cap,task);
// Mark the Capability free before deciding who gets it next.
247 cap->running_task = NULL;
249 // Check to see whether a worker thread can be given
250 // the go-ahead to return the result of an external call..
251 if (cap->returning_tasks_hd != NULL) {
252 giveCapabilityToTask(cap,cap->returning_tasks_hd);
253 // The Task pops itself from the queue (see waitForReturnCapability())
257 // If the next thread on the run queue is a bound thread,
258 // give this Capability to the appropriate Task.
259 if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
260 // Make sure we're not about to try to wake ourselves up
261 ASSERT(task != cap->run_queue_hd->bound);
262 task = cap->run_queue_hd->bound;
263 giveCapabilityToTask(cap,task);
267 // If we have an unbound thread on the run queue, or if there's
268 // anything else to do, give the Capability to a worker thread.
269 if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
270 if (cap->spare_workers) {
271 giveCapabilityToTask(cap,cap->spare_workers);
272 // The worker Task pops itself from the queue;
276 // Create a worker thread if we don't have one. If the system
277 // is interrupted, we only create a worker task if there
278 // are threads that need to be completed. If the system is
279 // shutting down, we never create a new worker.
280 if (!shutting_down_scheduler) {
282 sched_belch("starting new worker on capability %d", cap->no));
283 startWorkerTask(cap, workerStart);
// Nobody wanted it: record cap as the last freed Capability so an
// in-call can find it quickly (see last_free_capability above).
288 last_free_capability = cap;
289 IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
// Public wrapper for releaseCapability_(): takes cap->lock around the call.
293 releaseCapability (Capability* cap UNUSED_IF_NOT_THREADS)
295 ACQUIRE_LOCK(&cap->lock);
296 releaseCapability_(cap);
297 RELEASE_LOCK(&cap->lock);
// Release cap, and if the current Task is an unstopped worker that is not
// in a foreign call, push it onto cap's spare_workers list first so it can
// be woken to run future work.
301 releaseCapabilityAndQueueWorker (Capability* cap UNUSED_IF_NOT_THREADS)
305 ACQUIRE_LOCK(&cap->lock);
307 task = cap->running_task;
309 // If the current task is a worker, save it on the spare_workers
310 // list of this Capability. A worker can mark itself as stopped,
311 // in which case it is not replaced on the spare_worker queue.
312 // This happens when the system is shutting down (see
313 // Schedule.c:workerStart()).
314 // Also, be careful to check that this task hasn't just exited
315 // Haskell to do a foreign call (task->suspended_tso).
316 if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
// Push onto the front of the spare_workers list (LIFO, linked via
// task->next).
317 task->next = cap->spare_workers;
318 cap->spare_workers = task;
320 // Bound tasks just float around attached to their TSOs.
322 releaseCapability_(cap);
324 RELEASE_LOCK(&cap->lock);
328 /* ----------------------------------------------------------------------------
329 * waitForReturnCapability( Task *task )
331 * Purpose: when an OS thread returns from an external call,
332 * it calls waitForReturnCapability() (via Schedule.resumeThread())
333 * to wait for permission to enter the RTS & communicate the
334 * result of the external call back to the Haskell thread that
337 * ------------------------------------------------------------------------- */
// Called when an OS thread returns from an external call and needs a
// Capability (and permission) to re-enter the RTS.  On exit, *pCap is the
// Capability this Task now owns.
339 waitForReturnCapability (Capability **pCap,
340 Task *task UNUSED_IF_NOT_THREADS)
// Non-threaded build: MainCapability is the only Capability; just claim it.
342 #if !defined(THREADED_RTS)
344 MainCapability.running_task = task;
345 task->cap = &MainCapability;
346 *pCap = &MainCapability;
// Threaded build (directive elided): pick a Capability if the caller
// didn't specify one.
349 Capability *cap = *pCap;
352 // Try last_free_capability first
353 cap = last_free_capability;
354 if (!cap->running_task) {
356 // otherwise, search for a free capability
357 for (i = 0; i < n_capabilities; i++) {
358 cap = &capabilities[i];
359 if (!cap->running_task) {
363 // Can't find a free one, use last_free_capability.
364 cap = last_free_capability;
367 // record the Capability as the one this Task is now associated with.
371 ASSERT(task->cap == cap);
374 ACQUIRE_LOCK(&cap->lock);
377 sched_belch("returning; I want capability %d", cap->no));
379 if (!cap->running_task) {
380 // It's free; just grab it
381 cap->running_task = task;
382 RELEASE_LOCK(&cap->lock);
// Otherwise queue ourselves on cap's returning_tasks list and sleep
// until a releaseCapability_() hands the Capability to us.
384 newReturningTask(cap,task);
385 RELEASE_LOCK(&cap->lock);
// Sleep loop (loop construct elided in this listing): wait until our
// wakeup flag is set; the flag guards against a lost signalCondition().
388 ACQUIRE_LOCK(&task->lock);
389 // task->lock held, cap->lock not held
390 if (!task->wakeup) waitCondition(&task->cond, &task->lock);
392 task->wakeup = rtsFalse;
393 RELEASE_LOCK(&task->lock);
395 // now check whether we should wake up...
396 ACQUIRE_LOCK(&cap->lock);
397 if (cap->running_task == NULL) {
// Spurious wakeup for us: some other returning task is first in the
// queue; pass the Capability on and go back to sleep.
398 if (cap->returning_tasks_hd != task) {
399 giveCapabilityToTask(cap,cap->returning_tasks_hd);
400 RELEASE_LOCK(&cap->lock);
// We are at the head: claim the Capability and dequeue ourselves.
403 cap->running_task = task;
404 popReturningTask(cap);
405 RELEASE_LOCK(&cap->lock);
408 RELEASE_LOCK(&cap->lock);
413 ASSERT_CAPABILITY_INVARIANTS(cap,task);
416 sched_belch("returning; got capability %d", cap->no));
422 #if defined(THREADED_RTS)
423 /* ----------------------------------------------------------------------------
425  * ------------------------------------------------------------------------- */
// Give up the Capability if someone else needs it more, and wait until
// there is work here for this Task again.  *pCap is the Task's Capability
// on both entry and exit.
428 yieldCapability (Capability** pCap, Task *task)
430 Capability *cap = *pCap;
432 // The fast path; no locking
433 if ( cap->returning_tasks_hd == NULL && anyWorkForMe(cap,task) )
436 while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
437 IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));
439 // We must now release the capability and wait to be woken up
// This queues us on cap->spare_workers (if we're a worker) and wakes
// whoever should run next.
441 releaseCapabilityAndQueueWorker(cap);
// Sleep until our wakeup flag is set (guards against a lost signal —
// see giveCapabilityToTask()).
444 ACQUIRE_LOCK(&task->lock);
445 // task->lock held, cap->lock not held
446 if (!task->wakeup) waitCondition(&task->cond, &task->lock);
448 task->wakeup = rtsFalse;
449 RELEASE_LOCK(&task->lock);
451 IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
452 ACQUIRE_LOCK(&cap->lock);
// Someone else beat us to the Capability: go round the loop again.
453 if (cap->running_task != NULL) {
454 RELEASE_LOCK(&cap->lock);
// Worker Task (no TSO): we should be at the front of spare_workers.
458 if (task->tso == NULL) {
459 ASSERT(cap->spare_workers != NULL);
460 // if we're not at the front of the queue, release it
461 // again. This is unlikely to happen.
462 if (cap->spare_workers != task) {
463 giveCapabilityToTask(cap,cap->spare_workers);
464 RELEASE_LOCK(&cap->lock);
// Pop ourselves off the spare_workers list.
467 cap->spare_workers = task->next;
// Claim the Capability.
470 cap->running_task = task;
471 RELEASE_LOCK(&cap->lock);
475 IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
476 ASSERT(cap->running_task == task);
481 ASSERT_CAPABILITY_INVARIANTS(cap,task);
486 /* ----------------------------------------------------------------------------
489 * Used to indicate that the interrupted flag is now set, or some
490 * other global condition that might require waking up a Task on each
492 * ------------------------------------------------------------------------- */
// Wake a spare worker on each free Capability so it can notice some
// global condition (e.g. the interrupted flag).  If all is rtsFalse,
// presumably only one Capability is prodded (the early-exit lines for
// that case are elided in this listing) — see prodOneCapability().
495 prodCapabilities(rtsBool all)
501 for (i=0; i < n_capabilities; i++) {
502 cap = &capabilities[i];
503 ACQUIRE_LOCK(&cap->lock);
// Only a free Capability with a spare worker can be handed out.
504 if (!cap->running_task) {
505 if (cap->spare_workers) {
506 task = cap->spare_workers;
507 ASSERT(!task->stopped);
508 giveCapabilityToTask(cap,task);
510 RELEASE_LOCK(&cap->lock);
515 RELEASE_LOCK(&cap->lock);
// Prod every Capability — used when a global condition (such as
// interrupted=rtsTrue) may require waking a Task on each one.
520 prodAllCapabilities (void)
522 prodCapabilities(rtsTrue);
525 /* ----------------------------------------------------------------------------
528 * Like prodAllCapabilities, but we only require a single Task to wake
529 * up in order to service some global event, such as checking for
530 * deadlock after some idle time has passed.
531 * ------------------------------------------------------------------------- */
// Like prodAllCapabilities, but only a single Task needs to wake up to
// service the global event (e.g. deadlock detection after idle time).
534 prodOneCapability (void)
536 prodCapabilities(rtsFalse);
539 /* ----------------------------------------------------------------------------
542 * At shutdown time, we want to let everything exit as cleanly as
543 * possible. For each capability, we let its run queue drain, and
544 * allow the workers to stop.
546 * This function should be called when interrupted and
547 * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
548 * will exit the scheduler and call taskStop(), and any bound thread
549 * that wakes up will return to its caller. Runnable threads are
552 * ------------------------------------------------------------------------- */
// At shutdown: repeatedly try to acquire cap and let its run queue drain
// and its workers exit.  Bounded at 50 attempts; each failed attempt
// presumably yields the OS thread before retrying (the yield call lines
// are elided in this listing).
555 shutdownCapability (Capability *cap, Task *task)
// Must only be called once the scheduler is being torn down.
559 ASSERT(interrupted && shutting_down_scheduler);
563 for (i = 0; i < 50; i++) {
564 IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
565 ACQUIRE_LOCK(&cap->lock);
// Someone else owns the Capability: back off and retry.
566 if (cap->running_task) {
567 RELEASE_LOCK(&cap->lock);
568 IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
572 cap->running_task = task;
// Still runnable threads or live workers: release the Capability so
// they can finish, then retry.
573 if (!emptyRunQueue(cap) || cap->spare_workers) {
574 IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
575 releaseCapability_(cap); // this will wake up a worker
576 RELEASE_LOCK(&cap->lock);
580 IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
581 RELEASE_LOCK(&cap->lock);
584 // we now have the Capability, its run queue and spare workers
585 // list are both empty.
588 /* ----------------------------------------------------------------------------
591 * Attempt to gain control of a Capability if it is free.
593 * ------------------------------------------------------------------------- */
// Attempt to gain control of cap without blocking.  Boolean result
// (rtsTrue on success; the success-path return lines are elided in this
// listing).  The unlocked check is a cheap fast-path; it is re-checked
// under cap->lock before claiming.
596 tryGrabCapability (Capability *cap, Task *task)
598 if (cap->running_task != NULL) return rtsFalse;
599 ACQUIRE_LOCK(&cap->lock);
// Re-check under the lock: someone may have grabbed it meanwhile.
600 if (cap->running_task != NULL) {
601 RELEASE_LOCK(&cap->lock);
605 cap->running_task = task;
606 RELEASE_LOCK(&cap->lock);
611 #endif /* THREADED_RTS */