, throwTo -- :: ThreadId -> Exception -> IO ()
, par -- :: a -> b -> b
, pseq -- :: a -> b -> b
+ , runSparks
, yield -- :: IO ()
, labelThread -- :: ThreadId -> String -> IO ()
, threadWaitRead -- :: Int -> IO ()
, threadWaitWrite -- :: Int -> IO ()
- -- * MVars
- , MVar(..)
- , newMVar -- :: a -> IO (MVar a)
- , newEmptyMVar -- :: IO (MVar a)
- , takeMVar -- :: MVar a -> IO a
- , putMVar -- :: MVar a -> a -> IO ()
- , tryTakeMVar -- :: MVar a -> IO (Maybe a)
- , tryPutMVar -- :: MVar a -> a -> IO Bool
- , isEmptyMVar -- :: MVar a -> IO Bool
- , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
-
-- * TVars
, STM(..)
, atomically -- :: STM a -> IO a
, unsafeIOToSTM -- :: IO a -> STM a
-- * Miscellaneous
+ , withMVar
#ifdef mingw32_HOST_OS
, asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
, asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
#endif
#ifndef mingw32_HOST_OS
- , signalHandlerLock
+ , Signal, HandlerFun, setHandler, runHandlers
#endif
, ensureIOManagerIsRunning
+#ifndef mingw32_HOST_OS
+ , syncIOManager
+#endif
#ifdef mingw32_HOST_OS
, ConsoleEvent(..)
import Foreign
import Foreign.C
+#ifdef mingw32_HOST_OS
+import Data.Typeable
+#endif
+
+#ifndef mingw32_HOST_OS
+import Data.Dynamic
+#endif
+import Control.Monad
import Data.Maybe
import GHC.Base
-import {-# SOURCE #-} GHC.Handle
-import GHC.IOBase
+#ifndef mingw32_HOST_OS
+import GHC.Debug
+#endif
+import {-# SOURCE #-} GHC.IO.Handle ( hFlush )
+import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout )
+import GHC.IO
+import GHC.IO.Exception
+import GHC.Exception
+import GHC.IORef
+import GHC.MVar
import GHC.Num ( Num(..) )
import GHC.Real ( fromIntegral )
+#ifndef mingw32_HOST_OS
+import GHC.IOArray
+import GHC.Arr ( inRange )
+#endif
#ifdef mingw32_HOST_OS
import GHC.Real ( div )
-import GHC.Ptr ( plusPtr, FunPtr(..) )
+import GHC.Ptr
#endif
#ifdef mingw32_HOST_OS
import GHC.Read ( Read )
import GHC.Enum ( Enum )
#endif
-import GHC.Exception ( SomeException(..), throw )
import GHC.Pack ( packCString# )
-import GHC.Ptr ( Ptr(..) )
-import GHC.STRef
import GHC.Show ( Show(..), showString )
-import Data.Typeable
-import GHC.Err
infixr 0 `par`, `pseq`
\end{code}
GHC note: the new thread inherits the /blocked/ state of the parent
(see 'Control.Exception.block').
+
+The newly created thread has an exception handler that discards the
+exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and
+'ThreadKilled', and passes all other exceptions to the uncaught
+exception handler (see 'setUncaughtExceptionHandler').
-}
forkIO :: IO () -> IO ThreadId
forkIO action = IO $ \ s ->
n <- peek n_capabilities
return (fromIntegral n)
+#if defined(mingw32_HOST_OS) && defined(__PIC__)
+foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
+#else
foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
-
+#endif
childHandler :: SomeException -> IO ()
childHandler err = catchException (real_handler err) childHandler
(<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
In the paper, 'throwTo' is non-blocking; but the library implementation adopts
a more synchronous design in which 'throwTo' does not return until the exception
-is received by the target thread. The trade-off is discussed in Section 8 of the paper.
-Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of
+is received by the target thread. The trade-off is discussed in Section 9 of the paper.
+Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
the paper).
There is currently no guarantee that the exception delivered by 'throwTo' will be
-delivered at the first possible opportunity. In particular, if a thread may
+delivered at the first possible opportunity. In particular, a thread may
unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
a pending 'throwTo'. This is arguably undesirable behaviour.
labelThread :: ThreadId -> String -> IO ()
labelThread (ThreadId t) str = IO $ \ s ->
- let ps = packCString# str
- adr = byteArrayContents# ps in
+ let !ps = packCString# str
+ !adr = byteArrayContents# ps in
case (labelThread# t adr s) of s1 -> (# s1, () #)
-- Nota Bene: 'pseq' used to be 'seq'
par :: a -> b -> b
par x y = case (par# x) of { _ -> lazy y }
+-- | Internal function used by the RTS to run sparks.
+runSparks :: IO ()
+runSparks = IO loop
+ where loop s = case getSpark# s of
+ (# s', n, p #) ->
+ if n ==# 0# then (# s', () #)
+ else p `seq` loop s'
data BlockReason
= BlockedOnMVar
-- of those points then the transaction violating it is aborted
-- and the exception raised by the invariant is propagated.
alwaysSucceeds :: STM a -> STM ()
-alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () )
+alwaysSucceeds i = do ( i >> retry ) `orElse` ( return () )
checkInv i
-- | always is a variant of alwaysSucceeds in which the invariant is
\end{code}
-%************************************************************************
-%* *
-\subsection[mvars]{M-Structures}
-%* *
-%************************************************************************
-
-M-Vars are rendezvous points for concurrent threads. They begin
-empty, and any attempt to read an empty M-Var blocks. When an M-Var
-is written, a single blocked thread may be freed. Reading an M-Var
-toggles its state from full back to empty. Therefore, any value
-written to an M-Var may only be read once. Multiple reads and writes
-are allowed, but there must be at least one read between any two
-writes.
+MVar utilities
\begin{code}
---Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
-
--- |Create an 'MVar' which is initially empty.
-newEmptyMVar :: IO (MVar a)
-newEmptyMVar = IO $ \ s# ->
- case newMVar# s# of
- (# s2#, svar# #) -> (# s2#, MVar svar# #)
-
--- |Create an 'MVar' which contains the supplied value.
-newMVar :: a -> IO (MVar a)
-newMVar value =
- newEmptyMVar >>= \ mvar ->
- putMVar mvar value >>
- return mvar
-
--- |Return the contents of the 'MVar'. If the 'MVar' is currently
--- empty, 'takeMVar' will wait until it is full. After a 'takeMVar',
--- the 'MVar' is left empty.
---
--- There are two further important properties of 'takeMVar':
---
--- * 'takeMVar' is single-wakeup. That is, if there are multiple
--- threads blocked in 'takeMVar', and the 'MVar' becomes full,
--- only one thread will be woken up. The runtime guarantees that
--- the woken thread completes its 'takeMVar' operation.
---
--- * When multiple threads are blocked on an 'MVar', they are
--- woken up in FIFO order. This is useful for providing
--- fairness properties of abstractions built using 'MVar's.
---
-takeMVar :: MVar a -> IO a
-takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
-
--- |Put a value into an 'MVar'. If the 'MVar' is currently full,
--- 'putMVar' will wait until it becomes empty.
---
--- There are two further important properties of 'putMVar':
---
--- * 'putMVar' is single-wakeup. That is, if there are multiple
--- threads blocked in 'putMVar', and the 'MVar' becomes empty,
--- only one thread will be woken up. The runtime guarantees that
--- the woken thread completes its 'putMVar' operation.
---
--- * When multiple threads are blocked on an 'MVar', they are
--- woken up in FIFO order. This is useful for providing
--- fairness properties of abstractions built using 'MVar's.
---
-putMVar :: MVar a -> a -> IO ()
-putMVar (MVar mvar#) x = IO $ \ s# ->
- case putMVar# mvar# x s# of
- s2# -> (# s2#, () #)
-
--- |A non-blocking version of 'takeMVar'. The 'tryTakeMVar' function
--- returns immediately, with 'Nothing' if the 'MVar' was empty, or
--- @'Just' a@ if the 'MVar' was full with contents @a@. After 'tryTakeMVar',
--- the 'MVar' is left empty.
-tryTakeMVar :: MVar a -> IO (Maybe a)
-tryTakeMVar (MVar m) = IO $ \ s ->
- case tryTakeMVar# m s of
- (# s', 0#, _ #) -> (# s', Nothing #) -- MVar is empty
- (# s', _, a #) -> (# s', Just a #) -- MVar is full
-
--- |A non-blocking version of 'putMVar'. The 'tryPutMVar' function
--- attempts to put the value @a@ into the 'MVar', returning 'True' if
--- it was successful, or 'False' otherwise.
-tryPutMVar :: MVar a -> a -> IO Bool
-tryPutMVar (MVar mvar#) x = IO $ \ s# ->
- case tryPutMVar# mvar# x s# of
- (# s, 0# #) -> (# s, False #)
- (# s, _ #) -> (# s, True #)
-
--- |Check whether a given 'MVar' is empty.
---
--- Notice that the boolean value returned is just a snapshot of
--- the state of the MVar. By the time you get to react on its result,
--- the MVar may have been filled (or emptied) - so be extremely
--- careful when using this operation. Use 'tryTakeMVar' instead if possible.
-isEmptyMVar :: MVar a -> IO Bool
-isEmptyMVar (MVar mv#) = IO $ \ s# ->
- case isEmptyMVar# mv# s# of
- (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
-
--- |Add a finalizer to an 'MVar' (GHC only). See "Foreign.ForeignPtr" and
--- "System.Mem.Weak" for more about finalizers.
-addMVarFinalizer :: MVar a -> IO () -> IO ()
-addMVarFinalizer (MVar m) finalizer =
- IO $ \s -> case mkWeak# m () finalizer s of { (# s1, _ #) -> (# s1, () #) }
-
withMVar :: MVar a -> (a -> IO b) -> IO b
withMVar m io =
block $ do
return b
\end{code}
-
%************************************************************************
%* *
\subsection{Thread waiting}
type USecs = Word64
--- XXX: move into GHC.IOBase from Data.IORef?
-atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
-atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
-
foreign import ccall unsafe "getUSecOfDay"
getUSecOfDay :: IO USecs
startIOManagerThread :: IO ()
startIOManagerThread = do
wakeup <- c_getIOManagerEvent
- forkIO $ service_loop wakeup []
+ _ <- forkIO $ service_loop wakeup []
return ()
service_loop :: HANDLE -- read end of pipe
_ | r2 == io_MANAGER_DIE -> return True
0 -> return False -- spurious wakeup
_ -> do start_console_handler (r2 `shiftR` 1); return False
- if exit
- then return ()
- else service_cont wakeup delays'
+ unless exit $ service_cont wakeup delays'
_other -> service_cont wakeup delays' -- probably timeout
service_cont :: HANDLE -> [DelayReq] -> IO ()
service_cont wakeup delays = do
- atomicModifyIORef prodding (\_ -> (False,False))
+ r <- atomicModifyIORef prodding (\_ -> (False,False))
+ r `seq` return () -- avoid space leak
service_loop wakeup delays
-- must agree with rts/win32/ThrIOManager.c
start_console_handler r =
case toWin32ConsoleEvent r of
Just x -> withMVar win32ConsoleHandler $ \handler -> do
- forkIO (handler x)
+ _ <- forkIO (handler x)
return ()
Nothing -> return ()
startIOManagerThread :: IO ()
startIOManagerThread = do
allocaArray 2 $ \fds -> do
- throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
+ throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds)
rd_end <- peekElemOff fds 0
wr_end <- peekElemOff fds 1
+ setNonBlockingFD wr_end True -- writes happen in a signal handler, we
+ -- don't want them to block.
+ setCloseOnExec rd_end
+ setCloseOnExec wr_end
writeIORef stick (fromIntegral wr_end)
c_setIOManagerPipe wr_end
- forkIO $ do
+ _ <- forkIO $ do
allocaBytes sizeofFdSet $ \readfds -> do
allocaBytes sizeofFdSet $ \writefds -> do
allocaBytes sizeofTimeVal $ \timeval -> do
if b == 0
then return False
else alloca $ \p -> do
- c_read (fromIntegral wakeup) p 1; return ()
+ warnErrnoIfMinus1_ "service_loop" $
+ c_read (fromIntegral wakeup) p 1
s <- peek p
case s of
_ | s == io_MANAGER_WAKEUP -> return False
_ | s == io_MANAGER_DIE -> return True
- _ -> withMVar signalHandlerLock $ \_ -> do
- handler_tbl <- peek handlers
- sp <- peekElemOff handler_tbl (fromIntegral s)
- io <- deRefStablePtr sp
- forkIO io
- return False
-
- if exit then return () else do
-
- atomicModifyIORef prodding (\_ -> (False,False))
+ _ | s == io_MANAGER_SYNC -> do
+ mvars <- readIORef sync
+ mapM_ (flip putMVar ()) mvars
+ return False
+ _ -> do
+ fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
+ withForeignPtr fp $ \p_siginfo -> do
+ r <- c_read (fromIntegral wakeup) (castPtr p_siginfo)
+ sizeof_siginfo_t
+ when (r /= fromIntegral sizeof_siginfo_t) $
+ error "failed to read siginfo_t"
+ runHandlers' fp (fromIntegral s)
+ return False
+
+ unless exit $ do
+
+ atomicModifyIORef prodding (\_ -> (False, ()))
reqs' <- if wakeup_all then do wakeupAll reqs; return []
else completeRequests reqs readfds writefds []
service_loop wakeup readfds writefds ptimeval reqs' delays'
-io_MANAGER_WAKEUP, io_MANAGER_DIE :: CChar
+io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: Word8
io_MANAGER_WAKEUP = 0xff
io_MANAGER_DIE = 0xfe
+io_MANAGER_SYNC = 0xfd
+-- | the stick is for poking the IO manager with
stick :: IORef Fd
{-# NOINLINE stick #-}
stick = unsafePerformIO (newIORef 0)
+{-# NOINLINE sync #-}
+sync :: IORef [MVar ()]
+sync = unsafePerformIO (newIORef [])
+
+-- waits for the IO manager to drain the pipe
+syncIOManager :: IO ()
+syncIOManager = do
+ m <- newEmptyMVar
+ atomicModifyIORef sync (\old -> (m:old,()))
+ fd <- readIORef stick
+ with io_MANAGER_SYNC $ \pbuf -> do
+ warnErrnoIfMinus1_ "syncIOManager" $ c_write (fromIntegral fd) pbuf 1
+ takeMVar m
+
wakeupIOManager :: IO ()
wakeupIOManager = do
fd <- readIORef stick
with io_MANAGER_WAKEUP $ \pbuf -> do
- c_write (fromIntegral fd) pbuf 1; return ()
+ warnErrnoIfMinus1_ "wakeupIOManager" $ c_write (fromIntegral fd) pbuf 1
+
+-- For the non-threaded RTS
+runHandlers :: Ptr Word8 -> Int -> IO ()
+runHandlers p_info sig = do
+ fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
+ withForeignPtr fp $ \p -> do
+ copyBytes p p_info (fromIntegral sizeof_siginfo_t)
+ free p_info
+ runHandlers' fp (fromIntegral sig)
+
+runHandlers' :: ForeignPtr Word8 -> Signal -> IO ()
+runHandlers' p_info sig = do
+ let int = fromIntegral sig
+ withMVar signal_handlers $ \arr ->
+ if not (inRange (boundsIOArray arr) int)
+ then return ()
+ else do handler <- unsafeReadIOArray arr int
+ case handler of
+ Nothing -> return ()
+ Just (f,_) -> do _ <- forkIO (f p_info)
+ return ()
+
+warnErrnoIfMinus1_ :: Num a => String -> IO a -> IO ()
+warnErrnoIfMinus1_ what io
+ = do r <- io
+ when (r == -1) $ do
+ errno <- getErrno
+ str <- strerror errno >>= peekCString
+ when (r == -1) $
+ debugErrLn ("Warning: " ++ what ++ " failed: " ++ str)
+
+foreign import ccall unsafe "string.h" strerror :: Errno -> IO (Ptr CChar)
+
+foreign import ccall "setIOManagerPipe"
+ c_setIOManagerPipe :: CInt -> IO ()
+
+foreign import ccall "__hscore_sizeof_siginfo_t"
+ sizeof_siginfo_t :: CSize
+
+type Signal = CInt
+
+maxSig = 64 :: Int
+
+type HandlerFun = ForeignPtr Word8 -> IO ()
-- Lock used to protect concurrent access to signal_handlers. Symptom of
-- this race condition is #1922, although that bug was on Windows a similar
-- bug also exists on Unix.
-signalHandlerLock :: MVar ()
-signalHandlerLock = unsafePerformIO (newMVar ())
-
-foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ())))
-
-foreign import ccall "setIOManagerPipe"
- c_setIOManagerPipe :: CInt -> IO ()
+{-# NOINLINE signal_handlers #-}
+signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
+signal_handlers = unsafePerformIO $ do
+ arr <- newIOArray (0,maxSig) Nothing
+ m <- newMVar arr
+ block $ do
+ stable_ref <- newStablePtr m
+ let ref = castStablePtrToPtr stable_ref
+ ref2 <- getOrSetSignalHandlerStore ref
+ if ref==ref2
+ then return m
+ else do freeStablePtr stable_ref
+ deRefStablePtr (castPtrToStablePtr ref2)
+
+foreign import ccall unsafe "getOrSetSignalHandlerStore"
+ getOrSetSignalHandlerStore :: Ptr a -> IO (Ptr a)
+
+setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
+setHandler sig handler = do
+ let int = fromIntegral sig
+ withMVar signal_handlers $ \arr ->
+ if not (inRange (boundsIOArray arr) int)
+ then error "GHC.Conc.setHandler: signal out of range"
+ else do old <- unsafeReadIOArray arr int
+ unsafeWriteIOArray arr int handler
+ return old
-- -----------------------------------------------------------------------------
-- IO requests
#endif
-reportStackOverflow :: IO a
-reportStackOverflow = do callStackOverflowHook; return undefined
+reportStackOverflow :: IO ()
+reportStackOverflow = callStackOverflowHook
-reportError :: SomeException -> IO a
+reportError :: SomeException -> IO ()
reportError ex = do
handler <- getUncaughtExceptionHandler
handler ex
- return undefined
-- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove
-- the unsafe below.
getUncaughtExceptionHandler :: IO (SomeException -> IO ())
getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler
+
\end{code}