[project @ 2003-09-12 16:26:05 by sof]
author sof <unknown>
Fri, 12 Sep 2003 16:26:05 +0000 (16:26 +0000)
committer sof <unknown>
Fri, 12 Sep 2003 16:26:05 +0000 (16:26 +0000)
- Sleep()'s resolution is millisecs, not microsecs.
- adopt a more aggressive policy for augmenting the thread pool
  to handle incoming requests (see code comments for details.)

  The previous policy ran the risk of starvation in rare (and hard
  to reproduce) cases, as spotted after having chased a bug
  for two days.

Merge to STABLE

ghc/rts/win32/IOManager.c

index 91e4d0d..4aec35f 100644 (file)
  * Internal state maintained by the IO manager.
  */
 typedef struct IOManagerState {
-  CritSection      manLock;
-  WorkQueue*       workQueue;
-  int              numWorkers;
-  int              workersIdle;
-  HANDLE           hExitEvent;
-  unsigned int     requestID;
+    CritSection      manLock;
+    WorkQueue*       workQueue;
+    int              queueSize;
+    int              numWorkers;
+    int              workersIdle;
+    HANDLE           hExitEvent;
+    unsigned int     requestID;
 } IOManagerState;
 
 /* ToDo: wrap up this state via a IOManager handle instead? */
