Massive patch for the first months work adding System FC to GHC #35
[ghc-hetmet.git] / rts / win32 / AsyncIO.c
1 /* AsyncIO.c
2  *
3  * Integrating Win32 asynchronous I/O with the GHC RTS.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "Rts.h"
8 #include "RtsUtils.h"
9 #include <windows.h>
10 #include <stdio.h>
11 #include "Schedule.h"
12 #include "RtsFlags.h"
13 #include "Capability.h"
14 #include "win32/AsyncIO.h"
15 #include "win32/IOManager.h"
16
/*
 * Overview:
 *
 * Haskell code issues asynchronous I/O requests via the
 * async{Read,Write,DoOp}# primops. These cause addIORequest()
 * to be invoked, which forwards the request to the underlying
 * asynchronous I/O subsystem. Each request is tagged with a unique
 * ID.
 *
 * addIORequest() returns this ID, so that when the blocked Concurrent
 * Haskell (CH) thread is added onto blocked_queue, its TSO can be
 * annotated with it. Upon completion of an I/O request, the async I/O
 * handling code makes a call-back to signal its completion: the local
 * onIOComplete() routine. It adds the IO request ID (along with its
 * result data) to a queue of completed requests before returning.
 *
 * The queue of completed IO requests is read by the thread operating
 * the RTS scheduler. It de-queues the CH threads corresponding
 * to the request IDs, making them runnable again.
 *
 */
38
/* One entry of the completed-request table: the result of a finished
 * asynchronous request, waiting to be matched to its blocked thread. */
