\begin{code}
-{-# OPTIONS_GHC -fno-implicit-prelude #-}
+{-# OPTIONS_GHC -XNoImplicitPrelude #-}
+{-# OPTIONS_HADDOCK not-home #-}
-----------------------------------------------------------------------------
-- |
-- Module : GHC.Conc
-- #not-home
module GHC.Conc
- ( ThreadId(..)
-
- -- * Forking and suchlike
- , forkIO -- :: IO a -> IO ThreadId
- , forkOnIO -- :: Int -> IO a -> IO ThreadId
- , childHandler -- :: Exception -> IO ()
- , myThreadId -- :: IO ThreadId
- , killThread -- :: ThreadId -> IO ()
- , throwTo -- :: ThreadId -> Exception -> IO ()
- , par -- :: a -> b -> b
- , pseq -- :: a -> b -> b
- , yield -- :: IO ()
- , labelThread -- :: ThreadId -> String -> IO ()
-
- -- * Waiting
- , threadDelay -- :: Int -> IO ()
- , registerDelay -- :: Int -> IO (TVar Bool)
- , threadWaitRead -- :: Int -> IO ()
- , threadWaitWrite -- :: Int -> IO ()
-
- -- * MVars
- , MVar -- abstract
- , newMVar -- :: a -> IO (MVar a)
- , newEmptyMVar -- :: IO (MVar a)
- , takeMVar -- :: MVar a -> IO a
- , putMVar -- :: MVar a -> a -> IO ()
- , tryTakeMVar -- :: MVar a -> IO (Maybe a)
- , tryPutMVar -- :: MVar a -> a -> IO Bool
- , isEmptyMVar -- :: MVar a -> IO Bool
- , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
-
- -- * TVars
- , STM -- abstract
- , atomically -- :: STM a -> IO a
- , retry -- :: STM a
- , orElse -- :: STM a -> STM a -> STM a
+ ( ThreadId(..)
+
+ -- * Forking and suchlike
+ , forkIO -- :: IO a -> IO ThreadId
+ , forkOnIO -- :: Int -> IO a -> IO ThreadId
+ , numCapabilities -- :: Int
+ , childHandler -- :: Exception -> IO ()
+ , myThreadId -- :: IO ThreadId
+ , killThread -- :: ThreadId -> IO ()
+ , throwTo -- :: ThreadId -> Exception -> IO ()
+ , par -- :: a -> b -> b
+ , pseq -- :: a -> b -> b
+ , yield -- :: IO ()
+ , labelThread -- :: ThreadId -> String -> IO ()
+
+ , ThreadStatus(..), BlockReason(..)
+ , threadStatus -- :: ThreadId -> IO ThreadStatus
+
+ -- * Waiting
+ , threadDelay -- :: Int -> IO ()
+ , registerDelay -- :: Int -> IO (TVar Bool)
+ , threadWaitRead -- :: Int -> IO ()
+ , threadWaitWrite -- :: Int -> IO ()
+
+ -- * MVars
+ , MVar(..)
+ , newMVar -- :: a -> IO (MVar a)
+ , newEmptyMVar -- :: IO (MVar a)
+ , takeMVar -- :: MVar a -> IO a
+ , putMVar -- :: MVar a -> a -> IO ()
+ , tryTakeMVar -- :: MVar a -> IO (Maybe a)
+ , tryPutMVar -- :: MVar a -> a -> IO Bool
+ , isEmptyMVar -- :: MVar a -> IO Bool
+ , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
+
+ -- * TVars
+ , STM(..)
+ , atomically -- :: STM a -> IO a
+ , retry -- :: STM a
+ , orElse -- :: STM a -> STM a -> STM a
, catchSTM -- :: STM a -> (Exception -> STM a) -> STM a
- , alwaysSucceeds -- :: STM a -> STM ()
- , always -- :: STM Bool -> STM ()
- , TVar -- abstract
- , newTVar -- :: a -> STM (TVar a)
- , newTVarIO -- :: a -> STM (TVar a)
- , readTVar -- :: TVar a -> STM a
- , writeTVar -- :: a -> TVar a -> STM ()
- , unsafeIOToSTM -- :: IO a -> STM a
-
- -- * Miscellaneous
+ , alwaysSucceeds -- :: STM a -> STM ()
+ , always -- :: STM Bool -> STM ()
+ , TVar(..)
+ , newTVar -- :: a -> STM (TVar a)
+ , newTVarIO -- :: a -> STM (TVar a)
+ , readTVar -- :: TVar a -> STM a
+ , writeTVar -- :: a -> TVar a -> STM ()
+ , unsafeIOToSTM -- :: IO a -> STM a
+
+ -- * Miscellaneous
#ifdef mingw32_HOST_OS
- , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
- , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
- , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
+ , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
+ , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
+ , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
- , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
- , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
+ , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
+ , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
#endif
- , ensureIOManagerIsRunning
+#ifndef mingw32_HOST_OS
+ , signalHandlerLock
+#endif
+
+ , ensureIOManagerIsRunning
+
+#ifdef mingw32_HOST_OS
+ , ConsoleEvent(..)
+ , win32ConsoleHandler
+ , toWin32ConsoleEvent
+#endif
) where
import System.Posix.Types
import GHC.Base
import GHC.IOBase
-import GHC.Num ( Num(..) )
-import GHC.Real ( fromIntegral, quot )
+import GHC.Num ( Num(..) )
+import GHC.Real ( fromIntegral, div )
#ifndef mingw32_HOST_OS
-import GHC.Base ( Int(..) )
+import GHC.Base ( Int(..) )
#endif
-import GHC.Exception ( catchException, Exception(..), AsyncException(..) )
-import GHC.Pack ( packCString# )
+#ifdef mingw32_HOST_OS
+import GHC.Read ( Read )
+import GHC.Enum ( Enum )
+#endif
+import GHC.Exception ( throw )
+import GHC.Pack ( packCString# )
import GHC.Ptr ( Ptr(..), plusPtr, FunPtr(..) )
import GHC.STRef
-import GHC.Show ( Show(..), showString )
+import GHC.Show ( Show(..), showString )
import Data.Typeable
infixr 0 `par`, `pseq`
\end{code}
%************************************************************************
-%* *
+%* *
\subsection{@ThreadId@, @par@, and @fork@}
-%* *
+%* *
%************************************************************************
\begin{code}
instance Show ThreadId where
showsPrec d t =
- showString "ThreadId " .
+ showString "ThreadId " .
showsPrec d (getThreadId (id2TSO t))
-foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> Int
+foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
id2TSO :: ThreadId -> ThreadId#
id2TSO (ThreadId t) = t
compare = cmpThread
{- |
-This sparks off a new thread to run the 'IO' computation passed as the
+Sparks off a new thread to run the 'IO' computation passed as the
first argument, and returns the 'ThreadId' of the newly created
thread.
The new thread will be a lightweight thread; if you want to use a foreign
-library that uses thread-local storage, use 'forkOS' instead.
+library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
+
+GHC note: the new thread inherits the /blocked/ state of the parent
+(see 'Control.Exception.block').
-}
forkIO :: IO () -> IO ThreadId
forkIO action = IO $ \ s ->
where
action_plus = catchException action childHandler
+{- |
+Like 'forkIO', but lets you specify on which CPU the thread is
+created. Unlike a `forkIO` thread, a thread created by `forkOnIO`
+will stay on the same CPU for its entire lifetime (`forkIO` threads
+can migrate between CPUs according to the scheduling policy).
+`forkOnIO` is useful for overriding the scheduling policy when you
+know in advance how best to distribute the threads.
+
+The `Int` argument specifies the CPU number; it is interpreted modulo
+'numCapabilities' (note that it actually specifies a capability number
+rather than a CPU number, but to a first approximation the two are
+equivalent).
+-}
forkOnIO :: Int -> IO () -> IO ThreadId
forkOnIO (I# cpu) action = IO $ \ s ->
case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
where
action_plus = catchException action childHandler
+-- | the value passed to the @+RTS -N@ flag. This is the number of
+-- Haskell threads that can run truly simultaneously at any given
+-- time, and is typically set to the number of physical CPU cores on
+-- the machine.
+numCapabilities :: Int
+numCapabilities = unsafePerformIO $ do
+ n <- peek n_capabilities
+ return (fromIntegral n)
+
+foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
+
childHandler :: Exception -> IO ()
childHandler err = catchException (real_handler err) childHandler
real_handler :: Exception -> IO ()
real_handler ex =
case ex of
- -- ignore thread GC and killThread exceptions:
- BlockedOnDeadMVar -> return ()
- BlockedIndefinitely -> return ()
- AsyncException ThreadKilled -> return ()
+ -- ignore thread GC and killThread exceptions:
+ BlockedOnDeadMVar -> return ()
+ BlockedIndefinitely -> return ()
+ AsyncException ThreadKilled -> return ()
- -- report all others:
- AsyncException StackOverflow -> reportStackOverflow
- other -> reportError other
+ -- report all others:
+ AsyncException StackOverflow -> reportStackOverflow
+ other -> reportError other
{- | 'killThread' terminates the given thread (GHC only).
Any work already done by the thread isn\'t
the call is inside a 'block' or not.
Important note: the behaviour of 'throwTo' differs from that described in
-the paper "Asynchronous exceptions in Haskell"
+the paper \"Asynchronous exceptions in Haskell\"
(<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
In the paper, 'throwTo' is non-blocking; but the library implementation adopts
a more synchronous design in which 'throwTo' does not return until the exception
adr = byteArrayContents# ps in
case (labelThread# t adr s) of s1 -> (# s1, () #)
--- Nota Bene: 'pseq' used to be 'seq'
--- but 'seq' is now defined in PrelGHC
+-- Nota Bene: 'pseq' used to be 'seq'
+-- but 'seq' is now defined in PrelGHC
--
-- "pseq" is defined a bit weirdly (see below)
--
{-# INLINE par #-}
par :: a -> b -> b
par x y = case (par# x) of { _ -> lazy y }
+
+
+data BlockReason
+ = BlockedOnMVar
+ -- ^blocked on on 'MVar'
+ | BlockedOnBlackHole
+ -- ^blocked on a computation in progress by another thread
+ | BlockedOnException
+ -- ^blocked in 'throwTo'
+ | BlockedOnSTM
+ -- ^blocked in 'retry' in an STM transaction
+ | BlockedOnForeignCall
+ -- ^currently in a foreign call
+ | BlockedOnOther
+ -- ^blocked on some other resource. Without @-threaded@,
+ -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
+ -- they show up as 'BlockedOnMVar'.
+ deriving (Eq,Ord,Show)
+
+-- | The current status of a thread
+data ThreadStatus
+ = ThreadRunning
+ -- ^the thread is currently runnable or running
+ | ThreadFinished
+ -- ^the thread has finished
+ | ThreadBlocked BlockReason
+ -- ^the thread is blocked on some resource
+ | ThreadDied
+ -- ^the thread received an uncaught exception
+ deriving (Eq,Ord,Show)
+
+threadStatus :: ThreadId -> IO ThreadStatus
+threadStatus (ThreadId t) = IO $ \s ->
+ case threadStatus# t s of
+ (# s', stat #) -> (# s', mk_stat (I# stat) #)
+ where
+ -- NB. keep these in sync with includes/Constants.h
+ mk_stat 0 = ThreadRunning
+ mk_stat 1 = ThreadBlocked BlockedOnMVar
+ mk_stat 2 = ThreadBlocked BlockedOnBlackHole
+ mk_stat 3 = ThreadBlocked BlockedOnException
+ mk_stat 7 = ThreadBlocked BlockedOnSTM
+ mk_stat 11 = ThreadBlocked BlockedOnForeignCall
+ mk_stat 12 = ThreadBlocked BlockedOnForeignCall
+ mk_stat 16 = ThreadFinished
+ mk_stat 17 = ThreadDied
+ mk_stat _ = ThreadBlocked BlockedOnOther
\end{code}
%************************************************************************
-%* *
+%* *
\subsection[stm]{Transactional heap operations}
-%* *
+%* *
%************************************************************************
TVars are shared memory locations which support atomic memory
{-# INLINE (>>) #-}
{-# INLINE (>>=) #-}
m >> k = thenSTM m k
- return x = returnSTM x
+ return x = returnSTM x
m >>= k = bindSTM m k
bindSTM :: STM a -> (a -> STM b) -> STM b
returnSTM :: a -> STM a
returnSTM x = STM (\s -> (# s, x #))
--- | Unsafely performs IO in the STM monad.
+-- | Unsafely performs IO in the STM monad. Beware: this is a highly
+-- dangerous thing to do.
+--
+-- * The STM implementation will often run transactions multiple
+-- times, so you need to be prepared for this if your IO has any
+-- side effects.
+--
+-- * The STM implementation will abort transactions that are known to
+-- be invalid and need to be restarted. This may happen in the middle
+-- of `unsafeIOToSTM`, so make sure you don't acquire any resources
+-- that need releasing (exception handlers are ignored when aborting
+-- the transaction). That includes doing any IO using Handles, for
+-- example. Getting this wrong will probably lead to random deadlocks.
+--
+-- * The transaction may have seen an inconsistent view of memory when
+-- the IO runs. Invariants that you expect to be true throughout
+-- your program may not be true inside a transaction, due to the
+-- way transactions are implemented. Normally this wouldn't be visible
+-- to the programmer, but using `unsafeIOToSTM` can expose it.
+--
unsafeIOToSTM :: IO a -> STM a
unsafeIOToSTM (IO m) = STM m
INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
instance Eq (TVar a) where
- (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
+ (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
-- |Create a new TVar holding a value supplied
newTVar :: a -> STM (TVar a)
newTVar val = STM $ \s1# ->
case newTVar# val s1# of
- (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
+ (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
-- |@IO@ version of 'newTVar'. This is useful for creating top-level
-- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
newTVarIO :: a -> IO (TVar a)
newTVarIO val = IO $ \s1# ->
case newTVar# val s1# of
- (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
+ (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
-- |Return the current value stored in a TVar
readTVar :: TVar a -> STM a
writeTVar :: TVar a -> a -> STM ()
writeTVar (TVar tvar#) val = STM $ \s1# ->
case writeTVar# tvar# val s1# of
- s2# -> (# s2#, () #)
+ s2# -> (# s2#, () #)
\end{code}
%************************************************************************
-%* *
+%* *
\subsection[mvars]{M-Structures}
-%* *
+%* *
%************************************************************************
M-Vars are rendezvous points for concurrent threads. They begin
-- |Create an 'MVar' which contains the supplied value.
newMVar :: a -> IO (MVar a)
newMVar value =
- newEmptyMVar >>= \ mvar ->
- putMVar mvar value >>
+ newEmptyMVar >>= \ mvar ->
+ putMVar mvar value >>
return mvar
-- |Return the contents of the 'MVar'. If the 'MVar' is currently
tryTakeMVar :: MVar a -> IO (Maybe a)
tryTakeMVar (MVar m) = IO $ \ s ->
case tryTakeMVar# m s of
- (# s, 0#, _ #) -> (# s, Nothing #) -- MVar is empty
- (# s, _, a #) -> (# s, Just a #) -- MVar is full
+ (# s, 0#, _ #) -> (# s, Nothing #) -- MVar is empty
+ (# s, _, a #) -> (# s, Just a #) -- MVar is full
-- |A non-blocking version of 'putMVar'. The 'tryPutMVar' function
-- attempts to put the value @a@ into the 'MVar', returning 'True' if
addMVarFinalizer :: MVar a -> IO () -> IO ()
addMVarFinalizer (MVar m) finalizer =
IO $ \s -> case mkWeak# m () finalizer s of { (# s1, w #) -> (# s1, () #) }
+
+withMVar :: MVar a -> (a -> IO b) -> IO b
+withMVar m io =
+ block $ do
+ a <- takeMVar m
+ b <- catchAny (unblock (io a))
+ (\e -> do putMVar m a; throw e)
+ putMVar m a
+ return b
\end{code}
%************************************************************************
-%* *
+%* *
\subsection{Thread waiting}
-%* *
+%* *
%************************************************************************
\begin{code}
#ifdef mingw32_HOST_OS
--- Note: threadDelay, threadWaitRead and threadWaitWrite aren't really functional
+-- Note: threadWaitRead and threadWaitWrite aren't really functional
-- on Win32, but left in there because lib code (still) uses them (the manner
-- in which they're used doesn't cause problems on a Win32 platform though.)
asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) =
IO $ \s -> case asyncRead# fd isSock len buf s of
- (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
+ (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) =
IO $ \s -> case asyncWrite# fd isSock len buf s of
- (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
+ (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
asyncDoProc (FunPtr proc) (Ptr param) =
-- the 'length' value is ignored; simplifies implementation of
-- the async*# primops to have them all return the same result.
IO $ \s -> case asyncDoProc# proc param s of
- (# s, len#, err# #) -> (# s, I# err# #)
+ (# s, len#, err# #) -> (# s, I# err# #)
-- to aid the use of these primops by the IO Handle implementation,
-- provide the following convenience funs:
| threaded = waitForReadEvent fd
#endif
| otherwise = IO $ \s ->
- case fromIntegral fd of { I# fd# ->
- case waitRead# fd# s of { s -> (# s, () #)
- }}
+ case fromIntegral fd of { I# fd# ->
+ case waitRead# fd# s of { s -> (# s, () #)
+ }}
-- | Block the current thread until data can be written to the
-- given file descriptor (GHC only).
| threaded = waitForWriteEvent fd
#endif
| otherwise = IO $ \s ->
- case fromIntegral fd of { I# fd# ->
- case waitWrite# fd# s of { s -> (# s, () #)
- }}
+ case fromIntegral fd of { I# fd# ->
+ case waitWrite# fd# s of { s -> (# s, () #)
+ }}
-- | Suspends the current thread for a given number of microseconds
-- (GHC only).
--
--- Note that the resolution used by the Haskell runtime system's
--- internal timer is 1\/50 second, and 'threadDelay' will round its
--- argument up to the nearest multiple of this resolution.
---
-- There is no guarantee that the thread will be rescheduled promptly
-- when the delay has expired, but the thread will never continue to
-- run /earlier/ than specified.
threadDelay time
| threaded = waitForDelayEvent time
| otherwise = IO $ \s ->
- case fromIntegral time of { I# time# ->
- case delay# time# s of { s -> (# s, () #)
- }}
+ case fromIntegral time of { I# time# ->
+ case delay# time# s of { s -> (# s, () #)
+ }}
+
+-- | Set the value of returned TVar to True after a given number of
+-- microseconds. The caveats associated with threadDelay also apply.
+--
registerDelay :: Int -> IO (TVar Bool)
registerDelay usecs
| threaded = waitForDelayEventSTM usecs
waitForDelayEvent :: Int -> IO ()
waitForDelayEvent usecs = do
m <- newEmptyMVar
- now <- getTicksOfDay
- let target = now + usecs `quot` tick_usecs
+ target <- calculateTarget usecs
atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
prodServiceThread
takeMVar m
waitForDelayEventSTM :: Int -> IO (TVar Bool)
waitForDelayEventSTM usecs = do
t <- atomically $ newTVar False
- now <- getTicksOfDay
- let target = now + usecs `quot` tick_usecs
+ target <- calculateTarget usecs
atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
prodServiceThread
return t
-calculateTarget :: Int -> IO Int
+calculateTarget :: Int -> IO USecs
calculateTarget usecs = do
- now <- getTicksOfDay
- let -- Convert usecs to ticks, rounding up as we must wait /at least/
- -- as long as we are told
- usecs' = (usecs + tick_usecs - 1) `quot` tick_usecs
- target = now + 1 -- getTicksOfDay will have rounded down, but
- -- again we need to wait for /at least/ as long
- -- as we are told, so add 1 to it
- + usecs'
- return target
+ now <- getUSecOfDay
+ return $ now + (fromIntegral usecs)
+
-- ----------------------------------------------------------------------------
-- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
-- Issues, possible problems:
--
--- - we might want bound threads to just do the blocking
--- operation rather than communicating with the IO manager
--- thread. This would prevent simgle-threaded programs which do
--- IO from requiring multiple OS threads. However, it would also
--- prevent bound threads waiting on IO from being killed or sent
--- exceptions.
+-- - we might want bound threads to just do the blocking
+-- operation rather than communicating with the IO manager
+-- thread. This would prevent simgle-threaded programs which do
+-- IO from requiring multiple OS threads. However, it would also
+-- prevent bound threads waiting on IO from being killed or sent
+-- exceptions.
--
--- - Apprently exec() doesn't work on Linux in a multithreaded program.
--- I couldn't repeat this.
+-- - Apprently exec() doesn't work on Linux in a multithreaded program.
+-- I couldn't repeat this.
--
--- - How do we handle signal delivery in the multithreaded RTS?
+-- - How do we handle signal delivery in the multithreaded RTS?
--
--- - forkProcess will kill the IO manager thread. Let's just
--- hope we don't need to do any blocking IO between fork & exec.
+-- - forkProcess will kill the IO manager thread. Let's just
+-- hope we don't need to do any blocking IO between fork & exec.
#ifndef mingw32_HOST_OS
data IOReq
#endif
data DelayReq
- = Delay {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
- | DelaySTM {-# UNPACK #-} !Int {-# UNPACK #-} !(TVar Bool)
+ = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
+ | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
#ifndef mingw32_HOST_OS
pendingEvents :: IORef [IOReq]
#endif
pendingDelays :: IORef [DelayReq]
- -- could use a strict list or array here
+ -- could use a strict list or array here
{-# NOINLINE pendingEvents #-}
{-# NOINLINE pendingDelays #-}
(pendingEvents,pendingDelays) = unsafePerformIO $ do
reqs <- newIORef []
dels <- newIORef []
return (reqs, dels)
- -- the first time we schedule an IO request, the service thread
- -- will be created (cool, huh?)
+ -- the first time we schedule an IO request, the service thread
+ -- will be created (cool, huh?)
ensureIOManagerIsRunning :: IO ()
ensureIOManagerIsRunning
| delayTime d1 <= delayTime d2 = d1 : ds
| otherwise = d2 : insertDelay d1 rest
+delayTime :: DelayReq -> USecs
delayTime (Delay t _) = t
delayTime (DelaySTM t _) = t
-type Ticks = Int
-tick_freq = 50 :: Ticks -- accuracy of threadDelay (ticks per sec)
-tick_usecs = 1000000 `quot` tick_freq :: Int
-tick_msecs = 1000 `quot` tick_freq :: Int
+type USecs = Word64
-- XXX: move into GHC.IOBase from Data.IORef?
atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
-foreign import ccall unsafe "getTicksOfDay"
- getTicksOfDay :: IO Ticks
+foreign import ccall unsafe "getUSecOfDay"
+ getUSecOfDay :: IO USecs
+
+prodding :: IORef Bool
+{-# NOINLINE prodding #-}
+prodding = unsafePerformIO (newIORef False)
+
+prodServiceThread :: IO ()
+prodServiceThread = do
+ was_set <- atomicModifyIORef prodding (\a -> (True,a))
+ if (not (was_set)) then wakeupIOManager else return ()
#ifdef mingw32_HOST_OS
-- ----------------------------------------------------------------------------
new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
let delays = foldr insertDelay old_delays new_delays
- now <- getTicksOfDay
+ now <- getUSecOfDay
(delays', timeout) <- getDelay now delays
r <- c_WaitForSingleObject wakeup timeout
0 -> do
r <- c_readIOManagerEvent
exit <-
- case r of
- _ | r == io_MANAGER_WAKEUP -> return False
- _ | r == io_MANAGER_DIE -> return True
+ case r of
+ _ | r == io_MANAGER_WAKEUP -> return False
+ _ | r == io_MANAGER_DIE -> return True
0 -> return False -- spurious wakeup
- r -> do start_console_handler (r `shiftR` 1); return False
+ r -> do start_console_handler (r `shiftR` 1); return False
if exit
then return ()
else service_cont wakeup delays'
_other -> service_cont wakeup delays' -- probably timeout
service_cont wakeup delays = do
- takeMVar prodding
- putMVar prodding False
+ atomicModifyIORef prodding (\_ -> (False,False))
service_loop wakeup delays
-- must agree with rts/win32/ThrIOManager.c
io_MANAGER_WAKEUP = 0xffffffff :: Word32
io_MANAGER_DIE = 0xfffffffe :: Word32
-start_console_handler :: Word32 -> IO ()
-start_console_handler r = do
- stableptr <- peek console_handler
- forkIO $ do io <- deRefStablePtr stableptr; io (fromIntegral r)
- return ()
+data ConsoleEvent
+ = ControlC
+ | Break
+ | Close
+ -- these are sent to Services only.
+ | Logoff
+ | Shutdown
+ deriving (Eq, Ord, Enum, Show, Read, Typeable)
-foreign import ccall "&console_handler"
- console_handler :: Ptr (StablePtr (CInt -> IO ()))
+start_console_handler :: Word32 -> IO ()
+start_console_handler r =
+ case toWin32ConsoleEvent r of
+ Just x -> withMVar win32ConsoleHandler $ \handler -> do
+ forkIO (handler x)
+ return ()
+ Nothing -> return ()
+
+toWin32ConsoleEvent ev =
+ case ev of
+ 0 {- CTRL_C_EVENT-} -> Just ControlC
+ 1 {- CTRL_BREAK_EVENT-} -> Just Break
+ 2 {- CTRL_CLOSE_EVENT-} -> Just Close
+ 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff
+ 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
+ _ -> Nothing
+
+win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
+win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
stick :: IORef HANDLE
{-# NOINLINE stick #-}
stick = unsafePerformIO (newIORef nullPtr)
-prodding :: MVar Bool
-{-# NOINLINE prodding #-}
-prodding = unsafePerformIO (newMVar False)
-
-prodServiceThread :: IO ()
-prodServiceThread = do
- b <- takeMVar prodding
- if (not b)
- then do hdl <- readIORef stick
- c_sendIOManagerEvent io_MANAGER_WAKEUP
- else return ()
- putMVar prodding True
+wakeupIOManager = do
+ hdl <- readIORef stick
+ c_sendIOManagerEvent io_MANAGER_WAKEUP
-- Walk the queue of pending delays, waking up any that have passed
-- and return the smallest delay to wait for. The queue of pending
-- delays is kept ordered.
-getDelay :: Ticks -> [DelayReq] -> IO ([DelayReq], DWORD)
+getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
getDelay now [] = return ([], iNFINITE)
getDelay now all@(d : rest)
= case d of
Delay time m | now >= time -> do
- putMVar m ()
- getDelay now rest
+ putMVar m ()
+ getDelay now rest
DelaySTM time t | now >= time -> do
- atomically $ writeTVar t True
- getDelay now rest
+ atomically $ writeTVar t True
+ getDelay now rest
_otherwise ->
- return (all, (fromIntegral (delayTime d - now) *
- fromIntegral tick_msecs))
- -- delay is in millisecs for WaitForSingleObject
+ -- delay is in millisecs for WaitForSingleObject
+ let micro_seconds = delayTime d - now
+ milli_seconds = (micro_seconds + 999) `div` 1000
+ in return (all, fromIntegral milli_seconds)
-- ToDo: this just duplicates part of System.Win32.Types, which isn't
-- available yet. We should move some Win32 functionality down here,
foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
c_sendIOManagerEvent :: Word32 -> IO ()
-foreign import ccall unsafe "maperrno" -- in runProcess.c
+foreign import ccall unsafe "maperrno" -- in Win32Utils.c
c_maperrno :: IO ()
foreign import stdcall "WaitForSingleObject"
startIOManagerThread :: IO ()
startIOManagerThread = do
allocaArray 2 $ \fds -> do
- throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
- rd_end <- peekElemOff fds 0
- wr_end <- peekElemOff fds 1
- writeIORef stick (fromIntegral wr_end)
- c_setIOManagerPipe wr_end
- forkIO $ do
- allocaBytes sizeofFdSet $ \readfds -> do
- allocaBytes sizeofFdSet $ \writefds -> do
- allocaBytes sizeofTimeVal $ \timeval -> do
- service_loop (fromIntegral rd_end) readfds writefds timeval [] []
- return ()
+ throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
+ rd_end <- peekElemOff fds 0
+ wr_end <- peekElemOff fds 1
+ writeIORef stick (fromIntegral wr_end)
+ c_setIOManagerPipe wr_end
+ forkIO $ do
+ allocaBytes sizeofFdSet $ \readfds -> do
+ allocaBytes sizeofFdSet $ \writefds -> do
+ allocaBytes sizeofTimeVal $ \timeval -> do
+ service_loop (fromIntegral rd_end) readfds writefds timeval [] []
+ return ()
service_loop
- :: Fd -- listen to this for wakeup calls
+ :: Fd -- listen to this for wakeup calls
-> Ptr CFdSet
-> Ptr CFdSet
-> Ptr CTimeVal
-- perform the select()
let do_select delays = do
- -- check the current time and wake up any thread in
- -- threadDelay whose timeout has expired. Also find the
- -- timeout value for the select() call.
- now <- getTicksOfDay
- (delays', timeout) <- getDelay now ptimeval delays
-
- res <- c_select ((max wakeup maxfd)+1) readfds writefds
- nullPtr timeout
- if (res == -1)
- then do
- err <- getErrno
- case err of
- _ | err == eINTR -> do_select delays'
- -- EINTR: just redo the select()
- _ | err == eBADF -> return (True, delays)
- -- EBADF: one of the file descriptors is closed or bad,
- -- we don't know which one, so wake everyone up.
- _ | otherwise -> throwErrno "select"
- -- otherwise (ENOMEM or EINVAL) something has gone
- -- wrong; report the error.
- else
- return (False,delays')
+ -- check the current time and wake up any thread in
+ -- threadDelay whose timeout has expired. Also find the
+ -- timeout value for the select() call.
+ now <- getUSecOfDay
+ (delays', timeout) <- getDelay now ptimeval delays
+
+ res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds
+ nullPtr timeout
+ if (res == -1)
+ then do
+ err <- getErrno
+ case err of
+ _ | err == eINTR -> do_select delays'
+ -- EINTR: just redo the select()
+ _ | err == eBADF -> return (True, delays)
+ -- EBADF: one of the file descriptors is closed or bad,
+ -- we don't know which one, so wake everyone up.
+ _ | otherwise -> throwErrno "select"
+ -- otherwise (ENOMEM or EINVAL) something has gone
+ -- wrong; report the error.
+ else
+ return (False,delays')
(wakeup_all,delays') <- do_select delays
if b == 0
then return False
else alloca $ \p -> do
- c_read (fromIntegral wakeup) p 1; return ()
- s <- peek p
- case s of
- _ | s == io_MANAGER_WAKEUP -> return False
- _ | s == io_MANAGER_DIE -> return True
- _ -> do handler_tbl <- peek handlers
- sp <- peekElemOff handler_tbl (fromIntegral s)
- forkIO (do io <- deRefStablePtr sp; io)
- return False
+ c_read (fromIntegral wakeup) p 1; return ()
+ s <- peek p
+ case s of
+ _ | s == io_MANAGER_WAKEUP -> return False
+ _ | s == io_MANAGER_DIE -> return True
+ _ -> withMVar signalHandlerLock $ \_ -> do
+ handler_tbl <- peek handlers
+ sp <- peekElemOff handler_tbl (fromIntegral s)
+ io <- deRefStablePtr sp
+ forkIO io
+ return False
if exit then return () else do
- takeMVar prodding
- putMVar prodding False
+ atomicModifyIORef prodding (\_ -> (False,False))
reqs' <- if wakeup_all then do wakeupAll reqs; return []
- else completeRequests reqs readfds writefds []
+ else completeRequests reqs readfds writefds []
service_loop wakeup readfds writefds ptimeval reqs' delays'
{-# NOINLINE stick #-}
stick = unsafePerformIO (newIORef 0)
-prodding :: MVar Bool
-{-# NOINLINE prodding #-}
-prodding = unsafePerformIO (newMVar False)
+wakeupIOManager :: IO ()
+wakeupIOManager = do
+ fd <- readIORef stick
+ with io_MANAGER_WAKEUP $ \pbuf -> do
+ c_write (fromIntegral fd) pbuf 1; return ()
-prodServiceThread :: IO ()
-prodServiceThread = do
- b <- takeMVar prodding
- if (not b)
- then do fd <- readIORef stick
- with io_MANAGER_WAKEUP $ \pbuf -> do
- c_write (fromIntegral fd) pbuf 1; return ()
- else return ()
- putMVar prodding True
+-- Lock used to protect concurrent access to signal_handlers. Symptom of
+-- this race condition is #1922, although that bug was on Windows a similar
+-- bug also exists on Unix.
+signalHandlerLock :: MVar ()
+signalHandlerLock = unsafePerformIO (newMVar ())
foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
buildFdSets maxfd readfds writefds (Read fd m : reqs)
| fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
| otherwise = do
- fdSet fd readfds
+ fdSet fd readfds
buildFdSets (max maxfd fd) readfds writefds reqs
buildFdSets maxfd readfds writefds (Write fd m : reqs)
| fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range"
| otherwise = do
- fdSet fd writefds
- buildFdSets (max maxfd fd) readfds writefds reqs
+ fdSet fd writefds
+ buildFdSets (max maxfd fd) readfds writefds reqs
completeRequests [] _ _ reqs' = return reqs'
completeRequests (Read fd m : reqs) readfds writefds reqs' = do
-- Walk the queue of pending delays, waking up any that have passed
-- and return the smallest delay to wait for. The queue of pending
-- delays is kept ordered.
-getDelay :: Ticks -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
+getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
getDelay now ptimeval [] = return ([],nullPtr)
getDelay now ptimeval all@(d : rest)
= case d of
Delay time m | now >= time -> do
- putMVar m ()
- getDelay now ptimeval rest
+ putMVar m ()
+ getDelay now ptimeval rest
DelaySTM time t | now >= time -> do
- atomically $ writeTVar t True
- getDelay now ptimeval rest
+ atomically $ writeTVar t True
+ getDelay now ptimeval rest
_otherwise -> do
- setTimevalTicks ptimeval (delayTime d - now)
- return (all,ptimeval)
+ setTimevalTicks ptimeval (delayTime d - now)
+ return (all,ptimeval)
newtype CTimeVal = CTimeVal ()
sizeofTimeVal :: Int
foreign import ccall unsafe "setTimevalTicks"
- setTimevalTicks :: Ptr CTimeVal -> Ticks -> IO ()
+ setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
{-
On Win32 we're going to have a single Pipe, and a
newtype CFdSet = CFdSet ()
foreign import ccall safe "select"
- c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
+ c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
-> IO CInt
foreign import ccall unsafe "hsFD_SETSIZE"
- fD_SETSIZE :: Fd
+ c_fD_SETSIZE :: CInt
+
+fD_SETSIZE :: Fd
+fD_SETSIZE = fromIntegral c_fD_SETSIZE
foreign import ccall unsafe "hsFD_CLR"
- fdClr :: Fd -> Ptr CFdSet -> IO ()
+ c_fdClr :: CInt -> Ptr CFdSet -> IO ()
+
+fdClr :: Fd -> Ptr CFdSet -> IO ()
+fdClr (Fd fd) fdset = c_fdClr fd fdset
foreign import ccall unsafe "hsFD_ISSET"
- fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
+ c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
+
+fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
+fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
foreign import ccall unsafe "hsFD_SET"
- fdSet :: Fd -> Ptr CFdSet -> IO ()
+ c_fdSet :: CInt -> Ptr CFdSet -> IO ()
+
+fdSet :: Fd -> Ptr CFdSet -> IO ()
+fdSet (Fd fd) fdset = c_fdSet fd fdset
foreign import ccall unsafe "hsFD_ZERO"
fdZero :: Ptr CFdSet -> IO ()