11307a7703bd60d2ce5edd73dce3baaf4aea7ca1
[ghc-hetmet.git] / rts / Task.c
1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 2001-2005
4  *
5  * The task manager subsystem.  Tasks execute STG code, with this
6  * module providing the API which the Scheduler uses to control their
7  * creation and destruction.
8  * 
9  * -------------------------------------------------------------------------*/
10
11 #include "Rts.h"
12 #include "RtsUtils.h"
13 #include "OSThreads.h"
14 #include "Task.h"
15 #include "Capability.h"
16 #include "Stats.h"
17 #include "RtsFlags.h"
18 #include "Schedule.h"
19 #include "Hash.h"
20 #include "Trace.h"
21
22 #if HAVE_SIGNAL_H
23 #include <signal.h>
24 #endif
25
26 // Task lists and global counters.
27 // Locks required: sched_mutex.
28 Task *all_tasks = NULL;
29 static Task *task_free_list = NULL; // singly-linked
30 static nat taskCount;
31 static nat tasksRunning;
32 static nat workerCount;
33
34 /* -----------------------------------------------------------------------------
35  * Remembering the current thread's Task
36  * -------------------------------------------------------------------------- */
37
38 // A thread-local-storage key that we can use to get access to the
39 // current thread's Task structure.
40 #if defined(THREADED_RTS)
41 ThreadLocalKey currentTaskKey;
42 #else
43 Task *my_task;
44 #endif
45
46 /* -----------------------------------------------------------------------------
47  * Rest of the Task API
48  * -------------------------------------------------------------------------- */
49
50 void
51 initTaskManager (void)
52 {
53     static int initialized = 0;
54
55     if (!initialized) {
56         taskCount = 0;
57         workerCount = 0;
58         tasksRunning = 0;
59         initialized = 1;
60 #if defined(THREADED_RTS)
61         newThreadLocalKey(&currentTaskKey);
62 #endif
63     }
64 }
65
66
67 void
68 stopTaskManager (void)
69 {
70     Task *task, *next;
71
72     debugTrace(DEBUG_sched, 
73                "stopping task manager, %d tasks still running",
74                tasksRunning);
75
76     ACQUIRE_LOCK(&sched_mutex);
77     for (task = task_free_list; task != NULL; task = next) {
78         next = task->next;
79 #if defined(THREADED_RTS)
80         closeCondition(&task->cond);
81         closeMutex(&task->lock);
82 #endif
83         stgFree(task);
84     }
85     task_free_list = NULL;
86     RELEASE_LOCK(&sched_mutex);
87 }
88
89
90 static Task*
91 newTask (void)
92 {
93 #if defined(THREADED_RTS)
94     Ticks currentElapsedTime, currentUserTime;
95 #endif
96     Task *task;
97
98     task = stgMallocBytes(sizeof(Task), "newTask");
99     
100     task->cap  = NULL;
101     task->stopped = rtsFalse;
102     task->suspended_tso = NULL;
103     task->tso  = NULL;
104     task->stat = NoStatus;
105     task->ret  = NULL;
106     
107 #if defined(THREADED_RTS)
108     initCondition(&task->cond);
109     initMutex(&task->lock);
110     task->wakeup = rtsFalse;
111 #endif
112
113 #if defined(THREADED_RTS)
114     currentUserTime = getThreadCPUTime();
115     currentElapsedTime = getProcessElapsedTime();
116     task->mut_time = 0;
117     task->mut_etime = 0;
118     task->gc_time = 0;
119     task->gc_etime = 0;
120     task->muttimestart = currentUserTime;
121     task->elapsedtimestart = currentElapsedTime;
122 #endif
123
124     task->prev = NULL;
125     task->next = NULL;
126     task->return_link = NULL;
127
128     task->all_link = all_tasks;
129     all_tasks = task;
130
131     taskCount++;
132     workerCount++;
133
134     return task;
135 }
136
137 Task *
138 newBoundTask (void)
139 {
140     Task *task;
141
142     ASSERT_LOCK_HELD(&sched_mutex);
143     if (task_free_list == NULL) {
144         task = newTask();
145     } else {
146         task = task_free_list;
147         task_free_list = task->next;
148         task->next = NULL;
149         task->prev = NULL;
150         task->stopped = rtsFalse;
151     }
152 #if defined(THREADED_RTS)
153     task->id = osThreadId();
154 #endif
155     ASSERT(task->cap == NULL);
156
157     tasksRunning++;
158
159     taskEnter(task);
160
161     debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
162     return task;
163 }
164
165 void
166 boundTaskExiting (Task *task)
167 {
168     task->stopped = rtsTrue;
169     task->cap = NULL;
170
171 #if defined(THREADED_RTS)
172     ASSERT(osThreadId() == task->id);
173 #endif
174     ASSERT(myTask() == task);
175     setMyTask(task->prev_stack);
176
177     tasksRunning--;
178
179     // sadly, we need a lock around the free task list. Todo: eliminate.
180     ACQUIRE_LOCK(&sched_mutex);
181     task->next = task_free_list;
182     task_free_list = task;
183     RELEASE_LOCK(&sched_mutex);
184
185     debugTrace(DEBUG_sched, "task exiting");
186 }
187
// Value used to identify a Task in trace output: the stored thread id
// in the threaded RTS, otherwise the Task pointer itself.
#if defined(THREADED_RTS)
#define TASK_ID(t) ((t)->id)
#else
#define TASK_ID(t) (t)
#endif
193
194 void
195 discardTask (Task *task)
196 {
197     ASSERT_LOCK_HELD(&sched_mutex);
198     if (!task->stopped) {
199         debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
200         task->cap = NULL;
201         task->tso = NULL;
202         task->stopped = rtsTrue;
203         tasksRunning--;
204         task->next = task_free_list;
205         task_free_list = task;
206     }
207 }
208
209 void
210 taskTimeStamp (Task *task USED_IF_THREADS)
211 {
212 #if defined(THREADED_RTS)
213     Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
214
215     currentUserTime = getThreadCPUTime();
216     currentElapsedTime = getProcessElapsedTime();
217
218     // XXX this is wrong; we want elapsed GC time since the
219     // Task started.
220     elapsedGCTime = stat_getElapsedGCTime();
221     
222     task->mut_time = 
223         currentUserTime - task->muttimestart - task->gc_time;
224     task->mut_etime = 
225         currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
226
227     if (task->mut_time  < 0) { task->mut_time  = 0; }
228     if (task->mut_etime < 0) { task->mut_etime = 0; }
229 #endif
230 }
231
232 void
233 workerTaskStop (Task *task)
234 {
235 #if defined(THREADED_RTS)
236     OSThreadId id;
237     id = osThreadId();
238     ASSERT(task->id == id);
239     ASSERT(myTask() == task);
240 #endif
241
242     taskTimeStamp(task);
243     task->stopped = rtsTrue;
244     tasksRunning--;
245 }
246
247 void
248 resetTaskManagerAfterFork (void)
249 {
250     // TODO!
251     taskCount = 0;
252 }
253
254 #if defined(THREADED_RTS)
255
256 void
257 startWorkerTask (Capability *cap, 
258                  void OSThreadProcAttr (*taskStart)(Task *task))
259 {
260   int r;
261   OSThreadId tid;
262   Task *task;
263
264   workerCount++;
265
266   // A worker always gets a fresh Task structure.
267   task = newTask();
268
269   tasksRunning++;
270
271   // The lock here is to synchronise with taskStart(), to make sure
272   // that we have finished setting up the Task structure before the
273   // worker thread reads it.
274   ACQUIRE_LOCK(&task->lock);
275
276   task->cap = cap;
277
278   // Give the capability directly to the worker; we can't let anyone
279   // else get in, because the new worker Task has nowhere to go to
280   // sleep so that it could be woken up again.
281   ASSERT_LOCK_HELD(&cap->lock);
282   cap->running_task = task;
283
284   r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
285   if (r != 0) {
286     sysErrorBelch("failed to create OS thread");
287     stg_exit(EXIT_FAILURE);
288   }
289
290   debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
291
292   task->id = tid;
293
294   // ok, finished with the Task struct.
295   RELEASE_LOCK(&task->lock);
296 }
297
298 #endif /* THREADED_RTS */
299
300 #ifdef DEBUG
301
302 static void *taskId(Task *task)
303 {
304 #ifdef THREADED_RTS
305     return (void *)task->id;
306 #else
307     return (void *)task;
308 #endif
309 }
310
311 void printAllTasks(void);
312
313 void
314 printAllTasks(void)
315 {
316     Task *task;
317     for (task = all_tasks; task != NULL; task = task->all_link) {
318         debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
319         if (!task->stopped) {
320             if (task->cap) {
321                 debugBelch("on capability %d, ", task->cap->no);
322             }
323             if (task->tso) {
324               debugBelch("bound to thread %lu", (unsigned long)task->tso->id);
325             } else {
326                 debugBelch("worker");
327             }
328         }
329         debugBelch("\n");
330     }
331 }                      
332
333 #endif
334