[project @ 2003-12-18 13:27:27 by simonmar]
[ghc-hetmet.git] / ghc / rts / win32 / AsyncIO.c
1 /* AsyncIO.c
2  *
3  * Integrating Win32 asynchronous I/O with the GHC RTS.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "Rts.h"
8 #include "RtsUtils.h"
9 #include <windows.h>
10 #include <stdio.h>
11 #include "Schedule.h"
12 #include "Capability.h"
13 #include "win32/AsyncIO.h"
14 #include "win32/IOManager.h"
15
16 /*
17  * Overview:
18  *
19  * Haskell code issues asynchronous I/O requests via the
20  * async{Read,Write,DoOp}# primops. These cause addIORequest()
21  * to be invoked, which forwards the request to the underlying
22  * asynchronous I/O subsystem. Each request is tagged with a unique
23  * ID.
24  *
25  * addIORequest() returns this ID, so that when the blocked CH
26  * thread is added onto blocked_queue, its TSO is annotated with
27  * it. Upon completion of an I/O request, the async I/O handling
28  * code makes a back-call to signal its completion; the local
29  * onIOComplete() routine. It adds the IO request ID (along with
30  * its result data) to a queue of completed requests before returning. 
31  *
32  * The queue of completed IO requests is read by the thread operating
33  * the RTS scheduler. It de-queues the CH threads corresponding
34  * to the request IDs, making them runnable again.
35  *
36  */
37
/*
 * One finished request, as recorded by onIOComplete() and later
 * consumed by awaitRequests().
 */
