From ce42f19f8c840fbe89844471a0d850d310a94556 Mon Sep 17 00:00:00 2001 From: sof Date: Mon, 15 Sep 2003 20:39:38 +0000 Subject: [PATCH] [project @ 2003-09-15 20:39:38 by sof] factor out code that handles depositing of work items on the thread pool's request queue. - when it looks as if a new worker thread needs to be created, give up our quantum first in the hope that this might at the last minute turn up more idle worker threads. - add comment re: trimming pool size. Merged to STABLE eventually; I may continue tinkering with this code some more over the next day or two. --- ghc/rts/win32/IOManager.c | 148 ++++++++++++++++++++------------------------- 1 file changed, 66 insertions(+), 82 deletions(-) diff --git a/ghc/rts/win32/IOManager.c b/ghc/rts/win32/IOManager.c index 4aec35f..b8b7942 100644 --- a/ghc/rts/win32/IOManager.c +++ b/ghc/rts/win32/IOManager.c @@ -61,7 +61,13 @@ IOWorkerProc(PVOID param) */ iom->workersIdle++; LeaveCriticalSection(&iom->manLock); - + + /* + * A possible future refinement is to make long-term idle threads + * wake up and decide to shut down should the number of idle threads + * be above some threshold. + * + */ rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE ); EnterCriticalSection(&iom->manLock); @@ -206,41 +212,22 @@ StartIOManager(void) } /* - * Function: AddIORequest() + * Function: depositWorkItem() + * + * Local function which deposits a WorkItem onto a work queue, + * deciding in the process whether or not the thread pool needs + * to be augmented with another thread to handle the new request. * - * Conduit to underlying WorkQueue's SubmitWork(); adds IO - * request to work queue, deciding whether or not to augment - * the thread pool in the process. */ +static int -AddIORequest ( int fd, - BOOL forWriting, - BOOL isSocket, - int len, - char* buffer, - CompletionProc onCompletion) +depositWorkItem( unsigned int reqID, + WorkItem* wItem ) { - WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem)); - unsigned int reqID = ioMan->requestID++; - if (!ioMan || !wItem) return 0; - - /* Fill in the blanks */ - wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) | - ( forWriting ? WORKER_WRITE : WORKER_READ ); - wItem->workData.ioData.fd = fd; - wItem->workData.ioData.len = len; - wItem->workData.ioData.buf = buffer; - - wItem->onCompletion = onCompletion; - wItem->requestID = reqID; - EnterCriticalSection(&ioMan->manLock); - /* If there are no worker threads available, create one. - * - * If this turns out to be too aggressive a policy, refine. - */ + #if 0 - fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); + fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers); fflush(stderr); #endif /* A new worker thread is created when there are fewer idle threads @@ -255,8 +242,7 @@ AddIORequest ( int fd, * the request queue without expanding the thread pool to handle this * sudden spike in queued requests. * [How? Assume workersIdle is 1, and addIORequest() is called. No new - * thread is created and the, returning without blocking. - request is simply queued. If addIORequest() + * thread is created and the request is simply queued. If addIORequest() * is called again _before the OS schedules a worker thread to pull the * request off the queue_, workersIdle is still 1 and another request is * simply added to the queue. Once the worker thread is run, only one @@ -270,14 +256,24 @@ AddIORequest ( int fd, * */ ioMan->queueSize++; - if ( ioMan->workersIdle < ioMan->queueSize ) { - ioMan->numWorkers++; + if ( (ioMan->workersIdle < ioMan->queueSize) ) { + /* see if giving up our quantum ferrets out some idle threads. + */ LeaveCriticalSection(&ioMan->manLock); - NewIOWorkerThread(ioMan); + Sleep(0); + EnterCriticalSection(&ioMan->manLock); + if ( (ioMan->workersIdle < ioMan->queueSize) ) { + /* No, go ahead and create another. */ + ioMan->numWorkers++; + LeaveCriticalSection(&ioMan->manLock); + NewIOWorkerThread(ioMan); + } else { + LeaveCriticalSection(&ioMan->manLock); + } } else { LeaveCriticalSection(&ioMan->manLock); } - + if (SubmitWork(ioMan->workQueue,wItem)) { /* Note: the work item has potentially been consumed by a worker thread * (and freed) at this point, so we cannot use wItem's requestID. @@ -286,6 +282,38 @@ AddIORequest ( int fd, } else { return 0; } +} + +/* + * Function: AddIORequest() + * + * Conduit to underlying WorkQueue's SubmitWork(); adds IO + * request to work queue, deciding whether or not to augment + * the thread pool in the process. + */ +int +AddIORequest ( int fd, + BOOL forWriting, + BOOL isSocket, + int len, + char* buffer, + CompletionProc onCompletion) +{ + WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem)); + unsigned int reqID = ioMan->requestID++; + if (!ioMan || !wItem) return 0; + + /* Fill in the blanks */ + wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) | + ( forWriting ? WORKER_WRITE : WORKER_READ ); + wItem->workData.ioData.fd = fd; + wItem->workData.ioData.len = len; + wItem->workData.ioData.buf = buffer; + + wItem->onCompletion = onCompletion; + wItem->requestID = reqID; + + return depositWorkItem(reqID, wItem); } /* @@ -308,29 +336,7 @@ AddDelayRequest ( unsigned int msecs, wItem->onCompletion = onCompletion; wItem->requestID = reqID; - EnterCriticalSection(&ioMan->manLock); -#if 0 - fprintf(stderr, "AddDelayRequest: %d\n", ioMan->workersIdle); - fflush(stderr); -#endif - /* See AddIORequest() for comments regarding policy - * for augmenting the worker thread pool. - */ - ioMan->queueSize++; - if ( ioMan->workersIdle < ioMan->queueSize ) { - ioMan->numWorkers++; - LeaveCriticalSection(&ioMan->manLock); - NewIOWorkerThread(ioMan); - } else { - LeaveCriticalSection(&ioMan->manLock); - } - - if (SubmitWork(ioMan->workQueue,wItem)) { - /* See AddIORequest() comment */ - return reqID; - } else { - return 0; - } + return depositWorkItem(reqID, wItem); } /* @@ -354,29 +360,7 @@ AddProcRequest ( void* proc, wItem->onCompletion = onCompletion; wItem->requestID = reqID; - EnterCriticalSection(&ioMan->manLock); -#if 0 - fprintf(stderr, "AddProcRequest: %d\n", ioMan->workersIdle); - fflush(stderr); -#endif - /* See AddIORequest() for comments regarding policy - * for augmenting the worker thread pool. - */ - ioMan->queueSize++; - if ( ioMan->workersIdle < ioMan->queueSize ) { - ioMan->numWorkers++; - LeaveCriticalSection(&ioMan->manLock); - NewIOWorkerThread(ioMan); - } else { - LeaveCriticalSection(&ioMan->manLock); - } - - if (SubmitWork(ioMan->workQueue,wItem)) { - /* See AddIORequest() comment */ - return reqID; - } else { - return 0; - } + return depositWorkItem(reqID, wItem); } void ShutdownIOManager() -- 1.7.10.4