@@ -35,172 +36,181 @@ unsigned
 WINAPI
 IOWorkerProc(PVOID param)
 {
-  HANDLE  hWaits[2];
-  DWORD   rc;
-  IOManagerState* iom = (IOManagerState*)param;
-  WorkQueue* pq = iom->workQueue;
-  WorkItem*  work;
-  int        len = 0, fd = 0;
-  DWORD      errCode;
-  void*      complData;
+    HANDLE  hWaits[2];
+    DWORD   rc;
+    IOManagerState* iom = (IOManagerState*)param;
+    WorkQueue* pq = iom->workQueue;
+    WorkItem*  work;
+    int        len = 0, fd = 0;
+    DWORD      errCode;
+    void*      complData;
 
-  hWaits[0] = (HANDLE)iom->hExitEvent;
-  hWaits[1] = GetWorkQueueHandle(pq);
+    hWaits[0] = (HANDLE)iom->hExitEvent;
+    hWaits[1] = GetWorkQueueHandle(pq);
   
-  while (1) {
-    /* The error code is communicated back on completion of request; reset. */
-    errCode = 0;
+    while (1) {
+       /* The error code is communicated back on completion of request; reset. */
+       errCode = 0;
+       
+       EnterCriticalSection(&iom->manLock);
+       /* Signal that the worker is idle.
+        *
+        * 'workersIdle' is used when determining whether or not to
+        * increase the worker thread pool when adding a new request.
+        * (see addIORequest().)
+        */
+       iom->workersIdle++;
+       LeaveCriticalSection(&iom->manLock);
 
-    EnterCriticalSection(&iom->manLock);
-    iom->workersIdle++;
-    LeaveCriticalSection(&iom->manLock);
+       rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
 
-    rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
-
-    EnterCriticalSection(&iom->manLock);
-    iom->workersIdle--;
-    LeaveCriticalSection(&iom->manLock);
+       EnterCriticalSection(&iom->manLock);
+       /* Signal that the thread is 'non-idle' and about to consume 
+        * a work item.
+        */
+       iom->workersIdle--;
+       iom->queueSize--;
+       LeaveCriticalSection(&iom->manLock);
     
-    if ( WAIT_OBJECT_0 == rc ) {
-      /* shutdown */
-#if 0
-      fprintf(stderr, "shutting down...\n"); fflush(stderr);
-#endif
-      return 0;
-    } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
-      /* work item available, fetch it. */
-#if 0
-      fprintf(stderr, "work available...\n"); fflush(stderr);
-#endif
-      if (FetchWork(pq,(void**)&work)) {
-        if ( work->workKind & WORKER_READ ) {
-         if ( work->workKind & WORKER_FOR_SOCKET ) {
-           len = recv(work->workData.ioData.fd, 
-                      work->workData.ioData.buf,
-                      work->workData.ioData.len,
-                      0);
-           if (len == SOCKET_ERROR) {
-             errCode = WSAGetLastError();
-           }
-         } else {
-           len = read(work->workData.ioData.fd,
-                      work->workData.ioData.buf,
-                      work->workData.ioData.len);
-           if (len == -1) { errCode = errno; }
-         }
-         complData = work->workData.ioData.buf;
-         fd = work->workData.ioData.fd;
-       } else if ( work->workKind & WORKER_WRITE ) {
-         if ( work->workKind & WORKER_FOR_SOCKET ) {
-           len = send(work->workData.ioData.fd,
-                      work->workData.ioData.buf,
-                      work->workData.ioData.len,
-                      0);
-           if (len == SOCKET_ERROR) {
-             errCode = WSAGetLastError();
-           }
-         } else {
-           len = write(work->workData.ioData.fd,
-                       work->workData.ioData.buf,
-                       work->workData.ioData.len);
-           if (len == -1) { errCode = errno; }
-         }
-         complData = work->workData.ioData.buf;
-         fd = work->workData.ioData.fd;
-       } else if ( work->workKind & WORKER_DELAY ) {
-         /* very approximate implementation of threadDelay */
-         Sleep(work->workData.delayData.msecs);
-         len = work->workData.delayData.msecs;
-         complData = NULL;
-         fd = 0;
-         errCode = 0;
-       } else if ( work->workKind & WORKER_DO_PROC ) {
-           /* perform operation/proc on behalf of Haskell thread. */
-           if (work->workData.procData.proc) {
-               /* The procedure is assumed to encode result + success/failure
-                * via its param.
-                */
-               errCode=work->workData.procData.proc(work->workData.procData.param);
+       if ( WAIT_OBJECT_0 == rc ) {
+           /* shutdown */
+           return 0;
+       } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
+           /* work item available, fetch it. */
+           if (FetchWork(pq,(void**)&work)) {
+               if ( work->workKind & WORKER_READ ) {
+                   if ( work->workKind & WORKER_FOR_SOCKET ) {
+                       len = recv(work->workData.ioData.fd, 
+                                  work->workData.ioData.buf,
+                                  work->workData.ioData.len,
+                                  0);
+                       if (len == SOCKET_ERROR) {
+                           errCode = WSAGetLastError();
+                       }
+                   } else {
+                       len = read(work->workData.ioData.fd,
+                                  work->workData.ioData.buf,
+                                  work->workData.ioData.len);
+                       if (len == -1) { errCode = errno; }
+                   }
+                   complData = work->workData.ioData.buf;
+                   fd = work->workData.ioData.fd;
+               } else if ( work->workKind & WORKER_WRITE ) {
+                   if ( work->workKind & WORKER_FOR_SOCKET ) {
+                       len = send(work->workData.ioData.fd,
+                                  work->workData.ioData.buf,
+                                  work->workData.ioData.len,
+                                  0);
+                       if (len == SOCKET_ERROR) {
+                           errCode = WSAGetLastError();
+                       }
+                   } else {
+                       len = write(work->workData.ioData.fd,
+                                   work->workData.ioData.buf,
+                                   work->workData.ioData.len);
+                       if (len == -1) { errCode = errno; }
+                   }
+                   complData = work->workData.ioData.buf;
+                   fd = work->workData.ioData.fd;
+               } else if ( work->workKind & WORKER_DELAY ) {
+                   /* Approximate implementation of threadDelay;
+                    * 
+                    * Note: Sleep() is in milliseconds, not micros.
+                    */
+                   Sleep(work->workData.delayData.msecs / 1000);
+                   len = work->workData.delayData.msecs;
+                   complData = NULL;
+                   fd = 0;
+                   errCode = 0;
+               } else if ( work->workKind & WORKER_DO_PROC ) {
+                   /* perform operation/proc on behalf of Haskell thread. */
+                   if (work->workData.procData.proc) {
+                       /* The procedure is assumed to encode result + success/failure
+                        * via its param.
+                        */
+                       errCode=work->workData.procData.proc(work->workData.procData.param);
+                   } else {
+                       errCode=1;
+                   }
+                   complData = work->workData.procData.param;
+               } else {
+                   fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
+                   fflush(stderr);
+                   continue;
+               }
+               work->onCompletion(work->requestID,
+                                  fd,
+                                  len,
+                                  complData,
+                                  errCode);
+               /* Free the WorkItem */
+               free(work);
            } else {
-               errCode=1;
+               fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
+               return 1;
            }
-           complData = work->workData.procData.param;
        } else {
-         fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
-         fflush(stderr);
-         continue;
+           fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
+           return 1;
        }
-       work->onCompletion(work->requestID,
-                          fd,
-                          len,
-                          complData,
-                          errCode);
-       /* Free the WorkItem */
-       free(work);
-      } else {
-         fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
-         return 1;
-      }
-    } else {
-         fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
-         return 1;
     }
-  }
-  return 0;
+    return 0;
 }
 
 static 
 BOOL
 NewIOWorkerThread(IOManagerState* iom)
 {
-  unsigned threadId;
-  return ( 0 != _beginthreadex(NULL,
-                              0,
-                              IOWorkerProc,
-                              (LPVOID)iom,
-                              0,
-                              &threadId) );
+    unsigned threadId;
+    return ( 0 != _beginthreadex(NULL,
+                                0,
+                                IOWorkerProc,
+                                (LPVOID)iom,
+                                0,
+                                &threadId) );
 }
 
 BOOL
 StartIOManager(void)
 {
-  HANDLE hExit;
-  WorkQueue* wq;
+    HANDLE hExit;
+    WorkQueue* wq;
 
-  wq = NewWorkQueue();
-  if ( !wq ) return FALSE;  
+    wq = NewWorkQueue();
+    if ( !wq ) return FALSE;  
   
-  ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
+    ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
   
-  if (!ioMan) {
-    FreeWorkQueue(wq);
-    return FALSE;
-  }
+    if (!ioMan) {
+       FreeWorkQueue(wq);
+       return FALSE;
+    }
 
-  /* A manual-reset event */
-  hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
-  if ( !hExit ) {
-    FreeWorkQueue(wq);
-    free(ioMan);
-    return FALSE;
-  }
+    /* A manual-reset event */
+    hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
+    if ( !hExit ) {
+       FreeWorkQueue(wq);
+       free(ioMan);
+       return FALSE;
+    }
   
-  ioMan->hExitEvent = hExit;
-  InitializeCriticalSection(&ioMan->manLock);
-  ioMan->workQueue   = wq;
-  ioMan->numWorkers  = 0;
-  ioMan->workersIdle = 0;
-  ioMan->requestID   = 1;
+    ioMan->hExitEvent = hExit;
+    InitializeCriticalSection(&ioMan->manLock);
+    ioMan->workQueue   = wq;
+    ioMan->numWorkers  = 0;
+    ioMan->workersIdle = 0;
+    ioMan->queueSize   = 0;
+    ioMan->requestID   = 1;
  
-  return TRUE;
+    return TRUE;
 }
 
 /*
  * Function: AddIORequest()
  *
  * Conduit to underlying WorkQueue's SubmitWork(); adds IO
- * request to work queue, returning without blocking.
+ * request to work queue, deciding whether or not to augment
+ * the thread pool in the process. 
  */
 int
 AddIORequest ( int   fd,
@@ -210,75 +220,110 @@ AddIORequest ( int   fd,
               char* buffer,
               CompletionProc onCompletion)
 {
-  WorkItem* wItem    = (WorkItem*)malloc(sizeof(WorkItem));
-  unsigned int reqID = ioMan->requestID++;
-  if (!ioMan || !wItem) return 0;
+    WorkItem* wItem    = (WorkItem*)malloc(sizeof(WorkItem));
+    unsigned int reqID = ioMan->requestID++;
+    if (!ioMan || !wItem) return 0;
   
-  /* Fill in the blanks */
-  wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
-                        ( forWriting ? WORKER_WRITE : WORKER_READ );
-  wItem->workData.ioData.fd  = fd;
-  wItem->workData.ioData.len = len;
-  wItem->workData.ioData.buf = buffer;
+    /* Fill in the blanks */
+    wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
+                         ( forWriting ? WORKER_WRITE : WORKER_READ );
+    wItem->workData.ioData.fd  = fd;
+    wItem->workData.ioData.len = len;
+    wItem->workData.ioData.buf = buffer;
 
-  wItem->onCompletion        = onCompletion;
-  wItem->requestID           = reqID;
+    wItem->onCompletion        = onCompletion;
+    wItem->requestID           = reqID;
   
-  EnterCriticalSection(&ioMan->manLock);
-  /* If there are no worker threads available, create one.
-   *
-   * If this turns out to be too aggressive a policy, refine.
-   */
+    EnterCriticalSection(&ioMan->manLock);
+    /* If there are no worker threads available, create one.
+     *
+     * If this turns out to be too aggressive a policy, refine.
+     */
 #if 0
-  fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); fflush(stderr);
+    fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); 
+    fflush(stderr);
 #endif
-  if ( ioMan->workersIdle == 0 ) {
-    ioMan->numWorkers++;
-    LeaveCriticalSection(&ioMan->manLock);
-    NewIOWorkerThread(ioMan);
-  } else {
-      LeaveCriticalSection(&ioMan->manLock);
-  }
-  
-  if (SubmitWork(ioMan->workQueue,wItem)) {
-      /* Note: the work item has potentially been consumed by a worker thread
-       *       (and freed) at this point, so we cannot use wItem's requestID.
-       */
-      return reqID;
-  } else {
-      return 0;
-  }
-}             
+    /* A new worker thread is created when there are fewer idle threads
+     * than non-consumed queue requests. This ensures that requests will
+     * be dealt with in a timely manner.
+     *
+     * [Long explanation of why the previous thread pool policy led to 
+     * trouble]
+     *
+     * Previously, the thread pool was augmented iff no idle worker threads
+     * were available. That strategy runs the risk of repeatedly adding to
+     * the request queue without expanding the thread pool to handle this
+     * sudden spike in queued requests. 
+     * [How? Assume workersIdle is 1, and addIORequest() is called. No new 
+     * thread is created and the request is simply queued. If addIORequest()
+     * is called again _before the OS schedules a worker thread to pull the
+     * request off the queue_, workersIdle is still 1 and another request is 
+     * simply added to the queue. Once the worker thread is run, only one
+     * request is de-queued, leaving the 2nd request in the queue]
+     * 
+     * Assuming none of the queued requests take an inordinate amount of time to 
+     * complete, the request queue would eventually be drained. But if that's 
+     * not the case, the later requests will end up languishing in the queue 
+     * indefinitely. The non-timely handling of requests may cause CH applications
+     * to misbehave / hang; bad.
+     *
+     */
+    ioMan->queueSize++;
+    if ( ioMan->workersIdle < ioMan->queueSize ) {
+       ioMan->numWorkers++;
+       LeaveCriticalSection(&ioMan->manLock);
+       NewIOWorkerThread(ioMan);
+    } else {
+       LeaveCriticalSection(&ioMan->manLock);
+    }
+    
+    if (SubmitWork(ioMan->workQueue,wItem)) {
+       /* Note: the work item has potentially been consumed by a worker thread
+        *       (and freed) at this point, so we cannot use wItem's requestID.
+        */
+       return reqID;
+    } else {
+       return 0;
+    }
+}       
 
 /*
  * Function: AddDelayRequest()
  *
+ * Like AddIORequest(), but this time adding a delay request to
+ * the request queue.
  */
 BOOL
 AddDelayRequest ( unsigned int   msecs,
                  CompletionProc onCompletion)
 {
-  WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
-  unsigned int reqID = ioMan->requestID++;
-  if (!ioMan || !wItem) return FALSE;
+    WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+    unsigned int reqID = ioMan->requestID++;
+    if (!ioMan || !wItem) return FALSE;
   
-  /* Fill in the blanks */
-  wItem->workKind     = WORKER_DELAY;
-  wItem->workData.delayData.msecs = msecs;
-  wItem->onCompletion = onCompletion;
-  wItem->requestID    = reqID;
+    /* Fill in the blanks */
+    wItem->workKind     = WORKER_DELAY;
+    wItem->workData.delayData.msecs = msecs;
+    wItem->onCompletion = onCompletion;
+    wItem->requestID    = reqID;
 
-  EnterCriticalSection(&ioMan->manLock);
+    EnterCriticalSection(&ioMan->manLock);
 #if 0
-  fprintf(stderr, "AddDelayRequest: %d\n", ioMan->workersIdle); fflush(stderr);
+    fprintf(stderr, "AddDelayRequest: %d\n", ioMan->workersIdle);
+    fflush(stderr);
 #endif
-  if ( ioMan->workersIdle == 0 ) {
-    ioMan->numWorkers++;
-    LeaveCriticalSection(&ioMan->manLock);
-    NewIOWorkerThread(ioMan);
-  } else {
-      LeaveCriticalSection(&ioMan->manLock);
-  }
+    /* See AddIORequest() for comments regarding policy
+     * for augmenting the worker thread pool.
+     */
+    ioMan->queueSize++;
+    if ( ioMan->workersIdle < ioMan->queueSize ) {
+       ioMan->numWorkers++;
+       LeaveCriticalSection(&ioMan->manLock);
+       NewIOWorkerThread(ioMan);
+    } else {
+       LeaveCriticalSection(&ioMan->manLock);
+    }
   
   if (SubmitWork(ioMan->workQueue,wItem)) {
       /* See AddIORequest() comment */
@@ -289,43 +334,49 @@ AddDelayRequest ( unsigned int   msecs,
 }
 
 /*
- * Function: AddDelayRequest()
+ * Function: AddProcRequest()
  *
+ * Add an asynchronous procedure request.
  */
 BOOL
 AddProcRequest ( void* proc,
                 void* param,
                 CompletionProc onCompletion)
 {
-  WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
-  unsigned int reqID = ioMan->requestID++;
-  if (!ioMan || !wItem) return FALSE;
+    WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+    unsigned int reqID = ioMan->requestID++;
+    if (!ioMan || !wItem) return FALSE;
   
-  /* Fill in the blanks */
-  wItem->workKind     = WORKER_DO_PROC;
-  wItem->workData.procData.proc  = proc;
-  wItem->workData.procData.param = param;
-  wItem->onCompletion = onCompletion;
-  wItem->requestID    = reqID;
+    /* Fill in the blanks */
+    wItem->workKind     = WORKER_DO_PROC;
+    wItem->workData.procData.proc  = proc;
+    wItem->workData.procData.param = param;
+    wItem->onCompletion = onCompletion;
+    wItem->requestID    = reqID;
 
-  EnterCriticalSection(&ioMan->manLock);
+    EnterCriticalSection(&ioMan->manLock);
 #if 0
-  fprintf(stderr, "AddProcRequest: %d\n", ioMan->workersIdle); fflush(stderr);
+    fprintf(stderr, "AddProcRequest: %d\n", ioMan->workersIdle);
+    fflush(stderr);
 #endif
-  if ( ioMan->workersIdle == 0 ) {
-    ioMan->numWorkers++;
-    LeaveCriticalSection(&ioMan->manLock);
-    NewIOWorkerThread(ioMan);
-  } else {
-      LeaveCriticalSection(&ioMan->manLock);
-  }
+    /* See AddIORequest() for comments regarding policy
+     * for augmenting the worker thread pool.
+     */
+    ioMan->queueSize++;
+    if ( ioMan->workersIdle < ioMan->queueSize ) {
+       ioMan->numWorkers++;
+       LeaveCriticalSection(&ioMan->manLock);
+       NewIOWorkerThread(ioMan);
+    } else {
+       LeaveCriticalSection(&ioMan->manLock);
+    }
   
-  if (SubmitWork(ioMan->workQueue,wItem)) {
-      /* See AddIORequest() comment */
-      return reqID;
-  } else {
-      return 0;
-  }
+    if (SubmitWork(ioMan->workQueue,wItem)) {
+       /* See AddIORequest() comment */
+       return reqID;
+    } else {
+       return 0;
+    }
 }
 
 void ShutdownIOManager()