[project @ 2003-09-21 22:20:51 by wolfgang]
[ghc-hetmet.git] / ghc / rts / Schedule.c
index c58584f..d29b6bb 100644 (file)
@@ -1,5 +1,5 @@
 /* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.173 2003/08/15 12:43:57 simonmar Exp $
+ * $Id: Schedule.c,v 1.174 2003/09/21 22:20:56 wolfgang Exp $
  *
  * (c) The GHC Team, 1998-2000
  *
 #include <stdlib.h>
 #include <stdarg.h>
 
+#ifdef HAVE_ERRNO_H
+#include <errno.h>
+#endif
+
 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
 //@subsection Variables and Data structures
 
  */
 StgMainThread *main_threads = NULL;
 
-#ifdef THREADED_RTS
-// Pointer to the thread that executes main
-// When this thread is finished, the program terminates
-// by calling shutdownHaskellAndExit.
-// It would be better to add a call to shutdownHaskellAndExit
-// to the Main.main wrapper and to remove this hack.
-StgMainThread *main_main_thread = NULL;
-#endif
-
 /* Thread queues.
  * Locks required: sched_mutex.
  */
@@ -249,7 +244,7 @@ static rtsBool shutting_down_scheduler = rtsFalse;
 
 void            addToBlockedQueue ( StgTSO *tso );
 
-static void     schedule          ( void );
+static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
        void     interruptStgRts   ( void );
 
 static void     detectBlackHoles  ( void );
@@ -311,7 +306,7 @@ static void taskStart(void);
 static void
 taskStart(void)
 {
-  schedule();
+  schedule(NULL,NULL);
 }
 #endif
 
@@ -363,10 +358,10 @@ startSchedulerTask(void)
    ------------------------------------------------------------------------ */
 //@cindex schedule
 static void
-schedule( void )
+schedule( StgMainThread *mainThread, Capability *initialCapability )
 {
   StgTSO *t;
-  Capability *cap;
+  Capability *cap = initialCapability;
   StgThreadReturnCode ret;
 #if defined(GRAN)
   rtsEvent *event;
@@ -386,8 +381,16 @@ schedule( void )
   ACQUIRE_LOCK(&sched_mutex);
  
 #if defined(RTS_SUPPORTS_THREADS)
-  waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
-  IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): entering RTS", osThreadId()));
+  /* in the threaded case, the capability is either passed in via the initialCapability
+     parameter, or initialized inside the scheduler loop */
+
+  IF_DEBUG(scheduler,
+    fprintf(stderr,"### NEW SCHEDULER LOOP in os thread %u(%p)\n",
+           osThreadId(), osThreadId()));
+  IF_DEBUG(scheduler,
+    fprintf(stderr,"### main thread: %p\n",mainThread));
+  IF_DEBUG(scheduler,
+    fprintf(stderr,"### initial cap: %p\n",initialCapability));
 #else
   /* simply initialise it in the non-threaded case */
   grabCapability(&cap);
