[project @ 2003-02-21 05:34:12 by sof]
[ghc-hetmet.git] / ghc / rts / win32 / AsyncIO.c
1 /* AsyncIO.c
2  *
3  * Integrating Win32 asynchronous I/O with the GHC RTS.
4  *
5  * (c) sof, 2002-2003.
6  */
7 #include "Rts.h"
8 #include <windows.h>
9 #include <stdio.h>
10 #include "Schedule.h"
11 #include "win32/AsyncIO.h"
12 #include "win32/IOManager.h"
13
14 /*
15  * Overview:
16  *
17  * Haskell code issue asynchronous I/O requests via the 
18  * asyncRead# and asyncWrite# primops. These cause addIORequest()
19  * to be invoked, which forwards the request to the underlying
20  * asynchronous I/O subsystem. Each request is tagged with a unique
21  * ID.
22  *
23  * addIORequest() returns this ID, so that when the blocked CH
24  * thread is added onto blocked_queue, its TSO is annotated with
25  * it. Upon completion of an I/O request, the async I/O handling
26  * code makes a back-call to signal its completion; the local
27  * onIOComplete() routine. It adds the IO request ID (along with
28  * its result data) to a queue of completed requests before returning. 
29  *
30  * The queue of completed IO request is read by the thread operating
31  * the RTS scheduler. It de-queues the CH threads corresponding
32  * to the request IDs, making them runnable again.
33  *
34  */
35
36 typedef struct CompletedReq {
37   unsigned int   reqID;
38   int            len;
39   int            errCode;
40 } CompletedReq;
41
42 #define MAX_REQUESTS 200
43
44 static CRITICAL_SECTION queue_lock;
45 static HANDLE           completed_req_event;
46 static CompletedReq     completedTable[MAX_REQUESTS];
47 static int              completed_hw;
48 static int              issued_reqs;
49
50 static void
51 onIOComplete(unsigned int reqID,
52              void* param STG_UNUSED,
53              int   fd STG_UNUSED,
54              int   len,
55              char* buf STG_UNUSED,
56              int   errCode)
57 {
58   /* Deposit result of request in queue/table */
59   EnterCriticalSection(&queue_lock);
60   if (completed_hw == MAX_REQUESTS) {
61     /* Not likely */
62     fprintf(stderr, "Request table overflow (%d); dropping.\n", reqID);
63     fflush(stderr);
64   } else {
65 #if 0
66     fprintf(stderr, "onCompl: %d %d %d %d %d\n", reqID, len, errCode, issued_reqs, completed_hw); fflush(stderr);
67 #endif
68     completedTable[completed_hw].reqID   = reqID;
69     completedTable[completed_hw].len     = len;
70     completedTable[completed_hw].errCode = errCode;
71     completed_hw++;
72     issued_reqs--;
73     if (completed_hw == 1) {
74       /* The event is used to wake up the scheduler thread should it
75        * be blocked waiting for requests to complete. It reset once
76        * that thread has cleared out the request queue/table.
77        */
78       SetEvent(completed_req_event);
79     }
80   }
81   LeaveCriticalSection(&queue_lock);
82 }
83
84 unsigned int
85 addIORequest(int   fd,
86              int   forWriting,
87              int   isSock,
88              int   len,
89              char* buf)
90 {
91   EnterCriticalSection(&queue_lock);
92   issued_reqs++;
93   LeaveCriticalSection(&queue_lock);
94 #if 0
95   fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
96 #endif
97   return AddIORequest(fd,forWriting,isSock,len,buf,0,onIOComplete);
98 }
99
100 int
101 startupAsyncIO()
102 {
103   if (!StartIOManager()) {
104     return 0;
105   }
106   InitializeCriticalSection(&queue_lock);
107   completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL);
108   completed_hw = 0;
109   return 1;
110 }
111
112 void
113 shutdownAsyncIO()
114 {
115   CloseHandle(completed_req_event);
116   ShutdownIOManager();
117 }
118
119 int
120 awaitRequests(rtsBool wait)
121 {
122 start:
123 #if 0
124   fprintf(stderr, "awaitRequests: %d %d %d\n", issued_reqs, completed_hw, wait); fflush(stderr);
125 #endif
126   EnterCriticalSection(&queue_lock);
127   /* Nothing immediately available & we won't wait */
128   if ((!wait && completed_hw == 0) || 
129       (issued_reqs == 0 && completed_hw == 0)) {
130     LeaveCriticalSection(&queue_lock);
131     return 0;
132   }
133   if (completed_hw == 0) {
134     /* empty table, drop lock and wait */
135     LeaveCriticalSection(&queue_lock);
136     if (wait) {
137       WaitForSingleObject( completed_req_event, INFINITE );
138     } else {
139       return 0; /* cannot happen */
140     }
141     goto start;
142   } else {
143     int i;
144     StgTSO *tso, *prev;
145     
146     for (i=0; i < completed_hw; i++) {
147       unsigned int rID = completedTable[i].reqID;
148       prev = NULL;
149       for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; tso = tso->link) {
150         switch(tso->why_blocked) {
151         case BlockedOnRead:
152         case BlockedOnWrite:
153           if (tso->block_info.async_result->reqID == rID) {
154             /* Found the thread blocked waiting on request; stodgily fill 
155              * in its result block. 
156              */
157             tso->block_info.async_result->len = completedTable[i].len;
158             tso->block_info.async_result->errCode = completedTable[i].errCode;
159
160             /* Drop the matched TSO from blocked_queue */
161             if (prev) {
162               prev->link = tso->link;
163             } else {
164               blocked_queue_hd = tso->link;
165             }
166             if (blocked_queue_tl == tso) {
167               blocked_queue_tl = prev;
168             }
169             /* Terminates the run queue + this inner for-loop. */
170             tso->link = END_TSO_QUEUE;
171             tso->why_blocked = NotBlocked;
172             PUSH_ON_RUN_QUEUE(tso);
173             break;
174           }
175           break;
176         default:
177           break;
178         }
179         prev = tso;
180       }
181     }
182     completed_hw = 0;
183     ResetEvent(completed_req_event);
184     LeaveCriticalSection(&queue_lock);
185     return 1;
186   }
187 }