-- bits it exports, we'd rather have Control.Concurrent and the other
-- higher level modules be the home. Hence:
+#include "Typeable.h"
+
-- #not-home
module GHC.Conc
( ThreadId(..)
- -- Forking and suchlike
+ -- * Forking and suchlike
, forkIO -- :: IO a -> IO ThreadId
, forkOnIO -- :: Int -> IO a -> IO ThreadId
, childHandler -- :: Exception -> IO ()
, yield -- :: IO ()
, labelThread -- :: ThreadId -> String -> IO ()
- -- Waiting
+ -- * Waiting
, threadDelay -- :: Int -> IO ()
, registerDelay -- :: Int -> IO (TVar Bool)
, threadWaitRead -- :: Int -> IO ()
, threadWaitWrite -- :: Int -> IO ()
- -- MVars
+ -- * MVars
, MVar -- abstract
, newMVar -- :: a -> IO (MVar a)
, newEmptyMVar -- :: IO (MVar a)
, isEmptyMVar -- :: MVar a -> IO Bool
, addMVarFinalizer -- :: MVar a -> IO () -> IO ()
- -- TVars
+ -- * TVars
, STM -- abstract
, atomically -- :: STM a -> IO a
, retry -- :: STM a
, orElse -- :: STM a -> STM a -> STM a
, catchSTM -- :: STM a -> (Exception -> STM a) -> STM a
+ , alwaysSucceeds -- :: STM a -> STM ()
+ , always -- :: STM Bool -> STM ()
, TVar -- abstract
, newTVar -- :: a -> STM (TVar a)
, newTVarIO -- :: a -> STM (TVar a)
, writeTVar -- :: a -> TVar a -> STM ()
, unsafeIOToSTM -- :: IO a -> STM a
+ -- * Miscellaneous
#ifdef mingw32_HOST_OS
, asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
, asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
, asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
#endif
-#ifndef mingw32_HOST_OS
, ensureIOManagerIsRunning
-#endif
) where
import System.Posix.Types
+#ifndef mingw32_HOST_OS
import System.Posix.Internals
+#endif
import Foreign
import Foreign.C
import GHC.IOBase
import GHC.Num ( Num(..) )
import GHC.Real ( fromIntegral, quot )
+#ifndef mingw32_HOST_OS
import GHC.Base ( Int(..) )
+#endif
import GHC.Exception ( catchException, Exception(..), AsyncException(..) )
import GHC.Pack ( packCString# )
import GHC.Ptr ( Ptr(..), plusPtr, FunPtr(..) )
transactions.
\begin{code}
-newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #)) deriving( Typeable )
+-- |A monad supporting atomic memory transactions.
+newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
unSTM (STM a) = a
+INSTANCE_TYPEABLE1(STM,stmTc,"STM")
+
instance Functor STM where
fmap f x = x >>= (return . f)
unsafeIOToSTM (IO m) = STM m
-- |Perform a series of STM actions atomically.
+--
+-- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'.
+-- Any attempt to do so will result in a runtime error. (Reason: allowing
+-- this would effectively allow a transaction inside a transaction, depending
+-- on exactly when the thunk is evaluated.)
+--
+-- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
+-- and which allows top-level TVars to be allocated.
+
atomically :: STM a -> IO a
atomically (STM m) = IO (\s -> (atomically# m) s )
-- values in TVars which mean that it should not continue (e.g. the TVars
-- represent a shared buffer that is now empty). The implementation may
-- block the thread until one of the TVars that it has read from has been
--- udpated.
+-- udpated. (GHC only)
retry :: STM a
retry = STM $ \s# -> retry# s#
--- |Compose two alternative STM actions. If the first action completes without
--- retrying then it forms the result of the orElse. Otherwise, if the first
--- action retries, then the second action is tried in its place. If both actions
--- retry then the orElse as a whole retries.
+-- |Compose two alternative STM actions (GHC only). If the first action
+-- completes without retrying then it forms the result of the orElse.
+-- Otherwise, if the first action retries, then the second action is
+-- tried in its place. If both actions retry then the orElse as a
+-- whole retries.
orElse :: STM a -> STM a -> STM a
orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
catchSTM :: STM a -> (Exception -> STM a) -> STM a
catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
-data TVar a = TVar (TVar# RealWorld a) deriving( Typeable )
+-- | Low-level primitive on which always and alwaysSucceeds are built.
+-- checkInv differs form these in that (i) the invariant is not
+-- checked when checkInv is called, only at the end of this and
+-- subsequent transcations, (ii) the invariant failure is indicated
+-- by raising an exception.
+checkInv :: STM a -> STM ()
+checkInv (STM m) = STM (\s -> (check# m) s)
+
+-- | alwaysSucceeds adds a new invariant that must be true when passed
+-- to alwaysSucceeds, at the end of the current transaction, and at
+-- the end of every subsequent transaction. If it fails at any
+-- of those points then the transaction violating it is aborted
+-- and the exception raised by the invariant is propagated.
+alwaysSucceeds :: STM a -> STM ()
+alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () )
+ checkInv i
+
+-- | always is a variant of alwaysSucceeds in which the invariant is
+-- expressed as an STM Bool action that must return True. Returning
+-- False or raising an exception are both treated as invariant failures.
+always :: STM Bool -> STM ()
+always i = alwaysSucceeds ( do v <- i
+ if (v) then return () else ( error "Transacional invariant violation" ) )
+
+-- |Shared memory locations that support atomic memory transactions.
+data TVar a = TVar (TVar# RealWorld a)
+
+INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
instance Eq (TVar a) where
(TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
--
threadDelay :: Int -> IO ()
threadDelay time
-#ifndef mingw32_HOST_OS
| threaded = waitForDelayEvent time
-#else
- | threaded = c_Sleep (fromIntegral (time `quot` 1000))
-#endif
| otherwise = IO $ \s ->
case fromIntegral time of { I# time# ->
case delay# time# s of { s -> (# s, () #)
}}
+registerDelay :: Int -> IO (TVar Bool)
registerDelay usecs
-#ifndef mingw32_HOST_OS
| threaded = waitForDelayEventSTM usecs
| otherwise = error "registerDelay: requires -threaded"
-#else
- = error "registerDelay: not currently supported on Windows"
-#endif
-
--- On Windows, we just make a safe call to 'Sleep' to implement threadDelay.
-#ifdef mingw32_HOST_OS
-foreign import stdcall safe "Sleep" c_Sleep :: CInt -> IO ()
-#endif
foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
+waitForDelayEvent :: Int -> IO ()
+waitForDelayEvent usecs = do
+ m <- newEmptyMVar
+ now <- getTicksOfDay
+ let target = now + usecs `quot` tick_usecs
+ atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
+ prodServiceThread
+ takeMVar m
+
+-- Delays for use in STM
+waitForDelayEventSTM :: Int -> IO (TVar Bool)
+waitForDelayEventSTM usecs = do
+ t <- atomically $ newTVar False
+ now <- getTicksOfDay
+ let target = now + usecs `quot` tick_usecs
+ atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
+ prodServiceThread
+ return t
+
+calculateTarget :: Int -> IO Int
+calculateTarget usecs = do
+ now <- getTicksOfDay
+ let -- Convert usecs to ticks, rounding up as we must wait /at least/
+ -- as long as we are told
+ usecs' = (usecs + tick_usecs - 1) `quot` tick_usecs
+ target = now + 1 -- getTicksOfDay will have rounded down, but
+ -- again we need to wait for /at least/ as long
+ -- as we are told, so add 1 to it
+ + usecs'
+ return target
+
-- ----------------------------------------------------------------------------
-- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
-- hope we don't need to do any blocking IO between fork & exec.
#ifndef mingw32_HOST_OS
-
data IOReq
= Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
| Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
+#endif
data DelayReq
= Delay {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
| DelaySTM {-# UNPACK #-} !Int {-# UNPACK #-} !(TVar Bool)
+#ifndef mingw32_HOST_OS
pendingEvents :: IORef [IOReq]
+#endif
pendingDelays :: IORef [DelayReq]
-- could use a strict list or array here
{-# NOINLINE pendingEvents #-}
| threaded = seq pendingEvents $ return ()
| otherwise = return ()
+insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
+insertDelay d [] = [d]
+insertDelay d1 ds@(d2 : rest)
+ | delayTime d1 <= delayTime d2 = d1 : ds
+ | otherwise = d2 : insertDelay d1 rest
+
+delayTime (Delay t _) = t
+delayTime (DelaySTM t _) = t
+
+type Ticks = Int
+tick_freq = 50 :: Ticks -- accuracy of threadDelay (ticks per sec)
+tick_usecs = 1000000 `quot` tick_freq :: Int
+tick_msecs = 1000 `quot` tick_freq :: Int
+
+-- XXX: move into GHC.IOBase from Data.IORef?
+atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
+atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
+
+foreign import ccall unsafe "getTicksOfDay"
+ getTicksOfDay :: IO Ticks
+
+#ifdef mingw32_HOST_OS
+-- ----------------------------------------------------------------------------
+-- Windows IO manager thread
+
+startIOManagerThread :: IO ()
+startIOManagerThread = do
+ wakeup <- c_getIOManagerEvent
+ forkIO $ service_loop wakeup []
+ return ()
+
+service_loop :: HANDLE -- read end of pipe
+ -> [DelayReq] -- current delay requests
+ -> IO ()
+
+service_loop wakeup old_delays = do
+ -- pick up new delay requests
+ new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
+ let delays = foldr insertDelay old_delays new_delays
+
+ now <- getTicksOfDay
+ (delays', timeout) <- getDelay now delays
+
+ r <- c_WaitForSingleObject wakeup timeout
+ case r of
+ 0xffffffff -> do c_maperrno; throwErrno "service_loop"
+ 0 -> do
+ r <- c_readIOManagerEvent
+ exit <-
+ case r of
+ _ | r == io_MANAGER_WAKEUP -> return False
+ _ | r == io_MANAGER_DIE -> return True
+ 0 -> return False -- spurious wakeup
+ r -> do start_console_handler (r `shiftR` 1); return False
+ if exit
+ then return ()
+ else service_cont wakeup delays'
+
+ _other -> service_cont wakeup delays' -- probably timeout
+
+service_cont wakeup delays = do
+ takeMVar prodding
+ putMVar prodding False
+ service_loop wakeup delays
+
+-- must agree with rts/win32/ThrIOManager.c
+io_MANAGER_WAKEUP = 0xffffffff :: Word32
+io_MANAGER_DIE = 0xfffffffe :: Word32
+
+start_console_handler :: Word32 -> IO ()
+start_console_handler r = do
+ stableptr <- peek console_handler
+ forkIO $ do io <- deRefStablePtr stableptr; io (fromIntegral r)
+ return ()
+
+foreign import ccall "&console_handler"
+ console_handler :: Ptr (StablePtr (CInt -> IO ()))
+
+stick :: IORef HANDLE
+{-# NOINLINE stick #-}
+stick = unsafePerformIO (newIORef nullPtr)
+
+prodding :: MVar Bool
+{-# NOINLINE prodding #-}
+prodding = unsafePerformIO (newMVar False)
+
+prodServiceThread :: IO ()
+prodServiceThread = do
+ b <- takeMVar prodding
+ if (not b)
+ then do hdl <- readIORef stick
+ c_sendIOManagerEvent io_MANAGER_WAKEUP
+ else return ()
+ putMVar prodding True
+
+-- Walk the queue of pending delays, waking up any that have passed
+-- and return the smallest delay to wait for. The queue of pending
+-- delays is kept ordered.
+getDelay :: Ticks -> [DelayReq] -> IO ([DelayReq], DWORD)
+getDelay now [] = return ([], iNFINITE)
+getDelay now all@(d : rest)
+ = case d of
+ Delay time m | now >= time -> do
+ putMVar m ()
+ getDelay now rest
+ DelaySTM time t | now >= time -> do
+ atomically $ writeTVar t True
+ getDelay now rest
+ _otherwise ->
+ return (all, (fromIntegral (delayTime d - now) *
+ fromIntegral tick_msecs))
+ -- delay is in millisecs for WaitForSingleObject
+
+-- ToDo: this just duplicates part of System.Win32.Types, which isn't
+-- available yet. We should move some Win32 functionality down here,
+-- maybe as part of the grand reorganisation of the base package...
+type HANDLE = Ptr ()
+type DWORD = Word32
+
+iNFINITE = 0xFFFFFFFF :: DWORD -- urgh
+
+foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
+ c_getIOManagerEvent :: IO HANDLE
+
+foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
+ c_readIOManagerEvent :: IO Word32
+
+foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
+ c_sendIOManagerEvent :: Word32 -> IO ()
+
+foreign import ccall unsafe "maperrno" -- in runProcess.c
+ c_maperrno :: IO ()
+
+foreign import stdcall "WaitForSingleObject"
+ c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
+
+#else
+-- ----------------------------------------------------------------------------
+-- Unix IO manager thread, using select()
+
startIOManagerThread :: IO ()
startIOManagerThread = do
allocaArray 2 $ \fds -> do
service_loop wakeup readfds writefds ptimeval reqs' delays'
+io_MANAGER_WAKEUP = 0xff :: CChar
+io_MANAGER_DIE = 0xfe :: CChar
+
stick :: IORef Fd
{-# NOINLINE stick #-}
stick = unsafePerformIO (newIORef 0)
-io_MANAGER_WAKEUP = 0xff :: CChar
-io_MANAGER_DIE = 0xfe :: CChar
-
prodding :: MVar Bool
{-# NOINLINE prodding #-}
prodding = unsafePerformIO (newMVar False)
prodServiceThread
takeMVar m
--- XXX: move into GHC.IOBase from Data.IORef?
-atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
-atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
-
-- -----------------------------------------------------------------------------
-- Delays
-waitForDelayEvent :: Int -> IO ()
-waitForDelayEvent usecs = do
- m <- newEmptyMVar
- now <- getTicksOfDay
- let target = now + usecs `quot` tick_usecs
- atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
- prodServiceThread
- takeMVar m
-
--- Delays for use in STM
-waitForDelayEventSTM :: Int -> IO (TVar Bool)
-waitForDelayEventSTM usecs = do
- t <- atomically $ newTVar False
- now <- getTicksOfDay
- let target = now + usecs `quot` tick_usecs
- atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
- prodServiceThread
- return t
-
-- Walk the queue of pending delays, waking up any that have passed
-- and return the smallest delay to wait for. The queue of pending
-- delays is kept ordered.
setTimevalTicks ptimeval (delayTime d - now)
return (all,ptimeval)
-insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
-insertDelay d [] = [d]
-insertDelay d1 ds@(d2 : rest)
- | delayTime d1 <= delayTime d2 = d1 : ds
- | otherwise = d2 : insertDelay d1 rest
-
-delayTime (Delay t _) = t
-delayTime (DelaySTM t _) = t
-
-type Ticks = Int
-tick_freq = 50 :: Ticks -- accuracy of threadDelay (ticks per sec)
-tick_usecs = 1000000 `quot` tick_freq :: Int
-
newtype CTimeVal = CTimeVal ()
foreign import ccall unsafe "sizeofTimeVal"
sizeofTimeVal :: Int
-foreign import ccall unsafe "getTicksOfDay"
- getTicksOfDay :: IO Ticks
-
foreign import ccall unsafe "setTimevalTicks"
setTimevalTicks :: Ptr CTimeVal -> Ticks -> IO ()
+{-
+ On Win32 we're going to have a single Pipe, and a
+ waitForSingleObject with the delay time. For signals, we send a
+ byte down the pipe just like on Unix.
+-}
+
-- ----------------------------------------------------------------------------
-- select() interface
sizeofFdSet :: Int
#endif
+
\end{code}