98f083c11257ff51f4f6c581b2a45ff1d5611154
[ghc-hetmet.git] / rts / Task.c
1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 2001-2005
4  *
5  * The task manager subsystem.  Tasks execute STG code, with this
6  * module providing the API which the Scheduler uses to control their
7  * creation and destruction.
8  * 
9  * -------------------------------------------------------------------------*/
10
11 #include "PosixSource.h"
12 #include "Rts.h"
13
14 #include "RtsUtils.h"
15 #include "Task.h"
16 #include "Capability.h"
17 #include "Stats.h"
18 #include "Schedule.h"
19 #include "Hash.h"
20 #include "Trace.h"
21
22 #if HAVE_SIGNAL_H
23 #include <signal.h>
24 #endif
25
26 // Task lists and global counters.
27 // Locks required: sched_mutex.
28 Task *all_tasks = NULL;
29 static nat taskCount;
30 static int tasksInitialized = 0;
31
32 static void   freeTask  (Task *task);
33 static Task * allocTask (void);
34 static Task * newTask   (rtsBool);
35
36 /* -----------------------------------------------------------------------------
37  * Remembering the current thread's Task
38  * -------------------------------------------------------------------------- */
39
40 // A thread-local-storage key that we can use to get access to the
41 // current thread's Task structure.
42 #if defined(THREADED_RTS)
43 # if defined(MYTASK_USE_TLV)
44 __thread Task *my_task;
45 # else
46 ThreadLocalKey currentTaskKey;
47 # endif
48 #else
49 Task *my_task;
50 #endif
51
52 /* -----------------------------------------------------------------------------
53  * Rest of the Task API
54  * -------------------------------------------------------------------------- */
55
56 void
57 initTaskManager (void)
58 {
59     if (!tasksInitialized) {
60         taskCount = 0;
61         tasksInitialized = 1;
62 #if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
63         newThreadLocalKey(&currentTaskKey);
64 #endif
65     }
66 }
67
68 nat
69 freeTaskManager (void)
70 {
71     Task *task, *next;
72     nat tasksRunning = 0;
73
74     ASSERT_LOCK_HELD(&sched_mutex);
75
76     for (task = all_tasks; task != NULL; task = next) {
77         next = task->all_link;
78         if (task->stopped) {
79             freeTask(task);
80         } else {
81             tasksRunning++;
82         }
83     }
84
85     debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
86                tasksRunning);
87
88     all_tasks = NULL;
89 #if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
90     freeThreadLocalKey(&currentTaskKey);
91 #endif
92
93     tasksInitialized = 0;
94
95     return tasksRunning;
96 }
97
98 static Task *
99 allocTask (void)
100 {
101     Task *task;
102
103     task = myTask();
104     if (task != NULL) {
105         return task;
106     } else {
107         task = newTask(rtsFalse);
108 #if defined(THREADED_RTS)
109         task->id = osThreadId();
110 #endif
111         setMyTask(task);
112         return task;
113     }
114 }
115
116 static void
117 freeTask (Task *task)
118 {
119     InCall *incall, *next;
120
121     // We only free resources if the Task is not in use.  A
122     // Task may still be in use if we have a Haskell thread in
123     // a foreign call while we are attempting to shut down the
124     // RTS (see conc059).
125 #if defined(THREADED_RTS)
126     closeCondition(&task->cond);
127     closeMutex(&task->lock);
128 #endif
129
130     for (incall = task->incall; incall != NULL; incall = next) {
131         next = incall->prev_stack;
132         stgFree(incall);
133     }
134     for (incall = task->spare_incalls; incall != NULL; incall = next) {
135         next = incall->next;
136         stgFree(incall);
137     }
138
139     stgFree(task);
140 }
141
142 static Task*
143 newTask (rtsBool worker)
144 {
145 #if defined(THREADED_RTS)
146     Ticks currentElapsedTime, currentUserTime;
147 #endif
148     Task *task;
149
150 #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
151     task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
152     
153     task->cap           = NULL;
154     task->worker        = worker;
155     task->stopped       = rtsFalse;
156     task->running_finalizers = rtsFalse;
157     task->stat          = NoStatus;
158     task->ret           = NULL;
159     task->n_spare_incalls = 0;
160     task->spare_incalls = NULL;
161     task->incall        = NULL;
162     
163 #if defined(THREADED_RTS)
164     initCondition(&task->cond);
165     initMutex(&task->lock);
166     task->wakeup = rtsFalse;
167 #endif
168
169 #if defined(THREADED_RTS)
170     currentUserTime = getThreadCPUTime();
171     currentElapsedTime = getProcessElapsedTime();
172     task->mut_time = 0;
173     task->mut_etime = 0;
174     task->gc_time = 0;
175     task->gc_etime = 0;
176     task->muttimestart = currentUserTime;
177     task->elapsedtimestart = currentElapsedTime;
178 #endif
179
180     task->next = NULL;
181
182     ACQUIRE_LOCK(&sched_mutex);
183
184     task->all_link = all_tasks;
185     all_tasks = task;
186
187     taskCount++;
188
189     RELEASE_LOCK(&sched_mutex);
190
191     return task;
192 }
193
194 // avoid the spare_incalls list growing unboundedly
195 #define MAX_SPARE_INCALLS 8
196
197 static void
198 newInCall (Task *task)
199 {
200     InCall *incall;
201     
202     if (task->spare_incalls != NULL) {
203         incall = task->spare_incalls;
204         task->spare_incalls = incall->next;
205         task->n_spare_incalls--;
206     } else {
207         incall = stgMallocBytes((sizeof(InCall)), "newBoundTask");
208     }
209
210     incall->tso = NULL;
211     incall->task = task;
212     incall->suspended_tso = NULL;
213     incall->suspended_cap = NULL;
214     incall->next = NULL;
215     incall->prev = NULL;
216     incall->prev_stack = task->incall;
217     task->incall = incall;
218 }
219
220 static void
221 endInCall (Task *task)
222 {
223     InCall *incall;
224
225     incall = task->incall;
226     incall->tso = NULL;
227     task->incall = task->incall->prev_stack;
228
229     if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
230         stgFree(incall);
231     } else {
232         incall->next = task->spare_incalls;
233         task->spare_incalls = incall;
234         task->n_spare_incalls++;
235     }
236 }
237
238
239 Task *
240 newBoundTask (void)
241 {
242     Task *task;
243
244     if (!tasksInitialized) {
245         errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
246         stg_exit(EXIT_FAILURE);
247     }
248
249     task = allocTask();
250
251     task->stopped = rtsFalse;
252
253     newInCall(task);
254
255     debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
256     return task;
257 }
258
259 void
260 boundTaskExiting (Task *task)
261 {
262     task->stopped = rtsTrue;
263
264 #if defined(THREADED_RTS)
265     ASSERT(osThreadId() == task->id);
266 #endif
267     ASSERT(myTask() == task);
268
269     endInCall(task);
270
271     debugTrace(DEBUG_sched, "task exiting");
272 }
273
274
275 #ifdef THREADED_RTS
276 #define TASK_ID(t) (t)->id
277 #else
278 #define TASK_ID(t) (t)
279 #endif
280
281 void
282 discardTasksExcept (Task *keep)
283 {
284     Task *task, *next;
285
286     // Wipe the task list, except the current Task.
287     ACQUIRE_LOCK(&sched_mutex);
288     for (task = all_tasks; task != NULL; task=next) {
289         next = task->all_link;
290         if (task != keep) {
291             debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
292             freeTask(task);
293         }
294     }
295     all_tasks = keep;
296     keep->all_link = NULL;
297     RELEASE_LOCK(&sched_mutex);
298 }
299
300 void
301 taskTimeStamp (Task *task USED_IF_THREADS)
302 {
303 #if defined(THREADED_RTS)
304     Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
305
306     currentUserTime = getThreadCPUTime();
307     currentElapsedTime = getProcessElapsedTime();
308
309     // XXX this is wrong; we want elapsed GC time since the
310     // Task started.
311     elapsedGCTime = stat_getElapsedGCTime();
312     
313     task->mut_time = 
314         currentUserTime - task->muttimestart - task->gc_time;
315     task->mut_etime = 
316         currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
317
318     if (task->mut_time  < 0) { task->mut_time  = 0; }
319     if (task->mut_etime < 0) { task->mut_etime = 0; }
320 #endif
321 }
322
323 #if defined(THREADED_RTS)
324
325 void
326 workerTaskStop (Task *task)
327 {
328     OSThreadId id;
329     id = osThreadId();
330     ASSERT(task->id == id);
331     ASSERT(myTask() == task);
332
333     task->cap = NULL;
334     taskTimeStamp(task);
335     task->stopped = rtsTrue;
336 }
337
338 #endif
339
340 #if defined(THREADED_RTS)
341
342 static void OSThreadProcAttr
343 workerStart(Task *task)
344 {
345     Capability *cap;
346
347     // See startWorkerTask().
348     ACQUIRE_LOCK(&task->lock);
349     cap = task->cap;
350     RELEASE_LOCK(&task->lock);
351
352     if (RtsFlags.ParFlags.setAffinity) {
353         setThreadAffinity(cap->no, n_capabilities);
354     }
355
356     // set the thread-local pointer to the Task:
357     setMyTask(task);
358
359     newInCall(task);
360
361     scheduleWorker(cap,task);
362 }
363
364 void
365 startWorkerTask (Capability *cap)
366 {
367   int r;
368   OSThreadId tid;
369   Task *task;
370
371   // A worker always gets a fresh Task structure.
372   task = newTask(rtsTrue);
373
374   // The lock here is to synchronise with taskStart(), to make sure
375   // that we have finished setting up the Task structure before the
376   // worker thread reads it.
377   ACQUIRE_LOCK(&task->lock);
378
379   task->cap = cap;
380
381   // Give the capability directly to the worker; we can't let anyone
382   // else get in, because the new worker Task has nowhere to go to
383   // sleep so that it could be woken up again.
384   ASSERT_LOCK_HELD(&cap->lock);
385   cap->running_task = task;
386
387   r = createOSThread(&tid, (OSThreadProc*)workerStart, task);
388   if (r != 0) {
389     sysErrorBelch("failed to create OS thread");
390     stg_exit(EXIT_FAILURE);
391   }
392
393   debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
394
395   task->id = tid;
396
397   // ok, finished with the Task struct.
398   RELEASE_LOCK(&task->lock);
399 }
400
401 #endif /* THREADED_RTS */
402
403 #ifdef DEBUG
404
405 static void *taskId(Task *task)
406 {
407 #ifdef THREADED_RTS
408     return (void *)task->id;
409 #else
410     return (void *)task;
411 #endif
412 }
413
414 void printAllTasks(void);
415
416 void
417 printAllTasks(void)
418 {
419     Task *task;
420     for (task = all_tasks; task != NULL; task = task->all_link) {
421         debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
422         if (!task->stopped) {
423             if (task->cap) {
424                 debugBelch("on capability %d, ", task->cap->no);
425             }
426             if (task->incall->tso) {
427               debugBelch("bound to thread %lu",
428                          (unsigned long)task->incall->tso->id);
429             } else {
430                 debugBelch("worker");
431             }
432         }
433         debugBelch("\n");
434     }
435 }                      
436
437 #endif
438