add numSparks :: IO Int (#4167)
[ghc-base.git] / GHC / Conc.lhs
index 96e00e0..0d17457 100644 (file)
@@ -29,14 +29,18 @@ module GHC.Conc
 
         -- * Forking and suchlike
         , forkIO        -- :: IO a -> IO ThreadId
 
         -- * Forking and suchlike
         , forkIO        -- :: IO a -> IO ThreadId
+        , forkIOUnmasked
         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
+        , forkOnIOUnmasked
         , numCapabilities -- :: Int
         , numCapabilities -- :: Int
+        , numSparks      -- :: IO Int
         , childHandler  -- :: Exception -> IO ()
         , myThreadId    -- :: IO ThreadId
         , killThread    -- :: ThreadId -> IO ()
         , throwTo       -- :: ThreadId -> Exception -> IO ()
         , par           -- :: a -> b -> b
         , pseq          -- :: a -> b -> b
         , childHandler  -- :: Exception -> IO ()
         , myThreadId    -- :: IO ThreadId
         , killThread    -- :: ThreadId -> IO ()
         , throwTo       -- :: ThreadId -> Exception -> IO ()
         , par           -- :: a -> b -> b
         , pseq          -- :: a -> b -> b
+        , runSparks
         , yield         -- :: IO ()
         , labelThread   -- :: ThreadId -> String -> IO ()
 
         , yield         -- :: IO ()
         , labelThread   -- :: ThreadId -> String -> IO ()
 
@@ -49,17 +53,6 @@ module GHC.Conc
         , threadWaitRead        -- :: Int -> IO ()
         , threadWaitWrite       -- :: Int -> IO ()
 
         , threadWaitRead        -- :: Int -> IO ()
         , threadWaitWrite       -- :: Int -> IO ()
 
-        -- * MVars
-        , MVar(..)
-        , newMVar       -- :: a -> IO (MVar a)
-        , newEmptyMVar  -- :: IO (MVar a)
-        , takeMVar      -- :: MVar a -> IO a
-        , putMVar       -- :: MVar a -> a -> IO ()
-        , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
-        , tryPutMVar    -- :: MVar a -> a -> IO Bool
-        , isEmptyMVar   -- :: MVar a -> IO Bool
-        , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
-
         -- * TVars
         , STM(..)
         , atomically    -- :: STM a -> IO a
         -- * TVars
         , STM(..)
         , atomically    -- :: STM a -> IO a
@@ -77,6 +70,7 @@ module GHC.Conc
         , unsafeIOToSTM -- :: IO a -> STM a
 
         -- * Miscellaneous
         , unsafeIOToSTM -- :: IO a -> STM a
 
         -- * Miscellaneous
+        , withMVar
 #ifdef mingw32_HOST_OS
         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
 #ifdef mingw32_HOST_OS
         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
@@ -87,10 +81,13 @@ module GHC.Conc
 #endif
 
 #ifndef mingw32_HOST_OS
 #endif
 
 #ifndef mingw32_HOST_OS
-        , signalHandlerLock
+        , Signal, HandlerFun, setHandler, runHandlers
 #endif
 
         , ensureIOManagerIsRunning
 #endif
 
         , ensureIOManagerIsRunning
+#ifndef mingw32_HOST_OS
+        , syncIOManager
+#endif
 
 #ifdef mingw32_HOST_OS
         , ConsoleEvent(..)
 
 #ifdef mingw32_HOST_OS
         , ConsoleEvent(..)
@@ -110,28 +107,43 @@ import System.Posix.Internals
 import Foreign
 import Foreign.C
 
 import Foreign
 import Foreign.C
 
+#ifdef mingw32_HOST_OS
+import Data.Typeable
+#endif
+
+#ifndef mingw32_HOST_OS
+import Data.Dynamic
+#endif
+import Control.Monad
 import Data.Maybe
 
 import GHC.Base
 import Data.Maybe
 
 import GHC.Base
-import {-# SOURCE #-} GHC.Handle
-import GHC.IOBase
+#ifndef mingw32_HOST_OS
+import GHC.Debug
+#endif
+import {-# SOURCE #-} GHC.IO.Handle ( hFlush )
+import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout )
+import GHC.IO
+import GHC.IO.Exception
+import GHC.Exception
+import GHC.IORef
+import GHC.MVar
 import GHC.Num          ( Num(..) )
 import GHC.Real         ( fromIntegral )
 import GHC.Num          ( Num(..) )
 import GHC.Real         ( fromIntegral )
+#ifndef mingw32_HOST_OS
+import GHC.IOArray
+import GHC.Arr          ( inRange )
+#endif
 #ifdef mingw32_HOST_OS
 import GHC.Real         ( div )
 #ifdef mingw32_HOST_OS
 import GHC.Real         ( div )
-import GHC.Ptr          ( plusPtr, FunPtr(..) )
+import GHC.Ptr
 #endif
 #ifdef mingw32_HOST_OS
 import GHC.Read         ( Read )
 import GHC.Enum         ( Enum )
 #endif
 #endif
 #ifdef mingw32_HOST_OS
 import GHC.Read         ( Read )
 import GHC.Enum         ( Enum )
 #endif
-import GHC.Exception    ( SomeException(..), throw )
 import GHC.Pack         ( packCString# )
 import GHC.Pack         ( packCString# )
-import GHC.Ptr          ( Ptr(..) )
-import GHC.STRef
 import GHC.Show         ( Show(..), showString )
 import GHC.Show         ( Show(..), showString )
-import Data.Typeable
-import GHC.Err
 
 infixr 0 `par`, `pseq`
 \end{code}
 
 infixr 0 `par`, `pseq`
 \end{code}
@@ -202,11 +214,11 @@ thread.
 The new thread will be a lightweight thread; if you want to use a foreign
 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
 
 The new thread will be a lightweight thread; if you want to use a foreign
 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
 
-GHC note: the new thread inherits the /blocked/ state of the parent 
-(see 'Control.Exception.block').
+GHC note: the new thread inherits the /masked/ state of the parent 
+(see 'Control.Exception.mask').
 
 The newly created thread has an exception handler that discards the
 
 The newly created thread has an exception handler that discards the
-exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and
+exceptions 'BlockedIndefinitelyOnMVar', 'BlockedIndefinitelyOnSTM', and
 'ThreadKilled', and passes all other exceptions to the uncaught
 exception handler (see 'setUncaughtExceptionHandler').
 -}
 'ThreadKilled', and passes all other exceptions to the uncaught
 exception handler (see 'setUncaughtExceptionHandler').
 -}
@@ -216,6 +228,11 @@ forkIO action = IO $ \ s ->
  where
   action_plus = catchException action childHandler
 
  where
   action_plus = catchException action childHandler
 
+-- | Like 'forkIO', but the child thread is created with asynchronous exceptions
+-- unmasked (see 'Control.Exception.mask').
+forkIOUnmasked :: IO () -> IO ThreadId
+forkIOUnmasked io = forkIO (unsafeUnmask io)
+
 {- |
 Like 'forkIO', but lets you specify on which CPU the thread is
 created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
 {- |
 Like 'forkIO', but lets you specify on which CPU the thread is
 created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
@@ -235,6 +252,11 @@ forkOnIO (I# cpu) action = IO $ \ s ->
  where
   action_plus = catchException action childHandler
 
  where
   action_plus = catchException action childHandler
 
+-- | Like 'forkOnIO', but the child thread is created with
+-- asynchronous exceptions unmasked (see 'Control.Exception.mask').
+forkOnIOUnmasked :: Int -> IO () -> IO ThreadId
+forkOnIOUnmasked cpu io = forkOnIO cpu (unsafeUnmask io)
+
 -- | the value passed to the @+RTS -N@ flag.  This is the number of
 -- Haskell threads that can run truly simultaneously at any given
 -- time, and is typically set to the number of physical CPU cores on
 -- | the value passed to the @+RTS -N@ flag.  This is the number of
 -- Haskell threads that can run truly simultaneously at any given
 -- time, and is typically set to the number of physical CPU cores on
@@ -244,6 +266,10 @@ numCapabilities = unsafePerformIO $  do
                     n <- peek n_capabilities
                     return (fromIntegral n)
 
                     n <- peek n_capabilities
                     return (fromIntegral n)
 
+-- | Returns the number of sparks currently in the local spark pool
+numSparks :: IO Int
+numSparks = IO $ \s -> case numSparks# s of (# s', n #) -> (# s', I# n #)
+
 #if defined(mingw32_HOST_OS) && defined(__PIC__)
 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
 #else
 #if defined(mingw32_HOST_OS) && defined(__PIC__)
 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
 #else
@@ -256,9 +282,9 @@ real_handler :: SomeException -> IO ()
 real_handler se@(SomeException ex) =
   -- ignore thread GC and killThread exceptions:
   case cast ex of
 real_handler se@(SomeException ex) =
   -- ignore thread GC and killThread exceptions:
   case cast ex of
-  Just BlockedOnDeadMVar                -> return ()
+  Just BlockedIndefinitelyOnMVar        -> return ()
   _ -> case cast ex of
   _ -> case cast ex of
-       Just BlockedIndefinitely         -> return ()
+       Just BlockedIndefinitelyOnSTM    -> return ()
        _ -> case cast ex of
             Just ThreadKilled           -> return ()
             _ -> case cast ex of
        _ -> case cast ex of
             Just ThreadKilled           -> return ()
             _ -> case cast ex of
@@ -266,16 +292,11 @@ real_handler se@(SomeException ex) =
                  Just StackOverflow     -> reportStackOverflow
                  _                      -> reportError se
 
                  Just StackOverflow     -> reportStackOverflow
                  _                      -> reportError se
 
-{- | 'killThread' terminates the given thread (GHC only).
-Any work already done by the thread isn\'t
-lost: the computation is suspended until required by another thread.
-The memory used by the thread will be garbage collected if it isn\'t
-referenced from anywhere.  The 'killThread' function is defined in
-terms of 'throwTo':
+{- | 'killThread' raises the 'ThreadKilled' exception in the given
+thread (GHC only). 
 
 > killThread tid = throwTo tid ThreadKilled
 
 
 > killThread tid = throwTo tid ThreadKilled
 
-Killthread is a no-op if the target thread has already completed.
 -}
 killThread :: ThreadId -> IO ()
 killThread tid = throwTo tid ThreadKilled
 -}
 killThread :: ThreadId -> IO ()
 killThread tid = throwTo tid ThreadKilled
@@ -290,26 +311,36 @@ when dealing with race conditions: eg. if there are two threads that
 can kill each other, it is guaranteed that only one of the threads
 will get to kill the other.
 
 can kill each other, it is guaranteed that only one of the threads
 will get to kill the other.
 
+Whatever work the target thread was doing when the exception was
+raised is not lost: the computation is suspended until required by
+another thread.
+
 If the target thread is currently making a foreign call, then the
 exception will not be raised (and hence 'throwTo' will not return)
 until the call has completed.  This is the case regardless of whether
 If the target thread is currently making a foreign call, then the
 exception will not be raised (and hence 'throwTo' will not return)
 until the call has completed.  This is the case regardless of whether
-the call is inside a 'block' or not.
+the call is inside a 'mask' or not.
 
 Important note: the behaviour of 'throwTo' differs from that described in
 the paper \"Asynchronous exceptions in Haskell\"
 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
 a more synchronous design in which 'throwTo' does not return until the exception
 
 Important note: the behaviour of 'throwTo' differs from that described in
 the paper \"Asynchronous exceptions in Haskell\"
 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
 a more synchronous design in which 'throwTo' does not return until the exception
-is received by the target thread.  The trade-off is discussed in Section 8 of the paper.
-Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
-the paper).
-
-There is currently no guarantee that the exception delivered by 'throwTo' will be
-delivered at the first possible opportunity.  In particular, if a thread may 
-unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
-a pending 'throwTo'.  This is arguably undesirable behaviour.
-
- -}
+is received by the target thread.  The trade-off is discussed in Section 9 of the paper.
+Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
+the paper).  Unlike other interruptible operations, however, 'throwTo'
+is /always/ interruptible, even if it does not actually block.
+
+There is no guarantee that the exception will be delivered promptly,
+although the runtime will endeavour to ensure that arbitrary
+delays don't occur.  In GHC, an exception can only be raised when a
+thread reaches a /safe point/, where a safe point is where memory
+allocation occurs.  Some loops do not perform any memory allocation
+inside the loop and therefore cannot be interrupted by a 'throwTo'.
+
+Blocked 'throwTo' is fair: if multiple threads are trying to throw an
+exception to the same target thread, they will succeed in FIFO order.
+
+  -}
 throwTo :: Exception e => ThreadId -> e -> IO ()
 throwTo (ThreadId tid) ex = IO $ \ s ->
    case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
 throwTo :: Exception e => ThreadId -> e -> IO ()
 throwTo (ThreadId tid) ex = IO $ \ s ->
    case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
@@ -340,8 +371,8 @@ Other applications like the graphical Concurrent Haskell Debugger
 
 labelThread :: ThreadId -> String -> IO ()
 labelThread (ThreadId t) str = IO $ \ s ->
 
 labelThread :: ThreadId -> String -> IO ()
 labelThread (ThreadId t) str = IO $ \ s ->
-   let ps  = packCString# str
-       adr = byteArrayContents# ps in
+   let !ps  = packCString# str
+       !adr = byteArrayContents# ps in
      case (labelThread# t adr s) of s1 -> (# s1, () #)
 
 --      Nota Bene: 'pseq' used to be 'seq'
      case (labelThread# t adr s) of s1 -> (# s1, () #)
 
 --      Nota Bene: 'pseq' used to be 'seq'
@@ -363,6 +394,13 @@ pseq  x y = x `seq` lazy y
 par :: a -> b -> b
 par  x y = case (par# x) of { _ -> lazy y }
 
 par :: a -> b -> b
 par  x y = case (par# x) of { _ -> lazy y }
 
+-- | Internal function used by the RTS to run sparks.
+runSparks :: IO ()
+runSparks = IO loop
+  where loop s = case getSpark# s of
+                   (# s', n, p #) ->
+                      if n ==# 0# then (# s', () #)
+                                  else p `seq` loop s'
 
 data BlockReason
   = BlockedOnMVar
 
 data BlockReason
   = BlockedOnMVar
@@ -456,6 +494,10 @@ thenSTM (STM m) k = STM ( \s ->
 returnSTM :: a -> STM a
 returnSTM x = STM (\s -> (# s, x #))
 
 returnSTM :: a -> STM a
 returnSTM x = STM (\s -> (# s, x #))
 
+instance MonadPlus STM where
+  mzero = retry
+  mplus = orElse
+
 -- | Unsafely performs IO in the STM monad.  Beware: this is a highly
 -- dangerous thing to do.  
 --
 -- | Unsafely performs IO in the STM monad.  Beware: this is a highly
 -- dangerous thing to do.  
 --
@@ -526,7 +568,7 @@ checkInv (STM m) = STM (\s -> (check# m) s)
 -- of those points then the transaction violating it is aborted
 -- and the exception raised by the invariant is propagated.
 alwaysSucceeds :: STM a -> STM ()
 -- of those points then the transaction violating it is aborted
 -- and the exception raised by the invariant is propagated.
 alwaysSucceeds :: STM a -> STM ()
-alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
+alwaysSucceeds i = do ( i >> retry ) `orElse` ( return () ) 
                       checkInv i
 
 -- | always is a variant of alwaysSucceeds in which the invariant is
                       checkInv i
 
 -- | always is a variant of alwaysSucceeds in which the invariant is
@@ -581,119 +623,27 @@ writeTVar (TVar tvar#) val = STM $ \s1# ->
   
 \end{code}
 
   
 \end{code}
 
-%************************************************************************
-%*                                                                      *
-\subsection[mvars]{M-Structures}
-%*                                                                      *
-%************************************************************************
-
-M-Vars are rendezvous points for concurrent threads.  They begin
-empty, and any attempt to read an empty M-Var blocks.  When an M-Var
-is written, a single blocked thread may be freed.  Reading an M-Var
-toggles its state from full back to empty.  Therefore, any value
-written to an M-Var may only be read once.  Multiple reads and writes
-are allowed, but there must be at least one read between any two
-writes.
+MVar utilities
 
 \begin{code}
 
 \begin{code}
---Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
-
--- |Create an 'MVar' which is initially empty.
-newEmptyMVar  :: IO (MVar a)
-newEmptyMVar = IO $ \ s# ->
-    case newMVar# s# of
-         (# s2#, svar# #) -> (# s2#, MVar svar# #)
-
--- |Create an 'MVar' which contains the supplied value.
-newMVar :: a -> IO (MVar a)
-newMVar value =
-    newEmptyMVar        >>= \ mvar ->
-    putMVar mvar value  >>
-    return mvar
-
--- |Return the contents of the 'MVar'.  If the 'MVar' is currently
--- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
--- the 'MVar' is left empty.
--- 
--- There are two further important properties of 'takeMVar':
---
---   * 'takeMVar' is single-wakeup.  That is, if there are multiple
---     threads blocked in 'takeMVar', and the 'MVar' becomes full,
---     only one thread will be woken up.  The runtime guarantees that
---     the woken thread completes its 'takeMVar' operation.
---
---   * When multiple threads are blocked on an 'MVar', they are
---     woken up in FIFO order.  This is useful for providing
---     fairness properties of abstractions built using 'MVar's.
---
-takeMVar :: MVar a -> IO a
-takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
-
--- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
--- 'putMVar' will wait until it becomes empty.
---
--- There are two further important properties of 'putMVar':
---
---   * 'putMVar' is single-wakeup.  That is, if there are multiple
---     threads blocked in 'putMVar', and the 'MVar' becomes empty,
---     only one thread will be woken up.  The runtime guarantees that
---     the woken thread completes its 'putMVar' operation.
---
---   * When multiple threads are blocked on an 'MVar', they are
---     woken up in FIFO order.  This is useful for providing
---     fairness properties of abstractions built using 'MVar's.
---
-putMVar  :: MVar a -> a -> IO ()
-putMVar (MVar mvar#) x = IO $ \ s# ->
-    case putMVar# mvar# x s# of
-        s2# -> (# s2#, () #)
-
--- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
--- returns immediately, with 'Nothing' if the 'MVar' was empty, or
--- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
--- the 'MVar' is left empty.
-tryTakeMVar :: MVar a -> IO (Maybe a)
-tryTakeMVar (MVar m) = IO $ \ s ->
-    case tryTakeMVar# m s of
-        (# s', 0#, _ #) -> (# s', Nothing #)      -- MVar is empty
-        (# s', _,  a #) -> (# s', Just a  #)      -- MVar is full
-
--- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
--- attempts to put the value @a@ into the 'MVar', returning 'True' if
--- it was successful, or 'False' otherwise.
-tryPutMVar  :: MVar a -> a -> IO Bool
-tryPutMVar (MVar mvar#) x = IO $ \ s# ->
-    case tryPutMVar# mvar# x s# of
-        (# s, 0# #) -> (# s, False #)
-        (# s, _  #) -> (# s, True #)
-
--- |Check whether a given 'MVar' is empty.
---
--- Notice that the boolean value returned  is just a snapshot of
--- the state of the MVar. By the time you get to react on its result,
--- the MVar may have been filled (or emptied) - so be extremely
--- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
-isEmptyMVar :: MVar a -> IO Bool
-isEmptyMVar (MVar mv#) = IO $ \ s# -> 
-    case isEmptyMVar# mv# s# of
-        (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
-
--- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
--- "System.Mem.Weak" for more about finalizers.
-addMVarFinalizer :: MVar a -> IO () -> IO ()
-addMVarFinalizer (MVar m) finalizer = 
-  IO $ \s -> case mkWeak# m () finalizer s of { (# s1, _ #) -> (# s1, () #) }
-
 withMVar :: MVar a -> (a -> IO b) -> IO b
 withMVar m io = 
 withMVar :: MVar a -> (a -> IO b) -> IO b
 withMVar m io = 
-  block $ do
+  mask $ \restore -> do
     a <- takeMVar m
     a <- takeMVar m
-    b <- catchAny (unblock (io a))
+    b <- catchAny (restore (io a))
             (\e -> do putMVar m a; throw e)
     putMVar m a
     return b
             (\e -> do putMVar m a; throw e)
     putMVar m a
     return b
-\end{code}
 
 
+modifyMVar_ :: MVar a -> (a -> IO a) -> IO ()
+modifyMVar_ m io =
+  mask $ \restore -> do
+    a <- takeMVar m
+    a' <- catchAny (restore (io a))
+            (\e -> do putMVar m a; throw e)
+    putMVar m a'
+    return ()
+\end{code}
 
 %************************************************************************
 %*                                                                      *
 
 %************************************************************************
 %*                                                                      *
@@ -830,23 +780,6 @@ calculateTarget usecs = do
 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
 -- by not having to check for completed IO requests.
 
 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
 -- by not having to check for completed IO requests.
 
--- Issues, possible problems:
---
---      - we might want bound threads to just do the blocking
---        operation rather than communicating with the IO manager
---        thread.  This would prevent simgle-threaded programs which do
---        IO from requiring multiple OS threads.  However, it would also
---        prevent bound threads waiting on IO from being killed or sent
---        exceptions.
---
---      - Apprently exec() doesn't work on Linux in a multithreaded program.
---        I couldn't repeat this.
---
---      - How do we handle signal delivery in the multithreaded RTS?
---
---      - forkProcess will kill the IO manager thread.  Let's just
---        hope we don't need to do any blocking IO between fork & exec.
-
 #ifndef mingw32_HOST_OS
 data IOReq
   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
 #ifndef mingw32_HOST_OS
 data IOReq
   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
@@ -858,25 +791,52 @@ data DelayReq
   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
 
 #ifndef mingw32_HOST_OS
   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
 
 #ifndef mingw32_HOST_OS
+{-# NOINLINE pendingEvents #-}
 pendingEvents :: IORef [IOReq]
 pendingEvents :: IORef [IOReq]
+pendingEvents = unsafePerformIO $ do
+   m <- newIORef []
+   sharedCAF m getOrSetGHCConcPendingEventsStore
+
+foreign import ccall unsafe "getOrSetGHCConcPendingEventsStore"
+    getOrSetGHCConcPendingEventsStore :: Ptr a -> IO (Ptr a)
 #endif
 #endif
-pendingDelays :: IORef [DelayReq]
-        -- could use a strict list or array here
-{-# NOINLINE pendingEvents #-}
+
 {-# NOINLINE pendingDelays #-}
 {-# NOINLINE pendingDelays #-}
-(pendingEvents,pendingDelays) = unsafePerformIO $ do
-  startIOManagerThread
-  reqs <- newIORef []
-  dels <- newIORef []
-  return (reqs, dels)
-        -- the first time we schedule an IO request, the service thread
-        -- will be created (cool, huh?)
+pendingDelays :: IORef [DelayReq]
+pendingDelays = unsafePerformIO $ do
+   m <- newIORef []
+   sharedCAF m getOrSetGHCConcPendingDelaysStore
+
+foreign import ccall unsafe "getOrSetGHCConcPendingDelaysStore"
+    getOrSetGHCConcPendingDelaysStore :: Ptr a -> IO (Ptr a)
+
+{-# NOINLINE ioManagerThread #-}
+ioManagerThread :: MVar (Maybe ThreadId)
+ioManagerThread = unsafePerformIO $ do
+   m <- newMVar Nothing
+   sharedCAF m getOrSetGHCConcIOManagerThreadStore
+
+foreign import ccall unsafe "getOrSetGHCConcIOManagerThreadStore"
+    getOrSetGHCConcIOManagerThreadStore :: Ptr a -> IO (Ptr a)
 
 ensureIOManagerIsRunning :: IO ()
 ensureIOManagerIsRunning 
 
 ensureIOManagerIsRunning :: IO ()
 ensureIOManagerIsRunning 
-  | threaded  = seq pendingEvents $ return ()
+  | threaded  = startIOManagerThread
   | otherwise = return ()
 
   | otherwise = return ()
 
+startIOManagerThread :: IO ()
+startIOManagerThread = do
+  modifyMVar_ ioManagerThread $ \old -> do
+    let create = do t <- forkIO ioManager; return (Just t)
+    case old of
+      Nothing -> create
+      Just t  -> do
+        s <- threadStatus t
+        case s of
+          ThreadFinished -> create
+          ThreadDied     -> create
+          _other         -> return (Just t)
+
 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
 insertDelay d [] = [d]
 insertDelay d1 ds@(d2 : rest)
 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
 insertDelay d [] = [d]
 insertDelay d1 ds@(d2 : rest)
@@ -889,31 +849,52 @@ delayTime (DelaySTM t _) = t
 
 type USecs = Word64
 
 
 type USecs = Word64
 
--- XXX: move into GHC.IOBase from Data.IORef?
-atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
-atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
-
 foreign import ccall unsafe "getUSecOfDay" 
   getUSecOfDay :: IO USecs
 
 foreign import ccall unsafe "getUSecOfDay" 
   getUSecOfDay :: IO USecs
 
-prodding :: IORef Bool
 {-# NOINLINE prodding #-}
 {-# NOINLINE prodding #-}
-prodding = unsafePerformIO (newIORef False)
+prodding :: IORef Bool
+prodding = unsafePerformIO $ do
+   r <- newIORef False
+   sharedCAF r getOrSetGHCConcProddingStore
+
+foreign import ccall unsafe "getOrSetGHCConcProddingStore"
+    getOrSetGHCConcProddingStore :: Ptr a -> IO (Ptr a)
 
 prodServiceThread :: IO ()
 prodServiceThread = do
 
 prodServiceThread :: IO ()
 prodServiceThread = do
-  was_set <- atomicModifyIORef prodding (\a -> (True,a))
-  if (not (was_set)) then wakeupIOManager else return ()
+  -- NB. use atomicModifyIORef here, otherwise there are race
+  -- conditions in which prodding is left at True but the server is
+  -- blocked in select().
+  was_set <- atomicModifyIORef prodding $ \b -> (True,b)
+  unless was_set wakeupIOManager
+
+-- Machinery needed to ensure that we only have one copy of certain
+-- CAFs in this module even when the base package is present twice, as
+-- it is when base is dynamically loaded into GHCi.  The RTS keeps
+-- track of the single true value of the CAF, so even when the CAFs in
+-- the dynamically-loaded base package are reverted, nothing bad
+-- happens.
+--
+sharedCAF :: a -> (Ptr a -> IO (Ptr a)) -> IO a
+sharedCAF a get_or_set =
+   mask_ $ do
+     stable_ref <- newStablePtr a
+     let ref = castPtr (castStablePtrToPtr stable_ref)
+     ref2 <- get_or_set ref
+     if ref==ref2
+        then return a
+        else do freeStablePtr stable_ref
+                deRefStablePtr (castPtrToStablePtr (castPtr ref2))
 
 #ifdef mingw32_HOST_OS
 -- ----------------------------------------------------------------------------
 -- Windows IO manager thread
 
 
 #ifdef mingw32_HOST_OS
 -- ----------------------------------------------------------------------------
 -- Windows IO manager thread
 
-startIOManagerThread :: IO ()
-startIOManagerThread = do
+ioManager :: IO ()
+ioManager = do
   wakeup <- c_getIOManagerEvent
   wakeup <- c_getIOManagerEvent
-  forkIO $ service_loop wakeup []
-  return ()
+  service_loop wakeup []
 
 service_loop :: HANDLE          -- read end of pipe
              -> [DelayReq]      -- current delay requests
 
 service_loop :: HANDLE          -- read end of pipe
              -> [DelayReq]      -- current delay requests
@@ -938,15 +919,14 @@ service_loop wakeup old_delays = do
                 _ | r2 == io_MANAGER_DIE    -> return True
                 0 -> return False -- spurious wakeup
                 _ -> do start_console_handler (r2 `shiftR` 1); return False
                 _ | r2 == io_MANAGER_DIE    -> return True
                 0 -> return False -- spurious wakeup
                 _ -> do start_console_handler (r2 `shiftR` 1); return False
-        if exit
-          then return ()
-          else service_cont wakeup delays'
+        unless exit $ service_cont wakeup delays'
 
     _other -> service_cont wakeup delays' -- probably timeout        
 
 service_cont :: HANDLE -> [DelayReq] -> IO ()
 service_cont wakeup delays = do
 
     _other -> service_cont wakeup delays' -- probably timeout        
 
 service_cont :: HANDLE -> [DelayReq] -> IO ()
 service_cont wakeup delays = do
-  atomicModifyIORef prodding (\_ -> (False,False))
+  r <- atomicModifyIORef prodding (\_ -> (False,False))
+  r `seq` return () -- avoid space leak
   service_loop wakeup delays
 
 -- must agree with rts/win32/ThrIOManager.c
   service_loop wakeup delays
 
 -- must agree with rts/win32/ThrIOManager.c
@@ -967,7 +947,7 @@ start_console_handler :: Word32 -> IO ()
 start_console_handler r =
   case toWin32ConsoleEvent r of
      Just x  -> withMVar win32ConsoleHandler $ \handler -> do
 start_console_handler r =
   case toWin32ConsoleEvent r of
      Just x  -> withMVar win32ConsoleHandler $ \handler -> do
-                    forkIO (handler x)
+                    _ <- forkIO (handler x)
                     return ()
      Nothing -> return ()
 
                     return ()
      Nothing -> return ()
 
@@ -984,15 +964,8 @@ toWin32ConsoleEvent ev =
 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
 
 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
 
--- XXX Is this actually needed?
-stick :: IORef HANDLE
-{-# NOINLINE stick #-}
-stick = unsafePerformIO (newIORef nullPtr)
-
 wakeupIOManager :: IO ()
 wakeupIOManager :: IO ()
-wakeupIOManager = do 
-  _hdl <- readIORef stick
-  c_sendIOManagerEvent io_MANAGER_WAKEUP
+wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP
 
 -- Walk the queue of pending delays, waking up any that have passed
 -- and return the smallest delay to wait for.  The queue of pending
 
 -- Walk the queue of pending delays, waking up any that have passed
 -- and return the smallest delay to wait for.  The queue of pending
@@ -1041,19 +1014,21 @@ foreign import stdcall "WaitForSingleObject"
 -- ----------------------------------------------------------------------------
 -- Unix IO manager thread, using select()
 
 -- ----------------------------------------------------------------------------
 -- Unix IO manager thread, using select()
 
-startIOManagerThread :: IO ()
-startIOManagerThread = do
+ioManager :: IO ()
+ioManager = do
         allocaArray 2 $ \fds -> do
         allocaArray 2 $ \fds -> do
-        throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
+        throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds)
         rd_end <- peekElemOff fds 0
         wr_end <- peekElemOff fds 1
         rd_end <- peekElemOff fds 0
         wr_end <- peekElemOff fds 1
-        writeIORef stick (fromIntegral wr_end)
+        setNonBlockingFD wr_end True -- writes happen in a signal handler, we
+                                     -- don't want them to block.
+        setCloseOnExec rd_end
+        setCloseOnExec wr_end
         c_setIOManagerPipe wr_end
         c_setIOManagerPipe wr_end
-        forkIO $ do
-            allocaBytes sizeofFdSet   $ \readfds -> do
-            allocaBytes sizeofFdSet   $ \writefds -> do 
-            allocaBytes sizeofTimeVal $ \timeval -> do
-            service_loop (fromIntegral rd_end) readfds writefds timeval [] []
+        allocaBytes sizeofFdSet   $ \readfds -> do
+        allocaBytes sizeofFdSet   $ \writefds -> do 
+        allocaBytes sizeofTimeVal $ \timeval -> do
+        service_loop (fromIntegral rd_end) readfds writefds timeval [] []
         return ()
 
 service_loop
         return ()
 
 service_loop
@@ -1066,6 +1041,17 @@ service_loop
    -> IO ()
 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
 
    -> IO ()
 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
 
+  -- reset prodding before we look at the new requests.  If a new
+  -- client arrives after this point they will send a wakup which will
+  -- cause the server to loop around again, so we can be sure to not
+  -- miss any requests.
+  --
+  -- NB. it's important to do this in the *first* iteration of
+  -- service_loop, rather than after calling select(), since a client
+  -- may have set prodding to True without sending a wakeup byte down
+  -- the pipe, because the pipe wasn't set up.
+  atomicModifyIORef prodding (\_ -> (False, ()))
+
   -- pick up new IO requests
   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
   let reqs = new_reqs ++ old_reqs
   -- pick up new IO requests
   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
   let reqs = new_reqs ++ old_reqs
@@ -1114,52 +1100,120 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
         if b == 0 
           then return False
           else alloca $ \p -> do 
         if b == 0 
           then return False
           else alloca $ \p -> do 
-                 c_read (fromIntegral wakeup) p 1; return ()
+                 warnErrnoIfMinus1_ "service_loop" $
+                     c_read (fromIntegral wakeup) p 1
                  s <- peek p            
                  case s of
                   _ | s == io_MANAGER_WAKEUP -> return False
                   _ | s == io_MANAGER_DIE    -> return True
                  s <- peek p            
                  case s of
                   _ | s == io_MANAGER_WAKEUP -> return False
                   _ | s == io_MANAGER_DIE    -> return True
-                  _ -> withMVar signalHandlerLock $ \_ -> do
-                          handler_tbl <- peek handlers
-                          sp <- peekElemOff handler_tbl (fromIntegral s)
-                          io <- deRefStablePtr sp
-                          forkIO io
-                          return False
-
-  if exit then return () else do
-
-  atomicModifyIORef prodding (\_ -> (False,False))
+                  _ | s == io_MANAGER_SYNC   -> do
+                       mvars <- readIORef sync
+                       mapM_ (flip putMVar ()) mvars
+                       return False
+                  _ -> do
+                       fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
+                       withForeignPtr fp $ \p_siginfo -> do
+                         r <- c_read (fromIntegral wakeup) (castPtr p_siginfo)
+                                 sizeof_siginfo_t
+                         when (r /= fromIntegral sizeof_siginfo_t) $
+                            error "failed to read siginfo_t"
+                       runHandlers' fp (fromIntegral s)
+                       return False
+
+  unless exit $ do
 
   reqs' <- if wakeup_all then do wakeupAll reqs; return []
                          else completeRequests reqs readfds writefds []
 
   service_loop wakeup readfds writefds ptimeval reqs' delays'
 
 
   reqs' <- if wakeup_all then do wakeupAll reqs; return []
                          else completeRequests reqs readfds writefds []
 
   service_loop wakeup readfds writefds ptimeval reqs' delays'
 
-io_MANAGER_WAKEUP, io_MANAGER_DIE :: CChar
+io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: Word8
 io_MANAGER_WAKEUP = 0xff
 io_MANAGER_DIE    = 0xfe
 io_MANAGER_WAKEUP = 0xff
 io_MANAGER_DIE    = 0xfe
+io_MANAGER_SYNC   = 0xfd
 
 
-stick :: IORef Fd
-{-# NOINLINE stick #-}
-stick = unsafePerformIO (newIORef 0)
+{-# NOINLINE sync #-}
+sync :: IORef [MVar ()]
+sync = unsafePerformIO (newIORef [])
 
 
-wakeupIOManager :: IO ()
-wakeupIOManager = do
-  fd <- readIORef stick
-  with io_MANAGER_WAKEUP $ \pbuf -> do 
-    c_write (fromIntegral fd) pbuf 1; return ()
-
--- Lock used to protect concurrent access to signal_handlers.  Symptom of
--- this race condition is #1922, although that bug was on Windows a similar
--- bug also exists on Unix.
-signalHandlerLock :: MVar ()
-signalHandlerLock = unsafePerformIO (newMVar ())
+-- waits for the IO manager to drain the pipe
+syncIOManager :: IO ()
+syncIOManager = do
+  m <- newEmptyMVar
+  atomicModifyIORef sync (\old -> (m:old,()))
+  c_ioManagerSync
+  takeMVar m
 
 
-foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
+foreign import ccall unsafe "ioManagerSync"   c_ioManagerSync :: IO ()
+foreign import ccall unsafe "ioManagerWakeup" wakeupIOManager :: IO ()
+
+-- For the non-threaded RTS
+runHandlers :: Ptr Word8 -> Int -> IO ()
+runHandlers p_info sig = do
+  fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
+  withForeignPtr fp $ \p -> do
+    copyBytes p p_info (fromIntegral sizeof_siginfo_t)
+    free p_info
+  runHandlers' fp (fromIntegral sig)
+
+runHandlers' :: ForeignPtr Word8 -> Signal -> IO ()
+runHandlers' p_info sig = do
+  let int = fromIntegral sig
+  withMVar signal_handlers $ \arr ->
+      if not (inRange (boundsIOArray arr) int)
+         then return ()
+         else do handler <- unsafeReadIOArray arr int
+                 case handler of
+                    Nothing -> return ()
+                    Just (f,_)  -> do _ <- forkIO (f p_info)
+                                      return ()
+
+warnErrnoIfMinus1_ :: Num a => String -> IO a -> IO ()
+warnErrnoIfMinus1_ what io
+    = do r <- io
+         when (r == -1) $ do
+             errno <- getErrno
+             str <- strerror errno >>= peekCString
+             when (r == -1) $
+                 debugErrLn ("Warning: " ++ what ++ " failed: " ++ str)
+
+foreign import ccall unsafe "string.h" strerror :: Errno -> IO (Ptr CChar)
 
 foreign import ccall "setIOManagerPipe"
   c_setIOManagerPipe :: CInt -> IO ()
 
 
 foreign import ccall "setIOManagerPipe"
   c_setIOManagerPipe :: CInt -> IO ()
 
+foreign import ccall "__hscore_sizeof_siginfo_t"
+  sizeof_siginfo_t :: CSize
+
+type Signal = CInt
+
+maxSig = 64 :: Int
+
+type HandlerFun = ForeignPtr Word8 -> IO ()
+
+-- Lock used to protect concurrent access to signal_handlers.  Symptom of
+-- this race condition is #1922, although that bug was on Windows a similar
+-- bug also exists on Unix.
+{-# NOINLINE signal_handlers #-}
+signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
+signal_handlers = unsafePerformIO $ do
+   arr <- newIOArray (0,maxSig) Nothing
+   m <- newMVar arr
+   sharedCAF m getOrSetGHCConcSignalHandlerStore
+
+foreign import ccall unsafe "getOrSetGHCConcSignalHandlerStore"
+    getOrSetGHCConcSignalHandlerStore :: Ptr a -> IO (Ptr a)
+
+setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
+setHandler sig handler = do
+  let int = fromIntegral sig
+  withMVar signal_handlers $ \arr -> 
+     if not (inRange (boundsIOArray arr) int)
+        then error "GHC.Conc.setHandler: signal out of range"
+        else do old <- unsafeReadIOArray arr int
+                unsafeWriteIOArray arr int handler
+                return old
+
 -- -----------------------------------------------------------------------------
 -- IO requests
 
 -- -----------------------------------------------------------------------------
 -- IO requests
 
@@ -1250,7 +1304,7 @@ foreign import ccall unsafe "setTimevalTicks"
 
 data CFdSet
 
 
 data CFdSet
 
-foreign import ccall safe "select"
+foreign import ccall safe "__hscore_select"
   c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
            -> IO CInt
 
   c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
            -> IO CInt
 
@@ -1280,14 +1334,13 @@ foreign import ccall unsafe "sizeof_fd_set"
 
 #endif
 
 
 #endif
 
-reportStackOverflow :: IO a
-reportStackOverflow = do callStackOverflowHook; return undefined
+reportStackOverflow :: IO ()
+reportStackOverflow = callStackOverflowHook
 
 
-reportError :: SomeException -> IO a
+reportError :: SomeException -> IO ()
 reportError ex = do
    handler <- getUncaughtExceptionHandler
    handler ex
 reportError ex = do
    handler <- getUncaughtExceptionHandler
    handler ex
-   return undefined
 
 -- SUP: Are the hooks allowed to re-enter Haskell land?  If so, remove
 -- the unsafe below.
 
 -- SUP: Are the hooks allowed to re-enter Haskell land?  If so, remove
 -- the unsafe below.
@@ -1320,4 +1373,5 @@ setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
 
 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler
 
 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler
+
 \end{code}
 \end{code}