[project @ 2002-02-13 08:48:06 by sof]
authorsof <unknown>
Wed, 13 Feb 2002 08:48:07 +0000 (08:48 +0000)
committersof <unknown>
Wed, 13 Feb 2002 08:48:07 +0000 (08:48 +0000)
Revised implementation of multi-threaded callouts (and callins):

- unified synchronisation story for threaded and SMP builds,
  following up on SimonM's suggestion. The following synchro
  variables are now used inside the Scheduler:

    + thread_ready_cond - condition variable that is signalled
      when a H. thread has become runnable (via the THREAD_RUNNABLE()
      macro) and there are available capabilities. Waited on:
         + upon schedule() entry (iff no caps. available).
 + when a thread inside of the Scheduler spots that there
   are no runnable threads to service, but one or more
   external call is in progress.
 + in resumeThread(), waiting for a capability to become
   available.

      Prior to waiting on thread_ready_cond, a counter rts_n_waiting_tasks
      is incremented, so that we can keep track of the number of
      readily available worker threads (need this in order to make
      an informed decision on whether or not to create a new thread
      when an external call is made).

    + returning_worker_cond - condition variable that is waited
      on by an OS thread that has finished executing and external
      call & now want to feed its result back to the H thread
      that made the call. Before doing so, the counter
      rts_n_returning_workers is incremented.

      Upon entry to the Scheduler, this counter is checked for &
      if it is non-zero, the thread gives up its capability and
      signals returning_worker_cond before trying to re-grab a
      capability. (releaseCapability() takes care of this).

    + sched_mutex - protect Scheduler data structures.
    + gc_pending_cond - SMP-only condition variable for signalling
      completion of GCs.

- initial implementation of call-ins, i.e., multiple OS threads
  may concurrently call into the RTS without interfering with
  each other. Implementation uses cheesy locking protocol to
  ensure that only one OS thread at a time can construct a
  function application -- stop-gap measure until the RtsAPI
  is revised (as discussed last month) *and* a designated
  block is used for allocating these applications.

- In the implementation of call-ins, the OS thread blocks
  waiting for an RTS worker thread to complete the evaluation
  of the function application. Since main() also uses the
  RtsAPI, provide a separate entry point for it (rts_mainEvalIO()),
  which avoids creating a separate thread to evaluate Main.main,
  that can be done by the thread exec'ing main() directly.
  [Maybe there's a tidier way of doing this, a bit ugly the
  way it is now..]

There are a couple of dark corners that needs to be looked at,
such as conditions for shutting down (and how) + consider what
ought to happen when async I/O is thrown into the mix (I know
what will happen, but that's maybe not what we want).

Other than that, things are in a generally happy state & I hope
to declare myself done before the week is up.

ghc/rts/Capability.c
ghc/rts/Capability.h
ghc/rts/Main.c
ghc/rts/RtsAPI.c
ghc/rts/Schedule.c
ghc/rts/Schedule.h

index d3eacd6..d2a2ef8 100644 (file)
@@ -19,6 +19,7 @@
  * --------------------------------------------------------------------------*/
 #include "PosixSource.h"
 #include "Rts.h"
+#include "Schedule.h"
 #include "RtsUtils.h"
 #include "Capability.h"
 
@@ -51,6 +52,7 @@ initCapabilities()
   initCapabilities_(RtsFlags.ParFlags.nNodes);
 #else
   initCapability(&MainCapability);
+  rts_n_free_capabilities = 1;
 #endif
 
   return;
@@ -75,14 +77,38 @@ void grabCapability(Capability** cap)
 #endif
 }
 
-void releaseCapability(Capability* cap)
+/*
+ * Letting go of a capability
+ *
+ * Locks required: sched_mutex
+ */
+void releaseCapability(Capability* cap
+#if !defined(SMP)
+                      STG_UNUSED
+#endif
+)
 {
 #if defined(SMP)
   cap->link = free_capabilities;
   free_capabilities = cap;
   rts_n_free_capabilities++;
-#endif
+#else
   rts_n_free_capabilities = 1;
+#endif
+
+#if defined(RTS_SUPPORTS_THREADS)
+  /* Check to see whether a worker thread can be given
+     the go-ahead to return the result of an external call..*/
+  if (rts_n_waiting_workers > 0) {
+    /* The worker is responsible for grabbing the capability and
+     * decrementing the rts_n_returning_workers count
+     */
+    signalCondition(&returning_worker_cond);
+  } else if ( !EMPTY_RUN_QUEUE() ) {
+    /* Signal that work is available */
+    signalCondition(&thread_ready_cond);
+  }
+#endif
   return;
 }
 
index e59f495..b878507 100644 (file)
 extern Capability MainCapability;
 #endif
 
+
 extern void initCapabilities(void);
 extern void grabCapability(Capability** cap);
 extern void releaseCapability(Capability* cap);
 
 #if defined(RTS_SUPPORTS_THREADS)
-extern nat rts_n_free_capabilities;  /* total number of available capabilities */
+/* total number of available capabilities */
+extern nat rts_n_free_capabilities;  
 
