[project @ 2003-09-21 22:20:51 by wolfgang]
authorwolfgang <unknown>
Sun, 21 Sep 2003 22:20:57 +0000 (22:20 +0000)
committerwolfgang <unknown>
Sun, 21 Sep 2003 22:20:57 +0000 (22:20 +0000)
Bound Threads
=============

Introduce a way to use foreign libraries that rely on thread local state
from multiple threads (mainly affects the threaded RTS).

See the file threads.tex in CVS at haskell-report/ffi/threads.tex
(not entirely finished yet) for a definition of this extension. A less formal
description is also found in the documentation of Control.Concurrent.

The changes mostly affect the THREADED_RTS (./configure --enable-threaded-rts),
except for saving & restoring errno on a per-TSO basis, which is also necessary
for the non-threaded RTS (a bugfix).

Detailed list of changes
------------------------

- errno is saved in the TSO object and restored when necessary:
ghc/includes/TSO.h, ghc/rts/Interpreter.c, ghc/rts/Schedule.c

- rts_mainLazyIO is no longer needed, main is no special case anymore
ghc/includes/RtsAPI.h, ghc/rts/RtsAPI.c, ghc/rts/Main.c, ghc/rts/Weak.c

- passCapability: a new function that releases the capability and "passes"
  it to a specific OS thread:
ghc/rts/Capability.h ghc/rts/Capability.c

- waitThread(), scheduleWaitThread() and schedule() get an optional
  Capability *initialCapability passed as an argument:
ghc/includes/SchedAPI.h, ghc/rts/Schedule.c, ghc/rts/RtsAPI.c

- Bound Thread scheduling (that's what this is all about):
ghc/rts/Schedule.h, ghc/rts/Schedule.c

- new Primop isCurrentThreadBound#:
ghc/compiler/prelude/primops.txt.pp, ghc/includes/PrimOps.h, ghc/rts/PrimOps.hc,
ghc/rts/Schedule.h, ghc/rts/Schedule.c

- a simple function, rtsSupportsBoundThreads, that returns true if THREADED_RTS
  is defined:
ghc/rts/Schedule.h, ghc/rts/Schedule.c

- a new implementation of forkProcess (the old implementation stays in place
  for the non-threaded case). Partially broken; works for the standard
  fork-and-exec case, but not for much else. A proper forkProcess is
  really next to impossible to implement:
ghc/rts/Schedule.c

- Library support for bound threads:
    Control.Concurrent.
      rtsSupportsBoundThreads, isCurrentThreadBound, forkOS,
      runInBoundThread, runInUnboundThread
libraries/base/Control/Concurrent.hs, libraries/base/Makefile,
libraries/base/include/HsBase.h, libraries/base/cbits/forkOS.c (new file)

16 files changed:
ghc/compiler/prelude/primops.txt.pp
ghc/includes/PrimOps.h
ghc/includes/RtsAPI.h
ghc/includes/SchedAPI.h
ghc/includes/TSO.h
ghc/rts/Capability.c
ghc/rts/Capability.h
ghc/rts/Interpreter.c
ghc/rts/Linker.c
ghc/rts/Main.c
ghc/rts/Makefile
ghc/rts/PrimOps.hc
ghc/rts/RtsAPI.c
ghc/rts/Schedule.c
ghc/rts/Schedule.h
ghc/rts/Weak.c

index f5fd8a7..af4a244 100644 (file)
@@ -1,5 +1,5 @@
 -----------------------------------------------------------------------
--- $Id: primops.txt.pp,v 1.28 2003/07/03 15:14:56 sof Exp $
+-- $Id: primops.txt.pp,v 1.29 2003/09/21 22:20:51 wolfgang Exp $
 --
 -- Primitive Operations
 --
@@ -1494,6 +1494,11 @@ primop LabelThreadOp "labelThread#" GenPrimOp
    with
    has_side_effects = True
    out_of_line      = True
+   
+primop  IsCurrentThreadBoundOp "isCurrentThreadBound#" GenPrimOp
+   State# RealWorld -> (# State# RealWorld, Int# #)
+   with
+   out_of_line = True
 
 ------------------------------------------------------------------------
 section "Weak pointers"
index cf67e61..9576f24 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: PrimOps.h,v 1.103 2003/07/03 15:14:57 sof Exp $
+ * $Id: PrimOps.h,v 1.104 2003/09/21 22:20:52 wolfgang Exp $
  *
  * (c) The GHC Team, 1998-2000
  *
@@ -281,6 +281,7 @@ EXTFUN_RTS(blockAsyncExceptionszh_fast);
 EXTFUN_RTS(unblockAsyncExceptionszh_fast);
 EXTFUN_RTS(myThreadIdzh_fast);
 EXTFUN_RTS(labelThreadzh_fast);
+EXTFUN_RTS(isCurrentThreadBoundzh_fast);
 
 extern int cmp_thread(StgPtr tso1, StgPtr tso2);
 extern int rts_getThreadId(StgPtr tso);
@@ -417,4 +418,5 @@ EXTFUN_RTS(mkApUpd0zh_fast);
    -------------------------------------------------------------------------- */
 #define ForeignObj_CLOSURE_DATA(c)  (((StgForeignObj *)c)->data)
 
+
 #endif /* PRIMOPS_H */
index 73a7354..d8e772f 100644 (file)
@@ -1,5 +1,5 @@
 /* ----------------------------------------------------------------------------
- * $Id: RtsAPI.h,v 1.35 2003/08/22 22:38:02 sof Exp $
+ * $Id: RtsAPI.h,v 1.36 2003/09/21 22:20:52 wolfgang Exp $
  *
  * (c) The GHC Team, 1998-1999
  *
@@ -113,17 +113,14 @@ rts_eval_ ( HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret );
 SchedulerStatus 
 rts_evalIO ( HaskellObj p, /*out*/HaskellObj *ret );
 
