[project @ 1999-10-04 16:14:34 by simonmar]
[ghc-hetmet.git] / ghc / rts / Select.c
1 /* -----------------------------------------------------------------------------
2  * $Id: Select.c,v 1.3 1999/10/04 16:14:34 simonmar Exp $
3  *
4  * (c) The GHC Team 1995-1999
5  *
6  * Support for concurrent non-blocking I/O and thread waiting.
7  *
8  * ---------------------------------------------------------------------------*/
9
10 /* we're outside the realms of POSIX here... */
11 #define NON_POSIX_SOURCE
12
13 #include "Rts.h"
14 #include "Schedule.h"
15 #include "RtsUtils.h"
16 #include "RtsFlags.h"
17 #include "Itimer.h"
18 #include "Signals.h"
19
20 # if defined(HAVE_SYS_TYPES_H)
21 #  include <sys/types.h>
22 # endif
23
24 # ifdef HAVE_SYS_TIME_H
25 #  include <sys/time.h>
26 # endif
27
28 nat ticks_since_select = 0;
29
30 /* Argument 'wait' says whether to wait for I/O to become available,
31  * or whether to just check and return immediately.  If there are
32  * other threads ready to run, we normally do the non-waiting variety,
33  * otherwise we wait (see Schedule.c).
34  */
35 void
36 awaitEvent(rtsBool wait)
37 {
38 #ifdef mingw32_TARGET_OS
39 /*
40  * Win32 doesn't support select(). ToDo: use MsgWaitForMultipleObjects()
41  * to achieve (similar) effect.
42  *
43  */
44     return;
45 #else
46
47     StgTSO *tso, *prev, *next;
48     rtsBool ready;
49     fd_set rfd,wfd;
50     int min, numFound, delta;
51     int maxfd = -1;
52    
53     struct timeval tv,tv_before,tv_after;
54
55     IF_DEBUG(scheduler,belch("Checking for threads blocked on I/O...\n"));
56
57     /* see how long it's been since we last checked the blocked queue.
58      * ToDo: make this check atomic, so we don't lose any ticks.
59      */
60     delta = ticks_since_select;
61     ticks_since_select = 0;
62     delta = delta * TICK_MILLISECS * 1000;
63
64     min = wait == rtsTrue ? 0x7fffffff : 0;
65
66     /* 
67      * Collect all of the fd's that we're interested in, and capture
68      * the minimum waiting time (in microseconds) for the delayed threads.
69      */
70     FD_ZERO(&rfd);
71     FD_ZERO(&wfd);
72
73     for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
74       next = tso->link;
75
76       switch (tso->why_blocked) {
77       case BlockedOnRead:
78         { 
79           int fd = tso->block_info.fd;
80           maxfd = (fd > maxfd) ? fd : maxfd;
81           FD_SET(fd, &rfd);
82           continue;
83         }
84
85       case BlockedOnWrite:
86         { 
87           int fd = tso->block_info.fd;
88           maxfd = (fd > maxfd) ? fd : maxfd;
89           FD_SET(fd, &wfd);
90           continue;
91         }
92
93       case BlockedOnDelay:
94         {
95           if ((int)tso->block_info.delay < min)
96             min = tso->block_info.delay;
97           continue;
98         }
99
100       default:
101         barf("AwaitEvent");
102       }
103     }
104
105     /* Check for any interesting events */
106
107     tv.tv_sec = min / 1000000;
108     tv.tv_usec = min % 1000000;
109
110     gettimeofday(&tv_before, (struct timezone *) NULL);
111
112     while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv)) < 0) {
113       if (errno != EINTR) {
114         /* fflush(stdout); */
115         fprintf(stderr, "awaitEvent: select failed\n");
116         stg_exit(EXIT_FAILURE);
117       }
118       /* We got a signal; could be one of ours.  If so, we need
119        * to start up the signal handler straight away, otherwise
120        * we could block for a long time before the signal is
121        * serviced.
122        */
123       if (signals_pending()) {
124         start_signal_handlers();
125         return;
126       }
127     }   
128
129     if (numFound != 0) { 
130       /* 
131         File descriptors ready, but we don't know how much time was spent
132         in the select(). To interpolate, we compare the time before
133         and after the select(). 
134       */
135
136       gettimeofday(&tv_after, (struct timezone *) NULL);
137       delta += (tv_after.tv_sec - tv_before.tv_sec) * 1000000 +
138                 tv_after.tv_usec - tv_before.tv_usec;
139
140     } else {
141       delta += min;
142     }
143
144     /*
145       Step through the waiting queue, unblocking every thread that now has
146       a file descriptor in a ready state.
147
148       For the delayed threads, decrement the number of microsecs
149       we've been blocked for. Unblock the threads that have thusly expired.
150      */
151
152     prev = NULL;
153     for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
154       next = tso->link;
155       switch (tso->why_blocked) {
156       case BlockedOnRead:
157         ready = FD_ISSET(tso->block_info.fd, &rfd);
158         break;
159         
160       case BlockedOnWrite:
161         ready = FD_ISSET(tso->block_info.fd, &wfd);
162         break;
163         
164       case BlockedOnDelay:
165         tso->block_info.delay -= delta;
166         ready = (tso->block_info.delay <= 0);
167         break;
168         
169       default:
170         barf("awaitEvent");
171       }
172       
173       if (ready) {
174         IF_DEBUG(scheduler,belch("Waking up thread %d\n", tso->id));
175         tso->why_blocked = NotBlocked;
176         tso->link = END_TSO_QUEUE;
177         PUSH_ON_RUN_QUEUE(tso);
178       } else {
179         if (prev == NULL)
180           blocked_queue_hd = tso;
181         else
182           prev->link = tso;
183         prev = tso;
184       }
185     }
186
187     if (prev == NULL)
188       blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
189     else {
190       prev->link = END_TSO_QUEUE;
191       blocked_queue_tl = prev;
192     }
193 #endif
194 }