[project @ 2003-01-25 15:54:48 by wolfgang]
authorwolfgang <unknown>
Sat, 25 Jan 2003 15:54:51 +0000 (15:54 +0000)
committerwolfgang <unknown>
Sat, 25 Jan 2003 15:54:51 +0000 (15:54 +0000)
This commit fixes many bugs and limitations in the threaded RTS.
There are still some issues remaining, though.

The following bugs should have been fixed:

- [+] "safe" calls could cause crashes
- [+] yieldToReturningWorker/grabReturnCapability
    -     It used to deadlock.
- [+] couldn't wake blocked workers
    -     Calls into the RTS could go unanswered for a long time, and
          that includes ordinary callbacks in some circumstances.
- [+] couldn't block on an MVar and expect to be woken up by a signal
      handler
    -     Depending on the exact situation, the RTS shut down or
          blocked forever and ignored the signal.
- [+] The locking scheme in RtsAPI.c didn't work
- [+] run_thread label in wrong place (schedule())
- [+] Deadlock in GHC.Handle
    -     if a signal arrived at the wrong time, an mvar was never
          filled again
- [+] Signals delivered to the "wrong" thread were ignored or handled
      too late.

Issues:
*) If GC can move TSO objects (I don't know - can it?), then ghci
will occasionally crash when calling foreign functions, because the
parameters are stored on the TSO stack.

*) There is still a race condition lurking in the code
(both threaded and non-threaded RTS are affected):
If a signal arrives after the check for pending signals in
schedule(), but before the call to select() in awaitEvent(),
select() will be called anyway. The signal handler will be
executed much later than expected.

*) For Win32, GHC doesn't yet support non-blocking IO, so while a
thread is waiting for IO, no call-ins can happen. If the RTS is
blocked in awaitEvent, it uses a polling loop on Win32, so call-ins
should work (although the polling loop looks ugly).

*) Deadlock detection is disabled for the threaded rts, because I
don't know how to do it properly in the presence of foreign call-ins
from foreign threads.
This causes the tests conc031, conc033 and conc034 to fail.

*) "safe" is currently treated as "threadsafe". Implementing "safe" in
a way that blocks other Haskell threads is more difficult than was
thought at first. I think it could be done with a few additional lines
of code, but personally, I'm strongly in favour of abolishing the
distinction.

*) Running finalizers at program termination is inefficient - there
are two OS threads passing messages back and forth for every finalizer
that is run. Also (just as in the non-threaded case) the finalizers
are run in parallel to any remaining haskell threads and to any
foreign call-ins that might still happen.

15 files changed:
ghc/compiler/deSugar/DsForeign.lhs
ghc/includes/RtsAPI.h
ghc/includes/TSO.h
ghc/includes/Updates.h
ghc/rts/Capability.c
ghc/rts/Capability.h
ghc/rts/Interpreter.c
ghc/rts/RtsAPI.c
ghc/rts/Schedule.c
ghc/rts/Schedule.h
ghc/rts/Select.c
ghc/rts/Signals.c
ghc/rts/Signals.h
ghc/rts/Task.c
ghc/rts/Weak.c

index c16cc86..4074d04 100644 (file)
@@ -450,9 +450,12 @@ mkFExportCBits c_nm maybe_target arg_htys res_hty is_IO_res_ty cc
 
   -- various other bits for inside the fn
   declareResult = text "HaskellObj ret;"
+  declareCResult | res_hty_is_unit = empty
+                 | otherwise       = cResType <+> text "cret;"
 
-  return_what | res_hty_is_unit = empty
-             | otherwise       = parens (unpackHObj res_hty <> parens (text "ret"))
+  assignCResult | res_hty_is_unit = empty
+               | otherwise       =
+                       text "cret=" <> unpackHObj res_hty <> parens (text "ret") <> semi
 
   -- an extern decl for the fn being called
   extern_decl
@@ -469,6 +472,8 @@ mkFExportCBits c_nm maybe_target arg_htys res_hty is_IO_res_ty cc
      [ lbrace
      ,   text "SchedulerStatus rc;"
      ,   declareResult
+     ,   declareCResult
+     ,   text "rts_lock();"
          -- create the application + perform it.
      ,   text "rc=rts_evalIO" <> parens (
                text "rts_apply" <> parens (
@@ -483,7 +488,10 @@ mkFExportCBits c_nm maybe_target arg_htys res_hty is_IO_res_ty cc
             ) <> semi
      ,   text "rts_checkSchedStatus" <> parens (doubleQuotes (ftext c_nm)
                                                <> comma <> text "rc") <> semi
-     ,   text "return" <> return_what <> semi
+     ,   assignCResult
+     ,   text "rts_unlock();"
+     ,   if res_hty_is_unit then empty
+            else text "return cret;"
      , rbrace
      ]
 
index cf07923..32e7362 100644 (file)
@@ -1,5 +1,5 @@
 /* ----------------------------------------------------------------------------
- * $Id: RtsAPI.h,v 1.30 2002/09/05 08:58:55 simonmar Exp $
+ * $Id: RtsAPI.h,v 1.31 2003/01/25 15:54:48 wolfgang Exp $
  *
  * (c) The GHC Team, 1998-1999
  *
@@ -38,6 +38,20 @@ extern void shutdownHaskellAndExit ( int exitCode );
 extern void setProgArgv            ( int argc, char *argv[] );
 extern void getProgArgv            ( int *argc, char **argv[] );
 
+
+/* ----------------------------------------------------------------------------
+   Locking.
+   
+   In a multithreaded environments, you have to surround all access to the
+   RtsAPI with these calls.
+   ------------------------------------------------------------------------- */
+   
+void
+rts_lock ( void );
+
+void
+rts_unlock ( void );
+
 /* ----------------------------------------------------------------------------
    Building Haskell objects from C datatypes.
    ------------------------------------------------------------------------- */
@@ -85,6 +99,8 @@ HsBool       rts_getBool      ( HaskellObj );
    Evaluating Haskell expressions
 
    The versions ending in '_' allow you to specify an initial stack size.
+   Note that these calls may cause Garbage Collection, so all HaskellObj
+   references are rendered invalid by these calls.
    ------------------------------------------------------------------------- */
 SchedulerStatus 
 rts_eval ( HaskellObj p, /*out*/HaskellObj *ret );
index ed0f870..e664f9c 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: TSO.h,v 1.28 2002/12/11 15:36:40 simonmar Exp $
+ * $Id: TSO.h,v 1.29 2003/01/25 15:54:48 wolfgang Exp $
  *
  * (c) The GHC Team, 1998-1999
  *
@@ -144,7 +144,9 @@ typedef enum {
   , BlockedOnGA_NoSend // same as above but without sending a Fetch message
 #endif
 #if defined(RTS_SUPPORTS_THREADS)
-  , BlockedOnCCall 
+  , BlockedOnCCall
+  , BlockedOnCCall_NoUnblockExc // same as above but don't unblock async exceptions
+                               // in resumeThread()
 #endif
 } StgTSOBlockReason;
 
index add8b26..0820b50 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Updates.h,v 1.28 2002/12/11 15:36:40 simonmar Exp $
+ * $Id: Updates.h,v 1.29 2003/01/25 15:54:48 wolfgang Exp $
  *
  * (c) The GHC Team, 1998-1999
  *
                              (StgClosure *)updclosure,                 \
                              (StgClosure *)heapptr);                   \
    }
