91495d5e8c2139274691d19bdc046f4671b1671e
[ghc-hetmet.git] / ghc / rts / win32 / AsyncIO.c
1 /* AsyncIO.c
2  *
3  * Integrating Win32 asynchronous I/O with the GHC RTS.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "Rts.h"
8 #include "RtsUtils.h"
9 #include <windows.h>
10 #include <stdio.h>
11 #include "Schedule.h"
12 #include "RtsFlags.h"
13 #include "Capability.h"
14 #include "win32/AsyncIO.h"
15 #include "win32/IOManager.h"
16
/*
 * Overview:
 *
 * Haskell code issues asynchronous I/O requests via the
 * async{Read,Write,DoProc}# primops. These cause addIORequest()
 * (or addDoProcRequest()/addDelayRequest()) to be invoked, which
 * forwards the request to the underlying asynchronous I/O subsystem.
 * Each request is tagged with a unique ID.
 *
 * addIORequest() returns this ID, so that when the blocked CH
 * thread is added onto blocked_queue, its TSO is annotated with
 * it. Upon completion of an I/O request, the async I/O handling
 * code makes a back-call to signal its completion: the local
 * onIOComplete() routine. It adds the IO request ID (along with
 * its result data) to a queue of completed requests before returning.
 *
 * The queue of completed IO requests is read by the thread operating
 * the RTS scheduler. It de-queues the CH threads corresponding
 * to the request IDs, making them runnable again.
 *
 */
38
/* One slot of completedTable: the outcome of a finished asynchronous
 * request, exactly as reported to onIOComplete(). */
typedef struct CompletedReq {
    unsigned int   reqID;    /* ID returned when the request was issued */
    int            len;      /* 'len' value reported on completion      */
    int            errCode;  /* 'errCode' value reported on completion  */
} CompletedReq;

/* Capacity of completedTable; completions arriving while the table is
 * full are dropped by onIOComplete(). */