-#if defined(COMPILING_RTS_MAIN)
-/* Used by the RTS' main() only */
-SchedulerStatus 
-rts_mainLazyIO ( HaskellObj p, /*out*/HaskellObj *ret );
-#endif
-
 SchedulerStatus
 rts_evalStableIO ( HsStablePtr s, /*out*/HsStablePtr *ret );
 
 SchedulerStatus 
-rts_evalLazyIO ( HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret );
+rts_evalLazyIO ( HaskellObj p, /*out*/HaskellObj *ret );
+
+SchedulerStatus 
+rts_evalLazyIO_ ( HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret );
 
 void
 rts_checkSchedStatus ( char* site, SchedulerStatus rc);
index 524b1da..b0cee60 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: SchedAPI.h,v 1.17 2002/12/27 12:33:21 panne Exp $
+ * $Id: SchedAPI.h,v 1.18 2003/09/21 22:20:53 wolfgang Exp $
  *
  * (c) The GHC Team 1998-2002
  *
@@ -16,7 +16,8 @@
 #define NO_PRI  0
 #endif
 
-extern SchedulerStatus waitThread(StgTSO *main_thread, /*out*/StgClosure **ret);
+extern SchedulerStatus waitThread(StgTSO *main_thread, /*out*/StgClosure **ret,
+                                  Capability *initialCapability);
 
 /* 
  * Creating threads
@@ -30,7 +31,8 @@ extern StgTSO *createThread(nat stack_size);
 extern void taskStart(void);
 #endif
 extern void scheduleThread(StgTSO *tso);
-extern SchedulerStatus scheduleWaitThread(StgTSO *tso, /*out*/HaskellObj* ret);
+extern SchedulerStatus scheduleWaitThread(StgTSO *tso, /*out*/HaskellObj* ret,
+                                          Capability *initialCapability);
 
 static inline void pushClosure   (StgTSO *tso, StgWord c) {
   tso->sp--;
index 7c6e1c0..56bc726 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: TSO.h,v 1.31 2003/07/03 15:14:58 sof Exp $
+ * $Id: TSO.h,v 1.32 2003/09/21 22:20:53 wolfgang Exp $
  *
  * (c) The GHC Team, 1998-1999
  *
@@ -194,7 +194,8 @@ typedef struct StgTSO_ {
   StgTSOBlockInfo    block_info;
   struct StgTSO_*    blocked_exceptions;
   StgThreadID        id;
-
+  int                saved_errno;
+  
   StgTSOTickyInfo    ticky; 
   StgTSOProfInfo     prof;
   StgTSOParInfo      par;
index d96b724..a2910ad 100644 (file)
@@ -149,6 +149,9 @@ void grabCapability(Capability** cap)
   free_capabilities = (*cap)->link;
   rts_n_free_capabilities--;
 #endif
+    IF_DEBUG(scheduler,
+            fprintf(stderr,"worker thread (%p): got capability\n",
+               osThreadId()));
 }
 
 /*
@@ -196,6 +199,9 @@ void releaseCapability(Capability* cap
     signalCondition(&thread_ready_cond);
   }
 #endif
+    IF_DEBUG(scheduler,
+            fprintf(stderr,"worker thread (%p): released capability\n",
+               osThreadId()));
  return;
 }
 
@@ -236,9 +242,9 @@ void
 grabReturnCapability(Mutex* pMutex, Capability** pCap)
 {
   IF_DEBUG(scheduler,
-          fprintf(stderr,"worker (%ld): returning, waiting for lock.\n", osThreadId()));
+          fprintf(stderr,"worker (%p): returning, waiting for lock.\n", osThreadId()));
   IF_DEBUG(scheduler,
-          fprintf(stderr,"worker (%ld): returning; workers waiting: %d\n",
+          fprintf(stderr,"worker (%p): returning; workers waiting: %d\n",
                   osThreadId(), rts_n_waiting_workers));
   if ( noCapabilities() ) {
     rts_n_waiting_workers++;
@@ -265,27 +271,30 @@ grabReturnCapability(Mutex* pMutex, Capability** pCap)
    -------------------------------------------------------------------------- */
 
 /*
- * Function: yieldToReturningWorker(Mutex*,Capability*)
+ * Function: yieldToReturningWorker(Mutex*,Capability*,Condition*)
  *
  * Purpose:  when, upon entry to the Scheduler, an OS worker thread
  *           spots that one or more threads are blocked waiting for
  *           permission to return back their result, it gives up
- *           its Capability. 
+ *           its Capability.
+ *           Immediately afterwards, it tries to reaquire the Capabilty
+ *           using waitForWorkCapability.
+ *
  *
  * Pre-condition:  pMutex is assumed held and the thread possesses
  *                 a Capability.
- * Post-condition: pMutex is held and the Capability has
- *                 been given back.
+ * Post-condition: pMutex is held and the thread possesses
+ *                 a Capability.
  */
 void
-yieldToReturningWorker(Mutex* pMutex, Capability** pCap)
+yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
 {
   if ( rts_n_waiting_workers > 0 ) {
     IF_DEBUG(scheduler,
             fprintf(stderr,"worker thread (%p): giving up RTS token\n", osThreadId()));
     releaseCapability(*pCap);
         /* And wait for work */
-    waitForWorkCapability(pMutex, pCap, rtsFalse);
+    waitForWorkCapability(pMutex, pCap, pThreadCond);
     IF_DEBUG(scheduler,
             fprintf(stderr,"worker thread (%p): got back RTS token (after yieldToReturningWorker)\n",
                osThreadId()));
@@ -295,7 +304,7 @@ yieldToReturningWorker(Mutex* pMutex, Capability** pCap)
 
 
 /*
- * Function: waitForWorkCapability(Mutex*, Capability**, rtsBool)
+ * Function: waitForWorkCapability(Mutex*, Capability**, Condition*)
  *
  * Purpose:  wait for a Capability to become available. In
  *           the process of doing so, updates the number
@@ -303,22 +312,74 @@ yieldToReturningWorker(Mutex* pMutex, Capability** pCap)
  *           work. That counter is used when deciding whether or
  *           not to create a new worker thread when an external
  *           call is made.
+ *           If pThreadCond is not NULL, a capability can be specifically
+ *           passed to this thread using passCapability.
  *
  * Pre-condition: pMutex is held.
  * Post-condition: pMutex is held and *pCap is held by the current thread
  */
+static Condition *passTarget = NULL;
 void 
-waitForWorkCapability(Mutex* pMutex, Capability** pCap, rtsBool runnable)
+waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
 {
-  while ( noCapabilities() || (runnable && EMPTY_RUN_QUEUE()) ) {
-    rts_n_waiting_tasks++;
-    waitCondition(&thread_ready_cond, pMutex);
-    rts_n_waiting_tasks--;
+#ifdef SMP
+  #error SMP version not implemented
+#endif
+  IF_DEBUG(scheduler,
+          fprintf(stderr,"worker thread (%p): wait for cap (cond: %p)\n",
+             osThreadId(),pThreadCond));
+  while ( noCapabilities() || (pThreadCond && passTarget != pThreadCond)
+      || (!pThreadCond && passTarget)) {
+    if(pThreadCond)
+    {
+      waitCondition(pThreadCond, pMutex);
+      IF_DEBUG(scheduler,
+              fprintf(stderr,"worker thread (%p): get passed capability\n",
+                 osThreadId()));
+    }
+    else
+    {
+      rts_n_waiting_tasks++;
+      waitCondition(&thread_ready_cond, pMutex);
+      rts_n_waiting_tasks--;
+      IF_DEBUG(scheduler,
+              fprintf(stderr,"worker thread (%p): get normal capability\n",
+                 osThreadId()));
+    }
   }
+  passTarget = NULL;
   grabCapability(pCap);
   return;
 }
 
+/*
+ * Function: passCapability(Mutex*, Capability*, Condition*)
+ *
+ * Purpose:  Let go of the capability and make sure the thread associated
+ *          with the Condition pTargetThreadCond gets it next.
+ *
+ * Pre-condition: pMutex is held and cap is held by the current thread
+ * Post-condition: pMutex is held; cap will be grabbed by the "target"
+ *                thread when pMutex is released.
+ */
+
+void
+passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond)
+{
+#ifdef SMP
+  #error SMP version not implemented
+#endif
+    rts_n_free_capabilities = 1;
+    signalCondition(pTargetThreadCond);
+    passTarget = pTargetThreadCond;
+    IF_DEBUG(scheduler,
+            fprintf(stderr,"worker thread (%p): passCapability\n",
+               osThreadId()));
+}
+
+
 #endif /* RTS_SUPPORTS_THREADS */
 
 #if defined(SMP)
index dd6a7be..70acc15 100644 (file)
@@ -39,9 +39,9 @@ extern nat rts_n_free_capabilities;
 extern nat rts_n_waiting_workers;
 
 extern void grabReturnCapability(Mutex* pMutex, Capability** pCap);
-extern void yieldToReturningWorker(Mutex* pMutex, Capability** pCap);
-extern void waitForWorkCapability(Mutex* pMutex, Capability** pCap, rtsBool runnable);
-
+extern void yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition *pThreadCond);
+extern void waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition *pThreadCond);
+extern void passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond);
 
 static inline rtsBool needToYieldToReturningWorker(void)
 {
index 7f408d7..888f3b8 100644 (file)
 #include "Disassembler.h"
 #include "Interpreter.h"
 
+#include <string.h>     /* for memcpy */
+#ifdef HAVE_ERRNO_H
+#include <errno.h>
+#endif
+
 
 /* --------------------------------------------------------------------------
  * The bytecode interpreter
@@ -1172,6 +1177,9 @@ run_BCO:
            memcpy(arguments, Sp, sizeof(W_) * stk_offset);
 #endif
 
+           // Restore the Haskell thread's current value of errno
+           errno = cap->r.rCurrentTSO->saved_errno;
+
            // There are a bunch of non-ptr words on the stack (the
            // ccall args, the ccall fun address and space for the
            // result), which we need to cover with an info table
@@ -1208,6 +1216,9 @@ run_BCO:
            LOAD_STACK_POINTERS;
            Sp += ret_dyn_size;
            
+           // Save the Haskell thread's current value of errno
+           cap->r.rCurrentTSO->saved_errno = errno;
+               
 #ifdef RTS_SUPPORTS_THREADS
            // Threaded RTS:
            // Copy the "arguments", which might include a return value,
index 8ec5972..6753876 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Linker.c,v 1.129 2003/09/11 15:12:25 wolfgang Exp $
+ * $Id: Linker.c,v 1.130 2003/09/21 22:20:54 wolfgang Exp $
  *
  * (c) The GHC Team, 2000-2003
  *
@@ -381,6 +381,7 @@ typedef struct _RtsSymbolVal {
       SymX(int2Integerzh_fast)                 \
       SymX(integer2Intzh_fast)                 \
       SymX(integer2Wordzh_fast)                        \
+      SymX(isCurrentThreadBoundzh_fast)                \
       SymX(isDoubleDenormalized)               \
       SymX(isDoubleInfinite)                   \
       SymX(isDoubleNaN)                                \
@@ -422,6 +423,7 @@ typedef struct _RtsSymbolVal {
       SymX(rts_eval)                           \
       SymX(rts_evalIO)                         \
       SymX(rts_evalLazyIO)                     \
+      SymX(rts_evalStableIO)                   \
       SymX(rts_eval_)                          \
       SymX(rts_getBool)                                \
       SymX(rts_getChar)                                \
@@ -455,6 +457,7 @@ typedef struct _RtsSymbolVal {
       SymX(rts_mkWord64)                       \
       SymX(rts_mkWord8)                                \
       SymX(rts_unlock)                         \
+      SymX(rtsSupportsBoundThreads)            \
       SymX(run_queue_hd)                       \
       SymX(setProgArgv)                                \
       SymX(startupHaskell)                     \
index a651eaa..6029921 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Main.c,v 1.39 2003/07/10 08:02:29 simonpj Exp $
+ * $Id: Main.c,v 1.40 2003/09/21 22:20:55 wolfgang Exp $
  *
  * (c) The GHC Team 1998-2000
  *
@@ -105,7 +105,9 @@ int main(int argc, char *argv[])
 #  else /* !PAR && !GRAN */
 
     /* ToDo: want to start with a larger stack size */
-    status = rts_mainLazyIO((HaskellObj)mainIO_closure, NULL);
+    rts_lock();
+    status = rts_evalLazyIO((HaskellObj)mainIO_closure, NULL);
+    rts_unlock();
 
 #  endif /* !PAR && !GRAN */
 
index 1e39b6f..6a75b87 100644 (file)
@@ -110,11 +110,17 @@ ifeq "$(way)" "mp"
 SRC_HC_OPTS += -I$$PVM_ROOT/include
 endif
 
-# Currently, you only get 'threads support' in the normal
-# way.
+# You get 'threads support' in the normal
+# and profiling ways.
 ifeq "$(GhcRtsThreaded)" "YES"
 ifeq "$(way)" ""
 SRC_CC_OPTS += -DTHREADED_RTS
+SRC_HC_OPTS += -optc-DTHREADED_RTS
+PACKAGE_CPP_OPTS += -DTHREADED_RTS
+endif
+ifeq "$(way)" "p"
+SRC_CC_OPTS += -DTHREADED_RTS
+SRC_HC_OPTS += -optc-DTHREADED_RTS
 PACKAGE_CPP_OPTS += -DTHREADED_RTS
 endif
 endif
index 53fabf6..9b3f3a4 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: PrimOps.hc,v 1.112 2003/09/12 16:32:13 sof Exp $
+ * $Id: PrimOps.hc,v 1.113 2003/09/21 22:20:55 wolfgang Exp $
  *
  * (c) The GHC Team, 1998-2002
  *
@@ -1095,6 +1095,15 @@ FN_(labelThreadzh_fast)
   FE_
 }
 
