Reorganisation of the source tree
[ghc-hetmet.git] / rts / Capability.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2003-2006
4  *
5  * Capabilities
6  *
7  * A Capability represent the token required to execute STG code,
8  * and all the state an OS thread/task needs to run Haskell code:
9  * its STG registers, a pointer to its TSO, a nursery etc. During
10  * STG execution, a pointer to the capabilitity is kept in a
11  * register (BaseReg; actually it is a pointer to cap->r).
12  *
13  * Only in an THREADED_RTS build will there be multiple capabilities,
14  * for non-threaded builds there is only one global capability, namely
15  * MainCapability.
16  *
17  * --------------------------------------------------------------------------*/
18
19 #include "PosixSource.h"
20 #include "Rts.h"
21 #include "RtsUtils.h"
22 #include "RtsFlags.h"
23 #include "STM.h"
24 #include "OSThreads.h"
25 #include "Capability.h"
26 #include "Schedule.h"
27 #include "Sparks.h"
28
29 // one global capability, this is the Capability for non-threaded
30 // builds, and for +RTS -N1
31 Capability MainCapability;
32
33 nat n_capabilities;
34 Capability *capabilities = NULL;
35
36 // Holds the Capability which last became free.  This is used so that
37 // an in-call has a chance of quickly finding a free Capability.
38 // Maintaining a global free list of Capabilities would require global
39 // locking, so we don't do that.
40 Capability *last_free_capability;
41
42 #if defined(THREADED_RTS)
43 STATIC_INLINE rtsBool
44 globalWorkToDo (void)
45 {
46     return blackholes_need_checking
47         || sched_state >= SCHED_INTERRUPTING
48         ;
49 }
50 #endif
51
52 #if defined(THREADED_RTS)
53 STATIC_INLINE rtsBool
54 anyWorkForMe( Capability *cap, Task *task )
55 {
56     if (task->tso != NULL) {
57         // A bound task only runs if its thread is on the run queue of
58         // the capability on which it was woken up.  Otherwise, we
59         // can't be sure that we have the right capability: the thread
60         // might be woken up on some other capability, and task->cap
61         // could change under our feet.
62         return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task;
63     } else {
64         // A vanilla worker task runs if either there is a lightweight
65         // thread at the head of the run queue, or the run queue is
66         // empty and (there are sparks to execute, or there is some
67         // other global condition to check, such as threads blocked on
68         // blackholes).
69         if (emptyRunQueue(cap)) {
70             return !emptySparkPoolCap(cap)
71                 || !emptyWakeupQueue(cap)
72                 || globalWorkToDo();
73         } else
74             return cap->run_queue_hd->bound == NULL;
75     }
76 }
77 #endif
78
79 /* -----------------------------------------------------------------------------
80  * Manage the returning_tasks lists.
81  *
82  * These functions require cap->lock
83  * -------------------------------------------------------------------------- */
84
85 #if defined(THREADED_RTS)
86 STATIC_INLINE void
87 newReturningTask (Capability *cap, Task *task)
88 {
89     ASSERT_LOCK_HELD(&cap->lock);
90     ASSERT(task->return_link == NULL);
91     if (cap->returning_tasks_hd) {
92         ASSERT(cap->returning_tasks_tl->return_link == NULL);
93         cap->returning_tasks_tl->return_link = task;
94     } else {
95         cap->returning_tasks_hd = task;
96     }
97     cap->returning_tasks_tl = task;
98 }
99
100 STATIC_INLINE Task *
101 popReturningTask (Capability *cap)
102 {
103     ASSERT_LOCK_HELD(&cap->lock);
104     Task *task;
105     task = cap->returning_tasks_hd;
106     ASSERT(task);
107     cap->returning_tasks_hd = task->return_link;
108     if (!cap->returning_tasks_hd) {
109         cap->returning_tasks_tl = NULL;
110     }
111     task->return_link = NULL;
112     return task;
113 }
114 #endif
115
116 /* ----------------------------------------------------------------------------
117  * Initialisation
118  *
119  * The Capability is initially marked not free.
120  * ------------------------------------------------------------------------- */
121
122 static void
123 initCapability( Capability *cap, nat i )
124 {
125     nat g;
126
127     cap->no = i;
128     cap->in_haskell        = rtsFalse;
129
130     cap->run_queue_hd      = END_TSO_QUEUE;
131     cap->run_queue_tl      = END_TSO_QUEUE;
132
133 #if defined(THREADED_RTS)
134     initMutex(&cap->lock);
135     cap->running_task      = NULL; // indicates cap is free
136     cap->spare_workers     = NULL;
137     cap->suspended_ccalling_tasks = NULL;
138     cap->returning_tasks_hd = NULL;
139     cap->returning_tasks_tl = NULL;
140     cap->wakeup_queue_hd    = END_TSO_QUEUE;
141     cap->wakeup_queue_tl    = END_TSO_QUEUE;
142 #endif
143
144     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
145     cap->f.stgGCFun        = (F_)__stg_gc_fun;
146
147     cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
148                                      RtsFlags.GcFlags.generations,
149                                      "initCapability");
150
151     for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
152         cap->mut_lists[g] = NULL;
153     }
154
155     cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
156     cap->free_trec_chunks = END_STM_CHUNK_LIST;
157     cap->free_trec_headers = NO_TREC;
158     cap->transaction_tokens = 0;
159 }
160
161 /* ---------------------------------------------------------------------------
162  * Function:  initCapabilities()
163  *
164  * Purpose:   set up the Capability handling. For the THREADED_RTS build,
165  *            we keep a table of them, the size of which is
166  *            controlled by the user via the RTS flag -N.
167  *
168  * ------------------------------------------------------------------------- */
169 void
170 initCapabilities( void )
171 {
172 #if defined(THREADED_RTS)
173     nat i;
174
175 #ifndef REG_Base
176     // We can't support multiple CPUs if BaseReg is not a register
177     if (RtsFlags.ParFlags.nNodes > 1) {
178         errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
179         RtsFlags.ParFlags.nNodes = 1;
180     }
181 #endif
182
183     n_capabilities = RtsFlags.ParFlags.nNodes;
184
185     if (n_capabilities == 1) {
186         capabilities = &MainCapability;
187         // THREADED_RTS must work on builds that don't have a mutable
188         // BaseReg (eg. unregisterised), so in this case
189         // capabilities[0] must coincide with &MainCapability.
190     } else {
191         capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
192                                       "initCapabilities");
193     }
194
195     for (i = 0; i < n_capabilities; i++) {
196         initCapability(&capabilities[i], i);
197     }
198
199     IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", 
200                                     n_capabilities));
201
202 #else /* !THREADED_RTS */
203
204     n_capabilities = 1;
205     capabilities = &MainCapability;
206     initCapability(&MainCapability, 0);
207
208 #endif
209
210     // There are no free capabilities to begin with.  We will start
211     // a worker Task to each Capability, which will quickly put the
212     // Capability on the free list when it finds nothing to do.
213     last_free_capability = &capabilities[0];
214 }
215
216 /* ----------------------------------------------------------------------------
217  * Give a Capability to a Task.  The task must currently be sleeping
218  * on its condition variable.
219  *
220  * Requires cap->lock (modifies cap->running_task).
221  *
222  * When migrating a Task, the migrater must take task->lock before
223  * modifying task->cap, to synchronise with the waking up Task.
224  * Additionally, the migrater should own the Capability (when
225  * migrating the run queue), or cap->lock (when migrating
226  * returning_workers).
227  *
228  * ------------------------------------------------------------------------- */
229
230 #if defined(THREADED_RTS)
231 STATIC_INLINE void
232 giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
233 {
234     ASSERT_LOCK_HELD(&cap->lock);
235     ASSERT(task->cap == cap);
236     IF_DEBUG(scheduler,
237              sched_belch("passing capability %d to %s %p",
238                          cap->no, task->tso ? "bound task" : "worker",
239                          (void *)task->id));
240     ACQUIRE_LOCK(&task->lock);
241     task->wakeup = rtsTrue;
242     // the wakeup flag is needed because signalCondition() doesn't
243     // flag the condition if the thread is already runniing, but we want
244     // it to be sticky.
245     signalCondition(&task->cond);
246     RELEASE_LOCK(&task->lock);
247 }
248 #endif
249
250 /* ----------------------------------------------------------------------------
251  * Function:  releaseCapability(Capability*)
252  *
253  * Purpose:   Letting go of a capability. Causes a
254  *            'returning worker' thread or a 'waiting worker'
255  *            to wake up, in that order.
256  * ------------------------------------------------------------------------- */
257
258 #if defined(THREADED_RTS)
259 void
260 releaseCapability_ (Capability* cap)
261 {
262     Task *task;
263
264     task = cap->running_task;
265
266     ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
267
268     cap->running_task = NULL;
269
270     // Check to see whether a worker thread can be given
271     // the go-ahead to return the result of an external call..
272     if (cap->returning_tasks_hd != NULL) {
273         giveCapabilityToTask(cap,cap->returning_tasks_hd);
274         // The Task pops itself from the queue (see waitForReturnCapability())
275         return;
276     }
277
278     // If the next thread on the run queue is a bound thread,
279     // give this Capability to the appropriate Task.
280     if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
281         // Make sure we're not about to try to wake ourselves up
282         ASSERT(task != cap->run_queue_hd->bound);
283         task = cap->run_queue_hd->bound;
284         giveCapabilityToTask(cap,task);
285         return;
286     }
287
288     if (!cap->spare_workers) {
289         // Create a worker thread if we don't have one.  If the system
290         // is interrupted, we only create a worker task if there
291         // are threads that need to be completed.  If the system is
292         // shutting down, we never create a new worker.
293         if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
294             IF_DEBUG(scheduler,
295                      sched_belch("starting new worker on capability %d", cap->no));
296             startWorkerTask(cap, workerStart);
297             return;
298         }
299     }
300
301     // If we have an unbound thread on the run queue, or if there's
302     // anything else to do, give the Capability to a worker thread.
303     if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap)
304               || !emptySparkPoolCap(cap) || globalWorkToDo()) {
305         if (cap->spare_workers) {
306             giveCapabilityToTask(cap,cap->spare_workers);
307             // The worker Task pops itself from the queue;
308             return;
309         }
310     }
311
312     last_free_capability = cap;
313     IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
314 }
315
316 void
317 releaseCapability (Capability* cap USED_IF_THREADS)
318 {
319     ACQUIRE_LOCK(&cap->lock);
320     releaseCapability_(cap);
321     RELEASE_LOCK(&cap->lock);
322 }
323
324 static void
325 releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
326 {
327     Task *task;
328
329     ACQUIRE_LOCK(&cap->lock);
330
331     task = cap->running_task;
332
333     // If the current task is a worker, save it on the spare_workers
334     // list of this Capability.  A worker can mark itself as stopped,
335     // in which case it is not replaced on the spare_worker queue.
336     // This happens when the system is shutting down (see
337     // Schedule.c:workerStart()).
338     // Also, be careful to check that this task hasn't just exited
339     // Haskell to do a foreign call (task->suspended_tso).
340     if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
341         task->next = cap->spare_workers;
342         cap->spare_workers = task;
343     }
344     // Bound tasks just float around attached to their TSOs.
345
346     releaseCapability_(cap);
347
348     RELEASE_LOCK(&cap->lock);
349 }
350 #endif
351
352 /* ----------------------------------------------------------------------------
353  * waitForReturnCapability( Task *task )
354  *
355  * Purpose:  when an OS thread returns from an external call,
356  * it calls waitForReturnCapability() (via Schedule.resumeThread())
357  * to wait for permission to enter the RTS & communicate the
358  * result of the external call back to the Haskell thread that
359  * made it.
360  *
361  * ------------------------------------------------------------------------- */
362 void
363 waitForReturnCapability (Capability **pCap, Task *task)
364 {
365 #if !defined(THREADED_RTS)
366
367     MainCapability.running_task = task;
368     task->cap = &MainCapability;
369     *pCap = &MainCapability;
370
371 #else
372     Capability *cap = *pCap;
373
374     if (cap == NULL) {
375         // Try last_free_capability first
376         cap = last_free_capability;
377         if (!cap->running_task) {
378             nat i;
379             // otherwise, search for a free capability
380             for (i = 0; i < n_capabilities; i++) {
381                 cap = &capabilities[i];
382                 if (!cap->running_task) {
383                     break;
384                 }
385             }
386             // Can't find a free one, use last_free_capability.
387             cap = last_free_capability;
388         }
389
390         // record the Capability as the one this Task is now assocated with.
391         task->cap = cap;
392
393     } else {
394         ASSERT(task->cap == cap);
395     }
396
397     ACQUIRE_LOCK(&cap->lock);
398
399     IF_DEBUG(scheduler,
400              sched_belch("returning; I want capability %d", cap->no));
401
402     if (!cap->running_task) {
403         // It's free; just grab it
404         cap->running_task = task;
405         RELEASE_LOCK(&cap->lock);
406     } else {
407         newReturningTask(cap,task);
408         RELEASE_LOCK(&cap->lock);
409
410         for (;;) {
411             ACQUIRE_LOCK(&task->lock);
412             // task->lock held, cap->lock not held
413             if (!task->wakeup) waitCondition(&task->cond, &task->lock);
414             cap = task->cap;
415             task->wakeup = rtsFalse;
416             RELEASE_LOCK(&task->lock);
417
418             // now check whether we should wake up...
419             ACQUIRE_LOCK(&cap->lock);
420             if (cap->running_task == NULL) {
421                 if (cap->returning_tasks_hd != task) {
422                     giveCapabilityToTask(cap,cap->returning_tasks_hd);
423                     RELEASE_LOCK(&cap->lock);
424                     continue;
425                 }
426                 cap->running_task = task;
427                 popReturningTask(cap);
428                 RELEASE_LOCK(&cap->lock);
429                 break;
430             }
431             RELEASE_LOCK(&cap->lock);
432         }
433
434     }
435
436     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
437
438     IF_DEBUG(scheduler,
439              sched_belch("returning; got capability %d", cap->no));
440
441     *pCap = cap;
442 #endif
443 }
444
445 #if defined(THREADED_RTS)
446 /* ----------------------------------------------------------------------------
447  * yieldCapability
448  * ------------------------------------------------------------------------- */
449
450 void
451 yieldCapability (Capability** pCap, Task *task)
452 {
453     Capability *cap = *pCap;
454
455     // The fast path has no locking, if we don't enter this while loop
456
457     while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
458         IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));
459
460         // We must now release the capability and wait to be woken up
461         // again.
462         task->wakeup = rtsFalse;
463         releaseCapabilityAndQueueWorker(cap);
464
465         for (;;) {
466             ACQUIRE_LOCK(&task->lock);
467             // task->lock held, cap->lock not held
468             if (!task->wakeup) waitCondition(&task->cond, &task->lock);
469             cap = task->cap;
470             task->wakeup = rtsFalse;
471             RELEASE_LOCK(&task->lock);
472
473             IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
474             ACQUIRE_LOCK(&cap->lock);
475             if (cap->running_task != NULL) {
476                 IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no));
477                 RELEASE_LOCK(&cap->lock);
478                 continue;
479             }
480
481             if (task->tso == NULL) {
482                 ASSERT(cap->spare_workers != NULL);
483                 // if we're not at the front of the queue, release it
484                 // again.  This is unlikely to happen.
485                 if (cap->spare_workers != task) {
486                     giveCapabilityToTask(cap,cap->spare_workers);
487                     RELEASE_LOCK(&cap->lock);
488                     continue;
489                 }
490                 cap->spare_workers = task->next;
491                 task->next = NULL;
492             }
493             cap->running_task = task;
494             RELEASE_LOCK(&cap->lock);
495             break;
496         }
497
498         IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
499         ASSERT(cap->running_task == task);
500     }
501
502     *pCap = cap;
503
504     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
505
506     return;
507 }
508
509 /* ----------------------------------------------------------------------------
510  * Wake up a thread on a Capability.
511  *
512  * This is used when the current Task is running on a Capability and
513  * wishes to wake up a thread on a different Capability.
514  * ------------------------------------------------------------------------- */
515
516 void
517 wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
518 {
519     ASSERT(tso->cap == cap);
520     ASSERT(tso->bound ? tso->bound->cap == cap : 1);
521
522     ACQUIRE_LOCK(&cap->lock);
523     if (cap->running_task == NULL) {
524         // nobody is running this Capability, we can add our thread
525         // directly onto the run queue and start up a Task to run it.
526         appendToRunQueue(cap,tso);
527
528         // start it up
529         cap->running_task = myTask(); // precond for releaseCapability_()
530         releaseCapability_(cap);
531     } else {
532         appendToWakeupQueue(cap,tso);
533         // someone is running on this Capability, so it cannot be
534         // freed without first checking the wakeup queue (see
535         // releaseCapability_).
536     }
537     RELEASE_LOCK(&cap->lock);
538 }
539
540 /* ----------------------------------------------------------------------------
541  * prodCapabilities
542  *
543  * Used to indicate that the interrupted flag is now set, or some
544  * other global condition that might require waking up a Task on each
545  * Capability.
546  * ------------------------------------------------------------------------- */
547
548 static void
549 prodCapabilities(rtsBool all)
550 {
551     nat i;
552     Capability *cap;
553     Task *task;
554
555     for (i=0; i < n_capabilities; i++) {
556         cap = &capabilities[i];
557         ACQUIRE_LOCK(&cap->lock);
558         if (!cap->running_task) {
559             if (cap->spare_workers) {
560                 task = cap->spare_workers;
561                 ASSERT(!task->stopped);
562                 giveCapabilityToTask(cap,task);
563                 if (!all) {
564                     RELEASE_LOCK(&cap->lock);
565                     return;
566                 }
567             }
568         }
569         RELEASE_LOCK(&cap->lock);
570     }
571     return;
572 }
573
574 void
575 prodAllCapabilities (void)
576 {
577     prodCapabilities(rtsTrue);
578 }
579
580 /* ----------------------------------------------------------------------------
581  * prodOneCapability
582  *
583  * Like prodAllCapabilities, but we only require a single Task to wake
584  * up in order to service some global event, such as checking for
585  * deadlock after some idle time has passed.
586  * ------------------------------------------------------------------------- */
587
588 void
589 prodOneCapability (void)
590 {
591     prodCapabilities(rtsFalse);
592 }
593
594 /* ----------------------------------------------------------------------------
595  * shutdownCapability
596  *
597  * At shutdown time, we want to let everything exit as cleanly as
598  * possible.  For each capability, we let its run queue drain, and
599  * allow the workers to stop.
600  *
601  * This function should be called when interrupted and
602  * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
603  * will exit the scheduler and call taskStop(), and any bound thread
604  * that wakes up will return to its caller.  Runnable threads are
605  * killed.
606  *
607  * ------------------------------------------------------------------------- */
608
609 void
610 shutdownCapability (Capability *cap, Task *task)
611 {
612     nat i;
613
614     ASSERT(sched_state == SCHED_SHUTTING_DOWN);
615
616     task->cap = cap;
617
618     for (i = 0; i < 50; i++) {
619         IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
620         ACQUIRE_LOCK(&cap->lock);
621         if (cap->running_task) {
622             RELEASE_LOCK(&cap->lock);
623             IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
624             yieldThread();
625             continue;
626         }
627         cap->running_task = task;
628         if (!emptyRunQueue(cap) || cap->spare_workers) {
629             IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
630             releaseCapability_(cap); // this will wake up a worker
631             RELEASE_LOCK(&cap->lock);
632             yieldThread();
633             continue;
634         }
635         IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
636         RELEASE_LOCK(&cap->lock);
637         break;
638     }
639     // we now have the Capability, its run queue and spare workers
640     // list are both empty.
641 }
642
643 /* ----------------------------------------------------------------------------
644  * tryGrabCapability
645  *
646  * Attempt to gain control of a Capability if it is free.
647  *
648  * ------------------------------------------------------------------------- */
649
650 rtsBool
651 tryGrabCapability (Capability *cap, Task *task)
652 {
653     if (cap->running_task != NULL) return rtsFalse;
654     ACQUIRE_LOCK(&cap->lock);
655     if (cap->running_task != NULL) {
656         RELEASE_LOCK(&cap->lock);
657         return rtsFalse;
658     }
659     task->cap = cap;
660     cap->running_task = task;
661     RELEASE_LOCK(&cap->lock);
662     return rtsTrue;
663 }
664
665
666 #endif /* THREADED_RTS */
667
668