5ca2f518fe0782db77d23cf224e0154ec275208a
[ghc-hetmet.git] / ghc / rts / Capability.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2003-2005
4  *
5  * Capabilities
6  *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery, etc.  During
 * STG execution, a pointer to the Capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in an SMP build will there be multiple capabilities; for
 * the threaded RTS and other non-threaded builds, there is only
 * one global capability, namely MainCapability.
16  *
17  * --------------------------------------------------------------------------*/
18
19 #include "PosixSource.h"
20 #include "Rts.h"
21 #include "RtsUtils.h"
22 #include "RtsFlags.h"
23 #include "STM.h"
24 #include "OSThreads.h"
25 #include "Capability.h"
26 #include "Schedule.h"
27 #include "Sparks.h"
28
#if !defined(SMP)
Capability MainCapability;     // for non-SMP, we have one global capability
#endif

// Number of Capabilities in use; set once by initCapabilities().
nat n_capabilities;

// In the SMP build, a malloc'd array of n_capabilities Capabilities;
// otherwise it points at MainCapability (see initCapabilities()).
Capability *capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability;

// Marks parameters that are only referenced in the SMP build, to
// suppress unused-parameter warnings in other builds.
#ifdef SMP
#define UNUSED_IF_NOT_SMP
#else
#define UNUSED_IF_NOT_SMP STG_UNUSED
#endif