+#elif defined(RTS_SUPPORTS_THREADS)
+
+# ifdef TICKY_TICKY
+#  define UPD_IND_NOLOCK(updclosure, heapptr)                  \
+   {                                                           \
+       const StgInfoTable *info;                               \
+       info = ((StgClosure *)updclosure)->header.info;         \
+        AWAKEN_BQ_NOLOCK(info,updclosure);                     \
+       updateWithPermIndirection(info,                         \
+                                 (StgClosure *)updclosure,     \
+                                 (StgClosure *)heapptr);       \
+   }
+# else
+#  define UPD_IND_NOLOCK(updclosure, heapptr)          \
+   {                                                   \
+       const StgInfoTable *info;                       \
+       info = ((StgClosure *)updclosure)->header.info; \
+        AWAKEN_BQ_NOLOCK(info,updclosure);             \
+       updateWithIndirection(info,                     \
+                             (StgClosure *)updclosure, \
+                             (StgClosure *)heapptr);   \
+   }
+# endif
+
 #else
 #define UPD_IND_NOLOCK(updclosure,heapptr) UPD_IND(updclosure,heapptr)
 #endif
@@ -171,6 +195,17 @@ extern void awakenBlockedQueue(StgTSO *q);
           DO_AWAKEN_BQ(closure);                                        \
        }
 
+#ifdef RTS_SUPPORTS_THREADS
+extern void awakenBlockedQueueNoLock(StgTSO *q);
+#define DO_AWAKEN_BQ_NOLOCK(closure)                                   \
+        STGCALL1(awakenBlockedQueueNoLock,                             \
+                ((StgBlockingQueue *)closure)->blocking_queue);
+
+#define AWAKEN_BQ_NOLOCK(info,closure)                                 \
+       if (info == &stg_BLACKHOLE_BQ_info) {                           \
+          DO_AWAKEN_BQ_NOLOCK(closure);                                 \
+       }
+#endif
 #endif /* GRAN || PAR */
 
 /* -------------------------------------------------------------------------
index e098cb2..ee25f27 100644 (file)
@@ -21,6 +21,7 @@
 #include "OSThreads.h"
 #include "Capability.h"
 #include "Schedule.h"  /* to get at EMPTY_RUN_QUEUE() */
+#include "Signals.h" /* to get at handleSignalsInThisThread() */
 
 #if !defined(SMP)
 Capability MainCapability;     /* for non-SMP, we have one global capability */
@@ -44,7 +45,7 @@ Condition returning_worker_cond = INIT_COND_VAR;
  * there are one or more worker threads blocked waiting on
  * returning_worker_cond.
  */
-static nat rts_n_waiting_workers = 0;
+nat rts_n_waiting_workers = 0;
 
 /* thread_ready_cond: when signalled, a thread has become runnable for a
  * task to execute.
@@ -53,14 +54,10 @@ static nat rts_n_waiting_workers = 0;
  * exclusive access to the RTS and all its data structures (that are not
  * locked by the Scheduler's mutex).
  *
- * thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold.
+ * thread_ready_cond is signalled whenever noCapabilities doesn't hold.
  *
  */
 Condition thread_ready_cond = INIT_COND_VAR;
-#if 0
-/* For documentation purposes only */
-#define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE())
-#endif
 
 /*
  * To be able to make an informed decision about whether or not 
@@ -119,6 +116,8 @@ initCapabilities()
 #if defined(SMP)
 /* Free capability list. */
 static Capability *free_capabilities; /* Available capabilities for running threads */
