[project @ 2000-01-12 15:15:17 by simonmar]
[ghc-hetmet.git] / ghc / rts / Select.c
1 /* -----------------------------------------------------------------------------
2  * $Id: Select.c,v 1.6 2000/01/12 15:15:18 simonmar Exp $
3  *
4  * (c) The GHC Team 1995-1999
5  *
6  * Support for concurrent non-blocking I/O and thread waiting.
7  *
8  * ---------------------------------------------------------------------------*/
9
10 /* we're outside the realms of POSIX here... */
11 #define NON_POSIX_SOURCE
12
13 #include "Rts.h"
14 #include "Schedule.h"
15 #include "RtsUtils.h"
16 #include "RtsFlags.h"
17 #include "Itimer.h"
18 #include "Signals.h"
19
20 # if defined(HAVE_SYS_TYPES_H)
21 #  include <sys/types.h>
22 # endif
23
24 # ifdef HAVE_SYS_TIME_H
25 #  include <sys/time.h>
26 # endif
27
28 nat ticks_since_select = 0;
29
30 /* Argument 'wait' says whether to wait for I/O to become available,
31  * or whether to just check and return immediately.  If there are
32  * other threads ready to run, we normally do the non-waiting variety,
33  * otherwise we wait (see Schedule.c).
34  *
35  * SMP note: must be called with sched_mutex locked.
36  */
37 void
38 awaitEvent(rtsBool wait)
39 {
40 #ifdef mingw32_TARGET_OS
41 /*
42  * Win32 doesn't support select(). ToDo: use MsgWaitForMultipleObjects()
43  * to achieve (similar) effect.
44  *
45  */
46     return;
47 #else
48
49     StgTSO *tso, *prev, *next;
50     rtsBool ready;
51     fd_set rfd,wfd;
52     int numFound;
53     nat min, delta;
54     int maxfd = -1;
55    
56     struct timeval tv;
57 #ifndef linux_TARGET_OS
58     struct timeval tv_before,tv_after;
59 #endif
60
61     IF_DEBUG(scheduler,belch("Checking for threads blocked on I/O...\n"));
62
63     /* see how long it's been since we last checked the blocked queue.
64      * ToDo: make this check atomic, so we don't lose any ticks.
65      */
66     delta = ticks_since_select;
67     ticks_since_select = 0;
68     delta = delta * TICK_MILLISECS * 1000;
69
70     min = wait == rtsTrue ? 0x7fffffff : 0;
71
72     /* 
73      * Collect all of the fd's that we're interested in, and capture
74      * the minimum waiting time (in microseconds) for the delayed threads.
75      */
76     FD_ZERO(&rfd);
77     FD_ZERO(&wfd);
78
79     for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
80       next = tso->link;
81
82       switch (tso->why_blocked) {
83       case BlockedOnRead:
84         { 
85           int fd = tso->block_info.fd;
86           maxfd = (fd > maxfd) ? fd : maxfd;
87           FD_SET(fd, &rfd);
88           continue;
89         }
90
91       case BlockedOnWrite:
92         { 
93           int fd = tso->block_info.fd;
94           maxfd = (fd > maxfd) ? fd : maxfd;
95           FD_SET(fd, &wfd);
96           continue;
97         }
98
99       case BlockedOnDelay:
100         {
101           if (tso->block_info.delay < min)
102             min = tso->block_info.delay;
103           continue;
104         }
105
106       default:
107         barf("AwaitEvent");
108       }
109     }
110
111     /* Release the scheduler lock while we do the poll.
112      * this means that someone might muck with the blocked_queue
113      * while we do this, but it shouldn't matter:
114      *
115      *   - another task might poll for I/O and remove one
116      *     or more threads from the blocked_queue.
117      *   - more I/O threads may be added to blocked_queue.
118      *   - more delayed threads may be added to blocked_queue. We'll
119      *     just subtract delta from their delays after the poll.
120      *
121      * I believe none of these cases lead to trouble --SDM.
122      */
123     RELEASE_LOCK(&sched_mutex);
124
125     /* Check for any interesting events */
126
127     tv.tv_sec = min / 1000000;
128     tv.tv_usec = min % 1000000;
129
130 #ifndef linux_TARGET_OS
131     gettimeofday(&tv_before, (struct timezone *) NULL);
132 #endif
133
134     while (!interrupted &&
135            (numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv)) < 0) {
136       if (errno != EINTR) {
137         /* fflush(stdout); */
138         perror("select");
139         fprintf(stderr, "awaitEvent: select failed\n");
140         stg_exit(EXIT_FAILURE);
141       }
142       ACQUIRE_LOCK(&sched_mutex);
143
144       /* We got a signal; could be one of ours.  If so, we need
145        * to start up the signal handler straight away, otherwise
146        * we could block for a long time before the signal is
147        * serviced.
148        */
149       if (signals_pending()) {
150         start_signal_handlers();
151         RELEASE_LOCK(&sched_mutex);
152         break;
153       }
154
155       /* If new runnable threads have arrived, stop waiting for
156        * I/O and run them.
157        */
158       if (run_queue_hd != END_TSO_QUEUE) {
159         RELEASE_LOCK(&sched_mutex);
160         break;
161       }
162
163       RELEASE_LOCK(&sched_mutex);
164     }   
165
166 #ifdef linux_TARGET_OS
167     /* on Linux, tv is set to indicate the amount of time not
168      * slept, so we don't need to gettimeofday() to find out.
169      */
170     delta += min - (tv.tv_sec * 1000000 + tv.tv_usec);
171 #else
172     gettimeofday(&tv_after, (struct timezone *) NULL);
173     delta += (tv_after.tv_sec - tv_before.tv_sec) * 1000000 +
174       tv_after.tv_usec - tv_before.tv_usec;
175 #endif
176
177 #if 0
178     if (delta != 0) { fprintf(stderr,"waited: %d %d %d\n", min, delta,
179                               interrupted); }
180 #endif
181
182     ACQUIRE_LOCK(&sched_mutex);
183
184     /*
185       Step through the waiting queue, unblocking every thread that now has
186       a file descriptor in a ready state.
187
188       For the delayed threads, decrement the number of microsecs
189       we've been blocked for. Unblock the threads that have thusly expired.
190      */
191
192     prev = NULL;
193     for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
194       next = tso->link;
195       switch (tso->why_blocked) {
196       case BlockedOnRead:
197         ready = FD_ISSET(tso->block_info.fd, &rfd);
198         break;
199         
200       case BlockedOnWrite:
201         ready = FD_ISSET(tso->block_info.fd, &wfd);
202         break;
203         
204       case BlockedOnDelay:
205         if (tso->block_info.delay > delta) {
206           tso->block_info.delay -= delta;
207           ready = 0;
208         } else {
209           tso->block_info.delay = 0;
210           ready = 1;
211         }
212         break;
213         
214       default:
215         barf("awaitEvent");
216       }
217       
218       if (ready) {
219         IF_DEBUG(scheduler,belch("Waking up thread %d\n", tso->id));
220         tso->why_blocked = NotBlocked;
221         tso->link = END_TSO_QUEUE;
222         PUSH_ON_RUN_QUEUE(tso);
223       } else {
224         if (prev == NULL)
225           blocked_queue_hd = tso;
226         else
227           prev->link = tso;
228         prev = tso;
229       }
230     }
231
232     if (prev == NULL)
233       blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
234     else {
235       prev->link = END_TSO_QUEUE;
236       blocked_queue_tl = prev;
237     }
238 #endif
239 }