1 /* -----------------------------------------------------------------------------
3 * (c) The GHC Team 2001-2005
5 * The task manager subsystem. Tasks execute STG code, with this
6 * module providing the API which the Scheduler uses to control their
7 * creation and destruction.
9 * -------------------------------------------------------------------------*/
11 #include "PosixSource.h"
16 #include "Capability.h"
26 // Task lists and global counters.
27 // Locks required: sched_mutex.
28 Task *all_tasks = NULL;
30 static int tasksInitialized = 0;
32 static void freeTask (Task *task);
33 static Task * allocTask (void);
34 static Task * newTask (rtsBool);
36 /* -----------------------------------------------------------------------------
37 * Remembering the current thread's Task
38 * -------------------------------------------------------------------------- */
40 // A thread-local-storage key that we can use to get access to the
41 // current thread's Task structure.
42 #if defined(THREADED_RTS)
43 # if defined(MYTASK_USE_TLV)
44 __thread Task *my_task;
46 ThreadLocalKey currentTaskKey;
52 /* -----------------------------------------------------------------------------
53 * Rest of the Task API
54 * -------------------------------------------------------------------------- */
57 initTaskManager (void)
59 if (!tasksInitialized) {
62 #if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
63 newThreadLocalKey(¤tTaskKey);
69 freeTaskManager (void)
74 ASSERT_LOCK_HELD(&sched_mutex);
76 for (task = all_tasks; task != NULL; task = next) {
77 next = task->all_link;
85 debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
89 #if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
90 freeThreadLocalKey(¤tTaskKey);
107 task = newTask(rtsFalse);
108 #if defined(THREADED_RTS)
109 task->id = osThreadId();
117 freeTask (Task *task)
119 InCall *incall, *next;
121 // We only free resources if the Task is not in use. A
122 // Task may still be in use if we have a Haskell thread in
123 // a foreign call while we are attempting to shut down the
124 // RTS (see conc059).
125 #if defined(THREADED_RTS)
126 closeCondition(&task->cond);
127 closeMutex(&task->lock);
130 for (incall = task->incall; incall != NULL; incall = next) {
131 next = incall->prev_stack;
134 for (incall = task->spare_incalls; incall != NULL; incall = next) {
143 newTask (rtsBool worker)
145 #if defined(THREADED_RTS)
146 Ticks currentElapsedTime, currentUserTime;
150 #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
151 task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
154 task->worker = worker;
155 task->stopped = rtsFalse;
156 task->running_finalizers = rtsFalse;
157 task->n_spare_incalls = 0;
158 task->spare_incalls = NULL;
161 #if defined(THREADED_RTS)
162 initCondition(&task->cond);
163 initMutex(&task->lock);
164 task->wakeup = rtsFalse;
167 #if defined(THREADED_RTS)
168 currentUserTime = getThreadCPUTime();
169 currentElapsedTime = getProcessElapsedTime();
174 task->muttimestart = currentUserTime;
175 task->elapsedtimestart = currentElapsedTime;
180 ACQUIRE_LOCK(&sched_mutex);
182 task->all_link = all_tasks;
187 RELEASE_LOCK(&sched_mutex);
192 // avoid the spare_incalls list growing unboundedly
193 #define MAX_SPARE_INCALLS 8
196 newInCall (Task *task)
200 if (task->spare_incalls != NULL) {
201 incall = task->spare_incalls;
202 task->spare_incalls = incall->next;
203 task->n_spare_incalls--;
205 incall = stgMallocBytes((sizeof(InCall)), "newBoundTask");
210 incall->suspended_tso = NULL;
211 incall->suspended_cap = NULL;
212 incall->stat = NoStatus;
216 incall->prev_stack = task->incall;
217 task->incall = incall;
221 endInCall (Task *task)
225 incall = task->incall;
227 task->incall = task->incall->prev_stack;
229 if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
232 incall->next = task->spare_incalls;
233 task->spare_incalls = incall;
234 task->n_spare_incalls++;
244 if (!tasksInitialized) {
245 errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
246 stg_exit(EXIT_FAILURE);
251 task->stopped = rtsFalse;
255 debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
260 boundTaskExiting (Task *task)
262 task->stopped = rtsTrue;
264 #if defined(THREADED_RTS)
265 ASSERT(osThreadId() == task->id);
267 ASSERT(myTask() == task);
271 debugTrace(DEBUG_sched, "task exiting");
276 #define TASK_ID(t) (t)->id
278 #define TASK_ID(t) (t)
282 discardTasksExcept (Task *keep)
286 // Wipe the task list, except the current Task.
287 ACQUIRE_LOCK(&sched_mutex);
288 for (task = all_tasks; task != NULL; task=next) {
289 next = task->all_link;
291 debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
296 keep->all_link = NULL;
297 RELEASE_LOCK(&sched_mutex);
301 taskTimeStamp (Task *task USED_IF_THREADS)
303 #if defined(THREADED_RTS)
304 Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
306 currentUserTime = getThreadCPUTime();
307 currentElapsedTime = getProcessElapsedTime();
309 // XXX this is wrong; we want elapsed GC time since the
311 elapsedGCTime = stat_getElapsedGCTime();
314 currentUserTime - task->muttimestart - task->gc_time;
316 currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
318 if (task->mut_time < 0) { task->mut_time = 0; }
319 if (task->mut_etime < 0) { task->mut_etime = 0; }
323 #if defined(THREADED_RTS)
326 workerTaskStop (Task *task)
330 ASSERT(task->id == id);
331 ASSERT(myTask() == task);
335 task->stopped = rtsTrue;
340 #if defined(THREADED_RTS)
342 static void OSThreadProcAttr
343 workerStart(Task *task)
347 // See startWorkerTask().
348 ACQUIRE_LOCK(&task->lock);
350 RELEASE_LOCK(&task->lock);
352 if (RtsFlags.ParFlags.setAffinity) {
353 setThreadAffinity(cap->no, n_capabilities);
356 // set the thread-local pointer to the Task:
361 scheduleWorker(cap,task);
365 startWorkerTask (Capability *cap)
371 // A worker always gets a fresh Task structure.
372 task = newTask(rtsTrue);
374 // The lock here is to synchronise with taskStart(), to make sure
375 // that we have finished setting up the Task structure before the
376 // worker thread reads it.
377 ACQUIRE_LOCK(&task->lock);
381 // Give the capability directly to the worker; we can't let anyone
382 // else get in, because the new worker Task has nowhere to go to
383 // sleep so that it could be woken up again.
384 ASSERT_LOCK_HELD(&cap->lock);
385 cap->running_task = task;
387 r = createOSThread(&tid, (OSThreadProc*)workerStart, task);
389 sysErrorBelch("failed to create OS thread");
390 stg_exit(EXIT_FAILURE);
393 debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
397 // ok, finished with the Task struct.
398 RELEASE_LOCK(&task->lock);
401 #endif /* THREADED_RTS */
405 static void *taskId(Task *task)
408 return (void *)task->id;
414 void printAllTasks(void);
420 for (task = all_tasks; task != NULL; task = task->all_link) {
421 debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
422 if (!task->stopped) {
424 debugBelch("on capability %d, ", task->cap->no);
426 if (task->incall->tso) {
427 debugBelch("bound to thread %lu",
428 (unsigned long)task->incall->tso->id);
430 debugBelch("worker");