Split GC.c, and move storage manager into sm/ directory
[ghc-hetmet.git] / rts / posix / Select.c
1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 1995-2002
4  *
5  * Support for concurrent non-blocking I/O and thread waiting.
6  *
7  * ---------------------------------------------------------------------------*/
8
9 /* we're outside the realms of POSIX here... */
10 /* #include "PosixSource.h" */
11
12 #include "Rts.h"
13 #include "Storage.h"
14 #include "Schedule.h"
15 #include "RtsUtils.h"
16 #include "RtsFlags.h"
17 #include "Timer.h"
18 #include "Itimer.h"
19 #include "Signals.h"
20 #include "Capability.h"
21 #include "posix/Select.h"
22
23 # ifdef HAVE_SYS_TYPES_H
24 #  include <sys/types.h>
25 # endif
26
27 # ifdef HAVE_SYS_TIME_H
28 #  include <sys/time.h>
29 # endif
30
31 #include <errno.h>
32 #include <string.h>
33
34 #ifdef HAVE_UNISTD_H
35 #include <unistd.h>
36 #endif
37
38 #if !defined(THREADED_RTS)
39 /* last timestamp */
40 lnat timestamp = 0;
41
42 /* 
43  * The threaded RTS uses an IO-manager thread in Haskell instead (see GHC.Conc) 
44  */
45
46 /* There's a clever trick here to avoid problems when the time wraps
47  * around.  Since our maximum delay is smaller than 31 bits of ticks
48  * (it's actually 31 bits of microseconds), we can safely check
49  * whether a timer has expired even if our timer will wrap around
50  * before the target is reached, using the following formula:
51  *
52  *        (int)((uint)current_time - (uint)target_time) < 0
53  *
54  * if this is true, then our time has expired.
55  * (idea due to Andy Gill).
56  */
57 static rtsBool
58 wakeUpSleepingThreads(lnat ticks)
59 {
60     StgTSO *tso;
61     rtsBool flag = rtsFalse;
62
63     while (sleeping_queue != END_TSO_QUEUE &&
64            (int)(ticks - sleeping_queue->block_info.target) > 0) {
65         tso = sleeping_queue;
66         sleeping_queue = tso->link;
67         tso->why_blocked = NotBlocked;
68         tso->link = END_TSO_QUEUE;
69         IF_DEBUG(scheduler,debugBelch("Waking up sleeping thread %lu\n", (unsigned long)tso->id));
70         // MainCapability: this code is !THREADED_RTS
71         pushOnRunQueue(&MainCapability,tso);
72         flag = rtsTrue;
73     }
74     return flag;
75 }
76
77 /* Argument 'wait' says whether to wait for I/O to become available,
78  * or whether to just check and return immediately.  If there are
79  * other threads ready to run, we normally do the non-waiting variety,
80  * otherwise we wait (see Schedule.c).
81  *
82  * SMP note: must be called with sched_mutex locked.
83  *
84  * Windows: select only works on sockets, so this doesn't really work,
85  * though it makes things better than before. MsgWaitForMultipleObjects
86  * should really be used, though it only seems to work for read handles,
87  * not write handles.
88  *
89  */
90 void
91 awaitEvent(rtsBool wait)
92 {
93     StgTSO *tso, *prev, *next;
94     rtsBool ready;
95     fd_set rfd,wfd;
96     int numFound;
97     int maxfd = -1;
98     rtsBool select_succeeded = rtsTrue;
99     rtsBool unblock_all = rtsFalse;
100     struct timeval tv;
101     lnat min, ticks;
102
103     tv.tv_sec  = 0;
104     tv.tv_usec = 0;
105     
106     IF_DEBUG(scheduler,
107              debugBelch("scheduler: checking for threads blocked on I/O");
108              if (wait) {
109                  debugBelch(" (waiting)");
110              }
111              debugBelch("\n");
112              );
113
114     /* loop until we've woken up some threads.  This loop is needed
115      * because the select timing isn't accurate, we sometimes sleep
116      * for a while but not long enough to wake up a thread in
117      * a threadDelay.
118      */
119     do {
120
121       ticks = timestamp = getourtimeofday();
122       if (wakeUpSleepingThreads(ticks)) { 
123           return;
124       }
125
126       if (!wait) {
127           min = 0;
128       } else if (sleeping_queue != END_TSO_QUEUE) {
129           min = (sleeping_queue->block_info.target - ticks) 
130               * RtsFlags.MiscFlags.tickInterval * 1000;
131       } else {
132           min = 0x7ffffff;
133       }
134
135       /* 
136        * Collect all of the fd's that we're interested in
137        */
138       FD_ZERO(&rfd);
139       FD_ZERO(&wfd);
140
141       for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
142         next = tso->link;
143
144         switch (tso->why_blocked) {
145         case BlockedOnRead:
146           { 
147             int fd = tso->block_info.fd;
148             if (fd >= FD_SETSIZE) {
149                 barf("awaitEvent: descriptor out of range");
150             }
151             maxfd = (fd > maxfd) ? fd : maxfd;
152             FD_SET(fd, &rfd);
153             continue;
154           }
155
156         case BlockedOnWrite:
157           { 
158             int fd = tso->block_info.fd;
159             if (fd >= FD_SETSIZE) {
160                 barf("awaitEvent: descriptor out of range");
161             }
162             maxfd = (fd > maxfd) ? fd : maxfd;
163             FD_SET(fd, &wfd);
164             continue;
165           }
166
167         default:
168           barf("AwaitEvent");
169         }
170       }
171
172       /* Check for any interesting events */
173       
174       tv.tv_sec  = min / 1000000;
175       tv.tv_usec = min % 1000000;
176
177       while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv)) < 0) {
178           if (errno != EINTR) {
179             /* Handle bad file descriptors by unblocking all the
180                waiting threads. Why? Because a thread might have been
181                a bit naughty and closed a file descriptor while another
182                was blocked waiting. This is less-than-good programming
183                practice, but having the RTS as a result fall over isn't
184                acceptable, so we simply unblock all the waiting threads
185                should we see a bad file descriptor & give the threads
186                a chance to clean up their act. 
187                
188                Note: assume here that threads becoming unblocked
189                will try to read/write the file descriptor before trying
190                to issue a threadWaitRead/threadWaitWrite again (==> an
191                IOError will result for the thread that's got the bad
192                file descriptor.) Hence, there's no danger of a bad
193                file descriptor being repeatedly select()'ed on, so
194                the RTS won't loop.
195             */
196             if ( errno == EBADF ) {
197               unblock_all = rtsTrue;
198               break;
199             } else {
200               perror("select");
201               barf("select failed");
202             }
203           }
204
205           /* We got a signal; could be one of ours.  If so, we need
206            * to start up the signal handler straight away, otherwise
207            * we could block for a long time before the signal is
208            * serviced.
209            */
210 #if defined(RTS_USER_SIGNALS)
211           if (signals_pending()) {
212               startSignalHandlers(&MainCapability);
213               return; /* still hold the lock */
214           }
215 #endif
216
217           /* we were interrupted, return to the scheduler immediately.
218            */
219           if (sched_state >= SCHED_INTERRUPTING) {
220               return; /* still hold the lock */
221           }
222           
223           /* check for threads that need waking up 
224            */
225           wakeUpSleepingThreads(getourtimeofday());
226           
227           /* If new runnable threads have arrived, stop waiting for
228            * I/O and run them.
229            */
230           if (!emptyRunQueue(&MainCapability)) {
231               return; /* still hold the lock */
232           }
233       }
234
235       /* Step through the waiting queue, unblocking every thread that now has
236        * a file descriptor in a ready state.
237        */
238
239       prev = NULL;
240       if (select_succeeded || unblock_all) {
241           for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
242               next = tso->link;
243               switch (tso->why_blocked) {
244               case BlockedOnRead:
245                   ready = unblock_all || FD_ISSET(tso->block_info.fd, &rfd);
246                   break;
247               case BlockedOnWrite:
248                   ready = unblock_all || FD_ISSET(tso->block_info.fd, &wfd);
249                   break;
250               default:
251                   barf("awaitEvent");
252               }
253       
254               if (ready) {
255                 IF_DEBUG(scheduler,debugBelch("Waking up blocked thread %lu\n", (unsigned long)tso->id));
256                   tso->why_blocked = NotBlocked;
257                   tso->link = END_TSO_QUEUE;
258                   pushOnRunQueue(&MainCapability,tso);
259               } else {
260                   if (prev == NULL)
261                       blocked_queue_hd = tso;
262                   else
263                       prev->link = tso;
264                   prev = tso;
265               }
266           }
267
268           if (prev == NULL)
269               blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
270           else {
271               prev->link = END_TSO_QUEUE;
272               blocked_queue_tl = prev;
273           }
274       }
275       
276     } while (wait && sched_state == SCHED_RUNNING
277              && emptyRunQueue(&MainCapability));
278 }
279
280 #endif /* THREADED_RTS */