-static inline nat getFreeCapabilities()
+static inline nat getFreeCapabilities (void)
 {
   return rts_n_free_capabilities;
 }
 
-static inline rtsBool noCapabilities()
+static inline rtsBool noCapabilities (void)
 {
   return (rts_n_free_capabilities == 0);
 }
 
-static inline rtsBool allFreeCapabilities()
+static inline rtsBool allFreeCapabilities (void)
 {
 # if defined(SMP)
   return (rts_n_free_capabilities == RtsFlags.ParFlags.nNodes);
index def9e55..49a681f 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Main.c,v 1.33 2002/02/05 15:42:04 simonpj Exp $
+ * $Id: Main.c,v 1.34 2002/02/13 08:48:06 sof Exp $
  *
  * (c) The GHC Team 1998-2000
  *
@@ -83,7 +83,7 @@ int main(int argc, char *argv[])
                   fprintf(stderr, "==== [%x] Main Thread Started ...\n", mytid));
 
       /* ToDo: Dump event for the main thread */
-      status = rts_evalIO((HaskellObj)mainIO_closure, NULL);
+      status = rts_mainEvalIO((HaskellObj)mainIO_closure, NULL);
     } else {
       /* Just to show we're alive */
       IF_PAR_DEBUG(verbose,
@@ -98,12 +98,12 @@ int main(int argc, char *argv[])
 #  elif defined(GRAN)
 
     /* ToDo: Dump event for the main thread */
-    status = rts_evalIO(mainIO_closure, NULL);
+    status = rts_mainEvalIO(mainIO_closure, NULL);
 
 #  else /* !PAR && !GRAN */
 
     /* ToDo: want to start with a larger stack size */
-    status = rts_evalIO((HaskellObj)mainIO_closure, NULL);
+    status = rts_mainEvalIO((HaskellObj)mainIO_closure, NULL);
 
 #  endif /* !PAR && !GRAN */
 
index 96092d0..4178837 100644 (file)
@@ -1,5 +1,5 @@
 /* ----------------------------------------------------------------------------
- * $Id: RtsAPI.c,v 1.31 2002/01/22 13:54:22 simonmar Exp $
+ * $Id: RtsAPI.c,v 1.32 2002/02/13 08:48:06 sof Exp $
  *
  * (c) The GHC Team, 1998-2001
  *
 #include "RtsFlags.h"
 #include "RtsUtils.h"
 #include "Prelude.h"
+#include "OSThreads.h"
+#include "Schedule.h"
+
+#if defined(THREADED_RTS)
+#define SCHEDULE_MAIN_THREAD(tso) scheduleThread_(tso,rtsFalse)
+#define WAIT_MAIN_THREAD(tso,ret) waitThread_(tso,ret,rtsFalse)
+#else
+#define SCHEDULE_MAIN_THREAD(tso) scheduleThread(tso)
+#define WAIT_MAIN_THREAD(tso,ret) waitThread(tso,ret)
+#endif
+
+#if defined(RTS_SUPPORTS_THREADS)
+/* Cheesy locking scheme while waiting for the 
+ * RTS API to change.
+ */
+static Mutex     alloc_mutex = INIT_MUTEX_VAR;
+static Condition alloc_cond  = INIT_COND_VAR;
+#define INVALID_THREAD_ID ((OSThreadId)(-1))
+
+/* Thread currently owning the allocator */
+static OSThreadId c_id = INVALID_THREAD_ID;
+
+static StgPtr alloc(nat n)
+{
+  OSThreadId tid = osThreadId();
+  ACQUIRE_LOCK(&alloc_mutex);
+  if (tid == c_id) {
+    /* I've got the lock, just allocate() */
+    ;
+  } else if (c_id == INVALID_THREAD_ID) {
+    c_id = tid;
+  } else {
+    waitCondition(&alloc_cond, &alloc_mutex);
+    c_id = tid;
+  }
+  RELEASE_LOCK(&alloc_mutex);
+  return allocate(n);
+}
+
+static void releaseAllocLock(void)
+{
+  ACQUIRE_LOCK(&alloc_mutex);
+  /* Reset the allocator owner */
+  c_id = INVALID_THREAD_ID;
+  RELEASE_LOCK(&alloc_mutex);
+
+  /* Free up an OS thread waiting to get in */
+  signalCondition(&alloc_cond);
+}
+#else
+# define alloc(n) allocate(n)
+# define releaseAllocLock() /* nothing */
+#endif
+
 
 /* ----------------------------------------------------------------------------
    Building Haskell objects from C datatypes.
@@ -22,7 +76,7 @@
 HaskellObj
 rts_mkChar (HsChar c)
 {
-  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
   SET_HDR(p, Czh_con_info, CCS_SYSTEM);
   p->payload[0]  = (StgClosure *)(StgChar)c;
   return p;
@@ -31,7 +85,7 @@ rts_mkChar (HsChar c)
 HaskellObj
 rts_mkInt (HsInt i)
 {
-  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
   SET_HDR(p, Izh_con_info, CCS_SYSTEM);
   p->payload[0]  = (StgClosure *)(StgInt)i;
   return p;
@@ -40,7 +94,7 @@ rts_mkInt (HsInt i)
 HaskellObj
 rts_mkInt8 (HsInt8 i)
 {
-  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
   SET_HDR(p, I8zh_con_info, CCS_SYSTEM);
   /* Make sure we mask out the bits above the lowest 8 */
   p->payload[0]  = (StgClosure *)(StgInt)((unsigned)i & 0xff);
@@ -50,7 +104,7 @@ rts_mkInt8 (HsInt8 i)
 HaskellObj
 rts_mkInt16 (HsInt16 i)
 {
-  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
   SET_HDR(p, I16zh_con_info, CCS_SYSTEM);
   /* Make sure we mask out the relevant bits */
   p->payload[0]  = (StgClosure *)(StgInt)((unsigned)i & 0xffff);
@@ -60,7 +114,7 @@ rts_mkInt16 (HsInt16 i)
 HaskellObj
 rts_mkInt32 (HsInt32 i)
 {
-  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
   SET_HDR(p, I32zh_con_info, CCS_SYSTEM);
   p->payload[0]  = (StgClosure *)(StgInt)((unsigned)i & 0xffffffff);
   return p;
@@ -70,7 +124,7 @@ HaskellObj
 rts_mkInt64 (HsInt64 i)
 {
   long long *tmp;
-  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,2));
+  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,2));
   SET_HDR(p, I64zh_con_info, CCS_SYSTEM);
   tmp  = (long long*)&(p->payload[0]);
   *tmp = (StgInt64)i;
@@ -80,7 +134,7 @@ rts_mkInt64 (HsInt64 i)
 HaskellObj
 rts_mkWord (HsWord i)
 {
-  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
   SET_HDR(p, Wzh_con_info, CCS_SYSTEM);
   p->payload[0]  = (StgClosure *)(StgWord)i;
   return p;
@@ -90,7 +144,7 @@ HaskellObj
 rts_mkWord8 (HsWord8 w)
 {
   /* see rts_mkInt* comments */
-  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
   SET_HDR(p, W8zh_con_info, CCS_SYSTEM);
   p->payload[0]  = (StgClosure *)(StgWord)(w & 0xff);
   return p;
@@ -100,7 +154,7 @@ HaskellObj
 rts_mkWord16 (HsWord16 w)
 {
   /* see rts_mkInt* comments */
-  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
   SET_HDR(p, W16zh_con_info, CCS_SYSTEM);
   p->payload[0]  = (StgClosure *)(StgWord)(w & 0xffff);
   return p;
@@ -110,7 +164,7 @@ HaskellObj
 rts_mkWord32 (HsWord32 w)
 {
   /* see rts_mkInt* comments */
-  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
   SET_HDR(p, W32zh_con_info, CCS_SYSTEM);
   p->payload[0]  = (StgClosure *)(StgWord)(w & 0xffffffff);
   return p;
@@ -121,7 +175,7 @@ rts_mkWord64 (HsWord64 w)
 {
   unsigned long long *tmp;
 
-  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,2));
+  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,2));
   /* see mk_Int8 comment */
   SET_HDR(p, W64zh_con_info, CCS_SYSTEM);
   tmp  = (unsigned long long*)&(p->payload[0]);
@@ -132,7 +186,7 @@ rts_mkWord64 (HsWord64 w)
 HaskellObj
 rts_mkFloat (HsFloat f)
 {
-  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
   SET_HDR(p, Fzh_con_info, CCS_SYSTEM);
   ASSIGN_FLT((P_)p->payload, (StgFloat)f);
   return p;
@@ -141,7 +195,7 @@ rts_mkFloat (HsFloat f)
 HaskellObj
 rts_mkDouble (HsDouble d)
 {
-  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,sizeofW(StgDouble)));
+  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,sizeofW(StgDouble)));
   SET_HDR(p, Dzh_con_info, CCS_SYSTEM);
   ASSIGN_DBL((P_)p->payload, (StgDouble)d);
   return p;
