[project @ 2003-02-21 05:34:12 by sof]
authorsof <unknown>
Fri, 21 Feb 2003 05:34:17 +0000 (05:34 +0000)
committersof <unknown>
Fri, 21 Feb 2003 05:34:17 +0000 (05:34 +0000)
Asynchronous / non-blocking I/O for Win32 platforms.

This commit introduces a Concurrent Haskell friendly view of I/O on
Win32 platforms. Through the use of a pool of worker Win32 threads, CH
threads may issue asynchronous I/O requests without blocking the
progress of other CH threads. The issuing CH thread is blocked until
the request has been serviced though.

GHC.Conc exports the primops that take care of issuing the
asynchronous I/O requests, which the IO implementation now takes
advantage of. By default, all Handles are non-blocking/asynchronous,
but should performance become an issue, having a per-Handle flag for
turning off non-blocking could easily be imagined&introduced.

[Incidentally, this thread pool-based implementation could easily be
extended to also allow Haskell code to delegate the execution of
arbitrary pieces of (potentially blocking) external code to another OS
thread. Given how relatively gnarly the locking story has turned out
to be with the 'threaded' RTS, that may not be such a bad idea.]

16 files changed:
ghc/compiler/prelude/primops.txt.pp
ghc/includes/PrimOps.h
ghc/includes/StgMiscClosures.h
ghc/includes/TSO.h
ghc/rts/HeapStackCheck.hc
ghc/rts/Linker.c
ghc/rts/Makefile
ghc/rts/PrimOps.hc
ghc/rts/RtsStartup.c
ghc/rts/Select.c
ghc/rts/win32/AsyncIO.c [new file with mode: 0644]
ghc/rts/win32/AsyncIO.h [new file with mode: 0644]
ghc/rts/win32/IOManager.c [new file with mode: 0644]
ghc/rts/win32/IOManager.h [new file with mode: 0644]
ghc/rts/win32/WorkQueue.c [new file with mode: 0644]
ghc/rts/win32/WorkQueue.h [new file with mode: 0644]

index a361c0c..5b60feb 100644 (file)
@@ -1,5 +1,5 @@
 -----------------------------------------------------------------------
--- $Id: primops.txt.pp,v 1.24 2003/02/04 12:40:00 simonpj Exp $
+-- $Id: primops.txt.pp,v 1.25 2003/02/21 05:34:14 sof Exp $
 --
 -- Primitive Operations
 --
@@ -1415,6 +1415,24 @@ primop  WaitWriteOp "waitWrite#" GenPrimOp
    has_side_effects = True
    out_of_line      = True
 
