X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=rts%2FTask.c;h=e77a030f39ba565eb89a0c8132ede83e6e923612;hp=8779e292ef7d14578f6e94d637c3483aa11f3cc3;hb=1fb38442d3a55ac92795aa6c5ed4df82011df724;hpb=c58450c0c62dc9bf74ecd5df3c07a3f1ce511e7f diff --git a/rts/Task.c b/rts/Task.c index 8779e29..e77a030 100644 --- a/rts/Task.c +++ b/rts/Task.c @@ -8,13 +8,13 @@ * * -------------------------------------------------------------------------*/ +#include "PosixSource.h" #include "Rts.h" + #include "RtsUtils.h" -#include "OSThreads.h" #include "Task.h" #include "Capability.h" #include "Stats.h" -#include "RtsFlags.h" #include "Schedule.h" #include "Hash.h" #include "Trace.h" @@ -24,14 +24,18 @@ #endif // Task lists and global counters. -// Locks required: sched_mutex. +// Locks required: all_tasks_mutex. Task *all_tasks = NULL; -static Task *task_free_list = NULL; // singly-linked static nat taskCount; -#define DEFAULT_MAX_WORKERS 64 -static nat maxWorkers; // we won't create more workers than this -static nat tasksRunning; -static nat workerCount; +static int tasksInitialized = 0; + +static void freeTask (Task *task); +static Task * allocTask (void); +static Task * newTask (rtsBool); + +#if defined(THREADED_RTS) +static Mutex all_tasks_mutex; +#endif /* ----------------------------------------------------------------------------- * Remembering the current thread's Task @@ -40,7 +44,11 @@ static nat workerCount; // A thread-local-storage key that we can use to get access to the // current thread's Task structure. #if defined(THREADED_RTS) +# if defined(MYTASK_USE_TLV) +__thread Task *my_task; +# else ThreadLocalKey currentTaskKey; +# endif #else Task *my_task; #endif @@ -52,50 +60,114 @@ Task *my_task; void initTaskManager (void) { - static int initialized = 0; - - if (!initialized) { + if (!tasksInitialized) { taskCount = 0; - workerCount = 0; - tasksRunning = 0; -#if defined(THREADED_RTS) - maxWorkers = DEFAULT_MAX_WORKERS * RtsFlags.ParFlags.nNodes; -#else - maxWorkers = DEFAULT_MAX_WORKERS; -#endif - initialized = 1; + tasksInitialized = 1; #if defined(THREADED_RTS) +#if !defined(MYTASK_USE_TLV) newThreadLocalKey(¤tTaskKey); #endif + initMutex(&all_tasks_mutex); +#endif } } +nat +freeTaskManager (void) +{ + Task *task, *next; + nat tasksRunning = 0; + + ACQUIRE_LOCK(&all_tasks_mutex); + + for (task = all_tasks; task != NULL; task = next) { + next = task->all_link; + if (task->stopped) { + freeTask(task); + } else { + tasksRunning++; + } + } -void -stopTaskManager (void) + debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running", + tasksRunning); + + all_tasks = NULL; + + RELEASE_LOCK(&all_tasks_mutex); + +#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV) + closeMutex(&all_tasks_mutex); + freeThreadLocalKey(¤tTaskKey); +#endif + + tasksInitialized = 0; + + return tasksRunning; +} + +static Task * +allocTask (void) { - debugTrace(DEBUG_sched, - "stopping task manager, %d tasks still running", - tasksRunning); + Task *task; + + task = myTask(); + if (task != NULL) { + return task; + } else { + task = newTask(rtsFalse); +#if defined(THREADED_RTS) + task->id = osThreadId(); +#endif + setMyTask(task); + return task; + } } +static void +freeTask (Task *task) +{ + InCall *incall, *next; + + // We only free resources if the Task is not in use. A + // Task may still be in use if we have a Haskell thread in + // a foreign call while we are attempting to shut down the + // RTS (see conc059). +#if defined(THREADED_RTS) + closeCondition(&task->cond); + closeMutex(&task->lock); +#endif + + for (incall = task->incall; incall != NULL; incall = next) { + next = incall->prev_stack; + stgFree(incall); + } + for (incall = task->spare_incalls; incall != NULL; incall = next) { + next = incall->next; + stgFree(incall); + } + + stgFree(task); +} static Task* -newTask (void) +newTask (rtsBool worker) { #if defined(THREADED_RTS) Ticks currentElapsedTime, currentUserTime; #endif Task *task; - task = stgMallocBytes(sizeof(Task), "newTask"); +#define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64) + task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask"); - task->cap = NULL; - task->stopped = rtsFalse; - task->suspended_tso = NULL; - task->tso = NULL; - task->stat = NoStatus; - task->ret = NULL; + task->cap = NULL; + task->worker = worker; + task->stopped = rtsFalse; + task->running_finalizers = rtsFalse; + task->n_spare_incalls = 0; + task->spare_incalls = NULL; + task->incall = NULL; #if defined(THREADED_RTS) initCondition(&task->cond); @@ -114,42 +186,82 @@ newTask (void) task->elapsedtimestart = currentElapsedTime; #endif - task->prev = NULL; task->next = NULL; - task->return_link = NULL; + + ACQUIRE_LOCK(&all_tasks_mutex); task->all_link = all_tasks; all_tasks = task; taskCount++; - workerCount++; + + RELEASE_LOCK(&all_tasks_mutex); return task; } +// avoid the spare_incalls list growing unboundedly +#define MAX_SPARE_INCALLS 8 + +static void +newInCall (Task *task) +{ + InCall *incall; + + if (task->spare_incalls != NULL) { + incall = task->spare_incalls; + task->spare_incalls = incall->next; + task->n_spare_incalls--; + } else { + incall = stgMallocBytes((sizeof(InCall)), "newBoundTask"); + } + + incall->tso = NULL; + incall->task = task; + incall->suspended_tso = NULL; + incall->suspended_cap = NULL; + incall->stat = NoStatus; + incall->ret = NULL; + incall->next = NULL; + incall->prev = NULL; + incall->prev_stack = task->incall; + task->incall = incall; +} + +static void +endInCall (Task *task) +{ + InCall *incall; + + incall = task->incall; + incall->tso = NULL; + task->incall = task->incall->prev_stack; + + if (task->n_spare_incalls >= MAX_SPARE_INCALLS) { + stgFree(incall); + } else { + incall->next = task->spare_incalls; + task->spare_incalls = incall; + task->n_spare_incalls++; + } +} + + Task * newBoundTask (void) { Task *task; - ASSERT_LOCK_HELD(&sched_mutex); - if (task_free_list == NULL) { - task = newTask(); - } else { - task = task_free_list; - task_free_list = task->next; - task->next = NULL; - task->prev = NULL; - task->stopped = rtsFalse; + if (!tasksInitialized) { + errorBelch("newBoundTask: RTS is not initialised; call hs_init() first"); + stg_exit(EXIT_FAILURE); } -#if defined(THREADED_RTS) - task->id = osThreadId(); -#endif - ASSERT(task->cap == NULL); - tasksRunning++; + task = allocTask(); + + task->stopped = rtsFalse; - taskEnter(task); + newInCall(task); debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount); return task; @@ -158,26 +270,25 @@ newBoundTask (void) void boundTaskExiting (Task *task) { - task->stopped = rtsTrue; - task->cap = NULL; - #if defined(THREADED_RTS) ASSERT(osThreadId() == task->id); #endif ASSERT(myTask() == task); - setMyTask(task->prev_stack); - tasksRunning--; + endInCall(task); - // sadly, we need a lock around the free task list. Todo: eliminate. - ACQUIRE_LOCK(&sched_mutex); - task->next = task_free_list; - task_free_list = task; - RELEASE_LOCK(&sched_mutex); + // Set task->stopped, but only if this is the last call (#4850). + // Remember that we might have a worker Task that makes a foreign + // call and then a callback, so it can transform into a bound + // Task for the duration of the callback. + if (task->incall == NULL) { + task->stopped = rtsTrue; + } debugTrace(DEBUG_sched, "task exiting"); } + #ifdef THREADED_RTS #define TASK_ID(t) (t)->id #else @@ -185,84 +296,115 @@ boundTaskExiting (Task *task) #endif void -discardTask (Task *task) +discardTasksExcept (Task *keep) { - ASSERT_LOCK_HELD(&sched_mutex); - if (!task->stopped) { - debugTrace(DEBUG_sched, "discarding task %p", TASK_ID(task)); - task->cap = NULL; - task->tso = NULL; - task->stopped = rtsTrue; - tasksRunning--; - task->next = task_free_list; - task_free_list = task; + Task *task, *next; + + // Wipe the task list, except the current Task. + ACQUIRE_LOCK(&all_tasks_mutex); + for (task = all_tasks; task != NULL; task=next) { + next = task->all_link; + if (task != keep) { + debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task)); + freeTask(task); + } } + all_tasks = keep; + keep->all_link = NULL; + RELEASE_LOCK(&all_tasks_mutex); } void taskTimeStamp (Task *task USED_IF_THREADS) { #if defined(THREADED_RTS) - Ticks currentElapsedTime, currentUserTime, elapsedGCTime; + Ticks currentElapsedTime, currentUserTime; currentUserTime = getThreadCPUTime(); currentElapsedTime = getProcessElapsedTime(); - // XXX this is wrong; we want elapsed GC time since the - // Task started. - elapsedGCTime = stat_getElapsedGCTime(); - - task->mut_time = + task->mut_time = currentUserTime - task->muttimestart - task->gc_time; task->mut_etime = - currentElapsedTime - task->elapsedtimestart - elapsedGCTime; + currentElapsedTime - task->elapsedtimestart - task->gc_etime; + if (task->gc_time < 0) { task->gc_time = 0; } + if (task->gc_etime < 0) { task->gc_etime = 0; } if (task->mut_time < 0) { task->mut_time = 0; } if (task->mut_etime < 0) { task->mut_etime = 0; } #endif } void -workerTaskStop (Task *task) +taskDoneGC (Task *task, Ticks cpu_time, Ticks elapsed_time) { + task->gc_time += cpu_time; + task->gc_etime += elapsed_time; +} + #if defined(THREADED_RTS) + +void +workerTaskStop (Task *task) +{ OSThreadId id; id = osThreadId(); ASSERT(task->id == id); ASSERT(myTask() == task); -#endif + task->cap = NULL; taskTimeStamp(task); task->stopped = rtsTrue; - tasksRunning--; } -void -resetTaskManagerAfterFork (void) +#endif + +#ifdef DEBUG + +static void *taskId(Task *task) { -#warning TODO! - taskCount = 0; +#ifdef THREADED_RTS + return (void *)task->id; +#else + return (void *)task; +#endif } +#endif + #if defined(THREADED_RTS) +static void OSThreadProcAttr +workerStart(Task *task) +{ + Capability *cap; + + // See startWorkerTask(). + ACQUIRE_LOCK(&task->lock); + cap = task->cap; + RELEASE_LOCK(&task->lock); + + if (RtsFlags.ParFlags.setAffinity) { + setThreadAffinity(cap->no, n_capabilities); + } + + // set the thread-local pointer to the Task: + setMyTask(task); + + newInCall(task); + + scheduleWorker(cap,task); +} + void -startWorkerTask (Capability *cap, - void OSThreadProcAttr (*taskStart)(Task *task)) +startWorkerTask (Capability *cap) { int r; OSThreadId tid; Task *task; - if (workerCount >= maxWorkers) { - barf("too many workers; runaway worker creation?"); - } - workerCount++; - // A worker always gets a fresh Task structure. - task = newTask(); - - tasksRunning++; + task = newTask(rtsTrue); // The lock here is to synchronise with taskStart(), to make sure // that we have finished setting up the Task structure before the @@ -277,9 +419,10 @@ startWorkerTask (Capability *cap, ASSERT_LOCK_HELD(&cap->lock); cap->running_task = task; - r = createOSThread(&tid, (OSThreadProc *)taskStart, task); + r = createOSThread(&tid, (OSThreadProc*)workerStart, task); if (r != 0) { - barf("startTask: Can't create new task"); + sysErrorBelch("failed to create OS thread"); + stg_exit(EXIT_FAILURE); } debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount); @@ -290,19 +433,19 @@ startWorkerTask (Capability *cap, RELEASE_LOCK(&task->lock); } +void +interruptWorkerTask (Task *task) +{ + ASSERT(osThreadId() != task->id); // seppuku not allowed + ASSERT(task->incall->suspended_tso); // use this only for FFI calls + interruptOSThread(task->id); + debugTrace(DEBUG_sched, "interrupted worker task %p", taskId(task)); +} + #endif /* THREADED_RTS */ #ifdef DEBUG -static void *taskId(Task *task) -{ -#ifdef THREADED_RTS - return (void *)task->id; -#else - return (void *)task; -#endif -} - void printAllTasks(void); void @@ -315,8 +458,9 @@ printAllTasks(void) if (task->cap) { debugBelch("on capability %d, ", task->cap->no); } - if (task->tso) { - debugBelch("bound to thread %d", task->tso->id); + if (task->incall->tso) { + debugBelch("bound to thread %lu", + (unsigned long)task->incall->tso->id); } else { debugBelch("worker"); }