[project @ 2005-05-24 14:18:34 by simonmar]
[ghc-hetmet.git] / ghc / rts / win32 / IOManager.c
index 4aec35f..a67c350 100644 (file)
@@ -4,8 +4,10 @@
  *
  * (c) sof, 2002-2003.
  */
+#include "Rts.h"
 #include "IOManager.h"
 #include "WorkQueue.h"
+#include "ConsoleHandler.h"
 #include <stdio.h>
 #include <stdlib.h>
 #include <io.h>
@@ -23,11 +25,17 @@ typedef struct IOManagerState {
     int              workersIdle;
     HANDLE           hExitEvent;
     unsigned int     requestID;
+    /* fields for keeping track of active WorkItems */
+    CritSection      active_work_lock;
+    WorkItem*        active_work_items;
 } IOManagerState;
 
 /* ToDo: wrap up this state via a IOManager handle instead? */
 static IOManagerState* ioMan;
 
+static void RegisterWorkItem  ( IOManagerState* iom, WorkItem* wi);
+static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi);
+
 /*
  * The routine executed by each worker thread.
  */
@@ -42,7 +50,7 @@ IOWorkerProc(PVOID param)
     WorkQueue* pq = iom->workQueue;
     WorkItem*  work;
     int        len = 0, fd = 0;
-    DWORD      errCode;
+    DWORD      errCode = 0;
     void*      complData;
 
     hWaits[0] = (HANDLE)iom->hExitEvent;
@@ -61,9 +69,20 @@ IOWorkerProc(PVOID param)
         */
        iom->workersIdle++;
        LeaveCriticalSection(&iom->manLock);
-
+       
+       /*
+        * A possible future refinement is to make long-term idle threads
+        * wake up and decide to shut down should the number of idle threads
+        * be above some threshold.
+        *
+        */
        rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
 
+       if (rc == WAIT_OBJECT_0) {
+           // we received the exit event
+           return 0;
+       }
+
        EnterCriticalSection(&iom->manLock);
        /* Signal that the thread is 'non-idle' and about to consume 
         * a work item.
@@ -72,12 +91,11 @@ IOWorkerProc(PVOID param)
        iom->queueSize--;
        LeaveCriticalSection(&iom->manLock);
     
-       if ( WAIT_OBJECT_0 == rc ) {
-           /* shutdown */
-           return 0;
-       } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
+       if ( rc == (WAIT_OBJECT_0 + 1) ) {
            /* work item available, fetch it. */
            if (FetchWork(pq,(void**)&work)) {
+               work->abandonOp = 0;
+               RegisterWorkItem(iom,work);
                if ( work->workKind & WORKER_READ ) {
                    if ( work->workKind & WORKER_FOR_SOCKET ) {
                        len = recv(work->workData.ioData.fd, 
@@ -88,9 +106,58 @@ IOWorkerProc(PVOID param)
                            errCode = WSAGetLastError();
                        }
                    } else {
+                       while (1) {
+                       /* Do the read(), with extra-special handling for Ctrl+C */
                        len = read(work->workData.ioData.fd,
                                   work->workData.ioData.buf,
                                   work->workData.ioData.len);
+                       if ( len == 0 && work->workData.ioData.len != 0 ) {
+                           /* Given the following scenario:
+                            *     - a console handler has been registered that handles Ctrl+C
+                            *       events.
+                            *     - we've not tweaked the 'console mode' settings to turn on
+                            *       ENABLE_PROCESSED_INPUT.
+                            *     - we're blocked waiting on input from standard input.
+                            *     - the user hits Ctrl+C.
+                            *
+                            * The OS will invoke the console handler (in a separate OS thread),
+                            * and the above read() (i.e., under the hood, a ReadFile() op) returns
+                            * 0, with the error set to ERROR_OPERATION_ABORTED. We don't
+                            * want to percolate this error condition back to the Haskell user.
+                            * Do this by waiting for the completion of the Haskell console handler.
+                            * If upon completion of the console handler routine, the Haskell thread 
+                            * that issued the request is found to have been thrown an exception, 
+                            * the worker abandons the request (since that's what the Haskell thread 
+                            * has done.) If the Haskell thread hasn't been interrupted, the worker 
+                            * retries the read request as if nothing happened.
+                            */
+                           if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) {
+                               /* For now, only abort when dealing with the standard input handle.
+                                * i.e., for all others, an error is raised.
+                                */
+                               HANDLE h  = (HANDLE)GetStdHandle(STD_INPUT_HANDLE);
+                               if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) {
+                                   if (rts_waitConsoleHandlerCompletion()) {
+                                       /* If the Scheduler has set work->abandonOp, the Haskell thread has 
+                                        * been thrown an exception (=> the worker must abandon this request.)
+                                        * We test for this below before invoking the on-completion routine.
+                                        */
+                                       if (work->abandonOp) {
+                                           break;
+                                       } else {
+                                           continue;
+                                       }
+                                   } 
+                               } else { 
+                                   break; /* Treat it like an error */
+                               }
+                           } else {
+                               break;
+                           }
+                       } else {
+                           break;
+                       }
+                       }
                        if (len == -1) { errCode = errno; }
                    }
                    complData = work->workData.ioData.buf;
@@ -138,19 +205,22 @@ IOWorkerProc(PVOID param)
                    fflush(stderr);
                    continue;
                }
