Ensure runhaskell is rebuilt in stage2
[ghc-hetmet.git] / rts / win32 / AsyncIO.c
1 /* AsyncIO.c
2  *
3  * Integrating Win32 asynchronous I/O with the GHC RTS.
4  *
5  * (c) sof, 2002-2003.
6  */
7
8 #if !defined(THREADED_RTS)
9
10 #include "Rts.h"
11 #include "RtsUtils.h"
12 #include <windows.h>
13 #include <stdio.h>
14 #include "Schedule.h"
15 #include "RtsFlags.h"
16 #include "Capability.h"
17 #include "win32/AsyncIO.h"
18 #include "win32/IOManager.h"
19
20 /*
21  * Overview:
22  *
 * Haskell code issues asynchronous I/O requests via the
 * async{Read,Write,DoProc}# primops. These cause addIORequest()
 * to be invoked, which forwards the request to the underlying
26  * asynchronous I/O subsystem. Each request is tagged with a unique
27  * ID.
28  *
29  * addIORequest() returns this ID, so that when the blocked CH
30  * thread is added onto blocked_queue, its TSO is annotated with
31  * it. Upon completion of an I/O request, the async I/O handling
32  * code makes a back-call to signal its completion; the local
33  * onIOComplete() routine. It adds the IO request ID (along with
34  * its result data) to a queue of completed requests before returning. 
35  *
 * The queue of completed IO requests is read by the thread operating
37  * the RTS scheduler. It de-queues the CH threads corresponding
38  * to the request IDs, making them runnable again.
39  *
40  */
41
42 typedef struct CompletedReq {
43     unsigned int   reqID;
44     int            len;
45     int            errCode;
46 } CompletedReq;
47
48 #define MAX_REQUESTS 200
49
50 static CRITICAL_SECTION queue_lock;
51 static HANDLE           completed_req_event = INVALID_HANDLE_VALUE;
52 static HANDLE           abandon_req_wait = INVALID_HANDLE_VALUE;
53 static HANDLE           wait_handles[2];
54 static CompletedReq     completedTable[MAX_REQUESTS];
55 static int              completed_hw;
56 static HANDLE           completed_table_sema;
57 static int              issued_reqs;
58
59 static void
60 onIOComplete(unsigned int reqID,
61              int   fd STG_UNUSED,
62              int   len,
63              void* buf STG_UNUSED,
64              int   errCode)
65 {
66     DWORD dwRes;
67     /* Deposit result of request in queue/table..when there's room. */
68     dwRes = WaitForSingleObject(completed_table_sema, INFINITE);
69     switch (dwRes) {
70     case WAIT_OBJECT_0:
71         break;
72     default:
73         /* Not likely */
74         fprintf(stderr, "onIOComplete: failed to grab table semaphore, dropping request 0x%x\n", reqID);
75         fflush(stderr);
76         return;
77     }
78     EnterCriticalSection(&queue_lock);
79     if (completed_hw == MAX_REQUESTS) {
80         /* Shouldn't happen */
81         fprintf(stderr, "onIOComplete: ERROR -- Request table overflow (%d); dropping.\n", reqID);
82         fflush(stderr);
83     } else {
84 #if 0
85         fprintf(stderr, "onCompl: %d %d %d %d %d\n", 
86                 reqID, len, errCode, issued_reqs, completed_hw); 
87         fflush(stderr);
88 #endif
89         completedTable[completed_hw].reqID   = reqID;
90         completedTable[completed_hw].len     = len;
91         completedTable[completed_hw].errCode = errCode;
92         completed_hw++;
93         issued_reqs--;
94         if (completed_hw == 1) {
95             /* The event is used to wake up the scheduler thread should it
96              * be blocked waiting for requests to complete. The event resets once
97              * that thread has cleared out the request queue/table.
98              */
99             SetEvent(completed_req_event);
100         }
101     }
102     LeaveCriticalSection(&queue_lock);
103 }
104
105 unsigned int
106 addIORequest(int   fd,
107              int   forWriting,
108              int   isSock,
109              int   len,
110              char* buf)
111 {
112     EnterCriticalSection(&queue_lock);
113     issued_reqs++;
114     LeaveCriticalSection(&queue_lock);
115 #if 0
116     fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
117 #endif
118     return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
119 }
120
121 unsigned int
122 addDelayRequest(int msecs)
123 {
124     EnterCriticalSection(&queue_lock);
125     issued_reqs++;
126     LeaveCriticalSection(&queue_lock);
127 #if 0
128     fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
129 #endif
130     return AddDelayRequest(msecs,onIOComplete);
131 }
132
133 unsigned int
134 addDoProcRequest(void* proc, void* param)
135 {
136     EnterCriticalSection(&queue_lock);
137     issued_reqs++;
138     LeaveCriticalSection(&queue_lock);
139 #if 0
140     fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
141 #endif
142     return AddProcRequest(proc,param,onIOComplete);
143 }
144
145
146 int
147 startupAsyncIO()
148 {
149     if (!StartIOManager()) {
150         return 0;
151     }
152     InitializeCriticalSection(&queue_lock);
153     /* Create a pair of events:
154      *
155      *    - completed_req_event  -- signals the deposit of request result; manual reset.
156      *    - abandon_req_wait     -- external OS thread tells current RTS/Scheduler
157      *                              thread to abandon wait for IO request completion.
158      *                              Auto reset.
159      */
160     completed_req_event = CreateEvent (NULL, TRUE,  FALSE, NULL);
161     abandon_req_wait    = CreateEvent (NULL, FALSE, FALSE, NULL);
162     wait_handles[0] = completed_req_event;
163     wait_handles[1] = abandon_req_wait;
164     completed_hw = 0;
165     if ( !(completed_table_sema = CreateSemaphore (NULL, MAX_REQUESTS, MAX_REQUESTS, NULL)) ) {
166         DWORD rc = GetLastError();
167         fprintf(stderr, "startupAsyncIO: CreateSemaphore failed 0x%x\n", (int)rc);
168         fflush(stderr);
169     }
170
171     return ( completed_req_event  != INVALID_HANDLE_VALUE &&
172              abandon_req_wait     != INVALID_HANDLE_VALUE &&
173              completed_table_sema != NULL );
174 }
175
176 void
177 shutdownAsyncIO(rtsBool wait_threads)
178 {
179     ShutdownIOManager(wait_threads);
180     if (completed_req_event != INVALID_HANDLE_VALUE) {
181         CloseHandle(completed_req_event);
182         completed_req_event = INVALID_HANDLE_VALUE;
183     }
184     if (abandon_req_wait != INVALID_HANDLE_VALUE) {
185         CloseHandle(abandon_req_wait);
186         abandon_req_wait = INVALID_HANDLE_VALUE;
187     }
188     if (completed_table_sema != NULL) {
189         CloseHandle(completed_table_sema);
190         completed_table_sema = NULL;
191     }
192     DeleteCriticalSection(&queue_lock);
193 }
194
195 /*
196  * Function: awaitRequests(wait)
197  *
198  * Check for the completion of external IO work requests. Worker
199  * threads signal completion of IO requests by depositing them
200  * in a table (completedTable). awaitRequests() matches up 
201  * requests in that table with threads on the blocked_queue, 
202  * making the threads whose IO requests have completed runnable
203  * again.
204  * 
205  * awaitRequests() is called by the scheduler periodically _or_ if
206  * it is out of work, and need to wait for the completion of IO
207  * requests to make further progress. In the latter scenario, 
208  * awaitRequests() will simply block waiting for worker threads 
209  * to complete if the 'completedTable' is empty.
210  */
211 int
212 awaitRequests(rtsBool wait)
213 {
214 #ifndef THREADED_RTS
215   // none of this is actually used in the threaded RTS
216
217 start:
218 #if 0
219     fprintf(stderr, "awaitRequests(): %d %d %d\n", issued_reqs, completed_hw, wait);
220     fflush(stderr);
221 #endif
222     EnterCriticalSection(&queue_lock);
223     /* Nothing immediately available & we won't wait */
224     if ((!wait && completed_hw == 0)
225 #if 0
226         // If we just return when wait==rtsFalse, we'll go into a busy
227         // wait loop, so I disabled this condition --SDM 18/12/2003
228         (issued_reqs == 0 && completed_hw == 0)
229 #endif
230         ) {
231         LeaveCriticalSection(&queue_lock);
232         return 0;
233     }
234     if (completed_hw == 0) {
235         /* empty table, drop lock and wait */
236         LeaveCriticalSection(&queue_lock);
237         if ( wait && sched_state == SCHED_RUNNING ) {
238             DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE);
239             switch (dwRes) {
240             case WAIT_OBJECT_0:
241                 /* a request was completed */
242                 break;
243             case WAIT_OBJECT_0 + 1:
244             case WAIT_TIMEOUT:
245                 /* timeout (unlikely) or told to abandon waiting */
246                 return 0;
247             case WAIT_FAILED: {
248                 DWORD dw = GetLastError();
249                 fprintf(stderr, "awaitRequests: wait failed -- error code: %lu\n", dw); fflush(stderr);
250                 return 0;
251             }
252             default:
253                 fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr);
254                 return 0;
255             }
256         } else {
257             return 0;
258         }
259         goto start;
260     } else {
261         int i;
262         StgTSO *tso, *prev;
263         
264         for (i=0; i < completed_hw; i++) {
265             /* For each of the completed requests, match up their Ids
266              * with those of the threads on the blocked_queue. If the
267              * thread that made the IO request has been subsequently
268              * killed (and removed from blocked_queue), no match will
269              * be found for that request Id. 
270              *
271              * i.e., killing a Haskell thread doesn't attempt to cancel
272              * the IO request it is blocked on.
273              *
274              */
275             unsigned int rID = completedTable[i].reqID;
276             
277             prev = NULL;
278             for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; prev = tso, tso = tso->link) {
279         
280                 switch(tso->why_blocked) {
281                 case BlockedOnRead:
282                 case BlockedOnWrite:
283                 case BlockedOnDoProc:
284                     if (tso->block_info.async_result->reqID == rID) {
285                         /* Found the thread blocked waiting on request; stodgily fill 
286                          * in its result block. 
287                          */
288                         tso->block_info.async_result->len = completedTable[i].len;
289                         tso->block_info.async_result->errCode = completedTable[i].errCode;
290                         
291                         /* Drop the matched TSO from blocked_queue */
292                         if (prev) {
293                             prev->link = tso->link;
294                         } else {
295                             blocked_queue_hd = tso->link;
296                         }
297                         if (blocked_queue_tl == tso) {
298                             blocked_queue_tl = prev ? prev : END_TSO_QUEUE;
299                         }
300                     
301                         /* Terminates the run queue + this inner for-loop. */
302                         tso->link = END_TSO_QUEUE;
303                         tso->why_blocked = NotBlocked;
304                         pushOnRunQueue(&MainCapability, tso);
305                         break;
306                     }
307                     break;
308                 default:
309                     if (tso->why_blocked != NotBlocked) {
310                         barf("awaitRequests: odd thread state");
311                     }
312                     break;
313                 }
314             }
315             /* Signal that there's completed table slots available */
316             if ( !ReleaseSemaphore(completed_table_sema, 1, NULL) ) {
317                 DWORD dw = GetLastError();
318                 fprintf(stderr, "awaitRequests: failed to signal semaphore (error code=0x%x)\n", (int)dw);
319                 fflush(stderr);
320             }
321         }
322         completed_hw = 0;
323         ResetEvent(completed_req_event);
324         LeaveCriticalSection(&queue_lock);
325         return 1;
326     }
327 #endif /* !THREADED_RTS */
328 }
329
330 /*
331  * Function: abandonRequestWait()
332  *
333  * Wake up a thread that's blocked waiting for new IO requests
334  * to complete (via awaitRequests().)
335  */
336 void
337 abandonRequestWait( void )
338 {
339     /* the event is auto-reset, but in case there's no thread
340      * already waiting on the event, we want to return it to
341      * a non-signalled state.
342      *
343      * Careful!  There is no synchronisation between
344      * abandonRequestWait and awaitRequest, which means that
345      * abandonRequestWait might be called just before a thread
346      * goes into a wait, and we miss the abandon signal.  So we
347      * must SetEvent() here rather than PulseEvent() to ensure
348      * that the event isn't lost.  We can re-optimise by resetting
349      * the event somewhere safe if we know the event has been
350      * properly serviced (see resetAbandon() below).  --SDM 18/12/2003
351      */
352     SetEvent(abandon_req_wait);
353 }
354
355 void
356 resetAbandonRequestWait( void )
357 {
358     ResetEvent(abandon_req_wait);
359 }
360
361 #endif /* !defined(THREADED_RTS) */