Asynchronous / non-blocking I/O for Win32 platforms.
This commit introduces a Concurrent Haskell friendly view of I/O on
Win32 platforms. Through the use of a pool of worker Win32 threads, CH
threads may issue asynchronous I/O requests without blocking the
progress of other CH threads. The issuing CH thread is blocked until
the request has been serviced though.
GHC.Conc exports the primops that take care of issuing the
asynchronous I/O requests, which the IO implementation now takes
advantage of. By default, all Handles are non-blocking/asynchronous,
but should performance become an issue, having a per-Handle flag for
turning off non-blocking could easily be imagined&introduced.
[Incidentally, this thread pool-based implementation could easily be
extended to also allow Haskell code to delegate the execution of
arbitrary pieces of (potentially blocking) external code to another OS
thread. Given how relatively gnarly the locking story has turned out
to be with the 'threaded' RTS, that may not be such a bad idea.]
-----------------------------------------------------------------------
--- $Id: primops.txt.pp,v 1.24 2003/02/04 12:40:00 simonpj Exp $
+-- $Id: primops.txt.pp,v 1.25 2003/02/21 05:34:14 sof Exp $
--
-- Primitive Operations
--
has_side_effects = True
out_of_line = True
+#ifdef mingw32_TARGET_OS
+primop AsyncReadOp "asyncRead#" GenPrimOp
+ Int# -> Int# -> Int# -> Addr# -> State# RealWorld-> (# State# RealWorld, Int#, Int# #)
+ {Asynchronously read bytes from specified file descriptor.}
+ with
+ needs_wrapper = True
+ has_side_effects = True
+ out_of_line = True
+
+primop AsyncWriteOp "asyncWrite#" GenPrimOp
+ Int# -> Int# -> Int# -> Addr# -> State# RealWorld-> (# State# RealWorld, Int#, Int# #)
+ {Asynchronously write bytes from specified file descriptor.}
+ with
+ needs_wrapper = True
+ has_side_effects = True
+ out_of_line = True
+#endif
+
------------------------------------------------------------------------
section "Concurrency primitives"
{(In a non-concurrent implementation, ThreadId\# can be as singleton
/* -----------------------------------------------------------------------------
- * $Id: PrimOps.h,v 1.99 2002/10/22 11:01:18 simonmar Exp $
+ * $Id: PrimOps.h,v 1.100 2003/02/21 05:34:15 sof Exp $
*
* (c) The GHC Team, 1998-2000
*
EXTFUN_RTS(waitReadzh_fast);
EXTFUN_RTS(waitWritezh_fast);
EXTFUN_RTS(delayzh_fast);
+#ifdef mingw32_TARGET_OS
+EXTFUN_RTS(asyncReadzh_fast);
+EXTFUN_RTS(asyncWritezh_fast);
+#endif
/* -----------------------------------------------------------------------------
/* -----------------------------------------------------------------------------
- * $Id: StgMiscClosures.h,v 1.45 2003/01/07 09:29:24 simonmar Exp $
+ * $Id: StgMiscClosures.h,v 1.46 2003/02/21 05:34:15 sof Exp $
*
* (c) The GHC Team, 1998-2002
*
EF_(stg_block_1);
EF_(stg_block_takemvar);
EF_(stg_block_putmvar);
-
+#ifdef mingw32_TARGET_OS
+EF_(stg_block_async);
+#endif
/* -----------------------------------------------------------------------------
- * $Id: TSO.h,v 1.29 2003/01/25 15:54:48 wolfgang Exp $
+ * $Id: TSO.h,v 1.30 2003/02/21 05:34:15 sof Exp $
*
* (c) The GHC Team, 1998-1999
*
#endif
} StgTSOBlockReason;
+#ifdef mingw32_TARGET_OS
+/* results from an async I/O request + it's ID. */
+typedef struct {
+ unsigned int reqID;
+ int len;
+ int errCode;
+} StgAsyncIOResult;
+#endif
+
typedef union {
StgClosure *closure;
struct StgTSO_ *tso;
int fd;
+#ifdef mingw32_TARGET_OS
+ StgAsyncIOResult* async_result;
+#endif
unsigned int target;
} StgTSOBlockInfo;
/* -----------------------------------------------------------------------------
- * $Id: HeapStackCheck.hc,v 1.27 2002/12/11 15:36:42 simonmar Exp $
+ * $Id: HeapStackCheck.hc,v 1.28 2003/02/21 05:34:15 sof Exp $
*
* (c) The GHC Team, 1998-2002
*
BLOCK_GENERIC;
FE_
}
+
+#ifdef mingw32_TARGET_OS
+INFO_TABLE_RET( stg_block_async_info, stg_block_async_ret,
+ MK_SMALL_BITMAP(0/*framesize*/, 0/*bitmap*/),
+ 0/*SRT*/, 0/*SRT_OFF*/, 0/*SRT_LEN*/,
+ RET_SMALL,, IF_, 0, 0);
+
+IF_(stg_block_async_ret)
+{
+ StgAsyncIOResult* ares;
+ int len,errC;
+ FB_
+ ares = CurrentTSO->block_info.async_result;
+ len = ares->len;
+ errC = ares->errCode;
+ CurrentTSO->block_info.async_result = NULL;
+ STGCALL1(free,ares);
+ R1.w = len;
+ *Sp = (W_)errC;
+ JMP_(ENTRY_CODE(Sp[1]));
+ FE_
+}
+
+FN_(stg_block_async)
+{
+ FB_
+ Sp -= 1;
+ Sp[0] = (W_)&stg_block_async_info;
+ BLOCK_GENERIC;
+ FE_
+}
+
+#endif
/* -----------------------------------------------------------------------------
- * $Id: Linker.c,v 1.114 2003/02/10 23:35:03 wolfgang Exp $
+ * $Id: Linker.c,v 1.115 2003/02/21 05:34:15 sof Exp $
*
* (c) The GHC Team, 2000, 2001
*
/* These are statically linked from the mingw libraries into the ghc
executable, so we have to employ this hack. */
#define RTS_MINGW_ONLY_SYMBOLS \
+ SymX(asyncReadzh_fast) \
+ SymX(asyncWritezh_fast) \
SymX(memset) \
SymX(inet_ntoa) \
SymX(inet_addr) \
# grab sources from these subdirectories
ALL_DIRS = hooks parallel
+ifeq "$(HOSTPLATFORM)" "i386-unknown-mingw32"
+ALL_DIRS += win32
+endif
+
ifneq "$(DLLized)" "YES"
EXCLUDED_SRCS += RtsDllMain.c
else
# COMPILING_RTS is only used when building Win32 DLL support.
STANDARD_OPTS += -DCOMPILING_RTS
+ifeq "$(HOSTPLATFORM)" "i386-unknown-mingw32"
+STANDARD_OPTS += -Iwin32
+endif
+
# HC_OPTS is included in both .c and .hc compilations, whereas CC_OPTS is
# only included in .c compilations. HC_OPTS included the WAY_* opts, which
# must be included in both types of compilations.
/* -----------------------------------------------------------------------------
- * $Id: PrimOps.hc,v 1.103 2002/12/11 15:36:45 simonmar Exp $
+ * $Id: PrimOps.hc,v 1.104 2003/02/21 05:34:15 sof Exp $
*
* (c) The GHC Team, 1998-2002
*
#include <stdlib.h>
+#ifdef mingw32_TARGET_OS
+#include <windows.h>
+#include "win32/AsyncIO.h"
+#endif
+
/* ** temporary **
classes CCallable and CReturnable don't really exist, but the
FE_
}
+#ifdef mingw32_TARGET_OS
+FN_(asyncReadzh_fast)
+{
+ StgAsyncIOResult* ares;
+ unsigned int reqID;
+ FB_
+ /* args: R1.i = fd, R2.i = isSock, R3.i = len, R4.p = buf */
+ ASSERT(CurrentTSO->why_blocked == NotBlocked);
+ CurrentTSO->why_blocked = BlockedOnRead;
+ ACQUIRE_LOCK(&sched_mutex);
+ /* could probably allocate this on the heap instead */
+ ares = (StgAsyncIOResult*)RET_STGCALL2(P_,stgMallocBytes,sizeof(StgAsyncIOResult), "asyncWritezh_fast");
+ reqID = RET_STGCALL5(W_,addIORequest,R1.i,FALSE,R2.i,R3.i,(char*)R4.p);
+ ares->reqID = reqID;
+ ares->len = 0;
+ ares->errCode = 0;
+ CurrentTSO->block_info.async_result = ares;
+ APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
+ RELEASE_LOCK(&sched_mutex);
+ JMP_(stg_block_async);
+ FE_
+}
+
+FN_(asyncWritezh_fast)
+{
+ StgAsyncIOResult* ares;
+ unsigned int reqID;
+ FB_
+ /* args: R1.i */
+ /* args: R1.i = fd, R2.i = isSock, R3.i = len, R4.p = buf */
+ ASSERT(CurrentTSO->why_blocked == NotBlocked);
+ CurrentTSO->why_blocked = BlockedOnWrite;
+ ACQUIRE_LOCK(&sched_mutex);
+ ares = (StgAsyncIOResult*)RET_STGCALL2(P_,stgMallocBytes,sizeof(StgAsyncIOResult), "asyncWritezh_fast");
+ reqID = RET_STGCALL5(W_,addIORequest,R1.i,TRUE,R2.i,R3.i,(char*)R4.p);
+ ares->reqID = reqID;
+ ares->len = 0;
+ ares->errCode = 0;
+ CurrentTSO->block_info.async_result = ares;
+ APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
+ RELEASE_LOCK(&sched_mutex);
+ JMP_(stg_block_async);
+ FE_
+}
+#endif
/* -----------------------------------------------------------------------------
- * $Id: RtsStartup.c,v 1.70 2003/01/30 10:19:07 simonmar Exp $
+ * $Id: RtsStartup.c,v 1.71 2003/02/21 05:34:15 sof Exp $
*
* (c) The GHC Team, 1998-2002
*
# include "LLC.h"
#endif
+#if defined(mingw32_TARGET_OS)
+#include "win32/AsyncIO.h"
+#endif
+
#include <stdlib.h>
// Flag Structure
initDefaultHandlers();
#endif
+#if defined(mingw32_TARGET_OS)
+ startupAsyncIO();
+#endif
+
#ifdef RTS_GTK_FRONTPANEL
if (RtsFlags.GcFlags.frontpanel) {
initFrontPanel();
#if defined(TICKY_TICKY)
if (RtsFlags.TickyFlags.showTickyStats) PrintTickyInfo();
#endif
+
+#if defined(mingw32_TARGET_OS)
+ shutdownAsyncIO();
+#endif
}
// Compatibility interfaces
/* -----------------------------------------------------------------------------
- * $Id: Select.c,v 1.23 2003/01/25 15:54:50 wolfgang Exp $
+ * $Id: Select.c,v 1.24 2003/02/21 05:34:16 sof Exp $
*
* (c) The GHC Team 1995-2002
*
# ifdef mingw32_TARGET_OS
# include <windows.h>
+# include "win32/AsyncIO.h"
# endif
#include <errno.h>
#endif
RELEASE_LOCK(&sched_mutex);
while (1) {
- Sleep(0); /* don't busy wait */
+ if (!awaitRequests(wait)) {
+ Sleep(0); /* don't busy wait */
+ }
#endif /* mingw32_TARGET_OS */
ACQUIRE_LOCK(&sched_mutex);
#ifdef RTS_SUPPORTS_THREADS
--- /dev/null
+/* AsyncIO.c
+ *
+ * Integrating Win32 asynchronous I/O with the GHC RTS.
+ *
+ * (c) sof, 2002-2003.
+ */
+#include "Rts.h"
+#include <windows.h>
+#include <stdio.h>
+#include "Schedule.h"
+#include "win32/AsyncIO.h"
+#include "win32/IOManager.h"
+
+/*
+ * Overview:
+ *
+ * Haskell code issue asynchronous I/O requests via the
+ * asyncRead# and asyncWrite# primops. These cause addIORequest()
+ * to be invoked, which forwards the request to the underlying
+ * asynchronous I/O subsystem. Each request is tagged with a unique
+ * ID.
+ *
+ * addIORequest() returns this ID, so that when the blocked CH
+ * thread is added onto blocked_queue, its TSO is annotated with
+ * it. Upon completion of an I/O request, the async I/O handling
+ * code makes a back-call to signal its completion; the local
+ * onIOComplete() routine. It adds the IO request ID (along with
+ * its result data) to a queue of completed requests before returning.
+ *
+ * The queue of completed IO request is read by the thread operating
+ * the RTS scheduler. It de-queues the CH threads corresponding
+ * to the request IDs, making them runnable again.
+ *
+ */
+
+typedef struct CompletedReq {
+ unsigned int reqID;
+ int len;
+ int errCode;
+} CompletedReq;
+
+#define MAX_REQUESTS 200
+
+static CRITICAL_SECTION queue_lock;
+static HANDLE completed_req_event;
+static CompletedReq completedTable[MAX_REQUESTS];
+static int completed_hw;
+static int issued_reqs;
+
+static void
+onIOComplete(unsigned int reqID,
+ void* param STG_UNUSED,
+ int fd STG_UNUSED,
+ int len,
+ char* buf STG_UNUSED,
+ int errCode)
+{
+ /* Deposit result of request in queue/table */
+ EnterCriticalSection(&queue_lock);
+ if (completed_hw == MAX_REQUESTS) {
+ /* Not likely */
+ fprintf(stderr, "Request table overflow (%d); dropping.\n", reqID);
+ fflush(stderr);
+ } else {
+#if 0
+ fprintf(stderr, "onCompl: %d %d %d %d %d\n", reqID, len, errCode, issued_reqs, completed_hw); fflush(stderr);
+#endif
+ completedTable[completed_hw].reqID = reqID;
+ completedTable[completed_hw].len = len;
+ completedTable[completed_hw].errCode = errCode;
+ completed_hw++;
+ issued_reqs--;
+ if (completed_hw == 1) {
+ /* The event is used to wake up the scheduler thread should it
+ * be blocked waiting for requests to complete. It reset once
+ * that thread has cleared out the request queue/table.
+ */
+ SetEvent(completed_req_event);
+ }
+ }
+ LeaveCriticalSection(&queue_lock);
+}
+
+unsigned int
+addIORequest(int fd,
+ int forWriting,
+ int isSock,
+ int len,
+ char* buf)
+{
+ EnterCriticalSection(&queue_lock);
+ issued_reqs++;
+ LeaveCriticalSection(&queue_lock);
+#if 0
+ fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
+#endif
+ return AddIORequest(fd,forWriting,isSock,len,buf,0,onIOComplete);
+}
+
+int
+startupAsyncIO()
+{
+ if (!StartIOManager()) {
+ return 0;
+ }
+ InitializeCriticalSection(&queue_lock);
+ completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL);
+ completed_hw = 0;
+ return 1;
+}
+
+void
+shutdownAsyncIO()
+{
+ CloseHandle(completed_req_event);
+ ShutdownIOManager();
+}
+
+int
+awaitRequests(rtsBool wait)
+{
+start:
+#if 0
+ fprintf(stderr, "awaitRequests: %d %d %d\n", issued_reqs, completed_hw, wait); fflush(stderr);
+#endif
+ EnterCriticalSection(&queue_lock);
+ /* Nothing immediately available & we won't wait */
+ if ((!wait && completed_hw == 0) ||
+ (issued_reqs == 0 && completed_hw == 0)) {
+ LeaveCriticalSection(&queue_lock);
+ return 0;
+ }
+ if (completed_hw == 0) {
+ /* empty table, drop lock and wait */
+ LeaveCriticalSection(&queue_lock);
+ if (wait) {
+ WaitForSingleObject( completed_req_event, INFINITE );
+ } else {
+ return 0; /* cannot happen */
+ }
+ goto start;
+ } else {
+ int i;
+ StgTSO *tso, *prev;
+
+ for (i=0; i < completed_hw; i++) {
+ unsigned int rID = completedTable[i].reqID;
+ prev = NULL;
+ for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; tso = tso->link) {
+ switch(tso->why_blocked) {
+ case BlockedOnRead:
+ case BlockedOnWrite:
+ if (tso->block_info.async_result->reqID == rID) {
+ /* Found the thread blocked waiting on request; stodgily fill
+ * in its result block.
+ */
+ tso->block_info.async_result->len = completedTable[i].len;
+ tso->block_info.async_result->errCode = completedTable[i].errCode;
+
+ /* Drop the matched TSO from blocked_queue */
+ if (prev) {
+ prev->link = tso->link;
+ } else {
+ blocked_queue_hd = tso->link;
+ }
+ if (blocked_queue_tl == tso) {
+ blocked_queue_tl = prev;
+ }
+ /* Terminates the run queue + this inner for-loop. */
+ tso->link = END_TSO_QUEUE;
+ tso->why_blocked = NotBlocked;
+ PUSH_ON_RUN_QUEUE(tso);
+ break;
+ }
+ break;
+ default:
+ break;
+ }
+ prev = tso;
+ }
+ }
+ completed_hw = 0;
+ ResetEvent(completed_req_event);
+ LeaveCriticalSection(&queue_lock);
+ return 1;
+ }
+}
--- /dev/null
+/* AsyncIO.h
+ *
+ * Integrating Win32 asynchronous I/O with the GHC RTS.
+ *
+ * (c) sof, 2002-2003.
+ */
+#ifndef __ASYNCHIO_H__
+#define __ASYNCHIO_H__
+extern unsigned int
+addIORequest(int fd,
+ int forWriting,
+ int isSock,
+ int len,
+ char* buf);
+
+extern int startupAsyncIO(void);
+extern void shutdownAsyncIO(void);
+
+extern int awaitRequests(rtsBool wait);
+
+#endif /* __ASYNCHIO_H__ */
--- /dev/null
+/* IOManager.c
+ *
+ * Non-blocking / asynchronous I/O for Win32.
+ *
+ * (c) sof, 2002-2003.
+ */
+#include "IOManager.h"
+#include "WorkQueue.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <io.h>
+#include <winsock.h>
+#include <process.h>
+
+/*
+ * Internal state maintained by the IO manager.
+ */
+typedef struct IOManagerState {
+ CritSection manLock;
+ WorkQueue* workQueue;
+ int numWorkers;
+ int workersIdle;
+ HANDLE hExitEvent;
+ unsigned int requestID;
+} IOManagerState;
+
+/* ToDo: wrap up this state via a IOManager handle instead? */
+static IOManagerState* ioMan;
+
+/*
+ * The routine executed by each worker thread.
+ */
+static
+unsigned
+WINAPI
+IOWorkerProc(PVOID param)
+{
+ HANDLE hWaits[2];
+ DWORD rc;
+ IOManagerState* iom = (IOManagerState*)param;
+ WorkQueue* pq = iom->workQueue;
+ WorkItem* work;
+ int len;
+ DWORD errCode;
+
+ hWaits[0] = (HANDLE)iom->hExitEvent;
+ hWaits[1] = GetWorkQueueHandle(pq);
+
+ while (1) {
+ /* The error code is communicated back on completion of request; reset. */
+ errCode = 0;
+
+ EnterCriticalSection(&iom->manLock);
+ iom->workersIdle++;
+ LeaveCriticalSection(&iom->manLock);
+
+ rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
+
+ EnterCriticalSection(&iom->manLock);
+ iom->workersIdle--;
+ LeaveCriticalSection(&iom->manLock);
+
+ if ( WAIT_OBJECT_0 == rc ) {
+ /* shutdown */
+#if 0
+ fprintf(stderr, "shutting down...\n"); fflush(stderr);
+#endif
+ return 0;
+ } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
+ /* work item available, fetch it. */
+#if 0
+ fprintf(stderr, "work available...\n"); fflush(stderr);
+#endif
+ if (FetchWork(pq,(void**)&work)) {
+ if ( work->workKind & WORKER_READ ) {
+ if ( work->workKind & WORKER_FOR_SOCKET ) {
+ len = recv(work->fd, work->buf, work->len, 0);
+ if (len == SOCKET_ERROR) {
+ errCode = WSAGetLastError();
+ }
+ } else {
+ len = read(work->fd, work->buf, work->len);
+ if (len == -1) { errCode = errno; }
+ }
+ } else if ( work->workKind & WORKER_WRITE ) {
+ if ( work->workKind & WORKER_FOR_SOCKET ) {
+ len = send(work->fd, work->buf, work->len, 0);
+ if (len == SOCKET_ERROR) {
+ errCode = WSAGetLastError();
+ }
+ } else {
+ len = write(work->fd,work->buf, work->len);
+ if (len == -1) { errCode = errno; }
+ }
+ } else if ( work->workKind & WORKER_DELAY ) {
+ /* very approximate implementation of threadDelay */
+ Sleep(work->len);
+ len = work->len;
+ errCode = 0;
+ } else {
+ fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
+ fflush(stderr);
+ continue;
+ }
+ work->onCompletion(work->requestID,
+ work->param,
+ work->fd,
+ len,
+ work->buf,
+ errCode);
+ /* Free the WorkItem */
+ free(work);
+ } else {
+ fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
+ return 1;
+ }
+ } else {
+ fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
+ return 1;
+ }
+ }
+ return 0;
+}
+
+static
+BOOL
+NewIOWorkerThread(IOManagerState* iom)
+{
+ return ( 0 != _beginthreadex(NULL,
+ 0,
+ IOWorkerProc,
+ (LPVOID)iom,
+ 0,
+ NULL) );
+ //CreateThread( NULL, 0, IOWorkerProc, (LPVOID)iom, 0, NULL));
+}
+
+BOOL
+StartIOManager(void)
+{
+ HANDLE hExit;
+ WorkQueue* wq;
+
+ wq = NewWorkQueue();
+ if ( !wq ) return FALSE;
+
+ ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
+
+ if (!ioMan) {
+ FreeWorkQueue(wq);
+ return FALSE;
+ }
+
+ /* A manual-reset event */
+ hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
+ if ( !hExit ) {
+ FreeWorkQueue(wq);
+ free(ioMan);
+ return FALSE;
+ }
+
+ ioMan->hExitEvent = hExit;
+ InitializeCriticalSection(&ioMan->manLock);
+ ioMan->workQueue = wq;
+ ioMan->numWorkers = 0;
+ ioMan->workersIdle = 0;
+ ioMan->requestID = 1;
+
+ return TRUE;
+}
+
+/*
+ * Function: AddIORequest()
+ *
+ * Conduit to underlying WorkQueue's SubmitWork(); adds IO
+ * request to work queue, returning without blocking.
+ */
+int
+AddIORequest ( int fd,
+ BOOL forWriting,
+ BOOL isSocket,
+ int len,
+ char* buffer,
+ void* data,
+ CompletionProc onCompletion)
+{
+ WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+ if (!ioMan || !wItem) return 0;
+
+ /* Fill in the blanks */
+ wItem->fd = fd;
+ wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
+ ( forWriting ? WORKER_WRITE : WORKER_READ );
+ wItem->len = len;
+ wItem->buf = buffer;
+ wItem->param = data;
+ wItem->onCompletion = onCompletion;
+ wItem->requestID = ioMan->requestID++;
+
+ EnterCriticalSection(&ioMan->manLock);
+ /* If there are no worker threads available, create one.
+ *
+ * If this turns out to be too aggressive a policy, refine.
+ */
+#if 0
+ fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); fflush(stderr);
+#endif
+ if ( ioMan->workersIdle == 0 ) {
+ ioMan->numWorkers++;
+ NewIOWorkerThread(ioMan);
+ }
+ LeaveCriticalSection(&ioMan->manLock);
+
+ if (SubmitWork(ioMan->workQueue,wItem)) {
+ return wItem->requestID;
+ } else {
+ return 0;
+ }
+}
+
+/*
+ * Function: AddDelayRequest()
+ *
+ */
+BOOL
+AddDelayRequest ( unsigned int msecs,
+ void* data,
+ CompletionProc onCompletion)
+{
+ WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+ if (!ioMan || !wItem) return FALSE;
+
+ /* Fill in the blanks */
+ wItem->fd = 0;
+ wItem->workKind = WORKER_DELAY;
+ wItem->len = msecs;
+ wItem->buf = 0;
+ wItem->param = data;
+ wItem->onCompletion = onCompletion;
+ wItem->requestID = ioMan->requestID++;
+
+ EnterCriticalSection(&ioMan->manLock);
+ if ( ioMan->workersIdle == 0 ) {
+ ioMan->numWorkers++;
+ NewIOWorkerThread(ioMan);
+ }
+ LeaveCriticalSection(&ioMan->manLock);
+
+ if (SubmitWork(ioMan->workQueue,wItem)) {
+ return wItem->requestID;
+ } else {
+ return 0;
+ }
+}
+
+void ShutdownIOManager()
+{
+ SetEvent(ioMan->hExitEvent);
+ free(ioMan);
+ ioMan = NULL;
+}
--- /dev/null
+/* IOManager.h
+ *
+ * Non-blocking / asynchronous I/O for Win32.
+ *
+ * (c) sof, 2002-2003
+ */
+#ifndef __IOMANAGER_H__
+#define __IOMANAGER_H__
+/* On the yucky side..suppress -Wmissing-declarations warnings when
+ * including <windows.h>
+ */
+extern void* GetCurrentFiber ( void );
+extern void* GetFiberData ( void );
+#include <windows.h>
+
+/*
+ The IOManager subsystem provides a non-blocking view
+ of I/O operations. It lets one (or more) OS thread(s)
+ issue multiple I/O requests, which the IOManager then
+ handles independently of/concurrent to the thread(s)
+ that issued the request. Upon completion, the issuing
+ thread can inspect the result of the I/O operation &
+ take appropriate action.
+
+ The IOManager is intended used with the GHC RTS to
+ implement non-blocking I/O in Concurrent Haskell.
+ */
+
+/*
+ * Our WorkQueue holds WorkItems, encoding IO and
+ * delay requests.
+ *
+ */
+typedef void (*CompletionProc)(unsigned int requestID,
+ void* param,
+ int fd,
+ int len,
+ char* buf,
+ int errCode);
+
+typedef struct WorkItem {
+ unsigned int workKind;
+ int fd;
+ int len;
+ char* buf;
+ void* param;
+ unsigned int requestID;
+ CompletionProc onCompletion;
+} WorkItem;
+
+extern CompletionProc onComplete;
+
+/* the kind of operations supported; you could easily imagine
+ * that instead of passing a tag describing the work to be performed,
+ * a function pointer is passed instead. Maybe later.
+ */
+#define WORKER_READ 1
+#define WORKER_WRITE 2
+#define WORKER_DELAY 4
+#define WORKER_FOR_SOCKET 8
+
+/*
+ * Starting up and shutting down.
+ */
+extern BOOL StartIOManager ( void );
+extern void ShutdownIOManager ( void );
+
+/*
+ * Adding I/O and delay requests. With each request a
+ * completion routine is supplied, which the worker thread
+ * will invoke upon completion.
+ */
+extern int AddDelayRequest ( unsigned int msecs,
+ void* data,
+ CompletionProc onCompletion);
+
+extern int AddIORequest ( int fd,
+ BOOL forWriting,
+ BOOL isSocket,
+ int len,
+ char* buffer,
+ void* data,
+ CompletionProc onCompletion);
+
+#endif /* __IOMANAGER_H__ */
--- /dev/null
+/*
+ * A fixed-size queue; MT-friendly.
+ *
+ * (c) sof, 2002-2003.
+ */
+#include "WorkQueue.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+static void queue_error_rc( char* loc, DWORD err);
+static void queue_error( char* loc, char* reason);
+
+
+/* Wrapper around OS call to create semaphore */
+static Semaphore
+newSemaphore(int initCount, int max)
+{
+ Semaphore s;
+ s = CreateSemaphore ( NULL, /* LPSECURITY_ATTRIBUTES (default) */
+ initCount, /* LONG lInitialCount */
+ max, /* LONG lMaxCount */
+ NULL); /* LPCTSTR (anonymous / no object name) */
+ if ( NULL == s) {
+ queue_error_rc("newSemaphore", GetLastError());
+ return NULL;
+ }
+ return s;
+}
+
+/*
+ * Function: NewWorkQueue
+ *
+ * The queue constructor - semaphores are initialised to match
+ * max number of queue entries.
+ *
+ */
+WorkQueue*
+NewWorkQueue()
+{
+ WorkQueue* wq = (WorkQueue*)malloc(sizeof(WorkQueue));
+
+ if (!wq) {
+ queue_error("NewWorkQueue", "malloc() failed");
+ return wq;
+ }
+
+ wq->head = 0;
+ wq->tail = 0;
+
+ InitializeCriticalSection(&wq->queueLock);
+ wq->workAvailable = newSemaphore(0, WORKQUEUE_SIZE);
+ wq->roomAvailable = newSemaphore(WORKQUEUE_SIZE, WORKQUEUE_SIZE);
+
+ /* Fail if we were unable to create any of the sync objects. */
+ if ( NULL == wq->workAvailable ||
+ NULL == wq->roomAvailable ) {
+ FreeWorkQueue(wq);
+ return NULL;
+ }
+
+ return wq;
+}
+
+void
+FreeWorkQueue ( WorkQueue* pq )
+{
+ /* Close the semaphores; any threads blocked waiting
+ * on either will as a result be woken up.
+ */
+ if ( pq->workAvailable ) {
+ CloseHandle(pq->workAvailable);
+ }
+ if ( pq->roomAvailable ) {
+ CloseHandle(pq->workAvailable);
+ }
+ free(pq);
+ return;
+}
+
+HANDLE
+GetWorkQueueHandle ( WorkQueue* pq )
+{
+ if (!pq) return NULL;
+
+ return pq->workAvailable;
+}
+
+/*
+ * Function: GetWork
+ *
+ * Fetch a work item from the queue, blocking if none available.
+ * Return value indicates of FALSE indicates error/fatal condition.
+ */
+BOOL
+GetWork ( WorkQueue* pq, void** ppw )
+{
+ DWORD rc;
+
+ if (!pq) {
+ queue_error("GetWork", "NULL WorkQueue object");
+ return FALSE;
+ }
+ if (!ppw) {
+ queue_error("GetWork", "NULL WorkItem object");
+ return FALSE;
+ }
+
+ /* Block waiting for work item to become available */
+ if ( (rc = WaitForSingleObject( pq->workAvailable, INFINITE)) != WAIT_OBJECT_0 ) {
+ queue_error_rc("GetWork.WaitForSingleObject(workAvailable)",
+ ( (WAIT_FAILED == rc) ? GetLastError() : rc));
+ return FALSE;
+ }
+
+ return FetchWork(pq,ppw);
+}
+
+/*
+ * Function: FetchWork
+ *
+ * Fetch a work item from the queue, blocking if none available.
+ * Return value indicates of FALSE indicates error/fatal condition.
+ */
+BOOL
+FetchWork ( WorkQueue* pq, void** ppw )
+{
+ DWORD rc;
+
+ if (!pq) {
+ queue_error("FetchWork", "NULL WorkQueue object");
+ return FALSE;
+ }
+ if (!ppw) {
+ queue_error("FetchWork", "NULL WorkItem object");
+ return FALSE;
+ }
+
+ EnterCriticalSection(&pq->queueLock);
+ *ppw = pq->items[pq->head];
+ /* For sanity's sake, zero out the pointer. */
+ pq->items[pq->head] = NULL;
+ pq->head = (pq->head + 1) % WORKQUEUE_SIZE;
+ rc = ReleaseSemaphore(pq->roomAvailable,1, NULL);
+ LeaveCriticalSection(&pq->queueLock);
+ if ( 0 == rc ) {
+ queue_error_rc("FetchWork.ReleaseSemaphore()", GetLastError());
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+/*
+ * Function: SubmitWork
+ *
+ * Add work item to the queue, blocking if no room not available.
+ * Return value indicates of FALSE indicates error/fatal condition.
+ */
+BOOL
+SubmitWork ( WorkQueue* pq, void* pw )
+{
+ DWORD rc;
+
+ if (!pq) {
+ queue_error("SubmitWork", "NULL WorkQueue object");
+ return FALSE;
+ }
+ if (!pw) {
+ queue_error("SubmitWork", "NULL WorkItem object");
+ return FALSE;
+ }
+
+ /* Block waiting for work item to become available */
+ if ( (rc = WaitForSingleObject( pq->roomAvailable, INFINITE)) != WAIT_OBJECT_0 ) {
+ queue_error_rc("SubmitWork.WaitForSingleObject(workAvailable)",
+ ( (WAIT_FAILED == rc) ? GetLastError() : rc));
+
+ return FALSE;
+ }
+
+ EnterCriticalSection(&pq->queueLock);
+ pq->items[pq->tail] = pw;
+ pq->tail = (pq->tail + 1) % WORKQUEUE_SIZE;
+ rc = ReleaseSemaphore(pq->workAvailable,1, NULL);
+ LeaveCriticalSection(&pq->queueLock);
+ if ( 0 == rc ) {
+ queue_error_rc("SubmitWork.ReleaseSemaphore()", GetLastError());
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+/* Error handling */
+
+static void
+queue_error_rc( char* loc,
+ DWORD err)
+{
+ fprintf(stderr, "%s failed: return code = 0x%lx\n", loc, err);
+ fflush(stderr);
+ return;
+}
+
+
+static void
+queue_error( char* loc,
+ char* reason)
+{
+ fprintf(stderr, "%s failed: %s\n", loc, reason);
+ fflush(stderr);
+ return;
+}
+
--- /dev/null
+/* WorkQueue.h
+ *
+ * A fixed-size queue; MT-friendly.
+ *
+ * (c) sof, 2002-2003
+ *
+ */
+#ifndef __WORKQUEUE_H__
+#define __WORKQUEUE_H__
+#include <windows.h>
+
+/* This is a fixed-size queue. */
+#define WORKQUEUE_SIZE 16
+
+typedef HANDLE Semaphore;
+typedef CRITICAL_SECTION CritSection;
+
+typedef struct WorkQueue {
+ /* the master lock, need to be grabbed prior to
+ using any of the other elements of the struct. */
+ CritSection queueLock;
+ /* consumers/workers block waiting for 'workAvailable' */
+ Semaphore workAvailable;
+ Semaphore roomAvailable;
+ int head;
+ int tail;
+ void** items[WORKQUEUE_SIZE];
+} WorkQueue;
+
+extern WorkQueue* NewWorkQueue ( void );
+extern void FreeWorkQueue ( WorkQueue* pq );
+extern HANDLE GetWorkQueueHandle ( WorkQueue* pq );
+extern BOOL GetWork ( WorkQueue* pq, void** ppw );
+extern BOOL FetchWork ( WorkQueue* pq, void** ppw );
+extern int SubmitWork ( WorkQueue* pq, void* pw );
+
+#endif /* __WORKQUEUE_H__ */