Second attempt to fix #1185 (forkProcess and -threaded)
authorSimon Marlow <marlowsd@gmail.com>
Wed, 11 Nov 2009 15:19:15 +0000 (15:19 +0000)
committerSimon Marlow <marlowsd@gmail.com>
Wed, 11 Nov 2009 15:19:15 +0000 (15:19 +0000)
Patch 2/2: first patch is to ghc

This time without dynamic linker hacks, instead I've expanded the
existing rts/Globals.c to cache more CAFs, specifically those in
GHC.Conc.  We were already using this trick for signal handlers, I
should have realised before.

It's still quite unsavoury, but we can do away with rts/Globals.c in
the future when we switch to a dynamically-linked GHCi.

GHC/Conc.lhs

index 7f7d585..57500f4 100644 (file)
@@ -608,6 +608,15 @@ withMVar m io =
             (\e -> do putMVar m a; throw e)
     putMVar m a
     return b
+
+modifyMVar_ :: MVar a -> (a -> IO a) -> IO ()
+modifyMVar_ m io =
+  block $ do
+    a <- takeMVar m
+    a' <- catchAny (unblock (io a))
+            (\e -> do putMVar m a; throw e)
+    putMVar m a'
+    return ()
 \end{code}
 
 %************************************************************************
@@ -745,23 +754,6 @@ calculateTarget usecs = do
 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
 -- by not having to check for completed IO requests.
 
--- Issues, possible problems:
---
---      - we might want bound threads to just do the blocking
---        operation rather than communicating with the IO manager
---        thread.  This would prevent simgle-threaded programs which do
---        IO from requiring multiple OS threads.  However, it would also
---        prevent bound threads waiting on IO from being killed or sent
---        exceptions.
---
---      - Apprently exec() doesn't work on Linux in a multithreaded program.
---        I couldn't repeat this.
---
---      - How do we handle signal delivery in the multithreaded RTS?
---
---      - forkProcess will kill the IO manager thread.  Let's just
---        hope we don't need to do any blocking IO between fork & exec.
-
 #ifndef mingw32_HOST_OS
 data IOReq
   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
@@ -773,25 +765,52 @@ data DelayReq
   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
 
 #ifndef mingw32_HOST_OS
