STM invariants
[ghc-hetmet.git] / rts / Capability.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2003-2006
4  *
5  * Capabilities
6  *
7  * A Capability represents the token required to execute STG code,
8  * and all the state an OS thread/task needs to run Haskell code:
9  * its STG registers, a pointer to its TSO, a nursery etc. During
10  * STG execution, a pointer to the capability is kept in a
11  * register (BaseReg; actually it is a pointer to cap->r).
12  *
13  * Only in a THREADED_RTS build will there be multiple capabilities;
14  * for non-threaded builds there is only one global capability, namely
15  * MainCapability.
16  *
17  * --------------------------------------------------------------------------*/
18
19 #include "PosixSource.h"
20 #include "Rts.h"
21 #include "RtsUtils.h"
22 #include "RtsFlags.h"
23 #include "STM.h"
24 #include "OSThreads.h"
25 #include "Capability.h"
26 #include "Schedule.h"
27 #include "Sparks.h"
28 #include "Trace.h"
29
30 // one global capability, this is the Capability for non-threaded
31 // builds, and for +RTS -N1
32 Capability MainCapability;
33
34 nat n_capabilities;
35 Capability *capabilities = NULL;
36
37 // Holds the Capability which last became free.  This is used so that
38 // an in-call has a chance of quickly finding a free Capability.
39 // Maintaining a global free list of Capabilities would require global
40 // locking, so we don't do that.
41 Capability *last_free_capability;
42
43 #if defined(THREADED_RTS)
44 STATIC_INLINE rtsBool
45 globalWorkToDo (void)
46 {
47     return blackholes_need_checking
48         || sched_state >= SCHED_INTERRUPTING
49         ;
50 }
51 #endif
52
53 #if defined(THREADED_RTS)
54 STATIC_INLINE rtsBool
55 anyWorkForMe( Capability *cap, Task *task )
56 {
57     if (task->tso != NULL) {
58         // A bound task only runs if its thread is on the run queue of
59         // the capability on which it was woken up.  Otherwise, we
60         // can't be sure that we have the right capability: the thread
61         // might be woken up on some other capability, and task->cap
62         // could change under our feet.
63         return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task;
64     } else {
65         // A vanilla worker task runs if either there is a lightweight
66         // thread at the head of the run queue, or the run queue is
67         // empty and (there are sparks to execute, or there is some
68         // other global condition to check, such as threads blocked on
69         // blackholes).
70         if (emptyRunQueue(cap)) {
71             return !emptySparkPoolCap(cap)
72                 || !emptyWakeupQueue(cap)
73                 || globalWorkToDo();
74         } else
75             return cap->run_queue_hd->bound == NULL;
76     }
77 }
78 #endif
79
80 /* -----------------------------------------------------------------------------
81  * Manage the returning_tasks lists.
82  *
83  * These functions require cap->lock
84  * -------------------------------------------------------------------------- */
85
86 #if defined(THREADED_RTS)
87 STATIC_INLINE void
88 newReturningTask (Capability *cap, Task *task)
89 {
90     ASSERT_LOCK_HELD(&cap->lock);
91     ASSERT(task->return_link == NULL);
92     if (cap->returning_tasks_hd) {
93         ASSERT(cap->returning_tasks_tl->return_link == NULL);
94         cap->returning_tasks_tl->return_link = task;
95     } else {
96         cap->returning_tasks_hd = task;
97     }
98     cap->returning_tasks_tl = task;
99 }
100
101 STATIC_INLINE Task *
102 popReturningTask (Capability *cap)
103 {
104     ASSERT_LOCK_HELD(&cap->lock);
105     Task *task;
106     task = cap->returning_tasks_hd;
107     ASSERT(task);
108     cap->returning_tasks_hd = task->return_link;
109     if (!cap->returning_tasks_hd) {
110         cap->returning_tasks_tl = NULL;
111     }
112     task->return_link = NULL;
113     return task;
114 }
115 #endif
116
117 /* ----------------------------------------------------------------------------
118  * Initialisation
119  *
120  * The Capability is initially marked not free.
121  * ------------------------------------------------------------------------- */
122
123 static void
124 initCapability( Capability *cap, nat i )
125 {
126     nat g;
127
128     cap->no = i;
129     cap->in_haskell        = rtsFalse;
130
131     cap->run_queue_hd      = END_TSO_QUEUE;
132     cap->run_queue_tl      = END_TSO_QUEUE;
133
134 #if defined(THREADED_RTS)
135     initMutex(&cap->lock);
136     cap->running_task      = NULL; // indicates cap is free
137     cap->spare_workers     = NULL;
138     cap->suspended_ccalling_tasks = NULL;
139     cap->returning_tasks_hd = NULL;
140     cap->returning_tasks_tl = NULL;
141     cap->wakeup_queue_hd    = END_TSO_QUEUE;
142     cap->wakeup_queue_tl    = END_TSO_QUEUE;
143 #endif
144
145     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
146     cap->f.stgGCFun        = (F_)__stg_gc_fun;
147
148     cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
149                                      RtsFlags.GcFlags.generations,
150                                      "initCapability");
151
152     for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
153         cap->mut_lists[g] = NULL;
154     }
155
156     cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
157     cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE;
158     cap->free_trec_chunks = END_STM_CHUNK_LIST;
159     cap->free_trec_headers = NO_TREC;
160     cap->transaction_tokens = 0;
161 }
162
163 /* ---------------------------------------------------------------------------
164  * Function:  initCapabilities()
165  *
166  * Purpose:   set up the Capability handling. For the THREADED_RTS build,
167  *            we keep a table of them, the size of which is
168  *            controlled by the user via the RTS flag -N.
169  *
170  * ------------------------------------------------------------------------- */
171 void
172 initCapabilities( void )
173 {
174 #if defined(THREADED_RTS)
175     nat i;
176
177 #ifndef REG_Base
178     // We can't support multiple CPUs if BaseReg is not a register
179     if (RtsFlags.ParFlags.nNodes > 1) {
180         errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
181         RtsFlags.ParFlags.nNodes = 1;
182     }
183 #endif
184
185     n_capabilities = RtsFlags.ParFlags.nNodes;
186
187     if (n_capabilities == 1) {
188         capabilities = &MainCapability;
189         // THREADED_RTS must work on builds that don't have a mutable
190         // BaseReg (eg. unregisterised), so in this case
191         // capabilities[0] must coincide with &MainCapability.
192     } else {
193         capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
194                                       "initCapabilities");
195     }
196
197     for (i = 0; i < n_capabilities; i++) {
198         initCapability(&capabilities[i], i);
199     }
200
201     debugTrace(DEBUG_sched, "allocated %d capabilities", n_capabilities);
202
203 #else /* !THREADED_RTS */
204
205     n_capabilities = 1;
206     capabilities = &MainCapability;
207     initCapability(&MainCapability, 0);
208
209 #endif
210
211     // There are no free capabilities to begin with.  We will start
212     // a worker Task to each Capability, which will quickly put the
213     // Capability on the free list when it finds nothing to do.
214     last_free_capability = &capabilities[0];
215 }
216
217 /* ----------------------------------------------------------------------------
218  * Give a Capability to a Task.  The task must currently be sleeping
219  * on its condition variable.
220  *
221  * Requires cap->lock (modifies cap->running_task).
222  *
223  * When migrating a Task, the migrater must take task->lock before
224  * modifying task->cap, to synchronise with the waking up Task.
225  * Additionally, the migrater should own the Capability (when
226  * migrating the run queue), or cap->lock (when migrating
227  * returning_workers).
228  *
229  * ------------------------------------------------------------------------- */
230
231 #if defined(THREADED_RTS)
232 STATIC_INLINE void
233 giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
234 {
235     ASSERT_LOCK_HELD(&cap->lock);
236     ASSERT(task->cap == cap);
237     trace(TRACE_sched | DEBUG_sched,
238           "passing capability %d to %s %p",
239           cap->no, task->tso ? "bound task" : "worker",
240           (void *)task->id);
241     ACQUIRE_LOCK(&task->lock);
242     task->wakeup = rtsTrue;
243     // the wakeup flag is needed because signalCondition() doesn't
244     // flag the condition if the thread is already runniing, but we want
245     // it to be sticky.
246     signalCondition(&task->cond);
247     RELEASE_LOCK(&task->lock);
248 }
249 #endif
250
251 /* ----------------------------------------------------------------------------
252  * Function:  releaseCapability(Capability*)
253  *
254  * Purpose:   Letting go of a capability. Causes a
255  *            'returning worker' thread or a 'waiting worker'
256  *            to wake up, in that order.
257  * ------------------------------------------------------------------------- */
258
259 #if defined(THREADED_RTS)
260 void
261 releaseCapability_ (Capability* cap)
262 {
263     Task *task;
264
265     task = cap->running_task;
266
267     ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
268
269     cap->running_task = NULL;
270
271     // Check to see whether a worker thread can be given
272     // the go-ahead to return the result of an external call..
273     if (cap->returning_tasks_hd != NULL) {
274         giveCapabilityToTask(cap,cap->returning_tasks_hd);
275         // The Task pops itself from the queue (see waitForReturnCapability())
276         return;
277     }
278
279     // If the next thread on the run queue is a bound thread,
280     // give this Capability to the appropriate Task.
281     if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
282         // Make sure we're not about to try to wake ourselves up
283         ASSERT(task != cap->run_queue_hd->bound);
284         task = cap->run_queue_hd->bound;
285         giveCapabilityToTask(cap,task);
286         return;
287     }
288
289     if (!cap->spare_workers) {
290         // Create a worker thread if we don't have one.  If the system
291         // is interrupted, we only create a worker task if there
292         // are threads that need to be completed.  If the system is
293         // shutting down, we never create a new worker.
294         if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
295             debugTrace(DEBUG_sched,
296                        "starting new worker on capability %d", cap->no);
297             startWorkerTask(cap, workerStart);
298             return;
299         }
300     }
301
302     // If we have an unbound thread on the run queue, or if there's
303     // anything else to do, give the Capability to a worker thread.
304     if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap)
305               || !emptySparkPoolCap(cap) || globalWorkToDo()) {
306         if (cap->spare_workers) {
307             giveCapabilityToTask(cap,cap->spare_workers);
308             // The worker Task pops itself from the queue;
309             return;
310         }
311     }
312
313     last_free_capability = cap;
314     trace(TRACE_sched | DEBUG_sched, "freeing capability %d", cap->no);
315 }
316
317 void
318 releaseCapability (Capability* cap USED_IF_THREADS)
319 {
320     ACQUIRE_LOCK(&cap->lock);
321     releaseCapability_(cap);
322     RELEASE_LOCK(&cap->lock);
323 }
324
325 static void
326 releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
327 {
328     Task *task;
329
330     ACQUIRE_LOCK(&cap->lock);
331
332     task = cap->running_task;
333
334     // If the current task is a worker, save it on the spare_workers
335     // list of this Capability.  A worker can mark itself as stopped,
336     // in which case it is not replaced on the spare_worker queue.
337     // This happens when the system is shutting down (see
338     // Schedule.c:workerStart()).
339     // Also, be careful to check that this task hasn't just exited
340     // Haskell to do a foreign call (task->suspended_tso).
341     if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
342         task->next = cap->spare_workers;
343         cap->spare_workers = task;
344     }
345     // Bound tasks just float around attached to their TSOs.
346
347     releaseCapability_(cap);
348
349     RELEASE_LOCK(&cap->lock);
350 }
351 #endif
352
353 /* ----------------------------------------------------------------------------
354  * waitForReturnCapability( Task *task )
355  *
356  * Purpose:  when an OS thread returns from an external call,
357  * it calls waitForReturnCapability() (via Schedule.resumeThread())
358  * to wait for permission to enter the RTS & communicate the
359  * result of the external call back to the Haskell thread that
360  * made it.
361  *
362  * ------------------------------------------------------------------------- */
363 void
364 waitForReturnCapability (Capability **pCap, Task *task)
365 {
366 #if !defined(THREADED_RTS)
367
368     MainCapability.running_task = task;
369     task->cap = &MainCapability;
370     *pCap = &MainCapability;
371
372 #else
373     Capability *cap = *pCap;
374
375     if (cap == NULL) {
376         // Try last_free_capability first
377         cap = last_free_capability;
378         if (!cap->running_task) {
379             nat i;
380             // otherwise, search for a free capability
381             for (i = 0; i < n_capabilities; i++) {
382                 cap = &capabilities[i];
383                 if (!cap->running_task) {
384                     break;
385                 }
386             }
387             // Can't find a free one, use last_free_capability.
388             cap = last_free_capability;
389         }
390
391         // record the Capability as the one this Task is now assocated with.
392         task->cap = cap;
393
394     } else {
395         ASSERT(task->cap == cap);
396     }
397
398     ACQUIRE_LOCK(&cap->lock);
399
400     debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);
401
402     if (!cap->running_task) {
403         // It's free; just grab it
404         cap->running_task = task;
405         RELEASE_LOCK(&cap->lock);
406     } else {
407         newReturningTask(cap,task);
408         RELEASE_LOCK(&cap->lock);
409
410         for (;;) {
411             ACQUIRE_LOCK(&task->lock);
412             // task->lock held, cap->lock not held
413             if (!task->wakeup) waitCondition(&task->cond, &task->lock);
414             cap = task->cap;
415             task->wakeup = rtsFalse;
416             RELEASE_LOCK(&task->lock);
417
418             // now check whether we should wake up...
419             ACQUIRE_LOCK(&cap->lock);
420             if (cap->running_task == NULL) {
421                 if (cap->returning_tasks_hd != task) {
422                     giveCapabilityToTask(cap,cap->returning_tasks_hd);
423                     RELEASE_LOCK(&cap->lock);
424                     continue;
425                 }
426                 cap->running_task = task;
427                 popReturningTask(cap);
428                 RELEASE_LOCK(&cap->lock);
429                 break;
430             }
431             RELEASE_LOCK(&cap->lock);
432         }
433
434     }
435
436     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
437
438     trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
439
440     *pCap = cap;
441 #endif
442 }
443
444 #if defined(THREADED_RTS)
445 /* ----------------------------------------------------------------------------
446  * yieldCapability
447  * ------------------------------------------------------------------------- */
448
449 void
450 yieldCapability (Capability** pCap, Task *task)
451 {
452     Capability *cap = *pCap;
453
454     // The fast path has no locking, if we don't enter this while loop
455
456     while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
457         debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
458
459         // We must now release the capability and wait to be woken up
460         // again.
461         task->wakeup = rtsFalse;
462         releaseCapabilityAndQueueWorker(cap);
463
464         for (;;) {
465             ACQUIRE_LOCK(&task->lock);
466             // task->lock held, cap->lock not held
467             if (!task->wakeup) waitCondition(&task->cond, &task->lock);
468             cap = task->cap;
469             task->wakeup = rtsFalse;
470             RELEASE_LOCK(&task->lock);
471
472             debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);
473
474             ACQUIRE_LOCK(&cap->lock);
475             if (cap->running_task != NULL) {
476                 debugTrace(DEBUG_sched, 
477                            "capability %d is owned by another task", cap->no);
478                 RELEASE_LOCK(&cap->lock);
479                 continue;
480             }
481
482             if (task->tso == NULL) {
483                 ASSERT(cap->spare_workers != NULL);
484                 // if we're not at the front of the queue, release it
485                 // again.  This is unlikely to happen.
486                 if (cap->spare_workers != task) {
487                     giveCapabilityToTask(cap,cap->spare_workers);
488                     RELEASE_LOCK(&cap->lock);
489                     continue;
490                 }
491                 cap->spare_workers = task->next;
492                 task->next = NULL;
493             }
494             cap->running_task = task;
495             RELEASE_LOCK(&cap->lock);
496             break;
497         }
498
499         trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
500         ASSERT(cap->running_task == task);
501     }
502
503     *pCap = cap;
504
505     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
506
507     return;
508 }
509
510 /* ----------------------------------------------------------------------------
511  * Wake up a thread on a Capability.
512  *
513  * This is used when the current Task is running on a Capability and
514  * wishes to wake up a thread on a different Capability.
515  * ------------------------------------------------------------------------- */
516
517 void
518 wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
519 {
520     ASSERT(tso->cap == cap);
521     ASSERT(tso->bound ? tso->bound->cap == cap : 1);
522     ASSERT_LOCK_HELD(&cap->lock);
523
524     tso->cap = cap;
525
526     if (cap->running_task == NULL) {
527         // nobody is running this Capability, we can add our thread
528         // directly onto the run queue and start up a Task to run it.
529         appendToRunQueue(cap,tso);
530
531         // start it up
532         cap->running_task = myTask(); // precond for releaseCapability_()
533         trace(TRACE_sched, "resuming capability %d", cap->no);
534         releaseCapability_(cap);
535     } else {
536         appendToWakeupQueue(cap,tso);
537         // someone is running on this Capability, so it cannot be
538         // freed without first checking the wakeup queue (see
539         // releaseCapability_).
540     }
541 }
542
543 void
544 wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso)
545 {
546     ACQUIRE_LOCK(&cap->lock);
547     migrateThreadToCapability (cap, tso);
548     RELEASE_LOCK(&cap->lock);
549 }
550
551 void
552 migrateThreadToCapability (Capability *cap, StgTSO *tso)
553 {
554     // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
555     if (tso->bound) {
556         ASSERT(tso->bound->cap == tso->cap);
557         tso->bound->cap = cap;
558     }
559     tso->cap = cap;
560     wakeupThreadOnCapability(cap,tso);
561 }
562
563 void
564 migrateThreadToCapability_lock (Capability *cap, StgTSO *tso)
565 {
566     ACQUIRE_LOCK(&cap->lock);
567     migrateThreadToCapability (cap, tso);
568     RELEASE_LOCK(&cap->lock);
569 }
570
571 /* ----------------------------------------------------------------------------
572  * prodCapabilities
573  *
574  * Used to indicate that the interrupted flag is now set, or some
575  * other global condition that might require waking up a Task on each
576  * Capability.
577  * ------------------------------------------------------------------------- */
578
579 static void
580 prodCapabilities(rtsBool all)
581 {
582     nat i;
583     Capability *cap;
584     Task *task;
585
586     for (i=0; i < n_capabilities; i++) {
587         cap = &capabilities[i];
588         ACQUIRE_LOCK(&cap->lock);
589         if (!cap->running_task) {
590             if (cap->spare_workers) {
591                 trace(TRACE_sched, "resuming capability %d", cap->no);
592                 task = cap->spare_workers;
593                 ASSERT(!task->stopped);
594                 giveCapabilityToTask(cap,task);
595                 if (!all) {
596                     RELEASE_LOCK(&cap->lock);
597                     return;
598                 }
599             }
600         }
601         RELEASE_LOCK(&cap->lock);
602     }
603     return;
604 }
605
606 void
607 prodAllCapabilities (void)
608 {
609     prodCapabilities(rtsTrue);
610 }
611
612 /* ----------------------------------------------------------------------------
613  * prodOneCapability
614  *
615  * Like prodAllCapabilities, but we only require a single Task to wake
616  * up in order to service some global event, such as checking for
617  * deadlock after some idle time has passed.
618  * ------------------------------------------------------------------------- */
619
620 void
621 prodOneCapability (void)
622 {
623     prodCapabilities(rtsFalse);
624 }
625
626 /* ----------------------------------------------------------------------------
627  * shutdownCapability
628  *
629  * At shutdown time, we want to let everything exit as cleanly as
630  * possible.  For each capability, we let its run queue drain, and
631  * allow the workers to stop.
632  *
633  * This function should be called when interrupted and
634  * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
635  * will exit the scheduler and call taskStop(), and any bound thread
636  * that wakes up will return to its caller.  Runnable threads are
637  * killed.
638  *
639  * ------------------------------------------------------------------------- */
640
641 void
642 shutdownCapability (Capability *cap, Task *task)
643 {
644     nat i;
645
646     ASSERT(sched_state == SCHED_SHUTTING_DOWN);
647
648     task->cap = cap;
649
650     // Loop indefinitely until all the workers have exited and there
651     // are no Haskell threads left.  We used to bail out after 50
652     // iterations of this loop, but that occasionally left a worker
653     // running which caused problems later (the closeMutex() below
654     // isn't safe, for one thing).
655
656     for (i = 0; /* i < 50 */; i++) {
657         debugTrace(DEBUG_sched, 
658                    "shutting down capability %d, attempt %d", cap->no, i);
659         ACQUIRE_LOCK(&cap->lock);
660         if (cap->running_task) {
661             RELEASE_LOCK(&cap->lock);
662             debugTrace(DEBUG_sched, "not owner, yielding");
663             yieldThread();
664             continue;
665         }
666         cap->running_task = task;
667         if (!emptyRunQueue(cap) || cap->spare_workers) {
668             debugTrace(DEBUG_sched, 
669                        "runnable threads or workers still alive, yielding");
670             releaseCapability_(cap); // this will wake up a worker
671             RELEASE_LOCK(&cap->lock);
672             yieldThread();
673             continue;
674         }
675         debugTrace(DEBUG_sched, "capability %d is stopped.", cap->no);
676         RELEASE_LOCK(&cap->lock);
677         break;
678     }
679     // we now have the Capability, its run queue and spare workers
680     // list are both empty.
681
682     // ToDo: we can't drop this mutex, because there might still be
683     // threads performing foreign calls that will eventually try to 
684     // return via resumeThread() and attempt to grab cap->lock.
685     // closeMutex(&cap->lock);
686 }
687
688 /* ----------------------------------------------------------------------------
689  * tryGrabCapability
690  *
691  * Attempt to gain control of a Capability if it is free.
692  *
693  * ------------------------------------------------------------------------- */
694
695 rtsBool
696 tryGrabCapability (Capability *cap, Task *task)
697 {
698     if (cap->running_task != NULL) return rtsFalse;
699     ACQUIRE_LOCK(&cap->lock);
700     if (cap->running_task != NULL) {
701         RELEASE_LOCK(&cap->lock);
702         return rtsFalse;
703     }
704     task->cap = cap;
705     cap->running_task = task;
706     RELEASE_LOCK(&cap->lock);
707     return rtsTrue;
708 }
709
710
711 #endif /* THREADED_RTS */
712
713