3 * Non-blocking / asynchronous I/O for Win32.
10 #include "ConsoleHandler.h"
18 * Internal state maintained by the IO manager.
20 typedef struct IOManagerState {
27 unsigned int requestID;
28 /* fields for keeping track of active WorkItems */
29 CritSection active_work_lock;
30 WorkItem* active_work_items;
33 /* ToDo: wrap up this state via a IOManager handle instead? */
34 static IOManagerState* ioMan;
36 static void RegisterWorkItem ( IOManagerState* iom, WorkItem* wi);
37 static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi);
40 * The routine executed by each worker thread.
45 IOWorkerProc(PVOID param)
49 IOManagerState* iom = (IOManagerState*)param;
50 WorkQueue* pq = iom->workQueue;
56 hWaits[0] = (HANDLE)iom->hExitEvent;
57 hWaits[1] = GetWorkQueueHandle(pq);
60 /* The error code is communicated back on completion of request; reset. */
63 EnterCriticalSection(&iom->manLock);
64 /* Signal that the worker is idle.
66 * 'workersIdle' is used when determining whether or not to
67 * increase the worker thread pool when adding a new request.
68 * (see addIORequest().)
71 LeaveCriticalSection(&iom->manLock);
74 * A possible future refinement is to make long-term idle threads
75 * wake up and decide to shut down should the number of idle threads
76 * be above some threshold.
79 rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
81 if (rc == WAIT_OBJECT_0) {
82 // we received the exit event
86 EnterCriticalSection(&iom->manLock);
87 /* Signal that the thread is 'non-idle' and about to consume
92 LeaveCriticalSection(&iom->manLock);
94 if ( rc == (WAIT_OBJECT_0 + 1) ) {
95 /* work item available, fetch it. */
96 if (FetchWork(pq,(void**)&work)) {
98 RegisterWorkItem(iom,work);
99 if ( work->workKind & WORKER_READ ) {
100 if ( work->workKind & WORKER_FOR_SOCKET ) {
101 len = recv(work->workData.ioData.fd,
102 work->workData.ioData.buf,
103 work->workData.ioData.len,
105 if (len == SOCKET_ERROR) {
106 errCode = WSAGetLastError();
110 /* Do the read(), with extra-special handling for Ctrl+C */
111 len = read(work->workData.ioData.fd,
112 work->workData.ioData.buf,
113 work->workData.ioData.len);
114 if ( len == 0 && work->workData.ioData.len != 0 ) {
115 /* Given the following scenario:
116 * - a console handler has been registered that handles Ctrl+C
118 * - we've not tweaked the 'console mode' settings to turn on
119 * ENABLE_PROCESSED_INPUT.
120 * - we're blocked waiting on input from standard input.
121 * - the user hits Ctrl+C.
123 * The OS will invoke the console handler (in a separate OS thread),
124 * and the above read() (i.e., under the hood, a ReadFile() op) returns
125 * 0, with the error set to ERROR_OPERATION_ABORTED. We don't
126 * want to percolate this error condition back to the Haskell user.
127 * Do this by waiting for the completion of the Haskell console handler.
128 * If upon completion of the console handler routine, the Haskell thread
129 * that issued the request is found to have been thrown an exception,
130 * the worker abandons the request (since that's what the Haskell thread
131 * has done.) If the Haskell thread hasn't been interrupted, the worker
132 * retries the read request as if nothing happened.
134 if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) {
135 /* For now, only abort when dealing with the standard input handle.
136 * i.e., for all others, an error is raised.
138 HANDLE h = (HANDLE)GetStdHandle(STD_INPUT_HANDLE);
139 if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) {
140 if (rts_waitConsoleHandlerCompletion()) {
141 /* If the Scheduler has set work->abandonOp, the Haskell thread has
142 * been thrown an exception (=> the worker must abandon this request.)
143 * We test for this below before invoking the on-completion routine.
145 if (work->abandonOp) {
152 break; /* Treat it like an error */
161 if (len == -1) { errCode = errno; }
163 complData = work->workData.ioData.buf;
164 fd = work->workData.ioData.fd;
165 } else if ( work->workKind & WORKER_WRITE ) {
166 if ( work->workKind & WORKER_FOR_SOCKET ) {
167 len = send(work->workData.ioData.fd,
168 work->workData.ioData.buf,
169 work->workData.ioData.len,
171 if (len == SOCKET_ERROR) {
172 errCode = WSAGetLastError();
175 len = write(work->workData.ioData.fd,
176 work->workData.ioData.buf,
177 work->workData.ioData.len);
178 if (len == -1) { errCode = errno; }
180 complData = work->workData.ioData.buf;
181 fd = work->workData.ioData.fd;
182 } else if ( work->workKind & WORKER_DELAY ) {
183 /* Approximate implementation of threadDelay;
185 * Note: Sleep() is in milliseconds, not micros.
187 Sleep(work->workData.delayData.msecs / 1000);
188 len = work->workData.delayData.msecs;
192 } else if ( work->workKind & WORKER_DO_PROC ) {
193 /* perform operation/proc on behalf of Haskell thread. */
194 if (work->workData.procData.proc) {
195 /* The procedure is assumed to encode result + success/failure
198 errCode=work->workData.procData.proc(work->workData.procData.param);
202 complData = work->workData.procData.param;
204 fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
208 if (!work->abandonOp) {
209 work->onCompletion(work->requestID,
215 /* Free the WorkItem */
216 DeregisterWorkItem(iom,work);
219 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
223 fprintf(stderr, "waiting failed (%lu); fatal.\n", rc); fflush(stderr);
232 NewIOWorkerThread(IOManagerState* iom)
235 return ( 0 != _beginthreadex(NULL,
250 if ( !wq ) return FALSE;
252 ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
259 /* A manual-reset event */
260 hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
267 ioMan->hExitEvent = hExit;
268 InitializeCriticalSection(&ioMan->manLock);
269 ioMan->workQueue = wq;
270 ioMan->numWorkers = 0;
271 ioMan->workersIdle = 0;
272 ioMan->queueSize = 0;
273 ioMan->requestID = 1;
274 InitializeCriticalSection(&ioMan->active_work_lock);
275 ioMan->active_work_items = NULL;
281 * Function: depositWorkItem()
283 * Local function which deposits a WorkItem onto a work queue,
284 * deciding in the process whether or not the thread pool needs
285 * to be augmented with another thread to handle the new request.
290 depositWorkItem( unsigned int reqID,
293 EnterCriticalSection(&ioMan->manLock);
296 fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers);
299 /* A new worker thread is created when there are fewer idle threads
300 * than non-consumed queue requests. This ensures that requests will
301 * be dealt with in a timely manner.
303 * [Long explanation of why the previous thread pool policy lead to
306 * Previously, the thread pool was augmented iff no idle worker threads
307 * were available. That strategy runs the risk of repeatedly adding to
308 * the request queue without expanding the thread pool to handle this
309 * sudden spike in queued requests.
310 * [How? Assume workersIdle is 1, and addIORequest() is called. No new
311 * thread is created and the request is simply queued. If addIORequest()
312 * is called again _before the OS schedules a worker thread to pull the
313 * request off the queue_, workersIdle is still 1 and another request is
314 * simply added to the queue. Once the worker thread is run, only one
315 * request is de-queued, leaving the 2nd request in the queue]
317 * Assuming none of the queued requests take an inordinate amount of to
318 * complete, the request queue would eventually be drained. But if that's
319 * not the case, the later requests will end up languishing in the queue
320 * indefinitely. The non-timely handling of requests may cause CH applications
321 * to misbehave / hang; bad.
325 if ( (ioMan->workersIdle < ioMan->queueSize) ) {
326 /* see if giving up our quantum ferrets out some idle threads.
328 LeaveCriticalSection(&ioMan->manLock);
330 EnterCriticalSection(&ioMan->manLock);
331 if ( (ioMan->workersIdle < ioMan->queueSize) ) {
332 /* No, go ahead and create another. */
334 LeaveCriticalSection(&ioMan->manLock);
335 NewIOWorkerThread(ioMan);
337 LeaveCriticalSection(&ioMan->manLock);
340 LeaveCriticalSection(&ioMan->manLock);
343 if (SubmitWork(ioMan->workQueue,wItem)) {
344 /* Note: the work item has potentially been consumed by a worker thread
345 * (and freed) at this point, so we cannot use wItem's requestID.
354 * Function: AddIORequest()
356 * Conduit to underlying WorkQueue's SubmitWork(); adds IO
357 * request to work queue, deciding whether or not to augment
358 * the thread pool in the process.
361 AddIORequest ( int fd,
366 CompletionProc onCompletion)
368 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
369 unsigned int reqID = ioMan->requestID++;
370 if (!ioMan || !wItem) return 0;
372 /* Fill in the blanks */
373 wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
374 ( forWriting ? WORKER_WRITE : WORKER_READ );
375 wItem->workData.ioData.fd = fd;
376 wItem->workData.ioData.len = len;
377 wItem->workData.ioData.buf = buffer;
380 wItem->onCompletion = onCompletion;
381 wItem->requestID = reqID;
383 return depositWorkItem(reqID, wItem);
387 * Function: AddDelayRequest()
389 * Like AddIORequest(), but this time adding a delay request to
393 AddDelayRequest ( unsigned int msecs,
394 CompletionProc onCompletion)
396 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
397 unsigned int reqID = ioMan->requestID++;
398 if (!ioMan || !wItem) return FALSE;
400 /* Fill in the blanks */
401 wItem->workKind = WORKER_DELAY;
402 wItem->workData.delayData.msecs = msecs;
403 wItem->onCompletion = onCompletion;
404 wItem->requestID = reqID;
407 return depositWorkItem(reqID, wItem);
411 * Function: AddProcRequest()
413 * Add an asynchronous procedure request.
416 AddProcRequest ( void* proc,
418 CompletionProc onCompletion)
420 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
421 unsigned int reqID = ioMan->requestID++;
422 if (!ioMan || !wItem) return FALSE;
424 /* Fill in the blanks */
425 wItem->workKind = WORKER_DO_PROC;
426 wItem->workData.procData.proc = proc;
427 wItem->workData.procData.param = param;
428 wItem->onCompletion = onCompletion;
429 wItem->requestID = reqID;
430 wItem->abandonOp = 0;
433 return depositWorkItem(reqID, wItem);
436 void ShutdownIOManager ( void )
438 SetEvent(ioMan->hExitEvent);
439 // ToDo: we can't free this now, because the worker thread(s)
440 // haven't necessarily finished with it yet. Perhaps it should
441 // have a reference count or something.
446 /* Keep track of WorkItems currently being serviced. */
449 RegisterWorkItem(IOManagerState* ioMan,
452 EnterCriticalSection(&ioMan->active_work_lock);
453 wi->link = ioMan->active_work_items;
454 ioMan->active_work_items = wi;
455 LeaveCriticalSection(&ioMan->active_work_lock);
460 DeregisterWorkItem(IOManagerState* ioMan,
463 WorkItem *ptr, *prev;
465 EnterCriticalSection(&ioMan->active_work_lock);
466 for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) {
467 if (wi->requestID == ptr->requestID) {
469 ioMan->active_work_items = ptr->link;
471 prev->link = ptr->link;
473 LeaveCriticalSection(&ioMan->active_work_lock);
477 fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n", wi->requestID);
478 LeaveCriticalSection(&ioMan->active_work_lock);
483 * Function: abandonWorkRequest()
485 * Signal that a work request isn't of interest. Called by the Scheduler
486 * if a blocked Haskell thread has an exception thrown to it.
488 * Note: we're not aborting the system call that a worker might be blocked on
489 * here, just disabling the propagation of its result once its finished. We
490 * may have to go the whole hog here and switch to overlapped I/O so that we
491 * can abort blocked system calls.
494 abandonWorkRequest ( int reqID )
497 EnterCriticalSection(&ioMan->active_work_lock);
498 for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) {
499 if (ptr->requestID == (unsigned int)reqID ) {
501 LeaveCriticalSection(&ioMan->active_work_lock);
505 /* Note: if the request ID isn't present, the worker will have
506 * finished sometime since awaitRequests() last drained the completed
507 * request table; i.e., not an error.
509 LeaveCriticalSection(&ioMan->active_work_lock);