-----------------------------------------------------------------------------
module Control.Concurrent (
- -- * Concurrent Haskell
+ -- * Concurrent Haskell
- -- $conc_intro
+ -- $conc_intro
- -- * Basic concurrency operations
+ -- * Basic concurrency operations
ThreadId,
#ifdef __GLASGOW_HASKELL__
- myThreadId,
+ myThreadId,
#endif
- forkIO,
+ forkIO,
#ifdef __GLASGOW_HASKELL__
- killThread,
- throwTo,
+ killThread,
+ throwTo,
#endif
- -- * Scheduling
+ -- * Scheduling
- -- $conc_scheduling
- yield, -- :: IO ()
+ -- $conc_scheduling
+ yield, -- :: IO ()
- -- ** Blocking
-
- -- $blocking
+ -- ** Blocking
+
+ -- $blocking
#ifdef __GLASGOW_HASKELL__
- -- ** Waiting
- threadDelay, -- :: Int -> IO ()
- threadWaitRead, -- :: Int -> IO ()
- threadWaitWrite, -- :: Int -> IO ()
+ -- ** Waiting
+ threadDelay, -- :: Int -> IO ()
+ threadWaitRead, -- :: Int -> IO ()
+ threadWaitWrite, -- :: Int -> IO ()
#endif
- -- * Communication abstractions
+ -- * Communication abstractions
- module Control.Concurrent.MVar,
- module Control.Concurrent.Chan,
- module Control.Concurrent.QSem,
- module Control.Concurrent.QSemN,
- module Control.Concurrent.SampleVar,
+ module Control.Concurrent.MVar,
+ module Control.Concurrent.Chan,
+ module Control.Concurrent.QSem,
+ module Control.Concurrent.QSemN,
+ module Control.Concurrent.SampleVar,
- -- * Merging of streams
+ -- * Merging of streams
#ifndef __HUGS__
- mergeIO, -- :: [a] -> [a] -> IO [a]
- nmergeIO, -- :: [[a]] -> IO [a]
+ mergeIO, -- :: [a] -> [a] -> IO [a]
+ nmergeIO, -- :: [[a]] -> IO [a]
#endif
- -- $merge
+ -- $merge
#ifdef __GLASGOW_HASKELL__
- -- * Bound Threads
- -- $boundthreads
- rtsSupportsBoundThreads,
- forkOS,
- isCurrentThreadBound,
- runInBoundThread,
- runInUnboundThread
+ -- * Bound Threads
+ -- $boundthreads
+ rtsSupportsBoundThreads,
+ forkOS,
+ isCurrentThreadBound,
+ runInBoundThread,
+ runInUnboundThread
#endif
- -- * GHC's implementation of concurrency
+ -- * GHC's implementation of concurrency
+
+ -- |This section describes features specific to GHC's
+ -- implementation of Concurrent Haskell.
- -- |This section describes features specific to GHC's
- -- implementation of Concurrent Haskell.
-
- -- ** Haskell threads and Operating System threads
+ -- ** Haskell threads and Operating System threads
- -- $osthreads
+ -- $osthreads
- -- ** Terminating the program
+ -- ** Terminating the program
- -- $termination
+ -- $termination
- -- ** Pre-emption
+ -- ** Pre-emption
- -- $preemption
+ -- $preemption
) where
import Prelude
import Control.Exception as Exception
#ifdef __GLASGOW_HASKELL__
-import GHC.Conc ( ThreadId(..), myThreadId, killThread, yield,
- threadDelay, threadWaitRead, threadWaitWrite,
- forkIO, childHandler )
+import GHC.Conc ( ThreadId(..), myThreadId, killThread, yield,
+ threadDelay, threadWaitRead, threadWaitWrite,
+ forkIO, childHandler )
import GHC.TopHandler ( reportStackOverflow, reportError )
-import GHC.IOBase ( IO(..) )
-import GHC.IOBase ( unsafeInterleaveIO )
-import GHC.IOBase ( newIORef, readIORef, writeIORef )
+import GHC.IOBase ( IO(..) )
+import GHC.IOBase ( unsafeInterleaveIO )
+import GHC.IOBase ( newIORef, readIORef, writeIORef )
import GHC.Base
import Foreign.StablePtr
-- preemptive multitasking.
mergeIO ls rs
- = newEmptyMVar >>= \ tail_node ->
- newMVar tail_node >>= \ tail_list ->
+ = newEmptyMVar >>= \ tail_node ->
+ newMVar tail_node >>= \ tail_list ->
newQSem max_buff_size >>= \ e ->
newMVar 2 >>= \ branches_running ->
let
in
forkIO (suckIO branches_running buff ls) >>
forkIO (suckIO branches_running buff rs) >>
- takeMVar tail_node >>= \ val ->
- signalQSem e >>
+ takeMVar tail_node >>= \ val ->
+ signalQSem e >>
return val
-type Buffer a
+type Buffer a
= (MVar (MVar [a]), QSem)
suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
suckIO branches_running buff@(tail_list,e) vs
= case vs of
- [] -> takeMVar branches_running >>= \ val ->
- if val == 1 then
- takeMVar tail_list >>= \ node ->
- putMVar node [] >>
- putMVar tail_list node
- else
- putMVar branches_running (val-1)
- (x:xs) ->
- waitQSem e >>
- takeMVar tail_list >>= \ node ->
- newEmptyMVar >>= \ next_node ->
- unsafeInterleaveIO (
- takeMVar next_node >>= \ y ->
- signalQSem e >>
- return y) >>= \ next_node_val ->
- putMVar node (x:next_node_val) >>
- putMVar tail_list next_node >>
- suckIO branches_running buff xs
+ [] -> takeMVar branches_running >>= \ val ->
+ if val == 1 then
+ takeMVar tail_list >>= \ node ->
+ putMVar node [] >>
+ putMVar tail_list node
+ else
+ putMVar branches_running (val-1)
+ (x:xs) ->
+ waitQSem e >>
+ takeMVar tail_list >>= \ node ->
+ newEmptyMVar >>= \ next_node ->
+ unsafeInterleaveIO (
+ takeMVar next_node >>= \ y ->
+ signalQSem e >>
+ return y) >>= \ next_node_val ->
+ putMVar node (x:next_node_val) >>
+ putMVar tail_list next_node >>
+ suckIO branches_running buff xs
nmergeIO lss
= let
len = length lss
in
- newEmptyMVar >>= \ tail_node ->
- newMVar tail_node >>= \ tail_list ->
+ newEmptyMVar >>= \ tail_node ->
+ newMVar tail_node >>= \ tail_list ->
newQSem max_buff_size >>= \ e ->
- newMVar len >>= \ branches_running ->
+ newMVar len >>= \ branches_running ->
let
buff = (tail_list,e)
in
mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
- takeMVar tail_node >>= \ val ->
- signalQSem e >>
+ takeMVar tail_node >>= \ val ->
+ signalQSem e >>
return val
where
mapIO f xs = sequence (map f xs)
:: StablePtr (IO ()) -> IO ()
forkOS_entry stableAction = do
- action <- deRefStablePtr stableAction
- action
+ action <- deRefStablePtr stableAction
+ action
foreign import ccall forkOS_createThread
:: StablePtr (IO ()) -> IO CInt
failNonThreaded = fail $ "RTS doesn't support multiple OS threads "
++"(use ghc -threaded when linking)"
-
-forkOS action
+
+forkOS action
| rtsSupportsBoundThreads = do
- mv <- newEmptyMVar
- let action_plus = Exception.catch action childHandler
- entry <- newStablePtr (myThreadId >>= putMVar mv >> action_plus)
- err <- forkOS_createThread entry
- when (err /= 0) $ fail "Cannot create OS thread."
- tid <- takeMVar mv
- freeStablePtr entry
- return tid
+ mv <- newEmptyMVar
+ let action_plus = Exception.catch action childHandler
+ entry <- newStablePtr (myThreadId >>= putMVar mv >> action_plus)
+ err <- forkOS_createThread entry
+ when (err /= 0) $ fail "Cannot create OS thread."
+ tid <- takeMVar mv
+ freeStablePtr entry
+ return tid
| otherwise = failNonThreaded
-- | Returns 'True' if the calling thread is /bound/, that is, if it is
-- safe to use foreign libraries that rely on thread-local state from the
-- calling thread.
isCurrentThreadBound :: IO Bool
-isCurrentThreadBound = IO $ \ s# ->
+isCurrentThreadBound = IO $ \ s# ->
case isCurrentThreadBound# s# of
(# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
runInBoundThread action
| rtsSupportsBoundThreads = do
- bound <- isCurrentThreadBound
- if bound
- then action
- else do
- ref <- newIORef undefined
- let action_plus = Exception.try action >>= writeIORef ref
- resultOrException <-
- bracket (newStablePtr action_plus)
- freeStablePtr
- (\cEntry -> forkOS_entry_reimported cEntry >> readIORef ref)
- case resultOrException of
- Left exception -> Exception.throw exception
- Right result -> return result
+ bound <- isCurrentThreadBound
+ if bound
+ then action
+ else do
+ ref <- newIORef undefined
+ let action_plus = Exception.try action >>= writeIORef ref
+ resultOrException <-
+ bracket (newStablePtr action_plus)
+ freeStablePtr
+ (\cEntry -> forkOS_entry_reimported cEntry >> readIORef ref)
+ case resultOrException of
+ Left exception -> Exception.throw exception
+ Right result -> return result
| otherwise = failNonThreaded
{- |
Left exception -> Exception.throw exception
Right result -> return result
else action
-
+
#endif /* __GLASGOW_HASKELL__ */
-- ---------------------------------------------------------------------------
> case cs of
> [] -> return ()
> m:ms -> do
-> putMVar children ms
-> takeMVar m
-> waitForChildren
->
+> putMVar children ms
+> takeMVar m
+> waitForChildren
+>
> forkChild :: IO () -> IO ThreadId
> forkChild io = do
-> mvar <- newEmptyMVar
-> childs <- takeMVar children
-> putMVar children (mvar:childs)
-> forkIO (io `finally` putMVar mvar ())
+> mvar <- newEmptyMVar
+> childs <- takeMVar children
+> putMVar children (mvar:childs)
+> forkIO (io `finally` putMVar mvar ())
>
> main =
-> later waitForChildren $
-> ...
+> later waitForChildren $
+> ...
The main thread principle also applies to calls to Haskell from
outside, using @foreign export@. When the @foreign export@ed