haddock attributes for haddock-2.0
[ghc-base.git] / GHC / Conc.lhs
index 0b6bc91..aa2eff1 100644 (file)
@@ -1,5 +1,6 @@
 \begin{code}
 {-# OPTIONS_GHC -fno-implicit-prelude #-}
+{-# OPTIONS_HADDOCK not-home #-}
 -----------------------------------------------------------------------------
 -- |
 -- Module      :  GHC.Conc
@@ -19,6 +20,8 @@
 -- bits it exports, we'd rather have Control.Concurrent and the other
 -- higher level modules be the home.  Hence:
 
+#include "Typeable.h"
+
 -- #not-home
 module GHC.Conc
        ( ThreadId(..)
@@ -26,6 +29,7 @@ module GHC.Conc
        -- * Forking and suchlike
        , forkIO        -- :: IO a -> IO ThreadId
        , forkOnIO      -- :: Int -> IO a -> IO ThreadId
+        , numCapabilities -- :: Int
        , childHandler  -- :: Exception -> IO ()
        , myThreadId    -- :: IO ThreadId
        , killThread    -- :: ThreadId -> IO ()
@@ -58,6 +62,8 @@ module GHC.Conc
        , retry         -- :: STM a
        , orElse        -- :: STM a -> STM a -> STM a
         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
+       , alwaysSucceeds -- :: STM a -> STM ()
+       , always        -- :: STM Bool -> STM ()
        , TVar          -- abstract
        , newTVar       -- :: a -> STM (TVar a)
        , newTVarIO     -- :: a -> STM (TVar a)
@@ -76,12 +82,22 @@ module GHC.Conc
 #endif
 
 #ifndef mingw32_HOST_OS
+        , signalHandlerLock
+#endif
+
        , ensureIOManagerIsRunning
+
+#ifdef mingw32_HOST_OS
+        , ConsoleEvent(..)
+        , win32ConsoleHandler
+        , toWin32ConsoleEvent
 #endif
         ) where
 
 import System.Posix.Types
+#ifndef mingw32_HOST_OS
 import System.Posix.Internals
+#endif
 import Foreign
 import Foreign.C
 
@@ -94,9 +110,15 @@ import Data.Maybe
 import GHC.Base
 import GHC.IOBase
 import GHC.Num         ( Num(..) )
-import GHC.Real                ( fromIntegral, quot )
+import GHC.Real                ( fromIntegral, div )
+#ifndef mingw32_HOST_OS
 import GHC.Base                ( Int(..) )
-import GHC.Exception    ( catchException, Exception(..), AsyncException(..) )
+#endif
+#ifdef mingw32_HOST_OS
+import GHC.Read         ( Read )
+import GHC.Enum         ( Enum )
+#endif
+import GHC.Exception
 import GHC.Pack                ( packCString# )
 import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
 import GHC.STRef
@@ -140,7 +162,7 @@ instance Show ThreadId where
        showString "ThreadId " . 
         showsPrec d (getThreadId (id2TSO t))
 
-foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> Int
+foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
 
 id2TSO :: ThreadId -> ThreadId#
 id2TSO (ThreadId t) = t
@@ -165,12 +187,15 @@ instance Ord ThreadId where
    compare = cmpThread
 
 {- |
-This sparks off a new thread to run the 'IO' computation passed as the
+Sparks off a new thread to run the 'IO' computation passed as the
 first argument, and returns the 'ThreadId' of the newly created
 thread.
 
 The new thread will be a lightweight thread; if you want to use a foreign
-library that uses thread-local storage, use 'forkOS' instead.
+library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
+
+GHC note: the new thread inherits the /blocked/ state of the parent 
+(see 'Control.Exception.block').
 -}
 forkIO :: IO () -> IO ThreadId
 forkIO action = IO $ \ s -> 
@@ -178,12 +203,36 @@ forkIO action = IO $ \ s ->
  where
   action_plus = catchException action childHandler
 
+{- |
+Like 'forkIO', but lets you specify on which CPU the thread is
+created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
+will stay on the same CPU for its entire lifetime (`forkIO` threads
+can migrate between CPUs according to the scheduling policy).
+`forkOnIO` is useful for overriding the scheduling policy when you
+know in advance how best to distribute the threads.
+
+The `Int` argument specifies the CPU number; it is interpreted modulo
+'numCapabilities' (note that it actually specifies a capability number
+rather than a CPU number, but to a first approximation the two are
+equivalent).
+-}
 forkOnIO :: Int -> IO () -> IO ThreadId
 forkOnIO (I# cpu) action = IO $ \ s -> 
    case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
  where
   action_plus = catchException action childHandler
 
+-- | the value passed to the @+RTS -N@ flag.  This is the number of
+-- Haskell threads that can run truly simultaneously at any given
+-- time, and is typically set to the number of physical CPU cores on
+-- the machine.
+numCapabilities :: Int
+numCapabilities = unsafePerformIO $  do 
+                    n <- peek n_capabilities
+                    return (fromIntegral n)
+
+foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
+
 childHandler :: Exception -> IO ()
 childHandler err = catchException (real_handler err) childHandler
 
@@ -215,7 +264,8 @@ killThread tid = throwTo tid (AsyncException ThreadKilled)
 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
 
 'throwTo' does not return until the exception has been raised in the
-target thread.  The calling thread can thus be certain that the target
+target thread. 
+The calling thread can thus be certain that the target
 thread has received the exception.  This is a useful property to know
 when dealing with race conditions: eg. if there are two threads that
 can kill each other, it is guaranteed that only one of the threads
@@ -225,6 +275,21 @@ If the target thread is currently making a foreign call, then the
 exception will not be raised (and hence 'throwTo' will not return)
 until the call has completed.  This is the case regardless of whether
 the call is inside a 'block' or not.
+
+Important note: the behaviour of 'throwTo' differs from that described in
+the paper \"Asynchronous exceptions in Haskell\"
+(<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
+In the paper, 'throwTo' is non-blocking; but the library implementation adopts
+a more synchronous design in which 'throwTo' does not return until the exception
+is received by the target thread.  The trade-off is discussed in Section 8 of the paper.
+Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
+the paper).
+
+There is currently no guarantee that the exception delivered by 'throwTo' will be
+delivered at the first possible opportunity.  In particular, if a thread may 
+unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
+a pending 'throwTo'.  This is arguably undesirable behaviour.
+
  -}
 throwTo :: ThreadId -> Exception -> IO ()
 throwTo (ThreadId id) ex = IO $ \ s ->
@@ -292,11 +357,13 @@ transactions.
 
 \begin{code}
 -- |A monad supporting atomic memory transactions.
-newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #)) deriving( Typeable )
+newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
 
 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
 unSTM (STM a) = a
 
+INSTANCE_TYPEABLE1(STM,stmTc,"STM")
+
 instance  Functor STM where
    fmap f x = x >>= (return . f)
 
@@ -328,6 +395,15 @@ unsafeIOToSTM :: IO a -> STM a
 unsafeIOToSTM (IO m) = STM m
 
 -- |Perform a series of STM actions atomically.
+--
+-- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
+-- Any attempt to do so will result in a runtime error.  (Reason: allowing
+-- this would effectively allow a transaction inside a transaction, depending
+-- on exactly when the thunk is evaluated.)
+--
+-- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
+-- and which allows top-level TVars to be allocated.
+
 atomically :: STM a -> IO a
 atomically (STM m) = IO (\s -> (atomically# m) s )
 
@@ -351,8 +427,34 @@ orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
 catchSTM :: STM a -> (Exception -> STM a) -> STM a
 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
 
+-- | Low-level primitive on which always and alwaysSucceeds are built.
+-- checkInv differs form these in that (i) the invariant is not 
+-- checked when checkInv is called, only at the end of this and
+-- subsequent transcations, (ii) the invariant failure is indicated
+-- by raising an exception.
+checkInv :: STM a -> STM ()
+checkInv (STM m) = STM (\s -> (check# m) s)
+
+-- | alwaysSucceeds adds a new invariant that must be true when passed
+-- to alwaysSucceeds, at the end of the current transaction, and at
+-- the end of every subsequent transaction.  If it fails at any
+-- of those points then the transaction violating it is aborted
+-- and the exception raised by the invariant is propagated.
+alwaysSucceeds :: STM a -> STM ()
+alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
+                      checkInv i
+
+-- | always is a variant of alwaysSucceeds in which the invariant is
+-- expressed as an STM Bool action that must return True.  Returning
+-- False or raising an exception are both treated as invariant failures.
+always :: STM Bool -> STM ()
+always i = alwaysSucceeds ( do v <- i
+                               if (v) then return () else ( error "Transacional invariant violation" ) )
+
 -- |Shared memory locations that support atomic memory transactions.
-data TVar a = TVar (TVar# RealWorld a) deriving( Typeable )
+data TVar a = TVar (TVar# RealWorld a)
+
+INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
 
 instance Eq (TVar a) where
        (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
@@ -486,6 +588,15 @@ isEmptyMVar (MVar mv#) = IO $ \ s# ->
 addMVarFinalizer :: MVar a -> IO () -> IO ()
 addMVarFinalizer (MVar m) finalizer = 
   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, w #) -> (# s1, () #) }
+
+withMVar :: MVar a -> (a -> IO b) -> IO b
+withMVar m io = 
+  block $ do
+    a <- takeMVar m
+    b <- catchException (unblock (io a))
+           (\e -> do putMVar m a; throw e)
+    putMVar m a
+    return b
 \end{code}
 
 
@@ -498,7 +609,7 @@ addMVarFinalizer (MVar m) finalizer =
 \begin{code}
 #ifdef mingw32_HOST_OS
 
--- Note: threadDelay, threadWaitRead and threadWaitWrite aren't really functional
+-- Note: threadWaitRead and threadWaitWrite aren't really functional
 -- on Win32, but left in there because lib code (still) uses them (the manner
 -- in which they're used doesn't cause problems on a Win32 platform though.)
 
@@ -563,42 +674,52 @@ threadWaitWrite fd
 -- | Suspends the current thread for a given number of microseconds
 -- (GHC only).
 --
--- Note that the resolution used by the Haskell runtime system's
--- internal timer is 1\/50 second, and 'threadDelay' will round its
--- argument up to the nearest multiple of this resolution.
---
 -- There is no guarantee that the thread will be rescheduled promptly
 -- when the delay has expired, but the thread will never continue to
 -- run /earlier/ than specified.
 --
 threadDelay :: Int -> IO ()
 threadDelay time
-#ifndef mingw32_HOST_OS
   | threaded  = waitForDelayEvent time
-#else
-  | threaded  = c_Sleep (fromIntegral (time `quot` 1000))
-#endif
   | otherwise = IO $ \s -> 
        case fromIntegral time of { I# time# ->
        case delay# time# s of { s -> (# s, () #)
        }}
 
+
+-- | Set the value of returned TVar to True after a given number of
+-- microseconds. The caveats associated with threadDelay also apply.
+--
 registerDelay :: Int -> IO (TVar Bool)
 registerDelay usecs 
-#ifndef mingw32_HOST_OS
   | threaded = waitForDelayEventSTM usecs
   | otherwise = error "registerDelay: requires -threaded"
-#else
-  = error "registerDelay: not currently supported on Windows"
-#endif
-
--- On Windows, we just make a safe call to 'Sleep' to implement threadDelay.
-#ifdef mingw32_HOST_OS
-foreign import stdcall safe "Sleep" c_Sleep :: CInt -> IO ()
-#endif
 
 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
 
+waitForDelayEvent :: Int -> IO ()
+waitForDelayEvent usecs = do
+  m <- newEmptyMVar
+  target <- calculateTarget usecs
+  atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
+  prodServiceThread
+  takeMVar m
+
+-- Delays for use in STM
+waitForDelayEventSTM :: Int -> IO (TVar Bool)
+waitForDelayEventSTM usecs = do
+   t <- atomically $ newTVar False
+   target <- calculateTarget usecs
+   atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
+   prodServiceThread
+   return t  
+    
+calculateTarget :: Int -> IO USecs
+calculateTarget usecs = do
+    now <- getUSecOfDay
+    return $ now + (fromIntegral usecs)
+
+
 -- ----------------------------------------------------------------------------
 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
 
@@ -632,16 +753,18 @@ foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
 --       hope we don't need to do any blocking IO between fork & exec.
 
 #ifndef mingw32_HOST_OS
-
 data IOReq
   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
+#endif
 
 data DelayReq
-  = Delay    {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
-  | DelaySTM {-# UNPACK #-} !Int {-# UNPACK #-} !(TVar Bool)
+  = Delay    {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
+  | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
 
+#ifndef mingw32_HOST_OS
 pendingEvents :: IORef [IOReq]
+#endif
 pendingDelays :: IORef [DelayReq]
        -- could use a strict list or array here
 {-# NOINLINE pendingEvents #-}
@@ -659,6 +782,164 @@ ensureIOManagerIsRunning
   | threaded  = seq pendingEvents $ return ()
   | otherwise = return ()
 
+insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
+insertDelay d [] = [d]
+insertDelay d1 ds@(d2 : rest)
+  | delayTime d1 <= delayTime d2 = d1 : ds
+  | otherwise                    = d2 : insertDelay d1 rest
+
+delayTime :: DelayReq -> USecs
+delayTime (Delay t _) = t
+delayTime (DelaySTM t _) = t
+
+type USecs = Word64
+
+-- XXX: move into GHC.IOBase from Data.IORef?
+atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
+atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
+
+foreign import ccall unsafe "getUSecOfDay" 
+  getUSecOfDay :: IO USecs
+
+prodding :: IORef Bool
+{-# NOINLINE prodding #-}
+prodding = unsafePerformIO (newIORef False)
+
+prodServiceThread :: IO ()
+prodServiceThread = do
+  was_set <- atomicModifyIORef prodding (\a -> (True,a))
+  if (not (was_set)) then wakeupIOManager else return ()
+
+#ifdef mingw32_HOST_OS
+-- ----------------------------------------------------------------------------
+-- Windows IO manager thread
+
+startIOManagerThread :: IO ()
+startIOManagerThread = do
+  wakeup <- c_getIOManagerEvent
+  forkIO $ service_loop wakeup []
+  return ()
+
+service_loop :: HANDLE          -- read end of pipe
+             -> [DelayReq]      -- current delay requests
+             -> IO ()
+
+service_loop wakeup old_delays = do
+  -- pick up new delay requests
+  new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
+  let  delays = foldr insertDelay old_delays new_delays
+
+  now <- getUSecOfDay
+  (delays', timeout) <- getDelay now delays
+
+  r <- c_WaitForSingleObject wakeup timeout
+  case r of
+    0xffffffff -> do c_maperrno; throwErrno "service_loop"
+    0 -> do
+        r <- c_readIOManagerEvent
+        exit <- 
+             case r of
+               _ | r == io_MANAGER_WAKEUP -> return False
+               _ | r == io_MANAGER_DIE    -> return True
+                0 -> return False -- spurious wakeup
+               r -> do start_console_handler (r `shiftR` 1); return False
+        if exit
+          then return ()
+          else service_cont wakeup delays'
+
+    _other -> service_cont wakeup delays' -- probably timeout        
+
+service_cont wakeup delays = do
+  atomicModifyIORef prodding (\_ -> (False,False))
+  service_loop wakeup delays
+
+-- must agree with rts/win32/ThrIOManager.c
+io_MANAGER_WAKEUP = 0xffffffff :: Word32
+io_MANAGER_DIE    = 0xfffffffe :: Word32
+
+data ConsoleEvent
+ = ControlC
+ | Break
+ | Close
+    -- these are sent to Services only.
+ | Logoff
+ | Shutdown
+ deriving (Eq, Ord, Enum, Show, Read, Typeable)
+
+start_console_handler :: Word32 -> IO ()
+start_console_handler r =
+  case toWin32ConsoleEvent r of
+     Just x  -> withMVar win32ConsoleHandler $ \handler -> do
+                    forkIO (handler x)
+                    return ()
+     Nothing -> return ()
+
+toWin32ConsoleEvent ev = 
+   case ev of
+       0 {- CTRL_C_EVENT-}        -> Just ControlC
+       1 {- CTRL_BREAK_EVENT-}    -> Just Break
+       2 {- CTRL_CLOSE_EVENT-}    -> Just Close
+       5 {- CTRL_LOGOFF_EVENT-}   -> Just Logoff
+       6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
+       _ -> Nothing
+
+win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
+win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
+
+stick :: IORef HANDLE
+{-# NOINLINE stick #-}
+stick = unsafePerformIO (newIORef nullPtr)
+
+wakeupIOManager = do 
+  hdl <- readIORef stick
+  c_sendIOManagerEvent io_MANAGER_WAKEUP
+
+-- Walk the queue of pending delays, waking up any that have passed
+-- and return the smallest delay to wait for.  The queue of pending
+-- delays is kept ordered.
+getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
+getDelay now [] = return ([], iNFINITE)
+getDelay now all@(d : rest) 
+  = case d of
+     Delay time m | now >= time -> do
+       putMVar m ()
+       getDelay now rest
+     DelaySTM time t | now >= time -> do
+       atomically $ writeTVar t True
+       getDelay now rest
+     _otherwise ->
+        -- delay is in millisecs for WaitForSingleObject
+        let micro_seconds = delayTime d - now
+            milli_seconds = (micro_seconds + 999) `div` 1000
+        in return (all, fromIntegral milli_seconds)
+
+-- ToDo: this just duplicates part of System.Win32.Types, which isn't
+-- available yet.  We should move some Win32 functionality down here,
+-- maybe as part of the grand reorganisation of the base package...
+type HANDLE       = Ptr ()
+type DWORD        = Word32
+
+iNFINITE = 0xFFFFFFFF :: DWORD -- urgh
+
+foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
+  c_getIOManagerEvent :: IO HANDLE
+
+foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
+  c_readIOManagerEvent :: IO Word32
+
+foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
+  c_sendIOManagerEvent :: Word32 -> IO ()
+
+foreign import ccall unsafe "maperrno"             -- in Win32Utils.c
+   c_maperrno :: IO ()
+
+foreign import stdcall "WaitForSingleObject"
+   c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
+
+#else
+-- ----------------------------------------------------------------------------
+-- Unix IO manager thread, using select()
+
 startIOManagerThread :: IO ()
 startIOManagerThread = do
         allocaArray 2 $ \fds -> do
@@ -703,10 +984,10 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
          -- check the current time and wake up any thread in
          -- threadDelay whose timeout has expired.  Also find the
          -- timeout value for the select() call.
-         now <- getTicksOfDay
+         now <- getUSecOfDay
          (delays', timeout) <- getDelay now ptimeval delays
 
-         res <- c_select ((max wakeup maxfd)+1) readfds writefds 
+         res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds 
                        nullPtr timeout
          if (res == -1)
             then do
@@ -737,41 +1018,40 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
                 case s of
                  _ | s == io_MANAGER_WAKEUP -> return False
                  _ | s == io_MANAGER_DIE    -> return True
-                 _ -> do handler_tbl <- peek handlers
+                 _ -> withMVar signalHandlerLock $ \_ -> do
+                          handler_tbl <- peek handlers
                          sp <- peekElemOff handler_tbl (fromIntegral s)
-                         forkIO (do io <- deRefStablePtr sp; io)
+                          io <- deRefStablePtr sp
+                         forkIO io
                          return False
 
   if exit then return () else do
 
-  takeMVar prodding
-  putMVar prodding False
+  atomicModifyIORef prodding (\_ -> (False,False))
 
   reqs' <- if wakeup_all then do wakeupAll reqs; return []
                         else completeRequests reqs readfds writefds []
 
   service_loop wakeup readfds writefds ptimeval reqs' delays'
 
+io_MANAGER_WAKEUP = 0xff :: CChar
+io_MANAGER_DIE    = 0xfe :: CChar
+
 stick :: IORef Fd
 {-# NOINLINE stick #-}
 stick = unsafePerformIO (newIORef 0)
 
-io_MANAGER_WAKEUP = 0xff :: CChar
-io_MANAGER_DIE    = 0xfe :: CChar
+wakeupIOManager :: IO ()
+wakeupIOManager = do
+  fd <- readIORef stick
+  with io_MANAGER_WAKEUP $ \pbuf -> do 
+    c_write (fromIntegral fd) pbuf 1; return ()
 
-prodding :: MVar Bool
-{-# NOINLINE prodding #-}
-prodding = unsafePerformIO (newMVar False)
-
-prodServiceThread :: IO ()
-prodServiceThread = do
-  b <- takeMVar prodding
-  if (not b) 
-    then do fd <- readIORef stick
-           with io_MANAGER_WAKEUP $ \pbuf -> do 
-               c_write (fromIntegral fd) pbuf 1; return ()
-    else return ()
-  putMVar prodding True
+-- Lock used to protect concurrent access to signal_handlers.  Symptom of
+-- this race condition is #1922, although that bug was on Windows a similar
+-- bug also exists on Unix.
+signalHandlerLock :: MVar ()
+signalHandlerLock = unsafePerformIO (newMVar ())
 
 foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
 
@@ -823,36 +1103,13 @@ waitForWriteEvent fd = do
   prodServiceThread
   takeMVar m
 
--- XXX: move into GHC.IOBase from Data.IORef?
-atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
-atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
-
 -- -----------------------------------------------------------------------------
 -- Delays
 
-waitForDelayEvent :: Int -> IO ()
-waitForDelayEvent usecs = do
-  m <- newEmptyMVar
-  now <- getTicksOfDay
-  let target = now + usecs `quot` tick_usecs
-  atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
-  prodServiceThread
-  takeMVar m
-
--- Delays for use in STM
-waitForDelayEventSTM :: Int -> IO (TVar Bool)
-waitForDelayEventSTM usecs = do
-   t <- atomically $ newTVar False
-   now <- getTicksOfDay
-   let target = now + usecs `quot` tick_usecs
-   atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
-   prodServiceThread
-   return t  
-    
 -- Walk the queue of pending delays, waking up any that have passed
 -- and return the smallest delay to wait for.  The queue of pending
 -- delays is kept ordered.
-getDelay :: Ticks -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
+getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
 getDelay now ptimeval [] = return ([],nullPtr)
 getDelay now ptimeval all@(d : rest) 
   = case d of
@@ -866,29 +1123,19 @@ getDelay now ptimeval all@(d : rest)
        setTimevalTicks ptimeval (delayTime d - now)
        return (all,ptimeval)
 
-insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
-insertDelay d [] = [d]
-insertDelay d1 ds@(d2 : rest)
-  | delayTime d1 <= delayTime d2 = d1 : ds
-  | otherwise                    = d2 : insertDelay d1 rest
-
-delayTime (Delay t _) = t
-delayTime (DelaySTM t _) = t
-
-type Ticks = Int
-tick_freq  = 50 :: Ticks  -- accuracy of threadDelay (ticks per sec)
-tick_usecs = 1000000 `quot` tick_freq :: Int
-
 newtype CTimeVal = CTimeVal ()
 
 foreign import ccall unsafe "sizeofTimeVal"
   sizeofTimeVal :: Int
 
-foreign import ccall unsafe "getTicksOfDay" 
-  getTicksOfDay :: IO Ticks
-
 foreign import ccall unsafe "setTimevalTicks" 
-  setTimevalTicks :: Ptr CTimeVal -> Ticks -> IO ()
+  setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
+
+{- 
+  On Win32 we're going to have a single Pipe, and a
+  waitForSingleObject with the delay time.  For signals, we send a
+  byte down the pipe just like on Unix.
+-}
 
 -- ----------------------------------------------------------------------------
 -- select() interface
@@ -898,20 +1145,32 @@ foreign import ccall unsafe "setTimevalTicks"
 newtype CFdSet = CFdSet ()
 
 foreign import ccall safe "select"
-  c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
+  c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
            -> IO CInt
 
 foreign import ccall unsafe "hsFD_SETSIZE"
-  fD_SETSIZE :: Fd
+  c_fD_SETSIZE :: CInt
+
+fD_SETSIZE :: Fd
+fD_SETSIZE = fromIntegral c_fD_SETSIZE
 
 foreign import ccall unsafe "hsFD_CLR"
-  fdClr :: Fd -> Ptr CFdSet -> IO ()
+  c_fdClr :: CInt -> Ptr CFdSet -> IO ()
+
+fdClr :: Fd -> Ptr CFdSet -> IO ()
+fdClr (Fd fd) fdset = c_fdClr fd fdset
 
 foreign import ccall unsafe "hsFD_ISSET"
-  fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
+  c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
+
+fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
+fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
 
 foreign import ccall unsafe "hsFD_SET"
-  fdSet :: Fd -> Ptr CFdSet -> IO ()
+  c_fdSet :: CInt -> Ptr CFdSet -> IO ()
+
+fdSet :: Fd -> Ptr CFdSet -> IO ()
+fdSet (Fd fd) fdset = c_fdSet fd fdset
 
 foreign import ccall unsafe "hsFD_ZERO"
   fdZero :: Ptr CFdSet -> IO ()
@@ -920,4 +1179,5 @@ foreign import ccall unsafe "sizeof_fd_set"
   sizeofFdSet :: Int
 
 #endif
+
 \end{code}