@@ -431,8 +434,19 @@ schedule( void )
 #if defined(RTS_SUPPORTS_THREADS)
     /* Check to see whether there are any worker threads
        waiting to deposit external call results. If so,
-       yield our capability */
-    yieldToReturningWorker(&sched_mutex, &cap);
+       yield our capability... if we have a capability, that is. */
+    if(cap)
+      yieldToReturningWorker(&sched_mutex, &cap,
+         mainThread ? &mainThread->bound_thread_cond : NULL);
+
+    /* If we do not currently hold a capability, we wait for one */
+    if(!cap)
+    {
+      waitForWorkCapability(&sched_mutex, &cap,
+         mainThread ? &mainThread->bound_thread_cond : NULL);
+      IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): got cap",
+                                     osThreadId()));
+    }
 #endif
 
     /* If we're interrupted (the user pressed ^C, or some other
@@ -463,55 +477,63 @@ schedule( void )
      */
 #if defined(RTS_SUPPORTS_THREADS)
     { 
-      StgMainThread *m, **prev;
-      prev = &main_threads;
-      for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
-       switch (m->tso->what_next) {
-       case ThreadComplete:
-         if (m->ret) {
-              // NOTE: return val is tso->sp[1] (see StgStartup.hc)
-             *(m->ret) = (StgClosure *)m->tso->sp[1]; 
-         }
-         *prev = m->link;
-         m->stat = Success;
-         broadcastCondition(&m->wakeup);
-#ifdef DEBUG
-         removeThreadLabel((StgWord)m->tso);
-#endif
-          if(m == main_main_thread)
-          {
-              releaseCapability(cap);
-              startTask(taskStart);    // thread-safe-call to shutdownHaskellAndExit
-              RELEASE_LOCK(&sched_mutex);
-              shutdownHaskellAndExit(EXIT_SUCCESS);
-          }
-         break;
-       case ThreadKilled:
-         if (m->ret) *(m->ret) = NULL;
-         *prev = m->link;
-         if (was_interrupted) {
-           m->stat = Interrupted;
-         } else {
-           m->stat = Killed;
-         }
-         broadcastCondition(&m->wakeup);
+       StgMainThread *m, **prev;
+       prev = &main_threads;
+       for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
+         if (m->tso->what_next == ThreadComplete
+             || m->tso->what_next == ThreadKilled)
+         {
+           if(m == mainThread)
+           {
+              if(m->tso->what_next == ThreadComplete)
+              {
+                if (m->ret)
+                {
+                  // NOTE: return val is tso->sp[1] (see StgStartup.hc)
+                  *(m->ret) = (StgClosure *)m->tso->sp[1]; 
+                }
+                m->stat = Success;
+              }
+              else
+              {
+                if (m->ret)
+                {
+                  *(m->ret) = NULL;
+                }
+                if (was_interrupted)
+                {
+                  m->stat = Interrupted;
+                }
+                else
+                {
+                  m->stat = Killed;
+                }
+              }
+              *prev = m->link;
+           
 #ifdef DEBUG
-         removeThreadLabel((StgWord)m->tso);
+             removeThreadLabel((StgWord)m->tso);
 #endif
-          if(m == main_main_thread)
-          {
               releaseCapability(cap);
-              startTask(taskStart);    // thread-safe-call to shutdownHaskellAndExit
               RELEASE_LOCK(&sched_mutex);
-              shutdownHaskellAndExit(EXIT_SUCCESS);
+              return;
+            }
+            else
+            {
+                // The current OS thread can not handle the fact that the Haskell
+                // thread "m" has ended. 
+                // "m" is bound; the scheduler loop in it's bound OS thread has
+                // to return, so let's pass our capability directly to that thread.
+              passCapability(&sched_mutex, cap, &m->bound_thread_cond);
+              cap = NULL;
+            }
           }
-         break;
-       default:
-         break;
        }
-      }
     }
-
+    
+    if(!cap)   // If we gave our capability away,
+      continue;        // go to the top to get it back
+      
 #else /* not threaded */
 
 # if defined(PAR)
@@ -1067,6 +1089,63 @@ schedule( void )
     IF_DEBUG(sanity,checkTSO(t));
 #endif
 