+FN_(isCurrentThreadBoundzh_fast)
+{
+  /* no args */
+  I_ r;
+  FB_
+  r = (I_)(RET_STGCALL1(StgBool, isThreadBound, CurrentTSO));
+  RET_N(r);
+  FE_
+}
 
 /* -----------------------------------------------------------------------------
  * MVar primitives
@@ -1736,3 +1745,4 @@ FN_(asyncDoProczh_fast)
   FE_
 }
 #endif
+
index 3236d1e..651a497 100644 (file)
@@ -1,5 +1,5 @@
 /* ----------------------------------------------------------------------------
- * $Id: RtsAPI.c,v 1.45 2003/08/28 16:33:42 simonmar Exp $
+ * $Id: RtsAPI.c,v 1.46 2003/09/21 22:20:56 wolfgang Exp $
  *
  * (c) The GHC Team, 1998-2001
  *
@@ -21,6 +21,8 @@
 
 #include <stdlib.h>
 
+static Capability *rtsApiCapability = NULL;
+
 /* ----------------------------------------------------------------------------
    Building Haskell objects from C datatypes.
    ------------------------------------------------------------------------- */
@@ -385,7 +387,7 @@ rts_eval (HaskellObj p, /*out*/HaskellObj *ret)
     StgTSO *tso;
 
     tso = createGenThread(RtsFlags.GcFlags.initialStkSize, p);
