3 * Non-blocking / asynchronous I/O for Win32.
16 * Internal state maintained by the IO manager.
18 typedef struct IOManagerState {
25 unsigned int requestID;
28 /* ToDo: wrap up this state via a IOManager handle instead? */
29 static IOManagerState* ioMan;
32 * The routine executed by each worker thread.
37 IOWorkerProc(PVOID param)
41 IOManagerState* iom = (IOManagerState*)param;
42 WorkQueue* pq = iom->workQueue;
48 hWaits[0] = (HANDLE)iom->hExitEvent;
49 hWaits[1] = GetWorkQueueHandle(pq);
52 /* The error code is communicated back on completion of request; reset. */
55 EnterCriticalSection(&iom->manLock);
56 /* Signal that the worker is idle.
58 * 'workersIdle' is used when determining whether or not to
59 * increase the worker thread pool when adding a new request.
60 * (see addIORequest().)
63 LeaveCriticalSection(&iom->manLock);
65 rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
67 EnterCriticalSection(&iom->manLock);
68 /* Signal that the thread is 'non-idle' and about to consume
73 LeaveCriticalSection(&iom->manLock);
75 if ( WAIT_OBJECT_0 == rc ) {
78 } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
79 /* work item available, fetch it. */
80 if (FetchWork(pq,(void**)&work)) {
81 if ( work->workKind & WORKER_READ ) {
82 if ( work->workKind & WORKER_FOR_SOCKET ) {
83 len = recv(work->workData.ioData.fd,
84 work->workData.ioData.buf,
85 work->workData.ioData.len,
87 if (len == SOCKET_ERROR) {
88 errCode = WSAGetLastError();
91 len = read(work->workData.ioData.fd,
92 work->workData.ioData.buf,
93 work->workData.ioData.len);
94 if (len == -1) { errCode = errno; }
96 complData = work->workData.ioData.buf;
97 fd = work->workData.ioData.fd;
98 } else if ( work->workKind & WORKER_WRITE ) {
99 if ( work->workKind & WORKER_FOR_SOCKET ) {
100 len = send(work->workData.ioData.fd,
101 work->workData.ioData.buf,
102 work->workData.ioData.len,
104 if (len == SOCKET_ERROR) {
105 errCode = WSAGetLastError();
108 len = write(work->workData.ioData.fd,
109 work->workData.ioData.buf,
110 work->workData.ioData.len);
111 if (len == -1) { errCode = errno; }
113 complData = work->workData.ioData.buf;
114 fd = work->workData.ioData.fd;
115 } else if ( work->workKind & WORKER_DELAY ) {
116 /* Approximate implementation of threadDelay;
118 * Note: Sleep() is in milliseconds, not micros.
120 Sleep(work->workData.delayData.msecs / 1000);
121 len = work->workData.delayData.msecs;
125 } else if ( work->workKind & WORKER_DO_PROC ) {
126 /* perform operation/proc on behalf of Haskell thread. */
127 if (work->workData.procData.proc) {
128 /* The procedure is assumed to encode result + success/failure
131 errCode=work->workData.procData.proc(work->workData.procData.param);
135 complData = work->workData.procData.param;
137 fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
141 work->onCompletion(work->requestID,
146 /* Free the WorkItem */
149 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
153 fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
162 NewIOWorkerThread(IOManagerState* iom)
165 return ( 0 != _beginthreadex(NULL,
180 if ( !wq ) return FALSE;
182 ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
189 /* A manual-reset event */
190 hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
197 ioMan->hExitEvent = hExit;
198 InitializeCriticalSection(&ioMan->manLock);
199 ioMan->workQueue = wq;
200 ioMan->numWorkers = 0;
201 ioMan->workersIdle = 0;
202 ioMan->queueSize = 0;
203 ioMan->requestID = 1;
209 * Function: AddIORequest()
211 * Conduit to underlying WorkQueue's SubmitWork(); adds IO
212 * request to work queue, deciding whether or not to augment
213 * the thread pool in the process.
216 AddIORequest ( int fd,
221 CompletionProc onCompletion)
223 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
224 unsigned int reqID = ioMan->requestID++;
225 if (!ioMan || !wItem) return 0;
227 /* Fill in the blanks */
228 wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
229 ( forWriting ? WORKER_WRITE : WORKER_READ );
230 wItem->workData.ioData.fd = fd;
231 wItem->workData.ioData.len = len;
232 wItem->workData.ioData.buf = buffer;
234 wItem->onCompletion = onCompletion;
235 wItem->requestID = reqID;
237 EnterCriticalSection(&ioMan->manLock);
238 /* If there are no worker threads available, create one.
240 * If this turns out to be too aggressive a policy, refine.
243 fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle);
246 /* A new worker thread is created when there are fewer idle threads
247 * than non-consumed queue requests. This ensures that requests will
248 * be dealt with in a timely manner.
250 * [Long explanation of why the previous thread pool policy lead to
253 * Previously, the thread pool was augmented iff no idle worker threads
254 * were available. That strategy runs the risk of repeatedly adding to
255 * the request queue without expanding the thread pool to handle this
256 * sudden spike in queued requests.
257 * [How? Assume workersIdle is 1, and addIORequest() is called. No new
258 * thread is created and the, returning without blocking.
259 request is simply queued. If addIORequest()
260 * is called again _before the OS schedules a worker thread to pull the
261 * request off the queue_, workersIdle is still 1 and another request is
262 * simply added to the queue. Once the worker thread is run, only one
263 * request is de-queued, leaving the 2nd request in the queue]
265 * Assuming none of the queued requests take an inordinate amount of to
266 * complete, the request queue would eventually be drained. But if that's
267 * not the case, the later requests will end up languishing in the queue
268 * indefinitely. The non-timely handling of requests may cause CH applications
269 * to misbehave / hang; bad.
273 if ( ioMan->workersIdle < ioMan->queueSize ) {
275 LeaveCriticalSection(&ioMan->manLock);
276 NewIOWorkerThread(ioMan);
278 LeaveCriticalSection(&ioMan->manLock);
281 if (SubmitWork(ioMan->workQueue,wItem)) {
282 /* Note: the work item has potentially been consumed by a worker thread
283 * (and freed) at this point, so we cannot use wItem's requestID.
292 * Function: AddDelayRequest()
294 * Like AddIORequest(), but this time adding a delay request to
298 AddDelayRequest ( unsigned int msecs,
299 CompletionProc onCompletion)
301 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
302 unsigned int reqID = ioMan->requestID++;
303 if (!ioMan || !wItem) return FALSE;
305 /* Fill in the blanks */
306 wItem->workKind = WORKER_DELAY;
307 wItem->workData.delayData.msecs = msecs;
308 wItem->onCompletion = onCompletion;
309 wItem->requestID = reqID;
311 EnterCriticalSection(&ioMan->manLock);
313 fprintf(stderr, "AddDelayRequest: %d\n", ioMan->workersIdle);
316 /* See AddIORequest() for comments regarding policy
317 * for augmenting the worker thread pool.
320 if ( ioMan->workersIdle < ioMan->queueSize ) {
322 LeaveCriticalSection(&ioMan->manLock);
323 NewIOWorkerThread(ioMan);
325 LeaveCriticalSection(&ioMan->manLock);
328 if (SubmitWork(ioMan->workQueue,wItem)) {
329 /* See AddIORequest() comment */
337 * Function: AddProcRequest()
339 * Add an asynchronous procedure request.
342 AddProcRequest ( void* proc,
344 CompletionProc onCompletion)
346 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
347 unsigned int reqID = ioMan->requestID++;
348 if (!ioMan || !wItem) return FALSE;
350 /* Fill in the blanks */
351 wItem->workKind = WORKER_DO_PROC;
352 wItem->workData.procData.proc = proc;
353 wItem->workData.procData.param = param;
354 wItem->onCompletion = onCompletion;
355 wItem->requestID = reqID;
357 EnterCriticalSection(&ioMan->manLock);
359 fprintf(stderr, "AddProcRequest: %d\n", ioMan->workersIdle);
362 /* See AddIORequest() for comments regarding policy
363 * for augmenting the worker thread pool.
366 if ( ioMan->workersIdle < ioMan->queueSize ) {
368 LeaveCriticalSection(&ioMan->manLock);
369 NewIOWorkerThread(ioMan);
371 LeaveCriticalSection(&ioMan->manLock);
374 if (SubmitWork(ioMan->workQueue,wItem)) {
375 /* See AddIORequest() comment */
382 void ShutdownIOManager()
384 SetEvent(ioMan->hExitEvent);