Adding TcGadt.lhs
[ghc-hetmet.git] / rts / Capability.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2003-2006
4  *
5  * Capabilities
6  *
7  * A Capability represents the token required to execute STG code,
8  * and all the state an OS thread/task needs to run Haskell code:
9  * its STG registers, a pointer to its TSO, a nursery etc. During
10  * STG execution, a pointer to the capability is kept in a
11  * register (BaseReg; actually it is a pointer to cap->r).
12  *
13  * Only in a THREADED_RTS build will there be multiple capabilities;
14  * for non-threaded builds there is only one global capability, namely
15  * MainCapability.
16  *
17  * --------------------------------------------------------------------------*/
18
19 #include "PosixSource.h"
20 #include "Rts.h"
21 #include "RtsUtils.h"
22 #include "RtsFlags.h"
23 #include "STM.h"
24 #include "OSThreads.h"
25 #include "Capability.h"
26 #include "Schedule.h"
27 #include "Sparks.h"
28 #include "Trace.h"
29
30 // one global capability, this is the Capability for non-threaded
31 // builds, and for +RTS -N1
32 Capability MainCapability;
33
34 nat n_capabilities;
35 Capability *capabilities = NULL;
36
37 // Holds the Capability which last became free.  This is used so that
38 // an in-call has a chance of quickly finding a free Capability.
39 // Maintaining a global free list of Capabilities would require global
40 // locking, so we don't do that.
41 Capability *last_free_capability;
42
43 #if defined(THREADED_RTS)
44 STATIC_INLINE rtsBool
45 globalWorkToDo (void)
46 {
47     return blackholes_need_checking
48         || sched_state >= SCHED_INTERRUPTING
49         ;
50 }
51 #endif
52
53 #if defined(THREADED_RTS)
54 STATIC_INLINE rtsBool
55 anyWorkForMe( Capability *cap, Task *task )
56 {
57     if (task->tso != NULL) {
58         // A bound task only runs if its thread is on the run queue of
59         // the capability on which it was woken up.  Otherwise, we
60         // can't be sure that we have the right capability: the thread
61         // might be woken up on some other capability, and task->cap
62         // could change under our feet.
63         return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task;
64     } else {
65         // A vanilla worker task runs if either there is a lightweight
66         // thread at the head of the run queue, or the run queue is
67         // empty and (there are sparks to execute, or there is some
68         // other global condition to check, such as threads blocked on
69         // blackholes).
70         if (emptyRunQueue(cap)) {
71             return !emptySparkPoolCap(cap)
72                 || !emptyWakeupQueue(cap)
73                 || globalWorkToDo();
74         } else
75             return cap->run_queue_hd->bound == NULL;
76     }
77 }
78 #endif
79
80 /* -----------------------------------------------------------------------------
81  * Manage the returning_tasks lists.
82  *
83  * These functions require cap->lock
84  * -------------------------------------------------------------------------- */
85
86 #if defined(THREADED_RTS)
87 STATIC_INLINE void
88 newReturningTask (Capability *cap, Task *task)
89 {
90     ASSERT_LOCK_HELD(&cap->lock);
91     ASSERT(task->return_link == NULL);
92     if (cap->returning_tasks_hd) {
93         ASSERT(cap->returning_tasks_tl->return_link == NULL);
94         cap->returning_tasks_tl->return_link = task;
95     } else {
96         cap->returning_tasks_hd = task;
97     }
98     cap->returning_tasks_tl = task;
99 }
100
101 STATIC_INLINE Task *
102 popReturningTask (Capability *cap)
103 {
104     ASSERT_LOCK_HELD(&cap->lock);
105     Task *task;
106     task = cap->returning_tasks_hd;
107     ASSERT(task);
108     cap->returning_tasks_hd = task->return_link;
109     if (!cap->returning_tasks_hd) {
110         cap->returning_tasks_tl = NULL;
111     }
112     task->return_link = NULL;
113     return task;
114 }
115 #endif
116
117 /* ----------------------------------------------------------------------------
118  * Initialisation
119  *
120  * In THREADED_RTS builds the Capability starts out free (running_task == NULL).
121  * ------------------------------------------------------------------------- */
122
123 static void
124 initCapability( Capability *cap, nat i )
125 {
126     nat g;
127
128     cap->no = i;
129     cap->in_haskell        = rtsFalse;
130
131     cap->run_queue_hd      = END_TSO_QUEUE;
132     cap->run_queue_tl      = END_TSO_QUEUE;
133
134 #if defined(THREADED_RTS)
135     initMutex(&cap->lock);
136     cap->running_task      = NULL; // indicates cap is free
137     cap->spare_workers     = NULL;
138     cap->suspended_ccalling_tasks = NULL;
139     cap->returning_tasks_hd = NULL;
140     cap->returning_tasks_tl = NULL;
141     cap->wakeup_queue_hd    = END_TSO_QUEUE;
142     cap->wakeup_queue_tl    = END_TSO_QUEUE;
143 #endif
144
145     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
146     cap->f.stgGCFun        = (F_)__stg_gc_fun;
147
148     cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
149                                      RtsFlags.GcFlags.generations,
150                                      "initCapability");
151
152     for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
153         cap->mut_lists[g] = NULL;
154     }
155
156     cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
157     cap->free_trec_chunks = END_STM_CHUNK_LIST;
158     cap->free_trec_headers = NO_TREC;
159     cap->transaction_tokens = 0;
160 }
161
162 /* ---------------------------------------------------------------------------
163  * Function:  initCapabilities()
164  *
165  * Purpose:   set up the Capability handling. For the THREADED_RTS build,
166  *            we keep a table of them, the size of which is
167  *            controlled by the user via the RTS flag -N.
168  *
169  * ------------------------------------------------------------------------- */
170 void
171 initCapabilities( void )
172 {
173 #if defined(THREADED_RTS)
174     nat i;
175
176 #ifndef REG_Base
177     // We can't support multiple CPUs if BaseReg is not a register
178     if (RtsFlags.ParFlags.nNodes > 1) {
179         errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
180         RtsFlags.ParFlags.nNodes = 1;
181     }
182 #endif
183
184     n_capabilities = RtsFlags.ParFlags.nNodes;
185
186     if (n_capabilities == 1) {
187         capabilities = &MainCapability;
188         // THREADED_RTS must work on builds that don't have a mutable
189         // BaseReg (eg. unregisterised), so in this case
190         // capabilities[0] must coincide with &MainCapability.
191     } else {
192         capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
193                                       "initCapabilities");
194     }
195
196     for (i = 0; i < n_capabilities; i++) {
197         initCapability(&capabilities[i], i);
198     }
199
200     debugTrace(DEBUG_sched, "allocated %d capabilities", n_capabilities);
201
202 #else /* !THREADED_RTS */
203
204     n_capabilities = 1;
205     capabilities = &MainCapability;
206     initCapability(&MainCapability, 0);
207
208 #endif
209
210     // There are no free capabilities to begin with.  We will start
211     // a worker Task to each Capability, which will quickly put the
212     // Capability on the free list when it finds nothing to do.
213     last_free_capability = &capabilities[0];
214 }
215
216 /* ----------------------------------------------------------------------------
217  * Give a Capability to a Task.  The task must currently be sleeping
218  * on its condition variable.
219  *
220  * Requires cap->lock (modifies cap->running_task).
221  *
222  * When migrating a Task, the migrator must take task->lock before
223  * modifying task->cap, to synchronise with the waking up Task.
224  * Additionally, the migrator should own the Capability (when
225  * migrating the run queue), or cap->lock (when migrating
226  * the returning_tasks list).
227  *
228  * ------------------------------------------------------------------------- */
229
230 #if defined(THREADED_RTS)
231 STATIC_INLINE void
232 giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
233 {
234     ASSERT_LOCK_HELD(&cap->lock);
235     ASSERT(task->cap == cap);
236     trace(TRACE_sched | DEBUG_sched,
237           "passing capability %d to %s %p",
238           cap->no, task->tso ? "bound task" : "worker",
239           (void *)task->id);
240     ACQUIRE_LOCK(&task->lock);
241     task->wakeup = rtsTrue;
242     // the wakeup flag is needed because signalCondition() doesn't
243     // flag the condition if the thread is already runniing, but we want
244     // it to be sticky.
245     signalCondition(&task->cond);
246     RELEASE_LOCK(&task->lock);
247 }
248 #endif
249
250 /* ----------------------------------------------------------------------------
251  * Function:  releaseCapability(Capability*)
252  *
253  * Purpose:   Letting go of a capability. Causes a
254  *            'returning worker' thread or a 'waiting worker'
255  *            to wake up, in that order.
256  * ------------------------------------------------------------------------- */
257
258 #if defined(THREADED_RTS)
259 void
260 releaseCapability_ (Capability* cap)
261 {
262     Task *task;
263
264     task = cap->running_task;
265
266     ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
267
268     cap->running_task = NULL;
269
270     // Check to see whether a worker thread can be given
271     // the go-ahead to return the result of an external call..
272     if (cap->returning_tasks_hd != NULL) {
273         giveCapabilityToTask(cap,cap->returning_tasks_hd);
274         // The Task pops itself from the queue (see waitForReturnCapability())
275         return;
276     }
277
278     // If the next thread on the run queue is a bound thread,
279     // give this Capability to the appropriate Task.
280     if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
281         // Make sure we're not about to try to wake ourselves up
282         ASSERT(task != cap->run_queue_hd->bound);
283         task = cap->run_queue_hd->bound;
284         giveCapabilityToTask(cap,task);
285         return;
286     }
287
288     if (!cap->spare_workers) {
289         // Create a worker thread if we don't have one.  If the system
290         // is interrupted, we only create a worker task if there
291         // are threads that need to be completed.  If the system is
292         // shutting down, we never create a new worker.
293         if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
294             debugTrace(DEBUG_sched,
295                        "starting new worker on capability %d", cap->no);
296             startWorkerTask(cap, workerStart);
297             return;
298         }
299     }
300
301     // If we have an unbound thread on the run queue, or if there's
302     // anything else to do, give the Capability to a worker thread.
303     if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap)
304               || !emptySparkPoolCap(cap) || globalWorkToDo()) {
305         if (cap->spare_workers) {
306             giveCapabilityToTask(cap,cap->spare_workers);
307             // The worker Task pops itself from the queue;
308             return;
309         }
310     }
311
312     last_free_capability = cap;
313     trace(TRACE_sched | DEBUG_sched, "freeing capability %d", cap->no);
314 }
315
316 void
317 releaseCapability (Capability* cap USED_IF_THREADS)
318 {
319     ACQUIRE_LOCK(&cap->lock);
320     releaseCapability_(cap);
321     RELEASE_LOCK(&cap->lock);
322 }
323
324 static void
325 releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
326 {
327     Task *task;
328
329     ACQUIRE_LOCK(&cap->lock);
330
331     task = cap->running_task;
332
333     // If the current task is a worker, save it on the spare_workers
334     // list of this Capability.  A worker can mark itself as stopped,
335     // in which case it is not replaced on the spare_worker queue.
336     // This happens when the system is shutting down (see
337     // Schedule.c:workerStart()).
338     // Also, be careful to check that this task hasn't just exited
339     // Haskell to do a foreign call (task->suspended_tso).
340     if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
341         task->next = cap->spare_workers;
342         cap->spare_workers = task;
343     }
344     // Bound tasks just float around attached to their TSOs.
345
346     releaseCapability_(cap);
347
348     RELEASE_LOCK(&cap->lock);
349 }
350 #endif
351
352 /* ----------------------------------------------------------------------------
353  * waitForReturnCapability( Task *task )
354  *
355  * Purpose:  when an OS thread returns from an external call,
356  * it calls waitForReturnCapability() (via Schedule.resumeThread())
357  * to wait for permission to enter the RTS & communicate the
358  * result of the external call back to the Haskell thread that
359  * made it.
360  *
361  * ------------------------------------------------------------------------- */
362 void
363 waitForReturnCapability (Capability **pCap, Task *task)
364 {
365 #if !defined(THREADED_RTS)
366
367     MainCapability.running_task = task;
368     task->cap = &MainCapability;
369     *pCap = &MainCapability;
370
371 #else
372     Capability *cap = *pCap;
373
374     if (cap == NULL) {
375         // Try last_free_capability first
376         cap = last_free_capability;
377         if (!cap->running_task) {
378             nat i;
379             // otherwise, search for a free capability
380             for (i = 0; i < n_capabilities; i++) {
381                 cap = &capabilities[i];
382                 if (!cap->running_task) {
383                     break;
384                 }
385             }
386             // Can't find a free one, use last_free_capability.
387             cap = last_free_capability;
388         }
389
390         // record the Capability as the one this Task is now assocated with.
391         task->cap = cap;
392
393     } else {
394         ASSERT(task->cap == cap);
395     }
396
397     ACQUIRE_LOCK(&cap->lock);
398
399     debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);
400
401     if (!cap->running_task) {
402         // It's free; just grab it
403         cap->running_task = task;
404         RELEASE_LOCK(&cap->lock);
405     } else {
406         newReturningTask(cap,task);
407         RELEASE_LOCK(&cap->lock);
408
409         for (;;) {
410             ACQUIRE_LOCK(&task->lock);
411             // task->lock held, cap->lock not held
412             if (!task->wakeup) waitCondition(&task->cond, &task->lock);
413             cap = task->cap;
414             task->wakeup = rtsFalse;
415             RELEASE_LOCK(&task->lock);
416
417             // now check whether we should wake up...
418             ACQUIRE_LOCK(&cap->lock);
419             if (cap->running_task == NULL) {
420                 if (cap->returning_tasks_hd != task) {
421                     giveCapabilityToTask(cap,cap->returning_tasks_hd);
422                     RELEASE_LOCK(&cap->lock);
423                     continue;
424                 }
425                 cap->running_task = task;
426                 popReturningTask(cap);
427                 RELEASE_LOCK(&cap->lock);
428                 break;
429             }
430             RELEASE_LOCK(&cap->lock);
431         }
432
433     }
434
435     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
436
437     trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
438
439     *pCap = cap;
440 #endif
441 }
442
443 #if defined(THREADED_RTS)
444 /* ----------------------------------------------------------------------------
445  * yieldCapability
446  * ------------------------------------------------------------------------- */
447
448 void
449 yieldCapability (Capability** pCap, Task *task)
450 {
451     Capability *cap = *pCap;
452
453     // The fast path has no locking, if we don't enter this while loop
454
455     while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
456         debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
457
458         // We must now release the capability and wait to be woken up
459         // again.
460         task->wakeup = rtsFalse;
461         releaseCapabilityAndQueueWorker(cap);
462
463         for (;;) {
464             ACQUIRE_LOCK(&task->lock);
465             // task->lock held, cap->lock not held
466             if (!task->wakeup) waitCondition(&task->cond, &task->lock);
467             cap = task->cap;
468             task->wakeup = rtsFalse;
469             RELEASE_LOCK(&task->lock);
470
471             debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);
472
473             ACQUIRE_LOCK(&cap->lock);
474             if (cap->running_task != NULL) {
475                 debugTrace(DEBUG_sched, 
476                            "capability %d is owned by another task", cap->no);
477                 RELEASE_LOCK(&cap->lock);
478                 continue;
479             }
480
481             if (task->tso == NULL) {
482                 ASSERT(cap->spare_workers != NULL);
483                 // if we're not at the front of the queue, release it
484                 // again.  This is unlikely to happen.
485                 if (cap->spare_workers != task) {
486                     giveCapabilityToTask(cap,cap->spare_workers);
487                     RELEASE_LOCK(&cap->lock);
488                     continue;
489                 }
490                 cap->spare_workers = task->next;
491                 task->next = NULL;
492             }
493             cap->running_task = task;
494             RELEASE_LOCK(&cap->lock);
495             break;
496         }
497
498         trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
499         ASSERT(cap->running_task == task);
500     }
501
502     *pCap = cap;
503
504     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
505
506     return;
507 }
508
509 /* ----------------------------------------------------------------------------
510  * Wake up a thread on a Capability.
511  *
512  * This is used when the current Task is running on a Capability and
513  * wishes to wake up a thread on a different Capability.
514  * ------------------------------------------------------------------------- */
515
516 void
517 wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
518 {
519     ASSERT(tso->cap == cap);
520     ASSERT(tso->bound ? tso->bound->cap == cap : 1);
521     ASSERT_LOCK_HELD(&cap->lock);
522
523     tso->cap = cap;
524
525     if (cap->running_task == NULL) {
526         // nobody is running this Capability, we can add our thread
527         // directly onto the run queue and start up a Task to run it.
528         appendToRunQueue(cap,tso);
529
530         // start it up
531         cap->running_task = myTask(); // precond for releaseCapability_()
532         trace(TRACE_sched, "resuming capability %d", cap->no);
533         releaseCapability_(cap);
534     } else {
535         appendToWakeupQueue(cap,tso);
536         // someone is running on this Capability, so it cannot be
537         // freed without first checking the wakeup queue (see
538         // releaseCapability_).
539     }
540 }
541
542 void
543 wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso)
544 {
545     ACQUIRE_LOCK(&cap->lock);
546     migrateThreadToCapability (cap, tso);
547     RELEASE_LOCK(&cap->lock);
548 }
549
550 void
551 migrateThreadToCapability (Capability *cap, StgTSO *tso)
552 {
553     // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
554     if (tso->bound) {
555         ASSERT(tso->bound->cap == tso->cap);
556         tso->bound->cap = cap;
557     }
558     tso->cap = cap;
559     wakeupThreadOnCapability(cap,tso);
560 }
561
562 void
563 migrateThreadToCapability_lock (Capability *cap, StgTSO *tso)
564 {
565     ACQUIRE_LOCK(&cap->lock);
566     migrateThreadToCapability (cap, tso);
567     RELEASE_LOCK(&cap->lock);
568 }
569
570 /* ----------------------------------------------------------------------------
571  * prodCapabilities
572  *
573  * Used to indicate that the interrupted flag is now set, or some
574  * other global condition that might require waking up a Task on each
575  * Capability.
576  * ------------------------------------------------------------------------- */
577
578 static void
579 prodCapabilities(rtsBool all)
580 {
581     nat i;
582     Capability *cap;
583     Task *task;
584
585     for (i=0; i < n_capabilities; i++) {
586         cap = &capabilities[i];
587         ACQUIRE_LOCK(&cap->lock);
588         if (!cap->running_task) {
589             if (cap->spare_workers) {
590                 trace(TRACE_sched, "resuming capability %d", cap->no);
591                 task = cap->spare_workers;
592                 ASSERT(!task->stopped);
593                 giveCapabilityToTask(cap,task);
594                 if (!all) {
595                     RELEASE_LOCK(&cap->lock);
596                     return;
597                 }
598             }
599         }
600         RELEASE_LOCK(&cap->lock);
601     }
602     return;
603 }
604
605 void
606 prodAllCapabilities (void)
607 {
608     prodCapabilities(rtsTrue);
609 }
610
611 /* ----------------------------------------------------------------------------
612  * prodOneCapability
613  *
614  * Like prodAllCapabilities, but we only require a single Task to wake
615  * up in order to service some global event, such as checking for
616  * deadlock after some idle time has passed.
617  * ------------------------------------------------------------------------- */
618
619 void
620 prodOneCapability (void)
621 {
622     prodCapabilities(rtsFalse);
623 }
624
625 /* ----------------------------------------------------------------------------
626  * shutdownCapability
627  *
628  * At shutdown time, we want to let everything exit as cleanly as
629  * possible.  For each capability, we let its run queue drain, and
630  * allow the workers to stop.
631  *
632  * This function must be called while sched_state ==
633  * SCHED_SHUTTING_DOWN (it asserts this), so that any worker that wakes up
634  * will exit the scheduler and call taskStop(), and any bound thread
635  * that wakes up will return to its caller.  Runnable threads are
636  * killed.
637  *
638  * ------------------------------------------------------------------------- */
639
640 void
641 shutdownCapability (Capability *cap, Task *task)
642 {
643     nat i;
644
645     ASSERT(sched_state == SCHED_SHUTTING_DOWN);
646
647     task->cap = cap;
648
649     // Loop indefinitely until all the workers have exited and there
650     // are no Haskell threads left.  We used to bail out after 50
651     // iterations of this loop, but that occasionally left a worker
652     // running which caused problems later (the closeMutex() below
653     // isn't safe, for one thing).
654
655     for (i = 0; /* i < 50 */; i++) {
656         debugTrace(DEBUG_sched, 
657                    "shutting down capability %d, attempt %d", cap->no, i);
658         ACQUIRE_LOCK(&cap->lock);
659         if (cap->running_task) {
660             RELEASE_LOCK(&cap->lock);
661             debugTrace(DEBUG_sched, "not owner, yielding");
662             yieldThread();
663             continue;
664         }
665         cap->running_task = task;
666         if (!emptyRunQueue(cap) || cap->spare_workers) {
667             debugTrace(DEBUG_sched, 
668                        "runnable threads or workers still alive, yielding");
669             releaseCapability_(cap); // this will wake up a worker
670             RELEASE_LOCK(&cap->lock);
671             yieldThread();
672             continue;
673         }
674         debugTrace(DEBUG_sched, "capability %d is stopped.", cap->no);
675         RELEASE_LOCK(&cap->lock);
676         break;
677     }
678     // we now have the Capability, its run queue and spare workers
679     // list are both empty.
680
681     // ToDo: we can't drop this mutex, because there might still be
682     // threads performing foreign calls that will eventually try to 
683     // return via resumeThread() and attempt to grab cap->lock.
684     // closeMutex(&cap->lock);
685 }
686
687 /* ----------------------------------------------------------------------------
688  * tryGrabCapability
689  *
690  * Attempt to gain control of a Capability if it is free.
691  *
692  * ------------------------------------------------------------------------- */
693
694 rtsBool
695 tryGrabCapability (Capability *cap, Task *task)
696 {
697     if (cap->running_task != NULL) return rtsFalse;
698     ACQUIRE_LOCK(&cap->lock);
699     if (cap->running_task != NULL) {
700         RELEASE_LOCK(&cap->lock);
701         return rtsFalse;
702     }
703     task->cap = cap;
704     cap->running_task = task;
705     RELEASE_LOCK(&cap->lock);
706     return rtsTrue;
707 }
708
709
710 #endif /* THREADED_RTS */
711
712