/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2006
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the Capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in a THREADED_RTS build will there be multiple capabilities;
 * in non-threaded builds there is one global capability, namely
 * MainCapability.
 *
 * --------------------------------------------------------------------------*/
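
/* An illustrative sketch of a Task's life against the API below
 * (editorial example, not code from the RTS; it assumes the Task
 * makes exactly one in-call):
 *
 *     Capability *cap = NULL;
 *     waitForReturnCapability(&cap, task);  // acquire a Capability
 *     // ... run Haskell code, with BaseReg pointing into cap ...
 *     releaseCapability(cap);               // pass it on or free it
 *
 * Worker Tasks instead live in the scheduler and call
 * yieldCapability() when they have nothing to do (see below).
 */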

#include "PosixSource.h"
#include "Rts.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "STM.h"
#include "OSThreads.h"
#include "Capability.h"
#include "Storage.h"
#include "Schedule.h"
#include "Sparks.h"
#include "Trace.h"

// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
Capability MainCapability;

nat n_capabilities;
Capability *capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability;

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
    return blackholes_need_checking
        || sched_state >= SCHED_INTERRUPTING
        ;
}
#endif

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
anyWorkForMe( Capability *cap, Task *task )
{
    if (task->tso != NULL) {
        // A bound task only runs if its thread is on the run queue of
        // the capability on which it was woken up.  Otherwise, we
        // can't be sure that we have the right capability: the thread
        // might be woken up on some other capability, and task->cap
        // could change under our feet.
        return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task;
    } else {
        // A vanilla worker task runs if either there is a lightweight
        // thread at the head of the run queue, or the run queue is
        // empty and (there are sparks to execute, or there is some
        // other global condition to check, such as threads blocked on
        // blackholes).
        if (emptyRunQueue(cap)) {
            return !emptySparkPoolCap(cap)
                || !emptyWakeupQueue(cap)
                || globalWorkToDo();
        } else
            return cap->run_queue_hd->bound == NULL;
    }
}
#endif

/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
 *
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->return_link == NULL);
    if (cap->returning_tasks_hd) {
        ASSERT(cap->returning_tasks_tl->return_link == NULL);
        cap->returning_tasks_tl->return_link = task;
    } else {
        cap->returning_tasks_hd = task;
    }
    cap->returning_tasks_tl = task;
}

STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    Task *task;
    ASSERT_LOCK_HELD(&cap->lock);
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->return_link;
    if (!cap->returning_tasks_hd) {
        cap->returning_tasks_tl = NULL;
    }
    task->return_link = NULL;
    return task;
}
#endif
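
/* A worked example of the queue discipline above (hypothetical tasks
 * t1 and t2): after newReturningTask(cap,t1) and newReturningTask(cap,t2)
 * the list is t1 -> t2, with returning_tasks_hd == t1 and
 * returning_tasks_tl == t2; popReturningTask(cap) then yields t1, so
 * returning workers are served strictly first-come first-served.
 */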

/* ----------------------------------------------------------------------------
 * Initialisation
 *
 * The Capability is initially marked not free.
 * ------------------------------------------------------------------------- */

static void
initCapability( Capability *cap, nat i )
{
    nat g;

    cap->no = i;
    cap->in_haskell        = rtsFalse;

    cap->run_queue_hd      = END_TSO_QUEUE;
    cap->run_queue_tl      = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task      = NULL; // indicates cap is free
    cap->spare_workers     = NULL;
    cap->suspended_ccalling_tasks = NULL;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
    cap->wakeup_queue_hd    = END_TSO_QUEUE;
    cap->wakeup_queue_tl    = END_TSO_QUEUE;
#endif

    cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
    cap->f.stgGCFun        = (F_)__stg_gc_fun;

    cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
                                     RtsFlags.GcFlags.generations,
                                     "initCapability");

    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        cap->mut_lists[g] = NULL;
    }

    cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
    cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE;
    cap->free_trec_chunks = END_STM_CHUNK_LIST;
    cap->free_trec_headers = NO_TREC;
    cap->transaction_tokens = 0;
}

/* ---------------------------------------------------------------------------
 * Function:  initCapabilities()
 *
 * Purpose:   set up the Capability handling. For the THREADED_RTS build,
 *            we keep a table of them, the size of which is
 *            controlled by the user via the RTS flag -N.
 *
 * ------------------------------------------------------------------------- */
void
initCapabilities( void )
{
#if defined(THREADED_RTS)
    nat i;

#ifndef REG_Base
    // We can't support multiple CPUs if BaseReg is not a register
    if (RtsFlags.ParFlags.nNodes > 1) {
        errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
        RtsFlags.ParFlags.nNodes = 1;
    }
#endif

    n_capabilities = RtsFlags.ParFlags.nNodes;

    if (n_capabilities == 1) {
        capabilities = &MainCapability;
        // THREADED_RTS must work on builds that don't have a mutable
        // BaseReg (eg. unregisterised), so in this case
        // capabilities[0] must coincide with &MainCapability.
    } else {
        capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
                                      "initCapabilities");
    }

    for (i = 0; i < n_capabilities; i++) {
        initCapability(&capabilities[i], i);
    }

    debugTrace(DEBUG_sched, "allocated %d capabilities", n_capabilities);

#else /* !THREADED_RTS */

    n_capabilities = 1;
    capabilities = &MainCapability;
    initCapability(&MainCapability, 0);

#endif

    // There are no free capabilities to begin with.  We will start
    // a worker Task on each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
    last_free_capability = &capabilities[0];
}
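
/* For example (illustrative): running a registerised threaded program
 * with "+RTS -N2" sets RtsFlags.ParFlags.nNodes to 2, so the code
 * above allocates a two-element table and initialises capabilities[0]
 * and capabilities[1]; with -N1 no table is allocated and
 * capabilities == &MainCapability.
 */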

/* ----------------------------------------------------------------------------
 * Give a Capability to a Task.  The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrator must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrator should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    trace(TRACE_sched | DEBUG_sched,
          "passing capability %d to %s %p",
          cap->no, task->tso ? "bound task" : "worker",
          (void *)task->id);
    ACQUIRE_LOCK(&task->lock);
    task->wakeup = rtsTrue;
    // the wakeup flag is needed because signalCondition() doesn't
    // flag the condition if the thread is already running, but we want
    // it to be sticky.
    signalCondition(&task->cond);
    RELEASE_LOCK(&task->lock);
}
#endif
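
/* The receiving side of this handshake appears verbatim in
 * waitForReturnCapability() and yieldCapability() below:
 *
 *     ACQUIRE_LOCK(&task->lock);
 *     if (!task->wakeup) waitCondition(&task->cond, &task->lock);
 *     cap = task->cap;
 *     task->wakeup = rtsFalse;
 *     RELEASE_LOCK(&task->lock);
 *
 * Because giveCapabilityToTask() sets task->wakeup under task->lock
 * before signalling, a wakeup delivered before the Task has reached
 * waitCondition() is not lost.
 */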

/* ----------------------------------------------------------------------------
 * Function:  releaseCapability(Capability*)
 *
 * Purpose:   Letting go of a capability. Causes a
 *            'returning worker' thread or a 'waiting worker'
 *            to wake up, in that order.
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap)
{
    Task *task;

    task = cap->running_task;

    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);

    cap->running_task = NULL;

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call.
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
        // Make sure we're not about to try to wake ourselves up
        ASSERT(task != cap->run_queue_hd->bound);
        task = cap->run_queue_hd->bound;
        giveCapabilityToTask(cap,task);
        return;
    }

    if (!cap->spare_workers) {
        // Create a worker thread if we don't have one.  If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed.  If the system is
        // shutting down, we never create a new worker.
        if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
            debugTrace(DEBUG_sched,
                       "starting new worker on capability %d", cap->no);
            startWorkerTask(cap, workerStart);
            return;
        }
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap)
              || !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue;
            return;
        }
    }

    last_free_capability = cap;
    trace(TRACE_sched | DEBUG_sched, "freeing capability %d", cap->no);
}

void
releaseCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap);
    RELEASE_LOCK(&cap->lock);
}

static void
releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
{
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not replaced on the spare_workers queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    // Also, be careful to check that this task hasn't just exited
    // Haskell to do a foreign call (task->suspended_tso).
    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
        task->next = cap->spare_workers;
        cap->spare_workers = task;
    }
    // Bound tasks just float around attached to their TSOs.

    releaseCapability_(cap);

    RELEASE_LOCK(&cap->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * waitForReturnCapability( Task *task )
 *
 * Purpose:  when an OS thread returns from an external call,
 * it calls waitForReturnCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
 * result of the external call back to the Haskell thread that
 * made it.
 *
 * ------------------------------------------------------------------------- */
void
waitForReturnCapability (Capability **pCap, Task *task)
{
#if !defined(THREADED_RTS)

    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;

#else
    Capability *cap = *pCap;

    if (cap == NULL) {
        // Try last_free_capability first
        cap = last_free_capability;
        if (cap->running_task) {
            nat i;
            // otherwise, search for a free capability
            for (i = 0; i < n_capabilities; i++) {
                if (!capabilities[i].running_task) {
                    cap = &capabilities[i];
                    break;
                }
            }
            // If we can't find a free one, cap is still
            // last_free_capability.
        }

        // record the Capability as the one this Task is now associated with.
        task->cap = cap;

    } else {
        ASSERT(task->cap == cap);
    }

    ACQUIRE_LOCK(&cap->lock);

    debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);

    if (!cap->running_task) {
        // It's free; just grab it
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
    } else {
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            // now check whether we should wake up...
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task == NULL) {
                if (cap->returning_tasks_hd != task) {
                    giveCapabilityToTask(cap,cap->returning_tasks_hd);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->running_task = task;
                popReturningTask(cap);
                RELEASE_LOCK(&cap->lock);
                break;
            }
            RELEASE_LOCK(&cap->lock);
        }

    }

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);

    *pCap = cap;
#endif
}
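
/* An illustrative caller (a simplified sketch of what
 * Schedule.c:resumeThread() does, not the actual code):
 *
 *     Capability *cap = task->cap;   // may be NULL for a fresh in-call
 *     waitForReturnCapability(&cap, task);
 *     // cap is now owned by this Task: the suspended TSO can be
 *     // resumed and the result of the foreign call handed back.
 */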

#if defined(THREADED_RTS)
/* ----------------------------------------------------------------------------
 * yieldCapability
 * ------------------------------------------------------------------------- */

void
yieldCapability (Capability** pCap, Task *task)
{
    Capability *cap = *pCap;

    // The fast path involves no locking: if we don't enter the while
    // loop below, we keep the Capability without touching cap->lock.

    while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
        debugTrace(DEBUG_sched, "giving up capability %d", cap->no);

        // We must now release the capability and wait to be woken up
        // again.
        task->wakeup = rtsFalse;
        releaseCapabilityAndQueueWorker(cap);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);

            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task != NULL) {
                debugTrace(DEBUG_sched,
                           "capability %d is owned by another task", cap->no);
                RELEASE_LOCK(&cap->lock);
                continue;
            }

            if (task->tso == NULL) {
                ASSERT(cap->spare_workers != NULL);
                // if we're not at the front of the queue, release it
                // again.  This is unlikely to happen.
                if (cap->spare_workers != task) {
                    giveCapabilityToTask(cap,cap->spare_workers);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->spare_workers = task->next;
                task->next = NULL;
            }
            cap->running_task = task;
            RELEASE_LOCK(&cap->lock);
            break;
        }

        trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
        ASSERT(cap->running_task == task);
    }

    *pCap = cap;

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    return;
}
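
/* Typical use, sketching the scheduler's idle path (the real loop is
 * in Schedule.c and does more than this):
 *
 *     // in the scheduler, when this Task has no work:
 *     yieldCapability(&cap, task);
 *     // on return we own a Capability again -- possibly a different
 *     // one, so cap-relative state must be reloaded.
 */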

/* ----------------------------------------------------------------------------
 * Wake up a thread on a Capability.
 *
 * This is used when the current Task is running on a Capability and
 * wishes to wake up a thread on a different Capability.
 * ------------------------------------------------------------------------- */

void
wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
{
    ASSERT(tso->cap == cap);
    ASSERT(tso->bound ? tso->bound->cap == cap : 1);
    ASSERT_LOCK_HELD(&cap->lock);

    tso->cap = cap;

    if (cap->running_task == NULL) {
        // nobody is running this Capability, we can add our thread
        // directly onto the run queue and start up a Task to run it.
        appendToRunQueue(cap,tso);

        // start it up
        cap->running_task = myTask(); // precond for releaseCapability_()
        trace(TRACE_sched, "resuming capability %d", cap->no);
        releaseCapability_(cap);
    } else {
        appendToWakeupQueue(cap,tso);
        // someone is running on this Capability, so it cannot be
        // freed without first checking the wakeup queue (see
        // releaseCapability_).
    }
}

void
wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso)
{
    ACQUIRE_LOCK(&cap->lock);
    migrateThreadToCapability (cap, tso);
    RELEASE_LOCK(&cap->lock);
}

void
migrateThreadToCapability (Capability *cap, StgTSO *tso)
{
    // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
    if (tso->bound) {
        ASSERT(tso->bound->cap == tso->cap);
        tso->bound->cap = cap;
    }
    tso->cap = cap;
    wakeupThreadOnCapability(cap,tso);
}

void
migrateThreadToCapability_lock (Capability *cap, StgTSO *tso)
{
    ACQUIRE_LOCK(&cap->lock);
    migrateThreadToCapability (cap, tso);
    RELEASE_LOCK(&cap->lock);
}
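
/* Note the layering here: migrateThreadToCapability() updates
 * tso->cap (and tso->bound->cap for a bound thread) under cap->lock
 * and then calls wakeupThreadOnCapability(), whose first ASSERT
 * requires tso->cap == cap.  An illustrative use (hypothetical
 * scheduler fragment, not code from this file): to push a runnable,
 * unbound TSO from a busy Capability to an idle one,
 *
 *     migrateThreadToCapability_lock(idle_cap, tso);
 *
 * where idle_cap is a Capability we do not currently own, hence the
 * _lock variant.
 */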

/* ----------------------------------------------------------------------------
 * prodCapabilities
 *
 * Used to indicate that the interrupted flag is now set, or some
 * other global condition that might require waking up a Task on each
 * Capability.
 * ------------------------------------------------------------------------- */

static void
prodCapabilities(rtsBool all)
{
    nat i;
    Capability *cap;
    Task *task;

    for (i=0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        ACQUIRE_LOCK(&cap->lock);
        if (!cap->running_task) {
            if (cap->spare_workers) {
                trace(TRACE_sched, "resuming capability %d", cap->no);
                task = cap->spare_workers;
                ASSERT(!task->stopped);
                giveCapabilityToTask(cap,task);
                if (!all) {
                    RELEASE_LOCK(&cap->lock);
                    return;
                }
            }
        }
        RELEASE_LOCK(&cap->lock);
    }
    return;
}

void
prodAllCapabilities (void)
{
    prodCapabilities(rtsTrue);
}

/* ----------------------------------------------------------------------------
 * prodOneCapability
 *
 * Like prodAllCapabilities, but we only require a single Task to wake
 * up in order to service some global event, such as checking for
 * deadlock after some idle time has passed.
 * ------------------------------------------------------------------------- */

void
prodOneCapability (void)
{
    prodCapabilities(rtsFalse);
}

/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible.  For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted and
 * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
 * will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller.  Runnable threads are
 * killed.
 *
 * ------------------------------------------------------------------------- */

void
shutdownCapability (Capability *cap, Task *task)
{
    nat i;

    ASSERT(sched_state == SCHED_SHUTTING_DOWN);

    task->cap = cap;

    // Loop indefinitely until all the workers have exited and there
    // are no Haskell threads left.  We used to bail out after 50
    // iterations of this loop, but that occasionally left a worker
    // running which caused problems later (the closeMutex() below
    // isn't safe, for one thing).

    for (i = 0; /* i < 50 */; i++) {
        debugTrace(DEBUG_sched,
                   "shutting down capability %d, attempt %d", cap->no, i);
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            RELEASE_LOCK(&cap->lock);
            debugTrace(DEBUG_sched, "not owner, yielding");
            yieldThread();
            continue;
        }
        cap->running_task = task;
        if (!emptyRunQueue(cap) || cap->spare_workers) {
            debugTrace(DEBUG_sched,
                       "runnable threads or workers still alive, yielding");
            releaseCapability_(cap); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }
        debugTrace(DEBUG_sched, "capability %d is stopped.", cap->no);
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // We now have the Capability: its run queue and spare_workers
    // list are both empty.

    // ToDo: we can't drop this mutex, because there might still be
    // threads performing foreign calls that will eventually try to
    // return via resumeThread() and attempt to grab cap->lock.
    // closeMutex(&cap->lock);
}
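
/* An illustrative shutdown sequence (a hypothetical sketch of the
 * scheduler's exit path; the real code lives in Schedule.c):
 *
 *     nat i;
 *     for (i = 0; i < n_capabilities; i++) {
 *         shutdownCapability(&capabilities[i], task);
 *     }
 *
 * Each call spins until that Capability's run queue and spare_workers
 * list have drained, as described above.
 */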

/* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
 * Attempt to gain control of a Capability if it is free.
 *
 * ------------------------------------------------------------------------- */

rtsBool
tryGrabCapability (Capability *cap, Task *task)
{
    if (cap->running_task != NULL) return rtsFalse;
    ACQUIRE_LOCK(&cap->lock);
    if (cap->running_task != NULL) {
        RELEASE_LOCK(&cap->lock);
        return rtsFalse;
    }
    task->cap = cap;
    cap->running_task = task;
    RELEASE_LOCK(&cap->lock);
    return rtsTrue;
}
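
/* A non-blocking caller might use it like this (illustrative only):
 *
 *     if (tryGrabCapability(cap, task)) {
 *         // we own cap; run work, then call releaseCapability(cap)
 *     } else {
 *         // cap was busy; try another Capability or give up
 *     }
 *
 * The unlocked check of cap->running_task is just a cheap first test;
 * the decision is re-checked under cap->lock before the grab.
 */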

#endif /* THREADED_RTS */