26870641bc02539b97e1f39d6107de2b16efce0a
[ghc-hetmet.git] / ghc / rts / Select.c
1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 1995-2002
4  *
5  * Support for concurrent non-blocking I/O and thread waiting.
6  *
7  * ---------------------------------------------------------------------------*/
8
9
10 /* we're outside the realms of POSIX here... */
11 /* #include "PosixSource.h" */
12
13 #include "Rts.h"
14 #include "Schedule.h"
15 #include "RtsUtils.h"
16 #include "RtsFlags.h"
17 #include "Timer.h"
18 #include "Itimer.h"
19 #include "Signals.h"
20 #include "Capability.h"
21
22 # ifdef HAVE_SYS_TYPES_H
23 #  include <sys/types.h>
24 # endif
25
26 # ifdef HAVE_SYS_TIME_H
27 #  include <sys/time.h>
28 # endif
29
30 #include <errno.h>
31 #include <string.h>
32
33 #ifdef HAVE_UNISTD_H
34 #include <unistd.h>
35 #endif
36
37 /* last timestamp */
38 nat timestamp = 0;
39
40 #ifdef RTS_SUPPORTS_THREADS
41 static rtsBool isWorkerBlockedInAwaitEvent = rtsFalse;
42 static rtsBool workerWakeupPending = rtsFalse;
43 static int workerWakeupPipe[2];
44 static rtsBool workerWakeupInited = rtsFalse;
45 #endif
46
47 /* There's a clever trick here to avoid problems when the time wraps
48  * around.  Since our maximum delay is smaller than 31 bits of ticks
49  * (it's actually 31 bits of microseconds), we can safely check
50  * whether a timer has expired even if our timer will wrap around
51  * before the target is reached, using the following formula:
52  *
53  *        (int)((uint)current_time - (uint)target_time) < 0
54  *
55  * if this is true, then our time has expired.
56  * (idea due to Andy Gill).
57  */
58 rtsBool
59 wakeUpSleepingThreads(nat ticks)
60 {
61     StgTSO *tso;
62     rtsBool flag = rtsFalse;
63
64     while (sleeping_queue != END_TSO_QUEUE &&
65            (int)(ticks - sleeping_queue->block_info.target) > 0) {
66         tso = sleeping_queue;
67         sleeping_queue = tso->link;
68         tso->why_blocked = NotBlocked;
69         tso->link = END_TSO_QUEUE;
70         IF_DEBUG(scheduler,debugBelch("Waking up sleeping thread %d\n", tso->id));
71         PUSH_ON_RUN_QUEUE(tso);
72         flag = rtsTrue;
73     }
74     return flag;
75 }
76
77 /* Argument 'wait' says whether to wait for I/O to become available,
78  * or whether to just check and return immediately.  If there are
79  * other threads ready to run, we normally do the non-waiting variety,
80  * otherwise we wait (see Schedule.c).
81  *
82  * SMP note: must be called with sched_mutex locked.
83  *
84  * Windows: select only works on sockets, so this doesn't really work,
85  * though it makes things better than before. MsgWaitForMultipleObjects
86  * should really be used, though it only seems to work for read handles,
87  * not write handles.
88  *
89  */
90 void
91 awaitEvent(rtsBool wait)
92 {
93     StgTSO *tso, *prev, *next;
94     rtsBool ready;
95     fd_set rfd,wfd;
96     int numFound;
97     int maxfd = -1;
98     rtsBool select_succeeded = rtsTrue;
99     rtsBool unblock_all = rtsFalse;
100     struct timeval tv;
101     lnat min, ticks;
102
103     tv.tv_sec  = 0;
104     tv.tv_usec = 0;
105     
106     IF_DEBUG(scheduler,
107              debugBelch("scheduler: checking for threads blocked on I/O");
108              if (wait) {
109                  debugBelch(" (waiting)");
110              }
111              debugBelch("\n");
112              );
113
114     /* loop until we've woken up some threads.  This loop is needed
115      * because the select timing isn't accurate, we sometimes sleep
116      * for a while but not long enough to wake up a thread in
117      * a threadDelay.
118      */
119     do {
120
121       ticks = timestamp = getourtimeofday();
122       if (wakeUpSleepingThreads(ticks)) { 
123           return;
124       }
125
126       if (!wait) {
127           min = 0;
128       } else if (sleeping_queue != END_TSO_QUEUE) {
129           min = (sleeping_queue->block_info.target - ticks) 
130               * TICK_MILLISECS * 1000;
131       } else {
132           min = 0x7ffffff;
133       }
134
135       /* 
136        * Collect all of the fd's that we're interested in
137        */
138       FD_ZERO(&rfd);
139       FD_ZERO(&wfd);
140
141       for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
142         next = tso->link;
143
144         switch (tso->why_blocked) {
145         case BlockedOnRead:
146           { 
147             int fd = tso->block_info.fd;
148             maxfd = (fd > maxfd) ? fd : maxfd;
149             FD_SET(fd, &rfd);
150             continue;
151           }
152
153         case BlockedOnWrite:
154           { 
155             int fd = tso->block_info.fd;
156             maxfd = (fd > maxfd) ? fd : maxfd;
157             FD_SET(fd, &wfd);
158             continue;
159           }
160
161         default:
162           barf("AwaitEvent");
163         }
164       }
165
166 #ifdef RTS_SUPPORTS_THREADS
167       if(!workerWakeupInited) {
168           pipe(workerWakeupPipe);
169           workerWakeupInited = rtsTrue;
170       }
171       FD_SET(workerWakeupPipe[0], &rfd);
172       maxfd = workerWakeupPipe[0] > maxfd ? workerWakeupPipe[0] : maxfd;
173 #endif
174       
175       /* Release the scheduler lock while we do the poll.
176        * this means that someone might muck with the blocked_queue
177        * while we do this, but it shouldn't matter:
178        *
179        *   - another task might poll for I/O and remove one
180        *     or more threads from the blocked_queue.
181        *   - more I/O threads may be added to blocked_queue.
182        *   - more delayed threads may be added to blocked_queue. We'll
183        *     just subtract delta from their delays after the poll.
184        *
185        * I believe none of these cases lead to trouble --SDM.
186        */
187       
188 #ifdef RTS_SUPPORTS_THREADS
189       isWorkerBlockedInAwaitEvent = rtsTrue;
190       workerWakeupPending = rtsFalse;
191 #endif
192       RELEASE_LOCK(&sched_mutex);
193
194       /* Check for any interesting events */
195       
196       tv.tv_sec  = min / 1000000;
197       tv.tv_usec = min % 1000000;
198
199       while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv)) < 0) {
200           if (errno != EINTR) {
201             /* Handle bad file descriptors by unblocking all the
202                waiting threads. Why? Because a thread might have been
203                a bit naughty and closed a file descriptor while another
204                was blocked waiting. This is less-than-good programming
205                practice, but having the RTS as a result fall over isn't
206                acceptable, so we simply unblock all the waiting threads
207                should we see a bad file descriptor & give the threads
208                a chance to clean up their act. 
209                
210                Note: assume here that threads becoming unblocked
211                will try to read/write the file descriptor before trying
212                to issue a threadWaitRead/threadWaitWrite again (==> an
213                IOError will result for the thread that's got the bad
214                file descriptor.) Hence, there's no danger of a bad
215                file descriptor being repeatedly select()'ed on, so
216                the RTS won't loop.
217             */
218             if ( errno == EBADF ) {
219               unblock_all = rtsTrue;
220               break;
221             } else {
222               perror("select");
223               barf("select failed");
224             }
225           }
226           ACQUIRE_LOCK(&sched_mutex);
227 #ifdef RTS_SUPPORTS_THREADS
228           isWorkerBlockedInAwaitEvent = rtsFalse;
229 #endif
230
231           /* We got a signal; could be one of ours.  If so, we need
232            * to start up the signal handler straight away, otherwise
233            * we could block for a long time before the signal is
234            * serviced.
235            */
236 #if defined(RTS_USER_SIGNALS)
237           if (signals_pending()) {
238               RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
239               startSignalHandlers();
240               ACQUIRE_LOCK(&sched_mutex);
241               return; /* still hold the lock */
242           }
243 #endif
244
245           /* we were interrupted, return to the scheduler immediately.
246            */
247           if (interrupted) {
248               return; /* still hold the lock */
249           }
250           
251           /* check for threads that need waking up 
252            */
253           wakeUpSleepingThreads(getourtimeofday());
254           
255           /* If new runnable threads have arrived, stop waiting for
256            * I/O and run them.
257            */
258           if (run_queue_hd != END_TSO_QUEUE) {
259               return; /* still hold the lock */
260           }
261           
262 #ifdef RTS_SUPPORTS_THREADS
263           /* If another worker thread wants to take over,
264            * return to the scheduler
265            */
266           if (needToYieldToReturningWorker()) {
267               return; /* still hold the lock */
268           }
269 #endif
270           
271 #ifdef RTS_SUPPORTS_THREADS
272           isWorkerBlockedInAwaitEvent = rtsTrue;
273 #endif
274           RELEASE_LOCK(&sched_mutex);
275       }
276
277       ACQUIRE_LOCK(&sched_mutex);
278
279       /* Step through the waiting queue, unblocking every thread that now has
280        * a file descriptor in a ready state.
281        */
282
283       prev = NULL;
284       if (select_succeeded || unblock_all) {
285           for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
286               next = tso->link;
287               switch (tso->why_blocked) {
288               case BlockedOnRead:
289                   ready = unblock_all || FD_ISSET(tso->block_info.fd, &rfd);
290                   break;
291               case BlockedOnWrite:
292                   ready = unblock_all || FD_ISSET(tso->block_info.fd, &wfd);
293                   break;
294               default:
295                   barf("awaitEvent");
296               }
297       
298               if (ready) {
299                   IF_DEBUG(scheduler,debugBelch("Waking up blocked thread %d\n", tso->id));
300                   tso->why_blocked = NotBlocked;
301                   tso->link = END_TSO_QUEUE;
302                   PUSH_ON_RUN_QUEUE(tso);
303               } else {
304                   if (prev == NULL)
305                       blocked_queue_hd = tso;
306                   else
307                       prev->link = tso;
308                   prev = tso;
309               }
310           }
311
312           if (prev == NULL)
313               blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
314           else {
315               prev->link = END_TSO_QUEUE;
316               blocked_queue_tl = prev;
317           }
318       }
319       
320 #if defined(RTS_SUPPORTS_THREADS)
321         // if we were woken up by wakeBlockedWorkerThread,
322         // read the dummy byte from the pipe
323       if(select_succeeded && FD_ISSET(workerWakeupPipe[0], &rfd)) {
324           unsigned char dummy;
325           wait = rtsFalse;
326           read(workerWakeupPipe[0],&dummy,1);
327       }
328 #endif
329     } while (wait && !interrupted && run_queue_hd == END_TSO_QUEUE);
330 }
331
332
333 #ifdef RTS_SUPPORTS_THREADS
334 /* wakeBlockedWorkerThread
335  *
336  * If a worker thread is currently blocked within awaitEvent,
337  * wake it.
338  * Must be called with sched_mutex held.
339  */
340 void
341 wakeBlockedWorkerThread()
342 {
343     if(isWorkerBlockedInAwaitEvent && !workerWakeupPending) {
344         unsigned char dummy = 42;       // Any value will do here
345         
346                         // write something so that select() wakes up
347         write(workerWakeupPipe[1],&dummy,1);
348         workerWakeupPending = rtsTrue;
349     }
350 }
351
352 /* resetWorkerWakeupPipeAfterFork
353  *
354  * To be called right after a fork().
355  * After the fork(), the worker wakeup pipe will be shared
356  * with the parent process, and that's something we don't want.
357  */
358 void
359 resetWorkerWakeupPipeAfterFork()
360 {
361     if(workerWakeupInited) {
362         close(workerWakeupPipe[0]);
363         close(workerWakeupPipe[1]);
364     }
365     workerWakeupInited = rtsFalse;
366 }
367 #endif