[project @ 2003-07-16 17:40:38 by sof]
[ghc-hetmet.git] / ghc / rts / win32 / AsyncIO.c
1 /* AsyncIO.c
2  *
3  * Integrating Win32 asynchronous I/O with the GHC RTS.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "Rts.h"
8 #include "RtsUtils.h"
9 #include <windows.h>
10 #include <stdio.h>
11 #include "Schedule.h"
12 #include "win32/AsyncIO.h"
13 #include "win32/IOManager.h"
14
15 /*
16  * Overview:
17  *
18  * Haskell code issue asynchronous I/O requests via the 
19  * asyncRead# and asyncWrite# primops. These cause addIORequest()
20  * to be invoked, which forwards the request to the underlying
21  * asynchronous I/O subsystem. Each request is tagged with a unique
22  * ID.
23  *
24  * addIORequest() returns this ID, so that when the blocked CH
25  * thread is added onto blocked_queue, its TSO is annotated with
26  * it. Upon completion of an I/O request, the async I/O handling
27  * code makes a back-call to signal its completion; the local
28  * onIOComplete() routine. It adds the IO request ID (along with
29  * its result data) to a queue of completed requests before returning. 
30  *
31  * The queue of completed IO request is read by the thread operating
32  * the RTS scheduler. It de-queues the CH threads corresponding
33  * to the request IDs, making them runnable again.
34  *
35  */
36
37 typedef struct CompletedReq {
38   unsigned int   reqID;
39   int            len;
40   int            errCode;
41 } CompletedReq;
42
43 #define MAX_REQUESTS 200
44
45 static CRITICAL_SECTION queue_lock;
46 static HANDLE           completed_req_event;
47 static HANDLE           abandon_req_wait;
48 static HANDLE           wait_handles[2];
49 static CompletedReq     completedTable[MAX_REQUESTS];
50 static int              completed_hw;
51 static int              issued_reqs;
52
53 static void
54 onIOComplete(unsigned int reqID,
55              int   fd STG_UNUSED,
56              int   len,
57              void* buf STG_UNUSED,
58              int   errCode)
59 {
60   /* Deposit result of request in queue/table */
61   EnterCriticalSection(&queue_lock);
62   if (completed_hw == MAX_REQUESTS) {
63     /* Not likely */
64     fprintf(stderr, "Request table overflow (%d); dropping.\n", reqID);
65     fflush(stderr);
66   } else {
67 #if 0
68     fprintf(stderr, "onCompl: %d %d %d %d %d\n", reqID, len, errCode, issued_reqs, completed_hw); fflush(stderr);
69 #endif
70     completedTable[completed_hw].reqID   = reqID;
71     completedTable[completed_hw].len     = len;
72     completedTable[completed_hw].errCode = errCode;
73     completed_hw++;
74     issued_reqs--;
75     if (completed_hw == 1) {
76       /* The event is used to wake up the scheduler thread should it
77        * be blocked waiting for requests to complete. It reset once
78        * that thread has cleared out the request queue/table.
79        */
80       SetEvent(completed_req_event);
81     }
82   }
83   LeaveCriticalSection(&queue_lock);
84 }
85
86 unsigned int
87 addIORequest(int   fd,
88              int   forWriting,
89              int   isSock,
90              int   len,
91              char* buf)
92 {
93   EnterCriticalSection(&queue_lock);
94   issued_reqs++;
95   LeaveCriticalSection(&queue_lock);
96 #if 0
97   fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
98 #endif
99   return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
100 }
101
102 unsigned int
103 addDelayRequest(int msecs)
104 {
105   EnterCriticalSection(&queue_lock);
106   issued_reqs++;
107   LeaveCriticalSection(&queue_lock);
108 #if 0
109   fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
110 #endif
111   return AddDelayRequest(msecs,onIOComplete);
112 }
113
114 unsigned int
115 addDoProcRequest(void* proc, void* param)
116 {
117   EnterCriticalSection(&queue_lock);
118   issued_reqs++;
119   LeaveCriticalSection(&queue_lock);
120 #if 0
121   fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
122 #endif
123   return AddProcRequest(proc,param,onIOComplete);
124 }
125
126
127 int
128 startupAsyncIO()
129 {
130   if (!StartIOManager()) {
131     return 0;
132   }
133   InitializeCriticalSection(&queue_lock);
134   /* Create a pair of events:
135    *
136    *    - completed_req_event  -- signals the deposit of request result; manual reset.
137    *    - abandon_req_wait     -- external OS thread tells current RTS/Scheduler
138    *                              thread to abandon wait for IO request completion.
139    *                              Auto reset.
140    */
141   completed_req_event = CreateEvent (NULL, TRUE,  FALSE, NULL);
142   abandon_req_wait    = CreateEvent (NULL, FALSE, FALSE, NULL);
143   wait_handles[0] = completed_req_event;
144   wait_handles[1] = abandon_req_wait;
145   completed_hw = 0;
146   return ( completed_req_event != INVALID_HANDLE_VALUE &&
147            abandon_req_wait    != INVALID_HANDLE_VALUE );
148 }
149
150 void
151 shutdownAsyncIO()
152 {
153   CloseHandle(completed_req_event);
154   ShutdownIOManager();
155 }
156
157 int
158 awaitRequests(rtsBool wait)
159 {
160 start:
161 #if 0
162   fprintf(stderr, "awaitRequests: %d %d %d\n", issued_reqs, completed_hw, wait); fflush(stderr);
163 #endif
164   EnterCriticalSection(&queue_lock);
165   /* Nothing immediately available & we won't wait */
166   if ((!wait && completed_hw == 0) || 
167       (issued_reqs == 0 && completed_hw == 0)) {
168     LeaveCriticalSection(&queue_lock);
169     return 0;
170   }
171   if (completed_hw == 0) {
172     /* empty table, drop lock and wait */
173     LeaveCriticalSection(&queue_lock);
174     if (wait) {
175       DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE);
176       switch (dwRes) {
177       case WAIT_OBJECT_0:
178         break;
179       case WAIT_OBJECT_0 + 1:
180       case WAIT_TIMEOUT:
181         return 0;
182       default:
183         fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr);
184         return 0;
185       }
186     } else {
187       return 0; /* cannot happen */
188     }
189     goto start;
190   } else {
191     int i;
192     StgTSO *tso, *prev;
193     
194     for (i=0; i < completed_hw; i++) {
195       unsigned int rID = completedTable[i].reqID;
196       prev = NULL;
197       for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; tso = tso->link) {
198         switch(tso->why_blocked) {
199         case BlockedOnRead:
200         case BlockedOnWrite:
201         case BlockedOnDoProc:
202           if (tso->block_info.async_result->reqID == rID) {
203             /* Found the thread blocked waiting on request; stodgily fill 
204              * in its result block. 
205              */
206             tso->block_info.async_result->len = completedTable[i].len;
207             tso->block_info.async_result->errCode = completedTable[i].errCode;
208
209             /* Drop the matched TSO from blocked_queue */
210             if (prev) {
211               prev->link = tso->link;
212             } else {
213               blocked_queue_hd = tso->link;
214             }
215             if (blocked_queue_tl == tso) {
216               blocked_queue_tl = prev;
217             }
218             /* Terminates the run queue + this inner for-loop. */
219             tso->link = END_TSO_QUEUE;
220             tso->why_blocked = NotBlocked;
221             PUSH_ON_RUN_QUEUE(tso);
222             break;
223           }
224           break;
225         default:
226           if (tso->why_blocked != NotBlocked) {
227               barf("awaitRequests: odd thread state");
228           }
229           break;
230         }
231         prev = tso;
232       }
233     }
234     completed_hw = 0;
235     ResetEvent(completed_req_event);
236     LeaveCriticalSection(&queue_lock);
237     return 1;
238   }
239 }
240
241 void
242 abandonRequestWait()
243 {
244   /* the event is auto-reset, but in case there's no thread
245    * already waiting on the event, we want to return it to
246    * a non-signalled state.
247    */
248   PulseEvent(abandon_req_wait);
249 }