-               work->onCompletion(work->requestID,
-                                  fd,
-                                  len,
-                                  complData,
-                                  errCode);
+               if (!work->abandonOp) {
+                   work->onCompletion(work->requestID,
+                                      fd,
+                                      len,
+                                      complData,
+                                      errCode);
+               }
                /* Free the WorkItem */
+               DeregisterWorkItem(iom,work);
                free(work);
            } else {
                fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
                return 1;
            }
        } else {
-           fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
+           fprintf(stderr, "waiting failed (%lu); fatal.\n", rc); fflush(stderr);
            return 1;
        }
     }
@@ -201,46 +271,29 @@ StartIOManager(void)
     ioMan->workersIdle = 0;
     ioMan->queueSize   = 0;
     ioMan->requestID   = 1;
+    InitializeCriticalSection(&ioMan->active_work_lock);
+    ioMan->active_work_items = NULL;
  
     return TRUE;
 }
 
 /*
- * Function: AddIORequest()
+ * Function: depositWorkItem()
+ *
+ * Local function which deposits a WorkItem onto a work queue,
+ * deciding in the process whether or not the thread pool needs
+ * to be augmented with another thread to handle the new request.
  *
- * Conduit to underlying WorkQueue's SubmitWork(); adds IO
- * request to work queue, deciding whether or not to augment
- * the thread pool in the process. 
  */
+static
 int
-AddIORequest ( int   fd,
-              BOOL  forWriting,
-              BOOL  isSocket,
-              int   len,
-              char* buffer,
-              CompletionProc onCompletion)
+depositWorkItem( unsigned int reqID,
+                WorkItem* wItem )
 {
-    WorkItem* wItem    = (WorkItem*)malloc(sizeof(WorkItem));
-    unsigned int reqID = ioMan->requestID++;
-    if (!ioMan || !wItem) return 0;
-  
-    /* Fill in the blanks */
-    wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
-                         ( forWriting ? WORKER_WRITE : WORKER_READ );
-    wItem->workData.ioData.fd  = fd;
-    wItem->workData.ioData.len = len;
-    wItem->workData.ioData.buf = buffer;
-
-    wItem->onCompletion        = onCompletion;
-    wItem->requestID           = reqID;
-  
     EnterCriticalSection(&ioMan->manLock);
-    /* If there are no worker threads available, create one.
-     *
-     * If this turns out to be too aggressive a policy, refine.
-     */
+
 #if 0
-    fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); 
+    fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers); 
     fflush(stderr);
 #endif
     /* A new worker thread is created when there are fewer idle threads
@@ -255,8 +308,7 @@ AddIORequest ( int   fd,
      * the request queue without expanding the thread pool to handle this
      * sudden spike in queued requests. 
      * [How? Assume workersIdle is 1, and addIORequest() is called. No new 
-     * thread is created and the, returning without blocking.
- request is simply queued. If addIORequest()
+     * thread is created and the request is simply queued. If addIORequest()
      * is called again _before the OS schedules a worker thread to pull the
      * request off the queue_, workersIdle is still 1 and another request is 
      * simply added to the queue. Once the worker thread is run, only one
@@ -270,14 +322,24 @@ AddIORequest ( int   fd,
      *
      */
     ioMan->queueSize++;
