1 /* -----------------------------------------------------------------------------
3 * (c) The GHC Team 2001-2005
5 * The task manager subsystem. Tasks execute STG code, with this
6 * module providing the API which the Scheduler uses to control their
7 * creation and destruction.
9 * -------------------------------------------------------------------------*/
11 #include "PosixSource.h"
16 #include "Capability.h"
26 // Task lists and global counters.
27 // Locks required: all_tasks_mutex.
28 Task *all_tasks = NULL;
30 static int tasksInitialized = 0;
32 static void freeTask (Task *task);
33 static Task * allocTask (void);
34 static Task * newTask (rtsBool);
36 #if defined(THREADED_RTS)
37 static Mutex all_tasks_mutex;
40 /* -----------------------------------------------------------------------------
41 * Remembering the current thread's Task
42 * -------------------------------------------------------------------------- */
44 // A thread-local-storage key that we can use to get access to the
45 // current thread's Task structure.
46 #if defined(THREADED_RTS)
47 # if defined(MYTASK_USE_TLV)
48 __thread Task *my_task;
50 ThreadLocalKey currentTaskKey;
56 /* -----------------------------------------------------------------------------
57 * Rest of the Task API
58 * -------------------------------------------------------------------------- */
61 initTaskManager (void)
63 if (!tasksInitialized) {
66 #if defined(THREADED_RTS)
67 #if !defined(MYTASK_USE_TLV)
68 newThreadLocalKey(¤tTaskKey);
70 initMutex(&all_tasks_mutex);
76 freeTaskManager (void)
81 ACQUIRE_LOCK(&all_tasks_mutex);
83 for (task = all_tasks; task != NULL; task = next) {
84 next = task->all_link;
92 debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
97 RELEASE_LOCK(&all_tasks_mutex);
99 #if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
100 closeMutex(&all_tasks_mutex);
101 freeThreadLocalKey(¤tTaskKey);
104 tasksInitialized = 0;
118 task = newTask(rtsFalse);
119 #if defined(THREADED_RTS)
120 task->id = osThreadId();
128 freeTask (Task *task)
130 InCall *incall, *next;
132 // We only free resources if the Task is not in use. A
133 // Task may still be in use if we have a Haskell thread in
134 // a foreign call while we are attempting to shut down the
135 // RTS (see conc059).
136 #if defined(THREADED_RTS)
137 closeCondition(&task->cond);
138 closeMutex(&task->lock);
141 for (incall = task->incall; incall != NULL; incall = next) {
142 next = incall->prev_stack;
145 for (incall = task->spare_incalls; incall != NULL; incall = next) {
154 newTask (rtsBool worker)
156 #if defined(THREADED_RTS)
157 Ticks currentElapsedTime, currentUserTime;
161 #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
162 task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
165 task->worker = worker;
166 task->stopped = rtsFalse;
167 task->running_finalizers = rtsFalse;
168 task->n_spare_incalls = 0;
169 task->spare_incalls = NULL;
172 #if defined(THREADED_RTS)
173 initCondition(&task->cond);
174 initMutex(&task->lock);
175 task->wakeup = rtsFalse;
178 #if defined(THREADED_RTS)
179 currentUserTime = getThreadCPUTime();
180 currentElapsedTime = getProcessElapsedTime();
185 task->muttimestart = currentUserTime;
186 task->elapsedtimestart = currentElapsedTime;
191 ACQUIRE_LOCK(&all_tasks_mutex);
193 task->all_link = all_tasks;
198 RELEASE_LOCK(&all_tasks_mutex);
203 // avoid the spare_incalls list growing unboundedly
204 #define MAX_SPARE_INCALLS 8
207 newInCall (Task *task)
211 if (task->spare_incalls != NULL) {
212 incall = task->spare_incalls;
213 task->spare_incalls = incall->next;
214 task->n_spare_incalls--;
216 incall = stgMallocBytes((sizeof(InCall)), "newBoundTask");
221 incall->suspended_tso = NULL;
222 incall->suspended_cap = NULL;
223 incall->stat = NoStatus;
227 incall->prev_stack = task->incall;
228 task->incall = incall;
232 endInCall (Task *task)
236 incall = task->incall;
238 task->incall = task->incall->prev_stack;
240 if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
243 incall->next = task->spare_incalls;
244 task->spare_incalls = incall;
245 task->n_spare_incalls++;
255 if (!tasksInitialized) {
256 errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
257 stg_exit(EXIT_FAILURE);
262 task->stopped = rtsFalse;
266 debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
271 boundTaskExiting (Task *task)
273 #if defined(THREADED_RTS)
274 ASSERT(osThreadId() == task->id);
276 ASSERT(myTask() == task);
280 // Set task->stopped, but only if this is the last call (#4850).
281 // Remember that we might have a worker Task that makes a foreign
282 // call and then a callback, so it can transform into a bound
283 // Task for the duration of the callback.
284 if (task->incall == NULL) {
285 task->stopped = rtsTrue;
288 debugTrace(DEBUG_sched, "task exiting");
293 #define TASK_ID(t) (t)->id
295 #define TASK_ID(t) (t)
299 discardTasksExcept (Task *keep)
303 // Wipe the task list, except the current Task.
304 ACQUIRE_LOCK(&all_tasks_mutex);
305 for (task = all_tasks; task != NULL; task=next) {
306 next = task->all_link;
308 debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
313 keep->all_link = NULL;
314 RELEASE_LOCK(&all_tasks_mutex);
318 taskTimeStamp (Task *task USED_IF_THREADS)
320 #if defined(THREADED_RTS)
321 Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
323 currentUserTime = getThreadCPUTime();
324 currentElapsedTime = getProcessElapsedTime();
326 // XXX this is wrong; we want elapsed GC time since the
328 elapsedGCTime = stat_getElapsedGCTime();
331 currentUserTime - task->muttimestart - task->gc_time;
333 currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
335 if (task->mut_time < 0) { task->mut_time = 0; }
336 if (task->mut_etime < 0) { task->mut_etime = 0; }
340 #if defined(THREADED_RTS)
343 workerTaskStop (Task *task)
347 ASSERT(task->id == id);
348 ASSERT(myTask() == task);
352 task->stopped = rtsTrue;
359 static void *taskId(Task *task)
362 return (void *)task->id;
370 #if defined(THREADED_RTS)
372 static void OSThreadProcAttr
373 workerStart(Task *task)
377 // See startWorkerTask().
378 ACQUIRE_LOCK(&task->lock);
380 RELEASE_LOCK(&task->lock);
382 if (RtsFlags.ParFlags.setAffinity) {
383 setThreadAffinity(cap->no, n_capabilities);
386 // set the thread-local pointer to the Task:
391 scheduleWorker(cap,task);
395 startWorkerTask (Capability *cap)
401 // A worker always gets a fresh Task structure.
402 task = newTask(rtsTrue);
404 // The lock here is to synchronise with taskStart(), to make sure
405 // that we have finished setting up the Task structure before the
406 // worker thread reads it.
407 ACQUIRE_LOCK(&task->lock);
411 // Give the capability directly to the worker; we can't let anyone
412 // else get in, because the new worker Task has nowhere to go to
413 // sleep so that it could be woken up again.
414 ASSERT_LOCK_HELD(&cap->lock);
415 cap->running_task = task;
417 r = createOSThread(&tid, (OSThreadProc*)workerStart, task);
419 sysErrorBelch("failed to create OS thread");
420 stg_exit(EXIT_FAILURE);
423 debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
427 // ok, finished with the Task struct.
428 RELEASE_LOCK(&task->lock);
432 interruptWorkerTask (Task *task)
434 ASSERT(osThreadId() != task->id); // seppuku not allowed
435 ASSERT(task->incall->suspended_tso); // use this only for FFI calls
436 interruptOSThread(task->id);
437 debugTrace(DEBUG_sched, "interrupted worker task %p", taskId(task));
440 #endif /* THREADED_RTS */
444 void printAllTasks(void);
450 for (task = all_tasks; task != NULL; task = task->all_link) {
451 debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
452 if (!task->stopped) {
454 debugBelch("on capability %d, ", task->cap->no);
456 if (task->incall->tso) {
457 debugBelch("bound to thread %lu",
458 (unsigned long)task->incall->tso->id);
460 debugBelch("worker");