[project @ 2005-04-06 15:27:06 by simonmar]
authorsimonmar <unknown>
Wed, 6 Apr 2005 15:27:06 +0000 (15:27 +0000)
committersimonmar <unknown>
Wed, 6 Apr 2005 15:27:06 +0000 (15:27 +0000)
Revamp the Task API: now we use the same implementation for threaded
and SMP.  We also keep per-task timing stats in the threaded RTS now,
which makes the output of +RTS -sstderr more useful.

ghc/rts/Capability.c
ghc/rts/Capability.h
ghc/rts/Main.c
ghc/rts/Schedule.c
ghc/rts/Stats.c
ghc/rts/Stats.h
ghc/rts/Task.c
ghc/rts/Task.h

index 8a93dc9..bdb651d 100644 (file)
@@ -207,6 +207,10 @@ releaseCapability( Capability* cap UNUSED_IF_NOT_SMP )
 #if !defined(SMP)
     ASSERT(rts_n_free_capabilities == 0);
 #endif
+#if defined(SMP)
+    cap->link = free_capabilities;
+    free_capabilities = cap;
+#endif
     // Check to see whether a worker thread can be given
     // the go-ahead to return the result of an external call..
     if (rts_n_waiting_workers > 0) {
@@ -214,12 +218,6 @@ releaseCapability( Capability* cap UNUSED_IF_NOT_SMP )
        // thread that is yielding its capability will repeatedly
        // signal returning_worker_cond.
 
-#if defined(SMP)
-       // SMP variant untested
-       cap->link = free_capabilities;
-       free_capabilities = cap;
-#endif
-
        rts_n_waiting_workers--;
        signalCondition(&returning_worker_cond);
        IF_DEBUG(scheduler, sched_belch("worker: released capability to returning worker"));
@@ -230,13 +228,15 @@ releaseCapability( Capability* cap UNUSED_IF_NOT_SMP )
        } else {
            signalCondition(passTarget);
        }
+#if defined(SMP)
+       rts_n_free_capabilities++;
+#else
        rts_n_free_capabilities = 1;