-    return scheduleWaitThread(tso,ret);
+    return scheduleWaitThread(tso,ret,rtsApiCapability);
 }
 
 SchedulerStatus
@@ -394,7 +396,7 @@ rts_eval_ (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret)
     StgTSO *tso;
     
     tso = createGenThread(stack_size, p);
-    return scheduleWaitThread(tso,ret);
+    return scheduleWaitThread(tso,ret,rtsApiCapability);
 }
 
 /*
@@ -407,22 +409,7 @@ rts_evalIO (HaskellObj p, /*out*/HaskellObj *ret)
     StgTSO* tso; 
     
     tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p);
-    return scheduleWaitThread(tso,ret);
-}
-
-/*
- * Identical to rts_evalLazyIO(), but won't create a new task/OS thread
- * to evaluate the Haskell thread. Used by main() only. Hack.
- */
-SchedulerStatus
-rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret)
-{
-    StgTSO* tso;
-    
-    tso = createIOThread(RtsFlags.GcFlags.initialStkSize, p);
-    scheduleThread(tso);
-    return waitThread(tso, ret);
+    return scheduleWaitThread(tso,ret,rtsApiCapability);
 }
 
 /*
@@ -440,9 +427,9 @@ rts_evalStableIO (HsStablePtr s, /*out*/HsStablePtr *ret)
     
     p = (StgClosure *)deRefStablePtr(s);
     tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p);
-    stat = scheduleWaitThread(tso,&r);
+    stat = scheduleWaitThread(tso,&r,rtsApiCapability);
 
