3 * Integrating Win32 asynchronous I/O with the GHC RTS.
11 #include "win32/AsyncIO.h"
12 #include "win32/IOManager.h"
17 * Haskell code issues asynchronous I/O requests via the
18 * asyncRead# and asyncWrite# primops. These cause addIORequest()
19 * to be invoked, which forwards the request to the underlying
20 * asynchronous I/O subsystem. Each request is tagged with a unique
23 * addIORequest() returns this ID, so that when the blocked CH
24 * thread is added onto blocked_queue, its TSO is annotated with
25 * it. Upon completion of an I/O request, the async I/O handling
26 * code calls back into the local onIOComplete() routine to signal
27 * completion. That routine adds the IO request ID (along with
28 * its result data) to a queue of completed requests before returning.
30 * The queue of completed IO requests is read by the thread operating
31 * the RTS scheduler. It de-queues the CH threads corresponding
32 * to the request IDs, making them runnable again.
36 typedef struct CompletedReq {
42 #define MAX_REQUESTS 200
44 static CRITICAL_SECTION queue_lock;
45 static HANDLE completed_req_event;
46 static HANDLE abandon_req_wait;
47 static HANDLE wait_handles[2];
48 static CompletedReq completedTable[MAX_REQUESTS];
49 static int completed_hw;
50 static int issued_reqs;
53 onIOComplete(unsigned int reqID,
54 void* param STG_UNUSED,
60 /* Deposit result of request in queue/table */
61 EnterCriticalSection(&queue_lock);
62 if (completed_hw == MAX_REQUESTS) {
64 fprintf(stderr, "Request table overflow (%d); dropping.\n", reqID);
68 fprintf(stderr, "onCompl: %d %d %d %d %d\n", reqID, len, errCode, issued_reqs, completed_hw); fflush(stderr);
70 completedTable[completed_hw].reqID = reqID;
71 completedTable[completed_hw].len = len;
72 completedTable[completed_hw].errCode = errCode;
75 if (completed_hw == 1) {
76 /* The event is used to wake up the scheduler thread should it
77 * be blocked waiting for requests to complete. It is reset once
78 * that thread has cleared out the request queue/table.
80 SetEvent(completed_req_event);
83 LeaveCriticalSection(&queue_lock);
93 EnterCriticalSection(&queue_lock);
95 LeaveCriticalSection(&queue_lock);
97 fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
99 return AddIORequest(fd,forWriting,isSock,len,buf,0,onIOComplete);
103 addDelayRequest(int msecs)
105 EnterCriticalSection(&queue_lock);
107 LeaveCriticalSection(&queue_lock);
109 fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr); /* one conversion per supplied argument */
111 return AddDelayRequest(msecs,0,onIOComplete);
117 if (!StartIOManager()) {
120 InitializeCriticalSection(&queue_lock);
121 /* Create a pair of events:
123 * - completed_req_event -- signals the deposit of request result; manual reset.
124 * - abandon_req_wait -- external OS thread tells current RTS/Scheduler
125 * thread to abandon wait for IO request completion.
128 completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL);
129 abandon_req_wait = CreateEvent (NULL, FALSE, FALSE, NULL);
130 wait_handles[0] = completed_req_event;
131 wait_handles[1] = abandon_req_wait;
133 return ( completed_req_event != NULL &&
134 abandon_req_wait != NULL ); /* CreateEvent() reports failure as NULL, not INVALID_HANDLE_VALUE */
140 CloseHandle(completed_req_event);
145 awaitRequests(rtsBool wait)
149 fprintf(stderr, "awaitRequests: %d %d %d\n", issued_reqs, completed_hw, wait); fflush(stderr);
151 EnterCriticalSection(&queue_lock);
152 /* Nothing immediately available & we won't wait */
153 if ((!wait && completed_hw == 0) ||
154 (issued_reqs == 0 && completed_hw == 0)) {
155 LeaveCriticalSection(&queue_lock);
158 if (completed_hw == 0) {
159 /* empty table, drop lock and wait */
160 LeaveCriticalSection(&queue_lock);
162 DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE);
166 case WAIT_OBJECT_0 + 1:
170 fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr);
174 return 0; /* cannot happen */
181 for (i=0; i < completed_hw; i++) {
182 unsigned int rID = completedTable[i].reqID;
184 for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; tso = tso->link) {
185 switch(tso->why_blocked) {
189 if (tso->block_info.async_result->reqID == rID) {
190 /* Found the thread blocked waiting on request; stodgily fill
191 * in its result block.
193 if (tso->why_blocked != BlockedOnDelay) {
194 tso->block_info.async_result->len = completedTable[i].len;
195 tso->block_info.async_result->errCode = completedTable[i].errCode;
198 /* Drop the matched TSO from blocked_queue */
200 prev->link = tso->link;
202 blocked_queue_hd = tso->link;
204 if (blocked_queue_tl == tso) {
205 blocked_queue_tl = prev;
207 /* Terminates the run queue + this inner for-loop. */
208 tso->link = END_TSO_QUEUE;
209 tso->why_blocked = NotBlocked;
210 PUSH_ON_RUN_QUEUE(tso);
221 ResetEvent(completed_req_event);
222 LeaveCriticalSection(&queue_lock);
230 /* the event is auto-reset, but in case there's no thread
231 * already waiting on the event, we want to return it to
232 * a non-signalled state.
234 PulseEvent(abandon_req_wait);