3 * Non-blocking / asynchronous I/O for Win32.
16 * Internal state maintained by the IO manager.
18 typedef struct IOManagerState {
25 unsigned int requestID;
28 /* ToDo: wrap up this state via a IOManager handle instead? */
29 static IOManagerState* ioMan;
32 * The routine executed by each worker thread.
37 IOWorkerProc(PVOID param)
41 IOManagerState* iom = (IOManagerState*)param;
42 WorkQueue* pq = iom->workQueue;
48 hWaits[0] = (HANDLE)iom->hExitEvent;
49 hWaits[1] = GetWorkQueueHandle(pq);
52 /* The error code is communicated back on completion of request; reset. */
55 EnterCriticalSection(&iom->manLock);
56 /* Signal that the worker is idle.
58 * 'workersIdle' is used when determining whether or not to
59 * increase the worker thread pool when adding a new request.
60 * (see addIORequest().)
63 LeaveCriticalSection(&iom->manLock);
66 * A possible future refinement is to make long-term idle threads
67 * wake up and decide to shut down should the number of idle threads
68 * be above some threshold.
71 rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
73 EnterCriticalSection(&iom->manLock);
74 /* Signal that the thread is 'non-idle' and about to consume
79 LeaveCriticalSection(&iom->manLock);
81 if ( WAIT_OBJECT_0 == rc ) {
84 } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
85 /* work item available, fetch it. */
86 if (FetchWork(pq,(void**)&work)) {
87 if ( work->workKind & WORKER_READ ) {
88 if ( work->workKind & WORKER_FOR_SOCKET ) {
89 len = recv(work->workData.ioData.fd,
90 work->workData.ioData.buf,
91 work->workData.ioData.len,
93 if (len == SOCKET_ERROR) {
94 errCode = WSAGetLastError();
97 len = read(work->workData.ioData.fd,
98 work->workData.ioData.buf,
99 work->workData.ioData.len);
100 if (len == -1) { errCode = errno; }
102 complData = work->workData.ioData.buf;
103 fd = work->workData.ioData.fd;
104 } else if ( work->workKind & WORKER_WRITE ) {
105 if ( work->workKind & WORKER_FOR_SOCKET ) {
106 len = send(work->workData.ioData.fd,
107 work->workData.ioData.buf,
108 work->workData.ioData.len,
110 if (len == SOCKET_ERROR) {
111 errCode = WSAGetLastError();
114 len = write(work->workData.ioData.fd,
115 work->workData.ioData.buf,
116 work->workData.ioData.len);
117 if (len == -1) { errCode = errno; }
119 complData = work->workData.ioData.buf;
120 fd = work->workData.ioData.fd;
121 } else if ( work->workKind & WORKER_DELAY ) {
122 /* Approximate implementation of threadDelay;
124 * Note: Sleep() is in milliseconds, not micros.
126 Sleep(work->workData.delayData.msecs / 1000);
127 len = work->workData.delayData.msecs;
131 } else if ( work->workKind & WORKER_DO_PROC ) {
132 /* perform operation/proc on behalf of Haskell thread. */
133 if (work->workData.procData.proc) {
134 /* The procedure is assumed to encode result + success/failure
137 errCode=work->workData.procData.proc(work->workData.procData.param);
141 complData = work->workData.procData.param;
143 fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
147 work->onCompletion(work->requestID,
152 /* Free the WorkItem */
155 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
159 fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
168 NewIOWorkerThread(IOManagerState* iom)
171 return ( 0 != _beginthreadex(NULL,
186 if ( !wq ) return FALSE;
188 ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
195 /* A manual-reset event */
196 hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
203 ioMan->hExitEvent = hExit;
204 InitializeCriticalSection(&ioMan->manLock);
205 ioMan->workQueue = wq;
206 ioMan->numWorkers = 0;
207 ioMan->workersIdle = 0;
208 ioMan->queueSize = 0;
209 ioMan->requestID = 1;
215 * Function: depositWorkItem()
217 * Local function which deposits a WorkItem onto a work queue,
218 * deciding in the process whether or not the thread pool needs
219 * to be augmented with another thread to handle the new request.
224 depositWorkItem( unsigned int reqID,
227 EnterCriticalSection(&ioMan->manLock);
230 fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers);
233 /* A new worker thread is created when there are fewer idle threads
234 * than non-consumed queue requests. This ensures that requests will
235 * be dealt with in a timely manner.
237 * [Long explanation of why the previous thread pool policy lead to
240 * Previously, the thread pool was augmented iff no idle worker threads
241 * were available. That strategy runs the risk of repeatedly adding to
242 * the request queue without expanding the thread pool to handle this
243 * sudden spike in queued requests.
244 * [How? Assume workersIdle is 1, and addIORequest() is called. No new
245 * thread is created and the request is simply queued. If addIORequest()
246 * is called again _before the OS schedules a worker thread to pull the
247 * request off the queue_, workersIdle is still 1 and another request is
248 * simply added to the queue. Once the worker thread is run, only one
249 * request is de-queued, leaving the 2nd request in the queue]
251 * Assuming none of the queued requests take an inordinate amount of to
252 * complete, the request queue would eventually be drained. But if that's
253 * not the case, the later requests will end up languishing in the queue
254 * indefinitely. The non-timely handling of requests may cause CH applications
255 * to misbehave / hang; bad.
259 if ( (ioMan->workersIdle < ioMan->queueSize) ) {
260 /* see if giving up our quantum ferrets out some idle threads.
262 LeaveCriticalSection(&ioMan->manLock);
264 EnterCriticalSection(&ioMan->manLock);
265 if ( (ioMan->workersIdle < ioMan->queueSize) ) {
266 /* No, go ahead and create another. */
268 LeaveCriticalSection(&ioMan->manLock);
269 NewIOWorkerThread(ioMan);
271 LeaveCriticalSection(&ioMan->manLock);
274 LeaveCriticalSection(&ioMan->manLock);
277 if (SubmitWork(ioMan->workQueue,wItem)) {
278 /* Note: the work item has potentially been consumed by a worker thread
279 * (and freed) at this point, so we cannot use wItem's requestID.
288 * Function: AddIORequest()
290 * Conduit to underlying WorkQueue's SubmitWork(); adds IO
291 * request to work queue, deciding whether or not to augment
292 * the thread pool in the process.
295 AddIORequest ( int fd,
300 CompletionProc onCompletion)
302 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
303 unsigned int reqID = ioMan->requestID++;
304 if (!ioMan || !wItem) return 0;
306 /* Fill in the blanks */
307 wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
308 ( forWriting ? WORKER_WRITE : WORKER_READ );
309 wItem->workData.ioData.fd = fd;
310 wItem->workData.ioData.len = len;
311 wItem->workData.ioData.buf = buffer;
313 wItem->onCompletion = onCompletion;
314 wItem->requestID = reqID;
316 return depositWorkItem(reqID, wItem);
320 * Function: AddDelayRequest()
322 * Like AddIORequest(), but this time adding a delay request to
326 AddDelayRequest ( unsigned int msecs,
327 CompletionProc onCompletion)
329 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
330 unsigned int reqID = ioMan->requestID++;
331 if (!ioMan || !wItem) return FALSE;
333 /* Fill in the blanks */
334 wItem->workKind = WORKER_DELAY;
335 wItem->workData.delayData.msecs = msecs;
336 wItem->onCompletion = onCompletion;
337 wItem->requestID = reqID;
339 return depositWorkItem(reqID, wItem);
343 * Function: AddProcRequest()
345 * Add an asynchronous procedure request.
348 AddProcRequest ( void* proc,
350 CompletionProc onCompletion)
352 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
353 unsigned int reqID = ioMan->requestID++;
354 if (!ioMan || !wItem) return FALSE;
356 /* Fill in the blanks */
357 wItem->workKind = WORKER_DO_PROC;
358 wItem->workData.procData.proc = proc;
359 wItem->workData.procData.param = param;
360 wItem->onCompletion = onCompletion;
361 wItem->requestID = reqID;
363 return depositWorkItem(reqID, wItem);
366 void ShutdownIOManager ( void )
368 SetEvent(ioMan->hExitEvent);
369 // ToDo: we can't free this now, because the worker thread(s)
370 // haven't necessarily finished with it yet. Perhaps it should
371 // have a reference count or something.