[project @ 2003-07-03 15:14:56 by sof]
authorsof <unknown>
Thu, 3 Jul 2003 15:14:59 +0000 (15:14 +0000)
committersof <unknown>
Thu, 3 Jul 2003 15:14:59 +0000 (15:14 +0000)
New primop (mingw only),

  asyncDoProc# :: Addr# -> Addr# -> State# RealWorld-> (# State# RealWorld, Int#, Int# #)

which lets a Haskell thread hand off a pointer to external code (1st arg) for
asynchronous execution by the RTS worker thread pool. Second arg is data passed
in to the asynchronous routine. The routine is _not_ permitted to re-enter
the RTS as part of its execution.

ghc/compiler/prelude/primops.txt.pp
ghc/includes/PrimOps.h
ghc/includes/TSO.h
ghc/rts/PrimOps.hc
ghc/rts/Sanity.c
ghc/rts/Schedule.c
ghc/rts/win32/AsyncIO.c
ghc/rts/win32/AsyncIO.h
ghc/rts/win32/IOManager.c
ghc/rts/win32/IOManager.h

index 37c6c6f..f5fd8a7 100644 (file)
@@ -1,5 +1,5 @@
 -----------------------------------------------------------------------
--- $Id: primops.txt.pp,v 1.27 2003/06/19 10:42:26 simonmar Exp $
+-- $Id: primops.txt.pp,v 1.28 2003/07/03 15:14:56 sof Exp $
 --
 -- Primitive Operations
 --
@@ -1440,6 +1440,15 @@ primop  AsyncWriteOp "asyncWrite#" GenPrimOp
    needs_wrapper    = True
    has_side_effects = True
    out_of_line      = True
+
+primop  AsyncDoProcOp "asyncDoProc#" GenPrimOp
+   Addr# -> Addr# -> State# RealWorld-> (# State# RealWorld, Int#, Int# #)
+   {Asynchronously perform procedure (first arg), passing it 2nd arg.}
+   with
+   needs_wrapper    = True
+   has_side_effects = True
+   out_of_line      = True
+
 #endif
 
 ------------------------------------------------------------------------
index ecc82bc..cf67e61 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: PrimOps.h,v 1.102 2003/06/19 10:42:24 simonmar Exp $
+ * $Id: PrimOps.h,v 1.103 2003/07/03 15:14:57 sof Exp $
  *
  * (c) The GHC Team, 1998-2000
  *
@@ -244,6 +244,7 @@ EXTFUN_RTS(delayzh_fast);
 #ifdef mingw32_TARGET_OS
 EXTFUN_RTS(asyncReadzh_fast);
 EXTFUN_RTS(asyncWritezh_fast);
+EXTFUN_RTS(asyncDoProczh_fast);
 #endif
 
 
index c99a7cd..7c6e1c0 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: TSO.h,v 1.30 2003/02/21 05:34:15 sof Exp $
+ * $Id: TSO.h,v 1.31 2003/07/03 15:14:58 sof Exp $
  *
  * (c) The GHC Team, 1998-1999
  *
