42eba0092c0281c438118b7e82889b7ca6f3ef71
[ghc-hetmet.git] / ghc / rts / win32 / IOManager.c
1 /* IOManager.c
2  *
3  * Non-blocking / asynchronous I/O for Win32.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "IOManager.h"
8 #include "WorkQueue.h"
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <io.h>
12 #include <winsock.h>
13 #include <process.h>
14
15 /*
16  * Internal state maintained by the IO manager.
17  */
18 typedef struct IOManagerState {
19   CritSection      manLock;
20   WorkQueue*       workQueue;
21   int              numWorkers;
22   int              workersIdle;
23   HANDLE           hExitEvent;
24   unsigned int     requestID;
25 } IOManagerState;
26
27 /* ToDo: wrap up this state via a IOManager handle instead? */
28 static IOManagerState* ioMan;
29
30 /*
31  * The routine executed by each worker thread.
32  */
33 static
34 unsigned
35 WINAPI
36 IOWorkerProc(PVOID param)
37 {
38   HANDLE  hWaits[2];
39   DWORD   rc;
40   IOManagerState* iom = (IOManagerState*)param;
41   WorkQueue* pq = iom->workQueue;
42   WorkItem*  work;
43   int        len = 0, fd = 0;
44   DWORD      errCode;
45   void*      complData;
46
47   hWaits[0] = (HANDLE)iom->hExitEvent;
48   hWaits[1] = GetWorkQueueHandle(pq);
49   
50   while (1) {
51     /* The error code is communicated back on completion of request; reset. */
52     errCode = 0;
53
54     EnterCriticalSection(&iom->manLock);
55     iom->workersIdle++;
56     LeaveCriticalSection(&iom->manLock);
57
58     rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
59
60     EnterCriticalSection(&iom->manLock);
61     iom->workersIdle--;
62     LeaveCriticalSection(&iom->manLock);
63     
64     if ( WAIT_OBJECT_0 == rc ) {
65       /* shutdown */
66 #if 0
67       fprintf(stderr, "shutting down...\n"); fflush(stderr);
68 #endif
69       return 0;
70     } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
71       /* work item available, fetch it. */
72 #if 0
73       fprintf(stderr, "work available...\n"); fflush(stderr);
74 #endif
75       if (FetchWork(pq,(void**)&work)) {
76         if ( work->workKind & WORKER_READ ) {
77           if ( work->workKind & WORKER_FOR_SOCKET ) {
78             len = recv(work->workData.ioData.fd, 
79                        work->workData.ioData.buf,
80                        work->workData.ioData.len,
81                        0);
82             if (len == SOCKET_ERROR) {
83               errCode = WSAGetLastError();
84             }
85           } else {
86             len = read(work->workData.ioData.fd,
87                        work->workData.ioData.buf,
88                        work->workData.ioData.len);
89             if (len == -1) { errCode = errno; }
90           }
91           complData = work->workData.ioData.buf;
92           fd = work->workData.ioData.fd;
93         } else if ( work->workKind & WORKER_WRITE ) {
94           if ( work->workKind & WORKER_FOR_SOCKET ) {
95             len = send(work->workData.ioData.fd,
96                        work->workData.ioData.buf,
97                        work->workData.ioData.len,
98                        0);
99             if (len == SOCKET_ERROR) {
100               errCode = WSAGetLastError();
101             }
102           } else {
103             len = write(work->workData.ioData.fd,
104                         work->workData.ioData.buf,
105                         work->workData.ioData.len);
106             if (len == -1) { errCode = errno; }
107           }
108           complData = work->workData.ioData.buf;
109           fd = work->workData.ioData.fd;
110         } else if ( work->workKind & WORKER_DELAY ) {
111           /* very approximate implementation of threadDelay */
112           Sleep(work->workData.delayData.msecs);
113           len = work->workData.delayData.msecs;
114           complData = NULL;
115           fd = 0;
116           errCode = 0;
117         } else if ( work->workKind & WORKER_DO_PROC ) {
118             /* perform operation/proc on behalf of Haskell thread. */
119             if (work->workData.procData.proc) {
120                 /* The procedure is assumed to encode result + success/failure
121                  * via its param.
122                  */
123                 work->workData.procData.proc(work->workData.procData.param);
124                 errCode=0;
125             } else {
126                 errCode=1;
127             }
128             complData = work->workData.procData.param;
129         } else {
130           fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
131           fflush(stderr);
132           continue;
133         }
134         work->onCompletion(work->requestID,
135                            fd,
136                            len,
137                            complData,
138                            errCode);
139         /* Free the WorkItem */
140         free(work);
141       } else {
142           fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
143           return 1;
144       }
145     } else {
146           fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
147           return 1;
148     }
149   }
150   return 0;
151 }
152
153 static 
154 BOOL
155 NewIOWorkerThread(IOManagerState* iom)
156 {
157   unsigned threadId;
158   return ( 0 != _beginthreadex(NULL,
159                                0,
160                                IOWorkerProc,
161                                (LPVOID)iom,
162                                0,
163                                &threadId) );
164 }
165
166 BOOL
167 StartIOManager(void)
168 {
169   HANDLE hExit;
170   WorkQueue* wq;
171
172   wq = NewWorkQueue();
173   if ( !wq ) return FALSE;  
174   
175   ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
176   
177   if (!ioMan) {
178     FreeWorkQueue(wq);
179     return FALSE;
180   }
181
182   /* A manual-reset event */
183   hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
184   if ( !hExit ) {
185     FreeWorkQueue(wq);
186     free(ioMan);
187     return FALSE;
188   }
189   
190   ioMan->hExitEvent = hExit;
191   InitializeCriticalSection(&ioMan->manLock);
192   ioMan->workQueue   = wq;
193   ioMan->numWorkers  = 0;
194   ioMan->workersIdle = 0;
195   ioMan->requestID   = 1;
196  
197   return TRUE;
198 }
199
200 /*
201  * Function: AddIORequest()
202  *
203  * Conduit to underlying WorkQueue's SubmitWork(); adds IO
204  * request to work queue, returning without blocking.
205  */
206 int
207 AddIORequest ( int   fd,
208                BOOL  forWriting,
209                BOOL  isSocket,
210                int   len,
211                char* buffer,
212                CompletionProc onCompletion)
213 {
214   WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
215   if (!ioMan || !wItem) return 0;
216   
217   /* Fill in the blanks */
218   wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
219                         ( forWriting ? WORKER_WRITE : WORKER_READ );
220   wItem->workData.ioData.fd  = fd;
221   wItem->workData.ioData.len = len;
222   wItem->workData.ioData.buf = buffer;
223
224   wItem->onCompletion        = onCompletion;
225   wItem->requestID           = ioMan->requestID++;
226   
227   EnterCriticalSection(&ioMan->manLock);
228   /* If there are no worker threads available, create one.
229    *
230    * If this turns out to be too aggressive a policy, refine.
231    */
232 #if 0
233   fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); fflush(stderr);
234 #endif
235   if ( ioMan->workersIdle == 0 ) {
236     ioMan->numWorkers++;
237     NewIOWorkerThread(ioMan);
238   }
239   LeaveCriticalSection(&ioMan->manLock);
240   
241   if (SubmitWork(ioMan->workQueue,wItem)) {
242     return wItem->requestID;
243   } else {
244     return 0;
245   }
246 }              
247
248 /*
249  * Function: AddDelayRequest()
250  *
251  */
252 BOOL
253 AddDelayRequest ( unsigned int   msecs,
254                   CompletionProc onCompletion)
255 {
256   WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
257   if (!ioMan || !wItem) return FALSE;
258   
259   /* Fill in the blanks */
260   wItem->workKind     = WORKER_DELAY;
261   wItem->workData.delayData.msecs = msecs;
262   wItem->onCompletion = onCompletion;
263   wItem->requestID    = ioMan->requestID++;
264
265   EnterCriticalSection(&ioMan->manLock);
266   if ( ioMan->workersIdle == 0 ) {
267     ioMan->numWorkers++;
268     NewIOWorkerThread(ioMan);
269   }
270   LeaveCriticalSection(&ioMan->manLock);
271   
272   if (SubmitWork(ioMan->workQueue,wItem)) {
273     return wItem->requestID;
274   } else {
275     return 0;
276   }
277 }
278
279 /*
280  * Function: AddDelayRequest()
281  *
282  */
283 BOOL
284 AddProcRequest ( void* proc,
285                  void* param,
286                  CompletionProc onCompletion)
287 {
288   WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
289   if (!ioMan || !wItem) return FALSE;
290   
291   /* Fill in the blanks */
292   wItem->workKind     = WORKER_DO_PROC;
293   wItem->workData.procData.proc  = proc;
294   wItem->workData.procData.param = param;
295   wItem->onCompletion = onCompletion;
296   wItem->requestID    = ioMan->requestID++;
297
298   EnterCriticalSection(&ioMan->manLock);
299   if ( ioMan->workersIdle == 0 ) {
300     ioMan->numWorkers++;
301     NewIOWorkerThread(ioMan);
302   }
303   LeaveCriticalSection(&ioMan->manLock);
304   
305   if (SubmitWork(ioMan->workQueue,wItem)) {
306     return wItem->requestID;
307   } else {
308     return 0;
309   }
310 }
311
312 void ShutdownIOManager()
313 {
314   SetEvent(ioMan->hExitEvent);
315   free(ioMan);
316   ioMan = NULL;
317 }