Windows build fixes
[ghc-hetmet.git] / rts / win32 / AsyncIO.c
/* AsyncIO.c
 *
 * Integrating Win32 asynchronous I/O with the GHC RTS.
 *
 * (c) sof, 2002-2003.
 */

#if !defined(THREADED_RTS)

#include "Rts.h"
#include "RtsUtils.h"
#include <windows.h>
#include <stdio.h>
#include "Schedule.h"
#include "Capability.h"
#include "win32/AsyncIO.h"
#include "win32/IOManager.h"

/*
 * Overview:
 *
 * Haskell code issues asynchronous I/O requests via the
 * async{Read,Write,DoOp}# primops. These cause addIORequest()
 * to be invoked, which forwards the request to the underlying
 * asynchronous I/O subsystem. Each request is tagged with a unique
 * ID.
 *
 * addIORequest() returns this ID, so that when the blocked Haskell
 * thread is added onto blocked_queue, its TSO is annotated with
 * it. Upon completion of an I/O request, the async I/O handling
 * code makes a back-call to signal its completion: the local
 * onIOComplete() routine. It adds the IO request ID (along with
 * its result data) to a queue of completed requests before returning.
 *
 * The queue of completed IO requests is read by the thread operating
 * the RTS scheduler. It de-queues the Haskell threads corresponding
 * to the request IDs, making them runnable again.
 *
 */

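/* The flow described above boils down to a bounded producer/consumer
 * handshake: I/O worker threads reserve a slot in a fixed-size table via a
 * counting semaphore and deposit results under a lock, while the scheduler
 * thread drains the table and hands the slots back. The fragment below is a
 * minimal, self-contained sketch of just that pattern; the sketch_* names and
 * SKETCH_SLOTS are hypothetical and it is kept out of the build, like the
 * other illustrative fragments in this file. The real implementation is
 * onIOComplete()/awaitRequests() further down.
 */
#if 0
#define SKETCH_SLOTS 16

static CRITICAL_SECTION sketch_lock;
static HANDLE sketch_free_slots;    /* counting semaphore: free table slots    */
static HANDLE sketch_nonempty;      /* manual-reset event: table is non-empty  */
static int    sketch_table[SKETCH_SLOTS];
static int    sketch_hw;            /* number of entries currently deposited   */

static void sketch_init(void)
{
    InitializeCriticalSection(&sketch_lock);
    sketch_free_slots = CreateSemaphore(NULL, SKETCH_SLOTS, SKETCH_SLOTS, NULL);
    sketch_nonempty   = CreateEvent(NULL, TRUE, FALSE, NULL);
}

/* Producer side: what a worker thread does on completion. */
static void sketch_deposit(int result)
{
    WaitForSingleObject(sketch_free_slots, INFINITE); /* reserve a free slot   */
    EnterCriticalSection(&sketch_lock);
    sketch_table[sketch_hw++] = result;
    if (sketch_hw == 1) {
        SetEvent(sketch_nonempty);                    /* wake the consumer     */
    }
    LeaveCriticalSection(&sketch_lock);
}

/* Consumer side: what the scheduler thread does. */
static void sketch_drain(void)
{
    int i;
    WaitForSingleObject(sketch_nonempty, INFINITE);   /* block until non-empty */
    EnterCriticalSection(&sketch_lock);
    for (i = 0; i < sketch_hw; i++) {
        /* ... hand sketch_table[i] back to whoever was waiting on it ... */
        ReleaseSemaphore(sketch_free_slots, 1, NULL); /* slot is free again    */
    }
    sketch_hw = 0;
    ResetEvent(sketch_nonempty);                      /* table drained         */
    LeaveCriticalSection(&sketch_lock);
}
#endif /* 0 */
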
typedef struct CompletedReq {
    unsigned int   reqID;
    int            len;
    int            errCode;
} CompletedReq;

#define MAX_REQUESTS 200

static CRITICAL_SECTION queue_lock;        /* protects the completed-request table below */
static HANDLE           completed_req_event = INVALID_HANDLE_VALUE; /* manual reset: table non-empty */
static HANDLE           abandon_req_wait    = INVALID_HANDLE_VALUE; /* auto reset: abandon current wait */
static HANDLE           wait_handles[2];   /* the two events above, for WaitForMultipleObjects() */
static CompletedReq     completedTable[MAX_REQUESTS];
static int              completed_hw;      /* number of entries currently in completedTable */
static HANDLE           completed_table_sema; /* counting semaphore: free slots in completedTable */
static int              issued_reqs;       /* requests issued but not yet completed */

static void
onIOComplete(unsigned int reqID,
             int   fd STG_UNUSED,
             int   len,
             void* buf STG_UNUSED,
             int   errCode)
{
    DWORD dwRes;
    /* Deposit the result of the request in the queue/table... when there's room. */
    dwRes = WaitForSingleObject(completed_table_sema, INFINITE);
    switch (dwRes) {
    case WAIT_OBJECT_0:
        break;
    default:
        /* Not likely */
        fprintf(stderr, "onIOComplete: failed to grab table semaphore, dropping request 0x%x\n", reqID);
        fflush(stderr);
        return;
    }
    EnterCriticalSection(&queue_lock);
    if (completed_hw == MAX_REQUESTS) {
        /* Shouldn't happen */
        fprintf(stderr, "onIOComplete: ERROR -- Request table overflow (%d); dropping.\n", reqID);
        fflush(stderr);
    } else {
#if 0
        fprintf(stderr, "onCompl: %d %d %d %d %d\n",
                reqID, len, errCode, issued_reqs, completed_hw);
        fflush(stderr);
#endif
        completedTable[completed_hw].reqID   = reqID;
        completedTable[completed_hw].len     = len;
        completedTable[completed_hw].errCode = errCode;
        completed_hw++;
        issued_reqs--;
        if (completed_hw == 1) {
            /* The event is used to wake up the scheduler thread should it
             * be blocked waiting for requests to complete. The event resets once
             * that thread has cleared out the request queue/table.
             */
            SetEvent(completed_req_event);
        }
    }
    LeaveCriticalSection(&queue_lock);
}

unsigned int
addIORequest(int   fd,
             int   forWriting,
             int   isSock,
             int   len,
             char* buf)
{
    EnterCriticalSection(&queue_lock);
    issued_reqs++;
    LeaveCriticalSection(&queue_lock);
#if 0
    fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
#endif
    return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
}

unsigned int
addDelayRequest(int msecs)
{
    EnterCriticalSection(&queue_lock);
    issued_reqs++;
    LeaveCriticalSection(&queue_lock);
#if 0
    fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
#endif
    return AddDelayRequest(msecs,onIOComplete);
}

unsigned int
addDoProcRequest(void* proc, void* param)
{
    EnterCriticalSection(&queue_lock);
    issued_reqs++;
    LeaveCriticalSection(&queue_lock);
#if 0
    fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
#endif
    return AddProcRequest(proc,param,onIOComplete);
}


int
startupAsyncIO()
{
    if (!StartIOManager()) {
        return 0;
    }
    InitializeCriticalSection(&queue_lock);
    /* Create a pair of events:
     *
     *    - completed_req_event  -- signals the deposit of a request result; manual reset.
     *    - abandon_req_wait     -- external OS thread tells current RTS/Scheduler
     *                              thread to abandon wait for IO request completion.
     *                              Auto reset.
     */
    completed_req_event = CreateEvent (NULL, TRUE,  FALSE, NULL);
    abandon_req_wait    = CreateEvent (NULL, FALSE, FALSE, NULL);
    wait_handles[0] = completed_req_event;
    wait_handles[1] = abandon_req_wait;
    completed_hw = 0;
    if ( !(completed_table_sema = CreateSemaphore (NULL, MAX_REQUESTS, MAX_REQUESTS, NULL)) ) {
        DWORD rc = GetLastError();
        fprintf(stderr, "startupAsyncIO: CreateSemaphore failed 0x%x\n", (int)rc);
        fflush(stderr);
    }

    return ( completed_req_event  != INVALID_HANDLE_VALUE &&
             abandon_req_wait     != INVALID_HANDLE_VALUE &&
             completed_table_sema != NULL );
}

void
shutdownAsyncIO(rtsBool wait_threads)
{
    ShutdownIOManager(wait_threads);
    if (completed_req_event != INVALID_HANDLE_VALUE) {
        CloseHandle(completed_req_event);
        completed_req_event = INVALID_HANDLE_VALUE;
    }
    if (abandon_req_wait != INVALID_HANDLE_VALUE) {
        CloseHandle(abandon_req_wait);
        abandon_req_wait = INVALID_HANDLE_VALUE;
    }
    if (completed_table_sema != NULL) {
        CloseHandle(completed_table_sema);
        completed_table_sema = NULL;
    }
    DeleteCriticalSection(&queue_lock);
}

/*
 * Function: awaitRequests(wait)
 *
 * Check for the completion of external IO work requests. Worker
 * threads signal completion of IO requests by depositing them
 * in a table (completedTable). awaitRequests() matches up
 * requests in that table with threads on the blocked_queue,
 * making the threads whose IO requests have completed runnable
 * again.
 *
 * awaitRequests() is called by the scheduler periodically _or_ when
 * it is out of work and needs to wait for the completion of IO
 * requests to make further progress. In the latter scenario,
 * awaitRequests() simply blocks waiting for worker threads
 * to complete requests if the 'completedTable' is empty.
 */
int
awaitRequests(rtsBool wait)
{
#ifndef THREADED_RTS
  // none of this is actually used in the threaded RTS

start:
#if 0
    fprintf(stderr, "awaitRequests(): %d %d %d\n", issued_reqs, completed_hw, wait);
    fflush(stderr);
#endif
    EnterCriticalSection(&queue_lock);
    /* Nothing immediately available & we won't wait */
    if ((!wait && completed_hw == 0)
#if 0
        // If we just return when wait==rtsFalse, we'll go into a busy
        // wait loop, so I disabled this condition --SDM 18/12/2003
        (issued_reqs == 0 && completed_hw == 0)
#endif
        ) {
        LeaveCriticalSection(&queue_lock);
        return 0;
    }
    if (completed_hw == 0) {
        /* empty table, drop lock and wait */
        LeaveCriticalSection(&queue_lock);
        if ( wait && sched_state == SCHED_RUNNING ) {
            DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE);
            switch (dwRes) {
            case WAIT_OBJECT_0:
                /* a request was completed */
                break;
            case WAIT_OBJECT_0 + 1:
            case WAIT_TIMEOUT:
                /* timeout (unlikely) or told to abandon waiting */
                return 0;
            case WAIT_FAILED: {
                DWORD dw = GetLastError();
                fprintf(stderr, "awaitRequests: wait failed -- error code: %lu\n", dw); fflush(stderr);
                return 0;
            }
            default:
                fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr);
                return 0;
            }
        } else {
            return 0;
        }
        goto start;
    } else {
        int i;
        StgTSO *tso, *prev;

        for (i=0; i < completed_hw; i++) {
            /* For each of the completed requests, match up their Ids
             * with those of the threads on the blocked_queue. If the
             * thread that made the IO request has been subsequently
             * killed (and removed from blocked_queue), no match will
             * be found for that request Id.
             *
             * i.e., killing a Haskell thread doesn't attempt to cancel
             * the IO request it is blocked on.
             *
             */
            unsigned int rID = completedTable[i].reqID;

            prev = NULL;
            for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; tso = tso->_link) {

                if (tso->what_next == ThreadRelocated) {
                    /* Drop the TSO from blocked_queue */
                    if (prev) {
                        setTSOLink(&MainCapability, prev, tso->_link);
                    } else {
                        blocked_queue_hd = tso->_link;
                    }
                    if (blocked_queue_tl == tso) {
                        blocked_queue_tl = prev ? prev : END_TSO_QUEUE;
                    }
                    continue;
                }

                switch(tso->why_blocked) {
                case BlockedOnRead:
                case BlockedOnWrite:
                case BlockedOnDoProc:
                    if (tso->block_info.async_result->reqID == rID) {
                        /* Found the thread blocked waiting on request; stodgily fill
                         * in its result block.
                         */
                        tso->block_info.async_result->len = completedTable[i].len;
                        tso->block_info.async_result->errCode = completedTable[i].errCode;

                        /* Drop the matched TSO from blocked_queue */
                        if (prev) {
                            setTSOLink(&MainCapability, prev, tso->_link);
                        } else {
                            blocked_queue_hd = tso->_link;
                        }
                        if (blocked_queue_tl == tso) {
                            blocked_queue_tl = prev ? prev : END_TSO_QUEUE;
                        }

                        /* Terminates the run queue + this inner for-loop. */
                        tso->_link = END_TSO_QUEUE;
                        tso->why_blocked = NotBlocked;
                        pushOnRunQueue(&MainCapability, tso);
                        break;
                    }
                    break;
                default:
                    if (tso->why_blocked != NotBlocked) {
                        barf("awaitRequests: odd thread state");
                    }
                    break;
                }

                prev = tso;
            }
            /* Signal that there are completed-table slots available */
            if ( !ReleaseSemaphore(completed_table_sema, 1, NULL) ) {
                DWORD dw = GetLastError();
                fprintf(stderr, "awaitRequests: failed to signal semaphore (error code=0x%x)\n", (int)dw);
                fflush(stderr);
            }
        }
        completed_hw = 0;
        ResetEvent(completed_req_event);
        LeaveCriticalSection(&queue_lock);
        return 1;
    }
#endif /* !THREADED_RTS */
}

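/* The inner loop of awaitRequests() above uses the usual "walk with a trailing
 * pointer" idiom to unlink a matching node from a singly linked queue while
 * keeping the tail pointer consistent. A minimal sketch of that idiom on a
 * hypothetical Node type (not RTS code; queue_hd/queue_tl/unlink_by_id are
 * illustrative names only), kept out of the build:
 */
#if 0
typedef struct Node { unsigned int id; struct Node *next; } Node;

static Node *queue_hd, *queue_tl;

static Node* unlink_by_id(unsigned int id)
{
    Node *n, *prev = NULL;
    for (n = queue_hd; n != NULL; prev = n, n = n->next) {
        if (n->id != id) continue;
        if (prev) prev->next = n->next;     /* unlink from the middle/end   */
        else      queue_hd   = n->next;     /* unlink the head              */
        if (queue_tl == n) queue_tl = prev; /* fix up the tail pointer      */
        n->next = NULL;
        return n;
    }
    return NULL;  /* no match: the requesting thread was killed meanwhile   */
}
#endif /* 0 */
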
/*
 * Function: abandonRequestWait()
 *
 * Wake up a thread that's blocked waiting for new IO requests
 * to complete (via awaitRequests()).
 */
void
abandonRequestWait( void )
{
    /* the event is auto-reset, but in case there's no thread
     * already waiting on the event, we want to return it to
     * a non-signalled state.
     *
     * Careful!  There is no synchronisation between
     * abandonRequestWait and awaitRequests, which means that
     * abandonRequestWait might be called just before a thread
     * goes into a wait, and we miss the abandon signal.  So we
     * must SetEvent() here rather than PulseEvent() to ensure
     * that the event isn't lost.  We can re-optimise by resetting
     * the event somewhere safe if we know the event has been
     * properly serviced (see resetAbandonRequestWait() below).  --SDM 18/12/2003
     */
    SetEvent(abandon_req_wait);
}

void
resetAbandonRequestWait( void )
{
    ResetEvent(abandon_req_wait);
}
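
/* The comment in abandonRequestWait() describes a classic lost-wakeup hazard:
 * PulseEvent() only wakes threads that are already waiting, so a kick delivered
 * just before the scheduler blocks would vanish. The fragment below is a
 * hedged sketch of the latched alternative used here (hypothetical kick_*
 * names, disabled from the build): SetEvent() remembers the signal, and the
 * waiter returns the event to a non-signalled state only at a known-safe
 * point, as resetAbandonRequestWait() does.
 */
#if 0
static HANDLE kick_event;   /* auto-reset: CreateEvent(NULL, FALSE, FALSE, NULL) */

/* Kicker: runs in some other OS thread. */
static void kick(void)
{
    /* PulseEvent(kick_event) would only wake a thread that is ALREADY waiting;
     * if the waiter is still a few instructions away from its wait, the kick
     * evaporates.  SetEvent() latches: the next wait returns immediately. */
    SetEvent(kick_event);
}

/* Waiter: the thread that is about to block. */
static void wait_for_kick(void)
{
    /* Any SetEvent() issued before this point is still remembered here. */
    WaitForSingleObject(kick_event, INFINITE);
    /* ... abandon the I/O wait and service whatever the kick asked for ... */
}
#endif /* 0 */
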
#endif /* !defined(THREADED_RTS) */