% % (c) The AQUA Project, Glasgow University, 1995 % %************************************************************************ %* * \section[Select.lc]{Select Available File Descriptors} %* * %************************************************************************ Handling of select() of read&write on file descriptors or timer expiry. \begin{code} #ifdef CONCURRENT /* #define STK_CHK_DEBUG */ #define NULL_REG_MAP #define NON_POSIX_SOURCE /* Should there be a POSIX alternative based on poll()? */ #include "rtsdefs.h" # if defined(HAVE_SYS_TYPES_H) # include # endif # ifdef HAVE_SYS_TIME_H # include # endif /* Counter holding the number of timer ticks seen during GC */ I_ delayTicks = 0; /* handleTimerExpiry is used to temporarily delay the handling of timer ticks for threads delayed waiting for timeout. Disable during GC, counting up the ticks , before updating the waiting threads queue when finished GCing. */ void handleTimerExpiry(enable) rtsBool enable; { /* If we enable the handling of timer expiry, update the WaitingThreads queue with the number of ticks we have accumulated since the handling was disabled. */ if (!enable) delayTicks = 1; else { if (delayTicks > 1) { delayTicks = 0; AwaitEvent((delayTicks-1) * RTSflags.ConcFlags.ctxtSwitchTime); } } } void AwaitEvent(I_ delta) { P_ tso, prev, next; rtsBool ready; fd_set rfd,wfd; I_ us; I_ min, numFound; I_ maxfd=0; struct timeval tv,tv_before,tv_after; min = delta == 0 ? 0x7fffffff : 0; /* * Collect all of the fd's that we're interested in, and capture * the minimum waiting time (in microseconds) for the delayed threads. * * (I_)TSO_EVENT(tso) < 0 => thread waiting on read on fd (-(I_)TSO_EVENT(tso)) * * (I_)TSO_EVENT(tso) < -FD_SETSIZE => thread waiting on write on fd * (FD_SETSIZE-(I_)TSO_EVENT(tso)) */ FD_ZERO(&rfd); FD_ZERO(&wfd); for(tso = WaitingThreadsHd; tso != Prelude_Z91Z93_closure; tso = TSO_LINK(tso)) { us = (I_) TSO_EVENT(tso); if (us > 0) { /* Looking at a delay event */ if (us < min) min = us; } else if ( us <= (-(I_)FD_SETSIZE)) { /* Looking at a waitWrite event */ us += (I_)FD_SETSIZE; maxfd = ((1-us)> maxfd) ? (1-us) : maxfd; FD_SET((-us), &wfd); } else { /* Looking at a waitRead event */ maxfd = ((1-us)> maxfd) ? (1-us) : maxfd; FD_SET((-us), &rfd); } } /* Check for any interesting events */ tv.tv_sec = min / 1000000; tv.tv_usec = min % 1000000; gettimeofday(&tv_before, (struct timezone *) NULL); while ((numFound = select(maxfd, &rfd, &wfd, NULL, &tv)) < 0) { if (errno != EINTR) { fflush(stdout); fprintf(stderr, "AwaitEvent: select failed\n"); EXIT(EXIT_FAILURE); } } if (numFound != 0) { /* File descriptors ready, but we have don't know how much time was spent in the select(). To interpolate, we compare the time before and after the select(). */ gettimeofday(&tv_after, (struct timezone *) NULL); delta = (tv_after.tv_sec - tv_before.tv_sec) * 1000000 + tv_after.tv_usec - tv_before.tv_usec; } if (delta == 0) delta=min; /* Step through the waiting queue, unblocking every thread that now has a file descriptor in a ready state. For the delayed threads, decrement the number of microsecs we've been blocked for. Unblock the threads that have thusly expired. */ prev = NULL; for(tso = WaitingThreadsHd; tso != Prelude_Z91Z93_closure; tso = next) { next = TSO_LINK(tso); us = (I_) TSO_EVENT(tso); if (us > 0) { /* Looking at a delay event */ us -= delta; ready = (us <= 0); if (!ready) TSO_EVENT(tso) = (W_) us; } else if ( us <= (-(I_)FD_SETSIZE)) { /* Looking at a waitWrite event */ ready = FD_ISSET(((I_)FD_SETSIZE-us), &wfd); } else { /* Looking at a waitRead event */ ready = FD_ISSET((-us), &rfd); } if (ready) { #if defined(GRAN) if (ThreadQueueTl == Prelude_Z91Z93_closure) ThreadQueueHd = tso; else TSO_LINK(ThreadQueueTl) = tso; ThreadQueueTl = tso; TSO_LINK(tso) = Prelude_Z91Z93_closure; #else if (RunnableThreadsTl == Prelude_Z91Z93_closure) RunnableThreadsHd = tso; else TSO_LINK(RunnableThreadsTl) = tso; RunnableThreadsTl = tso; TSO_LINK(tso) = Prelude_Z91Z93_closure; #endif } else { if (prev == NULL) WaitingThreadsHd = tso; else TSO_LINK(prev) = tso; prev = tso; } } if (prev == NULL) WaitingThreadsHd = WaitingThreadsTl = Prelude_Z91Z93_closure; else { TSO_LINK(prev) = Prelude_Z91Z93_closure; WaitingThreadsTl = prev; } } #endif /* CONCURRENT */ \end{code}