// NOTE(review): despite its name, this macro is gated on
// RTS_USER_SIGNALS rather than THREADED_RTS -- confirm this is
// intentional and not a copy/paste slip.  (STG_UNUSED on a parameter
// that is in fact used is harmless, so behaviour is unaffected.)
#ifdef RTS_USER_SIGNALS
#define UNUSED_IF_NOT_THREADS
#else
#define UNUSED_IF_NOT_THREADS STG_UNUSED
#endif
53
54
55 STATIC_INLINE rtsBool
56 globalWorkToDo (void)
57 {
58     return blackholes_need_checking
59         || interrupted
60 #if defined(RTS_USER_SIGNALS)
61         || signals_pending()
62 #endif
63         ;
64 }
65
#if defined(THREADED_RTS)
/* Would it be useful for 'task' to run on 'cap' right now? */
STATIC_INLINE rtsBool
anyWorkForMe( Capability *cap, Task *task )
{
    // A non-empty run queue takes priority: we only wake the task that
    // is entitled to run the thread at the head, even if some other
    // reason for this task to run exists (eg. interrupted=rtsTrue).
    if (!emptyRunQueue(cap)) {
        Task *bound = cap->run_queue_hd->bound;
        // An unbound thread may be run by any worker (task->tso == NULL);
        // a bound thread only by the task it is bound to.
        return (bound == NULL) ? (task->tso == NULL) : (bound == task);
    }

    // Only workers (task->tso == NULL) run sparks.
    if (task->tso == NULL && !emptySparkPoolCap(cap)) {
        return rtsTrue;
    }

    return globalWorkToDo();
}
#endif
85
86 /* -----------------------------------------------------------------------------
87  * Manage the returning_tasks lists.
88  *
89  * These functions require cap->lock
90  * -------------------------------------------------------------------------- */
91
92 #if defined(THREADED_RTS)
93 STATIC_INLINE void
94 newReturningTask (Capability *cap, Task *task)
95 {
96     ASSERT_LOCK_HELD(&cap->lock);
97     ASSERT(task->return_link == NULL);
98     if (cap->returning_tasks_hd) {
99         ASSERT(cap->returning_tasks_tl->return_link == NULL);
100         cap->returning_tasks_tl->return_link = task;
101     } else {
102         cap->returning_tasks_hd = task;
103     }
104     cap->returning_tasks_tl = task;
105 }
106
107 STATIC_INLINE Task *
108 popReturningTask (Capability *cap)
109 {
110     ASSERT_LOCK_HELD(&cap->lock);
111     Task *task;
112     task = cap->returning_tasks_hd;
113     ASSERT(task);
114     cap->returning_tasks_hd = task->return_link;
115     if (!cap->returning_tasks_hd) {
116         cap->returning_tasks_tl = NULL;
117     }
118     task->return_link = NULL;
119     return task;
120 }
121 #endif
122
123 /* ----------------------------------------------------------------------------
124  * Initialisation
125  *
126  * The Capability is initially marked not free.
127  * ------------------------------------------------------------------------- */
128
129 static void
130 initCapability( Capability *cap, nat i )
131 {
132     nat g;
133
134     cap->no = i;
135     cap->in_haskell        = rtsFalse;
136
137     cap->run_queue_hd      = END_TSO_QUEUE;
138     cap->run_queue_tl      = END_TSO_QUEUE;
139
140 #if defined(THREADED_RTS)
141     initMutex(&cap->lock);
142     cap->running_task      = NULL; // indicates cap is free
143     cap->spare_workers     = NULL;
144     cap->suspended_ccalling_tasks = NULL;
145     cap->returning_tasks_hd = NULL;
146     cap->returning_tasks_tl = NULL;
147 #endif
148
149     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
150     cap->f.stgGCFun        = (F_)__stg_gc_fun;
151
152     cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
153                                      RtsFlags.GcFlags.generations,
154                                      "initCapability");
155
156     for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
157         cap->mut_lists[g] = NULL;
158     }
159
160     cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
161     cap->free_trec_chunks = END_STM_CHUNK_LIST;
162     cap->free_trec_headers = NO_TREC;
163     cap->transaction_tokens = 0;
164 }
165
166 /* ---------------------------------------------------------------------------
167  * Function:  initCapabilities()
168  *
169  * Purpose:   set up the Capability handling. For the SMP build,
170  *            we keep a table of them, the size of which is
171  *            controlled by the user via the RTS flag -N.
172  *
173  * ------------------------------------------------------------------------- */
174 void
175 initCapabilities( void )
176 {
177 #if defined(SMP)
178     nat i,n;
179
180     n_capabilities = n = RtsFlags.ParFlags.nNodes;
181     capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities");
182
183     for (i = 0; i < n; i++) {
184         initCapability(&capabilities[i], i);
185     }
186
187     IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n));
188 #else
189     n_capabilities = 1;
190     capabilities = &MainCapability;
191     initCapability(&MainCapability, 0);
192 #endif
193
194     // There are no free capabilities to begin with.  We will start
195     // a worker Task to each Capability, which will quickly put the
196     // Capability on the free list when it finds nothing to do.
197     last_free_capability = &capabilities[0];
198 }
199
200 /* ----------------------------------------------------------------------------
201  * Give a Capability to a Task.  The task must currently be sleeping
202  * on its condition variable.
203  *
204  * Requires cap->lock (modifies cap->running_task).
205  *
206  * When migrating a Task, the migrater must take task->lock before
207  * modifying task->cap, to synchronise with the waking up Task.
208  * Additionally, the migrater should own the Capability (when
209  * migrating the run queue), or cap->lock (when migrating
210  * returning_workers).
211  *
212  * ------------------------------------------------------------------------- */
213
#if defined(THREADED_RTS)
/* Hand 'cap' to 'task', which must be sleeping on its condition
 * variable.  Requires cap->lock, and task->cap must already point at
 * cap (see the migration rules in the comment block above).
 */
