3 * Non-blocking / asynchronous I/O for Win32.
16 * Internal state maintained by the IO manager.
/* Bookkeeping record for the IO manager; a single instance is reached
 * through the file-level ioMan pointer. */
18 typedef struct IOManagerState {
/* ID assigned to the next queued request. It is set to 1 during
 * initialisation and post-incremented each time AddIORequest() or
 * AddDelayRequest() builds a work item. */
24 unsigned int requestID;
27 /* ToDo: wrap up this state via an IOManager handle instead? */
/* The one IO manager instance for this file. It is malloc()ed during
 * initialisation, NULL-checked by AddIORequest()/AddDelayRequest(), and
 * dereferenced without a check by ShutdownIOManager(). */
28 static IOManagerState* ioMan;
31 * The routine executed by each worker thread.
/*
 * IOWorkerProc: body of a worker thread.
 *
 * param - the IOManagerState this worker serves (cast back from PVOID).
 *
 * The worker blocks until either the manager's exit event or the work
 * queue's handle is signalled. On the exit event it reports shutdown; on
 * the queue handle it fetches one WorkItem and carries it out according
 * to the bits in workKind:
 *   WORKER_READ  - recv() for sockets, read() otherwise
 *   WORKER_WRITE - send() for sockets, write() otherwise
 *   WORKER_DELAY - a delay request
 * Any other kind is reported on stderr and ignored. The item's
 * onCompletion callback is then called, starting with its requestID.
 */
36 IOWorkerProc(PVOID param)
40 IOManagerState* iom = (IOManagerState*)param;
41 WorkQueue* pq = iom->workQueue;
/* Index 0 is the exit event and index 1 the work queue, so the wait
 * result below identifies which of the two fired. */
46 hWaits[0] = (HANDLE)iom->hExitEvent;
47 hWaits[1] = GetWorkQueueHandle(pq);
50 /* The error code is communicated back on completion of request; reset. */
/* NOTE(review): presumably this locked section and the one after the wait
 * adjust the idle-worker count that AddIORequest() reads under the same
 * lock - confirm against the full source. */
53 EnterCriticalSection(&iom->manLock);
55 LeaveCriticalSection(&iom->manLock);
/* bWaitAll is FALSE: return as soon as either handle is signalled, with
 * no timeout. */
57 rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
59 EnterCriticalSection(&iom->manLock);
61 LeaveCriticalSection(&iom->manLock);
/* WAIT_OBJECT_0 corresponds to hWaits[0], the exit event. */
63 if ( WAIT_OBJECT_0 == rc ) {
66 fprintf(stderr, "shutting down...\n"); fflush(stderr);
/* WAIT_OBJECT_0 + 1 corresponds to hWaits[1], the work queue handle. */
69 } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
70 /* work item available, fetch it. */
72 fprintf(stderr, "work available...\n"); fflush(stderr);
74 if (FetchWork(pq,(void**)&work)) {
75 if ( work->workKind & WORKER_READ ) {
/* Sockets go through Winsock, whose failures are reported by
 * WSAGetLastError(); other descriptors use the CRT call and errno. */
76 if ( work->workKind & WORKER_FOR_SOCKET ) {
77 len = recv(work->fd, work->buf, work->len, 0);
78 if (len == SOCKET_ERROR) {
79 errCode = WSAGetLastError();
82 len = read(work->fd, work->buf, work->len);
83 if (len == -1) { errCode = errno; }
85 } else if ( work->workKind & WORKER_WRITE ) {
/* Same socket / non-socket split as the read case. */
86 if ( work->workKind & WORKER_FOR_SOCKET ) {
87 len = send(work->fd, work->buf, work->len, 0);
88 if (len == SOCKET_ERROR) {
89 errCode = WSAGetLastError();
92 len = write(work->fd,work->buf, work->len);
93 if (len == -1) { errCode = errno; }
95 } else if ( work->workKind & WORKER_DELAY ) {
96 /* very approximate implementation of threadDelay */
101 fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
/* Report the outcome to whoever queued the request. */
105 work->onCompletion(work->requestID,
111 /* Free the WorkItem */
114 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
118 fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
/*
 * NewIOWorkerThread: start one more worker thread for the manager `iom`.
 *
 * Returns non-zero when _beginthreadex() returned a non-zero value, i.e.
 * the thread was created; _beginthreadex() returns 0 on failure.
 *
 * NOTE(review): the value returned by _beginthreadex() is only compared
 * against 0 here, so the thread handle is not kept and cannot be closed
 * later - confirm whether that is intended.
 */
127 NewIOWorkerThread(IOManagerState* iom)
129 return ( 0 != _beginthreadex(NULL,
/* Without a work queue there is nothing to manage; report failure. */
144 if ( !wq ) return FALSE;
/* Allocate the manager state and publish it through the file-level
 * ioMan pointer. */
146 ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
153 /* A manual-reset event */
/* Arguments: default security, manual reset (TRUE), initially
 * non-signalled (FALSE), unnamed. ShutdownIOManager() signals it with
 * SetEvent(). */
154 hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
161 ioMan->hExitEvent = hExit;
162 InitializeCriticalSection(&ioMan->manLock);
163 ioMan->workQueue = wq;
/* No worker threads exist yet. */
164 ioMan->numWorkers = 0;
165 ioMan->workersIdle = 0;
/* IDs start at 1, so the 0 that AddIORequest() returns when it cannot
 * allocate a work item is never a valid request ID. */
166 ioMan->requestID = 1;
172 * Function: AddIORequest()
174 * Conduit to underlying WorkQueue's SubmitWork(); adds IO
175 * request to work queue, returning without blocking.
/*
 * AddIORequest: queue a read or write on `fd` and return without waiting
 * for it to complete.
 *
 * The request is described by a freshly malloc()ed WorkItem. Its workKind
 * combines WORKER_FOR_SOCKET (when isSocket is set) with WORKER_WRITE or
 * WORKER_READ (chosen by forWriting). onCompletion is stored in the item
 * for the worker to call when the request finishes.
 *
 * Returns the request ID when SubmitWork() accepts the item, and 0 when
 * the manager is not initialised or the WorkItem cannot be allocated.
 *
 * If no worker is idle at submission time, a new worker thread is started
 * first; the result of NewIOWorkerThread() is not checked.
 *
 * NOTE(review): the final `return wItem->requestID` reads the item after
 * SubmitWork() has handed it to the queue. IOWorkerProc is commented as
 * freeing the WorkItem once the request completes, so a fast worker could
 * free it first - consider copying the ID into a local before submitting.
 */
178 AddIORequest ( int fd,
184 CompletionProc onCompletion)
186 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
/* NOTE(review): when ioMan is NULL but the allocation succeeded, wItem is
 * not freed on this early return. */
187 if (!ioMan || !wItem) return 0;
189 /* Fill in the blanks */
191 wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
192 ( forWriting ? WORKER_WRITE : WORKER_READ );
196 wItem->onCompletion = onCompletion;
/* NOTE(review): this increment appears to run before manLock is taken,
 * so two threads submitting at once could receive the same ID - confirm
 * whether callers are serialised. */
197 wItem->requestID = ioMan->requestID++;
199 EnterCriticalSection(&ioMan->manLock);
200 /* If there are no worker threads available, create one.
202 * If this turns out to be too aggressive a policy, refine.
205 fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); fflush(stderr);
207 if ( ioMan->workersIdle == 0 ) {
209 NewIOWorkerThread(ioMan);
211 LeaveCriticalSection(&ioMan->manLock);
213 if (SubmitWork(ioMan->workQueue,wItem)) {
214 return wItem->requestID;
221 * Function: AddDelayRequest()
/*
 * AddDelayRequest: queue a WORKER_DELAY request and return without
 * waiting for it to complete.
 *
 * msecs        - length of the delay (assumed milliseconds from the
 *                name - TODO confirm how the worker interprets it).
 * onCompletion - stored in the WorkItem for the worker to call when the
 *                request finishes.
 *
 * Returns the request ID when SubmitWork() accepts the item, and FALSE
 * when the manager is not initialised or the WorkItem cannot be
 * allocated.
 */
225 AddDelayRequest ( unsigned int msecs,
227 CompletionProc onCompletion)
229 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
/* NOTE(review): when ioMan is NULL but the allocation succeeded, wItem is
 * not freed on this early return. */
230 if (!ioMan || !wItem) return FALSE;
232 /* Fill in the blanks */
234 wItem->workKind = WORKER_DELAY;
238 wItem->onCompletion = onCompletion;
/* NOTE(review): as in AddIORequest(), this increment appears to run
 * before manLock is taken - confirm whether callers are serialised. */
239 wItem->requestID = ioMan->requestID++;
241 EnterCriticalSection(&ioMan->manLock);
/* Start another worker when none is idle; the result of
 * NewIOWorkerThread() is not checked. */
242 if ( ioMan->workersIdle == 0 ) {
244 NewIOWorkerThread(ioMan);
246 LeaveCriticalSection(&ioMan->manLock);
/* NOTE(review): wItem is read after it has been handed to the queue.
 * IOWorkerProc is commented as freeing the WorkItem after completion, so
 * consider copying the ID into a local before submitting. */
248 if (SubmitWork(ioMan->workQueue,wItem)) {
249 return wItem->requestID;
255 void ShutdownIOManager()
257 SetEvent(ioMan->hExitEvent);