/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2005
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery, etc.  During
 * STG execution, a pointer to the Capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in an SMP build will there be multiple capabilities; in
 * the threaded RTS and other non-SMP builds, there is only
 * one global capability, namely MainCapability.
 *
 * --------------------------------------------------------------------------*/
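
/* A sketch of how the pieces relate (illustrative only; the real field
 * layout lives in Capability.h, and the inverse mapping shown here is a
 * hypothetical illustration, not necessarily a helper provided by this
 * version of the RTS):
 *
 *     Capability *cap = ...;
 *     StgRegTable *reg = &cap->r;   // BaseReg points at this
 *
 *     // Recovering the Capability from BaseReg is a constant-offset
 *     // subtraction, roughly:
 *     Capability *cap2 =
 *         (Capability *)((char *)reg - offsetof(Capability, r));
 */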

#include "PosixSource.h"
#include "Rts.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "OSThreads.h"
#include "Capability.h"
#include "Schedule.h"

#if !defined(SMP)
Capability MainCapability;     // for non-SMP, we have one global capability
#endif

nat n_capabilities;
Capability *capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability;
#ifdef SMP
#define UNUSED_IF_NOT_SMP
#else
#define UNUSED_IF_NOT_SMP STG_UNUSED
#endif

#ifdef THREADED_RTS
#define UNUSED_IF_NOT_THREADS
#else
#define UNUSED_IF_NOT_THREADS STG_UNUSED
#endif


STATIC_INLINE rtsBool
globalWorkToDo (void)
{
    return blackholes_need_checking
        || interrupted
#if defined(RTS_USER_SIGNALS)
        || signals_pending()
#endif
        ;
}

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
anyWorkForMe( Capability *cap, Task *task )
{
    // If the run queue is not empty, then we only wake up the guy who
    // can run the thread at the head, even if there is some other
    // reason for this task to run (eg. interrupted=rtsTrue).
    if (!emptyRunQueue(cap)) {
        if (cap->run_queue_hd->bound == NULL) {
            return (task->tso == NULL);
        } else {
            return (cap->run_queue_hd->bound == task);
        }
    }
    return globalWorkToDo();
}
#endif

/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
 *
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->return_link == NULL);
    if (cap->returning_tasks_hd) {
        ASSERT(cap->returning_tasks_tl->return_link == NULL);
        cap->returning_tasks_tl->return_link = task;
    } else {
        cap->returning_tasks_hd = task;
    }
    cap->returning_tasks_tl = task;
}

STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    Task *task;
    ASSERT_LOCK_HELD(&cap->lock);
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->return_link;
    if (!cap->returning_tasks_hd) {
        cap->returning_tasks_tl = NULL;
    }
    task->return_link = NULL;
    return task;
}
#endif

/* ----------------------------------------------------------------------------
 * Initialisation
 *
 * The Capability is initially marked not free.
 * ------------------------------------------------------------------------- */

static void
initCapability( Capability *cap, nat i )
{
    cap->no = i;
    cap->in_haskell        = rtsFalse;

    cap->run_queue_hd      = END_TSO_QUEUE;
    cap->run_queue_tl      = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task      = NULL; // indicates cap is free
    cap->spare_workers     = NULL;
    cap->suspended_ccalling_tasks = NULL;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
    cap->next = NULL;
    cap->prev = NULL;
#endif

    cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
    cap->f.stgGCFun        = (F_)__stg_gc_fun;
}

/* ---------------------------------------------------------------------------
 * Function:  initCapabilities()
 *
 * Purpose:   set up the Capability handling. For the SMP build,
 *            we keep a table of them, the size of which is
 *            controlled by the user via the RTS flag -N.
 *
 * ------------------------------------------------------------------------- */
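
/* For example (user-level usage, not RTS code): an SMP-enabled program
 * started as
 *
 *     ./myprog +RTS -N4 -RTS
 *
 * sets RtsFlags.ParFlags.nNodes to 4, so the loop below allocates and
 * initialises four Capabilities.
 */
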
void
initCapabilities( void )
{
#if defined(SMP)
    nat i,n;

    n_capabilities = n = RtsFlags.ParFlags.nNodes;
    capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities");

    for (i = 0; i < n; i++) {
        initCapability(&capabilities[i], i);
    }

    IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n));
#else
    n_capabilities = 1;
    capabilities = &MainCapability;
    initCapability(&MainCapability, 0);
#endif

    // There are no free capabilities to begin with.  We will start
    // a worker Task on each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
    last_free_capability = &capabilities[0];
}

/* ----------------------------------------------------------------------------
 * Give a Capability to a Task.  The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrator must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrator should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
 *
 * ------------------------------------------------------------------------- */
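
/* A sketch of that migration protocol (illustrative only; new_cap is a
 * hypothetical target Capability, and the ownership requirements are
 * as described above):
 *
 *     // migrator already owns the relevant Capability or cap->lock
 *     ACQUIRE_LOCK(&task->lock);   // synchronise with the waking Task
 *     task->cap = new_cap;
 *     RELEASE_LOCK(&task->lock);
 */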

#if defined(THREADED_RTS)
STATIC_INLINE void
giveCapabilityToTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    // We are not modifying task->cap, so we do not need to take task->lock.
    IF_DEBUG(scheduler,
             sched_belch("passing capability %d to %s %p",
                         cap->no, task->tso ? "bound task" : "worker",
                         (void *)task->id));
    ACQUIRE_LOCK(&task->lock);
    task->wakeup = rtsTrue;
    // the wakeup flag is needed because signalCondition() has no
    // effect if the Task is not already waiting on the condition
    // variable, but we want the wakeup to be sticky.
    signalCondition(&task->cond);
    RELEASE_LOCK(&task->lock);
}
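
/* The wakeup flag plus the condition variable behave like a binary
 * semaphore.  Schematically (a sketch of the pattern used here and in
 * waitForReturnCapability()/yieldCapability() below):
 *
 *     // signaller                       // waiter
 *     ACQUIRE_LOCK(&task->lock);         ACQUIRE_LOCK(&task->lock);
 *     task->wakeup = rtsTrue;            if (!task->wakeup)
 *     signalCondition(&task->cond);          waitCondition(&task->cond,
 *     RELEASE_LOCK(&task->lock);                            &task->lock);
 *                                        task->wakeup = rtsFalse;
 *                                        RELEASE_LOCK(&task->lock);
 */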
#endif

/* ----------------------------------------------------------------------------
 * Function:  releaseCapability(Capability*)
 *
 * Purpose:   Letting go of a capability. Causes a
 *            'returning worker' thread or a 'waiting worker'
 *            to wake up, in that order.
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap)
{
    Task *task;

    ASSERT(cap->running_task != NULL && myTask() == cap->running_task);

    task = cap->running_task;
    cap->running_task = NULL;

    ASSERT(task->id == osThreadId());

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call.
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
        // Make sure we're not about to try to wake ourselves up
        ASSERT(task != cap->run_queue_hd->bound);
        task = cap->run_queue_hd->bound;
        giveCapabilityToTask(cap,task);
        return;
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (!emptyRunQueue(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue (see yieldCapability())
            return;
        }

        // Create a worker thread if we don't have one.  If the system
        // is shutting down, we never create a new worker.
        if (!shutting_down_scheduler) {
            IF_DEBUG(scheduler,
                     sched_belch("starting new worker on capability %d", cap->no));
            startWorkerTask(cap, workerStart);
            return;
        }
    }

    last_free_capability = cap;
    IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
}

void
releaseCapability (Capability* cap UNUSED_IF_NOT_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap);
    RELEASE_LOCK(&cap->lock);
}

static void
releaseCapabilityAndQueueWorker (Capability* cap UNUSED_IF_NOT_THREADS)
{
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not put back on the spare_workers queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    // Also, be careful to check that this task hasn't just exited
    // Haskell to do a foreign call (task->suspended_tso).
    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
        task->next = cap->spare_workers;
        cap->spare_workers = task;
    }
    // Bound tasks just float around attached to their TSOs.

    releaseCapability_(cap);

    RELEASE_LOCK(&cap->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * waitForReturnCapability( Task *task )
 *
 * Purpose:  when an OS thread returns from an external call,
 * it calls waitForReturnCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
 * result of the external call back to the Haskell thread that
 * made it.
 *
 * ------------------------------------------------------------------------- */
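
/* Typical call pattern (a sketch; see Schedule.c:resumeThread() for
 * the real caller):
 *
 *     Capability *cap = NULL;        // no Capability yet for this in-call
 *     waitForReturnCapability(&cap, task);
 *     // we now own cap and can resume the suspended Haskell thread
 */
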
void
waitForReturnCapability (Capability **pCap,
                         Task *task UNUSED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)

    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;

#else
    Capability *cap = *pCap;

    if (cap == NULL) {
        // Try last_free_capability first
        cap = last_free_capability;
        if (cap->running_task) {
            nat i;
            // otherwise, search for a free capability
            for (i = 0; i < n_capabilities; i++) {
                if (!capabilities[i].running_task) {
                    cap = &capabilities[i];
                    break;
                }
            }
            // If none is free, cap is still last_free_capability;
            // we queue up for that one below.
        }

        // record the Capability as the one this Task is now associated with.
        task->cap = cap;

    } else {
        ASSERT(task->cap == cap);
    }

    ACQUIRE_LOCK(&cap->lock);

    IF_DEBUG(scheduler,
             sched_belch("returning; I want capability %d", cap->no));

    if (!cap->running_task) {
        // It's free; just grab it
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
    } else {
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            // now check whether we should wake up...
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task == NULL) {
                if (cap->returning_tasks_hd != task) {
                    giveCapabilityToTask(cap,cap->returning_tasks_hd);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->running_task = task;
                popReturningTask(cap);
                RELEASE_LOCK(&cap->lock);
                break;
            }
            RELEASE_LOCK(&cap->lock);
        }

    }

    ASSERT(cap->running_task == task);

    IF_DEBUG(scheduler,
             sched_belch("returning; got capability %d", cap->no));

    *pCap = cap;
#endif
}

#if defined(THREADED_RTS)
/* ----------------------------------------------------------------------------
 * yieldCapability
 *
 * Give up the Capability, and wait until we have it again.  A worker
 * Task parks itself on the Capability's spare_workers list and is
 * woken again by giveCapabilityToTask().
 * ------------------------------------------------------------------------- */

void
yieldCapability (Capability** pCap, Task *task)
{
    Capability *cap = *pCap;

    // The fast path; no locking
    if ( cap->returning_tasks_hd == NULL && anyWorkForMe(cap,task) )
        return;

    while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
        IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));

        // We must now release the capability and wait to be woken up
        // again.
        releaseCapabilityAndQueueWorker(cap);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task != NULL) {
                RELEASE_LOCK(&cap->lock);
                continue;
            }

            if (task->tso == NULL) {
                ASSERT(cap->spare_workers != NULL);
                // if we're not at the front of the queue, release it
                // again.  This is unlikely to happen.
                if (cap->spare_workers != task) {
                    giveCapabilityToTask(cap,cap->spare_workers);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->spare_workers = task->next;
                task->next = NULL;
            }
            cap->running_task = task;
            RELEASE_LOCK(&cap->lock);
            break;
        }

        IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
        ASSERT(cap->running_task == task);
    }

    *pCap = cap;
    return;
}

/* ----------------------------------------------------------------------------
 * prodCapabilities
 *
 * Used to indicate that the interrupted flag is now set, or some
 * other global condition that might require waking up a Task on each
 * Capability.
 * ------------------------------------------------------------------------- */

static void
prodCapabilities(rtsBool all)
{
    nat i;
    Capability *cap;
    Task *task;

    for (i=0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        ACQUIRE_LOCK(&cap->lock);
        if (!cap->running_task) {
            if (cap->spare_workers) {
                task = cap->spare_workers;
                ASSERT(!task->stopped);
                giveCapabilityToTask(cap,task);
                if (!all) {
                    RELEASE_LOCK(&cap->lock);
                    return;
                }
            }
        }
        RELEASE_LOCK(&cap->lock);
    }
}

void
prodAllCapabilities (void)
{
    prodCapabilities(rtsTrue);
}

/* ----------------------------------------------------------------------------
 * prodOneCapability
 *
 * Like prodAllCapabilities, but we only require a single Task to wake
 * up in order to service some global event, such as checking for
 * deadlock after some idle time has passed.
 * ------------------------------------------------------------------------- */

void
prodOneCapability (void)
{
    prodCapabilities(rtsFalse);
}

/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible.  For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted = rtsTrue and
 * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
 * will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller.  Runnable threads are
 * killed.
 *
 * ------------------------------------------------------------------------- */

void
shutdownCapability (Capability *cap, Task *task)
{
    nat i;

    ASSERT(interrupted && shutting_down_scheduler);

    task->cap = cap;

    for (i = 0; i < 50; i++) {
        IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            RELEASE_LOCK(&cap->lock);
            IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
            yieldThread();
            continue;
        }
        cap->running_task = task;
        if (!emptyRunQueue(cap) || cap->spare_workers) {
            IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
            releaseCapability_(cap); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }
        IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // We now have the Capability; its run queue and spare_workers
    // list are both empty.
}
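
/* Sketch of the expected call sequence at shutdown (a simplification;
 * see Schedule.c for the real shutdown path):
 *
 *     interrupted = rtsTrue;
 *     shutting_down_scheduler = rtsTrue;
 *     for (i = 0; i < n_capabilities; i++) {
 *         shutdownCapability(&capabilities[i], task);
 *     }
 */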

#endif /* THREADED_RTS */