From 967f7424d2713bbe35d2480d3f621f74305e539d Mon Sep 17 00:00:00 2001 From: sof Date: Fri, 21 Feb 2003 05:34:13 +0000 Subject: [PATCH] [project @ 2003-02-21 05:34:12 by sof] Asynchronous / non-blocking I/O for Win32 platforms. This commit introduces a Concurrent Haskell friendly view of I/O on Win32 platforms. Through the use of a pool of worker Win32 threads, CH threads may issue asynchronous I/O requests without blocking the progress of other CH threads. The issuing CH thread is blocked until the request has been serviced though. GHC.Conc exports the primops that take care of issuing the asynchronous I/O requests, which the IO implementation now takes advantage of. By default, all Handles are non-blocking/asynchronous, but should performance become an issue, having a per-Handle flag for turning off non-blocking could easily be imagined&introduced. [Incidentally, this thread pool-based implementation could easily be extended to also allow Haskell code to delegate the execution of arbitrary pieces of (potentially blocking) external code to another OS thread. Given how relatively gnarly the locking story has turned out to be with the 'threaded' RTS, that may not be such a bad idea.] --- Data/Array/IO.hs | 6 +-- GHC/Conc.lhs | 41 ++++++++++++++++++- GHC/Handle.hs | 119 ++++++++++++++++++++++++++++++++++++++++++++++-------- GHC/IO.hs | 42 ++++++++----------- 4 files changed, 161 insertions(+), 47 deletions(-) diff --git a/Data/Array/IO.hs b/Data/Array/IO.hs index 6a608fd..b2a9ddc 100644 --- a/Data/Array/IO.hs +++ b/Data/Array/IO.hs @@ -462,10 +462,8 @@ readChunk fd is_stream ptr init_off bytes = loop init_off bytes loop :: Int -> Int -> IO Int loop off bytes | bytes <= 0 = return (off - init_off) loop off bytes = do - r' <- throwErrnoIfMinus1RetryMayBlock "readChunk" - (read_off_ba (fromIntegral fd) is_stream ptr - (fromIntegral off) (fromIntegral bytes)) - (threadWaitRead fd) + r' <- readRawBuffer "readChunk" (fromIntegral fd) is_stream ptr + (fromIntegral off) (fromIntegral bytes) let r = fromIntegral r' if r == 0 then return (off - init_off) diff --git a/GHC/Conc.lhs b/GHC/Conc.lhs index 49689a3..cb57319 100644 --- a/GHC/Conc.lhs +++ b/GHC/Conc.lhs @@ -14,6 +14,7 @@ -- ----------------------------------------------------------------------------- +#include "config.h" module GHC.Conc ( ThreadId(..) @@ -44,7 +45,14 @@ module GHC.Conc , isEmptyMVar -- :: MVar a -> IO Bool , addMVarFinalizer -- :: MVar a -> IO () -> IO () - ) where +#ifdef mingw32_TARGET_OS + , asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) + , asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) + + , asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int) + , asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int) +#endif + ) where import Data.Maybe @@ -54,6 +62,7 @@ import GHC.Num ( fromInteger, negate ) import GHC.Base ( Int(..) ) import GHC.Exception ( Exception(..), AsyncException(..) ) import GHC.Pack ( packCString# ) +import GHC.Ptr ( Ptr(..), plusPtr ) infixr 0 `par`, `pseq` \end{code} @@ -310,4 +319,34 @@ threadWaitWrite :: Int -> IO () threadDelay (I# ms) = IO $ \s -> case delay# ms s of s -> (# s, () #) threadWaitRead (I# fd) = IO $ \s -> case waitRead# fd s of s -> (# s, () #) threadWaitWrite (I# fd) = IO $ \s -> case waitWrite# fd s of s -> (# s, () #) + +#ifdef mingw32_TARGET_OS + +-- Note: threadDelay, threadWaitRead and threadWaitWrite aren't really functional +-- on Win32, but left in there because lib code (still) uses them (the manner +-- in which they're used doesn't cause problems on a Win32 platform though.) + +asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) +asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) = + IO $ \s -> case asyncRead# fd isSock len buf s of + (# s, len#, err# #) -> (# s, (I# len#, I# err#) #) + +asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) +asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) = + IO $ \s -> case asyncWrite# fd isSock len buf s of + (# s, len#, err# #) -> (# s, (I# len#, I# err#) #) + +-- to aid the use of these primops by the IO Handle implementation, +-- provide the following convenience funs: + +-- this better be a pinned byte array! +asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int) +asyncReadBA fd isSock len off bufB = + asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off) + +asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int) +asyncWriteBA fd isSock len off bufB = + asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off) + +#endif \end{code} diff --git a/GHC/Handle.hs b/GHC/Handle.hs index 6760b1f..260074d 100644 --- a/GHC/Handle.hs +++ b/GHC/Handle.hs @@ -23,8 +23,13 @@ module GHC.Handle ( newEmptyBuffer, allocateBuffer, readCharFromBuffer, writeCharIntoBuffer, flushWriteBufferOnly, flushWriteBuffer, flushReadBuffer, fillReadBuffer, - read_off, read_off_ba, - write_off, write_off_ba, unlockFile, + readRawBuffer, readRawBufferPtr, + writeRawBuffer, writeRawBufferPtr, + unlockFile, + + {- ought to be unnecessary, but just in case.. -} + write_off, write_rawBuffer, + read_off, read_rawBuffer, ioe_closedHandle, ioe_EOF, ioe_notReadable, ioe_notWritable, @@ -360,7 +365,15 @@ newEmptyBuffer b state size allocateBuffer :: Int -> BufferState -> IO Buffer allocateBuffer sz@(I# size) state = IO $ \s -> +#ifdef mingw32_TARGET_OS + -- To implement asynchronous I/O under Win32, we have to pass + -- buffer references to external threads that handles the + -- filling/emptying of their contents. Hence, the buffer cannot + -- be moved around by the GC. + case newPinnedByteArray# size s of { (# s, b #) -> +#else case newByteArray# size s of { (# s, b #) -> +#endif (# s, newEmptyBuffer b state sz #) } writeCharIntoBuffer :: RawBuffer -> Int -> Char -> IO Int @@ -441,21 +454,13 @@ flushWriteBuffer fd is_stream buf@Buffer{ bufBuf=b, bufRPtr=r, bufWPtr=w } = do if bytes == 0 then return (buf{ bufRPtr=0, bufWPtr=0 }) else do - res <- throwErrnoIfMinus1RetryMayBlock "flushWriteBuffer" - (write_off_ba (fromIntegral fd) is_stream b (fromIntegral r) - (fromIntegral bytes)) - (threadWaitWrite fd) + res <- writeRawBuffer "flushWriteBuffer" (fromIntegral fd) is_stream b + (fromIntegral r) (fromIntegral bytes) let res' = fromIntegral res if res' < bytes then flushWriteBuffer fd is_stream (buf{ bufRPtr = r + res' }) else return buf{ bufRPtr=0, bufWPtr=0 } -foreign import ccall unsafe "__hscore_PrelHandle_write" - write_off_ba :: CInt -> Bool -> RawBuffer -> Int -> CInt -> IO CInt - -foreign import ccall unsafe "__hscore_PrelHandle_write" - write_off :: CInt -> Bool -> Ptr CChar -> Int -> CInt -> IO CInt - fillReadBuffer :: FD -> Bool -> Bool -> Buffer -> IO Buffer fillReadBuffer fd is_line is_stream buf@Buffer{ bufBuf=b, bufRPtr=r, bufWPtr=w, bufSize=size } = @@ -477,9 +482,8 @@ fillReadBufferLoop fd is_line is_stream buf b w size = do #ifdef DEBUG_DUMP puts ("fillReadBufferLoop: bytes = " ++ show bytes ++ "\n") #endif - res <- throwErrnoIfMinus1RetryMayBlock "fillReadBuffer" - (read_off_ba fd is_stream b (fromIntegral w) (fromIntegral bytes)) - (threadWaitRead fd) + res <- readRawBuffer "fillReadBuffer" fd is_stream b + (fromIntegral w) (fromIntegral bytes) let res' = fromIntegral res #ifdef DEBUG_DUMP puts ("fillReadBufferLoop: res' = " ++ show res' ++ "\n") @@ -492,12 +496,93 @@ fillReadBufferLoop fd is_line is_stream buf b w size = do then fillReadBufferLoop fd is_line is_stream buf b (w+res') size else return buf{ bufRPtr=0, bufWPtr=w+res' } + +-- Low level routines for reading/writing to (raw)buffers: + +#ifndef mingw32_TARGET_OS +readRawBuffer :: String -> FD -> Bool -> RawBuffer -> Int -> CInt -> IO CInt +readRawBuffer loc fd is_stream buf off len = + throwErrnoIfMinus1RetryMayBlock loc + (read_rawBuffer fd is_stream buf off len) + (threadWaitRead fd) + +readRawBufferPtr :: String -> FD -> Bool -> Ptr CChar -> Int -> CInt -> IO CInt +readRawBufferPtr loc fd is_stream buf off len = + throwErrnoIfMinus1RetryMayBlock loc + (read_off fd is_stream buf off len) + (threadWaitRead fd) + +writeRawBuffer :: String -> FD -> Bool -> RawBuffer -> Int -> CInt -> IO CInt +writeRawBuffer loc fd is_stream buf off len = + throwErrnoIfMinus1RetryMayBlock loc + (write_rawBuffer (fromIntegral fd) is_stream buf off len) + (threadWaitWrite fd) + +writeRawBufferPtr :: String -> FD -> Bool -> Ptr CChar -> Int -> CInt -> IO CInt +writeRawBufferPtr loc fd is_stream buf off len = + throwErrnoIfMinus1RetryMayBlock loc + (write_off (fromIntegral fd) is_stream buf off len) + (threadWaitWrite fd) + +foreign import ccall unsafe "__hscore_PrelHandle_read" + read_rawBuffer :: FD -> Bool -> RawBuffer -> Int -> CInt -> IO CInt + +foreign import ccall unsafe "__hscore_PrelHandle_read" + read_off :: FD -> Bool -> Ptr CChar -> Int -> CInt -> IO CInt + +foreign import ccall unsafe "__hscore_PrelHandle_write" + write_rawBuffer :: CInt -> Bool -> RawBuffer -> Int -> CInt -> IO CInt + +foreign import ccall unsafe "__hscore_PrelHandle_write" + write_off :: CInt -> Bool -> Ptr CChar -> Int -> CInt -> IO CInt + +#else +readRawBuffer :: String -> FD -> Bool -> RawBuffer -> Int -> CInt -> IO CInt +readRawBuffer loc fd is_stream buf off len = do + (l, rc) <- asyncReadBA fd (if is_stream then 1 else 0) (fromIntegral len) off buf + if l == (-1) + then + ioError (errnoToIOError loc (Errno (fromIntegral rc)) Nothing Nothing) + else return (fromIntegral l) + +readRawBufferPtr :: String -> FD -> Bool -> Ptr CChar -> Int -> CInt -> IO CInt +readRawBufferPtr loc fd is_stream buf off len = do + (l, rc) <- asyncRead fd (if is_stream then 1 else 0) (fromIntegral len) (buf `plusPtr` off) + if l == (-1) + then + ioError (errnoToIOError loc (Errno (fromIntegral rc)) Nothing Nothing) + else return (fromIntegral l) + +writeRawBuffer :: String -> FD -> Bool -> RawBuffer -> Int -> CInt -> IO CInt +writeRawBuffer loc fd is_stream buf off len = do + (l, rc) <- asyncWriteBA fd (if is_stream then 1 else 0) (fromIntegral len) off buf + if l == (-1) + then + ioError (errnoToIOError loc (Errno (fromIntegral rc)) Nothing Nothing) + else return (fromIntegral l) + +writeRawBufferPtr :: String -> FD -> Bool -> Ptr CChar -> Int -> CInt -> IO CInt +writeRawBufferPtr loc fd is_stream buf off len = do + (l, rc) <- asyncWrite fd (if is_stream then 1 else 0) (fromIntegral len) (buf `plusPtr` off) + if l == (-1) + then + ioError (errnoToIOError loc (Errno (fromIntegral rc)) Nothing Nothing) + else return (fromIntegral l) + foreign import ccall unsafe "__hscore_PrelHandle_read" - read_off_ba :: FD -> Bool -> RawBuffer -> Int -> CInt -> IO CInt + read_rawBuffer :: FD -> Bool -> RawBuffer -> Int -> CInt -> IO CInt foreign import ccall unsafe "__hscore_PrelHandle_read" read_off :: FD -> Bool -> Ptr CChar -> Int -> CInt -> IO CInt +foreign import ccall unsafe "__hscore_PrelHandle_write" + write_rawBuffer :: CInt -> Bool -> RawBuffer -> Int -> CInt -> IO CInt + +foreign import ccall unsafe "__hscore_PrelHandle_write" + write_off :: CInt -> Bool -> Ptr CChar -> Int -> CInt -> IO CInt + +#endif + -- --------------------------------------------------------------------------- -- Standard Handles @@ -1294,7 +1379,7 @@ hDuplicateTo h1 _ = #ifdef DEBUG_DUMP puts :: String -> IO () -puts s = withCString s $ \cstr -> do write_off_ba 1 False cstr 0 (fromIntegral (length s)) +puts s = withCString s $ \cstr -> do write_rawBuffer 1 False cstr 0 (fromIntegral (length s)) return () #endif diff --git a/GHC/IO.hs b/GHC/IO.hs index ab5b319..a192a67 100644 --- a/GHC/IO.hs +++ b/GHC/IO.hs @@ -103,9 +103,7 @@ hGetChar handle = NoBuffering -> do -- make use of the minimal buffer we already have let raw = bufBuf buf - r <- throwErrnoIfMinus1RetryMayBlock "hGetChar" - (read_off_ba (fromIntegral fd) (haIsStream handle_) raw 0 1) - (threadWaitRead fd) + r <- readRawBuffer "hGetChar" (fromIntegral fd) (haIsStream handle_) raw 0 1 if r == 0 then ioe_EOF else do (c,_) <- readCharFromBuffer raw 0 @@ -288,9 +286,7 @@ lazyRead' h handle_ = do NoBuffering -> do -- make use of the minimal buffer we already have let raw = bufBuf buf - r <- throwErrnoIfMinus1RetryMayBlock "lazyRead" - (read_off_ba (fromIntegral fd) (haIsStream handle_) raw 0 1) - (threadWaitRead fd) + r <- readRawBuffer "lazyRead" (fromIntegral fd) (haIsStream handle_) raw 0 1 if r == 0 then do handle_ <- hClose_help handle_ return (handle_, "") @@ -346,11 +342,9 @@ hPutChar handle c = LineBuffering -> hPutcBuffered handle_ True c BlockBuffering _ -> hPutcBuffered handle_ False c NoBuffering -> - withObject (castCharToCChar c) $ \buf -> - throwErrnoIfMinus1RetryMayBlock_ "hPutChar" - (write_off (fromIntegral fd) (haIsStream handle_) buf 0 1) - (threadWaitWrite fd) - + withObject (castCharToCChar c) $ \buf -> do + writeRawBufferPtr "hPutChar" (fromIntegral fd) (haIsStream handle_) buf 0 1 + return () hPutcBuffered handle_ is_line c = do let ref = haBuffer handle_ @@ -633,18 +627,17 @@ hPutBuf handle ptr count else do flushed_buf <- flushWriteBuffer fd is_stream old_buf writeIORef ref flushed_buf -- ToDo: should just memcpy instead of writing if possible - writeChunk fd ptr count + writeChunk fd is_stream (castPtr ptr) count -writeChunk :: FD -> Ptr a -> Int -> IO () -writeChunk fd ptr bytes = loop 0 bytes +writeChunk :: FD -> Bool -> Ptr CChar -> Int -> IO () +writeChunk fd is_stream ptr bytes = loop 0 bytes where loop :: Int -> Int -> IO () loop _ bytes | bytes <= 0 = return () loop off bytes = do r <- fromIntegral `liftM` - throwErrnoIfMinus1RetryMayBlock "writeChunk" - (c_write (fromIntegral fd) (ptr `plusPtr` off) (fromIntegral bytes)) - (threadWaitWrite fd) + writeRawBufferPtr "writeChunk" (fromIntegral fd) is_stream ptr + off (fromIntegral bytes) -- write can't return 0 loop (off + r) (bytes - r) @@ -657,10 +650,10 @@ hGetBuf handle ptr count | count < 0 = illegalBufferSize handle "hGetBuf" count | otherwise = wantReadableHandle "hGetBuf" handle $ - \ handle_@Handle__{ haFD=fd, haBuffer=ref } -> do + \ handle_@Handle__{ haFD=fd, haBuffer=ref, haIsStream=is_stream } -> do buf@Buffer{ bufBuf=raw, bufWPtr=w, bufRPtr=r } <- readIORef ref if bufferEmpty buf - then readChunk fd ptr count + then readChunk fd is_stream ptr count else do let avail = w - r copied <- if (count >= avail) @@ -675,20 +668,19 @@ hGetBuf handle ptr count let remaining = count - copied if remaining > 0 - then do rest <- readChunk fd (ptr `plusPtr` copied) remaining + then do rest <- readChunk fd is_stream (ptr `plusPtr` copied) remaining return (rest + copied) else return count -readChunk :: FD -> Ptr a -> Int -> IO Int -readChunk fd ptr bytes = loop 0 bytes +readChunk :: FD -> Bool -> Ptr a -> Int -> IO Int +readChunk fd is_stream ptr bytes = loop 0 bytes where loop :: Int -> Int -> IO Int loop off bytes | bytes <= 0 = return off loop off bytes = do r <- fromIntegral `liftM` - throwErrnoIfMinus1RetryMayBlock "readChunk" - (c_read (fromIntegral fd) (ptr `plusPtr` off) (fromIntegral bytes)) - (threadWaitRead fd) + readRawBufferPtr "readChunk" (fromIntegral fd) is_stream + (castPtr ptr) off (fromIntegral bytes) if r == 0 then return off else loop (off + r) (bytes - r) -- 1.7.10.4