IOManagerState* iom = (IOManagerState*)param;
WorkQueue* pq = iom->workQueue;
WorkItem* work;
- int len;
+ int len = 0, fd = 0;
DWORD errCode;
+ void* complData;
hWaits[0] = (HANDLE)iom->hExitEvent;
hWaits[1] = GetWorkQueueHandle(pq);
if (FetchWork(pq,(void**)&work)) {
if ( work->workKind & WORKER_READ ) {
if ( work->workKind & WORKER_FOR_SOCKET ) {
- len = recv(work->fd, work->buf, work->len, 0);
+ len = recv(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len,
+ 0);
if (len == SOCKET_ERROR) {
errCode = WSAGetLastError();
}
} else {
- len = read(work->fd, work->buf, work->len);
+ len = read(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len);
if (len == -1) { errCode = errno; }
}
+ complData = work->workData.ioData.buf;
+ fd = work->workData.ioData.fd;
} else if ( work->workKind & WORKER_WRITE ) {
if ( work->workKind & WORKER_FOR_SOCKET ) {
- len = send(work->fd, work->buf, work->len, 0);
+ len = send(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len,
+ 0);
if (len == SOCKET_ERROR) {
errCode = WSAGetLastError();
}
} else {
- len = write(work->fd,work->buf, work->len);
+ len = write(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len);
if (len == -1) { errCode = errno; }
}
+ complData = work->workData.ioData.buf;
+ fd = work->workData.ioData.fd;
} else if ( work->workKind & WORKER_DELAY ) {
/* very approximate implementation of threadDelay */
- Sleep(work->len);
- len = work->len;
+ Sleep(work->workData.delayData.msecs);
+ len = work->workData.delayData.msecs;
+ complData = NULL;
+ fd = 0;
errCode = 0;
+ } else if ( work->workKind & WORKER_DO_PROC ) {
+ /* perform operation/proc on behalf of Haskell thread. */
+ if (work->workData.procData.proc) {
+ /* The procedure is assumed to encode result + success/failure
+ * via its param.
+ */
+ work->workData.procData.proc(work->workData.procData.param);
+ errCode=0;
+ } else {
+ errCode=1;
+ }
+ complData = work->workData.procData.param;
} else {
fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
fflush(stderr);
continue;
}
work->onCompletion(work->requestID,
- work->param,
- work->fd,
+ fd,
len,
- work->buf,
+ complData,
errCode);
/* Free the WorkItem */
free(work);
BOOL isSocket,
int len,
char* buffer,
- void* data,
CompletionProc onCompletion)
{
WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
if (!ioMan || !wItem) return 0;
/* Fill in the blanks */
- wItem->fd = fd;
wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
( forWriting ? WORKER_WRITE : WORKER_READ );
- wItem->len = len;
- wItem->buf = buffer;
- wItem->param = data;
- wItem->onCompletion = onCompletion;
- wItem->requestID = ioMan->requestID++;
+ wItem->workData.ioData.fd = fd;
+ wItem->workData.ioData.len = len;
+ wItem->workData.ioData.buf = buffer;
+
+ wItem->onCompletion = onCompletion;
+ wItem->requestID = ioMan->requestID++;
EnterCriticalSection(&ioMan->manLock);
/* If there are no worker threads available, create one.
*/
BOOL
AddDelayRequest ( unsigned int msecs,
- void* data,
CompletionProc onCompletion)
{
WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
if (!ioMan || !wItem) return FALSE;
/* Fill in the blanks */
- wItem->fd = 0;
wItem->workKind = WORKER_DELAY;
- wItem->len = msecs;
- wItem->buf = 0;
- wItem->param = data;
+ wItem->workData.delayData.msecs = msecs;
+ wItem->onCompletion = onCompletion;
+ wItem->requestID = ioMan->requestID++;
+
+ EnterCriticalSection(&ioMan->manLock);
+ if ( ioMan->workersIdle == 0 ) {
+ ioMan->numWorkers++;
+ NewIOWorkerThread(ioMan);
+ }
+ LeaveCriticalSection(&ioMan->manLock);
+
+ if (SubmitWork(ioMan->workQueue,wItem)) {
+ return wItem->requestID;
+ } else {
+ return 0;
+ }
+}
+
+/*
+ * Function: AddDelayRequest()
+ *
+ */
+BOOL
+AddProcRequest ( void* proc,
+ void* param,
+ CompletionProc onCompletion)
+{
+ WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+ if (!ioMan || !wItem) return FALSE;
+
+ /* Fill in the blanks */
+ wItem->workKind = WORKER_DO_PROC;
+ wItem->workData.procData.proc = proc;
+ wItem->workData.procData.param = param;
wItem->onCompletion = onCompletion;
wItem->requestID = ioMan->requestID++;