From: Simon Marlow Date: Tue, 10 Aug 2010 08:22:48 +0000 (+0000) Subject: Integrated new I/O manager X-Git-Url: http://git.megacz.com/?p=ghc-base.git;a=commitdiff_plain;h=9520c5735e69668a33013c36f85152a1ef656b8d Integrated new I/O manager (patch originally by Johan Tibell , minor merging by me) --- diff --git a/Control/Exception/Base.hs b/Control/Exception/Base.hs index 525dc6a..1dc668b 100644 --- a/Control/Exception/Base.hs +++ b/Control/Exception/Base.hs @@ -120,7 +120,7 @@ import GHC.IO.Exception import GHC.Exception import GHC.Show -- import GHC.Exception hiding ( Exception ) -import GHC.Conc +import GHC.Conc.Sync #endif #ifdef __HUGS__ diff --git a/GHC/Conc.lhs b/GHC/Conc.lhs index 0d17457..2e3247f 100644 --- a/GHC/Conc.lhs +++ b/GHC/Conc.lhs @@ -33,7 +33,7 @@ module GHC.Conc , forkOnIO -- :: Int -> IO a -> IO ThreadId , forkOnIOUnmasked , numCapabilities -- :: Int - , numSparks -- :: IO Int + , numSparks -- :: IO Int , childHandler -- :: Exception -> IO () , myThreadId -- :: IO ThreadId , killThread -- :: ThreadId -> IO () @@ -85,9 +85,6 @@ module GHC.Conc #endif , ensureIOManagerIsRunning -#ifndef mingw32_HOST_OS - , syncIOManager -#endif #ifdef mingw32_HOST_OS , ConsoleEvent(..) @@ -100,1278 +97,11 @@ module GHC.Conc , reportError, reportStackOverflow ) where -import System.Posix.Types -#ifndef mingw32_HOST_OS -import System.Posix.Internals -#endif -import Foreign -import Foreign.C - -#ifdef mingw32_HOST_OS -import Data.Typeable -#endif - -#ifndef mingw32_HOST_OS -import Data.Dynamic -#endif -import Control.Monad -import Data.Maybe +import GHC.Conc.IO +import GHC.Conc.Sync -import GHC.Base -#ifndef mingw32_HOST_OS -import GHC.Debug -#endif -import {-# SOURCE #-} GHC.IO.Handle ( hFlush ) -import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout ) -import GHC.IO -import GHC.IO.Exception -import GHC.Exception -import GHC.IORef -import GHC.MVar -import GHC.Num ( Num(..) ) -import GHC.Real ( fromIntegral ) #ifndef mingw32_HOST_OS -import GHC.IOArray -import GHC.Arr ( inRange ) -#endif -#ifdef mingw32_HOST_OS -import GHC.Real ( div ) -import GHC.Ptr -#endif -#ifdef mingw32_HOST_OS -import GHC.Read ( Read ) -import GHC.Enum ( Enum ) -#endif -import GHC.Pack ( packCString# ) -import GHC.Show ( Show(..), showString ) - -infixr 0 `par`, `pseq` -\end{code} - -%************************************************************************ -%* * -\subsection{@ThreadId@, @par@, and @fork@} -%* * -%************************************************************************ - -\begin{code} -data ThreadId = ThreadId ThreadId# deriving( Typeable ) --- ToDo: data ThreadId = ThreadId (Weak ThreadId#) --- But since ThreadId# is unlifted, the Weak type must use open --- type variables. -{- ^ -A 'ThreadId' is an abstract type representing a handle to a thread. -'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where -the 'Ord' instance implements an arbitrary total ordering over -'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued -'ThreadId' to string form; showing a 'ThreadId' value is occasionally -useful when debugging or diagnosing the behaviour of a concurrent -program. - -/Note/: in GHC, if you have a 'ThreadId', you essentially have -a pointer to the thread itself. This means the thread itself can\'t be -garbage collected until you drop the 'ThreadId'. -This misfeature will hopefully be corrected at a later date. - -/Note/: Hugs does not provide any operations on other threads; -it defines 'ThreadId' as a synonym for (). --} - -instance Show ThreadId where - showsPrec d t = - showString "ThreadId " . - showsPrec d (getThreadId (id2TSO t)) - -foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt - -id2TSO :: ThreadId -> ThreadId# -id2TSO (ThreadId t) = t - -foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt --- Returns -1, 0, 1 - -cmpThread :: ThreadId -> ThreadId -> Ordering -cmpThread t1 t2 = - case cmp_thread (id2TSO t1) (id2TSO t2) of - -1 -> LT - 0 -> EQ - _ -> GT -- must be 1 - -instance Eq ThreadId where - t1 == t2 = - case t1 `cmpThread` t2 of - EQ -> True - _ -> False - -instance Ord ThreadId where - compare = cmpThread - -{- | -Sparks off a new thread to run the 'IO' computation passed as the -first argument, and returns the 'ThreadId' of the newly created -thread. - -The new thread will be a lightweight thread; if you want to use a foreign -library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead. - -GHC note: the new thread inherits the /masked/ state of the parent -(see 'Control.Exception.mask'). - -The newly created thread has an exception handler that discards the -exceptions 'BlockedIndefinitelyOnMVar', 'BlockedIndefinitelyOnSTM', and -'ThreadKilled', and passes all other exceptions to the uncaught -exception handler (see 'setUncaughtExceptionHandler'). --} -forkIO :: IO () -> IO ThreadId -forkIO action = IO $ \ s -> - case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #) - where - action_plus = catchException action childHandler - --- | Like 'forkIO', but the child thread is created with asynchronous exceptions --- unmasked (see 'Control.Exception.mask'). -forkIOUnmasked :: IO () -> IO ThreadId -forkIOUnmasked io = forkIO (unsafeUnmask io) - -{- | -Like 'forkIO', but lets you specify on which CPU the thread is -created. Unlike a `forkIO` thread, a thread created by `forkOnIO` -will stay on the same CPU for its entire lifetime (`forkIO` threads -can migrate between CPUs according to the scheduling policy). -`forkOnIO` is useful for overriding the scheduling policy when you -know in advance how best to distribute the threads. - -The `Int` argument specifies the CPU number; it is interpreted modulo -'numCapabilities' (note that it actually specifies a capability number -rather than a CPU number, but to a first approximation the two are -equivalent). --} -forkOnIO :: Int -> IO () -> IO ThreadId -forkOnIO (I# cpu) action = IO $ \ s -> - case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #) - where - action_plus = catchException action childHandler - --- | Like 'forkOnIO', but the child thread is created with --- asynchronous exceptions unmasked (see 'Control.Exception.mask'). -forkOnIOUnmasked :: Int -> IO () -> IO ThreadId -forkOnIOUnmasked cpu io = forkOnIO cpu (unsafeUnmask io) - --- | the value passed to the @+RTS -N@ flag. This is the number of --- Haskell threads that can run truly simultaneously at any given --- time, and is typically set to the number of physical CPU cores on --- the machine. -numCapabilities :: Int -numCapabilities = unsafePerformIO $ do - n <- peek n_capabilities - return (fromIntegral n) - --- | Returns the number of sparks currently in the local spark pool -numSparks :: IO Int -numSparks = IO $ \s -> case numSparks# s of (# s', n #) -> (# s', I# n #) - -#if defined(mingw32_HOST_OS) && defined(__PIC__) -foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt -#else -foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt -#endif -childHandler :: SomeException -> IO () -childHandler err = catchException (real_handler err) childHandler - -real_handler :: SomeException -> IO () -real_handler se@(SomeException ex) = - -- ignore thread GC and killThread exceptions: - case cast ex of - Just BlockedIndefinitelyOnMVar -> return () - _ -> case cast ex of - Just BlockedIndefinitelyOnSTM -> return () - _ -> case cast ex of - Just ThreadKilled -> return () - _ -> case cast ex of - -- report all others: - Just StackOverflow -> reportStackOverflow - _ -> reportError se - -{- | 'killThread' raises the 'ThreadKilled' exception in the given -thread (GHC only). - -> killThread tid = throwTo tid ThreadKilled - --} -killThread :: ThreadId -> IO () -killThread tid = throwTo tid ThreadKilled - -{- | 'throwTo' raises an arbitrary exception in the target thread (GHC only). - -'throwTo' does not return until the exception has been raised in the -target thread. -The calling thread can thus be certain that the target -thread has received the exception. This is a useful property to know -when dealing with race conditions: eg. if there are two threads that -can kill each other, it is guaranteed that only one of the threads -will get to kill the other. - -Whatever work the target thread was doing when the exception was -raised is not lost: the computation is suspended until required by -another thread. - -If the target thread is currently making a foreign call, then the -exception will not be raised (and hence 'throwTo' will not return) -until the call has completed. This is the case regardless of whether -the call is inside a 'mask' or not. - -Important note: the behaviour of 'throwTo' differs from that described in -the paper \"Asynchronous exceptions in Haskell\" -(). -In the paper, 'throwTo' is non-blocking; but the library implementation adopts -a more synchronous design in which 'throwTo' does not return until the exception -is received by the target thread. The trade-off is discussed in Section 9 of the paper. -Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of -the paper). Unlike other interruptible operations, however, 'throwTo' -is /always/ interruptible, even if it does not actually block. - -There is no guarantee that the exception will be delivered promptly, -although the runtime will endeavour to ensure that arbitrary -delays don't occur. In GHC, an exception can only be raised when a -thread reaches a /safe point/, where a safe point is where memory -allocation occurs. Some loops do not perform any memory allocation -inside the loop and therefore cannot be interrupted by a 'throwTo'. - -Blocked 'throwTo' is fair: if multiple threads are trying to throw an -exception to the same target thread, they will succeed in FIFO order. - - -} -throwTo :: Exception e => ThreadId -> e -> IO () -throwTo (ThreadId tid) ex = IO $ \ s -> - case (killThread# tid (toException ex) s) of s1 -> (# s1, () #) - --- | Returns the 'ThreadId' of the calling thread (GHC only). -myThreadId :: IO ThreadId -myThreadId = IO $ \s -> - case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #) - - --- |The 'yield' action allows (forces, in a co-operative multitasking --- implementation) a context-switch to any other currently runnable --- threads (if any), and is occasionally useful when implementing --- concurrency abstractions. -yield :: IO () -yield = IO $ \s -> - case (yield# s) of s1 -> (# s1, () #) - -{- | 'labelThread' stores a string as identifier for this thread if -you built a RTS with debugging support. This identifier will be used in -the debugging output to make distinction of different threads easier -(otherwise you only have the thread state object\'s address in the heap). - -Other applications like the graphical Concurrent Haskell Debugger -() may choose to overload -'labelThread' for their purposes as well. --} - -labelThread :: ThreadId -> String -> IO () -labelThread (ThreadId t) str = IO $ \ s -> - let !ps = packCString# str - !adr = byteArrayContents# ps in - case (labelThread# t adr s) of s1 -> (# s1, () #) - --- Nota Bene: 'pseq' used to be 'seq' --- but 'seq' is now defined in PrelGHC --- --- "pseq" is defined a bit weirdly (see below) --- --- The reason for the strange "lazy" call is that --- it fools the compiler into thinking that pseq and par are non-strict in --- their second argument (even if it inlines pseq at the call site). --- If it thinks pseq is strict in "y", then it often evaluates --- "y" before "x", which is totally wrong. - -{-# INLINE pseq #-} -pseq :: a -> b -> b -pseq x y = x `seq` lazy y - -{-# INLINE par #-} -par :: a -> b -> b -par x y = case (par# x) of { _ -> lazy y } - --- | Internal function used by the RTS to run sparks. -runSparks :: IO () -runSparks = IO loop - where loop s = case getSpark# s of - (# s', n, p #) -> - if n ==# 0# then (# s', () #) - else p `seq` loop s' - -data BlockReason - = BlockedOnMVar - -- ^blocked on on 'MVar' - | BlockedOnBlackHole - -- ^blocked on a computation in progress by another thread - | BlockedOnException - -- ^blocked in 'throwTo' - | BlockedOnSTM - -- ^blocked in 'retry' in an STM transaction - | BlockedOnForeignCall - -- ^currently in a foreign call - | BlockedOnOther - -- ^blocked on some other resource. Without @-threaded@, - -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@ - -- they show up as 'BlockedOnMVar'. - deriving (Eq,Ord,Show) - --- | The current status of a thread -data ThreadStatus - = ThreadRunning - -- ^the thread is currently runnable or running - | ThreadFinished - -- ^the thread has finished - | ThreadBlocked BlockReason - -- ^the thread is blocked on some resource - | ThreadDied - -- ^the thread received an uncaught exception - deriving (Eq,Ord,Show) - -threadStatus :: ThreadId -> IO ThreadStatus -threadStatus (ThreadId t) = IO $ \s -> - case threadStatus# t s of - (# s', stat #) -> (# s', mk_stat (I# stat) #) - where - -- NB. keep these in sync with includes/Constants.h - mk_stat 0 = ThreadRunning - mk_stat 1 = ThreadBlocked BlockedOnMVar - mk_stat 2 = ThreadBlocked BlockedOnBlackHole - mk_stat 3 = ThreadBlocked BlockedOnException - mk_stat 7 = ThreadBlocked BlockedOnSTM - mk_stat 11 = ThreadBlocked BlockedOnForeignCall - mk_stat 12 = ThreadBlocked BlockedOnForeignCall - mk_stat 16 = ThreadFinished - mk_stat 17 = ThreadDied - mk_stat _ = ThreadBlocked BlockedOnOther -\end{code} - - -%************************************************************************ -%* * -\subsection[stm]{Transactional heap operations} -%* * -%************************************************************************ - -TVars are shared memory locations which support atomic memory -transactions. - -\begin{code} --- |A monad supporting atomic memory transactions. -newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #)) - -unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #)) -unSTM (STM a) = a - -INSTANCE_TYPEABLE1(STM,stmTc,"STM") - -instance Functor STM where - fmap f x = x >>= (return . f) - -instance Monad STM where - {-# INLINE return #-} - {-# INLINE (>>) #-} - {-# INLINE (>>=) #-} - m >> k = thenSTM m k - return x = returnSTM x - m >>= k = bindSTM m k - -bindSTM :: STM a -> (a -> STM b) -> STM b -bindSTM (STM m) k = STM ( \s -> - case m s of - (# new_s, a #) -> unSTM (k a) new_s - ) - -thenSTM :: STM a -> STM b -> STM b -thenSTM (STM m) k = STM ( \s -> - case m s of - (# new_s, _ #) -> unSTM k new_s - ) - -returnSTM :: a -> STM a -returnSTM x = STM (\s -> (# s, x #)) - -instance MonadPlus STM where - mzero = retry - mplus = orElse - --- | Unsafely performs IO in the STM monad. Beware: this is a highly --- dangerous thing to do. --- --- * The STM implementation will often run transactions multiple --- times, so you need to be prepared for this if your IO has any --- side effects. --- --- * The STM implementation will abort transactions that are known to --- be invalid and need to be restarted. This may happen in the middle --- of `unsafeIOToSTM`, so make sure you don't acquire any resources --- that need releasing (exception handlers are ignored when aborting --- the transaction). That includes doing any IO using Handles, for --- example. Getting this wrong will probably lead to random deadlocks. --- --- * The transaction may have seen an inconsistent view of memory when --- the IO runs. Invariants that you expect to be true throughout --- your program may not be true inside a transaction, due to the --- way transactions are implemented. Normally this wouldn't be visible --- to the programmer, but using `unsafeIOToSTM` can expose it. --- -unsafeIOToSTM :: IO a -> STM a -unsafeIOToSTM (IO m) = STM m - --- |Perform a series of STM actions atomically. --- --- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. --- Any attempt to do so will result in a runtime error. (Reason: allowing --- this would effectively allow a transaction inside a transaction, depending --- on exactly when the thunk is evaluated.) --- --- However, see 'newTVarIO', which can be called inside 'unsafePerformIO', --- and which allows top-level TVars to be allocated. - -atomically :: STM a -> IO a -atomically (STM m) = IO (\s -> (atomically# m) s ) - --- |Retry execution of the current memory transaction because it has seen --- values in TVars which mean that it should not continue (e.g. the TVars --- represent a shared buffer that is now empty). The implementation may --- block the thread until one of the TVars that it has read from has been --- udpated. (GHC only) -retry :: STM a -retry = STM $ \s# -> retry# s# - --- |Compose two alternative STM actions (GHC only). If the first action --- completes without retrying then it forms the result of the orElse. --- Otherwise, if the first action retries, then the second action is --- tried in its place. If both actions retry then the orElse as a --- whole retries. -orElse :: STM a -> STM a -> STM a -orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s - --- |Exception handling within STM actions. -catchSTM :: STM a -> (SomeException -> STM a) -> STM a -catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s - --- | Low-level primitive on which always and alwaysSucceeds are built. --- checkInv differs form these in that (i) the invariant is not --- checked when checkInv is called, only at the end of this and --- subsequent transcations, (ii) the invariant failure is indicated --- by raising an exception. -checkInv :: STM a -> STM () -checkInv (STM m) = STM (\s -> (check# m) s) - --- | alwaysSucceeds adds a new invariant that must be true when passed --- to alwaysSucceeds, at the end of the current transaction, and at --- the end of every subsequent transaction. If it fails at any --- of those points then the transaction violating it is aborted --- and the exception raised by the invariant is propagated. -alwaysSucceeds :: STM a -> STM () -alwaysSucceeds i = do ( i >> retry ) `orElse` ( return () ) - checkInv i - --- | always is a variant of alwaysSucceeds in which the invariant is --- expressed as an STM Bool action that must return True. Returning --- False or raising an exception are both treated as invariant failures. -always :: STM Bool -> STM () -always i = alwaysSucceeds ( do v <- i - if (v) then return () else ( error "Transacional invariant violation" ) ) - --- |Shared memory locations that support atomic memory transactions. -data TVar a = TVar (TVar# RealWorld a) - -INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar") - -instance Eq (TVar a) where - (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2# - --- |Create a new TVar holding a value supplied -newTVar :: a -> STM (TVar a) -newTVar val = STM $ \s1# -> - case newTVar# val s1# of - (# s2#, tvar# #) -> (# s2#, TVar tvar# #) - --- |@IO@ version of 'newTVar'. This is useful for creating top-level --- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using --- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't --- possible. -newTVarIO :: a -> IO (TVar a) -newTVarIO val = IO $ \s1# -> - case newTVar# val s1# of - (# s2#, tvar# #) -> (# s2#, TVar tvar# #) - --- |Return the current value stored in a TVar. --- This is equivalent to --- --- > readTVarIO = atomically . readTVar --- --- but works much faster, because it doesn't perform a complete --- transaction, it just reads the current value of the 'TVar'. -readTVarIO :: TVar a -> IO a -readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s# - --- |Return the current value stored in a TVar -readTVar :: TVar a -> STM a -readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s# - --- |Write the supplied value into a TVar -writeTVar :: TVar a -> a -> STM () -writeTVar (TVar tvar#) val = STM $ \s1# -> - case writeTVar# tvar# val s1# of - s2# -> (# s2#, () #) - -\end{code} - -MVar utilities - -\begin{code} -withMVar :: MVar a -> (a -> IO b) -> IO b -withMVar m io = - mask $ \restore -> do - a <- takeMVar m - b <- catchAny (restore (io a)) - (\e -> do putMVar m a; throw e) - putMVar m a - return b - -modifyMVar_ :: MVar a -> (a -> IO a) -> IO () -modifyMVar_ m io = - mask $ \restore -> do - a <- takeMVar m - a' <- catchAny (restore (io a)) - (\e -> do putMVar m a; throw e) - putMVar m a' - return () -\end{code} - -%************************************************************************ -%* * -\subsection{Thread waiting} -%* * -%************************************************************************ - -\begin{code} -#ifdef mingw32_HOST_OS - --- Note: threadWaitRead and threadWaitWrite aren't really functional --- on Win32, but left in there because lib code (still) uses them (the manner --- in which they're used doesn't cause problems on a Win32 platform though.) - -asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) -asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) = - IO $ \s -> case asyncRead# fd isSock len buf s of - (# s', len#, err# #) -> (# s', (I# len#, I# err#) #) - -asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) -asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) = - IO $ \s -> case asyncWrite# fd isSock len buf s of - (# s', len#, err# #) -> (# s', (I# len#, I# err#) #) - -asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int -asyncDoProc (FunPtr proc) (Ptr param) = - -- the 'length' value is ignored; simplifies implementation of - -- the async*# primops to have them all return the same result. - IO $ \s -> case asyncDoProc# proc param s of - (# s', _len#, err# #) -> (# s', I# err# #) - --- to aid the use of these primops by the IO Handle implementation, --- provide the following convenience funs: - --- this better be a pinned byte array! -asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int) -asyncReadBA fd isSock len off bufB = - asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off) - -asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int) -asyncWriteBA fd isSock len off bufB = - asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off) - +import GHC.Conc.Signal #endif --- ----------------------------------------------------------------------------- --- Thread IO API - --- | Block the current thread until data is available to read on the --- given file descriptor (GHC only). -threadWaitRead :: Fd -> IO () -threadWaitRead fd -#ifndef mingw32_HOST_OS - | threaded = waitForReadEvent fd -#endif - | otherwise = IO $ \s -> - case fromIntegral fd of { I# fd# -> - case waitRead# fd# s of { s' -> (# s', () #) - }} - --- | Block the current thread until data can be written to the --- given file descriptor (GHC only). -threadWaitWrite :: Fd -> IO () -threadWaitWrite fd -#ifndef mingw32_HOST_OS - | threaded = waitForWriteEvent fd -#endif - | otherwise = IO $ \s -> - case fromIntegral fd of { I# fd# -> - case waitWrite# fd# s of { s' -> (# s', () #) - }} - --- | Suspends the current thread for a given number of microseconds --- (GHC only). --- --- There is no guarantee that the thread will be rescheduled promptly --- when the delay has expired, but the thread will never continue to --- run /earlier/ than specified. --- -threadDelay :: Int -> IO () -threadDelay time - | threaded = waitForDelayEvent time - | otherwise = IO $ \s -> - case fromIntegral time of { I# time# -> - case delay# time# s of { s' -> (# s', () #) - }} - - --- | Set the value of returned TVar to True after a given number of --- microseconds. The caveats associated with threadDelay also apply. --- -registerDelay :: Int -> IO (TVar Bool) -registerDelay usecs - | threaded = waitForDelayEventSTM usecs - | otherwise = error "registerDelay: requires -threaded" - -foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool - -waitForDelayEvent :: Int -> IO () -waitForDelayEvent usecs = do - m <- newEmptyMVar - target <- calculateTarget usecs - atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ())) - prodServiceThread - takeMVar m - --- Delays for use in STM -waitForDelayEventSTM :: Int -> IO (TVar Bool) -waitForDelayEventSTM usecs = do - t <- atomically $ newTVar False - target <- calculateTarget usecs - atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ())) - prodServiceThread - return t - -calculateTarget :: Int -> IO USecs -calculateTarget usecs = do - now <- getUSecOfDay - return $ now + (fromIntegral usecs) - - --- ---------------------------------------------------------------------------- --- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay - --- In the threaded RTS, we employ a single IO Manager thread to wait --- for all outstanding IO requests (threadWaitRead,threadWaitWrite) --- and delays (threadDelay). --- --- We can do this because in the threaded RTS the IO Manager can make --- a non-blocking call to select(), so we don't have to do select() in --- the scheduler as we have to in the non-threaded RTS. We get performance --- benefits from doing it this way, because we only have to restart the select() --- when a new request arrives, rather than doing one select() each time --- around the scheduler loop. Furthermore, the scheduler can be simplified --- by not having to check for completed IO requests. - -#ifndef mingw32_HOST_OS -data IOReq - = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ()) - | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ()) -#endif - -data DelayReq - = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ()) - | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool) - -#ifndef mingw32_HOST_OS -{-# NOINLINE pendingEvents #-} -pendingEvents :: IORef [IOReq] -pendingEvents = unsafePerformIO $ do - m <- newIORef [] - sharedCAF m getOrSetGHCConcPendingEventsStore - -foreign import ccall unsafe "getOrSetGHCConcPendingEventsStore" - getOrSetGHCConcPendingEventsStore :: Ptr a -> IO (Ptr a) -#endif - -{-# NOINLINE pendingDelays #-} -pendingDelays :: IORef [DelayReq] -pendingDelays = unsafePerformIO $ do - m <- newIORef [] - sharedCAF m getOrSetGHCConcPendingDelaysStore - -foreign import ccall unsafe "getOrSetGHCConcPendingDelaysStore" - getOrSetGHCConcPendingDelaysStore :: Ptr a -> IO (Ptr a) - -{-# NOINLINE ioManagerThread #-} -ioManagerThread :: MVar (Maybe ThreadId) -ioManagerThread = unsafePerformIO $ do - m <- newMVar Nothing - sharedCAF m getOrSetGHCConcIOManagerThreadStore - -foreign import ccall unsafe "getOrSetGHCConcIOManagerThreadStore" - getOrSetGHCConcIOManagerThreadStore :: Ptr a -> IO (Ptr a) - -ensureIOManagerIsRunning :: IO () -ensureIOManagerIsRunning - | threaded = startIOManagerThread - | otherwise = return () - -startIOManagerThread :: IO () -startIOManagerThread = do - modifyMVar_ ioManagerThread $ \old -> do - let create = do t <- forkIO ioManager; return (Just t) - case old of - Nothing -> create - Just t -> do - s <- threadStatus t - case s of - ThreadFinished -> create - ThreadDied -> create - _other -> return (Just t) - -insertDelay :: DelayReq -> [DelayReq] -> [DelayReq] -insertDelay d [] = [d] -insertDelay d1 ds@(d2 : rest) - | delayTime d1 <= delayTime d2 = d1 : ds - | otherwise = d2 : insertDelay d1 rest - -delayTime :: DelayReq -> USecs -delayTime (Delay t _) = t -delayTime (DelaySTM t _) = t - -type USecs = Word64 - -foreign import ccall unsafe "getUSecOfDay" - getUSecOfDay :: IO USecs - -{-# NOINLINE prodding #-} -prodding :: IORef Bool -prodding = unsafePerformIO $ do - r <- newIORef False - sharedCAF r getOrSetGHCConcProddingStore - -foreign import ccall unsafe "getOrSetGHCConcProddingStore" - getOrSetGHCConcProddingStore :: Ptr a -> IO (Ptr a) - -prodServiceThread :: IO () -prodServiceThread = do - -- NB. use atomicModifyIORef here, otherwise there are race - -- conditions in which prodding is left at True but the server is - -- blocked in select(). - was_set <- atomicModifyIORef prodding $ \b -> (True,b) - unless was_set wakeupIOManager - --- Machinery needed to ensure that we only have one copy of certain --- CAFs in this module even when the base package is present twice, as --- it is when base is dynamically loaded into GHCi. The RTS keeps --- track of the single true value of the CAF, so even when the CAFs in --- the dynamically-loaded base package are reverted, nothing bad --- happens. --- -sharedCAF :: a -> (Ptr a -> IO (Ptr a)) -> IO a -sharedCAF a get_or_set = - mask_ $ do - stable_ref <- newStablePtr a - let ref = castPtr (castStablePtrToPtr stable_ref) - ref2 <- get_or_set ref - if ref==ref2 - then return a - else do freeStablePtr stable_ref - deRefStablePtr (castPtrToStablePtr (castPtr ref2)) - -#ifdef mingw32_HOST_OS --- ---------------------------------------------------------------------------- --- Windows IO manager thread - -ioManager :: IO () -ioManager = do - wakeup <- c_getIOManagerEvent - service_loop wakeup [] - -service_loop :: HANDLE -- read end of pipe - -> [DelayReq] -- current delay requests - -> IO () - -service_loop wakeup old_delays = do - -- pick up new delay requests - new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a)) - let delays = foldr insertDelay old_delays new_delays - - now <- getUSecOfDay - (delays', timeout) <- getDelay now delays - - r <- c_WaitForSingleObject wakeup timeout - case r of - 0xffffffff -> do c_maperrno; throwErrno "service_loop" - 0 -> do - r2 <- c_readIOManagerEvent - exit <- - case r2 of - _ | r2 == io_MANAGER_WAKEUP -> return False - _ | r2 == io_MANAGER_DIE -> return True - 0 -> return False -- spurious wakeup - _ -> do start_console_handler (r2 `shiftR` 1); return False - unless exit $ service_cont wakeup delays' - - _other -> service_cont wakeup delays' -- probably timeout - -service_cont :: HANDLE -> [DelayReq] -> IO () -service_cont wakeup delays = do - r <- atomicModifyIORef prodding (\_ -> (False,False)) - r `seq` return () -- avoid space leak - service_loop wakeup delays - --- must agree with rts/win32/ThrIOManager.c -io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32 -io_MANAGER_WAKEUP = 0xffffffff -io_MANAGER_DIE = 0xfffffffe - -data ConsoleEvent - = ControlC - | Break - | Close - -- these are sent to Services only. - | Logoff - | Shutdown - deriving (Eq, Ord, Enum, Show, Read, Typeable) - -start_console_handler :: Word32 -> IO () -start_console_handler r = - case toWin32ConsoleEvent r of - Just x -> withMVar win32ConsoleHandler $ \handler -> do - _ <- forkIO (handler x) - return () - Nothing -> return () - -toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent -toWin32ConsoleEvent ev = - case ev of - 0 {- CTRL_C_EVENT-} -> Just ControlC - 1 {- CTRL_BREAK_EVENT-} -> Just Break - 2 {- CTRL_CLOSE_EVENT-} -> Just Close - 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff - 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown - _ -> Nothing - -win32ConsoleHandler :: MVar (ConsoleEvent -> IO ()) -win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler")) - -wakeupIOManager :: IO () -wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP - --- Walk the queue of pending delays, waking up any that have passed --- and return the smallest delay to wait for. The queue of pending --- delays is kept ordered. -getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD) -getDelay _ [] = return ([], iNFINITE) -getDelay now all@(d : rest) - = case d of - Delay time m | now >= time -> do - putMVar m () - getDelay now rest - DelaySTM time t | now >= time -> do - atomically $ writeTVar t True - getDelay now rest - _otherwise -> - -- delay is in millisecs for WaitForSingleObject - let micro_seconds = delayTime d - now - milli_seconds = (micro_seconds + 999) `div` 1000 - in return (all, fromIntegral milli_seconds) - --- ToDo: this just duplicates part of System.Win32.Types, which isn't --- available yet. We should move some Win32 functionality down here, --- maybe as part of the grand reorganisation of the base package... -type HANDLE = Ptr () -type DWORD = Word32 - -iNFINITE :: DWORD -iNFINITE = 0xFFFFFFFF -- urgh - -foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c) - c_getIOManagerEvent :: IO HANDLE - -foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c) - c_readIOManagerEvent :: IO Word32 - -foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c) - c_sendIOManagerEvent :: Word32 -> IO () - -foreign import ccall unsafe "maperrno" -- in Win32Utils.c - c_maperrno :: IO () - -foreign import stdcall "WaitForSingleObject" - c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD - -#else --- ---------------------------------------------------------------------------- --- Unix IO manager thread, using select() - -ioManager :: IO () -ioManager = do - allocaArray 2 $ \fds -> do - throwErrnoIfMinus1_ "startIOManagerThread" (c_pipe fds) - rd_end <- peekElemOff fds 0 - wr_end <- peekElemOff fds 1 - setNonBlockingFD wr_end True -- writes happen in a signal handler, we - -- don't want them to block. - setCloseOnExec rd_end - setCloseOnExec wr_end - c_setIOManagerPipe wr_end - allocaBytes sizeofFdSet $ \readfds -> do - allocaBytes sizeofFdSet $ \writefds -> do - allocaBytes sizeofTimeVal $ \timeval -> do - service_loop (fromIntegral rd_end) readfds writefds timeval [] [] - return () - -service_loop - :: Fd -- listen to this for wakeup calls - -> Ptr CFdSet - -> Ptr CFdSet - -> Ptr CTimeVal - -> [IOReq] - -> [DelayReq] - -> IO () -service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do - - -- reset prodding before we look at the new requests. If a new - -- client arrives after this point they will send a wakup which will - -- cause the server to loop around again, so we can be sure to not - -- miss any requests. - -- - -- NB. it's important to do this in the *first* iteration of - -- service_loop, rather than after calling select(), since a client - -- may have set prodding to True without sending a wakeup byte down - -- the pipe, because the pipe wasn't set up. - atomicModifyIORef prodding (\_ -> (False, ())) - - -- pick up new IO requests - new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a)) - let reqs = new_reqs ++ old_reqs - - -- pick up new delay requests - new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a)) - let delays0 = foldr insertDelay old_delays new_delays - - -- build the FDSets for select() - fdZero readfds - fdZero writefds - fdSet wakeup readfds - maxfd <- buildFdSets 0 readfds writefds reqs - - -- perform the select() - let do_select delays = do - -- check the current time and wake up any thread in - -- threadDelay whose timeout has expired. Also find the - -- timeout value for the select() call. - now <- getUSecOfDay - (delays', timeout) <- getDelay now ptimeval delays - - res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds - nullPtr timeout - if (res == -1) - then do - err <- getErrno - case err of - _ | err == eINTR -> do_select delays' - -- EINTR: just redo the select() - _ | err == eBADF -> return (True, delays) - -- EBADF: one of the file descriptors is closed or bad, - -- we don't know which one, so wake everyone up. - _ | otherwise -> throwErrno "select" - -- otherwise (ENOMEM or EINVAL) something has gone - -- wrong; report the error. - else - return (False,delays') - - (wakeup_all,delays') <- do_select delays0 - - exit <- - if wakeup_all then return False - else do - b <- fdIsSet wakeup readfds - if b == 0 - then return False - else alloca $ \p -> do - warnErrnoIfMinus1_ "service_loop" $ - c_read (fromIntegral wakeup) p 1 - s <- peek p - case s of - _ | s == io_MANAGER_WAKEUP -> return False - _ | s == io_MANAGER_DIE -> return True - _ | s == io_MANAGER_SYNC -> do - mvars <- readIORef sync - mapM_ (flip putMVar ()) mvars - return False - _ -> do - fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t) - withForeignPtr fp $ \p_siginfo -> do - r <- c_read (fromIntegral wakeup) (castPtr p_siginfo) - sizeof_siginfo_t - when (r /= fromIntegral sizeof_siginfo_t) $ - error "failed to read siginfo_t" - runHandlers' fp (fromIntegral s) - return False - - unless exit $ do - - reqs' <- if wakeup_all then do wakeupAll reqs; return [] - else completeRequests reqs readfds writefds [] - - service_loop wakeup readfds writefds ptimeval reqs' delays' - -io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: Word8 -io_MANAGER_WAKEUP = 0xff -io_MANAGER_DIE = 0xfe -io_MANAGER_SYNC = 0xfd - -{-# NOINLINE sync #-} -sync :: IORef [MVar ()] -sync = unsafePerformIO (newIORef []) - --- waits for the IO manager to drain the pipe -syncIOManager :: IO () -syncIOManager = do - m <- newEmptyMVar - atomicModifyIORef sync (\old -> (m:old,())) - c_ioManagerSync - takeMVar m - -foreign import ccall unsafe "ioManagerSync" c_ioManagerSync :: IO () -foreign import ccall unsafe "ioManagerWakeup" wakeupIOManager :: IO () - --- For the non-threaded RTS -runHandlers :: Ptr Word8 -> Int -> IO () -runHandlers p_info sig = do - fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t) - withForeignPtr fp $ \p -> do - copyBytes p p_info (fromIntegral sizeof_siginfo_t) - free p_info - runHandlers' fp (fromIntegral sig) - -runHandlers' :: ForeignPtr Word8 -> Signal -> IO () -runHandlers' p_info sig = do - let int = fromIntegral sig - withMVar signal_handlers $ \arr -> - if not (inRange (boundsIOArray arr) int) - then return () - else do handler <- unsafeReadIOArray arr int - case handler of - Nothing -> return () - Just (f,_) -> do _ <- forkIO (f p_info) - return () - -warnErrnoIfMinus1_ :: Num a => String -> IO a -> IO () -warnErrnoIfMinus1_ what io - = do r <- io - when (r == -1) $ do - errno <- getErrno - str <- strerror errno >>= peekCString - when (r == -1) $ - debugErrLn ("Warning: " ++ what ++ " failed: " ++ str) - -foreign import ccall unsafe "string.h" strerror :: Errno -> IO (Ptr CChar) - -foreign import ccall "setIOManagerPipe" - c_setIOManagerPipe :: CInt -> IO () - -foreign import ccall "__hscore_sizeof_siginfo_t" - sizeof_siginfo_t :: CSize - -type Signal = CInt - -maxSig = 64 :: Int - -type HandlerFun = ForeignPtr Word8 -> IO () - --- Lock used to protect concurrent access to signal_handlers. Symptom of --- this race condition is #1922, although that bug was on Windows a similar --- bug also exists on Unix. -{-# NOINLINE signal_handlers #-} -signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic))) -signal_handlers = unsafePerformIO $ do - arr <- newIOArray (0,maxSig) Nothing - m <- newMVar arr - sharedCAF m getOrSetGHCConcSignalHandlerStore - -foreign import ccall unsafe "getOrSetGHCConcSignalHandlerStore" - getOrSetGHCConcSignalHandlerStore :: Ptr a -> IO (Ptr a) - -setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic)) -setHandler sig handler = do - let int = fromIntegral sig - withMVar signal_handlers $ \arr -> - if not (inRange (boundsIOArray arr) int) - then error "GHC.Conc.setHandler: signal out of range" - else do old <- unsafeReadIOArray arr int - unsafeWriteIOArray arr int handler - return old - --- ----------------------------------------------------------------------------- --- IO requests - -buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd -buildFdSets maxfd _ _ [] = return maxfd -buildFdSets maxfd readfds writefds (Read fd _ : reqs) - | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range" - | otherwise = do - fdSet fd readfds - buildFdSets (max maxfd fd) readfds writefds reqs -buildFdSets maxfd readfds writefds (Write fd _ : reqs) - | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range" - | otherwise = do - fdSet fd writefds - buildFdSets (max maxfd fd) readfds writefds reqs - -completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] - -> IO [IOReq] -completeRequests [] _ _ reqs' = return reqs' -completeRequests (Read fd m : reqs) readfds writefds reqs' = do - b <- fdIsSet fd readfds - if b /= 0 - then do putMVar m (); completeRequests reqs readfds writefds reqs' - else completeRequests reqs readfds writefds (Read fd m : reqs') -completeRequests (Write fd m : reqs) readfds writefds reqs' = do - b <- fdIsSet fd writefds - if b /= 0 - then do putMVar m (); completeRequests reqs readfds writefds reqs' - else completeRequests reqs readfds writefds (Write fd m : reqs') - -wakeupAll :: [IOReq] -> IO () -wakeupAll [] = return () -wakeupAll (Read _ m : reqs) = do putMVar m (); wakeupAll reqs -wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs - -waitForReadEvent :: Fd -> IO () -waitForReadEvent fd = do - m <- newEmptyMVar - atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ())) - prodServiceThread - takeMVar m - -waitForWriteEvent :: Fd -> IO () -waitForWriteEvent fd = do - m <- newEmptyMVar - atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ())) - prodServiceThread - takeMVar m - --- ----------------------------------------------------------------------------- --- Delays - --- Walk the queue of pending delays, waking up any that have passed --- and return the smallest delay to wait for. The queue of pending --- delays is kept ordered. -getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal) -getDelay _ _ [] = return ([],nullPtr) -getDelay now ptimeval all@(d : rest) - = case d of - Delay time m | now >= time -> do - putMVar m () - getDelay now ptimeval rest - DelaySTM time t | now >= time -> do - atomically $ writeTVar t True - getDelay now ptimeval rest - _otherwise -> do - setTimevalTicks ptimeval (delayTime d - now) - return (all,ptimeval) - -data CTimeVal - -foreign import ccall unsafe "sizeofTimeVal" - sizeofTimeVal :: Int - -foreign import ccall unsafe "setTimevalTicks" - setTimevalTicks :: Ptr CTimeVal -> USecs -> IO () - -{- - On Win32 we're going to have a single Pipe, and a - waitForSingleObject with the delay time. For signals, we send a - byte down the pipe just like on Unix. --} - --- ---------------------------------------------------------------------------- --- select() interface - --- ToDo: move to System.Posix.Internals? - -data CFdSet - -foreign import ccall safe "__hscore_select" - c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal - -> IO CInt - -foreign import ccall unsafe "hsFD_SETSIZE" - c_fD_SETSIZE :: CInt - -fD_SETSIZE :: Fd -fD_SETSIZE = fromIntegral c_fD_SETSIZE - -foreign import ccall unsafe "hsFD_ISSET" - c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt - -fdIsSet :: Fd -> Ptr CFdSet -> IO CInt -fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset - -foreign import ccall unsafe "hsFD_SET" - c_fdSet :: CInt -> Ptr CFdSet -> IO () - -fdSet :: Fd -> Ptr CFdSet -> IO () -fdSet (Fd fd) fdset = c_fdSet fd fdset - -foreign import ccall unsafe "hsFD_ZERO" - fdZero :: Ptr CFdSet -> IO () - -foreign import ccall unsafe "sizeof_fd_set" - sizeofFdSet :: Int - -#endif - -reportStackOverflow :: IO () -reportStackOverflow = callStackOverflowHook - -reportError :: SomeException -> IO () -reportError ex = do - handler <- getUncaughtExceptionHandler - handler ex - --- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove --- the unsafe below. -foreign import ccall unsafe "stackOverflow" - callStackOverflowHook :: IO () - -{-# NOINLINE uncaughtExceptionHandler #-} -uncaughtExceptionHandler :: IORef (SomeException -> IO ()) -uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler) - where - defaultHandler :: SomeException -> IO () - defaultHandler se@(SomeException ex) = do - (hFlush stdout) `catchAny` (\ _ -> return ()) - let msg = case cast ex of - Just Deadlock -> "no threads to run: infinite loop or deadlock?" - _ -> case cast ex of - Just (ErrorCall s) -> s - _ -> showsPrec 0 se "" - withCString "%s" $ \cfmt -> - withCString msg $ \cmsg -> - errorBelch cfmt cmsg - --- don't use errorBelch() directly, because we cannot call varargs functions --- using the FFI. -foreign import ccall unsafe "HsBase.h errorBelch2" - errorBelch :: CString -> CString -> IO () - -setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO () -setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler - -getUncaughtExceptionHandler :: IO (SomeException -> IO ()) -getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler - \end{code} diff --git a/GHC/Conc/IO.hs b/GHC/Conc/IO.hs new file mode 100644 index 0000000..ee0013c --- /dev/null +++ b/GHC/Conc/IO.hs @@ -0,0 +1,127 @@ +{-# OPTIONS_GHC -XNoImplicitPrelude #-} +{-# OPTIONS_GHC -fno-warn-missing-signatures #-} +{-# OPTIONS_HADDOCK not-home #-} +----------------------------------------------------------------------------- +-- | +-- Module : GHC.Conc.IO +-- Copyright : (c) The University of Glasgow, 1994-2002 +-- License : see libraries/base/LICENSE +-- +-- Maintainer : cvs-ghc@haskell.org +-- Stability : internal +-- Portability : non-portable (GHC extensions) +-- +-- Basic concurrency stuff. +-- +----------------------------------------------------------------------------- + +-- No: #hide, because bits of this module are exposed by the stm package. +-- However, we don't want this module to be the home location for the +-- bits it exports, we'd rather have Control.Concurrent and the other +-- higher level modules be the home. Hence: + +#include "Typeable.h" + +-- #not-home +module GHC.Conc.IO + ( ensureIOManagerIsRunning + + -- * Waiting + , threadDelay -- :: Int -> IO () + , registerDelay -- :: Int -> IO (TVar Bool) + , threadWaitRead -- :: Int -> IO () + , threadWaitWrite -- :: Int -> IO () + +#ifdef mingw32_HOST_OS + , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) + , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) + , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int + + , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int) + , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int) + + , ConsoleEvent(..) + , win32ConsoleHandler + , toWin32ConsoleEvent +#endif + ) where + +import Control.Monad +import Foreign +import GHC.Base +import GHC.Conc.Sync as Sync +import GHC.Real ( fromIntegral ) +import System.Posix.Types + +#ifdef mingw32_HOST_OS +import qualified GHC.Conc.Windows as Windows +import GHC.Conc.Windows (asyncRead, asyncWrite, asyncDoProc, asyncReadBA, + asyncWriteBA, ConsoleEvent(..), win32ConsoleHandler, + toWin32ConsoleEvent) +#else +import qualified System.Event.Thread as Event +#endif + +ensureIOManagerIsRunning :: IO () +#ifndef mingw32_HOST_OS +ensureIOManagerIsRunning = Event.ensureIOManagerIsRunning +#else +ensureIOManagerIsRunning = Windows.ensureIOManagerIsRunning +#endif + +-- | Block the current thread until data is available to read on the +-- given file descriptor (GHC only). +threadWaitRead :: Fd -> IO () +threadWaitRead fd +#ifndef mingw32_HOST_OS + | threaded = Event.threadWaitRead fd +#endif + | otherwise = IO $ \s -> + case fromIntegral fd of { I# fd# -> + case waitRead# fd# s of { s' -> (# s', () #) + }} + +-- | Block the current thread until data can be written to the +-- given file descriptor (GHC only). +threadWaitWrite :: Fd -> IO () +threadWaitWrite fd +#ifndef mingw32_HOST_OS + | threaded = Event.threadWaitWrite fd +#endif + | otherwise = IO $ \s -> + case fromIntegral fd of { I# fd# -> + case waitWrite# fd# s of { s' -> (# s', () #) + }} + +-- | Suspends the current thread for a given number of microseconds +-- (GHC only). +-- +-- There is no guarantee that the thread will be rescheduled promptly +-- when the delay has expired, but the thread will never continue to +-- run /earlier/ than specified. +-- +threadDelay :: Int -> IO () +threadDelay time +#ifdef mingw32_HOST_OS + | threaded = Windows.threadDelay time +#else + | threaded = Event.threadDelay time +#endif + | otherwise = IO $ \s -> + case fromIntegral time of { I# time# -> + case delay# time# s of { s' -> (# s', () #) + }} + +-- | Set the value of returned TVar to True after a given number of +-- microseconds. The caveats associated with threadDelay also apply. +-- +registerDelay :: Int -> IO (TVar Bool) +registerDelay usecs +#ifdef mingw32_HOST_OS + | threaded = Windows.registerDelay usecs +#else + | threaded = Event.registerDelay usecs +#endif + | otherwise = error "registerDelay: requires -threaded" + +foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool diff --git a/GHC/Conc/Signal.hs b/GHC/Conc/Signal.hs new file mode 100644 index 0000000..8871b34 --- /dev/null +++ b/GHC/Conc/Signal.hs @@ -0,0 +1,91 @@ +{-# LANGUAGE NoImplicitPrelude #-} + +module GHC.Conc.Signal + ( Signal + , HandlerFun + , setHandler + , runHandlers + ) where + +import Control.Concurrent.MVar (MVar, newMVar, withMVar) +import Data.Dynamic (Dynamic) +import Data.Maybe (Maybe(..)) +import Foreign.C.Types (CInt) +import Foreign.ForeignPtr (ForeignPtr) +import Foreign.StablePtr (castPtrToStablePtr, castStablePtrToPtr, + deRefStablePtr, freeStablePtr, newStablePtr) +import Foreign.Ptr (Ptr, castPtr) +import GHC.Arr (inRange) +import GHC.Base +import GHC.Conc.Sync (forkIO) +import GHC.IO (mask_, unsafePerformIO) +import GHC.IOArray (IOArray, boundsIOArray, newIOArray, unsafeReadIOArray, + unsafeWriteIOArray) +import GHC.Num (fromInteger) +import GHC.Real (fromIntegral) +import GHC.Word (Word8) + +------------------------------------------------------------------------ +-- Signal handling + +type Signal = CInt + +maxSig :: Int +maxSig = 64 + +type HandlerFun = ForeignPtr Word8 -> IO () + +-- Lock used to protect concurrent access to signal_handlers. Symptom +-- of this race condition is GHC bug #1922, although that bug was on +-- Windows a similar bug also exists on Unix. +signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic))) +signal_handlers = unsafePerformIO $ do + arr <- newIOArray (0, maxSig) Nothing + m <- newMVar arr + sharedCAF m getOrSetGHCConcSignalSignalHandlerStore +{-# NOINLINE signal_handlers #-} + +foreign import ccall unsafe "getOrSetGHCConcSignalSignalHandlerStore" + getOrSetGHCConcSignalSignalHandlerStore :: Ptr a -> IO (Ptr a) + +setHandler :: Signal -> Maybe (HandlerFun, Dynamic) + -> IO (Maybe (HandlerFun, Dynamic)) +setHandler sig handler = do + let int = fromIntegral sig + withMVar signal_handlers $ \arr -> + if not (inRange (boundsIOArray arr) int) + then error "GHC.Conc.setHandler: signal out of range" + else do old <- unsafeReadIOArray arr int + unsafeWriteIOArray arr int handler + return old + +runHandlers :: ForeignPtr Word8 -> Signal -> IO () +runHandlers p_info sig = do + let int = fromIntegral sig + withMVar signal_handlers $ \arr -> + if not (inRange (boundsIOArray arr) int) + then return () + else do handler <- unsafeReadIOArray arr int + case handler of + Nothing -> return () + Just (f,_) -> do _ <- forkIO (f p_info) + return () + +-- Machinery needed to ensure that we only have one copy of certain +-- CAFs in this module even when the base package is present twice, as +-- it is when base is dynamically loaded into GHCi. The RTS keeps +-- track of the single true value of the CAF, so even when the CAFs in +-- the dynamically-loaded base package are reverted, nothing bad +-- happens. +-- +sharedCAF :: a -> (Ptr a -> IO (Ptr a)) -> IO a +sharedCAF a get_or_set = + mask_ $ do + stable_ref <- newStablePtr a + let ref = castPtr (castStablePtrToPtr stable_ref) + ref2 <- get_or_set ref + if ref == ref2 + then return a + else do freeStablePtr stable_ref + deRefStablePtr (castPtrToStablePtr (castPtr ref2)) + diff --git a/GHC/Conc/Sync.lhs b/GHC/Conc/Sync.lhs new file mode 100644 index 0000000..24178a0 --- /dev/null +++ b/GHC/Conc/Sync.lhs @@ -0,0 +1,670 @@ +\begin{code} +{-# OPTIONS_GHC -XNoImplicitPrelude #-} +{-# OPTIONS_GHC -fno-warn-missing-signatures #-} +{-# OPTIONS_HADDOCK not-home #-} +----------------------------------------------------------------------------- +-- | +-- Module : GHC.Conc.Sync +-- Copyright : (c) The University of Glasgow, 1994-2002 +-- License : see libraries/base/LICENSE +-- +-- Maintainer : cvs-ghc@haskell.org +-- Stability : internal +-- Portability : non-portable (GHC extensions) +-- +-- Basic concurrency stuff. +-- +----------------------------------------------------------------------------- + +-- No: #hide, because bits of this module are exposed by the stm package. +-- However, we don't want this module to be the home location for the +-- bits it exports, we'd rather have Control.Concurrent and the other +-- higher level modules be the home. Hence: + +#include "Typeable.h" + +-- #not-home +module GHC.Conc.Sync + ( ThreadId(..) + + -- * Forking and suchlike + , forkIO -- :: IO a -> IO ThreadId + , forkIOUnmasked + , forkOnIO -- :: Int -> IO a -> IO ThreadId + , forkOnIOUnmasked + , numCapabilities -- :: Int + , numSparks -- :: IO Int + , childHandler -- :: Exception -> IO () + , myThreadId -- :: IO ThreadId + , killThread -- :: ThreadId -> IO () + , throwTo -- :: ThreadId -> Exception -> IO () + , par -- :: a -> b -> b + , pseq -- :: a -> b -> b + , runSparks + , yield -- :: IO () + , labelThread -- :: ThreadId -> String -> IO () + + , ThreadStatus(..), BlockReason(..) + , threadStatus -- :: ThreadId -> IO ThreadStatus + + -- * TVars + , STM(..) + , atomically -- :: STM a -> IO a + , retry -- :: STM a + , orElse -- :: STM a -> STM a -> STM a + , catchSTM -- :: STM a -> (Exception -> STM a) -> STM a + , alwaysSucceeds -- :: STM a -> STM () + , always -- :: STM Bool -> STM () + , TVar(..) + , newTVar -- :: a -> STM (TVar a) + , newTVarIO -- :: a -> STM (TVar a) + , readTVar -- :: TVar a -> STM a + , readTVarIO -- :: TVar a -> IO a + , writeTVar -- :: a -> TVar a -> STM () + , unsafeIOToSTM -- :: IO a -> STM a + + -- * Miscellaneous + , withMVar + , modifyMVar_ + + , setUncaughtExceptionHandler -- :: (Exception -> IO ()) -> IO () + , getUncaughtExceptionHandler -- :: IO (Exception -> IO ()) + + , reportError, reportStackOverflow + + , sharedCAF + ) where + +import Foreign +import Foreign.C + +#ifdef mingw32_HOST_OS +import Data.Typeable +#endif + +#ifndef mingw32_HOST_OS +import Data.Dynamic +#endif +import Control.Monad +import Data.Maybe + +import GHC.Base +import {-# SOURCE #-} GHC.IO.Handle ( hFlush ) +import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout ) +import GHC.IO +import GHC.IO.Exception +import GHC.Exception +import GHC.IORef +import GHC.MVar +import GHC.Num ( Num(..) ) +import GHC.Real ( fromIntegral ) +import GHC.Pack ( packCString# ) +import GHC.Show ( Show(..), showString ) + +infixr 0 `par`, `pseq` +\end{code} + +%************************************************************************ +%* * +\subsection{@ThreadId@, @par@, and @fork@} +%* * +%************************************************************************ + +\begin{code} +data ThreadId = ThreadId ThreadId# deriving( Typeable ) +-- ToDo: data ThreadId = ThreadId (Weak ThreadId#) +-- But since ThreadId# is unlifted, the Weak type must use open +-- type variables. +{- ^ +A 'ThreadId' is an abstract type representing a handle to a thread. +'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where +the 'Ord' instance implements an arbitrary total ordering over +'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued +'ThreadId' to string form; showing a 'ThreadId' value is occasionally +useful when debugging or diagnosing the behaviour of a concurrent +program. + +/Note/: in GHC, if you have a 'ThreadId', you essentially have +a pointer to the thread itself. This means the thread itself can\'t be +garbage collected until you drop the 'ThreadId'. +This misfeature will hopefully be corrected at a later date. + +/Note/: Hugs does not provide any operations on other threads; +it defines 'ThreadId' as a synonym for (). +-} + +instance Show ThreadId where + showsPrec d t = + showString "ThreadId " . + showsPrec d (getThreadId (id2TSO t)) + +foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt + +id2TSO :: ThreadId -> ThreadId# +id2TSO (ThreadId t) = t + +foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt +-- Returns -1, 0, 1 + +cmpThread :: ThreadId -> ThreadId -> Ordering +cmpThread t1 t2 = + case cmp_thread (id2TSO t1) (id2TSO t2) of + -1 -> LT + 0 -> EQ + _ -> GT -- must be 1 + +instance Eq ThreadId where + t1 == t2 = + case t1 `cmpThread` t2 of + EQ -> True + _ -> False + +instance Ord ThreadId where + compare = cmpThread + +{- | +Sparks off a new thread to run the 'IO' computation passed as the +first argument, and returns the 'ThreadId' of the newly created +thread. + +The new thread will be a lightweight thread; if you want to use a foreign +library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead. + +GHC note: the new thread inherits the /masked/ state of the parent +(see 'Control.Exception.mask'). + +The newly created thread has an exception handler that discards the +exceptions 'BlockedIndefinitelyOnMVar', 'BlockedIndefinitelyOnSTM', and +'ThreadKilled', and passes all other exceptions to the uncaught +exception handler (see 'setUncaughtExceptionHandler'). +-} +forkIO :: IO () -> IO ThreadId +forkIO action = IO $ \ s -> + case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #) + where + action_plus = catchException action childHandler + +-- | Like 'forkIO', but the child thread is created with asynchronous exceptions +-- unmasked (see 'Control.Exception.mask'). +forkIOUnmasked :: IO () -> IO ThreadId +forkIOUnmasked io = forkIO (unsafeUnmask io) + +{- | +Like 'forkIO', but lets you specify on which CPU the thread is +created. Unlike a `forkIO` thread, a thread created by `forkOnIO` +will stay on the same CPU for its entire lifetime (`forkIO` threads +can migrate between CPUs according to the scheduling policy). +`forkOnIO` is useful for overriding the scheduling policy when you +know in advance how best to distribute the threads. + +The `Int` argument specifies the CPU number; it is interpreted modulo +'numCapabilities' (note that it actually specifies a capability number +rather than a CPU number, but to a first approximation the two are +equivalent). +-} +forkOnIO :: Int -> IO () -> IO ThreadId +forkOnIO (I# cpu) action = IO $ \ s -> + case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #) + where + action_plus = catchException action childHandler + +-- | Like 'forkOnIO', but the child thread is created with +-- asynchronous exceptions unmasked (see 'Control.Exception.mask'). +forkOnIOUnmasked :: Int -> IO () -> IO ThreadId +forkOnIOUnmasked cpu io = forkOnIO cpu (unsafeUnmask io) + +-- | the value passed to the @+RTS -N@ flag. This is the number of +-- Haskell threads that can run truly simultaneously at any given +-- time, and is typically set to the number of physical CPU cores on +-- the machine. +numCapabilities :: Int +numCapabilities = unsafePerformIO $ do + n <- peek n_capabilities + return (fromIntegral n) + +-- | Returns the number of sparks currently in the local spark pool +numSparks :: IO Int +numSparks = IO $ \s -> case numSparks# s of (# s', n #) -> (# s', I# n #) + +#if defined(mingw32_HOST_OS) && defined(__PIC__) +foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt +#else +foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt +#endif +childHandler :: SomeException -> IO () +childHandler err = catchException (real_handler err) childHandler + +real_handler :: SomeException -> IO () +real_handler se@(SomeException ex) = + -- ignore thread GC and killThread exceptions: + case cast ex of + Just BlockedIndefinitelyOnMVar -> return () + _ -> case cast ex of + Just BlockedIndefinitelyOnSTM -> return () + _ -> case cast ex of + Just ThreadKilled -> return () + _ -> case cast ex of + -- report all others: + Just StackOverflow -> reportStackOverflow + _ -> reportError se + +{- | 'killThread' raises the 'ThreadKilled' exception in the given +thread (GHC only). + +> killThread tid = throwTo tid ThreadKilled + +-} +killThread :: ThreadId -> IO () +killThread tid = throwTo tid ThreadKilled + +{- | 'throwTo' raises an arbitrary exception in the target thread (GHC only). + +'throwTo' does not return until the exception has been raised in the +target thread. +The calling thread can thus be certain that the target +thread has received the exception. This is a useful property to know +when dealing with race conditions: eg. if there are two threads that +can kill each other, it is guaranteed that only one of the threads +will get to kill the other. + +Whatever work the target thread was doing when the exception was +raised is not lost: the computation is suspended until required by +another thread. + +If the target thread is currently making a foreign call, then the +exception will not be raised (and hence 'throwTo' will not return) +until the call has completed. This is the case regardless of whether +the call is inside a 'mask' or not. + +Important note: the behaviour of 'throwTo' differs from that described in +the paper \"Asynchronous exceptions in Haskell\" +(). +In the paper, 'throwTo' is non-blocking; but the library implementation adopts +a more synchronous design in which 'throwTo' does not return until the exception +is received by the target thread. The trade-off is discussed in Section 9 of the paper. +Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of +the paper). Unlike other interruptible operations, however, 'throwTo' +is /always/ interruptible, even if it does not actually block. + +There is no guarantee that the exception will be delivered promptly, +although the runtime will endeavour to ensure that arbitrary +delays don't occur. In GHC, an exception can only be raised when a +thread reaches a /safe point/, where a safe point is where memory +allocation occurs. Some loops do not perform any memory allocation +inside the loop and therefore cannot be interrupted by a 'throwTo'. + +Blocked 'throwTo' is fair: if multiple threads are trying to throw an +exception to the same target thread, they will succeed in FIFO order. + + -} +throwTo :: Exception e => ThreadId -> e -> IO () +throwTo (ThreadId tid) ex = IO $ \ s -> + case (killThread# tid (toException ex) s) of s1 -> (# s1, () #) + +-- | Returns the 'ThreadId' of the calling thread (GHC only). +myThreadId :: IO ThreadId +myThreadId = IO $ \s -> + case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #) + + +-- |The 'yield' action allows (forces, in a co-operative multitasking +-- implementation) a context-switch to any other currently runnable +-- threads (if any), and is occasionally useful when implementing +-- concurrency abstractions. +yield :: IO () +yield = IO $ \s -> + case (yield# s) of s1 -> (# s1, () #) + +{- | 'labelThread' stores a string as identifier for this thread if +you built a RTS with debugging support. This identifier will be used in +the debugging output to make distinction of different threads easier +(otherwise you only have the thread state object\'s address in the heap). + +Other applications like the graphical Concurrent Haskell Debugger +() may choose to overload +'labelThread' for their purposes as well. +-} + +labelThread :: ThreadId -> String -> IO () +labelThread (ThreadId t) str = IO $ \ s -> + let !ps = packCString# str + !adr = byteArrayContents# ps in + case (labelThread# t adr s) of s1 -> (# s1, () #) + +-- Nota Bene: 'pseq' used to be 'seq' +-- but 'seq' is now defined in PrelGHC +-- +-- "pseq" is defined a bit weirdly (see below) +-- +-- The reason for the strange "lazy" call is that +-- it fools the compiler into thinking that pseq and par are non-strict in +-- their second argument (even if it inlines pseq at the call site). +-- If it thinks pseq is strict in "y", then it often evaluates +-- "y" before "x", which is totally wrong. + +{-# INLINE pseq #-} +pseq :: a -> b -> b +pseq x y = x `seq` lazy y + +{-# INLINE par #-} +par :: a -> b -> b +par x y = case (par# x) of { _ -> lazy y } + +-- | Internal function used by the RTS to run sparks. +runSparks :: IO () +runSparks = IO loop + where loop s = case getSpark# s of + (# s', n, p #) -> + if n ==# 0# then (# s', () #) + else p `seq` loop s' + +data BlockReason + = BlockedOnMVar + -- ^blocked on on 'MVar' + | BlockedOnBlackHole + -- ^blocked on a computation in progress by another thread + | BlockedOnException + -- ^blocked in 'throwTo' + | BlockedOnSTM + -- ^blocked in 'retry' in an STM transaction + | BlockedOnForeignCall + -- ^currently in a foreign call + | BlockedOnOther + -- ^blocked on some other resource. Without @-threaded@, + -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@ + -- they show up as 'BlockedOnMVar'. + deriving (Eq,Ord,Show) + +-- | The current status of a thread +data ThreadStatus + = ThreadRunning + -- ^the thread is currently runnable or running + | ThreadFinished + -- ^the thread has finished + | ThreadBlocked BlockReason + -- ^the thread is blocked on some resource + | ThreadDied + -- ^the thread received an uncaught exception + deriving (Eq,Ord,Show) + +threadStatus :: ThreadId -> IO ThreadStatus +threadStatus (ThreadId t) = IO $ \s -> + case threadStatus# t s of + (# s', stat #) -> (# s', mk_stat (I# stat) #) + where + -- NB. keep these in sync with includes/Constants.h + mk_stat 0 = ThreadRunning + mk_stat 1 = ThreadBlocked BlockedOnMVar + mk_stat 2 = ThreadBlocked BlockedOnBlackHole + mk_stat 3 = ThreadBlocked BlockedOnException + mk_stat 7 = ThreadBlocked BlockedOnSTM + mk_stat 11 = ThreadBlocked BlockedOnForeignCall + mk_stat 12 = ThreadBlocked BlockedOnForeignCall + mk_stat 16 = ThreadFinished + mk_stat 17 = ThreadDied + mk_stat _ = ThreadBlocked BlockedOnOther +\end{code} + + +%************************************************************************ +%* * +\subsection[stm]{Transactional heap operations} +%* * +%************************************************************************ + +TVars are shared memory locations which support atomic memory +transactions. + +\begin{code} +-- |A monad supporting atomic memory transactions. +newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #)) + +unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #)) +unSTM (STM a) = a + +INSTANCE_TYPEABLE1(STM,stmTc,"STM") + +instance Functor STM where + fmap f x = x >>= (return . f) + +instance Monad STM where + {-# INLINE return #-} + {-# INLINE (>>) #-} + {-# INLINE (>>=) #-} + m >> k = thenSTM m k + return x = returnSTM x + m >>= k = bindSTM m k + +bindSTM :: STM a -> (a -> STM b) -> STM b +bindSTM (STM m) k = STM ( \s -> + case m s of + (# new_s, a #) -> unSTM (k a) new_s + ) + +thenSTM :: STM a -> STM b -> STM b +thenSTM (STM m) k = STM ( \s -> + case m s of + (# new_s, _ #) -> unSTM k new_s + ) + +returnSTM :: a -> STM a +returnSTM x = STM (\s -> (# s, x #)) + +instance MonadPlus STM where + mzero = retry + mplus = orElse + +-- | Unsafely performs IO in the STM monad. Beware: this is a highly +-- dangerous thing to do. +-- +-- * The STM implementation will often run transactions multiple +-- times, so you need to be prepared for this if your IO has any +-- side effects. +-- +-- * The STM implementation will abort transactions that are known to +-- be invalid and need to be restarted. This may happen in the middle +-- of `unsafeIOToSTM`, so make sure you don't acquire any resources +-- that need releasing (exception handlers are ignored when aborting +-- the transaction). That includes doing any IO using Handles, for +-- example. Getting this wrong will probably lead to random deadlocks. +-- +-- * The transaction may have seen an inconsistent view of memory when +-- the IO runs. Invariants that you expect to be true throughout +-- your program may not be true inside a transaction, due to the +-- way transactions are implemented. Normally this wouldn't be visible +-- to the programmer, but using `unsafeIOToSTM` can expose it. +-- +unsafeIOToSTM :: IO a -> STM a +unsafeIOToSTM (IO m) = STM m + +-- |Perform a series of STM actions atomically. +-- +-- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. +-- Any attempt to do so will result in a runtime error. (Reason: allowing +-- this would effectively allow a transaction inside a transaction, depending +-- on exactly when the thunk is evaluated.) +-- +-- However, see 'newTVarIO', which can be called inside 'unsafePerformIO', +-- and which allows top-level TVars to be allocated. + +atomically :: STM a -> IO a +atomically (STM m) = IO (\s -> (atomically# m) s ) + +-- |Retry execution of the current memory transaction because it has seen +-- values in TVars which mean that it should not continue (e.g. the TVars +-- represent a shared buffer that is now empty). The implementation may +-- block the thread until one of the TVars that it has read from has been +-- udpated. (GHC only) +retry :: STM a +retry = STM $ \s# -> retry# s# + +-- |Compose two alternative STM actions (GHC only). If the first action +-- completes without retrying then it forms the result of the orElse. +-- Otherwise, if the first action retries, then the second action is +-- tried in its place. If both actions retry then the orElse as a +-- whole retries. +orElse :: STM a -> STM a -> STM a +orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s + +-- |Exception handling within STM actions. +catchSTM :: STM a -> (SomeException -> STM a) -> STM a +catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s + +-- | Low-level primitive on which always and alwaysSucceeds are built. +-- checkInv differs form these in that (i) the invariant is not +-- checked when checkInv is called, only at the end of this and +-- subsequent transcations, (ii) the invariant failure is indicated +-- by raising an exception. +checkInv :: STM a -> STM () +checkInv (STM m) = STM (\s -> (check# m) s) + +-- | alwaysSucceeds adds a new invariant that must be true when passed +-- to alwaysSucceeds, at the end of the current transaction, and at +-- the end of every subsequent transaction. If it fails at any +-- of those points then the transaction violating it is aborted +-- and the exception raised by the invariant is propagated. +alwaysSucceeds :: STM a -> STM () +alwaysSucceeds i = do ( i >> retry ) `orElse` ( return () ) + checkInv i + +-- | always is a variant of alwaysSucceeds in which the invariant is +-- expressed as an STM Bool action that must return True. Returning +-- False or raising an exception are both treated as invariant failures. +always :: STM Bool -> STM () +always i = alwaysSucceeds ( do v <- i + if (v) then return () else ( error "Transacional invariant violation" ) ) + +-- |Shared memory locations that support atomic memory transactions. +data TVar a = TVar (TVar# RealWorld a) + +INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar") + +instance Eq (TVar a) where + (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2# + +-- |Create a new TVar holding a value supplied +newTVar :: a -> STM (TVar a) +newTVar val = STM $ \s1# -> + case newTVar# val s1# of + (# s2#, tvar# #) -> (# s2#, TVar tvar# #) + +-- |@IO@ version of 'newTVar'. This is useful for creating top-level +-- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using +-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't +-- possible. +newTVarIO :: a -> IO (TVar a) +newTVarIO val = IO $ \s1# -> + case newTVar# val s1# of + (# s2#, tvar# #) -> (# s2#, TVar tvar# #) + +-- |Return the current value stored in a TVar. +-- This is equivalent to +-- +-- > readTVarIO = atomically . readTVar +-- +-- but works much faster, because it doesn't perform a complete +-- transaction, it just reads the current value of the 'TVar'. +readTVarIO :: TVar a -> IO a +readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s# + +-- |Return the current value stored in a TVar +readTVar :: TVar a -> STM a +readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s# + +-- |Write the supplied value into a TVar +writeTVar :: TVar a -> a -> STM () +writeTVar (TVar tvar#) val = STM $ \s1# -> + case writeTVar# tvar# val s1# of + s2# -> (# s2#, () #) + +\end{code} + +MVar utilities + +\begin{code} +withMVar :: MVar a -> (a -> IO b) -> IO b +withMVar m io = + mask $ \restore -> do + a <- takeMVar m + b <- catchAny (restore (io a)) + (\e -> do putMVar m a; throw e) + putMVar m a + return b + +modifyMVar_ :: MVar a -> (a -> IO a) -> IO () +modifyMVar_ m io = + mask $ \restore -> do + a <- takeMVar m + a' <- catchAny (restore (io a)) + (\e -> do putMVar m a; throw e) + putMVar m a' + return () +\end{code} + +%************************************************************************ +%* * +\subsection{Thread waiting} +%* * +%************************************************************************ + +\begin{code} + +-- Machinery needed to ensureb that we only have one copy of certain +-- CAFs in this module even when the base package is present twice, as +-- it is when base is dynamically loaded into GHCi. The RTS keeps +-- track of the single true value of the CAF, so even when the CAFs in +-- the dynamically-loaded base package are reverted, nothing bad +-- happens. +-- +sharedCAF :: a -> (Ptr a -> IO (Ptr a)) -> IO a +sharedCAF a get_or_set = + mask_ $ do + stable_ref <- newStablePtr a + let ref = castPtr (castStablePtrToPtr stable_ref) + ref2 <- get_or_set ref + if ref==ref2 + then return a + else do freeStablePtr stable_ref + deRefStablePtr (castPtrToStablePtr (castPtr ref2)) + +reportStackOverflow :: IO () +reportStackOverflow = callStackOverflowHook + +reportError :: SomeException -> IO () +reportError ex = do + handler <- getUncaughtExceptionHandler + handler ex + +-- SUP: Are the hooks allowed to re-enter Haskell land? If so, remove +-- the unsafe below. +foreign import ccall unsafe "stackOverflow" + callStackOverflowHook :: IO () + +{-# NOINLINE uncaughtExceptionHandler #-} +uncaughtExceptionHandler :: IORef (SomeException -> IO ()) +uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler) + where + defaultHandler :: SomeException -> IO () + defaultHandler se@(SomeException ex) = do + (hFlush stdout) `catchAny` (\ _ -> return ()) + let msg = case cast ex of + Just Deadlock -> "no threads to run: infinite loop or deadlock?" + _ -> case cast ex of + Just (ErrorCall s) -> s + _ -> showsPrec 0 se "" + withCString "%s" $ \cfmt -> + withCString msg $ \cmsg -> + errorBelch cfmt cmsg + +-- don't use errorBelch() directly, because we cannot call varargs functions +-- using the FFI. +foreign import ccall unsafe "HsBase.h errorBelch2" + errorBelch :: CString -> CString -> IO () + +setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO () +setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler + +getUncaughtExceptionHandler :: IO (SomeException -> IO ()) +getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler + +\end{code} diff --git a/GHC/Conc/Windows.hs b/GHC/Conc/Windows.hs new file mode 100644 index 0000000..14139b7 --- /dev/null +++ b/GHC/Conc/Windows.hs @@ -0,0 +1,335 @@ +{-# OPTIONS_GHC -XNoImplicitPrelude #-} +{-# OPTIONS_GHC -fno-warn-missing-signatures #-} +{-# OPTIONS_HADDOCK not-home #-} +----------------------------------------------------------------------------- +-- | +-- Module : GHC.Conc.Windows +-- Copyright : (c) The University of Glasgow, 1994-2002 +-- License : see libraries/base/LICENSE +-- +-- Maintainer : cvs-ghc@haskell.org +-- Stability : internal +-- Portability : non-portable (GHC extensions) +-- +-- Windows I/O manager +-- +----------------------------------------------------------------------------- + +-- #not-home +module GHC.Conc.Windows + ( ensureIOManagerIsRunning + + -- * Waiting + , threadDelay + , registerDelay + + -- * Miscellaneous + , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) + , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) + , asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int + + , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int) + , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int) + + , ConsoleEvent(..) + , win32ConsoleHandler + , toWin32ConsoleEvent + ) where + +import Control.Monad +import Data.Bits (shiftR) +import Data.Maybe (Maybe(..)) +import Data.Typeable +import Foreign.C.Error (throwErrno) +import GHC.Base +import GHC.Conc.Sync +import GHC.Enum (Enum) +import GHC.IO (unsafePerformIO) +import GHC.IORef +import GHC.MVar +import GHC.Num (Num(..)) +import GHC.Ptr +import GHC.Read (Read) +import GHC.Real (div, fromIntegral) +import GHC.Show (Show) +import GHC.Word (Word32, Word64) + +-- ---------------------------------------------------------------------------- +-- Thread waiting + +-- Note: threadWaitRead and threadWaitWrite aren't really functional +-- on Win32, but left in there because lib code (still) uses them (the manner +-- in which they're used doesn't cause problems on a Win32 platform though.) + +asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) +asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) = + IO $ \s -> case asyncRead# fd isSock len buf s of + (# s', len#, err# #) -> (# s', (I# len#, I# err#) #) + +asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) +asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) = + IO $ \s -> case asyncWrite# fd isSock len buf s of + (# s', len#, err# #) -> (# s', (I# len#, I# err#) #) + +asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int +asyncDoProc (FunPtr proc) (Ptr param) = + -- the 'length' value is ignored; simplifies implementation of + -- the async*# primops to have them all return the same result. + IO $ \s -> case asyncDoProc# proc param s of + (# s', _len#, err# #) -> (# s', I# err# #) + +-- to aid the use of these primops by the IO Handle implementation, +-- provide the following convenience funs: + +-- this better be a pinned byte array! +asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int) +asyncReadBA fd isSock len off bufB = + asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off) + +asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int) +asyncWriteBA fd isSock len off bufB = + asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off) + +-- ---------------------------------------------------------------------------- +-- Threaded RTS implementation of threadDelay + +-- | Suspends the current thread for a given number of microseconds +-- (GHC only). +-- +-- There is no guarantee that the thread will be rescheduled promptly +-- when the delay has expired, but the thread will never continue to +-- run /earlier/ than specified. +-- +threadDelay :: Int -> IO () +threadDelay time + | threaded = waitForDelayEvent time + | otherwise = IO $ \s -> + case fromIntegral time of { I# time# -> + case delay# time# s of { s' -> (# s', () #) + }} + +-- | Set the value of returned TVar to True after a given number of +-- microseconds. The caveats associated with threadDelay also apply. +-- +registerDelay :: Int -> IO (TVar Bool) +registerDelay usecs + | threaded = waitForDelayEventSTM usecs + | otherwise = error "registerDelay: requires -threaded" + +foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool + +waitForDelayEvent :: Int -> IO () +waitForDelayEvent usecs = do + m <- newEmptyMVar + target <- calculateTarget usecs + atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ())) + prodServiceThread + takeMVar m + +-- Delays for use in STM +waitForDelayEventSTM :: Int -> IO (TVar Bool) +waitForDelayEventSTM usecs = do + t <- atomically $ newTVar False + target <- calculateTarget usecs + atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ())) + prodServiceThread + return t + +calculateTarget :: Int -> IO USecs +calculateTarget usecs = do + now <- getUSecOfDay + return $ now + (fromIntegral usecs) + +data DelayReq + = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ()) + | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool) + +{-# NOINLINE pendingDelays #-} +pendingDelays :: IORef [DelayReq] +pendingDelays = unsafePerformIO $ do + m <- newIORef [] + sharedCAF m getOrSetGHCConcWindowsPendingDelaysStore + +foreign import ccall unsafe "getOrSetGHCConcWindowsPendingDelaysStore" + getOrSetGHCConcWindowsPendingDelaysStore :: Ptr a -> IO (Ptr a) + +{-# NOINLINE ioManagerThread #-} +ioManagerThread :: MVar (Maybe ThreadId) +ioManagerThread = unsafePerformIO $ do + m <- newMVar Nothing + sharedCAF m getOrSetGHCConcWindowsIOManagerThreadStore + +foreign import ccall unsafe "getOrSetGHCConcWindowsIOManagerThreadStore" + getOrSetGHCConcWindowsIOManagerThreadStore :: Ptr a -> IO (Ptr a) + +ensureIOManagerIsRunning :: IO () +ensureIOManagerIsRunning + | threaded = startIOManagerThread + | otherwise = return () + +startIOManagerThread :: IO () +startIOManagerThread = do + modifyMVar_ ioManagerThread $ \old -> do + let create = do t <- forkIO ioManager; return (Just t) + case old of + Nothing -> create + Just t -> do + s <- threadStatus t + case s of + ThreadFinished -> create + ThreadDied -> create + _other -> return (Just t) + +insertDelay :: DelayReq -> [DelayReq] -> [DelayReq] +insertDelay d [] = [d] +insertDelay d1 ds@(d2 : rest) + | delayTime d1 <= delayTime d2 = d1 : ds + | otherwise = d2 : insertDelay d1 rest + +delayTime :: DelayReq -> USecs +delayTime (Delay t _) = t +delayTime (DelaySTM t _) = t + +type USecs = Word64 + +foreign import ccall unsafe "getUSecOfDay" + getUSecOfDay :: IO USecs + +{-# NOINLINE prodding #-} +prodding :: IORef Bool +prodding = unsafePerformIO $ do + r <- newIORef False + sharedCAF r getOrSetGHCConcWindowsProddingStore + +foreign import ccall unsafe "getOrSetGHCConcWindowsProddingStore" + getOrSetGHCConcWindowsProddingStore :: Ptr a -> IO (Ptr a) + +prodServiceThread :: IO () +prodServiceThread = do + -- NB. use atomicModifyIORef here, otherwise there are race + -- conditions in which prodding is left at True but the server is + -- blocked in select(). + was_set <- atomicModifyIORef prodding $ \b -> (True,b) + unless was_set wakeupIOManager + +-- ---------------------------------------------------------------------------- +-- Windows IO manager thread + +ioManager :: IO () +ioManager = do + wakeup <- c_getIOManagerEvent + service_loop wakeup [] + +service_loop :: HANDLE -- read end of pipe + -> [DelayReq] -- current delay requests + -> IO () + +service_loop wakeup old_delays = do + -- pick up new delay requests + new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a)) + let delays = foldr insertDelay old_delays new_delays + + now <- getUSecOfDay + (delays', timeout) <- getDelay now delays + + r <- c_WaitForSingleObject wakeup timeout + case r of + 0xffffffff -> do c_maperrno; throwErrno "service_loop" + 0 -> do + r2 <- c_readIOManagerEvent + exit <- + case r2 of + _ | r2 == io_MANAGER_WAKEUP -> return False + _ | r2 == io_MANAGER_DIE -> return True + 0 -> return False -- spurious wakeup + _ -> do start_console_handler (r2 `shiftR` 1); return False + unless exit $ service_cont wakeup delays' + + _other -> service_cont wakeup delays' -- probably timeout + +service_cont :: HANDLE -> [DelayReq] -> IO () +service_cont wakeup delays = do + r <- atomicModifyIORef prodding (\_ -> (False,False)) + r `seq` return () -- avoid space leak + service_loop wakeup delays + +-- must agree with rts/win32/ThrIOManager.c +io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32 +io_MANAGER_WAKEUP = 0xffffffff +io_MANAGER_DIE = 0xfffffffe + +data ConsoleEvent + = ControlC + | Break + | Close + -- these are sent to Services only. + | Logoff + | Shutdown + deriving (Eq, Ord, Enum, Show, Read, Typeable) + +start_console_handler :: Word32 -> IO () +start_console_handler r = + case toWin32ConsoleEvent r of + Just x -> withMVar win32ConsoleHandler $ \handler -> do + _ <- forkIO (handler x) + return () + Nothing -> return () + +toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent +toWin32ConsoleEvent ev = + case ev of + 0 {- CTRL_C_EVENT-} -> Just ControlC + 1 {- CTRL_BREAK_EVENT-} -> Just Break + 2 {- CTRL_CLOSE_EVENT-} -> Just Close + 5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff + 6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown + _ -> Nothing + +win32ConsoleHandler :: MVar (ConsoleEvent -> IO ()) +win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler")) + +wakeupIOManager :: IO () +wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP + +-- Walk the queue of pending delays, waking up any that have passed +-- and return the smallest delay to wait for. The queue of pending +-- delays is kept ordered. +getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD) +getDelay _ [] = return ([], iNFINITE) +getDelay now all@(d : rest) + = case d of + Delay time m | now >= time -> do + putMVar m () + getDelay now rest + DelaySTM time t | now >= time -> do + atomically $ writeTVar t True + getDelay now rest + _otherwise -> + -- delay is in millisecs for WaitForSingleObject + let micro_seconds = delayTime d - now + milli_seconds = (micro_seconds + 999) `div` 1000 + in return (all, fromIntegral milli_seconds) + +-- ToDo: this just duplicates part of System.Win32.Types, which isn't +-- available yet. We should move some Win32 functionality down here, +-- maybe as part of the grand reorganisation of the base package... +type HANDLE = Ptr () +type DWORD = Word32 + +iNFINITE :: DWORD +iNFINITE = 0xFFFFFFFF -- urgh + +foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c) + c_getIOManagerEvent :: IO HANDLE + +foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c) + c_readIOManagerEvent :: IO Word32 + +foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c) + c_sendIOManagerEvent :: Word32 -> IO () + +foreign import ccall unsafe "maperrno" -- in Win32Utils.c + c_maperrno :: IO () + +foreign import stdcall "WaitForSingleObject" + c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD diff --git a/GHC/IO/FD.hs b/GHC/IO/FD.hs index 4425a3a..0480bb6 100644 --- a/GHC/IO/FD.hs +++ b/GHC/IO/FD.hs @@ -37,7 +37,7 @@ import GHC.IO.Buffer import GHC.IO.BufferedIO import qualified GHC.IO.Device import GHC.IO.Device (SeekMode(..), IODeviceType(..)) -import GHC.Conc +import GHC.Conc.IO import GHC.IO.Exception import Foreign diff --git a/GHC/IO/Handle/Internals.hs b/GHC/IO/Handle/Internals.hs index eac74c2..844c8c6 100644 --- a/GHC/IO/Handle/Internals.hs +++ b/GHC/IO/Handle/Internals.hs @@ -59,7 +59,7 @@ import GHC.IO.Device (IODevice, SeekMode(..)) import qualified GHC.IO.Device as IODevice import qualified GHC.IO.BufferedIO as Buffered -import GHC.Conc +import GHC.Conc.Sync import GHC.Real import GHC.Base import GHC.Exception diff --git a/System/Event.hs b/System/Event.hs new file mode 100644 index 0000000..f8537ca --- /dev/null +++ b/System/Event.hs @@ -0,0 +1,35 @@ +module System.Event + ( -- * Types + EventManager + + -- * Creation + , new + + -- * Running + , loop + + -- ** Stepwise running + , step + , shutdown + + -- * Registering interest in I/O events + , Event + , evtRead + , evtWrite + , IOCallback + , FdKey(keyFd) + , registerFd + , registerFd_ + , unregisterFd + , unregisterFd_ + , fdWasClosed + + -- * Registering interest in timeout events + , TimeoutCallback + , TimeoutKey + , registerTimeout + , updateTimeout + , unregisterTimeout + ) where + +import System.Event.Manager diff --git a/System/Event/Array.hs b/System/Event/Array.hs new file mode 100644 index 0000000..4d590d9 --- /dev/null +++ b/System/Event/Array.hs @@ -0,0 +1,291 @@ +{-# LANGUAGE BangPatterns, CPP, ForeignFunctionInterface, NoImplicitPrelude #-} + +module System.Event.Array + ( + Array + , capacity + , clear + , concat + , copy + , duplicate + , empty + , ensureCapacity + , findIndex + , forM_ + , length + , loop + , new + , removeAt + , snoc + , unsafeLoad + , unsafeRead + , unsafeWrite + , useAsPtr + ) where + +import Control.Monad hiding (forM_) +import Data.IORef (IORef, atomicModifyIORef, newIORef, readIORef, writeIORef) +import Data.Maybe +import Foreign.C.Types (CSize) +import Foreign.ForeignPtr (ForeignPtr, withForeignPtr) +import Foreign.Ptr (Ptr, nullPtr, plusPtr) +import Foreign.Storable (Storable(..)) +import GHC.Base +import GHC.Err (undefined) +import GHC.Float (logBase) +import GHC.ForeignPtr (mallocPlainForeignPtrBytes, newForeignPtr_) +import GHC.Num (Num(..)) +import GHC.Real ((^), ceiling, fromIntegral, realToFrac) +import GHC.Show (show) + +#define BOUNDS_CHECKING 1 + +#if defined(BOUNDS_CHECKING) +-- This fugly hack is brought by GHC's apparent reluctance to deal +-- with MagicHash and UnboxedTuples when inferring types. Eek! +#define CHECK_BOUNDS(_func_,_len_,_k_) \ +if (_k_) < 0 || (_k_) >= (_len_) then error ("System.Event.Array." ++ (_func_) ++ ": bounds error, index " ++ show (_k_) ++ ", capacity " ++ show (_len_)) else +#else +#define CHECK_BOUNDS(_func_,_len_,_k_) +#endif + +-- Invariant: size <= capacity +newtype Array a = Array (IORef (AC a)) + +-- The actual array content. +data AC a = AC + !(ForeignPtr a) -- Elements + !Int -- Number of elements (length) + !Int -- Maximum number of elements (capacity) + +empty :: IO (Array a) +empty = do + p <- newForeignPtr_ nullPtr + Array `fmap` newIORef (AC p 0 0) + +allocArray :: Storable a => Int -> IO (ForeignPtr a) +allocArray n = allocHack undefined + where + allocHack :: Storable a => a -> IO (ForeignPtr a) + allocHack dummy = mallocPlainForeignPtrBytes (n * sizeOf dummy) + +reallocArray :: Storable a => ForeignPtr a -> Int -> Int -> IO (ForeignPtr a) +reallocArray p newSize oldSize = reallocHack undefined p + where + reallocHack :: Storable a => a -> ForeignPtr a -> IO (ForeignPtr a) + reallocHack dummy src = do + let size = sizeOf dummy + dst <- mallocPlainForeignPtrBytes (newSize * size) + withForeignPtr src $ \s -> + when (s /= nullPtr && oldSize > 0) . + withForeignPtr dst $ \d -> do + _ <- memcpy d s (fromIntegral (oldSize * size)) + return () + return dst + +new :: Storable a => Int -> IO (Array a) +new c = do + es <- allocArray cap + fmap Array (newIORef (AC es 0 cap)) + where + cap = firstPowerOf2 c + +duplicate :: Storable a => Array a -> IO (Array a) +duplicate a = dupHack undefined a + where + dupHack :: Storable b => b -> Array b -> IO (Array b) + dupHack dummy (Array ref) = do + AC es len cap <- readIORef ref + ary <- allocArray cap + withForeignPtr ary $ \dest -> + withForeignPtr es $ \src -> do + _ <- memcpy dest src (fromIntegral (len * sizeOf dummy)) + return () + Array `fmap` newIORef (AC ary len cap) + +length :: Array a -> IO Int +length (Array ref) = do + AC _ len _ <- readIORef ref + return len + +capacity :: Array a -> IO Int +capacity (Array ref) = do + AC _ _ cap <- readIORef ref + return cap + +unsafeRead :: Storable a => Array a -> Int -> IO a +unsafeRead (Array ref) ix = do + AC es _ cap <- readIORef ref + CHECK_BOUNDS("unsafeRead",cap,ix) + withForeignPtr es $ \p -> + peekElemOff p ix + +unsafeWrite :: Storable a => Array a -> Int -> a -> IO () +unsafeWrite (Array ref) ix a = do + ac <- readIORef ref + unsafeWrite' ac ix a + +unsafeWrite' :: Storable a => AC a -> Int -> a -> IO () +unsafeWrite' (AC es _ cap) ix a = do + CHECK_BOUNDS("unsafeWrite'",cap,ix) + withForeignPtr es $ \p -> + pokeElemOff p ix a + +unsafeLoad :: Storable a => Array a -> (Ptr a -> Int -> IO Int) -> IO Int +unsafeLoad (Array ref) load = do + AC es _ cap <- readIORef ref + len' <- withForeignPtr es $ \p -> load p cap + writeIORef ref (AC es len' cap) + return len' + +ensureCapacity :: Storable a => Array a -> Int -> IO () +ensureCapacity (Array ref) c = do + ac@(AC _ _ cap) <- readIORef ref + ac'@(AC _ _ cap') <- ensureCapacity' ac c + when (cap' /= cap) $ + writeIORef ref ac' + +ensureCapacity' :: Storable a => AC a -> Int -> IO (AC a) +ensureCapacity' ac@(AC es len cap) c = do + if c > cap + then do + es' <- reallocArray es cap' cap + return (AC es' len cap') + else + return ac + where + cap' = firstPowerOf2 c + +useAsPtr :: Array a -> (Ptr a -> Int -> IO b) -> IO b +useAsPtr (Array ref) f = do + AC es len _ <- readIORef ref + withForeignPtr es $ \p -> f p len + +snoc :: Storable a => Array a -> a -> IO () +snoc (Array ref) e = do + ac@(AC _ len _) <- readIORef ref + let len' = len + 1 + ac'@(AC es _ cap) <- ensureCapacity' ac len' + unsafeWrite' ac' len e + writeIORef ref (AC es len' cap) + +clear :: Storable a => Array a -> IO () +clear (Array ref) = do + !_ <- atomicModifyIORef ref $ \(AC es _ cap) -> + let e = AC es 0 cap in (e, e) + return () + +forM_ :: Storable a => Array a -> (a -> IO ()) -> IO () +forM_ ary g = forHack ary g undefined + where + forHack :: Storable b => Array b -> (b -> IO ()) -> b -> IO () + forHack (Array ref) f dummy = do + AC es len _ <- readIORef ref + let size = sizeOf dummy + offset = len * size + withForeignPtr es $ \p -> do + let go n | n >= offset = return () + | otherwise = do + f =<< peek (p `plusPtr` n) + go (n + size) + go 0 + +loop :: Storable a => Array a -> b -> (b -> a -> IO (b,Bool)) -> IO () +loop ary z g = loopHack ary z g undefined + where + loopHack :: Storable b => Array b -> c -> (c -> b -> IO (c,Bool)) -> b + -> IO () + loopHack (Array ref) y f dummy = do + AC es len _ <- readIORef ref + let size = sizeOf dummy + offset = len * size + withForeignPtr es $ \p -> do + let go n k + | n >= offset = return () + | otherwise = do + (k',cont) <- f k =<< peek (p `plusPtr` n) + when cont $ go (n + size) k' + go 0 y + +findIndex :: Storable a => (a -> Bool) -> Array a -> IO (Maybe (Int,a)) +findIndex = findHack undefined + where + findHack :: Storable b => b -> (b -> Bool) -> Array b -> IO (Maybe (Int,b)) + findHack dummy p (Array ref) = do + AC es len _ <- readIORef ref + let size = sizeOf dummy + offset = len * size + withForeignPtr es $ \ptr -> + let go !n !i + | n >= offset = return Nothing + | otherwise = do + val <- peek (ptr `plusPtr` n) + if p val + then return $ Just (i, val) + else go (n + size) (i + 1) + in go 0 0 + +concat :: Storable a => Array a -> Array a -> IO () +concat (Array d) (Array s) = do + da@(AC _ dlen _) <- readIORef d + sa@(AC _ slen _) <- readIORef s + writeIORef d =<< copy' da dlen sa 0 slen + +-- | Copy part of the source array into the destination array. The +-- destination array is resized if not large enough. +copy :: Storable a => Array a -> Int -> Array a -> Int -> Int -> IO () +copy (Array d) dstart (Array s) sstart maxCount = do + da <- readIORef d + sa <- readIORef s + writeIORef d =<< copy' da dstart sa sstart maxCount + +-- | Copy part of the source array into the destination array. The +-- destination array is resized if not large enough. +copy' :: Storable a => AC a -> Int -> AC a -> Int -> Int -> IO (AC a) +copy' d dstart s sstart maxCount = copyHack d s undefined + where + copyHack :: Storable b => AC b -> AC b -> b -> IO (AC b) + copyHack dac@(AC _ oldLen _) (AC src slen _) dummy = do + when (maxCount < 0 || dstart < 0 || dstart > oldLen || sstart < 0 || + sstart > slen) $ error "copy: bad offsets or lengths" + let size = sizeOf dummy + count = min maxCount (slen - sstart) + if count == 0 + then return dac + else do + AC dst dlen dcap <- ensureCapacity' dac (dstart + count) + withForeignPtr dst $ \dptr -> + withForeignPtr src $ \sptr -> do + _ <- memcpy (dptr `plusPtr` (dstart * size)) + (sptr `plusPtr` (sstart * size)) + (fromIntegral (count * size)) + return $ AC dst (max dlen (dstart + count)) dcap + +removeAt :: Storable a => Array a -> Int -> IO () +removeAt a i = removeHack a undefined + where + removeHack :: Storable b => Array b -> b -> IO () + removeHack (Array ary) dummy = do + AC fp oldLen cap <- readIORef ary + when (i < 0 || i >= oldLen) $ error "removeAt: invalid index" + let size = sizeOf dummy + newLen = oldLen - 1 + when (newLen > 0 && i < newLen) . + withForeignPtr fp $ \ptr -> do + _ <- memmove (ptr `plusPtr` (size * i)) + (ptr `plusPtr` (size * (i+1))) + (fromIntegral (size * (newLen-i))) + return () + writeIORef ary (AC fp newLen cap) + +firstPowerOf2 :: Int -> Int +firstPowerOf2 n + | n <= 0 = 0 + | otherwise = 2^p + where p = (ceiling . logBase (2 :: Double) . realToFrac) n :: Int + +foreign import ccall unsafe "string.h memcpy" + memcpy :: Ptr a -> Ptr a -> CSize -> IO (Ptr a) + +foreign import ccall unsafe "string.h memmove" + memmove :: Ptr a -> Ptr a -> CSize -> IO (Ptr a) diff --git a/System/Event/Clock.hsc b/System/Event/Clock.hsc new file mode 100644 index 0000000..fec00bb --- /dev/null +++ b/System/Event/Clock.hsc @@ -0,0 +1,48 @@ +{-# LANGUAGE ForeignFunctionInterface #-} + +module System.Event.Clock (getCurrentTime) where + +#include + +import Foreign (Ptr, Storable(..), nullPtr, with) +import Foreign.C.Error (throwErrnoIfMinus1_) +import Foreign.C.Types (CInt, CLong) +import GHC.Base +import GHC.Err +import GHC.Num +import GHC.Real + +-- TODO: Implement this for Windows. + +-- | Return the current time, in seconds since Jan. 1, 1970. +getCurrentTime :: IO Double +getCurrentTime = do + tv <- with (CTimeval 0 0) $ \tvptr -> do + throwErrnoIfMinus1_ "gettimeofday" (gettimeofday tvptr nullPtr) + peek tvptr + let !t = fromIntegral (sec tv) + fromIntegral (usec tv) / 1000000.0 + return t + +------------------------------------------------------------------------ +-- FFI binding + +data CTimeval = CTimeval + { sec :: {-# UNPACK #-} !CLong + , usec :: {-# UNPACK #-} !CLong + } + +instance Storable CTimeval where + sizeOf _ = #size struct timeval + alignment _ = alignment (undefined :: CLong) + + peek ptr = do + sec' <- #{peek struct timeval, tv_sec} ptr + usec' <- #{peek struct timeval, tv_usec} ptr + return $ CTimeval sec' usec' + + poke ptr tv = do + #{poke struct timeval, tv_sec} ptr (sec tv) + #{poke struct timeval, tv_usec} ptr (usec tv) + +foreign import ccall unsafe "sys/time.h gettimeofday" gettimeofday + :: Ptr CTimeval -> Ptr () -> IO CInt diff --git a/System/Event/Control.hs b/System/Event/Control.hs new file mode 100644 index 0000000..75a5ad0 --- /dev/null +++ b/System/Event/Control.hs @@ -0,0 +1,211 @@ +{-# LANGUAGE CPP, ForeignFunctionInterface, NoImplicitPrelude, + ScopedTypeVariables #-} + +module System.Event.Control + ( + -- * Managing the IO manager + Signal + , ControlMessage(..) + , Control + , newControl + , closeControl + -- ** Control message reception + , readControlMessage + -- *** File descriptors + , controlReadFd + , wakeupReadFd + -- ** Control message sending + , sendWakeup + , sendDie + -- * Utilities + , setNonBlockingFD + ) where + +#include "EventConfig.h" + +import Control.Monad (when) +import Foreign.ForeignPtr (ForeignPtr) +import GHC.Base +import GHC.Conc.Signal (Signal) +import GHC.Num (Num(..)) +import GHC.Real (fromIntegral) +import GHC.Show (Show) +import GHC.Word (Word8) +import Foreign.C.Error (throwErrnoIfMinus1_) +import Foreign.C.Types (CInt, CSize) +import Foreign.ForeignPtr (mallocForeignPtrBytes, withForeignPtr) +import Foreign.Marshal (alloca, allocaBytes) +import Foreign.Marshal.Array (allocaArray) +import Foreign.Ptr (castPtr) +import Foreign.Storable (peek, peekElemOff, poke) +import System.Posix.Internals (c_close, c_pipe, c_read, c_write, + setCloseOnExec, setNonBlockingFD) +import System.Posix.Types (Fd) + +#if defined(HAVE_EVENTFD) +import Data.Word (Word64) +import Foreign.C.Error (throwErrnoIfMinus1) +#else +import Foreign.C.Error (eAGAIN, eWOULDBLOCK, getErrno, throwErrno) +#endif + +data ControlMessage = CMsgWakeup + | CMsgDie + | CMsgSignal {-# UNPACK #-} !(ForeignPtr Word8) + {-# UNPACK #-} !Signal + deriving (Eq, Show) + +-- | The structure used to tell the IO manager thread what to do. +data Control = W { + controlReadFd :: {-# UNPACK #-} !Fd + , controlWriteFd :: {-# UNPACK #-} !Fd +#if defined(HAVE_EVENTFD) + , controlEventFd :: {-# UNPACK #-} !Fd +#else + , wakeupReadFd :: {-# UNPACK #-} !Fd + , wakeupWriteFd :: {-# UNPACK #-} !Fd +#endif + } deriving (Show) + +#if defined(HAVE_EVENTFD) +wakeupReadFd :: Control -> Fd +wakeupReadFd = controlEventFd +{-# INLINE wakeupReadFd #-} +#endif + +setNonBlock :: CInt -> IO () +setNonBlock fd = +#if __GLASGOW_HASKELL__ >= 611 + setNonBlockingFD fd True +#else + setNonBlockingFD fd +#endif + +-- | Create the structure (usually a pipe) used for waking up the IO +-- manager thread from another thread. +newControl :: IO Control +newControl = allocaArray 2 $ \fds -> do + let createPipe = do + throwErrnoIfMinus1_ "pipe" $ c_pipe fds + rd <- peekElemOff fds 0 + wr <- peekElemOff fds 1 + -- The write end must be non-blocking, since we may need to + -- poke the event manager from a signal handler. + setNonBlock wr + setCloseOnExec rd + setCloseOnExec wr + return (rd, wr) + (ctrl_rd, ctrl_wr) <- createPipe + c_setIOManagerControlFd ctrl_wr +#if defined(HAVE_EVENTFD) + ev <- throwErrnoIfMinus1 "eventfd" $ c_eventfd 0 0 + setNonBlock ev + setCloseOnExec ev + c_setIOManagerWakeupFd ev +#else + (wake_rd, wake_wr) <- createPipe + c_setIOManagerWakeupFd wake_wr +#endif + return W { controlReadFd = fromIntegral ctrl_rd + , controlWriteFd = fromIntegral ctrl_wr +#if defined(HAVE_EVENTFD) + , controlEventFd = fromIntegral ev +#else + , wakeupReadFd = fromIntegral wake_rd + , wakeupWriteFd = fromIntegral wake_wr +#endif + } + +-- | Close the control structure used by the IO manager thread. +closeControl :: Control -> IO () +closeControl w = do + _ <- c_close . fromIntegral . controlReadFd $ w + _ <- c_close . fromIntegral . controlWriteFd $ w +#if defined(HAVE_EVENTFD) + _ <- c_close . fromIntegral . controlEventFd $ w +#else + _ <- c_close . fromIntegral . wakeupReadFd $ w + _ <- c_close . fromIntegral . wakeupWriteFd $ w +#endif + return () + +io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word8 +io_MANAGER_WAKEUP = 0xff +io_MANAGER_DIE = 0xfe + +foreign import ccall "__hscore_sizeof_siginfo_t" + sizeof_siginfo_t :: CSize + +readControlMessage :: Control -> Fd -> IO ControlMessage +readControlMessage ctrl fd + | fd == wakeupReadFd ctrl = allocaBytes wakeupBufferSize $ \p -> do + throwErrnoIfMinus1_ "readWakeupMessage" $ + c_read (fromIntegral fd) p (fromIntegral wakeupBufferSize) + return CMsgWakeup + | otherwise = + alloca $ \p -> do + throwErrnoIfMinus1_ "readControlMessage" $ + c_read (fromIntegral fd) p 1 + s <- peek p + case s of + -- Wakeup messages shouldn't be sent on the control + -- file descriptor but we handle them anyway. + _ | s == io_MANAGER_WAKEUP -> return CMsgWakeup + _ | s == io_MANAGER_DIE -> return CMsgDie + _ -> do -- Signal + fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t) + withForeignPtr fp $ \p_siginfo -> do + r <- c_read (fromIntegral fd) (castPtr p_siginfo) + sizeof_siginfo_t + when (r /= fromIntegral sizeof_siginfo_t) $ + error "failed to read siginfo_t" + let !s' = fromIntegral s + return $ CMsgSignal fp s' + + where wakeupBufferSize = +#if defined(HAVE_EVENTFD) + 8 +#else + 4096 +#endif + +sendWakeup :: Control -> IO () +#if defined(HAVE_EVENTFD) +sendWakeup c = alloca $ \p -> do + poke p (1 :: Word64) + throwErrnoIfMinus1_ "sendWakeup" $ + c_write (fromIntegral (controlEventFd c)) (castPtr p) 8 +#else +sendWakeup c = do + n <- sendMessage (wakeupWriteFd c) CMsgWakeup + case n of + _ | n /= -1 -> return () + | otherwise -> do + errno <- getErrno + when (errno /= eAGAIN && errno /= eWOULDBLOCK) $ + throwErrno "sendWakeup" +#endif + +sendDie :: Control -> IO () +sendDie c = throwErrnoIfMinus1_ "sendDie" $ + sendMessage (controlWriteFd c) CMsgDie + +sendMessage :: Fd -> ControlMessage -> IO Int +sendMessage fd msg = alloca $ \p -> do + case msg of + CMsgWakeup -> poke p io_MANAGER_WAKEUP + CMsgDie -> poke p io_MANAGER_DIE + CMsgSignal _fp _s -> error "Signals can only be sent from within the RTS" + fromIntegral `fmap` c_write (fromIntegral fd) p 1 + +#if defined(HAVE_EVENTFD) +foreign import ccall unsafe "sys/eventfd.h eventfd" + c_eventfd :: CInt -> CInt -> IO CInt +#endif + +-- Used to tell the RTS how it can send messages to the I/O manager. +foreign import ccall "setIOManagerControlFd" + c_setIOManagerControlFd :: CInt -> IO () + +foreign import ccall "setIOManagerWakeupFd" + c_setIOManagerWakeupFd :: CInt -> IO () diff --git a/System/Event/EPoll.hsc b/System/Event/EPoll.hsc new file mode 100644 index 0000000..098946e --- /dev/null +++ b/System/Event/EPoll.hsc @@ -0,0 +1,213 @@ +{-# LANGUAGE ForeignFunctionInterface, GeneralizedNewtypeDeriving, + NoImplicitPrelude #-} + +-- +-- | A binding to the epoll I/O event notification facility +-- +-- epoll is a variant of poll that can be used either as an edge-triggered or +-- a level-triggered interface and scales well to large numbers of watched file +-- descriptors. +-- +-- epoll decouples monitor an fd from the process of registering it. +-- +module System.Event.EPoll + ( + new + , available + ) where + +import qualified System.Event.Internal as E + +#include "EventConfig.h" +#if !defined(HAVE_EPOLL) +import GHC.Base + +new :: IO E.Backend +new = error "EPoll back end not implemented for this platform" + +available :: Bool +available = False +{-# INLINE available #-} +#else + +#include + +import Control.Monad (when) +import Data.Bits (Bits, (.|.), (.&.)) +import Data.Monoid (Monoid(..)) +import Data.Word (Word32) +import Foreign.C.Error (throwErrnoIfMinus1, throwErrnoIfMinus1_) +import Foreign.C.Types (CInt) +import Foreign.Marshal.Utils (with) +import Foreign.Ptr (Ptr) +import Foreign.Storable (Storable(..)) +import GHC.Base +import GHC.Err (undefined) +import GHC.Num (Num(..)) +import GHC.Real (ceiling, fromIntegral) +import GHC.Show (Show) +import System.Posix.Internals (c_close) +#if !defined(HAVE_EPOLL_CREATE1) +import System.Posix.Internals (setCloseOnExec) +#endif +import System.Posix.Types (Fd(..)) + +import qualified System.Event.Array as A +import System.Event.Internal (Timeout(..)) + +available :: Bool +available = True +{-# INLINE available #-} + +data EPoll = EPoll { + epollFd :: {-# UNPACK #-} !EPollFd + , epollEvents :: {-# UNPACK #-} !(A.Array Event) + } + +-- | Create a new epoll backend. +new :: IO E.Backend +new = do + epfd <- epollCreate + evts <- A.new 64 + let !be = E.backend poll modifyFd delete (EPoll epfd evts) + return be + +delete :: EPoll -> IO () +delete be = do + _ <- c_close . fromEPollFd . epollFd $ be + return () + +-- | Change the set of events we are interested in for a given file +-- descriptor. +modifyFd :: EPoll -> Fd -> E.Event -> E.Event -> IO () +modifyFd ep fd oevt nevt = with (Event (fromEvent nevt) fd) $ + epollControl (epollFd ep) op fd + where op | oevt == mempty = controlOpAdd + | nevt == mempty = controlOpDelete + | otherwise = controlOpModify + +-- | Select a set of file descriptors which are ready for I/O +-- operations and call @f@ for all ready file descriptors, passing the +-- events that are ready. +poll :: EPoll -- ^ state + -> Timeout -- ^ timeout in milliseconds + -> (Fd -> E.Event -> IO ()) -- ^ I/O callback + -> IO () +poll ep timeout f = do + let events = epollEvents ep + + -- Will return zero if the system call was interupted, in which case + -- we just return (and try again later.) + n <- A.unsafeLoad events $ \es cap -> + epollWait (epollFd ep) es cap $ fromTimeout timeout + + when (n > 0) $ do + A.forM_ events $ \e -> f (eventFd e) (toEvent (eventTypes e)) + cap <- A.capacity events + when (cap == n) $ A.ensureCapacity events (2 * cap) + +newtype EPollFd = EPollFd { + fromEPollFd :: CInt + } deriving (Eq, Show) + +data Event = Event { + eventTypes :: EventType + , eventFd :: Fd + } deriving (Show) + +instance Storable Event where + sizeOf _ = #size struct epoll_event + alignment _ = alignment (undefined :: CInt) + + peek ptr = do + ets <- #{peek struct epoll_event, events} ptr + ed <- #{peek struct epoll_event, data.fd} ptr + let !ev = Event (EventType ets) ed + return ev + + poke ptr e = do + #{poke struct epoll_event, events} ptr (unEventType $ eventTypes e) + #{poke struct epoll_event, data.fd} ptr (eventFd e) + +newtype ControlOp = ControlOp CInt + +#{enum ControlOp, ControlOp + , controlOpAdd = EPOLL_CTL_ADD + , controlOpModify = EPOLL_CTL_MOD + , controlOpDelete = EPOLL_CTL_DEL + } + +newtype EventType = EventType { + unEventType :: Word32 + } deriving (Show, Eq, Num, Bits) + +#{enum EventType, EventType + , epollIn = EPOLLIN + , epollOut = EPOLLOUT + , epollErr = EPOLLERR + , epollHup = EPOLLHUP + } + +-- | Create a new epoll context, returning a file descriptor associated with the context. +-- The fd may be used for subsequent calls to this epoll context. +-- +-- The size parameter to epoll_create is a hint about the expected number of handles. +-- +-- The file descriptor returned from epoll_create() should be destroyed via +-- a call to close() after polling is finished +-- +epollCreate :: IO EPollFd +epollCreate = do + fd <- throwErrnoIfMinus1 "epollCreate" $ +#if defined(HAVE_EPOLL_CREATE1) + c_epoll_create1 (#const EPOLL_CLOEXEC) +#else + c_epoll_create 256 -- argument is ignored + setCloseOnExec fd +#endif + let !epollFd' = EPollFd fd + return epollFd' + +epollControl :: EPollFd -> ControlOp -> Fd -> Ptr Event -> IO () +epollControl (EPollFd epfd) (ControlOp op) (Fd fd) event = + throwErrnoIfMinus1_ "epollControl" $ c_epoll_ctl epfd op fd event + +epollWait :: EPollFd -> Ptr Event -> Int -> Int -> IO Int +epollWait (EPollFd epfd) events numEvents timeout = + fmap fromIntegral . + E.throwErrnoIfMinus1NoRetry "epollWait" $ + c_epoll_wait epfd events (fromIntegral numEvents) (fromIntegral timeout) + +fromEvent :: E.Event -> EventType +fromEvent e = remap E.evtRead epollIn .|. + remap E.evtWrite epollOut + where remap evt to + | e `E.eventIs` evt = to + | otherwise = 0 + +toEvent :: EventType -> E.Event +toEvent e = remap (epollIn .|. epollErr .|. epollHup) E.evtRead `mappend` + remap (epollOut .|. epollErr .|. epollHup) E.evtWrite + where remap evt to + | e .&. evt /= 0 = to + | otherwise = mempty + +fromTimeout :: Timeout -> Int +fromTimeout Forever = -1 +fromTimeout (Timeout s) = ceiling $ 1000 * s + +#if defined(HAVE_EPOLL_CREATE1) +foreign import ccall unsafe "sys/epoll.h epoll_create1" + c_epoll_create1 :: CInt -> IO CInt +#else +foreign import ccall unsafe "sys/epoll.h epoll_create" + c_epoll_create :: CInt -> IO CInt +#endif + +foreign import ccall unsafe "sys/epoll.h epoll_ctl" + c_epoll_ctl :: CInt -> CInt -> CInt -> Ptr Event -> IO CInt + +foreign import ccall safe "sys/epoll.h epoll_wait" + c_epoll_wait :: CInt -> Ptr Event -> CInt -> CInt -> IO CInt + +#endif /* defined(HAVE_EPOLL) */ diff --git a/System/Event/IntMap.hs b/System/Event/IntMap.hs new file mode 100644 index 0000000..f02628b --- /dev/null +++ b/System/Event/IntMap.hs @@ -0,0 +1,374 @@ +{-# LANGUAGE CPP, MagicHash, NoImplicitPrelude #-} +----------------------------------------------------------------------------- +-- | +-- Module : System.Event.IntMap +-- Copyright : (c) Daan Leijen 2002 +-- (c) Andriy Palamarchuk 2008 +-- License : BSD-style +-- Maintainer : libraries@haskell.org +-- Stability : provisional +-- Portability : portable +-- +-- An efficient implementation of maps from integer keys to values. +-- +-- Since many function names (but not the type name) clash with +-- "Prelude" names, this module is usually imported @qualified@, e.g. +-- +-- > import Data.IntMap (IntMap) +-- > import qualified Data.IntMap as IntMap +-- +-- The implementation is based on /big-endian patricia trees/. This data +-- structure performs especially well on binary operations like 'union' +-- and 'intersection'. However, my benchmarks show that it is also +-- (much) faster on insertions and deletions when compared to a generic +-- size-balanced map implementation (see "Data.Map"). +-- +-- * Chris Okasaki and Andy Gill, \"/Fast Mergeable Integer Maps/\", +-- Workshop on ML, September 1998, pages 77-86, +-- +-- +-- * D.R. Morrison, \"/PATRICIA -- Practical Algorithm To Retrieve +-- Information Coded In Alphanumeric/\", Journal of the ACM, 15(4), +-- October 1968, pages 514-534. +-- +-- Operation comments contain the operation time complexity in +-- the Big-O notation . +-- Many operations have a worst-case complexity of /O(min(n,W))/. +-- This means that the operation can become linear in the number of +-- elements with a maximum of /W/ -- the number of bits in an 'Int' +-- (32 or 64). +----------------------------------------------------------------------------- + +module System.Event.IntMap + ( + -- * Map type + IntMap + , Key + + -- * Query + , lookup + , member + + -- * Construction + , empty + + -- * Insertion + , insertWith + + -- * Delete\/Update + , delete + , updateWith + + -- * Traversal + -- ** Fold + , foldWithKey + + -- * Conversion + , keys + ) where + +import Data.Bits + +import Data.Maybe (Maybe(..)) +import GHC.Base hiding (foldr) +import GHC.Num (Num(..)) +import GHC.Real (fromIntegral) +import GHC.Show (Show(showsPrec), showParen, shows, showString) + +#if __GLASGOW_HASKELL__ +import GHC.Word (Word(..)) +#else +import Data.Word +#endif + +-- | A @Nat@ is a natural machine word (an unsigned Int) +type Nat = Word + +natFromInt :: Key -> Nat +natFromInt i = fromIntegral i + +intFromNat :: Nat -> Key +intFromNat w = fromIntegral w + +shiftRL :: Nat -> Key -> Nat +#if __GLASGOW_HASKELL__ +-- GHC: use unboxing to get @shiftRL@ inlined. +shiftRL (W# x) (I# i) = W# (shiftRL# x i) +#else +shiftRL x i = shiftR x i +#endif + +------------------------------------------------------------------------ +-- Types + +-- | A map of integers to values @a@. +data IntMap a = Nil + | Tip {-# UNPACK #-} !Key !a + | Bin {-# UNPACK #-} !Prefix + {-# UNPACK #-} !Mask + !(IntMap a) + !(IntMap a) + +type Prefix = Int +type Mask = Int +type Key = Int + +------------------------------------------------------------------------ +-- Query + +-- | /O(min(n,W))/ Lookup the value at a key in the map. See also +-- 'Data.Map.lookup'. +lookup :: Key -> IntMap a -> Maybe a +lookup k t = let nk = natFromInt k in seq nk (lookupN nk t) + +lookupN :: Nat -> IntMap a -> Maybe a +lookupN k t + = case t of + Bin _ m l r + | zeroN k (natFromInt m) -> lookupN k l + | otherwise -> lookupN k r + Tip kx x + | (k == natFromInt kx) -> Just x + | otherwise -> Nothing + Nil -> Nothing + +-- | /O(min(n,W))/. Is the key a member of the map? +-- +-- > member 5 (fromList [(5,'a'), (3,'b')]) == True +-- > member 1 (fromList [(5,'a'), (3,'b')]) == False + +member :: Key -> IntMap a -> Bool +member k m + = case lookup k m of + Nothing -> False + Just _ -> True + +------------------------------------------------------------------------ +-- Construction + +-- | /O(1)/ The empty map. +-- +-- > empty == fromList [] +-- > size empty == 0 +empty :: IntMap a +empty = Nil + +------------------------------------------------------------------------ +-- Insert + +-- | /O(min(n,W))/ Insert with a function, combining new value and old +-- value. @insertWith f key value mp@ will insert the pair (key, +-- value) into @mp@ if key does not exist in the map. If the key does +-- exist, the function will insert the pair (key, f new_value +-- old_value). The result is a pair where the first element is the +-- old value, if one was present, and the second is the modified map. +insertWith :: (a -> a -> a) -> Key -> a -> IntMap a -> (Maybe a, IntMap a) +insertWith f k x t = case t of + Bin p m l r + | nomatch k p m -> (Nothing, join k (Tip k x) p t) + | zero k m -> let (found, l') = insertWith f k x l + in (found, Bin p m l' r) + | otherwise -> let (found, r') = insertWith f k x r + in (found, Bin p m l r') + Tip ky y + | k == ky -> (Just y, Tip k (f x y)) + | otherwise -> (Nothing, join k (Tip k x) ky t) + Nil -> (Nothing, Tip k x) + + +------------------------------------------------------------------------ +-- Delete/Update + +-- | /O(min(n,W))/. Delete a key and its value from the map. When the +-- key is not a member of the map, the original map is returned. The +-- result is a pair where the first element is the value associated +-- with the deleted key, if one existed, and the second element is the +-- modified map. +delete :: Key -> IntMap a -> (Maybe a, IntMap a) +delete k t = case t of + Bin p m l r + | nomatch k p m -> (Nothing, t) + | zero k m -> let (found, l') = delete k l + in (found, bin p m l' r) + | otherwise -> let (found, r') = delete k r + in (found, bin p m l r') + Tip ky y + | k == ky -> (Just y, Nil) + | otherwise -> (Nothing, t) + Nil -> (Nothing, Nil) + +updateWith :: (a -> Maybe a) -> Key -> IntMap a -> (Maybe a, IntMap a) +updateWith f k t = case t of + Bin p m l r + | nomatch k p m -> (Nothing, t) + | zero k m -> let (found, l') = updateWith f k l + in (found, bin p m l' r) + | otherwise -> let (found, r') = updateWith f k r + in (found, bin p m l r') + Tip ky y + | k == ky -> case (f y) of + Just y' -> (Just y, Tip ky y') + Nothing -> (Just y, Nil) + | otherwise -> (Nothing, t) + Nil -> (Nothing, Nil) +-- | /O(n)/. Fold the keys and values in the map, such that +-- @'foldWithKey' f z == 'Prelude.foldr' ('uncurry' f) z . 'toAscList'@. +-- For example, +-- +-- > keys map = foldWithKey (\k x ks -> k:ks) [] map +-- +-- > let f k a result = result ++ "(" ++ (show k) ++ ":" ++ a ++ ")" +-- > foldWithKey f "Map: " (fromList [(5,"a"), (3,"b")]) == "Map: (5:a)(3:b)" + +foldWithKey :: (Key -> a -> b -> b) -> b -> IntMap a -> b +foldWithKey f z t + = foldr f z t + +-- | /O(n)/. Convert the map to a list of key\/value pairs. +-- +-- > toList (fromList [(5,"a"), (3,"b")]) == [(3,"b"), (5,"a")] +-- > toList empty == [] + +toList :: IntMap a -> [(Key,a)] +toList t + = foldWithKey (\k x xs -> (k,x):xs) [] t + +foldr :: (Key -> a -> b -> b) -> b -> IntMap a -> b +foldr f z t + = case t of + Bin 0 m l r | m < 0 -> foldr' f (foldr' f z l) r -- put negative numbers before. + Bin _ _ _ _ -> foldr' f z t + Tip k x -> f k x z + Nil -> z + +foldr' :: (Key -> a -> b -> b) -> b -> IntMap a -> b +foldr' f z t + = case t of + Bin _ _ l r -> foldr' f (foldr' f z r) l + Tip k x -> f k x z + Nil -> z + +-- | /O(n)/. Return all keys of the map in ascending order. +-- +-- > keys (fromList [(5,"a"), (3,"b")]) == [3,5] +-- > keys empty == [] + +keys :: IntMap a -> [Key] +keys m + = foldWithKey (\k _ ks -> k:ks) [] m + +------------------------------------------------------------------------ +-- Eq + +instance Eq a => Eq (IntMap a) where + t1 == t2 = equal t1 t2 + t1 /= t2 = nequal t1 t2 + +equal :: Eq a => IntMap a -> IntMap a -> Bool +equal (Bin p1 m1 l1 r1) (Bin p2 m2 l2 r2) + = (m1 == m2) && (p1 == p2) && (equal l1 l2) && (equal r1 r2) +equal (Tip kx x) (Tip ky y) + = (kx == ky) && (x==y) +equal Nil Nil = True +equal _ _ = False + +nequal :: Eq a => IntMap a -> IntMap a -> Bool +nequal (Bin p1 m1 l1 r1) (Bin p2 m2 l2 r2) + = (m1 /= m2) || (p1 /= p2) || (nequal l1 l2) || (nequal r1 r2) +nequal (Tip kx x) (Tip ky y) + = (kx /= ky) || (x/=y) +nequal Nil Nil = False +nequal _ _ = True + +instance Show a => Show (IntMap a) where + showsPrec d m = showParen (d > 10) $ + showString "fromList " . shows (toList m) + +------------------------------------------------------------------------ +-- Utility functions + +join :: Prefix -> IntMap a -> Prefix -> IntMap a -> IntMap a +join p1 t1 p2 t2 + | zero p1 m = Bin p m t1 t2 + | otherwise = Bin p m t2 t1 + where + m = branchMask p1 p2 + p = mask p1 m + +-- | @bin@ assures that we never have empty trees within a tree. +bin :: Prefix -> Mask -> IntMap a -> IntMap a -> IntMap a +bin _ _ l Nil = l +bin _ _ Nil r = r +bin p m l r = Bin p m l r + +------------------------------------------------------------------------ +-- Endian independent bit twiddling + +zero :: Key -> Mask -> Bool +zero i m = (natFromInt i) .&. (natFromInt m) == 0 + +nomatch :: Key -> Prefix -> Mask -> Bool +nomatch i p m = (mask i m) /= p + +mask :: Key -> Mask -> Prefix +mask i m = maskW (natFromInt i) (natFromInt m) + +zeroN :: Nat -> Nat -> Bool +zeroN i m = (i .&. m) == 0 + +------------------------------------------------------------------------ +-- Big endian operations + +maskW :: Nat -> Nat -> Prefix +maskW i m = intFromNat (i .&. (complement (m-1) `xor` m)) + +branchMask :: Prefix -> Prefix -> Mask +branchMask p1 p2 + = intFromNat (highestBitMask (natFromInt p1 `xor` natFromInt p2)) + +{- +Finding the highest bit mask in a word [x] can be done efficiently in +three ways: + +* convert to a floating point value and the mantissa tells us the + [log2(x)] that corresponds with the highest bit position. The mantissa + is retrieved either via the standard C function [frexp] or by some bit + twiddling on IEEE compatible numbers (float). Note that one needs to + use at least [double] precision for an accurate mantissa of 32 bit + numbers. + +* use bit twiddling, a logarithmic sequence of bitwise or's and shifts (bit). + +* use processor specific assembler instruction (asm). + +The most portable way would be [bit], but is it efficient enough? +I have measured the cycle counts of the different methods on an AMD +Athlon-XP 1800 (~ Pentium III 1.8Ghz) using the RDTSC instruction: + +highestBitMask: method cycles + -------------- + frexp 200 + float 33 + bit 11 + asm 12 + +Wow, the bit twiddling is on today's RISC like machines even faster +than a single CISC instruction (BSR)! +-} + +-- | @highestBitMask@ returns a word where only the highest bit is +-- set. It is found by first setting all bits in lower positions than +-- the highest bit and than taking an exclusive or with the original +-- value. Allthough the function may look expensive, GHC compiles +-- this into excellent C code that subsequently compiled into highly +-- efficient machine code. The algorithm is derived from Jorg Arndt's +-- FXT library. +highestBitMask :: Nat -> Nat +highestBitMask x0 + = case (x0 .|. shiftRL x0 1) of + x1 -> case (x1 .|. shiftRL x1 2) of + x2 -> case (x2 .|. shiftRL x2 4) of + x3 -> case (x3 .|. shiftRL x3 8) of + x4 -> case (x4 .|. shiftRL x4 16) of + x5 -> case (x5 .|. shiftRL x5 32) of -- for 64 bit platforms + x6 -> (x6 `xor` (shiftRL x6 1)) diff --git a/System/Event/Internal.hs b/System/Event/Internal.hs new file mode 100644 index 0000000..cbe961d --- /dev/null +++ b/System/Event/Internal.hs @@ -0,0 +1,129 @@ +{-# LANGUAGE ExistentialQuantification, NoImplicitPrelude #-} + +module System.Event.Internal + ( + -- * Event back end + Backend + , backend + , delete + , poll + , modifyFd + -- * Event type + , Event + , evtRead + , evtWrite + , eventIs + -- * Timeout type + , Timeout(..) + -- * Helpers + , throwErrnoIfMinus1NoRetry + ) where + +import Data.Bits ((.|.), (.&.)) +import Data.List (foldl', intercalate) +import Data.Monoid (Monoid(..)) +import Foreign.C.Error (eINTR, getErrno, throwErrno) +import System.Posix.Types (Fd) +import GHC.Base +import GHC.Num (Num(..)) +import GHC.Show (Show(..)) +import GHC.List (filter, null) + +-- | An I/O event. +newtype Event = Event Int + deriving (Eq) + +evtNothing :: Event +evtNothing = Event 0 +{-# INLINE evtNothing #-} + +evtRead :: Event +evtRead = Event 1 +{-# INLINE evtRead #-} + +evtWrite :: Event +evtWrite = Event 2 +{-# INLINE evtWrite #-} + +eventIs :: Event -> Event -> Bool +eventIs (Event a) (Event b) = a .&. b /= 0 + +instance Show Event where + show e = '[' : (intercalate "," . filter (not . null) $ + [evtRead `so` "evtRead", evtWrite `so` "evtWrite"]) ++ "]" + where ev `so` disp | e `eventIs` ev = disp + | otherwise = "" + +instance Monoid Event where + mempty = evtNothing + mappend = evtCombine + mconcat = evtConcat + +evtCombine :: Event -> Event -> Event +evtCombine (Event a) (Event b) = Event (a .|. b) +{-# INLINE evtCombine #-} + +evtConcat :: [Event] -> Event +evtConcat = foldl' evtCombine evtNothing +{-# INLINE evtConcat #-} + +-- | A type alias for timeouts, specified in seconds. +data Timeout = Timeout {-# UNPACK #-} !Double + | Forever + deriving (Show) + +-- | Event notification backend. +data Backend = forall a. Backend { + _beState :: !a + + -- | Poll backend for new events. The provided callback is called + -- once per file descriptor with new events. + , _bePoll :: a -- backend state + -> Timeout -- timeout in milliseconds + -> (Fd -> Event -> IO ()) -- I/O callback + -> IO () + + -- | Register, modify, or unregister interest in the given events + -- on the given file descriptor. + , _beModifyFd :: a + -> Fd -- file descriptor + -> Event -- old events to watch for ('mempty' for new) + -> Event -- new events to watch for ('mempty' to delete) + -> IO () + + , _beDelete :: a -> IO () + } + +backend :: (a -> Timeout -> (Fd -> Event -> IO ()) -> IO ()) + -> (a -> Fd -> Event -> Event -> IO ()) + -> (a -> IO ()) + -> a + -> Backend +backend bPoll bModifyFd bDelete state = Backend state bPoll bModifyFd bDelete +{-# INLINE backend #-} + +poll :: Backend -> Timeout -> (Fd -> Event -> IO ()) -> IO () +poll (Backend bState bPoll _ _) = bPoll bState +{-# INLINE poll #-} + +modifyFd :: Backend -> Fd -> Event -> Event -> IO () +modifyFd (Backend bState _ bModifyFd _) = bModifyFd bState +{-# INLINE modifyFd #-} + +delete :: Backend -> IO () +delete (Backend bState _ _ bDelete) = bDelete bState +{-# INLINE delete #-} + +-- | Throw an 'IOError' corresponding to the current value of +-- 'getErrno' if the result value of the 'IO' action is -1 and +-- 'getErrno' is not 'eINTR'. If the result value is -1 and +-- 'getErrno' returns 'eINTR' 0 is returned. Otherwise the result +-- value is returned. +throwErrnoIfMinus1NoRetry :: Num a => String -> IO a -> IO a +throwErrnoIfMinus1NoRetry loc f = do + res <- f + if res == -1 + then do + err <- getErrno + if err == eINTR then return 0 else throwErrno loc + else return res diff --git a/System/Event/KQueue.hsc b/System/Event/KQueue.hsc new file mode 100644 index 0000000..b272bcf --- /dev/null +++ b/System/Event/KQueue.hsc @@ -0,0 +1,296 @@ +{-# LANGUAGE ForeignFunctionInterface, GeneralizedNewtypeDeriving, + NoImplicitPrelude, RecordWildCards #-} + +module System.Event.KQueue + ( + new + , available + ) where + +import qualified System.Event.Internal as E + +#include "EventConfig.h" +#if !defined(HAVE_KQUEUE) +import GHC.Base + +new :: IO E.Backend +new = error "KQueue back end not implemented for this platform" + +available :: Bool +available = False +{-# INLINE available #-} +#else + +import Control.Concurrent.MVar (MVar, newMVar, swapMVar, withMVar) +import Control.Monad (when, unless) +import Data.Bits (Bits(..)) +import Data.Word (Word16, Word32) +import Foreign.C.Error (throwErrnoIfMinus1) +import Foreign.C.Types (CInt, CIntPtr, CLong, CTime, CUIntPtr) +import Foreign.Marshal.Alloc (alloca) +import Foreign.Ptr (Ptr, nullPtr) +import Foreign.Storable (Storable(..)) +import GHC.Base +import GHC.Enum (toEnum) +import GHC.Err (undefined) +import GHC.Num (Num(..)) +import GHC.Real (ceiling, floor, fromIntegral) +import GHC.Show (Show(show)) +import System.Event.Internal (Timeout(..)) +import System.Posix.Internals (c_close) +import System.Posix.Types (Fd(..)) +import qualified System.Event.Array as A + +#if defined(HAVE_KEVENT64) +import Data.Int (Int64) +import Data.Word (Word64) +import Foreign.C.Types (CUInt) +#endif + +#include +#include +#include + +-- Handle brokenness on some BSD variants, notably OS X up to at least +-- 10.6. If NOTE_EOF isn't available, we have no way to receive a +-- notification from the kernel when we reach EOF on a plain file. +#ifndef NOTE_EOF +# define NOTE_EOF 0 +#endif + +available :: Bool +available = True +{-# INLINE available #-} + +------------------------------------------------------------------------ +-- Exported interface + +data EventQueue = EventQueue { + eqFd :: {-# UNPACK #-} !QueueFd + , eqChanges :: {-# UNPACK #-} !(MVar (A.Array Event)) + , eqEvents :: {-# UNPACK #-} !(A.Array Event) + } + +new :: IO E.Backend +new = do + qfd <- kqueue + changesArr <- A.empty + changes <- newMVar changesArr + events <- A.new 64 + let !be = E.backend poll modifyFd delete (EventQueue qfd changes events) + return be + +delete :: EventQueue -> IO () +delete q = do + _ <- c_close . fromQueueFd . eqFd $ q + return () + +modifyFd :: EventQueue -> Fd -> E.Event -> E.Event -> IO () +modifyFd q fd oevt nevt = withMVar (eqChanges q) $ \ch -> do + let addChange filt flag = A.snoc ch $ event fd filt flag noteEOF + when (oevt `E.eventIs` E.evtRead) $ addChange filterRead flagDelete + when (oevt `E.eventIs` E.evtWrite) $ addChange filterWrite flagDelete + when (nevt `E.eventIs` E.evtRead) $ addChange filterRead flagAdd + when (nevt `E.eventIs` E.evtWrite) $ addChange filterWrite flagAdd + +poll :: EventQueue + -> Timeout + -> (Fd -> E.Event -> IO ()) + -> IO () +poll EventQueue{..} tout f = do + changesArr <- A.empty + changes <- swapMVar eqChanges changesArr + changesLen <- A.length changes + len <- A.length eqEvents + when (changesLen > len) $ A.ensureCapacity eqEvents (2 * changesLen) + n <- A.useAsPtr changes $ \changesPtr chLen -> + A.unsafeLoad eqEvents $ \evPtr evCap -> + withTimeSpec (fromTimeout tout) $ + kevent eqFd changesPtr chLen evPtr evCap + + unless (n == 0) $ do + cap <- A.capacity eqEvents + when (n == cap) $ A.ensureCapacity eqEvents (2 * cap) + A.forM_ eqEvents $ \e -> f (fromIntegral (ident e)) (toEvent (filter e)) + +------------------------------------------------------------------------ +-- FFI binding + +newtype QueueFd = QueueFd { + fromQueueFd :: CInt + } deriving (Eq, Show) + +#if defined(HAVE_KEVENT64) +data Event = KEvent64 { + ident :: {-# UNPACK #-} !Word64 + , filter :: {-# UNPACK #-} !Filter + , flags :: {-# UNPACK #-} !Flag + , fflags :: {-# UNPACK #-} !FFlag + , data_ :: {-# UNPACK #-} !Int64 + , udata :: {-# UNPACK #-} !Word64 + , ext0 :: {-# UNPACK #-} !Word64 + , ext1 :: {-# UNPACK #-} !Word64 + } deriving Show + +event :: Fd -> Filter -> Flag -> FFlag -> Event +event fd filt flag fflag = KEvent64 (fromIntegral fd) filt flag fflag 0 0 0 0 + +instance Storable Event where + sizeOf _ = #size struct kevent64_s + alignment _ = alignment (undefined :: CInt) + + peek ptr = do + ident' <- #{peek struct kevent64_s, ident} ptr + filter' <- #{peek struct kevent64_s, filter} ptr + flags' <- #{peek struct kevent64_s, flags} ptr + fflags' <- #{peek struct kevent64_s, fflags} ptr + data' <- #{peek struct kevent64_s, data} ptr + udata' <- #{peek struct kevent64_s, udata} ptr + ext0' <- #{peek struct kevent64_s, ext[0]} ptr + ext1' <- #{peek struct kevent64_s, ext[1]} ptr + let !ev = KEvent64 ident' (Filter filter') (Flag flags') fflags' data' + udata' ext0' ext1' + return ev + + poke ptr ev = do + #{poke struct kevent64_s, ident} ptr (ident ev) + #{poke struct kevent64_s, filter} ptr (filter ev) + #{poke struct kevent64_s, flags} ptr (flags ev) + #{poke struct kevent64_s, fflags} ptr (fflags ev) + #{poke struct kevent64_s, data} ptr (data_ ev) + #{poke struct kevent64_s, udata} ptr (udata ev) + #{poke struct kevent64_s, ext[0]} ptr (ext0 ev) + #{poke struct kevent64_s, ext[1]} ptr (ext1 ev) +#else +data Event = KEvent { + ident :: {-# UNPACK #-} !CUIntPtr + , filter :: {-# UNPACK #-} !Filter + , flags :: {-# UNPACK #-} !Flag + , fflags :: {-# UNPACK #-} !FFlag + , data_ :: {-# UNPACK #-} !CIntPtr + , udata :: {-# UNPACK #-} !(Ptr ()) + } deriving Show + +event :: Fd -> Filter -> Flag -> FFlag -> Event +event fd filt flag fflag = KEvent (fromIntegral fd) filt flag fflag 0 nullPtr + +instance Storable Event where + sizeOf _ = #size struct kevent + alignment _ = alignment (undefined :: CInt) + + peek ptr = do + ident' <- #{peek struct kevent, ident} ptr + filter' <- #{peek struct kevent, filter} ptr + flags' <- #{peek struct kevent, flags} ptr + fflags' <- #{peek struct kevent, fflags} ptr + data' <- #{peek struct kevent, data} ptr + udata' <- #{peek struct kevent, udata} ptr + let !ev = KEvent ident' (Filter filter') (Flag flags') fflags' data' + udata' + return ev + + poke ptr ev = do + #{poke struct kevent, ident} ptr (ident ev) + #{poke struct kevent, filter} ptr (filter ev) + #{poke struct kevent, flags} ptr (flags ev) + #{poke struct kevent, fflags} ptr (fflags ev) + #{poke struct kevent, data} ptr (data_ ev) + #{poke struct kevent, udata} ptr (udata ev) +#endif + +newtype FFlag = FFlag Word32 + deriving (Eq, Show, Storable) + +#{enum FFlag, FFlag + , noteEOF = NOTE_EOF + } + +newtype Flag = Flag Word16 + deriving (Eq, Show, Storable) + +#{enum Flag, Flag + , flagAdd = EV_ADD + , flagDelete = EV_DELETE + } + +newtype Filter = Filter Word16 + deriving (Bits, Eq, Num, Show, Storable) + +#{enum Filter, Filter + , filterRead = EVFILT_READ + , filterWrite = EVFILT_WRITE + } + +data TimeSpec = TimeSpec { + tv_sec :: {-# UNPACK #-} !CTime + , tv_nsec :: {-# UNPACK #-} !CLong + } + +instance Storable TimeSpec where + sizeOf _ = #size struct timespec + alignment _ = alignment (undefined :: CInt) + + peek ptr = do + tv_sec' <- #{peek struct timespec, tv_sec} ptr + tv_nsec' <- #{peek struct timespec, tv_nsec} ptr + let !ts = TimeSpec tv_sec' tv_nsec' + return ts + + poke ptr ts = do + #{poke struct timespec, tv_sec} ptr (tv_sec ts) + #{poke struct timespec, tv_nsec} ptr (tv_nsec ts) + +kqueue :: IO QueueFd +kqueue = QueueFd `fmap` throwErrnoIfMinus1 "kqueue" c_kqueue + +-- TODO: We cannot retry on EINTR as the timeout would be wrong. +-- Perhaps we should just return without calling any callbacks. +kevent :: QueueFd -> Ptr Event -> Int -> Ptr Event -> Int -> Ptr TimeSpec + -> IO Int +kevent k chs chlen evs evlen ts + = fmap fromIntegral $ E.throwErrnoIfMinus1NoRetry "kevent" $ +#if defined(HAVE_KEVENT64) + c_kevent64 k chs (fromIntegral chlen) evs (fromIntegral evlen) 0 ts +#else + c_kevent k chs (fromIntegral chlen) evs (fromIntegral evlen) ts +#endif + +withTimeSpec :: TimeSpec -> (Ptr TimeSpec -> IO a) -> IO a +withTimeSpec ts f = + if tv_sec ts < 0 then + f nullPtr + else + alloca $ \ptr -> poke ptr ts >> f ptr + +fromTimeout :: Timeout -> TimeSpec +fromTimeout Forever = TimeSpec (-1) (-1) +fromTimeout (Timeout s) = TimeSpec (toEnum sec) (toEnum nanosec) + where + sec :: Int + sec = floor s + + nanosec :: Int + nanosec = ceiling $ (s - fromIntegral sec) * 1000000000 + +toEvent :: Filter -> E.Event +toEvent (Filter f) + | f == (#const EVFILT_READ) = E.evtRead + | f == (#const EVFILT_WRITE) = E.evtWrite + | otherwise = error $ "toEvent: unknonwn filter " ++ show f + +foreign import ccall unsafe "kqueue" + c_kqueue :: IO CInt + +#if defined(HAVE_KEVENT64) +foreign import ccall safe "kevent64" + c_kevent64 :: QueueFd -> Ptr Event -> CInt -> Ptr Event -> CInt -> CUInt + -> Ptr TimeSpec -> IO CInt +#elif defined(HAVE_KEVENT) +foreign import ccall safe "kevent" + c_kevent :: QueueFd -> Ptr Event -> CInt -> Ptr Event -> CInt + -> Ptr TimeSpec -> IO CInt +#else +#error no kevent system call available!? +#endif + +#endif /* defined(HAVE_KQUEUE) */ diff --git a/System/Event/Manager.hs b/System/Event/Manager.hs new file mode 100644 index 0000000..dfa99f5 --- /dev/null +++ b/System/Event/Manager.hs @@ -0,0 +1,391 @@ +{-# LANGUAGE BangPatterns, CPP, ExistentialQuantification, NoImplicitPrelude, + RecordWildCards, TypeSynonymInstances #-} +module System.Event.Manager + ( -- * Types + EventManager + + -- * Creation + , new + , newWith + , newDefaultBackend + + -- * Running + , finished + , loop + , step + , shutdown + , wakeManager + + -- * Registering interest in I/O events + , Event + , evtRead + , evtWrite + , IOCallback + , FdKey(keyFd) + , registerFd_ + , registerFd + , unregisterFd_ + , unregisterFd + , fdWasClosed + + -- * Registering interest in timeout events + , TimeoutCallback + , TimeoutKey + , registerTimeout + , updateTimeout + , unregisterTimeout + ) where + +#include "EventConfig.h" + +------------------------------------------------------------------------ +-- Imports + +import Control.Concurrent.MVar (MVar, modifyMVar, modifyMVar_, newMVar, + readMVar) +import Control.Exception (finally) +import Control.Monad ((=<<), forM_, liftM, sequence_, when) +import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef, + writeIORef) +import Data.Maybe (Maybe(..)) +import Data.Monoid (mconcat, mempty) +import GHC.Base +import GHC.Conc.Signal (runHandlers) +import GHC.List (filter) +import GHC.Num (Num(..)) +import GHC.Real ((/), fromIntegral, fromRational) +import GHC.Show (Show(..)) +import System.Event.Clock (getCurrentTime) +import System.Event.Control +import System.Event.Internal (Backend, Event, evtRead, evtWrite, Timeout(..)) +import System.Event.Unique (Unique, UniqueSource, newSource, newUnique) +import System.Posix.Types (Fd) + +import qualified System.Event.IntMap as IM +import qualified System.Event.Internal as I +import qualified System.Event.PSQ as Q + +#if defined(HAVE_KQUEUE) +import qualified System.Event.KQueue as KQueue +#elif defined(HAVE_EPOLL) +import qualified System.Event.EPoll as EPoll +#elif defined(HAVE_POLL) +import qualified System.Event.Poll as Poll +#else +# error not implemented for this operating system +#endif + +------------------------------------------------------------------------ +-- Types + +data FdData = FdData { + fdKey :: {-# UNPACK #-} !FdKey + , fdEvents :: {-# UNPACK #-} !Event + , _fdCallback :: !IOCallback + } deriving (Show) + +-- | A file descriptor registration cookie. +data FdKey = FdKey { + keyFd :: {-# UNPACK #-} !Fd + , keyUnique :: {-# UNPACK #-} !Unique + } deriving (Eq, Show) + +-- | Callback invoked on I/O events. +type IOCallback = FdKey -> Event -> IO () + +instance Show IOCallback where + show _ = "IOCallback" + +newtype TimeoutKey = TK Unique + deriving (Eq) + +-- | Callback invoked on timeout events. +type TimeoutCallback = IO () + +data State = Created + | Running + | Dying + | Finished + deriving (Eq, Show) + +-- | A priority search queue, with timeouts as priorities. +type TimeoutQueue = Q.PSQ TimeoutCallback + +{- +Instead of directly modifying the 'TimeoutQueue' in +e.g. 'registerTimeout' we keep a list of edits to perform, in the form +of a chain of function closures, and have the I/O manager thread +perform the edits later. This exist to address the following GC +problem: + +Since e.g. 'registerTimeout' doesn't force the evaluation of the +thunks inside the 'emTimeouts' IORef a number of thunks build up +inside the IORef. If the I/O manager thread doesn't evaluate these +thunks soon enough they'll get promoted to the old generation and +become roots for all subsequent minor GCs. + +When the thunks eventually get evaluated they will each create a new +intermediate 'TimeoutQueue' that immediately becomes garbage. Since +the thunks serve as roots until the next major GC these intermediate +'TimeoutQueue's will get copied unnecesarily in the next minor GC, +increasing GC time. This problem is known as "floating garbage". + +Keeping a list of edits doesn't stop this from happening but makes the +amount of data that gets copied smaller. + +TODO: Evaluate the content of the IORef to WHNF on each insert once +this bug is resolved: http://hackage.haskell.org/trac/ghc/ticket/3838 +-} + +-- | An edit to apply to a 'TimeoutQueue'. +type TimeoutEdit = TimeoutQueue -> TimeoutQueue + +-- | The event manager state. +data EventManager = EventManager + { emBackend :: !Backend + , emFds :: {-# UNPACK #-} !(MVar (IM.IntMap [FdData])) + , emTimeouts :: {-# UNPACK #-} !(IORef TimeoutEdit) + , emState :: {-# UNPACK #-} !(IORef State) + , emUniqueSource :: {-# UNPACK #-} !UniqueSource + , emControl :: {-# UNPACK #-} !Control + } + +------------------------------------------------------------------------ +-- Creation + +handleControlEvent :: EventManager -> FdKey -> Event -> IO () +handleControlEvent mgr reg _evt = do + msg <- readControlMessage (emControl mgr) (keyFd reg) + case msg of + CMsgWakeup -> return () + CMsgDie -> writeIORef (emState mgr) Finished + CMsgSignal fp s -> runHandlers fp s + +newDefaultBackend :: IO Backend +#if defined(HAVE_KQUEUE) +newDefaultBackend = KQueue.new +#elif defined(HAVE_EPOLL) +newDefaultBackend = EPoll.new +#elif defined(HAVE_POLL) +newDefaultBackend = Poll.new +#else +newDefaultBackend = error "no back end for this platform" +#endif + +-- | Create a new event manager. +new :: IO EventManager +new = newWith =<< newDefaultBackend + +newWith :: Backend -> IO EventManager +newWith be = do + iofds <- newMVar IM.empty + timeouts <- newIORef id + ctrl <- newControl + state <- newIORef Created + us <- newSource + _ <- mkWeakIORef state $ do + st <- atomicModifyIORef state $ \s -> (Finished, s) + when (st /= Finished) $ do + I.delete be + closeControl ctrl + let mgr = EventManager { emBackend = be + , emFds = iofds + , emTimeouts = timeouts + , emState = state + , emUniqueSource = us + , emControl = ctrl + } + _ <- registerFd_ mgr (handleControlEvent mgr) (controlReadFd ctrl) evtRead + _ <- registerFd_ mgr (handleControlEvent mgr) (wakeupReadFd ctrl) evtRead + return mgr + +-- | Asynchronously shuts down the event manager, if running. +shutdown :: EventManager -> IO () +shutdown mgr = do + state <- atomicModifyIORef (emState mgr) $ \s -> (Dying, s) + when (state == Running) $ sendDie (emControl mgr) + +finished :: EventManager -> IO Bool +finished mgr = (== Finished) `liftM` readIORef (emState mgr) + +cleanup :: EventManager -> IO () +cleanup EventManager{..} = do + writeIORef emState Finished + I.delete emBackend + closeControl emControl + +------------------------------------------------------------------------ +-- Event loop + +-- | Start handling events. This function loops until told to stop. +-- +-- /Note/: This loop can only be run once per 'EventManager', as it +-- closes all of its control resources when it finishes. +loop :: EventManager -> IO () +loop mgr@EventManager{..} = do + state <- atomicModifyIORef emState $ \s -> case s of + Created -> (Running, s) + _ -> (s, s) + case state of + Created -> go Q.empty `finally` cleanup mgr + Dying -> cleanup mgr + _ -> do cleanup mgr + error $ "System.Event.Manager.loop: state is already " ++ + show state + where + go q = do (running, q') <- step mgr q + when running $ go q' + +step :: EventManager -> TimeoutQueue -> IO (Bool, TimeoutQueue) +step mgr@EventManager{..} tq = do + (timeout, q') <- mkTimeout tq + I.poll emBackend timeout (onFdEvent mgr) + state <- readIORef emState + state `seq` return (state == Running, q') + where + + -- | Call all expired timer callbacks and return the time to the + -- next timeout. + mkTimeout :: TimeoutQueue -> IO (Timeout, TimeoutQueue) + mkTimeout q = do + now <- getCurrentTime + applyEdits <- atomicModifyIORef emTimeouts $ \f -> (id, f) + let (expired, q'') = let q' = applyEdits q in q' `seq` Q.atMost now q' + sequence_ $ map Q.value expired + let timeout = case Q.minView q'' of + Nothing -> Forever + Just (Q.E _ t _, _) -> + -- This value will always be positive since the call + -- to 'atMost' above removed any timeouts <= 'now' + let t' = t - now in t' `seq` Timeout t' + return (timeout, q'') + +------------------------------------------------------------------------ +-- Registering interest in I/O events + +-- | Register interest in the given events, without waking the event +-- manager thread. The 'Bool' return value indicates whether the +-- event manager ought to be woken. +registerFd_ :: EventManager -> IOCallback -> Fd -> Event + -> IO (FdKey, Bool) +registerFd_ EventManager{..} cb fd evs = do + u <- newUnique emUniqueSource + modifyMVar emFds $ \oldMap -> do + let fd' = fromIntegral fd + reg = FdKey fd u + !fdd = FdData reg evs cb + (!newMap, (oldEvs, newEvs)) = + case IM.insertWith (++) fd' [fdd] oldMap of + (Nothing, n) -> (n, (mempty, evs)) + (Just prev, n) -> (n, pairEvents prev newMap fd') + modify = oldEvs /= newEvs + when modify $ I.modifyFd emBackend fd oldEvs newEvs + return (newMap, (reg, modify)) +{-# INLINE registerFd_ #-} + +-- | @registerFd mgr cb fd evs@ registers interest in the events @evs@ +-- on the file descriptor @fd@. @cb@ is called for each event that +-- occurs. Returns a cookie that can be handed to 'unregisterFd'. +registerFd :: EventManager -> IOCallback -> Fd -> Event -> IO FdKey +registerFd mgr cb fd evs = do + (r, wake) <- registerFd_ mgr cb fd evs + when wake $ wakeManager mgr + return r +{-# INLINE registerFd #-} + +-- | Wake up the event manager. +wakeManager :: EventManager -> IO () +wakeManager mgr = sendWakeup (emControl mgr) + +eventsOf :: [FdData] -> Event +eventsOf = mconcat . map fdEvents + +pairEvents :: [FdData] -> IM.IntMap [FdData] -> Int -> (Event, Event) +pairEvents prev m fd = let l = eventsOf prev + r = case IM.lookup fd m of + Nothing -> mempty + Just fds -> eventsOf fds + in (l, r) + +-- | Drop a previous file descriptor registration, without waking the +-- event manager thread. The return value indicates whether the event +-- manager ought to be woken. +unregisterFd_ :: EventManager -> FdKey -> IO Bool +unregisterFd_ EventManager{..} (FdKey fd u) = + modifyMVar emFds $ \oldMap -> do + let dropReg cbs = case filter ((/= u) . keyUnique . fdKey) cbs of + [] -> Nothing + cbs' -> Just cbs' + fd' = fromIntegral fd + (!newMap, (oldEvs, newEvs)) = + case IM.updateWith dropReg fd' oldMap of + (Nothing, _) -> (oldMap, (mempty, mempty)) + (Just prev, newm) -> (newm, pairEvents prev newm fd') + modify = oldEvs /= newEvs + when modify $ I.modifyFd emBackend fd oldEvs newEvs + return (newMap, modify) + +-- | Drop a previous file descriptor registration. +unregisterFd :: EventManager -> FdKey -> IO () +unregisterFd mgr reg = do + wake <- unregisterFd_ mgr reg + when wake $ wakeManager mgr + +-- | Notify the event manager that a file descriptor has been closed. +fdWasClosed :: EventManager -> Fd -> IO () +fdWasClosed mgr fd = + modifyMVar_ (emFds mgr) $ \oldMap -> + case IM.delete (fromIntegral fd) oldMap of + (Nothing, _) -> return oldMap + (Just fds, !newMap) -> do + when (eventsOf fds /= mempty) $ wakeManager mgr + return newMap + +------------------------------------------------------------------------ +-- Registering interest in timeout events + +-- | Register a timeout in the given number of milliseconds. +registerTimeout :: EventManager -> Int -> TimeoutCallback -> IO TimeoutKey +registerTimeout mgr ms cb = do + !key <- newUnique (emUniqueSource mgr) + if ms <= 0 then cb + else do + now <- getCurrentTime + let expTime = fromIntegral ms / 1000.0 + now + + -- We intentionally do not evaluate the modified map to WHNF here. + -- Instead, we leave a thunk inside the IORef and defer its + -- evaluation until mkTimeout in the event loop. This is a + -- workaround for a nasty IORef contention problem that causes the + -- thread-delay benchmark to take 20 seconds instead of 0.2. + atomicModifyIORef (emTimeouts mgr) $ \f -> + let f' = (Q.insert key expTime cb) . f in (f', ()) + wakeManager mgr + return $ TK key + +unregisterTimeout :: EventManager -> TimeoutKey -> IO () +unregisterTimeout mgr (TK key) = do + atomicModifyIORef (emTimeouts mgr) $ \f -> + let f' = (Q.delete key) . f in (f', ()) + wakeManager mgr + +updateTimeout :: EventManager -> TimeoutKey -> Int -> IO () +updateTimeout mgr (TK key) ms = do + now <- getCurrentTime + let expTime = fromIntegral ms / 1000.0 + now + + atomicModifyIORef (emTimeouts mgr) $ \f -> + let f' = (Q.adjust (const expTime) key) . f in (f', ()) + wakeManager mgr + +------------------------------------------------------------------------ +-- Utilities + +-- | Call the callbacks corresponding to the given file descriptor. +onFdEvent :: EventManager -> Fd -> Event -> IO () +onFdEvent mgr fd evs = do + fds <- readMVar (emFds mgr) + case IM.lookup (fromIntegral fd) fds of + Just cbs -> forM_ cbs $ \(FdData reg ev cb) -> + when (evs `I.eventIs` ev) $ cb reg evs + Nothing -> return () diff --git a/System/Event/PSQ.hs b/System/Event/PSQ.hs new file mode 100644 index 0000000..f86be5b --- /dev/null +++ b/System/Event/PSQ.hs @@ -0,0 +1,483 @@ +{-# LANGUAGE BangPatterns, NoImplicitPrelude #-} + +-- Copyright (c) 2008, Ralf Hinze +-- All rights reserved. +-- +-- Redistribution and use in source and binary forms, with or without +-- modification, are permitted provided that the following conditions +-- are met: +-- +-- * Redistributions of source code must retain the above +-- copyright notice, this list of conditions and the following +-- disclaimer. +-- +-- * Redistributions in binary form must reproduce the above +-- copyright notice, this list of conditions and the following +-- disclaimer in the documentation and/or other materials +-- provided with the distribution. +-- +-- * The names of the contributors may not be used to endorse or +-- promote products derived from this software without specific +-- prior written permission. +-- +-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +-- "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +-- LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +-- FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +-- COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +-- INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +-- (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +-- SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +-- HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +-- STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +-- ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED +-- OF THE POSSIBILITY OF SUCH DAMAGE. + +-- | A /priority search queue/ (henceforth /queue/) efficiently +-- supports the operations of both a search tree and a priority queue. +-- An 'Elem'ent is a product of a key, a priority, and a +-- value. Elements can be inserted, deleted, modified and queried in +-- logarithmic time, and the element with the least priority can be +-- retrieved in constant time. A queue can be built from a list of +-- elements, sorted by keys, in linear time. +-- +-- This implementation is due to Ralf Hinze with some modifications by +-- Scott Dillard and Johan Tibell. +-- +-- * Hinze, R., /A Simple Implementation Technique for Priority Search +-- Queues/, ICFP 2001, pp. 110-121 +-- +-- +module System.Event.PSQ + ( + -- * Binding Type + Elem(..) + , Key + , Prio + + -- * Priority Search Queue Type + , PSQ + + -- * Query + , size + , null + , lookup + + -- * Construction + , empty + , singleton + + -- * Insertion + , insert + + -- * Delete/Update + , delete + , adjust + + -- * Conversion + , toList + , toAscList + , toDescList + , fromList + + -- * Min + , findMin + , deleteMin + , minView + , atMost + ) where + +import Data.Maybe (Maybe(..)) +import GHC.Base +import GHC.Num (Num(..)) +import GHC.Show (Show(showsPrec)) +import System.Event.Unique (Unique) + +-- | @E k p@ binds the key @k@ with the priority @p@. +data Elem a = E + { key :: {-# UNPACK #-} !Key + , prio :: {-# UNPACK #-} !Prio + , value :: a + } deriving (Eq, Show) + +------------------------------------------------------------------------ +-- | A mapping from keys @k@ to priorites @p@. + +type Prio = Double +type Key = Unique + +data PSQ a = Void + | Winner {-# UNPACK #-} !(Elem a) + !(LTree a) + {-# UNPACK #-} !Key -- max key + deriving (Eq, Show) + +-- | /O(1)/ The number of elements in a queue. +size :: PSQ a -> Int +size Void = 0 +size (Winner _ lt _) = 1 + size' lt + +-- | /O(1)/ True if the queue is empty. +null :: PSQ a -> Bool +null Void = True +null (Winner _ _ _) = False + +-- | /O(log n)/ The priority and value of a given key, or Nothing if +-- the key is not bound. +lookup :: Key -> PSQ a -> Maybe (Prio, a) +lookup k q = case tourView q of + Null -> Nothing + Single (E k' p v) + | k == k' -> Just (p, v) + | otherwise -> Nothing + tl `Play` tr + | k <= maxKey tl -> lookup k tl + | otherwise -> lookup k tr + +------------------------------------------------------------------------ +-- Construction + +empty :: PSQ a +empty = Void + +-- | /O(1)/ Build a queue with one element. +singleton :: Key -> Prio -> a -> PSQ a +singleton k p v = Winner (E k p v) Start k + +------------------------------------------------------------------------ +-- Insertion + +-- | /O(log n)/ Insert a new key, priority and value in the queue. If +-- the key is already present in the queue, the associated priority +-- and value are replaced with the supplied priority and value. +insert :: Key -> Prio -> a -> PSQ a -> PSQ a +insert k p v q = case q of + Void -> singleton k p v + Winner (E k' p' v') Start _ -> case compare k k' of + LT -> singleton k p v `play` singleton k' p' v' + EQ -> singleton k p v + GT -> singleton k' p' v' `play` singleton k p v + Winner e (RLoser _ e' tl m tr) m' + | k <= m -> insert k p v (Winner e tl m) `play` (Winner e' tr m') + | otherwise -> (Winner e tl m) `play` insert k p v (Winner e' tr m') + Winner e (LLoser _ e' tl m tr) m' + | k <= m -> insert k p v (Winner e' tl m) `play` (Winner e tr m') + | otherwise -> (Winner e' tl m) `play` insert k p v (Winner e tr m') + +------------------------------------------------------------------------ +-- Delete/Update + +-- | /O(log n)/ Delete a key and its priority and value from the +-- queue. When the key is not a member of the queue, the original +-- queue is returned. +delete :: Key -> PSQ a -> PSQ a +delete k q = case q of + Void -> empty + Winner (E k' p v) Start _ + | k == k' -> empty + | otherwise -> singleton k' p v + Winner e (RLoser _ e' tl m tr) m' + | k <= m -> delete k (Winner e tl m) `play` (Winner e' tr m') + | otherwise -> (Winner e tl m) `play` delete k (Winner e' tr m') + Winner e (LLoser _ e' tl m tr) m' + | k <= m -> delete k (Winner e' tl m) `play` (Winner e tr m') + | otherwise -> (Winner e' tl m) `play` delete k (Winner e tr m') + +-- | /O(log n)/ Update a priority at a specific key with the result +-- of the provided function. When the key is not a member of the +-- queue, the original queue is returned. +adjust :: (Prio -> Prio) -> Key -> PSQ a -> PSQ a +adjust f k q0 = go q0 + where + go q = case q of + Void -> empty + Winner (E k' p v) Start _ + | k == k' -> singleton k' (f p) v + | otherwise -> singleton k' p v + Winner e (RLoser _ e' tl m tr) m' + | k <= m -> go (Winner e tl m) `unsafePlay` (Winner e' tr m') + | otherwise -> (Winner e tl m) `unsafePlay` go (Winner e' tr m') + Winner e (LLoser _ e' tl m tr) m' + | k <= m -> go (Winner e' tl m) `unsafePlay` (Winner e tr m') + | otherwise -> (Winner e' tl m) `unsafePlay` go (Winner e tr m') +{-# INLINE adjust #-} + +------------------------------------------------------------------------ +-- Conversion + +-- | /O(n*log n)/ Build a queue from a list of key/priority/value +-- tuples. If the list contains more than one priority and value for +-- the same key, the last priority and value for the key is retained. +fromList :: [Elem a] -> PSQ a +fromList = foldr (\(E k p v) q -> insert k p v q) empty + +-- | /O(n)/ Convert to a list of key/priority/value tuples. +toList :: PSQ a -> [Elem a] +toList = toAscList + +-- | /O(n)/ Convert to an ascending list. +toAscList :: PSQ a -> [Elem a] +toAscList q = seqToList (toAscLists q) + +toAscLists :: PSQ a -> Sequ (Elem a) +toAscLists q = case tourView q of + Null -> emptySequ + Single e -> singleSequ e + tl `Play` tr -> toAscLists tl <> toAscLists tr + +-- | /O(n)/ Convert to a descending list. +toDescList :: PSQ a -> [ Elem a ] +toDescList q = seqToList (toDescLists q) + +toDescLists :: PSQ a -> Sequ (Elem a) +toDescLists q = case tourView q of + Null -> emptySequ + Single e -> singleSequ e + tl `Play` tr -> toDescLists tr <> toDescLists tl + +------------------------------------------------------------------------ +-- Min + +-- | /O(1)/ The element with the lowest priority. +findMin :: PSQ a -> Maybe (Elem a) +findMin Void = Nothing +findMin (Winner e _ _) = Just e + +-- | /O(log n)/ Delete the element with the lowest priority. Returns +-- an empty queue if the queue is empty. +deleteMin :: PSQ a -> PSQ a +deleteMin Void = Void +deleteMin (Winner _ t m) = secondBest t m + +-- | /O(log n)/ Retrieve the binding with the least priority, and the +-- rest of the queue stripped of that binding. +minView :: PSQ a -> Maybe (Elem a, PSQ a) +minView Void = Nothing +minView (Winner e t m) = Just (e, secondBest t m) + +secondBest :: LTree a -> Key -> PSQ a +secondBest Start _ = Void +secondBest (LLoser _ e tl m tr) m' = Winner e tl m `play` secondBest tr m' +secondBest (RLoser _ e tl m tr) m' = secondBest tl m `play` Winner e tr m' + +-- | /O(r*(log n - log r))/ Return a list of elements ordered by +-- key whose priorities are at most @pt@. +atMost :: Prio -> PSQ a -> ([Elem a], PSQ a) +atMost pt q = let (sequ, q') = atMosts pt q + in (seqToList sequ, q') + +atMosts :: Prio -> PSQ a -> (Sequ (Elem a), PSQ a) +atMosts !pt q = case q of + (Winner e _ _) + | prio e > pt -> (emptySequ, q) + Void -> (emptySequ, Void) + Winner e Start _ -> (singleSequ e, Void) + Winner e (RLoser _ e' tl m tr) m' -> + let (sequ, q') = atMosts pt (Winner e tl m) + (sequ', q'') = atMosts pt (Winner e' tr m') + in (sequ <> sequ', q' `play` q'') + Winner e (LLoser _ e' tl m tr) m' -> + let (sequ, q') = atMosts pt (Winner e' tl m) + (sequ', q'') = atMosts pt (Winner e tr m') + in (sequ <> sequ', q' `play` q'') + +------------------------------------------------------------------------ +-- Loser tree + +type Size = Int + +data LTree a = Start + | LLoser {-# UNPACK #-} !Size + {-# UNPACK #-} !(Elem a) + !(LTree a) + {-# UNPACK #-} !Key -- split key + !(LTree a) + | RLoser {-# UNPACK #-} !Size + {-# UNPACK #-} !(Elem a) + !(LTree a) + {-# UNPACK #-} !Key -- split key + !(LTree a) + deriving (Eq, Show) + +size' :: LTree a -> Size +size' Start = 0 +size' (LLoser s _ _ _ _) = s +size' (RLoser s _ _ _ _) = s + +left, right :: LTree a -> LTree a + +left Start = moduleError "left" "empty loser tree" +left (LLoser _ _ tl _ _ ) = tl +left (RLoser _ _ tl _ _ ) = tl + +right Start = moduleError "right" "empty loser tree" +right (LLoser _ _ _ _ tr) = tr +right (RLoser _ _ _ _ tr) = tr + +maxKey :: PSQ a -> Key +maxKey Void = moduleError "maxKey" "empty queue" +maxKey (Winner _ _ m) = m + +lloser, rloser :: Key -> Prio -> a -> LTree a -> Key -> LTree a -> LTree a +lloser k p v tl m tr = LLoser (1 + size' tl + size' tr) (E k p v) tl m tr +rloser k p v tl m tr = RLoser (1 + size' tl + size' tr) (E k p v) tl m tr + +------------------------------------------------------------------------ +-- Balancing + +-- | Balance factor +omega :: Int +omega = 4 + +lbalance, rbalance :: Key -> Prio -> a -> LTree a -> Key -> LTree a -> LTree a + +lbalance k p v l m r + | size' l + size' r < 2 = lloser k p v l m r + | size' r > omega * size' l = lbalanceLeft k p v l m r + | size' l > omega * size' r = lbalanceRight k p v l m r + | otherwise = lloser k p v l m r + +rbalance k p v l m r + | size' l + size' r < 2 = rloser k p v l m r + | size' r > omega * size' l = rbalanceLeft k p v l m r + | size' l > omega * size' r = rbalanceRight k p v l m r + | otherwise = rloser k p v l m r + +lbalanceLeft :: Key -> Prio -> a -> LTree a -> Key -> LTree a -> LTree a +lbalanceLeft k p v l m r + | size' (left r) < size' (right r) = lsingleLeft k p v l m r + | otherwise = ldoubleLeft k p v l m r + +lbalanceRight :: Key -> Prio -> a -> LTree a -> Key -> LTree a -> LTree a +lbalanceRight k p v l m r + | size' (left l) > size' (right l) = lsingleRight k p v l m r + | otherwise = ldoubleRight k p v l m r + +rbalanceLeft :: Key -> Prio -> a -> LTree a -> Key -> LTree a -> LTree a +rbalanceLeft k p v l m r + | size' (left r) < size' (right r) = rsingleLeft k p v l m r + | otherwise = rdoubleLeft k p v l m r + +rbalanceRight :: Key -> Prio -> a -> LTree a -> Key -> LTree a -> LTree a +rbalanceRight k p v l m r + | size' (left l) > size' (right l) = rsingleRight k p v l m r + | otherwise = rdoubleRight k p v l m r + +lsingleLeft :: Key -> Prio -> a -> LTree a -> Key -> LTree a -> LTree a +lsingleLeft k1 p1 v1 t1 m1 (LLoser _ (E k2 p2 v2) t2 m2 t3) + | p1 <= p2 = lloser k1 p1 v1 (rloser k2 p2 v2 t1 m1 t2) m2 t3 + | otherwise = lloser k2 p2 v2 (lloser k1 p1 v1 t1 m1 t2) m2 t3 +lsingleLeft k1 p1 v1 t1 m1 (RLoser _ (E k2 p2 v2) t2 m2 t3) = + rloser k2 p2 v2 (lloser k1 p1 v1 t1 m1 t2) m2 t3 +lsingleLeft _ _ _ _ _ _ = moduleError "lsingleLeft" "malformed tree" + +rsingleLeft :: Key -> Prio -> a -> LTree a -> Key -> LTree a -> LTree a +rsingleLeft k1 p1 v1 t1 m1 (LLoser _ (E k2 p2 v2) t2 m2 t3) = + rloser k1 p1 v1 (rloser k2 p2 v2 t1 m1 t2) m2 t3 +rsingleLeft k1 p1 v1 t1 m1 (RLoser _ (E k2 p2 v2) t2 m2 t3) = + rloser k2 p2 v2 (rloser k1 p1 v1 t1 m1 t2) m2 t3 +rsingleLeft _ _ _ _ _ _ = moduleError "rsingleLeft" "malformed tree" + +lsingleRight :: Key -> Prio -> a -> LTree a -> Key -> LTree a -> LTree a +lsingleRight k1 p1 v1 (LLoser _ (E k2 p2 v2) t1 m1 t2) m2 t3 = + lloser k2 p2 v2 t1 m1 (lloser k1 p1 v1 t2 m2 t3) +lsingleRight k1 p1 v1 (RLoser _ (E k2 p2 v2) t1 m1 t2) m2 t3 = + lloser k1 p1 v1 t1 m1 (lloser k2 p2 v2 t2 m2 t3) +lsingleRight _ _ _ _ _ _ = moduleError "lsingleRight" "malformed tree" + +rsingleRight :: Key -> Prio -> a -> LTree a -> Key -> LTree a -> LTree a +rsingleRight k1 p1 v1 (LLoser _ (E k2 p2 v2) t1 m1 t2) m2 t3 = + lloser k2 p2 v2 t1 m1 (rloser k1 p1 v1 t2 m2 t3) +rsingleRight k1 p1 v1 (RLoser _ (E k2 p2 v2) t1 m1 t2) m2 t3 + | p1 <= p2 = rloser k1 p1 v1 t1 m1 (lloser k2 p2 v2 t2 m2 t3) + | otherwise = rloser k2 p2 v2 t1 m1 (rloser k1 p1 v1 t2 m2 t3) +rsingleRight _ _ _ _ _ _ = moduleError "rsingleRight" "malformed tree" + +ldoubleLeft :: Key -> Prio -> a -> LTree a -> Key -> LTree a -> LTree a +ldoubleLeft k1 p1 v1 t1 m1 (LLoser _ (E k2 p2 v2) t2 m2 t3) = + lsingleLeft k1 p1 v1 t1 m1 (lsingleRight k2 p2 v2 t2 m2 t3) +ldoubleLeft k1 p1 v1 t1 m1 (RLoser _ (E k2 p2 v2) t2 m2 t3) = + lsingleLeft k1 p1 v1 t1 m1 (rsingleRight k2 p2 v2 t2 m2 t3) +ldoubleLeft _ _ _ _ _ _ = moduleError "ldoubleLeft" "malformed tree" + +ldoubleRight :: Key -> Prio -> a -> LTree a -> Key -> LTree a -> LTree a +ldoubleRight k1 p1 v1 (LLoser _ (E k2 p2 v2) t1 m1 t2) m2 t3 = + lsingleRight k1 p1 v1 (lsingleLeft k2 p2 v2 t1 m1 t2) m2 t3 +ldoubleRight k1 p1 v1 (RLoser _ (E k2 p2 v2) t1 m1 t2) m2 t3 = + lsingleRight k1 p1 v1 (rsingleLeft k2 p2 v2 t1 m1 t2) m2 t3 +ldoubleRight _ _ _ _ _ _ = moduleError "ldoubleRight" "malformed tree" + +rdoubleLeft :: Key -> Prio -> a -> LTree a -> Key -> LTree a -> LTree a +rdoubleLeft k1 p1 v1 t1 m1 (LLoser _ (E k2 p2 v2) t2 m2 t3) = + rsingleLeft k1 p1 v1 t1 m1 (lsingleRight k2 p2 v2 t2 m2 t3) +rdoubleLeft k1 p1 v1 t1 m1 (RLoser _ (E k2 p2 v2) t2 m2 t3) = + rsingleLeft k1 p1 v1 t1 m1 (rsingleRight k2 p2 v2 t2 m2 t3) +rdoubleLeft _ _ _ _ _ _ = moduleError "rdoubleLeft" "malformed tree" + +rdoubleRight :: Key -> Prio -> a -> LTree a -> Key -> LTree a -> LTree a +rdoubleRight k1 p1 v1 (LLoser _ (E k2 p2 v2) t1 m1 t2) m2 t3 = + rsingleRight k1 p1 v1 (lsingleLeft k2 p2 v2 t1 m1 t2) m2 t3 +rdoubleRight k1 p1 v1 (RLoser _ (E k2 p2 v2) t1 m1 t2) m2 t3 = + rsingleRight k1 p1 v1 (rsingleLeft k2 p2 v2 t1 m1 t2) m2 t3 +rdoubleRight _ _ _ _ _ _ = moduleError "rdoubleRight" "malformed tree" + +-- | Take two pennants and returns a new pennant that is the union of +-- the two with the precondition that the keys in the first tree are +-- strictly smaller than the keys in the second tree. +play :: PSQ a -> PSQ a -> PSQ a +Void `play` t' = t' +t `play` Void = t +Winner e@(E k p v) t m `play` Winner e'@(E k' p' v') t' m' + | p <= p' = Winner e (rbalance k' p' v' t m t') m' + | otherwise = Winner e' (lbalance k p v t m t') m' +{-# INLINE play #-} + +-- | A version of 'play' that can be used if the shape of the tree has +-- not changed or if the tree is known to be balanced. +unsafePlay :: PSQ a -> PSQ a -> PSQ a +Void `unsafePlay` t' = t' +t `unsafePlay` Void = t +Winner e@(E k p v) t m `unsafePlay` Winner e'@(E k' p' v') t' m' + | p <= p' = Winner e (rloser k' p' v' t m t') m' + | otherwise = Winner e' (lloser k p v t m t') m' +{-# INLINE unsafePlay #-} + +data TourView a = Null + | Single {-# UNPACK #-} !(Elem a) + | (PSQ a) `Play` (PSQ a) + +tourView :: PSQ a -> TourView a +tourView Void = Null +tourView (Winner e Start _) = Single e +tourView (Winner e (RLoser _ e' tl m tr) m') = + Winner e tl m `Play` Winner e' tr m' +tourView (Winner e (LLoser _ e' tl m tr) m') = + Winner e' tl m `Play` Winner e tr m' + +------------------------------------------------------------------------ +-- Utility functions + +moduleError :: String -> String -> a +moduleError fun msg = error ("System.Event.PSQ." ++ fun ++ ':' : ' ' : msg) +{-# NOINLINE moduleError #-} + +------------------------------------------------------------------------ +-- Hughes's efficient sequence type + +newtype Sequ a = Sequ ([a] -> [a]) + +emptySequ :: Sequ a +emptySequ = Sequ (\as -> as) + +singleSequ :: a -> Sequ a +singleSequ a = Sequ (\as -> a : as) + +(<>) :: Sequ a -> Sequ a -> Sequ a +Sequ x1 <> Sequ x2 = Sequ (\as -> x1 (x2 as)) +infixr 5 <> + +seqToList :: Sequ a -> [a] +seqToList (Sequ x) = x [] + +instance Show a => Show (Sequ a) where + showsPrec d a = showsPrec d (seqToList a) diff --git a/System/Event/Poll.hsc b/System/Event/Poll.hsc new file mode 100644 index 0000000..dc577a8 --- /dev/null +++ b/System/Event/Poll.hsc @@ -0,0 +1,149 @@ +{-# LANGUAGE ForeignFunctionInterface, GeneralizedNewtypeDeriving, + NoImplicitPrelude #-} + +module System.Event.Poll + ( + new + , available + ) where + +#include "EventConfig.h" + +#if !defined(HAVE_POLL_H) +import GHC.Base + +new :: IO E.Backend +new = error "Poll back end not implemented for this platform" + +available :: Bool +available = False +{-# INLINE available #-} +#else +#include + +import Control.Concurrent.MVar (MVar, newMVar, swapMVar) +import Control.Monad ((=<<), liftM, liftM2, unless) +import Data.Bits (Bits, (.|.), (.&.)) +import Data.Maybe (Maybe(..)) +import Data.Monoid (Monoid(..)) +import Foreign.C.Types (CInt, CShort, CULong) +import Foreign.Ptr (Ptr) +import Foreign.Storable (Storable(..)) +import GHC.Base +import GHC.Conc.Sync (withMVar) +import GHC.Err (undefined) +import GHC.Num (Num(..)) +import GHC.Real (ceiling, fromIntegral) +import GHC.Show (Show) +import System.Posix.Types (Fd(..)) + +import qualified System.Event.Array as A +import qualified System.Event.Internal as E + +available :: Bool +available = True +{-# INLINE available #-} + +data Poll = Poll { + pollChanges :: {-# UNPACK #-} !(MVar (A.Array PollFd)) + , pollFd :: {-# UNPACK #-} !(A.Array PollFd) + } + +new :: IO E.Backend +new = E.backend poll modifyFd (\_ -> return ()) `liftM` + liftM2 Poll (newMVar =<< A.empty) A.empty + +modifyFd :: Poll -> Fd -> E.Event -> E.Event -> IO () +modifyFd p fd oevt nevt = + withMVar (pollChanges p) $ \ary -> + A.snoc ary $ PollFd fd (fromEvent nevt) (fromEvent oevt) + +reworkFd :: Poll -> PollFd -> IO () +reworkFd p (PollFd fd npevt opevt) = do + let ary = pollFd p + if opevt == 0 + then A.snoc ary $ PollFd fd npevt 0 + else do + found <- A.findIndex ((== fd) . pfdFd) ary + case found of + Nothing -> error "reworkFd: event not found" + Just (i,_) + | npevt /= 0 -> A.unsafeWrite ary i $ PollFd fd npevt 0 + | otherwise -> A.removeAt ary i + +poll :: Poll + -> E.Timeout + -> (Fd -> E.Event -> IO ()) + -> IO () +poll p tout f = do + let a = pollFd p + mods <- swapMVar (pollChanges p) =<< A.empty + A.forM_ mods (reworkFd p) + n <- A.useAsPtr a $ \ptr len -> E.throwErrnoIfMinus1NoRetry "c_poll" $ + c_poll ptr (fromIntegral len) (fromIntegral (fromTimeout tout)) + unless (n == 0) $ do + A.loop a 0 $ \i e -> do + let r = pfdRevents e + if r /= 0 + then do f (pfdFd e) (toEvent r) + let i' = i + 1 + return (i', i' == n) + else return (i, True) + +fromTimeout :: E.Timeout -> Int +fromTimeout E.Forever = -1 +fromTimeout (E.Timeout s) = ceiling $ 1000 * s + +data PollFd = PollFd { + pfdFd :: {-# UNPACK #-} !Fd + , pfdEvents :: {-# UNPACK #-} !Event + , pfdRevents :: {-# UNPACK #-} !Event + } deriving (Show) + +newtype Event = Event CShort + deriving (Eq, Show, Num, Storable, Bits) + +#{enum Event, Event + , pollIn = POLLIN + , pollOut = POLLOUT +#ifdef POLLRDHUP + , pollRdHup = POLLRDHUP +#endif + , pollErr = POLLERR + , pollHup = POLLHUP + } + +fromEvent :: E.Event -> Event +fromEvent e = remap E.evtRead pollIn .|. + remap E.evtWrite pollOut + where remap evt to + | e `E.eventIs` evt = to + | otherwise = 0 + +toEvent :: Event -> E.Event +toEvent e = remap (pollIn .|. pollErr .|. pollHup) E.evtRead `mappend` + remap (pollOut .|. pollErr .|. pollHup) E.evtWrite + where remap evt to + | e .&. evt /= 0 = to + | otherwise = mempty + +instance Storable PollFd where + sizeOf _ = #size struct pollfd + alignment _ = alignment (undefined :: CInt) + + peek ptr = do + fd <- #{peek struct pollfd, fd} ptr + events <- #{peek struct pollfd, events} ptr + revents <- #{peek struct pollfd, revents} ptr + let !pollFd' = PollFd fd events revents + return pollFd' + + poke ptr p = do + #{poke struct pollfd, fd} ptr (pfdFd p) + #{poke struct pollfd, events} ptr (pfdEvents p) + #{poke struct pollfd, revents} ptr (pfdRevents p) + +foreign import ccall safe "poll.h poll" + c_poll :: Ptr PollFd -> CULong -> CInt -> IO CInt + +#endif /* defined(HAVE_POLL_H) */ diff --git a/System/Event/Thread.hs b/System/Event/Thread.hs new file mode 100644 index 0000000..342c914 --- /dev/null +++ b/System/Event/Thread.hs @@ -0,0 +1,106 @@ +{-# LANGUAGE BangPatterns, ForeignFunctionInterface, NoImplicitPrelude #-} + +module System.Event.Thread + ( + ensureIOManagerIsRunning + , threadWaitRead + , threadWaitWrite + , threadDelay + , registerDelay + ) where + +import Data.IORef (IORef, newIORef, readIORef, writeIORef) +import Data.Maybe (Maybe(..)) +import Foreign.Ptr (Ptr) +import GHC.Base +import GHC.Conc.Sync (TVar, ThreadId, ThreadStatus(..), atomically, forkIO, + labelThread, modifyMVar_, newTVar, sharedCAF, + threadStatus, writeTVar) +import GHC.MVar (MVar, newEmptyMVar, newMVar, putMVar, takeMVar) +import GHC.Num (fromInteger) +import GHC.Real (div) +import System.Event.Manager (Event, EventManager, evtRead, evtWrite, loop, + new, registerFd, unregisterFd_, registerTimeout) +import System.IO.Unsafe (unsafePerformIO) +import System.Posix.Types (Fd) + +-- | Suspends the current thread for a given number of microseconds +-- (GHC only). +-- +-- There is no guarantee that the thread will be rescheduled promptly +-- when the delay has expired, but the thread will never continue to +-- run /earlier/ than specified. +threadDelay :: Int -> IO () +threadDelay usecs = do + Just mgr <- readIORef eventManager + m <- newEmptyMVar + _ <- registerTimeout mgr (usecs `div` 1000) (putMVar m ()) + takeMVar m + +-- | Set the value of returned TVar to True after a given number of +-- microseconds. The caveats associated with threadDelay also apply. +-- +registerDelay :: Int -> IO (TVar Bool) +registerDelay usecs = do + t <- atomically $ newTVar False + Just mgr <- readIORef eventManager + _ <- registerTimeout mgr (usecs `div` 1000) . atomically $ writeTVar t True + return t + +-- | Block the current thread until data is available to read from the +-- given file descriptor. +threadWaitRead :: Fd -> IO () +threadWaitRead = threadWait evtRead +{-# INLINE threadWaitRead #-} + +-- | Block the current thread until the given file descriptor can +-- accept data to write. +threadWaitWrite :: Fd -> IO () +threadWaitWrite = threadWait evtWrite +{-# INLINE threadWaitWrite #-} + +threadWait :: Event -> Fd -> IO () +threadWait evt fd = do + m <- newEmptyMVar + Just mgr <- readIORef eventManager + _ <- registerFd mgr (\reg _ -> unregisterFd_ mgr reg >> putMVar m ()) fd evt + takeMVar m + +foreign import ccall unsafe "getOrSetSystemEventThreadEventManagerStore" + getOrSetSystemEventThreadEventManagerStore :: Ptr a -> IO (Ptr a) + +eventManager :: IORef (Maybe EventManager) +eventManager = unsafePerformIO $ do + em <- newIORef Nothing + sharedCAF em getOrSetSystemEventThreadEventManagerStore +{-# NOINLINE eventManager #-} + +foreign import ccall unsafe "getOrSetSystemEventThreadIOManagerThreadStore" + getOrSetSystemEventThreadIOManagerThreadStore :: Ptr a -> IO (Ptr a) + +{-# NOINLINE ioManager #-} +ioManager :: MVar (Maybe ThreadId) +ioManager = unsafePerformIO $ do + m <- newMVar Nothing + sharedCAF m getOrSetSystemEventThreadIOManagerThreadStore + +ensureIOManagerIsRunning :: IO () +ensureIOManagerIsRunning + | not threaded = return () + | otherwise = modifyMVar_ ioManager $ \old -> do + let create = do + !mgr <- new + writeIORef eventManager $ Just mgr + !t <- forkIO $ loop mgr + labelThread t "IOManager" + return $ Just t + case old of + Nothing -> create + st@(Just t) -> do + s <- threadStatus t + case s of + ThreadFinished -> create + ThreadDied -> create + _other -> return st + +foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool diff --git a/System/Event/Unique.hs b/System/Event/Unique.hs new file mode 100644 index 0000000..879232c --- /dev/null +++ b/System/Event/Unique.hs @@ -0,0 +1,40 @@ +{-# LANGUAGE BangPatterns, GeneralizedNewtypeDeriving, NoImplicitPrelude #-} +module System.Event.Unique + ( + UniqueSource + , Unique(..) + , newSource + , newUnique + ) where + +import Data.Int (Int64) +import GHC.Base +import GHC.Conc.Sync (TVar, atomically, newTVarIO, readTVar, writeTVar) +import GHC.Num (Num(..)) +import GHC.Show (Show(..)) + +-- We used to use IORefs here, but Simon switched us to STM when we +-- found that our use of atomicModifyIORef was subject to a severe RTS +-- performance problem when used in a tight loop from multiple +-- threads: http://hackage.haskell.org/trac/ghc/ticket/3838 +-- +-- There seems to be no performance cost to using a TVar instead. + +newtype UniqueSource = US (TVar Int64) + +newtype Unique = Unique { asInt64 :: Int64 } + deriving (Eq, Ord, Num) + +instance Show Unique where + show = show . asInt64 + +newSource :: IO UniqueSource +newSource = US `fmap` newTVarIO 0 + +newUnique :: UniqueSource -> IO Unique +newUnique (US ref) = atomically $ do + u <- readTVar ref + let !u' = u+1 + writeTVar ref u' + return $ Unique u' +{-# INLINE newUnique #-} diff --git a/base.cabal b/base.cabal index 0d3d30d..5e1a6d5 100644 --- a/base.cabal +++ b/base.cabal @@ -13,7 +13,7 @@ cabal-version: >=1.6 build-type: Configure extra-tmp-files: config.log config.status autom4te.cache - include/HsBaseConfig.h + include/HsBaseConfig.h include/EventConfig.h extra-source-files: config.guess config.sub install-sh aclocal.m4 configure.ac configure @@ -40,6 +40,9 @@ Library { GHC.Base, GHC.Classes, GHC.Conc, + GHC.Conc.IO, + GHC.Conc.Signal, + GHC.Conc.Sync, GHC.ConsoleHandler, GHC.Constants, GHC.Desugar, @@ -95,6 +98,7 @@ Library { System.Timeout if os(windows) exposed-modules: GHC.IO.Encoding.CodePage.Table + GHC.Conc.Windows extensions: MagicHash, ExistentialQuantification, Rank2Types, ScopedTypeVariables, UnboxedTuples, ForeignFunctionInterface, UnliftedFFITypes, @@ -102,7 +106,7 @@ Library { FlexibleInstances, StandaloneDeriving, PatternGuards, EmptyDataDecls, NoImplicitPrelude - if impl(ghc < 6.10) + if impl(ghc < 6.10) -- PatternSignatures was deprecated in 6.10 extensions: PatternSignatures } @@ -206,10 +210,27 @@ Library { cbits/primFloat.c include-dirs: include includes: HsBase.h - install-includes: HsBase.h HsBaseConfig.h WCsubst.h consUtils.h Typeable.h + install-includes: HsBase.h HsBaseConfig.h EventConfig.h WCsubst.h consUtils.h Typeable.h if os(windows) { extra-libraries: wsock32, user32, shell32 } + if !os(windows) { + exposed-modules: + System.Event + other-modules: + System.Event.Array + System.Event.Clock + System.Event.Control + System.Event.EPoll + System.Event.IntMap + System.Event.Internal + System.Event.KQueue + System.Event.Manager + System.Event.PSQ + System.Event.Poll + System.Event.Thread + System.Event.Unique + } extensions: CPP -- We need to set the package name to base (without a version number) -- as it's magic. diff --git a/configure.ac b/configure.ac index e4ab28b..c79c531 100644 --- a/configure.ac +++ b/configure.ac @@ -3,7 +3,7 @@ AC_INIT([Haskell base package], [1.0], [libraries@haskell.org], [base]) # Safety check: Ensure that we are in the correct source directory. AC_CONFIG_SRCDIR([include/HsBase.h]) -AC_CONFIG_HEADERS([include/HsBaseConfig.h]) +AC_CONFIG_HEADERS([include/HsBaseConfig.h include/EventConfig.h]) AC_ARG_WITH([cc], [C compiler], @@ -17,7 +17,7 @@ dnl ** check for full ANSI header (.h) files AC_HEADER_STDC # check for specific header (.h) files that we are interested in -AC_CHECK_HEADERS([ctype.h errno.h fcntl.h inttypes.h limits.h signal.h sys/resource.h sys/select.h sys/stat.h sys/syscall.h sys/time.h sys/timeb.h sys/timers.h sys/times.h sys/types.h sys/utsname.h sys/wait.h termios.h time.h unistd.h utime.h windows.h winsock.h langinfo.h]) +AC_CHECK_HEADERS([ctype.h errno.h fcntl.h inttypes.h limits.h signal.h sys/resource.h sys/select.h sys/stat.h sys/syscall.h sys/time.h sys/timeb.h sys/timers.h sys/times.h sys/types.h sys/utsname.h sys/wait.h termios.h time.h unistd.h utime.h windows.h winsock.h langinfo.h poll.h sys/epoll.h sys/event.h sys/eventfd.h]) # Enable large file support. Do this before testing the types ino_t, off_t, and # rlim_t, because it will affect the result of that test. @@ -32,6 +32,22 @@ AC_CHECK_FUNCS([lstat]) AC_CHECK_FUNCS([getclock getrusage times]) AC_CHECK_FUNCS([_chsize ftruncate]) +AC_CHECK_FUNCS([epoll_create1 epoll_ctl eventfd kevent kevent64 kqueue poll]) + +# event-related fun + +if test "$ac_cv_header_sys_epoll_h" = yes -a "$ac_cv_func_epoll_ctl" = yes; then + AC_DEFINE([HAVE_EPOLL], [1], [Define if you have epoll support.]) +fi + +if test "$ac_cv_header_sys_event_h" = yes -a "$ac_cv_func_kqueue" = yes; then + AC_DEFINE([HAVE_KQUEUE], [1], [Define if you have kqueue support.]) +fi + +if test "$ac_cv_header_poll_h" = yes -a "$ac_cv_func_poll" = yes; then + AC_DEFINE([HAVE_POLL], [1], [Define if you have poll support.]) +fi + dnl-------------------------------------------------------------------- dnl * Deal with arguments telling us iconv is somewhere odd dnl-------------------------------------------------------------------- diff --git a/include/EventConfig.h.in b/include/EventConfig.h.in new file mode 100644 index 0000000..032ceb1 --- /dev/null +++ b/include/EventConfig.h.in @@ -0,0 +1,85 @@ +/* include/EventConfig.h.in. Generated from configure.ac by autoheader. */ + +/* Define if you have epoll support. */ +#undef HAVE_EPOLL + +/* Define to 1 if you have the `epoll_create1' function. */ +#undef HAVE_EPOLL_CREATE1 + +/* Define to 1 if you have the `epoll_ctl' function. */ +#undef HAVE_EPOLL_CTL + +/* Define to 1 if you have the `eventfd' function. */ +#undef HAVE_EVENTFD + +/* Define to 1 if you have the header file. */ +#undef HAVE_INTTYPES_H + +/* Define to 1 if you have the `kevent' function. */ +#undef HAVE_KEVENT + +/* Define to 1 if you have the `kevent64' function. */ +#undef HAVE_KEVENT64 + +/* Define if you have kqueue support. */ +#undef HAVE_KQUEUE + +/* Define to 1 if you have the header file. */ +#undef HAVE_MEMORY_H + +/* Define if you have poll support. */ +#undef HAVE_POLL + +/* Define to 1 if you have the header file. */ +#undef HAVE_POLL_H + +/* Define to 1 if you have the header file. */ +#undef HAVE_SIGNAL_H + +/* Define to 1 if you have the header file. */ +#undef HAVE_STDINT_H + +/* Define to 1 if you have the header file. */ +#undef HAVE_STDLIB_H + +/* Define to 1 if you have the header file. */ +#undef HAVE_STRINGS_H + +/* Define to 1 if you have the header file. */ +#undef HAVE_STRING_H + +/* Define to 1 if you have the header file. */ +#undef HAVE_SYS_EPOLL_H + +/* Define to 1 if you have the header file. */ +#undef HAVE_SYS_EVENTFD_H + +/* Define to 1 if you have the header file. */ +#undef HAVE_SYS_EVENT_H + +/* Define to 1 if you have the header file. */ +#undef HAVE_SYS_STAT_H + +/* Define to 1 if you have the header file. */ +#undef HAVE_SYS_TYPES_H + +/* Define to 1 if you have the header file. */ +#undef HAVE_UNISTD_H + +/* Define to the address where bug reports for this package should be sent. */ +#undef PACKAGE_BUGREPORT + +/* Define to the full name of this package. */ +#undef PACKAGE_NAME + +/* Define to the full name and version of this package. */ +#undef PACKAGE_STRING + +/* Define to the one symbol short name of this package. */ +#undef PACKAGE_TARNAME + +/* Define to the version of this package. */ +#undef PACKAGE_VERSION + +/* Define to 1 if you have the ANSI C header files. */ +#undef STDC_HEADERS diff --git a/include/HsEvent.h b/include/HsEvent.h new file mode 100644 index 0000000..fe0a7ca --- /dev/null +++ b/include/HsEvent.h @@ -0,0 +1,41 @@ +#ifndef __HS_EVENT_H__ +#define __HS_EVENT_H__ + +#include "EventConfig.h" + +#include +#include + +#if !defined(INLINE) +# if defined(_MSC_VER) +# define INLINE extern __inline +# else +# define INLINE inline +# endif +#endif + +INLINE int __hsevent_num_signals(void) +{ +#if defined(_NSIG) + return _NSIG; +#else + return 128; /* best guess */ +#endif +} + +INLINE void __hsevent_thread_self(pthread_t *tid) +{ + *tid = pthread_self(); +} + +INLINE int __hsevent_kill_thread(pthread_t *tid, int sig) +{ + return pthread_kill(*tid, sig); +} + +#endif /* __HS_EVENT_H__ */ +/* + * Local Variables: + * c-file-style: "stroustrup" + * End: + */