add Control.Monad.Instances to nhc98 build
[haskell-directory.git] / GHC / Conc.lhs
index b67847c..1deb160 100644 (file)
@@ -1,5 +1,5 @@
 \begin{code}
-{-# OPTIONS -fno-implicit-prelude #-}
+{-# OPTIONS_GHC -fno-implicit-prelude #-}
 -----------------------------------------------------------------------------
 -- |
 -- Module      :  GHC.Conc
 -- 
 -----------------------------------------------------------------------------
 
-#include "ghcconfig.h"
+-- No: #hide, because bits of this module are exposed by the stm package.
+-- However, we don't want this module to be the home location for the
+-- bits it exports, we'd rather have Control.Concurrent and the other
+-- higher level modules be the home.  Hence:
+
+#include "Typeable.h"
+
+-- #not-home
 module GHC.Conc
        ( ThreadId(..)
 
-       -- Forking and suchlike
+       -- * Forking and suchlike
+       , forkIO        -- :: IO a -> IO ThreadId
+       , forkOnIO      -- :: Int -> IO a -> IO ThreadId
+       , childHandler  -- :: Exception -> IO ()
        , myThreadId    -- :: IO ThreadId
        , killThread    -- :: ThreadId -> IO ()
        , throwTo       -- :: ThreadId -> Exception -> IO ()
@@ -27,12 +37,13 @@ module GHC.Conc
        , yield         -- :: IO ()
        , labelThread   -- :: ThreadId -> String -> IO ()
 
-       -- Waiting
+       -- * Waiting
        , threadDelay           -- :: Int -> IO ()
+       , registerDelay         -- :: Int -> IO (TVar Bool)
        , threadWaitRead        -- :: Int -> IO ()
        , threadWaitWrite       -- :: Int -> IO ()
 
-       -- MVars
+       -- * MVars
        , MVar          -- abstract
        , newMVar       -- :: a -> IO (MVar a)
        , newEmptyMVar  -- :: IO (MVar a)
@@ -43,18 +54,23 @@ module GHC.Conc
        , isEmptyMVar   -- :: MVar a -> IO Bool
        , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
 
-       -- TVars
+       -- * TVars
        , STM           -- abstract
        , atomically    -- :: STM a -> IO a
        , retry         -- :: STM a
        , orElse        -- :: STM a -> STM a -> STM a
         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
+       , alwaysSucceeds -- :: STM a -> STM ()
+       , always        -- :: STM Bool -> STM ()
        , TVar          -- abstract
        , newTVar       -- :: a -> STM (TVar a)
+       , newTVarIO     -- :: a -> STM (TVar a)
        , readTVar      -- :: TVar a -> STM a
        , writeTVar     -- :: a -> TVar a -> STM ()
+       , unsafeIOToSTM -- :: IO a -> STM a
 
-#ifdef mingw32_TARGET_OS
+       -- * Miscellaneous
+#ifdef mingw32_HOST_OS
        , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
        , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
        , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
@@ -62,6 +78,10 @@ module GHC.Conc
        , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
        , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
 #endif
+
+#ifndef mingw32_HOST_OS
+       , ensureIOManagerIsRunning
+#endif
         ) where
 
 import System.Posix.Types
@@ -69,6 +89,10 @@ import System.Posix.Internals
 import Foreign
 import Foreign.C
 
+#ifndef __HADDOCK__
+import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow )
+#endif
+
 import Data.Maybe
 
 import GHC.Base
@@ -76,10 +100,12 @@ import GHC.IOBase
 import GHC.Num         ( Num(..) )
 import GHC.Real                ( fromIntegral, quot )
 import GHC.Base                ( Int(..) )
-import GHC.Exception    ( Exception(..), AsyncException(..) )
+import GHC.Exception    ( catchException, Exception(..), AsyncException(..) )
 import GHC.Pack                ( packCString# )
 import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
 import GHC.STRef
+import GHC.Show                ( Show(..), showString )
+import Data.Typeable
 
 infixr 0 `par`, `pseq`
 \end{code}
@@ -91,7 +117,7 @@ infixr 0 `par`, `pseq`
 %************************************************************************
 
 \begin{code}
-data ThreadId = ThreadId ThreadId#
+data ThreadId = ThreadId ThreadId# deriving( Typeable )
 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
 -- But since ThreadId# is unlifted, the Weak type must use open
 -- type variables.
@@ -113,7 +139,69 @@ This misfeature will hopefully be corrected at a later date.
 it defines 'ThreadId' as a synonym for ().
 -}
 
---forkIO has now been hoisted out into the Concurrent library.
+instance Show ThreadId where
+   showsPrec d t = 
+       showString "ThreadId " . 
+        showsPrec d (getThreadId (id2TSO t))
+
+foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> Int
+
+id2TSO :: ThreadId -> ThreadId#
+id2TSO (ThreadId t) = t
+
+foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
+-- Returns -1, 0, 1
+
+cmpThread :: ThreadId -> ThreadId -> Ordering
+cmpThread t1 t2 = 
+   case cmp_thread (id2TSO t1) (id2TSO t2) of
+      -1 -> LT
+      0  -> EQ
+      _  -> GT -- must be 1
+
+instance Eq ThreadId where
+   t1 == t2 = 
+      case t1 `cmpThread` t2 of
+         EQ -> True
+         _  -> False
+
+instance Ord ThreadId where
+   compare = cmpThread
+
+{- |
+This sparks off a new thread to run the 'IO' computation passed as the
+first argument, and returns the 'ThreadId' of the newly created
+thread.
+
+The new thread will be a lightweight thread; if you want to use a foreign
+library that uses thread-local storage, use 'forkOS' instead.
+-}
+forkIO :: IO () -> IO ThreadId
+forkIO action = IO $ \ s -> 
+   case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
+ where
+  action_plus = catchException action childHandler
+
+forkOnIO :: Int -> IO () -> IO ThreadId
+forkOnIO (I# cpu) action = IO $ \ s -> 
+   case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
+ where
+  action_plus = catchException action childHandler
+
+childHandler :: Exception -> IO ()
+childHandler err = catchException (real_handler err) childHandler
+
+real_handler :: Exception -> IO ()
+real_handler ex =
+  case ex of
+       -- ignore thread GC and killThread exceptions:
+       BlockedOnDeadMVar            -> return ()
+       BlockedIndefinitely          -> return ()
+       AsyncException ThreadKilled  -> return ()
+
+       -- report all others:
+       AsyncException StackOverflow -> reportStackOverflow
+       other       -> reportError other
 
 {- | 'killThread' terminates the given thread (GHC only).
 Any work already done by the thread isn\'t
@@ -135,7 +223,13 @@ target thread.  The calling thread can thus be certain that the target
 thread has received the exception.  This is a useful property to know
 when dealing with race conditions: eg. if there are two threads that
 can kill each other, it is guaranteed that only one of the threads
-will get to kill the other. -}
+will get to kill the other.
+
+If the target thread is currently making a foreign call, then the
+exception will not be raised (and hence 'throwTo' will not return)
+until the call has completed.  This is the case regardless of whether
+the call is inside a 'block' or not.
+ -}
 throwTo :: ThreadId -> Exception -> IO ()
 throwTo (ThreadId id) ex = IO $ \ s ->
    case (killThread# id ex s) of s1 -> (# s1, () #)
@@ -201,11 +295,14 @@ TVars are shared memory locations which support atomic memory
 transactions.
 
 \begin{code}
+-- |A monad supporting atomic memory transactions.
 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
 
 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
 unSTM (STM a) = a
 
+INSTANCE_TYPEABLE1(STM,stmTc,"STM")
+
 instance  Functor STM where
    fmap f x = x >>= (return . f)
 
@@ -213,7 +310,7 @@ instance  Monad STM  where
     {-# INLINE return #-}
     {-# INLINE (>>)   #-}
     {-# INLINE (>>=)  #-}
-    m >> k      =  m >>= \_ -> k
+    m >> k      = thenSTM m k
     return x   = returnSTM x
     m >>= k     = bindSTM m k
 
@@ -232,7 +329,20 @@ thenSTM (STM m) k = STM ( \s ->
 returnSTM :: a -> STM a
 returnSTM x = STM (\s -> (# s, x #))
 
+-- | Unsafely performs IO in the STM monad.
+unsafeIOToSTM :: IO a -> STM a
+unsafeIOToSTM (IO m) = STM m
+
 -- |Perform a series of STM actions atomically.
+--
+-- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
+-- Any attempt to do so will result in a runtime error.  (Reason: allowing
+-- this would effectively allow a transaction inside a transaction, depending
+-- on exactly when the thunk is evaluated.)
+--
+-- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
+-- and which allows top-level TVars to be allocated.
+
 atomically :: STM a -> IO a
 atomically (STM m) = IO (\s -> (atomically# m) s )
 
@@ -240,14 +350,15 @@ atomically (STM m) = IO (\s -> (atomically# m) s )
 -- values in TVars which mean that it should not continue (e.g. the TVars
 -- represent a shared buffer that is now empty).  The implementation may
 -- block the thread until one of the TVars that it has read from has been
--- udpated.
+-- udpated. (GHC only)
 retry :: STM a
 retry = STM $ \s# -> retry# s#
 
--- |Compose two alternative STM actions.  If the first action completes without
--- retrying then it forms the result of the orElse.  Otherwise, if the first
--- action retries, then the second action is tried in its place.  If both actions
--- retry then the orElse as a whole retries.
+-- |Compose two alternative STM actions (GHC only).  If the first action
+-- completes without retrying then it forms the result of the orElse.
+-- Otherwise, if the first action retries, then the second action is
+-- tried in its place.  If both actions retry then the orElse as a
+-- whole retries.
 orElse :: STM a -> STM a -> STM a
 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
 
@@ -255,8 +366,35 @@ orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
 catchSTM :: STM a -> (Exception -> STM a) -> STM a
 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
 
+-- | Low-level primitive on which always and alwaysSucceeds are built.
+-- checkInv differs form these in that (i) the invariant is not 
+-- checked when checkInv is called, only at the end of this and
+-- subsequent transcations, (ii) the invariant failure is indicated
+-- by raising an exception.
+checkInv :: STM a -> STM ()
+checkInv (STM m) = STM (\s -> (check# m) s)
+
+-- | alwaysSucceeds adds a new invariant that must be true when passed
+-- to alwaysSucceeds, at the end of the current transaction, and at
+-- the end of every subsequent transaction.  If it fails at any
+-- of those points then the transaction violating it is aborted
+-- and the exception raised by the invariant is propagated.
+alwaysSucceeds :: STM a -> STM ()
+alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
+                      checkInv i
+
+-- | always is a variant of alwaysSucceeds in which the invariant is
+-- expressed as an STM Bool action that must return True.  Returning
+-- False or raising an exception are both treated as invariant failures.
+always :: STM Bool -> STM ()
+always i = alwaysSucceeds ( do v <- i
+                               if (v) then return () else ( error "Transacional invariant violation" ) )
+
+-- |Shared memory locations that support atomic memory transactions.
 data TVar a = TVar (TVar# RealWorld a)
 
+INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
+
 instance Eq (TVar a) where
        (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
 
@@ -266,6 +404,15 @@ newTVar val = STM $ \s1# ->
     case newTVar# val s1# of
         (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
 
+-- |@IO@ version of 'newTVar'.  This is useful for creating top-level
+-- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
+-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
+-- possible.
+newTVarIO :: a -> IO (TVar a)
+newTVarIO val = IO $ \s1# ->
+    case newTVar# val s1# of
+        (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
+
 -- |Return the current value stored in a TVar
 readTVar :: TVar a -> STM a
 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
@@ -312,16 +459,34 @@ newMVar value =
 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
 -- the 'MVar' is left empty.
 -- 
--- If several threads are competing to take the same 'MVar', one is chosen
--- to continue at random when the 'MVar' becomes full.
+-- There are two further important properties of 'takeMVar':
+--
+--   * 'takeMVar' is single-wakeup.  That is, if there are multiple
+--     threads blocked in 'takeMVar', and the 'MVar' becomes full,
+--     only one thread will be woken up.  The runtime guarantees that
+--     the woken thread completes its 'takeMVar' operation.
+--
+--   * When multiple threads are blocked on an 'MVar', they are
+--     woken up in FIFO order.  This is useful for providing
+--     fairness properties of abstractions built using 'MVar's.
+--
 takeMVar :: MVar a -> IO a
 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
 
 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
 -- 'putMVar' will wait until it becomes empty.
 --
--- If several threads are competing to fill the same 'MVar', one is
--- chosen to continue at random when the 'MVar' becomes empty.
+-- There are two further important properties of 'putMVar':
+--
+--   * 'putMVar' is single-wakeup.  That is, if there are multiple
+--     threads blocked in 'putMVar', and the 'MVar' becomes empty,
+--     only one thread will be woken up.  The runtime guarantees that
+--     the woken thread completes its 'putMVar' operation.
+--
+--   * When multiple threads are blocked on an 'MVar', they are
+--     woken up in FIFO order.  This is useful for providing
+--     fairness properties of abstractions built using 'MVar's.
+--
 putMVar  :: MVar a -> a -> IO ()
 putMVar (MVar mvar#) x = IO $ \ s# ->
     case putMVar# mvar# x s# of
@@ -372,25 +537,20 @@ addMVarFinalizer (MVar m) finalizer =
 %************************************************************************
 
 \begin{code}
-#ifdef mingw32_TARGET_OS
+#ifdef mingw32_HOST_OS
 
 -- Note: threadDelay, threadWaitRead and threadWaitWrite aren't really functional
 -- on Win32, but left in there because lib code (still) uses them (the manner
 -- in which they're used doesn't cause problems on a Win32 platform though.)
 
 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
-asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) = do
-  (l, rc) <- IO (\s -> case asyncRead# fd isSock len buf s  of 
-                        (# s, len#, err# #) -> (# s, (I# len#, I# err#) #))
-    -- special handling for Ctrl+C-aborted 'standard input' reads;
-    -- see rts/win32/ConsoleHandler.c for details.
-  if (l == 0 && rc == -2)
-   then asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf)
-   else return (l,rc)
+asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
+  IO $ \s -> case asyncRead# fd isSock len buf s of 
+              (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
 
 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
-asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) = 
-  IO $ \s -> case asyncWrite# fd isSock len buf s  of 
+asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
+  IO $ \s -> case asyncWrite# fd isSock len buf s of 
               (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
 
 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
@@ -421,7 +581,7 @@ asyncWriteBA fd isSock len off bufB =
 -- given file descriptor (GHC only).
 threadWaitRead :: Fd -> IO ()
 threadWaitRead fd
-#ifndef mingw32_TARGET_OS
+#ifndef mingw32_HOST_OS
   | threaded  = waitForReadEvent fd
 #endif
   | otherwise = IO $ \s -> 
@@ -433,7 +593,7 @@ threadWaitRead fd
 -- given file descriptor (GHC only).
 threadWaitWrite :: Fd -> IO ()
 threadWaitWrite fd
-#ifndef mingw32_TARGET_OS
+#ifndef mingw32_HOST_OS
   | threaded  = waitForWriteEvent fd
 #endif
   | otherwise = IO $ \s -> 
@@ -454,7 +614,7 @@ threadWaitWrite fd
 --
 threadDelay :: Int -> IO ()
 threadDelay time
-#ifndef mingw32_TARGET_OS
+#ifndef mingw32_HOST_OS
   | threaded  = waitForDelayEvent time
 #else
   | threaded  = c_Sleep (fromIntegral (time `quot` 1000))
@@ -464,9 +624,18 @@ threadDelay time
        case delay# time# s of { s -> (# s, () #)
        }}
 
+registerDelay :: Int -> IO (TVar Bool)
+registerDelay usecs 
+#ifndef mingw32_HOST_OS
+  | threaded = waitForDelayEventSTM usecs
+  | otherwise = error "registerDelay: requires -threaded"
+#else
+  = error "registerDelay: not currently supported on Windows"
+#endif
+
 -- On Windows, we just make a safe call to 'Sleep' to implement threadDelay.
-#ifdef mingw32_TARGET_OS
-foreign import ccall safe "Sleep" c_Sleep :: CInt -> IO ()
+#ifdef mingw32_HOST_OS
+foreign import stdcall safe "Sleep" c_Sleep :: CInt -> IO ()
 #endif
 
 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
@@ -503,14 +672,15 @@ foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
 --     - forkProcess will kill the IO manager thread.  Let's just
 --       hope we don't need to do any blocking IO between fork & exec.
 
-#ifndef mingw32_TARGET_OS
+#ifndef mingw32_HOST_OS
 
 data IOReq
   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
 
 data DelayReq
-  = Delay  {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
+  = Delay    {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
+  | DelaySTM {-# UNPACK #-} !Int {-# UNPACK #-} !(TVar Bool)
 
 pendingEvents :: IORef [IOReq]
 pendingDelays :: IORef [DelayReq]
@@ -518,31 +688,33 @@ pendingDelays :: IORef [DelayReq]
 {-# NOINLINE pendingEvents #-}
 {-# NOINLINE pendingDelays #-}
 (pendingEvents,pendingDelays) = unsafePerformIO $ do
-  startIOServiceThread
+  startIOManagerThread
   reqs <- newIORef []
   dels <- newIORef []
   return (reqs, dels)
        -- the first time we schedule an IO request, the service thread
        -- will be created (cool, huh?)
 
-startIOServiceThread :: IO ()
-startIOServiceThread = do
+ensureIOManagerIsRunning :: IO ()
+ensureIOManagerIsRunning 
+  | threaded  = seq pendingEvents $ return ()
+  | otherwise = return ()
+
+startIOManagerThread :: IO ()
+startIOManagerThread = do
         allocaArray 2 $ \fds -> do
-       throwErrnoIfMinus1 "startIOServiceThread" (c_pipe fds)
+       throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
        rd_end <- peekElemOff fds 0
        wr_end <- peekElemOff fds 1
        writeIORef stick (fromIntegral wr_end)
-       quickForkIO $ do
+       c_setIOManagerPipe wr_end
+       forkIO $ do
            allocaBytes sizeofFdSet   $ \readfds -> do
            allocaBytes sizeofFdSet   $ \writefds -> do 
            allocaBytes sizeofTimeVal $ \timeval -> do
            service_loop (fromIntegral rd_end) readfds writefds timeval [] []
        return ()
 
--- XXX: move real forkIO here from Control.Concurrent?
-quickForkIO action = IO $ \s ->
-   case (fork# action s) of (# s1, id #) -> (# s1, ThreadId id #)
-
 service_loop
    :: Fd               -- listen to this for wakeup calls
    -> Ptr CFdSet
@@ -567,38 +739,67 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
   fdSet wakeup readfds
   maxfd <- buildFdSets 0 readfds writefds reqs
 
-  -- check the current time and wake up any thread in threadDelay whose
-  -- timeout has expired.  Also find the timeout value for the select() call.
-  now <- getTicksOfDay
-  (delays', timeout) <- getDelay now ptimeval delays
-
   -- perform the select()
-  let do_select = do
+  let do_select delays = do
+         -- check the current time and wake up any thread in
+         -- threadDelay whose timeout has expired.  Also find the
+         -- timeout value for the select() call.
+         now <- getTicksOfDay
+         (delays', timeout) <- getDelay now ptimeval delays
+
          res <- c_select ((max wakeup maxfd)+1) readfds writefds 
                        nullPtr timeout
          if (res == -1)
             then do
                err <- getErrno
-               if err == eINTR
-                       then do_select
-                       else return res
+               case err of
+                 _ | err == eINTR ->  do_select delays'
+                       -- EINTR: just redo the select()
+                 _ | err == eBADF ->  return (True, delays)
+                       -- EBADF: one of the file descriptors is closed or bad,
+                       -- we don't know which one, so wake everyone up.
+                 _ | otherwise    ->  throwErrno "select"
+                       -- otherwise (ENOMEM or EINVAL) something has gone
+                       -- wrong; report the error.
             else
-               return res
-  res <- do_select
-  -- ToDo: check result
-
-  b <- takeMVar prodding
-  if b then alloca $ \p -> do c_read (fromIntegral wakeup) p 1; return ()
-       else return ()
+               return (False,delays')
+
+  (wakeup_all,delays') <- do_select delays
+
+  exit <-
+    if wakeup_all then return False
+      else do
+        b <- fdIsSet wakeup readfds
+        if b == 0 
+          then return False
+          else alloca $ \p -> do 
+                c_read (fromIntegral wakeup) p 1; return ()
+                s <- peek p            
+                case s of
+                 _ | s == io_MANAGER_WAKEUP -> return False
+                 _ | s == io_MANAGER_DIE    -> return True
+                 _ -> do handler_tbl <- peek handlers
+                         sp <- peekElemOff handler_tbl (fromIntegral s)
+                         forkIO (do io <- deRefStablePtr sp; io)
+                         return False
+
+  if exit then return () else do
+
+  takeMVar prodding
   putMVar prodding False
 
-  reqs' <- completeRequests reqs readfds writefds []
+  reqs' <- if wakeup_all then do wakeupAll reqs; return []
+                        else completeRequests reqs readfds writefds []
+
   service_loop wakeup readfds writefds ptimeval reqs' delays'
 
 stick :: IORef Fd
 {-# NOINLINE stick #-}
 stick = unsafePerformIO (newIORef 0)
 
+io_MANAGER_WAKEUP = 0xff :: CChar
+io_MANAGER_DIE    = 0xfe :: CChar
+
 prodding :: MVar Bool
 {-# NOINLINE prodding #-}
 prodding = unsafePerformIO (newMVar False)
@@ -608,20 +809,30 @@ prodServiceThread = do
   b <- takeMVar prodding
   if (not b) 
     then do fd <- readIORef stick
-           with 42 $ \pbuf -> do c_write (fromIntegral fd) pbuf 1; return ()
+           with io_MANAGER_WAKEUP $ \pbuf -> do 
+               c_write (fromIntegral fd) pbuf 1; return ()
     else return ()
   putMVar prodding True
 
+foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
+
+foreign import ccall "setIOManagerPipe"
+  c_setIOManagerPipe :: CInt -> IO ()
+
 -- -----------------------------------------------------------------------------
 -- IO requests
 
 buildFdSets maxfd readfds writefds [] = return maxfd
-buildFdSets maxfd readfds writefds (Read fd m : reqs) = do
-  fdSet fd readfds
-  buildFdSets (max maxfd fd) readfds writefds reqs
-buildFdSets maxfd readfds writefds (Write fd m : reqs) = do
-  fdSet fd writefds
-  buildFdSets (max maxfd fd) readfds writefds reqs
+buildFdSets maxfd readfds writefds (Read fd m : reqs)
+  | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
+  | otherwise        =  do
+       fdSet fd readfds
+        buildFdSets (max maxfd fd) readfds writefds reqs
+buildFdSets maxfd readfds writefds (Write fd m : reqs)
+  | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
+  | otherwise        =  do
+       fdSet fd writefds
+       buildFdSets (max maxfd fd) readfds writefds reqs
 
 completeRequests [] _ _ reqs' = return reqs'
 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
@@ -635,6 +846,10 @@ completeRequests (Write fd m : reqs) readfds writefds reqs' = do
     then do putMVar m (); completeRequests reqs readfds writefds reqs'
     else completeRequests reqs readfds writefds (Write fd m : reqs')
 
+wakeupAll [] = return ()
+wakeupAll (Read  fd m : reqs) = do putMVar m (); wakeupAll reqs
+wakeupAll (Write fd m : reqs) = do putMVar m (); wakeupAll reqs
+
 waitForReadEvent :: Fd -> IO ()
 waitForReadEvent fd = do
   m <- newEmptyMVar
@@ -665,24 +880,41 @@ waitForDelayEvent usecs = do
   prodServiceThread
   takeMVar m
 
+-- Delays for use in STM
+waitForDelayEventSTM :: Int -> IO (TVar Bool)
+waitForDelayEventSTM usecs = do
+   t <- atomically $ newTVar False
+   now <- getTicksOfDay
+   let target = now + usecs `quot` tick_usecs
+   atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
+   prodServiceThread
+   return t  
+    
 -- Walk the queue of pending delays, waking up any that have passed
 -- and return the smallest delay to wait for.  The queue of pending
 -- delays is kept ordered.
 getDelay :: Ticks -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
 getDelay now ptimeval [] = return ([],nullPtr)
-getDelay now ptimeval all@(Delay time m : rest)
-  | now >= time = do
+getDelay now ptimeval all@(d : rest) 
+  = case d of
+     Delay time m | now >= time -> do
        putMVar m ()
        getDelay now ptimeval rest
-  | otherwise = do
-       setTimevalTicks ptimeval (time - now)
+     DelaySTM time t | now >= time -> do
+       atomically $ writeTVar t True
+       getDelay now ptimeval rest
+     _otherwise -> do
+       setTimevalTicks ptimeval (delayTime d - now)
        return (all,ptimeval)
 
 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
-insertDelay d@(Delay time m) [] = [d]
-insertDelay d1@(Delay time m) ds@(d2@(Delay time' m') : rest)
-  | time <= time' = d1 : ds
-  | otherwise     = d2 : insertDelay d1 rest
+insertDelay d [] = [d]
+insertDelay d1 ds@(d2 : rest)
+  | delayTime d1 <= delayTime d2 = d1 : ds
+  | otherwise                    = d2 : insertDelay d1 rest
+
+delayTime (Delay t _) = t
+delayTime (DelaySTM t _) = t
 
 type Ticks = Int
 tick_freq  = 50 :: Ticks  -- accuracy of threadDelay (ticks per sec)
@@ -710,6 +942,9 @@ foreign import ccall safe "select"
   c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
            -> IO CInt
 
+foreign import ccall unsafe "hsFD_SETSIZE"
+  fD_SETSIZE :: Fd
+
 foreign import ccall unsafe "hsFD_CLR"
   fdClr :: Fd -> Ptr CFdSet -> IO ()