ef20c09a6e97eb07e3cfaa7b051762030ae12223
[ghc-hetmet.git] / rts / Task.c
1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 2001-2005
4  *
5  * The task manager subsystem.  Tasks execute STG code, with this
6  * module providing the API which the Scheduler uses to control their
7  * creation and destruction.
8  * 
9  * -------------------------------------------------------------------------*/
10
11 #include "Rts.h"
12 #include "RtsUtils.h"
13 #include "OSThreads.h"
14 #include "Task.h"
15 #include "Capability.h"
16 #include "Stats.h"
17 #include "RtsFlags.h"
18 #include "Schedule.h"
19 #include "Hash.h"
20 #include "Trace.h"
21
22 #if HAVE_SIGNAL_H
23 #include <signal.h>
24 #endif
25
26 // Task lists and global counters.
27 // Locks required: sched_mutex.
28 Task *all_tasks = NULL;
29 static Task *task_free_list = NULL; // singly-linked
30 static nat taskCount;
31 #define DEFAULT_MAX_WORKERS 64
32 static nat maxWorkers; // we won't create more workers than this
33 static nat tasksRunning;
34 static nat workerCount;
35
36 /* -----------------------------------------------------------------------------
37  * Remembering the current thread's Task
38  * -------------------------------------------------------------------------- */
39
40 // A thread-local-storage key that we can use to get access to the
41 // current thread's Task structure.
42 #if defined(THREADED_RTS)
43 ThreadLocalKey currentTaskKey;
44 #else
45 Task *my_task;
46 #endif
47
48 /* -----------------------------------------------------------------------------
49  * Rest of the Task API
50  * -------------------------------------------------------------------------- */
51
52 void
53 initTaskManager (void)
54 {
55     static int initialized = 0;
56
57     if (!initialized) {
58         taskCount = 0;
59         workerCount = 0;
60         tasksRunning = 0;
61 #if defined(THREADED_RTS)
62         maxWorkers = DEFAULT_MAX_WORKERS * RtsFlags.ParFlags.nNodes;
63 #else
64         maxWorkers = DEFAULT_MAX_WORKERS;
65 #endif
66         initialized = 1;
67 #if defined(THREADED_RTS)
68         newThreadLocalKey(&currentTaskKey);
69 #endif
70     }
71 }
72
73
74 void
75 stopTaskManager (void)
76 {
77     Task *task, *next;
78
79     debugTrace(DEBUG_sched, 
80                "stopping task manager, %d tasks still running",
81                tasksRunning);
82
83     ACQUIRE_LOCK(&sched_mutex);
84     for (task = task_free_list; task != NULL; next) {
85         next = task->next;
86         stgFree(task);
87     }
88     task_free_list = NULL;
89     RELEASE_LOCK(&sched_mutex);
90 }
91
92
93 static Task*
94 newTask (void)
95 {
96 #if defined(THREADED_RTS)
97     Ticks currentElapsedTime, currentUserTime;
98 #endif
99     Task *task;
100
101     task = stgMallocBytes(sizeof(Task), "newTask");
102     
103     task->cap  = NULL;
104     task->stopped = rtsFalse;
105     task->suspended_tso = NULL;
106     task->tso  = NULL;
107     task->stat = NoStatus;
108     task->ret  = NULL;
109     
110 #if defined(THREADED_RTS)
111     initCondition(&task->cond);
112     initMutex(&task->lock);
113     task->wakeup = rtsFalse;
114 #endif
115
116 #if defined(THREADED_RTS)
117     currentUserTime = getThreadCPUTime();
118     currentElapsedTime = getProcessElapsedTime();
119     task->mut_time = 0;
120     task->mut_etime = 0;
121     task->gc_time = 0;
122     task->gc_etime = 0;
123     task->muttimestart = currentUserTime;
124     task->elapsedtimestart = currentElapsedTime;
125 #endif
126
127     task->prev = NULL;
128     task->next = NULL;
129     task->return_link = NULL;
130
131     task->all_link = all_tasks;
132     all_tasks = task;
133
134     taskCount++;
135     workerCount++;
136
137     return task;
138 }
139
140 Task *
141 newBoundTask (void)
142 {
143     Task *task;
144
145     ASSERT_LOCK_HELD(&sched_mutex);
146     if (task_free_list == NULL) {
147         task = newTask();
148     } else {
149         task = task_free_list;
150         task_free_list = task->next;
151         task->next = NULL;
152         task->prev = NULL;
153         task->stopped = rtsFalse;
154     }
155 #if defined(THREADED_RTS)
156     task->id = osThreadId();
157 #endif
158     ASSERT(task->cap == NULL);
159
160     tasksRunning++;
161
162     taskEnter(task);
163
164     debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
165     return task;
166 }
167
168 void
169 boundTaskExiting (Task *task)
170 {
171     task->stopped = rtsTrue;
172     task->cap = NULL;
173
174 #if defined(THREADED_RTS)
175     ASSERT(osThreadId() == task->id);
176 #endif
177     ASSERT(myTask() == task);
178     setMyTask(task->prev_stack);
179
180     tasksRunning--;
181
182     // sadly, we need a lock around the free task list. Todo: eliminate.
183     ACQUIRE_LOCK(&sched_mutex);
184     task->next = task_free_list;
185     task_free_list = task;
186     RELEASE_LOCK(&sched_mutex);
187
188     debugTrace(DEBUG_sched, "task exiting");
189 }
190
// TASK_ID(t): a value identifying Task `t` in trace output -- the OS
// thread id in the threaded RTS, the Task pointer itself otherwise.
// The expansion is fully parenthesised so it is safe inside any
// enclosing expression.
#if defined(THREADED_RTS)
#define TASK_ID(t) ((t)->id)
#else
#define TASK_ID(t) (t)
#endif
196
197 void
198 discardTask (Task *task)
199 {
200     ASSERT_LOCK_HELD(&sched_mutex);
201     if (!task->stopped) {
202         debugTrace(DEBUG_sched, "discarding task %ld", TASK_ID(task));
203         task->cap = NULL;
204         task->tso = NULL;
205         task->stopped = rtsTrue;
206         tasksRunning--;
207         task->next = task_free_list;
208         task_free_list = task;
209     }
210 }
211
212 void
213 taskTimeStamp (Task *task USED_IF_THREADS)
214 {
215 #if defined(THREADED_RTS)
216     Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
217
218     currentUserTime = getThreadCPUTime();
219     currentElapsedTime = getProcessElapsedTime();
220
221     // XXX this is wrong; we want elapsed GC time since the
222     // Task started.
223     elapsedGCTime = stat_getElapsedGCTime();
224     
225     task->mut_time = 
226         currentUserTime - task->muttimestart - task->gc_time;
227     task->mut_etime = 
228         currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
229
230     if (task->mut_time  < 0) { task->mut_time  = 0; }
231     if (task->mut_etime < 0) { task->mut_etime = 0; }
232 #endif
233 }
234
235 void
236 workerTaskStop (Task *task)
237 {
238 #if defined(THREADED_RTS)
239     OSThreadId id;
240     id = osThreadId();
241     ASSERT(task->id == id);
242     ASSERT(myTask() == task);
243 #endif
244
245     taskTimeStamp(task);
246     task->stopped = rtsTrue;
247     tasksRunning--;
248 }
249
250 void
251 resetTaskManagerAfterFork (void)
252 {
253     // TODO!
254     taskCount = 0;
255 }
256
257 #if defined(THREADED_RTS)
258
259 void
260 startWorkerTask (Capability *cap, 
261                  void OSThreadProcAttr (*taskStart)(Task *task))
262 {
263   int r;
264   OSThreadId tid;
265   Task *task;
266
267   if (workerCount >= maxWorkers) {
268       barf("too many workers; runaway worker creation?");
269   }
270   workerCount++;
271
272   // A worker always gets a fresh Task structure.
273   task = newTask();
274
275   tasksRunning++;
276
277   // The lock here is to synchronise with taskStart(), to make sure
278   // that we have finished setting up the Task structure before the
279   // worker thread reads it.
280   ACQUIRE_LOCK(&task->lock);
281
282   task->cap = cap;
283
284   // Give the capability directly to the worker; we can't let anyone
285   // else get in, because the new worker Task has nowhere to go to
286   // sleep so that it could be woken up again.
287   ASSERT_LOCK_HELD(&cap->lock);
288   cap->running_task = task;
289
290   r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
291   if (r != 0) {
292     barf("startTask: Can't create new task");
293   }
294
295   debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
296
297   task->id = tid;
298
299   // ok, finished with the Task struct.
300   RELEASE_LOCK(&task->lock);
301 }
302
303 #endif /* THREADED_RTS */
304
305 #ifdef DEBUG
306
307 static void *taskId(Task *task)
308 {
309 #ifdef THREADED_RTS
310     return (void *)task->id;
311 #else
312     return (void *)task;
313 #endif
314 }
315
316 void printAllTasks(void);
317
318 void
319 printAllTasks(void)
320 {
321     Task *task;
322     for (task = all_tasks; task != NULL; task = task->all_link) {
323         debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
324         if (!task->stopped) {
325             if (task->cap) {
326                 debugBelch("on capability %d, ", task->cap->no);
327             }
328             if (task->tso) {
329                 debugBelch("bound to thread %d", task->tso->id);
330             } else {
331                 debugBelch("worker");
332             }
333         }
334         debugBelch("\n");
335     }
336 }                      
337
338 #endif
339