+#endif
        IF_DEBUG(scheduler, sched_belch("worker: released capability, passing it"));
 
     } else {
 #if defined(SMP)
-       cap->link = free_capabilities;
-       free_capabilities = cap;
        rts_n_free_capabilities++;
 #else
        rts_n_free_capabilities = 1;
@@ -433,7 +433,6 @@ passCapabilityToWorker( void )
    ToDo: should check whether the thread at the front of the queue is
    bound, and if so wake up the appropriate worker.
    -------------------------------------------------------------------------- */
-
 void
 threadRunnable ( void )
 {
@@ -444,3 +443,17 @@ threadRunnable ( void )
     startSchedulerTaskIfNecessary();
 #endif
 }
+
+
+/* ----------------------------------------------------------------------------
+   prodWorker()
+
+   Wake up... time to die.
+   -------------------------------------------------------------------------- */
+void
+prodWorker ( void )
+{
+#if defined(RTS_SUPPORTS_THREADS)
+    signalCondition(&thread_ready_cond);
+#endif
+}
index c575335..963aa85 100644 (file)
@@ -35,6 +35,8 @@ extern void releaseCapability( Capability* cap );
 //
 extern void threadRunnable ( void );
 
+extern void prodWorker ( void );
+
 #ifdef RTS_SUPPORTS_THREADS
 // Gives up the current capability IFF there is a higher-priority
 // thread waiting for it.  This happens in one of two ways:
index 7721438..c6d4170 100644 (file)
@@ -16,6 +16,7 @@
 #include "RtsFlags.h"
 #include "RtsUtils.h"
 #include "Prelude.h"
+#include "Task.h"
 #include <stdlib.h>
 
 #ifdef DEBUG
@@ -50,6 +51,11 @@ int main(int argc, char *argv[])
 
     startupHaskell(argc,argv,__stginit_ZCMain);
 
+    /* Register this thread as a task, so we can get timing stats about it */
+#if defined(RTS_SUPPORTS_THREADS)
+    threadIsTask(osThreadId());
+#endif
+
     /* kick off the computation by creating the main thread with a pointer
        to mainIO_closure representing the computation of the overall program;
        then enter the scheduler with this thread and off we go;
index 6e363a6..6cac6c1 100644 (file)
@@ -273,7 +273,6 @@ static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
 // scheduler clearer.
 //
 static void schedulePreLoop(void);
-static void scheduleHandleInterrupt(void);
 static void scheduleStartSignalHandlers(void);
 static void scheduleCheckBlockedThreads(void);
 static void scheduleCheckBlackHoles(void);
@@ -333,26 +332,25 @@ taskStart(void)
   ACQUIRE_LOCK(&sched_mutex);
   startingWorkerThread = rtsFalse;
   schedule(NULL,NULL);
+  taskStop();
   RELEASE_LOCK(&sched_mutex);
 }
 
 void
 startSchedulerTaskIfNecessary(void)
 {
-  if(run_queue_hd != END_TSO_QUEUE
-    || blocked_queue_hd != END_TSO_QUEUE
-    || sleeping_queue != END_TSO_QUEUE)
-  {
-    if(!startingWorkerThread)
-    { // we don't want to start another worker thread
-      // just because the last one hasn't yet reached the
-      // "waiting for capability" state
-      startingWorkerThread = rtsTrue;
-      if (!startTask(taskStart)) {
-         startingWorkerThread = rtsFalse;
-      }
+    if ( !EMPTY_RUN_QUEUE()
+        && !shutting_down_scheduler // not if we're shutting down
+        && !startingWorkerThread )
+    {
+       // we don't want to start another worker thread
+       // just because the last one hasn't yet reached the
+       // "waiting for capability" state
+       startingWorkerThread = rtsTrue;
+       if (!maybeStartNewWorker(taskStart)) {
+           startingWorkerThread = rtsFalse;
+       }
     }
-  }
 }
 #endif
 
@@ -508,7 +506,26 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
          stg_exit(1);
     }
 
-    scheduleHandleInterrupt();
+    //
+    // Test for interruption.  If interrupted==rtsTrue, then either
+    // we received a keyboard interrupt (^C), or the scheduler is
+    // trying to shut down all the tasks (shutting_down_scheduler) in
+    // the threaded RTS.
+    //
+    if (interrupted) {
+       if (shutting_down_scheduler) {
+           IF_DEBUG(scheduler, sched_belch("shutting down"));
+           releaseCapability(cap);
+           if (mainThread) {
+               mainThread->stat = Interrupted;
+               mainThread->ret  = NULL;
+           }
+           return;
+       } else {
+           IF_DEBUG(scheduler, sched_belch("interrupted"));
+           deleteAllThreads();
+       }
+    }
 
 #if defined(not_yet) && defined(SMP)
     //
@@ -792,33 +809,6 @@ schedulePreLoop(void)
 }
 
 /* ----------------------------------------------------------------------------
- * Deal with the interrupt flag
- * ASSUMES: sched_mutex
- * ------------------------------------------------------------------------- */
-
-static
-void scheduleHandleInterrupt(void)
-{
-    //
-    // Test for interruption.  If interrupted==rtsTrue, then either
-    // we received a keyboard interrupt (^C), or the scheduler is
-    // trying to shut down all the tasks (shutting_down_scheduler) in
-    // the threaded RTS.
-    //
-    if (interrupted) {
-       if (shutting_down_scheduler) {
-           IF_DEBUG(scheduler, sched_belch("shutting down"));
-#if defined(RTS_SUPPORTS_THREADS)
-           shutdownThread();
-#endif
-       } else {
-           IF_DEBUG(scheduler, sched_belch("interrupted"));
-           deleteAllThreads();
-       }
-    }
-}
-
-/* ----------------------------------------------------------------------------
  * Start any pending signal handlers
  * ASSUMES: sched_mutex
  * ------------------------------------------------------------------------- */
@@ -1476,7 +1466,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
        blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
        
        IF_DEBUG(scheduler,
-                debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n", 
+                debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
                            (long)t->id, whatNext_strs[t->what_next], blocks));
        
        // don't do this if it would push us over the
@@ -2620,8 +2610,12 @@ initScheduler(void)
   initCapabilities();
   
 #if defined(RTS_SUPPORTS_THREADS)
-    /* start our haskell execution tasks */
-    startTaskManager(0,taskStart);
+  initTaskManager();
+#endif
+
+#if defined(SMP)
+  /* eagerly start some extra workers */
+  startTasks(RtsFlags.ParFlags.nNodes, taskStart);
 #endif
 
 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