+#ifdef mingw32_TARGET_OS
+primop  AsyncReadOp "asyncRead#" GenPrimOp
+   Int# -> Int# -> Int# -> Addr# -> State# RealWorld-> (# State# RealWorld, Int#, Int# #)
+   {Asynchronously read bytes from specified file descriptor.}
+   with
+   needs_wrapper    = True
+   has_side_effects = True
+   out_of_line      = True
+
+primop  AsyncWriteOp "asyncWrite#" GenPrimOp
+   Int# -> Int# -> Int# -> Addr# -> State# RealWorld-> (# State# RealWorld, Int#, Int# #)
+   {Asynchronously write bytes from specified file descriptor.}
+   with
+   needs_wrapper    = True
+   has_side_effects = True
+   out_of_line      = True
+#endif
+
 ------------------------------------------------------------------------
 section "Concurrency primitives"
        {(In a non-concurrent implementation, ThreadId\# can be as singleton
index 0e4ee74..40b1959 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: PrimOps.h,v 1.99 2002/10/22 11:01:18 simonmar Exp $
+ * $Id: PrimOps.h,v 1.100 2003/02/21 05:34:15 sof Exp $
  *
  * (c) The GHC Team, 1998-2000
  *
@@ -241,6 +241,10 @@ EXTFUN_RTS(tryPutMVarzh_fast);
 EXTFUN_RTS(waitReadzh_fast);
 EXTFUN_RTS(waitWritezh_fast);
 EXTFUN_RTS(delayzh_fast);
+#ifdef mingw32_TARGET_OS
+EXTFUN_RTS(asyncReadzh_fast);
+EXTFUN_RTS(asyncWritezh_fast);
+#endif
 
 
 /* -----------------------------------------------------------------------------
index 8ce8b4d..a038445 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: StgMiscClosures.h,v 1.45 2003/01/07 09:29:24 simonmar Exp $
+ * $Id: StgMiscClosures.h,v 1.46 2003/02/21 05:34:15 sof Exp $
  *
  * (c) The GHC Team, 1998-2002
  *
@@ -282,4 +282,6 @@ EF_(stg_block_noregs);
 EF_(stg_block_1);
 EF_(stg_block_takemvar);
 EF_(stg_block_putmvar);
-
+#ifdef mingw32_TARGET_OS
+EF_(stg_block_async);
+#endif
index e664f9c..c99a7cd 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: TSO.h,v 1.29 2003/01/25 15:54:48 wolfgang Exp $
+ * $Id: TSO.h,v 1.30 2003/02/21 05:34:15 sof Exp $
  *
  * (c) The GHC Team, 1998-1999
  *
@@ -150,10 +150,22 @@ typedef enum {
 #endif
 } StgTSOBlockReason;
 
+#ifdef mingw32_TARGET_OS
+/* results from an async I/O request + it's ID. */
+typedef struct {
+  unsigned int reqID;
+  int          len;
+  int          errCode;
+} StgAsyncIOResult;
+#endif
+
 typedef union {
   StgClosure *closure;
   struct StgTSO_ *tso;
   int fd;
+#ifdef mingw32_TARGET_OS
+  StgAsyncIOResult* async_result;
+#endif
   unsigned int target;
 } StgTSOBlockInfo;
 
index d3e6661..20ae42a 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: HeapStackCheck.hc,v 1.27 2002/12/11 15:36:42 simonmar Exp $
+ * $Id: HeapStackCheck.hc,v 1.28 2003/02/21 05:34:15 sof Exp $
  *
  * (c) The GHC Team, 1998-2002
  *
@@ -1020,3 +1020,36 @@ FN_(stg_block_putmvar)
   BLOCK_GENERIC;
   FE_
 }
+
+#ifdef mingw32_TARGET_OS
+INFO_TABLE_RET( stg_block_async_info,  stg_block_async_ret,
+               MK_SMALL_BITMAP(0/*framesize*/, 0/*bitmap*/),
+               0/*SRT*/, 0/*SRT_OFF*/, 0/*SRT_LEN*/, 
+               RET_SMALL,, IF_, 0, 0);
+
+IF_(stg_block_async_ret)
+{
+  StgAsyncIOResult* ares;
+  int len,errC;
+  FB_
+  ares = CurrentTSO->block_info.async_result;
+  len  = ares->len;
+  errC = ares->errCode;
+  CurrentTSO->block_info.async_result = NULL;
+  STGCALL1(free,ares);
+  R1.w = len;
+  *Sp = (W_)errC;
+  JMP_(ENTRY_CODE(Sp[1]));
+  FE_
+}
+
+FN_(stg_block_async)
+{
+  FB_
+  Sp -= 1;
+  Sp[0] = (W_)&stg_block_async_info;
+  BLOCK_GENERIC;
+  FE_
+}
+
+#endif
index 7253fdf..bccb5f8 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Linker.c,v 1.114 2003/02/10 23:35:03 wolfgang Exp $
+ * $Id: Linker.c,v 1.115 2003/02/21 05:34:15 sof Exp $
  *
  * (c) The GHC Team, 2000, 2001
  *
@@ -221,6 +221,8 @@ typedef struct _RtsSymbolVal {
 /* These are statically linked from the mingw libraries into the ghc
    executable, so we have to employ this hack. */
 #define RTS_MINGW_ONLY_SYMBOLS                  \
+      SymX(asyncReadzh_fast)                   \
+      SymX(asyncWritezh_fast)                  \
       SymX(memset)                              \
       SymX(inet_ntoa)                           \
       SymX(inet_addr)                           \
index 5281d13..ee75fee 100644 (file)
@@ -36,6 +36,10 @@ NON_HS_PACKAGE = YES
 # grab sources from these subdirectories
 ALL_DIRS = hooks parallel
 
+ifeq "$(HOSTPLATFORM)" "i386-unknown-mingw32"
+ALL_DIRS += win32
+endif
+
 ifneq "$(DLLized)" "YES"
 EXCLUDED_SRCS += RtsDllMain.c
 else
@@ -82,6 +86,10 @@ STANDARD_OPTS += -I../includes -I. -Iparallel
 # COMPILING_RTS is only used when building Win32 DLL support.
 STANDARD_OPTS += -DCOMPILING_RTS
 
+ifeq "$(HOSTPLATFORM)" "i386-unknown-mingw32"
+STANDARD_OPTS += -Iwin32
+endif
+
 # HC_OPTS is included in both .c and .hc compilations, whereas CC_OPTS is
 # only included in .c compilations.  HC_OPTS included the WAY_* opts, which
 # must be included in both types of compilations.
index e4ef7e7..00e35e2 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: PrimOps.hc,v 1.103 2002/12/11 15:36:45 simonmar Exp $
+ * $Id: PrimOps.hc,v 1.104 2003/02/21 05:34:15 sof Exp $
  *
  * (c) The GHC Team, 1998-2002
  *
 
 #include <stdlib.h>
 
+#ifdef mingw32_TARGET_OS
+#include <windows.h>
+#include "win32/AsyncIO.h"
+#endif
+
 /* ** temporary **
 
    classes CCallable and CReturnable don't really exist, but the
@@ -1629,3 +1634,48 @@ FN_(delayzh_fast)
   FE_
 }
 
+#ifdef mingw32_TARGET_OS
+FN_(asyncReadzh_fast)
+{
+  StgAsyncIOResult* ares;
+  unsigned int reqID;
+  FB_
+    /* args: R1.i = fd, R2.i = isSock, R3.i = len, R4.p = buf */
+    ASSERT(CurrentTSO->why_blocked == NotBlocked);
+    CurrentTSO->why_blocked = BlockedOnRead;
+    ACQUIRE_LOCK(&sched_mutex);
+    /* could probably allocate this on the heap instead */
+    ares = (StgAsyncIOResult*)RET_STGCALL2(P_,stgMallocBytes,sizeof(StgAsyncIOResult), "asyncWritezh_fast");
+    reqID = RET_STGCALL5(W_,addIORequest,R1.i,FALSE,R2.i,R3.i,(char*)R4.p);
+    ares->reqID   = reqID;
+    ares->len     = 0;
+    ares->errCode = 0;
+    CurrentTSO->block_info.async_result = ares;
+    APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
+    RELEASE_LOCK(&sched_mutex);
+    JMP_(stg_block_async);
+  FE_
+}
+
+FN_(asyncWritezh_fast)
+{
+  StgAsyncIOResult* ares;
+  unsigned int reqID;
+  FB_
+    /* args: R1.i */
+    /* args: R1.i = fd, R2.i = isSock, R3.i = len, R4.p = buf */
+    ASSERT(CurrentTSO->why_blocked == NotBlocked);
+    CurrentTSO->why_blocked = BlockedOnWrite;
+    ACQUIRE_LOCK(&sched_mutex);
+    ares = (StgAsyncIOResult*)RET_STGCALL2(P_,stgMallocBytes,sizeof(StgAsyncIOResult), "asyncWritezh_fast");
+    reqID = RET_STGCALL5(W_,addIORequest,R1.i,TRUE,R2.i,R3.i,(char*)R4.p);
+    ares->reqID   = reqID;
+    ares->len     = 0;
+    ares->errCode = 0;
+    CurrentTSO->block_info.async_result = ares;
+    APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
+    RELEASE_LOCK(&sched_mutex);
+    JMP_(stg_block_async);
+  FE_
+}
+#endif
index fc7bf81..418ed6e 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: RtsStartup.c,v 1.70 2003/01/30 10:19:07 simonmar Exp $
+ * $Id: RtsStartup.c,v 1.71 2003/02/21 05:34:15 sof Exp $
  *
  * (c) The GHC Team, 1998-2002
  *
 # include "LLC.h"
 #endif
 
+#if defined(mingw32_TARGET_OS)
+#include "win32/AsyncIO.h"
+#endif
+
 #include <stdlib.h>
 
 // Flag Structure
@@ -153,6 +157,10 @@ hs_init(int *argc, char **argv[])
     initDefaultHandlers();
 #endif
  
+#if defined(mingw32_TARGET_OS)
+    startupAsyncIO();
+#endif
+
 #ifdef RTS_GTK_FRONTPANEL
     if (RtsFlags.GcFlags.frontpanel) {
        initFrontPanel();
@@ -343,6 +351,10 @@ hs_exit(void)
 #if defined(TICKY_TICKY)
     if (RtsFlags.TickyFlags.showTickyStats) PrintTickyInfo();
 #endif
+
+#if defined(mingw32_TARGET_OS)
+    shutdownAsyncIO();
+#endif
 }
 
 // Compatibility interfaces
index a2ad455..5f43ec0 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Select.c,v 1.23 2003/01/25 15:54:50 wolfgang Exp $
+ * $Id: Select.c,v 1.24 2003/02/21 05:34:16 sof Exp $
  *
  * (c) The GHC Team 1995-2002
  *
@@ -28,6 +28,7 @@
 
 # ifdef mingw32_TARGET_OS
 #  include <windows.h>
+#  include "win32/AsyncIO.h"
 # endif
 
 #include <errno.h>
@@ -235,7 +236,9 @@ awaitEvent(rtsBool wait)
 #endif
       RELEASE_LOCK(&sched_mutex);
       while (1) {
-         Sleep(0); /* don't busy wait */
+         if (!awaitRequests(wait)) {
+           Sleep(0); /* don't busy wait */
+         }
 #endif /* mingw32_TARGET_OS */
          ACQUIRE_LOCK(&sched_mutex);
 #ifdef RTS_SUPPORTS_THREADS
diff --git a/ghc/rts/win32/AsyncIO.c b/ghc/rts/win32/AsyncIO.c
new file mode 100644 (file)
index 0000000..8b15470
--- /dev/null
@@ -0,0 +1,187 @@
+/* AsyncIO.c
+ *
+ * Integrating Win32 asynchronous I/O with the GHC RTS.
+ *
+ * (c) sof, 2002-2003.
+ */
+#include "Rts.h"
+#include <windows.h>
+#include <stdio.h>
+#include "Schedule.h"
+#include "win32/AsyncIO.h"
+#include "win32/IOManager.h"
+
+/*
+ * Overview:
+ *
+ * Haskell code issue asynchronous I/O requests via the 
+ * asyncRead# and asyncWrite# primops. These cause addIORequest()
+ * to be invoked, which forwards the request to the underlying
+ * asynchronous I/O subsystem. Each request is tagged with a unique
+ * ID.
+ *
+ * addIORequest() returns this ID, so that when the blocked CH
+ * thread is added onto blocked_queue, its TSO is annotated with
+ * it. Upon completion of an I/O request, the async I/O handling
+ * code makes a back-call to signal its completion; the local
+ * onIOComplete() routine. It adds the IO request ID (along with
+ * its result data) to a queue of completed requests before returning. 
+ *
+ * The queue of completed IO request is read by the thread operating
+ * the RTS scheduler. It de-queues the CH threads corresponding
+ * to the request IDs, making them runnable again.
+ *
+ */
+
+typedef struct CompletedReq {
+  unsigned int   reqID;
+  int            len;
+  int            errCode;
+} CompletedReq;
+
+#define MAX_REQUESTS 200
+
+static CRITICAL_SECTION queue_lock;
+static HANDLE           completed_req_event;
+static CompletedReq     completedTable[MAX_REQUESTS];
+static int              completed_hw;
+static int              issued_reqs;
+
+static void
+onIOComplete(unsigned int reqID,
+            void* param STG_UNUSED,
+            int   fd STG_UNUSED,
+            int   len,
+            char* buf STG_UNUSED,
+            int   errCode)
+{
+  /* Deposit result of request in queue/table */
+  EnterCriticalSection(&queue_lock);
+  if (completed_hw == MAX_REQUESTS) {
+    /* Not likely */
+    fprintf(stderr, "Request table overflow (%d); dropping.\n", reqID);
+    fflush(stderr);
+  } else {
+#if 0
+    fprintf(stderr, "onCompl: %d %d %d %d %d\n", reqID, len, errCode, issued_reqs, completed_hw); fflush(stderr);
+#endif
+    completedTable[completed_hw].reqID   = reqID;
+    completedTable[completed_hw].len     = len;
+    completedTable[completed_hw].errCode = errCode;
+    completed_hw++;
+    issued_reqs--;
+    if (completed_hw == 1) {
+      /* The event is used to wake up the scheduler thread should it
+       * be blocked waiting for requests to complete. It reset once
+       * that thread has cleared out the request queue/table.
+       */
+      SetEvent(completed_req_event);
+    }
+  }
+  LeaveCriticalSection(&queue_lock);
+}
+
+unsigned int
+addIORequest(int   fd,
+            int   forWriting,
+            int   isSock,
+            int   len,
+            char* buf)
+{
+  EnterCriticalSection(&queue_lock);
+  issued_reqs++;
+  LeaveCriticalSection(&queue_lock);
+#if 0
+  fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
+#endif
+  return AddIORequest(fd,forWriting,isSock,len,buf,0,onIOComplete);
+}
+
+int
+startupAsyncIO()
+{
+  if (!StartIOManager()) {
+    return 0;
+  }
+  InitializeCriticalSection(&queue_lock);
+  completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL);
+  completed_hw = 0;
+  return 1;
+}
+
+void
+shutdownAsyncIO()
+{
+  CloseHandle(completed_req_event);
+  ShutdownIOManager();
+}
+
+int
+awaitRequests(rtsBool wait)
+{
+start:
+#if 0
+  fprintf(stderr, "awaitRequests: %d %d %d\n", issued_reqs, completed_hw, wait); fflush(stderr);
+#endif
+  EnterCriticalSection(&queue_lock);
+  /* Nothing immediately available & we won't wait */
+  if ((!wait && completed_hw == 0) || 
+      (issued_reqs == 0 && completed_hw == 0)) {
+    LeaveCriticalSection(&queue_lock);
+    return 0;
+  }
+  if (completed_hw == 0) {
+    /* empty table, drop lock and wait */
+    LeaveCriticalSection(&queue_lock);
+    if (wait) {
+      WaitForSingleObject( completed_req_event, INFINITE );
+    } else {
+      return 0; /* cannot happen */
+    }
+    goto start;
+  } else {
+    int i;
+    StgTSO *tso, *prev;
+    
+    for (i=0; i < completed_hw; i++) {
+      unsigned int rID = completedTable[i].reqID;
+      prev = NULL;
+      for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; tso = tso->link) {
+       switch(tso->why_blocked) {
+       case BlockedOnRead:
+       case BlockedOnWrite:
+         if (tso->block_info.async_result->reqID == rID) {
+           /* Found the thread blocked waiting on request; stodgily fill 
+            * in its result block. 
+            */
+           tso->block_info.async_result->len = completedTable[i].len;
+           tso->block_info.async_result->errCode = completedTable[i].errCode;
+
+           /* Drop the matched TSO from blocked_queue */
+           if (prev) {
+             prev->link = tso->link;
+           } else {
+             blocked_queue_hd = tso->link;
+           }
+           if (blocked_queue_tl == tso) {
+             blocked_queue_tl = prev;
+           }
+           /* Terminates the run queue + this inner for-loop. */
+           tso->link = END_TSO_QUEUE;
+           tso->why_blocked = NotBlocked;
+           PUSH_ON_RUN_QUEUE(tso);
+           break;
+         }
+         break;
+       default:
+         break;
+       }
+       prev = tso;
+      }
+    }
+    completed_hw = 0;
+    ResetEvent(completed_req_event);
+    LeaveCriticalSection(&queue_lock);
+    return 1;
+  }
+}
diff --git a/ghc/rts/win32/AsyncIO.h b/ghc/rts/win32/AsyncIO.h
new file mode 100644 (file)
index 0000000..831f792
--- /dev/null
@@ -0,0 +1,21 @@
+/* AsyncIO.h
+ *
+ * Integrating Win32 asynchronous I/O with the GHC RTS.
+ *
+ * (c) sof, 2002-2003.
+ */
+#ifndef __ASYNCHIO_H__
+#define __ASYNCHIO_H__
+extern unsigned int
+addIORequest(int   fd,
+            int   forWriting,
+            int   isSock,
+            int   len,
+            char* buf);
+
+extern int  startupAsyncIO(void);
+extern void shutdownAsyncIO(void);
+
+extern int awaitRequests(rtsBool wait);
+
+#endif /* __ASYNCHIO_H__ */
diff --git a/ghc/rts/win32/IOManager.c b/ghc/rts/win32/IOManager.c
new file mode 100644 (file)
index 0000000..f9d56c6
--- /dev/null
@@ -0,0 +1,261 @@
+/* IOManager.c
+ *
+ * Non-blocking / asynchronous I/O for Win32.
+ *
+ * (c) sof, 2002-2003.
+ */
+#include "IOManager.h"
+#include "WorkQueue.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <io.h>
+#include <winsock.h>
+#include <process.h>
+
+/*
+ * Internal state maintained by the IO manager.
+ */
+typedef struct IOManagerState {
+  CritSection      manLock;
+  WorkQueue*       workQueue;
+  int              numWorkers;
+  int              workersIdle;
+  HANDLE           hExitEvent;
+  unsigned int     requestID;
+} IOManagerState;
+
+/* ToDo: wrap up this state via a IOManager handle instead? */
+static IOManagerState* ioMan;
+
+/*
+ * The routine executed by each worker thread.
+ */
+static
+unsigned
+WINAPI
+IOWorkerProc(PVOID param)
+{
+  HANDLE  hWaits[2];
+  DWORD   rc;
+  IOManagerState* iom = (IOManagerState*)param;
+  WorkQueue* pq = iom->workQueue;
+  WorkItem*  work;
+  int        len;
+  DWORD      errCode;
+
+  hWaits[0] = (HANDLE)iom->hExitEvent;
+  hWaits[1] = GetWorkQueueHandle(pq);
+  
+  while (1) {
+    /* The error code is communicated back on completion of request; reset. */
+    errCode = 0;
+
+    EnterCriticalSection(&iom->manLock);
+    iom->workersIdle++;
+    LeaveCriticalSection(&iom->manLock);
+
+    rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
+
+    EnterCriticalSection(&iom->manLock);
+    iom->workersIdle--;
+    LeaveCriticalSection(&iom->manLock);
+    
+    if ( WAIT_OBJECT_0 == rc ) {
+      /* shutdown */
+#if 0
+      fprintf(stderr, "shutting down...\n"); fflush(stderr);
+#endif
+      return 0;
+    } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
+      /* work item available, fetch it. */
+#if 0
+      fprintf(stderr, "work available...\n"); fflush(stderr);
+#endif
+      if (FetchWork(pq,(void**)&work)) {
+        if ( work->workKind & WORKER_READ ) {
+         if ( work->workKind & WORKER_FOR_SOCKET ) {
+           len = recv(work->fd, work->buf, work->len, 0);
+           if (len == SOCKET_ERROR) {
+             errCode = WSAGetLastError();
+           }
+         } else {
+           len = read(work->fd, work->buf, work->len);
+           if (len == -1) { errCode = errno; }
+         }
+       } else if ( work->workKind & WORKER_WRITE ) {
+         if ( work->workKind & WORKER_FOR_SOCKET ) {
+           len = send(work->fd, work->buf, work->len, 0);
+           if (len == SOCKET_ERROR) {
+             errCode = WSAGetLastError();
+           }
+         } else {
+           len = write(work->fd,work->buf, work->len);
+           if (len == -1) { errCode = errno; }
+         }
+       } else if ( work->workKind & WORKER_DELAY ) {
+         /* very approximate implementation of threadDelay */
+         Sleep(work->len);
+         len = work->len;
+         errCode = 0;
+       } else {
+         fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
+         fflush(stderr);
+         continue;
+       }
+       work->onCompletion(work->requestID,
+                          work->param,
+                          work->fd,
+                          len,
+                          work->buf,
+                          errCode);
+       /* Free the WorkItem */
+       free(work);
+      } else {
+         fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
+         return 1;
+      }
+    } else {
+         fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
+         return 1;
+    }
+  }
+  return 0;
+}
+
+static 
+BOOL
+NewIOWorkerThread(IOManagerState* iom)
+{
+  return ( 0 != _beginthreadex(NULL,
+                              0,
+                              IOWorkerProc,
+                              (LPVOID)iom,
+                              0,
+                              NULL) );
+  //CreateThread( NULL, 0, IOWorkerProc, (LPVOID)iom, 0, NULL));
+}
+
+BOOL
+StartIOManager(void)
+{
+  HANDLE hExit;
+  WorkQueue* wq;
+
+  wq = NewWorkQueue();
+  if ( !wq ) return FALSE;  
+  
+  ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
+  
+  if (!ioMan) {
+    FreeWorkQueue(wq);
+    return FALSE;
+  }
+
+  /* A manual-reset event */
+  hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
+  if ( !hExit ) {
+    FreeWorkQueue(wq);
+    free(ioMan);
+    return FALSE;
+  }
+  
+  ioMan->hExitEvent = hExit;
+  InitializeCriticalSection(&ioMan->manLock);
+  ioMan->workQueue   = wq;
+  ioMan->numWorkers  = 0;
+  ioMan->workersIdle = 0;
+  ioMan->requestID   = 1;
+  return TRUE;
+}
+
+/*
+ * Function: AddIORequest()
+ *
+ * Conduit to underlying WorkQueue's SubmitWork(); adds IO
+ * request to work queue, returning without blocking.
+ */
+int
+AddIORequest ( int   fd,
+              BOOL  forWriting,
+              BOOL  isSocket,
+              int   len,
+              char* buffer,
+              void* data,
+              CompletionProc onCompletion)
+{
+  WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+  if (!ioMan || !wItem) return 0;
+  
+  /* Fill in the blanks */
+  wItem->fd           = fd;
+  wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
+                        ( forWriting ? WORKER_WRITE : WORKER_READ );
+  wItem->len          = len;
+  wItem->buf          = buffer;
+  wItem->param        = data;
+  wItem->onCompletion = onCompletion;
+  wItem->requestID    = ioMan->requestID++;
+  
+  EnterCriticalSection(&ioMan->manLock);
+  /* If there are no worker threads available, create one.
+   *
+   * If this turns out to be too aggressive a policy, refine.
+   */
+#if 0
+  fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); fflush(stderr);
+#endif
+  if ( ioMan->workersIdle == 0 ) {
+    ioMan->numWorkers++;
+    NewIOWorkerThread(ioMan);
+  }
+  LeaveCriticalSection(&ioMan->manLock);
+  
+  if (SubmitWork(ioMan->workQueue,wItem)) {
+    return wItem->requestID;
+  } else {
+    return 0;
+  }
+}             
+
+/*
+ * Function: AddDelayRequest()
+ *
+ */
+BOOL
+AddDelayRequest ( unsigned int   msecs,
+                 void*          data,
+                 CompletionProc onCompletion)
+{
+  WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+  if (!ioMan || !wItem) return FALSE;
+  
+  /* Fill in the blanks */
+  wItem->fd           = 0;
+  wItem->workKind     = WORKER_DELAY;
+  wItem->len          = msecs;
+  wItem->buf          = 0;
+  wItem->param        = data;
+  wItem->onCompletion = onCompletion;
+  wItem->requestID    = ioMan->requestID++;
+
+  EnterCriticalSection(&ioMan->manLock);
+  if ( ioMan->workersIdle == 0 ) {
+    ioMan->numWorkers++;
+    NewIOWorkerThread(ioMan);
+  }
+  LeaveCriticalSection(&ioMan->manLock);
+  
+  if (SubmitWork(ioMan->workQueue,wItem)) {
+    return wItem->requestID;
+  } else {
+    return 0;
+  }
+}
+
+void ShutdownIOManager()
+{
+  SetEvent(ioMan->hExitEvent);
+  free(ioMan);
+  ioMan = NULL;
+}
diff --git a/ghc/rts/win32/IOManager.h b/ghc/rts/win32/IOManager.h
new file mode 100644 (file)
index 0000000..3543a41
--- /dev/null
@@ -0,0 +1,85 @@
+/* IOManager.h
+ *
+ * Non-blocking / asynchronous I/O for Win32.
+ *
+ * (c) sof, 2002-2003
+ */
+#ifndef __IOMANAGER_H__
+#define __IOMANAGER_H__
+/* On the yucky side..suppress -Wmissing-declarations warnings when
+ * including <windows.h>
+ */
+extern void* GetCurrentFiber ( void );
+extern void* GetFiberData ( void );
+#include <windows.h>
+
+/*
+ The IOManager subsystem provides a non-blocking view
+ of I/O operations. It lets one (or more) OS thread(s)
+ issue multiple I/O requests, which the IOManager then 
+ handles independently of/concurrent to the thread(s)
+ that issued the request. Upon completion, the issuing
+ thread can inspect the result of the I/O operation &
+ take appropriate action.
+
+ The IOManager is intended used with the GHC RTS to
+ implement non-blocking I/O in Concurrent Haskell.
+ */
+
+/*
+ * Our WorkQueue holds WorkItems, encoding IO and
+ * delay requests.
+ *
+ */
+typedef void (*CompletionProc)(unsigned int requestID,
+                              void* param,
+                              int   fd,
+                              int   len,
+                              char* buf,
+                              int   errCode);
+
+typedef struct WorkItem {
+  unsigned int   workKind;
+  int            fd;
+  int            len;
+  char*          buf;
+  void*          param;
+  unsigned int   requestID;
+  CompletionProc onCompletion;
+} WorkItem;
+
+extern CompletionProc onComplete;
+
+/* the kind of operations supported; you could easily imagine
+ * that instead of passing a tag describing the work to be performed,
+ * a function pointer is passed instead. Maybe later.
+ */
+#define WORKER_READ       1
+#define WORKER_WRITE      2
+#define WORKER_DELAY      4
+#define WORKER_FOR_SOCKET 8
+
+/*
+ * Starting up and shutting down. 
+ */ 
+extern BOOL StartIOManager     ( void );
+extern void ShutdownIOManager  ( void );
+
+/*
+ * Adding I/O and delay requests. With each request a
+ * completion routine is supplied, which the worker thread
+ * will invoke upon completion.
+ */
+extern int AddDelayRequest ( unsigned int   msecs,
+                            void*          data,
+                            CompletionProc onCompletion);
+
+extern int AddIORequest ( int            fd,
+                         BOOL           forWriting,
+                         BOOL           isSocket,
+                         int            len,
+                         char*          buffer,
+                         void*          data,
+                         CompletionProc onCompletion);
+
+#endif /* __IOMANAGER_H__ */
diff --git a/ghc/rts/win32/WorkQueue.c b/ghc/rts/win32/WorkQueue.c
new file mode 100644 (file)
index 0000000..6d3a358
--- /dev/null
@@ -0,0 +1,215 @@
+/*
+ * A fixed-size queue; MT-friendly.
+ * 
+ * (c) sof, 2002-2003.
+ */
+#include "WorkQueue.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+static void queue_error_rc( char* loc, DWORD err);
+static void queue_error( char* loc, char* reason);
+
+
+/* Wrapper around OS call to create semaphore */
+static Semaphore
+newSemaphore(int initCount, int max)
+{
+  Semaphore s;
+  s = CreateSemaphore ( NULL,       /* LPSECURITY_ATTRIBUTES (default) */
+                       initCount,  /* LONG lInitialCount */
+                       max,        /* LONG lMaxCount */
+                       NULL);      /* LPCTSTR (anonymous / no object name) */
+  if ( NULL == s) {
+    queue_error_rc("newSemaphore", GetLastError());
+    return NULL;
+  }
+  return s;
+}
+
+/*
+ * Function: NewWorkQueue
+ *
+ * The queue constructor - semaphores are initialised to match
+ * max number of queue entries.
+ * 
+ */
+WorkQueue*
+NewWorkQueue()
+{
+  WorkQueue* wq = (WorkQueue*)malloc(sizeof(WorkQueue));
+  
+  if (!wq) {
+    queue_error("NewWorkQueue", "malloc() failed");
+    return wq;
+  }
+    
+  wq->head   = 0;
+  wq->tail   = 0;
+  
+  InitializeCriticalSection(&wq->queueLock);
+  wq->workAvailable = newSemaphore(0, WORKQUEUE_SIZE);
+  wq->roomAvailable = newSemaphore(WORKQUEUE_SIZE, WORKQUEUE_SIZE);
+  
+  /* Fail if we were unable to create any of the sync objects. */
+  if ( NULL == wq->workAvailable ||
+       NULL == wq->roomAvailable ) {
+    FreeWorkQueue(wq);
+    return NULL;
+  }
+
+  return wq;
+}
+
+void
+FreeWorkQueue ( WorkQueue* pq )
+{
+  /* Close the semaphores; any threads blocked waiting
+   * on either will as a result be woken up.
+   */ 
+  if ( pq->workAvailable ) {
+    CloseHandle(pq->workAvailable);
+  }
+  if ( pq->roomAvailable ) {
+    CloseHandle(pq->workAvailable);
+  }
+  free(pq);
+  return;
+}
+
+HANDLE
+GetWorkQueueHandle ( WorkQueue* pq )
+{
+  if (!pq) return NULL;
+  
+  return pq->workAvailable;
+}
+
+/*
+ * Function: GetWork
+ *
+ * Fetch a work item from the queue, blocking if none available.
+ * Return value indicates of FALSE indicates error/fatal condition.
+ */
+BOOL
+GetWork ( WorkQueue* pq, void** ppw )
+{
+  DWORD rc;
+
+  if (!pq) {
+    queue_error("GetWork", "NULL WorkQueue object");
+    return FALSE;
+  }
+  if (!ppw) {
+    queue_error("GetWork", "NULL WorkItem object");
+    return FALSE;
+  }
+  
+  /* Block waiting for work item to become available */
+  if ( (rc = WaitForSingleObject( pq->workAvailable, INFINITE)) != WAIT_OBJECT_0 ) {
+    queue_error_rc("GetWork.WaitForSingleObject(workAvailable)", 
+                  ( (WAIT_FAILED == rc) ? GetLastError() : rc));
+    return FALSE;
+  }
+  
+  return FetchWork(pq,ppw);
+}
+
+/*
+ * Function: FetchWork
+ *
+ * Fetch a work item from the queue, blocking if none available.
+ * Return value indicates of FALSE indicates error/fatal condition.
+ */
+BOOL
+FetchWork ( WorkQueue* pq, void** ppw )
+{
+  DWORD rc;
+
+  if (!pq) {
+    queue_error("FetchWork", "NULL WorkQueue object");
+    return FALSE;
+  }
+  if (!ppw) {
+    queue_error("FetchWork", "NULL WorkItem object");
+    return FALSE;
+  }
+  
+  EnterCriticalSection(&pq->queueLock);
+  *ppw = pq->items[pq->head];
+  /* For sanity's sake, zero out the pointer. */
+  pq->items[pq->head] = NULL;
+  pq->head = (pq->head + 1) % WORKQUEUE_SIZE;
+  rc = ReleaseSemaphore(pq->roomAvailable,1, NULL);
+  LeaveCriticalSection(&pq->queueLock);
+  if ( 0 == rc ) {
+    queue_error_rc("FetchWork.ReleaseSemaphore()", GetLastError());
+    return FALSE;
+  }
+
+  return TRUE;
+}
+
+/*
+ * Function: SubmitWork
+ *
+ * Add work item to the queue, blocking if no room not available.
+ * Return value indicates of FALSE indicates error/fatal condition.
+ */
+BOOL
+SubmitWork ( WorkQueue* pq, void* pw )
+{
+  DWORD rc;
+
+  if (!pq) {
+    queue_error("SubmitWork", "NULL WorkQueue object");
+    return FALSE;
+  }
+  if (!pw) {
+    queue_error("SubmitWork", "NULL WorkItem object");
+    return FALSE;
+  }
+  
+  /* Block waiting for work item to become available */
+  if ( (rc = WaitForSingleObject( pq->roomAvailable, INFINITE)) != WAIT_OBJECT_0 ) {
+    queue_error_rc("SubmitWork.WaitForSingleObject(workAvailable)", 
+                  ( (WAIT_FAILED == rc) ? GetLastError() : rc));
+
+    return FALSE;
+  }
+  
+  EnterCriticalSection(&pq->queueLock);
+  pq->items[pq->tail] = pw;
+  pq->tail = (pq->tail + 1) % WORKQUEUE_SIZE;
+  rc = ReleaseSemaphore(pq->workAvailable,1, NULL);
+  LeaveCriticalSection(&pq->queueLock);
+  if ( 0 == rc ) {
+    queue_error_rc("SubmitWork.ReleaseSemaphore()", GetLastError());
+    return FALSE;
+  }
+
+  return TRUE;
+}
+
+/* Error handling */
+
+static void
+queue_error_rc( char* loc,
+               DWORD err)
+{
+  fprintf(stderr, "%s failed: return code = 0x%lx\n", loc, err);
+  fflush(stderr);
+  return;
+}
+                           
+
+static void
+queue_error( char* loc,
+            char* reason)
+{
+  fprintf(stderr, "%s failed: %s\n", loc, reason);
+  fflush(stderr);
+  return;
+}
+
diff --git a/ghc/rts/win32/WorkQueue.h b/ghc/rts/win32/WorkQueue.h
new file mode 100644 (file)
index 0000000..bde82a3
--- /dev/null
@@ -0,0 +1,37 @@
+/* WorkQueue.h
+ *
+ * A fixed-size queue; MT-friendly.
+ * 
+ * (c) sof, 2002-2003
+ *
+ */
+#ifndef __WORKQUEUE_H__
+#define __WORKQUEUE_H__
+#include <windows.h>
+
+/* This is a fixed-size queue. */
+#define WORKQUEUE_SIZE 16
+
+typedef HANDLE           Semaphore;
+typedef CRITICAL_SECTION CritSection;
+
+typedef struct WorkQueue {
+    /* the master lock, need to be grabbed prior to
+       using any of the other elements of the struct. */
+  CritSection   queueLock;
+  /* consumers/workers block waiting for 'workAvailable' */
+  Semaphore     workAvailable;
+  Semaphore     roomAvailable;
+  int           head;
+  int           tail;
+  void**        items[WORKQUEUE_SIZE];
+} WorkQueue;
+
+extern WorkQueue* NewWorkQueue       ( void );
+extern void       FreeWorkQueue      ( WorkQueue* pq );
+extern HANDLE     GetWorkQueueHandle ( WorkQueue* pq );
+extern BOOL       GetWork            ( WorkQueue* pq, void** ppw );
+extern BOOL       FetchWork          ( WorkQueue* pq, void** ppw );
+extern int        SubmitWork         ( WorkQueue* pq, void*   pw );
+
+#endif /* __WORKQUEUE_H__ */