+static Capability *returning_capabilities; 
+       /* Capabilities being passed to returning worker threads */
 #endif
 
 /* -----------------------------------------------------------------------------
@@ -138,9 +137,11 @@ static Capability *free_capabilities; /* Available capabilities for running thre
  */ 
 void grabCapability(Capability** cap)
 {
+  ASSERT(rts_n_free_capabilities > 0);
 #if !defined(SMP)
   rts_n_free_capabilities = 0;
   *cap = &MainCapability;
+  handleSignalsInThisThread();
 #else
   *cap = free_capabilities;
   free_capabilities = (*cap)->link;
@@ -161,16 +162,11 @@ void releaseCapability(Capability* cap
                       STG_UNUSED
 #endif
 )
-{
-#if defined(SMP)
-  cap->link = free_capabilities;
-  free_capabilities = cap;
-  rts_n_free_capabilities++;
-#else
-  rts_n_free_capabilities = 1;
-#endif
-
+{      // Precondition: sched_mutex must be held
 #if defined(RTS_SUPPORTS_THREADS)
+#ifndef SMP
+  ASSERT(rts_n_free_capabilities == 0);
+#endif
   /* Check to see whether a worker thread can be given
      the go-ahead to return the result of an external call..*/
   if (rts_n_waiting_workers > 0) {
@@ -178,14 +174,27 @@ void releaseCapability(Capability* cap
      * thread that is yielding its capability will repeatedly
      * signal returning_worker_cond.
      */
+#if defined(SMP)
+       // SMP variant untested
+    cap->link = returning_capabilities;
+    returning_capabilities = cap;
+#else
+#endif
     rts_n_waiting_workers--;
     signalCondition(&returning_worker_cond);
-  } else if ( !EMPTY_RUN_QUEUE() ) {
-    /* Signal that work is available */
+  } else /*if ( !EMPTY_RUN_QUEUE() )*/ {
+#if defined(SMP)
+    cap->link = free_capabilities;
+    free_capabilities = cap;
+    rts_n_free_capabilities++;
+#else
+    rts_n_free_capabilities = 1;
+#endif
+    /* Signal that a capability is available */
     signalCondition(&thread_ready_cond);
   }
 #endif
-  return;
+ return;
 }
 
 #if defined(RTS_SUPPORTS_THREADS)
@@ -226,15 +235,25 @@ grabReturnCapability(Mutex* pMutex, Capability** pCap)
 {
   IF_DEBUG(scheduler,
           fprintf(stderr,"worker (%ld): returning, waiting for lock.\n", osThreadId()));
-  rts_n_waiting_workers++;
   IF_DEBUG(scheduler,
           fprintf(stderr,"worker (%ld): returning; workers waiting: %d\n",
                   osThreadId(), rts_n_waiting_workers));
-  while ( noCapabilities() ) {
+  if ( noCapabilities() ) {
+    rts_n_waiting_workers++;
+    wakeBlockedWorkerThread();
+    context_switch = 1;        // make sure it's our turn soon
     waitCondition(&returning_worker_cond, pMutex);
+#if defined(SMP)
+    *pCap = returning_capabilities;
+    returning_capabilities = (*pCap)->link;
+#else
+    *pCap = &MainCapability;
+    ASSERT(rts_n_free_capabilities == 0);
+    handleSignalsInThisThread();
+#endif
+  } else {
+    grabCapability(pCap);
   }
-  
-  grabCapability(pCap);
   return;
 }
 
@@ -253,18 +272,21 @@ grabReturnCapability(Mutex* pMutex, Capability** pCap)
  *
  * Pre-condition:  pMutex is assumed held and the thread possesses
  *                 a Capability.
- * Post-condition: pMutex isn't held and the Capability has
+ * Post-condition: pMutex is held and the Capability has
  *                 been given back.
  */
 void
 yieldToReturningWorker(Mutex* pMutex, Capability** pCap)
 {
-  if ( rts_n_waiting_workers > 0 && noCapabilities() ) {
+  if ( rts_n_waiting_workers > 0 ) {
     IF_DEBUG(scheduler,
-            fprintf(stderr,"worker thread (%ld): giving up RTS token\n", osThreadId()));
+            fprintf(stderr,"worker thread (%p): giving up RTS token\n", osThreadId()));
     releaseCapability(*pCap);
-    /* And wait for work */
+        /* And wait for work */
     waitForWorkCapability(pMutex, pCap, rtsFalse);
+    IF_DEBUG(scheduler,
+            fprintf(stderr,"worker thread (%p): got back RTS token (after yieldToReturningWorker)\n",
+               osThreadId()));
   }
   return;
 }
@@ -281,6 +303,7 @@ yieldToReturningWorker(Mutex* pMutex, Capability** pCap)
  *           call is made.
  *
  * Pre-condition: pMutex is held.
+ * Post-condition: pMutex is held and *pCap is held by the current thread
  */
 void 
 waitForWorkCapability(Mutex* pMutex, Capability** pCap, rtsBool runnable)
@@ -293,6 +316,7 @@ waitForWorkCapability(Mutex* pMutex, Capability** pCap, rtsBool runnable)
   grabCapability(pCap);
   return;
 }
+
 #endif /* RTS_SUPPORTS_THREADS */
 
 #if defined(SMP)
@@ -319,6 +343,7 @@ initCapabilities_(nat n)
   }
   free_capabilities = cap;
   rts_n_free_capabilities = n;
+  returning_capabilities = NULL;
   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n", n_free_capabilities););
 }
 #endif /* SMP */
index cb0b09b..dd6a7be 100644 (file)
@@ -34,17 +34,20 @@ extern void releaseCapability(Capability* cap);
 
 extern nat rts_n_free_capabilities;  
 #if defined(RTS_SUPPORTS_THREADS)
-/* number of worker threads waiting to do good work within
-   the RTS. Used by Task.c (only) to determine whether or not
-   new worker threads needs to be created (when an external call
-   is made).
+/* number of worker threads waiting for a return capability
  */
-extern nat rts_n_waiting_workers; /* used by Task.c to determine */
+extern nat rts_n_waiting_workers;
 
 extern void grabReturnCapability(Mutex* pMutex, Capability** pCap);
 extern void yieldToReturningWorker(Mutex* pMutex, Capability** pCap);
 extern void waitForWorkCapability(Mutex* pMutex, Capability** pCap, rtsBool runnable);
 
+
+static inline rtsBool needToYieldToReturningWorker(void)
+{
+       return rts_n_waiting_workers > 0;
+}
+
 static inline nat getFreeCapabilities (void)
 {
   return rts_n_free_capabilities;
index d39becb..270feb0 100644 (file)
@@ -1157,6 +1157,7 @@ run_BCO:
            int stk_offset            = BCO_NEXT;
            int o_itbl                = BCO_NEXT;
            void(*marshall_fn)(void*) = (void (*)(void*))BCO_LIT(o_itbl);
+           StgTSO *tso               = cap->r.rCurrentTSO;
 
            // There are a bunch of non-ptr words on the stack (the
            // ccall args, the ccall fun address and space for the
@@ -1175,12 +1176,17 @@ run_BCO:
            SAVE_STACK_POINTERS;
            tok = suspendThread(&cap->r,rtsFalse);
 
-           // Careful: suspendThread might have shifted the stack
+           // Careful:
+           // suspendThread might have shifted the stack
            // around (stack squeezing), so we have to grab the real
-           // Sp out of the TSO to find the ccall args again:
-           marshall_fn ( (void*)(cap->r.rCurrentTSO->sp + RET_DYN_SIZE
-               + sizeofW(StgRetDyn)) );
-
+           // Sp out of the TSO to find the ccall args again.
+           // We don't own the capability anymore, so we mustn't use it.
+           // Instead, we have to save the TSO ptr beforehand.
+           // Also note that GC may strike at any time now (from another thread).
+           // FIXME - DANGER!! Can GC move our TSO?
+           // If so, we have to copy the args elsewhere!
+           marshall_fn ( (void*)(tso->sp + RET_DYN_SIZE + sizeofW(StgRetDyn)) );
+                   
            // And restart the thread again, popping the RET_DYN frame.
            cap = (Capability *)((void *)resumeThread(tok,rtsFalse) - sizeof(StgFunTable));
            LOAD_STACK_POINTERS;
index e52e222..2722bf4 100644 (file)
@@ -1,5 +1,5 @@
 /* ----------------------------------------------------------------------------
- * $Id: RtsAPI.c,v 1.38 2002/12/11 15:36:47 simonmar Exp $
+ * $Id: RtsAPI.c,v 1.39 2003/01/25 15:54:49 wolfgang Exp $
  *
  * (c) The GHC Team, 1998-2001
  *
 #include "Prelude.h"
 #include "OSThreads.h"
 #include "Schedule.h"
+#include "Capability.h"
 
 #include <stdlib.h>
 
-#if defined(RTS_SUPPORTS_THREADS)
-/* Cheesy locking scheme while waiting for the 
- * RTS API to change.
- */
-static Mutex     alloc_mutex = INIT_MUTEX_VAR;
-static Condition alloc_cond  = INIT_COND_VAR;
-#define INVALID_THREAD_ID ((OSThreadId)(-1))
-
-/* Thread currently owning the allocator */
-static OSThreadId c_id = INVALID_THREAD_ID;
-
-static StgPtr alloc(nat n)
-{
-  OSThreadId tid = osThreadId();
-  ACQUIRE_LOCK(&alloc_mutex);
-  if (tid == c_id) {
-    /* I've got the lock, just allocate() */
-    ;
-  } else if (c_id == INVALID_THREAD_ID) {
-    c_id = tid;
-  } else {
-    waitCondition(&alloc_cond, &alloc_mutex);
-    c_id = tid;
-  }
-  RELEASE_LOCK(&alloc_mutex);
-  return allocate(n);
-}
-
-static void releaseAllocLock(void)
-{
-  ACQUIRE_LOCK(&alloc_mutex);
-  /* Reset the allocator owner */
-  c_id = INVALID_THREAD_ID;
-  RELEASE_LOCK(&alloc_mutex);
-
-  /* Free up an OS thread waiting to get in */
-  signalCondition(&alloc_cond);
-}
-#else
-# define alloc(n) allocate(n)
-# define releaseAllocLock() /* nothing */
-#endif
-
-
 /* ----------------------------------------------------------------------------
    Building Haskell objects from C datatypes.
    ------------------------------------------------------------------------- */
 HaskellObj
 rts_mkChar (HsChar c)
 {
-  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
   SET_HDR(p, Czh_con_info, CCS_SYSTEM);
   p->payload[0]  = (StgClosure *)(StgChar)c;
   return p;
@@ -79,7 +36,7 @@ rts_mkChar (HsChar c)
 HaskellObj
 rts_mkInt (HsInt i)
 {
-  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
   SET_HDR(p, Izh_con_info, CCS_SYSTEM);
   p->payload[0]  = (StgClosure *)(StgInt)i;
   return p;
@@ -88,7 +45,7 @@ rts_mkInt (HsInt i)
 HaskellObj
 rts_mkInt8 (HsInt8 i)
 {
-  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
   SET_HDR(p, I8zh_con_info, CCS_SYSTEM);
   /* Make sure we mask out the bits above the lowest 8 */
   p->payload[0]  = (StgClosure *)(StgInt)((unsigned)i & 0xff);
@@ -98,7 +55,7 @@ rts_mkInt8 (HsInt8 i)
 HaskellObj
 rts_mkInt16 (HsInt16 i)
 {
-  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
   SET_HDR(p, I16zh_con_info, CCS_SYSTEM);
   /* Make sure we mask out the relevant bits */
   p->payload[0]  = (StgClosure *)(StgInt)((unsigned)i & 0xffff);
@@ -108,7 +65,7 @@ rts_mkInt16 (HsInt16 i)
 HaskellObj
 rts_mkInt32 (HsInt32 i)
 {
-  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
   SET_HDR(p, I32zh_con_info, CCS_SYSTEM);
   p->payload[0]  = (StgClosure *)(StgInt)((unsigned)i & 0xffffffff);
   return p;
@@ -118,7 +75,7 @@ HaskellObj
 rts_mkInt64 (HsInt64 i)
 {
   long long *tmp;
-  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,2));
+  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,2));
   SET_HDR(p, I64zh_con_info, CCS_SYSTEM);
   tmp  = (long long*)&(p->payload[0]);
   *tmp = (StgInt64)i;
@@ -128,7 +85,7 @@ rts_mkInt64 (HsInt64 i)
 HaskellObj
 rts_mkWord (HsWord i)
 {
-  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
   SET_HDR(p, Wzh_con_info, CCS_SYSTEM);
   p->payload[0]  = (StgClosure *)(StgWord)i;
   return p;
@@ -138,7 +95,7 @@ HaskellObj
 rts_mkWord8 (HsWord8 w)
 {
   /* see rts_mkInt* comments */
-  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
   SET_HDR(p, W8zh_con_info, CCS_SYSTEM);
   p->payload[0]  = (StgClosure *)(StgWord)(w & 0xff);
   return p;
@@ -148,7 +105,7 @@ HaskellObj
 rts_mkWord16 (HsWord16 w)
 {
   /* see rts_mkInt* comments */
-  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
   SET_HDR(p, W16zh_con_info, CCS_SYSTEM);
   p->payload[0]  = (StgClosure *)(StgWord)(w & 0xffff);
   return p;
@@ -158,7 +115,7 @@ HaskellObj
 rts_mkWord32 (HsWord32 w)
 {
   /* see rts_mkInt* comments */
-  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
   SET_HDR(p, W32zh_con_info, CCS_SYSTEM);
   p->payload[0]  = (StgClosure *)(StgWord)(w & 0xffffffff);
   return p;
@@ -169,7 +126,7 @@ rts_mkWord64 (HsWord64 w)
 {
   unsigned long long *tmp;
 
-  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,2));
+  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,2));
   /* see mk_Int8 comment */
   SET_HDR(p, W64zh_con_info, CCS_SYSTEM);
   tmp  = (unsigned long long*)&(p->payload[0]);
@@ -180,7 +137,7 @@ rts_mkWord64 (HsWord64 w)
 HaskellObj
 rts_mkFloat (HsFloat f)
 {
-  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
+  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
   SET_HDR(p, Fzh_con_info, CCS_SYSTEM);
   ASSIGN_FLT((P_)p->payload, (StgFloat)f);
   return p;
@@ -189,7 +146,7 @@ rts_mkFloat (HsFloat f)
 HaskellObj
 rts_mkDouble (HsDouble d)
 {
-  StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,sizeofW(StgDouble)));
+  StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,sizeofW(StgDouble)));
   SET_HDR(p, Dzh_con_info, CCS_SYSTEM);
   ASSIGN_DBL((P_)p->payload, (StgDouble)d);
   return p;
@@ -198,7 +155,7 @@ rts_mkDouble (HsDouble d)
 HaskellObj
 rts_mkStablePtr (HsStablePtr s)
 {
-  StgClosure *p = (StgClosure *)alloc(sizeofW(StgHeader)+1);
+  StgClosure *p = (StgClosure *)allocate(sizeofW(StgHeader)+1);
   SET_HDR(p, StablePtr_con_info, CCS_SYSTEM);
   p->payload[0]  = (StgClosure *)s;
   return p;
@@ -207,7 +164,7 @@ rts_mkStablePtr (HsStablePtr s)
 HaskellObj
 rts_mkPtr (HsPtr a)
 {
-  StgClosure *p = (StgClosure *)alloc(sizeofW(StgHeader)+1);
+  StgClosure *p = (StgClosure *)allocate(sizeofW(StgHeader)+1);
   SET_HDR(p, Ptr_con_info, CCS_SYSTEM);
   p->payload[0]  = (StgClosure *)a;
   return p;
@@ -236,7 +193,7 @@ rts_apply (HaskellObj f, HaskellObj arg)
 {
     StgClosure *ap;
 
-    ap = (StgClosure *)alloc(sizeofW(StgClosure) + 2);
+    ap = (StgClosure *)allocate(sizeofW(StgClosure) + 2);
     SET_HDR(ap, (StgInfoTable *)&stg_ap_2_upd_info, CCS_SYSTEM);
     ap->payload[0] = f;
     ap->payload[1] = arg;
@@ -414,7 +371,6 @@ rts_eval (HaskellObj p, /*out*/HaskellObj *ret)
     StgTSO *tso;
 
     tso = createGenThread(RtsFlags.GcFlags.initialStkSize, p);
-    releaseAllocLock();
     return scheduleWaitThread(tso,ret);
 }
 
@@ -424,7 +380,6 @@ rts_eval_ (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret)
     StgTSO *tso;
     
     tso = createGenThread(stack_size, p);
-    releaseAllocLock();
     return scheduleWaitThread(tso,ret);
 }
 
@@ -438,7 +393,6 @@ rts_evalIO (HaskellObj p, /*out*/HaskellObj *ret)
     StgTSO* tso; 
     
     tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p);
-    releaseAllocLock();
     return scheduleWaitThread(tso,ret);
 }
 
@@ -446,13 +400,13 @@ rts_evalIO (HaskellObj p, /*out*/HaskellObj *ret)
  * Identical to rts_evalIO(), but won't create a new task/OS thread
  * to evaluate the Haskell thread. Used by main() only. Hack.
  */
 SchedulerStatus
 rts_mainEvalIO(HaskellObj p, /*out*/HaskellObj *ret)
 {
-    StgTSO* tso; 
+    StgTSO* tso;
     
     tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p);
-    releaseAllocLock();
     scheduleThread(tso);
     return waitThread(tso, ret);
 }
@@ -472,7 +426,6 @@ rts_evalStableIO (HsStablePtr s, /*out*/HsStablePtr *ret)
     
     p = (StgClosure *)deRefStablePtr(s);
     tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p);
