X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=Control%2FConcurrent.hs;h=8df665e37cae5166202a7cd46ebac4a89ef1d8c8;hb=696a935b0818ab9cf1a4fbd93faf9add88ead1cd;hp=451e4f9393032a7e07eabba3249ac2c63c97ec55;hpb=6be5e3277137f11000e7eb145d53009e157e7c90;p=ghc-base.git diff --git a/Control/Concurrent.hs b/Control/Concurrent.hs index 451e4f9..8df665e 100644 --- a/Control/Concurrent.hs +++ b/Control/Concurrent.hs @@ -62,6 +62,16 @@ module Control.Concurrent ( #endif -- $merge +#ifdef __GLASGOW_HASKELL__ + -- * Bound Threads + -- $boundthreads + rtsSupportsBoundThreads, + forkOS, + isCurrentThreadBound, + runInBoundThread, + runInUnboundThread +#endif + -- * GHC's implementation of concurrency -- |This section describes features specific to GHC's @@ -74,7 +84,6 @@ module Control.Concurrent ( -- ** Pre-emption -- $preemption - ) where import Prelude @@ -82,11 +91,17 @@ import Prelude import Control.Exception as Exception #ifdef __GLASGOW_HASKELL__ -import GHC.Conc +import GHC.Conc ( ThreadId(..), myThreadId, killThread, yield, + threadDelay, threadWaitRead, threadWaitWrite ) import GHC.TopHandler ( reportStackOverflow, reportError ) import GHC.IOBase ( IO(..) ) import GHC.IOBase ( unsafeInterleaveIO ) +import GHC.IOBase ( newIORef, readIORef, writeIORef ) import GHC.Base + +import Foreign.StablePtr +import Foreign.C.Types ( CInt ) +import Control.Monad ( when ) #endif #ifdef __HUGS__ @@ -114,6 +129,10 @@ and context switching overheads are extremely low. Scheduling of Haskell threads is done internally in the Haskell runtime system, and doesn't make use of any operating system-supplied thread packages. +However, if you want to interact with a foreign library that expects your +program to use the operating system-supplied thread package, you can do so +by using 'forkOS' instead of 'forkIO'. + Haskell threads can communicate via 'MVar's, a kind of synchronised mutable variable (see "Control.Concurrent.MVar"). Several common concurrency abstractions can be built from 'MVar's, and these are @@ -125,7 +144,7 @@ In GHC, threads may also communicate via exceptions. Scheduling may be either pre-emptive or co-operative, depending on the implementation of Concurrent Haskell (see below - for imformation related to specific compilers). In a co-operative + for information related to specific compilers). In a co-operative system, context switches only occur when you use one of the primitives defined in this module. This means that programs such as: @@ -189,6 +208,9 @@ instance Show ThreadId where This sparks off a new thread to run the 'IO' computation passed as the first argument, and returns the 'ThreadId' of the newly created thread. + +The new thread will be a lightweight thread; if you want to use a foreign +library that uses thread-local storage, use 'forkOS' instead. -} forkIO :: IO () -> IO ThreadId forkIO action = IO $ \ s -> @@ -204,12 +226,12 @@ real_handler ex = case ex of -- ignore thread GC and killThread exceptions: BlockedOnDeadMVar -> return () + BlockedIndefinitely -> return () AsyncException ThreadKilled -> return () -- report all others: - AsyncException StackOverflow -> reportStackOverflow False - ErrorCall s -> reportError False s - other -> reportError False (showsPrec 0 other "\n") + AsyncException StackOverflow -> reportStackOverflow + other -> reportError other #endif /* __GLASGOW_HASKELL__ */ @@ -287,6 +309,155 @@ nmergeIO lss mapIO f xs = sequence (map f xs) #endif /* __HUGS__ */ +#ifdef __GLASGOW_HASKELL__ +-- --------------------------------------------------------------------------- +-- Bound Threads + +{- $boundthreads + +Support for multiple operating system threads and bound threads as described +below is currently only available in the GHC runtime system if you use the +/-threaded/ option when linking. + +Other Haskell systems do not currently support multiple operating system threads. + +A bound thread is a haskell thread that is /bound/ to an operating system +thread. While the bound thread is still scheduled by the Haskell run-time +system, the operating system thread takes care of all the foreign calls made +by the bound thread. + +To a foreign library, the bound thread will look exactly like an ordinary +operating system thread created using OS functions like @pthread_create@ +or @CreateThread@. + +Bound threads can be created using the 'forkOS' function below. All foreign +exported functions are run in a bound thread (bound to the OS thread that +called the function). Also, the @main@ action of every Haskell program is +run in a bound thread. + +Why do we need this? Because if a foreign library is called from a thread +created using 'forkIO', it won't have access to any /thread-local state/ - +state variables that have specific values for each OS thread +(see POSIX's @pthread_key_create@ or Win32's @TlsAlloc@). Therefore, some +libraries (OpenGL, for example) will not work from a thread created using +'forkIO'. They work fine in threads created using 'forkOS' or when called +from @main@ or from a @foreign export@. +-} + +-- | 'True' if bound threads are supported. +-- If @rtsSupportsBoundThreads@ is 'False', 'isCurrentThreadBound' +-- will always return 'False' and both 'forkOS' and 'runInBoundThread' will +-- fail. +foreign import ccall rtsSupportsBoundThreads :: Bool + + +{- | +Like 'forkIO', this sparks off a new thread to run the 'IO' computation passed as the +first argument, and returns the 'ThreadId' of the newly created +thread. + +However, @forkOS@ uses operating system-supplied multithreading support to create +a new operating system thread. The new thread is /bound/, which means that +all foreign calls made by the 'IO' computation are guaranteed to be executed +in this new operating system thread; also, the operating system thread is not +used for any other foreign calls. + +This means that you can use all kinds of foreign libraries from this thread +(even those that rely on thread-local state), without the limitations of 'forkIO'. +-} +forkOS :: IO () -> IO ThreadId + +foreign export ccall forkOS_entry + :: StablePtr (IO ()) -> IO () + +foreign import ccall "forkOS_entry" forkOS_entry_reimported + :: StablePtr (IO ()) -> IO () + +forkOS_entry stableAction = do + action <- deRefStablePtr stableAction + action + +foreign import ccall forkOS_createThread + :: StablePtr (IO ()) -> IO CInt + +failNonThreaded = fail $ "RTS doesn't support multiple OS threads " + ++"(use ghc -threaded when linking)" + +forkOS action + | rtsSupportsBoundThreads = do + mv <- newEmptyMVar + let action_plus = Exception.catch action childHandler + entry <- newStablePtr (myThreadId >>= putMVar mv >> action_plus) + err <- forkOS_createThread entry + when (err /= 0) $ fail "Cannot create OS thread." + tid <- takeMVar mv + freeStablePtr entry + return tid + | otherwise = failNonThreaded + +-- | Returns 'True' if the calling thread is /bound/, that is, if it is +-- safe to use foreign libraries that rely on thread-local state from the +-- calling thread. +isCurrentThreadBound :: IO Bool +isCurrentThreadBound = IO $ \ s# -> + case isCurrentThreadBound# s# of + (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #) + + +{- | +Run the 'IO' computation passed as the first argument. If the calling thread +is not /bound/, a bound thread is created temporarily. @runInBoundThread@ +doesn't finish until the 'IO' computation finishes. + +You can wrap a series of foreign function calls that rely on thread-local state +with @runInBoundThread@ so that you can use them without knowing whether the +current thread is /bound/. +-} +runInBoundThread :: IO a -> IO a + +runInBoundThread action + | rtsSupportsBoundThreads = do + bound <- isCurrentThreadBound + if bound + then action + else do + ref <- newIORef undefined + let action_plus = Exception.try action >>= writeIORef ref + resultOrException <- + bracket (newStablePtr action_plus) + freeStablePtr + (\cEntry -> forkOS_entry_reimported cEntry >> readIORef ref) + case resultOrException of + Left exception -> Exception.throw exception + Right result -> return result + | otherwise = failNonThreaded + +{- | +Run the 'IO' computation passed as the first argument. If the calling thread +is /bound/, an unbound thread is created temporarily using 'forkIO'. +@runInBoundThread@ doesn't finish until the 'IO' computation finishes. + +Use this function /only/ in the rare case that you have actually observed a +performance loss due to the use of bound threads. A program that +doesn't need it's main thread to be bound and makes /heavy/ use of concurrency +(e.g. a web server), might want to wrap it's @main@ action in +@runInUnboundThread@. +-} +runInUnboundThread :: IO a -> IO a + +runInUnboundThread action = do + bound <- isCurrentThreadBound + if bound + then do + mv <- newEmptyMVar + forkIO (Exception.try action >>= putMVar mv) + takeMVar mv >>= \either -> case either of + Left exception -> Exception.throw exception + Right result -> return result + else action + +#endif /* __GLASGOW_HASKELL__ */ + -- --------------------------------------------------------------------------- -- More docs @@ -307,8 +478,8 @@ nmergeIO lss > myForkIO :: IO () -> IO (MVar ()) > myForkIO io = do -> mvar \<- newEmptyMVar -> forkIO (io \`finally\` putMVar mvar ()) +> mvar <- newEmptyMVar +> forkIO (io `finally` putMVar mvar ()) > return mvar Note that we use 'finally' from the @@ -319,25 +490,26 @@ nmergeIO lss A better method is to keep a global list of all child threads which we should wait for at the end of the program: -> children :: MVar [MVar ()] -> children = unsafePerformIO (newMVar []) -> -> waitForChildren :: IO () -> waitForChildren = do -> (mvar:mvars) \<- takeMVar children -> putMVar children mvars -> takeMVar mvar -> waitForChildren -> -> forkChild :: IO () -> IO () -> forkChild io = do -> mvar \<- newEmptyMVar -> forkIO (p \`finally\` putMVar mvar ()) -> childs \<- takeMVar children -> putMVar children (mvar:childs) -> -> later = flip finally -> +> children :: MVar [MVar ()] +> children = unsafePerformIO (newMVar []) +> +> waitForChildren :: IO () +> waitForChildren = do +> cs <- takeMVar children +> case cs of +> [] -> return () +> m:ms -> do +> putMVar children ms +> takeMVar m +> waitForChildren +> +> forkChild :: IO () -> IO () +> forkChild io = do +> mvar <- newEmptyMVar +> childs <- takeMVar children +> putMVar children (mvar:childs) +> forkIO (io `finally` putMVar mvar ()) +> > main = > later waitForChildren $ > ... @@ -357,7 +529,7 @@ nmergeIO lss a thread may be pre-empted whenever it allocates some memory, which unfortunately means that tight loops which do no allocation tend to lock out other threads (this only seems to - happen with pathalogical benchmark-style code, however). + happen with pathological benchmark-style code, however). The rescheduling timer runs on a 20ms granularity by default, but this may be altered using the