-    if ( ioMan->workersIdle < ioMan->queueSize ) {
-       ioMan->numWorkers++;
+    if ( (ioMan->workersIdle < ioMan->queueSize) ) {
+       /* see if giving up our quantum ferrets out some idle threads.
+        */
        LeaveCriticalSection(&ioMan->manLock);
-       NewIOWorkerThread(ioMan);
+       Sleep(0);
+       EnterCriticalSection(&ioMan->manLock);
+       if ( (ioMan->workersIdle < ioMan->queueSize) ) {
+           /* No, go ahead and create another. */
+           ioMan->numWorkers++;
+           LeaveCriticalSection(&ioMan->manLock);
+           NewIOWorkerThread(ioMan);
+       } else {
+           LeaveCriticalSection(&ioMan->manLock);
+       }
     } else {
        LeaveCriticalSection(&ioMan->manLock);
     }
-    
+  
     if (SubmitWork(ioMan->workQueue,wItem)) {
        /* Note: the work item has potentially been consumed by a worker thread
         *       (and freed) at this point, so we cannot use wItem's requestID.
@@ -286,6 +348,39 @@ AddIORequest ( int   fd,
     } else {
        return 0;
     }
+}
+
+/*
+ * Function: AddIORequest()
+ *
+ * Conduit to underlying WorkQueue's SubmitWork(); adds IO
+ * request to work queue, deciding whether or not to augment
+ * the thread pool in the process. 
+ */
+int
+AddIORequest ( int   fd,
+              BOOL  forWriting,
+              BOOL  isSocket,
+              int   len,
+              char* buffer,
+              CompletionProc onCompletion)
+{
+    WorkItem* wItem    = (WorkItem*)malloc(sizeof(WorkItem));
+    unsigned int reqID = ioMan->requestID++;
+    if (!ioMan || !wItem) return 0;
+  
+    /* Fill in the blanks */
+    wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
+                         ( forWriting ? WORKER_WRITE : WORKER_READ );
+    wItem->workData.ioData.fd  = fd;
+    wItem->workData.ioData.len = len;
+    wItem->workData.ioData.buf = buffer;
+    wItem->link = NULL;
+
+    wItem->onCompletion        = onCompletion;
+    wItem->requestID           = reqID;
+  
+    return depositWorkItem(reqID, wItem);
 }       
 
 /*
@@ -307,30 +402,9 @@ AddDelayRequest ( unsigned int   msecs,
     wItem->workData.delayData.msecs = msecs;
     wItem->onCompletion = onCompletion;
     wItem->requestID    = reqID;
+    wItem->link         = NULL;
 
-    EnterCriticalSection(&ioMan->manLock);
-#if 0
-    fprintf(stderr, "AddDelayRequest: %d\n", ioMan->workersIdle);
-    fflush(stderr);
-#endif
-    /* See AddIORequest() for comments regarding policy
-     * for augmenting the worker thread pool.
-     */
-    ioMan->queueSize++;
-    if ( ioMan->workersIdle < ioMan->queueSize ) {
-       ioMan->numWorkers++;
-       LeaveCriticalSection(&ioMan->manLock);
-       NewIOWorkerThread(ioMan);
-    } else {
-       LeaveCriticalSection(&ioMan->manLock);
-    }
-  
-  if (SubmitWork(ioMan->workQueue,wItem)) {
-      /* See AddIORequest() comment */
-      return reqID;
-  } else {
-      return 0;
-  }
+    return depositWorkItem(reqID, wItem);
 }
 
 /*
@@ -353,35 +427,84 @@ AddProcRequest ( void* proc,
     wItem->workData.procData.param = param;
     wItem->onCompletion = onCompletion;
     wItem->requestID    = reqID;
+    wItem->abandonOp    = 0;
+    wItem->link         = NULL;
 
-    EnterCriticalSection(&ioMan->manLock);
-#if 0
-    fprintf(stderr, "AddProcRequest: %d\n", ioMan->workersIdle);
-    fflush(stderr);
-#endif
-    /* See AddIORequest() for comments regarding policy
-     * for augmenting the worker thread pool.
-     */
-    ioMan->queueSize++;
-    if ( ioMan->workersIdle < ioMan->queueSize ) {
-       ioMan->numWorkers++;
-       LeaveCriticalSection(&ioMan->manLock);
-       NewIOWorkerThread(ioMan);
-    } else {
-       LeaveCriticalSection(&ioMan->manLock);
-    }
-  
-    if (SubmitWork(ioMan->workQueue,wItem)) {
-       /* See AddIORequest() comment */
-       return reqID;
-    } else {
-       return 0;
-    }
+    return depositWorkItem(reqID, wItem);
 }
 