-    releaseAllocLock();
     stat = scheduleWaitThread(tso,&r);
 
     if (stat == Success) {
@@ -492,7 +445,6 @@ rts_evalLazyIO (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret)
     StgTSO *tso;
 
     tso = createIOThread(stack_size, p);
-    releaseAllocLock();
     return scheduleWaitThread(tso,ret);
 }
 
@@ -515,3 +467,34 @@ rts_checkSchedStatus ( char* site, SchedulerStatus rc )
        stg_exit(EXIT_FAILURE);
     }
 }
+
+#ifdef RTS_SUPPORTS_THREADS
+void
+rts_lock()
+{
+       Capability *cap;
+       ACQUIRE_LOCK(&sched_mutex);
+       
+               // we request to get the capability immediately, in order to
+               // a) stop other threads from using allocate()
+               // b) wake the current worker thread from awaitEvent()
+               //       (so that a thread started by rts_eval* will start immediately)
+       grabReturnCapability(&sched_mutex,&cap);
+       
+               // now that we have the capability, we don't need it anymore
+               // (other threads will continue to run as soon as we release the sched_mutex)
+       releaseCapability(cap);
+       
+               // In the RTS hasn't been entered yet,
+               // start a RTS task.
+               // If there is already a task available (waiting for the work capability),
+               // this will do nothing.
+       startSchedulerTask();
+}
+
+void
+rts_unlock()
+{
+       RELEASE_LOCK(&sched_mutex);
+}
+#endif
index 7173ed7..497a0c6 100644 (file)
@@ -1,5 +1,5 @@
 /* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.160 2002/12/13 15:16:29 simonmar Exp $
+ * $Id: Schedule.c,v 1.161 2003/01/25 15:54:49 wolfgang Exp $
  *
  * (c) The GHC Team, 1998-2000
  *
  */
 StgMainThread *main_threads = NULL;
 
