X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=GHC%2FConc.lhs;h=d6fdd4f4c2d336d76b9572e392bdb8d3fbe83583;hb=4b26136ab82fb1ff12e49477c4833a9586d368c5;hp=5fd03709b4317c4956d53fe918048084c8762cb6;hpb=ec3ba94b254bd444e7a1c560c1d91c4879948c69;p=haskell-directory.git diff --git a/GHC/Conc.lhs b/GHC/Conc.lhs index 5fd0370..d6fdd4f 100644 --- a/GHC/Conc.lhs +++ b/GHC/Conc.lhs @@ -14,11 +14,21 @@ -- ----------------------------------------------------------------------------- -#include "ghcconfig.h" +-- No: #hide, because bits of this module are exposed by the stm package. +-- However, we don't want this module to be the home location for the +-- bits it exports, we'd rather have Control.Concurrent and the other +-- higher level modules be the home. Hence: + +#include "Typeable.h" + +-- #not-home module GHC.Conc ( ThreadId(..) - -- Forking and suchlike + -- * Forking and suchlike + , forkIO -- :: IO a -> IO ThreadId + , forkOnIO -- :: Int -> IO a -> IO ThreadId + , childHandler -- :: Exception -> IO () , myThreadId -- :: IO ThreadId , killThread -- :: ThreadId -> IO () , throwTo -- :: ThreadId -> Exception -> IO () @@ -27,12 +37,13 @@ module GHC.Conc , yield -- :: IO () , labelThread -- :: ThreadId -> String -> IO () - -- Waiting + -- * Waiting , threadDelay -- :: Int -> IO () + , registerDelay -- :: Int -> IO (TVar Bool) , threadWaitRead -- :: Int -> IO () , threadWaitWrite -- :: Int -> IO () - -- MVars + -- * MVars , MVar -- abstract , newMVar -- :: a -> IO (MVar a) , newEmptyMVar -- :: IO (MVar a) @@ -43,18 +54,22 @@ module GHC.Conc , isEmptyMVar -- :: MVar a -> IO Bool , addMVarFinalizer -- :: MVar a -> IO () -> IO () - -- TVars + -- * TVars , STM -- abstract , atomically -- :: STM a -> IO a , retry -- :: STM a , orElse -- :: STM a -> STM a -> STM a , catchSTM -- :: STM a -> (Exception -> STM a) -> STM a + , alwaysSucceeds -- :: STM a -> STM () + , always -- :: STM Bool -> STM () , TVar -- abstract , newTVar -- :: a -> STM (TVar a) + , newTVarIO -- :: a -> STM (TVar a) , readTVar -- :: TVar a -> STM a , writeTVar -- :: a -> TVar a -> STM () , unsafeIOToSTM -- :: IO a -> STM a + -- * Miscellaneous #ifdef mingw32_HOST_OS , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) @@ -63,26 +78,36 @@ module GHC.Conc , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int) , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int) #endif + + , ensureIOManagerIsRunning ) where import System.Posix.Types +#ifndef mingw32_HOST_OS import System.Posix.Internals +#endif import Foreign import Foreign.C +#ifndef __HADDOCK__ +import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow ) +#endif + import Data.Maybe import GHC.Base import GHC.IOBase import GHC.Num ( Num(..) ) -import GHC.Real ( fromIntegral, quot ) +import GHC.Real ( fromIntegral, div ) +#ifndef mingw32_HOST_OS import GHC.Base ( Int(..) ) -import GHC.Exception ( Exception(..), AsyncException(..) ) +#endif +import GHC.Exception ( catchException, Exception(..), AsyncException(..) ) import GHC.Pack ( packCString# ) import GHC.Ptr ( Ptr(..), plusPtr, FunPtr(..) ) import GHC.STRef +import GHC.Show ( Show(..), showString ) import Data.Typeable -#include "Typeable.h" infixr 0 `par`, `pseq` \end{code} @@ -94,7 +119,7 @@ infixr 0 `par`, `pseq` %************************************************************************ \begin{code} -data ThreadId = ThreadId ThreadId# +data ThreadId = ThreadId ThreadId# deriving( Typeable ) -- ToDo: data ThreadId = ThreadId (Weak ThreadId#) -- But since ThreadId# is unlifted, the Weak type must use open -- type variables. @@ -116,10 +141,69 @@ This misfeature will hopefully be corrected at a later date. it defines 'ThreadId' as a synonym for (). -} -INSTANCE_TYPEABLE0(ThreadId,threadIdTc,"ThreadId") +instance Show ThreadId where + showsPrec d t = + showString "ThreadId " . + showsPrec d (getThreadId (id2TSO t)) + +foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt + +id2TSO :: ThreadId -> ThreadId# +id2TSO (ThreadId t) = t +foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt +-- Returns -1, 0, 1 ---forkIO has now been hoisted out into the Concurrent library. +cmpThread :: ThreadId -> ThreadId -> Ordering +cmpThread t1 t2 = + case cmp_thread (id2TSO t1) (id2TSO t2) of + -1 -> LT + 0 -> EQ + _ -> GT -- must be 1 + +instance Eq ThreadId where + t1 == t2 = + case t1 `cmpThread` t2 of + EQ -> True + _ -> False + +instance Ord ThreadId where + compare = cmpThread + +{- | +This sparks off a new thread to run the 'IO' computation passed as the +first argument, and returns the 'ThreadId' of the newly created +thread. + +The new thread will be a lightweight thread; if you want to use a foreign +library that uses thread-local storage, use 'forkOS' instead. +-} +forkIO :: IO () -> IO ThreadId +forkIO action = IO $ \ s -> + case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #) + where + action_plus = catchException action childHandler + +forkOnIO :: Int -> IO () -> IO ThreadId +forkOnIO (I# cpu) action = IO $ \ s -> + case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #) + where + action_plus = catchException action childHandler + +childHandler :: Exception -> IO () +childHandler err = catchException (real_handler err) childHandler + +real_handler :: Exception -> IO () +real_handler ex = + case ex of + -- ignore thread GC and killThread exceptions: + BlockedOnDeadMVar -> return () + BlockedIndefinitely -> return () + AsyncException ThreadKilled -> return () + + -- report all others: + AsyncException StackOverflow -> reportStackOverflow + other -> reportError other {- | 'killThread' terminates the given thread (GHC only). Any work already done by the thread isn\'t @@ -137,11 +221,33 @@ killThread tid = throwTo tid (AsyncException ThreadKilled) {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only). 'throwTo' does not return until the exception has been raised in the -target thread. The calling thread can thus be certain that the target +target thread. +The calling thread can thus be certain that the target thread has received the exception. This is a useful property to know when dealing with race conditions: eg. if there are two threads that can kill each other, it is guaranteed that only one of the threads -will get to kill the other. -} +will get to kill the other. + +If the target thread is currently making a foreign call, then the +exception will not be raised (and hence 'throwTo' will not return) +until the call has completed. This is the case regardless of whether +the call is inside a 'block' or not. + +Important note: the behaviour of 'throwTo' differs from that described in +the paper "Asynchronous exceptions in Haskell" +(). +In the paper, 'throwTo' is non-blocking; but the library implementation adopts +a more synchronous design in which 'throwTo' does not return until the exception +is received by the target thread. The trade-off is discussed in Section 8 of the paper. +Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of +the paper). + +There is currently no guarantee that the exception delivered by 'throwTo' will be +delivered at the first possible opportunity. In particular, if a thread may +unblock and then re-block exceptions (using 'unblock' and 'block') without receiving +a pending 'throwTo'. This is arguably undesirable behaviour. + + -} throwTo :: ThreadId -> Exception -> IO () throwTo (ThreadId id) ex = IO $ \ s -> case (killThread# id ex s) of s1 -> (# s1, () #) @@ -207,13 +313,14 @@ TVars are shared memory locations which support atomic memory transactions. \begin{code} +-- |A monad supporting atomic memory transactions. newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #)) -INSTANCE_TYPEABLE1(STM,stmTc,"STM" ) - unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #)) unSTM (STM a) = a +INSTANCE_TYPEABLE1(STM,stmTc,"STM") + instance Functor STM where fmap f x = x >>= (return . f) @@ -245,6 +352,15 @@ unsafeIOToSTM :: IO a -> STM a unsafeIOToSTM (IO m) = STM m -- |Perform a series of STM actions atomically. +-- +-- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. +-- Any attempt to do so will result in a runtime error. (Reason: allowing +-- this would effectively allow a transaction inside a transaction, depending +-- on exactly when the thunk is evaluated.) +-- +-- However, see 'newTVarIO', which can be called inside 'unsafePerformIO', +-- and which allows top-level TVars to be allocated. + atomically :: STM a -> IO a atomically (STM m) = IO (\s -> (atomically# m) s ) @@ -252,14 +368,15 @@ atomically (STM m) = IO (\s -> (atomically# m) s ) -- values in TVars which mean that it should not continue (e.g. the TVars -- represent a shared buffer that is now empty). The implementation may -- block the thread until one of the TVars that it has read from has been --- udpated. +-- udpated. (GHC only) retry :: STM a retry = STM $ \s# -> retry# s# --- |Compose two alternative STM actions. If the first action completes without --- retrying then it forms the result of the orElse. Otherwise, if the first --- action retries, then the second action is tried in its place. If both actions --- retry then the orElse as a whole retries. +-- |Compose two alternative STM actions (GHC only). If the first action +-- completes without retrying then it forms the result of the orElse. +-- Otherwise, if the first action retries, then the second action is +-- tried in its place. If both actions retry then the orElse as a +-- whole retries. orElse :: STM a -> STM a -> STM a orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s @@ -267,9 +384,34 @@ orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s catchSTM :: STM a -> (Exception -> STM a) -> STM a catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s +-- | Low-level primitive on which always and alwaysSucceeds are built. +-- checkInv differs form these in that (i) the invariant is not +-- checked when checkInv is called, only at the end of this and +-- subsequent transcations, (ii) the invariant failure is indicated +-- by raising an exception. +checkInv :: STM a -> STM () +checkInv (STM m) = STM (\s -> (check# m) s) + +-- | alwaysSucceeds adds a new invariant that must be true when passed +-- to alwaysSucceeds, at the end of the current transaction, and at +-- the end of every subsequent transaction. If it fails at any +-- of those points then the transaction violating it is aborted +-- and the exception raised by the invariant is propagated. +alwaysSucceeds :: STM a -> STM () +alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) + checkInv i + +-- | always is a variant of alwaysSucceeds in which the invariant is +-- expressed as an STM Bool action that must return True. Returning +-- False or raising an exception are both treated as invariant failures. +always :: STM Bool -> STM () +always i = alwaysSucceeds ( do v <- i + if (v) then return () else ( error "Transacional invariant violation" ) ) + +-- |Shared memory locations that support atomic memory transactions. data TVar a = TVar (TVar# RealWorld a) -INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar" ) +INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar") instance Eq (TVar a) where (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2# @@ -280,6 +422,15 @@ newTVar val = STM $ \s1# -> case newTVar# val s1# of (# s2#, tvar# #) -> (# s2#, TVar tvar# #) +-- |@IO@ version of 'newTVar'. This is useful for creating top-level +-- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using +-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't +-- possible. +newTVarIO :: a -> IO (TVar a) +newTVarIO val = IO $ \s1# -> + case newTVar# val s1# of + (# s2#, tvar# #) -> (# s2#, TVar tvar# #) + -- |Return the current value stored in a TVar readTVar :: TVar a -> STM a readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s# @@ -309,8 +460,6 @@ writes. \begin{code} --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a) -INSTANCE_TYPEABLE1(MVar,mvarTc,"MVar" ) - -- |Create an 'MVar' which is initially empty. newEmptyMVar :: IO (MVar a) newEmptyMVar = IO $ \ s# -> @@ -328,16 +477,34 @@ newMVar value = -- empty, 'takeMVar' will wait until it is full. After a 'takeMVar', -- the 'MVar' is left empty. -- --- If several threads are competing to take the same 'MVar', one is chosen --- to continue at random when the 'MVar' becomes full. +-- There are two further important properties of 'takeMVar': +-- +-- * 'takeMVar' is single-wakeup. That is, if there are multiple +-- threads blocked in 'takeMVar', and the 'MVar' becomes full, +-- only one thread will be woken up. The runtime guarantees that +-- the woken thread completes its 'takeMVar' operation. +-- +-- * When multiple threads are blocked on an 'MVar', they are +-- woken up in FIFO order. This is useful for providing +-- fairness properties of abstractions built using 'MVar's. +-- takeMVar :: MVar a -> IO a takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s# -- |Put a value into an 'MVar'. If the 'MVar' is currently full, -- 'putMVar' will wait until it becomes empty. -- --- If several threads are competing to fill the same 'MVar', one is --- chosen to continue at random when the 'MVar' becomes empty. +-- There are two further important properties of 'putMVar': +-- +-- * 'putMVar' is single-wakeup. That is, if there are multiple +-- threads blocked in 'putMVar', and the 'MVar' becomes empty, +-- only one thread will be woken up. The runtime guarantees that +-- the woken thread completes its 'putMVar' operation. +-- +-- * When multiple threads are blocked on an 'MVar', they are +-- woken up in FIFO order. This is useful for providing +-- fairness properties of abstractions built using 'MVar's. +-- putMVar :: MVar a -> a -> IO () putMVar (MVar mvar#) x = IO $ \ s# -> case putMVar# mvar# x s# of @@ -455,33 +622,52 @@ threadWaitWrite fd -- | Suspends the current thread for a given number of microseconds -- (GHC only). -- --- Note that the resolution used by the Haskell runtime system's --- internal timer is 1\/50 second, and 'threadDelay' will round its --- argument up to the nearest multiple of this resolution. --- -- There is no guarantee that the thread will be rescheduled promptly -- when the delay has expired, but the thread will never continue to -- run /earlier/ than specified. -- threadDelay :: Int -> IO () threadDelay time -#ifndef mingw32_HOST_OS | threaded = waitForDelayEvent time -#else - | threaded = c_Sleep (fromIntegral (time `quot` 1000)) -#endif | otherwise = IO $ \s -> case fromIntegral time of { I# time# -> case delay# time# s of { s -> (# s, () #) }} --- On Windows, we just make a safe call to 'Sleep' to implement threadDelay. -#ifdef mingw32_HOST_OS -foreign import ccall safe "Sleep" c_Sleep :: CInt -> IO () -#endif + +-- | Set the value of returned TVar to True after a given number of +-- microseconds. The caveats associated with threadDelay also apply. +-- +registerDelay :: Int -> IO (TVar Bool) +registerDelay usecs + | threaded = waitForDelayEventSTM usecs + | otherwise = error "registerDelay: requires -threaded" foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool +waitForDelayEvent :: Int -> IO () +waitForDelayEvent usecs = do + m <- newEmptyMVar + target <- calculateTarget usecs + atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ())) + prodServiceThread + takeMVar m + +-- Delays for use in STM +waitForDelayEventSTM :: Int -> IO (TVar Bool) +waitForDelayEventSTM usecs = do + t <- atomically $ newTVar False + target <- calculateTarget usecs + atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ())) + prodServiceThread + return t + +calculateTarget :: Int -> IO USecs +calculateTarget usecs = do + now <- getUSecOfDay + return $ now + (fromIntegral usecs) + + -- ---------------------------------------------------------------------------- -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay @@ -515,45 +701,188 @@ foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool -- hope we don't need to do any blocking IO between fork & exec. #ifndef mingw32_HOST_OS - data IOReq = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ()) | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ()) +#endif data DelayReq - = Delay {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ()) + = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ()) + | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool) +#ifndef mingw32_HOST_OS pendingEvents :: IORef [IOReq] +#endif pendingDelays :: IORef [DelayReq] -- could use a strict list or array here {-# NOINLINE pendingEvents #-} {-# NOINLINE pendingDelays #-} (pendingEvents,pendingDelays) = unsafePerformIO $ do - startIOServiceThread + startIOManagerThread reqs <- newIORef [] dels <- newIORef [] return (reqs, dels) -- the first time we schedule an IO request, the service thread -- will be created (cool, huh?) -startIOServiceThread :: IO () -startIOServiceThread = do +ensureIOManagerIsRunning :: IO () +ensureIOManagerIsRunning + | threaded = seq pendingEvents $ return () + | otherwise = return () + +insertDelay :: DelayReq -> [DelayReq] -> [DelayReq] +insertDelay d [] = [d] +insertDelay d1 ds@(d2 : rest) + | delayTime d1 <= delayTime d2 = d1 : ds + | otherwise = d2 : insertDelay d1 rest + +delayTime :: DelayReq -> USecs +delayTime (Delay t _) = t +delayTime (DelaySTM t _) = t + +type USecs = Word64 + +-- XXX: move into GHC.IOBase from Data.IORef? +atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b +atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s + +foreign import ccall unsafe "getUSecOfDay" + getUSecOfDay :: IO USecs + +prodding :: IORef Bool +{-# NOINLINE prodding #-} +prodding = unsafePerformIO (newIORef False) + +prodServiceThread :: IO () +prodServiceThread = do + was_set <- atomicModifyIORef prodding (\a -> (True,a)) + if (not (was_set)) then wakeupIOManager else return () + +#ifdef mingw32_HOST_OS +-- ---------------------------------------------------------------------------- +-- Windows IO manager thread + +startIOManagerThread :: IO () +startIOManagerThread = do + wakeup <- c_getIOManagerEvent + forkIO $ service_loop wakeup [] + return () + +service_loop :: HANDLE -- read end of pipe + -> [DelayReq] -- current delay requests + -> IO () + +service_loop wakeup old_delays = do + -- pick up new delay requests + new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a)) + let delays = foldr insertDelay old_delays new_delays + + now <- getUSecOfDay + (delays', timeout) <- getDelay now delays + + r <- c_WaitForSingleObject wakeup timeout + case r of + 0xffffffff -> do c_maperrno; throwErrno "service_loop" + 0 -> do + r <- c_readIOManagerEvent + exit <- + case r of + _ | r == io_MANAGER_WAKEUP -> return False + _ | r == io_MANAGER_DIE -> return True + 0 -> return False -- spurious wakeup + r -> do start_console_handler (r `shiftR` 1); return False + if exit + then return () + else service_cont wakeup delays' + + _other -> service_cont wakeup delays' -- probably timeout + +service_cont wakeup delays = do + atomicModifyIORef prodding (\_ -> (False,False)) + service_loop wakeup delays + +-- must agree with rts/win32/ThrIOManager.c +io_MANAGER_WAKEUP = 0xffffffff :: Word32 +io_MANAGER_DIE = 0xfffffffe :: Word32 + +start_console_handler :: Word32 -> IO () +start_console_handler r = do + stableptr <- peek console_handler + forkIO $ do io <- deRefStablePtr stableptr; io (fromIntegral r) + return () + +foreign import ccall "&console_handler" + console_handler :: Ptr (StablePtr (CInt -> IO ())) + +stick :: IORef HANDLE +{-# NOINLINE stick #-} +stick = unsafePerformIO (newIORef nullPtr) + +wakeupIOManager = do + hdl <- readIORef stick + c_sendIOManagerEvent io_MANAGER_WAKEUP + +-- Walk the queue of pending delays, waking up any that have passed +-- and return the smallest delay to wait for. The queue of pending +-- delays is kept ordered. +getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD) +getDelay now [] = return ([], iNFINITE) +getDelay now all@(d : rest) + = case d of + Delay time m | now >= time -> do + putMVar m () + getDelay now rest + DelaySTM time t | now >= time -> do + atomically $ writeTVar t True + getDelay now rest + _otherwise -> + -- delay is in millisecs for WaitForSingleObject + let micro_seconds = delayTime d - now + milli_seconds = (micro_seconds + 999) `div` 1000 + in return (all, fromIntegral milli_seconds) + +-- ToDo: this just duplicates part of System.Win32.Types, which isn't +-- available yet. We should move some Win32 functionality down here, +-- maybe as part of the grand reorganisation of the base package... +type HANDLE = Ptr () +type DWORD = Word32 + +iNFINITE = 0xFFFFFFFF :: DWORD -- urgh + +foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c) + c_getIOManagerEvent :: IO HANDLE + +foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c) + c_readIOManagerEvent :: IO Word32 + +foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c) + c_sendIOManagerEvent :: Word32 -> IO () + +foreign import ccall unsafe "maperrno" -- in runProcess.c + c_maperrno :: IO () + +foreign import stdcall "WaitForSingleObject" + c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD + +#else +-- ---------------------------------------------------------------------------- +-- Unix IO manager thread, using select() + +startIOManagerThread :: IO () +startIOManagerThread = do allocaArray 2 $ \fds -> do - throwErrnoIfMinus1 "startIOServiceThread" (c_pipe fds) + throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds) rd_end <- peekElemOff fds 0 wr_end <- peekElemOff fds 1 writeIORef stick (fromIntegral wr_end) - quickForkIO $ do + c_setIOManagerPipe wr_end + forkIO $ do allocaBytes sizeofFdSet $ \readfds -> do allocaBytes sizeofFdSet $ \writefds -> do allocaBytes sizeofTimeVal $ \timeval -> do service_loop (fromIntegral rd_end) readfds writefds timeval [] [] return () --- XXX: move real forkIO here from Control.Concurrent? -quickForkIO action = IO $ \s -> - case (fork# action s) of (# s1, id #) -> (# s1, ThreadId id #) - service_loop :: Fd -- listen to this for wakeup calls -> Ptr CFdSet @@ -578,61 +907,91 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do fdSet wakeup readfds maxfd <- buildFdSets 0 readfds writefds reqs - -- check the current time and wake up any thread in threadDelay whose - -- timeout has expired. Also find the timeout value for the select() call. - now <- getTicksOfDay - (delays', timeout) <- getDelay now ptimeval delays - -- perform the select() - let do_select = do - res <- c_select ((max wakeup maxfd)+1) readfds writefds + let do_select delays = do + -- check the current time and wake up any thread in + -- threadDelay whose timeout has expired. Also find the + -- timeout value for the select() call. + now <- getUSecOfDay + (delays', timeout) <- getDelay now ptimeval delays + + res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds nullPtr timeout if (res == -1) then do err <- getErrno - if err == eINTR - then do_select - else return res + case err of + _ | err == eINTR -> do_select delays' + -- EINTR: just redo the select() + _ | err == eBADF -> return (True, delays) + -- EBADF: one of the file descriptors is closed or bad, + -- we don't know which one, so wake everyone up. + _ | otherwise -> throwErrno "select" + -- otherwise (ENOMEM or EINVAL) something has gone + -- wrong; report the error. else - return res - res <- do_select - -- ToDo: check result - - b <- takeMVar prodding - if b then alloca $ \p -> do c_read (fromIntegral wakeup) p 1; return () - else return () - putMVar prodding False + return (False,delays') + + (wakeup_all,delays') <- do_select delays + + exit <- + if wakeup_all then return False + else do + b <- fdIsSet wakeup readfds + if b == 0 + then return False + else alloca $ \p -> do + c_read (fromIntegral wakeup) p 1; return () + s <- peek p + case s of + _ | s == io_MANAGER_WAKEUP -> return False + _ | s == io_MANAGER_DIE -> return True + _ -> do handler_tbl <- peek handlers + sp <- peekElemOff handler_tbl (fromIntegral s) + forkIO (do io <- deRefStablePtr sp; io) + return False + + if exit then return () else do + + atomicModifyIORef prodding (\_ -> (False,False)) + + reqs' <- if wakeup_all then do wakeupAll reqs; return [] + else completeRequests reqs readfds writefds [] - reqs' <- completeRequests reqs readfds writefds [] service_loop wakeup readfds writefds ptimeval reqs' delays' +io_MANAGER_WAKEUP = 0xff :: CChar +io_MANAGER_DIE = 0xfe :: CChar + stick :: IORef Fd {-# NOINLINE stick #-} stick = unsafePerformIO (newIORef 0) -prodding :: MVar Bool -{-# NOINLINE prodding #-} -prodding = unsafePerformIO (newMVar False) +wakeupIOManager :: IO () +wakeupIOManager = do + fd <- readIORef stick + with io_MANAGER_WAKEUP $ \pbuf -> do + c_write (fromIntegral fd) pbuf 1; return () -prodServiceThread :: IO () -prodServiceThread = do - b <- takeMVar prodding - if (not b) - then do fd <- readIORef stick - with 42 $ \pbuf -> do c_write (fromIntegral fd) pbuf 1; return () - else return () - putMVar prodding True +foreign import ccall "&signal_handlers" handlers :: Ptr (Ptr (StablePtr (IO ()))) + +foreign import ccall "setIOManagerPipe" + c_setIOManagerPipe :: CInt -> IO () -- ----------------------------------------------------------------------------- -- IO requests buildFdSets maxfd readfds writefds [] = return maxfd -buildFdSets maxfd readfds writefds (Read fd m : reqs) = do - fdSet fd readfds - buildFdSets (max maxfd fd) readfds writefds reqs -buildFdSets maxfd readfds writefds (Write fd m : reqs) = do - fdSet fd writefds - buildFdSets (max maxfd fd) readfds writefds reqs +buildFdSets maxfd readfds writefds (Read fd m : reqs) + | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range" + | otherwise = do + fdSet fd readfds + buildFdSets (max maxfd fd) readfds writefds reqs +buildFdSets maxfd readfds writefds (Write fd m : reqs) + | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range" + | otherwise = do + fdSet fd writefds + buildFdSets (max maxfd fd) readfds writefds reqs completeRequests [] _ _ reqs' = return reqs' completeRequests (Read fd m : reqs) readfds writefds reqs' = do @@ -646,6 +1005,10 @@ completeRequests (Write fd m : reqs) readfds writefds reqs' = do then do putMVar m (); completeRequests reqs readfds writefds reqs' else completeRequests reqs readfds writefds (Write fd m : reqs') +wakeupAll [] = return () +wakeupAll (Read fd m : reqs) = do putMVar m (); wakeupAll reqs +wakeupAll (Write fd m : reqs) = do putMVar m (); wakeupAll reqs + waitForReadEvent :: Fd -> IO () waitForReadEvent fd = do m <- newEmptyMVar @@ -660,55 +1023,39 @@ waitForWriteEvent fd = do prodServiceThread takeMVar m --- XXX: move into GHC.IOBase from Data.IORef? -atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b -atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s - -- ----------------------------------------------------------------------------- -- Delays -waitForDelayEvent :: Int -> IO () -waitForDelayEvent usecs = do - m <- newEmptyMVar - now <- getTicksOfDay - let target = now + usecs `quot` tick_usecs - atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ())) - prodServiceThread - takeMVar m - -- Walk the queue of pending delays, waking up any that have passed -- and return the smallest delay to wait for. The queue of pending -- delays is kept ordered. -getDelay :: Ticks -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal) +getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal) getDelay now ptimeval [] = return ([],nullPtr) -getDelay now ptimeval all@(Delay time m : rest) - | now >= time = do +getDelay now ptimeval all@(d : rest) + = case d of + Delay time m | now >= time -> do putMVar m () getDelay now ptimeval rest - | otherwise = do - setTimevalTicks ptimeval (time - now) + DelaySTM time t | now >= time -> do + atomically $ writeTVar t True + getDelay now ptimeval rest + _otherwise -> do + setTimevalTicks ptimeval (delayTime d - now) return (all,ptimeval) -insertDelay :: DelayReq -> [DelayReq] -> [DelayReq] -insertDelay d@(Delay time m) [] = [d] -insertDelay d1@(Delay time m) ds@(d2@(Delay time' m') : rest) - | time <= time' = d1 : ds - | otherwise = d2 : insertDelay d1 rest - -type Ticks = Int -tick_freq = 50 :: Ticks -- accuracy of threadDelay (ticks per sec) -tick_usecs = 1000000 `quot` tick_freq :: Int - newtype CTimeVal = CTimeVal () foreign import ccall unsafe "sizeofTimeVal" sizeofTimeVal :: Int -foreign import ccall unsafe "getTicksOfDay" - getTicksOfDay :: IO Ticks - foreign import ccall unsafe "setTimevalTicks" - setTimevalTicks :: Ptr CTimeVal -> Ticks -> IO () + setTimevalTicks :: Ptr CTimeVal -> USecs -> IO () + +{- + On Win32 we're going to have a single Pipe, and a + waitForSingleObject with the delay time. For signals, we send a + byte down the pipe just like on Unix. +-} -- ---------------------------------------------------------------------------- -- select() interface @@ -718,17 +1065,32 @@ foreign import ccall unsafe "setTimevalTicks" newtype CFdSet = CFdSet () foreign import ccall safe "select" - c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal + c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal -> IO CInt +foreign import ccall unsafe "hsFD_SETSIZE" + c_fD_SETSIZE :: CInt + +fD_SETSIZE :: Fd +fD_SETSIZE = fromIntegral c_fD_SETSIZE + foreign import ccall unsafe "hsFD_CLR" - fdClr :: Fd -> Ptr CFdSet -> IO () + c_fdClr :: CInt -> Ptr CFdSet -> IO () + +fdClr :: Fd -> Ptr CFdSet -> IO () +fdClr (Fd fd) fdset = c_fdClr fd fdset foreign import ccall unsafe "hsFD_ISSET" - fdIsSet :: Fd -> Ptr CFdSet -> IO CInt + c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt + +fdIsSet :: Fd -> Ptr CFdSet -> IO CInt +fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset foreign import ccall unsafe "hsFD_SET" - fdSet :: Fd -> Ptr CFdSet -> IO () + c_fdSet :: CInt -> Ptr CFdSet -> IO () + +fdSet :: Fd -> Ptr CFdSet -> IO () +fdSet (Fd fd) fdset = c_fdSet fd fdset foreign import ccall unsafe "hsFD_ZERO" fdZero :: Ptr CFdSet -> IO () @@ -737,4 +1099,5 @@ foreign import ccall unsafe "sizeof_fd_set" sizeofFdSet :: Int #endif + \end{code}