X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FTask.c;h=89db782800522bd8412e64d539c5d89d8c3cbeaf;hb=bbe90cbe14b94899efe0ce24e0c5fdbdb8d40ada;hp=19ae7999d3371f9bf99099a6ea40ed66e6efa4e9;hpb=80c55dc792abc20e8f316f5f4f90009322be8e34;p=ghc-hetmet.git diff --git a/ghc/rts/Task.c b/ghc/rts/Task.c index 19ae799..89db782 100644 --- a/ghc/rts/Task.c +++ b/ghc/rts/Task.c @@ -1,242 +1,315 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team 2001- + * (c) The GHC Team 2001-2005 * * The task manager subsystem. Tasks execute STG code, with this * module providing the API which the Scheduler uses to control their * creation and destruction. - * - * Two kinds of RTS builds uses 'tasks' - the SMP and the - * 'native thread-friendly' builds. * - * The SMP build lets multiple tasks concurrently execute STG code, - * all sharing vital internal RTS data structures in a controlled manner - * (see details elsewhere...ToDo: fill in ref!) - * - * The 'threads' build has at any one time only one task executing STG - * code, other tasks are either busy executing code outside the RTS - * (e.g., a C call) or waiting for their turn to (again) evaluate some - * STG code. A task relinquishes its RTS token when it is asked to - * evaluate an external (C) call. - * * -------------------------------------------------------------------------*/ + #include "Rts.h" -#if defined(RTS_SUPPORTS_THREADS) /* to the end */ #include "RtsUtils.h" #include "OSThreads.h" #include "Task.h" +#include "Capability.h" #include "Stats.h" #include "RtsFlags.h" +#include "Schedule.h" +#include "Hash.h" + +#if HAVE_SIGNAL_H +#include +#endif -/* There's not all that much code that is shared between the - * SMP and threads version of the 'task manager.' A sign - * that the code ought to be structured differently..(Maybe ToDo). - */ - -/* - * The following Task Manager-local variables are assumed to be - * accessed with the RTS lock in hand. - */ -#if defined(SMP) -static TaskInfo* taskTable; +// Task lists and global counters. +// Locks required: sched_mutex. +Task *all_tasks = NULL; +static Task *task_free_list = NULL; // singly-linked +static nat taskCount; +#define DEFAULT_MAX_WORKERS 64 +static nat maxWorkers; // we won't create more workers than this +static nat tasksRunning; +static nat workerCount; + +/* ----------------------------------------------------------------------------- + * Remembering the current thread's Task + * -------------------------------------------------------------------------- */ + +// A thread-local-storage key that we can use to get access to the +// current thread's Task structure. +#if defined(THREADED_RTS) +ThreadLocalKey currentTaskKey; +#else +Task *my_task; #endif -/* upper bound / the number of tasks created. */ -static nat maxTasks; -/* number of tasks currently created */ -static nat taskCount; -static nat tasksAvailable; +/* ----------------------------------------------------------------------------- + * Rest of the Task API + * -------------------------------------------------------------------------- */ -#if defined(SMP) void -startTaskManager( nat maxCount, void (*taskStart)(void) ) +initTaskManager (void) { - nat i; - static int initialized = 0; - - if (!initialized) { - - taskCount = 0; - maxTasks = maxCount; - /* allocate table holding task metadata */ - - if (maxCount > 0) { - taskTable = stgMallocBytes(maxCount * sizeof(TaskInfo), - "startTaskManager:tasks"); - - /* and eagerly create them all. */ - for (i = 0; i < maxCount; i++) { - startTask(taskStart); - taskCount++; - } + static int initialized = 0; + + if (!initialized) { + taskCount = 0; + workerCount = 0; + tasksRunning = 0; + maxWorkers = DEFAULT_MAX_WORKERS; + initialized = 1; +#if defined(THREADED_RTS) + newThreadLocalKey(¤tTaskKey); +#endif } - initialized = 1; - } } + void -startTask ( void (*taskStart)(void) ) +stopTaskManager (void) { - int r; - OSThreadId tid; + IF_DEBUG(scheduler, sched_belch("stopping task manager, %d tasks still running", tasksRunning)); +} - r = createOSThread(&tid,taskStart); - if (r != 0) { - barf("startTask: Can't create new task"); - } - taskTable[taskCount].id = tid; - taskTable[taskCount].mut_time = 0.0; - taskTable[taskCount].mut_etime = 0.0; - taskTable[taskCount].gc_time = 0.0; - taskTable[taskCount].gc_etime = 0.0; - taskTable[taskCount].elapsedtimestart = stat_getElapsedTime(); +static Task* +newTask (void) +{ +#if defined(THREADED_RTS) + Ticks currentElapsedTime, currentUserTime; +#endif + Task *task; + + task = stgMallocBytes(sizeof(Task), "newTask"); + + task->cap = NULL; + task->stopped = rtsFalse; + task->suspended_tso = NULL; + task->tso = NULL; + task->stat = NoStatus; + task->ret = NULL; + +#if defined(THREADED_RTS) + initCondition(&task->cond); + initMutex(&task->lock); + task->wakeup = rtsFalse; +#endif + +#if defined(THREADED_RTS) + currentUserTime = getThreadCPUTime(); + currentElapsedTime = getProcessElapsedTime(); + task->mut_time = 0.0; + task->mut_etime = 0.0; + task->gc_time = 0.0; + task->gc_etime = 0.0; + task->muttimestart = currentUserTime; + task->elapsedtimestart = currentElapsedTime; +#endif + + task->prev = NULL; + task->next = NULL; + task->return_link = NULL; + + task->all_link = all_tasks; + all_tasks = task; + + taskCount++; + workerCount++; + + return task; +} + +Task * +newBoundTask (void) +{ + Task *task; + + ASSERT_LOCK_HELD(&sched_mutex); + if (task_free_list == NULL) { + task = newTask(); + } else { + task = task_free_list; + task_free_list = task->next; + task->next = NULL; + task->prev = NULL; + task->stopped = rtsFalse; + } +#if defined(THREADED_RTS) + task->id = osThreadId(); +#endif + ASSERT(task->cap == NULL); + + tasksRunning++; + + taskEnter(task); - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid);); - return; + IF_DEBUG(scheduler,sched_belch("new task (taskCount: %d)", taskCount);); + return task; } void -stopTaskManager () +boundTaskExiting (Task *task) { - nat i; - - /* Don't want to use pthread_cancel, since we'd have to install - * these silly exception handlers (pthread_cleanup_{push,pop}) around - * all our locks. - */ -#if 0 - /* Cancel all our tasks */ - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - pthread_cancel(taskTable[i].id); - } - - /* Wait for all the tasks to terminate */ - for (i = 0; i < maxCount; i++) { - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", - taskTable[i].id)); - pthread_join(taskTable[i].id, NULL); - } + task->stopped = rtsTrue; + task->cap = NULL; + +#if defined(THREADED_RTS) + ASSERT(osThreadId() == task->id); #endif + ASSERT(myTask() == task); + setMyTask(task->prev_stack); - /* Send 'em all a SIGHUP. That should shut 'em up. */ - await_death = maxCount; - for (i = 0; i < maxCount; i++) { - pthread_kill(taskTable[i].id,SIGTERM); - } - while (await_death > 0) { - sched_yield(); - } - - return; + tasksRunning--; + + // sadly, we need a lock around the free task list. Todo: eliminate. + ACQUIRE_LOCK(&sched_mutex); + task->next = task_free_list; + task_free_list = task; + RELEASE_LOCK(&sched_mutex); + + IF_DEBUG(scheduler,sched_belch("task exiting")); } +void +discardTask (Task *task) +{ + ASSERT_LOCK_HELD(&sched_mutex); + if (!task->stopped) { + IF_DEBUG(scheduler,sched_belch("discarding task %p", +#ifdef THREADED_RTS + (void *)task->id #else -/************ THREADS version *****************/ + (void *)task +#endif + )); + task->cap = NULL; + task->tso = NULL; + task->stopped = rtsTrue; + tasksRunning--; + task->next = task_free_list; + task_free_list = task; + } +} + +void +taskStop (Task *task) +{ +#if defined(THREADED_RTS) + OSThreadId id; + Ticks currentElapsedTime, currentUserTime, elapsedGCTime; + + id = osThreadId(); + ASSERT(task->id == id); + ASSERT(myTask() == task); + + currentUserTime = getThreadCPUTime(); + currentElapsedTime = getProcessElapsedTime(); + + // XXX this is wrong; we want elapsed GC time since the + // Task started. + elapsedGCTime = stat_getElapsedGCTime(); + + task->mut_time = + currentUserTime - task->muttimestart - task->gc_time; + task->mut_etime = + currentElapsedTime - task->elapsedtimestart - elapsedGCTime; + + if (task->mut_time < 0.0) { task->mut_time = 0.0; } + if (task->mut_etime < 0.0) { task->mut_etime = 0.0; } +#endif + + task->stopped = rtsTrue; + tasksRunning--; +} void -startTaskManager( nat maxCount, void (*taskStart)(void) ) +resetTaskManagerAfterFork (void) { - /* In the threaded case, maxCount is used to limit the - the creation of worker tasks. Tasks are created lazily, i.e., - when the current task gives up the token on executing - STG code. - */ - maxTasks = maxCount; - taskCount = 0; - tasksAvailable = 0; +#warning TODO! + taskCount = 0; } +#if defined(THREADED_RTS) + void -startTask ( void (*taskStart)(void) ) +startWorkerTask (Capability *cap, + void OSThreadProcAttr (*taskStart)(Task *task)) { int r; OSThreadId tid; - - /* Locks assumed: rts_mutex */ - - /* If there are threads known to be waiting to do - useful work, no need to create a new task. */ - if (tasksAvailable > 0) { - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: startTask: %d tasks available, not creating new one.\n",tasksAvailable);); - return; - } + Task *task; - /* If the task limit has been reached, just return. */ - if (maxTasks > 0 && taskCount == maxTasks) { - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: startTask: task limit (%d) reached, not creating new one.\n",maxTasks)); - return; + if (workerCount >= maxWorkers) { + barf("too many workers; runaway worker creation?"); } - + workerCount++; + + // A worker always gets a fresh Task structure. + task = newTask(); + + tasksRunning++; - r = createOSThread(&tid,taskStart); + // The lock here is to synchronise with taskStart(), to make sure + // that we have finished setting up the Task structure before the + // worker thread reads it. + ACQUIRE_LOCK(&task->lock); + + task->cap = cap; + + // Give the capability directly to the worker; we can't let anyone + // else get in, because the new worker Task has nowhere to go to + // sleep so that it could be woken up again. + ASSERT_LOCK_HELD(&cap->lock); + cap->running_task = task; + + r = createOSThread(&tid, (OSThreadProc *)taskStart, task); if (r != 0) { barf("startTask: Can't create new task"); } - taskCount++; - tasksAvailable++; - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task (%d): %ld\n", taskCount, tid);); - return; -} + IF_DEBUG(scheduler,sched_belch("new worker task (taskCount: %d)", taskCount);); -/* - * When the RTS thread ends up performing a call-out, - * we need to know whether there'll be other tasks/threads - * to take over RTS responsibilities. The 'tasksAvailable' - * variable holds the number of threads that are _blocked - * waiting to enter the RTS_ (or soon will be). Equipped - * with that count, startTask() is able to make an informed - * decision on whether or not to create a new thread. - * - * Two functions control increments / decrements of - * 'tasksAvailable': - * - * - taskNotAvailable() : called whenever a task/thread - * has acquired the RTS lock, i.e., always called by - * a thread that holds the rts_mutex lock. - * - * - taskAvailable(): called whenever a task/thread - * is about to try to grab the RTS lock. The task manager - * and scheduler will only call this whenever it is - * in possession of the rts_mutex lock, i.e., - * - when a new task is created in startTask(). - * - when the scheduler gives up the RTS token to - * let threads waiting to return from an external - * call continue. - * - */ -void -taskNotAvailable() -{ - if (tasksAvailable > 0) { - tasksAvailable--; - } - return; -} + task->id = tid; -void -taskAvailable() -{ - tasksAvailable++; - return; + // ok, finished with the Task struct. + RELEASE_LOCK(&task->lock); } +#endif /* THREADED_RTS */ +#ifdef DEBUG -void -stopTaskManager () +static void *taskId(Task *task) { - -} +#ifdef THREADED_RTS + return (void *)task->id; +#else + return (void *)task; #endif +} +void printAllTasks(void); -nat -getTaskCount () +void +printAllTasks(void) { - return taskCount; -} + Task *task; + for (task = all_tasks; task != NULL; task = task->all_link) { + debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive"); + if (!task->stopped) { + if (task->cap) { + debugBelch("on capability %d, ", task->cap->no); + } + if (task->tso) { + debugBelch("bound to thread %d", task->tso->id); + } else { + debugBelch("worker"); + } + } + debugBelch("\n"); + } +} +#endif -#endif /* RTS_SUPPORTS_THREADS */