3 * Non-blocking / asynchronous I/O for Win32.
16 * Internal state maintained by the IO manager.
/* Bookkeeping shared by the I/O manager and its worker threads.
 * NOTE(review): only part of the definition is visible in this excerpt. */
18 typedef struct IOManagerState {
/* Request ID counter: post-incremented by AddIORequest(), AddDelayRequest()
 * and AddProcRequest() to label each new request. */
25 unsigned int requestID;
28 /* ToDo: wrap up this state via an IOManager handle instead? */
29 static IOManagerState* ioMan;
32 * The routine executed by each worker thread.
/* Worker-thread body. The param argument is the IOManagerState to serve.
 * The thread waits on two handles (the exit event of the manager and the
 * handle of its work queue). When the queue handle is signalled it fetches a
 * WorkItem, carries out the read / write / delay / procedure call the item
 * describes, and invokes the onCompletion callback of the item.
 * NOTE(review): only part of the body is visible in this excerpt. */
37 IOWorkerProc(PVOID param)
41 IOManagerState* iom = (IOManagerState*)param;
42 WorkQueue* pq = iom->workQueue;
/* Wait slot 0 is the exit event and slot 1 the work queue handle; the rc
 * comparisons further down rely on this order. */
48 hWaits[0] = (HANDLE)iom->hExitEvent;
49 hWaits[1] = GetWorkQueueHandle(pq);
52 /* The error code is communicated back on completion of request; reset. */
55 EnterCriticalSection(&iom->manLock);
56 /* Signal that the worker is idle.
58 * 'workersIdle' is used when determining whether or not to
59 * increase the worker thread pool when adding a new request.
60 * (see depositWorkItem().)
63 LeaveCriticalSection(&iom->manLock);
66 * A possible future refinement is to make long-term idle threads
67 * wake up and decide to shut down should the number of idle threads
68 * be above some threshold.
71 rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
73 if (rc == WAIT_OBJECT_0) {
74 // we received the exit event
78 EnterCriticalSection(&iom->manLock);
79 /* Signal that the thread is 'non-idle' and about to consume
84 LeaveCriticalSection(&iom->manLock);
86 if ( rc == (WAIT_OBJECT_0 + 1) ) {
87 /* work item available, fetch it. */
88 if (FetchWork(pq,(void**)&work)) {
89 if ( work->workKind & WORKER_READ ) {
90 if ( work->workKind & WORKER_FOR_SOCKET ) {
91 len = recv(work->workData.ioData.fd,
92 work->workData.ioData.buf,
93 work->workData.ioData.len,
/* Socket failures are reported through WSAGetLastError(), not errno. */
95 if (len == SOCKET_ERROR) {
96 errCode = WSAGetLastError();
101 /* Do the read(), with extra-special handling for Ctrl+C */
102 len = read(work->workData.ioData.fd,
103 work->workData.ioData.buf,
104 work->workData.ioData.len);
105 if ( len == 0 && work->workData.ioData.len != 0 ) {
106 /* Given the following scenario:
107 * - a console handler has been registered that handles Ctrl+C
109 * - we've not tweaked the 'console mode' settings to turn on
110 * ENABLE_PROCESSED_INPUT.
111 * - we're blocked waiting on input from standard input.
112 * - the user hits Ctrl+C.
114 * The OS will invoke the console handler (in a separate OS thread),
115 * and the above read() (i.e., under the hood, a ReadFile() op) returns
116 * 0, with the error set to ERROR_OPERATION_ABORTED. We don't
117 * want to percolate this non-EOF condition too far back up, but ignore
118 * it. However, we do want to give the RTS an opportunity to deliver the
121 * Hence, we set 'errorCode' to (-2), which we then look out for in
122 * GHC.Conc.asyncRead.
125 if ( dw == ERROR_OPERATION_ABORTED ) {
126 /* Only do the retry when dealing with the standard input handle. */
127 HANDLE h = (HANDLE)GetStdHandle(STD_INPUT_HANDLE);
/* NOTE(review): _get_osfhandle() returns intptr_t; casting the HANDLE to
 * long truncates it on 64-bit Windows, where long is 32 bits wide. Consider
 * comparing both sides as intptr_t. */
128 if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) {
133 if (len == -1) { errCode = errno; }
135 complData = work->workData.ioData.buf;
136 fd = work->workData.ioData.fd;
137 } else if ( work->workKind & WORKER_WRITE ) {
138 if ( work->workKind & WORKER_FOR_SOCKET ) {
139 len = send(work->workData.ioData.fd,
140 work->workData.ioData.buf,
141 work->workData.ioData.len,
143 if (len == SOCKET_ERROR) {
144 errCode = WSAGetLastError();
147 len = write(work->workData.ioData.fd,
148 work->workData.ioData.buf,
149 work->workData.ioData.len);
150 if (len == -1) { errCode = errno; }
152 complData = work->workData.ioData.buf;
153 fd = work->workData.ioData.fd;
154 } else if ( work->workKind & WORKER_DELAY ) {
155 /* Approximate implementation of threadDelay;
157 * Note: Sleep() is in milliseconds, not micros.
// NOTE(review): the field is named msecs, but dividing it by 1000 before
// Sleep() (which takes milliseconds) implies it holds microseconds -- confirm
// against the callers of AddDelayRequest().
159 Sleep(work->workData.delayData.msecs / 1000);
160 len = work->workData.delayData.msecs;
164 } else if ( work->workKind & WORKER_DO_PROC ) {
165 /* perform operation/proc on behalf of Haskell thread. */
166 if (work->workData.procData.proc) {
167 /* The procedure is assumed to encode result + success/failure
// The value returned by the procedure becomes the error code of the request.
170 errCode=work->workData.procData.proc(work->workData.procData.param);
174 complData = work->workData.procData.param;
176 fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
180 work->onCompletion(work->requestID,
185 /* Free the WorkItem */
188 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
192 fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
/* Starts a worker thread with _beginthreadex().
 * Yields nonzero when _beginthreadex() returned a nonzero value (it returns
 * 0 on failure), and zero otherwise.
 * NOTE(review): the remaining arguments of the call are not visible in this
 * excerpt. */
201 NewIOWorkerThread(IOManagerState* iom)
204 return ( 0 != _beginthreadex(NULL,
219 if ( !wq ) return FALSE;
221 ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
228 /* A manual-reset event */
229 hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
236 ioMan->hExitEvent = hExit;
237 InitializeCriticalSection(&ioMan->manLock);
238 ioMan->workQueue = wq;
239 ioMan->numWorkers = 0;
240 ioMan->workersIdle = 0;
241 ioMan->queueSize = 0;
242 ioMan->requestID = 1;
248 * Function: depositWorkItem()
250 * Local function which deposits a WorkItem onto a work queue,
251 * deciding in the process whether or not the thread pool needs
252 * to be augmented with another thread to handle the new request.
/* Queues a work item on the work queue of the manager via SubmitWork().
 * Before submitting, while holding manLock, it compares workersIdle with
 * queueSize. If fewer workers are idle than requests are queued, it releases
 * the lock, re-acquires it and re-checks; if the shortfall persists it starts
 * another worker with NewIOWorkerThread().
 * reqID is passed separately because the work item may already have been
 * consumed and freed by the time SubmitWork() returns (see the note at the
 * end of this function).
 * NOTE(review): only part of the body is visible in this excerpt. */
257 depositWorkItem( unsigned int reqID,
260 EnterCriticalSection(&ioMan->manLock);
263 fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers);
266 /* A new worker thread is created when there are fewer idle threads
267 * than non-consumed queue requests. This ensures that requests will
268 * be dealt with in a timely manner.
270 * [Long explanation of why the previous thread pool policy led to
273 * Previously, the thread pool was augmented iff no idle worker threads
274 * were available. That strategy runs the risk of repeatedly adding to
275 * the request queue without expanding the thread pool to handle this
276 * sudden spike in queued requests.
277 * [How? Assume workersIdle is 1, and addIORequest() is called. No new
278 * thread is created and the request is simply queued. If addIORequest()
279 * is called again _before the OS schedules a worker thread to pull the
280 * request off the queue_, workersIdle is still 1 and another request is
281 * simply added to the queue. Once the worker thread is run, only one
282 * request is de-queued, leaving the 2nd request in the queue]
284 * Assuming none of the queued requests take an inordinate amount of time to
285 * complete, the request queue would eventually be drained. But if that's
286 * not the case, the later requests will end up languishing in the queue
287 * indefinitely. The non-timely handling of requests may cause CH applications
288 * to misbehave / hang; bad.
292 if ( (ioMan->workersIdle < ioMan->queueSize) ) {
293 /* see if giving up our quantum ferrets out some idle threads.
295 LeaveCriticalSection(&ioMan->manLock);
297 EnterCriticalSection(&ioMan->manLock);
298 if ( (ioMan->workersIdle < ioMan->queueSize) ) {
299 /* No, go ahead and create another. */
301 LeaveCriticalSection(&ioMan->manLock);
/* NOTE(review): the result of NewIOWorkerThread() is ignored, so a failed
 * thread creation goes unnoticed here. */
302 NewIOWorkerThread(ioMan);
304 LeaveCriticalSection(&ioMan->manLock);
307 LeaveCriticalSection(&ioMan->manLock);
310 if (SubmitWork(ioMan->workQueue,wItem)) {
311 /* Note: the work item has potentially been consumed by a worker thread
312 * (and freed) at this point, so we cannot use wItem's requestID.
321 * Function: AddIORequest()
323 * Conduit to underlying WorkQueue's SubmitWork(); adds IO
324 * request to work queue, deciding whether or not to augment
325 * the thread pool in the process.
/* Builds a WorkItem describing a read or a write (chosen by forWriting) on
 * fd, flagged as a socket operation when isSocket is set, and passes it to
 * depositWorkItem(). Returns 0 when the manager is not set up or the
 * WorkItem cannot be allocated; otherwise the result of depositWorkItem().
 * NOTE(review): part of the parameter list is not visible in this excerpt. */
328 AddIORequest ( int fd,
333 CompletionProc onCompletion)
335 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
/* NOTE(review): ioMan is dereferenced on the next line, before the NULL test
 * that follows it, so that test cannot protect this access; the ID should be
 * taken only after the check. The early return also does not free wItem when
 * malloc succeeded, and requestID is bumped without manLock being taken in
 * the visible lines. */
336 unsigned int reqID = ioMan->requestID++;
337 if (!ioMan || !wItem) return 0;
339 /* Fill in the blanks */
340 wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
341 ( forWriting ? WORKER_WRITE : WORKER_READ );
342 wItem->workData.ioData.fd = fd;
343 wItem->workData.ioData.len = len;
344 wItem->workData.ioData.buf = buffer;
346 wItem->onCompletion = onCompletion;
347 wItem->requestID = reqID;
349 return depositWorkItem(reqID, wItem);
353 * Function: AddDelayRequest()
355 * Like AddIORequest(), but this time adding a delay request to
/* Builds a WORKER_DELAY WorkItem carrying the requested delay and passes it
 * to depositWorkItem(). Returns FALSE when the manager is not set up or the
 * WorkItem cannot be allocated; otherwise the result of depositWorkItem().
 * NOTE(review): the worker divides this value by 1000 before calling Sleep(),
 * so despite its name msecs appears to be in microseconds -- confirm. */
359 AddDelayRequest ( unsigned int msecs,
360 CompletionProc onCompletion)
362 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
/* NOTE(review): ioMan is dereferenced on the next line, before the NULL test
 * that follows it, so that test cannot protect this access; the ID should be
 * taken only after the check. The early return also does not free wItem when
 * malloc succeeded. */
363 unsigned int reqID = ioMan->requestID++;
364 if (!ioMan || !wItem) return FALSE;
366 /* Fill in the blanks */
367 wItem->workKind = WORKER_DELAY;
368 wItem->workData.delayData.msecs = msecs;
369 wItem->onCompletion = onCompletion;
370 wItem->requestID = reqID;
372 return depositWorkItem(reqID, wItem);
376 * Function: AddProcRequest()
378 * Add an asynchronous procedure request.
/* Builds a WORKER_DO_PROC WorkItem that stores proc and param, and passes it
 * to depositWorkItem(). Returns FALSE when the manager is not set up or the
 * WorkItem cannot be allocated; otherwise the result of depositWorkItem().
 * NOTE(review): part of the parameter list is not visible in this excerpt. */
381 AddProcRequest ( void* proc,
383 CompletionProc onCompletion)
385 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
/* NOTE(review): ioMan is dereferenced on the next line, before the NULL test
 * that follows it, so that test cannot protect this access; the ID should be
 * taken only after the check. The early return also does not free wItem when
 * malloc succeeded. */
386 unsigned int reqID = ioMan->requestID++;
387 if (!ioMan || !wItem) return FALSE;
389 /* Fill in the blanks */
390 wItem->workKind = WORKER_DO_PROC;
391 wItem->workData.procData.proc = proc;
392 wItem->workData.procData.param = param;
393 wItem->onCompletion = onCompletion;
394 wItem->requestID = reqID;
396 return depositWorkItem(reqID, wItem);
399 void ShutdownIOManager ( void )
401 SetEvent(ioMan->hExitEvent);
402 // ToDo: we can't free this now, because the worker thread(s)
403 // haven't necessarily finished with it yet. Perhaps it should
404 // have a reference count or something.