[project @ 2003-07-03 15:14:56 by sof]
[ghc-hetmet.git] / ghc / rts / win32 / AsyncIO.c
1 /* AsyncIO.c
2  *
3  * Integrating Win32 asynchronous I/O with the GHC RTS.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "Rts.h"
8 #include <windows.h>
9 #include <stdio.h>
10 #include "Schedule.h"
11 #include "win32/AsyncIO.h"
12 #include "win32/IOManager.h"
13
14 /*
15  * Overview:
16  *
17  * Haskell code issue asynchronous I/O requests via the 
18  * asyncRead# and asyncWrite# primops. These cause addIORequest()
19  * to be invoked, which forwards the request to the underlying
20  * asynchronous I/O subsystem. Each request is tagged with a unique
21  * ID.
22  *
23  * addIORequest() returns this ID, so that when the blocked CH
24  * thread is added onto blocked_queue, its TSO is annotated with
25  * it. Upon completion of an I/O request, the async I/O handling
26  * code makes a back-call to signal its completion; the local
27  * onIOComplete() routine. It adds the IO request ID (along with
28  * its result data) to a queue of completed requests before returning. 
29  *
30  * The queue of completed IO request is read by the thread operating
31  * the RTS scheduler. It de-queues the CH threads corresponding
32  * to the request IDs, making them runnable again.
33  *
34  */
35
36 typedef struct CompletedReq {
37   unsigned int   reqID;
38   int            len;
39   int            errCode;
40 } CompletedReq;
41
42 #define MAX_REQUESTS 200
43
44 static CRITICAL_SECTION queue_lock;
45 static HANDLE           completed_req_event;
46 static HANDLE           abandon_req_wait;
47 static HANDLE           wait_handles[2];
48 static CompletedReq     completedTable[MAX_REQUESTS];
49 static int              completed_hw;
50 static int              issued_reqs;
51
52 static void
53 onIOComplete(unsigned int reqID,
54              int   fd STG_UNUSED,
55              int   len,
56              void* buf STG_UNUSED,
57              int   errCode)
58 {
59   /* Deposit result of request in queue/table */
60   EnterCriticalSection(&queue_lock);
61   if (completed_hw == MAX_REQUESTS) {
62     /* Not likely */
63     fprintf(stderr, "Request table overflow (%d); dropping.\n", reqID);
64     fflush(stderr);
65   } else {
66 #if 0
67     fprintf(stderr, "onCompl: %d %d %d %d %d\n", reqID, len, errCode, issued_reqs, completed_hw); fflush(stderr);
68 #endif
69     completedTable[completed_hw].reqID   = reqID;
70     completedTable[completed_hw].len     = len;
71     completedTable[completed_hw].errCode = errCode;
72     completed_hw++;
73     issued_reqs--;
74     if (completed_hw == 1) {
75       /* The event is used to wake up the scheduler thread should it
76        * be blocked waiting for requests to complete. It reset once
77        * that thread has cleared out the request queue/table.
78        */
79       SetEvent(completed_req_event);
80     }
81   }
82   LeaveCriticalSection(&queue_lock);
83 }
84
85 unsigned int
86 addIORequest(int   fd,
87              int   forWriting,
88              int   isSock,
89              int   len,
90              char* buf)
91 {
92   EnterCriticalSection(&queue_lock);
93   issued_reqs++;
94   LeaveCriticalSection(&queue_lock);
95 #if 0
96   fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
97 #endif
98   return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
99 }
100
101 unsigned int
102 addDelayRequest(int msecs)
103 {
104   EnterCriticalSection(&queue_lock);
105   issued_reqs++;
106   LeaveCriticalSection(&queue_lock);
107 #if 0
108   fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
109 #endif
110   return AddDelayRequest(msecs,onIOComplete);
111 }
112
113 unsigned int
114 addDoProcRequest(void* proc, void* param)
115 {
116   EnterCriticalSection(&queue_lock);
117   issued_reqs++;
118   LeaveCriticalSection(&queue_lock);
119 #if 0
120   fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
121 #endif
122   return AddProcRequest(proc,param,onIOComplete);
123 }
124
125
126 int
127 startupAsyncIO()
128 {
129   if (!StartIOManager()) {
130     return 0;
131   }
132   InitializeCriticalSection(&queue_lock);
133   /* Create a pair of events:
134    *
135    *    - completed_req_event  -- signals the deposit of request result; manual reset.
136    *    - abandon_req_wait     -- external OS thread tells current RTS/Scheduler
137    *                              thread to abandon wait for IO request completion.
138    *                              Auto reset.
139    */
140   completed_req_event = CreateEvent (NULL, TRUE,  FALSE, NULL);
141   abandon_req_wait    = CreateEvent (NULL, FALSE, FALSE, NULL);
142   wait_handles[0] = completed_req_event;
143   wait_handles[1] = abandon_req_wait;
144   completed_hw = 0;
145   return ( completed_req_event != INVALID_HANDLE_VALUE &&
146            abandon_req_wait    != INVALID_HANDLE_VALUE );
147 }
148
149 void
150 shutdownAsyncIO()
151 {
152   CloseHandle(completed_req_event);
153   ShutdownIOManager();
154 }
155
156 int
157 awaitRequests(rtsBool wait)
158 {
159 start:
160 #if 0
161   fprintf(stderr, "awaitRequests: %d %d %d\n", issued_reqs, completed_hw, wait); fflush(stderr);
162 #endif
163   EnterCriticalSection(&queue_lock);
164   /* Nothing immediately available & we won't wait */
165   if ((!wait && completed_hw == 0) || 
166       (issued_reqs == 0 && completed_hw == 0)) {
167     LeaveCriticalSection(&queue_lock);
168     return 0;
169   }
170   if (completed_hw == 0) {
171     /* empty table, drop lock and wait */
172     LeaveCriticalSection(&queue_lock);
173     if (wait) {
174       DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE);
175       switch (dwRes) {
176       case WAIT_OBJECT_0:
177         break;
178       case WAIT_OBJECT_0 + 1:
179       case WAIT_TIMEOUT:
180         return 0;
181       default:
182         fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr);
183         return 0;
184       }
185     } else {
186       return 0; /* cannot happen */
187     }
188     goto start;
189   } else {
190     int i;
191     StgTSO *tso, *prev;
192     
193     for (i=0; i < completed_hw; i++) {
194       unsigned int rID = completedTable[i].reqID;
195       prev = NULL;
196       for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; tso = tso->link) {
197         switch(tso->why_blocked) {
198         case BlockedOnDelay:
199         case BlockedOnRead:
200         case BlockedOnWrite:
201         case BlockedOnDoProc:
202           if (tso->block_info.async_result->reqID == rID) {
203             /* Found the thread blocked waiting on request; stodgily fill 
204              * in its result block. 
205              */
206             if (tso->why_blocked != BlockedOnDelay) {
207               tso->block_info.async_result->len = completedTable[i].len;
208               tso->block_info.async_result->errCode = completedTable[i].errCode;
209             }
210
211             /* Drop the matched TSO from blocked_queue */
212             if (prev) {
213               prev->link = tso->link;
214             } else {
215               blocked_queue_hd = tso->link;
216             }
217             if (blocked_queue_tl == tso) {
218               blocked_queue_tl = prev;
219             }
220             /* Terminates the run queue + this inner for-loop. */
221             tso->link = END_TSO_QUEUE;
222             tso->why_blocked = NotBlocked;
223             PUSH_ON_RUN_QUEUE(tso);
224             break;
225           }
226           break;
227         default:
228           break;
229         }
230         prev = tso;
231       }
232     }
233     completed_hw = 0;
234     ResetEvent(completed_req_event);
235     LeaveCriticalSection(&queue_lock);
236     return 1;
237   }
238 }
239
240 void
241 abandonRequestWait()
242 {
243   /* the event is auto-reset, but in case there's no thread
244    * already waiting on the event, we want to return it to
245    * a non-signalled state.
246    */
247   PulseEvent(abandon_req_wait);
248 }