3 * Non-blocking / asynchronous I/O for Win32.
16 * Internal state maintained by the IO manager.
/* Bookkeeping shared between the request-submitting routines and the worker
 * threads. Besides requestID, code in this file also accesses members named
 * hExitEvent, manLock, workQueue, numWorkers and workersIdle. */
18 typedef struct IOManagerState {
/* Next request identifier to hand out: set to 1 when the manager is
 * initialised and post-incremented each time a request is added. */
24 unsigned int requestID;
27 /* ToDo: wrap up this state via an IOManager handle instead? */
/* The single manager instance for this file: assigned from malloc() where the
 * manager is initialised, and read by the request-adding and shutdown
 * routines. */
28 static IOManagerState* ioMan;
31 * The routine executed by each worker thread.
/* Worker thread body. `param` is the IOManagerState passed at thread creation.
 * The thread waits on two handles - the manager's exit event and the work
 * queue's handle - and, when work is available, fetches a WorkItem, performs
 * the requested operation and reports the outcome through the item's
 * onCompletion callback.
 * NOTE(review): the enclosing loop and several statements are not visible
 * here; confirm the control flow against the full source. */
36 IOWorkerProc(PVOID param)
40 IOManagerState* iom = (IOManagerState*)param;
41 WorkQueue* pq = iom->workQueue;
/* Slot 0 is the shutdown signal, slot 1 the "work available" signal; the
 * result of the wait below is compared against these indices. */
46 hWaits[0] = (HANDLE)iom->hExitEvent;
47 hWaits[1] = GetWorkQueueHandle(pq);
50 /* The error code is communicated back on completion of request; reset. */
/* NOTE(review): the statements guarded by manLock before and after the wait
 * are not visible; presumably they maintain workersIdle - confirm. */
53 EnterCriticalSection(&iom->manLock);
55 LeaveCriticalSection(&iom->manLock);
/* bWaitAll is FALSE, so the call returns as soon as either handle is
 * signalled; INFINITE means there is no timeout. */
57 rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
59 EnterCriticalSection(&iom->manLock);
61 LeaveCriticalSection(&iom->manLock);
/* WAIT_OBJECT_0 corresponds to hWaits[0], i.e. the exit event. */
63 if ( WAIT_OBJECT_0 == rc ) {
66 fprintf(stderr, "shutting down...\n"); fflush(stderr);
/* WAIT_OBJECT_0 + 1 corresponds to hWaits[1], i.e. the work queue handle. */
69 } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
70 /* work item available, fetch it. */
72 fprintf(stderr, "work available...\n"); fflush(stderr);
74 if (FetchWork(pq,(void**)&work)) {
/* workKind is a bit set: the READ, WRITE and DELAY bits are tested in that
 * order, and WORKER_FOR_SOCKET selects the socket calls (recv/send) over the
 * file-descriptor calls (read/write). */
75 if ( work->workKind & WORKER_READ ) {
76 if ( work->workKind & WORKER_FOR_SOCKET ) {
77 len = recv(work->fd, work->buf, work->len, 0);
/* Socket failures are reported via WSAGetLastError(), not errno. */
78 if (len == SOCKET_ERROR) {
79 errCode = WSAGetLastError();
82 len = read(work->fd, work->buf, work->len);
83 if (len == -1) { errCode = errno; }
85 } else if ( work->workKind & WORKER_WRITE ) {
86 if ( work->workKind & WORKER_FOR_SOCKET ) {
87 len = send(work->fd, work->buf, work->len, 0);
88 if (len == SOCKET_ERROR) {
89 errCode = WSAGetLastError();
92 len = write(work->fd,work->buf, work->len);
93 if (len == -1) { errCode = errno; }
95 } else if ( work->workKind & WORKER_DELAY ) {
96 /* very approximate implementation of threadDelay */
101 fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
/* The callback receives the request's ID as its first argument; the remaining
 * arguments are not visible here. */
105 work->onCompletion(work->requestID,
111 /* Free the WorkItem */
114 fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
118 fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
/* Starts one more worker thread for the given manager. The result is non-zero
 * when _beginthreadex() returns a non-zero value (it returns 0 on failure).
 * NOTE(review): the value returned by _beginthreadex() is consumed by the
 * comparison and not stored, so the thread handle cannot be closed later -
 * confirm whether that is intended. */
127 NewIOWorkerThread(IOManagerState* iom)
129 return ( 0 != _beginthreadex(NULL,
135 //CreateThread( NULL, 0, IOWorkerProc, (LPVOID)iom, 0, NULL));
/* A null work queue is reported to the caller as failure. */
145 if ( !wq ) return FALSE;
147 ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
154 /* A manual-reset event */
/* Arguments: default security attributes, manual reset, initially
 * non-signalled, unnamed. A manual-reset event stays signalled after
 * SetEvent() until it is explicitly reset, so every thread waiting on it can
 * observe the shutdown request. */
155 hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
162 ioMan->hExitEvent = hExit;
163 InitializeCriticalSection(&ioMan->manLock);
164 ioMan->workQueue = wq;
/* No workers exist yet; the request-adding routines start one when they see
 * workersIdle == 0. */
165 ioMan->numWorkers = 0;
166 ioMan->workersIdle = 0;
/* Request IDs start at 1. */
167 ioMan->requestID = 1;
173 * Function: AddIORequest()
175 * Conduit to underlying WorkQueue's SubmitWork(); adds IO
176 * request to work queue, returning without blocking.
/* Queues a read or write request on `fd` and returns without waiting for it.
 * A WorkItem is allocated and filled in, a worker thread is started when no
 * worker is idle, and the item is handed to SubmitWork(). Returns 0 when the
 * manager has not been set up or the allocation fails, and the request's ID
 * when SubmitWork() succeeds; the result on SubmitWork() failure is not
 * visible here.
 * NOTE(review): the result of NewIOWorkerThread() is ignored, so a failed
 * thread start goes unnoticed at this point.
 * NOTE(review): wItem->requestID is read after SubmitWork(); if a worker can
 * already have processed and released the item by then, that read touches
 * freed memory - confirm, and consider saving the ID in a local first. */
179 AddIORequest ( int fd,
185 CompletionProc onCompletion)
/* NOTE(review): the item is allocated before ioMan is checked, so when ioMan
 * is NULL and the allocation succeeded the item is not released before
 * returning. */
187 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
188 if (!ioMan || !wItem) return 0;
190 /* Fill in the blanks */
/* Combine the optional socket flag with exactly one of the WRITE / READ
 * flags. */
192 wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
193 ( forWriting ? WORKER_WRITE : WORKER_READ );
197 wItem->onCompletion = onCompletion;
/* NOTE(review): this increment happens before manLock is taken; if requests
 * can be added from more than one thread, two requests could get the same
 * ID - confirm the caller's threading. */
198 wItem->requestID = ioMan->requestID++;
200 EnterCriticalSection(&ioMan->manLock);
201 /* If there are no worker threads available, create one.
203 * If this turns out to be too aggressive a policy, refine.
206 fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); fflush(stderr);
208 if ( ioMan->workersIdle == 0 ) {
210 NewIOWorkerThread(ioMan);
212 LeaveCriticalSection(&ioMan->manLock);
214 if (SubmitWork(ioMan->workQueue,wItem)) {
215 return wItem->requestID;
222 * Function: AddDelayRequest()
/* Queues a delay request (workKind WORKER_DELAY) and returns without waiting
 * for it. Returns FALSE when the manager has not been set up or the
 * allocation fails, and the request's ID when SubmitWork() succeeds; the
 * result on SubmitWork() failure is not visible here.
 * NOTE(review): the result of NewIOWorkerThread() is ignored.
 * NOTE(review): wItem->requestID is read after SubmitWork(); if a worker can
 * already have processed and released the item by then, that read touches
 * freed memory - confirm. */
226 AddDelayRequest ( unsigned int msecs,
228 CompletionProc onCompletion)
/* NOTE(review): allocated before ioMan is checked, so the item is not
 * released when ioMan is NULL and the allocation succeeded. */
230 WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
231 if (!ioMan || !wItem) return FALSE;
233 /* Fill in the blanks */
235 wItem->workKind = WORKER_DELAY;
239 wItem->onCompletion = onCompletion;
/* NOTE(review): incremented before manLock is taken; confirm that requests
 * are only ever added from one thread. */
240 wItem->requestID = ioMan->requestID++;
/* Start a worker when none is idle, under the manager lock. */
242 EnterCriticalSection(&ioMan->manLock);
243 if ( ioMan->workersIdle == 0 ) {
245 NewIOWorkerThread(ioMan);
247 LeaveCriticalSection(&ioMan->manLock);
249 if (SubmitWork(ioMan->workQueue,wItem)) {
250 return wItem->requestID;
/* Requests shutdown by signalling the manager's exit event, which is the
 * first handle each worker thread waits on. Anything following the SetEvent()
 * call is not visible here. */
256 void ShutdownIOManager()
/* NOTE(review): ioMan is dereferenced with no visible NULL check; calling this
 * before the manager has been set up would dereference a null pointer -
 * confirm against callers. */
258 SetEvent(ioMan->hExitEvent);