@@ -150,7 +204,7 @@ rts_mkDouble (HsDouble d)
 HaskellObj
 rts_mkStablePtr (HsStablePtr s)
 {
-  StgClosure *p = (StgClosure *)allocate(sizeofW(StgHeader)+1);
+  StgClosure *p = (StgClosure *)alloc(sizeofW(StgHeader)+1);
   SET_HDR(p, StablePtr_con_info, CCS_SYSTEM);
   p->payload[0]  = (StgClosure *)s;
   return p;
@@ -159,7 +213,7 @@ rts_mkStablePtr (HsStablePtr s)
 HaskellObj
 rts_mkPtr (HsPtr a)
 {
-  StgClosure *p = (StgClosure *)allocate(sizeofW(StgHeader)+1);
+  StgClosure *p = (StgClosure *)alloc(sizeofW(StgHeader)+1);
   SET_HDR(p, Ptr_con_info, CCS_SYSTEM);
   p->payload[0]  = (StgClosure *)a;
   return p;
@@ -186,7 +240,7 @@ rts_mkString (char *s)
 HaskellObj
 rts_apply (HaskellObj f, HaskellObj arg)
 {
-  StgAP_UPD *ap = (StgAP_UPD *)allocate(AP_sizeW(1));
+  StgAP_UPD *ap = (StgAP_UPD *)alloc(AP_sizeW(1));
   SET_HDR(ap, &stg_AP_UPD_info, CCS_SYSTEM);
   ap->n_args = 1;
   ap->fun    = f;
@@ -400,6 +454,7 @@ rts_eval (HaskellObj p, /*out*/HaskellObj *ret)
     StgTSO *tso;
 
     tso = createGenThread(RtsFlags.GcFlags.initialStkSize, p);
+    releaseAllocLock();
     scheduleThread(tso);
     return waitThread(tso, ret);
 }
@@ -410,6 +465,7 @@ rts_eval_ (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret)
     StgTSO *tso;
     
     tso = createGenThread(stack_size, p);
+    releaseAllocLock();
     scheduleThread(tso);
     return waitThread(tso, ret);
 }
@@ -424,11 +480,27 @@ rts_evalIO (HaskellObj p, /*out*/HaskellObj *ret)
     StgTSO* tso; 
     
     tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p);
