3 * Integrating Win32 asynchronous I/O with the GHC RTS.
12 #include "win32/AsyncIO.h"
13 #include "win32/IOManager.h"
18 * Haskell code issue asynchronous I/O requests via the
19 * async{Read,Write,DoOp}# primops. These cause addIORequest()
20 * to be invoked, which forwards the request to the underlying
21 * asynchronous I/O subsystem. Each request is tagged with a unique
24 * addIORequest() returns this ID, so that when the blocked CH
25 * thread is added onto blocked_queue, its TSO is annotated with
26 * it. Upon completion of an I/O request, the async I/O handling
27 * code makes a back-call to signal its completion; the local
28 * onIOComplete() routine. It adds the IO request ID (along with
29 * its result data) to a queue of completed requests before returning.
31 * The queue of completed IO request is read by the thread operating
32 * the RTS scheduler. It de-queues the CH threads corresponding
33 * to the request IDs, making them runnable again.
37 typedef struct CompletedReq {
43 #define MAX_REQUESTS 200
45 static CRITICAL_SECTION queue_lock;
46 static HANDLE completed_req_event;
47 static HANDLE abandon_req_wait;
48 static HANDLE wait_handles[2];
49 static CompletedReq completedTable[MAX_REQUESTS];
50 static int completed_hw;
51 static int issued_reqs;
54 onIOComplete(unsigned int reqID,
60 /* Deposit result of request in queue/table */
61 EnterCriticalSection(&queue_lock);
62 if (completed_hw == MAX_REQUESTS) {
64 fprintf(stderr, "Request table overflow (%d); dropping.\n", reqID);
68 fprintf(stderr, "onCompl: %d %d %d %d %d\n",
69 reqID, len, errCode, issued_reqs, completed_hw);
72 completedTable[completed_hw].reqID = reqID;
73 completedTable[completed_hw].len = len;
74 completedTable[completed_hw].errCode = errCode;
77 if (completed_hw == 1) {
78 /* The event is used to wake up the scheduler thread should it
79 * be blocked waiting for requests to complete. It reset once
80 * that thread has cleared out the request queue/table.
82 SetEvent(completed_req_event);
85 LeaveCriticalSection(&queue_lock);
95 EnterCriticalSection(&queue_lock);
97 LeaveCriticalSection(&queue_lock);
99 fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
101 return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
105 addDelayRequest(int msecs)
107 EnterCriticalSection(&queue_lock);
109 LeaveCriticalSection(&queue_lock);
111 fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
113 return AddDelayRequest(msecs,onIOComplete);
117 addDoProcRequest(void* proc, void* param)
119 EnterCriticalSection(&queue_lock);
121 LeaveCriticalSection(&queue_lock);
123 fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
125 return AddProcRequest(proc,param,onIOComplete);
132 if (!StartIOManager()) {
135 InitializeCriticalSection(&queue_lock);
136 /* Create a pair of events:
138 * - completed_req_event -- signals the deposit of request result; manual reset.
139 * - abandon_req_wait -- external OS thread tells current RTS/Scheduler
140 * thread to abandon wait for IO request completion.
143 completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL);
144 abandon_req_wait = CreateEvent (NULL, FALSE, FALSE, NULL);
145 wait_handles[0] = completed_req_event;
146 wait_handles[1] = abandon_req_wait;
148 return ( completed_req_event != INVALID_HANDLE_VALUE &&
149 abandon_req_wait != INVALID_HANDLE_VALUE );
155 CloseHandle(completed_req_event);
160 * Function: awaitRequests(wait)
162 * Check for the completion of external IO work requests. Worker
163 * threads signal completion of IO requests by depositing them
164 * in a table (completedTable). awaitRequests() matches up
165 * requests in that table with threads on the blocked_queue,
166 * making the threads whose IO requests have completed runnable
169 * awaitRequests() is called by the scheduler periodically _or_ if
170 * it is out of work, and need to wait for the completion of IO
171 * requests to make further progress. In the latter scenario,
172 * awaitRequests() will simply block waiting for worker threads
173 * to complete if the 'completedTable' is empty.
176 awaitRequests(rtsBool wait)
180 fprintf(stderr, "awaitRequests(): %d %d %d\n", issued_reqs, completed_hw, wait);
183 EnterCriticalSection(&queue_lock);
184 /* Nothing immediately available & we won't wait */
185 if ((!wait && completed_hw == 0) ||
186 (issued_reqs == 0 && completed_hw == 0)) {
187 LeaveCriticalSection(&queue_lock);
190 if (completed_hw == 0) {
191 /* empty table, drop lock and wait */
192 LeaveCriticalSection(&queue_lock);
193 if ( wait && !interrupted ) {
194 DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE);
198 case WAIT_OBJECT_0 + 1:
202 fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr);
206 return 0; /* cannot happen */
213 for (i=0; i < completed_hw; i++) {
214 /* For each of the completed requests, match up their Ids
215 * with those of the threads on the blocked_queue. If the
216 * thread that made the IO request has been subsequently
217 * killed (and removed from blocked_queue), no match will
218 * be found for that request Id.
220 * i.e., killing a Haskell thread doesn't attempt to cancel
221 * the IO request it is blocked on.
224 unsigned int rID = completedTable[i].reqID;
228 for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; prev = tso, tso = tso->link) {
230 switch(tso->why_blocked) {
233 case BlockedOnDoProc:
234 if (tso->block_info.async_result->reqID == rID) {
235 /* Found the thread blocked waiting on request; stodgily fill
236 * in its result block.
238 tso->block_info.async_result->len = completedTable[i].len;
239 tso->block_info.async_result->errCode = completedTable[i].errCode;
241 /* Drop the matched TSO from blocked_queue */
243 prev->link = tso->link;
245 blocked_queue_hd = tso->link;
247 if (blocked_queue_tl == tso) {
248 blocked_queue_tl = prev;
251 /* Terminates the run queue + this inner for-loop. */
252 tso->link = END_TSO_QUEUE;
253 tso->why_blocked = NotBlocked;
254 PUSH_ON_RUN_QUEUE(tso);
259 if (tso->why_blocked != NotBlocked) {
260 barf("awaitRequests: odd thread state");
267 ResetEvent(completed_req_event);
268 LeaveCriticalSection(&queue_lock);
274 * Function: abandonRequestWait()
276 * Wake up a thread that's blocked waiting for new IO requests
277 * to complete (via awaitRequests().)
282 /* the event is auto-reset, but in case there's no thread
283 * already waiting on the event, we want to return it to
284 * a non-signalled state.
286 PulseEvent(abandon_req_wait);