[project @ 2003-10-01 10:49:07 by wolfgang]
authorwolfgang <unknown>
Wed, 1 Oct 2003 10:49:09 +0000 (10:49 +0000)
committerwolfgang <unknown>
Wed, 1 Oct 2003 10:49:09 +0000 (10:49 +0000)
Threaded RTS:
Don't start new worker threads earlier than necessary.
After this commit, a Haskell program that uses neither forkOS nor forkIO is
really single-threaded (rather than using two OS threads internally).

Some details:
Worker threads are now only created when a capability is released, and
only when
(there are no worker threads)
&& (there are runnable Haskell threads ||
    there are Haskell threads blocked on IO or threadDelay)
awaitEvent can now be called from bound thread scheduling loops
(so that we don't have to create a worker thread just to run awaitEvent)

ghc/rts/Capability.c
ghc/rts/Capability.h
ghc/rts/RtsAPI.c
ghc/rts/Schedule.c
ghc/rts/Schedule.h
ghc/rts/Select.c
ghc/rts/Task.c
ghc/rts/Task.h

index 74d50ac..d748aee 100644 (file)
@@ -199,6 +199,8 @@ void releaseCapability(Capability* cap
 #endif
     /* Signal that a capability is available */
     signalCondition(&thread_ready_cond);
+    startSchedulerTaskIfNecessary();  // if there is more work to be done,
+                                     // we'll need a new thread
   }
 #endif
 #ifdef RTS_SUPPORTS_THREADS
@@ -324,6 +326,7 @@ yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
  */
  
 static Condition *passTarget = NULL;
+static rtsBool passingCapability = rtsFalse;
  
 void 
 waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
@@ -334,8 +337,7 @@ waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
   IF_DEBUG(scheduler,
           fprintf(stderr,"worker thread (%p): wait for cap (cond: %p)\n",
              osThreadId(),pThreadCond));
-  while ( noCapabilities() || (pThreadCond && passTarget != pThreadCond)
-      || (!pThreadCond && passTarget)) {
+  while ( noCapabilities() || (passingCapability && passTarget != pThreadCond)) {
     if(pThreadCond)
     {
       waitCondition(pThreadCond, pMutex);
@@ -353,7 +355,7 @@ waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
                  osThreadId()));
     }
   }
-  passTarget = NULL;
+  passingCapability = rtsFalse;
   grabCapability(pCap);
   return;
 }
@@ -378,11 +380,40 @@ passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond)
     rts_n_free_capabilities = 1;
     signalCondition(pTargetThreadCond);
     passTarget = pTargetThreadCond;
+       passingCapability = rtsTrue;
     IF_DEBUG(scheduler,
             fprintf(stderr,"worker thread (%p): passCapability\n",
                osThreadId()));
 }
 
+/*
+ * Function: passCapabilityToWorker(Mutex*, Capability*)
+ *
+ * Purpose:  Let go of the capability and make sure that a
+ *          "plain" worker thread (not a bound thread) gets it next.
+ *
+ * Pre-condition: pMutex is held and cap is held by the current thread
+ * Post-condition: pMutex is held; cap will be grabbed by the "target"
+ *                thread when pMutex is released.
+ */
+
+void
+passCapabilityToWorker(Mutex* pMutex, Capability* cap)
+{
+#ifdef SMP
+  #error SMP version not implemented
+#endif
+    rts_n_free_capabilities = 1;
+    signalCondition(&thread_ready_cond);
+    startSchedulerTaskIfNecessary();
+    passTarget = NULL;
+    passingCapability = rtsTrue;
+    IF_DEBUG(scheduler,
+            fprintf(stderr,"worker thread (%p): passCapabilityToWorker\n",
+               osThreadId()));
+}
+
+
 
 #endif /* RTS_SUPPORTS_THREADS */
 
index 70acc15..ede787b 100644 (file)
@@ -42,6 +42,7 @@ extern void grabReturnCapability(Mutex* pMutex, Capability** pCap);
 extern void yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition *pThreadCond);
 extern void waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition *pThreadCond);
 extern void passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond);
