X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSelect.c;h=26870641bc02539b97e1f39d6107de2b16efce0a;hb=6f44f99a4d26ea518a109b073dca7b72bd551a6f;hp=dc19cbfa635c676fac8e45edfe4b3fdb6ff76b4f;hpb=126ebbae699dc0d3dcd5f8b6e25907132e47a668;p=ghc-hetmet.git diff --git a/ghc/rts/Select.c b/ghc/rts/Select.c index dc19cbf..2687064 100644 --- a/ghc/rts/Select.c +++ b/ghc/rts/Select.c @@ -1,5 +1,4 @@ /* ----------------------------------------------------------------------------- - * $Id: Select.c,v 1.22 2002/07/24 03:38:58 sof Exp $ * * (c) The GHC Team 1995-2002 * @@ -7,6 +6,7 @@ * * ---------------------------------------------------------------------------*/ + /* we're outside the realms of POSIX here... */ /* #include "PosixSource.h" */ @@ -14,8 +14,10 @@ #include "Schedule.h" #include "RtsUtils.h" #include "RtsFlags.h" +#include "Timer.h" #include "Itimer.h" #include "Signals.h" +#include "Capability.h" # ifdef HAVE_SYS_TYPES_H # include @@ -25,16 +27,23 @@ # include # endif -# ifdef mingw32_TARGET_OS -# include -# endif - #include #include +#ifdef HAVE_UNISTD_H +#include +#endif + /* last timestamp */ nat timestamp = 0; +#ifdef RTS_SUPPORTS_THREADS +static rtsBool isWorkerBlockedInAwaitEvent = rtsFalse; +static rtsBool workerWakeupPending = rtsFalse; +static int workerWakeupPipe[2]; +static rtsBool workerWakeupInited = rtsFalse; +#endif + /* There's a clever trick here to avoid problems when the time wraps * around. Since our maximum delay is smaller than 31 bits of ticks * (it's actually 31 bits of microseconds), we can safely check @@ -58,7 +67,7 @@ wakeUpSleepingThreads(nat ticks) sleeping_queue = tso->link; tso->why_blocked = NotBlocked; tso->link = END_TSO_QUEUE; - IF_DEBUG(scheduler,belch("Waking up sleeping thread %d\n", tso->id)); + IF_DEBUG(scheduler,debugBelch("Waking up sleeping thread %d\n", tso->id)); PUSH_ON_RUN_QUEUE(tso); flag = rtsTrue; } @@ -84,10 +93,8 @@ awaitEvent(rtsBool wait) StgTSO *tso, *prev, *next; rtsBool ready; fd_set rfd,wfd; -#ifndef mingw32_TARGET_OS int numFound; int maxfd = -1; -#endif rtsBool select_succeeded = rtsTrue; rtsBool unblock_all = rtsFalse; struct timeval tv; @@ -97,11 +104,11 @@ awaitEvent(rtsBool wait) tv.tv_usec = 0; IF_DEBUG(scheduler, - belch("scheduler: checking for threads blocked on I/O"); + debugBelch("scheduler: checking for threads blocked on I/O"); if (wait) { - belch(" (waiting)"); + debugBelch(" (waiting)"); } - belch("\n"); + debugBelch("\n"); ); /* loop until we've woken up some threads. This loop is needed @@ -125,7 +132,6 @@ awaitEvent(rtsBool wait) min = 0x7ffffff; } -#ifndef mingw32_TARGET_OS /* * Collect all of the fd's that we're interested in */ @@ -157,6 +163,15 @@ awaitEvent(rtsBool wait) } } +#ifdef RTS_SUPPORTS_THREADS + if(!workerWakeupInited) { + pipe(workerWakeupPipe); + workerWakeupInited = rtsTrue; + } + FD_SET(workerWakeupPipe[0], &rfd); + maxfd = workerWakeupPipe[0] > maxfd ? workerWakeupPipe[0] : maxfd; +#endif + /* Release the scheduler lock while we do the poll. * this means that someone might muck with the blocked_queue * while we do this, but it shouldn't matter: @@ -169,6 +184,11 @@ awaitEvent(rtsBool wait) * * I believe none of these cases lead to trouble --SDM. */ + +#ifdef RTS_SUPPORTS_THREADS + isWorkerBlockedInAwaitEvent = rtsTrue; + workerWakeupPending = rtsFalse; +#endif RELEASE_LOCK(&sched_mutex); /* Check for any interesting events */ @@ -199,24 +219,21 @@ awaitEvent(rtsBool wait) unblock_all = rtsTrue; break; } else { - fprintf(stderr,"%d\n", errno); - fflush(stderr); perror("select"); barf("select failed"); } } -#else /* on mingwin */ - while (1) { - Sleep(0); /* don't busy wait */ -#endif /* mingw32_TARGET_OS */ ACQUIRE_LOCK(&sched_mutex); +#ifdef RTS_SUPPORTS_THREADS + isWorkerBlockedInAwaitEvent = rtsFalse; +#endif -#ifndef mingw32_TARGET_OS /* We got a signal; could be one of ours. If so, we need * to start up the signal handler straight away, otherwise * we could block for a long time before the signal is * serviced. */ +#if defined(RTS_USER_SIGNALS) if (signals_pending()) { RELEASE_LOCK(&sched_mutex); /* ToDo: kill */ startSignalHandlers(); @@ -242,6 +259,18 @@ awaitEvent(rtsBool wait) return; /* still hold the lock */ } +#ifdef RTS_SUPPORTS_THREADS + /* If another worker thread wants to take over, + * return to the scheduler + */ + if (needToYieldToReturningWorker()) { + return; /* still hold the lock */ + } +#endif + +#ifdef RTS_SUPPORTS_THREADS + isWorkerBlockedInAwaitEvent = rtsTrue; +#endif RELEASE_LOCK(&sched_mutex); } @@ -267,7 +296,7 @@ awaitEvent(rtsBool wait) } if (ready) { - IF_DEBUG(scheduler,belch("Waking up blocked thread %d\n", tso->id)); + IF_DEBUG(scheduler,debugBelch("Waking up blocked thread %d\n", tso->id)); tso->why_blocked = NotBlocked; tso->link = END_TSO_QUEUE; PUSH_ON_RUN_QUEUE(tso); @@ -287,6 +316,52 @@ awaitEvent(rtsBool wait) blocked_queue_tl = prev; } } - + +#if defined(RTS_SUPPORTS_THREADS) + // if we were woken up by wakeBlockedWorkerThread, + // read the dummy byte from the pipe + if(select_succeeded && FD_ISSET(workerWakeupPipe[0], &rfd)) { + unsigned char dummy; + wait = rtsFalse; + read(workerWakeupPipe[0],&dummy,1); + } +#endif } while (wait && !interrupted && run_queue_hd == END_TSO_QUEUE); } + + +#ifdef RTS_SUPPORTS_THREADS +/* wakeBlockedWorkerThread + * + * If a worker thread is currently blocked within awaitEvent, + * wake it. + * Must be called with sched_mutex held. + */ +void +wakeBlockedWorkerThread() +{ + if(isWorkerBlockedInAwaitEvent && !workerWakeupPending) { + unsigned char dummy = 42; // Any value will do here + + // write something so that select() wakes up + write(workerWakeupPipe[1],&dummy,1); + workerWakeupPending = rtsTrue; + } +} + +/* resetWorkerWakeupPipeAfterFork + * + * To be called right after a fork(). + * After the fork(), the worker wakeup pipe will be shared + * with the parent process, and that's something we don't want. + */ +void +resetWorkerWakeupPipeAfterFork() +{ + if(workerWakeupInited) { + close(workerWakeupPipe[0]); + close(workerWakeupPipe[1]); + } + workerWakeupInited = rtsFalse; +} +#endif