X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FTask.c;h=89db782800522bd8412e64d539c5d89d8c3cbeaf;hb=bbe90cbe14b94899efe0ce24e0c5fdbdb8d40ada;hp=76ea8913bba68c5878b1ed3fc538b6d3d1ab1597;hpb=9a92cb1ca49cb555ff66dcfcb9295ebf75d1ce01;p=ghc-hetmet.git diff --git a/ghc/rts/Task.c b/ghc/rts/Task.c index 76ea891..89db782 100644 --- a/ghc/rts/Task.c +++ b/ghc/rts/Task.c @@ -9,33 +9,44 @@ * -------------------------------------------------------------------------*/ #include "Rts.h" -#if defined(RTS_SUPPORTS_THREADS) /* to the end */ #include "RtsUtils.h" #include "OSThreads.h" #include "Task.h" +#include "Capability.h" #include "Stats.h" #include "RtsFlags.h" #include "Schedule.h" #include "Hash.h" -#include "Capability.h" #if HAVE_SIGNAL_H #include #endif -#define INIT_TASK_TABLE_SIZE 16 - -TaskInfo* taskTable; -static nat taskTableSize; - -HashTable *taskHash; // maps OSThreadID to TaskInfo* - -nat taskCount; +// Task lists and global counters. +// Locks required: sched_mutex. +Task *all_tasks = NULL; +static Task *task_free_list = NULL; // singly-linked +static nat taskCount; +#define DEFAULT_MAX_WORKERS 64 +static nat maxWorkers; // we won't create more workers than this static nat tasksRunning; static nat workerCount; -#define DEFAULT_MAX_WORKERS 64 -nat maxWorkers; // we won't create more workers than this +/* ----------------------------------------------------------------------------- + * Remembering the current thread's Task + * -------------------------------------------------------------------------- */ + +// A thread-local-storage key that we can use to get access to the +// current thread's Task structure. +#if defined(THREADED_RTS) +ThreadLocalKey currentTaskKey; +#else +Task *my_task; +#endif + +/* ----------------------------------------------------------------------------- + * Rest of the Task API + * -------------------------------------------------------------------------- */ void initTaskManager (void) @@ -43,207 +54,262 @@ initTaskManager (void) static int initialized = 0; if (!initialized) { - taskTableSize = INIT_TASK_TABLE_SIZE; - taskTable = stgMallocBytes( taskTableSize * sizeof(TaskInfo), - "initTaskManager"); - taskCount = 0; workerCount = 0; tasksRunning = 0; - - taskHash = allocHashTable(); - maxWorkers = DEFAULT_MAX_WORKERS; - initialized = 1; +#if defined(THREADED_RTS) + newThreadLocalKey(¤tTaskKey); +#endif } } -static void -expandTaskTable (void) -{ - taskTableSize *= 2; - taskTable = stgReallocBytes(taskTable, taskTableSize * sizeof(TaskInfo), - "expandTaskTable"); -} void stopTaskManager (void) { - nat i; - IF_DEBUG(scheduler, sched_belch("stopping task manager, %d tasks still running", tasksRunning)); - for (i = 1000; i > 0; i--) { - if (tasksRunning == 0) { - IF_DEBUG(scheduler, sched_belch("all tasks stopped")); - return; - } - prodWorker(); - yieldThread(); - } - IF_DEBUG(scheduler, sched_belch("%d tasks still running, exiting anyway", tasksRunning)); - - /* - OLD CODE follows: - */ -#if old_code - /* Send 'em all a SIGHUP. That should shut 'em up. */ - awaitDeath = taskCount==0 ? 0 : taskCount-1; - for (i = 0; i < taskCount; i++) { - /* don't cancel the thread running this piece of code. */ - if ( taskTable[i].id != tid ) { - pthread_kill(taskTable[i].id,SIGTERM); - } - } - while (awaitDeath > 0) { - sched_yield(); - } -#endif // old_code } -rtsBool -startTasks (nat num, void (*taskStart)(void)) -{ - nat i; - for (i = 0; i < num; i++) { - if (!startTask(taskStart)) { - return rtsFalse; - } - } - return rtsTrue; -} - -static TaskInfo* -newTask (OSThreadId id, rtsBool is_worker) +static Task* +newTask (void) { - long currentElapsedTime, currentUserTime, elapsedGCTime; - TaskInfo *task_info; +#if defined(THREADED_RTS) + Ticks currentElapsedTime, currentUserTime; +#endif + Task *task; - if (taskCount >= taskTableSize) { - expandTaskTable(); - } - - insertHashTable( taskHash, id, &(taskTable[taskCount]) ); - - stat_getTimes(¤tElapsedTime, ¤tUserTime, &elapsedGCTime); - - task_info = &taskTable[taskCount]; + task = stgMallocBytes(sizeof(Task), "newTask"); - task_info->id = id; - task_info->is_worker = is_worker; - task_info->stopped = rtsFalse; - task_info->mut_time = 0.0; - task_info->mut_etime = 0.0; - task_info->gc_time = 0.0; - task_info->gc_etime = 0.0; - task_info->muttimestart = currentUserTime; - task_info->elapsedtimestart = currentElapsedTime; + task->cap = NULL; + task->stopped = rtsFalse; + task->suspended_tso = NULL; + task->tso = NULL; + task->stat = NoStatus; + task->ret = NULL; +#if defined(THREADED_RTS) + initCondition(&task->cond); + initMutex(&task->lock); + task->wakeup = rtsFalse; +#endif + +#if defined(THREADED_RTS) + currentUserTime = getThreadCPUTime(); + currentElapsedTime = getProcessElapsedTime(); + task->mut_time = 0.0; + task->mut_etime = 0.0; + task->gc_time = 0.0; + task->gc_etime = 0.0; + task->muttimestart = currentUserTime; + task->elapsedtimestart = currentElapsedTime; +#endif + + task->prev = NULL; + task->next = NULL; + task->return_link = NULL; + + task->all_link = all_tasks; + all_tasks = task; + taskCount++; workerCount++; - tasksRunning++; - IF_DEBUG(scheduler,sched_belch("startTask: new task %ld (total_count: %d; waiting: %d)\n", id, taskCount, rts_n_waiting_tasks);); - - return task_info; + return task; } -rtsBool -startTask (void (*taskStart)(void)) +Task * +newBoundTask (void) { - int r; - OSThreadId tid; + Task *task; + + ASSERT_LOCK_HELD(&sched_mutex); + if (task_free_list == NULL) { + task = newTask(); + } else { + task = task_free_list; + task_free_list = task->next; + task->next = NULL; + task->prev = NULL; + task->stopped = rtsFalse; + } +#if defined(THREADED_RTS) + task->id = osThreadId(); +#endif + ASSERT(task->cap == NULL); - r = createOSThread(&tid,taskStart); - if (r != 0) { - barf("startTask: Can't create new task"); - } - newTask (tid, rtsTrue); - return rtsTrue; + tasksRunning++; + + taskEnter(task); + + IF_DEBUG(scheduler,sched_belch("new task (taskCount: %d)", taskCount);); + return task; } -TaskInfo * -threadIsTask (OSThreadId id) +void +boundTaskExiting (Task *task) { - TaskInfo *task_info; - - task_info = lookupHashTable(taskHash, id); - if (task_info != NULL) { - if (task_info->stopped) { - task_info->stopped = rtsFalse; - } - return task_info; - } + task->stopped = rtsTrue; + task->cap = NULL; + +#if defined(THREADED_RTS) + ASSERT(osThreadId() == task->id); +#endif + ASSERT(myTask() == task); + setMyTask(task->prev_stack); + + tasksRunning--; - return newTask(id, rtsFalse); + // sadly, we need a lock around the free task list. Todo: eliminate. + ACQUIRE_LOCK(&sched_mutex); + task->next = task_free_list; + task_free_list = task; + RELEASE_LOCK(&sched_mutex); + + IF_DEBUG(scheduler,sched_belch("task exiting")); } -TaskInfo * -taskOfId (OSThreadId id) +void +discardTask (Task *task) { - return lookupHashTable(taskHash, id); + ASSERT_LOCK_HELD(&sched_mutex); + if (!task->stopped) { + IF_DEBUG(scheduler,sched_belch("discarding task %p", +#ifdef THREADED_RTS + (void *)task->id +#else + (void *)task +#endif + )); + task->cap = NULL; + task->tso = NULL; + task->stopped = rtsTrue; + tasksRunning--; + task->next = task_free_list; + task_free_list = task; + } } void -taskStop (void) +taskStop (Task *task) { +#if defined(THREADED_RTS) OSThreadId id; - long currentElapsedTime, currentUserTime, elapsedGCTime; - TaskInfo *task_info; + Ticks currentElapsedTime, currentUserTime, elapsedGCTime; id = osThreadId(); - task_info = taskOfId(id); - if (task_info == NULL) { - debugBelch("taskStop: not a task"); - return; - } - ASSERT(task_info->id == id); + ASSERT(task->id == id); + ASSERT(myTask() == task); - task_info->stopped = rtsTrue; - tasksRunning--; + currentUserTime = getThreadCPUTime(); + currentElapsedTime = getProcessElapsedTime(); - stat_getTimes(¤tElapsedTime, ¤tUserTime, &elapsedGCTime); + // XXX this is wrong; we want elapsed GC time since the + // Task started. + elapsedGCTime = stat_getElapsedGCTime(); - task_info->mut_time = - currentUserTime - task_info->muttimestart - task_info->gc_time; - task_info->mut_etime = - currentElapsedTime - task_info->elapsedtimestart - elapsedGCTime; + task->mut_time = + currentUserTime - task->muttimestart - task->gc_time; + task->mut_etime = + currentElapsedTime - task->elapsedtimestart - elapsedGCTime; - if (task_info->mut_time < 0.0) { task_info->mut_time = 0.0; } - if (task_info->mut_etime < 0.0) { task_info->mut_etime = 0.0; } + if (task->mut_time < 0.0) { task->mut_time = 0.0; } + if (task->mut_etime < 0.0) { task->mut_etime = 0.0; } +#endif + + task->stopped = rtsTrue; + tasksRunning--; } void resetTaskManagerAfterFork (void) { - rts_n_waiting_tasks = 0; +#warning TODO! taskCount = 0; } -rtsBool -maybeStartNewWorker (void (*taskStart)(void)) +#if defined(THREADED_RTS) + +void +startWorkerTask (Capability *cap, + void OSThreadProcAttr (*taskStart)(Task *task)) { - /* - * If more than one worker thread is known to be blocked waiting - * on thread_ready_cond, don't create a new one. - */ - if ( rts_n_waiting_tasks > 0) { - IF_DEBUG(scheduler,sched_belch( - "startTask: %d tasks waiting, not creating new one", - rts_n_waiting_tasks);); - // the task will run as soon as a capability is available, - // so there's no need to wake it. - return rtsFalse; - } - - /* If the task limit has been reached, just return. */ - if (maxWorkers > 0 && workerCount >= maxWorkers) { - IF_DEBUG(scheduler,sched_belch("startTask: worker limit (%d) reached, not creating new one",maxWorkers)); - return rtsFalse; - } - - return startTask(taskStart); + int r; + OSThreadId tid; + Task *task; + + if (workerCount >= maxWorkers) { + barf("too many workers; runaway worker creation?"); + } + workerCount++; + + // A worker always gets a fresh Task structure. + task = newTask(); + + tasksRunning++; + + // The lock here is to synchronise with taskStart(), to make sure + // that we have finished setting up the Task structure before the + // worker thread reads it. + ACQUIRE_LOCK(&task->lock); + + task->cap = cap; + + // Give the capability directly to the worker; we can't let anyone + // else get in, because the new worker Task has nowhere to go to + // sleep so that it could be woken up again. + ASSERT_LOCK_HELD(&cap->lock); + cap->running_task = task; + + r = createOSThread(&tid, (OSThreadProc *)taskStart, task); + if (r != 0) { + barf("startTask: Can't create new task"); + } + + IF_DEBUG(scheduler,sched_belch("new worker task (taskCount: %d)", taskCount);); + + task->id = tid; + + // ok, finished with the Task struct. + RELEASE_LOCK(&task->lock); +} + +#endif /* THREADED_RTS */ + +#ifdef DEBUG + +static void *taskId(Task *task) +{ +#ifdef THREADED_RTS + return (void *)task->id; +#else + return (void *)task; +#endif } -#endif /* RTS_SUPPORTS_THREADS */ +void printAllTasks(void); + +void +printAllTasks(void) +{ + Task *task; + for (task = all_tasks; task != NULL; task = task->all_link) { + debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive"); + if (!task->stopped) { + if (task->cap) { + debugBelch("on capability %d, ", task->cap->no); + } + if (task->tso) { + debugBelch("bound to thread %d", task->tso->id); + } else { + debugBelch("worker"); + } + } + debugBelch("\n"); + } +} + +#endif +