fix haddock submodule pointer
[ghc-hetmet.git] / rts / Task.c
index dcfa5b5..e77a030 100644 (file)
@@ -8,13 +8,13 @@
  * 
  * -------------------------------------------------------------------------*/
 
+#include "PosixSource.h"
 #include "Rts.h"
+
 #include "RtsUtils.h"
-#include "OSThreads.h"
 #include "Task.h"
 #include "Capability.h"
 #include "Stats.h"
-#include "RtsFlags.h"
 #include "Schedule.h"
 #include "Hash.h"
 #include "Trace.h"
 #endif
 
 // Task lists and global counters.
-// Locks required: sched_mutex.
+// Locks required: all_tasks_mutex.
 Task *all_tasks = NULL;
-static Task *task_free_list = NULL; // singly-linked
 static nat taskCount;
-static nat tasksRunning;
-static nat workerCount;
+static int tasksInitialized = 0;
+
+static void   freeTask  (Task *task);
+static Task * allocTask (void);
+static Task * newTask   (rtsBool);
+
+#if defined(THREADED_RTS)
+static Mutex all_tasks_mutex;
+#endif
 
 /* -----------------------------------------------------------------------------
  * Remembering the current thread's Task
@@ -38,7 +44,11 @@ static nat workerCount;
 // A thread-local-storage key that we can use to get access to the
 // current thread's Task structure.
 #if defined(THREADED_RTS)
+# if defined(MYTASK_USE_TLV)
+__thread Task *my_task;
+# else
 ThreadLocalKey currentTaskKey;
+# endif
 #else
 Task *my_task;
 #endif
@@ -50,55 +60,114 @@ Task *my_task;
 void
 initTaskManager (void)
 {
-    static int initialized = 0;
-
-    if (!initialized) {
+    if (!tasksInitialized) {
        taskCount = 0;
-       workerCount = 0;
-       tasksRunning = 0;
-       initialized = 1;
+       tasksInitialized = 1;
 #if defined(THREADED_RTS)
+#if !defined(MYTASK_USE_TLV)
        newThreadLocalKey(&currentTaskKey);
 #endif
+        initMutex(&all_tasks_mutex);
+#endif
     }
 }
 
-
-void
-stopTaskManager (void)
+nat
+freeTaskManager (void)
 {
     Task *task, *next;
+    nat tasksRunning = 0;
 
-    debugTrace(DEBUG_sched, 
-              "stopping task manager, %d tasks still running",
-              tasksRunning);
+    ACQUIRE_LOCK(&all_tasks_mutex);
 
-    ACQUIRE_LOCK(&sched_mutex);
-    for (task = task_free_list; task != NULL; task = next) {
-        next = task->next;
-        stgFree(task);
+    for (task = all_tasks; task != NULL; task = next) {
+        next = task->all_link;
+        if (task->stopped) {
+            freeTask(task);
+        } else {
+            tasksRunning++;
+        }
+    }
+
+    debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
+               tasksRunning);
+
+    all_tasks = NULL;
+
+    RELEASE_LOCK(&all_tasks_mutex);
+
+#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
+    closeMutex(&all_tasks_mutex); 
+    freeThreadLocalKey(&currentTaskKey);
+#endif
+
+    tasksInitialized = 0;
+
+    return tasksRunning;
+}
+
+static Task *
+allocTask (void)
+{
+    Task *task;
+
+    task = myTask();
+    if (task != NULL) {
+        return task;
+    } else {
+        task = newTask(rtsFalse);
+#if defined(THREADED_RTS)
+        task->id = osThreadId();
+#endif
+        setMyTask(task);
+        return task;
     }
-    task_free_list = NULL;
-    RELEASE_LOCK(&sched_mutex);
 }
 
+static void
+freeTask (Task *task)
+{
+    InCall *incall, *next;
+
+    // We only free resources if the Task is not in use.  A
+    // Task may still be in use if we have a Haskell thread in
+    // a foreign call while we are attempting to shut down the
+    // RTS (see conc059).
+#if defined(THREADED_RTS)
+    closeCondition(&task->cond);
+    closeMutex(&task->lock);
+#endif
+
+    for (incall = task->incall; incall != NULL; incall = next) {
+        next = incall->prev_stack;
+        stgFree(incall);
+    }
+    for (incall = task->spare_incalls; incall != NULL; incall = next) {
+        next = incall->next;
+        stgFree(incall);
+    }
+
+    stgFree(task);
+}
 
 static Task*
-newTask (void)
+newTask (rtsBool worker)
 {
 #if defined(THREADED_RTS)
     Ticks currentElapsedTime, currentUserTime;
 #endif
     Task *task;
 
-    task = stgMallocBytes(sizeof(Task), "newTask");
+#define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
+    task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
     
-    task->cap  = NULL;
-    task->stopped = rtsFalse;
-    task->suspended_tso = NULL;
-    task->tso  = NULL;
-    task->stat = NoStatus;
-    task->ret  = NULL;
+    task->cap           = NULL;
+    task->worker        = worker;
+    task->stopped       = rtsFalse;
+    task->running_finalizers = rtsFalse;
+    task->n_spare_incalls = 0;
+    task->spare_incalls = NULL;
+    task->incall        = NULL;
     
 #if defined(THREADED_RTS)
     initCondition(&task->cond);
@@ -117,42 +186,82 @@ newTask (void)
     task->elapsedtimestart = currentElapsedTime;
 #endif
 
-    task->prev = NULL;
     task->next = NULL;
-    task->return_link = NULL;
+
+    ACQUIRE_LOCK(&all_tasks_mutex);
 
     task->all_link = all_tasks;
     all_tasks = task;
 
     taskCount++;
-    workerCount++;
+
+    RELEASE_LOCK(&all_tasks_mutex);
 
     return task;
 }
 
+// avoid the spare_incalls list growing unboundedly
+#define MAX_SPARE_INCALLS 8
+
+static void
+newInCall (Task *task)
+{
+    InCall *incall;
+    
+    if (task->spare_incalls != NULL) {
+        incall = task->spare_incalls;
+        task->spare_incalls = incall->next;
+        task->n_spare_incalls--;
+    } else {
+        incall = stgMallocBytes((sizeof(InCall)), "newBoundTask");
+    }
+
+    incall->tso = NULL;
+    incall->task = task;
+    incall->suspended_tso = NULL;
+    incall->suspended_cap = NULL;
+    incall->stat          = NoStatus;
+    incall->ret           = NULL;
+    incall->next = NULL;
+    incall->prev = NULL;
+    incall->prev_stack = task->incall;
+    task->incall = incall;
+}
+
+static void
+endInCall (Task *task)
+{
+    InCall *incall;
+
+    incall = task->incall;
+    incall->tso = NULL;
+    task->incall = task->incall->prev_stack;
+
+    if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
+        stgFree(incall);
+    } else {
+        incall->next = task->spare_incalls;
+        task->spare_incalls = incall;
+        task->n_spare_incalls++;
+    }
+}
+
+
 Task *
 newBoundTask (void)
 {
     Task *task;
 
-    ASSERT_LOCK_HELD(&sched_mutex);
-    if (task_free_list == NULL) {
-       task = newTask();
-    } else {
-       task = task_free_list;
-       task_free_list = task->next;
-       task->next = NULL;
-       task->prev = NULL;
-       task->stopped = rtsFalse;
+    if (!tasksInitialized) {
+        errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
+        stg_exit(EXIT_FAILURE);
     }
-#if defined(THREADED_RTS)
-    task->id = osThreadId();
-#endif
-    ASSERT(task->cap == NULL);
 
-    tasksRunning++;
+    task = allocTask();
 
-    taskEnter(task);
+    task->stopped = rtsFalse;
+
+    newInCall(task);
 
     debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
     return task;
@@ -161,26 +270,25 @@ newBoundTask (void)
 void
 boundTaskExiting (Task *task)
 {
-    task->stopped = rtsTrue;
-    task->cap = NULL;
-
 #if defined(THREADED_RTS)
     ASSERT(osThreadId() == task->id);
 #endif
     ASSERT(myTask() == task);
-    setMyTask(task->prev_stack);
 
-    tasksRunning--;
+    endInCall(task);
 
-    // sadly, we need a lock around the free task list. Todo: eliminate.
-    ACQUIRE_LOCK(&sched_mutex);
-    task->next = task_free_list;
-    task_free_list = task;
-    RELEASE_LOCK(&sched_mutex);
+    // Set task->stopped, but only if this is the last call (#4850).
+    // Remember that we might have a worker Task that makes a foreign
+    // call and then a callback, so it can transform into a bound
+    // Task for the duration of the callback.
+    if (task->incall == NULL) {
+        task->stopped = rtsTrue;
+    }
 
     debugTrace(DEBUG_sched, "task exiting");
 }
 
+
 #ifdef THREADED_RTS
 #define TASK_ID(t) (t)->id
 #else
@@ -188,81 +296,115 @@ boundTaskExiting (Task *task)
 #endif
 
 void
-discardTask (Task *task)
+discardTasksExcept (Task *keep)
 {
-    ASSERT_LOCK_HELD(&sched_mutex);
-    if (!task->stopped) {
-       debugTrace(DEBUG_sched, "discarding task %ld", TASK_ID(task));
-       task->cap = NULL;
-       task->tso = NULL;
-       task->stopped = rtsTrue;
-       tasksRunning--;
-       task->next = task_free_list;
-       task_free_list = task;
+    Task *task, *next;
+
+    // Wipe the task list, except the current Task.
+    ACQUIRE_LOCK(&all_tasks_mutex);
+    for (task = all_tasks; task != NULL; task=next) {
+        next = task->all_link;
+        if (task != keep) {
+            debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
+            freeTask(task);
+        }
     }
+    all_tasks = keep;
+    keep->all_link = NULL;
+    RELEASE_LOCK(&all_tasks_mutex);
 }
 
 void
 taskTimeStamp (Task *task USED_IF_THREADS)
 {
 #if defined(THREADED_RTS)
-    Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
+    Ticks currentElapsedTime, currentUserTime;
 
     currentUserTime = getThreadCPUTime();
     currentElapsedTime = getProcessElapsedTime();
 
-    // XXX this is wrong; we want elapsed GC time since the
-    // Task started.
-    elapsedGCTime = stat_getElapsedGCTime();
-    
-    task->mut_time = 
+    task->mut_time =
        currentUserTime - task->muttimestart - task->gc_time;
     task->mut_etime = 
-       currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
+        currentElapsedTime - task->elapsedtimestart - task->gc_etime;
 
+    if (task->gc_time   < 0) { task->gc_time   = 0; }
+    if (task->gc_etime  < 0) { task->gc_etime  = 0; }
     if (task->mut_time  < 0) { task->mut_time  = 0; }
     if (task->mut_etime < 0) { task->mut_etime = 0; }
 #endif
 }
 
 void
-workerTaskStop (Task *task)
+taskDoneGC (Task *task, Ticks cpu_time, Ticks elapsed_time)
 {
+    task->gc_time  += cpu_time;
+    task->gc_etime += elapsed_time;
+}
+
 #if defined(THREADED_RTS)
+
+void
+workerTaskStop (Task *task)
+{
     OSThreadId id;
     id = osThreadId();
     ASSERT(task->id == id);
     ASSERT(myTask() == task);
-#endif
 
+    task->cap = NULL;
     taskTimeStamp(task);
     task->stopped = rtsTrue;
-    tasksRunning--;
 }
 
-void
-resetTaskManagerAfterFork (void)
+#endif
+
+#ifdef DEBUG
+
+static void *taskId(Task *task)
 {
-    // TODO!
-    taskCount = 0;
+#ifdef THREADED_RTS
+    return (void *)task->id;
+#else
+    return (void *)task;
+#endif
 }
 
+#endif
+
 #if defined(THREADED_RTS)
 
+static void OSThreadProcAttr
+workerStart(Task *task)
+{
+    Capability *cap;
+
+    // See startWorkerTask().
+    ACQUIRE_LOCK(&task->lock);
+    cap = task->cap;
+    RELEASE_LOCK(&task->lock);
+
+    if (RtsFlags.ParFlags.setAffinity) {
+        setThreadAffinity(cap->no, n_capabilities);
+    }
+
+    // set the thread-local pointer to the Task:
+    setMyTask(task);
+
+    newInCall(task);
+
+    scheduleWorker(cap,task);
+}
+
 void
-startWorkerTask (Capability *cap, 
-                void OSThreadProcAttr (*taskStart)(Task *task))
+startWorkerTask (Capability *cap)
 {
   int r;
   OSThreadId tid;
   Task *task;
 
-  workerCount++;
-
   // A worker always gets a fresh Task structure.
-  task = newTask();
-
-  tasksRunning++;
+  task = newTask(rtsTrue);
 
   // The lock here is to synchronise with taskStart(), to make sure
   // that we have finished setting up the Task structure before the
@@ -277,9 +419,10 @@ startWorkerTask (Capability *cap,
   ASSERT_LOCK_HELD(&cap->lock);
   cap->running_task = task;
 
-  r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
+  r = createOSThread(&tid, (OSThreadProc*)workerStart, task);
   if (r != 0) {
-    barf("startTask: Can't create new task");
+    sysErrorBelch("failed to create OS thread");
+    stg_exit(EXIT_FAILURE);
   }
 
   debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
@@ -290,19 +433,19 @@ startWorkerTask (Capability *cap,
   RELEASE_LOCK(&task->lock);
 }
 
+void
+interruptWorkerTask (Task *task)
+{
+  ASSERT(osThreadId() != task->id);    // seppuku not allowed
+  ASSERT(task->incall->suspended_tso); // use this only for FFI calls
+  interruptOSThread(task->id);
+  debugTrace(DEBUG_sched, "interrupted worker task %p", taskId(task));
+}
+
 #endif /* THREADED_RTS */
 
 #ifdef DEBUG
 
-static void *taskId(Task *task)
-{
-#ifdef THREADED_RTS
-    return (void *)task->id;
-#else
-    return (void *)task;
-#endif
-}
-
 void printAllTasks(void);
 
 void
@@ -315,8 +458,9 @@ printAllTasks(void)
            if (task->cap) {
                debugBelch("on capability %d, ", task->cap->no);
            }
-           if (task->tso) {
-               debugBelch("bound to thread %d", task->tso->id);
+           if (task->incall->tso) {
+             debugBelch("bound to thread %lu",
+                         (unsigned long)task->incall->tso->id);
            } else {
                debugBelch("worker");
            }