[project @ 2003-09-12 16:26:05 by sof]
[ghc-hetmet.git] / ghc / rts / win32 / IOManager.c
index 42eba00..4aec35f 100644 (file)
  * Internal state maintained by the IO manager.
  */
 typedef struct IOManagerState {
-  CritSection      manLock;
-  WorkQueue*       workQueue;
-  int              numWorkers;
-  int              workersIdle;
-  HANDLE           hExitEvent;
-  unsigned int     requestID;
+    CritSection      manLock;
+    WorkQueue*       workQueue;
+    int              queueSize;
+    int              numWorkers;
+    int              workersIdle;
+    HANDLE           hExitEvent;
+    unsigned int     requestID;
 } IOManagerState;
 
 /* ToDo: wrap up this state via a IOManager handle instead? */
@@ -35,173 +36,181 @@ unsigned
 WINAPI
 IOWorkerProc(PVOID param)
 {
-  HANDLE  hWaits[2];
-  DWORD   rc;
-  IOManagerState* iom = (IOManagerState*)param;
-  WorkQueue* pq = iom->workQueue;
-  WorkItem*  work;
-  int        len = 0, fd = 0;
-  DWORD      errCode;
-  void*      complData;
+    HANDLE  hWaits[2];
+    DWORD   rc;
+    IOManagerState* iom = (IOManagerState*)param;
+    WorkQueue* pq = iom->workQueue;
+    WorkItem*  work;
+    int        len = 0, fd = 0;
+    DWORD      errCode;
+    void*      complData;
 
-  hWaits[0] = (HANDLE)iom->hExitEvent;
-  hWaits[1] = GetWorkQueueHandle(pq);
+    hWaits[0] = (HANDLE)iom->hExitEvent;
+    hWaits[1] = GetWorkQueueHandle(pq);
   
-  while (1) {
-    /* The error code is communicated back on completion of request; reset. */
-    errCode = 0;
+    while (1) {
+       /* The error code is communicated back on completion of request; reset. */
+       errCode = 0;
+       
+       EnterCriticalSection(&iom->manLock);
+       /* Signal that the worker is idle.
+        *
+        * 'workersIdle' is used when determining whether or not to
+        * increase the worker thread pool when adding a new request.
+        * (see addIORequest().)
+        */
+       iom->workersIdle++;
+       LeaveCriticalSection(&iom->manLock);
 
-    EnterCriticalSection(&iom->manLock);
-    iom->workersIdle++;
-    LeaveCriticalSection(&iom->manLock);
+       rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
 
-    rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
-
-    EnterCriticalSection(&iom->manLock);
-    iom->workersIdle--;
-    LeaveCriticalSection(&iom->manLock);
+       EnterCriticalSection(&iom->manLock);
+       /* Signal that the thread is 'non-idle' and about to consume 
+        * a work item.
+        */
+       iom->workersIdle--;
+       iom->queueSize--;
+       LeaveCriticalSection(&iom->manLock);
     
-    if ( WAIT_OBJECT_0 == rc ) {
-      /* shutdown */
-#if 0
-      fprintf(stderr, "shutting down...\n"); fflush(stderr);
-#endif
-      return 0;
-    } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
-      /* work item available, fetch it. */
-#if 0
-      fprintf(stderr, "work available...\n"); fflush(stderr);
-#endif
-      if (FetchWork(pq,(void**)&work)) {
-        if ( work->workKind & WORKER_READ ) {
-         if ( work->workKind & WORKER_FOR_SOCKET ) {
-           len = recv(work->workData.ioData.fd, 
-                      work->workData.ioData.buf,
-                      work->workData.ioData.len,
-                      0);
-           if (len == SOCKET_ERROR) {
-             errCode = WSAGetLastError();
-           }
-         } else {
-           len = read(work->workData.ioData.fd,
-                      work->workData.ioData.buf,
-                      work->workData.ioData.len);
-           if (len == -1) { errCode = errno; }
-         }
-         complData = work->workData.ioData.buf;
-         fd = work->workData.ioData.fd;
-       } else if ( work->workKind & WORKER_WRITE ) {
-         if ( work->workKind & WORKER_FOR_SOCKET ) {
-           len = send(work->workData.ioData.fd,
-                      work->workData.ioData.buf,
-                      work->workData.ioData.len,
-                      0);
-           if (len == SOCKET_ERROR) {
-             errCode = WSAGetLastError();
-           }
-         } else {
-           len = write(work->workData.ioData.fd,
-                       work->workData.ioData.buf,
-                       work->workData.ioData.len);
-           if (len == -1) { errCode = errno; }
-         }
-         complData = work->workData.ioData.buf;
-         fd = work->workData.ioData.fd;
-       } else if ( work->workKind & WORKER_DELAY ) {
-         /* very approximate implementation of threadDelay */
-         Sleep(work->workData.delayData.msecs);
-         len = work->workData.delayData.msecs;
-         complData = NULL;
-         fd = 0;
-         errCode = 0;
-       } else if ( work->workKind & WORKER_DO_PROC ) {
-           /* perform operation/proc on behalf of Haskell thread. */
-           if (work->workData.procData.proc) {
-               /* The procedure is assumed to encode result + success/failure
-                * via its param.
-                */
-               work->workData.procData.proc(work->workData.procData.param);
-               errCode=0;
+       if ( WAIT_OBJECT_0 == rc ) {
+           /* shutdown */
+           return 0;
+       } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
+           /* work item available, fetch it. */
+           if (FetchWork(pq,(void**)&work)) {
+               if ( work->workKind & WORKER_READ ) {
+                   if ( work->workKind & WORKER_FOR_SOCKET ) {
+                       len = recv(work->workData.ioData.fd, 
+                                  work->workData.ioData.buf,
+                                  work->workData.ioData.len,
+                                  0);
+                       if (len == SOCKET_ERROR) {
+                           errCode = WSAGetLastError();
+                       }
+                   } else {
+                       len = read(work->workData.ioData.fd,
+                                  work->workData.ioData.buf,
+                                  work->workData.ioData.len);
+                       if (len == -1) { errCode = errno; }
+                   }
+                   complData = work->workData.ioData.buf;
+                   fd = work->workData.ioData.fd;
+               } else if ( work->workKind & WORKER_WRITE ) {
+                   if ( work->workKind & WORKER_FOR_SOCKET ) {
+                       len = send(work->workData.ioData.fd,
+                                  work->workData.ioData.buf,
+                                  work->workData.ioData.len,
+                                  0);
+                       if (len == SOCKET_ERROR) {
+                           errCode = WSAGetLastError();
+                       }
+                   } else {
+                       len = write(work->workData.ioData.fd,
+                                   work->workData.ioData.buf,
+                                   work->workData.ioData.len);
+                       if (len == -1) { errCode = errno; }
+                   }
+                   complData = work->workData.ioData.buf;
+                   fd = work->workData.ioData.fd;
+               } else if ( work->workKind & WORKER_DELAY ) {
+                   /* Approximate implementation of threadDelay;
+                    * 
+                    * Note: Sleep() is in milliseconds, not micros.
+                    */
+                   Sleep(work->workData.delayData.msecs / 1000);
+                   len = work->workData.delayData.msecs;
+                   complData = NULL;
+                   fd = 0;
+                   errCode = 0;
+               } else if ( work->workKind & WORKER_DO_PROC ) {
+                   /* perform operation/proc on behalf of Haskell thread. */
+                   if (work->workData.procData.proc) {
+                       /* The procedure is assumed to encode result + success/failure
+                        * via its param.
+                        */
+                       errCode=work->workData.procData.proc(work->workData.procData.param);
+                   } else {
+                       errCode=1;
+                   }
+                   complData = work->workData.procData.param;
+               } else {
+                   fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
+                   fflush(stderr);
+                   continue;
+               }
+               work->onCompletion(work->requestID,
+                                  fd,
+                                  len,
+                                  complData,
+                                  errCode);
+               /* Free the WorkItem */
+               free(work);
            } else {
-               errCode=1;
+               fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
+               return 1;
            }
-           complData = work->workData.procData.param;
        } else {
-         fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
-         fflush(stderr);
-         continue;
+           fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
+           return 1;
        }
-       work->onCompletion(work->requestID,
-                          fd,
-                          len,
-                          complData,
-                          errCode);
-       /* Free the WorkItem */
-       free(work);
-      } else {
-         fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
-         return 1;
-      }
-    } else {
-         fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
-         return 1;
     }
-  }
-  return 0;
+    return 0;
 }
 
 static 
 BOOL
 NewIOWorkerThread(IOManagerState* iom)
 {
-  unsigned threadId;
-  return ( 0 != _beginthreadex(NULL,
-                              0,
-                              IOWorkerProc,
-                              (LPVOID)iom,
-                              0,
-                              &threadId) );
+    unsigned threadId;
+    return ( 0 != _beginthreadex(NULL,
+                                0,
+                                IOWorkerProc,
+                                (LPVOID)iom,
+                                0,
+                                &threadId) );
 }
 
 BOOL
 StartIOManager(void)
 {
-  HANDLE hExit;
-  WorkQueue* wq;
+    HANDLE hExit;
+    WorkQueue* wq;
 
-  wq = NewWorkQueue();
-  if ( !wq ) return FALSE;  
+    wq = NewWorkQueue();
+    if ( !wq ) return FALSE;  
   
-  ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
+    ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
   
-  if (!ioMan) {
-    FreeWorkQueue(wq);
-    return FALSE;
-  }
+    if (!ioMan) {
+       FreeWorkQueue(wq);
+       return FALSE;
+    }
 
-  /* A manual-reset event */
-  hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
-  if ( !hExit ) {
-    FreeWorkQueue(wq);
-    free(ioMan);
-    return FALSE;
-  }
+    /* A manual-reset event */
+    hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
+    if ( !hExit ) {
+       FreeWorkQueue(wq);
+       free(ioMan);
+       return FALSE;
+    }
   
-  ioMan->hExitEvent = hExit;
-  InitializeCriticalSection(&ioMan->manLock);
-  ioMan->workQueue   = wq;
-  ioMan->numWorkers  = 0;
-  ioMan->workersIdle = 0;
-  ioMan->requestID   = 1;
+    ioMan->hExitEvent = hExit;
+    InitializeCriticalSection(&ioMan->manLock);
+    ioMan->workQueue   = wq;
+    ioMan->numWorkers  = 0;
+    ioMan->workersIdle = 0;
+    ioMan->queueSize   = 0;
+    ioMan->requestID   = 1;
  
-  return TRUE;
+    return TRUE;
 }
 
 /*
  * Function: AddIORequest()
  *
  * Conduit to underlying WorkQueue's SubmitWork(); adds IO
- * request to work queue, returning without blocking.
+ * request to work queue, deciding whether or not to augment
+ * the thread pool in the process. 
  */
 int
 AddIORequest ( int   fd,
@@ -211,102 +220,163 @@ AddIORequest ( int   fd,
               char* buffer,
               CompletionProc onCompletion)
 {
-  WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
-  if (!ioMan || !wItem) return 0;
+    WorkItem* wItem    = (WorkItem*)malloc(sizeof(WorkItem));
+    unsigned int reqID = ioMan->requestID++;
+    if (!ioMan || !wItem) return 0;
   
-  /* Fill in the blanks */
-  wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
-                        ( forWriting ? WORKER_WRITE : WORKER_READ );
-  wItem->workData.ioData.fd  = fd;
-  wItem->workData.ioData.len = len;
-  wItem->workData.ioData.buf = buffer;
+    /* Fill in the blanks */
+    wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
+                         ( forWriting ? WORKER_WRITE : WORKER_READ );
+    wItem->workData.ioData.fd  = fd;
+    wItem->workData.ioData.len = len;
+    wItem->workData.ioData.buf = buffer;
 
-  wItem->onCompletion        = onCompletion;
-  wItem->requestID           = ioMan->requestID++;
+    wItem->onCompletion        = onCompletion;
+    wItem->requestID           = reqID;
   
-  EnterCriticalSection(&ioMan->manLock);
-  /* If there are no worker threads available, create one.
-   *
-   * If this turns out to be too aggressive a policy, refine.
-   */
+    EnterCriticalSection(&ioMan->manLock);
+    /* If there are no worker threads available, create one.
+     *
+     * If this turns out to be too aggressive a policy, refine.
+     */
 #if 0
-  fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); fflush(stderr);
+    fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); 
+    fflush(stderr);
 #endif
-  if ( ioMan->workersIdle == 0 ) {
-    ioMan->numWorkers++;
-    NewIOWorkerThread(ioMan);
-  }
-  LeaveCriticalSection(&ioMan->manLock);
-  
-  if (SubmitWork(ioMan->workQueue,wItem)) {
-    return wItem->requestID;
-  } else {
-    return 0;
-  }
-}             
+    /* A new worker thread is created when there are fewer idle threads
+     * than non-consumed queue requests. This ensures that requests will
+     * be dealt with in a timely manner.
+     *
+     * [Long explanation of why the previous thread pool policy lead to 
+     * trouble]
+     *
+     * Previously, the thread pool was augmented iff no idle worker threads
+     * were available. That strategy runs the risk of repeatedly adding to
+     * the request queue without expanding the thread pool to handle this
+     * sudden spike in queued requests. 
+     * [How? Assume workersIdle is 1, and addIORequest() is called. No new 
+     * thread is created and the, returning without blocking.
+ request is simply queued. If addIORequest()
+     * is called again _before the OS schedules a worker thread to pull the
+     * request off the queue_, workersIdle is still 1 and another request is 
+     * simply added to the queue. Once the worker thread is run, only one
+     * request is de-queued, leaving the 2nd request in the queue]
+     * 
+     * Assuming none of the queued requests take an inordinate amount of to 
+     * complete, the request queue would eventually be drained. But if that's 
+     * not the case, the later requests will end up languishing in the queue 
+     * indefinitely. The non-timely handling of requests may cause CH applications
+     * to misbehave / hang; bad.
+     *
+     */
+    ioMan->queueSize++;
+    if ( ioMan->workersIdle < ioMan->queueSize ) {
+       ioMan->numWorkers++;
+       LeaveCriticalSection(&ioMan->manLock);
+       NewIOWorkerThread(ioMan);
+    } else {
+       LeaveCriticalSection(&ioMan->manLock);
+    }
+    
+    if (SubmitWork(ioMan->workQueue,wItem)) {
+       /* Note: the work item has potentially been consumed by a worker thread
+        *       (and freed) at this point, so we cannot use wItem's requestID.
+        */
+       return reqID;
+    } else {
+       return 0;
+    }
+}       
 
 /*
  * Function: AddDelayRequest()
  *
+ * Like AddIORequest(), but this time adding a delay request to
+ * the request queue.
  */
 BOOL
 AddDelayRequest ( unsigned int   msecs,
                  CompletionProc onCompletion)
 {
-  WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
-  if (!ioMan || !wItem) return FALSE;
+    WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+    unsigned int reqID = ioMan->requestID++;
+    if (!ioMan || !wItem) return FALSE;
   
-  /* Fill in the blanks */
-  wItem->workKind     = WORKER_DELAY;
-  wItem->workData.delayData.msecs = msecs;
-  wItem->onCompletion = onCompletion;
-  wItem->requestID    = ioMan->requestID++;
+    /* Fill in the blanks */
+    wItem->workKind     = WORKER_DELAY;
+    wItem->workData.delayData.msecs = msecs;
+    wItem->onCompletion = onCompletion;
+    wItem->requestID    = reqID;
 
-  EnterCriticalSection(&ioMan->manLock);
-  if ( ioMan->workersIdle == 0 ) {
-    ioMan->numWorkers++;
-    NewIOWorkerThread(ioMan);
-  }
-  LeaveCriticalSection(&ioMan->manLock);
+    EnterCriticalSection(&ioMan->manLock);
+#if 0
+    fprintf(stderr, "AddDelayRequest: %d\n", ioMan->workersIdle);
+    fflush(stderr);
+#endif
+    /* See AddIORequest() for comments regarding policy
+     * for augmenting the worker thread pool.
+     */
+    ioMan->queueSize++;
+    if ( ioMan->workersIdle < ioMan->queueSize ) {
+       ioMan->numWorkers++;
+       LeaveCriticalSection(&ioMan->manLock);
+       NewIOWorkerThread(ioMan);
+    } else {
+       LeaveCriticalSection(&ioMan->manLock);
+    }
   
   if (SubmitWork(ioMan->workQueue,wItem)) {
-    return wItem->requestID;
+      /* See AddIORequest() comment */
+      return reqID;
   } else {
-    return 0;
+      return 0;
   }
 }
 
 /*
- * Function: AddDelayRequest()
+ * Function: AddProcRequest()
  *
+ * Add an asynchronous procedure request.
  */
 BOOL
 AddProcRequest ( void* proc,
                 void* param,
                 CompletionProc onCompletion)
 {
-  WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
-  if (!ioMan || !wItem) return FALSE;
+    WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+    unsigned int reqID = ioMan->requestID++;
+    if (!ioMan || !wItem) return FALSE;
   
-  /* Fill in the blanks */
-  wItem->workKind     = WORKER_DO_PROC;
-  wItem->workData.procData.proc  = proc;
-  wItem->workData.procData.param = param;
-  wItem->onCompletion = onCompletion;
-  wItem->requestID    = ioMan->requestID++;
+    /* Fill in the blanks */
+    wItem->workKind     = WORKER_DO_PROC;
+    wItem->workData.procData.proc  = proc;
+    wItem->workData.procData.param = param;
+    wItem->onCompletion = onCompletion;
+    wItem->requestID    = reqID;
 
-  EnterCriticalSection(&ioMan->manLock);
-  if ( ioMan->workersIdle == 0 ) {
-    ioMan->numWorkers++;
-    NewIOWorkerThread(ioMan);
-  }
-  LeaveCriticalSection(&ioMan->manLock);
+    EnterCriticalSection(&ioMan->manLock);
+#if 0
+    fprintf(stderr, "AddProcRequest: %d\n", ioMan->workersIdle);
+    fflush(stderr);
+#endif
+    /* See AddIORequest() for comments regarding policy
+     * for augmenting the worker thread pool.
+     */
+    ioMan->queueSize++;
+    if ( ioMan->workersIdle < ioMan->queueSize ) {
+       ioMan->numWorkers++;
+       LeaveCriticalSection(&ioMan->manLock);
+       NewIOWorkerThread(ioMan);
+    } else {
+       LeaveCriticalSection(&ioMan->manLock);
+    }
   
-  if (SubmitWork(ioMan->workQueue,wItem)) {
-    return wItem->requestID;
-  } else {
-    return 0;
-  }
+    if (SubmitWork(ioMan->workQueue,wItem)) {
+       /* See AddIORequest() comment */
+       return reqID;
+    } else {
+       return 0;
+    }
 }
 
 void ShutdownIOManager()