add numSparks :: IO Int (#4167)
[ghc-base.git] / GHC / Conc.lhs
index 96e00e0..0d17457 100644 (file)
@@ -29,14 +29,18 @@ module GHC.Conc
 
         -- * Forking and suchlike
         , forkIO        -- :: IO a -> IO ThreadId
+        , forkIOUnmasked
         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
+        , forkOnIOUnmasked
         , numCapabilities -- :: Int
+        , numSparks      -- :: IO Int
         , childHandler  -- :: Exception -> IO ()
         , myThreadId    -- :: IO ThreadId
         , killThread    -- :: ThreadId -> IO ()
         , throwTo       -- :: ThreadId -> Exception -> IO ()
         , par           -- :: a -> b -> b
         , pseq          -- :: a -> b -> b
+        , runSparks
         , yield         -- :: IO ()
         , labelThread   -- :: ThreadId -> String -> IO ()
 
@@ -49,17 +53,6 @@ module GHC.Conc
         , threadWaitRead        -- :: Int -> IO ()
         , threadWaitWrite       -- :: Int -> IO ()
 
-        -- * MVars
-        , MVar(..)
-        , newMVar       -- :: a -> IO (MVar a)
-        , newEmptyMVar  -- :: IO (MVar a)
-        , takeMVar      -- :: MVar a -> IO a
-        , putMVar       -- :: MVar a -> a -> IO ()
-        , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
-        , tryPutMVar    -- :: MVar a -> a -> IO Bool
-        , isEmptyMVar   -- :: MVar a -> IO Bool
-        , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
-
         -- * TVars
         , STM(..)
         , atomically    -- :: STM a -> IO a
@@ -77,6 +70,7 @@ module GHC.Conc
         , unsafeIOToSTM -- :: IO a -> STM a
 
         -- * Miscellaneous
+        , withMVar
 #ifdef mingw32_HOST_OS
         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
@@ -87,10 +81,13 @@ module GHC.Conc
 #endif
 
 #ifndef mingw32_HOST_OS
-        , signalHandlerLock
+        , Signal, HandlerFun, setHandler, runHandlers
 #endif
 
         , ensureIOManagerIsRunning
+#ifndef mingw32_HOST_OS
+        , syncIOManager
+#endif
 
 #ifdef mingw32_HOST_OS
         , ConsoleEvent(..)
@@ -110,28 +107,43 @@ import System.Posix.Internals
 import Foreign
 import Foreign.C
 
+#ifdef mingw32_HOST_OS
+import Data.Typeable
+#endif
+
+#ifndef mingw32_HOST_OS
+import Data.Dynamic
+#endif
+import Control.Monad
 import Data.Maybe
 
 import GHC.Base
-import {-# SOURCE #-} GHC.Handle
-import GHC.IOBase
+#ifndef mingw32_HOST_OS
+import GHC.Debug
+#endif
+import {-# SOURCE #-} GHC.IO.Handle ( hFlush )
+import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout )
+import GHC.IO
+import GHC.IO.Exception
+import GHC.Exception
+import GHC.IORef
+import GHC.MVar
 import GHC.Num          ( Num(..) )
 import GHC.Real         ( fromIntegral )
+#ifndef mingw32_HOST_OS
+import GHC.IOArray
+import GHC.Arr          ( inRange )
+#endif
 #ifdef mingw32_HOST_OS
 import GHC.Real         ( div )
-import GHC.Ptr          ( plusPtr, FunPtr(..) )
+import GHC.Ptr
 #endif
 #ifdef mingw32_HOST_OS
 import GHC.Read         ( Read )
 import GHC.Enum         ( Enum )
 #endif
-import GHC.Exception    ( SomeException(..), throw )
 import GHC.Pack         ( packCString# )
-import GHC.Ptr          ( Ptr(..) )
-import GHC.STRef
 import GHC.Show         ( Show(..), showString )
-import Data.Typeable
-import GHC.Err
 
 infixr 0 `par`, `pseq`
 \end{code}
@@ -202,11 +214,11 @@ thread.
 The new thread will be a lightweight thread; if you want to use a foreign
 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
 
-GHC note: the new thread inherits the /blocked/ state of the parent 
-(see 'Control.Exception.block').
+GHC note: the new thread inherits the /masked/ state of the parent 
+(see 'Control.Exception.mask').
 
 The newly created thread has an exception handler that discards the
-exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and
+exceptions 'BlockedIndefinitelyOnMVar', 'BlockedIndefinitelyOnSTM', and
 'ThreadKilled', and passes all other exceptions to the uncaught
 exception handler (see 'setUncaughtExceptionHandler').
 -}
@@ -216,6 +228,11 @@ forkIO action = IO $ \ s ->
  where
   action_plus = catchException action childHandler
 
+-- | Like 'forkIO', but the child thread is created with asynchronous exceptions
+-- unmasked (see 'Control.Exception.mask').
+forkIOUnmasked :: IO () -> IO ThreadId
+forkIOUnmasked io = forkIO (unsafeUnmask io)
+
 {- |
 Like 'forkIO', but lets you specify on which CPU the thread is
 created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
@@ -235,6 +252,11 @@ forkOnIO (I# cpu) action = IO $ \ s ->
  where
   action_plus = catchException action childHandler
 
+-- | Like 'forkOnIO', but the child thread is created with
+-- asynchronous exceptions unmasked (see 'Control.Exception.mask').
+forkOnIOUnmasked :: Int -> IO () -> IO ThreadId
+forkOnIOUnmasked cpu io = forkOnIO cpu (unsafeUnmask io)
+
 -- | the value passed to the @+RTS -N@ flag.  This is the number of
 -- Haskell threads that can run truly simultaneously at any given
 -- time, and is typically set to the number of physical CPU cores on
@@ -244,6 +266,10 @@ numCapabilities = unsafePerformIO $  do
                     n <- peek n_capabilities
                     return (fromIntegral n)
 
+-- | Returns the number of sparks currently in the local spark pool
+numSparks :: IO Int
+numSparks = IO $ \s -> case numSparks# s of (# s', n #) -> (# s', I# n #)
+
 #if defined(mingw32_HOST_OS) && defined(__PIC__)
 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
 #else
@@ -256,9 +282,9 @@ real_handler :: SomeException -> IO ()
 real_handler se@(SomeException ex) =
   -- ignore thread GC and killThread exceptions:
   case cast ex of
-  Just BlockedOnDeadMVar                -> return ()
+  Just BlockedIndefinitelyOnMVar        -> return ()
   _ -> case cast ex of
-       Just BlockedIndefinitely         -> return ()
+       Just BlockedIndefinitelyOnSTM    -> return ()
        _ -> case cast ex of
             Just ThreadKilled           -> return ()
             _ -> case cast ex of
@@ -266,16 +292,11 @@ real_handler se@(SomeException ex) =
                  Just StackOverflow     -> reportStackOverflow
                  _                      -> reportError se
 
-{- | 'killThread' terminates the given thread (GHC only).
-Any work already done by the thread isn\'t
-lost: the computation is suspended until required by another thread.
-The memory used by the thread will be garbage collected if it isn\'t
-referenced from anywhere.  The 'killThread' function is defined in
-terms of 'throwTo':
+{- | 'killThread' raises the 'ThreadKilled' exception in the given
+thread (GHC only). 
 
 > killThread tid = throwTo tid ThreadKilled
 
-Killthread is a no-op if the target thread has already completed.
 -}
 killThread :: ThreadId -> IO ()
 killThread tid = throwTo tid ThreadKilled
@@ -290,26 +311,36 @@ when dealing with race conditions: eg. if there are two threads that
 can kill each other, it is guaranteed that only one of the threads
 will get to kill the other.
 
+Whatever work the target thread was doing when the exception was
+raised is not lost: the computation is suspended until required by
+another thread.
+
 If the target thread is currently making a foreign call, then the
 exception will not be raised (and hence 'throwTo' will not return)
 until the call has completed.  This is the case regardless of whether
-the call is inside a 'block' or not.
+the call is inside a 'mask' or not.
 
 Important note: the behaviour of 'throwTo' differs from that described in
 the paper \"Asynchronous exceptions in Haskell\"
 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
 a more synchronous design in which 'throwTo' does not return until the exception
-is received by the target thread.  The trade-off is discussed in Section 8 of the paper.
-Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
-the paper).
-
-There is currently no guarantee that the exception delivered by 'throwTo' will be
-delivered at the first possible opportunity.  In particular, if a thread may 
-unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
-a pending 'throwTo'.  This is arguably undesirable behaviour.
-
- -}
+is received by the target thread.  The trade-off is discussed in Section 9 of the paper.
+Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
+the paper).  Unlike other interruptible operations, however, 'throwTo'
+is /always/ interruptible, even if it does not actually block.
+
+There is no guarantee that the exception will be delivered promptly,
+although the runtime will endeavour to ensure that arbitrary
+delays don't occur.  In GHC, an exception can only be raised when a
+thread reaches a /safe point/, where a safe point is where memory
+allocation occurs.  Some loops do not perform any memory allocation
+inside the loop and therefore cannot be interrupted by a 'throwTo'.
+
+Blocked 'throwTo' is fair: if multiple threads are trying to throw an
+exception to the same target thread, they will succeed in FIFO order.
+
+  -}
 throwTo :: Exception e => ThreadId -> e -> IO ()
 throwTo (ThreadId tid) ex = IO $ \ s ->
    case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
@@ -340,8 +371,8 @@ Other applications like the graphical Concurrent Haskell Debugger
 
 labelThread :: ThreadId -> String -> IO ()
 labelThread (ThreadId t) str = IO $ \ s ->
-   let ps  = packCString# str
-       adr = byteArrayContents# ps in
+   let !ps  = packCString# str
+       !adr = byteArrayContents# ps in
      case (labelThread# t adr s) of s1 -> (# s1, () #)
 
 --      Nota Bene: 'pseq' used to be 'seq'
@@ -363,6 +394,13 @@ pseq  x y = x `seq` lazy y
 par :: a -> b -> b
 par  x y = case (par# x) of { _ -> lazy y }
 
+-- | Internal function used by the RTS to run sparks.
+runSparks :: IO ()
+runSparks = IO loop
+  where loop s = case getSpark# s of
+                   (# s', n, p #) ->
+                      if n ==# 0# then (# s', () #)
+                                  else p `seq` loop s'
 
 data BlockReason
   = BlockedOnMVar
@@ -456,6 +494,10 @@ thenSTM (STM m) k = STM ( \s ->
 returnSTM :: a -> STM a
 returnSTM x = STM (\s -> (# s, x #))
 
+instance MonadPlus STM where
+  mzero = retry
+  mplus = orElse
+
 -- | Unsafely performs IO in the STM monad.  Beware: this is a highly
 -- dangerous thing to do.  
 --
@@ -526,7 +568,7 @@ checkInv (STM m) = STM (\s -> (check# m) s)
 -- of those points then the transaction violating it is aborted
 -- and the exception raised by the invariant is propagated.
 alwaysSucceeds :: STM a -> STM ()
-alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
+alwaysSucceeds i = do ( i >> retry ) `orElse` ( return () ) 
                       checkInv i
 
 -- | always is a variant of alwaysSucceeds in which the invariant is
@@ -581,119 +623,27 @@ writeTVar (TVar tvar#) val = STM $ \s1# ->
   
 \end{code}
 
-%************************************************************************
-%*                                                                      *
-\subsection[mvars]{M-Structures}
-%*                                                                      *
-%************************************************************************
-
-M-Vars are rendezvous points for concurrent threads.  They begin
-empty, and any attempt to read an empty M-Var blocks.  When an M-Var
-is written, a single blocked thread may be freed.  Reading an M-Var
-toggles its state from full back to empty.  Therefore, any value
-written to an M-Var may only be read once.  Multiple reads and writes
-are allowed, but there must be at least one read between any two
-writes.
+MVar utilities
 
 \begin{code}
---Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
-
--- |Create an 'MVar' which is initially empty.
-newEmptyMVar  :: IO (MVar a)
-newEmptyMVar = IO $ \ s# ->
-    case newMVar# s# of
-         (# s2#, svar# #) -> (# s2#, MVar svar# #)
-
--- |Create an 'MVar' which contains the supplied value.
-newMVar :: a -> IO (MVar a)
-newMVar value =
-    newEmptyMVar        >>= \ mvar ->
-    putMVar mvar value  >>
-    return mvar
-
--- |Return the contents of the 'MVar'.  If the 'MVar' is currently
--- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
--- the 'MVar' is left empty.
--- 
--- There are two further important properties of 'takeMVar':
---
---   * 'takeMVar' is single-wakeup.  That is, if there are multiple
---     threads blocked in 'takeMVar', and the 'MVar' becomes full,
---     only one thread will be woken up.  The runtime guarantees that
---     the woken thread completes its 'takeMVar' operation.
---
---   * When multiple threads are blocked on an 'MVar', they are
---     woken up in FIFO order.  This is useful for providing
---     fairness properties of abstractions built using 'MVar's.
---
-takeMVar :: MVar a -> IO a
-takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
-
--- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
--- 'putMVar' will wait until it becomes empty.
---
--- There are two further important properties of 'putMVar':
---
---   * 'putMVar' is single-wakeup.  That is, if there are multiple
---     threads blocked in 'putMVar', and the 'MVar' becomes empty,
---     only one thread will be woken up.  The runtime guarantees that
---     the woken thread completes its 'putMVar' operation.
---
---   * When multiple threads are blocked on an 'MVar', they are
---     woken up in FIFO order.  This is useful for providing
---     fairness properties of abstractions built using 'MVar's.
---
-putMVar  :: MVar a -> a -> IO ()
-putMVar (MVar mvar#) x = IO $ \ s# ->
-    case putMVar# mvar# x s# of
-        s2# -> (# s2#, () #)
-
--- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
--- returns immediately, with 'Nothing' if the 'MVar' was empty, or
--- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
--- the 'MVar' is left empty.
-tryTakeMVar :: MVar a -> IO (Maybe a)
-tryTakeMVar (MVar m) = IO $ \ s ->
-    case tryTakeMVar# m s of
-        (# s', 0#, _ #) -> (# s', Nothing #)      -- MVar is empty
-        (# s', _,  a #) -> (# s', Just a  #)      -- MVar is full
-
--- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
--- attempts to put the value @a@ into the 'MVar', returning 'True' if
--- it was successful, or 'False' otherwise.
-tryPutMVar  :: MVar a -> a -> IO Bool
-tryPutMVar (MVar mvar#) x = IO $ \ s# ->
-    case tryPutMVar# mvar# x s# of
-        (# s, 0# #) -> (# s, False #)
-        (# s, _  #) -> (# s, True #)
-
--- |Check whether a given 'MVar' is empty.
---
--- Notice that the boolean value returned  is just a snapshot of
--- the state of the MVar. By the time you get to react on its result,
--- the MVar may have been filled (or emptied) - so be extremely
--- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
-isEmptyMVar :: MVar a -> IO Bool
-isEmptyMVar (MVar mv#) = IO $ \ s# -> 
-    case isEmptyMVar# mv# s# of
-        (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
-
--- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
--- "System.Mem.Weak" for more about finalizers.
-addMVarFinalizer :: MVar a -> IO () -> IO ()
-addMVarFinalizer (MVar m) finalizer = 
-  IO $ \s -> case mkWeak# m () finalizer s of { (# s1, _ #) -> (# s1, () #) }
-
 withMVar :: MVar a -> (a -> IO b) -> IO b
 withMVar m io = 
-  block $ do
+  mask $ \restore -> do
     a <- takeMVar m
-    b <- catchAny (unblock (io a))
+    b <- catchAny (restore (io a))
             (\e -> do putMVar m a; throw e)
     putMVar m a
     return b
-\end{code}
 
+modifyMVar_ :: MVar a -> (a -> IO a) -> IO ()
+modifyMVar_ m io =
+  mask $ \restore -> do
+    a <- takeMVar m
+    a' <- catchAny (restore (io a))
+            (\e -> do putMVar m a; throw e)
+    putMVar m a'
+    return ()
+\end{code}
 
 %************************************************************************
 %*                                                                      *
@@ -830,23 +780,6 @@ calculateTarget usecs = do
 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
 -- by not having to check for completed IO requests.
 
--- Issues, possible problems:
---
---      - we might want bound threads to just do the blocking
---        operation rather than communicating with the IO manager
---        thread.  This would prevent simgle-threaded programs which do
---        IO from requiring multiple OS threads.  However, it would also
---        prevent bound threads waiting on IO from being killed or sent
---        exceptions.
---
---      - Apprently exec() doesn't work on Linux in a multithreaded program.
---        I couldn't repeat this.
---
---      - How do we handle signal delivery in the multithreaded RTS?
---
---      - forkProcess will kill the IO manager thread.  Let's just
---        hope we don't need to do any blocking IO between fork & exec.
-
 #ifndef mingw32_HOST_OS
 data IOReq
   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
@@ -858,25 +791,52 @@ data DelayReq
   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
 
 #ifndef mingw32_HOST_OS
+{-# NOINLINE pendingEvents #-}
 pendingEvents :: IORef [IOReq]
+pendingEvents = unsafePerformIO $ do
+   m <- newIORef []
+   sharedCAF m getOrSetGHCConcPendingEventsStore
+
+foreign import ccall unsafe "getOrSetGHCConcPendingEventsStore"
+    getOrSetGHCConcPendingEventsStore :: Ptr a -> IO (Ptr a)
 #endif
-pendingDelays :: IORef [DelayReq]
-        -- could use a strict list or array here
-{-# NOINLINE pendingEvents #-}
+
 {-# NOINLINE pendingDelays #-}
-(pendingEvents,pendingDelays) = unsafePerformIO $ do
-  startIOManagerThread
-  reqs <- newIORef []
-  dels <- newIORef []
-  return (reqs, dels)
-        -- the first time we schedule an IO request, the service thread
-        -- will be created (cool, huh?)
+pendingDelays :: IORef [DelayReq]
+pendingDelays = unsafePerformIO $ do
+   m <- newIORef []
+   sharedCAF m getOrSetGHCConcPendingDelaysStore
+
+foreign import ccall unsafe "getOrSetGHCConcPendingDelaysStore"
+    getOrSetGHCConcPendingDelaysStore :: Ptr a -> IO (Ptr a)
+
+{-# NOINLINE ioManagerThread #-}
+ioManagerThread :: MVar (Maybe ThreadId)
+ioManagerThread = unsafePerformIO $ do
+   m <- newMVar Nothing
+   sharedCAF m getOrSetGHCConcIOManagerThreadStore
+
+foreign import ccall unsafe "getOrSetGHCConcIOManagerThreadStore"
+    getOrSetGHCConcIOManagerThreadStore :: Ptr a -> IO (Ptr a)
 
 ensureIOManagerIsRunning :: IO ()
 ensureIOManagerIsRunning 
-  | threaded  = seq pendingEvents $ return ()
+  | threaded  = startIOManagerThread
   | otherwise = return ()
 
+startIOManagerThread :: IO ()
+startIOManagerThread = do
+  modifyMVar_ ioManagerThread $ \old -> do
+    let create = do t <- forkIO ioManager; return (Just t)
+    case old of
+      Nothing -> create
+      Just t  -> do
+        s <- threadStatus t
+        case s of
+          ThreadFinished -> create
+          ThreadDied     -> create
+          _other         -> return (Just t)
+
 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
 insertDelay d [] = [d]
 insertDelay d1 ds@(d2 : rest)
@@ -889,31 +849,52 @@ delayTime (DelaySTM t _) = t
 
 type USecs = Word64
 
--- XXX: move into GHC.IOBase from Data.IORef?
-atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
-atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
-
 foreign import ccall unsafe "getUSecOfDay" 
   getUSecOfDay :: IO USecs
 
-prodding :: IORef Bool
 {-# NOINLINE prodding #-}
-prodding = unsafePerformIO (newIORef False)
+prodding :: IORef Bool
+prodding = unsafePerformIO $ do
+   r <- newIORef False
+   sharedCAF r getOrSetGHCConcProddingStore
+
+foreign import ccall unsafe "getOrSetGHCConcProddingStore"
+    getOrSetGHCConcProddingStore :: Ptr a -> IO (Ptr a)
 
 prodServiceThread :: IO ()
 prodServiceThread = do
-  was_set <- atomicModifyIORef prodding (\a -> (True,a))
-  if (not (was_set)) then wakeupIOManager else return ()
+  -- NB. use atomicModifyIORef here, otherwise there are race
+  -- conditions in which prodding is left at True but the server is
+  -- blocked in select().
+  was_set <- atomicModifyIORef prodding $ \b -> (True,b)
+  unless was_set wakeupIOManager
+
+-- Machinery needed to ensure that we only have one copy of certain
+-- CAFs in this module even when the base package is present twice, as
+-- it is when base is dynamically loaded into GHCi.  The RTS keeps
+-- track of the single true value of the CAF, so even when the CAFs in
+-- the dynamically-loaded base package are reverted, nothing bad
+-- happens.
+--
+sharedCAF :: a -> (Ptr a -> IO (Ptr a)) -> IO a
+sharedCAF a get_or_set =
+   mask_ $ do
+     stable_ref <- newStablePtr a
+     let ref = castPtr (castStablePtrToPtr stable_ref)
+     ref2 <- get_or_set ref
+     if ref==ref2
+        then return a
+        else do freeStablePtr stable_ref
+                deRefStablePtr (castPtrToStablePtr (castPtr ref2))
 
 #ifdef mingw32_HOST_OS
 -- ----------------------------------------------------------------------------
 -- Windows IO manager thread
 
-startIOManagerThread :: IO ()
-startIOManagerThread = do
+ioManager :: IO ()
+ioManager = do
   wakeup <- c_getIOManagerEvent
-  forkIO $ service_loop wakeup []
-  return ()
+  service_loop wakeup []
 
 service_loop :: HANDLE          -- read end of pipe
              -> [DelayReq]      -- current delay requests
@@ -938,15 +919,14 @@ service_loop wakeup old_delays = do
                 _ | r2 == io_MANAGER_DIE    -> return True
                 0 -> return False -- spurious wakeup
                 _ -> do start_console_handler (r2 `shiftR` 1); return False
-        if exit
-          then return ()
-          else service_cont wakeup delays'
+        unless exit $ service_cont wakeup delays'
 
     _other -> service_cont wakeup delays' -- probably timeout        
 
 service_cont :: HANDLE -> [DelayReq] -> IO ()
 service_cont wakeup delays = do
-  atomicModifyIORef prodding (\_ -> (False,False))
+  r <- atomicModifyIORef prodding (\_ -> (False,False))
+  r `seq` return () -- avoid space leak
   service_loop wakeup delays
 
 -- must agree with rts/win32/ThrIOManager.c
@@ -967,7 +947,7 @@ start_console_handler :: Word32 -> IO ()
 start_console_handler r =
   case toWin32ConsoleEvent r of
      Just x  -> withMVar win32ConsoleHandler $ \handler -> do
-                    forkIO (handler x)
+                    _ <- forkIO (handler x)
                     return ()
      Nothing -> return ()
 
@@ -984,15 +964,8 @@ toWin32ConsoleEvent ev =
 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
 
--- XXX Is this actually needed?
-stick :: IORef HANDLE
-{-# NOINLINE stick #-}
-stick = unsafePerformIO (newIORef nullPtr)
-
 wakeupIOManager :: IO ()
-wakeupIOManager = do 
-  _hdl <- readIORef stick
-  c_sendIOManagerEvent io_MANAGER_WAKEUP
+wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP
 
 -- Walk the queue of pending delays, waking up any that have passed
 -- and return the smallest delay to wait for.  The queue of pending
@@ -1041,19 +1014,21 @@ foreign import stdcall "WaitForSingleObject"
 -- ----------------------------------------------------------------------------
 -- Unix IO manager thread, using select()
 
-startIOManagerThread :: IO ()
-startIOManagerThread = do
+ioManager :: IO ()
+ioManager = do
         allocaArray 2 $ \fds -> do
-        throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
+        throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds)
         rd_end <- peekElemOff fds 0
         wr_end <- peekElemOff fds 1
-        writeIORef stick (fromIntegral wr_end)
+        setNonBlockingFD wr_end True -- writes happen in a signal handler, we
+                                     -- don't want them to block.
+        setCloseOnExec rd_end
+        setCloseOnExec wr_end
         c_setIOManagerPipe wr_end
-        forkIO $ do
-            allocaBytes sizeofFdSet   $ \readfds -> do
-            allocaBytes sizeofFdSet   $ \writefds -> do 
-            allocaBytes sizeofTimeVal $ \timeval -> do
-            service_loop (fromIntegral rd_end) readfds writefds timeval [] []
+        allocaBytes sizeofFdSet   $ \readfds -> do
+        allocaBytes sizeofFdSet   $ \writefds -> do 
+        allocaBytes sizeofTimeVal $ \timeval -> do
+        service_loop (fromIntegral rd_end) readfds writefds timeval [] []
         return ()
 
 service_loop
@@ -1066,6 +1041,17 @@ service_loop
    -> IO ()
 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
 
+  -- reset prodding before we look at the new requests.  If a new
+  -- client arrives after this point they will send a wakup which will
+  -- cause the server to loop around again, so we can be sure to not
+  -- miss any requests.
+  --
+  -- NB. it's important to do this in the *first* iteration of
+  -- service_loop, rather than after calling select(), since a client
+  -- may have set prodding to True without sending a wakeup byte down
+  -- the pipe, because the pipe wasn't set up.
+  atomicModifyIORef prodding (\_ -> (False, ()))
+
   -- pick up new IO requests
   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
   let reqs = new_reqs ++ old_reqs
@@ -1114,52 +1100,120 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
         if b == 0 
           then return False
           else alloca $ \p -> do 
-                 c_read (fromIntegral wakeup) p 1; return ()
+                 warnErrnoIfMinus1_ "service_loop" $
+                     c_read (fromIntegral wakeup) p 1
                  s <- peek p            
                  case s of
                   _ | s == io_MANAGER_WAKEUP -> return False
                   _ | s == io_MANAGER_DIE    -> return True
-                  _ -> withMVar signalHandlerLock $ \_ -> do
-                          handler_tbl <- peek handlers
-                          sp <- peekElemOff handler_tbl (fromIntegral s)
-                          io <- deRefStablePtr sp
-                          forkIO io
-                          return False
-
-  if exit then return () else do
-
-  atomicModifyIORef prodding (\_ -> (False,False))
+                  _ | s == io_MANAGER_SYNC   -> do
+                       mvars <- readIORef sync
+                       mapM_ (flip putMVar ()) mvars
+                       return False
+                  _ -> do
+                       fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
+                       withForeignPtr fp $ \p_siginfo -> do
+                         r <- c_read (fromIntegral wakeup) (castPtr p_siginfo)
+                                 sizeof_siginfo_t
+                         when (r /= fromIntegral sizeof_siginfo_t) $
+                            error "failed to read siginfo_t"
+                       runHandlers' fp (fromIntegral s)
+                       return False
+
+  unless exit $ do
 
   reqs' <- if wakeup_all then do wakeupAll reqs; return []
                          else completeRequests reqs readfds writefds []
 
   service_loop wakeup readfds writefds ptimeval reqs' delays'
 
-io_MANAGER_WAKEUP, io_MANAGER_DIE :: CChar
+io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: Word8
 io_MANAGER_WAKEUP = 0xff
 io_MANAGER_DIE    = 0xfe
+io_MANAGER_SYNC   = 0xfd
 
-stick :: IORef Fd
-{-# NOINLINE stick #-}
-stick = unsafePerformIO (newIORef 0)
+{-# NOINLINE sync #-}
+sync :: IORef [MVar ()]
+sync = unsafePerformIO (newIORef [])
 
-wakeupIOManager :: IO ()
-wakeupIOManager = do
-  fd <- readIORef stick
-  with io_MANAGER_WAKEUP $ \pbuf -> do 
-    c_write (fromIntegral fd) pbuf 1; return ()
-
--- Lock used to protect concurrent access to signal_handlers.  Symptom of
--- this race condition is #1922, although that bug was on Windows a similar
--- bug also exists on Unix.
-signalHandlerLock :: MVar ()
-signalHandlerLock = unsafePerformIO (newMVar ())
+-- waits for the IO manager to drain the pipe
+syncIOManager :: IO ()
+syncIOManager = do
+  m <- newEmptyMVar
+  atomicModifyIORef sync (\old -> (m:old,()))
+  c_ioManagerSync
+  takeMVar m
 
-foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
+foreign import ccall unsafe "ioManagerSync"   c_ioManagerSync :: IO ()
+foreign import ccall unsafe "ioManagerWakeup" wakeupIOManager :: IO ()
+
+-- For the non-threaded RTS
+runHandlers :: Ptr Word8 -> Int -> IO ()
+runHandlers p_info sig = do
+  fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
+  withForeignPtr fp $ \p -> do
+    copyBytes p p_info (fromIntegral sizeof_siginfo_t)
+    free p_info
+  runHandlers' fp (fromIntegral sig)
+
+runHandlers' :: ForeignPtr Word8 -> Signal -> IO ()
+runHandlers' p_info sig = do
+  let int = fromIntegral sig
+  withMVar signal_handlers $ \arr ->
+      if not (inRange (boundsIOArray arr) int)
+         then return ()
+         else do handler <- unsafeReadIOArray arr int
+                 case handler of
+                    Nothing -> return ()
+                    Just (f,_)  -> do _ <- forkIO (f p_info)
+                                      return ()
+
+warnErrnoIfMinus1_ :: Num a => String -> IO a -> IO ()
+warnErrnoIfMinus1_ what io
+    = do r <- io
+         when (r == -1) $ do
+             errno <- getErrno
+             str <- strerror errno >>= peekCString
+             when (r == -1) $
+                 debugErrLn ("Warning: " ++ what ++ " failed: " ++ str)
+
+foreign import ccall unsafe "string.h" strerror :: Errno -> IO (Ptr CChar)
 
 foreign import ccall "setIOManagerPipe"
   c_setIOManagerPipe :: CInt -> IO ()
 
+foreign import ccall "__hscore_sizeof_siginfo_t"
+  sizeof_siginfo_t :: CSize
+
+type Signal = CInt
+
+maxSig = 64 :: Int
+
+type HandlerFun = ForeignPtr Word8 -> IO ()
+
+-- Lock used to protect concurrent access to signal_handlers.  Symptom of
+-- this race condition is #1922, although that bug was on Windows a similar
+-- bug also exists on Unix.
+{-# NOINLINE signal_handlers #-}
+signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
+signal_handlers = unsafePerformIO $ do
+   arr <- newIOArray (0,maxSig) Nothing
+   m <- newMVar arr
+   sharedCAF m getOrSetGHCConcSignalHandlerStore
+
+foreign import ccall unsafe "getOrSetGHCConcSignalHandlerStore"
+    getOrSetGHCConcSignalHandlerStore :: Ptr a -> IO (Ptr a)
+
+setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
+setHandler sig handler = do
+  let int = fromIntegral sig
+  withMVar signal_handlers $ \arr -> 
+     if not (inRange (boundsIOArray arr) int)
+        then error "GHC.Conc.setHandler: signal out of range"
+        else do old <- unsafeReadIOArray arr int
+                unsafeWriteIOArray arr int handler
+                return old
+
 -- -----------------------------------------------------------------------------
 -- IO requests
 
@@ -1250,7 +1304,7 @@ foreign import ccall unsafe "setTimevalTicks"
 
 data CFdSet
 
-foreign import ccall safe "select"
+foreign import ccall safe "__hscore_select"
   c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
            -> IO CInt
 
@@ -1280,14 +1334,13 @@ foreign import ccall unsafe "sizeof_fd_set"
 
 #endif
 
-reportStackOverflow :: IO a
-reportStackOverflow = do callStackOverflowHook; return undefined
+reportStackOverflow :: IO ()
+reportStackOverflow = callStackOverflowHook
 
-reportError :: SomeException -> IO a
+reportError :: SomeException -> IO ()
 reportError ex = do
    handler <- getUncaughtExceptionHandler
    handler ex
-   return undefined
 
 -- SUP: Are the hooks allowed to re-enter Haskell land?  If so, remove
 -- the unsafe below.
@@ -1320,4 +1373,5 @@ setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
 
 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler
+
 \end{code}