3 * Integrating Win32 asynchronous I/O with the GHC RTS.
11 #include "win32/AsyncIO.h"
12 #include "win32/IOManager.h"
17 * Haskell code issue asynchronous I/O requests via the
18 * asyncRead# and asyncWrite# primops. These cause addIORequest()
19 * to be invoked, which forwards the request to the underlying
20 * asynchronous I/O subsystem. Each request is tagged with a unique
23 * addIORequest() returns this ID, so that when the blocked CH
24 * thread is added onto blocked_queue, its TSO is annotated with
25 * it. Upon completion of an I/O request, the async I/O handling
26 * code makes a back-call to signal its completion; the local
27 * onIOComplete() routine. It adds the IO request ID (along with
28 * its result data) to a queue of completed requests before returning.
30 * The queue of completed IO request is read by the thread operating
31 * the RTS scheduler. It de-queues the CH threads corresponding
32 * to the request IDs, making them runnable again.
36 typedef struct CompletedReq {
42 #define MAX_REQUESTS 200
44 static CRITICAL_SECTION queue_lock;
45 static HANDLE completed_req_event;
46 static HANDLE abandon_req_wait;
47 static HANDLE wait_handles[2];
48 static CompletedReq completedTable[MAX_REQUESTS];
49 static int completed_hw;
50 static int issued_reqs;
53 onIOComplete(unsigned int reqID,
59 /* Deposit result of request in queue/table */
60 EnterCriticalSection(&queue_lock);
61 if (completed_hw == MAX_REQUESTS) {
63 fprintf(stderr, "Request table overflow (%d); dropping.\n", reqID);
67 fprintf(stderr, "onCompl: %d %d %d %d %d\n", reqID, len, errCode, issued_reqs, completed_hw); fflush(stderr);
69 completedTable[completed_hw].reqID = reqID;
70 completedTable[completed_hw].len = len;
71 completedTable[completed_hw].errCode = errCode;
74 if (completed_hw == 1) {
75 /* The event is used to wake up the scheduler thread should it
76 * be blocked waiting for requests to complete. It reset once
77 * that thread has cleared out the request queue/table.
79 SetEvent(completed_req_event);
82 LeaveCriticalSection(&queue_lock);
92 EnterCriticalSection(&queue_lock);
94 LeaveCriticalSection(&queue_lock);
96 fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
98 return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
102 addDelayRequest(int msecs)
104 EnterCriticalSection(&queue_lock);
106 LeaveCriticalSection(&queue_lock);
108 fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
110 return AddDelayRequest(msecs,onIOComplete);
114 addDoProcRequest(void* proc, void* param)
116 EnterCriticalSection(&queue_lock);
118 LeaveCriticalSection(&queue_lock);
120 fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
122 return AddProcRequest(proc,param,onIOComplete);
129 if (!StartIOManager()) {
132 InitializeCriticalSection(&queue_lock);
133 /* Create a pair of events:
135 * - completed_req_event -- signals the deposit of request result; manual reset.
136 * - abandon_req_wait -- external OS thread tells current RTS/Scheduler
137 * thread to abandon wait for IO request completion.
140 completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL);
141 abandon_req_wait = CreateEvent (NULL, FALSE, FALSE, NULL);
142 wait_handles[0] = completed_req_event;
143 wait_handles[1] = abandon_req_wait;
145 return ( completed_req_event != INVALID_HANDLE_VALUE &&
146 abandon_req_wait != INVALID_HANDLE_VALUE );
152 CloseHandle(completed_req_event);
157 awaitRequests(rtsBool wait)
161 fprintf(stderr, "awaitRequests: %d %d %d\n", issued_reqs, completed_hw, wait); fflush(stderr);
163 EnterCriticalSection(&queue_lock);
164 /* Nothing immediately available & we won't wait */
165 if ((!wait && completed_hw == 0) ||
166 (issued_reqs == 0 && completed_hw == 0)) {
167 LeaveCriticalSection(&queue_lock);
170 if (completed_hw == 0) {
171 /* empty table, drop lock and wait */
172 LeaveCriticalSection(&queue_lock);
174 DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE);
178 case WAIT_OBJECT_0 + 1:
182 fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr);
186 return 0; /* cannot happen */
193 for (i=0; i < completed_hw; i++) {
194 unsigned int rID = completedTable[i].reqID;
196 for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; tso = tso->link) {
197 switch(tso->why_blocked) {
201 case BlockedOnDoProc:
202 if (tso->block_info.async_result->reqID == rID) {
203 /* Found the thread blocked waiting on request; stodgily fill
204 * in its result block.
206 if (tso->why_blocked != BlockedOnDelay) {
207 tso->block_info.async_result->len = completedTable[i].len;
208 tso->block_info.async_result->errCode = completedTable[i].errCode;
211 /* Drop the matched TSO from blocked_queue */
213 prev->link = tso->link;
215 blocked_queue_hd = tso->link;
217 if (blocked_queue_tl == tso) {
218 blocked_queue_tl = prev;
220 /* Terminates the run queue + this inner for-loop. */
221 tso->link = END_TSO_QUEUE;
222 tso->why_blocked = NotBlocked;
223 PUSH_ON_RUN_QUEUE(tso);
234 ResetEvent(completed_req_event);
235 LeaveCriticalSection(&queue_lock);
243 /* the event is auto-reset, but in case there's no thread
244 * already waiting on the event, we want to return it to
245 * a non-signalled state.
247 PulseEvent(abandon_req_wait);