STATIC_INLINE void
giveCapabilityToTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    IF_DEBUG(scheduler,
             sched_belch("passing capability %d to %s %p",
                         cap->no, task->tso ? "bound task" : "worker",
                         (void *)task->id));
    ACQUIRE_LOCK(&task->lock);
    task->wakeup = rtsTrue;
    // the wakeup flag is needed because signalCondition() doesn't
    // flag the condition if the thread is already running, but we want
    // it to be sticky.
    signalCondition(&task->cond);
    RELEASE_LOCK(&task->lock);
}
#endif
233
234 /* ----------------------------------------------------------------------------
235  * Function:  releaseCapability(Capability*)
236  *
237  * Purpose:   Letting go of a capability. Causes a
238  *            'returning worker' thread or a 'waiting worker'
239  *            to wake up, in that order.
240  * ------------------------------------------------------------------------- */
241
242 #if defined(THREADED_RTS)
/* Release 'cap', waking the most appropriate waiter, in priority order:
 * a returning in-call worker, then the Task bound to the thread at the
 * head of the run queue, then a spare (or freshly started) worker.
 * Caller must hold cap->lock; releaseCapability() is the locking
 * wrapper.
 */
void
releaseCapability_ (Capability* cap)
{
    Task *task;

    task = cap->running_task;

    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);

    cap->running_task = NULL;    // mark the Capability as free

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call..
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
        // Make sure we're not about to try to wake ourselves up
        ASSERT(task != cap->run_queue_hd->bound);
        task = cap->run_queue_hd->bound;
        giveCapabilityToTask(cap,task);
        return;
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue;
            return;
        }

        // Create a worker thread if we don't have one.  If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed.  If the system is
        // shutting down, we never create a new worker.
        // NOTE(review): the condition below only tests
        // shutting_down_scheduler; the "interrupted" case described
        // above is not checked here -- confirm whether that check
        // belongs elsewhere or the comment is stale.
        if (!shutting_down_scheduler) {
            IF_DEBUG(scheduler,
                     sched_belch("starting new worker on capability %d", cap->no));
            startWorkerTask(cap, workerStart);
            return;
        }
    }

    // Nobody wants the Capability: record it as the last one freed, so
    // an in-call can find it quickly (see waitForReturnCapability()).
    last_free_capability = cap;
    IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
}
296
/* Locking wrapper around releaseCapability_(). */
void
releaseCapability (Capability* cap UNUSED_IF_NOT_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap);
    RELEASE_LOCK(&cap->lock);
}
304
/* Release the Capability; if the current Task is an active worker (not
 * bound, not stopped, not mid-foreign-call), push it onto
 * cap->spare_workers first so it can be handed the Capability again
 * later.  Takes cap->lock itself.
 */
