[ghc-hetmet.git] / rts / Task.c
/* -----------------------------------------------------------------------------
 *
 * (c) The GHC Team 2001-2005
 *
 * The task manager subsystem.  Tasks execute STG code, with this
 * module providing the API which the Scheduler uses to control their
 * creation and destruction.
 *
 * -------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"

#include "RtsUtils.h"
#include "Task.h"
#include "Capability.h"
#include "Stats.h"
#include "Schedule.h"
#include "Hash.h"
#include "Trace.h"

#if HAVE_SIGNAL_H
#include <signal.h>
#endif

// Task lists and global counters.
// Locks required: all_tasks_mutex.
Task *all_tasks = NULL;
static nat taskCount;
static int tasksInitialized = 0;

static void   freeTask  (Task *task);
static Task * allocTask (void);
static Task * newTask   (rtsBool);

#if defined(THREADED_RTS)
static Mutex all_tasks_mutex;
#endif

/* -----------------------------------------------------------------------------
 * Remembering the current thread's Task
 * -------------------------------------------------------------------------- */

// A thread-local-storage key that we can use to get access to the
// current thread's Task structure.
#if defined(THREADED_RTS)
# if defined(MYTASK_USE_TLV)
__thread Task *my_task;
# else
ThreadLocalKey currentTaskKey;
# endif
#else
Task *my_task;
#endif

/* -----------------------------------------------------------------------------
 * Rest of the Task API
 * -------------------------------------------------------------------------- */

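// Initialise the task manager (called during RTS startup); safe to call
// more than once thanks to the tasksInitialized guard.  Sets up the
// thread-local Task key (when not using a __thread variable) and the
// mutex protecting the all_tasks list.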
void
initTaskManager (void)
{
    if (!tasksInitialized) {
        taskCount = 0;
        tasksInitialized = 1;
#if defined(THREADED_RTS)
#if !defined(MYTASK_USE_TLV)
        newThreadLocalKey(&currentTaskKey);
#endif
        initMutex(&all_tasks_mutex);
#endif
    }
}

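// Shut the task manager down: free every stopped Task and return the
// number of Tasks that are still running (these cannot be freed yet,
// e.g. threads that are still in foreign calls).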
nat
freeTaskManager (void)
{
    Task *task, *next;
    nat tasksRunning = 0;

    ACQUIRE_LOCK(&all_tasks_mutex);

    for (task = all_tasks; task != NULL; task = next) {
        next = task->all_link;
        if (task->stopped) {
            freeTask(task);
        } else {
            tasksRunning++;
        }
    }

    debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
               tasksRunning);

    all_tasks = NULL;

    RELEASE_LOCK(&all_tasks_mutex);

#if defined(THREADED_RTS)
    closeMutex(&all_tasks_mutex);
#if !defined(MYTASK_USE_TLV)
    freeThreadLocalKey(&currentTaskKey);
#endif
#endif

    tasksInitialized = 0;

    return tasksRunning;
}

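// Return the calling thread's Task, allocating and registering a new
// (non-worker) one if this OS thread does not have a Task yet.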
static Task *
allocTask (void)
{
    Task *task;

    task = myTask();
    if (task != NULL) {
        return task;
    } else {
        task = newTask(rtsFalse);
#if defined(THREADED_RTS)
        task->id = osThreadId();
#endif
        setMyTask(task);
        return task;
    }
}

static void
freeTask (Task *task)
{
    InCall *incall, *next;

    // We only free resources if the Task is not in use.  A
    // Task may still be in use if we have a Haskell thread in
    // a foreign call while we are attempting to shut down the
    // RTS (see conc059).
#if defined(THREADED_RTS)
    closeCondition(&task->cond);
    closeMutex(&task->lock);
#endif

    for (incall = task->incall; incall != NULL; incall = next) {
        next = incall->prev_stack;
        stgFree(incall);
    }
    for (incall = task->spare_incalls; incall != NULL; incall = next) {
        next = incall->next;
        stgFree(incall);
    }

    stgFree(task);
}

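// Allocate a fresh Task (rounded up to a whole number of cache lines),
// initialise its fields, and link it onto the global all_tasks list
// under all_tasks_mutex.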
static Task*
newTask (rtsBool worker)
{
#if defined(THREADED_RTS)
    Ticks currentElapsedTime, currentUserTime;
#endif
    Task *task;

#define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
    task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");

    task->cap           = NULL;
    task->worker        = worker;
    task->stopped       = rtsFalse;
    task->running_finalizers = rtsFalse;
    task->n_spare_incalls = 0;
    task->spare_incalls = NULL;
    task->incall        = NULL;

#if defined(THREADED_RTS)
    initCondition(&task->cond);
    initMutex(&task->lock);
    task->wakeup = rtsFalse;
#endif

#if defined(THREADED_RTS)
    currentUserTime = getThreadCPUTime();
    currentElapsedTime = getProcessElapsedTime();
    task->mut_time = 0;
    task->mut_etime = 0;
    task->gc_time = 0;
    task->gc_etime = 0;
    task->muttimestart = currentUserTime;
    task->elapsedtimestart = currentElapsedTime;
#endif

    task->next = NULL;

    ACQUIRE_LOCK(&all_tasks_mutex);

    task->all_link = all_tasks;
    all_tasks = task;

    taskCount++;

    RELEASE_LOCK(&all_tasks_mutex);

    return task;
}

// avoid the spare_incalls list growing unboundedly
#define MAX_SPARE_INCALLS 8

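// Push a new InCall onto this Task's in-call stack, reusing an entry
// from spare_incalls if one is available.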
static void
newInCall (Task *task)
{
    InCall *incall;

    if (task->spare_incalls != NULL) {
        incall = task->spare_incalls;
        task->spare_incalls = incall->next;
        task->n_spare_incalls--;
    } else {
        incall = stgMallocBytes(sizeof(InCall), "newBoundTask");
    }

    incall->tso = NULL;
    incall->task = task;
    incall->suspended_tso = NULL;
    incall->suspended_cap = NULL;
    incall->stat          = NoStatus;
    incall->ret           = NULL;
    incall->next = NULL;
    incall->prev = NULL;
    incall->prev_stack = task->incall;
    task->incall = incall;
}

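// Pop the current InCall from the Task's in-call stack, recycling it
// onto spare_incalls unless the spare list is already full.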
static void
endInCall (Task *task)
{
    InCall *incall;

    incall = task->incall;
    incall->tso = NULL;
    task->incall = task->incall->prev_stack;

    if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
        stgFree(incall);
    } else {
        incall->next = task->spare_incalls;
        task->spare_incalls = incall;
        task->n_spare_incalls++;
    }
}


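// Called when an OS thread enters the RTS (for example to run a
// foreign-exported function): attach a Task to the calling thread and
// push a new InCall for the duration of the call.  Pairs with
// boundTaskExiting().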
Task *
newBoundTask (void)
{
    Task *task;

    if (!tasksInitialized) {
        errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
        stg_exit(EXIT_FAILURE);
    }

    task = allocTask();

    task->stopped = rtsFalse;

    newInCall(task);

    debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
    return task;
}

void
boundTaskExiting (Task *task)
{
#if defined(THREADED_RTS)
    ASSERT(osThreadId() == task->id);
#endif
    ASSERT(myTask() == task);

    endInCall(task);

    // Set task->stopped, but only if this is the last call (#4850).
    // Remember that we might have a worker Task that makes a foreign
    // call and then a callback, so it can transform into a bound
    // Task for the duration of the callback.
    if (task->incall == NULL) {
        task->stopped = rtsTrue;
    }

    debugTrace(DEBUG_sched, "task exiting");
}

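// Illustrative only (the real callers live in the scheduler and the
// RTS API): a thread entering Haskell is expected to bracket its
// in-call roughly like this:
//
//     Task *task = newBoundTask();   // attach a Task to this OS thread
//     /* ... run Haskell code via the scheduler ... */
//     boundTaskExiting(task);        // pop the InCall; stop if it was the last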

#ifdef THREADED_RTS
#define TASK_ID(t) (t)->id
#else
#define TASK_ID(t) (t)
#endif

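// Free every Task except 'keep', leaving it as the only entry on
// all_tasks.  Used when all other Tasks are known to be dead, for
// example in the child process after a fork().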
void
discardTasksExcept (Task *keep)
{
    Task *task, *next;

    // Wipe the task list, except the current Task.
    ACQUIRE_LOCK(&all_tasks_mutex);
    for (task = all_tasks; task != NULL; task = next) {
        next = task->all_link;
        if (task != keep) {
            debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
            freeTask(task);
        }
    }
    all_tasks = keep;
    keep->all_link = NULL;
    RELEASE_LOCK(&all_tasks_mutex);
}

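// Record the mutator CPU time and elapsed time accumulated by this
// Task since it started, excluding time spent in GC (clamped at zero).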
void
taskTimeStamp (Task *task USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    Ticks currentElapsedTime, currentUserTime, elapsedGCTime;

    currentUserTime = getThreadCPUTime();
    currentElapsedTime = getProcessElapsedTime();

    // XXX this is wrong; we want elapsed GC time since the
    // Task started.
    elapsedGCTime = stat_getElapsedGCTime();

    task->mut_time =
        currentUserTime - task->muttimestart - task->gc_time;
    task->mut_etime =
        currentElapsedTime - task->elapsedtimestart - elapsedGCTime;

    if (task->mut_time  < 0) { task->mut_time  = 0; }
    if (task->mut_etime < 0) { task->mut_etime = 0; }
#endif
}

#if defined(THREADED_RTS)

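// Called by a worker thread just before it exits: detach it from its
// Capability, record its timing statistics, and mark it stopped.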
void
workerTaskStop (Task *task)
{
    OSThreadId id;
    id = osThreadId();
    ASSERT(task->id == id);
    ASSERT(myTask() == task);

    task->cap = NULL;
    taskTimeStamp(task);
    task->stopped = rtsTrue;
}

#endif

#ifdef DEBUG

static void *taskId(Task *task)
{
#ifdef THREADED_RTS
    return (void *)task->id;
#else
    return (void *)task;
#endif
}

#endif

#if defined(THREADED_RTS)

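// Entry point of a new worker OS thread.  Waits (via task->lock) until
// startWorkerTask() has finished filling in the Task, optionally pins
// the thread to its Capability, and then enters the scheduler.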
static void OSThreadProcAttr
workerStart(Task *task)
{
    Capability *cap;

    // See startWorkerTask().
    ACQUIRE_LOCK(&task->lock);
    cap = task->cap;
    RELEASE_LOCK(&task->lock);

    if (RtsFlags.ParFlags.setAffinity) {
        setThreadAffinity(cap->no, n_capabilities);
    }

    // set the thread-local pointer to the Task:
    setMyTask(task);

    newInCall(task);

    scheduleWorker(cap,task);
}

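// Create a new worker Task for 'cap' and spawn an OS thread to run it.
// The caller must hold cap->lock; the Capability is handed directly to
// the new worker before the thread is created.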
void
startWorkerTask (Capability *cap)
{
  int r;
  OSThreadId tid;
  Task *task;

  // A worker always gets a fresh Task structure.
  task = newTask(rtsTrue);

  // The lock here is to synchronise with workerStart(), to make sure
  // that we have finished setting up the Task structure before the
  // worker thread reads it.
  ACQUIRE_LOCK(&task->lock);

  task->cap = cap;

  // Give the capability directly to the worker; we can't let anyone
  // else get in, because the new worker Task has nowhere to go to
  // sleep so that it could be woken up again.
  ASSERT_LOCK_HELD(&cap->lock);
  cap->running_task = task;

  r = createOSThread(&tid, (OSThreadProc*)workerStart, task);
  if (r != 0) {
    sysErrorBelch("failed to create OS thread");
    stg_exit(EXIT_FAILURE);
  }

  debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);

  task->id = tid;

  // ok, finished with the Task struct.
  RELEASE_LOCK(&task->lock);
}

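// Interrupt the OS thread of a Task that is currently blocked in a
// foreign call (must not be called on the current thread's own Task).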
void
interruptWorkerTask (Task *task)
{
  ASSERT(osThreadId() != task->id);    // seppuku not allowed
  ASSERT(task->incall->suspended_tso); // use this only for FFI calls
  interruptOSThread(task->id);
  debugTrace(DEBUG_sched, "interrupted worker task %p", taskId(task));
}

#endif /* THREADED_RTS */

#ifdef DEBUG

void printAllTasks(void);

void
printAllTasks(void)
{
    Task *task;
    for (task = all_tasks; task != NULL; task = task->all_link) {
        debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
        if (!task->stopped) {
            if (task->cap) {
                debugBelch("on capability %d, ", task->cap->no);
            }
            if (task->incall->tso) {
                debugBelch("bound to thread %lu",
                           (unsigned long)task->incall->tso->id);
            } else {
                debugBelch("worker");
            }
        }
        debugBelch("\n");
    }
}

#endif
