af94a8aabed967b0101ceb523f75923d799ff5fa
[ghc-hetmet.git] / rts / Task.c
1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 2001-2005
4  *
5  * The task manager subsystem.  Tasks execute STG code, with this
6  * module providing the API which the Scheduler uses to control their
7  * creation and destruction.
8  * 
9  * -------------------------------------------------------------------------*/
10
11 #include "Rts.h"
12 #include "RtsUtils.h"
13 #include "OSThreads.h"
14 #include "Task.h"
15 #include "Capability.h"
16 #include "Stats.h"
17 #include "RtsFlags.h"
18 #include "Storage.h"
19 #include "Schedule.h"
20 #include "Hash.h"
21 #include "Trace.h"
22
23 #if HAVE_SIGNAL_H
24 #include <signal.h>
25 #endif
26
27 // Task lists and global counters.
28 // Locks required: sched_mutex.
29 Task *all_tasks = NULL;
30 static Task *task_free_list = NULL; // singly-linked
31 static nat taskCount;
32 static nat tasksRunning;
33 static nat workerCount;
34 static int tasksInitialized = 0;
35
36 /* -----------------------------------------------------------------------------
37  * Remembering the current thread's Task
38  * -------------------------------------------------------------------------- */
39
40 // A thread-local-storage key that we can use to get access to the
41 // current thread's Task structure.
42 #if defined(THREADED_RTS)
43 ThreadLocalKey currentTaskKey;
44 #else
45 Task *my_task;
46 #endif
47
48 /* -----------------------------------------------------------------------------
49  * Rest of the Task API
50  * -------------------------------------------------------------------------- */
51
52 void
53 initTaskManager (void)
54 {
55     if (!tasksInitialized) {
56         taskCount = 0;
57         workerCount = 0;
58         tasksRunning = 0;
59         tasksInitialized = 1;
60 #if defined(THREADED_RTS)
61         newThreadLocalKey(&currentTaskKey);
62 #endif
63     }
64 }
65
66 nat
67 freeTaskManager (void)
68 {
69     Task *task, *next;
70
71     ASSERT_LOCK_HELD(&sched_mutex);
72
73     debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
74                tasksRunning);
75
76     for (task = all_tasks; task != NULL; task = next) {
77         next = task->all_link;
78         if (task->stopped) {
79             // We only free resources if the Task is not in use.  A
80             // Task may still be in use if we have a Haskell thread in
81             // a foreign call while we are attempting to shut down the
82             // RTS (see conc059).
83 #if defined(THREADED_RTS)
84             closeCondition(&task->cond);
85             closeMutex(&task->lock);
86 #endif
87             stgFree(task);
88         }
89     }
90     all_tasks = NULL;
91     task_free_list = NULL;
92 #if defined(THREADED_RTS)
93     freeThreadLocalKey(&currentTaskKey);
94 #endif
95
96     tasksInitialized = 0;
97
98     return tasksRunning;
99 }
100
101
102 static Task*
103 newTask (void)
104 {
105 #if defined(THREADED_RTS)
106     Ticks currentElapsedTime, currentUserTime;
107 #endif
108     Task *task;
109
110 #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
111     task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
112     
113     task->cap  = NULL;
114     task->stopped = rtsFalse;
115     task->suspended_tso = NULL;
116     task->tso  = NULL;
117     task->stat = NoStatus;
118     task->ret  = NULL;
119     
120 #if defined(THREADED_RTS)
121     initCondition(&task->cond);
122     initMutex(&task->lock);
123     task->wakeup = rtsFalse;
124 #endif
125
126 #if defined(THREADED_RTS)
127     currentUserTime = getThreadCPUTime();
128     currentElapsedTime = getProcessElapsedTime();
129     task->mut_time = 0;
130     task->mut_etime = 0;
131     task->gc_time = 0;
132     task->gc_etime = 0;
133     task->muttimestart = currentUserTime;
134     task->elapsedtimestart = currentElapsedTime;
135 #endif
136
137     task->prev = NULL;
138     task->next = NULL;
139     task->return_link = NULL;
140
141     task->all_link = all_tasks;
142     all_tasks = task;
143
144     taskCount++;
145
146     return task;
147 }
148
149 Task *
150 newBoundTask (void)
151 {
152     Task *task;
153
154     if (!tasksInitialized) {
155         errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
156         stg_exit(EXIT_FAILURE);
157     }
158
159     // ToDo: get rid of this lock in the common case.  We could store
160     // a free Task in thread-local storage, for example.  That would
161     // leave just one lock on the path into the RTS: cap->lock when
162     // acquiring the Capability.
163     ACQUIRE_LOCK(&sched_mutex);
164
165     if (task_free_list == NULL) {
166         task = newTask();
167     } else {
168         task = task_free_list;
169         task_free_list = task->next;
170         task->next = NULL;
171         task->prev = NULL;
172         task->stopped = rtsFalse;
173     }
174 #if defined(THREADED_RTS)
175     task->id = osThreadId();
176 #endif
177     ASSERT(task->cap == NULL);
178
179     tasksRunning++;
180
181     taskEnter(task);
182
183     RELEASE_LOCK(&sched_mutex);
184
185     debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
186     return task;
187 }
188
189 void
190 boundTaskExiting (Task *task)
191 {
192     task->tso = NULL;
193     task->stopped = rtsTrue;
194     task->cap = NULL;
195
196 #if defined(THREADED_RTS)
197     ASSERT(osThreadId() == task->id);
198 #endif
199     ASSERT(myTask() == task);
200     setMyTask(task->prev_stack);
201
202     tasksRunning--;
203
204     // sadly, we need a lock around the free task list. Todo: eliminate.
205     ACQUIRE_LOCK(&sched_mutex);
206     task->next = task_free_list;
207     task_free_list = task;
208     RELEASE_LOCK(&sched_mutex);
209
210     debugTrace(DEBUG_sched, "task exiting");
211 }
212
213 #ifdef THREADED_RTS
214 #define TASK_ID(t) (t)->id
215 #else
216 #define TASK_ID(t) (t)
217 #endif
218
219 void
220 discardTask (Task *task)
221 {
222     ASSERT_LOCK_HELD(&sched_mutex);
223     if (!task->stopped) {
224         debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
225         task->cap = NULL;
226         if (task->tso == NULL) {
227             workerCount--;
228         } else {
229             task->tso = NULL;
230         }
231         task->stopped = rtsTrue;
232         tasksRunning--;
233         task->next = task_free_list;
234         task_free_list = task;
235     }
236 }
237
238 void
239 taskTimeStamp (Task *task USED_IF_THREADS)
240 {
241 #if defined(THREADED_RTS)
242     Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
243
244     currentUserTime = getThreadCPUTime();
245     currentElapsedTime = getProcessElapsedTime();
246
247     // XXX this is wrong; we want elapsed GC time since the
248     // Task started.
249     elapsedGCTime = stat_getElapsedGCTime();
250     
251     task->mut_time = 
252         currentUserTime - task->muttimestart - task->gc_time;
253     task->mut_etime = 
254         currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
255
256     if (task->mut_time  < 0) { task->mut_time  = 0; }
257     if (task->mut_etime < 0) { task->mut_etime = 0; }
258 #endif
259 }
260
261 void
262 workerTaskStop (Task *task)
263 {
264 #if defined(THREADED_RTS)
265     OSThreadId id;
266     id = osThreadId();
267     ASSERT(task->id == id);
268     ASSERT(myTask() == task);
269 #endif
270
271     task->cap = NULL;
272     taskTimeStamp(task);
273     task->stopped = rtsTrue;
274     tasksRunning--;
275     workerCount--;
276
277     ACQUIRE_LOCK(&sched_mutex);
278     task->next = task_free_list;
279     task_free_list = task;
280     RELEASE_LOCK(&sched_mutex);
281 }
282
283 void
284 resetTaskManagerAfterFork (void)
285 {
286     // TODO!
287     taskCount = 0;
288 }
289
290 #if defined(THREADED_RTS)
291
292 void
293 startWorkerTask (Capability *cap, 
294                  void OSThreadProcAttr (*taskStart)(Task *task))
295 {
296   int r;
297   OSThreadId tid;
298   Task *task;
299
300   workerCount++;
301
302   // A worker always gets a fresh Task structure.
303   task = newTask();
304
305   tasksRunning++;
306
307   // The lock here is to synchronise with taskStart(), to make sure
308   // that we have finished setting up the Task structure before the
309   // worker thread reads it.
310   ACQUIRE_LOCK(&task->lock);
311
312   task->cap = cap;
313
314   // Give the capability directly to the worker; we can't let anyone
315   // else get in, because the new worker Task has nowhere to go to
316   // sleep so that it could be woken up again.
317   ASSERT_LOCK_HELD(&cap->lock);
318   cap->running_task = task;
319
320   r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
321   if (r != 0) {
322     sysErrorBelch("failed to create OS thread");
323     stg_exit(EXIT_FAILURE);
324   }
325
326   debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
327
328   task->id = tid;
329
330   // ok, finished with the Task struct.
331   RELEASE_LOCK(&task->lock);
332 }
333
334 #endif /* THREADED_RTS */
335
336 #ifdef DEBUG
337
338 static void *taskId(Task *task)
339 {
340 #ifdef THREADED_RTS
341     return (void *)task->id;
342 #else
343     return (void *)task;
344 #endif
345 }
346
347 void printAllTasks(void);
348
349 void
350 printAllTasks(void)
351 {
352     Task *task;
353     for (task = all_tasks; task != NULL; task = task->all_link) {
354         debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
355         if (!task->stopped) {
356             if (task->cap) {
357                 debugBelch("on capability %d, ", task->cap->no);
358             }
359             if (task->tso) {
360               debugBelch("bound to thread %lu", (unsigned long)task->tso->id);
361             } else {
362                 debugBelch("worker");
363             }
364         }
365         debugBelch("\n");
366     }
367 }                      
368
369 #endif
370