3 * Non-blocking / asynchronous I/O for Win32.
16 * Internal state maintained by the IO manager.
18 typedef struct IOManagerState {
25 unsigned int requestID;
28 /* ToDo: wrap up this state via an IOManager handle instead? */
29 static IOManagerState* ioMan;
32 * The routine executed by each worker thread.
37 IOWorkerProc(PVOID param)
41 IOManagerState* iom = (IOManagerState*)param;
42 WorkQueue* pq = iom->workQueue;
48 hWaits[0] = (HANDLE)iom->hExitEvent;
49 hWaits[1] = GetWorkQueueHandle(pq);
52 /* The error code is communicated back on completion of request; reset. */
55 EnterCriticalSection(&iom->manLock);
56 /* Signal that the worker is idle.
58 * 'workersIdle' is used when determining whether or not to
59 * increase the worker thread pool when adding a new request.
60 * (see depositWorkItem().)
63 LeaveCriticalSection(&iom->manLock);
66 * A possible future refinement is to make long-term idle threads
67 * wake up and decide to shut down should the number of idle threads
68 * be above some threshold.
71 rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
73 if (rc == WAIT_OBJECT_0) {
74 // we received the exit event
78 EnterCriticalSection(&iom->manLock);
79 /* Signal that the thread is 'non-idle' and about to consume
84 LeaveCriticalSection(&iom->manLock);
86 if ( rc == (WAIT_OBJECT_0 + 1) ) {
87 /* work item available, fetch it. */
88 if (FetchWork(pq,(void**)&work)) {
89 if ( work->workKind & WORKER_READ ) {
90 if ( work->workKind & WORKER_FOR_SOCKET ) {
91 len = recv(work->workData.ioData.fd,
92 work->workData.ioData.buf,
93 work->workData.ioData.len,
95 if (len == SOCKET_ERROR) {
96 errCode = WSAGetLastError();
99 len = read(work->workData.ioData.fd,
100 work->workData.ioData.buf,
101 work->workData.ioData.len);
102 if (len == -1) { errCode = errno; }
104 complData = work->workData.ioData.buf;
105 fd = work->workData.ioData.fd;
106 } else if ( work->workKind & WORKER_WRITE ) {
107 if ( work->workKind & WORKER_FOR_SOCKET ) {
108 len = send(work->workData.ioData.fd,
109 work->workData.ioData.buf,
110 work->workData.ioData.len,
112 if (len == SOCKET_ERROR) {
113 errCode = WSAGetLastError();
116 len = write(work->workData.ioData.fd,
117 work->workData.ioData.buf,
118 work->workData.ioData.len);
119 if (len == -1) { errCode = errno; }
121 complData = work->workData.ioData.buf;
122 fd = work->workData.ioData.fd;
123 } else if ( work->workKind & WORKER_DELAY ) {
124 /* Approximate implementation of threadDelay;
126 * Note: Sleep() takes milliseconds, not microseconds.
128 Sleep(work->workData.delayData.msecs / 1000);
129 len = work->workData.delayData.msecs;
133 } else if ( work->workKind & WORKER_DO_PROC ) {
134 /* perform operation/proc on behalf of Haskell thread. */
135 if (work->workData.procData.proc) {
136 /* The procedure is assumed to encode result + success/failure
139 errCode=work->workData.procData.proc(work->workData.procData.param);
143 complData = work->workData.procData.param;
145 fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
149 work->onCompletion(work->requestID,
154 /* Free the WorkItem */
157 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
161 fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
170 NewIOWorkerThread(IOManagerState* iom)
173 return ( 0 != _beginthreadex(NULL,
188 if ( !wq ) return FALSE;
190 ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
197 /* A manual-reset event */
198 hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
205 ioMan->hExitEvent = hExit;
206 InitializeCriticalSection(&ioMan->manLock);
207 ioMan->workQueue = wq;
208 ioMan->numWorkers = 0;
209 ioMan->workersIdle = 0;
210 ioMan->queueSize = 0;
211 ioMan->requestID = 1;
217 * Function: depositWorkItem()
219 * Local function which deposits a WorkItem onto a work queue,
220 * deciding in the process whether or not the thread pool needs
221 * to be augmented with another thread to handle the new request.
226 depositWorkItem( unsigned int reqID,
229 EnterCriticalSection(&ioMan->manLock);
232 fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers);
235 /* A new worker thread is created when there are fewer idle threads
236 * than non-consumed queue requests. This ensures that requests will
237 * be dealt with in a timely manner.
239 * [Long explanation of why the previous thread pool policy led to
242 * Previously, the thread pool was augmented iff no idle worker threads
243 * were available. That strategy runs the risk of repeatedly adding to
244 * the request queue without expanding the thread pool to handle this
245 * sudden spike in queued requests.
246 * [How? Assume workersIdle is 1, and AddIORequest() is called. No new
247 * thread is created and the request is simply queued. If AddIORequest()
248 * is called again _before the OS schedules a worker thread to pull the
249 * request off the queue_, workersIdle is still 1 and another request is
250 * simply added to the queue. Once the worker thread is run, only one
251 * request is de-queued, leaving the 2nd request in the queue]
253 * Assuming none of the queued requests take an inordinate amount of time to
254 * complete, the request queue would eventually be drained. But if that's
255 * not the case, the later requests will end up languishing in the queue
256 * indefinitely. The non-timely handling of requests may cause CH applications
257 * to misbehave / hang; bad.
261 if ( (ioMan->workersIdle < ioMan->queueSize) ) {
262 /* see if giving up our quantum ferrets out some idle threads.
264 LeaveCriticalSection(&ioMan->manLock);
266 EnterCriticalSection(&ioMan->manLock);
267 if ( (ioMan->workersIdle < ioMan->queueSize) ) {
268 /* No, go ahead and create another. */
270 LeaveCriticalSection(&ioMan->manLock);
271 NewIOWorkerThread(ioMan);
273 LeaveCriticalSection(&ioMan->manLock);
276 LeaveCriticalSection(&ioMan->manLock);
279 if (SubmitWork(ioMan->workQueue,wItem)) {
280 /* Note: the work item has potentially been consumed by a worker thread
281 * (and freed) at this point, so we cannot use wItem's requestID.
290 * Function: AddIORequest()
292 * Conduit to underlying WorkQueue's SubmitWork(); adds IO
293 * request to work queue, deciding whether or not to augment
294 * the thread pool in the process.
297 AddIORequest ( int fd,
302 CompletionProc onCompletion)
304 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
305 unsigned int reqID = ioMan->requestID++;
306 if (!ioMan || !wItem) return 0;
308 /* Fill in the blanks */
309 wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
310 ( forWriting ? WORKER_WRITE : WORKER_READ );
311 wItem->workData.ioData.fd = fd;
312 wItem->workData.ioData.len = len;
313 wItem->workData.ioData.buf = buffer;
315 wItem->onCompletion = onCompletion;
316 wItem->requestID = reqID;
318 return depositWorkItem(reqID, wItem);
322 * Function: AddDelayRequest()
324 * Like AddIORequest(), but this time adding a delay request to
328 AddDelayRequest ( unsigned int msecs,
329 CompletionProc onCompletion)
331 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
332 unsigned int reqID = ioMan->requestID++;
333 if (!ioMan || !wItem) return FALSE;
335 /* Fill in the blanks */
336 wItem->workKind = WORKER_DELAY;
337 wItem->workData.delayData.msecs = msecs;
338 wItem->onCompletion = onCompletion;
339 wItem->requestID = reqID;
341 return depositWorkItem(reqID, wItem);
345 * Function: AddProcRequest()
347 * Add an asynchronous procedure request.
350 AddProcRequest ( void* proc,
352 CompletionProc onCompletion)
354 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
355 unsigned int reqID = ioMan->requestID++;
356 if (!ioMan || !wItem) return FALSE;
358 /* Fill in the blanks */
359 wItem->workKind = WORKER_DO_PROC;
360 wItem->workData.procData.proc = proc;
361 wItem->workData.procData.param = param;
362 wItem->onCompletion = onCompletion;
363 wItem->requestID = reqID;
365 return depositWorkItem(reqID, wItem);
368 void ShutdownIOManager ( void )
370 SetEvent(ioMan->hExitEvent);
371 // ToDo: we can't free this now, because the worker thread(s)
372 // haven't necessarily finished with it yet. Perhaps it should
373 // have a reference count or something.