/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2006
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in a THREADED_RTS build will there be multiple capabilities;
 * in non-threaded builds there is one global capability, namely
 * MainCapability.
 *
 * --------------------------------------------------------------------------*/
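/* A rough sketch of a Task's view of this API (resumeThread and
 * workerStart live in Schedule.c):
 *
 *   Capability *cap = NULL;
 *   waitForReturnCapability(&cap, task);  // block until we hold a Capability
 *   ... run Haskell code; BaseReg points into cap->r ...
 *   releaseCapability(cap);               // wake a returning or waiting worker
 *
 * A worker with nothing to do instead calls yieldCapability(), which
 * gives the Capability away and blocks until it is handed back.
 */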

#include "PosixSource.h"
#include "Rts.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "STM.h"
#include "OSThreads.h"
#include "Capability.h"
#include "Schedule.h"
#include "Sparks.h"

// The one global capability: used for non-threaded builds, and for
// +RTS -N1 in threaded builds.
Capability MainCapability;

nat n_capabilities;
Capability *capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability;

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
    return blackholes_need_checking
        || interrupted
        ;
}
#endif

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
anyWorkForMe( Capability *cap, Task *task )
{
    // If the run queue is not empty, then we only wake up the guy who
    // can run the thread at the head, even if there is some other
    // reason for this task to run (eg. interrupted=rtsTrue).
    if (!emptyRunQueue(cap)) {
        if (cap->run_queue_hd->bound == NULL) {
            return (task->tso == NULL);
        } else {
            return (cap->run_queue_hd->bound == task);
        }
    } else if (task->tso == NULL && !emptySparkPoolCap(cap)) {
        return rtsTrue;
    }
    return globalWorkToDo();
}
#endif

/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
 *
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */
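// The returning_tasks list is a singly-linked queue threaded through
// task->return_link, with head (returning_tasks_hd) and tail
// (returning_tasks_tl) pointers: we append at the tail and pop from
// the head.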

#if defined(THREADED_RTS)
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->return_link == NULL);
    if (cap->returning_tasks_hd) {
        ASSERT(cap->returning_tasks_tl->return_link == NULL);
        cap->returning_tasks_tl->return_link = task;
    } else {
        cap->returning_tasks_hd = task;
    }
    cap->returning_tasks_tl = task;
}

STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    ASSERT_LOCK_HELD(&cap->lock);
    Task *task;
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->return_link;
    if (!cap->returning_tasks_hd) {
        cap->returning_tasks_tl = NULL;
    }
    task->return_link = NULL;
    return task;
}
#endif

/* ----------------------------------------------------------------------------
 * Initialisation
 *
 * The Capability is initially marked not free.
 * ------------------------------------------------------------------------- */

static void
initCapability( Capability *cap, nat i )
{
    nat g;

    cap->no = i;
    cap->in_haskell        = rtsFalse;

    cap->run_queue_hd      = END_TSO_QUEUE;
    cap->run_queue_tl      = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task      = NULL; // indicates cap is free
    cap->spare_workers     = NULL;
    cap->suspended_ccalling_tasks = NULL;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
#endif

    cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
    cap->f.stgGCFun        = (F_)__stg_gc_fun;

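    // One mutable list per generation: the write barrier records
    // old-to-young pointers here, and the GC traverses these lists
    // when collecting the younger generations.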
    cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
                                     RtsFlags.GcFlags.generations,
                                     "initCapability");

    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        cap->mut_lists[g] = NULL;
    }

    cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
    cap->free_trec_chunks = END_STM_CHUNK_LIST;
    cap->free_trec_headers = NO_TREC;
    cap->transaction_tokens = 0;
}

/* ---------------------------------------------------------------------------
 * Function:  initCapabilities()
 *
 * Purpose:   set up the Capability handling. For the THREADED_RTS build,
 *            we keep a table of them, the size of which is
 *            controlled by the user via the RTS flag -N.
 *
 * ------------------------------------------------------------------------- */
void
initCapabilities( void )
{
#if defined(THREADED_RTS)
    nat i;

#ifndef REG_Base
    // We can't support multiple CPUs if BaseReg is not a register
    if (RtsFlags.ParFlags.nNodes > 1) {
        errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
        RtsFlags.ParFlags.nNodes = 1;
    }
#endif

    n_capabilities = RtsFlags.ParFlags.nNodes;

    if (n_capabilities == 1) {
        capabilities = &MainCapability;
        // THREADED_RTS must work on builds that don't have a mutable
        // BaseReg (eg. unregisterised), so in this case
        // capabilities[0] must coincide with &MainCapability.
    } else {
        capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
                                      "initCapabilities");
    }

    for (i = 0; i < n_capabilities; i++) {
        initCapability(&capabilities[i], i);
    }

    IF_DEBUG(scheduler, sched_belch("allocated %d capabilities",
                                    n_capabilities));

#else /* !THREADED_RTS */

    n_capabilities = 1;
    capabilities = &MainCapability;
    initCapability(&MainCapability, 0);

#endif

    // There are no free capabilities to begin with.  We will start
    // a worker Task on each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
    last_free_capability = &capabilities[0];
}
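
// For example, "prog +RTS -N2" sets RtsFlags.ParFlags.nNodes to 2, so
// two Capabilities are allocated and at most two Haskell threads can
// be executing simultaneously.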

/* ----------------------------------------------------------------------------
 * Give a Capability to a Task.  The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrator must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrator should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    IF_DEBUG(scheduler,
             sched_belch("passing capability %d to %s %p",
                         cap->no, task->tso ? "bound task" : "worker",
                         (void *)task->id));
    ACQUIRE_LOCK(&task->lock);
    task->wakeup = rtsTrue;
    // the wakeup flag is needed because signalCondition() doesn't
    // flag the condition if the thread is already running, but we want
    // it to be sticky.
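    // (The matching wait is the task->wakeup / task->cond loop in
    // waitForReturnCapability() and yieldCapability().)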
    signalCondition(&task->cond);
    RELEASE_LOCK(&task->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * Function:  releaseCapability(Capability*)
 *
 * Purpose:   Letting go of a capability. Causes a
 *            'returning worker' thread or a 'waiting worker'
 *            to wake up, in that order.
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap)
{
    Task *task;

    task = cap->running_task;

    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);

    cap->running_task = NULL;

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call.
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
        // Make sure we're not about to try to wake ourselves up
        ASSERT(task != cap->run_queue_hd->bound);
        task = cap->run_queue_hd->bound;
        giveCapabilityToTask(cap,task);
        return;
    }

    if (!cap->spare_workers) {
        // Create a worker thread if we don't have one.  If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed.  If the system is
        // shutting down, we never create a new worker.
        if (!shutting_down_scheduler) {
            IF_DEBUG(scheduler,
                     sched_belch("starting new worker on capability %d", cap->no));
            startWorkerTask(cap, workerStart);
            return;
        }
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue;
            return;
        }
    }

    last_free_capability = cap;
    IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
}

void
releaseCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap);
    RELEASE_LOCK(&cap->lock);
}

static void
releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
{
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not replaced on the spare_workers queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    // Also, be careful to check that this task hasn't just exited
    // Haskell to do a foreign call (task->suspended_tso).
    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
        task->next = cap->spare_workers;
        cap->spare_workers = task;
    }
    // Bound tasks just float around attached to their TSOs.

    releaseCapability_(cap);

    RELEASE_LOCK(&cap->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * waitForReturnCapability( Task *task )
 *
 * Purpose:  when an OS thread returns from an external call,
 * it calls waitForReturnCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
 * result of the external call back to the Haskell thread that
 * made it.
 *
 * ------------------------------------------------------------------------- */
void
waitForReturnCapability (Capability **pCap, Task *task)
{
#if !defined(THREADED_RTS)

    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;

#else
    Capability *cap = *pCap;

    if (cap == NULL) {
        // Try last_free_capability first
        cap = last_free_capability;
        if (cap->running_task) {
            nat i;
            // otherwise, search for a free capability
            cap = NULL;
            for (i = 0; i < n_capabilities; i++) {
                if (!capabilities[i].running_task) {
                    cap = &capabilities[i];
                    break;
                }
            }
            if (cap == NULL) {
                // Can't find a free one, use last_free_capability.
                cap = last_free_capability;
            }
        }

        // record the Capability as the one this Task is now associated with.
        task->cap = cap;

    } else {
        ASSERT(task->cap == cap);
    }

    ACQUIRE_LOCK(&cap->lock);

    IF_DEBUG(scheduler,
             sched_belch("returning; I want capability %d", cap->no));

    if (!cap->running_task) {
        // It's free; just grab it
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
    } else {
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);

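        // We may be woken and still not get the Capability: it is
        // only ours if it is free AND we are at the head of the
        // returning_tasks queue.  Otherwise pass it on and keep
        // waiting.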
        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            // now check whether we should wake up...
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task == NULL) {
                if (cap->returning_tasks_hd != task) {
                    giveCapabilityToTask(cap,cap->returning_tasks_hd);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->running_task = task;
                popReturningTask(cap);
                RELEASE_LOCK(&cap->lock);
                break;
            }
            RELEASE_LOCK(&cap->lock);
        }

    }

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    IF_DEBUG(scheduler,
             sched_belch("returning; got capability %d", cap->no));

    *pCap = cap;
#endif
}

#if defined(THREADED_RTS)
/* ----------------------------------------------------------------------------
 * yieldCapability
 * ------------------------------------------------------------------------- */

void
yieldCapability (Capability** pCap, Task *task)
{
    Capability *cap = *pCap;

    // The fast path takes no locks: if we don't enter the while loop
    // below, we return with the Capability we already hold.

    while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
        IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));

        // We must now release the capability and wait to be woken up
        // again.
        task->wakeup = rtsFalse;
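        // (clear the wakeup flag *before* releasing the Capability,
        // so that a wakeup arriving immediately afterwards is not
        // lost)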
        releaseCapabilityAndQueueWorker(cap);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task != NULL) {
                IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no));
                RELEASE_LOCK(&cap->lock);
                continue;
            }

            if (task->tso == NULL) {
                ASSERT(cap->spare_workers != NULL);
                // if we're not at the front of the queue, release it
                // again.  This is unlikely to happen.
                if (cap->spare_workers != task) {
                    giveCapabilityToTask(cap,cap->spare_workers);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->spare_workers = task->next;
                task->next = NULL;
            }
            cap->running_task = task;
            RELEASE_LOCK(&cap->lock);
            break;
        }

        IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
        ASSERT(cap->running_task == task);
    }

    *pCap = cap;

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    return;
}

/* ----------------------------------------------------------------------------
 * prodCapabilities
 *
 * Used to indicate that the interrupted flag is now set, or some
 * other global condition that might require waking up a Task on each
 * Capability.
 * ------------------------------------------------------------------------- */

static void
prodCapabilities(rtsBool all)
{
    nat i;
    Capability *cap;
    Task *task;

    for (i=0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        ACQUIRE_LOCK(&cap->lock);
        if (!cap->running_task) {
            if (cap->spare_workers) {
                task = cap->spare_workers;
                ASSERT(!task->stopped);
                giveCapabilityToTask(cap,task);
                if (!all) {
                    RELEASE_LOCK(&cap->lock);
                    return;
                }
            }
        }
        RELEASE_LOCK(&cap->lock);
    }
    return;
}

void
prodAllCapabilities (void)
{
    prodCapabilities(rtsTrue);
}

/* ----------------------------------------------------------------------------
 * prodOneCapability
 *
 * Like prodAllCapabilities, but we only require a single Task to wake
 * up in order to service some global event, such as checking for
 * deadlock after some idle time has passed.
 * ------------------------------------------------------------------------- */

void
prodOneCapability (void)
{
    prodCapabilities(rtsFalse);
}

/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible.  For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted and
 * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
 * will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller.  Runnable threads are
 * killed.
 *
 * ------------------------------------------------------------------------- */

void
shutdownCapability (Capability *cap, Task *task)
{
    nat i;

    ASSERT(interrupted && shutting_down_scheduler);

    task->cap = cap;

    for (i = 0; i < 50; i++) {
        IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            RELEASE_LOCK(&cap->lock);
            IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
            yieldThread();
            continue;
        }
        cap->running_task = task;
        if (!emptyRunQueue(cap) || cap->spare_workers) {
            IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
            releaseCapability_(cap); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }
        IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // We now have the Capability; its run queue and spare_workers
    // list are both empty.
}

/* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
 * Attempt to gain control of a Capability if it is free.
 *
 * ------------------------------------------------------------------------- */

rtsBool
tryGrabCapability (Capability *cap, Task *task)
{
    if (cap->running_task != NULL) return rtsFalse;
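    // The unlocked test above is only an optimisation; re-test under
    // cap->lock before actually claiming the Capability.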
    ACQUIRE_LOCK(&cap->lock);
    if (cap->running_task != NULL) {
        RELEASE_LOCK(&cap->lock);
        return rtsFalse;
    }
    task->cap = cap;
    cap->running_task = task;
    RELEASE_LOCK(&cap->lock);
    return rtsTrue;
}


#endif /* THREADED_RTS */
