764357c56d41cadd40aa6ea86d522a627f2348c6
[ghc-hetmet.git] / ghc / rts / Capability.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2003-2006
4  *
5  * Capabilities
6  *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
12  *
13  * Only in an SMP build will there be multiple capabilities, for
14  * the threaded RTS and other non-threaded builds, there is only
15  * one global capability, namely MainCapability.
16  *
17  * --------------------------------------------------------------------------*/
18
19 #include "PosixSource.h"
20 #include "Rts.h"
21 #include "RtsUtils.h"
22 #include "RtsFlags.h"
23 #include "STM.h"
24 #include "OSThreads.h"
25 #include "Capability.h"
26 #include "Schedule.h"
27 #include "Sparks.h"
28
#if !defined(SMP)
Capability MainCapability;     // for non-SMP, we have one global capability
#endif

// Number of entries in the 'capabilities' array (set by initCapabilities()).
nat n_capabilities;
// The table of Capabilities; in the non-SMP build this aliases
// &MainCapability (see initCapabilities()).
Capability *capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability;
41
42 #if defined(THREADED_RTS)
43 STATIC_INLINE rtsBool
44 globalWorkToDo (void)
45 {
46     return blackholes_need_checking
47         || interrupted
48         ;
49 }
50 #endif
51
52 #if defined(THREADED_RTS)
53 STATIC_INLINE rtsBool
54 anyWorkForMe( Capability *cap, Task *task )
55 {
56     // If the run queue is not empty, then we only wake up the guy who
57     // can run the thread at the head, even if there is some other
58     // reason for this task to run (eg. interrupted=rtsTrue).
59     if (!emptyRunQueue(cap)) {
60         if (cap->run_queue_hd->bound == NULL) {
61             return (task->tso == NULL);
62         } else {
63             return (cap->run_queue_hd->bound == task);
64         }
65     } else if (task->tso == NULL && !emptySparkPoolCap(cap)) {
66         return rtsTrue;
67     }
68     return globalWorkToDo();
69 }
70 #endif
71
72 /* -----------------------------------------------------------------------------
73  * Manage the returning_tasks lists.
74  *
75  * These functions require cap->lock
76  * -------------------------------------------------------------------------- */
77
78 #if defined(THREADED_RTS)
79 STATIC_INLINE void
80 newReturningTask (Capability *cap, Task *task)
81 {
82     ASSERT_LOCK_HELD(&cap->lock);
83     ASSERT(task->return_link == NULL);
84     if (cap->returning_tasks_hd) {
85         ASSERT(cap->returning_tasks_tl->return_link == NULL);
86         cap->returning_tasks_tl->return_link = task;
87     } else {
88         cap->returning_tasks_hd = task;
89     }
90     cap->returning_tasks_tl = task;
91 }
92
93 STATIC_INLINE Task *
94 popReturningTask (Capability *cap)
95 {
96     ASSERT_LOCK_HELD(&cap->lock);
97     Task *task;
98     task = cap->returning_tasks_hd;
99     ASSERT(task);
100     cap->returning_tasks_hd = task->return_link;
101     if (!cap->returning_tasks_hd) {
102         cap->returning_tasks_tl = NULL;
103     }
104     task->return_link = NULL;
105     return task;
106 }
107 #endif
108
109 /* ----------------------------------------------------------------------------
110  * Initialisation
111  *
112  * The Capability is initially marked not free.
113  * ------------------------------------------------------------------------- */
114
115 static void
116 initCapability( Capability *cap, nat i )
117 {
118     nat g;
119
120     cap->no = i;
121     cap->in_haskell        = rtsFalse;
122
123     cap->run_queue_hd      = END_TSO_QUEUE;
124     cap->run_queue_tl      = END_TSO_QUEUE;
125
126 #if defined(THREADED_RTS)
127     initMutex(&cap->lock);
128     cap->running_task      = NULL; // indicates cap is free
129     cap->spare_workers     = NULL;
130     cap->suspended_ccalling_tasks = NULL;
131     cap->returning_tasks_hd = NULL;
132     cap->returning_tasks_tl = NULL;
133 #endif
134
135     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
136     cap->f.stgGCFun        = (F_)__stg_gc_fun;
137
138     cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
139                                      RtsFlags.GcFlags.generations,
140                                      "initCapability");
141
142     for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
143         cap->mut_lists[g] = NULL;
144     }
145
146     cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
147     cap->free_trec_chunks = END_STM_CHUNK_LIST;
148     cap->free_trec_headers = NO_TREC;
149     cap->transaction_tokens = 0;
150 }
151
152 /* ---------------------------------------------------------------------------
153  * Function:  initCapabilities()
154  *
155  * Purpose:   set up the Capability handling. For the SMP build,
156  *            we keep a table of them, the size of which is
157  *            controlled by the user via the RTS flag -N.
158  *
159  * ------------------------------------------------------------------------- */
160 void
161 initCapabilities( void )
162 {
163 #if defined(SMP)
164     nat i,n;
165
166 #ifndef REG_BaseReg
167     // We can't support multiple CPUs if BaseReg is not a register
168     if (RtsFlags.ParFlags.nNodes > 1) {
169         errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
170         RtsFlags.ParFlags.nNodes = 1;
171     }
172 #endif
173
174     n_capabilities = n = RtsFlags.ParFlags.nNodes;
175     capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities");
176
177     for (i = 0; i < n; i++) {
178         initCapability(&capabilities[i], i);
179     }
180
181     IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n));
182 #else
183     n_capabilities = 1;
184     capabilities = &MainCapability;
185     initCapability(&MainCapability, 0);
186 #endif
187
188     // There are no free capabilities to begin with.  We will start
189     // a worker Task to each Capability, which will quickly put the
190     // Capability on the free list when it finds nothing to do.
191     last_free_capability = &capabilities[0];
192 }
193
194 /* ----------------------------------------------------------------------------
195  * Give a Capability to a Task.  The task must currently be sleeping
196  * on its condition variable.
197  *
198  * Requires cap->lock (modifies cap->running_task).
199  *
200  * When migrating a Task, the migrater must take task->lock before
201  * modifying task->cap, to synchronise with the waking up Task.
202  * Additionally, the migrater should own the Capability (when
203  * migrating the run queue), or cap->lock (when migrating
204  * returning_workers).
205  *
206  * ------------------------------------------------------------------------- */
207
208 #if defined(THREADED_RTS)
209 STATIC_INLINE void
210 giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
211 {
212     ASSERT_LOCK_HELD(&cap->lock);
213     ASSERT(task->cap == cap);
214     IF_DEBUG(scheduler,
215              sched_belch("passing capability %d to %s %p",
216                          cap->no, task->tso ? "bound task" : "worker",
217                          (void *)task->id));
218     ACQUIRE_LOCK(&task->lock);
219     task->wakeup = rtsTrue;
220     // the wakeup flag is needed because signalCondition() doesn't
221     // flag the condition if the thread is already runniing, but we want
222     // it to be sticky.
223     signalCondition(&task->cond);
224     RELEASE_LOCK(&task->lock);
225 }
226 #endif
227
228 /* ----------------------------------------------------------------------------
229  * Function:  releaseCapability(Capability*)
230  *
231  * Purpose:   Letting go of a capability. Causes a
232  *            'returning worker' thread or a 'waiting worker'
233  *            to wake up, in that order.
234  * ------------------------------------------------------------------------- */
235
236 #if defined(THREADED_RTS)
237 void
238 releaseCapability_ (Capability* cap)
239 {
240     Task *task;
241
242     task = cap->running_task;
243
244     ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
245
246     cap->running_task = NULL;
247
248     // Check to see whether a worker thread can be given
249     // the go-ahead to return the result of an external call..
250     if (cap->returning_tasks_hd != NULL) {
251         giveCapabilityToTask(cap,cap->returning_tasks_hd);
252         // The Task pops itself from the queue (see waitForReturnCapability())
253         return;
254     }
255
256     // If the next thread on the run queue is a bound thread,
257     // give this Capability to the appropriate Task.
258     if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
259         // Make sure we're not about to try to wake ourselves up
260         ASSERT(task != cap->run_queue_hd->bound);
261         task = cap->run_queue_hd->bound;
262         giveCapabilityToTask(cap,task);
263         return;
264     }
265
266     // If we have an unbound thread on the run queue, or if there's
267     // anything else to do, give the Capability to a worker thread.
268     if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
269         if (cap->spare_workers) {
270             giveCapabilityToTask(cap,cap->spare_workers);
271             // The worker Task pops itself from the queue;
272             return;
273         }
274
275         // Create a worker thread if we don't have one.  If the system
276         // is interrupted, we only create a worker task if there
277         // are threads that need to be completed.  If the system is
278         // shutting down, we never create a new worker.
279         if (!shutting_down_scheduler) {
280             IF_DEBUG(scheduler,
281                      sched_belch("starting new worker on capability %d", cap->no));
282             startWorkerTask(cap, workerStart);
283             return;
284         }
285     }
286
287     last_free_capability = cap;
288     IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
289 }
290
291 void
292 releaseCapability (Capability* cap USED_IF_THREADS)
293 {
294     ACQUIRE_LOCK(&cap->lock);
295     releaseCapability_(cap);
296     RELEASE_LOCK(&cap->lock);
297 }
298
299 static void
300 releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
301 {
302     Task *task;
303
304     ACQUIRE_LOCK(&cap->lock);
305
306     task = cap->running_task;
307
308     // If the current task is a worker, save it on the spare_workers
309     // list of this Capability.  A worker can mark itself as stopped,
310     // in which case it is not replaced on the spare_worker queue.
311     // This happens when the system is shutting down (see
312     // Schedule.c:workerStart()).
313     // Also, be careful to check that this task hasn't just exited
314     // Haskell to do a foreign call (task->suspended_tso).
315     if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
316         task->next = cap->spare_workers;
317         cap->spare_workers = task;
318     }
319     // Bound tasks just float around attached to their TSOs.
320
321     releaseCapability_(cap);
322
323     RELEASE_LOCK(&cap->lock);
324 }
325 #endif
326
327 /* ----------------------------------------------------------------------------
328  * waitForReturnCapability( Task *task )
329  *
330  * Purpose:  when an OS thread returns from an external call,
331  * it calls waitForReturnCapability() (via Schedule.resumeThread())
332  * to wait for permission to enter the RTS & communicate the
333  * result of the external call back to the Haskell thread that
334  * made it.
335  *
336  * ------------------------------------------------------------------------- */
337 void
338 waitForReturnCapability (Capability **pCap, Task *task)
339 {
340 #if !defined(THREADED_RTS)
341
342     MainCapability.running_task = task;
343     task->cap = &MainCapability;
344     *pCap = &MainCapability;
345
346 #else
347     Capability *cap = *pCap;
348
349     if (cap == NULL) {
350         // Try last_free_capability first
351         cap = last_free_capability;
352         if (!cap->running_task) {
353             nat i;
354             // otherwise, search for a free capability
355             for (i = 0; i < n_capabilities; i++) {
356                 cap = &capabilities[i];
357                 if (!cap->running_task) {
358                     break;
359                 }
360             }
361             // Can't find a free one, use last_free_capability.
362             cap = last_free_capability;
363         }
364
365         // record the Capability as the one this Task is now assocated with.
366         task->cap = cap;
367
368     } else {
369         ASSERT(task->cap == cap);
370     }
371
372     ACQUIRE_LOCK(&cap->lock);
373
374     IF_DEBUG(scheduler,
375              sched_belch("returning; I want capability %d", cap->no));
376
377     if (!cap->running_task) {
378         // It's free; just grab it
379         cap->running_task = task;
380         RELEASE_LOCK(&cap->lock);
381     } else {
382         newReturningTask(cap,task);
383         RELEASE_LOCK(&cap->lock);
384
385         for (;;) {
386             ACQUIRE_LOCK(&task->lock);
387             // task->lock held, cap->lock not held
388             if (!task->wakeup) waitCondition(&task->cond, &task->lock);
389             cap = task->cap;
390             task->wakeup = rtsFalse;
391             RELEASE_LOCK(&task->lock);
392
393             // now check whether we should wake up...
394             ACQUIRE_LOCK(&cap->lock);
395             if (cap->running_task == NULL) {
396                 if (cap->returning_tasks_hd != task) {
397                     giveCapabilityToTask(cap,cap->returning_tasks_hd);
398                     RELEASE_LOCK(&cap->lock);
399                     continue;
400                 }
401                 cap->running_task = task;
402                 popReturningTask(cap);
403                 RELEASE_LOCK(&cap->lock);
404                 break;
405             }
406             RELEASE_LOCK(&cap->lock);
407         }
408
409     }
410
411     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
412
413     IF_DEBUG(scheduler,
414              sched_belch("returning; got capability %d", cap->no));
415
416     *pCap = cap;
417 #endif
418 }
419
420 #if defined(THREADED_RTS)
421 /* ----------------------------------------------------------------------------
422  * yieldCapability
423  * ------------------------------------------------------------------------- */
424
void
yieldCapability (Capability** pCap, Task *task)
{
    // Give up *pCap if there is no work here for this Task (or if a
    // returning worker needs it), then sleep until we are handed a
    // Capability again.  On exit, *pCap is the Capability now owned by
    // 'task' (it may differ from the one passed in, if the Task was
    // migrated while asleep).
    Capability *cap = *pCap;

    // The fast path has no locking, if we don't enter this while loop

    while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
        IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));

        // We must now release the capability and wait to be woken up
        // again.
        task->wakeup = rtsFalse;
        releaseCapabilityAndQueueWorker(cap);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            // Re-read task->cap: the waker may have migrated this Task
            // to a different Capability.
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task != NULL) {
                // Somebody grabbed the Capability before we did; go
                // back to sleep and retry.
                IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no));
                RELEASE_LOCK(&cap->lock);
                continue;
            }

            if (task->tso == NULL) {
                // We are a worker, so we must be on spare_workers
                // (releaseCapabilityAndQueueWorker() put us there).
                ASSERT(cap->spare_workers != NULL);
                // if we're not at the front of the queue, release it
                // again.  This is unlikely to happen.
                if (cap->spare_workers != task) {
                    giveCapabilityToTask(cap,cap->spare_workers);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                // Pop ourselves off the spare_workers list.
                cap->spare_workers = task->next;
                task->next = NULL;
            }
            cap->running_task = task;
            RELEASE_LOCK(&cap->lock);
            break;
        }

        IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
        ASSERT(cap->running_task == task);
    }

    *pCap = cap;

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    return;
}
483
484 /* ----------------------------------------------------------------------------
485  * prodCapabilities
486  *
487  * Used to indicate that the interrupted flag is now set, or some
488  * other global condition that might require waking up a Task on each
489  * Capability.
490  * ------------------------------------------------------------------------- */
491
492 static void
493 prodCapabilities(rtsBool all)
494 {
495     nat i;
496     Capability *cap;
497     Task *task;
498
499     for (i=0; i < n_capabilities; i++) {
500         cap = &capabilities[i];
501         ACQUIRE_LOCK(&cap->lock);
502         if (!cap->running_task) {
503             if (cap->spare_workers) {
504                 task = cap->spare_workers;
505                 ASSERT(!task->stopped);
506                 giveCapabilityToTask(cap,task);
507                 if (!all) {
508                     RELEASE_LOCK(&cap->lock);
509                     return;
510                 }
511             }
512         }
513         RELEASE_LOCK(&cap->lock);
514     }
515 }
516
517 void
518 prodAllCapabilities (void)
519 {
520     prodCapabilities(rtsTrue);
521 }
522
523 /* ----------------------------------------------------------------------------
524  * prodOneCapability
525  *
526  * Like prodAllCapabilities, but we only require a single Task to wake
527  * up in order to service some global event, such as checking for
528  * deadlock after some idle time has passed.
529  * ------------------------------------------------------------------------- */
530
531 void
532 prodOneCapability (void)
533 {
534     prodCapabilities(rtsFalse);
535 }
536
537 /* ----------------------------------------------------------------------------
538  * shutdownCapability
539  *
540  * At shutdown time, we want to let everything exit as cleanly as
541  * possible.  For each capability, we let its run queue drain, and
542  * allow the workers to stop.
543  *
544  * This function should be called when interrupted and
545  * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
546  * will exit the scheduler and call taskStop(), and any bound thread
547  * that wakes up will return to its caller.  Runnable threads are
548  * killed.
549  *
550  * ------------------------------------------------------------------------- */
551
552 void
553 shutdownCapability (Capability *cap, Task *task)
554 {
555     nat i;
556
557     ASSERT(interrupted && shutting_down_scheduler);
558
559     task->cap = cap;
560
561     for (i = 0; i < 50; i++) {
562         IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
563         ACQUIRE_LOCK(&cap->lock);
564         if (cap->running_task) {
565             RELEASE_LOCK(&cap->lock);
566             IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
567             yieldThread();
568             continue;
569         }
570         cap->running_task = task;
571         if (!emptyRunQueue(cap) || cap->spare_workers) {
572             IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
573             releaseCapability_(cap); // this will wake up a worker
574             RELEASE_LOCK(&cap->lock);
575             yieldThread();
576             continue;
577         }
578         IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
579         RELEASE_LOCK(&cap->lock);
580         break;
581     }
582     // we now have the Capability, its run queue and spare workers
583     // list are both empty.
584 }
585
586 /* ----------------------------------------------------------------------------
587  * tryGrabCapability
588  *
589  * Attempt to gain control of a Capability if it is free.
590  *
591  * ------------------------------------------------------------------------- */
592
593 rtsBool
594 tryGrabCapability (Capability *cap, Task *task)
595 {
596     if (cap->running_task != NULL) return rtsFalse;
597     ACQUIRE_LOCK(&cap->lock);
598     if (cap->running_task != NULL) {
599         RELEASE_LOCK(&cap->lock);
600         return rtsFalse;
601     }
602     task->cap = cap;
603     cap->running_task = task;
604     RELEASE_LOCK(&cap->lock);
605     return rtsTrue;
606 }
607
608
609 #endif /* THREADED_RTS */
610
611