X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=GHC%2FConc.lhs;h=8476498c1f2d1caba26efdd6a7b0d53c3919b417;hb=04a66406a173c23a85081d3768a0364a03d6af5c;hp=279681beabd6040c4967ffa8eabd8682601dd88e;hpb=32db97748d339d778f56886be07c0e38a8bdbd9a;p=ghc-base.git diff --git a/GHC/Conc.lhs b/GHC/Conc.lhs index 279681b..8476498 100644 --- a/GHC/Conc.lhs +++ b/GHC/Conc.lhs @@ -1,5 +1,5 @@ \begin{code} -{-# OPTIONS -fno-implicit-prelude #-} +{-# OPTIONS_GHC -fno-implicit-prelude #-} ----------------------------------------------------------------------------- -- | -- Module : GHC.Conc @@ -14,7 +14,12 @@ -- ----------------------------------------------------------------------------- -#include "config.h" +-- No: #hide, because bits of this module are exposed by the stm package. +-- However, we don't want this module to be the home location for the +-- bits it exports, we'd rather have Control.Concurrent and the other +-- higher level modules be the home. Hence: + +-- #not-home module GHC.Conc ( ThreadId(..) @@ -26,7 +31,6 @@ module GHC.Conc , pseq -- :: a -> b -> b , yield -- :: IO () , labelThread -- :: ThreadId -> String -> IO () - , forkProcessPrim -- :: IO Int -- Waiting , threadDelay -- :: Int -> IO () @@ -44,7 +48,19 @@ module GHC.Conc , isEmptyMVar -- :: MVar a -> IO Bool , addMVarFinalizer -- :: MVar a -> IO () -> IO () -#ifdef mingw32_TARGET_OS + -- TVars + , STM -- abstract + , atomically -- :: STM a -> IO a + , retry -- :: STM a + , orElse -- :: STM a -> STM a -> STM a + , catchSTM -- :: STM a -> (Exception -> STM a) -> STM a + , TVar -- abstract + , newTVar -- :: a -> STM (TVar a) + , readTVar -- :: TVar a -> STM a + , writeTVar -- :: a -> TVar a -> STM () + , unsafeIOToSTM -- :: IO a -> STM a + +#ifdef mingw32_HOST_OS , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int @@ -54,16 +70,23 @@ module GHC.Conc #endif ) where +import System.Posix.Types +import System.Posix.Internals +import Foreign +import Foreign.C + import Data.Maybe import GHC.Base -import GHC.IOBase ( IO(..), MVar(..), ioException, IOException(..), IOErrorType(..) ) -import GHC.Num ( fromInteger, negate ) -import GHC.Real ( fromIntegral ) +import GHC.IOBase +import GHC.Num ( Num(..) ) +import GHC.Real ( fromIntegral, quot ) import GHC.Base ( Int(..) ) import GHC.Exception ( Exception(..), AsyncException(..) ) import GHC.Pack ( packCString# ) import GHC.Ptr ( Ptr(..), plusPtr, FunPtr(..) ) +import GHC.STRef +import Data.Typeable infixr 0 `par`, `pseq` \end{code} @@ -75,7 +98,7 @@ infixr 0 `par`, `pseq` %************************************************************************ \begin{code} -data ThreadId = ThreadId ThreadId# +data ThreadId = ThreadId ThreadId# deriving( Typeable ) -- ToDo: data ThreadId = ThreadId (Weak ThreadId#) -- But since ThreadId# is unlifted, the Weak type must use open -- type variables. @@ -154,29 +177,6 @@ labelThread (ThreadId t) str = IO $ \ s -> adr = byteArrayContents# ps in case (labelThread# t adr s) of s1 -> (# s1, () #) -{- | This function is a replacement for 'System.Posix.Process.forkProcessAll': -This implementation /will stop all other Concurrent Haskell threads/ in the -(heavyweight) forked copy. -'forkProcessPrim' returns the pid of the child process to the parent, 0 to the -child, and a value less than 0 in case of errors. See also: -'System.Posix.Process.forkProcess' in package @unix@. - -Without this function, you need excessive and often impractical -explicit synchronization using the regular Concurrent Haskell constructs to assure -that only the desired thread is running after the fork(). - -The stopped threads are /not/ garbage collected! This behaviour may change in -future releases. - -NOTE: currently, main threads are not stopped in the child process. -To work around this problem, call 'forkProcessPrim' from the main thread. --} - --- XXX RTS should know about 'pid_t'. - -forkProcessPrim :: IO Int -forkProcessPrim = IO $ \s -> case (forkProcess# s) of (# s1, id #) -> (# s1, (I# id) #) - -- Nota Bene: 'pseq' used to be 'seq' -- but 'seq' is now defined in PrelGHC -- @@ -197,6 +197,98 @@ par :: a -> b -> b par x y = case (par# x) of { _ -> lazy y } \end{code} + +%************************************************************************ +%* * +\subsection[stm]{Transactional heap operations} +%* * +%************************************************************************ + +TVars are shared memory locations which support atomic memory +transactions. + +\begin{code} +newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #)) deriving( Typeable ) + +unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #)) +unSTM (STM a) = a + +instance Functor STM where + fmap f x = x >>= (return . f) + +instance Monad STM where + {-# INLINE return #-} + {-# INLINE (>>) #-} + {-# INLINE (>>=) #-} + m >> k = thenSTM m k + return x = returnSTM x + m >>= k = bindSTM m k + +bindSTM :: STM a -> (a -> STM b) -> STM b +bindSTM (STM m) k = STM ( \s -> + case m s of + (# new_s, a #) -> unSTM (k a) new_s + ) + +thenSTM :: STM a -> STM b -> STM b +thenSTM (STM m) k = STM ( \s -> + case m s of + (# new_s, a #) -> unSTM k new_s + ) + +returnSTM :: a -> STM a +returnSTM x = STM (\s -> (# s, x #)) + +-- | Unsafely performs IO in the STM monad. +unsafeIOToSTM :: IO a -> STM a +unsafeIOToSTM (IO m) = STM m + +-- |Perform a series of STM actions atomically. +atomically :: STM a -> IO a +atomically (STM m) = IO (\s -> (atomically# m) s ) + +-- |Retry execution of the current memory transaction because it has seen +-- values in TVars which mean that it should not continue (e.g. the TVars +-- represent a shared buffer that is now empty). The implementation may +-- block the thread until one of the TVars that it has read from has been +-- udpated. +retry :: STM a +retry = STM $ \s# -> retry# s# + +-- |Compose two alternative STM actions. If the first action completes without +-- retrying then it forms the result of the orElse. Otherwise, if the first +-- action retries, then the second action is tried in its place. If both actions +-- retry then the orElse as a whole retries. +orElse :: STM a -> STM a -> STM a +orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s + +-- |Exception handling within STM actions. +catchSTM :: STM a -> (Exception -> STM a) -> STM a +catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s + +data TVar a = TVar (TVar# RealWorld a) deriving( Typeable ) + +instance Eq (TVar a) where + (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2# + +-- |Create a new TVar holding a value supplied +newTVar :: a -> STM (TVar a) +newTVar val = STM $ \s1# -> + case newTVar# val s1# of + (# s2#, tvar# #) -> (# s2#, TVar tvar# #) + +-- |Return the current value stored in a TVar +readTVar :: TVar a -> STM a +readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s# + +-- |Write the supplied value into a TVar +writeTVar :: TVar a -> a -> STM () +writeTVar (TVar tvar#) val = STM $ \s1# -> + case writeTVar# tvar# val s1# of + s2# -> (# s2#, () #) + +\end{code} + %************************************************************************ %* * \subsection[mvars]{M-Structures} @@ -240,7 +332,7 @@ takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s# -- 'putMVar' will wait until it becomes empty. -- -- If several threads are competing to fill the same 'MVar', one is --- chosen to continue at random with the 'MVar' becomes empty. +-- chosen to continue at random when the 'MVar' becomes empty. putMVar :: MVar a -> a -> IO () putMVar (MVar mvar#) x = IO $ \ s# -> case putMVar# mvar# x s# of @@ -290,46 +382,21 @@ addMVarFinalizer (MVar m) finalizer = %* * %************************************************************************ -@threadWaitRead@ delays rescheduling of a thread until input on the -specified file descriptor is available for reading (just like select). -@threadWaitWrite@ is similar, but for writing on a file descriptor. - \begin{code} --- |The 'threadDelay' operation will cause the current thread to --- suspend for a given number of microseconds (GHC only). --- --- Note that the resolution --- used by the Haskell runtime system\'s internal timer together with the --- fact that the thread may take some time to be rescheduled after the --- time has expired, means that the accuracy is more like 1\/50 second. -threadDelay :: Int -> IO () - --- | Block the current thread until data is available to read on the --- given file descriptor (GHC only). -threadWaitRead :: Int -> IO () - --- | Block the current thread until data can be written to the --- given file descriptor (GHC only). -threadWaitWrite :: Int -> IO () - -threadDelay (I# ms) = IO $ \s -> case delay# ms s of s -> (# s, () #) -threadWaitRead (I# fd) = IO $ \s -> case waitRead# fd s of s -> (# s, () #) -threadWaitWrite (I# fd) = IO $ \s -> case waitWrite# fd s of s -> (# s, () #) - -#ifdef mingw32_TARGET_OS +#ifdef mingw32_HOST_OS -- Note: threadDelay, threadWaitRead and threadWaitWrite aren't really functional -- on Win32, but left in there because lib code (still) uses them (the manner -- in which they're used doesn't cause problems on a Win32 platform though.) asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) -asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) = - IO $ \s -> case asyncRead# fd isSock len buf s of +asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) = + IO $ \s -> case asyncRead# fd isSock len buf s of (# s, len#, err# #) -> (# s, (I# len#, I# err#) #) asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) -asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) = - IO $ \s -> case asyncWrite# fd isSock len buf s of +asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) = + IO $ \s -> case asyncWrite# fd isSock len buf s of (# s, len#, err# #) -> (# s, (I# len#, I# err#) #) asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int @@ -352,4 +419,319 @@ asyncWriteBA fd isSock len off bufB = asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off) #endif + +-- ----------------------------------------------------------------------------- +-- Thread IO API + +-- | Block the current thread until data is available to read on the +-- given file descriptor (GHC only). +threadWaitRead :: Fd -> IO () +threadWaitRead fd +#ifndef mingw32_HOST_OS + | threaded = waitForReadEvent fd +#endif + | otherwise = IO $ \s -> + case fromIntegral fd of { I# fd# -> + case waitRead# fd# s of { s -> (# s, () #) + }} + +-- | Block the current thread until data can be written to the +-- given file descriptor (GHC only). +threadWaitWrite :: Fd -> IO () +threadWaitWrite fd +#ifndef mingw32_HOST_OS + | threaded = waitForWriteEvent fd +#endif + | otherwise = IO $ \s -> + case fromIntegral fd of { I# fd# -> + case waitWrite# fd# s of { s -> (# s, () #) + }} + +-- | Suspends the current thread for a given number of microseconds +-- (GHC only). +-- +-- Note that the resolution used by the Haskell runtime system's +-- internal timer is 1\/50 second, and 'threadDelay' will round its +-- argument up to the nearest multiple of this resolution. +-- +-- There is no guarantee that the thread will be rescheduled promptly +-- when the delay has expired, but the thread will never continue to +-- run /earlier/ than specified. +-- +threadDelay :: Int -> IO () +threadDelay time +#ifndef mingw32_HOST_OS + | threaded = waitForDelayEvent time +#else + | threaded = c_Sleep (fromIntegral (time `quot` 1000)) +#endif + | otherwise = IO $ \s -> + case fromIntegral time of { I# time# -> + case delay# time# s of { s -> (# s, () #) + }} + +-- On Windows, we just make a safe call to 'Sleep' to implement threadDelay. +#ifdef mingw32_HOST_OS +foreign import ccall safe "Sleep" c_Sleep :: CInt -> IO () +#endif + +foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool + +-- ---------------------------------------------------------------------------- +-- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay + +-- In the threaded RTS, we employ a single IO Manager thread to wait +-- for all outstanding IO requests (threadWaitRead,threadWaitWrite) +-- and delays (threadDelay). +-- +-- We can do this because in the threaded RTS the IO Manager can make +-- a non-blocking call to select(), so we don't have to do select() in +-- the scheduler as we have to in the non-threaded RTS. We get performance +-- benefits from doing it this way, because we only have to restart the select() +-- when a new request arrives, rather than doing one select() each time +-- around the scheduler loop. Furthermore, the scheduler can be simplified +-- by not having to check for completed IO requests. + +-- Issues, possible problems: +-- +-- - we might want bound threads to just do the blocking +-- operation rather than communicating with the IO manager +-- thread. This would prevent simgle-threaded programs which do +-- IO from requiring multiple OS threads. However, it would also +-- prevent bound threads waiting on IO from being killed or sent +-- exceptions. +-- +-- - Apprently exec() doesn't work on Linux in a multithreaded program. +-- I couldn't repeat this. +-- +-- - How do we handle signal delivery in the multithreaded RTS? +-- +-- - forkProcess will kill the IO manager thread. Let's just +-- hope we don't need to do any blocking IO between fork & exec. + +#ifndef mingw32_HOST_OS + +data IOReq + = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ()) + | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ()) + +data DelayReq + = Delay {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ()) + +pendingEvents :: IORef [IOReq] +pendingDelays :: IORef [DelayReq] + -- could use a strict list or array here +{-# NOINLINE pendingEvents #-} +{-# NOINLINE pendingDelays #-} +(pendingEvents,pendingDelays) = unsafePerformIO $ do + startIOServiceThread + reqs <- newIORef [] + dels <- newIORef [] + return (reqs, dels) + -- the first time we schedule an IO request, the service thread + -- will be created (cool, huh?) + +startIOServiceThread :: IO () +startIOServiceThread = do + allocaArray 2 $ \fds -> do + throwErrnoIfMinus1 "startIOServiceThread" (c_pipe fds) + rd_end <- peekElemOff fds 0 + wr_end <- peekElemOff fds 1 + writeIORef stick (fromIntegral wr_end) + quickForkIO $ do + allocaBytes sizeofFdSet $ \readfds -> do + allocaBytes sizeofFdSet $ \writefds -> do + allocaBytes sizeofTimeVal $ \timeval -> do + service_loop (fromIntegral rd_end) readfds writefds timeval [] [] + return () + +-- XXX: move real forkIO here from Control.Concurrent? +quickForkIO action = IO $ \s -> + case (fork# action s) of (# s1, id #) -> (# s1, ThreadId id #) + +service_loop + :: Fd -- listen to this for wakeup calls + -> Ptr CFdSet + -> Ptr CFdSet + -> Ptr CTimeVal + -> [IOReq] + -> [DelayReq] + -> IO () +service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do + + -- pick up new IO requests + new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a)) + let reqs = new_reqs ++ old_reqs + + -- pick up new delay requests + new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a)) + let delays = foldr insertDelay old_delays new_delays + + -- build the FDSets for select() + fdZero readfds + fdZero writefds + fdSet wakeup readfds + maxfd <- buildFdSets 0 readfds writefds reqs + + -- perform the select() + let do_select delays = do + -- check the current time and wake up any thread in + -- threadDelay whose timeout has expired. Also find the + -- timeout value for the select() call. + now <- getTicksOfDay + (delays', timeout) <- getDelay now ptimeval delays + + res <- c_select ((max wakeup maxfd)+1) readfds writefds + nullPtr timeout + if (res == -1) + then do + err <- getErrno + if err == eINTR + then do_select delays' + else return (res,delays') + else + return (res,delays') + + (res,delays') <- do_select delays + -- ToDo: check result + + b <- takeMVar prodding + if b then alloca $ \p -> do c_read (fromIntegral wakeup) p 1; return () + else return () + putMVar prodding False + + reqs' <- completeRequests reqs readfds writefds [] + service_loop wakeup readfds writefds ptimeval reqs' delays' + +stick :: IORef Fd +{-# NOINLINE stick #-} +stick = unsafePerformIO (newIORef 0) + +prodding :: MVar Bool +{-# NOINLINE prodding #-} +prodding = unsafePerformIO (newMVar False) + +prodServiceThread :: IO () +prodServiceThread = do + b <- takeMVar prodding + if (not b) + then do fd <- readIORef stick + with 42 $ \pbuf -> do c_write (fromIntegral fd) pbuf 1; return () + else return () + putMVar prodding True + +-- ----------------------------------------------------------------------------- +-- IO requests + +buildFdSets maxfd readfds writefds [] = return maxfd +buildFdSets maxfd readfds writefds (Read fd m : reqs) = do + fdSet fd readfds + buildFdSets (max maxfd fd) readfds writefds reqs +buildFdSets maxfd readfds writefds (Write fd m : reqs) = do + fdSet fd writefds + buildFdSets (max maxfd fd) readfds writefds reqs + +completeRequests [] _ _ reqs' = return reqs' +completeRequests (Read fd m : reqs) readfds writefds reqs' = do + b <- fdIsSet fd readfds + if b /= 0 + then do putMVar m (); completeRequests reqs readfds writefds reqs' + else completeRequests reqs readfds writefds (Read fd m : reqs') +completeRequests (Write fd m : reqs) readfds writefds reqs' = do + b <- fdIsSet fd writefds + if b /= 0 + then do putMVar m (); completeRequests reqs readfds writefds reqs' + else completeRequests reqs readfds writefds (Write fd m : reqs') + +waitForReadEvent :: Fd -> IO () +waitForReadEvent fd = do + m <- newEmptyMVar + atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ())) + prodServiceThread + takeMVar m + +waitForWriteEvent :: Fd -> IO () +waitForWriteEvent fd = do + m <- newEmptyMVar + atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ())) + prodServiceThread + takeMVar m + +-- XXX: move into GHC.IOBase from Data.IORef? +atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b +atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s + +-- ----------------------------------------------------------------------------- +-- Delays + +waitForDelayEvent :: Int -> IO () +waitForDelayEvent usecs = do + m <- newEmptyMVar + now <- getTicksOfDay + let target = now + usecs `quot` tick_usecs + atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ())) + prodServiceThread + takeMVar m + +-- Walk the queue of pending delays, waking up any that have passed +-- and return the smallest delay to wait for. The queue of pending +-- delays is kept ordered. +getDelay :: Ticks -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal) +getDelay now ptimeval [] = return ([],nullPtr) +getDelay now ptimeval all@(Delay time m : rest) + | now >= time = do + putMVar m () + getDelay now ptimeval rest + | otherwise = do + setTimevalTicks ptimeval (time - now) + return (all,ptimeval) + +insertDelay :: DelayReq -> [DelayReq] -> [DelayReq] +insertDelay d@(Delay time m) [] = [d] +insertDelay d1@(Delay time m) ds@(d2@(Delay time' m') : rest) + | time <= time' = d1 : ds + | otherwise = d2 : insertDelay d1 rest + +type Ticks = Int +tick_freq = 50 :: Ticks -- accuracy of threadDelay (ticks per sec) +tick_usecs = 1000000 `quot` tick_freq :: Int + +newtype CTimeVal = CTimeVal () + +foreign import ccall unsafe "sizeofTimeVal" + sizeofTimeVal :: Int + +foreign import ccall unsafe "getTicksOfDay" + getTicksOfDay :: IO Ticks + +foreign import ccall unsafe "setTimevalTicks" + setTimevalTicks :: Ptr CTimeVal -> Ticks -> IO () + +-- ---------------------------------------------------------------------------- +-- select() interface + +-- ToDo: move to System.Posix.Internals? + +newtype CFdSet = CFdSet () + +foreign import ccall safe "select" + c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal + -> IO CInt + +foreign import ccall unsafe "hsFD_CLR" + fdClr :: Fd -> Ptr CFdSet -> IO () + +foreign import ccall unsafe "hsFD_ISSET" + fdIsSet :: Fd -> Ptr CFdSet -> IO CInt + +foreign import ccall unsafe "hsFD_SET" + fdSet :: Fd -> Ptr CFdSet -> IO () + +foreign import ccall unsafe "hsFD_ZERO" + fdZero :: Ptr CFdSet -> IO () + +foreign import ccall unsafe "sizeof_fd_set" + sizeofFdSet :: Int + +#endif \end{code}