[project @ 2005-01-21 19:58:51 by sof]
[ghc-hetmet.git] / ghc / rts / win32 / IOManager.c
1 /* IOManager.c
2  *
3  * Non-blocking / asynchronous I/O for Win32.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "IOManager.h"
8 #include "WorkQueue.h"
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <io.h>
12 #include <winsock.h>
13 #include <process.h>
14
15 /*
16  * Internal state maintained by the IO manager.
17  */
18 typedef struct IOManagerState {
19     CritSection      manLock;
20     WorkQueue*       workQueue;
21     int              queueSize;
22     int              numWorkers;
23     int              workersIdle;
24     HANDLE           hExitEvent;
25     unsigned int     requestID;
26 } IOManagerState;
27
28 /* ToDo: wrap up this state via a IOManager handle instead? */
29 static IOManagerState* ioMan;
30
31 /*
32  * The routine executed by each worker thread.
33  */
34 static
35 unsigned
36 WINAPI
37 IOWorkerProc(PVOID param)
38 {
39     HANDLE  hWaits[2];
40     DWORD   rc;
41     IOManagerState* iom = (IOManagerState*)param;
42     WorkQueue* pq = iom->workQueue;
43     WorkItem*  work;
44     int        len = 0, fd = 0;
45     DWORD      errCode = 0;
46     void*      complData;
47
48     hWaits[0] = (HANDLE)iom->hExitEvent;
49     hWaits[1] = GetWorkQueueHandle(pq);
50   
51     while (1) {
52         /* The error code is communicated back on completion of request; reset. */
53         errCode = 0;
54         
55         EnterCriticalSection(&iom->manLock);
56         /* Signal that the worker is idle.
57          *
58          * 'workersIdle' is used when determining whether or not to
59          * increase the worker thread pool when adding a new request.
60          * (see addIORequest().)
61          */
62         iom->workersIdle++;
63         LeaveCriticalSection(&iom->manLock);
64         
65         /*
66          * A possible future refinement is to make long-term idle threads
67          * wake up and decide to shut down should the number of idle threads
68          * be above some threshold.
69          *
70          */
71         rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
72
73         if (rc == WAIT_OBJECT_0) {
74             // we received the exit event
75             return 0;
76         }
77
78         EnterCriticalSection(&iom->manLock);
79         /* Signal that the thread is 'non-idle' and about to consume 
80          * a work item.
81          */
82         iom->workersIdle--;
83         iom->queueSize--;
84         LeaveCriticalSection(&iom->manLock);
85     
86         if ( rc == (WAIT_OBJECT_0 + 1) ) {
87             /* work item available, fetch it. */
88             if (FetchWork(pq,(void**)&work)) {
89                 if ( work->workKind & WORKER_READ ) {
90                     if ( work->workKind & WORKER_FOR_SOCKET ) {
91                         len = recv(work->workData.ioData.fd, 
92                                    work->workData.ioData.buf,
93                                    work->workData.ioData.len,
94                                    0);
95                         if (len == SOCKET_ERROR) {
96                             errCode = WSAGetLastError();
97                         }
98                     } else {
99                         DWORD dw;
100
101                         while (1) {
102                         /* Do the read(), with extra-special handling for Ctrl+C */
103                         len = read(work->workData.ioData.fd,
104                                    work->workData.ioData.buf,
105                                    work->workData.ioData.len);
106                         dw = GetLastError();
107                         if ( len == 0 && work->workData.ioData.len != 0 ) {
108                             /* Given the following scenario:
109                              *     - a console handler has been registered that handles Ctrl+C
110                              *       events.
111                              *     - we've not tweaked the 'console mode' settings to turn on
112                              *       ENABLE_PROCESSED_INPUT.
113                              *     - we're blocked waiting on input from standard input.
114                              *     - the user hits Ctrl+C.
115                              *
116                              * The OS will invoke the console handler (in a separate OS thread),
117                              * and the above read() (i.e., under the hood, a ReadFile() op) returns
118                              * 0, with the error set to ERROR_OPERATION_ABORTED. We don't
119                              * want to percolate this non-EOF condition too far back up, but ignore
120                              * it. 
121                              *
122                              * However, we do want to give the RTS an opportunity to deliver the
123                              * console event. Take care of this in the low-level console handler
124                              * in ConsoleHandler.c which wakes up the RTS thread that's blocked
125                              * waiting for I/O results from this worker (and possibly others).
126                              * It won't see any I/O, but notices and dispatches the queued up
127                              * signals/console events while in the Scheduler.
128                              *
129                              * The original, and way hackier scheme, was to have the worker
130                              * return a special return code representing aborted-due-to-ctrl-C-on-stdin,
131                              * which GHC.Conc.asyncRead would look out for and retry the I/O
132                              * call if encountered.
133                              */
134                             if ( dw == ERROR_OPERATION_ABORTED ) {
135                                 /* Only do the retry when dealing with the standard input handle. */
136                                 HANDLE h  = (HANDLE)GetStdHandle(STD_INPUT_HANDLE);
137                                 if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) {
138                                     Sleep(0);
139                                 } else {
140                                     break;
141                                 }
142                             } else {
143                                 break;
144                             }
145                         } else {
146                             break;
147                         }
148                         }
149                         if (len == -1) { errCode = errno; }
150                     }
151                     complData = work->workData.ioData.buf;
152                     fd = work->workData.ioData.fd;
153                 } else if ( work->workKind & WORKER_WRITE ) {
154                     if ( work->workKind & WORKER_FOR_SOCKET ) {
155                         len = send(work->workData.ioData.fd,
156                                    work->workData.ioData.buf,
157                                    work->workData.ioData.len,
158                                    0);
159                         if (len == SOCKET_ERROR) {
160                             errCode = WSAGetLastError();
161                         }
162                     } else {
163                         len = write(work->workData.ioData.fd,
164                                     work->workData.ioData.buf,
165                                     work->workData.ioData.len);
166                         if (len == -1) { errCode = errno; }
167                     }
168                     complData = work->workData.ioData.buf;
169                     fd = work->workData.ioData.fd;
170                 } else if ( work->workKind & WORKER_DELAY ) {
171                     /* Approximate implementation of threadDelay;
172                      * 
173                      * Note: Sleep() is in milliseconds, not micros.
174                      */
175                     Sleep(work->workData.delayData.msecs / 1000);
176                     len = work->workData.delayData.msecs;
177                     complData = NULL;
178                     fd = 0;
179                     errCode = 0;
180                 } else if ( work->workKind & WORKER_DO_PROC ) {
181                     /* perform operation/proc on behalf of Haskell thread. */
182                     if (work->workData.procData.proc) {
183                         /* The procedure is assumed to encode result + success/failure
184                          * via its param.
185                          */
186                         errCode=work->workData.procData.proc(work->workData.procData.param);
187                     } else {
188                         errCode=1;
189                     }
190                     complData = work->workData.procData.param;
191                 } else {
192                     fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
193                     fflush(stderr);
194                     continue;
195                 }
196                 work->onCompletion(work->requestID,
197                                    fd,
198                                    len,
199                                    complData,
200                                    errCode);
201                 /* Free the WorkItem */
202                 free(work);
203             } else {
204                 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
205                 return 1;
206             }
207         } else {
208             fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
209             return 1;
210         }
211     }
212     return 0;
213 }
214
215 static 
216 BOOL
217 NewIOWorkerThread(IOManagerState* iom)
218 {
219     unsigned threadId;
220     return ( 0 != _beginthreadex(NULL,
221                                  0,
222                                  IOWorkerProc,
223                                  (LPVOID)iom,
224                                  0,
225                                  &threadId) );
226 }
227
228 BOOL
229 StartIOManager(void)
230 {
231     HANDLE hExit;
232     WorkQueue* wq;
233
234     wq = NewWorkQueue();
235     if ( !wq ) return FALSE;  
236   
237     ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
238   
239     if (!ioMan) {
240         FreeWorkQueue(wq);
241         return FALSE;
242     }
243
244     /* A manual-reset event */
245     hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
246     if ( !hExit ) {
247         FreeWorkQueue(wq);
248         free(ioMan);
249         return FALSE;
250     }
251   
252     ioMan->hExitEvent = hExit;
253     InitializeCriticalSection(&ioMan->manLock);
254     ioMan->workQueue   = wq;
255     ioMan->numWorkers  = 0;
256     ioMan->workersIdle = 0;
257     ioMan->queueSize   = 0;
258     ioMan->requestID   = 1;
259  
260     return TRUE;
261 }
262
263 /*
264  * Function: depositWorkItem()
265  *
266  * Local function which deposits a WorkItem onto a work queue,
267  * deciding in the process whether or not the thread pool needs
268  * to be augmented with another thread to handle the new request.
269  *
270  */
271 static
272 int
273 depositWorkItem( unsigned int reqID,
274                  WorkItem* wItem )
275 {
276     EnterCriticalSection(&ioMan->manLock);
277
278 #if 0
279     fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers); 
280     fflush(stderr);
281 #endif
282     /* A new worker thread is created when there are fewer idle threads
283      * than non-consumed queue requests. This ensures that requests will
284      * be dealt with in a timely manner.
285      *
286      * [Long explanation of why the previous thread pool policy lead to 
287      * trouble]
288      *
289      * Previously, the thread pool was augmented iff no idle worker threads
290      * were available. That strategy runs the risk of repeatedly adding to
291      * the request queue without expanding the thread pool to handle this
292      * sudden spike in queued requests. 
293      * [How? Assume workersIdle is 1, and addIORequest() is called. No new 
294      * thread is created and the request is simply queued. If addIORequest()
295      * is called again _before the OS schedules a worker thread to pull the
296      * request off the queue_, workersIdle is still 1 and another request is 
297      * simply added to the queue. Once the worker thread is run, only one
298      * request is de-queued, leaving the 2nd request in the queue]
299      * 
300      * Assuming none of the queued requests take an inordinate amount of to 
301      * complete, the request queue would eventually be drained. But if that's 
302      * not the case, the later requests will end up languishing in the queue 
303      * indefinitely. The non-timely handling of requests may cause CH applications
304      * to misbehave / hang; bad.
305      *
306      */
307     ioMan->queueSize++;
308     if ( (ioMan->workersIdle < ioMan->queueSize) ) {
309         /* see if giving up our quantum ferrets out some idle threads.
310          */
311         LeaveCriticalSection(&ioMan->manLock);
312         Sleep(0);
313         EnterCriticalSection(&ioMan->manLock);
314         if ( (ioMan->workersIdle < ioMan->queueSize) ) {
315             /* No, go ahead and create another. */
316             ioMan->numWorkers++;
317             LeaveCriticalSection(&ioMan->manLock);
318             NewIOWorkerThread(ioMan);
319         } else {
320             LeaveCriticalSection(&ioMan->manLock);
321         }
322     } else {
323         LeaveCriticalSection(&ioMan->manLock);
324     }
325   
326     if (SubmitWork(ioMan->workQueue,wItem)) {
327         /* Note: the work item has potentially been consumed by a worker thread
328          *       (and freed) at this point, so we cannot use wItem's requestID.
329          */
330         return reqID;
331     } else {
332         return 0;
333     }
334 }
335
336 /*
337  * Function: AddIORequest()
338  *
339  * Conduit to underlying WorkQueue's SubmitWork(); adds IO
340  * request to work queue, deciding whether or not to augment
341  * the thread pool in the process. 
342  */
343 int
344 AddIORequest ( int   fd,
345                BOOL  forWriting,
346                BOOL  isSocket,
347                int   len,
348                char* buffer,
349                CompletionProc onCompletion)
350 {
351     WorkItem* wItem    = (WorkItem*)malloc(sizeof(WorkItem));
352     unsigned int reqID = ioMan->requestID++;
353     if (!ioMan || !wItem) return 0;
354   
355     /* Fill in the blanks */
356     wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
357                           ( forWriting ? WORKER_WRITE : WORKER_READ );
358     wItem->workData.ioData.fd  = fd;
359     wItem->workData.ioData.len = len;
360     wItem->workData.ioData.buf = buffer;
361
362     wItem->onCompletion        = onCompletion;
363     wItem->requestID           = reqID;
364   
365     return depositWorkItem(reqID, wItem);
366 }       
367
368 /*
369  * Function: AddDelayRequest()
370  *
371  * Like AddIORequest(), but this time adding a delay request to
372  * the request queue.
373  */
374 BOOL
375 AddDelayRequest ( unsigned int   msecs,
376                   CompletionProc onCompletion)
377 {
378     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
379     unsigned int reqID = ioMan->requestID++;
380     if (!ioMan || !wItem) return FALSE;
381   
382     /* Fill in the blanks */
383     wItem->workKind     = WORKER_DELAY;
384     wItem->workData.delayData.msecs = msecs;
385     wItem->onCompletion = onCompletion;
386     wItem->requestID    = reqID;
387
388     return depositWorkItem(reqID, wItem);
389 }
390
391 /*
392  * Function: AddProcRequest()
393  *
394  * Add an asynchronous procedure request.
395  */
396 BOOL
397 AddProcRequest ( void* proc,
398                  void* param,
399                  CompletionProc onCompletion)
400 {
401     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
402     unsigned int reqID = ioMan->requestID++;
403     if (!ioMan || !wItem) return FALSE;
404   
405     /* Fill in the blanks */
406     wItem->workKind     = WORKER_DO_PROC;
407     wItem->workData.procData.proc  = proc;
408     wItem->workData.procData.param = param;
409     wItem->onCompletion = onCompletion;
410     wItem->requestID    = reqID;
411
412     return depositWorkItem(reqID, wItem);
413 }
414
415 void ShutdownIOManager ( void )
416 {
417   SetEvent(ioMan->hExitEvent);
418   // ToDo: we can't free this now, because the worker thread(s)
419   // haven't necessarily finished with it yet.  Perhaps it should
420   // have a reference count or something.
421   // free(ioMan);
422   // ioMan = NULL;
423 }