3 * Non-blocking / asynchronous I/O for Win32.
16 * Internal state maintained by the IO manager.
18 typedef struct IOManagerState {
24 unsigned int requestID;
27 /* ToDo: wrap up this state via a IOManager handle instead? */
28 static IOManagerState* ioMan;
31 * The routine executed by each worker thread.
36 IOWorkerProc(PVOID param)
40 IOManagerState* iom = (IOManagerState*)param;
41 WorkQueue* pq = iom->workQueue;
47 hWaits[0] = (HANDLE)iom->hExitEvent;
48 hWaits[1] = GetWorkQueueHandle(pq);
51 /* The error code is communicated back on completion of request; reset. */
54 EnterCriticalSection(&iom->manLock);
56 LeaveCriticalSection(&iom->manLock);
58 rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
60 EnterCriticalSection(&iom->manLock);
62 LeaveCriticalSection(&iom->manLock);
64 if ( WAIT_OBJECT_0 == rc ) {
67 fprintf(stderr, "shutting down...\n"); fflush(stderr);
70 } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
71 /* work item available, fetch it. */
73 fprintf(stderr, "work available...\n"); fflush(stderr);
75 if (FetchWork(pq,(void**)&work)) {
76 if ( work->workKind & WORKER_READ ) {
77 if ( work->workKind & WORKER_FOR_SOCKET ) {
78 len = recv(work->workData.ioData.fd,
79 work->workData.ioData.buf,
80 work->workData.ioData.len,
82 if (len == SOCKET_ERROR) {
83 errCode = WSAGetLastError();
86 len = read(work->workData.ioData.fd,
87 work->workData.ioData.buf,
88 work->workData.ioData.len);
89 if (len == -1) { errCode = errno; }
91 complData = work->workData.ioData.buf;
92 fd = work->workData.ioData.fd;
93 } else if ( work->workKind & WORKER_WRITE ) {
94 if ( work->workKind & WORKER_FOR_SOCKET ) {
95 len = send(work->workData.ioData.fd,
96 work->workData.ioData.buf,
97 work->workData.ioData.len,
99 if (len == SOCKET_ERROR) {
100 errCode = WSAGetLastError();
103 len = write(work->workData.ioData.fd,
104 work->workData.ioData.buf,
105 work->workData.ioData.len);
106 if (len == -1) { errCode = errno; }
108 complData = work->workData.ioData.buf;
109 fd = work->workData.ioData.fd;
110 } else if ( work->workKind & WORKER_DELAY ) {
111 /* very approximate implementation of threadDelay */
112 Sleep(work->workData.delayData.msecs);
113 len = work->workData.delayData.msecs;
117 } else if ( work->workKind & WORKER_DO_PROC ) {
118 /* perform operation/proc on behalf of Haskell thread. */
119 if (work->workData.procData.proc) {
120 /* The procedure is assumed to encode result + success/failure
123 work->workData.procData.proc(work->workData.procData.param);
128 complData = work->workData.procData.param;
130 fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
134 work->onCompletion(work->requestID,
139 /* Free the WorkItem */
142 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
146 fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
155 NewIOWorkerThread(IOManagerState* iom)
158 return ( 0 != _beginthreadex(NULL,
173 if ( !wq ) return FALSE;
175 ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
182 /* A manual-reset event */
183 hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
190 ioMan->hExitEvent = hExit;
191 InitializeCriticalSection(&ioMan->manLock);
192 ioMan->workQueue = wq;
193 ioMan->numWorkers = 0;
194 ioMan->workersIdle = 0;
195 ioMan->requestID = 1;
201 * Function: AddIORequest()
203 * Conduit to underlying WorkQueue's SubmitWork(); adds IO
204 * request to work queue, returning without blocking.
207 AddIORequest ( int fd,
212 CompletionProc onCompletion)
214 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
215 if (!ioMan || !wItem) return 0;
217 /* Fill in the blanks */
218 wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
219 ( forWriting ? WORKER_WRITE : WORKER_READ );
220 wItem->workData.ioData.fd = fd;
221 wItem->workData.ioData.len = len;
222 wItem->workData.ioData.buf = buffer;
224 wItem->onCompletion = onCompletion;
225 wItem->requestID = ioMan->requestID++;
227 EnterCriticalSection(&ioMan->manLock);
228 /* If there are no worker threads available, create one.
230 * If this turns out to be too aggressive a policy, refine.
233 fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); fflush(stderr);
235 if ( ioMan->workersIdle == 0 ) {
237 NewIOWorkerThread(ioMan);
239 LeaveCriticalSection(&ioMan->manLock);
241 if (SubmitWork(ioMan->workQueue,wItem)) {
242 return wItem->requestID;
249 * Function: AddDelayRequest()
253 AddDelayRequest ( unsigned int msecs,
254 CompletionProc onCompletion)
256 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
257 if (!ioMan || !wItem) return FALSE;
259 /* Fill in the blanks */
260 wItem->workKind = WORKER_DELAY;
261 wItem->workData.delayData.msecs = msecs;
262 wItem->onCompletion = onCompletion;
263 wItem->requestID = ioMan->requestID++;
265 EnterCriticalSection(&ioMan->manLock);
266 if ( ioMan->workersIdle == 0 ) {
268 NewIOWorkerThread(ioMan);
270 LeaveCriticalSection(&ioMan->manLock);
272 if (SubmitWork(ioMan->workQueue,wItem)) {
273 return wItem->requestID;
280 * Function: AddDelayRequest()
284 AddProcRequest ( void* proc,
286 CompletionProc onCompletion)
288 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
289 if (!ioMan || !wItem) return FALSE;
291 /* Fill in the blanks */
292 wItem->workKind = WORKER_DO_PROC;
293 wItem->workData.procData.proc = proc;
294 wItem->workData.procData.param = param;
295 wItem->onCompletion = onCompletion;
296 wItem->requestID = ioMan->requestID++;
298 EnterCriticalSection(&ioMan->manLock);
299 if ( ioMan->workersIdle == 0 ) {
301 NewIOWorkerThread(ioMan);
303 LeaveCriticalSection(&ioMan->manLock);
305 if (SubmitWork(ioMan->workQueue,wItem)) {
306 return wItem->requestID;
312 void ShutdownIOManager()
314 SetEvent(ioMan->hExitEvent);