X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=GHC%2FConc.lhs;h=57500f4a3f0899ef81be2731045692d58103a706;hb=c3ccd3464c798d57f6db8fd95ede905246acbee6;hp=b53bf54b91a843c544ebfef5415265e83f641131;hpb=73c4a36a4f5140f4444feacfdafb466c6f940b26;p=ghc-base.git diff --git a/GHC/Conc.lhs b/GHC/Conc.lhs index b53bf54..57500f4 100644 --- a/GHC/Conc.lhs +++ b/GHC/Conc.lhs @@ -50,17 +50,6 @@ module GHC.Conc , threadWaitRead -- :: Int -> IO () , threadWaitWrite -- :: Int -> IO () - -- * MVars - , MVar(..) - , newMVar -- :: a -> IO (MVar a) - , newEmptyMVar -- :: IO (MVar a) - , takeMVar -- :: MVar a -> IO a - , putMVar -- :: MVar a -> a -> IO () - , tryTakeMVar -- :: MVar a -> IO (Maybe a) - , tryPutMVar -- :: MVar a -> a -> IO Bool - , isEmptyMVar -- :: MVar a -> IO Bool - , addMVarFinalizer -- :: MVar a -> IO () -> IO () - -- * TVars , STM(..) , atomically -- :: STM a -> IO a @@ -78,6 +67,7 @@ module GHC.Conc , unsafeIOToSTM -- :: IO a -> STM a -- * Miscellaneous + , withMVar #ifdef mingw32_HOST_OS , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) @@ -114,35 +104,43 @@ import System.Posix.Internals import Foreign import Foreign.C +#ifdef mingw32_HOST_OS +import Data.Typeable +#endif + #ifndef mingw32_HOST_OS import Data.Dynamic -import Control.Monad #endif +import Control.Monad import Data.Maybe import GHC.Base -import {-# SOURCE #-} GHC.Handle -import GHC.IOBase +#ifndef mingw32_HOST_OS +import GHC.Debug +#endif +import {-# SOURCE #-} GHC.IO.Handle ( hFlush ) +import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout ) +import GHC.IO +import GHC.IO.Exception +import GHC.Exception +import GHC.IORef +import GHC.MVar import GHC.Num ( Num(..) ) import GHC.Real ( fromIntegral ) #ifndef mingw32_HOST_OS +import GHC.IOArray import GHC.Arr ( inRange ) #endif #ifdef mingw32_HOST_OS import GHC.Real ( div ) -import GHC.Ptr ( plusPtr, FunPtr(..) ) +import GHC.Ptr #endif #ifdef mingw32_HOST_OS import GHC.Read ( Read ) import GHC.Enum ( Enum ) #endif -import GHC.Exception ( SomeException(..), throw ) import GHC.Pack ( packCString# ) -import GHC.Ptr ( Ptr(..) ) -import GHC.STRef import GHC.Show ( Show(..), showString ) -import Data.Typeable -import GHC.Err infixr 0 `par`, `pseq` \end{code} @@ -267,9 +265,9 @@ real_handler :: SomeException -> IO () real_handler se@(SomeException ex) = -- ignore thread GC and killThread exceptions: case cast ex of - Just BlockedOnDeadMVar -> return () + Just BlockedIndefinitelyOnMVar -> return () _ -> case cast ex of - Just BlockedIndefinitely -> return () + Just BlockedIndefinitelyOnSTM -> return () _ -> case cast ex of Just ThreadKilled -> return () _ -> case cast ex of @@ -544,7 +542,7 @@ checkInv (STM m) = STM (\s -> (check# m) s) -- of those points then the transaction violating it is aborted -- and the exception raised by the invariant is propagated. alwaysSucceeds :: STM a -> STM () -alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) +alwaysSucceeds i = do ( i >> retry ) `orElse` ( return () ) checkInv i -- | always is a variant of alwaysSucceeds in which the invariant is @@ -599,111 +597,28 @@ writeTVar (TVar tvar#) val = STM $ \s1# -> \end{code} -%************************************************************************ -%* * -\subsection[mvars]{M-Structures} -%* * -%************************************************************************ - -M-Vars are rendezvous points for concurrent threads. They begin -empty, and any attempt to read an empty M-Var blocks. When an M-Var -is written, a single blocked thread may be freed. Reading an M-Var -toggles its state from full back to empty. Therefore, any value -written to an M-Var may only be read once. Multiple reads and writes -are allowed, but there must be at least one read between any two -writes. +MVar utilities \begin{code} ---Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a) - --- |Create an 'MVar' which is initially empty. -newEmptyMVar :: IO (MVar a) -newEmptyMVar = IO $ \ s# -> - case newMVar# s# of - (# s2#, svar# #) -> (# s2#, MVar svar# #) - --- |Create an 'MVar' which contains the supplied value. -newMVar :: a -> IO (MVar a) -newMVar value = - newEmptyMVar >>= \ mvar -> - putMVar mvar value >> - return mvar - --- |Return the contents of the 'MVar'. If the 'MVar' is currently --- empty, 'takeMVar' will wait until it is full. After a 'takeMVar', --- the 'MVar' is left empty. --- --- There are two further important properties of 'takeMVar': --- --- * 'takeMVar' is single-wakeup. That is, if there are multiple --- threads blocked in 'takeMVar', and the 'MVar' becomes full, --- only one thread will be woken up. The runtime guarantees that --- the woken thread completes its 'takeMVar' operation. --- --- * When multiple threads are blocked on an 'MVar', they are --- woken up in FIFO order. This is useful for providing --- fairness properties of abstractions built using 'MVar's. --- -takeMVar :: MVar a -> IO a -takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s# +withMVar :: MVar a -> (a -> IO b) -> IO b +withMVar m io = + block $ do + a <- takeMVar m + b <- catchAny (unblock (io a)) + (\e -> do putMVar m a; throw e) + putMVar m a + return b --- |Put a value into an 'MVar'. If the 'MVar' is currently full, --- 'putMVar' will wait until it becomes empty. --- --- There are two further important properties of 'putMVar': --- --- * 'putMVar' is single-wakeup. That is, if there are multiple --- threads blocked in 'putMVar', and the 'MVar' becomes empty, --- only one thread will be woken up. The runtime guarantees that --- the woken thread completes its 'putMVar' operation. --- --- * When multiple threads are blocked on an 'MVar', they are --- woken up in FIFO order. This is useful for providing --- fairness properties of abstractions built using 'MVar's. --- -putMVar :: MVar a -> a -> IO () -putMVar (MVar mvar#) x = IO $ \ s# -> - case putMVar# mvar# x s# of - s2# -> (# s2#, () #) - --- |A non-blocking version of 'takeMVar'. The 'tryTakeMVar' function --- returns immediately, with 'Nothing' if the 'MVar' was empty, or --- @'Just' a@ if the 'MVar' was full with contents @a@. After 'tryTakeMVar', --- the 'MVar' is left empty. -tryTakeMVar :: MVar a -> IO (Maybe a) -tryTakeMVar (MVar m) = IO $ \ s -> - case tryTakeMVar# m s of - (# s', 0#, _ #) -> (# s', Nothing #) -- MVar is empty - (# s', _, a #) -> (# s', Just a #) -- MVar is full - --- |A non-blocking version of 'putMVar'. The 'tryPutMVar' function --- attempts to put the value @a@ into the 'MVar', returning 'True' if --- it was successful, or 'False' otherwise. -tryPutMVar :: MVar a -> a -> IO Bool -tryPutMVar (MVar mvar#) x = IO $ \ s# -> - case tryPutMVar# mvar# x s# of - (# s, 0# #) -> (# s, False #) - (# s, _ #) -> (# s, True #) - --- |Check whether a given 'MVar' is empty. --- --- Notice that the boolean value returned is just a snapshot of --- the state of the MVar. By the time you get to react on its result, --- the MVar may have been filled (or emptied) - so be extremely --- careful when using this operation. Use 'tryTakeMVar' instead if possible. -isEmptyMVar :: MVar a -> IO Bool -isEmptyMVar (MVar mv#) = IO $ \ s# -> - case isEmptyMVar# mv# s# of - (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #) - --- |Add a finalizer to an 'MVar' (GHC only). See "Foreign.ForeignPtr" and --- "System.Mem.Weak" for more about finalizers. -addMVarFinalizer :: MVar a -> IO () -> IO () -addMVarFinalizer (MVar m) finalizer = - IO $ \s -> case mkWeak# m () finalizer s of { (# s1, _ #) -> (# s1, () #) } +modifyMVar_ :: MVar a -> (a -> IO a) -> IO () +modifyMVar_ m io = + block $ do + a <- takeMVar m + a' <- catchAny (unblock (io a)) + (\e -> do putMVar m a; throw e) + putMVar m a' + return () \end{code} - %************************************************************************ %* * \subsection{Thread waiting} @@ -839,23 +754,6 @@ calculateTarget usecs = do -- around the scheduler loop. Furthermore, the scheduler can be simplified -- by not having to check for completed IO requests. --- Issues, possible problems: --- --- - we might want bound threads to just do the blocking --- operation rather than communicating with the IO manager --- thread. This would prevent simgle-threaded programs which do --- IO from requiring multiple OS threads. However, it would also --- prevent bound threads waiting on IO from being killed or sent --- exceptions. --- --- - Apprently exec() doesn't work on Linux in a multithreaded program. --- I couldn't repeat this. --- --- - How do we handle signal delivery in the multithreaded RTS? --- --- - forkProcess will kill the IO manager thread. Let's just --- hope we don't need to do any blocking IO between fork & exec. - #ifndef mingw32_HOST_OS data IOReq = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ()) @@ -867,25 +765,52 @@ data DelayReq | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool) #ifndef mingw32_HOST_OS +{-# NOINLINE pendingEvents #-} pendingEvents :: IORef [IOReq] +pendingEvents = unsafePerformIO $ do + m <- newIORef [] + sharedCAF m getOrSetGHCConcPendingEventsStore + +foreign import ccall unsafe "getOrSetGHCConcPendingEventsStore" + getOrSetGHCConcPendingEventsStore :: Ptr a -> IO (Ptr a) #endif -pendingDelays :: IORef [DelayReq] - -- could use a strict list or array here -{-# NOINLINE pendingEvents #-} + {-# NOINLINE pendingDelays #-} -(pendingEvents,pendingDelays) = unsafePerformIO $ do - startIOManagerThread - reqs <- newIORef [] - dels <- newIORef [] - return (reqs, dels) - -- the first time we schedule an IO request, the service thread - -- will be created (cool, huh?) +pendingDelays :: IORef [DelayReq] +pendingDelays = unsafePerformIO $ do + m <- newIORef [] + sharedCAF m getOrSetGHCConcPendingDelaysStore + +foreign import ccall unsafe "getOrSetGHCConcPendingDelaysStore" + getOrSetGHCConcPendingDelaysStore :: Ptr a -> IO (Ptr a) + +{-# NOINLINE ioManagerThread #-} +ioManagerThread :: MVar (Maybe ThreadId) +ioManagerThread = unsafePerformIO $ do + m <- newMVar Nothing + sharedCAF m getOrSetGHCConcIOManagerThreadStore + +foreign import ccall unsafe "getOrSetGHCConcIOManagerThreadStore" + getOrSetGHCConcIOManagerThreadStore :: Ptr a -> IO (Ptr a) ensureIOManagerIsRunning :: IO () ensureIOManagerIsRunning - | threaded = seq pendingEvents $ return () + | threaded = startIOManagerThread | otherwise = return () +startIOManagerThread :: IO () +startIOManagerThread = do + modifyMVar_ ioManagerThread $ \old -> do + let create = do t <- forkIO ioManager; return (Just t) + case old of + Nothing -> create + Just t -> do + s <- threadStatus t + case s of + ThreadFinished -> create + ThreadDied -> create + _other -> return (Just t) + insertDelay :: DelayReq -> [DelayReq] -> [DelayReq] insertDelay d [] = [d] insertDelay d1 ds@(d2 : rest) @@ -898,31 +823,51 @@ delayTime (DelaySTM t _) = t type USecs = Word64 --- XXX: move into GHC.IOBase from Data.IORef? -atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b -atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s - foreign import ccall unsafe "getUSecOfDay" getUSecOfDay :: IO USecs -prodding :: IORef Bool {-# NOINLINE prodding #-} -prodding = unsafePerformIO (newIORef False) +prodding :: IORef Bool +prodding = unsafePerformIO $ do + r <- newIORef False + sharedCAF r getOrSetGHCConcProddingStore + +foreign import ccall unsafe "getOrSetGHCConcProddingStore" + getOrSetGHCConcProddingStore :: Ptr a -> IO (Ptr a) prodServiceThread :: IO () prodServiceThread = do - was_set <- atomicModifyIORef prodding (\a -> (True,a)) + was_set <- readIORef prodding + writeIORef prodding True + -- no need for atomicModifyIORef, extra prods are harmless. if (not (was_set)) then wakeupIOManager else return () +-- Machinery needed to ensure that we only have one copy of certain +-- CAFs in this module even when the base package is present twice, as +-- it is when base is dynamically loaded into GHCi. The RTS keeps +-- track of the single true value of the CAF, so even when the CAFs in +-- the dynamically-loaded base package are reverted, nothing bad +-- happens. +-- +sharedCAF :: a -> (Ptr a -> IO (Ptr a)) -> IO a +sharedCAF a get_or_set = + block $ do + stable_ref <- newStablePtr a + let ref = castPtr (castStablePtrToPtr stable_ref) + ref2 <- get_or_set ref + if ref==ref2 + then return a + else do freeStablePtr stable_ref + deRefStablePtr (castPtrToStablePtr (castPtr ref2)) + #ifdef mingw32_HOST_OS -- ---------------------------------------------------------------------------- -- Windows IO manager thread -startIOManagerThread :: IO () -startIOManagerThread = do +ioManager :: IO () +ioManager = do wakeup <- c_getIOManagerEvent - forkIO $ service_loop wakeup [] - return () + service_loop wakeup [] service_loop :: HANDLE -- read end of pipe -> [DelayReq] -- current delay requests @@ -947,9 +892,7 @@ service_loop wakeup old_delays = do _ | r2 == io_MANAGER_DIE -> return True 0 -> return False -- spurious wakeup _ -> do start_console_handler (r2 `shiftR` 1); return False - if exit - then return () - else service_cont wakeup delays' + unless exit $ service_cont wakeup delays' _other -> service_cont wakeup delays' -- probably timeout @@ -977,7 +920,7 @@ start_console_handler :: Word32 -> IO () start_console_handler r = case toWin32ConsoleEvent r of Just x -> withMVar win32ConsoleHandler $ \handler -> do - forkIO (handler x) + _ <- forkIO (handler x) return () Nothing -> return () @@ -994,15 +937,8 @@ toWin32ConsoleEvent ev = win32ConsoleHandler :: MVar (ConsoleEvent -> IO ()) win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler")) --- XXX Is this actually needed? -stick :: IORef HANDLE -{-# NOINLINE stick #-} -stick = unsafePerformIO (newIORef nullPtr) - wakeupIOManager :: IO () -wakeupIOManager = do - _hdl <- readIORef stick - c_sendIOManagerEvent io_MANAGER_WAKEUP +wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP -- Walk the queue of pending delays, waking up any that have passed -- and return the smallest delay to wait for. The queue of pending @@ -1051,23 +987,21 @@ foreign import stdcall "WaitForSingleObject" -- ---------------------------------------------------------------------------- -- Unix IO manager thread, using select() -startIOManagerThread :: IO () -startIOManagerThread = do +ioManager :: IO () +ioManager = do allocaArray 2 $ \fds -> do - throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds) + throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds) rd_end <- peekElemOff fds 0 wr_end <- peekElemOff fds 1 - setNonBlockingFD wr_end -- writes happen in a signal handler, we - -- don't want them to block. + setNonBlockingFD wr_end True -- writes happen in a signal handler, we + -- don't want them to block. setCloseOnExec rd_end setCloseOnExec wr_end - writeIORef stick (fromIntegral wr_end) c_setIOManagerPipe wr_end - forkIO $ do - allocaBytes sizeofFdSet $ \readfds -> do - allocaBytes sizeofFdSet $ \writefds -> do - allocaBytes sizeofTimeVal $ \timeval -> do - service_loop (fromIntegral rd_end) readfds writefds timeval [] [] + allocaBytes sizeofFdSet $ \readfds -> do + allocaBytes sizeofFdSet $ \writefds -> do + allocaBytes sizeofTimeVal $ \timeval -> do + service_loop (fromIntegral rd_end) readfds writefds timeval [] [] return () service_loop @@ -1128,7 +1062,8 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do if b == 0 then return False else alloca $ \p -> do - c_read (fromIntegral wakeup) p 1 + warnErrnoIfMinus1_ "service_loop" $ + c_read (fromIntegral wakeup) p 1 s <- peek p case s of _ | s == io_MANAGER_WAKEUP -> return False @@ -1147,25 +1082,20 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do runHandlers' fp (fromIntegral s) return False - if exit then return () else do + unless exit $ do - atomicModifyIORef prodding (\_ -> (False,False)) + atomicModifyIORef prodding (\_ -> (False, ())) reqs' <- if wakeup_all then do wakeupAll reqs; return [] else completeRequests reqs readfds writefds [] service_loop wakeup readfds writefds ptimeval reqs' delays' -io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: CChar +io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: Word8 io_MANAGER_WAKEUP = 0xff io_MANAGER_DIE = 0xfe io_MANAGER_SYNC = 0xfd --- | the stick is for poking the IO manager with -stick :: IORef Fd -{-# NOINLINE stick #-} -stick = unsafePerformIO (newIORef 0) - {-# NOINLINE sync #-} sync :: IORef [MVar ()] sync = unsafePerformIO (newIORef []) @@ -1175,16 +1105,11 @@ syncIOManager :: IO () syncIOManager = do m <- newEmptyMVar atomicModifyIORef sync (\old -> (m:old,())) - fd <- readIORef stick - with io_MANAGER_SYNC $ \pbuf -> do - c_write (fromIntegral fd) pbuf 1; return () + c_ioManagerSync takeMVar m -wakeupIOManager :: IO () -wakeupIOManager = do - fd <- readIORef stick - with io_MANAGER_WAKEUP $ \pbuf -> do - c_write (fromIntegral fd) pbuf 1; return () +foreign import ccall unsafe "ioManagerSync" c_ioManagerSync :: IO () +foreign import ccall unsafe "ioManagerWakeup" wakeupIOManager :: IO () -- For the non-threaded RTS runHandlers :: Ptr Word8 -> Int -> IO () @@ -1204,7 +1129,19 @@ runHandlers' p_info sig = do else do handler <- unsafeReadIOArray arr int case handler of Nothing -> return () - Just (f,_) -> do forkIO (f p_info); return () + Just (f,_) -> do _ <- forkIO (f p_info) + return () + +warnErrnoIfMinus1_ :: Num a => String -> IO a -> IO () +warnErrnoIfMinus1_ what io + = do r <- io + when (r == -1) $ do + errno <- getErrno + str <- strerror errno >>= peekCString + when (r == -1) $ + debugErrLn ("Warning: " ++ what ++ " failed: " ++ str) + +foreign import ccall unsafe "string.h" strerror :: Errno -> IO (Ptr CChar) foreign import ccall "setIOManagerPipe" c_setIOManagerPipe :: CInt -> IO () @@ -1226,17 +1163,10 @@ signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic))) signal_handlers = unsafePerformIO $ do arr <- newIOArray (0,maxSig) Nothing m <- newMVar arr - block $ do - stable_ref <- newStablePtr m - let ref = castStablePtrToPtr stable_ref - ref2 <- getOrSetSignalHandlerStore ref - if ref==ref2 - then return m - else do freeStablePtr stable_ref - deRefStablePtr (castPtrToStablePtr ref2) + sharedCAF m getOrSetGHCConcSignalHandlerStore -foreign import ccall unsafe "getOrSetSignalHandlerStore" - getOrSetSignalHandlerStore :: Ptr a -> IO (Ptr a) +foreign import ccall unsafe "getOrSetGHCConcSignalHandlerStore" + getOrSetGHCConcSignalHandlerStore :: Ptr a -> IO (Ptr a) setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic)) setHandler sig handler = do @@ -1338,7 +1268,7 @@ foreign import ccall unsafe "setTimevalTicks" data CFdSet -foreign import ccall safe "select" +foreign import ccall safe "__hscore_select" c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal -> IO CInt @@ -1368,14 +1298,13 @@ foreign import ccall unsafe "sizeof_fd_set" #endif -reportStackOverflow :: IO a -reportStackOverflow = do callStackOverflowHook; return undefined +reportStackOverflow :: IO () +reportStackOverflow = callStackOverflowHook -reportError :: SomeException -> IO a +reportError :: SomeException -> IO () reportError ex = do handler <- getUncaughtExceptionHandler handler ex - return undefined -- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove -- the unsafe below. @@ -1409,13 +1338,4 @@ setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler getUncaughtExceptionHandler :: IO (SomeException -> IO ()) getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler - -withMVar :: MVar a -> (a -> IO b) -> IO b -withMVar m io = - block $ do - a <- takeMVar m - b <- catchAny (unblock (io a)) - (\e -> do putMVar m a; throw e) - putMVar m a - return b \end{code}