typedef struct CompletedReq {
    unsigned int reqID;     /* ID of the request that finished */
    int len;                /* 'len' value passed to the completion callback */
    int errCode;            /* 'errCode' value passed to the completion callback */
} CompletedReq;
43
44 #define MAX_REQUESTS 200
45
46 static CRITICAL_SECTION queue_lock;
47 static HANDLE           completed_req_event;
48 static HANDLE           abandon_req_wait;
49 static HANDLE           wait_handles[2];
50 static CompletedReq     completedTable[MAX_REQUESTS];
51 static int              completed_hw;
52 static int              issued_reqs;
53
54 static void
55 onIOComplete(unsigned int reqID,
56              int   fd STG_UNUSED,
57              int   len,
58              void* buf STG_UNUSED,
59              int   errCode)
60 {
61     /* Deposit result of request in queue/table */
62     EnterCriticalSection(&queue_lock);
63     if (completed_hw == MAX_REQUESTS) {
64         /* Not likely */
65         fprintf(stderr, "Request table overflow (%d); dropping.\n", reqID);
66         fflush(stderr);
67     } else {
68 #if 0
69         fprintf(stderr, "onCompl: %d %d %d %d %d\n", 
70                 reqID, len, errCode, issued_reqs, completed_hw); 
71         fflush(stderr);
72 #endif
73         completedTable[completed_hw].reqID   = reqID;
74         completedTable[completed_hw].len     = len;
75         completedTable[completed_hw].errCode = errCode;
76         completed_hw++;
77         issued_reqs--;
78         if (completed_hw == 1) {
79             /* The event is used to wake up the scheduler thread should it
80              * be blocked waiting for requests to complete. It reset once
81              * that thread has cleared out the request queue/table.
82              */
83             SetEvent(completed_req_event);
84         }
85     }
86     LeaveCriticalSection(&queue_lock);
87 }
88
89 unsigned int
90 addIORequest(int   fd,
91              int   forWriting,
92              int   isSock,
93              int   len,
94              char* buf)
95 {
96     EnterCriticalSection(&queue_lock);
97     issued_reqs++;
98     LeaveCriticalSection(&queue_lock);
99 #if 0
100     fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
101 #endif
102     return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
103 }
104
105 unsigned int
106 addDelayRequest(int msecs)
107 {
108     EnterCriticalSection(&queue_lock);
109     issued_reqs++;
110     LeaveCriticalSection(&queue_lock);
111 #if 0
112     fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
113 #endif
114     return AddDelayRequest(msecs,onIOComplete);
115 }
116
117 unsigned int
118 addDoProcRequest(void* proc, void* param)
119 {
120     EnterCriticalSection(&queue_lock);
121     issued_reqs++;
122     LeaveCriticalSection(&queue_lock);
123 #if 0
124     fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
125 #endif
126     return AddProcRequest(proc,param,onIOComplete);
127 }
128
129
130 int
131 startupAsyncIO()
132 {
133     if (!StartIOManager()) {
134         return 0;
135     }
136     InitializeCriticalSection(&queue_lock);
137     /* Create a pair of events:
138      *
139      *    - completed_req_event  -- signals the deposit of request result; manual reset.
140      *    - abandon_req_wait     -- external OS thread tells current RTS/Scheduler
141      *                              thread to abandon wait for IO request completion.
142      *                              Auto reset.
143      */
144     completed_req_event = CreateEvent (NULL, TRUE,  FALSE, NULL);
145     abandon_req_wait    = CreateEvent (NULL, FALSE, FALSE, NULL);
146     wait_handles[0] = completed_req_event;
147     wait_handles[1] = abandon_req_wait;
148     completed_hw = 0;
149     return ( completed_req_event != INVALID_HANDLE_VALUE &&
150              abandon_req_wait    != INVALID_HANDLE_VALUE );
151 }
152
153 void
154 shutdownAsyncIO()
155 {
156     CloseHandle(completed_req_event);
157     ShutdownIOManager();
158 }
159
160 /*
161  * Function: awaitRequests(wait)
162  *
163  * Check for the completion of external IO work requests. Worker
164  * threads signal completion of IO requests by depositing them
165  * in a table (completedTable). awaitRequests() matches up 
166  * requests in that table with threads on the blocked_queue, 
167  * making the threads whose IO requests have completed runnable
168  * again.
169  * 
170  * awaitRequests() is called by the scheduler periodically _or_ if
171  * it is out of work, and need to wait for the completion of IO
172  * requests to make further progress. In the latter scenario, 
173  * awaitRequests() will simply block waiting for worker threads 
174  * to complete if the 'completedTable' is empty.
175  */
176 int
177 awaitRequests(rtsBool wait)
178 {
179 start:
180 #if 0
181     fprintf(stderr, "awaitRequests(): %d %d %d\n", issued_reqs, completed_hw, wait);
182     fflush(stderr);
183 #endif
184     EnterCriticalSection(&queue_lock);
185     /* Nothing immediately available & we won't wait */
186     if ((!wait && completed_hw == 0)
187 #if 0
188         // If we just return when wait==rtsFalse, we'll go into a busy
189         // wait loop, so I disabled this condition --SDM 18/12/2003
190         (issued_reqs == 0 && completed_hw == 0)
191 #endif
192         ) {
193         LeaveCriticalSection(&queue_lock);
194         return 0;
195     }
196     if (completed_hw == 0) {
197         /* empty table, drop lock and wait */
198         LeaveCriticalSection(&queue_lock);
199         if ( wait && !interrupted ) {
200             DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE);
201             switch (dwRes) {
202             case WAIT_OBJECT_0:
203                 break;
204             case WAIT_OBJECT_0 + 1:
205             case WAIT_TIMEOUT:
206                 return 0;
207             default:
208                 fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr);
209                 return 0;
210             }
211         } else {
212             return 0;
213         }
214         goto start;
215     } else {
216         int i;
217         StgTSO *tso, *prev;
218         
219         for (i=0; i < completed_hw; i++) {
220             /* For each of the completed requests, match up their Ids
221              * with those of the threads on the blocked_queue. If the
222              * thread that made the IO request has been subsequently
223              * killed (and removed from blocked_queue), no match will
224              * be found for that request Id. 
225              *
226              * i.e., killing a Haskell thread doesn't attempt to cancel
227              * the IO request it is blocked on.
228              *
229              */
230             unsigned int rID = completedTable[i].reqID;
231             prev = NULL;
232             
233             prev = NULL;
234             for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; prev = tso, tso = tso->link) {
235         
236                 switch(tso->why_blocked) {
237                 case BlockedOnRead:
238                 case BlockedOnWrite:
239                 case BlockedOnDoProc:
240                     if (tso->block_info.async_result->reqID == rID) {
241                         /* Found the thread blocked waiting on request; stodgily fill 
242                          * in its result block. 
243                          */
244                         tso->block_info.async_result->len = completedTable[i].len;
245                         tso->block_info.async_result->errCode = completedTable[i].errCode;
246                         
247                         /* Drop the matched TSO from blocked_queue */
248                         if (prev) {
249                             prev->link = tso->link;
250                         } else {
251                             blocked_queue_hd = tso->link;
252                         }
253                         if (blocked_queue_tl == tso) {
254                             blocked_queue_tl = prev;
255                         }
256                     
257                         /* Terminates the run queue + this inner for-loop. */
258                         tso->link = END_TSO_QUEUE;
259                         tso->why_blocked = NotBlocked;
260                         PUSH_ON_RUN_QUEUE(tso);
261                         break;
262                     }
263                     break;
264                 default:
265                     if (tso->why_blocked != NotBlocked) {
266                         barf("awaitRequests: odd thread state");
267                     }
268                     break;
269                 }
270             }
271         }
272         completed_hw = 0;
273         ResetEvent(completed_req_event);
274         LeaveCriticalSection(&queue_lock);
275         return 1;
276     }
277 }
278
279 /*
280  * Function: abandonRequestWait()
281  *
282  * Wake up a thread that's blocked waiting for new IO requests
283  * to complete (via awaitRequests().)
284  */
285 void
286 abandonRequestWait( void )
287 {
288     /* the event is auto-reset, but in case there's no thread
289      * already waiting on the event, we want to return it to
290      * a non-signalled state.
291      *
292      * Careful!  There is no synchronisation between
293      * abandonRequestWait and awaitRequest, which means that
294      * abandonRequestWait might be called just before a thread
295      * goes into a wait, and we miss the abandon signal.  So we
296      * must SetEvent() here rather than PulseEvent() to ensure
297      * that the event isn't lost.  We can re-optimise by resetting
298      * the event somewhere safe if we know the event has been
299      * properly serviced (see resetAbandon() below).  --SDM 18/12/2003
300      */
301     SetEvent(abandon_req_wait);
302 }
303
304 void
305 resetAbandonRequestWait( void )
306 {
307     ResetEvent(abandon_req_wait);
308 }
309