-----------------------------------------------------------------------
--- $Id: primops.txt.pp,v 1.27 2003/06/19 10:42:26 simonmar Exp $
+-- $Id: primops.txt.pp,v 1.28 2003/07/03 15:14:56 sof Exp $
--
-- Primitive Operations
--
needs_wrapper = True
has_side_effects = True
out_of_line = True
+
+primop AsyncDoProcOp "asyncDoProc#" GenPrimOp
+ Addr# -> Addr# -> State# RealWorld-> (# State# RealWorld, Int#, Int# #)
+ {Asynchronously perform procedure (first arg), passing it 2nd arg.}
+ with
+ needs_wrapper = True
+ has_side_effects = True
+ out_of_line = True
+
#endif
------------------------------------------------------------------------
/* -----------------------------------------------------------------------------
- * $Id: PrimOps.h,v 1.102 2003/06/19 10:42:24 simonmar Exp $
+ * $Id: PrimOps.h,v 1.103 2003/07/03 15:14:57 sof Exp $
*
* (c) The GHC Team, 1998-2000
*
#ifdef mingw32_TARGET_OS
EXTFUN_RTS(asyncReadzh_fast);
EXTFUN_RTS(asyncWritezh_fast);
+EXTFUN_RTS(asyncDoProczh_fast);
#endif
/* -----------------------------------------------------------------------------
- * $Id: TSO.h,v 1.30 2003/02/21 05:34:15 sof Exp $
+ * $Id: TSO.h,v 1.31 2003/07/03 15:14:58 sof Exp $
*
* (c) The GHC Team, 1998-1999
*
BlockedOnRead,
BlockedOnWrite,
BlockedOnDelay
+#if defined(mingw32_TARGET_OS)
+ , BlockedOnDoProc
+#endif
#if defined(PAR)
, BlockedOnGA // blocked on a remote closure represented by a Global Address
, BlockedOnGA_NoSend // same as above but without sending a Fetch message
#endif
} StgTSOBlockReason;
-#ifdef mingw32_TARGET_OS
+#if defined(mingw32_TARGET_OS)
/* results from an async I/O request + it's ID. */
typedef struct {
unsigned int reqID;
StgClosure *closure;
struct StgTSO_ *tso;
int fd;
-#ifdef mingw32_TARGET_OS
+#if defined(mingw32_TARGET_OS)
StgAsyncIOResult* async_result;
#endif
unsigned int target;
/* -----------------------------------------------------------------------------
- * $Id: PrimOps.hc,v 1.107 2003/04/15 14:37:12 simonmar Exp $
+ * $Id: PrimOps.hc,v 1.108 2003/07/03 15:14:58 sof Exp $
*
* (c) The GHC Team, 1998-2002
*
CurrentTSO->why_blocked = BlockedOnRead;
ACQUIRE_LOCK(&sched_mutex);
/* could probably allocate this on the heap instead */
- ares = (StgAsyncIOResult*)RET_STGCALL2(P_,stgMallocBytes,sizeof(StgAsyncIOResult), "asyncWritezh_fast");
+ ares = (StgAsyncIOResult*)RET_STGCALL2(P_,stgMallocBytes,sizeof(StgAsyncIOResult), "asyncReadzh_fast");
reqID = RET_STGCALL5(W_,addIORequest,R1.i,FALSE,R2.i,R3.i,(char*)R4.p);
ares->reqID = reqID;
ares->len = 0;
JMP_(stg_block_async);
FE_
}
+
+FN_(asyncDoProczh_fast)
+{
+ StgAsyncIOResult* ares;
+ unsigned int reqID;
+ FB_
+ /* args: R1.i = proc, R2.i = param */
+ ASSERT(CurrentTSO->why_blocked == NotBlocked);
+ CurrentTSO->why_blocked = BlockedOnDoProc;
+ ACQUIRE_LOCK(&sched_mutex);
+ /* could probably allocate this on the heap instead */
+ ares = (StgAsyncIOResult*)RET_STGCALL2(P_,stgMallocBytes,sizeof(StgAsyncIOResult), "asyncDoProczh_fast");
+ reqID = RET_STGCALL2(W_,addDoProcRequest,R1.p,R2.p);
+ ares->reqID = reqID;
+ ares->len = 0;
+ ares->errCode = 0;
+ CurrentTSO->block_info.async_result = ares;
+ APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
+ RELEASE_LOCK(&sched_mutex);
+ JMP_(stg_block_async);
+ FE_
+}
#endif
/* -----------------------------------------------------------------------------
- * $Id: Sanity.c,v 1.33 2003/04/22 16:25:12 simonmar Exp $
+ * $Id: Sanity.c,v 1.34 2003/07/03 15:14:58 sof Exp $
*
* (c) The GHC Team, 1998-2001
*
case BlockedOnRead:
case BlockedOnWrite:
case BlockedOnDelay:
+#if defined(mingw32_TARGET_OS)
+ case BlockedOnDoProc:
+#endif
/* isOnBQ(blocked_queue) */
break;
case BlockedOnException:
/* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.170 2003/06/19 10:35:37 simonmar Exp $
+ * $Id: Schedule.c,v 1.171 2003/07/03 15:14:58 sof Exp $
*
* (c) The GHC Team, 1998-2000
*
case BlockedOnRead:
case BlockedOnWrite:
+#if defined(mingw32_TARGET_OS)
+ case BlockedOnDoProc:
+#endif
{
/* take TSO off blocked_queue */
StgBlockingQueueElement *prev = NULL;
case BlockedOnRead:
case BlockedOnWrite:
+#if defined(mingw32_TARGET_OS)
+ case BlockedOnDoProc:
+#endif
{
StgTSO *prev = NULL;
for (t = blocked_queue_hd; t != END_TSO_QUEUE;
case BlockedOnWrite:
fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
break;
+#if defined(mingw32_TARGET_OS)
+ case BlockedOnDoProc:
+ fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
+ break;
+#endif
case BlockedOnDelay:
fprintf(stderr,"is blocked until %d", tso->block_info.target);
break;
static void
onIOComplete(unsigned int reqID,
- void* param STG_UNUSED,
int fd STG_UNUSED,
int len,
- char* buf STG_UNUSED,
+ void* buf STG_UNUSED,
int errCode)
{
/* Deposit result of request in queue/table */
#if 0
fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
#endif
- return AddIORequest(fd,forWriting,isSock,len,buf,0,onIOComplete);
+ return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
}
unsigned int
-addDelayRequest(int msecs)
+addDelayRequest(int msecs)
{
EnterCriticalSection(&queue_lock);
issued_reqs++;
LeaveCriticalSection(&queue_lock);
#if 0
- fprintf(stderr, "addDelayReq: %d %d %d\n", msecs); fflush(stderr);
+ fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
#endif
- return AddDelayRequest(msecs,0,onIOComplete);
+ return AddDelayRequest(msecs,onIOComplete);
}
+unsigned int
+addDoProcRequest(void* proc, void* param)
+{
+ EnterCriticalSection(&queue_lock);
+ issued_reqs++;
+ LeaveCriticalSection(&queue_lock);
+#if 0
+ fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
+#endif
+ return AddProcRequest(proc,param,onIOComplete);
+}
+
+
int
startupAsyncIO()
{
case BlockedOnDelay:
case BlockedOnRead:
case BlockedOnWrite:
+ case BlockedOnDoProc:
if (tso->block_info.async_result->reqID == rID) {
/* Found the thread blocked waiting on request; stodgily fill
* in its result block.
int len,
char* buf);
extern unsigned int addDelayRequest(int msecs);
+extern unsigned int addDoProcRequest(void* proc, void* param);
extern int startupAsyncIO(void);
extern void shutdownAsyncIO(void);
IOManagerState* iom = (IOManagerState*)param;
WorkQueue* pq = iom->workQueue;
WorkItem* work;
- int len;
+ int len = 0, fd = 0;
DWORD errCode;
+ void* complData;
hWaits[0] = (HANDLE)iom->hExitEvent;
hWaits[1] = GetWorkQueueHandle(pq);
if (FetchWork(pq,(void**)&work)) {
if ( work->workKind & WORKER_READ ) {
if ( work->workKind & WORKER_FOR_SOCKET ) {
- len = recv(work->fd, work->buf, work->len, 0);
+ len = recv(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len,
+ 0);
if (len == SOCKET_ERROR) {
errCode = WSAGetLastError();
}
} else {
- len = read(work->fd, work->buf, work->len);
+ len = read(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len);
if (len == -1) { errCode = errno; }
}
+ complData = work->workData.ioData.buf;
+ fd = work->workData.ioData.fd;
} else if ( work->workKind & WORKER_WRITE ) {
if ( work->workKind & WORKER_FOR_SOCKET ) {
- len = send(work->fd, work->buf, work->len, 0);
+ len = send(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len,
+ 0);
if (len == SOCKET_ERROR) {
errCode = WSAGetLastError();
}
} else {
- len = write(work->fd,work->buf, work->len);
+ len = write(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len);
if (len == -1) { errCode = errno; }
}
+ complData = work->workData.ioData.buf;
+ fd = work->workData.ioData.fd;
} else if ( work->workKind & WORKER_DELAY ) {
/* very approximate implementation of threadDelay */
- Sleep(work->len);
- len = work->len;
+ Sleep(work->workData.delayData.msecs);
+ len = work->workData.delayData.msecs;
+ complData = NULL;
+ fd = 0;
errCode = 0;
+ } else if ( work->workKind & WORKER_DO_PROC ) {
+ /* perform operation/proc on behalf of Haskell thread. */
+ if (work->workData.procData.proc) {
+ /* The procedure is assumed to encode result + success/failure
+ * via its param.
+ */
+ work->workData.procData.proc(work->workData.procData.param);
+ errCode=0;
+ } else {
+ errCode=1;
+ }
+ complData = work->workData.procData.param;
} else {
fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
fflush(stderr);
continue;
}
work->onCompletion(work->requestID,
- work->param,
- work->fd,
+ fd,
len,
- work->buf,
+ complData,
errCode);
/* Free the WorkItem */
free(work);
BOOL isSocket,
int len,
char* buffer,
- void* data,
CompletionProc onCompletion)
{
WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
if (!ioMan || !wItem) return 0;
/* Fill in the blanks */
- wItem->fd = fd;
wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
( forWriting ? WORKER_WRITE : WORKER_READ );
- wItem->len = len;
- wItem->buf = buffer;
- wItem->param = data;
- wItem->onCompletion = onCompletion;
- wItem->requestID = ioMan->requestID++;
+ wItem->workData.ioData.fd = fd;
+ wItem->workData.ioData.len = len;
+ wItem->workData.ioData.buf = buffer;
+
+ wItem->onCompletion = onCompletion;
+ wItem->requestID = ioMan->requestID++;
EnterCriticalSection(&ioMan->manLock);
/* If there are no worker threads available, create one.
*/
BOOL
AddDelayRequest ( unsigned int msecs,
- void* data,
CompletionProc onCompletion)
{
WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
if (!ioMan || !wItem) return FALSE;
/* Fill in the blanks */
- wItem->fd = 0;
wItem->workKind = WORKER_DELAY;
- wItem->len = msecs;
- wItem->buf = 0;
- wItem->param = data;
+ wItem->workData.delayData.msecs = msecs;
+ wItem->onCompletion = onCompletion;
+ wItem->requestID = ioMan->requestID++;
+
+ EnterCriticalSection(&ioMan->manLock);
+ if ( ioMan->workersIdle == 0 ) {
+ ioMan->numWorkers++;
+ NewIOWorkerThread(ioMan);
+ }
+ LeaveCriticalSection(&ioMan->manLock);
+
+ if (SubmitWork(ioMan->workQueue,wItem)) {
+ return wItem->requestID;
+ } else {
+ return 0;
+ }
+}
+
+/*
+ * Function: AddDelayRequest()
+ *
+ */
+BOOL
+AddProcRequest ( void* proc,
+ void* param,
+ CompletionProc onCompletion)
+{
+ WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+ if (!ioMan || !wItem) return FALSE;
+
+ /* Fill in the blanks */
+ wItem->workKind = WORKER_DO_PROC;
+ wItem->workData.procData.proc = proc;
+ wItem->workData.procData.param = param;
wItem->onCompletion = onCompletion;
wItem->requestID = ioMan->requestID++;
*
*/
typedef void (*CompletionProc)(unsigned int requestID,
- void* param,
int fd,
int len,
- char* buf,
+ void* buf,
int errCode);
+typedef void (*DoProcProc)(void *param);
+
+typedef union workData {
+ struct {
+ int fd;
+ int len;
+ char *buf;
+ } ioData;
+ struct {
+ int msecs;
+ } delayData;
+ struct {
+ DoProcProc proc;
+ void* param;
+ } procData;
+} WorkData;
+
typedef struct WorkItem {
unsigned int workKind;
- int fd;
- int len;
- char* buf;
- void* param;
+ WorkData workData;
unsigned int requestID;
CompletionProc onCompletion;
} WorkItem;
* that instead of passing a tag describing the work to be performed,
* a function pointer is passed instead. Maybe later.
*/
-#define WORKER_READ 1
-#define WORKER_WRITE 2
-#define WORKER_DELAY 4
-#define WORKER_FOR_SOCKET 8
+#define WORKER_READ 1
+#define WORKER_WRITE 2
+#define WORKER_DELAY 4
+#define WORKER_FOR_SOCKET 8
+#define WORKER_DO_PROC 16
/*
* Starting up and shutting down.
* will invoke upon completion.
*/
extern int AddDelayRequest ( unsigned int msecs,
- void* data,
CompletionProc onCompletion);
extern int AddIORequest ( int fd,
BOOL isSocket,
int len,
char* buffer,
- void* data,
CompletionProc onCompletion);
+extern int AddProcRequest ( void* proc,
+ void* data,
+ CompletionProc onCompletion);
+
#endif /* __IOMANAGER_H__ */