X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=Control%2FConcurrent.hs;h=62a30b434889b0cdd1e410d2c10275d740d85b65;hb=7dbb606d7b57cdad87a0ffbdb6ea4a274ebca7c0;hp=f22aca8d847a270c97591731fcbc107e4897f455;hpb=2b2397221c29a275630c62d4982caedc2c7cd987;p=ghc-base.git diff --git a/Control/Concurrent.hs b/Control/Concurrent.hs index f22aca8..62a30b4 100644 --- a/Control/Concurrent.hs +++ b/Control/Concurrent.hs @@ -1,3 +1,11 @@ +{-# LANGUAGE CPP + , ForeignFunctionInterface + , MagicHash + , UnboxedTuples + , ScopedTypeVariables + #-} +{-# OPTIONS_GHC -fno-warn-unused-imports #-} + ----------------------------------------------------------------------------- -- | -- Module : Control.Concurrent @@ -27,10 +35,17 @@ module Control.Concurrent ( forkIO, #ifdef __GLASGOW_HASKELL__ + forkIOWithUnmask, killThread, throwTo, #endif + -- ** Threads with affinity + forkOn, + forkOnWithUnmask, + getNumCapabilities, + threadCapability, + -- * Scheduling -- $conc_scheduling @@ -69,7 +84,7 @@ module Control.Concurrent ( forkOS, isCurrentThreadBound, runInBoundThread, - runInUnboundThread + runInUnboundThread, #endif -- * GHC's implementation of concurrency @@ -88,6 +103,10 @@ module Control.Concurrent ( -- ** Pre-emption -- $preemption + + -- * Deprecated functions + forkIOUnmasked + ) where import Prelude @@ -96,12 +115,10 @@ import Control.Exception.Base as Exception #ifdef __GLASGOW_HASKELL__ import GHC.Exception -import GHC.Conc ( ThreadId(..), myThreadId, killThread, yield, - threadDelay, forkIO, childHandler ) +import GHC.Conc hiding (threadWaitRead, threadWaitWrite) import qualified GHC.Conc -import GHC.IOBase ( IO(..) ) -import GHC.IOBase ( unsafeInterleaveIO ) -import GHC.IOBase ( newIORef, readIORef, writeIORef ) +import GHC.IO ( IO(..), unsafeInterleaveIO, unsafeUnmask ) +import GHC.IORef ( newIORef, readIORef, writeIORef ) import GHC.Base import System.Posix.Types ( Fd ) @@ -112,7 +129,6 @@ import Control.Monad ( when ) #ifdef mingw32_HOST_OS import Foreign.C import System.IO -import GHC.Handle #endif #endif @@ -358,13 +374,15 @@ failNonThreaded = fail $ "RTS doesn't support multiple OS threads " forkOS action0 | rtsSupportsBoundThreads = do mv <- newEmptyMVar - b <- Exception.blocked + b <- Exception.getMaskingState let - -- async exceptions are blocked in the child if they are blocked + -- async exceptions are masked in the child if they are masked -- in the parent, as for forkIO (see #1048). forkOS_createThread - -- creates a thread with exceptions blocked by default. - action1 | b = action0 - | otherwise = unblock action0 + -- creates a thread with exceptions masked by default. + action1 = case b of + Unmasked -> unsafeUnmask action0 + MaskedInterruptible -> action0 + MaskedUninterruptible -> uninterruptibleMask_ action0 action_plus = Exception.catch action1 childHandler @@ -404,13 +422,10 @@ runInBoundThread action else do ref <- newIORef undefined let action_plus = Exception.try action >>= writeIORef ref - resultOrException <- - bracket (newStablePtr action_plus) - freeStablePtr - (\cEntry -> forkOS_entry_reimported cEntry >> readIORef ref) - case resultOrException of - Left exception -> Exception.throw (exception :: SomeException) - Right result -> return result + bracket (newStablePtr action_plus) + freeStablePtr + (\cEntry -> forkOS_entry_reimported cEntry >> readIORef ref) >>= + unsafeResult | otherwise = failNonThreaded {- | @@ -423,20 +438,27 @@ performance loss due to the use of bound threads. A program that doesn't need it's main thread to be bound and makes /heavy/ use of concurrency (e.g. a web server), might want to wrap it's @main@ action in @runInUnboundThread@. + +Note that exceptions which are thrown to the current thread are thrown in turn +to the thread that is executing the given computation. This ensures there's +always a way of killing the forked thread. -} runInUnboundThread :: IO a -> IO a runInUnboundThread action = do - bound <- isCurrentThreadBound - if bound - then do - mv <- newEmptyMVar - forkIO (Exception.try action >>= putMVar mv) - takeMVar mv >>= \ei -> case ei of - Left exception -> Exception.throw (exception :: SomeException) - Right result -> return result - else action - + bound <- isCurrentThreadBound + if bound + then do + mv <- newEmptyMVar + mask $ \restore -> do + tid <- forkIO $ Exception.try (restore action) >>= putMVar mv + let wait = takeMVar mv `Exception.catch` \(e :: SomeException) -> + Exception.throwTo tid e >> wait + wait >>= unsafeResult + else action + +unsafeResult :: Either SomeException a -> IO a +unsafeResult = either Exception.throwIO return #endif /* __GLASGOW_HASKELL__ */ #ifdef __GLASGOW_HASKELL__ @@ -445,6 +467,11 @@ runInUnboundThread action = do -- | Block the current thread until data is available to read on the -- given file descriptor (GHC only). +-- +-- This will throw an 'IOError' if the file descriptor was closed +-- while this thread was blocked. To safely close a file descriptor +-- that has been used with 'threadWaitRead', use +-- 'GHC.Conc.closeFdWith'. threadWaitRead :: Fd -> IO () threadWaitRead fd #ifdef mingw32_HOST_OS @@ -454,7 +481,8 @@ threadWaitRead fd -- and this only works with -threaded. | threaded = withThread (waitFd fd 0) | otherwise = case fd of - 0 -> do hWaitForInput stdin (-1); return () + 0 -> do _ <- hWaitForInput stdin (-1) + return () -- hWaitForInput does work properly, but we can only -- do this for stdin since we know its FD. _ -> error "threadWaitRead requires -threaded on Windows, or use System.IO.hWaitForInput" @@ -464,6 +492,11 @@ threadWaitRead fd -- | Block the current thread until data can be written to the -- given file descriptor (GHC only). +-- +-- This will throw an 'IOError' if the file descriptor was closed +-- while this thread was blocked. To safely close a file descriptor +-- that has been used with 'threadWaitWrite', use +-- 'GHC.Conc.closeFdWith'. threadWaitWrite :: Fd -> IO () threadWaitWrite fd #ifdef mingw32_HOST_OS @@ -479,7 +512,7 @@ foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool withThread :: IO a -> IO a withThread io = do m <- newEmptyMVar - forkIO $ try io >>= putMVar m + _ <- mask_ $ forkIO $ try io >>= putMVar m x <- takeMVar m case x of Right a -> return a @@ -487,11 +520,11 @@ withThread io = do waitFd :: Fd -> CInt -> IO () waitFd fd write = do - throwErrnoIfMinus1 "fdReady" $ - fdReady (fromIntegral fd) write (fromIntegral iNFINITE) 0 - return () + throwErrnoIfMinus1_ "fdReady" $ + fdReady (fromIntegral fd) write iNFINITE 0 -iNFINITE = 0xFFFFFFFF :: CInt -- urgh +iNFINITE :: CInt +iNFINITE = 0xFFFFFFFF -- urgh foreign import ccall safe "fdReady" fdReady :: CInt -> CInt -> CInt -> CInt -> IO CInt @@ -523,7 +556,7 @@ foreign import ccall safe "fdReady" The "System.IO" library manages multiplexing in its own way. On Windows systems it uses @safe@ foreign calls to ensure that threads doing I\/O operations don't block the whole runtime, - whereas on Unix systems all the currently blocked I\/O reqwests + whereas on Unix systems all the currently blocked I\/O requests are managed by a single thread (the /IO manager thread/) using @select@.