[project @ 1999-09-16 13:14:38 by simonmar]
authorsimonmar <unknown>
Thu, 16 Sep 1999 13:14:43 +0000 (13:14 +0000)
committersimonmar <unknown>
Thu, 16 Sep 1999 13:14:43 +0000 (13:14 +0000)
Cleanup of non-blocking I/O

- file descriptors are now always set to non-blocking mode.

- we don't do an inputReady operation on descriptors before
  attempting to read from them any more.

- the non-blocking flag on Handles has gone.

- the {set,clear}[Conn]NonBlockingFlag() functions have gone.

- the socket operations have been made to work properly with threads:
  accept is now non-blocking (it does a threadWaitRead instead of
  blocking), and the file descriptors returned by accept are set to
  non-blocking mode.

Win32 will need some adjustments, no doubt.

ghc/lib/misc/SocketPrim.lhs
ghc/lib/misc/cbits/acceptSocket.c
ghc/lib/misc/cbits/createSocket.c
ghc/lib/std/PrelHandle.lhs
ghc/lib/std/cbits/fileObject.c
ghc/lib/std/cbits/fileObject.h
ghc/lib/std/cbits/filePutc.c
ghc/lib/std/cbits/openFile.c
ghc/lib/std/cbits/readFile.c
ghc/lib/std/cbits/writeFile.c

index 809cd85..1eb4111 100644 (file)
@@ -87,6 +87,7 @@ import Ix
 import Weak        ( addForeignFinalizer )
 import PrelIOBase  -- IOError, Handle representation
 import PrelHandle
+import PrelConc            ( threadWaitRead )
 import Foreign
 import Addr        ( nullAddr )
 
@@ -373,14 +374,26 @@ accept sock@(MkSocket s family stype protocol status) = do
      (ptr, sz) <- allocSockAddr family
      int_star <- stToIO (newIntArray ((0::Int),1))
      stToIO (writeIntArray int_star 0 sz)
+     new_sock <- accept_socket s ptr int_star
+     a_sz <- stToIO (readIntArray int_star 0)
+     addr <- unpackSockAddr ptr a_sz
+     new_status <- newIORef Connected
+     return ((MkSocket new_sock family stype protocol new_status), addr)
+
+accept_socket :: Int 
+       -> MutableByteArray RealWorld Int
+       -> MutableByteArray RealWorld Int
+       -> IO Int
+
+accept_socket s ptr int_star = do
      new_sock <- _ccall_ acceptSocket s ptr int_star
      case (new_sock::Int) of
          -1 -> constructErrorAndFail "accept"
-         _  -> do
-               a_sz <- stToIO (readIntArray int_star 0)
-               addr <- unpackSockAddr ptr a_sz
-               new_status <- newIORef Connected
-               return ((MkSocket new_sock family stype protocol new_status), addr)
+
+               -- wait if there are no pending connections
+         -5 -> threadWaitRead s >> accept_socket s ptr int_star
+
+         _  -> return new_sock
 \end{code}
 
 %************************************************************************
