[project @ 2003-02-21 05:34:12 by sof]
[ghc-hetmet.git] / ghc / rts / win32 / IOManager.c
1 /* IOManager.c
2  *
3  * Non-blocking / asynchronous I/O for Win32.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "IOManager.h"
8 #include "WorkQueue.h"
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <io.h>
12 #include <winsock.h>
13 #include <process.h>
14
15 /*
16  * Internal state maintained by the IO manager.
17  */
18 typedef struct IOManagerState {
19   CritSection      manLock;
20   WorkQueue*       workQueue;
21   int              numWorkers;
22   int              workersIdle;
23   HANDLE           hExitEvent;
24   unsigned int     requestID;
25 } IOManagerState;
26
27 /* ToDo: wrap up this state via a IOManager handle instead? */
28 static IOManagerState* ioMan;
29
30 /*
31  * The routine executed by each worker thread.
32  */
33 static
34 unsigned
35 WINAPI
36 IOWorkerProc(PVOID param)
37 {
38   HANDLE  hWaits[2];
39   DWORD   rc;
40   IOManagerState* iom = (IOManagerState*)param;
41   WorkQueue* pq = iom->workQueue;
42   WorkItem*  work;
43   int        len;
44   DWORD      errCode;
45
46   hWaits[0] = (HANDLE)iom->hExitEvent;
47   hWaits[1] = GetWorkQueueHandle(pq);
48   
49   while (1) {
50     /* The error code is communicated back on completion of request; reset. */
51     errCode = 0;
52
53     EnterCriticalSection(&iom->manLock);
54     iom->workersIdle++;
55     LeaveCriticalSection(&iom->manLock);
56
57     rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
58
59     EnterCriticalSection(&iom->manLock);
60     iom->workersIdle--;
61     LeaveCriticalSection(&iom->manLock);
62     
63     if ( WAIT_OBJECT_0 == rc ) {
64       /* shutdown */
65 #if 0
66       fprintf(stderr, "shutting down...\n"); fflush(stderr);
67 #endif
68       return 0;
69     } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
70       /* work item available, fetch it. */
71 #if 0
72       fprintf(stderr, "work available...\n"); fflush(stderr);
73 #endif
74       if (FetchWork(pq,(void**)&work)) {
75         if ( work->workKind & WORKER_READ ) {
76           if ( work->workKind & WORKER_FOR_SOCKET ) {
77             len = recv(work->fd, work->buf, work->len, 0);
78             if (len == SOCKET_ERROR) {
79               errCode = WSAGetLastError();
80             }
81           } else {
82             len = read(work->fd, work->buf, work->len);
83             if (len == -1) { errCode = errno; }
84           }
85         } else if ( work->workKind & WORKER_WRITE ) {
86           if ( work->workKind & WORKER_FOR_SOCKET ) {
87             len = send(work->fd, work->buf, work->len, 0);
88             if (len == SOCKET_ERROR) {
89               errCode = WSAGetLastError();
90             }
91           } else {
92             len = write(work->fd,work->buf, work->len);
93             if (len == -1) { errCode = errno; }
94           }
95         } else if ( work->workKind & WORKER_DELAY ) {
96           /* very approximate implementation of threadDelay */
97           Sleep(work->len);
98           len = work->len;
99           errCode = 0;
100         } else {
101           fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
102           fflush(stderr);
103           continue;
104         }
105         work->onCompletion(work->requestID,
106                            work->param,
107                            work->fd,
108                            len,
109                            work->buf,
110                            errCode);
111         /* Free the WorkItem */
112         free(work);
113       } else {
114           fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
115           return 1;
116       }
117     } else {
118           fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
119           return 1;
120     }
121   }
122   return 0;
123 }
124
125 static 
126 BOOL
127 NewIOWorkerThread(IOManagerState* iom)
128 {
129   return ( 0 != _beginthreadex(NULL,
130                                0,
131                                IOWorkerProc,
132                                (LPVOID)iom,
133                                0,
134                                NULL) );
135   //CreateThread( NULL, 0, IOWorkerProc, (LPVOID)iom, 0, NULL));
136 }
137
138 BOOL
139 StartIOManager(void)
140 {
141   HANDLE hExit;
142   WorkQueue* wq;
143
144   wq = NewWorkQueue();
145   if ( !wq ) return FALSE;  
146   
147   ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
148   
149   if (!ioMan) {
150     FreeWorkQueue(wq);
151     return FALSE;
152   }
153
154   /* A manual-reset event */
155   hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
156   if ( !hExit ) {
157     FreeWorkQueue(wq);
158     free(ioMan);
159     return FALSE;
160   }
161   
162   ioMan->hExitEvent = hExit;
163   InitializeCriticalSection(&ioMan->manLock);
164   ioMan->workQueue   = wq;
165   ioMan->numWorkers  = 0;
166   ioMan->workersIdle = 0;
167   ioMan->requestID   = 1;
168  
169   return TRUE;
170 }
171
172 /*
173  * Function: AddIORequest()
174  *
175  * Conduit to underlying WorkQueue's SubmitWork(); adds IO
176  * request to work queue, returning without blocking.
177  */
178 int
179 AddIORequest ( int   fd,
180                BOOL  forWriting,
181                BOOL  isSocket,
182                int   len,
183                char* buffer,
184                void* data,
185                CompletionProc onCompletion)
186 {
187   WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
188   if (!ioMan || !wItem) return 0;
189   
190   /* Fill in the blanks */
191   wItem->fd           = fd;
192   wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
193                         ( forWriting ? WORKER_WRITE : WORKER_READ );
194   wItem->len          = len;
195   wItem->buf          = buffer;
196   wItem->param        = data;
197   wItem->onCompletion = onCompletion;
198   wItem->requestID    = ioMan->requestID++;
199   
200   EnterCriticalSection(&ioMan->manLock);
201   /* If there are no worker threads available, create one.
202    *
203    * If this turns out to be too aggressive a policy, refine.
204    */
205 #if 0
206   fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); fflush(stderr);
207 #endif
208   if ( ioMan->workersIdle == 0 ) {
209     ioMan->numWorkers++;
210     NewIOWorkerThread(ioMan);
211   }
212   LeaveCriticalSection(&ioMan->manLock);
213   
214   if (SubmitWork(ioMan->workQueue,wItem)) {
215     return wItem->requestID;
216   } else {
217     return 0;
218   }
219 }              
220
221 /*
222  * Function: AddDelayRequest()
223  *
224  */
225 BOOL
226 AddDelayRequest ( unsigned int   msecs,
227                   void*          data,
228                   CompletionProc onCompletion)
229 {
230   WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
231   if (!ioMan || !wItem) return FALSE;
232   
233   /* Fill in the blanks */
234   wItem->fd           = 0;
235   wItem->workKind     = WORKER_DELAY;
236   wItem->len          = msecs;
237   wItem->buf          = 0;
238   wItem->param        = data;
239   wItem->onCompletion = onCompletion;
240   wItem->requestID    = ioMan->requestID++;
241
242   EnterCriticalSection(&ioMan->manLock);
243   if ( ioMan->workersIdle == 0 ) {
244     ioMan->numWorkers++;
245     NewIOWorkerThread(ioMan);
246   }
247   LeaveCriticalSection(&ioMan->manLock);
248   
249   if (SubmitWork(ioMan->workQueue,wItem)) {
250     return wItem->requestID;
251   } else {
252     return 0;
253   }
254 }
255
256 void ShutdownIOManager()
257 {
258   SetEvent(ioMan->hExitEvent);
259   free(ioMan);
260   ioMan = NULL;
261 }