[project @ 2001-01-17 12:14:30 by simonmar]
[ghc-hetmet.git] / ghc / rts / Select.c
1 /* -----------------------------------------------------------------------------
2  * $Id: Select.c,v 1.14 2000/08/25 13:12:07 simonmar Exp $
3  *
4  * (c) The GHC Team 1995-1999
5  *
6  * Support for concurrent non-blocking I/O and thread waiting.
7  *
8  * ---------------------------------------------------------------------------*/
9
10 /* we're outside the realms of POSIX here... */
11 #define NON_POSIX_SOURCE
12
13 #include "Rts.h"
14 #include "Schedule.h"
15 #include "RtsUtils.h"
16 #include "RtsFlags.h"
17 #include "Itimer.h"
18 #include "Signals.h"
19
20 # if defined(HAVE_SYS_TYPES_H)
21 #  include <sys/types.h>
22 # endif
23
24 # ifdef HAVE_SYS_TIME_H
25 #  include <sys/time.h>
26 # endif
27
28 /* last timestamp */
29 nat timestamp = 0;
30
31 /* keep track of the number of ticks since we last called
32  * gettimeofday(), to avoid having to call it every time we need
33  * a timestamp.
34  */
35 nat ticks_since_timestamp = 0;
36
37 /* There's a clever trick here to avoid problems when the time wraps
38  * around.  Since our maximum delay is smaller than 31 bits of ticks
39  * (it's actually 31 bits of microseconds), we can safely check
40  * whether a timer has expired even if our timer will wrap around
41  * before the target is reached, using the following formula:
42  *
43  *        (int)((uint)current_time - (uint)target_time) < 0
44  *
45  * if this is true, then our time has expired.
46  * (idea due to Andy Gill).
47  */
48 rtsBool
49 wakeUpSleepingThreads(nat ticks)
50 {
51     StgTSO *tso;
52     rtsBool flag = rtsFalse;
53
54     while (sleeping_queue != END_TSO_QUEUE &&
55            (int)(ticks - sleeping_queue->block_info.target) > 0) {
56         tso = sleeping_queue;
57         sleeping_queue = tso->link;
58         tso->why_blocked = NotBlocked;
59         tso->link = END_TSO_QUEUE;
60         IF_DEBUG(scheduler,belch("Waking up sleeping thread %d\n", tso->id));
61         PUSH_ON_RUN_QUEUE(tso);
62         flag = rtsTrue;
63     }
64     return flag;
65 }
66
67 /* Argument 'wait' says whether to wait for I/O to become available,
68  * or whether to just check and return immediately.  If there are
69  * other threads ready to run, we normally do the non-waiting variety,
70  * otherwise we wait (see Schedule.c).
71  *
72  * SMP note: must be called with sched_mutex locked.
73  */
74 void
75 awaitEvent(rtsBool wait)
76 {
77 #ifdef mingw32_TARGET_OS
78 /*
79  * Win32 doesn't support select(). ToDo: use MsgWaitForMultipleObjects()
80  * to achieve (similar) effect.
81  *
82  */
83     return;
84 #else
85
86     StgTSO *tso, *prev, *next;
87     rtsBool ready;
88     fd_set rfd,wfd;
89     int numFound;
90     int maxfd = -1;
91     rtsBool select_succeeded = rtsTrue;
92     struct timeval tv;
93     lnat min, ticks;
94
95     tv.tv_sec  = 0;
96     tv.tv_usec = 0;
97
98     IF_DEBUG(scheduler,
99              belch("scheduler: checking for threads blocked on I/O");
100              if (wait) {
101                  belch(" (waiting)");
102              }
103              belch("\n");
104              );
105
106     /* loop until we've woken up some threads.  This loop is needed
107      * because the select timing isn't accurate, we sometimes sleep
108      * for a while but not long enough to wake up a thread in
109      * a threadDelay.
110      */
111     do {
112
113       ticks = timestamp = getourtimeofday();
114       ticks_since_timestamp = 0;
115       if (wakeUpSleepingThreads(ticks)) { 
116           return;
117       }
118
119       if (!wait) {
120           min = 0;
121       } else if (sleeping_queue != END_TSO_QUEUE) {
122           min = (sleeping_queue->block_info.target - ticks) 
123               * TICK_MILLISECS * 1000;
124       } else {
125           min = 0x7ffffff;
126       }
127
128       /* 
129        * Collect all of the fd's that we're interested in
130        */
131       FD_ZERO(&rfd);
132       FD_ZERO(&wfd);
133
134       for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
135         next = tso->link;
136
137         switch (tso->why_blocked) {
138         case BlockedOnRead:
139           { 
140             int fd = tso->block_info.fd;
141             maxfd = (fd > maxfd) ? fd : maxfd;
142             FD_SET(fd, &rfd);
143             continue;
144           }
145
146         case BlockedOnWrite:
147           { 
148             int fd = tso->block_info.fd;
149             maxfd = (fd > maxfd) ? fd : maxfd;
150             FD_SET(fd, &wfd);
151             continue;
152           }
153
154         default:
155           barf("AwaitEvent");
156         }
157       }
158
159       /* Release the scheduler lock while we do the poll.
160        * this means that someone might muck with the blocked_queue
161        * while we do this, but it shouldn't matter:
162        *
163        *   - another task might poll for I/O and remove one
164        *     or more threads from the blocked_queue.
165        *   - more I/O threads may be added to blocked_queue.
166        *   - more delayed threads may be added to blocked_queue. We'll
167        *     just subtract delta from their delays after the poll.
168        *
169        * I believe none of these cases lead to trouble --SDM.
170        */
171       RELEASE_LOCK(&sched_mutex);
172
173       /* Check for any interesting events */
174       
175       tv.tv_sec  = min / 1000000;
176       tv.tv_usec = min % 1000000;
177
178       while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv)) < 0) {
179
180           if (errno != EINTR) {
181               /* fflush(stdout); */
182               perror("select");
183               barf("select failed");
184           }
185           ACQUIRE_LOCK(&sched_mutex);
186         
187           /* We got a signal; could be one of ours.  If so, we need
188            * to start up the signal handler straight away, otherwise
189            * we could block for a long time before the signal is
190            * serviced.
191            */
192           if (signals_pending()) {
193               RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
194               start_signal_handlers();
195               ACQUIRE_LOCK(&sched_mutex);
196               return; /* still hold the lock */
197           }
198           
199           /* we were interrupted, return to the scheduler immediately.
200            */
201           if (interrupted) {
202               return; /* still hold the lock */
203           }
204           
205           /* check for threads that need waking up 
206            */
207           wakeUpSleepingThreads(getourtimeofday());
208           
209           /* If new runnable threads have arrived, stop waiting for
210            * I/O and run them.
211            */
212           if (run_queue_hd != END_TSO_QUEUE) {
213               return; /* still hold the lock */
214           }
215           
216           RELEASE_LOCK(&sched_mutex);
217       }
218
219       ACQUIRE_LOCK(&sched_mutex);
220
221       /* Step through the waiting queue, unblocking every thread that now has
222        * a file descriptor in a ready state.
223        */
224
225       prev = NULL;
226       if (select_succeeded) {
227           for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
228               next = tso->link;
229               switch (tso->why_blocked) {
230               case BlockedOnRead:
231                   ready = FD_ISSET(tso->block_info.fd, &rfd);
232                   break;
233               case BlockedOnWrite:
234                   ready = FD_ISSET(tso->block_info.fd, &wfd);
235                   break;
236               default:
237                   barf("awaitEvent");
238               }
239       
240               if (ready) {
241                   IF_DEBUG(scheduler,belch("Waking up blocked thread %d\n", tso->id));
242                   tso->why_blocked = NotBlocked;
243                   tso->link = END_TSO_QUEUE;
244                   PUSH_ON_RUN_QUEUE(tso);
245               } else {
246                   if (prev == NULL)
247                       blocked_queue_hd = tso;
248                   else
249                       prev->link = tso;
250                   prev = tso;
251               }
252           }
253
254           if (prev == NULL)
255               blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
256           else {
257               prev->link = END_TSO_QUEUE;
258               blocked_queue_tl = prev;
259           }
260       }
261
262     } while (wait && !interrupted && run_queue_hd == END_TSO_QUEUE);
263 #endif
264 }