X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Fruntime%2Fmain%2FSelect.lc;h=c7a31cb280ffc7a78767f13aeecd14e9354ac513;hb=967cc47f37cb93a5e2b6df7822c9a646f0428247;hp=4fdcaa45ecf5b33583e5563a9c852550b5a75135;hpb=10521d8418fd3a1cf32882718b5bd28992db36fd;p=ghc-hetmet.git diff --git a/ghc/runtime/main/Select.lc b/ghc/runtime/main/Select.lc index 4fdcaa4..c7a31cb 100644 --- a/ghc/runtime/main/Select.lc +++ b/ghc/runtime/main/Select.lc @@ -7,6 +7,8 @@ %* * %************************************************************************ +Handling of select() of read&write on file descriptors or timer expiry. + \begin{code} #ifdef CONCURRENT @@ -14,9 +16,13 @@ /* #define STK_CHK_DEBUG */ #define NULL_REG_MAP + +#if !defined(_AIX) #define NON_POSIX_SOURCE +#endif /* Should there be a POSIX alternative based on poll()? */ -#include "stgdefs.h" + +#include "rtsdefs.h" # if defined(HAVE_SYS_TYPES_H) # include @@ -26,36 +32,75 @@ # include # endif +/* Counter holding the number of timer ticks seen during GC */ +I_ delayTicks = 0; + +/* + handleTimerExpiry is used to temporarily delay the handling of + timer ticks for threads delayed waiting for timeout. Disable + during GC, counting up the ticks , before updating the waiting + threads queue when finished GCing. + + */ + +void +handleTimerExpiry(enable) +rtsBool enable; +{ + /* + If we enable the handling of timer expiry, update the WaitingThreads + queue with the number of ticks we have accumulated since the handling + was disabled. + */ + if (!enable) + delayTicks = 1; + else { + if (delayTicks > 1) { + delayTicks = 0; + AwaitEvent((delayTicks-1) * RTSflags.ConcFlags.ctxtSwitchTime); + } + } +} + void AwaitEvent(I_ delta) { P_ tso, prev, next; rtsBool ready; - fd_set rfd; + fd_set rfd,wfd; I_ us; - I_ min; + I_ min, numFound; I_ maxfd=0; - struct timeval tv; + + struct timeval tv,tv_before,tv_after; min = delta == 0 ? 0x7fffffff : 0; /* * Collect all of the fd's that we're interested in, and capture - * the minimum waiting time for the delayed threads. + * the minimum waiting time (in microseconds) for the delayed threads. * - * (I_)TSO_EVENT(tso) < 0 => thread waiting on fd (-(I_)TSO_EVENT(tso)) + * (I_)TSO_EVENT(tso) < 0 => thread waiting on read on fd (-(I_)TSO_EVENT(tso)) * + * (I_)TSO_EVENT(tso) < -FD_SETSIZE => thread waiting on write on fd + * (FD_SETSIZE-(I_)TSO_EVENT(tso)) */ FD_ZERO(&rfd); - for(tso = WaitingThreadsHd; tso != Nil_closure; tso = TSO_LINK(tso)) { + FD_ZERO(&wfd); + for(tso = WaitingThreadsHd; tso != PrelBase_Z91Z93_closure; tso = TSO_LINK(tso)) { us = (I_) TSO_EVENT(tso); if (us > 0) { /* Looking at a delay event */ if (us < min) min = us; + } else if ( us <= (-(I_)FD_SETSIZE)) { + /* Looking at a waitWrite event */ + us += (I_)FD_SETSIZE; + maxfd = ((1-us)> maxfd) ? (1-us) : maxfd; + FD_SET((-us), &wfd); } else { - /* Looking at a wait event */ - maxfd = ((-us)> maxfd) ? (-us) : maxfd; + /* Looking at a waitRead event */ + maxfd = ((1-us)> maxfd) ? (1-us) : maxfd; FD_SET((-us), &rfd); } } @@ -65,19 +110,42 @@ AwaitEvent(I_ delta) tv.tv_sec = min / 1000000; tv.tv_usec = min % 1000000; - while (select((maxfd==0 ? 0 : (maxfd+1)), &rfd, NULL, NULL, &tv) < 0) { + gettimeofday(&tv_before, (struct timezone *) NULL); + + while ((numFound = select(maxfd, &rfd, &wfd, NULL, &tv)) < 0) { if (errno != EINTR) { fflush(stdout); fprintf(stderr, "AwaitEvent: select failed\n"); EXIT(EXIT_FAILURE); } } - + + if (numFound != 0) { + /* + File descriptors ready, but we have don't know how much time was spent + in the select(). To interpolate, we compare the time before and after the + select(). + */ + + gettimeofday(&tv_after, (struct timezone *) NULL); + delta = (tv_after.tv_sec - tv_before.tv_sec) * 1000000 + + tv_after.tv_usec - tv_before.tv_usec; + + } + if (delta == 0) delta=min; + /* + Step through the waiting queue, unblocking every thread that now has + a file descriptor in a ready state. + + For the delayed threads, decrement the number of microsecs + we've been blocked for. Unblock the threads that have thusly expired. + */ + prev = NULL; - for(tso = WaitingThreadsHd; tso != Nil_closure; tso = next) { + for(tso = WaitingThreadsHd; tso != PrelBase_Z91Z93_closure; tso = next) { next = TSO_LINK(tso); us = (I_) TSO_EVENT(tso); if (us > 0) { @@ -86,26 +154,29 @@ AwaitEvent(I_ delta) ready = (us <= 0); if (!ready) TSO_EVENT(tso) = (W_) us; + } else if ( us <= (-(I_)FD_SETSIZE)) { + /* Looking at a waitWrite event */ + ready = FD_ISSET(((I_)FD_SETSIZE-us), &wfd); } else { - /* Looking at a wait event */ + /* Looking at a waitRead event */ ready = FD_ISSET((-us), &rfd); } if (ready) { #if defined(GRAN) - if (ThreadQueueTl == Nil_closure) + if (ThreadQueueTl == PrelBase_Z91Z93_closure) ThreadQueueHd = tso; else TSO_LINK(ThreadQueueTl) = tso; ThreadQueueTl = tso; - TSO_LINK(tso) = Nil_closure; + TSO_LINK(tso) = PrelBase_Z91Z93_closure; #else - if (RunnableThreadsTl == Nil_closure) + if (RunnableThreadsTl == PrelBase_Z91Z93_closure) RunnableThreadsHd = tso; else TSO_LINK(RunnableThreadsTl) = tso; RunnableThreadsTl = tso; - TSO_LINK(tso) = Nil_closure; + TSO_LINK(tso) = PrelBase_Z91Z93_closure; #endif } else { if (prev == NULL) @@ -116,9 +187,9 @@ AwaitEvent(I_ delta) } } if (prev == NULL) - WaitingThreadsHd = WaitingThreadsTl = Nil_closure; + WaitingThreadsHd = WaitingThreadsTl = PrelBase_Z91Z93_closure; else { - TSO_LINK(prev) = Nil_closure; + TSO_LINK(prev) = PrelBase_Z91Z93_closure; WaitingThreadsTl = prev; } }