+    releaseAllocLock();
     scheduleThread(tso);
     return waitThread(tso, ret);
 }
 
 /*
+ * Identical to rts_evalIO(), but won't create a new task/OS thread
+ * to evaluate the Haskell thread. Used by main() only. Hack.
+ */
+SchedulerStatus
+rts_mainEvalIO(HaskellObj p, /*out*/HaskellObj *ret)
+{
+    StgTSO* tso; 
+    
+    tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p);
+    releaseAllocLock();
+    SCHEDULE_MAIN_THREAD(tso);
+    return WAIT_MAIN_THREAD(tso, ret);
+}
+
+/*
  * rts_evalStableIO() is suitable for calling from Haskell.  It
  * evaluates a value of the form (StablePtr (IO a)), forcing the
  * action's result to WHNF before returning.  The result is returned
@@ -443,6 +515,7 @@ rts_evalStableIO (HsStablePtr s, /*out*/HsStablePtr *ret)
     
     p = (StgClosure *)deRefStablePtr(s);
     tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p);
+    releaseAllocLock();
     scheduleThread(tso);
     stat = waitThread(tso, &r);
 
@@ -463,6 +536,7 @@ rts_evalLazyIO (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret)
     StgTSO *tso;
 
     tso = createIOThread(stack_size, p);
+    releaseAllocLock();
     scheduleThread(tso);
     return waitThread(tso, ret);
 }
index 4430f5a..d73559e 100644 (file)
@@ -1,5 +1,5 @@
 /* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.121 2002/02/12 15:38:08 sof Exp $
+ * $Id: Schedule.c,v 1.122 2002/02/13 08:48:06 sof Exp $
  *
  * (c) The GHC Team, 1998-2000
  *
@@ -158,7 +158,7 @@ StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
 
 /* 
-   In GranSim we have a runable and a blocked queue for each processor.
+   In GranSim we have a runnable and a blocked queue for each processor.
    In order to minimise code changes new arrays run_queue_hds/tls
    are created. run_queue_hd is then a short cut (macro) for
    run_queue_hds[CurrentProc] (see GranSim.h).
@@ -265,44 +265,31 @@ static void sched_belch(char *s, ...);
  */
 Mutex     sched_mutex       = INIT_MUTEX_VAR;
 Mutex     term_mutex        = INIT_MUTEX_VAR;
-#if defined(THREADED_RTS)
-/*
- * The rts_mutex is the 'big lock' that the active native
- * thread within the RTS holds while executing code.
- * It is given up when the thread makes a transition out of
- * the RTS (e.g., to perform an external C call), hopefully
- * for another thread to take over its chores and enter
- * the RTS.
- *
- */
-Mutex     rts_mutex         = INIT_MUTEX_VAR;
+
+
 /*
  * When a native thread has completed executing an external
  * call, it needs to communicate the result back to the
  * (Haskell) thread that made the call. Do this as follows:
  *
  *  - in resumeThread(), the thread increments the counter
- *    threads_waiting, and then blocks on the 'big' RTS lock.
- *  - upon entry to the scheduler, the thread that's currently
- *    holding the RTS lock checks threads_waiting. If there
- *    are native threads waiting, it gives up its RTS lock
- *    and tries to re-grab the RTS lock [perhaps after having
- *    waited for a bit..?]
- *  - care must be taken to deal with the case where more than
- *    one external thread are waiting on the lock. [ToDo: more]
- *    
+ *    rts_n_returning_workers, and then blocks waiting on the
+ *    condition returning_worker_cond.
+ *  - upon entry to the scheduler, a worker/task checks 
+ *    rts_n_returning_workers. If it is > 0, worker threads
+ *    are waiting to return, so it gives up its capability
+ *    to let a worker deposit its result.
+ *  - the worker thread that gave up its capability then tries
+ *    to re-grab a capability and re-enter the Scheduler.
  */
 
