X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FTask.c;h=7366480094da1554c1f34b0c85700a56f542cb60;hb=28a464a75e14cece5db40f2765a29348273ff2d2;hp=bf20e999c314a0147fcc8321863ce4ab7e63c819;hpb=3470e75bd62fa08f86da2607a58c7f0b4aeba9db;p=ghc-hetmet.git diff --git a/ghc/rts/Task.c b/ghc/rts/Task.c index bf20e99..7366480 100644 --- a/ghc/rts/Task.c +++ b/ghc/rts/Task.c @@ -1,195 +1,315 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team 2001- + * (c) The GHC Team 2001-2005 * * The task manager subsystem. Tasks execute STG code, with this * module providing the API which the Scheduler uses to control their * creation and destruction. - * - * Two kinds of RTS builds uses 'tasks' - the SMP and the - * 'native thread-friendly' builds. * - * The SMP build lets multiple tasks concurrently execute STG code, - * all sharing vital internal RTS data structures in a controlled manner - * (see details elsewhere...ToDo: fill in ref!) - * - * The 'threads' build has at any one time only one task executing STG - * code, other tasks are either busy executing code outside the RTS - * (e.g., a C call) or waiting for their turn to (again) evaluate some - * STG code. A task relinquishes its RTS token when it is asked to - * evaluate an external (C) call. - * * -------------------------------------------------------------------------*/ + #include "Rts.h" -#if defined(RTS_SUPPORTS_THREADS) /* to the end */ #include "RtsUtils.h" #include "OSThreads.h" #include "Task.h" +#include "Capability.h" #include "Stats.h" #include "RtsFlags.h" #include "Schedule.h" +#include "Hash.h" -/* There's not all that much code that is shared between the - * SMP and threads version of the 'task manager.' A sign - * that the code ought to be structured differently..(Maybe ToDo). - */ - -/* - * The following Task Manager-local variables are assumed to be - * accessed with the RTS lock in hand. - */ -#if defined(SMP) -static TaskInfo* taskTable; +#if HAVE_SIGNAL_H +#include #endif -/* upper bound / the number of tasks created. */ -static nat maxTasks; -/* number of tasks currently created */ -static nat taskCount; +// Task lists and global counters. +// Locks required: sched_mutex. +Task *all_tasks = NULL; +static Task *task_free_list = NULL; // singly-linked +static nat taskCount; +#define DEFAULT_MAX_WORKERS 64 +static nat maxWorkers; // we won't create more workers than this +static nat tasksRunning; +static nat workerCount; + +/* ----------------------------------------------------------------------------- + * Remembering the current thread's Task + * -------------------------------------------------------------------------- */ + +// A thread-local-storage key that we can use to get access to the +// current thread's Task structure. +#if defined(THREADED_RTS) +ThreadLocalKey currentTaskKey; +#else +Task *my_task; +#endif + +/* ----------------------------------------------------------------------------- + * Rest of the Task API + * -------------------------------------------------------------------------- */ -#if defined(SMP) void -startTaskManager( nat maxCount, void (*taskStart)(void) ) +initTaskManager (void) { - nat i; - static int initialized = 0; - - if (!initialized) { - - taskCount = 0; - maxTasks = maxCount; - /* allocate table holding task metadata */ - - if (maxCount > 0) { - taskTable = stgMallocBytes(maxCount * sizeof(TaskInfo), - "startTaskManager:tasks"); - - /* and eagerly create them all. */ - for (i = 0; i < maxCount; i++) { - startTask(taskStart); - taskCount++; - } + static int initialized = 0; + + if (!initialized) { + taskCount = 0; + workerCount = 0; + tasksRunning = 0; + maxWorkers = DEFAULT_MAX_WORKERS; + initialized = 1; +#if defined(THREADED_RTS) + newThreadLocalKey(¤tTaskKey); +#endif } - initialized = 1; - } } + void -startTask ( void (*taskStart)(void) ) +stopTaskManager (void) { - int r; - OSThreadId tid; + IF_DEBUG(scheduler, sched_belch("stopping task manager, %d tasks still running", tasksRunning)); +} - r = createOSThread(&tid,taskStart); - if (r != 0) { - barf("startTask: Can't create new task"); - } - taskTable[taskCount].id = tid; - taskTable[taskCount].mut_time = 0.0; - taskTable[taskCount].mut_etime = 0.0; - taskTable[taskCount].gc_time = 0.0; - taskTable[taskCount].gc_etime = 0.0; - taskTable[taskCount].elapsedtimestart = stat_getElapsedTime(); +static Task* +newTask (void) +{ +#if defined(THREADED_RTS) + Ticks currentElapsedTime, currentUserTime; +#endif + Task *task; + + task = stgMallocBytes(sizeof(Task), "newTask"); + + task->cap = NULL; + task->stopped = rtsFalse; + task->suspended_tso = NULL; + task->tso = NULL; + task->stat = NoStatus; + task->ret = NULL; + +#if defined(THREADED_RTS) + initCondition(&task->cond); + initMutex(&task->lock); + task->wakeup = rtsFalse; +#endif - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid);); - return; +#if defined(THREADED_RTS) + currentUserTime = getThreadCPUTime(); + currentElapsedTime = getProcessElapsedTime(); + task->mut_time = 0.0; + task->mut_etime = 0.0; + task->gc_time = 0.0; + task->gc_etime = 0.0; + task->muttimestart = currentUserTime; + task->elapsedtimestart = currentElapsedTime; +#endif + + task->prev = NULL; + task->next = NULL; + task->return_link = NULL; + + task->all_link = all_tasks; + all_tasks = task; + + taskCount++; + workerCount++; + + return task; +} + +Task * +newBoundTask (void) +{ + Task *task; + + ASSERT_LOCK_HELD(&sched_mutex); + if (task_free_list == NULL) { + task = newTask(); + } else { + task = task_free_list; + task_free_list = task->next; + task->next = NULL; + task->prev = NULL; + task->stopped = rtsFalse; + } +#if defined(THREADED_RTS) + task->id = osThreadId(); +#endif + ASSERT(task->cap == NULL); + + tasksRunning++; + + taskEnter(task); + + IF_DEBUG(scheduler,sched_belch("new task (taskCount: %d)", taskCount);); + return task; } void -stopTaskManager () +boundTaskExiting (Task *task) { - nat i; - - /* Don't want to use pthread_cancel, since we'd have to install - * these silly exception handlers (pthread_cleanup_{push,pop}) around - * all our locks. - */ -#if 0 - /* Cancel all our tasks */ - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - pthread_cancel(taskTable[i].id); - } - - /* Wait for all the tasks to terminate */ - for (i = 0; i < maxCount; i++) { - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", - taskTable[i].id)); - pthread_join(taskTable[i].id, NULL); - } + task->stopped = rtsTrue; + task->cap = NULL; + +#if defined(THREADED_RTS) + ASSERT(osThreadId() == task->id); #endif + ASSERT(myTask() == task); + setMyTask(task->prev_stack); - /* Send 'em all a SIGHUP. That should shut 'em up. */ - await_death = maxCount; - for (i = 0; i < maxCount; i++) { - pthread_kill(taskTable[i].id,SIGTERM); - } - while (await_death > 0) { - sched_yield(); - } - - return; + tasksRunning--; + + // sadly, we need a lock around the free task list. Todo: eliminate. + ACQUIRE_LOCK(&sched_mutex); + task->next = task_free_list; + task_free_list = task; + RELEASE_LOCK(&sched_mutex); + + IF_DEBUG(scheduler,sched_belch("task exiting")); } +#ifdef THREADED_RTS +#define TASK_ID(t) (t)->id #else -/************ THREADS version *****************/ +#define TASK_ID(t) (t) +#endif + +void +discardTask (Task *task) +{ + ASSERT_LOCK_HELD(&sched_mutex); + if (!task->stopped) { + IF_DEBUG(scheduler,sched_belch("discarding task %p", TASK_ID(task))); + task->cap = NULL; + task->tso = NULL; + task->stopped = rtsTrue; + tasksRunning--; + task->next = task_free_list; + task_free_list = task; + } +} + +void +taskStop (Task *task) +{ +#if defined(THREADED_RTS) + OSThreadId id; + Ticks currentElapsedTime, currentUserTime, elapsedGCTime; + + id = osThreadId(); + ASSERT(task->id == id); + ASSERT(myTask() == task); + + currentUserTime = getThreadCPUTime(); + currentElapsedTime = getProcessElapsedTime(); + + // XXX this is wrong; we want elapsed GC time since the + // Task started. + elapsedGCTime = stat_getElapsedGCTime(); + + task->mut_time = + currentUserTime - task->muttimestart - task->gc_time; + task->mut_etime = + currentElapsedTime - task->elapsedtimestart - elapsedGCTime; + + if (task->mut_time < 0.0) { task->mut_time = 0.0; } + if (task->mut_etime < 0.0) { task->mut_etime = 0.0; } +#endif + + task->stopped = rtsTrue; + tasksRunning--; +} void -startTaskManager( nat maxCount, void (*taskStart)(void) ) +resetTaskManagerAfterFork (void) { - /* In the threaded case, maxCount is used to limit the - the creation of worker tasks. Tasks are created lazily, i.e., - when the current task gives up the token on executing - STG code. - */ - maxTasks = maxCount; - taskCount = 0; +#warning TODO! + taskCount = 0; } +#if defined(THREADED_RTS) + void -startTask ( void (*taskStart)(void) ) +startWorkerTask (Capability *cap, + void OSThreadProcAttr (*taskStart)(Task *task)) { int r; OSThreadId tid; - - /* Locks assumed: rts_mutex */ - - /* If more than one worker thread is known to be blocked waiting - on thread_ready_cond, signal it rather than creating a new one. - */ - if ( rts_n_waiting_tasks > 0) { - IF_DEBUG(scheduler,fprintf(stderr, - "scheduler: startTask: %d tasks waiting, not creating new one.\n", - rts_n_waiting_tasks);); - signalCondition(&thread_ready_cond); - /* Not needed, but gives more 'interesting' thread schedules when testing */ - yieldThread(); - return; - } + Task *task; - /* If the task limit has been reached, just return. */ - if (maxTasks > 0 && taskCount == maxTasks) { - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: startTask: task limit (%d) reached, not creating new one.\n",maxTasks)); - return; + if (workerCount >= maxWorkers) { + barf("too many workers; runaway worker creation?"); } - + workerCount++; + + // A worker always gets a fresh Task structure. + task = newTask(); + + tasksRunning++; + + // The lock here is to synchronise with taskStart(), to make sure + // that we have finished setting up the Task structure before the + // worker thread reads it. + ACQUIRE_LOCK(&task->lock); - r = createOSThread(&tid,taskStart); + task->cap = cap; + + // Give the capability directly to the worker; we can't let anyone + // else get in, because the new worker Task has nowhere to go to + // sleep so that it could be woken up again. + ASSERT_LOCK_HELD(&cap->lock); + cap->running_task = task; + + r = createOSThread(&tid, (OSThreadProc *)taskStart, task); if (r != 0) { barf("startTask: Can't create new task"); } - taskCount++; - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: startTask: new task %ld (total_count: %d; waiting: %d)\n", tid, taskCount, rts_n_waiting_tasks);); - return; + IF_DEBUG(scheduler,sched_belch("new worker task (taskCount: %d)", taskCount);); + + task->id = tid; + + // ok, finished with the Task struct. + RELEASE_LOCK(&task->lock); } +#endif /* THREADED_RTS */ + +#ifdef DEBUG + +static void *taskId(Task *task) +{ +#ifdef THREADED_RTS + return (void *)task->id; +#else + return (void *)task; +#endif +} + +void printAllTasks(void); + void -stopTaskManager () +printAllTasks(void) { + Task *task; + for (task = all_tasks; task != NULL; task = task->all_link) { + debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive"); + if (!task->stopped) { + if (task->cap) { + debugBelch("on capability %d, ", task->cap->no); + } + if (task->tso) { + debugBelch("bound to thread %d", task->tso->id); + } else { + debugBelch("worker"); + } + } + debugBelch("\n"); + } +} -} #endif - -#endif /* RTS_SUPPORTS_THREADS */