Remove the artificial cap on the number of workers
[ghc-hetmet.git] / rts / Task.c
1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 2001-2005
4  *
5  * The task manager subsystem.  Tasks execute STG code, with this
6  * module providing the API which the Scheduler uses to control their
7  * creation and destruction.
8  * 
9  * -------------------------------------------------------------------------*/
10
11 #include "Rts.h"
12 #include "RtsUtils.h"
13 #include "OSThreads.h"
14 #include "Task.h"
15 #include "Capability.h"
16 #include "Stats.h"
17 #include "RtsFlags.h"
18 #include "Schedule.h"
19 #include "Hash.h"
20 #include "Trace.h"
21
22 #if HAVE_SIGNAL_H
23 #include <signal.h>
24 #endif
25
26 // Task lists and global counters.
27 // Locks required: sched_mutex.
28 Task *all_tasks = NULL;
29 static Task *task_free_list = NULL; // singly-linked
30 static nat taskCount;
31 static nat tasksRunning;
32 static nat workerCount;
33
34 /* -----------------------------------------------------------------------------
35  * Remembering the current thread's Task
36  * -------------------------------------------------------------------------- */
37
38 // A thread-local-storage key that we can use to get access to the
39 // current thread's Task structure.
40 #if defined(THREADED_RTS)
41 ThreadLocalKey currentTaskKey;
42 #else
43 Task *my_task;
44 #endif
45
46 /* -----------------------------------------------------------------------------
47  * Rest of the Task API
48  * -------------------------------------------------------------------------- */
49
50 void
51 initTaskManager (void)
52 {
53     static int initialized = 0;
54
55     if (!initialized) {
56         taskCount = 0;
57         workerCount = 0;
58         tasksRunning = 0;
59         initialized = 1;
60 #if defined(THREADED_RTS)
61         newThreadLocalKey(&currentTaskKey);
62 #endif
63     }
64 }
65
66
67 void
68 stopTaskManager (void)
69 {
70     Task *task, *next;
71
72     debugTrace(DEBUG_sched, 
73                "stopping task manager, %d tasks still running",
74                tasksRunning);
75
76     ACQUIRE_LOCK(&sched_mutex);
77     for (task = task_free_list; task != NULL; next) {
78         next = task->next;
79         stgFree(task);
80     }
81     task_free_list = NULL;
82     RELEASE_LOCK(&sched_mutex);
83 }
84
85
86 static Task*
87 newTask (void)
88 {
89 #if defined(THREADED_RTS)
90     Ticks currentElapsedTime, currentUserTime;
91 #endif
92     Task *task;
93
94     task = stgMallocBytes(sizeof(Task), "newTask");
95     
96     task->cap  = NULL;
97     task->stopped = rtsFalse;
98     task->suspended_tso = NULL;
99     task->tso  = NULL;
100     task->stat = NoStatus;
101     task->ret  = NULL;
102     
103 #if defined(THREADED_RTS)
104     initCondition(&task->cond);
105     initMutex(&task->lock);
106     task->wakeup = rtsFalse;
107 #endif
108
109 #if defined(THREADED_RTS)
110     currentUserTime = getThreadCPUTime();
111     currentElapsedTime = getProcessElapsedTime();
112     task->mut_time = 0;
113     task->mut_etime = 0;
114     task->gc_time = 0;
115     task->gc_etime = 0;
116     task->muttimestart = currentUserTime;
117     task->elapsedtimestart = currentElapsedTime;
118 #endif
119
120     task->prev = NULL;
121     task->next = NULL;
122     task->return_link = NULL;
123
124     task->all_link = all_tasks;
125     all_tasks = task;
126
127     taskCount++;
128     workerCount++;
129
130     return task;
131 }
132
133 Task *
134 newBoundTask (void)
135 {
136     Task *task;
137
138     ASSERT_LOCK_HELD(&sched_mutex);
139     if (task_free_list == NULL) {
140         task = newTask();
141     } else {
142         task = task_free_list;
143         task_free_list = task->next;
144         task->next = NULL;
145         task->prev = NULL;
146         task->stopped = rtsFalse;
147     }
148 #if defined(THREADED_RTS)
149     task->id = osThreadId();
150 #endif
151     ASSERT(task->cap == NULL);
152
153     tasksRunning++;
154
155     taskEnter(task);
156
157     debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
158     return task;
159 }
160
161 void
162 boundTaskExiting (Task *task)
163 {
164     task->stopped = rtsTrue;
165     task->cap = NULL;
166
167 #if defined(THREADED_RTS)
168     ASSERT(osThreadId() == task->id);
169 #endif
170     ASSERT(myTask() == task);
171     setMyTask(task->prev_stack);
172
173     tasksRunning--;
174
175     // sadly, we need a lock around the free task list. Todo: eliminate.
176     ACQUIRE_LOCK(&sched_mutex);
177     task->next = task_free_list;
178     task_free_list = task;
179     RELEASE_LOCK(&sched_mutex);
180
181     debugTrace(DEBUG_sched, "task exiting");
182 }
183
184 #ifdef THREADED_RTS
185 #define TASK_ID(t) (t)->id
186 #else
187 #define TASK_ID(t) (t)
188 #endif
189
190 void
191 discardTask (Task *task)
192 {
193     ASSERT_LOCK_HELD(&sched_mutex);
194     if (!task->stopped) {
195         debugTrace(DEBUG_sched, "discarding task %ld", TASK_ID(task));
196         task->cap = NULL;
197         task->tso = NULL;
198         task->stopped = rtsTrue;
199         tasksRunning--;
200         task->next = task_free_list;
201         task_free_list = task;
202     }
203 }
204
205 void
206 taskTimeStamp (Task *task USED_IF_THREADS)
207 {
208 #if defined(THREADED_RTS)
209     Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
210
211     currentUserTime = getThreadCPUTime();
212     currentElapsedTime = getProcessElapsedTime();
213
214     // XXX this is wrong; we want elapsed GC time since the
215     // Task started.
216     elapsedGCTime = stat_getElapsedGCTime();
217     
218     task->mut_time = 
219         currentUserTime - task->muttimestart - task->gc_time;
220     task->mut_etime = 
221         currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
222
223     if (task->mut_time  < 0) { task->mut_time  = 0; }
224     if (task->mut_etime < 0) { task->mut_etime = 0; }
225 #endif
226 }
227
228 void
229 workerTaskStop (Task *task)
230 {
231 #if defined(THREADED_RTS)
232     OSThreadId id;
233     id = osThreadId();
234     ASSERT(task->id == id);
235     ASSERT(myTask() == task);
236 #endif
237
238     taskTimeStamp(task);
239     task->stopped = rtsTrue;
240     tasksRunning--;
241 }
242
243 void
244 resetTaskManagerAfterFork (void)
245 {
246     // TODO!
247     taskCount = 0;
248 }
249
250 #if defined(THREADED_RTS)
251
252 void
253 startWorkerTask (Capability *cap, 
254                  void OSThreadProcAttr (*taskStart)(Task *task))
255 {
256   int r;
257   OSThreadId tid;
258   Task *task;
259
260   workerCount++;
261
262   // A worker always gets a fresh Task structure.
263   task = newTask();
264
265   tasksRunning++;
266
267   // The lock here is to synchronise with taskStart(), to make sure
268   // that we have finished setting up the Task structure before the
269   // worker thread reads it.
270   ACQUIRE_LOCK(&task->lock);
271
272   task->cap = cap;
273
274   // Give the capability directly to the worker; we can't let anyone
275   // else get in, because the new worker Task has nowhere to go to
276   // sleep so that it could be woken up again.
277   ASSERT_LOCK_HELD(&cap->lock);
278   cap->running_task = task;
279
280   r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
281   if (r != 0) {
282     barf("startTask: Can't create new task");
283   }
284
285   debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
286
287   task->id = tid;
288
289   // ok, finished with the Task struct.
290   RELEASE_LOCK(&task->lock);
291 }
292
293 #endif /* THREADED_RTS */
294
295 #ifdef DEBUG
296
297 static void *taskId(Task *task)
298 {
299 #ifdef THREADED_RTS
300     return (void *)task->id;
301 #else
302     return (void *)task;
303 #endif
304 }
305
306 void printAllTasks(void);
307
308 void
309 printAllTasks(void)
310 {
311     Task *task;
312     for (task = all_tasks; task != NULL; task = task->all_link) {
313         debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
314         if (!task->stopped) {
315             if (task->cap) {
316                 debugBelch("on capability %d, ", task->cap->no);
317             }
318             if (task->tso) {
319                 debugBelch("bound to thread %d", task->tso->id);
320             } else {
321                 debugBelch("worker");
322             }
323         }
324         debugBelch("\n");
325     }
326 }                      
327
328 #endif
329