[project @ 2000-01-12 15:15:17 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
index 54c1ace..1d037a1 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.38 1999/12/01 16:13:25 simonmar Exp $
+ * $Id: Schedule.c,v 1.39 2000/01/12 15:15:17 simonmar Exp $
  *
  * (c) The GHC Team, 1998-1999
  *
@@ -47,6 +47,9 @@
 #include "Profiling.h"
 #include "Sanity.h"
 #include "Stats.h"
+#include "Sparks.h"
+
+#include <stdarg.h>
 
 /* Main threads:
  *
@@ -98,8 +101,9 @@ static StgTSO *threadStackOverflow(StgTSO *tso);
 
 /* flag set by signal handler to precipitate a context switch */
 nat context_switch;
+
 /* if this flag is set as well, give up execution */
-static nat interrupted;
+rtsBool interrupted;
 
 /* Next thread ID to allocate.
  * Locks required: sched_mutex
@@ -145,8 +149,12 @@ task_info *task_ids;
 void            addToBlockedQueue ( StgTSO *tso );
 
 static void     schedule          ( void );
-static void     initThread        ( StgTSO *tso, nat stack_size );
        void     interruptStgRts   ( void );
+static StgTSO * createThread_     ( nat size, rtsBool have_lock );
+
+#ifdef DEBUG
+static void sched_belch(char *s, ...);
+#endif
 
 #ifdef SMP
 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
@@ -194,7 +202,7 @@ schedule( void )
      * threads.
      */
     if (interrupted) {
-      IF_DEBUG(scheduler,belch("schedule: interrupted"));
+      IF_DEBUG(scheduler, sched_belch("interrupted"));
       for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
        deleteThread(t);
       }
@@ -215,18 +223,22 @@ schedule( void )
       StgMainThread *m, **prev;
       prev = &main_threads;
       for (m = main_threads; m != NULL; m = m->link) {
-       if (m->tso->whatNext == ThreadComplete) {
+       switch (m->tso->whatNext) {
+       case ThreadComplete:
          if (m->ret) {
            *(m->ret) = (StgClosure *)m->tso->sp[0];
          }
          *prev = m->link;
          m->stat = Success;
          pthread_cond_broadcast(&m->wakeup);
-       }
-       if (m->tso->whatNext == ThreadKilled) {
+         break;
+       case ThreadKilled:
          *prev = m->link;
          m->stat = Killed;
          pthread_cond_broadcast(&m->wakeup);
+         break;
+       default:
+         break;
        }
       }
     }
@@ -251,6 +263,49 @@ schedule( void )
     }
 #endif
 
+    /* Top up the run queue from our spark pool.  We try to make the
+     * number of threads in the run queue equal to the number of
+     * free capabilities.
+     */
+#if defined(SMP) || defined(PAR)
+    {
+      nat n = n_free_capabilities;
+      StgTSO *tso = run_queue_hd;
+
+      /* Count the run queue */
+      while (n > 0 && tso != END_TSO_QUEUE) {
+       tso = tso->link;
+       n--;
+      }
+
+      for (; n > 0; n--) {
+       StgClosure *spark;
+       spark = findSpark();
+       if (spark == NULL) {
+         break; /* no more sparks in the pool */
+       } else {
+         StgTSO *tso;
+         tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
+         pushClosure(tso,spark);
+         PUSH_ON_RUN_QUEUE(tso);
+#ifdef ToDo
+         advisory_thread_count++;
+#endif
+         
+         IF_DEBUG(scheduler,
+                  sched_belch("turning spark of closure %p into a thread",
+                              (StgClosure *)spark));
+       }
+      }
+      /* We need to wake up the other tasks if we just created some
+       * work for them.
+       */
+      if (n_free_capabilities - n > 1) {
+         pthread_cond_signal(&thread_ready_cond);
+      }
+    }
+#endif /* SMP || PAR */
+
     /* Check whether any waiting threads need to be woken up.  If the
      * run queue is empty, and there are no other tasks running, we
      * can wait indefinitely for something to happen.
@@ -261,7 +316,7 @@ schedule( void )
       awaitEvent(
           (run_queue_hd == END_TSO_QUEUE)
 #ifdef SMP
-       && (n_free_capabilities == RtsFlags.ConcFlags.nNodes)
+       && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
 #endif
        );
     }
@@ -281,7 +336,7 @@ schedule( void )
 #ifdef SMP
     if (blocked_queue_hd == END_TSO_QUEUE
        && run_queue_hd == END_TSO_QUEUE
-       && (n_free_capabilities == RtsFlags.ConcFlags.nNodes)
+       && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
        ) {
       StgMainThread *m;
       for (m = main_threads; m != NULL; m = m->link) {
@@ -307,8 +362,7 @@ schedule( void )
      * completed.
      */
     if (ready_to_gc) {
-      IF_DEBUG(scheduler,fprintf(stderr,"schedule (task %ld): waiting for GC\n",
-                                pthread_self()););
+      IF_DEBUG(scheduler,sched_belch("waiting for GC"));
       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
     }
     
