-----------------------------------------------------------------------------
module Control.Concurrent (
- -- * Concurrent Haskell
+ -- * Concurrent Haskell
- -- $conc_intro
+ -- $conc_intro
- -- * Basic concurrency operations
+ -- * Basic concurrency operations
ThreadId,
#ifdef __GLASGOW_HASKELL__
- myThreadId,
+ myThreadId,
#endif
- forkIO,
+ forkIO,
#ifdef __GLASGOW_HASKELL__
- killThread,
- throwTo,
+ killThread,
+ throwTo,
#endif
- -- * Scheduling
+ -- * Scheduling
- -- $conc_scheduling
- yield, -- :: IO ()
+ -- $conc_scheduling
+ yield, -- :: IO ()
- -- ** Blocking
-
- -- $blocking
+ -- ** Blocking
+
+ -- $blocking
#ifdef __GLASGOW_HASKELL__
- -- ** Waiting
- threadDelay, -- :: Int -> IO ()
- threadWaitRead, -- :: Int -> IO ()
- threadWaitWrite, -- :: Int -> IO ()
+ -- ** Waiting
+ threadDelay, -- :: Int -> IO ()
+ threadWaitRead, -- :: Int -> IO ()
+ threadWaitWrite, -- :: Int -> IO ()
#endif
- -- * Communication abstractions
+ -- * Communication abstractions
- module Control.Concurrent.MVar,
- module Control.Concurrent.Chan,
- module Control.Concurrent.QSem,
- module Control.Concurrent.QSemN,
- module Control.Concurrent.SampleVar,
+ module Control.Concurrent.MVar,
+ module Control.Concurrent.Chan,
+ module Control.Concurrent.QSem,
+ module Control.Concurrent.QSemN,
+ module Control.Concurrent.SampleVar,
- -- * Merging of streams
+ -- * Merging of streams
#ifndef __HUGS__
- mergeIO, -- :: [a] -> [a] -> IO [a]
- nmergeIO, -- :: [[a]] -> IO [a]
+ mergeIO, -- :: [a] -> [a] -> IO [a]
+ nmergeIO, -- :: [[a]] -> IO [a]
#endif
- -- $merge
+ -- $merge
#ifdef __GLASGOW_HASKELL__
- -- * Bound Threads
- -- $boundthreads
- rtsSupportsBoundThreads,
- forkOS,
- isCurrentThreadBound,
- runInBoundThread,
- runInUnboundThread
+ -- * Bound Threads
+ -- $boundthreads
+ rtsSupportsBoundThreads,
+ forkOS,
+ isCurrentThreadBound,
+ runInBoundThread,
+ runInUnboundThread
#endif
- -- * GHC's implementation of concurrency
+ -- * GHC's implementation of concurrency
+
+ -- |This section describes features specific to GHC's
+ -- implementation of Concurrent Haskell.
+
+ -- ** Haskell threads and Operating System threads
- -- |This section describes features specific to GHC's
- -- implementation of Concurrent Haskell.
-
- -- ** Terminating the program
+ -- $osthreads
- -- $termination
+ -- ** Terminating the program
- -- ** Pre-emption
+ -- $termination
- -- $preemption
+ -- ** Pre-emption
+
+ -- $preemption
) where
import Prelude
-import Control.Exception as Exception
+import Control.Exception.Base as Exception
#ifdef __GLASGOW_HASKELL__
-import GHC.Conc ( ThreadId(..), myThreadId, killThread, yield,
- threadDelay, threadWaitRead, threadWaitWrite )
+import GHC.Exception
+import GHC.Conc ( ThreadId(..), myThreadId, killThread, yield,
+ threadDelay, forkIO, childHandler )
+import qualified GHC.Conc
import GHC.TopHandler ( reportStackOverflow, reportError )
-import GHC.IOBase ( IO(..) )
-import GHC.IOBase ( unsafeInterleaveIO )
-import GHC.IOBase ( newIORef, readIORef, writeIORef )
+import GHC.IOBase ( IO(..) )
+import GHC.IOBase ( unsafeInterleaveIO )
+import GHC.IOBase ( newIORef, readIORef, writeIORef )
import GHC.Base
+import System.Posix.Types ( Fd )
import Foreign.StablePtr
import Foreign.C.Types ( CInt )
import Control.Monad ( when )
+
+#ifdef mingw32_HOST_OS
+import Foreign.C
+import System.IO
+import GHC.Handle
+#endif
#endif
#ifdef __HUGS__
-}
{- $blocking
-Calling a foreign C procedure (such as @getchar@) that blocks waiting
-for input will block /all/ threads, unless the @threadsafe@ attribute
-is used on the foreign call (and your compiler \/ operating system
-supports it). GHC's I\/O system uses non-blocking I\/O internally to
-implement thread-friendly I\/O, so calling standard Haskell I\/O
-functions blocks only the thread making the call.
--}
-
--- Thread Ids, specifically the instances of Eq and Ord for these things.
--- The ThreadId type itself is defined in std/PrelConc.lhs.
-
--- Rather than define a new primitve, we use a little helper function
--- cmp_thread in the RTS.
-
-#ifdef __GLASGOW_HASKELL__
-id2TSO :: ThreadId -> ThreadId#
-id2TSO (ThreadId t) = t
+Different Haskell implementations have different characteristics with
+regard to which operations block /all/ threads.
-foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> Int
--- Returns -1, 0, 1
+Using GHC without the @-threaded@ option, all foreign calls will block
+all other Haskell threads in the system, although I\/O operations will
+not. With the @-threaded@ option, only foreign calls with the @unsafe@
+attribute will block all other threads.
-cmpThread :: ThreadId -> ThreadId -> Ordering
-cmpThread t1 t2 =
- case cmp_thread (id2TSO t1) (id2TSO t2) of
- -1 -> LT
- 0 -> EQ
- _ -> GT -- must be 1
-
-instance Eq ThreadId where
- t1 == t2 =
- case t1 `cmpThread` t2 of
- EQ -> True
- _ -> False
-
-instance Ord ThreadId where
- compare = cmpThread
-
-foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> Int
-
-instance Show ThreadId where
- showsPrec d t =
- showString "ThreadId " .
- showsPrec d (getThreadId (id2TSO t))
-
-{- |
-This sparks off a new thread to run the 'IO' computation passed as the
-first argument, and returns the 'ThreadId' of the newly created
-thread.
-
-The new thread will be a lightweight thread; if you want to use a foreign
-library that uses thread-local storage, use 'forkOS' instead.
+Using Hugs, all I\/O operations and foreign calls will block all other
+Haskell threads.
-}
-forkIO :: IO () -> IO ThreadId
-forkIO action = IO $ \ s ->
- case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
- where
- action_plus = Exception.catch action childHandler
-
-childHandler :: Exception -> IO ()
-childHandler err = Exception.catch (real_handler err) childHandler
-
-real_handler :: Exception -> IO ()
-real_handler ex =
- case ex of
- -- ignore thread GC and killThread exceptions:
- BlockedOnDeadMVar -> return ()
- BlockedIndefinitely -> return ()
- AsyncException ThreadKilled -> return ()
-
- -- report all others:
- AsyncException StackOverflow -> reportStackOverflow False
- other -> reportError False other
-
-#endif /* __GLASGOW_HASKELL__ */
#ifndef __HUGS__
max_buff_size :: Int
-- preemptive multitasking.
mergeIO ls rs
- = newEmptyMVar >>= \ tail_node ->
- newMVar tail_node >>= \ tail_list ->
+ = newEmptyMVar >>= \ tail_node ->
+ newMVar tail_node >>= \ tail_list ->
newQSem max_buff_size >>= \ e ->
newMVar 2 >>= \ branches_running ->
let
in
forkIO (suckIO branches_running buff ls) >>
forkIO (suckIO branches_running buff rs) >>
- takeMVar tail_node >>= \ val ->
- signalQSem e >>
+ takeMVar tail_node >>= \ val ->
+ signalQSem e >>
return val
-type Buffer a
+type Buffer a
= (MVar (MVar [a]), QSem)
suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
suckIO branches_running buff@(tail_list,e) vs
= case vs of
- [] -> takeMVar branches_running >>= \ val ->
- if val == 1 then
- takeMVar tail_list >>= \ node ->
- putMVar node [] >>
- putMVar tail_list node
- else
- putMVar branches_running (val-1)
- (x:xs) ->
- waitQSem e >>
- takeMVar tail_list >>= \ node ->
- newEmptyMVar >>= \ next_node ->
- unsafeInterleaveIO (
- takeMVar next_node >>= \ y ->
- signalQSem e >>
- return y) >>= \ next_node_val ->
- putMVar node (x:next_node_val) >>
- putMVar tail_list next_node >>
- suckIO branches_running buff xs
+ [] -> takeMVar branches_running >>= \ val ->
+ if val == 1 then
+ takeMVar tail_list >>= \ node ->
+ putMVar node [] >>
+ putMVar tail_list node
+ else
+ putMVar branches_running (val-1)
+ (x:xs) ->
+ waitQSem e >>
+ takeMVar tail_list >>= \ node ->
+ newEmptyMVar >>= \ next_node ->
+ unsafeInterleaveIO (
+ takeMVar next_node >>= \ y ->
+ signalQSem e >>
+ return y) >>= \ next_node_val ->
+ putMVar node (x:next_node_val) >>
+ putMVar tail_list next_node >>
+ suckIO branches_running buff xs
nmergeIO lss
= let
len = length lss
in
- newEmptyMVar >>= \ tail_node ->
- newMVar tail_node >>= \ tail_list ->
+ newEmptyMVar >>= \ tail_node ->
+ newMVar tail_node >>= \ tail_list ->
newQSem max_buff_size >>= \ e ->
- newMVar len >>= \ branches_running ->
+ newMVar len >>= \ branches_running ->
let
buff = (tail_list,e)
in
mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
- takeMVar tail_node >>= \ val ->
- signalQSem e >>
+ takeMVar tail_node >>= \ val ->
+ signalQSem e >>
return val
where
mapIO f xs = sequence (map f xs)
-- Bound Threads
{- $boundthreads
+ #boundthreads#
Support for multiple operating system threads and bound threads as described
below is currently only available in the GHC runtime system if you use the
This means that you can use all kinds of foreign libraries from this thread
(even those that rely on thread-local state), without the limitations of 'forkIO'.
+
+Just to clarify, 'forkOS' is /only/ necessary if you need to associate
+a Haskell thread with a particular OS thread. It is not necessary if
+you only need to make non-blocking foreign calls (see
+"Control.Concurrent#osthreads"). Neither is it necessary if you want
+to run threads in parallel on a multiprocessor: threads created with
+'forkIO' will be shared out amongst the running CPUs (using GHC,
+@-threaded@, and the @+RTS -N@ runtime option).
+
-}
forkOS :: IO () -> IO ThreadId
:: StablePtr (IO ()) -> IO ()
forkOS_entry stableAction = do
- action <- deRefStablePtr stableAction
- action
+ action <- deRefStablePtr stableAction
+ action
foreign import ccall forkOS_createThread
:: StablePtr (IO ()) -> IO CInt
failNonThreaded = fail $ "RTS doesn't support multiple OS threads "
++"(use ghc -threaded when linking)"
-
-forkOS action
+
+forkOS action0
| rtsSupportsBoundThreads = do
- mv <- newEmptyMVar
- let action_plus = Exception.catch action childHandler
- entry <- newStablePtr (myThreadId >>= putMVar mv >> action_plus)
- err <- forkOS_createThread entry
- when (err /= 0) $ fail "Cannot create OS thread."
- tid <- takeMVar mv
- freeStablePtr entry
- return tid
+ mv <- newEmptyMVar
+ b <- Exception.blocked
+ let
+ -- async exceptions are blocked in the child if they are blocked
+ -- in the parent, as for forkIO (see #1048). forkOS_createThread
+ -- creates a thread with exceptions blocked by default.
+ action1 | b = action0
+ | otherwise = unblock action0
+
+ action_plus = Exception.catch action1 childHandler
+
+ entry <- newStablePtr (myThreadId >>= putMVar mv >> action_plus)
+ err <- forkOS_createThread entry
+ when (err /= 0) $ fail "Cannot create OS thread."
+ tid <- takeMVar mv
+ freeStablePtr entry
+ return tid
| otherwise = failNonThreaded
-- | Returns 'True' if the calling thread is /bound/, that is, if it is
-- safe to use foreign libraries that rely on thread-local state from the
-- calling thread.
isCurrentThreadBound :: IO Bool
-isCurrentThreadBound = IO $ \ s# ->
+isCurrentThreadBound = IO $ \ s# ->
case isCurrentThreadBound# s# of
(# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
runInBoundThread action
| rtsSupportsBoundThreads = do
- bound <- isCurrentThreadBound
- if bound
- then action
- else do
- ref <- newIORef undefined
- let action_plus = Exception.try action >>= writeIORef ref
- resultOrException <-
- bracket (newStablePtr action_plus)
- freeStablePtr
- (\cEntry -> forkOS_entry_reimported cEntry >> readIORef ref)
- case resultOrException of
- Left exception -> Exception.throw exception
- Right result -> return result
+ bound <- isCurrentThreadBound
+ if bound
+ then action
+ else do
+ ref <- newIORef undefined
+ let action_plus = Exception.try action >>= writeIORef ref
+ resultOrException <-
+ bracket (newStablePtr action_plus)
+ freeStablePtr
+ (\cEntry -> forkOS_entry_reimported cEntry >> readIORef ref)
+ case resultOrException of
+ Left exception -> Exception.throw (exception :: SomeException)
+ Right result -> return result
| otherwise = failNonThreaded
{- |
mv <- newEmptyMVar
forkIO (Exception.try action >>= putMVar mv)
takeMVar mv >>= \either -> case either of
- Left exception -> Exception.throw exception
+ Left exception -> Exception.throw (exception :: SomeException)
Right result -> return result
else action
-
+
#endif /* __GLASGOW_HASKELL__ */
+#ifdef __GLASGOW_HASKELL__
+-- ---------------------------------------------------------------------------
+-- threadWaitRead/threadWaitWrite
+
+-- | Block the current thread until data is available to read on the
+-- given file descriptor (GHC only).
+threadWaitRead :: Fd -> IO ()
+threadWaitRead fd
+#ifdef mingw32_HOST_OS
+ -- we have no IO manager implementing threadWaitRead on Windows.
+ -- fdReady does the right thing, but we have to call it in a
+ -- separate thread, otherwise threadWaitRead won't be interruptible,
+ -- and this only works with -threaded.
+ | threaded = withThread (waitFd fd 0)
+ | otherwise = case fd of
+ 0 -> do hWaitForInput stdin (-1); return ()
+ -- hWaitForInput does work properly, but we can only
+ -- do this for stdin since we know its FD.
+ _ -> error "threadWaitRead requires -threaded on Windows, or use System.IO.hWaitForInput"
+#else
+ = GHC.Conc.threadWaitRead fd
+#endif
+
+-- | Block the current thread until data can be written to the
+-- given file descriptor (GHC only).
+threadWaitWrite :: Fd -> IO ()
+threadWaitWrite fd
+#ifdef mingw32_HOST_OS
+ | threaded = withThread (waitFd fd 1)
+ | otherwise = error "threadWaitWrite requires -threaded on Windows"
+#else
+ = GHC.Conc.threadWaitWrite fd
+#endif
+
+#ifdef mingw32_HOST_OS
+foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
+
+withThread :: IO a -> IO a
+withThread io = do
+ m <- newEmptyMVar
+ forkIO $ try io >>= putMVar m
+ x <- takeMVar m
+ case x of
+ Right a -> return a
+ Left e -> throwIO (e :: IOException)
+
+waitFd :: Fd -> CInt -> IO ()
+waitFd fd write = do
+ throwErrnoIfMinus1 "fdReady" $
+ fdReady (fromIntegral fd) write (fromIntegral iNFINITE) 0
+ return ()
+
+iNFINITE = 0xFFFFFFFF :: CInt -- urgh
+
+foreign import ccall safe "fdReady"
+ fdReady :: CInt -> CInt -> CInt -> CInt -> IO CInt
+#endif
+
-- ---------------------------------------------------------------------------
-- More docs
+{- $osthreads
+
+ #osthreads# In GHC, threads created by 'forkIO' are lightweight threads, and
+ are managed entirely by the GHC runtime. Typically Haskell
+ threads are an order of magnitude or two more efficient (in
+ terms of both time and space) than operating system threads.
+
+ The downside of having lightweight threads is that only one can
+ run at a time, so if one thread blocks in a foreign call, for
+ example, the other threads cannot continue. The GHC runtime
+ works around this by making use of full OS threads where
+ necessary. When the program is built with the @-threaded@
+ option (to link against the multithreaded version of the
+ runtime), a thread making a @safe@ foreign call will not block
+ the other threads in the system; another OS thread will take
+ over running Haskell threads until the original call returns.
+ The runtime maintains a pool of these /worker/ threads so that
+ multiple Haskell threads can be involved in external calls
+ simultaneously.
+
+ The "System.IO" library manages multiplexing in its own way. On
+ Windows systems it uses @safe@ foreign calls to ensure that
+ threads doing I\/O operations don't block the whole runtime,
+ whereas on Unix systems all the currently blocked I\/O reqwests
+ are managed by a single thread (the /IO manager thread/) using
+ @select@.
+
+ The runtime will run a Haskell thread using any of the available
+ worker OS threads. If you need control over which particular OS
+ thread is used to run a given Haskell thread, perhaps because
+ you need to call a foreign library that uses OS-thread-local
+ state, then you need bound threads (see "Control.Concurrent#boundthreads").
+
+ If you don't use the @-threaded@ option, then the runtime does
+ not make use of multiple OS threads. Foreign calls will block
+ all other running Haskell threads until the call returns. The
+ "System.IO" library still does multiplexing, so there can be multiple
+ threads doing I\/O, and this is handled internally by the runtime using
+ @select@.
+-}
+
{- $termination
In a standalone GHC program, only the main thread is
> case cs of
> [] -> return ()
> m:ms -> do
-> putMVar children ms
-> takeMVar m
-> waitForChildren
->
-> forkChild :: IO () -> IO ()
+> putMVar children ms
+> takeMVar m
+> waitForChildren
+>
+> forkChild :: IO () -> IO ThreadId
> forkChild io = do
-> mvar <- newEmptyMVar
-> forkIO (io `finally` putMVar mvar ())
-> childs <- takeMVar children
-> putMVar children (mvar:childs)
+> mvar <- newEmptyMVar
+> childs <- takeMVar children
+> putMVar children (mvar:childs)
+> forkIO (io `finally` putMVar mvar ())
>
> main =
-> later waitForChildren $
-> ...
+> later waitForChildren $
+> ...
The main thread principle also applies to calls to Haskell from
outside, using @foreign export@. When the @foreign export@ed
lock is woken up, but haven't found it to be useful for anything
other than this example :-)
-}
+#endif /* __GLASGOW_HASKELL__ */