1 /* -----------------------------------------------------------------------------
3 * (c) The GHC Team 2001-2005
5 * The task manager subsystem. Tasks execute STG code, with this
6 * module providing the API which the Scheduler uses to control their
7 * creation and destruction.
9 * -------------------------------------------------------------------------*/
11 #include "PosixSource.h"
16 #include "Capability.h"
26 // Task lists and global counters.
27 // Locks required: all_tasks_mutex.
28 Task *all_tasks = NULL;
30 static int tasksInitialized = 0;
32 static void freeTask (Task *task);
33 static Task * allocTask (void);
34 static Task * newTask (rtsBool);
36 #if defined(THREADED_RTS)
37 static Mutex all_tasks_mutex;
40 /* -----------------------------------------------------------------------------
41 * Remembering the current thread's Task
42 * -------------------------------------------------------------------------- */
44 // A thread-local-storage key that we can use to get access to the
45 // current thread's Task structure.
46 #if defined(THREADED_RTS)
47 # if defined(MYTASK_USE_TLV)
48 __thread Task *my_task;
50 ThreadLocalKey currentTaskKey;
56 /* -----------------------------------------------------------------------------
57 * Rest of the Task API
58 * -------------------------------------------------------------------------- */
61 initTaskManager (void)
63 if (!tasksInitialized) {
66 #if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
67 newThreadLocalKey(¤tTaskKey);
68 initMutex(&all_tasks_mutex);
74 freeTaskManager (void)
79 ACQUIRE_LOCK(&all_tasks_mutex);
81 for (task = all_tasks; task != NULL; task = next) {
82 next = task->all_link;
90 debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
95 RELEASE_LOCK(&all_tasks_mutex);
97 #if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
98 closeMutex(&all_tasks_mutex);
99 freeThreadLocalKey(¤tTaskKey);
102 tasksInitialized = 0;
116 task = newTask(rtsFalse);
117 #if defined(THREADED_RTS)
118 task->id = osThreadId();
126 freeTask (Task *task)
128 InCall *incall, *next;
130 // We only free resources if the Task is not in use. A
131 // Task may still be in use if we have a Haskell thread in
132 // a foreign call while we are attempting to shut down the
133 // RTS (see conc059).
134 #if defined(THREADED_RTS)
135 closeCondition(&task->cond);
136 closeMutex(&task->lock);
139 for (incall = task->incall; incall != NULL; incall = next) {
140 next = incall->prev_stack;
143 for (incall = task->spare_incalls; incall != NULL; incall = next) {
152 newTask (rtsBool worker)
154 #if defined(THREADED_RTS)
155 Ticks currentElapsedTime, currentUserTime;
159 #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
160 task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
163 task->worker = worker;
164 task->stopped = rtsFalse;
165 task->running_finalizers = rtsFalse;
166 task->n_spare_incalls = 0;
167 task->spare_incalls = NULL;
170 #if defined(THREADED_RTS)
171 initCondition(&task->cond);
172 initMutex(&task->lock);
173 task->wakeup = rtsFalse;
176 #if defined(THREADED_RTS)
177 currentUserTime = getThreadCPUTime();
178 currentElapsedTime = getProcessElapsedTime();
183 task->muttimestart = currentUserTime;
184 task->elapsedtimestart = currentElapsedTime;
189 ACQUIRE_LOCK(&all_tasks_mutex);
191 task->all_link = all_tasks;
196 RELEASE_LOCK(&all_tasks_mutex);
201 // avoid the spare_incalls list growing unboundedly
202 #define MAX_SPARE_INCALLS 8
205 newInCall (Task *task)
209 if (task->spare_incalls != NULL) {
210 incall = task->spare_incalls;
211 task->spare_incalls = incall->next;
212 task->n_spare_incalls--;
214 incall = stgMallocBytes((sizeof(InCall)), "newBoundTask");
219 incall->suspended_tso = NULL;
220 incall->suspended_cap = NULL;
221 incall->stat = NoStatus;
225 incall->prev_stack = task->incall;
226 task->incall = incall;
230 endInCall (Task *task)
234 incall = task->incall;
236 task->incall = task->incall->prev_stack;
238 if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
241 incall->next = task->spare_incalls;
242 task->spare_incalls = incall;
243 task->n_spare_incalls++;
253 if (!tasksInitialized) {
254 errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
255 stg_exit(EXIT_FAILURE);
260 task->stopped = rtsFalse;
264 debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
269 boundTaskExiting (Task *task)
271 task->stopped = rtsTrue;
273 #if defined(THREADED_RTS)
274 ASSERT(osThreadId() == task->id);
276 ASSERT(myTask() == task);
280 debugTrace(DEBUG_sched, "task exiting");
285 #define TASK_ID(t) (t)->id
287 #define TASK_ID(t) (t)
291 discardTasksExcept (Task *keep)
295 // Wipe the task list, except the current Task.
296 ACQUIRE_LOCK(&all_tasks_mutex);
297 for (task = all_tasks; task != NULL; task=next) {
298 next = task->all_link;
300 debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
305 keep->all_link = NULL;
306 RELEASE_LOCK(&all_tasks_mutex);
310 taskTimeStamp (Task *task USED_IF_THREADS)
312 #if defined(THREADED_RTS)
313 Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
315 currentUserTime = getThreadCPUTime();
316 currentElapsedTime = getProcessElapsedTime();
318 // XXX this is wrong; we want elapsed GC time since the
320 elapsedGCTime = stat_getElapsedGCTime();
323 currentUserTime - task->muttimestart - task->gc_time;
325 currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
327 if (task->mut_time < 0) { task->mut_time = 0; }
328 if (task->mut_etime < 0) { task->mut_etime = 0; }
332 #if defined(THREADED_RTS)
335 workerTaskStop (Task *task)
339 ASSERT(task->id == id);
340 ASSERT(myTask() == task);
344 task->stopped = rtsTrue;
349 #if defined(THREADED_RTS)
351 static void OSThreadProcAttr
352 workerStart(Task *task)
356 // See startWorkerTask().
357 ACQUIRE_LOCK(&task->lock);
359 RELEASE_LOCK(&task->lock);
361 if (RtsFlags.ParFlags.setAffinity) {
362 setThreadAffinity(cap->no, n_capabilities);
365 // set the thread-local pointer to the Task:
370 scheduleWorker(cap,task);
374 startWorkerTask (Capability *cap)
380 // A worker always gets a fresh Task structure.
381 task = newTask(rtsTrue);
383 // The lock here is to synchronise with taskStart(), to make sure
384 // that we have finished setting up the Task structure before the
385 // worker thread reads it.
386 ACQUIRE_LOCK(&task->lock);
390 // Give the capability directly to the worker; we can't let anyone
391 // else get in, because the new worker Task has nowhere to go to
392 // sleep so that it could be woken up again.
393 ASSERT_LOCK_HELD(&cap->lock);
394 cap->running_task = task;
396 r = createOSThread(&tid, (OSThreadProc*)workerStart, task);
398 sysErrorBelch("failed to create OS thread");
399 stg_exit(EXIT_FAILURE);
402 debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
406 // ok, finished with the Task struct.
407 RELEASE_LOCK(&task->lock);
410 #endif /* THREADED_RTS */
414 static void *taskId(Task *task)
417 return (void *)task->id;
423 void printAllTasks(void);
429 for (task = all_tasks; task != NULL; task = task->all_link) {
430 debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
431 if (!task->stopped) {
433 debugBelch("on capability %d, ", task->cap->no);
435 if (task->incall->tso) {
436 debugBelch("bound to thread %lu",
437 (unsigned long)task->incall->tso->id);
439 debugBelch("worker");