[project @ 2003-09-12 16:16:43 by sof]
[ghc-hetmet.git] / ghc / rts / win32 / AsyncIO.c
1 /* AsyncIO.c
2  *
3  * Integrating Win32 asynchronous I/O with the GHC RTS.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "Rts.h"
8 #include "RtsUtils.h"
9 #include <windows.h>
10 #include <stdio.h>
11 #include "Schedule.h"
12 #include "win32/AsyncIO.h"
13 #include "win32/IOManager.h"
14
15 /*
16  * Overview:
17  *
 * Haskell code issues asynchronous I/O requests via the
 * async{Read,Write,DoOp}# primops. These cause addIORequest()
 * to be invoked, which forwards the request to the underlying
 * asynchronous I/O subsystem. Each request is tagged with a unique
 * ID.
23  *
 * addIORequest() returns this ID, so that when the blocked CH
 * thread is added onto blocked_queue, its TSO is annotated with
 * it. Upon completion of an I/O request, the async I/O handling
 * code calls back into the local onIOComplete() routine to signal
 * completion. That routine adds the I/O request ID (along with
 * its result data) to a queue of completed requests before returning.
30  *
 * The queue of completed I/O requests is read by the thread operating
 * the RTS scheduler. It de-queues the CH threads corresponding
 * to the request IDs, making them runnable again.
34  *
35  */
36
/* One entry of the completed-request table: the ID of a finished
 * request together with the result values reported for it by the
 * completion callback.
 */
