final revision to GArrow classes
[ghc-base.git] / Control / Concurrent.hs
index a25e659..62a30b4 100644 (file)
@@ -1,4 +1,11 @@
+{-# LANGUAGE CPP
+           , ForeignFunctionInterface
+           , MagicHash
+           , UnboxedTuples
+           , ScopedTypeVariables
+  #-}
 {-# OPTIONS_GHC -fno-warn-unused-imports #-}
+
 -----------------------------------------------------------------------------
 -- |
 -- Module      :  Control.Concurrent
@@ -28,10 +35,17 @@ module Control.Concurrent (
 
         forkIO,
 #ifdef __GLASGOW_HASKELL__
+        forkIOWithUnmask,
         killThread,
         throwTo,
 #endif
 
+        -- ** Threads with affinity
+        forkOn,
+        forkOnWithUnmask,
+        getNumCapabilities,
+        threadCapability,
+
         -- * Scheduling
 
         -- $conc_scheduling     
@@ -70,7 +84,7 @@ module Control.Concurrent (
         forkOS,
         isCurrentThreadBound,
         runInBoundThread,
-        runInUnboundThread
+        runInUnboundThread,
 #endif
 
         -- * GHC's implementation of concurrency
@@ -89,6 +103,10 @@ module Control.Concurrent (
         -- ** Pre-emption
 
         -- $preemption
+
+        -- * Deprecated functions
+        forkIOUnmasked
+
     ) where
 
 import Prelude
@@ -97,10 +115,9 @@ import Control.Exception.Base as Exception
 
 #ifdef __GLASGOW_HASKELL__
 import GHC.Exception
-import GHC.Conc         ( ThreadId(..), myThreadId, killThread, yield,
-                          threadDelay, forkIO, childHandler )
+import GHC.Conc hiding (threadWaitRead, threadWaitWrite)
 import qualified GHC.Conc
-import GHC.IO           ( IO(..), unsafeInterleaveIO )
+import GHC.IO           ( IO(..), unsafeInterleaveIO, unsafeUnmask )
 import GHC.IORef        ( newIORef, readIORef, writeIORef )
 import GHC.Base
 
@@ -357,13 +374,15 @@ failNonThreaded = fail $ "RTS doesn't support multiple OS threads "
 forkOS action0
     | rtsSupportsBoundThreads = do
         mv <- newEmptyMVar
-        b <- Exception.blocked
+        b <- Exception.getMaskingState
         let
-            -- async exceptions are blocked in the child if they are blocked
+            -- async exceptions are masked in the child if they are masked
             -- in the parent, as for forkIO (see #1048). forkOS_createThread
-            -- creates a thread with exceptions blocked by default.
-            action1 | b = action0
-                    | otherwise = unblock action0
+            -- creates a thread with exceptions masked by default.
+            action1 = case b of
+                        Unmasked -> unsafeUnmask action0
+                        MaskedInterruptible -> action0
+                        MaskedUninterruptible -> uninterruptibleMask_ action0
 
             action_plus = Exception.catch action1 childHandler
 
@@ -403,13 +422,10 @@ runInBoundThread action
             else do
                 ref <- newIORef undefined
                 let action_plus = Exception.try action >>= writeIORef ref
-                resultOrException <-
-                    bracket (newStablePtr action_plus)
-                            freeStablePtr
-                            (\cEntry -> forkOS_entry_reimported cEntry >> readIORef ref)
-                case resultOrException of
-                    Left exception -> Exception.throw (exception :: SomeException)
-                    Right result -> return result
+                bracket (newStablePtr action_plus)
+                        freeStablePtr
+                        (\cEntry -> forkOS_entry_reimported cEntry >> readIORef ref) >>=
+                  unsafeResult
     | otherwise = failNonThreaded
 
 {- | 
@@ -422,23 +438,27 @@ performance loss due to the use of bound threads. A program that
 doesn't need it's main thread to be bound and makes /heavy/ use of concurrency
 (e.g. a web server), might want to wrap it's @main@ action in
 @runInUnboundThread@.
+
+Note that exceptions which are thrown to the current thread are thrown in turn
+to the thread that is executing the given computation. This ensures there's
+always a way of killing the forked thread.
 -}
 runInUnboundThread :: IO a -> IO a
 
 runInUnboundThread action = do
-    bound <- isCurrentThreadBound
-    if bound
-        then do
-            mv <- newEmptyMVar
-            b <- blocked
-            _ <- block $ forkIO $
-              Exception.try (if b then action else unblock action) >>=
-              putMVar mv
-            takeMVar mv >>= \ei -> case ei of
-                Left exception -> Exception.throw (exception :: SomeException)
-                Right result -> return result
-        else action
-
+  bound <- isCurrentThreadBound
+  if bound
+    then do
+      mv <- newEmptyMVar
+      mask $ \restore -> do
+        tid <- forkIO $ Exception.try (restore action) >>= putMVar mv
+        let wait = takeMVar mv `Exception.catch` \(e :: SomeException) ->
+                     Exception.throwTo tid e >> wait
+        wait >>= unsafeResult
+    else action
+
+unsafeResult :: Either SomeException a -> IO a
+unsafeResult = either Exception.throwIO return
 #endif /* __GLASGOW_HASKELL__ */
 
 #ifdef __GLASGOW_HASKELL__
@@ -447,6 +467,11 @@ runInUnboundThread action = do
 
 -- | Block the current thread until data is available to read on the
 -- given file descriptor (GHC only).
+--
+-- This will throw an 'IOError' if the file descriptor was closed
+-- while this thread was blocked.  To safely close a file descriptor
+-- that has been used with 'threadWaitRead', use
+-- 'GHC.Conc.closeFdWith'.
 threadWaitRead :: Fd -> IO ()
 threadWaitRead fd
 #ifdef mingw32_HOST_OS
@@ -467,6 +492,11 @@ threadWaitRead fd
 
 -- | Block the current thread until data can be written to the
 -- given file descriptor (GHC only).
+--
+-- This will throw an 'IOError' if the file descriptor was closed
+-- while this thread was blocked.  To safely close a file descriptor
+-- that has been used with 'threadWaitWrite', use
+-- 'GHC.Conc.closeFdWith'.
 threadWaitWrite :: Fd -> IO ()
 threadWaitWrite fd
 #ifdef mingw32_HOST_OS
@@ -482,7 +512,7 @@ foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
 withThread :: IO a -> IO a
 withThread io = do
   m <- newEmptyMVar
-  _ <- block $ forkIO $ try io >>= putMVar m
+  _ <- mask_ $ forkIO $ try io >>= putMVar m
   x <- takeMVar m
   case x of
     Right a -> return a
@@ -491,7 +521,7 @@ withThread io = do
 waitFd :: Fd -> CInt -> IO ()
 waitFd fd write = do
    throwErrnoIfMinus1_ "fdReady" $
-        fdReady (fromIntegral fd) write (fromIntegral iNFINITE) 0
+        fdReady (fromIntegral fd) write iNFINITE 0
 
 iNFINITE :: CInt
 iNFINITE = 0xFFFFFFFF -- urgh