9a8ebd69630b3864cb46bd27a7191e3328678509
[ghc-hetmet.git] / rts / Task.c
1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 2001-2005
4  *
5  * The task manager subsystem.  Tasks execute STG code, with this
6  * module providing the API which the Scheduler uses to control their
7  * creation and destruction.
8  * 
9  * -------------------------------------------------------------------------*/
10
11 #include "PosixSource.h"
12 #include "Rts.h"
13
14 #include "RtsUtils.h"
15 #include "Task.h"
16 #include "Capability.h"
17 #include "Stats.h"
18 #include "Schedule.h"
19 #include "Hash.h"
20 #include "Trace.h"
21
22 #if HAVE_SIGNAL_H
23 #include <signal.h>
24 #endif
25
26 // Task lists and global counters.
27 // Locks required: sched_mutex.
28 Task *all_tasks = NULL;
29 static Task *task_free_list = NULL; // singly-linked
30 static nat taskCount;
31 static nat tasksRunning;
32 static nat workerCount;
33 static int tasksInitialized = 0;
34
35 /* -----------------------------------------------------------------------------
36  * Remembering the current thread's Task
37  * -------------------------------------------------------------------------- */
38
39 // A thread-local-storage key that we can use to get access to the
40 // current thread's Task structure.
41 #if defined(THREADED_RTS)
42 ThreadLocalKey currentTaskKey;
43 #else
44 Task *my_task;
45 #endif
46
47 /* -----------------------------------------------------------------------------
48  * Rest of the Task API
49  * -------------------------------------------------------------------------- */
50
51 void
52 initTaskManager (void)
53 {
54     if (!tasksInitialized) {
55         taskCount = 0;
56         workerCount = 0;
57         tasksRunning = 0;
58         tasksInitialized = 1;
59 #if defined(THREADED_RTS)
60         newThreadLocalKey(&currentTaskKey);
61 #endif
62     }
63 }
64
65 nat
66 freeTaskManager (void)
67 {
68     Task *task, *next;
69
70     ASSERT_LOCK_HELD(&sched_mutex);
71
72     debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
73                tasksRunning);
74
75     for (task = all_tasks; task != NULL; task = next) {
76         next = task->all_link;
77         if (task->stopped) {
78             // We only free resources if the Task is not in use.  A
79             // Task may still be in use if we have a Haskell thread in
80             // a foreign call while we are attempting to shut down the
81             // RTS (see conc059).
82 #if defined(THREADED_RTS)
83             closeCondition(&task->cond);
84             closeMutex(&task->lock);
85 #endif
86             stgFree(task);
87         }
88     }
89     all_tasks = NULL;
90     task_free_list = NULL;
91 #if defined(THREADED_RTS)
92     freeThreadLocalKey(&currentTaskKey);
93 #endif
94
95     tasksInitialized = 0;
96
97     return tasksRunning;
98 }
99
100
101 static Task*
102 newTask (void)
103 {
104 #if defined(THREADED_RTS)
105     Ticks currentElapsedTime, currentUserTime;
106 #endif
107     Task *task;
108
109 #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
110     task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
111     
112     task->cap  = NULL;
113     task->stopped = rtsFalse;
114     task->suspended_tso = NULL;
115     task->tso  = NULL;
116     task->stat = NoStatus;
117     task->ret  = NULL;
118     
119 #if defined(THREADED_RTS)
120     initCondition(&task->cond);
121     initMutex(&task->lock);
122     task->wakeup = rtsFalse;
123 #endif
124
125 #if defined(THREADED_RTS)
126     currentUserTime = getThreadCPUTime();
127     currentElapsedTime = getProcessElapsedTime();
128     task->mut_time = 0;
129     task->mut_etime = 0;
130     task->gc_time = 0;
131     task->gc_etime = 0;
132     task->muttimestart = currentUserTime;
133     task->elapsedtimestart = currentElapsedTime;
134 #endif
135
136     task->prev = NULL;
137     task->next = NULL;
138     task->return_link = NULL;
139
140     task->all_link = all_tasks;
141     all_tasks = task;
142
143     taskCount++;
144
145     return task;
146 }
147
148 Task *
149 newBoundTask (void)
150 {
151     Task *task;
152
153     if (!tasksInitialized) {
154         errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
155         stg_exit(EXIT_FAILURE);
156     }
157
158     // ToDo: get rid of this lock in the common case.  We could store
159     // a free Task in thread-local storage, for example.  That would
160     // leave just one lock on the path into the RTS: cap->lock when
161     // acquiring the Capability.
162     ACQUIRE_LOCK(&sched_mutex);
163
164     if (task_free_list == NULL) {
165         task = newTask();
166     } else {
167         task = task_free_list;
168         task_free_list = task->next;
169         task->next = NULL;
170         task->prev = NULL;
171         task->stopped = rtsFalse;
172     }
173 #if defined(THREADED_RTS)
174     task->id = osThreadId();
175 #endif
176     ASSERT(task->cap == NULL);
177
178     tasksRunning++;
179
180     taskEnter(task);
181
182     RELEASE_LOCK(&sched_mutex);
183
184     debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
185     return task;
186 }
187
188 void
189 boundTaskExiting (Task *task)
190 {
191     task->tso = NULL;
192     task->stopped = rtsTrue;
193     task->cap = NULL;
194
195 #if defined(THREADED_RTS)
196     ASSERT(osThreadId() == task->id);
197 #endif
198     ASSERT(myTask() == task);
199     setMyTask(task->prev_stack);
200
201     tasksRunning--;
202
203     // sadly, we need a lock around the free task list. Todo: eliminate.
204     ACQUIRE_LOCK(&sched_mutex);
205     task->next = task_free_list;
206     task_free_list = task;
207     RELEASE_LOCK(&sched_mutex);
208
209     debugTrace(DEBUG_sched, "task exiting");
210 }
211
212 #ifdef THREADED_RTS
213 #define TASK_ID(t) (t)->id
214 #else
215 #define TASK_ID(t) (t)
216 #endif
217
218 void
219 discardTask (Task *task)
220 {
221     ASSERT_LOCK_HELD(&sched_mutex);
222     if (!task->stopped) {
223         debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
224         task->cap = NULL;
225         if (task->tso == NULL) {
226             workerCount--;
227         } else {
228             task->tso = NULL;
229         }
230         task->stopped = rtsTrue;
231         tasksRunning--;
232         task->next = task_free_list;
233         task_free_list = task;
234     }
235 }
236
237 void
238 taskTimeStamp (Task *task USED_IF_THREADS)
239 {
240 #if defined(THREADED_RTS)
241     Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
242
243     currentUserTime = getThreadCPUTime();
244     currentElapsedTime = getProcessElapsedTime();
245
246     // XXX this is wrong; we want elapsed GC time since the
247     // Task started.
248     elapsedGCTime = stat_getElapsedGCTime();
249     
250     task->mut_time = 
251         currentUserTime - task->muttimestart - task->gc_time;
252     task->mut_etime = 
253         currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
254
255     if (task->mut_time  < 0) { task->mut_time  = 0; }
256     if (task->mut_etime < 0) { task->mut_etime = 0; }
257 #endif
258 }
259
260 #if defined(THREADED_RTS)
261
262 void
263 workerTaskStop (Task *task)
264 {
265     OSThreadId id;
266     id = osThreadId();
267     ASSERT(task->id == id);
268     ASSERT(myTask() == task);
269
270     task->cap = NULL;
271     taskTimeStamp(task);
272     task->stopped = rtsTrue;
273     tasksRunning--;
274     workerCount--;
275
276     ACQUIRE_LOCK(&sched_mutex);
277     task->next = task_free_list;
278     task_free_list = task;
279     RELEASE_LOCK(&sched_mutex);
280 }
281
282 #endif
283
284 #if defined(THREADED_RTS)
285
286 void
287 startWorkerTask (Capability *cap, 
288                  void OSThreadProcAttr (*taskStart)(Task *task))
289 {
290   int r;
291   OSThreadId tid;
292   Task *task;
293
294   workerCount++;
295
296   // A worker always gets a fresh Task structure.
297   task = newTask();
298
299   tasksRunning++;
300
301   // The lock here is to synchronise with taskStart(), to make sure
302   // that we have finished setting up the Task structure before the
303   // worker thread reads it.
304   ACQUIRE_LOCK(&task->lock);
305
306   task->cap = cap;
307
308   // Give the capability directly to the worker; we can't let anyone
309   // else get in, because the new worker Task has nowhere to go to
310   // sleep so that it could be woken up again.
311   ASSERT_LOCK_HELD(&cap->lock);
312   cap->running_task = task;
313
314   r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
315   if (r != 0) {
316     sysErrorBelch("failed to create OS thread");
317     stg_exit(EXIT_FAILURE);
318   }
319
320   debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
321
322   task->id = tid;
323
324   // ok, finished with the Task struct.
325   RELEASE_LOCK(&task->lock);
326 }
327
328 #endif /* THREADED_RTS */
329
330 #ifdef DEBUG
331
332 static void *taskId(Task *task)
333 {
334 #ifdef THREADED_RTS
335     return (void *)task->id;
336 #else
337     return (void *)task;
338 #endif
339 }
340
341 void printAllTasks(void);
342
343 void
344 printAllTasks(void)
345 {
346     Task *task;
347     for (task = all_tasks; task != NULL; task = task->all_link) {
348         debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
349         if (!task->stopped) {
350             if (task->cap) {
351                 debugBelch("on capability %d, ", task->cap->no);
352             }
353             if (task->tso) {
354               debugBelch("bound to thread %lu", (unsigned long)task->tso->id);
355             } else {
356                 debugBelch("worker");
357             }
358         }
359         debugBelch("\n");
360     }
361 }                      
362
363 #endif
364