[project @ 2003-02-21 05:34:12 by sof]
authorsof <unknown>
Fri, 21 Feb 2003 05:34:13 +0000 (05:34 +0000)
committersof <unknown>
Fri, 21 Feb 2003 05:34:13 +0000 (05:34 +0000)
Asynchronous / non-blocking I/O for Win32 platforms.

This commit introduces a Concurrent Haskell friendly view of I/O on
Win32 platforms. Through the use of a pool of worker Win32 threads, CH
threads may issue asynchronous I/O requests without blocking the
progress of other CH threads. The issuing CH thread is blocked until
the request has been serviced though.

GHC.Conc exports the primops that take care of issuing the
asynchronous I/O requests, which the IO implementation now takes
advantage of. By default, all Handles are non-blocking/asynchronous,
but should performance become an issue, having a per-Handle flag for
turning off non-blocking could easily be imagined&introduced.

[Incidentally, this thread pool-based implementation could easily be
extended to also allow Haskell code to delegate the execution of
arbitrary pieces of (potentially blocking) external code to another OS
thread. Given how relatively gnarly the locking story has turned out
to be with the 'threaded' RTS, that may not be such a bad idea.]

Data/Array/IO.hs
GHC/Conc.lhs
GHC/Handle.hs
GHC/IO.hs

index 6a608fd..b2a9ddc 100644 (file)
@@ -462,10 +462,8 @@ readChunk fd is_stream ptr init_off bytes = loop init_off bytes
   loop :: Int -> Int -> IO Int
   loop off bytes | bytes <= 0 = return (off - init_off)
   loop off bytes = do
-    r' <- throwErrnoIfMinus1RetryMayBlock "readChunk"
-           (read_off_ba (fromIntegral fd) is_stream ptr 
-               (fromIntegral off) (fromIntegral bytes))
-           (threadWaitRead fd)
+    r' <- readRawBuffer "readChunk" (fromIntegral fd) is_stream ptr
+                                   (fromIntegral off) (fromIntegral bytes)
     let r = fromIntegral r'
     if r == 0
        then return (off - init_off)
index 49689a3..cb57319 100644 (file)
@@ -14,6 +14,7 @@
 -- 
 -----------------------------------------------------------------------------
 
+#include "config.h"
 module GHC.Conc
        ( ThreadId(..)
 
@@ -44,7 +45,14 @@ module GHC.Conc
        , isEmptyMVar   -- :: MVar a -> IO Bool
        , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
 
-    ) where
+#ifdef mingw32_TARGET_OS
+       , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
+       , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
+
+       , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
+       , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
+#endif
+        ) where
 
 import Data.Maybe
 
