c08c4e3014b9e546691fc63e187465b49067da15
[ghc-hetmet.git] / ghc / rts / win32 / IOManager.c
1 /* IOManager.c
2  *
3  * Non-blocking / asynchronous I/O for Win32.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "IOManager.h"
8 #include "WorkQueue.h"
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <io.h>
12 #include <winsock.h>
13 #include <process.h>
14
15 /*
16  * Internal state maintained by the IO manager.
17  */
18 typedef struct IOManagerState {
19     CritSection      manLock;
20     WorkQueue*       workQueue;
21     int              queueSize;
22     int              numWorkers;
23     int              workersIdle;
24     HANDLE           hExitEvent;
25     unsigned int     requestID;
26 } IOManagerState;
27
28 /* ToDo: wrap up this state via a IOManager handle instead? */
29 static IOManagerState* ioMan;
30
31 /*
32  * The routine executed by each worker thread.
33  */
34 static
35 unsigned
36 WINAPI
37 IOWorkerProc(PVOID param)
38 {
39     HANDLE  hWaits[2];
40     DWORD   rc;
41     IOManagerState* iom = (IOManagerState*)param;
42     WorkQueue* pq = iom->workQueue;
43     WorkItem*  work;
44     int        len = 0, fd = 0;
45     DWORD      errCode;
46     void*      complData;
47
48     hWaits[0] = (HANDLE)iom->hExitEvent;
49     hWaits[1] = GetWorkQueueHandle(pq);
50   
51     while (1) {
52         /* The error code is communicated back on completion of request; reset. */
53         errCode = 0;
54         
55         EnterCriticalSection(&iom->manLock);
56         /* Signal that the worker is idle.
57          *
58          * 'workersIdle' is used when determining whether or not to
59          * increase the worker thread pool when adding a new request.
60          * (see addIORequest().)
61          */
62         iom->workersIdle++;
63         LeaveCriticalSection(&iom->manLock);
64         
65         /*
66          * A possible future refinement is to make long-term idle threads
67          * wake up and decide to shut down should the number of idle threads
68          * be above some threshold.
69          *
70          */
71         rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
72
73         if (rc == WAIT_OBJECT_0) {
74             // we received the exit event
75             return 0;
76         }
77
78         EnterCriticalSection(&iom->manLock);
79         /* Signal that the thread is 'non-idle' and about to consume 
80          * a work item.
81          */
82         iom->workersIdle--;
83         iom->queueSize--;
84         LeaveCriticalSection(&iom->manLock);
85     
86         if ( rc == (WAIT_OBJECT_0 + 1) ) {
87             /* work item available, fetch it. */
88             if (FetchWork(pq,(void**)&work)) {
89                 if ( work->workKind & WORKER_READ ) {
90                     if ( work->workKind & WORKER_FOR_SOCKET ) {
91                         len = recv(work->workData.ioData.fd, 
92                                    work->workData.ioData.buf,
93                                    work->workData.ioData.len,
94                                    0);
95                         if (len == SOCKET_ERROR) {
96                             errCode = WSAGetLastError();
97                         }
98                     } else {
99                         len = read(work->workData.ioData.fd,
100                                    work->workData.ioData.buf,
101                                    work->workData.ioData.len);
102                         if (len == -1) { errCode = errno; }
103                     }
104                     complData = work->workData.ioData.buf;
105                     fd = work->workData.ioData.fd;
106                 } else if ( work->workKind & WORKER_WRITE ) {
107                     if ( work->workKind & WORKER_FOR_SOCKET ) {
108                         len = send(work->workData.ioData.fd,
109                                    work->workData.ioData.buf,
110                                    work->workData.ioData.len,
111                                    0);
112                         if (len == SOCKET_ERROR) {
113                             errCode = WSAGetLastError();
114                         }
115                     } else {
116                         len = write(work->workData.ioData.fd,
117                                     work->workData.ioData.buf,
118                                     work->workData.ioData.len);
119                         if (len == -1) { errCode = errno; }
120                     }
121                     complData = work->workData.ioData.buf;
122                     fd = work->workData.ioData.fd;
123                 } else if ( work->workKind & WORKER_DELAY ) {
124                     /* Approximate implementation of threadDelay;
125                      * 
126                      * Note: Sleep() is in milliseconds, not micros.
127                      */
128                     Sleep(work->workData.delayData.msecs / 1000);
129                     len = work->workData.delayData.msecs;
130                     complData = NULL;
131                     fd = 0;
132                     errCode = 0;
133                 } else if ( work->workKind & WORKER_DO_PROC ) {
134                     /* perform operation/proc on behalf of Haskell thread. */
135                     if (work->workData.procData.proc) {
136                         /* The procedure is assumed to encode result + success/failure
137                          * via its param.
138                          */
139                         errCode=work->workData.procData.proc(work->workData.procData.param);
140                     } else {
141                         errCode=1;
142                     }
143                     complData = work->workData.procData.param;
144                 } else {
145                     fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
146                     fflush(stderr);
147                     continue;
148                 }
149                 work->onCompletion(work->requestID,
150                                    fd,
151                                    len,
152                                    complData,
153                                    errCode);
154                 /* Free the WorkItem */
155                 free(work);
156             } else {
157                 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
158                 return 1;
159             }
160         } else {
161             fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
162             return 1;
163         }
164     }
165     return 0;
166 }
167
168 static 
169 BOOL
170 NewIOWorkerThread(IOManagerState* iom)
171 {
172     unsigned threadId;
173     return ( 0 != _beginthreadex(NULL,
174                                  0,
175                                  IOWorkerProc,
176                                  (LPVOID)iom,
177                                  0,
178                                  &threadId) );
179 }
180
181 BOOL
182 StartIOManager(void)
183 {
184     HANDLE hExit;
185     WorkQueue* wq;
186
187     wq = NewWorkQueue();
188     if ( !wq ) return FALSE;  
189   
190     ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
191   
192     if (!ioMan) {
193         FreeWorkQueue(wq);
194         return FALSE;
195     }
196
197     /* A manual-reset event */
198     hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
199     if ( !hExit ) {
200         FreeWorkQueue(wq);
201         free(ioMan);
202         return FALSE;
203     }
204   
205     ioMan->hExitEvent = hExit;
206     InitializeCriticalSection(&ioMan->manLock);
207     ioMan->workQueue   = wq;
208     ioMan->numWorkers  = 0;
209     ioMan->workersIdle = 0;
210     ioMan->queueSize   = 0;
211     ioMan->requestID   = 1;
212  
213     return TRUE;
214 }
215
216 /*
217  * Function: depositWorkItem()
218  *
219  * Local function which deposits a WorkItem onto a work queue,
220  * deciding in the process whether or not the thread pool needs
221  * to be augmented with another thread to handle the new request.
222  *
223  */
224 static
225 int
226 depositWorkItem( unsigned int reqID,
227                  WorkItem* wItem )
228 {
229     EnterCriticalSection(&ioMan->manLock);
230
231 #if 0
232     fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers); 
233     fflush(stderr);
234 #endif
235     /* A new worker thread is created when there are fewer idle threads
236      * than non-consumed queue requests. This ensures that requests will
237      * be dealt with in a timely manner.
238      *
239      * [Long explanation of why the previous thread pool policy lead to 
240      * trouble]
241      *
242      * Previously, the thread pool was augmented iff no idle worker threads
243      * were available. That strategy runs the risk of repeatedly adding to
244      * the request queue without expanding the thread pool to handle this
245      * sudden spike in queued requests. 
246      * [How? Assume workersIdle is 1, and addIORequest() is called. No new 
247      * thread is created and the request is simply queued. If addIORequest()
248      * is called again _before the OS schedules a worker thread to pull the
249      * request off the queue_, workersIdle is still 1 and another request is 
250      * simply added to the queue. Once the worker thread is run, only one
251      * request is de-queued, leaving the 2nd request in the queue]
252      * 
253      * Assuming none of the queued requests take an inordinate amount of to 
254      * complete, the request queue would eventually be drained. But if that's 
255      * not the case, the later requests will end up languishing in the queue 
256      * indefinitely. The non-timely handling of requests may cause CH applications
257      * to misbehave / hang; bad.
258      *
259      */
260     ioMan->queueSize++;
261     if ( (ioMan->workersIdle < ioMan->queueSize) ) {
262         /* see if giving up our quantum ferrets out some idle threads.
263          */
264         LeaveCriticalSection(&ioMan->manLock);
265         Sleep(0);
266         EnterCriticalSection(&ioMan->manLock);
267         if ( (ioMan->workersIdle < ioMan->queueSize) ) {
268             /* No, go ahead and create another. */
269             ioMan->numWorkers++;
270             LeaveCriticalSection(&ioMan->manLock);
271             NewIOWorkerThread(ioMan);
272         } else {
273             LeaveCriticalSection(&ioMan->manLock);
274         }
275     } else {
276         LeaveCriticalSection(&ioMan->manLock);
277     }
278   
279     if (SubmitWork(ioMan->workQueue,wItem)) {
280         /* Note: the work item has potentially been consumed by a worker thread
281          *       (and freed) at this point, so we cannot use wItem's requestID.
282          */
283         return reqID;
284     } else {
285         return 0;
286     }
287 }
288
289 /*
290  * Function: AddIORequest()
291  *
292  * Conduit to underlying WorkQueue's SubmitWork(); adds IO
293  * request to work queue, deciding whether or not to augment
294  * the thread pool in the process. 
295  */
296 int
297 AddIORequest ( int   fd,
298                BOOL  forWriting,
299                BOOL  isSocket,
300                int   len,
301                char* buffer,
302                CompletionProc onCompletion)
303 {
304     WorkItem* wItem    = (WorkItem*)malloc(sizeof(WorkItem));
305     unsigned int reqID = ioMan->requestID++;
306     if (!ioMan || !wItem) return 0;
307   
308     /* Fill in the blanks */
309     wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
310                           ( forWriting ? WORKER_WRITE : WORKER_READ );
311     wItem->workData.ioData.fd  = fd;
312     wItem->workData.ioData.len = len;
313     wItem->workData.ioData.buf = buffer;
314
315     wItem->onCompletion        = onCompletion;
316     wItem->requestID           = reqID;
317   
318     return depositWorkItem(reqID, wItem);
319 }       
320
321 /*
322  * Function: AddDelayRequest()
323  *
324  * Like AddIORequest(), but this time adding a delay request to
325  * the request queue.
326  */
327 BOOL
328 AddDelayRequest ( unsigned int   msecs,
329                   CompletionProc onCompletion)
330 {
331     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
332     unsigned int reqID = ioMan->requestID++;
333     if (!ioMan || !wItem) return FALSE;
334   
335     /* Fill in the blanks */
336     wItem->workKind     = WORKER_DELAY;
337     wItem->workData.delayData.msecs = msecs;
338     wItem->onCompletion = onCompletion;
339     wItem->requestID    = reqID;
340
341     return depositWorkItem(reqID, wItem);
342 }
343
344 /*
345  * Function: AddProcRequest()
346  *
347  * Add an asynchronous procedure request.
348  */
349 BOOL
350 AddProcRequest ( void* proc,
351                  void* param,
352                  CompletionProc onCompletion)
353 {
354     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
355     unsigned int reqID = ioMan->requestID++;
356     if (!ioMan || !wItem) return FALSE;
357   
358     /* Fill in the blanks */
359     wItem->workKind     = WORKER_DO_PROC;
360     wItem->workData.procData.proc  = proc;
361     wItem->workData.procData.param = param;
362     wItem->onCompletion = onCompletion;
363     wItem->requestID    = reqID;
364
365     return depositWorkItem(reqID, wItem);
366 }
367
368 void ShutdownIOManager ( void )
369 {
370   SetEvent(ioMan->hExitEvent);
371   // ToDo: we can't free this now, because the worker thread(s)
372   // haven't necessarily finished with it yet.  Perhaps it should
373   // have a reference count or something.
374   // free(ioMan);
375   // ioMan = NULL;
376 }