X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=GHC%2FConc.lhs;h=1d5cc9c2a2f5d975c4dfe717142c4b233341fcfa;hb=c856e1e71c608e8c291218f66645eb748270b6d2;hp=c45d563f1bd55587ae0baa182a5a84adeb6f9c86;hpb=b9152b3523862840a0b682ffa55cf55281c93185;p=ghc-base.git diff --git a/GHC/Conc.lhs b/GHC/Conc.lhs index c45d563..1d5cc9c 100644 --- a/GHC/Conc.lhs +++ b/GHC/Conc.lhs @@ -1,5 +1,6 @@ \begin{code} {-# OPTIONS_GHC -XNoImplicitPrelude #-} +{-# OPTIONS_GHC -fno-warn-missing-signatures #-} {-# OPTIONS_HADDOCK not-home #-} ----------------------------------------------------------------------------- -- | @@ -36,6 +37,7 @@ module GHC.Conc , throwTo -- :: ThreadId -> Exception -> IO () , par -- :: a -> b -> b , pseq -- :: a -> b -> b + , runSparks , yield -- :: IO () , labelThread -- :: ThreadId -> String -> IO () @@ -71,6 +73,7 @@ module GHC.Conc , newTVar -- :: a -> STM (TVar a) , newTVarIO -- :: a -> STM (TVar a) , readTVar -- :: TVar a -> STM a + , readTVarIO -- :: TVar a -> IO a , writeTVar -- :: a -> TVar a -> STM () , unsafeIOToSTM -- :: IO a -> STM a @@ -114,9 +117,10 @@ import GHC.Base import {-# SOURCE #-} GHC.Handle import GHC.IOBase import GHC.Num ( Num(..) ) -import GHC.Real ( fromIntegral, div ) -#ifndef mingw32_HOST_OS -import GHC.Base ( Int(..) ) +import GHC.Real ( fromIntegral ) +#ifdef mingw32_HOST_OS +import GHC.Real ( div ) +import GHC.Ptr ( plusPtr, FunPtr(..) ) #endif #ifdef mingw32_HOST_OS import GHC.Read ( Read ) @@ -124,7 +128,7 @@ import GHC.Enum ( Enum ) #endif import GHC.Exception ( SomeException(..), throw ) import GHC.Pack ( packCString# ) -import GHC.Ptr ( Ptr(..), plusPtr, FunPtr(..) ) +import GHC.Ptr ( Ptr(..) ) import GHC.STRef import GHC.Show ( Show(..), showString ) import Data.Typeable @@ -201,10 +205,15 @@ library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead. GHC note: the new thread inherits the /blocked/ state of the parent (see 'Control.Exception.block'). + +The newly created thread has an exception handler that discards the +exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and +'ThreadKilled', and passes all other exceptions to the uncaught +exception handler (see 'setUncaughtExceptionHandler'). -} forkIO :: IO () -> IO ThreadId forkIO action = IO $ \ s -> - case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #) + case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #) where action_plus = catchException action childHandler @@ -223,7 +232,7 @@ equivalent). -} forkOnIO :: Int -> IO () -> IO ThreadId forkOnIO (I# cpu) action = IO $ \ s -> - case (forkOn# cpu action_plus s) of (# s1, id #) -> (# s1, ThreadId id #) + case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #) where action_plus = catchException action childHandler @@ -236,8 +245,11 @@ numCapabilities = unsafePerformIO $ do n <- peek n_capabilities return (fromIntegral n) +#if defined(mingw32_HOST_OS) && defined(__PIC__) +foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt +#else foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt - +#endif childHandler :: SomeException -> IO () childHandler err = catchException (real_handler err) childHandler @@ -262,11 +274,12 @@ The memory used by the thread will be garbage collected if it isn\'t referenced from anywhere. The 'killThread' function is defined in terms of 'throwTo': -> killThread tid = throwTo tid (AsyncException ThreadKilled) +> killThread tid = throwTo tid ThreadKilled +Killthread is a no-op if the target thread has already completed. -} killThread :: ThreadId -> IO () -killThread tid = throwTo tid (toException ThreadKilled) +killThread tid = throwTo tid ThreadKilled {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only). @@ -288,25 +301,24 @@ the paper \"Asynchronous exceptions in Haskell\" (). In the paper, 'throwTo' is non-blocking; but the library implementation adopts a more synchronous design in which 'throwTo' does not return until the exception -is received by the target thread. The trade-off is discussed in Section 8 of the paper. -Like any blocking operation, 'throwTo' is therefore interruptible (see Section 4.3 of +is received by the target thread. The trade-off is discussed in Section 9 of the paper. +Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of the paper). There is currently no guarantee that the exception delivered by 'throwTo' will be -delivered at the first possible opportunity. In particular, if a thread may +delivered at the first possible opportunity. In particular, a thread may unblock and then re-block exceptions (using 'unblock' and 'block') without receiving a pending 'throwTo'. This is arguably undesirable behaviour. -} --- XXX This is duplicated in Control.{Old,}Exception -throwTo :: ThreadId -> SomeException -> IO () -throwTo (ThreadId id) ex = IO $ \ s -> - case (killThread# id ex s) of s1 -> (# s1, () #) +throwTo :: Exception e => ThreadId -> e -> IO () +throwTo (ThreadId tid) ex = IO $ \ s -> + case (killThread# tid (toException ex) s) of s1 -> (# s1, () #) -- | Returns the 'ThreadId' of the calling thread (GHC only). myThreadId :: IO ThreadId myThreadId = IO $ \s -> - case (myThreadId# s) of (# s1, id #) -> (# s1, ThreadId id #) + case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #) -- |The 'yield' action allows (forces, in a co-operative multitasking @@ -352,6 +364,13 @@ pseq x y = x `seq` lazy y par :: a -> b -> b par x y = case (par# x) of { _ -> lazy y } +-- | Internal function used by the RTS to run sparks. +runSparks :: IO () +runSparks = IO loop + where loop s = case getSpark# s of + (# s', n, p #) -> + if n ==# 0# then (# s', () #) + else p `seq` loop s' data BlockReason = BlockedOnMVar @@ -439,7 +458,7 @@ bindSTM (STM m) k = STM ( \s -> thenSTM :: STM a -> STM b -> STM b thenSTM (STM m) k = STM ( \s -> case m s of - (# new_s, a #) -> unSTM k new_s + (# new_s, _ #) -> unSTM k new_s ) returnSTM :: a -> STM a @@ -548,6 +567,16 @@ newTVarIO val = IO $ \s1# -> case newTVar# val s1# of (# s2#, tvar# #) -> (# s2#, TVar tvar# #) +-- |Return the current value stored in a TVar. +-- This is equivalent to +-- +-- > readTVarIO = atomically . readTVar +-- +-- but works much faster, because it doesn't perform a complete +-- transaction, it just reads the current value of the 'TVar'. +readTVarIO :: TVar a -> IO a +readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s# + -- |Return the current value stored in a TVar readTVar :: TVar a -> STM a readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s# @@ -634,8 +663,8 @@ putMVar (MVar mvar#) x = IO $ \ s# -> tryTakeMVar :: MVar a -> IO (Maybe a) tryTakeMVar (MVar m) = IO $ \ s -> case tryTakeMVar# m s of - (# s, 0#, _ #) -> (# s, Nothing #) -- MVar is empty - (# s, _, a #) -> (# s, Just a #) -- MVar is full + (# s', 0#, _ #) -> (# s', Nothing #) -- MVar is empty + (# s', _, a #) -> (# s', Just a #) -- MVar is full -- |A non-blocking version of 'putMVar'. The 'tryPutMVar' function -- attempts to put the value @a@ into the 'MVar', returning 'True' if @@ -661,7 +690,7 @@ isEmptyMVar (MVar mv#) = IO $ \ s# -> -- "System.Mem.Weak" for more about finalizers. addMVarFinalizer :: MVar a -> IO () -> IO () addMVarFinalizer (MVar m) finalizer = - IO $ \s -> case mkWeak# m () finalizer s of { (# s1, w #) -> (# s1, () #) } + IO $ \s -> case mkWeak# m () finalizer s of { (# s1, _ #) -> (# s1, () #) } withMVar :: MVar a -> (a -> IO b) -> IO b withMVar m io = @@ -690,19 +719,19 @@ withMVar m io = asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) = IO $ \s -> case asyncRead# fd isSock len buf s of - (# s, len#, err# #) -> (# s, (I# len#, I# err#) #) + (# s', len#, err# #) -> (# s', (I# len#, I# err#) #) asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) = IO $ \s -> case asyncWrite# fd isSock len buf s of - (# s, len#, err# #) -> (# s, (I# len#, I# err#) #) + (# s', len#, err# #) -> (# s', (I# len#, I# err#) #) asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int asyncDoProc (FunPtr proc) (Ptr param) = -- the 'length' value is ignored; simplifies implementation of -- the async*# primops to have them all return the same result. IO $ \s -> case asyncDoProc# proc param s of - (# s, len#, err# #) -> (# s, I# err# #) + (# s', _len#, err# #) -> (# s', I# err# #) -- to aid the use of these primops by the IO Handle implementation, -- provide the following convenience funs: @@ -730,7 +759,7 @@ threadWaitRead fd #endif | otherwise = IO $ \s -> case fromIntegral fd of { I# fd# -> - case waitRead# fd# s of { s -> (# s, () #) + case waitRead# fd# s of { s' -> (# s', () #) }} -- | Block the current thread until data can be written to the @@ -742,7 +771,7 @@ threadWaitWrite fd #endif | otherwise = IO $ \s -> case fromIntegral fd of { I# fd# -> - case waitWrite# fd# s of { s -> (# s, () #) + case waitWrite# fd# s of { s' -> (# s', () #) }} -- | Suspends the current thread for a given number of microseconds @@ -757,7 +786,7 @@ threadDelay time | threaded = waitForDelayEvent time | otherwise = IO $ \s -> case fromIntegral time of { I# time# -> - case delay# time# s of { s -> (# s, () #) + case delay# time# s of { s' -> (# s', () #) }} @@ -910,26 +939,28 @@ service_loop wakeup old_delays = do case r of 0xffffffff -> do c_maperrno; throwErrno "service_loop" 0 -> do - r <- c_readIOManagerEvent + r2 <- c_readIOManagerEvent exit <- - case r of - _ | r == io_MANAGER_WAKEUP -> return False - _ | r == io_MANAGER_DIE -> return True + case r2 of + _ | r2 == io_MANAGER_WAKEUP -> return False + _ | r2 == io_MANAGER_DIE -> return True 0 -> return False -- spurious wakeup - r -> do start_console_handler (r `shiftR` 1); return False + _ -> do start_console_handler (r2 `shiftR` 1); return False if exit then return () else service_cont wakeup delays' _other -> service_cont wakeup delays' -- probably timeout +service_cont :: HANDLE -> [DelayReq] -> IO () service_cont wakeup delays = do atomicModifyIORef prodding (\_ -> (False,False)) service_loop wakeup delays -- must agree with rts/win32/ThrIOManager.c -io_MANAGER_WAKEUP = 0xffffffff :: Word32 -io_MANAGER_DIE = 0xfffffffe :: Word32 +io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32 +io_MANAGER_WAKEUP = 0xffffffff +io_MANAGER_DIE = 0xfffffffe data ConsoleEvent = ControlC @@ -948,6 +979,7 @@ start_console_handler r = return () Nothing -> return () +toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent toWin32ConsoleEvent ev = case ev of 0 {- CTRL_C_EVENT-} -> Just ControlC @@ -960,19 +992,21 @@ toWin32ConsoleEvent ev = win32ConsoleHandler :: MVar (ConsoleEvent -> IO ()) win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler")) +-- XXX Is this actually needed? stick :: IORef HANDLE {-# NOINLINE stick #-} stick = unsafePerformIO (newIORef nullPtr) +wakeupIOManager :: IO () wakeupIOManager = do - hdl <- readIORef stick + _hdl <- readIORef stick c_sendIOManagerEvent io_MANAGER_WAKEUP -- Walk the queue of pending delays, waking up any that have passed -- and return the smallest delay to wait for. The queue of pending -- delays is kept ordered. getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD) -getDelay now [] = return ([], iNFINITE) +getDelay _ [] = return ([], iNFINITE) getDelay now all@(d : rest) = case d of Delay time m | now >= time -> do @@ -993,7 +1027,8 @@ getDelay now all@(d : rest) type HANDLE = Ptr () type DWORD = Word32 -iNFINITE = 0xFFFFFFFF :: DWORD -- urgh +iNFINITE :: DWORD +iNFINITE = 0xFFFFFFFF -- urgh foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c) c_getIOManagerEvent :: IO HANDLE @@ -1045,7 +1080,7 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do -- pick up new delay requests new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a)) - let delays = foldr insertDelay old_delays new_delays + let delays0 = foldr insertDelay old_delays new_delays -- build the FDSets for select() fdZero readfds @@ -1078,7 +1113,7 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do else return (False,delays') - (wakeup_all,delays') <- do_select delays + (wakeup_all,delays') <- do_select delays0 exit <- if wakeup_all then return False @@ -1108,8 +1143,9 @@ service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do service_loop wakeup readfds writefds ptimeval reqs' delays' -io_MANAGER_WAKEUP = 0xff :: CChar -io_MANAGER_DIE = 0xfe :: CChar +io_MANAGER_WAKEUP, io_MANAGER_DIE :: CChar +io_MANAGER_WAKEUP = 0xff +io_MANAGER_DIE = 0xfe stick :: IORef Fd {-# NOINLINE stick #-} @@ -1135,18 +1171,21 @@ foreign import ccall "setIOManagerPipe" -- ----------------------------------------------------------------------------- -- IO requests -buildFdSets maxfd readfds writefds [] = return maxfd -buildFdSets maxfd readfds writefds (Read fd m : reqs) +buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd +buildFdSets maxfd _ _ [] = return maxfd +buildFdSets maxfd readfds writefds (Read fd _ : reqs) | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range" | otherwise = do fdSet fd readfds buildFdSets (max maxfd fd) readfds writefds reqs -buildFdSets maxfd readfds writefds (Write fd m : reqs) +buildFdSets maxfd readfds writefds (Write fd _ : reqs) | fd >= fD_SETSIZE = error "buildFdSets: file descriptor out of range" | otherwise = do fdSet fd writefds buildFdSets (max maxfd fd) readfds writefds reqs +completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] + -> IO [IOReq] completeRequests [] _ _ reqs' = return reqs' completeRequests (Read fd m : reqs) readfds writefds reqs' = do b <- fdIsSet fd readfds @@ -1159,9 +1198,10 @@ completeRequests (Write fd m : reqs) readfds writefds reqs' = do then do putMVar m (); completeRequests reqs readfds writefds reqs' else completeRequests reqs readfds writefds (Write fd m : reqs') +wakeupAll :: [IOReq] -> IO () wakeupAll [] = return () -wakeupAll (Read fd m : reqs) = do putMVar m (); wakeupAll reqs -wakeupAll (Write fd m : reqs) = do putMVar m (); wakeupAll reqs +wakeupAll (Read _ m : reqs) = do putMVar m (); wakeupAll reqs +wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs waitForReadEvent :: Fd -> IO () waitForReadEvent fd = do @@ -1184,7 +1224,7 @@ waitForWriteEvent fd = do -- and return the smallest delay to wait for. The queue of pending -- delays is kept ordered. getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal) -getDelay now ptimeval [] = return ([],nullPtr) +getDelay _ _ [] = return ([],nullPtr) getDelay now ptimeval all@(d : rest) = case d of Delay time m | now >= time -> do @@ -1197,7 +1237,7 @@ getDelay now ptimeval all@(d : rest) setTimevalTicks ptimeval (delayTime d - now) return (all,ptimeval) -newtype CTimeVal = CTimeVal () +data CTimeVal foreign import ccall unsafe "sizeofTimeVal" sizeofTimeVal :: Int @@ -1216,7 +1256,7 @@ foreign import ccall unsafe "setTimevalTicks" -- ToDo: move to System.Posix.Internals? -newtype CFdSet = CFdSet () +data CFdSet foreign import ccall safe "select" c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal @@ -1228,12 +1268,6 @@ foreign import ccall unsafe "hsFD_SETSIZE" fD_SETSIZE :: Fd fD_SETSIZE = fromIntegral c_fD_SETSIZE -foreign import ccall unsafe "hsFD_CLR" - c_fdClr :: CInt -> Ptr CFdSet -> IO () - -fdClr :: Fd -> Ptr CFdSet -> IO () -fdClr (Fd fd) fdset = c_fdClr fd fdset - foreign import ccall unsafe "hsFD_ISSET" c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt