X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=rts%2FTask.c;h=a15758c4f1cdb73f2862306f95d3c75de10fa551;hp=9a8ebd69630b3864cb46bd27a7191e3328678509;hb=890f22ef8eff8dbb5b31fa221dfce65a7b84c202;hpb=a2a67cd520b9841114d69a87a423dabcb3b4368e diff --git a/rts/Task.c b/rts/Task.c index 9a8ebd6..a15758c 100644 --- a/rts/Task.c +++ b/rts/Task.c @@ -24,14 +24,19 @@ #endif // Task lists and global counters. -// Locks required: sched_mutex. +// Locks required: all_tasks_mutex. Task *all_tasks = NULL; -static Task *task_free_list = NULL; // singly-linked static nat taskCount; -static nat tasksRunning; -static nat workerCount; static int tasksInitialized = 0; +static void freeTask (Task *task); +static Task * allocTask (void); +static Task * newTask (rtsBool); + +#if defined(THREADED_RTS) +static Mutex all_tasks_mutex; +#endif + /* ----------------------------------------------------------------------------- * Remembering the current thread's Task * -------------------------------------------------------------------------- */ @@ -39,7 +44,11 @@ static int tasksInitialized = 0; // A thread-local-storage key that we can use to get access to the // current thread's Task structure. #if defined(THREADED_RTS) +# if defined(MYTASK_USE_TLV) +__thread Task *my_task; +# else ThreadLocalKey currentTaskKey; +# endif #else Task *my_task; #endif @@ -53,12 +62,13 @@ initTaskManager (void) { if (!tasksInitialized) { taskCount = 0; - workerCount = 0; - tasksRunning = 0; tasksInitialized = 1; #if defined(THREADED_RTS) +#if !defined(MYTASK_USE_TLV) newThreadLocalKey(¤tTaskKey); #endif + initMutex(&all_tasks_mutex); +#endif } } @@ -66,29 +76,28 @@ nat freeTaskManager (void) { Task *task, *next; + nat tasksRunning = 0; - ASSERT_LOCK_HELD(&sched_mutex); - - debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running", - tasksRunning); + ACQUIRE_LOCK(&all_tasks_mutex); for (task = all_tasks; task != NULL; task = next) { next = task->all_link; if (task->stopped) { - // We only free resources if the Task is not in use. A - // Task may still be in use if we have a Haskell thread in - // a foreign call while we are attempting to shut down the - // RTS (see conc059). -#if defined(THREADED_RTS) - closeCondition(&task->cond); - closeMutex(&task->lock); -#endif - stgFree(task); + freeTask(task); + } else { + tasksRunning++; } } + + debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running", + tasksRunning); + all_tasks = NULL; - task_free_list = NULL; -#if defined(THREADED_RTS) + + RELEASE_LOCK(&all_tasks_mutex); + +#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV) + closeMutex(&all_tasks_mutex); freeThreadLocalKey(¤tTaskKey); #endif @@ -97,9 +106,52 @@ freeTaskManager (void) return tasksRunning; } +static Task * +allocTask (void) +{ + Task *task; + + task = myTask(); + if (task != NULL) { + return task; + } else { + task = newTask(rtsFalse); +#if defined(THREADED_RTS) + task->id = osThreadId(); +#endif + setMyTask(task); + return task; + } +} + +static void +freeTask (Task *task) +{ + InCall *incall, *next; + + // We only free resources if the Task is not in use. A + // Task may still be in use if we have a Haskell thread in + // a foreign call while we are attempting to shut down the + // RTS (see conc059). +#if defined(THREADED_RTS) + closeCondition(&task->cond); + closeMutex(&task->lock); +#endif + + for (incall = task->incall; incall != NULL; incall = next) { + next = incall->prev_stack; + stgFree(incall); + } + for (incall = task->spare_incalls; incall != NULL; incall = next) { + next = incall->next; + stgFree(incall); + } + + stgFree(task); +} static Task* -newTask (void) +newTask (rtsBool worker) { #if defined(THREADED_RTS) Ticks currentElapsedTime, currentUserTime; @@ -109,12 +161,13 @@ newTask (void) #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64) task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask"); - task->cap = NULL; - task->stopped = rtsFalse; - task->suspended_tso = NULL; - task->tso = NULL; - task->stat = NoStatus; - task->ret = NULL; + task->cap = NULL; + task->worker = worker; + task->stopped = rtsFalse; + task->running_finalizers = rtsFalse; + task->n_spare_incalls = 0; + task->spare_incalls = NULL; + task->incall = NULL; #if defined(THREADED_RTS) initCondition(&task->cond); @@ -133,18 +186,67 @@ newTask (void) task->elapsedtimestart = currentElapsedTime; #endif - task->prev = NULL; task->next = NULL; - task->return_link = NULL; + + ACQUIRE_LOCK(&all_tasks_mutex); task->all_link = all_tasks; all_tasks = task; taskCount++; + RELEASE_LOCK(&all_tasks_mutex); + return task; } +// avoid the spare_incalls list growing unboundedly +#define MAX_SPARE_INCALLS 8 + +static void +newInCall (Task *task) +{ + InCall *incall; + + if (task->spare_incalls != NULL) { + incall = task->spare_incalls; + task->spare_incalls = incall->next; + task->n_spare_incalls--; + } else { + incall = stgMallocBytes((sizeof(InCall)), "newBoundTask"); + } + + incall->tso = NULL; + incall->task = task; + incall->suspended_tso = NULL; + incall->suspended_cap = NULL; + incall->stat = NoStatus; + incall->ret = NULL; + incall->next = NULL; + incall->prev = NULL; + incall->prev_stack = task->incall; + task->incall = incall; +} + +static void +endInCall (Task *task) +{ + InCall *incall; + + incall = task->incall; + incall->tso = NULL; + task->incall = task->incall->prev_stack; + + if (task->n_spare_incalls >= MAX_SPARE_INCALLS) { + stgFree(incall); + } else { + incall->next = task->spare_incalls; + task->spare_incalls = incall; + task->n_spare_incalls++; + } +} + + Task * newBoundTask (void) { @@ -155,31 +257,11 @@ newBoundTask (void) stg_exit(EXIT_FAILURE); } - // ToDo: get rid of this lock in the common case. We could store - // a free Task in thread-local storage, for example. That would - // leave just one lock on the path into the RTS: cap->lock when - // acquiring the Capability. - ACQUIRE_LOCK(&sched_mutex); - - if (task_free_list == NULL) { - task = newTask(); - } else { - task = task_free_list; - task_free_list = task->next; - task->next = NULL; - task->prev = NULL; - task->stopped = rtsFalse; - } -#if defined(THREADED_RTS) - task->id = osThreadId(); -#endif - ASSERT(task->cap == NULL); + task = allocTask(); - tasksRunning++; - - taskEnter(task); + task->stopped = rtsFalse; - RELEASE_LOCK(&sched_mutex); + newInCall(task); debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount); return task; @@ -188,27 +270,19 @@ newBoundTask (void) void boundTaskExiting (Task *task) { - task->tso = NULL; task->stopped = rtsTrue; - task->cap = NULL; #if defined(THREADED_RTS) ASSERT(osThreadId() == task->id); #endif ASSERT(myTask() == task); - setMyTask(task->prev_stack); - tasksRunning--; - - // sadly, we need a lock around the free task list. Todo: eliminate. - ACQUIRE_LOCK(&sched_mutex); - task->next = task_free_list; - task_free_list = task; - RELEASE_LOCK(&sched_mutex); + endInCall(task); debugTrace(DEBUG_sched, "task exiting"); } + #ifdef THREADED_RTS #define TASK_ID(t) (t)->id #else @@ -216,22 +290,22 @@ boundTaskExiting (Task *task) #endif void -discardTask (Task *task) +discardTasksExcept (Task *keep) { - ASSERT_LOCK_HELD(&sched_mutex); - if (!task->stopped) { - debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task)); - task->cap = NULL; - if (task->tso == NULL) { - workerCount--; - } else { - task->tso = NULL; + Task *task, *next; + + // Wipe the task list, except the current Task. + ACQUIRE_LOCK(&all_tasks_mutex); + for (task = all_tasks; task != NULL; task=next) { + next = task->all_link; + if (task != keep) { + debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task)); + freeTask(task); } - task->stopped = rtsTrue; - tasksRunning--; - task->next = task_free_list; - task_free_list = task; } + all_tasks = keep; + keep->all_link = NULL; + RELEASE_LOCK(&all_tasks_mutex); } void @@ -270,33 +344,56 @@ workerTaskStop (Task *task) task->cap = NULL; taskTimeStamp(task); task->stopped = rtsTrue; - tasksRunning--; - workerCount--; +} - ACQUIRE_LOCK(&sched_mutex); - task->next = task_free_list; - task_free_list = task; - RELEASE_LOCK(&sched_mutex); +#endif + +#ifdef DEBUG + +static void *taskId(Task *task) +{ +#ifdef THREADED_RTS + return (void *)task->id; +#else + return (void *)task; +#endif } #endif #if defined(THREADED_RTS) +static void OSThreadProcAttr +workerStart(Task *task) +{ + Capability *cap; + + // See startWorkerTask(). + ACQUIRE_LOCK(&task->lock); + cap = task->cap; + RELEASE_LOCK(&task->lock); + + if (RtsFlags.ParFlags.setAffinity) { + setThreadAffinity(cap->no, n_capabilities); + } + + // set the thread-local pointer to the Task: + setMyTask(task); + + newInCall(task); + + scheduleWorker(cap,task); +} + void -startWorkerTask (Capability *cap, - void OSThreadProcAttr (*taskStart)(Task *task)) +startWorkerTask (Capability *cap) { int r; OSThreadId tid; Task *task; - workerCount++; - // A worker always gets a fresh Task structure. - task = newTask(); - - tasksRunning++; + task = newTask(rtsTrue); // The lock here is to synchronise with taskStart(), to make sure // that we have finished setting up the Task structure before the @@ -311,7 +408,7 @@ startWorkerTask (Capability *cap, ASSERT_LOCK_HELD(&cap->lock); cap->running_task = task; - r = createOSThread(&tid, (OSThreadProc *)taskStart, task); + r = createOSThread(&tid, (OSThreadProc*)workerStart, task); if (r != 0) { sysErrorBelch("failed to create OS thread"); stg_exit(EXIT_FAILURE); @@ -325,19 +422,19 @@ startWorkerTask (Capability *cap, RELEASE_LOCK(&task->lock); } +void +interruptWorkerTask (Task *task) +{ + ASSERT(osThreadId() != task->id); // seppuku not allowed + ASSERT(task->incall->suspended_tso); // use this only for FFI calls + interruptOSThread(task->id); + debugTrace(DEBUG_sched, "interrupted worker task %p", taskId(task)); +} + #endif /* THREADED_RTS */ #ifdef DEBUG -static void *taskId(Task *task) -{ -#ifdef THREADED_RTS - return (void *)task->id; -#else - return (void *)task; -#endif -} - void printAllTasks(void); void @@ -350,8 +447,9 @@ printAllTasks(void) if (task->cap) { debugBelch("on capability %d, ", task->cap->no); } - if (task->tso) { - debugBelch("bound to thread %lu", (unsigned long)task->tso->id); + if (task->incall->tso) { + debugBelch("bound to thread %lu", + (unsigned long)task->incall->tso->id); } else { debugBelch("worker"); }