static void
releaseCapabilityAndQueueWorker (Capability* cap UNUSED_IF_NOT_THREADS)
{
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not replaced on the spare_worker queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    // Also, be careful to check that this task hasn't just exited
    // Haskell to do a foreign call (task->suspended_tso).
    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
        task->next = cap->spare_workers;
        cap->spare_workers = task;
    }
    // Bound tasks just float around attached to their TSOs.

    releaseCapability_(cap);

    RELEASE_LOCK(&cap->lock);
}
331 #endif
332
333 /* ----------------------------------------------------------------------------
334  * waitForReturnCapability( Task *task )
335  *
336  * Purpose:  when an OS thread returns from an external call,
337  * it calls waitForReturnCapability() (via Schedule.resumeThread())
338  * to wait for permission to enter the RTS & communicate the
339  * result of the external call back to the Haskell thread that
340  * made it.
341  *
342  * ------------------------------------------------------------------------- */
343 void
344 waitForReturnCapability (Capability **pCap,
345                          Task *task UNUSED_IF_NOT_THREADS)
346 {
347 #if !defined(THREADED_RTS)
348
349     MainCapability.running_task = task;
350     task->cap = &MainCapability;
351     *pCap = &MainCapability;
352
353 #else
354     Capability *cap = *pCap;
355
356     if (cap == NULL) {
357         // Try last_free_capability first
358         cap = last_free_capability;
359         if (!cap->running_task) {
360             nat i;
361             // otherwise, search for a free capability
362             for (i = 0; i < n_capabilities; i++) {
363                 cap = &capabilities[i];
364                 if (!cap->running_task) {
365                     break;
366                 }
367             }
368             // Can't find a free one, use last_free_capability.
369             cap = last_free_capability;
370         }
371
372         // record the Capability as the one this Task is now assocated with.
373         task->cap = cap;
374
375     } else {
376         ASSERT(task->cap == cap);
377     }
378
379     ACQUIRE_LOCK(&cap->lock);
380
381     IF_DEBUG(scheduler,
382              sched_belch("returning; I want capability %d", cap->no));
383
384     if (!cap->running_task) {
385         // It's free; just grab it
386         cap->running_task = task;
387         RELEASE_LOCK(&cap->lock);
388     } else {
389         newReturningTask(cap,task);
390         RELEASE_LOCK(&cap->lock);
391
392         for (;;) {
393             ACQUIRE_LOCK(&task->lock);
394             // task->lock held, cap->lock not held
395             if (!task->wakeup) waitCondition(&task->cond, &task->lock);
396             cap = task->cap;
397             task->wakeup = rtsFalse;
398             RELEASE_LOCK(&task->lock);
399
400             // now check whether we should wake up...
401             ACQUIRE_LOCK(&cap->lock);
402             if (cap->running_task == NULL) {
403                 if (cap->returning_tasks_hd != task) {
404                     giveCapabilityToTask(cap,cap->returning_tasks_hd);
405                     RELEASE_LOCK(&cap->lock);
406                     continue;
407                 }
408                 cap->running_task = task;
409                 popReturningTask(cap);
410                 RELEASE_LOCK(&cap->lock);
411                 break;
412             }
413             RELEASE_LOCK(&cap->lock);
414         }
415
416     }
417
418     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
419
420     IF_DEBUG(scheduler,
421              sched_belch("returning; got capability %d", cap->no));
422
423     *pCap = cap;
424 #endif
425 }
426
427 #if defined(THREADED_RTS)
428 /* ----------------------------------------------------------------------------
429  * yieldCapability
430  * ------------------------------------------------------------------------- */
431
/* Give up 'cap' while there is no work for this Task, queueing
 * ourselves as a spare worker and sleeping until a Capability is
 * handed back to us.  On return, *pCap is a Capability owned by this
 * task -- possibly a different one, since task->cap may be changed by
 * a migrater while we sleep (see the migration rules above).
 */
void
yieldCapability (Capability** pCap, Task *task)
{
    Capability *cap = *pCap;

    // The fast path has no locking, if we don't enter this while loop

    while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
        IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));

        // We must now release the capability and wait to be woken up
        // again.
        task->wakeup = rtsFalse;
        releaseCapabilityAndQueueWorker(cap);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;    // re-read: may have changed while asleep
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task != NULL) {
                // someone else owns the Capability: go back to sleep
                IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no));
                RELEASE_LOCK(&cap->lock);
                continue;
            }

            if (task->tso == NULL) {
                // we are a worker: take ourselves off spare_workers
                ASSERT(cap->spare_workers != NULL);
                // if we're not at the front of the queue, release it
                // again.  This is unlikely to happen.
                if (cap->spare_workers != task) {
                    giveCapabilityToTask(cap,cap->spare_workers);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->spare_workers = task->next;
                task->next = NULL;
            }
            cap->running_task = task;
            RELEASE_LOCK(&cap->lock);
            break;
        }

        IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
        ASSERT(cap->running_task == task);
    }

    *pCap = cap;

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    return;
}
490
491 /* ----------------------------------------------------------------------------
492  * prodCapabilities
493  *
494  * Used to indicate that the interrupted flag is now set, or some
495  * other global condition that might require waking up a Task on each
496  * Capability.
497  * ------------------------------------------------------------------------- */
498
/* Wake a spare worker on each free Capability -- or only on the first
 * free one found, when all==rtsFalse -- so that a global condition
 * (eg. interrupted) gets noticed.  Takes each cap->lock in turn.
 */