+extern void passCapabilityToWorker(Mutex* pMutex, Capability* cap);
 
 static inline rtsBool needToYieldToReturningWorker(void)
 {
index 835db72..8d1cfd9 100644 (file)
@@ -1,5 +1,5 @@
 /* ----------------------------------------------------------------------------
- * $Id: RtsAPI.c,v 1.48 2003/10/01 10:36:49 wolfgang Exp $
+ * $Id: RtsAPI.c,v 1.49 2003/10/01 10:49:07 wolfgang Exp $
  *
  * (c) The GHC Team, 1998-2001
  *
@@ -501,12 +501,6 @@ rts_lock()
                // b) wake the current worker thread from awaitEvent()
                //       (so that a thread started by rts_eval* will start immediately)
        grabReturnCapability(&sched_mutex,&rtsApiCapability);
-       
-               // In the RTS hasn't been entered yet,
-               // start a RTS task.
-               // If there is already a task available (waiting for the work capability),
-               // this will do nothing.
-       startSchedulerTask();
 #endif
 }
 
index 2754c4f..33db7e6 100644 (file)
@@ -1,5 +1,5 @@
 /* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.175 2003/09/26 13:32:14 panne Exp $
+ * $Id: Schedule.c,v 1.176 2003/10/01 10:49:08 wolfgang Exp $
  *
  * (c) The GHC Team, 1998-2000
  *
@@ -313,20 +313,38 @@ StgTSO * activateSpark (rtsSpark spark);
 StgTSO   *MainTSO;
  */
 
-#if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
+#if defined(RTS_SUPPORTS_THREADS)
+static rtsBool startingWorkerThread = rtsFalse;
+
 static void taskStart(void);
 static void
 taskStart(void)
 {
-  schedule(NULL,NULL);
+  Capability *cap;
+  
+  ACQUIRE_LOCK(&sched_mutex);
+  startingWorkerThread = rtsFalse;
+  waitForWorkCapability(&sched_mutex, &cap, NULL);
+  RELEASE_LOCK(&sched_mutex);
+  
+  schedule(NULL,cap);
 }
-#endif
 
-#if defined(RTS_SUPPORTS_THREADS)
 void
-startSchedulerTask(void)
+startSchedulerTaskIfNecessary(void)
 {
-    startTask(taskStart);
+  if(run_queue_hd != END_TSO_QUEUE
+    || blocked_queue_hd != END_TSO_QUEUE
+    || sleeping_queue != END_TSO_QUEUE)
+  {
+    if(!startingWorkerThread)
+    { // we don't want to start another worker thread
+      // just because the last one hasn't yet reached the
+      // "waiting for capability" state
+      startingWorkerThread = rtsTrue;
+      startTask(taskStart);
+    }
+  }
 }
 #endif
 
@@ -475,7 +493,6 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
       // so just exit right away.
       prog_belch("interrupted");
       releaseCapability(cap);
-      startTask(taskStart);    // thread-safe-call to shutdownHaskellAndExit
       RELEASE_LOCK(&sched_mutex);
       shutdownHaskellAndExit(EXIT_SUCCESS);
 #else
@@ -1151,7 +1168,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
          // no, the current native thread is bound to a different
          // Haskell thread, so pass it to any worker thread
          PUSH_ON_RUN_QUEUE(t);
-         releaseCapability(cap);
+         passCapabilityToWorker(&sched_mutex, cap);
          cap = NULL;
          continue; 
        }
@@ -1830,9 +1847,6 @@ suspendThread( StgRegTable *reg,
      waiting to take over.
   */
   IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId()));
-  //if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult
-      startTask(taskStart);
-  //}
 #endif
 
   /* Other threads _might_ be available for execution; signal this */
@@ -2245,9 +2259,10 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCap
   m->ret = ret;
   m->stat = NoStatus;
 #if defined(RTS_SUPPORTS_THREADS)
-  initCondition(&m->wakeup);
 #if defined(THREADED_RTS)
   initCondition(&m->bound_thread_cond);
+#else
+  initCondition(&m->wakeup);
 #endif
 #endif
 
@@ -2459,9 +2474,10 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability)
   m->ret = ret;
   m->stat = NoStatus;
 #if defined(RTS_SUPPORTS_THREADS)
