random is now -Wall clean
[ghc-hetmet.git] / rts / posix / Select.c
1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 1995-2002
4  *
5  * Support for concurrent non-blocking I/O and thread waiting.
6  *
7  * ---------------------------------------------------------------------------*/
8
9 /* we're outside the realms of POSIX here... */
10 /* #include "PosixSource.h" */
11
12 #include "Rts.h"
13 #include "Storage.h"
14 #include "Schedule.h"
15 #include "RtsUtils.h"
16 #include "RtsFlags.h"
17 #include "Timer.h"
18 #include "Itimer.h"
19 #include "Signals.h"
20 #include "Capability.h"
21 #include "posix/Select.h"
22
23 # ifdef HAVE_SYS_TYPES_H
24 #  include <sys/types.h>
25 # endif
26
27 # ifdef HAVE_SYS_TIME_H
28 #  include <sys/time.h>
29 # endif
30
31 #include <errno.h>
32 #include <string.h>
33
34 #ifdef HAVE_UNISTD_H
35 #include <unistd.h>
36 #endif
37
38 #if !defined(THREADED_RTS)
39 /* last timestamp */
40 lnat timestamp = 0;
41
42 /* 
43  * The threaded RTS uses an IO-manager thread in Haskell instead (see GHC.Conc) 
44  */
45
46 /* There's a clever trick here to avoid problems when the time wraps
47  * around.  Since our maximum delay is smaller than 31 bits of ticks
48  * (it's actually 31 bits of microseconds), we can safely check
49  * whether a timer has expired even if our timer will wrap around
50  * before the target is reached, using the following formula:
51  *
52  *        (int)((uint)current_time - (uint)target_time) < 0
53  *
54  * if this is true, then our time has expired.
55  * (idea due to Andy Gill).
56  */
57 static rtsBool
58 wakeUpSleepingThreads(lnat ticks)
59 {
60     StgTSO *tso;
61     rtsBool flag = rtsFalse;
62
63     while (sleeping_queue != END_TSO_QUEUE &&
64            (int)(ticks - sleeping_queue->block_info.target) >= 0) {
65         tso = sleeping_queue;
66         sleeping_queue = tso->_link;
67         tso->why_blocked = NotBlocked;
68         tso->_link = END_TSO_QUEUE;
69         IF_DEBUG(scheduler,debugBelch("Waking up sleeping thread %lu\n", (unsigned long)tso->id));
70         // MainCapability: this code is !THREADED_RTS
71         pushOnRunQueue(&MainCapability,tso);
72         flag = rtsTrue;
73     }
74     return flag;
75 }
76
77 /* Argument 'wait' says whether to wait for I/O to become available,
78  * or whether to just check and return immediately.  If there are
79  * other threads ready to run, we normally do the non-waiting variety,
80  * otherwise we wait (see Schedule.c).
81  *
82  * SMP note: must be called with sched_mutex locked.
83  *
84  * Windows: select only works on sockets, so this doesn't really work,
85  * though it makes things better than before. MsgWaitForMultipleObjects
86  * should really be used, though it only seems to work for read handles,
87  * not write handles.
88  *
89  */
90 void
91 awaitEvent(rtsBool wait)
92 {
93     StgTSO *tso, *prev, *next;
94     rtsBool ready;
95     fd_set rfd,wfd;
96     int numFound;
97     int maxfd = -1;
98     rtsBool select_succeeded = rtsTrue;
99     rtsBool unblock_all = rtsFalse;
100     struct timeval tv;
101     lnat min, ticks;
102
103     tv.tv_sec  = 0;
104     tv.tv_usec = 0;
105     
106     IF_DEBUG(scheduler,
107              debugBelch("scheduler: checking for threads blocked on I/O");
108              if (wait) {
109                  debugBelch(" (waiting)");
110              }
111              debugBelch("\n");
112              );
113
114     /* loop until we've woken up some threads.  This loop is needed
115      * because the select timing isn't accurate, we sometimes sleep
116      * for a while but not long enough to wake up a thread in
117      * a threadDelay.
118      */
119     do {
120
121       ticks = timestamp = getourtimeofday();
122       if (wakeUpSleepingThreads(ticks)) { 
123           return;
124       }
125
126       if (!wait) {
127           min = 0;
128       } else if (sleeping_queue != END_TSO_QUEUE) {
129           min = (sleeping_queue->block_info.target - ticks) 
130               * RtsFlags.MiscFlags.tickInterval * 1000;
131       } else {
132           min = 0x7ffffff;
133       }
134
135       /* 
136        * Collect all of the fd's that we're interested in
137        */
138       FD_ZERO(&rfd);
139       FD_ZERO(&wfd);
140
141       for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
142         next = tso->_link;
143
144       /* On FreeBSD FD_SETSIZE is unsigned. Cast it to signed int
145        * in order to switch off the 'comparison between signed and
146        * unsigned error message
147        */
148         switch (tso->why_blocked) {
149         case BlockedOnRead:
150           { 
151             int fd = tso->block_info.fd;
152             if (fd >= (int)FD_SETSIZE) {
153                 barf("awaitEvent: descriptor out of range");
154             }
155             maxfd = (fd > maxfd) ? fd : maxfd;
156             FD_SET(fd, &rfd);
157             continue;
158           }
159
160         case BlockedOnWrite:
161           { 
162             int fd = tso->block_info.fd;
163             if (fd >= (int)FD_SETSIZE) {
164                 barf("awaitEvent: descriptor out of range");
165             }
166             maxfd = (fd > maxfd) ? fd : maxfd;
167             FD_SET(fd, &wfd);
168             continue;
169           }
170
171         default:
172           barf("AwaitEvent");
173         }
174       }
175
176       /* Check for any interesting events */
177       
178       tv.tv_sec  = min / 1000000;
179       tv.tv_usec = min % 1000000;
180
181       while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv)) < 0) {
182           if (errno != EINTR) {
183             /* Handle bad file descriptors by unblocking all the
184                waiting threads. Why? Because a thread might have been
185                a bit naughty and closed a file descriptor while another
186                was blocked waiting. This is less-than-good programming
187                practice, but having the RTS as a result fall over isn't
188                acceptable, so we simply unblock all the waiting threads
189                should we see a bad file descriptor & give the threads
190                a chance to clean up their act. 
191                
192                Note: assume here that threads becoming unblocked
193                will try to read/write the file descriptor before trying
194                to issue a threadWaitRead/threadWaitWrite again (==> an
195                IOError will result for the thread that's got the bad
196                file descriptor.) Hence, there's no danger of a bad
197                file descriptor being repeatedly select()'ed on, so
198                the RTS won't loop.
199             */
200             if ( errno == EBADF ) {
201               unblock_all = rtsTrue;
202               break;
203             } else {
204               perror("select");
205               barf("select failed");
206             }
207           }
208
209           /* We got a signal; could be one of ours.  If so, we need
210            * to start up the signal handler straight away, otherwise
211            * we could block for a long time before the signal is
212            * serviced.
213            */
214 #if defined(RTS_USER_SIGNALS)
215           if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
216               startSignalHandlers(&MainCapability);
217               return; /* still hold the lock */
218           }
219 #endif
220
221           /* we were interrupted, return to the scheduler immediately.
222            */
223           if (sched_state >= SCHED_INTERRUPTING) {
224               return; /* still hold the lock */
225           }
226           
227           /* check for threads that need waking up 
228            */
229           wakeUpSleepingThreads(getourtimeofday());
230           
231           /* If new runnable threads have arrived, stop waiting for
232            * I/O and run them.
233            */
234           if (!emptyRunQueue(&MainCapability)) {
235               return; /* still hold the lock */
236           }
237       }
238
239       /* Step through the waiting queue, unblocking every thread that now has
240        * a file descriptor in a ready state.
241        */
242
243       prev = NULL;
244       if (select_succeeded || unblock_all) {
245           for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
246               next = tso->_link;
247               switch (tso->why_blocked) {
248               case BlockedOnRead:
249                   ready = unblock_all || FD_ISSET(tso->block_info.fd, &rfd);
250                   break;
251               case BlockedOnWrite:
252                   ready = unblock_all || FD_ISSET(tso->block_info.fd, &wfd);
253                   break;
254               default:
255                   barf("awaitEvent");
256               }
257       
258               if (ready) {
259                 IF_DEBUG(scheduler,debugBelch("Waking up blocked thread %lu\n", (unsigned long)tso->id));
260                   tso->why_blocked = NotBlocked;
261                   tso->_link = END_TSO_QUEUE;
262                   pushOnRunQueue(&MainCapability,tso);
263               } else {
264                   if (prev == NULL)
265                       blocked_queue_hd = tso;
266                   else
267                       setTSOLink(&MainCapability, prev, tso);
268                   prev = tso;
269               }
270           }
271
272           if (prev == NULL)
273               blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
274           else {
275               prev->_link = END_TSO_QUEUE;
276               blocked_queue_tl = prev;
277           }
278       }
279       
280     } while (wait && sched_state == SCHED_RUNNING
281              && emptyRunQueue(&MainCapability));
282 }
283
284 #endif /* THREADED_RTS */