\begin{code}
{-# OPTIONS_GHC -fno-implicit-prelude #-}
+{-# OPTIONS_HADDOCK not-home #-}
-----------------------------------------------------------------------------
-- |
-- Module : GHC.Conc
-- * Forking and suchlike
, forkIO -- :: IO a -> IO ThreadId
, forkOnIO -- :: Int -> IO a -> IO ThreadId
+ , numCapabilities -- :: Int
, childHandler -- :: Exception -> IO ()
, myThreadId -- :: IO ThreadId
, killThread -- :: ThreadId -> IO ()
, asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
#endif
+#ifndef mingw32_HOST_OS
+ , signalHandlerLock
+#endif
+
, ensureIOManagerIsRunning
+
+#ifdef mingw32_HOST_OS
+ , ConsoleEvent(..)
+ , win32ConsoleHandler
+ , toWin32ConsoleEvent
+#endif
) where
import System.Posix.Types
import GHC.Base
import GHC.IOBase
import GHC.Num ( Num(..) )
-import GHC.Real ( fromIntegral, quot )
+import GHC.Real ( fromIntegral, div )
#ifndef mingw32_HOST_OS
import GHC.Base ( Int(..) )
#endif
-import GHC.Exception ( catchException, Exception(..), AsyncException(..) )
+#ifdef mingw32_HOST_OS
+import GHC.Read ( Read )
+import GHC.Enum ( Enum )
+#endif
+import GHC.Exception
import GHC.Pack ( packCString# )
import GHC.Ptr ( Ptr(..), plusPtr, FunPtr(..) )
import GHC.STRef
showString "ThreadId " .
showsPrec d (getThreadId (id2TSO t))
-foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> Int
+foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
id2TSO :: ThreadId -> ThreadId#
id2TSO (ThreadId t) = t
compare = cmpThread
{- |
-This sparks off a new thread to run the 'IO' computation passed as the
+Sparks off a new thread to run the 'IO' computation passed as the
first argument, and returns the 'ThreadId' of the newly created
thread.
The new thread will be a lightweight thread; if you want to use a foreign
-library that uses thread-local storage, use 'forkOS' instead.
+library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
+
+GHC note: the new thread inherits the /blocked/ state of the parent
+(see 'Control.Exception.block').
-}
forkIO :: IO () -> IO ThreadId
forkIO action = IO $ \ s ->
where
action_plus = catchException action childHandler
+{- |
+Like 'forkIO', but lets you specify on which CPU the thread is
+created. Unlike a `forkIO` thread, a thread created by `forkOnIO`
+will stay on the same CPU for its entire lifetime (`forkIO` threads
+can migrate between CPUs according to the scheduling policy).
+`forkOnIO` is useful for overriding the scheduling policy when you
+know in advance how best to distribute the threads.
+
+The `Int` argument specifies the CPU number; it is interpreted modulo
+'numCapabilities' (note that it actually specifies a capability number
+rather than a CPU number, but to a first approximation the two are
+equivalent).
+-}
forkOnIO :: Int -> IO () -> IO ThreadId
forkOnIO (I# cpu) action = IO $ \ s ->
case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
where
action_plus = catchException action childHandler
+-- | the value passed to the @+RTS -N@ flag. This is the number of
+-- Haskell threads that can run truly simultaneously at any given
+-- time, and is typically set to the number of physical CPU cores on
+-- the machine.
+numCapabilities :: Int
+numCapabilities = unsafePerformIO $ do
+ n <- peek n_capabilities
+ return (fromIntegral n)
+
+foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
+
childHandler :: Exception -> IO ()
childHandler err = catchException (real_handler err) childHandler
the call is inside a 'block' or not.
Important note: the behaviour of 'throwTo' differs from that described in
-the paper "Asynchronous exceptions in Haskell"
+the paper \"Asynchronous exceptions in Haskell\"
(<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
In the paper, 'throwTo' is non-blocking; but the library implementation adopts
a more synchronous design in which 'throwTo' does not return until the exception
addMVarFinalizer :: MVar a -> IO () -> IO ()
addMVarFinalizer (MVar m) finalizer =
IO $ \s -> case mkWeak# m () finalizer s of { (# s1, w #) -> (# s1, () #) }
+
+withMVar :: MVar a -> (a -> IO b) -> IO b
+withMVar m io =
+ block $ do
+ a <- takeMVar m
+ b <- catchException (unblock (io a))
+ (\e -> do putMVar m a; throw e)
+ putMVar m a
+ return b
\end{code}
\begin{code}
#ifdef mingw32_HOST_OS
--- Note: threadDelay, threadWaitRead and threadWaitWrite aren't really functional
+-- Note: threadWaitRead and threadWaitWrite aren't really functional
-- on Win32, but left in there because lib code (still) uses them (the manner
-- in which they're used doesn't cause problems on a Win32 platform though.)
#endif
data DelayReq
- = Delay {-# UNPACK #-} !Word64 {-# UNPACK #-} !(MVar ())
- | DelaySTM {-# UNPACK #-} !Word64 {-# UNPACK #-} !(TVar Bool)
+ = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
+ | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
#ifndef mingw32_HOST_OS
pendingEvents :: IORef [IOReq]
| delayTime d1 <= delayTime d2 = d1 : ds
| otherwise = d2 : insertDelay d1 rest
+delayTime :: DelayReq -> USecs
delayTime (Delay t _) = t
delayTime (DelaySTM t _) = t
foreign import ccall unsafe "getUSecOfDay"
getUSecOfDay :: IO USecs
+prodding :: IORef Bool
+{-# NOINLINE prodding #-}
+prodding = unsafePerformIO (newIORef False)
+
+prodServiceThread :: IO ()
+prodServiceThread = do
+ was_set <- atomicModifyIORef prodding (\a -> (True,a))
+ if (not (was_set)) then wakeupIOManager else return ()
+
#ifdef mingw32_HOST_OS
-- ----------------------------------------------------------------------------
-- Windows IO manager thread
_other -> service_cont wakeup delays' -- probably timeout
service_cont wakeup delays = do
- takeMVar prodding
- putMVar prodding False
+ atomicModifyIORef prodding (\_ -> (False,False))
service_loop wakeup delays
-- must agree with rts/win32/ThrIOManager.c
io_MANAGER_WAKEUP = 0xffffffff :: Word32
io_MANAGER_DIE = 0xfffffffe :: Word32
-start_console_handler :: Word32 -> IO ()
-start_console_handler r = do
- stableptr <- peek console_handler
- forkIO $ do io <- deRefStablePtr stableptr; io (fromIntegral r)
- return ()
+data ConsoleEvent
+ = ControlC
+ | Break
+ | Close
+ -- these are sent to Services only.
+ | Logoff
+ | Shutdown
+ deriving (Eq, Ord, Enum, Show, Read, Typeable)
-foreign import ccall "&console_handler"
- console_handler :: Ptr (StablePtr (CInt -> IO ()))
+start_console_handler :: Word32 -> IO ()
+start_console_handler r =
+ case toWin32ConsoleEvent r of
+ Just x -> withMVar win32ConsoleHandler $ \handler -> do
+ forkIO (handler x)
+ return ()
+ Nothing -> return ()
+
+toWin32ConsoleEvent ev =
+ case ev of
+ 0 {- CTRL_C_EVENT-} -> Just ControlC
+ 1 {- CTRL_BREAK_EVENT-} -> Just Break
+ 2 {- CTRL_CLOSE_EVENT-} -> Just Close
+ 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff
+ 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
+ _ -> Nothing
+
+win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
+win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
stick :: IORef HANDLE
{-# NOINLINE stick #-}
stick = unsafePerformIO (newIORef nullPtr)
-prodding :: MVar Bool
-{-# NOINLINE prodding #-}
-prodding = unsafePerformIO (newMVar False)
-
-prodServiceThread :: IO ()
-prodServiceThread = do
- b <- takeMVar prodding
- if (not b)
- then do hdl <- readIORef stick
- c_sendIOManagerEvent io_MANAGER_WAKEUP
- else return ()
- putMVar prodding True
+wakeupIOManager = do
+ hdl <- readIORef stick
+ c_sendIOManagerEvent io_MANAGER_WAKEUP
-- Walk the queue of pending delays, waking up any that have passed
-- and return the smallest delay to wait for. The queue of pending
atomically $ writeTVar t True
getDelay now rest
_otherwise ->
- return (all, (fromIntegral (delayTime d - now) *
- fromIntegral tick_msecs))
- -- delay is in millisecs for WaitForSingleObject
+ -- delay is in millisecs for WaitForSingleObject
+ let micro_seconds = delayTime d - now
+ milli_seconds = (micro_seconds + 999) `div` 1000
+ in return (all, fromIntegral milli_seconds)
-- ToDo: this just duplicates part of System.Win32.Types, which isn't
-- available yet. We should move some Win32 functionality down here,
foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
c_sendIOManagerEvent :: Word32 -> IO ()
-foreign import ccall unsafe "maperrno" -- in runProcess.c
+foreign import ccall unsafe "maperrno" -- in Win32Utils.c
c_maperrno :: IO ()
foreign import stdcall "WaitForSingleObject"
now <- getUSecOfDay
(delays', timeout) <- getDelay now ptimeval delays
- res <- c_select ((max wakeup maxfd)+1) readfds writefds
+ res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds
nullPtr timeout
if (res == -1)
then do
case s of
_ | s == io_MANAGER_WAKEUP -> return False
_ | s == io_MANAGER_DIE -> return True
- _ -> do handler_tbl <- peek handlers
+ _ -> withMVar signalHandlerLock $ \_ -> do
+ handler_tbl <- peek handlers
sp <- peekElemOff handler_tbl (fromIntegral s)
- forkIO (do io <- deRefStablePtr sp; io)
+ io <- deRefStablePtr sp
+ forkIO io
return False
if exit then return () else do
- takeMVar prodding
- putMVar prodding False
+ atomicModifyIORef prodding (\_ -> (False,False))
reqs' <- if wakeup_all then do wakeupAll reqs; return []
else completeRequests reqs readfds writefds []
{-# NOINLINE stick #-}
stick = unsafePerformIO (newIORef 0)
-prodding :: MVar Bool
-{-# NOINLINE prodding #-}
-prodding = unsafePerformIO (newMVar False)
+wakeupIOManager :: IO ()
+wakeupIOManager = do
+ fd <- readIORef stick
+ with io_MANAGER_WAKEUP $ \pbuf -> do
+ c_write (fromIntegral fd) pbuf 1; return ()
-prodServiceThread :: IO ()
-prodServiceThread = do
- b <- takeMVar prodding
- if (not b)
- then do fd <- readIORef stick
- with io_MANAGER_WAKEUP $ \pbuf -> do
- c_write (fromIntegral fd) pbuf 1; return ()
- else return ()
- putMVar prodding True
+-- Lock used to protect concurrent access to signal_handlers. Symptom of
+-- this race condition is #1922, although that bug was on Windows a similar
+-- bug also exists on Unix.
+signalHandlerLock :: MVar ()
+signalHandlerLock = unsafePerformIO (newMVar ())
foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
newtype CFdSet = CFdSet ()
foreign import ccall safe "select"
- c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
+ c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
-> IO CInt
foreign import ccall unsafe "hsFD_SETSIZE"
- fD_SETSIZE :: Fd
+ c_fD_SETSIZE :: CInt
+
+fD_SETSIZE :: Fd
+fD_SETSIZE = fromIntegral c_fD_SETSIZE
foreign import ccall unsafe "hsFD_CLR"
- fdClr :: Fd -> Ptr CFdSet -> IO ()
+ c_fdClr :: CInt -> Ptr CFdSet -> IO ()
+
+fdClr :: Fd -> Ptr CFdSet -> IO ()
+fdClr (Fd fd) fdset = c_fdClr fd fdset
foreign import ccall unsafe "hsFD_ISSET"
- fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
+ c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
+
+fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
+fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
foreign import ccall unsafe "hsFD_SET"
- fdSet :: Fd -> Ptr CFdSet -> IO ()
+ c_fdSet :: CInt -> Ptr CFdSet -> IO ()
+
+fdSet :: Fd -> Ptr CFdSet -> IO ()
+fdSet (Fd fd) fdset = c_fdSet fd fdset
foreign import ccall unsafe "hsFD_ZERO"
fdZero :: Ptr CFdSet -> IO ()