From 324e96d2ebfcb113cd97c43ef043d591ef87de71 Mon Sep 17 00:00:00 2001 From: wolfgang Date: Wed, 1 Oct 2003 10:49:09 +0000 Subject: [PATCH] [project @ 2003-10-01 10:49:07 by wolfgang] Threaded RTS: Don't start new worker threads earlier than necessary. After this commit, a Haskell program that uses neither forkOS nor forkIO is really single-threaded (rather than using two OS threads internally). Some details: Worker threads are now only created when a capability is released, and only when (there are no worker threads) && (there are runnable Haskell threads || there are Haskell threads blocked on IO or threadDelay) awaitEvent can now be called from bound thread scheduling loops (so that we don't have to create a worker thread just to run awaitEvent) --- ghc/rts/Capability.c | 37 ++++++++++++++++++++++++++++++++--- ghc/rts/Capability.h | 1 + ghc/rts/RtsAPI.c | 8 +------- ghc/rts/Schedule.c | 53 +++++++++++++++++++++++++++++++++++--------------- ghc/rts/Schedule.h | 11 ++++++----- ghc/rts/Select.c | 18 ++++++++++++++++- ghc/rts/Task.c | 13 +++++++++++++ ghc/rts/Task.h | 1 + 8 files changed, 110 insertions(+), 32 deletions(-) diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index 74d50ac..d748aee 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -199,6 +199,8 @@ void releaseCapability(Capability* cap #endif /* Signal that a capability is available */ signalCondition(&thread_ready_cond); + startSchedulerTaskIfNecessary(); // if there is more work to be done, + // we'll need a new thread } #endif #ifdef RTS_SUPPORTS_THREADS @@ -324,6 +326,7 @@ yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition* pThreadCond) */ static Condition *passTarget = NULL; +static rtsBool passingCapability = rtsFalse; void waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond) @@ -334,8 +337,7 @@ waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond) IF_DEBUG(scheduler, fprintf(stderr,"worker thread (%p): wait for cap (cond: %p)\n", osThreadId(),pThreadCond)); - while ( noCapabilities() || (pThreadCond && passTarget != pThreadCond) - || (!pThreadCond && passTarget)) { + while ( noCapabilities() || (passingCapability && passTarget != pThreadCond)) { if(pThreadCond) { waitCondition(pThreadCond, pMutex); @@ -353,7 +355,7 @@ waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond) osThreadId())); } } - passTarget = NULL; + passingCapability = rtsFalse; grabCapability(pCap); return; } @@ -378,11 +380,40 @@ passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond) rts_n_free_capabilities = 1; signalCondition(pTargetThreadCond); passTarget = pTargetThreadCond; + passingCapability = rtsTrue; IF_DEBUG(scheduler, fprintf(stderr,"worker thread (%p): passCapability\n", osThreadId())); } +/* + * Function: passCapabilityToWorker(Mutex*, Capability*) + * + * Purpose: Let go of the capability and make sure that a + * "plain" worker thread (not a bound thread) gets it next. + * + * Pre-condition: pMutex is held and cap is held by the current thread + * Post-condition: pMutex is held; cap will be grabbed by the "target" + * thread when pMutex is released. + */ + +void +passCapabilityToWorker(Mutex* pMutex, Capability* cap) +{ +#ifdef SMP + #error SMP version not implemented +#endif + rts_n_free_capabilities = 1; + signalCondition(&thread_ready_cond); + startSchedulerTaskIfNecessary(); + passTarget = NULL; + passingCapability = rtsTrue; + IF_DEBUG(scheduler, + fprintf(stderr,"worker thread (%p): passCapabilityToWorker\n", + osThreadId())); +} + + #endif /* RTS_SUPPORTS_THREADS */ diff --git a/ghc/rts/Capability.h b/ghc/rts/Capability.h index 70acc15..ede787b 100644 --- a/ghc/rts/Capability.h +++ b/ghc/rts/Capability.h @@ -42,6 +42,7 @@ extern void grabReturnCapability(Mutex* pMutex, Capability** pCap); extern void yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition *pThreadCond); extern void waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition *pThreadCond); extern void passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond); +extern void passCapabilityToWorker(Mutex* pMutex, Capability* cap); static inline rtsBool needToYieldToReturningWorker(void) { diff --git a/ghc/rts/RtsAPI.c b/ghc/rts/RtsAPI.c index 835db72..8d1cfd9 100644 --- a/ghc/rts/RtsAPI.c +++ b/ghc/rts/RtsAPI.c @@ -1,5 +1,5 @@ /* ---------------------------------------------------------------------------- - * $Id: RtsAPI.c,v 1.48 2003/10/01 10:36:49 wolfgang Exp $ + * $Id: RtsAPI.c,v 1.49 2003/10/01 10:49:07 wolfgang Exp $ * * (c) The GHC Team, 1998-2001 * @@ -501,12 +501,6 @@ rts_lock() // b) wake the current worker thread from awaitEvent() // (so that a thread started by rts_eval* will start immediately) grabReturnCapability(&sched_mutex,&rtsApiCapability); - - // In the RTS hasn't been entered yet, - // start a RTS task. - // If there is already a task available (waiting for the work capability), - // this will do nothing. - startSchedulerTask(); #endif } diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 2754c4f..33db7e6 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,5 +1,5 @@ /* --------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.175 2003/09/26 13:32:14 panne Exp $ + * $Id: Schedule.c,v 1.176 2003/10/01 10:49:08 wolfgang Exp $ * * (c) The GHC Team, 1998-2000 * @@ -313,20 +313,38 @@ StgTSO * activateSpark (rtsSpark spark); StgTSO *MainTSO; */ -#if defined(PAR) || defined(RTS_SUPPORTS_THREADS) +#if defined(RTS_SUPPORTS_THREADS) +static rtsBool startingWorkerThread = rtsFalse; + static void taskStart(void); static void taskStart(void) { - schedule(NULL,NULL); + Capability *cap; + + ACQUIRE_LOCK(&sched_mutex); + startingWorkerThread = rtsFalse; + waitForWorkCapability(&sched_mutex, &cap, NULL); + RELEASE_LOCK(&sched_mutex); + + schedule(NULL,cap); } -#endif -#if defined(RTS_SUPPORTS_THREADS) void -startSchedulerTask(void) +startSchedulerTaskIfNecessary(void) { - startTask(taskStart); + if(run_queue_hd != END_TSO_QUEUE + || blocked_queue_hd != END_TSO_QUEUE + || sleeping_queue != END_TSO_QUEUE) + { + if(!startingWorkerThread) + { // we don't want to start another worker thread + // just because the last one hasn't yet reached the + // "waiting for capability" state + startingWorkerThread = rtsTrue; + startTask(taskStart); + } + } } #endif @@ -475,7 +493,6 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, // so just exit right away. prog_belch("interrupted"); releaseCapability(cap); - startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit RELEASE_LOCK(&sched_mutex); shutdownHaskellAndExit(EXIT_SUCCESS); #else @@ -1151,7 +1168,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, // no, the current native thread is bound to a different // Haskell thread, so pass it to any worker thread PUSH_ON_RUN_QUEUE(t); - releaseCapability(cap); + passCapabilityToWorker(&sched_mutex, cap); cap = NULL; continue; } @@ -1830,9 +1847,6 @@ suspendThread( StgRegTable *reg, waiting to take over. */ IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId())); - //if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult - startTask(taskStart); - //} #endif /* Other threads _might_ be available for execution; signal this */ @@ -2245,9 +2259,10 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCap m->ret = ret; m->stat = NoStatus; #if defined(RTS_SUPPORTS_THREADS) - initCondition(&m->wakeup); #if defined(THREADED_RTS) initCondition(&m->bound_thread_cond); +#else + initCondition(&m->wakeup); #endif #endif @@ -2459,9 +2474,10 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability) m->ret = ret; m->stat = NoStatus; #if defined(RTS_SUPPORTS_THREADS) - initCondition(&m->wakeup); #if defined(THREADED_RTS) initCondition(&m->bound_thread_cond); +#else + initCondition(&m->wakeup); #endif #endif @@ -2512,9 +2528,10 @@ waitThread_(StgMainThread* m, Capability *initialCapability) stat = m->stat; #if defined(RTS_SUPPORTS_THREADS) - closeCondition(&m->wakeup); #if defined(THREADED_RTS) closeCondition(&m->bound_thread_cond); +#else + closeCondition(&m->wakeup); #endif #endif @@ -3498,7 +3515,11 @@ deleteThreadImmediately(StgTSO *tso) if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { return; } - unblockThread(tso); +#if defined(RTS_SUPPORTS_THREADS) + if (tso->why_blocked != BlockedOnCCall + && tso->why_blocked != BlockedOnCCall_NoUnblockExc) +#endif + unblockThread(tso); tso->what_next = ThreadKilled; } #endif diff --git a/ghc/rts/Schedule.h b/ghc/rts/Schedule.h index fccac3c..ebbe7d1 100644 --- a/ghc/rts/Schedule.h +++ b/ghc/rts/Schedule.h @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: Schedule.h,v 1.39 2003/09/21 22:20:56 wolfgang Exp $ + * $Id: Schedule.h,v 1.40 2003/10/01 10:49:09 wolfgang Exp $ * * (c) The GHC Team 1998-1999 * @@ -190,9 +190,10 @@ typedef struct StgMainThread_ { SchedulerStatus stat; StgClosure ** ret; #if defined(RTS_SUPPORTS_THREADS) - Condition wakeup; #if defined(THREADED_RTS) Condition bound_thread_cond; +#else + Condition wakeup; #endif #endif struct StgMainThread_ *link; @@ -297,12 +298,12 @@ void labelThread(StgPtr tso, char *label); #if defined(RTS_SUPPORTS_THREADS) /* If no task is waiting for a capability, + * and if there is work to be done + * or if we need to wait for IO or delay requests, * spawn a new worker thread. - * - * (Used by the RtsAPI) */ void -startSchedulerTask(void); +startSchedulerTaskIfNecessary(void); #endif #endif /* __SCHEDULE_H__ */ diff --git a/ghc/rts/Select.c b/ghc/rts/Select.c index 677fdd2..d7e6ffc 100644 --- a/ghc/rts/Select.c +++ b/ghc/rts/Select.c @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: Select.c,v 1.29 2003/06/26 12:22:59 stolz Exp $ + * $Id: Select.c,v 1.30 2003/10/01 10:49:09 wolfgang Exp $ * * (c) The GHC Team 1995-2002 * @@ -351,4 +351,20 @@ wakeBlockedWorkerThread() workerWakeupPending = rtsTrue; } } + +/* resetWorkerWakeupPipeAfterFork + * + * To be called right after a fork(). + * After the fork(), the worker wakeup pipe will be shared + * with the parent process, and that's something we don't want. + */ +void +resetWorkerWakeupPipeAfterFork() +{ + if(workerWakeupInited) { + close(workerWakeupPipe[0]); + close(workerWakeupPipe[1]); + } + workerWakeupInited = rtsFalse; +} #endif diff --git a/ghc/rts/Task.c b/ghc/rts/Task.c index 92b5c25..c720538 100644 --- a/ghc/rts/Task.c +++ b/ghc/rts/Task.c @@ -134,6 +134,12 @@ stopTaskManager () return; } +void +resetTaskManagerAfterFork () +{ + barf("resetTaskManagerAfterFork not implemented for SMP"); +} + #else /************ THREADS version *****************/ @@ -192,6 +198,13 @@ stopTaskManager () { } + +void +resetTaskManagerAfterFork () +{ + rts_n_waiting_tasks = 0; + taskCount = 0; +} #endif diff --git a/ghc/rts/Task.h b/ghc/rts/Task.h index bf29d91..ee59987 100644 --- a/ghc/rts/Task.h +++ b/ghc/rts/Task.h @@ -28,6 +28,7 @@ extern TaskInfo *taskIds; extern void startTaskManager ( nat maxTasks, void (*taskStart)(void) ); extern void stopTaskManager ( void ); +void resetTaskManagerAfterFork (); extern void startTask ( void (*taskStart)(void) ); -- 1.7.10.4