@@ -316,13 +370,9 @@ schedule( void )
      * capability.
      */
     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
-      IF_DEBUG(scheduler,
-              fprintf(stderr, "schedule (task %ld): waiting for work\n",
-                      pthread_self()););
+      IF_DEBUG(scheduler, sched_belch("waiting for work"));
       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
-      IF_DEBUG(scheduler,
-              fprintf(stderr, "schedule (task %ld): work now available\n",
-                      pthread_self()););
+      IF_DEBUG(scheduler, sched_belch("work now available"));
     }
 #endif
   
@@ -351,11 +401,7 @@ schedule( void )
 
     RELEASE_LOCK(&sched_mutex);
     
-#ifdef SMP
-    IF_DEBUG(scheduler,fprintf(stderr,"schedule (task %ld): running thread %d\n", pthread_self(),t->id));
-#else
-    IF_DEBUG(scheduler,fprintf(stderr,"schedule: running thread %d\n",t->id));
-#endif
+    IF_DEBUG(scheduler,sched_belch("running thread %d", t->id));
 
     /* Run the current thread 
      */
@@ -375,7 +421,7 @@ schedule( void )
 #ifdef INTERPRETER
       {
          StgClosure* c;
-        IF_DEBUG(scheduler,belch("schedule: entering Hugs"));    
+        IF_DEBUG(scheduler,sched_belch("entering Hugs"));
         c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
         cap->rCurrentTSO->sp += 1;
         ret = enter(cap,c);
@@ -494,7 +540,7 @@ schedule( void )
 #endif
 
 #ifdef SMP
-    if (ready_to_gc && n_free_capabilities == RtsFlags.ConcFlags.nNodes) {
+    if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) {
 #else
     if (ready_to_gc) {
 #endif
@@ -504,7 +550,7 @@ schedule( void )
        * broadcast on gc_pending_cond afterward.
        */
 #ifdef SMP
-      IF_DEBUG(scheduler,belch("schedule (task %ld): doing GC", pthread_self()));
+      IF_DEBUG(scheduler,sched_belch("doing GC"));
 #endif
       GarbageCollect(GetRoots);
       ready_to_gc = rtsFalse;
@@ -520,7 +566,7 @@ schedule( void )
 void deleteAllThreads ( void )
 {
   StgTSO* t;
-  IF_DEBUG(scheduler,belch("deleteAllThreads()"));
+  IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
     deleteThread(t);
   }
@@ -554,15 +600,8 @@ suspendThread( Capability *cap )
 
   ACQUIRE_LOCK(&sched_mutex);
 
-#ifdef SMP
-  IF_DEBUG(scheduler,
-          fprintf(stderr, "schedule (task %ld): thread %d did a _ccall_gc\n", 
-                  pthread_self(), cap->rCurrentTSO->id));
-#else
   IF_DEBUG(scheduler,
-          fprintf(stderr, "schedule: thread %d did a _ccall_gc\n", 
-                  cap->rCurrentTSO->id));
-#endif
+          sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
 
   threadPaused(cap->rCurrentTSO);
   cap->rCurrentTSO->link = suspended_ccalling_threads;
@@ -604,13 +643,9 @@ resumeThread( StgInt tok )
 
 #ifdef SMP
   while (free_capabilities == NULL) {
-    IF_DEBUG(scheduler,
-            fprintf(stderr,"schedule (task %ld): waiting to resume\n",
-                    pthread_self()));
+    IF_DEBUG(scheduler, sched_belch("waiting to resume"));
     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
-    IF_DEBUG(scheduler,fprintf(stderr,
-                              "schedule (task %ld): resuming thread %d\n",
-                              pthread_self(), tso->id));
+    IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
   }
   cap = free_capabilities;
   free_capabilities = cap->link;
@@ -660,36 +695,38 @@ int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
    -------------------------------------------------------------------------- */
 
 StgTSO *
-createThread(nat stack_size)
+createThread(nat size)
+{
+  return createThread_(size, rtsFalse);
+}
+
+static StgTSO *
+createThread_(nat size, rtsBool have_lock)
 {
   StgTSO *tso;
+  nat stack_size;
 
   /* catch ridiculously small stack sizes */
-  if (stack_size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
-    stack_size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
+  if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
+    size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
   }
 
-  tso = (StgTSO *)allocate(stack_size);
-  TICK_ALLOC_TSO(stack_size-sizeofW(StgTSO),0);
+  tso = (StgTSO *)allocate(size);
+  TICK_ALLOC_TSO(size-sizeofW(StgTSO),0);
   
-  initThread(tso, stack_size - TSO_STRUCT_SIZEW);
-  return tso;
-}
+  stack_size = size - TSO_STRUCT_SIZEW;
 
-void
-initThread(StgTSO *tso, nat stack_size)
-{
   SET_HDR(tso, &TSO_info, CCS_MAIN);
-  tso->whatNext     = ThreadEnterGHC;
+  tso->whatNext = ThreadEnterGHC;
   
   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
         protect the increment operation on next_thread_id.
         In future, we could use an atomic increment instead.
   */
   
-  ACQUIRE_LOCK(&sched_mutex); 
+  if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
   tso->id = next_thread_id++; 
-  RELEASE_LOCK(&sched_mutex);
+  if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
 
   tso->why_blocked  = NotBlocked;
   tso->blocked_exceptions = NULL;
@@ -709,9 +746,9 @@ initThread(StgTSO *tso, nat stack_size)
   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN);
   tso->su = (StgUpdateFrame*)tso->sp;
 
-  IF_DEBUG(scheduler,belch("schedule: Initialised thread %ld, stack size = %lx words", 
-                          tso->id, tso->stack_size));
-
+  IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
+                                tso->id, tso->stack_size));
+  return tso;
 }
 
 
