3 * Non-blocking / asynchronous I/O for Win32.
16 * Internal state maintained by the IO manager.
18 typedef struct IOManagerState {
25 unsigned int requestID;
28 /* ToDo: wrap up this state via an IOManager handle instead? */
29 static IOManagerState* ioMan;
32 * The routine executed by each worker thread.
37 IOWorkerProc(PVOID param)
41 IOManagerState* iom = (IOManagerState*)param;
42 WorkQueue* pq = iom->workQueue;
48 hWaits[0] = (HANDLE)iom->hExitEvent;
49 hWaits[1] = GetWorkQueueHandle(pq);
52 /* The error code is communicated back on completion of request; reset. */
55 EnterCriticalSection(&iom->manLock);
56 /* Signal that the worker is idle.
58 * 'workersIdle' is used when determining whether or not to
59 * increase the worker thread pool when adding a new request.
60 * (see addIORequest().)
63 LeaveCriticalSection(&iom->manLock);
66 * A possible future refinement is to make long-term idle threads
67 * wake up and decide to shut down should the number of idle threads
68 * be above some threshold.
71 rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
73 if (rc == WAIT_OBJECT_0) {
74 // we received the exit event
78 EnterCriticalSection(&iom->manLock);
79 /* Signal that the thread is 'non-idle' and about to consume
84 LeaveCriticalSection(&iom->manLock);
86 if ( rc == (WAIT_OBJECT_0 + 1) ) {
87 /* work item available, fetch it. */
88 if (FetchWork(pq,(void**)&work)) {
89 if ( work->workKind & WORKER_READ ) {
90 if ( work->workKind & WORKER_FOR_SOCKET ) {
91 len = recv(work->workData.ioData.fd,
92 work->workData.ioData.buf,
93 work->workData.ioData.len,
95 if (len == SOCKET_ERROR) {
96 errCode = WSAGetLastError();
102 /* Do the read(), with extra-special handling for Ctrl+C */
103 len = read(work->workData.ioData.fd,
104 work->workData.ioData.buf,
105 work->workData.ioData.len);
107 if ( len == 0 && work->workData.ioData.len != 0 ) {
108 /* Given the following scenario:
109 * - a console handler has been registered that handles Ctrl+C
111 * - we've not tweaked the 'console mode' settings to turn on
112 * ENABLE_PROCESSED_INPUT.
113 * - we're blocked waiting on input from standard input.
114 * - the user hits Ctrl+C.
116 * The OS will invoke the console handler (in a separate OS thread),
117 * and the above read() (i.e., under the hood, a ReadFile() op) returns
118 * 0, with the error set to ERROR_OPERATION_ABORTED. We don't
119 * want to percolate this non-EOF condition too far back up, but ignore
122 * However, we do want to give the RTS an opportunity to deliver the
123 * console event. Take care of this in the low-level console handler
124 * in ConsoleHandler.c which wakes up the RTS thread that's blocked
125 * waiting for I/O results from this worker (and possibly others).
126 * It won't see any I/O, but notices and dispatches the queued up
127 * signals/console events while in the Scheduler.
129 * The original, and way hackier scheme, was to have the worker
130 * return a special return code representing aborted-due-to-ctrl-C-on-stdin,
131 * which GHC.Conc.asyncRead would look out for and retry the I/O
132 * call if encountered.
134 if ( dw == ERROR_OPERATION_ABORTED ) {
135 /* Only do the retry when dealing with the standard input handle. */
136 HANDLE h = (HANDLE)GetStdHandle(STD_INPUT_HANDLE);
137 if ( (HANDLE)_get_osfhandle(work->workData.ioData.fd) == h ) { /* compare as HANDLEs: squeezing h through (long) can truncate it where pointers are wider than long (e.g. 64-bit Windows) */
149 if (len == -1) { errCode = errno; }
151 complData = work->workData.ioData.buf;
152 fd = work->workData.ioData.fd;
153 } else if ( work->workKind & WORKER_WRITE ) {
154 if ( work->workKind & WORKER_FOR_SOCKET ) {
155 len = send(work->workData.ioData.fd,
156 work->workData.ioData.buf,
157 work->workData.ioData.len,
159 if (len == SOCKET_ERROR) {
160 errCode = WSAGetLastError();
163 len = write(work->workData.ioData.fd,
164 work->workData.ioData.buf,
165 work->workData.ioData.len);
166 if (len == -1) { errCode = errno; }
168 complData = work->workData.ioData.buf;
169 fd = work->workData.ioData.fd;
170 } else if ( work->workKind & WORKER_DELAY ) {
171 /* Approximate implementation of threadDelay;
173 * Note: Sleep() is in milliseconds, not micros.
175 Sleep(work->workData.delayData.msecs / 1000 + (work->workData.delayData.msecs % 1000 != 0)); /* round up so the delay is never cut short (plain division turned sub-millisecond requests into Sleep(0)); presumably 'msecs' holds microseconds, hence the /1000 -- TODO confirm */
176 len = work->workData.delayData.msecs;
180 } else if ( work->workKind & WORKER_DO_PROC ) {
181 /* perform operation/proc on behalf of Haskell thread. */
182 if (work->workData.procData.proc) {
183 /* The procedure is assumed to encode result + success/failure
186 errCode=work->workData.procData.proc(work->workData.procData.param);
190 complData = work->workData.procData.param;
192 fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
196 work->onCompletion(work->requestID,
201 /* Free the WorkItem */
204 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
208 fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
217 NewIOWorkerThread(IOManagerState* iom)
220 return ( 0 != _beginthreadex(NULL,
235 if ( !wq ) return FALSE;
237 ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
244 /* A manual-reset event */
245 hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
252 ioMan->hExitEvent = hExit;
253 InitializeCriticalSection(&ioMan->manLock);
254 ioMan->workQueue = wq;
255 ioMan->numWorkers = 0;
256 ioMan->workersIdle = 0;
257 ioMan->queueSize = 0;
258 ioMan->requestID = 1;
264 * Function: depositWorkItem()
266 * Local function which deposits a WorkItem onto a work queue,
267 * deciding in the process whether or not the thread pool needs
268 * to be augmented with another thread to handle the new request.
273 depositWorkItem( unsigned int reqID,
276 EnterCriticalSection(&ioMan->manLock);
279 fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers);
282 /* A new worker thread is created when there are fewer idle threads
283 * than non-consumed queue requests. This ensures that requests will
284 * be dealt with in a timely manner.
286 * [Long explanation of why the previous thread pool policy led to
289 * Previously, the thread pool was augmented iff no idle worker threads
290 * were available. That strategy runs the risk of repeatedly adding to
291 * the request queue without expanding the thread pool to handle this
292 * sudden spike in queued requests.
293 * [How? Assume workersIdle is 1, and addIORequest() is called. No new
294 * thread is created and the request is simply queued. If addIORequest()
295 * is called again _before the OS schedules a worker thread to pull the
296 * request off the queue_, workersIdle is still 1 and another request is
297 * simply added to the queue. Once the worker thread is run, only one
298 * request is de-queued, leaving the 2nd request in the queue]
300 * Assuming none of the queued requests take an inordinate amount of time to
301 * complete, the request queue would eventually be drained. But if that's
302 * not the case, the later requests will end up languishing in the queue
303 * indefinitely. The non-timely handling of requests may cause CH applications
304 * to misbehave / hang; bad.
308 if ( (ioMan->workersIdle < ioMan->queueSize) ) {
309 /* see if giving up our quantum ferrets out some idle threads.
311 LeaveCriticalSection(&ioMan->manLock);
313 EnterCriticalSection(&ioMan->manLock);
314 if ( (ioMan->workersIdle < ioMan->queueSize) ) {
315 /* No, go ahead and create another. */
317 LeaveCriticalSection(&ioMan->manLock);
318 NewIOWorkerThread(ioMan);
320 LeaveCriticalSection(&ioMan->manLock);
323 LeaveCriticalSection(&ioMan->manLock);
326 if (SubmitWork(ioMan->workQueue,wItem)) {
327 /* Note: the work item has potentially been consumed by a worker thread
328 * (and freed) at this point, so we cannot use wItem's requestID.
337 * Function: AddIORequest()
339 * Conduit to underlying WorkQueue's SubmitWork(); adds IO
340 * request to work queue, deciding whether or not to augment
341 * the thread pool in the process.
344 AddIORequest ( int fd,
349 CompletionProc onCompletion)
351 WorkItem* wItem = ioMan ? (WorkItem*)malloc(sizeof(WorkItem)) : NULL; /* no manager => allocate nothing, so nothing can leak */
352 unsigned int reqID = wItem ? ioMan->requestID++ : 0; /* only touch ioMan once it is known to be non-NULL; an ID is consumed only for a request that will be queued */
353 if (!wItem) return 0; /* covers both "manager not started" and allocation failure */
355 /* Fill in the blanks */
356 wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
357 ( forWriting ? WORKER_WRITE : WORKER_READ );
358 wItem->workData.ioData.fd = fd;
359 wItem->workData.ioData.len = len;
360 wItem->workData.ioData.buf = buffer;
362 wItem->onCompletion = onCompletion;
363 wItem->requestID = reqID;
365 return depositWorkItem(reqID, wItem);
369 * Function: AddDelayRequest()
371 * Like AddIORequest(), but this time adding a delay request to
375 AddDelayRequest ( unsigned int msecs,
376 CompletionProc onCompletion)
378 WorkItem* wItem = ioMan ? (WorkItem*)malloc(sizeof(WorkItem)) : NULL; /* no manager => allocate nothing, so nothing can leak */
379 unsigned int reqID = wItem ? ioMan->requestID++ : 0; /* only touch ioMan once it is known to be non-NULL; an ID is consumed only for a request that will be queued */
380 if (!wItem) return FALSE; /* covers both "manager not started" and allocation failure */
382 /* Fill in the blanks */
383 wItem->workKind = WORKER_DELAY;
384 wItem->workData.delayData.msecs = msecs;
385 wItem->onCompletion = onCompletion;
386 wItem->requestID = reqID;
388 return depositWorkItem(reqID, wItem);
392 * Function: AddProcRequest()
394 * Add an asynchronous procedure request.
397 AddProcRequest ( void* proc,
399 CompletionProc onCompletion)
401 WorkItem* wItem = ioMan ? (WorkItem*)malloc(sizeof(WorkItem)) : NULL; /* no manager => allocate nothing, so nothing can leak */
402 unsigned int reqID = wItem ? ioMan->requestID++ : 0; /* only touch ioMan once it is known to be non-NULL; an ID is consumed only for a request that will be queued */
403 if (!wItem) return FALSE; /* covers both "manager not started" and allocation failure */
405 /* Fill in the blanks */
406 wItem->workKind = WORKER_DO_PROC;
407 wItem->workData.procData.proc = proc;
408 wItem->workData.procData.param = param;
409 wItem->onCompletion = onCompletion;
410 wItem->requestID = reqID;
412 return depositWorkItem(reqID, wItem);
415 void ShutdownIOManager ( void )
417 if (ioMan) SetEvent(ioMan->hExitEvent); /* signal the exit event the workers wait on; no-op when the manager was never started (ioMan == NULL), matching the checks in the Add*Request entry points */
418 // ToDo: we can't free this now, because the worker thread(s)
419 // haven't necessarily finished with it yet. Perhaps it should
420 // have a reference count or something.