--
-----------------------------------------------------------------------------
-#include "ghcconfig.h"
+-- No: #hide, because bits of this module are exposed by the stm package.
+-- However, we don't want this module to be the home location for the
+-- bits it exports, we'd rather have Control.Concurrent and the other
+-- higher level modules be the home. Hence:
+
+#include "Typeable.h"
+
+-- #not-home
module GHC.Conc
( ThreadId(..)
- -- Forking and suchlike
+ -- * Forking and suchlike
+ , forkIO -- :: IO a -> IO ThreadId
+ , forkOnIO -- :: Int -> IO a -> IO ThreadId
+ , childHandler -- :: Exception -> IO ()
, myThreadId -- :: IO ThreadId
, killThread -- :: ThreadId -> IO ()
, throwTo -- :: ThreadId -> Exception -> IO ()
, yield -- :: IO ()
, labelThread -- :: ThreadId -> String -> IO ()
- -- Waiting
+ -- * Waiting
, threadDelay -- :: Int -> IO ()
+ , registerDelay -- :: Int -> IO (TVar Bool)
, threadWaitRead -- :: Int -> IO ()
, threadWaitWrite -- :: Int -> IO ()
- -- MVars
+ -- * MVars
, MVar -- abstract
, newMVar -- :: a -> IO (MVar a)
, newEmptyMVar -- :: IO (MVar a)
, isEmptyMVar -- :: MVar a -> IO Bool
, addMVarFinalizer -- :: MVar a -> IO () -> IO ()
- -- TVars
+ -- * TVars
, STM -- abstract
, atomically -- :: STM a -> IO a
, retry -- :: STM a
, orElse -- :: STM a -> STM a -> STM a
, catchSTM -- :: STM a -> (Exception -> STM a) -> STM a
+ , alwaysSucceeds -- :: STM a -> STM ()
+ , always -- :: STM Bool -> STM ()
, TVar -- abstract
, newTVar -- :: a -> STM (TVar a)
+ , newTVarIO -- :: a -> STM (TVar a)
, readTVar -- :: TVar a -> STM a
, writeTVar -- :: a -> TVar a -> STM ()
, unsafeIOToSTM -- :: IO a -> STM a
+ -- * Miscellaneous
#ifdef mingw32_HOST_OS
, asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
, asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
, asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
, asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
#endif
+
+#ifndef mingw32_HOST_OS
+ , ensureIOManagerIsRunning
+#endif
) where
import System.Posix.Types
import Foreign
import Foreign.C
+#ifndef __HADDOCK__
+import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow )
+#endif
+
import Data.Maybe
import GHC.Base
import GHC.Num ( Num(..) )
import GHC.Real ( fromIntegral, quot )
import GHC.Base ( Int(..) )
-import GHC.Exception ( Exception(..), AsyncException(..) )
+import GHC.Exception ( catchException, Exception(..), AsyncException(..) )
import GHC.Pack ( packCString# )
import GHC.Ptr ( Ptr(..), plusPtr, FunPtr(..) )
import GHC.STRef
+import GHC.Show ( Show(..), showString )
import Data.Typeable
-#include "Typeable.h"
infixr 0 `par`, `pseq`
\end{code}
%************************************************************************
\begin{code}
-data ThreadId = ThreadId ThreadId#
+data ThreadId = ThreadId ThreadId# deriving( Typeable )
-- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
-- But since ThreadId# is unlifted, the Weak type must use open
-- type variables.
it defines 'ThreadId' as a synonym for ().
-}
-INSTANCE_TYPEABLE0(ThreadId,threadIdTc,"ThreadId")
+instance Show ThreadId where
+ showsPrec d t =
+ showString "ThreadId " .
+ showsPrec d (getThreadId (id2TSO t))
+
+foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> Int
+
+id2TSO :: ThreadId -> ThreadId#
+id2TSO (ThreadId t) = t
+
+foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
+-- Returns -1, 0, 1
+
+cmpThread :: ThreadId -> ThreadId -> Ordering
+cmpThread t1 t2 =
+ case cmp_thread (id2TSO t1) (id2TSO t2) of
+ -1 -> LT
+ 0 -> EQ
+ _ -> GT -- must be 1
+instance Eq ThreadId where
+ t1 == t2 =
+ case t1 `cmpThread` t2 of
+ EQ -> True
+ _ -> False
---forkIO has now been hoisted out into the Concurrent library.
+instance Ord ThreadId where
+ compare = cmpThread
+
+{- |
+This sparks off a new thread to run the 'IO' computation passed as the
+first argument, and returns the 'ThreadId' of the newly created
+thread.
+
+The new thread will be a lightweight thread; if you want to use a foreign
+library that uses thread-local storage, use 'forkOS' instead.
+-}
+forkIO :: IO () -> IO ThreadId
+forkIO action = IO $ \ s ->
+ case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
+ where
+ action_plus = catchException action childHandler
+
+forkOnIO :: Int -> IO () -> IO ThreadId
+forkOnIO (I# cpu) action = IO $ \ s ->
+ case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
+ where
+ action_plus = catchException action childHandler
+
+childHandler :: Exception -> IO ()
+childHandler err = catchException (real_handler err) childHandler
+
+real_handler :: Exception -> IO ()
+real_handler ex =
+ case ex of
+ -- ignore thread GC and killThread exceptions:
+ BlockedOnDeadMVar -> return ()
+ BlockedIndefinitely -> return ()
+ AsyncException ThreadKilled -> return ()
+
+ -- report all others:
+ AsyncException StackOverflow -> reportStackOverflow
+ other -> reportError other
{- | 'killThread' terminates the given thread (GHC only).
Any work already done by the thread isn\'t
thread has received the exception. This is a useful property to know
when dealing with race conditions: eg. if there are two threads that
can kill each other, it is guaranteed that only one of the threads
-will get to kill the other. -}
+will get to kill the other.
+
+If the target thread is currently making a foreign call, then the
+exception will not be raised (and hence 'throwTo' will not return)
+until the call has completed. This is the case regardless of whether
+the call is inside a 'block' or not.
+ -}
throwTo :: ThreadId -> Exception -> IO ()
throwTo (ThreadId id) ex = IO $ \ s ->
case (killThread# id ex s) of s1 -> (# s1, () #)
transactions.
\begin{code}
+-- |A monad supporting atomic memory transactions.
newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
-INSTANCE_TYPEABLE1(STM,stmTc,"STM" )
-
unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
unSTM (STM a) = a
+INSTANCE_TYPEABLE1(STM,stmTc,"STM")
+
instance Functor STM where
fmap f x = x >>= (return . f)
unsafeIOToSTM (IO m) = STM m
-- |Perform a series of STM actions atomically.
+--
+-- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'.
+-- Any attempt to do so will result in a runtime error. (Reason: allowing
+-- this would effectively allow a transaction inside a transaction, depending
+-- on exactly when the thunk is evaluated.)
+--
+-- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
+-- and which allows top-level TVars to be allocated.
+
atomically :: STM a -> IO a
atomically (STM m) = IO (\s -> (atomically# m) s )
-- values in TVars which mean that it should not continue (e.g. the TVars
-- represent a shared buffer that is now empty). The implementation may
-- block the thread until one of the TVars that it has read from has been
--- udpated.
+-- udpated. (GHC only)
retry :: STM a
retry = STM $ \s# -> retry# s#
--- |Compose two alternative STM actions. If the first action completes without
--- retrying then it forms the result of the orElse. Otherwise, if the first
--- action retries, then the second action is tried in its place. If both actions
--- retry then the orElse as a whole retries.
+-- |Compose two alternative STM actions (GHC only). If the first action
+-- completes without retrying then it forms the result of the orElse.
+-- Otherwise, if the first action retries, then the second action is
+-- tried in its place. If both actions retry then the orElse as a
+-- whole retries.
orElse :: STM a -> STM a -> STM a
orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
catchSTM :: STM a -> (Exception -> STM a) -> STM a
catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
+-- | Low-level primitive on which always and alwaysSucceeds are built.
+-- checkInv differs form these in that (i) the invariant is not
+-- checked when checkInv is called, only at the end of this and
+-- subsequent transcations, (ii) the invariant failure is indicated
+-- by raising an exception.
+checkInv :: STM a -> STM ()
+checkInv (STM m) = STM (\s -> (check# m) s)
+
+-- | alwaysSucceeds adds a new invariant that must be true when passed
+-- to alwaysSucceeds, at the end of the current transaction, and at
+-- the end of every subsequent transaction. If it fails at any
+-- of those points then the transaction violating it is aborted
+-- and the exception raised by the invariant is propagated.
+alwaysSucceeds :: STM a -> STM ()
+alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () )
+ checkInv i
+
+-- | always is a variant of alwaysSucceeds in which the invariant is
+-- expressed as an STM Bool action that must return True. Returning
+-- False or raising an exception are both treated as invariant failures.
+always :: STM Bool -> STM ()
+always i = alwaysSucceeds ( do v <- i
+ if (v) then return () else ( error "Transacional invariant violation" ) )
+
+-- |Shared memory locations that support atomic memory transactions.
data TVar a = TVar (TVar# RealWorld a)
-INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar" )
+INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
instance Eq (TVar a) where
(TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
case newTVar# val s1# of
(# s2#, tvar# #) -> (# s2#, TVar tvar# #)
+-- |@IO@ version of 'newTVar'. This is useful for creating top-level
+-- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
+-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
+-- possible.
+newTVarIO :: a -> IO (TVar a)
+newTVarIO val = IO $ \s1# ->
+ case newTVar# val s1# of
+ (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
+
-- |Return the current value stored in a TVar
readTVar :: TVar a -> STM a
readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
\begin{code}
--Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
-INSTANCE_TYPEABLE1(MVar,mvarTc,"MVar" )
-
-- |Create an 'MVar' which is initially empty.
newEmptyMVar :: IO (MVar a)
newEmptyMVar = IO $ \ s# ->
-- empty, 'takeMVar' will wait until it is full. After a 'takeMVar',
-- the 'MVar' is left empty.
--
--- If several threads are competing to take the same 'MVar', one is chosen
--- to continue at random when the 'MVar' becomes full.
+-- There are two further important properties of 'takeMVar':
+--
+-- * 'takeMVar' is single-wakeup. That is, if there are multiple
+-- threads blocked in 'takeMVar', and the 'MVar' becomes full,
+-- only one thread will be woken up. The runtime guarantees that
+-- the woken thread completes its 'takeMVar' operation.
+--
+-- * When multiple threads are blocked on an 'MVar', they are
+-- woken up in FIFO order. This is useful for providing
+-- fairness properties of abstractions built using 'MVar's.
+--
takeMVar :: MVar a -> IO a
takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
-- |Put a value into an 'MVar'. If the 'MVar' is currently full,
-- 'putMVar' will wait until it becomes empty.
--
--- If several threads are competing to fill the same 'MVar', one is
--- chosen to continue at random when the 'MVar' becomes empty.
+-- There are two further important properties of 'putMVar':
+--
+-- * 'putMVar' is single-wakeup. That is, if there are multiple
+-- threads blocked in 'putMVar', and the 'MVar' becomes empty,
+-- only one thread will be woken up. The runtime guarantees that
+-- the woken thread completes its 'putMVar' operation.
+--
+-- * When multiple threads are blocked on an 'MVar', they are
+-- woken up in FIFO order. This is useful for providing
+-- fairness properties of abstractions built using 'MVar's.
+--
putMVar :: MVar a -> a -> IO ()
putMVar (MVar mvar#) x = IO $ \ s# ->
case putMVar# mvar# x s# of
case delay# time# s of { s -> (# s, () #)
}}
+registerDelay :: Int -> IO (TVar Bool)
+registerDelay usecs
+#ifndef mingw32_HOST_OS
+ | threaded = waitForDelayEventSTM usecs
+ | otherwise = error "registerDelay: requires -threaded"
+#else
+ = error "registerDelay: not currently supported on Windows"
+#endif
+
-- On Windows, we just make a safe call to 'Sleep' to implement threadDelay.
#ifdef mingw32_HOST_OS
-foreign import ccall safe "Sleep" c_Sleep :: CInt -> IO ()
+foreign import stdcall safe "Sleep" c_Sleep :: CInt -> IO ()
#endif
foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
| Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
data DelayReq
- = Delay {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
+ = Delay {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
+ | DelaySTM {-# UNPACK #-} !Int {-# UNPACK #-} !(TVar Bool)
pendingEvents :: IORef [IOReq]
pendingDelays :: IORef [DelayReq]
{-# NOINLINE pendingEvents #-}
{-# NOINLINE pendingDelays #-}
(pendingEvents,pendingDelays) = unsafePerformIO $ do
- startIOServiceThread
+ startIOManagerThread
reqs <- newIORef []
dels <- newIORef []
return (reqs, dels)
-- the first time we schedule an IO request, the service thread
-- will be created (cool, huh?)
-startIOServiceThread :: IO ()
-startIOServiceThread = do
+ensureIOManagerIsRunning :: IO ()
+ensureIOManagerIsRunning
+ | threaded = seq pendingEvents $ return ()
+ | otherwise = return ()
+
+startIOManagerThread :: IO ()
+startIOManagerThread = do
allocaArray 2 $ \fds -> do
- throwErrnoIfMinus1 "startIOServiceThread" (c_pipe fds)
+ throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
rd_end <- peekElemOff fds 0
wr_end <- peekElemOff fds 1
writeIORef stick (fromIntegral wr_end)
- quickForkIO $ do
+ c_setIOManagerPipe wr_end
+ forkIO $ do
allocaBytes sizeofFdSet $ \readfds -> do
allocaBytes sizeofFdSet $ \writefds -> do
allocaBytes sizeofTimeVal $ \timeval -> do
service_loop (fromIntegral rd_end) readfds writefds timeval [] []
return ()
--- XXX: move real forkIO here from Control.Concurrent?
-quickForkIO action = IO $ \s ->
- case (fork# action s) of (# s1, id #) -> (# s1, ThreadId id #)
-
service_loop
:: Fd -- listen to this for wakeup calls
-> Ptr CFdSet
fdSet wakeup readfds
maxfd <- buildFdSets 0 readfds writefds reqs
- -- check the current time and wake up any thread in threadDelay whose
- -- timeout has expired. Also find the timeout value for the select() call.
- now <- getTicksOfDay
- (delays', timeout) <- getDelay now ptimeval delays
-
-- perform the select()
- let do_select = do
+ let do_select delays = do
+ -- check the current time and wake up any thread in
+ -- threadDelay whose timeout has expired. Also find the
+ -- timeout value for the select() call.
+ now <- getTicksOfDay
+ (delays', timeout) <- getDelay now ptimeval delays
+
res <- c_select ((max wakeup maxfd)+1) readfds writefds
nullPtr timeout
if (res == -1)
then do
err <- getErrno
- if err == eINTR
- then do_select
- else return res
+ case err of
+ _ | err == eINTR -> do_select delays'
+ -- EINTR: just redo the select()
+ _ | err == eBADF -> return (True, delays)
+ -- EBADF: one of the file descriptors is closed or bad,
+ -- we don't know which one, so wake everyone up.
+ _ | otherwise -> throwErrno "select"
+ -- otherwise (ENOMEM or EINVAL) something has gone
+ -- wrong; report the error.
else
- return res
- res <- do_select
- -- ToDo: check result
-
- b <- takeMVar prodding
- if b then alloca $ \p -> do c_read (fromIntegral wakeup) p 1; return ()
- else return ()
+ return (False,delays')
+
+ (wakeup_all,delays') <- do_select delays
+
+ exit <-
+ if wakeup_all then return False
+ else do
+ b <- fdIsSet wakeup readfds
+ if b == 0
+ then return False
+ else alloca $ \p -> do
+ c_read (fromIntegral wakeup) p 1; return ()
+ s <- peek p
+ case s of
+ _ | s == io_MANAGER_WAKEUP -> return False
+ _ | s == io_MANAGER_DIE -> return True
+ _ -> do handler_tbl <- peek handlers
+ sp <- peekElemOff handler_tbl (fromIntegral s)
+ forkIO (do io <- deRefStablePtr sp; io)
+ return False
+
+ if exit then return () else do
+
+ takeMVar prodding
putMVar prodding False
- reqs' <- completeRequests reqs readfds writefds []
+ reqs' <- if wakeup_all then do wakeupAll reqs; return []
+ else completeRequests reqs readfds writefds []
+
service_loop wakeup readfds writefds ptimeval reqs' delays'
stick :: IORef Fd
{-# NOINLINE stick #-}
stick = unsafePerformIO (newIORef 0)
+io_MANAGER_WAKEUP = 0xff :: CChar
+io_MANAGER_DIE = 0xfe :: CChar
+
prodding :: MVar Bool
{-# NOINLINE prodding #-}
prodding = unsafePerformIO (newMVar False)
b <- takeMVar prodding
if (not b)
then do fd <- readIORef stick
- with 42 $ \pbuf -> do c_write (fromIntegral fd) pbuf 1; return ()
+ with io_MANAGER_WAKEUP $ \pbuf -> do
+ c_write (fromIntegral fd) pbuf 1; return ()
else return ()
putMVar prodding True
+foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
+
+foreign import ccall "setIOManagerPipe"
+ c_setIOManagerPipe :: CInt -> IO ()
+
-- -----------------------------------------------------------------------------
-- IO requests
buildFdSets maxfd readfds writefds [] = return maxfd
-buildFdSets maxfd readfds writefds (Read fd m : reqs) = do
- fdSet fd readfds
- buildFdSets (max maxfd fd) readfds writefds reqs
-buildFdSets maxfd readfds writefds (Write fd m : reqs) = do
- fdSet fd writefds
- buildFdSets (max maxfd fd) readfds writefds reqs
+buildFdSets maxfd readfds writefds (Read fd m : reqs)
+ | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
+ | otherwise = do
+ fdSet fd readfds
+ buildFdSets (max maxfd fd) readfds writefds reqs
+buildFdSets maxfd readfds writefds (Write fd m : reqs)
+ | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
+ | otherwise = do
+ fdSet fd writefds
+ buildFdSets (max maxfd fd) readfds writefds reqs
completeRequests [] _ _ reqs' = return reqs'
completeRequests (Read fd m : reqs) readfds writefds reqs' = do
then do putMVar m (); completeRequests reqs readfds writefds reqs'
else completeRequests reqs readfds writefds (Write fd m : reqs')
+wakeupAll [] = return ()
+wakeupAll (Read fd m : reqs) = do putMVar m (); wakeupAll reqs
+wakeupAll (Write fd m : reqs) = do putMVar m (); wakeupAll reqs
+
waitForReadEvent :: Fd -> IO ()
waitForReadEvent fd = do
m <- newEmptyMVar
prodServiceThread
takeMVar m
+-- Delays for use in STM
+waitForDelayEventSTM :: Int -> IO (TVar Bool)
+waitForDelayEventSTM usecs = do
+ t <- atomically $ newTVar False
+ now <- getTicksOfDay
+ let target = now + usecs `quot` tick_usecs
+ atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
+ prodServiceThread
+ return t
+
-- Walk the queue of pending delays, waking up any that have passed
-- and return the smallest delay to wait for. The queue of pending
-- delays is kept ordered.
getDelay :: Ticks -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
getDelay now ptimeval [] = return ([],nullPtr)
-getDelay now ptimeval all@(Delay time m : rest)
- | now >= time = do
+getDelay now ptimeval all@(d : rest)
+ = case d of
+ Delay time m | now >= time -> do
putMVar m ()
getDelay now ptimeval rest
- | otherwise = do
- setTimevalTicks ptimeval (time - now)
+ DelaySTM time t | now >= time -> do
+ atomically $ writeTVar t True
+ getDelay now ptimeval rest
+ _otherwise -> do
+ setTimevalTicks ptimeval (delayTime d - now)
return (all,ptimeval)
insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
-insertDelay d@(Delay time m) [] = [d]
-insertDelay d1@(Delay time m) ds@(d2@(Delay time' m') : rest)
- | time <= time' = d1 : ds
- | otherwise = d2 : insertDelay d1 rest
+insertDelay d [] = [d]
+insertDelay d1 ds@(d2 : rest)
+ | delayTime d1 <= delayTime d2 = d1 : ds
+ | otherwise = d2 : insertDelay d1 rest
+
+delayTime (Delay t _) = t
+delayTime (DelaySTM t _) = t
type Ticks = Int
tick_freq = 50 :: Ticks -- accuracy of threadDelay (ticks per sec)
c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
-> IO CInt
+foreign import ccall unsafe "hsFD_SETSIZE"
+ fD_SETSIZE :: Fd
+
foreign import ccall unsafe "hsFD_CLR"
fdClr :: Fd -> Ptr CFdSet -> IO ()