index efd13b9..3995795 100644 (file)
@@ -16,37 +16,47 @@ StgInt
 acceptSocket(I_ sockfd, A_ peer, A_ addrlen)
 {
     StgInt fd;
-    
+    long flags;
+
     while ((fd = accept((int)sockfd, (struct sockaddr *)peer, (int *)addrlen)) < 0) {
-      if (errno != EINTR) {
-         cvtErrno();
-         switch (ghc_errno) {
-         default:
-             stdErrno();
-             break;
-         case GHC_EBADF:
-                     ghc_errtype = ERR_INVALIDARGUMENT;
-              ghc_errstr  = "Not a valid descriptor";
-             break;
-         case GHC_EFAULT:
-                     ghc_errtype = ERR_INVALIDARGUMENT;
-              ghc_errstr  = "Address not in writeable part of user address space";
-             break;
-         case GHC_ENOTSOCK:
-             ghc_errtype = ERR_INVALIDARGUMENT;
-             ghc_errstr  = "Descriptor not a socket";
-             break;
-         case GHC_EOPNOTSUPP:
-             ghc_errtype = ERR_INVALIDARGUMENT;
-             ghc_errstr  = "Socket not of type that supports listen";
-             break;
-         case GHC_EWOULDBLOCK:
-             ghc_errtype = ERR_OTHERERROR;
-             ghc_errstr  = "No sockets are present to be accepted";
-             break;
-         }
-         return -1;
+      if (errno == EAGAIN) {
+       errno = 0;
+       return FILEOBJ_BLOCKED_READ;
+
+      } else if (errno != EINTR) {
+       cvtErrno();
+       switch (ghc_errno) {
+       default:
+         stdErrno();
+         break;
+       case GHC_EBADF:
+         ghc_errtype = ERR_INVALIDARGUMENT;
+         ghc_errstr  = "Not a valid descriptor";
+         break;
+       case GHC_EFAULT:
+         ghc_errtype = ERR_INVALIDARGUMENT;
+         ghc_errstr  = "Address not in writeable part of user address space";
+         break;
+       case GHC_ENOTSOCK:
+         ghc_errtype = ERR_INVALIDARGUMENT;
+         ghc_errstr  = "Descriptor not a socket";
+         break;
+       case GHC_EOPNOTSUPP:
+         ghc_errtype = ERR_INVALIDARGUMENT;
+         ghc_errstr  = "Socket not of type that supports listen";
+         break;
+       case GHC_EWOULDBLOCK:
+         ghc_errtype = ERR_OTHERERROR;
+         ghc_errstr  = "No sockets are present to be accepted";
+         break;
+       }
+       return -1;
       }
     }
+
+    /* set the non-blocking flag on this file descriptor */
+    flags = fcntl(fd, F_GETFL);
+    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+
     return fd;
 }
index 8b30d72..297fcb2 100644 (file)
@@ -16,6 +16,7 @@ StgInt
 createSocket(I_ family, I_ type, I_ protocol)
 {
     int fd;
+    long flags;
 
     if ((fd = socket((int)family, (int)type, (int)protocol)) < 0) {
       if (errno != EINTR) {
@@ -48,5 +49,10 @@ createSocket(I_ family, I_ type, I_ protocol)
          return (StgInt)-1;
       }
     }
+
+    /* set the non-blocking flag on this file descriptor */
+    flags = fcntl(fd, F_GETFL);
+    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+
     return (StgInt)fd;
 }
index 5085b9c..ba3cc2c 100644 (file)
@@ -31,7 +31,7 @@ import PrelPack         ( packString )
 import PrelWeak                ( addForeignFinalizer )
 import Ix
 
-#if __CONCURRENT_HASKELL__
+#ifdef __CONCURRENT_HASKELL__
 import PrelConc
 #endif
 
@@ -222,7 +222,7 @@ stdout = unsafePerformIO (do
                                     (0::Int){-writeable-}  -- ConcHask: SAFE, won't block
 #else
            fo <- CCALL(openStdFile) (1::Int)
-                                    ((1{-flush on close-} + 128 {- don't block on I/O-})::Int)
+                                    ((1{-flush on close-})::Int)
                                     (0::Int){-writeable-}  -- ConcHask: SAFE, won't block
 #endif
 
@@ -256,7 +256,7 @@ stdin = unsafePerformIO (do
                                     (1::Int){-readable-}  -- ConcHask: SAFE, won't block
 #else
            fo <- CCALL(openStdFile) (0::Int)
-                                    ((0{-flush on close-} + 128 {- don't block on I/O-})::Int)
+                                    ((0{-flush on close-})::Int)
                                     (1::Int){-readable-}  -- ConcHask: SAFE, won't block
 #endif
 
@@ -288,7 +288,7 @@ stderr = unsafePerformIO (do
                                     (0::Int){-writeable-} -- ConcHask: SAFE, won't block
 #else
            fo <- CCALL(openStdFile) (2::Int)
-                                    ((1{-flush on close-} + 128 {- don't block on I/O-})::Int)
+                                    ((1{-flush on close-})::Int)
                                     (0::Int){-writeable-} -- ConcHask: SAFE, won't block
 #endif
 
@@ -348,15 +348,7 @@ openFileEx f m = do
         BinaryMode bmo -> (bmo, 1)
        TextMode tmo   -> (tmo, 0)
 
-#ifndef __CONCURRENT_HASKELL__
-    file_flags = file_flags'
-#else
-       -- See comment next to 'stderr' for why we leave
-       -- non-blocking off for now.
-    file_flags = file_flags' + 128  -- Don't block on I/O
-#endif
-
-    (file_flags', file_mode) =
+    (file_flags, file_mode) =
       case imo of
            AppendMode    -> (1, 0)
            WriteMode     -> (1, 1)
@@ -1164,21 +1156,16 @@ mayBlock fo act = do
      -5 -> do  -- (possibly blocking) read
         fd <- CCALL(getFileFd) fo
         threadWaitRead fd
-        CCALL(clearNonBlockingIOFlag__) fo  -- force read to happen this time.
        mayBlock fo act  -- input available, re-try
      -6 -> do  -- (possibly blocking) write
         fd <- CCALL(getFileFd) fo
         threadWaitWrite fd
-        CCALL(clearNonBlockingIOFlag__) fo  -- force write to happen this time.
        mayBlock fo act  -- output possible
      -7 -> do  -- (possibly blocking) write on connected handle
         fd <- CCALL(getConnFileFd) fo
         threadWaitWrite fd
-        CCALL(clearConnNonBlockingIOFlag__) fo  -- force write to happen this time.
        mayBlock fo act  -- output possible
      _ -> do
-       CCALL(setNonBlockingIOFlag__) fo      -- reset file object.
-       CCALL(setConnNonBlockingIOFlag__) fo  -- reset (connected) file object.
         return rc
 \end{code}
 
@@ -1247,11 +1234,6 @@ foreign import ccall "libHS_cbits.so" "freeFileObject"        unsafe prim_freeFi
 foreign import ccall "libHS_cbits.so" "freeStdFileObject"     unsafe prim_freeStdFileObject :: FILE_OBJ -> IO ()
 foreign import ccall "libHS_cbits.so" "const_BUFSIZ"          unsafe const_BUFSIZ          :: Int
 
-foreign import ccall "libHS_cbits.so" "setConnNonBlockingIOFlag__"   unsafe prim_setConnNonBlockingIOFlag__   :: FILE_OBJ -> IO ()
-foreign import ccall "libHS_cbits.so" "clearConnNonBlockingIOFlag__" unsafe prim_clearConnNonBlockingIOFlag__ :: FILE_OBJ -> IO ()
-foreign import ccall "libHS_cbits.so" "setNonBlockingIOFlag__"       unsafe prim_setNonBlockingIOFlag__       :: FILE_OBJ -> IO ()
-foreign import ccall "libHS_cbits.so" "clearNonBlockingIOFlag__"     unsafe prim_clearNonBlockingIOFlag__     :: FILE_OBJ -> IO ()
-
 foreign import ccall "libHS_cbits.so" "getErrStr__"  unsafe prim_getErrStr__  :: IO Addr 
 foreign import ccall "libHS_cbits.so" "getErrNo__"   unsafe prim_getErrNo__   :: IO Int  
 foreign import ccall "libHS_cbits.so" "getErrType__" unsafe prim_getErrType__ :: IO Int  
index d209f66..b8b009d 100644 (file)
@@ -1,7 +1,7 @@
 /* 
  * (c) The GRASP/AQUA Project, Glasgow University, 1994-1998
  *
- * $Id: fileObject.c,v 1.5 1999/07/12 10:43:12 sof Exp $
+ * $Id: fileObject.c,v 1.6 1999/09/16 13:14:42 simonmar Exp $
  *
  * hPutStr Runtime Support
  */
@@ -136,32 +136,6 @@ StgInt
 getPushbackBufSize()
 { return (__pushback_buf_size__); }
 
-void
-clearNonBlockingIOFlag__ (ptr)
-StgForeignPtr ptr;
-{ ((IOFileObject*)ptr)->flags &= ~FILEOBJ_NONBLOCKING_IO; }
-
-void
-setNonBlockingIOFlag__ (ptr)
-StgForeignPtr ptr;
-{ ((IOFileObject*)ptr)->flags |= FILEOBJ_NONBLOCKING_IO; }
-
-void
-clearConnNonBlockingIOFlag__ (ptr)
-StgForeignPtr ptr;
-{ ((IOFileObject*)ptr)->connectedTo->flags &= ~FILEOBJ_NONBLOCKING_IO; }
-
-void
-setConnNonBlockingIOFlag__ (ptr)
-StgForeignPtr ptr;
-{ 
-  if ( ((IOFileObject*)ptr)->connectedTo != NULL )  {
-    ((IOFileObject*)ptr)->connectedTo->flags |= FILEOBJ_NONBLOCKING_IO;
-  }
-  return;
-}
-
-
 /* Only ever called on line-buffered file objects */
 StgInt
 fill_up_line_buffer(fo)
@@ -180,9 +154,6 @@ IOFileObject* fo;
   len = fo->bufSize - fo->bufWPtr + 1;
   p   = (unsigned char*)fo->buf + fo->bufWPtr;
 
-  if ( fo->flags & FILEOBJ_NONBLOCKING_IO && inputReady ((StgForeignPtr)fo,0) != 1 )
-     return FILEOBJ_BLOCKED_READ;
-
   if ((count = 
          (
 #ifdef USE_WINSOCK
index 886373f..0da85ac 100644 (file)
@@ -40,7 +40,6 @@ typedef struct _IOFileObject {
 #define FILEOBJ_READ    16
 #define FILEOBJ_WRITE   32
 #define FILEOBJ_STD     64
-#define FILEOBJ_NONBLOCKING_IO 128
 /* The next two flags are used for RW file objects only.
    They indicate whether the last operation was a read or a write.
    (Need this info to determine whether a RW file object's
index b205116..b48f9fe 100644 (file)
@@ -1,7 +1,7 @@
 /* 
  * (c) The GRASP/AQUA Project, Glasgow University, 1994-1998
  *
- * $Id: filePutc.c,v 1.7 1999/07/12 10:43:13 sof Exp $
+ * $Id: filePutc.c,v 1.8 1999/09/16 13:14:43 simonmar Exp $
  *
  * hPutChar Runtime Support
  */
@@ -78,23 +78,25 @@ StgChar c;
       return rc;
     }
 
-    if ( fo->flags & FILEOBJ_NONBLOCKING_IO )
-      return FILEOBJ_BLOCKED_WRITE;
-
     /* Unbuffered, write the character directly. */
-    while ((rc = (
+    while (rc = (
 #ifdef USE_WINSOCK
                 fo->flags & FILEOBJ_WINSOCK ?
                 send(fo->fd, &c, 1, 0) :
-                write(fo->fd, &c, 1))) == 0 && errno == EINTR) ;
+                write(fo->fd, &c, 1)) <= 0) {
 #else
-                write(fo->fd, &c, 1))) == 0 && errno == EINTR) ;
+                write(fo->fd, &c, 1)) <= 0) {
 #endif
-    if (rc == 0) {
-       cvtErrno();
-       stdErrno();
-       return -1;
+
+        if ( rc == -1 && errno == EAGAIN) {
+           errno = 0;
+           return FILEOBJ_BLOCKED_WRITE;
+       } else if (rc == 0 || rc == -1 && errno != EINTR) {
+           cvtErrno();
+           stdErrno();
+           return -1;
+       }
     }
-    return 0;
 
+    return 0;
 }
index 7d3b217..5f24491 100644 (file)
@@ -1,7 +1,7 @@
 /* 
  * (c) The GRASP/AQUA Project, Glasgow University, 1994-1998
  *
- * $Id: openFile.c,v 1.6 1999/02/04 12:13:15 sof Exp $
+ * $Id: openFile.c,v 1.7 1999/09/16 13:14:43 simonmar Exp $
  *
  * openFile Runtime Support
  */
@@ -40,6 +40,7 @@ StgInt flags;
 StgInt rd;
 {
     IOFileObject* fo;
+    long fd_flags;
 
     if ((fo = malloc(sizeof(IOFileObject))) == NULL)
        return NULL;
@@ -49,7 +50,12 @@ StgInt rd;
     fo->bufRPtr  = 0;
     fo->flags   = flags | FILEOBJ_STD | ( rd ? FILEOBJ_READ : FILEOBJ_WRITE);
     fo->connectedTo = NULL;
-    return fo;
+    /* set the non-blocking flag on this file descriptor */
+    fd_flags = fcntl(fd, F_GETFL);
+    fcntl(fd, F_SETFL, fd_flags | O_NONBLOCK);
+
+   return fo;
 }
 
 #define OPENFILE_APPEND 0
@@ -79,19 +85,19 @@ StgInt flags;
 
     switch (how) {
       case OPENFILE_APPEND:
-        oflags = O_WRONLY | O_NOCTTY | O_APPEND; 
+        oflags = O_NONBLOCK | O_WRONLY | O_NOCTTY | O_APPEND; 
        for_writing = 1;
        break;
       case OPENFILE_WRITE:
-       oflags = O_WRONLY | O_NOCTTY;
+       oflags = O_NONBLOCK | O_WRONLY | O_NOCTTY;
        for_writing = 1;
        break;
     case OPENFILE_READ_ONLY:
-        oflags = O_RDONLY | O_NOCTTY;
+        oflags = O_NONBLOCK | O_RDONLY | O_NOCTTY;
        for_writing = 0;
        break;
     case OPENFILE_READ_WRITE:
-       oflags = O_RDWR | O_NOCTTY;
+       oflags = O_NONBLOCK | O_RDWR | O_NOCTTY;
        for_writing = 1;
        break;
     default:
index 7445b3e..c47b56c 100644 (file)
@@ -1,7 +1,7 @@
 /* 
  * (c) The GRASP/AQUA Project, Glasgow University, 1994-1998
  *
- * $Id: readFile.c,v 1.7 1999/07/12 10:43:13 sof Exp $
+ * $Id: readFile.c,v 1.8 1999/09/16 13:14:43 simonmar Exp $
  *
  * hGetContents Runtime Support
  */
@@ -77,9 +77,6 @@ StgForeignPtr ptr;
     fprintf(stderr, "rb: %d %d %d\n", fo->bufRPtr, fo->bufWPtr, fo->bufSize);
 #endif
 
-    if ( fo->flags & FILEOBJ_NONBLOCKING_IO && inputReady (ptr,0) != 1 )
-      return FILEOBJ_BLOCKED_READ;
-
     while ((count =
             (
 #ifdef USE_WINSOCK
@@ -170,9 +167,6 @@ StgInt len;
     p += count;
     total_count = count;
 
-    if ( fo->flags & FILEOBJ_NONBLOCKING_IO && inputReady (ptr,0) != 1 )
-      return FILEOBJ_BLOCKED_READ;
-
     while ((count =
              (
 #ifdef USE_WINSOCK
@@ -317,9 +311,6 @@ StgForeignPtr ptr;
     }
     fo->flags = (fo->flags & ~FILEOBJ_RW_WRITE) | FILEOBJ_RW_READ;
 
-    if ( fo->flags & FILEOBJ_NONBLOCKING_IO && inputReady (ptr,0) != 1 )
-      return FILEOBJ_BLOCKED_READ;
-
     while ( (count = 
               (
 #ifdef USE_WINSOCK
index e69442d..0b459f3 100644 (file)
@@ -1,7 +1,7 @@
 /* 
  * (c) The GRASP/AQUA Project, Glasgow University, 1994-1998
  *
- * $Id: writeFile.c,v 1.7 1999/09/12 19:18:22 sof Exp $
+ * $Id: writeFile.c,v 1.8 1999/09/16 13:14:43 simonmar Exp $
  *
  * hPutStr Runtime Support
  */
@@ -53,9 +53,6 @@ StgInt bytes;
     if (bytes == 0  || fo->buf == NULL)
        return 0;
 
-    if ( fo->flags & FILEOBJ_NONBLOCKING_IO && inputReady(ptr,0) != 1 )
-       return FILEOBJ_BLOCKED_WRITE;
-
     while ((count = 
               (
 #ifdef USE_WINSOCK
@@ -121,10 +118,6 @@ StgInt  len;
        return rc;
     }
 
-    if ( fo->flags & FILEOBJ_NONBLOCKING_IO && inputReady(ptr,0) != 1 )
-       return FILEOBJ_BLOCKED_WRITE;
-
-    /* Disallow short writes */
     while ((count = 
                (
 #ifdef USE_WINSOCK
@@ -134,13 +127,18 @@ StgInt  len;
 #else
                 write(fo->fd, pBuf, (int)len))) < len ) {
 #endif
-       if (errno != EINTR) {
+        if ( count >= 0 ) {
+            len -= count;
+           pBuf += count;
+           continue;
+       } else if ( errno == EAGAIN ) {
+           errno = 0;
+           return FILEOBJ_BLOCKED_WRITE;
+       } else if ( errno != EINTR ) {
            cvtErrno();
            stdErrno();
            return -1;
        }
-       len  -= count;
-       pBuf += count;
     }
 
     return 0;