enum { MAX_REQUESTS = 200 };
46
47 static CRITICAL_SECTION queue_lock;
48 static HANDLE           completed_req_event;
49 static HANDLE           abandon_req_wait;
50 static HANDLE           wait_handles[2];
51 static CompletedReq     completedTable[MAX_REQUESTS];
52 static int              completed_hw;
53 static int              issued_reqs;
54
55 static void
56 onIOComplete(unsigned int reqID,
57              int   fd STG_UNUSED,
58              int   len,
59              void* buf STG_UNUSED,
60              int   errCode)
61 {
62     /* Deposit result of request in queue/table */
63     EnterCriticalSection(&queue_lock);
64     if (completed_hw == MAX_REQUESTS) {
65         /* Not likely */
66         fprintf(stderr, "Request table overflow (%d); dropping.\n", reqID);
67         fflush(stderr);
68     } else {
69 #if 0
70         fprintf(stderr, "onCompl: %d %d %d %d %d\n", 
71                 reqID, len, errCode, issued_reqs, completed_hw); 
72         fflush(stderr);
73 #endif
74         completedTable[completed_hw].reqID   = reqID;
75         completedTable[completed_hw].len     = len;
76         completedTable[completed_hw].errCode = errCode;
77         completed_hw++;
78         issued_reqs--;
79         if (completed_hw == 1) {
80             /* The event is used to wake up the scheduler thread should it
81              * be blocked waiting for requests to complete. The event resets once
82              * that thread has cleared out the request queue/table.
83              */
84             SetEvent(completed_req_event);
85         }
86     }
87     LeaveCriticalSection(&queue_lock);
88 }
89
90 unsigned int
91 addIORequest(int   fd,
92              int   forWriting,
93              int   isSock,
94              int   len,
95              char* buf)
96 {
97     EnterCriticalSection(&queue_lock);
98     issued_reqs++;
99     LeaveCriticalSection(&queue_lock);
100 #if 0
101     fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
102 #endif
103     return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
104 }
105
106 unsigned int
107 addDelayRequest(int msecs)
108 {
109     EnterCriticalSection(&queue_lock);
110     issued_reqs++;
111     LeaveCriticalSection(&queue_lock);
112 #if 0
113     fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
114 #endif
115     return AddDelayRequest(msecs,onIOComplete);
116 }
117
118 unsigned int
119 addDoProcRequest(void* proc, void* param)
120 {
121     EnterCriticalSection(&queue_lock);
122     issued_reqs++;
123     LeaveCriticalSection(&queue_lock);
124 #if 0
125     fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
126 #endif
127     return AddProcRequest(proc,param,onIOComplete);
128 }
129
130
131 int
132 startupAsyncIO()
133 {
134     if (!StartIOManager()) {
135         return 0;
136     }
137     InitializeCriticalSection(&queue_lock);
138     /* Create a pair of events:
139      *
140      *    - completed_req_event  -- signals the deposit of request result; manual reset.
141      *    - abandon_req_wait     -- external OS thread tells current RTS/Scheduler
142      *                              thread to abandon wait for IO request completion.
143      *                              Auto reset.
144      */
145     completed_req_event = CreateEvent (NULL, TRUE,  FALSE, NULL);
146     abandon_req_wait    = CreateEvent (NULL, FALSE, FALSE, NULL);
147     wait_handles[0] = completed_req_event;
148     wait_handles[1] = abandon_req_wait;
149     completed_hw = 0;
150     return ( completed_req_event != INVALID_HANDLE_VALUE &&
151              abandon_req_wait    != INVALID_HANDLE_VALUE );
152 }
153
154 void
155 shutdownAsyncIO()
156 {
157     CloseHandle(completed_req_event);
158     ShutdownIOManager();
159 }
160
161 /*
162  * Function: awaitRequests(wait)
163  *
164  * Check for the completion of external IO work requests. Worker
165  * threads signal completion of IO requests by depositing them
166  * in a table (completedTable). awaitRequests() matches up 
167  * requests in that table with threads on the blocked_queue, 
168  * making the threads whose IO requests have completed runnable
169  * again.
170  * 
171  * awaitRequests() is called by the scheduler periodically _or_ if
172  * it is out of work, and need to wait for the completion of IO
173  * requests to make further progress. In the latter scenario, 
174  * awaitRequests() will simply block waiting for worker threads 
175  * to complete if the 'completedTable' is empty.
176  */
177 int
178 awaitRequests(rtsBool wait)
179 {
180 start:
181 #if 0
182     fprintf(stderr, "awaitRequests(): %d %d %d\n", issued_reqs, completed_hw, wait);
183     fflush(stderr);
184 #endif
185     EnterCriticalSection(&queue_lock);
186     /* Nothing immediately available & we won't wait */
187     if ((!wait && completed_hw == 0)
188 #if 0
189         // If we just return when wait==rtsFalse, we'll go into a busy
190         // wait loop, so I disabled this condition --SDM 18/12/2003
191         (issued_reqs == 0 && completed_hw == 0)
192 #endif
193         ) {
194         LeaveCriticalSection(&queue_lock);
195         return 0;
196     }
197     if (completed_hw == 0) {
198         /* empty table, drop lock and wait */
199         LeaveCriticalSection(&queue_lock);
200         if ( wait && !interrupted ) {
201             DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE);
202             switch (dwRes) {
203             case WAIT_OBJECT_0:
204                 /* a request was completed */
205                 break;
206             case WAIT_OBJECT_0 + 1:
207             case WAIT_TIMEOUT:
208                 /* timeout (unlikely) or told to abandon waiting */
209                 return 0;
210             case WAIT_FAILED: {
211                 DWORD dw = GetLastError();
212                 fprintf(stderr, "awaitRequests: wait failed -- error code: %lu\n", dw); fflush(stderr);
213                 return 0;
214             }
215             default:
216                 fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr);
217                 return 0;
218             }
219         } else {
220             return 0;
221         }
222         goto start;
223     } else {
224         int i;
225         StgTSO *tso, *prev;
226         
227         for (i=0; i < completed_hw; i++) {
228             /* For each of the completed requests, match up their Ids
229              * with those of the threads on the blocked_queue. If the
230              * thread that made the IO request has been subsequently
231              * killed (and removed from blocked_queue), no match will
232              * be found for that request Id. 
233              *
234              * i.e., killing a Haskell thread doesn't attempt to cancel
235              * the IO request it is blocked on.
236              *
237              */
238             unsigned int rID = completedTable[i].reqID;
239             
240             prev = NULL;
241             for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; prev = tso, tso = tso->link) {
242         
243                 switch(tso->why_blocked) {
244                 case BlockedOnRead:
245                 case BlockedOnWrite:
246                 case BlockedOnDoProc:
247                     if (tso->block_info.async_result->reqID == rID) {
248                         /* Found the thread blocked waiting on request; stodgily fill 
249                          * in its result block. 
250                          */
251                         tso->block_info.async_result->len = completedTable[i].len;
252                         tso->block_info.async_result->errCode = completedTable[i].errCode;
253                         
254                         /* Drop the matched TSO from blocked_queue */
255                         if (prev) {
256                             prev->link = tso->link;
257                         } else {
258                             blocked_queue_hd = tso->link;
259                         }
260                         if (blocked_queue_tl == tso) {
261                             blocked_queue_tl = prev;
262                         }
263                     
264                         /* Terminates the run queue + this inner for-loop. */
265                         tso->link = END_TSO_QUEUE;
266                         tso->why_blocked = NotBlocked;
267                         PUSH_ON_RUN_QUEUE(tso);
268                         break;
269                     }
270                     break;
271                 default:
272                     if (tso->why_blocked != NotBlocked) {
273                         barf("awaitRequests: odd thread state");
274                     }
275                     break;
276                 }
277             }
278         }
279         completed_hw = 0;
280         ResetEvent(completed_req_event);
281         LeaveCriticalSection(&queue_lock);
282         return 1;
283     }
284 }
285
286 /*
287  * Function: abandonRequestWait()
288  *
289  * Wake up a thread that's blocked waiting for new IO requests
290  * to complete (via awaitRequests().)
291  */
292 void
293 abandonRequestWait( void )
294 {
295     /* the event is auto-reset, but in case there's no thread
296      * already waiting on the event, we want to return it to
297      * a non-signalled state.
298      *
299      * Careful!  There is no synchronisation between
300      * abandonRequestWait and awaitRequest, which means that
301      * abandonRequestWait might be called just before a thread
302      * goes into a wait, and we miss the abandon signal.  So we
303      * must SetEvent() here rather than PulseEvent() to ensure
304      * that the event isn't lost.  We can re-optimise by resetting
305      * the event somewhere safe if we know the event has been
306      * properly serviced (see resetAbandon() below).  --SDM 18/12/2003
307      */
308     SetEvent(abandon_req_wait);
309 }
310
311 void
312 resetAbandonRequestWait( void )
313 {
314     ResetEvent(abandon_req_wait);
315 }
316