b8b79426522220bd20876965c2fffdad1e01c2d2
[ghc-hetmet.git] / ghc / rts / win32 / IOManager.c
1 /* IOManager.c
2  *
3  * Non-blocking / asynchronous I/O for Win32.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "IOManager.h"
8 #include "WorkQueue.h"
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <io.h>
12 #include <winsock.h>
13 #include <process.h>
14
15 /*
16  * Internal state maintained by the IO manager.
17  */
18 typedef struct IOManagerState {
19     CritSection      manLock;
20     WorkQueue*       workQueue;
21     int              queueSize;
22     int              numWorkers;
23     int              workersIdle;
24     HANDLE           hExitEvent;
25     unsigned int     requestID;
26 } IOManagerState;
27
28 /* ToDo: wrap up this state via a IOManager handle instead? */
29 static IOManagerState* ioMan;
30
31 /*
32  * The routine executed by each worker thread.
33  */
34 static
35 unsigned
36 WINAPI
37 IOWorkerProc(PVOID param)
38 {
39     HANDLE  hWaits[2];
40     DWORD   rc;
41     IOManagerState* iom = (IOManagerState*)param;
42     WorkQueue* pq = iom->workQueue;
43     WorkItem*  work;
44     int        len = 0, fd = 0;
45     DWORD      errCode;
46     void*      complData;
47
48     hWaits[0] = (HANDLE)iom->hExitEvent;
49     hWaits[1] = GetWorkQueueHandle(pq);
50   
51     while (1) {
52         /* The error code is communicated back on completion of request; reset. */
53         errCode = 0;
54         
55         EnterCriticalSection(&iom->manLock);
56         /* Signal that the worker is idle.
57          *
58          * 'workersIdle' is used when determining whether or not to
59          * increase the worker thread pool when adding a new request.
60          * (see addIORequest().)
61          */
62         iom->workersIdle++;
63         LeaveCriticalSection(&iom->manLock);
64         
65         /*
66          * A possible future refinement is to make long-term idle threads
67          * wake up and decide to shut down should the number of idle threads
68          * be above some threshold.
69          *
70          */
71         rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
72
73         EnterCriticalSection(&iom->manLock);
74         /* Signal that the thread is 'non-idle' and about to consume 
75          * a work item.
76          */
77         iom->workersIdle--;
78         iom->queueSize--;
79         LeaveCriticalSection(&iom->manLock);
80     
81         if ( WAIT_OBJECT_0 == rc ) {
82             /* shutdown */
83             return 0;
84         } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
85             /* work item available, fetch it. */
86             if (FetchWork(pq,(void**)&work)) {
87                 if ( work->workKind & WORKER_READ ) {
88                     if ( work->workKind & WORKER_FOR_SOCKET ) {
89                         len = recv(work->workData.ioData.fd, 
90                                    work->workData.ioData.buf,
91                                    work->workData.ioData.len,
92                                    0);
93                         if (len == SOCKET_ERROR) {
94                             errCode = WSAGetLastError();
95                         }
96                     } else {
97                         len = read(work->workData.ioData.fd,
98                                    work->workData.ioData.buf,
99                                    work->workData.ioData.len);
100                         if (len == -1) { errCode = errno; }
101                     }
102                     complData = work->workData.ioData.buf;
103                     fd = work->workData.ioData.fd;
104                 } else if ( work->workKind & WORKER_WRITE ) {
105                     if ( work->workKind & WORKER_FOR_SOCKET ) {
106                         len = send(work->workData.ioData.fd,
107                                    work->workData.ioData.buf,
108                                    work->workData.ioData.len,
109                                    0);
110                         if (len == SOCKET_ERROR) {
111                             errCode = WSAGetLastError();
112                         }
113                     } else {
114                         len = write(work->workData.ioData.fd,
115                                     work->workData.ioData.buf,
116                                     work->workData.ioData.len);
117                         if (len == -1) { errCode = errno; }
118                     }
119                     complData = work->workData.ioData.buf;
120                     fd = work->workData.ioData.fd;
121                 } else if ( work->workKind & WORKER_DELAY ) {
122                     /* Approximate implementation of threadDelay;
123                      * 
124                      * Note: Sleep() is in milliseconds, not micros.
125                      */
126                     Sleep(work->workData.delayData.msecs / 1000);
127                     len = work->workData.delayData.msecs;
128                     complData = NULL;
129                     fd = 0;
130                     errCode = 0;
131                 } else if ( work->workKind & WORKER_DO_PROC ) {
132                     /* perform operation/proc on behalf of Haskell thread. */
133                     if (work->workData.procData.proc) {
134                         /* The procedure is assumed to encode result + success/failure
135                          * via its param.
136                          */
137                         errCode=work->workData.procData.proc(work->workData.procData.param);
138                     } else {
139                         errCode=1;
140                     }
141                     complData = work->workData.procData.param;
142                 } else {
143                     fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
144                     fflush(stderr);
145                     continue;
146                 }
147                 work->onCompletion(work->requestID,
148                                    fd,
149                                    len,
150                                    complData,
151                                    errCode);
152                 /* Free the WorkItem */
153                 free(work);
154             } else {
155                 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
156                 return 1;
157             }
158         } else {
159             fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
160             return 1;
161         }
162     }
163     return 0;
164 }
165
166 static 
167 BOOL
168 NewIOWorkerThread(IOManagerState* iom)
169 {
170     unsigned threadId;
171     return ( 0 != _beginthreadex(NULL,
172                                  0,
173                                  IOWorkerProc,
174                                  (LPVOID)iom,
175                                  0,
176                                  &threadId) );
177 }
178
179 BOOL
180 StartIOManager(void)
181 {
182     HANDLE hExit;
183     WorkQueue* wq;
184
185     wq = NewWorkQueue();
186     if ( !wq ) return FALSE;  
187   
188     ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
189   
190     if (!ioMan) {
191         FreeWorkQueue(wq);
192         return FALSE;
193     }
194
195     /* A manual-reset event */
196     hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
197     if ( !hExit ) {
198         FreeWorkQueue(wq);
199         free(ioMan);
200         return FALSE;
201     }
202   
203     ioMan->hExitEvent = hExit;
204     InitializeCriticalSection(&ioMan->manLock);
205     ioMan->workQueue   = wq;
206     ioMan->numWorkers  = 0;
207     ioMan->workersIdle = 0;
208     ioMan->queueSize   = 0;
209     ioMan->requestID   = 1;
210  
211     return TRUE;
212 }
213
214 /*
215  * Function: depositWorkItem()
216  *
217  * Local function which deposits a WorkItem onto a work queue,
218  * deciding in the process whether or not the thread pool needs
219  * to be augmented with another thread to handle the new request.
220  *
221  */
222 static
223 int
224 depositWorkItem( unsigned int reqID,
225                  WorkItem* wItem )
226 {
227     EnterCriticalSection(&ioMan->manLock);
228
229 #if 0
230     fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers); 
231     fflush(stderr);
232 #endif
233     /* A new worker thread is created when there are fewer idle threads
234      * than non-consumed queue requests. This ensures that requests will
235      * be dealt with in a timely manner.
236      *
237      * [Long explanation of why the previous thread pool policy lead to 
238      * trouble]
239      *
240      * Previously, the thread pool was augmented iff no idle worker threads
241      * were available. That strategy runs the risk of repeatedly adding to
242      * the request queue without expanding the thread pool to handle this
243      * sudden spike in queued requests. 
244      * [How? Assume workersIdle is 1, and addIORequest() is called. No new 
245      * thread is created and the request is simply queued. If addIORequest()
246      * is called again _before the OS schedules a worker thread to pull the
247      * request off the queue_, workersIdle is still 1 and another request is 
248      * simply added to the queue. Once the worker thread is run, only one
249      * request is de-queued, leaving the 2nd request in the queue]
250      * 
251      * Assuming none of the queued requests take an inordinate amount of to 
252      * complete, the request queue would eventually be drained. But if that's 
253      * not the case, the later requests will end up languishing in the queue 
254      * indefinitely. The non-timely handling of requests may cause CH applications
255      * to misbehave / hang; bad.
256      *
257      */
258     ioMan->queueSize++;
259     if ( (ioMan->workersIdle < ioMan->queueSize) ) {
260         /* see if giving up our quantum ferrets out some idle threads.
261          */
262         LeaveCriticalSection(&ioMan->manLock);
263         Sleep(0);
264         EnterCriticalSection(&ioMan->manLock);
265         if ( (ioMan->workersIdle < ioMan->queueSize) ) {
266             /* No, go ahead and create another. */
267             ioMan->numWorkers++;
268             LeaveCriticalSection(&ioMan->manLock);
269             NewIOWorkerThread(ioMan);
270         } else {
271             LeaveCriticalSection(&ioMan->manLock);
272         }
273     } else {
274         LeaveCriticalSection(&ioMan->manLock);
275     }
276   
277     if (SubmitWork(ioMan->workQueue,wItem)) {
278         /* Note: the work item has potentially been consumed by a worker thread
279          *       (and freed) at this point, so we cannot use wItem's requestID.
280          */
281         return reqID;
282     } else {
283         return 0;
284     }
285 }
286
287 /*
288  * Function: AddIORequest()
289  *
290  * Conduit to underlying WorkQueue's SubmitWork(); adds IO
291  * request to work queue, deciding whether or not to augment
292  * the thread pool in the process. 
293  */
294 int
295 AddIORequest ( int   fd,
296                BOOL  forWriting,
297                BOOL  isSocket,
298                int   len,
299                char* buffer,
300                CompletionProc onCompletion)
301 {
302     WorkItem* wItem    = (WorkItem*)malloc(sizeof(WorkItem));
303     unsigned int reqID = ioMan->requestID++;
304     if (!ioMan || !wItem) return 0;
305   
306     /* Fill in the blanks */
307     wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
308                           ( forWriting ? WORKER_WRITE : WORKER_READ );
309     wItem->workData.ioData.fd  = fd;
310     wItem->workData.ioData.len = len;
311     wItem->workData.ioData.buf = buffer;
312
313     wItem->onCompletion        = onCompletion;
314     wItem->requestID           = reqID;
315   
316     return depositWorkItem(reqID, wItem);
317 }       
318
319 /*
320  * Function: AddDelayRequest()
321  *
322  * Like AddIORequest(), but this time adding a delay request to
323  * the request queue.
324  */
325 BOOL
326 AddDelayRequest ( unsigned int   msecs,
327                   CompletionProc onCompletion)
328 {
329     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
330     unsigned int reqID = ioMan->requestID++;
331     if (!ioMan || !wItem) return FALSE;
332   
333     /* Fill in the blanks */
334     wItem->workKind     = WORKER_DELAY;
335     wItem->workData.delayData.msecs = msecs;
336     wItem->onCompletion = onCompletion;
337     wItem->requestID    = reqID;
338
339     return depositWorkItem(reqID, wItem);
340 }
341
342 /*
343  * Function: AddProcRequest()
344  *
345  * Add an asynchronous procedure request.
346  */
347 BOOL
348 AddProcRequest ( void* proc,
349                  void* param,
350                  CompletionProc onCompletion)
351 {
352     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
353     unsigned int reqID = ioMan->requestID++;
354     if (!ioMan || !wItem) return FALSE;
355   
356     /* Fill in the blanks */
357     wItem->workKind     = WORKER_DO_PROC;
358     wItem->workData.procData.proc  = proc;
359     wItem->workData.procData.param = param;
360     wItem->onCompletion = onCompletion;
361     wItem->requestID    = reqID;
362
363     return depositWorkItem(reqID, wItem);
364 }
365
366 void ShutdownIOManager()
367 {
368   SetEvent(ioMan->hExitEvent);
369   free(ioMan);
370   ioMan = NULL;
371 }