/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2005
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc.  During
 * STG execution, a pointer to the Capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in an SMP build will there be multiple capabilities; for
 * the threaded RTS and other non-SMP builds, there is only
 * one global capability, namely MainCapability.
 *
 * --------------------------------------------------------------------------*/
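
/* A rough sketch of the lifecycle from a Task's point of view
 * (illustrative only; the real call sites are in Schedule.c):
 *
 *     Capability *cap = NULL;
 *     waitForReturnCapability(&cap, task);  // acquire a Capability
 *     ... run Haskell code, with BaseReg pointing to cap->r ...
 *     releaseCapability(cap);               // give it back
 */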

#include "PosixSource.h"
#include "Rts.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "OSThreads.h"
#include "Capability.h"
#include "Schedule.h"

#if !defined(SMP)
Capability MainCapability;     // for non-SMP, we have one global capability
#endif

nat n_capabilities;
Capability *capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability;

#ifdef SMP
#define UNUSED_IF_NOT_SMP
#else
#define UNUSED_IF_NOT_SMP STG_UNUSED
#endif

#if defined(THREADED_RTS)
#define UNUSED_IF_NOT_THREADS
#else
#define UNUSED_IF_NOT_THREADS STG_UNUSED
#endif

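// Returns rtsTrue if some global condition means a Task should wake up
// and enter the scheduler: a blackhole needs checking, the RTS has been
// interrupted, or (with RTS_USER_SIGNALS) user signals are pending.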
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
    return blackholes_need_checking
        || interrupted
#if defined(RTS_USER_SIGNALS)
        || signals_pending()
#endif
        ;
}

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
anyWorkForMe( Capability *cap, Task *task )
{
    // If the run queue is not empty, then we only wake up the guy who
    // can run the thread at the head, even if there is some other
    // reason for this task to run (eg. interrupted=rtsTrue).
    if (!emptyRunQueue(cap)) {
        if (cap->run_queue_hd->bound == NULL) {
            return (task->tso == NULL);
        } else {
            return (cap->run_queue_hd->bound == task);
        }
    }
    return globalWorkToDo();
}
#endif

/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
 *
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */
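/* The returning_tasks list is a singly-linked FIFO threaded through
 * task->return_link: returning_tasks_hd and returning_tasks_tl are
 * both NULL when the list is empty. */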

#if defined(THREADED_RTS)
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->return_link == NULL);
    if (cap->returning_tasks_hd) {
        ASSERT(cap->returning_tasks_tl->return_link == NULL);
        cap->returning_tasks_tl->return_link = task;
    } else {
        cap->returning_tasks_hd = task;
    }
    cap->returning_tasks_tl = task;
}

STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    ASSERT_LOCK_HELD(&cap->lock);
    Task *task;
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->return_link;
    if (!cap->returning_tasks_hd) {
        cap->returning_tasks_tl = NULL;
    }
    task->return_link = NULL;
    return task;
}
#endif

/* ----------------------------------------------------------------------------
 * Initialisation
 *
 * Each Capability is initially free: it has no running_task.
 * ------------------------------------------------------------------------- */

static void
initCapability( Capability *cap, nat i )
{
    cap->no = i;
    cap->in_haskell        = rtsFalse;

    cap->run_queue_hd      = END_TSO_QUEUE;
    cap->run_queue_tl      = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task      = NULL; // indicates cap is free
    cap->spare_workers     = NULL;
    cap->suspended_ccalling_tasks = NULL;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
#endif

    cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
    cap->f.stgGCFun        = (F_)__stg_gc_fun;

    cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
                                     RtsFlags.GcFlags.generations,
                                     "initCapability");
}

/* ---------------------------------------------------------------------------
 * Function:  initCapabilities()
 *
 * Purpose:   set up the Capability handling. For the SMP build,
 *            we keep a table of them, the size of which is
 *            controlled by the user via the RTS flag -N.
 *
 * ------------------------------------------------------------------------- */
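/* For example (illustrative): running an SMP-built program as
 *     ./prog +RTS -N4 -RTS
 * sets RtsFlags.ParFlags.nNodes to 4, so four Capabilities are
 * allocated below. */
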
void
initCapabilities( void )
{
#if defined(SMP)
    nat i, n;

    n_capabilities = n = RtsFlags.ParFlags.nNodes;
    capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities");

    for (i = 0; i < n; i++) {
        initCapability(&capabilities[i], i);
    }

    IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n));
#else
    n_capabilities = 1;
    capabilities = &MainCapability;
    initCapability(&MainCapability, 0);
#endif

    // No Capability has been released yet, so last_free_capability
    // simply points at the first one.  We will start a worker Task
    // on each Capability, which will quickly release its Capability
    // again when it finds nothing to do.
    last_free_capability = &capabilities[0];
}

/* ----------------------------------------------------------------------------
 * Give a Capability to a Task.  The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrator must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrator should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
 *
 * ------------------------------------------------------------------------- */
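/* Note on the handshake (sketch): the granting side sets task->wakeup
 * under task->lock and signals task->cond; the sleeping side re-checks
 * task->wakeup in a loop before waiting (see waitForReturnCapability()
 * and yieldCapability()), so a signal sent before the wait starts is
 * not lost. */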

#if defined(THREADED_RTS)
STATIC_INLINE void
giveCapabilityToTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    // We are not modifying task->cap, so we do not need task->lock for
    // that; it is taken below only for the condition-variable handshake.
    IF_DEBUG(scheduler,
             sched_belch("passing capability %d to %s %p",
                         cap->no, task->tso ? "bound task" : "worker",
                         (void *)task->id));
    ACQUIRE_LOCK(&task->lock);
    task->wakeup = rtsTrue;
    // the wakeup flag is needed because a signalCondition() is lost if
    // the target thread is not yet waiting; the flag makes the wakeup
    // sticky.
    signalCondition(&task->cond);
    RELEASE_LOCK(&task->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * Function:  releaseCapability(Capability*)
 *
 * Purpose:   Let go of a capability.  Causes a
 *            'returning worker' thread or a 'waiting worker'
 *            to wake up, in that order.
 * ------------------------------------------------------------------------- */
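/* Hand-off priority, as implemented below: (1) a returning worker that
 * wants to finish its foreign call, (2) the Task bound to the thread at
 * the head of the run queue, (3) a spare (or freshly started) worker if
 * there is other work to do; otherwise the Capability is marked free. */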

#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap)
{
    Task *task;

    task = cap->running_task;

    ASSERT_CAPABILITY_INVARIANTS(cap,task);

    cap->running_task = NULL;

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call.
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
        // Make sure we're not about to try to wake ourselves up
        ASSERT(task != cap->run_queue_hd->bound);
        task = cap->run_queue_hd->bound;
        giveCapabilityToTask(cap,task);
        return;
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (!emptyRunQueue(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue.
            return;
        }

        // There is no spare worker to take the Capability, so start a
        // new one -- unless the system is shutting down, in which case
        // we never create a new worker.
        if (!shutting_down_scheduler) {
            IF_DEBUG(scheduler,
                     sched_belch("starting new worker on capability %d", cap->no));
            startWorkerTask(cap, workerStart);
            return;
        }
    }

    last_free_capability = cap;
    IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
}

void
releaseCapability (Capability* cap UNUSED_IF_NOT_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap);
    RELEASE_LOCK(&cap->lock);
}

static void
releaseCapabilityAndQueueWorker (Capability* cap UNUSED_IF_NOT_THREADS)
{
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not put back on the spare_workers queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    // Also, be careful to check that this task hasn't just exited
    // Haskell to do a foreign call (task->suspended_tso).
    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
        task->next = cap->spare_workers;
        cap->spare_workers = task;
    }
    // Bound tasks just float around attached to their TSOs.

    releaseCapability_(cap);

    RELEASE_LOCK(&cap->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * waitForReturnCapability( Task *task )
 *
 * Purpose:  when an OS thread returns from an external call,
 * it calls waitForReturnCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS and communicate the
 * result of the external call back to the Haskell thread that
 * made it.
 *
 * ------------------------------------------------------------------------- */
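/* Typical use (an illustrative sketch; per the comment above, the real
 * caller is resumeThread() in Schedule.c):
 *
 *     Capability *cap = task->cap;      // may be NULL for a fresh in-call
 *     waitForReturnCapability(&cap, task);
 *     // we now own cap and can finish the call on the Haskell side
 */
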
void
waitForReturnCapability (Capability **pCap,
                         Task *task UNUSED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)

    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;

#else
    Capability *cap = *pCap;

    if (cap == NULL) {
        // Try last_free_capability first
        cap = last_free_capability;
        if (cap->running_task) {
            nat i;
            // otherwise, search for a free capability
            cap = NULL;
            for (i = 0; i < n_capabilities; i++) {
                if (!capabilities[i].running_task) {
                    cap = &capabilities[i];
                    break;
                }
            }
            if (cap == NULL) {
                // Can't find a free one, fall back to last_free_capability.
                cap = last_free_capability;
            }
        }

        // record the Capability as the one this Task is now associated with.
        task->cap = cap;

    } else {
        ASSERT(task->cap == cap);
    }

    ACQUIRE_LOCK(&cap->lock);

    IF_DEBUG(scheduler,
             sched_belch("returning; I want capability %d", cap->no));

    if (!cap->running_task) {
        // It's free; just grab it
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
    } else {
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            // now check whether we should wake up...
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task == NULL) {
                if (cap->returning_tasks_hd != task) {
                    giveCapabilityToTask(cap,cap->returning_tasks_hd);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->running_task = task;
                popReturningTask(cap);
                RELEASE_LOCK(&cap->lock);
                break;
            }
            RELEASE_LOCK(&cap->lock);
        }

    }

    ASSERT_CAPABILITY_INVARIANTS(cap,task);

    IF_DEBUG(scheduler,
             sched_belch("returning; got capability %d", cap->no));

    *pCap = cap;
#endif
}

#if defined(THREADED_RTS)
/* ----------------------------------------------------------------------------
 * yieldCapability
 * ------------------------------------------------------------------------- */
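/* Give up the Capability we currently own (queueing ourselves on its
 * spare_workers list if we are a worker Task), and sleep until some
 * other Task hands us a Capability again.  On return, *pCap points to
 * the (possibly different) Capability we now own. */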

void
yieldCapability (Capability** pCap, Task *task)
{
    Capability *cap = *pCap;

    // The fast path; no locking
    if ( cap->returning_tasks_hd == NULL && anyWorkForMe(cap,task) )
        return;

    while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
        IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));

        // We must now release the capability and wait to be woken up
        // again.
        releaseCapabilityAndQueueWorker(cap);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task != NULL) {
                RELEASE_LOCK(&cap->lock);
                continue;
            }

            if (task->tso == NULL) {
                // We are a worker Task: remove ourselves from the
                // spare_workers list before taking the Capability.
                ASSERT(cap->spare_workers != NULL);
                // if we're not at the front of the queue, release it
                // again.  This is unlikely to happen.
                if (cap->spare_workers != task) {
                    giveCapabilityToTask(cap,cap->spare_workers);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->spare_workers = task->next;
                task->next = NULL;
            }
            cap->running_task = task;
            RELEASE_LOCK(&cap->lock);
            break;
        }

        IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
        ASSERT(cap->running_task == task);
    }

    *pCap = cap;

    ASSERT_CAPABILITY_INVARIANTS(cap,task);

    return;
}

/* ----------------------------------------------------------------------------
 * prodCapabilities
 *
 * Used to indicate that the interrupted flag is now set, or some
 * other global condition that might require waking up a Task on each
 * Capability.
 * ------------------------------------------------------------------------- */

static void
prodCapabilities(rtsBool all)
{
    nat i;
    Capability *cap;
    Task *task;

    for (i=0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        ACQUIRE_LOCK(&cap->lock);
        if (!cap->running_task) {
            if (cap->spare_workers) {
                task = cap->spare_workers;
                ASSERT(!task->stopped);
                giveCapabilityToTask(cap,task);
                if (!all) {
                    RELEASE_LOCK(&cap->lock);
                    return;
                }
            }
        }
        RELEASE_LOCK(&cap->lock);
    }
}

void
prodAllCapabilities (void)
{
    prodCapabilities(rtsTrue);
}

/* ----------------------------------------------------------------------------
 * prodOneCapability
 *
 * Like prodAllCapabilities, but we only require a single Task to wake
 * up in order to service some global event, such as checking for
 * deadlock after some idle time has passed.
 * ------------------------------------------------------------------------- */

void
prodOneCapability (void)
{
    prodCapabilities(rtsFalse);
}

/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible.  For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted == rtsTrue and
 * shutting_down_scheduler == rtsTrue, so that any worker that wakes
 * up will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller.  Runnable threads are
 * killed.
 *
 * ------------------------------------------------------------------------- */
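/* Note that rather than blocking indefinitely, the loop below makes a
 * bounded number of attempts (currently 50), calling yieldThread()
 * between attempts so the remaining Tasks get a chance to run and
 * exit. */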

void
shutdownCapability (Capability *cap, Task *task)
{
    nat i;

    ASSERT(interrupted && shutting_down_scheduler);

    task->cap = cap;

    for (i = 0; i < 50; i++) {
        IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            RELEASE_LOCK(&cap->lock);
            IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
            yieldThread();
            continue;
        }
        cap->running_task = task;
        if (!emptyRunQueue(cap) || cap->spare_workers) {
            IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
            releaseCapability_(cap); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }
        IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // We now own the Capability; its run queue and spare_workers
    // list are both empty.
}

/* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
 * Attempt to gain control of a Capability if it is free.
 *
 * ------------------------------------------------------------------------- */
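/* The unlocked read of cap->running_task below is just a cheap
 * pre-filter for the common busy case; the decisive test is repeated
 * under cap->lock before the Capability is claimed. */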

rtsBool
tryGrabCapability (Capability *cap, Task *task)
{
    if (cap->running_task != NULL) return rtsFalse;
    ACQUIRE_LOCK(&cap->lock);
    if (cap->running_task != NULL) {
        RELEASE_LOCK(&cap->lock);
        return rtsFalse;
    }
    task->cap = cap;
    cap->running_task = task;
    RELEASE_LOCK(&cap->lock);
    return rtsTrue;
}

#endif /* THREADED_RTS */