struct CompletedReq {
    unsigned int reqID;    /* ID the request was issued under */
    int          len;      /* result length reported on completion */
    int          errCode;  /* error code reported on completion */
};
typedef struct CompletedReq CompletedReq;
44
45 #define MAX_REQUESTS 200
46
47 static CRITICAL_SECTION queue_lock;
48 static HANDLE           completed_req_event = INVALID_HANDLE_VALUE;
49 static HANDLE           abandon_req_wait = INVALID_HANDLE_VALUE;
50 static HANDLE           wait_handles[2];
51 static CompletedReq     completedTable[MAX_REQUESTS];
52 static int              completed_hw;
53 static HANDLE           completed_table_sema;
54 static int              issued_reqs;
55
56 static void
57 onIOComplete(unsigned int reqID,
58              int   fd STG_UNUSED,
59              int   len,
60              void* buf STG_UNUSED,
61              int   errCode)
62 {
63     DWORD dwRes;
64     /* Deposit result of request in queue/table..when there's room. */
65     dwRes = WaitForSingleObject(completed_table_sema, INFINITE);
66     switch (dwRes) {
67     case WAIT_OBJECT_0:
68         break;
69     default:
70         /* Not likely */
71         fprintf(stderr, "onIOComplete: failed to grab table semaphore, dropping request 0x%x\n", reqID);
72         fflush(stderr);
73         return;
74     }
75     EnterCriticalSection(&queue_lock);
76     if (completed_hw == MAX_REQUESTS) {
77         /* Shouldn't happen */
78         fprintf(stderr, "onIOComplete: ERROR -- Request table overflow (%d); dropping.\n", reqID);
79         fflush(stderr);
80     } else {
81 #if 0
82         fprintf(stderr, "onCompl: %d %d %d %d %d\n", 
83                 reqID, len, errCode, issued_reqs, completed_hw); 
84         fflush(stderr);
85 #endif
86         completedTable[completed_hw].reqID   = reqID;
87         completedTable[completed_hw].len     = len;
88         completedTable[completed_hw].errCode = errCode;
89         completed_hw++;
90         issued_reqs--;
91         if (completed_hw == 1) {
92             /* The event is used to wake up the scheduler thread should it
93              * be blocked waiting for requests to complete. The event resets once
94              * that thread has cleared out the request queue/table.
95              */
96             SetEvent(completed_req_event);
97         }
98     }
99     LeaveCriticalSection(&queue_lock);
100 }
101
102 unsigned int
103 addIORequest(int   fd,
104              int   forWriting,
105              int   isSock,
106              int   len,
107              char* buf)
108 {
109     EnterCriticalSection(&queue_lock);
110     issued_reqs++;
111     LeaveCriticalSection(&queue_lock);
112 #if 0
113     fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
114 #endif
115     return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
116 }
117
118 unsigned int
119 addDelayRequest(int msecs)
120 {
121     EnterCriticalSection(&queue_lock);
122     issued_reqs++;
123     LeaveCriticalSection(&queue_lock);
124 #if 0
125     fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
126 #endif
127     return AddDelayRequest(msecs,onIOComplete);
128 }
129
130 unsigned int
131 addDoProcRequest(void* proc, void* param)
132 {
133     EnterCriticalSection(&queue_lock);
134     issued_reqs++;
135     LeaveCriticalSection(&queue_lock);
136 #if 0
137     fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
138 #endif
139     return AddProcRequest(proc,param,onIOComplete);
140 }
141
142
143 int
144 startupAsyncIO()
145 {
146     if (!StartIOManager()) {
147         return 0;
148     }
149     InitializeCriticalSection(&queue_lock);
150     /* Create a pair of events:
151      *
152      *    - completed_req_event  -- signals the deposit of request result; manual reset.
153      *    - abandon_req_wait     -- external OS thread tells current RTS/Scheduler
154      *                              thread to abandon wait for IO request completion.
155      *                              Auto reset.
156      */
157     completed_req_event = CreateEvent (NULL, TRUE,  FALSE, NULL);
158     abandon_req_wait    = CreateEvent (NULL, FALSE, FALSE, NULL);
159     wait_handles[0] = completed_req_event;
160     wait_handles[1] = abandon_req_wait;
161     completed_hw = 0;
162     if ( !(completed_table_sema = CreateSemaphore (NULL, MAX_REQUESTS, MAX_REQUESTS, NULL)) ) {
163         DWORD rc = GetLastError();
164         fprintf(stderr, "startupAsyncIO: CreateSemaphore failed 0x%x\n", (int)rc);
165         fflush(stderr);
166     }
167
168     return ( completed_req_event  != INVALID_HANDLE_VALUE &&
169              abandon_req_wait     != INVALID_HANDLE_VALUE &&
170              completed_table_sema != NULL );
171 }
172
173 void
174 shutdownAsyncIO()
175 {
176     ShutdownIOManager();
177     if (completed_req_event != INVALID_HANDLE_VALUE) {
178         CloseHandle(completed_req_event);
179         completed_req_event = INVALID_HANDLE_VALUE;
180     }
181     if (abandon_req_wait != INVALID_HANDLE_VALUE) {
182         CloseHandle(abandon_req_wait);
183         abandon_req_wait = INVALID_HANDLE_VALUE;
184     }
185     if (completed_table_sema != NULL) {
186         CloseHandle(completed_table_sema);
187         completed_table_sema = NULL;
188     }
189 }
190
191 /*
192  * Function: awaitRequests(wait)
193  *
194  * Check for the completion of external IO work requests. Worker
195  * threads signal completion of IO requests by depositing them
196  * in a table (completedTable). awaitRequests() matches up 
197  * requests in that table with threads on the blocked_queue, 
198  * making the threads whose IO requests have completed runnable
199  * again.
200  * 
201  * awaitRequests() is called by the scheduler periodically _or_ if
202  * it is out of work, and need to wait for the completion of IO
203  * requests to make further progress. In the latter scenario, 
204  * awaitRequests() will simply block waiting for worker threads 
205  * to complete if the 'completedTable' is empty.
206  */
207 int
208 awaitRequests(rtsBool wait)
209 {
210 #ifndef THREADED_RTS
211   // none of this is actually used in the threaded RTS
212
213 start:
214 #if 0
215     fprintf(stderr, "awaitRequests(): %d %d %d\n", issued_reqs, completed_hw, wait);
216     fflush(stderr);
217 #endif
218     EnterCriticalSection(&queue_lock);
219     /* Nothing immediately available & we won't wait */
220     if ((!wait && completed_hw == 0)
221 #if 0
222         // If we just return when wait==rtsFalse, we'll go into a busy
223         // wait loop, so I disabled this condition --SDM 18/12/2003
224         (issued_reqs == 0 && completed_hw == 0)
225 #endif
226         ) {
227         LeaveCriticalSection(&queue_lock);
228         return 0;
229     }
230     if (completed_hw == 0) {
231         /* empty table, drop lock and wait */
232         LeaveCriticalSection(&queue_lock);
233         if ( wait && sched_state == SCHED_RUNNING ) {
234             DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE);
235             switch (dwRes) {
236             case WAIT_OBJECT_0:
237                 /* a request was completed */
238                 break;
239             case WAIT_OBJECT_0 + 1:
240             case WAIT_TIMEOUT:
241                 /* timeout (unlikely) or told to abandon waiting */
242                 return 0;
243             case WAIT_FAILED: {
244                 DWORD dw = GetLastError();
245                 fprintf(stderr, "awaitRequests: wait failed -- error code: %lu\n", dw); fflush(stderr);
246                 return 0;
247             }
248             default:
249                 fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr);
250                 return 0;
251             }
252         } else {
253             return 0;
254         }
255         goto start;
256     } else {
257         int i;
258         StgTSO *tso, *prev;
259         
260         for (i=0; i < completed_hw; i++) {
261             /* For each of the completed requests, match up their Ids
262              * with those of the threads on the blocked_queue. If the
263              * thread that made the IO request has been subsequently
264              * killed (and removed from blocked_queue), no match will
265              * be found for that request Id. 
266              *
267              * i.e., killing a Haskell thread doesn't attempt to cancel
268              * the IO request it is blocked on.
269              *
270              */
271             unsigned int rID = completedTable[i].reqID;
272             
273             prev = NULL;
274             for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; prev = tso, tso = tso->link) {
275         
276                 switch(tso->why_blocked) {
277                 case BlockedOnRead:
278                 case BlockedOnWrite:
279                 case BlockedOnDoProc:
280                     if (tso->block_info.async_result->reqID == rID) {
281                         /* Found the thread blocked waiting on request; stodgily fill 
282                          * in its result block. 
283                          */
284                         tso->block_info.async_result->len = completedTable[i].len;
285                         tso->block_info.async_result->errCode = completedTable[i].errCode;
286                         
287                         /* Drop the matched TSO from blocked_queue */
288                         if (prev) {
289                             prev->link = tso->link;
290                         } else {
291                             blocked_queue_hd = tso->link;
292                         }
293                         if (blocked_queue_tl == tso) {
294                             blocked_queue_tl = prev ? prev : END_TSO_QUEUE;
295                         }
296                     
297                         /* Terminates the run queue + this inner for-loop. */
298                         tso->link = END_TSO_QUEUE;
299                         tso->why_blocked = NotBlocked;
300                         pushOnRunQueue(&MainCapability, tso);
301                         break;
302                     }
303                     break;
304                 default:
305                     if (tso->why_blocked != NotBlocked) {
306                         barf("awaitRequests: odd thread state");
307                     }
308                     break;
309                 }
310             }
311             /* Signal that there's completed table slots available */
312             if ( !ReleaseSemaphore(completed_table_sema, 1, NULL) ) {
313                 DWORD dw = GetLastError();
314                 fprintf(stderr, "awaitRequests: failed to signal semaphore (error code=0x%x)\n", (int)dw);
315                 fflush(stderr);
316             }
317         }
318         completed_hw = 0;
319         ResetEvent(completed_req_event);
320         LeaveCriticalSection(&queue_lock);
321         return 1;
322     }
323 #endif /* !THREADED_RTS */
324 }
325
326 /*
327  * Function: abandonRequestWait()
328  *
329  * Wake up a thread that's blocked waiting for new IO requests
330  * to complete (via awaitRequests().)
331  */
332 void
333 abandonRequestWait( void )
334 {
335     /* the event is auto-reset, but in case there's no thread
336      * already waiting on the event, we want to return it to
337      * a non-signalled state.
338      *
339      * Careful!  There is no synchronisation between
340      * abandonRequestWait and awaitRequest, which means that
341      * abandonRequestWait might be called just before a thread
342      * goes into a wait, and we miss the abandon signal.  So we
343      * must SetEvent() here rather than PulseEvent() to ensure
344      * that the event isn't lost.  We can re-optimise by resetting
345      * the event somewhere safe if we know the event has been
346      * properly serviced (see resetAbandon() below).  --SDM 18/12/2003
347      */
348     SetEvent(abandon_req_wait);
349 }
350
351 void
352 resetAbandonRequestWait( void )
353 {
354     ResetEvent(abandon_req_wait);
355 }
356