1 /* -----------------------------------------------------------------------------
3 * (c) The GHC Team 2001-2005
5 * The task manager subsystem. Tasks execute STG code, with this
6 * module providing the API which the Scheduler uses to control their
7 * creation and destruction.
9 * -------------------------------------------------------------------------*/
11 #include "PosixSource.h"
16 #include "Capability.h"
26 // Task lists and global counters.
27 // Locks required: sched_mutex.
28 Task *all_tasks = NULL;
29 static Task *task_free_list = NULL; // singly-linked
31 static nat tasksRunning;
32 static nat workerCount;
33 static int tasksInitialized = 0;
35 /* -----------------------------------------------------------------------------
36 * Remembering the current thread's Task
37 * -------------------------------------------------------------------------- */
39 // A thread-local-storage key that we can use to get access to the
40 // current thread's Task structure.
41 #if defined(THREADED_RTS)
42 ThreadLocalKey currentTaskKey;
47 /* -----------------------------------------------------------------------------
48 * Rest of the Task API
49 * -------------------------------------------------------------------------- */
52 initTaskManager (void)
54 if (!tasksInitialized) {
59 #if defined(THREADED_RTS)
60 newThreadLocalKey(¤tTaskKey);
66 freeTaskManager (void)
70 ASSERT_LOCK_HELD(&sched_mutex);
72 debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
75 for (task = all_tasks; task != NULL; task = next) {
76 next = task->all_link;
78 // We only free resources if the Task is not in use. A
79 // Task may still be in use if we have a Haskell thread in
80 // a foreign call while we are attempting to shut down the
82 #if defined(THREADED_RTS)
83 closeCondition(&task->cond);
84 closeMutex(&task->lock);
90 task_free_list = NULL;
91 #if defined(THREADED_RTS)
92 freeThreadLocalKey(¤tTaskKey);
104 #if defined(THREADED_RTS)
105 Ticks currentElapsedTime, currentUserTime;
109 #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
110 task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
113 task->stopped = rtsFalse;
114 task->suspended_tso = NULL;
116 task->stat = NoStatus;
119 #if defined(THREADED_RTS)
120 initCondition(&task->cond);
121 initMutex(&task->lock);
122 task->wakeup = rtsFalse;
125 #if defined(THREADED_RTS)
126 currentUserTime = getThreadCPUTime();
127 currentElapsedTime = getProcessElapsedTime();
132 task->muttimestart = currentUserTime;
133 task->elapsedtimestart = currentElapsedTime;
138 task->return_link = NULL;
140 task->all_link = all_tasks;
153 if (!tasksInitialized) {
154 errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
155 stg_exit(EXIT_FAILURE);
158 // ToDo: get rid of this lock in the common case. We could store
159 // a free Task in thread-local storage, for example. That would
160 // leave just one lock on the path into the RTS: cap->lock when
161 // acquiring the Capability.
162 ACQUIRE_LOCK(&sched_mutex);
164 if (task_free_list == NULL) {
167 task = task_free_list;
168 task_free_list = task->next;
171 task->stopped = rtsFalse;
173 #if defined(THREADED_RTS)
174 task->id = osThreadId();
176 ASSERT(task->cap == NULL);
182 RELEASE_LOCK(&sched_mutex);
184 debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
189 boundTaskExiting (Task *task)
192 task->stopped = rtsTrue;
195 #if defined(THREADED_RTS)
196 ASSERT(osThreadId() == task->id);
198 ASSERT(myTask() == task);
199 setMyTask(task->prev_stack);
203 // sadly, we need a lock around the free task list. Todo: eliminate.
204 ACQUIRE_LOCK(&sched_mutex);
205 task->next = task_free_list;
206 task_free_list = task;
207 RELEASE_LOCK(&sched_mutex);
209 debugTrace(DEBUG_sched, "task exiting");
213 #define TASK_ID(t) (t)->id
215 #define TASK_ID(t) (t)
219 discardTask (Task *task)
221 ASSERT_LOCK_HELD(&sched_mutex);
222 if (!task->stopped) {
223 debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
225 if (task->tso == NULL) {
230 task->stopped = rtsTrue;
232 task->next = task_free_list;
233 task_free_list = task;
238 taskTimeStamp (Task *task USED_IF_THREADS)
240 #if defined(THREADED_RTS)
241 Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
243 currentUserTime = getThreadCPUTime();
244 currentElapsedTime = getProcessElapsedTime();
246 // XXX this is wrong; we want elapsed GC time since the
248 elapsedGCTime = stat_getElapsedGCTime();
251 currentUserTime - task->muttimestart - task->gc_time;
253 currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
255 if (task->mut_time < 0) { task->mut_time = 0; }
256 if (task->mut_etime < 0) { task->mut_etime = 0; }
260 #if defined(THREADED_RTS)
263 workerTaskStop (Task *task)
267 ASSERT(task->id == id);
268 ASSERT(myTask() == task);
272 task->stopped = rtsTrue;
276 ACQUIRE_LOCK(&sched_mutex);
277 task->next = task_free_list;
278 task_free_list = task;
279 RELEASE_LOCK(&sched_mutex);
284 #if defined(THREADED_RTS)
287 startWorkerTask (Capability *cap,
288 void OSThreadProcAttr (*taskStart)(Task *task))
296 // A worker always gets a fresh Task structure.
301 // The lock here is to synchronise with taskStart(), to make sure
302 // that we have finished setting up the Task structure before the
303 // worker thread reads it.
304 ACQUIRE_LOCK(&task->lock);
308 // Give the capability directly to the worker; we can't let anyone
309 // else get in, because the new worker Task has nowhere to go to
310 // sleep so that it could be woken up again.
311 ASSERT_LOCK_HELD(&cap->lock);
312 cap->running_task = task;
314 r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
316 sysErrorBelch("failed to create OS thread");
317 stg_exit(EXIT_FAILURE);
320 debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
324 // ok, finished with the Task struct.
325 RELEASE_LOCK(&task->lock);
328 #endif /* THREADED_RTS */
332 static void *taskId(Task *task)
335 return (void *)task->id;
341 void printAllTasks(void);
347 for (task = all_tasks; task != NULL; task = task->all_link) {
348 debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
349 if (!task->stopped) {
351 debugBelch("on capability %d, ", task->cap->no);
354 debugBelch("bound to thread %lu", (unsigned long)task->tso->id);
356 debugBelch("worker");