[project @ 1999-08-25 16:37:42 by simonmar]
[ghc-hetmet.git] / ghc / rts / Select.c
1 /* -----------------------------------------------------------------------------
2  * $Id: Select.c,v 1.1 1999/08/25 16:37:42 simonmar Exp $
3  *
4  * (c) The GHC Team 1995-1999
5  *
6  * Support for concurrent non-blocking I/O and thread waiting.
7  *
8  * ---------------------------------------------------------------------------*/
9
10 /* we're outside the realms of POSIX here... */
11 #define NON_POSIX_SOURCE
12
13 #include "Rts.h"
14 #include "Schedule.h"
15 #include "RtsUtils.h"
16 #include "RtsFlags.h"
17 #include "Itimer.h"
18
19 # if defined(HAVE_SYS_TYPES_H)
20 #  include <sys/types.h>
21 # endif
22
23 # ifdef HAVE_SYS_TIME_H
24 #  include <sys/time.h>
25 # endif
26
27 nat ticks_since_select = 0;
28
29 /* Argument 'wait' says whether to wait for I/O to become available,
30  * or whether to just check and return immediately.  If there are
31  * other threads ready to run, we normally do the non-waiting variety,
32  * otherwise we wait (see Schedule.c).
33  */
34 void
35 awaitEvent(rtsBool wait)
36 {
37     StgTSO *tso, *prev, *next;
38     rtsBool ready;
39     fd_set rfd,wfd;
40     int min, numFound, delta;
41     int maxfd = -1;
42    
43     struct timeval tv,tv_before,tv_after;
44
45     IF_DEBUG(scheduler,belch("Checking for threads blocked on I/O...\n"));
46
47     /* see how long it's been since we last checked the blocked queue.
48      * ToDo: make this check atomic, so we don't lose any ticks.
49      */
50     delta = ticks_since_select;
51     ticks_since_select = 0;
52     delta = delta * TICK_MILLISECS * 1000;
53
54     min = wait == rtsTrue ? 0x7fffffff : 0;
55
56     /* 
57      * Collect all of the fd's that we're interested in, and capture
58      * the minimum waiting time (in microseconds) for the delayed threads.
59      */
60     FD_ZERO(&rfd);
61     FD_ZERO(&wfd);
62
63     for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
64       next = tso->link;
65
66       switch (tso->why_blocked) {
67       case BlockedOnRead:
68         { 
69           int fd = tso->block_info.fd;
70           maxfd = (fd > maxfd) ? fd : maxfd;
71           FD_SET(fd, &rfd);
72           continue;
73         }
74
75       case BlockedOnWrite:
76         { 
77           int fd = tso->block_info.fd;
78           maxfd = (fd > maxfd) ? fd : maxfd;
79           FD_SET(fd, &wfd);
80           continue;
81         }
82
83       case BlockedOnDelay:
84         {
85           if ((int)tso->block_info.delay < min)
86             min = tso->block_info.delay;
87           continue;
88         }
89
90       default:
91         barf("AwaitEvent");
92       }
93     }
94
95     /* Check for any interesting events */
96
97     tv.tv_sec = min / 1000000;
98     tv.tv_usec = min % 1000000;
99
100     gettimeofday(&tv_before, (struct timezone *) NULL);
101
102     while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv)) < 0) {
103       if (errno != EINTR) {
104         /* fflush(stdout); */
105         fprintf(stderr, "awaitEvent: select failed\n");
106         stg_exit(EXIT_FAILURE);
107       }
108     }   
109
110     if (numFound != 0) { 
111       /* 
112         File descriptors ready, but we don't know how much time was spent
113         in the select(). To interpolate, we compare the time before
114         and after the select(). 
115       */
116
117       gettimeofday(&tv_after, (struct timezone *) NULL);
118       delta += (tv_after.tv_sec - tv_before.tv_sec) * 1000000 +
119                 tv_after.tv_usec - tv_before.tv_usec;
120
121     } else {
122       delta += min;
123     }
124
125     /*
126       Step through the waiting queue, unblocking every thread that now has
127       a file descriptor in a ready state.
128
129       For the delayed threads, decrement the number of microsecs
130       we've been blocked for. Unblock the threads that have thusly expired.
131      */
132
133     prev = NULL;
134     for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
135       next = tso->link;
136       switch (tso->why_blocked) {
137       case BlockedOnRead:
138         ready = FD_ISSET(tso->block_info.fd, &rfd);
139         break;
140         
141       case BlockedOnWrite:
142         ready = FD_ISSET(tso->block_info.fd, &wfd);
143         break;
144         
145       case BlockedOnDelay:
146         tso->block_info.delay -= delta;
147         ready = (tso->block_info.delay <= 0);
148         break;
149         
150       default:
151         barf("awaitEvent");
152       }
153       
154       if (ready) {
155         IF_DEBUG(scheduler,belch("Waking up thread %d\n", tso->id));
156         tso->why_blocked = NotBlocked;
157         tso->link = END_TSO_QUEUE;
158         PUSH_ON_RUN_QUEUE(tso);
159       } else {
160         if (prev == NULL)
161           blocked_queue_hd = tso;
162         else
163           prev->link = tso;
164         prev = tso;
165       }
166     }
167
168     if (prev == NULL)
169       blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
170     else {
171       prev->link = END_TSO_QUEUE;
172       blocked_queue_tl = prev;
173     }
174 }