*
* -------------------------------------------------------------------------*/
+#include "PosixSource.h"
#include "Rts.h"
+
#include "RtsUtils.h"
-#include "OSThreads.h"
#include "Task.h"
#include "Capability.h"
#include "Stats.h"
-#include "RtsFlags.h"
-#include "Storage.h"
#include "Schedule.h"
#include "Hash.h"
#include "Trace.h"
#endif
// Task lists and global counters.
-// Locks required: sched_mutex.
+// Locks required: all_tasks_mutex.
Task *all_tasks = NULL;
-static Task *task_free_list = NULL; // singly-linked
static nat taskCount;
-static nat tasksRunning;
-static nat workerCount;
+static int tasksInitialized = 0;
+
+static void freeTask (Task *task);
+static Task * allocTask (void);
+static Task * newTask (rtsBool);
+
+#if defined(THREADED_RTS)
+static Mutex all_tasks_mutex;
+#endif
/* -----------------------------------------------------------------------------
* Remembering the current thread's Task
// A thread-local-storage key that we can use to get access to the
// current thread's Task structure.
#if defined(THREADED_RTS)
+# if defined(MYTASK_USE_TLV)
+__thread Task *my_task;
+# else
ThreadLocalKey currentTaskKey;
+# endif
#else
Task *my_task;
#endif
void
initTaskManager (void)
{
- static int initialized = 0;
-
- if (!initialized) {
+ if (!tasksInitialized) {
taskCount = 0;
- workerCount = 0;
- tasksRunning = 0;
- initialized = 1;
+ tasksInitialized = 1;
#if defined(THREADED_RTS)
+#if !defined(MYTASK_USE_TLV)
newThreadLocalKey(¤tTaskKey);
#endif
+ initMutex(&all_tasks_mutex);
+#endif
}
}
freeTaskManager (void)
{
Task *task, *next;
+ nat tasksRunning = 0;
- ASSERT_LOCK_HELD(&sched_mutex);
-
- debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
- tasksRunning);
+ ACQUIRE_LOCK(&all_tasks_mutex);
for (task = all_tasks; task != NULL; task = next) {
next = task->all_link;
if (task->stopped) {
- // We only free resources if the Task is not in use. A
- // Task may still be in use if we have a Haskell thread in
- // a foreign call while we are attempting to shut down the
- // RTS (see conc059).
-#if defined(THREADED_RTS)
- closeCondition(&task->cond);
- closeMutex(&task->lock);
-#endif
- stgFree(task);
+ freeTask(task);
+ } else {
+ tasksRunning++;
}
}
+
+ debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
+ tasksRunning);
+
all_tasks = NULL;
- task_free_list = NULL;
-#if defined(THREADED_RTS)
+
+ RELEASE_LOCK(&all_tasks_mutex);
+
+#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
+ closeMutex(&all_tasks_mutex);
freeThreadLocalKey(¤tTaskKey);
#endif
+ tasksInitialized = 0;
+
return tasksRunning;
}
+static Task *
+allocTask (void)
+{
+ Task *task;
+
+ task = myTask();
+ if (task != NULL) {
+ return task;
+ } else {
+ task = newTask(rtsFalse);
+#if defined(THREADED_RTS)
+ task->id = osThreadId();
+#endif
+ setMyTask(task);
+ return task;
+ }
+}
+
+static void
+freeTask (Task *task)
+{
+ InCall *incall, *next;
+
+ // We only free resources if the Task is not in use. A
+ // Task may still be in use if we have a Haskell thread in
+ // a foreign call while we are attempting to shut down the
+ // RTS (see conc059).
+#if defined(THREADED_RTS)
+ closeCondition(&task->cond);
+ closeMutex(&task->lock);
+#endif
+
+ for (incall = task->incall; incall != NULL; incall = next) {
+ next = incall->prev_stack;
+ stgFree(incall);
+ }
+ for (incall = task->spare_incalls; incall != NULL; incall = next) {
+ next = incall->next;
+ stgFree(incall);
+ }
+
+ stgFree(task);
+}
static Task*
-newTask (void)
+newTask (rtsBool worker)
{
#if defined(THREADED_RTS)
Ticks currentElapsedTime, currentUserTime;
#define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
- task->cap = NULL;
- task->stopped = rtsFalse;
- task->suspended_tso = NULL;
- task->tso = NULL;
- task->stat = NoStatus;
- task->ret = NULL;
+ task->cap = NULL;
+ task->worker = worker;
+ task->stopped = rtsFalse;
+ task->running_finalizers = rtsFalse;
+ task->n_spare_incalls = 0;
+ task->spare_incalls = NULL;
+ task->incall = NULL;
#if defined(THREADED_RTS)
initCondition(&task->cond);
task->elapsedtimestart = currentElapsedTime;
#endif
- task->prev = NULL;
task->next = NULL;
- task->return_link = NULL;
+
+ ACQUIRE_LOCK(&all_tasks_mutex);
task->all_link = all_tasks;
all_tasks = task;
taskCount++;
+ RELEASE_LOCK(&all_tasks_mutex);
+
return task;
}
+// avoid the spare_incalls list growing unboundedly
+#define MAX_SPARE_INCALLS 8
+
+static void
+newInCall (Task *task)
+{
+ InCall *incall;
+
+ if (task->spare_incalls != NULL) {
+ incall = task->spare_incalls;
+ task->spare_incalls = incall->next;
+ task->n_spare_incalls--;
+ } else {
+ incall = stgMallocBytes((sizeof(InCall)), "newBoundTask");
+ }
+
+ incall->tso = NULL;
+ incall->task = task;
+ incall->suspended_tso = NULL;
+ incall->suspended_cap = NULL;
+ incall->stat = NoStatus;
+ incall->ret = NULL;
+ incall->next = NULL;
+ incall->prev = NULL;
+ incall->prev_stack = task->incall;
+ task->incall = incall;
+}
+
+static void
+endInCall (Task *task)
+{
+ InCall *incall;
+
+ incall = task->incall;
+ incall->tso = NULL;
+ task->incall = task->incall->prev_stack;
+
+ if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
+ stgFree(incall);
+ } else {
+ incall->next = task->spare_incalls;
+ task->spare_incalls = incall;
+ task->n_spare_incalls++;
+ }
+}
+
+
Task *
newBoundTask (void)
{
Task *task;
- ASSERT_LOCK_HELD(&sched_mutex);
- if (task_free_list == NULL) {
- task = newTask();
- } else {
- task = task_free_list;
- task_free_list = task->next;
- task->next = NULL;
- task->prev = NULL;
- task->stopped = rtsFalse;
+ if (!tasksInitialized) {
+ errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
+ stg_exit(EXIT_FAILURE);
}
-#if defined(THREADED_RTS)
- task->id = osThreadId();
-#endif
- ASSERT(task->cap == NULL);
- tasksRunning++;
+ task = allocTask();
+
+ task->stopped = rtsFalse;
- taskEnter(task);
+ newInCall(task);
debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
return task;
void
boundTaskExiting (Task *task)
{
- task->tso = NULL;
- task->stopped = rtsTrue;
- task->cap = NULL;
-
#if defined(THREADED_RTS)
ASSERT(osThreadId() == task->id);
#endif
ASSERT(myTask() == task);
- setMyTask(task->prev_stack);
- tasksRunning--;
+ endInCall(task);
- // sadly, we need a lock around the free task list. Todo: eliminate.
- ACQUIRE_LOCK(&sched_mutex);
- task->next = task_free_list;
- task_free_list = task;
- RELEASE_LOCK(&sched_mutex);
+ // Set task->stopped, but only if this is the last call (#4850).
+ // Remember that we might have a worker Task that makes a foreign
+ // call and then a callback, so it can transform into a bound
+ // Task for the duration of the callback.
+ if (task->incall == NULL) {
+ task->stopped = rtsTrue;
+ }
debugTrace(DEBUG_sched, "task exiting");
}
+
#ifdef THREADED_RTS
#define TASK_ID(t) (t)->id
#else
#endif
void
-discardTask (Task *task)
+discardTasksExcept (Task *keep)
{
- ASSERT_LOCK_HELD(&sched_mutex);
- if (!task->stopped) {
- debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
- task->cap = NULL;
- if (task->tso == NULL) {
- workerCount--;
- } else {
- task->tso = NULL;
+ Task *task, *next;
+
+ // Wipe the task list, except the current Task.
+ ACQUIRE_LOCK(&all_tasks_mutex);
+ for (task = all_tasks; task != NULL; task=next) {
+ next = task->all_link;
+ if (task != keep) {
+ debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
+ freeTask(task);
}
- task->stopped = rtsTrue;
- tasksRunning--;
- task->next = task_free_list;
- task_free_list = task;
}
+ all_tasks = keep;
+ keep->all_link = NULL;
+ RELEASE_LOCK(&all_tasks_mutex);
}
void
taskTimeStamp (Task *task USED_IF_THREADS)
{
#if defined(THREADED_RTS)
- Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
+ Ticks currentElapsedTime, currentUserTime;
currentUserTime = getThreadCPUTime();
currentElapsedTime = getProcessElapsedTime();
- // XXX this is wrong; we want elapsed GC time since the
- // Task started.
- elapsedGCTime = stat_getElapsedGCTime();
-
- task->mut_time =
+ task->mut_time =
currentUserTime - task->muttimestart - task->gc_time;
task->mut_etime =
- currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
+ currentElapsedTime - task->elapsedtimestart - task->gc_etime;
+ if (task->gc_time < 0) { task->gc_time = 0; }
+ if (task->gc_etime < 0) { task->gc_etime = 0; }
if (task->mut_time < 0) { task->mut_time = 0; }
if (task->mut_etime < 0) { task->mut_etime = 0; }
#endif
}
void
-workerTaskStop (Task *task)
+taskDoneGC (Task *task, Ticks cpu_time, Ticks elapsed_time)
{
+ task->gc_time += cpu_time;
+ task->gc_etime += elapsed_time;
+}
+
#if defined(THREADED_RTS)
+
+void
+workerTaskStop (Task *task)
+{
OSThreadId id;
id = osThreadId();
ASSERT(task->id == id);
ASSERT(myTask() == task);
-#endif
task->cap = NULL;
taskTimeStamp(task);
task->stopped = rtsTrue;
- tasksRunning--;
- workerCount--;
-
- ACQUIRE_LOCK(&sched_mutex);
- task->next = task_free_list;
- task_free_list = task;
- RELEASE_LOCK(&sched_mutex);
}
-void
-resetTaskManagerAfterFork (void)
+#endif
+
+#ifdef DEBUG
+
+static void *taskId(Task *task)
{
- // TODO!
- taskCount = 0;
+#ifdef THREADED_RTS
+ return (void *)task->id;
+#else
+ return (void *)task;
+#endif
}
+#endif
+
#if defined(THREADED_RTS)
+static void OSThreadProcAttr
+workerStart(Task *task)
+{
+ Capability *cap;
+
+ // See startWorkerTask().
+ ACQUIRE_LOCK(&task->lock);
+ cap = task->cap;
+ RELEASE_LOCK(&task->lock);
+
+ if (RtsFlags.ParFlags.setAffinity) {
+ setThreadAffinity(cap->no, n_capabilities);
+ }
+
+ // set the thread-local pointer to the Task:
+ setMyTask(task);
+
+ newInCall(task);
+
+ scheduleWorker(cap,task);
+}
+
void
-startWorkerTask (Capability *cap,
- void OSThreadProcAttr (*taskStart)(Task *task))
+startWorkerTask (Capability *cap)
{
int r;
OSThreadId tid;
Task *task;
- workerCount++;
-
// A worker always gets a fresh Task structure.
- task = newTask();
-
- tasksRunning++;
+ task = newTask(rtsTrue);
// The lock here is to synchronise with taskStart(), to make sure
// that we have finished setting up the Task structure before the
ASSERT_LOCK_HELD(&cap->lock);
cap->running_task = task;
- r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
+ r = createOSThread(&tid, (OSThreadProc*)workerStart, task);
if (r != 0) {
sysErrorBelch("failed to create OS thread");
stg_exit(EXIT_FAILURE);
RELEASE_LOCK(&task->lock);
}
+void
+interruptWorkerTask (Task *task)
+{
+ ASSERT(osThreadId() != task->id); // seppuku not allowed
+ ASSERT(task->incall->suspended_tso); // use this only for FFI calls
+ interruptOSThread(task->id);
+ debugTrace(DEBUG_sched, "interrupted worker task %p", taskId(task));
+}
+
#endif /* THREADED_RTS */
#ifdef DEBUG
-static void *taskId(Task *task)
-{
-#ifdef THREADED_RTS
- return (void *)task->id;
-#else
- return (void *)task;
-#endif
-}
-
void printAllTasks(void);
void
if (task->cap) {
debugBelch("on capability %d, ", task->cap->no);
}
- if (task->tso) {
- debugBelch("bound to thread %lu", (unsigned long)task->tso->id);
+ if (task->incall->tso) {
+ debugBelch("bound to thread %lu",
+ (unsigned long)task->incall->tso->id);
} else {
debugBelch("worker");
}