Interruptible FFI calls with pthread_kill and CancelSynchronousIO. v4
[ghc-hetmet.git] / rts / win32 / IOManager.c
1 /* IOManager.c
2  *
3  * Non-blocking / asynchronous I/O for Win32.
4  *
5  * (c) sof, 2002-2003.
6  */
7
8 #if !defined(THREADED_RTS)
9
10 #include "Rts.h"
11 #include "IOManager.h"
12 #include "WorkQueue.h"
13 #include "ConsoleHandler.h"
14 #include <stdio.h>
15 #include <stdlib.h>
16 #include <io.h>
17 #include <winsock.h>
18 #include <process.h>
19 #include <errno.h>
20
21 /*
22  * Internal state maintained by the IO manager.
23  */
24 typedef struct IOManagerState {
25     CritSection      manLock;
26     WorkQueue*       workQueue;
27     int              queueSize;
28     int              numWorkers;
29     int              workersIdle;
30     HANDLE           hExitEvent;
31     unsigned int     requestID;
32     /* fields for keeping track of active WorkItems */
33     CritSection      active_work_lock;
34     WorkItem*        active_work_items;
35 } IOManagerState;
36
37 /* ToDo: wrap up this state via a IOManager handle instead? */
38 static IOManagerState* ioMan;
39
40 static void RegisterWorkItem  ( IOManagerState* iom, WorkItem* wi);
41 static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi);
42
43 /*
44  * The routine executed by each worker thread.
45  */
46 static
47 unsigned
48 WINAPI
49 IOWorkerProc(PVOID param)
50 {
51     HANDLE  hWaits[2];
52     DWORD   rc;
53     IOManagerState* iom = (IOManagerState*)param;
54     WorkQueue* pq = iom->workQueue;
55     WorkItem*  work;
56     int        len = 0, fd = 0;
57     DWORD      errCode = 0;
58     void*      complData;
59
60     hWaits[0] = (HANDLE)iom->hExitEvent;
61     hWaits[1] = GetWorkQueueHandle(pq);
62   
63     while (1) {
64         /* The error code is communicated back on completion of request; reset. */
65         errCode = 0;
66         
67         EnterCriticalSection(&iom->manLock);
68         /* Signal that the worker is idle.
69          *
70          * 'workersIdle' is used when determining whether or not to
71          * increase the worker thread pool when adding a new request.
72          * (see addIORequest().)
73          */
74         iom->workersIdle++;
75         LeaveCriticalSection(&iom->manLock);
76         
77         /*
78          * A possible future refinement is to make long-term idle threads
79          * wake up and decide to shut down should the number of idle threads
80          * be above some threshold.
81          *
82          */
83         rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
84
85         if (rc == WAIT_OBJECT_0) {
86             // we received the exit event
87             EnterCriticalSection(&iom->manLock);
88             ioMan->numWorkers--;
89             LeaveCriticalSection(&iom->manLock);
90             return 0;
91         }
92
93         EnterCriticalSection(&iom->manLock);
94         /* Signal that the thread is 'non-idle' and about to consume 
95          * a work item.
96          */
97         iom->workersIdle--;
98         iom->queueSize--;
99         LeaveCriticalSection(&iom->manLock);
100     
101         if ( rc == (WAIT_OBJECT_0 + 1) ) {
102             /* work item available, fetch it. */
103             if (FetchWork(pq,(void**)&work)) {
104                 work->abandonOp = 0;
105                 RegisterWorkItem(iom,work);
106                 if ( work->workKind & WORKER_READ ) {
107                     if ( work->workKind & WORKER_FOR_SOCKET ) {
108                         len = recv(work->workData.ioData.fd, 
109                                    work->workData.ioData.buf,
110                                    work->workData.ioData.len,
111                                    0);
112                         if (len == SOCKET_ERROR) {
113                             errCode = WSAGetLastError();
114                         }
115                     } else {
116                         while (1) {
117                         /* Do the read(), with extra-special handling for Ctrl+C */
118                         len = read(work->workData.ioData.fd,
119                                    work->workData.ioData.buf,
120                                    work->workData.ioData.len);
121                         if ( len == 0 && work->workData.ioData.len != 0 ) {
122                             /* Given the following scenario:
123                              *     - a console handler has been registered that handles Ctrl+C
124                              *       events.
125                              *     - we've not tweaked the 'console mode' settings to turn on
126                              *       ENABLE_PROCESSED_INPUT.
127                              *     - we're blocked waiting on input from standard input.
128                              *     - the user hits Ctrl+C.
129                              *
130                              * The OS will invoke the console handler (in a separate OS thread),
131                              * and the above read() (i.e., under the hood, a ReadFile() op) returns
132                              * 0, with the error set to ERROR_OPERATION_ABORTED. We don't
133                              * want to percolate this error condition back to the Haskell user.
134                              * Do this by waiting for the completion of the Haskell console handler.
135                              * If upon completion of the console handler routine, the Haskell thread 
136                              * that issued the request is found to have been thrown an exception, 
137                              * the worker abandons the request (since that's what the Haskell thread 
138                              * has done.) If the Haskell thread hasn't been interrupted, the worker 
139                              * retries the read request as if nothing happened.
140                              */
141                             if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) {
142                                 /* For now, only abort when dealing with the standard input handle.
143                                  * i.e., for all others, an error is raised.
144                                  */
145                                 HANDLE h  = (HANDLE)GetStdHandle(STD_INPUT_HANDLE);
146                                 if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) {
147                                     if (rts_waitConsoleHandlerCompletion()) {
148                                         /* If the Scheduler has set work->abandonOp, the Haskell thread has 
149                                          * been thrown an exception (=> the worker must abandon this request.)
150                                          * We test for this below before invoking the on-completion routine.
151                                          */
152                                         if (work->abandonOp) {
153                                             break;
154                                         } else {
155                                             continue;
156                                         }
157                                     } 
158                                 } else { 
159                                     break; /* Treat it like an error */
160                                 }
161                             } else {
162                                 break;
163                             }
164                         } else {
165                             break;
166                         }
167                         }
168                         if (len == -1) { errCode = errno; }
169                     }
170                     complData = work->workData.ioData.buf;
171                     fd = work->workData.ioData.fd;
172                 } else if ( work->workKind & WORKER_WRITE ) {
173                     if ( work->workKind & WORKER_FOR_SOCKET ) {
174                         len = send(work->workData.ioData.fd,
175                                    work->workData.ioData.buf,
176                                    work->workData.ioData.len,
177                                    0);
178                         if (len == SOCKET_ERROR) {
179                             errCode = WSAGetLastError();
180                         }
181                     } else {
182                         len = write(work->workData.ioData.fd,
183                                     work->workData.ioData.buf,
184                                     work->workData.ioData.len);
185                         if (len == -1) {
186                             errCode = errno;
187                             // write() gets errno wrong for
188                             // ERROR_NO_DATA, we have to fix it here:
189                             if (errCode == EINVAL &&
190                                 GetLastError() == ERROR_NO_DATA) {
191                                 errCode = EPIPE;
192                             }
193                         }
194                     }
195                     complData = work->workData.ioData.buf;
196                     fd = work->workData.ioData.fd;
197                 } else if ( work->workKind & WORKER_DELAY ) {
198                     /* Approximate implementation of threadDelay;
199                      * 
200                      * Note: Sleep() is in milliseconds, not micros.
201                      */
202                     Sleep((work->workData.delayData.msecs + 999) / 1000);
203                     len = work->workData.delayData.msecs;
204                     complData = NULL;
205                     fd = 0;
206                     errCode = 0;
207                 } else if ( work->workKind & WORKER_DO_PROC ) {
208                     /* perform operation/proc on behalf of Haskell thread. */
209                     if (work->workData.procData.proc) {
210                         /* The procedure is assumed to encode result + success/failure
211                          * via its param.
212                          */
213                         errCode=work->workData.procData.proc(work->workData.procData.param);
214                     } else {
215                         errCode=1;
216                     }
217                     complData = work->workData.procData.param;
218                 } else {
219                     fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
220                     fflush(stderr);
221                     continue;
222                 }
223                 if (!work->abandonOp) {
224                     work->onCompletion(work->requestID,
225                                        fd,
226                                        len,
227                                        complData,
228                                        errCode);
229                 }
230                 /* Free the WorkItem */
231                 DeregisterWorkItem(iom,work);
232                 free(work);
233             } else {
234                 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
235                 EnterCriticalSection(&iom->manLock);
236                 ioMan->numWorkers--;
237                 LeaveCriticalSection(&iom->manLock);
238                 return 1;
239             }
240         } else {
241             fprintf(stderr, "waiting failed (%lu); fatal.\n", rc); fflush(stderr);
242             EnterCriticalSection(&iom->manLock);
243             ioMan->numWorkers--;
244             LeaveCriticalSection(&iom->manLock);
245             return 1;
246         }
247     }
248     return 0;
249 }
250
251 static 
252 BOOL
253 NewIOWorkerThread(IOManagerState* iom)
254 {
255     unsigned threadId;
256     return ( 0 != _beginthreadex(NULL,
257                                  0,
258                                  IOWorkerProc,
259                                  (LPVOID)iom,
260                                  0,
261                                  &threadId) );
262 }
263
264 BOOL
265 StartIOManager(void)
266 {
267     HANDLE hExit;
268     WorkQueue* wq;
269
270     wq = NewWorkQueue();
271     if ( !wq ) return FALSE;  
272   
273     ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
274   
275     if (!ioMan) {
276         FreeWorkQueue(wq);
277         return FALSE;
278     }
279
280     /* A manual-reset event */
281     hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
282     if ( !hExit ) {
283         FreeWorkQueue(wq);
284         free(ioMan);
285         return FALSE;
286     }
287   
288     ioMan->hExitEvent = hExit;
289     InitializeCriticalSection(&ioMan->manLock);
290     ioMan->workQueue   = wq;
291     ioMan->numWorkers  = 0;
292     ioMan->workersIdle = 0;
293     ioMan->queueSize   = 0;
294     ioMan->requestID   = 1;
295     InitializeCriticalSection(&ioMan->active_work_lock);
296     ioMan->active_work_items = NULL;
297  
298     return TRUE;
299 }
300
301 /*
302  * Function: depositWorkItem()
303  *
304  * Local function which deposits a WorkItem onto a work queue,
305  * deciding in the process whether or not the thread pool needs
306  * to be augmented with another thread to handle the new request.
307  *
308  */
309 static
310 int
311 depositWorkItem( unsigned int reqID,
312                  WorkItem* wItem )
313 {
314     EnterCriticalSection(&ioMan->manLock);
315
316 #if 0
317     fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers); 
318     fflush(stderr);
319 #endif
320     /* A new worker thread is created when there are fewer idle threads
321      * than non-consumed queue requests. This ensures that requests will
322      * be dealt with in a timely manner.
323      *
324      * [Long explanation of why the previous thread pool policy lead to 
325      * trouble]
326      *
327      * Previously, the thread pool was augmented iff no idle worker threads
328      * were available. That strategy runs the risk of repeatedly adding to
329      * the request queue without expanding the thread pool to handle this
330      * sudden spike in queued requests. 
331      * [How? Assume workersIdle is 1, and addIORequest() is called. No new 
332      * thread is created and the request is simply queued. If addIORequest()
333      * is called again _before the OS schedules a worker thread to pull the
334      * request off the queue_, workersIdle is still 1 and another request is 
335      * simply added to the queue. Once the worker thread is run, only one
336      * request is de-queued, leaving the 2nd request in the queue]
337      * 
338      * Assuming none of the queued requests take an inordinate amount of to 
339      * complete, the request queue would eventually be drained. But if that's 
340      * not the case, the later requests will end up languishing in the queue 
341      * indefinitely. The non-timely handling of requests may cause CH applications
342      * to misbehave / hang; bad.
343      *
344      */
345     ioMan->queueSize++;
346     if ( (ioMan->workersIdle < ioMan->queueSize) ) {
347         /* see if giving up our quantum ferrets out some idle threads.
348          */
349         LeaveCriticalSection(&ioMan->manLock);
350         Sleep(0);
351         EnterCriticalSection(&ioMan->manLock);
352         if ( (ioMan->workersIdle < ioMan->queueSize) ) {
353             /* No, go ahead and create another. */
354             ioMan->numWorkers++;
355             if (!NewIOWorkerThread(ioMan)) {
356                 ioMan->numWorkers--;
357             }
358         }
359     }
360     LeaveCriticalSection(&ioMan->manLock);
361   
362     if (SubmitWork(ioMan->workQueue,wItem)) {
363         /* Note: the work item has potentially been consumed by a worker thread
364          *       (and freed) at this point, so we cannot use wItem's requestID.
365          */
366         return reqID;
367     } else {
368         return 0;
369     }
370 }
371
372 /*
373  * Function: AddIORequest()
374  *
375  * Conduit to underlying WorkQueue's SubmitWork(); adds IO
376  * request to work queue, deciding whether or not to augment
377  * the thread pool in the process. 
378  */
379 int
380 AddIORequest ( int   fd,
381                BOOL  forWriting,
382                BOOL  isSocket,
383                int   len,
384                char* buffer,
385                CompletionProc onCompletion)
386 {
387     WorkItem* wItem    = (WorkItem*)malloc(sizeof(WorkItem));
388     unsigned int reqID = ioMan->requestID++;
389     if (!ioMan || !wItem) return 0;
390   
391     /* Fill in the blanks */
392     wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
393                           ( forWriting ? WORKER_WRITE : WORKER_READ );
394     wItem->workData.ioData.fd  = fd;
395     wItem->workData.ioData.len = len;
396     wItem->workData.ioData.buf = buffer;
397     wItem->link = NULL;
398
399     wItem->onCompletion        = onCompletion;
400     wItem->requestID           = reqID;
401   
402     return depositWorkItem(reqID, wItem);
403 }       
404
405 /*
406  * Function: AddDelayRequest()
407  *
408  * Like AddIORequest(), but this time adding a delay request to
409  * the request queue.
410  */
411 BOOL
412 AddDelayRequest ( unsigned int   msecs,
413                   CompletionProc onCompletion)
414 {
415     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
416     unsigned int reqID = ioMan->requestID++;
417     if (!ioMan || !wItem) return FALSE;
418   
419     /* Fill in the blanks */
420     wItem->workKind     = WORKER_DELAY;
421     wItem->workData.delayData.msecs = msecs;
422     wItem->onCompletion = onCompletion;
423     wItem->requestID    = reqID;
424     wItem->link         = NULL;
425
426     return depositWorkItem(reqID, wItem);
427 }
428
429 /*
430  * Function: AddProcRequest()
431  *
432  * Add an asynchronous procedure request.
433  */
434 BOOL
435 AddProcRequest ( void* proc,
436                  void* param,
437                  CompletionProc onCompletion)
438 {
439     WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
440     unsigned int reqID = ioMan->requestID++;
441     if (!ioMan || !wItem) return FALSE;
442   
443     /* Fill in the blanks */
444     wItem->workKind     = WORKER_DO_PROC;
445     wItem->workData.procData.proc  = proc;
446     wItem->workData.procData.param = param;
447     wItem->onCompletion = onCompletion;
448     wItem->requestID    = reqID;
449     wItem->abandonOp    = 0;
450     wItem->link         = NULL;
451
452     return depositWorkItem(reqID, wItem);
453 }
454
455 void ShutdownIOManager ( rtsBool wait_threads )
456 {
457     int num;
458
459     SetEvent(ioMan->hExitEvent);
460   
461     if (wait_threads) {
462         /* Wait for all worker threads to die. */
463         for (;;) {
464             EnterCriticalSection(&ioMan->manLock);
465             num = ioMan->numWorkers;
466             LeaveCriticalSection(&ioMan->manLock);
467             if (num == 0)
468                 break;
469             Sleep(10);
470         }
471         FreeWorkQueue(ioMan->workQueue);
472         CloseHandle(ioMan->hExitEvent);
473         DeleteCriticalSection(&ioMan->active_work_lock);
474         DeleteCriticalSection(&ioMan->manLock);
475         free(ioMan);
476         ioMan = NULL;
477     }
478 }
479
480 /* Keep track of WorkItems currently being serviced. */
481 static 
482 void
483 RegisterWorkItem(IOManagerState* ioMan, 
484                  WorkItem* wi)
485 {
486     EnterCriticalSection(&ioMan->active_work_lock);
487     wi->link = ioMan->active_work_items;
488     ioMan->active_work_items = wi;
489     LeaveCriticalSection(&ioMan->active_work_lock);
490 }
491
492 static 
493 void
494 DeregisterWorkItem(IOManagerState* ioMan, 
495                    WorkItem* wi)
496 {
497     WorkItem *ptr, *prev;
498     
499     EnterCriticalSection(&ioMan->active_work_lock);
500     for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) {
501         if (wi->requestID == ptr->requestID) {
502             if (prev==NULL) {
503                 ioMan->active_work_items = ptr->link;
504             } else {
505                 prev->link = ptr->link;
506             }
507             LeaveCriticalSection(&ioMan->active_work_lock);
508             return;
509         }
510     }
511     fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n", wi->requestID);
512     LeaveCriticalSection(&ioMan->active_work_lock);
513 }
514
515
516 /*
517  * Function: abandonWorkRequest()
518  *
519  * Signal that a work request isn't of interest. Called by the Scheduler
520  * if a blocked Haskell thread has an exception thrown to it.
521  *
522  * Note: we're not aborting the system call that a worker might be blocked on
523  * here, just disabling the propagation of its result once its finished. We
524  * may have to go the whole hog here and switch to overlapped I/O so that we
525  * can abort blocked system calls.
526  */
527 void
528 abandonWorkRequest ( int reqID )
529 {
530     WorkItem *ptr;
531     EnterCriticalSection(&ioMan->active_work_lock);
532     for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) {
533         if (ptr->requestID == (unsigned int)reqID ) {
534             ptr->abandonOp = 1;
535             LeaveCriticalSection(&ioMan->active_work_lock);
536             return;
537         }
538     }
539     /* Note: if the request ID isn't present, the worker will have
540      * finished sometime since awaitRequests() last drained the completed
541      * request table; i.e., not an error.
542      */
543     LeaveCriticalSection(&ioMan->active_work_lock);
544 }
545
546 #endif