Fix #1185: restart the IO manager after fork()
authorSimon Marlow <marlowsd@gmail.com>
Tue, 3 Nov 2009 16:05:40 +0000 (16:05 +0000)
committerSimon Marlow <marlowsd@gmail.com>
Tue, 3 Nov 2009 16:05:40 +0000 (16:05 +0000)
This is the libraries/base part of the patch; there is a corresponding
patch to GHC itself.

The main change is that we now keep track of the IO manager's ThreadId
in a top-level MVar, and ensureIOManagerIsRunning checks whether a
previous IO manager thread is alive before starting one.  In the child
of fork(), we can hence call ensureIOManagerIsRunning to restart the
IO manager.

GHC/Conc.lhs

index 7f7d585..eac470b 100644 (file)
@@ -608,6 +608,15 @@ withMVar m io =
             (\e -> do putMVar m a; throw e)
     putMVar m a
     return b
+
+modifyMVar_ :: MVar a -> (a -> IO a) -> IO ()
+modifyMVar_ m io =
+  block $ do
+    a <- takeMVar m
+    a' <- catchAny (unblock (io a))
+            (\e -> do putMVar m a; throw e)
+    putMVar m a'
+    return ()
 \end{code}
 
 %************************************************************************
@@ -745,23 +754,6 @@ calculateTarget usecs = do
 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
 -- by not having to check for completed IO requests.
 
--- Issues, possible problems:
---
---      - we might want bound threads to just do the blocking
---        operation rather than communicating with the IO manager
---        thread.  This would prevent simgle-threaded programs which do
---        IO from requiring multiple OS threads.  However, it would also
---        prevent bound threads waiting on IO from being killed or sent
---        exceptions.
---
---      - Apprently exec() doesn't work on Linux in a multithreaded program.
---        I couldn't repeat this.
---
---      - How do we handle signal delivery in the multithreaded RTS?
---
---      - forkProcess will kill the IO manager thread.  Let's just
---        hope we don't need to do any blocking IO between fork & exec.
-
 #ifndef mingw32_HOST_OS
 data IOReq
   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
@@ -776,22 +768,35 @@ data DelayReq
 pendingEvents :: IORef [IOReq]
 #endif
 pendingDelays :: IORef [DelayReq]
-        -- could use a strict list or array here
 {-# NOINLINE pendingEvents #-}
 {-# NOINLINE pendingDelays #-}
 (pendingEvents,pendingDelays) = unsafePerformIO $ do
-  startIOManagerThread
   reqs <- newIORef []
   dels <- newIORef []
   return (reqs, dels)
-        -- the first time we schedule an IO request, the service thread
-        -- will be created (cool, huh?)
+
+{-# NOINLINE ioManagerThread #-}
+ioManagerThread :: MVar (Maybe ThreadId)
+ioManagerThread = unsafePerformIO $ newMVar Nothing
 
 ensureIOManagerIsRunning :: IO ()
 ensureIOManagerIsRunning 
-  | threaded  = seq pendingEvents $ return ()
+  | threaded  = startIOManagerThread
   | otherwise = return ()
 
+startIOManagerThread :: IO ()
+startIOManagerThread = do
+  modifyMVar_ ioManagerThread $ \old -> do
+    let create = do t <- forkIO ioManager; return (Just t)
+    case old of
+      Nothing -> create
+      Just t  -> do
+        s <- threadStatus t
+        case s of
+          ThreadFinished -> create
+          ThreadDied     -> create
+          _other         -> return (Just t)
+
 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
 insertDelay d [] = [d]
 insertDelay d1 ds@(d2 : rest)
@@ -820,11 +825,10 @@ prodServiceThread = do
 -- ----------------------------------------------------------------------------
 -- Windows IO manager thread
 
-startIOManagerThread :: IO ()
-startIOManagerThread = do
+ioManager :: IO ()
+ioManager = do
   wakeup <- c_getIOManagerEvent
-  _ <- forkIO $ service_loop wakeup []
-  return ()
+  service_loop wakeup []
 
 service_loop :: HANDLE          -- read end of pipe
              -> [DelayReq]      -- current delay requests
@@ -894,15 +898,8 @@ toWin32ConsoleEvent ev =
 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
 
--- XXX Is this actually needed?
-stick :: IORef HANDLE
-{-# NOINLINE stick #-}
-stick = unsafePerformIO (newIORef nullPtr)
-
 wakeupIOManager :: IO ()
-wakeupIOManager = do 
-  _hdl <- readIORef stick
-  c_sendIOManagerEvent io_MANAGER_WAKEUP
+wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP
 
 -- Walk the queue of pending delays, waking up any that have passed
 -- and return the smallest delay to wait for.  The queue of pending
@@ -951,8 +948,8 @@ foreign import stdcall "WaitForSingleObject"
 -- ----------------------------------------------------------------------------
 -- Unix IO manager thread, using select()
 
-startIOManagerThread :: IO ()
-startIOManagerThread = do
+ioManager :: IO ()
+ioManager = do
         allocaArray 2 $ \fds -> do
         throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds)
         rd_end <- peekElemOff fds 0
@@ -963,11 +960,10 @@ startIOManagerThread = do
         setCloseOnExec wr_end
         writeIORef stick (fromIntegral wr_end)
         c_setIOManagerPipe wr_end
-        _ <- forkIO $ do
-            allocaBytes sizeofFdSet   $ \readfds -> do
-            allocaBytes sizeofFdSet   $ \writefds -> do 
-            allocaBytes sizeofTimeVal $ \timeval -> do
-            service_loop (fromIntegral rd_end) readfds writefds timeval [] []
+        allocaBytes sizeofFdSet   $ \readfds -> do
+        allocaBytes sizeofFdSet   $ \writefds -> do 
+        allocaBytes sizeofTimeVal $ \timeval -> do
+        service_loop (fromIntegral rd_end) readfds writefds timeval [] []
         return ()
 
 service_loop
@@ -1065,7 +1061,7 @@ io_MANAGER_SYNC   = 0xfd
 -- | the stick is for poking the IO manager with
 stick :: IORef Fd
 {-# NOINLINE stick #-}
-stick = unsafePerformIO (newIORef 0)
+stick = unsafePerformIO $ newIORef (-1)
 
 {-# NOINLINE sync #-}
 sync :: IORef [MVar ()]
@@ -1077,15 +1073,17 @@ syncIOManager = do
   m <- newEmptyMVar
   atomicModifyIORef sync (\old -> (m:old,()))
   fd <- readIORef stick
-  with io_MANAGER_SYNC $ \pbuf -> do 
-    warnErrnoIfMinus1_ "syncIOManager" $ c_write (fromIntegral fd) pbuf 1
+  when (fd /= (-1)) $
+    with io_MANAGER_SYNC $ \pbuf -> do
+      warnErrnoIfMinus1_ "syncIOManager" $ c_write (fromIntegral fd) pbuf 1
   takeMVar m
 
 wakeupIOManager :: IO ()
 wakeupIOManager = do
   fd <- readIORef stick
-  with io_MANAGER_WAKEUP $ \pbuf -> do 
-    warnErrnoIfMinus1_ "wakeupIOManager" $ c_write (fromIntegral fd) pbuf 1
+  when (fd /= (-1)) $
+    with io_MANAGER_WAKEUP $ \pbuf -> do
+      warnErrnoIfMinus1_ "wakeupIOManager" $ c_write (fromIntegral fd) pbuf 1
 
 -- For the non-threaded RTS
 runHandlers :: Ptr Word8 -> Int -> IO ()