fix bug in previous patch to this file
[ghc-hetmet.git] / ghc / rts / Capability.c
/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2006
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in a THREADED_RTS build will there be multiple capabilities,
 * for non-threaded builds there is only one global capability, namely
 * MainCapability.
 *
 * --------------------------------------------------------------------------*/
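
/* A sketch of the typical Capability life cycle from a Task's point of
 * view (illustrative only; the real drivers live in Schedule.c):
 *
 *     Capability *cap = NULL;
 *     waitForReturnCapability(&cap, task);  // acquire a Capability
 *     ... run Haskell code, with BaseReg pointing to cap->r ...
 *     releaseCapability(cap);               // hand it to another Task,
 *                                           // or mark it free
 */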

#include "PosixSource.h"
#include "Rts.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "STM.h"
#include "OSThreads.h"
#include "Capability.h"
#include "Schedule.h"
#include "Sparks.h"

// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
Capability MainCapability;

nat n_capabilities;
Capability *capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability;

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
    return blackholes_need_checking
        || interrupted
        ;
}
#endif

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
anyWorkForMe( Capability *cap, Task *task )
{
    if (task->tso != NULL) {
        // A bound task only runs if its thread is on the run queue of
        // the capability on which it was woken up.  Otherwise, we
        // can't be sure that we have the right capability: the thread
        // might be woken up on some other capability, and task->cap
        // could change under our feet.
        return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task;
    } else {
        // A vanilla worker task runs if either there is a lightweight
        // thread at the head of the run queue, or the run queue is
        // empty and (there are sparks to execute, or there is some
        // other global condition to check, such as threads blocked on
        // blackholes).
        if (emptyRunQueue(cap)) {
            return !emptySparkPoolCap(cap) || globalWorkToDo();
        } else
            return cap->run_queue_hd->bound == NULL;
    }
}
#endif

/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
 *
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */
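
/* For example (a sketch; callers in this file follow this pattern):
 *
 *     ACQUIRE_LOCK(&cap->lock);
 *     newReturningTask(cap, task);
 *     RELEASE_LOCK(&cap->lock);
 *     // ... later, with cap->lock held again, the Task at the front
 *     // of the queue removes itself with popReturningTask(cap).
 */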

#if defined(THREADED_RTS)
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->return_link == NULL);
    if (cap->returning_tasks_hd) {
        ASSERT(cap->returning_tasks_tl->return_link == NULL);
        cap->returning_tasks_tl->return_link = task;
    } else {
        cap->returning_tasks_hd = task;
    }
    cap->returning_tasks_tl = task;
}

STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    ASSERT_LOCK_HELD(&cap->lock);
    Task *task;
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->return_link;
    if (!cap->returning_tasks_hd) {
        cap->returning_tasks_tl = NULL;
    }
    task->return_link = NULL;
    return task;
}
#endif

/* ----------------------------------------------------------------------------
 * Initialisation
 *
 * The Capability is initially marked not free.
 * ------------------------------------------------------------------------- */

static void
initCapability( Capability *cap, nat i )
{
    nat g;

    cap->no = i;
    cap->in_haskell        = rtsFalse;

    cap->run_queue_hd      = END_TSO_QUEUE;
    cap->run_queue_tl      = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task      = NULL; // indicates cap is free
    cap->spare_workers     = NULL;
    cap->suspended_ccalling_tasks = NULL;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
#endif

    cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
    cap->f.stgGCFun        = (F_)__stg_gc_fun;

    cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
                                     RtsFlags.GcFlags.generations,
                                     "initCapability");

    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        cap->mut_lists[g] = NULL;
    }

    cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
    cap->free_trec_chunks = END_STM_CHUNK_LIST;
    cap->free_trec_headers = NO_TREC;
    cap->transaction_tokens = 0;
}

/* ---------------------------------------------------------------------------
 * Function:  initCapabilities()
 *
 * Purpose:   set up the Capability handling. For the THREADED_RTS build,
 *            we keep a table of them, the size of which is
 *            controlled by the user via the RTS flag -N.
 *
 * ------------------------------------------------------------------------- */
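// For example, running a program with "+RTS -N4" sets
// RtsFlags.ParFlags.nNodes to 4, so the loop below initialises
// capabilities[0] through capabilities[3].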
void
initCapabilities( void )
{
#if defined(THREADED_RTS)
    nat i;

#ifndef REG_Base
    // We can't support multiple CPUs if BaseReg is not a register
    if (RtsFlags.ParFlags.nNodes > 1) {
        errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
        RtsFlags.ParFlags.nNodes = 1;
    }
#endif

    n_capabilities = RtsFlags.ParFlags.nNodes;

    if (n_capabilities == 1) {
        capabilities = &MainCapability;
        // THREADED_RTS must work on builds that don't have a mutable
        // BaseReg (eg. unregisterised), so in this case
        // capabilities[0] must coincide with &MainCapability.
    } else {
        capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
                                      "initCapabilities");
    }

    for (i = 0; i < n_capabilities; i++) {
        initCapability(&capabilities[i], i);
    }

    IF_DEBUG(scheduler, sched_belch("allocated %d capabilities",
                                    n_capabilities));

#else /* !THREADED_RTS */

    n_capabilities = 1;
    capabilities = &MainCapability;
    initCapability(&MainCapability, 0);

#endif

    // There are no free capabilities to begin with.  We will start
    // a worker Task on each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
    last_free_capability = &capabilities[0];
}

/* ----------------------------------------------------------------------------
 * Give a Capability to a Task.  The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrator must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrator should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
 *
 * ------------------------------------------------------------------------- */
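
/* A sketch of that migration protocol (hypothetical migrator code,
 * not part of this file; target_cap is illustrative):
 *
 *     // with the source Capability owned, or cap->lock held:
 *     ACQUIRE_LOCK(&task->lock);
 *     task->cap = target_cap;
 *     RELEASE_LOCK(&task->lock);
 */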

#if defined(THREADED_RTS)
STATIC_INLINE void
giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    IF_DEBUG(scheduler,
             sched_belch("passing capability %d to %s %p",
                         cap->no, task->tso ? "bound task" : "worker",
                         (void *)task->id));
    ACQUIRE_LOCK(&task->lock);
    task->wakeup = rtsTrue;
    // the wakeup flag is needed because signalCondition() doesn't
    // flag the condition if the thread is already running, but we want
    // it to be sticky.
    signalCondition(&task->cond);
    RELEASE_LOCK(&task->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * Function:  releaseCapability(Capability*)
 *
 * Purpose:   Letting go of a capability. Causes a
 *            'returning worker' thread or a 'waiting worker'
 *            to wake up, in that order.
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap)
{
    Task *task;

    task = cap->running_task;

    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);

    cap->running_task = NULL;

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call.
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
        // Make sure we're not about to try to wake ourselves up
        ASSERT(task != cap->run_queue_hd->bound);
        task = cap->run_queue_hd->bound;
        giveCapabilityToTask(cap,task);
        return;
    }

    if (!cap->spare_workers) {
        // Create a worker thread if we don't have one.  If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed.  If the system is
        // shutting down, we never create a new worker.
        if (!shutting_down_scheduler) {
            IF_DEBUG(scheduler,
                     sched_belch("starting new worker on capability %d", cap->no));
            startWorkerTask(cap, workerStart);
            return;
        }
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue.
            return;
        }
    }

    last_free_capability = cap;
    IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
}

void
releaseCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap);
    RELEASE_LOCK(&cap->lock);
}

static void
releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
{
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not replaced on the spare_worker queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    // Also, be careful to check that this task hasn't just exited
    // Haskell to do a foreign call (task->suspended_tso).
    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
        task->next = cap->spare_workers;
        cap->spare_workers = task;
    }
    // Bound tasks just float around attached to their TSOs.

    releaseCapability_(cap);

    RELEASE_LOCK(&cap->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * waitForReturnCapability( Task *task )
 *
 * Purpose:  when an OS thread returns from an external call,
 * it calls waitForReturnCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
 * result of the external call back to the Haskell thread that
 * made it.
 *
 * ------------------------------------------------------------------------- */
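/* A sketch of the calling convention (illustrative; the real caller
 * is Schedule.c:resumeThread()):
 *
 *     Capability *cap = task->cap;     // or NULL for a fresh in-call
 *     waitForReturnCapability(&cap, task);
 *     // cap now points to a Capability owned by this Task
 */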
void
waitForReturnCapability (Capability **pCap, Task *task)
{
#if !defined(THREADED_RTS)

    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;

#else
    Capability *cap = *pCap;

    if (cap == NULL) {
        // Try last_free_capability first
        cap = last_free_capability;
        if (cap->running_task) {
            nat i;
            // otherwise, search for a free capability
            cap = NULL;
            for (i = 0; i < n_capabilities; i++) {
                if (!capabilities[i].running_task) {
                    cap = &capabilities[i];
                    break;
                }
            }
            if (cap == NULL) {
                // Can't find a free one, use last_free_capability.
                cap = last_free_capability;
            }
        }

        // record the Capability as the one this Task is now associated with.
        task->cap = cap;

    } else {
        ASSERT(task->cap == cap);
    }

    ACQUIRE_LOCK(&cap->lock);

    IF_DEBUG(scheduler,
             sched_belch("returning; I want capability %d", cap->no));

    if (!cap->running_task) {
        // It's free; just grab it
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
    } else {
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            // now check whether we should wake up...
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task == NULL) {
                if (cap->returning_tasks_hd != task) {
                    giveCapabilityToTask(cap,cap->returning_tasks_hd);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->running_task = task;
                popReturningTask(cap);
                RELEASE_LOCK(&cap->lock);
                break;
            }
            RELEASE_LOCK(&cap->lock);
        }

    }

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    IF_DEBUG(scheduler,
             sched_belch("returning; got capability %d", cap->no));

    *pCap = cap;
#endif
}

#if defined(THREADED_RTS)
/* ----------------------------------------------------------------------------
 * yieldCapability
 * ------------------------------------------------------------------------- */
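
/* For example (a sketch of the intended usage; the real caller is the
 * scheduler loop in Schedule.c):
 *
 *     // Nothing for us to do right now: give the Capability away and
 *     // sleep until a Task hands one back with work attached.
 *     yieldCapability(&cap, task);
 *     // NB. cap may point to a different Capability afterwards.
 */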

void
yieldCapability (Capability** pCap, Task *task)
{
    Capability *cap = *pCap;

    // The fast path has no locking; we only take locks if we enter
    // the while loop below.

    while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
        IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));

        // We must now release the capability and wait to be woken up
        // again.
        task->wakeup = rtsFalse;
        releaseCapabilityAndQueueWorker(cap);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task != NULL) {
                IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no));
                RELEASE_LOCK(&cap->lock);
                continue;
            }

            if (task->tso == NULL) {
                ASSERT(cap->spare_workers != NULL);
                // if we're not at the front of the queue, release it
                // again.  This is unlikely to happen.
                if (cap->spare_workers != task) {
                    giveCapabilityToTask(cap,cap->spare_workers);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->spare_workers = task->next;
                task->next = NULL;
            }
            cap->running_task = task;
            RELEASE_LOCK(&cap->lock);
            break;
        }

        IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
        ASSERT(cap->running_task == task);
    }

    *pCap = cap;

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    return;
}

/* ----------------------------------------------------------------------------
 * prodCapabilities
 *
 * Used to indicate that the interrupted flag is now set, or some
 * other global condition that might require waking up a Task on each
 * Capability.
 * ------------------------------------------------------------------------- */

static void
prodCapabilities(rtsBool all)
{
    nat i;
    Capability *cap;
    Task *task;

    for (i=0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        ACQUIRE_LOCK(&cap->lock);
        if (!cap->running_task) {
            if (cap->spare_workers) {
                task = cap->spare_workers;
                ASSERT(!task->stopped);
                giveCapabilityToTask(cap,task);
                if (!all) {
                    RELEASE_LOCK(&cap->lock);
                    return;
                }
            }
        }
        RELEASE_LOCK(&cap->lock);
    }
    return;
}

void
prodAllCapabilities (void)
{
    prodCapabilities(rtsTrue);
}

/* ----------------------------------------------------------------------------
 * prodOneCapability
 *
 * Like prodAllCapabilities, but we only require a single Task to wake
 * up in order to service some global event, such as checking for
 * deadlock after some idle time has passed.
 * ------------------------------------------------------------------------- */

void
prodOneCapability (void)
{
    prodCapabilities(rtsFalse);
}

/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible.  For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted and
 * shutting_down_scheduler == rtsTrue, thus any worker that wakes up
 * will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller.  Runnable threads are
 * killed.
 *
 * ------------------------------------------------------------------------- */

void
shutdownCapability (Capability *cap, Task *task)
{
    nat i;

    ASSERT(interrupted && shutting_down_scheduler);

    task->cap = cap;

    for (i = 0; i < 50; i++) {
        IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            RELEASE_LOCK(&cap->lock);
            IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
            yieldThread();
            continue;
        }
        cap->running_task = task;
        if (!emptyRunQueue(cap) || cap->spare_workers) {
            IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
            releaseCapability_(cap); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }
        IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // We now have the Capability; its run queue and spare_workers
    // list are both empty.
}

/* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
 * Attempt to gain control of a Capability if it is free.
 *
 * ------------------------------------------------------------------------- */
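
/* For example (a sketch; illustrative caller, not part of this file):
 *
 *     if (tryGrabCapability(cap, task)) {
 *         // we now own cap (cap->running_task == task)
 *     } else {
 *         // cap was busy; try another Capability, or wait
 *     }
 */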

rtsBool
tryGrabCapability (Capability *cap, Task *task)
{
    if (cap->running_task != NULL) return rtsFalse;
    ACQUIRE_LOCK(&cap->lock);
    if (cap->running_task != NULL) {
        RELEASE_LOCK(&cap->lock);
        return rtsFalse;
    }
    task->cap = cap;
    cap->running_task = task;
    RELEASE_LOCK(&cap->lock);
    return rtsTrue;
}


#endif /* THREADED_RTS */