@@ -139,6 +139,9 @@ typedef enum {
   BlockedOnRead,
   BlockedOnWrite,
   BlockedOnDelay
+#if defined(mingw32_TARGET_OS)
+  , BlockedOnDoProc
+#endif
 #if defined(PAR)
   , BlockedOnGA  // blocked on a remote closure represented by a Global Address
   , BlockedOnGA_NoSend // same as above but without sending a Fetch message
@@ -150,7 +153,7 @@ typedef enum {
 #endif
 } StgTSOBlockReason;
 
-#ifdef mingw32_TARGET_OS
+#if defined(mingw32_TARGET_OS)
 /* results from an async I/O request + it's ID. */
 typedef struct {
   unsigned int reqID;
@@ -163,7 +166,7 @@ typedef union {
   StgClosure *closure;
   struct StgTSO_ *tso;
   int fd;
-#ifdef mingw32_TARGET_OS
+#if defined(mingw32_TARGET_OS)
   StgAsyncIOResult* async_result;
 #endif
   unsigned int target;
index 168d968..ea57f05 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: PrimOps.hc,v 1.107 2003/04/15 14:37:12 simonmar Exp $
+ * $Id: PrimOps.hc,v 1.108 2003/07/03 15:14:58 sof Exp $
  *
  * (c) The GHC Team, 1998-2002
  *
@@ -1676,7 +1676,7 @@ FN_(asyncReadzh_fast)
     CurrentTSO->why_blocked = BlockedOnRead;
     ACQUIRE_LOCK(&sched_mutex);
     /* could probably allocate this on the heap instead */
-    ares = (StgAsyncIOResult*)RET_STGCALL2(P_,stgMallocBytes,sizeof(StgAsyncIOResult), "asyncWritezh_fast");
+    ares = (StgAsyncIOResult*)RET_STGCALL2(P_,stgMallocBytes,sizeof(StgAsyncIOResult), "asyncReadzh_fast");
     reqID = RET_STGCALL5(W_,addIORequest,R1.i,FALSE,R2.i,R3.i,(char*)R4.p);
     ares->reqID   = reqID;
     ares->len     = 0;
@@ -1709,4 +1709,26 @@ FN_(asyncWritezh_fast)
     JMP_(stg_block_async);
   FE_
 }
+
+FN_(asyncDoProczh_fast)
+{
+  StgAsyncIOResult* ares;
+  unsigned int reqID;
+  FB_
+    /* args: R1.i = proc, R2.i = param */
+    ASSERT(CurrentTSO->why_blocked == NotBlocked);
+    CurrentTSO->why_blocked = BlockedOnDoProc;
+    ACQUIRE_LOCK(&sched_mutex);
+    /* could probably allocate this on the heap instead */
+    ares = (StgAsyncIOResult*)RET_STGCALL2(P_,stgMallocBytes,sizeof(StgAsyncIOResult), "asyncDoProczh_fast");
+    reqID = RET_STGCALL2(W_,addDoProcRequest,R1.p,R2.p);
+    ares->reqID   = reqID;
+    ares->len     = 0;
+    ares->errCode = 0;
+    CurrentTSO->block_info.async_result = ares;
+    APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
+    RELEASE_LOCK(&sched_mutex);
+    JMP_(stg_block_async);
+  FE_
+}
 #endif
index 383ef64..a71f862 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Sanity.c,v 1.33 2003/04/22 16:25:12 simonmar Exp $
+ * $Id: Sanity.c,v 1.34 2003/07/03 15:14:58 sof Exp $
  *
  * (c) The GHC Team, 1998-2001
  *
@@ -610,6 +610,9 @@ checkTSO(StgTSO *tso)
     case BlockedOnRead:
     case BlockedOnWrite:
     case BlockedOnDelay:
+#if defined(mingw32_TARGET_OS)
+    case BlockedOnDoProc:
+#endif
       /* isOnBQ(blocked_queue) */
       break;
     case BlockedOnException:
index 1afc9fe..69af752 100644 (file)
@@ -1,5 +1,5 @@
 /* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.170 2003/06/19 10:35:37 simonmar Exp $
+ * $Id: Schedule.c,v 1.171 2003/07/03 15:14:58 sof Exp $
  *
  * (c) The GHC Team, 1998-2000
  *
@@ -3103,6 +3103,9 @@ unblockThread(StgTSO *tso)
 
   case BlockedOnRead:
   case BlockedOnWrite:
+#if defined(mingw32_TARGET_OS)
+  case BlockedOnDoProc:
+#endif
     {
       /* take TSO off blocked_queue */
       StgBlockingQueueElement *prev = NULL;
@@ -3230,6 +3233,9 @@ unblockThread(StgTSO *tso)
 
   case BlockedOnRead:
   case BlockedOnWrite:
+#if defined(mingw32_TARGET_OS)
+  case BlockedOnDoProc:
+#endif
     {
       StgTSO *prev = NULL;
       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
@@ -3623,6 +3629,11 @@ printThreadBlockage(StgTSO *tso)
   case BlockedOnWrite:
     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
     break;
+#if defined(mingw32_TARGET_OS)
+    case BlockedOnDoProc:
+    fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
+    break;
+#endif
   case BlockedOnDelay:
     fprintf(stderr,"is blocked until %d", tso->block_info.target);
     break;
index b823308..7efaf14 100644 (file)
@@ -51,10 +51,9 @@ static int              issued_reqs;
 
 static void
 onIOComplete(unsigned int reqID,
-            void* param STG_UNUSED,
             int   fd STG_UNUSED,
             int   len,
-            char* buf STG_UNUSED,
+            void* buf STG_UNUSED,
             int   errCode)
 {
   /* Deposit result of request in queue/table */
@@ -96,21 +95,34 @@ addIORequest(int   fd,
 #if 0
   fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
 #endif
-  return AddIORequest(fd,forWriting,isSock,len,buf,0,onIOComplete);
+  return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
 }
 
 unsigned int
-addDelayRequest(int   msecs)
+addDelayRequest(int msecs)
 {
   EnterCriticalSection(&queue_lock);
   issued_reqs++;
   LeaveCriticalSection(&queue_lock);
 #if 0
-  fprintf(stderr, "addDelayReq: %d %d %d\n", msecs); fflush(stderr);
+  fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
 #endif
-  return AddDelayRequest(msecs,0,onIOComplete);
+  return AddDelayRequest(msecs,onIOComplete);
 }
 
+unsigned int
+addDoProcRequest(void* proc, void* param)
+{
+  EnterCriticalSection(&queue_lock);
+  issued_reqs++;
+  LeaveCriticalSection(&queue_lock);
+#if 0
+  fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
+#endif
+  return AddProcRequest(proc,param,onIOComplete);
+}
+
+
 int
 startupAsyncIO()
 {
@@ -186,6 +198,7 @@ start:
        case BlockedOnDelay:
        case BlockedOnRead:
        case BlockedOnWrite:
+       case BlockedOnDoProc:
          if (tso->block_info.async_result->reqID == rID) {
            /* Found the thread blocked waiting on request; stodgily fill 
             * in its result block. 
index d30d55d..00581c5 100644 (file)
@@ -13,6 +13,7 @@ addIORequest(int   fd,
             int   len,
             char* buf);
 extern unsigned int addDelayRequest(int   msecs);
+extern unsigned int addDoProcRequest(void* proc, void* param);
 extern int  startupAsyncIO(void);
 extern void shutdownAsyncIO(void);
 
index ce3ee99..42eba00 100644 (file)
@@ -40,8 +40,9 @@ IOWorkerProc(PVOID param)
   IOManagerState* iom = (IOManagerState*)param;
   WorkQueue* pq = iom->workQueue;
   WorkItem*  work;
-  int        len;
+  int        len = 0, fd = 0;
   DWORD      errCode;
+  void*      complData;
 
   hWaits[0] = (HANDLE)iom->hExitEvent;
   hWaits[1] = GetWorkQueueHandle(pq);
@@ -74,39 +75,66 @@ IOWorkerProc(PVOID param)
       if (FetchWork(pq,(void**)&work)) {
         if ( work->workKind & WORKER_READ ) {
          if ( work->workKind & WORKER_FOR_SOCKET ) {
-           len = recv(work->fd, work->buf, work->len, 0);
+           len = recv(work->workData.ioData.fd, 
+                      work->workData.ioData.buf,
+                      work->workData.ioData.len,
+                      0);
            if (len == SOCKET_ERROR) {
              errCode = WSAGetLastError();
            }
          } else {
-           len = read(work->fd, work->buf, work->len);
+           len = read(work->workData.ioData.fd,
+                      work->workData.ioData.buf,
+                      work->workData.ioData.len);
            if (len == -1) { errCode = errno; }
          }
+         complData = work->workData.ioData.buf;
+         fd = work->workData.ioData.fd;
        } else if ( work->workKind & WORKER_WRITE ) {
          if ( work->workKind & WORKER_FOR_SOCKET ) {
-           len = send(work->fd, work->buf, work->len, 0);
+           len = send(work->workData.ioData.fd,
+                      work->workData.ioData.buf,
+                      work->workData.ioData.len,
+                      0);
            if (len == SOCKET_ERROR) {
              errCode = WSAGetLastError();
            }
          } else {
-           len = write(work->fd,work->buf, work->len);
+           len = write(work->workData.ioData.fd,
+                       work->workData.ioData.buf,
+                       work->workData.ioData.len);
            if (len == -1) { errCode = errno; }
          }
+         complData = work->workData.ioData.buf;
+         fd = work->workData.ioData.fd;
        } else if ( work->workKind & WORKER_DELAY ) {
          /* very approximate implementation of threadDelay */
-         Sleep(work->len);
-         len = work->len;
+         Sleep(work->workData.delayData.msecs);
+         len = work->workData.delayData.msecs;
+         complData = NULL;
+         fd = 0;
          errCode = 0;
+       } else if ( work->workKind & WORKER_DO_PROC ) {
+           /* perform operation/proc on behalf of Haskell thread. */
+           if (work->workData.procData.proc) {
+               /* The procedure is assumed to encode result + success/failure
+                * via its param.
+                */
+               work->workData.procData.proc(work->workData.procData.param);
+               errCode=0;
+           } else {
+               errCode=1;
+           }
+           complData = work->workData.procData.param;
        } else {
          fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
          fflush(stderr);
          continue;
        }
        work->onCompletion(work->requestID,
-                          work->param,
-                          work->fd,
+                          fd,
                           len,
-                          work->buf,
+                          complData,
                           errCode);
        /* Free the WorkItem */
        free(work);
@@ -181,21 +209,20 @@ AddIORequest ( int   fd,
               BOOL  isSocket,
               int   len,
               char* buffer,
-              void* data,
               CompletionProc onCompletion)
 {
   WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
   if (!ioMan || !wItem) return 0;
   
   /* Fill in the blanks */
-  wItem->fd           = fd;
   wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
                         ( forWriting ? WORKER_WRITE : WORKER_READ );
-  wItem->len          = len;
-  wItem->buf          = buffer;
-  wItem->param        = data;
-  wItem->onCompletion = onCompletion;
-  wItem->requestID    = ioMan->requestID++;
+  wItem->workData.ioData.fd  = fd;
+  wItem->workData.ioData.len = len;
+  wItem->workData.ioData.buf = buffer;
+
+  wItem->onCompletion        = onCompletion;
+  wItem->requestID           = ioMan->requestID++;
   
   EnterCriticalSection(&ioMan->manLock);
   /* If there are no worker threads available, create one.
@@ -224,18 +251,47 @@ AddIORequest ( int   fd,
  */
 BOOL
 AddDelayRequest ( unsigned int   msecs,
-                 void*          data,
                  CompletionProc onCompletion)
 {
   WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
   if (!ioMan || !wItem) return FALSE;
   
   /* Fill in the blanks */
-  wItem->fd           = 0;
   wItem->workKind     = WORKER_DELAY;
-  wItem->len          = msecs;
-  wItem->buf          = 0;
-  wItem->param        = data;
+  wItem->workData.delayData.msecs = msecs;
+  wItem->onCompletion = onCompletion;
+  wItem->requestID    = ioMan->requestID++;
+
+  EnterCriticalSection(&ioMan->manLock);
+  if ( ioMan->workersIdle == 0 ) {
+    ioMan->numWorkers++;
+    NewIOWorkerThread(ioMan);
+  }
+  LeaveCriticalSection(&ioMan->manLock);
+  
+  if (SubmitWork(ioMan->workQueue,wItem)) {
+    return wItem->requestID;
+  } else {
+    return 0;
+  }
+}
+
+/*
+ * Function: AddDelayRequest()
+ *
+ */
+BOOL
+AddProcRequest ( void* proc,
+                void* param,
+                CompletionProc onCompletion)
+{
+  WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+  if (!ioMan || !wItem) return FALSE;
+  
+  /* Fill in the blanks */
+  wItem->workKind     = WORKER_DO_PROC;
+  wItem->workData.procData.proc  = proc;
+  wItem->workData.procData.param = param;
   wItem->onCompletion = onCompletion;
   wItem->requestID    = ioMan->requestID++;
 
index 3543a41..cbdda44 100644 (file)
@@ -32,18 +32,31 @@ extern void* GetFiberData ( void );
  *
  */
 typedef void (*CompletionProc)(unsigned int requestID,
-                              void* param,
                               int   fd,
                               int   len,
-                              char* buf,
+                              void* buf,
                               int   errCode);
 
+typedef void (*DoProcProc)(void *param);
+
+typedef union workData {
+    struct {
+       int   fd;
+       int   len;
+       char *buf; 
+    } ioData;
+    struct { 
+       int   msecs;
+    } delayData;
+    struct { 
+       DoProcProc proc;
+       void* param;
+    } procData;
+} WorkData;
+
 typedef struct WorkItem {
   unsigned int   workKind;
-  int            fd;
-  int            len;
-  char*          buf;
-  void*          param;
+  WorkData       workData;
   unsigned int   requestID;
   CompletionProc onCompletion;
 } WorkItem;
@@ -54,10 +67,11 @@ extern CompletionProc onComplete;
  * that instead of passing a tag describing the work to be performed,
  * a function pointer is passed instead. Maybe later.
  */
-#define WORKER_READ       1
-#define WORKER_WRITE      2
-#define WORKER_DELAY      4
-#define WORKER_FOR_SOCKET 8
+#define WORKER_READ        1
+#define WORKER_WRITE       2
+#define WORKER_DELAY       4
+#define WORKER_FOR_SOCKET  8
+#define WORKER_DO_PROC    16
 
 /*
  * Starting up and shutting down. 
@@ -71,7 +85,6 @@ extern void ShutdownIOManager  ( void );
  * will invoke upon completion.
  */
 extern int AddDelayRequest ( unsigned int   msecs,
-                            void*          data,
                             CompletionProc onCompletion);
 
 extern int AddIORequest ( int            fd,
@@ -79,7 +92,10 @@ extern int AddIORequest ( int            fd,
                          BOOL           isSocket,
                          int            len,
                          char*          buffer,
-                         void*          data,
                          CompletionProc onCompletion);
 
+extern int AddProcRequest ( void*          proc,
+                           void*          data,
+                           CompletionProc onCompletion);
+
 #endif /* __IOMANAGER_H__ */