[project @ 2003-09-15 20:39:38 by sof]
authorsof <unknown>
Mon, 15 Sep 2003 20:39:38 +0000 (20:39 +0000)
committersof <unknown>
Mon, 15 Sep 2003 20:39:38 +0000 (20:39 +0000)
factor out code that handles depositing of work items on the
  thread pool's request queue.
- when it looks as if a new worker thread needs to be created, give
  up our quantum first in the hope that this might at the last minute
  turn up more idle worker threads.
- add comment re: trimming pool size.

Merged to STABLE eventually; I may continue tinkering with this code
some more over the next day or two.

ghc/rts/win32/IOManager.c

index 4aec35f..b8b7942 100644 (file)
@@ -61,7 +61,13 @@ IOWorkerProc(PVOID param)
         */
        iom->workersIdle++;
        LeaveCriticalSection(&iom->manLock);
-
+       
+       /*
+        * A possible future refinement is to make long-term idle threads
+        * wake up and decide to shut down should the number of idle threads
+        * be above some threshold.
+        *
+        */
        rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
 
        EnterCriticalSection(&iom->manLock);
@@ -206,41 +212,22 @@ StartIOManager(void)
 }
 
 /*
- * Function: AddIORequest()
+ * Function: depositWorkItem()
+ *
+ * Local function which deposits a WorkItem onto a work queue,
+ * deciding in the process whether or not the thread pool needs
+ * to be augmented with another thread to handle the new request.
  *
- * Conduit to underlying WorkQueue's SubmitWork(); adds IO
- * request to work queue, deciding whether or not to augment
- * the thread pool in the process. 
  */
+static
 int
-AddIORequest ( int   fd,
-              BOOL  forWriting,
-              BOOL  isSocket,
-              int   len,
-              char* buffer,
-              CompletionProc onCompletion)
+depositWorkItem( unsigned int reqID,
+                WorkItem* wItem )
 {
-    WorkItem* wItem    = (WorkItem*)malloc(sizeof(WorkItem));
-    unsigned int reqID = ioMan->requestID++;
-    if (!ioMan || !wItem) return 0;
-  
-    /* Fill in the blanks */
-    wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
-                         ( forWriting ? WORKER_WRITE : WORKER_READ );
-    wItem->workData.ioData.fd  = fd;
-    wItem->workData.ioData.len = len;
-    wItem->workData.ioData.buf = buffer;
-
-    wItem->onCompletion        = onCompletion;
-    wItem->requestID           = reqID;
-  
     EnterCriticalSection(&ioMan->manLock);
-    /* If there are no worker threads available, create one.
-     *
-     * If this turns out to be too aggressive a policy, refine.
-     */
+
 #if 0
-    fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); 
+    fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers); 
     fflush(stderr);
 #endif
     /* A new worker thread is created when there are fewer idle threads
@@ -255,8 +242,7 @@ AddIORequest ( int   fd,
      * the request queue without expanding the thread pool to handle this
      * sudden spike in queued requests. 
      * [How? Assume workersIdle is 1, and addIORequest() is called. No new 
-     * thread is created and the, returning without blocking.
- request is simply queued. If addIORequest()
+     * thread is created and the request is simply queued. If addIORequest()
      * is called again _before the OS schedules a worker thread to pull the
      * request off the queue_, workersIdle is still 1 and another request is 
      * simply added to the queue. Once the worker thread is run, only one
@@ -270,14 +256,24 @@ AddIORequest ( int   fd,
      *
      */
     ioMan->queueSize++;
