+{-# LANGUAGE CPP
+ , ForeignFunctionInterface
+ , MagicHash
+ , UnboxedTuples
+ , ScopedTypeVariables
+ #-}
+{-# OPTIONS_GHC -fno-warn-unused-imports #-}
+
-----------------------------------------------------------------------------
-- |
-- Module : Control.Concurrent
-----------------------------------------------------------------------------
module Control.Concurrent (
- -- * Concurrent Haskell
+ -- * Concurrent Haskell
- -- $conc_intro
+ -- $conc_intro
- -- * Basic concurrency operations
+ -- * Basic concurrency operations
ThreadId,
#ifdef __GLASGOW_HASKELL__
- myThreadId,
+ myThreadId,
#endif
- forkIO,
+ forkIO,
#ifdef __GLASGOW_HASKELL__
- killThread,
- throwTo,
+ forkIOWithUnmask,
+ killThread,
+ throwTo,
#endif
- -- * Scheduling
+ -- ** Threads with affinity
+ forkOn,
+ forkOnWithUnmask,
+ getNumCapabilities,
+ threadCapability,
+
+ -- * Scheduling
+
+ -- $conc_scheduling
+ yield, -- :: IO ()
- -- $conc_scheduling
- yield, -- :: IO ()
+ -- ** Blocking
- -- ** Blocking
-
- -- $blocking
+ -- $blocking
#ifdef __GLASGOW_HASKELL__
- -- ** Waiting
- threadDelay, -- :: Int -> IO ()
- threadWaitRead, -- :: Int -> IO ()
- threadWaitWrite, -- :: Int -> IO ()
+ -- ** Waiting
+ threadDelay, -- :: Int -> IO ()
+ threadWaitRead, -- :: Int -> IO ()
+ threadWaitWrite, -- :: Int -> IO ()
#endif
- -- * Communication abstractions
+ -- * Communication abstractions
- module Control.Concurrent.MVar,
- module Control.Concurrent.Chan,
- module Control.Concurrent.QSem,
- module Control.Concurrent.QSemN,
- module Control.Concurrent.SampleVar,
+ module Control.Concurrent.MVar,
+ module Control.Concurrent.Chan,
+ module Control.Concurrent.QSem,
+ module Control.Concurrent.QSemN,
+ module Control.Concurrent.SampleVar,
- -- * Merging of streams
+ -- * Merging of streams
#ifndef __HUGS__
- mergeIO, -- :: [a] -> [a] -> IO [a]
- nmergeIO, -- :: [[a]] -> IO [a]
+ mergeIO, -- :: [a] -> [a] -> IO [a]
+ nmergeIO, -- :: [[a]] -> IO [a]
#endif
- -- $merge
+ -- $merge
#ifdef __GLASGOW_HASKELL__
- -- * Bound Threads
- -- $boundthreads
- rtsSupportsBoundThreads,
- forkOS,
- isCurrentThreadBound,
- runInBoundThread,
- runInUnboundThread
+ -- * Bound Threads
+ -- $boundthreads
+ rtsSupportsBoundThreads,
+ forkOS,
+ isCurrentThreadBound,
+ runInBoundThread,
+ runInUnboundThread,
#endif
- -- * GHC's implementation of concurrency
+ -- * GHC's implementation of concurrency
+
+ -- |This section describes features specific to GHC's
+ -- implementation of Concurrent Haskell.
+
+ -- ** Haskell threads and Operating System threads
- -- |This section describes features specific to GHC's
- -- implementation of Concurrent Haskell.
-
- -- ** Haskell threads and Operating System threads
+ -- $osthreads
- -- $osthreads
+ -- ** Terminating the program
- -- ** Terminating the program
+ -- $termination
- -- $termination
+ -- ** Pre-emption
- -- ** Pre-emption
+ -- $preemption
+
+ -- * Deprecated functions
+ forkIOUnmasked
- -- $preemption
) where
import Prelude
-import Control.Exception as Exception
+import Control.Exception.Base as Exception
#ifdef __GLASGOW_HASKELL__
-import GHC.Conc ( ThreadId(..), myThreadId, killThread, yield,
- threadDelay, threadWaitRead, threadWaitWrite,
- forkIO, childHandler )
-import GHC.TopHandler ( reportStackOverflow, reportError )
-import GHC.IOBase ( IO(..) )
-import GHC.IOBase ( unsafeInterleaveIO )
-import GHC.IOBase ( newIORef, readIORef, writeIORef )
+import GHC.Exception
+import GHC.Conc hiding (threadWaitRead, threadWaitWrite)
+import qualified GHC.Conc
+import GHC.IO ( IO(..), unsafeInterleaveIO, unsafeUnmask )
+import GHC.IORef ( newIORef, readIORef, writeIORef )
import GHC.Base
+import System.Posix.Types ( Fd )
import Foreign.StablePtr
import Foreign.C.Types ( CInt )
import Control.Monad ( when )
+
+#ifdef mingw32_HOST_OS
+import Foreign.C
+import System.IO
+#endif
#endif
#ifdef __HUGS__
-}
{- $blocking
-Calling a foreign C procedure (such as @getchar@) that blocks waiting
-for input will block /all/ threads, unless the @threadsafe@ attribute
-is used on the foreign call (and your compiler \/ operating system
-supports it). GHC's I\/O system uses non-blocking I\/O internally to
-implement thread-friendly I\/O, so calling standard Haskell I\/O
-functions blocks only the thread making the call.
+Different Haskell implementations have different characteristics with
+regard to which operations block /all/ threads.
+
+Using GHC without the @-threaded@ option, all foreign calls will block
+all other Haskell threads in the system, although I\/O operations will
+not. With the @-threaded@ option, only foreign calls with the @unsafe@
+attribute will block all other threads.
+
+Using Hugs, all I\/O operations and foreign calls will block all other
+Haskell threads.
-}
#ifndef __HUGS__
-- preemptive multitasking.
mergeIO ls rs
- = newEmptyMVar >>= \ tail_node ->
- newMVar tail_node >>= \ tail_list ->
+ = newEmptyMVar >>= \ tail_node ->
+ newMVar tail_node >>= \ tail_list ->
newQSem max_buff_size >>= \ e ->
newMVar 2 >>= \ branches_running ->
let
in
forkIO (suckIO branches_running buff ls) >>
forkIO (suckIO branches_running buff rs) >>
- takeMVar tail_node >>= \ val ->
- signalQSem e >>
+ takeMVar tail_node >>= \ val ->
+ signalQSem e >>
return val
-type Buffer a
+type Buffer a
= (MVar (MVar [a]), QSem)
suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
suckIO branches_running buff@(tail_list,e) vs
= case vs of
- [] -> takeMVar branches_running >>= \ val ->
- if val == 1 then
- takeMVar tail_list >>= \ node ->
- putMVar node [] >>
- putMVar tail_list node
- else
- putMVar branches_running (val-1)
- (x:xs) ->
- waitQSem e >>
- takeMVar tail_list >>= \ node ->
- newEmptyMVar >>= \ next_node ->
- unsafeInterleaveIO (
- takeMVar next_node >>= \ y ->
- signalQSem e >>
- return y) >>= \ next_node_val ->
- putMVar node (x:next_node_val) >>
- putMVar tail_list next_node >>
- suckIO branches_running buff xs
+ [] -> takeMVar branches_running >>= \ val ->
+ if val == 1 then
+ takeMVar tail_list >>= \ node ->
+ putMVar node [] >>
+ putMVar tail_list node
+ else
+ putMVar branches_running (val-1)
+ (x:xs) ->
+ waitQSem e >>
+ takeMVar tail_list >>= \ node ->
+ newEmptyMVar >>= \ next_node ->
+ unsafeInterleaveIO (
+ takeMVar next_node >>= \ y ->
+ signalQSem e >>
+ return y) >>= \ next_node_val ->
+ putMVar node (x:next_node_val) >>
+ putMVar tail_list next_node >>
+ suckIO branches_running buff xs
nmergeIO lss
= let
len = length lss
in
- newEmptyMVar >>= \ tail_node ->
- newMVar tail_node >>= \ tail_list ->
+ newEmptyMVar >>= \ tail_node ->
+ newMVar tail_node >>= \ tail_list ->
newQSem max_buff_size >>= \ e ->
- newMVar len >>= \ branches_running ->
+ newMVar len >>= \ branches_running ->
let
buff = (tail_list,e)
in
mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
- takeMVar tail_node >>= \ val ->
- signalQSem e >>
+ takeMVar tail_node >>= \ val ->
+ signalQSem e >>
return val
where
mapIO f xs = sequence (map f xs)
libraries (OpenGL, for example) will not work from a thread created using
'forkIO'. They work fine in threads created using 'forkOS' or when called
from @main@ or from a @foreign export@.
+
+In terms of performance, 'forkOS' (aka bound) threads are much more
+expensive than 'forkIO' (aka unbound) threads, because a 'forkOS'
+thread is tied to a particular OS thread, whereas a 'forkIO' thread
+can be run by any OS thread. Context-switching between a 'forkOS'
+thread and a 'forkIO' thread is many times more expensive than between
+two 'forkIO' threads.
+
+Note in particular that the main program thread (the thread running
+@Main.main@) is always a bound thread, so for good concurrency
+performance you should ensure that the main thread is not doing
+repeated communication with other threads in the system. Typically
+this means forking subthreads to do the work using 'forkIO', and
+waiting for the results in the main thread.
+
-}
-- | 'True' if bound threads are supported.
foreign import ccall rtsSupportsBoundThreads :: Bool
-{- |
-Like 'forkIO', this sparks off a new thread to run the 'IO' computation passed as the
-first argument, and returns the 'ThreadId' of the newly created
-thread.
-
-However, @forkOS@ uses operating system-supplied multithreading support to create
-a new operating system thread. The new thread is /bound/, which means that
-all foreign calls made by the 'IO' computation are guaranteed to be executed
-in this new operating system thread; also, the operating system thread is not
-used for any other foreign calls.
-
-This means that you can use all kinds of foreign libraries from this thread
-(even those that rely on thread-local state), without the limitations of 'forkIO'.
-
-Just to clarify, 'forkOS' is /only/ necessary if you need to associate
-a Haskell thread with a particular OS thread. It is not necessary if
-you only need to make non-blocking foreign calls (see "Control.Concurrent#osthreads").
-
+{- |
+Like 'forkIO', this sparks off a new thread to run the 'IO'
+computation passed as the first argument, and returns the 'ThreadId'
+of the newly created thread.
+
+However, 'forkOS' creates a /bound/ thread, which is necessary if you
+need to call foreign (non-Haskell) libraries that make use of
+thread-local state, such as OpenGL (see "Control.Concurrent#boundthreads").
+
+Using 'forkOS' instead of 'forkIO' makes no difference at all to the
+scheduling behaviour of the Haskell runtime system. It is a common
+misconception that you need to use 'forkOS' instead of 'forkIO' to
+avoid blocking all the Haskell threads when making a foreign call;
+this isn't the case. To allow foreign calls to be made without
+blocking all the Haskell threads (with GHC), it is only necessary to
+use the @-threaded@ option when linking your program, and to make sure
+the foreign import is not marked @unsafe@.
-}
+
forkOS :: IO () -> IO ThreadId
foreign export ccall forkOS_entry
foreign import ccall "forkOS_entry" forkOS_entry_reimported
:: StablePtr (IO ()) -> IO ()
+forkOS_entry :: StablePtr (IO ()) -> IO ()
forkOS_entry stableAction = do
- action <- deRefStablePtr stableAction
- action
+ action <- deRefStablePtr stableAction
+ action
foreign import ccall forkOS_createThread
:: StablePtr (IO ()) -> IO CInt
+failNonThreaded :: IO a
failNonThreaded = fail $ "RTS doesn't support multiple OS threads "
++"(use ghc -threaded when linking)"
-
-forkOS action
+
+forkOS action0
| rtsSupportsBoundThreads = do
- mv <- newEmptyMVar
- let action_plus = Exception.catch action childHandler
- entry <- newStablePtr (myThreadId >>= putMVar mv >> action_plus)
- err <- forkOS_createThread entry
- when (err /= 0) $ fail "Cannot create OS thread."
- tid <- takeMVar mv
- freeStablePtr entry
- return tid
+ mv <- newEmptyMVar
+ b <- Exception.getMaskingState
+ let
+ -- async exceptions are masked in the child if they are masked
+ -- in the parent, as for forkIO (see #1048). forkOS_createThread
+ -- creates a thread with exceptions masked by default.
+ action1 = case b of
+ Unmasked -> unsafeUnmask action0
+ MaskedInterruptible -> action0
+ MaskedUninterruptible -> uninterruptibleMask_ action0
+
+ action_plus = Exception.catch action1 childHandler
+
+ entry <- newStablePtr (myThreadId >>= putMVar mv >> action_plus)
+ err <- forkOS_createThread entry
+ when (err /= 0) $ fail "Cannot create OS thread."
+ tid <- takeMVar mv
+ freeStablePtr entry
+ return tid
| otherwise = failNonThreaded
-- | Returns 'True' if the calling thread is /bound/, that is, if it is
-- safe to use foreign libraries that rely on thread-local state from the
-- calling thread.
isCurrentThreadBound :: IO Bool
-isCurrentThreadBound = IO $ \ s# ->
+isCurrentThreadBound = IO $ \ s# ->
case isCurrentThreadBound# s# of
(# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
runInBoundThread action
| rtsSupportsBoundThreads = do
- bound <- isCurrentThreadBound
- if bound
- then action
- else do
- ref <- newIORef undefined
- let action_plus = Exception.try action >>= writeIORef ref
- resultOrException <-
- bracket (newStablePtr action_plus)
- freeStablePtr
- (\cEntry -> forkOS_entry_reimported cEntry >> readIORef ref)
- case resultOrException of
- Left exception -> Exception.throw exception
- Right result -> return result
+ bound <- isCurrentThreadBound
+ if bound
+ then action
+ else do
+ ref <- newIORef undefined
+ let action_plus = Exception.try action >>= writeIORef ref
+ bracket (newStablePtr action_plus)
+ freeStablePtr
+ (\cEntry -> forkOS_entry_reimported cEntry >> readIORef ref) >>=
+ unsafeResult
| otherwise = failNonThreaded
{- |
doesn't need it's main thread to be bound and makes /heavy/ use of concurrency
(e.g. a web server), might want to wrap it's @main@ action in
@runInUnboundThread@.
+
+Note that exceptions which are thrown to the current thread are thrown in turn
+to the thread that is executing the given computation. This ensures there's
+always a way of killing the forked thread.
-}
runInUnboundThread :: IO a -> IO a
runInUnboundThread action = do
- bound <- isCurrentThreadBound
- if bound
- then do
- mv <- newEmptyMVar
- forkIO (Exception.try action >>= putMVar mv)
- takeMVar mv >>= \either -> case either of
- Left exception -> Exception.throw exception
- Right result -> return result
- else action
-
+ bound <- isCurrentThreadBound
+ if bound
+ then do
+ mv <- newEmptyMVar
+ mask $ \restore -> do
+ tid <- forkIO $ Exception.try (restore action) >>= putMVar mv
+ let wait = takeMVar mv `Exception.catch` \(e :: SomeException) ->
+ Exception.throwTo tid e >> wait
+ wait >>= unsafeResult
+ else action
+
+unsafeResult :: Either SomeException a -> IO a
+unsafeResult = either Exception.throwIO return
#endif /* __GLASGOW_HASKELL__ */
+#ifdef __GLASGOW_HASKELL__
+-- ---------------------------------------------------------------------------
+-- threadWaitRead/threadWaitWrite
+
+-- | Block the current thread until data is available to read on the
+-- given file descriptor (GHC only).
+--
+-- This will throw an 'IOError' if the file descriptor was closed
+-- while this thread was blocked. To safely close a file descriptor
+-- that has been used with 'threadWaitRead', use
+-- 'GHC.Conc.closeFdWith'.
+threadWaitRead :: Fd -> IO ()
+threadWaitRead fd
+#ifdef mingw32_HOST_OS
+ -- we have no IO manager implementing threadWaitRead on Windows.
+ -- fdReady does the right thing, but we have to call it in a
+ -- separate thread, otherwise threadWaitRead won't be interruptible,
+ -- and this only works with -threaded.
+ | threaded = withThread (waitFd fd 0)
+ | otherwise = case fd of
+ 0 -> do _ <- hWaitForInput stdin (-1)
+ return ()
+ -- hWaitForInput does work properly, but we can only
+ -- do this for stdin since we know its FD.
+ _ -> error "threadWaitRead requires -threaded on Windows, or use System.IO.hWaitForInput"
+#else
+ = GHC.Conc.threadWaitRead fd
+#endif
+
+-- | Block the current thread until data can be written to the
+-- given file descriptor (GHC only).
+--
+-- This will throw an 'IOError' if the file descriptor was closed
+-- while this thread was blocked. To safely close a file descriptor
+-- that has been used with 'threadWaitWrite', use
+-- 'GHC.Conc.closeFdWith'.
+threadWaitWrite :: Fd -> IO ()
+threadWaitWrite fd
+#ifdef mingw32_HOST_OS
+ | threaded = withThread (waitFd fd 1)
+ | otherwise = error "threadWaitWrite requires -threaded on Windows"
+#else
+ = GHC.Conc.threadWaitWrite fd
+#endif
+
+#ifdef mingw32_HOST_OS
+foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
+
+withThread :: IO a -> IO a
+withThread io = do
+ m <- newEmptyMVar
+ _ <- mask_ $ forkIO $ try io >>= putMVar m
+ x <- takeMVar m
+ case x of
+ Right a -> return a
+ Left e -> throwIO (e :: IOException)
+
+waitFd :: Fd -> CInt -> IO ()
+waitFd fd write = do
+ throwErrnoIfMinus1_ "fdReady" $
+ fdReady (fromIntegral fd) write iNFINITE 0
+
+iNFINITE :: CInt
+iNFINITE = 0xFFFFFFFF -- urgh
+
+foreign import ccall safe "fdReady"
+ fdReady :: CInt -> CInt -> CInt -> CInt -> IO CInt
+#endif
+
-- ---------------------------------------------------------------------------
-- More docs
The "System.IO" library manages multiplexing in its own way. On
Windows systems it uses @safe@ foreign calls to ensure that
threads doing I\/O operations don't block the whole runtime,
- whereas on Unix systems all the currently blocked I\/O reqwests
+ whereas on Unix systems all the currently blocked I\/O requests
are managed by a single thread (the /IO manager thread/) using
@select@.
> case cs of
> [] -> return ()
> m:ms -> do
-> putMVar children ms
-> takeMVar m
-> waitForChildren
->
-> forkChild :: IO () -> IO ()
+> putMVar children ms
+> takeMVar m
+> waitForChildren
+>
+> forkChild :: IO () -> IO ThreadId
> forkChild io = do
-> mvar <- newEmptyMVar
-> childs <- takeMVar children
-> putMVar children (mvar:childs)
-> forkIO (io `finally` putMVar mvar ())
+> mvar <- newEmptyMVar
+> childs <- takeMVar children
+> putMVar children (mvar:childs)
+> forkIO (io `finally` putMVar mvar ())
>
> main =
-> later waitForChildren $
-> ...
+> later waitForChildren $
+> ...
The main thread principle also applies to calls to Haskell from
outside, using @foreign export@. When the @foreign export@ed
lock is woken up, but haven't found it to be useful for anything
other than this example :-)
-}
+#endif /* __GLASGOW_HASKELL__ */