[project @ 2004-10-14 14:58:37 by simonmar]
author simonmar <unknown>
Thu, 14 Oct 2004 14:58:51 +0000 (14:58 +0000)
committer simonmar <unknown>
Thu, 14 Oct 2004 14:58:51 +0000 (14:58 +0000)
Threaded RTS improvements:

 - Unix only: implement waitRead#, waitWrite# and delay# in Haskell,
   by having a single Haskell thread (the IO manager) performing a blocking
   select() operation.  Threads communicate with the IO manager
   via channels.  This is faster than doing the select() in the RTS,
   because we only restart the select() when a new request arrives,
   rather than each time around the scheduler.

   On Windows we just make blocking IO calls; we don't have a fancy IO
   manager (yet).

 - Simplify the scheduler for the threaded RTS, now that we don't have
   to wait for IO in the scheduler loop.

 - Remove detectBlackHoles(), which isn't used now (not sure how long
   this has been unused for... perhaps it was needed back when main threads
   used to be GC roots, so we had to check for blackholes manually rather
   than relying on the GC.)

Signals aren't quite right in the threaded RTS.  In fact, they're
slightly worse than before, because the thread receiving signals might
be blocked in a C call - previously there would always be another thread
stuck in awaitEvent() that would notice the signal, but that's not
true now.  I can't see an easy fix yet.

ghc/rts/Capability.c
ghc/rts/Capability.h
ghc/rts/PrimOps.cmm
ghc/rts/Schedule.c
ghc/rts/Schedule.h
ghc/rts/Select.c
ghc/rts/Signals.c
ghc/rts/Signals.h

