From 18340925716fb6e68672c69bd263ad9041f81822 Mon Sep 17 00:00:00 2001 From: sof Date: Thu, 3 Jul 2003 15:14:59 +0000 Subject: [PATCH] [project @ 2003-07-03 15:14:56 by sof] New primop (mingw only), asyncDoProc# :: Addr# -> Addr# -> State# RealWorld-> (# State# RealWorld, Int#, Int# #) which lets a Haskell thread hand off a pointer to external code (1st arg) for asynchronous execution by the RTS worker thread pool. Second arg is data passed in to the asynchronous routine. The routine is _not_ permitted to re-enter the RTS as part of its execution. --- ghc/compiler/prelude/primops.txt.pp | 11 +++- ghc/includes/PrimOps.h | 3 +- ghc/includes/TSO.h | 9 ++-- ghc/rts/PrimOps.hc | 26 ++++++++- ghc/rts/Sanity.c | 5 +- ghc/rts/Schedule.c | 13 ++++- ghc/rts/win32/AsyncIO.c | 25 ++++++--- ghc/rts/win32/AsyncIO.h | 1 + ghc/rts/win32/IOManager.c | 100 +++++++++++++++++++++++++++-------- ghc/rts/win32/IOManager.h | 40 +++++++++----- 10 files changed, 184 insertions(+), 49 deletions(-) diff --git a/ghc/compiler/prelude/primops.txt.pp b/ghc/compiler/prelude/primops.txt.pp index 37c6c6f..f5fd8a7 100644 --- a/ghc/compiler/prelude/primops.txt.pp +++ b/ghc/compiler/prelude/primops.txt.pp @@ -1,5 +1,5 @@ ----------------------------------------------------------------------- --- $Id: primops.txt.pp,v 1.27 2003/06/19 10:42:26 simonmar Exp $ +-- $Id: primops.txt.pp,v 1.28 2003/07/03 15:14:56 sof Exp $ -- -- Primitive Operations -- @@ -1440,6 +1440,15 @@ primop AsyncWriteOp "asyncWrite#" GenPrimOp needs_wrapper = True has_side_effects = True out_of_line = True + +primop AsyncDoProcOp "asyncDoProc#" GenPrimOp + Addr# -> Addr# -> State# RealWorld-> (# State# RealWorld, Int#, Int# #) + {Asynchronously perform procedure (first arg), passing it 2nd arg.} + with + needs_wrapper = True + has_side_effects = True + out_of_line = True + #endif ------------------------------------------------------------------------ diff --git a/ghc/includes/PrimOps.h b/ghc/includes/PrimOps.h index ecc82bc..cf67e61 100644 --- a/ghc/includes/PrimOps.h +++ b/ghc/includes/PrimOps.h @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: PrimOps.h,v 1.102 2003/06/19 10:42:24 simonmar Exp $ + * $Id: PrimOps.h,v 1.103 2003/07/03 15:14:57 sof Exp $ * * (c) The GHC Team, 1998-2000 * @@ -244,6 +244,7 @@ EXTFUN_RTS(delayzh_fast); #ifdef mingw32_TARGET_OS EXTFUN_RTS(asyncReadzh_fast); EXTFUN_RTS(asyncWritezh_fast); +EXTFUN_RTS(asyncDoProczh_fast); #endif diff --git a/ghc/includes/TSO.h b/ghc/includes/TSO.h index c99a7cd..7c6e1c0 100644 --- a/ghc/includes/TSO.h +++ b/ghc/includes/TSO.h @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: TSO.h,v 1.30 2003/02/21 05:34:15 sof Exp $ + * $Id: TSO.h,v 1.31 2003/07/03 15:14:58 sof Exp $ * * (c) The GHC Team, 1998-1999 * @@ -139,6 +139,9 @@ typedef enum { BlockedOnRead, BlockedOnWrite, BlockedOnDelay +#if defined(mingw32_TARGET_OS) + , BlockedOnDoProc +#endif #if defined(PAR) , BlockedOnGA // blocked on a remote closure represented by a Global Address , BlockedOnGA_NoSend // same as above but without sending a Fetch message @@ -150,7 +153,7 @@ typedef enum { #endif } StgTSOBlockReason; -#ifdef mingw32_TARGET_OS +#if defined(mingw32_TARGET_OS) /* results from an async I/O request + it's ID. */ typedef struct { unsigned int reqID; @@ -163,7 +166,7 @@ typedef union { StgClosure *closure; struct StgTSO_ *tso; int fd; -#ifdef mingw32_TARGET_OS +#if defined(mingw32_TARGET_OS) StgAsyncIOResult* async_result; #endif unsigned int target; diff --git a/ghc/rts/PrimOps.hc b/ghc/rts/PrimOps.hc index 168d968..ea57f05 100644 --- a/ghc/rts/PrimOps.hc +++ b/ghc/rts/PrimOps.hc @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: PrimOps.hc,v 1.107 2003/04/15 14:37:12 simonmar Exp $ + * $Id: PrimOps.hc,v 1.108 2003/07/03 15:14:58 sof Exp $ * * (c) The GHC Team, 1998-2002 * @@ -1676,7 +1676,7 @@ FN_(asyncReadzh_fast) CurrentTSO->why_blocked = BlockedOnRead; ACQUIRE_LOCK(&sched_mutex); /* could probably allocate this on the heap instead */ - ares = (StgAsyncIOResult*)RET_STGCALL2(P_,stgMallocBytes,sizeof(StgAsyncIOResult), "asyncWritezh_fast"); + ares = (StgAsyncIOResult*)RET_STGCALL2(P_,stgMallocBytes,sizeof(StgAsyncIOResult), "asyncReadzh_fast"); reqID = RET_STGCALL5(W_,addIORequest,R1.i,FALSE,R2.i,R3.i,(char*)R4.p); ares->reqID = reqID; ares->len = 0; @@ -1709,4 +1709,26 @@ FN_(asyncWritezh_fast) JMP_(stg_block_async); FE_ } + +FN_(asyncDoProczh_fast) +{ + StgAsyncIOResult* ares; + unsigned int reqID; + FB_ + /* args: R1.i = proc, R2.i = param */ + ASSERT(CurrentTSO->why_blocked == NotBlocked); + CurrentTSO->why_blocked = BlockedOnDoProc; + ACQUIRE_LOCK(&sched_mutex); + /* could probably allocate this on the heap instead */ + ares = (StgAsyncIOResult*)RET_STGCALL2(P_,stgMallocBytes,sizeof(StgAsyncIOResult), "asyncDoProczh_fast"); + reqID = RET_STGCALL2(W_,addDoProcRequest,R1.p,R2.p); + ares->reqID = reqID; + ares->len = 0; + ares->errCode = 0; + CurrentTSO->block_info.async_result = ares; + APPEND_TO_BLOCKED_QUEUE(CurrentTSO); + RELEASE_LOCK(&sched_mutex); + JMP_(stg_block_async); + FE_ +} #endif diff --git a/ghc/rts/Sanity.c b/ghc/rts/Sanity.c index 383ef64..a71f862 100644 --- a/ghc/rts/Sanity.c +++ b/ghc/rts/Sanity.c @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: Sanity.c,v 1.33 2003/04/22 16:25:12 simonmar Exp $ + * $Id: Sanity.c,v 1.34 2003/07/03 15:14:58 sof Exp $ * * (c) The GHC Team, 1998-2001 * @@ -610,6 +610,9 @@ checkTSO(StgTSO *tso) case BlockedOnRead: case BlockedOnWrite: case BlockedOnDelay: +#if defined(mingw32_TARGET_OS) + case BlockedOnDoProc: +#endif /* isOnBQ(blocked_queue) */ break; case BlockedOnException: diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 1afc9fe..69af752 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,5 +1,5 @@ /* --------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.170 2003/06/19 10:35:37 simonmar Exp $ + * $Id: Schedule.c,v 1.171 2003/07/03 15:14:58 sof Exp $ * * (c) The GHC Team, 1998-2000 * @@ -3103,6 +3103,9 @@ unblockThread(StgTSO *tso) case BlockedOnRead: case BlockedOnWrite: +#if defined(mingw32_TARGET_OS) + case BlockedOnDoProc: +#endif { /* take TSO off blocked_queue */ StgBlockingQueueElement *prev = NULL; @@ -3230,6 +3233,9 @@ unblockThread(StgTSO *tso) case BlockedOnRead: case BlockedOnWrite: +#if defined(mingw32_TARGET_OS) + case BlockedOnDoProc: +#endif { StgTSO *prev = NULL; for (t = blocked_queue_hd; t != END_TSO_QUEUE; @@ -3623,6 +3629,11 @@ printThreadBlockage(StgTSO *tso) case BlockedOnWrite: fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd); break; +#if defined(mingw32_TARGET_OS) + case BlockedOnDoProc: + fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID); + break; +#endif case BlockedOnDelay: fprintf(stderr,"is blocked until %d", tso->block_info.target); break; diff --git a/ghc/rts/win32/AsyncIO.c b/ghc/rts/win32/AsyncIO.c index b823308..7efaf14 100644 --- a/ghc/rts/win32/AsyncIO.c +++ b/ghc/rts/win32/AsyncIO.c @@ -51,10 +51,9 @@ static int issued_reqs; static void onIOComplete(unsigned int reqID, - void* param STG_UNUSED, int fd STG_UNUSED, int len, - char* buf STG_UNUSED, + void* buf STG_UNUSED, int errCode) { /* Deposit result of request in queue/table */ @@ -96,21 +95,34 @@ addIORequest(int fd, #if 0 fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr); #endif - return AddIORequest(fd,forWriting,isSock,len,buf,0,onIOComplete); + return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete); } unsigned int -addDelayRequest(int msecs) +addDelayRequest(int msecs) { EnterCriticalSection(&queue_lock); issued_reqs++; LeaveCriticalSection(&queue_lock); #if 0 - fprintf(stderr, "addDelayReq: %d %d %d\n", msecs); fflush(stderr); + fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr); #endif - return AddDelayRequest(msecs,0,onIOComplete); + return AddDelayRequest(msecs,onIOComplete); } +unsigned int +addDoProcRequest(void* proc, void* param) +{ + EnterCriticalSection(&queue_lock); + issued_reqs++; + LeaveCriticalSection(&queue_lock); +#if 0 + fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr); +#endif + return AddProcRequest(proc,param,onIOComplete); +} + + int startupAsyncIO() { @@ -186,6 +198,7 @@ start: case BlockedOnDelay: case BlockedOnRead: case BlockedOnWrite: + case BlockedOnDoProc: if (tso->block_info.async_result->reqID == rID) { /* Found the thread blocked waiting on request; stodgily fill * in its result block. diff --git a/ghc/rts/win32/AsyncIO.h b/ghc/rts/win32/AsyncIO.h index d30d55d..00581c5 100644 --- a/ghc/rts/win32/AsyncIO.h +++ b/ghc/rts/win32/AsyncIO.h @@ -13,6 +13,7 @@ addIORequest(int fd, int len, char* buf); extern unsigned int addDelayRequest(int msecs); +extern unsigned int addDoProcRequest(void* proc, void* param); extern int startupAsyncIO(void); extern void shutdownAsyncIO(void); diff --git a/ghc/rts/win32/IOManager.c b/ghc/rts/win32/IOManager.c index ce3ee99..42eba00 100644 --- a/ghc/rts/win32/IOManager.c +++ b/ghc/rts/win32/IOManager.c @@ -40,8 +40,9 @@ IOWorkerProc(PVOID param) IOManagerState* iom = (IOManagerState*)param; WorkQueue* pq = iom->workQueue; WorkItem* work; - int len; + int len = 0, fd = 0; DWORD errCode; + void* complData; hWaits[0] = (HANDLE)iom->hExitEvent; hWaits[1] = GetWorkQueueHandle(pq); @@ -74,39 +75,66 @@ IOWorkerProc(PVOID param) if (FetchWork(pq,(void**)&work)) { if ( work->workKind & WORKER_READ ) { if ( work->workKind & WORKER_FOR_SOCKET ) { - len = recv(work->fd, work->buf, work->len, 0); + len = recv(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len, + 0); if (len == SOCKET_ERROR) { errCode = WSAGetLastError(); } } else { - len = read(work->fd, work->buf, work->len); + len = read(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len); if (len == -1) { errCode = errno; } } + complData = work->workData.ioData.buf; + fd = work->workData.ioData.fd; } else if ( work->workKind & WORKER_WRITE ) { if ( work->workKind & WORKER_FOR_SOCKET ) { - len = send(work->fd, work->buf, work->len, 0); + len = send(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len, + 0); if (len == SOCKET_ERROR) { errCode = WSAGetLastError(); } } else { - len = write(work->fd,work->buf, work->len); + len = write(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len); if (len == -1) { errCode = errno; } } + complData = work->workData.ioData.buf; + fd = work->workData.ioData.fd; } else if ( work->workKind & WORKER_DELAY ) { /* very approximate implementation of threadDelay */ - Sleep(work->len); - len = work->len; + Sleep(work->workData.delayData.msecs); + len = work->workData.delayData.msecs; + complData = NULL; + fd = 0; errCode = 0; + } else if ( work->workKind & WORKER_DO_PROC ) { + /* perform operation/proc on behalf of Haskell thread. */ + if (work->workData.procData.proc) { + /* The procedure is assumed to encode result + success/failure + * via its param. + */ + work->workData.procData.proc(work->workData.procData.param); + errCode=0; + } else { + errCode=1; + } + complData = work->workData.procData.param; } else { fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind); fflush(stderr); continue; } work->onCompletion(work->requestID, - work->param, - work->fd, + fd, len, - work->buf, + complData, errCode); /* Free the WorkItem */ free(work); @@ -181,21 +209,20 @@ AddIORequest ( int fd, BOOL isSocket, int len, char* buffer, - void* data, CompletionProc onCompletion) { WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem)); if (!ioMan || !wItem) return 0; /* Fill in the blanks */ - wItem->fd = fd; wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) | ( forWriting ? WORKER_WRITE : WORKER_READ ); - wItem->len = len; - wItem->buf = buffer; - wItem->param = data; - wItem->onCompletion = onCompletion; - wItem->requestID = ioMan->requestID++; + wItem->workData.ioData.fd = fd; + wItem->workData.ioData.len = len; + wItem->workData.ioData.buf = buffer; + + wItem->onCompletion = onCompletion; + wItem->requestID = ioMan->requestID++; EnterCriticalSection(&ioMan->manLock); /* If there are no worker threads available, create one. @@ -224,18 +251,47 @@ AddIORequest ( int fd, */ BOOL AddDelayRequest ( unsigned int msecs, - void* data, CompletionProc onCompletion) { WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem)); if (!ioMan || !wItem) return FALSE; /* Fill in the blanks */ - wItem->fd = 0; wItem->workKind = WORKER_DELAY; - wItem->len = msecs; - wItem->buf = 0; - wItem->param = data; + wItem->workData.delayData.msecs = msecs; + wItem->onCompletion = onCompletion; + wItem->requestID = ioMan->requestID++; + + EnterCriticalSection(&ioMan->manLock); + if ( ioMan->workersIdle == 0 ) { + ioMan->numWorkers++; + NewIOWorkerThread(ioMan); + } + LeaveCriticalSection(&ioMan->manLock); + + if (SubmitWork(ioMan->workQueue,wItem)) { + return wItem->requestID; + } else { + return 0; + } +} + +/* + * Function: AddDelayRequest() + * + */ +BOOL +AddProcRequest ( void* proc, + void* param, + CompletionProc onCompletion) +{ + WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem)); + if (!ioMan || !wItem) return FALSE; + + /* Fill in the blanks */ + wItem->workKind = WORKER_DO_PROC; + wItem->workData.procData.proc = proc; + wItem->workData.procData.param = param; wItem->onCompletion = onCompletion; wItem->requestID = ioMan->requestID++; diff --git a/ghc/rts/win32/IOManager.h b/ghc/rts/win32/IOManager.h index 3543a41..cbdda44 100644 --- a/ghc/rts/win32/IOManager.h +++ b/ghc/rts/win32/IOManager.h @@ -32,18 +32,31 @@ extern void* GetFiberData ( void ); * */ typedef void (*CompletionProc)(unsigned int requestID, - void* param, int fd, int len, - char* buf, + void* buf, int errCode); +typedef void (*DoProcProc)(void *param); + +typedef union workData { + struct { + int fd; + int len; + char *buf; + } ioData; + struct { + int msecs; + } delayData; + struct { + DoProcProc proc; + void* param; + } procData; +} WorkData; + typedef struct WorkItem { unsigned int workKind; - int fd; - int len; - char* buf; - void* param; + WorkData workData; unsigned int requestID; CompletionProc onCompletion; } WorkItem; @@ -54,10 +67,11 @@ extern CompletionProc onComplete; * that instead of passing a tag describing the work to be performed, * a function pointer is passed instead. Maybe later. */ -#define WORKER_READ 1 -#define WORKER_WRITE 2 -#define WORKER_DELAY 4 -#define WORKER_FOR_SOCKET 8 +#define WORKER_READ 1 +#define WORKER_WRITE 2 +#define WORKER_DELAY 4 +#define WORKER_FOR_SOCKET 8 +#define WORKER_DO_PROC 16 /* * Starting up and shutting down. @@ -71,7 +85,6 @@ extern void ShutdownIOManager ( void ); * will invoke upon completion. */ extern int AddDelayRequest ( unsigned int msecs, - void* data, CompletionProc onCompletion); extern int AddIORequest ( int fd, @@ -79,7 +92,10 @@ extern int AddIORequest ( int fd, BOOL isSocket, int len, char* buffer, - void* data, CompletionProc onCompletion); +extern int AddProcRequest ( void* proc, + void* data, + CompletionProc onCompletion); + #endif /* __IOMANAGER_H__ */ -- 1.7.10.4