+{-# NOINLINE pendingEvents #-}
 pendingEvents :: IORef [IOReq]
+pendingEvents = unsafePerformIO $ do
+   m <- newIORef []
+   sharedCAF m getOrSetGHCConcPendingEventsStore
+
+foreign import ccall unsafe "getOrSetGHCConcPendingEventsStore"
+    getOrSetGHCConcPendingEventsStore :: Ptr a -> IO (Ptr a)
 #endif
-pendingDelays :: IORef [DelayReq]
-        -- could use a strict list or array here
-{-# NOINLINE pendingEvents #-}
+
 {-# NOINLINE pendingDelays #-}
-(pendingEvents,pendingDelays) = unsafePerformIO $ do
-  startIOManagerThread
-  reqs <- newIORef []
-  dels <- newIORef []
-  return (reqs, dels)
-        -- the first time we schedule an IO request, the service thread
-        -- will be created (cool, huh?)
+pendingDelays :: IORef [DelayReq]
+pendingDelays = unsafePerformIO $ do
+   m <- newIORef []
+   sharedCAF m getOrSetGHCConcPendingDelaysStore
+
+foreign import ccall unsafe "getOrSetGHCConcPendingDelaysStore"
+    getOrSetGHCConcPendingDelaysStore :: Ptr a -> IO (Ptr a)
+
+{-# NOINLINE ioManagerThread #-}
+ioManagerThread :: MVar (Maybe ThreadId)
+ioManagerThread = unsafePerformIO $ do
+   m <- newMVar Nothing
+   sharedCAF m getOrSetGHCConcIOManagerThreadStore
+
+foreign import ccall unsafe "getOrSetGHCConcIOManagerThreadStore"
+    getOrSetGHCConcIOManagerThreadStore :: Ptr a -> IO (Ptr a)
 
 ensureIOManagerIsRunning :: IO ()
 ensureIOManagerIsRunning 
-  | threaded  = seq pendingEvents $ return ()
+  | threaded  = startIOManagerThread
   | otherwise = return ()
 
+startIOManagerThread :: IO ()
+startIOManagerThread = do
+  modifyMVar_ ioManagerThread $ \old -> do
+    let create = do t <- forkIO ioManager; return (Just t)
+    case old of
+      Nothing -> create
+      Just t  -> do
+        s <- threadStatus t
+        case s of
+          ThreadFinished -> create
+          ThreadDied     -> create
+          _other         -> return (Just t)
+
 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
 insertDelay d [] = [d]
 insertDelay d1 ds@(d2 : rest)
@@ -807,24 +826,48 @@ type USecs = Word64
 foreign import ccall unsafe "getUSecOfDay" 
   getUSecOfDay :: IO USecs
 
-prodding :: IORef Bool
 {-# NOINLINE prodding #-}
-prodding = unsafePerformIO (newIORef False)
+prodding :: IORef Bool
+prodding = unsafePerformIO $ do
+   r <- newIORef False
+   sharedCAF r getOrSetGHCConcProddingStore
+
+foreign import ccall unsafe "getOrSetGHCConcProddingStore"
+    getOrSetGHCConcProddingStore :: Ptr a -> IO (Ptr a)
 
 prodServiceThread :: IO ()
 prodServiceThread = do
-  was_set <- atomicModifyIORef prodding (\a -> (True,a))
+  was_set <- readIORef prodding
+  writeIORef prodding True
+     -- no need for atomicModifyIORef, extra prods are harmless.
   if (not (was_set)) then wakeupIOManager else return ()
 
+-- Machinery needed to ensure that we only have one copy of certain
+-- CAFs in this module even when the base package is present twice, as
+-- it is when base is dynamically loaded into GHCi.  The RTS keeps
+-- track of the single true value of the CAF, so even when the CAFs in
+-- the dynamically-loaded base package are reverted, nothing bad
+-- happens.
+--
+sharedCAF :: a -> (Ptr a -> IO (Ptr a)) -> IO a
+sharedCAF a get_or_set =
+   block $ do
+     stable_ref <- newStablePtr a
+     let ref = castPtr (castStablePtrToPtr stable_ref)
+     ref2 <- get_or_set ref
+     if ref==ref2
+        then return a
+        else do freeStablePtr stable_ref
+                deRefStablePtr (castPtrToStablePtr (castPtr ref2))
+
 #ifdef mingw32_HOST_OS
 -- ----------------------------------------------------------------------------
 -- Windows IO manager thread
 
-startIOManagerThread :: IO ()
-startIOManagerThread = do
+ioManager :: IO ()
+ioManager = do
   wakeup <- c_getIOManagerEvent
-  _ <- forkIO $ service_loop wakeup []
-  return ()
+  service_loop wakeup []
 
 service_loop :: HANDLE          -- read end of pipe
              -> [DelayReq]      -- current delay requests
@@ -894,15 +937,8 @@ toWin32ConsoleEvent ev =
 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
 
--- XXX Is this actually needed?
-stick :: IORef HANDLE
-{-# NOINLINE stick #-}
-stick = unsafePerformIO (newIORef nullPtr)
-
 wakeupIOManager :: IO ()
-wakeupIOManager = do 
-  _hdl <- readIORef stick
-  c_sendIOManagerEvent io_MANAGER_WAKEUP
+wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP
 
 -- Walk the queue of pending delays, waking up any that have passed
 -- and return the smallest delay to wait for.  The queue of pending
@@ -951,8 +987,8 @@ foreign import stdcall "WaitForSingleObject"
 -- ----------------------------------------------------------------------------
 -- Unix IO manager thread, using select()
 
-startIOManagerThread :: IO ()
-startIOManagerThread = do
+ioManager :: IO ()
+ioManager = do
         allocaArray 2 $ \fds -> do
         throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds)
         rd_end <- peekElemOff fds 0
@@ -961,13 +997,11 @@ startIOManagerThread = do
                                      -- don't want them to block.
         setCloseOnExec rd_end
         setCloseOnExec wr_end
-        writeIORef stick (fromIntegral wr_end)
         c_setIOManagerPipe wr_end
-        _ <- forkIO $ do
-            allocaBytes sizeofFdSet   $ \readfds -> do
-            allocaBytes sizeofFdSet   $ \writefds -> do 
-            allocaBytes sizeofTimeVal $ \timeval -> do
-            service_loop (fromIntegral rd_end) readfds writefds timeval [] []
+        allocaBytes sizeofFdSet   $ \readfds -> do
+        allocaBytes sizeofFdSet   $ \writefds -> do 
+        allocaBytes sizeofTimeVal $ \timeval -> do
+        service_loop (fromIntegral rd_end) readfds writefds timeval [] []
         return ()
 
 service_loop
@@ -1062,11 +1096,6 @@ io_MANAGER_WAKEUP = 0xff
 io_MANAGER_DIE    = 0xfe
 io_MANAGER_SYNC   = 0xfd
 
--- | the stick is for poking the IO manager with
-stick :: IORef Fd
-{-# NOINLINE stick #-}
-stick = unsafePerformIO (newIORef 0)
-
 {-# NOINLINE sync #-}
 sync :: IORef [MVar ()]
 sync = unsafePerformIO (newIORef [])
@@ -1076,16 +1105,11 @@ syncIOManager :: IO ()
 syncIOManager = do
   m <- newEmptyMVar
   atomicModifyIORef sync (\old -> (m:old,()))
-  fd <- readIORef stick
-  with io_MANAGER_SYNC $ \pbuf -> do 
-    warnErrnoIfMinus1_ "syncIOManager" $ c_write (fromIntegral fd) pbuf 1
+  c_ioManagerSync
   takeMVar m
 
-wakeupIOManager :: IO ()
-wakeupIOManager = do
-  fd <- readIORef stick
-  with io_MANAGER_WAKEUP $ \pbuf -> do 
-    warnErrnoIfMinus1_ "wakeupIOManager" $ c_write (fromIntegral fd) pbuf 1
+foreign import ccall unsafe "ioManagerSync"   c_ioManagerSync :: IO ()
+foreign import ccall unsafe "ioManagerWakeup" wakeupIOManager :: IO ()
 
 -- For the non-threaded RTS
 runHandlers :: Ptr Word8 -> Int -> IO ()
@@ -1139,17 +1163,10 @@ signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
 signal_handlers = unsafePerformIO $ do
    arr <- newIOArray (0,maxSig) Nothing
    m <- newMVar arr
-   block $ do
-     stable_ref <- newStablePtr m
-     let ref = castStablePtrToPtr stable_ref
-     ref2 <- getOrSetSignalHandlerStore ref
-     if ref==ref2
-        then return m
-        else do freeStablePtr stable_ref
-                deRefStablePtr (castPtrToStablePtr ref2)
+   sharedCAF m getOrSetGHCConcSignalHandlerStore
 
-foreign import ccall unsafe "getOrSetSignalHandlerStore"
-    getOrSetSignalHandlerStore :: Ptr a -> IO (Ptr a)
+foreign import ccall unsafe "getOrSetGHCConcSignalHandlerStore"
+    getOrSetGHCConcSignalHandlerStore :: Ptr a -> IO (Ptr a)
 
 setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
 setHandler sig handler = do