3 * Non-blocking / asynchronous I/O for Win32.
16 * Internal state maintained by the IO manager.
18 typedef struct IOManagerState {
/* Counter used to label requests: it is set to 1 when the manager is
 * created, and each Add*Request routine copies the current value into
 * the new WorkItem and then post-increments it. */
24 unsigned int requestID;
27 /* ToDo: wrap up this state via an IOManager handle instead? */
28 static IOManagerState* ioMan;
31 * The routine executed by each worker thread.
/*
 * IOWorkerProc: body of an I/O worker thread. Only part of the function
 * is visible in this listing.
 *
 * param is cast to the shared IOManagerState. The worker blocks in
 * WaitForMultipleObjects() on two handles: the manager's exit event
 * (index 0) and the work queue's handle (index 1).
 *
 *   - WAIT_OBJECT_0: the exit event was signalled ("shutting down").
 *   - WAIT_OBJECT_0 + 1: a work item is fetched with FetchWork() and
 *     dispatched on the flag bits in workKind:
 *       WORKER_READ / WORKER_WRITE: recv()/send() when WORKER_FOR_SOCKET
 *         is also set, otherwise read()/write(); on failure errCode is
 *         taken from WSAGetLastError() or errno respectively.
 *       WORKER_DELAY: Sleep() for the requested number of milliseconds.
 *       WORKER_DO_PROC: call the supplied procedure with its parameter
 *         and keep its return value in errCode.
 *     The item's onCompletion callback is invoked with the request ID.
 *   - Any other wait result is reported as "waiting failed" (presumably
 *     the final else branch; the branch keyword itself is not shown).
 */
36 IOWorkerProc(PVOID param)
40 IOManagerState* iom = (IOManagerState*)param;
41 WorkQueue* pq = iom->workQueue;
47 hWaits[0] = (HANDLE)iom->hExitEvent;
48 hWaits[1] = GetWorkQueueHandle(pq);
51 /* The error code is communicated back on completion of request; reset. */
/* NOTE(review): the statement between each Enter/Leave pair below is not
 * shown in this listing; presumably it updates the idle-worker count
 * under manLock -- confirm against the full file. */
54 EnterCriticalSection(&iom->manLock);
56 LeaveCriticalSection(&iom->manLock);
/* bWaitAll is FALSE, so either handle becoming signalled wakes the
 * thread; INFINITE means there is no timeout. */
58 rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
60 EnterCriticalSection(&iom->manLock);
62 LeaveCriticalSection(&iom->manLock);
64 if ( WAIT_OBJECT_0 == rc ) {
67 fprintf(stderr, "shutting down...\n"); fflush(stderr);
70 } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
71 /* work item available, fetch it. */
73 fprintf(stderr, "work available...\n"); fflush(stderr);
75 if (FetchWork(pq,(void**)&work)) {
/* workKind is tested with '&', i.e. it is treated as a set of flag bits
 * rather than a single enumerated value. */
76 if ( work->workKind & WORKER_READ ) {
77 if ( work->workKind & WORKER_FOR_SOCKET ) {
78 len = recv(work->workData.ioData.fd,
79 work->workData.ioData.buf,
80 work->workData.ioData.len,
82 if (len == SOCKET_ERROR) {
83 errCode = WSAGetLastError();
86 len = read(work->workData.ioData.fd,
87 work->workData.ioData.buf,
88 work->workData.ioData.len);
89 if (len == -1) { errCode = errno; }
91 complData = work->workData.ioData.buf;
92 fd = work->workData.ioData.fd;
93 } else if ( work->workKind & WORKER_WRITE ) {
94 if ( work->workKind & WORKER_FOR_SOCKET ) {
95 len = send(work->workData.ioData.fd,
96 work->workData.ioData.buf,
97 work->workData.ioData.len,
99 if (len == SOCKET_ERROR) {
100 errCode = WSAGetLastError();
103 len = write(work->workData.ioData.fd,
104 work->workData.ioData.buf,
105 work->workData.ioData.len);
106 if (len == -1) { errCode = errno; }
108 complData = work->workData.ioData.buf;
109 fd = work->workData.ioData.fd;
110 } else if ( work->workKind & WORKER_DELAY ) {
111 /* very approximate implementation of threadDelay */
/* NOTE(review): Sleep() blocks this worker thread for the whole delay,
 * so it cannot service other queued requests in the meantime. */
112 Sleep(work->workData.delayData.msecs);
113 len = work->workData.delayData.msecs;
117 } else if ( work->workKind & WORKER_DO_PROC ) {
118 /* perform operation/proc on behalf of Haskell thread. */
119 if (work->workData.procData.proc) {
120 /* The procedure is assumed to encode result + success/failure
123 errCode=work->workData.procData.proc(work->workData.procData.param);
127 complData = work->workData.procData.param;
129 fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
133 work->onCompletion(work->requestID,
138 /* Free the WorkItem */
141 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
145 fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
/*
 * NewIOWorkerThread: start one more worker thread for the given manager.
 *
 * Returns non-zero when _beginthreadex() returned a non-zero value,
 * i.e. when the thread was created; _beginthreadex() returns 0 on
 * failure. The arguments to the call are not shown in this listing;
 * presumably the thread runs IOWorkerProc with iom as its parameter --
 * confirm against the full file.
 *
 * NOTE(review): the value returned by _beginthreadex() is a thread
 * handle that the caller owns. Here it is only compared against 0 and
 * then discarded, so it is never passed to CloseHandle().
 */
154 NewIOWorkerThread(IOManagerState* iom)
157 return ( 0 != _beginthreadex(NULL,
172 if ( !wq ) return FALSE;
174 ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
181 /* A manual-reset event */
182 hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
189 ioMan->hExitEvent = hExit;
190 InitializeCriticalSection(&ioMan->manLock);
191 ioMan->workQueue = wq;
192 ioMan->numWorkers = 0;
193 ioMan->workersIdle = 0;
194 ioMan->requestID = 1;
200 * Function: AddIORequest()
202 * Conduit to underlying WorkQueue's SubmitWork(); adds IO
203 * request to work queue, returning without blocking.
/*
 * AddIORequest: queue an asynchronous read or write on fd.
 * Only part of the parameter list and body is visible in this listing.
 *
 * Builds a heap-allocated WorkItem whose kind is WORKER_WRITE or
 * WORKER_READ (chosen by forWriting), OR-ed with WORKER_FOR_SOCKET when
 * isSocket is set; records fd, len, buffer and the completion callback;
 * then hands the item to SubmitWork(). When no worker is idle, a new
 * worker thread is started before the item is submitted.
 *
 * Returns the request ID stored in the item when SubmitWork() succeeds,
 * and 0 when the manager has not been created or the allocation fails.
 *
 * NOTE(review): the WorkItem is allocated before ioMan is checked, so
 * when ioMan is NULL and malloc() succeeded the early return leaks it.
 * NOTE(review): wItem->requestID is read after SubmitWork() returns.
 * The worker has a "Free the WorkItem" step after completion, so the
 * item may already have been released by then -- confirm, and consider
 * copying the ID into a local before submitting.
 */
206 AddIORequest ( int fd,
211 CompletionProc onCompletion)
213 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
214 if (!ioMan || !wItem) return 0;
216 /* Fill in the blanks */
217 wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
218 ( forWriting ? WORKER_WRITE : WORKER_READ );
219 wItem->workData.ioData.fd = fd;
220 wItem->workData.ioData.len = len;
221 wItem->workData.ioData.buf = buffer;
223 wItem->onCompletion = onCompletion;
/* NOTE(review): this read-modify-write of the shared counter happens
 * before manLock is taken; if callers can run on more than one thread,
 * two requests could receive the same ID -- confirm the threading model. */
224 wItem->requestID = ioMan->requestID++;
/* manLock covers the inspection of workersIdle; it is released on both
 * branches before a thread is created or the item is submitted. */
226 EnterCriticalSection(&ioMan->manLock);
227 /* If there are no worker threads available, create one.
229 * If this turns out to be too aggressive a policy, refine.
232 fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); fflush(stderr);
234 if ( ioMan->workersIdle == 0 ) {
236 LeaveCriticalSection(&ioMan->manLock);
237 NewIOWorkerThread(ioMan);
239 LeaveCriticalSection(&ioMan->manLock);
242 if (SubmitWork(ioMan->workQueue,wItem)) {
243 return wItem->requestID;
250 * Function: AddDelayRequest()
/*
 * AddDelayRequest: queue a request that completes after roughly msecs
 * milliseconds.
 *
 * Builds a heap-allocated WorkItem of kind WORKER_DELAY carrying msecs
 * and the completion callback, starts a new worker thread when none is
 * idle, and hands the item to SubmitWork().
 *
 * Returns the request ID stored in the item when SubmitWork() succeeds,
 * and FALSE when the manager has not been created or the allocation
 * fails.
 *
 * NOTE(review): the WorkItem is allocated before ioMan is checked, so
 * when ioMan is NULL and malloc() succeeded the early return leaks it.
 * NOTE(review): wItem->requestID is read after SubmitWork() returns, by
 * which time a worker may already own (and possibly have freed) the
 * item -- confirm.
 */
254 AddDelayRequest ( unsigned int msecs,
255 CompletionProc onCompletion)
257 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
258 if (!ioMan || !wItem) return FALSE;
260 /* Fill in the blanks */
261 wItem->workKind = WORKER_DELAY;
262 wItem->workData.delayData.msecs = msecs;
263 wItem->onCompletion = onCompletion;
/* NOTE(review): the shared counter is incremented before manLock is
 * taken -- confirm that callers are serialised by other means. */
264 wItem->requestID = ioMan->requestID++;
/* manLock covers the inspection of workersIdle; it is released on both
 * branches before a thread is created or the item is submitted. */
266 EnterCriticalSection(&ioMan->manLock);
268 fprintf(stderr, "AddDelayRequest: %d\n", ioMan->workersIdle); fflush(stderr);
270 if ( ioMan->workersIdle == 0 ) {
272 LeaveCriticalSection(&ioMan->manLock);
273 NewIOWorkerThread(ioMan);
275 LeaveCriticalSection(&ioMan->manLock);
278 if (SubmitWork(ioMan->workQueue,wItem)) {
279 return wItem->requestID;
286 * Function: AddProcRequest()
/*
 * AddProcRequest: queue a request that runs a caller-supplied procedure
 * on a worker thread. Only part of the parameter list is visible in
 * this listing.
 *
 * Builds a heap-allocated WorkItem of kind WORKER_DO_PROC carrying proc,
 * param and the completion callback, starts a new worker thread when
 * none is idle, and hands the item to SubmitWork().
 *
 * Returns the request ID stored in the item when SubmitWork() succeeds,
 * and FALSE when the manager has not been created or the allocation
 * fails.
 *
 * NOTE(review): proc arrives as a void* but is later invoked as a
 * function by the worker; converting between object and function
 * pointers is a compiler extension rather than ISO C.
 * NOTE(review): the WorkItem is allocated before ioMan is checked, so
 * when ioMan is NULL and malloc() succeeded the early return leaks it.
 * NOTE(review): wItem->requestID is read after SubmitWork() returns, by
 * which time a worker may already own (and possibly have freed) the
 * item -- confirm.
 */
290 AddProcRequest ( void* proc,
292 CompletionProc onCompletion)
294 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
295 if (!ioMan || !wItem) return FALSE;
297 /* Fill in the blanks */
298 wItem->workKind = WORKER_DO_PROC;
299 wItem->workData.procData.proc = proc;
300 wItem->workData.procData.param = param;
301 wItem->onCompletion = onCompletion;
/* NOTE(review): the shared counter is incremented before manLock is
 * taken -- confirm that callers are serialised by other means. */
302 wItem->requestID = ioMan->requestID++;
/* manLock covers the inspection of workersIdle; it is released on both
 * branches before a thread is created or the item is submitted. */
304 EnterCriticalSection(&ioMan->manLock);
306 fprintf(stderr, "AddProcRequest: %d\n", ioMan->workersIdle); fflush(stderr);
308 if ( ioMan->workersIdle == 0 ) {
310 LeaveCriticalSection(&ioMan->manLock);
311 NewIOWorkerThread(ioMan);
313 LeaveCriticalSection(&ioMan->manLock);
316 if (SubmitWork(ioMan->workQueue,wItem)) {
317 return wItem->requestID;
/*
 * ShutdownIOManager: ask the worker threads to stop by signalling the
 * manager's exit event, which every worker includes in its wait set.
 *
 * NOTE(review): ioMan is dereferenced without a NULL check, unlike the
 * Add*Request routines; calling this before the manager has been
 * created would crash. The visible lines show no wait for the workers
 * to finish and no release of the manager state -- confirm whether that
 * happens elsewhere.
 */
323 void ShutdownIOManager()
325 SetEvent(ioMan->hExitEvent);