-    if (stat == Success) {
+    if (stat == Success && ret != NULL) {
        ASSERT(r != NULL);
        *ret = getStablePtr((StgPtr)r);
     }
@@ -454,12 +441,21 @@ rts_evalStableIO (HsStablePtr s, /*out*/HsStablePtr *ret)
  * Like rts_evalIO(), but doesn't force the action's result.
  */
 SchedulerStatus
-rts_evalLazyIO (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret)
+rts_evalLazyIO (HaskellObj p, /*out*/HaskellObj *ret)
+{
+    StgTSO *tso;
+
+    tso = createIOThread(RtsFlags.GcFlags.initialStkSize, p);
+    return scheduleWaitThread(tso,ret,rtsApiCapability);
+}
+
+SchedulerStatus
+rts_evalLazyIO_ (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret)
 {
     StgTSO *tso;
 
     tso = createIOThread(stack_size, p);
-    return scheduleWaitThread(tso,ret);
+    return scheduleWaitThread(tso,ret,rtsApiCapability);
 }
 
 /* Convenience function for decoding the returned status. */
@@ -486,18 +482,13 @@ void
 rts_lock()
 {
 #ifdef RTS_SUPPORTS_THREADS
-       Capability *cap;
        ACQUIRE_LOCK(&sched_mutex);
        
                // we request to get the capability immediately, in order to
                // a) stop other threads from using allocate()
                // b) wake the current worker thread from awaitEvent()
                //       (so that a thread started by rts_eval* will start immediately)
-       grabReturnCapability(&sched_mutex,&cap);
-       
-               // now that we have the capability, we don't need it anymore
-               // (other threads will continue to run as soon as we release the sched_mutex)
-       releaseCapability(cap);
+       grabReturnCapability(&sched_mutex,&rtsApiCapability);
        
                // In the RTS hasn't been entered yet,
                // start a RTS task.
@@ -511,6 +502,7 @@ void
 rts_unlock()
 {
 #ifdef RTS_SUPPORTS_THREADS
+       rtsApiCapability = NULL;
        RELEASE_LOCK(&sched_mutex);
 #endif
 }
index c58584f..d29b6bb 100644 (file)
@@ -1,5 +1,5 @@
 /* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.173 2003/08/15 12:43:57 simonmar Exp $
+ * $Id: Schedule.c,v 1.174 2003/09/21 22:20:56 wolfgang Exp $
  *
  * (c) The GHC Team, 1998-2000
  *
 #include <stdlib.h>
 #include <stdarg.h>
 
+#ifdef HAVE_ERRNO_H
+#include <errno.h>
+#endif
+
 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
 //@subsection Variables and Data structures
 
  */
 StgMainThread *main_threads = NULL;
 
-#ifdef THREADED_RTS
-// Pointer to the thread that executes main
-// When this thread is finished, the program terminates
-// by calling shutdownHaskellAndExit.
-// It would be better to add a call to shutdownHaskellAndExit
-// to the Main.main wrapper and to remove this hack.
-StgMainThread *main_main_thread = NULL;
-#endif
-
 /* Thread queues.
  * Locks required: sched_mutex.
  */
@@ -249,7 +244,7 @@ static rtsBool shutting_down_scheduler = rtsFalse;
 
 void            addToBlockedQueue ( StgTSO *tso );
 
-static void     schedule          ( void );
+static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
        void     interruptStgRts   ( void );
 
 static void     detectBlackHoles  ( void );
@@ -311,7 +306,7 @@ static void taskStart(void);
 static void
 taskStart(void)
 {
-  schedule();
+  schedule(NULL,NULL);
 }
 #endif
 
@@ -363,10 +358,10 @@ startSchedulerTask(void)
    ------------------------------------------------------------------------ */
 //@cindex schedule
 static void
-schedule( void )
+schedule( StgMainThread *mainThread, Capability *initialCapability )
 {
   StgTSO *t;
-  Capability *cap;
+  Capability *cap = initialCapability;
   StgThreadReturnCode ret;
 #if defined(GRAN)
   rtsEvent *event;
@@ -386,8 +381,16 @@ schedule( void )
   ACQUIRE_LOCK(&sched_mutex);
  
 #if defined(RTS_SUPPORTS_THREADS)
-  waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
-  IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): entering RTS", osThreadId()));
+  /* in the threaded case, the capability is either passed in via the initialCapability
+     parameter, or initialized inside the scheduler loop */
+
+  IF_DEBUG(scheduler,
+    fprintf(stderr,"### NEW SCHEDULER LOOP in os thread %u(%p)\n",
+           osThreadId(), osThreadId()));
+  IF_DEBUG(scheduler,
+    fprintf(stderr,"### main thread: %p\n",mainThread));
+  IF_DEBUG(scheduler,
+    fprintf(stderr,"### initial cap: %p\n",initialCapability));
 #else
   /* simply initialise it in the non-threaded case */
   grabCapability(&cap);
@@ -431,8 +434,19 @@ schedule( void )
 #if defined(RTS_SUPPORTS_THREADS)
     /* Check to see whether there are any worker threads
        waiting to deposit external call results. If so,
-       yield our capability */
-    yieldToReturningWorker(&sched_mutex, &cap);
+       yield our capability... if we have a capability, that is. */
+    if(cap)
+      yieldToReturningWorker(&sched_mutex, &cap,
+         mainThread ? &mainThread->bound_thread_cond : NULL);
+
+    /* If we do not currently hold a capability, we wait for one */
+    if(!cap)
+    {
+      waitForWorkCapability(&sched_mutex, &cap,
+         mainThread ? &mainThread->bound_thread_cond : NULL);
+      IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): got cap",
+                                     osThreadId()));
+    }
 #endif
 
     /* If we're interrupted (the user pressed ^C, or some other
@@ -463,55 +477,63 @@ schedule( void )
      */
 #if defined(RTS_SUPPORTS_THREADS)
     { 
-      StgMainThread *m, **prev;
-      prev = &main_threads;
-      for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
-       switch (m->tso->what_next) {
-       case ThreadComplete:
-         if (m->ret) {
-              // NOTE: return val is tso->sp[1] (see StgStartup.hc)
-             *(m->ret) = (StgClosure *)m->tso->sp[1]; 
-         }
-         *prev = m->link;
-         m->stat = Success;
-         broadcastCondition(&m->wakeup);
-#ifdef DEBUG
-         removeThreadLabel((StgWord)m->tso);
-#endif
-          if(m == main_main_thread)
-          {
-              releaseCapability(cap);
-              startTask(taskStart);    // thread-safe-call to shutdownHaskellAndExit
-              RELEASE_LOCK(&sched_mutex);
-              shutdownHaskellAndExit(EXIT_SUCCESS);
-          }
-         break;
-       case ThreadKilled:
-         if (m->ret) *(m->ret) = NULL;
-         *prev = m->link;
-         if (was_interrupted) {
-           m->stat = Interrupted;
-         } else {
-           m->stat = Killed;
-         }
-         broadcastCondition(&m->wakeup);
+       StgMainThread *m, **prev;
+       prev = &main_threads;
+       for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
+         if (m->tso->what_next == ThreadComplete
+             || m->tso->what_next == ThreadKilled)
+         {
+           if(m == mainThread)
+           {
+              if(m->tso->what_next == ThreadComplete)
+              {
+                if (m->ret)
+                {
+                  // NOTE: return val is tso->sp[1] (see StgStartup.hc)
+                  *(m->ret) = (StgClosure *)m->tso->sp[1]; 
+                }
+                m->stat = Success;
+              }
+              else
+              {
+                if (m->ret)
+                {
+                  *(m->ret) = NULL;
+                }
+                if (was_interrupted)
+                {
+                  m->stat = Interrupted;
+                }
+                else
+                {
+                  m->stat = Killed;
+                }
+              }
+              *prev = m->link;
+           
 #ifdef DEBUG
-         removeThreadLabel((StgWord)m->tso);
+             removeThreadLabel((StgWord)m->tso);
 #endif
-          if(m == main_main_thread)
-          {
               releaseCapability(cap);
-              startTask(taskStart);    // thread-safe-call to shutdownHaskellAndExit
               RELEASE_LOCK(&sched_mutex);
-              shutdownHaskellAndExit(EXIT_SUCCESS);
+              return;
+            }
+            else
+            {
+                // The current OS thread can not handle the fact that the Haskell
+                // thread "m" has ended. 
+                // "m" is bound; the scheduler loop in it's bound OS thread has
+                // to return, so let's pass our capability directly to that thread.
+              passCapability(&sched_mutex, cap, &m->bound_thread_cond);
+              cap = NULL;
+            }
           }
-         break;
-       default:
-         break;
        }
-      }
     }
