3 * Integrating Win32 asynchronous I/O with the GHC RTS.
13 #include "Capability.h"
14 #include "win32/AsyncIO.h"
15 #include "win32/IOManager.h"
20 * Haskell code issue asynchronous I/O requests via the
21 * async{Read,Write,DoOp}# primops. These cause addIORequest()
22 * to be invoked, which forwards the request to the underlying
23 * asynchronous I/O subsystem. Each request is tagged with a unique
26 * addIORequest() returns this ID, so that when the blocked CH
27 * thread is added onto blocked_queue, its TSO is annotated with
28 * it. Upon completion of an I/O request, the async I/O handling
29 * code makes a back-call to signal its completion; the local
30 * onIOComplete() routine. It adds the IO request ID (along with
31 * its result data) to a queue of completed requests before returning.
33 * The queue of completed IO request is read by the thread operating
34 * the RTS scheduler. It de-queues the CH threads corresponding
35 * to the request IDs, making them runnable again.
39 typedef struct CompletedReq {
45 #define MAX_REQUESTS 200
47 static CRITICAL_SECTION queue_lock;
48 static HANDLE completed_req_event;
49 static HANDLE abandon_req_wait;
50 static HANDLE wait_handles[2];
51 static CompletedReq completedTable[MAX_REQUESTS];
52 static int completed_hw;
53 static HANDLE completed_table_sema;
54 static int issued_reqs;
57 onIOComplete(unsigned int reqID,
64 /* Deposit result of request in queue/table..when there's room. */
65 dwRes = WaitForSingleObject(completed_table_sema, INFINITE);
71 fprintf(stderr, "onIOComplete: failed to grab table semaphore, dropping request 0x%x\n", reqID);
75 EnterCriticalSection(&queue_lock);
76 if (completed_hw == MAX_REQUESTS) {
77 /* Shouldn't happen */
78 fprintf(stderr, "onIOComplete: ERROR -- Request table overflow (%d); dropping.\n", reqID);
82 fprintf(stderr, "onCompl: %d %d %d %d %d\n",
83 reqID, len, errCode, issued_reqs, completed_hw);
86 completedTable[completed_hw].reqID = reqID;
87 completedTable[completed_hw].len = len;
88 completedTable[completed_hw].errCode = errCode;
91 if (completed_hw == 1) {
92 /* The event is used to wake up the scheduler thread should it
93 * be blocked waiting for requests to complete. The event resets once
94 * that thread has cleared out the request queue/table.
96 SetEvent(completed_req_event);
99 LeaveCriticalSection(&queue_lock);
109 EnterCriticalSection(&queue_lock);
111 LeaveCriticalSection(&queue_lock);
113 fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
115 return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
119 addDelayRequest(int msecs)
121 EnterCriticalSection(&queue_lock);
123 LeaveCriticalSection(&queue_lock);
125 fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
127 return AddDelayRequest(msecs,onIOComplete);
131 addDoProcRequest(void* proc, void* param)
133 EnterCriticalSection(&queue_lock);
135 LeaveCriticalSection(&queue_lock);
137 fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
139 return AddProcRequest(proc,param,onIOComplete);
146 if (!StartIOManager()) {
149 InitializeCriticalSection(&queue_lock);
150 /* Create a pair of events:
152 * - completed_req_event -- signals the deposit of request result; manual reset.
153 * - abandon_req_wait -- external OS thread tells current RTS/Scheduler
154 * thread to abandon wait for IO request completion.
157 completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL);
158 abandon_req_wait = CreateEvent (NULL, FALSE, FALSE, NULL);
159 wait_handles[0] = completed_req_event;
160 wait_handles[1] = abandon_req_wait;
162 if ( !(completed_table_sema = CreateSemaphore (NULL, MAX_REQUESTS, MAX_REQUESTS, NULL)) ) {
163 DWORD rc = GetLastError();
164 fprintf(stderr, "startupAsyncIO: CreateSemaphore failed 0x%x\n", rc);
168 return ( completed_req_event != INVALID_HANDLE_VALUE &&
169 abandon_req_wait != INVALID_HANDLE_VALUE &&
170 completed_table_sema != NULL );
176 CloseHandle(completed_req_event);
181 * Function: awaitRequests(wait)
183 * Check for the completion of external IO work requests. Worker
184 * threads signal completion of IO requests by depositing them
185 * in a table (completedTable). awaitRequests() matches up
186 * requests in that table with threads on the blocked_queue,
187 * making the threads whose IO requests have completed runnable
190 * awaitRequests() is called by the scheduler periodically _or_ if
191 * it is out of work, and need to wait for the completion of IO
192 * requests to make further progress. In the latter scenario,
193 * awaitRequests() will simply block waiting for worker threads
194 * to complete if the 'completedTable' is empty.
197 awaitRequests(rtsBool wait)
200 // none of this is actually used in the threaded RTS
204 fprintf(stderr, "awaitRequests(): %d %d %d\n", issued_reqs, completed_hw, wait);
207 EnterCriticalSection(&queue_lock);
208 /* Nothing immediately available & we won't wait */
209 if ((!wait && completed_hw == 0)
211 // If we just return when wait==rtsFalse, we'll go into a busy
212 // wait loop, so I disabled this condition --SDM 18/12/2003
213 (issued_reqs == 0 && completed_hw == 0)
216 LeaveCriticalSection(&queue_lock);
219 if (completed_hw == 0) {
220 /* empty table, drop lock and wait */
221 LeaveCriticalSection(&queue_lock);
222 if ( wait && sched_state == SCHED_RUNNING ) {
223 DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE);
226 /* a request was completed */
228 case WAIT_OBJECT_0 + 1:
230 /* timeout (unlikely) or told to abandon waiting */
233 DWORD dw = GetLastError();
234 fprintf(stderr, "awaitRequests: wait failed -- error code: %lu\n", dw); fflush(stderr);
238 fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr);
249 for (i=0; i < completed_hw; i++) {
250 /* For each of the completed requests, match up their Ids
251 * with those of the threads on the blocked_queue. If the
252 * thread that made the IO request has been subsequently
253 * killed (and removed from blocked_queue), no match will
254 * be found for that request Id.
256 * i.e., killing a Haskell thread doesn't attempt to cancel
257 * the IO request it is blocked on.
260 unsigned int rID = completedTable[i].reqID;
263 for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; prev = tso, tso = tso->link) {
265 switch(tso->why_blocked) {
268 case BlockedOnDoProc:
269 if (tso->block_info.async_result->reqID == rID) {
270 /* Found the thread blocked waiting on request; stodgily fill
271 * in its result block.
273 tso->block_info.async_result->len = completedTable[i].len;
274 tso->block_info.async_result->errCode = completedTable[i].errCode;
276 /* Drop the matched TSO from blocked_queue */
278 prev->link = tso->link;
280 blocked_queue_hd = tso->link;
282 if (blocked_queue_tl == tso) {
283 blocked_queue_tl = prev ? prev : END_TSO_QUEUE;
286 /* Terminates the run queue + this inner for-loop. */
287 tso->link = END_TSO_QUEUE;
288 tso->why_blocked = NotBlocked;
289 pushOnRunQueue(&MainCapability, tso);
294 if (tso->why_blocked != NotBlocked) {
295 barf("awaitRequests: odd thread state");
300 /* Signal that there's completed table slots available */
301 if ( !ReleaseSemaphore(completed_table_sema, 1, NULL) ) {
302 DWORD dw = GetLastError();
303 fprintf(stderr, "awaitRequests: failed to signal semaphore (error code=0x%x)\n", dw);
308 ResetEvent(completed_req_event);
309 LeaveCriticalSection(&queue_lock);
312 #endif /* !THREADED_RTS */
316 * Function: abandonRequestWait()
318 * Wake up a thread that's blocked waiting for new IO requests
319 * to complete (via awaitRequests().)
322 abandonRequestWait( void )
324 /* the event is auto-reset, but in case there's no thread
325 * already waiting on the event, we want to return it to
326 * a non-signalled state.
328 * Careful! There is no synchronisation between
329 * abandonRequestWait and awaitRequest, which means that
330 * abandonRequestWait might be called just before a thread
331 * goes into a wait, and we miss the abandon signal. So we
332 * must SetEvent() here rather than PulseEvent() to ensure
333 * that the event isn't lost. We can re-optimise by resetting
334 * the event somewhere safe if we know the event has been
335 * properly serviced (see resetAbandon() below). --SDM 18/12/2003
337 SetEvent(abandon_req_wait);
341 resetAbandonRequestWait( void )
343 ResetEvent(abandon_req_wait);