[project @ 1999-11-24 16:39:33 by simonmar]
[ghc-hetmet.git] / ghc / rts / Select.c
1 /* -----------------------------------------------------------------------------
2  * $Id: Select.c,v 1.5 1999/11/24 16:39:33 simonmar Exp $
3  *
4  * (c) The GHC Team 1995-1999
5  *
6  * Support for concurrent non-blocking I/O and thread waiting.
7  *
8  * ---------------------------------------------------------------------------*/
9
10 /* we're outside the realms of POSIX here... */
11 #define NON_POSIX_SOURCE
12
13 #include "Rts.h"
14 #include "Schedule.h"
15 #include "RtsUtils.h"
16 #include "RtsFlags.h"
17 #include "Itimer.h"
18 #include "Signals.h"
19
20 # if defined(HAVE_SYS_TYPES_H)
21 #  include <sys/types.h>
22 # endif
23
24 # ifdef HAVE_SYS_TIME_H
25 #  include <sys/time.h>
26 # endif
27
28 nat ticks_since_select = 0;
29
30 /* Argument 'wait' says whether to wait for I/O to become available,
31  * or whether to just check and return immediately.  If there are
32  * other threads ready to run, we normally do the non-waiting variety,
33  * otherwise we wait (see Schedule.c).
34  *
35  * SMP note: must be called with sched_mutex locked.
36  */
37 void
38 awaitEvent(rtsBool wait)
39 {
40 #ifdef mingw32_TARGET_OS
41 /*
42  * Win32 doesn't support select(). ToDo: use MsgWaitForMultipleObjects()
43  * to achieve (similar) effect.
44  *
45  */
46     return;
47 #else
48
49     StgTSO *tso, *prev, *next;
50     rtsBool ready;
51     fd_set rfd,wfd;
52     int numFound;
53     nat min, delta;
54     int maxfd = -1;
55    
56     struct timeval tv;
57 #ifndef linux_TARGET_OS
58     struct timeval tv_before,tv_after;
59 #endif
60
61     IF_DEBUG(scheduler,belch("Checking for threads blocked on I/O...\n"));
62
63     /* see how long it's been since we last checked the blocked queue.
64      * ToDo: make this check atomic, so we don't lose any ticks.
65      */
66     delta = ticks_since_select;
67     ticks_since_select = 0;
68     delta = delta * TICK_MILLISECS * 1000;
69
70     min = wait == rtsTrue ? 0x7fffffff : 0;
71
72     /* 
73      * Collect all of the fd's that we're interested in, and capture
74      * the minimum waiting time (in microseconds) for the delayed threads.
75      */
76     FD_ZERO(&rfd);
77     FD_ZERO(&wfd);
78
79     for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
80       next = tso->link;
81
82       switch (tso->why_blocked) {
83       case BlockedOnRead:
84         { 
85           int fd = tso->block_info.fd;
86           maxfd = (fd > maxfd) ? fd : maxfd;
87           FD_SET(fd, &rfd);
88           continue;
89         }
90
91       case BlockedOnWrite:
92         { 
93           int fd = tso->block_info.fd;
94           maxfd = (fd > maxfd) ? fd : maxfd;
95           FD_SET(fd, &wfd);
96           continue;
97         }
98
99       case BlockedOnDelay:
100         {
101           if ((int)tso->block_info.delay < min)
102             min = tso->block_info.delay;
103           continue;
104         }
105
106       default:
107         barf("AwaitEvent");
108       }
109     }
110
111     /* Release the scheduler lock while we do the poll.
112      * this means that someone might muck with the blocked_queue
113      * while we do this, but it shouldn't matter:
114      *
115      *   - another task might poll for I/O and remove one
116      *     or more threads from the blocked_queue.
117      *   - more I/O threads may be added to blocked_queue.
118      *   - more delayed threads may be added to blocked_queue. We'll
119      *     just subtract delta from their delays after the poll.
120      *
121      * I believe none of these cases lead to trouble --SDM.
122      */
123     RELEASE_LOCK(&sched_mutex);
124
125     /* Check for any interesting events */
126
127     tv.tv_sec = min / 1000000;
128     tv.tv_usec = min % 1000000;
129
130 #ifndef linux_TARGET_OS
131     gettimeofday(&tv_before, (struct timezone *) NULL);
132 #endif
133
134     while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv)) < 0) {
135       if (errno != EINTR) {
136         /* fflush(stdout); */
137         fprintf(stderr, "awaitEvent: select failed\n");
138         stg_exit(EXIT_FAILURE);
139       }
140       ACQUIRE_LOCK(&sched_mutex);
141       /* We got a signal; could be one of ours.  If so, we need
142        * to start up the signal handler straight away, otherwise
143        * we could block for a long time before the signal is
144        * serviced.
145        */
146       if (signals_pending()) {
147         start_signal_handlers();
148         return;
149       }
150
151       /* If new runnable threads have arrived, stop waiting for
152        * I/O and run them.
153        */
154       if (run_queue_hd != END_TSO_QUEUE) {
155         return;
156       }
157       RELEASE_LOCK(&sched_mutex);
158     }   
159
160     if (numFound != 0) { 
161       /* 
162         File descriptors ready, but we don't know how much time was spent
163         in the select(). To interpolate, we compare the time before
164         and after the select(). 
165       */
166
167 #ifdef linux_TARGET_OS
168       /* on Linux, tv is set to indicate the amount of time not
169        * slept, so we don't need to gettimeofday() to find out.
170        */
171       delta += min - (tv.tv_sec * 1000000 + tv.tv_usec);
172 #else
173       gettimeofday(&tv_after, (struct timezone *) NULL);
174       delta += (tv_after.tv_sec - tv_before.tv_sec) * 1000000 +
175                 tv_after.tv_usec - tv_before.tv_usec;
176 #endif
177     } else {
178       delta += min;
179     }
180
181     ACQUIRE_LOCK(&sched_mutex);
182
183     /*
184       Step through the waiting queue, unblocking every thread that now has
185       a file descriptor in a ready state.
186
187       For the delayed threads, decrement the number of microsecs
188       we've been blocked for. Unblock the threads that have thusly expired.
189      */
190
191     prev = NULL;
192     for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
193       next = tso->link;
194       switch (tso->why_blocked) {
195       case BlockedOnRead:
196         ready = FD_ISSET(tso->block_info.fd, &rfd);
197         break;
198         
199       case BlockedOnWrite:
200         ready = FD_ISSET(tso->block_info.fd, &wfd);
201         break;
202         
203       case BlockedOnDelay:
204         if (tso->block_info.delay > delta) {
205           tso->block_info.delay -= delta;
206           ready = 0;
207         } else {
208           tso->block_info.delay = 0;
209           ready = 1;
210         }
211         break;
212         
213       default:
214         barf("awaitEvent");
215       }
216       
217       if (ready) {
218         IF_DEBUG(scheduler,belch("Waking up thread %d\n", tso->id));
219         tso->why_blocked = NotBlocked;
220         tso->link = END_TSO_QUEUE;
221         PUSH_ON_RUN_QUEUE(tso);
222       } else {
223         if (prev == NULL)
224           blocked_queue_hd = tso;
225         else
226           prev->link = tso;
227         prev = tso;
228       }
229     }
230
231     if (prev == NULL)
232       blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
233     else {
234       prev->link = END_TSO_QUEUE;
235       blocked_queue_tl = prev;
236     }
237 #endif
238 }