/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team 2001-
+ * (c) The GHC Team 2001-2005
*
* The task manager subsystem. Tasks execute STG code, with this
* module providing the API which the Scheduler uses to control their
* creation and destruction.
- *
- * Two kinds of RTS builds uses 'tasks' - the SMP and the
- * 'native thread-friendly' builds.
*
- * The SMP build lets multiple tasks concurrently execute STG code,
- * all sharing vital internal RTS data structures in a controlled manner
- * (see details elsewhere...ToDo: fill in ref!)
- *
- * The 'threads' build has at any one time only one task executing STG
- * code, other tasks are either busy executing code outside the RTS
- * (e.g., a C call) or waiting for their turn to (again) evaluate some
- * STG code. A task relinquishes its RTS token when it is asked to
- * evaluate an external (C) call.
- *
* -------------------------------------------------------------------------*/
+
#include "Rts.h"
-#if defined(RTS_SUPPORTS_THREADS) /* to the end */
#include "RtsUtils.h"
#include "OSThreads.h"
#include "Task.h"
+#include "Capability.h"
#include "Stats.h"
#include "RtsFlags.h"
#include "Schedule.h"
+#include "Hash.h"
-/* There's not all that much code that is shared between the
- * SMP and threads version of the 'task manager.' A sign
- * that the code ought to be structured differently..(Maybe ToDo).
- */
-
-/*
- * The following Task Manager-local variables are assumed to be
- * accessed with the RTS lock in hand.
- */
-#if defined(SMP)
-static TaskInfo* taskTable;
+#if HAVE_SIGNAL_H
+#include <signal.h>
#endif
-/* upper bound / the number of tasks created. */
-static nat maxTasks;
-/* number of tasks currently created */
-static nat taskCount;
-#if defined(SMP)
+// Task lists and global counters.
+// Locks required: sched_mutex.
+Task *all_tasks = NULL;
+static Task *task_free_list = NULL; // singly-linked
+static nat taskCount;
+#define DEFAULT_MAX_WORKERS 64
+static nat maxWorkers; // we won't create more workers than this
+static nat tasksRunning;
+static nat workerCount;
+
+/* -----------------------------------------------------------------------------
+ * Remembering the current thread's Task
+ * -------------------------------------------------------------------------- */
+
+// A thread-local-storage key that we can use to get access to the
+// current thread's Task structure.
+#if defined(THREADED_RTS)
+ThreadLocalKey currentTaskKey;
+#else
+Task *my_task;
+#endif
+
+/* -----------------------------------------------------------------------------
+ * Rest of the Task API
+ * -------------------------------------------------------------------------- */
+
void
-startTaskManager( nat maxCount, void (*taskStart)(void) )
+initTaskManager (void)
{
- nat i;
- static int initialized = 0;
-
- if (!initialized) {
-
- taskCount = 0;
- maxTasks = maxCount;
- /* allocate table holding task metadata */
-
- if (maxCount > 0) {
- taskTable = stgMallocBytes(maxCount * sizeof(TaskInfo),
- "startTaskManager:tasks");
-
- /* and eagerly create them all. */
- for (i = 0; i < maxCount; i++) {
- startTask(taskStart);
- taskCount++;
- }
+ static int initialized = 0;
+
+ if (!initialized) {
+ taskCount = 0;
+ workerCount = 0;
+ tasksRunning = 0;
+ maxWorkers = DEFAULT_MAX_WORKERS;
+ initialized = 1;
+#if defined(THREADED_RTS)
+ newThreadLocalKey(¤tTaskKey);
+#endif
}
- initialized = 1;
- }
}
+
void
-startTask ( void (*taskStart)(void) )
+stopTaskManager (void)
{
- int r;
- OSThreadId tid;
+ IF_DEBUG(scheduler, sched_belch("stopping task manager, %d tasks still running", tasksRunning));
+}
- r = createOSThread(&tid,taskStart);
- if (r != 0) {
- barf("startTask: Can't create new task");
- }
- taskTable[taskCount].id = tid;
- taskTable[taskCount].mut_time = 0.0;
- taskTable[taskCount].mut_etime = 0.0;
- taskTable[taskCount].gc_time = 0.0;
- taskTable[taskCount].gc_etime = 0.0;
- taskTable[taskCount].elapsedtimestart = stat_getElapsedTime();
+static Task*
+newTask (void)
+{
+#if defined(THREADED_RTS)
+ Ticks currentElapsedTime, currentUserTime;
+#endif
+ Task *task;
+
+ task = stgMallocBytes(sizeof(Task), "newTask");
+
+ task->cap = NULL;
+ task->stopped = rtsFalse;
+ task->suspended_tso = NULL;
+ task->tso = NULL;
+ task->stat = NoStatus;
+ task->ret = NULL;
+
+#if defined(THREADED_RTS)
+ initCondition(&task->cond);
+ initMutex(&task->lock);
+ task->wakeup = rtsFalse;
+#endif
+
+#if defined(THREADED_RTS)
+ currentUserTime = getThreadCPUTime();
+ currentElapsedTime = getProcessElapsedTime();
+ task->mut_time = 0.0;
+ task->mut_etime = 0.0;
+ task->gc_time = 0.0;
+ task->gc_etime = 0.0;
+ task->muttimestart = currentUserTime;
+ task->elapsedtimestart = currentElapsedTime;
+#endif
+
+ task->prev = NULL;
+ task->next = NULL;
+ task->return_link = NULL;
- IF_DEBUG(scheduler,debugBelch("scheduler: Started task: %ld\n",tid););
- return;
+ task->all_link = all_tasks;
+ all_tasks = task;
+
+ taskCount++;
+ workerCount++;
+
+ return task;
+}
+
+Task *
+newBoundTask (void)
+{
+ Task *task;
+
+ ASSERT_LOCK_HELD(&sched_mutex);
+ if (task_free_list == NULL) {
+ task = newTask();
+ } else {
+ task = task_free_list;
+ task_free_list = task->next;
+ task->next = NULL;
+ task->prev = NULL;
+ task->stopped = rtsFalse;
+ }
+#if defined(THREADED_RTS)
+ task->id = osThreadId();
+#endif
+ ASSERT(task->cap == NULL);
+
+ tasksRunning++;
+
+ taskEnter(task);
+
+ IF_DEBUG(scheduler,sched_belch("new task (taskCount: %d)", taskCount););
+ return task;
}
void
-stopTaskManager ()
+boundTaskExiting (Task *task)
{
- nat i;
- OSThreadId tid = osThreadId();
-
- /* Don't want to use pthread_cancel, since we'd have to install
- * these silly exception handlers (pthread_cleanup_{push,pop}) around
- * all our locks.
- */
-#if 0
- /* Cancel all our tasks */
- for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
- pthread_cancel(taskTable[i].id);
- }
-
- /* Wait for all the tasks to terminate */
- for (i = 0; i < maxCount; i++) {
- IF_DEBUG(scheduler,debugBelch("scheduler: waiting for task %ld\n",
- taskTable[i].id));
- pthread_join(taskTable[i].id, NULL);
- }
+ task->stopped = rtsTrue;
+ task->cap = NULL;
+
+#if defined(THREADED_RTS)
+ ASSERT(osThreadId() == task->id);
#endif
+ ASSERT(myTask() == task);
+ setMyTask(task->prev_stack);
- /* Send 'em all a SIGHUP. That should shut 'em up. */
- await_death = maxCount - 1;
- for (i = 0; i < maxCount; i++) {
- /* don't cancel the thread running this piece of code. */
- if ( taskTable[i].id != tid ) {
- pthread_kill(taskTable[i].id,SIGTERM);
- }
- }
- while (await_death > 0) {
- sched_yield();
- }
-
- return;
+ tasksRunning--;
+
+ // sadly, we need a lock around the free task list. Todo: eliminate.
+ ACQUIRE_LOCK(&sched_mutex);
+ task->next = task_free_list;
+ task_free_list = task;
+ RELEASE_LOCK(&sched_mutex);
+
+ IF_DEBUG(scheduler,sched_belch("task exiting"));
}
void
-resetTaskManagerAfterFork ()
+discardTask (Task *task)
{
- barf("resetTaskManagerAfterFork not implemented for SMP");
+ ASSERT_LOCK_HELD(&sched_mutex);
+ task->stopped = rtsTrue;
+ task->cap = NULL;
+ task->next = task_free_list;
+ task_free_list = task;
}
-#else
-/************ THREADS version *****************/
+void
+taskStop (Task *task)
+{
+#if defined(THREADED_RTS)
+ OSThreadId id;
+ Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
+
+ id = osThreadId();
+ ASSERT(task->id == id);
+ ASSERT(myTask() == task);
+
+ currentUserTime = getThreadCPUTime();
+ currentElapsedTime = getProcessElapsedTime();
+
+ // XXX this is wrong; we want elapsed GC time since the
+ // Task started.
+ elapsedGCTime = stat_getElapsedGCTime();
+
+ task->mut_time =
+ currentUserTime - task->muttimestart - task->gc_time;
+ task->mut_etime =
+ currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
+
+ if (task->mut_time < 0.0) { task->mut_time = 0.0; }
+ if (task->mut_etime < 0.0) { task->mut_etime = 0.0; }
+#endif
+
+ task->stopped = rtsTrue;
+ tasksRunning--;
+}
void
-startTaskManager( nat maxCount,
- void (*taskStart)(void) STG_UNUSED )
+resetTaskManagerAfterFork (void)
{
- /* In the threaded case, maxCount is used to limit the
- the creation of worker tasks. Tasks are created lazily, i.e.,
- when the current task gives up the token on executing
- STG code.
- */
- maxTasks = maxCount;
- taskCount = 0;
+#warning TODO!
+ taskCount = 0;
}
-rtsBool
-startTask ( void (*taskStart)(void) )
+#if defined(THREADED_RTS)
+
+void
+startWorkerTask (Capability *cap,
+ void OSThreadProcAttr (*taskStart)(Task *task))
{
int r;
OSThreadId tid;
-
- /* If more than one worker thread is known to be blocked waiting
- on thread_ready_cond, don't create a new one.
- */
- if ( rts_n_waiting_tasks > 0) {
- IF_DEBUG(scheduler,debugBelch(
- "scheduler: startTask: %d tasks waiting, not creating new one.\n",
- rts_n_waiting_tasks););
- // the task will run as soon as a capability is available,
- // so there's no need to wake it.
- return rtsFalse;
- }
+ Task *task;
- /* If the task limit has been reached, just return. */
- if (maxTasks > 0 && taskCount == maxTasks) {
- IF_DEBUG(scheduler,debugBelch("scheduler: startTask: task limit (%d) reached, not creating new one.\n",maxTasks));
- return rtsFalse;
+ if (workerCount >= maxWorkers) {
+ barf("too many workers; runaway worker creation?");
}
-
+ workerCount++;
+
+ // A worker always gets a fresh Task structure.
+ task = newTask();
+
+ tasksRunning++;
- r = createOSThread(&tid,taskStart);
+ // The lock here is to synchronise with taskStart(), to make sure
+ // that we have finished setting up the Task structure before the
+ // worker thread reads it.
+ ACQUIRE_LOCK(&task->lock);
+
+ task->cap = cap;
+
+ // Give the capability directly to the worker; we can't let anyone
+ // else get in, because the new worker Task has nowhere to go to
+ // sleep so that it could be woken up again.
+ ASSERT_LOCK_HELD(&cap->lock);
+ cap->running_task = task;
+
+ r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
if (r != 0) {
barf("startTask: Can't create new task");
}
- taskCount++;
- IF_DEBUG(scheduler,debugBelch("scheduler: startTask: new task %ld (total_count: %d; waiting: %d)\n", tid, taskCount, rts_n_waiting_tasks););
- return rtsTrue;
+ IF_DEBUG(scheduler,sched_belch("new worker task (taskCount: %d)", taskCount););
+
+ task->id = tid;
+
+ // ok, finished with the Task struct.
+ RELEASE_LOCK(&task->lock);
}
+#endif /* THREADED_RTS */
+#ifdef DEBUG
-void
-stopTaskManager ()
+static void *taskId(Task *task)
{
-
+#ifdef THREADED_RTS
+ return (void *)task->id;
+#else
+ return (void *)task;
+#endif
}
+void printAllTasks(void);
+
void
-resetTaskManagerAfterFork ( void )
+printAllTasks(void)
{
- rts_n_waiting_tasks = 0;
- taskCount = 0;
-}
-#endif
+ Task *task;
+ for (task = all_tasks; task != NULL; task = task->all_link) {
+ debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
+ if (!task->stopped) {
+ if (task->cap) {
+ debugBelch("on capability %d, ", task->cap->no);
+ }
+ if (task->tso) {
+ debugBelch("bound to thread %d", task->tso->id);
+ } else {
+ debugBelch("worker");
+ }
+ }
+ debugBelch("\n");
+ }
+}
+#endif
-#endif /* RTS_SUPPORTS_THREADS */