+#ifdef THREADED_RTS
+// Pointer to the thread that executes main
+// When this thread is finished, the program terminates
+// by calling shutdownHaskellAndExit.
+// It would be better to add a call to shutdownHaskellAndExit
+// to the Main.main wrapper and to remove this hack.
+StgMainThread *main_main_thread = NULL;
+#endif
+
 /* Thread queues.
  * Locks required: sched_mutex.
  */
@@ -306,8 +315,13 @@ taskStart(void)
 }
 #endif
 
-
-
+#if defined(RTS_SUPPORTS_THREADS)
+void
+startSchedulerTask(void)
+{
+    startTask(taskStart);
+}
+#endif
 
 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
 //@subsection Main scheduling loop
@@ -373,6 +387,7 @@ schedule( void )
  
 #if defined(RTS_SUPPORTS_THREADS)
   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
+  IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): entering RTS", osThreadId()));
 #else
   /* simply initialise it in the non-threaded case */
   grabCapability(&cap);
@@ -426,9 +441,19 @@ schedule( void )
      */
     if (interrupted) {
       IF_DEBUG(scheduler, sched_belch("interrupted"));
-      deleteAllThreads();
       interrupted = rtsFalse;
       was_interrupted = rtsTrue;
+#if defined(RTS_SUPPORTS_THREADS)
+      // In the threaded RTS, deadlock detection doesn't work,
+      // so just exit right away.
+      prog_belch("interrupted");
+      releaseCapability(cap);
+      startTask(taskStart);    // thread-safe-call to shutdownHaskellAndExit
+      RELEASE_LOCK(&sched_mutex);
+      shutdownHaskellAndExit(EXIT_SUCCESS);
+#else
+      deleteAllThreads();
+#endif
     }
 
     /* Go through the list of main threads and wake up any
@@ -440,7 +465,7 @@ schedule( void )
     { 
       StgMainThread *m, **prev;
       prev = &main_threads;
-      for (m = main_threads; m != NULL; m = m->link) {
+      for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
        switch (m->tso->what_next) {
        case ThreadComplete:
          if (m->ret) {
@@ -453,6 +478,13 @@ schedule( void )
 #ifdef DEBUG
          removeThreadLabel((StgWord)m->tso);
 #endif
+          if(m == main_main_thread)
+          {
+              releaseCapability(cap);
+              startTask(taskStart);    // thread-safe-call to shutdownHaskellAndExit
+              RELEASE_LOCK(&sched_mutex);
+              shutdownHaskellAndExit(EXIT_SUCCESS);
+          }
          break;
        case ThreadKilled:
          if (m->ret) *(m->ret) = NULL;
@@ -466,6 +498,13 @@ schedule( void )
 #ifdef DEBUG
          removeThreadLabel((StgWord)m->tso);
 #endif
+          if(m == main_main_thread)
+          {
+              releaseCapability(cap);
+              startTask(taskStart);    // thread-safe-call to shutdownHaskellAndExit
+              RELEASE_LOCK(&sched_mutex);
+              shutdownHaskellAndExit(EXIT_SUCCESS);
+          }
          break;
        default:
          break;
@@ -562,10 +601,13 @@ schedule( void )
     /* Check whether any waiting threads need to be woken up.  If the
      * run queue is empty, and there are no other tasks running, we
      * can wait indefinitely for something to happen.
-     * ToDo: what if another client comes along & requests another
-     * main thread?
      */
