Reorganisation of the source tree
[ghc-hetmet.git] / rts / win32 / IOManager.c
1 /* IOManager.c
2  *
3  * Non-blocking / asynchronous I/O for Win32.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "Rts.h"
8 #include "IOManager.h"
9 #include "WorkQueue.h"
10 #include "ConsoleHandler.h"
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <io.h>
14 #include <winsock.h>
15 #include <process.h>
16
17 /*
18  * Internal state maintained by the IO manager.
19  */
20 typedef struct IOManagerState {
21     CritSection      manLock;
22     WorkQueue*       workQueue;
23     int              queueSize;
24     int              numWorkers;
25     int              workersIdle;
26     HANDLE           hExitEvent;
27     unsigned int     requestID;
28     /* fields for keeping track of active WorkItems */
29     CritSection      active_work_lock;
30     WorkItem*        active_work_items;
31 } IOManagerState;
32
33 /* ToDo: wrap up this state via a IOManager handle instead? */
34 static IOManagerState* ioMan;
35
36 static void RegisterWorkItem  ( IOManagerState* iom, WorkItem* wi);
37 static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi);
38
39 /*
40  * The routine executed by each worker thread.
41  */
42 static
43 unsigned
44 WINAPI
45 IOWorkerProc(PVOID param)
46 {
47     HANDLE  hWaits[2];
48     DWORD   rc;
49     IOManagerState* iom = (IOManagerState*)param;
50     WorkQueue* pq = iom->workQueue;
51     WorkItem*  work;
52     int        len = 0, fd = 0;
53     DWORD      errCode = 0;
54     void*      complData;
55
56     hWaits[0] = (HANDLE)iom->hExitEvent;
57     hWaits[1] = GetWorkQueueHandle(pq);
58   
59     while (1) {
60         /* The error code is communicated back on completion of request; reset. */
61         errCode = 0;
62         
63         EnterCriticalSection(&iom->manLock);
64         /* Signal that the worker is idle.
65          *
66          * 'workersIdle' is used when determining whether or not to
67          * increase the worker thread pool when adding a new request.
68          * (see addIORequest().)
69          */
70         iom->workersIdle++;
71         LeaveCriticalSection(&iom->manLock);
72         
73         /*
74          * A possible future refinement is to make long-term idle threads
75          * wake up and decide to shut down should the number of idle threads
76          * be above some threshold.
77          *
78          */
79         rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
80
81         if (rc == WAIT_OBJECT_0) {
82             // we received the exit event
83             return 0;
84         }
85
86         EnterCriticalSection(&iom->manLock);
87         /* Signal that the thread is 'non-idle' and about to consume 
88          * a work item.
89          */
90         iom->workersIdle--;
91         iom->queueSize--;
92         LeaveCriticalSection(&iom->manLock);
93     
94         if ( rc == (WAIT_OBJECT_0 + 1) ) {
95             /* work item available, fetch it. */
96             if (FetchWork(pq,(void**)&work)) {
97                 work->abandonOp = 0;
98                 RegisterWorkItem(iom,work);
99                 if ( work->workKind & WORKER_READ ) {
100                     if ( work->workKind & WORKER_FOR_SOCKET ) {
101                         len = recv(work->workData.ioData.fd, 
102                                    work->workData.ioData.buf,
103                                    work->workData.ioData.len,
104                                    0);
105                         if (len == SOCKET_ERROR) {
106                             errCode = WSAGetLastError();
107                         }
108                     } else {
109                         while (1) {
110                         /* Do the read(), with extra-special handling for Ctrl+C */
111                         len = read(work->workData.ioData.fd,
112                                    work->workData.ioData.buf,
113                                    work->workData.ioData.len);
114                         if ( len == 0 && work->workData.ioData.len != 0 ) {
115                             /* Given the following scenario:
116                              *     - a console handler has been registered that handles Ctrl+C
117                              *       events.
118                              *     - we've not tweaked the 'console mode' settings to turn on
119                              *       ENABLE_PROCESSED_INPUT.
120                              *     - we're blocked waiting on input from standard input.
121                              *     - the user hits Ctrl+C.
122                              *
123                              * The OS will invoke the console handler (in a separate OS thread),
124                              * and the above read() (i.e., under the hood, a ReadFile() op) returns
125                              * 0, with the error set to ERROR_OPERATION_ABORTED. We don't
126                              * want to percolate this error condition back to the Haskell user.
127                              * Do this by waiting for the completion of the Haskell console handler.
128                              * If upon completion of the console handler routine, the Haskell thread 
129                              * that issued the request is found to have been thrown an exception, 
130                              * the worker abandons the request (since that's what the Haskell thread 
131                              * has done.) If the Haskell thread hasn't been interrupted, the worker 
132                              * retries the read request as if nothing happened.
133                              */
134                             if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) {
135                                 /* For now, only abort when dealing with the standard input handle.
136                                  * i.e., for all others, an error is raised.
137                                  */
138                                 HANDLE h  = (HANDLE)GetStdHandle(STD_INPUT_HANDLE);
139                                 if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) {
140                                     if (rts_waitConsoleHandlerCompletion()) {
141                                         /* If the Scheduler has set work->abandonOp, the Haskell thread has 
142                                          * been thrown an exception (=> the worker must abandon this request.)
143                                          * We test for this below before invoking the on-completion routine.
144                                          */
145                                         if (work->abandonOp) {
146                                             break;
147                                         } else {
148                                             continue;
149                                         }
150                                     } 
151                                 } else { 
152                                     break; /* Treat it like an error */
153                                 }
154                             } else {
155                                 break;
156                             }
157                         } else {
158                             break;
159                         }
160                         }
161                         if (len == -1) { errCode = errno; }
162                     }
163                     complData = work->workData.ioData.buf;
164                     fd = work->workData.ioData.fd;
165                 } else if ( work->workKind & WORKER_WRITE ) {
166                     if ( work->workKind & WORKER_FOR_SOCKET ) {
167                         len = send(work->workData.ioData.fd,
168                                    work->workData.ioData.buf,
169                                    work->workData.ioData.len,
170                                    0);
171                         if (len == SOCKET_ERROR) {
172                             errCode = WSAGetLastError();
173                         }
174                     } else {
175                         len = write(work->workData.ioData.fd,
176                                     work->workData.ioData.buf,
177                                     work->workData.ioData.len);
178                         if (len == -1) { errCode = errno; }
179                     }
180                     complData = work->workData.ioData.buf;
181                     fd = work->workData.ioData.fd;
182                 } else if ( work->workKind & WORKER_DELAY ) {
183                     /* Approximate implementation of threadDelay;
184                      * 
185                      * Note: Sleep() is in milliseconds, not micros.
186                      */
187                     Sleep(work->workData.delayData.msecs / 1000);
188                     len = work->workData.delayData.msecs;
189                     complData = NULL;
190                     fd = 0;
191                     errCode = 0;
192                 } else if ( work->workKind & WORKER_DO_PROC ) {
193                     /* perform operation/proc on behalf of Haskell thread. */
194                     if (work->workData.procData.proc) {
195                         /* The procedure is assumed to encode result + success/failure
196                          * via its param.
197                          */
198                         errCode=work->workData.procData.proc(work->workData.procData.param);
199                     } else {
200                         errCode=1;
201                     }
202                     complData = work->workData.procData.param;
203                 } else {
204                     fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
205                     fflush(stderr);
206                     continue;
207                 }
208                 if (!work->abandonOp) {
209                     work->onCompletion(work->requestID,
210                                        fd,
211                                        len,
212                                        complData,
213                                        errCode);
214                 }
215                 /* Free the WorkItem */
216                 DeregisterWorkItem(iom,work);
217                 free(work);
218             } else {
219                 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
220                 return 1;
221             }
222         } else {
223             fprintf(stderr, "waiting failed (%lu); fatal.\n", rc); fflush(stderr);
224             return 1;
225         }
226     }
227     return 0;
228 }
229
230 static 
231 BOOL
232 NewIOWorkerThread(IOManagerState* iom)
233 {
234     unsigned threadId;
235     return ( 0 != _beginthreadex(NULL,
236                                  0,
237                                  IOWorkerProc,
238                                  (LPVOID)iom,
239                                  0,
240                                  &threadId) );
241 }
242
243 BOOL
244 StartIOManager(void)
245 {
246     HANDLE hExit;
247     WorkQueue* wq;
248
249     wq = NewWorkQueue();
250     if ( !wq ) return FALSE;  
251   
252     ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
253   
254     if (!ioMan) {
255         FreeWorkQueue(wq);
256         return FALSE;
257     }
258
259     /* A manual-reset event */
260     hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
261     if ( !hExit ) {
262         FreeWorkQueue(wq);
263         free(ioMan);
264         return FALSE;
265     }
266   
267     ioMan->hExitEvent = hExit;
268     InitializeCriticalSection(&ioMan->manLock);
269     ioMan->workQueue   = wq;
270     ioMan->numWorkers  = 0;
271     ioMan->workersIdle = 0;
272     ioMan->queueSize   = 0;
273     ioMan->requestID   = 1;
274     InitializeCriticalSection(&ioMan->active_work_lock);
275     ioMan->active_work_items = NULL;
276  
277     return TRUE;
278 }
279
280 /*
281  * Function: depositWorkItem()
282  *
283  * Local function which deposits a WorkItem onto a work queue,
284  * deciding in the process whether or not the thread pool needs
285  * to be augmented with another thread to handle the new request.
286  *
287  */
288 static
289 int
290 depositWorkItem( unsigned int reqID,
291                  WorkItem* wItem )
292 {
293     EnterCriticalSection(&ioMan->manLock);
294
295 #if 0
296     fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers); 
297     fflush(stderr);
298 #endif
299     /* A new worker thread is created when there are fewer idle threads
300      * than non-consumed queue requests. This ensures that requests will
301      * be dealt with in a timely manner.
302      *
303      * [Long explanation of why the previous thread pool policy lead to 
304      * trouble]
305      *
306      * Previously, the thread pool was augmented iff no idle worker threads
307      * were available. That strategy runs the risk of repeatedly adding to
308      * the request queue without expanding the thread pool to handle this
309      * sudden spike in queued requests. 
310      * [How? Assume workersIdle is 1, and addIORequest() is called. No new 
311      * thread is created and the request is simply queued. If addIORequest()
312      * is called again _before the OS schedules a worker thread to pull the
313      * request off the queue_, workersIdle is still 1 and another request is 
314      * simply added to the queue. Once the worker thread is run, only one
315      * request is de-queued, leaving the 2nd request in the queue]
316      * 
317      * Assuming none of the queued requests take an inordinate amount of to 
318      * complete, the request queue would eventually be drained. But if that's 
319      * not the case, the later requests will end up languishing in the queue 
320      * indefinitely. The non-timely handling of requests may cause CH applications
321      * to misbehave / hang; bad.
322      *
323      */
324     ioMan->queueSize++;
325     if ( (ioMan->workersIdle < ioMan->queueSize) ) {
326         /* see if giving up our quantum ferrets out some idle threads.
327          */
328         LeaveCriticalSection(&ioMan->manLock);
329         Sleep(0);
330         EnterCriticalSection(&ioMan->manLock);
331         if ( (ioMan->workersIdle < ioMan->queueSize) ) {
332             /* No, go ahead and create another. */
333             ioMan->numWorkers++;
334             LeaveCriticalSection(&ioMan->manLock);
335             NewIOWorkerThread(ioMan);
336         } else {
337             LeaveCriticalSection(&ioMan->manLock);
338         }
339     } else {
340         LeaveCriticalSection(&ioMan->manLock);
341     }
342   
343     if (SubmitWork(ioMan->workQueue,wItem)) {
344         /* Note: the work item has potentially been consumed by a worker thread
345          *       (and freed) at this point, so we cannot use wItem's requestID.
346          */
347         return reqID;
348     } else {
349         return 0;
350     }
351 }
352
353 /*
354  * Function: AddIORequest()
355  *
356  * Conduit to underlying WorkQueue's SubmitWork(); adds IO
357  * request to work queue, deciding whether or not to augment
358  * the thread pool in the process. 
359  */
360 int
361 AddIORequest ( int   fd,
362                BOOL  forWriting,
363                BOOL  isSocket,
364                int   len,
365                char* buffer,
366                CompletionProc onCompletion)
367 {
368     WorkItem* wItem    = (WorkItem*)malloc(sizeof(WorkItem));
369     unsigned int reqID = ioMan->requestID++;
370     if (!ioMan || !wItem) return 0;
371   
372     /* Fill in the blanks */
373     wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
374                           ( forWriting ? WORKER_WRITE : WORKER_READ );
375     wItem->workData.ioData.fd  = fd;
376     wItem->workData.ioData.len = len;
377     wItem->workData.ioData.buf = buffer;
378     wItem->link = NULL;
379
380     wItem->onCompletion        = onCompletion;
381     wItem->requestID           = reqID;
382   
383     return depositWorkItem(reqID, wItem);
384 }       
385
386 /*
387  * Function: AddDelayRequest()
388  *
389  * Like AddIORequest(), but this time adding a delay request to
390  * the request queue.
391  */
392 BOOL
393 AddDelayRequest ( unsigned int   msecs,
394                   CompletionProc onCompletion)
395 {
396     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
397     unsigned int reqID = ioMan->requestID++;
398     if (!ioMan || !wItem) return FALSE;
399   
400     /* Fill in the blanks */
401     wItem->workKind     = WORKER_DELAY;
402     wItem->workData.delayData.msecs = msecs;
403     wItem->onCompletion = onCompletion;
404     wItem->requestID    = reqID;
405     wItem->link         = NULL;
406
407     return depositWorkItem(reqID, wItem);
408 }
409
410 /*
411  * Function: AddProcRequest()
412  *
413  * Add an asynchronous procedure request.
414  */
415 BOOL
416 AddProcRequest ( void* proc,
417                  void* param,
418                  CompletionProc onCompletion)
419 {
420     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
421     unsigned int reqID = ioMan->requestID++;
422     if (!ioMan || !wItem) return FALSE;
423   
424     /* Fill in the blanks */
425     wItem->workKind     = WORKER_DO_PROC;
426     wItem->workData.procData.proc  = proc;
427     wItem->workData.procData.param = param;
428     wItem->onCompletion = onCompletion;
429     wItem->requestID    = reqID;
430     wItem->abandonOp    = 0;
431     wItem->link         = NULL;
432
433     return depositWorkItem(reqID, wItem);
434 }
435
436 void ShutdownIOManager ( void )
437 {
438   SetEvent(ioMan->hExitEvent);
439   // ToDo: we can't free this now, because the worker thread(s)
440   // haven't necessarily finished with it yet.  Perhaps it should
441   // have a reference count or something.
442   // free(ioMan);
443   // ioMan = NULL;
444 }
445
446 /* Keep track of WorkItems currently being serviced. */
447 static 
448 void
449 RegisterWorkItem(IOManagerState* ioMan, 
450                  WorkItem* wi)
451 {
452     EnterCriticalSection(&ioMan->active_work_lock);
453     wi->link = ioMan->active_work_items;
454     ioMan->active_work_items = wi;
455     LeaveCriticalSection(&ioMan->active_work_lock);
456 }
457
458 static 
459 void
460 DeregisterWorkItem(IOManagerState* ioMan, 
461                    WorkItem* wi)
462 {
463     WorkItem *ptr, *prev;
464     
465     EnterCriticalSection(&ioMan->active_work_lock);
466     for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) {
467         if (wi->requestID == ptr->requestID) {
468             if (prev==NULL) {
469                 ioMan->active_work_items = ptr->link;
470             } else {
471                 prev->link = ptr->link;
472             }
473             LeaveCriticalSection(&ioMan->active_work_lock);
474             return;
475         }
476     }
477     fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n", wi->requestID);
478     LeaveCriticalSection(&ioMan->active_work_lock);
479 }
480
481
482 /*
483  * Function: abandonWorkRequest()
484  *
485  * Signal that a work request isn't of interest. Called by the Scheduler
486  * if a blocked Haskell thread has an exception thrown to it.
487  *
488  * Note: we're not aborting the system call that a worker might be blocked on
489  * here, just disabling the propagation of its result once its finished. We
490  * may have to go the whole hog here and switch to overlapped I/O so that we
491  * can abort blocked system calls.
492  */
493 void
494 abandonWorkRequest ( int reqID )
495 {
496     WorkItem *ptr;
497     EnterCriticalSection(&ioMan->active_work_lock);
498     for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) {
499         if (ptr->requestID == (unsigned int)reqID ) {
500             ptr->abandonOp = 1;
501             LeaveCriticalSection(&ioMan->active_work_lock);
502             return;
503         }
504     }
505     /* Note: if the request ID isn't present, the worker will have
506      * finished sometime since awaitRequests() last drained the completed
507      * request table; i.e., not an error.
508      */
509     LeaveCriticalSection(&ioMan->active_work_lock);
510 }