a couple more symbols need package names
[ghc-hetmet.git] / rts / Task.c
1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 2001-2005
4  *
5  * The task manager subsystem.  Tasks execute STG code, with this
6  * module providing the API which the Scheduler uses to control their
7  * creation and destruction.
8  * 
9  * -------------------------------------------------------------------------*/
10
11 #include "Rts.h"
12 #include "RtsUtils.h"
13 #include "OSThreads.h"
14 #include "Task.h"
15 #include "Capability.h"
16 #include "Stats.h"
17 #include "RtsFlags.h"
18 #include "Schedule.h"
19 #include "Hash.h"
20 #include "Trace.h"
21
22 #if HAVE_SIGNAL_H
23 #include <signal.h>
24 #endif
25
// Task lists and global counters.
// Locks required: sched_mutex.

// Head of the list of every Task created by newTask(); the list is
// chained through Task.all_link.
Task *all_tasks = NULL;
// Stopped Tasks parked by boundTaskExiting()/discardTask() for reuse by
// newBoundTask(); chained through Task.next.
static Task *task_free_list = NULL; // singly-linked
// Number of Task structures allocated by newTask().
static nat taskCount;
// Per-node worker limit; see initTaskManager() for how maxWorkers is
// derived from it.
#define DEFAULT_MAX_WORKERS 64
static nat maxWorkers; // we won't create more workers than this
// Tasks that have been started and not yet stopped or discarded.
static nat tasksRunning;
// Checked against maxWorkers before startWorkerTask() creates a worker.
static nat workerCount;
35
36 /* -----------------------------------------------------------------------------
37  * Remembering the current thread's Task
38  * -------------------------------------------------------------------------- */
39
// A thread-local-storage key that we can use to get access to the
// current thread's Task structure.
//
// In the threaded RTS the key is created once by initTaskManager().
// In the non-threaded RTS a single global Task pointer is declared
// instead (presumably read and written through myTask()/setMyTask();
// confirm against Task.h).
#if defined(THREADED_RTS)
ThreadLocalKey currentTaskKey;
#else
Task *my_task;
#endif
47
48 /* -----------------------------------------------------------------------------
49  * Rest of the Task API
50  * -------------------------------------------------------------------------- */
51
52 void
53 initTaskManager (void)
54 {
55     static int initialized = 0;
56
57     if (!initialized) {
58         taskCount = 0;
59         workerCount = 0;
60         tasksRunning = 0;
61 #if defined(THREADED_RTS)
62         maxWorkers = DEFAULT_MAX_WORKERS * RtsFlags.ParFlags.nNodes;
63 #else
64         maxWorkers = DEFAULT_MAX_WORKERS;
65 #endif
66         initialized = 1;
67 #if defined(THREADED_RTS)
68         newThreadLocalKey(&currentTaskKey);
69 #endif
70     }
71 }
72
73
74 void
75 stopTaskManager (void)
76 {
77     debugTrace(DEBUG_sched, 
78                "stopping task manager, %d tasks still running",
79                tasksRunning);
80 }
81
82
83 static Task*
84 newTask (void)
85 {
86 #if defined(THREADED_RTS)
87     Ticks currentElapsedTime, currentUserTime;
88 #endif
89     Task *task;
90
91     task = stgMallocBytes(sizeof(Task), "newTask");
92     
93     task->cap  = NULL;
94     task->stopped = rtsFalse;
95     task->suspended_tso = NULL;
96     task->tso  = NULL;
97     task->stat = NoStatus;
98     task->ret  = NULL;
99     
100 #if defined(THREADED_RTS)
101     initCondition(&task->cond);
102     initMutex(&task->lock);
103     task->wakeup = rtsFalse;
104 #endif
105
106 #if defined(THREADED_RTS)
107     currentUserTime = getThreadCPUTime();
108     currentElapsedTime = getProcessElapsedTime();
109     task->mut_time = 0;
110     task->mut_etime = 0;
111     task->gc_time = 0;
112     task->gc_etime = 0;
113     task->muttimestart = currentUserTime;
114     task->elapsedtimestart = currentElapsedTime;
115 #endif
116
117     task->prev = NULL;
118     task->next = NULL;
119     task->return_link = NULL;
120
121     task->all_link = all_tasks;
122     all_tasks = task;
123
124     taskCount++;
125     workerCount++;
126
127     return task;
128 }
129
130 Task *
131 newBoundTask (void)
132 {
133     Task *task;
134
135     ASSERT_LOCK_HELD(&sched_mutex);
136     if (task_free_list == NULL) {
137         task = newTask();
138     } else {
139         task = task_free_list;
140         task_free_list = task->next;
141         task->next = NULL;
142         task->prev = NULL;
143         task->stopped = rtsFalse;
144     }
145 #if defined(THREADED_RTS)
146     task->id = osThreadId();
147 #endif
148     ASSERT(task->cap == NULL);
149
150     tasksRunning++;
151
152     taskEnter(task);
153
154     debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
155     return task;
156 }
157
158 void
159 boundTaskExiting (Task *task)
160 {
161     task->stopped = rtsTrue;
162     task->cap = NULL;
163
164 #if defined(THREADED_RTS)
165     ASSERT(osThreadId() == task->id);
166 #endif
167     ASSERT(myTask() == task);
168     setMyTask(task->prev_stack);
169
170     tasksRunning--;
171
172     // sadly, we need a lock around the free task list. Todo: eliminate.
173     ACQUIRE_LOCK(&sched_mutex);
174     task->next = task_free_list;
175     task_free_list = task;
176     RELEASE_LOCK(&sched_mutex);
177
178     debugTrace(DEBUG_sched, "task exiting");
179 }
180
181 #ifdef THREADED_RTS
182 #define TASK_ID(t) (t)->id
183 #else
184 #define TASK_ID(t) (t)
185 #endif
186
187 void
188 discardTask (Task *task)
189 {
190     ASSERT_LOCK_HELD(&sched_mutex);
191     if (!task->stopped) {
192         debugTrace(DEBUG_sched, "discarding task %ld", TASK_ID(task));
193         task->cap = NULL;
194         task->tso = NULL;
195         task->stopped = rtsTrue;
196         tasksRunning--;
197         task->next = task_free_list;
198         task_free_list = task;
199     }
200 }
201
202 void
203 taskTimeStamp (Task *task USED_IF_THREADS)
204 {
205 #if defined(THREADED_RTS)
206     Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
207
208     currentUserTime = getThreadCPUTime();
209     currentElapsedTime = getProcessElapsedTime();
210
211     // XXX this is wrong; we want elapsed GC time since the
212     // Task started.
213     elapsedGCTime = stat_getElapsedGCTime();
214     
215     task->mut_time = 
216         currentUserTime - task->muttimestart - task->gc_time;
217     task->mut_etime = 
218         currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
219
220     if (task->mut_time  < 0) { task->mut_time  = 0; }
221     if (task->mut_etime < 0) { task->mut_etime = 0; }
222 #endif
223 }
224
225 void
226 workerTaskStop (Task *task)
227 {
228 #if defined(THREADED_RTS)
229     OSThreadId id;
230     id = osThreadId();
231     ASSERT(task->id == id);
232     ASSERT(myTask() == task);
233 #endif
234
235     taskTimeStamp(task);
236     task->stopped = rtsTrue;
237     tasksRunning--;
238 }
239
240 void
241 resetTaskManagerAfterFork (void)
242 {
243     // TODO!
244     taskCount = 0;
245 }
246
247 #if defined(THREADED_RTS)
248
249 void
250 startWorkerTask (Capability *cap, 
251                  void OSThreadProcAttr (*taskStart)(Task *task))
252 {
253   int r;
254   OSThreadId tid;
255   Task *task;
256
257   if (workerCount >= maxWorkers) {
258       barf("too many workers; runaway worker creation?");
259   }
260   workerCount++;
261
262   // A worker always gets a fresh Task structure.
263   task = newTask();
264
265   tasksRunning++;
266
267   // The lock here is to synchronise with taskStart(), to make sure
268   // that we have finished setting up the Task structure before the
269   // worker thread reads it.
270   ACQUIRE_LOCK(&task->lock);
271
272   task->cap = cap;
273
274   // Give the capability directly to the worker; we can't let anyone
275   // else get in, because the new worker Task has nowhere to go to
276   // sleep so that it could be woken up again.
277   ASSERT_LOCK_HELD(&cap->lock);
278   cap->running_task = task;
279
280   r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
281   if (r != 0) {
282     barf("startTask: Can't create new task");
283   }
284
285   debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
286
287   task->id = tid;
288
289   // ok, finished with the Task struct.
290   RELEASE_LOCK(&task->lock);
291 }
292
293 #endif /* THREADED_RTS */
294
295 #ifdef DEBUG
296
297 static void *taskId(Task *task)
298 {
299 #ifdef THREADED_RTS
300     return (void *)task->id;
301 #else
302     return (void *)task;
303 #endif
304 }
305
306 void printAllTasks(void);
307
308 void
309 printAllTasks(void)
310 {
311     Task *task;
312     for (task = all_tasks; task != NULL; task = task->all_link) {
313         debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
314         if (!task->stopped) {
315             if (task->cap) {
316                 debugBelch("on capability %d, ", task->cap->no);
317             }
318             if (task->tso) {
319                 debugBelch("bound to thread %d", task->tso->id);
320             } else {
321                 debugBelch("worker");
322             }
323         }
324         debugBelch("\n");
325     }
326 }                      
327
328 #endif
329