[project @ 1999-09-13 08:28:45 by sof]
[ghc-hetmet.git] / ghc / rts / Select.c
1 /* -----------------------------------------------------------------------------
2  * $Id: Select.c,v 1.2 1999/09/13 08:28:45 sof Exp $
3  *
4  * (c) The GHC Team 1995-1999
5  *
6  * Support for concurrent non-blocking I/O and thread waiting.
7  *
8  * ---------------------------------------------------------------------------*/
9
10 /* we're outside the realms of POSIX here... */
11 #define NON_POSIX_SOURCE
12
13 #include "Rts.h"
14 #include "Schedule.h"
15 #include "RtsUtils.h"
16 #include "RtsFlags.h"
17 #include "Itimer.h"
18
19 # if defined(HAVE_SYS_TYPES_H)
20 #  include <sys/types.h>
21 # endif
22
23 # ifdef HAVE_SYS_TIME_H
24 #  include <sys/time.h>
25 # endif
26
27 nat ticks_since_select = 0;
28
29 /* Argument 'wait' says whether to wait for I/O to become available,
30  * or whether to just check and return immediately.  If there are
31  * other threads ready to run, we normally do the non-waiting variety,
32  * otherwise we wait (see Schedule.c).
33  */
34 void
35 awaitEvent(rtsBool wait)
36 {
37 #ifdef mingw32_TARGET_OS
38 /*
39  * Win32 doesn't support select(). ToDo: use MsgWaitForMultipleObjects()
40  * to achieve (similar) effect.
41  *
42  */
43     return;
44 #else
45
46     StgTSO *tso, *prev, *next;
47     rtsBool ready;
48     fd_set rfd,wfd;
49     int min, numFound, delta;
50     int maxfd = -1;
51    
52     struct timeval tv,tv_before,tv_after;
53
54     IF_DEBUG(scheduler,belch("Checking for threads blocked on I/O...\n"));
55
56     /* see how long it's been since we last checked the blocked queue.
57      * ToDo: make this check atomic, so we don't lose any ticks.
58      */
59     delta = ticks_since_select;
60     ticks_since_select = 0;
61     delta = delta * TICK_MILLISECS * 1000;
62
63     min = wait == rtsTrue ? 0x7fffffff : 0;
64
65     /* 
66      * Collect all of the fd's that we're interested in, and capture
67      * the minimum waiting time (in microseconds) for the delayed threads.
68      */
69     FD_ZERO(&rfd);
70     FD_ZERO(&wfd);
71
72     for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
73       next = tso->link;
74
75       switch (tso->why_blocked) {
76       case BlockedOnRead:
77         { 
78           int fd = tso->block_info.fd;
79           maxfd = (fd > maxfd) ? fd : maxfd;
80           FD_SET(fd, &rfd);
81           continue;
82         }
83
84       case BlockedOnWrite:
85         { 
86           int fd = tso->block_info.fd;
87           maxfd = (fd > maxfd) ? fd : maxfd;
88           FD_SET(fd, &wfd);
89           continue;
90         }
91
92       case BlockedOnDelay:
93         {
94           if ((int)tso->block_info.delay < min)
95             min = tso->block_info.delay;
96           continue;
97         }
98
99       default:
100         barf("AwaitEvent");
101       }
102     }
103
104     /* Check for any interesting events */
105
106     tv.tv_sec = min / 1000000;
107     tv.tv_usec = min % 1000000;
108
109     gettimeofday(&tv_before, (struct timezone *) NULL);
110
111     while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv)) < 0) {
112       if (errno != EINTR) {
113         /* fflush(stdout); */
114         fprintf(stderr, "awaitEvent: select failed\n");
115         stg_exit(EXIT_FAILURE);
116       }
117     }   
118
119     if (numFound != 0) { 
120       /* 
121         File descriptors ready, but we don't know how much time was spent
122         in the select(). To interpolate, we compare the time before
123         and after the select(). 
124       */
125
126       gettimeofday(&tv_after, (struct timezone *) NULL);
127       delta += (tv_after.tv_sec - tv_before.tv_sec) * 1000000 +
128                 tv_after.tv_usec - tv_before.tv_usec;
129
130     } else {
131       delta += min;
132     }
133
134     /*
135       Step through the waiting queue, unblocking every thread that now has
136       a file descriptor in a ready state.
137
138       For the delayed threads, decrement the number of microsecs
139       we've been blocked for. Unblock the threads that have thusly expired.
140      */
141
142     prev = NULL;
143     for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
144       next = tso->link;
145       switch (tso->why_blocked) {
146       case BlockedOnRead:
147         ready = FD_ISSET(tso->block_info.fd, &rfd);
148         break;
149         
150       case BlockedOnWrite:
151         ready = FD_ISSET(tso->block_info.fd, &wfd);
152         break;
153         
154       case BlockedOnDelay:
155         tso->block_info.delay -= delta;
156         ready = (tso->block_info.delay <= 0);
157         break;
158         
159       default:
160         barf("awaitEvent");
161       }
162       
163       if (ready) {
164         IF_DEBUG(scheduler,belch("Waking up thread %d\n", tso->id));
165         tso->why_blocked = NotBlocked;
166         tso->link = END_TSO_QUEUE;
167         PUSH_ON_RUN_QUEUE(tso);
168       } else {
169         if (prev == NULL)
170           blocked_queue_hd = tso;
171         else
172           prev->link = tso;
173         prev = tso;
174       }
175     }
176
177     if (prev == NULL)
178       blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
179     else {
180       prev->link = END_TSO_QUEUE;
181       blocked_queue_tl = prev;
182     }
183 #endif
184 }