* -------------------------------------------------------------------------*/
#include "Rts.h"
-#if defined(RTS_SUPPORTS_THREADS) /* to the end */
#include "RtsUtils.h"
#include "OSThreads.h"
#include "Task.h"
+#include "Capability.h"
#include "Stats.h"
#include "RtsFlags.h"
#include "Schedule.h"
#include "Hash.h"
-#include "Capability.h"
#if HAVE_SIGNAL_H
#include <signal.h>
#endif
-#define INIT_TASK_TABLE_SIZE 16
-
-TaskInfo* taskTable;
-static nat taskTableSize;
-
-// maps OSThreadID to TaskInfo*
-HashTable *taskHash;
-
-nat taskCount;
+// Task lists and global counters.
+// Locks required: sched_mutex.
+Task *all_tasks = NULL;
+static Task *task_free_list = NULL; // singly-linked
+static nat taskCount;
+#define DEFAULT_MAX_WORKERS 64
+static nat maxWorkers; // we won't create more workers than this
static nat tasksRunning;
static nat workerCount;
-#define DEFAULT_MAX_WORKERS 64
-nat maxWorkers; // we won't create more workers than this
+/* -----------------------------------------------------------------------------
+ * Remembering the current thread's Task
+ * -------------------------------------------------------------------------- */
+
+// A thread-local-storage key that we can use to get access to the
+// current thread's Task structure.
+#if defined(THREADED_RTS)
+ThreadLocalKey currentTaskKey;
+#else
+Task *my_task;
+#endif
+
+/* -----------------------------------------------------------------------------
+ * Rest of the Task API
+ * -------------------------------------------------------------------------- */
void
initTaskManager (void)
static int initialized = 0;
if (!initialized) {
-#if defined(SMP)
- taskTableSize = stg_max(INIT_TASK_TABLE_SIZE,
- RtsFlags.ParFlags.nNodes * 2);
-#else
- taskTableSize = INIT_TASK_TABLE_SIZE;
-#endif
- taskTable = stgMallocBytes( taskTableSize * sizeof(TaskInfo),
- "initTaskManager");
-
taskCount = 0;
workerCount = 0;
tasksRunning = 0;
-
- taskHash = allocHashTable();
-
maxWorkers = DEFAULT_MAX_WORKERS;
-
initialized = 1;
+#if defined(THREADED_RTS)
+ newThreadLocalKey(¤tTaskKey);
+#endif
}
}
-static void
-expandTaskTable (void)
-{
- nat i;
-
- taskTableSize *= 2;
- taskTable = stgReallocBytes(taskTable, taskTableSize * sizeof(TaskInfo),
- "expandTaskTable");
-
- /* Have to update the hash table now... */
- for (i = 0; i < taskCount; i++) {
- removeHashTable( taskHash, taskTable[i].id, NULL );
- insertHashTable( taskHash, taskTable[i].id, &taskTable[i] );
- }
-}
void
stopTaskManager (void)
}
-rtsBool
-startTasks (nat num, void (*taskStart)(void))
+static Task*
+newTask (void)
{
- nat i;
- for (i = 0; i < num; i++) {
- if (!startTask(taskStart)) {
- return rtsFalse;
- }
- }
- return rtsTrue;
-}
-
-static TaskInfo*
-newTask (OSThreadId id, rtsBool is_worker)
-{
- long currentElapsedTime, currentUserTime, elapsedGCTime;
- TaskInfo *task_info;
+#if defined(THREADED_RTS)
+ Ticks currentElapsedTime, currentUserTime;
+#endif
+ Task *task;
- if (taskCount >= taskTableSize) {
- expandTaskTable();
- }
-
- insertHashTable( taskHash, id, &(taskTable[taskCount]) );
-
- stat_getTimes(¤tElapsedTime, ¤tUserTime, &elapsedGCTime);
+ task = stgMallocBytes(sizeof(Task), "newTask");
- task_info = &taskTable[taskCount];
-
- task_info->id = id;
- task_info->is_worker = is_worker;
- task_info->stopped = rtsFalse;
- task_info->mut_time = 0.0;
- task_info->mut_etime = 0.0;
- task_info->gc_time = 0.0;
- task_info->gc_etime = 0.0;
- task_info->muttimestart = currentUserTime;
- task_info->elapsedtimestart = currentElapsedTime;
+ task->cap = NULL;
+ task->stopped = rtsFalse;
+ task->suspended_tso = NULL;
+ task->tso = NULL;
+ task->stat = NoStatus;
+ task->ret = NULL;
+#if defined(THREADED_RTS)
+ initCondition(&task->cond);
+ initMutex(&task->lock);
+ task->wakeup = rtsFalse;
+#endif
+
+#if defined(THREADED_RTS)
+ currentUserTime = getThreadCPUTime();
+ currentElapsedTime = getProcessElapsedTime();
+ task->mut_time = 0.0;
+ task->mut_etime = 0.0;
+ task->gc_time = 0.0;
+ task->gc_etime = 0.0;
+ task->muttimestart = currentUserTime;
+ task->elapsedtimestart = currentElapsedTime;
+#endif
+
+ task->prev = NULL;
+ task->next = NULL;
+ task->return_link = NULL;
+
+ task->all_link = all_tasks;
+ all_tasks = task;
+
taskCount++;
workerCount++;
- tasksRunning++;
- IF_DEBUG(scheduler,sched_belch("startTask: new task %ld (total_count: %d; waiting: %d)\n", id, taskCount, rts_n_waiting_tasks););
-
- return task_info;
+ return task;
}
-rtsBool
-startTask (void (*taskStart)(void))
+Task *
+newBoundTask (void)
{
- int r;
- OSThreadId tid;
+ Task *task;
+
+ ASSERT_LOCK_HELD(&sched_mutex);
+ if (task_free_list == NULL) {
+ task = newTask();
+ } else {
+ task = task_free_list;
+ task_free_list = task->next;
+ task->next = NULL;
+ task->prev = NULL;
+ task->stopped = rtsFalse;
+ }
+#if defined(THREADED_RTS)
+ task->id = osThreadId();
+#endif
+ ASSERT(task->cap == NULL);
- r = createOSThread(&tid,taskStart);
- if (r != 0) {
- barf("startTask: Can't create new task");
- }
- newTask (tid, rtsTrue);
- return rtsTrue;
+ tasksRunning++;
+
+ taskEnter(task);
+
+ IF_DEBUG(scheduler,sched_belch("new task (taskCount: %d)", taskCount););
+ return task;
}
-TaskInfo *
-threadIsTask (OSThreadId id)
+void
+boundTaskExiting (Task *task)
{
- TaskInfo *task_info;
-
- task_info = lookupHashTable(taskHash, id);
- if (task_info != NULL) {
- if (task_info->stopped) {
- task_info->stopped = rtsFalse;
- }
- return task_info;
- }
+ task->stopped = rtsTrue;
+ task->cap = NULL;
+
+#if defined(THREADED_RTS)
+ ASSERT(osThreadId() == task->id);
+#endif
+ ASSERT(myTask() == task);
+ setMyTask(task->prev_stack);
- return newTask(id, rtsFalse);
+ tasksRunning--;
+
+ // sadly, we need a lock around the free task list. Todo: eliminate.
+ ACQUIRE_LOCK(&sched_mutex);
+ task->next = task_free_list;
+ task_free_list = task;
+ RELEASE_LOCK(&sched_mutex);
+
+ IF_DEBUG(scheduler,sched_belch("task exiting"));
}
-TaskInfo *
-taskOfId (OSThreadId id)
+void
+discardTask (Task *task)
{
- return lookupHashTable(taskHash, id);
+ ASSERT_LOCK_HELD(&sched_mutex);
+ task->stopped = rtsTrue;
+ task->cap = NULL;
+ task->next = task_free_list;
+ task_free_list = task;
}
void
-taskStop (void)
+taskStop (Task *task)
{
+#if defined(THREADED_RTS)
OSThreadId id;
- long currentElapsedTime, currentUserTime, elapsedGCTime;
- TaskInfo *task_info;
+ Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
id = osThreadId();
- task_info = taskOfId(id);
- if (task_info == NULL) {
- debugBelch("taskStop: not a task");
- return;
- }
- ASSERT(task_info->id == id);
+ ASSERT(task->id == id);
+ ASSERT(myTask() == task);
+
+ currentUserTime = getThreadCPUTime();
+ currentElapsedTime = getProcessElapsedTime();
- stat_getTimes(¤tElapsedTime, ¤tUserTime, &elapsedGCTime);
+ // XXX this is wrong; we want elapsed GC time since the
+ // Task started.
+ elapsedGCTime = stat_getElapsedGCTime();
- task_info->mut_time =
- currentUserTime - task_info->muttimestart - task_info->gc_time;
- task_info->mut_etime =
- currentElapsedTime - task_info->elapsedtimestart - elapsedGCTime;
+ task->mut_time =
+ currentUserTime - task->muttimestart - task->gc_time;
+ task->mut_etime =
+ currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
- if (task_info->mut_time < 0.0) { task_info->mut_time = 0.0; }
- if (task_info->mut_etime < 0.0) { task_info->mut_etime = 0.0; }
+ if (task->mut_time < 0.0) { task->mut_time = 0.0; }
+ if (task->mut_etime < 0.0) { task->mut_etime = 0.0; }
+#endif
- task_info->stopped = rtsTrue;
+ task->stopped = rtsTrue;
tasksRunning--;
}
void
resetTaskManagerAfterFork (void)
{
- rts_n_waiting_tasks = 0;
+#warning TODO!
taskCount = 0;
}
-rtsBool
-maybeStartNewWorker (void (*taskStart)(void))
+#if defined(THREADED_RTS)
+
+void
+startWorkerTask (Capability *cap,
+ void OSThreadProcAttr (*taskStart)(Task *task))
{
- /*
- * If more than one worker thread is known to be blocked waiting
- * on thread_ready_cond, don't create a new one.
- */
- if ( rts_n_waiting_tasks > 0) {
- IF_DEBUG(scheduler,sched_belch(
- "startTask: %d tasks waiting, not creating new one",
- rts_n_waiting_tasks););
- // the task will run as soon as a capability is available,
- // so there's no need to wake it.
- return rtsFalse;
- }
-
- /* If the task limit has been reached, just return. */
- if (maxWorkers > 0 && workerCount >= maxWorkers) {
- IF_DEBUG(scheduler,sched_belch("startTask: worker limit (%d) reached, not creating new one",maxWorkers));
- return rtsFalse;
- }
-
- return startTask(taskStart);
+ int r;
+ OSThreadId tid;
+ Task *task;
+
+ if (workerCount >= maxWorkers) {
+ barf("too many workers; runaway worker creation?");
+ }
+ workerCount++;
+
+ // A worker always gets a fresh Task structure.
+ task = newTask();
+
+ tasksRunning++;
+
+ // The lock here is to synchronise with taskStart(), to make sure
+ // that we have finished setting up the Task structure before the
+ // worker thread reads it.
+ ACQUIRE_LOCK(&task->lock);
+
+ task->cap = cap;
+
+ // Give the capability directly to the worker; we can't let anyone
+ // else get in, because the new worker Task has nowhere to go to
+ // sleep so that it could be woken up again.
+ ASSERT_LOCK_HELD(&cap->lock);
+ cap->running_task = task;
+
+ r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
+ if (r != 0) {
+ barf("startTask: Can't create new task");
+ }
+
+ IF_DEBUG(scheduler,sched_belch("new worker task (taskCount: %d)", taskCount););
+
+ task->id = tid;
+
+ // ok, finished with the Task struct.
+ RELEASE_LOCK(&task->lock);
}
-#endif /* RTS_SUPPORTS_THREADS */
+#endif /* THREADED_RTS */
+
+#ifdef DEBUG
+
+static void *taskId(Task *task)
+{
+#ifdef THREADED_RTS
+ return (void *)task->id;
+#else
+ return (void *)task;
+#endif
+}
+
+void printAllTasks(void);
+
+void
+printAllTasks(void)
+{
+ Task *task;
+ for (task = all_tasks; task != NULL; task = task->all_link) {
+ debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
+ if (!task->stopped) {
+ if (task->cap) {
+ debugBelch("on capability %d, ", task->cap->no);
+ }
+ if (task->tso) {
+ debugBelch("bound to thread %d", task->tso->id);
+ } else {
+ debugBelch("worker");
+ }
+ }
+ debugBelch("\n");
+ }
+}
+
+#endif
+