[project @ 2005-10-20 11:45:19 by simonmar]
[ghc-hetmet.git] / ghc / rts / Task.c
index bf20e99..0d75df8 100644 (file)
@@ -1,25 +1,13 @@
 /* -----------------------------------------------------------------------------
  *
- * (c) The GHC Team 2001-
+ * (c) The GHC Team 2001-2005
  *
  * The task manager subsystem.  Tasks execute STG code, with this
  * module providing the API which the Scheduler uses to control their
  * creation and destruction.
- *
- * Two kinds of RTS builds uses 'tasks' - the SMP and the
- * 'native thread-friendly' builds. 
  * 
- * The SMP build lets multiple tasks concurrently execute STG code,
- * all sharing vital internal RTS data structures in a controlled manner
- * (see details elsewhere...ToDo: fill in ref!)
- *
- * The 'threads' build has at any one time only one task executing STG
- * code, other tasks are either busy executing code outside the RTS
- * (e.g., a C call) or waiting for their turn to (again) evaluate some
- * STG code. A task relinquishes its RTS token when it is asked to
- * evaluate an external (C) call.
- *
  * -------------------------------------------------------------------------*/
+
 #include "Rts.h"
 #if defined(RTS_SUPPORTS_THREADS) /* to the end */
 #include "RtsUtils.h"
 #include "Stats.h"
 #include "RtsFlags.h"
 #include "Schedule.h"
+#include "Hash.h"
+#include "Capability.h"
 
-/* There's not all that much code that is shared between the
- * SMP and threads version of the 'task manager.' A sign
- * that the code ought to be structured differently..(Maybe ToDo).
- */
-
-/* 
- * The following Task Manager-local variables are assumed to be
- * accessed with the RTS lock in hand.
- */
-#if defined(SMP)
-static TaskInfo* taskTable;
+#if HAVE_SIGNAL_H
+#include <signal.h>
 #endif
-/* upper bound / the number of tasks created. */
-static nat maxTasks;  
-/* number of tasks currently created */
-static nat taskCount; 
 
+#define INIT_TASK_TABLE_SIZE 16
+
+TaskInfo* taskTable;
+static nat taskTableSize;
+
+// maps OSThreadID to TaskInfo*
+HashTable *taskHash;
+
+nat taskCount;
+static nat tasksRunning;
+static nat workerCount;
+
+#define DEFAULT_MAX_WORKERS 64
+nat maxWorkers; // we won't create more workers than this
 
-#if defined(SMP)
 void
