3 * Non-blocking / asynchronous I/O for Win32.
8 #if !defined(THREADED_RTS)
11 #include "IOManager.h"
12 #include "WorkQueue.h"
13 #include "ConsoleHandler.h"
22 * Internal state maintained by the IO manager.
24 typedef struct IOManagerState {
31 unsigned int requestID;
32 /* fields for keeping track of active WorkItems */
33 CritSection active_work_lock;
34 WorkItem* active_work_items;
37 /* ToDo: wrap up this state via a IOManager handle instead? */
38 static IOManagerState* ioMan;
40 static void RegisterWorkItem ( IOManagerState* iom, WorkItem* wi);
41 static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi);
44 * The routine executed by each worker thread.
49 IOWorkerProc(PVOID param)
53 IOManagerState* iom = (IOManagerState*)param;
54 WorkQueue* pq = iom->workQueue;
60 hWaits[0] = (HANDLE)iom->hExitEvent;
61 hWaits[1] = GetWorkQueueHandle(pq);
64 /* The error code is communicated back on completion of request; reset. */
67 EnterCriticalSection(&iom->manLock);
68 /* Signal that the worker is idle.
70 * 'workersIdle' is used when determining whether or not to
71 * increase the worker thread pool when adding a new request.
72 * (see addIORequest().)
75 LeaveCriticalSection(&iom->manLock);
78 * A possible future refinement is to make long-term idle threads
79 * wake up and decide to shut down should the number of idle threads
80 * be above some threshold.
83 rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
85 if (rc == WAIT_OBJECT_0) {
86 // we received the exit event
87 EnterCriticalSection(&iom->manLock);
89 LeaveCriticalSection(&iom->manLock);
93 EnterCriticalSection(&iom->manLock);
94 /* Signal that the thread is 'non-idle' and about to consume
99 LeaveCriticalSection(&iom->manLock);
101 if ( rc == (WAIT_OBJECT_0 + 1) ) {
102 /* work item available, fetch it. */
103 if (FetchWork(pq,(void**)&work)) {
105 RegisterWorkItem(iom,work);
106 if ( work->workKind & WORKER_READ ) {
107 if ( work->workKind & WORKER_FOR_SOCKET ) {
108 len = recv(work->workData.ioData.fd,
109 work->workData.ioData.buf,
110 work->workData.ioData.len,
112 if (len == SOCKET_ERROR) {
113 errCode = WSAGetLastError();
117 /* Do the read(), with extra-special handling for Ctrl+C */
118 len = read(work->workData.ioData.fd,
119 work->workData.ioData.buf,
120 work->workData.ioData.len);
121 if ( len == 0 && work->workData.ioData.len != 0 ) {
122 /* Given the following scenario:
123 * - a console handler has been registered that handles Ctrl+C
125 * - we've not tweaked the 'console mode' settings to turn on
126 * ENABLE_PROCESSED_INPUT.
127 * - we're blocked waiting on input from standard input.
128 * - the user hits Ctrl+C.
130 * The OS will invoke the console handler (in a separate OS thread),
131 * and the above read() (i.e., under the hood, a ReadFile() op) returns
132 * 0, with the error set to ERROR_OPERATION_ABORTED. We don't
133 * want to percolate this error condition back to the Haskell user.
134 * Do this by waiting for the completion of the Haskell console handler.
135 * If upon completion of the console handler routine, the Haskell thread
136 * that issued the request is found to have been thrown an exception,
137 * the worker abandons the request (since that's what the Haskell thread
138 * has done.) If the Haskell thread hasn't been interrupted, the worker
139 * retries the read request as if nothing happened.
141 if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) {
142 /* For now, only abort when dealing with the standard input handle.
143 * i.e., for all others, an error is raised.
145 HANDLE h = (HANDLE)GetStdHandle(STD_INPUT_HANDLE);
146 if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) {
147 if (rts_waitConsoleHandlerCompletion()) {
148 /* If the Scheduler has set work->abandonOp, the Haskell thread has
149 * been thrown an exception (=> the worker must abandon this request.)
150 * We test for this below before invoking the on-completion routine.
152 if (work->abandonOp) {
159 break; /* Treat it like an error */
168 if (len == -1) { errCode = errno; }
170 complData = work->workData.ioData.buf;
171 fd = work->workData.ioData.fd;
172 } else if ( work->workKind & WORKER_WRITE ) {
173 if ( work->workKind & WORKER_FOR_SOCKET ) {
174 len = send(work->workData.ioData.fd,
175 work->workData.ioData.buf,
176 work->workData.ioData.len,
178 if (len == SOCKET_ERROR) {
179 errCode = WSAGetLastError();
182 len = write(work->workData.ioData.fd,
183 work->workData.ioData.buf,
184 work->workData.ioData.len);
187 // write() gets errno wrong for
188 // ERROR_NO_DATA, we have to fix it here:
189 if (errCode == EINVAL &&
190 GetLastError() == ERROR_NO_DATA) {
195 complData = work->workData.ioData.buf;
196 fd = work->workData.ioData.fd;
197 } else if ( work->workKind & WORKER_DELAY ) {
198 /* Approximate implementation of threadDelay;
200 * Note: Sleep() is in milliseconds, not micros.
202 Sleep((work->workData.delayData.msecs + 999) / 1000);
203 len = work->workData.delayData.msecs;
207 } else if ( work->workKind & WORKER_DO_PROC ) {
208 /* perform operation/proc on behalf of Haskell thread. */
209 if (work->workData.procData.proc) {
210 /* The procedure is assumed to encode result + success/failure
213 errCode=work->workData.procData.proc(work->workData.procData.param);
217 complData = work->workData.procData.param;
219 fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
223 if (!work->abandonOp) {
224 work->onCompletion(work->requestID,
230 /* Free the WorkItem */
231 DeregisterWorkItem(iom,work);
234 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
235 EnterCriticalSection(&iom->manLock);
237 LeaveCriticalSection(&iom->manLock);
241 fprintf(stderr, "waiting failed (%lu); fatal.\n", rc); fflush(stderr);
242 EnterCriticalSection(&iom->manLock);
244 LeaveCriticalSection(&iom->manLock);
253 NewIOWorkerThread(IOManagerState* iom)
256 return ( 0 != _beginthreadex(NULL,
271 if ( !wq ) return FALSE;
273 ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
280 /* A manual-reset event */
281 hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
288 ioMan->hExitEvent = hExit;
289 InitializeCriticalSection(&ioMan->manLock);
290 ioMan->workQueue = wq;
291 ioMan->numWorkers = 0;
292 ioMan->workersIdle = 0;
293 ioMan->queueSize = 0;
294 ioMan->requestID = 1;
295 InitializeCriticalSection(&ioMan->active_work_lock);
296 ioMan->active_work_items = NULL;
302 * Function: depositWorkItem()
304 * Local function which deposits a WorkItem onto a work queue,
305 * deciding in the process whether or not the thread pool needs
306 * to be augmented with another thread to handle the new request.
311 depositWorkItem( unsigned int reqID,
314 EnterCriticalSection(&ioMan->manLock);
317 fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers);
320 /* A new worker thread is created when there are fewer idle threads
321 * than non-consumed queue requests. This ensures that requests will
322 * be dealt with in a timely manner.
324 * [Long explanation of why the previous thread pool policy lead to
327 * Previously, the thread pool was augmented iff no idle worker threads
328 * were available. That strategy runs the risk of repeatedly adding to
329 * the request queue without expanding the thread pool to handle this
330 * sudden spike in queued requests.
331 * [How? Assume workersIdle is 1, and addIORequest() is called. No new
332 * thread is created and the request is simply queued. If addIORequest()
333 * is called again _before the OS schedules a worker thread to pull the
334 * request off the queue_, workersIdle is still 1 and another request is
335 * simply added to the queue. Once the worker thread is run, only one
336 * request is de-queued, leaving the 2nd request in the queue]
338 * Assuming none of the queued requests take an inordinate amount of to
339 * complete, the request queue would eventually be drained. But if that's
340 * not the case, the later requests will end up languishing in the queue
341 * indefinitely. The non-timely handling of requests may cause CH applications
342 * to misbehave / hang; bad.
346 if ( (ioMan->workersIdle < ioMan->queueSize) ) {
347 /* see if giving up our quantum ferrets out some idle threads.
349 LeaveCriticalSection(&ioMan->manLock);
351 EnterCriticalSection(&ioMan->manLock);
352 if ( (ioMan->workersIdle < ioMan->queueSize) ) {
353 /* No, go ahead and create another. */
355 if (!NewIOWorkerThread(ioMan)) {
360 LeaveCriticalSection(&ioMan->manLock);
362 if (SubmitWork(ioMan->workQueue,wItem)) {
363 /* Note: the work item has potentially been consumed by a worker thread
364 * (and freed) at this point, so we cannot use wItem's requestID.
373 * Function: AddIORequest()
375 * Conduit to underlying WorkQueue's SubmitWork(); adds IO
376 * request to work queue, deciding whether or not to augment
377 * the thread pool in the process.
380 AddIORequest ( int fd,
385 CompletionProc onCompletion)
387 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
388 unsigned int reqID = ioMan->requestID++;
389 if (!ioMan || !wItem) return 0;
391 /* Fill in the blanks */
392 wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
393 ( forWriting ? WORKER_WRITE : WORKER_READ );
394 wItem->workData.ioData.fd = fd;
395 wItem->workData.ioData.len = len;
396 wItem->workData.ioData.buf = buffer;
399 wItem->onCompletion = onCompletion;
400 wItem->requestID = reqID;
402 return depositWorkItem(reqID, wItem);
406 * Function: AddDelayRequest()
408 * Like AddIORequest(), but this time adding a delay request to
412 AddDelayRequest ( unsigned int msecs,
413 CompletionProc onCompletion)
415 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
416 unsigned int reqID = ioMan->requestID++;
417 if (!ioMan || !wItem) return FALSE;
419 /* Fill in the blanks */
420 wItem->workKind = WORKER_DELAY;
421 wItem->workData.delayData.msecs = msecs;
422 wItem->onCompletion = onCompletion;
423 wItem->requestID = reqID;
426 return depositWorkItem(reqID, wItem);
430 * Function: AddProcRequest()
432 * Add an asynchronous procedure request.
435 AddProcRequest ( void* proc,
437 CompletionProc onCompletion)
439 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
440 unsigned int reqID = ioMan->requestID++;
441 if (!ioMan || !wItem) return FALSE;
443 /* Fill in the blanks */
444 wItem->workKind = WORKER_DO_PROC;
445 wItem->workData.procData.proc = proc;
446 wItem->workData.procData.param = param;
447 wItem->onCompletion = onCompletion;
448 wItem->requestID = reqID;
449 wItem->abandonOp = 0;
452 return depositWorkItem(reqID, wItem);
455 void ShutdownIOManager ( rtsBool wait_threads )
459 SetEvent(ioMan->hExitEvent);
462 /* Wait for all worker threads to die. */
464 EnterCriticalSection(&ioMan->manLock);
465 num = ioMan->numWorkers;
466 LeaveCriticalSection(&ioMan->manLock);
471 FreeWorkQueue(ioMan->workQueue);
472 CloseHandle(ioMan->hExitEvent);
473 DeleteCriticalSection(&ioMan->active_work_lock);
474 DeleteCriticalSection(&ioMan->manLock);
480 /* Keep track of WorkItems currently being serviced. */
483 RegisterWorkItem(IOManagerState* ioMan,
486 EnterCriticalSection(&ioMan->active_work_lock);
487 wi->link = ioMan->active_work_items;
488 ioMan->active_work_items = wi;
489 LeaveCriticalSection(&ioMan->active_work_lock);
494 DeregisterWorkItem(IOManagerState* ioMan,
497 WorkItem *ptr, *prev;
499 EnterCriticalSection(&ioMan->active_work_lock);
500 for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) {
501 if (wi->requestID == ptr->requestID) {
503 ioMan->active_work_items = ptr->link;
505 prev->link = ptr->link;
507 LeaveCriticalSection(&ioMan->active_work_lock);
511 fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n", wi->requestID);
512 LeaveCriticalSection(&ioMan->active_work_lock);
517 * Function: abandonWorkRequest()
519 * Signal that a work request isn't of interest. Called by the Scheduler
520 * if a blocked Haskell thread has an exception thrown to it.
522 * Note: we're not aborting the system call that a worker might be blocked on
523 * here, just disabling the propagation of its result once its finished. We
524 * may have to go the whole hog here and switch to overlapped I/O so that we
525 * can abort blocked system calls.
528 abandonWorkRequest ( int reqID )
531 EnterCriticalSection(&ioMan->active_work_lock);
532 for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) {
533 if (ptr->requestID == (unsigned int)reqID ) {
535 LeaveCriticalSection(&ioMan->active_work_lock);
539 /* Note: if the request ID isn't present, the worker will have
540 * finished sometime since awaitRequests() last drained the completed
541 * request table; i.e., not an error.
543 LeaveCriticalSection(&ioMan->active_work_lock);