X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSelect.c;h=677fdd2c8198103a9e0c095efc532593c405b113;hb=c389ab20bbfd0ea98953d630502a43fe44050f2c;hp=0a8ba6aa33e8e1007279c55896c6a97da03a22b2;hpb=5eb15889de25e2068de071b21b651454210fa4ee;p=ghc-hetmet.git diff --git a/ghc/rts/Select.c b/ghc/rts/Select.c index 0a8ba6a..677fdd2 100644 --- a/ghc/rts/Select.c +++ b/ghc/rts/Select.c @@ -1,22 +1,26 @@ /* ----------------------------------------------------------------------------- - * $Id: Select.c,v 1.1 1999/08/25 16:37:42 simonmar Exp $ + * $Id: Select.c,v 1.29 2003/06/26 12:22:59 stolz Exp $ * - * (c) The GHC Team 1995-1999 + * (c) The GHC Team 1995-2002 * * Support for concurrent non-blocking I/O and thread waiting. * * ---------------------------------------------------------------------------*/ + /* we're outside the realms of POSIX here... */ -#define NON_POSIX_SOURCE +/* #include "PosixSource.h" */ #include "Rts.h" #include "Schedule.h" #include "RtsUtils.h" #include "RtsFlags.h" +#include "Timer.h" #include "Itimer.h" +#include "Signals.h" +#include "Capability.h" -# if defined(HAVE_SYS_TYPES_H) +# ifdef HAVE_SYS_TYPES_H # include # endif @@ -24,12 +28,65 @@ # include # endif -nat ticks_since_select = 0; +#include +#include + +#ifdef HAVE_UNISTD_H +#include +#endif + +/* last timestamp */ +nat timestamp = 0; + +#ifdef RTS_SUPPORTS_THREADS +static rtsBool isWorkerBlockedInAwaitEvent = rtsFalse; +static rtsBool workerWakeupPending = rtsFalse; +static int workerWakeupPipe[2]; +static rtsBool workerWakeupInited = rtsFalse; +#endif + +/* There's a clever trick here to avoid problems when the time wraps + * around. Since our maximum delay is smaller than 31 bits of ticks + * (it's actually 31 bits of microseconds), we can safely check + * whether a timer has expired even if our timer will wrap around + * before the target is reached, using the following formula: + * + * (int)((uint)current_time - (uint)target_time) < 0 + * + * if this is true, then our time has expired. + * (idea due to Andy Gill). + */ +rtsBool +wakeUpSleepingThreads(nat ticks) +{ + StgTSO *tso; + rtsBool flag = rtsFalse; + + while (sleeping_queue != END_TSO_QUEUE && + (int)(ticks - sleeping_queue->block_info.target) > 0) { + tso = sleeping_queue; + sleeping_queue = tso->link; + tso->why_blocked = NotBlocked; + tso->link = END_TSO_QUEUE; + IF_DEBUG(scheduler,belch("Waking up sleeping thread %d\n", tso->id)); + PUSH_ON_RUN_QUEUE(tso); + flag = rtsTrue; + } + return flag; +} /* Argument 'wait' says whether to wait for I/O to become available, * or whether to just check and return immediately. If there are * other threads ready to run, we normally do the non-waiting variety, * otherwise we wait (see Schedule.c). + * + * SMP note: must be called with sched_mutex locked. + * + * Windows: select only works on sockets, so this doesn't really work, + * though it makes things better than before. MsgWaitForMultipleObjects + * should really be used, though it only seems to work for read handles, + * not write handles. + * */ void awaitEvent(rtsBool wait) @@ -37,138 +94,261 @@ awaitEvent(rtsBool wait) StgTSO *tso, *prev, *next; rtsBool ready; fd_set rfd,wfd; - int min, numFound, delta; + int numFound; int maxfd = -1; - - struct timeval tv,tv_before,tv_after; + rtsBool select_succeeded = rtsTrue; + rtsBool unblock_all = rtsFalse; + struct timeval tv; + lnat min, ticks; - IF_DEBUG(scheduler,belch("Checking for threads blocked on I/O...\n")); + tv.tv_sec = 0; + tv.tv_usec = 0; + + IF_DEBUG(scheduler, + belch("scheduler: checking for threads blocked on I/O"); + if (wait) { + belch(" (waiting)"); + } + belch("\n"); + ); - /* see how long it's been since we last checked the blocked queue. - * ToDo: make this check atomic, so we don't lose any ticks. + /* loop until we've woken up some threads. This loop is needed + * because the select timing isn't accurate, we sometimes sleep + * for a while but not long enough to wake up a thread in + * a threadDelay. */ - delta = ticks_since_select; - ticks_since_select = 0; - delta = delta * TICK_MILLISECS * 1000; + do { - min = wait == rtsTrue ? 0x7fffffff : 0; + ticks = timestamp = getourtimeofday(); + if (wakeUpSleepingThreads(ticks)) { + return; + } - /* - * Collect all of the fd's that we're interested in, and capture - * the minimum waiting time (in microseconds) for the delayed threads. - */ - FD_ZERO(&rfd); - FD_ZERO(&wfd); - - for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) { - next = tso->link; - - switch (tso->why_blocked) { - case BlockedOnRead: - { - int fd = tso->block_info.fd; - maxfd = (fd > maxfd) ? fd : maxfd; - FD_SET(fd, &rfd); - continue; - } + if (!wait) { + min = 0; + } else if (sleeping_queue != END_TSO_QUEUE) { + min = (sleeping_queue->block_info.target - ticks) + * TICK_MILLISECS * 1000; + } else { + min = 0x7ffffff; + } - case BlockedOnWrite: - { - int fd = tso->block_info.fd; - maxfd = (fd > maxfd) ? fd : maxfd; - FD_SET(fd, &wfd); - continue; - } + /* + * Collect all of the fd's that we're interested in + */ + FD_ZERO(&rfd); + FD_ZERO(&wfd); - case BlockedOnDelay: - { - if ((int)tso->block_info.delay < min) - min = tso->block_info.delay; - continue; + for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) { + next = tso->link; + + switch (tso->why_blocked) { + case BlockedOnRead: + { + int fd = tso->block_info.fd; + maxfd = (fd > maxfd) ? fd : maxfd; + FD_SET(fd, &rfd); + continue; + } + + case BlockedOnWrite: + { + int fd = tso->block_info.fd; + maxfd = (fd > maxfd) ? fd : maxfd; + FD_SET(fd, &wfd); + continue; + } + + default: + barf("AwaitEvent"); } + } - default: - barf("AwaitEvent"); +#ifdef RTS_SUPPORTS_THREADS + if(!workerWakeupInited) { + pipe(workerWakeupPipe); + workerWakeupInited = rtsTrue; } - } + FD_SET(workerWakeupPipe[0], &rfd); + maxfd = workerWakeupPipe[0] > maxfd ? workerWakeupPipe[0] : maxfd; +#endif + + /* Release the scheduler lock while we do the poll. + * this means that someone might muck with the blocked_queue + * while we do this, but it shouldn't matter: + * + * - another task might poll for I/O and remove one + * or more threads from the blocked_queue. + * - more I/O threads may be added to blocked_queue. + * - more delayed threads may be added to blocked_queue. We'll + * just subtract delta from their delays after the poll. + * + * I believe none of these cases lead to trouble --SDM. + */ + +#ifdef RTS_SUPPORTS_THREADS + isWorkerBlockedInAwaitEvent = rtsTrue; + workerWakeupPending = rtsFalse; +#endif + RELEASE_LOCK(&sched_mutex); - /* Check for any interesting events */ + /* Check for any interesting events */ + + tv.tv_sec = min / 1000000; + tv.tv_usec = min % 1000000; - tv.tv_sec = min / 1000000; - tv.tv_usec = min % 1000000; + while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv)) < 0) { + if (errno != EINTR) { + /* Handle bad file descriptors by unblocking all the + waiting threads. Why? Because a thread might have been + a bit naughty and closed a file descriptor while another + was blocked waiting. This is less-than-good programming + practice, but having the RTS as a result fall over isn't + acceptable, so we simply unblock all the waiting threads + should we see a bad file descriptor & give the threads + a chance to clean up their act. + + Note: assume here that threads becoming unblocked + will try to read/write the file descriptor before trying + to issue a threadWaitRead/threadWaitWrite again (==> an + IOError will result for the thread that's got the bad + file descriptor.) Hence, there's no danger of a bad + file descriptor being repeatedly select()'ed on, so + the RTS won't loop. + */ + if ( errno == EBADF ) { + unblock_all = rtsTrue; + break; + } else { + fprintf(stderr,"%d\n", errno); + fflush(stderr); + perror("select"); + barf("select failed"); + } + } + ACQUIRE_LOCK(&sched_mutex); +#ifdef RTS_SUPPORTS_THREADS + isWorkerBlockedInAwaitEvent = rtsFalse; +#endif - gettimeofday(&tv_before, (struct timezone *) NULL); + /* We got a signal; could be one of ours. If so, we need + * to start up the signal handler straight away, otherwise + * we could block for a long time before the signal is + * serviced. + */ +#if defined(RTS_USER_SIGNALS) + if (signals_pending()) { + RELEASE_LOCK(&sched_mutex); /* ToDo: kill */ + startSignalHandlers(); + ACQUIRE_LOCK(&sched_mutex); + return; /* still hold the lock */ + } +#endif - while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv)) < 0) { - if (errno != EINTR) { - /* fflush(stdout); */ - fprintf(stderr, "awaitEvent: select failed\n"); - stg_exit(EXIT_FAILURE); + /* we were interrupted, return to the scheduler immediately. + */ + if (interrupted) { + return; /* still hold the lock */ + } + + /* check for threads that need waking up + */ + wakeUpSleepingThreads(getourtimeofday()); + + /* If new runnable threads have arrived, stop waiting for + * I/O and run them. + */ + if (run_queue_hd != END_TSO_QUEUE) { + return; /* still hold the lock */ + } + +#ifdef RTS_SUPPORTS_THREADS + /* If another worker thread wants to take over, + * return to the scheduler + */ + if (needToYieldToReturningWorker()) { + return; /* still hold the lock */ + } +#endif + +#ifdef RTS_SUPPORTS_THREADS + isWorkerBlockedInAwaitEvent = rtsTrue; +#endif + RELEASE_LOCK(&sched_mutex); } - } - - if (numFound != 0) { - /* - File descriptors ready, but we don't know how much time was spent - in the select(). To interpolate, we compare the time before - and after the select(). - */ - gettimeofday(&tv_after, (struct timezone *) NULL); - delta += (tv_after.tv_sec - tv_before.tv_sec) * 1000000 + - tv_after.tv_usec - tv_before.tv_usec; + ACQUIRE_LOCK(&sched_mutex); - } else { - delta += min; - } - - /* - Step through the waiting queue, unblocking every thread that now has - a file descriptor in a ready state. + /* Step through the waiting queue, unblocking every thread that now has + * a file descriptor in a ready state. + */ - For the delayed threads, decrement the number of microsecs - we've been blocked for. Unblock the threads that have thusly expired. - */ + prev = NULL; + if (select_succeeded || unblock_all) { + for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) { + next = tso->link; + switch (tso->why_blocked) { + case BlockedOnRead: + ready = unblock_all || FD_ISSET(tso->block_info.fd, &rfd); + break; + case BlockedOnWrite: + ready = unblock_all || FD_ISSET(tso->block_info.fd, &wfd); + break; + default: + barf("awaitEvent"); + } + + if (ready) { + IF_DEBUG(scheduler,belch("Waking up blocked thread %d\n", tso->id)); + tso->why_blocked = NotBlocked; + tso->link = END_TSO_QUEUE; + PUSH_ON_RUN_QUEUE(tso); + } else { + if (prev == NULL) + blocked_queue_hd = tso; + else + prev->link = tso; + prev = tso; + } + } - prev = NULL; - for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) { - next = tso->link; - switch (tso->why_blocked) { - case BlockedOnRead: - ready = FD_ISSET(tso->block_info.fd, &rfd); - break; - - case BlockedOnWrite: - ready = FD_ISSET(tso->block_info.fd, &wfd); - break; - - case BlockedOnDelay: - tso->block_info.delay -= delta; - ready = (tso->block_info.delay <= 0); - break; - - default: - barf("awaitEvent"); + if (prev == NULL) + blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE; + else { + prev->link = END_TSO_QUEUE; + blocked_queue_tl = prev; + } } - if (ready) { - IF_DEBUG(scheduler,belch("Waking up thread %d\n", tso->id)); - tso->why_blocked = NotBlocked; - tso->link = END_TSO_QUEUE; - PUSH_ON_RUN_QUEUE(tso); - } else { - if (prev == NULL) - blocked_queue_hd = tso; - else - prev->link = tso; - prev = tso; +#if defined(RTS_SUPPORTS_THREADS) + // if we were woken up by wakeBlockedWorkerThread, + // read the dummy byte from the pipe + if(select_succeeded && FD_ISSET(workerWakeupPipe[0], &rfd)) { + unsigned char dummy; + wait = rtsFalse; + read(workerWakeupPipe[0],&dummy,1); } - } +#endif + } while (wait && !interrupted && run_queue_hd == END_TSO_QUEUE); +} + - if (prev == NULL) - blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE; - else { - prev->link = END_TSO_QUEUE; - blocked_queue_tl = prev; +#ifdef RTS_SUPPORTS_THREADS +/* wakeBlockedWorkerThread + * + * If a worker thread is currently blocked within awaitEvent, + * wake it. + * Must be called with sched_mutex held. + */ +void +wakeBlockedWorkerThread() +{ + if(isWorkerBlockedInAwaitEvent && !workerWakeupPending) { + unsigned char dummy = 42; // Any value will do here + + // write something so that select() wakes up + write(workerWakeupPipe[1],&dummy,1); + workerWakeupPending = rtsTrue; } } +#endif