-
+    
+    if(!cap)   // If we gave our capability away,
+      continue;        // go to the top to get it back
+      
 #else /* not threaded */
 
 # if defined(PAR)
@@ -1067,6 +1089,63 @@ schedule( void )
     IF_DEBUG(sanity,checkTSO(t));
 #endif
 
+#ifdef THREADED_RTS
+    {
+      StgMainThread *m;
+      for(m = main_threads; m; m = m->link)
+      {
+       if(m->tso == t)
+         break;
+      }
+      
+      if(m)
+      {
+       if(m == mainThread)
+       {
+         IF_DEBUG(scheduler,
+           fprintf(stderr,"### Running TSO %p in bound OS thread %u\n",
+                   t, osThreadId()));
+         // yes, the Haskell thread is bound to the current native thread
+       }
+       else
+       {
+         IF_DEBUG(scheduler,
+           fprintf(stderr,"### TSO %p bound to other OS thread than %u\n",
+                   t, osThreadId()));
+         // no, bound to a different Haskell thread: pass to that thread
+         PUSH_ON_RUN_QUEUE(t);
+         passCapability(&sched_mutex,cap,&m->bound_thread_cond);
+         cap = NULL;
+         continue;
+       }
+      }
+      else
+      {
+        // The thread we want to run is not bound.
+       if(mainThread == NULL)
+       {
+         IF_DEBUG(scheduler,
+           fprintf(stderr,"### Running TSO %p in worker OS thread %u\n",
+                   t, osThreadId()));
+          // if we are a worker thread,
+         // we may run it here
+       }
+       else
+       {
+         IF_DEBUG(scheduler,
+           fprintf(stderr,"### TSO %p is not appropriate for main thread %p in OS thread %u\n",
+                   t, mainThread, osThreadId()));
+         // no, the current native thread is bound to a different
+         // Haskell thread, so pass it to any worker thread
+         PUSH_ON_RUN_QUEUE(t);
+         releaseCapability(cap);
+         cap = NULL;
+         continue; 
+       }
+      }
+    }
+#endif
+
     cap->r.rCurrentTSO = t;
     
     /* context switches are now initiated by the timer signal, unless
@@ -1103,7 +1182,9 @@ run_thread:
        ret = ThreadFinished;
        break;
     case ThreadRunGHC:
+       errno = t->saved_errno;
        ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
+       t->saved_errno = errno;
        break;
     case ThreadInterpret:
        ret = interpretBCO(cap);
@@ -1122,7 +1203,7 @@ run_thread:
     ACQUIRE_LOCK(&sched_mutex);
     
 #ifdef RTS_SUPPORTS_THREADS
-    IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
+    IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %p): ", osThreadId()););
 #elif !defined(GRAN) && !defined(PAR)
     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
 #endif
@@ -1492,19 +1573,54 @@ run_thread:
 }
 
 /* ---------------------------------------------------------------------------
+ * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
+ * used by Control.Concurrent for error checking.
+ * ------------------------------------------------------------------------- */
+StgBool
+rtsSupportsBoundThreads(void)
+{
+#ifdef THREADED_RTS
+  return rtsTrue;
+#else
+  return rtsFalse;
+#endif
+}
+
+/* ---------------------------------------------------------------------------
+ * isThreadBound(tso): check whether tso is bound to an OS thread.
+ * ------------------------------------------------------------------------- */
+StgBool
+isThreadBound(StgTSO* tso)
+{
+#ifdef THREADED_RTS
+  StgMainThread *m;
+  for(m = main_threads; m; m = m->link)
+  {
+    if(m->tso == tso)
+      return rtsTrue;
+  }
+#endif
+  return rtsFalse;
+}
+
+/* ---------------------------------------------------------------------------
  * Singleton fork(). Do not copy any running threads.
  * ------------------------------------------------------------------------- */
 
