X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=GHC%2FConc.lhs;h=0d174578a65b55bb9f64c02ee77a0ad5da47b9e5;hb=0bf2fdab482da7a287ef09f18e7656abe62256d0;hp=96e00e01d81d57eeefc279cd21e650d1feed7dea;hpb=e4a135de27a999fe0ede5ac47a6cc34964157823;p=ghc-base.git diff --git a/GHC/Conc.lhs b/GHC/Conc.lhs index 96e00e0..0d17457 100644 --- a/GHC/Conc.lhs +++ b/GHC/Conc.lhs @@ -29,14 +29,18 @@ module GHC.Conc -- * Forking and suchlike , forkIO -- :: IO a -> IO ThreadId + , forkIOUnmasked , forkOnIO -- :: Int -> IO a -> IO ThreadId + , forkOnIOUnmasked , numCapabilities -- :: Int + , numSparks -- :: IO Int , childHandler -- :: Exception -> IO () , myThreadId -- :: IO ThreadId , killThread -- :: ThreadId -> IO () , throwTo -- :: ThreadId -> Exception -> IO () , par -- :: a -> b -> b , pseq -- :: a -> b -> b + , runSparks , yield -- :: IO () , labelThread -- :: ThreadId -> String -> IO () @@ -49,17 +53,6 @@ module GHC.Conc , threadWaitRead -- :: Int -> IO () , threadWaitWrite -- :: Int -> IO () - -- * MVars - , MVar(..) - , newMVar -- :: a -> IO (MVar a) - , newEmptyMVar -- :: IO (MVar a) - , takeMVar -- :: MVar a -> IO a - , putMVar -- :: MVar a -> a -> IO () - , tryTakeMVar -- :: MVar a -> IO (Maybe a) - , tryPutMVar -- :: MVar a -> a -> IO Bool - , isEmptyMVar -- :: MVar a -> IO Bool - , addMVarFinalizer -- :: MVar a -> IO () -> IO () - -- * TVars , STM(..) , atomically -- :: STM a -> IO a @@ -77,6 +70,7 @@ module GHC.Conc , unsafeIOToSTM -- :: IO a -> STM a -- * Miscellaneous + , withMVar #ifdef mingw32_HOST_OS , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) @@ -87,10 +81,13 @@ module GHC.Conc #endif #ifndef mingw32_HOST_OS - , signalHandlerLock + , Signal, HandlerFun, setHandler, runHandlers #endif , ensureIOManagerIsRunning +#ifndef mingw32_HOST_OS + , syncIOManager +#endif #ifdef mingw32_HOST_OS , ConsoleEvent(..) @@ -110,28 +107,43 @@ import System.Posix.Internals import Foreign import Foreign.C +#ifdef mingw32_HOST_OS +import Data.Typeable +#endif + +#ifndef mingw32_HOST_OS +import Data.Dynamic +#endif +import Control.Monad import Data.Maybe import GHC.Base -import {-# SOURCE #-} GHC.Handle -import GHC.IOBase +#ifndef mingw32_HOST_OS +import GHC.Debug +#endif +import {-# SOURCE #-} GHC.IO.Handle ( hFlush ) +import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout ) +import GHC.IO +import GHC.IO.Exception +import GHC.Exception +import GHC.IORef +import GHC.MVar import GHC.Num ( Num(..) ) import GHC.Real ( fromIntegral ) +#ifndef mingw32_HOST_OS +import GHC.IOArray +import GHC.Arr ( inRange ) +#endif #ifdef mingw32_HOST_OS import GHC.Real ( div ) -import GHC.Ptr ( plusPtr, FunPtr(..) ) +import GHC.Ptr #endif #ifdef mingw32_HOST_OS import GHC.Read ( Read ) import GHC.Enum ( Enum ) #endif -import GHC.Exception ( SomeException(..), throw ) import GHC.Pack ( packCString# ) -import GHC.Ptr ( Ptr(..) ) -import GHC.STRef import GHC.Show ( Show(..), showString ) -import Data.Typeable -import GHC.Err infixr 0 `par`, `pseq` \end{code} @@ -202,11 +214,11 @@ thread. The new thread will be a lightweight thread; if you want to use a foreign library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead. -GHC note: the new thread inherits the /blocked/ state of the parent -(see 'Control.Exception.block'). +GHC note: the new thread inherits the /masked/ state of the parent +(see 'Control.Exception.mask'). The newly created thread has an exception handler that discards the -exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and +exceptions 'BlockedIndefinitelyOnMVar', 'BlockedIndefinitelyOnSTM', and 'ThreadKilled', and passes all other exceptions to the uncaught exception handler (see 'setUncaughtExceptionHandler'). -} @@ -216,6 +228,11 @@ forkIO action = IO $ \ s -> where action_plus = catchException action childHandler +-- | Like 'forkIO', but the child thread is created with asynchronous exceptions +-- unmasked (see 'Control.Exception.mask'). +forkIOUnmasked :: IO () -> IO ThreadId +forkIOUnmasked io = forkIO (unsafeUnmask io) + {- | Like 'forkIO', but lets you specify on which CPU the thread is created. Unlike a `forkIO` thread, a thread created by `forkOnIO` @@ -235,6 +252,11 @@ forkOnIO (I# cpu) action = IO $ \ s -> where action_plus = catchException action childHandler +-- | Like 'forkOnIO', but the child thread is created with +-- asynchronous exceptions unmasked (see 'Control.Exception.mask'). +forkOnIOUnmasked :: Int -> IO () -> IO ThreadId +forkOnIOUnmasked cpu io = forkOnIO cpu (unsafeUnmask io) + -- | the value passed to the @+RTS -N@ flag. This is the number of -- Haskell threads that can run truly simultaneously at any given -- time, and is typically set to the number of physical CPU cores on @@ -244,6 +266,10 @@ numCapabilities = unsafePerformIO $ do n <- peek n_capabilities return (fromIntegral n) +-- | Returns the number of sparks currently in the local spark pool +numSparks :: IO Int +numSparks = IO $ \s -> case numSparks# s of (# s', n #) -> (# s', I# n #) + #if defined(mingw32_HOST_OS) && defined(__PIC__) foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt #else @@ -256,9 +282,9 @@ real_handler :: SomeException -> IO () real_handler se@(SomeException ex) = -- ignore thread GC and killThread exceptions: case cast ex of - Just BlockedOnDeadMVar -> return () + Just BlockedIndefinitelyOnMVar -> return () _ -> case cast ex of - Just BlockedIndefinitely -> return () + Just BlockedIndefinitelyOnSTM -> return () _ -> case cast ex of Just ThreadKilled -> return () _ -> case cast ex of @@ -266,16 +292,11 @@ real_handler se@(SomeException ex) = Just StackOverflow -> reportStackOverflow _ -> reportError se -{- | 'killThread' terminates the given thread (GHC only). -Any work already done by the thread isn\'t -lost: the computation is suspended until required by another thread. -The memory used by the thread will be garbage collected if it isn\'t -referenced from anywhere. The 'killThread' function is defined in -terms of 'throwTo': +{- | 'killThread' raises the 'ThreadKilled' exception in the given +thread (GHC only). > killThread tid = throwTo tid ThreadKilled -Killthread is a no-op if the target thread has already completed. -} killThread :: ThreadId -> IO () killThread tid = throwTo tid ThreadKilled @@ -290,26 +311,36 @@ when dealing with race conditions: eg. if there are two threads that can kill each other, it is guaranteed that only one of the threads will get to kill the other. +Whatever work the target thread was doing when the exception was +raised is not lost: the computation is suspended until required by +another thread. + If the target thread is currently making a foreign call, then the exception will not be raised (and hence 'throwTo' will not return) until the call has completed. This is the case regardless of whether -the call is inside a 'block' or not. +the call is inside a 'mask' or not. Important note: the behaviour of 'throwTo' differs from that described in the paper \"Asynchronous exceptions in Haskell\" (). In the paper, 'throwTo' is non-blocking; but the library implementation adopts a more synchronous design in which 'throwTo' does not return until the exception -is received by the target thread. The trade-off is discussed in Section 8 of the paper. -Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of -the paper). - -There is currently no guarantee that the exception delivered by 'throwTo' will be -delivered at the first possible opportunity. In particular, if a thread may -unblock and then re-block exceptions (using 'unblock' and 'block') without receiving -a pending 'throwTo'. This is arguably undesirable behaviour. - - -} +is received by the target thread. The trade-off is discussed in Section 9 of the paper. +Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of +the paper). Unlike other interruptible operations, however, 'throwTo' +is /always/ interruptible, even if it does not actually block. + +There is no guarantee that the exception will be delivered promptly, +although the runtime will endeavour to ensure that arbitrary +delays don't occur. In GHC, an exception can only be raised when a +thread reaches a /safe point/, where a safe point is where memory +allocation occurs. Some loops do not perform any memory allocation +inside the loop and therefore cannot be interrupted by a 'throwTo'. + +Blocked 'throwTo' is fair: if multiple threads are trying to throw an +exception to the same target thread, they will succeed in FIFO order. + + -} throwTo :: Exception e => ThreadId -> e -> IO () throwTo (ThreadId tid) ex = IO $ \ s -> case (killThread# tid (toException ex) s) of s1 -> (# s1, () #) @@ -340,8 +371,8 @@ Other applications like the graphical Concurrent Haskell Debugger labelThread :: ThreadId -> String -> IO () labelThread (ThreadId t) str = IO $ \ s -> - let ps = packCString# str - adr = byteArrayContents# ps in + let !ps = packCString# str + !adr = byteArrayContents# ps in case (labelThread# t adr s) of s1 -> (# s1, () #) -- Nota Bene: 'pseq' used to be 'seq' @@ -363,6 +394,13 @@ pseq x y = x `seq` lazy y par :: a -> b -> b par x y = case (par# x) of { _ -> lazy y } +-- | Internal function used by the RTS to run sparks. +runSparks :: IO () +runSparks = IO loop + where loop s = case getSpark# s of + (# s', n, p #) -> + if n ==# 0# then (# s', () #) + else p `seq` loop s' data BlockReason = BlockedOnMVar @@ -456,6 +494,10 @@ thenSTM (STM m) k = STM ( \s -> returnSTM :: a -> STM a returnSTM x = STM (\s -> (# s, x #)) +instance MonadPlus STM where + mzero = retry + mplus = orElse + -- | Unsafely performs IO in the STM monad. Beware: this is a highly -- dangerous thing to do. -- @@ -526,7 +568,7 @@ checkInv (STM m) = STM (\s -> (check# m) s) -- of those points then the transaction violating it is aborted -- and the exception raised by the invariant is propagated. alwaysSucceeds :: STM a -> STM () -alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) +alwaysSucceeds i = do ( i >> retry ) `orElse` ( return () ) checkInv i -- | always is a variant of alwaysSucceeds in which the invariant is @@ -581,119 +623,27 @@ writeTVar (TVar tvar#) val = STM $ \s1# -> \end{code} -%************************************************************************ -%* * -\subsection[mvars]{M-Structures} -%* * -%************************************************************************ - -M-Vars are rendezvous points for concurrent threads. They begin -empty, and any attempt to read an empty M-Var blocks. When an M-Var -is written, a single blocked thread may be freed. Reading an M-Var -toggles its state from full back to empty. Therefore, any value -written to an M-Var may only be read once. Multiple reads and writes -are allowed, but there must be at least one read between any two -writes. +MVar utilities \begin{code} ---Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a) - --- |Create an 'MVar' which is initially empty. -newEmptyMVar :: IO (MVar a) -newEmptyMVar = IO $ \ s# -> - case newMVar# s# of - (# s2#, svar# #) -> (# s2#, MVar svar# #) - --- |Create an 'MVar' which contains the supplied value. -newMVar :: a -> IO (MVar a) -newMVar value = - newEmptyMVar >>= \ mvar -> - putMVar mvar value >> - return mvar - --- |Return the contents of the 'MVar'. If the 'MVar' is currently --- empty, 'takeMVar' will wait until it is full. After a 'takeMVar', --- the 'MVar' is left empty. --- --- There are two further important properties of 'takeMVar': --- --- * 'takeMVar' is single-wakeup. That is, if there are multiple --- threads blocked in 'takeMVar', and the 'MVar' becomes full, --- only one thread will be woken up. The runtime guarantees that --- the woken thread completes its 'takeMVar' operation. --- --- * When multiple threads are blocked on an 'MVar', they are --- woken up in FIFO order. This is useful for providing --- fairness properties of abstractions built using 'MVar's. --- -takeMVar :: MVar a -> IO a -takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s# - --- |Put a value into an 'MVar'. If the 'MVar' is currently full, --- 'putMVar' will wait until it becomes empty. --- --- There are two further important properties of 'putMVar': --- --- * 'putMVar' is single-wakeup. That is, if there are multiple --- threads blocked in 'putMVar', and the 'MVar' becomes empty, --- only one thread will be woken up. The runtime guarantees that --- the woken thread completes its 'putMVar' operation. --- --- * When multiple threads are blocked on an 'MVar', they are --- woken up in FIFO order. This is useful for providing --- fairness properties of abstractions built using 'MVar's. --- -putMVar :: MVar a -> a -> IO () -putMVar (MVar mvar#) x = IO $ \ s# -> - case putMVar# mvar# x s# of - s2# -> (# s2#, () #) - --- |A non-blocking version of 'takeMVar'. The 'tryTakeMVar' function --- returns immediately, with 'Nothing' if the 'MVar' was empty, or --- @'Just' a@ if the 'MVar' was full with contents @a@. After 'tryTakeMVar', --- the 'MVar' is left empty. -tryTakeMVar :: MVar a -> IO (Maybe a) -tryTakeMVar (MVar m) = IO $ \ s -> - case tryTakeMVar# m s of - (# s', 0#, _ #) -> (# s', Nothing #) -- MVar is empty - (# s', _, a #) -> (# s', Just a #) -- MVar is full - --- |A non-blocking version of 'putMVar'. The 'tryPutMVar' function --- attempts to put the value @a@ into the 'MVar', returning 'True' if --- it was successful, or 'False' otherwise. -tryPutMVar :: MVar a -> a -> IO Bool -tryPutMVar (MVar mvar#) x = IO $ \ s# -> - case tryPutMVar# mvar# x s# of - (# s, 0# #) -> (# s, False #) - (# s, _ #) -> (# s, True #) - --- |Check whether a given 'MVar' is empty. --- --- Notice that the boolean value returned is just a snapshot of --- the state of the MVar. By the time you get to react on its result, --- the MVar may have been filled (or emptied) - so be extremely --- careful when using this operation. Use 'tryTakeMVar' instead if possible. -isEmptyMVar :: MVar a -> IO Bool -isEmptyMVar (MVar mv#) = IO $ \ s# -> - case isEmptyMVar# mv# s# of - (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #) - --- |Add a finalizer to an 'MVar' (GHC only). See "Foreign.ForeignPtr" and --- "System.Mem.Weak" for more about finalizers. -addMVarFinalizer :: MVar a -> IO () -> IO () -addMVarFinalizer (MVar m) finalizer = - IO $ \s -> case mkWeak# m () finalizer s of { (# s1, _ #) -> (# s1, () #) } - withMVar :: MVar a -> (a -> IO b) -> IO b withMVar m io = - block $ do + mask $ \restore -> do a <- takeMVar m - b <- catchAny (unblock (io a)) + b <- catchAny (restore (io a)) (\e -> do putMVar m a; throw e) putMVar m a return b -\end{code} +modifyMVar_ :: MVar a -> (a -> IO a) -> IO () +modifyMVar_ m io = + mask $ \restore -> do + a <- takeMVar m + a' <- catchAny (restore (io a)) + (\e -> do putMVar m a; throw e) + putMVar m a' + return () +\end{code} %************************************************************************ %* * @@ -830,23 +780,6 @@ calculateTarget usecs = do -- around the scheduler loop. Furthermore, the scheduler can be simplified -- by not having to check for completed IO requests. --- Issues, possible problems: --- --- - we might want bound threads to just do the blocking --- operation rather than communicating with the IO manager --- thread. This would prevent simgle-threaded programs which do --- IO from requiring multiple OS threads. However, it would also --- prevent bound threads waiting on IO from being killed or sent --- exceptions. --- --- - Apprently exec() doesn't work on Linux in a multithreaded program. --- I couldn't repeat this. --- --- - How do we handle signal delivery in the multithreaded RTS? --- --- - forkProcess will kill the IO manager thread. Let's just --- hope we don't need to do any blocking IO between fork & exec. - #ifndef mingw32_HOST_OS data IOReq = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ()) @@ -858,25 +791,52 @@ data DelayReq | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool) #ifndef mingw32_HOST_OS +{-# NOINLINE pendingEvents #-} pendingEvents :: IORef [IOReq] +pendingEvents = unsafePerformIO $ do + m <- newIORef [] + sharedCAF m getOrSetGHCConcPendingEventsStore + +foreign import ccall unsafe "getOrSetGHCConcPendingEventsStore" + getOrSetGHCConcPendingEventsStore :: Ptr a -> IO (Ptr a) #endif -pendingDelays :: IORef [DelayReq] - -- could use a strict list or array here -{-# NOINLINE pendingEvents #-} + {-# NOINLINE pendingDelays #-} -(pendingEvents,pendingDelays) = unsafePerformIO $ do - startIOManagerThread - reqs <- newIORef [] - dels <- newIORef [] - return (reqs, dels) - -- the first time we schedule an IO request, the service thread - -- will be created (cool, huh?) +pendingDelays :: IORef [DelayReq] +pendingDelays = unsafePerformIO $ do + m <- newIORef [] + sharedCAF m getOrSetGHCConcPendingDelaysStore + +foreign import ccall unsafe "getOrSetGHCConcPendingDelaysStore" + getOrSetGHCConcPendingDelaysStore :: Ptr a -> IO (Ptr a) + +{-# NOINLINE ioManagerThread #-} +ioManagerThread :: MVar (Maybe ThreadId) +ioManagerThread = unsafePerformIO $ do + m <- newMVar Nothing + sharedCAF m getOrSetGHCConcIOManagerThreadStore + +foreign import ccall unsafe "getOrSetGHCConcIOManagerThreadStore" + getOrSetGHCConcIOManagerThreadStore :: Ptr a -> IO (Ptr a) ensureIOManagerIsRunning :: IO () ensureIOManagerIsRunning - | threaded = seq pendingEvents $ return () + | threaded = startIOManagerThread | otherwise = return () +startIOManagerThread :: IO () +startIOManagerThread = do + modifyMVar_ ioManagerThread $ \old -> do + let create = do t <- forkIO ioManager; return (Just t) + case old of + Nothing -> create + Just t -> do + s <- threadStatus t + case s of + ThreadFinished -> create + ThreadDied -> create + _other -> return (Just t) + insertDelay :: DelayReq -> [DelayReq] -> [DelayReq] insertDelay d [] = [d] insertDelay d1 ds@(d2 : rest) @@ -889,31 +849,52 @@ delayTime (DelaySTM t _) = t type USecs = Word64 --- XXX: move into GHC.IOBase from Data.IORef? -atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b -atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s - foreign import ccall unsafe "getUSecOfDay" getUSecOfDay :: IO USecs -prodding :: IORef Bool {-# NOINLINE prodding #-} -prodding = unsafePerformIO (newIORef False) +prodding :: IORef Bool +prodding = unsafePerformIO $ do + r <- newIORef False + sharedCAF r getOrSetGHCConcProddingStore + +foreign import ccall unsafe "getOrSetGHCConcProddingStore" + getOrSetGHCConcProddingStore :: Ptr a -> IO (Ptr a) prodServiceThread :: IO () prodServiceThread = do - was_set <- atomicModifyIORef prodding (\a -> (True,a)) - if (not (was_set)) then wakeupIOManager else return () + -- NB. use atomicModifyIORef here, otherwise there are race + -- conditions in which prodding is left at True but the server is + -- blocked in select(). + was_set <- atomicModifyIORef prodding $ \b -> (True,b) + unless was_set wakeupIOManager + +-- Machinery needed to ensure that we only have one copy of certain +-- CAFs in this module even when the base package is present twice, as +-- it is when base is dynamically loaded into GHCi. The RTS keeps +-- track of the single true value of the CAF, so even when the CAFs in +-- the dynamically-loaded base package are reverted, nothing bad +-- happens. +-- +sharedCAF :: a -> (Ptr a -> IO (Ptr a)) -> IO a +sharedCAF a get_or_set = + mask_ $ do + stable_ref <- newStablePtr a + let ref = castPtr (castStablePtrToPtr stable_ref) + ref2 <- get_or_set ref + if ref==ref2 + then return a + else do freeStablePtr stable_ref + deRefStablePtr (castPtrToStablePtr (castPtr ref2)) #ifdef mingw32_HOST_OS -- ---------------------------------------------------------------------------- -- Windows IO manager thread -startIOManagerThread :: IO () -startIOManagerThread = do +ioManager :: IO () +ioManager = do wakeup <- c_getIOManagerEvent - forkIO $ service_loop wakeup [] - return () + service_loop wakeup [] service_loop :: HANDLE -- read end of pipe -> [DelayReq] -- current delay requests @@ -938,15 +919,14 @@ service_loop wakeup old_delays = do _ | r2 == io_MANAGER_DIE -> return True 0 -> return False -- spurious wakeup _ -> do start_console_handler (r2 `shiftR` 1); return False - if exit - then return () - else service_cont wakeup delays' + unless exit $ service_cont wakeup delays' _other -> service_cont wakeup delays' -- probably timeout service_cont :: HANDLE -> [DelayReq] -> IO () service_cont wakeup delays = do - atomicModifyIORef prodding (\_ -> (False,False)) + r <- atomicModifyIORef prodding (\_ -> (False,False)) + r `seq` return () -- avoid space leak service_loop wakeup delays -- must agree with rts/win32/ThrIOManager.c @@ -967,7 +947,7 @@ start_console_handler :: Word32 -> IO () start_console_handler r = case toWin32ConsoleEvent r of Just x -> withMVar win32ConsoleHandler $ \handler -> do - forkIO (handler x) + _ <- forkIO (handler x) return () Nothing -> return () @@ -984,15 +964,8 @@ toWin32ConsoleEvent ev = win32ConsoleHandler :: MVar (ConsoleEvent -> IO ()) win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler")) --- XXX Is this actually needed? -stick :: IORef HANDLE -{-# NOINLINE stick #-} -stick = unsafePerformIO (newIORef nullPtr) - wakeupIOManager :: IO () -wakeupIOManager = do - _hdl <- readIORef stick - c_sendIOManagerEvent io_MANAGER_WAKEUP +wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP -- Walk the queue of pending delays, waking up any that have passed -- and return the smallest delay to wait for. The queue of pending @@ -1041,19 +1014,21 @@ foreign import stdcall "WaitForSingleObject" -- ---------------------------------------------------------------------------- -- Unix IO manager thread, using select() -startIOManagerThread :: IO () -startIOManagerThread = do +ioManager :: IO () +ioManager = do allocaArray 2 $ \fds -> do - throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds) + throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds) rd_end <- peekElemOff fds 0 wr_end <- peekElemOff fds 1 - writeIORef stick (fromIntegral wr_end) + setNonBlockingFD wr_end True -- writes happen in a signal handler, we + -- don't want them to block. + setCloseOnExec rd_end + setCloseOnExec wr_end c_setIOManagerPipe wr_end - forkIO $ do - allocaBytes sizeofFdSet $ \readfds -> do - allocaBytes sizeofFdSet $ \writefds -> do - allocaBytes sizeofTimeVal $ \timeval -> do - service_loop (fromIntegral rd_end) readfds writefds timeval [] [] + allocaBytes sizeofFdSet $ \readfds -> do + allocaBytes sizeofFdSet $ \writefds -> do + allocaBytes sizeofTimeVal $ \timeval -> do + service_loop (fromIntegral rd_end) readfds writefds timeval [] [] return () service_loop @@ -1066,6 +1041,17 @@ service_loop -> IO () service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do + -- reset prodding before we look at the new requests. If a new + -- client arrives after this point they will send a wakup which will + -- cause the server to loop around again, so we can be sure to not + -- miss any requests. + -- + -- NB. it's important to do this in the *first* iteration of + -- service_loop, rather than after calling select(), since a client + -- may have set prodding to True without sending a wakeup byte down + -- the pipe, because the pipe wasn't set up. + atomicModifyIORef prodding (\_ -> (False, ())) + -- pick up new IO requests new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a)) let reqs = new_reqs ++ old_reqs @@ -1114,52 +1100,120 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do if b == 0 then return False else alloca $ \p -> do - c_read (fromIntegral wakeup) p 1; return () + warnErrnoIfMinus1_ "service_loop" $ + c_read (fromIntegral wakeup) p 1 s <- peek p case s of _ | s == io_MANAGER_WAKEUP -> return False _ | s == io_MANAGER_DIE -> return True - _ -> withMVar signalHandlerLock $ \_ -> do - handler_tbl <- peek handlers - sp <- peekElemOff handler_tbl (fromIntegral s) - io <- deRefStablePtr sp - forkIO io - return False - - if exit then return () else do - - atomicModifyIORef prodding (\_ -> (False,False)) + _ | s == io_MANAGER_SYNC -> do + mvars <- readIORef sync + mapM_ (flip putMVar ()) mvars + return False + _ -> do + fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t) + withForeignPtr fp $ \p_siginfo -> do + r <- c_read (fromIntegral wakeup) (castPtr p_siginfo) + sizeof_siginfo_t + when (r /= fromIntegral sizeof_siginfo_t) $ + error "failed to read siginfo_t" + runHandlers' fp (fromIntegral s) + return False + + unless exit $ do reqs' <- if wakeup_all then do wakeupAll reqs; return [] else completeRequests reqs readfds writefds [] service_loop wakeup readfds writefds ptimeval reqs' delays' -io_MANAGER_WAKEUP, io_MANAGER_DIE :: CChar +io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: Word8 io_MANAGER_WAKEUP = 0xff io_MANAGER_DIE = 0xfe +io_MANAGER_SYNC = 0xfd -stick :: IORef Fd -{-# NOINLINE stick #-} -stick = unsafePerformIO (newIORef 0) +{-# NOINLINE sync #-} +sync :: IORef [MVar ()] +sync = unsafePerformIO (newIORef []) -wakeupIOManager :: IO () -wakeupIOManager = do - fd <- readIORef stick - with io_MANAGER_WAKEUP $ \pbuf -> do - c_write (fromIntegral fd) pbuf 1; return () - --- Lock used to protect concurrent access to signal_handlers. Symptom of --- this race condition is #1922, although that bug was on Windows a similar --- bug also exists on Unix. -signalHandlerLock :: MVar () -signalHandlerLock = unsafePerformIO (newMVar ()) +-- waits for the IO manager to drain the pipe +syncIOManager :: IO () +syncIOManager = do + m <- newEmptyMVar + atomicModifyIORef sync (\old -> (m:old,())) + c_ioManagerSync + takeMVar m -foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ()))) +foreign import ccall unsafe "ioManagerSync" c_ioManagerSync :: IO () +foreign import ccall unsafe "ioManagerWakeup" wakeupIOManager :: IO () + +-- For the non-threaded RTS +runHandlers :: Ptr Word8 -> Int -> IO () +runHandlers p_info sig = do + fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t) + withForeignPtr fp $ \p -> do + copyBytes p p_info (fromIntegral sizeof_siginfo_t) + free p_info + runHandlers' fp (fromIntegral sig) + +runHandlers' :: ForeignPtr Word8 -> Signal -> IO () +runHandlers' p_info sig = do + let int = fromIntegral sig + withMVar signal_handlers $ \arr -> + if not (inRange (boundsIOArray arr) int) + then return () + else do handler <- unsafeReadIOArray arr int + case handler of + Nothing -> return () + Just (f,_) -> do _ <- forkIO (f p_info) + return () + +warnErrnoIfMinus1_ :: Num a => String -> IO a -> IO () +warnErrnoIfMinus1_ what io + = do r <- io + when (r == -1) $ do + errno <- getErrno + str <- strerror errno >>= peekCString + when (r == -1) $ + debugErrLn ("Warning: " ++ what ++ " failed: " ++ str) + +foreign import ccall unsafe "string.h" strerror :: Errno -> IO (Ptr CChar) foreign import ccall "setIOManagerPipe" c_setIOManagerPipe :: CInt -> IO () +foreign import ccall "__hscore_sizeof_siginfo_t" + sizeof_siginfo_t :: CSize + +type Signal = CInt + +maxSig = 64 :: Int + +type HandlerFun = ForeignPtr Word8 -> IO () + +-- Lock used to protect concurrent access to signal_handlers. Symptom of +-- this race condition is #1922, although that bug was on Windows a similar +-- bug also exists on Unix. +{-# NOINLINE signal_handlers #-} +signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic))) +signal_handlers = unsafePerformIO $ do + arr <- newIOArray (0,maxSig) Nothing + m <- newMVar arr + sharedCAF m getOrSetGHCConcSignalHandlerStore + +foreign import ccall unsafe "getOrSetGHCConcSignalHandlerStore" + getOrSetGHCConcSignalHandlerStore :: Ptr a -> IO (Ptr a) + +setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic)) +setHandler sig handler = do + let int = fromIntegral sig + withMVar signal_handlers $ \arr -> + if not (inRange (boundsIOArray arr) int) + then error "GHC.Conc.setHandler: signal out of range" + else do old <- unsafeReadIOArray arr int + unsafeWriteIOArray arr int handler + return old + -- ----------------------------------------------------------------------------- -- IO requests @@ -1250,7 +1304,7 @@ foreign import ccall unsafe "setTimevalTicks" data CFdSet -foreign import ccall safe "select" +foreign import ccall safe "__hscore_select" c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal -> IO CInt @@ -1280,14 +1334,13 @@ foreign import ccall unsafe "sizeof_fd_set" #endif -reportStackOverflow :: IO a -reportStackOverflow = do callStackOverflowHook; return undefined +reportStackOverflow :: IO () +reportStackOverflow = callStackOverflowHook -reportError :: SomeException -> IO a +reportError :: SomeException -> IO () reportError ex = do handler <- getUncaughtExceptionHandler handler ex - return undefined -- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove -- the unsafe below. @@ -1320,4 +1373,5 @@ setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler getUncaughtExceptionHandler :: IO (SomeException -> IO ()) getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler + \end{code}