import Foreign
import Foreign.C
+#ifdef mingw32_HOST_OS
+import Data.Typeable
+#endif
+
#ifndef mingw32_HOST_OS
import Data.Dynamic
-import Control.Monad
#endif
+import Control.Monad
import Data.Maybe
import GHC.Base
+#ifndef mingw32_HOST_OS
+import GHC.Debug
+#endif
import {-# SOURCE #-} GHC.IO.Handle ( hFlush )
import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout )
import GHC.IO
#endif
#ifdef mingw32_HOST_OS
import GHC.Real ( div )
-import GHC.Ptr ( plusPtr, FunPtr(..) )
+import GHC.Ptr
#endif
#ifdef mingw32_HOST_OS
import GHC.Read ( Read )
import GHC.Enum ( Enum )
#endif
import GHC.Pack ( packCString# )
-import GHC.Ptr ( Ptr(..) )
import GHC.Show ( Show(..), showString )
-import Data.Typeable
-import GHC.Err
infixr 0 `par`, `pseq`
\end{code}
(see 'Control.Exception.block').
The newly created thread has an exception handler that discards the
-exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and
+exceptions 'BlockedIndefinitelyOnMVar', 'BlockedIndefinitelyOnSTM', and
'ThreadKilled', and passes all other exceptions to the uncaught
exception handler (see 'setUncaughtExceptionHandler').
-}
real_handler se@(SomeException ex) =
-- ignore thread GC and killThread exceptions:
case cast ex of
- Just BlockedOnDeadMVar -> return ()
+ Just BlockedIndefinitelyOnMVar -> return ()
_ -> case cast ex of
- Just BlockedIndefinitely -> return ()
+ Just BlockedIndefinitelyOnSTM -> return ()
_ -> case cast ex of
Just ThreadKilled -> return ()
_ -> case cast ex of
Just StackOverflow -> reportStackOverflow
_ -> reportError se
-{- | 'killThread' terminates the given thread (GHC only).
-Any work already done by the thread isn\'t
-lost: the computation is suspended until required by another thread.
-The memory used by the thread will be garbage collected if it isn\'t
-referenced from anywhere. The 'killThread' function is defined in
-terms of 'throwTo':
+{- | 'killThread' raises the 'ThreadKilled' exception in the given
+thread (GHC only).
> killThread tid = throwTo tid ThreadKilled
-Killthread is a no-op if the target thread has already completed.
-}
killThread :: ThreadId -> IO ()
killThread tid = throwTo tid ThreadKilled
can kill each other, it is guaranteed that only one of the threads
will get to kill the other.
+Whatever work the target thread was doing when the exception was
+raised is not lost: the computation is suspended until required by
+another thread.
+
If the target thread is currently making a foreign call, then the
exception will not be raised (and hence 'throwTo' will not return)
until the call has completed. This is the case regardless of whether
a more synchronous design in which 'throwTo' does not return until the exception
is received by the target thread. The trade-off is discussed in Section 9 of the paper.
Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
-the paper).
+the paper). Unlike other interruptible operations, however, 'throwTo'
+is /always/ interruptible, even if it does not actually block.
+
+There is no guarantee that the exception will be delivered promptly,
+although the runtime will endeavour to ensure that arbitrary
+delays don't occur. In GHC, an exception can only be raised when a
+thread reaches a /safe point/, where a safe point is where memory
+allocation occurs. Some loops do not perform any memory allocation
+inside the loop and therefore cannot be interrupted by a 'throwTo'.
-There is currently no guarantee that the exception delivered by 'throwTo' will be
-delivered at the first possible opportunity. In particular, a thread may
-unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
-a pending 'throwTo'. This is arguably undesirable behaviour.
+Blocked 'throwTo' is fair: if multiple threads are trying to throw an
+exception to the same target thread, they will succeed in FIFO order.
- -}
+ -}
throwTo :: Exception e => ThreadId -> e -> IO ()
throwTo (ThreadId tid) ex = IO $ \ s ->
case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
returnSTM :: a -> STM a
returnSTM x = STM (\s -> (# s, x #))
+instance MonadPlus STM where
+ mzero = retry
+ mplus = orElse
+
-- | Unsafely performs IO in the STM monad. Beware: this is a highly
-- dangerous thing to do.
--
-- of those points then the transaction violating it is aborted
-- and the exception raised by the invariant is propagated.
alwaysSucceeds :: STM a -> STM ()
-alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () )
+alwaysSucceeds i = do ( i >> retry ) `orElse` ( return () )
checkInv i
-- | always is a variant of alwaysSucceeds in which the invariant is
(\e -> do putMVar m a; throw e)
putMVar m a
return b
+
+modifyMVar_ :: MVar a -> (a -> IO a) -> IO ()
+modifyMVar_ m io =
+ block $ do
+ a <- takeMVar m
+ a' <- catchAny (unblock (io a))
+ (\e -> do putMVar m a; throw e)
+ putMVar m a'
+ return ()
\end{code}
%************************************************************************
-- around the scheduler loop. Furthermore, the scheduler can be simplified
-- by not having to check for completed IO requests.
--- Issues, possible problems:
---
--- - we might want bound threads to just do the blocking
--- operation rather than communicating with the IO manager
--- thread. This would prevent simgle-threaded programs which do
--- IO from requiring multiple OS threads. However, it would also
--- prevent bound threads waiting on IO from being killed or sent
--- exceptions.
---
--- - Apprently exec() doesn't work on Linux in a multithreaded program.
--- I couldn't repeat this.
---
--- - How do we handle signal delivery in the multithreaded RTS?
---
--- - forkProcess will kill the IO manager thread. Let's just
--- hope we don't need to do any blocking IO between fork & exec.
-
#ifndef mingw32_HOST_OS
data IOReq
= Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
| DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
#ifndef mingw32_HOST_OS
+{-# NOINLINE pendingEvents #-}
pendingEvents :: IORef [IOReq]
+pendingEvents = unsafePerformIO $ do
+ m <- newIORef []
+ sharedCAF m getOrSetGHCConcPendingEventsStore
+
+foreign import ccall unsafe "getOrSetGHCConcPendingEventsStore"
+ getOrSetGHCConcPendingEventsStore :: Ptr a -> IO (Ptr a)
#endif
-pendingDelays :: IORef [DelayReq]
- -- could use a strict list or array here
-{-# NOINLINE pendingEvents #-}
+
{-# NOINLINE pendingDelays #-}
-(pendingEvents,pendingDelays) = unsafePerformIO $ do
- startIOManagerThread
- reqs <- newIORef []
- dels <- newIORef []
- return (reqs, dels)
- -- the first time we schedule an IO request, the service thread
- -- will be created (cool, huh?)
+pendingDelays :: IORef [DelayReq]
+pendingDelays = unsafePerformIO $ do
+ m <- newIORef []
+ sharedCAF m getOrSetGHCConcPendingDelaysStore
+
+foreign import ccall unsafe "getOrSetGHCConcPendingDelaysStore"
+ getOrSetGHCConcPendingDelaysStore :: Ptr a -> IO (Ptr a)
+
+{-# NOINLINE ioManagerThread #-}
+ioManagerThread :: MVar (Maybe ThreadId)
+ioManagerThread = unsafePerformIO $ do
+ m <- newMVar Nothing
+ sharedCAF m getOrSetGHCConcIOManagerThreadStore
+
+foreign import ccall unsafe "getOrSetGHCConcIOManagerThreadStore"
+ getOrSetGHCConcIOManagerThreadStore :: Ptr a -> IO (Ptr a)
ensureIOManagerIsRunning :: IO ()
ensureIOManagerIsRunning
- | threaded = seq pendingEvents $ return ()
+ | threaded = startIOManagerThread
| otherwise = return ()
+startIOManagerThread :: IO ()
+startIOManagerThread = do
+ modifyMVar_ ioManagerThread $ \old -> do
+ let create = do t <- forkIO ioManager; return (Just t)
+ case old of
+ Nothing -> create
+ Just t -> do
+ s <- threadStatus t
+ case s of
+ ThreadFinished -> create
+ ThreadDied -> create
+ _other -> return (Just t)
+
insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
insertDelay d [] = [d]
insertDelay d1 ds@(d2 : rest)
foreign import ccall unsafe "getUSecOfDay"
getUSecOfDay :: IO USecs
-prodding :: IORef Bool
{-# NOINLINE prodding #-}
-prodding = unsafePerformIO (newIORef False)
+prodding :: IORef Bool
+prodding = unsafePerformIO $ do
+ r <- newIORef False
+ sharedCAF r getOrSetGHCConcProddingStore
+
+foreign import ccall unsafe "getOrSetGHCConcProddingStore"
+ getOrSetGHCConcProddingStore :: Ptr a -> IO (Ptr a)
prodServiceThread :: IO ()
prodServiceThread = do
- was_set <- atomicModifyIORef prodding (\a -> (True,a))
- if (not (was_set)) then wakeupIOManager else return ()
+ -- NB. use atomicModifyIORef here, otherwise there are race
+ -- conditions in which prodding is left at True but the server is
+ -- blocked in select().
+ was_set <- atomicModifyIORef prodding $ \b -> (True,b)
+ unless was_set wakeupIOManager
+
+-- Machinery needed to ensure that we only have one copy of certain
+-- CAFs in this module even when the base package is present twice, as
+-- it is when base is dynamically loaded into GHCi. The RTS keeps
+-- track of the single true value of the CAF, so even when the CAFs in
+-- the dynamically-loaded base package are reverted, nothing bad
+-- happens.
+--
+sharedCAF :: a -> (Ptr a -> IO (Ptr a)) -> IO a
+sharedCAF a get_or_set =
+ block $ do
+ stable_ref <- newStablePtr a
+ let ref = castPtr (castStablePtrToPtr stable_ref)
+ ref2 <- get_or_set ref
+ if ref==ref2
+ then return a
+ else do freeStablePtr stable_ref
+ deRefStablePtr (castPtrToStablePtr (castPtr ref2))
#ifdef mingw32_HOST_OS
-- ----------------------------------------------------------------------------
-- Windows IO manager thread
-startIOManagerThread :: IO ()
-startIOManagerThread = do
+ioManager :: IO ()
+ioManager = do
wakeup <- c_getIOManagerEvent
- forkIO $ service_loop wakeup []
- return ()
+ service_loop wakeup []
service_loop :: HANDLE -- read end of pipe
-> [DelayReq] -- current delay requests
_ | r2 == io_MANAGER_DIE -> return True
0 -> return False -- spurious wakeup
_ -> do start_console_handler (r2 `shiftR` 1); return False
- if exit
- then return ()
- else service_cont wakeup delays'
+ unless exit $ service_cont wakeup delays'
_other -> service_cont wakeup delays' -- probably timeout
start_console_handler r =
case toWin32ConsoleEvent r of
Just x -> withMVar win32ConsoleHandler $ \handler -> do
- forkIO (handler x)
+ _ <- forkIO (handler x)
return ()
Nothing -> return ()
win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
--- XXX Is this actually needed?
-stick :: IORef HANDLE
-{-# NOINLINE stick #-}
-stick = unsafePerformIO (newIORef nullPtr)
-
wakeupIOManager :: IO ()
-wakeupIOManager = do
- _hdl <- readIORef stick
- c_sendIOManagerEvent io_MANAGER_WAKEUP
+wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP
-- Walk the queue of pending delays, waking up any that have passed
-- and return the smallest delay to wait for. The queue of pending
-- ----------------------------------------------------------------------------
-- Unix IO manager thread, using select()
-startIOManagerThread :: IO ()
-startIOManagerThread = do
+ioManager :: IO ()
+ioManager = do
allocaArray 2 $ \fds -> do
- throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
+ throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds)
rd_end <- peekElemOff fds 0
wr_end <- peekElemOff fds 1
- setNonBlockingFD wr_end -- writes happen in a signal handler, we
- -- don't want them to block.
+ setNonBlockingFD wr_end True -- writes happen in a signal handler, we
+ -- don't want them to block.
setCloseOnExec rd_end
setCloseOnExec wr_end
- writeIORef stick (fromIntegral wr_end)
c_setIOManagerPipe wr_end
- forkIO $ do
- allocaBytes sizeofFdSet $ \readfds -> do
- allocaBytes sizeofFdSet $ \writefds -> do
- allocaBytes sizeofTimeVal $ \timeval -> do
- service_loop (fromIntegral rd_end) readfds writefds timeval [] []
+ allocaBytes sizeofFdSet $ \readfds -> do
+ allocaBytes sizeofFdSet $ \writefds -> do
+ allocaBytes sizeofTimeVal $ \timeval -> do
+ service_loop (fromIntegral rd_end) readfds writefds timeval [] []
return ()
service_loop
-> IO ()
service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
+ -- reset prodding before we look at the new requests. If a new
+ -- client arrives after this point they will send a wakup which will
+ -- cause the server to loop around again, so we can be sure to not
+ -- miss any requests.
+ --
+ -- NB. it's important to do this in the *first* iteration of
+ -- service_loop, rather than after calling select(), since a client
+ -- may have set prodding to True without sending a wakeup byte down
+ -- the pipe, because the pipe wasn't set up.
+ atomicModifyIORef prodding (\_ -> (False, ()))
+
-- pick up new IO requests
new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
let reqs = new_reqs ++ old_reqs
if b == 0
then return False
else alloca $ \p -> do
- c_read (fromIntegral wakeup) p 1
+ warnErrnoIfMinus1_ "service_loop" $
+ c_read (fromIntegral wakeup) p 1
s <- peek p
case s of
_ | s == io_MANAGER_WAKEUP -> return False
runHandlers' fp (fromIntegral s)
return False
- if exit then return () else do
-
- atomicModifyIORef prodding (\_ -> (False,False))
+ unless exit $ do
reqs' <- if wakeup_all then do wakeupAll reqs; return []
else completeRequests reqs readfds writefds []
io_MANAGER_DIE = 0xfe
io_MANAGER_SYNC = 0xfd
--- | the stick is for poking the IO manager with
-stick :: IORef Fd
-{-# NOINLINE stick #-}
-stick = unsafePerformIO (newIORef 0)
-
{-# NOINLINE sync #-}
sync :: IORef [MVar ()]
sync = unsafePerformIO (newIORef [])
syncIOManager = do
m <- newEmptyMVar
atomicModifyIORef sync (\old -> (m:old,()))
- fd <- readIORef stick
- with io_MANAGER_SYNC $ \pbuf -> do
- c_write (fromIntegral fd) pbuf 1; return ()
+ c_ioManagerSync
takeMVar m
-wakeupIOManager :: IO ()
-wakeupIOManager = do
- fd <- readIORef stick
- with io_MANAGER_WAKEUP $ \pbuf -> do
- c_write (fromIntegral fd) pbuf 1; return ()
+foreign import ccall unsafe "ioManagerSync" c_ioManagerSync :: IO ()
+foreign import ccall unsafe "ioManagerWakeup" wakeupIOManager :: IO ()
-- For the non-threaded RTS
runHandlers :: Ptr Word8 -> Int -> IO ()
else do handler <- unsafeReadIOArray arr int
case handler of
Nothing -> return ()
- Just (f,_) -> do forkIO (f p_info); return ()
+ Just (f,_) -> do _ <- forkIO (f p_info)
+ return ()
+
+warnErrnoIfMinus1_ :: Num a => String -> IO a -> IO ()
+warnErrnoIfMinus1_ what io
+ = do r <- io
+ when (r == -1) $ do
+ errno <- getErrno
+ str <- strerror errno >>= peekCString
+ when (r == -1) $
+ debugErrLn ("Warning: " ++ what ++ " failed: " ++ str)
+
+foreign import ccall unsafe "string.h" strerror :: Errno -> IO (Ptr CChar)
foreign import ccall "setIOManagerPipe"
c_setIOManagerPipe :: CInt -> IO ()
signal_handlers = unsafePerformIO $ do
arr <- newIOArray (0,maxSig) Nothing
m <- newMVar arr
- block $ do
- stable_ref <- newStablePtr m
- let ref = castStablePtrToPtr stable_ref
- ref2 <- getOrSetSignalHandlerStore ref
- if ref==ref2
- then return m
- else do freeStablePtr stable_ref
- deRefStablePtr (castPtrToStablePtr ref2)
+ sharedCAF m getOrSetGHCConcSignalHandlerStore
-foreign import ccall unsafe "getOrSetSignalHandlerStore"
- getOrSetSignalHandlerStore :: Ptr a -> IO (Ptr a)
+foreign import ccall unsafe "getOrSetGHCConcSignalHandlerStore"
+ getOrSetGHCConcSignalHandlerStore :: Ptr a -> IO (Ptr a)
setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
setHandler sig handler = do
data CFdSet
-foreign import ccall safe "select"
+foreign import ccall safe "__hscore_select"
c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
-> IO CInt
#endif
-reportStackOverflow :: IO a
-reportStackOverflow = do callStackOverflowHook; return undefined
+reportStackOverflow :: IO ()
+reportStackOverflow = callStackOverflowHook
-reportError :: SomeException -> IO a
+reportError :: SomeException -> IO ()
reportError ex = do
handler <- getUncaughtExceptionHandler
handler ex
- return undefined
-- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove
-- the unsafe below.
getUncaughtExceptionHandler :: IO (SomeException -> IO ())
getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler
+
\end{code}