/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2005
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in an SMP build will there be multiple capabilities; in the
 * threaded RTS and other non-SMP builds, there is only one global
 * capability, namely MainCapability.
 *
 * --------------------------------------------------------------------------*/
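
/* A hedged sketch of the BaseReg relationship described above: BaseReg
 * points at cap->r rather than at the Capability itself, so recovering
 * the Capability from a register table means stepping back over the
 * fields that precede r.  The helper name below is illustrative, not
 * part of this file's API.
 */
#if 0
#include <stddef.h>   /* offsetof */

static Capability *
exampleRegTableToCapability (StgRegTable *reg)
{
    /* BaseReg == &cap->r, so subtract the offset of the r field. */
    return (Capability *)((char *)reg - offsetof(Capability, r));
}
#endif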

#include "PosixSource.h"
#include "Rts.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "STM.h"
#include "OSThreads.h"
#include "Capability.h"
#include "Schedule.h"
#include "Sparks.h"

#if !defined(SMP)
Capability MainCapability;     // for non-SMP, we have one global capability
#endif

nat n_capabilities;
Capability *capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability;

#ifdef SMP
#define UNUSED_IF_NOT_SMP
#else
#define UNUSED_IF_NOT_SMP STG_UNUSED
#endif

#if defined(THREADED_RTS)
#define UNUSED_IF_NOT_THREADS
#else
#define UNUSED_IF_NOT_THREADS STG_UNUSED
#endif


STATIC_INLINE rtsBool
globalWorkToDo (void)
{
    return blackholes_need_checking
        || interrupted
#if defined(RTS_USER_SIGNALS)
        || signals_pending()
#endif
        ;
}

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
anyWorkForMe( Capability *cap, Task *task )
{
    // If the run queue is not empty, then we only wake up the Task
    // that can run the thread at the head, even if there is some other
    // reason for this task to run (e.g. interrupted == rtsTrue).
    if (!emptyRunQueue(cap)) {
        if (cap->run_queue_hd->bound == NULL) {
            return (task->tso == NULL);
        } else {
            return (cap->run_queue_hd->bound == task);
        }
    } else if (task->tso == NULL && !emptySparkPoolCap(cap)) {
        return rtsTrue;
    }
    return globalWorkToDo();
}
#endif

/* -----------------------------------------------------------------------------
 * Manage the returning_tasks list.
 *
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->return_link == NULL);
    if (cap->returning_tasks_hd) {
        ASSERT(cap->returning_tasks_tl->return_link == NULL);
        cap->returning_tasks_tl->return_link = task;
    } else {
        cap->returning_tasks_hd = task;
    }
    cap->returning_tasks_tl = task;
}

STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    Task *task;
    ASSERT_LOCK_HELD(&cap->lock);
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->return_link;
    if (!cap->returning_tasks_hd) {
        cap->returning_tasks_tl = NULL;
    }
    task->return_link = NULL;
    return task;
}
#endif
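
/* A sketch of the calling discipline for the returning_tasks list above
 * (illustrative only, not RTS code): both helpers assume cap->lock is
 * held, and the list is a FIFO, so tasks pop in the order they were
 * pushed.
 */
#if 0
static void
exampleReturningTaskRoundTrip (Capability *cap, Task *t1, Task *t2)
{
    Task *out;

    ACQUIRE_LOCK(&cap->lock);
    newReturningTask(cap, t1);      // queue: t1
    newReturningTask(cap, t2);      // queue: t1, t2
    out = popReturningTask(cap);    // FIFO: t1 comes out first
    ASSERT(out == t1);
    out = popReturningTask(cap);
    ASSERT(out == t2);
    RELEASE_LOCK(&cap->lock);
}
#endif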

/* ----------------------------------------------------------------------------
 * Initialisation
 *
 * The Capability is initially marked not free.
 * ------------------------------------------------------------------------- */

static void
initCapability( Capability *cap, nat i )
{
    nat g;

    cap->no = i;
    cap->in_haskell        = rtsFalse;

    cap->run_queue_hd      = END_TSO_QUEUE;
    cap->run_queue_tl      = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task      = NULL; // indicates cap is free
    cap->spare_workers     = NULL;
    cap->suspended_ccalling_tasks = NULL;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
#endif

    cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
    cap->f.stgGCFun        = (F_)__stg_gc_fun;

    cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
                                     RtsFlags.GcFlags.generations,
                                     "initCapability");

    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        cap->mut_lists[g] = NULL;
    }

    cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
    cap->free_trec_chunks = END_STM_CHUNK_LIST;
    cap->free_trec_headers = NO_TREC;
    cap->transaction_tokens = 0;
}

/* ---------------------------------------------------------------------------
 * Function:  initCapabilities()
 *
 * Purpose:   set up the Capability handling. For the SMP build,
 *            we keep a table of them, the size of which is
 *            controlled by the user via the RTS flag -N.
 *
 * ------------------------------------------------------------------------- */
void
initCapabilities( void )
{
#if defined(SMP)
    nat i,n;

    n_capabilities = n = RtsFlags.ParFlags.nNodes;
    capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities");

    for (i = 0; i < n; i++) {
        initCapability(&capabilities[i], i);
    }

    IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n));
#else
    n_capabilities = 1;
    capabilities = &MainCapability;
    initCapability(&MainCapability, 0);
#endif

    // There are no free capabilities to begin with.  We will start
    // a worker Task on each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
    last_free_capability = &capabilities[0];
}
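
/* For example (the user-facing side of the -N flag mentioned above), in
 * an SMP build:
 *
 *     $ ./prog +RTS -N2 -RTS
 *
 * sets RtsFlags.ParFlags.nNodes to 2, so initCapabilities() allocates a
 * table of two Capabilities.
 */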

/* ----------------------------------------------------------------------------
 * Give a Capability to a Task.  The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrator must take task->lock before
 * modifying task->cap, to synchronise with the Task as it wakes up.
 * Additionally, the migrator should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
giveCapabilityToTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    // We are not modifying task->cap, so we do not need to take task->lock.
    IF_DEBUG(scheduler,
             sched_belch("passing capability %d to %s %p",
                         cap->no, task->tso ? "bound task" : "worker",
                         (void *)task->id));
    ACQUIRE_LOCK(&task->lock);
    task->wakeup = rtsTrue;
    // the wakeup flag is needed because signalCondition() doesn't
    // flag the condition if the thread is already running, but we want
    // it to be sticky.
    signalCondition(&task->cond);
    RELEASE_LOCK(&task->lock);
}
#endif
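
/* The wait-side counterpart of the handshake above (a sketch; the real
 * loops appear in waitForReturnCapability() and yieldCapability()
 * below).  The sticky task->wakeup flag means a signal sent before the
 * task reaches waitCondition() is not lost:
 */
#if 0
    ACQUIRE_LOCK(&task->lock);
    if (!task->wakeup) waitCondition(&task->cond, &task->lock);
    // Either wakeup was already set (the signal arrived before we got
    // here), or waitCondition() returned because giveCapabilityToTask()
    // signalled us.  Reset the flag for next time.
    task->wakeup = rtsFalse;
    RELEASE_LOCK(&task->lock);
#endif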

/* ----------------------------------------------------------------------------
 * Function:  releaseCapability(Capability*)
 *
 * Purpose:   Letting go of a capability. Causes a
 *            'returning worker' thread or a 'waiting worker'
 *            to wake up, in that order.
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap)
{
    Task *task;

    task = cap->running_task;

    ASSERT_CAPABILITY_INVARIANTS(cap,task);

    cap->running_task = NULL;

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call.
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
        // Make sure we're not about to try to wake ourselves up
        ASSERT(task != cap->run_queue_hd->bound);
        task = cap->run_queue_hd->bound;
        giveCapabilityToTask(cap,task);
        return;
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue.
            return;
        }

        // Create a worker thread if we don't have one.  If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed.  If the system is
        // shutting down, we never create a new worker.
        if (!shutting_down_scheduler) {
            IF_DEBUG(scheduler,
                     sched_belch("starting new worker on capability %d", cap->no));
            startWorkerTask(cap, workerStart);
            return;
        }
    }

    last_free_capability = cap;
    IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
}

void
releaseCapability (Capability* cap UNUSED_IF_NOT_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap);
    RELEASE_LOCK(&cap->lock);
}

static void
releaseCapabilityAndQueueWorker (Capability* cap UNUSED_IF_NOT_THREADS)
{
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not replaced on the spare_workers queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    // Also, be careful to check that this task hasn't just exited
    // Haskell to do a foreign call (task->suspended_tso).
    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
        task->next = cap->spare_workers;
        cap->spare_workers = task;
    }
    // Bound tasks just float around attached to their TSOs.

    releaseCapability_(cap);

    RELEASE_LOCK(&cap->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * waitForReturnCapability( Task *task )
 *
 * Purpose:  when an OS thread returns from an external call,
 * it calls waitForReturnCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
 * result of the external call back to the Haskell thread that
 * made it.
 *
 * ------------------------------------------------------------------------- */
void
waitForReturnCapability (Capability **pCap,
                         Task *task UNUSED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)

    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;

#else
    Capability *cap = *pCap;

    if (cap == NULL) {
        // Try last_free_capability first
        cap = last_free_capability;
        if (cap->running_task) {
            nat i;
            // otherwise, search for a free capability
            cap = NULL;
            for (i = 0; i < n_capabilities; i++) {
                if (!capabilities[i].running_task) {
                    cap = &capabilities[i];
                    break;
                }
            }
            if (cap == NULL) {
                // Can't find a free one, use last_free_capability.
                cap = last_free_capability;
            }
        }

        // record the Capability as the one this Task is now associated with.
        task->cap = cap;

    } else {
        ASSERT(task->cap == cap);
    }

    ACQUIRE_LOCK(&cap->lock);

    IF_DEBUG(scheduler,
             sched_belch("returning; I want capability %d", cap->no));

    if (!cap->running_task) {
        // It's free; just grab it
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
    } else {
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            // now check whether we should wake up...
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task == NULL) {
                if (cap->returning_tasks_hd != task) {
                    giveCapabilityToTask(cap,cap->returning_tasks_hd);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->running_task = task;
                popReturningTask(cap);
                RELEASE_LOCK(&cap->lock);
                break;
            }
            RELEASE_LOCK(&cap->lock);
        }

    }

    ASSERT_CAPABILITY_INVARIANTS(cap,task);

    IF_DEBUG(scheduler,
             sched_belch("returning; got capability %d", cap->no));

    *pCap = cap;
#endif
}
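
/* Typical call pattern (a sketch of what a caller such as
 * Schedule.c:resumeThread() might do; details there may differ):
 */
#if 0
    Capability *cap = task->cap;   // may be NULL for a fresh in-call,
                                   // in which case any free Capability
                                   // will do
    waitForReturnCapability(&cap, task);
    // we now own cap and may enter the RTS / run Haskell code
#endif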

#if defined(THREADED_RTS)
/* ----------------------------------------------------------------------------
 * yieldCapability
 * ------------------------------------------------------------------------- */

void
yieldCapability (Capability** pCap, Task *task)
{
    Capability *cap = *pCap;

    // The fast path; no locking
    if ( cap->returning_tasks_hd == NULL && anyWorkForMe(cap,task) )
        return;

    while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
        IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));

        // We must now release the capability and wait to be woken up
        // again.
        releaseCapabilityAndQueueWorker(cap);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task != NULL) {
                RELEASE_LOCK(&cap->lock);
                continue;
            }

            if (task->tso == NULL) {
                ASSERT(cap->spare_workers != NULL);
                // if we're not at the front of the queue, release it
                // again.  This is unlikely to happen.
                if (cap->spare_workers != task) {
                    giveCapabilityToTask(cap,cap->spare_workers);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->spare_workers = task->next;
                task->next = NULL;
            }
            cap->running_task = task;
            RELEASE_LOCK(&cap->lock);
            break;
        }

        IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
        ASSERT(cap->running_task == task);
    }

    *pCap = cap;

    ASSERT_CAPABILITY_INVARIANTS(cap,task);

    return;
}
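
/* Sketch of how a scheduler loop might use yieldCapability() (the real
 * loop lives in Schedule.c; this is illustrative only):
 */
#if 0
    // Give up cap if another Task needs it; blocks until this Task
    // holds a Capability that has work for it.
    yieldCapability(&cap, task);
    // NB. cap may now be a *different* Capability: the loop above
    // re-reads task->cap after each wakeup.
#endif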

/* ----------------------------------------------------------------------------
 * prodCapabilities
 *
 * Used to indicate that the interrupted flag is now set, or some
 * other global condition that might require waking up a Task on each
 * Capability.
 * ------------------------------------------------------------------------- */

static void
prodCapabilities(rtsBool all)
{
    nat i;
    Capability *cap;
    Task *task;

    for (i=0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        ACQUIRE_LOCK(&cap->lock);
        if (!cap->running_task) {
            if (cap->spare_workers) {
                task = cap->spare_workers;
                ASSERT(!task->stopped);
                giveCapabilityToTask(cap,task);
                if (!all) {
                    RELEASE_LOCK(&cap->lock);
                    return;
                }
            }
        }
        RELEASE_LOCK(&cap->lock);
    }
}

void
prodAllCapabilities (void)
{
    prodCapabilities(rtsTrue);
}

/* ----------------------------------------------------------------------------
 * prodOneCapability
 *
 * Like prodAllCapabilities, but we only require a single Task to wake
 * up in order to service some global event, such as checking for
 * deadlock after some idle time has passed.
 * ------------------------------------------------------------------------- */

void
prodOneCapability (void)
{
    prodCapabilities(rtsFalse);
}
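
/* For example (a sketch of the intended protocol, not code from this
 * file): after setting a global condition, prod so that sleeping
 * workers notice it.
 */
#if 0
    interrupted = rtsTrue;     // a global flag tested by globalWorkToDo()
    prodAllCapabilities();     // wake a Task on every Capability
#endif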

/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible.  For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted and
 * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
 * will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller.  Runnable threads are
 * killed.
 *
 * ------------------------------------------------------------------------- */

void
shutdownCapability (Capability *cap, Task *task)
{
    nat i;

    ASSERT(interrupted && shutting_down_scheduler);

    task->cap = cap;

    for (i = 0; i < 50; i++) {
        IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            RELEASE_LOCK(&cap->lock);
            IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
            yieldThread();
            continue;
        }
        cap->running_task = task;
        if (!emptyRunQueue(cap) || cap->spare_workers) {
            IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
            releaseCapability_(cap); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }
        IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // We now have the Capability; its run queue and spare_workers
    // list are both empty.
}
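
/* Sketch of the expected shutdown sequence (the real driver lives in
 * the scheduler; the flags follow the preconditions asserted above):
 */
#if 0
    nat i;
    interrupted = rtsTrue;
    shutting_down_scheduler = rtsTrue;
    for (i = 0; i < n_capabilities; i++) {
        shutdownCapability(&capabilities[i], task);
    }
#endif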

/* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
 * Attempt to gain control of a Capability if it is free.
 *
 * ------------------------------------------------------------------------- */

rtsBool
tryGrabCapability (Capability *cap, Task *task)
{
    if (cap->running_task != NULL) return rtsFalse;
    ACQUIRE_LOCK(&cap->lock);
    if (cap->running_task != NULL) {
        RELEASE_LOCK(&cap->lock);
        return rtsFalse;
    }
    task->cap = cap;
    cap->running_task = task;
    RELEASE_LOCK(&cap->lock);
    return rtsTrue;
}
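
/* tryGrabCapability() does an unlocked read followed by a locked
 * re-check, so callers can probe many Capabilities cheaply.  A usage
 * sketch (illustrative only):
 */
#if 0
    nat i;
    for (i = 0; i < n_capabilities; i++) {
        if (tryGrabCapability(&capabilities[i], task)) {
            break;   // we now own capabilities[i]
        }
    }
#endif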


#endif /* THREADED_RTS */