@@ -819,17 +856,19 @@ void initScheduler(void)
     Capability *cap, *prev;
     cap  = NULL;
     prev = NULL;
-    for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+    for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
       cap->link = prev;
       prev = cap;
     }
     free_capabilities = cap;
-    n_free_capabilities = RtsFlags.ConcFlags.nNodes;
+    n_free_capabilities = RtsFlags.ParFlags.nNodes;
   }
   IF_DEBUG(scheduler,fprintf(stderr,"schedule: Allocated %d capabilities\n",
                             n_free_capabilities););
 #endif
+
+  initSparkPools();
 }
 
 #ifdef SMP
@@ -841,11 +880,11 @@ startTasks( void )
   pthread_t tid;
   
   /* make some space for saving all the thread ids */
-  task_ids = stgMallocBytes(RtsFlags.ConcFlags.nNodes * sizeof(task_info),
+  task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
                            "initScheduler:task_ids");
   
   /* and create all the threads */
-  for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+  for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
     r = pthread_create(&tid,NULL,taskStart,NULL);
     if (r != 0) {
       barf("startTasks: Can't create new Posix thread");
@@ -865,7 +904,7 @@ void
 exitScheduler( void )
 {
 #ifdef SMP
-  nat i; 
+  nat i;
 
   /* Don't want to use pthread_cancel, since we'd have to install
    * these silly exception handlers (pthread_cleanup_{push,pop}) around
@@ -873,12 +912,12 @@ exitScheduler( void )
    */
 #if 0
   /* Cancel all our tasks */
-  for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+  for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
     pthread_cancel(task_ids[i].id);
   }
   
   /* Wait for all the tasks to terminate */
-  for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+  for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
     IF_DEBUG(scheduler,fprintf(stderr,"schedule: waiting for task %ld\n", 
                               task_ids[i].id));
     pthread_join(task_ids[i].id, NULL);
@@ -887,8 +926,8 @@ exitScheduler( void )
 
   /* Send 'em all a SIGHUP.  That should shut 'em up.
    */
-  await_death = RtsFlags.ConcFlags.nNodes;
-  for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+  await_death = RtsFlags.ParFlags.nNodes;
+  for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
     pthread_kill(task_ids[i].id,SIGTERM);
   }
   while (await_death > 0) {
@@ -959,9 +998,13 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret)
 #ifdef SMP
   pthread_cond_destroy(&m->wakeup);
 #endif
+
+  IF_DEBUG(scheduler, fprintf(stderr, "schedule: main thread (%d) finished\n", 
+                             m->tso->id));
   free(m);
 
   RELEASE_LOCK(&sched_mutex);
+
   return stat;
 }
   
@@ -1029,6 +1072,10 @@ static void GetRoots(void)
   }
   suspended_ccalling_threads = 
     (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
+
+#if defined(SMP) || defined(PAR) || defined(GRAN)
+  markSparkQueue();
+#endif
 }
 
 /* -----------------------------------------------------------------------------
@@ -1171,12 +1218,7 @@ unblockOneLocked(StgTSO *tso)
   next = tso->link;
   PUSH_ON_RUN_QUEUE(tso);
   THREAD_RUNNABLE();
-#ifdef SMP
-  IF_DEBUG(scheduler,belch("schedule (task %ld): waking up thread %ld", 
-                          pthread_self(), tso->id));
-#else
-  IF_DEBUG(scheduler,belch("schedule: waking up thread %ld", tso->id));
-#endif
+  IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
   return next;
 }
 
@@ -1370,7 +1412,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
     return;
   }
 
-  IF_DEBUG(scheduler, belch("schedule: Raising exception in thread %ld.", tso->id));
+  IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
 
   /* Remove it from any blocking queues */
   unblockThread(tso);
@@ -1549,3 +1591,22 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
   barf("raiseAsync");
 }
 
+/* -----------------------------------------------------------------------------
+   Debuggery...
+   -------------------------------------------------------------------------- */
+
+#ifdef DEBUG
+static void
+sched_belch(char *s, ...)
+{
+  va_list ap;
+  va_start(ap,s);
+#ifdef SMP
+  fprintf(stderr, "scheduler (task %ld): ", pthread_self());
+#else
+  fprintf(stderr, "scheduler: ");
+#endif
+  vfprintf(stderr, s, ap);
+  fprintf(stderr, "\n");
+}
+#endif