3 * Integrating Win32 asynchronous I/O with the GHC RTS.
11 #include "win32/AsyncIO.h"
12 #include "win32/IOManager.h"
17 * Haskell code issue asynchronous I/O requests via the
18 * asyncRead# and asyncWrite# primops. These cause addIORequest()
19 * to be invoked, which forwards the request to the underlying
20 * asynchronous I/O subsystem. Each request is tagged with a unique
23 * addIORequest() returns this ID, so that when the blocked CH
24 * thread is added onto blocked_queue, its TSO is annotated with
25 * it. Upon completion of an I/O request, the async I/O handling
26 * code makes a back-call to signal its completion; the local
27 * onIOComplete() routine. It adds the IO request ID (along with
28 * its result data) to a queue of completed requests before returning.
30 * The queue of completed IO request is read by the thread operating
31 * the RTS scheduler. It de-queues the CH threads corresponding
32 * to the request IDs, making them runnable again.
36 typedef struct CompletedReq {
42 #define MAX_REQUESTS 200
44 static CRITICAL_SECTION queue_lock;
45 static HANDLE completed_req_event;
46 static CompletedReq completedTable[MAX_REQUESTS];
47 static int completed_hw;
48 static int issued_reqs;
51 onIOComplete(unsigned int reqID,
52 void* param STG_UNUSED,
58 /* Deposit result of request in queue/table */
59 EnterCriticalSection(&queue_lock);
60 if (completed_hw == MAX_REQUESTS) {
62 fprintf(stderr, "Request table overflow (%d); dropping.\n", reqID);
66 fprintf(stderr, "onCompl: %d %d %d %d %d\n", reqID, len, errCode, issued_reqs, completed_hw); fflush(stderr);
68 completedTable[completed_hw].reqID = reqID;
69 completedTable[completed_hw].len = len;
70 completedTable[completed_hw].errCode = errCode;
73 if (completed_hw == 1) {
74 /* The event is used to wake up the scheduler thread should it
75 * be blocked waiting for requests to complete. It reset once
76 * that thread has cleared out the request queue/table.
78 SetEvent(completed_req_event);
81 LeaveCriticalSection(&queue_lock);
91 EnterCriticalSection(&queue_lock);
93 LeaveCriticalSection(&queue_lock);
95 fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
97 return AddIORequest(fd,forWriting,isSock,len,buf,0,onIOComplete);
103 if (!StartIOManager()) {
106 InitializeCriticalSection(&queue_lock);
107 completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL);
115 CloseHandle(completed_req_event);
120 awaitRequests(rtsBool wait)
124 fprintf(stderr, "awaitRequests: %d %d %d\n", issued_reqs, completed_hw, wait); fflush(stderr);
126 EnterCriticalSection(&queue_lock);
127 /* Nothing immediately available & we won't wait */
128 if ((!wait && completed_hw == 0) ||
129 (issued_reqs == 0 && completed_hw == 0)) {
130 LeaveCriticalSection(&queue_lock);
133 if (completed_hw == 0) {
134 /* empty table, drop lock and wait */
135 LeaveCriticalSection(&queue_lock);
137 WaitForSingleObject( completed_req_event, INFINITE );
139 return 0; /* cannot happen */
146 for (i=0; i < completed_hw; i++) {
147 unsigned int rID = completedTable[i].reqID;
149 for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; tso = tso->link) {
150 switch(tso->why_blocked) {
153 if (tso->block_info.async_result->reqID == rID) {
154 /* Found the thread blocked waiting on request; stodgily fill
155 * in its result block.
157 tso->block_info.async_result->len = completedTable[i].len;
158 tso->block_info.async_result->errCode = completedTable[i].errCode;
160 /* Drop the matched TSO from blocked_queue */
162 prev->link = tso->link;
164 blocked_queue_hd = tso->link;
166 if (blocked_queue_tl == tso) {
167 blocked_queue_tl = prev;
169 /* Terminates the run queue + this inner for-loop. */
170 tso->link = END_TSO_QUEUE;
171 tso->why_blocked = NotBlocked;
172 PUSH_ON_RUN_QUEUE(tso);
183 ResetEvent(completed_req_event);
184 LeaveCriticalSection(&queue_lock);