+#ifdef THREADED_RTS
+    {
+      StgMainThread *m;
+      for(m = main_threads; m; m = m->link)
+      {
+       if(m->tso == t)
+         break;
+      }
+      
+      if(m)
+      {
+       if(m == mainThread)
+       {
+         IF_DEBUG(scheduler,
+           fprintf(stderr,"### Running TSO %p in bound OS thread %u\n",
+                   t, osThreadId()));
+         // yes, the Haskell thread is bound to the current native thread
+       }
+       else
+       {
+         IF_DEBUG(scheduler,
+           fprintf(stderr,"### TSO %p bound to other OS thread than %u\n",
+                   t, osThreadId()));
+         // no, bound to a different Haskell thread: pass to that thread
+         PUSH_ON_RUN_QUEUE(t);
+         passCapability(&sched_mutex,cap,&m->bound_thread_cond);
+         cap = NULL;
+         continue;
+       }
+      }
+      else
+      {
+        // The thread we want to run is not bound.
+       if(mainThread == NULL)
+       {
+         IF_DEBUG(scheduler,
+           fprintf(stderr,"### Running TSO %p in worker OS thread %u\n",
+                   t, osThreadId()));
+          // if we are a worker thread,
+         // we may run it here
+       }
+       else
+       {
+         IF_DEBUG(scheduler,
+           fprintf(stderr,"### TSO %p is not appropriate for main thread %p in OS thread %u\n",
+                   t, mainThread, osThreadId()));
+         // no, the current native thread is bound to a different
+         // Haskell thread, so pass it to any worker thread
+         PUSH_ON_RUN_QUEUE(t);
+         releaseCapability(cap);
+         cap = NULL;
+         continue; 
+       }
+      }
+    }
+#endif
+
     cap->r.rCurrentTSO = t;
     
     /* context switches are now initiated by the timer signal, unless
@@ -1103,7 +1182,9 @@ run_thread:
        ret = ThreadFinished;
        break;
     case ThreadRunGHC:
+       errno = t->saved_errno;
        ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
+       t->saved_errno = errno;
        break;
     case ThreadInterpret:
        ret = interpretBCO(cap);
@@ -1122,7 +1203,7 @@ run_thread:
     ACQUIRE_LOCK(&sched_mutex);
     
 #ifdef RTS_SUPPORTS_THREADS
-    IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
+    IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %p): ", osThreadId()););
 #elif !defined(GRAN) && !defined(PAR)
     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
 #endif
@@ -1492,19 +1573,54 @@ run_thread:
 }
 
 /* ---------------------------------------------------------------------------
+ * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
+ * used by Control.Concurrent for error checking.
+ * ------------------------------------------------------------------------- */
+StgBool
+rtsSupportsBoundThreads(void)
+{
+#ifdef THREADED_RTS
+  return rtsTrue;
+#else
+  return rtsFalse;
+#endif
+}
+
+/* ---------------------------------------------------------------------------
+ * isThreadBound(tso): check whether tso is bound to an OS thread.
+ * ------------------------------------------------------------------------- */
+StgBool
+isThreadBound(StgTSO* tso)
+{
+#ifdef THREADED_RTS
+  StgMainThread *m;
+  for(m = main_threads; m; m = m->link)
+  {
+    if(m->tso == tso)
+      return rtsTrue;
+  }
+#endif
+  return rtsFalse;
+}
+
+/* ---------------------------------------------------------------------------
  * Singleton fork(). Do not copy any running threads.
  * ------------------------------------------------------------------------- */
 
+static void 
+deleteThreadImmediately(StgTSO *tso);
+
 StgInt
 forkProcess(StgTSO* tso)
 {
 #ifndef mingw32_TARGET_OS
   pid_t pid;
   StgTSO* t,*next;
-  StgMainThread *m;
-  rtsBool doKill;
 
   IF_DEBUG(scheduler,sched_belch("forking!"));
+  ACQUIRE_LOCK(&sched_mutex);
 
   pid = fork();
   if (pid) { /* parent */
@@ -1512,6 +1628,43 @@ forkProcess(StgTSO* tso)
   /* just return the pid */
     
   } else { /* child */
+#ifdef THREADED_RTS
+    /* wipe all other threads */
+    run_queue_hd = run_queue_tl = END_TSO_QUEUE;
+    tso->link = END_TSO_QUEUE;
+    
+    for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+      next = t->link;
+      
+      /* Don't kill the current thread.. */
+      if (t->id == tso->id) {
+       continue;
+      }
+      
+      if (isThreadBound(t)) {
+       // If the thread is bound, the OS thread that the thread is bound to
+       // no longer exists after the fork() system call.
+       // The bound Haskell thread is therefore unable to run at all;
+       // we must not give it a chance to survive by catching the
+       // ThreadKilled exception. So we kill it "brutally" rather than
+       // using deleteThread.
+       deleteThreadImmediately(t);
+      } else {
+       deleteThread(t);
+      }
+    }
+    
+    if (isThreadBound(tso)) {
+    } else {
+      // If the current is not bound, then we should make it so.
+      // The OS thread left over by fork() is special in that the process
+      // will terminate as soon as the thread terminates; 
+      // we'd expect forkProcess to behave similarily.
+      // FIXME - we don't do this.
+    }
+#else
+  StgMainThread *m;
+  rtsBool doKill;
   /* wipe all other threads */
   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
   tso->link = END_TSO_QUEUE;
@@ -1555,7 +1708,9 @@ forkProcess(StgTSO* tso)
       deleteThread(t);
     }
   }
