[project @ 2003-02-22 04:51:50 by sof]
[ghc-hetmet.git] / ghc / rts / win32 / AsyncIO.c
1 /* AsyncIO.c
2  *
3  * Integrating Win32 asynchronous I/O with the GHC RTS.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "Rts.h"
8 #include <windows.h>
9 #include <stdio.h>
10 #include "Schedule.h"
11 #include "win32/AsyncIO.h"
12 #include "win32/IOManager.h"
13
14 /*
15  * Overview:
16  *
17  * Haskell code issue asynchronous I/O requests via the 
18  * asyncRead# and asyncWrite# primops. These cause addIORequest()
19  * to be invoked, which forwards the request to the underlying
20  * asynchronous I/O subsystem. Each request is tagged with a unique
21  * ID.
22  *
23  * addIORequest() returns this ID, so that when the blocked CH
24  * thread is added onto blocked_queue, its TSO is annotated with
25  * it. Upon completion of an I/O request, the async I/O handling
26  * code makes a back-call to signal its completion; the local
27  * onIOComplete() routine. It adds the IO request ID (along with
28  * its result data) to a queue of completed requests before returning. 
29  *
30  * The queue of completed IO request is read by the thread operating
31  * the RTS scheduler. It de-queues the CH threads corresponding
32  * to the request IDs, making them runnable again.
33  *
34  */
35
36 typedef struct CompletedReq {
37   unsigned int   reqID;
38   int            len;
39   int            errCode;
40 } CompletedReq;
41
42 #define MAX_REQUESTS 200
43
44 static CRITICAL_SECTION queue_lock;
45 static HANDLE           completed_req_event;
46 static HANDLE           abandon_req_wait;
47 static HANDLE           wait_handles[2];
48 static CompletedReq     completedTable[MAX_REQUESTS];
49 static int              completed_hw;
50 static int              issued_reqs;
51
52 static void
53 onIOComplete(unsigned int reqID,
54              void* param STG_UNUSED,
55              int   fd STG_UNUSED,
56              int   len,
57              char* buf STG_UNUSED,
58              int   errCode)
59 {
60   /* Deposit result of request in queue/table */
61   EnterCriticalSection(&queue_lock);
62   if (completed_hw == MAX_REQUESTS) {
63     /* Not likely */
64     fprintf(stderr, "Request table overflow (%d); dropping.\n", reqID);
65     fflush(stderr);
66   } else {
67 #if 0
68     fprintf(stderr, "onCompl: %d %d %d %d %d\n", reqID, len, errCode, issued_reqs, completed_hw); fflush(stderr);
69 #endif
70     completedTable[completed_hw].reqID   = reqID;
71     completedTable[completed_hw].len     = len;
72     completedTable[completed_hw].errCode = errCode;
73     completed_hw++;
74     issued_reqs--;
75     if (completed_hw == 1) {
76       /* The event is used to wake up the scheduler thread should it
77        * be blocked waiting for requests to complete. It reset once
78        * that thread has cleared out the request queue/table.
79        */
80       SetEvent(completed_req_event);
81     }
82   }
83   LeaveCriticalSection(&queue_lock);
84 }
85
86 unsigned int
87 addIORequest(int   fd,
88              int   forWriting,
89              int   isSock,
90              int   len,
91              char* buf)
92 {
93   EnterCriticalSection(&queue_lock);
94   issued_reqs++;
95   LeaveCriticalSection(&queue_lock);
96 #if 0
97   fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
98 #endif
99   return AddIORequest(fd,forWriting,isSock,len,buf,0,onIOComplete);
100 }
101
102 unsigned int
103 addDelayRequest(int   msecs)
104 {
105   EnterCriticalSection(&queue_lock);
106   issued_reqs++;
107   LeaveCriticalSection(&queue_lock);
108 #if 0
109   fprintf(stderr, "addDelayReq: %d %d %d\n", msecs); fflush(stderr);
110 #endif
111   return AddDelayRequest(msecs,0,onIOComplete);
112 }
113
114 int
115 startupAsyncIO()
116 {
117   if (!StartIOManager()) {
118     return 0;
119   }
120   InitializeCriticalSection(&queue_lock);
121   /* Create a pair of events:
122    *
123    *    - completed_req_event  -- signals the deposit of request result; manual reset.
124    *    - abandon_req_wait     -- external OS thread tells current RTS/Scheduler
125    *                              thread to abandon wait for IO request completion.
126    *                              Auto reset.
127    */
128   completed_req_event = CreateEvent (NULL, TRUE,  FALSE, NULL);
129   abandon_req_wait    = CreateEvent (NULL, FALSE, FALSE, NULL);
130   wait_handles[0] = completed_req_event;
131   wait_handles[1] = abandon_req_wait;
132   completed_hw = 0;
133   return ( completed_req_event != INVALID_HANDLE_VALUE &&
134            abandon_req_wait    != INVALID_HANDLE_VALUE );
135 }
136
137 void
138 shutdownAsyncIO()
139 {
140   CloseHandle(completed_req_event);
141   ShutdownIOManager();
142 }
143
144 int
145 awaitRequests(rtsBool wait)
146 {
147 start:
148 #if 0
149   fprintf(stderr, "awaitRequests: %d %d %d\n", issued_reqs, completed_hw, wait); fflush(stderr);
150 #endif
151   EnterCriticalSection(&queue_lock);
152   /* Nothing immediately available & we won't wait */
153   if ((!wait && completed_hw == 0) || 
154       (issued_reqs == 0 && completed_hw == 0)) {
155     LeaveCriticalSection(&queue_lock);
156     return 0;
157   }
158   if (completed_hw == 0) {
159     /* empty table, drop lock and wait */
160     LeaveCriticalSection(&queue_lock);
161     if (wait) {
162       DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE);
163       switch (dwRes) {
164       case WAIT_OBJECT_0:
165         break;
166       case WAIT_OBJECT_0 + 1:
167       case WAIT_TIMEOUT:
168         return 0;
169       default:
170         fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr);
171         return 0;
172       }
173     } else {
174       return 0; /* cannot happen */
175     }
176     goto start;
177   } else {
178     int i;
179     StgTSO *tso, *prev;
180     
181     for (i=0; i < completed_hw; i++) {
182       unsigned int rID = completedTable[i].reqID;
183       prev = NULL;
184       for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; tso = tso->link) {
185         switch(tso->why_blocked) {
186         case BlockedOnDelay:
187         case BlockedOnRead:
188         case BlockedOnWrite:
189           if (tso->block_info.async_result->reqID == rID) {
190             /* Found the thread blocked waiting on request; stodgily fill 
191              * in its result block. 
192              */
193             if (tso->why_blocked != BlockedOnDelay) {
194               tso->block_info.async_result->len = completedTable[i].len;
195               tso->block_info.async_result->errCode = completedTable[i].errCode;
196             }
197
198             /* Drop the matched TSO from blocked_queue */
199             if (prev) {
200               prev->link = tso->link;
201             } else {
202               blocked_queue_hd = tso->link;
203             }
204             if (blocked_queue_tl == tso) {
205               blocked_queue_tl = prev;
206             }
207             /* Terminates the run queue + this inner for-loop. */
208             tso->link = END_TSO_QUEUE;
209             tso->why_blocked = NotBlocked;
210             PUSH_ON_RUN_QUEUE(tso);
211             break;
212           }
213           break;
214         default:
215           break;
216         }
217         prev = tso;
218       }
219     }
220     completed_hw = 0;
221     ResetEvent(completed_req_event);
222     LeaveCriticalSection(&queue_lock);
223     return 1;
224   }
225 }
226
227 void
228 abandonRequestWait()
229 {
230   /* the event is auto-reset, but in case there's no thread
231    * already waiting on the event, we want to return it to
232    * a non-signalled state.
233    */
234   PulseEvent(abandon_req_wait);
235 }