fix non-threaded way
[ghc-hetmet.git] / ghc / rts / Task.c
index 76ea891..89db782 100644 (file)
@@ -9,33 +9,44 @@
  * -------------------------------------------------------------------------*/
 
 #include "Rts.h"
-#if defined(RTS_SUPPORTS_THREADS) /* to the end */
 #include "RtsUtils.h"
 #include "OSThreads.h"
 #include "Task.h"
+#include "Capability.h"
 #include "Stats.h"
 #include "RtsFlags.h"
 #include "Schedule.h"
 #include "Hash.h"
-#include "Capability.h"
 
 #if HAVE_SIGNAL_H
 #include <signal.h>
 #endif
 
-#define INIT_TASK_TABLE_SIZE 16
-
-TaskInfo* taskTable;
-static nat taskTableSize;
-
-HashTable *taskHash; // maps OSThreadID to TaskInfo*
-
-nat taskCount;
+// Task lists and global counters.
+// Locks required: sched_mutex.
+Task *all_tasks = NULL;
+static Task *task_free_list = NULL; // singly-linked
+static nat taskCount;
+#define DEFAULT_MAX_WORKERS 64
+static nat maxWorkers; // we won't create more workers than this
 static nat tasksRunning;
 static nat workerCount;
 
-#define DEFAULT_MAX_WORKERS 64
-nat maxWorkers; // we won't create more workers than this
+/* -----------------------------------------------------------------------------
+ * Remembering the current thread's Task
+ * -------------------------------------------------------------------------- */
+
+// A thread-local-storage key that we can use to get access to the
+// current thread's Task structure.
+#if defined(THREADED_RTS)
+ThreadLocalKey currentTaskKey;
+#else
+Task *my_task;
+#endif
+
+/* -----------------------------------------------------------------------------
+ * Rest of the Task API
+ * -------------------------------------------------------------------------- */
 
 void
 initTaskManager (void)
@@ -43,207 +54,262 @@ initTaskManager (void)
     static int initialized = 0;
 
     if (!initialized) {
-       taskTableSize = INIT_TASK_TABLE_SIZE;
-       taskTable = stgMallocBytes( taskTableSize * sizeof(TaskInfo),
-                                   "initTaskManager");
-    
        taskCount = 0;
        workerCount = 0;
        tasksRunning = 0;
-
-       taskHash = allocHashTable();
-  
        maxWorkers = DEFAULT_MAX_WORKERS;
-
        initialized = 1;
+#if defined(THREADED_RTS)
+       newThreadLocalKey(&currentTaskKey);
+#endif
     }
 }
 
-static void
-expandTaskTable (void)
-{
-    taskTableSize *= 2;
-    taskTable = stgReallocBytes(taskTable, taskTableSize * sizeof(TaskInfo),
-                               "expandTaskTable");
-}
 
 void
 stopTaskManager (void)
 {
-    nat i;
-
     IF_DEBUG(scheduler, sched_belch("stopping task manager, %d tasks still running", tasksRunning));
-    for (i = 1000; i > 0; i--) {
-       if (tasksRunning == 0) {
-           IF_DEBUG(scheduler, sched_belch("all tasks stopped"));
-           return;
-       }
-       prodWorker();
-       yieldThread();
-    }
-    IF_DEBUG(scheduler, sched_belch("%d tasks still running, exiting anyway", tasksRunning));
-
-    /* 
-       OLD CODE follows:
-    */
-#if old_code
-    /* Send 'em all a SIGHUP.  That should shut 'em up. */
-    awaitDeath = taskCount==0 ? 0 : taskCount-1;
-    for (i = 0; i < taskCount; i++) {
-       /* don't cancel the thread running this piece of code. */
-       if ( taskTable[i].id != tid ) {
-           pthread_kill(taskTable[i].id,SIGTERM);
-       }
-    }
-    while (awaitDeath > 0) {
-       sched_yield();
-    }
-#endif // old_code
 }
 
 