@@ -2634,11 +2628,12 @@ initScheduler(void)
 void
 exitScheduler( void )
 {
+    interrupted = rtsTrue;
+    shutting_down_scheduler = rtsTrue;
 #if defined(RTS_SUPPORTS_THREADS)
-  stopTaskManager();
+    if (threadIsTask(osThreadId())) { taskStop(); }
+    stopTaskManager();
 #endif
-  interrupted = rtsTrue;
-  shutting_down_scheduler = rtsTrue;
 }
 
 /* ----------------------------------------------------------------------------
index 7c14590..efa3e31 100644 (file)
@@ -363,13 +363,13 @@ stat_startExit(void)
        PROF_VAL(RPe_tot_time + HCe_tot_time) - InitElapsedStamp;
     if (MutElapsedTime < 0) { MutElapsedTime = 0; }    /* sometimes -0.00 */
 
-    /* for SMP, we don't know the mutator time yet, we have to inspect
+    /* for threads, we don't know the mutator time yet, we have to inspect
      * all the running threads to find out, and they haven't stopped
      * yet.  So we just timestamp MutUserTime at this point so we can
      * calculate the EXIT time.  The real MutUserTime is calculated
      * in stat_exit below.
      */
-#ifdef SMP
+#if defined(RTS_SUPPORTS_THREADS)
     MutUserTime = CurrentUserTime;
 #else
     MutUserTime = CurrentUserTime - GC_tot_time - PROF_VAL(RP_tot_time + HC_tot_time) - InitUserTime;
@@ -381,7 +381,7 @@ void
 stat_endExit(void)
 {
     getTimes();
-#ifdef SMP
+#if defined(RTS_SUPPORTS_THREADS)
     ExitUserTime = CurrentUserTime - MutUserTime;
 #else
     ExitUserTime = CurrentUserTime - MutUserTime - GC_tot_time - PROF_VAL(RP_tot_time + HC_tot_time) - InitUserTime;
@@ -478,17 +478,13 @@ stat_endGC(lnat alloc, lnat collect, lnat live, lnat copied, lnat gen)
        GC_tot_time   += gc_time;
        GCe_tot_time  += gc_etime;
        
-#if defined(SMP)
+#if defined(RTS_SUPPORTS_THREADS)
        {
-           nat i;
-           pthread_t me = pthread_self();
-
-           for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
-               if (me == taskTable[i].id) {
-                   taskTable[i].gc_time += gc_time;
-                   taskTable[i].gc_etime += gc_etime;
-                   break;
-               }
+           TaskInfo *task_info = taskOfId(osThreadId());
+           
+           if (task_info != NULL) {
+               task_info->gc_time += gc_time;
+               task_info->gc_etime += gc_etime;
            }
        }
 #endif
@@ -582,35 +578,16 @@ stat_endHeapCensus(void)
    stats for this thread into the taskTable struct for that thread.
    -------------------------------------------------------------------------- */
 
-#if defined(SMP)
 void
-stat_workerStop(void)
-{
-    nat i;
-    pthread_t me = pthread_self();
-
-    getTimes();
-
-    for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
-       if (taskTable[i].id == me) {
-           taskTable[i].mut_time = CurrentUserTime - taskTable[i].gc_time;
-           taskTable[i].mut_etime = CurrentElapsedTime
-               - GCe_tot_time
-               - taskTable[i].elapsedtimestart;
-           if (taskTable[i].mut_time < 0.0)  { taskTable[i].mut_time = 0.0;  }
-           if (taskTable[i].mut_etime < 0.0) { taskTable[i].mut_etime = 0.0; }
-       }
-    }
-}
-#endif
-
-#if defined(SMP)
-long int stat_getElapsedTime ()
+stat_getTimes ( long *currentElapsedTime, 
+               long *currentUserTime,
+               long *elapsedGCTime )
 {
   getTimes();
-  return CurrentElapsedTime;
+  *currentElapsedTime = CurrentElapsedTime;
+  *currentUserTime = CurrentUserTime;
+  *elapsedGCTime = GCe_tot_time;
 }