index 3ea96fe..62f205d 100644 (file)
@@ -56,8 +56,8 @@ nat rts_n_waiting_workers = 0;
  * exclusive access to the RTS and all its data structures (that are not
  * locked by the Scheduler's mutex).
  *
- * thread_ready_cond is signalled whenever noCapabilities doesn't hold.
- *
+ * thread_ready_cond is signalled whenever
+ *      !noCapabilities() && !EMPTY_RUN_QUEUE().
  */
 Condition thread_ready_cond = INIT_COND_VAR;
 
@@ -82,6 +82,12 @@ static rtsBool passingCapability = rtsFalse;
 #define UNUSED_IF_NOT_SMP STG_UNUSED
 #endif
 
+#if defined(RTS_USER_SIGNALS)
+#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted || signals_pending())
+#else
+#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted)
+#endif
+
 /* ----------------------------------------------------------------------------
    Initialisation
    ------------------------------------------------------------------------- */
@@ -211,7 +217,7 @@ releaseCapability( Capability* cap UNUSED_IF_NOT_SMP )
        rts_n_free_capabilities = 1;
 #endif
        // Signal that a capability is available
-       if (rts_n_waiting_tasks > 0) {
+       if (rts_n_waiting_tasks > 0 && ANY_WORK_TO_DO()) {
            signalCondition(&thread_ready_cond);
        }
        startSchedulerTaskIfNecessary();
@@ -263,7 +269,6 @@ waitForReturnCapability( Mutex* pMutex, Capability** pCap )
 
     if ( noCapabilities() || passingCapability ) {
        rts_n_waiting_workers++;
-       wakeBlockedWorkerThread();
        context_switch = 1;     // make sure it's our turn soon
        waitCondition(&returning_worker_cond, pMutex);
 #if defined(SMP)
@@ -294,8 +299,16 @@ yieldCapability( Capability** pCap )
     // Pre-condition:  pMutex is assumed held, the current thread
     // holds the capability pointed to by pCap.
 
-    if ( rts_n_waiting_workers > 0 || passingCapability ) {
-       IF_DEBUG(scheduler, sched_belch("worker: giving up capability"));
+    if ( rts_n_waiting_workers > 0 || passingCapability || !ANY_WORK_TO_DO()) {
+       IF_DEBUG(scheduler, 
+                if (rts_n_waiting_workers > 0) {
+                    sched_belch("worker: giving up capability (returning wkr)");
+                } else if (passingCapability) {
+                    sched_belch("worker: giving up capability (passing capability)");
+                } else {
+                    sched_belch("worker: giving up capability (no threads to run)");
+                }
+           );
        releaseCapability(*pCap);
        *pCap = NULL;
     }
@@ -324,13 +337,14 @@ yieldCapability( Capability** pCap )
  *           passed to this thread using passCapability.
  * ------------------------------------------------------------------------- */
  
-void 
+void
 waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond )
 {
     // Pre-condition: pMutex is held.
 
-    while ( noCapabilities() || 
-           (passingCapability && passTarget != pThreadCond)) {
+    while ( noCapabilities() ||
+           (passingCapability && passTarget != pThreadCond) ||
+           !ANY_WORK_TO_DO()) {
        IF_DEBUG(scheduler,
                 sched_belch("worker: wait for capability (cond: %p)",
                             pThreadCond));
@@ -384,6 +398,27 @@ passCapabilityToWorker( void )
 
 #endif /* RTS_SUPPORTS_THREADS */
 
+/* ----------------------------------------------------------------------------
+   threadRunnable()
+
+   Signals that a thread has been placed on the run queue, so a worker
+   might need to be woken up to run it.  thread_ready_cond is signalled
+   only when !noCapabilities(), there is work to do, and a task is waiting.
+   ToDo: should check whether the thread at the front of the queue is
+   bound, and if so wake up the appropriate worker.
+   -------------------------------------------------------------------------- */
+
+void
+threadRunnable ( void )
+{
+#if defined(RTS_SUPPORTS_THREADS)
+    if ( !noCapabilities() && ANY_WORK_TO_DO() && rts_n_waiting_tasks > 0 ) {
+       signalCondition(&thread_ready_cond);
+    }
+    startSchedulerTaskIfNecessary();
+#endif
+}
+
 /* ------------------------------------------------------------------------- */
 
 #if defined(SMP)
index 450bf74..e615035 100644 (file)
@@ -31,6 +31,10 @@ extern void initCapabilities( void );
 //
 extern void releaseCapability( Capability* cap );
 
+// Signal that a thread has become runnable
+//
+extern void threadRunnable ( void );
+
 #ifdef RTS_SUPPORTS_THREADS
 // Gives up the current capability IFF there is a higher-priority
 // thread waiting for it.  This happens in one of two ways:
index c9556f4..91c1325 100644 (file)
@@ -1342,6 +1342,10 @@ mkApUpd0zh_fast
 waitReadzh_fast
 {
     /* args: R1 */
+#ifdef THREADED_RTS
+    foreign "C" barf("waitRead# on threaded RTS");
+#endif
+
     ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
     StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
     StgTSO_block_info(CurrentTSO) = R1;
@@ -1354,6 +1358,10 @@ waitReadzh_fast
 waitWritezh_fast
 {
     /* args: R1 */
+#ifdef THREADED_RTS
+    foreign "C" barf("waitWrite# on threaded RTS");
+#endif
+
     ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
     StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
     StgTSO_block_info(CurrentTSO) = R1;
@@ -1374,6 +1382,10 @@ delayzh_fast
     W_ t, prev, target;
 #endif
 
+#ifdef THREADED_RTS
+    foreign "C" barf("delay# on threaded RTS");
+#endif
+
     /* args: R1 (microsecond delay amount) */
     ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
     StgTSO_why_blocked(CurrentTSO) = BlockedOnDelay::I16;
@@ -1432,6 +1444,10 @@ asyncReadzh_fast
     W_ ares;
     CInt reqID;
 
+#ifdef THREADED_RTS
+    foreign "C" barf("asyncRead# on threaded RTS");
+#endif
+
     /* args: R1 = fd, R2 = isSock, R3 = len, R4 = buf */
     ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
     StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
@@ -1454,6 +1470,10 @@ asyncWritezh_fast
     W_ ares;
     CInt reqID;
 
+#ifdef THREADED_RTS
+    foreign "C" barf("asyncWrite# on threaded RTS");
+#endif
+
     /* args: R1 = fd, R2 = isSock, R3 = len, R4 = buf */
     ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
     StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
index 04e9f1f..04e70da 100644 (file)
@@ -453,13 +453,13 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
     // run queue is empty, and there are no other tasks running, we
     // can wait indefinitely for something to happen.
     //
-    if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
+    if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
+    {
 #if defined(RTS_SUPPORTS_THREADS)
-               || EMPTY_RUN_QUEUE()
+       // We shouldn't be here...
+       barf("schedule: awaitEvent() in threaded RTS");
 #endif
-       )
-    {
-      awaitEvent( EMPTY_RUN_QUEUE() );
+       awaitEvent( EMPTY_RUN_QUEUE() );
     }
     // we can be interrupted while waiting for I/O...
     if (interrupted) continue;
@@ -479,18 +479,13 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
     if (   EMPTY_THREAD_QUEUES() )
     {
        IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
+
        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
-       // they are about to be send BlockedOnDeadMVar.  Any threads
-       // thus released will be immediately runnable.
+       // they are unreachable and will therefore be sent an
+       // exception.  Any threads thus released will be immediately
+       // runnable.
        GarbageCollect(GetRoots,rtsTrue);
-
-       if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
-
-       IF_DEBUG(scheduler, 
-                sched_belch("still deadlocked, checking for black holes..."));
-       detectBlackHoles();
-
        if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
 
 #if defined(RTS_USER_SIGNALS)
@@ -1457,12 +1452,6 @@ forkProcess(HsStablePtr *entry
       stgFree(m);
     }
     
-# ifdef RTS_SUPPORTS_THREADS
-    resetTaskManagerAfterFork();      // tell startTask() and friends that
-    startingWorkerThread = rtsFalse;  // we have no worker threads any more
-    resetWorkerWakeupPipeAfterFork();
-# endif
-    
     rc = rts_evalStableIO(entry, NULL);  // run the action
     rts_checkSchedStatus("forkProcess",rc);
     
@@ -1568,8 +1557,6 @@ suspendThread( StgRegTable *reg )
   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
 #endif
 
-  /* Other threads _might_ be available for execution; signal this */
-  THREAD_RUNNABLE();
   RELEASE_LOCK(&sched_mutex);
   
   errno = saved_errno;
@@ -1933,11 +1920,10 @@ static void scheduleThread_ (StgTSO* tso);
 void
 scheduleThread_(StgTSO *tso)
 {
-  // Precondition: sched_mutex must be held.
   // The thread goes at the *end* of the run-queue, to avoid possible
   // starvation of any threads already on the queue.
   APPEND_TO_RUN_QUEUE(tso);
-  THREAD_RUNNABLE();
+  threadRunnable();
 }
 
 void
@@ -1997,7 +1983,7 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
     
     APPEND_TO_RUN_QUEUE(tso);
-    // NB. Don't call THREAD_RUNNABLE() here, because the thread is
+    // NB. Don't call threadRunnable() here, because the thread is
     // bound and only runnable by *this* OS thread, so waking up other
     // workers will just slow things down.
 
@@ -2428,7 +2414,7 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
       next = bqe->link;
       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
-      THREAD_RUNNABLE();
+      threadRunnable();
       unblockCount(bqe, node);
       /* reset blocking status after dumping event */
       ((StgTSO *)bqe)->why_blocked = NotBlocked;
@@ -2473,7 +2459,7 @@ unblockOneLocked(StgTSO *tso)
   next = tso->link;
   tso->link = END_TSO_QUEUE;
   APPEND_TO_RUN_QUEUE(tso);
-  THREAD_RUNNABLE();
+  threadRunnable();
   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
   return next;
 }
@@ -2644,9 +2630,6 @@ interruptStgRts(void)
 {
     interrupted    = 1;
     context_switch = 1;
-#ifdef RTS_SUPPORTS_THREADS
-    wakeBlockedWorkerThread();
-#endif
 }
 
 /* -----------------------------------------------------------------------------
@@ -3277,70 +3260,6 @@ resurrectThreads( StgTSO *threads )
   }
 }
 
-/* -----------------------------------------------------------------------------
- * Blackhole detection: if we reach a deadlock, test whether any
- * threads are blocked on themselves.  Any threads which are found to
- * be self-blocked get sent a NonTermination exception.
- *
- * This is only done in a deadlock situation in order to avoid
- * performance overhead in the normal case.
- *
- * Locks: sched_mutex is held upon entry and exit.
- * -------------------------------------------------------------------------- */
-
-#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
-static void
-detectBlackHoles( void )
-{
-    StgTSO *tso = all_threads;
-    StgPtr frame;
-    StgClosure *blocked_on;
-    StgRetInfoTable *info;
-
-    for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
-
-       while (tso->what_next == ThreadRelocated) {
-           tso = tso->link;
-           ASSERT(get_itbl(tso)->type == TSO);
-       }
-      
-       if (tso->why_blocked != BlockedOnBlackHole) {
-           continue;
-       }
-       blocked_on = tso->block_info.closure;
-
-       frame = tso->sp;
-
-       while(1) {
-           info = get_ret_itbl((StgClosure *)frame);
-           switch (info->i.type) {
-           case UPDATE_FRAME:
-               if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
-                   /* We are blocking on one of our own computations, so
-                    * send this thread the NonTermination exception.  
-                    */
-                   IF_DEBUG(scheduler, 
-                            sched_belch("thread %d is blocked on itself", tso->id));
-                   raiseAsync(tso, (StgClosure *)NonTermination_closure);
-                   goto done;
-               }
-               
-               frame = (StgPtr)((StgUpdateFrame *)frame + 1);
-               continue;
-
-           case STOP_FRAME:
-               goto done;
-
-               // normal stack frames; do nothing except advance the pointer
-           default:
-               frame += stack_frame_sizeW((StgClosure *)frame);
-           }
-       }   
-       done: ;
-    }
-}
-#endif
-
 /* ----------------------------------------------------------------------------
  * Debugging: why is a thread blocked
  * [Also provides useful information when debugging threaded programs
index 8924a62..59d900f 100644 (file)
@@ -156,7 +156,6 @@ extern nat         RTS_VAR(rts_n_waiting_workers);
 extern nat         RTS_VAR(rts_n_waiting_tasks);
 #endif
 
-StgBool rtsSupportsBoundThreads(void);
 StgBool isThreadBound(StgTSO *tso);
 
 extern SchedulerStatus rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret);
@@ -280,17 +279,6 @@ void labelThread(StgPtr tso, char *label);
     }                                          \
     blocked_queue_tl = tso;
 
-/* Signal that a runnable thread has become available, in
- * case there are any waiting tasks to execute it.
- */
-#if defined(RTS_SUPPORTS_THREADS)
-#define THREAD_RUNNABLE()                      \
-  wakeBlockedWorkerThread();                   \
-  context_switch = 1;
-#else
-#define THREAD_RUNNABLE()  /* nothing */
-#endif
-
 /* Check whether various thread queues are empty
  */
 #define EMPTY_QUEUE(q)         (q == END_TSO_QUEUE)
index 2687064..418e48c 100644 (file)
 /* last timestamp */
 nat timestamp = 0;
 
-#ifdef RTS_SUPPORTS_THREADS
-static rtsBool isWorkerBlockedInAwaitEvent = rtsFalse;
-static rtsBool workerWakeupPending = rtsFalse;
-static int workerWakeupPipe[2];
-static rtsBool workerWakeupInited = rtsFalse;
-#endif
-
 /* There's a clever trick here to avoid problems when the time wraps
  * around.  Since our maximum delay is smaller than 31 bits of ticks
  * (it's actually 31 bits of microseconds), we can safely check
@@ -163,34 +156,6 @@ awaitEvent(rtsBool wait)
        }
       }
 
-#ifdef RTS_SUPPORTS_THREADS
-      if(!workerWakeupInited) {
-          pipe(workerWakeupPipe);
-          workerWakeupInited = rtsTrue;
-      }
-      FD_SET(workerWakeupPipe[0], &rfd);
-      maxfd = workerWakeupPipe[0] > maxfd ? workerWakeupPipe[0] : maxfd;
-#endif
-      
-      /* Release the scheduler lock while we do the poll.
-       * this means that someone might muck with the blocked_queue
-       * while we do this, but it shouldn't matter:
-       *
-       *   - another task might poll for I/O and remove one
-       *     or more threads from the blocked_queue.
-       *   - more I/O threads may be added to blocked_queue.
-       *   - more delayed threads may be added to blocked_queue. We'll
-       *     just subtract delta from their delays after the poll.
-       *
-       * I believe none of these cases lead to trouble --SDM.
-       */
-      
-#ifdef RTS_SUPPORTS_THREADS
-      isWorkerBlockedInAwaitEvent = rtsTrue;
-      workerWakeupPending = rtsFalse;
-#endif
-      RELEASE_LOCK(&sched_mutex);
-
       /* Check for any interesting events */
       
       tv.tv_sec  = min / 1000000;
@@ -223,10 +188,6 @@ awaitEvent(rtsBool wait)
              barf("select failed");
            }
          }
-         ACQUIRE_LOCK(&sched_mutex);
-#ifdef RTS_SUPPORTS_THREADS
-          isWorkerBlockedInAwaitEvent = rtsFalse;
-#endif
 
          /* We got a signal; could be one of ours.  If so, we need
           * to start up the signal handler straight away, otherwise
@@ -235,9 +196,7 @@ awaitEvent(rtsBool wait)
           */
 #if defined(RTS_USER_SIGNALS)
          if (signals_pending()) {
-             RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
              startSignalHandlers();
-             ACQUIRE_LOCK(&sched_mutex);
              return; /* still hold the lock */
          }
 #endif
@@ -258,24 +217,8 @@ awaitEvent(rtsBool wait)
          if (run_queue_hd != END_TSO_QUEUE) {
              return; /* still hold the lock */
          }
-         
-#ifdef RTS_SUPPORTS_THREADS
-         /* If another worker thread wants to take over,
-          * return to the scheduler
-          */
-         if (needToYieldToReturningWorker()) {
-             return; /* still hold the lock */
-         }
-#endif
-         
-#ifdef RTS_SUPPORTS_THREADS
-          isWorkerBlockedInAwaitEvent = rtsTrue;
-#endif
-         RELEASE_LOCK(&sched_mutex);
       }
 
-      ACQUIRE_LOCK(&sched_mutex);
-
       /* Step through the waiting queue, unblocking every thread that now has
        * a file descriptor in a ready state.
        */
@@ -317,51 +260,5 @@ awaitEvent(rtsBool wait)
          }
       }
       
-#if defined(RTS_SUPPORTS_THREADS)
-       // if we were woken up by wakeBlockedWorkerThread,
-       // read the dummy byte from the pipe
-      if(select_succeeded && FD_ISSET(workerWakeupPipe[0], &rfd)) {
-          unsigned char dummy;
-          wait = rtsFalse;
-          read(workerWakeupPipe[0],&dummy,1);
-      }
-#endif
     } while (wait && !interrupted && run_queue_hd == END_TSO_QUEUE);
 }
