From: wolfgang Date: Sun, 21 Sep 2003 22:20:57 +0000 (+0000) Subject: [project @ 2003-09-21 22:20:51 by wolfgang] X-Git-Tag: Approx_11550_changesets_converted~433 X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=commitdiff_plain;h=85aa72b9dc6803685595936c61f3cab6faab815a [project @ 2003-09-21 22:20:51 by wolfgang] Bound Threads ============= Introduce a way to use foreign libraries that rely on thread local state from multiple threads (mainly affects the threaded RTS). See the file threads.tex in CVS at haskell-report/ffi/threads.tex (not entirely finished yet) for a definition of this extension. A less formal description is also found in the documentation of Control.Concurrent. The changes mostly affect the THREADED_RTS (./configure --enable-threaded-rts), except for saving & restoring errno on a per-TSO basis, which is also necessary for the non-threaded RTS (a bugfix). Detailed list of changes ------------------------ - errno is saved in the TSO object and restored when necessary: ghc/includes/TSO.h, ghc/rts/Interpreter.c, ghc/rts/Schedule.c - rts_mainLazyIO is no longer needed, main is no special case anymore ghc/includes/RtsAPI.h, ghc/rts/RtsAPI.c, ghc/rts/Main.c, ghc/rts/Weak.c - passCapability: a new function that releases the capability and "passes" it to a specific OS thread: ghc/rts/Capability.h ghc/rts/Capability.c - waitThread(), scheduleWaitThread() and schedule() get an optional Capability *initialCapability passed as an argument: ghc/includes/SchedAPI.h, ghc/rts/Schedule.c, ghc/rts/RtsAPI.c - Bound Thread scheduling (that's what this is all about): ghc/rts/Schedule.h, ghc/rts/Schedule.c - new Primop isCurrentThreadBound#: ghc/compiler/prelude/primops.txt.pp, ghc/includes/PrimOps.h, ghc/rts/PrimOps.hc, ghc/rts/Schedule.h, ghc/rts/Schedule.c - a simple function, rtsSupportsBoundThreads, that returns true if THREADED_RTS is defined: ghc/rts/Schedule.h, ghc/rts/Schedule.c - a new implementation of forkProcess (the old implementation stays in place 
for the non-threaded case). Partially broken; works for the standard fork-and-exec case, but not for much else. A proper forkProcess is really next to impossible to implement: ghc/rts/Schedule.c - Library support for bound threads: Control.Concurrent. rtsSupportsBoundThreads, isCurrentThreadBound, forkOS, runInBoundThread, runInUnboundThread libraries/base/Control/Concurrent.hs, libraries/base/Makefile, libraries/base/include/HsBase.h, libraries/base/cbits/forkOS.c (new file) --- diff --git a/ghc/compiler/prelude/primops.txt.pp b/ghc/compiler/prelude/primops.txt.pp index f5fd8a7..af4a244 100644 --- a/ghc/compiler/prelude/primops.txt.pp +++ b/ghc/compiler/prelude/primops.txt.pp @@ -1,5 +1,5 @@ ----------------------------------------------------------------------- --- $Id: primops.txt.pp,v 1.28 2003/07/03 15:14:56 sof Exp $ +-- $Id: primops.txt.pp,v 1.29 2003/09/21 22:20:51 wolfgang Exp $ -- -- Primitive Operations -- @@ -1494,6 +1494,11 @@ primop LabelThreadOp "labelThread#" GenPrimOp with has_side_effects = True out_of_line = True + +primop IsCurrentThreadBoundOp "isCurrentThreadBound#" GenPrimOp + State# RealWorld -> (# State# RealWorld, Int# #) + with + out_of_line = True ------------------------------------------------------------------------ section "Weak pointers" diff --git a/ghc/includes/PrimOps.h b/ghc/includes/PrimOps.h index cf67e61..9576f24 100644 --- a/ghc/includes/PrimOps.h +++ b/ghc/includes/PrimOps.h @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: PrimOps.h,v 1.103 2003/07/03 15:14:57 sof Exp $ + * $Id: PrimOps.h,v 1.104 2003/09/21 22:20:52 wolfgang Exp $ * * (c) The GHC Team, 1998-2000 * @@ -281,6 +281,7 @@ EXTFUN_RTS(blockAsyncExceptionszh_fast); EXTFUN_RTS(unblockAsyncExceptionszh_fast); EXTFUN_RTS(myThreadIdzh_fast); EXTFUN_RTS(labelThreadzh_fast); +EXTFUN_RTS(isCurrentThreadBoundzh_fast); extern int cmp_thread(StgPtr tso1, StgPtr tso2); extern int rts_getThreadId(StgPtr tso); @@ -417,4 
+418,5 @@ EXTFUN_RTS(mkApUpd0zh_fast); -------------------------------------------------------------------------- */ #define ForeignObj_CLOSURE_DATA(c) (((StgForeignObj *)c)->data) + #endif /* PRIMOPS_H */ diff --git a/ghc/includes/RtsAPI.h b/ghc/includes/RtsAPI.h index 73a7354..d8e772f 100644 --- a/ghc/includes/RtsAPI.h +++ b/ghc/includes/RtsAPI.h @@ -1,5 +1,5 @@ /* ---------------------------------------------------------------------------- - * $Id: RtsAPI.h,v 1.35 2003/08/22 22:38:02 sof Exp $ + * $Id: RtsAPI.h,v 1.36 2003/09/21 22:20:52 wolfgang Exp $ * * (c) The GHC Team, 1998-1999 * @@ -113,17 +113,14 @@ rts_eval_ ( HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret ); SchedulerStatus rts_evalIO ( HaskellObj p, /*out*/HaskellObj *ret ); -#if defined(COMPILING_RTS_MAIN) -/* Used by the RTS' main() only */ -SchedulerStatus -rts_mainLazyIO ( HaskellObj p, /*out*/HaskellObj *ret ); -#endif - SchedulerStatus rts_evalStableIO ( HsStablePtr s, /*out*/HsStablePtr *ret ); SchedulerStatus -rts_evalLazyIO ( HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret ); +rts_evalLazyIO ( HaskellObj p, /*out*/HaskellObj *ret ); + +SchedulerStatus +rts_evalLazyIO_ ( HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret ); void rts_checkSchedStatus ( char* site, SchedulerStatus rc); diff --git a/ghc/includes/SchedAPI.h b/ghc/includes/SchedAPI.h index 524b1da..b0cee60 100644 --- a/ghc/includes/SchedAPI.h +++ b/ghc/includes/SchedAPI.h @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: SchedAPI.h,v 1.17 2002/12/27 12:33:21 panne Exp $ + * $Id: SchedAPI.h,v 1.18 2003/09/21 22:20:53 wolfgang Exp $ * * (c) The GHC Team 1998-2002 * @@ -16,7 +16,8 @@ #define NO_PRI 0 #endif -extern SchedulerStatus waitThread(StgTSO *main_thread, /*out*/StgClosure **ret); +extern SchedulerStatus waitThread(StgTSO *main_thread, /*out*/StgClosure **ret, + Capability *initialCapability); /* * Creating threads @@ -30,7 +31,8 @@ 
extern StgTSO *createThread(nat stack_size); extern void taskStart(void); #endif extern void scheduleThread(StgTSO *tso); -extern SchedulerStatus scheduleWaitThread(StgTSO *tso, /*out*/HaskellObj* ret); +extern SchedulerStatus scheduleWaitThread(StgTSO *tso, /*out*/HaskellObj* ret, + Capability *initialCapability); static inline void pushClosure (StgTSO *tso, StgWord c) { tso->sp--; diff --git a/ghc/includes/TSO.h b/ghc/includes/TSO.h index 7c6e1c0..56bc726 100644 --- a/ghc/includes/TSO.h +++ b/ghc/includes/TSO.h @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: TSO.h,v 1.31 2003/07/03 15:14:58 sof Exp $ + * $Id: TSO.h,v 1.32 2003/09/21 22:20:53 wolfgang Exp $ * * (c) The GHC Team, 1998-1999 * @@ -194,7 +194,8 @@ typedef struct StgTSO_ { StgTSOBlockInfo block_info; struct StgTSO_* blocked_exceptions; StgThreadID id; - + int saved_errno; + StgTSOTickyInfo ticky; StgTSOProfInfo prof; StgTSOParInfo par; diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index d96b724..a2910ad 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -149,6 +149,9 @@ void grabCapability(Capability** cap) free_capabilities = (*cap)->link; rts_n_free_capabilities--; #endif + IF_DEBUG(scheduler, + fprintf(stderr,"worker thread (%p): got capability\n", + osThreadId())); } /* @@ -196,6 +199,9 @@ void releaseCapability(Capability* cap signalCondition(&thread_ready_cond); } #endif + IF_DEBUG(scheduler, + fprintf(stderr,"worker thread (%p): released capability\n", + osThreadId())); return; } @@ -236,9 +242,9 @@ void grabReturnCapability(Mutex* pMutex, Capability** pCap) { IF_DEBUG(scheduler, - fprintf(stderr,"worker (%ld): returning, waiting for lock.\n", osThreadId())); + fprintf(stderr,"worker (%p): returning, waiting for lock.\n", osThreadId())); IF_DEBUG(scheduler, - fprintf(stderr,"worker (%ld): returning; workers waiting: %d\n", + fprintf(stderr,"worker (%p): returning; workers waiting: %d\n", osThreadId(), 
rts_n_waiting_workers)); if ( noCapabilities() ) { rts_n_waiting_workers++; @@ -265,27 +271,30 @@ grabReturnCapability(Mutex* pMutex, Capability** pCap) -------------------------------------------------------------------------- */ /* - * Function: yieldToReturningWorker(Mutex*,Capability*) + * Function: yieldToReturningWorker(Mutex*,Capability*,Condition*) * * Purpose: when, upon entry to the Scheduler, an OS worker thread * spots that one or more threads are blocked waiting for * permission to return back their result, it gives up - * its Capability. + * its Capability. + * Immediately afterwards, it tries to reaquire the Capabilty + * using waitForWorkCapability. + * * * Pre-condition: pMutex is assumed held and the thread possesses * a Capability. - * Post-condition: pMutex is held and the Capability has - * been given back. + * Post-condition: pMutex is held and the thread possesses + * a Capability. */ void -yieldToReturningWorker(Mutex* pMutex, Capability** pCap) +yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition* pThreadCond) { if ( rts_n_waiting_workers > 0 ) { IF_DEBUG(scheduler, fprintf(stderr,"worker thread (%p): giving up RTS token\n", osThreadId())); releaseCapability(*pCap); /* And wait for work */ - waitForWorkCapability(pMutex, pCap, rtsFalse); + waitForWorkCapability(pMutex, pCap, pThreadCond); IF_DEBUG(scheduler, fprintf(stderr,"worker thread (%p): got back RTS token (after yieldToReturningWorker)\n", osThreadId())); @@ -295,7 +304,7 @@ yieldToReturningWorker(Mutex* pMutex, Capability** pCap) /* - * Function: waitForWorkCapability(Mutex*, Capability**, rtsBool) + * Function: waitForWorkCapability(Mutex*, Capability**, Condition*) * * Purpose: wait for a Capability to become available. In * the process of doing so, updates the number @@ -303,22 +312,74 @@ yieldToReturningWorker(Mutex* pMutex, Capability** pCap) * work. That counter is used when deciding whether or * not to create a new worker thread when an external * call is made. 
+ * If pThreadCond is not NULL, a capability can be specifically + * passed to this thread using passCapability. * * Pre-condition: pMutex is held. * Post-condition: pMutex is held and *pCap is held by the current thread */ + +static Condition *passTarget = NULL; + void -waitForWorkCapability(Mutex* pMutex, Capability** pCap, rtsBool runnable) +waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond) { - while ( noCapabilities() || (runnable && EMPTY_RUN_QUEUE()) ) { - rts_n_waiting_tasks++; - waitCondition(&thread_ready_cond, pMutex); - rts_n_waiting_tasks--; +#ifdef SMP + #error SMP version not implemented +#endif + IF_DEBUG(scheduler, + fprintf(stderr,"worker thread (%p): wait for cap (cond: %p)\n", + osThreadId(),pThreadCond)); + while ( noCapabilities() || (pThreadCond && passTarget != pThreadCond) + || (!pThreadCond && passTarget)) { + if(pThreadCond) + { + waitCondition(pThreadCond, pMutex); + IF_DEBUG(scheduler, + fprintf(stderr,"worker thread (%p): get passed capability\n", + osThreadId())); + } + else + { + rts_n_waiting_tasks++; + waitCondition(&thread_ready_cond, pMutex); + rts_n_waiting_tasks--; + IF_DEBUG(scheduler, + fprintf(stderr,"worker thread (%p): get normal capability\n", + osThreadId())); + } } + passTarget = NULL; grabCapability(pCap); return; } +/* + * Function: passCapability(Mutex*, Capability*, Condition*) + * + * Purpose: Let go of the capability and make sure the thread associated + * with the Condition pTargetThreadCond gets it next. + * + * Pre-condition: pMutex is held and cap is held by the current thread + * Post-condition: pMutex is held; cap will be grabbed by the "target" + * thread when pMutex is released. 
+ */ + +void +passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond) +{ +#ifdef SMP + #error SMP version not implemented +#endif + rts_n_free_capabilities = 1; + signalCondition(pTargetThreadCond); + passTarget = pTargetThreadCond; + IF_DEBUG(scheduler, + fprintf(stderr,"worker thread (%p): passCapability\n", + osThreadId())); +} + + #endif /* RTS_SUPPORTS_THREADS */ #if defined(SMP) diff --git a/ghc/rts/Capability.h b/ghc/rts/Capability.h index dd6a7be..70acc15 100644 --- a/ghc/rts/Capability.h +++ b/ghc/rts/Capability.h @@ -39,9 +39,9 @@ extern nat rts_n_free_capabilities; extern nat rts_n_waiting_workers; extern void grabReturnCapability(Mutex* pMutex, Capability** pCap); -extern void yieldToReturningWorker(Mutex* pMutex, Capability** pCap); -extern void waitForWorkCapability(Mutex* pMutex, Capability** pCap, rtsBool runnable); - +extern void yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition *pThreadCond); +extern void waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition *pThreadCond); +extern void passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond); static inline rtsBool needToYieldToReturningWorker(void) { diff --git a/ghc/rts/Interpreter.c b/ghc/rts/Interpreter.c index 7f408d7..888f3b8 100644 --- a/ghc/rts/Interpreter.c +++ b/ghc/rts/Interpreter.c @@ -26,6 +26,11 @@ #include "Disassembler.h" #include "Interpreter.h" +#include /* for memcpy */ +#ifdef HAVE_ERRNO_H +#include +#endif + /* -------------------------------------------------------------------------- * The bytecode interpreter @@ -1172,6 +1177,9 @@ run_BCO: memcpy(arguments, Sp, sizeof(W_) * stk_offset); #endif + // Restore the Haskell thread's current value of errno + errno = cap->r.rCurrentTSO->saved_errno; + // There are a bunch of non-ptr words on the stack (the // ccall args, the ccall fun address and space for the // result), which we need to cover with an info table @@ -1208,6 +1216,9 @@ run_BCO: LOAD_STACK_POINTERS; 
Sp += ret_dyn_size; + // Save the Haskell thread's current value of errno + cap->r.rCurrentTSO->saved_errno = errno; + #ifdef RTS_SUPPORTS_THREADS // Threaded RTS: // Copy the "arguments", which might include a return value, diff --git a/ghc/rts/Linker.c b/ghc/rts/Linker.c index 8ec5972..6753876 100644 --- a/ghc/rts/Linker.c +++ b/ghc/rts/Linker.c @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: Linker.c,v 1.129 2003/09/11 15:12:25 wolfgang Exp $ + * $Id: Linker.c,v 1.130 2003/09/21 22:20:54 wolfgang Exp $ * * (c) The GHC Team, 2000-2003 * @@ -381,6 +381,7 @@ typedef struct _RtsSymbolVal { SymX(int2Integerzh_fast) \ SymX(integer2Intzh_fast) \ SymX(integer2Wordzh_fast) \ + SymX(isCurrentThreadBoundzh_fast) \ SymX(isDoubleDenormalized) \ SymX(isDoubleInfinite) \ SymX(isDoubleNaN) \ @@ -422,6 +423,7 @@ typedef struct _RtsSymbolVal { SymX(rts_eval) \ SymX(rts_evalIO) \ SymX(rts_evalLazyIO) \ + SymX(rts_evalStableIO) \ SymX(rts_eval_) \ SymX(rts_getBool) \ SymX(rts_getChar) \ @@ -455,6 +457,7 @@ typedef struct _RtsSymbolVal { SymX(rts_mkWord64) \ SymX(rts_mkWord8) \ SymX(rts_unlock) \ + SymX(rtsSupportsBoundThreads) \ SymX(run_queue_hd) \ SymX(setProgArgv) \ SymX(startupHaskell) \ diff --git a/ghc/rts/Main.c b/ghc/rts/Main.c index a651eaa..6029921 100644 --- a/ghc/rts/Main.c +++ b/ghc/rts/Main.c @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: Main.c,v 1.39 2003/07/10 08:02:29 simonpj Exp $ + * $Id: Main.c,v 1.40 2003/09/21 22:20:55 wolfgang Exp $ * * (c) The GHC Team 1998-2000 * @@ -105,7 +105,9 @@ int main(int argc, char *argv[]) # else /* !PAR && !GRAN */ /* ToDo: want to start with a larger stack size */ - status = rts_mainLazyIO((HaskellObj)mainIO_closure, NULL); + rts_lock(); + status = rts_evalLazyIO((HaskellObj)mainIO_closure, NULL); + rts_unlock(); # endif /* !PAR && !GRAN */ diff --git a/ghc/rts/Makefile b/ghc/rts/Makefile index 1e39b6f..6a75b87 
100644 --- a/ghc/rts/Makefile +++ b/ghc/rts/Makefile @@ -110,11 +110,17 @@ ifeq "$(way)" "mp" SRC_HC_OPTS += -I$$PVM_ROOT/include endif -# Currently, you only get 'threads support' in the normal -# way. +# You get 'threads support' in the normal +# and profiling ways. ifeq "$(GhcRtsThreaded)" "YES" ifeq "$(way)" "" SRC_CC_OPTS += -DTHREADED_RTS +SRC_HC_OPTS += -optc-DTHREADED_RTS +PACKAGE_CPP_OPTS += -DTHREADED_RTS +endif +ifeq "$(way)" "p" +SRC_CC_OPTS += -DTHREADED_RTS +SRC_HC_OPTS += -optc-DTHREADED_RTS PACKAGE_CPP_OPTS += -DTHREADED_RTS endif endif diff --git a/ghc/rts/PrimOps.hc b/ghc/rts/PrimOps.hc index 53fabf6..9b3f3a4 100644 --- a/ghc/rts/PrimOps.hc +++ b/ghc/rts/PrimOps.hc @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: PrimOps.hc,v 1.112 2003/09/12 16:32:13 sof Exp $ + * $Id: PrimOps.hc,v 1.113 2003/09/21 22:20:55 wolfgang Exp $ * * (c) The GHC Team, 1998-2002 * @@ -1095,6 +1095,15 @@ FN_(labelThreadzh_fast) FE_ } +FN_(isCurrentThreadBoundzh_fast) +{ + /* no args */ + I_ r; + FB_ + r = (I_)(RET_STGCALL1(StgBool, isThreadBound, CurrentTSO)); + RET_N(r); + FE_ +} /* ----------------------------------------------------------------------------- * MVar primitives @@ -1736,3 +1745,4 @@ FN_(asyncDoProczh_fast) FE_ } #endif + diff --git a/ghc/rts/RtsAPI.c b/ghc/rts/RtsAPI.c index 3236d1e..651a497 100644 --- a/ghc/rts/RtsAPI.c +++ b/ghc/rts/RtsAPI.c @@ -1,5 +1,5 @@ /* ---------------------------------------------------------------------------- - * $Id: RtsAPI.c,v 1.45 2003/08/28 16:33:42 simonmar Exp $ + * $Id: RtsAPI.c,v 1.46 2003/09/21 22:20:56 wolfgang Exp $ * * (c) The GHC Team, 1998-2001 * @@ -21,6 +21,8 @@ #include +static Capability *rtsApiCapability = NULL; + /* ---------------------------------------------------------------------------- Building Haskell objects from C datatypes. 
------------------------------------------------------------------------- */ @@ -385,7 +387,7 @@ rts_eval (HaskellObj p, /*out*/HaskellObj *ret) StgTSO *tso; tso = createGenThread(RtsFlags.GcFlags.initialStkSize, p); - return scheduleWaitThread(tso,ret); + return scheduleWaitThread(tso,ret,rtsApiCapability); } SchedulerStatus @@ -394,7 +396,7 @@ rts_eval_ (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret) StgTSO *tso; tso = createGenThread(stack_size, p); - return scheduleWaitThread(tso,ret); + return scheduleWaitThread(tso,ret,rtsApiCapability); } /* @@ -407,22 +409,7 @@ rts_evalIO (HaskellObj p, /*out*/HaskellObj *ret) StgTSO* tso; tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p); - return scheduleWaitThread(tso,ret); -} - -/* - * Identical to rts_evalLazyIO(), but won't create a new task/OS thread - * to evaluate the Haskell thread. Used by main() only. Hack. - */ - -SchedulerStatus -rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret) -{ - StgTSO* tso; - - tso = createIOThread(RtsFlags.GcFlags.initialStkSize, p); - scheduleThread(tso); - return waitThread(tso, ret); + return scheduleWaitThread(tso,ret,rtsApiCapability); } /* @@ -440,9 +427,9 @@ rts_evalStableIO (HsStablePtr s, /*out*/HsStablePtr *ret) p = (StgClosure *)deRefStablePtr(s); tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p); - stat = scheduleWaitThread(tso,&r); + stat = scheduleWaitThread(tso,&r,rtsApiCapability); - if (stat == Success) { + if (stat == Success && ret != NULL) { ASSERT(r != NULL); *ret = getStablePtr((StgPtr)r); } @@ -454,12 +441,21 @@ rts_evalStableIO (HsStablePtr s, /*out*/HsStablePtr *ret) * Like rts_evalIO(), but doesn't force the action's result. 
*/ SchedulerStatus -rts_evalLazyIO (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret) +rts_evalLazyIO (HaskellObj p, /*out*/HaskellObj *ret) +{ + StgTSO *tso; + + tso = createIOThread(RtsFlags.GcFlags.initialStkSize, p); + return scheduleWaitThread(tso,ret,rtsApiCapability); +} + +SchedulerStatus +rts_evalLazyIO_ (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret) { StgTSO *tso; tso = createIOThread(stack_size, p); - return scheduleWaitThread(tso,ret); + return scheduleWaitThread(tso,ret,rtsApiCapability); } /* Convenience function for decoding the returned status. */ @@ -486,18 +482,13 @@ void rts_lock() { #ifdef RTS_SUPPORTS_THREADS - Capability *cap; ACQUIRE_LOCK(&sched_mutex); // we request to get the capability immediately, in order to // a) stop other threads from using allocate() // b) wake the current worker thread from awaitEvent() // (so that a thread started by rts_eval* will start immediately) - grabReturnCapability(&sched_mutex,&cap); - - // now that we have the capability, we don't need it anymore - // (other threads will continue to run as soon as we release the sched_mutex) - releaseCapability(cap); + grabReturnCapability(&sched_mutex,&rtsApiCapability); // In the RTS hasn't been entered yet, // start a RTS task. 
@@ -511,6 +502,7 @@ void rts_unlock() { #ifdef RTS_SUPPORTS_THREADS + rtsApiCapability = NULL; RELEASE_LOCK(&sched_mutex); #endif } diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index c58584f..d29b6bb 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,5 +1,5 @@ /* --------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.173 2003/08/15 12:43:57 simonmar Exp $ + * $Id: Schedule.c,v 1.174 2003/09/21 22:20:56 wolfgang Exp $ * * (c) The GHC Team, 1998-2000 * @@ -126,6 +126,10 @@ #include #include +#ifdef HAVE_ERRNO_H +#include +#endif + //@node Variables and Data structures, Prototypes, Includes, Main scheduling code //@subsection Variables and Data structures @@ -134,15 +138,6 @@ */ StgMainThread *main_threads = NULL; -#ifdef THREADED_RTS -// Pointer to the thread that executes main -// When this thread is finished, the program terminates -// by calling shutdownHaskellAndExit. -// It would be better to add a call to shutdownHaskellAndExit -// to the Main.main wrapper and to remove this hack. -StgMainThread *main_main_thread = NULL; -#endif - /* Thread queues. * Locks required: sched_mutex. 
*/ @@ -249,7 +244,7 @@ static rtsBool shutting_down_scheduler = rtsFalse; void addToBlockedQueue ( StgTSO *tso ); -static void schedule ( void ); +static void schedule ( StgMainThread *mainThread, Capability *initialCapability ); void interruptStgRts ( void ); static void detectBlackHoles ( void ); @@ -311,7 +306,7 @@ static void taskStart(void); static void taskStart(void) { - schedule(); + schedule(NULL,NULL); } #endif @@ -363,10 +358,10 @@ startSchedulerTask(void) ------------------------------------------------------------------------ */ //@cindex schedule static void -schedule( void ) +schedule( StgMainThread *mainThread, Capability *initialCapability ) { StgTSO *t; - Capability *cap; + Capability *cap = initialCapability; StgThreadReturnCode ret; #if defined(GRAN) rtsEvent *event; @@ -386,8 +381,16 @@ schedule( void ) ACQUIRE_LOCK(&sched_mutex); #if defined(RTS_SUPPORTS_THREADS) - waitForWorkCapability(&sched_mutex, &cap, rtsFalse); - IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): entering RTS", osThreadId())); + /* in the threaded case, the capability is either passed in via the initialCapability + parameter, or initialized inside the scheduler loop */ + + IF_DEBUG(scheduler, + fprintf(stderr,"### NEW SCHEDULER LOOP in os thread %u(%p)\n", + osThreadId(), osThreadId())); + IF_DEBUG(scheduler, + fprintf(stderr,"### main thread: %p\n",mainThread)); + IF_DEBUG(scheduler, + fprintf(stderr,"### initial cap: %p\n",initialCapability)); #else /* simply initialise it in the non-threaded case */ grabCapability(&cap); @@ -431,8 +434,19 @@ schedule( void ) #if defined(RTS_SUPPORTS_THREADS) /* Check to see whether there are any worker threads waiting to deposit external call results. If so, - yield our capability */ - yieldToReturningWorker(&sched_mutex, &cap); + yield our capability... if we have a capability, that is. */ + if(cap) + yieldToReturningWorker(&sched_mutex, &cap, + mainThread ? 
&mainThread->bound_thread_cond : NULL); + + /* If we do not currently hold a capability, we wait for one */ + if(!cap) + { + waitForWorkCapability(&sched_mutex, &cap, + mainThread ? &mainThread->bound_thread_cond : NULL); + IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): got cap", + osThreadId())); + } #endif /* If we're interrupted (the user pressed ^C, or some other @@ -463,55 +477,63 @@ schedule( void ) */ #if defined(RTS_SUPPORTS_THREADS) { - StgMainThread *m, **prev; - prev = &main_threads; - for (m = main_threads; m != NULL; prev = &m->link, m = m->link) { - switch (m->tso->what_next) { - case ThreadComplete: - if (m->ret) { - // NOTE: return val is tso->sp[1] (see StgStartup.hc) - *(m->ret) = (StgClosure *)m->tso->sp[1]; - } - *prev = m->link; - m->stat = Success; - broadcastCondition(&m->wakeup); -#ifdef DEBUG - removeThreadLabel((StgWord)m->tso); -#endif - if(m == main_main_thread) - { - releaseCapability(cap); - startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit - RELEASE_LOCK(&sched_mutex); - shutdownHaskellAndExit(EXIT_SUCCESS); - } - break; - case ThreadKilled: - if (m->ret) *(m->ret) = NULL; - *prev = m->link; - if (was_interrupted) { - m->stat = Interrupted; - } else { - m->stat = Killed; - } - broadcastCondition(&m->wakeup); + StgMainThread *m, **prev; + prev = &main_threads; + for (m = main_threads; m != NULL; prev = &m->link, m = m->link) { + if (m->tso->what_next == ThreadComplete + || m->tso->what_next == ThreadKilled) + { + if(m == mainThread) + { + if(m->tso->what_next == ThreadComplete) + { + if (m->ret) + { + // NOTE: return val is tso->sp[1] (see StgStartup.hc) + *(m->ret) = (StgClosure *)m->tso->sp[1]; + } + m->stat = Success; + } + else + { + if (m->ret) + { + *(m->ret) = NULL; + } + if (was_interrupted) + { + m->stat = Interrupted; + } + else + { + m->stat = Killed; + } + } + *prev = m->link; + #ifdef DEBUG - removeThreadLabel((StgWord)m->tso); + removeThreadLabel((StgWord)m->tso); #endif - if(m == 
main_main_thread) - { releaseCapability(cap); - startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit RELEASE_LOCK(&sched_mutex); - shutdownHaskellAndExit(EXIT_SUCCESS); + return; + } + else + { + // The current OS thread can not handle the fact that the Haskell + // thread "m" has ended. + // "m" is bound; the scheduler loop in it's bound OS thread has + // to return, so let's pass our capability directly to that thread. + passCapability(&sched_mutex, cap, &m->bound_thread_cond); + cap = NULL; + } } - break; - default: - break; } - } } - + + if(!cap) // If we gave our capability away, + continue; // go to the top to get it back + #else /* not threaded */ # if defined(PAR) @@ -1067,6 +1089,63 @@ schedule( void ) IF_DEBUG(sanity,checkTSO(t)); #endif +#ifdef THREADED_RTS + { + StgMainThread *m; + for(m = main_threads; m; m = m->link) + { + if(m->tso == t) + break; + } + + if(m) + { + if(m == mainThread) + { + IF_DEBUG(scheduler, + fprintf(stderr,"### Running TSO %p in bound OS thread %u\n", + t, osThreadId())); + // yes, the Haskell thread is bound to the current native thread + } + else + { + IF_DEBUG(scheduler, + fprintf(stderr,"### TSO %p bound to other OS thread than %u\n", + t, osThreadId())); + // no, bound to a different Haskell thread: pass to that thread + PUSH_ON_RUN_QUEUE(t); + passCapability(&sched_mutex,cap,&m->bound_thread_cond); + cap = NULL; + continue; + } + } + else + { + // The thread we want to run is not bound. 
+ if(mainThread == NULL) + { + IF_DEBUG(scheduler, + fprintf(stderr,"### Running TSO %p in worker OS thread %u\n", + t, osThreadId())); + // if we are a worker thread, + // we may run it here + } + else + { + IF_DEBUG(scheduler, + fprintf(stderr,"### TSO %p is not appropriate for main thread %p in OS thread %u\n", + t, mainThread, osThreadId())); + // no, the current native thread is bound to a different + // Haskell thread, so pass it to any worker thread + PUSH_ON_RUN_QUEUE(t); + releaseCapability(cap); + cap = NULL; + continue; + } + } + } +#endif + cap->r.rCurrentTSO = t; /* context switches are now initiated by the timer signal, unless @@ -1103,7 +1182,9 @@ run_thread: ret = ThreadFinished; break; case ThreadRunGHC: + errno = t->saved_errno; ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r); + t->saved_errno = errno; break; case ThreadInterpret: ret = interpretBCO(cap); @@ -1122,7 +1203,7 @@ run_thread: ACQUIRE_LOCK(&sched_mutex); #ifdef RTS_SUPPORTS_THREADS - IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId());); + IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %p): ", osThreadId());); #elif !defined(GRAN) && !defined(PAR) IF_DEBUG(scheduler,fprintf(stderr,"scheduler: ");); #endif @@ -1492,19 +1573,54 @@ run_thread: } /* --------------------------------------------------------------------------- + * rtsSupportsBoundThreads(): is the RTS built to support bound threads? + * used by Control.Concurrent for error checking. + * ------------------------------------------------------------------------- */ + +StgBool +rtsSupportsBoundThreads(void) +{ +#ifdef THREADED_RTS + return rtsTrue; +#else + return rtsFalse; +#endif +} + +/* --------------------------------------------------------------------------- + * isThreadBound(tso): check whether tso is bound to an OS thread. 
+ * ------------------------------------------------------------------------- */ + +StgBool +isThreadBound(StgTSO* tso) +{ +#ifdef THREADED_RTS + StgMainThread *m; + for(m = main_threads; m; m = m->link) + { + if(m->tso == tso) + return rtsTrue; + } +#endif + return rtsFalse; +} + +/* --------------------------------------------------------------------------- * Singleton fork(). Do not copy any running threads. * ------------------------------------------------------------------------- */ +static void +deleteThreadImmediately(StgTSO *tso); + StgInt forkProcess(StgTSO* tso) { #ifndef mingw32_TARGET_OS pid_t pid; StgTSO* t,*next; - StgMainThread *m; - rtsBool doKill; IF_DEBUG(scheduler,sched_belch("forking!")); + ACQUIRE_LOCK(&sched_mutex); pid = fork(); if (pid) { /* parent */ @@ -1512,6 +1628,43 @@ forkProcess(StgTSO* tso) /* just return the pid */ } else { /* child */ +#ifdef THREADED_RTS + /* wipe all other threads */ + run_queue_hd = run_queue_tl = END_TSO_QUEUE; + tso->link = END_TSO_QUEUE; + + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + next = t->link; + + /* Don't kill the current thread.. */ + if (t->id == tso->id) { + continue; + } + + if (isThreadBound(t)) { + // If the thread is bound, the OS thread that the thread is bound to + // no longer exists after the fork() system call. + // The bound Haskell thread is therefore unable to run at all; + // we must not give it a chance to survive by catching the + // ThreadKilled exception. So we kill it "brutally" rather than + // using deleteThread. + deleteThreadImmediately(t); + } else { + deleteThread(t); + } + } + + if (isThreadBound(tso)) { + } else { + // If the current is not bound, then we should make it so. + // The OS thread left over by fork() is special in that the process + // will terminate as soon as the thread terminates; + // we'd expect forkProcess to behave similarily. + // FIXME - we don't do this. 
+ } +#else + StgMainThread *m; + rtsBool doKill; /* wipe all other threads */ run_queue_hd = run_queue_tl = END_TSO_QUEUE; tso->link = END_TSO_QUEUE; @@ -1555,7 +1708,9 @@ forkProcess(StgTSO* tso) deleteThread(t); } } +#endif } + RELEASE_LOCK(&sched_mutex); return pid; #else /* mingw32 */ barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id); @@ -1618,6 +1773,7 @@ suspendThread( StgRegTable *reg, { nat tok; Capability *cap; + int saved_errno = errno; /* assume that *reg is a pointer to the StgRegTable part * of a Capability. @@ -1667,6 +1823,8 @@ suspendThread( StgRegTable *reg, /* Other threads _might_ be available for execution; signal this */ THREAD_RUNNABLE(); RELEASE_LOCK(&sched_mutex); + + errno = saved_errno; return tok; } @@ -1676,6 +1834,7 @@ resumeThread( StgInt tok, { StgTSO *tso, **prev; Capability *cap; + int saved_errno = errno; #if defined(RTS_SUPPORTS_THREADS) /* Wait for permission to re-enter the RTS with the result. */ @@ -1717,6 +1876,7 @@ resumeThread( StgInt tok, #if defined(RTS_SUPPORTS_THREADS) RELEASE_LOCK(&sched_mutex); #endif + errno = saved_errno; return &cap->r; } @@ -1845,6 +2005,8 @@ createThread(nat size) tso->why_blocked = NotBlocked; tso->blocked_exceptions = NULL; + tso->saved_errno = 0; + tso->stack_size = stack_size; tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) - TSO_STRUCT_SIZEW; @@ -2016,10 +2178,8 @@ activateSpark (rtsSpark spark) } #endif -static SchedulerStatus waitThread_(/*out*/StgMainThread* m -#if defined(THREADED_RTS) - , rtsBool blockWaiting -#endif +static SchedulerStatus waitThread_(/*out*/StgMainThread* m, + Capability *initialCapability ); @@ -2061,7 +2221,7 @@ void scheduleThread(StgTSO* tso) } SchedulerStatus -scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret) +scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCapability) { // Precondition: sched_mutex must be held StgMainThread *m; @@ -2071,6 +2231,9 @@ 
scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret) m->stat = NoStatus; #if defined(RTS_SUPPORTS_THREADS) initCondition(&m->wakeup); +#if defined(THREADED_RTS) + initCondition(&m->bound_thread_cond); +#endif #endif /* Put the thread on the main-threads list prior to scheduling the TSO. @@ -2088,11 +2251,8 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret) main_threads = m; scheduleThread_(tso); -#if defined(THREADED_RTS) - return waitThread_(m, rtsTrue); -#else - return waitThread_(m); -#endif + + return waitThread_(m, initialCapability); } /* --------------------------------------------------------------------------- @@ -2259,13 +2419,13 @@ finishAllThreads ( void ) { do { while (run_queue_hd != END_TSO_QUEUE) { - waitThread ( run_queue_hd, NULL); + waitThread ( run_queue_hd, NULL, NULL ); } while (blocked_queue_hd != END_TSO_QUEUE) { - waitThread ( blocked_queue_hd, NULL); + waitThread ( blocked_queue_hd, NULL, NULL ); } while (sleeping_queue != END_TSO_QUEUE) { - waitThread ( blocked_queue_hd, NULL); + waitThread ( blocked_queue_hd, NULL, NULL ); } } while (blocked_queue_hd != END_TSO_QUEUE || @@ -2274,7 +2434,7 @@ finishAllThreads ( void ) } SchedulerStatus -waitThread(StgTSO *tso, /*out*/StgClosure **ret) +waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability) { StgMainThread *m; SchedulerStatus stat; @@ -2285,6 +2445,9 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) m->stat = NoStatus; #if defined(RTS_SUPPORTS_THREADS) initCondition(&m->wakeup); +#if defined(THREADED_RTS) + initCondition(&m->bound_thread_cond); +#endif #endif /* see scheduleWaitThread() comment */ @@ -2293,45 +2456,25 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) main_threads = m; IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id)); -#if defined(THREADED_RTS) - stat = waitThread_(m, rtsFalse); -#else - stat = waitThread_(m); -#endif + + stat = waitThread_(m,initialCapability); + RELEASE_LOCK(&sched_mutex); return stat; } static 
SchedulerStatus -waitThread_(StgMainThread* m -#if defined(THREADED_RTS) - , rtsBool blockWaiting -#endif - ) +waitThread_(StgMainThread* m, Capability *initialCapability) { SchedulerStatus stat; // Precondition: sched_mutex must be held. IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id)); -#if defined(RTS_SUPPORTS_THREADS) - -# if defined(THREADED_RTS) - if (!blockWaiting) { - /* In the threaded case, the OS thread that called main() - * gets to enter the RTS directly without going via another - * task/thread. - */ - main_main_thread = m; - RELEASE_LOCK(&sched_mutex); - schedule(); - ACQUIRE_LOCK(&sched_mutex); - main_main_thread = NULL; - ASSERT(m->stat != NoStatus); - } else -# endif - { +#if defined(RTS_SUPPORTS_THREADS) && !defined(THREADED_RTS) + { // FIXME: does this still make sense? + // It's not for the threaded rts => SMP only do { waitCondition(&m->wakeup, &sched_mutex); } while (m->stat == NoStatus); @@ -2343,10 +2486,11 @@ waitThread_(StgMainThread* m CurrentProc = MainProc; // PE to run it on RELEASE_LOCK(&sched_mutex); - schedule(); + schedule(m,initialCapability); #else RELEASE_LOCK(&sched_mutex); - schedule(); + schedule(m,initialCapability); + ACQUIRE_LOCK(&sched_mutex); ASSERT(m->stat != NoStatus); #endif @@ -2354,6 +2498,9 @@ waitThread_(StgMainThread* m #if defined(RTS_SUPPORTS_THREADS) closeCondition(&m->wakeup); +#if defined(THREADED_RTS) + closeCondition(&m->bound_thread_cond); +#endif #endif IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", @@ -3327,6 +3474,18 @@ deleteThread(StgTSO *tso) raiseAsync(tso,NULL); } +static void +deleteThreadImmediately(StgTSO *tso) +{ // for forkProcess only: + // delete thread without giving it a chance to catch the ThreadKilled exception + + if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { + return; + } + unblockThread(tso); + tso->what_next = ThreadKilled; +} + void raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
{ diff --git a/ghc/rts/Schedule.h b/ghc/rts/Schedule.h index b2a07e4..fccac3c 100644 --- a/ghc/rts/Schedule.h +++ b/ghc/rts/Schedule.h @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: Schedule.h,v 1.38 2003/03/17 14:47:48 simonmar Exp $ + * $Id: Schedule.h,v 1.39 2003/09/21 22:20:56 wolfgang Exp $ * * (c) The GHC Team 1998-1999 * @@ -149,6 +149,8 @@ extern nat rts_n_waiting_workers; extern nat rts_n_waiting_tasks; #endif +StgBool rtsSupportsBoundThreads(void); +StgBool isThreadBound(StgTSO *tso); StgInt forkProcess(StgTSO *tso); extern SchedulerStatus rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret); @@ -166,11 +168,13 @@ void resurrectThreads( StgTSO * ); * * These are the threads which clients have requested that we run. * - * In a 'threaded' build, we might have several concurrent clients all - * waiting for results, and each one will wait on a condition variable - * until the result is available. + * In a 'threaded' build, each of these corresponds to one bound thread. + * The pointer to the StgMainThread is passed as a parameter to schedule; + * this invocation of schedule will always pass this main thread's + * bound_thread_cond to waitForkWorkCapability; OS-thread-switching + * takes place using passCapability. * - * In non-SMP, clients are strictly nested: the first client calls + * In non-threaded builds, clients are strictly nested: the first client calls * into the RTS, which might call out again to C with a _ccall_GC, and * eventually re-enter the RTS. 
* @@ -188,7 +192,6 @@ typedef struct StgMainThread_ { #if defined(RTS_SUPPORTS_THREADS) Condition wakeup; #if defined(THREADED_RTS) - rtsBool thread_bound; Condition bound_thread_cond; #endif #endif diff --git a/ghc/rts/Weak.c b/ghc/rts/Weak.c index fe2ae74..7201ba1 100644 --- a/ghc/rts/Weak.c +++ b/ghc/rts/Weak.c @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: Weak.c,v 1.29 2003/03/26 17:43:05 sof Exp $ + * $Id: Weak.c,v 1.30 2003/09/21 22:20:56 wolfgang Exp $ * * (c) The GHC Team, 1998-1999 * @@ -35,28 +35,20 @@ finalizeWeakPointersNow(void) { StgWeak *w; -#if defined(RTS_SUPPORTS_THREADS) rts_lock(); -#endif while ((w = weak_ptr_list)) { weak_ptr_list = w->link; if (w->header.info != &stg_DEAD_WEAK_info) { w->header.info = &stg_DEAD_WEAK_info; IF_DEBUG(weak,fprintf(stderr,"Finalising weak pointer at %p -> %p\n", w, w->key)); if (w->finalizer != &stg_NO_FINALIZER_closure) { -#if defined(RTS_SUPPORTS_THREADS) - rts_evalIO(w->finalizer,NULL); + rts_evalLazyIO(w->finalizer,NULL); rts_unlock(); rts_lock(); -#else - rts_mainLazyIO(w->finalizer,NULL); -#endif } } } -#if defined(RTS_SUPPORTS_THREADS) rts_unlock(); -#endif } /*