+static void 
+deleteThreadImmediately(StgTSO *tso);
+
 StgInt
 forkProcess(StgTSO* tso)
 {
 #ifndef mingw32_TARGET_OS
   pid_t pid;
   StgTSO* t,*next;
-  StgMainThread *m;
-  rtsBool doKill;
 
   IF_DEBUG(scheduler,sched_belch("forking!"));
+  ACQUIRE_LOCK(&sched_mutex);
 
   pid = fork();
   if (pid) { /* parent */
@@ -1512,6 +1628,43 @@ forkProcess(StgTSO* tso)
   /* just return the pid */
     
   } else { /* child */
+#ifdef THREADED_RTS
+    /* wipe all other threads */
+    run_queue_hd = run_queue_tl = END_TSO_QUEUE;
+    tso->link = END_TSO_QUEUE;
+    
+    for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+      next = t->link;
+      
+      /* Don't kill the current thread.. */
+      if (t->id == tso->id) {
+       continue;
+      }
+      
+      if (isThreadBound(t)) {
+       // If the thread is bound, the OS thread that the thread is bound to
+       // no longer exists after the fork() system call.
+       // The bound Haskell thread is therefore unable to run at all;
+       // we must not give it a chance to survive by catching the
+       // ThreadKilled exception. So we kill it "brutally" rather than
+       // using deleteThread.
+       deleteThreadImmediately(t);
+      } else {
+       deleteThread(t);
+      }
+    }
+    
+    if (isThreadBound(tso)) {
+    } else {
+      // If the current is not bound, then we should make it so.
+      // The OS thread left over by fork() is special in that the process
+      // will terminate as soon as the thread terminates; 
+      // we'd expect forkProcess to behave similarily.
+      // FIXME - we don't do this.
+    }
+#else
+  StgMainThread *m;
+  rtsBool doKill;
   /* wipe all other threads */
   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
   tso->link = END_TSO_QUEUE;
@@ -1555,7 +1708,9 @@ forkProcess(StgTSO* tso)
       deleteThread(t);
     }
   }
+#endif
   }
+  RELEASE_LOCK(&sched_mutex);
   return pid;
 #else /* mingw32 */
   barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
@@ -1618,6 +1773,7 @@ suspendThread( StgRegTable *reg,
 {
   nat tok;
   Capability *cap;
+  int saved_errno = errno;
 
   /* assume that *reg is a pointer to the StgRegTable part
    * of a Capability.
@@ -1667,6 +1823,8 @@ suspendThread( StgRegTable *reg,
   /* Other threads _might_ be available for execution; signal this */
   THREAD_RUNNABLE();
   RELEASE_LOCK(&sched_mutex);
+  
+  errno = saved_errno;
   return tok; 
 }
 
@@ -1676,6 +1834,7 @@ resumeThread( StgInt tok,
 {
   StgTSO *tso, **prev;
   Capability *cap;
+  int saved_errno = errno;
 
 #if defined(RTS_SUPPORTS_THREADS)
   /* Wait for permission to re-enter the RTS with the result. */
@@ -1717,6 +1876,7 @@ resumeThread( StgInt tok,
 #if defined(RTS_SUPPORTS_THREADS)
   RELEASE_LOCK(&sched_mutex);
 #endif
+  errno = saved_errno;
   return &cap->r;
 }
 
@@ -1845,6 +2005,8 @@ createThread(nat size)
   tso->why_blocked  = NotBlocked;
   tso->blocked_exceptions = NULL;
 
+  tso->saved_errno = 0;
+  
   tso->stack_size   = stack_size;
   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
                               - TSO_STRUCT_SIZEW;
@@ -2016,10 +2178,8 @@ activateSpark (rtsSpark spark)
 }
 #endif
 
-static SchedulerStatus waitThread_(/*out*/StgMainThread* m
-#if defined(THREADED_RTS)
-                                  , rtsBool blockWaiting
-#endif
+static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
+                                  Capability *initialCapability
                                   );
 
 
@@ -2061,7 +2221,7 @@ void scheduleThread(StgTSO* tso)
 }
 
 SchedulerStatus
-scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
+scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCapability)
 {      // Precondition: sched_mutex must be held
   StgMainThread *m;
 
@@ -2071,6 +2231,9 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
   m->stat = NoStatus;
 #if defined(RTS_SUPPORTS_THREADS)
   initCondition(&m->wakeup);
+#if defined(THREADED_RTS)
+  initCondition(&m->bound_thread_cond);
+#endif
 #endif
 
   /* Put the thread on the main-threads list prior to scheduling the TSO.
@@ -2088,11 +2251,8 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
   main_threads = m;
 
   scheduleThread_(tso);
-#if defined(THREADED_RTS)
-  return waitThread_(m, rtsTrue);
-#else
-  return waitThread_(m);
-#endif
+
+  return waitThread_(m, initialCapability);
 }
 
 /* ---------------------------------------------------------------------------
@@ -2259,13 +2419,13 @@ finishAllThreads ( void )
 {
    do {
       while (run_queue_hd != END_TSO_QUEUE) {
-         waitThread ( run_queue_hd, NULL);
+         waitThread ( run_queue_hd, NULL, NULL );
       }
       while (blocked_queue_hd != END_TSO_QUEUE) {
-         waitThread ( blocked_queue_hd, NULL);
+         waitThread ( blocked_queue_hd, NULL, NULL );
       }
       while (sleeping_queue != END_TSO_QUEUE) {
-         waitThread ( blocked_queue_hd, NULL);
+         waitThread ( blocked_queue_hd, NULL, NULL );
       }
    } while 
       (blocked_queue_hd != END_TSO_QUEUE || 
@@ -2274,7 +2434,7 @@ finishAllThreads ( void )
 }
 
 SchedulerStatus
-waitThread(StgTSO *tso, /*out*/StgClosure **ret)
+waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability)
 { 
   StgMainThread *m;
   SchedulerStatus stat;
@@ -2285,6 +2445,9 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret)
   m->stat = NoStatus;
 #if defined(RTS_SUPPORTS_THREADS)
   initCondition(&m->wakeup);
+#if defined(THREADED_RTS)
+  initCondition(&m->bound_thread_cond);
+#endif
 #endif
 
   /* see scheduleWaitThread() comment */
