1 /* -----------------------------------------------------------------------------
3 * (c) The GHC Team 2001-2005
5 * The task manager subsystem. Tasks execute STG code, with this
6 * module providing the API which the Scheduler uses to control their
7 * creation and destruction.
9 * -------------------------------------------------------------------------*/
13 #include "OSThreads.h"
15 #include "Capability.h"
26 // Task lists and global counters.
27 // Locks required: sched_mutex.
28 Task *all_tasks = NULL;
29 static Task *task_free_list = NULL; // singly-linked
31 #define DEFAULT_MAX_WORKERS 64
32 static nat maxWorkers; // we won't create more workers than this
33 static nat tasksRunning;
34 static nat workerCount;
36 /* -----------------------------------------------------------------------------
37 * Remembering the current thread's Task
38 * -------------------------------------------------------------------------- */
40 // A thread-local-storage key that we can use to get access to the
41 // current thread's Task structure.
42 #if defined(THREADED_RTS)
43 ThreadLocalKey currentTaskKey;
48 /* -----------------------------------------------------------------------------
49 * Rest of the Task API
50 * -------------------------------------------------------------------------- */
53 initTaskManager (void)
55 static int initialized = 0;
61 #if defined(THREADED_RTS)
62 maxWorkers = DEFAULT_MAX_WORKERS * RtsFlags.ParFlags.nNodes;
64 maxWorkers = DEFAULT_MAX_WORKERS;
67 #if defined(THREADED_RTS)
68 newThreadLocalKey(¤tTaskKey);
75 stopTaskManager (void)
79 debugTrace(DEBUG_sched,
80 "stopping task manager, %d tasks still running",
83 ACQUIRE_LOCK(&sched_mutex);
84 for (task = task_free_list; task != NULL; next) {
88 task_free_list = NULL;
89 RELEASE_LOCK(&sched_mutex);
96 #if defined(THREADED_RTS)
97 Ticks currentElapsedTime, currentUserTime;
101 task = stgMallocBytes(sizeof(Task), "newTask");
104 task->stopped = rtsFalse;
105 task->suspended_tso = NULL;
107 task->stat = NoStatus;
110 #if defined(THREADED_RTS)
111 initCondition(&task->cond);
112 initMutex(&task->lock);
113 task->wakeup = rtsFalse;
116 #if defined(THREADED_RTS)
117 currentUserTime = getThreadCPUTime();
118 currentElapsedTime = getProcessElapsedTime();
123 task->muttimestart = currentUserTime;
124 task->elapsedtimestart = currentElapsedTime;
129 task->return_link = NULL;
131 task->all_link = all_tasks;
145 ASSERT_LOCK_HELD(&sched_mutex);
146 if (task_free_list == NULL) {
149 task = task_free_list;
150 task_free_list = task->next;
153 task->stopped = rtsFalse;
155 #if defined(THREADED_RTS)
156 task->id = osThreadId();
158 ASSERT(task->cap == NULL);
164 debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
169 boundTaskExiting (Task *task)
171 task->stopped = rtsTrue;
174 #if defined(THREADED_RTS)
175 ASSERT(osThreadId() == task->id);
177 ASSERT(myTask() == task);
178 setMyTask(task->prev_stack);
182 // sadly, we need a lock around the free task list. Todo: eliminate.
183 ACQUIRE_LOCK(&sched_mutex);
184 task->next = task_free_list;
185 task_free_list = task;
186 RELEASE_LOCK(&sched_mutex);
188 debugTrace(DEBUG_sched, "task exiting");
192 #define TASK_ID(t) (t)->id
194 #define TASK_ID(t) (t)
198 discardTask (Task *task)
200 ASSERT_LOCK_HELD(&sched_mutex);
201 if (!task->stopped) {
202 debugTrace(DEBUG_sched, "discarding task %ld", TASK_ID(task));
205 task->stopped = rtsTrue;
207 task->next = task_free_list;
208 task_free_list = task;
213 taskTimeStamp (Task *task USED_IF_THREADS)
215 #if defined(THREADED_RTS)
216 Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
218 currentUserTime = getThreadCPUTime();
219 currentElapsedTime = getProcessElapsedTime();
221 // XXX this is wrong; we want elapsed GC time since the
223 elapsedGCTime = stat_getElapsedGCTime();
226 currentUserTime - task->muttimestart - task->gc_time;
228 currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
230 if (task->mut_time < 0) { task->mut_time = 0; }
231 if (task->mut_etime < 0) { task->mut_etime = 0; }
236 workerTaskStop (Task *task)
238 #if defined(THREADED_RTS)
241 ASSERT(task->id == id);
242 ASSERT(myTask() == task);
246 task->stopped = rtsTrue;
251 resetTaskManagerAfterFork (void)
257 #if defined(THREADED_RTS)
260 startWorkerTask (Capability *cap,
261 void OSThreadProcAttr (*taskStart)(Task *task))
267 if (workerCount >= maxWorkers) {
268 barf("too many workers; runaway worker creation?");
272 // A worker always gets a fresh Task structure.
277 // The lock here is to synchronise with taskStart(), to make sure
278 // that we have finished setting up the Task structure before the
279 // worker thread reads it.
280 ACQUIRE_LOCK(&task->lock);
284 // Give the capability directly to the worker; we can't let anyone
285 // else get in, because the new worker Task has nowhere to go to
286 // sleep so that it could be woken up again.
287 ASSERT_LOCK_HELD(&cap->lock);
288 cap->running_task = task;
290 r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
292 barf("startTask: Can't create new task");
295 debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
299 // ok, finished with the Task struct.
300 RELEASE_LOCK(&task->lock);
303 #endif /* THREADED_RTS */
307 static void *taskId(Task *task)
310 return (void *)task->id;
316 void printAllTasks(void);
322 for (task = all_tasks; task != NULL; task = task->all_link) {
323 debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
324 if (!task->stopped) {
326 debugBelch("on capability %d, ", task->cap->no);
329 debugBelch("bound to thread %d", task->tso->id);
331 debugBelch("worker");