-static nat threads_waiting = 0;
-#endif
-
 
 /* thread_ready_cond: when signalled, a thread has become runnable for a
  * task to execute.
  *
  * In the non-SMP case, it also implies that the thread that is woken up has
- * exclusive access to the RTS and all its DS (that are not under sched_mutex's
- * control).
+ * exclusive access to the RTS and all its data structures (that are not
+ * under sched_mutex's control).
  *
  * thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold.
  *
@@ -313,12 +300,40 @@ Condition thread_ready_cond = INIT_COND_VAR;
 #define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE())
 #endif
 
-#if defined(SMP)
-Condition gc_pending_cond   = INIT_COND_VAR;
+/*
+ * To be able to make an informed decision about whether or not 
+ * to create a new task when making an external call, keep track of
+ * the number of tasks currently blocked waiting on thread_ready_cond.
+ * (if > 0 => no need for a new task, just unblock an existing one).
+ */
+nat rts_n_waiting_tasks = 0;
+
+/* returning_worker_cond: when a worker thread returns from executing an
+ * external call, it needs to wait for an RTS Capability before passing
+ * on the result of the call to the Haskell thread that made it.
+ * 
+ * returning_worker_cond is signalled in Capability.releaseCapability().
+ *
+ */
+Condition returning_worker_cond = INIT_COND_VAR;
+
+/*
+ * To avoid starvation of threads blocked on worker_thread_cond,
+ * the task(s) that enter the Scheduler will check to see whether
+ * there are one or more worker threads blocked waiting on
+ * returning_worker_cond.
+ *
+ * Locks needed: sched_mutex
+ */
+nat rts_n_waiting_workers = 0;
+
+
+# if defined(SMP)
+static Condition gc_pending_cond = INIT_COND_VAR;
 nat await_death;
-#endif
+# endif
 
-#endif
+#endif /* RTS_SUPPORTS_THREADS */
 
 #if defined(PAR)
 StgTSO *LastTSO;
@@ -360,12 +375,6 @@ static void taskStart(void);
 static void
 taskStart(void)
 {
-  /* threads start up using 'taskStart', so make them
-     them grab the RTS lock. */
-#if defined(THREADED_RTS)
-  ACQUIRE_LOCK(&rts_mutex);
-  taskNotAvailable();
-#endif
   schedule();
 }
 #endif
@@ -431,28 +440,36 @@ schedule( void )
 # endif
 #endif
   rtsBool was_interrupted = rtsFalse;
+
+#if defined(RTS_SUPPORTS_THREADS)
+schedule_start:
+#endif
   
+#if defined(RTS_SUPPORTS_THREADS)
   ACQUIRE_LOCK(&sched_mutex);
-  
-#if defined(THREADED_RTS)
+#endif
+#if defined(RTS_SUPPORTS_THREADS)
   /* ToDo: consider SMP support */
