Massive patch for the first month's work adding System FC to GHC #35
[ghc-hetmet.git] / rts / win32 / IOManager.c
1 /* IOManager.c
2  *
3  * Non-blocking / asynchronous I/O for Win32.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "Rts.h"
8 #include "IOManager.h"
9 #include "WorkQueue.h"
10 #include "ConsoleHandler.h"
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <io.h>
14 #include <winsock.h>
15 #include <process.h>
16
17 /*
18  * Internal state maintained by the IO manager.
19  */
20 typedef struct IOManagerState {
21     CritSection      manLock;
22     WorkQueue*       workQueue;
23     int              queueSize;
24     int              numWorkers;
25     int              workersIdle;
26     HANDLE           hExitEvent;
27     unsigned int     requestID;
28     /* fields for keeping track of active WorkItems */
29     CritSection      active_work_lock;
30     WorkItem*        active_work_items;
31 } IOManagerState;
32
33 /* ToDo: wrap up this state via a IOManager handle instead? */
34 static IOManagerState* ioMan;
35
36 static void RegisterWorkItem  ( IOManagerState* iom, WorkItem* wi);
37 static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi);
38
39 /*
40  * The routine executed by each worker thread.
41  */
42 static
43 unsigned
44 WINAPI
45 IOWorkerProc(PVOID param)
46 {
47     HANDLE  hWaits[2];
48     DWORD   rc;
49     IOManagerState* iom = (IOManagerState*)param;
50     WorkQueue* pq = iom->workQueue;
51     WorkItem*  work;
52     int        len = 0, fd = 0;
53     DWORD      errCode = 0;
54     void*      complData;
55
56     hWaits[0] = (HANDLE)iom->hExitEvent;
57     hWaits[1] = GetWorkQueueHandle(pq);
58   
59     while (1) {
60         /* The error code is communicated back on completion of request; reset. */
61         errCode = 0;
62         
63         EnterCriticalSection(&iom->manLock);
64         /* Signal that the worker is idle.
65          *
66          * 'workersIdle' is used when determining whether or not to
67          * increase the worker thread pool when adding a new request.
68          * (see addIORequest().)
69          */
70         iom->workersIdle++;
71         LeaveCriticalSection(&iom->manLock);
72         
73         /*
74          * A possible future refinement is to make long-term idle threads
75          * wake up and decide to shut down should the number of idle threads
76          * be above some threshold.
77          *
78          */
79         rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
80
81         if (rc == WAIT_OBJECT_0) {
82             // we received the exit event
83             EnterCriticalSection(&iom->manLock);
84             ioMan->numWorkers--;
85             LeaveCriticalSection(&iom->manLock);
86             return 0;
87         }
88
89         EnterCriticalSection(&iom->manLock);
90         /* Signal that the thread is 'non-idle' and about to consume 
91          * a work item.
92          */
93         iom->workersIdle--;
94         iom->queueSize--;
95         LeaveCriticalSection(&iom->manLock);
96     
97         if ( rc == (WAIT_OBJECT_0 + 1) ) {
98             /* work item available, fetch it. */
99             if (FetchWork(pq,(void**)&work)) {
100                 work->abandonOp = 0;
101                 RegisterWorkItem(iom,work);
102                 if ( work->workKind & WORKER_READ ) {
103                     if ( work->workKind & WORKER_FOR_SOCKET ) {
104                         len = recv(work->workData.ioData.fd, 
105                                    work->workData.ioData.buf,
106                                    work->workData.ioData.len,
107                                    0);
108                         if (len == SOCKET_ERROR) {
109                             errCode = WSAGetLastError();
110                         }
111                     } else {
112                         while (1) {
113                         /* Do the read(), with extra-special handling for Ctrl+C */
114                         len = read(work->workData.ioData.fd,
115                                    work->workData.ioData.buf,
116                                    work->workData.ioData.len);
117                         if ( len == 0 && work->workData.ioData.len != 0 ) {
118                             /* Given the following scenario:
119                              *     - a console handler has been registered that handles Ctrl+C
120                              *       events.
121                              *     - we've not tweaked the 'console mode' settings to turn on
122                              *       ENABLE_PROCESSED_INPUT.
123                              *     - we're blocked waiting on input from standard input.
124                              *     - the user hits Ctrl+C.
125                              *
126                              * The OS will invoke the console handler (in a separate OS thread),
127                              * and the above read() (i.e., under the hood, a ReadFile() op) returns
128                              * 0, with the error set to ERROR_OPERATION_ABORTED. We don't
129                              * want to percolate this error condition back to the Haskell user.
130                              * Do this by waiting for the completion of the Haskell console handler.
131                              * If upon completion of the console handler routine, the Haskell thread 
132                              * that issued the request is found to have been thrown an exception, 
133                              * the worker abandons the request (since that's what the Haskell thread 
134                              * has done.) If the Haskell thread hasn't been interrupted, the worker 
135                              * retries the read request as if nothing happened.
136                              */
137                             if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) {
138                                 /* For now, only abort when dealing with the standard input handle.
139                                  * i.e., for all others, an error is raised.
140                                  */
141                                 HANDLE h  = (HANDLE)GetStdHandle(STD_INPUT_HANDLE);
142                                 if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) {
143                                     if (rts_waitConsoleHandlerCompletion()) {
144                                         /* If the Scheduler has set work->abandonOp, the Haskell thread has 
145                                          * been thrown an exception (=> the worker must abandon this request.)
146                                          * We test for this below before invoking the on-completion routine.
147                                          */
148                                         if (work->abandonOp) {
149                                             break;
150                                         } else {
151                                             continue;
152                                         }
153                                     } 
154                                 } else { 
155                                     break; /* Treat it like an error */
156                                 }
157                             } else {
158                                 break;
159                             }
160                         } else {
161                             break;
162                         }
163                         }
164                         if (len == -1) { errCode = errno; }
165                     }
166                     complData = work->workData.ioData.buf;
167                     fd = work->workData.ioData.fd;
168                 } else if ( work->workKind & WORKER_WRITE ) {
169                     if ( work->workKind & WORKER_FOR_SOCKET ) {
170                         len = send(work->workData.ioData.fd,
171                                    work->workData.ioData.buf,
172                                    work->workData.ioData.len,
173                                    0);
174                         if (len == SOCKET_ERROR) {
175                             errCode = WSAGetLastError();
176                         }
177                     } else {
178                         len = write(work->workData.ioData.fd,
179                                     work->workData.ioData.buf,
180                                     work->workData.ioData.len);
181                         if (len == -1) { errCode = errno; }
182                     }
183                     complData = work->workData.ioData.buf;
184                     fd = work->workData.ioData.fd;
185                 } else if ( work->workKind & WORKER_DELAY ) {
186                     /* Approximate implementation of threadDelay;
187                      * 
188                      * Note: Sleep() is in milliseconds, not micros.
189                      */
190                     Sleep(work->workData.delayData.msecs / 1000);
191                     len = work->workData.delayData.msecs;
192                     complData = NULL;
193                     fd = 0;
194                     errCode = 0;
195                 } else if ( work->workKind & WORKER_DO_PROC ) {
196                     /* perform operation/proc on behalf of Haskell thread. */
197                     if (work->workData.procData.proc) {
198                         /* The procedure is assumed to encode result + success/failure
199                          * via its param.
200                          */
201                         errCode=work->workData.procData.proc(work->workData.procData.param);
202                     } else {
203                         errCode=1;
204                     }
205                     complData = work->workData.procData.param;
206                 } else {
207                     fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
208                     fflush(stderr);
209                     continue;
210                 }
211                 if (!work->abandonOp) {
212                     work->onCompletion(work->requestID,
213                                        fd,
214                                        len,
215                                        complData,
216                                        errCode);
217                 }
218                 /* Free the WorkItem */
219                 DeregisterWorkItem(iom,work);
220                 free(work);
221             } else {
222                 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
223                 EnterCriticalSection(&iom->manLock);
224                 ioMan->numWorkers--;
225                 LeaveCriticalSection(&iom->manLock);
226                 return 1;
227             }
228         } else {
229             fprintf(stderr, "waiting failed (%lu); fatal.\n", rc); fflush(stderr);
230             EnterCriticalSection(&iom->manLock);
231             ioMan->numWorkers--;
232             LeaveCriticalSection(&iom->manLock);
233             return 1;
234         }
235     }
236     return 0;
237 }
238
239 static 
240 BOOL
241 NewIOWorkerThread(IOManagerState* iom)
242 {
243     unsigned threadId;
244     return ( 0 != _beginthreadex(NULL,
245                                  0,
246                                  IOWorkerProc,
247                                  (LPVOID)iom,
248                                  0,
249                                  &threadId) );
250 }
251
252 BOOL
253 StartIOManager(void)
254 {
255     HANDLE hExit;
256     WorkQueue* wq;
257
258     wq = NewWorkQueue();
259     if ( !wq ) return FALSE;  
260   
261     ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
262   
263     if (!ioMan) {
264         FreeWorkQueue(wq);
265         return FALSE;
266     }
267
268     /* A manual-reset event */
269     hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
270     if ( !hExit ) {
271         FreeWorkQueue(wq);
272         free(ioMan);
273         return FALSE;
274     }
275   
276     ioMan->hExitEvent = hExit;
277     InitializeCriticalSection(&ioMan->manLock);
278     ioMan->workQueue   = wq;
279     ioMan->numWorkers  = 0;
280     ioMan->workersIdle = 0;
281     ioMan->queueSize   = 0;
282     ioMan->requestID   = 1;
283     InitializeCriticalSection(&ioMan->active_work_lock);
284     ioMan->active_work_items = NULL;
285  
286     return TRUE;
287 }
288
289 /*
290  * Function: depositWorkItem()
291  *
292  * Local function which deposits a WorkItem onto a work queue,
293  * deciding in the process whether or not the thread pool needs
294  * to be augmented with another thread to handle the new request.
295  *
296  */
297 static
298 int
299 depositWorkItem( unsigned int reqID,
300                  WorkItem* wItem )
301 {
302     EnterCriticalSection(&ioMan->manLock);
303
304 #if 0
305     fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers); 
306     fflush(stderr);
307 #endif
308     /* A new worker thread is created when there are fewer idle threads
309      * than non-consumed queue requests. This ensures that requests will
310      * be dealt with in a timely manner.
311      *
312      * [Long explanation of why the previous thread pool policy lead to 
313      * trouble]
314      *
315      * Previously, the thread pool was augmented iff no idle worker threads
316      * were available. That strategy runs the risk of repeatedly adding to
317      * the request queue without expanding the thread pool to handle this
318      * sudden spike in queued requests. 
319      * [How? Assume workersIdle is 1, and addIORequest() is called. No new 
320      * thread is created and the request is simply queued. If addIORequest()
321      * is called again _before the OS schedules a worker thread to pull the
322      * request off the queue_, workersIdle is still 1 and another request is 
323      * simply added to the queue. Once the worker thread is run, only one
324      * request is de-queued, leaving the 2nd request in the queue]
325      * 
326      * Assuming none of the queued requests take an inordinate amount of to 
327      * complete, the request queue would eventually be drained. But if that's 
328      * not the case, the later requests will end up languishing in the queue 
329      * indefinitely. The non-timely handling of requests may cause CH applications
330      * to misbehave / hang; bad.
331      *
332      */
333     ioMan->queueSize++;
334     if ( (ioMan->workersIdle < ioMan->queueSize) ) {
335         /* see if giving up our quantum ferrets out some idle threads.
336          */
337         LeaveCriticalSection(&ioMan->manLock);
338         Sleep(0);
339         EnterCriticalSection(&ioMan->manLock);
340         if ( (ioMan->workersIdle < ioMan->queueSize) ) {
341             /* No, go ahead and create another. */
342             ioMan->numWorkers++;
343             if (!NewIOWorkerThread(ioMan)) {
344                 ioMan->numWorkers--;
345             }
346         }
347     }
348     LeaveCriticalSection(&ioMan->manLock);
349   
350     if (SubmitWork(ioMan->workQueue,wItem)) {
351         /* Note: the work item has potentially been consumed by a worker thread
352          *       (and freed) at this point, so we cannot use wItem's requestID.
353          */
354         return reqID;
355     } else {
356         return 0;
357     }
358 }
359
360 /*
361  * Function: AddIORequest()
362  *
363  * Conduit to underlying WorkQueue's SubmitWork(); adds IO
364  * request to work queue, deciding whether or not to augment
365  * the thread pool in the process. 
366  */
367 int
368 AddIORequest ( int   fd,
369                BOOL  forWriting,
370                BOOL  isSocket,
371                int   len,
372                char* buffer,
373                CompletionProc onCompletion)
374 {
375     WorkItem* wItem    = (WorkItem*)malloc(sizeof(WorkItem));
376     unsigned int reqID = ioMan->requestID++;
377     if (!ioMan || !wItem) return 0;
378   
379     /* Fill in the blanks */
380     wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
381                           ( forWriting ? WORKER_WRITE : WORKER_READ );
382     wItem->workData.ioData.fd  = fd;
383     wItem->workData.ioData.len = len;
384     wItem->workData.ioData.buf = buffer;
385     wItem->link = NULL;
386
387     wItem->onCompletion        = onCompletion;
388     wItem->requestID           = reqID;
389   
390     return depositWorkItem(reqID, wItem);
391 }       
392
393 /*
394  * Function: AddDelayRequest()
395  *
396  * Like AddIORequest(), but this time adding a delay request to
397  * the request queue.
398  */
399 BOOL
400 AddDelayRequest ( unsigned int   msecs,
401                   CompletionProc onCompletion)
402 {
403     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
404     unsigned int reqID = ioMan->requestID++;
405     if (!ioMan || !wItem) return FALSE;
406   
407     /* Fill in the blanks */
408     wItem->workKind     = WORKER_DELAY;
409     wItem->workData.delayData.msecs = msecs;
410     wItem->onCompletion = onCompletion;
411     wItem->requestID    = reqID;
412     wItem->link         = NULL;
413
414     return depositWorkItem(reqID, wItem);
415 }
416
417 /*
418  * Function: AddProcRequest()
419  *
420  * Add an asynchronous procedure request.
421  */
422 BOOL
423 AddProcRequest ( void* proc,
424                  void* param,
425                  CompletionProc onCompletion)
426 {
427     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
428     unsigned int reqID = ioMan->requestID++;
429     if (!ioMan || !wItem) return FALSE;
430   
431     /* Fill in the blanks */
432     wItem->workKind     = WORKER_DO_PROC;
433     wItem->workData.procData.proc  = proc;
434     wItem->workData.procData.param = param;
435     wItem->onCompletion = onCompletion;
436     wItem->requestID    = reqID;
437     wItem->abandonOp    = 0;
438     wItem->link         = NULL;
439
440     return depositWorkItem(reqID, wItem);
441 }
442
443 void ShutdownIOManager ( void )
444 {
445     int num;
446
447     SetEvent(ioMan->hExitEvent);
448   
449     /* Wait for all worker threads to die. */
450     for (;;) {
451         EnterCriticalSection(&ioMan->manLock);
452         num = ioMan->numWorkers;
453         LeaveCriticalSection(&ioMan->manLock);
454         if (num == 0)
455             break;
456         Sleep(10);
457     }
458     FreeWorkQueue(ioMan->workQueue);
459     CloseHandle(ioMan->hExitEvent);
460     free(ioMan);
461     ioMan = NULL;
462 }
463
464 /* Keep track of WorkItems currently being serviced. */
465 static 
466 void
467 RegisterWorkItem(IOManagerState* ioMan, 
468                  WorkItem* wi)
469 {
470     EnterCriticalSection(&ioMan->active_work_lock);
471     wi->link = ioMan->active_work_items;
472     ioMan->active_work_items = wi;
473     LeaveCriticalSection(&ioMan->active_work_lock);
474 }
475
476 static 
477 void
478 DeregisterWorkItem(IOManagerState* ioMan, 
479                    WorkItem* wi)
480 {
481     WorkItem *ptr, *prev;
482     
483     EnterCriticalSection(&ioMan->active_work_lock);
484     for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) {
485         if (wi->requestID == ptr->requestID) {
486             if (prev==NULL) {
487                 ioMan->active_work_items = ptr->link;
488             } else {
489                 prev->link = ptr->link;
490             }
491             LeaveCriticalSection(&ioMan->active_work_lock);
492             return;
493         }
494     }
495     fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n", wi->requestID);
496     LeaveCriticalSection(&ioMan->active_work_lock);
497 }
498
499
500 /*
501  * Function: abandonWorkRequest()
502  *
503  * Signal that a work request isn't of interest. Called by the Scheduler
504  * if a blocked Haskell thread has an exception thrown to it.
505  *
506  * Note: we're not aborting the system call that a worker might be blocked on
507  * here, just disabling the propagation of its result once its finished. We
508  * may have to go the whole hog here and switch to overlapped I/O so that we
509  * can abort blocked system calls.
510  */
511 void
512 abandonWorkRequest ( int reqID )
513 {
514     WorkItem *ptr;
515     EnterCriticalSection(&ioMan->active_work_lock);
516     for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) {
517         if (ptr->requestID == (unsigned int)reqID ) {
518             ptr->abandonOp = 1;
519             LeaveCriticalSection(&ioMan->active_work_lock);
520             return;
521         }
522     }
523     /* Note: if the request ID isn't present, the worker will have
524      * finished sometime since awaitRequests() last drained the completed
525      * request table; i.e., not an error.
526      */
527     LeaveCriticalSection(&ioMan->active_work_lock);
528 }