[project @ 2004-10-14 14:58:50 by simonmar]
authorsimonmar <unknown>
Thu, 14 Oct 2004 14:58:51 +0000 (14:58 +0000)
committersimonmar <unknown>
Thu, 14 Oct 2004 14:58:51 +0000 (14:58 +0000)
Threaded RTS improvements:

 - Unix only: implement waitRead#, waitWrite# and delay# in Haskell,
   by having a single Haskell thread (the IO manager) performing a blocking
   select() operation.  Threads communicate with the IO manager
   via channels.  This is faster than doing the select() in the RTS,
   because we only restart the select() when a new request arrives,
   rather than each time around the scheduler.

   On Windows we just make blocking IO calls, we don't have a fancy IO
   manager (yet).

 - Simplify the scheduler for the threaded RTS, now that we don't have
   to wait for IO in the scheduler loop.

 - Remove detectBlackHoles(), which isn't used now (not sure how long
   this has been unused for... perhaps it was needed back when main threads
   used to be GC roots, so we had to check for blackholes manually rather
   than relying on the GC.)

Signals aren't quite right in the threaded RTS.  In fact, they're
slightly worse than before, because the thread receiving signals might
be blocked in a C call - previously there always be another thread
stuck in awaitEvent() that would notice the signal, but that's not
true now.  I can't see an easy fix yet.

GHC/Conc.lhs
cbits/selectUtils.c [new file with mode: 0644]
include/HsBase.h

index 2abc28e..e3bfae2 100644 (file)
@@ -53,16 +53,22 @@ module GHC.Conc
 #endif
         ) where
 
+import System.Posix.Types
+import System.Posix.Internals
+import Foreign
+import Foreign.C
+
 import Data.Maybe
 
 import GHC.Base