@@ -2293,45 +2456,25 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret)
   main_threads = m;
 
   IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
-#if defined(THREADED_RTS)
-  stat = waitThread_(m, rtsFalse);
-#else
-  stat = waitThread_(m);
-#endif
+
+  stat = waitThread_(m,initialCapability);
+  
   RELEASE_LOCK(&sched_mutex);
   return stat;
 }
 
 static
 SchedulerStatus
-waitThread_(StgMainThread* m
-#if defined(THREADED_RTS)
-           , rtsBool blockWaiting
-#endif
-          )
+waitThread_(StgMainThread* m, Capability *initialCapability)
 {
   SchedulerStatus stat;
 
   // Precondition: sched_mutex must be held.
   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
 
-#if defined(RTS_SUPPORTS_THREADS)
-
-# if defined(THREADED_RTS)
-  if (!blockWaiting) {
-    /* In the threaded case, the OS thread that called main()
-     * gets to enter the RTS directly without going via another
-     * task/thread.
-     */
-    main_main_thread = m;
-    RELEASE_LOCK(&sched_mutex);
-    schedule();
-    ACQUIRE_LOCK(&sched_mutex);
-    main_main_thread = NULL;
-    ASSERT(m->stat != NoStatus);
-  } else 
-# endif
-  {
+#if defined(RTS_SUPPORTS_THREADS) && !defined(THREADED_RTS)
+  {    // FIXME: does this still make sense?
+       // It's not for the threaded rts => SMP only
     do {
       waitCondition(&m->wakeup, &sched_mutex);
     } while (m->stat == NoStatus);
@@ -2343,10 +2486,11 @@ waitThread_(StgMainThread* m
   CurrentProc = MainProc;             // PE to run it on
 
   RELEASE_LOCK(&sched_mutex);
-  schedule();
+  schedule(m,initialCapability);
 #else
   RELEASE_LOCK(&sched_mutex);
-  schedule();
+  schedule(m,initialCapability);
+  ACQUIRE_LOCK(&sched_mutex);
   ASSERT(m->stat != NoStatus);
 #endif
 
@@ -2354,6 +2498,9 @@ waitThread_(StgMainThread* m
 
 #if defined(RTS_SUPPORTS_THREADS)
   closeCondition(&m->wakeup);
+#if defined(THREADED_RTS)
+  closeCondition(&m->bound_thread_cond);
+#endif
 #endif
 
   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
@@ -3327,6 +3474,18 @@ deleteThread(StgTSO *tso)
   raiseAsync(tso,NULL);
 }
 
+static void 
+deleteThreadImmediately(StgTSO *tso)
+{ // for forkProcess only:
+  // delete thread without giving it a chance to catch the KillThread exception
+
+  if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
+      return;
+  }
+  unblockThread(tso);
+  tso->what_next = ThreadKilled;
+}
+
 void
 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
 {
index b2a07e4..fccac3c 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Schedule.h,v 1.38 2003/03/17 14:47:48 simonmar Exp $
+ * $Id: Schedule.h,v 1.39 2003/09/21 22:20:56 wolfgang Exp $
  *
  * (c) The GHC Team 1998-1999
  *
@@ -149,6 +149,8 @@ extern nat         rts_n_waiting_workers;
 extern nat         rts_n_waiting_tasks;
 #endif
 
+StgBool rtsSupportsBoundThreads(void);
+StgBool isThreadBound(StgTSO *tso);
 StgInt forkProcess(StgTSO *tso);
 
 extern SchedulerStatus rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret);
@@ -166,11 +168,13 @@ void resurrectThreads( StgTSO * );
  *
  * These are the threads which clients have requested that we run.  
  *
- * In a 'threaded' build, we might have several concurrent clients all
- * waiting for results, and each one will wait on a condition variable
- * until the result is available.
+ * In a 'threaded' build, each of these corresponds to one bound thread.
+ * The pointer to the StgMainThread is passed as a parameter to schedule;
+ * this invocation of schedule will always pass this main thread's
+ * bound_thread_cond to waitForkWorkCapability; OS-thread-switching
+ * takes place using passCapability.
  *
- * In non-SMP, clients are strictly nested: the first client calls
+ * In non-threaded builds, clients are strictly nested: the first client calls
  * into the RTS, which might call out again to C with a _ccall_GC, and
  * eventually re-enter the RTS.
  *
@@ -188,7 +192,6 @@ typedef struct StgMainThread_ {
 #if defined(RTS_SUPPORTS_THREADS)
   Condition        wakeup;
 #if defined(THREADED_RTS)
-  rtsBool          thread_bound;
   Condition        bound_thread_cond;
 #endif
 #endif
index fe2ae74..7201ba1 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Weak.c,v 1.29 2003/03/26 17:43:05 sof Exp $
+ * $Id: Weak.c,v 1.30 2003/09/21 22:20:56 wolfgang Exp $
  *
  * (c) The GHC Team, 1998-1999
  *
@@ -35,28 +35,20 @@ finalizeWeakPointersNow(void)
 {
   StgWeak *w;
   
-#if defined(RTS_SUPPORTS_THREADS)
   rts_lock();
-#endif
   while ((w = weak_ptr_list)) {
     weak_ptr_list = w->link;
     if (w->header.info != &stg_DEAD_WEAK_info) {
        w->header.info = &stg_DEAD_WEAK_info;
        IF_DEBUG(weak,fprintf(stderr,"Finalising weak pointer at %p -> %p\n", w, w->key));
        if (w->finalizer != &stg_NO_FINALIZER_closure) {
-#if defined(RTS_SUPPORTS_THREADS)
-           rts_evalIO(w->finalizer,NULL);
+           rts_evalLazyIO(w->finalizer,NULL);
            rts_unlock();
            rts_lock();
-#else
-           rts_mainLazyIO(w->finalizer,NULL);
-#endif
        }
     }
   }
-#if defined(RTS_SUPPORTS_THREADS)
   rts_unlock();
-#endif
 } 
 
 /*