-    if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
+    if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) 
+#if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
+               || EMPTY_RUN_QUEUE()
+#endif
+        )
+    {
       awaitEvent( EMPTY_RUN_QUEUE()
 #if defined(SMP)
        && allFreeCapabilities()
@@ -586,7 +628,7 @@ schedule( void )
      * If no threads are black holed, we have a deadlock situation, so
      * inform all the main threads.
      */
-#ifndef PAR
+#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
     if (   EMPTY_THREAD_QUEUES()
 #if defined(RTS_SUPPORTS_THREADS)
        && EMPTY_QUEUE(suspended_ccalling_threads)
@@ -699,6 +741,8 @@ schedule( void )
     }
   not_deadlocked:
 
+#elif defined(RTS_SUPPORTS_THREADS)
+    /* ToDo: add deadlock detection in threaded RTS */
 #elif defined(PAR)
     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
 #endif
@@ -714,6 +758,7 @@ schedule( void )
 #endif    
 
 #if defined(RTS_SUPPORTS_THREADS)
+#if defined(SMP)
     /* block until we've got a thread on the run queue and a free
      * capability.
      *
@@ -733,6 +778,11 @@ schedule( void )
       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
     }
+#else
+    if ( EMPTY_RUN_QUEUE() ) {
+      continue; // nothing to do
+    }
+#endif
 #endif
 
 #if defined(GRAN)
@@ -1016,7 +1066,7 @@ schedule( void )
     // expensive if there is lots of thread switching going on...
     IF_DEBUG(sanity,checkTSO(t));
 #endif
-    
+
     cap->r.rCurrentTSO = t;
     
     /* context switches are now initiated by the timer signal, unless
@@ -1031,6 +1081,8 @@ schedule( void )
     else
        context_switch = 0;
 
+run_thread:
+
     RELEASE_LOCK(&sched_mutex);
 
     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
@@ -1043,7 +1095,6 @@ schedule( void )
     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
     /* Run the current thread 
      */
-    run_thread:
     prev_what_next = t->what_next;
     switch (prev_what_next) {
     case ThreadKilled:
@@ -1069,8 +1120,8 @@ schedule( void )
 #endif
     
     ACQUIRE_LOCK(&sched_mutex);
-
-#ifdef SMP
+    
+#ifdef RTS_SUPPORTS_THREADS
     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
 #elif !defined(GRAN) && !defined(PAR)
     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
@@ -1584,7 +1635,15 @@ suspendThread( StgRegTable *reg,
   suspended_ccalling_threads = cap->r.rCurrentTSO;
 
 #if defined(RTS_SUPPORTS_THREADS)
-  cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
+  if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
+  {
+      cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
+      cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
+  }
+  else
+  {
+      cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
+  }
 #endif
 
   /* Use the thread ID as the token; it should be unique */
@@ -1596,15 +1655,11 @@ suspendThread( StgRegTable *reg,
 #if defined(RTS_SUPPORTS_THREADS)
   /* Preparing to leave the RTS, so ensure there's a native thread/task
      waiting to take over.
-     
-     ToDo: optimise this and only create a new task if there's a need
-     for one (i.e., if there's only one Concurrent Haskell thread alive,
-     there's no need to create a new task).
   */
-  IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
-  if (concCall) {
-    startTask(taskStart);
-  }
+  IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId()));
+  //if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult
+      startTask(taskStart);
+  //}
 #endif
 
   /* Other threads _might_ be available for execution; signal this */
@@ -1626,12 +1681,10 @@ resumeThread( StgInt tok,
 
 #if defined(RTS_SUPPORTS_THREADS)
   /* Wait for permission to re-enter the RTS with the result. */
-  if ( concCall ) {
-    ACQUIRE_LOCK(&sched_mutex);
-    grabReturnCapability(&sched_mutex, &cap);
-  } else {
-    grabCapability(&cap);
-  }
+  ACQUIRE_LOCK(&sched_mutex);
+  grabReturnCapability(&sched_mutex, &cap);
+
+  IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): re-entering RTS", tok, osThreadId()));
 #else
   grabCapability(&cap);
 #endif
@@ -1650,11 +1703,22 @@ resumeThread( StgInt tok,
     barf("resumeThread: thread not found");
   }
   tso->link = END_TSO_QUEUE;
+  
+#if defined(RTS_SUPPORTS_THREADS)
+  if(tso->why_blocked == BlockedOnCCall)
+  {
+      awakenBlockedQueueNoLock(tso->blocked_exceptions);
+      tso->blocked_exceptions = NULL;
+  }
+#endif
+  
   /* Reset blocking status */
   tso->why_blocked  = NotBlocked;
 
   cap->r.rCurrentTSO = tso;
+#if defined(RTS_SUPPORTS_THREADS)
   RELEASE_LOCK(&sched_mutex);
+#endif
   return &cap->r;
 }
 
@@ -1974,15 +2038,10 @@ static SchedulerStatus waitThread_(/*out*/StgMainThread* m
  * on this thread's stack before the scheduler is invoked.
  * ------------------------------------------------------------------------ */
 
-static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
+static void scheduleThread_ (StgTSO* tso);
 
 void
-scheduleThread_(StgTSO *tso
-              , rtsBool createTask
-#if !defined(THREADED_RTS)
-                STG_UNUSED
-#endif
-             )
+scheduleThread_(StgTSO *tso)
 {
   // Precondition: sched_mutex must be held.
 
@@ -1992,14 +2051,6 @@ scheduleThread_(StgTSO *tso
    * soon as we release the scheduler lock below.
    */
   PUSH_ON_RUN_QUEUE(tso);
-#if defined(THREADED_RTS)
-  /* If main() is scheduling a thread, don't bother creating a 
-   * new task.
-   */
-  if ( createTask ) {
-    startTask(taskStart);
-  }
-#endif
   THREAD_RUNNABLE();
 
 #if 0
@@ -2010,13 +2061,13 @@ scheduleThread_(StgTSO *tso
 void scheduleThread(StgTSO* tso)
 {
   ACQUIRE_LOCK(&sched_mutex);
-  scheduleThread_(tso, rtsFalse);
+  scheduleThread_(tso);
   RELEASE_LOCK(&sched_mutex);
 }
 
 SchedulerStatus
 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
-{
+{      // Precondition: sched_mutex must be held
   StgMainThread *m;
 
   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
@@ -2036,15 +2087,14 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
      signal the completion of the its work item for the main thread to
      see (==> it got stuck waiting.)    -- sof 6/02.
   */
-  ACQUIRE_LOCK(&sched_mutex);
   IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)\n", tso->id));
   
   m->link = main_threads;
   main_threads = m;
 
-  scheduleThread_(tso, rtsTrue);
+  scheduleThread_(tso);
 #if defined(THREADED_RTS)
-  return waitThread_(m, rtsTrue);      // waitThread_ releases sched_mutex
+  return waitThread_(m, rtsTrue);
 #else
   return waitThread_(m);
 #endif
@@ -2232,6 +2282,7 @@ SchedulerStatus
 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
 { 
   StgMainThread *m;
+  SchedulerStatus stat;
 
   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
   m->tso = tso;
@@ -2248,10 +2299,12 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret)
 
   IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
 #if defined(THREADED_RTS)
-  return waitThread_(m, rtsFalse);     // waitThread_ releases sched_mutex
+  stat = waitThread_(m, rtsFalse);
 #else
-  return waitThread_(m);
+  stat = waitThread_(m);
 #endif
+  RELEASE_LOCK(&sched_mutex);
+  return stat;
 }
 
 static
@@ -2275,8 +2328,11 @@ waitThread_(StgMainThread* m
      * gets to enter the RTS directly without going via another
      * task/thread.
      */
+    main_main_thread = m;
     RELEASE_LOCK(&sched_mutex);
     schedule();
+    ACQUIRE_LOCK(&sched_mutex);
+    main_main_thread = NULL;
     ASSERT(m->stat != NoStatus);
   } else 
 # endif
@@ -2309,12 +2365,7 @@ waitThread_(StgMainThread* m
                              m->tso->id));
   free(m);
 
-#if defined(THREADED_RTS)
-  if (blockWaiting) 
-#endif
-    RELEASE_LOCK(&sched_mutex);
-
-  // Postcondition: sched_mutex must not be held
+  // Postcondition: sched_mutex still held
   return stat;
 }
 
@@ -2928,6 +2979,17 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
 }
 
 #else   /* !GRAN && !PAR */
+
+#ifdef RTS_SUPPORTS_THREADS
+void
+awakenBlockedQueueNoLock(StgTSO *tso)
+{
+  while (tso != END_TSO_QUEUE) {
+    tso = unblockOneLocked(tso);
+  }
+}
+#endif
+
 void
 awakenBlockedQueue(StgTSO *tso)
 {
@@ -3514,7 +3576,6 @@ detectBlackHoles( void )
        if (tso->why_blocked != BlockedOnBlackHole) {
            continue;
        }
-
        blocked_on = tso->block_info.closure;
 
        frame = (StgClosure *)tso->sp;
@@ -3522,7 +3583,6 @@ detectBlackHoles( void )
        while(1) {
            info = get_ret_itbl(frame);
            switch (info->i.type) {
-
            case UPDATE_FRAME:
                if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
                    /* We are blocking on one of our own computations, so
@@ -3599,6 +3659,9 @@ printThreadBlockage(StgTSO *tso)
   case BlockedOnCCall:
     fprintf(stderr,"is blocked on an external call");
     break;
+  case BlockedOnCCall_NoUnblockExc:
+    fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
+    break;
 #endif
   default:
     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
index 85bece5..cf49a78 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Schedule.h,v 1.36 2002/10/22 11:01:20 simonmar Exp $
+ * $Id: Schedule.h,v 1.37 2003/01/25 15:54:50 wolfgang Exp $
  *
  * (c) The GHC Team 1998-1999
  *
@@ -34,6 +34,9 @@ void awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node);
 void awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node);
 #else
 void awakenBlockedQueue(StgTSO *tso);
+#if defined(RTS_SUPPORTS_THREADS)
+void awakenBlockedQueueNoLock(StgTSO *tso);
+#endif
 #endif
 
 /* unblockOne()
@@ -60,9 +63,9 @@ StgTSO *unblockOne(StgTSO *tso);
 void raiseAsync(StgTSO *tso, StgClosure *exception);
 void raiseAsyncWithLock(StgTSO *tso, StgClosure *exception);
 
-/* awaitEvent()
+/* awaitEvent(rtsBool wait)
  *
- * Raises an exception asynchronously in the specified thread.
+ * Checks for blocked threads that need to be woken.
  *
  * Called from STG :  NO
  * Locks assumed   :  sched_mutex
@@ -78,6 +81,16 @@ void awaitEvent(rtsBool wait);  /* In Select.c */
  */
 rtsBool wakeUpSleepingThreads(nat);  /* In Select.c */
 
+/* wakeBlockedWorkerThread()
+ *
+ * If a worker thread is currently blocked in awaitEvent(), interrupt it.
+ *
+ * Called from STG :  NO
+ * Locks assumed   :  sched_mutex
+ */
+void wakeBlockedWorkerThread(void); /* In Select.c */
+
+
 /* GetRoots(evac_fn f)
  *
  * Call f() for each root known to the scheduler.
@@ -174,6 +187,10 @@ typedef struct StgMainThread_ {
   StgClosure **    ret;
 #if defined(RTS_SUPPORTS_THREADS)
   Condition        wakeup;
+#if defined(THREADED_RTS)
+  rtsBool          thread_bound;
+  Condition        bound_thread_cond;
+#endif
 #endif
   struct StgMainThread_ *link;
 } StgMainThread;
@@ -257,9 +274,7 @@ void labelThread(StgPtr tso, char *label);
  */
 #if defined(RTS_SUPPORTS_THREADS)
 #define THREAD_RUNNABLE()                      \
-  if ( !noCapabilities() ) {                   \
-     signalCondition(&thread_ready_cond);      \
-  }                                            \
+  wakeBlockedWorkerThread();                   \
   context_switch = 1;
 #else
 #define THREAD_RUNNABLE()  /* nothing */
@@ -277,4 +292,14 @@ void labelThread(StgPtr tso, char *label);
                                EMPTY_BLOCKED_QUEUE() && \
                                EMPTY_SLEEPING_QUEUE())
 
+#if defined(RTS_SUPPORTS_THREADS)
+/* If no task is waiting for a capability,
+ * spawn a new worker thread.
+ *
+ * (Used by the RtsAPI)
+ */
+void
+startSchedulerTask(void);
+#endif
+
 #endif /* __SCHEDULE_H__ */
index dc19cbf..a2ad455 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Select.c,v 1.22 2002/07/24 03:38:58 sof Exp $
+ * $Id: Select.c,v 1.23 2003/01/25 15:54:50 wolfgang Exp $
  *
  * (c) The GHC Team 1995-2002
  *
@@ -16,6 +16,7 @@
 #include "RtsFlags.h"
 #include "Itimer.h"
 #include "Signals.h"
+#include "Capability.h"
 
 # ifdef HAVE_SYS_TYPES_H
 #  include <sys/types.h>
 /* last timestamp */
 nat timestamp = 0;
 
+#ifdef RTS_SUPPORTS_THREADS
+static rtsBool isWorkerBlockedInAwaitEvent = rtsFalse;
+static rtsBool workerWakeupPending = rtsFalse;
+#ifndef mingw32_TARGET_OS
+static int workerWakeupPipe[2];
+static rtsBool workerWakeupInited = rtsFalse;
+#endif
+#endif
+
 /* There's a clever trick here to avoid problems when the time wraps
  * around.  Since our maximum delay is smaller than 31 bits of ticks
  * (it's actually 31 bits of microseconds), we can safely check
@@ -157,6 +167,15 @@ awaitEvent(rtsBool wait)
        }
       }
 
+#ifdef RTS_SUPPORTS_THREADS
+      if(!workerWakeupInited) {
+          pipe(workerWakeupPipe);
+          workerWakeupInited = rtsTrue;
+      }
+      FD_SET(workerWakeupPipe[0], &rfd);
+      maxfd = workerWakeupPipe[0] > maxfd ? workerWakeupPipe[0] : maxfd;
+#endif
+      
       /* Release the scheduler lock while we do the poll.
        * this means that someone might muck with the blocked_queue
        * while we do this, but it shouldn't matter:
@@ -169,6 +188,11 @@ awaitEvent(rtsBool wait)
        *
        * I believe none of these cases lead to trouble --SDM.
        */
+      
+#ifdef RTS_SUPPORTS_THREADS
+      isWorkerBlockedInAwaitEvent = rtsTrue;
+      workerWakeupPending = rtsFalse;
+#endif
       RELEASE_LOCK(&sched_mutex);
 
       /* Check for any interesting events */
@@ -206,10 +230,17 @@ awaitEvent(rtsBool wait)
            }
          }
 #else /* on mingwin */
+#ifdef RTS_SUPPORTS_THREADS
+      isWorkerBlockedInAwaitEvent = rtsTrue;
+#endif
+      RELEASE_LOCK(&sched_mutex);
       while (1) {
          Sleep(0); /* don't busy wait */
 #endif /* mingw32_TARGET_OS */
          ACQUIRE_LOCK(&sched_mutex);
+#ifdef RTS_SUPPORTS_THREADS
+          isWorkerBlockedInAwaitEvent = rtsFalse;
+#endif
 
 #ifndef mingw32_TARGET_OS
          /* We got a signal; could be one of ours.  If so, we need
@@ -242,6 +273,18 @@ awaitEvent(rtsBool wait)
              return; /* still hold the lock */
          }
          
+#ifdef RTS_SUPPORTS_THREADS
+         /* If another worker thread wants to take over,
+          * return to the scheduler
+          */
+         if (needToYieldToReturningWorker()) {
+             return; /* still hold the lock */
+         }
+#endif
+         
+#ifdef RTS_SUPPORTS_THREADS
+          isWorkerBlockedInAwaitEvent = rtsTrue;
+#endif
          RELEASE_LOCK(&sched_mutex);
       }
 
@@ -287,6 +330,43 @@ awaitEvent(rtsBool wait)
              blocked_queue_tl = prev;
          }
       }
-
+      
+#if defined(RTS_SUPPORTS_THREADS) && !defined(mingw32_TARGET_OS)
+       // if we were woken up by wakeBlockedWorkerThread,
+       // read the dummy byte from the pipe
+      if(select_succeeded && FD_ISSET(workerWakeupPipe[0], &rfd)) {
+          unsigned char dummy;
+          wait = rtsFalse;
+          read(workerWakeupPipe[0],&dummy,1);
+      }
+#endif
     } while (wait && !interrupted && run_queue_hd == END_TSO_QUEUE);
 }
+
+
+#ifdef RTS_SUPPORTS_THREADS
+/* wakeBlockedWorkerThread
+ *
+ * If a worker thread is currently blocked within awaitEvent,
+ * wake it.
+ * Must be called with sched_mutex held.
+ */
+
+void
+wakeBlockedWorkerThread()
+{
+#ifndef mingw32_TARGET_OS
+    if(isWorkerBlockedInAwaitEvent && !workerWakeupPending) {
+       unsigned char dummy = 42;       // Any value will do here
+       
+                       // write something so that select() wakes up
+       write(workerWakeupPipe[1],&dummy,1);
+       workerWakeupPending = rtsTrue;
+    }
+#else
+       // The Win32 implementation currently uses a polling loop,
+       // so there is no need to explicitly wake it
+#endif
+}
+
+#endif
index 8e33d9d..d337078 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Signals.c,v 1.32 2003/01/10 22:08:20 wolfgang Exp $
+ * $Id: Signals.c,v 1.33 2003/01/25 15:54:50 wolfgang Exp $
  *
  * (c) The GHC Team, 1998-1999
  *
