Add support for the IO manager thread on Windows
[haskell-directory.git] / GHC / Conc.lhs
index decd406..461a3bf 100644 (file)
 -- bits it exports, we'd rather have Control.Concurrent and the other
 -- higher level modules be the home.  Hence:
 
+#include "Typeable.h"
+
 -- #not-home
 module GHC.Conc
        ( ThreadId(..)
 
-       -- Forking and suchlike
+       -- * Forking and suchlike
        , forkIO        -- :: IO a -> IO ThreadId
        , forkOnIO      -- :: Int -> IO a -> IO ThreadId
        , childHandler  -- :: Exception -> IO ()
@@ -35,13 +37,13 @@ module GHC.Conc
        , yield         -- :: IO ()
        , labelThread   -- :: ThreadId -> String -> IO ()
 
-       -- Waiting
+       -- * Waiting
        , threadDelay           -- :: Int -> IO ()
        , registerDelay         -- :: Int -> IO (TVar Bool)
        , threadWaitRead        -- :: Int -> IO ()
        , threadWaitWrite       -- :: Int -> IO ()
 
-       -- MVars
+       -- * MVars
        , MVar          -- abstract
        , newMVar       -- :: a -> IO (MVar a)
        , newEmptyMVar  -- :: IO (MVar a)
@@ -52,12 +54,14 @@ module GHC.Conc
        , isEmptyMVar   -- :: MVar a -> IO Bool
        , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
 
-       -- TVars
+       -- * TVars
        , STM           -- abstract
        , atomically    -- :: STM a -> IO a
        , retry         -- :: STM a
        , orElse        -- :: STM a -> STM a -> STM a
         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
+       , alwaysSucceeds -- :: STM a -> STM ()
+       , always        -- :: STM Bool -> STM ()
        , TVar          -- abstract
        , newTVar       -- :: a -> STM (TVar a)
        , newTVarIO     -- :: a -> STM (TVar a)
@@ -65,6 +69,7 @@ module GHC.Conc
        , writeTVar     -- :: a -> TVar a -> STM ()
        , unsafeIOToSTM -- :: IO a -> STM a
 
+       -- * Miscellaneous
 #ifdef mingw32_HOST_OS
        , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
        , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
@@ -74,13 +79,13 @@ module GHC.Conc
        , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
 #endif
 
-#ifndef mingw32_HOST_OS
        , ensureIOManagerIsRunning
-#endif
         ) where
 
 import System.Posix.Types
+#ifndef mingw32_HOST_OS
 import System.Posix.Internals
+#endif
 import Foreign
 import Foreign.C
 
@@ -94,7 +99,9 @@ import GHC.Base
 import GHC.IOBase
 import GHC.Num         ( Num(..) )
 import GHC.Real                ( fromIntegral, quot )
+#ifndef mingw32_HOST_OS
 import GHC.Base                ( Int(..) )
