X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=GHC%2FConc.lhs;h=e3bfae2793f6d878e451d8c9d3da9aef598fe381;hb=af7a1e96efe4aa3f10cbd29e9989a7fc695d7ff9;hp=2abc28e71d40854b8d6acb33f87d5b9c01ae55e6;hpb=b6cbe4e36af5a486678e4622839dcef2aa487cc7;p=haskell-directory.git diff --git a/GHC/Conc.lhs b/GHC/Conc.lhs index 2abc28e..e3bfae2 100644 --- a/GHC/Conc.lhs +++ b/GHC/Conc.lhs @@ -53,16 +53,22 @@ module GHC.Conc #endif ) where +import System.Posix.Types +import System.Posix.Internals +import Foreign +import Foreign.C + import Data.Maybe import GHC.Base -import GHC.IOBase ( IO(..), MVar(..), ioException, IOException(..), IOErrorType(..) ) -import GHC.Num ( fromInteger, negate ) -import GHC.Real ( fromIntegral ) +import GHC.IOBase +import GHC.Num ( Num(..) ) +import GHC.Real ( fromIntegral, quot ) import GHC.Base ( Int(..) ) import GHC.Exception ( Exception(..), AsyncException(..) ) import GHC.Pack ( packCString# ) import GHC.Ptr ( Ptr(..), plusPtr, FunPtr(..) ) +import GHC.STRef infixr 0 `par`, `pseq` \end{code} @@ -266,36 +272,7 @@ addMVarFinalizer (MVar m) finalizer = %* * %************************************************************************ -@threadWaitRead@ delays rescheduling of a thread until input on the -specified file descriptor is available for reading (just like select). -@threadWaitWrite@ is similar, but for writing on a file descriptor. - \begin{code} --- | Suspends the current thread for a given number of microseconds --- (GHC only). --- --- Note that the resolution used by the Haskell runtime system's --- internal timer is 1\/50 second, and 'threadDelay' will round its --- argument up to the nearest multiple of this resolution. --- --- There is no guarantee that the thread will be rescheduled promptly --- when the delay has expired, but the thread will never continue to --- run /earlier/ than specified. --- -threadDelay :: Int -> IO () - --- | Block the current thread until data is available to read on the --- given file descriptor (GHC only). -threadWaitRead :: Int -> IO () - --- | Block the current thread until data can be written to the --- given file descriptor (GHC only). -threadWaitWrite :: Int -> IO () - -threadDelay (I# ms) = IO $ \s -> case delay# ms s of s -> (# s, () #) -threadWaitRead (I# fd) = IO $ \s -> case waitRead# fd s of s -> (# s, () #) -threadWaitWrite (I# fd) = IO $ \s -> case waitWrite# fd s of s -> (# s, () #) - #ifdef mingw32_TARGET_OS -- Note: threadDelay, threadWaitRead and threadWaitWrite aren't really functional @@ -332,4 +309,318 @@ asyncWriteBA fd isSock len off bufB = asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off) #endif + +-- ----------------------------------------------------------------------------- +-- Thread IO API + +-- | Block the current thread until data is available to read on the +-- given file descriptor (GHC only). +threadWaitRead :: Fd -> IO () +threadWaitRead fd +#ifndef mingw32_TARGET_OS + | threaded = waitForReadEvent fd +#endif + | otherwise = IO $ \s -> + case fromIntegral fd of { I# fd# -> + case waitRead# fd# s of { s -> (# s, () #) + }} + +-- | Block the current thread until data can be written to the +-- given file descriptor (GHC only). +threadWaitWrite :: Fd -> IO () +threadWaitWrite fd +#ifndef mingw32_TARGET_OS + | threaded = waitForWriteEvent fd +#endif + | otherwise = IO $ \s -> + case fromIntegral fd of { I# fd# -> + case waitWrite# fd# s of { s -> (# s, () #) + }} + +-- | Suspends the current thread for a given number of microseconds +-- (GHC only). +-- +-- Note that the resolution used by the Haskell runtime system's +-- internal timer is 1\/50 second, and 'threadDelay' will round its +-- argument up to the nearest multiple of this resolution. +-- +-- There is no guarantee that the thread will be rescheduled promptly +-- when the delay has expired, but the thread will never continue to +-- run /earlier/ than specified. +-- +threadDelay :: Int -> IO () +threadDelay time +#ifndef mingw32_TARGET_OS + | threaded = waitForDelayEvent time +#else + | threaded = c_Sleep (fromIntegral (time `quot` 1000)) +#endif + | otherwise = IO $ \s -> + case fromIntegral time of { I# time# -> + case delay# time# s of { s -> (# s, () #) + }} + +-- On Windows, we just make a safe call to 'Sleep' to implement threadDelay. +#ifdef mingw32_TARGET_OS +foreign import ccall safe "Sleep" c_Sleep :: CInt -> IO () +#endif + +foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool + +-- ---------------------------------------------------------------------------- +-- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay + +-- In the threaded RTS, we employ a single IO Manager thread to wait +-- for all outstanding IO requests (threadWaitRead,threadWaitWrite) +-- and delays (threadDelay). +-- +-- We can do this because in the threaded RTS the IO Manager can make +-- a non-blocking call to select(), so we don't have to do select() in +-- the scheduler as we have to in the non-threaded RTS. We get performance +-- benefits from doing it this way, because we only have to restart the select() +-- when a new request arrives, rather than doing one select() each time +-- around the scheduler loop. Furthermore, the scheduler can be simplified +-- by not having to check for completed IO requests. + +-- Issues, possible problems: +-- +-- - we might want bound threads to just do the blocking +-- operation rather than communicating with the IO manager +-- thread. This would prevent simgle-threaded programs which do +-- IO from requiring multiple OS threads. However, it would also +-- prevent bound threads waiting on IO from being killed or sent +-- exceptions. +-- +-- - Apprently exec() doesn't work on Linux in a multithreaded program. +-- I couldn't repeat this. +-- +-- - How do we handle signal delivery in the multithreaded RTS? +-- +-- - forkProcess will kill the IO manager thread. Let's just +-- hope we don't need to do any blocking IO between fork & exec. + +#ifndef mingw32_TARGET_OS + +data IOReq + = Read {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ()) + | Write {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ()) + +data DelayReq + = Delay {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ()) + +pendingEvents :: IORef [IOReq] +pendingDelays :: IORef [DelayReq] + -- could use a strict list or array here +{-# NOINLINE pendingEvents #-} +{-# NOINLINE pendingDelays #-} +(pendingEvents,pendingDelays) = unsafePerformIO $ do + startIOServiceThread + reqs <- newIORef [] + dels <- newIORef [] + return (reqs, dels) + -- the first time we schedule an IO request, the service thread + -- will be created (cool, huh?) + +startIOServiceThread :: IO () +startIOServiceThread = do + allocaArray 2 $ \fds -> do + throwErrnoIfMinus1 "startIOServiceThread" (c_pipe fds) + rd_end <- peekElemOff fds 0 + wr_end <- peekElemOff fds 1 + writeIORef stick (fromIntegral wr_end) + quickForkIO $ do + allocaBytes sizeofFdSet $ \readfds -> do + allocaBytes sizeofFdSet $ \writefds -> do + allocaBytes sizeofTimeVal $ \timeval -> do + service_loop (fromIntegral rd_end) readfds writefds timeval [] [] + return () + +-- XXX: move real forkIO here from Control.Concurrent? +quickForkIO action = IO $ \s -> + case (fork# action s) of (# s1, id #) -> (# s1, ThreadId id #) + +service_loop + :: Fd -- listen to this for wakeup calls + -> Ptr CFdSet + -> Ptr CFdSet + -> Ptr CTimeVal + -> [IOReq] + -> [DelayReq] + -> IO () +service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do + + -- pick up new IO requests + new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a)) + let reqs = new_reqs ++ old_reqs + + -- pick up new delay requests + new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a)) + let delays = foldr insertDelay old_delays new_delays + + -- build the FDSets for select() + fdZero readfds + fdZero writefds + fdSet wakeup readfds + maxfd <- buildFdSets 0 readfds writefds reqs + + -- check the current time and wake up any thread in threadDelay whose + -- timeout has expired. Also find the timeout value for the select() call. + now <- getTicksOfDay + (delays', timeout) <- getDelay now ptimeval delays + + -- perform the select() + let do_select = do + res <- c_select ((max wakeup maxfd)+1) readfds writefds + nullPtr timeout + if (res == -1) + then do + err <- getErrno + if err == eINTR + then do_select + else return res + else + return res + res <- do_select + -- ToDo: check result + + old <- atomicModifyIORef prodding (\old -> (False,old)) + if old + then alloca $ \p -> do c_read (fromIntegral wakeup) p 1; return () + else return () + + reqs' <- completeRequests reqs readfds writefds [] + service_loop wakeup readfds writefds ptimeval reqs' delays' + +stick :: IORef Fd +{-# NOINLINE stick #-} +stick = unsafePerformIO (newIORef 0) + +prodding :: IORef Bool +{-# NOINLINE prodding #-} +prodding = unsafePerformIO (newIORef False) + +prodServiceThread :: IO () +prodServiceThread = do + b <- atomicModifyIORef prodding (\old -> (True,old)) -- compare & swap! + if (not b) + then do + fd <- readIORef stick + with 42 $ \pbuf -> do c_write (fromIntegral fd) pbuf 1; return () + else + return () + +-- ----------------------------------------------------------------------------- +-- IO requests + +buildFdSets maxfd readfds writefds [] = return maxfd +buildFdSets maxfd readfds writefds (Read fd m : reqs) = do + fdSet fd readfds + buildFdSets (max maxfd fd) readfds writefds reqs +buildFdSets maxfd readfds writefds (Write fd m : reqs) = do + fdSet fd writefds + buildFdSets (max maxfd fd) readfds writefds reqs + +completeRequests [] _ _ reqs' = return reqs' +completeRequests (Read fd m : reqs) readfds writefds reqs' = do + b <- fdIsSet fd readfds + if b /= 0 + then do putMVar m (); completeRequests reqs readfds writefds reqs' + else completeRequests reqs readfds writefds (Read fd m : reqs') +completeRequests (Write fd m : reqs) readfds writefds reqs' = do + b <- fdIsSet fd writefds + if b /= 0 + then do putMVar m (); completeRequests reqs readfds writefds reqs' + else completeRequests reqs readfds writefds (Write fd m : reqs') + +waitForReadEvent :: Fd -> IO () +waitForReadEvent fd = do + m <- newEmptyMVar + atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ())) + prodServiceThread + takeMVar m + +waitForWriteEvent :: Fd -> IO () +waitForWriteEvent fd = do + m <- newEmptyMVar + atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ())) + prodServiceThread + takeMVar m + +-- XXX: move into GHC.IOBase from Data.IORef? +atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b +atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s + +-- ----------------------------------------------------------------------------- +-- Delays + +waitForDelayEvent :: Int -> IO () +waitForDelayEvent usecs = do + m <- newEmptyMVar + now <- getTicksOfDay + let target = now + usecs `quot` tick_usecs + atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ())) + prodServiceThread + takeMVar m + +-- Walk the queue of pending delays, waking up any that have passed +-- and return the smallest delay to wait for. The queue of pending +-- delays is kept ordered. +getDelay :: Ticks -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal) +getDelay now ptimeval [] = return ([],nullPtr) +getDelay now ptimeval all@(Delay time m : rest) + | now >= time = do + putMVar m () + getDelay now ptimeval rest + | otherwise = do + setTimevalTicks ptimeval (time - now) + return (all,ptimeval) + +insertDelay :: DelayReq -> [DelayReq] -> [DelayReq] +insertDelay d@(Delay time m) [] = [d] +insertDelay d1@(Delay time m) ds@(d2@(Delay time' m') : rest) + | time <= time' = d1 : ds + | otherwise = d2 : insertDelay d1 rest + +type Ticks = Int +tick_freq = 50 :: Ticks -- accuracy of threadDelay (ticks per sec) +tick_usecs = 1000000 `quot` tick_freq :: Int + +newtype CTimeVal = CTimeVal () + +foreign import ccall unsafe "sizeofTimeVal" + sizeofTimeVal :: Int + +foreign import ccall unsafe "getTicksOfDay" + getTicksOfDay :: IO Ticks + +foreign import ccall unsafe "setTimevalTicks" + setTimevalTicks :: Ptr CTimeVal -> Ticks -> IO () + +-- ---------------------------------------------------------------------------- +-- select() interface + +-- ToDo: move to System.Posix.Internals? + +newtype CFdSet = CFdSet () + +foreign import ccall safe "select" + c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal + -> IO CInt + +foreign import ccall unsafe "hsFD_CLR" + fdClr :: Fd -> Ptr CFdSet -> IO () + +foreign import ccall unsafe "hsFD_ISSET" + fdIsSet :: Fd -> Ptr CFdSet -> IO CInt + +foreign import ccall unsafe "hsFD_SET" + fdSet :: Fd -> Ptr CFdSet -> IO () + +foreign import ccall unsafe "hsFD_ZERO" + fdZero :: Ptr CFdSet -> IO () + +foreign import ccall unsafe "sizeof_fd_set" + sizeofFdSet :: Int + +#endif \end{code}