X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=GHC%2FConc.lhs;h=de96b2c06d5ff44cf32922f10b451ffa6aeadef1;hb=HEAD;hp=7f7d585d22a6b1db8b8f5ba36492e370b38484b8;hpb=f19c1c38568beb6d7c9080a8174083913e47849f;p=ghc-base.git diff --git a/GHC/Conc.lhs b/GHC/Conc.lhs index 7f7d585..de96b2c 100644 --- a/GHC/Conc.lhs +++ b/GHC/Conc.lhs @@ -1,7 +1,8 @@ \begin{code} -{-# OPTIONS_GHC -XNoImplicitPrelude #-} +{-# LANGUAGE CPP, NoImplicitPrelude #-} {-# OPTIONS_GHC -fno-warn-missing-signatures #-} {-# OPTIONS_HADDOCK not-home #-} + ----------------------------------------------------------------------------- -- | -- Module : GHC.Conc @@ -29,8 +30,15 @@ module GHC.Conc -- * Forking and suchlike , forkIO -- :: IO a -> IO ThreadId + , forkIOUnmasked + , forkIOWithUnmask + , forkOn , forkOnIO -- :: Int -> IO a -> IO ThreadId + , forkOnIOUnmasked + , forkOnWithUnmask , numCapabilities -- :: Int + , getNumCapabilities -- :: IO Int + , numSparks -- :: IO Int , childHandler -- :: Exception -> IO () , myThreadId -- :: IO ThreadId , killThread -- :: ThreadId -> IO () @@ -43,19 +51,22 @@ module GHC.Conc , ThreadStatus(..), BlockReason(..) , threadStatus -- :: ThreadId -> IO ThreadStatus + , threadCapability -- * Waiting , threadDelay -- :: Int -> IO () , registerDelay -- :: Int -> IO (TVar Bool) , threadWaitRead -- :: Int -> IO () , threadWaitWrite -- :: Int -> IO () + , closeFdWith -- :: (Fd -> IO ()) -> Fd -> IO () -- * TVars , STM(..) , atomically -- :: STM a -> IO a , retry -- :: STM a , orElse -- :: STM a -> STM a -> STM a - , catchSTM -- :: STM a -> (Exception -> STM a) -> STM a + , throwSTM -- :: Exception e => e -> STM a + , catchSTM -- :: Exception e => STM a -> (e -> STM a) -> STM a , alwaysSucceeds -- :: STM a -> STM () , always -- :: STM Bool -> STM () , TVar(..) @@ -82,9 +93,6 @@ module GHC.Conc #endif , ensureIOManagerIsRunning -#ifndef mingw32_HOST_OS - , syncIOManager -#endif #ifdef mingw32_HOST_OS , ConsoleEvent(..) @@ -97,1228 +105,11 @@ module GHC.Conc , reportError, reportStackOverflow ) where -import System.Posix.Types -#ifndef mingw32_HOST_OS -import System.Posix.Internals -#endif -import Foreign -import Foreign.C - -#ifdef mingw32_HOST_OS -import Data.Typeable -#endif - -#ifndef mingw32_HOST_OS -import Data.Dynamic -#endif -import Control.Monad -import Data.Maybe - -import GHC.Base -#ifndef mingw32_HOST_OS -import GHC.Debug -#endif -import {-# SOURCE #-} GHC.IO.Handle ( hFlush ) -import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout ) -import GHC.IO -import GHC.IO.Exception -import GHC.Exception -import GHC.IORef -import GHC.MVar -import GHC.Num ( Num(..) ) -import GHC.Real ( fromIntegral ) -#ifndef mingw32_HOST_OS -import GHC.IOArray -import GHC.Arr ( inRange ) -#endif -#ifdef mingw32_HOST_OS -import GHC.Real ( div ) -import GHC.Ptr -#endif -#ifdef mingw32_HOST_OS -import GHC.Read ( Read ) -import GHC.Enum ( Enum ) -#endif -import GHC.Pack ( packCString# ) -import GHC.Show ( Show(..), showString ) - -infixr 0 `par`, `pseq` -\end{code} - -%************************************************************************ -%* * -\subsection{@ThreadId@, @par@, and @fork@} -%* * -%************************************************************************ - -\begin{code} -data ThreadId = ThreadId ThreadId# deriving( Typeable ) --- ToDo: data ThreadId = ThreadId (Weak ThreadId#) --- But since ThreadId# is unlifted, the Weak type must use open --- type variables. -{- ^ -A 'ThreadId' is an abstract type representing a handle to a thread. -'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where -the 'Ord' instance implements an arbitrary total ordering over -'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued -'ThreadId' to string form; showing a 'ThreadId' value is occasionally -useful when debugging or diagnosing the behaviour of a concurrent -program. - -/Note/: in GHC, if you have a 'ThreadId', you essentially have -a pointer to the thread itself. This means the thread itself can\'t be -garbage collected until you drop the 'ThreadId'. -This misfeature will hopefully be corrected at a later date. - -/Note/: Hugs does not provide any operations on other threads; -it defines 'ThreadId' as a synonym for (). --} - -instance Show ThreadId where - showsPrec d t = - showString "ThreadId " . - showsPrec d (getThreadId (id2TSO t)) - -foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt - -id2TSO :: ThreadId -> ThreadId# -id2TSO (ThreadId t) = t - -foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt --- Returns -1, 0, 1 - -cmpThread :: ThreadId -> ThreadId -> Ordering -cmpThread t1 t2 = - case cmp_thread (id2TSO t1) (id2TSO t2) of - -1 -> LT - 0 -> EQ - _ -> GT -- must be 1 - -instance Eq ThreadId where - t1 == t2 = - case t1 `cmpThread` t2 of - EQ -> True - _ -> False - -instance Ord ThreadId where - compare = cmpThread - -{- | -Sparks off a new thread to run the 'IO' computation passed as the -first argument, and returns the 'ThreadId' of the newly created -thread. - -The new thread will be a lightweight thread; if you want to use a foreign -library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead. - -GHC note: the new thread inherits the /blocked/ state of the parent -(see 'Control.Exception.block'). - -The newly created thread has an exception handler that discards the -exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and -'ThreadKilled', and passes all other exceptions to the uncaught -exception handler (see 'setUncaughtExceptionHandler'). --} -forkIO :: IO () -> IO ThreadId -forkIO action = IO $ \ s -> - case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #) - where - action_plus = catchException action childHandler - -{- | -Like 'forkIO', but lets you specify on which CPU the thread is -created. Unlike a `forkIO` thread, a thread created by `forkOnIO` -will stay on the same CPU for its entire lifetime (`forkIO` threads -can migrate between CPUs according to the scheduling policy). -`forkOnIO` is useful for overriding the scheduling policy when you -know in advance how best to distribute the threads. - -The `Int` argument specifies the CPU number; it is interpreted modulo -'numCapabilities' (note that it actually specifies a capability number -rather than a CPU number, but to a first approximation the two are -equivalent). --} -forkOnIO :: Int -> IO () -> IO ThreadId -forkOnIO (I# cpu) action = IO $ \ s -> - case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #) - where - action_plus = catchException action childHandler - --- | the value passed to the @+RTS -N@ flag. This is the number of --- Haskell threads that can run truly simultaneously at any given --- time, and is typically set to the number of physical CPU cores on --- the machine. -numCapabilities :: Int -numCapabilities = unsafePerformIO $ do - n <- peek n_capabilities - return (fromIntegral n) - -#if defined(mingw32_HOST_OS) && defined(__PIC__) -foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt -#else -foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt -#endif -childHandler :: SomeException -> IO () -childHandler err = catchException (real_handler err) childHandler - -real_handler :: SomeException -> IO () -real_handler se@(SomeException ex) = - -- ignore thread GC and killThread exceptions: - case cast ex of - Just BlockedIndefinitelyOnMVar -> return () - _ -> case cast ex of - Just BlockedIndefinitelyOnSTM -> return () - _ -> case cast ex of - Just ThreadKilled -> return () - _ -> case cast ex of - -- report all others: - Just StackOverflow -> reportStackOverflow - _ -> reportError se - -{- | 'killThread' terminates the given thread (GHC only). -Any work already done by the thread isn\'t -lost: the computation is suspended until required by another thread. -The memory used by the thread will be garbage collected if it isn\'t -referenced from anywhere. The 'killThread' function is defined in -terms of 'throwTo': - -> killThread tid = throwTo tid ThreadKilled - -Killthread is a no-op if the target thread has already completed. --} -killThread :: ThreadId -> IO () -killThread tid = throwTo tid ThreadKilled - -{- | 'throwTo' raises an arbitrary exception in the target thread (GHC only). - -'throwTo' does not return until the exception has been raised in the -target thread. -The calling thread can thus be certain that the target -thread has received the exception. This is a useful property to know -when dealing with race conditions: eg. if there are two threads that -can kill each other, it is guaranteed that only one of the threads -will get to kill the other. - -If the target thread is currently making a foreign call, then the -exception will not be raised (and hence 'throwTo' will not return) -until the call has completed. This is the case regardless of whether -the call is inside a 'block' or not. - -Important note: the behaviour of 'throwTo' differs from that described in -the paper \"Asynchronous exceptions in Haskell\" -(). -In the paper, 'throwTo' is non-blocking; but the library implementation adopts -a more synchronous design in which 'throwTo' does not return until the exception -is received by the target thread. The trade-off is discussed in Section 9 of the paper. -Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of -the paper). - -There is currently no guarantee that the exception delivered by 'throwTo' will be -delivered at the first possible opportunity. In particular, a thread may -unblock and then re-block exceptions (using 'unblock' and 'block') without receiving -a pending 'throwTo'. This is arguably undesirable behaviour. - - -} -throwTo :: Exception e => ThreadId -> e -> IO () -throwTo (ThreadId tid) ex = IO $ \ s -> - case (killThread# tid (toException ex) s) of s1 -> (# s1, () #) - --- | Returns the 'ThreadId' of the calling thread (GHC only). -myThreadId :: IO ThreadId -myThreadId = IO $ \s -> - case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #) - - --- |The 'yield' action allows (forces, in a co-operative multitasking --- implementation) a context-switch to any other currently runnable --- threads (if any), and is occasionally useful when implementing --- concurrency abstractions. -yield :: IO () -yield = IO $ \s -> - case (yield# s) of s1 -> (# s1, () #) - -{- | 'labelThread' stores a string as identifier for this thread if -you built a RTS with debugging support. This identifier will be used in -the debugging output to make distinction of different threads easier -(otherwise you only have the thread state object\'s address in the heap). - -Other applications like the graphical Concurrent Haskell Debugger -() may choose to overload -'labelThread' for their purposes as well. --} - -labelThread :: ThreadId -> String -> IO () -labelThread (ThreadId t) str = IO $ \ s -> - let !ps = packCString# str - !adr = byteArrayContents# ps in - case (labelThread# t adr s) of s1 -> (# s1, () #) - --- Nota Bene: 'pseq' used to be 'seq' --- but 'seq' is now defined in PrelGHC --- --- "pseq" is defined a bit weirdly (see below) --- --- The reason for the strange "lazy" call is that --- it fools the compiler into thinking that pseq and par are non-strict in --- their second argument (even if it inlines pseq at the call site). --- If it thinks pseq is strict in "y", then it often evaluates --- "y" before "x", which is totally wrong. - -{-# INLINE pseq #-} -pseq :: a -> b -> b -pseq x y = x `seq` lazy y - -{-# INLINE par #-} -par :: a -> b -> b -par x y = case (par# x) of { _ -> lazy y } - --- | Internal function used by the RTS to run sparks. -runSparks :: IO () -runSparks = IO loop - where loop s = case getSpark# s of - (# s', n, p #) -> - if n ==# 0# then (# s', () #) - else p `seq` loop s' - -data BlockReason - = BlockedOnMVar - -- ^blocked on on 'MVar' - | BlockedOnBlackHole - -- ^blocked on a computation in progress by another thread - | BlockedOnException - -- ^blocked in 'throwTo' - | BlockedOnSTM - -- ^blocked in 'retry' in an STM transaction - | BlockedOnForeignCall - -- ^currently in a foreign call - | BlockedOnOther - -- ^blocked on some other resource. Without @-threaded@, - -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@ - -- they show up as 'BlockedOnMVar'. - deriving (Eq,Ord,Show) - --- | The current status of a thread -data ThreadStatus - = ThreadRunning - -- ^the thread is currently runnable or running - | ThreadFinished - -- ^the thread has finished - | ThreadBlocked BlockReason - -- ^the thread is blocked on some resource - | ThreadDied - -- ^the thread received an uncaught exception - deriving (Eq,Ord,Show) - -threadStatus :: ThreadId -> IO ThreadStatus -threadStatus (ThreadId t) = IO $ \s -> - case threadStatus# t s of - (# s', stat #) -> (# s', mk_stat (I# stat) #) - where - -- NB. keep these in sync with includes/Constants.h - mk_stat 0 = ThreadRunning - mk_stat 1 = ThreadBlocked BlockedOnMVar - mk_stat 2 = ThreadBlocked BlockedOnBlackHole - mk_stat 3 = ThreadBlocked BlockedOnException - mk_stat 7 = ThreadBlocked BlockedOnSTM - mk_stat 11 = ThreadBlocked BlockedOnForeignCall - mk_stat 12 = ThreadBlocked BlockedOnForeignCall - mk_stat 16 = ThreadFinished - mk_stat 17 = ThreadDied - mk_stat _ = ThreadBlocked BlockedOnOther -\end{code} - - -%************************************************************************ -%* * -\subsection[stm]{Transactional heap operations} -%* * -%************************************************************************ - -TVars are shared memory locations which support atomic memory -transactions. - -\begin{code} --- |A monad supporting atomic memory transactions. -newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #)) - -unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #)) -unSTM (STM a) = a - -INSTANCE_TYPEABLE1(STM,stmTc,"STM") - -instance Functor STM where - fmap f x = x >>= (return . f) - -instance Monad STM where - {-# INLINE return #-} - {-# INLINE (>>) #-} - {-# INLINE (>>=) #-} - m >> k = thenSTM m k - return x = returnSTM x - m >>= k = bindSTM m k - -bindSTM :: STM a -> (a -> STM b) -> STM b -bindSTM (STM m) k = STM ( \s -> - case m s of - (# new_s, a #) -> unSTM (k a) new_s - ) - -thenSTM :: STM a -> STM b -> STM b -thenSTM (STM m) k = STM ( \s -> - case m s of - (# new_s, _ #) -> unSTM k new_s - ) - -returnSTM :: a -> STM a -returnSTM x = STM (\s -> (# s, x #)) - --- | Unsafely performs IO in the STM monad. Beware: this is a highly --- dangerous thing to do. --- --- * The STM implementation will often run transactions multiple --- times, so you need to be prepared for this if your IO has any --- side effects. --- --- * The STM implementation will abort transactions that are known to --- be invalid and need to be restarted. This may happen in the middle --- of `unsafeIOToSTM`, so make sure you don't acquire any resources --- that need releasing (exception handlers are ignored when aborting --- the transaction). That includes doing any IO using Handles, for --- example. Getting this wrong will probably lead to random deadlocks. --- --- * The transaction may have seen an inconsistent view of memory when --- the IO runs. Invariants that you expect to be true throughout --- your program may not be true inside a transaction, due to the --- way transactions are implemented. Normally this wouldn't be visible --- to the programmer, but using `unsafeIOToSTM` can expose it. --- -unsafeIOToSTM :: IO a -> STM a -unsafeIOToSTM (IO m) = STM m - --- |Perform a series of STM actions atomically. --- --- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. --- Any attempt to do so will result in a runtime error. (Reason: allowing --- this would effectively allow a transaction inside a transaction, depending --- on exactly when the thunk is evaluated.) --- --- However, see 'newTVarIO', which can be called inside 'unsafePerformIO', --- and which allows top-level TVars to be allocated. - -atomically :: STM a -> IO a -atomically (STM m) = IO (\s -> (atomically# m) s ) - --- |Retry execution of the current memory transaction because it has seen --- values in TVars which mean that it should not continue (e.g. the TVars --- represent a shared buffer that is now empty). The implementation may --- block the thread until one of the TVars that it has read from has been --- udpated. (GHC only) -retry :: STM a -retry = STM $ \s# -> retry# s# - --- |Compose two alternative STM actions (GHC only). If the first action --- completes without retrying then it forms the result of the orElse. --- Otherwise, if the first action retries, then the second action is --- tried in its place. If both actions retry then the orElse as a --- whole retries. -orElse :: STM a -> STM a -> STM a -orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s - --- |Exception handling within STM actions. -catchSTM :: STM a -> (SomeException -> STM a) -> STM a -catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s - --- | Low-level primitive on which always and alwaysSucceeds are built. --- checkInv differs form these in that (i) the invariant is not --- checked when checkInv is called, only at the end of this and --- subsequent transcations, (ii) the invariant failure is indicated --- by raising an exception. -checkInv :: STM a -> STM () -checkInv (STM m) = STM (\s -> (check# m) s) - --- | alwaysSucceeds adds a new invariant that must be true when passed --- to alwaysSucceeds, at the end of the current transaction, and at --- the end of every subsequent transaction. If it fails at any --- of those points then the transaction violating it is aborted --- and the exception raised by the invariant is propagated. -alwaysSucceeds :: STM a -> STM () -alwaysSucceeds i = do ( i >> retry ) `orElse` ( return () ) - checkInv i - --- | always is a variant of alwaysSucceeds in which the invariant is --- expressed as an STM Bool action that must return True. Returning --- False or raising an exception are both treated as invariant failures. -always :: STM Bool -> STM () -always i = alwaysSucceeds ( do v <- i - if (v) then return () else ( error "Transacional invariant violation" ) ) - --- |Shared memory locations that support atomic memory transactions. -data TVar a = TVar (TVar# RealWorld a) - -INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar") - -instance Eq (TVar a) where - (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2# - --- |Create a new TVar holding a value supplied -newTVar :: a -> STM (TVar a) -newTVar val = STM $ \s1# -> - case newTVar# val s1# of - (# s2#, tvar# #) -> (# s2#, TVar tvar# #) - --- |@IO@ version of 'newTVar'. This is useful for creating top-level --- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using --- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't --- possible. -newTVarIO :: a -> IO (TVar a) -newTVarIO val = IO $ \s1# -> - case newTVar# val s1# of - (# s2#, tvar# #) -> (# s2#, TVar tvar# #) - --- |Return the current value stored in a TVar. --- This is equivalent to --- --- > readTVarIO = atomically . readTVar --- --- but works much faster, because it doesn't perform a complete --- transaction, it just reads the current value of the 'TVar'. -readTVarIO :: TVar a -> IO a -readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s# - --- |Return the current value stored in a TVar -readTVar :: TVar a -> STM a -readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s# - --- |Write the supplied value into a TVar -writeTVar :: TVar a -> a -> STM () -writeTVar (TVar tvar#) val = STM $ \s1# -> - case writeTVar# tvar# val s1# of - s2# -> (# s2#, () #) - -\end{code} - -MVar utilities - -\begin{code} -withMVar :: MVar a -> (a -> IO b) -> IO b -withMVar m io = - block $ do - a <- takeMVar m - b <- catchAny (unblock (io a)) - (\e -> do putMVar m a; throw e) - putMVar m a - return b -\end{code} - -%************************************************************************ -%* * -\subsection{Thread waiting} -%* * -%************************************************************************ - -\begin{code} -#ifdef mingw32_HOST_OS - --- Note: threadWaitRead and threadWaitWrite aren't really functional --- on Win32, but left in there because lib code (still) uses them (the manner --- in which they're used doesn't cause problems on a Win32 platform though.) - -asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) -asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) = - IO $ \s -> case asyncRead# fd isSock len buf s of - (# s', len#, err# #) -> (# s', (I# len#, I# err#) #) - -asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) -asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) = - IO $ \s -> case asyncWrite# fd isSock len buf s of - (# s', len#, err# #) -> (# s', (I# len#, I# err#) #) - -asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int -asyncDoProc (FunPtr proc) (Ptr param) = - -- the 'length' value is ignored; simplifies implementation of - -- the async*# primops to have them all return the same result. - IO $ \s -> case asyncDoProc# proc param s of - (# s', _len#, err# #) -> (# s', I# err# #) - --- to aid the use of these primops by the IO Handle implementation, --- provide the following convenience funs: - --- this better be a pinned byte array! -asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int) -asyncReadBA fd isSock len off bufB = - asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off) - -asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int) -asyncWriteBA fd isSock len off bufB = - asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off) - -#endif - --- ----------------------------------------------------------------------------- --- Thread IO API +import GHC.Conc.IO +import GHC.Conc.Sync --- | Block the current thread until data is available to read on the --- given file descriptor (GHC only). -threadWaitRead :: Fd -> IO () -threadWaitRead fd #ifndef mingw32_HOST_OS - | threaded = waitForReadEvent fd +import GHC.Conc.Signal #endif - | otherwise = IO $ \s -> - case fromIntegral fd of { I# fd# -> - case waitRead# fd# s of { s' -> (# s', () #) - }} - --- | Block the current thread until data can be written to the --- given file descriptor (GHC only). -threadWaitWrite :: Fd -> IO () -threadWaitWrite fd -#ifndef mingw32_HOST_OS - | threaded = waitForWriteEvent fd -#endif - | otherwise = IO $ \s -> - case fromIntegral fd of { I# fd# -> - case waitWrite# fd# s of { s' -> (# s', () #) - }} - --- | Suspends the current thread for a given number of microseconds --- (GHC only). --- --- There is no guarantee that the thread will be rescheduled promptly --- when the delay has expired, but the thread will never continue to --- run /earlier/ than specified. --- -threadDelay :: Int -> IO () -threadDelay time - | threaded = waitForDelayEvent time - | otherwise = IO $ \s -> - case fromIntegral time of { I# time# -> - case delay# time# s of { s' -> (# s', () #) - }} - - --- | Set the value of returned TVar to True after a given number of --- microseconds. The caveats associated with threadDelay also apply. --- -registerDelay :: Int -> IO (TVar Bool) -registerDelay usecs - | threaded = waitForDelayEventSTM usecs - | otherwise = error "registerDelay: requires -threaded" - -foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool - -waitForDelayEvent :: Int -> IO () -waitForDelayEvent usecs = do - m <- newEmptyMVar - target <- calculateTarget usecs - atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ())) - prodServiceThread - takeMVar m - --- Delays for use in STM -waitForDelayEventSTM :: Int -> IO (TVar Bool) -waitForDelayEventSTM usecs = do - t <- atomically $ newTVar False - target <- calculateTarget usecs - atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ())) - prodServiceThread - return t - -calculateTarget :: Int -> IO USecs -calculateTarget usecs = do - now <- getUSecOfDay - return $ now + (fromIntegral usecs) - - --- ---------------------------------------------------------------------------- --- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay - --- In the threaded RTS, we employ a single IO Manager thread to wait --- for all outstanding IO requests (threadWaitRead,threadWaitWrite) --- and delays (threadDelay). --- --- We can do this because in the threaded RTS the IO Manager can make --- a non-blocking call to select(), so we don't have to do select() in --- the scheduler as we have to in the non-threaded RTS. We get performance --- benefits from doing it this way, because we only have to restart the select() --- when a new request arrives, rather than doing one select() each time --- around the scheduler loop. Furthermore, the scheduler can be simplified --- by not having to check for completed IO requests. - --- Issues, possible problems: --- --- - we might want bound threads to just do the blocking --- operation rather than communicating with the IO manager --- thread. This would prevent simgle-threaded programs which do --- IO from requiring multiple OS threads. However, it would also --- prevent bound threads waiting on IO from being killed or sent --- exceptions. --- --- - Apprently exec() doesn't work on Linux in a multithreaded program. --- I couldn't repeat this. --- --- - How do we handle signal delivery in the multithreaded RTS? --- --- - forkProcess will kill the IO manager thread. Let's just --- hope we don't need to do any blocking IO between fork & exec. - -#ifndef mingw32_HOST_OS -data IOReq - = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ()) - | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ()) -#endif - -data DelayReq - = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ()) - | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool) - -#ifndef mingw32_HOST_OS -pendingEvents :: IORef [IOReq] -#endif -pendingDelays :: IORef [DelayReq] - -- could use a strict list or array here -{-# NOINLINE pendingEvents #-} -{-# NOINLINE pendingDelays #-} -(pendingEvents,pendingDelays) = unsafePerformIO $ do - startIOManagerThread - reqs <- newIORef [] - dels <- newIORef [] - return (reqs, dels) - -- the first time we schedule an IO request, the service thread - -- will be created (cool, huh?) - -ensureIOManagerIsRunning :: IO () -ensureIOManagerIsRunning - | threaded = seq pendingEvents $ return () - | otherwise = return () - -insertDelay :: DelayReq -> [DelayReq] -> [DelayReq] -insertDelay d [] = [d] -insertDelay d1 ds@(d2 : rest) - | delayTime d1 <= delayTime d2 = d1 : ds - | otherwise = d2 : insertDelay d1 rest - -delayTime :: DelayReq -> USecs -delayTime (Delay t _) = t -delayTime (DelaySTM t _) = t - -type USecs = Word64 - -foreign import ccall unsafe "getUSecOfDay" - getUSecOfDay :: IO USecs - -prodding :: IORef Bool -{-# NOINLINE prodding #-} -prodding = unsafePerformIO (newIORef False) - -prodServiceThread :: IO () -prodServiceThread = do - was_set <- atomicModifyIORef prodding (\a -> (True,a)) - if (not (was_set)) then wakeupIOManager else return () - -#ifdef mingw32_HOST_OS --- ---------------------------------------------------------------------------- --- Windows IO manager thread - -startIOManagerThread :: IO () -startIOManagerThread = do - wakeup <- c_getIOManagerEvent - _ <- forkIO $ service_loop wakeup [] - return () - -service_loop :: HANDLE -- read end of pipe - -> [DelayReq] -- current delay requests - -> IO () - -service_loop wakeup old_delays = do - -- pick up new delay requests - new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a)) - let delays = foldr insertDelay old_delays new_delays - - now <- getUSecOfDay - (delays', timeout) <- getDelay now delays - - r <- c_WaitForSingleObject wakeup timeout - case r of - 0xffffffff -> do c_maperrno; throwErrno "service_loop" - 0 -> do - r2 <- c_readIOManagerEvent - exit <- - case r2 of - _ | r2 == io_MANAGER_WAKEUP -> return False - _ | r2 == io_MANAGER_DIE -> return True - 0 -> return False -- spurious wakeup - _ -> do start_console_handler (r2 `shiftR` 1); return False - unless exit $ service_cont wakeup delays' - - _other -> service_cont wakeup delays' -- probably timeout - -service_cont :: HANDLE -> [DelayReq] -> IO () -service_cont wakeup delays = do - r <- atomicModifyIORef prodding (\_ -> (False,False)) - r `seq` return () -- avoid space leak - service_loop wakeup delays - --- must agree with rts/win32/ThrIOManager.c -io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32 -io_MANAGER_WAKEUP = 0xffffffff -io_MANAGER_DIE = 0xfffffffe - -data ConsoleEvent - = ControlC - | Break - | Close - -- these are sent to Services only. - | Logoff - | Shutdown - deriving (Eq, Ord, Enum, Show, Read, Typeable) - -start_console_handler :: Word32 -> IO () -start_console_handler r = - case toWin32ConsoleEvent r of - Just x -> withMVar win32ConsoleHandler $ \handler -> do - _ <- forkIO (handler x) - return () - Nothing -> return () - -toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent -toWin32ConsoleEvent ev = - case ev of - 0 {- CTRL_C_EVENT-} -> Just ControlC - 1 {- CTRL_BREAK_EVENT-} -> Just Break - 2 {- CTRL_CLOSE_EVENT-} -> Just Close - 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff - 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown - _ -> Nothing - -win32ConsoleHandler :: MVar (ConsoleEvent -> IO ()) -win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler")) - --- XXX Is this actually needed? -stick :: IORef HANDLE -{-# NOINLINE stick #-} -stick = unsafePerformIO (newIORef nullPtr) - -wakeupIOManager :: IO () -wakeupIOManager = do - _hdl <- readIORef stick - c_sendIOManagerEvent io_MANAGER_WAKEUP - --- Walk the queue of pending delays, waking up any that have passed --- and return the smallest delay to wait for. The queue of pending --- delays is kept ordered. -getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD) -getDelay _ [] = return ([], iNFINITE) -getDelay now all@(d : rest) - = case d of - Delay time m | now >= time -> do - putMVar m () - getDelay now rest - DelaySTM time t | now >= time -> do - atomically $ writeTVar t True - getDelay now rest - _otherwise -> - -- delay is in millisecs for WaitForSingleObject - let micro_seconds = delayTime d - now - milli_seconds = (micro_seconds + 999) `div` 1000 - in return (all, fromIntegral milli_seconds) - --- ToDo: this just duplicates part of System.Win32.Types, which isn't --- available yet. We should move some Win32 functionality down here, --- maybe as part of the grand reorganisation of the base package... -type HANDLE = Ptr () -type DWORD = Word32 - -iNFINITE :: DWORD -iNFINITE = 0xFFFFFFFF -- urgh - -foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c) - c_getIOManagerEvent :: IO HANDLE - -foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c) - c_readIOManagerEvent :: IO Word32 - -foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c) - c_sendIOManagerEvent :: Word32 -> IO () - -foreign import ccall unsafe "maperrno" -- in Win32Utils.c - c_maperrno :: IO () - -foreign import stdcall "WaitForSingleObject" - c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD - -#else --- ---------------------------------------------------------------------------- --- Unix IO manager thread, using select() - -startIOManagerThread :: IO () -startIOManagerThread = do - allocaArray 2 $ \fds -> do - throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds) - rd_end <- peekElemOff fds 0 - wr_end <- peekElemOff fds 1 - setNonBlockingFD wr_end True -- writes happen in a signal handler, we - -- don't want them to block. - setCloseOnExec rd_end - setCloseOnExec wr_end - writeIORef stick (fromIntegral wr_end) - c_setIOManagerPipe wr_end - _ <- forkIO $ do - allocaBytes sizeofFdSet $ \readfds -> do - allocaBytes sizeofFdSet $ \writefds -> do - allocaBytes sizeofTimeVal $ \timeval -> do - service_loop (fromIntegral rd_end) readfds writefds timeval [] [] - return () - -service_loop - :: Fd -- listen to this for wakeup calls - -> Ptr CFdSet - -> Ptr CFdSet - -> Ptr CTimeVal - -> [IOReq] - -> [DelayReq] - -> IO () -service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do - - -- pick up new IO requests - new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a)) - let reqs = new_reqs ++ old_reqs - - -- pick up new delay requests - new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a)) - let delays0 = foldr insertDelay old_delays new_delays - - -- build the FDSets for select() - fdZero readfds - fdZero writefds - fdSet wakeup readfds - maxfd <- buildFdSets 0 readfds writefds reqs - - -- perform the select() - let do_select delays = do - -- check the current time and wake up any thread in - -- threadDelay whose timeout has expired. Also find the - -- timeout value for the select() call. - now <- getUSecOfDay - (delays', timeout) <- getDelay now ptimeval delays - - res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds - nullPtr timeout - if (res == -1) - then do - err <- getErrno - case err of - _ | err == eINTR -> do_select delays' - -- EINTR: just redo the select() - _ | err == eBADF -> return (True, delays) - -- EBADF: one of the file descriptors is closed or bad, - -- we don't know which one, so wake everyone up. - _ | otherwise -> throwErrno "select" - -- otherwise (ENOMEM or EINVAL) something has gone - -- wrong; report the error. - else - return (False,delays') - - (wakeup_all,delays') <- do_select delays0 - - exit <- - if wakeup_all then return False - else do - b <- fdIsSet wakeup readfds - if b == 0 - then return False - else alloca $ \p -> do - warnErrnoIfMinus1_ "service_loop" $ - c_read (fromIntegral wakeup) p 1 - s <- peek p - case s of - _ | s == io_MANAGER_WAKEUP -> return False - _ | s == io_MANAGER_DIE -> return True - _ | s == io_MANAGER_SYNC -> do - mvars <- readIORef sync - mapM_ (flip putMVar ()) mvars - return False - _ -> do - fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t) - withForeignPtr fp $ \p_siginfo -> do - r <- c_read (fromIntegral wakeup) (castPtr p_siginfo) - sizeof_siginfo_t - when (r /= fromIntegral sizeof_siginfo_t) $ - error "failed to read siginfo_t" - runHandlers' fp (fromIntegral s) - return False - - unless exit $ do - - atomicModifyIORef prodding (\_ -> (False, ())) - - reqs' <- if wakeup_all then do wakeupAll reqs; return [] - else completeRequests reqs readfds writefds [] - - service_loop wakeup readfds writefds ptimeval reqs' delays' - -io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: Word8 -io_MANAGER_WAKEUP = 0xff -io_MANAGER_DIE = 0xfe -io_MANAGER_SYNC = 0xfd - --- | the stick is for poking the IO manager with -stick :: IORef Fd -{-# NOINLINE stick #-} -stick = unsafePerformIO (newIORef 0) - -{-# NOINLINE sync #-} -sync :: IORef [MVar ()] -sync = unsafePerformIO (newIORef []) - --- waits for the IO manager to drain the pipe -syncIOManager :: IO () -syncIOManager = do - m <- newEmptyMVar - atomicModifyIORef sync (\old -> (m:old,())) - fd <- readIORef stick - with io_MANAGER_SYNC $ \pbuf -> do - warnErrnoIfMinus1_ "syncIOManager" $ c_write (fromIntegral fd) pbuf 1 - takeMVar m - -wakeupIOManager :: IO () -wakeupIOManager = do - fd <- readIORef stick - with io_MANAGER_WAKEUP $ \pbuf -> do - warnErrnoIfMinus1_ "wakeupIOManager" $ c_write (fromIntegral fd) pbuf 1 - --- For the non-threaded RTS -runHandlers :: Ptr Word8 -> Int -> IO () -runHandlers p_info sig = do - fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t) - withForeignPtr fp $ \p -> do - copyBytes p p_info (fromIntegral sizeof_siginfo_t) - free p_info - runHandlers' fp (fromIntegral sig) - -runHandlers' :: ForeignPtr Word8 -> Signal -> IO () -runHandlers' p_info sig = do - let int = fromIntegral sig - withMVar signal_handlers $ \arr -> - if not (inRange (boundsIOArray arr) int) - then return () - else do handler <- unsafeReadIOArray arr int - case handler of - Nothing -> return () - Just (f,_) -> do _ <- forkIO (f p_info) - return () - -warnErrnoIfMinus1_ :: Num a => String -> IO a -> IO () -warnErrnoIfMinus1_ what io - = do r <- io - when (r == -1) $ do - errno <- getErrno - str <- strerror errno >>= peekCString - when (r == -1) $ - debugErrLn ("Warning: " ++ what ++ " failed: " ++ str) - -foreign import ccall unsafe "string.h" strerror :: Errno -> IO (Ptr CChar) - -foreign import ccall "setIOManagerPipe" - c_setIOManagerPipe :: CInt -> IO () - -foreign import ccall "__hscore_sizeof_siginfo_t" - sizeof_siginfo_t :: CSize - -type Signal = CInt - -maxSig = 64 :: Int - -type HandlerFun = ForeignPtr Word8 -> IO () - --- Lock used to protect concurrent access to signal_handlers. Symptom of --- this race condition is #1922, although that bug was on Windows a similar --- bug also exists on Unix. -{-# NOINLINE signal_handlers #-} -signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic))) -signal_handlers = unsafePerformIO $ do - arr <- newIOArray (0,maxSig) Nothing - m <- newMVar arr - block $ do - stable_ref <- newStablePtr m - let ref = castStablePtrToPtr stable_ref - ref2 <- getOrSetSignalHandlerStore ref - if ref==ref2 - then return m - else do freeStablePtr stable_ref - deRefStablePtr (castPtrToStablePtr ref2) - -foreign import ccall unsafe "getOrSetSignalHandlerStore" - getOrSetSignalHandlerStore :: Ptr a -> IO (Ptr a) - -setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic)) -setHandler sig handler = do - let int = fromIntegral sig - withMVar signal_handlers $ \arr -> - if not (inRange (boundsIOArray arr) int) - then error "GHC.Conc.setHandler: signal out of range" - else do old <- unsafeReadIOArray arr int - unsafeWriteIOArray arr int handler - return old - --- ----------------------------------------------------------------------------- --- IO requests - -buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd -buildFdSets maxfd _ _ [] = return maxfd -buildFdSets maxfd readfds writefds (Read fd _ : reqs) - | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range" - | otherwise = do - fdSet fd readfds - buildFdSets (max maxfd fd) readfds writefds reqs -buildFdSets maxfd readfds writefds (Write fd _ : reqs) - | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range" - | otherwise = do - fdSet fd writefds - buildFdSets (max maxfd fd) readfds writefds reqs - -completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] - -> IO [IOReq] -completeRequests [] _ _ reqs' = return reqs' -completeRequests (Read fd m : reqs) readfds writefds reqs' = do - b <- fdIsSet fd readfds - if b /= 0 - then do putMVar m (); completeRequests reqs readfds writefds reqs' - else completeRequests reqs readfds writefds (Read fd m : reqs') -completeRequests (Write fd m : reqs) readfds writefds reqs' = do - b <- fdIsSet fd writefds - if b /= 0 - then do putMVar m (); completeRequests reqs readfds writefds reqs' - else completeRequests reqs readfds writefds (Write fd m : reqs') - -wakeupAll :: [IOReq] -> IO () -wakeupAll [] = return () -wakeupAll (Read _ m : reqs) = do putMVar m (); wakeupAll reqs -wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs - -waitForReadEvent :: Fd -> IO () -waitForReadEvent fd = do - m <- newEmptyMVar - atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ())) - prodServiceThread - takeMVar m - -waitForWriteEvent :: Fd -> IO () -waitForWriteEvent fd = do - m <- newEmptyMVar - atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ())) - prodServiceThread - takeMVar m - --- ----------------------------------------------------------------------------- --- Delays - --- Walk the queue of pending delays, waking up any that have passed --- and return the smallest delay to wait for. The queue of pending --- delays is kept ordered. -getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal) -getDelay _ _ [] = return ([],nullPtr) -getDelay now ptimeval all@(d : rest) - = case d of - Delay time m | now >= time -> do - putMVar m () - getDelay now ptimeval rest - DelaySTM time t | now >= time -> do - atomically $ writeTVar t True - getDelay now ptimeval rest - _otherwise -> do - setTimevalTicks ptimeval (delayTime d - now) - return (all,ptimeval) - -data CTimeVal - -foreign import ccall unsafe "sizeofTimeVal" - sizeofTimeVal :: Int - -foreign import ccall unsafe "setTimevalTicks" - setTimevalTicks :: Ptr CTimeVal -> USecs -> IO () - -{- - On Win32 we're going to have a single Pipe, and a - waitForSingleObject with the delay time. For signals, we send a - byte down the pipe just like on Unix. --} - --- ---------------------------------------------------------------------------- --- select() interface - --- ToDo: move to System.Posix.Internals? - -data CFdSet - -foreign import ccall safe "__hscore_select" - c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal - -> IO CInt - -foreign import ccall unsafe "hsFD_SETSIZE" - c_fD_SETSIZE :: CInt - -fD_SETSIZE :: Fd -fD_SETSIZE = fromIntegral c_fD_SETSIZE - -foreign import ccall unsafe "hsFD_ISSET" - c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt - -fdIsSet :: Fd -> Ptr CFdSet -> IO CInt -fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset - -foreign import ccall unsafe "hsFD_SET" - c_fdSet :: CInt -> Ptr CFdSet -> IO () - -fdSet :: Fd -> Ptr CFdSet -> IO () -fdSet (Fd fd) fdset = c_fdSet fd fdset - -foreign import ccall unsafe "hsFD_ZERO" - fdZero :: Ptr CFdSet -> IO () - -foreign import ccall unsafe "sizeof_fd_set" - sizeofFdSet :: Int - -#endif - -reportStackOverflow :: IO () -reportStackOverflow = callStackOverflowHook - -reportError :: SomeException -> IO () -reportError ex = do - handler <- getUncaughtExceptionHandler - handler ex - --- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove --- the unsafe below. -foreign import ccall unsafe "stackOverflow" - callStackOverflowHook :: IO () - -{-# NOINLINE uncaughtExceptionHandler #-} -uncaughtExceptionHandler :: IORef (SomeException -> IO ()) -uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler) - where - defaultHandler :: SomeException -> IO () - defaultHandler se@(SomeException ex) = do - (hFlush stdout) `catchAny` (\ _ -> return ()) - let msg = case cast ex of - Just Deadlock -> "no threads to run: infinite loop or deadlock?" - _ -> case cast ex of - Just (ErrorCall s) -> s - _ -> showsPrec 0 se "" - withCString "%s" $ \cfmt -> - withCString msg $ \cmsg -> - errorBelch cfmt cmsg - --- don't use errorBelch() directly, because we cannot call varargs functions --- using the FFI. -foreign import ccall unsafe "HsBase.h errorBelch2" - errorBelch :: CString -> CString -> IO () - -setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO () -setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler - -getUncaughtExceptionHandler :: IO (SomeException -> IO ()) -getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler \end{code}