-startTaskManager( nat maxCount, void (*taskStart)(void) )
+initTaskManager (void)
 {
-  nat i;
-  static int initialized = 0;
-  
-  if (!initialized) {
-  
-    taskCount = 0;
-    maxTasks = maxCount;
-    /* allocate table holding task metadata */
+    static int initialized = 0;
+
+    if (!initialized) {
+#if defined(SMP)
+       taskTableSize = stg_max(INIT_TASK_TABLE_SIZE, 
+                               RtsFlags.ParFlags.nNodes * 2);
+#else
+       taskTableSize = INIT_TASK_TABLE_SIZE;
+#endif
+       taskTable = stgMallocBytes( taskTableSize * sizeof(TaskInfo),
+                                   "initTaskManager");
+    
+       taskCount = 0;
+       workerCount = 0;
+       tasksRunning = 0;
+
+       taskHash = allocHashTable();
   
-    if (maxCount > 0) {
-      taskTable = stgMallocBytes(maxCount * sizeof(TaskInfo),
-                              "startTaskManager:tasks");
-
-      /* and eagerly create them all. */
-      for (i = 0; i < maxCount; i++) {
-       startTask(taskStart);
-       taskCount++;
-      }
+       maxWorkers = DEFAULT_MAX_WORKERS;
+
+       initialized = 1;
     }
-    initialized = 1;
-  }
 }
 
-void
-startTask ( void (*taskStart)(void) )
+static void
+expandTaskTable (void)
 {
-  int r;
-  OSThreadId tid;
-
-  r = createOSThread(&tid,taskStart);
-  if (r != 0) {
-    barf("startTask: Can't create new task");
-  }
+    nat i;
 
-  taskTable[taskCount].id = tid;
-  taskTable[taskCount].mut_time = 0.0;
-  taskTable[taskCount].mut_etime = 0.0;
-  taskTable[taskCount].gc_time = 0.0;
-  taskTable[taskCount].gc_etime = 0.0;
-  taskTable[taskCount].elapsedtimestart = stat_getElapsedTime();
+    taskTableSize *= 2;
+    taskTable = stgReallocBytes(taskTable, taskTableSize * sizeof(TaskInfo),
+                               "expandTaskTable");
 
-  IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
-  return;
+    /* Have to update the hash table now... */
+    for (i = 0; i < taskCount; i++) {
+       removeHashTable( taskHash, taskTable[i].id, NULL );
+       insertHashTable( taskHash, taskTable[i].id, &taskTable[i] );
+    }
 }
 
 void
-stopTaskManager ()
+stopTaskManager (void)
 {
-  nat i;
-
-  /* Don't want to use pthread_cancel, since we'd have to install
-   * these silly exception handlers (pthread_cleanup_{push,pop}) around
-   * all our locks.
-   */
-#if 0
-  /* Cancel all our tasks */
-  for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
-    pthread_cancel(taskTable[i].id);
-  }
-  
-  /* Wait for all the tasks to terminate */
-  for (i = 0; i < maxCount; i++) {
-    IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
-                              taskTable[i].id));
-    pthread_join(taskTable[i].id, NULL);
-  }
-#endif
-
-  /* Send 'em all a SIGHUP.  That should shut 'em up. */
-  await_death = maxCount;
-  for (i = 0; i < maxCount; i++) {
-    pthread_kill(taskTable[i].id,SIGTERM);
-  }
-  while (await_death > 0) {
-    sched_yield();
-  }
-  
-  return;
+    IF_DEBUG(scheduler, sched_belch("stopping task manager, %d tasks still running", tasksRunning));
 }
 
-#else
-/************ THREADS version *****************/
 
-void
-startTaskManager( nat maxCount, void (*taskStart)(void) )
+rtsBool
+startTasks (nat num, void (*taskStart)(void))
 {
-  /* In the threaded case, maxCount is used to limit the
-     the creation of worker tasks. Tasks are created lazily, i.e.,
-     when the current task gives up the token on executing
-     STG code.
-  */
-  maxTasks = maxCount;
-  taskCount = 0;
+    nat i; 
+    for (i = 0; i < num; i++) {
+       if (!startTask(taskStart)) {
+           return rtsFalse;
+       }
+    }
+    return rtsTrue;
 }
 