struct CompletedReq {
    unsigned int reqID;    /* ID the request was tagged with when issued */
    int          len;      /* 'len' value reported on completion */
    int          errCode;  /* error code reported on completion */
};
typedef struct CompletedReq CompletedReq;
42
43 #define MAX_REQUESTS 200
44
45 static CRITICAL_SECTION queue_lock;
46 static HANDLE           completed_req_event;
47 static HANDLE           abandon_req_wait;
48 static HANDLE           wait_handles[2];
49 static CompletedReq     completedTable[MAX_REQUESTS];
50 static int              completed_hw;
51 static int              issued_reqs;
52
53 static void
54 onIOComplete(unsigned int reqID,
55              int   fd STG_UNUSED,
56              int   len,
57              void* buf STG_UNUSED,
58              int   errCode)
59 {
60     /* Deposit result of request in queue/table */
61     EnterCriticalSection(&queue_lock);
62     if (completed_hw == MAX_REQUESTS) {
63         /* Not likely */
64         fprintf(stderr, "Request table overflow (%d); dropping.\n", reqID);
65         fflush(stderr);
66     } else {
67 #if 0
68         fprintf(stderr, "onCompl: %d %d %d %d %d\n", 
69                 reqID, len, errCode, issued_reqs, completed_hw); 
70         fflush(stderr);
71 #endif
72         completedTable[completed_hw].reqID   = reqID;
73         completedTable[completed_hw].len     = len;
74         completedTable[completed_hw].errCode = errCode;
75         completed_hw++;
76         issued_reqs--;
77         if (completed_hw == 1) {
78             /* The event is used to wake up the scheduler thread should it
79              * be blocked waiting for requests to complete. It reset once
80              * that thread has cleared out the request queue/table.
81              */
82             SetEvent(completed_req_event);
83         }
84     }
85     LeaveCriticalSection(&queue_lock);
86 }
87
88 unsigned int
89 addIORequest(int   fd,
90              int   forWriting,
91              int   isSock,
92              int   len,
93              char* buf)
94 {
95     EnterCriticalSection(&queue_lock);
96     issued_reqs++;
97     LeaveCriticalSection(&queue_lock);
98 #if 0
99     fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
100 #endif
101     return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
102 }
103
104 unsigned int
105 addDelayRequest(int msecs)
106 {
107     EnterCriticalSection(&queue_lock);
108     issued_reqs++;
109     LeaveCriticalSection(&queue_lock);
110 #if 0
111     fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
112 #endif
113     return AddDelayRequest(msecs,onIOComplete);
114 }
115
116 unsigned int
117 addDoProcRequest(void* proc, void* param)
118 {
119     EnterCriticalSection(&queue_lock);
120     issued_reqs++;
121     LeaveCriticalSection(&queue_lock);
122 #if 0
123     fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
124 #endif
125     return AddProcRequest(proc,param,onIOComplete);
126 }
127
128
129 int
130 startupAsyncIO()
131 {
132     if (!StartIOManager()) {
133         return 0;
134     }
135     InitializeCriticalSection(&queue_lock);
136     /* Create a pair of events:
137      *
138      *    - completed_req_event  -- signals the deposit of request result; manual reset.
139      *    - abandon_req_wait     -- external OS thread tells current RTS/Scheduler
140      *                              thread to abandon wait for IO request completion.
141      *                              Auto reset.
142      */
143     completed_req_event = CreateEvent (NULL, TRUE,  FALSE, NULL);
144     abandon_req_wait    = CreateEvent (NULL, FALSE, FALSE, NULL);
145     wait_handles[0] = completed_req_event;
146     wait_handles[1] = abandon_req_wait;
147     completed_hw = 0;
148     return ( completed_req_event != INVALID_HANDLE_VALUE &&
149              abandon_req_wait    != INVALID_HANDLE_VALUE );
150 }
151
152 void
153 shutdownAsyncIO()
154 {
155     CloseHandle(completed_req_event);
156     ShutdownIOManager();
157 }
158
159 /*
160  * Function: awaitRequests(wait)
161  *
162  * Check for the completion of external IO work requests. Worker
163  * threads signal completion of IO requests by depositing them
164  * in a table (completedTable). awaitRequests() matches up 
165  * requests in that table with threads on the blocked_queue, 
166  * making the threads whose IO requests have completed runnable
167  * again.
168  * 
169  * awaitRequests() is called by the scheduler periodically _or_ if
170  * it is out of work, and need to wait for the completion of IO
171  * requests to make further progress. In the latter scenario, 
172  * awaitRequests() will simply block waiting for worker threads 
173  * to complete if the 'completedTable' is empty.
174  */
175 int
176 awaitRequests(rtsBool wait)
177 {
178 start:
179 #if 0
180     fprintf(stderr, "awaitRequests(): %d %d %d\n", issued_reqs, completed_hw, wait);
181     fflush(stderr);
182 #endif
183     EnterCriticalSection(&queue_lock);
184     /* Nothing immediately available & we won't wait */
185     if ((!wait && completed_hw == 0) || 
186         (issued_reqs == 0 && completed_hw == 0)) {
187         LeaveCriticalSection(&queue_lock);
188         return 0;
189     }
190     if (completed_hw == 0) {
191         /* empty table, drop lock and wait */
192         LeaveCriticalSection(&queue_lock);
193         if ( wait && !interrupted ) {
194             DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE);
195             switch (dwRes) {
196             case WAIT_OBJECT_0:
197                 break;
198             case WAIT_OBJECT_0 + 1:
199             case WAIT_TIMEOUT:
200                 return 0;
201             default:
202                 fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr);
203                 return 0;
204             }
205         } else {
206             return 0; /* cannot happen */
207         }
208         goto start;
209     } else {
210         int i;
211         StgTSO *tso, *prev;
212         
213         for (i=0; i < completed_hw; i++) {
214             /* For each of the completed requests, match up their Ids
215              * with those of the threads on the blocked_queue. If the
216              * thread that made the IO request has been subsequently
217              * killed (and removed from blocked_queue), no match will
218              * be found for that request Id. 
219              *
220              * i.e., killing a Haskell thread doesn't attempt to cancel
221              * the IO request it is blocked on.
222              *
223              */
224             unsigned int rID = completedTable[i].reqID;
225             prev = NULL;
226             
227             prev = NULL;
228             for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; prev = tso, tso = tso->link) {
229         
230                 switch(tso->why_blocked) {
231                 case BlockedOnRead:
232                 case BlockedOnWrite:
233                 case BlockedOnDoProc:
234                     if (tso->block_info.async_result->reqID == rID) {
235                         /* Found the thread blocked waiting on request; stodgily fill 
236                          * in its result block. 
237                          */
238                         tso->block_info.async_result->len = completedTable[i].len;
239                         tso->block_info.async_result->errCode = completedTable[i].errCode;
240                         
241                         /* Drop the matched TSO from blocked_queue */
242                         if (prev) {
243                             prev->link = tso->link;
244                         } else {
245                             blocked_queue_hd = tso->link;
246                         }
247                         if (blocked_queue_tl == tso) {
248                             blocked_queue_tl = prev;
249                         }
250                     
251                         /* Terminates the run queue + this inner for-loop. */
252                         tso->link = END_TSO_QUEUE;
253                         tso->why_blocked = NotBlocked;
254                         PUSH_ON_RUN_QUEUE(tso);
255                         break;
256                     }
257                     break;
258                 default:
259                     if (tso->why_blocked != NotBlocked) {
260                         barf("awaitRequests: odd thread state");
261                     }
262                     break;
263                 }
264             }
265         }
266         completed_hw = 0;
267         ResetEvent(completed_req_event);
268         LeaveCriticalSection(&queue_lock);
269         return 1;
270     }
271 }
272
273 /*
274  * Function: abandonRequestWait()
275  *
276  * Wake up a thread that's blocked waiting for new IO requests
277  * to complete (via awaitRequests().)
278  */
279 void
280 abandonRequestWait()
281 {
282     /* the event is auto-reset, but in case there's no thread
283      * already waiting on the event, we want to return it to
284      * a non-signalled state.
285      */
286     PulseEvent(abandon_req_wait);
287 }