-#endif
 
 /* -----------------------------------------------------------------------------
    Called at the end of execution
@@ -631,15 +608,10 @@ stat_exit(int alloc)
        nat g, total_collections = 0;
 
        getTimes();
-       time = CurrentUserTime;
        etime = CurrentElapsedTime - ElapsedTimeStart;
 
        GC_tot_alloc += alloc;
 
-       /* avoid divide by zero if time is measured as 0.00 seconds -- SDM */
-       if (time  == 0.0)  time = 1;
-       if (etime == 0.0) etime = 1;
-       
        /* Count total garbage collections */
        for (g = 0; g < RtsFlags.GcFlags.generations; g++)
            total_collections += generations[g].collections;
@@ -647,17 +619,24 @@ stat_exit(int alloc)
        /* For SMP, we have to get the user time from each thread
         * and try to work out the total time.
         */
-#ifdef SMP
-       {   nat i;
+#if defined(RTS_SUPPORTS_THREADS)
+       {   
+           nat i;
            MutUserTime = 0.0;
-           for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
+           for (i = 0; i < taskCount; i++) {
                MutUserTime += taskTable[i].mut_time;
            }
        }
        time = MutUserTime + GC_tot_time + InitUserTime + ExitUserTime;
        if (MutUserTime < 0) { MutUserTime = 0; }
+#else
+       time = CurrentUserTime;
 #endif
 
+       /* avoid divide by zero if time is measured as 0.00 seconds -- SDM */
+       if (time  == 0.0)  time = 1;
+       if (etime == 0.0) etime = 1;
+       
        if (RtsFlags.GcFlags.giveStats >= VERBOSE_GC_STATS) {
            statsPrintf("%9ld %9.9s %9.9s", (lnat)alloc*sizeof(W_), "", "");
            statsPrintf(" %5.2f %5.2f\n\n", 0.0, 0.0);
@@ -690,17 +669,18 @@ stat_exit(int alloc)
            statsPrintf("\n%11ld Mb total memory in use\n\n", 
                    mblocks_allocated * MBLOCK_SIZE / (1024 * 1024));
 
-#ifdef SMP
+#if defined(RTS_SUPPORTS_THREADS)
            {
                nat i;
-               for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
-                   statsPrintf("  Task %2d:  MUT time: %6.2fs  (%6.2fs elapsed)\n"
-                           "            GC  time: %6.2fs  (%6.2fs elapsed)\n\n", 
-                           i, 
-                           TICK_TO_DBL(taskTable[i].mut_time),
-                           TICK_TO_DBL(taskTable[i].mut_etime),
-                           TICK_TO_DBL(taskTable[i].gc_time),
-                           TICK_TO_DBL(taskTable[i].gc_etime));
+               for (i = 0; i < taskCount; i++) {
+                   statsPrintf("  Task %2d %-8s :  MUT time: %6.2fs  (%6.2fs elapsed)\n"
+                           "                      GC  time: %6.2fs  (%6.2fs elapsed)\n\n", 
+                               i,
+                               taskTable[i].is_worker ? "(worker)" : "(bound)",
+                               TICK_TO_DBL(taskTable[i].mut_time),
+                               TICK_TO_DBL(taskTable[i].mut_etime),
+                               TICK_TO_DBL(taskTable[i].gc_time),
+                               TICK_TO_DBL(taskTable[i].gc_etime));
                }
            }
 #endif
index 962d38c..0ed1307 100644 (file)
@@ -46,6 +46,6 @@ extern double    mut_user_time_during_heap_census(void);
 extern void      statDescribeGens( void );
 extern HsInt64   getAllocations( void );
 
-#if defined(SMP)
-extern long int  stat_getElapsedTime ( void );
-#endif
+extern void      stat_getTimes ( long *currentElapsedTime, 
+                                long *currentUserTime,
+                                long *elapsedGCTime );
index 42dc9c9..76ea891 100644 (file)
@@ -1,25 +1,13 @@
 /* -----------------------------------------------------------------------------
  *
- * (c) The GHC Team 2001-
+ * (c) The GHC Team 2001-2005
  *
  * The task manager subsystem.  Tasks execute STG code, with this
  * module providing the API which the Scheduler uses to control their
  * creation and destruction.
- *
- * Two kinds of RTS builds uses 'tasks' - the SMP and the
- * 'native thread-friendly' builds. 
  * 
- * The SMP build lets multiple tasks concurrently execute STG code,
- * all sharing vital internal RTS data structures in a controlled manner
- * (see details elsewhere...ToDo: fill in ref!)
- *
- * The 'threads' build has at any one time only one task executing STG
- * code, other tasks are either busy executing code outside the RTS
- * (e.g., a C call) or waiting for their turn to (again) evaluate some
- * STG code. A task relinquishes its RTS token when it is asked to
- * evaluate an external (C) call.
- *
  * -------------------------------------------------------------------------*/
+
 #include "Rts.h"
 #if defined(RTS_SUPPORTS_THREADS) /* to the end */
 #include "RtsUtils.h"
 #include "Stats.h"
 #include "RtsFlags.h"
 #include "Schedule.h"
+#include "Hash.h"
+#include "Capability.h"
 
 #if HAVE_SIGNAL_H
 #include <signal.h>
 #endif
 
-/* There's not all that much code that is shared between the
- * SMP and threads version of the 'task manager.' A sign
- * that the code ought to be structured differently..(Maybe ToDo).
- */
+#define INIT_TASK_TABLE_SIZE 16
 
-/* 
- * The following Task Manager-local variables are assumed to be
- * accessed with the RTS lock in hand.
- */
-#if defined(SMP)
 TaskInfo* taskTable;
-#endif
-/* upper bound / the number of tasks created. */
-static nat maxTasks;  
-/* number of tasks currently created */
-static nat taskCount; 
-static nat awaitDeath;
+static nat taskTableSize;
+
+HashTable *taskHash; // maps OSThreadID to TaskInfo*
+
+nat taskCount;
+static nat tasksRunning;
+static nat workerCount;
+
+#define DEFAULT_MAX_WORKERS 64
+nat maxWorkers; // we won't create more workers than this
 
-#if defined(SMP)
 void
-startTaskManager( nat maxCount, void (*taskStart)(void) )
+initTaskManager (void)
 {
-  nat i;
-  static int initialized = 0;
-  
-  if (!initialized) {
-  
-    taskCount = 0;
-    maxTasks = maxCount;
-    /* allocate table holding task metadata */
+    static int initialized = 0;
+
+    if (!initialized) {
+       taskTableSize = INIT_TASK_TABLE_SIZE;
+       taskTable = stgMallocBytes( taskTableSize * sizeof(TaskInfo),
+                                   "initTaskManager");
+    
+       taskCount = 0;
+       workerCount = 0;
+       tasksRunning = 0;
+
+       taskHash = allocHashTable();
   
-    if (maxCount > 0) {
-      taskTable = stgMallocBytes(maxCount * sizeof(TaskInfo),
-                              "startTaskManager:tasks");
-
-      /* and eagerly create them all. */
-      for (i = 0; i < maxCount; i++) {
-       startTask(taskStart);
-       taskCount++;
-      }
+       maxWorkers = DEFAULT_MAX_WORKERS;
+
+       initialized = 1;
     }
-    initialized = 1;
-  }
 }
 
-rtsBool
-startTask ( void (*taskStart)(void) )
+static void
+expandTaskTable (void)
 {
-  int r;
-  OSThreadId tid;
-
-  r = createOSThread(&tid,taskStart);
-  if (r != 0) {
-    barf("startTask: Can't create new task");
-  }
-
-  taskTable[taskCount].id = tid;
-  taskTable[taskCount].mut_time = 0.0;
-  taskTable[taskCount].mut_etime = 0.0;
-  taskTable[taskCount].gc_time = 0.0;
-  taskTable[taskCount].gc_etime = 0.0;
-  taskTable[taskCount].elapsedtimestart = stat_getElapsedTime();
-
-  IF_DEBUG(scheduler,debugBelch("scheduler: Started task: %ld\n",tid););
-  return rtsTrue;
+    taskTableSize *= 2;
+    taskTable = stgReallocBytes(taskTable, taskTableSize * sizeof(TaskInfo),
+                               "expandTaskTable");
 }
 
 void
 stopTaskManager (void)
 {
-  nat i;
-  OSThreadId tid = osThreadId();
-
-  /* Don't want to use pthread_cancel, since we'd have to install
-   * these silly exception handlers (pthread_cleanup_{push,pop}) around
-   * all our locks.
-   */
-#if 0
-  /* Cancel all our tasks */
-  for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
-    pthread_cancel(taskTable[i].id);
-  }
-  
-  /* Wait for all the tasks to terminate */
-  for (i = 0; i < maxCount; i++) {
-    IF_DEBUG(scheduler,debugBelch("scheduler: waiting for task %ld\n", 
-                              taskTable[i].id));
-    pthread_join(taskTable[i].id, NULL);
-  }
-#endif
-
-  /* Send 'em all a SIGHUP.  That should shut 'em up. */
-  awaitDeath = taskCount==0 ? 0 : taskCount-1;
-  for (i = 0; i < taskCount; i++) {
-    /* don't cancel the thread running this piece of code. */
-    if ( taskTable[i].id != tid ) {
-      pthread_kill(taskTable[i].id,SIGTERM);
+    nat i;
+
+    IF_DEBUG(scheduler, sched_belch("stopping task manager, %d tasks still running", tasksRunning));
+    for (i = 1000; i > 0; i--) {
+       if (tasksRunning == 0) {
+           IF_DEBUG(scheduler, sched_belch("all tasks stopped"));
+           return;
+       }
+       prodWorker();
+       yieldThread();
     }
-  }
-  while (awaitDeath > 0) {
-    sched_yield();
-  }
-  
-  return;
+    IF_DEBUG(scheduler, sched_belch("%d tasks still running, exiting anyway", tasksRunning));
+
+    /* 
+       OLD CODE follows:
+    */
+#if old_code
+    /* Send 'em all a SIGHUP.  That should shut 'em up. */
+    awaitDeath = taskCount==0 ? 0 : taskCount-1;
+    for (i = 0; i < taskCount; i++) {
+       /* don't cancel the thread running this piece of code. */
+       if ( taskTable[i].id != tid ) {
+           pthread_kill(taskTable[i].id,SIGTERM);
+       }
+    }
+    while (awaitDeath > 0) {
+       sched_yield();
+    }
+#endif // old_code
 }
 
-void
-resetTaskManagerAfterFork (void)
+
+rtsBool
+startTasks (nat num, void (*taskStart)(void))
 {
-       barf("resetTaskManagerAfterFork not implemented for SMP");
+    nat i; 
+    for (i = 0; i < num; i++) {
+       if (!startTask(taskStart)) {
+           return rtsFalse;
+       }
+    }
+    return rtsTrue;
 }
 
-#else
-/************ THREADS version *****************/
-
-void
-startTaskManager( nat maxCount,
-                 void (*taskStart)(void) STG_UNUSED )
+static TaskInfo*
+newTask (OSThreadId id, rtsBool is_worker)
 {
-  /* In the threaded case, maxCount is used to limit the
-     the creation of worker tasks. Tasks are created lazily, i.e.,
-     when the current task gives up the token on executing
-     STG code.
-  */
-  maxTasks = maxCount;
-  taskCount = 0;
+    long currentElapsedTime, currentUserTime, elapsedGCTime;
+    TaskInfo *task_info;
+
+    if (taskCount >= taskTableSize) {
+       expandTaskTable();
+    }
+    
+    insertHashTable( taskHash, id, &(taskTable[taskCount]) );
+    
+    stat_getTimes(&currentElapsedTime, &currentUserTime, &elapsedGCTime);
+    
+    task_info = &taskTable[taskCount];
+    
+    task_info->id = id;
+    task_info->is_worker = is_worker;
+    task_info->stopped = rtsFalse;
+    task_info->mut_time = 0.0;
+    task_info->mut_etime = 0.0;
+    task_info->gc_time = 0.0;
+    task_info->gc_etime = 0.0;
+    task_info->muttimestart = currentUserTime;
+    task_info->elapsedtimestart = currentElapsedTime;
+    
+    taskCount++;
+    workerCount++;
+    tasksRunning++;
+
+    IF_DEBUG(scheduler,sched_belch("startTask: new task %ld (total_count: %d; waiting: %d)\n", id, taskCount, rts_n_waiting_tasks););
+    
+    return task_info;
 }
 
 rtsBool
-startTask ( void (*taskStart)(void) )
+startTask (void (*taskStart)(void))
 {
   int r;
   OSThreadId tid;
-  
-  /* If more than one worker thread is known to be blocked waiting
-     on thread_ready_cond, don't create a new one.
-  */
-  if ( rts_n_waiting_tasks > 0) {
-    IF_DEBUG(scheduler,debugBelch(
-                              "scheduler: startTask: %d tasks waiting, not creating new one.\n", 
-                              rts_n_waiting_tasks););
-    // the task will run as soon as a capability is available,
-    // so there's no need to wake it.
-    return rtsFalse;
-  }
 
-  /* If the task limit has been reached, just return. */
-  if (maxTasks > 0 && taskCount == maxTasks) {
-    IF_DEBUG(scheduler,debugBelch("scheduler: startTask: task limit (%d) reached, not creating new one.\n",maxTasks));
-    return rtsFalse;
-  }
-  
   r = createOSThread(&tid,taskStart);
   if (r != 0) {
     barf("startTask: Can't create new task");
   }
-  taskCount++;
-
-  IF_DEBUG(scheduler,debugBelch("scheduler: startTask: new task %ld (total_count: %d; waiting: %d)\n", tid, taskCount, rts_n_waiting_tasks););
+  newTask (tid, rtsTrue);
   return rtsTrue;
 }
 
+TaskInfo *
+threadIsTask (OSThreadId id)
+{
+    TaskInfo *task_info;
+    
+    task_info = lookupHashTable(taskHash, id);
+    if (task_info != NULL) {
+       if (task_info->stopped) {
+           task_info->stopped = rtsFalse;
+       }
+       return task_info;
+    }
+
+    return newTask(id, rtsFalse);
+}
 
+TaskInfo *
+taskOfId (OSThreadId id)
+{
+    return lookupHashTable(taskHash, id);
+}
 
 void
-stopTaskManager ()
+taskStop (void)
 {
+    OSThreadId id;
+    long currentElapsedTime, currentUserTime, elapsedGCTime;
+    TaskInfo *task_info;
+
+    id = osThreadId();
+    task_info = taskOfId(id);
+    if (task_info == NULL) {
+       debugBelch("taskStop: not a task");
+       return;
+    }
+    ASSERT(task_info->id == id);
+
+    task_info->stopped = rtsTrue;
+    tasksRunning--;
 
+    stat_getTimes(&currentElapsedTime, &currentUserTime, &elapsedGCTime);
+    
+    task_info->mut_time = 
+       currentUserTime - task_info->muttimestart - task_info->gc_time;
+    task_info->mut_etime = 
+       currentElapsedTime - task_info->elapsedtimestart - elapsedGCTime;
+
+    if (task_info->mut_time < 0.0)  { task_info->mut_time = 0.0;  }
+    if (task_info->mut_etime < 0.0) { task_info->mut_etime = 0.0; }
 }
 
 void
-resetTaskManagerAfterFork ( void )
+resetTaskManagerAfterFork (void)
 {
-       rts_n_waiting_tasks = 0;
-       taskCount = 0;
+    rts_n_waiting_tasks = 0;
+    taskCount = 0;
 }
-#endif
 
+rtsBool
+maybeStartNewWorker (void (*taskStart)(void))
+{
+    /* 
+     * If more than one worker thread is known to be blocked waiting
+     * on thread_ready_cond, don't create a new one.
+     */
+    if ( rts_n_waiting_tasks > 0) {
+       IF_DEBUG(scheduler,sched_belch(
+                    "startTask: %d tasks waiting, not creating new one", 
+                    rts_n_waiting_tasks););
+       // the task will run as soon as a capability is available,
+       // so there's no need to wake it.
+       return rtsFalse;
+    }
+    
+    /* If the task limit has been reached, just return. */
+    if (maxWorkers > 0 && workerCount >= maxWorkers) {
+       IF_DEBUG(scheduler,sched_belch("startTask: worker limit (%d) reached, not creating new one",maxWorkers));
+       return rtsFalse;
+    }
+    
+    return startTask(taskStart);
+}
 
 #endif /* RTS_SUPPORTS_THREADS */
index 7dd29ad..b1add2f 100644 (file)
 /* -----------------------------------------------------------------------------
  *
- * (c) The GHC Team 2001-2003
+ * (c) The GHC Team 2001-2005
  *
- * Types + prototypes for functions in Task.c
- * (RTS subsystem for handling tasks, agents thay may execute STG code).
+ * Tasks
  *
  * -------------------------------------------------------------------------*/
+
 #ifndef __TASK_H__
 #define __TASK_H__
-#if defined(RTS_SUPPORTS_THREADS) /* to the end */
 
+/* Definition of a Task:
+ *
+ * A task is an OSThread that runs Haskell code.  Every OSThread
+ * created by the RTS for the purposes of running Haskell code is a
+ * Task.  We maintain information about Tasks mainly for the purposes
+ * of stats gathering.
+ *
+ * There may exist OSThreads that run Haskell code, but which aren't
+ * tasks (they don't have an associated TaskInfo structure).  This
+ * happens when a thread makes an in-call to Haskell: we don't want to
+ * create a Task for every in-call and register stats for all these
+ * threads, so it is not therefore mandatory to have a Task for every
+ * thread running Haskell code.
+ *
+ * The SMP build lets multiple tasks concurrently execute STG code,
+ * all sharing vital internal RTS data structures in a controlled manner.
+ *
+ * The 'threaded' build has at any one time only one task executing STG
+ * code, other tasks are either busy executing code outside the RTS
+ * (e.g., a C call) or waiting for their turn to (again) evaluate some
+ * STG code. A task relinquishes its RTS token when it is asked to
+ * evaluate an external (C) call.
+ */
+
+#if defined(RTS_SUPPORTS_THREADS) /* to the end */
 /* 
- * Tasks evaluate STG code; the TaskInfo structure collects together
+ * Tasks evaluate Haskell code; the TaskInfo structure collects together
  * misc metadata about a task.
- * 
  */
 typedef struct _TaskInfo {
   OSThreadId id;
-  double     elapsedtimestart;
-  double     mut_time;
-  double     mut_etime;
-  double     gc_time;
-  double     gc_etime;
+  rtsBool    is_worker;                /* rtsFalse <=> is a bound thread */
+  rtsBool    stopped;           /* this task has stopped or exited Haskell */
+  long       elapsedtimestart;
+  long       muttimestart;
+  long       mut_time;
+  long       mut_etime;
+  long       gc_time;
+  long       gc_etime;
 } TaskInfo;
 
 extern TaskInfo *taskTable;
+extern nat taskCount;
 
-extern void startTaskManager ( nat maxTasks, void (*taskStart)(void) );
-extern void stopTaskManager ( void );
-extern void resetTaskManagerAfterFork ( void );
+/*
+ * Start and stop the task manager.
+ * Requires: sched_mutex.
+ */
+extern void initTaskManager (void);
+extern void stopTaskManager (void);
 
-extern rtsBool startTask ( void (*taskStart)(void) );
+/*
+ * Two ways to start tasks: either singly or in a batch
+ * Requires: sched_mutex.
+ */
+extern rtsBool startTasks (nat num, void (*taskStart)(void));
+extern rtsBool startTask  (void (*taskStart)(void));
+
+/*
+ * Notify the task manager that a task has stopped.  This is used
+ * mainly for stats-gathering purposes.
+ * Requires: sched_mutex.
+ */
+extern void taskStop (void);
+
+/*
+ * After a fork, the tasks are not carried into the child process, so
+ * we must tell the task manager.
+ * Requires: sched_mutex.
+ */
+extern void resetTaskManagerAfterFork (void);
+
+/*
+ * Tell the task manager that the current OS thread is now a task,
+ * because it has entered Haskell as a bound thread.
+ * Requires: sched_mutex.
+ */
+extern TaskInfo* threadIsTask (OSThreadId id);
+
+/*
+ * Get the TaskInfo structure corresponding to an OSThread.  Returns
+ * NULL if the thread is not a task.
+ * Requires: sched_mutex.
+ */
+extern TaskInfo* taskOfId (OSThreadId id);
+
+/*
+ * Decides whether to call startTask() or not, based on how many
+ * workers are already running and waiting for work.  Returns
+ * rtsTrue if a worker was created.
+ * Requires: sched_mutex.
+ */
+extern rtsBool maybeStartNewWorker (void (*taskStart)(void));
 
 #endif /* RTS_SUPPORTS_THREADS */
 #endif /* __TASK_H__ */