X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=GHC%2FConc.lhs;h=0d174578a65b55bb9f64c02ee77a0ad5da47b9e5;hb=0bf2fdab482da7a287ef09f18e7656abe62256d0;hp=460a98a20a61966c6afa7702844da56b420ca25f;hpb=55fb0382ef8d4a08424ab4751106e9f588a8a6f7;p=ghc-base.git diff --git a/GHC/Conc.lhs b/GHC/Conc.lhs index 460a98a..0d17457 100644 --- a/GHC/Conc.lhs +++ b/GHC/Conc.lhs @@ -1,5 +1,6 @@ \begin{code} -{-# OPTIONS_GHC -fno-implicit-prelude #-} +{-# OPTIONS_GHC -XNoImplicitPrelude #-} +{-# OPTIONS_GHC -fno-warn-missing-signatures #-} {-# OPTIONS_HADDOCK not-home #-} ----------------------------------------------------------------------------- -- | @@ -28,34 +29,30 @@ module GHC.Conc -- * Forking and suchlike , forkIO -- :: IO a -> IO ThreadId + , forkIOUnmasked , forkOnIO -- :: Int -> IO a -> IO ThreadId + , forkOnIOUnmasked , numCapabilities -- :: Int + , numSparks -- :: IO Int , childHandler -- :: Exception -> IO () , myThreadId -- :: IO ThreadId , killThread -- :: ThreadId -> IO () , throwTo -- :: ThreadId -> Exception -> IO () , par -- :: a -> b -> b , pseq -- :: a -> b -> b + , runSparks , yield -- :: IO () , labelThread -- :: ThreadId -> String -> IO () + , ThreadStatus(..), BlockReason(..) + , threadStatus -- :: ThreadId -> IO ThreadStatus + -- * Waiting , threadDelay -- :: Int -> IO () , registerDelay -- :: Int -> IO (TVar Bool) , threadWaitRead -- :: Int -> IO () , threadWaitWrite -- :: Int -> IO () - -- * MVars - , MVar(..) - , newMVar -- :: a -> IO (MVar a) - , newEmptyMVar -- :: IO (MVar a) - , takeMVar -- :: MVar a -> IO a - , putMVar -- :: MVar a -> a -> IO () - , tryTakeMVar -- :: MVar a -> IO (Maybe a) - , tryPutMVar -- :: MVar a -> a -> IO Bool - , isEmptyMVar -- :: MVar a -> IO Bool - , addMVarFinalizer -- :: MVar a -> IO () -> IO () - -- * TVars , STM(..) , atomically -- :: STM a -> IO a @@ -68,10 +65,12 @@ module GHC.Conc , newTVar -- :: a -> STM (TVar a) , newTVarIO -- :: a -> STM (TVar a) , readTVar -- :: TVar a -> STM a + , readTVarIO -- :: TVar a -> IO a , writeTVar -- :: a -> TVar a -> STM () , unsafeIOToSTM -- :: IO a -> STM a -- * Miscellaneous + , withMVar #ifdef mingw32_HOST_OS , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) @@ -82,16 +81,23 @@ module GHC.Conc #endif #ifndef mingw32_HOST_OS - , signalHandlerLock + , Signal, HandlerFun, setHandler, runHandlers #endif , ensureIOManagerIsRunning +#ifndef mingw32_HOST_OS + , syncIOManager +#endif #ifdef mingw32_HOST_OS , ConsoleEvent(..) , win32ConsoleHandler , toWin32ConsoleEvent #endif + , setUncaughtExceptionHandler -- :: (Exception -> IO ()) -> IO () + , getUncaughtExceptionHandler -- :: IO (Exception -> IO ()) + + , reportError, reportStackOverflow ) where import System.Posix.Types @@ -101,29 +107,43 @@ import System.Posix.Internals import Foreign import Foreign.C -#ifndef __HADDOCK__ -import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow ) +#ifdef mingw32_HOST_OS +import Data.Typeable #endif +#ifndef mingw32_HOST_OS +import Data.Dynamic +#endif +import Control.Monad import Data.Maybe import GHC.Base -import GHC.IOBase +#ifndef mingw32_HOST_OS +import GHC.Debug +#endif +import {-# SOURCE #-} GHC.IO.Handle ( hFlush ) +import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout ) +import GHC.IO +import GHC.IO.Exception +import GHC.Exception +import GHC.IORef +import GHC.MVar import GHC.Num ( Num(..) ) -import GHC.Real ( fromIntegral, div ) +import GHC.Real ( fromIntegral ) #ifndef mingw32_HOST_OS -import GHC.Base ( Int(..) ) +import GHC.IOArray +import GHC.Arr ( inRange ) +#endif +#ifdef mingw32_HOST_OS +import GHC.Real ( div ) +import GHC.Ptr #endif #ifdef mingw32_HOST_OS import GHC.Read ( Read ) import GHC.Enum ( Enum ) #endif -import GHC.Exception import GHC.Pack ( packCString# ) -import GHC.Ptr ( Ptr(..), plusPtr, FunPtr(..) ) -import GHC.STRef import GHC.Show ( Show(..), showString ) -import Data.Typeable infixr 0 `par`, `pseq` \end{code} @@ -194,15 +214,25 @@ thread. The new thread will be a lightweight thread; if you want to use a foreign library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead. -GHC note: the new thread inherits the /blocked/ state of the parent -(see 'Control.Exception.block'). +GHC note: the new thread inherits the /masked/ state of the parent +(see 'Control.Exception.mask'). + +The newly created thread has an exception handler that discards the +exceptions 'BlockedIndefinitelyOnMVar', 'BlockedIndefinitelyOnSTM', and +'ThreadKilled', and passes all other exceptions to the uncaught +exception handler (see 'setUncaughtExceptionHandler'). -} forkIO :: IO () -> IO ThreadId forkIO action = IO $ \ s -> - case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #) + case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #) where action_plus = catchException action childHandler +-- | Like 'forkIO', but the child thread is created with asynchronous exceptions +-- unmasked (see 'Control.Exception.mask'). +forkIOUnmasked :: IO () -> IO ThreadId +forkIOUnmasked io = forkIO (unsafeUnmask io) + {- | Like 'forkIO', but lets you specify on which CPU the thread is created. Unlike a `forkIO` thread, a thread created by `forkOnIO` @@ -218,10 +248,15 @@ equivalent). -} forkOnIO :: Int -> IO () -> IO ThreadId forkOnIO (I# cpu) action = IO $ \ s -> - case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #) + case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #) where action_plus = catchException action childHandler +-- | Like 'forkOnIO', but the child thread is created with +-- asynchronous exceptions unmasked (see 'Control.Exception.mask'). +forkOnIOUnmasked :: Int -> IO () -> IO ThreadId +forkOnIOUnmasked cpu io = forkOnIO cpu (unsafeUnmask io) + -- | the value passed to the @+RTS -N@ flag. This is the number of -- Haskell threads that can run truly simultaneously at any given -- time, and is typically set to the number of physical CPU cores on @@ -231,35 +266,40 @@ numCapabilities = unsafePerformIO $ do n <- peek n_capabilities return (fromIntegral n) -foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt +-- | Returns the number of sparks currently in the local spark pool +numSparks :: IO Int +numSparks = IO $ \s -> case numSparks# s of (# s', n #) -> (# s', I# n #) -childHandler :: Exception -> IO () +#if defined(mingw32_HOST_OS) && defined(__PIC__) +foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt +#else +foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt +#endif +childHandler :: SomeException -> IO () childHandler err = catchException (real_handler err) childHandler -real_handler :: Exception -> IO () -real_handler ex = - case ex of - -- ignore thread GC and killThread exceptions: - BlockedOnDeadMVar -> return () - BlockedIndefinitely -> return () - AsyncException ThreadKilled -> return () - - -- report all others: - AsyncException StackOverflow -> reportStackOverflow - other -> reportError other - -{- | 'killThread' terminates the given thread (GHC only). -Any work already done by the thread isn\'t -lost: the computation is suspended until required by another thread. -The memory used by the thread will be garbage collected if it isn\'t -referenced from anywhere. The 'killThread' function is defined in -terms of 'throwTo': - -> killThread tid = throwTo tid (AsyncException ThreadKilled) +real_handler :: SomeException -> IO () +real_handler se@(SomeException ex) = + -- ignore thread GC and killThread exceptions: + case cast ex of + Just BlockedIndefinitelyOnMVar -> return () + _ -> case cast ex of + Just BlockedIndefinitelyOnSTM -> return () + _ -> case cast ex of + Just ThreadKilled -> return () + _ -> case cast ex of + -- report all others: + Just StackOverflow -> reportStackOverflow + _ -> reportError se + +{- | 'killThread' raises the 'ThreadKilled' exception in the given +thread (GHC only). + +> killThread tid = throwTo tid ThreadKilled -} killThread :: ThreadId -> IO () -killThread tid = throwTo tid (AsyncException ThreadKilled) +killThread tid = throwTo tid ThreadKilled {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only). @@ -271,34 +311,44 @@ when dealing with race conditions: eg. if there are two threads that can kill each other, it is guaranteed that only one of the threads will get to kill the other. +Whatever work the target thread was doing when the exception was +raised is not lost: the computation is suspended until required by +another thread. + If the target thread is currently making a foreign call, then the exception will not be raised (and hence 'throwTo' will not return) until the call has completed. This is the case regardless of whether -the call is inside a 'block' or not. +the call is inside a 'mask' or not. Important note: the behaviour of 'throwTo' differs from that described in the paper \"Asynchronous exceptions in Haskell\" (). In the paper, 'throwTo' is non-blocking; but the library implementation adopts a more synchronous design in which 'throwTo' does not return until the exception -is received by the target thread. The trade-off is discussed in Section 8 of the paper. -Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of -the paper). - -There is currently no guarantee that the exception delivered by 'throwTo' will be -delivered at the first possible opportunity. In particular, if a thread may -unblock and then re-block exceptions (using 'unblock' and 'block') without receiving -a pending 'throwTo'. This is arguably undesirable behaviour. - - -} -throwTo :: ThreadId -> Exception -> IO () -throwTo (ThreadId id) ex = IO $ \ s -> - case (killThread# id ex s) of s1 -> (# s1, () #) +is received by the target thread. The trade-off is discussed in Section 9 of the paper. +Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of +the paper). Unlike other interruptible operations, however, 'throwTo' +is /always/ interruptible, even if it does not actually block. + +There is no guarantee that the exception will be delivered promptly, +although the runtime will endeavour to ensure that arbitrary +delays don't occur. In GHC, an exception can only be raised when a +thread reaches a /safe point/, where a safe point is where memory +allocation occurs. Some loops do not perform any memory allocation +inside the loop and therefore cannot be interrupted by a 'throwTo'. + +Blocked 'throwTo' is fair: if multiple threads are trying to throw an +exception to the same target thread, they will succeed in FIFO order. + + -} +throwTo :: Exception e => ThreadId -> e -> IO () +throwTo (ThreadId tid) ex = IO $ \ s -> + case (killThread# tid (toException ex) s) of s1 -> (# s1, () #) -- | Returns the 'ThreadId' of the calling thread (GHC only). myThreadId :: IO ThreadId myThreadId = IO $ \s -> - case (myThreadId# s) of (# s1, id #) -> (# s1, ThreadId id #) + case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #) -- |The 'yield' action allows (forces, in a co-operative multitasking @@ -321,8 +371,8 @@ Other applications like the graphical Concurrent Haskell Debugger labelThread :: ThreadId -> String -> IO () labelThread (ThreadId t) str = IO $ \ s -> - let ps = packCString# str - adr = byteArrayContents# ps in + let !ps = packCString# str + !adr = byteArrayContents# ps in case (labelThread# t adr s) of s1 -> (# s1, () #) -- Nota Bene: 'pseq' used to be 'seq' @@ -343,6 +393,60 @@ pseq x y = x `seq` lazy y {-# INLINE par #-} par :: a -> b -> b par x y = case (par# x) of { _ -> lazy y } + +-- | Internal function used by the RTS to run sparks. +runSparks :: IO () +runSparks = IO loop + where loop s = case getSpark# s of + (# s', n, p #) -> + if n ==# 0# then (# s', () #) + else p `seq` loop s' + +data BlockReason + = BlockedOnMVar + -- ^blocked on on 'MVar' + | BlockedOnBlackHole + -- ^blocked on a computation in progress by another thread + | BlockedOnException + -- ^blocked in 'throwTo' + | BlockedOnSTM + -- ^blocked in 'retry' in an STM transaction + | BlockedOnForeignCall + -- ^currently in a foreign call + | BlockedOnOther + -- ^blocked on some other resource. Without @-threaded@, + -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@ + -- they show up as 'BlockedOnMVar'. + deriving (Eq,Ord,Show) + +-- | The current status of a thread +data ThreadStatus + = ThreadRunning + -- ^the thread is currently runnable or running + | ThreadFinished + -- ^the thread has finished + | ThreadBlocked BlockReason + -- ^the thread is blocked on some resource + | ThreadDied + -- ^the thread received an uncaught exception + deriving (Eq,Ord,Show) + +threadStatus :: ThreadId -> IO ThreadStatus +threadStatus (ThreadId t) = IO $ \s -> + case threadStatus# t s of + (# s', stat #) -> (# s', mk_stat (I# stat) #) + where + -- NB. keep these in sync with includes/Constants.h + mk_stat 0 = ThreadRunning + mk_stat 1 = ThreadBlocked BlockedOnMVar + mk_stat 2 = ThreadBlocked BlockedOnBlackHole + mk_stat 3 = ThreadBlocked BlockedOnException + mk_stat 7 = ThreadBlocked BlockedOnSTM + mk_stat 11 = ThreadBlocked BlockedOnForeignCall + mk_stat 12 = ThreadBlocked BlockedOnForeignCall + mk_stat 16 = ThreadFinished + mk_stat 17 = ThreadDied + mk_stat _ = ThreadBlocked BlockedOnOther \end{code} @@ -384,13 +488,36 @@ bindSTM (STM m) k = STM ( \s -> thenSTM :: STM a -> STM b -> STM b thenSTM (STM m) k = STM ( \s -> case m s of - (# new_s, a #) -> unSTM k new_s + (# new_s, _ #) -> unSTM k new_s ) returnSTM :: a -> STM a returnSTM x = STM (\s -> (# s, x #)) --- | Unsafely performs IO in the STM monad. +instance MonadPlus STM where + mzero = retry + mplus = orElse + +-- | Unsafely performs IO in the STM monad. Beware: this is a highly +-- dangerous thing to do. +-- +-- * The STM implementation will often run transactions multiple +-- times, so you need to be prepared for this if your IO has any +-- side effects. +-- +-- * The STM implementation will abort transactions that are known to +-- be invalid and need to be restarted. This may happen in the middle +-- of `unsafeIOToSTM`, so make sure you don't acquire any resources +-- that need releasing (exception handlers are ignored when aborting +-- the transaction). That includes doing any IO using Handles, for +-- example. Getting this wrong will probably lead to random deadlocks. +-- +-- * The transaction may have seen an inconsistent view of memory when +-- the IO runs. Invariants that you expect to be true throughout +-- your program may not be true inside a transaction, due to the +-- way transactions are implemented. Normally this wouldn't be visible +-- to the programmer, but using `unsafeIOToSTM` can expose it. +-- unsafeIOToSTM :: IO a -> STM a unsafeIOToSTM (IO m) = STM m @@ -424,7 +551,7 @@ orElse :: STM a -> STM a -> STM a orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s -- |Exception handling within STM actions. -catchSTM :: STM a -> (Exception -> STM a) -> STM a +catchSTM :: STM a -> (SomeException -> STM a) -> STM a catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s -- | Low-level primitive on which always and alwaysSucceeds are built. @@ -441,7 +568,7 @@ checkInv (STM m) = STM (\s -> (check# m) s) -- of those points then the transaction violating it is aborted -- and the exception raised by the invariant is propagated. alwaysSucceeds :: STM a -> STM () -alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) +alwaysSucceeds i = do ( i >> retry ) `orElse` ( return () ) checkInv i -- | always is a variant of alwaysSucceeds in which the invariant is @@ -474,6 +601,16 @@ newTVarIO val = IO $ \s1# -> case newTVar# val s1# of (# s2#, tvar# #) -> (# s2#, TVar tvar# #) +-- |Return the current value stored in a TVar. +-- This is equivalent to +-- +-- > readTVarIO = atomically . readTVar +-- +-- but works much faster, because it doesn't perform a complete +-- transaction, it just reads the current value of the 'TVar'. +readTVarIO :: TVar a -> IO a +readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s# + -- |Return the current value stored in a TVar readTVar :: TVar a -> STM a readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s# @@ -486,119 +623,27 @@ writeTVar (TVar tvar#) val = STM $ \s1# -> \end{code} -%************************************************************************ -%* * -\subsection[mvars]{M-Structures} -%* * -%************************************************************************ - -M-Vars are rendezvous points for concurrent threads. They begin -empty, and any attempt to read an empty M-Var blocks. When an M-Var -is written, a single blocked thread may be freed. Reading an M-Var -toggles its state from full back to empty. Therefore, any value -written to an M-Var may only be read once. Multiple reads and writes -are allowed, but there must be at least one read between any two -writes. +MVar utilities \begin{code} ---Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a) - --- |Create an 'MVar' which is initially empty. -newEmptyMVar :: IO (MVar a) -newEmptyMVar = IO $ \ s# -> - case newMVar# s# of - (# s2#, svar# #) -> (# s2#, MVar svar# #) - --- |Create an 'MVar' which contains the supplied value. -newMVar :: a -> IO (MVar a) -newMVar value = - newEmptyMVar >>= \ mvar -> - putMVar mvar value >> - return mvar - --- |Return the contents of the 'MVar'. If the 'MVar' is currently --- empty, 'takeMVar' will wait until it is full. After a 'takeMVar', --- the 'MVar' is left empty. --- --- There are two further important properties of 'takeMVar': --- --- * 'takeMVar' is single-wakeup. That is, if there are multiple --- threads blocked in 'takeMVar', and the 'MVar' becomes full, --- only one thread will be woken up. The runtime guarantees that --- the woken thread completes its 'takeMVar' operation. --- --- * When multiple threads are blocked on an 'MVar', they are --- woken up in FIFO order. This is useful for providing --- fairness properties of abstractions built using 'MVar's. --- -takeMVar :: MVar a -> IO a -takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s# - --- |Put a value into an 'MVar'. If the 'MVar' is currently full, --- 'putMVar' will wait until it becomes empty. --- --- There are two further important properties of 'putMVar': --- --- * 'putMVar' is single-wakeup. That is, if there are multiple --- threads blocked in 'putMVar', and the 'MVar' becomes empty, --- only one thread will be woken up. The runtime guarantees that --- the woken thread completes its 'putMVar' operation. --- --- * When multiple threads are blocked on an 'MVar', they are --- woken up in FIFO order. This is useful for providing --- fairness properties of abstractions built using 'MVar's. --- -putMVar :: MVar a -> a -> IO () -putMVar (MVar mvar#) x = IO $ \ s# -> - case putMVar# mvar# x s# of - s2# -> (# s2#, () #) - --- |A non-blocking version of 'takeMVar'. The 'tryTakeMVar' function --- returns immediately, with 'Nothing' if the 'MVar' was empty, or --- @'Just' a@ if the 'MVar' was full with contents @a@. After 'tryTakeMVar', --- the 'MVar' is left empty. -tryTakeMVar :: MVar a -> IO (Maybe a) -tryTakeMVar (MVar m) = IO $ \ s -> - case tryTakeMVar# m s of - (# s, 0#, _ #) -> (# s, Nothing #) -- MVar is empty - (# s, _, a #) -> (# s, Just a #) -- MVar is full - --- |A non-blocking version of 'putMVar'. The 'tryPutMVar' function --- attempts to put the value @a@ into the 'MVar', returning 'True' if --- it was successful, or 'False' otherwise. -tryPutMVar :: MVar a -> a -> IO Bool -tryPutMVar (MVar mvar#) x = IO $ \ s# -> - case tryPutMVar# mvar# x s# of - (# s, 0# #) -> (# s, False #) - (# s, _ #) -> (# s, True #) - --- |Check whether a given 'MVar' is empty. --- --- Notice that the boolean value returned is just a snapshot of --- the state of the MVar. By the time you get to react on its result, --- the MVar may have been filled (or emptied) - so be extremely --- careful when using this operation. Use 'tryTakeMVar' instead if possible. -isEmptyMVar :: MVar a -> IO Bool -isEmptyMVar (MVar mv#) = IO $ \ s# -> - case isEmptyMVar# mv# s# of - (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #) - --- |Add a finalizer to an 'MVar' (GHC only). See "Foreign.ForeignPtr" and --- "System.Mem.Weak" for more about finalizers. -addMVarFinalizer :: MVar a -> IO () -> IO () -addMVarFinalizer (MVar m) finalizer = - IO $ \s -> case mkWeak# m () finalizer s of { (# s1, w #) -> (# s1, () #) } - withMVar :: MVar a -> (a -> IO b) -> IO b withMVar m io = - block $ do + mask $ \restore -> do a <- takeMVar m - b <- catchException (unblock (io a)) + b <- catchAny (restore (io a)) (\e -> do putMVar m a; throw e) putMVar m a return b -\end{code} +modifyMVar_ :: MVar a -> (a -> IO a) -> IO () +modifyMVar_ m io = + mask $ \restore -> do + a <- takeMVar m + a' <- catchAny (restore (io a)) + (\e -> do putMVar m a; throw e) + putMVar m a' + return () +\end{code} %************************************************************************ %* * @@ -616,19 +661,19 @@ withMVar m io = asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) = IO $ \s -> case asyncRead# fd isSock len buf s of - (# s, len#, err# #) -> (# s, (I# len#, I# err#) #) + (# s', len#, err# #) -> (# s', (I# len#, I# err#) #) asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) = IO $ \s -> case asyncWrite# fd isSock len buf s of - (# s, len#, err# #) -> (# s, (I# len#, I# err#) #) + (# s', len#, err# #) -> (# s', (I# len#, I# err#) #) asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int asyncDoProc (FunPtr proc) (Ptr param) = -- the 'length' value is ignored; simplifies implementation of -- the async*# primops to have them all return the same result. IO $ \s -> case asyncDoProc# proc param s of - (# s, len#, err# #) -> (# s, I# err# #) + (# s', _len#, err# #) -> (# s', I# err# #) -- to aid the use of these primops by the IO Handle implementation, -- provide the following convenience funs: @@ -656,7 +701,7 @@ threadWaitRead fd #endif | otherwise = IO $ \s -> case fromIntegral fd of { I# fd# -> - case waitRead# fd# s of { s -> (# s, () #) + case waitRead# fd# s of { s' -> (# s', () #) }} -- | Block the current thread until data can be written to the @@ -668,7 +713,7 @@ threadWaitWrite fd #endif | otherwise = IO $ \s -> case fromIntegral fd of { I# fd# -> - case waitWrite# fd# s of { s -> (# s, () #) + case waitWrite# fd# s of { s' -> (# s', () #) }} -- | Suspends the current thread for a given number of microseconds @@ -683,7 +728,7 @@ threadDelay time | threaded = waitForDelayEvent time | otherwise = IO $ \s -> case fromIntegral time of { I# time# -> - case delay# time# s of { s -> (# s, () #) + case delay# time# s of { s' -> (# s', () #) }} @@ -735,23 +780,6 @@ calculateTarget usecs = do -- around the scheduler loop. Furthermore, the scheduler can be simplified -- by not having to check for completed IO requests. --- Issues, possible problems: --- --- - we might want bound threads to just do the blocking --- operation rather than communicating with the IO manager --- thread. This would prevent simgle-threaded programs which do --- IO from requiring multiple OS threads. However, it would also --- prevent bound threads waiting on IO from being killed or sent --- exceptions. --- --- - Apprently exec() doesn't work on Linux in a multithreaded program. --- I couldn't repeat this. --- --- - How do we handle signal delivery in the multithreaded RTS? --- --- - forkProcess will kill the IO manager thread. Let's just --- hope we don't need to do any blocking IO between fork & exec. - #ifndef mingw32_HOST_OS data IOReq = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ()) @@ -763,25 +791,52 @@ data DelayReq | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool) #ifndef mingw32_HOST_OS +{-# NOINLINE pendingEvents #-} pendingEvents :: IORef [IOReq] +pendingEvents = unsafePerformIO $ do + m <- newIORef [] + sharedCAF m getOrSetGHCConcPendingEventsStore + +foreign import ccall unsafe "getOrSetGHCConcPendingEventsStore" + getOrSetGHCConcPendingEventsStore :: Ptr a -> IO (Ptr a) #endif -pendingDelays :: IORef [DelayReq] - -- could use a strict list or array here -{-# NOINLINE pendingEvents #-} + {-# NOINLINE pendingDelays #-} -(pendingEvents,pendingDelays) = unsafePerformIO $ do - startIOManagerThread - reqs <- newIORef [] - dels <- newIORef [] - return (reqs, dels) - -- the first time we schedule an IO request, the service thread - -- will be created (cool, huh?) +pendingDelays :: IORef [DelayReq] +pendingDelays = unsafePerformIO $ do + m <- newIORef [] + sharedCAF m getOrSetGHCConcPendingDelaysStore + +foreign import ccall unsafe "getOrSetGHCConcPendingDelaysStore" + getOrSetGHCConcPendingDelaysStore :: Ptr a -> IO (Ptr a) + +{-# NOINLINE ioManagerThread #-} +ioManagerThread :: MVar (Maybe ThreadId) +ioManagerThread = unsafePerformIO $ do + m <- newMVar Nothing + sharedCAF m getOrSetGHCConcIOManagerThreadStore + +foreign import ccall unsafe "getOrSetGHCConcIOManagerThreadStore" + getOrSetGHCConcIOManagerThreadStore :: Ptr a -> IO (Ptr a) ensureIOManagerIsRunning :: IO () ensureIOManagerIsRunning - | threaded = seq pendingEvents $ return () + | threaded = startIOManagerThread | otherwise = return () +startIOManagerThread :: IO () +startIOManagerThread = do + modifyMVar_ ioManagerThread $ \old -> do + let create = do t <- forkIO ioManager; return (Just t) + case old of + Nothing -> create + Just t -> do + s <- threadStatus t + case s of + ThreadFinished -> create + ThreadDied -> create + _other -> return (Just t) + insertDelay :: DelayReq -> [DelayReq] -> [DelayReq] insertDelay d [] = [d] insertDelay d1 ds@(d2 : rest) @@ -794,31 +849,52 @@ delayTime (DelaySTM t _) = t type USecs = Word64 --- XXX: move into GHC.IOBase from Data.IORef? -atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b -atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s - foreign import ccall unsafe "getUSecOfDay" getUSecOfDay :: IO USecs -prodding :: IORef Bool {-# NOINLINE prodding #-} -prodding = unsafePerformIO (newIORef False) +prodding :: IORef Bool +prodding = unsafePerformIO $ do + r <- newIORef False + sharedCAF r getOrSetGHCConcProddingStore + +foreign import ccall unsafe "getOrSetGHCConcProddingStore" + getOrSetGHCConcProddingStore :: Ptr a -> IO (Ptr a) prodServiceThread :: IO () prodServiceThread = do - was_set <- atomicModifyIORef prodding (\a -> (True,a)) - if (not (was_set)) then wakeupIOManager else return () + -- NB. use atomicModifyIORef here, otherwise there are race + -- conditions in which prodding is left at True but the server is + -- blocked in select(). + was_set <- atomicModifyIORef prodding $ \b -> (True,b) + unless was_set wakeupIOManager + +-- Machinery needed to ensure that we only have one copy of certain +-- CAFs in this module even when the base package is present twice, as +-- it is when base is dynamically loaded into GHCi. The RTS keeps +-- track of the single true value of the CAF, so even when the CAFs in +-- the dynamically-loaded base package are reverted, nothing bad +-- happens. +-- +sharedCAF :: a -> (Ptr a -> IO (Ptr a)) -> IO a +sharedCAF a get_or_set = + mask_ $ do + stable_ref <- newStablePtr a + let ref = castPtr (castStablePtrToPtr stable_ref) + ref2 <- get_or_set ref + if ref==ref2 + then return a + else do freeStablePtr stable_ref + deRefStablePtr (castPtrToStablePtr (castPtr ref2)) #ifdef mingw32_HOST_OS -- ---------------------------------------------------------------------------- -- Windows IO manager thread -startIOManagerThread :: IO () -startIOManagerThread = do +ioManager :: IO () +ioManager = do wakeup <- c_getIOManagerEvent - forkIO $ service_loop wakeup [] - return () + service_loop wakeup [] service_loop :: HANDLE -- read end of pipe -> [DelayReq] -- current delay requests @@ -836,26 +912,27 @@ service_loop wakeup old_delays = do case r of 0xffffffff -> do c_maperrno; throwErrno "service_loop" 0 -> do - r <- c_readIOManagerEvent + r2 <- c_readIOManagerEvent exit <- - case r of - _ | r == io_MANAGER_WAKEUP -> return False - _ | r == io_MANAGER_DIE -> return True + case r2 of + _ | r2 == io_MANAGER_WAKEUP -> return False + _ | r2 == io_MANAGER_DIE -> return True 0 -> return False -- spurious wakeup - r -> do start_console_handler (r `shiftR` 1); return False - if exit - then return () - else service_cont wakeup delays' + _ -> do start_console_handler (r2 `shiftR` 1); return False + unless exit $ service_cont wakeup delays' _other -> service_cont wakeup delays' -- probably timeout +service_cont :: HANDLE -> [DelayReq] -> IO () service_cont wakeup delays = do - atomicModifyIORef prodding (\_ -> (False,False)) + r <- atomicModifyIORef prodding (\_ -> (False,False)) + r `seq` return () -- avoid space leak service_loop wakeup delays -- must agree with rts/win32/ThrIOManager.c -io_MANAGER_WAKEUP = 0xffffffff :: Word32 -io_MANAGER_DIE = 0xfffffffe :: Word32 +io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32 +io_MANAGER_WAKEUP = 0xffffffff +io_MANAGER_DIE = 0xfffffffe data ConsoleEvent = ControlC @@ -870,10 +947,11 @@ start_console_handler :: Word32 -> IO () start_console_handler r = case toWin32ConsoleEvent r of Just x -> withMVar win32ConsoleHandler $ \handler -> do - forkIO (handler x) + _ <- forkIO (handler x) return () Nothing -> return () +toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent toWin32ConsoleEvent ev = case ev of 0 {- CTRL_C_EVENT-} -> Just ControlC @@ -886,19 +964,14 @@ toWin32ConsoleEvent ev = win32ConsoleHandler :: MVar (ConsoleEvent -> IO ()) win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler")) -stick :: IORef HANDLE -{-# NOINLINE stick #-} -stick = unsafePerformIO (newIORef nullPtr) - -wakeupIOManager = do - hdl <- readIORef stick - c_sendIOManagerEvent io_MANAGER_WAKEUP +wakeupIOManager :: IO () +wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP -- Walk the queue of pending delays, waking up any that have passed -- and return the smallest delay to wait for. The queue of pending -- delays is kept ordered. getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD) -getDelay now [] = return ([], iNFINITE) +getDelay _ [] = return ([], iNFINITE) getDelay now all@(d : rest) = case d of Delay time m | now >= time -> do @@ -919,7 +992,8 @@ getDelay now all@(d : rest) type HANDLE = Ptr () type DWORD = Word32 -iNFINITE = 0xFFFFFFFF :: DWORD -- urgh +iNFINITE :: DWORD +iNFINITE = 0xFFFFFFFF -- urgh foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c) c_getIOManagerEvent :: IO HANDLE @@ -940,19 +1014,21 @@ foreign import stdcall "WaitForSingleObject" -- ---------------------------------------------------------------------------- -- Unix IO manager thread, using select() -startIOManagerThread :: IO () -startIOManagerThread = do +ioManager :: IO () +ioManager = do allocaArray 2 $ \fds -> do - throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds) + throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds) rd_end <- peekElemOff fds 0 wr_end <- peekElemOff fds 1 - writeIORef stick (fromIntegral wr_end) + setNonBlockingFD wr_end True -- writes happen in a signal handler, we + -- don't want them to block. + setCloseOnExec rd_end + setCloseOnExec wr_end c_setIOManagerPipe wr_end - forkIO $ do - allocaBytes sizeofFdSet $ \readfds -> do - allocaBytes sizeofFdSet $ \writefds -> do - allocaBytes sizeofTimeVal $ \timeval -> do - service_loop (fromIntegral rd_end) readfds writefds timeval [] [] + allocaBytes sizeofFdSet $ \readfds -> do + allocaBytes sizeofFdSet $ \writefds -> do + allocaBytes sizeofTimeVal $ \timeval -> do + service_loop (fromIntegral rd_end) readfds writefds timeval [] [] return () service_loop @@ -965,13 +1041,24 @@ service_loop -> IO () service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do + -- reset prodding before we look at the new requests. If a new + -- client arrives after this point they will send a wakup which will + -- cause the server to loop around again, so we can be sure to not + -- miss any requests. + -- + -- NB. it's important to do this in the *first* iteration of + -- service_loop, rather than after calling select(), since a client + -- may have set prodding to True without sending a wakeup byte down + -- the pipe, because the pipe wasn't set up. + atomicModifyIORef prodding (\_ -> (False, ())) + -- pick up new IO requests new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a)) let reqs = new_reqs ++ old_reqs -- pick up new delay requests new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a)) - let delays = foldr insertDelay old_delays new_delays + let delays0 = foldr insertDelay old_delays new_delays -- build the FDSets for select() fdZero readfds @@ -1004,7 +1091,7 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do else return (False,delays') - (wakeup_all,delays') <- do_select delays + (wakeup_all,delays') <- do_select delays0 exit <- if wakeup_all then return False @@ -1013,66 +1100,138 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do if b == 0 then return False else alloca $ \p -> do - c_read (fromIntegral wakeup) p 1; return () + warnErrnoIfMinus1_ "service_loop" $ + c_read (fromIntegral wakeup) p 1 s <- peek p case s of _ | s == io_MANAGER_WAKEUP -> return False _ | s == io_MANAGER_DIE -> return True - _ -> withMVar signalHandlerLock $ \_ -> do - handler_tbl <- peek handlers - sp <- peekElemOff handler_tbl (fromIntegral s) - io <- deRefStablePtr sp - forkIO io - return False - - if exit then return () else do - - atomicModifyIORef prodding (\_ -> (False,False)) + _ | s == io_MANAGER_SYNC -> do + mvars <- readIORef sync + mapM_ (flip putMVar ()) mvars + return False + _ -> do + fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t) + withForeignPtr fp $ \p_siginfo -> do + r <- c_read (fromIntegral wakeup) (castPtr p_siginfo) + sizeof_siginfo_t + when (r /= fromIntegral sizeof_siginfo_t) $ + error "failed to read siginfo_t" + runHandlers' fp (fromIntegral s) + return False + + unless exit $ do reqs' <- if wakeup_all then do wakeupAll reqs; return [] else completeRequests reqs readfds writefds [] service_loop wakeup readfds writefds ptimeval reqs' delays' -io_MANAGER_WAKEUP = 0xff :: CChar -io_MANAGER_DIE = 0xfe :: CChar +io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: Word8 +io_MANAGER_WAKEUP = 0xff +io_MANAGER_DIE = 0xfe +io_MANAGER_SYNC = 0xfd -stick :: IORef Fd -{-# NOINLINE stick #-} -stick = unsafePerformIO (newIORef 0) - -wakeupIOManager :: IO () -wakeupIOManager = do - fd <- readIORef stick - with io_MANAGER_WAKEUP $ \pbuf -> do - c_write (fromIntegral fd) pbuf 1; return () +{-# NOINLINE sync #-} +sync :: IORef [MVar ()] +sync = unsafePerformIO (newIORef []) --- Lock used to protect concurrent access to signal_handlers. Symptom of --- this race condition is #1922, although that bug was on Windows a similar --- bug also exists on Unix. -signalHandlerLock :: MVar () -signalHandlerLock = unsafePerformIO (newMVar ()) +-- waits for the IO manager to drain the pipe +syncIOManager :: IO () +syncIOManager = do + m <- newEmptyMVar + atomicModifyIORef sync (\old -> (m:old,())) + c_ioManagerSync + takeMVar m -foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ()))) +foreign import ccall unsafe "ioManagerSync" c_ioManagerSync :: IO () +foreign import ccall unsafe "ioManagerWakeup" wakeupIOManager :: IO () + +-- For the non-threaded RTS +runHandlers :: Ptr Word8 -> Int -> IO () +runHandlers p_info sig = do + fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t) + withForeignPtr fp $ \p -> do + copyBytes p p_info (fromIntegral sizeof_siginfo_t) + free p_info + runHandlers' fp (fromIntegral sig) + +runHandlers' :: ForeignPtr Word8 -> Signal -> IO () +runHandlers' p_info sig = do + let int = fromIntegral sig + withMVar signal_handlers $ \arr -> + if not (inRange (boundsIOArray arr) int) + then return () + else do handler <- unsafeReadIOArray arr int + case handler of + Nothing -> return () + Just (f,_) -> do _ <- forkIO (f p_info) + return () + +warnErrnoIfMinus1_ :: Num a => String -> IO a -> IO () +warnErrnoIfMinus1_ what io + = do r <- io + when (r == -1) $ do + errno <- getErrno + str <- strerror errno >>= peekCString + when (r == -1) $ + debugErrLn ("Warning: " ++ what ++ " failed: " ++ str) + +foreign import ccall unsafe "string.h" strerror :: Errno -> IO (Ptr CChar) foreign import ccall "setIOManagerPipe" c_setIOManagerPipe :: CInt -> IO () +foreign import ccall "__hscore_sizeof_siginfo_t" + sizeof_siginfo_t :: CSize + +type Signal = CInt + +maxSig = 64 :: Int + +type HandlerFun = ForeignPtr Word8 -> IO () + +-- Lock used to protect concurrent access to signal_handlers. Symptom of +-- this race condition is #1922, although that bug was on Windows a similar +-- bug also exists on Unix. +{-# NOINLINE signal_handlers #-} +signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic))) +signal_handlers = unsafePerformIO $ do + arr <- newIOArray (0,maxSig) Nothing + m <- newMVar arr + sharedCAF m getOrSetGHCConcSignalHandlerStore + +foreign import ccall unsafe "getOrSetGHCConcSignalHandlerStore" + getOrSetGHCConcSignalHandlerStore :: Ptr a -> IO (Ptr a) + +setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic)) +setHandler sig handler = do + let int = fromIntegral sig + withMVar signal_handlers $ \arr -> + if not (inRange (boundsIOArray arr) int) + then error "GHC.Conc.setHandler: signal out of range" + else do old <- unsafeReadIOArray arr int + unsafeWriteIOArray arr int handler + return old + -- ----------------------------------------------------------------------------- -- IO requests -buildFdSets maxfd readfds writefds [] = return maxfd -buildFdSets maxfd readfds writefds (Read fd m : reqs) +buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd +buildFdSets maxfd _ _ [] = return maxfd +buildFdSets maxfd readfds writefds (Read fd _ : reqs) | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range" | otherwise = do fdSet fd readfds buildFdSets (max maxfd fd) readfds writefds reqs -buildFdSets maxfd readfds writefds (Write fd m : reqs) +buildFdSets maxfd readfds writefds (Write fd _ : reqs) | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range" | otherwise = do fdSet fd writefds buildFdSets (max maxfd fd) readfds writefds reqs +completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] + -> IO [IOReq] completeRequests [] _ _ reqs' = return reqs' completeRequests (Read fd m : reqs) readfds writefds reqs' = do b <- fdIsSet fd readfds @@ -1085,9 +1244,10 @@ completeRequests (Write fd m : reqs) readfds writefds reqs' = do then do putMVar m (); completeRequests reqs readfds writefds reqs' else completeRequests reqs readfds writefds (Write fd m : reqs') +wakeupAll :: [IOReq] -> IO () wakeupAll [] = return () -wakeupAll (Read fd m : reqs) = do putMVar m (); wakeupAll reqs -wakeupAll (Write fd m : reqs) = do putMVar m (); wakeupAll reqs +wakeupAll (Read _ m : reqs) = do putMVar m (); wakeupAll reqs +wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs waitForReadEvent :: Fd -> IO () waitForReadEvent fd = do @@ -1110,7 +1270,7 @@ waitForWriteEvent fd = do -- and return the smallest delay to wait for. The queue of pending -- delays is kept ordered. getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal) -getDelay now ptimeval [] = return ([],nullPtr) +getDelay _ _ [] = return ([],nullPtr) getDelay now ptimeval all@(d : rest) = case d of Delay time m | now >= time -> do @@ -1123,7 +1283,7 @@ getDelay now ptimeval all@(d : rest) setTimevalTicks ptimeval (delayTime d - now) return (all,ptimeval) -newtype CTimeVal = CTimeVal () +data CTimeVal foreign import ccall unsafe "sizeofTimeVal" sizeofTimeVal :: Int @@ -1142,9 +1302,9 @@ foreign import ccall unsafe "setTimevalTicks" -- ToDo: move to System.Posix.Internals? -newtype CFdSet = CFdSet () +data CFdSet -foreign import ccall safe "select" +foreign import ccall safe "__hscore_select" c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal -> IO CInt @@ -1154,12 +1314,6 @@ foreign import ccall unsafe "hsFD_SETSIZE" fD_SETSIZE :: Fd fD_SETSIZE = fromIntegral c_fD_SETSIZE -foreign import ccall unsafe "hsFD_CLR" - c_fdClr :: CInt -> Ptr CFdSet -> IO () - -fdClr :: Fd -> Ptr CFdSet -> IO () -fdClr (Fd fd) fdset = c_fdClr fd fdset - foreign import ccall unsafe "hsFD_ISSET" c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt @@ -1180,4 +1334,44 @@ foreign import ccall unsafe "sizeof_fd_set" #endif +reportStackOverflow :: IO () +reportStackOverflow = callStackOverflowHook + +reportError :: SomeException -> IO () +reportError ex = do + handler <- getUncaughtExceptionHandler + handler ex + +-- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove +-- the unsafe below. +foreign import ccall unsafe "stackOverflow" + callStackOverflowHook :: IO () + +{-# NOINLINE uncaughtExceptionHandler #-} +uncaughtExceptionHandler :: IORef (SomeException -> IO ()) +uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler) + where + defaultHandler :: SomeException -> IO () + defaultHandler se@(SomeException ex) = do + (hFlush stdout) `catchAny` (\ _ -> return ()) + let msg = case cast ex of + Just Deadlock -> "no threads to run: infinite loop or deadlock?" + _ -> case cast ex of + Just (ErrorCall s) -> s + _ -> showsPrec 0 se "" + withCString "%s" $ \cfmt -> + withCString msg $ \cmsg -> + errorBelch cfmt cmsg + +-- don't use errorBelch() directly, because we cannot call varargs functions +-- using the FFI. +foreign import ccall unsafe "HsBase.h errorBelch2" + errorBelch :: CString -> CString -> IO () + +setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO () +setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler + +getUncaughtExceptionHandler :: IO (SomeException -> IO ()) +getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler + \end{code}