-  initCondition(&m->wakeup);
 #if defined(THREADED_RTS)
   initCondition(&m->bound_thread_cond);
+#else
+  initCondition(&m->wakeup);
 #endif
 #endif
 
@@ -2512,9 +2528,10 @@ waitThread_(StgMainThread* m, Capability *initialCapability)
   stat = m->stat;
 
 #if defined(RTS_SUPPORTS_THREADS)
-  closeCondition(&m->wakeup);
 #if defined(THREADED_RTS)
   closeCondition(&m->bound_thread_cond);
+#else
+  closeCondition(&m->wakeup);
 #endif
 #endif
 
@@ -3498,7 +3515,11 @@ deleteThreadImmediately(StgTSO *tso)
   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
       return;
   }
-  unblockThread(tso);
+#if defined(RTS_SUPPORTS_THREADS)
+  if (tso->why_blocked != BlockedOnCCall
+      && tso->why_blocked != BlockedOnCCall_NoUnblockExc)
+#endif
+    unblockThread(tso);
   tso->what_next = ThreadKilled;
 }
 #endif
index fccac3c..ebbe7d1 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Schedule.h,v 1.39 2003/09/21 22:20:56 wolfgang Exp $
+ * $Id: Schedule.h,v 1.40 2003/10/01 10:49:09 wolfgang Exp $
  *
  * (c) The GHC Team 1998-1999
  *
@@ -190,9 +190,10 @@ typedef struct StgMainThread_ {
   SchedulerStatus  stat;
   StgClosure **    ret;
 #if defined(RTS_SUPPORTS_THREADS)
-  Condition        wakeup;
 #if defined(THREADED_RTS)
   Condition        bound_thread_cond;
+#else
+  Condition        wakeup;
 #endif
 #endif
   struct StgMainThread_ *link;
@@ -297,12 +298,12 @@ void labelThread(StgPtr tso, char *label);
 
 #if defined(RTS_SUPPORTS_THREADS)
 /* If no task is waiting for a capability,
+ * and if there is work to be done
+ * or if we need to wait for IO or delay requests,
  * spawn a new worker thread.
- *
- * (Used by the RtsAPI)
  */
 void
-startSchedulerTask(void);
+startSchedulerTaskIfNecessary(void);
 #endif
 
 #endif /* __SCHEDULE_H__ */
index 677fdd2..d7e6ffc 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Select.c,v 1.29 2003/06/26 12:22:59 stolz Exp $
+ * $Id: Select.c,v 1.30 2003/10/01 10:49:09 wolfgang Exp $
  *
  * (c) The GHC Team 1995-2002
  *
@@ -351,4 +351,20 @@ wakeBlockedWorkerThread()
        workerWakeupPending = rtsTrue;
     }
 }
+
+/* resetWorkerWakeupPipeAfterFork
+ *
+ * To be called right after a fork().
+ * After the fork(), the worker wakeup pipe will be shared
+ * with the parent process, and that's something we don't want.
+ */
+void
+resetWorkerWakeupPipeAfterFork()
+{
+    if(workerWakeupInited) {
+       close(workerWakeupPipe[0]);
+       close(workerWakeupPipe[1]);
+    }
+    workerWakeupInited = rtsFalse;
+}
 #endif
index 92b5c25..c720538 100644 (file)
@@ -134,6 +134,12 @@ stopTaskManager ()
   return;
 }
 
+void
+resetTaskManagerAfterFork ()
+{
+       barf("resetTaskManagerAfterFork not implemented for SMP");
+}
+
 #else
 /************ THREADS version *****************/
 
@@ -192,6 +198,13 @@ stopTaskManager ()
 {
 
 }
+
+void
+resetTaskManagerAfterFork ()
+{
+       rts_n_waiting_tasks = 0;
+       taskCount = 0;
+}
 #endif
 
 
index bf29d91..ee59987 100644 (file)
@@ -28,6 +28,7 @@ extern TaskInfo *taskIds;
 
 extern void startTaskManager ( nat maxTasks, void (*taskStart)(void) );
 extern void stopTaskManager ( void );
+void resetTaskManagerAfterFork ();
 
 extern void startTask ( void (*taskStart)(void) );