[project @ 2003-09-12 16:26:05 by sof]
[ghc-hetmet.git] / ghc / rts / win32 / IOManager.c
1 /* IOManager.c
2  *
3  * Non-blocking / asynchronous I/O for Win32.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "IOManager.h"
8 #include "WorkQueue.h"
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <io.h>
12 #include <winsock.h>
13 #include <process.h>
14
15 /*
16  * Internal state maintained by the IO manager.
17  */
18 typedef struct IOManagerState {
19     CritSection      manLock;
20     WorkQueue*       workQueue;
21     int              queueSize;
22     int              numWorkers;
23     int              workersIdle;
24     HANDLE           hExitEvent;
25     unsigned int     requestID;
26 } IOManagerState;
27
28 /* ToDo: wrap up this state via a IOManager handle instead? */
29 static IOManagerState* ioMan;
30
31 /*
32  * The routine executed by each worker thread.
33  */
34 static
35 unsigned
36 WINAPI
37 IOWorkerProc(PVOID param)
38 {
39     HANDLE  hWaits[2];
40     DWORD   rc;
41     IOManagerState* iom = (IOManagerState*)param;
42     WorkQueue* pq = iom->workQueue;
43     WorkItem*  work;
44     int        len = 0, fd = 0;
45     DWORD      errCode;
46     void*      complData;
47
48     hWaits[0] = (HANDLE)iom->hExitEvent;
49     hWaits[1] = GetWorkQueueHandle(pq);
50   
51     while (1) {
52         /* The error code is communicated back on completion of request; reset. */
53         errCode = 0;
54         
55         EnterCriticalSection(&iom->manLock);
56         /* Signal that the worker is idle.
57          *
58          * 'workersIdle' is used when determining whether or not to
59          * increase the worker thread pool when adding a new request.
60          * (see addIORequest().)
61          */
62         iom->workersIdle++;
63         LeaveCriticalSection(&iom->manLock);
64
65         rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
66
67         EnterCriticalSection(&iom->manLock);
68         /* Signal that the thread is 'non-idle' and about to consume 
69          * a work item.
70          */
71         iom->workersIdle--;
72         iom->queueSize--;
73         LeaveCriticalSection(&iom->manLock);
74     
75         if ( WAIT_OBJECT_0 == rc ) {
76             /* shutdown */
77             return 0;
78         } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
79             /* work item available, fetch it. */
80             if (FetchWork(pq,(void**)&work)) {
81                 if ( work->workKind & WORKER_READ ) {
82                     if ( work->workKind & WORKER_FOR_SOCKET ) {
83                         len = recv(work->workData.ioData.fd, 
84                                    work->workData.ioData.buf,
85                                    work->workData.ioData.len,
86                                    0);
87                         if (len == SOCKET_ERROR) {
88                             errCode = WSAGetLastError();
89                         }
90                     } else {
91                         len = read(work->workData.ioData.fd,
92                                    work->workData.ioData.buf,
93                                    work->workData.ioData.len);
94                         if (len == -1) { errCode = errno; }
95                     }
96                     complData = work->workData.ioData.buf;
97                     fd = work->workData.ioData.fd;
98                 } else if ( work->workKind & WORKER_WRITE ) {
99                     if ( work->workKind & WORKER_FOR_SOCKET ) {
100                         len = send(work->workData.ioData.fd,
101                                    work->workData.ioData.buf,
102                                    work->workData.ioData.len,
103                                    0);
104                         if (len == SOCKET_ERROR) {
105                             errCode = WSAGetLastError();
106                         }
107                     } else {
108                         len = write(work->workData.ioData.fd,
109                                     work->workData.ioData.buf,
110                                     work->workData.ioData.len);
111                         if (len == -1) { errCode = errno; }
112                     }
113                     complData = work->workData.ioData.buf;
114                     fd = work->workData.ioData.fd;
115                 } else if ( work->workKind & WORKER_DELAY ) {
116                     /* Approximate implementation of threadDelay;
117                      * 
118                      * Note: Sleep() is in milliseconds, not micros.
119                      */
120                     Sleep(work->workData.delayData.msecs / 1000);
121                     len = work->workData.delayData.msecs;
122                     complData = NULL;
123                     fd = 0;
124                     errCode = 0;
125                 } else if ( work->workKind & WORKER_DO_PROC ) {
126                     /* perform operation/proc on behalf of Haskell thread. */
127                     if (work->workData.procData.proc) {
128                         /* The procedure is assumed to encode result + success/failure
129                          * via its param.
130                          */
131                         errCode=work->workData.procData.proc(work->workData.procData.param);
132                     } else {
133                         errCode=1;
134                     }
135                     complData = work->workData.procData.param;
136                 } else {
137                     fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
138                     fflush(stderr);
139                     continue;
140                 }
141                 work->onCompletion(work->requestID,
142                                    fd,
143                                    len,
144                                    complData,
145                                    errCode);
146                 /* Free the WorkItem */
147                 free(work);
148             } else {
149                 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
150                 return 1;
151             }
152         } else {
153             fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
154             return 1;
155         }
156     }
157     return 0;
158 }
159
160 static 
161 BOOL
162 NewIOWorkerThread(IOManagerState* iom)
163 {
164     unsigned threadId;
165     return ( 0 != _beginthreadex(NULL,
166                                  0,
167                                  IOWorkerProc,
168                                  (LPVOID)iom,
169                                  0,
170                                  &threadId) );
171 }
172
173 BOOL
174 StartIOManager(void)
175 {
176     HANDLE hExit;
177     WorkQueue* wq;
178
179     wq = NewWorkQueue();
180     if ( !wq ) return FALSE;  
181   
182     ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
183   
184     if (!ioMan) {
185         FreeWorkQueue(wq);
186         return FALSE;
187     }
188
189     /* A manual-reset event */
190     hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
191     if ( !hExit ) {
192         FreeWorkQueue(wq);
193         free(ioMan);
194         return FALSE;
195     }
196   
197     ioMan->hExitEvent = hExit;
198     InitializeCriticalSection(&ioMan->manLock);
199     ioMan->workQueue   = wq;
200     ioMan->numWorkers  = 0;
201     ioMan->workersIdle = 0;
202     ioMan->queueSize   = 0;
203     ioMan->requestID   = 1;
204  
205     return TRUE;
206 }
207
208 /*
209  * Function: AddIORequest()
210  *
211  * Conduit to underlying WorkQueue's SubmitWork(); adds IO
212  * request to work queue, deciding whether or not to augment
213  * the thread pool in the process. 
214  */
215 int
216 AddIORequest ( int   fd,
217                BOOL  forWriting,
218                BOOL  isSocket,
219                int   len,
220                char* buffer,
221                CompletionProc onCompletion)
222 {
223     WorkItem* wItem    = (WorkItem*)malloc(sizeof(WorkItem));
224     unsigned int reqID = ioMan->requestID++;
225     if (!ioMan || !wItem) return 0;
226   
227     /* Fill in the blanks */
228     wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
229                           ( forWriting ? WORKER_WRITE : WORKER_READ );
230     wItem->workData.ioData.fd  = fd;
231     wItem->workData.ioData.len = len;
232     wItem->workData.ioData.buf = buffer;
233
234     wItem->onCompletion        = onCompletion;
235     wItem->requestID           = reqID;
236   
237     EnterCriticalSection(&ioMan->manLock);
238     /* If there are no worker threads available, create one.
239      *
240      * If this turns out to be too aggressive a policy, refine.
241      */
242 #if 0
243     fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); 
244     fflush(stderr);
245 #endif
246     /* A new worker thread is created when there are fewer idle threads
247      * than non-consumed queue requests. This ensures that requests will
248      * be dealt with in a timely manner.
249      *
250      * [Long explanation of why the previous thread pool policy lead to 
251      * trouble]
252      *
253      * Previously, the thread pool was augmented iff no idle worker threads
254      * were available. That strategy runs the risk of repeatedly adding to
255      * the request queue without expanding the thread pool to handle this
256      * sudden spike in queued requests. 
257      * [How? Assume workersIdle is 1, and addIORequest() is called. No new 
258      * thread is created and the, returning without blocking.
259  request is simply queued. If addIORequest()
260      * is called again _before the OS schedules a worker thread to pull the
261      * request off the queue_, workersIdle is still 1 and another request is 
262      * simply added to the queue. Once the worker thread is run, only one
263      * request is de-queued, leaving the 2nd request in the queue]
264      * 
265      * Assuming none of the queued requests take an inordinate amount of to 
266      * complete, the request queue would eventually be drained. But if that's 
267      * not the case, the later requests will end up languishing in the queue 
268      * indefinitely. The non-timely handling of requests may cause CH applications
269      * to misbehave / hang; bad.
270      *
271      */
272     ioMan->queueSize++;
273     if ( ioMan->workersIdle < ioMan->queueSize ) {
274         ioMan->numWorkers++;
275         LeaveCriticalSection(&ioMan->manLock);
276         NewIOWorkerThread(ioMan);
277     } else {
278         LeaveCriticalSection(&ioMan->manLock);
279     }
280     
281     if (SubmitWork(ioMan->workQueue,wItem)) {
282         /* Note: the work item has potentially been consumed by a worker thread
283          *       (and freed) at this point, so we cannot use wItem's requestID.
284          */
285         return reqID;
286     } else {
287         return 0;
288     }
289 }       
290
291 /*
292  * Function: AddDelayRequest()
293  *
294  * Like AddIORequest(), but this time adding a delay request to
295  * the request queue.
296  */
297 BOOL
298 AddDelayRequest ( unsigned int   msecs,
299                   CompletionProc onCompletion)
300 {
301     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
302     unsigned int reqID = ioMan->requestID++;
303     if (!ioMan || !wItem) return FALSE;
304   
305     /* Fill in the blanks */
306     wItem->workKind     = WORKER_DELAY;
307     wItem->workData.delayData.msecs = msecs;
308     wItem->onCompletion = onCompletion;
309     wItem->requestID    = reqID;
310
311     EnterCriticalSection(&ioMan->manLock);
312 #if 0
313     fprintf(stderr, "AddDelayRequest: %d\n", ioMan->workersIdle);
314     fflush(stderr);
315 #endif
316     /* See AddIORequest() for comments regarding policy
317      * for augmenting the worker thread pool.
318      */
319     ioMan->queueSize++;
320     if ( ioMan->workersIdle < ioMan->queueSize ) {
321         ioMan->numWorkers++;
322         LeaveCriticalSection(&ioMan->manLock);
323         NewIOWorkerThread(ioMan);
324     } else {
325         LeaveCriticalSection(&ioMan->manLock);
326     }
327   
328   if (SubmitWork(ioMan->workQueue,wItem)) {
329       /* See AddIORequest() comment */
330       return reqID;
331   } else {
332       return 0;
333   }
334 }
335
336 /*
337  * Function: AddProcRequest()
338  *
339  * Add an asynchronous procedure request.
340  */
341 BOOL
342 AddProcRequest ( void* proc,
343                  void* param,
344                  CompletionProc onCompletion)
345 {
346     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
347     unsigned int reqID = ioMan->requestID++;
348     if (!ioMan || !wItem) return FALSE;
349   
350     /* Fill in the blanks */
351     wItem->workKind     = WORKER_DO_PROC;
352     wItem->workData.procData.proc  = proc;
353     wItem->workData.procData.param = param;
354     wItem->onCompletion = onCompletion;
355     wItem->requestID    = reqID;
356
357     EnterCriticalSection(&ioMan->manLock);
358 #if 0
359     fprintf(stderr, "AddProcRequest: %d\n", ioMan->workersIdle);
360     fflush(stderr);
361 #endif
362     /* See AddIORequest() for comments regarding policy
363      * for augmenting the worker thread pool.
364      */
365     ioMan->queueSize++;
366     if ( ioMan->workersIdle < ioMan->queueSize ) {
367         ioMan->numWorkers++;
368         LeaveCriticalSection(&ioMan->manLock);
369         NewIOWorkerThread(ioMan);
370     } else {
371         LeaveCriticalSection(&ioMan->manLock);
372     }
373   
374     if (SubmitWork(ioMan->workQueue,wItem)) {
375         /* See AddIORequest() comment */
376         return reqID;
377     } else {
378         return 0;
379     }
380 }
381
382 void ShutdownIOManager()
383 {
384   SetEvent(ioMan->hExitEvent);
385   free(ioMan);
386   ioMan = NULL;
387 }