X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=Control%2FConcurrent.hs;h=62a30b434889b0cdd1e410d2c10275d740d85b65;hb=7dbb606d7b57cdad87a0ffbdb6ea4a274ebca7c0;hp=d9c35b83a303b03d624678840c76d0818be82df5;hpb=5ba59750c2005575da6d1ab199aae369d3d81c0c;p=ghc-base.git diff --git a/Control/Concurrent.hs b/Control/Concurrent.hs index d9c35b8..62a30b4 100644 --- a/Control/Concurrent.hs +++ b/Control/Concurrent.hs @@ -1,3 +1,11 @@ +{-# LANGUAGE CPP + , ForeignFunctionInterface + , MagicHash + , UnboxedTuples + , ScopedTypeVariables + #-} +{-# OPTIONS_GHC -fno-warn-unused-imports #-} + ----------------------------------------------------------------------------- -- | -- Module : Control.Concurrent @@ -14,99 +22,114 @@ ----------------------------------------------------------------------------- module Control.Concurrent ( - -- * Concurrent Haskell + -- * Concurrent Haskell - -- $conc_intro + -- $conc_intro - -- * Basic concurrency operations + -- * Basic concurrency operations ThreadId, #ifdef __GLASGOW_HASKELL__ - myThreadId, + myThreadId, #endif - forkIO, + forkIO, #ifdef __GLASGOW_HASKELL__ - killThread, - throwTo, + forkIOWithUnmask, + killThread, + throwTo, #endif - -- * Scheduling + -- ** Threads with affinity + forkOn, + forkOnWithUnmask, + getNumCapabilities, + threadCapability, + + -- * Scheduling + + -- $conc_scheduling + yield, -- :: IO () - -- $conc_scheduling - yield, -- :: IO () + -- ** Blocking - -- ** Blocking - - -- $blocking + -- $blocking #ifdef __GLASGOW_HASKELL__ - -- ** Waiting - threadDelay, -- :: Int -> IO () - threadWaitRead, -- :: Int -> IO () - threadWaitWrite, -- :: Int -> IO () + -- ** Waiting + threadDelay, -- :: Int -> IO () + threadWaitRead, -- :: Int -> IO () + threadWaitWrite, -- :: Int -> IO () #endif - -- * Communication abstractions + -- * Communication abstractions - module Control.Concurrent.MVar, - module Control.Concurrent.Chan, - module Control.Concurrent.QSem, - module Control.Concurrent.QSemN, - module Control.Concurrent.SampleVar, + module Control.Concurrent.MVar, + module Control.Concurrent.Chan, + module Control.Concurrent.QSem, + module Control.Concurrent.QSemN, + module Control.Concurrent.SampleVar, - -- * Merging of streams + -- * Merging of streams #ifndef __HUGS__ - mergeIO, -- :: [a] -> [a] -> IO [a] - nmergeIO, -- :: [[a]] -> IO [a] + mergeIO, -- :: [a] -> [a] -> IO [a] + nmergeIO, -- :: [[a]] -> IO [a] #endif - -- $merge + -- $merge #ifdef __GLASGOW_HASKELL__ - -- * Bound Threads - -- $boundthreads - rtsSupportsBoundThreads, - forkOS, - isCurrentThreadBound, - runInBoundThread, - runInUnboundThread + -- * Bound Threads + -- $boundthreads + rtsSupportsBoundThreads, + forkOS, + isCurrentThreadBound, + runInBoundThread, + runInUnboundThread, #endif - -- * GHC's implementation of concurrency + -- * GHC's implementation of concurrency + + -- |This section describes features specific to GHC's + -- implementation of Concurrent Haskell. + + -- ** Haskell threads and Operating System threads - -- |This section describes features specific to GHC's - -- implementation of Concurrent Haskell. - - -- ** Haskell threads and Operating System threads + -- $osthreads - -- $osthreads + -- ** Terminating the program - -- ** Terminating the program + -- $termination - -- $termination + -- ** Pre-emption - -- ** Pre-emption + -- $preemption + + -- * Deprecated functions + forkIOUnmasked - -- $preemption ) where import Prelude -import Control.Exception as Exception +import Control.Exception.Base as Exception #ifdef __GLASGOW_HASKELL__ -import GHC.Conc ( ThreadId(..), myThreadId, killThread, yield, - threadDelay, threadWaitRead, threadWaitWrite, - forkIO, childHandler ) -import GHC.TopHandler ( reportStackOverflow, reportError ) -import GHC.IOBase ( IO(..) ) -import GHC.IOBase ( unsafeInterleaveIO ) -import GHC.IOBase ( newIORef, readIORef, writeIORef ) +import GHC.Exception +import GHC.Conc hiding (threadWaitRead, threadWaitWrite) +import qualified GHC.Conc +import GHC.IO ( IO(..), unsafeInterleaveIO, unsafeUnmask ) +import GHC.IORef ( newIORef, readIORef, writeIORef ) import GHC.Base +import System.Posix.Types ( Fd ) import Foreign.StablePtr import Foreign.C.Types ( CInt ) import Control.Monad ( when ) + +#ifdef mingw32_HOST_OS +import Foreign.C +import System.IO +#endif #endif #ifdef __HUGS__ @@ -193,8 +216,8 @@ nmergeIO :: [[a]] -> IO [a] -- preemptive multitasking. mergeIO ls rs - = newEmptyMVar >>= \ tail_node -> - newMVar tail_node >>= \ tail_list -> + = newEmptyMVar >>= \ tail_node -> + newMVar tail_node >>= \ tail_list -> newQSem max_buff_size >>= \ e -> newMVar 2 >>= \ branches_running -> let @@ -202,50 +225,50 @@ mergeIO ls rs in forkIO (suckIO branches_running buff ls) >> forkIO (suckIO branches_running buff rs) >> - takeMVar tail_node >>= \ val -> - signalQSem e >> + takeMVar tail_node >>= \ val -> + signalQSem e >> return val -type Buffer a +type Buffer a = (MVar (MVar [a]), QSem) suckIO :: MVar Int -> Buffer a -> [a] -> IO () suckIO branches_running buff@(tail_list,e) vs = case vs of - [] -> takeMVar branches_running >>= \ val -> - if val == 1 then - takeMVar tail_list >>= \ node -> - putMVar node [] >> - putMVar tail_list node - else - putMVar branches_running (val-1) - (x:xs) -> - waitQSem e >> - takeMVar tail_list >>= \ node -> - newEmptyMVar >>= \ next_node -> - unsafeInterleaveIO ( - takeMVar next_node >>= \ y -> - signalQSem e >> - return y) >>= \ next_node_val -> - putMVar node (x:next_node_val) >> - putMVar tail_list next_node >> - suckIO branches_running buff xs + [] -> takeMVar branches_running >>= \ val -> + if val == 1 then + takeMVar tail_list >>= \ node -> + putMVar node [] >> + putMVar tail_list node + else + putMVar branches_running (val-1) + (x:xs) -> + waitQSem e >> + takeMVar tail_list >>= \ node -> + newEmptyMVar >>= \ next_node -> + unsafeInterleaveIO ( + takeMVar next_node >>= \ y -> + signalQSem e >> + return y) >>= \ next_node_val -> + putMVar node (x:next_node_val) >> + putMVar tail_list next_node >> + suckIO branches_running buff xs nmergeIO lss = let len = length lss in - newEmptyMVar >>= \ tail_node -> - newMVar tail_node >>= \ tail_list -> + newEmptyMVar >>= \ tail_node -> + newMVar tail_node >>= \ tail_list -> newQSem max_buff_size >>= \ e -> - newMVar len >>= \ branches_running -> + newMVar len >>= \ branches_running -> let buff = (tail_list,e) in mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >> - takeMVar tail_node >>= \ val -> - signalQSem e >> + takeMVar tail_node >>= \ val -> + signalQSem e >> return val where mapIO f xs = sequence (map f xs) @@ -285,6 +308,21 @@ state variables that have specific values for each OS thread libraries (OpenGL, for example) will not work from a thread created using 'forkIO'. They work fine in threads created using 'forkOS' or when called from @main@ or from a @foreign export@. + +In terms of performance, 'forkOS' (aka bound) threads are much more +expensive than 'forkIO' (aka unbound) threads, because a 'forkOS' +thread is tied to a particular OS thread, whereas a 'forkIO' thread +can be run by any OS thread. Context-switching between a 'forkOS' +thread and a 'forkIO' thread is many times more expensive than between +two 'forkIO' threads. + +Note in particular that the main program thread (the thread running +@Main.main@) is always a bound thread, so for good concurrency +performance you should ensure that the main thread is not doing +repeated communication with other threads in the system. Typically +this means forking subthreads to do the work using 'forkIO', and +waiting for the results in the main thread. + -} -- | 'True' if bound threads are supported. @@ -294,29 +332,25 @@ from @main@ or from a @foreign export@. foreign import ccall rtsSupportsBoundThreads :: Bool -{- | -Like 'forkIO', this sparks off a new thread to run the 'IO' computation passed as the -first argument, and returns the 'ThreadId' of the newly created -thread. - -However, @forkOS@ uses operating system-supplied multithreading support to create -a new operating system thread. The new thread is /bound/, which means that -all foreign calls made by the 'IO' computation are guaranteed to be executed -in this new operating system thread; also, the operating system thread is not -used for any other foreign calls. - -This means that you can use all kinds of foreign libraries from this thread -(even those that rely on thread-local state), without the limitations of 'forkIO'. - -Just to clarify, 'forkOS' is /only/ necessary if you need to associate -a Haskell thread with a particular OS thread. It is not necessary if -you only need to make non-blocking foreign calls (see -"Control.Concurrent#osthreads"). Neither is it necessary if you want -to run threads in parallel on a multiprocessor: threads created with -'forkIO' will be shared out amongst the running CPUs (using GHC, -@-threaded@, and the @+RTS -N@ runtime option). - +{- | +Like 'forkIO', this sparks off a new thread to run the 'IO' +computation passed as the first argument, and returns the 'ThreadId' +of the newly created thread. + +However, 'forkOS' creates a /bound/ thread, which is necessary if you +need to call foreign (non-Haskell) libraries that make use of +thread-local state, such as OpenGL (see "Control.Concurrent#boundthreads"). + +Using 'forkOS' instead of 'forkIO' makes no difference at all to the +scheduling behaviour of the Haskell runtime system. It is a common +misconception that you need to use 'forkOS' instead of 'forkIO' to +avoid blocking all the Haskell threads when making a foreign call; +this isn't the case. To allow foreign calls to be made without +blocking all the Haskell threads (with GHC), it is only necessary to +use the @-threaded@ option when linking your program, and to make sure +the foreign import is not marked @unsafe@. -} + forkOS :: IO () -> IO ThreadId foreign export ccall forkOS_entry @@ -325,33 +359,46 @@ foreign export ccall forkOS_entry foreign import ccall "forkOS_entry" forkOS_entry_reimported :: StablePtr (IO ()) -> IO () +forkOS_entry :: StablePtr (IO ()) -> IO () forkOS_entry stableAction = do - action <- deRefStablePtr stableAction - action + action <- deRefStablePtr stableAction + action foreign import ccall forkOS_createThread :: StablePtr (IO ()) -> IO CInt +failNonThreaded :: IO a failNonThreaded = fail $ "RTS doesn't support multiple OS threads " ++"(use ghc -threaded when linking)" - -forkOS action + +forkOS action0 | rtsSupportsBoundThreads = do - mv <- newEmptyMVar - let action_plus = Exception.catch action childHandler - entry <- newStablePtr (myThreadId >>= putMVar mv >> action_plus) - err <- forkOS_createThread entry - when (err /= 0) $ fail "Cannot create OS thread." - tid <- takeMVar mv - freeStablePtr entry - return tid + mv <- newEmptyMVar + b <- Exception.getMaskingState + let + -- async exceptions are masked in the child if they are masked + -- in the parent, as for forkIO (see #1048). forkOS_createThread + -- creates a thread with exceptions masked by default. + action1 = case b of + Unmasked -> unsafeUnmask action0 + MaskedInterruptible -> action0 + MaskedUninterruptible -> uninterruptibleMask_ action0 + + action_plus = Exception.catch action1 childHandler + + entry <- newStablePtr (myThreadId >>= putMVar mv >> action_plus) + err <- forkOS_createThread entry + when (err /= 0) $ fail "Cannot create OS thread." + tid <- takeMVar mv + freeStablePtr entry + return tid | otherwise = failNonThreaded -- | Returns 'True' if the calling thread is /bound/, that is, if it is -- safe to use foreign libraries that rely on thread-local state from the -- calling thread. isCurrentThreadBound :: IO Bool -isCurrentThreadBound = IO $ \ s# -> +isCurrentThreadBound = IO $ \ s# -> case isCurrentThreadBound# s# of (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #) @@ -369,19 +416,16 @@ runInBoundThread :: IO a -> IO a runInBoundThread action | rtsSupportsBoundThreads = do - bound <- isCurrentThreadBound - if bound - then action - else do - ref <- newIORef undefined - let action_plus = Exception.try action >>= writeIORef ref - resultOrException <- - bracket (newStablePtr action_plus) - freeStablePtr - (\cEntry -> forkOS_entry_reimported cEntry >> readIORef ref) - case resultOrException of - Left exception -> Exception.throw exception - Right result -> return result + bound <- isCurrentThreadBound + if bound + then action + else do + ref <- newIORef undefined + let action_plus = Exception.try action >>= writeIORef ref + bracket (newStablePtr action_plus) + freeStablePtr + (\cEntry -> forkOS_entry_reimported cEntry >> readIORef ref) >>= + unsafeResult | otherwise = failNonThreaded {- | @@ -394,22 +438,98 @@ performance loss due to the use of bound threads. A program that doesn't need it's main thread to be bound and makes /heavy/ use of concurrency (e.g. a web server), might want to wrap it's @main@ action in @runInUnboundThread@. + +Note that exceptions which are thrown to the current thread are thrown in turn +to the thread that is executing the given computation. This ensures there's +always a way of killing the forked thread. -} runInUnboundThread :: IO a -> IO a runInUnboundThread action = do - bound <- isCurrentThreadBound - if bound - then do - mv <- newEmptyMVar - forkIO (Exception.try action >>= putMVar mv) - takeMVar mv >>= \either -> case either of - Left exception -> Exception.throw exception - Right result -> return result - else action - + bound <- isCurrentThreadBound + if bound + then do + mv <- newEmptyMVar + mask $ \restore -> do + tid <- forkIO $ Exception.try (restore action) >>= putMVar mv + let wait = takeMVar mv `Exception.catch` \(e :: SomeException) -> + Exception.throwTo tid e >> wait + wait >>= unsafeResult + else action + +unsafeResult :: Either SomeException a -> IO a +unsafeResult = either Exception.throwIO return #endif /* __GLASGOW_HASKELL__ */ +#ifdef __GLASGOW_HASKELL__ +-- --------------------------------------------------------------------------- +-- threadWaitRead/threadWaitWrite + +-- | Block the current thread until data is available to read on the +-- given file descriptor (GHC only). +-- +-- This will throw an 'IOError' if the file descriptor was closed +-- while this thread was blocked. To safely close a file descriptor +-- that has been used with 'threadWaitRead', use +-- 'GHC.Conc.closeFdWith'. +threadWaitRead :: Fd -> IO () +threadWaitRead fd +#ifdef mingw32_HOST_OS + -- we have no IO manager implementing threadWaitRead on Windows. + -- fdReady does the right thing, but we have to call it in a + -- separate thread, otherwise threadWaitRead won't be interruptible, + -- and this only works with -threaded. + | threaded = withThread (waitFd fd 0) + | otherwise = case fd of + 0 -> do _ <- hWaitForInput stdin (-1) + return () + -- hWaitForInput does work properly, but we can only + -- do this for stdin since we know its FD. + _ -> error "threadWaitRead requires -threaded on Windows, or use System.IO.hWaitForInput" +#else + = GHC.Conc.threadWaitRead fd +#endif + +-- | Block the current thread until data can be written to the +-- given file descriptor (GHC only). +-- +-- This will throw an 'IOError' if the file descriptor was closed +-- while this thread was blocked. To safely close a file descriptor +-- that has been used with 'threadWaitWrite', use +-- 'GHC.Conc.closeFdWith'. +threadWaitWrite :: Fd -> IO () +threadWaitWrite fd +#ifdef mingw32_HOST_OS + | threaded = withThread (waitFd fd 1) + | otherwise = error "threadWaitWrite requires -threaded on Windows" +#else + = GHC.Conc.threadWaitWrite fd +#endif + +#ifdef mingw32_HOST_OS +foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool + +withThread :: IO a -> IO a +withThread io = do + m <- newEmptyMVar + _ <- mask_ $ forkIO $ try io >>= putMVar m + x <- takeMVar m + case x of + Right a -> return a + Left e -> throwIO (e :: IOException) + +waitFd :: Fd -> CInt -> IO () +waitFd fd write = do + throwErrnoIfMinus1_ "fdReady" $ + fdReady (fromIntegral fd) write iNFINITE 0 + +iNFINITE :: CInt +iNFINITE = 0xFFFFFFFF -- urgh + +foreign import ccall safe "fdReady" + fdReady :: CInt -> CInt -> CInt -> CInt -> IO CInt +#endif + -- --------------------------------------------------------------------------- -- More docs @@ -436,7 +556,7 @@ runInUnboundThread action = do The "System.IO" library manages multiplexing in its own way. On Windows systems it uses @safe@ foreign calls to ensure that threads doing I\/O operations don't block the whole runtime, - whereas on Unix systems all the currently blocked I\/O reqwests + whereas on Unix systems all the currently blocked I\/O requests are managed by a single thread (the /IO manager thread/) using @select@. @@ -492,20 +612,20 @@ runInUnboundThread action = do > case cs of > [] -> return () > m:ms -> do -> putMVar children ms -> takeMVar m -> waitForChildren -> +> putMVar children ms +> takeMVar m +> waitForChildren +> > forkChild :: IO () -> IO ThreadId > forkChild io = do -> mvar <- newEmptyMVar -> childs <- takeMVar children -> putMVar children (mvar:childs) -> forkIO (io `finally` putMVar mvar ()) +> mvar <- newEmptyMVar +> childs <- takeMVar children +> putMVar children (mvar:childs) +> forkIO (io `finally` putMVar mvar ()) > > main = -> later waitForChildren $ -> ... +> later waitForChildren $ +> ... The main thread principle also applies to calls to Haskell from outside, using @foreign export@. When the @foreign export@ed @@ -544,3 +664,4 @@ runInUnboundThread action = do lock is woken up, but haven't found it to be useful for anything other than this example :-) -} +#endif /* __GLASGOW_HASKELL__ */