[project @ 2004-11-17 19:07:38 by sof]
[ghc-hetmet.git] / ghc / rts / win32 / IOManager.c
1 /* IOManager.c
2  *
3  * Non-blocking / asynchronous I/O for Win32.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "IOManager.h"
8 #include "WorkQueue.h"
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <io.h>
12 #include <winsock.h>
13 #include <process.h>
14
15 /*
16  * Internal state maintained by the IO manager.
17  */
18 typedef struct IOManagerState {
19     CritSection      manLock;
20     WorkQueue*       workQueue;
21     int              queueSize;
22     int              numWorkers;
23     int              workersIdle;
24     HANDLE           hExitEvent;
25     unsigned int     requestID;
26 } IOManagerState;
27
28 /* ToDo: wrap up this state via a IOManager handle instead? */
29 static IOManagerState* ioMan;
30
31 /*
32  * The routine executed by each worker thread.
33  */
34 static
35 unsigned
36 WINAPI
37 IOWorkerProc(PVOID param)
38 {
39     HANDLE  hWaits[2];
40     DWORD   rc;
41     IOManagerState* iom = (IOManagerState*)param;
42     WorkQueue* pq = iom->workQueue;
43     WorkItem*  work;
44     int        len = 0, fd = 0;
45     DWORD      errCode = 0;
46     void*      complData;
47
48     hWaits[0] = (HANDLE)iom->hExitEvent;
49     hWaits[1] = GetWorkQueueHandle(pq);
50   
51     while (1) {
52         /* The error code is communicated back on completion of request; reset. */
53         errCode = 0;
54         
55         EnterCriticalSection(&iom->manLock);
56         /* Signal that the worker is idle.
57          *
58          * 'workersIdle' is used when determining whether or not to
59          * increase the worker thread pool when adding a new request.
60          * (see addIORequest().)
61          */
62         iom->workersIdle++;
63         LeaveCriticalSection(&iom->manLock);
64         
65         /*
66          * A possible future refinement is to make long-term idle threads
67          * wake up and decide to shut down should the number of idle threads
68          * be above some threshold.
69          *
70          */
71         rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
72
73         if (rc == WAIT_OBJECT_0) {
74             // we received the exit event
75             return 0;
76         }
77
78         EnterCriticalSection(&iom->manLock);
79         /* Signal that the thread is 'non-idle' and about to consume 
80          * a work item.
81          */
82         iom->workersIdle--;
83         iom->queueSize--;
84         LeaveCriticalSection(&iom->manLock);
85     
86         if ( rc == (WAIT_OBJECT_0 + 1) ) {
87             /* work item available, fetch it. */
88             if (FetchWork(pq,(void**)&work)) {
89                 if ( work->workKind & WORKER_READ ) {
90                     if ( work->workKind & WORKER_FOR_SOCKET ) {
91                         len = recv(work->workData.ioData.fd, 
92                                    work->workData.ioData.buf,
93                                    work->workData.ioData.len,
94                                    0);
95                         if (len == SOCKET_ERROR) {
96                             errCode = WSAGetLastError();
97                         }
98                     } else {
99                         DWORD dw;
100
101                         /* Do the read(), with extra-special handling for Ctrl+C */
102                         len = read(work->workData.ioData.fd,
103                                    work->workData.ioData.buf,
104                                    work->workData.ioData.len);
105                         if ( len == 0 && work->workData.ioData.len != 0 ) {
106                             /* Given the following scenario:
107                              *     - a console handler has been registered that handles Ctrl+C
108                              *       events.
109                              *     - we've not tweaked the 'console mode' settings to turn on
110                              *       ENABLE_PROCESSED_INPUT.
111                              *     - we're blocked waiting on input from standard input.
112                              *     - the user hits Ctrl+C.
113                              *
114                              * The OS will invoke the console handler (in a separate OS thread),
115                              * and the above read() (i.e., under the hood, a ReadFile() op) returns
116                              * 0, with the error set to ERROR_OPERATION_ABORTED. We don't
117                              * want to percolate this non-EOF condition too far back up, but ignore
118                              * it. However, we do want to give the RTS an opportunity to deliver the
119                              * console event.
120                              * 
121                              * Hence, we set 'errorCode' to (-2), which we then look out for in
122                              * GHC.Conc.asyncRead.
123                              */
124                             dw = GetLastError();
125                             if ( dw == ERROR_OPERATION_ABORTED ) {
126                                 /* Only do the retry when dealing with the standard input handle. */
127                                 HANDLE h  = (HANDLE)GetStdHandle(STD_INPUT_HANDLE);
128                                 if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) {
129                                     errCode = (DWORD)-2;
130                                 }
131                             }
132                         }
133                         if (len == -1) { errCode = errno; }
134                     }
135                     complData = work->workData.ioData.buf;
136                     fd = work->workData.ioData.fd;
137                 } else if ( work->workKind & WORKER_WRITE ) {
138                     if ( work->workKind & WORKER_FOR_SOCKET ) {
139                         len = send(work->workData.ioData.fd,
140                                    work->workData.ioData.buf,
141                                    work->workData.ioData.len,
142                                    0);
143                         if (len == SOCKET_ERROR) {
144                             errCode = WSAGetLastError();
145                         }
146                     } else {
147                         len = write(work->workData.ioData.fd,
148                                     work->workData.ioData.buf,
149                                     work->workData.ioData.len);
150                         if (len == -1) { errCode = errno; }
151                     }
152                     complData = work->workData.ioData.buf;
153                     fd = work->workData.ioData.fd;
154                 } else if ( work->workKind & WORKER_DELAY ) {
155                     /* Approximate implementation of threadDelay;
156                      * 
157                      * Note: Sleep() is in milliseconds, not micros.
158                      */
159                     Sleep(work->workData.delayData.msecs / 1000);
160                     len = work->workData.delayData.msecs;
161                     complData = NULL;
162                     fd = 0;
163                     errCode = 0;
164                 } else if ( work->workKind & WORKER_DO_PROC ) {
165                     /* perform operation/proc on behalf of Haskell thread. */
166                     if (work->workData.procData.proc) {
167                         /* The procedure is assumed to encode result + success/failure
168                          * via its param.
169                          */
170                         errCode=work->workData.procData.proc(work->workData.procData.param);
171                     } else {
172                         errCode=1;
173                     }
174                     complData = work->workData.procData.param;
175                 } else {
176                     fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
177                     fflush(stderr);
178                     continue;
179                 }
180                 work->onCompletion(work->requestID,
181                                    fd,
182                                    len,
183                                    complData,
184                                    errCode);
185                 /* Free the WorkItem */
186                 free(work);
187             } else {
188                 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
189                 return 1;
190             }
191         } else {
192             fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
193             return 1;
194         }
195     }
196     return 0;
197 }
198
199 static 
200 BOOL
201 NewIOWorkerThread(IOManagerState* iom)
202 {
203     unsigned threadId;
204     return ( 0 != _beginthreadex(NULL,
205                                  0,
206                                  IOWorkerProc,
207                                  (LPVOID)iom,
208                                  0,
209                                  &threadId) );
210 }
211
212 BOOL
213 StartIOManager(void)
214 {
215     HANDLE hExit;
216     WorkQueue* wq;
217
218     wq = NewWorkQueue();
219     if ( !wq ) return FALSE;  
220   
221     ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
222   
223     if (!ioMan) {
224         FreeWorkQueue(wq);
225         return FALSE;
226     }
227
228     /* A manual-reset event */
229     hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
230     if ( !hExit ) {
231         FreeWorkQueue(wq);
232         free(ioMan);
233         return FALSE;
234     }
235   
236     ioMan->hExitEvent = hExit;
237     InitializeCriticalSection(&ioMan->manLock);
238     ioMan->workQueue   = wq;
239     ioMan->numWorkers  = 0;
240     ioMan->workersIdle = 0;
241     ioMan->queueSize   = 0;
242     ioMan->requestID   = 1;
243  
244     return TRUE;
245 }
246
247 /*
248  * Function: depositWorkItem()
249  *
250  * Local function which deposits a WorkItem onto a work queue,
251  * deciding in the process whether or not the thread pool needs
252  * to be augmented with another thread to handle the new request.
253  *
254  */
255 static
256 int
257 depositWorkItem( unsigned int reqID,
258                  WorkItem* wItem )
259 {
260     EnterCriticalSection(&ioMan->manLock);
261
262 #if 0
263     fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers); 
264     fflush(stderr);
265 #endif
266     /* A new worker thread is created when there are fewer idle threads
267      * than non-consumed queue requests. This ensures that requests will
268      * be dealt with in a timely manner.
269      *
270      * [Long explanation of why the previous thread pool policy lead to 
271      * trouble]
272      *
273      * Previously, the thread pool was augmented iff no idle worker threads
274      * were available. That strategy runs the risk of repeatedly adding to
275      * the request queue without expanding the thread pool to handle this
276      * sudden spike in queued requests. 
277      * [How? Assume workersIdle is 1, and addIORequest() is called. No new 
278      * thread is created and the request is simply queued. If addIORequest()
279      * is called again _before the OS schedules a worker thread to pull the
280      * request off the queue_, workersIdle is still 1 and another request is 
281      * simply added to the queue. Once the worker thread is run, only one
282      * request is de-queued, leaving the 2nd request in the queue]
283      * 
284      * Assuming none of the queued requests take an inordinate amount of to 
285      * complete, the request queue would eventually be drained. But if that's 
286      * not the case, the later requests will end up languishing in the queue 
287      * indefinitely. The non-timely handling of requests may cause CH applications
288      * to misbehave / hang; bad.
289      *
290      */
291     ioMan->queueSize++;
292     if ( (ioMan->workersIdle < ioMan->queueSize) ) {
293         /* see if giving up our quantum ferrets out some idle threads.
294          */
295         LeaveCriticalSection(&ioMan->manLock);
296         Sleep(0);
297         EnterCriticalSection(&ioMan->manLock);
298         if ( (ioMan->workersIdle < ioMan->queueSize) ) {
299             /* No, go ahead and create another. */
300             ioMan->numWorkers++;
301             LeaveCriticalSection(&ioMan->manLock);
302             NewIOWorkerThread(ioMan);
303         } else {
304             LeaveCriticalSection(&ioMan->manLock);
305         }
306     } else {
307         LeaveCriticalSection(&ioMan->manLock);
308     }
309   
310     if (SubmitWork(ioMan->workQueue,wItem)) {
311         /* Note: the work item has potentially been consumed by a worker thread
312          *       (and freed) at this point, so we cannot use wItem's requestID.
313          */
314         return reqID;
315     } else {
316         return 0;
317     }
318 }
319
320 /*
321  * Function: AddIORequest()
322  *
323  * Conduit to underlying WorkQueue's SubmitWork(); adds IO
324  * request to work queue, deciding whether or not to augment
325  * the thread pool in the process. 
326  */
327 int
328 AddIORequest ( int   fd,
329                BOOL  forWriting,
330                BOOL  isSocket,
331                int   len,
332                char* buffer,
333                CompletionProc onCompletion)
334 {
335     WorkItem* wItem    = (WorkItem*)malloc(sizeof(WorkItem));
336     unsigned int reqID = ioMan->requestID++;
337     if (!ioMan || !wItem) return 0;
338   
339     /* Fill in the blanks */
340     wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
341                           ( forWriting ? WORKER_WRITE : WORKER_READ );
342     wItem->workData.ioData.fd  = fd;
343     wItem->workData.ioData.len = len;
344     wItem->workData.ioData.buf = buffer;
345
346     wItem->onCompletion        = onCompletion;
347     wItem->requestID           = reqID;
348   
349     return depositWorkItem(reqID, wItem);
350 }       
351
352 /*
353  * Function: AddDelayRequest()
354  *
355  * Like AddIORequest(), but this time adding a delay request to
356  * the request queue.
357  */
358 BOOL
359 AddDelayRequest ( unsigned int   msecs,
360                   CompletionProc onCompletion)
361 {
362     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
363     unsigned int reqID = ioMan->requestID++;
364     if (!ioMan || !wItem) return FALSE;
365   
366     /* Fill in the blanks */
367     wItem->workKind     = WORKER_DELAY;
368     wItem->workData.delayData.msecs = msecs;
369     wItem->onCompletion = onCompletion;
370     wItem->requestID    = reqID;
371
372     return depositWorkItem(reqID, wItem);
373 }
374
375 /*
376  * Function: AddProcRequest()
377  *
378  * Add an asynchronous procedure request.
379  */
380 BOOL
381 AddProcRequest ( void* proc,
382                  void* param,
383                  CompletionProc onCompletion)
384 {
385     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
386     unsigned int reqID = ioMan->requestID++;
387     if (!ioMan || !wItem) return FALSE;
388   
389     /* Fill in the blanks */
390     wItem->workKind     = WORKER_DO_PROC;
391     wItem->workData.procData.proc  = proc;
392     wItem->workData.procData.param = param;
393     wItem->onCompletion = onCompletion;
394     wItem->requestID    = reqID;
395
396     return depositWorkItem(reqID, wItem);
397 }
398
399 void ShutdownIOManager ( void )
400 {
401   SetEvent(ioMan->hExitEvent);
402   // ToDo: we can't free this now, because the worker thread(s)
403   // haven't necessarily finished with it yet.  Perhaps it should
404   // have a reference count or something.
405   // free(ioMan);
406   // ioMan = NULL;
407 }