-void
-startTask ( void (*taskStart)(void) )
+static TaskInfo*
+newTask (OSThreadId id, rtsBool is_worker)
+{
+    long currentElapsedTime, currentUserTime, elapsedGCTime;
+    TaskInfo *task_info;
+
+    if (taskCount >= taskTableSize) {
+       expandTaskTable();
+    }
+    
+    insertHashTable( taskHash, id, &(taskTable[taskCount]) );
+    
+    stat_getTimes(&currentElapsedTime, &currentUserTime, &elapsedGCTime);
+    
+    task_info = &taskTable[taskCount];
+    
+    task_info->id = id;
+    task_info->is_worker = is_worker;
+    task_info->stopped = rtsFalse;
+    task_info->mut_time = 0.0;
+    task_info->mut_etime = 0.0;
+    task_info->gc_time = 0.0;
+    task_info->gc_etime = 0.0;
+    task_info->muttimestart = currentUserTime;
+    task_info->elapsedtimestart = currentElapsedTime;
+    
+    taskCount++;
+    workerCount++;
+    tasksRunning++;
+
+    IF_DEBUG(scheduler,sched_belch("startTask: new task %ld (total_count: %d; waiting: %d)\n", id, taskCount, rts_n_waiting_tasks););
+    
+    return task_info;
+}
+
+rtsBool
+startTask (void (*taskStart)(void))
 {
   int r;
   OSThreadId tid;
-  
-  /* Locks assumed: rts_mutex */
-  
-  /* If more than one worker thread is known to be blocked waiting
-     on thread_ready_cond, signal it rather than creating a new one.
-  */
-  if ( rts_n_waiting_tasks > 0) {
-    IF_DEBUG(scheduler,fprintf(stderr,
-                              "scheduler: startTask: %d tasks waiting, not creating new one.\n", 
-                              rts_n_waiting_tasks););
-    signalCondition(&thread_ready_cond);
-    /* Not needed, but gives more 'interesting' thread schedules when testing */
-    yieldThread();
-    return;
-  }
-
-  /* If the task limit has been reached, just return. */
-  if (maxTasks > 0 && taskCount == maxTasks) {
-    IF_DEBUG(scheduler,fprintf(stderr,"scheduler: startTask: task limit (%d) reached, not creating new one.\n",maxTasks));
-    return;
-  }
-  
 
   r = createOSThread(&tid,taskStart);
   if (r != 0) {
     barf("startTask: Can't create new task");
   }
-  taskCount++;
+  newTask (tid, rtsTrue);
+  return rtsTrue;
+}
+
+TaskInfo *
+threadIsTask (OSThreadId id)
+{
+    TaskInfo *task_info;
+    
+    task_info = lookupHashTable(taskHash, id);
+    if (task_info != NULL) {
+       if (task_info->stopped) {
+           task_info->stopped = rtsFalse;
+       }
+       return task_info;
+    }
 
-  IF_DEBUG(scheduler,fprintf(stderr,"scheduler: startTask: new task %ld (total_count: %d; waiting: %d)\n", tid, taskCount, rts_n_waiting_tasks););
-  return;
+    return newTask(id, rtsFalse);
+}
+
+TaskInfo *
+taskOfId (OSThreadId id)
+{
+    return lookupHashTable(taskHash, id);
 }
 
 void
-stopTaskManager ()
+taskStop (void)
 {
+    OSThreadId id;
+    long currentElapsedTime, currentUserTime, elapsedGCTime;
+    TaskInfo *task_info;
+
+    id = osThreadId();
+    task_info = taskOfId(id);
+    if (task_info == NULL) {
+       debugBelch("taskStop: not a task");
+       return;
+    }
+    ASSERT(task_info->id == id);
+
+    stat_getTimes(&currentElapsedTime, &currentUserTime, &elapsedGCTime);
+    
+    task_info->mut_time = 
+       currentUserTime - task_info->muttimestart - task_info->gc_time;
+    task_info->mut_etime = 
+       currentElapsedTime - task_info->elapsedtimestart - elapsedGCTime;
 
+    if (task_info->mut_time < 0.0)  { task_info->mut_time = 0.0;  }
+    if (task_info->mut_etime < 0.0) { task_info->mut_etime = 0.0; }
+
+    task_info->stopped = rtsTrue;
+    tasksRunning--;
 }
-#endif
 
+void
+resetTaskManagerAfterFork (void)
+{
+    rts_n_waiting_tasks = 0;
+    taskCount = 0;
+}
+
+rtsBool
+maybeStartNewWorker (void (*taskStart)(void))
+{
+    /* 
+     * If more than one worker thread is known to be blocked waiting
+     * on thread_ready_cond, don't create a new one.
+     */
+    if ( rts_n_waiting_tasks > 0) {
+       IF_DEBUG(scheduler,sched_belch(
+                    "startTask: %d tasks waiting, not creating new one", 
+                    rts_n_waiting_tasks););
+       // the task will run as soon as a capability is available,
+       // so there's no need to wake it.
+       return rtsFalse;
+    }
+    
+    /* If the task limit has been reached, just return. */
+    if (maxWorkers > 0 && workerCount >= maxWorkers) {
+       IF_DEBUG(scheduler,sched_belch("startTask: worker limit (%d) reached, not creating new one",maxWorkers));
+       return rtsFalse;
+    }
+    
+    return startTask(taskStart);
+}
 
 #endif /* RTS_SUPPORTS_THREADS */