X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=GHC%2FConc.lhs;h=1deb1601acf73a93b8b2519d3ee60471effd1108;hb=567080c906535534628b1ab83a4a4425dcd4bb5e;hp=f50da8ab4fbe67ae54a3a1daa54ccb98a4cc7724;hpb=b7564a80c1ce808319a396bafedd08fc97df05b3;p=haskell-directory.git diff --git a/GHC/Conc.lhs b/GHC/Conc.lhs index f50da8a..1deb160 100644 --- a/GHC/Conc.lhs +++ b/GHC/Conc.lhs @@ -19,12 +19,16 @@ -- bits it exports, we'd rather have Control.Concurrent and the other -- higher level modules be the home. Hence: +#include "Typeable.h" + -- #not-home module GHC.Conc ( ThreadId(..) - -- Forking and suchlike + -- * Forking and suchlike , forkIO -- :: IO a -> IO ThreadId + , forkOnIO -- :: Int -> IO a -> IO ThreadId + , childHandler -- :: Exception -> IO () , myThreadId -- :: IO ThreadId , killThread -- :: ThreadId -> IO () , throwTo -- :: ThreadId -> Exception -> IO () @@ -33,13 +37,13 @@ module GHC.Conc , yield -- :: IO () , labelThread -- :: ThreadId -> String -> IO () - -- Waiting + -- * Waiting , threadDelay -- :: Int -> IO () , registerDelay -- :: Int -> IO (TVar Bool) , threadWaitRead -- :: Int -> IO () , threadWaitWrite -- :: Int -> IO () - -- MVars + -- * MVars , MVar -- abstract , newMVar -- :: a -> IO (MVar a) , newEmptyMVar -- :: IO (MVar a) @@ -50,18 +54,22 @@ module GHC.Conc , isEmptyMVar -- :: MVar a -> IO Bool , addMVarFinalizer -- :: MVar a -> IO () -> IO () - -- TVars + -- * TVars , STM -- abstract , atomically -- :: STM a -> IO a , retry -- :: STM a , orElse -- :: STM a -> STM a -> STM a , catchSTM -- :: STM a -> (Exception -> STM a) -> STM a + , alwaysSucceeds -- :: STM a -> STM () + , always -- :: STM Bool -> STM () , TVar -- abstract , newTVar -- :: a -> STM (TVar a) + , newTVarIO -- :: a -> STM (TVar a) , readTVar -- :: TVar a -> STM a , writeTVar -- :: a -> TVar a -> STM () , unsafeIOToSTM -- :: IO a -> STM a + -- * Miscellaneous #ifdef mingw32_HOST_OS , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) @@ -81,7 +89,9 @@ import System.Posix.Internals import Foreign import Foreign.C +#ifndef __HADDOCK__ import {-# SOURCE #-} GHC.TopHandler ( reportError, reportStackOverflow ) +#endif import Data.Maybe @@ -94,6 +104,7 @@ import GHC.Exception ( catchException, Exception(..), AsyncException(..) ) import GHC.Pack ( packCString# ) import GHC.Ptr ( Ptr(..), plusPtr, FunPtr(..) ) import GHC.STRef +import GHC.Show ( Show(..), showString ) import Data.Typeable infixr 0 `par`, `pseq` @@ -128,6 +139,35 @@ This misfeature will hopefully be corrected at a later date. it defines 'ThreadId' as a synonym for (). -} +instance Show ThreadId where + showsPrec d t = + showString "ThreadId " . + showsPrec d (getThreadId (id2TSO t)) + +foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> Int + +id2TSO :: ThreadId -> ThreadId# +id2TSO (ThreadId t) = t + +foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt +-- Returns -1, 0, 1 + +cmpThread :: ThreadId -> ThreadId -> Ordering +cmpThread t1 t2 = + case cmp_thread (id2TSO t1) (id2TSO t2) of + -1 -> LT + 0 -> EQ + _ -> GT -- must be 1 + +instance Eq ThreadId where + t1 == t2 = + case t1 `cmpThread` t2 of + EQ -> True + _ -> False + +instance Ord ThreadId where + compare = cmpThread + {- | This sparks off a new thread to run the 'IO' computation passed as the first argument, and returns the 'ThreadId' of the newly created @@ -142,6 +182,12 @@ forkIO action = IO $ \ s -> where action_plus = catchException action childHandler +forkOnIO :: Int -> IO () -> IO ThreadId +forkOnIO (I# cpu) action = IO $ \ s -> + case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #) + where + action_plus = catchException action childHandler + childHandler :: Exception -> IO () childHandler err = catchException (real_handler err) childHandler @@ -249,11 +295,14 @@ TVars are shared memory locations which support atomic memory transactions. \begin{code} -newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #)) deriving( Typeable ) +-- |A monad supporting atomic memory transactions. +newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #)) unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #)) unSTM (STM a) = a +INSTANCE_TYPEABLE1(STM,stmTc,"STM") + instance Functor STM where fmap f x = x >>= (return . f) @@ -285,6 +334,15 @@ unsafeIOToSTM :: IO a -> STM a unsafeIOToSTM (IO m) = STM m -- |Perform a series of STM actions atomically. +-- +-- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. +-- Any attempt to do so will result in a runtime error. (Reason: allowing +-- this would effectively allow a transaction inside a transaction, depending +-- on exactly when the thunk is evaluated.) +-- +-- However, see 'newTVarIO', which can be called inside 'unsafePerformIO', +-- and which allows top-level TVars to be allocated. + atomically :: STM a -> IO a atomically (STM m) = IO (\s -> (atomically# m) s ) @@ -292,14 +350,15 @@ atomically (STM m) = IO (\s -> (atomically# m) s ) -- values in TVars which mean that it should not continue (e.g. the TVars -- represent a shared buffer that is now empty). The implementation may -- block the thread until one of the TVars that it has read from has been --- udpated. +-- udpated. (GHC only) retry :: STM a retry = STM $ \s# -> retry# s# --- |Compose two alternative STM actions. If the first action completes without --- retrying then it forms the result of the orElse. Otherwise, if the first --- action retries, then the second action is tried in its place. If both actions --- retry then the orElse as a whole retries. +-- |Compose two alternative STM actions (GHC only). If the first action +-- completes without retrying then it forms the result of the orElse. +-- Otherwise, if the first action retries, then the second action is +-- tried in its place. If both actions retry then the orElse as a +-- whole retries. orElse :: STM a -> STM a -> STM a orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s @@ -307,7 +366,34 @@ orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s catchSTM :: STM a -> (Exception -> STM a) -> STM a catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s -data TVar a = TVar (TVar# RealWorld a) deriving( Typeable ) +-- | Low-level primitive on which always and alwaysSucceeds are built. +-- checkInv differs form these in that (i) the invariant is not +-- checked when checkInv is called, only at the end of this and +-- subsequent transcations, (ii) the invariant failure is indicated +-- by raising an exception. +checkInv :: STM a -> STM () +checkInv (STM m) = STM (\s -> (check# m) s) + +-- | alwaysSucceeds adds a new invariant that must be true when passed +-- to alwaysSucceeds, at the end of the current transaction, and at +-- the end of every subsequent transaction. If it fails at any +-- of those points then the transaction violating it is aborted +-- and the exception raised by the invariant is propagated. +alwaysSucceeds :: STM a -> STM () +alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) + checkInv i + +-- | always is a variant of alwaysSucceeds in which the invariant is +-- expressed as an STM Bool action that must return True. Returning +-- False or raising an exception are both treated as invariant failures. +always :: STM Bool -> STM () +always i = alwaysSucceeds ( do v <- i + if (v) then return () else ( error "Transacional invariant violation" ) ) + +-- |Shared memory locations that support atomic memory transactions. +data TVar a = TVar (TVar# RealWorld a) + +INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar") instance Eq (TVar a) where (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2# @@ -318,6 +404,15 @@ newTVar val = STM $ \s1# -> case newTVar# val s1# of (# s2#, tvar# #) -> (# s2#, TVar tvar# #) +-- |@IO@ version of 'newTVar'. This is useful for creating top-level +-- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using +-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't +-- possible. +newTVarIO :: a -> IO (TVar a) +newTVarIO val = IO $ \s1# -> + case newTVar# val s1# of + (# s2#, tvar# #) -> (# s2#, TVar tvar# #) + -- |Return the current value stored in a TVar readTVar :: TVar a -> STM a readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s# @@ -364,16 +459,34 @@ newMVar value = -- empty, 'takeMVar' will wait until it is full. After a 'takeMVar', -- the 'MVar' is left empty. -- --- If several threads are competing to take the same 'MVar', one is chosen --- to continue at random when the 'MVar' becomes full. +-- There are two further important properties of 'takeMVar': +-- +-- * 'takeMVar' is single-wakeup. That is, if there are multiple +-- threads blocked in 'takeMVar', and the 'MVar' becomes full, +-- only one thread will be woken up. The runtime guarantees that +-- the woken thread completes its 'takeMVar' operation. +-- +-- * When multiple threads are blocked on an 'MVar', they are +-- woken up in FIFO order. This is useful for providing +-- fairness properties of abstractions built using 'MVar's. +-- takeMVar :: MVar a -> IO a takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s# -- |Put a value into an 'MVar'. If the 'MVar' is currently full, -- 'putMVar' will wait until it becomes empty. -- --- If several threads are competing to fill the same 'MVar', one is --- chosen to continue at random when the 'MVar' becomes empty. +-- There are two further important properties of 'putMVar': +-- +-- * 'putMVar' is single-wakeup. That is, if there are multiple +-- threads blocked in 'putMVar', and the 'MVar' becomes empty, +-- only one thread will be woken up. The runtime guarantees that +-- the woken thread completes its 'putMVar' operation. +-- +-- * When multiple threads are blocked on an 'MVar', they are +-- woken up in FIFO order. This is useful for providing +-- fairness properties of abstractions built using 'MVar's. +-- putMVar :: MVar a -> a -> IO () putMVar (MVar mvar#) x = IO $ \ s# -> case putMVar# mvar# x s# of @@ -511,6 +624,7 @@ threadDelay time case delay# time# s of { s -> (# s, () #) }} +registerDelay :: Int -> IO (TVar Bool) registerDelay usecs #ifndef mingw32_HOST_OS | threaded = waitForDelayEventSTM usecs @@ -638,38 +752,54 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do if (res == -1) then do err <- getErrno - if err == eINTR - then do_select delays' - else return (res,delays') + case err of + _ | err == eINTR -> do_select delays' + -- EINTR: just redo the select() + _ | err == eBADF -> return (True, delays) + -- EBADF: one of the file descriptors is closed or bad, + -- we don't know which one, so wake everyone up. + _ | otherwise -> throwErrno "select" + -- otherwise (ENOMEM or EINVAL) something has gone + -- wrong; report the error. else - return (res,delays') - - (res,delays') <- do_select delays - -- ToDo: check result - - b <- fdIsSet wakeup readfds - if b == 0 - then return () - else alloca $ \p -> do - c_read (fromIntegral wakeup) p 1; return () - s <- peek p - if (s == 0xff) - then return () - else do handler_tbl <- peek handlers - sp <- peekElemOff handler_tbl (fromIntegral s) - forkIO (do io <- deRefStablePtr sp; io) - return () + return (False,delays') + + (wakeup_all,delays') <- do_select delays + + exit <- + if wakeup_all then return False + else do + b <- fdIsSet wakeup readfds + if b == 0 + then return False + else alloca $ \p -> do + c_read (fromIntegral wakeup) p 1; return () + s <- peek p + case s of + _ | s == io_MANAGER_WAKEUP -> return False + _ | s == io_MANAGER_DIE -> return True + _ -> do handler_tbl <- peek handlers + sp <- peekElemOff handler_tbl (fromIntegral s) + forkIO (do io <- deRefStablePtr sp; io) + return False + + if exit then return () else do takeMVar prodding putMVar prodding False - reqs' <- completeRequests reqs readfds writefds [] + reqs' <- if wakeup_all then do wakeupAll reqs; return [] + else completeRequests reqs readfds writefds [] + service_loop wakeup readfds writefds ptimeval reqs' delays' stick :: IORef Fd {-# NOINLINE stick #-} stick = unsafePerformIO (newIORef 0) +io_MANAGER_WAKEUP = 0xff :: CChar +io_MANAGER_DIE = 0xfe :: CChar + prodding :: MVar Bool {-# NOINLINE prodding #-} prodding = unsafePerformIO (newMVar False) @@ -679,7 +809,8 @@ prodServiceThread = do b <- takeMVar prodding if (not b) then do fd <- readIORef stick - with 0xff $ \pbuf -> do c_write (fromIntegral fd) pbuf 1; return () + with io_MANAGER_WAKEUP $ \pbuf -> do + c_write (fromIntegral fd) pbuf 1; return () else return () putMVar prodding True @@ -715,6 +846,10 @@ completeRequests (Write fd m : reqs) readfds writefds reqs' = do then do putMVar m (); completeRequests reqs readfds writefds reqs' else completeRequests reqs readfds writefds (Write fd m : reqs') +wakeupAll [] = return () +wakeupAll (Read fd m : reqs) = do putMVar m (); wakeupAll reqs +wakeupAll (Write fd m : reqs) = do putMVar m (); wakeupAll reqs + waitForReadEvent :: Fd -> IO () waitForReadEvent fd = do m <- newEmptyMVar