From 6986b2b2439ce264df153878374f70cee54ef100 Mon Sep 17 00:00:00 2001 From: simonmar Date: Thu, 16 Sep 1999 13:14:43 +0000 Subject: [PATCH] [project @ 1999-09-16 13:14:38 by simonmar] Cleanup of non-blocking I/O - file descriptors are now always set to non-blocking mode. - we don't do an inputReady operation on descriptors before attempting to read from them any more. - the non-blocking flag on Handles has gone. - the {set,clear}[Conn]NonBlockingFlag() functions have gone. - the socket operations have been made to work properly with threads: accept is now non-blocking (it does a threadWaitRead instead of blocking), and the file descriptors returned by accept are set to non-blocking mode. Win32 will need some adjustments, no doubt. --- ghc/lib/misc/SocketPrim.lhs | 23 ++++++++++--- ghc/lib/misc/cbits/acceptSocket.c | 68 +++++++++++++++++++++---------------- ghc/lib/misc/cbits/createSocket.c | 6 ++++ ghc/lib/std/PrelHandle.lhs | 28 +++------------ ghc/lib/std/cbits/fileObject.c | 31 +---------------- ghc/lib/std/cbits/fileObject.h | 1 - ghc/lib/std/cbits/filePutc.c | 26 +++++++------- ghc/lib/std/cbits/openFile.c | 18 ++++++---- ghc/lib/std/cbits/readFile.c | 11 +----- ghc/lib/std/cbits/writeFile.c | 20 +++++------ 10 files changed, 105 insertions(+), 127 deletions(-) diff --git a/ghc/lib/misc/SocketPrim.lhs b/ghc/lib/misc/SocketPrim.lhs index 809cd85..1eb4111 100644 --- a/ghc/lib/misc/SocketPrim.lhs +++ b/ghc/lib/misc/SocketPrim.lhs @@ -87,6 +87,7 @@ import Ix import Weak ( addForeignFinalizer ) import PrelIOBase -- IOError, Handle representation import PrelHandle +import PrelConc ( threadWaitRead ) import Foreign import Addr ( nullAddr ) @@ -373,14 +374,26 @@ accept sock@(MkSocket s family stype protocol status) = do (ptr, sz) <- allocSockAddr family int_star <- stToIO (newIntArray ((0::Int),1)) stToIO (writeIntArray int_star 0 sz) + new_sock <- accept_socket s ptr int_star + a_sz <- stToIO (readIntArray int_star 0) + addr <- unpackSockAddr ptr a_sz + new_status <- newIORef Connected + return ((MkSocket new_sock family stype protocol new_status), addr) + +accept_socket :: Int + -> MutableByteArray RealWorld Int + -> MutableByteArray RealWorld Int + -> IO Int + +accept_socket s ptr int_star = do new_sock <- _ccall_ acceptSocket s ptr int_star case (new_sock::Int) of -1 -> constructErrorAndFail "accept" - _ -> do - a_sz <- stToIO (readIntArray int_star 0) - addr <- unpackSockAddr ptr a_sz - new_status <- newIORef Connected - return ((MkSocket new_sock family stype protocol new_status), addr) + + -- wait if there are no pending connections + -5 -> threadWaitRead s >> accept_socket s ptr int_star + + _ -> return new_sock \end{code} %************************************************************************ diff --git a/ghc/lib/misc/cbits/acceptSocket.c b/ghc/lib/misc/cbits/acceptSocket.c index efd13b9..3995795 100644 --- a/ghc/lib/misc/cbits/acceptSocket.c +++ b/ghc/lib/misc/cbits/acceptSocket.c @@ -16,37 +16,47 @@ StgInt acceptSocket(I_ sockfd, A_ peer, A_ addrlen) { StgInt fd; - + long flags; + while ((fd = accept((int)sockfd, (struct sockaddr *)peer, (int *)addrlen)) < 0) { - if (errno != EINTR) { - cvtErrno(); - switch (ghc_errno) { - default: - stdErrno(); - break; - case GHC_EBADF: - ghc_errtype = ERR_INVALIDARGUMENT; - ghc_errstr = "Not a valid descriptor"; - break; - case GHC_EFAULT: - ghc_errtype = ERR_INVALIDARGUMENT; - ghc_errstr = "Address not in writeable part of user address space"; - break; - case GHC_ENOTSOCK: - ghc_errtype = ERR_INVALIDARGUMENT; - ghc_errstr = "Descriptor not a socket"; - break; - case GHC_EOPNOTSUPP: - ghc_errtype = ERR_INVALIDARGUMENT; - ghc_errstr = "Socket not of type that supports listen"; - break; - case GHC_EWOULDBLOCK: - ghc_errtype = ERR_OTHERERROR; - ghc_errstr = "No sockets are present to be accepted"; - break; - } - return -1; + if (errno == EAGAIN) { + errno = 0; + return FILEOBJ_BLOCKED_READ; + + } else if (errno != EINTR) { + cvtErrno(); + switch (ghc_errno) { + default: + stdErrno(); + break; + case GHC_EBADF: + ghc_errtype = ERR_INVALIDARGUMENT; + ghc_errstr = "Not a valid descriptor"; + break; + case GHC_EFAULT: + ghc_errtype = ERR_INVALIDARGUMENT; + ghc_errstr = "Address not in writeable part of user address space"; + break; + case GHC_ENOTSOCK: + ghc_errtype = ERR_INVALIDARGUMENT; + ghc_errstr = "Descriptor not a socket"; + break; + case GHC_EOPNOTSUPP: + ghc_errtype = ERR_INVALIDARGUMENT; + ghc_errstr = "Socket not of type that supports listen"; + break; + case GHC_EWOULDBLOCK: + ghc_errtype = ERR_OTHERERROR; + ghc_errstr = "No sockets are present to be accepted"; + break; + } + return -1; } } + + /* set the non-blocking flag on this file descriptor */ + flags = fcntl(fd, F_GETFL); + fcntl(fd, F_SETFL, flags | O_NONBLOCK); + return fd; } diff --git a/ghc/lib/misc/cbits/createSocket.c b/ghc/lib/misc/cbits/createSocket.c index 8b30d72..297fcb2 100644 --- a/ghc/lib/misc/cbits/createSocket.c +++ b/ghc/lib/misc/cbits/createSocket.c @@ -16,6 +16,7 @@ StgInt createSocket(I_ family, I_ type, I_ protocol) { int fd; + long flags; if ((fd = socket((int)family, (int)type, (int)protocol)) < 0) { if (errno != EINTR) { @@ -48,5 +49,10 @@ createSocket(I_ family, I_ type, I_ protocol) return (StgInt)-1; } } + + /* set the non-blocking flag on this file descriptor */ + flags = fcntl(fd, F_GETFL); + fcntl(fd, F_SETFL, flags | O_NONBLOCK); + return (StgInt)fd; } diff --git a/ghc/lib/std/PrelHandle.lhs b/ghc/lib/std/PrelHandle.lhs index 5085b9c..ba3cc2c 100644 --- a/ghc/lib/std/PrelHandle.lhs +++ b/ghc/lib/std/PrelHandle.lhs @@ -31,7 +31,7 @@ import PrelPack ( packString ) import PrelWeak ( addForeignFinalizer ) import Ix -#if __CONCURRENT_HASKELL__ +#ifdef __CONCURRENT_HASKELL__ import PrelConc #endif @@ -222,7 +222,7 @@ stdout = unsafePerformIO (do (0::Int){-writeable-} -- ConcHask: SAFE, won't block #else fo <- CCALL(openStdFile) (1::Int) - ((1{-flush on close-} + 128 {- don't block on I/O-})::Int) + ((1{-flush on close-})::Int) (0::Int){-writeable-} -- ConcHask: SAFE, won't block #endif @@ -256,7 +256,7 @@ stdin = unsafePerformIO (do (1::Int){-readable-} -- ConcHask: SAFE, won't block #else fo <- CCALL(openStdFile) (0::Int) - ((0{-flush on close-} + 128 {- don't block on I/O-})::Int) + ((0{-flush on close-})::Int) (1::Int){-readable-} -- ConcHask: SAFE, won't block #endif @@ -288,7 +288,7 @@ stderr = unsafePerformIO (do (0::Int){-writeable-} -- ConcHask: SAFE, won't block #else fo <- CCALL(openStdFile) (2::Int) - ((1{-flush on close-} + 128 {- don't block on I/O-})::Int) + ((1{-flush on close-})::Int) (0::Int){-writeable-} -- ConcHask: SAFE, won't block #endif @@ -348,15 +348,7 @@ openFileEx f m = do BinaryMode bmo -> (bmo, 1) TextMode tmo -> (tmo, 0) -#ifndef __CONCURRENT_HASKELL__ - file_flags = file_flags' -#else - -- See comment next to 'stderr' for why we leave - -- non-blocking off for now. - file_flags = file_flags' + 128 -- Don't block on I/O -#endif - - (file_flags', file_mode) = + (file_flags, file_mode) = case imo of AppendMode -> (1, 0) WriteMode -> (1, 1) @@ -1164,21 +1156,16 @@ mayBlock fo act = do -5 -> do -- (possibly blocking) read fd <- CCALL(getFileFd) fo threadWaitRead fd - CCALL(clearNonBlockingIOFlag__) fo -- force read to happen this time. mayBlock fo act -- input available, re-try -6 -> do -- (possibly blocking) write fd <- CCALL(getFileFd) fo threadWaitWrite fd - CCALL(clearNonBlockingIOFlag__) fo -- force write to happen this time. mayBlock fo act -- output possible -7 -> do -- (possibly blocking) write on connected handle fd <- CCALL(getConnFileFd) fo threadWaitWrite fd - CCALL(clearConnNonBlockingIOFlag__) fo -- force write to happen this time. mayBlock fo act -- output possible _ -> do - CCALL(setNonBlockingIOFlag__) fo -- reset file object. - CCALL(setConnNonBlockingIOFlag__) fo -- reset (connected) file object. return rc \end{code} @@ -1247,11 +1234,6 @@ foreign import ccall "libHS_cbits.so" "freeFileObject" unsafe prim_freeFi foreign import ccall "libHS_cbits.so" "freeStdFileObject" unsafe prim_freeStdFileObject :: FILE_OBJ -> IO () foreign import ccall "libHS_cbits.so" "const_BUFSIZ" unsafe const_BUFSIZ :: Int -foreign import ccall "libHS_cbits.so" "setConnNonBlockingIOFlag__" unsafe prim_setConnNonBlockingIOFlag__ :: FILE_OBJ -> IO () -foreign import ccall "libHS_cbits.so" "clearConnNonBlockingIOFlag__" unsafe prim_clearConnNonBlockingIOFlag__ :: FILE_OBJ -> IO () -foreign import ccall "libHS_cbits.so" "setNonBlockingIOFlag__" unsafe prim_setNonBlockingIOFlag__ :: FILE_OBJ -> IO () -foreign import ccall "libHS_cbits.so" "clearNonBlockingIOFlag__" unsafe prim_clearNonBlockingIOFlag__ :: FILE_OBJ -> IO () - foreign import ccall "libHS_cbits.so" "getErrStr__" unsafe prim_getErrStr__ :: IO Addr foreign import ccall "libHS_cbits.so" "getErrNo__" unsafe prim_getErrNo__ :: IO Int foreign import ccall "libHS_cbits.so" "getErrType__" unsafe prim_getErrType__ :: IO Int diff --git a/ghc/lib/std/cbits/fileObject.c b/ghc/lib/std/cbits/fileObject.c index d209f66..b8b009d 100644 --- a/ghc/lib/std/cbits/fileObject.c +++ b/ghc/lib/std/cbits/fileObject.c @@ -1,7 +1,7 @@ /* * (c) The GRASP/AQUA Project, Glasgow University, 1994-1998 * - * $Id: fileObject.c,v 1.5 1999/07/12 10:43:12 sof Exp $ + * $Id: fileObject.c,v 1.6 1999/09/16 13:14:42 simonmar Exp $ * * hPutStr Runtime Support */ @@ -136,32 +136,6 @@ StgInt getPushbackBufSize() { return (__pushback_buf_size__); } -void -clearNonBlockingIOFlag__ (ptr) -StgForeignPtr ptr; -{ ((IOFileObject*)ptr)->flags &= ~FILEOBJ_NONBLOCKING_IO; } - -void -setNonBlockingIOFlag__ (ptr) -StgForeignPtr ptr; -{ ((IOFileObject*)ptr)->flags |= FILEOBJ_NONBLOCKING_IO; } - -void -clearConnNonBlockingIOFlag__ (ptr) -StgForeignPtr ptr; -{ ((IOFileObject*)ptr)->connectedTo->flags &= ~FILEOBJ_NONBLOCKING_IO; } - -void -setConnNonBlockingIOFlag__ (ptr) -StgForeignPtr ptr; -{ - if ( ((IOFileObject*)ptr)->connectedTo != NULL ) { - ((IOFileObject*)ptr)->connectedTo->flags |= FILEOBJ_NONBLOCKING_IO; - } - return; -} - - /* Only ever called on line-buffered file objects */ StgInt fill_up_line_buffer(fo) @@ -180,9 +154,6 @@ IOFileObject* fo; len = fo->bufSize - fo->bufWPtr + 1; p = (unsigned char*)fo->buf + fo->bufWPtr; - if ( fo->flags & FILEOBJ_NONBLOCKING_IO && inputReady ((StgForeignPtr)fo,0) != 1 ) - return FILEOBJ_BLOCKED_READ; - if ((count = ( #ifdef USE_WINSOCK diff --git a/ghc/lib/std/cbits/fileObject.h b/ghc/lib/std/cbits/fileObject.h index 886373f..0da85ac 100644 --- a/ghc/lib/std/cbits/fileObject.h +++ b/ghc/lib/std/cbits/fileObject.h @@ -40,7 +40,6 @@ typedef struct _IOFileObject { #define FILEOBJ_READ 16 #define FILEOBJ_WRITE 32 #define FILEOBJ_STD 64 -#define FILEOBJ_NONBLOCKING_IO 128 /* The next two flags are used for RW file objects only. They indicate whether the last operation was a read or a write. (Need this info to determine whether a RW file object's diff --git a/ghc/lib/std/cbits/filePutc.c b/ghc/lib/std/cbits/filePutc.c index b205116..b48f9fe 100644 --- a/ghc/lib/std/cbits/filePutc.c +++ b/ghc/lib/std/cbits/filePutc.c @@ -1,7 +1,7 @@ /* * (c) The GRASP/AQUA Project, Glasgow University, 1994-1998 * - * $Id: filePutc.c,v 1.7 1999/07/12 10:43:13 sof Exp $ + * $Id: filePutc.c,v 1.8 1999/09/16 13:14:43 simonmar Exp $ * * hPutChar Runtime Support */ @@ -78,23 +78,25 @@ StgChar c; return rc; } - if ( fo->flags & FILEOBJ_NONBLOCKING_IO ) - return FILEOBJ_BLOCKED_WRITE; - /* Unbuffered, write the character directly. */ - while ((rc = ( + while (rc = ( #ifdef USE_WINSOCK fo->flags & FILEOBJ_WINSOCK ? send(fo->fd, &c, 1, 0) : - write(fo->fd, &c, 1))) == 0 && errno == EINTR) ; + write(fo->fd, &c, 1)) <= 0) { #else - write(fo->fd, &c, 1))) == 0 && errno == EINTR) ; + write(fo->fd, &c, 1)) <= 0) { #endif - if (rc == 0) { - cvtErrno(); - stdErrno(); - return -1; + + if ( rc == -1 && errno == EAGAIN) { + errno = 0; + return FILEOBJ_BLOCKED_WRITE; + } else if (rc == 0 || rc == -1 && errno != EINTR) { + cvtErrno(); + stdErrno(); + return -1; + } } - return 0; + return 0; } diff --git a/ghc/lib/std/cbits/openFile.c b/ghc/lib/std/cbits/openFile.c index 7d3b217..5f24491 100644 --- a/ghc/lib/std/cbits/openFile.c +++ b/ghc/lib/std/cbits/openFile.c @@ -1,7 +1,7 @@ /* * (c) The GRASP/AQUA Project, Glasgow University, 1994-1998 * - * $Id: openFile.c,v 1.6 1999/02/04 12:13:15 sof Exp $ + * $Id: openFile.c,v 1.7 1999/09/16 13:14:43 simonmar Exp $ * * openFile Runtime Support */ @@ -40,6 +40,7 @@ StgInt flags; StgInt rd; { IOFileObject* fo; + long fd_flags; if ((fo = malloc(sizeof(IOFileObject))) == NULL) return NULL; @@ -49,7 +50,12 @@ StgInt rd; fo->bufRPtr = 0; fo->flags = flags | FILEOBJ_STD | ( rd ? FILEOBJ_READ : FILEOBJ_WRITE); fo->connectedTo = NULL; - return fo; + + /* set the non-blocking flag on this file descriptor */ + fd_flags = fcntl(fd, F_GETFL); + fcntl(fd, F_SETFL, fd_flags | O_NONBLOCK); + + return fo; } #define OPENFILE_APPEND 0 @@ -79,19 +85,19 @@ StgInt flags; switch (how) { case OPENFILE_APPEND: - oflags = O_WRONLY | O_NOCTTY | O_APPEND; + oflags = O_NONBLOCK | O_WRONLY | O_NOCTTY | O_APPEND; for_writing = 1; break; case OPENFILE_WRITE: - oflags = O_WRONLY | O_NOCTTY; + oflags = O_NONBLOCK | O_WRONLY | O_NOCTTY; for_writing = 1; break; case OPENFILE_READ_ONLY: - oflags = O_RDONLY | O_NOCTTY; + oflags = O_NONBLOCK | O_RDONLY | O_NOCTTY; for_writing = 0; break; case OPENFILE_READ_WRITE: - oflags = O_RDWR | O_NOCTTY; + oflags = O_NONBLOCK | O_RDWR | O_NOCTTY; for_writing = 1; break; default: diff --git a/ghc/lib/std/cbits/readFile.c b/ghc/lib/std/cbits/readFile.c index 7445b3e..c47b56c 100644 --- a/ghc/lib/std/cbits/readFile.c +++ b/ghc/lib/std/cbits/readFile.c @@ -1,7 +1,7 @@ /* * (c) The GRASP/AQUA Project, Glasgow University, 1994-1998 * - * $Id: readFile.c,v 1.7 1999/07/12 10:43:13 sof Exp $ + * $Id: readFile.c,v 1.8 1999/09/16 13:14:43 simonmar Exp $ * * hGetContents Runtime Support */ @@ -77,9 +77,6 @@ StgForeignPtr ptr; fprintf(stderr, "rb: %d %d %d\n", fo->bufRPtr, fo->bufWPtr, fo->bufSize); #endif - if ( fo->flags & FILEOBJ_NONBLOCKING_IO && inputReady (ptr,0) != 1 ) - return FILEOBJ_BLOCKED_READ; - while ((count = ( #ifdef USE_WINSOCK @@ -170,9 +167,6 @@ StgInt len; p += count; total_count = count; - if ( fo->flags & FILEOBJ_NONBLOCKING_IO && inputReady (ptr,0) != 1 ) - return FILEOBJ_BLOCKED_READ; - while ((count = ( #ifdef USE_WINSOCK @@ -317,9 +311,6 @@ StgForeignPtr ptr; } fo->flags = (fo->flags & ~FILEOBJ_RW_WRITE) | FILEOBJ_RW_READ; - if ( fo->flags & FILEOBJ_NONBLOCKING_IO && inputReady (ptr,0) != 1 ) - return FILEOBJ_BLOCKED_READ; - while ( (count = ( #ifdef USE_WINSOCK diff --git a/ghc/lib/std/cbits/writeFile.c b/ghc/lib/std/cbits/writeFile.c index e69442d..0b459f3 100644 --- a/ghc/lib/std/cbits/writeFile.c +++ b/ghc/lib/std/cbits/writeFile.c @@ -1,7 +1,7 @@ /* * (c) The GRASP/AQUA Project, Glasgow University, 1994-1998 * - * $Id: writeFile.c,v 1.7 1999/09/12 19:18:22 sof Exp $ + * $Id: writeFile.c,v 1.8 1999/09/16 13:14:43 simonmar Exp $ * * hPutStr Runtime Support */ @@ -53,9 +53,6 @@ StgInt bytes; if (bytes == 0 || fo->buf == NULL) return 0; - if ( fo->flags & FILEOBJ_NONBLOCKING_IO && inputReady(ptr,0) != 1 ) - return FILEOBJ_BLOCKED_WRITE; - while ((count = ( #ifdef USE_WINSOCK @@ -121,10 +118,6 @@ StgInt len; return rc; } - if ( fo->flags & FILEOBJ_NONBLOCKING_IO && inputReady(ptr,0) != 1 ) - return FILEOBJ_BLOCKED_WRITE; - - /* Disallow short writes */ while ((count = ( #ifdef USE_WINSOCK @@ -134,13 +127,18 @@ StgInt len; #else write(fo->fd, pBuf, (int)len))) < len ) { #endif - if (errno != EINTR) { + if ( count >= 0 ) { + len -= count; + pBuf += count; + continue; + } else if ( errno == EAGAIN ) { + errno = 0; + return FILEOBJ_BLOCKED_WRITE; + } else if ( errno != EINTR ) { cvtErrno(); stdErrno(); return -1; } - len -= count; - pBuf += count; } return 0; -- 1.7.10.4