-rtsBool
-startTasks (nat num, void (*taskStart)(void))
-{
-    nat i; 
-    for (i = 0; i < num; i++) {
-       if (!startTask(taskStart)) {
-           return rtsFalse;
-       }
-    }
-    return rtsTrue;
-}
-
-static TaskInfo*
-newTask (OSThreadId id, rtsBool is_worker)
+static Task*
+newTask (void)
 {
-    long currentElapsedTime, currentUserTime, elapsedGCTime;
-    TaskInfo *task_info;
+#if defined(THREADED_RTS)
+    Ticks currentElapsedTime, currentUserTime;
+#endif
+    Task *task;
 
-    if (taskCount >= taskTableSize) {
-       expandTaskTable();
-    }
-    
-    insertHashTable( taskHash, id, &(taskTable[taskCount]) );
-    
-    stat_getTimes(&currentElapsedTime, &currentUserTime, &elapsedGCTime);
-    
-    task_info = &taskTable[taskCount];
+    task = stgMallocBytes(sizeof(Task), "newTask");
     
-    task_info->id = id;
-    task_info->is_worker = is_worker;
-    task_info->stopped = rtsFalse;
-    task_info->mut_time = 0.0;
-    task_info->mut_etime = 0.0;
-    task_info->gc_time = 0.0;
-    task_info->gc_etime = 0.0;
-    task_info->muttimestart = currentUserTime;
-    task_info->elapsedtimestart = currentElapsedTime;
+    task->cap  = NULL;
+    task->stopped = rtsFalse;
+    task->suspended_tso = NULL;
+    task->tso  = NULL;
+    task->stat = NoStatus;
+    task->ret  = NULL;
     
+#if defined(THREADED_RTS)
+    initCondition(&task->cond);
+    initMutex(&task->lock);
+    task->wakeup = rtsFalse;
+#endif
+
+#if defined(THREADED_RTS)
+    currentUserTime = getThreadCPUTime();
+    currentElapsedTime = getProcessElapsedTime();
+    task->mut_time = 0.0;
+    task->mut_etime = 0.0;
+    task->gc_time = 0.0;
+    task->gc_etime = 0.0;
+    task->muttimestart = currentUserTime;
+    task->elapsedtimestart = currentElapsedTime;
+#endif
+
+    task->prev = NULL;
+    task->next = NULL;
+    task->return_link = NULL;
+
+    task->all_link = all_tasks;
+    all_tasks = task;
+
     taskCount++;
     workerCount++;
-    tasksRunning++;
 
-    IF_DEBUG(scheduler,sched_belch("startTask: new task %ld (total_count: %d; waiting: %d)\n", id, taskCount, rts_n_waiting_tasks););
-    
-    return task_info;
+    return task;
 }
 
-rtsBool
-startTask (void (*taskStart)(void))
+Task *
+newBoundTask (void)
 {
-  int r;
-  OSThreadId tid;
+    Task *task;
+
+    ASSERT_LOCK_HELD(&sched_mutex);
+    if (task_free_list == NULL) {
+       task = newTask();
+    } else {
+       task = task_free_list;
+       task_free_list = task->next;
+       task->next = NULL;
+       task->prev = NULL;
+       task->stopped = rtsFalse;
+    }
+#if defined(THREADED_RTS)
+    task->id = osThreadId();
+#endif
+    ASSERT(task->cap == NULL);
 
-  r = createOSThread(&tid,taskStart);
-  if (r != 0) {
-    barf("startTask: Can't create new task");
-  }
-  newTask (tid, rtsTrue);
-  return rtsTrue;
+    tasksRunning++;
+
+    taskEnter(task);
+
+    IF_DEBUG(scheduler,sched_belch("new task (taskCount: %d)", taskCount););
+    return task;
 }
 
-TaskInfo *
-threadIsTask (OSThreadId id)
+void
+boundTaskExiting (Task *task)
 {
-    TaskInfo *task_info;
-    
-    task_info = lookupHashTable(taskHash, id);
-    if (task_info != NULL) {
-       if (task_info->stopped) {
-           task_info->stopped = rtsFalse;
-       }
-       return task_info;
-    }
+    task->stopped = rtsTrue;
+    task->cap = NULL;
+
+#if defined(THREADED_RTS)
+    ASSERT(osThreadId() == task->id);
+#endif
+    ASSERT(myTask() == task);
+    setMyTask(task->prev_stack);
+
+    tasksRunning--;
 
-    return newTask(id, rtsFalse);
+    // sadly, we need a lock around the free task list. Todo: eliminate.
+    ACQUIRE_LOCK(&sched_mutex);
+    task->next = task_free_list;
+    task_free_list = task;
+    RELEASE_LOCK(&sched_mutex);
+
+    IF_DEBUG(scheduler,sched_belch("task exiting"));
 }
 
-TaskInfo *
-taskOfId (OSThreadId id)
+void
+discardTask (Task *task)
 {
-    return lookupHashTable(taskHash, id);
+    ASSERT_LOCK_HELD(&sched_mutex);
+    if (!task->stopped) {
+       IF_DEBUG(scheduler,sched_belch("discarding task %p",
+#ifdef THREADED_RTS
+                                      (void *)task->id
+#else
+                                      (void *)task
+#endif
+                    ));
+       task->cap = NULL;
+       task->tso = NULL;
+       task->stopped = rtsTrue;
+       tasksRunning--;
+       task->next = task_free_list;
+       task_free_list = task;
+    }
 }
 
 void