-import GHC.IOBase      ( IO(..), MVar(..), ioException, IOException(..), IOErrorType(..) )
-import GHC.Num         ( fromInteger, negate )
-import GHC.Real                ( fromIntegral )
+import GHC.IOBase
+import GHC.Num         ( Num(..) )
+import GHC.Real                ( fromIntegral, quot )
 import GHC.Base                ( Int(..) )
 import GHC.Exception    ( Exception(..), AsyncException(..) )
 import GHC.Pack                ( packCString# )
 import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
+import GHC.STRef
 
 infixr 0 `par`, `pseq`
 \end{code}
@@ -266,36 +272,7 @@ addMVarFinalizer (MVar m) finalizer =
 %*                                                                     *
 %************************************************************************
 
-@threadWaitRead@ delays rescheduling of a thread until input on the
-specified file descriptor is available for reading (just like select).
-@threadWaitWrite@ is similar, but for writing on a file descriptor.
-
 \begin{code}
--- | Suspends the current thread for a given number of microseconds
--- (GHC only).
---
--- Note that the resolution used by the Haskell runtime system's
--- internal timer is 1\/50 second, and 'threadDelay' will round its
--- argument up to the nearest multiple of this resolution.
---
--- There is no guarantee that the thread will be rescheduled promptly
--- when the delay has expired, but the thread will never continue to
--- run /earlier/ than specified.
---
-threadDelay :: Int -> IO ()
-
--- | Block the current thread until data is available to read on the
--- given file descriptor (GHC only).
-threadWaitRead :: Int -> IO ()
-
--- | Block the current thread until data can be written to the
--- given file descriptor (GHC only).
-threadWaitWrite :: Int -> IO ()
-
-threadDelay     (I# ms) = IO $ \s -> case delay# ms s     of s -> (# s, () #)
-threadWaitRead  (I# fd) = IO $ \s -> case waitRead# fd s  of s -> (# s, () #)
-threadWaitWrite (I# fd) = IO $ \s -> case waitWrite# fd s of s -> (# s, () #)
-
 #ifdef mingw32_TARGET_OS
 
 -- Note: threadDelay, threadWaitRead and threadWaitWrite aren't really functional
@@ -332,4 +309,318 @@ asyncWriteBA fd isSock len off bufB =
   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
 
 #endif
+
+-- -----------------------------------------------------------------------------
+-- Thread IO API
+
+-- | Block the current thread until data is available to read on the
+-- given file descriptor (GHC only).
+threadWaitRead :: Fd -> IO ()
+threadWaitRead fd
+#ifndef mingw32_TARGET_OS
+  | threaded  = waitForReadEvent fd
+#endif
+  | otherwise = IO $ \s -> 
+       case fromIntegral fd of { I# fd# ->
+       case waitRead# fd# s of { s -> (# s, () #)
+       }}
+
+-- | Block the current thread until data can be written to the
+-- given file descriptor (GHC only).
+threadWaitWrite :: Fd -> IO ()
+threadWaitWrite fd
+#ifndef mingw32_TARGET_OS
+  | threaded  = waitForWriteEvent fd
+#endif
+  | otherwise = IO $ \s -> 
+       case fromIntegral fd of { I# fd# ->
+       case waitWrite# fd# s of { s -> (# s, () #)
+       }}
+
+-- | Suspends the current thread for a given number of microseconds
+-- (GHC only).
+--
+-- Note that the resolution used by the Haskell runtime system's
+-- internal timer is 1\/50 second, and 'threadDelay' will round its
+-- argument up to the nearest multiple of this resolution.
+--
+-- There is no guarantee that the thread will be rescheduled promptly
+-- when the delay has expired, but the thread will never continue to
+-- run /earlier/ than specified.
+--
+threadDelay :: Int -> IO ()
+threadDelay time
+#ifndef mingw32_TARGET_OS
+  | threaded  = waitForDelayEvent time
+#else
+  | threaded  = c_Sleep (fromIntegral (time `quot` 1000))
+#endif
+  | otherwise = IO $ \s -> 
+       case fromIntegral time of { I# time# ->
+       case delay# time# s of { s -> (# s, () #)
+       }}
+
+-- On Windows, we just make a safe call to 'Sleep' to implement threadDelay.
+#ifdef mingw32_TARGET_OS
+foreign import ccall safe "Sleep" c_Sleep :: CInt -> IO ()
+#endif
+
+foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
+
+-- ----------------------------------------------------------------------------
+-- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
+
+-- In the threaded RTS, we employ a single IO Manager thread to wait
+-- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
+-- and delays (threadDelay).  
+--
+-- We can do this because in the threaded RTS the IO Manager can make
+-- a non-blocking call to select(), so we don't have to do select() in
+-- the scheduler as we have to in the non-threaded RTS.  We get performance
+-- benefits from doing it this way, because we only have to restart the select()
+-- when a new request arrives, rather than doing one select() each time
+-- around the scheduler loop.  Furthermore, the scheduler can be simplified
+-- by not having to check for completed IO requests.
+
+-- Issues, possible problems:
+--
+--     - we might want bound threads to just do the blocking
+--       operation rather than communicating with the IO manager
+--       thread.  This would prevent simgle-threaded programs which do
+--       IO from requiring multiple OS threads.  However, it would also
+--       prevent bound threads waiting on IO from being killed or sent
+--       exceptions.
+--
+--     - Apprently exec() doesn't work on Linux in a multithreaded program.
+--       I couldn't repeat this.
+--
+--     - How do we handle signal delivery in the multithreaded RTS?
+--
+--     - forkProcess will kill the IO manager thread.  Let's just
+--       hope we don't need to do any blocking IO between fork & exec.
+
+#ifndef mingw32_TARGET_OS
+
+data IOReq
+  = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
+  | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
+
+data DelayReq
+  = Delay  {-# UNPACK #-} !Int {-# UNPACK #-} !(MVar ())
+
+pendingEvents :: IORef [IOReq]
+pendingDelays :: IORef [DelayReq]
+       -- could use a strict list or array here
+{-# NOINLINE pendingEvents #-}
+{-# NOINLINE pendingDelays #-}
+(pendingEvents,pendingDelays) = unsafePerformIO $ do
+  startIOServiceThread
+  reqs <- newIORef []
+  dels <- newIORef []
+  return (reqs, dels)
+       -- the first time we schedule an IO request, the service thread
+       -- will be created (cool, huh?)
+
+startIOServiceThread :: IO ()
+startIOServiceThread = do
+        allocaArray 2 $ \fds -> do
+       throwErrnoIfMinus1 "startIOServiceThread" (c_pipe fds)
+       rd_end <- peekElemOff fds 0
+       wr_end <- peekElemOff fds 1
+       writeIORef stick (fromIntegral wr_end)
+       quickForkIO $ do
+           allocaBytes sizeofFdSet   $ \readfds -> do
+           allocaBytes sizeofFdSet   $ \writefds -> do 
+           allocaBytes sizeofTimeVal $ \timeval -> do
+           service_loop (fromIntegral rd_end) readfds writefds timeval [] []
+       return ()
+
+-- XXX: move real forkIO here from Control.Concurrent?
+quickForkIO action = IO $ \s ->
+   case (fork# action s) of (# s1, id #) -> (# s1, ThreadId id #)
+
+service_loop
+   :: Fd               -- listen to this for wakeup calls
+   -> Ptr CFdSet
+   -> Ptr CFdSet
+   -> Ptr CTimeVal
+   -> [IOReq]
+   -> [DelayReq]
+   -> IO ()
+service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
+
+  -- pick up new IO requests
+  new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
+  let reqs = new_reqs ++ old_reqs
+
+  -- pick up new delay requests
+  new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
+  let  delays = foldr insertDelay old_delays new_delays
+
+  -- build the FDSets for select()
+  fdZero readfds
+  fdZero writefds
+  fdSet wakeup readfds
+  maxfd <- buildFdSets 0 readfds writefds reqs
+
+  -- check the current time and wake up any thread in threadDelay whose
+  -- timeout has expired.  Also find the timeout value for the select() call.
+  now <- getTicksOfDay
+  (delays', timeout) <- getDelay now ptimeval delays
+
+  -- perform the select()
+  let do_select = do
+         res <- c_select ((max wakeup maxfd)+1) readfds writefds 
+                       nullPtr timeout
+         if (res == -1)
+            then do
+               err <- getErrno
+               if err == eINTR
+                       then do_select
+                       else return res
+            else
+               return res
+  res <- do_select
+  -- ToDo: check result
+
+  old <- atomicModifyIORef prodding (\old -> (False,old))
+  if old 
+       then alloca $ \p -> do c_read (fromIntegral wakeup) p 1; return ()
+       else return ()
+
+  reqs' <- completeRequests reqs readfds writefds []
+  service_loop wakeup readfds writefds ptimeval reqs' delays'
+
+stick :: IORef Fd
+{-# NOINLINE stick #-}
+stick = unsafePerformIO (newIORef 0)
+
+prodding :: IORef Bool
+{-# NOINLINE prodding #-}
+prodding = unsafePerformIO (newIORef False)
+
+prodServiceThread :: IO ()
+prodServiceThread = do
+  b <- atomicModifyIORef prodding (\old -> (True,old)) -- compare & swap!
+  if (not b)
+       then do
+         fd <- readIORef stick
+         with 42 $ \pbuf -> do c_write (fromIntegral fd) pbuf 1; return ()
+       else
+         return ()
+
+-- -----------------------------------------------------------------------------
+-- IO requests
+
+buildFdSets maxfd readfds writefds [] = return maxfd
+buildFdSets maxfd readfds writefds (Read fd m : reqs) = do
+  fdSet fd readfds
+  buildFdSets (max maxfd fd) readfds writefds reqs
+buildFdSets maxfd readfds writefds (Write fd m : reqs) = do
+  fdSet fd writefds
+  buildFdSets (max maxfd fd) readfds writefds reqs
+
+completeRequests [] _ _ reqs' = return reqs'
+completeRequests (Read fd m : reqs) readfds writefds reqs' = do
+  b <- fdIsSet fd readfds
+  if b /= 0
+    then do putMVar m (); completeRequests reqs readfds writefds reqs'
+    else completeRequests reqs readfds writefds (Read fd m : reqs')
+completeRequests (Write fd m : reqs) readfds writefds reqs' = do
+  b <- fdIsSet fd writefds
+  if b /= 0
+    then do putMVar m (); completeRequests reqs readfds writefds reqs'
+    else completeRequests reqs readfds writefds (Write fd m : reqs')
+
+waitForReadEvent :: Fd -> IO ()
+waitForReadEvent fd = do
+  m <- newEmptyMVar
+  atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
+  prodServiceThread
+  takeMVar m
+
+waitForWriteEvent :: Fd -> IO ()
+waitForWriteEvent fd = do
+  m <- newEmptyMVar
+  atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
+  prodServiceThread
+  takeMVar m
+
+-- XXX: move into GHC.IOBase from Data.IORef?
+atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
+atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
+
+-- -----------------------------------------------------------------------------
+-- Delays
+
+waitForDelayEvent :: Int -> IO ()
+waitForDelayEvent usecs = do
+  m <- newEmptyMVar
+  now <- getTicksOfDay
+  let target = now + usecs `quot` tick_usecs
+  atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
+  prodServiceThread
+  takeMVar m
+
+-- Walk the queue of pending delays, waking up any that have passed
+-- and return the smallest delay to wait for.  The queue of pending
+-- delays is kept ordered.
+getDelay :: Ticks -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
+getDelay now ptimeval [] = return ([],nullPtr)
+getDelay now ptimeval all@(Delay time m : rest)
+  | now >= time = do
+       putMVar m ()
+       getDelay now ptimeval rest
+  | otherwise = do
+       setTimevalTicks ptimeval (time - now)
+       return (all,ptimeval)
+
+insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
+insertDelay d@(Delay time m) [] = [d]
+insertDelay d1@(Delay time m) ds@(d2@(Delay time' m') : rest)
+  | time <= time' = d1 : ds
+  | otherwise     = d2 : insertDelay d1 rest
+
+type Ticks = Int
+tick_freq  = 50 :: Ticks  -- accuracy of threadDelay (ticks per sec)
+tick_usecs = 1000000 `quot` tick_freq :: Int
+
+newtype CTimeVal = CTimeVal ()
+
+foreign import ccall unsafe "sizeofTimeVal"
+  sizeofTimeVal :: Int
+
+foreign import ccall unsafe "getTicksOfDay" 
+  getTicksOfDay :: IO Ticks
+
+foreign import ccall unsafe "setTimevalTicks" 
+  setTimevalTicks :: Ptr CTimeVal -> Ticks -> IO ()
+
+-- ----------------------------------------------------------------------------
+-- select() interface
+
+-- ToDo: move to System.Posix.Internals?
+
+newtype CFdSet = CFdSet ()
+
+foreign import ccall safe "select"
+  c_select :: Fd -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
+           -> IO CInt
+
+foreign import ccall unsafe "hsFD_CLR"
+  fdClr :: Fd -> Ptr CFdSet -> IO ()
+
+foreign import ccall unsafe "hsFD_ISSET"
+  fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
+
+foreign import ccall unsafe "hsFD_SET"
+  fdSet :: Fd -> Ptr CFdSet -> IO ()
+
+foreign import ccall unsafe "hsFD_ZERO"
+  fdZero :: Ptr CFdSet -> IO ()
+
+foreign import ccall unsafe "sizeof_fd_set"
+  sizeofFdSet :: Int
+
+#endif
 \end{code}
diff --git a/cbits/selectUtils.c b/cbits/selectUtils.c
new file mode 100644 (file)
index 0000000..44abb22
--- /dev/null
@@ -0,0 +1,3 @@
+
+#include "HsBase.h"
+void hsFD_ZERO(fd_set *fds) { FD_ZERO(fds); }
index c27140e..74f20f7 100644 (file)
 #include <shlobj.h>
 #endif
 
+#if HAVE_SYS_SELECT_H
+#include <sys/select.h>
+#endif
+
 /* in inputReady.c */
 int inputReady(int fd, int msecs, int isSock);
 
@@ -713,6 +717,40 @@ INLINE int __hscore_fstat(int fd, struct stat *buf) {
        return (fstat(fd,buf));
 }
 
+// select-related stuff
+
+#if !defined(mingw32_TARGET_OS)
+INLINE void hsFD_CLR(int fd, fd_set *fds) { FD_CLR(fd, fds); }
+INLINE int  hsFD_ISSET(int fd, fd_set *fds) { return FD_ISSET(fd, fds); }
+INLINE void hsFD_SET(int fd, fd_set *fds) { FD_SET(fd, fds); }
+INLINE int  sizeof_fd_set(void) { return sizeof(fd_set); }
+extern void hsFD_ZERO(fd_set *fds);
+#endif
+
+// gettimeofday()-related
+
+#if !defined(mingw32_TARGET_OS)
+#define TICK_FREQ  50
+
+INLINE HsInt sizeofTimeVal(void) { return sizeof(struct timeval); }
+
+INLINE HsInt getTicksOfDay(void)
+{  
+    struct timeval tv;
+    gettimeofday(&tv, (struct timezone *) NULL);
+    return (tv.tv_sec * TICK_FREQ +
+           tv.tv_usec * TICK_FREQ / 1000000);
+}
+
+INLINE void setTimevalTicks(struct timeval *p, HsInt ticks)
+{
+    p->tv_sec  = ticks / TICK_FREQ;
+    p->tv_usec = (ticks % TICK_FREQ) * (1000000 / TICK_FREQ);
+}
+#endif // !defined(mingw32_TARGET_OS)
+
+// Directory-related
+
 #if defined(mingw32_TARGET_OS)
 
 /* Make sure we've got the reqd CSIDL_ constants in scope;