From c32b545138f37d8455fd2dbd6d70eeb5c9e8b085 Mon Sep 17 00:00:00 2001 From: Simon Marlow Date: Tue, 3 Nov 2009 16:05:40 +0000 Subject: [PATCH] Fix #1185: restart the IO manager after fork() This is the libraries/base part of the patch; there is a corresponding patch to GHC itself. The main change is that we now keep track of the IO manager's ThreadId in a top-level MVar, and ensureIOManagerIsRunning checks whether a previous IO manager thread is alive before starting one. In the child of fork(), we can hence call ensureIOManagerIsRunning to restart the IO manager. --- GHC/Conc.lhs | 90 ++++++++++++++++++++++++++++------------------------------ 1 file changed, 44 insertions(+), 46 deletions(-) diff --git a/GHC/Conc.lhs b/GHC/Conc.lhs index 7f7d585..eac470b 100644 --- a/GHC/Conc.lhs +++ b/GHC/Conc.lhs @@ -608,6 +608,15 @@ withMVar m io = (\e -> do putMVar m a; throw e) putMVar m a return b + +modifyMVar_ :: MVar a -> (a -> IO a) -> IO () +modifyMVar_ m io = + block $ do + a <- takeMVar m + a' <- catchAny (unblock (io a)) + (\e -> do putMVar m a; throw e) + putMVar m a' + return () \end{code} %************************************************************************ @@ -745,23 +754,6 @@ calculateTarget usecs = do -- around the scheduler loop. Furthermore, the scheduler can be simplified -- by not having to check for completed IO requests. --- Issues, possible problems: --- --- - we might want bound threads to just do the blocking --- operation rather than communicating with the IO manager --- thread. This would prevent simgle-threaded programs which do --- IO from requiring multiple OS threads. However, it would also --- prevent bound threads waiting on IO from being killed or sent --- exceptions. --- --- - Apprently exec() doesn't work on Linux in a multithreaded program. --- I couldn't repeat this. --- --- - How do we handle signal delivery in the multithreaded RTS? --- --- - forkProcess will kill the IO manager thread. Let's just --- hope we don't need to do any blocking IO between fork & exec. - #ifndef mingw32_HOST_OS data IOReq = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ()) @@ -776,22 +768,35 @@ data DelayReq pendingEvents :: IORef [IOReq] #endif pendingDelays :: IORef [DelayReq] - -- could use a strict list or array here {-# NOINLINE pendingEvents #-} {-# NOINLINE pendingDelays #-} (pendingEvents,pendingDelays) = unsafePerformIO $ do - startIOManagerThread reqs <- newIORef [] dels <- newIORef [] return (reqs, dels) - -- the first time we schedule an IO request, the service thread - -- will be created (cool, huh?) + +{-# NOINLINE ioManagerThread #-} +ioManagerThread :: MVar (Maybe ThreadId) +ioManagerThread = unsafePerformIO $ newMVar Nothing ensureIOManagerIsRunning :: IO () ensureIOManagerIsRunning - | threaded = seq pendingEvents $ return () + | threaded = startIOManagerThread | otherwise = return () +startIOManagerThread :: IO () +startIOManagerThread = do + modifyMVar_ ioManagerThread $ \old -> do + let create = do t <- forkIO ioManager; return (Just t) + case old of + Nothing -> create + Just t -> do + s <- threadStatus t + case s of + ThreadFinished -> create + ThreadDied -> create + _other -> return (Just t) + insertDelay :: DelayReq -> [DelayReq] -> [DelayReq] insertDelay d [] = [d] insertDelay d1 ds@(d2 : rest) @@ -820,11 +825,10 @@ prodServiceThread = do -- ---------------------------------------------------------------------------- -- Windows IO manager thread -startIOManagerThread :: IO () -startIOManagerThread = do +ioManager :: IO () +ioManager = do wakeup <- c_getIOManagerEvent - _ <- forkIO $ service_loop wakeup [] - return () + service_loop wakeup [] service_loop :: HANDLE -- read end of pipe -> [DelayReq] -- current delay requests @@ -894,15 +898,8 @@ toWin32ConsoleEvent ev = win32ConsoleHandler :: MVar (ConsoleEvent -> IO ()) win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler")) --- XXX Is this actually needed? -stick :: IORef HANDLE -{-# NOINLINE stick #-} -stick = unsafePerformIO (newIORef nullPtr) - wakeupIOManager :: IO () -wakeupIOManager = do - _hdl <- readIORef stick - c_sendIOManagerEvent io_MANAGER_WAKEUP +wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP -- Walk the queue of pending delays, waking up any that have passed -- and return the smallest delay to wait for. The queue of pending @@ -951,8 +948,8 @@ foreign import stdcall "WaitForSingleObject" -- ---------------------------------------------------------------------------- -- Unix IO manager thread, using select() -startIOManagerThread :: IO () -startIOManagerThread = do +ioManager :: IO () +ioManager = do allocaArray 2 $ \fds -> do throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds) rd_end <- peekElemOff fds 0 @@ -963,11 +960,10 @@ startIOManagerThread = do setCloseOnExec wr_end writeIORef stick (fromIntegral wr_end) c_setIOManagerPipe wr_end - _ <- forkIO $ do - allocaBytes sizeofFdSet $ \readfds -> do - allocaBytes sizeofFdSet $ \writefds -> do - allocaBytes sizeofTimeVal $ \timeval -> do - service_loop (fromIntegral rd_end) readfds writefds timeval [] [] + allocaBytes sizeofFdSet $ \readfds -> do + allocaBytes sizeofFdSet $ \writefds -> do + allocaBytes sizeofTimeVal $ \timeval -> do + service_loop (fromIntegral rd_end) readfds writefds timeval [] [] return () service_loop @@ -1065,7 +1061,7 @@ io_MANAGER_SYNC = 0xfd -- | the stick is for poking the IO manager with stick :: IORef Fd {-# NOINLINE stick #-} -stick = unsafePerformIO (newIORef 0) +stick = unsafePerformIO $ newIORef (-1) {-# NOINLINE sync #-} sync :: IORef [MVar ()] @@ -1077,15 +1073,17 @@ syncIOManager = do m <- newEmptyMVar atomicModifyIORef sync (\old -> (m:old,())) fd <- readIORef stick - with io_MANAGER_SYNC $ \pbuf -> do - warnErrnoIfMinus1_ "syncIOManager" $ c_write (fromIntegral fd) pbuf 1 + when (fd /= (-1)) $ + with io_MANAGER_SYNC $ \pbuf -> do + warnErrnoIfMinus1_ "syncIOManager" $ c_write (fromIntegral fd) pbuf 1 takeMVar m wakeupIOManager :: IO () wakeupIOManager = do fd <- readIORef stick - with io_MANAGER_WAKEUP $ \pbuf -> do - warnErrnoIfMinus1_ "wakeupIOManager" $ c_write (fromIntegral fd) pbuf 1 + when (fd /= (-1)) $ + with io_MANAGER_WAKEUP $ \pbuf -> do + warnErrnoIfMinus1_ "wakeupIOManager" $ c_write (fromIntegral fd) pbuf 1 -- For the non-threaded RTS runHandlers :: Ptr Word8 -> Int -> IO () -- 1.7.10.4