[project @ 2000-08-23 12:51:03 by simonmar]
[ghc-hetmet.git] / ghc / rts / Select.c
1 /* -----------------------------------------------------------------------------
2  * $Id: Select.c,v 1.13 2000/08/23 12:51:03 simonmar Exp $
3  *
4  * (c) The GHC Team 1995-1999
5  *
6  * Support for concurrent non-blocking I/O and thread waiting.
7  *
8  * ---------------------------------------------------------------------------*/
9
10 /* we're outside the realms of POSIX here... */
11 #define NON_POSIX_SOURCE
12
13 #include "Rts.h"
14 #include "Schedule.h"
15 #include "RtsUtils.h"
16 #include "RtsFlags.h"
17 #include "Itimer.h"
18 #include "Signals.h"
19
20 # if defined(HAVE_SYS_TYPES_H)
21 #  include <sys/types.h>
22 # endif
23
24 # ifdef HAVE_SYS_TIME_H
25 #  include <sys/time.h>
26 # endif
27
28 nat ticks_since_select = 0;
29
30 /* Argument 'wait' says whether to wait for I/O to become available,
31  * or whether to just check and return immediately.  If there are
32  * other threads ready to run, we normally do the non-waiting variety,
33  * otherwise we wait (see Schedule.c).
34  *
35  * SMP note: must be called with sched_mutex locked.
36  */
37 void
38 awaitEvent(rtsBool wait)
39 {
40 #ifdef mingw32_TARGET_OS
41 /*
42  * Win32 doesn't support select(). ToDo: use MsgWaitForMultipleObjects()
43  * to achieve (similar) effect.
44  *
45  */
46     return;
47 #else
48
49     StgTSO *tso, *prev, *next;
50     rtsBool ready;
51     fd_set rfd,wfd;
52     int numFound;
53     nat min, delta;
54     int maxfd = -1;
55     rtsBool select_succeeded = rtsTrue;
56    
57     struct timeval tv;
58 #ifndef linux_TARGET_OS
59     struct timeval tv_before,tv_after;
60 #endif
61
62     IF_DEBUG(scheduler,belch("Checking for threads blocked on I/O...\n"));
63
64     /* loop until we've woken up some threads.  This loop is needed
65      * because the select timing isn't accurate, we sometimes sleep
66      * for a while but not long enough to wake up a thread in
67      * a threadDelay.
68      */
69     do {
70
71       /* see how long it's been since we last checked the blocked queue.
72        * ToDo: make this check atomic, so we don't lose any ticks.
73        */
74       delta = ticks_since_select;
75       ticks_since_select = 0;
76       delta = delta * TICK_MILLISECS * 1000;
77
78       min = wait == rtsTrue ? 0x7fffffff : 0;
79
80       /* 
81        * Collect all of the fd's that we're interested in, and capture
82        * the minimum waiting time (in microseconds) for the delayed threads.
83        */
84       FD_ZERO(&rfd);
85       FD_ZERO(&wfd);
86
87       for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
88         next = tso->link;
89
90         switch (tso->why_blocked) {
91         case BlockedOnRead:
92           { 
93             int fd = tso->block_info.fd;
94             maxfd = (fd > maxfd) ? fd : maxfd;
95             FD_SET(fd, &rfd);
96             continue;
97           }
98
99         case BlockedOnWrite:
100           { 
101             int fd = tso->block_info.fd;
102             maxfd = (fd > maxfd) ? fd : maxfd;
103             FD_SET(fd, &wfd);
104             continue;
105           }
106
107         case BlockedOnDelay:
108           {
109             int candidate; /* signed int is intentional */
110 #if defined(HAVE_SETITIMER) || defined(mingw32_TARGET_OS)
111             candidate = tso->block_info.delay;
112 #else
113             candidate = tso->block_info.target - getourtimeofday();
114             if (candidate < 0) {
115               candidate = 0;
116             }
117 #endif
118             if ((nat)candidate < min) {
119               min = candidate;
120             }
121             continue;
122           }
123
124         default:
125           barf("AwaitEvent");
126         }
127       }
128
129       /* Release the scheduler lock while we do the poll.
130        * this means that someone might muck with the blocked_queue
131        * while we do this, but it shouldn't matter:
132        *
133        *   - another task might poll for I/O and remove one
134        *     or more threads from the blocked_queue.
135        *   - more I/O threads may be added to blocked_queue.
136        *   - more delayed threads may be added to blocked_queue. We'll
137        *     just subtract delta from their delays after the poll.
138        *
139        * I believe none of these cases lead to trouble --SDM.
140        */
141       RELEASE_LOCK(&sched_mutex);
142
143       /* Check for any interesting events */
144
145       tv.tv_sec = min / 1000000;
146       tv.tv_usec = min % 1000000;
147
148 #ifndef linux_TARGET_OS
149       gettimeofday(&tv_before, (struct timezone *) NULL);
150 #endif
151
152       while (!interrupted &&
153              (numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv)) < 0) {
154         if (errno != EINTR) {
155           /* fflush(stdout); */
156           perror("select");
157           barf("select failed");
158         }
159         ACQUIRE_LOCK(&sched_mutex);
160
161         /* We got a signal; could be one of ours.  If so, we need
162          * to start up the signal handler straight away, otherwise
163          * we could block for a long time before the signal is
164          * serviced.
165          */
166         if (signals_pending()) {
167           RELEASE_LOCK(&sched_mutex);
168           start_signal_handlers();
169           /* Don't wake up any other threads that were waiting on I/O */
170           select_succeeded = rtsFalse;
171           break;
172         }
173
174         if (interrupted) {
175             RELEASE_LOCK(&sched_mutex);
176             select_succeeded = rtsFalse;
177             break;
178         }
179
180         /* If new runnable threads have arrived, stop waiting for
181          * I/O and run them.
182          */
183         if (run_queue_hd != END_TSO_QUEUE) {
184           RELEASE_LOCK(&sched_mutex);
185           select_succeeded = rtsFalse;
186           break;
187         }
188         
189         RELEASE_LOCK(&sched_mutex);
190       } 
191
192 #ifdef linux_TARGET_OS
193       /* on Linux, tv is set to indicate the amount of time not
194        * slept, so we don't need to gettimeofday() to find out.
195        */
196       delta += min - (tv.tv_sec * 1000000 + tv.tv_usec);
197 #else
198       gettimeofday(&tv_after, (struct timezone *) NULL);
199       delta += (tv_after.tv_sec - tv_before.tv_sec) * 1000000 +
200         tv_after.tv_usec - tv_before.tv_usec;
201 #endif
202
203 #if 0
204       if (delta != 0) { fprintf(stderr,"waited: %d %d %d\n", min, delta,
205                                 interrupted); }
206 #endif
207
208       ACQUIRE_LOCK(&sched_mutex);
209
210       /* Step through the waiting queue, unblocking every thread that now has
211        * a file descriptor in a ready state.
212         
213        * For the delayed threads, decrement the number of microsecs
214        * we've been blocked for. Unblock the threads that have thusly expired.
215        */
216
217       prev = NULL;
218       for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
219         next = tso->link;
220         switch (tso->why_blocked) {
221         case BlockedOnRead:
222           ready = select_succeeded && FD_ISSET(tso->block_info.fd, &rfd);
223           break;
224         
225         case BlockedOnWrite:
226           ready = select_succeeded && FD_ISSET(tso->block_info.fd, &wfd);
227           break;
228         
229         case BlockedOnDelay:
230           {
231 #if defined(HAVE_SETITIMER) || defined(mingw32_TARGET_OS)
232             if (tso->block_info.delay > delta) {
233               tso->block_info.delay -= delta;
234               ready = 0;
235             } else {
236               tso->block_info.delay = 0;
237               ready = 1;
238             }
239 #else
240             int candidate; /* signed int is intentional */
241             candidate = tso->block_info.target - getourtimeofday();
242             if (candidate < 0) {
243               candidate = 0;
244             }
245             if ((nat)candidate > delta) {
246               ready = 0;
247             } else {
248               ready = 1;
249             }
250 #endif
251             break;
252           }
253         
254         default:
255           barf("awaitEvent");
256         }
257       
258         if (ready) {
259           IF_DEBUG(scheduler,belch("Waking up thread %d\n", tso->id));
260           tso->why_blocked = NotBlocked;
261           tso->link = END_TSO_QUEUE;
262           PUSH_ON_RUN_QUEUE(tso);
263         } else {
264           if (prev == NULL)
265             blocked_queue_hd = tso;
266           else
267             prev->link = tso;
268           prev = tso;
269         }
270       }
271
272       if (prev == NULL)
273         blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
274       else {
275         prev->link = END_TSO_QUEUE;
276         blocked_queue_tl = prev;
277       }
278
279     } while (wait && !interrupted && run_queue_hd == END_TSO_QUEUE);
280 #endif
281 }