Add a GHC layout extension to the alternative layout rule
[ghc-hetmet.git] / rts / Task.c
index 7366480..9a8ebd6 100644 (file)
@@ -8,15 +8,16 @@
  * 
  * -------------------------------------------------------------------------*/
 
+#include "PosixSource.h"
 #include "Rts.h"
+
 #include "RtsUtils.h"
-#include "OSThreads.h"
 #include "Task.h"
 #include "Capability.h"
 #include "Stats.h"
-#include "RtsFlags.h"
 #include "Schedule.h"
 #include "Hash.h"
+#include "Trace.h"
 
 #if HAVE_SIGNAL_H
 #include <signal.h>
 Task *all_tasks = NULL;
 static Task *task_free_list = NULL; // singly-linked
 static nat taskCount;
-#define DEFAULT_MAX_WORKERS 64
-static nat maxWorkers; // we won't create more workers than this
 static nat tasksRunning;
 static nat workerCount;
+static int tasksInitialized = 0;
 
 /* -----------------------------------------------------------------------------
  * Remembering the current thread's Task
@@ -51,25 +51,50 @@ Task *my_task;
 void
 initTaskManager (void)
 {
-    static int initialized = 0;
-
-    if (!initialized) {
+    if (!tasksInitialized) {
        taskCount = 0;
        workerCount = 0;
        tasksRunning = 0;
-       maxWorkers = DEFAULT_MAX_WORKERS;
-       initialized = 1;
+       tasksInitialized = 1;
 #if defined(THREADED_RTS)
        newThreadLocalKey(&currentTaskKey);
 #endif
     }
 }
 
-
-void
-stopTaskManager (void)
+nat
+freeTaskManager (void)
 {
-    IF_DEBUG(scheduler, sched_belch("stopping task manager, %d tasks still running", tasksRunning));
+    Task *task, *next;
+
+    ASSERT_LOCK_HELD(&sched_mutex);
+
+    debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
+               tasksRunning);
+
+    for (task = all_tasks; task != NULL; task = next) {
+        next = task->all_link;
+        if (task->stopped) {
+            // We only free resources if the Task is not in use.  A
+            // Task may still be in use if we have a Haskell thread in
+            // a foreign call while we are attempting to shut down the
+            // RTS (see conc059).
+#if defined(THREADED_RTS)
+            closeCondition(&task->cond);
+            closeMutex(&task->lock);
+#endif
+            stgFree(task);
+        }
+    }
+    all_tasks = NULL;
+    task_free_list = NULL;
+#if defined(THREADED_RTS)
+    freeThreadLocalKey(&currentTaskKey);
+#endif
+
+    tasksInitialized = 0;
+
+    return tasksRunning;
 }
 
 
@@ -81,7 +106,8 @@ newTask (void)
 #endif
     Task *task;
 
-    task = stgMallocBytes(sizeof(Task), "newTask");
+#define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
+    task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
     
     task->cap  = NULL;
     task->stopped = rtsFalse;
@@ -99,10 +125,10 @@ newTask (void)
 #if defined(THREADED_RTS)
     currentUserTime = getThreadCPUTime();
     currentElapsedTime = getProcessElapsedTime();
-    task->mut_time = 0.0;
-    task->mut_etime = 0.0;
-    task->gc_time = 0.0;
-    task->gc_etime = 0.0;
+    task->mut_time = 0;
+    task->mut_etime = 0;
+    task->gc_time = 0;
+    task->gc_etime = 0;
     task->muttimestart = currentUserTime;
     task->elapsedtimestart = currentElapsedTime;
 #endif
@@ -115,7 +141,6 @@ newTask (void)
     all_tasks = task;
 
     taskCount++;
-    workerCount++;
 
     return task;
 }
@@ -125,7 +150,17 @@ newBoundTask (void)
 {
     Task *task;
 
-    ASSERT_LOCK_HELD(&sched_mutex);
+    if (!tasksInitialized) {
+        errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
+        stg_exit(EXIT_FAILURE);
+    }
+
+    // ToDo: get rid of this lock in the common case.  We could store
+    // a free Task in thread-local storage, for example.  That would
+    // leave just one lock on the path into the RTS: cap->lock when
+    // acquiring the Capability.
+    ACQUIRE_LOCK(&sched_mutex);
+
     if (task_free_list == NULL) {
        task = newTask();
     } else {
@@ -144,13 +179,16 @@ newBoundTask (void)
 
     taskEnter(task);
 
-    IF_DEBUG(scheduler,sched_belch("new task (taskCount: %d)", taskCount););
+    RELEASE_LOCK(&sched_mutex);
+
+    debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
     return task;
 }
 
 void
 boundTaskExiting (Task *task)
 {
+    task->tso = NULL;
     task->stopped = rtsTrue;
     task->cap = NULL;
 
@@ -168,7 +206,7 @@ boundTaskExiting (Task *task)
     task_free_list = task;
     RELEASE_LOCK(&sched_mutex);
 
-    IF_DEBUG(scheduler,sched_belch("task exiting"));
+    debugTrace(DEBUG_sched, "task exiting");
 }
 
 #ifdef THREADED_RTS
@@ -182,9 +220,13 @@ discardTask (Task *task)
 {
     ASSERT_LOCK_HELD(&sched_mutex);
     if (!task->stopped) {
-       IF_DEBUG(scheduler,sched_belch("discarding task %p", TASK_ID(task)));
+       debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
        task->cap = NULL;
-       task->tso = NULL;
+        if (task->tso == NULL) {
+            workerCount--;
+        } else {
+            task->tso = NULL;
+        }
        task->stopped = rtsTrue;
        tasksRunning--;
        task->next = task_free_list;
@@ -193,16 +235,11 @@ discardTask (Task *task)
 }
 
 void
-taskStop (Task *task)
+taskTimeStamp (Task *task USED_IF_THREADS)
 {
 #if defined(THREADED_RTS)
-    OSThreadId id;
     Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
 
-    id = osThreadId();
-    ASSERT(task->id == id);
-    ASSERT(myTask() == task);
-
     currentUserTime = getThreadCPUTime();
     currentElapsedTime = getProcessElapsedTime();
 
@@ -215,21 +252,35 @@ taskStop (Task *task)
     task->mut_etime = 
        currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
 
-    if (task->mut_time < 0.0)  { task->mut_time = 0.0;  }
-    if (task->mut_etime < 0.0) { task->mut_etime = 0.0; }
+    if (task->mut_time  < 0) { task->mut_time  = 0; }
+    if (task->mut_etime < 0) { task->mut_etime = 0; }
 #endif
-
-    task->stopped = rtsTrue;
-    tasksRunning--;
 }
 
+#if defined(THREADED_RTS)
+
 void
-resetTaskManagerAfterFork (void)
+workerTaskStop (Task *task)
 {
-#warning TODO!
-    taskCount = 0;
+    OSThreadId id;
+    id = osThreadId();
+    ASSERT(task->id == id);
+    ASSERT(myTask() == task);
+
+    task->cap = NULL;
+    taskTimeStamp(task);
+    task->stopped = rtsTrue;
+    tasksRunning--;
+    workerCount--;
+
+    ACQUIRE_LOCK(&sched_mutex);
+    task->next = task_free_list;
+    task_free_list = task;
+    RELEASE_LOCK(&sched_mutex);
 }
 
+#endif
+
 #if defined(THREADED_RTS)
 
 void
@@ -240,9 +291,6 @@ startWorkerTask (Capability *cap,
   OSThreadId tid;
   Task *task;
 
-  if (workerCount >= maxWorkers) {
-      barf("too many workers; runaway worker creation?");
-  }
   workerCount++;
 
   // A worker always gets a fresh Task structure.
@@ -265,10 +313,11 @@ startWorkerTask (Capability *cap,
 
   r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
   if (r != 0) {
-    barf("startTask: Can't create new task");
+    sysErrorBelch("failed to create OS thread");
+    stg_exit(EXIT_FAILURE);
   }
 
-  IF_DEBUG(scheduler,sched_belch("new worker task (taskCount: %d)", taskCount););
+  debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
 
   task->id = tid;
 
@@ -302,7 +351,7 @@ printAllTasks(void)
                debugBelch("on capability %d, ", task->cap->no);
            }
            if (task->tso) {
-               debugBelch("bound to thread %d", task->tso->id);
+             debugBelch("bound to thread %lu", (unsigned long)task->tso->id);
            } else {
                debugBelch("worker");
            }