[project @ 2005-11-18 15:24:12 by simonmar]
[ghc-hetmet.git] / ghc / rts / Capability.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2003-2005
4  *
5  * Capabilities
6  *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc.  During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
12  *
 * Only in an SMP build will there be multiple capabilities; in
 * the threaded RTS and other non-SMP builds, there is only
 * one global capability, namely MainCapability.
16  *
17  * --------------------------------------------------------------------------*/
18
19 #include "PosixSource.h"
20 #include "Rts.h"
21 #include "RtsUtils.h"
22 #include "RtsFlags.h"
23 #include "OSThreads.h"
24 #include "Capability.h"
25 #include "Schedule.h"
26 #include "Sparks.h"
27
#if !defined(SMP)
Capability MainCapability;     // for non-SMP, we have one global capability
#endif

// Number of entries in the capabilities[] table (1 in non-SMP builds).
nat n_capabilities;
Capability *capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability;
40
// Parameter-attribute helpers: expand to STG_UNUSED when the parameter
// is not referenced in the corresponding build flavour.
#ifdef SMP
#define UNUSED_IF_NOT_SMP
#else
#define UNUSED_IF_NOT_SMP STG_UNUSED
#endif

// NOTE(review): this macro's name says "THREADS" but the guard tests
// RTS_USER_SIGNALS -- confirm whether the condition should be
// THREADED_RTS instead.
#ifdef RTS_USER_SIGNALS
#define UNUSED_IF_NOT_THREADS
#else
#define UNUSED_IF_NOT_THREADS STG_UNUSED
#endif
52
53
54 STATIC_INLINE rtsBool
55 globalWorkToDo (void)
56 {
57     return blackholes_need_checking
58         || interrupted
59 #if defined(RTS_USER_SIGNALS)
60         || signals_pending()
61 #endif
62         ;
63 }
64
65 #if defined(THREADED_RTS)
66 STATIC_INLINE rtsBool
67 anyWorkForMe( Capability *cap, Task *task )
68 {
69     // If the run queue is not empty, then we only wake up the guy who
70     // can run the thread at the head, even if there is some other
71     // reason for this task to run (eg. interrupted=rtsTrue).
72     if (!emptyRunQueue(cap)) {
73         if (cap->run_queue_hd->bound == NULL) {
74             return (task->tso == NULL);
75         } else {
76             return (cap->run_queue_hd->bound == task);
77         }
78     } else if (task->tso == NULL && !emptySparkPoolCap(cap)) {
79         return rtsTrue;
80     }
81     return globalWorkToDo();
82 }
83 #endif
84
85 /* -----------------------------------------------------------------------------
86  * Manage the returning_tasks lists.
87  *
88  * These functions require cap->lock
89  * -------------------------------------------------------------------------- */
90
91 #if defined(THREADED_RTS)
92 STATIC_INLINE void
93 newReturningTask (Capability *cap, Task *task)
94 {
95     ASSERT_LOCK_HELD(&cap->lock);
96     ASSERT(task->return_link == NULL);
97     if (cap->returning_tasks_hd) {
98         ASSERT(cap->returning_tasks_tl->return_link == NULL);
99         cap->returning_tasks_tl->return_link = task;
100     } else {
101         cap->returning_tasks_hd = task;
102     }
103     cap->returning_tasks_tl = task;
104 }
105
106 STATIC_INLINE Task *
107 popReturningTask (Capability *cap)
108 {
109     ASSERT_LOCK_HELD(&cap->lock);
110     Task *task;
111     task = cap->returning_tasks_hd;
112     ASSERT(task);
113     cap->returning_tasks_hd = task->return_link;
114     if (!cap->returning_tasks_hd) {
115         cap->returning_tasks_tl = NULL;
116     }
117     task->return_link = NULL;
118     return task;
119 }
120 #endif
121
122 /* ----------------------------------------------------------------------------
123  * Initialisation
124  *
125  * The Capability is initially marked not free.
126  * ------------------------------------------------------------------------- */
127
128 static void
129 initCapability( Capability *cap, nat i )
130 {
131     nat g;
132
133     cap->no = i;
134     cap->in_haskell        = rtsFalse;
135
136     cap->run_queue_hd      = END_TSO_QUEUE;
137     cap->run_queue_tl      = END_TSO_QUEUE;
138
139 #if defined(THREADED_RTS)
140     initMutex(&cap->lock);
141     cap->running_task      = NULL; // indicates cap is free
142     cap->spare_workers     = NULL;
143     cap->suspended_ccalling_tasks = NULL;
144     cap->returning_tasks_hd = NULL;
145     cap->returning_tasks_tl = NULL;
146 #endif
147
148     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
149     cap->f.stgGCFun        = (F_)__stg_gc_fun;
150
151     cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
152                                      RtsFlags.GcFlags.generations,
153                                      "initCapability");
154
155     for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
156         cap->mut_lists[g] = NULL;
157     }
158 }
159
160 /* ---------------------------------------------------------------------------
161  * Function:  initCapabilities()
162  *
163  * Purpose:   set up the Capability handling. For the SMP build,
164  *            we keep a table of them, the size of which is
165  *            controlled by the user via the RTS flag -N.
166  *
167  * ------------------------------------------------------------------------- */
168 void
169 initCapabilities( void )
170 {
171 #if defined(SMP)
172     nat i,n;
173
174     n_capabilities = n = RtsFlags.ParFlags.nNodes;
175     capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities");
176
177     for (i = 0; i < n; i++) {
178         initCapability(&capabilities[i], i);
179     }
180
181     IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n));
182 #else
183     n_capabilities = 1;
184     capabilities = &MainCapability;
185     initCapability(&MainCapability, 0);
186 #endif
187
188     // There are no free capabilities to begin with.  We will start
189     // a worker Task to each Capability, which will quickly put the
190     // Capability on the free list when it finds nothing to do.
191     last_free_capability = &capabilities[0];
192 }
193
194 /* ----------------------------------------------------------------------------
195  * Give a Capability to a Task.  The task must currently be sleeping
196  * on its condition variable.
197  *
198  * Requires cap->lock (modifies cap->running_task).
199  *
200  * When migrating a Task, the migrater must take task->lock before
201  * modifying task->cap, to synchronise with the waking up Task.
202  * Additionally, the migrater should own the Capability (when
203  * migrating the run queue), or cap->lock (when migrating
204  * returning_workers).
205  *
206  * ------------------------------------------------------------------------- */
207
208 #if defined(THREADED_RTS)
209 STATIC_INLINE void
210 giveCapabilityToTask (Capability *cap, Task *task)
211 {
212     ASSERT_LOCK_HELD(&cap->lock);
213     ASSERT(task->cap == cap);
214     // We are not modifying task->cap, so we do not need to take task->lock.
215     IF_DEBUG(scheduler,
216              sched_belch("passing capability %d to %s %p",
217                          cap->no, task->tso ? "bound task" : "worker",
218                          (void *)task->id));
219     ACQUIRE_LOCK(&task->lock);
220     task->wakeup = rtsTrue;
221     // the wakeup flag is needed because signalCondition() doesn't
222     // flag the condition if the thread is already runniing, but we want
223     // it to be sticky.
224     signalCondition(&task->cond);
225     RELEASE_LOCK(&task->lock);
226 }
227 #endif
228
229 /* ----------------------------------------------------------------------------
230  * Function:  releaseCapability(Capability*)
231  *
232  * Purpose:   Letting go of a capability. Causes a
233  *            'returning worker' thread or a 'waiting worker'
234  *            to wake up, in that order.
235  * ------------------------------------------------------------------------- */
236
237 #if defined(THREADED_RTS)
// Release a Capability owned by the current Task, handing it to the
// most deserving waiter: a returning worker first, then the bound task
// at the head of the run queue, then a (possibly freshly started)
// worker thread.  Requires cap->lock (see the releaseCapability()
// wrapper below).
void
releaseCapability_ (Capability* cap)
{
    Task *task;

    task = cap->running_task;

    ASSERT_CAPABILITY_INVARIANTS(cap,task);

    // mark the capability free before deciding whom to wake
    cap->running_task = NULL;

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call..
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
        // Make sure we're not about to try to wake ourselves up
        ASSERT(task != cap->run_queue_hd->bound);
        task = cap->run_queue_hd->bound;
        giveCapabilityToTask(cap,task);
        return;
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue;
            // see releaseCapabilityAndQueueWorker(), which pushed it.
            return;
        }

        // Create a worker thread if we don't have one.  If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed.  If the system is
        // shutting down, we never create a new worker.
        // NOTE(review): the code below only tests shutting_down_scheduler;
        // the "interrupted" case described above is not checked here --
        // confirm whether that is intentional.
        if (!shutting_down_scheduler) {
            IF_DEBUG(scheduler,
                     sched_belch("starting new worker on capability %d", cap->no));
            startWorkerTask(cap, workerStart);
            return;
        }
    }

    // Nobody wants the capability: record it as the one that last
    // became free, so an in-call can find a free capability quickly.
    last_free_capability = cap;
    IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
}
291
// Public wrapper for releaseCapability_(): takes cap->lock around the
// release.  (The UNUSED_IF_NOT_THREADS attribute on the parameter is
// moot here, since this definition is only compiled when THREADED_RTS
// is defined -- the parameter is always used.)
void
releaseCapability (Capability* cap UNUSED_IF_NOT_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap);
    RELEASE_LOCK(&cap->lock);
}
299
300 static void
301 releaseCapabilityAndQueueWorker (Capability* cap UNUSED_IF_NOT_THREADS)
302 {
303     Task *task;
304
305     ACQUIRE_LOCK(&cap->lock);
306
307     task = cap->running_task;
308
309     // If the current task is a worker, save it on the spare_workers
310     // list of this Capability.  A worker can mark itself as stopped,
311     // in which case it is not replaced on the spare_worker queue.
312     // This happens when the system is shutting down (see
313     // Schedule.c:workerStart()).
314     // Also, be careful to check that this task hasn't just exited
315     // Haskell to do a foreign call (task->suspended_tso).
316     if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
317         task->next = cap->spare_workers;
318         cap->spare_workers = task;
319     }
320     // Bound tasks just float around attached to their TSOs.
321
322     releaseCapability_(cap);
323
324     RELEASE_LOCK(&cap->lock);
325 }
326 #endif
327
328 /* ----------------------------------------------------------------------------
329  * waitForReturnCapability( Task *task )
330  *
331  * Purpose:  when an OS thread returns from an external call,
332  * it calls waitForReturnCapability() (via Schedule.resumeThread())
333  * to wait for permission to enter the RTS & communicate the
334  * result of the external call back to the Haskell thread that
335  * made it.
336  *
337  * ------------------------------------------------------------------------- */
338 void
339 waitForReturnCapability (Capability **pCap,
340                          Task *task UNUSED_IF_NOT_THREADS)
341 {
342 #if !defined(THREADED_RTS)
343
344     MainCapability.running_task = task;
345     task->cap = &MainCapability;
346     *pCap = &MainCapability;
347
348 #else
349     Capability *cap = *pCap;
350
351     if (cap == NULL) {
352         // Try last_free_capability first
353         cap = last_free_capability;
354         if (!cap->running_task) {
355             nat i;
356             // otherwise, search for a free capability
357             for (i = 0; i < n_capabilities; i++) {
358                 cap = &capabilities[i];
359                 if (!cap->running_task) {
360                     break;
361                 }
362             }
363             // Can't find a free one, use last_free_capability.
364             cap = last_free_capability;
365         }
366
367         // record the Capability as the one this Task is now assocated with.
368         task->cap = cap;
369
370     } else {
371         ASSERT(task->cap == cap);
372     }
373
374     ACQUIRE_LOCK(&cap->lock);
375
376     IF_DEBUG(scheduler,
377              sched_belch("returning; I want capability %d", cap->no));
378
379     if (!cap->running_task) {
380         // It's free; just grab it
381         cap->running_task = task;
382         RELEASE_LOCK(&cap->lock);
383     } else {
384         newReturningTask(cap,task);
385         RELEASE_LOCK(&cap->lock);
386
387         for (;;) {
388             ACQUIRE_LOCK(&task->lock);
389             // task->lock held, cap->lock not held
390             if (!task->wakeup) waitCondition(&task->cond, &task->lock);
391             cap = task->cap;
392             task->wakeup = rtsFalse;
393             RELEASE_LOCK(&task->lock);
394
395             // now check whether we should wake up...
396             ACQUIRE_LOCK(&cap->lock);
397             if (cap->running_task == NULL) {
398                 if (cap->returning_tasks_hd != task) {
399                     giveCapabilityToTask(cap,cap->returning_tasks_hd);
400                     RELEASE_LOCK(&cap->lock);
401                     continue;
402                 }
403                 cap->running_task = task;
404                 popReturningTask(cap);
405                 RELEASE_LOCK(&cap->lock);
406                 break;
407             }
408             RELEASE_LOCK(&cap->lock);
409         }
410
411     }
412
413     ASSERT_CAPABILITY_INVARIANTS(cap,task);
414
415     IF_DEBUG(scheduler,
416              sched_belch("returning; got capability %d", cap->no));
417
418     *pCap = cap;
419 #endif
420 }
421
422 #if defined(THREADED_RTS)
423 /* ----------------------------------------------------------------------------
424  * yieldCapability
425  * ------------------------------------------------------------------------- */
426
// Give up the Capability until there is work for this Task to do (or
// a returning worker needs it), then re-acquire a capability --
// possibly a different one, via task->cap -- and return it in *pCap.
// On entry the task owns *pCap; on exit it owns the returned capability.
void
yieldCapability (Capability** pCap, Task *task)
{
    Capability *cap = *pCap;

    // The fast path; no locking
    if ( cap->returning_tasks_hd == NULL && anyWorkForMe(cap,task) )
        return;

    while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
        IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));

        // We must now release the capability and wait to be woken up
        // again.
        releaseCapabilityAndQueueWorker(cap);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            // the capability we were woken on may differ from the one
            // we released (task->cap can be changed by a migrater; see
            // the comment above giveCapabilityToTask())
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
            ACQUIRE_LOCK(&cap->lock);
            // somebody else grabbed it first: go back to sleep
            if (cap->running_task != NULL) {
                RELEASE_LOCK(&cap->lock);
                continue;
            }

            if (task->tso == NULL) {
                // we are a worker: releaseCapabilityAndQueueWorker()
                // put us on spare_workers, so remove ourselves again.
                ASSERT(cap->spare_workers != NULL);
                // if we're not at the front of the queue, release it
                // again.  This is unlikely to happen.
                if (cap->spare_workers != task) {
                    giveCapabilityToTask(cap,cap->spare_workers);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->spare_workers = task->next;
                task->next = NULL;
            }
            cap->running_task = task;
            RELEASE_LOCK(&cap->lock);
            break;
        }

        IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
        ASSERT(cap->running_task == task);
    }

    *pCap = cap;

    ASSERT_CAPABILITY_INVARIANTS(cap,task);

    return;
}
485
486 /* ----------------------------------------------------------------------------
487  * prodCapabilities
488  *
489  * Used to indicate that the interrupted flag is now set, or some
490  * other global condition that might require waking up a Task on each
491  * Capability.
492  * ------------------------------------------------------------------------- */
493
494 static void
495 prodCapabilities(rtsBool all)
496 {
497     nat i;
498     Capability *cap;
499     Task *task;
500
501     for (i=0; i < n_capabilities; i++) {
502         cap = &capabilities[i];
503         ACQUIRE_LOCK(&cap->lock);
504         if (!cap->running_task) {
505             if (cap->spare_workers) {
506                 task = cap->spare_workers;
507                 ASSERT(!task->stopped);
508                 giveCapabilityToTask(cap,task);
509                 if (!all) {
510                     RELEASE_LOCK(&cap->lock);
511                     return;
512                 }
513             }
514         }
515         RELEASE_LOCK(&cap->lock);
516     }
517 }
518
// Wake up a Task on every free Capability, e.g. after a global
// condition such as the interrupted flag has been set.
void
prodAllCapabilities (void)
{
    prodCapabilities(rtsTrue);
}
524
525 /* ----------------------------------------------------------------------------
526  * prodOneCapability
527  *
528  * Like prodAllCapabilities, but we only require a single Task to wake
529  * up in order to service some global event, such as checking for
530  * deadlock after some idle time has passed.
531  * ------------------------------------------------------------------------- */
532
// Wake up a single Task (on some free Capability) to service a global
// event, such as deadlock checking after idle time.
void
prodOneCapability (void)
{
    prodCapabilities(rtsFalse);
}
538
539 /* ----------------------------------------------------------------------------
540  * shutdownCapability
541  *
542  * At shutdown time, we want to let everything exit as cleanly as
543  * possible.  For each capability, we let its run queue drain, and
544  * allow the workers to stop.
545  *
546  * This function should be called when interrupted and
547  * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
548  * will exit the scheduler and call taskStop(), and any bound thread
549  * that wakes up will return to its caller.  Runnable threads are
550  * killed.
551  *
552  * ------------------------------------------------------------------------- */
553
// Drain and stop one Capability at shutdown time: repeatedly try to
// become its owner once its run queue and spare-worker list are empty,
// yielding the OS thread between attempts so workers can finish.
void
shutdownCapability (Capability *cap, Task *task)
{
    nat i;

    // only called during an orderly shutdown (see block comment above)
    ASSERT(interrupted && shutting_down_scheduler);

    task->cap = cap;

    // NOTE(review): after 50 failed attempts we fall out of the loop
    // even if the capability never quiesced -- confirm that this is
    // acceptable at shutdown.
    for (i = 0; i < 50; i++) {
        IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            // somebody else owns the capability: let them run, retry
            RELEASE_LOCK(&cap->lock);
            IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
            yieldThread();
            continue;
        }
        cap->running_task = task;
        if (!emptyRunQueue(cap) || cap->spare_workers) {
            // still work or workers left: hand the capability back so
            // they can drain, then retry
            IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
            releaseCapability_(cap); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }
        IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // we now have the Capability, its run queue and spare workers
    // list are both empty.
}
587
588 /* ----------------------------------------------------------------------------
589  * tryGrabCapability
590  *
591  * Attempt to gain control of a Capability if it is free.
592  *
593  * ------------------------------------------------------------------------- */
594
595 rtsBool
596 tryGrabCapability (Capability *cap, Task *task)
597 {
598     if (cap->running_task != NULL) return rtsFalse;
599     ACQUIRE_LOCK(&cap->lock);
600     if (cap->running_task != NULL) {
601         RELEASE_LOCK(&cap->lock);
602         return rtsFalse;
603     }
604     task->cap = cap;
605     cap->running_task = task;
606     RELEASE_LOCK(&cap->lock);
607     return rtsTrue;
608 }
609
610
611 #endif /* THREADED_RTS */
612
613