+#endif
   }
+  RELEASE_LOCK(&sched_mutex);
   return pid;
 #else /* mingw32 */
   barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
@@ -1618,6 +1773,7 @@ suspendThread( StgRegTable *reg,
 {
   nat tok;
   Capability *cap;
+  int saved_errno = errno;
 
   /* assume that *reg is a pointer to the StgRegTable part
    * of a Capability.
@@ -1667,6 +1823,8 @@ suspendThread( StgRegTable *reg,
   /* Other threads _might_ be available for execution; signal this */
   THREAD_RUNNABLE();
   RELEASE_LOCK(&sched_mutex);
+  
+  errno = saved_errno;
   return tok; 
 }
 
@@ -1676,6 +1834,7 @@ resumeThread( StgInt tok,
 {
   StgTSO *tso, **prev;
   Capability *cap;
+  int saved_errno = errno;
 
 #if defined(RTS_SUPPORTS_THREADS)
   /* Wait for permission to re-enter the RTS with the result. */
@@ -1717,6 +1876,7 @@ resumeThread( StgInt tok,
 #if defined(RTS_SUPPORTS_THREADS)
   RELEASE_LOCK(&sched_mutex);
 #endif
+  errno = saved_errno;
   return &cap->r;
 }
 
@@ -1845,6 +2005,8 @@ createThread(nat size)
   tso->why_blocked  = NotBlocked;
   tso->blocked_exceptions = NULL;
 
+  tso->saved_errno = 0;
+  
   tso->stack_size   = stack_size;
   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
                               - TSO_STRUCT_SIZEW;
@@ -2016,10 +2178,8 @@ activateSpark (rtsSpark spark)
 }
 #endif
 
-static SchedulerStatus waitThread_(/*out*/StgMainThread* m
-#if defined(THREADED_RTS)
-                                  , rtsBool blockWaiting
-#endif
+static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
+                                  Capability *initialCapability
                                   );
 
 
@@ -2061,7 +2221,7 @@ void scheduleThread(StgTSO* tso)
 }
 
 SchedulerStatus
-scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
+scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCapability)
 {      // Precondition: sched_mutex must be held
   StgMainThread *m;
 
@@ -2071,6 +2231,9 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
   m->stat = NoStatus;
 #if defined(RTS_SUPPORTS_THREADS)
   initCondition(&m->wakeup);
+#if defined(THREADED_RTS)
+  initCondition(&m->bound_thread_cond);
+#endif
 #endif
 
   /* Put the thread on the main-threads list prior to scheduling the TSO.
@@ -2088,11 +2251,8 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
   main_threads = m;
 
   scheduleThread_(tso);
-#if defined(THREADED_RTS)
-  return waitThread_(m, rtsTrue);
-#else
-  return waitThread_(m);
-#endif
+
+  return waitThread_(m, initialCapability);
 }
 
 /* ---------------------------------------------------------------------------
@@ -2259,13 +2419,13 @@ finishAllThreads ( void )
 {
    do {
       while (run_queue_hd != END_TSO_QUEUE) {
-         waitThread ( run_queue_hd, NULL);
+         waitThread ( run_queue_hd, NULL, NULL );
       }
       while (blocked_queue_hd != END_TSO_QUEUE) {
-         waitThread ( blocked_queue_hd, NULL);
+         waitThread ( blocked_queue_hd, NULL, NULL );
       }
       while (sleeping_queue != END_TSO_QUEUE) {
-         waitThread ( blocked_queue_hd, NULL);
+         waitThread ( blocked_queue_hd, NULL, NULL );
       }
    } while 
       (blocked_queue_hd != END_TSO_QUEUE || 
@@ -2274,7 +2434,7 @@ finishAllThreads ( void )
 }
 
 SchedulerStatus
-waitThread(StgTSO *tso, /*out*/StgClosure **ret)
+waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability)
 { 
   StgMainThread *m;
   SchedulerStatus stat;
@@ -2285,6 +2445,9 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret)
   m->stat = NoStatus;
 #if defined(RTS_SUPPORTS_THREADS)
   initCondition(&m->wakeup);
