[project @ 2005-10-21 14:02:17 by simonmar]
[ghc-hetmet.git] / ghc / rts / Task.c
1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 2001-2005
4  *
5  * The task manager subsystem.  Tasks execute STG code, with this
6  * module providing the API which the Scheduler uses to control their
7  * creation and destruction.
8  * 
9  * -------------------------------------------------------------------------*/
10
11 #include "Rts.h"
12 #include "RtsUtils.h"
13 #include "OSThreads.h"
14 #include "Task.h"
15 #include "Capability.h"
16 #include "Stats.h"
17 #include "RtsFlags.h"
18 #include "Schedule.h"
19 #include "Hash.h"
20
21 #if HAVE_SIGNAL_H
22 #include <signal.h>
23 #endif
24
25 // Task lists and global counters.
26 // Locks required: sched_mutex.
27 Task *all_tasks = NULL;
28 static Task *task_free_list = NULL; // singly-linked
29 static nat taskCount;
30 #define DEFAULT_MAX_WORKERS 64
31 static nat maxWorkers; // we won't create more workers than this
32 static nat tasksRunning;
33 static nat workerCount;
34
35 /* -----------------------------------------------------------------------------
36  * Remembering the current thread's Task
37  * -------------------------------------------------------------------------- */
38
39 // A thread-local-storage key that we can use to get access to the
40 // current thread's Task structure.
41 #if defined(THREADED_RTS)
42 ThreadLocalKey currentTaskKey;
43 #else
44 Task *my_task;
45 #endif
46
47 /* -----------------------------------------------------------------------------
48  * Rest of the Task API
49  * -------------------------------------------------------------------------- */
50
51 void
52 initTaskManager (void)
53 {
54     static int initialized = 0;
55
56     if (!initialized) {
57         taskCount = 0;
58         workerCount = 0;
59         tasksRunning = 0;
60         maxWorkers = DEFAULT_MAX_WORKERS;
61         initialized = 1;
62 #if defined(THREADED_RTS)
63         newThreadLocalKey(&currentTaskKey);
64 #endif
65     }
66 }
67
68
69 void
70 stopTaskManager (void)
71 {
72     IF_DEBUG(scheduler, sched_belch("stopping task manager, %d tasks still running", tasksRunning));
73 }
74
75
76 static Task*
77 newTask (void)
78 {
79 #if defined(THREADED_RTS)
80     long currentElapsedTime, currentUserTime, elapsedGCTime;
81 #endif
82     Task *task;
83
84     task = stgMallocBytes(sizeof(Task), "newTask");
85     
86     task->cap  = NULL;
87     task->stopped = rtsFalse;
88     task->suspended_tso = NULL;
89     task->tso  = NULL;
90     task->stat = NoStatus;
91     task->ret  = NULL;
92     
93 #if defined(THREADED_RTS)
94     initCondition(&task->cond);
95     initMutex(&task->lock);
96     task->wakeup = rtsFalse;
97 #endif
98
99 #if defined(THREADED_RTS)
100     stat_getTimes(&currentElapsedTime, &currentUserTime, &elapsedGCTime);
101     task->mut_time = 0.0;
102     task->mut_etime = 0.0;
103     task->gc_time = 0.0;
104     task->gc_etime = 0.0;
105     task->muttimestart = currentUserTime;
106     task->elapsedtimestart = currentElapsedTime;
107 #endif
108
109     task->prev = NULL;
110     task->next = NULL;
111     task->return_link = NULL;
112
113     task->all_link = all_tasks;
114     all_tasks = task;
115
116     taskCount++;
117     workerCount++;
118
119     return task;
120 }
121
122 Task *
123 newBoundTask (void)
124 {
125     Task *task;
126
127     ASSERT_LOCK_HELD(&sched_mutex);
128     if (task_free_list == NULL) {
129         task = newTask();
130     } else {
131         task = task_free_list;
132         task_free_list = task->next;
133         task->next = NULL;
134         task->prev = NULL;
135         task->stopped = rtsFalse;
136     }
137 #if defined(THREADED_RTS)
138     task->id = osThreadId();
139 #endif
140     ASSERT(task->cap == NULL);
141
142     tasksRunning++;
143
144     taskEnter(task);
145
146     IF_DEBUG(scheduler,sched_belch("new task (taskCount: %d)", taskCount););
147     return task;
148 }
149
150 void
151 boundTaskExiting (Task *task)
152 {
153     task->stopped = rtsTrue;
154     task->cap = NULL;
155
156 #if defined(THREADED_RTS)
157     ASSERT(osThreadId() == task->id);
158 #endif
159     ASSERT(myTask() == task);
160     setMyTask(task->prev_stack);
161
162     tasksRunning--;
163
164     // sadly, we need a lock around the free task list. Todo: eliminate.
165     ACQUIRE_LOCK(&sched_mutex);
166     task->next = task_free_list;
167     task_free_list = task;
168     RELEASE_LOCK(&sched_mutex);
169
170     IF_DEBUG(scheduler,sched_belch("task exiting"));
171 }
172
173 void
174 discardTask (Task *task)
175 {
176     ASSERT_LOCK_HELD(&sched_mutex);
177 #if defined(THREADED_RTS)
178     closeCondition(&task->cond);
179 #endif
180     task->stopped = rtsTrue;
181     task->cap = NULL;
182     task->next = task_free_list;
183     task_free_list = task;
184 }
185
186 void
187 taskStop (Task *task)
188 {
189 #if defined(THREADED_RTS)
190     OSThreadId id;
191     long currentElapsedTime, currentUserTime, elapsedGCTime;
192
193     id = osThreadId();
194     ASSERT(task->id == id);
195     ASSERT(myTask() == task);
196
197     stat_getTimes(&currentElapsedTime, &currentUserTime, &elapsedGCTime);
198     
199     task->mut_time = 
200         currentUserTime - task->muttimestart - task->gc_time;
201     task->mut_etime = 
202         currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
203
204     if (task->mut_time < 0.0)  { task->mut_time = 0.0;  }
205     if (task->mut_etime < 0.0) { task->mut_etime = 0.0; }
206 #endif
207
208     task->stopped = rtsTrue;
209     tasksRunning--;
210 }
211
212 void
213 resetTaskManagerAfterFork (void)
214 {
215 #warning TODO!
216     taskCount = 0;
217 }
218
219 #if defined(THREADED_RTS)
220
221 void
222 startWorkerTask (Capability *cap, 
223                  void OSThreadProcAttr (*taskStart)(Task *task))
224 {
225   int r;
226   OSThreadId tid;
227   Task *task;
228
229   if (workerCount >= maxWorkers) {
230       barf("too many workers; runaway worker creation?");
231   }
232   workerCount++;
233
234   // A worker always gets a fresh Task structure.
235   task = newTask();
236
237   tasksRunning++;
238
239   // The lock here is to synchronise with taskStart(), to make sure
240   // that we have finished setting up the Task structure before the
241   // worker thread reads it.
242   ACQUIRE_LOCK(&task->lock);
243
244   task->cap = cap;
245
246   // Give the capability directly to the worker; we can't let anyone
247   // else get in, because the new worker Task has nowhere to go to
248   // sleep so that it could be woken up again.
249   ASSERT_LOCK_HELD(&cap->lock);
250   cap->running_task = task;
251
252   r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
253   if (r != 0) {
254     barf("startTask: Can't create new task");
255   }
256
257   IF_DEBUG(scheduler,sched_belch("new worker task (taskCount: %d)", taskCount););
258
259   task->id = tid;
260
261   // ok, finished with the Task struct.
262   RELEASE_LOCK(&task->lock);
263 }
264
265 #endif /* THREADED_RTS */
266
267 #ifdef DEBUG
268
269 static void *taskId(Task *task)
270 {
271 #ifdef THREADED_RTS
272     return (void *)task->id;
273 #else
274     return (void *)task;
275 #endif
276 }
277
278 void printAllTasks(void);
279
280 void
281 printAllTasks(void)
282 {
283     Task *task;
284     for (task = all_tasks; task != NULL; task = task->all_link) {
285         debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
286         if (!task->stopped) {
287             if (task->cap) {
288                 debugBelch("on capability %d, ", task->cap->no);
289             }
290             if (task->tso) {
291                 debugBelch("bound to thread %d", task->tso->id);
292             } else {
293                 debugBelch("worker");
294             }
295         }
296         debugBelch("\n");
297     }
298 }                      
299
300 #endif
301