-    if ( ioMan->workersIdle < ioMan->queueSize ) {
-       ioMan->numWorkers++;
+    if ( (ioMan->workersIdle < ioMan->queueSize) ) {
+       /* see if giving up our quantum ferrets out some idle threads.
+        */
        LeaveCriticalSection(&ioMan->manLock);
-       NewIOWorkerThread(ioMan);
+       Sleep(0);
+       EnterCriticalSection(&ioMan->manLock);
+       if ( (ioMan->workersIdle < ioMan->queueSize) ) {
+           /* No, go ahead and create another. */
+           ioMan->numWorkers++;
+           LeaveCriticalSection(&ioMan->manLock);
+           NewIOWorkerThread(ioMan);
+       } else {
+           LeaveCriticalSection(&ioMan->manLock);
+       }
     } else {
        LeaveCriticalSection(&ioMan->manLock);
     }
-    
+  
     if (SubmitWork(ioMan->workQueue,wItem)) {
        /* Note: the work item has potentially been consumed by a worker thread
         *       (and freed) at this point, so we cannot use wItem's requestID.
@@ -286,6 +282,38 @@ AddIORequest ( int   fd,
     } else {
        return 0;
     }
+}
+
+/*
+ * Function: AddIORequest()
+ *
+ * Conduit to underlying WorkQueue's SubmitWork(); adds IO
+ * request to work queue, deciding whether or not to augment
+ * the thread pool in the process. 
+ */
+int
+AddIORequest ( int   fd,
+              BOOL  forWriting,
+              BOOL  isSocket,
+              int   len,
+              char* buffer,
+              CompletionProc onCompletion)
+{
+    WorkItem* wItem    = (WorkItem*)malloc(sizeof(WorkItem));
+    unsigned int reqID = ioMan->requestID++;
+    if (!ioMan || !wItem) return 0;
+  
+    /* Fill in the blanks */
+    wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
+                         ( forWriting ? WORKER_WRITE : WORKER_READ );
+    wItem->workData.ioData.fd  = fd;
+    wItem->workData.ioData.len = len;
+    wItem->workData.ioData.buf = buffer;
+
+    wItem->onCompletion        = onCompletion;
+    wItem->requestID           = reqID;
+  
+    return depositWorkItem(reqID, wItem);
 }       
 
 /*
@@ -308,29 +336,7 @@ AddDelayRequest ( unsigned int   msecs,
     wItem->onCompletion = onCompletion;
     wItem->requestID    = reqID;
 
-    EnterCriticalSection(&ioMan->manLock);
-#if 0
-    fprintf(stderr, "AddDelayRequest: %d\n", ioMan->workersIdle);
-    fflush(stderr);
-#endif
-    /* See AddIORequest() for comments regarding policy
-     * for augmenting the worker thread pool.
-     */
-    ioMan->queueSize++;
-    if ( ioMan->workersIdle < ioMan->queueSize ) {
-       ioMan->numWorkers++;
-       LeaveCriticalSection(&ioMan->manLock);
-       NewIOWorkerThread(ioMan);
-    } else {
-       LeaveCriticalSection(&ioMan->manLock);
-    }
-  
-  if (SubmitWork(ioMan->workQueue,wItem)) {
-      /* See AddIORequest() comment */
-      return reqID;
-  } else {
-      return 0;
-  }
+    return depositWorkItem(reqID, wItem);
 }
 
 /*
@@ -354,29 +360,7 @@ AddProcRequest ( void* proc,
     wItem->onCompletion = onCompletion;
     wItem->requestID    = reqID;
 
-    EnterCriticalSection(&ioMan->manLock);
-#if 0
-    fprintf(stderr, "AddProcRequest: %d\n", ioMan->workersIdle);
-    fflush(stderr);
-#endif
-    /* See AddIORequest() for comments regarding policy
-     * for augmenting the worker thread pool.
-     */
-    ioMan->queueSize++;
-    if ( ioMan->workersIdle < ioMan->queueSize ) {
-       ioMan->numWorkers++;
-       LeaveCriticalSection(&ioMan->manLock);
-       NewIOWorkerThread(ioMan);
-    } else {
-       LeaveCriticalSection(&ioMan->manLock);
-    }
-  
-    if (SubmitWork(ioMan->workQueue,wItem)) {
-       /* See AddIORequest() comment */
-       return reqID;
-    } else {
-       return 0;
-    }
+    return depositWorkItem(reqID, wItem);
 }
 
 void ShutdownIOManager()