[project @ 2006-01-18 10:31:50 by simonmar]
[ghc-hetmet.git] / ghc / rts / Capability.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2003-2006
4  *
5  * Capabilities
6  *
7  * A Capability represent the token required to execute STG code,
8  * and all the state an OS thread/task needs to run Haskell code:
9  * its STG registers, a pointer to its TSO, a nursery etc. During
10  * STG execution, a pointer to the capabilitity is kept in a
11  * register (BaseReg; actually it is a pointer to cap->r).
12  *
13  * Only in an SMP build will there be multiple capabilities, for
14  * the threaded RTS and other non-threaded builds, there is only
15  * one global capability, namely MainCapability.
16  *
17  * --------------------------------------------------------------------------*/
18
19 #include "PosixSource.h"
20 #include "Rts.h"
21 #include "RtsUtils.h"
22 #include "RtsFlags.h"
23 #include "STM.h"
24 #include "OSThreads.h"
25 #include "Capability.h"
26 #include "Schedule.h"
27 #include "Sparks.h"
28
29 #if !defined(SMP)
30 Capability MainCapability;     // for non-SMP, we have one global capability
31 #endif
32
33 nat n_capabilities;
34 Capability *capabilities = NULL;
35
36 // Holds the Capability which last became free.  This is used so that
37 // an in-call has a chance of quickly finding a free Capability.
38 // Maintaining a global free list of Capabilities would require global
39 // locking, so we don't do that.
40 Capability *last_free_capability;
41
42 #if defined(THREADED_RTS)
43 STATIC_INLINE rtsBool
44 globalWorkToDo (void)
45 {
46     return blackholes_need_checking
47         || interrupted
48         ;
49 }
50 #endif
51
52 #if defined(THREADED_RTS)
53 STATIC_INLINE rtsBool
54 anyWorkForMe( Capability *cap, Task *task )
55 {
56     // If the run queue is not empty, then we only wake up the guy who
57     // can run the thread at the head, even if there is some other
58     // reason for this task to run (eg. interrupted=rtsTrue).
59     if (!emptyRunQueue(cap)) {
60         if (cap->run_queue_hd->bound == NULL) {
61             return (task->tso == NULL);
62         } else {
63             return (cap->run_queue_hd->bound == task);
64         }
65     } else if (task->tso == NULL && !emptySparkPoolCap(cap)) {
66         return rtsTrue;
67     }
68     return globalWorkToDo();
69 }
70 #endif
71
72 /* -----------------------------------------------------------------------------
73  * Manage the returning_tasks lists.
74  *
75  * These functions require cap->lock
76  * -------------------------------------------------------------------------- */
77
78 #if defined(THREADED_RTS)
79 STATIC_INLINE void
80 newReturningTask (Capability *cap, Task *task)
81 {
82     ASSERT_LOCK_HELD(&cap->lock);
83     ASSERT(task->return_link == NULL);
84     if (cap->returning_tasks_hd) {
85         ASSERT(cap->returning_tasks_tl->return_link == NULL);
86         cap->returning_tasks_tl->return_link = task;
87     } else {
88         cap->returning_tasks_hd = task;
89     }
90     cap->returning_tasks_tl = task;
91 }
92
93 STATIC_INLINE Task *
94 popReturningTask (Capability *cap)
95 {
96     ASSERT_LOCK_HELD(&cap->lock);
97     Task *task;
98     task = cap->returning_tasks_hd;
99     ASSERT(task);
100     cap->returning_tasks_hd = task->return_link;
101     if (!cap->returning_tasks_hd) {
102         cap->returning_tasks_tl = NULL;
103     }
104     task->return_link = NULL;
105     return task;
106 }
107 #endif
108
109 /* ----------------------------------------------------------------------------
110  * Initialisation
111  *
112  * The Capability is initially marked not free.
113  * ------------------------------------------------------------------------- */
114
115 static void
116 initCapability( Capability *cap, nat i )
117 {
118     nat g;
119
120     cap->no = i;
121     cap->in_haskell        = rtsFalse;
122
123     cap->run_queue_hd      = END_TSO_QUEUE;
124     cap->run_queue_tl      = END_TSO_QUEUE;
125
126 #if defined(THREADED_RTS)
127     initMutex(&cap->lock);
128     cap->running_task      = NULL; // indicates cap is free
129     cap->spare_workers     = NULL;
130     cap->suspended_ccalling_tasks = NULL;
131     cap->returning_tasks_hd = NULL;
132     cap->returning_tasks_tl = NULL;
133 #endif
134
135     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
136     cap->f.stgGCFun        = (F_)__stg_gc_fun;
137
138     cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
139                                      RtsFlags.GcFlags.generations,
140                                      "initCapability");
141
142     for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
143         cap->mut_lists[g] = NULL;
144     }
145
146     cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
147     cap->free_trec_chunks = END_STM_CHUNK_LIST;
148     cap->free_trec_headers = NO_TREC;
149     cap->transaction_tokens = 0;
150 }
151
152 /* ---------------------------------------------------------------------------
153  * Function:  initCapabilities()
154  *
155  * Purpose:   set up the Capability handling. For the SMP build,
156  *            we keep a table of them, the size of which is
157  *            controlled by the user via the RTS flag -N.
158  *
159  * ------------------------------------------------------------------------- */
160 void
161 initCapabilities( void )
162 {
163 #if defined(SMP)
164     nat i,n;
165
166     n_capabilities = n = RtsFlags.ParFlags.nNodes;
167     capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities");
168
169     for (i = 0; i < n; i++) {
170         initCapability(&capabilities[i], i);
171     }
172
173     IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n));
174 #else
175     n_capabilities = 1;
176     capabilities = &MainCapability;
177     initCapability(&MainCapability, 0);
178 #endif
179
180     // There are no free capabilities to begin with.  We will start
181     // a worker Task to each Capability, which will quickly put the
182     // Capability on the free list when it finds nothing to do.
183     last_free_capability = &capabilities[0];
184 }
185
186 /* ----------------------------------------------------------------------------
187  * Give a Capability to a Task.  The task must currently be sleeping
188  * on its condition variable.
189  *
190  * Requires cap->lock (modifies cap->running_task).
191  *
192  * When migrating a Task, the migrater must take task->lock before
193  * modifying task->cap, to synchronise with the waking up Task.
194  * Additionally, the migrater should own the Capability (when
195  * migrating the run queue), or cap->lock (when migrating
196  * returning_workers).
197  *
198  * ------------------------------------------------------------------------- */
199
200 #if defined(THREADED_RTS)
201 STATIC_INLINE void
202 giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
203 {
204     ASSERT_LOCK_HELD(&cap->lock);
205     ASSERT(task->cap == cap);
206     IF_DEBUG(scheduler,
207              sched_belch("passing capability %d to %s %p",
208                          cap->no, task->tso ? "bound task" : "worker",
209                          (void *)task->id));
210     ACQUIRE_LOCK(&task->lock);
211     task->wakeup = rtsTrue;
212     // the wakeup flag is needed because signalCondition() doesn't
213     // flag the condition if the thread is already runniing, but we want
214     // it to be sticky.
215     signalCondition(&task->cond);
216     RELEASE_LOCK(&task->lock);
217 }
218 #endif
219
220 /* ----------------------------------------------------------------------------
221  * Function:  releaseCapability(Capability*)
222  *
223  * Purpose:   Letting go of a capability. Causes a
224  *            'returning worker' thread or a 'waiting worker'
225  *            to wake up, in that order.
226  * ------------------------------------------------------------------------- */
227
228 #if defined(THREADED_RTS)
229 void
230 releaseCapability_ (Capability* cap)
231 {
232     Task *task;
233
234     task = cap->running_task;
235
236     ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
237
238     cap->running_task = NULL;
239
240     // Check to see whether a worker thread can be given
241     // the go-ahead to return the result of an external call..
242     if (cap->returning_tasks_hd != NULL) {
243         giveCapabilityToTask(cap,cap->returning_tasks_hd);
244         // The Task pops itself from the queue (see waitForReturnCapability())
245         return;
246     }
247
248     // If the next thread on the run queue is a bound thread,
249     // give this Capability to the appropriate Task.
250     if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
251         // Make sure we're not about to try to wake ourselves up
252         ASSERT(task != cap->run_queue_hd->bound);
253         task = cap->run_queue_hd->bound;
254         giveCapabilityToTask(cap,task);
255         return;
256     }
257
258     // If we have an unbound thread on the run queue, or if there's
259     // anything else to do, give the Capability to a worker thread.
260     if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
261         if (cap->spare_workers) {
262             giveCapabilityToTask(cap,cap->spare_workers);
263             // The worker Task pops itself from the queue;
264             return;
265         }
266
267         // Create a worker thread if we don't have one.  If the system
268         // is interrupted, we only create a worker task if there
269         // are threads that need to be completed.  If the system is
270         // shutting down, we never create a new worker.
271         if (!shutting_down_scheduler) {
272             IF_DEBUG(scheduler,
273                      sched_belch("starting new worker on capability %d", cap->no));
274             startWorkerTask(cap, workerStart);
275             return;
276         }
277     }
278
279     last_free_capability = cap;
280     IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
281 }
282
283 void
284 releaseCapability (Capability* cap USED_IF_THREADS)
285 {
286     ACQUIRE_LOCK(&cap->lock);
287     releaseCapability_(cap);
288     RELEASE_LOCK(&cap->lock);
289 }
290
291 static void
292 releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
293 {
294     Task *task;
295
296     ACQUIRE_LOCK(&cap->lock);
297
298     task = cap->running_task;
299
300     // If the current task is a worker, save it on the spare_workers
301     // list of this Capability.  A worker can mark itself as stopped,
302     // in which case it is not replaced on the spare_worker queue.
303     // This happens when the system is shutting down (see
304     // Schedule.c:workerStart()).
305     // Also, be careful to check that this task hasn't just exited
306     // Haskell to do a foreign call (task->suspended_tso).
307     if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
308         task->next = cap->spare_workers;
309         cap->spare_workers = task;
310     }
311     // Bound tasks just float around attached to their TSOs.
312
313     releaseCapability_(cap);
314
315     RELEASE_LOCK(&cap->lock);
316 }
317 #endif
318
319 /* ----------------------------------------------------------------------------
320  * waitForReturnCapability( Task *task )
321  *
322  * Purpose:  when an OS thread returns from an external call,
323  * it calls waitForReturnCapability() (via Schedule.resumeThread())
324  * to wait for permission to enter the RTS & communicate the
325  * result of the external call back to the Haskell thread that
326  * made it.
327  *
328  * ------------------------------------------------------------------------- */
329 void
330 waitForReturnCapability (Capability **pCap, Task *task)
331 {
332 #if !defined(THREADED_RTS)
333
334     MainCapability.running_task = task;
335     task->cap = &MainCapability;
336     *pCap = &MainCapability;
337
338 #else
339     Capability *cap = *pCap;
340
341     if (cap == NULL) {
342         // Try last_free_capability first
343         cap = last_free_capability;
344         if (!cap->running_task) {
345             nat i;
346             // otherwise, search for a free capability
347             for (i = 0; i < n_capabilities; i++) {
348                 cap = &capabilities[i];
349                 if (!cap->running_task) {
350                     break;
351                 }
352             }
353             // Can't find a free one, use last_free_capability.
354             cap = last_free_capability;
355         }
356
357         // record the Capability as the one this Task is now assocated with.
358         task->cap = cap;
359
360     } else {
361         ASSERT(task->cap == cap);
362     }
363
364     ACQUIRE_LOCK(&cap->lock);
365
366     IF_DEBUG(scheduler,
367              sched_belch("returning; I want capability %d", cap->no));
368
369     if (!cap->running_task) {
370         // It's free; just grab it
371         cap->running_task = task;
372         RELEASE_LOCK(&cap->lock);
373     } else {
374         newReturningTask(cap,task);
375         RELEASE_LOCK(&cap->lock);
376
377         for (;;) {
378             ACQUIRE_LOCK(&task->lock);
379             // task->lock held, cap->lock not held
380             if (!task->wakeup) waitCondition(&task->cond, &task->lock);
381             cap = task->cap;
382             task->wakeup = rtsFalse;
383             RELEASE_LOCK(&task->lock);
384
385             // now check whether we should wake up...
386             ACQUIRE_LOCK(&cap->lock);
387             if (cap->running_task == NULL) {
388                 if (cap->returning_tasks_hd != task) {
389                     giveCapabilityToTask(cap,cap->returning_tasks_hd);
390                     RELEASE_LOCK(&cap->lock);
391                     continue;
392                 }
393                 cap->running_task = task;
394                 popReturningTask(cap);
395                 RELEASE_LOCK(&cap->lock);
396                 break;
397             }
398             RELEASE_LOCK(&cap->lock);
399         }
400
401     }
402
403     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
404
405     IF_DEBUG(scheduler,
406              sched_belch("returning; got capability %d", cap->no));
407
408     *pCap = cap;
409 #endif
410 }
411
412 #if defined(THREADED_RTS)
413 /* ----------------------------------------------------------------------------
414  * yieldCapability
415  * ------------------------------------------------------------------------- */
416
417 void
418 yieldCapability (Capability** pCap, Task *task)
419 {
420     Capability *cap = *pCap;
421
422     // The fast path has no locking, if we don't enter this while loop
423
424     while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
425         IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));
426
427         // We must now release the capability and wait to be woken up
428         // again.
429         task->wakeup = rtsFalse;
430         releaseCapabilityAndQueueWorker(cap);
431
432         for (;;) {
433             ACQUIRE_LOCK(&task->lock);
434             // task->lock held, cap->lock not held
435             if (!task->wakeup) waitCondition(&task->cond, &task->lock);
436             cap = task->cap;
437             task->wakeup = rtsFalse;
438             RELEASE_LOCK(&task->lock);
439
440             IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
441             ACQUIRE_LOCK(&cap->lock);
442             if (cap->running_task != NULL) {
443                 IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no));
444                 RELEASE_LOCK(&cap->lock);
445                 continue;
446             }
447
448             if (task->tso == NULL) {
449                 ASSERT(cap->spare_workers != NULL);
450                 // if we're not at the front of the queue, release it
451                 // again.  This is unlikely to happen.
452                 if (cap->spare_workers != task) {
453                     giveCapabilityToTask(cap,cap->spare_workers);
454                     RELEASE_LOCK(&cap->lock);
455                     continue;
456                 }
457                 cap->spare_workers = task->next;
458                 task->next = NULL;
459             }
460             cap->running_task = task;
461             RELEASE_LOCK(&cap->lock);
462             break;
463         }
464
465         IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
466         ASSERT(cap->running_task == task);
467     }
468
469     *pCap = cap;
470
471     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
472
473     return;
474 }
475
476 /* ----------------------------------------------------------------------------
477  * prodCapabilities
478  *
479  * Used to indicate that the interrupted flag is now set, or some
480  * other global condition that might require waking up a Task on each
481  * Capability.
482  * ------------------------------------------------------------------------- */
483
484 static void
485 prodCapabilities(rtsBool all)
486 {
487     nat i;
488     Capability *cap;
489     Task *task;
490
491     for (i=0; i < n_capabilities; i++) {
492         cap = &capabilities[i];
493         ACQUIRE_LOCK(&cap->lock);
494         if (!cap->running_task) {
495             if (cap->spare_workers) {
496                 task = cap->spare_workers;
497                 ASSERT(!task->stopped);
498                 giveCapabilityToTask(cap,task);
499                 if (!all) {
500                     RELEASE_LOCK(&cap->lock);
501                     return;
502                 }
503             }
504         }
505         RELEASE_LOCK(&cap->lock);
506     }
507 }
508
509 void
510 prodAllCapabilities (void)
511 {
512     prodCapabilities(rtsTrue);
513 }
514
515 /* ----------------------------------------------------------------------------
516  * prodOneCapability
517  *
518  * Like prodAllCapabilities, but we only require a single Task to wake
519  * up in order to service some global event, such as checking for
520  * deadlock after some idle time has passed.
521  * ------------------------------------------------------------------------- */
522
523 void
524 prodOneCapability (void)
525 {
526     prodCapabilities(rtsFalse);
527 }
528
529 /* ----------------------------------------------------------------------------
530  * shutdownCapability
531  *
532  * At shutdown time, we want to let everything exit as cleanly as
533  * possible.  For each capability, we let its run queue drain, and
534  * allow the workers to stop.
535  *
536  * This function should be called when interrupted and
537  * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
538  * will exit the scheduler and call taskStop(), and any bound thread
539  * that wakes up will return to its caller.  Runnable threads are
540  * killed.
541  *
542  * ------------------------------------------------------------------------- */
543
544 void
545 shutdownCapability (Capability *cap, Task *task)
546 {
547     nat i;
548
549     ASSERT(interrupted && shutting_down_scheduler);
550
551     task->cap = cap;
552
553     for (i = 0; i < 50; i++) {
554         IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
555         ACQUIRE_LOCK(&cap->lock);
556         if (cap->running_task) {
557             RELEASE_LOCK(&cap->lock);
558             IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
559             yieldThread();
560             continue;
561         }
562         cap->running_task = task;
563         if (!emptyRunQueue(cap) || cap->spare_workers) {
564             IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
565             releaseCapability_(cap); // this will wake up a worker
566             RELEASE_LOCK(&cap->lock);
567             yieldThread();
568             continue;
569         }
570         IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
571         RELEASE_LOCK(&cap->lock);
572         break;
573     }
574     // we now have the Capability, its run queue and spare workers
575     // list are both empty.
576 }
577
578 /* ----------------------------------------------------------------------------
579  * tryGrabCapability
580  *
581  * Attempt to gain control of a Capability if it is free.
582  *
583  * ------------------------------------------------------------------------- */
584
585 rtsBool
586 tryGrabCapability (Capability *cap, Task *task)
587 {
588     if (cap->running_task != NULL) return rtsFalse;
589     ACQUIRE_LOCK(&cap->lock);
590     if (cap->running_task != NULL) {
591         RELEASE_LOCK(&cap->lock);
592         return rtsFalse;
593     }
594     task->cap = cap;
595     cap->running_task = task;
596     RELEASE_LOCK(&cap->lock);
597     return rtsTrue;
598 }
599
600
601 #endif /* THREADED_RTS */
602
603