+#if defined(THREADED_RTS)
+  initCondition(&m->bound_thread_cond);
+#endif
 #endif
 
   /* see scheduleWaitThread() comment */
@@ -2293,45 +2456,25 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret)
   main_threads = m;
 
   IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
-#if defined(THREADED_RTS)
-  stat = waitThread_(m, rtsFalse);
-#else
-  stat = waitThread_(m);
-#endif
+
+  stat = waitThread_(m,initialCapability);
+  
   RELEASE_LOCK(&sched_mutex);
   return stat;
 }
 
 static
 SchedulerStatus
-waitThread_(StgMainThread* m
-#if defined(THREADED_RTS)
-           , rtsBool blockWaiting
-#endif
-          )
+waitThread_(StgMainThread* m, Capability *initialCapability)
 {
   SchedulerStatus stat;
 
   // Precondition: sched_mutex must be held.
   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
 
-#if defined(RTS_SUPPORTS_THREADS)
-
-# if defined(THREADED_RTS)
-  if (!blockWaiting) {
-    /* In the threaded case, the OS thread that called main()
-     * gets to enter the RTS directly without going via another
-     * task/thread.
-     */
-    main_main_thread = m;
-    RELEASE_LOCK(&sched_mutex);
-    schedule();
-    ACQUIRE_LOCK(&sched_mutex);
-    main_main_thread = NULL;
-    ASSERT(m->stat != NoStatus);
-  } else 
-# endif
-  {
+#if defined(RTS_SUPPORTS_THREADS) && !defined(THREADED_RTS)
+  {    // FIXME: does this still make sense?
+       // It's not for the threaded rts => SMP only
     do {
       waitCondition(&m->wakeup, &sched_mutex);
     } while (m->stat == NoStatus);
@@ -2343,10 +2486,11 @@ waitThread_(StgMainThread* m
   CurrentProc = MainProc;             // PE to run it on
 
   RELEASE_LOCK(&sched_mutex);
-  schedule();
+  schedule(m,initialCapability);
 #else
   RELEASE_LOCK(&sched_mutex);
-  schedule();
+  schedule(m,initialCapability);
+  ACQUIRE_LOCK(&sched_mutex);
   ASSERT(m->stat != NoStatus);
 #endif
 
@@ -2354,6 +2498,9 @@ waitThread_(StgMainThread* m
 
 #if defined(RTS_SUPPORTS_THREADS)
   closeCondition(&m->wakeup);
+#if defined(THREADED_RTS)
+  closeCondition(&m->bound_thread_cond);
+#endif
 #endif
 
   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
@@ -3327,6 +3474,18 @@ deleteThread(StgTSO *tso)
   raiseAsync(tso,NULL);
 }
 
+static void 
+deleteThreadImmediately(StgTSO *tso)
+{ // for forkProcess only:
+  // delete thread without giving it a chance to catch the KillThread exception
+
+  if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
+      return;
+  }
+  unblockThread(tso);
+  tso->what_next = ThreadKilled;
+}
+
 void
 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
 {