[project @ 2003-02-22 04:51:50 by sof]
[ghc-hetmet.git] / ghc / rts / win32 / IOManager.c
1 /* IOManager.c
2  *
3  * Non-blocking / asynchronous I/O for Win32.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "IOManager.h"
8 #include "WorkQueue.h"
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <io.h>
12 #include <winsock.h>
13 #include <process.h>
14
15 /*
16  * Internal state maintained by the IO manager.
17  */
18 typedef struct IOManagerState {
19   CritSection      manLock;
20   WorkQueue*       workQueue;
21   int              numWorkers;
22   int              workersIdle;
23   HANDLE           hExitEvent;
24   unsigned int     requestID;
25 } IOManagerState;
26
27 /* ToDo: wrap up this state via a IOManager handle instead? */
28 static IOManagerState* ioMan;
29
30 /*
31  * The routine executed by each worker thread.
32  */
33 static
34 unsigned
35 WINAPI
36 IOWorkerProc(PVOID param)
37 {
38   HANDLE  hWaits[2];
39   DWORD   rc;
40   IOManagerState* iom = (IOManagerState*)param;
41   WorkQueue* pq = iom->workQueue;
42   WorkItem*  work;
43   int        len;
44   DWORD      errCode;
45
46   hWaits[0] = (HANDLE)iom->hExitEvent;
47   hWaits[1] = GetWorkQueueHandle(pq);
48   
49   while (1) {
50     /* The error code is communicated back on completion of request; reset. */
51     errCode = 0;
52
53     EnterCriticalSection(&iom->manLock);
54     iom->workersIdle++;
55     LeaveCriticalSection(&iom->manLock);
56
57     rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
58
59     EnterCriticalSection(&iom->manLock);
60     iom->workersIdle--;
61     LeaveCriticalSection(&iom->manLock);
62     
63     if ( WAIT_OBJECT_0 == rc ) {
64       /* shutdown */
65 #if 0
66       fprintf(stderr, "shutting down...\n"); fflush(stderr);
67 #endif
68       return 0;
69     } else if ( (WAIT_OBJECT_0 + 1) == rc ) {
70       /* work item available, fetch it. */
71 #if 0
72       fprintf(stderr, "work available...\n"); fflush(stderr);
73 #endif
74       if (FetchWork(pq,(void**)&work)) {
75         if ( work->workKind & WORKER_READ ) {
76           if ( work->workKind & WORKER_FOR_SOCKET ) {
77             len = recv(work->fd, work->buf, work->len, 0);
78             if (len == SOCKET_ERROR) {
79               errCode = WSAGetLastError();
80             }
81           } else {
82             len = read(work->fd, work->buf, work->len);
83             if (len == -1) { errCode = errno; }
84           }
85         } else if ( work->workKind & WORKER_WRITE ) {
86           if ( work->workKind & WORKER_FOR_SOCKET ) {
87             len = send(work->fd, work->buf, work->len, 0);
88             if (len == SOCKET_ERROR) {
89               errCode = WSAGetLastError();
90             }
91           } else {
92             len = write(work->fd,work->buf, work->len);
93             if (len == -1) { errCode = errno; }
94           }
95         } else if ( work->workKind & WORKER_DELAY ) {
96           /* very approximate implementation of threadDelay */
97           Sleep(work->len);
98           len = work->len;
99           errCode = 0;
100         } else {
101           fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
102           fflush(stderr);
103           continue;
104         }
105         work->onCompletion(work->requestID,
106                            work->param,
107                            work->fd,
108                            len,
109                            work->buf,
110                            errCode);
111         /* Free the WorkItem */
112         free(work);
113       } else {
114           fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
115           return 1;
116       }
117     } else {
118           fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
119           return 1;
120     }
121   }
122   return 0;
123 }
124
125 static 
126 BOOL
127 NewIOWorkerThread(IOManagerState* iom)
128 {
129   return ( 0 != _beginthreadex(NULL,
130                                0,
131                                IOWorkerProc,
132                                (LPVOID)iom,
133                                0,
134                                NULL) );
135 }
136
137 BOOL
138 StartIOManager(void)
139 {
140   HANDLE hExit;
141   WorkQueue* wq;
142
143   wq = NewWorkQueue();
144   if ( !wq ) return FALSE;  
145   
146   ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
147   
148   if (!ioMan) {
149     FreeWorkQueue(wq);
150     return FALSE;
151   }
152
153   /* A manual-reset event */
154   hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
155   if ( !hExit ) {
156     FreeWorkQueue(wq);
157     free(ioMan);
158     return FALSE;
159   }
160   
161   ioMan->hExitEvent = hExit;
162   InitializeCriticalSection(&ioMan->manLock);
163   ioMan->workQueue   = wq;
164   ioMan->numWorkers  = 0;
165   ioMan->workersIdle = 0;
166   ioMan->requestID   = 1;
167  
168   return TRUE;
169 }
170
171 /*
172  * Function: AddIORequest()
173  *
174  * Conduit to underlying WorkQueue's SubmitWork(); adds IO
175  * request to work queue, returning without blocking.
176  */
177 int
178 AddIORequest ( int   fd,
179                BOOL  forWriting,
180                BOOL  isSocket,
181                int   len,
182                char* buffer,
183                void* data,
184                CompletionProc onCompletion)
185 {
186   WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
187   if (!ioMan || !wItem) return 0;
188   
189   /* Fill in the blanks */
190   wItem->fd           = fd;
191   wItem->workKind     = ( isSocket   ? WORKER_FOR_SOCKET : 0 ) | 
192                         ( forWriting ? WORKER_WRITE : WORKER_READ );
193   wItem->len          = len;
194   wItem->buf          = buffer;
195   wItem->param        = data;
196   wItem->onCompletion = onCompletion;
197   wItem->requestID    = ioMan->requestID++;
198   
199   EnterCriticalSection(&ioMan->manLock);
200   /* If there are no worker threads available, create one.
201    *
202    * If this turns out to be too aggressive a policy, refine.
203    */
204 #if 0
205   fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); fflush(stderr);
206 #endif
207   if ( ioMan->workersIdle == 0 ) {
208     ioMan->numWorkers++;
209     NewIOWorkerThread(ioMan);
210   }
211   LeaveCriticalSection(&ioMan->manLock);
212   
213   if (SubmitWork(ioMan->workQueue,wItem)) {
214     return wItem->requestID;
215   } else {
216     return 0;
217   }
218 }              
219
220 /*
221  * Function: AddDelayRequest()
222  *
223  */
224 BOOL
225 AddDelayRequest ( unsigned int   msecs,
226                   void*          data,
227                   CompletionProc onCompletion)
228 {
229   WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
230   if (!ioMan || !wItem) return FALSE;
231   
232   /* Fill in the blanks */
233   wItem->fd           = 0;
234   wItem->workKind     = WORKER_DELAY;
235   wItem->len          = msecs;
236   wItem->buf          = 0;
237   wItem->param        = data;
238   wItem->onCompletion = onCompletion;
239   wItem->requestID    = ioMan->requestID++;
240
241   EnterCriticalSection(&ioMan->manLock);
242   if ( ioMan->workersIdle == 0 ) {
243     ioMan->numWorkers++;
244     NewIOWorkerThread(ioMan);
245   }
246   LeaveCriticalSection(&ioMan->manLock);
247   
248   if (SubmitWork(ioMan->workQueue,wItem)) {
249     return wItem->requestID;
250   } else {
251     return 0;
252   }
253 }
254
255 void ShutdownIOManager()
256 {
257   SetEvent(ioMan->hExitEvent);
258   free(ioMan);
259   ioMan = NULL;
260 }