* Internal state maintained by the IO manager.
*/
typedef struct IOManagerState {
- CritSection manLock;
- WorkQueue* workQueue;
- int numWorkers;
- int workersIdle;
- HANDLE hExitEvent;
- unsigned int requestID;
+ CritSection manLock;
+ WorkQueue* workQueue;
+ int queueSize;
+ int numWorkers;
+ int workersIdle;
+ HANDLE hExitEvent;
+ unsigned int requestID;
} IOManagerState;
/* ToDo: wrap up this state via a IOManager handle instead? */
WINAPI
IOWorkerProc(PVOID param)
{
- HANDLE hWaits[2];
- DWORD rc;
- IOManagerState* iom = (IOManagerState*)param;
- WorkQueue* pq = iom->workQueue;
- WorkItem* work;
- int len;
- DWORD errCode;
+ HANDLE hWaits[2];
+ DWORD rc;
+ IOManagerState* iom = (IOManagerState*)param;
+ WorkQueue* pq = iom->workQueue;
+ WorkItem* work;
+ int len = 0, fd = 0;
+ DWORD errCode;
+ void* complData;
- hWaits[0] = (HANDLE)iom->hExitEvent;
- hWaits[1] = GetWorkQueueHandle(pq);
+ hWaits[0] = (HANDLE)iom->hExitEvent;
+ hWaits[1] = GetWorkQueueHandle(pq);
- while (1) {
- /* The error code is communicated back on completion of request; reset. */
- errCode = 0;
+ while (1) {
+ /* The error code is communicated back on completion of request; reset. */
+ errCode = 0;
+
+ EnterCriticalSection(&iom->manLock);
+ /* Signal that the worker is idle.
+ *
+ * 'workersIdle' is used when determining whether or not to
+ * increase the worker thread pool when adding a new request.
+ * (see addIORequest().)
+ */
+ iom->workersIdle++;
+ LeaveCriticalSection(&iom->manLock);
- EnterCriticalSection(&iom->manLock);
- iom->workersIdle++;
- LeaveCriticalSection(&iom->manLock);
+ rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
- rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
-
- EnterCriticalSection(&iom->manLock);
- iom->workersIdle--;
- LeaveCriticalSection(&iom->manLock);
+ EnterCriticalSection(&iom->manLock);
+ /* Signal that the thread is 'non-idle' and about to consume
+ * a work item.
+ */
+ iom->workersIdle--;
+ iom->queueSize--;
+ LeaveCriticalSection(&iom->manLock);
- if ( WAIT_OBJECT_0 == rc ) {
- /* shutdown */
-#if 0
- fprintf(stderr, "shutting down...\n"); fflush(stderr);
-#endif
- return 0;
- } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
- /* work item available, fetch it. */
-#if 0
- fprintf(stderr, "work available...\n"); fflush(stderr);
-#endif
- if (FetchWork(pq,(void**)&work)) {
- if ( work->workKind & WORKER_READ ) {
- if ( work->workKind & WORKER_FOR_SOCKET ) {
- len = recv(work->fd, work->buf, work->len, 0);
- if (len == SOCKET_ERROR) {
- errCode = WSAGetLastError();
- }
- } else {
- len = read(work->fd, work->buf, work->len);
- if (len == -1) { errCode = errno; }
- }
- } else if ( work->workKind & WORKER_WRITE ) {
- if ( work->workKind & WORKER_FOR_SOCKET ) {
- len = send(work->fd, work->buf, work->len, 0);
- if (len == SOCKET_ERROR) {
- errCode = WSAGetLastError();
+ if ( WAIT_OBJECT_0 == rc ) {
+ /* shutdown */
+ return 0;
+ } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
+ /* work item available, fetch it. */
+ if (FetchWork(pq,(void**)&work)) {
+ if ( work->workKind & WORKER_READ ) {
+ if ( work->workKind & WORKER_FOR_SOCKET ) {
+ len = recv(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len,
+ 0);
+ if (len == SOCKET_ERROR) {
+ errCode = WSAGetLastError();
+ }
+ } else {
+ len = read(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len);
+ if (len == -1) { errCode = errno; }
+ }
+ complData = work->workData.ioData.buf;
+ fd = work->workData.ioData.fd;
+ } else if ( work->workKind & WORKER_WRITE ) {
+ if ( work->workKind & WORKER_FOR_SOCKET ) {
+ len = send(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len,
+ 0);
+ if (len == SOCKET_ERROR) {
+ errCode = WSAGetLastError();
+ }
+ } else {
+ len = write(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len);
+ if (len == -1) { errCode = errno; }
+ }
+ complData = work->workData.ioData.buf;
+ fd = work->workData.ioData.fd;
+ } else if ( work->workKind & WORKER_DELAY ) {
+ /* Approximate implementation of threadDelay;
+ *
+ * Note: Sleep() is in milliseconds, not micros.
+ */
+ Sleep(work->workData.delayData.msecs / 1000);
+ len = work->workData.delayData.msecs;
+ complData = NULL;
+ fd = 0;
+ errCode = 0;
+ } else if ( work->workKind & WORKER_DO_PROC ) {
+ /* perform operation/proc on behalf of Haskell thread. */
+ if (work->workData.procData.proc) {
+ /* The procedure is assumed to encode result + success/failure
+ * via its param.
+ */
+ errCode=work->workData.procData.proc(work->workData.procData.param);
+ } else {
+ errCode=1;
+ }
+ complData = work->workData.procData.param;
+ } else {
+ fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
+ fflush(stderr);
+ continue;
+ }
+ work->onCompletion(work->requestID,
+ fd,
+ len,
+ complData,
+ errCode);
+ /* Free the WorkItem */
+ free(work);
+ } else {
+ fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
+ return 1;
}
- } else {
- len = write(work->fd,work->buf, work->len);
- if (len == -1) { errCode = errno; }
- }
- } else if ( work->workKind & WORKER_DELAY ) {
- /* very approximate implementation of threadDelay */
- Sleep(work->len);
- len = work->len;
- errCode = 0;
} else {
- fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
- fflush(stderr);
- continue;
+ fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
+ return 1;
}
- work->onCompletion(work->requestID,
- work->param,
- work->fd,
- len,
- work->buf,
- errCode);
- /* Free the WorkItem */
- free(work);
- } else {
- fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
- return 1;
- }
- } else {
- fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
- return 1;
}
- }
- return 0;
+ return 0;
}
static
BOOL
NewIOWorkerThread(IOManagerState* iom)
{
- return ( 0 != _beginthreadex(NULL,
- 0,
- IOWorkerProc,
- (LPVOID)iom,
- 0,
- NULL) );
+ unsigned threadId;
+ return ( 0 != _beginthreadex(NULL,
+ 0,
+ IOWorkerProc,
+ (LPVOID)iom,
+ 0,
+ &threadId) );
}
BOOL
StartIOManager(void)
{
- HANDLE hExit;
- WorkQueue* wq;
+ HANDLE hExit;
+ WorkQueue* wq;
- wq = NewWorkQueue();
- if ( !wq ) return FALSE;
+ wq = NewWorkQueue();
+ if ( !wq ) return FALSE;
- ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
+ ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
- if (!ioMan) {
- FreeWorkQueue(wq);
- return FALSE;
- }
+ if (!ioMan) {
+ FreeWorkQueue(wq);
+ return FALSE;
+ }
- /* A manual-reset event */
- hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
- if ( !hExit ) {
- FreeWorkQueue(wq);
- free(ioMan);
- return FALSE;
- }
+ /* A manual-reset event */
+ hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
+ if ( !hExit ) {
+ FreeWorkQueue(wq);
+ free(ioMan);
+ return FALSE;
+ }
- ioMan->hExitEvent = hExit;
- InitializeCriticalSection(&ioMan->manLock);
- ioMan->workQueue = wq;
- ioMan->numWorkers = 0;
- ioMan->workersIdle = 0;
- ioMan->requestID = 1;
+ ioMan->hExitEvent = hExit;
+ InitializeCriticalSection(&ioMan->manLock);
+ ioMan->workQueue = wq;
+ ioMan->numWorkers = 0;
+ ioMan->workersIdle = 0;
+ ioMan->queueSize = 0;
+ ioMan->requestID = 1;
- return TRUE;
+ return TRUE;
}
/*
* Function: AddIORequest()
*
* Conduit to underlying WorkQueue's SubmitWork(); adds IO
- * request to work queue, returning without blocking.
+ * request to work queue, deciding whether or not to augment
+ * the thread pool in the process.
*/
int
AddIORequest ( int fd,
BOOL isSocket,
int len,
char* buffer,
- void* data,
CompletionProc onCompletion)
{
- WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
- if (!ioMan || !wItem) return 0;
+ WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+ unsigned int reqID = ioMan->requestID++;
+ if (!ioMan || !wItem) return 0;
- /* Fill in the blanks */
- wItem->fd = fd;
- wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
- ( forWriting ? WORKER_WRITE : WORKER_READ );
- wItem->len = len;
- wItem->buf = buffer;
- wItem->param = data;
- wItem->onCompletion = onCompletion;
- wItem->requestID = ioMan->requestID++;
+ /* Fill in the blanks */
+ wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
+ ( forWriting ? WORKER_WRITE : WORKER_READ );
+ wItem->workData.ioData.fd = fd;
+ wItem->workData.ioData.len = len;
+ wItem->workData.ioData.buf = buffer;
+
+ wItem->onCompletion = onCompletion;
+ wItem->requestID = reqID;
- EnterCriticalSection(&ioMan->manLock);
- /* If there are no worker threads available, create one.
- *
- * If this turns out to be too aggressive a policy, refine.
- */
+ EnterCriticalSection(&ioMan->manLock);
+ /* If there are no worker threads available, create one.
+ *
+ * If this turns out to be too aggressive a policy, refine.
+ */
#if 0
- fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); fflush(stderr);
+ fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle);
+ fflush(stderr);
#endif
- if ( ioMan->workersIdle == 0 ) {
- ioMan->numWorkers++;
- NewIOWorkerThread(ioMan);
- }
- LeaveCriticalSection(&ioMan->manLock);
-
- if (SubmitWork(ioMan->workQueue,wItem)) {
- return wItem->requestID;
- } else {
- return 0;
- }
-}
+ /* A new worker thread is created when there are fewer idle threads
+ * than non-consumed queue requests. This ensures that requests will
+ * be dealt with in a timely manner.
+ *
+ * [Long explanation of why the previous thread pool policy lead to
+ * trouble]
+ *
+ * Previously, the thread pool was augmented iff no idle worker threads
+ * were available. That strategy runs the risk of repeatedly adding to
+ * the request queue without expanding the thread pool to handle this
+ * sudden spike in queued requests.
+ * [How? Assume workersIdle is 1, and addIORequest() is called. No new
+ * thread is created and the, returning without blocking.
+ request is simply queued. If addIORequest()
+ * is called again _before the OS schedules a worker thread to pull the
+ * request off the queue_, workersIdle is still 1 and another request is
+ * simply added to the queue. Once the worker thread is run, only one
+ * request is de-queued, leaving the 2nd request in the queue]
+ *
+ * Assuming none of the queued requests take an inordinate amount of to
+ * complete, the request queue would eventually be drained. But if that's
+ * not the case, the later requests will end up languishing in the queue
+ * indefinitely. The non-timely handling of requests may cause CH applications
+ * to misbehave / hang; bad.
+ *
+ */
+ ioMan->queueSize++;
+ if ( ioMan->workersIdle < ioMan->queueSize ) {
+ ioMan->numWorkers++;
+ LeaveCriticalSection(&ioMan->manLock);
+ NewIOWorkerThread(ioMan);
+ } else {
+ LeaveCriticalSection(&ioMan->manLock);
+ }
+
+ if (SubmitWork(ioMan->workQueue,wItem)) {
+ /* Note: the work item has potentially been consumed by a worker thread
+ * (and freed) at this point, so we cannot use wItem's requestID.
+ */
+ return reqID;
+ } else {
+ return 0;
+ }
+}
/*
* Function: AddDelayRequest()
*
+ * Like AddIORequest(), but this time adding a delay request to
+ * the request queue.
*/
BOOL
AddDelayRequest ( unsigned int msecs,
- void* data,
CompletionProc onCompletion)
{
- WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
- if (!ioMan || !wItem) return FALSE;
+ WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+ unsigned int reqID = ioMan->requestID++;
+ if (!ioMan || !wItem) return FALSE;
- /* Fill in the blanks */
- wItem->fd = 0;
- wItem->workKind = WORKER_DELAY;
- wItem->len = msecs;
- wItem->buf = 0;
- wItem->param = data;
- wItem->onCompletion = onCompletion;
- wItem->requestID = ioMan->requestID++;
+ /* Fill in the blanks */
+ wItem->workKind = WORKER_DELAY;
+ wItem->workData.delayData.msecs = msecs;
+ wItem->onCompletion = onCompletion;
+ wItem->requestID = reqID;
- EnterCriticalSection(&ioMan->manLock);
- if ( ioMan->workersIdle == 0 ) {
- ioMan->numWorkers++;
- NewIOWorkerThread(ioMan);
- }
- LeaveCriticalSection(&ioMan->manLock);
+ EnterCriticalSection(&ioMan->manLock);
+#if 0
+ fprintf(stderr, "AddDelayRequest: %d\n", ioMan->workersIdle);
+ fflush(stderr);
+#endif
+ /* See AddIORequest() for comments regarding policy
+ * for augmenting the worker thread pool.
+ */
+ ioMan->queueSize++;
+ if ( ioMan->workersIdle < ioMan->queueSize ) {
+ ioMan->numWorkers++;
+ LeaveCriticalSection(&ioMan->manLock);
+ NewIOWorkerThread(ioMan);
+ } else {
+ LeaveCriticalSection(&ioMan->manLock);
+ }
if (SubmitWork(ioMan->workQueue,wItem)) {
- return wItem->requestID;
+ /* See AddIORequest() comment */
+ return reqID;
} else {
- return 0;
+ return 0;
}
}
+/*
+ * Function: AddProcRequest()
+ *
+ * Add an asynchronous procedure request.
+ */
+BOOL
+AddProcRequest ( void* proc,
+ void* param,
+ CompletionProc onCompletion)
+{
+ WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+ unsigned int reqID = ioMan->requestID++;
+ if (!ioMan || !wItem) return FALSE;
+
+ /* Fill in the blanks */
+ wItem->workKind = WORKER_DO_PROC;
+ wItem->workData.procData.proc = proc;
+ wItem->workData.procData.param = param;
+ wItem->onCompletion = onCompletion;
+ wItem->requestID = reqID;
+
+ EnterCriticalSection(&ioMan->manLock);
+#if 0
+ fprintf(stderr, "AddProcRequest: %d\n", ioMan->workersIdle);
+ fflush(stderr);
+#endif
+ /* See AddIORequest() for comments regarding policy
+ * for augmenting the worker thread pool.
+ */
+ ioMan->queueSize++;
+ if ( ioMan->workersIdle < ioMan->queueSize ) {
+ ioMan->numWorkers++;
+ LeaveCriticalSection(&ioMan->manLock);
+ NewIOWorkerThread(ioMan);
+ } else {
+ LeaveCriticalSection(&ioMan->manLock);
+ }
+
+ if (SubmitWork(ioMan->workQueue,wItem)) {
+ /* See AddIORequest() comment */
+ return reqID;
+ } else {
+ return 0;
+ }
+}
+
void ShutdownIOManager()
{
SetEvent(ioMan->hExitEvent);