X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FTask.c;h=9a8ebd69630b3864cb46bd27a7191e3328678509;hb=d305c6b68e06368c2a8d89900a2123388fc39ae1;hp=7366480094da1554c1f34b0c85700a56f542cb60;hpb=0065d5ab628975892cea1ec7303f968c3338cbe1;p=ghc-hetmet.git diff --git a/rts/Task.c b/rts/Task.c index 7366480..9a8ebd6 100644 --- a/rts/Task.c +++ b/rts/Task.c @@ -8,15 +8,16 @@ * * -------------------------------------------------------------------------*/ +#include "PosixSource.h" #include "Rts.h" + #include "RtsUtils.h" -#include "OSThreads.h" #include "Task.h" #include "Capability.h" #include "Stats.h" -#include "RtsFlags.h" #include "Schedule.h" #include "Hash.h" +#include "Trace.h" #if HAVE_SIGNAL_H #include @@ -27,10 +28,9 @@ Task *all_tasks = NULL; static Task *task_free_list = NULL; // singly-linked static nat taskCount; -#define DEFAULT_MAX_WORKERS 64 -static nat maxWorkers; // we won't create more workers than this static nat tasksRunning; static nat workerCount; +static int tasksInitialized = 0; /* ----------------------------------------------------------------------------- * Remembering the current thread's Task @@ -51,25 +51,50 @@ Task *my_task; void initTaskManager (void) { - static int initialized = 0; - - if (!initialized) { + if (!tasksInitialized) { taskCount = 0; workerCount = 0; tasksRunning = 0; - maxWorkers = DEFAULT_MAX_WORKERS; - initialized = 1; + tasksInitialized = 1; #if defined(THREADED_RTS) newThreadLocalKey(¤tTaskKey); #endif } } - -void -stopTaskManager (void) +nat +freeTaskManager (void) { - IF_DEBUG(scheduler, sched_belch("stopping task manager, %d tasks still running", tasksRunning)); + Task *task, *next; + + ASSERT_LOCK_HELD(&sched_mutex); + + debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running", + tasksRunning); + + for (task = all_tasks; task != NULL; task = next) { + next = task->all_link; + if (task->stopped) { + // We only free resources if the Task is not in use. A + // Task may still be in use if we have a Haskell thread in + // a foreign call while we are attempting to shut down the + // RTS (see conc059). +#if defined(THREADED_RTS) + closeCondition(&task->cond); + closeMutex(&task->lock); +#endif + stgFree(task); + } + } + all_tasks = NULL; + task_free_list = NULL; +#if defined(THREADED_RTS) + freeThreadLocalKey(¤tTaskKey); +#endif + + tasksInitialized = 0; + + return tasksRunning; } @@ -81,7 +106,8 @@ newTask (void) #endif Task *task; - task = stgMallocBytes(sizeof(Task), "newTask"); +#define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64) + task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask"); task->cap = NULL; task->stopped = rtsFalse; @@ -99,10 +125,10 @@ newTask (void) #if defined(THREADED_RTS) currentUserTime = getThreadCPUTime(); currentElapsedTime = getProcessElapsedTime(); - task->mut_time = 0.0; - task->mut_etime = 0.0; - task->gc_time = 0.0; - task->gc_etime = 0.0; + task->mut_time = 0; + task->mut_etime = 0; + task->gc_time = 0; + task->gc_etime = 0; task->muttimestart = currentUserTime; task->elapsedtimestart = currentElapsedTime; #endif @@ -115,7 +141,6 @@ newTask (void) all_tasks = task; taskCount++; - workerCount++; return task; } @@ -125,7 +150,17 @@ newBoundTask (void) { Task *task; - ASSERT_LOCK_HELD(&sched_mutex); + if (!tasksInitialized) { + errorBelch("newBoundTask: RTS is not initialised; call hs_init() first"); + stg_exit(EXIT_FAILURE); + } + + // ToDo: get rid of this lock in the common case. We could store + // a free Task in thread-local storage, for example. That would + // leave just one lock on the path into the RTS: cap->lock when + // acquiring the Capability. + ACQUIRE_LOCK(&sched_mutex); + if (task_free_list == NULL) { task = newTask(); } else { @@ -144,13 +179,16 @@ newBoundTask (void) taskEnter(task); - IF_DEBUG(scheduler,sched_belch("new task (taskCount: %d)", taskCount);); + RELEASE_LOCK(&sched_mutex); + + debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount); return task; } void boundTaskExiting (Task *task) { + task->tso = NULL; task->stopped = rtsTrue; task->cap = NULL; @@ -168,7 +206,7 @@ boundTaskExiting (Task *task) task_free_list = task; RELEASE_LOCK(&sched_mutex); - IF_DEBUG(scheduler,sched_belch("task exiting")); + debugTrace(DEBUG_sched, "task exiting"); } #ifdef THREADED_RTS @@ -182,9 +220,13 @@ discardTask (Task *task) { ASSERT_LOCK_HELD(&sched_mutex); if (!task->stopped) { - IF_DEBUG(scheduler,sched_belch("discarding task %p", TASK_ID(task))); + debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task)); task->cap = NULL; - task->tso = NULL; + if (task->tso == NULL) { + workerCount--; + } else { + task->tso = NULL; + } task->stopped = rtsTrue; tasksRunning--; task->next = task_free_list; @@ -193,16 +235,11 @@ discardTask (Task *task) } void -taskStop (Task *task) +taskTimeStamp (Task *task USED_IF_THREADS) { #if defined(THREADED_RTS) - OSThreadId id; Ticks currentElapsedTime, currentUserTime, elapsedGCTime; - id = osThreadId(); - ASSERT(task->id == id); - ASSERT(myTask() == task); - currentUserTime = getThreadCPUTime(); currentElapsedTime = getProcessElapsedTime(); @@ -215,21 +252,35 @@ taskStop (Task *task) task->mut_etime = currentElapsedTime - task->elapsedtimestart - elapsedGCTime; - if (task->mut_time < 0.0) { task->mut_time = 0.0; } - if (task->mut_etime < 0.0) { task->mut_etime = 0.0; } + if (task->mut_time < 0) { task->mut_time = 0; } + if (task->mut_etime < 0) { task->mut_etime = 0; } #endif - - task->stopped = rtsTrue; - tasksRunning--; } +#if defined(THREADED_RTS) + void -resetTaskManagerAfterFork (void) +workerTaskStop (Task *task) { -#warning TODO! - taskCount = 0; + OSThreadId id; + id = osThreadId(); + ASSERT(task->id == id); + ASSERT(myTask() == task); + + task->cap = NULL; + taskTimeStamp(task); + task->stopped = rtsTrue; + tasksRunning--; + workerCount--; + + ACQUIRE_LOCK(&sched_mutex); + task->next = task_free_list; + task_free_list = task; + RELEASE_LOCK(&sched_mutex); } +#endif + #if defined(THREADED_RTS) void @@ -240,9 +291,6 @@ startWorkerTask (Capability *cap, OSThreadId tid; Task *task; - if (workerCount >= maxWorkers) { - barf("too many workers; runaway worker creation?"); - } workerCount++; // A worker always gets a fresh Task structure. @@ -265,10 +313,11 @@ startWorkerTask (Capability *cap, r = createOSThread(&tid, (OSThreadProc *)taskStart, task); if (r != 0) { - barf("startTask: Can't create new task"); + sysErrorBelch("failed to create OS thread"); + stg_exit(EXIT_FAILURE); } - IF_DEBUG(scheduler,sched_belch("new worker task (taskCount: %d)", taskCount);); + debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount); task->id = tid; @@ -302,7 +351,7 @@ printAllTasks(void) debugBelch("on capability %d, ", task->cap->no); } if (task->tso) { - debugBelch("bound to thread %d", task->tso->id); + debugBelch("bound to thread %lu", (unsigned long)task->tso->id); } else { debugBelch("worker"); }