-  if (threads_waiting > 0) {
+  if ( rts_n_waiting_workers > 0 && noCapabilities() ) {
     /* (At least) one native thread is waiting to
      * deposit the result of an external call. So,
-     * give up our RTS executing privileges and let
-     * one of them continue.
-     * 
+     * be nice and hand over our capability.
      */
-    taskAvailable();
+    IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (waiting workers: %d)\n", osThreadId(), rts_n_waiting_workers));
+    releaseCapability(cap);
     RELEASE_LOCK(&sched_mutex);
-    IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (threads_waiting=%d)\n", osThreadId(), threads_waiting));
-    RELEASE_LOCK(&rts_mutex);
-    /* ToDo: come up with mechanism that guarantees that
-     * the main thread doesn't loop here.
-     */
+
     yieldThread();
-    /* ToDo: longjmp() */
-    taskStart();
+    goto schedule_start;
+  }
+#endif
+
+#if defined(RTS_SUPPORTS_THREADS)
+  while ( noCapabilities() ) {
+    rts_n_waiting_tasks++;
+    waitCondition(&thread_ready_cond, &sched_mutex);
+    rts_n_waiting_tasks--;
   }
 #endif
 
@@ -646,21 +663,25 @@ schedule( void )
      * inform all the main threads.
      */
 #ifndef PAR
-    if (   EMPTY_QUEUE(blocked_queue_hd)
-       && EMPTY_RUN_QUEUE()
+    if (   EMPTY_RUN_QUEUE()
+       && EMPTY_QUEUE(blocked_queue_hd)
        && EMPTY_QUEUE(sleeping_queue)
-#if defined(SMP)
-       && allFreeCapabilities()
-#elif defined(THREADED_RTS)
+#if defined(RTS_SUPPORTS_THREADS)
        && EMPTY_QUEUE(suspended_ccalling_threads)
 #endif
+#ifdef SMP
+       && allFreeCapabilities()
+#endif
        )
     {
        IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
+#if defined(THREADED_RTS)
+       /* and SMP mode ..? */
+       releaseCapability(cap);
+#endif
        RELEASE_LOCK(&sched_mutex);
        GarbageCollect(GetRoots,rtsTrue);
        ACQUIRE_LOCK(&sched_mutex);
-       IF_DEBUG(scheduler, sched_belch("GC done."));
        if (   EMPTY_QUEUE(blocked_queue_hd)
            && EMPTY_RUN_QUEUE()
            && EMPTY_QUEUE(sleeping_queue) ) {
@@ -705,8 +726,10 @@ schedule( void )
 #endif
            }
 #if defined(RTS_SUPPORTS_THREADS)
+           /* ToDo: revisit conditions (and mechanism) for shutting
+              down a multi-threaded world  */
            if ( EMPTY_RUN_QUEUE() ) {
-             IF_DEBUG(scheduler, sched_belch("all done, it seems...shut down."));
+             IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
              shutdownHaskellAndExit(0);
            
            }
@@ -728,31 +751,22 @@ schedule( void )
     }
 #endif    
 
-#if defined(SMP)
+#if defined(RTS_SUPPORTS_THREADS)
     /* block until we've got a thread on the run queue and a free
      * capability.
+     *
      */
-    while ( noCapabilities() || EMPTY_RUN_QUEUE() ) {
-      IF_DEBUG(scheduler, sched_belch("waiting for work"));
-      waitCondition( &thread_ready_cond, &sched_mutex );
-      IF_DEBUG(scheduler, sched_belch("work now available"));
+    if ( EMPTY_RUN_QUEUE() ) {
+      /* Give up our capability */
+      releaseCapability(cap);
+      while ( noCapabilities() || EMPTY_RUN_QUEUE() ) {
+       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
+       rts_n_waiting_tasks++;
+       waitCondition( &thread_ready_cond, &sched_mutex );
+       rts_n_waiting_tasks--;
+       IF_DEBUG(scheduler, sched_belch("thread %d: work now available %d %d", osThreadId(), getFreeCapabilities(),EMPTY_RUN_QUEUE()));
+      }
     }
-#elif defined(THREADED_RTS)
-   if ( EMPTY_RUN_QUEUErun_queue_hd == END_TSO_QUEUE ) {
-     /* no work available, wait for external calls to complete. */
-     IF_DEBUG(scheduler, sched_belch("worker thread (%d): waiting for external thread to complete..", osThreadId()));
-     taskAvailable();
-     RELEASE_LOCK(&rts_mutex);
-
-     while ( EMPTY_RUN_QUEUE() ) {
-       waitCondition(&thread_ready_cond, &sched_mutex);
-     };
-     RELEASE_LOCK(&sched_mutex);
-
-     IF_DEBUG(scheduler, sched_belch("worker thread (%d): re-awakened from no-work slumber..\n", osThreadId()));
-     /* ToDo: longjmp() */
-     taskStart();
-   }
 #endif
 
 #if defined(GRAN)
@@ -1030,8 +1044,7 @@ schedule( void )
 #endif
 #else /* !GRAN && !PAR */
   
-    /* grab a thread from the run queue
-     */
+    /* grab a thread from the run queue */
     ASSERT(run_queue_hd != END_TSO_QUEUE);
     t = POP_RUN_QUEUE();
     // Sanity check the thread we're about to run.  This can be
@@ -1388,7 +1401,8 @@ schedule( void )
       barf("schedule: invalid thread return code %d", (int)ret);
     }
     
-#ifdef SMP
+#if defined(RTS_SUPPORTS_THREADS)
+    /* I don't understand what this re-grab is doing -- sof */
     grabCapability(&cap);
 #endif
 
@@ -1518,6 +1532,10 @@ suspendThread( StgRegTable *reg )
   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
   suspended_ccalling_threads = cap->r.rCurrentTSO;
 
+#if defined(RTS_SUPPORTS_THREADS)
+  cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
+#endif
+
   /* Use the thread ID as the token; it should be unique */
   tok = cap->r.rCurrentTSO->id;
 
@@ -1534,12 +1552,10 @@ suspendThread( StgRegTable *reg )
   */
   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS\n", tok));
   startTask(taskStart);
-
 #endif
 
   THREAD_RUNNABLE();
   RELEASE_LOCK(&sched_mutex);
-  //  RELEASE_LOCK(&rts_mutex);
   return tok; 
 }
 
@@ -1549,23 +1565,22 @@ resumeThread( StgInt tok )
   StgTSO *tso, **prev;
   Capability *cap;
 
-#if defined(THREADED_RTS)
-  IF_DEBUG(scheduler, sched_belch("thread %d returning, waiting for sched. lock.\n", tok));
+#if defined(RTS_SUPPORTS_THREADS)
+  IF_DEBUG(scheduler, sched_belch("worker %d: returning, waiting for sched. lock.\n", tok));
   ACQUIRE_LOCK(&sched_mutex);
-  threads_waiting++;
-  IF_DEBUG(scheduler, sched_belch("thread %d returning, threads waiting: %d.\n", tok, threads_waiting));
-  RELEASE_LOCK(&sched_mutex);
-  
-  IF_DEBUG(scheduler, sched_belch("thread %d waiting for RTS lock...\n", tok));
-  ACQUIRE_LOCK(&rts_mutex);
-  threads_waiting--;
-  taskNotAvailable();
-  IF_DEBUG(scheduler, sched_belch("thread %d acquired RTS lock...\n", tok));
-#endif
+  rts_n_waiting_workers++;
+  IF_DEBUG(scheduler, sched_belch("worker %d: returning; workers waiting: %d.\n", tok, rts_n_waiting_workers));
 
-#if defined(THREADED_RTS)
-  /* Free up any RTS-blocked threads. */
-  broadcastCondition(&thread_ready_cond);
+  /*
+   * Wait for the go ahead
+   */
+  IF_DEBUG(scheduler, sched_belch("worker %d: waiting for capability %d...\n", tok, rts_n_free_capabilities));
+  while ( noCapabilities() ) {
+    waitCondition(&returning_worker_cond, &sched_mutex);
+  }
+  rts_n_waiting_workers--;
+
+  IF_DEBUG(scheduler, sched_belch("worker %d: acquired capability...\n", tok));
 #endif
 
   /* Remove the thread off of the suspended list */
@@ -1584,14 +1599,23 @@ resumeThread( StgInt tok )
   tso->link = END_TSO_QUEUE;
 
 #if defined(RTS_SUPPORTS_THREADS)
+  /* Is it clever to block here with the TSO off the list,
+   * but not hooked up to a capability?
+   */
   while ( noCapabilities() ) {
     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
+    rts_n_waiting_tasks++;
     waitCondition(&thread_ready_cond, &sched_mutex);
+    rts_n_waiting_tasks--;
     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
   }
 #endif
 
   grabCapability(&cap);
+  RELEASE_LOCK(&sched_mutex);
+
+  /* Reset blocking status */
+  tso->why_blocked  = NotBlocked;
 
   cap->r.rCurrentTSO = tso;
 
@@ -1900,7 +1924,11 @@ activateSpark (rtsSpark spark)
  * ------------------------------------------------------------------------ */
 
 void
-scheduleThread(StgTSO *tso)
+scheduleThread_(StgTSO *tso
+#if defined(THREADED_RTS)
+              , rtsBool createTask
+#endif
+             )
 {
   ACQUIRE_LOCK(&sched_mutex);
 
@@ -1910,6 +1938,14 @@ scheduleThread(StgTSO *tso)
    * soon as we release the scheduler lock below.
    */
   PUSH_ON_RUN_QUEUE(tso);
+#if defined(THREADED_RTS)
+  /* If main() is scheduling a thread, don't bother creating a 
+   * new task.
+   */
+  if ( createTask ) {
+    startTask(taskStart);
+  }
+#endif
   THREAD_RUNNABLE();
 
 #if 0
@@ -1918,6 +1954,15 @@ scheduleThread(StgTSO *tso)
   RELEASE_LOCK(&sched_mutex);
 }
 
+void scheduleThread(StgTSO* tso)
+{
+#if defined(THREADED_RTS)
+  return scheduleThread_(tso, rtsTrue);
+#else
+  return scheduleThread_(tso);
+#endif
+}
+
 /* ---------------------------------------------------------------------------
  * initScheduler()
  *
@@ -1979,22 +2024,19 @@ initScheduler(void)
   initMutex(&term_mutex);
 
   initCondition(&thread_ready_cond);
-#if defined(THREADED_RTS)
-  initMutex(&rts_mutex);
+  initCondition(&returning_worker_cond);
 #endif
   
+#if defined(SMP)
   initCondition(&gc_pending_cond);
 #endif
 
-#if defined(THREADED_RTS)
-  /* Grab big lock */
-  ACQUIRE_LOCK(&rts_mutex);
-  IF_DEBUG(scheduler, 
-          sched_belch("worker thread (%d): acquired RTS lock\n", osThreadId()));
+#if defined(RTS_SUPPORTS_THREADS)
+  ACQUIRE_LOCK(&sched_mutex);
 #endif
 
   /* Install the SIGHUP handler */
-#ifdef SMP
+#if defined(SMP)
   {
     struct sigaction action,oact;
 
@@ -2025,6 +2067,11 @@ initScheduler(void)
 #if /* defined(SMP) ||*/ defined(PAR)
   initSparkPools();
 #endif
+
+#if defined(RTS_SUPPORTS_THREADS)
+  RELEASE_LOCK(&sched_mutex);
+#endif
+
 }
 
 void
@@ -2079,13 +2126,13 @@ finishAllThreads ( void )
 {
    do {
       while (run_queue_hd != END_TSO_QUEUE) {
-         waitThread ( run_queue_hd, NULL );
+         waitThread ( run_queue_hd, NULL);
       }
       while (blocked_queue_hd != END_TSO_QUEUE) {
-         waitThread ( blocked_queue_hd, NULL );
+         waitThread ( blocked_queue_hd, NULL);
       }
       while (sleeping_queue != END_TSO_QUEUE) {
-         waitThread ( blocked_queue_hd, NULL );
+         waitThread ( blocked_queue_hd, NULL);
       }
    } while 
       (blocked_queue_hd != END_TSO_QUEUE || 
@@ -2095,6 +2142,21 @@ finishAllThreads ( void )
 
 SchedulerStatus
 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
+{ 
+#if defined(THREADED_RTS)
+  return waitThread_(tso,ret, rtsFalse);
+#else
+  return waitThread_(tso,ret);
+#endif
+}
+
+SchedulerStatus
+waitThread_(StgTSO *tso,
+           /*out*/StgClosure **ret
+#if defined(THREADED_RTS)
+           , rtsBool blockWaiting
+#endif
+          )
 {
   StgMainThread *m;
   SchedulerStatus stat;
@@ -2113,13 +2175,27 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret)
   m->link = main_threads;
   main_threads = m;
 
-  IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
-                             m->tso->id));
+  IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
 
-#ifdef SMP
-  do {
-    waitCondition(&m->wakeup, &sched_mutex);
-  } while (m->stat == NoStatus);
+#if defined(RTS_SUPPORTS_THREADS)
+
+# if defined(THREADED_RTS)
+  if (!blockWaiting) {
+    /* In the threaded case, the OS thread that called main()
+     * gets to enter the RTS directly without going via another
+     * task/thread.
+     */
+    RELEASE_LOCK(&sched_mutex);
+    schedule();
+    ASSERT(m->stat != NoStatus);
+  } else 
+# endif
+  {
+    IF_DEBUG(scheduler, sched_belch("sfoo"));
+    do {
+      waitCondition(&m->wakeup, &sched_mutex);
+    } while (m->stat == NoStatus);
+  }
 #elif defined(GRAN)
   /* GranSim specific init */
   CurrentTSO = m->tso;                // the TSO to run
@@ -2143,7 +2219,10 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret)
                              m->tso->id));
   free(m);
 