-
-
-#ifdef RTS_SUPPORTS_THREADS
-/* wakeBlockedWorkerThread
- *
- * If a worker thread is currently blocked within awaitEvent,
- * wake it.
- * Must be called with sched_mutex held.
- */
-void
-wakeBlockedWorkerThread()
-{
-    if(isWorkerBlockedInAwaitEvent && !workerWakeupPending) {
-       unsigned char dummy = 42;       // Any value will do here
-       
-                       // write something so that select() wakes up
-       write(workerWakeupPipe[1],&dummy,1);
-       workerWakeupPending = rtsTrue;
-    }
-}
-
-/* resetWorkerWakeupPipeAfterFork
- *
- * To be called right after a fork().
- * After the fork(), the worker wakeup pipe will be shared
- * with the parent process, and that's something we don't want.
- */
-void
-resetWorkerWakeupPipeAfterFork()
-{
-    if(workerWakeupInited) {
-       close(workerWakeupPipe[0]);
-       close(workerWakeupPipe[1]);
-    }
-    workerWakeupInited = rtsFalse;
-}
-#endif
index d5a046e..ac6d266 100644 (file)
@@ -54,22 +54,25 @@ static nat n_haskell_handlers = 0;
 StgPtr pending_handler_buf[N_PENDING_HANDLERS];
 StgPtr *next_pending_handler = pending_handler_buf;
 
+/* -----------------------------------------------------------------------------
+ * Signal handling
+ * -------------------------------------------------------------------------- */
+
 #ifdef RTS_SUPPORTS_THREADS
 pthread_t signalHandlingThread;
 #endif
 
-       // Handle all signals in the current thread.
-       // Called from Capability.c whenever the main capability is granted to a thread
-       // and in installDefaultHandlers
+// Handle all signals in the current thread.
+// Called from Capability.c whenever the main capability is granted to a thread
+// and in installDefaultHandlers
 void
-handleSignalsInThisThread()
+handleSignalsInThisThread(void)
 {
 #ifdef RTS_SUPPORTS_THREADS
     signalHandlingThread = pthread_self();
 #endif
 }
 
-
 /* -----------------------------------------------------------------------------
  * Allocate/resize the table of signal handlers.
  * -------------------------------------------------------------------------- */
index 4825fb7..09ecec0 100644 (file)
@@ -28,6 +28,7 @@ extern void markSignalHandlers (evac_fn evac);
 extern void initDefaultHandlers(void);
 
 extern void handleSignalsInThisThread(void);
+extern void handleSignalsInPrevThread(void);
 
 #elif defined(mingw32_TARGET_OS)
 #define RTS_USER_SIGNALS 1