(\e -> do putMVar m a; throw e)
putMVar m a
return b
-
-modifyMVar_ :: MVar a -> (a -> IO a) -> IO ()
-modifyMVar_ m io =
- block $ do
- a <- takeMVar m
- a' <- catchAny (unblock (io a))
- (\e -> do putMVar m a; throw e)
- putMVar m a'
- return ()
\end{code}
%************************************************************************
-- around the scheduler loop. Furthermore, the scheduler can be simplified
-- by not having to check for completed IO requests.
+-- Issues, possible problems:
+--
+-- - we might want bound threads to just do the blocking
+-- operation rather than communicating with the IO manager
+-- thread. This would prevent simgle-threaded programs which do
+-- IO from requiring multiple OS threads. However, it would also
+-- prevent bound threads waiting on IO from being killed or sent
+-- exceptions.
+--
+-- - Apprently exec() doesn't work on Linux in a multithreaded program.
+-- I couldn't repeat this.
+--
+-- - How do we handle signal delivery in the multithreaded RTS?
+--
+-- - forkProcess will kill the IO manager thread. Let's just
+-- hope we don't need to do any blocking IO between fork & exec.
+
#ifndef mingw32_HOST_OS
data IOReq
= Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
| DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
#ifndef mingw32_HOST_OS
-{-# NOINLINE pendingEvents #-}
pendingEvents :: IORef [IOReq]
-pendingEvents = unsafePerformIO $ newIORef []
#endif
-{-# NOINLINE pendingDelays #-}
pendingDelays :: IORef [DelayReq]
-pendingDelays = unsafePerformIO $ newIORef []
-
-{-# NOINLINE ioManagerThread #-}
-ioManagerThread :: MVar (Maybe ThreadId)
-ioManagerThread = unsafePerformIO $ newMVar Nothing
+ -- could use a strict list or array here
+{-# NOINLINE pendingEvents #-}
+{-# NOINLINE pendingDelays #-}
+(pendingEvents,pendingDelays) = unsafePerformIO $ do
+ startIOManagerThread
+ reqs <- newIORef []
+ dels <- newIORef []
+ return (reqs, dels)
+ -- the first time we schedule an IO request, the service thread
+ -- will be created (cool, huh?)
ensureIOManagerIsRunning :: IO ()
ensureIOManagerIsRunning
- | threaded = startIOManagerThread
+ | threaded = seq pendingEvents $ return ()
| otherwise = return ()
-startIOManagerThread :: IO ()
-startIOManagerThread = do
- modifyMVar_ ioManagerThread $ \old -> do
- let create = do t <- forkIO ioManager; return (Just t)
- case old of
- Nothing -> create
- Just t -> do
- s <- threadStatus t
- case s of
- ThreadFinished -> create
- ThreadDied -> create
- _other -> return (Just t)
-
insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
insertDelay d [] = [d]
insertDelay d1 ds@(d2 : rest)
-- ----------------------------------------------------------------------------
-- Windows IO manager thread
-ioManager :: IO ()
-ioManager = do
+startIOManagerThread :: IO ()
+startIOManagerThread = do
wakeup <- c_getIOManagerEvent
- service_loop wakeup []
+ _ <- forkIO $ service_loop wakeup []
+ return ()
service_loop :: HANDLE -- read end of pipe
-> [DelayReq] -- current delay requests
win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
+-- XXX Is this actually needed?
+stick :: IORef HANDLE
+{-# NOINLINE stick #-}
+stick = unsafePerformIO (newIORef nullPtr)
+
wakeupIOManager :: IO ()
-wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP
+wakeupIOManager = do
+ _hdl <- readIORef stick
+ c_sendIOManagerEvent io_MANAGER_WAKEUP
-- Walk the queue of pending delays, waking up any that have passed
-- and return the smallest delay to wait for. The queue of pending
-- ----------------------------------------------------------------------------
-- Unix IO manager thread, using select()
-ioManager :: IO ()
-ioManager = do
+startIOManagerThread :: IO ()
+startIOManagerThread = do
allocaArray 2 $ \fds -> do
throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds)
rd_end <- peekElemOff fds 0
setCloseOnExec wr_end
writeIORef stick (fromIntegral wr_end)
c_setIOManagerPipe wr_end
- allocaBytes sizeofFdSet $ \readfds -> do
- allocaBytes sizeofFdSet $ \writefds -> do
- allocaBytes sizeofTimeVal $ \timeval -> do
- service_loop (fromIntegral rd_end) readfds writefds timeval [] []
+ _ <- forkIO $ do
+ allocaBytes sizeofFdSet $ \readfds -> do
+ allocaBytes sizeofFdSet $ \writefds -> do
+ allocaBytes sizeofTimeVal $ \timeval -> do
+ service_loop (fromIntegral rd_end) readfds writefds timeval [] []
return ()
service_loop
-- | the stick is for poking the IO manager with
stick :: IORef Fd
{-# NOINLINE stick #-}
-stick = unsafePerformIO $ newIORef (-1)
+stick = unsafePerformIO (newIORef 0)
{-# NOINLINE sync #-}
sync :: IORef [MVar ()]
m <- newEmptyMVar
atomicModifyIORef sync (\old -> (m:old,()))
fd <- readIORef stick
- when (fd /= (-1)) $
- with io_MANAGER_SYNC $ \pbuf -> do
- warnErrnoIfMinus1_ "syncIOManager" $ c_write (fromIntegral fd) pbuf 1
+ with io_MANAGER_SYNC $ \pbuf -> do
+ warnErrnoIfMinus1_ "syncIOManager" $ c_write (fromIntegral fd) pbuf 1
takeMVar m
wakeupIOManager :: IO ()
wakeupIOManager = do
fd <- readIORef stick
- when (fd /= (-1)) $
- with io_MANAGER_WAKEUP $ \pbuf -> do
- warnErrnoIfMinus1_ "wakeupIOManager" $ c_write (fromIntegral fd) pbuf 1
+ with io_MANAGER_WAKEUP $ \pbuf -> do
+ warnErrnoIfMinus1_ "wakeupIOManager" $ c_write (fromIntegral fd) pbuf 1
-- For the non-threaded RTS
runHandlers :: Ptr Word8 -> Int -> IO ()