/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2005
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in an SMP build will there be multiple capabilities; in the
 * threaded RTS and other non-SMP builds there is only one global
 * capability, namely MainCapability.
 *
 * --------------------------------------------------------------------------*/
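
/*
 * A minimal, illustrative sketch (not compiled; the names are the entry
 * points defined below) of how a Task uses this interface around a
 * foreign in-call:
 *
 *     Capability *cap = NULL;
 *     waitForReturnCapability(&cap, task); // wait for permission to enter
 *     // ... run Haskell code; cap->running_task == task here ...
 *     releaseCapability(cap);              // wake a returning/waiting worker
 *
 * Worker Tasks instead loop on yieldCapability(), which parks them on
 * cap->spare_workers until there is work for them to do.
 */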

#include "PosixSource.h"
#include "Rts.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "OSThreads.h"
#include "Capability.h"
#include "Schedule.h"

#if !defined(SMP)
Capability MainCapability;     // for non-SMP, we have one global capability
#endif

nat n_capabilities;
Capability *capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability;

#ifdef SMP
#define UNUSED_IF_NOT_SMP
#else
#define UNUSED_IF_NOT_SMP STG_UNUSED
#endif

#ifdef THREADED_RTS
#define UNUSED_IF_NOT_THREADS
#else
#define UNUSED_IF_NOT_THREADS STG_UNUSED
#endif


STATIC_INLINE rtsBool
globalWorkToDo (void)
{
    return blackholes_need_checking
        || interrupted
#if defined(RTS_USER_SIGNALS)
        || signals_pending()
#endif
        ;
}

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
anyWorkForMe( Capability *cap, Task *task )
{
    // If the run queue is not empty, then we only wake up the task
    // that can run the thread at the head, even if there is some other
    // reason for this task to run (e.g. interrupted=rtsTrue).
    if (!emptyRunQueue(cap)) {
        if (cap->run_queue_hd->bound == NULL) {
            return (task->tso == NULL);
        } else {
            return (cap->run_queue_hd->bound == task);
        }
    }
    return globalWorkToDo();
}
#endif

/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
 *
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->return_link == NULL);
    if (cap->returning_tasks_hd) {
        ASSERT(cap->returning_tasks_tl->return_link == NULL);
        cap->returning_tasks_tl->return_link = task;
    } else {
        cap->returning_tasks_hd = task;
    }
    cap->returning_tasks_tl = task;
}

STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    Task *task;
    ASSERT_LOCK_HELD(&cap->lock);
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->return_link;
    if (!cap->returning_tasks_hd) {
        cap->returning_tasks_tl = NULL;
    }
    task->return_link = NULL;
    return task;
}
#endif

/* ----------------------------------------------------------------------------
 * Initialisation
 *
 * The Capability is initially marked not free.
 * ------------------------------------------------------------------------- */

static void
initCapability( Capability *cap, nat i )
{
    nat g;

    cap->no = i;
    cap->in_haskell        = rtsFalse;

    cap->run_queue_hd      = END_TSO_QUEUE;
    cap->run_queue_tl      = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task      = NULL; // indicates cap is free
    cap->spare_workers     = NULL;
    cap->suspended_ccalling_tasks = NULL;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
#endif

    cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
    cap->f.stgGCFun        = (F_)__stg_gc_fun;

    cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
                                     RtsFlags.GcFlags.generations,
                                     "initCapability");
    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        cap->mut_lists[g] = NULL;
    }
}

/* ---------------------------------------------------------------------------
 * Function:  initCapabilities()
 *
 * Purpose:   set up the Capability handling. For the SMP build,
 *            we keep a table of them, the size of which is
 *            controlled by the user via the RTS flag -N.
 *
 * ------------------------------------------------------------------------- */
void
initCapabilities( void )
{
#if defined(SMP)
    nat i,n;

    n_capabilities = n = RtsFlags.ParFlags.nNodes;
    capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities");

    for (i = 0; i < n; i++) {
        initCapability(&capabilities[i], i);
    }

    IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n));
#else
    n_capabilities = 1;
    capabilities = &MainCapability;
    initCapability(&MainCapability, 0);
#endif

    // There are no free capabilities to begin with.  We will start
    // a worker Task on each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
    last_free_capability = &capabilities[0];
}

/* ----------------------------------------------------------------------------
 * Give a Capability to a Task.  The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrator must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrator should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
giveCapabilityToTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    // We are not modifying task->cap, so we do not need to take task->lock.
    IF_DEBUG(scheduler,
             sched_belch("passing capability %d to %s %p",
                         cap->no, task->tso ? "bound task" : "worker",
                         (void *)task->id));
    ACQUIRE_LOCK(&task->lock);
    task->wakeup = rtsTrue;
    // the wakeup flag is needed because signalCondition() doesn't
    // flag the condition if the thread is already running, but we want
    // it to be sticky.
    signalCondition(&task->cond);
    RELEASE_LOCK(&task->lock);
}
#endif
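
/*
 * The waiting side of this wakeup handshake appears in
 * waitForReturnCapability() and yieldCapability() below: the woken Task
 * clears task->wakeup under task->lock, then re-takes cap->lock and
 * checks that the Capability is really free and really intended for it,
 * going back to sleep otherwise.
 */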

/* ----------------------------------------------------------------------------
 * Function:  releaseCapability(Capability*)
 *
 * Purpose:   Letting go of a capability. Causes a
 *            'returning worker' thread or a 'waiting worker'
 *            to wake up, in that order.
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap)
{
    Task *task;

    task = cap->running_task;

    ASSERT_CAPABILITY_INVARIANTS(cap,task);

    cap->running_task = NULL;

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call.
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
        // Make sure we're not about to try to wake ourselves up
        ASSERT(task != cap->run_queue_hd->bound);
        task = cap->run_queue_hd->bound;
        giveCapabilityToTask(cap,task);
        return;
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (!emptyRunQueue(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue.
            return;
        }

        // Create a worker thread if we don't have one.  If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed.  If the system is
        // shutting down, we never create a new worker.
        if (!shutting_down_scheduler) {
            IF_DEBUG(scheduler,
                     sched_belch("starting new worker on capability %d", cap->no));
            startWorkerTask(cap, workerStart);
            return;
        }
    }

    last_free_capability = cap;
    IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
}

void
releaseCapability (Capability* cap UNUSED_IF_NOT_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap);
    RELEASE_LOCK(&cap->lock);
}

static void
releaseCapabilityAndQueueWorker (Capability* cap UNUSED_IF_NOT_THREADS)
{
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not replaced on the spare_worker queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    // Also, be careful to check that this task hasn't just exited
    // Haskell to do a foreign call (task->suspended_tso).
    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
        task->next = cap->spare_workers;
        cap->spare_workers = task;
    }
    // Bound tasks just float around attached to their TSOs.

    releaseCapability_(cap);

    RELEASE_LOCK(&cap->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * waitForReturnCapability( Task *task )
 *
 * Purpose:  when an OS thread returns from an external call,
 * it calls waitForReturnCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
 * result of the external call back to the Haskell thread that
 * made it.
 *
 * ------------------------------------------------------------------------- */
void
waitForReturnCapability (Capability **pCap,
                         Task *task UNUSED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)

    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;

#else
    Capability *cap = *pCap;

    if (cap == NULL) {
        // Try last_free_capability first
        cap = last_free_capability;
        if (cap->running_task) {
            nat i;
            // otherwise, search for a free capability
            for (i = 0; i < n_capabilities; i++) {
                cap = &capabilities[i];
                if (!cap->running_task) {
                    break;
                }
            }
            if (cap->running_task) {
                // Can't find a free one, use last_free_capability.
                cap = last_free_capability;
            }
        }

        // record the Capability as the one this Task is now associated with.
        task->cap = cap;

    } else {
        ASSERT(task->cap == cap);
    }

    ACQUIRE_LOCK(&cap->lock);

    IF_DEBUG(scheduler,
             sched_belch("returning; I want capability %d", cap->no));

    if (!cap->running_task) {
        // It's free; just grab it
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
    } else {
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            // now check whether we should wake up...
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task == NULL) {
                if (cap->returning_tasks_hd != task) {
                    giveCapabilityToTask(cap,cap->returning_tasks_hd);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->running_task = task;
                popReturningTask(cap);
                RELEASE_LOCK(&cap->lock);
                break;
            }
            RELEASE_LOCK(&cap->lock);
        }

    }

    ASSERT_CAPABILITY_INVARIANTS(cap,task);

    IF_DEBUG(scheduler,
             sched_belch("returning; got capability %d", cap->no));

    *pCap = cap;
#endif
}

#if defined(THREADED_RTS)
/* ----------------------------------------------------------------------------
 * yieldCapability
 * ------------------------------------------------------------------------- */

void
yieldCapability (Capability** pCap, Task *task)
{
    Capability *cap = *pCap;

    // The fast path; no locking
    if ( cap->returning_tasks_hd == NULL && anyWorkForMe(cap,task) )
        return;

    while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
        IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));

        // We must now release the capability and wait to be woken up
        // again.
        releaseCapabilityAndQueueWorker(cap);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task != NULL) {
                RELEASE_LOCK(&cap->lock);
                continue;
            }

            if (task->tso == NULL) {
                ASSERT(cap->spare_workers != NULL);
                // if we're not at the front of the queue, release it
                // again.  This is unlikely to happen.
                if (cap->spare_workers != task) {
                    giveCapabilityToTask(cap,cap->spare_workers);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->spare_workers = task->next;
                task->next = NULL;
            }
            cap->running_task = task;
            RELEASE_LOCK(&cap->lock);
            break;
        }

        IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
        ASSERT(cap->running_task == task);
    }

    *pCap = cap;

    ASSERT_CAPABILITY_INVARIANTS(cap,task);

    return;
}
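
/*
 * A hedged sketch of the intended caller (the scheduler loop in
 * Schedule.c; the call site is paraphrased, not quoted):
 *
 *     // when the scheduler finds nothing for this Task to do:
 *     yieldCapability(&cap, task);
 *     // on return we hold a (possibly different) Capability that has
 *     // work for this Task, so go round the scheduler loop again.
 */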

/* ----------------------------------------------------------------------------
 * prodCapabilities
 *
 * Used to indicate that the interrupted flag is now set, or some
 * other global condition that might require waking up a Task on each
 * Capability.
 * ------------------------------------------------------------------------- */

static void
prodCapabilities(rtsBool all)
{
    nat i;
    Capability *cap;
    Task *task;

    for (i=0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        ACQUIRE_LOCK(&cap->lock);
        if (!cap->running_task) {
            if (cap->spare_workers) {
                task = cap->spare_workers;
                ASSERT(!task->stopped);
                giveCapabilityToTask(cap,task);
                if (!all) {
                    RELEASE_LOCK(&cap->lock);
                    return;
                }
            }
        }
        RELEASE_LOCK(&cap->lock);
    }
}

void
prodAllCapabilities (void)
{
    prodCapabilities(rtsTrue);
}

/* ----------------------------------------------------------------------------
 * prodOneCapability
 *
 * Like prodAllCapabilities, but we only require a single Task to wake
 * up in order to service some global event, such as checking for
 * deadlock after some idle time has passed.
 * ------------------------------------------------------------------------- */

void
prodOneCapability (void)
{
    prodCapabilities(rtsFalse);
}

/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible.  For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted and
 * shutting_down_scheduler = rtsTrue, so that any worker that wakes up
 * will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller.  Runnable threads are
 * killed.
 *
 * ------------------------------------------------------------------------- */

void
shutdownCapability (Capability *cap, Task *task)
{
    nat i;

    ASSERT(interrupted && shutting_down_scheduler);

    task->cap = cap;

    for (i = 0; i < 50; i++) {
        IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            RELEASE_LOCK(&cap->lock);
            IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
            yieldThread();
            continue;
        }
        cap->running_task = task;
        if (!emptyRunQueue(cap) || cap->spare_workers) {
            IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
            releaseCapability_(cap); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }
        IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // We now have the Capability; its run queue and spare workers
    // list are both empty.
}

/* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
 * Attempt to gain control of a Capability if it is free.
 *
 * ------------------------------------------------------------------------- */

rtsBool
tryGrabCapability (Capability *cap, Task *task)
{
    if (cap->running_task != NULL) return rtsFalse;
    ACQUIRE_LOCK(&cap->lock);
    if (cap->running_task != NULL) {
        RELEASE_LOCK(&cap->lock);
        return rtsFalse;
    }
    task->cap = cap;
    cap->running_task = task;
    RELEASE_LOCK(&cap->lock);
    return rtsTrue;
}
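
// Note: the unlocked read of cap->running_task above is only a cheap
// first test; the decisive check is repeated while holding cap->lock,
// so a stale read merely costs us a failed attempt.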


#endif /* THREADED_RTS */