[project @ 2003-07-16 17:40:38 by sof]
[ghc-hetmet.git] / ghc / rts / win32 / IOManager.c
1 /* IOManager.c
2  *
3  * Non-blocking / asynchronous I/O for Win32.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "IOManager.h"
8 #include "WorkQueue.h"
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <io.h>
12 #include <winsock.h>
13 #include <process.h>
14
15 /*
16  * Internal state maintained by the IO manager.
17  */
18 typedef struct IOManagerState {
19   CritSection      manLock;
20   WorkQueue*       workQueue;
21   int              numWorkers;
22   int              workersIdle;
23   HANDLE           hExitEvent;
24   unsigned int     requestID;
25 } IOManagerState;
26
27 /* ToDo: wrap up this state via a IOManager handle instead? */
28 static IOManagerState* ioMan;
29
30 /*
31  * The routine executed by each worker thread.
32  */
33 static
34 unsigned
35 WINAPI
36 IOWorkerProc(PVOID param)
37 {
38   HANDLE  hWaits[2];
39   DWORD   rc;
40   IOManagerState* iom = (IOManagerState*)param;
41   WorkQueue* pq = iom->workQueue;
42   WorkItem*  work;
43   int        len = 0, fd = 0;
44   DWORD      errCode;
45   void*      complData;
46
47   hWaits[0] = (HANDLE)iom->hExitEvent;
48   hWaits[1] = GetWorkQueueHandle(pq);
49   
50   while (1) {
51     /* The error code is communicated back on completion of request; reset. */
52     errCode = 0;
53
54     EnterCriticalSection(&iom->manLock);
55     iom->workersIdle++;
56     LeaveCriticalSection(&iom->manLock);
57
58     rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
59
60     EnterCriticalSection(&iom->manLock);
61     iom->workersIdle--;
62     LeaveCriticalSection(&iom->manLock);
63     
64     if ( WAIT_OBJECT_0 == rc ) {
65       /* shutdown */
66 #if 0
67       fprintf(stderr, "shutting down...\n"); fflush(stderr);
68 #endif
69       return 0;
70     } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
71       /* work item available, fetch it. */
72 #if 0
73       fprintf(stderr, "work available...\n"); fflush(stderr);
74 #endif
75       if (FetchWork(pq,(void**)&work)) {
76         if ( work->workKind & WORKER_READ ) {
77           if ( work->workKind & WORKER_FOR_SOCKET ) {
78             len = recv(work->workData.ioData.fd, 
79                        work->workData.ioData.buf,
80                        work->workData.ioData.len,
81                        0);
82             if (len == SOCKET_ERROR) {
83               errCode = WSAGetLastError();
84             }
85           } else {
86             len = read(work->workData.ioData.fd,
87                        work->workData.ioData.buf,
88                        work->workData.ioData.len);
89             if (len == -1) { errCode = errno; }
90           }
91           complData = work->workData.ioData.buf;
92           fd = work->workData.ioData.fd;
93         } else if ( work->workKind & WORKER_WRITE ) {
94           if ( work->workKind & WORKER_FOR_SOCKET ) {
95             len = send(work->workData.ioData.fd,
96                        work->workData.ioData.buf,
97                        work->workData.ioData.len,
98                        0);
99             if (len == SOCKET_ERROR) {
100               errCode = WSAGetLastError();
101             }
102           } else {
103             len = write(work->workData.ioData.fd,
104                         work->workData.ioData.buf,
105                         work->workData.ioData.len);
106             if (len == -1) { errCode = errno; }
107           }
108           complData = work->workData.ioData.buf;
109           fd = work->workData.ioData.fd;
110         } else if ( work->workKind & WORKER_DELAY ) {
111           /* very approximate implementation of threadDelay */
112           Sleep(work->workData.delayData.msecs);
113           len = work->workData.delayData.msecs;
114           complData = NULL;
115           fd = 0;
116           errCode = 0;
117         } else if ( work->workKind & WORKER_DO_PROC ) {
118             /* perform operation/proc on behalf of Haskell thread. */
119             if (work->workData.procData.proc) {
120                 /* The procedure is assumed to encode result + success/failure
121                  * via its param.
122                  */
123                 errCode=work->workData.procData.proc(work->workData.procData.param);
124             } else {
125                 errCode=1;
126             }
127             complData = work->workData.procData.param;
128         } else {
129           fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
130           fflush(stderr);
131           continue;
132         }
133         work->onCompletion(work->requestID,
134                            fd,
135                            len,
136                            complData,
137                            errCode);
138         /* Free the WorkItem */
139         free(work);
140       } else {
141           fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
142           return 1;
143       }
144     } else {
145           fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
146           return 1;
147     }
148   }
149   return 0;
150 }
151
152 static 
153 BOOL
154 NewIOWorkerThread(IOManagerState* iom)
155 {
156   unsigned threadId;
157   return ( 0 != _beginthreadex(NULL,
158                                0,
159                                IOWorkerProc,
160                                (LPVOID)iom,
161                                0,
162                                &threadId) );
163 }
164
165 BOOL
166 StartIOManager(void)
167 {
168   HANDLE hExit;
169   WorkQueue* wq;
170
171   wq = NewWorkQueue();
172   if ( !wq ) return FALSE;  
173   
174   ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
175   
176   if (!ioMan) {
177     FreeWorkQueue(wq);
178     return FALSE;
179   }
180
181   /* A manual-reset event */
182   hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
183   if ( !hExit ) {
184     FreeWorkQueue(wq);
185     free(ioMan);
186     return FALSE;
187   }
188   
189   ioMan->hExitEvent = hExit;
190   InitializeCriticalSection(&ioMan->manLock);
191   ioMan->workQueue   = wq;
192   ioMan->numWorkers  = 0;
193   ioMan->workersIdle = 0;
194   ioMan->requestID   = 1;
195  
196   return TRUE;
197 }
198
199 /*
200  * Function: AddIORequest()
201  *
202  * Conduit to underlying WorkQueue's SubmitWork(); adds IO
203  * request to work queue, returning without blocking.
204  */
205 int
206 AddIORequest ( int   fd,
207                BOOL  forWriting,
208                BOOL  isSocket,
209                int   len,
210                char* buffer,
211                CompletionProc onCompletion)
212 {
213   WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
214   if (!ioMan || !wItem) return 0;
215   
216   /* Fill in the blanks */
217   wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
218                         ( forWriting ? WORKER_WRITE : WORKER_READ );
219   wItem->workData.ioData.fd  = fd;
220   wItem->workData.ioData.len = len;
221   wItem->workData.ioData.buf = buffer;
222
223   wItem->onCompletion        = onCompletion;
224   wItem->requestID           = ioMan->requestID++;
225   
226   EnterCriticalSection(&ioMan->manLock);
227   /* If there are no worker threads available, create one.
228    *
229    * If this turns out to be too aggressive a policy, refine.
230    */
231 #if 0
232   fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); fflush(stderr);
233 #endif
234   if ( ioMan->workersIdle == 0 ) {
235     ioMan->numWorkers++;
236     LeaveCriticalSection(&ioMan->manLock);
237     NewIOWorkerThread(ioMan);
238   } else {
239       LeaveCriticalSection(&ioMan->manLock);
240   }
241   
242   if (SubmitWork(ioMan->workQueue,wItem)) {
243     return wItem->requestID;
244   } else {
245     return 0;
246   }
247 }              
248
249 /*
250  * Function: AddDelayRequest()
251  *
252  */
253 BOOL
254 AddDelayRequest ( unsigned int   msecs,
255                   CompletionProc onCompletion)
256 {
257   WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
258   if (!ioMan || !wItem) return FALSE;
259   
260   /* Fill in the blanks */
261   wItem->workKind     = WORKER_DELAY;
262   wItem->workData.delayData.msecs = msecs;
263   wItem->onCompletion = onCompletion;
264   wItem->requestID    = ioMan->requestID++;
265
266   EnterCriticalSection(&ioMan->manLock);
267 #if 0
268   fprintf(stderr, "AddDelayRequest: %d\n", ioMan->workersIdle); fflush(stderr);
269 #endif
270   if ( ioMan->workersIdle == 0 ) {
271     ioMan->numWorkers++;
272     LeaveCriticalSection(&ioMan->manLock);
273     NewIOWorkerThread(ioMan);
274   } else {
275       LeaveCriticalSection(&ioMan->manLock);
276   }
277   
278   if (SubmitWork(ioMan->workQueue,wItem)) {
279     return wItem->requestID;
280   } else {
281     return 0;
282   }
283 }
284
285 /*
286  * Function: AddDelayRequest()
287  *
288  */
289 BOOL
290 AddProcRequest ( void* proc,
291                  void* param,
292                  CompletionProc onCompletion)
293 {
294   WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
295   if (!ioMan || !wItem) return FALSE;
296   
297   /* Fill in the blanks */
298   wItem->workKind     = WORKER_DO_PROC;
299   wItem->workData.procData.proc  = proc;
300   wItem->workData.procData.param = param;
301   wItem->onCompletion = onCompletion;
302   wItem->requestID    = ioMan->requestID++;
303
304   EnterCriticalSection(&ioMan->manLock);
305 #if 0
306   fprintf(stderr, "AddProcRequest: %d\n", ioMan->workersIdle); fflush(stderr);
307 #endif
308   if ( ioMan->workersIdle == 0 ) {
309     ioMan->numWorkers++;
310     LeaveCriticalSection(&ioMan->manLock);
311     NewIOWorkerThread(ioMan);
312   } else {
313       LeaveCriticalSection(&ioMan->manLock);
314   }
315   
316   if (SubmitWork(ioMan->workQueue,wItem)) {
317     return wItem->requestID;
318   } else {
319     return 0;
320   }
321 }
322
323 void ShutdownIOManager()
324 {
325   SetEvent(ioMan->hExitEvent);
326   free(ioMan);
327   ioMan = NULL;
328 }