@@ -54,6 +62,7 @@ import GHC.Num                ( fromInteger, negate )
 import GHC.Base                ( Int(..) )
 import GHC.Exception    ( Exception(..), AsyncException(..) )
 import GHC.Pack                ( packCString# )
+import GHC.Ptr          ( Ptr(..), plusPtr )
 
 infixr 0 `par`, `pseq`
 \end{code}
@@ -310,4 +319,34 @@ threadWaitWrite :: Int -> IO ()
 threadDelay     (I# ms) = IO $ \s -> case delay# ms s     of s -> (# s, () #)
 threadWaitRead  (I# fd) = IO $ \s -> case waitRead# fd s  of s -> (# s, () #)
 threadWaitWrite (I# fd) = IO $ \s -> case waitWrite# fd s of s -> (# s, () #)
+
+#ifdef mingw32_TARGET_OS
+
+-- Note: threadDelay, threadWaitRead and threadWaitWrite aren't really functional
+-- on Win32, but left in there because lib code (still) uses them (the manner
+-- in which they're used doesn't cause problems on a Win32 platform though.)
+
+asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
+asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) = 
+  IO $ \s -> case asyncRead# fd isSock len buf s  of 
+              (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
+
+asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
+asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) = 
+  IO $ \s -> case asyncWrite# fd isSock len buf s  of 
+              (# s, len#, err# #) -> (# s, (I# len#, I# err#) #)
+
+-- to aid the use of these primops by the IO Handle implementation,
+-- provide the following convenience funs:
+
+-- this better be a pinned byte array!
+asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
+asyncReadBA fd isSock len off bufB = 
+  asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
+  
+asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
+asyncWriteBA fd isSock len off bufB = 
+  asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
+
+#endif
 \end{code}
index 6760b1f..260074d 100644 (file)
@@ -23,8 +23,13 @@ module GHC.Handle (
   
   newEmptyBuffer, allocateBuffer, readCharFromBuffer, writeCharIntoBuffer,
   flushWriteBufferOnly, flushWriteBuffer, flushReadBuffer, fillReadBuffer,
-  read_off,  read_off_ba,
-  write_off, write_off_ba, unlockFile,
+  readRawBuffer, readRawBufferPtr,
+  writeRawBuffer, writeRawBufferPtr,
+  unlockFile,
+  
+  {- ought to be unnecessary, but just in case.. -}
+  write_off, write_rawBuffer,
+  read_off,  read_rawBuffer,
 
   ioe_closedHandle, ioe_EOF, ioe_notReadable, ioe_notWritable,
 
@@ -360,7 +365,15 @@ newEmptyBuffer b state size
 
 allocateBuffer :: Int -> BufferState -> IO Buffer
 allocateBuffer sz@(I# size) state = IO $ \s -> 
+#ifdef mingw32_TARGET_OS
+   -- To implement asynchronous I/O under Win32, we have to pass
+   -- buffer references to external threads that handles the
+   -- filling/emptying of their contents. Hence, the buffer cannot
+   -- be moved around by the GC.
+  case newPinnedByteArray# size s of { (# s, b #) ->
+#else
   case newByteArray# size s of { (# s, b #) ->
+#endif
   (# s, newEmptyBuffer b state sz #) }
 
 writeCharIntoBuffer :: RawBuffer -> Int -> Char -> IO Int
@@ -441,21 +454,13 @@ flushWriteBuffer fd is_stream buf@Buffer{ bufBuf=b, bufRPtr=r, bufWPtr=w }  = do
   if bytes == 0
      then return (buf{ bufRPtr=0, bufWPtr=0 })
      else do
-  res <- throwErrnoIfMinus1RetryMayBlock "flushWriteBuffer"
-               (write_off_ba (fromIntegral fd) is_stream b (fromIntegral r)
-                             (fromIntegral bytes))
-               (threadWaitWrite fd)
+  res <- writeRawBuffer "flushWriteBuffer" (fromIntegral fd) is_stream b 
+                       (fromIntegral r) (fromIntegral bytes)
   let res' = fromIntegral res
   if res' < bytes 
      then flushWriteBuffer fd is_stream (buf{ bufRPtr = r + res' })
      else return buf{ bufRPtr=0, bufWPtr=0 }
 
-foreign import ccall unsafe "__hscore_PrelHandle_write"
-   write_off_ba :: CInt -> Bool -> RawBuffer -> Int -> CInt -> IO CInt
-
-foreign import ccall unsafe "__hscore_PrelHandle_write"
-   write_off :: CInt -> Bool -> Ptr CChar -> Int -> CInt -> IO CInt
-
 fillReadBuffer :: FD -> Bool -> Bool -> Buffer -> IO Buffer
 fillReadBuffer fd is_line is_stream
       buf@Buffer{ bufBuf=b, bufRPtr=r, bufWPtr=w, bufSize=size } =
@@ -477,9 +482,8 @@ fillReadBufferLoop fd is_line is_stream buf b w size = do
 #ifdef DEBUG_DUMP
   puts ("fillReadBufferLoop: bytes = " ++ show bytes ++ "\n")
 #endif
-  res <- throwErrnoIfMinus1RetryMayBlock "fillReadBuffer"
-           (read_off_ba fd is_stream b (fromIntegral w) (fromIntegral bytes))
-           (threadWaitRead fd)
+  res <- readRawBuffer "fillReadBuffer" fd is_stream b
+                      (fromIntegral w) (fromIntegral bytes)
   let res' = fromIntegral res
 #ifdef DEBUG_DUMP
   puts ("fillReadBufferLoop:  res' = " ++ show res' ++ "\n")
@@ -492,12 +496,93 @@ fillReadBufferLoop fd is_line is_stream buf b w size = do
             then fillReadBufferLoop fd is_line is_stream buf b (w+res') size
             else return buf{ bufRPtr=0, bufWPtr=w+res' }
  
+
+-- Low level routines for reading/writing to (raw)buffers:
+
+#ifndef mingw32_TARGET_OS
+readRawBuffer :: String -> FD -> Bool -> RawBuffer -> Int -> CInt -> IO CInt
+readRawBuffer loc fd is_stream buf off len = 
+  throwErrnoIfMinus1RetryMayBlock loc
+           (read_rawBuffer fd is_stream buf off len)
+           (threadWaitRead fd)
+
+readRawBufferPtr :: String -> FD -> Bool -> Ptr CChar -> Int -> CInt -> IO CInt
+readRawBufferPtr loc fd is_stream buf off len = 
+  throwErrnoIfMinus1RetryMayBlock loc
+           (read_off fd is_stream buf off len)
+           (threadWaitRead fd)
+
+writeRawBuffer :: String -> FD -> Bool -> RawBuffer -> Int -> CInt -> IO CInt
+writeRawBuffer loc fd is_stream buf off len = 
+  throwErrnoIfMinus1RetryMayBlock loc
+               (write_rawBuffer (fromIntegral fd) is_stream buf off len)
+               (threadWaitWrite fd)
+
+writeRawBufferPtr :: String -> FD -> Bool -> Ptr CChar -> Int -> CInt -> IO CInt
+writeRawBufferPtr loc fd is_stream buf off len = 
+  throwErrnoIfMinus1RetryMayBlock loc
+               (write_off (fromIntegral fd) is_stream buf off len)
+               (threadWaitWrite fd)
+
+foreign import ccall unsafe "__hscore_PrelHandle_read"
+   read_rawBuffer :: FD -> Bool -> RawBuffer -> Int -> CInt -> IO CInt
+
+foreign import ccall unsafe "__hscore_PrelHandle_read"
+   read_off :: FD -> Bool -> Ptr CChar -> Int -> CInt -> IO CInt
+
+foreign import ccall unsafe "__hscore_PrelHandle_write"
+   write_rawBuffer :: CInt -> Bool -> RawBuffer -> Int -> CInt -> IO CInt
+
+foreign import ccall unsafe "__hscore_PrelHandle_write"
+   write_off :: CInt -> Bool -> Ptr CChar -> Int -> CInt -> IO CInt
+
+#else
+readRawBuffer :: String -> FD -> Bool -> RawBuffer -> Int -> CInt -> IO CInt
+readRawBuffer loc fd is_stream buf off len = do
+  (l, rc) <- asyncReadBA fd (if is_stream then 1 else 0) (fromIntegral len) off buf
+  if l == (-1)
+   then 
+    ioError (errnoToIOError loc (Errno (fromIntegral rc)) Nothing Nothing)
+    else return (fromIntegral l)
+
+readRawBufferPtr :: String -> FD -> Bool -> Ptr CChar -> Int -> CInt -> IO CInt
+readRawBufferPtr loc fd is_stream buf off len = do
+  (l, rc) <- asyncRead fd (if is_stream then 1 else 0) (fromIntegral len) (buf `plusPtr` off)
+  if l == (-1)
+   then 
+    ioError (errnoToIOError loc (Errno (fromIntegral rc)) Nothing Nothing)
+    else return (fromIntegral l)
+
+writeRawBuffer :: String -> FD -> Bool -> RawBuffer -> Int -> CInt -> IO CInt
+writeRawBuffer loc fd is_stream buf off len = do
+  (l, rc) <- asyncWriteBA fd (if is_stream then 1 else 0) (fromIntegral len) off buf
+  if l == (-1)
+   then 
+    ioError (errnoToIOError loc (Errno (fromIntegral rc)) Nothing Nothing)
+    else return (fromIntegral l)
+
+writeRawBufferPtr :: String -> FD -> Bool -> Ptr CChar -> Int -> CInt -> IO CInt
+writeRawBufferPtr loc fd is_stream buf off len = do
+  (l, rc) <- asyncWrite fd (if is_stream then 1 else 0) (fromIntegral len) (buf `plusPtr` off)
+  if l == (-1)
+   then 
+    ioError (errnoToIOError loc (Errno (fromIntegral rc)) Nothing Nothing)
+    else return (fromIntegral l)
+
 foreign import ccall unsafe "__hscore_PrelHandle_read"
-   read_off_ba :: FD -> Bool -> RawBuffer -> Int -> CInt -> IO CInt
+   read_rawBuffer :: FD -> Bool -> RawBuffer -> Int -> CInt -> IO CInt
 
 foreign import ccall unsafe "__hscore_PrelHandle_read"
    read_off :: FD -> Bool -> Ptr CChar -> Int -> CInt -> IO CInt
 
+foreign import ccall unsafe "__hscore_PrelHandle_write"
+   write_rawBuffer :: CInt -> Bool -> RawBuffer -> Int -> CInt -> IO CInt
+
+foreign import ccall unsafe "__hscore_PrelHandle_write"
+   write_off :: CInt -> Bool -> Ptr CChar -> Int -> CInt -> IO CInt
+
+#endif
+
 -- ---------------------------------------------------------------------------
 -- Standard Handles
 
@@ -1294,7 +1379,7 @@ hDuplicateTo h1 _ =
 
 #ifdef DEBUG_DUMP
 puts :: String -> IO ()
-puts s = withCString s $ \cstr -> do write_off_ba 1 False cstr 0 (fromIntegral (length s))
+puts s = withCString s $ \cstr -> do write_rawBuffer 1 False cstr 0 (fromIntegral (length s))
                                     return ()
 #endif
 
index ab5b319..a192a67 100644 (file)
--- a/GHC/IO.hs
+++ b/GHC/IO.hs
@@ -103,9 +103,7 @@ hGetChar handle =
     NoBuffering -> do
        -- make use of the minimal buffer we already have
        let raw = bufBuf buf
-       r <- throwErrnoIfMinus1RetryMayBlock "hGetChar"
-               (read_off_ba (fromIntegral fd) (haIsStream handle_) raw 0 1)
-               (threadWaitRead fd)
+       r <- readRawBuffer "hGetChar" (fromIntegral fd) (haIsStream handle_) raw 0 1
        if r == 0
           then ioe_EOF
           else do (c,_) <- readCharFromBuffer raw 0
@@ -288,9 +286,7 @@ lazyRead' h handle_ = do
      NoBuffering      -> do
        -- make use of the minimal buffer we already have
        let raw = bufBuf buf
-       r <- throwErrnoIfMinus1RetryMayBlock "lazyRead"
-               (read_off_ba (fromIntegral fd) (haIsStream handle_) raw 0 1)
-               (threadWaitRead fd)
+       r <- readRawBuffer "lazyRead" (fromIntegral fd) (haIsStream handle_) raw 0 1
        if r == 0
           then do handle_ <- hClose_help handle_ 
                   return (handle_, "")
@@ -346,11 +342,9 @@ hPutChar handle c =
        LineBuffering    -> hPutcBuffered handle_ True  c
        BlockBuffering _ -> hPutcBuffered handle_ False c
        NoBuffering      ->
-               withObject (castCharToCChar c) $ \buf ->
-               throwErrnoIfMinus1RetryMayBlock_ "hPutChar"
-                  (write_off (fromIntegral fd) (haIsStream handle_) buf 0 1)
-                  (threadWaitWrite fd)
-
+               withObject (castCharToCChar c) $ \buf -> do
+                 writeRawBufferPtr "hPutChar" (fromIntegral fd) (haIsStream handle_) buf 0 1
+                 return ()
 
 hPutcBuffered handle_ is_line c = do
   let ref = haBuffer handle_
@@ -633,18 +627,17 @@ hPutBuf handle ptr count
            else do flushed_buf <- flushWriteBuffer fd is_stream old_buf
                    writeIORef ref flushed_buf
                    -- ToDo: should just memcpy instead of writing if possible
-                   writeChunk fd ptr count
+                   writeChunk fd is_stream (castPtr ptr) count
 
-writeChunk :: FD -> Ptr a -> Int -> IO ()
-writeChunk fd ptr bytes = loop 0 bytes 
+writeChunk :: FD -> Bool -> Ptr CChar -> Int -> IO ()
+writeChunk fd is_stream ptr bytes = loop 0 bytes 
  where
   loop :: Int -> Int -> IO ()
   loop _   bytes | bytes <= 0 = return ()
   loop off bytes = do
     r <- fromIntegral `liftM`
-          throwErrnoIfMinus1RetryMayBlock "writeChunk"
-           (c_write (fromIntegral fd) (ptr `plusPtr` off) (fromIntegral bytes))
-           (threadWaitWrite fd)
+          writeRawBufferPtr "writeChunk" (fromIntegral fd) is_stream ptr
+                            off (fromIntegral bytes)
     -- write can't return 0
     loop (off + r) (bytes - r)
 
@@ -657,10 +650,10 @@ hGetBuf handle ptr count
   | count <  0 = illegalBufferSize handle "hGetBuf" count
   | otherwise = 
       wantReadableHandle "hGetBuf" handle $ 
-       \ handle_@Handle__{ haFD=fd, haBuffer=ref } -> do
+       \ handle_@Handle__{ haFD=fd, haBuffer=ref, haIsStream=is_stream } -> do
        buf@Buffer{ bufBuf=raw, bufWPtr=w, bufRPtr=r } <- readIORef ref
        if bufferEmpty buf
-          then readChunk fd ptr count
+          then readChunk fd is_stream ptr count
           else do 
                let avail = w - r
                copied <- if (count >= avail)
@@ -675,20 +668,19 @@ hGetBuf handle ptr count
 
                let remaining = count - copied
                if remaining > 0 
-                  then do rest <- readChunk fd (ptr `plusPtr` copied) remaining
+                  then do rest <- readChunk fd is_stream (ptr `plusPtr` copied) remaining
                           return (rest + copied)
                   else return count
                
-readChunk :: FD -> Ptr a -> Int -> IO Int
-readChunk fd ptr bytes = loop 0 bytes 
+readChunk :: FD -> Bool -> Ptr a -> Int -> IO Int
+readChunk fd is_stream ptr bytes = loop 0 bytes 
  where
   loop :: Int -> Int -> IO Int
   loop off bytes | bytes <= 0 = return off
   loop off bytes = do
     r <- fromIntegral `liftM`
-          throwErrnoIfMinus1RetryMayBlock "readChunk"
-           (c_read (fromIntegral fd) (ptr `plusPtr` off) (fromIntegral bytes))
-           (threadWaitRead fd)
+           readRawBufferPtr "readChunk" (fromIntegral fd) is_stream 
+                           (castPtr ptr) off (fromIntegral bytes)
     if r == 0
        then return off
        else loop (off + r) (bytes - r)