X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=GHC%2FConc.lhs;h=0d174578a65b55bb9f64c02ee77a0ad5da47b9e5;hb=0bf2fdab482da7a287ef09f18e7656abe62256d0;hp=2d623081b02c6f1695dcd7e02f176b10e6976def;hpb=d2063b5b0be014545b21819172c87756efcb0b0c;p=ghc-base.git diff --git a/GHC/Conc.lhs b/GHC/Conc.lhs index 2d62308..0d17457 100644 --- a/GHC/Conc.lhs +++ b/GHC/Conc.lhs @@ -29,8 +29,11 @@ module GHC.Conc -- * Forking and suchlike , forkIO -- :: IO a -> IO ThreadId + , forkIOUnmasked , forkOnIO -- :: Int -> IO a -> IO ThreadId + , forkOnIOUnmasked , numCapabilities -- :: Int + , numSparks -- :: IO Int , childHandler -- :: Exception -> IO () , myThreadId -- :: IO ThreadId , killThread -- :: ThreadId -> IO () @@ -104,13 +107,20 @@ import System.Posix.Internals import Foreign import Foreign.C +#ifdef mingw32_HOST_OS +import Data.Typeable +#endif + #ifndef mingw32_HOST_OS import Data.Dynamic -import Control.Monad #endif +import Control.Monad import Data.Maybe import GHC.Base +#ifndef mingw32_HOST_OS +import GHC.Debug +#endif import {-# SOURCE #-} GHC.IO.Handle ( hFlush ) import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout ) import GHC.IO @@ -126,17 +136,14 @@ import GHC.Arr ( inRange ) #endif #ifdef mingw32_HOST_OS import GHC.Real ( div ) -import GHC.Ptr ( plusPtr, FunPtr(..) ) +import GHC.Ptr #endif #ifdef mingw32_HOST_OS import GHC.Read ( Read ) import GHC.Enum ( Enum ) #endif import GHC.Pack ( packCString# ) -import GHC.Ptr ( Ptr(..) ) import GHC.Show ( Show(..), showString ) -import Data.Typeable -import GHC.Err infixr 0 `par`, `pseq` \end{code} @@ -207,11 +214,11 @@ thread. The new thread will be a lightweight thread; if you want to use a foreign library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead. -GHC note: the new thread inherits the /blocked/ state of the parent -(see 'Control.Exception.block'). +GHC note: the new thread inherits the /masked/ state of the parent +(see 'Control.Exception.mask'). The newly created thread has an exception handler that discards the -exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and +exceptions 'BlockedIndefinitelyOnMVar', 'BlockedIndefinitelyOnSTM', and 'ThreadKilled', and passes all other exceptions to the uncaught exception handler (see 'setUncaughtExceptionHandler'). -} @@ -221,6 +228,11 @@ forkIO action = IO $ \ s -> where action_plus = catchException action childHandler +-- | Like 'forkIO', but the child thread is created with asynchronous exceptions +-- unmasked (see 'Control.Exception.mask'). +forkIOUnmasked :: IO () -> IO ThreadId +forkIOUnmasked io = forkIO (unsafeUnmask io) + {- | Like 'forkIO', but lets you specify on which CPU the thread is created. Unlike a `forkIO` thread, a thread created by `forkOnIO` @@ -240,6 +252,11 @@ forkOnIO (I# cpu) action = IO $ \ s -> where action_plus = catchException action childHandler +-- | Like 'forkOnIO', but the child thread is created with +-- asynchronous exceptions unmasked (see 'Control.Exception.mask'). +forkOnIOUnmasked :: Int -> IO () -> IO ThreadId +forkOnIOUnmasked cpu io = forkOnIO cpu (unsafeUnmask io) + -- | the value passed to the @+RTS -N@ flag. This is the number of -- Haskell threads that can run truly simultaneously at any given -- time, and is typically set to the number of physical CPU cores on @@ -249,6 +266,10 @@ numCapabilities = unsafePerformIO $ do n <- peek n_capabilities return (fromIntegral n) +-- | Returns the number of sparks currently in the local spark pool +numSparks :: IO Int +numSparks = IO $ \s -> case numSparks# s of (# s', n #) -> (# s', I# n #) + #if defined(mingw32_HOST_OS) && defined(__PIC__) foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt #else @@ -261,9 +282,9 @@ real_handler :: SomeException -> IO () real_handler se@(SomeException ex) = -- ignore thread GC and killThread exceptions: case cast ex of - Just BlockedOnDeadMVar -> return () + Just BlockedIndefinitelyOnMVar -> return () _ -> case cast ex of - Just BlockedIndefinitely -> return () + Just BlockedIndefinitelyOnSTM -> return () _ -> case cast ex of Just ThreadKilled -> return () _ -> case cast ex of @@ -271,16 +292,11 @@ real_handler se@(SomeException ex) = Just StackOverflow -> reportStackOverflow _ -> reportError se -{- | 'killThread' terminates the given thread (GHC only). -Any work already done by the thread isn\'t -lost: the computation is suspended until required by another thread. -The memory used by the thread will be garbage collected if it isn\'t -referenced from anywhere. The 'killThread' function is defined in -terms of 'throwTo': +{- | 'killThread' raises the 'ThreadKilled' exception in the given +thread (GHC only). > killThread tid = throwTo tid ThreadKilled -Killthread is a no-op if the target thread has already completed. -} killThread :: ThreadId -> IO () killThread tid = throwTo tid ThreadKilled @@ -295,10 +311,14 @@ when dealing with race conditions: eg. if there are two threads that can kill each other, it is guaranteed that only one of the threads will get to kill the other. +Whatever work the target thread was doing when the exception was +raised is not lost: the computation is suspended until required by +another thread. + If the target thread is currently making a foreign call, then the exception will not be raised (and hence 'throwTo' will not return) until the call has completed. This is the case regardless of whether -the call is inside a 'block' or not. +the call is inside a 'mask' or not. Important note: the behaviour of 'throwTo' differs from that described in the paper \"Asynchronous exceptions in Haskell\" @@ -307,14 +327,20 @@ In the paper, 'throwTo' is non-blocking; but the library implementation adopts a more synchronous design in which 'throwTo' does not return until the exception is received by the target thread. The trade-off is discussed in Section 9 of the paper. Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of -the paper). +the paper). Unlike other interruptible operations, however, 'throwTo' +is /always/ interruptible, even if it does not actually block. + +There is no guarantee that the exception will be delivered promptly, +although the runtime will endeavour to ensure that arbitrary +delays don't occur. In GHC, an exception can only be raised when a +thread reaches a /safe point/, where a safe point is where memory +allocation occurs. Some loops do not perform any memory allocation +inside the loop and therefore cannot be interrupted by a 'throwTo'. -There is currently no guarantee that the exception delivered by 'throwTo' will be -delivered at the first possible opportunity. In particular, a thread may -unblock and then re-block exceptions (using 'unblock' and 'block') without receiving -a pending 'throwTo'. This is arguably undesirable behaviour. +Blocked 'throwTo' is fair: if multiple threads are trying to throw an +exception to the same target thread, they will succeed in FIFO order. - -} + -} throwTo :: Exception e => ThreadId -> e -> IO () throwTo (ThreadId tid) ex = IO $ \ s -> case (killThread# tid (toException ex) s) of s1 -> (# s1, () #) @@ -468,6 +494,10 @@ thenSTM (STM m) k = STM ( \s -> returnSTM :: a -> STM a returnSTM x = STM (\s -> (# s, x #)) +instance MonadPlus STM where + mzero = retry + mplus = orElse + -- | Unsafely performs IO in the STM monad. Beware: this is a highly -- dangerous thing to do. -- @@ -538,7 +568,7 @@ checkInv (STM m) = STM (\s -> (check# m) s) -- of those points then the transaction violating it is aborted -- and the exception raised by the invariant is propagated. alwaysSucceeds :: STM a -> STM () -alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) +alwaysSucceeds i = do ( i >> retry ) `orElse` ( return () ) checkInv i -- | always is a variant of alwaysSucceeds in which the invariant is @@ -598,12 +628,21 @@ MVar utilities \begin{code} withMVar :: MVar a -> (a -> IO b) -> IO b withMVar m io = - block $ do + mask $ \restore -> do a <- takeMVar m - b <- catchAny (unblock (io a)) + b <- catchAny (restore (io a)) (\e -> do putMVar m a; throw e) putMVar m a return b + +modifyMVar_ :: MVar a -> (a -> IO a) -> IO () +modifyMVar_ m io = + mask $ \restore -> do + a <- takeMVar m + a' <- catchAny (restore (io a)) + (\e -> do putMVar m a; throw e) + putMVar m a' + return () \end{code} %************************************************************************ @@ -741,23 +780,6 @@ calculateTarget usecs = do -- around the scheduler loop. Furthermore, the scheduler can be simplified -- by not having to check for completed IO requests. --- Issues, possible problems: --- --- - we might want bound threads to just do the blocking --- operation rather than communicating with the IO manager --- thread. This would prevent simgle-threaded programs which do --- IO from requiring multiple OS threads. However, it would also --- prevent bound threads waiting on IO from being killed or sent --- exceptions. --- --- - Apprently exec() doesn't work on Linux in a multithreaded program. --- I couldn't repeat this. --- --- - How do we handle signal delivery in the multithreaded RTS? --- --- - forkProcess will kill the IO manager thread. Let's just --- hope we don't need to do any blocking IO between fork & exec. - #ifndef mingw32_HOST_OS data IOReq = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ()) @@ -769,25 +791,52 @@ data DelayReq | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool) #ifndef mingw32_HOST_OS +{-# NOINLINE pendingEvents #-} pendingEvents :: IORef [IOReq] +pendingEvents = unsafePerformIO $ do + m <- newIORef [] + sharedCAF m getOrSetGHCConcPendingEventsStore + +foreign import ccall unsafe "getOrSetGHCConcPendingEventsStore" + getOrSetGHCConcPendingEventsStore :: Ptr a -> IO (Ptr a) #endif -pendingDelays :: IORef [DelayReq] - -- could use a strict list or array here -{-# NOINLINE pendingEvents #-} + {-# NOINLINE pendingDelays #-} -(pendingEvents,pendingDelays) = unsafePerformIO $ do - startIOManagerThread - reqs <- newIORef [] - dels <- newIORef [] - return (reqs, dels) - -- the first time we schedule an IO request, the service thread - -- will be created (cool, huh?) +pendingDelays :: IORef [DelayReq] +pendingDelays = unsafePerformIO $ do + m <- newIORef [] + sharedCAF m getOrSetGHCConcPendingDelaysStore + +foreign import ccall unsafe "getOrSetGHCConcPendingDelaysStore" + getOrSetGHCConcPendingDelaysStore :: Ptr a -> IO (Ptr a) + +{-# NOINLINE ioManagerThread #-} +ioManagerThread :: MVar (Maybe ThreadId) +ioManagerThread = unsafePerformIO $ do + m <- newMVar Nothing + sharedCAF m getOrSetGHCConcIOManagerThreadStore + +foreign import ccall unsafe "getOrSetGHCConcIOManagerThreadStore" + getOrSetGHCConcIOManagerThreadStore :: Ptr a -> IO (Ptr a) ensureIOManagerIsRunning :: IO () ensureIOManagerIsRunning - | threaded = seq pendingEvents $ return () + | threaded = startIOManagerThread | otherwise = return () +startIOManagerThread :: IO () +startIOManagerThread = do + modifyMVar_ ioManagerThread $ \old -> do + let create = do t <- forkIO ioManager; return (Just t) + case old of + Nothing -> create + Just t -> do + s <- threadStatus t + case s of + ThreadFinished -> create + ThreadDied -> create + _other -> return (Just t) + insertDelay :: DelayReq -> [DelayReq] -> [DelayReq] insertDelay d [] = [d] insertDelay d1 ds@(d2 : rest) @@ -803,24 +852,49 @@ type USecs = Word64 foreign import ccall unsafe "getUSecOfDay" getUSecOfDay :: IO USecs -prodding :: IORef Bool {-# NOINLINE prodding #-} -prodding = unsafePerformIO (newIORef False) +prodding :: IORef Bool +prodding = unsafePerformIO $ do + r <- newIORef False + sharedCAF r getOrSetGHCConcProddingStore + +foreign import ccall unsafe "getOrSetGHCConcProddingStore" + getOrSetGHCConcProddingStore :: Ptr a -> IO (Ptr a) prodServiceThread :: IO () prodServiceThread = do - was_set <- atomicModifyIORef prodding (\a -> (True,a)) - if (not (was_set)) then wakeupIOManager else return () + -- NB. use atomicModifyIORef here, otherwise there are race + -- conditions in which prodding is left at True but the server is + -- blocked in select(). + was_set <- atomicModifyIORef prodding $ \b -> (True,b) + unless was_set wakeupIOManager + +-- Machinery needed to ensure that we only have one copy of certain +-- CAFs in this module even when the base package is present twice, as +-- it is when base is dynamically loaded into GHCi. The RTS keeps +-- track of the single true value of the CAF, so even when the CAFs in +-- the dynamically-loaded base package are reverted, nothing bad +-- happens. +-- +sharedCAF :: a -> (Ptr a -> IO (Ptr a)) -> IO a +sharedCAF a get_or_set = + mask_ $ do + stable_ref <- newStablePtr a + let ref = castPtr (castStablePtrToPtr stable_ref) + ref2 <- get_or_set ref + if ref==ref2 + then return a + else do freeStablePtr stable_ref + deRefStablePtr (castPtrToStablePtr (castPtr ref2)) #ifdef mingw32_HOST_OS -- ---------------------------------------------------------------------------- -- Windows IO manager thread -startIOManagerThread :: IO () -startIOManagerThread = do +ioManager :: IO () +ioManager = do wakeup <- c_getIOManagerEvent - forkIO $ service_loop wakeup [] - return () + service_loop wakeup [] service_loop :: HANDLE -- read end of pipe -> [DelayReq] -- current delay requests @@ -845,9 +919,7 @@ service_loop wakeup old_delays = do _ | r2 == io_MANAGER_DIE -> return True 0 -> return False -- spurious wakeup _ -> do start_console_handler (r2 `shiftR` 1); return False - if exit - then return () - else service_cont wakeup delays' + unless exit $ service_cont wakeup delays' _other -> service_cont wakeup delays' -- probably timeout @@ -875,7 +947,7 @@ start_console_handler :: Word32 -> IO () start_console_handler r = case toWin32ConsoleEvent r of Just x -> withMVar win32ConsoleHandler $ \handler -> do - forkIO (handler x) + _ <- forkIO (handler x) return () Nothing -> return () @@ -892,15 +964,8 @@ toWin32ConsoleEvent ev = win32ConsoleHandler :: MVar (ConsoleEvent -> IO ()) win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler")) --- XXX Is this actually needed? -stick :: IORef HANDLE -{-# NOINLINE stick #-} -stick = unsafePerformIO (newIORef nullPtr) - wakeupIOManager :: IO () -wakeupIOManager = do - _hdl <- readIORef stick - c_sendIOManagerEvent io_MANAGER_WAKEUP +wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP -- Walk the queue of pending delays, waking up any that have passed -- and return the smallest delay to wait for. The queue of pending @@ -949,23 +1014,21 @@ foreign import stdcall "WaitForSingleObject" -- ---------------------------------------------------------------------------- -- Unix IO manager thread, using select() -startIOManagerThread :: IO () -startIOManagerThread = do +ioManager :: IO () +ioManager = do allocaArray 2 $ \fds -> do - throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds) + throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds) rd_end <- peekElemOff fds 0 wr_end <- peekElemOff fds 1 - setNonBlockingFD wr_end -- writes happen in a signal handler, we - -- don't want them to block. + setNonBlockingFD wr_end True -- writes happen in a signal handler, we + -- don't want them to block. setCloseOnExec rd_end setCloseOnExec wr_end - writeIORef stick (fromIntegral wr_end) c_setIOManagerPipe wr_end - forkIO $ do - allocaBytes sizeofFdSet $ \readfds -> do - allocaBytes sizeofFdSet $ \writefds -> do - allocaBytes sizeofTimeVal $ \timeval -> do - service_loop (fromIntegral rd_end) readfds writefds timeval [] [] + allocaBytes sizeofFdSet $ \readfds -> do + allocaBytes sizeofFdSet $ \writefds -> do + allocaBytes sizeofTimeVal $ \timeval -> do + service_loop (fromIntegral rd_end) readfds writefds timeval [] [] return () service_loop @@ -978,6 +1041,17 @@ service_loop -> IO () service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do + -- reset prodding before we look at the new requests. If a new + -- client arrives after this point they will send a wakup which will + -- cause the server to loop around again, so we can be sure to not + -- miss any requests. + -- + -- NB. it's important to do this in the *first* iteration of + -- service_loop, rather than after calling select(), since a client + -- may have set prodding to True without sending a wakeup byte down + -- the pipe, because the pipe wasn't set up. + atomicModifyIORef prodding (\_ -> (False, ())) + -- pick up new IO requests new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a)) let reqs = new_reqs ++ old_reqs @@ -1026,7 +1100,8 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do if b == 0 then return False else alloca $ \p -> do - c_read (fromIntegral wakeup) p 1 + warnErrnoIfMinus1_ "service_loop" $ + c_read (fromIntegral wakeup) p 1 s <- peek p case s of _ | s == io_MANAGER_WAKEUP -> return False @@ -1045,25 +1120,18 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do runHandlers' fp (fromIntegral s) return False - if exit then return () else do - - atomicModifyIORef prodding (\_ -> (False,False)) + unless exit $ do reqs' <- if wakeup_all then do wakeupAll reqs; return [] else completeRequests reqs readfds writefds [] service_loop wakeup readfds writefds ptimeval reqs' delays' -io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: CChar +io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: Word8 io_MANAGER_WAKEUP = 0xff io_MANAGER_DIE = 0xfe io_MANAGER_SYNC = 0xfd --- | the stick is for poking the IO manager with -stick :: IORef Fd -{-# NOINLINE stick #-} -stick = unsafePerformIO (newIORef 0) - {-# NOINLINE sync #-} sync :: IORef [MVar ()] sync = unsafePerformIO (newIORef []) @@ -1073,16 +1141,11 @@ syncIOManager :: IO () syncIOManager = do m <- newEmptyMVar atomicModifyIORef sync (\old -> (m:old,())) - fd <- readIORef stick - with io_MANAGER_SYNC $ \pbuf -> do - c_write (fromIntegral fd) pbuf 1; return () + c_ioManagerSync takeMVar m -wakeupIOManager :: IO () -wakeupIOManager = do - fd <- readIORef stick - with io_MANAGER_WAKEUP $ \pbuf -> do - c_write (fromIntegral fd) pbuf 1; return () +foreign import ccall unsafe "ioManagerSync" c_ioManagerSync :: IO () +foreign import ccall unsafe "ioManagerWakeup" wakeupIOManager :: IO () -- For the non-threaded RTS runHandlers :: Ptr Word8 -> Int -> IO () @@ -1102,7 +1165,19 @@ runHandlers' p_info sig = do else do handler <- unsafeReadIOArray arr int case handler of Nothing -> return () - Just (f,_) -> do forkIO (f p_info); return () + Just (f,_) -> do _ <- forkIO (f p_info) + return () + +warnErrnoIfMinus1_ :: Num a => String -> IO a -> IO () +warnErrnoIfMinus1_ what io + = do r <- io + when (r == -1) $ do + errno <- getErrno + str <- strerror errno >>= peekCString + when (r == -1) $ + debugErrLn ("Warning: " ++ what ++ " failed: " ++ str) + +foreign import ccall unsafe "string.h" strerror :: Errno -> IO (Ptr CChar) foreign import ccall "setIOManagerPipe" c_setIOManagerPipe :: CInt -> IO () @@ -1124,17 +1199,10 @@ signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic))) signal_handlers = unsafePerformIO $ do arr <- newIOArray (0,maxSig) Nothing m <- newMVar arr - block $ do - stable_ref <- newStablePtr m - let ref = castStablePtrToPtr stable_ref - ref2 <- getOrSetSignalHandlerStore ref - if ref==ref2 - then return m - else do freeStablePtr stable_ref - deRefStablePtr (castPtrToStablePtr ref2) + sharedCAF m getOrSetGHCConcSignalHandlerStore -foreign import ccall unsafe "getOrSetSignalHandlerStore" - getOrSetSignalHandlerStore :: Ptr a -> IO (Ptr a) +foreign import ccall unsafe "getOrSetGHCConcSignalHandlerStore" + getOrSetGHCConcSignalHandlerStore :: Ptr a -> IO (Ptr a) setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic)) setHandler sig handler = do @@ -1236,7 +1304,7 @@ foreign import ccall unsafe "setTimevalTicks" data CFdSet -foreign import ccall safe "select" +foreign import ccall safe "__hscore_select" c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal -> IO CInt @@ -1266,14 +1334,13 @@ foreign import ccall unsafe "sizeof_fd_set" #endif -reportStackOverflow :: IO a -reportStackOverflow = do callStackOverflowHook; return undefined +reportStackOverflow :: IO () +reportStackOverflow = callStackOverflowHook -reportError :: SomeException -> IO a +reportError :: SomeException -> IO () reportError ex = do handler <- getUncaughtExceptionHandler handler ex - return undefined -- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove -- the unsafe below. @@ -1306,4 +1373,5 @@ setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler getUncaughtExceptionHandler :: IO (SomeException -> IO ()) getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler + \end{code}