+#endif
 import GHC.Exception    ( catchException, Exception(..), AsyncException(..) )
 import GHC.Pack                ( packCString# )
 import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
@@ -290,11 +297,14 @@ TVars are shared memory locations which support atomic memory
 transactions.
 
 \begin{code}
-newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #)) deriving( Typeable )
+-- |A monad supporting atomic memory transactions.
+newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
 
 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
 unSTM (STM a) = a
 
+INSTANCE_TYPEABLE1(STM,stmTc,"STM")
+
 instance  Functor STM where
    fmap f x = x >>= (return . f)
 
@@ -326,6 +336,15 @@ unsafeIOToSTM :: IO a -> STM a
 unsafeIOToSTM (IO m) = STM m
 
 -- |Perform a series of STM actions atomically.
+--
+-- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
+-- Any attempt to do so will result in a runtime error.  (Reason: allowing
+-- this would effectively allow a transaction inside a transaction, depending
+-- on exactly when the thunk is evaluated.)
+--
+-- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
+-- and which allows top-level TVars to be allocated.
+
 atomically :: STM a -> IO a
 atomically (STM m) = IO (\s -> (atomically# m) s )
 
@@ -333,14 +352,15 @@ atomically (STM m) = IO (\s -> (atomically# m) s )
 -- values in TVars which mean that it should not continue (e.g. the TVars
 -- represent a shared buffer that is now empty).  The implementation may
 -- block the thread until one of the TVars that it has read from has been
--- udpated.
+-- udpated. (GHC only)
 retry :: STM a
 retry = STM $ \s# -> retry# s#
 
--- |Compose two alternative STM actions.  If the first action completes without
--- retrying then it forms the result of the orElse.  Otherwise, if the first
--- action retries, then the second action is tried in its place.  If both actions
--- retry then the orElse as a whole retries.
+-- |Compose two alternative STM actions (GHC only).  If the first action
+-- completes without retrying then it forms the result of the orElse.
+-- Otherwise, if the first action retries, then the second action is
+-- tried in its place.  If both actions retry then the orElse as a
+-- whole retries.
 orElse :: STM a -> STM a -> STM a
 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
 
@@ -348,7 +368,34 @@ orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
 catchSTM :: STM a -> (Exception -> STM a) -> STM a
 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
 
-data TVar a = TVar (TVar# RealWorld a) deriving( Typeable )
+-- | Low-level primitive on which always and alwaysSucceeds are built.
+-- checkInv differs form these in that (i) the invariant is not 
+-- checked when checkInv is called, only at the end of this and
+-- subsequent transcations, (ii) the invariant failure is indicated
+-- by raising an exception.
+checkInv :: STM a -> STM ()
+checkInv (STM m) = STM (\s -> (check# m) s)
+
+-- | alwaysSucceeds adds a new invariant that must be true when passed
+-- to alwaysSucceeds, at the end of the current transaction, and at
+-- the end of every subsequent transaction.  If it fails at any
+-- of those points then the transaction violating it is aborted
+-- and the exception raised by the invariant is propagated.
+alwaysSucceeds :: STM a -> STM ()
+alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
+                      checkInv i
+
+-- | always is a variant of alwaysSucceeds in which the invariant is
+-- expressed as an STM Bool action that must return True.  Returning
+-- False or raising an exception are both treated as invariant failures.
+always :: STM Bool -> STM ()
+always i = alwaysSucceeds ( do v <- i
+                               if (v) then return () else ( error "Transacional invariant violation" ) )
+
+-- |Shared memory locations that support atomic memory transactions.
+data TVar a = TVar (TVar# RealWorld a)
+
+INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
 
 instance Eq (TVar a) where
        (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
@@ -569,31 +616,50 @@ threadWaitWrite fd
 --
 threadDelay :: Int -> IO ()
 threadDelay time
-#ifndef mingw32_HOST_OS
   | threaded  = waitForDelayEvent time
-#else
-  | threaded  = c_Sleep (fromIntegral (time `quot` 1000))
-#endif
   | otherwise = IO $ \s -> 
        case fromIntegral time of { I# time# ->
        case delay# time# s of { s -> (# s, () #)
        }}
 
+registerDelay :: Int -> IO (TVar Bool)
 registerDelay usecs 
-#ifndef mingw32_HOST_OS
   | threaded = waitForDelayEventSTM usecs
   | otherwise = error "registerDelay: requires -threaded"
-#else
-  = error "registerDelay: not currently supported on Windows"
-#endif
-
--- On Windows, we just make a safe call to 'Sleep' to implement threadDelay.
-#ifdef mingw32_HOST_OS
-foreign import stdcall safe "Sleep" c_Sleep :: CInt -> IO ()
-#endif
 
 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
 
+waitForDelayEvent :: Int -> IO ()
+waitForDelayEvent usecs = do
+  m <- newEmptyMVar
+  now <- getTicksOfDay
+  let target = now + usecs `quot` tick_usecs
+  atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
+  prodServiceThread
+  takeMVar m
+
+-- Delays for use in STM
+waitForDelayEventSTM :: Int -> IO (TVar Bool)
+waitForDelayEventSTM usecs = do
+   t <- atomically $ newTVar False
+   now <- getTicksOfDay
+   let target = now + usecs `quot` tick_usecs
+   atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
+   prodServiceThread
+   return t  
+    
+calculateTarget :: Int -> IO Int
+calculateTarget usecs = do
+    now <- getTicksOfDay
+    let -- Convert usecs to ticks, rounding up as we must wait /at least/
+        -- as long as we are told
+        usecs' = (usecs + tick_usecs - 1) `quot` tick_usecs
+        target = now + 1 -- getTicksOfDay will have rounded down, but
+                         -- again we need to wait for /at least/ as long
+                         -- as we are told, so add 1 to it
+               + usecs'
+    return target
+
 -- ----------------------------------------------------------------------------
 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
 
@@ -627,16 +693,18 @@ foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
 --       hope we don't need to do any blocking IO between fork & exec.
 
 #ifndef mingw32_HOST_OS
-
 data IOReq
   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
+#endif
 
 data DelayReq
   = Delay    {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
   | DelaySTM {-# UNPACK #-} !Int {-# UNPACK #-} !(TVar Bool)
 
+#ifndef mingw32_HOST_OS
 pendingEvents :: IORef [IOReq]
+#endif
 pendingDelays :: IORef [DelayReq]
        -- could use a strict list or array here
 {-# NOINLINE pendingEvents #-}
@@ -654,6 +722,146 @@ ensureIOManagerIsRunning
   | threaded  = seq pendingEvents $ return ()
   | otherwise = return ()
 
+insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
+insertDelay d [] = [d]
+insertDelay d1 ds@(d2 : rest)
+  | delayTime d1 <= delayTime d2 = d1 : ds
+  | otherwise                    = d2 : insertDelay d1 rest
+
+delayTime (Delay t _) = t
+delayTime (DelaySTM t _) = t
+
+type Ticks = Int
+tick_freq  = 50 :: Ticks  -- accuracy of threadDelay (ticks per sec)
+tick_usecs = 1000000 `quot` tick_freq :: Int
+tick_msecs = 1000 `quot` tick_freq :: Int
+
+-- XXX: move into GHC.IOBase from Data.IORef?
+atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
+atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
+
+foreign import ccall unsafe "getTicksOfDay" 
+  getTicksOfDay :: IO Ticks
+
+#ifdef mingw32_HOST_OS
+-- ----------------------------------------------------------------------------
+-- Windows IO manager thread
+
+startIOManagerThread :: IO ()
+startIOManagerThread = do
+  wakeup <- c_getIOManagerEvent
+  forkIO $ service_loop wakeup []
+  return ()
+
+service_loop :: HANDLE          -- read end of pipe
+             -> [DelayReq]      -- current delay requests
+             -> IO ()
+
+service_loop wakeup old_delays = do
+  -- pick up new delay requests
+  new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
+  let  delays = foldr insertDelay old_delays new_delays
+
+  now <- getTicksOfDay
+  (delays', timeout) <- getDelay now delays
+
+  r <- c_WaitForSingleObject wakeup timeout
+  case r of
+    0xffffffff -> do c_maperrno; throwErrno "service_loop"
+    0 -> do
+        r <- c_readIOManagerEvent
+        exit <- 
+             case r of
+               _ | r == io_MANAGER_WAKEUP -> return False
+               _ | r == io_MANAGER_DIE    -> return True
+                0 -> return False -- spurious wakeup
+               r -> do start_console_handler (r `shiftR` 1); return False
+        if exit
+          then return ()
+          else service_cont wakeup delays'
+
+    _other -> service_cont wakeup delays' -- probably timeout        
+
+service_cont wakeup delays = do
+  takeMVar prodding
+  putMVar prodding False
+  service_loop wakeup delays
+
+-- must agree with rts/win32/ThrIOManager.c
+io_MANAGER_WAKEUP = 0xffffffff :: Word32
+io_MANAGER_DIE    = 0xfffffffe :: Word32
+
+start_console_handler :: Word32 -> IO ()
+start_console_handler r = do                   
+  stableptr <- peek console_handler
+  forkIO $ do io <- deRefStablePtr stableptr; io (fromIntegral r)
+  return ()
+
+foreign import ccall "&console_handler" 
+   console_handler :: Ptr (StablePtr (CInt -> IO ()))
+
+stick :: IORef HANDLE
+{-# NOINLINE stick #-}
+stick = unsafePerformIO (newIORef nullPtr)
+
+prodding :: MVar Bool
+{-# NOINLINE prodding #-}
+prodding = unsafePerformIO (newMVar False)
+
+prodServiceThread :: IO ()
+prodServiceThread = do
+  b <- takeMVar prodding
+  if (not b) 
+    then do hdl <- readIORef stick
+            c_sendIOManagerEvent io_MANAGER_WAKEUP
+    else return ()
+  putMVar prodding True
+
+-- Walk the queue of pending delays, waking up any that have passed
+-- and return the smallest delay to wait for.  The queue of pending
+-- delays is kept ordered.
+getDelay :: Ticks -> [DelayReq] -> IO ([DelayReq], DWORD)
+getDelay now [] = return ([], iNFINITE)
+getDelay now all@(d : rest) 
+  = case d of
+     Delay time m | now >= time -> do
+       putMVar m ()
+       getDelay now rest
+     DelaySTM time t | now >= time -> do
+       atomically $ writeTVar t True
+       getDelay now rest
+     _otherwise ->
+        return (all, (fromIntegral (delayTime d - now) * 
+                        fromIntegral tick_msecs))
+                        -- delay is in millisecs for WaitForSingleObject
+
+-- ToDo: this just duplicates part of System.Win32.Types, which isn't
+-- available yet.  We should move some Win32 functionality down here,
+-- maybe as part of the grand reorganisation of the base package...
+type HANDLE       = Ptr ()
+type DWORD        = Word32
+
+iNFINITE = 0xFFFFFFFF :: DWORD -- urgh
+
+foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
+  c_getIOManagerEvent :: IO HANDLE
+
+foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
+  c_readIOManagerEvent :: IO Word32
+
+foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
+  c_sendIOManagerEvent :: Word32 -> IO ()
+
+foreign import ccall unsafe "maperrno"             -- in runProcess.c
+   c_maperrno :: IO ()
+
+foreign import stdcall "WaitForSingleObject"
+   c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
+
+#else
+-- ----------------------------------------------------------------------------
+-- Unix IO manager thread, using select()
+
 startIOManagerThread :: IO ()
 startIOManagerThread = do
         allocaArray 2 $ \fds -> do
@@ -747,13 +955,13 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
 
   service_loop wakeup readfds writefds ptimeval reqs' delays'
 
+io_MANAGER_WAKEUP = 0xff :: CChar
+io_MANAGER_DIE    = 0xfe :: CChar
+
 stick :: IORef Fd
 {-# NOINLINE stick #-}
 stick = unsafePerformIO (newIORef 0)
 
-io_MANAGER_WAKEUP = 0xff :: CChar
-io_MANAGER_DIE    = 0xfe :: CChar
-
 prodding :: MVar Bool
 {-# NOINLINE prodding #-}
 prodding = unsafePerformIO (newMVar False)
@@ -818,32 +1026,9 @@ waitForWriteEvent fd = do
   prodServiceThread
   takeMVar m
 
--- XXX: move into GHC.IOBase from Data.IORef?
-atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
-atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
-
 -- -----------------------------------------------------------------------------
 -- Delays
 
-waitForDelayEvent :: Int -> IO ()
-waitForDelayEvent usecs = do
-  m <- newEmptyMVar
-  now <- getTicksOfDay
-  let target = now + usecs `quot` tick_usecs
-  atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
-  prodServiceThread
-  takeMVar m
-
--- Delays for use in STM
-waitForDelayEventSTM :: Int -> IO (TVar Bool)
-waitForDelayEventSTM usecs = do
-   t <- atomically $ newTVar False
-   now <- getTicksOfDay
-   let target = now + usecs `quot` tick_usecs
-   atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
-   prodServiceThread
-   return t  
-    
 -- Walk the queue of pending delays, waking up any that have passed
 -- and return the smallest delay to wait for.  The queue of pending
 -- delays is kept ordered.
@@ -861,30 +1046,20 @@ getDelay now ptimeval all@(d : rest)
        setTimevalTicks ptimeval (delayTime d - now)
        return (all,ptimeval)
 
-insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
-insertDelay d [] = [d]
-insertDelay d1 ds@(d2 : rest)
-  | delayTime d1 <= delayTime d2 = d1 : ds
-  | otherwise                    = d2 : insertDelay d1 rest
-
-delayTime (Delay t _) = t
-delayTime (DelaySTM t _) = t
-
-type Ticks = Int
-tick_freq  = 50 :: Ticks  -- accuracy of threadDelay (ticks per sec)
-tick_usecs = 1000000 `quot` tick_freq :: Int
-
 newtype CTimeVal = CTimeVal ()
 
 foreign import ccall unsafe "sizeofTimeVal"
   sizeofTimeVal :: Int
 
-foreign import ccall unsafe "getTicksOfDay" 
-  getTicksOfDay :: IO Ticks
-
 foreign import ccall unsafe "setTimevalTicks" 
   setTimevalTicks :: Ptr CTimeVal -> Ticks -> IO ()
 
+{- 
+  On Win32 we're going to have a single Pipe, and a
+  waitForSingleObject with the delay time.  For signals, we send a
+  byte down the pipe just like on Unix.
+-}
+
 -- ----------------------------------------------------------------------------
 -- select() interface
 
@@ -915,4 +1090,5 @@ foreign import ccall unsafe "sizeof_fd_set"
   sizeofFdSet :: Int
 
 #endif
+
 \end{code}