3 * Non-blocking / asynchronous I/O for Win32.
16 * Internal state maintained by the IO manager.
18 typedef struct IOManagerState {
24 unsigned int requestID;
27 /* ToDo: wrap up this state via a IOManager handle instead? */
28 static IOManagerState* ioMan;
31 * The routine executed by each worker thread.
36 IOWorkerProc(PVOID param)
40 IOManagerState* iom = (IOManagerState*)param;
41 WorkQueue* pq = iom->workQueue;
47 hWaits[0] = (HANDLE)iom->hExitEvent;
48 hWaits[1] = GetWorkQueueHandle(pq);
51 /* The error code is communicated back on completion of request; reset. */
54 EnterCriticalSection(&iom->manLock);
56 LeaveCriticalSection(&iom->manLock);
58 rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
60 EnterCriticalSection(&iom->manLock);
62 LeaveCriticalSection(&iom->manLock);
64 if ( WAIT_OBJECT_0 == rc ) {
67 fprintf(stderr, "shutting down...\n"); fflush(stderr);
70 } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
71 /* work item available, fetch it. */
73 fprintf(stderr, "work available...\n"); fflush(stderr);
75 if (FetchWork(pq,(void**)&work)) {
76 if ( work->workKind & WORKER_READ ) {
77 if ( work->workKind & WORKER_FOR_SOCKET ) {
78 len = recv(work->workData.ioData.fd,
79 work->workData.ioData.buf,
80 work->workData.ioData.len,
82 if (len == SOCKET_ERROR) {
83 errCode = WSAGetLastError();
86 len = read(work->workData.ioData.fd,
87 work->workData.ioData.buf,
88 work->workData.ioData.len);
89 if (len == -1) { errCode = errno; }
91 complData = work->workData.ioData.buf;
92 fd = work->workData.ioData.fd;
93 } else if ( work->workKind & WORKER_WRITE ) {
94 if ( work->workKind & WORKER_FOR_SOCKET ) {
95 len = send(work->workData.ioData.fd,
96 work->workData.ioData.buf,
97 work->workData.ioData.len,
99 if (len == SOCKET_ERROR) {
100 errCode = WSAGetLastError();
103 len = write(work->workData.ioData.fd,
104 work->workData.ioData.buf,
105 work->workData.ioData.len);
106 if (len == -1) { errCode = errno; }
108 complData = work->workData.ioData.buf;
109 fd = work->workData.ioData.fd;
110 } else if ( work->workKind & WORKER_DELAY ) {
111 /* very approximate implementation of threadDelay */
112 Sleep(work->workData.delayData.msecs);
113 len = work->workData.delayData.msecs;
117 } else if ( work->workKind & WORKER_DO_PROC ) {
118 /* perform operation/proc on behalf of Haskell thread. */
119 if (work->workData.procData.proc) {
120 /* The procedure is assumed to encode result + success/failure
123 errCode=work->workData.procData.proc(work->workData.procData.param);
127 complData = work->workData.procData.param;
129 fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
133 work->onCompletion(work->requestID,
138 /* Free the WorkItem */
141 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
145 fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
154 NewIOWorkerThread(IOManagerState* iom)
157 return ( 0 != _beginthreadex(NULL,
172 if ( !wq ) return FALSE;
174 ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
181 /* A manual-reset event */
182 hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
189 ioMan->hExitEvent = hExit;
190 InitializeCriticalSection(&ioMan->manLock);
191 ioMan->workQueue = wq;
192 ioMan->numWorkers = 0;
193 ioMan->workersIdle = 0;
194 ioMan->requestID = 1;
200 * Function: AddIORequest()
202 * Conduit to underlying WorkQueue's SubmitWork(); adds IO
203 * request to work queue, returning without blocking.
206 AddIORequest ( int fd,
211 CompletionProc onCompletion)
213 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
214 unsigned int reqID = ioMan->requestID++;
215 if (!ioMan || !wItem) return 0;
217 /* Fill in the blanks */
218 wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
219 ( forWriting ? WORKER_WRITE : WORKER_READ );
220 wItem->workData.ioData.fd = fd;
221 wItem->workData.ioData.len = len;
222 wItem->workData.ioData.buf = buffer;
224 wItem->onCompletion = onCompletion;
225 wItem->requestID = reqID;
227 EnterCriticalSection(&ioMan->manLock);
228 /* If there are no worker threads available, create one.
230 * If this turns out to be too aggressive a policy, refine.
233 fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); fflush(stderr);
235 if ( ioMan->workersIdle == 0 ) {
237 LeaveCriticalSection(&ioMan->manLock);
238 NewIOWorkerThread(ioMan);
240 LeaveCriticalSection(&ioMan->manLock);
243 if (SubmitWork(ioMan->workQueue,wItem)) {
244 /* Note: the work item has potentially been consumed by a worker thread
245 * (and freed) at this point, so we cannot use wItem's requestID.
254 * Function: AddDelayRequest()
258 AddDelayRequest ( unsigned int msecs,
259 CompletionProc onCompletion)
261 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
262 unsigned int reqID = ioMan->requestID++;
263 if (!ioMan || !wItem) return FALSE;
265 /* Fill in the blanks */
266 wItem->workKind = WORKER_DELAY;
267 wItem->workData.delayData.msecs = msecs;
268 wItem->onCompletion = onCompletion;
269 wItem->requestID = reqID;
271 EnterCriticalSection(&ioMan->manLock);
273 fprintf(stderr, "AddDelayRequest: %d\n", ioMan->workersIdle); fflush(stderr);
275 if ( ioMan->workersIdle == 0 ) {
277 LeaveCriticalSection(&ioMan->manLock);
278 NewIOWorkerThread(ioMan);
280 LeaveCriticalSection(&ioMan->manLock);
283 if (SubmitWork(ioMan->workQueue,wItem)) {
284 /* See AddIORequest() comment */
292 * Function: AddDelayRequest()
296 AddProcRequest ( void* proc,
298 CompletionProc onCompletion)
300 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
301 unsigned int reqID = ioMan->requestID++;
302 if (!ioMan || !wItem) return FALSE;
304 /* Fill in the blanks */
305 wItem->workKind = WORKER_DO_PROC;
306 wItem->workData.procData.proc = proc;
307 wItem->workData.procData.param = param;
308 wItem->onCompletion = onCompletion;
309 wItem->requestID = reqID;
311 EnterCriticalSection(&ioMan->manLock);
313 fprintf(stderr, "AddProcRequest: %d\n", ioMan->workersIdle); fflush(stderr);
315 if ( ioMan->workersIdle == 0 ) {
317 LeaveCriticalSection(&ioMan->manLock);
318 NewIOWorkerThread(ioMan);
320 LeaveCriticalSection(&ioMan->manLock);
323 if (SubmitWork(ioMan->workQueue,wItem)) {
324 /* See AddIORequest() comment */
331 void ShutdownIOManager()
333 SetEvent(ioMan->hExitEvent);