static void
prodCapabilities(rtsBool all)
{
    nat i;
    Capability *cap;
    Task *task;

    for (i=0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        ACQUIRE_LOCK(&cap->lock);
        if (!cap->running_task) {
            if (cap->spare_workers) {
                task = cap->spare_workers;
                ASSERT(!task->stopped);
                giveCapabilityToTask(cap,task);
                if (!all) {
                    // one wakeup was enough; we're done
                    RELEASE_LOCK(&cap->lock);
                    return;
                }
            }
        }
        RELEASE_LOCK(&cap->lock);
    }
}
523
/* Wake a Task on every free Capability (see prodCapabilities()). */
void
prodAllCapabilities (void)
{
    prodCapabilities(rtsTrue);
}
529
530 /* ----------------------------------------------------------------------------
531  * prodOneCapability
532  *
533  * Like prodAllCapabilities, but we only require a single Task to wake
534  * up in order to service some global event, such as checking for
535  * deadlock after some idle time has passed.
536  * ------------------------------------------------------------------------- */
537
/* Wake a single Task to service some global event (see the comment
 * block above and prodCapabilities()).
 */
void
prodOneCapability (void)
{
    prodCapabilities(rtsFalse);
}
543
544 /* ----------------------------------------------------------------------------
545  * shutdownCapability
546  *
547  * At shutdown time, we want to let everything exit as cleanly as
548  * possible.  For each capability, we let its run queue drain, and
549  * allow the workers to stop.
550  *
551  * This function should be called when interrupted and
552  * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
553  * will exit the scheduler and call taskStop(), and any bound thread
554  * that wakes up will return to its caller.  Runnable threads are
555  * killed.
556  *
557  * ------------------------------------------------------------------------- */
558
void
shutdownCapability (Capability *cap, Task *task)
{
    nat i;

    ASSERT(interrupted && shutting_down_scheduler);

    task->cap = cap;

    // Try repeatedly to acquire the Capability with an empty run queue
    // and no spare workers, yielding the OS thread between attempts so
    // threads and workers can drain.
    // NOTE(review): if all 50 attempts fail via 'continue' we fall out
    // of the loop without having stopped the Capability -- presumably
    // best-effort shutdown is acceptable here; confirm.
    for (i = 0; i < 50; i++) {
        IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            // someone else owns the Capability: back off and retry
            RELEASE_LOCK(&cap->lock);
            IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
            yieldThread();
            continue;
        }
        cap->running_task = task;
        if (!emptyRunQueue(cap) || cap->spare_workers) {
            // work or workers remain: give the Capability back so they
            // can finish, then retry
            IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
            releaseCapability_(cap); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }
        IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // we now have the Capability; its run queue and spare workers
    // list are both empty.
}
592
593 /* ----------------------------------------------------------------------------
594  * tryGrabCapability
595  *
596  * Attempt to gain control of a Capability if it is free.
597  *
598  * ------------------------------------------------------------------------- */
599
/* Attempt to claim 'cap' for 'task' without blocking.  Returns rtsTrue
 * on success.  Uses an unlocked fast-path check followed by a re-check
 * under cap->lock before claiming.
 */
rtsBool
tryGrabCapability (Capability *cap, Task *task)
{
    // fast path: don't take the lock if somebody clearly owns the cap
    if (cap->running_task != NULL) return rtsFalse;
    ACQUIRE_LOCK(&cap->lock);
    // re-check under the lock: another task may have grabbed it between
    // the unlocked check above and our acquiring cap->lock
    if (cap->running_task != NULL) {
        RELEASE_LOCK(&cap->lock);
        return rtsFalse;
    }
    task->cap = cap;
    cap->running_task = task;
    RELEASE_LOCK(&cap->lock);
    return rtsTrue;
}
614
615
616 #endif /* THREADED_RTS */
617
618