*/
iom->workersIdle++;
LeaveCriticalSection(&iom->manLock);
-
+
+ /*
+ * A possible future refinement is to make long-term idle threads
+ * wake up and decide to shut down should the number of idle threads
+ * be above some threshold.
+ *
+ */
rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
EnterCriticalSection(&iom->manLock);
}
/*
- * Function: AddIORequest()
+ * Function: depositWorkItem()
+ *
+ * Local function which deposits a WorkItem onto a work queue,
+ * deciding in the process whether or not the thread pool needs
+ * to be augmented with another thread to handle the new request.
*
- * Conduit to underlying WorkQueue's SubmitWork(); adds IO
- * request to work queue, deciding whether or not to augment
- * the thread pool in the process.
*/
+static
int
-AddIORequest ( int fd,
- BOOL forWriting,
- BOOL isSocket,
- int len,
- char* buffer,
- CompletionProc onCompletion)
+depositWorkItem( unsigned int reqID,
+ WorkItem* wItem )
{
- WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
- unsigned int reqID = ioMan->requestID++;
- if (!ioMan || !wItem) return 0;
-
- /* Fill in the blanks */
- wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
- ( forWriting ? WORKER_WRITE : WORKER_READ );
- wItem->workData.ioData.fd = fd;
- wItem->workData.ioData.len = len;
- wItem->workData.ioData.buf = buffer;
-
- wItem->onCompletion = onCompletion;
- wItem->requestID = reqID;
-
EnterCriticalSection(&ioMan->manLock);
- /* If there are no worker threads available, create one.
- *
- * If this turns out to be too aggressive a policy, refine.
- */
+
#if 0
- fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle);
+ fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers);
fflush(stderr);
#endif
/* A new worker thread is created when there are fewer idle threads
* the request queue without expanding the thread pool to handle this
* sudden spike in queued requests.
* [How? Assume workersIdle is 1, and addIORequest() is called. No new
- * thread is created and the, returning without blocking.
- request is simply queued. If addIORequest()
+ * thread is created and the request is simply queued. If addIORequest()
* is called again _before the OS schedules a worker thread to pull the
* request off the queue_, workersIdle is still 1 and another request is
* simply added to the queue. Once the worker thread is run, only one
*
*/
ioMan->queueSize++;
- if ( ioMan->workersIdle < ioMan->queueSize ) {
- ioMan->numWorkers++;
+ if ( (ioMan->workersIdle < ioMan->queueSize) ) {
+ /* see if giving up our quantum ferrets out some idle threads.
+ */
LeaveCriticalSection(&ioMan->manLock);
- NewIOWorkerThread(ioMan);
+ Sleep(0);
+ EnterCriticalSection(&ioMan->manLock);
+ if ( (ioMan->workersIdle < ioMan->queueSize) ) {
+ /* No, go ahead and create another. */
+ ioMan->numWorkers++;
+ LeaveCriticalSection(&ioMan->manLock);
+ NewIOWorkerThread(ioMan);
+ } else {
+ LeaveCriticalSection(&ioMan->manLock);
+ }
} else {
LeaveCriticalSection(&ioMan->manLock);
}
-
+
if (SubmitWork(ioMan->workQueue,wItem)) {
/* Note: the work item has potentially been consumed by a worker thread
* (and freed) at this point, so we cannot use wItem's requestID.
} else {
return 0;
}
+}
+
+/*
+ * Function: AddIORequest()
+ *
+ * Conduit to underlying WorkQueue's SubmitWork(); adds IO
+ * request to work queue, deciding whether or not to augment
+ * the thread pool in the process.
+ */
+int
+AddIORequest ( int fd,
+ BOOL forWriting,
+ BOOL isSocket,
+ int len,
+ char* buffer,
+ CompletionProc onCompletion)
+{
+ WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+ unsigned int reqID = ioMan->requestID++;
+ if (!ioMan || !wItem) return 0;
+
+ /* Fill in the blanks */
+ wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
+ ( forWriting ? WORKER_WRITE : WORKER_READ );
+ wItem->workData.ioData.fd = fd;
+ wItem->workData.ioData.len = len;
+ wItem->workData.ioData.buf = buffer;
+
+ wItem->onCompletion = onCompletion;
+ wItem->requestID = reqID;
+
+ return depositWorkItem(reqID, wItem);
}
/*
wItem->onCompletion = onCompletion;
wItem->requestID = reqID;
- EnterCriticalSection(&ioMan->manLock);
-#if 0
- fprintf(stderr, "AddDelayRequest: %d\n", ioMan->workersIdle);
- fflush(stderr);
-#endif
- /* See AddIORequest() for comments regarding policy
- * for augmenting the worker thread pool.
- */
- ioMan->queueSize++;
- if ( ioMan->workersIdle < ioMan->queueSize ) {
- ioMan->numWorkers++;
- LeaveCriticalSection(&ioMan->manLock);
- NewIOWorkerThread(ioMan);
- } else {
- LeaveCriticalSection(&ioMan->manLock);
- }
-
- if (SubmitWork(ioMan->workQueue,wItem)) {
- /* See AddIORequest() comment */
- return reqID;
- } else {
- return 0;
- }
+ return depositWorkItem(reqID, wItem);
}
/*
wItem->onCompletion = onCompletion;
wItem->requestID = reqID;
- EnterCriticalSection(&ioMan->manLock);
-#if 0
- fprintf(stderr, "AddProcRequest: %d\n", ioMan->workersIdle);
- fflush(stderr);
-#endif
- /* See AddIORequest() for comments regarding policy
- * for augmenting the worker thread pool.
- */
- ioMan->queueSize++;
- if ( ioMan->workersIdle < ioMan->queueSize ) {
- ioMan->numWorkers++;
- LeaveCriticalSection(&ioMan->manLock);
- NewIOWorkerThread(ioMan);
- } else {
- LeaveCriticalSection(&ioMan->manLock);
- }
-
- if (SubmitWork(ioMan->workQueue,wItem)) {
- /* See AddIORequest() comment */
- return reqID;
- } else {
- return 0;
- }
+ return depositWorkItem(reqID, wItem);
}
void ShutdownIOManager()