[project @ 2003-09-11 00:58:02 by sof]
[ghc-hetmet.git] / ghc / rts / win32 / IOManager.c
1 /* IOManager.c
2  *
3  * Non-blocking / asynchronous I/O for Win32.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "IOManager.h"
8 #include "WorkQueue.h"
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <io.h>
12 #include <winsock.h>
13 #include <process.h>
14
15 /*
16  * Internal state maintained by the IO manager.
17  */
18 typedef struct IOManagerState {
19   CritSection      manLock;
20   WorkQueue*       workQueue;
21   int              numWorkers;
22   int              workersIdle;
23   HANDLE           hExitEvent;
24   unsigned int     requestID;
25 } IOManagerState;
26
27 /* ToDo: wrap up this state via a IOManager handle instead? */
28 static IOManagerState* ioMan;
29
30 /*
31  * The routine executed by each worker thread.
32  */
33 static
34 unsigned
35 WINAPI
36 IOWorkerProc(PVOID param)
37 {
38   HANDLE  hWaits[2];
39   DWORD   rc;
40   IOManagerState* iom = (IOManagerState*)param;
41   WorkQueue* pq = iom->workQueue;
42   WorkItem*  work;
43   int        len = 0, fd = 0;
44   DWORD      errCode;
45   void*      complData;
46
47   hWaits[0] = (HANDLE)iom->hExitEvent;
48   hWaits[1] = GetWorkQueueHandle(pq);
49   
50   while (1) {
51     /* The error code is communicated back on completion of request; reset. */
52     errCode = 0;
53
54     EnterCriticalSection(&iom->manLock);
55     iom->workersIdle++;
56     LeaveCriticalSection(&iom->manLock);
57
58     rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
59
60     EnterCriticalSection(&iom->manLock);
61     iom->workersIdle--;
62     LeaveCriticalSection(&iom->manLock);
63     
64     if ( WAIT_OBJECT_0 == rc ) {
65       /* shutdown */
66 #if 0
67       fprintf(stderr, "shutting down...\n"); fflush(stderr);
68 #endif
69       return 0;
70     } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
71       /* work item available, fetch it. */
72 #if 0
73       fprintf(stderr, "work available...\n"); fflush(stderr);
74 #endif
75       if (FetchWork(pq,(void**)&work)) {
76         if ( work->workKind & WORKER_READ ) {
77           if ( work->workKind & WORKER_FOR_SOCKET ) {
78             len = recv(work->workData.ioData.fd, 
79                        work->workData.ioData.buf,
80                        work->workData.ioData.len,
81                        0);
82             if (len == SOCKET_ERROR) {
83               errCode = WSAGetLastError();
84             }
85           } else {
86             len = read(work->workData.ioData.fd,
87                        work->workData.ioData.buf,
88                        work->workData.ioData.len);
89             if (len == -1) { errCode = errno; }
90           }
91           complData = work->workData.ioData.buf;
92           fd = work->workData.ioData.fd;
93         } else if ( work->workKind & WORKER_WRITE ) {
94           if ( work->workKind & WORKER_FOR_SOCKET ) {
95             len = send(work->workData.ioData.fd,
96                        work->workData.ioData.buf,
97                        work->workData.ioData.len,
98                        0);
99             if (len == SOCKET_ERROR) {
100               errCode = WSAGetLastError();
101             }
102           } else {
103             len = write(work->workData.ioData.fd,
104                         work->workData.ioData.buf,
105                         work->workData.ioData.len);
106             if (len == -1) { errCode = errno; }
107           }
108           complData = work->workData.ioData.buf;
109           fd = work->workData.ioData.fd;
110         } else if ( work->workKind & WORKER_DELAY ) {
111           /* very approximate implementation of threadDelay */
112           Sleep(work->workData.delayData.msecs);
113           len = work->workData.delayData.msecs;
114           complData = NULL;
115           fd = 0;
116           errCode = 0;
117         } else if ( work->workKind & WORKER_DO_PROC ) {
118             /* perform operation/proc on behalf of Haskell thread. */
119             if (work->workData.procData.proc) {
120                 /* The procedure is assumed to encode result + success/failure
121                  * via its param.
122                  */
123                 errCode=work->workData.procData.proc(work->workData.procData.param);
124             } else {
125                 errCode=1;
126             }
127             complData = work->workData.procData.param;
128         } else {
129           fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
130           fflush(stderr);
131           continue;
132         }
133         work->onCompletion(work->requestID,
134                            fd,
135                            len,
136                            complData,
137                            errCode);
138         /* Free the WorkItem */
139         free(work);
140       } else {
141           fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
142           return 1;
143       }
144     } else {
145           fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
146           return 1;
147     }
148   }
149   return 0;
150 }
151
152 static 
153 BOOL
154 NewIOWorkerThread(IOManagerState* iom)
155 {
156   unsigned threadId;
157   return ( 0 != _beginthreadex(NULL,
158                                0,
159                                IOWorkerProc,
160                                (LPVOID)iom,
161                                0,
162                                &threadId) );
163 }
164
165 BOOL
166 StartIOManager(void)
167 {
168   HANDLE hExit;
169   WorkQueue* wq;
170
171   wq = NewWorkQueue();
172   if ( !wq ) return FALSE;  
173   
174   ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
175   
176   if (!ioMan) {
177     FreeWorkQueue(wq);
178     return FALSE;
179   }
180
181   /* A manual-reset event */
182   hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
183   if ( !hExit ) {
184     FreeWorkQueue(wq);
185     free(ioMan);
186     return FALSE;
187   }
188   
189   ioMan->hExitEvent = hExit;
190   InitializeCriticalSection(&ioMan->manLock);
191   ioMan->workQueue   = wq;
192   ioMan->numWorkers  = 0;
193   ioMan->workersIdle = 0;
194   ioMan->requestID   = 1;
195  
196   return TRUE;
197 }
198
199 /*
200  * Function: AddIORequest()
201  *
202  * Conduit to underlying WorkQueue's SubmitWork(); adds IO
203  * request to work queue, returning without blocking.
204  */
205 int
206 AddIORequest ( int   fd,
207                BOOL  forWriting,
208                BOOL  isSocket,
209                int   len,
210                char* buffer,
211                CompletionProc onCompletion)
212 {
213   WorkItem* wItem    = (WorkItem*)malloc(sizeof(WorkItem));
214   unsigned int reqID = ioMan->requestID++;
215   if (!ioMan || !wItem) return 0;
216   
217   /* Fill in the blanks */
218   wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
219                         ( forWriting ? WORKER_WRITE : WORKER_READ );
220   wItem->workData.ioData.fd  = fd;
221   wItem->workData.ioData.len = len;
222   wItem->workData.ioData.buf = buffer;
223
224   wItem->onCompletion        = onCompletion;
225   wItem->requestID           = reqID;
226   
227   EnterCriticalSection(&ioMan->manLock);
228   /* If there are no worker threads available, create one.
229    *
230    * If this turns out to be too aggressive a policy, refine.
231    */
232 #if 0
233   fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); fflush(stderr);
234 #endif
235   if ( ioMan->workersIdle == 0 ) {
236     ioMan->numWorkers++;
237     LeaveCriticalSection(&ioMan->manLock);
238     NewIOWorkerThread(ioMan);
239   } else {
240       LeaveCriticalSection(&ioMan->manLock);
241   }
242   
243   if (SubmitWork(ioMan->workQueue,wItem)) {
244       /* Note: the work item has potentially been consumed by a worker thread
245        *       (and freed) at this point, so we cannot use wItem's requestID.
246        */
247       return reqID;
248   } else {
249       return 0;
250   }
251 }              
252
253 /*
254  * Function: AddDelayRequest()
255  *
256  */
257 BOOL
258 AddDelayRequest ( unsigned int   msecs,
259                   CompletionProc onCompletion)
260 {
261   WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
262   unsigned int reqID = ioMan->requestID++;
263   if (!ioMan || !wItem) return FALSE;
264   
265   /* Fill in the blanks */
266   wItem->workKind     = WORKER_DELAY;
267   wItem->workData.delayData.msecs = msecs;
268   wItem->onCompletion = onCompletion;
269   wItem->requestID    = reqID;
270
271   EnterCriticalSection(&ioMan->manLock);
272 #if 0
273   fprintf(stderr, "AddDelayRequest: %d\n", ioMan->workersIdle); fflush(stderr);
274 #endif
275   if ( ioMan->workersIdle == 0 ) {
276     ioMan->numWorkers++;
277     LeaveCriticalSection(&ioMan->manLock);
278     NewIOWorkerThread(ioMan);
279   } else {
280       LeaveCriticalSection(&ioMan->manLock);
281   }
282   
283   if (SubmitWork(ioMan->workQueue,wItem)) {
284       /* See AddIORequest() comment */
285       return reqID;
286   } else {
287       return 0;
288   }
289 }
290
291 /*
292  * Function: AddDelayRequest()
293  *
294  */
295 BOOL
296 AddProcRequest ( void* proc,
297                  void* param,
298                  CompletionProc onCompletion)
299 {
300   WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
301   unsigned int reqID = ioMan->requestID++;
302   if (!ioMan || !wItem) return FALSE;
303   
304   /* Fill in the blanks */
305   wItem->workKind     = WORKER_DO_PROC;
306   wItem->workData.procData.proc  = proc;
307   wItem->workData.procData.param = param;
308   wItem->onCompletion = onCompletion;
309   wItem->requestID    = reqID;
310
311   EnterCriticalSection(&ioMan->manLock);
312 #if 0
313   fprintf(stderr, "AddProcRequest: %d\n", ioMan->workersIdle); fflush(stderr);
314 #endif
315   if ( ioMan->workersIdle == 0 ) {
316     ioMan->numWorkers++;
317     LeaveCriticalSection(&ioMan->manLock);
318     NewIOWorkerThread(ioMan);
319   } else {
320       LeaveCriticalSection(&ioMan->manLock);
321   }
322   
323   if (SubmitWork(ioMan->workQueue,wItem)) {
324       /* See AddIORequest() comment */
325       return reqID;
326   } else {
327       return 0;
328   }
329 }
330
331 void ShutdownIOManager()
332 {
333   SetEvent(ioMan->hExitEvent);
334   free(ioMan);
335   ioMan = NULL;
336 }