3 * Integrating Win32 asynchronous I/O with the GHC RTS.
12 #include "win32/AsyncIO.h"
13 #include "win32/IOManager.h"
18 * Haskell code issue asynchronous I/O requests via the
19 * asyncRead# and asyncWrite# primops. These cause addIORequest()
20 * to be invoked, which forwards the request to the underlying
21 * asynchronous I/O subsystem. Each request is tagged with a unique
24 * addIORequest() returns this ID, so that when the blocked CH
25 * thread is added onto blocked_queue, its TSO is annotated with
26 * it. Upon completion of an I/O request, the async I/O handling
27 * code makes a back-call to signal its completion; the local
28 * onIOComplete() routine. It adds the IO request ID (along with
29 * its result data) to a queue of completed requests before returning.
31 * The queue of completed IO request is read by the thread operating
32 * the RTS scheduler. It de-queues the CH threads corresponding
33 * to the request IDs, making them runnable again.
37 typedef struct CompletedReq {
43 #define MAX_REQUESTS 200
45 static CRITICAL_SECTION queue_lock;
46 static HANDLE completed_req_event;
47 static HANDLE abandon_req_wait;
48 static HANDLE wait_handles[2];
49 static CompletedReq completedTable[MAX_REQUESTS];
50 static int completed_hw;
51 static int issued_reqs;
54 onIOComplete(unsigned int reqID,
60 /* Deposit result of request in queue/table */
61 EnterCriticalSection(&queue_lock);
62 if (completed_hw == MAX_REQUESTS) {
64 fprintf(stderr, "Request table overflow (%d); dropping.\n", reqID);
68 fprintf(stderr, "onCompl: %d %d %d %d %d\n", reqID, len, errCode, issued_reqs, completed_hw); fflush(stderr);
70 completedTable[completed_hw].reqID = reqID;
71 completedTable[completed_hw].len = len;
72 completedTable[completed_hw].errCode = errCode;
75 if (completed_hw == 1) {
76 /* The event is used to wake up the scheduler thread should it
77 * be blocked waiting for requests to complete. It reset once
78 * that thread has cleared out the request queue/table.
80 SetEvent(completed_req_event);
83 LeaveCriticalSection(&queue_lock);
93 EnterCriticalSection(&queue_lock);
95 LeaveCriticalSection(&queue_lock);
97 fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
99 return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
103 addDelayRequest(int msecs)
105 EnterCriticalSection(&queue_lock);
107 LeaveCriticalSection(&queue_lock);
109 fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
111 return AddDelayRequest(msecs,onIOComplete);
115 addDoProcRequest(void* proc, void* param)
117 EnterCriticalSection(&queue_lock);
119 LeaveCriticalSection(&queue_lock);
121 fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
123 return AddProcRequest(proc,param,onIOComplete);
130 if (!StartIOManager()) {
133 InitializeCriticalSection(&queue_lock);
134 /* Create a pair of events:
136 * - completed_req_event -- signals the deposit of request result; manual reset.
137 * - abandon_req_wait -- external OS thread tells current RTS/Scheduler
138 * thread to abandon wait for IO request completion.
141 completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL);
142 abandon_req_wait = CreateEvent (NULL, FALSE, FALSE, NULL);
143 wait_handles[0] = completed_req_event;
144 wait_handles[1] = abandon_req_wait;
146 return ( completed_req_event != INVALID_HANDLE_VALUE &&
147 abandon_req_wait != INVALID_HANDLE_VALUE );
153 CloseHandle(completed_req_event);
158 awaitRequests(rtsBool wait)
162 fprintf(stderr, "awaitRequests: %d %d %d\n", issued_reqs, completed_hw, wait); fflush(stderr);
164 EnterCriticalSection(&queue_lock);
165 /* Nothing immediately available & we won't wait */
166 if ((!wait && completed_hw == 0) ||
167 (issued_reqs == 0 && completed_hw == 0)) {
168 LeaveCriticalSection(&queue_lock);
171 if (completed_hw == 0) {
172 /* empty table, drop lock and wait */
173 LeaveCriticalSection(&queue_lock);
175 DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE);
179 case WAIT_OBJECT_0 + 1:
183 fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr);
187 return 0; /* cannot happen */
194 for (i=0; i < completed_hw; i++) {
195 unsigned int rID = completedTable[i].reqID;
197 for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; tso = tso->link) {
198 switch(tso->why_blocked) {
201 case BlockedOnDoProc:
202 if (tso->block_info.async_result->reqID == rID) {
203 /* Found the thread blocked waiting on request; stodgily fill
204 * in its result block.
206 tso->block_info.async_result->len = completedTable[i].len;
207 tso->block_info.async_result->errCode = completedTable[i].errCode;
209 /* Drop the matched TSO from blocked_queue */
211 prev->link = tso->link;
213 blocked_queue_hd = tso->link;
215 if (blocked_queue_tl == tso) {
216 blocked_queue_tl = prev;
218 /* Terminates the run queue + this inner for-loop. */
219 tso->link = END_TSO_QUEUE;
220 tso->why_blocked = NotBlocked;
221 PUSH_ON_RUN_QUEUE(tso);
226 if (tso->why_blocked != NotBlocked) {
227 barf("awaitRequests: odd thread state");
235 ResetEvent(completed_req_event);
236 LeaveCriticalSection(&queue_lock);
244 /* the event is auto-reset, but in case there's no thread
245 * already waiting on the event, we want to return it to
246 * a non-signalled state.
248 PulseEvent(abandon_req_wait);