8a289be0078f8a4d68af8b532a8ef9b219a74145
[ghc-hetmet.git] / rts / Task.c
1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 2001-2005
4  *
5  * The task manager subsystem.  Tasks execute STG code, with this
6  * module providing the API which the Scheduler uses to control their
7  * creation and destruction.
8  * 
9  * -------------------------------------------------------------------------*/
10
11 #include "PosixSource.h"
12 #include "Rts.h"
13
14 #include "RtsUtils.h"
15 #include "Task.h"
16 #include "Capability.h"
17 #include "Stats.h"
18 #include "Schedule.h"
19 #include "Hash.h"
20 #include "Trace.h"
21
22 #if HAVE_SIGNAL_H
23 #include <signal.h>
24 #endif
25
26 // Task lists and global counters.
27 // Locks required: all_tasks_mutex.
28 Task *all_tasks = NULL;
29 static nat taskCount;
30 static int tasksInitialized = 0;
31
32 static void   freeTask  (Task *task);
33 static Task * allocTask (void);
34 static Task * newTask   (rtsBool);
35
36 #if defined(THREADED_RTS)
37 static Mutex all_tasks_mutex;
38 #endif
39
40 /* -----------------------------------------------------------------------------
41  * Remembering the current thread's Task
42  * -------------------------------------------------------------------------- */
43
44 // A thread-local-storage key that we can use to get access to the
45 // current thread's Task structure.
46 #if defined(THREADED_RTS)
47 # if defined(MYTASK_USE_TLV)
48 __thread Task *my_task;
49 # else
50 ThreadLocalKey currentTaskKey;
51 # endif
52 #else
53 Task *my_task;
54 #endif
55
56 /* -----------------------------------------------------------------------------
57  * Rest of the Task API
58  * -------------------------------------------------------------------------- */
59
60 void
61 initTaskManager (void)
62 {
63     if (!tasksInitialized) {
64         taskCount = 0;
65         tasksInitialized = 1;
66 #if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
67         newThreadLocalKey(&currentTaskKey);
68         initMutex(&all_tasks_mutex);
69 #endif
70     }
71 }
72
73 nat
74 freeTaskManager (void)
75 {
76     Task *task, *next;
77     nat tasksRunning = 0;
78
79     ACQUIRE_LOCK(&all_tasks_mutex);
80
81     for (task = all_tasks; task != NULL; task = next) {
82         next = task->all_link;
83         if (task->stopped) {
84             freeTask(task);
85         } else {
86             tasksRunning++;
87         }
88     }
89
90     debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
91                tasksRunning);
92
93     all_tasks = NULL;
94
95     RELEASE_LOCK(&all_tasks_mutex);
96
97 #if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
98     closeMutex(&all_tasks_mutex); 
99     freeThreadLocalKey(&currentTaskKey);
100 #endif
101
102     tasksInitialized = 0;
103
104     return tasksRunning;
105 }
106
107 static Task *
108 allocTask (void)
109 {
110     Task *task;
111
112     task = myTask();
113     if (task != NULL) {
114         return task;
115     } else {
116         task = newTask(rtsFalse);
117 #if defined(THREADED_RTS)
118         task->id = osThreadId();
119 #endif
120         setMyTask(task);
121         return task;
122     }
123 }
124
125 static void
126 freeTask (Task *task)
127 {
128     InCall *incall, *next;
129
130     // We only free resources if the Task is not in use.  A
131     // Task may still be in use if we have a Haskell thread in
132     // a foreign call while we are attempting to shut down the
133     // RTS (see conc059).
134 #if defined(THREADED_RTS)
135     closeCondition(&task->cond);
136     closeMutex(&task->lock);
137 #endif
138
139     for (incall = task->incall; incall != NULL; incall = next) {
140         next = incall->prev_stack;
141         stgFree(incall);
142     }
143     for (incall = task->spare_incalls; incall != NULL; incall = next) {
144         next = incall->next;
145         stgFree(incall);
146     }
147
148     stgFree(task);
149 }
150
151 static Task*
152 newTask (rtsBool worker)
153 {
154 #if defined(THREADED_RTS)
155     Ticks currentElapsedTime, currentUserTime;
156 #endif
157     Task *task;
158
159 #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
160     task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
161     
162     task->cap           = NULL;
163     task->worker        = worker;
164     task->stopped       = rtsFalse;
165     task->running_finalizers = rtsFalse;
166     task->n_spare_incalls = 0;
167     task->spare_incalls = NULL;
168     task->incall        = NULL;
169     
170 #if defined(THREADED_RTS)
171     initCondition(&task->cond);
172     initMutex(&task->lock);
173     task->wakeup = rtsFalse;
174 #endif
175
176 #if defined(THREADED_RTS)
177     currentUserTime = getThreadCPUTime();
178     currentElapsedTime = getProcessElapsedTime();
179     task->mut_time = 0;
180     task->mut_etime = 0;
181     task->gc_time = 0;
182     task->gc_etime = 0;
183     task->muttimestart = currentUserTime;
184     task->elapsedtimestart = currentElapsedTime;
185 #endif
186
187     task->next = NULL;
188
189     ACQUIRE_LOCK(&all_tasks_mutex);
190
191     task->all_link = all_tasks;
192     all_tasks = task;
193
194     taskCount++;
195
196     RELEASE_LOCK(&all_tasks_mutex);
197
198     return task;
199 }
200
201 // avoid the spare_incalls list growing unboundedly
202 #define MAX_SPARE_INCALLS 8
203
204 static void
205 newInCall (Task *task)
206 {
207     InCall *incall;
208     
209     if (task->spare_incalls != NULL) {
210         incall = task->spare_incalls;
211         task->spare_incalls = incall->next;
212         task->n_spare_incalls--;
213     } else {
214         incall = stgMallocBytes((sizeof(InCall)), "newBoundTask");
215     }
216
217     incall->tso = NULL;
218     incall->task = task;
219     incall->suspended_tso = NULL;
220     incall->suspended_cap = NULL;
221     incall->stat          = NoStatus;
222     incall->ret           = NULL;
223     incall->next = NULL;
224     incall->prev = NULL;
225     incall->prev_stack = task->incall;
226     task->incall = incall;
227 }
228
229 static void
230 endInCall (Task *task)
231 {
232     InCall *incall;
233
234     incall = task->incall;
235     incall->tso = NULL;
236     task->incall = task->incall->prev_stack;
237
238     if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
239         stgFree(incall);
240     } else {
241         incall->next = task->spare_incalls;
242         task->spare_incalls = incall;
243         task->n_spare_incalls++;
244     }
245 }
246
247
248 Task *
249 newBoundTask (void)
250 {
251     Task *task;
252
253     if (!tasksInitialized) {
254         errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
255         stg_exit(EXIT_FAILURE);
256     }
257
258     task = allocTask();
259
260     task->stopped = rtsFalse;
261
262     newInCall(task);
263
264     debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
265     return task;
266 }
267
268 void
269 boundTaskExiting (Task *task)
270 {
271     task->stopped = rtsTrue;
272
273 #if defined(THREADED_RTS)
274     ASSERT(osThreadId() == task->id);
275 #endif
276     ASSERT(myTask() == task);
277
278     endInCall(task);
279
280     debugTrace(DEBUG_sched, "task exiting");
281 }
282
283
284 #ifdef THREADED_RTS
285 #define TASK_ID(t) (t)->id
286 #else
287 #define TASK_ID(t) (t)
288 #endif
289
290 void
291 discardTasksExcept (Task *keep)
292 {
293     Task *task, *next;
294
295     // Wipe the task list, except the current Task.
296     ACQUIRE_LOCK(&all_tasks_mutex);
297     for (task = all_tasks; task != NULL; task=next) {
298         next = task->all_link;
299         if (task != keep) {
300             debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
301             freeTask(task);
302         }
303     }
304     all_tasks = keep;
305     keep->all_link = NULL;
306     RELEASE_LOCK(&all_tasks_mutex);
307 }
308
309 void
310 taskTimeStamp (Task *task USED_IF_THREADS)
311 {
312 #if defined(THREADED_RTS)
313     Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
314
315     currentUserTime = getThreadCPUTime();
316     currentElapsedTime = getProcessElapsedTime();
317
318     // XXX this is wrong; we want elapsed GC time since the
319     // Task started.
320     elapsedGCTime = stat_getElapsedGCTime();
321     
322     task->mut_time = 
323         currentUserTime - task->muttimestart - task->gc_time;
324     task->mut_etime = 
325         currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
326
327     if (task->mut_time  < 0) { task->mut_time  = 0; }
328     if (task->mut_etime < 0) { task->mut_etime = 0; }
329 #endif
330 }
331
332 #if defined(THREADED_RTS)
333
334 void
335 workerTaskStop (Task *task)
336 {
337     OSThreadId id;
338     id = osThreadId();
339     ASSERT(task->id == id);
340     ASSERT(myTask() == task);
341
342     task->cap = NULL;
343     taskTimeStamp(task);
344     task->stopped = rtsTrue;
345 }
346
347 #endif
348
349 #if defined(THREADED_RTS)
350
351 static void OSThreadProcAttr
352 workerStart(Task *task)
353 {
354     Capability *cap;
355
356     // See startWorkerTask().
357     ACQUIRE_LOCK(&task->lock);
358     cap = task->cap;
359     RELEASE_LOCK(&task->lock);
360
361     if (RtsFlags.ParFlags.setAffinity) {
362         setThreadAffinity(cap->no, n_capabilities);
363     }
364
365     // set the thread-local pointer to the Task:
366     setMyTask(task);
367
368     newInCall(task);
369
370     scheduleWorker(cap,task);
371 }
372
373 void
374 startWorkerTask (Capability *cap)
375 {
376   int r;
377   OSThreadId tid;
378   Task *task;
379
380   // A worker always gets a fresh Task structure.
381   task = newTask(rtsTrue);
382
383   // The lock here is to synchronise with taskStart(), to make sure
384   // that we have finished setting up the Task structure before the
385   // worker thread reads it.
386   ACQUIRE_LOCK(&task->lock);
387
388   task->cap = cap;
389
390   // Give the capability directly to the worker; we can't let anyone
391   // else get in, because the new worker Task has nowhere to go to
392   // sleep so that it could be woken up again.
393   ASSERT_LOCK_HELD(&cap->lock);
394   cap->running_task = task;
395
396   r = createOSThread(&tid, (OSThreadProc*)workerStart, task);
397   if (r != 0) {
398     sysErrorBelch("failed to create OS thread");
399     stg_exit(EXIT_FAILURE);
400   }
401
402   debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
403
404   task->id = tid;
405
406   // ok, finished with the Task struct.
407   RELEASE_LOCK(&task->lock);
408 }
409
410 #endif /* THREADED_RTS */
411
412 #ifdef DEBUG
413
414 static void *taskId(Task *task)
415 {
416 #ifdef THREADED_RTS
417     return (void *)task->id;
418 #else
419     return (void *)task;
420 #endif
421 }
422
423 void printAllTasks(void);
424
425 void
426 printAllTasks(void)
427 {
428     Task *task;
429     for (task = all_tasks; task != NULL; task = task->all_link) {
430         debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
431         if (!task->stopped) {
432             if (task->cap) {
433                 debugBelch("on capability %d, ", task->cap->no);
434             }
435             if (task->incall->tso) {
436               debugBelch("bound to thread %lu",
437                          (unsigned long)task->incall->tso->id);
438             } else {
439                 debugBelch("worker");
440             }
441         }
442         debugBelch("\n");
443     }
444 }                      
445
446 #endif
447