-  RELEASE_LOCK(&sched_mutex);
+#if defined(THREADED_RTS)
+  if (blockWaiting) 
+#endif
+    RELEASE_LOCK(&sched_mutex);
 
   return stat;
 }
@@ -3418,6 +3497,11 @@ printThreadBlockage(StgTSO *tso)
            tso->block_info.closure, info_type(tso->block_info.closure));
     break;
 #endif
+#if defined(RTS_SUPPORTS_THREADS)
+  case BlockedOnCCall:
+    fprintf(stderr,"is blocked on an external call");
+    break;
+#endif
   default:
     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
         tso->why_blocked, tso->id, tso);
index 47cbd2d..93ef030 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Schedule.h,v 1.27 2002/02/12 15:39:49 sof Exp $
+ * $Id: Schedule.h,v 1.28 2002/02/13 08:48:07 sof Exp $
  *
  * (c) The GHC Team 1998-1999
  *
@@ -131,16 +131,6 @@ extern rtsBool interrupted;
 /* In Select.c */
 extern nat timestamp;
 
-/* Free capability list.
- * Locks required: sched_mutex.
- */
-#ifdef SMP
-extern Capability *free_capabilities;
-extern nat n_free_capabilities;
-#else
-extern Capability MainCapability;
-#endif
-
 /* Thread queues.
  * Locks required  : sched_mutex
  *
@@ -157,13 +147,31 @@ extern  StgTSO *sleeping_queue;
 extern  StgTSO *all_threads;
 
 #if defined(RTS_SUPPORTS_THREADS)
+/* Schedule.c has detailed info on what these do */
 extern Mutex       sched_mutex;
 extern Condition   thread_ready_cond;
