X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=GHC%2FConc.lhs;h=ec6e0642d1fab84c8c393e99c4321a0ceaa49a7d;hb=7dfe4a22aa6a2c598b1496c661c7d532aaafa94f;hp=7f7d585d22a6b1db8b8f5ba36492e370b38484b8;hpb=f19c1c38568beb6d7c9080a8174083913e47849f;p=ghc-base.git diff --git a/GHC/Conc.lhs b/GHC/Conc.lhs index 7f7d585..ec6e064 100644 --- a/GHC/Conc.lhs +++ b/GHC/Conc.lhs @@ -215,7 +215,7 @@ GHC note: the new thread inherits the /blocked/ state of the parent (see 'Control.Exception.block'). The newly created thread has an exception handler that discards the -exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and +exceptions 'BlockedIndefinitelyOnMVar', 'BlockedIndefinitelyOnSTM', and 'ThreadKilled', and passes all other exceptions to the uncaught exception handler (see 'setUncaughtExceptionHandler'). -} @@ -275,16 +275,11 @@ real_handler se@(SomeException ex) = Just StackOverflow -> reportStackOverflow _ -> reportError se -{- | 'killThread' terminates the given thread (GHC only). -Any work already done by the thread isn\'t -lost: the computation is suspended until required by another thread. -The memory used by the thread will be garbage collected if it isn\'t -referenced from anywhere. The 'killThread' function is defined in -terms of 'throwTo': +{- | 'killThread' raises the 'ThreadKilled' exception in the given +thread (GHC only). > killThread tid = throwTo tid ThreadKilled -Killthread is a no-op if the target thread has already completed. -} killThread :: ThreadId -> IO () killThread tid = throwTo tid ThreadKilled @@ -299,6 +294,10 @@ when dealing with race conditions: eg. if there are two threads that can kill each other, it is guaranteed that only one of the threads will get to kill the other. +Whatever work the target thread was doing when the exception was +raised is not lost: the computation is suspended until required by +another thread. + If the target thread is currently making a foreign call, then the exception will not be raised (and hence 'throwTo' will not return) until the call has completed. This is the case regardless of whether @@ -311,14 +310,20 @@ In the paper, 'throwTo' is non-blocking; but the library implementation adopts a more synchronous design in which 'throwTo' does not return until the exception is received by the target thread. The trade-off is discussed in Section 9 of the paper. Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of -the paper). +the paper). Unlike other interruptible operations, however, 'throwTo' +is /always/ interruptible, even if it does not actually block. + +There is no guarantee that the exception will be delivered promptly, +although the runtime will endeavour to ensure that arbitrary +delays don't occur. In GHC, an exception can only be raised when a +thread reaches a /safe point/, where a safe point is where memory +allocation occurs. Some loops do not perform any memory allocation +inside the loop and therefore cannot be interrupted by a 'throwTo'. -There is currently no guarantee that the exception delivered by 'throwTo' will be -delivered at the first possible opportunity. In particular, a thread may -unblock and then re-block exceptions (using 'unblock' and 'block') without receiving -a pending 'throwTo'. This is arguably undesirable behaviour. +Blocked 'throwTo' is fair: if multiple threads are trying to throw an +exception to the same target thread, they will succeed in FIFO order. - -} + -} throwTo :: Exception e => ThreadId -> e -> IO () throwTo (ThreadId tid) ex = IO $ \ s -> case (killThread# tid (toException ex) s) of s1 -> (# s1, () #) @@ -472,6 +477,10 @@ thenSTM (STM m) k = STM ( \s -> returnSTM :: a -> STM a returnSTM x = STM (\s -> (# s, x #)) +instance MonadPlus STM where + mzero = retry + mplus = orElse + -- | Unsafely performs IO in the STM monad. Beware: this is a highly -- dangerous thing to do. -- @@ -608,6 +617,15 @@ withMVar m io = (\e -> do putMVar m a; throw e) putMVar m a return b + +modifyMVar_ :: MVar a -> (a -> IO a) -> IO () +modifyMVar_ m io = + block $ do + a <- takeMVar m + a' <- catchAny (unblock (io a)) + (\e -> do putMVar m a; throw e) + putMVar m a' + return () \end{code} %************************************************************************ @@ -745,23 +763,6 @@ calculateTarget usecs = do -- around the scheduler loop. Furthermore, the scheduler can be simplified -- by not having to check for completed IO requests. --- Issues, possible problems: --- --- - we might want bound threads to just do the blocking --- operation rather than communicating with the IO manager --- thread. This would prevent simgle-threaded programs which do --- IO from requiring multiple OS threads. However, it would also --- prevent bound threads waiting on IO from being killed or sent --- exceptions. --- --- - Apprently exec() doesn't work on Linux in a multithreaded program. --- I couldn't repeat this. --- --- - How do we handle signal delivery in the multithreaded RTS? --- --- - forkProcess will kill the IO manager thread. Let's just --- hope we don't need to do any blocking IO between fork & exec. - #ifndef mingw32_HOST_OS data IOReq = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ()) @@ -773,25 +774,52 @@ data DelayReq | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool) #ifndef mingw32_HOST_OS +{-# NOINLINE pendingEvents #-} pendingEvents :: IORef [IOReq] +pendingEvents = unsafePerformIO $ do + m <- newIORef [] + sharedCAF m getOrSetGHCConcPendingEventsStore + +foreign import ccall unsafe "getOrSetGHCConcPendingEventsStore" + getOrSetGHCConcPendingEventsStore :: Ptr a -> IO (Ptr a) #endif -pendingDelays :: IORef [DelayReq] - -- could use a strict list or array here -{-# NOINLINE pendingEvents #-} + {-# NOINLINE pendingDelays #-} -(pendingEvents,pendingDelays) = unsafePerformIO $ do - startIOManagerThread - reqs <- newIORef [] - dels <- newIORef [] - return (reqs, dels) - -- the first time we schedule an IO request, the service thread - -- will be created (cool, huh?) +pendingDelays :: IORef [DelayReq] +pendingDelays = unsafePerformIO $ do + m <- newIORef [] + sharedCAF m getOrSetGHCConcPendingDelaysStore + +foreign import ccall unsafe "getOrSetGHCConcPendingDelaysStore" + getOrSetGHCConcPendingDelaysStore :: Ptr a -> IO (Ptr a) + +{-# NOINLINE ioManagerThread #-} +ioManagerThread :: MVar (Maybe ThreadId) +ioManagerThread = unsafePerformIO $ do + m <- newMVar Nothing + sharedCAF m getOrSetGHCConcIOManagerThreadStore + +foreign import ccall unsafe "getOrSetGHCConcIOManagerThreadStore" + getOrSetGHCConcIOManagerThreadStore :: Ptr a -> IO (Ptr a) ensureIOManagerIsRunning :: IO () ensureIOManagerIsRunning - | threaded = seq pendingEvents $ return () + | threaded = startIOManagerThread | otherwise = return () +startIOManagerThread :: IO () +startIOManagerThread = do + modifyMVar_ ioManagerThread $ \old -> do + let create = do t <- forkIO ioManager; return (Just t) + case old of + Nothing -> create + Just t -> do + s <- threadStatus t + case s of + ThreadFinished -> create + ThreadDied -> create + _other -> return (Just t) + insertDelay :: DelayReq -> [DelayReq] -> [DelayReq] insertDelay d [] = [d] insertDelay d1 ds@(d2 : rest) @@ -807,24 +835,49 @@ type USecs = Word64 foreign import ccall unsafe "getUSecOfDay" getUSecOfDay :: IO USecs -prodding :: IORef Bool {-# NOINLINE prodding #-} -prodding = unsafePerformIO (newIORef False) +prodding :: IORef Bool +prodding = unsafePerformIO $ do + r <- newIORef False + sharedCAF r getOrSetGHCConcProddingStore + +foreign import ccall unsafe "getOrSetGHCConcProddingStore" + getOrSetGHCConcProddingStore :: Ptr a -> IO (Ptr a) prodServiceThread :: IO () prodServiceThread = do - was_set <- atomicModifyIORef prodding (\a -> (True,a)) - if (not (was_set)) then wakeupIOManager else return () + -- NB. use atomicModifyIORef here, otherwise there are race + -- conditions in which prodding is left at True but the server is + -- blocked in select(). + was_set <- atomicModifyIORef prodding $ \b -> (True,b) + unless was_set wakeupIOManager + +-- Machinery needed to ensure that we only have one copy of certain +-- CAFs in this module even when the base package is present twice, as +-- it is when base is dynamically loaded into GHCi. The RTS keeps +-- track of the single true value of the CAF, so even when the CAFs in +-- the dynamically-loaded base package are reverted, nothing bad +-- happens. +-- +sharedCAF :: a -> (Ptr a -> IO (Ptr a)) -> IO a +sharedCAF a get_or_set = + block $ do + stable_ref <- newStablePtr a + let ref = castPtr (castStablePtrToPtr stable_ref) + ref2 <- get_or_set ref + if ref==ref2 + then return a + else do freeStablePtr stable_ref + deRefStablePtr (castPtrToStablePtr (castPtr ref2)) #ifdef mingw32_HOST_OS -- ---------------------------------------------------------------------------- -- Windows IO manager thread -startIOManagerThread :: IO () -startIOManagerThread = do +ioManager :: IO () +ioManager = do wakeup <- c_getIOManagerEvent - _ <- forkIO $ service_loop wakeup [] - return () + service_loop wakeup [] service_loop :: HANDLE -- read end of pipe -> [DelayReq] -- current delay requests @@ -894,15 +947,8 @@ toWin32ConsoleEvent ev = win32ConsoleHandler :: MVar (ConsoleEvent -> IO ()) win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler")) --- XXX Is this actually needed? -stick :: IORef HANDLE -{-# NOINLINE stick #-} -stick = unsafePerformIO (newIORef nullPtr) - wakeupIOManager :: IO () -wakeupIOManager = do - _hdl <- readIORef stick - c_sendIOManagerEvent io_MANAGER_WAKEUP +wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP -- Walk the queue of pending delays, waking up any that have passed -- and return the smallest delay to wait for. The queue of pending @@ -951,8 +997,8 @@ foreign import stdcall "WaitForSingleObject" -- ---------------------------------------------------------------------------- -- Unix IO manager thread, using select() -startIOManagerThread :: IO () -startIOManagerThread = do +ioManager :: IO () +ioManager = do allocaArray 2 $ \fds -> do throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds) rd_end <- peekElemOff fds 0 @@ -961,13 +1007,11 @@ startIOManagerThread = do -- don't want them to block. setCloseOnExec rd_end setCloseOnExec wr_end - writeIORef stick (fromIntegral wr_end) c_setIOManagerPipe wr_end - _ <- forkIO $ do - allocaBytes sizeofFdSet $ \readfds -> do - allocaBytes sizeofFdSet $ \writefds -> do - allocaBytes sizeofTimeVal $ \timeval -> do - service_loop (fromIntegral rd_end) readfds writefds timeval [] [] + allocaBytes sizeofFdSet $ \readfds -> do + allocaBytes sizeofFdSet $ \writefds -> do + allocaBytes sizeofTimeVal $ \timeval -> do + service_loop (fromIntegral rd_end) readfds writefds timeval [] [] return () service_loop @@ -980,6 +1024,17 @@ service_loop -> IO () service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do + -- reset prodding before we look at the new requests. If a new + -- client arrives after this point they will send a wakup which will + -- cause the server to loop around again, so we can be sure to not + -- miss any requests. + -- + -- NB. it's important to do this in the *first* iteration of + -- service_loop, rather than after calling select(), since a client + -- may have set prodding to True without sending a wakeup byte down + -- the pipe, because the pipe wasn't set up. + atomicModifyIORef prodding (\_ -> (False, ())) + -- pick up new IO requests new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a)) let reqs = new_reqs ++ old_reqs @@ -1050,8 +1105,6 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do unless exit $ do - atomicModifyIORef prodding (\_ -> (False, ())) - reqs' <- if wakeup_all then do wakeupAll reqs; return [] else completeRequests reqs readfds writefds [] @@ -1062,11 +1115,6 @@ io_MANAGER_WAKEUP = 0xff io_MANAGER_DIE = 0xfe io_MANAGER_SYNC = 0xfd --- | the stick is for poking the IO manager with -stick :: IORef Fd -{-# NOINLINE stick #-} -stick = unsafePerformIO (newIORef 0) - {-# NOINLINE sync #-} sync :: IORef [MVar ()] sync = unsafePerformIO (newIORef []) @@ -1076,16 +1124,11 @@ syncIOManager :: IO () syncIOManager = do m <- newEmptyMVar atomicModifyIORef sync (\old -> (m:old,())) - fd <- readIORef stick - with io_MANAGER_SYNC $ \pbuf -> do - warnErrnoIfMinus1_ "syncIOManager" $ c_write (fromIntegral fd) pbuf 1 + c_ioManagerSync takeMVar m -wakeupIOManager :: IO () -wakeupIOManager = do - fd <- readIORef stick - with io_MANAGER_WAKEUP $ \pbuf -> do - warnErrnoIfMinus1_ "wakeupIOManager" $ c_write (fromIntegral fd) pbuf 1 +foreign import ccall unsafe "ioManagerSync" c_ioManagerSync :: IO () +foreign import ccall unsafe "ioManagerWakeup" wakeupIOManager :: IO () -- For the non-threaded RTS runHandlers :: Ptr Word8 -> Int -> IO () @@ -1139,17 +1182,10 @@ signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic))) signal_handlers = unsafePerformIO $ do arr <- newIOArray (0,maxSig) Nothing m <- newMVar arr - block $ do - stable_ref <- newStablePtr m - let ref = castStablePtrToPtr stable_ref - ref2 <- getOrSetSignalHandlerStore ref - if ref==ref2 - then return m - else do freeStablePtr stable_ref - deRefStablePtr (castPtrToStablePtr ref2) + sharedCAF m getOrSetGHCConcSignalHandlerStore -foreign import ccall unsafe "getOrSetSignalHandlerStore" - getOrSetSignalHandlerStore :: Ptr a -> IO (Ptr a) +foreign import ccall unsafe "getOrSetGHCConcSignalHandlerStore" + getOrSetGHCConcSignalHandlerStore :: Ptr a -> IO (Ptr a) setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic)) setHandler sig handler = do