[project @ 2005-11-03 11:02:00 by simonmar]
[ghc-hetmet.git] / ghc / rts / Capability.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2003-2005
4  *
5  * Capabilities
6  *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
12  *
13  * Only in an SMP build will there be multiple capabilities, for
14  * the threaded RTS and other non-threaded builds, there is only
15  * one global capability, namely MainCapability.
16  *
17  * --------------------------------------------------------------------------*/
18
19 #include "PosixSource.h"
20 #include "Rts.h"
21 #include "RtsUtils.h"
22 #include "RtsFlags.h"
23 #include "OSThreads.h"
24 #include "Capability.h"
25 #include "Schedule.h"
26
#if !defined(SMP)
Capability MainCapability;     // for non-SMP, we have one global capability
#endif

// Number of entries in the capabilities[] array (1 in non-SMP builds,
// see initCapabilities()).
nat n_capabilities;
Capability *capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability;

// Expands to STG_UNUSED on a parameter that is referenced only in SMP
// builds, to suppress unused-parameter warnings elsewhere.
#ifdef SMP
#define UNUSED_IF_NOT_SMP
#else
#define UNUSED_IF_NOT_SMP STG_UNUSED
#endif

// NOTE(review): despite the name, this macro is keyed on
// RTS_USER_SIGNALS rather than THREADED_RTS -- confirm whether the
// guard or the name is the intended one.
#ifdef RTS_USER_SIGNALS
#define UNUSED_IF_NOT_THREADS
#else
#define UNUSED_IF_NOT_THREADS STG_UNUSED
#endif
52
53 STATIC_INLINE rtsBool
54 globalWorkToDo (void)
55 {
56     return blackholes_need_checking
57         || interrupted
58 #if defined(RTS_USER_SIGNALS)
59         || signals_pending()
60 #endif
61         ;
62 }
63
#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
anyWorkForMe( Capability *cap, Task *task )
{
    // A non-empty run queue takes priority: only the Task that can run
    // the thread at the head should wake up, even if some other global
    // condition (eg. interrupted=rtsTrue) would otherwise apply.
    if (emptyRunQueue(cap)) {
        return globalWorkToDo();
    }
    if (cap->run_queue_hd->bound != NULL) {
        // the head thread is bound: only its owning Task may run it
        return (cap->run_queue_hd->bound == task);
    }
    // the head thread is unbound: any worker (a Task with no TSO) will do
    return (task->tso == NULL);
}
#endif
81
82 /* -----------------------------------------------------------------------------
83  * Manage the returning_tasks lists.
84  *
85  * These functions require cap->lock
86  * -------------------------------------------------------------------------- */
87
88 #if defined(THREADED_RTS)
89 STATIC_INLINE void
90 newReturningTask (Capability *cap, Task *task)
91 {
92     ASSERT_LOCK_HELD(&cap->lock);
93     ASSERT(task->return_link == NULL);
94     if (cap->returning_tasks_hd) {
95         ASSERT(cap->returning_tasks_tl->return_link == NULL);
96         cap->returning_tasks_tl->return_link = task;
97     } else {
98         cap->returning_tasks_hd = task;
99     }
100     cap->returning_tasks_tl = task;
101 }
102
103 STATIC_INLINE Task *
104 popReturningTask (Capability *cap)
105 {
106     ASSERT_LOCK_HELD(&cap->lock);
107     Task *task;
108     task = cap->returning_tasks_hd;
109     ASSERT(task);
110     cap->returning_tasks_hd = task->return_link;
111     if (!cap->returning_tasks_hd) {
112         cap->returning_tasks_tl = NULL;
113     }
114     task->return_link = NULL;
115     return task;
116 }
117 #endif
118
119 /* ----------------------------------------------------------------------------
120  * Initialisation
121  *
122  * The Capability is initially marked not free.
123  * ------------------------------------------------------------------------- */
124
125 static void
126 initCapability( Capability *cap, nat i )
127 {
128     nat g;
129
130     cap->no = i;
131     cap->in_haskell        = rtsFalse;
132
133     cap->run_queue_hd      = END_TSO_QUEUE;
134     cap->run_queue_tl      = END_TSO_QUEUE;
135
136 #if defined(THREADED_RTS)
137     initMutex(&cap->lock);
138     cap->running_task      = NULL; // indicates cap is free
139     cap->spare_workers     = NULL;
140     cap->suspended_ccalling_tasks = NULL;
141     cap->returning_tasks_hd = NULL;
142     cap->returning_tasks_tl = NULL;
143 #endif
144
145     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
146     cap->f.stgGCFun        = (F_)__stg_gc_fun;
147
148     cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
149                                      RtsFlags.GcFlags.generations,
150                                      "initCapability");
151
152     for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
153         cap->mut_lists[g] = NULL;
154     }
155 }
156
157 /* ---------------------------------------------------------------------------
158  * Function:  initCapabilities()
159  *
160  * Purpose:   set up the Capability handling. For the SMP build,
161  *            we keep a table of them, the size of which is
162  *            controlled by the user via the RTS flag -N.
163  *
164  * ------------------------------------------------------------------------- */
165 void
166 initCapabilities( void )
167 {
168 #if defined(SMP)
169     nat i,n;
170
171     n_capabilities = n = RtsFlags.ParFlags.nNodes;
172     capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities");
173
174     for (i = 0; i < n; i++) {
175         initCapability(&capabilities[i], i);
176     }
177
178     IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n));
179 #else
180     n_capabilities = 1;
181     capabilities = &MainCapability;
182     initCapability(&MainCapability, 0);
183 #endif
184
185     // There are no free capabilities to begin with.  We will start
186     // a worker Task to each Capability, which will quickly put the
187     // Capability on the free list when it finds nothing to do.
188     last_free_capability = &capabilities[0];
189 }
190
191 /* ----------------------------------------------------------------------------
192  * Give a Capability to a Task.  The task must currently be sleeping
193  * on its condition variable.
194  *
195  * Requires cap->lock (modifies cap->running_task).
196  *
 * When migrating a Task, the migrator must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrator should own the Capability (when
200  * migrating the run queue), or cap->lock (when migrating
201  * returning_workers).
202  *
203  * ------------------------------------------------------------------------- */
204
205 #if defined(THREADED_RTS)
STATIC_INLINE void
giveCapabilityToTask (Capability *cap, Task *task)
{
    // Wake up 'task', which will find that it now owns 'cap'.  We hold
    // cap->lock, and task->cap must already point at cap.
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    // We are not modifying task->cap, so we do not need to take task->lock.
    IF_DEBUG(scheduler,
             sched_belch("passing capability %d to %s %p",
                         cap->no, task->tso ? "bound task" : "worker",
                         (void *)task->id));
    ACQUIRE_LOCK(&task->lock);
    task->wakeup = rtsTrue;
    // the wakeup flag is needed because signalCondition() doesn't
    // flag the condition if the thread is already running, but we want
    // it to be sticky.
    signalCondition(&task->cond);
    RELEASE_LOCK(&task->lock);
}
224 #endif
225
226 /* ----------------------------------------------------------------------------
227  * Function:  releaseCapability(Capability*)
228  *
229  * Purpose:   Letting go of a capability. Causes a
230  *            'returning worker' thread or a 'waiting worker'
231  *            to wake up, in that order.
232  * ------------------------------------------------------------------------- */
233
234 #if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap)
{
    // Give up ownership of 'cap'.  The Capability is handed to, in
    // order of preference: a returning worker, the Task bound to the
    // thread at the head of the run queue, or a spare/new worker if
    // there is work to do; otherwise it is recorded as free.
    // Caller must own cap and hold cap->lock.
    Task *task;

    task = cap->running_task;

    ASSERT_CAPABILITY_INVARIANTS(cap,task);

    cap->running_task = NULL;   // cap is now formally unowned

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call..
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
        // Make sure we're not about to try to wake ourselves up
        ASSERT(task != cap->run_queue_hd->bound);
        task = cap->run_queue_hd->bound;
        giveCapabilityToTask(cap,task);
        return;
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (!emptyRunQueue(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue;
            // (see yieldCapability())
            return;
        }

        // Create a worker thread if we don't have one.  If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed.  If the system is
        // shutting down, we never create a new worker.
        // NOTE(review): the code below only tests
        // shutting_down_scheduler; the "interrupted" behaviour
        // described above is not checked here -- confirm whether that
        // check lives in the caller.
        if (!shutting_down_scheduler) {
            IF_DEBUG(scheduler,
                     sched_belch("starting new worker on capability %d", cap->no));
            startWorkerTask(cap, workerStart);
            return;
        }
    }

    // Nobody wants the Capability: remember it as the last one freed,
    // so in-calls can find it quickly (see last_free_capability).
    last_free_capability = cap;
    IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
}
288
void
releaseCapability (Capability* cap UNUSED_IF_NOT_THREADS)
{
    // Locked wrapper around releaseCapability_().
    // NOTE(review): this definition is only compiled when THREADED_RTS
    // is defined, so the UNUSED_IF_NOT_THREADS annotation (itself keyed
    // on RTS_USER_SIGNALS) looks redundant here -- confirm.
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap);
    RELEASE_LOCK(&cap->lock);
}
296
static void
releaseCapabilityAndQueueWorker (Capability* cap UNUSED_IF_NOT_THREADS)
{
    // As releaseCapability(), but additionally pushes the current Task
    // (when it is an ordinary worker) onto cap->spare_workers so that
    // it can be handed the Capability again later (see yieldCapability()).
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not replaced on the spare_worker queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    // Also, be careful to check that this task hasn't just exited
    // Haskell to do a foreign call (task->suspended_tso).
    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
        task->next = cap->spare_workers;
        cap->spare_workers = task;
    }
    // Bound tasks just float around attached to their TSOs.

    releaseCapability_(cap);

    RELEASE_LOCK(&cap->lock);
}
323 #endif
324
325 /* ----------------------------------------------------------------------------
326  * waitForReturnCapability( Task *task )
327  *
328  * Purpose:  when an OS thread returns from an external call,
329  * it calls waitForReturnCapability() (via Schedule.resumeThread())
330  * to wait for permission to enter the RTS & communicate the
331  * result of the external call back to the Haskell thread that
332  * made it.
333  *
334  * ------------------------------------------------------------------------- */
335 void
336 waitForReturnCapability (Capability **pCap,
337                          Task *task UNUSED_IF_NOT_THREADS)
338 {
339 #if !defined(THREADED_RTS)
340
341     MainCapability.running_task = task;
342     task->cap = &MainCapability;
343     *pCap = &MainCapability;
344
345 #else
346     Capability *cap = *pCap;
347
348     if (cap == NULL) {
349         // Try last_free_capability first
350         cap = last_free_capability;
351         if (!cap->running_task) {
352             nat i;
353             // otherwise, search for a free capability
354             for (i = 0; i < n_capabilities; i++) {
355                 cap = &capabilities[i];
356                 if (!cap->running_task) {
357                     break;
358                 }
359             }
360             // Can't find a free one, use last_free_capability.
361             cap = last_free_capability;
362         }
363
364         // record the Capability as the one this Task is now assocated with.
365         task->cap = cap;
366
367     } else {
368         ASSERT(task->cap == cap);
369     }
370
371     ACQUIRE_LOCK(&cap->lock);
372
373     IF_DEBUG(scheduler,
374              sched_belch("returning; I want capability %d", cap->no));
375
376     if (!cap->running_task) {
377         // It's free; just grab it
378         cap->running_task = task;
379         RELEASE_LOCK(&cap->lock);
380     } else {
381         newReturningTask(cap,task);
382         RELEASE_LOCK(&cap->lock);
383
384         for (;;) {
385             ACQUIRE_LOCK(&task->lock);
386             // task->lock held, cap->lock not held
387             if (!task->wakeup) waitCondition(&task->cond, &task->lock);
388             cap = task->cap;
389             task->wakeup = rtsFalse;
390             RELEASE_LOCK(&task->lock);
391
392             // now check whether we should wake up...
393             ACQUIRE_LOCK(&cap->lock);
394             if (cap->running_task == NULL) {
395                 if (cap->returning_tasks_hd != task) {
396                     giveCapabilityToTask(cap,cap->returning_tasks_hd);
397                     RELEASE_LOCK(&cap->lock);
398                     continue;
399                 }
400                 cap->running_task = task;
401                 popReturningTask(cap);
402                 RELEASE_LOCK(&cap->lock);
403                 break;
404             }
405             RELEASE_LOCK(&cap->lock);
406         }
407
408     }
409
410     ASSERT_CAPABILITY_INVARIANTS(cap,task);
411
412     IF_DEBUG(scheduler,
413              sched_belch("returning; got capability %d", cap->no));
414
415     *pCap = cap;
416 #endif
417 }
418
419 #if defined(THREADED_RTS)
420 /* ----------------------------------------------------------------------------
421  * yieldCapability
422  * ------------------------------------------------------------------------- */
423
void
yieldCapability (Capability** pCap, Task *task)
{
    // Give up *pCap while someone else has a better claim to it (a
    // returning worker, or the Task that can run the thread at the
    // head of the run queue), sleeping until this Task is handed a
    // Capability again.  On return, *pCap is owned by this Task.
    Capability *cap = *pCap;

    // The fast path; no locking
    if ( cap->returning_tasks_hd == NULL && anyWorkForMe(cap,task) )
        return;

    while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
        IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));

        // We must now release the capability and wait to be woken up
        // again.
        releaseCapabilityAndQueueWorker(cap);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            // re-read task->cap: the Task that woke us may have
            // migrated us to a different Capability
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task != NULL) {
                // someone grabbed the Capability first: sleep again
                RELEASE_LOCK(&cap->lock);
                continue;
            }

            if (task->tso == NULL) {
                // we are a worker (no bound TSO), so we must be on this
                // Capability's spare_workers queue
                ASSERT(cap->spare_workers != NULL);
                // if we're not at the front of the queue, release it
                // again.  This is unlikely to happen.
                if (cap->spare_workers != task) {
                    giveCapabilityToTask(cap,cap->spare_workers);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                // pop ourselves off the spare_workers queue
                cap->spare_workers = task->next;
                task->next = NULL;
            }
            cap->running_task = task;
            RELEASE_LOCK(&cap->lock);
            break;
        }

        IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
        ASSERT(cap->running_task == task);
    }

    *pCap = cap;

    ASSERT_CAPABILITY_INVARIANTS(cap,task);

    return;
}
482
483 /* ----------------------------------------------------------------------------
484  * prodCapabilities
485  *
486  * Used to indicate that the interrupted flag is now set, or some
487  * other global condition that might require waking up a Task on each
488  * Capability.
489  * ------------------------------------------------------------------------- */
490
491 static void
492 prodCapabilities(rtsBool all)
493 {
494     nat i;
495     Capability *cap;
496     Task *task;
497
498     for (i=0; i < n_capabilities; i++) {
499         cap = &capabilities[i];
500         ACQUIRE_LOCK(&cap->lock);
501         if (!cap->running_task) {
502             if (cap->spare_workers) {
503                 task = cap->spare_workers;
504                 ASSERT(!task->stopped);
505                 giveCapabilityToTask(cap,task);
506                 if (!all) {
507                     RELEASE_LOCK(&cap->lock);
508                     return;
509                 }
510             }
511         }
512         RELEASE_LOCK(&cap->lock);
513     }
514 }
515
void
prodAllCapabilities (void)
{
    // Wake a spare worker on every free Capability (see prodCapabilities).
    prodCapabilities(rtsTrue);
}
521
522 /* ----------------------------------------------------------------------------
523  * prodOneCapability
524  *
525  * Like prodAllCapabilities, but we only require a single Task to wake
526  * up in order to service some global event, such as checking for
527  * deadlock after some idle time has passed.
528  * ------------------------------------------------------------------------- */
529
void
prodOneCapability (void)
{
    // Wake a spare worker on at most one free Capability, enough to
    // service a single global event (see the comment above and
    // prodCapabilities).
    prodCapabilities(rtsFalse);
}
535
536 /* ----------------------------------------------------------------------------
537  * shutdownCapability
538  *
539  * At shutdown time, we want to let everything exit as cleanly as
540  * possible.  For each capability, we let its run queue drain, and
541  * allow the workers to stop.
542  *
543  * This function should be called when interrupted and
544  * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
545  * will exit the scheduler and call taskStop(), and any bound thread
546  * that wakes up will return to its caller.  Runnable threads are
547  * killed.
548  *
549  * ------------------------------------------------------------------------- */
550
void
shutdownCapability (Capability *cap, Task *task)
{
    nat i;

    ASSERT(interrupted && shutting_down_scheduler);

    task->cap = cap;

    // Repeatedly try to take ownership of cap, yielding between
    // attempts so that the current owner and any remaining workers can
    // make progress and drain away.  At most 50 attempts are made.
    for (i = 0; i < 50; i++) {
        IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            // someone else owns it; let them run, then retry
            RELEASE_LOCK(&cap->lock);
            IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
            yieldThread();
            continue;
        }
        cap->running_task = task;
        if (!emptyRunQueue(cap) || cap->spare_workers) {
            // threads or workers still to drain: give the cap back
            // (this wakes a worker) and retry
            IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
            releaseCapability_(cap); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }
        IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // we now have the Capability, its run queue and spare workers
    // list are both empty.
    // NOTE(review): if all 50 attempts fail we fall out of the loop
    // WITHOUT owning the Capability -- confirm this best-effort
    // behaviour is acceptable at shutdown.
}
584
585 /* ----------------------------------------------------------------------------
586  * tryGrabCapability
587  *
588  * Attempt to gain control of a Capability if it is free.
589  *
590  * ------------------------------------------------------------------------- */
591
592 rtsBool
593 tryGrabCapability (Capability *cap, Task *task)
594 {
595     if (cap->running_task != NULL) return rtsFalse;
596     ACQUIRE_LOCK(&cap->lock);
597     if (cap->running_task != NULL) {
598         RELEASE_LOCK(&cap->lock);
599         return rtsFalse;
600     }
601     task->cap = cap;
602     cap->running_task = task;
603     RELEASE_LOCK(&cap->lock);
604     return rtsTrue;
605 }
606
607
608 #endif /* THREADED_RTS */
609
610