%* *
%************************************************************************
+Handling of select() of read&write on file descriptors or timer expiry.
+
\begin{code}
#ifdef CONCURRENT
/* #define STK_CHK_DEBUG */
#define NULL_REG_MAP
+
+#if !defined(_AIX)
#define NON_POSIX_SOURCE
+#endif
/* Should there be a POSIX alternative based on poll()? */
-#include "stgdefs.h"
+
+#include "rtsdefs.h"
# if defined(HAVE_SYS_TYPES_H)
# include <sys/types.h>
# include <sys/time.h>
# endif
+/* Counter holding the number of timer ticks seen during GC */
+I_ delayTicks = 0;
+
+/*
+ handleTimerExpiry is used to temporarily delay the handling of
+ timer ticks for threads delayed waiting for timeout. Disable
+ during GC, counting up the ticks , before updating the waiting
+ threads queue when finished GCing.
+
+ */
+
+void
+handleTimerExpiry(enable)
+rtsBool enable;
+{
+ /*
+ If we enable the handling of timer expiry, update the WaitingThreads
+ queue with the number of ticks we have accumulated since the handling
+ was disabled.
+ */
+ if (!enable)
+ delayTicks = 1;
+ else {
+ if (delayTicks > 1) {
+ delayTicks = 0;
+ AwaitEvent((delayTicks-1) * RTSflags.ConcFlags.ctxtSwitchTime);
+ }
+ }
+}
+
void
AwaitEvent(I_ delta)
{
P_ tso, prev, next;
rtsBool ready;
- fd_set rfd;
+ fd_set rfd,wfd;
I_ us;
- I_ min;
+ I_ min, numFound;
I_ maxfd=0;
- struct timeval tv;
+
+ struct timeval tv,tv_before,tv_after;
min = delta == 0 ? 0x7fffffff : 0;
/*
* Collect all of the fd's that we're interested in, and capture
- * the minimum waiting time for the delayed threads.
+ * the minimum waiting time (in microseconds) for the delayed threads.
*
- * (I_)TSO_EVENT(tso) < 0 => thread waiting on fd (-(I_)TSO_EVENT(tso))
+ * (I_)TSO_EVENT(tso) < 0 => thread waiting on read on fd (-(I_)TSO_EVENT(tso))
*
+ * (I_)TSO_EVENT(tso) < -FD_SETSIZE => thread waiting on write on fd
+ * (FD_SETSIZE-(I_)TSO_EVENT(tso))
*/
FD_ZERO(&rfd);
- for(tso = WaitingThreadsHd; tso != Nil_closure; tso = TSO_LINK(tso)) {
+ FD_ZERO(&wfd);
+ for(tso = WaitingThreadsHd; tso != PrelBase_Z91Z93_closure; tso = TSO_LINK(tso)) {
us = (I_) TSO_EVENT(tso);
if (us > 0) {
/* Looking at a delay event */
if (us < min)
min = us;
+ } else if ( us <= (-(I_)FD_SETSIZE)) {
+ /* Looking at a waitWrite event */
+ us += (I_)FD_SETSIZE;
+ maxfd = ((1-us)> maxfd) ? (1-us) : maxfd;
+ FD_SET((-us), &wfd);
} else {
- /* Looking at a wait event */
- maxfd = ((-us)> maxfd) ? (-us) : maxfd;
+ /* Looking at a waitRead event */
+ maxfd = ((1-us)> maxfd) ? (1-us) : maxfd;
FD_SET((-us), &rfd);
}
}
tv.tv_sec = min / 1000000;
tv.tv_usec = min % 1000000;
- while (select((maxfd==0 ? 0 : (maxfd+1)), &rfd, NULL, NULL, &tv) < 0) {
+ gettimeofday(&tv_before, (struct timezone *) NULL);
+
+ while ((numFound = select(maxfd, &rfd, &wfd, NULL, &tv)) < 0) {
if (errno != EINTR) {
fflush(stdout);
fprintf(stderr, "AwaitEvent: select failed\n");
EXIT(EXIT_FAILURE);
}
}
-
+
+ if (numFound != 0) {
+ /*
+ File descriptors ready, but we have don't know how much time was spent
+ in the select(). To interpolate, we compare the time before and after the
+ select().
+ */
+
+ gettimeofday(&tv_after, (struct timezone *) NULL);
+ delta = (tv_after.tv_sec - tv_before.tv_sec) * 1000000 +
+ tv_after.tv_usec - tv_before.tv_usec;
+
+ }
+
if (delta == 0)
delta=min;
+ /*
+ Step through the waiting queue, unblocking every thread that now has
+ a file descriptor in a ready state.
+
+ For the delayed threads, decrement the number of microsecs
+ we've been blocked for. Unblock the threads that have thusly expired.
+ */
+
prev = NULL;
- for(tso = WaitingThreadsHd; tso != Nil_closure; tso = next) {
+ for(tso = WaitingThreadsHd; tso != PrelBase_Z91Z93_closure; tso = next) {
next = TSO_LINK(tso);
us = (I_) TSO_EVENT(tso);
if (us > 0) {
ready = (us <= 0);
if (!ready)
TSO_EVENT(tso) = (W_) us;
+ } else if ( us <= (-(I_)FD_SETSIZE)) {
+ /* Looking at a waitWrite event */
+ ready = FD_ISSET(((I_)FD_SETSIZE-us), &wfd);
} else {
- /* Looking at a wait event */
+ /* Looking at a waitRead event */
ready = FD_ISSET((-us), &rfd);
}
if (ready) {
#if defined(GRAN)
- if (ThreadQueueTl == Nil_closure)
+ if (ThreadQueueTl == PrelBase_Z91Z93_closure)
ThreadQueueHd = tso;
else
TSO_LINK(ThreadQueueTl) = tso;
ThreadQueueTl = tso;
- TSO_LINK(tso) = Nil_closure;
+ TSO_LINK(tso) = PrelBase_Z91Z93_closure;
#else
- if (RunnableThreadsTl == Nil_closure)
+ if (RunnableThreadsTl == PrelBase_Z91Z93_closure)
RunnableThreadsHd = tso;
else
TSO_LINK(RunnableThreadsTl) = tso;
RunnableThreadsTl = tso;
- TSO_LINK(tso) = Nil_closure;
+ TSO_LINK(tso) = PrelBase_Z91Z93_closure;
#endif
} else {
if (prev == NULL)
}
}
if (prev == NULL)
- WaitingThreadsHd = WaitingThreadsTl = Nil_closure;
+ WaitingThreadsHd = WaitingThreadsTl = PrelBase_Z91Z93_closure;
else {
- TSO_LINK(prev) = Nil_closure;
+ TSO_LINK(prev) = PrelBase_Z91Z93_closure;
WaitingThreadsTl = prev;
}
}