@@ -49,6 +49,23 @@ StgPtr *next_pending_handler = pending_handler_buf;
 
 StgInt nocldstop = 0;
 
+
+#ifdef RTS_SUPPORTS_THREADS
+pthread_t signalHandlingThread;
+#endif
+
+       // Handle all signals in the current thread.
+       // Called from Capability.c whenever the main capability is granted to a thread
+       // and in installDefaultHandlers
+void
+handleSignalsInThisThread()
+{
+#ifdef RTS_SUPPORTS_THREADS
+    signalHandlingThread = pthread_self();
+#endif
+}
+
+
 /* -----------------------------------------------------------------------------
  * Allocate/resize the table of signal handlers.
  * -------------------------------------------------------------------------- */
@@ -105,6 +122,19 @@ generic_handler(int sig)
 {
     sigset_t signals;
 
+#if defined(THREADED_RTS)
+       // Make the thread that currently holds the main capability
+       // handle the signal.
+       // This makes sure that awaitEvent() is interrupted
+       // and it (hopefully) prevents race conditions
+       // (signal handlers are not atomic with respect to other threads)
+
+    if(pthread_self() != signalHandlingThread) {
+        pthread_kill(signalHandlingThread, sig);
+        return;
+    }
+#endif
+
     /* Can't call allocate from here.  Probably can't call malloc
        either.  However, we have to schedule a new thread somehow.
 
@@ -215,6 +245,8 @@ stg_sig_install(int sig, int spi, StgStablePtr *handler, void *mask)
 
     previous_spi = handlers[sig];
 
+    action.sa_flags = 0;
+    
     switch(spi) {
     case STG_SIG_IGN:
        handlers[sig] = STG_SIG_IGN;
@@ -348,6 +380,17 @@ shutdown_handler(int sig STG_UNUSED)
        pthread_kill(startup_guy, sig);
        return;
     }
+    // ToDo: The code for the threaded RTS below does something very
+    // similar. Maybe the SMP special case is not needed
+    // -- Wolfgang Thaller
+#elif defined(THREADED_RTS)
+       // Make the thread that currently holds the main capability
+       // handle the signal.
+       // This makes sure that awaitEvent() is interrupted
+    if(pthread_self() != signalHandlingThread) {
+        pthread_kill(signalHandlingThread, sig);
+        return;
+    }
 #endif
 
     // If we're already trying to interrupt the RTS, terminate with
@@ -383,6 +426,9 @@ initDefaultHandlers()
 #ifdef SMP
     startup_guy = pthread_self();
 #endif
+#ifdef RTS_SUPPORTS_THREADS
+       handleSignalsInThisThread();
+#endif
 
     // install the SIGINT handler
     action.sa_handler = shutdown_handler;
index d00c8b6..4db8086 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Signals.h,v 1.8 2002/09/17 12:11:45 simonmar Exp $
+ * $Id: Signals.h,v 1.9 2003/01/25 15:54:50 wolfgang Exp $
  *
  * (c) The GHC Team, 1998-1999
  *
@@ -27,6 +27,8 @@ extern void startSignalHandlers(void);
 extern void markSignalHandlers (evac_fn evac);
 extern void initDefaultHandlers(void);
 
+extern void handleSignalsInThisThread(void);
+
 #else
 
 #define signals_pending() (rtsFalse)
index bc701b0..92b5c25 100644 (file)
@@ -157,15 +157,14 @@ startTask ( void (*taskStart)(void) )
   OSThreadId tid;
   
   /* If more than one worker thread is known to be blocked waiting
-     on thread_ready_cond, signal it rather than creating a new one.
+     on thread_ready_cond, don't create a new one.
   */
   if ( rts_n_waiting_tasks > 0) {
     IF_DEBUG(scheduler,fprintf(stderr,
                               "scheduler: startTask: %d tasks waiting, not creating new one.\n", 
                               rts_n_waiting_tasks););
-    signalCondition(&thread_ready_cond);
-    /* Not needed, but gives more 'interesting' thread schedules when testing */
-    yieldThread();
+    // the task will run as soon as a capability is available,
+    // so there's no need to wake it.
     return;
   }
 
index a0ae5cb..f216f62 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Weak.c,v 1.26 2002/12/11 15:36:54 simonmar Exp $
+ * $Id: Weak.c,v 1.27 2003/01/25 15:54:50 wolfgang Exp $
  *
  * (c) The GHC Team, 1998-1999
  *
@@ -34,16 +34,28 @@ finalizeWeakPointersNow(void)
 {
   StgWeak *w;
   
+#if defined(RTS_SUPPORTS_THREADS)
+  rts_lock();
+#endif
   while ((w = weak_ptr_list)) {
     weak_ptr_list = w->link;
     if (w->header.info != &stg_DEAD_WEAK_info) {
        w->header.info = &stg_DEAD_WEAK_info;
        IF_DEBUG(weak,fprintf(stderr,"Finalising weak pointer at %p -> %p\n", w, w->key));
        if (w->finalizer != &stg_NO_FINALIZER_closure) {
+#if defined(RTS_SUPPORTS_THREADS)
+           rts_evalIO(w->finalizer,NULL);
+           rts_unlock();
+           rts_lock();
+#else
            rts_mainEvalIO(w->finalizer,NULL);
+#endif
        }
     }
   }
+#if defined(RTS_SUPPORTS_THREADS)
+  rts_unlock();
+#endif
 } 
 
 /*