Free Win32 Handles on shutdown
[ghc-hetmet.git] / rts / win32 / IOManager.c
1 /* IOManager.c
2  *
3  * Non-blocking / asynchronous I/O for Win32.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "Rts.h"
8 #include "IOManager.h"
9 #include "WorkQueue.h"
10 #include "ConsoleHandler.h"
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <io.h>
14 #include <winsock.h>
15 #include <process.h>
16
17 /*
18  * Internal state maintained by the IO manager.
19  */
20 typedef struct IOManagerState {
21     CritSection      manLock;
22     WorkQueue*       workQueue;
23     int              queueSize;
24     int              numWorkers;
25     int              workersIdle;
26     HANDLE           hExitEvent;
27     unsigned int     requestID;
28     /* fields for keeping track of active WorkItems */
29     CritSection      active_work_lock;
30     WorkItem*        active_work_items;
31 } IOManagerState;
32
33 /* ToDo: wrap up this state via a IOManager handle instead? */
34 static IOManagerState* ioMan;
35
36 static void RegisterWorkItem  ( IOManagerState* iom, WorkItem* wi);
37 static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi);
38
39 /*
40  * The routine executed by each worker thread.
41  */
42 static
43 unsigned
44 WINAPI
45 IOWorkerProc(PVOID param)
46 {
47     HANDLE  hWaits[2];
48     DWORD   rc;
49     IOManagerState* iom = (IOManagerState*)param;
50     WorkQueue* pq = iom->workQueue;
51     WorkItem*  work;
52     int        len = 0, fd = 0;
53     DWORD      errCode = 0;
54     void*      complData;
55
56     hWaits[0] = (HANDLE)iom->hExitEvent;
57     hWaits[1] = GetWorkQueueHandle(pq);
58   
59     while (1) {
60         /* The error code is communicated back on completion of request; reset. */
61         errCode = 0;
62         
63         EnterCriticalSection(&iom->manLock);
64         /* Signal that the worker is idle.
65          *
66          * 'workersIdle' is used when determining whether or not to
67          * increase the worker thread pool when adding a new request.
68          * (see addIORequest().)
69          */
70         iom->workersIdle++;
71         LeaveCriticalSection(&iom->manLock);
72         
73         /*
74          * A possible future refinement is to make long-term idle threads
75          * wake up and decide to shut down should the number of idle threads
76          * be above some threshold.
77          *
78          */
79         rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
80
81         if (rc == WAIT_OBJECT_0) {
82             // we received the exit event
83             EnterCriticalSection(&iom->manLock);
84             ioMan->numWorkers--;
85             LeaveCriticalSection(&iom->manLock);
86             return 0;
87         }
88
89         EnterCriticalSection(&iom->manLock);
90         /* Signal that the thread is 'non-idle' and about to consume 
91          * a work item.
92          */
93         iom->workersIdle--;
94         iom->queueSize--;
95         LeaveCriticalSection(&iom->manLock);
96     
97         if ( rc == (WAIT_OBJECT_0 + 1) ) {
98             /* work item available, fetch it. */
99             if (FetchWork(pq,(void**)&work)) {
100                 work->abandonOp = 0;
101                 RegisterWorkItem(iom,work);
102                 if ( work->workKind & WORKER_READ ) {
103                     if ( work->workKind & WORKER_FOR_SOCKET ) {
104                         len = recv(work->workData.ioData.fd, 
105                                    work->workData.ioData.buf,
106                                    work->workData.ioData.len,
107                                    0);
108                         if (len == SOCKET_ERROR) {
109                             errCode = WSAGetLastError();
110                         }
111                     } else {
112                         while (1) {
113                         /* Do the read(), with extra-special handling for Ctrl+C */
114                         len = read(work->workData.ioData.fd,
115                                    work->workData.ioData.buf,
116                                    work->workData.ioData.len);
117                         if ( len == 0 && work->workData.ioData.len != 0 ) {
118                             /* Given the following scenario:
119                              *     - a console handler has been registered that handles Ctrl+C
120                              *       events.
121                              *     - we've not tweaked the 'console mode' settings to turn on
122                              *       ENABLE_PROCESSED_INPUT.
123                              *     - we're blocked waiting on input from standard input.
124                              *     - the user hits Ctrl+C.
125                              *
126                              * The OS will invoke the console handler (in a separate OS thread),
127                              * and the above read() (i.e., under the hood, a ReadFile() op) returns
128                              * 0, with the error set to ERROR_OPERATION_ABORTED. We don't
129                              * want to percolate this error condition back to the Haskell user.
130                              * Do this by waiting for the completion of the Haskell console handler.
131                              * If upon completion of the console handler routine, the Haskell thread 
132                              * that issued the request is found to have been thrown an exception, 
133                              * the worker abandons the request (since that's what the Haskell thread 
134                              * has done.) If the Haskell thread hasn't been interrupted, the worker 
135                              * retries the read request as if nothing happened.
136                              */
137                             if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) {
138                                 /* For now, only abort when dealing with the standard input handle.
139                                  * i.e., for all others, an error is raised.
140                                  */
141                                 HANDLE h  = (HANDLE)GetStdHandle(STD_INPUT_HANDLE);
142                                 if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) {
143                                     if (rts_waitConsoleHandlerCompletion()) {
144                                         /* If the Scheduler has set work->abandonOp, the Haskell thread has 
145                                          * been thrown an exception (=> the worker must abandon this request.)
146                                          * We test for this below before invoking the on-completion routine.
147                                          */
148                                         if (work->abandonOp) {
149                                             break;
150                                         } else {
151                                             continue;
152                                         }
153                                     } 
154                                 } else { 
155                                     break; /* Treat it like an error */
156                                 }
157                             } else {
158                                 break;
159                             }
160                         } else {
161                             break;
162                         }
163                         }
164                         if (len == -1) { errCode = errno; }
165                     }
166                     complData = work->workData.ioData.buf;
167                     fd = work->workData.ioData.fd;
168                 } else if ( work->workKind & WORKER_WRITE ) {
169                     if ( work->workKind & WORKER_FOR_SOCKET ) {
170                         len = send(work->workData.ioData.fd,
171                                    work->workData.ioData.buf,
172                                    work->workData.ioData.len,
173                                    0);
174                         if (len == SOCKET_ERROR) {
175                             errCode = WSAGetLastError();
176                         }
177                     } else {
178                         len = write(work->workData.ioData.fd,
179                                     work->workData.ioData.buf,
180                                     work->workData.ioData.len);
181                         if (len == -1) { errCode = errno; }
182                     }
183                     complData = work->workData.ioData.buf;
184                     fd = work->workData.ioData.fd;
185                 } else if ( work->workKind & WORKER_DELAY ) {
186                     /* Approximate implementation of threadDelay;
187                      * 
188                      * Note: Sleep() is in milliseconds, not micros.
189                      */
190                     Sleep(work->workData.delayData.msecs / 1000);
191                     len = work->workData.delayData.msecs;
192                     complData = NULL;
193                     fd = 0;
194                     errCode = 0;
195                 } else if ( work->workKind & WORKER_DO_PROC ) {
196                     /* perform operation/proc on behalf of Haskell thread. */
197                     if (work->workData.procData.proc) {
198                         /* The procedure is assumed to encode result + success/failure
199                          * via its param.
200                          */
201                         errCode=work->workData.procData.proc(work->workData.procData.param);
202                     } else {
203                         errCode=1;
204                     }
205                     complData = work->workData.procData.param;
206                 } else {
207                     fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
208                     fflush(stderr);
209                     continue;
210                 }
211                 if (!work->abandonOp) {
212                     work->onCompletion(work->requestID,
213                                        fd,
214                                        len,
215                                        complData,
216                                        errCode);
217                 }
218                 /* Free the WorkItem */
219                 DeregisterWorkItem(iom,work);
220                 free(work);
221             } else {
222                 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
223                 return 1;
224             }
225         } else {
226             fprintf(stderr, "waiting failed (%lu); fatal.\n", rc); fflush(stderr);
227             return 1;
228         }
229     }
230     return 0;
231 }
232
233 static 
234 BOOL
235 NewIOWorkerThread(IOManagerState* iom)
236 {
237     unsigned threadId;
238     return ( 0 != _beginthreadex(NULL,
239                                  0,
240                                  IOWorkerProc,
241                                  (LPVOID)iom,
242                                  0,
243                                  &threadId) );
244 }
245
246 BOOL
247 StartIOManager(void)
248 {
249     HANDLE hExit;
250     WorkQueue* wq;
251
252     wq = NewWorkQueue();
253     if ( !wq ) return FALSE;  
254   
255     ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
256   
257     if (!ioMan) {
258         FreeWorkQueue(wq);
259         return FALSE;
260     }
261
262     /* A manual-reset event */
263     hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
264     if ( !hExit ) {
265         FreeWorkQueue(wq);
266         free(ioMan);
267         return FALSE;
268     }
269   
270     ioMan->hExitEvent = hExit;
271     InitializeCriticalSection(&ioMan->manLock);
272     ioMan->workQueue   = wq;
273     ioMan->numWorkers  = 0;
274     ioMan->workersIdle = 0;
275     ioMan->queueSize   = 0;
276     ioMan->requestID   = 1;
277     InitializeCriticalSection(&ioMan->active_work_lock);
278     ioMan->active_work_items = NULL;
279  
280     return TRUE;
281 }
282
283 /*
284  * Function: depositWorkItem()
285  *
286  * Local function which deposits a WorkItem onto a work queue,
287  * deciding in the process whether or not the thread pool needs
288  * to be augmented with another thread to handle the new request.
289  *
290  */
291 static
292 int
293 depositWorkItem( unsigned int reqID,
294                  WorkItem* wItem )
295 {
296     EnterCriticalSection(&ioMan->manLock);
297
298 #if 0
299     fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers); 
300     fflush(stderr);
301 #endif
302     /* A new worker thread is created when there are fewer idle threads
303      * than non-consumed queue requests. This ensures that requests will
304      * be dealt with in a timely manner.
305      *
306      * [Long explanation of why the previous thread pool policy lead to 
307      * trouble]
308      *
309      * Previously, the thread pool was augmented iff no idle worker threads
310      * were available. That strategy runs the risk of repeatedly adding to
311      * the request queue without expanding the thread pool to handle this
312      * sudden spike in queued requests. 
313      * [How? Assume workersIdle is 1, and addIORequest() is called. No new 
314      * thread is created and the request is simply queued. If addIORequest()
315      * is called again _before the OS schedules a worker thread to pull the
316      * request off the queue_, workersIdle is still 1 and another request is 
317      * simply added to the queue. Once the worker thread is run, only one
318      * request is de-queued, leaving the 2nd request in the queue]
319      * 
320      * Assuming none of the queued requests take an inordinate amount of to 
321      * complete, the request queue would eventually be drained. But if that's 
322      * not the case, the later requests will end up languishing in the queue 
323      * indefinitely. The non-timely handling of requests may cause CH applications
324      * to misbehave / hang; bad.
325      *
326      */
327     ioMan->queueSize++;
328     if ( (ioMan->workersIdle < ioMan->queueSize) ) {
329         /* see if giving up our quantum ferrets out some idle threads.
330          */
331         LeaveCriticalSection(&ioMan->manLock);
332         Sleep(0);
333         EnterCriticalSection(&ioMan->manLock);
334         if ( (ioMan->workersIdle < ioMan->queueSize) ) {
335             /* No, go ahead and create another. */
336             ioMan->numWorkers++;
337             LeaveCriticalSection(&ioMan->manLock);
338             NewIOWorkerThread(ioMan);
339         } else {
340             LeaveCriticalSection(&ioMan->manLock);
341         }
342     } else {
343         LeaveCriticalSection(&ioMan->manLock);
344     }
345   
346     if (SubmitWork(ioMan->workQueue,wItem)) {
347         /* Note: the work item has potentially been consumed by a worker thread
348          *       (and freed) at this point, so we cannot use wItem's requestID.
349          */
350         return reqID;
351     } else {
352         return 0;
353     }
354 }
355
356 /*
357  * Function: AddIORequest()
358  *
359  * Conduit to underlying WorkQueue's SubmitWork(); adds IO
360  * request to work queue, deciding whether or not to augment
361  * the thread pool in the process. 
362  */
363 int
364 AddIORequest ( int   fd,
365                BOOL  forWriting,
366                BOOL  isSocket,
367                int   len,
368                char* buffer,
369                CompletionProc onCompletion)
370 {
371     WorkItem* wItem    = (WorkItem*)malloc(sizeof(WorkItem));
372     unsigned int reqID = ioMan->requestID++;
373     if (!ioMan || !wItem) return 0;
374   
375     /* Fill in the blanks */
376     wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
377                           ( forWriting ? WORKER_WRITE : WORKER_READ );
378     wItem->workData.ioData.fd  = fd;
379     wItem->workData.ioData.len = len;
380     wItem->workData.ioData.buf = buffer;
381     wItem->link = NULL;
382
383     wItem->onCompletion        = onCompletion;
384     wItem->requestID           = reqID;
385   
386     return depositWorkItem(reqID, wItem);
387 }       
388
389 /*
390  * Function: AddDelayRequest()
391  *
392  * Like AddIORequest(), but this time adding a delay request to
393  * the request queue.
394  */
395 BOOL
396 AddDelayRequest ( unsigned int   msecs,
397                   CompletionProc onCompletion)
398 {
399     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
400     unsigned int reqID = ioMan->requestID++;
401     if (!ioMan || !wItem) return FALSE;
402   
403     /* Fill in the blanks */
404     wItem->workKind     = WORKER_DELAY;
405     wItem->workData.delayData.msecs = msecs;
406     wItem->onCompletion = onCompletion;
407     wItem->requestID    = reqID;
408     wItem->link         = NULL;
409
410     return depositWorkItem(reqID, wItem);
411 }
412
413 /*
414  * Function: AddProcRequest()
415  *
416  * Add an asynchronous procedure request.
417  */
418 BOOL
419 AddProcRequest ( void* proc,
420                  void* param,
421                  CompletionProc onCompletion)
422 {
423     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
424     unsigned int reqID = ioMan->requestID++;
425     if (!ioMan || !wItem) return FALSE;
426   
427     /* Fill in the blanks */
428     wItem->workKind     = WORKER_DO_PROC;
429     wItem->workData.procData.proc  = proc;
430     wItem->workData.procData.param = param;
431     wItem->onCompletion = onCompletion;
432     wItem->requestID    = reqID;
433     wItem->abandonOp    = 0;
434     wItem->link         = NULL;
435
436     return depositWorkItem(reqID, wItem);
437 }
438
439 void ShutdownIOManager ( void )
440 {
441     int num;
442
443     SetEvent(ioMan->hExitEvent);
444   
445     /* Wait for all worker threads to die. */
446     for (;;) {
447         EnterCriticalSection(&ioMan->manLock);
448         num = ioMan->numWorkers;
449         LeaveCriticalSection(&ioMan->manLock);
450         if (num == 0)
451             break;
452         Sleep(10);
453     }
454     FreeWorkQueue(ioMan->workQueue);
455     CloseHandle(ioMan->hExitEvent);
456     free(ioMan);
457     ioMan = NULL;
458 }
459
460 /* Keep track of WorkItems currently being serviced. */
461 static 
462 void
463 RegisterWorkItem(IOManagerState* ioMan, 
464                  WorkItem* wi)
465 {
466     EnterCriticalSection(&ioMan->active_work_lock);
467     wi->link = ioMan->active_work_items;
468     ioMan->active_work_items = wi;
469     LeaveCriticalSection(&ioMan->active_work_lock);
470 }
471
472 static 
473 void
474 DeregisterWorkItem(IOManagerState* ioMan, 
475                    WorkItem* wi)
476 {
477     WorkItem *ptr, *prev;
478     
479     EnterCriticalSection(&ioMan->active_work_lock);
480     for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) {
481         if (wi->requestID == ptr->requestID) {
482             if (prev==NULL) {
483                 ioMan->active_work_items = ptr->link;
484             } else {
485                 prev->link = ptr->link;
486             }
487             LeaveCriticalSection(&ioMan->active_work_lock);
488             return;
489         }
490     }
491     fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n", wi->requestID);
492     LeaveCriticalSection(&ioMan->active_work_lock);
493 }
494
495
496 /*
497  * Function: abandonWorkRequest()
498  *
499  * Signal that a work request isn't of interest. Called by the Scheduler
500  * if a blocked Haskell thread has an exception thrown to it.
501  *
502  * Note: we're not aborting the system call that a worker might be blocked on
503  * here, just disabling the propagation of its result once its finished. We
504  * may have to go the whole hog here and switch to overlapped I/O so that we
505  * can abort blocked system calls.
506  */
507 void
508 abandonWorkRequest ( int reqID )
509 {
510     WorkItem *ptr;
511     EnterCriticalSection(&ioMan->active_work_lock);
512     for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) {
513         if (ptr->requestID == (unsigned int)reqID ) {
514             ptr->abandonOp = 1;
515             LeaveCriticalSection(&ioMan->active_work_lock);
516             return;
517         }
518     }
519     /* Note: if the request ID isn't present, the worker will have
520      * finished sometime since awaitRequests() last drained the completed
521      * request table; i.e., not an error.
522      */
523     LeaveCriticalSection(&ioMan->active_work_lock);
524 }