-void ShutdownIOManager()
+void ShutdownIOManager ( void )
 {
   SetEvent(ioMan->hExitEvent);
-  free(ioMan);
-  ioMan = NULL;
+  // ToDo: we can't free this now, because the worker thread(s)
+  // haven't necessarily finished with it yet.  Perhaps it should
+  // have a reference count or something.
+  // free(ioMan);
+  // ioMan = NULL;
+}
+
+/* Keep track of WorkItems currently being serviced. */
+static 
+void
+RegisterWorkItem(IOManagerState* ioMan, 
+                WorkItem* wi)
+{
+    EnterCriticalSection(&ioMan->active_work_lock);
+    wi->link = ioMan->active_work_items;
+    ioMan->active_work_items = wi;
+    LeaveCriticalSection(&ioMan->active_work_lock);
+}
+
+static 
+void
+DeregisterWorkItem(IOManagerState* ioMan, 
+                  WorkItem* wi)
+{
+    WorkItem *ptr, *prev;
+    
+    EnterCriticalSection(&ioMan->active_work_lock);
+    for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) {
+       if (wi->requestID == ptr->requestID) {
+           if (prev==NULL) {
+               ioMan->active_work_items = ptr->link;
+           } else {
+               prev->link = ptr->link;
+           }
+           LeaveCriticalSection(&ioMan->active_work_lock);
+           return;
+       }
+    }
+    fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n", wi->requestID);
+    LeaveCriticalSection(&ioMan->active_work_lock);
+}
+
+
+/*
+ * Function: abandonWorkRequest()
+ *
+ * Signal that a work request isn't of interest. Called by the Scheduler
+ * if a blocked Haskell thread has an exception thrown to it.
+ *
+ * Note: we're not aborting the system call that a worker might be blocked on
+ * here, just disabling the propagation of its result once its finished. We
+ * may have to go the whole hog here and switch to overlapped I/O so that we
+ * can abort blocked system calls.
+ */
+void
+abandonWorkRequest ( int reqID )
+{
+    WorkItem *ptr;
+    EnterCriticalSection(&ioMan->active_work_lock);
+    for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) {
+       if (ptr->requestID == (unsigned int)reqID ) {
+           ptr->abandonOp = 1;
+           LeaveCriticalSection(&ioMan->active_work_lock);
+           return;
+       }
+    }
+    /* Note: if the request ID isn't present, the worker will have
+     * finished sometime since awaitRequests() last drained the completed
+     * request table; i.e., not an error.
+     */
+    LeaveCriticalSection(&ioMan->active_work_lock);
 }