[project @ 2003-07-03 15:14:56 by sof]
[ghc-hetmet.git] / ghc / rts / win32 / IOManager.c
index ce3ee99..42eba00 100644 (file)
@@ -40,8 +40,9 @@ IOWorkerProc(PVOID param)
   IOManagerState* iom = (IOManagerState*)param;
   WorkQueue* pq = iom->workQueue;
   WorkItem*  work;
-  int        len;
+  int        len = 0, fd = 0;
   DWORD      errCode;
+  void*      complData;
 
   hWaits[0] = (HANDLE)iom->hExitEvent;
   hWaits[1] = GetWorkQueueHandle(pq);
@@ -74,39 +75,66 @@ IOWorkerProc(PVOID param)
       if (FetchWork(pq,(void**)&work)) {
         if ( work->workKind & WORKER_READ ) {
          if ( work->workKind & WORKER_FOR_SOCKET ) {
-           len = recv(work->fd, work->buf, work->len, 0);
+           len = recv(work->workData.ioData.fd, 
+                      work->workData.ioData.buf,
+                      work->workData.ioData.len,
+                      0);
            if (len == SOCKET_ERROR) {
              errCode = WSAGetLastError();
            }
          } else {
-           len = read(work->fd, work->buf, work->len);
+           len = read(work->workData.ioData.fd,
+                      work->workData.ioData.buf,
+                      work->workData.ioData.len);
            if (len == -1) { errCode = errno; }
          }
+         complData = work->workData.ioData.buf;
+         fd = work->workData.ioData.fd;
        } else if ( work->workKind & WORKER_WRITE ) {
          if ( work->workKind & WORKER_FOR_SOCKET ) {
-           len = send(work->fd, work->buf, work->len, 0);
+           len = send(work->workData.ioData.fd,
+                      work->workData.ioData.buf,
+                      work->workData.ioData.len,
+                      0);
            if (len == SOCKET_ERROR) {
              errCode = WSAGetLastError();
            }
          } else {
-           len = write(work->fd,work->buf, work->len);
+           len = write(work->workData.ioData.fd,
+                       work->workData.ioData.buf,
+                       work->workData.ioData.len);
            if (len == -1) { errCode = errno; }
          }
+         complData = work->workData.ioData.buf;
+         fd = work->workData.ioData.fd;
        } else if ( work->workKind & WORKER_DELAY ) {
          /* very approximate implementation of threadDelay */
-         Sleep(work->len);
-         len = work->len;
+         Sleep(work->workData.delayData.msecs);
+         len = work->workData.delayData.msecs;
+         complData = NULL;
+         fd = 0;
          errCode = 0;
+       } else if ( work->workKind & WORKER_DO_PROC ) {
+           /* perform operation/proc on behalf of Haskell thread. */
+           if (work->workData.procData.proc) {
+               /* The procedure is assumed to encode result + success/failure
+                * via its param.
+                */
+               work->workData.procData.proc(work->workData.procData.param);
+               errCode=0;
+           } else {
+               errCode=1;
+           }
+           complData = work->workData.procData.param;
        } else {
          fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
          fflush(stderr);
          continue;
        }
        work->onCompletion(work->requestID,
-                          work->param,
-                          work->fd,
+                          fd,
                           len,
-                          work->buf,
+                          complData,
                           errCode);
        /* Free the WorkItem */
        free(work);
@@ -181,21 +209,20 @@ AddIORequest ( int   fd,
               BOOL  isSocket,
               int   len,
               char* buffer,
-              void* data,
               CompletionProc onCompletion)
 {
   WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
   if (!ioMan || !wItem) return 0;
   
   /* Fill in the blanks */
-  wItem->fd           = fd;
   wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
                         ( forWriting ? WORKER_WRITE : WORKER_READ );
-  wItem->len          = len;
-  wItem->buf          = buffer;
-  wItem->param        = data;
-  wItem->onCompletion = onCompletion;
-  wItem->requestID    = ioMan->requestID++;
+  wItem->workData.ioData.fd  = fd;
+  wItem->workData.ioData.len = len;
+  wItem->workData.ioData.buf = buffer;
+
+  wItem->onCompletion        = onCompletion;
+  wItem->requestID           = ioMan->requestID++;
   
   EnterCriticalSection(&ioMan->manLock);
   /* If there are no worker threads available, create one.
@@ -224,18 +251,47 @@ AddIORequest ( int   fd,
  */
 BOOL
 AddDelayRequest ( unsigned int   msecs,
-                 void*          data,
                  CompletionProc onCompletion)
 {
   WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
   if (!ioMan || !wItem) return FALSE;
   
   /* Fill in the blanks */
-  wItem->fd           = 0;
   wItem->workKind     = WORKER_DELAY;
-  wItem->len          = msecs;
-  wItem->buf          = 0;
-  wItem->param        = data;
+  wItem->workData.delayData.msecs = msecs;
+  wItem->onCompletion = onCompletion;
+  wItem->requestID    = ioMan->requestID++;
+
+  EnterCriticalSection(&ioMan->manLock);
+  if ( ioMan->workersIdle == 0 ) {
+    ioMan->numWorkers++;
+    NewIOWorkerThread(ioMan);
+  }
+  LeaveCriticalSection(&ioMan->manLock);
+  
+  if (SubmitWork(ioMan->workQueue,wItem)) {
+    return wItem->requestID;
+  } else {
+    return 0;
+  }
+}
+
+/*
+ * Function: AddDelayRequest()
+ *
+ */
+BOOL
+AddProcRequest ( void* proc,
+                void* param,
+                CompletionProc onCompletion)
+{
+  WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+  if (!ioMan || !wItem) return FALSE;
+  
+  /* Fill in the blanks */
+  wItem->workKind     = WORKER_DO_PROC;
+  wItem->workData.procData.proc  = proc;
+  wItem->workData.procData.param = param;
   wItem->onCompletion = onCompletion;
   wItem->requestID    = ioMan->requestID++;