-# if defined(SMP)
-extern Condition   gc_pending_cond;
-# endif
+extern Condition   returning_worker_cond;
+extern nat         rts_n_waiting_workers;
+extern nat         rts_n_waiting_tasks;
 #endif
 
+
+/* Sigh, RTS-internal versions of waitThread(), scheduleThread(), and
+   rts_evalIO() for the use by main() only. ToDo: better. */
+extern SchedulerStatus waitThread_(StgTSO *tso,
+                                  /*out*/StgClosure **ret
+#if defined(THREADED_RTS)
+                                  , rtsBool blockWaiting
+#endif
+                                  );
+extern void scheduleThread_(StgTSO *tso
+#if defined(THREADED_RTS)
+                          , rtsBool createTask
+#endif
+                           );
+extern SchedulerStatus rts_mainEvalIO(HaskellObj p, /*out*/HaskellObj *ret);
+
+
 /* Called by shutdown_handler(). */
 void interruptStgRts ( void );
 
@@ -250,7 +258,7 @@ void print_bqe (StgBlockingQueueElement *bqe);
  */
 #if defined(RTS_SUPPORTS_THREADS)
 #define THREAD_RUNNABLE()                      \
-  if ( !noCapabilities() ) {                   \
+  if ( !noCapabilities() ) {                   \
      signalCondition(&thread_ready_cond);      \
   }                                            \
   context_switch = 1;