-taskStop (void)
+taskStop (Task *task)
 {
+#if defined(THREADED_RTS)
     OSThreadId id;
-    long currentElapsedTime, currentUserTime, elapsedGCTime;
-    TaskInfo *task_info;
+    Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
 
     id = osThreadId();
-    task_info = taskOfId(id);
-    if (task_info == NULL) {
-       debugBelch("taskStop: not a task");
-       return;
-    }
-    ASSERT(task_info->id == id);
+    ASSERT(task->id == id);
+    ASSERT(myTask() == task);
 
-    task_info->stopped = rtsTrue;
-    tasksRunning--;
+    currentUserTime = getThreadCPUTime();
+    currentElapsedTime = getProcessElapsedTime();
 
-    stat_getTimes(&currentElapsedTime, &currentUserTime, &elapsedGCTime);
+    // XXX this is wrong; we want elapsed GC time since the
+    // Task started.
+    elapsedGCTime = stat_getElapsedGCTime();
     
-    task_info->mut_time = 
-       currentUserTime - task_info->muttimestart - task_info->gc_time;
-    task_info->mut_etime = 
-       currentElapsedTime - task_info->elapsedtimestart - elapsedGCTime;
+    task->mut_time = 
+       currentUserTime - task->muttimestart - task->gc_time;
+    task->mut_etime = 
+       currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
 
-    if (task_info->mut_time < 0.0)  { task_info->mut_time = 0.0;  }
-    if (task_info->mut_etime < 0.0) { task_info->mut_etime = 0.0; }
+    if (task->mut_time < 0.0)  { task->mut_time = 0.0;  }
+    if (task->mut_etime < 0.0) { task->mut_etime = 0.0; }
+#endif
+
+    task->stopped = rtsTrue;
+    tasksRunning--;
 }
 
 void
 resetTaskManagerAfterFork (void)
 {
-    rts_n_waiting_tasks = 0;
+#warning TODO!
     taskCount = 0;
 }
 
-rtsBool
-maybeStartNewWorker (void (*taskStart)(void))
+#if defined(THREADED_RTS)
+
+void
+startWorkerTask (Capability *cap, 
+                void OSThreadProcAttr (*taskStart)(Task *task))
 {
-    /* 
-     * If more than one worker thread is known to be blocked waiting
-     * on thread_ready_cond, don't create a new one.
-     */
-    if ( rts_n_waiting_tasks > 0) {
-       IF_DEBUG(scheduler,sched_belch(
-                    "startTask: %d tasks waiting, not creating new one", 
-                    rts_n_waiting_tasks););
-       // the task will run as soon as a capability is available,
-       // so there's no need to wake it.
-       return rtsFalse;
-    }
-    
-    /* If the task limit has been reached, just return. */
-    if (maxWorkers > 0 && workerCount >= maxWorkers) {
-       IF_DEBUG(scheduler,sched_belch("startTask: worker limit (%d) reached, not creating new one",maxWorkers));
-       return rtsFalse;
-    }
-    
-    return startTask(taskStart);
+  int r;
+  OSThreadId tid;
+  Task *task;
+
+  if (workerCount >= maxWorkers) {
+      barf("too many workers; runaway worker creation?");
+  }
+  workerCount++;
+
+  // A worker always gets a fresh Task structure.
+  task = newTask();
+
+  tasksRunning++;
+
+  // The lock here is to synchronise with taskStart(), to make sure
+  // that we have finished setting up the Task structure before the
+  // worker thread reads it.
+  ACQUIRE_LOCK(&task->lock);
+
+  task->cap = cap;
+
+  // Give the capability directly to the worker; we can't let anyone
+  // else get in, because the new worker Task has nowhere to go to
+  // sleep so that it could be woken up again.
+  ASSERT_LOCK_HELD(&cap->lock);
+  cap->running_task = task;
+
+  r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
+  if (r != 0) {
+    barf("startTask: Can't create new task");
+  }
+
+  IF_DEBUG(scheduler,sched_belch("new worker task (taskCount: %d)", taskCount););
+
+  task->id = tid;
+
+  // ok, finished with the Task struct.
+  RELEASE_LOCK(&task->lock);
+}
+
+#endif /* THREADED_RTS */
+
+#ifdef DEBUG
+
+static void *taskId(Task *task)
+{
+#ifdef THREADED_RTS
+    return (void *)task->id;
+#else
+    return (void *)task;
+#endif
 }
 
-#endif /* RTS_SUPPORTS_THREADS */
+void printAllTasks(void);
+
+void
+printAllTasks(void)
+{
+    Task *task;
+    for (task = all_tasks; task != NULL; task = task->all_link) {
+       debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
+       if (!task->stopped) {
+           if (task->cap) {
+               debugBelch("on capability %d, ", task->cap->no);
+           }
+           if (task->tso) {
+               debugBelch("bound to thread %d", task->tso->id);
+           } else {
+               debugBelch("worker");
+           }
+       }
+       debugBelch("\n");
+    }
+}                     
+
+#endif
+