fix a rather subtle SMP bug in anyWorkForMe()
[ghc-hetmet.git] / ghc / rts / Capability.c
/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2006
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the Capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in a THREADED_RTS build will there be multiple capabilities;
 * in non-threaded builds there is only one global capability, namely
 * MainCapability.
 *
 * --------------------------------------------------------------------------*/
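
/* A sketch of how the Capability is recovered from BaseReg during STG
 * execution (illustrative only; the offsetof arithmetic is an
 * assumption for exposition, not an API defined in this file):
 *
 *   Capability *cap = (Capability *)((char *)BaseReg
 *                                    - offsetof(Capability, r));
 */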

#include "PosixSource.h"
#include "Rts.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "STM.h"
#include "OSThreads.h"
#include "Capability.h"
#include "Schedule.h"
#include "Sparks.h"

// The one global capability: this is the Capability used by
// non-threaded builds, and by threaded builds with +RTS -N1.
Capability MainCapability;

nat n_capabilities;
Capability *capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability;

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
    return blackholes_need_checking
        || interrupted
        ;
}
#endif

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
anyWorkForMe( Capability *cap, Task *task )
{
    if (task->tso != NULL) {
        // A bound task only runs if its thread is on the run queue of
        // the capability on which it was woken up.  Otherwise, we
        // can't be sure that we have the right capability: the thread
        // might be woken up on some other capability, and task->cap
        // could change under our feet.
        return (!emptyRunQueue(cap) && cap->run_queue_hd->bound == task);
    } else {
        // A vanilla worker task runs if either (a) there is a
        // lightweight thread at the head of the run queue, or (b)
        // there are sparks to execute, or (c) there is some other
        // global condition to check, such as threads blocked on
        // blackholes.
        return ((!emptyRunQueue(cap) && cap->run_queue_hd->bound == NULL)
                || !emptySparkPoolCap(cap)
                || globalWorkToDo());
    }
}
#endif
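
/* A worked illustration of the race the bound-task check above avoids
 * (a restatement of the comment in anyWorkForMe(), not new machinery):
 * suppose bound task T goes to sleep on capability 0, and its thread is
 * then woken up on capability 1.  task->cap now points at capability 1
 * and may change again at any moment.  The only test that cannot go
 * stale while we hold `cap` is whether T's thread is at the head of
 * cap's own run queue, so that is what we check.
 */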

/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
 *
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->return_link == NULL);
    if (cap->returning_tasks_hd) {
        ASSERT(cap->returning_tasks_tl->return_link == NULL);
        cap->returning_tasks_tl->return_link = task;
    } else {
        cap->returning_tasks_hd = task;
    }
    cap->returning_tasks_tl = task;
}

STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    Task *task;
    ASSERT_LOCK_HELD(&cap->lock);
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->return_link;
    if (!cap->returning_tasks_hd) {
        cap->returning_tasks_tl = NULL;
    }
    task->return_link = NULL;
    return task;
}
#endif
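
/* Usage sketch (schematic; the real sequence is in
 * waitForReturnCapability() below): a task returning from a foreign
 * call enqueues itself under cap->lock, then blocks on its condition
 * variable until releaseCapability_() hands it the Capability, at
 * which point it pops itself:
 *
 *   ACQUIRE_LOCK(&cap->lock);
 *   newReturningTask(cap, task);   // FIFO: in-calls are served in order
 *   RELEASE_LOCK(&cap->lock);
 *   // ... sleep on task->cond; once woken as the queue head:
 *   popReturningTask(cap);         // under cap->lock again
 */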

/* ----------------------------------------------------------------------------
 * Initialisation
 *
 * The Capability is initially marked not free.
 * ------------------------------------------------------------------------- */

static void
initCapability( Capability *cap, nat i )
{
    nat g;

    cap->no = i;
    cap->in_haskell        = rtsFalse;

    cap->run_queue_hd      = END_TSO_QUEUE;
    cap->run_queue_tl      = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task      = NULL; // indicates cap is free
    cap->spare_workers     = NULL;
    cap->suspended_ccalling_tasks = NULL;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
#endif

    cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
    cap->f.stgGCFun        = (F_)__stg_gc_fun;

    cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
                                     RtsFlags.GcFlags.generations,
                                     "initCapability");

    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        cap->mut_lists[g] = NULL;
    }

    cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
    cap->free_trec_chunks = END_STM_CHUNK_LIST;
    cap->free_trec_headers = NO_TREC;
    cap->transaction_tokens = 0;
}

/* ---------------------------------------------------------------------------
 * Function:  initCapabilities()
 *
 * Purpose:   set up the Capability handling. For the THREADED_RTS build,
 *            we keep a table of them, the size of which is
 *            controlled by the user via the RTS flag -N.
 *
 * ------------------------------------------------------------------------- */
void
initCapabilities( void )
{
#if defined(THREADED_RTS)
    nat i;

#ifndef REG_Base
    // We can't support multiple CPUs if BaseReg is not a register
    if (RtsFlags.ParFlags.nNodes > 1) {
        errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
        RtsFlags.ParFlags.nNodes = 1;
    }
#endif

    n_capabilities = RtsFlags.ParFlags.nNodes;

    if (n_capabilities == 1) {
        capabilities = &MainCapability;
        // THREADED_RTS must work on builds that don't have a mutable
        // BaseReg (eg. unregisterised), so in this case
        // capabilities[0] must coincide with &MainCapability.
    } else {
        capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
                                      "initCapabilities");
    }

    for (i = 0; i < n_capabilities; i++) {
        initCapability(&capabilities[i], i);
    }

    IF_DEBUG(scheduler, sched_belch("allocated %d capabilities",
                                    n_capabilities));

#else /* !THREADED_RTS */

    n_capabilities = 1;
    capabilities = &MainCapability;
    initCapability(&MainCapability, 0);

#endif

    // There are no free capabilities to begin with.  We will start
    // a worker Task on each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
    last_free_capability = &capabilities[0];
}
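
/* For example, running a threaded program with "+RTS -N4 -RTS" sets
 * RtsFlags.ParFlags.nNodes to 4, so the table above holds four
 * Capabilities, each initially not-free until its worker first
 * releases it.  (A usage note; the flag parsing itself lives in
 * RtsFlags.c.)
 */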

/* ----------------------------------------------------------------------------
 * Give a Capability to a Task.  The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrator must take task->lock before
 * modifying task->cap, to synchronise with the waking-up Task.
 * Additionally, the migrator should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    IF_DEBUG(scheduler,
             sched_belch("passing capability %d to %s %p",
                         cap->no, task->tso ? "bound task" : "worker",
                         (void *)task->id));
    ACQUIRE_LOCK(&task->lock);
    task->wakeup = rtsTrue;
    // The wakeup flag is needed because a signalCondition() is lost if
    // the Task is not already waiting on the condition variable; we
    // want the wakeup to be sticky.
    signalCondition(&task->cond);
    RELEASE_LOCK(&task->lock);
}
#endif
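
/* The matching consumer side of this handshake appears twice below
 * (in waitForReturnCapability() and yieldCapability()); schematically:
 *
 *   ACQUIRE_LOCK(&task->lock);
 *   if (!task->wakeup) waitCondition(&task->cond, &task->lock);
 *   cap = task->cap;              // re-read: we may have been migrated
 *   task->wakeup = rtsFalse;      // consume the sticky flag
 *   RELEASE_LOCK(&task->lock);
 */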

/* ----------------------------------------------------------------------------
 * Function:  releaseCapability(Capability*)
 *
 * Purpose:   Letting go of a capability.  Causes a
 *            'returning worker' thread or a 'waiting worker'
 *            to wake up, in that order of preference.
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap)
{
    Task *task;

    task = cap->running_task;

    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);

    cap->running_task = NULL;

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call.
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
        // Make sure we're not about to try to wake ourselves up
        ASSERT(task != cap->run_queue_hd->bound);
        task = cap->run_queue_hd->bound;
        giveCapabilityToTask(cap,task);
        return;
    }

    if (!cap->spare_workers) {
        // Create a worker thread if we don't have one.  If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed.  If the system is
        // shutting down, we never create a new worker.
        if (!shutting_down_scheduler) {
            IF_DEBUG(scheduler,
                     sched_belch("starting new worker on capability %d", cap->no));
            startWorkerTask(cap, workerStart);
            return;
        }
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue (see yieldCapability())
            return;
        }
    }

    last_free_capability = cap;
    IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
}

void
releaseCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap);
    RELEASE_LOCK(&cap->lock);
}

static void
releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
{
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not replaced on the spare_worker queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    // Also, be careful to check that this task hasn't just exited
    // Haskell to do a foreign call (task->suspended_tso).
    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
        task->next = cap->spare_workers;
        cap->spare_workers = task;
    }
    // Bound tasks just float around attached to their TSOs.

    releaseCapability_(cap);

    RELEASE_LOCK(&cap->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * waitForReturnCapability( Task *task )
 *
 * Purpose:  when an OS thread returns from an external call,
 * it calls waitForReturnCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
 * result of the external call back to the Haskell thread that
 * made it.
 *
 * ------------------------------------------------------------------------- */
void
waitForReturnCapability (Capability **pCap, Task *task)
{
#if !defined(THREADED_RTS)

    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;

#else
    Capability *cap = *pCap;

    if (cap == NULL) {
        // Try last_free_capability first
        cap = last_free_capability;
        if (cap->running_task) {
            nat i;
            // otherwise, search for a free capability
            cap = NULL;
            for (i = 0; i < n_capabilities; i++) {
                if (!capabilities[i].running_task) {
                    cap = &capabilities[i];
                    break;
                }
            }
            if (cap == NULL) {
                // Can't find a free one, use last_free_capability.
                cap = last_free_capability;
            }
        }

        // record the Capability as the one this Task is now associated with.
        task->cap = cap;

    } else {
        ASSERT(task->cap == cap);
    }

    ACQUIRE_LOCK(&cap->lock);

    IF_DEBUG(scheduler,
             sched_belch("returning; I want capability %d", cap->no));

    if (!cap->running_task) {
        // It's free; just grab it
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
    } else {
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            // now check whether we should wake up...
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task == NULL) {
                if (cap->returning_tasks_hd != task) {
                    giveCapabilityToTask(cap,cap->returning_tasks_hd);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->running_task = task;
                popReturningTask(cap);
                RELEASE_LOCK(&cap->lock);
                break;
            }
            RELEASE_LOCK(&cap->lock);
        }

    }

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    IF_DEBUG(scheduler,
             sched_belch("returning; got capability %d", cap->no));

    *pCap = cap;
#endif
}
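
/* Typical call site (a simplified sketch of the pattern used by
 * Schedule.c:resumeThread(); the details there may differ):
 *
 *   Capability *cap = task->cap;          // may be NULL for a fresh in-call
 *   waitForReturnCapability(&cap, task);  // blocks until cap is ours
 *   // ... now safe to run Haskell code on cap ...
 */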

#if defined(THREADED_RTS)
/* ----------------------------------------------------------------------------
 * yieldCapability
 * ------------------------------------------------------------------------- */

void
yieldCapability (Capability** pCap, Task *task)
{
    Capability *cap = *pCap;

    // The fast path is lock-free: if we don't enter the while loop
    // below, we take no locks at all.

    while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
        IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));

        // We must now release the capability and wait to be woken up
        // again.
        task->wakeup = rtsFalse;
        releaseCapabilityAndQueueWorker(cap);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task != NULL) {
                IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no));
                RELEASE_LOCK(&cap->lock);
                continue;
            }

            if (task->tso == NULL) {
                ASSERT(cap->spare_workers != NULL);
                // if we're not at the front of the queue, release it
                // again.  This is unlikely to happen.
                if (cap->spare_workers != task) {
                    giveCapabilityToTask(cap,cap->spare_workers);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->spare_workers = task->next;
                task->next = NULL;
            }
            cap->running_task = task;
            RELEASE_LOCK(&cap->lock);
            break;
        }

        IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
        ASSERT(cap->running_task == task);
    }

    *pCap = cap;

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    return;
}
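
/* Usage sketch (schematic; the scheduler loop in Schedule.c is the
 * real caller): a task that finds nothing to run offers its Capability
 * to anyone with work, and may come back holding a different one:
 *
 *   yieldCapability(&cap, task);   // may block; cap may change
 *   // on return we own cap, and anyWorkForMe(cap,task) held at the
 *   // moment we reacquired it
 */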

/* ----------------------------------------------------------------------------
 * prodCapabilities
 *
 * Used to indicate that the interrupted flag is now set, or some
 * other global condition that might require waking up a Task on each
 * Capability.
 * ------------------------------------------------------------------------- */

static void
prodCapabilities(rtsBool all)
{
    nat i;
    Capability *cap;
    Task *task;

    for (i=0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        ACQUIRE_LOCK(&cap->lock);
        if (!cap->running_task) {
            if (cap->spare_workers) {
                task = cap->spare_workers;
                ASSERT(!task->stopped);
                giveCapabilityToTask(cap,task);
                if (!all) {
                    RELEASE_LOCK(&cap->lock);
                    return;
                }
            }
        }
        RELEASE_LOCK(&cap->lock);
    }
    return;
}

void
prodAllCapabilities (void)
{
    prodCapabilities(rtsTrue);
}

/* ----------------------------------------------------------------------------
 * prodOneCapability
 *
 * Like prodAllCapabilities, but we only require a single Task to wake
 * up in order to service some global event, such as checking for
 * deadlock after some idle time has passed.
 * ------------------------------------------------------------------------- */

void
prodOneCapability (void)
{
    prodCapabilities(rtsFalse);
}

/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible.  For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted and
 * shutting_down_scheduler are both rtsTrue; then any worker that wakes
 * up will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller.  Runnable threads are
 * killed.
 *
 * ------------------------------------------------------------------------- */
void
shutdownCapability (Capability *cap, Task *task)
{
    nat i;

    ASSERT(interrupted && shutting_down_scheduler);

    task->cap = cap;

    for (i = 0; i < 50; i++) {
        IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            RELEASE_LOCK(&cap->lock);
            IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
            yieldThread();
            continue;
        }
        cap->running_task = task;
        if (!emptyRunQueue(cap) || cap->spare_workers) {
            IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
            releaseCapability_(cap); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }
        IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // We now have the Capability; its run queue and spare workers
    // list are both empty.
}

/* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
 * Attempt to gain control of a Capability if it is free.
 *
 * ------------------------------------------------------------------------- */

rtsBool
tryGrabCapability (Capability *cap, Task *task)
{
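    // Unlocked fast path: a racy read of running_task.  Seeing it
    // non-NULL means the Capability is (or was just now) busy, and
    // failing early is always safe; the free case is re-checked under
    // cap->lock below.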
    if (cap->running_task != NULL) return rtsFalse;
    ACQUIRE_LOCK(&cap->lock);
    if (cap->running_task != NULL) {
        RELEASE_LOCK(&cap->lock);
        return rtsFalse;
    }
    task->cap = cap;
    cap->running_task = task;
    RELEASE_LOCK(&cap->lock);
    return rtsTrue;
}


#endif /* THREADED_RTS */
