6af42456aef203a80e816d613eadf3f0833fd7e0
[ghc-hetmet.git] / rts / win32 / IOManager.c
1 /* IOManager.c
2  *
3  * Non-blocking / asynchronous I/O for Win32.
4  *
5  * (c) sof, 2002-2003.
6  */
7
8 #if !defined(THREADED_RTS)
9
10 #include "Rts.h"
11 #include "IOManager.h"
12 #include "WorkQueue.h"
13 #include "ConsoleHandler.h"
14 #include <stdio.h>
15 #include <stdlib.h>
16 #include <io.h>
17 #include <winsock.h>
18 #include <process.h>
19
20 /*
21  * Internal state maintained by the IO manager.
22  */
23 typedef struct IOManagerState {
24     CritSection      manLock;
25     WorkQueue*       workQueue;
26     int              queueSize;
27     int              numWorkers;
28     int              workersIdle;
29     HANDLE           hExitEvent;
30     unsigned int     requestID;
31     /* fields for keeping track of active WorkItems */
32     CritSection      active_work_lock;
33     WorkItem*        active_work_items;
34 } IOManagerState;
35
36 /* ToDo: wrap up this state via a IOManager handle instead? */
37 static IOManagerState* ioMan;
38
39 static void RegisterWorkItem  ( IOManagerState* iom, WorkItem* wi);
40 static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi);
41
42 /*
43  * The routine executed by each worker thread.
44  */
45 static
46 unsigned
47 WINAPI
48 IOWorkerProc(PVOID param)
49 {
50     HANDLE  hWaits[2];
51     DWORD   rc;
52     IOManagerState* iom = (IOManagerState*)param;
53     WorkQueue* pq = iom->workQueue;
54     WorkItem*  work;
55     int        len = 0, fd = 0;
56     DWORD      errCode = 0;
57     void*      complData;
58
59     hWaits[0] = (HANDLE)iom->hExitEvent;
60     hWaits[1] = GetWorkQueueHandle(pq);
61   
62     while (1) {
63         /* The error code is communicated back on completion of request; reset. */
64         errCode = 0;
65         
66         EnterCriticalSection(&iom->manLock);
67         /* Signal that the worker is idle.
68          *
69          * 'workersIdle' is used when determining whether or not to
70          * increase the worker thread pool when adding a new request.
71          * (see addIORequest().)
72          */
73         iom->workersIdle++;
74         LeaveCriticalSection(&iom->manLock);
75         
76         /*
77          * A possible future refinement is to make long-term idle threads
78          * wake up and decide to shut down should the number of idle threads
79          * be above some threshold.
80          *
81          */
82         rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
83
84         if (rc == WAIT_OBJECT_0) {
85             // we received the exit event
86             EnterCriticalSection(&iom->manLock);
87             ioMan->numWorkers--;
88             LeaveCriticalSection(&iom->manLock);
89             return 0;
90         }
91
92         EnterCriticalSection(&iom->manLock);
93         /* Signal that the thread is 'non-idle' and about to consume 
94          * a work item.
95          */
96         iom->workersIdle--;
97         iom->queueSize--;
98         LeaveCriticalSection(&iom->manLock);
99     
100         if ( rc == (WAIT_OBJECT_0 + 1) ) {
101             /* work item available, fetch it. */
102             if (FetchWork(pq,(void**)&work)) {
103                 work->abandonOp = 0;
104                 RegisterWorkItem(iom,work);
105                 if ( work->workKind & WORKER_READ ) {
106                     if ( work->workKind & WORKER_FOR_SOCKET ) {
107                         len = recv(work->workData.ioData.fd, 
108                                    work->workData.ioData.buf,
109                                    work->workData.ioData.len,
110                                    0);
111                         if (len == SOCKET_ERROR) {
112                             errCode = WSAGetLastError();
113                         }
114                     } else {
115                         while (1) {
116                         /* Do the read(), with extra-special handling for Ctrl+C */
117                         len = read(work->workData.ioData.fd,
118                                    work->workData.ioData.buf,
119                                    work->workData.ioData.len);
120                         if ( len == 0 && work->workData.ioData.len != 0 ) {
121                             /* Given the following scenario:
122                              *     - a console handler has been registered that handles Ctrl+C
123                              *       events.
124                              *     - we've not tweaked the 'console mode' settings to turn on
125                              *       ENABLE_PROCESSED_INPUT.
126                              *     - we're blocked waiting on input from standard input.
127                              *     - the user hits Ctrl+C.
128                              *
129                              * The OS will invoke the console handler (in a separate OS thread),
130                              * and the above read() (i.e., under the hood, a ReadFile() op) returns
131                              * 0, with the error set to ERROR_OPERATION_ABORTED. We don't
132                              * want to percolate this error condition back to the Haskell user.
133                              * Do this by waiting for the completion of the Haskell console handler.
134                              * If upon completion of the console handler routine, the Haskell thread 
135                              * that issued the request is found to have been thrown an exception, 
136                              * the worker abandons the request (since that's what the Haskell thread 
137                              * has done.) If the Haskell thread hasn't been interrupted, the worker 
138                              * retries the read request as if nothing happened.
139                              */
140                             if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) {
141                                 /* For now, only abort when dealing with the standard input handle.
142                                  * i.e., for all others, an error is raised.
143                                  */
144                                 HANDLE h  = (HANDLE)GetStdHandle(STD_INPUT_HANDLE);
145                                 if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) {
146                                     if (rts_waitConsoleHandlerCompletion()) {
147                                         /* If the Scheduler has set work->abandonOp, the Haskell thread has 
148                                          * been thrown an exception (=> the worker must abandon this request.)
149                                          * We test for this below before invoking the on-completion routine.
150                                          */
151                                         if (work->abandonOp) {
152                                             break;
153                                         } else {
154                                             continue;
155                                         }
156                                     } 
157                                 } else { 
158                                     break; /* Treat it like an error */
159                                 }
160                             } else {
161                                 break;
162                             }
163                         } else {
164                             break;
165                         }
166                         }
167                         if (len == -1) { errCode = errno; }
168                     }
169                     complData = work->workData.ioData.buf;
170                     fd = work->workData.ioData.fd;
171                 } else if ( work->workKind & WORKER_WRITE ) {
172                     if ( work->workKind & WORKER_FOR_SOCKET ) {
173                         len = send(work->workData.ioData.fd,
174                                    work->workData.ioData.buf,
175                                    work->workData.ioData.len,
176                                    0);
177                         if (len == SOCKET_ERROR) {
178                             errCode = WSAGetLastError();
179                         }
180                     } else {
181                         len = write(work->workData.ioData.fd,
182                                     work->workData.ioData.buf,
183                                     work->workData.ioData.len);
184                         if (len == -1) { errCode = errno; }
185                     }
186                     complData = work->workData.ioData.buf;
187                     fd = work->workData.ioData.fd;
188                 } else if ( work->workKind & WORKER_DELAY ) {
189                     /* Approximate implementation of threadDelay;
190                      * 
191                      * Note: Sleep() is in milliseconds, not micros.
192                      */
193                     Sleep((work->workData.delayData.msecs + 999) / 1000);
194                     len = work->workData.delayData.msecs;
195                     complData = NULL;
196                     fd = 0;
197                     errCode = 0;
198                 } else if ( work->workKind & WORKER_DO_PROC ) {
199                     /* perform operation/proc on behalf of Haskell thread. */
200                     if (work->workData.procData.proc) {
201                         /* The procedure is assumed to encode result + success/failure
202                          * via its param.
203                          */
204                         errCode=work->workData.procData.proc(work->workData.procData.param);
205                     } else {
206                         errCode=1;
207                     }
208                     complData = work->workData.procData.param;
209                 } else {
210                     fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
211                     fflush(stderr);
212                     continue;
213                 }
214                 if (!work->abandonOp) {
215                     work->onCompletion(work->requestID,
216                                        fd,
217                                        len,
218                                        complData,
219                                        errCode);
220                 }
221                 /* Free the WorkItem */
222                 DeregisterWorkItem(iom,work);
223                 free(work);
224             } else {
225                 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
226                 EnterCriticalSection(&iom->manLock);
227                 ioMan->numWorkers--;
228                 LeaveCriticalSection(&iom->manLock);
229                 return 1;
230             }
231         } else {
232             fprintf(stderr, "waiting failed (%lu); fatal.\n", rc); fflush(stderr);
233             EnterCriticalSection(&iom->manLock);
234             ioMan->numWorkers--;
235             LeaveCriticalSection(&iom->manLock);
236             return 1;
237         }
238     }
239     return 0;
240 }
241
242 static 
243 BOOL
244 NewIOWorkerThread(IOManagerState* iom)
245 {
246     unsigned threadId;
247     return ( 0 != _beginthreadex(NULL,
248                                  0,
249                                  IOWorkerProc,
250                                  (LPVOID)iom,
251                                  0,
252                                  &threadId) );
253 }
254
255 BOOL
256 StartIOManager(void)
257 {
258     HANDLE hExit;
259     WorkQueue* wq;
260
261     wq = NewWorkQueue();
262     if ( !wq ) return FALSE;  
263   
264     ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
265   
266     if (!ioMan) {
267         FreeWorkQueue(wq);
268         return FALSE;
269     }
270
271     /* A manual-reset event */
272     hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
273     if ( !hExit ) {
274         FreeWorkQueue(wq);
275         free(ioMan);
276         return FALSE;
277     }
278   
279     ioMan->hExitEvent = hExit;
280     InitializeCriticalSection(&ioMan->manLock);
281     ioMan->workQueue   = wq;
282     ioMan->numWorkers  = 0;
283     ioMan->workersIdle = 0;
284     ioMan->queueSize   = 0;
285     ioMan->requestID   = 1;
286     InitializeCriticalSection(&ioMan->active_work_lock);
287     ioMan->active_work_items = NULL;
288  
289     return TRUE;
290 }
291
292 /*
293  * Function: depositWorkItem()
294  *
295  * Local function which deposits a WorkItem onto a work queue,
296  * deciding in the process whether or not the thread pool needs
297  * to be augmented with another thread to handle the new request.
298  *
299  */
300 static
301 int
302 depositWorkItem( unsigned int reqID,
303                  WorkItem* wItem )
304 {
305     EnterCriticalSection(&ioMan->manLock);
306
307 #if 0
308     fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers); 
309     fflush(stderr);
310 #endif
311     /* A new worker thread is created when there are fewer idle threads
312      * than non-consumed queue requests. This ensures that requests will
313      * be dealt with in a timely manner.
314      *
315      * [Long explanation of why the previous thread pool policy lead to 
316      * trouble]
317      *
318      * Previously, the thread pool was augmented iff no idle worker threads
319      * were available. That strategy runs the risk of repeatedly adding to
320      * the request queue without expanding the thread pool to handle this
321      * sudden spike in queued requests. 
322      * [How? Assume workersIdle is 1, and addIORequest() is called. No new 
323      * thread is created and the request is simply queued. If addIORequest()
324      * is called again _before the OS schedules a worker thread to pull the
325      * request off the queue_, workersIdle is still 1 and another request is 
326      * simply added to the queue. Once the worker thread is run, only one
327      * request is de-queued, leaving the 2nd request in the queue]
328      * 
329      * Assuming none of the queued requests take an inordinate amount of to 
330      * complete, the request queue would eventually be drained. But if that's 
331      * not the case, the later requests will end up languishing in the queue 
332      * indefinitely. The non-timely handling of requests may cause CH applications
333      * to misbehave / hang; bad.
334      *
335      */
336     ioMan->queueSize++;
337     if ( (ioMan->workersIdle < ioMan->queueSize) ) {
338         /* see if giving up our quantum ferrets out some idle threads.
339          */
340         LeaveCriticalSection(&ioMan->manLock);
341         Sleep(0);
342         EnterCriticalSection(&ioMan->manLock);
343         if ( (ioMan->workersIdle < ioMan->queueSize) ) {
344             /* No, go ahead and create another. */
345             ioMan->numWorkers++;
346             if (!NewIOWorkerThread(ioMan)) {
347                 ioMan->numWorkers--;
348             }
349         }
350     }
351     LeaveCriticalSection(&ioMan->manLock);
352   
353     if (SubmitWork(ioMan->workQueue,wItem)) {
354         /* Note: the work item has potentially been consumed by a worker thread
355          *       (and freed) at this point, so we cannot use wItem's requestID.
356          */
357         return reqID;
358     } else {
359         return 0;
360     }
361 }
362
363 /*
364  * Function: AddIORequest()
365  *
366  * Conduit to underlying WorkQueue's SubmitWork(); adds IO
367  * request to work queue, deciding whether or not to augment
368  * the thread pool in the process. 
369  */
370 int
371 AddIORequest ( int   fd,
372                BOOL  forWriting,
373                BOOL  isSocket,
374                int   len,
375                char* buffer,
376                CompletionProc onCompletion)
377 {
378     WorkItem* wItem    = (WorkItem*)malloc(sizeof(WorkItem));
379     unsigned int reqID = ioMan->requestID++;
380     if (!ioMan || !wItem) return 0;
381   
382     /* Fill in the blanks */
383     wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
384                           ( forWriting ? WORKER_WRITE : WORKER_READ );
385     wItem->workData.ioData.fd  = fd;
386     wItem->workData.ioData.len = len;
387     wItem->workData.ioData.buf = buffer;
388     wItem->link = NULL;
389
390     wItem->onCompletion        = onCompletion;
391     wItem->requestID           = reqID;
392   
393     return depositWorkItem(reqID, wItem);
394 }       
395
396 /*
397  * Function: AddDelayRequest()
398  *
399  * Like AddIORequest(), but this time adding a delay request to
400  * the request queue.
401  */
402 BOOL
403 AddDelayRequest ( unsigned int   msecs,
404                   CompletionProc onCompletion)
405 {
406     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
407     unsigned int reqID = ioMan->requestID++;
408     if (!ioMan || !wItem) return FALSE;
409   
410     /* Fill in the blanks */
411     wItem->workKind     = WORKER_DELAY;
412     wItem->workData.delayData.msecs = msecs;
413     wItem->onCompletion = onCompletion;
414     wItem->requestID    = reqID;
415     wItem->link         = NULL;
416
417     return depositWorkItem(reqID, wItem);
418 }
419
420 /*
421  * Function: AddProcRequest()
422  *
423  * Add an asynchronous procedure request.
424  */
425 BOOL
426 AddProcRequest ( void* proc,
427                  void* param,
428                  CompletionProc onCompletion)
429 {
430     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
431     unsigned int reqID = ioMan->requestID++;
432     if (!ioMan || !wItem) return FALSE;
433   
434     /* Fill in the blanks */
435     wItem->workKind     = WORKER_DO_PROC;
436     wItem->workData.procData.proc  = proc;
437     wItem->workData.procData.param = param;
438     wItem->onCompletion = onCompletion;
439     wItem->requestID    = reqID;
440     wItem->abandonOp    = 0;
441     wItem->link         = NULL;
442
443     return depositWorkItem(reqID, wItem);
444 }
445
446 void ShutdownIOManager ( void )
447 {
448     int num;
449
450     SetEvent(ioMan->hExitEvent);
451   
452     /* Wait for all worker threads to die. */
453     for (;;) {
454         EnterCriticalSection(&ioMan->manLock);
455         num = ioMan->numWorkers;
456         LeaveCriticalSection(&ioMan->manLock);
457         if (num == 0)
458             break;
459         Sleep(10);
460     }
461     FreeWorkQueue(ioMan->workQueue);
462     CloseHandle(ioMan->hExitEvent);
463     free(ioMan);
464     ioMan = NULL;
465 }
466
467 /* Keep track of WorkItems currently being serviced. */
468 static 
469 void
470 RegisterWorkItem(IOManagerState* ioMan, 
471                  WorkItem* wi)
472 {
473     EnterCriticalSection(&ioMan->active_work_lock);
474     wi->link = ioMan->active_work_items;
475     ioMan->active_work_items = wi;
476     LeaveCriticalSection(&ioMan->active_work_lock);
477 }
478
479 static 
480 void
481 DeregisterWorkItem(IOManagerState* ioMan, 
482                    WorkItem* wi)
483 {
484     WorkItem *ptr, *prev;
485     
486     EnterCriticalSection(&ioMan->active_work_lock);
487     for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) {
488         if (wi->requestID == ptr->requestID) {
489             if (prev==NULL) {
490                 ioMan->active_work_items = ptr->link;
491             } else {
492                 prev->link = ptr->link;
493             }
494             LeaveCriticalSection(&ioMan->active_work_lock);
495             return;
496         }
497     }
498     fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n", wi->requestID);
499     LeaveCriticalSection(&ioMan->active_work_lock);
500 }
501
502
503 /*
504  * Function: abandonWorkRequest()
505  *
506  * Signal that a work request isn't of interest. Called by the Scheduler
507  * if a blocked Haskell thread has an exception thrown to it.
508  *
509  * Note: we're not aborting the system call that a worker might be blocked on
510  * here, just disabling the propagation of its result once its finished. We
511  * may have to go the whole hog here and switch to overlapped I/O so that we
512  * can abort blocked system calls.
513  */
514 void
515 abandonWorkRequest ( int reqID )
516 {
517     WorkItem *ptr;
518     EnterCriticalSection(&ioMan->active_work_lock);
519     for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) {
520         if (ptr->requestID == (unsigned int)reqID ) {
521             ptr->abandonOp = 1;
522             LeaveCriticalSection(&ioMan->active_work_lock);
523             return;
524         }
525     }
526     /* Note: if the request ID isn't present, the worker will have
527      * finished sometime since awaitRequests() last drained the completed
528      * request table; i.e., not an error.
529      */
530     LeaveCriticalSection(&ioMan->active_work_lock);
531 }
532
533 #endif