Nothing -> text "(StgClosure*)deRefStablePtr(the_stableptr)"
Just hs_fn -> char '&' <> ppr hs_fn <> text "_closure"
+ cap = text "cap" <> comma
+
-- the expression we give to rts_evalIO
expr_to_run
= foldl appArg the_cfun arg_info -- NOT aug_arg_info
where
appArg acc (arg_cname, _, arg_hty, _)
= text "rts_apply"
- <> parens (acc <> comma <> mkHObj arg_hty <> parens arg_cname)
+ <> parens (cap <> acc <> comma <> mkHObj arg_hty <> parens (cap <> arg_cname))
-- various other bits for inside the fn
declareResult = text "HaskellObj ret;"
fun_proto $$
vcat
[ lbrace
- , text "SchedulerStatus rc;"
+ , text "Capability *cap;"
, declareResult
, declareCResult
- , text "rts_lock();"
+ , text "cap = rts_lock();"
-- create the application + perform it.
- , text "rc=rts_evalIO" <> parens (
+ , text "cap=rts_evalIO" <> parens (
+ cap <>
text "rts_apply" <> parens (
+ cap <>
text "(HaskellObj)"
<> text (if is_IO_res_ty
then "runIO_closure"
<> text "&ret"
) <> semi
, text "rts_checkSchedStatus" <> parens (doubleQuotes (ftext c_nm)
- <> comma <> text "rc") <> semi
+ <> comma <> text "cap") <> semi
, assignCResult
- , text "rts_unlock();"
+ , text "rts_unlock(cap);"
, if res_hty_is_unit then empty
else text "return cret;"
, rbrace
/* Allocation -------------------------------------------------------------- */
-extern bdescr *allocGroup(nat n);
-extern bdescr *allocBlock(void);
+bdescr *allocGroup(nat n);
+bdescr *allocBlock(void);
+
+// versions that take the storage manager lock for you:
+bdescr *allocGroup_lock(nat n);
+bdescr *allocBlock_lock(void);
/* De-Allocation ----------------------------------------------------------- */
-extern void freeGroup(bdescr *p);
-extern void freeChain(bdescr *p);
+void freeGroup(bdescr *p);
+void freeChain(bdescr *p);
+
+// versions that take the storage manager lock for you:
+void freeGroup_lock(bdescr *p);
+void freeChain_lock(bdescr *p);
/* Round a value to megablocks --------------------------------------------- */
#error mp_limb_t != StgWord: assumptions in PrimOps.cmm are now false
#endif
+#define MyCapability() (BaseReg - OFFSET_Capability_r)
+
/* -------------------------------------------------------------------------
Allocation and garbage collection
------------------------------------------------------------------------- */
#define BlockedOnGA 9
/* same as above but without sending a Fetch message */
#define BlockedOnGA_NoSend 10
-/* Only relevant for RTS_SUPPORTS_THREADS: */
+/* Only relevant for THREADED_RTS: */
#define BlockedOnCCall 11
#define BlockedOnCCall_NoUnblockExc 12
/* same as above but don't unblock async exceptions in resumeThread() */
SRC_CC_OPTS += -DNO_REGS -DUSE_MINIINTERPRETER
endif
+SRC_CC_OPTS += -I. -I../rts
+
#
# Header file built from the configure script's findings
#
$(CC) -o $@ $(CC_OPTS) $(LD_OPTS) mkGHCConstants.o
mkGHCConstants.o : mkDerivedConstants.c
- $(CC) -o $@ -c $< -DGEN_HASKELL
+ $(CC) -o $@ $(CC_OPTS) -c $< -DGEN_HASKELL
GHCConstants.h : mkGHCConstants
./mkGHCConstants >$@
#ifndef __OSTHREADS_H__
#define __OSTHREADS_H__
-#if defined(RTS_SUPPORTS_THREADS) /* to the end */
+#if defined(THREADED_RTS) /* to the end */
# if defined(HAVE_PTHREAD_H) && !defined(WANT_NATIVE_WIN32_THREADS)
-# include <pthread.h>
+
+#include <pthread.h>
+
typedef pthread_cond_t Condition;
typedef pthread_mutex_t Mutex;
typedef pthread_t OSThreadId;
+typedef pthread_key_t ThreadLocalKey;
+
+#define OSThreadProcAttr /* nothing */
#define INIT_MUTEX_VAR PTHREAD_MUTEX_INITIALIZER
#define INIT_COND_VAR PTHREAD_COND_INITIALIZER
#ifdef LOCK_DEBUG
+
#define ACQUIRE_LOCK(mutex) \
debugBelch("ACQUIRE_LOCK(0x%p) %s %d\n", mutex,__FILE__,__LINE__); \
pthread_mutex_lock(mutex)
#define RELEASE_LOCK(mutex) \
debugBelch("RELEASE_LOCK(0x%p) %s %d\n", mutex,__FILE__,__LINE__); \
pthread_mutex_unlock(mutex)
+#define ASSERT_LOCK_HELD(mutex) /* nothing */
+
+#elif defined(DEBUG) && defined(linux_HOST_OS)
+#include <errno.h>
+/*
+ * On Linux, we can use extensions to determine whether we already
+ * hold a lock or not, which is useful for debugging.
+ */
+#define ACQUIRE_LOCK(mutex) \
+ if (pthread_mutex_lock(mutex) == EDEADLK) { \
+ barf("multiple ACQUIRE_LOCK: %s %d", __FILE__,__LINE__); \
+ }
+#define RELEASE_LOCK(mutex) \
+ if (pthread_mutex_unlock(mutex) != 0) { \
+ barf("RELEASE_LOCK: I do not own this lock: %s %d", __FILE__,__LINE__); \
+ }
+
+#define ASSERT_LOCK_HELD(mutex) ASSERT(pthread_mutex_lock(mutex) == EDEADLK)
+
+#define ASSERT_LOCK_NOTHELD(mutex) \
+ if (pthread_mutex_lock(mutex) != EDEADLK) { \
+ pthread_mutex_unlock(mutex); \
+ } else { \
+ ASSERT(0); \
+ }
+
+
#else
+
#define ACQUIRE_LOCK(mutex) pthread_mutex_lock(mutex)
#define RELEASE_LOCK(mutex) pthread_mutex_unlock(mutex)
+#define ASSERT_LOCK_HELD(mutex) /* nothing */
+
#endif
# elif defined(HAVE_WINDOWS_H)
typedef HANDLE Condition;
typedef HANDLE Mutex;
typedef DWORD OSThreadId;
+typedef DWORD ThreadLocalKey;
+
+#define OSThreadProcAttr __stdcall
#define INIT_MUTEX_VAR 0
#define INIT_COND_VAR 0
}
}
+#define ASSERT_LOCK_HELD(mutex) /* nothing */
+
# else
# error "Threads not supported"
# endif
+//
+// General thread operations
+//
+extern OSThreadId osThreadId ( void );
+extern void shutdownThread ( void );
+extern void yieldThread ( void );
+
+typedef void OSThreadProcAttr OSThreadProc(void *);
+
+extern int createOSThread ( OSThreadId* tid,
+ OSThreadProc *startProc, void *param);
+
+//
+// Condition Variables
+//
extern void initCondition ( Condition* pCond );
extern void closeCondition ( Condition* pCond );
extern rtsBool broadcastCondition ( Condition* pCond );
extern rtsBool waitCondition ( Condition* pCond,
Mutex* pMut );
+//
+// Mutexes
+//
extern void initMutex ( Mutex* pMut );
-extern OSThreadId osThreadId ( void );
-extern void shutdownThread ( void );
-extern void yieldThread ( void );
-extern int createOSThread ( OSThreadId* tid,
- void (*startProc)(void) );
+//
+// Thread-local storage
+//
+void newThreadLocalKey (ThreadLocalKey *key);
+void *getThreadLocalVar (ThreadLocalKey *key);
+void setThreadLocalVar (ThreadLocalKey *key, void *value);
+
#else
#define ACQUIRE_LOCK(l)
#define RELEASE_LOCK(l)
+#define ASSERT_LOCK_HELD(l)
#endif /* defined(RTS_SUPPORTS_THREADS) */
MP_INT rmp_result1;
MP_INT rmp_result2;
#if defined(SMP) || defined(PAR)
- StgSparkPool rSparks; /* per-task spark pool */
+ StgSparkPool rSparks; /* per-task spark pool */
#endif
- StgWord rInHaskell; /* non-zero if we're in Haskell code */
// If this flag is set, we are running Haskell code. Used to detect
// uses of 'foreign import unsafe' that should be 'safe'.
} StgRegTable;
-
-/* A capability is a combination of a FunTable and a RegTable. In STG
- * code, BaseReg normally points to the RegTable portion of this
- * structure, so that we can index both forwards and backwards to take
- * advantage of shorter instruction forms on some archs (eg. x86).
- */
-typedef struct Capability_ {
- StgFunTable f;
- StgRegTable r;
-#if defined(SMP)
- struct Capability_ *link; /* per-task register tables are linked together */
-#endif
-} Capability;
-
-/* No such thing as a MainCapability under SMP - each thread must have
- * its own Capability.
- */
-#ifndef SMP
-#if IN_STG_CODE
-extern W_ MainCapability[];
-#else
-extern DLL_IMPORT_RTS Capability MainCapability;
-#endif
-#endif
-
#if IN_STG_CODE
/*
* concurrent Haskell, MainRegTable otherwise).
*/
+/* A capability is a combination of a FunTable and a RegTable. In STG
+ * code, BaseReg normally points to the RegTable portion of this
+ * structure, so that we can index both forwards and backwards to take
+ * advantage of shorter instruction forms on some archs (eg. x86).
+ * This is a cut-down version of the Capability structure; the full
+ * version is defined in Capability.h.
+ */
+struct PartCapability_ {
+ StgFunTable f;
+ StgRegTable r;
+};
+
+/* No such thing as a MainCapability under SMP - each thread must have
+ * its own Capability.
+ */
+#if IN_STG_CODE && !defined(SMP)
+extern W_ MainCapability[];
+#endif
+
#if defined(REG_Base) && !defined(NO_GLOBAL_REG_DECLS)
GLOBAL_REG_DECL(StgRegTable *,BaseReg,REG_Base)
#else
#ifdef SMP
#error BaseReg must be in a register for SMP
#endif
-#define BaseReg (&((Capability *)MainCapability)[0].r)
+#define BaseReg (&((struct Capability_)MainCapability).r)
#endif
#if defined(REG_Sp) && !defined(NO_GLOBAL_REG_DECLS)
/* Parallel information */
#include "Parallel.h"
+#include "OSThreads.h"
+#include "SMP.h"
/* STG/Optimised-C related stuff */
#include "Block.h"
typedef StgClosure *HaskellObj;
+/*
+ * An abstract type representing the token returned by rts_lock() and
+ * used when allocating objects and threads in the RTS.
+ */
+typedef struct Capability_ Capability;
+
/* ----------------------------------------------------------------------------
Starting up and shutting down the Haskell RTS.
------------------------------------------------------------------------- */
/* ----------------------------------------------------------------------------
Locking.
- In a multithreaded environments, you have to surround all access to the
- RtsAPI with these calls.
+ You have to surround all access to the RtsAPI with these calls.
------------------------------------------------------------------------- */
-void
-rts_lock ( void );
+// acquires a token which may be used to create new objects and
+// evaluate them.
+Capability *rts_lock (void);
-void
-rts_unlock ( void );
+// releases the token acquired with rts_lock().
+void rts_unlock (Capability *token);
/* ----------------------------------------------------------------------------
Building Haskell objects from C datatypes.
------------------------------------------------------------------------- */
-HaskellObj rts_mkChar ( HsChar c );
-HaskellObj rts_mkInt ( HsInt i );
-HaskellObj rts_mkInt8 ( HsInt8 i );
-HaskellObj rts_mkInt16 ( HsInt16 i );
-HaskellObj rts_mkInt32 ( HsInt32 i );
-HaskellObj rts_mkInt64 ( HsInt64 i );
-HaskellObj rts_mkWord ( HsWord w );
-HaskellObj rts_mkWord8 ( HsWord8 w );
-HaskellObj rts_mkWord16 ( HsWord16 w );
-HaskellObj rts_mkWord32 ( HsWord32 w );
-HaskellObj rts_mkWord64 ( HsWord64 w );
-HaskellObj rts_mkPtr ( HsPtr a );
-HaskellObj rts_mkFunPtr ( HsFunPtr a );
-HaskellObj rts_mkFloat ( HsFloat f );
-HaskellObj rts_mkDouble ( HsDouble f );
-HaskellObj rts_mkStablePtr ( HsStablePtr s );
-HaskellObj rts_mkBool ( HsBool b );
-HaskellObj rts_mkString ( char *s );
-
-HaskellObj rts_apply ( HaskellObj, HaskellObj );
+HaskellObj rts_mkChar ( Capability *, HsChar c );
+HaskellObj rts_mkInt ( Capability *, HsInt i );
+HaskellObj rts_mkInt8 ( Capability *, HsInt8 i );
+HaskellObj rts_mkInt16 ( Capability *, HsInt16 i );
+HaskellObj rts_mkInt32 ( Capability *, HsInt32 i );
+HaskellObj rts_mkInt64 ( Capability *, HsInt64 i );
+HaskellObj rts_mkWord ( Capability *, HsWord w );
+HaskellObj rts_mkWord8 ( Capability *, HsWord8 w );
+HaskellObj rts_mkWord16 ( Capability *, HsWord16 w );
+HaskellObj rts_mkWord32 ( Capability *, HsWord32 w );
+HaskellObj rts_mkWord64 ( Capability *, HsWord64 w );
+HaskellObj rts_mkPtr ( Capability *, HsPtr a );
+HaskellObj rts_mkFunPtr ( Capability *, HsFunPtr a );
+HaskellObj rts_mkFloat ( Capability *, HsFloat f );
+HaskellObj rts_mkDouble ( Capability *, HsDouble f );
+HaskellObj rts_mkStablePtr ( Capability *, HsStablePtr s );
+HaskellObj rts_mkBool ( Capability *, HsBool b );
+HaskellObj rts_mkString ( Capability *, char *s );
+
+HaskellObj rts_apply ( Capability *, HaskellObj, HaskellObj );
/* ----------------------------------------------------------------------------
Deconstructing Haskell objects
Note that these calls may cause Garbage Collection, so all HaskellObj
references are rendered invalid by these calls.
------------------------------------------------------------------------- */
-SchedulerStatus
-rts_eval ( HaskellObj p, /*out*/HaskellObj *ret );
+Capability *
+rts_eval (Capability *, HaskellObj p, /*out*/HaskellObj *ret);
-SchedulerStatus
-rts_eval_ ( HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret );
+Capability *
+rts_eval_ (Capability *, HaskellObj p, unsigned int stack_size,
+ /*out*/HaskellObj *ret);
-SchedulerStatus
-rts_evalIO ( HaskellObj p, /*out*/HaskellObj *ret );
+Capability *
+rts_evalIO (Capability *, HaskellObj p, /*out*/HaskellObj *ret);
-SchedulerStatus
-rts_evalStableIO ( HsStablePtr s, /*out*/HsStablePtr *ret );
+Capability *
+rts_evalStableIO (Capability *, HsStablePtr s, /*out*/HsStablePtr *ret);
-SchedulerStatus
-rts_evalLazyIO ( HaskellObj p, /*out*/HaskellObj *ret );
+Capability *
+rts_evalLazyIO (Capability *, HaskellObj p, /*out*/HaskellObj *ret);
-SchedulerStatus
-rts_evalLazyIO_ ( HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret );
+Capability *
+rts_evalLazyIO_ (Capability *, HaskellObj p, unsigned int stack_size,
+ /*out*/HaskellObj *ret);
void
-rts_checkSchedStatus ( char* site, SchedulerStatus rc);
+rts_checkSchedStatus (char* site, Capability *);
+
+SchedulerStatus
+rts_getSchedStatus (Capability *cap);
/* --------------------------------------------------------------------------
Wrapper closures
#define SUPPORT_LONG_LONGS 1
#endif
-#if defined(SMP) || defined(THREADED_RTS)
-#define RTS_SUPPORTS_THREADS 1
-#endif
-
/*
* Whether the runtime system will use libbfd for debugging purposes.
*/
/* TICKY_TICKY needs EAGER_BLACKHOLING to verify no double-entries of
* single-entry thunks.
*/
+//#if defined(TICKY_TICKY) || defined(SMP)
#if defined(TICKY_TICKY)
# define EAGER_BLACKHOLING
#else
extern StgInt isFloatNegativeZero(StgFloat f);
/* Suspending/resuming threads around foreign calls */
-extern StgInt suspendThread ( StgRegTable * );
-extern StgRegTable * resumeThread ( StgInt );
+extern void * suspendThread ( StgRegTable * );
+extern StgRegTable * resumeThread ( void * );
+
+/* scheduler stuff */
+extern void stg_scheduleThread (StgRegTable *reg, struct StgTSO_ *tso);
/* Creating and destroying an adjustor thunk */
extern void* createAdjustor(int cconv, StgStablePtr hptr, StgFunPtr wptr,
extern int stg_sig_install (int, int, StgStablePtr *, void *);
#endif
-extern void startSignalHandler(int sig);
+#if !defined(mingw32_HOST_OS)
+extern StgInt *signal_handlers;
+#endif
extern void setIOManagerPipe (int fd);
extern void* stgMallocBytesRWX(int len);
/* Create and enter a new transaction context */
-extern StgTRecHeader *stmStartTransaction(StgRegTable *reg, StgTRecHeader *outer);
-extern StgTRecHeader *stmStartNestedTransaction(StgRegTable *reg, StgTRecHeader *outer
+extern StgTRecHeader *stmStartTransaction(Capability *cap, StgTRecHeader *outer);
+extern StgTRecHeader *stmStartNestedTransaction(Capability *cap, StgTRecHeader *outer
);
/*
* been committed to.
*/
-extern StgBool stmCommitTransaction(StgRegTable *reg, StgTRecHeader *trec);
-extern StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec);
+extern StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec);
+extern StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec);
/*
* Test whether the current transaction context is valid and, if so,
* if the thread is already waiting.
*/
-extern StgBool stmWait(StgRegTable *reg,
+extern StgBool stmWait(Capability *cap,
StgTSO *tso,
StgTRecHeader *trec);
--------------------------
*/
-extern StgTVar *stmNewTVar(StgRegTable *reg,
+extern StgTVar *stmNewTVar(Capability *cap,
StgClosure *new_value);
/*----------------------------------------------------------------------
* thread's current transaction.
*/
-extern StgClosure *stmReadTVar(StgRegTable *reg,
+extern StgClosure *stmReadTVar(Capability *cap,
StgTRecHeader *trec,
StgTVar *tvar);
* thread's current transaction.
*/
-extern void stmWriteTVar(StgRegTable *reg,
+extern void stmWriteTVar(Capability *cap,
StgTRecHeader *trec,
StgTVar *tvar,
StgClosure *new_value);
#define NO_PRI 0
#endif
-extern SchedulerStatus waitThread(StgTSO *main_thread, /*out*/StgClosure **ret,
- Capability *initialCapability);
-
/*
* Creating threads
*/
#if defined(GRAN)
-extern StgTSO *createThread(nat stack_size, StgInt pri);
-#else
-extern StgTSO *createThread(nat stack_size);
-#endif
-extern void scheduleThread(StgTSO *tso);
-extern SchedulerStatus scheduleWaitThread(StgTSO *tso, /*out*/HaskellObj* ret,
- Capability *initialCapability);
-
-INLINE_HEADER void pushClosure (StgTSO *tso, StgWord c) {
- tso->sp--;
- tso->sp[0] = (W_) c;
-}
-
-INLINE_HEADER StgTSO *
-createGenThread(nat stack_size, StgClosure *closure) {
- StgTSO *t;
-#if defined(GRAN)
- t = createThread(stack_size, NO_PRI);
-#else
- t = createThread(stack_size);
-#endif
- pushClosure(t, (W_)closure);
- pushClosure(t, (W_)&stg_enter_info);
- return t;
-}
-
-INLINE_HEADER StgTSO *
-createIOThread(nat stack_size, StgClosure *closure) {
- StgTSO *t;
-#if defined(GRAN)
- t = createThread(stack_size, NO_PRI);
-#else
- t = createThread(stack_size);
-#endif
- pushClosure(t, (W_)&stg_noforceIO_info);
- pushClosure(t, (W_)&stg_ap_v_info);
- pushClosure(t, (W_)closure);
- pushClosure(t, (W_)&stg_enter_info);
- return t;
-}
-
-/*
- * Same as above, but also evaluate the result of the IO action
- * to whnf while we're at it.
- */
-
-INLINE_HEADER StgTSO *
-createStrictIOThread(nat stack_size, StgClosure *closure) {
- StgTSO *t;
-#if defined(GRAN)
- t = createThread(stack_size, NO_PRI);
+StgTSO *createThread (Capability *cap, nat stack_size, StgInt pri);
#else
- t = createThread(stack_size);
+StgTSO *createThread (Capability *cap, nat stack_size);
#endif
- pushClosure(t, (W_)&stg_forceIO_info);
- pushClosure(t, (W_)&stg_ap_v_info);
- pushClosure(t, (W_)closure);
- pushClosure(t, (W_)&stg_enter_info);
- return t;
-}
-
-/*
- * Killing threads
- */
-extern void deleteThread(StgTSO *tso);
-extern void deleteAllThreads ( void );
-extern int howManyThreadsAvail ( void );
-/*
- * Run until there are no more threads.
- */
-extern void finishAllThreads ( void );
+Capability *scheduleWaitThread (StgTSO *tso, /*out*/HaskellObj* ret,
+ Capability *cap);
+StgTSO *createGenThread (Capability *cap, nat stack_size,
+ StgClosure *closure);
+StgTSO *createIOThread (Capability *cap, nat stack_size,
+ StgClosure *closure);
+StgTSO *createStrictIOThread (Capability *cap, nat stack_size,
+ StgClosure *closure);
#endif
-------------------------------------------------------------------------- */
extern StgPtr allocate ( nat n );
-extern StgPtr allocateLocal ( StgRegTable *reg, nat n );
+extern StgPtr allocateLocal ( Capability *cap, nat n );
extern StgPtr allocatePinned ( nat n );
extern lnat allocated_bytes ( void );
#if defined(SMP)
#define ACQUIRE_SM_LOCK ACQUIRE_LOCK(&sm_mutex);
#define RELEASE_SM_LOCK RELEASE_LOCK(&sm_mutex);
+#define ASSERT_SM_LOCK() ASSERT_LOCK_HELD(&sm_mutex);
#else
#define ACQUIRE_SM_LOCK
#define RELEASE_SM_LOCK
+#define ASSERT_SM_LOCK()
#endif
INLINE_HEADER void
struct StgTSO_* blocked_exceptions;
StgThreadID id;
int saved_errno;
- struct StgMainThread_* main;
+ struct Task_* bound; // non-NULL for a bound thread
struct StgTRecHeader_ *trec; /* STM transaction record */
#ifdef TICKY_TICKY
\
/* ASSERT( p1 != p2 && !closure_IND(p1) ); \
*/ LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC(p1); \
- bd = Bdescr(p1); \
+/* foreign "C" cas(p1 "ptr", 0, stg_WHITEHOLE_info); \
+ */ bd = Bdescr(p1); \
if (bdescr_gen_no(bd) == 0 :: CInt) { \
StgInd_indirectee(p1) = p2; \
SET_INFO(p1, ind_info); \
{ \
bdescr *bd; \
\
+ /* cas(p1, 0, &stg_WHITEHOLE_info); */ \
ASSERT( (P_)p1 != (P_)p2 && !closure_IND(p1) ); \
LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC(p1); \
bd = Bdescr((P_)p1); \
#include "Rts.h"
#include "RtsFlags.h"
#include "Storage.h"
+#include "OSThreads.h"
+#include "Capability.h"
#include <stdio.h>
--- /dev/null
+/* -----------------------------------------------------------------------------
+ *
+ * (c) The GHC Team 1998-2005
+ *
+ * The awaitEvent() interface, for the non-threaded RTS
+ *
+ * -------------------------------------------------------------------------*/
+
+#ifndef AWAITEVENT_H
+#define AWAITEVENT_H
+
+#if !defined(THREADED_RTS)
+/* awaitEvent(rtsBool wait)
+ *
+ * Checks for blocked threads that need to be woken.
+ *
+ * Called from STG : NO
+ * Locks assumed : sched_mutex
+ */
+void awaitEvent(rtsBool wait); /* In posix/Select.c or
+ * win32/AwaitEvent.c */
+#endif
+
+#endif /* AWAITEVENT_H */
#include "RtsUtils.h"
#include "BlockAlloc.h"
#include "MBlock.h"
+#include "Storage.h"
#include <string.h>
static bdescr *allocMegaGroup(nat mblocks);
static void freeMegaGroup(bdescr *bd);
+// In SMP mode, the free list is protected by sm_mutex. In the
+// threaded RTS, it is protected by the Capability.
static bdescr *free_list = NULL;
/* -----------------------------------------------------------------------------
void *mblock;
bdescr *bd, **last;
+ ASSERT_SM_LOCK();
ASSERT(n != 0);
if (n > BLOCKS_PER_MBLOCK) {
}
bdescr *
+allocGroup_lock(nat n)
+{
+ bdescr *bd;
+ ACQUIRE_SM_LOCK;
+ bd = allocGroup(n);
+ RELEASE_SM_LOCK;
+ return bd;
+}
+
+bdescr *
allocBlock(void)
{
return allocGroup(1);
}
+bdescr *
+allocBlock_lock(void)
+{
+ bdescr *bd;
+ ACQUIRE_SM_LOCK;
+ bd = allocBlock();
+ RELEASE_SM_LOCK;
+ return bd;
+}
+
/* -----------------------------------------------------------------------------
Any request larger than BLOCKS_PER_MBLOCK needs a megablock group.
First, search the free list for enough contiguous megablocks to
{
bdescr *bd, *last;
+ ASSERT_SM_LOCK();
+
/* are we dealing with a megablock group? */
if (p->blocks > BLOCKS_PER_MBLOCK) {
freeMegaGroup(p);
IF_DEBUG(sanity, checkFreeListSanity());
}
+void
+freeGroup_lock(bdescr *p)
+{
+ ACQUIRE_SM_LOCK;
+ freeGroup(p);
+ RELEASE_SM_LOCK;
+}
+
static void
freeMegaGroup(bdescr *p)
{
}
}
+void
+freeChain_lock(bdescr *bd)
+{
+ ACQUIRE_SM_LOCK;
+ freeChain(bd);
+ RELEASE_SM_LOCK;
+}
+
static void
initMBlock(void *mblock)
{
for (bd = free_list; bd != NULL; bd = bd->link) {
IF_DEBUG(block_alloc,
- debugBelch("group at 0x%x, length %d blocks\n",
- (nat)bd->start, bd->blocks));
+ debugBelch("group at 0x%p, length %d blocks\n",
+ bd->start, bd->blocks));
ASSERT(bd->blocks > 0);
checkWellFormedGroup(bd);
if (bd->link != NULL) {
/* ---------------------------------------------------------------------------
- * (c) The GHC Team, 2003
+ *
+ * (c) The GHC Team, 2003-2005
*
* Capabilities
*
* and all the state an OS thread/task needs to run Haskell code:
* its STG registers, a pointer to its TSO, a nursery etc. During
* STG execution, a pointer to the capabilitity is kept in a
- * register (BaseReg).
+ * register (BaseReg; actually it is a pointer to cap->r).
*
* Only in an SMP build will there be multiple capabilities, for
* the threaded RTS and other non-threaded builds, there is only
#include "RtsFlags.h"
#include "OSThreads.h"
#include "Capability.h"
-#include "Schedule.h" /* to get at EMPTY_RUN_QUEUE() */
-#if defined(SMP)
-#include "Hash.h"
-#endif
+#include "Schedule.h"
#if !defined(SMP)
-Capability MainCapability; /* for non-SMP, we have one global capability */
+Capability MainCapability; // for non-SMP, we have one global capability
#endif
+nat n_capabilities;
Capability *capabilities = NULL;
-nat rts_n_free_capabilities;
-
-#if defined(RTS_SUPPORTS_THREADS)
-
-/* returning_worker_cond: when a worker thread returns from executing an
- * external call, it needs to wait for an RTS Capability before passing
- * on the result of the call to the Haskell thread that made it.
- *
- * returning_worker_cond is signalled in Capability.releaseCapability().
- *
- */
-Condition returning_worker_cond = INIT_COND_VAR;
-
-/*
- * To avoid starvation of threads blocked on worker_thread_cond,
- * the task(s) that enter the Scheduler will check to see whether
- * there are one or more worker threads blocked waiting on
- * returning_worker_cond.
- */
-nat rts_n_waiting_workers = 0;
-
-/* thread_ready_cond: when signalled, a thread has become runnable for a
- * task to execute.
- *
- * In the non-SMP case, it also implies that the thread that is woken up has
- * exclusive access to the RTS and all its data structures (that are not
- * locked by the Scheduler's mutex).
- *
- * thread_ready_cond is signalled whenever
- * !noCapabilities && !EMPTY_RUN_QUEUE().
- */
-Condition thread_ready_cond = INIT_COND_VAR;
-
-/*
- * To be able to make an informed decision about whether or not
- * to create a new task when making an external call, keep track of
- * the number of tasks currently blocked waiting on thread_ready_cond.
- * (if > 0 => no need for a new task, just unblock an existing one).
- *
- * waitForWorkCapability() takes care of keeping it up-to-date;
- * Task.startTask() uses its current value.
- */
-nat rts_n_waiting_tasks = 0;
-#endif
-#if defined(SMP)
-/*
- * Free capability list.
- */
-Capability *free_capabilities;
-
-/*
- * Maps OSThreadId to Capability *
- */
-HashTable *capability_hash;
-#endif
+// Holds the Capability which last became free. This is used so that
+// an in-call has a chance of quickly finding a free Capability.
+// Maintaining a global free list of Capabilities would require global
+// locking, so we don't do that.
+Capability *last_free_capability;
#ifdef SMP
#define UNUSED_IF_NOT_SMP
#define UNUSED_IF_NOT_SMP STG_UNUSED
#endif
+#ifdef RTS_USER_SIGNALS
+#define UNUSED_IF_NOT_THREADS
+#else
+#define UNUSED_IF_NOT_THREADS STG_UNUSED
+#endif
-#if defined(RTS_SUPPORTS_THREADS)
-INLINE_HEADER rtsBool
-ANY_WORK_FOR_ME( Condition *cond )
-{
- // If the run queue is not empty, then we only wake up the guy who
- // can run the thread at the head, even if there is some other
- // reason for this task to run (eg. interrupted=rtsTrue).
- if (!EMPTY_RUN_QUEUE()) {
- if (run_queue_hd->main == NULL) {
- return (cond == NULL);
- } else {
- return (&run_queue_hd->main->bound_thread_cond == cond);
- }
- }
+STATIC_INLINE rtsBool
+globalWorkToDo (void)
+{
return blackholes_need_checking
|| interrupted
#if defined(RTS_USER_SIGNALS)
#endif
;
}
-#endif
-INLINE_HEADER rtsBool
-ANY_WORK_TO_DO(void)
+#if defined(THREADED_RTS)
+STATIC_INLINE rtsBool
+anyWorkForMe( Capability *cap, Task *task )
{
- return (!EMPTY_RUN_QUEUE()
- || interrupted
- || blackholes_need_checking
-#if defined(RTS_USER_SIGNALS)
- || signals_pending()
+ // If the run queue is not empty, then we only wake up the guy who
+ // can run the thread at the head, even if there is some other
+ // reason for this task to run (eg. interrupted=rtsTrue).
+ if (!emptyRunQueue(cap)) {
+ if (cap->run_queue_hd->bound == NULL) {
+ return (task->tso == NULL);
+ } else {
+ return (cap->run_queue_hd->bound == task);
+ }
+ }
+ return globalWorkToDo();
+}
#endif
- );
+
+/* -----------------------------------------------------------------------------
+ * Manage the returning_tasks lists.
+ *
+ * These functions require cap->lock
+ * -------------------------------------------------------------------------- */
+
+#if defined(THREADED_RTS)
+STATIC_INLINE void
+newReturningTask (Capability *cap, Task *task)
+{
+ ASSERT_LOCK_HELD(&cap->lock);
+ ASSERT(task->return_link == NULL);
+ if (cap->returning_tasks_hd) {
+ ASSERT(cap->returning_tasks_tl->return_link == NULL);
+ cap->returning_tasks_tl->return_link = task;
+ } else {
+ cap->returning_tasks_hd = task;
+ }
+ cap->returning_tasks_tl = task;
}
+STATIC_INLINE Task *
+popReturningTask (Capability *cap)
+{
+ ASSERT_LOCK_HELD(&cap->lock);
+ Task *task;
+ task = cap->returning_tasks_hd;
+ ASSERT(task);
+ cap->returning_tasks_hd = task->return_link;
+ if (!cap->returning_tasks_hd) {
+ cap->returning_tasks_tl = NULL;
+ }
+ task->return_link = NULL;
+ return task;
+}
+#endif
+
/* ----------------------------------------------------------------------------
- Initialisation
- ------------------------------------------------------------------------- */
+ * Initialisation
+ *
+ * The Capability is initially marked not free.
+ * ------------------------------------------------------------------------- */
static void
-initCapability( Capability *cap )
+initCapability( Capability *cap, nat i )
{
- cap->r.rInHaskell = rtsFalse;
+ cap->no = i;
+ cap->in_haskell = rtsFalse;
+
+ cap->run_queue_hd = END_TSO_QUEUE;
+ cap->run_queue_tl = END_TSO_QUEUE;
+
+#if defined(THREADED_RTS)
+ initMutex(&cap->lock);
+ cap->running_task = NULL; // indicates cap is free
+ cap->spare_workers = NULL;
+ cap->suspended_ccalling_tasks = NULL;
+ cap->returning_tasks_hd = NULL;
+ cap->returning_tasks_tl = NULL;
+ cap->next = NULL;
+ cap->prev = NULL;
+#endif
+
cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
cap->f.stgGCFun = (F_)__stg_gc_fun;
}
*
* Purpose: set up the Capability handling. For the SMP build,
* we keep a table of them, the size of which is
- * controlled by the user via the RTS flag RtsFlags.ParFlags.nNodes
+ * controlled by the user via the RTS flag -N.
*
* ------------------------------------------------------------------------- */
void
#if defined(SMP)
nat i,n;
- n = RtsFlags.ParFlags.nNodes;
+ n_capabilities = n = RtsFlags.ParFlags.nNodes;
capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities");
for (i = 0; i < n; i++) {
- initCapability(&capabilities[i]);
- capabilities[i].link = &capabilities[i+1];
+ initCapability(&capabilities[i], i);
}
- capabilities[n-1].link = NULL;
- free_capabilities = &capabilities[0];
- rts_n_free_capabilities = n;
-
- capability_hash = allocHashTable();
-
IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n));
#else
+ n_capabilities = 1;
capabilities = &MainCapability;
- initCapability(&MainCapability);
- rts_n_free_capabilities = 1;
+ initCapability(&MainCapability, 0);
#endif
-#if defined(RTS_SUPPORTS_THREADS)
- initCondition(&returning_worker_cond);
- initCondition(&thread_ready_cond);
-#endif
-}
-
-/* ----------------------------------------------------------------------------
- grabCapability( Capability** )
-
- (only externally visible when !RTS_SUPPORTS_THREADS. In the
- threaded RTS, clients must use waitFor*Capability()).
- ------------------------------------------------------------------------- */
-
-#if defined(RTS_SUPPORTS_THREADS)
-static
-#endif
-void
-grabCapability( Capability** cap )
-{
-#if defined(SMP)
- ASSERT(rts_n_free_capabilities > 0);
- *cap = free_capabilities;
- free_capabilities = (*cap)->link;
- rts_n_free_capabilities--;
- insertHashTable(capability_hash, osThreadId(), *cap);
-#else
-# if defined(RTS_SUPPORTS_THREADS)
- ASSERT(rts_n_free_capabilities == 1);
- rts_n_free_capabilities = 0;
-# endif
- *cap = &MainCapability;
-#endif
-#if defined(RTS_SUPPORTS_THREADS)
- IF_DEBUG(scheduler, sched_belch("worker: got capability"));
-#endif
+ // There are no free capabilities to begin with. We will start
+ // a worker Task to each Capability, which will quickly put the
+ // Capability on the free list when it finds nothing to do.
+ last_free_capability = &capabilities[0];
}
/* ----------------------------------------------------------------------------
- * Function: myCapability(void)
+ * Give a Capability to a Task. The task must currently be sleeping
+ * on its condition variable.
+ *
+ * Requires cap->lock (modifies cap->running_task).
+ *
+ * When migrating a Task, the migrater must take task->lock before
+ * modifying task->cap, to synchronise with the waking up Task.
+ * Additionally, the migrater should own the Capability (when
+ * migrating the run queue), or cap->lock (when migrating
+ * returning_workers).
*
- * Purpose: Return the capability owned by the current thread.
- * Should not be used if the current thread does not
- * hold a Capability.
* ------------------------------------------------------------------------- */
-Capability *
-myCapability (void)
+
+#if defined(THREADED_RTS)
+STATIC_INLINE void
+giveCapabilityToTask (Capability *cap, Task *task)
{
-#if defined(SMP)
- return lookupHashTable(capability_hash, osThreadId());
-#else
- return &MainCapability;
-#endif
+ ASSERT_LOCK_HELD(&cap->lock);
+ ASSERT(task->cap == cap);
+ // We are not modifying task->cap, so we do not need to take task->lock.
+ IF_DEBUG(scheduler,
+ sched_belch("passing capability %d to %s %p",
+ cap->no, task->tso ? "bound task" : "worker",
+ (void *)task->id));
+ ACQUIRE_LOCK(&task->lock);
+ task->wakeup = rtsTrue;
+    // the wakeup flag is needed because signalCondition() doesn't
+    // flag the condition if the thread is already running, but we want
+    // it to be sticky.
+ signalCondition(&task->cond);
+ RELEASE_LOCK(&task->lock);
}
+#endif
/* ----------------------------------------------------------------------------
* Function: releaseCapability(Capability*)
* to wake up, in that order.
* ------------------------------------------------------------------------- */
+#if defined(THREADED_RTS)
void
-releaseCapability( Capability* cap UNUSED_IF_NOT_SMP )
+releaseCapability_ (Capability* cap)
{
- // Precondition: sched_mutex is held.
-#if defined(RTS_SUPPORTS_THREADS)
-#if !defined(SMP)
- ASSERT(rts_n_free_capabilities == 0);
-#endif
-#if defined(SMP)
- cap->link = free_capabilities;
- free_capabilities = cap;
- ASSERT(myCapability() == cap);
- removeHashTable(capability_hash, osThreadId(), NULL);
-#endif
+ Task *task;
+
+ ASSERT(cap->running_task != NULL && myTask() == cap->running_task);
+
+ task = cap->running_task;
+ cap->running_task = NULL;
+
+ ASSERT(task->id == osThreadId());
+
// Check to see whether a worker thread can be given
// the go-ahead to return the result of an external call..
- if (rts_n_waiting_workers > 0) {
- // Decrement the counter here to avoid livelock where the
- // thread that is yielding its capability will repeatedly
- // signal returning_worker_cond.
- rts_n_waiting_workers--;
- signalCondition(&returning_worker_cond);
- IF_DEBUG(scheduler,
- sched_belch("worker: released capability to returning worker"));
- } else {
- rts_n_free_capabilities++;
- IF_DEBUG(scheduler, sched_belch("worker: released capability"));
- threadRunnable();
+ if (cap->returning_tasks_hd != NULL) {
+ giveCapabilityToTask(cap,cap->returning_tasks_hd);
+ // The Task pops itself from the queue (see waitForReturnCapability())
+ return;
+ }
+
+ // If the next thread on the run queue is a bound thread,
+ // give this Capability to the appropriate Task.
+ if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
+ // Make sure we're not about to try to wake ourselves up
+ ASSERT(task != cap->run_queue_hd->bound);
+ task = cap->run_queue_hd->bound;
+ giveCapabilityToTask(cap,task);
+ return;
+ }
+
+ // If we have an unbound thread on the run queue, or if there's
+ // anything else to do, give the Capability to a worker thread.
+ if (!emptyRunQueue(cap) || globalWorkToDo()) {
+ if (cap->spare_workers) {
+ giveCapabilityToTask(cap,cap->spare_workers);
+ // The worker Task pops itself from the queue;
+ return;
+ }
+
+ // Create a worker thread if we don't have one. If the system
+ // is interrupted, we only create a worker task if there
+ // are threads that need to be completed. If the system is
+ // shutting down, we never create a new worker.
+ if (!shutting_down_scheduler) {
+ IF_DEBUG(scheduler,
+ sched_belch("starting new worker on capability %d", cap->no));
+ startWorkerTask(cap, workerStart);
+ return;
+ }
}
-#endif
- return;
+
+ last_free_capability = cap;
+ IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
}
-#if defined(RTS_SUPPORTS_THREADS)
-/*
- * When a native thread has completed the execution of an external
- * call, it needs to communicate the result back. This is done
- * as follows:
- *
- * - in resumeThread(), the thread calls waitForReturnCapability().
- * - If no capabilities are readily available, waitForReturnCapability()
- * increments a counter rts_n_waiting_workers, and blocks
- * waiting for the condition returning_worker_cond to become
- * signalled.
- * - upon entry to the Scheduler, a worker thread checks the
- * value of rts_n_waiting_workers. If > 0, the worker thread
- * will yield its capability to let a returning worker thread
- * proceed with returning its result -- this is done via
- * yieldToReturningWorker().
- * - the worker thread that yielded its capability then tries
- * to re-grab a capability and re-enter the Scheduler.
- */
+void
+releaseCapability (Capability* cap UNUSED_IF_NOT_THREADS)
+{
+ ACQUIRE_LOCK(&cap->lock);
+ releaseCapability_(cap);
+ RELEASE_LOCK(&cap->lock);
+}
+
+static void
+releaseCapabilityAndQueueWorker (Capability* cap UNUSED_IF_NOT_THREADS)
+{
+ Task *task;
+
+ ACQUIRE_LOCK(&cap->lock);
+
+ task = cap->running_task;
+
+ // If the current task is a worker, save it on the spare_workers
+ // list of this Capability. A worker can mark itself as stopped,
+ // in which case it is not replaced on the spare_worker queue.
+ // This happens when the system is shutting down (see
+ // Schedule.c:workerStart()).
+ // Also, be careful to check that this task hasn't just exited
+ // Haskell to do a foreign call (task->suspended_tso).
+ if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
+ task->next = cap->spare_workers;
+ cap->spare_workers = task;
+ }
+ // Bound tasks just float around attached to their TSOs.
+
+ releaseCapability_(cap);
+
+ RELEASE_LOCK(&cap->lock);
+}
+#endif
/* ----------------------------------------------------------------------------
- * waitForReturnCapability( Mutext *pMutex, Capability** )
+ * waitForReturnCapability( Task *task )
*
* Purpose: when an OS thread returns from an external call,
- * it calls grabReturnCapability() (via Schedule.resumeThread())
- * to wait for permissions to enter the RTS & communicate the
+ * it calls waitForReturnCapability() (via Schedule.resumeThread())
+ * to wait for permission to enter the RTS & communicate the
* result of the external call back to the Haskell thread that
* made it.
*
* ------------------------------------------------------------------------- */
-
void
-waitForReturnCapability( Mutex* pMutex, Capability** pCap )
+waitForReturnCapability (Capability **pCap,
+ Task *task UNUSED_IF_NOT_THREADS)
{
- // Pre-condition: pMutex is held.
+#if !defined(THREADED_RTS)
- IF_DEBUG(scheduler,
- sched_belch("worker: returning; workers waiting: %d",
- rts_n_waiting_workers));
+ MainCapability.running_task = task;
+ task->cap = &MainCapability;
+ *pCap = &MainCapability;
- if ( noCapabilities() ) {
- rts_n_waiting_workers++;
- context_switch = 1; // make sure it's our turn soon
- waitCondition(&returning_worker_cond, pMutex);
-#if defined(SMP)
- *pCap = free_capabilities;
- free_capabilities = (*pCap)->link;
- ASSERT(pCap != NULL);
- insertHashTable(capability_hash, osThreadId(), *pCap);
#else
- *pCap = &MainCapability;
- ASSERT(rts_n_free_capabilities == 0);
-#endif
+ Capability *cap = *pCap;
+
+ if (cap == NULL) {
+ // Try last_free_capability first
+ cap = last_free_capability;
+ if (!cap->running_task) {
+ nat i;
+ // otherwise, search for a free capability
+ for (i = 0; i < n_capabilities; i++) {
+ cap = &capabilities[i];
+ if (!cap->running_task) {
+ break;
+ }
+ }
+ // Can't find a free one, use last_free_capability.
+ cap = last_free_capability;
+ }
+
+	// record the Capability as the one this Task is now associated with.
+ task->cap = cap;
+
} else {
- grabCapability(pCap);
+ ASSERT(task->cap == cap);
}
- // Post-condition: pMutex is held, pCap points to a capability
- // which is now held by the current thread.
- return;
-}
+ ACQUIRE_LOCK(&cap->lock);
+ IF_DEBUG(scheduler,
+ sched_belch("returning; I want capability %d", cap->no));
+ if (!cap->running_task) {
+ // It's free; just grab it
+ cap->running_task = task;
+ RELEASE_LOCK(&cap->lock);
+ } else {
+ newReturningTask(cap,task);
+ RELEASE_LOCK(&cap->lock);
+
+ for (;;) {
+ ACQUIRE_LOCK(&task->lock);
+ // task->lock held, cap->lock not held
+ if (!task->wakeup) waitCondition(&task->cond, &task->lock);
+ cap = task->cap;
+ task->wakeup = rtsFalse;
+ RELEASE_LOCK(&task->lock);
+
+ // now check whether we should wake up...
+ ACQUIRE_LOCK(&cap->lock);
+ if (cap->running_task == NULL) {
+ if (cap->returning_tasks_hd != task) {
+ giveCapabilityToTask(cap,cap->returning_tasks_hd);
+ RELEASE_LOCK(&cap->lock);
+ continue;
+ }
+ cap->running_task = task;
+ popReturningTask(cap);
+ RELEASE_LOCK(&cap->lock);
+ break;
+ }
+ RELEASE_LOCK(&cap->lock);
+ }
+
+ }
+
+ ASSERT(cap->running_task == task);
+
+ IF_DEBUG(scheduler,
+ sched_belch("returning; got capability %d", cap->no));
+
+ *pCap = cap;
+#endif
+}
+
+#if defined(THREADED_RTS)
/* ----------------------------------------------------------------------------
- * yieldCapability( Mutex* pMutex, Capability** pCap )
+ * yieldCapability
* ------------------------------------------------------------------------- */
void
-yieldCapability( Capability** pCap, Condition *cond )
+yieldCapability (Capability** pCap, Task *task)
{
- // Pre-condition: pMutex is assumed held, the current thread
- // holds the capability pointed to by pCap.
-
- if ( rts_n_waiting_workers > 0 || !ANY_WORK_FOR_ME(cond)) {
- IF_DEBUG(scheduler,
- if (rts_n_waiting_workers > 0) {
- sched_belch("worker: giving up capability (returning wkr)");
- } else if (!EMPTY_RUN_QUEUE()) {
- sched_belch("worker: giving up capability (passing capability)");
- } else {
- sched_belch("worker: giving up capability (no threads to run)");
- }
- );
- releaseCapability(*pCap);
- *pCap = NULL;
+ Capability *cap = *pCap;
+
+ // The fast path; no locking
+ if ( cap->returning_tasks_hd == NULL && anyWorkForMe(cap,task) )
+ return;
+
+ while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
+ IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));
+
+ // We must now release the capability and wait to be woken up
+ // again.
+ releaseCapabilityAndQueueWorker(cap);
+
+ for (;;) {
+ ACQUIRE_LOCK(&task->lock);
+ // task->lock held, cap->lock not held
+ if (!task->wakeup) waitCondition(&task->cond, &task->lock);
+ cap = task->cap;
+ task->wakeup = rtsFalse;
+ RELEASE_LOCK(&task->lock);
+
+ IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
+ ACQUIRE_LOCK(&cap->lock);
+ if (cap->running_task != NULL) {
+ RELEASE_LOCK(&cap->lock);
+ continue;
+ }
+
+ if (task->tso == NULL) {
+ ASSERT(cap->spare_workers != NULL);
+ // if we're not at the front of the queue, release it
+ // again. This is unlikely to happen.
+ if (cap->spare_workers != task) {
+ giveCapabilityToTask(cap,cap->spare_workers);
+ RELEASE_LOCK(&cap->lock);
+ continue;
+ }
+ cap->spare_workers = task->next;
+ task->next = NULL;
+ }
+ cap->running_task = task;
+ RELEASE_LOCK(&cap->lock);
+ break;
+ }
+
+ IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
+ ASSERT(cap->running_task == task);
}
- // Post-condition: either:
- //
- // 1. *pCap is NULL, in which case the current thread does not
- // hold a capability now, or
- // 2. *pCap is not NULL, in which case the current thread still
- // holds the capability.
- //
+ *pCap = cap;
return;
}
-
/* ----------------------------------------------------------------------------
- * waitForCapability( Mutex*, Capability**, Condition* )
+ * prodCapabilities
*
- * Purpose: wait for a Capability to become available. In
- * the process of doing so, updates the number
- * of tasks currently blocked waiting for a capability/more
- * work. That counter is used when deciding whether or
- * not to create a new worker thread when an external
- * call is made.
- * If pThreadCond is not NULL, a capability can be specifically
- * passed to this thread.
+ * Used to indicate that the interrupted flag is now set, or some
+ * other global condition that might require waking up a Task on each
+ * Capability.
* ------------------------------------------------------------------------- */
-
-void
-waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond )
-{
- // Pre-condition: pMutex is held.
-
- while ( noCapabilities() || !ANY_WORK_FOR_ME(pThreadCond)) {
- IF_DEBUG(scheduler,
- sched_belch("worker: wait for capability (cond: %p)",
- pThreadCond));
- if (pThreadCond != NULL) {
- waitCondition(pThreadCond, pMutex);
- IF_DEBUG(scheduler, sched_belch("worker: get passed capability"));
- } else {
- rts_n_waiting_tasks++;
- waitCondition(&thread_ready_cond, pMutex);
- rts_n_waiting_tasks--;
- IF_DEBUG(scheduler, sched_belch("worker: get normal capability"));
+static void
+prodCapabilities(rtsBool all)
+{
+ nat i;
+ Capability *cap;
+ Task *task;
+
+ for (i=0; i < n_capabilities; i++) {
+ cap = &capabilities[i];
+ ACQUIRE_LOCK(&cap->lock);
+ if (!cap->running_task) {
+ if (cap->spare_workers) {
+ task = cap->spare_workers;
+ ASSERT(!task->stopped);
+ giveCapabilityToTask(cap,task);
+ if (!all) {
+ RELEASE_LOCK(&cap->lock);
+ return;
+ }
+ }
}
+ RELEASE_LOCK(&cap->lock);
}
- grabCapability(pCap);
-
- // Post-condition: pMutex is held and *pCap is held by the current thread
- return;
}
-#endif /* RTS_SUPPORTS_THREADS */
+void
+prodAllCapabilities (void)
+{
+ prodCapabilities(rtsTrue);
+}
/* ----------------------------------------------------------------------------
- threadRunnable()
+ * prodOneCapability
+ *
+ * Like prodAllCapabilities, but we only require a single Task to wake
+ * up in order to service some global event, such as checking for
+ * deadlock after some idle time has passed.
+ * ------------------------------------------------------------------------- */
- Signals that a thread has been placed on the run queue, so a worker
- might need to be woken up to run it.
+void
+prodOneCapability (void)
+{
+ prodCapabilities(rtsFalse);
+}
+
+/* ----------------------------------------------------------------------------
+ * shutdownCapability
+ *
+ * At shutdown time, we want to let everything exit as cleanly as
+ * possible. For each capability, we let its run queue drain, and
+ * allow the workers to stop.
+ *
+ * This function should be called when interrupted and
+ * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
+ * will exit the scheduler and call taskStop(), and any bound thread
+ * that wakes up will return to its caller. Runnable threads are
+ * killed.
+ *
+ * ------------------------------------------------------------------------- */
- ToDo: should check whether the thread at the front of the queue is
- bound, and if so wake up the appropriate worker.
- -------------------------------------------------------------------------- */
void
-threadRunnable ( void )
+shutdownCapability (Capability *cap, Task *task)
{
-#if defined(RTS_SUPPORTS_THREADS)
- if ( !noCapabilities() && ANY_WORK_TO_DO() ) {
- if (!EMPTY_RUN_QUEUE() && run_queue_hd->main != NULL) {
- signalCondition(&run_queue_hd->main->bound_thread_cond);
- return;
+ nat i;
+
+ ASSERT(interrupted && shutting_down_scheduler);
+
+ task->cap = cap;
+
+ for (i = 0; i < 50; i++) {
+ IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
+ ACQUIRE_LOCK(&cap->lock);
+ if (cap->running_task) {
+ RELEASE_LOCK(&cap->lock);
+ IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
+ yieldThread();
+ continue;
}
- if (rts_n_waiting_tasks > 0) {
- signalCondition(&thread_ready_cond);
- } else {
- startSchedulerTaskIfNecessary();
+ cap->running_task = task;
+ if (!emptyRunQueue(cap) || cap->spare_workers) {
+ IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
+ releaseCapability_(cap); // this will wake up a worker
+ RELEASE_LOCK(&cap->lock);
+ yieldThread();
+ continue;
}
+ IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
+ RELEASE_LOCK(&cap->lock);
+ break;
}
-#endif
+    // We now have the Capability; its run queue and spare workers
+    // list are both empty.
}
+#endif /* THREADED_RTS */
-/* ----------------------------------------------------------------------------
- prodWorker()
- Wake up... time to die.
- -------------------------------------------------------------------------- */
-void
-prodWorker ( void )
-{
-#if defined(RTS_SUPPORTS_THREADS)
- signalCondition(&thread_ready_cond);
-#endif
-}
*
* --------------------------------------------------------------------------*/
-#ifndef __CAPABILITY_H__
-#define __CAPABILITY_H__
+#ifndef CAPABILITY_H
+#define CAPABILITY_H
#include "RtsFlags.h"
+#include "Task.h"
+
+struct Capability_ {
+ // State required by the STG virtual machine when running Haskell
+ // code. During STG execution, the BaseReg register always points
+ // to the StgRegTable of the current Capability (&cap->r).
+ StgFunTable f;
+ StgRegTable r;
+
+ nat no; // capability number.
+
+ // The Task currently holding this Capability. This task has
+ // exclusive access to the contents of this Capability (apart from
+ // returning_tasks_hd/returning_tasks_tl).
+ // Locks required: cap->lock.
+ Task *running_task;
+
+ // true if this Capability is running Haskell code, used for
+ // catching unsafe call-ins.
+ rtsBool in_haskell;
+
+ // The run queue. The Task owning this Capability has exclusive
+ // access to its run queue, so can wake up threads without
+ // taking a lock, and the common path through the scheduler is
+ // also lock-free.
+ StgTSO *run_queue_hd;
+ StgTSO *run_queue_tl;
+
+ // Tasks currently making safe foreign calls. Doubly-linked.
+ // When returning, a task first acquires the Capability before
+ // removing itself from this list, so that the GC can find all
+ // the suspended TSOs easily. Hence, when migrating a Task from
+ // the returning_tasks list, we must also migrate its entry from
+ // this list.
+ Task *suspended_ccalling_tasks;
+
+#if defined(THREADED_RTS)
+ struct Capability_ *next;
+ struct Capability_ *prev;
+
+ // Worker Tasks waiting in the wings. Singly-linked.
+ Task *spare_workers;
+
+ // This lock protects running_task and returning_tasks_{hd,tl}.
+ Mutex lock;
+
+ // Tasks waiting to return from a foreign call, or waiting to make
+ // a new call-in using this Capability (NULL if empty).
+ // NB. this field needs to be modified by tasks other than the
+ // running_task, so it requires cap->lock to modify. A task can
+ // check whether it is NULL without taking the lock, however.
+ Task *returning_tasks_hd; // Singly-linked, with head/tail
+ Task *returning_tasks_tl;
+#endif
+}; // typedef Capability, defined in RtsAPI.h
-// All the capabilities
-extern Capability *capabilities;
+// Converts a *StgRegTable into a *Capability.
+//
+INLINE_HEADER Capability *
+regTableToCapability (StgRegTable *reg)
+{
+ return (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
+}
-// Initialised the available capabilities.
+// Initialise the available capabilities.
//
-extern void initCapabilities( void );
+void initCapabilities (void);
+
+// Release a capability. This is called by a Task that is exiting
+// Haskell to make a foreign call, or in various other cases when we
+// want to relinquish a Capability that we currently hold.
+//
+// ASSUMES: cap->running_task is the current Task.
+//
+#if defined(THREADED_RTS)
+void releaseCapability (Capability* cap);
+void releaseCapability_ (Capability* cap); // assumes cap->lock is held
+#else
+// releaseCapability() is empty in non-threaded RTS
+INLINE_HEADER void releaseCapability (Capability* cap STG_UNUSED) {};
+INLINE_HEADER void releaseCapability_ (Capability* cap STG_UNUSED) {};
+#endif
-// Releases a capability
+#if !IN_STG_CODE && !defined(SMP)
+// for non-SMP, we have one global capability
+extern Capability MainCapability;
+#endif
+
+// Array of all the capabilities
//
-extern void releaseCapability( Capability* cap );
+extern nat n_capabilities;
+extern Capability *capabilities;
-// Signal that a thread has become runnable
+// The Capability that was last free. Used as a good guess for where
+// to assign new threads.
//
-extern void threadRunnable ( void );
+extern Capability *last_free_capability;
-// Return the capability that I own.
-//
-extern Capability *myCapability (void);
+// Acquires a capability at a return point. If *cap is non-NULL, then
+// this is taken as a preference for the Capability we wish to
+// acquire.
+//
+// OS threads waiting in this function get priority over those waiting
+// in waitForCapability().
+//
+// On return, *cap is non-NULL, and points to the Capability acquired.
+//
+void waitForReturnCapability (Capability **cap/*in/out*/, Task *task);
-extern void prodWorker ( void );
+#if defined(THREADED_RTS)
-#ifdef RTS_SUPPORTS_THREADS
// Gives up the current capability IFF there is a higher-priority
// thread waiting for it. This happens in one of two ways:
//
// (b) there is an OS thread waiting to return from a foreign call
//
// On return: *pCap is NULL if the capability was released. The
-// current worker thread should then re-acquire it using
-// waitForCapability().
+// current task should then re-acquire it using waitForCapability().
//
-extern void yieldCapability( Capability** pCap, Condition *cond );
+void yieldCapability (Capability** pCap, Task *task);
// Acquires a capability for doing some work.
//
-// If the current OS thread is bound to a particular Haskell thread,
-// then pThreadCond points to a condition variable for waking up this
-// OS thread when its Haskell thread is ready to run.
-//
// On return: pCap points to the capability.
-extern void waitForCapability( Mutex* pMutex, Capability** pCap,
- Condition *pThreadCond );
-
-// Acquires a capability at a return point.
//
-// OS threads waiting in this function get priority over those waiting
-// in waitForWorkCapability().
-//
-// On return: pCap points to the capability.
-extern void waitForReturnCapability(Mutex* pMutex, Capability** pCap);
+void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);
-// Signals that the next time a capability becomes free, it should
-// be transfered to a particular OS thread, identified by the
-// condition variable pTargetThreadCond.
+// Wakes up a worker thread on just one Capability, used when we
+// need to service some global event.
//
-extern void passCapability(Condition *pTargetThreadCond);
+void prodOneCapability (void);
-// Signals that the next time a capability becomes free, it should
-// be transfered to an ordinary worker thread.
+// Similar to prodOneCapability(), but prods all of them.
//
-extern void passCapabilityToWorker( void );
-
-extern nat rts_n_free_capabilities;
-
-extern Capability *free_capabilities;
-
-/* number of worker threads waiting for a return capability
- */
-extern nat rts_n_waiting_workers;
+void prodAllCapabilities (void);
-static inline rtsBool needToYieldToReturningWorker(void)
-{
- return rts_n_waiting_workers > 0;
-}
-
-static inline nat getFreeCapabilities (void)
-{
- return rts_n_free_capabilities;
-}
-
-static inline rtsBool noCapabilities (void)
-{
- return (rts_n_free_capabilities == 0);
-}
-
-static inline rtsBool allFreeCapabilities (void)
-{
-#if defined(SMP)
- return (rts_n_free_capabilities == RTS_DEREF(RtsFlags).ParFlags.nNodes);
-#else
- return (rts_n_free_capabilities == 1);
-#endif
-}
+// Waits for a capability to drain of runnable threads and workers,
+// and then acquires it. Used at shutdown time.
+//
+void shutdownCapability (Capability *cap, Task *task);
-#else // !RTS_SUPPORTS_THREADS
+#else // !THREADED_RTS
// Grab a capability. (Only in the non-threaded RTS; in the threaded
// RTS one of the waitFor*Capability() functions must be used).
//
-extern void grabCapability( Capability **pCap );
+extern void grabCapability (Capability **pCap);
-#endif /* !RTS_SUPPORTS_THREADS */
+#endif /* !THREADED_RTS */
-#endif /* __CAPABILITY_H__ */
+#endif /* CAPABILITY_H */
/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team, 1998-2000
+ * (c) The GHC Team, 1998-2005
*
* Prototypes for functions in Disassembler.c
*
* ---------------------------------------------------------------------------*/
+#ifndef DISASSEMBLER_H
+#define DISASSEMBLER_H
+
#ifdef DEBUG
extern int disInstr ( StgBCO *bco, int pc );
extern void disassemble( StgBCO *bco );
#endif
+
+#endif /* DISASSEMBLER_H */
// Not true: see comments above
// ASSERT(StgTSO_blocked_exceptions(CurrentTSO) != NULL);
#if defined(GRAN) || defined(PAR)
- foreign "C" awakenBlockedQueue(StgTSO_blocked_exceptions(CurrentTSO) "ptr",
+ foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr",
NULL "ptr");
#else
- foreign "C" awakenBlockedQueue(StgTSO_blocked_exceptions(CurrentTSO) "ptr");
+ foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr");
#endif
StgTSO_blocked_exceptions(CurrentTSO) = NULL;
#ifdef REG_R1
if (StgTSO_blocked_exceptions(CurrentTSO) != NULL) {
#if defined(GRAN) || defined(PAR)
- foreign "C" awakenBlockedQueue(StgTSO_blocked_exceptions(CurrentTSO) "ptr",
+ foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr",
StgTSO_block_info(CurrentTSO) "ptr");
#else
- foreign "C" awakenBlockedQueue(StgTSO_blocked_exceptions(CurrentTSO) "ptr");
+ foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr");
#endif
StgTSO_blocked_exceptions(CurrentTSO) = NULL;
*/
if (R1 == CurrentTSO) {
SAVE_THREAD_STATE();
- foreign "C" raiseAsyncWithLock(R1 "ptr", R2 "ptr");
+ foreign "C" raiseAsync(MyCapability() "ptr", R1 "ptr", R2 "ptr");
if (StgTSO_what_next(CurrentTSO) == ThreadKilled::I16) {
R1 = ThreadFinished;
jump StgReturn;
jump %ENTRY_CODE(Sp(0));
}
} else {
- foreign "C" raiseAsyncWithLock(R1 "ptr", R2 "ptr");
+ foreign "C" raiseAsync(MyCapability() "ptr", R1 "ptr", R2 "ptr");
}
jump %ENTRY_CODE(Sp(0));
retry_pop_stack:
StgTSO_sp(CurrentTSO) = Sp;
- frame_type = foreign "C" raiseExceptionHelper(CurrentTSO "ptr", R1 "ptr");
+ frame_type = foreign "C" raiseExceptionHelper(BaseReg "ptr", CurrentTSO "ptr", R1 "ptr");
Sp = StgTSO_sp(CurrentTSO);
if (frame_type == ATOMICALLY_FRAME) {
/* The exception has reached the edge of a memory transaction. Check that
/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team, 1998-2000
+ * (c) The GHC Team, 1998-2005
*
* Exception support
*
* ---------------------------------------------------------------------------*/
+#ifndef EXCEPTION_H
+#define EXCEPTION_H
+
extern const StgRetInfoTable stg_blockAsyncExceptionszh_ret_info;
extern const StgRetInfoTable stg_unblockAsyncExceptionszh_ret_info;
* indefinitely). Interruptible threads can be sent an exception with
* killThread# even if they have async exceptions blocked.
*/
-INLINE_HEADER int
+STATIC_INLINE int
interruptible(StgTSO *t)
{
switch (t->why_blocked) {
}
}
+#endif /* EXCEPTION_H */
+
/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team 2000
+ * (c) The GHC Team 2000-2005
*
* RTS GTK Front Panel
*
* ---------------------------------------------------------------------------*/
+#ifndef FRONTPANEL_H
+#define FRONTPANEL_H
+
#ifdef RTS_GTK_FRONTPANEL
#include "Rts.h" /* needed because this file gets included by
#endif /* RTS_GTK_FRONTPANEL */
+#endif /* FRONTPANEL_H */
+
#include "Prelude.h"
#include "ParTicky.h" // ToDo: move into Rts.h
#include "GCCompact.h"
-#include "Signals.h"
+#include "RtsSignals.h"
#include "STM.h"
#if defined(GRAN) || defined(PAR)
# include "GranSimRts.h"
- free from-space in each step, and set from-space = to-space.
- Locks held: sched_mutex
+ Locks held: all capabilities are held throughout GarbageCollect().
-------------------------------------------------------------------------- */
lnat oldgen_saved_blocks = 0;
nat g, s;
+ ACQUIRE_SM_LOCK;
+
#ifdef PROFILING
CostCentreStack *prev_CCS;
#endif
}
}
- /* Update the pointers from the "main thread" list - these are
+ /* Update the pointers from the task list - these are
* treated as weak pointers because we want to allow a main thread
* to get a BlockedOnDeadMVar exception in the same way as any other
* thread. Note that the threads should all have been retained by
* updating pointers here.
*/
{
- StgMainThread *m;
+ Task *task;
StgTSO *tso;
- for (m = main_threads; m != NULL; m = m->link) {
- tso = (StgTSO *) isAlive((StgClosure *)m->tso);
- if (tso == NULL) {
- barf("main thread has been GC'd");
+ for (task = all_tasks; task != NULL; task = task->all_link) {
+ if (!task->stopped && task->tso) {
+ tso = (StgTSO *) isAlive((StgClosure *)task->tso);
+ if (tso == NULL) {
+ barf("task %p: main thread %d has been GC'd",
+#ifdef THREADED_RTS
+ (void *)task->id,
+#else
+ (void *)task,
+#endif
+ task->tso->id);
+ }
+ task->tso = tso;
}
- m->tso = tso;
}
}
// Reset the nursery
resetNurseries();
- RELEASE_LOCK(&sched_mutex);
-
// start any pending finalizers
- scheduleFinalizers(old_weak_ptr_list);
+ scheduleFinalizers(last_free_capability, old_weak_ptr_list);
// send exceptions to any threads which were about to die
resurrectThreads(resurrected_threads);
-
- ACQUIRE_LOCK(&sched_mutex);
// Update the stable pointer hash table.
updateStablePtrTable(major_gc);
unblockUserSignals();
#endif
+ RELEASE_SM_LOCK;
+
//PAR_TICKY_TP();
}
if (bh->header.info != &stg_CAF_BLACKHOLE_info) {
#if (!defined(LAZY_BLACKHOLING)) && defined(DEBUG)
- debugBelch("Unexpected lazy BHing required at 0x%04x\n",(int)bh);
+ debugBelch("Unexpected lazy BHing required at 0x%04lx\n",(long)bh);
#endif
#ifdef PROFILING
// @LDV profiling
if (bh->header.info != &stg_BLACKHOLE_info &&
bh->header.info != &stg_CAF_BLACKHOLE_info) {
#if (!defined(LAZY_BLACKHOLING)) && defined(DEBUG)
- debugBelch("Unexpected lazy BHing required at 0x%04x",(int)bh);
+ debugBelch("Unexpected lazy BHing required at 0x%04lx",(long)bh);
#endif
#ifdef DEBUG
// zero out the slop so that the sanity checker can tell
// any threads resurrected during this GC
thread((StgPtr)&resurrected_threads);
- // the main threads list
+ // the task list
{
- StgMainThread *m;
- for (m = main_threads; m != NULL; m = m->link) {
- thread((StgPtr)&m->tso);
+ Task *task;
+ for (task = all_tasks; task != NULL; task = task->all_link) {
+ if (task->tso) {
+ thread((StgPtr)&task->tso);
+ }
}
}
/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team 1998-1999
+ * (c) The GHC Team 1998-2005
*
* Compacting garbage collector
*
* ---------------------------------------------------------------------------*/
-INLINE_HEADER void
+#ifndef GCCOMPACT_H
+#define GCCOMPACT_H
+
+STATIC_INLINE void
mark(StgPtr p, bdescr *bd)
{
nat offset_within_block = p - bd->start; // in words
*bitmap_word |= bit_mask;
}
-INLINE_HEADER void
+STATIC_INLINE void
unmark(StgPtr p, bdescr *bd)
{
nat offset_within_block = p - bd->start; // in words
*bitmap_word &= ~bit_mask;
}
-INLINE_HEADER StgWord
+STATIC_INLINE StgWord
is_marked(StgPtr p, bdescr *bd)
{
nat offset_within_block = p - bd->start; // in words
}
void compact( void (*get_roots)(evac_fn) );
+
+#endif /* GCCOMPACT_H */
*
* -------------------------------------------------------------------------- */
+#ifndef HASH_H
+#define HASH_H
+
typedef struct hashtable HashTable; /* abstract */
/* Hash table access where the keys are StgWords */
/* Freeing hash tables
*/
void freeHashTable ( HashTable *table, void (*freeDataFun)(void *) );
+
+#endif /* HASH_H */
+
}
case bci_CCALL: {
- StgInt tok;
+ void *tok;
int stk_offset = BCO_NEXT;
int o_itbl = BCO_NEXT;
void(*marshall_fn)(void*) = (void (*)(void*))BCO_LIT(o_itbl);
RET_DYN_BITMAP_SIZE + RET_DYN_NONPTR_REGS_SIZE
+ sizeofW(StgRetDyn);
-#ifdef RTS_SUPPORTS_THREADS
+#ifdef THREADED_RTS
// Threaded RTS:
// Arguments on the TSO stack are not good, because garbage
// collection might move the TSO as soon as we call
SAVE_STACK_POINTERS;
tok = suspendThread(&cap->r);
-#ifndef RTS_SUPPORTS_THREADS
+#ifndef THREADED_RTS
// Careful:
// suspendThread might have shifted the stack
// around (stack squeezing), so we have to grab the real
// Save the Haskell thread's current value of errno
cap->r.rCurrentTSO->saved_errno = errno;
-#ifdef RTS_SUPPORTS_THREADS
+#ifdef THREADED_RTS
// Threaded RTS:
// Copy the "arguments", which might include a return value,
// back to the TSO stack. It would of course be enough to
/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team 1998-2001
+ * (c) The GHC Team 1998-2005
*
* Interval timer for profiling and pre-emptive scheduling.
*
* ---------------------------------------------------------------------------*/
-#ifndef __ITIMER_H__
-#define __ITIMER_H__
+
+#ifndef ITIMER_H
+#define ITIMER_H
extern int startTicker( nat ms, TickProc handle_tick);
extern int stopTicker ( void );
extern void block_vtalrm_signal ( void );
extern void unblock_vtalrm_signal ( void );
#endif
-#endif /* __ITIMER_H__ */
+
+#endif /* ITIMER_H */
#if !defined(mingw32_HOST_OS)
#define RTS_USER_SIGNALS_SYMBOLS \
- SymX(startSignalHandler) \
SymX(setIOManagerPipe)
#else
#define RTS_USER_SIGNALS_SYMBOLS /* nothing */
SymX(rts_mkWord8) \
SymX(rts_unlock) \
SymX(rtsSupportsBoundThreads) \
- SymX(run_queue_hd) \
SymX(__hscore_get_saved_termios) \
SymX(__hscore_set_saved_termios) \
SymX(setProgArgv) \
// debugBelch("ghci_enquire: can't find %s\n", sym);
}
else if (addr-DELTA <= a && a <= addr+DELTA) {
- debugBelch("%p + %3d == `%s'\n", addr, a - addr, sym);
+ debugBelch("%p + %3d == `%s'\n", addr, (int)(a - addr), sym);
}
}
}
}
IF_DEBUG(linker,debugBelch(
- "\nSection header table: start %d, n_entries %d, ent_size %d\n",
+ "\nSection header table: start %ld, n_entries %d, ent_size %d\n",
ehdr->e_shoff, ehdr->e_shnum, ehdr->e_shentsize ));
ASSERT (ehdr->e_shentsize == sizeof(Elf_Shdr));
nsymtabs++;
stab = (Elf_Sym*) (ehdrC + shdr[i].sh_offset);
nent = shdr[i].sh_size / sizeof(Elf_Sym);
- IF_DEBUG(linker,debugBelch( " number of entries is apparently %d (%d rem)\n",
+ IF_DEBUG(linker,debugBelch( " number of entries is apparently %d (%ld rem)\n",
nent,
shdr[i].sh_size % sizeof(Elf_Sym)
));
case R_386_PC32: *pP = value - P; break;
# endif
default:
- errorBelch("%s: unhandled ELF relocation(Rel) type %d\n",
+ errorBelch("%s: unhandled ELF relocation(Rel) type %ld\n",
oc->fileName, ELF_R_TYPE(info));
return 0;
}
Elf_Sym* stab, char* strtab )
{
int j;
- char *symbol;
+ char *symbol = NULL;
Elf_Addr targ;
Elf_Rela* rtab = (Elf_Rela*) (ehdrC + shdr[shnum].sh_offset);
int nent = shdr[shnum].sh_size / sizeof(Elf_Rela);
#endif
default:
- errorBelch("%s: unhandled ELF relocation(RelA) type %d\n",
+ errorBelch("%s: unhandled ELF relocation(RelA) type %ld\n",
oc->fileName, ELF_R_TYPE(info));
return 0;
}
*
* ---------------------------------------------------------------------------*/
+#ifndef LINKERINTERNALS_H
+#define LINKERINTERNALS_H
+
typedef enum { OBJECT_LOADED, OBJECT_RESOLVED } OStatus;
/* Indication of section kinds for loaded objects. Needed by
} ObjectCode;
extern ObjectCode *objects;
+
+#endif /* LINKERINTERNALS_H */
/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team, 1998-1999
+ * (c) The GHC Team, 1998-2005
*
* MegaBlock Allocator interface.
*
* ---------------------------------------------------------------------------*/
-#ifndef __MBLOCK_H__
-#define __MBLOCK_H__
+#ifndef MBLOCK_H
+#define MBLOCK_H
extern lnat RTS_VAR(mblocks_allocated);
# error HEAP_ALLOCED not defined
#endif
-#endif /* __MBLOCK_H__ */
+#endif /* MBLOCK_H */
startupHaskell(argc,argv,__stginit_ZCMain);
- /* Register this thread as a task, so we can get timing stats about it */
-#if defined(RTS_SUPPORTS_THREADS)
- threadIsTask(osThreadId());
-#endif
-
/* kick off the computation by creating the main thread with a pointer
to mainIO_closure representing the computation of the overall program;
then enter the scheduler with this thread and off we go;
# else /* !PAR && !GRAN */
/* ToDo: want to start with a larger stack size */
- rts_lock();
- status = rts_evalLazyIO((HaskellObj)mainIO_closure, NULL);
- rts_unlock();
+ {
+ void *cap = rts_lock();
+ cap = rts_evalLazyIO(cap,(HaskellObj)mainIO_closure, NULL);
+ status = rts_getSchedStatus(cap);
+ rts_unlock(cap);
+ }
# endif /* !PAR && !GRAN */
ifeq "$(HOSTPLATFORM)" "i386-unknown-mingw32"
ALL_DIRS += win32
-EXCLUDED_SRCS += Itimer.c Select.c Signals.c
+else
+ALL_DIRS += posix
endif
ifneq "$(DLLized)" "YES"
# COMPILING_RTS is only used when building Win32 DLL support.
STANDARD_OPTS += -DCOMPILING_RTS
-ifeq "$(HOSTPLATFORM)" "i386-unknown-mingw32"
-STANDARD_OPTS += -Iwin32
-endif
-
# HC_OPTS is included in both .c and .cmm compilations, whereas CC_OPTS is
# only included in .c compilations. HC_OPTS included the WAY_* opts, which
# must be included in both types of compilations.
/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team, 1998-2004
+ * (c) The GHC Team, 1998-2005
*
* Include this file into sources which should not need any non-Posix services.
* That includes most RTS C sources.
/* Let's be ISO C9X too... */
-#endif
+#endif /* POSIXSOURCE_H */
/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team, 1998-2004
+ * (c) The GHC Team, 1998-2005
*
* Prelude identifiers that we sometimes need to refer to in the RTS.
*
n = R1;
payload_words = ROUNDUP_BYTES_TO_WDS(n);
words = BYTES_TO_WDS(SIZEOF_StgArrWords) + payload_words;
- "ptr" p = foreign "C" allocateLocal(BaseReg "ptr",words) [];
+ "ptr" p = foreign "C" allocateLocal(MyCapability() "ptr",words) [];
TICK_ALLOC_PRIM(SIZEOF_StgArrWords,WDS(payload_words),0);
SET_HDR(p, stg_ARR_WORDS_info, W_[CCCS]);
StgArrWords_words(p) = payload_words;
MAYBE_GC(R2_PTR,newArrayzh_fast);
words = BYTES_TO_WDS(SIZEOF_StgMutArrPtrs) + n;
- "ptr" arr = foreign "C" allocateLocal(BaseReg "ptr",words) [];
+ "ptr" arr = foreign "C" allocateLocal(MyCapability() "ptr",words) [];
TICK_ALLOC_PRIM(SIZEOF_StgMutArrPtrs, WDS(n), 0);
SET_HDR(arr, stg_MUT_ARR_PTRS_info, W_[CCCS]);
MAYBE_GC(R1_PTR, forkzh_fast);
- foreign "C" ACQUIRE_LOCK(sched_mutex "ptr");
-
// create it right now, return ThreadID in R1
- "ptr" R1 = foreign "C" createIOThread( RtsFlags_GcFlags_initialStkSize(RtsFlags),
- R1 "ptr");
- foreign "C" scheduleThreadLocked(R1 "ptr");
-
- foreign "C" RELEASE_LOCK(sched_mutex "ptr");
+ "ptr" R1 = foreign "C" createIOThread( MyCapability() "ptr",
+ RtsFlags_GcFlags_initialStkSize(RtsFlags),
+ R1 "ptr");
+ foreign "C" scheduleThread(MyCapability() "ptr", R1 "ptr");
// switch at the earliest opportunity
CInt[context_switch] = 1 :: CInt;
frame = Sp;
trec = StgTSO_trec(CurrentTSO);
"ptr" outer = foreign "C" stmGetEnclosingTRec(trec "ptr") [];
- r = foreign "C" stmCommitNestedTransaction(BaseReg "ptr", trec "ptr") [];
+ r = foreign "C" stmCommitNestedTransaction(MyCapability() "ptr", trec "ptr") [];
if (r) {
/* Succeeded (either first branch or second branch) */
StgTSO_trec(CurrentTSO) = outer;
} else {
/* Did not commit: retry */
W_ new_trec;
- "ptr" new_trec = foreign "C" stmStartTransaction(BaseReg "ptr", outer "ptr") [];
+ "ptr" new_trec = foreign "C" stmStartTransaction(MyCapability() "ptr", outer "ptr") [];
StgTSO_trec(CurrentTSO) = new_trec;
if (StgCatchRetryFrame_running_alt_code(frame)) {
R1 = StgCatchRetryFrame_alt_code(frame);
jump stg_block_noregs;
} else {
/* Previous attempt is no longer valid: try again */
- "ptr" trec = foreign "C" stmStartTransaction(BaseReg "ptr", NO_TREC "ptr");
+ "ptr" trec = foreign "C" stmStartTransaction(MyCapability() "ptr", NO_TREC "ptr");
StgTSO_trec(CurrentTSO) = trec;
StgAtomicallyFrame_waiting(frame) = 0 :: CInt; /* false; */
R1 = StgAtomicallyFrame_code(frame);
}
} else {
/* The TSO is not currently waiting: try to commit the transaction */
- valid = foreign "C" stmCommitTransaction(BaseReg "ptr", trec "ptr");
+ valid = foreign "C" stmCommitTransaction(MyCapability() "ptr", trec "ptr");
if (valid) {
/* Transaction was valid: commit succeeded */
StgTSO_trec(CurrentTSO) = NO_TREC;
jump %ENTRY_CODE(Sp(SP_OFF));
} else {
/* Transaction was not valid: try again */
- "ptr" trec = foreign "C" stmStartTransaction(BaseReg "ptr", NO_TREC "ptr");
+ "ptr" trec = foreign "C" stmStartTransaction(MyCapability() "ptr", NO_TREC "ptr");
StgTSO_trec(CurrentTSO) = trec;
R1 = StgAtomicallyFrame_code(frame);
Sp_adj(-1);
/* Start the memory transcation */
old_trec = StgTSO_trec(CurrentTSO);
ASSERT(old_trec == NO_TREC);
- "ptr" new_trec = foreign "C" stmStartTransaction(BaseReg "ptr", old_trec "ptr");
+ "ptr" new_trec = foreign "C" stmStartTransaction(MyCapability() "ptr", old_trec "ptr");
StgTSO_trec(CurrentTSO) = new_trec;
/* Apply R1 to the realworld token */
/* Start a nested transaction within which to run the first code */
trec = StgTSO_trec(CurrentTSO);
- "ptr" new_trec = foreign "C" stmStartTransaction(BaseReg "ptr", trec "ptr");
+ "ptr" new_trec = foreign "C" stmStartTransaction(MyCapability() "ptr", trec "ptr");
StgTSO_trec(CurrentTSO) = new_trec;
/* Set up the catch-retry frame */
ASSERT(outer != NO_TREC);
if (!StgCatchRetryFrame_running_alt_code(frame)) {
// Retry in the first code: try the alternative
- "ptr" trec = foreign "C" stmStartTransaction(BaseReg "ptr", outer "ptr");
+ "ptr" trec = foreign "C" stmStartTransaction(MyCapability() "ptr", outer "ptr");
StgTSO_trec(CurrentTSO) = trec;
StgCatchRetryFrame_running_alt_code(frame) = 1 :: CInt; // true;
R1 = StgCatchRetryFrame_alt_code(frame);
// Retry in the alternative code: propagate
W_ other_trec;
other_trec = StgCatchRetryFrame_first_code_trec(frame);
- r = foreign "C" stmCommitNestedTransaction(BaseReg "ptr", other_trec "ptr");
+ r = foreign "C" stmCommitNestedTransaction(MyCapability() "ptr", other_trec "ptr");
if (r) {
- r = foreign "C" stmCommitNestedTransaction(BaseReg "ptr", trec "ptr");
+ r = foreign "C" stmCommitNestedTransaction(MyCapability() "ptr", trec "ptr");
}
if (r) {
// Merge between siblings succeeded: commit it back to enclosing transaction
goto retry_pop_stack;
} else {
// Merge failed: we musn't propagate the retry. Try both paths again.
- "ptr" trec = foreign "C" stmStartTransaction(BaseReg "ptr", outer "ptr");
+ "ptr" trec = foreign "C" stmStartTransaction(MyCapability() "ptr", outer "ptr");
StgCatchRetryFrame_first_code_trec(frame) = trec;
StgCatchRetryFrame_running_alt_code(frame) = 0 :: CInt; // false;
StgTSO_trec(CurrentTSO) = trec;
// We've reached the ATOMICALLY_FRAME: attempt to wait
ASSERT(frame_type == ATOMICALLY_FRAME);
ASSERT(outer == NO_TREC);
- r = foreign "C" stmWait(BaseReg "ptr", CurrentTSO "ptr", trec "ptr");
+ r = foreign "C" stmWait(MyCapability() "ptr", CurrentTSO "ptr", trec "ptr");
if (r) {
// Transaction was valid: stmWait put us on the TVars' queues, we now block
StgAtomicallyFrame_waiting(frame) = 1 :: CInt; // true
jump stg_block_noregs;
} else {
// Transaction was not valid: retry immediately
- "ptr" trec = foreign "C" stmStartTransaction(BaseReg "ptr", outer "ptr");
+ "ptr" trec = foreign "C" stmStartTransaction(MyCapability() "ptr", outer "ptr");
StgTSO_trec(CurrentTSO) = trec;
R1 = StgAtomicallyFrame_code(frame);
Sp = frame;
MAYBE_GC (R1_PTR, newTVarzh_fast);
new_value = R1;
- tv = foreign "C" stmNewTVar(BaseReg "ptr", new_value "ptr");
+ "ptr" tv = foreign "C" stmNewTVar(MyCapability() "ptr", new_value "ptr");
RET_P(tv);
}
MAYBE_GC (R1_PTR, readTVarzh_fast); // Call to stmReadTVar may allocate
trec = StgTSO_trec(CurrentTSO);
tvar = R1;
- "ptr" result = foreign "C" stmReadTVar(BaseReg "ptr", trec "ptr", tvar "ptr") [];
+ "ptr" result = foreign "C" stmReadTVar(MyCapability() "ptr", trec "ptr", tvar "ptr") [];
RET_P(result);
}
trec = StgTSO_trec(CurrentTSO);
tvar = R1;
new_value = R2;
- foreign "C" stmWriteTVar(BaseReg "ptr", trec "ptr", tvar "ptr", new_value "ptr") [];
+ foreign "C" stmWriteTVar(MyCapability() "ptr", trec "ptr", tvar "ptr", new_value "ptr") [];
jump %ENTRY_CODE(Sp(0));
}
"ptr" tso = foreign "C" unblockOne(StgMVar_head(mvar),mvar) [];
StgMVar_head(mvar) = tso;
#else
- "ptr" tso = foreign "C" unblockOne(StgMVar_head(mvar) "ptr") [];
+ "ptr" tso = foreign "C" unblockOne(MyCapability() "ptr",
+ StgMVar_head(mvar) "ptr") [];
StgMVar_head(mvar) = tso;
#endif
"ptr" tso = foreign "C" unblockOne(StgMVar_head(mvar) "ptr", mvar "ptr") [];
StgMVar_head(mvar) = tso;
#else
- "ptr" tso = foreign "C" unblockOne(StgMVar_head(mvar) "ptr") [];
+ "ptr" tso = foreign "C" unblockOne(MyCapability() "ptr",
+ StgMVar_head(mvar) "ptr") [];
StgMVar_head(mvar) = tso;
#endif
#if defined(GRAN) || defined(PAR)
/* ToDo: check 2nd arg (mvar) is right */
- "ptr" tso = foreign "C" unblockOne(StgMVar_head(mvar) "ptr",mvar "ptr") [];
+ "ptr" tso = foreign "C" unblockOne(MyCapability() "ptr", StgMVar_head(mvar) "ptr",mvar "ptr") [];
StgMVar_head(mvar) = tso;
#else
- "ptr" tso = foreign "C" unblockOne(StgMVar_head(mvar) "ptr") [];
+ "ptr" tso = foreign "C" unblockOne(MyCapability() "ptr", StgMVar_head(mvar) "ptr") [];
StgMVar_head(mvar) = tso;
#endif
#if defined(GRAN) || defined(PAR)
/* ToDo: check 2nd arg (mvar) is right */
- "ptr" tso = foreign "C" unblockOne(StgMVar_head(mvar) "ptr",mvar "ptr") [];
+ "ptr" tso = foreign "C" unblockOne(MyCapability() "ptr", StgMVar_head(mvar) "ptr",mvar "ptr") [];
StgMVar_head(mvar) = tso;
#else
- "ptr" tso = foreign "C" unblockOne(StgMVar_head(mvar) "ptr") [];
+ "ptr" tso = foreign "C" unblockOne(MyCapability() "ptr", StgMVar_head(mvar) "ptr") [];
StgMVar_head(mvar) = tso;
#endif
/* args: R1 */
#ifdef THREADED_RTS
foreign "C" barf("waitRead# on threaded RTS");
-#endif
+#else
ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
// threaded RTS anyway.
APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
jump stg_block_noregs;
+#endif
}
waitWritezh_fast
/* args: R1 */
#ifdef THREADED_RTS
foreign "C" barf("waitWrite# on threaded RTS");
-#endif
+#else
ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
// threaded RTS anyway.
APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
jump stg_block_noregs;
+#endif
}
#ifdef THREADED_RTS
foreign "C" barf("delay# on threaded RTS");
-#endif
+#else
/* args: R1 (microsecond delay amount) */
ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
}
jump stg_block_noregs;
#endif
+#endif /* !THREADED_RTS */
}
/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team, 1998-2000
+ * (c) The GHC Team, 1998-2005
*
* Prototypes for functions in Printer.c
*
* ---------------------------------------------------------------------------*/
+#ifndef PRINTER_H
+#define PRINTER_H
+
extern void printPtr ( StgPtr p );
extern void printObj ( StgClosure *obj );
extern const char *lookupGHCName( void *addr );
#endif
+
+#endif /* PRINTER_H */
+
#ifdef PROFILING
if (doingLDVProfiling() && doingRetainerProfiling()) {
errorBelch("cannot mix -hb and -hr");
- stg_exit(1);
+ stg_exit(EXIT_FAILURE);
}
#endif
/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team, 1998-1999
+ * (c) The GHC Team, 1998-2005
*
* Support for heap profiling
*
extern void LDV_recordDead( StgClosure *c, nat size );
extern rtsBool strMatchesSelector( char* str, char* sel );
-#endif
+#endif /* PROFHEAP_H */
/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team, 1998-2004
+ * (c) The GHC Team, 1998-2005
*
* Support for profiling
*
* ---------------------------------------------------------------------------*/
+#ifndef PROFILING_H
+#define PROFILING_H
+
#include <stdio.h>
#if defined(PROFILING) || defined(DEBUG)
#endif
#endif
+
+#endif /* PROFILING_H */
/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team, 1998
+ * (c) The GHC Team, 1998-2005
*
* Profiling interval timer
*
* ---------------------------------------------------------------------------*/
+#ifndef PROFTIMER_H
+#define PROFTIMER_H
+
extern void initProfTimer ( void );
extern void handleProfTick ( void );
extern void startHeapProfTimer ( void );
extern rtsBool performHeapProfile;
+
+#endif /* PROFTIMER_H */
#include <stdlib.h>
-static Capability *rtsApiCapability = NULL;
-
/* ----------------------------------------------------------------------------
Building Haskell objects from C datatypes.
------------------------------------------------------------------------- */
HaskellObj
-rts_mkChar (HsChar c)
+rts_mkChar (Capability *cap, HsChar c)
{
- StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+ StgClosure *p = (StgClosure *)allocateLocal(cap, CONSTR_sizeW(0,1));
SET_HDR(p, Czh_con_info, CCS_SYSTEM);
p->payload[0] = (StgClosure *)(StgWord)(StgChar)c;
return p;
}
HaskellObj
-rts_mkInt (HsInt i)
+rts_mkInt (Capability *cap, HsInt i)
{
- StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+ StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,1));
SET_HDR(p, Izh_con_info, CCS_SYSTEM);
p->payload[0] = (StgClosure *)(StgInt)i;
return p;
}
HaskellObj
-rts_mkInt8 (HsInt8 i)
+rts_mkInt8 (Capability *cap, HsInt8 i)
{
- StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+ StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,1));
SET_HDR(p, I8zh_con_info, CCS_SYSTEM);
/* Make sure we mask out the bits above the lowest 8 */
p->payload[0] = (StgClosure *)(StgInt)((unsigned)i & 0xff);
}
HaskellObj
-rts_mkInt16 (HsInt16 i)
+rts_mkInt16 (Capability *cap, HsInt16 i)
{
- StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+ StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,1));
SET_HDR(p, I16zh_con_info, CCS_SYSTEM);
/* Make sure we mask out the relevant bits */
p->payload[0] = (StgClosure *)(StgInt)((unsigned)i & 0xffff);
}
HaskellObj
-rts_mkInt32 (HsInt32 i)
+rts_mkInt32 (Capability *cap, HsInt32 i)
{
- StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+ StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,1));
SET_HDR(p, I32zh_con_info, CCS_SYSTEM);
p->payload[0] = (StgClosure *)(StgInt)((unsigned)i & 0xffffffff);
return p;
}
HaskellObj
-rts_mkInt64 (HsInt64 i)
+rts_mkInt64 (Capability *cap, HsInt64 i)
{
llong *tmp;
- StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,2));
+ StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,2));
SET_HDR(p, I64zh_con_info, CCS_SYSTEM);
tmp = (llong*)&(p->payload[0]);
*tmp = (StgInt64)i;
}
HaskellObj
-rts_mkWord (HsWord i)
+rts_mkWord (Capability *cap, HsWord i)
{
- StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+ StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,1));
SET_HDR(p, Wzh_con_info, CCS_SYSTEM);
p->payload[0] = (StgClosure *)(StgWord)i;
return p;
}
HaskellObj
-rts_mkWord8 (HsWord8 w)
+rts_mkWord8 (Capability *cap, HsWord8 w)
{
/* see rts_mkInt* comments */
- StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+ StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,1));
SET_HDR(p, W8zh_con_info, CCS_SYSTEM);
p->payload[0] = (StgClosure *)(StgWord)(w & 0xff);
return p;
}
HaskellObj
-rts_mkWord16 (HsWord16 w)
+rts_mkWord16 (Capability *cap, HsWord16 w)
{
/* see rts_mkInt* comments */
- StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+ StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,1));
SET_HDR(p, W16zh_con_info, CCS_SYSTEM);
p->payload[0] = (StgClosure *)(StgWord)(w & 0xffff);
return p;
}
HaskellObj
-rts_mkWord32 (HsWord32 w)
+rts_mkWord32 (Capability *cap, HsWord32 w)
{
/* see rts_mkInt* comments */
- StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+ StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,1));
SET_HDR(p, W32zh_con_info, CCS_SYSTEM);
p->payload[0] = (StgClosure *)(StgWord)(w & 0xffffffff);
return p;
}
HaskellObj
-rts_mkWord64 (HsWord64 w)
+rts_mkWord64 (Capability *cap, HsWord64 w)
{
ullong *tmp;
- StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,2));
+ StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,2));
/* see mk_Int8 comment */
SET_HDR(p, W64zh_con_info, CCS_SYSTEM);
tmp = (ullong*)&(p->payload[0]);
}
HaskellObj
-rts_mkFloat (HsFloat f)
+rts_mkFloat (Capability *cap, HsFloat f)
{
- StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
+ StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,1));
SET_HDR(p, Fzh_con_info, CCS_SYSTEM);
ASSIGN_FLT((P_)p->payload, (StgFloat)f);
return p;
}
HaskellObj
-rts_mkDouble (HsDouble d)
+rts_mkDouble (Capability *cap, HsDouble d)
{
- StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,sizeofW(StgDouble)));
+ StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,sizeofW(StgDouble)));
SET_HDR(p, Dzh_con_info, CCS_SYSTEM);
ASSIGN_DBL((P_)p->payload, (StgDouble)d);
return p;
}
HaskellObj
-rts_mkStablePtr (HsStablePtr s)
+rts_mkStablePtr (Capability *cap, HsStablePtr s)
{
- StgClosure *p = (StgClosure *)allocate(sizeofW(StgHeader)+1);
+ StgClosure *p = (StgClosure *)allocateLocal(cap,sizeofW(StgHeader)+1);
SET_HDR(p, StablePtr_con_info, CCS_SYSTEM);
p->payload[0] = (StgClosure *)s;
return p;
}
HaskellObj
-rts_mkPtr (HsPtr a)
+rts_mkPtr (Capability *cap, HsPtr a)
{
- StgClosure *p = (StgClosure *)allocate(sizeofW(StgHeader)+1);
+ StgClosure *p = (StgClosure *)allocateLocal(cap,sizeofW(StgHeader)+1);
SET_HDR(p, Ptr_con_info, CCS_SYSTEM);
p->payload[0] = (StgClosure *)a;
return p;
}
HaskellObj
-rts_mkFunPtr (HsFunPtr a)
+rts_mkFunPtr (Capability *cap, HsFunPtr a)
{
- StgClosure *p = (StgClosure *)allocate(sizeofW(StgHeader)+1);
+ StgClosure *p = (StgClosure *)allocateLocal(cap,sizeofW(StgHeader)+1);
SET_HDR(p, FunPtr_con_info, CCS_SYSTEM);
p->payload[0] = (StgClosure *)a;
return p;
}
HaskellObj
-rts_mkBool (HsBool b)
+rts_mkBool (Capability *cap STG_UNUSED, HsBool b)
{
if (b) {
return (StgClosure *)True_closure;
}
HaskellObj
-rts_mkString (char *s)
+rts_mkString (Capability *cap, char *s)
{
- return rts_apply((StgClosure *)unpackCString_closure, rts_mkPtr(s));
+ return rts_apply(cap, (StgClosure *)unpackCString_closure, rts_mkPtr(cap,s));
}
HaskellObj
-rts_apply (HaskellObj f, HaskellObj arg)
+rts_apply (Capability *cap, HaskellObj f, HaskellObj arg)
{
StgThunk *ap;
- ap = (StgThunk *)allocate(sizeofW(StgThunk) + 2);
+ ap = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk) + 2);
SET_HDR(ap, (StgInfoTable *)&stg_ap_2_upd_info, CCS_SYSTEM);
ap->payload[0] = f;
ap->payload[1] = arg;
// See comment above:
// ASSERT(p->header.info == Ptr_con_info ||
// p->header.info == Ptr_static_info);
- return (void *)(p->payload[0]);
+ return (Capability *)(p->payload[0]);
}
HsFunPtr
}
}
+/* -----------------------------------------------------------------------------
+ Creating threads
+ -------------------------------------------------------------------------- */
+
+INLINE_HEADER void pushClosure (StgTSO *tso, StgWord c) {
+ tso->sp--;
+ tso->sp[0] = (W_) c;
+}
+
+StgTSO *
+createGenThread (Capability *cap, nat stack_size, StgClosure *closure)
+{
+ StgTSO *t;
+#if defined(GRAN)
+ t = createThread (cap, stack_size, NO_PRI);
+#else
+ t = createThread (cap, stack_size);
+#endif
+ pushClosure(t, (W_)closure);
+ pushClosure(t, (W_)&stg_enter_info);
+ return t;
+}
+
+StgTSO *
+createIOThread (Capability *cap, nat stack_size, StgClosure *closure)
+{
+ StgTSO *t;
+#if defined(GRAN)
+ t = createThread (cap, stack_size, NO_PRI);
+#else
+ t = createThread (cap, stack_size);
+#endif
+ pushClosure(t, (W_)&stg_noforceIO_info);
+ pushClosure(t, (W_)&stg_ap_v_info);
+ pushClosure(t, (W_)closure);
+ pushClosure(t, (W_)&stg_enter_info);
+ return t;
+}
+
+/*
+ * Same as above, but also evaluate the result of the IO action
+ * to whnf while we're at it.
+ */
+
+StgTSO *
+createStrictIOThread(Capability *cap, nat stack_size, StgClosure *closure)
+{
+ StgTSO *t;
+#if defined(GRAN)
+ t = createThread(cap, stack_size, NO_PRI);
+#else
+ t = createThread(cap, stack_size);
+#endif
+ pushClosure(t, (W_)&stg_forceIO_info);
+ pushClosure(t, (W_)&stg_ap_v_info);
+ pushClosure(t, (W_)closure);
+ pushClosure(t, (W_)&stg_enter_info);
+ return t;
+}
+
/* ----------------------------------------------------------------------------
Evaluating Haskell expressions
------------------------------------------------------------------------- */
-SchedulerStatus
-rts_eval (HaskellObj p, /*out*/HaskellObj *ret)
+
+Capability *
+rts_eval (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret)
{
StgTSO *tso;
- Capability *cap = rtsApiCapability;
- rtsApiCapability = NULL;
-
- tso = createGenThread(RtsFlags.GcFlags.initialStkSize, p);
+
+ tso = createGenThread(cap, RtsFlags.GcFlags.initialStkSize, p);
return scheduleWaitThread(tso,ret,cap);
}
-SchedulerStatus
-rts_eval_ (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret)
+Capability *
+rts_eval_ (Capability *cap, HaskellObj p, unsigned int stack_size,
+ /*out*/HaskellObj *ret)
{
StgTSO *tso;
- Capability *cap = rtsApiCapability;
- rtsApiCapability = NULL;
-
- tso = createGenThread(stack_size, p);
+
+ tso = createGenThread(cap, stack_size, p);
return scheduleWaitThread(tso,ret,cap);
}
* rts_evalIO() evaluates a value of the form (IO a), forcing the action's
* result to WHNF before returning.
*/
-SchedulerStatus
-rts_evalIO (HaskellObj p, /*out*/HaskellObj *ret)
+Capability *
+rts_evalIO (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret)
{
StgTSO* tso;
- Capability *cap = rtsApiCapability;
- rtsApiCapability = NULL;
- tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p);
+ tso = createStrictIOThread(cap, RtsFlags.GcFlags.initialStkSize, p);
return scheduleWaitThread(tso,ret,cap);
}
* action's result to WHNF before returning. The result is returned
* in a StablePtr.
*/
-SchedulerStatus
-rts_evalStableIO (HsStablePtr s, /*out*/HsStablePtr *ret)
+Capability *
+rts_evalStableIO (Capability *cap, HsStablePtr s, /*out*/HsStablePtr *ret)
{
StgTSO* tso;
StgClosure *p, *r;
SchedulerStatus stat;
- Capability *cap = rtsApiCapability;
- rtsApiCapability = NULL;
p = (StgClosure *)deRefStablePtr(s);
- tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p);
- stat = scheduleWaitThread(tso,&r,cap);
+ tso = createStrictIOThread(cap, RtsFlags.GcFlags.initialStkSize, p);
+ cap = scheduleWaitThread(tso,&r,cap);
+ stat = rts_getSchedStatus(cap);
if (stat == Success && ret != NULL) {
ASSERT(r != NULL);
*ret = getStablePtr((StgPtr)r);
}
- return stat;
+ return cap;
}
/*
* Like rts_evalIO(), but doesn't force the action's result.
*/
-SchedulerStatus
-rts_evalLazyIO (HaskellObj p, /*out*/HaskellObj *ret)
+Capability *
+rts_evalLazyIO (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret)
{
StgTSO *tso;
- Capability *cap = rtsApiCapability;
- rtsApiCapability = NULL;
- tso = createIOThread(RtsFlags.GcFlags.initialStkSize, p);
+ tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, p);
return scheduleWaitThread(tso,ret,cap);
}
-SchedulerStatus
-rts_evalLazyIO_ (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret)
+Capability *
+rts_evalLazyIO_ (Capability *cap, HaskellObj p, unsigned int stack_size,
+ /*out*/HaskellObj *ret)
{
StgTSO *tso;
- Capability *cap = rtsApiCapability;
- rtsApiCapability = NULL;
- tso = createIOThread(stack_size, p);
+ tso = createIOThread(cap, stack_size, p);
return scheduleWaitThread(tso,ret,cap);
}
/* Convenience function for decoding the returned status. */
void
-rts_checkSchedStatus ( char* site, SchedulerStatus rc )
+rts_checkSchedStatus (char* site, Capability *cap)
{
+ SchedulerStatus rc = cap->running_task->stat;
switch (rc) {
case Success:
return;
}
}
-void
-rts_lock()
+SchedulerStatus
+rts_getSchedStatus (Capability *cap)
+{
+ return cap->running_task->stat;
+}
+
+Capability *
+rts_lock (void)
{
-#ifdef RTS_SUPPORTS_THREADS
+ Capability *cap;
+ Task *task;
+
+ // ToDo: get rid of this lock in the common case. We could store
+ // a free Task in thread-local storage, for example. That would
+ // leave just one lock on the path into the RTS: cap->lock when
+ // acquiring the Capability.
ACQUIRE_LOCK(&sched_mutex);
-
- // we request to get the capability immediately, in order to
- // a) stop other threads from using allocate()
- // b) wake the current worker thread from awaitEvent()
- // (so that a thread started by rts_eval* will start immediately)
- waitForReturnCapability(&sched_mutex,&rtsApiCapability);
-#else
- grabCapability(&rtsApiCapability);
-#endif
+ task = newBoundTask();
+ RELEASE_LOCK(&sched_mutex);
+
+ cap = NULL;
+ waitForReturnCapability(&cap, task);
+ return (Capability *)cap;
}
+// Exiting the RTS: we hold a Capability that is not necessarily the
+// same one that was originally returned by rts_lock(), because
+// rts_evalIO() etc. may return a new one. Now that we have
+// investigated the return value, we can release the Capability,
+// and free the Task (in that order).
+
void
-rts_unlock()
+rts_unlock (Capability *cap)
{
-#ifdef RTS_SUPPORTS_THREADS
- if (rtsApiCapability) {
- releaseCapability(rtsApiCapability);
- }
- rtsApiCapability = NULL;
- RELEASE_LOCK(&sched_mutex);
-#endif
+ Task *task;
+
+ task = cap->running_task;
+ ASSERT(task == myTask());
+
+ // slightly delicate ordering of operations below, pay attention!
+
+ // We are no longer a bound task/thread. This is important,
+ // because the GC can run when we release the Capability below,
+ // and we don't want it to treat this as a live TSO pointer.
+ task->tso = NULL;
+
+ // Now release the Capability. With the capability released, GC
+ // may happen. NB. does not try to put the current Task on the
+ // worker queue.
+ releaseCapability(cap);
+
+ // Finally, we can release the Task to the free list.
+ boundTaskExiting(task);
}
" -c<n> Auto-enable compaction of the oldest generation when live data is",
" at least <n>% of the maximum heap size set with -M (default: 30%)",
" -c Enable compaction for all major collections",
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
" -I<sec> Perform full GC after <sec> idle time (default: 0.3, 0 == off)",
#endif
"",
--- /dev/null
+/* -----------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 1998-2005
+ *
+ * Signal processing / handling.
+ *
+ * ---------------------------------------------------------------------------*/
+
+#ifndef RTS_SIGNALS_H
+#define RTS_SIGNALS_H
+
+#if !defined(PAR) && !defined(mingw32_HOST_OS)
+
+#define RTS_USER_SIGNALS 1
+#include "posix/Signals.h"
+
+#elif defined(mingw32_HOST_OS)
+
+#define RTS_USER_SIGNALS 1
+#include "win32/ConsoleHandler.h"
+
+#else /* PAR */
+
+#define signals_pending() (rtsFalse)
+#define handleSignalsInThisThread() /* nothing */
+
+#endif /* PAR */
+
+
+#if RTS_USER_SIGNALS
+
+/*
+ * Function: initUserSignals()
+ *
+ * Initialize the console handling substrate.
+ */
+extern void initUserSignals(void);
+
+/*
+ * Function: initDefaultHandlers()
+ *
+ * Install any default signal/console handlers. Currently we install a
+ * Ctrl+C handler that shuts down the RTS in an orderly manner.
+ */
+extern void initDefaultHandlers(void);
+
+/*
+ * Function: blockUserSignals()
+ *
+ * Temporarily block the delivery of further console events. Needed to
+ * avoid race conditions when GCing the queue of outstanding handlers or
+ * when emptying the queue by running the handlers.
+ *
+ */
+extern void blockUserSignals(void);
+
+/*
+ * Function: unblockUserSignals()
+ *
+ * The inverse of blockUserSignals(); re-enable the delivery of console events.
+ */
+extern void unblockUserSignals(void);
+
+/*
+ * Function: awaitUserSignals()
+ *
+ * Wait for the next console event. Currently a NOP (returns immediately).
+ */
+extern void awaitUserSignals(void);
+
+/*
+ * Function: markSignalHandlers()
+ *
+ * Evacuate the handler queue. _Assumes_ that console event delivery
+ * has already been blocked.
+ */
+extern void markSignalHandlers (evac_fn evac);
+
+#endif /* RTS_USER_SIGNALS */
+
+#endif /* RTS_SIGNALS_H */
/* The initialisation stack grows downward, with sp pointing
to the last occupied word */
init_sp = INIT_STACK_BLOCKS*BLOCK_SIZE_W;
- bd = allocGroup(INIT_STACK_BLOCKS);
+ bd = allocGroup_lock(INIT_STACK_BLOCKS);
init_stack = (F_ *)bd->start;
init_stack[--init_sp] = (F_)stg_init_finish;
if (init_root != NULL) {
cap.r.rSp = (P_)(init_stack + init_sp);
StgRun((StgFunPtr)stg_init, &cap.r);
- freeGroup(bd);
+ freeGroup_lock(bd);
#if defined(PROFILING) || defined(DEBUG)
// This must be done after module initialisation.
/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team, 1998-2004
+ * (c) The GHC Team, 1998-2005
*
* General utility functions used in the RTS.
*
// Helper functions for thread blocking and unblocking
static void park_tso(StgTSO *tso) {
- ACQUIRE_LOCK(&sched_mutex);
ASSERT(tso -> why_blocked == NotBlocked);
tso -> why_blocked = BlockedOnSTM;
tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
- RELEASE_LOCK(&sched_mutex);
TRACE("park_tso on tso=%p\n", tso);
}
-static void unpark_tso(StgTSO *tso) {
+static void unpark_tso(Capability *cap, StgTSO *tso) {
// We will continue unparking threads while they remain on one of the wait
// queues: it's up to the thread itself to remove it from the wait queues
// if it decides to do so when it is scheduled.
if (tso -> why_blocked == BlockedOnSTM) {
TRACE("unpark_tso on tso=%p\n", tso);
- ACQUIRE_LOCK(&sched_mutex);
tso -> why_blocked = NotBlocked;
- PUSH_ON_RUN_QUEUE(tso);
- RELEASE_LOCK(&sched_mutex);
+ pushOnRunQueue(cap,tso);
} else {
TRACE("spurious unpark_tso on tso=%p\n", tso);
}
}
-static void unpark_waiters_on(StgTVar *s) {
+static void unpark_waiters_on(Capability *cap, StgTVar *s) {
StgTVarWaitQueue *q;
TRACE("unpark_waiters_on tvar=%p\n", s);
for (q = s -> first_wait_queue_entry;
q != END_STM_WAIT_QUEUE;
q = q -> next_queue_entry) {
- unpark_tso(q -> waiting_tso);
+ unpark_tso(cap, q -> waiting_tso);
}
}
// Helper functions for allocation and initialization
-static StgTVarWaitQueue *new_stg_tvar_wait_queue(StgRegTable *reg,
+static StgTVarWaitQueue *new_stg_tvar_wait_queue(Capability *cap,
StgTSO *waiting_tso) {
StgTVarWaitQueue *result;
- result = (StgTVarWaitQueue *)allocateLocal(reg, sizeofW(StgTVarWaitQueue));
+ result = (StgTVarWaitQueue *)allocateLocal(cap, sizeofW(StgTVarWaitQueue));
SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM);
result -> waiting_tso = waiting_tso;
return result;
}
-static StgTRecChunk *new_stg_trec_chunk(StgRegTable *reg) {
+static StgTRecChunk *new_stg_trec_chunk(Capability *cap) {
StgTRecChunk *result;
- result = (StgTRecChunk *)allocateLocal(reg, sizeofW(StgTRecChunk));
+ result = (StgTRecChunk *)allocateLocal(cap, sizeofW(StgTRecChunk));
SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
result -> prev_chunk = END_STM_CHUNK_LIST;
result -> next_entry_idx = 0;
return result;
}
-static StgTRecHeader *new_stg_trec_header(StgRegTable *reg,
+static StgTRecHeader *new_stg_trec_header(Capability *cap,
StgTRecHeader *enclosing_trec) {
StgTRecHeader *result;
- result = (StgTRecHeader *) allocateLocal(reg, sizeofW(StgTRecHeader));
+ result = (StgTRecHeader *) allocateLocal(cap, sizeofW(StgTRecHeader));
SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);
result -> enclosing_trec = enclosing_trec;
- result -> current_chunk = new_stg_trec_chunk(reg);
+ result -> current_chunk = new_stg_trec_chunk(cap);
if (enclosing_trec == NO_TREC) {
result -> state = TREC_ACTIVE;
return result;
}
-static StgTVar *new_tvar(StgRegTable *reg,
+static StgTVar *new_tvar(Capability *cap,
StgClosure *new_value) {
StgTVar *result;
- result = (StgTVar *)allocateLocal(reg, sizeofW(StgTVar));
+ result = (StgTVar *)allocateLocal(cap, sizeofW(StgTVar));
SET_HDR (result, &stg_TVAR_info, CCS_SYSTEM);
result -> current_value = new_value;
result -> first_wait_queue_entry = END_STM_WAIT_QUEUE;
// Helper functions for managing waiting lists
-static void build_wait_queue_entries_for_trec(StgRegTable *reg,
+static void build_wait_queue_entries_for_trec(Capability *cap,
StgTSO *tso,
StgTRecHeader *trec) {
ASSERT(trec != NO_TREC);
ACQ_ASSERT(s -> current_value == trec);
NACQ_ASSERT(s -> current_value == e -> expected_value);
fq = s -> first_wait_queue_entry;
- q = new_stg_tvar_wait_queue(reg, tso);
+ q = new_stg_tvar_wait_queue(cap, tso);
q -> next_queue_entry = fq;
q -> prev_queue_entry = END_STM_WAIT_QUEUE;
if (fq != END_STM_WAIT_QUEUE) {
/*......................................................................*/
-static TRecEntry *get_new_entry(StgRegTable *reg,
+static TRecEntry *get_new_entry(Capability *cap,
StgTRecHeader *t) {
TRecEntry *result;
StgTRecChunk *c;
} else {
// Current chunk is full: allocate a fresh one
StgTRecChunk *nc;
- nc = new_stg_trec_chunk(reg);
+ nc = new_stg_trec_chunk(cap);
nc -> prev_chunk = c;
nc -> next_entry_idx = 1;
t -> current_chunk = nc;
/*......................................................................*/
-static void merge_update_into(StgRegTable *reg,
+static void merge_update_into(Capability *cap,
StgTRecHeader *t,
StgTVar *tvar,
StgClosure *expected_value,
if (!found) {
// No entry so far in this trec
TRecEntry *ne;
- ne = get_new_entry(reg, t);
+ ne = get_new_entry(cap, t);
ne -> tvar = tvar;
ne -> expected_value = expected_value;
ne -> new_value = new_value;
/*......................................................................*/
-StgTRecHeader *stmStartTransaction(StgRegTable *reg,
+StgTRecHeader *stmStartTransaction(Capability *cap,
StgTRecHeader *outer) {
StgTRecHeader *t;
TRACE("%p : stmStartTransaction\n", outer);
- t = new_stg_trec_header(reg, outer);
+ t = new_stg_trec_header(cap, outer);
TRACE("%p : stmStartTransaction()=%p\n", outer, t);
return t;
}
/*......................................................................*/
-StgBool stmCommitTransaction(StgRegTable *reg STG_UNUSED, StgTRecHeader *trec) {
+StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
int result;
+
TRACE("%p : stmCommitTransaction()\n", trec);
ASSERT (trec != NO_TREC);
ASSERT (trec -> enclosing_trec == NO_TREC);
ACQ_ASSERT(tvar_is_locked(s, trec));
TRACE("%p : writing %p to %p, waking waiters\n", trec, e -> new_value, s);
- unpark_waiters_on(s);
+ unpark_waiters_on(cap,s);
IF_STM_FG_LOCKS({
s -> last_update_by = trec;
});
/*......................................................................*/
-StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec) {
+StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
StgTRecHeader *et;
int result;
ASSERT (trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
if (entry_is_update(e)) {
unlock_tvar(trec, s, e -> expected_value, FALSE);
}
- merge_update_into(reg, et, s, e -> expected_value, e -> new_value);
+ merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
ACQ_ASSERT(s -> current_value != trec);
});
} else {
/*......................................................................*/
-StgBool stmWait(StgRegTable *reg, StgTSO *tso, StgTRecHeader *trec) {
+StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
int result;
TRACE("%p : stmWait(%p)\n", trec, tso);
ASSERT (trec != NO_TREC);
// Put ourselves to sleep. We retain locks on all the TVars involved
// until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
// in the TSO, (c) TREC_WAITING in the Trec.
- build_wait_queue_entries_for_trec(reg, tso, trec);
+ build_wait_queue_entries_for_trec(cap, tso, trec);
park_tso(tso);
trec -> state = TREC_WAITING;
/*......................................................................*/
-StgClosure *stmReadTVar(StgRegTable *reg,
+StgClosure *stmReadTVar(Capability *cap,
StgTRecHeader *trec,
StgTVar *tvar) {
StgTRecHeader *entry_in;
result = entry -> new_value;
} else {
// Entry found in another trec
- TRecEntry *new_entry = get_new_entry(reg, trec);
+ TRecEntry *new_entry = get_new_entry(cap, trec);
new_entry -> tvar = tvar;
new_entry -> expected_value = entry -> expected_value;
new_entry -> new_value = entry -> new_value;
} else {
// No entry found
StgClosure *current_value = read_current_value(trec, tvar);
- TRecEntry *new_entry = get_new_entry(reg, trec);
+ TRecEntry *new_entry = get_new_entry(cap, trec);
new_entry -> tvar = tvar;
new_entry -> expected_value = current_value;
new_entry -> new_value = current_value;
/*......................................................................*/
-void stmWriteTVar(StgRegTable *reg,
+void stmWriteTVar(Capability *cap,
StgTRecHeader *trec,
StgTVar *tvar,
StgClosure *new_value) {
entry -> new_value = new_value;
} else {
// Entry found in another trec
- TRecEntry *new_entry = get_new_entry(reg, trec);
+ TRecEntry *new_entry = get_new_entry(cap, trec);
new_entry -> tvar = tvar;
new_entry -> expected_value = entry -> expected_value;
new_entry -> new_value = new_value;
} else {
// No entry found
StgClosure *current_value = read_current_value(trec, tvar);
- TRecEntry *new_entry = get_new_entry(reg, trec);
+ TRecEntry *new_entry = get_new_entry(cap, trec);
new_entry -> tvar = tvar;
new_entry -> expected_value = current_value;
new_entry -> new_value = new_value;
/*......................................................................*/
-StgTVar *stmNewTVar(StgRegTable *reg,
+StgTVar *stmNewTVar(Capability *cap,
StgClosure *new_value) {
StgTVar *result;
- result = new_tvar(reg, new_value);
+ result = new_tvar(cap, new_value);
return result;
}
*
* ---------------------------------------------------------------------------*/
+#ifndef SANITY_H
+
#ifdef DEBUG
# if defined(PAR)
#endif /* DEBUG */
+#endif /* SANITY_H */
+
/* ---------------------------------------------------------------------------
*
- * (c) The GHC Team, 1998-2004
+ * (c) The GHC Team, 1998-2005
*
- * Scheduler
- *
- * Different GHC ways use this scheduler quite differently (see comments below)
- * Here is the global picture:
- *
- * WAY Name CPP flag What's it for
- * --------------------------------------
- * mp GUM PARALLEL_HASKELL Parallel execution on a distrib. memory machine
- * s SMP SMP Parallel execution on a shared memory machine
- * mg GranSim GRAN Simulation of parallel execution
- * md GUM/GdH DIST Distributed execution (based on GUM)
+ * The scheduler and thread-related functionality
*
* --------------------------------------------------------------------------*/
-/*
- * Version with support for distributed memory parallelism aka GUM (WAY=mp):
-
- The main scheduling loop in GUM iterates until a finish message is received.
- In that case a global flag @receivedFinish@ is set and this instance of
- the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
- for the handling of incoming messages, such as PP_FINISH.
- Note that in the parallel case we have a system manager that coordinates
- different PEs, each of which are running one instance of the RTS.
- See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
- From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
-
- * Version with support for simulating parallel execution aka GranSim (WAY=mg):
-
- The main scheduling code in GranSim is quite different from that in std
- (concurrent) Haskell: while concurrent Haskell just iterates over the
- threads in the runnable queue, GranSim is event driven, i.e. it iterates
- over the events in the global event queue. -- HWL
-*/
-
#include "PosixSource.h"
#include "Rts.h"
#include "SchedAPI.h"
#include "Storage.h"
#include "StgRun.h"
#include "Hooks.h"
-#define COMPILING_SCHEDULER
#include "Schedule.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Exception.h"
#include "Printer.h"
-#include "Signals.h"
+#include "RtsSignals.h"
#include "Sanity.h"
#include "Stats.h"
#include "STM.h"
#endif
#include "Sparks.h"
#include "Capability.h"
-#include "Task.h"
+#include "Task.h"
+#include "AwaitEvent.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef THREADED_RTS
-#define USED_IN_THREADED_RTS
+#define USED_WHEN_THREADED_RTS
+#define USED_WHEN_NON_THREADED_RTS STG_UNUSED
#else
-#define USED_IN_THREADED_RTS STG_UNUSED
+#define USED_WHEN_THREADED_RTS STG_UNUSED
+#define USED_WHEN_NON_THREADED_RTS
#endif
-#ifdef RTS_SUPPORTS_THREADS
-#define USED_WHEN_RTS_SUPPORTS_THREADS
+#ifdef SMP
+#define USED_WHEN_SMP
#else
-#define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
+#define USED_WHEN_SMP STG_UNUSED
#endif
-/* Main thread queue.
- * Locks required: sched_mutex.
- */
-StgMainThread *main_threads = NULL;
+/* -----------------------------------------------------------------------------
+ * Global variables
+ * -------------------------------------------------------------------------- */
#if defined(GRAN)
#else /* !GRAN */
-/* Thread queues.
- * Locks required: sched_mutex.
- */
-StgTSO *run_queue_hd = NULL;
-StgTSO *run_queue_tl = NULL;
+#if !defined(THREADED_RTS)
+// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
-StgTSO *blackhole_queue = NULL;
-StgTSO *sleeping_queue = NULL; /* perhaps replace with a hash table? */
+StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
+#endif
+/* Threads blocked on blackholes.
+ * LOCK: sched_mutex+capability, or all capabilities
+ */
+StgTSO *blackhole_queue = NULL;
#endif
/* The blackhole_queue should be checked for threads to wake up. See
* Schedule.h for more thorough comment.
+ * LOCK: none (doesn't matter if we miss an update)
*/
rtsBool blackholes_need_checking = rtsFalse;
/* Linked list of all threads.
* Used for detecting garbage collected threads.
+ * LOCK: sched_mutex+capability, or all capabilities
*/
StgTSO *all_threads = NULL;
-/* When a thread performs a safe C call (_ccall_GC, using old
- * terminology), it gets put on the suspended_ccalling_threads
- * list. Used by the garbage collector.
+/* flag set by signal handler to precipitate a context switch
+ * LOCK: none (just an advisory flag)
*/
-static StgTSO *suspended_ccalling_threads;
-
-/* KH: The following two flags are shared memory locations. There is no need
- to lock them, since they are only unset at the end of a scheduler
- operation.
-*/
-
-/* flag set by signal handler to precipitate a context switch */
int context_switch = 0;
-/* flag that tracks whether we have done any execution in this time slice. */
+/* flag that tracks whether we have done any execution in this time slice.
+ * LOCK: currently none, perhaps we should lock (but needs to be
+ * updated in the fast path of the scheduler).
+ */
nat recent_activity = ACTIVITY_YES;
-/* if this flag is set as well, give up execution */
+/* if this flag is set as well, give up execution
+ * LOCK: none (changes once, from false->true)
+ */
rtsBool interrupted = rtsFalse;
/* Next thread ID to allocate.
- * Locks required: thread_id_mutex
+ * LOCK: sched_mutex
*/
static StgThreadID next_thread_id = 1;
-/*
- * Pointers to the state of the current thread.
- * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
- * thread. If CurrentTSO == NULL, then we're at the scheduler level.
- */
-
/* The smallest stack size that makes any sense is:
* RESERVED_STACK_WORDS (so we can get back from the stack overflow)
* + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
* A thread with this stack will bomb immediately with a stack
* overflow, which will increase its stack size.
*/
-
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
-
#if defined(GRAN)
StgTSO *CurrentTSO;
#endif
* in an MT setting, needed to signal that a worker thread shouldn't hang around
* in the scheduler when it is out of work.
*/
-static rtsBool shutting_down_scheduler = rtsFalse;
+rtsBool shutting_down_scheduler = rtsFalse;
-#if defined(RTS_SUPPORTS_THREADS)
-/* ToDo: carefully document the invariants that go together
- * with these synchronisation objects.
+/*
+ * This mutex protects most of the global scheduler data in
+ * the THREADED_RTS and (inc. SMP) runtime.
*/
-Mutex sched_mutex = INIT_MUTEX_VAR;
-Mutex term_mutex = INIT_MUTEX_VAR;
-
-#endif /* RTS_SUPPORTS_THREADS */
+#if defined(THREADED_RTS)
+Mutex sched_mutex = INIT_MUTEX_VAR;
+#endif
#if defined(PARALLEL_HASKELL)
StgTSO *LastTSO;
rtsBool emitSchedule = rtsTrue;
#endif
-#if DEBUG
-static char *whatNext_strs[] = {
- "(unknown)",
- "ThreadRunGHC",
- "ThreadInterpret",
- "ThreadKilled",
- "ThreadRelocated",
- "ThreadComplete"
-};
-#endif
-
/* -----------------------------------------------------------------------------
* static function prototypes
* -------------------------------------------------------------------------- */
-#if defined(RTS_SUPPORTS_THREADS)
-static void taskStart(void);
-#endif
-
-static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
- Capability *initialCapability );
+static Capability *schedule (Capability *initialCapability, Task *task);
//
// These function all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
-static void schedulePreLoop(void);
-static void scheduleStartSignalHandlers(void);
-static void scheduleCheckBlockedThreads(void);
-static void scheduleCheckBlackHoles(void);
-static void scheduleDetectDeadlock(void);
+static void schedulePreLoop (void);
+static void scheduleStartSignalHandlers (void);
+static void scheduleCheckBlockedThreads (Capability *cap);
+static void scheduleCheckBlackHoles (Capability *cap);
+static void scheduleDetectDeadlock (Capability *cap, Task *task);
#if defined(GRAN)
static StgTSO *scheduleProcessEvent(rtsEvent *event);
#endif
#endif
static void schedulePostRunThread(void);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
-static void scheduleHandleStackOverflow( StgTSO *t);
-static rtsBool scheduleHandleYield( StgTSO *t, nat prev_what_next );
+static void scheduleHandleStackOverflow( Capability *cap, Task *task,
+ StgTSO *t);
+static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
+ nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
-static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread,
- Capability *cap, StgTSO *t );
+static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
+ StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
-static void scheduleDoGC(rtsBool force_major);
-
-static void unblockThread(StgTSO *tso);
-static rtsBool checkBlackHoles(void);
-static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
- Capability *initialCapability
- );
-static void scheduleThread_ (StgTSO* tso);
+static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
+
+static void unblockThread(Capability *cap, StgTSO *tso);
+static rtsBool checkBlackHoles(Capability *cap);
static void AllRoots(evac_fn evac);
-static StgTSO *threadStackOverflow(StgTSO *tso);
+static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
-static void raiseAsync_(StgTSO *tso, StgClosure *exception,
+static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
rtsBool stop_at_atomically);
+static void deleteThread (Capability *cap, StgTSO *tso);
+static void deleteRunQueue (Capability *cap);
+
+#ifdef DEBUG
static void printThreadBlockage(StgTSO *tso);
static void printThreadStatus(StgTSO *tso);
void printThreadQueue(StgTSO *tso);
+#endif
#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);
#endif
-/* ----------------------------------------------------------------------------
- * Starting Tasks
- * ------------------------------------------------------------------------- */
-
-#if defined(RTS_SUPPORTS_THREADS)
-static nat startingWorkerThread = 0;
-
-static void
-taskStart(void)
-{
- ACQUIRE_LOCK(&sched_mutex);
- startingWorkerThread--;
- schedule(NULL,NULL);
- taskStop();
- RELEASE_LOCK(&sched_mutex);
-}
-
-void
-startSchedulerTaskIfNecessary(void)
-{
- if ( !EMPTY_RUN_QUEUE()
- && !shutting_down_scheduler // not if we're shutting down
- && startingWorkerThread==0)
- {
- // we don't want to start another worker thread
- // just because the last one hasn't yet reached the
- // "waiting for capability" state
- startingWorkerThread++;
- if (!maybeStartNewWorker(taskStart)) {
- startingWorkerThread--;
- }
- }
-}
+#ifdef DEBUG
+static char *whatNext_strs[] = {
+ "(unknown)",
+ "ThreadRunGHC",
+ "ThreadInterpret",
+ "ThreadKilled",
+ "ThreadRelocated",
+ "ThreadComplete"
+};
#endif
/* -----------------------------------------------------------------------------
* -------------------------------------------------------------------------- */
STATIC_INLINE void
-addToRunQueue( StgTSO *t )
+addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
if (RtsFlags.ParFlags.doFairScheduling) {
// this does round-robin scheduling; good for concurrency
- APPEND_TO_RUN_QUEUE(t);
+ appendToRunQueue(cap,t);
} else {
// this does unfair scheduling; good for parallelism
- PUSH_ON_RUN_QUEUE(t);
+ pushOnRunQueue(cap,t);
}
#else
// this does round-robin scheduling; good for concurrency
- APPEND_TO_RUN_QUEUE(t);
+ appendToRunQueue(cap,t);
#endif
}
-
+
/* ---------------------------------------------------------------------------
Main scheduling loop.
* thread ends
* stack overflow
- Locking notes: we acquire the scheduler lock once at the beginning
- of the scheduler loop, and release it when
-
- * running a thread, or
- * waiting for work, or
- * waiting for a GC to complete.
-
GRAN version:
In a GranSim setup this loop iterates over the global event queue.
This revolves around the global event queue, which determines what
------------------------------------------------------------------------ */
-static void
-schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
- Capability *initialCapability )
+static Capability *
+schedule (Capability *initialCapability, Task *task)
{
StgTSO *t;
Capability *cap;
#endif
nat prev_what_next;
rtsBool ready_to_gc;
+ rtsBool first = rtsTrue;
- // Pre-condition: sched_mutex is held.
- // We might have a capability, passed in as initialCapability.
cap = initialCapability;
-#if !defined(RTS_SUPPORTS_THREADS)
- // simply initialise it in the non-threaded case
- grabCapability(&cap);
-#endif
+ // Pre-condition: this task owns initialCapability.
+ // The sched_mutex is *NOT* held
+ // NB. on return, we still hold a capability.
IF_DEBUG(scheduler,
- sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
- mainThread, initialCapability);
+ sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
+ task, initialCapability);
);
schedulePreLoop();
while (TERMINATION_CONDITION) {
+ ASSERT(cap->running_task == task);
+ ASSERT(task->cap == cap);
+ ASSERT(myTask() == task);
+
#if defined(GRAN)
/* Choose the processor with the next event */
CurrentProc = event->proc;
CurrentTSO = event->tso;
#endif
-#if defined(RTS_SUPPORTS_THREADS)
- // Yield the capability to higher-priority tasks if necessary.
- //
- if (cap != NULL) {
- yieldCapability(&cap,
- mainThread ? &mainThread->bound_thread_cond : NULL );
- }
-
- // If we do not currently hold a capability, we wait for one
- //
- if (cap == NULL) {
- waitForCapability(&sched_mutex, &cap,
- mainThread ? &mainThread->bound_thread_cond : NULL);
- }
-
- // We now have a capability...
-#endif
-
-#if 0 /* extra sanity checking */
- {
- StgMainThread *m;
- for (m = main_threads; m != NULL; m = m->link) {
- ASSERT(get_itbl(m->tso)->type == TSO);
- }
+#if defined(THREADED_RTS)
+ if (first) {
+ // don't yield the first time, we want a chance to run this
+ // thread for a bit, even if there are others banging at the
+ // door.
+ first = rtsFalse;
+ } else {
+ // Yield the capability to higher-priority tasks if necessary.
+ yieldCapability(&cap, task);
}
#endif
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
// call).
- if (cap->r.rInHaskell) {
+ if (cap->in_haskell) {
errorBelch("schedule: re-entered unsafely.\n"
" Perhaps a 'foreign import unsafe' should be 'safe'?");
- stg_exit(1);
+ stg_exit(EXIT_FAILURE);
}
//
// the threaded RTS.
//
if (interrupted) {
+ deleteRunQueue(cap);
if (shutting_down_scheduler) {
IF_DEBUG(scheduler, sched_belch("shutting down"));
- releaseCapability(cap);
- if (mainThread) {
- mainThread->stat = Interrupted;
- mainThread->ret = NULL;
+ if (task->tso) { // we are bound
+ task->stat = Interrupted;
+ task->ret = NULL;
}
- return;
+ return cap;
} else {
IF_DEBUG(scheduler, sched_belch("interrupted"));
- deleteAllThreads();
}
}
//
{
StgClosure *spark;
- if (EMPTY_RUN_QUEUE()) {
+ if (emptyRunQueue()) {
spark = findSpark(rtsFalse);
if (spark == NULL) {
break; /* no more sparks in the pool */
// During normal execution, the black hole list only gets checked
// at GC time, to avoid repeatedly traversing this possibly long
// list each time around the scheduler.
- if (EMPTY_RUN_QUEUE()) { scheduleCheckBlackHoles(); }
+ if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
- scheduleCheckBlockedThreads();
+ scheduleCheckBlockedThreads(cap);
- scheduleDetectDeadlock();
+ scheduleDetectDeadlock(cap,task);
// Normally, the only way we can get here with no threads to
// run is if a keyboard interrupt received during
//
// win32: might be here due to awaitEvent() being abandoned
// as a result of a console event having been delivered.
- if ( EMPTY_RUN_QUEUE() ) {
-#if !defined(RTS_SUPPORTS_THREADS) && !defined(mingw32_HOST_OS)
+ if ( emptyRunQueue(cap) ) {
+#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
ASSERT(interrupted);
#endif
continue; // nothing to do
#if defined(PARALLEL_HASKELL)
scheduleSendPendingMessages();
- if (EMPTY_RUN_QUEUE() && scheduleActivateSpark())
+ if (emptyRunQueue(cap) && scheduleActivateSpark())
continue;
#if defined(SPARKS)
/* If we still have no work we need to send a FISH to get a spark
from another PE */
- if (EMPTY_RUN_QUEUE()) {
+ if (emptyRunQueue(cap)) {
if (!scheduleGetRemoteWork(&receivedFinish)) continue;
ASSERT(rtsFalse); // should not happen at the moment
}
//
// Get a thread to run
//
- ASSERT(run_queue_hd != END_TSO_QUEUE);
- POP_RUN_QUEUE(t);
+ t = popRunQueue(cap);
#if defined(GRAN) || defined(PAR)
scheduleGranParReport(); // some kind of debuging output
IF_DEBUG(sanity,checkTSO(t));
#endif
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
// Check whether we can run this thread in the current task.
// If not, we have to pass our capability to the right task.
{
- StgMainThread *m = t->main;
+ Task *bound = t->bound;
- if(m)
- {
- if(m == mainThread)
- {
- IF_DEBUG(scheduler,
- sched_belch("### Running thread %d in bound thread", t->id));
- // yes, the Haskell thread is bound to the current native thread
- }
- else
- {
- IF_DEBUG(scheduler,
- sched_belch("### thread %d bound to another OS thread", t->id));
- // no, bound to a different Haskell thread: pass to that thread
- PUSH_ON_RUN_QUEUE(t);
- continue;
- }
- }
- else
- {
- if(mainThread != NULL)
- // The thread we want to run is unbound.
- {
- IF_DEBUG(scheduler,
- sched_belch("### this OS thread cannot run thread %d", t->id));
- // no, the current native thread is bound to a different
- // Haskell thread, so pass it to any worker thread
- PUSH_ON_RUN_QUEUE(t);
- continue;
+ if (bound) {
+ if (bound == task) {
+ IF_DEBUG(scheduler,
+ sched_belch("### Running thread %d in bound thread",
+ t->id));
+ // yes, the Haskell thread is bound to the current native thread
+ } else {
+ IF_DEBUG(scheduler,
+ sched_belch("### thread %d bound to another OS thread",
+ t->id));
+ // no, bound to a different Haskell thread: pass to that thread
+ pushOnRunQueue(cap,t);
+ continue;
+ }
+ } else {
+ // The thread we want to run is unbound.
+ if (task->tso) {
+ IF_DEBUG(scheduler,
+ sched_belch("### this OS thread cannot run thread %d", t->id));
+ // no, the current native thread is bound to a different
+ // Haskell thread, so pass it to any worker thread
+ pushOnRunQueue(cap,t);
+ continue;
+ }
}
- }
}
#endif
cap->r.rCurrentTSO = t;
- /* context switches are now initiated by the timer signal, unless
+ /* context switches are initiated by the timer signal, unless
* the user specified "context switch as often as possible", with
* +RTS -C0
*/
- if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
- && (run_queue_hd != END_TSO_QUEUE
- || blocked_queue_hd != END_TSO_QUEUE
- || sleeping_queue != END_TSO_QUEUE)))
+ if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
+ && !emptyThreadQueues(cap)) {
context_switch = 1;
-
+ }
+
run_thread:
- RELEASE_LOCK(&sched_mutex);
-
IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
(long)t->id, whatNext_strs[t->what_next]));
prev_what_next = t->what_next;
errno = t->saved_errno;
- cap->r.rInHaskell = rtsTrue;
+ cap->in_haskell = rtsTrue;
recent_activity = ACTIVITY_YES;
barf("schedule: invalid what_next field");
}
-#if defined(SMP)
// in SMP mode, we might return with a different capability than
// we started with, if the Haskell thread made a foreign call. So
// let's find out what our current Capability is:
- cap = myCapability();
-#endif
+ cap = task->cap;
- cap->r.rInHaskell = rtsFalse;
+ cap->in_haskell = rtsFalse;
// The TSO might have moved, eg. if it re-entered the RTS and a GC
// happened. So find the new location:
// ----------------------------------------------------------------------
- /* Costs for the scheduler are assigned to CCS_SYSTEM */
+ // Costs for the scheduler are assigned to CCS_SYSTEM
#if defined(PROFILING)
stopHeapProfTimer();
CCCS = CCS_SYSTEM;
#endif
- ACQUIRE_LOCK(&sched_mutex);
-
// We have run some Haskell code: there might be blackhole-blocked
// threads to wake up now.
+ // Lock-free test here should be ok, we're just setting a flag.
if ( blackhole_queue != END_TSO_QUEUE ) {
blackholes_need_checking = rtsTrue;
}
-#if defined(RTS_SUPPORTS_THREADS)
- IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
+#if defined(THREADED_RTS)
+ IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
#elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
IF_DEBUG(scheduler,debugBelch("sched: "););
#endif
break;
case StackOverflow:
- scheduleHandleStackOverflow(t);
+ scheduleHandleStackOverflow(cap,task,t);
break;
case ThreadYielding:
- if (scheduleHandleYield(t, prev_what_next)) {
+ if (scheduleHandleYield(cap, t, prev_what_next)) {
// shortcut for switching between compiler/interpreter:
goto run_thread;
}
break;
case ThreadFinished:
- if (scheduleHandleThreadFinished(mainThread, cap, t)) return;;
+ if (scheduleHandleThreadFinished(cap, task, t)) return cap;
break;
default:
}
if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
- if (ready_to_gc) { scheduleDoGC(rtsFalse); }
+ if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
} /* end of while() */
IF_PAR_DEBUG(verbose,
/* ----------------------------------------------------------------------------
* Setting up the scheduler loop
- * ASSUMES: sched_mutex
* ------------------------------------------------------------------------- */
static void
/* ----------------------------------------------------------------------------
* Start any pending signal handlers
- * ASSUMES: sched_mutex
* ------------------------------------------------------------------------- */
static void
scheduleStartSignalHandlers(void)
{
-#if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
- if (signals_pending()) {
- RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
- startSignalHandlers();
- ACQUIRE_LOCK(&sched_mutex);
+#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
+ if (signals_pending()) { // safe outside the lock
+ startSignalHandlers();
}
#endif
}
/* ----------------------------------------------------------------------------
* Check for blocked threads that can be woken up.
- * ASSUMES: sched_mutex
* ------------------------------------------------------------------------- */
static void
-scheduleCheckBlockedThreads(void)
+scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
{
+#if !defined(THREADED_RTS)
//
// Check whether any waiting threads need to be woken up. If the
// run queue is empty, and there are no other tasks running, we
// can wait indefinitely for something to happen.
//
- if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
+ if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
{
-#if defined(RTS_SUPPORTS_THREADS)
- // We shouldn't be here...
- barf("schedule: awaitEvent() in threaded RTS");
-#else
- awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
-#endif
+ awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
}
+#endif
}
/* ----------------------------------------------------------------------------
* Check for threads blocked on BLACKHOLEs that can be woken up
- * ASSUMES: sched_mutex
* ------------------------------------------------------------------------- */
static void
-scheduleCheckBlackHoles( void )
+scheduleCheckBlackHoles (Capability *cap)
{
- if ( blackholes_need_checking )
+ if ( blackholes_need_checking ) // check without the lock first
{
- checkBlackHoles();
- blackholes_need_checking = rtsFalse;
+ ACQUIRE_LOCK(&sched_mutex);
+ if ( blackholes_need_checking ) {
+ checkBlackHoles(cap);
+ blackholes_need_checking = rtsFalse;
+ }
+ RELEASE_LOCK(&sched_mutex);
}
}
/* ----------------------------------------------------------------------------
* Detect deadlock conditions and attempt to resolve them.
- * ASSUMES: sched_mutex
* ------------------------------------------------------------------------- */
static void
-scheduleDetectDeadlock()
+scheduleDetectDeadlock (Capability *cap, Task *task)
{
#if defined(PARALLEL_HASKELL)
* other tasks are waiting for work, we must have a deadlock of
* some description.
*/
- if ( EMPTY_THREAD_QUEUES() )
+ if ( emptyThreadQueues(cap) )
{
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
/*
* In the threaded RTS, we only check for deadlock if there
* has been no activity in a complete timeslice. This means
// they are unreachable and will therefore be sent an
// exception. Any threads thus released will be immediately
// runnable.
-
- scheduleDoGC( rtsTrue/*force major GC*/ );
+ scheduleDoGC( cap, task, rtsTrue/*force major GC*/ );
recent_activity = ACTIVITY_DONE_GC;
- if ( !EMPTY_RUN_QUEUE() ) return;
+
+ if ( !emptyRunQueue(cap) ) return;
-#if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
+#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
/* If we have user-installed signal handlers, then wait
* for signals to arrive rather then bombing out with a
* deadlock.
awaitUserSignals();
if (signals_pending()) {
- RELEASE_LOCK(&sched_mutex);
startSignalHandlers();
- ACQUIRE_LOCK(&sched_mutex);
}
// either we have threads to run, or we were interrupted:
- ASSERT(!EMPTY_RUN_QUEUE() || interrupted);
+ ASSERT(!emptyRunQueue(cap) || interrupted);
}
#endif
-#if !defined(RTS_SUPPORTS_THREADS)
+#if !defined(THREADED_RTS)
/* Probably a real deadlock. Send the current main thread the
- * Deadlock exception (or in the SMP build, send *all* main
- * threads the deadlock exception, since none of them can make
- * progress).
+ * Deadlock exception.
*/
- {
- StgMainThread *m;
- m = main_threads;
- switch (m->tso->why_blocked) {
+ if (task->tso) {
+ switch (task->tso->why_blocked) {
case BlockedOnSTM:
case BlockedOnBlackHole:
case BlockedOnException:
case BlockedOnMVar:
- raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
+ raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
return;
default:
barf("deadlock: main thread blocked in a strange way");
}
}
+ return;
#endif
}
}
scheduleActivateSpark(void)
{
#if defined(SPARKS)
- ASSERT(EMPTY_RUN_QUEUE());
+ ASSERT(emptyRunQueue());
/* We get here if the run queue is empty and want some work.
We try to turn a spark into a thread, and add it to the run queue,
from where it will be picked up in the next iteration of the scheduler
static rtsBool
scheduleGetRemoteWork(rtsBool *receivedFinish)
{
- ASSERT(EMPTY_RUN_QUEUE());
+ ASSERT(emptyRunQueue());
if (RtsFlags.ParFlags.BufferTime) {
IF_PAR_DEBUG(verbose,
/* ----------------------------------------------------------------------------
* After running a thread...
- * ASSUMES: sched_mutex
* ------------------------------------------------------------------------- */
static void
/* -----------------------------------------------------------------------------
* Handle a thread that returned to the scheduler with ThreadHeepOverflow
- * ASSUMES: sched_mutex
* -------------------------------------------------------------------------- */
static rtsBool
// run queue before us and steal the large block, but in that
// case the thread will just end up requesting another large
// block.
- PUSH_ON_RUN_QUEUE(t);
+ pushOnRunQueue(cap,t);
return rtsFalse; /* not actually GC'ing */
}
}
}
#endif
- PUSH_ON_RUN_QUEUE(t);
+ pushOnRunQueue(cap,t);
return rtsTrue;
/* actual GC is done at the end of the while loop in schedule() */
}
/* -----------------------------------------------------------------------------
* Handle a thread that returned to the scheduler with ThreadStackOverflow
- * ASSUMES: sched_mutex
* -------------------------------------------------------------------------- */
static void
-scheduleHandleStackOverflow( StgTSO *t)
+scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
{
IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n",
(long)t->id, whatNext_strs[t->what_next]));
*/
{
/* enlarge the stack */
- StgTSO *new_t = threadStackOverflow(t);
+ StgTSO *new_t = threadStackOverflow(cap, t);
/* This TSO has moved, so update any pointers to it from the
* main thread stack. It better not be on any other queues...
* (it shouldn't be).
*/
- if (t->main != NULL) {
- t->main->tso = new_t;
+ if (task->tso != NULL) {
+ task->tso = new_t;
}
- PUSH_ON_RUN_QUEUE(new_t);
+ pushOnRunQueue(cap,new_t);
}
}
/* -----------------------------------------------------------------------------
* Handle a thread that returned to the scheduler with ThreadYielding
- * ASSUMES: sched_mutex
* -------------------------------------------------------------------------- */
static rtsBool
-scheduleHandleYield( StgTSO *t, nat prev_what_next )
+scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
{
// Reset the context switch flag. We don't do this just before
// running the thread, because that would mean we would lose ticks
#endif
- addToRunQueue(t);
+ addToRunQueue(cap,t);
#if defined(GRAN)
/* add a ContinueThread event to actually process the thread */
/* -----------------------------------------------------------------------------
* Handle a thread that returned to the scheduler with ThreadBlocked
- * ASSUMES: sched_mutex
* -------------------------------------------------------------------------- */
static void
/* -----------------------------------------------------------------------------
* Handle a thread that returned to the scheduler with ThreadFinished
- * ASSUMES: sched_mutex
* -------------------------------------------------------------------------- */
static rtsBool
-scheduleHandleThreadFinished( StgMainThread *mainThread
- USED_WHEN_RTS_SUPPORTS_THREADS,
- Capability *cap,
- StgTSO *t )
+scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
{
/* Need to check whether this was a main thread, and if so,
* return with the return value.
#endif // PARALLEL_HASKELL
//
- // Check whether the thread that just completed was a main
+ // Check whether the thread that just completed was a bound
// thread, and if so return with the result.
//
// There is an assumption here that all thread completion goes
// ends up in the ThreadKilled state, that it stays on the run
// queue so it can be dealt with here.
//
- if (
-#if defined(RTS_SUPPORTS_THREADS)
- mainThread != NULL
+
+ if (t->bound) {
+
+ if (t->bound != task) {
+#if !defined(THREADED_RTS)
+ // Must be a bound thread that is not the topmost one. Leave
+ // it on the run queue until the stack has unwound to the
+ // point where we can deal with this. Leaving it on the run
+ // queue also ensures that the garbage collector knows about
+ // this thread and its return value (it gets dropped from the
+ // all_threads list so there's no other way to find it).
+ appendToRunQueue(cap,t);
+ return rtsFalse;
#else
- mainThread->tso == t
+ // this cannot happen in the threaded RTS, because a
+ // bound thread can only be run by the appropriate Task.
+ barf("finished bound thread that isn't mine");
#endif
- )
- {
- // We are a bound thread: this must be our thread that just
- // completed.
- ASSERT(mainThread->tso == t);
+ }
+
+ ASSERT(task->tso == t);
if (t->what_next == ThreadComplete) {
- if (mainThread->ret) {
+ if (task->ret) {
// NOTE: return val is tso->sp[1] (see StgStartup.hc)
- *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1];
+ *(task->ret) = (StgClosure *)task->tso->sp[1];
}
- mainThread->stat = Success;
+ task->stat = Success;
} else {
- if (mainThread->ret) {
- *(mainThread->ret) = NULL;
+ if (task->ret) {
+ *(task->ret) = NULL;
}
if (interrupted) {
- mainThread->stat = Interrupted;
+ task->stat = Interrupted;
} else {
- mainThread->stat = Killed;
+ task->stat = Killed;
}
}
#ifdef DEBUG
- removeThreadLabel((StgWord)mainThread->tso->id);
+ removeThreadLabel((StgWord)task->tso->id);
#endif
- if (mainThread->prev == NULL) {
- ASSERT(mainThread == main_threads);
- main_threads = mainThread->link;
- } else {
- mainThread->prev->link = mainThread->link;
- }
- if (mainThread->link != NULL) {
- mainThread->link->prev = mainThread->prev;
- }
- releaseCapability(cap);
return rtsTrue; // tells schedule() to return
}
-#ifdef RTS_SUPPORTS_THREADS
- ASSERT(t->main == NULL);
-#else
- if (t->main != NULL) {
- // Must be a main thread that is not the topmost one. Leave
- // it on the run queue until the stack has unwound to the
- // point where we can deal with this. Leaving it on the run
- // queue also ensures that the garbage collector knows about
- // this thread and its return value (it gets dropped from the
- // all_threads list so there's no other way to find it).
- APPEND_TO_RUN_QUEUE(t);
- }
-#endif
return rtsFalse;
}
/* -----------------------------------------------------------------------------
* Perform a garbage collection if necessary
- * ASSUMES: sched_mutex
* -------------------------------------------------------------------------- */
static void
-scheduleDoGC( rtsBool force_major )
+scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
{
StgTSO *t;
#ifdef SMP
- Capability *cap;
- static rtsBool waiting_for_gc;
- int n_capabilities = RtsFlags.ParFlags.nNodes - 1;
- // subtract one because we're already holding one.
- Capability *caps[n_capabilities];
+ static volatile StgWord waiting_for_gc;
+ rtsBool was_waiting;
+ nat i;
#endif
#ifdef SMP
// actually did the GC. But it's quite hard to arrange for all
// the other tasks to sleep and stay asleep.
//
- // This does mean that there will be multiple entries in the
- // thread->capability hash table for the current thread, but
- // they will be removed as normal when the capabilities are
- // released again.
- //
- // Someone else is already trying to GC
- if (waiting_for_gc) return;
- waiting_for_gc = rtsTrue;
-
- while (n_capabilities > 0) {
- IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
- waitForReturnCapability(&sched_mutex, &cap);
- n_capabilities--;
- caps[n_capabilities] = cap;
+ was_waiting = cas(&waiting_for_gc, 0, 1);
+ if (was_waiting) return;
+
+ for (i=0; i < n_capabilities; i++) {
+ IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
+ if (cap != &capabilities[i]) {
+ Capability *pcap = &capabilities[i];
+ // we better hope this task doesn't get migrated to
+ // another Capability while we're waiting for this one.
+ // It won't, because load balancing happens while we have
+ // all the Capabilities, but even so it's a slightly
+ // unsavoury invariant.
+ task->cap = pcap;
+ waitForReturnCapability(&pcap, task);
+ if (pcap != &capabilities[i]) {
+ barf("scheduleDoGC: got the wrong capability");
+ }
+ }
}
waiting_for_gc = rtsFalse;
if (!stmValidateNestOfTransactions (t -> trec)) {
IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
- // strip the stack back to the ATOMICALLY_FRAME, aborting
- // the (nested) transaction, and saving the stack of any
+ // strip the stack back to the
+ // ATOMICALLY_FRAME, aborting the (nested)
+ // transaction, and saving the stack of any
// partially-evaluated thunks on the heap.
- raiseAsync_(t, NULL, rtsTrue);
+ raiseAsync_(cap, t, NULL, rtsTrue);
#ifdef REG_R1
ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
}
// so this happens periodically:
- scheduleCheckBlackHoles();
+ scheduleCheckBlackHoles(cap);
IF_DEBUG(scheduler, printAllThreads());
* to do it in another thread. Either way, we need to
* broadcast on gc_pending_cond afterward.
*/
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
GarbageCollect(GetRoots, force_major);
#if defined(SMP)
- {
- // release our stash of capabilities.
- nat i;
- for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) {
- releaseCapability(caps[i]);
+ // release our stash of capabilities.
+ for (i = 0; i < n_capabilities; i++) {
+ if (cap != &capabilities[i]) {
+ task->cap = &capabilities[i];
+ releaseCapability(&capabilities[i]);
}
}
+ task->cap = cap;
#endif
#if defined(GRAN)
StgBool
rtsSupportsBoundThreads(void)
{
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
return rtsTrue;
#else
return rtsFalse;
* ------------------------------------------------------------------------- */
StgBool
-isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
+isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
{
-#if defined(RTS_SUPPORTS_THREADS)
- return (tso->main != NULL);
+#if defined(THREADED_RTS)
+ return (tso->bound != NULL);
#endif
return rtsFalse;
}
* Singleton fork(). Do not copy any running threads.
* ------------------------------------------------------------------------- */
-#ifndef mingw32_HOST_OS
+#if !defined(mingw32_HOST_OS) && !defined(SMP)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
-deleteThreadImmediately(StgTSO *tso);
+deleteThreadImmediately(Capability *cap, StgTSO *tso);
#endif
StgInt
forkProcess(HsStablePtr *entry
)
{
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
- pid_t pid;
- StgTSO* t,*next;
- StgMainThread *m;
- SchedulerStatus rc;
-
- IF_DEBUG(scheduler,sched_belch("forking!"));
- rts_lock(); // This not only acquires sched_mutex, it also
- // makes sure that no other threads are running
-
- pid = fork();
-
- if (pid) { /* parent */
-
- /* just return the pid */
- rts_unlock();
- return pid;
-
- } else { /* child */
+ pid_t pid;
+ StgTSO* t,*next;
+ Task *task;
+ Capability *cap;
+ IF_DEBUG(scheduler,sched_belch("forking!"));
- // delete all threads
- run_queue_hd = run_queue_tl = END_TSO_QUEUE;
+ // ToDo: for SMP, we should probably acquire *all* the capabilities
+ cap = rts_lock();
- for (t = all_threads; t != END_TSO_QUEUE; t = next) {
- next = t->link;
-
- // don't allow threads to catch the ThreadKilled exception
- deleteThreadImmediately(t);
- }
+ pid = fork();
- // wipe the main thread list
- while((m = main_threads) != NULL) {
- main_threads = m->link;
-# ifdef THREADED_RTS
- closeCondition(&m->bound_thread_cond);
-# endif
- stgFree(m);
+ if (pid) { // parent
+
+ // just return the pid
+ return pid;
+
+ } else { // child
+
+ // delete all threads
+ cap->run_queue_hd = END_TSO_QUEUE;
+ cap->run_queue_tl = END_TSO_QUEUE;
+
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ next = t->link;
+
+ // don't allow threads to catch the ThreadKilled exception
+ deleteThreadImmediately(cap,t);
+ }
+
+ // wipe the main thread list
+ while ((task = all_tasks) != NULL) {
+ all_tasks = task->all_link;
+ discardTask(task);
+ }
+
+ cap = rts_evalStableIO(cap, entry, NULL); // run the action
+ rts_checkSchedStatus("forkProcess",cap);
+
+ rts_unlock(cap);
+ hs_exit(); // clean up and exit
+ stg_exit(EXIT_SUCCESS);
}
-
- rc = rts_evalStableIO(entry, NULL); // run the action
- rts_checkSchedStatus("forkProcess",rc);
-
- rts_unlock();
-
- hs_exit(); // clean up and exit
- stg_exit(0);
- }
#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
- barf("forkProcess#: primop not supported, sorry!\n");
- return -1;
+ barf("forkProcess#: primop not supported on this platform, sorry!\n");
+ return -1;
#endif
}
/* ---------------------------------------------------------------------------
- * deleteAllThreads(): kill all the live threads.
- *
- * This is used when we catch a user interrupt (^C), before performing
- * any necessary cleanups and running finalizers.
- *
- * Locks: sched_mutex held.
+ * Delete the threads on the run queue of the current capability.
* ------------------------------------------------------------------------- */
-void
-deleteAllThreads ( void )
+static void
+deleteRunQueue (Capability *cap)
{
- StgTSO* t, *next;
- IF_DEBUG(scheduler,sched_belch("deleting all threads"));
- for (t = all_threads; t != END_TSO_QUEUE; t = next) {
- if (t->what_next == ThreadRelocated) {
- next = t->link;
- } else {
- next = t->global_link;
- deleteThread(t);
- }
- }
-
- // The run queue now contains a bunch of ThreadKilled threads. We
- // must not throw these away: the main thread(s) will be in there
- // somewhere, and the main scheduler loop has to deal with it.
- // Also, the run queue is the only thing keeping these threads from
- // being GC'd, and we don't want the "main thread has been GC'd" panic.
-
- ASSERT(blocked_queue_hd == END_TSO_QUEUE);
- ASSERT(blackhole_queue == END_TSO_QUEUE);
- ASSERT(sleeping_queue == END_TSO_QUEUE);
+ StgTSO *t, *next;
+ for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
+ ASSERT(t->what_next != ThreadRelocated);
+ next = t->link;
+ deleteThread(cap, t);
+ }
}
/* startThread and insertThread are now in GranSim.c -- HWL */
+/* -----------------------------------------------------------------------------
+ Managing the suspended_ccalling_tasks list.
+ Locks required: cap->lock (the owning Capability's lock; see the
+ ACQUIRE_LOCK(&cap->lock) around the call in suspendThread)
+ -------------------------------------------------------------------------- */
+
+STATIC_INLINE void
+suspendTask (Capability *cap, Task *task)
+{
+ // Push 'task' onto the front of this Capability's doubly-linked
+ // list of Tasks that are suspended inside a foreign (C) call.
+ // Precondition: the task is not currently on any list.
+ ASSERT(task->next == NULL && task->prev == NULL);
+ task->next = cap->suspended_ccalling_tasks;
+ task->prev = NULL;
+ if (cap->suspended_ccalling_tasks) {
+ cap->suspended_ccalling_tasks->prev = task;
+ }
+ // new list head is the freshly suspended task
+ cap->suspended_ccalling_tasks = task;
+}
+
+STATIC_INLINE void
+recoverSuspendedTask (Capability *cap, Task *task)
+{
+ // Unlink 'task' from cap's suspended_ccalling_tasks doubly-linked
+ // list (the inverse of suspendTask).  Called when the task returns
+ // from its foreign call and re-enters the RTS.
+ if (task->prev) {
+ task->prev->next = task->next;
+ } else {
+ // no predecessor, so the task must be the list head
+ ASSERT(cap->suspended_ccalling_tasks == task);
+ cap->suspended_ccalling_tasks = task->next;
+ }
+ if (task->next) {
+ task->next->prev = task->prev;
+ }
+ // leave the task detached, satisfying suspendTask's precondition
+ task->next = task->prev = NULL;
+}
+
/* ---------------------------------------------------------------------------
* Suspending & resuming Haskell threads.
*
* on return from the C function.
* ------------------------------------------------------------------------- */
-StgInt
-suspendThread( StgRegTable *reg )
+void *
+suspendThread (StgRegTable *reg)
{
- nat tok;
Capability *cap;
int saved_errno = errno;
+ StgTSO *tso;
+ Task *task;
- /* assume that *reg is a pointer to the StgRegTable part
- * of a Capability.
+ /* assume that *reg is a pointer to the StgRegTable part of a Capability.
*/
- cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
+ cap = regTableToCapability(reg);
- ACQUIRE_LOCK(&sched_mutex);
+ task = cap->running_task;
+ tso = cap->r.rCurrentTSO;
IF_DEBUG(scheduler,
- sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
+ sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
// XXX this might not be necessary --SDM
- cap->r.rCurrentTSO->what_next = ThreadRunGHC;
+ tso->what_next = ThreadRunGHC;
- threadPaused(cap->r.rCurrentTSO);
- cap->r.rCurrentTSO->link = suspended_ccalling_threads;
- suspended_ccalling_threads = cap->r.rCurrentTSO;
+ threadPaused(tso);
- if(cap->r.rCurrentTSO->blocked_exceptions == NULL) {
- cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
- cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
+ if(tso->blocked_exceptions == NULL) {
+ tso->why_blocked = BlockedOnCCall;
+ tso->blocked_exceptions = END_TSO_QUEUE;
} else {
- cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
+ tso->why_blocked = BlockedOnCCall_NoUnblockExc;
}
- /* Use the thread ID as the token; it should be unique */
- tok = cap->r.rCurrentTSO->id;
+ // Hand back capability
+ task->suspended_tso = tso;
+
+ ACQUIRE_LOCK(&cap->lock);
- /* Hand back capability */
- cap->r.rInHaskell = rtsFalse;
- releaseCapability(cap);
+ suspendTask(cap,task);
+ cap->in_haskell = rtsFalse;
+ releaseCapability_(cap);
-#if defined(RTS_SUPPORTS_THREADS)
+ RELEASE_LOCK(&cap->lock);
+
+#if defined(THREADED_RTS)
/* Preparing to leave the RTS, so ensure there's a native thread/task
waiting to take over.
*/
- IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
+ IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
#endif
- RELEASE_LOCK(&sched_mutex);
-
errno = saved_errno;
- return tok;
+ return task;
}
StgRegTable *
-resumeThread( StgInt tok )
+resumeThread (void *task_)
{
- StgTSO *tso, **prev;
- Capability *cap;
- int saved_errno = errno;
-
-#if defined(RTS_SUPPORTS_THREADS)
- /* Wait for permission to re-enter the RTS with the result. */
- ACQUIRE_LOCK(&sched_mutex);
- waitForReturnCapability(&sched_mutex, &cap);
-
- IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
-#else
- grabCapability(&cap);
-#endif
-
- /* Remove the thread off of the suspended list */
- prev = &suspended_ccalling_threads;
- for (tso = suspended_ccalling_threads;
- tso != END_TSO_QUEUE;
- prev = &tso->link, tso = tso->link) {
- if (tso->id == (StgThreadID)tok) {
- *prev = tso->link;
- break;
+ StgTSO *tso;
+ Capability *cap;
+ int saved_errno = errno;
+ Task *task = task_;
+
+ cap = task->cap;
+ // Wait for permission to re-enter the RTS with the result.
+ waitForReturnCapability(&cap,task);
+ // we might be on a different capability now... but if so, our
+ // entry on the suspended_ccalling_tasks list will also have been
+ // migrated.
+
+ // Remove the thread from the suspended list
+ recoverSuspendedTask(cap,task);
+
+ tso = task->suspended_tso;
+ task->suspended_tso = NULL;
+ tso->link = END_TSO_QUEUE;
+ IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
+
+ if (tso->why_blocked == BlockedOnCCall) {
+ awakenBlockedQueue(cap,tso->blocked_exceptions);
+ tso->blocked_exceptions = NULL;
}
- }
- if (tso == END_TSO_QUEUE) {
- barf("resumeThread: thread not found");
- }
- tso->link = END_TSO_QUEUE;
-
- if(tso->why_blocked == BlockedOnCCall) {
- awakenBlockedQueueNoLock(tso->blocked_exceptions);
- tso->blocked_exceptions = NULL;
- }
-
- /* Reset blocking status */
- tso->why_blocked = NotBlocked;
+
+ /* Reset blocking status */
+ tso->why_blocked = NotBlocked;
+
+ cap->r.rCurrentTSO = tso;
+ cap->in_haskell = rtsTrue;
+ errno = saved_errno;
- cap->r.rCurrentTSO = tso;
- cap->r.rInHaskell = rtsTrue;
- RELEASE_LOCK(&sched_mutex);
- errno = saved_errno;
- return &cap->r;
+ return &cap->r;
}
/* ---------------------------------------------------------------------------
createThread(nat size, StgInt pri)
#else
StgTSO *
-createThread(nat size)
+createThread(Capability *cap, nat size)
#endif
{
StgTSO *tso;
nat stack_size;
+ /* sched_mutex is *not* required */
+
/* First check whether we should create a thread at all */
#if defined(PARALLEL_HASKELL)
- /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
- if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
- threadsIgnored++;
- debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
- RtsFlags.ParFlags.maxThreads, advisory_thread_count);
- return END_TSO_QUEUE;
- }
- threadsCreated++;
+ /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
+ if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
+ threadsIgnored++;
+ debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
+ RtsFlags.ParFlags.maxThreads, advisory_thread_count);
+ return END_TSO_QUEUE;
+ }
+ threadsCreated++;
#endif
#if defined(GRAN)
- ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
+ ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
#endif
- // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
-
- /* catch ridiculously small stack sizes */
- if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
- size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
- }
+ // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
- stack_size = size - TSO_STRUCT_SIZEW;
+ /* catch ridiculously small stack sizes */
+ if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
+ size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
+ }
- tso = (StgTSO *)allocate(size);
- TICK_ALLOC_TSO(stack_size, 0);
+ stack_size = size - TSO_STRUCT_SIZEW;
+
+ tso = (StgTSO *)allocateLocal(cap, size);
+ TICK_ALLOC_TSO(stack_size, 0);
- SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
+ SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
#if defined(GRAN)
- SET_GRAN_HDR(tso, ThisPE);
+ SET_GRAN_HDR(tso, ThisPE);
#endif
- // Always start with the compiled code evaluator
- tso->what_next = ThreadRunGHC;
-
- tso->id = next_thread_id++;
- tso->why_blocked = NotBlocked;
- tso->blocked_exceptions = NULL;
+ // Always start with the compiled code evaluator
+ tso->what_next = ThreadRunGHC;
- tso->saved_errno = 0;
- tso->main = NULL;
-
- tso->stack_size = stack_size;
- tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
- - TSO_STRUCT_SIZEW;
- tso->sp = (P_)&(tso->stack) + stack_size;
-
- tso->trec = NO_TREC;
+ tso->why_blocked = NotBlocked;
+ tso->blocked_exceptions = NULL;
+
+ tso->saved_errno = 0;
+ tso->bound = NULL;
+
+ tso->stack_size = stack_size;
+ tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
+ - TSO_STRUCT_SIZEW;
+ tso->sp = (P_)&(tso->stack) + stack_size;
+ tso->trec = NO_TREC;
+
#ifdef PROFILING
- tso->prof.CCCS = CCS_MAIN;
+ tso->prof.CCCS = CCS_MAIN;
#endif
-
+
/* put a stop frame on the stack */
- tso->sp -= sizeofW(StgStopFrame);
- SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
- tso->link = END_TSO_QUEUE;
-
+ tso->sp -= sizeofW(StgStopFrame);
+ SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
+ tso->link = END_TSO_QUEUE;
+
// ToDo: check this
#if defined(GRAN)
- /* uses more flexible routine in GranSim */
- insertThread(tso, CurrentProc);
+ /* uses more flexible routine in GranSim */
+ insertThread(tso, CurrentProc);
#else
- /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
- * from its creation
- */
+ /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
+ * from its creation
+ */
#endif
-
+
#if defined(GRAN)
- if (RtsFlags.GranFlags.GranSimStats.Full)
- DumpGranEvent(GR_START,tso);
+ if (RtsFlags.GranFlags.GranSimStats.Full)
+ DumpGranEvent(GR_START,tso);
#elif defined(PARALLEL_HASKELL)
- if (RtsFlags.ParFlags.ParStats.Full)
- DumpGranEvent(GR_STARTQ,tso);
- /* HACk to avoid SCHEDULE
- LastTSO = tso; */
+ if (RtsFlags.ParFlags.ParStats.Full)
+ DumpGranEvent(GR_STARTQ,tso);
+ /* HACk to avoid SCHEDULE
+ LastTSO = tso; */
#endif
-
- /* Link the new thread on the global thread list.
- */
- tso->global_link = all_threads;
- all_threads = tso;
-
+
+ /* Link the new thread on the global thread list.
+ */
+ ACQUIRE_LOCK(&sched_mutex);
+ tso->id = next_thread_id++; // while we have the mutex
+ tso->global_link = all_threads;
+ all_threads = tso;
+ RELEASE_LOCK(&sched_mutex);
+
#if defined(DIST)
- tso->dist.priority = MandatoryPriority; //by default that is...
+ tso->dist.priority = MandatoryPriority; //by default that is...
#endif
-
+
#if defined(GRAN)
- tso->gran.pri = pri;
+ tso->gran.pri = pri;
# if defined(DEBUG)
- tso->gran.magic = TSO_MAGIC; // debugging only
+ tso->gran.magic = TSO_MAGIC; // debugging only
# endif
- tso->gran.sparkname = 0;
- tso->gran.startedat = CURRENT_TIME;
- tso->gran.exported = 0;
- tso->gran.basicblocks = 0;
- tso->gran.allocs = 0;
- tso->gran.exectime = 0;
- tso->gran.fetchtime = 0;
- tso->gran.fetchcount = 0;
- tso->gran.blocktime = 0;
- tso->gran.blockcount = 0;
- tso->gran.blockedat = 0;
- tso->gran.globalsparks = 0;
- tso->gran.localsparks = 0;
- if (RtsFlags.GranFlags.Light)
- tso->gran.clock = Now; /* local clock */
- else
- tso->gran.clock = 0;
-
- IF_DEBUG(gran,printTSO(tso));
+ tso->gran.sparkname = 0;
+ tso->gran.startedat = CURRENT_TIME;
+ tso->gran.exported = 0;
+ tso->gran.basicblocks = 0;
+ tso->gran.allocs = 0;
+ tso->gran.exectime = 0;
+ tso->gran.fetchtime = 0;
+ tso->gran.fetchcount = 0;
+ tso->gran.blocktime = 0;
+ tso->gran.blockcount = 0;
+ tso->gran.blockedat = 0;
+ tso->gran.globalsparks = 0;
+ tso->gran.localsparks = 0;
+ if (RtsFlags.GranFlags.Light)
+ tso->gran.clock = Now; /* local clock */
+ else
+ tso->gran.clock = 0;
+
+ IF_DEBUG(gran,printTSO(tso));
#elif defined(PARALLEL_HASKELL)
# if defined(DEBUG)
- tso->par.magic = TSO_MAGIC; // debugging only
+ tso->par.magic = TSO_MAGIC; // debugging only
# endif
- tso->par.sparkname = 0;
- tso->par.startedat = CURRENT_TIME;
- tso->par.exported = 0;
- tso->par.basicblocks = 0;
- tso->par.allocs = 0;
- tso->par.exectime = 0;
- tso->par.fetchtime = 0;
- tso->par.fetchcount = 0;
- tso->par.blocktime = 0;
- tso->par.blockcount = 0;
- tso->par.blockedat = 0;
- tso->par.globalsparks = 0;
- tso->par.localsparks = 0;
+ tso->par.sparkname = 0;
+ tso->par.startedat = CURRENT_TIME;
+ tso->par.exported = 0;
+ tso->par.basicblocks = 0;
+ tso->par.allocs = 0;
+ tso->par.exectime = 0;
+ tso->par.fetchtime = 0;
+ tso->par.fetchcount = 0;
+ tso->par.blocktime = 0;
+ tso->par.blockcount = 0;
+ tso->par.blockedat = 0;
+ tso->par.globalsparks = 0;
+ tso->par.localsparks = 0;
#endif
-
+
#if defined(GRAN)
- globalGranStats.tot_threads_created++;
- globalGranStats.threads_created_on_PE[CurrentProc]++;
- globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
- globalGranStats.tot_sq_probes++;
+ globalGranStats.tot_threads_created++;
+ globalGranStats.threads_created_on_PE[CurrentProc]++;
+ globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
+ globalGranStats.tot_sq_probes++;
#elif defined(PARALLEL_HASKELL)
- // collect parallel global statistics (currently done together with GC stats)
- if (RtsFlags.ParFlags.ParStats.Global &&
- RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
- //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
- globalParStats.tot_threads_created++;
- }
+ // collect parallel global statistics (currently done together with GC stats)
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
+ globalParStats.tot_threads_created++;
+ }
#endif
-
+
#if defined(GRAN)
- IF_GRAN_DEBUG(pri,
- sched_belch("==__ schedule: Created TSO %d (%p);",
- CurrentProc, tso, tso->id));
+ IF_GRAN_DEBUG(pri,
+ sched_belch("==__ schedule: Created TSO %d (%p);",
+ CurrentProc, tso, tso->id));
#elif defined(PARALLEL_HASKELL)
- IF_PAR_DEBUG(verbose,
- sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
- (long)tso->id, tso, advisory_thread_count));
+ IF_PAR_DEBUG(verbose,
+ sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
+ (long)tso->id, tso, advisory_thread_count));
#else
- IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
- (long)tso->id, (long)tso->stack_size));
+ IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
+ (long)tso->id, (long)tso->stack_size));
#endif
- return tso;
+ return tso;
}
#if defined(PAR)
/* ---------------------------------------------------------------------------
* scheduleThread()
*
- * scheduleThread puts a thread on the head of the runnable queue.
+ * scheduleThread puts a thread on the end of the runnable queue.
* This will usually be done immediately after a thread is created.
* The caller of scheduleThread must create the thread using e.g.
* createThread and push an appropriate closure
* ------------------------------------------------------------------------ */
void
-scheduleThreadLocked(StgTSO *tso)
+scheduleThread(Capability *cap, StgTSO *tso)
{
- // The thread goes at the *end* of the run-queue, to avoid possible
- // starvation of any threads already on the queue.
- APPEND_TO_RUN_QUEUE(tso);
- threadRunnable();
+ // The thread goes at the *end* of the run-queue, to avoid possible
+ // starvation of any threads already on the queue.
+ appendToRunQueue(cap,tso);
}
-void
-scheduleThread(StgTSO* tso)
+Capability *
+scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
{
- ACQUIRE_LOCK(&sched_mutex);
- scheduleThreadLocked(tso);
- RELEASE_LOCK(&sched_mutex);
-}
+ Task *task;
+
+ // We already created/initialised the Task
+ task = cap->running_task;
+
+ // This TSO is now a bound thread; make the Task and TSO
+ // point to each other.
+ tso->bound = task;
+
+ task->tso = tso;
+ task->ret = ret;
+ task->stat = NoStatus;
-#if defined(RTS_SUPPORTS_THREADS)
-static Condition bound_cond_cache;
-static int bound_cond_cache_full = 0;
+ appendToRunQueue(cap,tso);
+
+ IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
+
+#if defined(GRAN)
+ /* GranSim specific init */
+ CurrentTSO = m->tso; // the TSO to run
+ procStatus[MainProc] = Busy; // status of main PE
+ CurrentProc = MainProc; // PE to run it on
#endif
+ cap = schedule(cap,task);
+
+ ASSERT(task->stat != NoStatus);
+
+ IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
+ return cap;
+}
+
+/* ----------------------------------------------------------------------------
+ * Starting Tasks
+ * ------------------------------------------------------------------------- */
-SchedulerStatus
-scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
- Capability *initialCapability)
+#if defined(THREADED_RTS)
+void
+workerStart(Task *task)
{
- // Precondition: sched_mutex must be held
- StgMainThread *m;
-
- m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
- m->tso = tso;
- tso->main = m;
- m->ret = ret;
- m->stat = NoStatus;
- m->link = main_threads;
- m->prev = NULL;
- if (main_threads != NULL) {
- main_threads->prev = m;
- }
- main_threads = m;
-
-#if defined(RTS_SUPPORTS_THREADS)
- // Allocating a new condition for each thread is expensive, so we
- // cache one. This is a pretty feeble hack, but it helps speed up
- // consecutive call-ins quite a bit.
- if (bound_cond_cache_full) {
- m->bound_thread_cond = bound_cond_cache;
- bound_cond_cache_full = 0;
- } else {
- initCondition(&m->bound_thread_cond);
- }
-#endif
+ Capability *cap;
- /* Put the thread on the main-threads list prior to scheduling the TSO.
- Failure to do so introduces a race condition in the MT case (as
- identified by Wolfgang Thaller), whereby the new task/OS thread
- created by scheduleThread_() would complete prior to the thread
- that spawned it managed to put 'itself' on the main-threads list.
- The upshot of it all being that the worker thread wouldn't get to
- signal the completion of the its work item for the main thread to
- see (==> it got stuck waiting.) -- sof 6/02.
- */
- IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
-
- APPEND_TO_RUN_QUEUE(tso);
- // NB. Don't call threadRunnable() here, because the thread is
- // bound and only runnable by *this* OS thread, so waking up other
- // workers will just slow things down.
+ // See startWorkerTask().
+ ACQUIRE_LOCK(&task->lock);
+ cap = task->cap;
+ RELEASE_LOCK(&task->lock);
+
+ // set the thread-local pointer to the Task:
+ taskEnter(task);
+
+ // schedule() runs without a lock.
+ cap = schedule(cap,task);
- return waitThread_(m, initialCapability);
+ // On exit from schedule(), we have a Capability.
+ releaseCapability(cap);
+ taskStop(task);
}
+#endif
/* ---------------------------------------------------------------------------
* initScheduler()
{
#if defined(GRAN)
nat i;
-
for (i=0; i<=MAX_PROC; i++) {
run_queue_hds[i] = END_TSO_QUEUE;
run_queue_tls[i] = END_TSO_QUEUE;
blackhole_queue[i] = END_TSO_QUEUE;
sleeping_queue = END_TSO_QUEUE;
}
-#else
- run_queue_hd = END_TSO_QUEUE;
- run_queue_tl = END_TSO_QUEUE;
+#elif !defined(THREADED_RTS)
blocked_queue_hd = END_TSO_QUEUE;
blocked_queue_tl = END_TSO_QUEUE;
- blackhole_queue = END_TSO_QUEUE;
sleeping_queue = END_TSO_QUEUE;
-#endif
-
- suspended_ccalling_threads = END_TSO_QUEUE;
+#endif
- main_threads = NULL;
- all_threads = END_TSO_QUEUE;
+ blackhole_queue = END_TSO_QUEUE;
+ all_threads = END_TSO_QUEUE;
context_switch = 0;
interrupted = 0;
RtsFlags.ConcFlags.ctxtSwitchTicks =
RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
/* Initialise the mutex and condition variables used by
* the scheduler. */
initMutex(&sched_mutex);
- initMutex(&term_mutex);
#endif
ACQUIRE_LOCK(&sched_mutex);
* floating around (only SMP builds have more than one).
*/
initCapabilities();
-
-#if defined(RTS_SUPPORTS_THREADS)
+
initTaskManager();
-#endif
#if defined(SMP)
- /* eagerly start some extra workers */
- startingWorkerThread = RtsFlags.ParFlags.nNodes;
- startTasks(RtsFlags.ParFlags.nNodes, taskStart);
+ /*
+ * Eagerly start one worker to run each Capability, except for
+ * Capability 0. The idea is that we're probably going to start a
+ * bound thread on Capability 0 pretty soon, so we don't want a
+ * worker task hogging it.
+ */
+ {
+ nat i;
+ Capability *cap;
+ for (i = 1; i < n_capabilities; i++) {
+ cap = &capabilities[i];
+ ACQUIRE_LOCK(&cap->lock);
+ startWorkerTask(cap, workerStart);
+ RELEASE_LOCK(&cap->lock);
+ }
+ }
#endif
#if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
interrupted = rtsTrue;
shutting_down_scheduler = rtsTrue;
-#if defined(RTS_SUPPORTS_THREADS)
- if (threadIsTask(osThreadId())) { taskStop(); }
- stopTaskManager();
- //
- // What can we do here? There are a bunch of worker threads, it
- // might be nice to let them exit cleanly. There may be some main
- // threads in the run queue; we should let them return to their
- // callers with an Interrupted state. We can't in general wait
- // for all the running Tasks to stop, because some might be off in
- // a C call that is blocked.
- //
- // Letting the run queue drain is the safest thing. That lets any
- // main threads return that can return, and cleans up all the
- // runnable threads. Then we grab all the Capabilities to stop
- // anything unexpected happening while we shut down.
- //
- // ToDo: this doesn't let us get the time stats from the worker
- // tasks, because they haven't called taskStop().
- //
- ACQUIRE_LOCK(&sched_mutex);
+#if defined(THREADED_RTS)
{
+ Task *task;
nat i;
- for (i = 1000; i > 0; i--) {
- if (EMPTY_RUN_QUEUE()) {
- IF_DEBUG(scheduler, sched_belch("run queue is empty"));
- break;
- }
- IF_DEBUG(scheduler, sched_belch("yielding"));
- RELEASE_LOCK(&sched_mutex);
- prodWorker();
- yieldThread();
- ACQUIRE_LOCK(&sched_mutex);
- }
- }
-
-#ifdef SMP
- {
- Capability *cap;
- int n_capabilities = RtsFlags.ParFlags.nNodes;
- Capability *caps[n_capabilities];
- nat i;
+
+ ACQUIRE_LOCK(&sched_mutex);
+ task = newBoundTask();
+ RELEASE_LOCK(&sched_mutex);
- while (n_capabilities > 0) {
- IF_DEBUG(scheduler, sched_belch("exitScheduler: grabbing all the capabilies (%d left)", n_capabilities));
- waitForReturnCapability(&sched_mutex, &cap);
- n_capabilities--;
- caps[n_capabilities] = cap;
+ for (i = 0; i < n_capabilities; i++) {
+ shutdownCapability(&capabilities[i], task);
}
+ boundTaskExiting(task);
+ stopTaskManager();
}
-#else
- {
- Capability *cap;
- waitForReturnCapability(&sched_mutex, &cap);
- }
-#endif
-#endif
-}
-
-/* ----------------------------------------------------------------------------
- Managing the per-task allocation areas.
-
- Each capability comes with an allocation area. These are
- fixed-length block lists into which allocation can be done.
-
- ToDo: no support for two-space collection at the moment???
- ------------------------------------------------------------------------- */
-
-static SchedulerStatus
-waitThread_(StgMainThread* m, Capability *initialCapability)
-{
- SchedulerStatus stat;
-
- // Precondition: sched_mutex must be held.
- IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
-
-#if defined(GRAN)
- /* GranSim specific init */
- CurrentTSO = m->tso; // the TSO to run
- procStatus[MainProc] = Busy; // status of main PE
- CurrentProc = MainProc; // PE to run it on
- schedule(m,initialCapability);
-#else
- schedule(m,initialCapability);
- ASSERT(m->stat != NoStatus);
#endif
-
- stat = m->stat;
-
-#if defined(RTS_SUPPORTS_THREADS)
- // Free the condition variable, returning it to the cache if possible.
- if (!bound_cond_cache_full) {
- bound_cond_cache = m->bound_thread_cond;
- bound_cond_cache_full = 1;
- } else {
- closeCondition(&m->bound_thread_cond);
- }
-#endif
-
- IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
- stgFree(m);
-
- // Postcondition: sched_mutex still held
- return stat;
}
/* ---------------------------------------------------------------------------
void
GetRoots( evac_fn evac )
{
-#if defined(GRAN)
- {
nat i;
+ Capability *cap;
+ Task *task;
+
+#if defined(GRAN)
for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
- if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
- evac((StgClosure **)&run_queue_hds[i]);
- if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
- evac((StgClosure **)&run_queue_tls[i]);
-
- if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
- evac((StgClosure **)&blocked_queue_hds[i]);
- if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
- evac((StgClosure **)&blocked_queue_tls[i]);
- if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
- evac((StgClosure **)&ccalling_threads[i]);
+ if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
+ evac((StgClosure **)&run_queue_hds[i]);
+ if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
+ evac((StgClosure **)&run_queue_tls[i]);
+
+ if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
+ evac((StgClosure **)&blocked_queue_hds[i]);
+ if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
+ evac((StgClosure **)&blocked_queue_tls[i]);
+ if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
+ evac((StgClosure **)&ccalling_threads[i]);
}
- }
- markEventQueue();
+ markEventQueue();
#else /* !GRAN */
- if (run_queue_hd != END_TSO_QUEUE) {
- ASSERT(run_queue_tl != END_TSO_QUEUE);
- evac((StgClosure **)&run_queue_hd);
- evac((StgClosure **)&run_queue_tl);
- }
-
- if (blocked_queue_hd != END_TSO_QUEUE) {
- ASSERT(blocked_queue_tl != END_TSO_QUEUE);
- evac((StgClosure **)&blocked_queue_hd);
- evac((StgClosure **)&blocked_queue_tl);
- }
-
- if (sleeping_queue != END_TSO_QUEUE) {
- evac((StgClosure **)&sleeping_queue);
- }
-#endif
- if (blackhole_queue != END_TSO_QUEUE) {
- evac((StgClosure **)&blackhole_queue);
- }
+ for (i = 0; i < n_capabilities; i++) {
+ cap = &capabilities[i];
+ evac((StgClosure **)&cap->run_queue_hd);
+ evac((StgClosure **)&cap->run_queue_tl);
+
+ for (task = cap->suspended_ccalling_tasks; task != NULL;
+ task=task->next) {
+ evac((StgClosure **)&task->suspended_tso);
+ }
+ }
+
+#if !defined(THREADED_RTS)
+ evac((StgClosure **)&blocked_queue_hd);
+ evac((StgClosure **)&blocked_queue_tl);
+ evac((StgClosure **)&sleeping_queue);
+#endif
+#endif
- if (suspended_ccalling_threads != END_TSO_QUEUE) {
- evac((StgClosure **)&suspended_ccalling_threads);
- }
+ evac((StgClosure **)&blackhole_queue);
#if defined(PARALLEL_HASKELL) || defined(GRAN)
- markSparkQueue(evac);
+ markSparkQueue(evac);
#endif
-
+
#if defined(RTS_USER_SIGNALS)
- // mark the signal handlers (signals should be already blocked)
- markSignalHandlers(evac);
+ // mark the signal handlers (signals should be already blocked)
+ markSignalHandlers(evac);
#endif
}
void
performGC(void)
{
- /* Obligated to hold this lock upon entry */
- ACQUIRE_LOCK(&sched_mutex);
- GarbageCollect(GetRoots,rtsFalse);
- RELEASE_LOCK(&sched_mutex);
+#ifdef THREADED_RTS
+ // ToDo: we have to grab all the capabilities here.
+ errorBelch("performGC not supported in threaded RTS (yet)");
+ stg_exit(EXIT_FAILURE);
+#endif
+ /* Obligated to hold this lock upon entry */
+ GarbageCollect(GetRoots,rtsFalse);
}
void
performMajorGC(void)
{
- ACQUIRE_LOCK(&sched_mutex);
- GarbageCollect(GetRoots,rtsTrue);
- RELEASE_LOCK(&sched_mutex);
+#ifdef THREADED_RTS
+ errorBelch("performMajorGC not supported in threaded RTS (yet)");
+ stg_exit(EXIT_FAILURE);
+#endif
+ GarbageCollect(GetRoots,rtsTrue);
}
static void
void
performGCWithRoots(void (*get_roots)(evac_fn))
{
- ACQUIRE_LOCK(&sched_mutex);
- extra_roots = get_roots;
- GarbageCollect(AllRoots,rtsFalse);
- RELEASE_LOCK(&sched_mutex);
+#ifdef THREADED_RTS
+ errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
+ stg_exit(EXIT_FAILURE);
+#endif
+ extra_roots = get_roots;
+ GarbageCollect(AllRoots,rtsFalse);
}
/* -----------------------------------------------------------------------------
-------------------------------------------------------------------------- */
static StgTSO *
-threadStackOverflow(StgTSO *tso)
+threadStackOverflow(Capability *cap, StgTSO *tso)
{
nat new_stack_size, stack_words;
lnat new_tso_size;
tso->sp+64)));
/* Send this thread the StackOverflow exception */
- raiseAsync(tso, (StgClosure *)stackOverflow_closure);
+ raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
return tso;
}
new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
- IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
+ IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", tso->stack_size, new_stack_size));
dest = (StgTSO *)allocate(new_tso_size);
TICK_ALLOC_TSO(new_stack_size,0);
DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
0, 0 /* spark_queue_len(ADVISORY_POOL) */);
- if (EMPTY_RUN_QUEUE())
+ if (emptyRunQueue())
emitSchedule = rtsTrue;
switch (get_itbl(node)->type) {
break;
#endif
default:
- barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
+ barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
}
}
}
#if defined(GRAN)
StgBlockingQueueElement *
-unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
+unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
{
StgTSO *tso;
PEs node_loc, tso_loc;
}
#elif defined(PARALLEL_HASKELL)
StgBlockingQueueElement *
-unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
+unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
{
StgBlockingQueueElement *next;
break;
default:
- barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
+ barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
(StgClosure *)bqe);
# endif
IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
return next;
}
+#endif
-#else /* !GRAN && !PARALLEL_HASKELL */
StgTSO *
-unblockOneLocked(StgTSO *tso)
+unblockOne(Capability *cap, StgTSO *tso)
{
StgTSO *next;
tso->why_blocked = NotBlocked;
next = tso->link;
tso->link = END_TSO_QUEUE;
- APPEND_TO_RUN_QUEUE(tso);
- threadRunnable();
+
+ // We might have just migrated this TSO to our Capability:
+ if (tso->bound) {
+ tso->bound->cap = cap;
+ }
+
+ appendToRunQueue(cap,tso);
+
+ // we're holding a newly woken thread, make sure we context switch
+ // quickly so we can migrate it if necessary.
+ context_switch = 1;
IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
return next;
}
-#endif
-#if defined(GRAN) || defined(PARALLEL_HASKELL)
-INLINE_ME StgBlockingQueueElement *
-unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
-{
- ACQUIRE_LOCK(&sched_mutex);
- bqe = unblockOneLocked(bqe, node);
- RELEASE_LOCK(&sched_mutex);
- return bqe;
-}
-#else
-INLINE_ME StgTSO *
-unblockOne(StgTSO *tso)
-{
- ACQUIRE_LOCK(&sched_mutex);
- tso = unblockOneLocked(tso);
- RELEASE_LOCK(&sched_mutex);
- return tso;
-}
-#endif
#if defined(GRAN)
void
//tso = (StgTSO *)bqe; // wastes an assignment to get the type right
//tso_loc = where_is(tso);
len++;
- bqe = unblockOneLocked(bqe, node);
+ bqe = unblockOne(bqe, node);
}
/* if this is the BQ of an RBH, we have to put back the info ripped out of
{
StgBlockingQueueElement *bqe;
- ACQUIRE_LOCK(&sched_mutex);
-
IF_PAR_DEBUG(verbose,
debugBelch("##-_ AwBQ for node %p on [%x]: \n",
node, mytid));
bqe = q;
while (get_itbl(bqe)->type==TSO ||
get_itbl(bqe)->type==BLOCKED_FETCH) {
- bqe = unblockOneLocked(bqe, node);
+ bqe = unblockOne(bqe, node);
}
- RELEASE_LOCK(&sched_mutex);
}
#else /* !GRAN && !PARALLEL_HASKELL */
void
-awakenBlockedQueueNoLock(StgTSO *tso)
-{
- if (tso == NULL) return; // hack; see bug #1235728, and comments in
- // Exception.cmm
- while (tso != END_TSO_QUEUE) {
- tso = unblockOneLocked(tso);
- }
-}
-
-void
-awakenBlockedQueue(StgTSO *tso)
+awakenBlockedQueue(Capability *cap, StgTSO *tso)
{
- if (tso == NULL) return; // hack; see bug #1235728, and comments in
- // Exception.cmm
- ACQUIRE_LOCK(&sched_mutex);
- while (tso != END_TSO_QUEUE) {
- tso = unblockOneLocked(tso);
- }
- RELEASE_LOCK(&sched_mutex);
+ if (tso == NULL) return; // hack; see bug #1235728, and comments in
+ // Exception.cmm
+ while (tso != END_TSO_QUEUE) {
+ tso = unblockOne(cap,tso);
+ }
}
#endif
{
interrupted = 1;
context_switch = 1;
- threadRunnable();
- /* ToDo: if invoked from a signal handler, this threadRunnable
- * only works if there's another thread (not this one) waiting to
- * be woken up.
- */
+#if defined(THREADED_RTS)
+ prodAllCapabilities();
+#endif
}
/* -----------------------------------------------------------------------------
*/
static void
-unblockThread(StgTSO *tso)
+unblockThread(Capability *cap, StgTSO *tso)
{
StgBlockingQueueElement *t, **last;
tso->link = END_TSO_QUEUE;
tso->why_blocked = NotBlocked;
tso->block_info.closure = NULL;
- PUSH_ON_RUN_QUEUE(tso);
+ pushOnRunQueue(cap,tso);
}
#else
static void
-unblockThread(StgTSO *tso)
+unblockThread(Capability *cap, StgTSO *tso)
{
StgTSO *t, **last;
barf("unblockThread (Exception): TSO not found");
}
+#if !defined(THREADED_RTS)
case BlockedOnRead:
case BlockedOnWrite:
#if defined(mingw32_HOST_OS)
}
barf("unblockThread (delay): TSO not found");
}
+#endif
default:
barf("unblockThread");
tso->link = END_TSO_QUEUE;
tso->why_blocked = NotBlocked;
tso->block_info.closure = NULL;
- APPEND_TO_RUN_QUEUE(tso);
+ appendToRunQueue(cap,tso);
}
#endif
* -------------------------------------------------------------------------- */
static rtsBool
-checkBlackHoles( void )
+checkBlackHoles (Capability *cap)
{
StgTSO **prev, *t;
rtsBool any_woke_up = rtsFalse;
StgHalfWord type;
+ // blackhole_queue is global:
+ ASSERT_LOCK_HELD(&sched_mutex);
+
IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
// ASSUMES: sched_mutex
type = get_itbl(t->block_info.closure)->type;
if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
IF_DEBUG(sanity,checkTSO(t));
- t = unblockOneLocked(t);
+ t = unblockOne(cap, t);
+ // urk, the threads migrate to the current capability
+ // here, but we'd like to keep them on the original one.
*prev = t;
any_woke_up = rtsTrue;
} else {
* CATCH_FRAME on the stack. In either case, we strip the entire
* stack and replace the thread with a zombie.
*
- * Locks: sched_mutex held upon entry nor exit.
+ * ToDo: in SMP mode, this function is only safe if either (a) we hold
+ * all the Capabilities (eg. in GC), or (b) we own the Capability that
+ * the TSO is currently blocked on or on the run queue of.
*
* -------------------------------------------------------------------------- */
-void
-deleteThread(StgTSO *tso)
-{
- if (tso->why_blocked != BlockedOnCCall &&
- tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
- raiseAsync(tso,NULL);
- }
-}
-
-#ifdef FORKPROCESS_PRIMOP_SUPPORTED
-static void
-deleteThreadImmediately(StgTSO *tso)
-{ // for forkProcess only:
- // delete thread without giving it a chance to catch the KillThread exception
-
- if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
- return;
- }
-
- if (tso->why_blocked != BlockedOnCCall &&
- tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
- unblockThread(tso);
- }
-
- tso->what_next = ThreadKilled;
-}
-#endif
-
void
-raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
+raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
{
- /* When raising async exs from contexts where sched_mutex isn't held;
- use raiseAsyncWithLock(). */
- ACQUIRE_LOCK(&sched_mutex);
- raiseAsync(tso,exception);
- RELEASE_LOCK(&sched_mutex);
-}
-
-void
-raiseAsync(StgTSO *tso, StgClosure *exception)
-{
- raiseAsync_(tso, exception, rtsFalse);
+ raiseAsync_(cap, tso, exception, rtsFalse);
}
static void
-raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
+raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
+ rtsBool stop_at_atomically)
{
StgRetInfoTable *info;
StgPtr sp;
sched_belch("raising exception in thread %ld.", (long)tso->id));
// Remove it from any blocking queues
- unblockThread(tso);
+ unblockThread(cap,tso);
sp = tso->sp;
// we've got an exception to raise, so let's pass it to the
// handler in this frame.
//
- raise = (StgThunk *)allocate(sizeofW(StgThunk)+MIN_UPD_SIZE);
+ raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
TICK_ALLOC_SE_THK(1,0);
SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
raise->payload[0] = exception;
// fun field.
//
words = frame - sp - 1;
- ap = (StgAP_STACK *)allocate(AP_STACK_sizeW(words));
+ ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
ap->size = words;
ap->fun = (StgClosure *)sp[0];
}
/* -----------------------------------------------------------------------------
+ Deleting threads
+
+ This is used for interruption (^C) and forking, and corresponds to
+ raising an exception but without letting the thread catch the
+ exception.
+ -------------------------------------------------------------------------- */
+
+static void
+deleteThread (Capability *cap, StgTSO *tso)
+{
+ if (tso->why_blocked != BlockedOnCCall &&
+ tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
+ raiseAsync(cap,tso,NULL);
+ }
+}
+
+#ifdef FORKPROCESS_PRIMOP_SUPPORTED
+static void
+deleteThreadImmediately(Capability *cap, StgTSO *tso)
+{ // for forkProcess only:
+ // delete thread without giving it a chance to catch the KillThread exception
+
+ if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
+ return;
+ }
+
+ if (tso->why_blocked != BlockedOnCCall &&
+ tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
+ unblockThread(cap,tso);
+ }
+
+ tso->what_next = ThreadKilled;
+}
+#endif
+
+/* -----------------------------------------------------------------------------
raiseExceptionHelper
This function is called by the raise# primitve, just so that we can
-------------------------------------------------------------------------- */
StgWord
-raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
+raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
{
+ Capability *cap = regTableToCapability(reg);
StgThunk *raise_closure = NULL;
StgPtr p, next;
StgRetInfoTable *info;
// Only create raise_closure if we need to.
if (raise_closure == NULL) {
raise_closure =
- (StgThunk *)allocate(sizeofW(StgThunk)+MIN_UPD_SIZE);
+ (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
SET_HDR(raise_closure, &stg_raise_info, CCCS);
raise_closure->payload[0] = exception;
}
on an MVar, or NonTermination if the thread was blocked on a Black
Hole.
- Locks: sched_mutex isn't held upon entry nor exit.
+ Locks: assumes we hold *all* the capabilities.
-------------------------------------------------------------------------- */
void
-resurrectThreads( StgTSO *threads )
+resurrectThreads (StgTSO *threads)
{
- StgTSO *tso, *next;
-
- for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
- next = tso->global_link;
- tso->global_link = all_threads;
- all_threads = tso;
- IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
+ StgTSO *tso, *next;
+ Capability *cap;
- switch (tso->why_blocked) {
- case BlockedOnMVar:
- case BlockedOnException:
- /* Called by GC - sched_mutex lock is currently held. */
- raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
- break;
- case BlockedOnBlackHole:
- raiseAsync(tso,(StgClosure *)NonTermination_closure);
- break;
- case BlockedOnSTM:
- raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
- break;
- case NotBlocked:
- /* This might happen if the thread was blocked on a black hole
- * belonging to a thread that we've just woken up (raiseAsync
- * can wake up threads, remember...).
- */
- continue;
- default:
- barf("resurrectThreads: thread blocked in a strange way");
+ for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
+ next = tso->global_link;
+ tso->global_link = all_threads;
+ all_threads = tso;
+ IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
+
+ // Wake up the thread on the Capability it was last on for a
+ // bound thread, or last_free_capability otherwise.
+ if (tso->bound) {
+ cap = tso->bound->cap;
+ } else {
+ cap = last_free_capability;
+ }
+
+ switch (tso->why_blocked) {
+ case BlockedOnMVar:
+ case BlockedOnException:
+ /* Called by GC - sched_mutex lock is currently held. */
+ raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
+ break;
+ case BlockedOnBlackHole:
+ raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
+ break;
+ case BlockedOnSTM:
+ raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
+ break;
+ case NotBlocked:
+ /* This might happen if the thread was blocked on a black hole
+ * belonging to a thread that we've just woken up (raiseAsync
+ * can wake up threads, remember...).
+ */
+ continue;
+ default:
+ barf("resurrectThreads: thread blocked in a strange way");
+ }
}
- }
}
/* ----------------------------------------------------------------------------
* at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
------------------------------------------------------------------------- */
+#if DEBUG
static void
printThreadBlockage(StgTSO *tso)
{
for (t = all_threads; t != END_TSO_QUEUE; ) {
debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
-#if defined(DEBUG)
{
void *label = lookupThreadLabel(t->id);
if (label) debugBelch("[\"%s\"] ",(char *)label);
}
-#endif
if (t->what_next == ThreadRelocated) {
debugBelch("has been relocated...\n");
t = t->link;
}
}
-#ifdef DEBUG
-
// useful from gdb
void
printThreadQueue(StgTSO *t)
static nat
run_queue_len(void)
{
- nat i;
- StgTSO *tso;
-
- for (i=0, tso=run_queue_hd;
- tso != END_TSO_QUEUE;
- i++, tso=tso->link)
- /* nothing */
-
- return i;
+ nat i;
+ StgTSO *tso;
+
+ for (i=0, tso=run_queue_hd;
+ tso != END_TSO_QUEUE;
+ i++, tso=tso->link) {
+ /* nothing */
+ }
+
+ return i;
}
#endif
void
sched_belch(char *s, ...)
{
- va_list ap;
- va_start(ap,s);
-#ifdef RTS_SUPPORTS_THREADS
- debugBelch("sched (task %p): ", osThreadId());
+ va_list ap;
+ va_start(ap,s);
+#ifdef THREADED_RTS
+ debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
#elif defined(PARALLEL_HASKELL)
- debugBelch("== ");
+ debugBelch("== ");
#else
- debugBelch("sched: ");
+ debugBelch("sched: ");
#endif
- vdebugBelch(s, ap);
- debugBelch("\n");
- va_end(ap);
+ vdebugBelch(s, ap);
+ debugBelch("\n");
+ va_end(ap);
}
#endif /* DEBUG */
/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team 1998-1999
+ * (c) The GHC Team 1998-2005
*
* Prototypes for functions in Schedule.c
* (RTS internal scheduler interface)
*
* -------------------------------------------------------------------------*/
-#ifndef __SCHEDULE_H__
-#define __SCHEDULE_H__
+#ifndef SCHEDULE_H
+#define SCHEDULE_H
+
#include "OSThreads.h"
+#include "Capability.h"
-/* initScheduler(), exitScheduler(), startTasks()
- *
+/* initScheduler(), exitScheduler()
* Called from STG : no
* Locks assumed : none
*/
-extern void initScheduler ( void );
-extern void exitScheduler ( void );
+void initScheduler (void);
+void exitScheduler (void);
+
+// Place a new thread on the run queue of the specified Capability
+void scheduleThread (Capability *cap, StgTSO *tso);
/* awakenBlockedQueue()
*
* Takes a pointer to the beginning of a blocked TSO queue, and
* wakes up the entire queue.
- *
* Called from STG : yes
* Locks assumed : none
*/
#elif defined(PAR)
void awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node);
#else
-void awakenBlockedQueue (StgTSO *tso);
-void awakenBlockedQueueNoLock (StgTSO *tso);
+void awakenBlockedQueue (Capability *cap, StgTSO *tso);
#endif
-/* Version of scheduleThread that doesn't take sched_mutex */
-void scheduleThreadLocked(StgTSO *tso);
-
/* unblockOne()
*
- * Takes a pointer to the beginning of a blocked TSO queue, and
- * removes the first thread, placing it on the runnable queue.
- *
- * Called from STG : yes
- * Locks assumed : none
+ * Put the specified thread on the run queue of the given Capability.
+ * Called from STG : yes
+ * Locks assumed : we own the Capability.
*/
-#if defined(GRAN) || defined(PAR)
-StgBlockingQueueElement *unblockOne(StgBlockingQueueElement *bqe, StgClosure *node);
-StgBlockingQueueElement *unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node);
-#else
-StgTSO *unblockOne(StgTSO *tso);
-StgTSO *unblockOneLocked(StgTSO *tso);
-#endif
+StgTSO * unblockOne(Capability *cap, StgTSO *tso);
/* raiseAsync()
*
* Called from STG : yes
* Locks assumed : none
*/
-void raiseAsync(StgTSO *tso, StgClosure *exception);
-void raiseAsyncWithLock(StgTSO *tso, StgClosure *exception);
+void raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception);
/* raiseExceptionHelper */
-StgWord raiseExceptionHelper (StgTSO *tso, StgClosure *exception);
+StgWord raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception);
/* findRetryFrameHelper */
StgWord findRetryFrameHelper (StgTSO *tso);
-/* awaitEvent(rtsBool wait)
- *
- * Checks for blocked threads that need to be woken.
- *
- * Called from STG : NO
- * Locks assumed : sched_mutex
- */
-void awaitEvent(rtsBool wait); /* In Select.c */
-
-/* wakeUpSleepingThreads(nat ticks)
- *
- * Wakes up any sleeping threads whose timers have expired.
- *
- * Called from STG : NO
- * Locks assumed : sched_mutex
- */
-rtsBool wakeUpSleepingThreads(lnat); /* In Select.c */
-
-/* wakeBlockedWorkerThread()
- *
- * If a worker thread is currently blocked in awaitEvent(), interrupt it.
- *
- * Called from STG : NO
- * Locks assumed : sched_mutex
- */
-void wakeBlockedWorkerThread(void); /* In Select.c */
-
-/* resetWorkerWakeupPipeAfterFork()
- *
- * Notify Select.c that a fork() has occured
- *
- * Called from STG : NO
- * Locks assumed : don't care, but must be called right after fork()
- */
-void resetWorkerWakeupPipeAfterFork(void); /* In Select.c */
-
/* GetRoots(evac_fn f)
*
* Call f() for each root known to the scheduler.
*/
void GetRoots(evac_fn);
+/* workerStart()
+ *
+ * Entry point for a new worker task.
+ * Called from STG : NO
+ * Locks assumed : none
+ */
+void workerStart(Task *task);
+
// ToDo: check whether all fcts below are used in the SMP version, too
#if defined(GRAN)
void awaken_blocked_queue(StgBlockingQueueElement *q, StgClosure *node);
#endif
/* Context switch flag.
- * Locks required : sched_mutex
+ * Locks required : none (conflicts are harmless)
*/
extern int RTS_VAR(context_switch);
+
+/* Interrupted flag.
+ * Locks required : none (makes one transition from false->true)
+ */
extern rtsBool RTS_VAR(interrupted);
+/* Shutdown flag.
+ * Locks required : none (makes one transition from false->true)
+ */
+extern rtsBool shutting_down_scheduler;
+
/*
* flag that tracks whether we have done any execution in this time slice.
*/
#define ACTIVITY_MAYBE_NO 1 /* no activity in the current slice */
#define ACTIVITY_INACTIVE 2 /* a complete slice has passed with no activity */
#define ACTIVITY_DONE_GC 3 /* like 2, but we've done a GC too */
-extern nat recent_activity;
-/* In Select.c */
-extern lnat RTS_VAR(timestamp);
+/* Recent activity flag.
+ * Locks required : Transition from MAYBE_NO to INACTIVE
+ * happens in the timer signal, so it is atomic. Trnasition from
+ * INACTIVE to DONE_GC happens under sched_mutex. No lock required
+ * to set it to ACTIVITY_YES.
+ */
+extern nat recent_activity;
/* Thread queues.
* Locks required : sched_mutex
#if defined(GRAN)
// run_queue_hds defined in GranSim.h
#else
-extern StgTSO *RTS_VAR(run_queue_hd), *RTS_VAR(run_queue_tl);
-extern StgTSO *RTS_VAR(blocked_queue_hd), *RTS_VAR(blocked_queue_tl);
extern StgTSO *RTS_VAR(blackhole_queue);
+#if !defined(THREADED_RTS)
+extern StgTSO *RTS_VAR(blocked_queue_hd), *RTS_VAR(blocked_queue_tl);
extern StgTSO *RTS_VAR(sleeping_queue);
#endif
-/* Linked list of all threads. */
+#endif
+
+/* Linked list of all threads.
+ * Locks required : sched_mutex
+ */
extern StgTSO *RTS_VAR(all_threads);
/* Set to rtsTrue if there are threads on the blackhole_queue, and
* This flag is set to rtsFalse after we've checked the queue, and
* set to rtsTrue just before we run some Haskell code. It is used
* to decide whether we should yield the Capability or not.
+ * Locks required : none (see scheduleCheckBlackHoles()).
*/
extern rtsBool blackholes_need_checking;
-#if defined(RTS_SUPPORTS_THREADS)
-/* Schedule.c has detailed info on what these do */
-extern Mutex RTS_VAR(sched_mutex);
-extern Condition RTS_VAR(returning_worker_cond);
-extern nat RTS_VAR(rts_n_waiting_workers);
-extern nat RTS_VAR(rts_n_waiting_tasks);
+#if defined(THREADED_RTS)
+extern Mutex RTS_VAR(sched_mutex);
#endif
StgBool isThreadBound(StgTSO *tso);
-extern SchedulerStatus rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret);
-
+SchedulerStatus rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret);
/* Called by shutdown_handler(). */
-void interruptStgRts ( void );
-
-void raiseAsync(StgTSO *tso, StgClosure *exception);
-nat run_queue_len(void);
+void interruptStgRts (void);
-void resurrectThreads( StgTSO * );
-
-/* Main threads:
- *
- * These are the threads which clients have requested that we run.
- *
- * In a 'threaded' build, each of these corresponds to one bound thread.
- * The pointer to the StgMainThread is passed as a parameter to schedule;
- * this invocation of schedule will always pass this main thread's
- * bound_thread_cond to waitForkWorkCapability; OS-thread-switching
- * takes place using passCapability.
- *
- * In non-threaded builds, clients are strictly nested: the first client calls
- * into the RTS, which might call out again to C with a _ccall_GC, and
- * eventually re-enter the RTS.
- *
- * This is non-abstract at the moment because the garbage collector
- * treats pointers to TSOs from the main thread list as "weak" - these
- * pointers won't prevent a thread from receiving a BlockedOnDeadMVar
- * exception.
- *
- * Main threads information is kept in a linked list:
- */
-typedef struct StgMainThread_ {
- StgTSO * tso;
- SchedulerStatus stat;
- StgClosure ** ret;
-#if defined(RTS_SUPPORTS_THREADS)
- Condition bound_thread_cond;
-#endif
- struct StgMainThread_ *prev;
- struct StgMainThread_ *link;
-} StgMainThread;
+nat run_queue_len (void);
-/* Main thread queue.
- * Locks required: sched_mutex.
- */
-extern StgMainThread *main_threads;
+void resurrectThreads (StgTSO *);
void printAllThreads(void);
-#ifdef COMPILING_SCHEDULER
-static void printThreadBlockage(StgTSO *tso);
-static void printThreadStatus(StgTSO *tso);
-#endif
+
/* debugging only
*/
#ifdef DEBUG
void labelThread(StgPtr tso, char *label);
/* -----------------------------------------------------------------------------
- * Some convenient macros...
+ * Some convenient macros/inline functions...
*/
+#if !IN_STG_CODE
+
/* END_TSO_QUEUE and friends now defined in includes/StgMiscClosures.h */
/* Add a thread to the end of the run queue.
* NOTE: tso->link should be END_TSO_QUEUE before calling this macro.
+ * ASSUMES: cap->running_task is the current task.
*/
-#define APPEND_TO_RUN_QUEUE(tso) \
- ASSERT(tso->link == END_TSO_QUEUE); \
- if (run_queue_hd == END_TSO_QUEUE) { \
- run_queue_hd = tso; \
- } else { \
- run_queue_tl->link = tso; \
- } \
- run_queue_tl = tso;
+STATIC_INLINE void
+appendToRunQueue (Capability *cap, StgTSO *tso)
+{
+ ASSERT(tso->link == END_TSO_QUEUE);
+ if (cap->run_queue_hd == END_TSO_QUEUE) {
+ cap->run_queue_hd = tso;
+ } else {
+ cap->run_queue_tl->link = tso;
+ }
+ cap->run_queue_tl = tso;
+}
/* Push a thread on the beginning of the run queue. Used for
* newly awakened threads, so they get run as soon as possible.
+ * ASSUMES: cap->running_task is the current task.
*/
-#define PUSH_ON_RUN_QUEUE(tso) \
- tso->link = run_queue_hd; \
- run_queue_hd = tso; \
- if (run_queue_tl == END_TSO_QUEUE) { \
- run_queue_tl = tso; \
+STATIC_INLINE void
+pushOnRunQueue (Capability *cap, StgTSO *tso)
+{
+ tso->link = cap->run_queue_hd;
+ cap->run_queue_hd = tso;
+ if (cap->run_queue_tl == END_TSO_QUEUE) {
+ cap->run_queue_tl = tso;
}
+}
/* Pop the first thread off the runnable queue.
*/
-#define POP_RUN_QUEUE(pt) \
- do { StgTSO *__tmp_t = run_queue_hd; \
- if (__tmp_t != END_TSO_QUEUE) { \
- run_queue_hd = __tmp_t->link; \
- __tmp_t->link = END_TSO_QUEUE; \
- if (run_queue_hd == END_TSO_QUEUE) { \
- run_queue_tl = END_TSO_QUEUE; \
- } \
- } \
- pt = __tmp_t; \
- } while(0)
+STATIC_INLINE StgTSO *
+popRunQueue (Capability *cap)
+{
+ StgTSO *t = cap->run_queue_hd;
+ ASSERT(t != END_TSO_QUEUE);
+ cap->run_queue_hd = t->link;
+ t->link = END_TSO_QUEUE;
+ if (cap->run_queue_hd == END_TSO_QUEUE) {
+ cap->run_queue_tl = END_TSO_QUEUE;
+ }
+ return t;
+}
/* Add a thread to the end of the blocked queue.
*/
-#define APPEND_TO_BLOCKED_QUEUE(tso) \
- ASSERT(tso->link == END_TSO_QUEUE); \
- if (blocked_queue_hd == END_TSO_QUEUE) { \
- blocked_queue_hd = tso; \
- } else { \
- blocked_queue_tl->link = tso; \
- } \
+#if !defined(THREADED_RTS)
+STATIC_INLINE void
+appendToBlockedQueue(StgTSO *tso)
+{
+ ASSERT(tso->link == END_TSO_QUEUE);
+ if (blocked_queue_hd == END_TSO_QUEUE) {
+ blocked_queue_hd = tso;
+ } else {
+ blocked_queue_tl->link = tso;
+ }
blocked_queue_tl = tso;
+}
+#endif
/* Check whether various thread queues are empty
*/
-#define EMPTY_QUEUE(q) (q == END_TSO_QUEUE)
-
-#define EMPTY_RUN_QUEUE() (EMPTY_QUEUE(run_queue_hd))
-#define EMPTY_BLOCKED_QUEUE() (EMPTY_QUEUE(blocked_queue_hd))
-#define EMPTY_SLEEPING_QUEUE() (EMPTY_QUEUE(sleeping_queue))
-
-#define EMPTY_THREAD_QUEUES() (EMPTY_RUN_QUEUE() && \
- EMPTY_BLOCKED_QUEUE() && \
- EMPTY_SLEEPING_QUEUE())
+STATIC_INLINE rtsBool
+emptyQueue (StgTSO *q)
+{
+ return (q == END_TSO_QUEUE);
+}
+
+STATIC_INLINE rtsBool
+emptyRunQueue(Capability *cap)
+{
+ return emptyQueue(cap->run_queue_hd);
+}
+
+#if !defined(THREADED_RTS)
+#define EMPTY_BLOCKED_QUEUE() (emptyQueue(blocked_queue_hd))
+#define EMPTY_SLEEPING_QUEUE() (emptyQueue(sleeping_queue))
+#endif
-#if defined(RTS_SUPPORTS_THREADS)
-/* If no task is waiting for a capability,
- * and if there is work to be done
- * or if we need to wait for IO or delay requests,
- * spawn a new worker thread.
- */
-void
-startSchedulerTaskIfNecessary(void);
+STATIC_INLINE rtsBool
+emptyThreadQueues(Capability *cap)
+{
+ return emptyRunQueue(cap)
+#if !defined(THREADED_RTS)
+ && EMPTY_BLOCKED_QUEUE() && EMPTY_SLEEPING_QUEUE()
#endif
+ ;
+}
#ifdef DEBUG
-extern void sched_belch(char *s, ...)
+void sched_belch(char *s, ...)
GNU_ATTRIBUTE(format (printf, 1, 2));
#endif
-#endif /* __SCHEDULE_H__ */
+#endif /* !IN_STG_CODE */
+
+#endif /* SCHEDULE_H */
+
+++ /dev/null
-/* -----------------------------------------------------------------------------
- *
- * (c) The GHC Team, 1998-1999
- *
- * Signal processing / handling.
- *
- * ---------------------------------------------------------------------------*/
-
-#if !defined(PAR) && !defined(mingw32_HOST_OS)
-#define RTS_USER_SIGNALS 1
-
-extern void initUserSignals(void);
-extern void blockUserSignals(void);
-extern void unblockUserSignals(void);
-
-extern rtsBool anyUserHandlers(void);
-
-#if !defined(RTS_SUPPORTS_THREADS)
-
-extern StgPtr pending_handler_buf[];
-extern StgPtr *next_pending_handler;
-#define signals_pending() (next_pending_handler != pending_handler_buf)
-extern void awaitUserSignals(void);
-
-#else
-
-extern void startSignalHandler(int sig);
-
-#endif
-
-/* sig_install declared in PrimOps.h */
-
-extern void startSignalHandlers(void);
-extern void markSignalHandlers (evac_fn evac);
-extern void initDefaultHandlers(void);
-
-#elif defined(mingw32_HOST_OS)
-#define RTS_USER_SIGNALS 1
-#include "win32/ConsoleHandler.h"
-
-#else /* PAR */
-#define signals_pending() (rtsFalse)
-#define handleSignalsInThisThread() /* nothing */
-
-#endif /* PAR */
*
* ---------------------------------------------------------------------------*/
+#ifndef SPARKS_H
+#define SPARKS_H
+
#if defined(GRAN)
void findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res);
#endif
#endif
+
+#endif /* SPARKS_H */
/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team, 1998-2004
+ * (c) The GHC Team, 1998-2005
*
* Statistics and timing-related functions.
*
#include "ParTicky.h" /* ToDo: move into Rts.h */
#include "Profiling.h"
#include "Storage.h"
-#include "Task.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
* calculate the EXIT time. The real MutUserTime is calculated
* in stat_exit below.
*/
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
MutUserTime = user;
#else
MutUserTime = user - GC_tot_time - PROF_VAL(RP_tot_time + HC_tot_time) - InitUserTime;
TICK_TYPE user, elapsed;
getTimes( &user, &elapsed );
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
ExitUserTime = user - MutUserTime;
#else
ExitUserTime = user - MutUserTime - GC_tot_time - PROF_VAL(RP_tot_time + HC_tot_time) - InitUserTime;
-------------------------------------------------------------------------- */
void
-stat_endGC(lnat alloc, lnat collect, lnat live, lnat copied, lnat scavd_copied, lnat gen)
+stat_endGC (lnat alloc, lnat collect, lnat live, lnat copied,
+ lnat scavd_copied, lnat gen)
{
TICK_TYPE user, elapsed;
GC_tot_time += gc_time;
GCe_tot_time += gc_etime;
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
{
- TaskInfo *task_info = taskOfId(osThreadId());
-
- if (task_info != NULL) {
- task_info->gc_time += gc_time;
- task_info->gc_etime += gc_etime;
+ Task *task;
+ if ((task = myTask()) != NULL) {
+ task->gc_time += gc_time;
+ task->gc_etime += gc_etime;
}
}
#endif
for (g = 0; g < RtsFlags.GcFlags.generations; g++)
total_collections += generations[g].collections;
- /* For SMP, we have to get the user time from each thread
+ /* For THREADED_RTS, we have to get the user time from each Task
* and try to work out the total time.
*/
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
{
- nat i;
+ Task *task;
MutUserTime = 0.0;
- for (i = 0; i < taskCount; i++) {
- MutUserTime += taskTable[i].mut_time;
+ for (task = all_tasks; task != NULL; task = task->all_link) {
+ MutUserTime += task->mut_time;
}
}
time = MutUserTime + GC_tot_time + InitUserTime + ExitUserTime;
statsPrintf("\n%11ld Mb total memory in use\n\n",
mblocks_allocated * MBLOCK_SIZE / (1024 * 1024));
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
{
nat i;
- for (i = 0; i < taskCount; i++) {
+ Task *task;
+ for (i = 0, task = all_tasks;
+ task != NULL;
+ i++, task = task->all_link) {
statsPrintf(" Task %2d %-8s : MUT time: %6.2fs (%6.2fs elapsed)\n"
" GC time: %6.2fs (%6.2fs elapsed)\n\n",
i,
- taskTable[i].is_worker ? "(worker)" : "(bound)",
- TICK_TO_DBL(taskTable[i].mut_time),
- TICK_TO_DBL(taskTable[i].mut_etime),
- TICK_TO_DBL(taskTable[i].gc_time),
- TICK_TO_DBL(taskTable[i].gc_etime));
+ (task->tso == NULL) ? "(worker)" : "(bound)",
+ TICK_TO_DBL(task->mut_time),
+ TICK_TO_DBL(task->mut_etime),
+ TICK_TO_DBL(task->gc_time),
+ TICK_TO_DBL(task->gc_etime));
}
}
#endif
/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team, 1998-1999
+ * (c) The GHC Team, 1998-2005
*
* Statistics and timing-related functions.
*
* ---------------------------------------------------------------------------*/
+#ifndef STATS_H
+#define STATS_H
+
+#include "Task.h"
+
extern void stat_startInit(void);
extern void stat_endInit(void);
extern void stat_startGC(void);
-extern void stat_endGC(lnat alloc, lnat collect, lnat live,
- lnat copied, lnat scavd_copied, lnat gen);
+extern void stat_endGC (lnat alloc, lnat collect, lnat live,
+ lnat copied, lnat scavd_copied, lnat gen);
#ifdef PROFILING
extern void stat_startRP(void);
extern void stat_getTimes ( long *currentElapsedTime,
long *currentUserTime,
long *elapsedGCTime );
+
+#endif /* STATS_H */
TICK_ENT_BH();
+#ifdef SMP
+ // foreign "C" debugBelch("BLACKHOLE entry\n");
+#endif
+
/* Actually this is not necessary because R1 is about to be destroyed. */
LDV_ENTER(R1);
LDV_ENTER(R1);
#if defined(SMP)
+ // foreign "C" debugBelch("BLACKHOLE entry\n");
+#endif
+
+#if defined(SMP)
foreign "C" ACQUIRE_LOCK(sched_mutex "ptr");
// released in stg_block_blackhole_finally
#endif
initMutex(&sm_mutex);
#endif
+ ACQUIRE_SM_LOCK;
+
/* allocate generation info array */
generations = (generation *)stgMallocBytes(RtsFlags.GcFlags.generations
* sizeof(struct generation_),
}
#ifdef SMP
- n_nurseries = RtsFlags.ParFlags.nNodes;
+ n_nurseries = n_capabilities;
nurseries = stgMallocBytes (n_nurseries * sizeof(struct step_),
"initStorage: nurseries");
#else
#ifdef SMP
if (RtsFlags.GcFlags.generations == 1) {
errorBelch("-G1 is incompatible with SMP");
- stg_exit(1);
+ stg_exit(EXIT_FAILURE);
}
#endif
mp_set_memory_functions(stgAllocForGMP, stgReallocForGMP, stgDeallocForGMP);
IF_DEBUG(gc, statDescribeGens());
+
+ RELEASE_SM_LOCK;
}
void
-------------------------------------------------------------------------- */
StgPtr
-allocateLocal( StgRegTable *reg, nat n )
+allocateLocal (Capability *cap, nat n)
{
bdescr *bd;
StgPtr p;
/* small allocation (<LARGE_OBJECT_THRESHOLD) */
} else {
- bd = reg->rCurrentAlloc;
+ bd = cap->r.rCurrentAlloc;
if (bd == NULL || bd->free + n > bd->start + BLOCK_SIZE_W) {
// The CurrentAlloc block is full, we need to find another
// one. First, we try taking the next block from the
// nursery:
- bd = reg->rCurrentNursery->link;
+ bd = cap->r.rCurrentNursery->link;
if (bd == NULL || bd->free + n > bd->start + BLOCK_SIZE_W) {
// The nursery is empty, or the next block is already
// full: allocate a fresh block (we can't fail here).
ACQUIRE_SM_LOCK;
bd = allocBlock();
- reg->rNursery->n_blocks++;
+ cap->r.rNursery->n_blocks++;
RELEASE_SM_LOCK;
bd->gen_no = 0;
bd->step = g0s0;
// we have a block in the nursery: take it and put
// it at the *front* of the nursery list, and use it
// to allocate() from.
- reg->rCurrentNursery->link = bd->link;
+ cap->r.rCurrentNursery->link = bd->link;
if (bd->link != NULL) {
- bd->link->u.back = reg->rCurrentNursery;
+ bd->link->u.back = cap->r.rCurrentNursery;
}
}
- dbl_link_onto(bd, ®->rNursery->blocks);
- reg->rCurrentAlloc = bd;
- IF_DEBUG(sanity, checkNurserySanity(reg->rNursery));
+ dbl_link_onto(bd, &cap->r.rNursery->blocks);
+ cap->r.rCurrentAlloc = bd;
+ IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
}
}
p = bd->free;
/* allocate and fill it in. */
#if defined(SMP)
- arr = (StgArrWords *)allocateLocal(&(myCapability()->r), total_size_in_words);
+ arr = (StgArrWords *)allocateLocal(myTask()->cap, total_size_in_words);
#else
- arr = (StgArrWords *)allocateLocal(&MainCapability.r, total_size_in_words);
+ arr = (StgArrWords *)allocateLocal(&MainCapability, total_size_in_words);
#endif
SET_ARR_HDR(arr, &stg_ARR_WORDS_info, CCCS, data_size_in_words);
* -------------------------------------------------------------------------*/
#include "Rts.h"
-#if defined(RTS_SUPPORTS_THREADS) /* to the end */
#include "RtsUtils.h"
#include "OSThreads.h"
#include "Task.h"
+#include "Capability.h"
#include "Stats.h"
#include "RtsFlags.h"
#include "Schedule.h"
#include "Hash.h"
-#include "Capability.h"
#if HAVE_SIGNAL_H
#include <signal.h>
#endif
-#define INIT_TASK_TABLE_SIZE 16
-
-TaskInfo* taskTable;
-static nat taskTableSize;
-
-// maps OSThreadID to TaskInfo*
-HashTable *taskHash;
-
-nat taskCount;
+// Task lists and global counters.
+// Locks required: sched_mutex.
+Task *all_tasks = NULL;
+static Task *task_free_list = NULL; // singly-linked
+static nat taskCount;
+#define DEFAULT_MAX_WORKERS 64
+static nat maxWorkers; // we won't create more workers than this
static nat tasksRunning;
static nat workerCount;
-#define DEFAULT_MAX_WORKERS 64
-nat maxWorkers; // we won't create more workers than this
+/* -----------------------------------------------------------------------------
+ * Remembering the current thread's Task
+ * -------------------------------------------------------------------------- */
+
+// A thread-local-storage key that we can use to get access to the
+// current thread's Task structure.
+#if defined(THREADED_RTS)
+ThreadLocalKey currentTaskKey;
+#else
+Task *my_task;
+#endif
+
+/* -----------------------------------------------------------------------------
+ * Rest of the Task API
+ * -------------------------------------------------------------------------- */
void
initTaskManager (void)
static int initialized = 0;
if (!initialized) {
-#if defined(SMP)
- taskTableSize = stg_max(INIT_TASK_TABLE_SIZE,
- RtsFlags.ParFlags.nNodes * 2);
-#else
- taskTableSize = INIT_TASK_TABLE_SIZE;
-#endif
- taskTable = stgMallocBytes( taskTableSize * sizeof(TaskInfo),
- "initTaskManager");
-
taskCount = 0;
workerCount = 0;
tasksRunning = 0;
-
- taskHash = allocHashTable();
-
maxWorkers = DEFAULT_MAX_WORKERS;
-
initialized = 1;
+#if defined(THREADED_RTS)
+ newThreadLocalKey(¤tTaskKey);
+#endif
}
}
-static void
-expandTaskTable (void)
-{
- nat i;
-
- taskTableSize *= 2;
- taskTable = stgReallocBytes(taskTable, taskTableSize * sizeof(TaskInfo),
- "expandTaskTable");
-
- /* Have to update the hash table now... */
- for (i = 0; i < taskCount; i++) {
- removeHashTable( taskHash, taskTable[i].id, NULL );
- insertHashTable( taskHash, taskTable[i].id, &taskTable[i] );
- }
-}
void
stopTaskManager (void)
}
-rtsBool
-startTasks (nat num, void (*taskStart)(void))
-{
- nat i;
- for (i = 0; i < num; i++) {
- if (!startTask(taskStart)) {
- return rtsFalse;
- }
- }
- return rtsTrue;
-}
-
-static TaskInfo*
-newTask (OSThreadId id, rtsBool is_worker)
+static Task*
+newTask (void)
{
+#if defined(THREADED_RTS)
long currentElapsedTime, currentUserTime, elapsedGCTime;
- TaskInfo *task_info;
+#endif
+ Task *task;
- if (taskCount >= taskTableSize) {
- expandTaskTable();
- }
+ task = stgMallocBytes(sizeof(Task), "newTask");
- insertHashTable( taskHash, id, &(taskTable[taskCount]) );
+ task->cap = NULL;
+ task->stopped = rtsFalse;
+ task->suspended_tso = NULL;
+ task->tso = NULL;
+ task->stat = NoStatus;
+ task->ret = NULL;
+#if defined(THREADED_RTS)
+ initCondition(&task->cond);
+ initMutex(&task->lock);
+ task->wakeup = rtsFalse;
+#endif
+
+#if defined(THREADED_RTS)
stat_getTimes(¤tElapsedTime, ¤tUserTime, &elapsedGCTime);
-
- task_info = &taskTable[taskCount];
-
- task_info->id = id;
- task_info->is_worker = is_worker;
- task_info->stopped = rtsFalse;
- task_info->mut_time = 0.0;
- task_info->mut_etime = 0.0;
- task_info->gc_time = 0.0;
- task_info->gc_etime = 0.0;
- task_info->muttimestart = currentUserTime;
- task_info->elapsedtimestart = currentElapsedTime;
-
+ task->mut_time = 0.0;
+ task->mut_etime = 0.0;
+ task->gc_time = 0.0;
+ task->gc_etime = 0.0;
+ task->muttimestart = currentUserTime;
+ task->elapsedtimestart = currentElapsedTime;
+#endif
+
+ task->prev = NULL;
+ task->next = NULL;
+ task->return_link = NULL;
+
+ task->all_link = all_tasks;
+ all_tasks = task;
+
taskCount++;
workerCount++;
- tasksRunning++;
- IF_DEBUG(scheduler,sched_belch("startTask: new task %ld (total_count: %d; waiting: %d)\n", id, taskCount, rts_n_waiting_tasks););
-
- return task_info;
+ return task;
}
-rtsBool
-startTask (void (*taskStart)(void))
+Task *
+newBoundTask (void)
{
- int r;
- OSThreadId tid;
+ Task *task;
+
+ ASSERT_LOCK_HELD(&sched_mutex);
+ if (task_free_list == NULL) {
+ task = newTask();
+ } else {
+ task = task_free_list;
+ task_free_list = task->next;
+ task->next = NULL;
+ task->prev = NULL;
+ task->stopped = rtsFalse;
+ }
+#if defined(THREADED_RTS)
+ task->id = osThreadId();
+#endif
+ ASSERT(task->cap == NULL);
- r = createOSThread(&tid,taskStart);
- if (r != 0) {
- barf("startTask: Can't create new task");
- }
- newTask (tid, rtsTrue);
- return rtsTrue;
+ tasksRunning++;
+
+ taskEnter(task);
+
+ IF_DEBUG(scheduler,sched_belch("new task (taskCount: %d)", taskCount););
+ return task;
}
-TaskInfo *
-threadIsTask (OSThreadId id)
+void
+boundTaskExiting (Task *task)
{
- TaskInfo *task_info;
-
- task_info = lookupHashTable(taskHash, id);
- if (task_info != NULL) {
- if (task_info->stopped) {
- task_info->stopped = rtsFalse;
- }
- return task_info;
- }
+ task->stopped = rtsTrue;
+ task->cap = NULL;
+
+#if defined(THREADED_RTS)
+ ASSERT(osThreadId() == task->id);
+#endif
+ ASSERT(myTask() == task);
+ setMyTask(task->prev_stack);
- return newTask(id, rtsFalse);
+ tasksRunning--;
+
+ // sadly, we need a lock around the free task list. Todo: eliminate.
+ ACQUIRE_LOCK(&sched_mutex);
+ task->next = task_free_list;
+ task_free_list = task;
+ RELEASE_LOCK(&sched_mutex);
+
+ IF_DEBUG(scheduler,sched_belch("task exiting"));
}
-TaskInfo *
-taskOfId (OSThreadId id)
+void
+discardTask (Task *task)
{
- return lookupHashTable(taskHash, id);
+ ASSERT_LOCK_HELD(&sched_mutex);
+#if defined(THREADED_RTS)
+ closeCondition(&task->cond);
+#endif
+ task->stopped = rtsTrue;
+ task->cap = NULL;
+ task->next = task_free_list;
+ task_free_list = task;
}
void
-taskStop (void)
+taskStop (Task *task)
{
+#if defined(THREADED_RTS)
OSThreadId id;
long currentElapsedTime, currentUserTime, elapsedGCTime;
- TaskInfo *task_info;
id = osThreadId();
- task_info = taskOfId(id);
- if (task_info == NULL) {
- debugBelch("taskStop: not a task");
- return;
- }
- ASSERT(task_info->id == id);
+ ASSERT(task->id == id);
+ ASSERT(myTask() == task);
stat_getTimes(¤tElapsedTime, ¤tUserTime, &elapsedGCTime);
- task_info->mut_time =
- currentUserTime - task_info->muttimestart - task_info->gc_time;
- task_info->mut_etime =
- currentElapsedTime - task_info->elapsedtimestart - elapsedGCTime;
+ task->mut_time =
+ currentUserTime - task->muttimestart - task->gc_time;
+ task->mut_etime =
+ currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
- if (task_info->mut_time < 0.0) { task_info->mut_time = 0.0; }
- if (task_info->mut_etime < 0.0) { task_info->mut_etime = 0.0; }
+ if (task->mut_time < 0.0) { task->mut_time = 0.0; }
+ if (task->mut_etime < 0.0) { task->mut_etime = 0.0; }
+#endif
- task_info->stopped = rtsTrue;
+ task->stopped = rtsTrue;
tasksRunning--;
}
void
resetTaskManagerAfterFork (void)
{
- rts_n_waiting_tasks = 0;
+#warning TODO!
taskCount = 0;
}
-rtsBool
-maybeStartNewWorker (void (*taskStart)(void))
+#if defined(THREADED_RTS)
+
+void
+startWorkerTask (Capability *cap,
+ void OSThreadProcAttr (*taskStart)(Task *task))
{
- /*
- * If more than one worker thread is known to be blocked waiting
- * on thread_ready_cond, don't create a new one.
- */
- if ( rts_n_waiting_tasks > 0) {
- IF_DEBUG(scheduler,sched_belch(
- "startTask: %d tasks waiting, not creating new one",
- rts_n_waiting_tasks););
- // the task will run as soon as a capability is available,
- // so there's no need to wake it.
- return rtsFalse;
- }
-
- /* If the task limit has been reached, just return. */
- if (maxWorkers > 0 && workerCount >= maxWorkers) {
- IF_DEBUG(scheduler,sched_belch("startTask: worker limit (%d) reached, not creating new one",maxWorkers));
- return rtsFalse;
- }
-
- return startTask(taskStart);
+ int r;
+ OSThreadId tid;
+ Task *task;
+
+ if (workerCount >= maxWorkers) {
+ barf("too many workers; runaway worker creation?");
+ }
+ workerCount++;
+
+ // A worker always gets a fresh Task structure.
+ task = newTask();
+
+ tasksRunning++;
+
+ // The lock here is to synchronise with taskStart(), to make sure
+ // that we have finished setting up the Task structure before the
+ // worker thread reads it.
+ ACQUIRE_LOCK(&task->lock);
+
+ task->cap = cap;
+
+ // Give the capability directly to the worker; we can't let anyone
+ // else get in, because the new worker Task has nowhere to go to
+ // sleep so that it could be woken up again.
+ ASSERT_LOCK_HELD(&cap->lock);
+ cap->running_task = task;
+
+ r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
+ if (r != 0) {
+ barf("startTask: Can't create new task");
+ }
+
+ IF_DEBUG(scheduler,sched_belch("new worker task (taskCount: %d)", taskCount););
+
+ task->id = tid;
+
+ // ok, finished with the Task struct.
+ RELEASE_LOCK(&task->lock);
}
-#endif /* RTS_SUPPORTS_THREADS */
+#endif /* THREADED_RTS */
+
+#ifdef DEBUG
+
+static void *taskId(Task *task)
+{
+#ifdef THREADED_RTS
+ return (void *)task->id;
+#else
+ return (void *)task;
+#endif
+}
+
+void printAllTasks(void);
+
+void
+printAllTasks(void)
+{
+ Task *task;
+ for (task = all_tasks; task != NULL; task = task->all_link) {
+ debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
+ if (!task->stopped) {
+ if (task->cap) {
+ debugBelch("on capability %d, ", task->cap->no);
+ }
+ if (task->tso) {
+ debugBelch("bound to thread %d", task->tso->id);
+ } else {
+ debugBelch("worker");
+ }
+ }
+ debugBelch("\n");
+ }
+}
+
+#endif
+
*
* -------------------------------------------------------------------------*/
-#ifndef __TASK_H__
-#define __TASK_H__
+#ifndef TASK_H
+#define TASK_H
-/* Definition of a Task:
- *
- * A task is an OSThread that runs Haskell code. Every OSThread
- * created by the RTS for the purposes of running Haskell code is a
- * Task. We maintain information about Tasks mainly for the purposes
- * of stats gathering.
- *
- * There may exist OSThreads that run Haskell code, but which aren't
- * tasks (they don't have an associated TaskInfo structure). This
- * happens when a thread makes an in-call to Haskell: we don't want to
- * create a Task for every in-call and register stats for all these
- * threads, so it is not therefore mandatory to have a Task for every
- * thread running Haskell code.
- *
- * The SMP build lets multiple tasks concurrently execute STG code,
- * all sharing vital internal RTS data structures in a controlled manner.
- *
- * The 'threaded' build has at any one time only one task executing STG
- * code, other tasks are either busy executing code outside the RTS
- * (e.g., a C call) or waiting for their turn to (again) evaluate some
- * STG code. A task relinquishes its RTS token when it is asked to
- * evaluate an external (C) call.
- */
-
-#if defined(RTS_SUPPORTS_THREADS) /* to the end */
/*
- * Tasks evaluate Haskell code; the TaskInfo structure collects together
- * misc metadata about a task.
- */
-typedef struct _TaskInfo {
- OSThreadId id;
- rtsBool is_worker; /* rtsFalse <=> is a bound thread */
- rtsBool stopped; /* this task has stopped or exited Haskell */
- long elapsedtimestart;
- long muttimestart;
- long mut_time;
- long mut_etime;
- long gc_time;
- long gc_etime;
-} TaskInfo;
-
-extern TaskInfo *taskTable;
-extern nat taskCount;
-
-/*
- * Start and stop the task manager.
- * Requires: sched_mutex.
- */
-extern void initTaskManager (void);
-extern void stopTaskManager (void);
-
-/*
- * Two ways to start tasks: either singly or in a batch
- * Requires: sched_mutex.
- */
-extern rtsBool startTasks (nat num, void (*taskStart)(void));
-extern rtsBool startTask (void (*taskStart)(void));
-
-/*
- * Notify the task manager that a task has stopped. This is used
- * mainly for stats-gathering purposes.
- * Requires: sched_mutex.
- */
-extern void taskStop (void);
-
-/*
- * After a fork, the tasks are not carried into the child process, so
- * we must tell the task manager.
- * Requires: sched_mutex.
- */
-extern void resetTaskManagerAfterFork (void);
-
-/*
- * Tell the task manager that the current OS thread is now a task,
- * because it has entered Haskell as a bound thread.
- * Requires: sched_mutex.
- */
-extern TaskInfo* threadIsTask (OSThreadId id);
-
-/*
- * Get the TaskInfo structure corresponding to an OSThread. Returns
- * NULL if the thread is not a task.
- * Requires: sched_mutex.
- */
-extern TaskInfo* taskOfId (OSThreadId id);
-
-/*
- * Decides whether to call startTask() or not, based on how many
- * workers are already running and waiting for work. Returns
- * rtsTrue if a worker was created.
- * Requires: sched_mutex.
- */
-extern rtsBool maybeStartNewWorker (void (*taskStart)(void));
-
-#endif /* RTS_SUPPORTS_THREADS */
-#endif /* __TASK_H__ */
+ Definition of a Task
+ --------------------
+
+ A task is an OSThread that runs Haskell code. Every OSThread
+ created by the RTS for the purposes of running Haskell code is a
+ Task, and OS threads that enter the Haskell RTS for the purposes of
+ making a call-in are also Tasks.
+
+ The relationship between the number of tasks and capabilities, and
+ the runtime build (-threaded, -smp etc.) is summarised by the
+ following table:
+
+ build Tasks Capabilities
+ ---------------------------------
+ normal 1 1
+ -threaded N 1
+ -smp N N
+
+ The non-threaded build has a single Task and a single global
+ Capability.
+
+ The 'threaded' build has multiple Tasks, but a single Capability.
+ At any one time only one task is executing STG code; other tasks are
+ either busy executing code outside the RTS (e.g., a C call) or
+ waiting for their turn to (again) evaluate some STG code. A task
+ relinquishes its RTS token when it is asked to evaluate an external
+ (C) call.
+
+ The SMP build allows multiple tasks and multiple Capabilities.
+ Multiple Tasks may all be running Haskell code simultaneously.
+
+ In general, there may be multiple Tasks for an OS thread. This
+ happens if one Task makes a foreign call from Haskell, and
+ subsequently calls back in to create a new bound thread.
+
+ A particular Task structure can belong to more than one OS thread
+ over its lifetime. This is to avoid creating an unbounded number
+ of Task structures. The stats just accumulate.
+
+ Ownership of Task
+ -----------------
+
+ The OS thread named in the Task structure has exclusive access to
+ the structure, as long as it is the running_task of its Capability.
+ That is, if (task->cap->running_task == task), then task->id owns
+ the Task. Otherwise the Task is owned by the owner of the parent
+ data structure on which it is sleeping; for example, if the task is
+ sleeping on spare_workers field of a Capability, then the owner of the
+ Capability has access to the Task.
+
+ When a task is migrated from sleeping on one Capability to another,
+ its task->cap field must be modified. When the task wakes up, it
+ will read the new value of task->cap to find out which Capability
+ it belongs to. Hence some synchronisation is required on
+ task->cap, and this is why we have task->lock.
+
+ If the Task is not currently owned by task->id, then the thread is
+ either
+
+ (a) waiting on the condition task->cond. The Task is either
+ (1) a bound Task, the TSO will be on a queue somewhere
+ (2) a worker task, on the spare_workers queue of task->cap.
+
+ (b) making a foreign call. The Task will be on the
+ suspended_ccalling_tasks list.
+
+ We re-establish ownership in each case by respectively
+
+ (a) the task is currently blocked in yieldCapability().
+ This call will return when we have ownership of the Task and
+ a Capability. The Capability we get might not be the same
+ as the one we had when we called yieldCapability().
+
+ (b) we must call resumeThread(task), which will safely establish
+ ownership of the Task and a Capability.
+*/
+
+typedef struct Task_ {
+#if defined(THREADED_RTS)
+ OSThreadId id; // The OS Thread ID of this task
+#endif
+
+ // This points to the Capability that the Task "belongs" to. If
+ // the Task owns a Capability, then task->cap points to it. If
+ // the task does not own a Capability, then either (a) if the task
+ // is a worker, then task->cap points to the Capability it belongs
+ // to, or (b) it is returning from a foreign call, then task->cap
+ // points to the Capability with the returning_worker queue that
+ // this Task is on.
+ //
+ // When a task goes to sleep, it may be migrated to a different
+ // Capability. Hence, we always check task->cap on wakeup. To
+ // synchronise between the migrator and the migratee, task->lock
+ // must be held when modifying task->cap.
+ struct Capability_ *cap;
+
+ rtsBool stopped; // this task has stopped or exited Haskell
+ StgTSO * suspended_tso; // the TSO is stashed here when we
+ // make a foreign call (NULL otherwise);
+
+ // The following 3 fields are used by bound threads:
+ StgTSO * tso; // the bound TSO (or NULL)
+ SchedulerStatus stat; // return status
+ StgClosure ** ret; // return value
+
+#if defined(THREADED_RTS)
+ Condition cond; // used for sleeping & waking up this task
+ Mutex lock; // lock for the condition variable
+
+ // this flag tells the task whether it should wait on task->cond
+ // or just continue immediately. It's a workaround for the fact
+ // that signalling a condition variable doesn't do anything if the
+ // thread is already running, but we want it to be sticky.
+ rtsBool wakeup;
+#endif
+
+ // Stats that we collect about this task
+ // ToDo: we probably want to put this in a separate TaskStats
+ // structure, so we can share it between multiple Tasks. We don't
+ // really want separate stats for each call in a nested chain of
+ // foreign->haskell->foreign->haskell calls, but we'll get a
+ // separate Task for each of the haskell calls.
+ long elapsedtimestart;
+ long muttimestart;
+ long mut_time;
+ long mut_etime;
+ long gc_time;
+ long gc_etime;
+
+ // Links tasks onto various lists. (ToDo: do we need double
+ // linking now?)
+ struct Task_ *prev;
+ struct Task_ *next;
+
+ // Links tasks on the returning_tasks queue of a Capability.
+ struct Task_ *return_link;
+
+ // Links tasks on the all_tasks list
+ struct Task_ *all_link;
+
+ // When a Haskell thread makes a foreign call that re-enters
+ // Haskell, we end up with another Task associated with the
+ // current thread. We have to remember the whole stack of Tasks
+ // associated with the current thread so that we can correctly
+ // save & restore the thread-local current task pointer.
+ struct Task_ *prev_stack;
+} Task;
+
+INLINE_HEADER rtsBool
+isBoundTask (Task *task)
+{
+ return (task->tso != NULL);
+}
+
+
+// Linked list of all tasks.
+//
+extern Task *all_tasks;
+
+// Start and stop the task manager.
+// Requires: sched_mutex.
+//
+void initTaskManager (void);
+void stopTaskManager (void);
+
+// Create a new Task for a bound thread
+// Requires: sched_mutex.
+//
+Task *newBoundTask (void);
+
+// The current task is a bound task that is exiting.
+// Requires: sched_mutex.
+//
+void boundTaskExiting (Task *task);
+
+// This must be called when a new Task is associated with the current
+// thread. It sets up the thread-local current task pointer so that
+// myTask() can work.
+INLINE_HEADER void taskEnter (Task *task);
+
+// Notify the task manager that a task has stopped. This is used
+// mainly for stats-gathering purposes.
+// Requires: sched_mutex.
+//
+void taskStop (Task *task);
+
+// Put the task back on the free list, mark it stopped. Used by
+// forkProcess().
+//
+void discardTask (Task *task);
+
+// Get the Task associated with the current OS thread (or NULL if none).
+//
+INLINE_HEADER Task *myTask (void);
+
+// After a fork, the tasks are not carried into the child process, so
+// we must tell the task manager.
+// Requires: sched_mutex.
+//
+void resetTaskManagerAfterFork (void);
+
+#if defined(THREADED_RTS)
+
+// Workers are attached to the supplied Capability. This Capability
+// should not currently have a running_task, because the new task
+// will become the running_task for that Capability.
+// Requires: sched_mutex.
+//
+void startWorkerTask (struct Capability_ *cap,
+ void OSThreadProcAttr (*taskStart)(Task *task));
+
+#endif /* THREADED_RTS */
+
+// -----------------------------------------------------------------------------
+// INLINE functions... private from here on down:
+
+// A thread-local-storage key that we can use to get access to the
+// current thread's Task structure.
+#if defined(THREADED_RTS)
+extern ThreadLocalKey currentTaskKey;
+#else
+extern Task *my_task;
+#endif
+
+//
+// myTask() uses thread-local storage to find the Task associated with
+// the current OS thread. If the current OS thread has multiple
+// Tasks, because it has re-entered the RTS, then the task->prev_stack
+// field is used to store the previous Task.
+//
+INLINE_HEADER Task *
+myTask (void)
+{
+#if defined(THREADED_RTS)
+ return getThreadLocalVar(&currentTaskKey);
+#else
+ return my_task;
+#endif
+}
+
+INLINE_HEADER void
+setMyTask (Task *task)
+{
+#if defined(THREADED_RTS)
+ setThreadLocalVar(&currentTaskKey,task);
+#else
+ my_task = task;
+#endif
+}
+
+// This must be called when a new Task is associated with the current
+// thread. It sets up the thread-local current task pointer so that
+// myTask() can work.
+INLINE_HEADER void
+taskEnter (Task *task)
+{
+ // save the current value, just in case this Task has been created
+ // as a result of re-entering the RTS (defaults to NULL):
+ task->prev_stack = myTask();
+ setMyTask(task);
+}
+
+#endif /* TASK_H */
--- /dev/null
+/* -----------------------------------------------------------------------------
+ *
+ * (c) The GHC Team 2005
+ *
+ * Ticker interface (implementation is OS-specific)
+ *
+ * ---------------------------------------------------------------------------*/
+
+#ifndef TICKER_H
+#define TICKER_H
+
+extern int startTicker( nat ms, TickProc handle_tick );
+extern int stopTicker ( void );
+
+#endif /* TICKER_H */
/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team, 1995-2003
+ * (c) The GHC Team, 1995-2005
*
* Interval timer service for profiling and pre-emptive scheduling.
*
#include "Proftimer.h"
#include "Schedule.h"
#include "Timer.h"
+#include "Ticker.h"
#include "Capability.h"
#if !defined(mingw32_HOST_OS)
/* ticks left before next pre-emptive context switch */
static int ticks_to_ctxt_switch = 0;
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
/* idle ticks left before we perform a GC */
static int ticks_to_gc = 0;
#endif
ticks_to_ctxt_switch = RtsFlags.ConcFlags.ctxtSwitchTicks;
context_switch = 1; /* schedule a context switch */
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
/*
* If we've been inactive for idleGCDelayTicks (set by +RTS
* -I), tell the scheduler to wake up and do a GC, to check
recent_activity = ACTIVITY_INACTIVE;
blackholes_need_checking = rtsTrue;
/* hack: re-use the blackholes_need_checking flag */
- threadRunnable();
- /* ToDo: this threadRunnable only works if there's
- * another thread (not this one) waiting to be woken up
+
+ /* ToDo: this doesn't work. Can't invoke
+ * pthread_cond_signal from a signal handler.
+ * Furthermore, we can't prod a capability that we
+ * might be holding. What can we do?
*/
+ prodOneCapability();
}
break;
default:
/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team, 1995-2003
+ * (c) The GHC Team, 1995-2005
*
* Interval timer service for profiling and pre-emptive scheduling.
*
* ---------------------------------------------------------------------------*/
-#ifndef __TIMER_H__
-#define __TIMER_H__
+
+#ifndef TIMER_H
+#define TIMER_H
# define TICK_MILLISECS (1000/TICK_FREQUENCY) /* ms per tick */
extern int startTimer(nat ms);
extern int stopTimer(void);
-#endif /* __TIMER_H__ */
+
+#endif /* TIMER_H */
*/
void
-scheduleFinalizers(StgWeak *list)
+scheduleFinalizers(Capability *cap, StgWeak *list)
{
StgWeak *w;
StgTSO *t;
IF_DEBUG(weak,debugBelch("weak: batching %d finalizers\n", n));
- arr = (StgMutArrPtrs *)allocate(sizeofW(StgMutArrPtrs) + n);
+ arr = (StgMutArrPtrs *)allocateLocal(cap, sizeofW(StgMutArrPtrs) + n);
TICK_ALLOC_PRIM(sizeofW(StgMutArrPtrs), n, 0);
SET_HDR(arr, &stg_MUT_ARR_PTRS_FROZEN_info, CCS_SYSTEM);
arr->ptrs = n;
}
}
- t = createIOThread(RtsFlags.GcFlags.initialStkSize,
- rts_apply(
- rts_apply(
+ t = createIOThread(cap,
+ RtsFlags.GcFlags.initialStkSize,
+ rts_apply(cap,
+ rts_apply(cap,
(StgClosure *)runFinalizerBatch_closure,
- rts_mkInt(n)),
+ rts_mkInt(cap,n)),
(StgClosure *)arr)
);
- scheduleThread(t);
+ scheduleThread(cap,t);
}
#ifndef WEAK_H
#define WEAK_H
-void scheduleFinalizers(StgWeak *w);
+#include "Capability.h"
+
+void scheduleFinalizers(Capability *cap, StgWeak *w);
void markWeakList(void);
#endif
#include "Rts.h"
#include "RtsFlags.h"
#include "Timer.h"
-#include "Itimer.h"
+#include "Ticker.h"
+#include "posix/Itimer.h"
#include "Proftimer.h"
#include "Schedule.h"
+#include "posix/Select.h"
/* As recommended in the autoconf manual */
# ifdef TIME_WITH_SYS_TIME
*
* For now, we're using (1), but this needs a better solution. --SDM
*/
-#ifdef RTS_SUPPORTS_THREADS
+#ifdef THREADED_RTS
#define ITIMER_FLAVOUR ITIMER_REAL
#define ITIMER_SIGNAL SIGALRM
#else
install_vtalrm_handler(handle_tick);
+#if !defined(THREADED_RTS)
timestamp = getourtimeofday();
+#endif
it.it_value.tv_sec = ms / 1000;
it.it_value.tv_usec = 1000 * (ms - (1000 * it.it_value.tv_sec));
struct itimerspec it;
timer_t tid;
+#if !defined(THREADED_RTS)
timestamp = getourtimeofday();
+#endif
se.sigev_notify = SIGEV_SIGNAL;
se.sigev_signo = ITIMER_SIGNAL;
struct itimerspec it;
timer_t tid;
+#if !defined(THREADED_RTS)
timestamp = getourtimeofday();
+#endif
se.sigev_notify = SIGEV_SIGNAL;
se.sigev_signo = ITIMER_SIGNAL;
--- /dev/null
+/* -----------------------------------------------------------------------------
+ *
+ * (c) The GHC Team 1998-2005
+ *
+ * Interval timer for profiling and pre-emptive scheduling.
+ *
+ * ---------------------------------------------------------------------------*/
+
+#ifndef ITIMER_H
+#define ITIMER_H
+
+extern lnat getourtimeofday ( void );
+#if 0
+/* unused */
+extern void block_vtalrm_signal ( void );
+extern void unblock_vtalrm_signal ( void );
+#endif
+
+#endif /* ITIMER_H */
--- /dev/null
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2001-2005
+ *
+ * Accessing OS threads functionality in a (mostly) OS-independent
+ * manner.
+ *
+ * --------------------------------------------------------------------------*/
+
+#if defined(DEBUG) && defined(__linux__)
+/* We want GNU extensions in DEBUG mode for mutex error checking */
+#define _GNU_SOURCE
+#endif
+
+#include "Rts.h"
+#if defined(THREADED_RTS)
+#include "OSThreads.h"
+#include "RtsUtils.h"
+
+#if HAVE_STRING_H
+#include <string.h>
+#endif
+
+#if !defined(HAVE_PTHREAD_H)
+#error pthreads.h is required for the threaded RTS on Posix platforms
+#endif
+
+/*
+ * This (allegedly) OS threads independent layer was initially
+ * abstracted away from code that used Pthreads, so the functions
+ * provided here are mostly just wrappers to the Pthreads API.
+ *
+ */
+
+void
+initCondition( Condition* pCond )
+{
+ pthread_cond_init(pCond, NULL);
+ return;
+}
+
+void
+closeCondition( Condition* pCond )
+{
+ pthread_cond_destroy(pCond);
+ return;
+}
+
+rtsBool
+broadcastCondition ( Condition* pCond )
+{
+ return (pthread_cond_broadcast(pCond) == 0);
+}
+
+rtsBool
+signalCondition ( Condition* pCond )
+{
+ return (pthread_cond_signal(pCond) == 0);
+}
+
+rtsBool
+waitCondition ( Condition* pCond, Mutex* pMut )
+{
+ return (pthread_cond_wait(pCond,pMut) == 0);
+}
+
+void
+yieldThread()
+{
+ sched_yield();
+ return;
+}
+
+void
+shutdownThread()
+{
+ pthread_exit(NULL);
+}
+
+int
+createOSThread (OSThreadId* pId, OSThreadProc *startProc, void *param)
+{
+ int result = pthread_create(pId, NULL, (void *(*)(void *))startProc, param);
+ if(!result)
+ pthread_detach(*pId);
+ return result;
+}
+
+OSThreadId
+osThreadId()
+{
+ return pthread_self();
+}
+
+void
+initMutex(Mutex* pMut)
+{
+#if defined(DEBUG) && defined(linux_HOST_OS)
+ pthread_mutexattr_t attr;
+ pthread_mutexattr_init(&attr);
+ pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_ERRORCHECK_NP);
+ pthread_mutex_init(pMut,&attr);
+#else
+ pthread_mutex_init(pMut,NULL);
+#endif
+ return;
+}
+
+void
+newThreadLocalKey (ThreadLocalKey *key)
+{
+ int r;
+ if ((r = pthread_key_create(key, NULL)) != 0) {
+ barf("newThreadLocalKey: %s", strerror(r));
+ }
+}
+
+void *
+getThreadLocalVar (ThreadLocalKey *key)
+{
+ return pthread_getspecific(*key);
+ // Note: a return value of NULL can indicate that either the key
+ // is not valid, or the key is valid and the data value has not
+ // yet been set. We need to use the latter case, so we cannot
+ // detect errors here.
+}
+
+void
+setThreadLocalVar (ThreadLocalKey *key, void *value)
+{
+ int r;
+ if ((r = pthread_setspecific(*key,value)) != 0) {
+ barf("setThreadLocalVar: %s", strerror(r));
+ }
+}
+
+static void *
+forkOS_createThreadWrapper ( void * entry )
+{
+ Capability *cap;
+ cap = rts_lock();
+ rts_evalStableIO(cap, (HsStablePtr) entry, NULL);
+ rts_unlock(cap);
+ return NULL;
+}
+
+int
+forkOS_createThread ( HsStablePtr entry )
+{
+ pthread_t tid;
+ int result = pthread_create(&tid, NULL,
+ forkOS_createThreadWrapper, (void*)entry);
+ if(!result)
+ pthread_detach(tid);
+ return result;
+}
+
+#else /* !defined(THREADED_RTS) */
+
+int
+forkOS_createThread ( HsStablePtr entry STG_UNUSED )
+{
+ return -1;
+}
+
+#endif /* !defined(THREADED_RTS) */
#include "Itimer.h"
#include "Signals.h"
#include "Capability.h"
+#include "posix/Select.h"
# ifdef HAVE_SYS_TYPES_H
# include <sys/types.h>
#include <unistd.h>
#endif
+#if !defined(THREADED_RTS)
/* last timestamp */
lnat timestamp = 0;
-#if !defined(RTS_SUPPORTS_THREADS)
/*
* The threaded RTS uses an IO-manager thread in Haskell instead (see GHC.Conc)
*/
* if this is true, then our time has expired.
* (idea due to Andy Gill).
*/
-rtsBool
+static rtsBool
wakeUpSleepingThreads(lnat ticks)
{
StgTSO *tso;
tso->why_blocked = NotBlocked;
tso->link = END_TSO_QUEUE;
IF_DEBUG(scheduler,debugBelch("Waking up sleeping thread %d\n", tso->id));
- PUSH_ON_RUN_QUEUE(tso);
+ // MainCapability: this code is !THREADED_RTS
+ pushOnRunQueue(&MainCapability,tso);
flag = rtsTrue;
}
return flag;
/* If new runnable threads have arrived, stop waiting for
* I/O and run them.
*/
- if (run_queue_hd != END_TSO_QUEUE) {
+ if (!emptyRunQueue(&MainCapability)) {
return; /* still hold the lock */
}
}
IF_DEBUG(scheduler,debugBelch("Waking up blocked thread %d\n", tso->id));
tso->why_blocked = NotBlocked;
tso->link = END_TSO_QUEUE;
- PUSH_ON_RUN_QUEUE(tso);
+ pushOnRunQueue(&MainCapability,tso);
} else {
if (prev == NULL)
blocked_queue_hd = tso;
}
}
- } while (wait && !interrupted && run_queue_hd == END_TSO_QUEUE);
+ } while (wait && !interrupted && emptyRunQueue(&MainCapability));
}
-#endif /* RTS_SUPPORTS_THREADS */
+#endif /* THREADED_RTS */
--- /dev/null
+/* -----------------------------------------------------------------------------
+ *
+ * (c) The GHC Team 1998-2005
+ *
+ * Prototypes for functions in Select.c
+ *
+ * -------------------------------------------------------------------------*/
+
+#ifndef SELECT_H
+#define SELECT_H
+
+#if !defined(THREADED_RTS)
+/* In Select.c */
+extern lnat RTS_VAR(timestamp);
+
+/* awaitEvent(rtsBool wait)
+ *
+ * Checks for blocked threads that need to be woken.
+ *
+ * Called from STG : NO
+ * Locks assumed : sched_mutex
+ */
+void awaitEvent(rtsBool wait); /* In Select.c */
+#endif
+
+#endif /* SELECT_H */
/* -----------------------------------------------------------------------------
*
- * (c) The GHC Team, 1998-1999
+ * (c) The GHC Team, 1998-2005
*
* Signal processing / handling.
*
#include "Rts.h"
#include "SchedAPI.h"
#include "Schedule.h"
-#include "Signals.h"
+#include "RtsSignals.h"
+#include "posix/Signals.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#if defined(RTS_USER_SIGNALS)
/* SUP: The type of handlers is a little bit, well, doubtful... */
-static StgInt *handlers = NULL; /* Dynamically grown array of signal handlers */
+StgInt *signal_handlers = NULL; /* Dynamically grown array of signal handlers */
static StgInt nHandlers = 0; /* Size of handlers array */
static nat n_haskell_handlers = 0;
if (sig < nHandlers)
return;
- if (handlers == NULL)
- handlers = (StgInt *)stgMallocBytes((sig + 1) * sizeof(StgInt), "more_handlers");
+ if (signal_handlers == NULL)
+ signal_handlers = (StgInt *)stgMallocBytes((sig + 1) * sizeof(StgInt), "more_handlers");
else
- handlers = (StgInt *)stgReallocBytes(handlers, (sig + 1) * sizeof(StgInt), "more_handlers");
+ signal_handlers = (StgInt *)stgReallocBytes(signal_handlers, (sig + 1) * sizeof(StgInt), "more_handlers");
for(i = nHandlers; i <= sig; i++)
// Fill in the new slots with default actions
- handlers[i] = STG_SIG_DFL;
+ signal_handlers[i] = STG_SIG_DFL;
nHandlers = sig + 1;
}
* Pending Handlers
*
* The mechanism for starting handlers differs between the threaded
- * (RTS_SUPPORTS_THREADS) and non-threaded versions of the RTS.
+ * (THREADED_RTS) and non-threaded versions of the RTS.
*
* When the RTS is single-threaded, we just write the pending signal
* handlers into a buffer, and start a thread for each one in the
* scheduler loop.
*
- * When RTS_SUPPORTS_THREADS, the problem is that signals might be
+ * When THREADED_RTS, the problem is that signals might be
* delivered to multiple threads, so we would need to synchronise
* access to pending_handler_buf somehow. Using thread
* synchronisation from a signal handler isn't possible in general
void
setIOManagerPipe (int fd)
{
- // only called when RTS_SUPPORTS_THREADS, but unconditionally
+ // only called when THREADED_RTS, but unconditionally
// compiled here because GHC.Conc depends on it.
io_manager_pipe = fd;
}
-#if !defined(RTS_SUPPORTS_THREADS)
+#if !defined(THREADED_RTS)
#define N_PENDING_HANDLERS 16
StgPtr pending_handler_buf[N_PENDING_HANDLERS];
StgPtr *next_pending_handler = pending_handler_buf;
-#endif /* RTS_SUPPORTS_THREADS */
+#endif /* THREADED_RTS */
/* -----------------------------------------------------------------------------
* SIGCONT handler
{
sigset_t signals;
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
if (io_manager_pipe != -1)
{
// of its pipe is, there's not much we can do here, so just ignore
// the signal..
-#else /* not RTS_SUPPORTS_THREADS */
+#else /* not THREADED_RTS */
/* Can't call allocate from here. Probably can't call malloc
either. However, we have to schedule a new thread somehow.
circumstances, depending on the signal.
*/
- *next_pending_handler++ = deRefStablePtr((StgStablePtr)handlers[sig]);
+ *next_pending_handler++ = deRefStablePtr((StgStablePtr)signal_handlers[sig]);
// stack full?
if (next_pending_handler == &pending_handler_buf[N_PENDING_HANDLERS]) {
stg_exit(EXIT_FAILURE);
}
-#endif /* RTS_SUPPORTS_THREADS */
+#endif /* THREADED_RTS */
// re-establish the signal handler, and carry on
sigemptyset(&signals);
return n_haskell_handlers != 0;
}
-#if !defined(RTS_SUPPORTS_THREADS)
+#if !defined(THREADED_RTS)
void
awaitUserSignals(void)
{
more_handlers(sig);
- previous_spi = handlers[sig];
+ previous_spi = signal_handlers[sig];
action.sa_flags = 0;
switch(spi) {
case STG_SIG_IGN:
- handlers[sig] = STG_SIG_IGN;
+ signal_handlers[sig] = STG_SIG_IGN;
sigdelset(&userSignals, sig);
action.sa_handler = SIG_IGN;
break;
case STG_SIG_DFL:
- handlers[sig] = STG_SIG_DFL;
+ signal_handlers[sig] = STG_SIG_DFL;
sigdelset(&userSignals, sig);
action.sa_handler = SIG_DFL;
break;
case STG_SIG_HAN:
case STG_SIG_RST:
- handlers[sig] = (StgInt)*handler;
+ signal_handlers[sig] = (StgInt)*handler;
sigaddset(&userSignals, sig);
action.sa_handler = generic_handler;
if (spi == STG_SIG_RST) {
// need to return an error code, so avoid a stable pointer leak
// by freeing the previous handler if there was one.
if (previous_spi >= 0) {
- freeStablePtr(stgCast(StgStablePtr,handlers[sig]));
+ freeStablePtr(stgCast(StgStablePtr,signal_handlers[sig]));
n_haskell_handlers--;
}
return STG_SIG_ERR;
* Creating new threads for signal handlers.
* -------------------------------------------------------------------------- */
-void
-startSignalHandler(int sig) // called by the IO manager, see GHC.Conc
-{
-#if defined(RTS_SUPPORTS_THREADS)
- // ToDo: fix race window between the time at which the signal is
- // delivered and the deRefStablePtr() call here. There's no way
- // to safely uninstall a signal handler.
- scheduleThread(
- createIOThread(RtsFlags.GcFlags.initialStkSize,
- (StgClosure *)deRefStablePtr((StgStablePtr)handlers[sig]))
- );
-#else
- (void)sig; /* keep gcc -Wall happy */
-#endif
-}
-
+#if !defined(THREADED_RTS)
void
startSignalHandlers(void)
{
-#if !defined(RTS_SUPPORTS_THREADS)
blockUserSignals();
+ ASSERT_LOCK_HELD(&sched_mutex);
+
while (next_pending_handler != pending_handler_buf) {
next_pending_handler--;
- scheduleThread(
- createIOThread(RtsFlags.GcFlags.initialStkSize,
- (StgClosure *) *next_pending_handler));
+ scheduleThread (
+ &MainCapability,
+ createIOThread(&MainCapability,
+ RtsFlags.GcFlags.initialStkSize,
+ (StgClosure *) *next_pending_handler));
}
unblockUserSignals();
-#endif
}
+#endif
/* ----------------------------------------------------------------------------
* Mark signal handlers during GC.
* avoid race conditions.
* -------------------------------------------------------------------------- */
-#if !defined(RTS_SUPPORTS_THREADS)
+#if !defined(THREADED_RTS)
void
markSignalHandlers (evac_fn evac)
{
--- /dev/null
+/* -----------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 1998-2005
+ *
+ * Signal processing / handling.
+ *
+ * ---------------------------------------------------------------------------*/
+
+#ifndef POSIX_SIGNALS_H
+#define POSIX_SIGNALS_H
+
+extern rtsBool anyUserHandlers(void);
+
+#if !defined(THREADED_RTS)
+
+extern StgPtr pending_handler_buf[];
+extern StgPtr *next_pending_handler;
+#define signals_pending() (next_pending_handler != pending_handler_buf)
+void startSignalHandlers(void);
+
+#endif
+
+extern StgInt *signal_handlers;
+
+#endif /* POSIX_SIGNALS_H */
+
#include "Schedule.h"
#include <windows.h>
#include "win32/AsyncIO.h"
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
#include "Capability.h"
#endif
{
int ret;
-#ifdef RTS_SUPPORTS_THREADS
+#ifdef THREADED_RTS
// Small optimisation: we don't want the waiting thread to wake
// up straight away just because a previous returning worker has
// called abandonRequestWait(). If the event is no longer needed,
//
// - we were interrupted
// - new threads have arrived
- // - another worker wants to take over (RTS_SUPPORTS_THREADS)
+ // - another worker wants to take over (THREADED_RTS)
} while (wait
&& !interrupted
&& run_queue_hd == END_TSO_QUEUE
-#ifdef RTS_SUPPORTS_THREADS
+#ifdef THREADED_RTS
&& !needToYieldToReturningWorker()
#endif
);
}
-#ifdef RTS_SUPPORTS_THREADS
+#ifdef THREADED_RTS
void
wakeBlockedWorkerThread()
{
*/
/*
- * Function: initUserSignals()
- *
- * Initialize the console handling substrate.
- */
-extern void initUserSignals(void);
-
-/*
- * Function: initDefaultHandlers()
- *
- * Install any default signal/console handlers. Currently we install a
- * Ctrl+C handler that shuts down the RTS in an orderly manner.
- */
-extern void initDefaultHandlers(void);
-
-/*
* Function: signals_pending()
*
* Used by the RTS to check whether new signals have been 'recently' reported.
#define anyUserHandlers() (rtsFalse)
/*
- * Function: blockUserSignals()
- *
- * Temporarily block the delivery of further console events. Needed to
- * avoid race conditions when GCing the queue of outstanding handlers or
- * when emptying the queue by running the handlers.
- *
- */
-extern void blockUserSignals(void);
-
-/*
- * Function: unblockUserSignals()
- *
- * The inverse of blockUserSignals(); re-enable the deliver of console events.
- */
-extern void unblockUserSignals(void);
-
-/*
- * Function: awaitUserSignals()
- *
- * Wait for the next console event. Currently a NOP (returns immediately.)
- */
-extern void awaitUserSignals(void);
-
-/*
* Function: startSignalHandlers()
*
* Run the handlers associated with the queued up console events. Console
extern void startSignalHandlers(void);
/*
- * Function: markSignalHandlers()
- *
- * Evacuate the handler queue. _Assumes_ that console event delivery
- * has already been blocked.
- */
-extern void markSignalHandlers (evac_fn evac);
-
-/*
* Function: handleSignalsInThisThread()
*
* Have current (OS) thread assume responsibility of handling console events/signals.
/* ---------------------------------------------------------------------------
*
- * (c) The GHC Team, 2001
+ * (c) The GHC Team, 2001-2005
*
* Accessing OS threads functionality in a (mostly) OS-independent
* manner.
*
- *
* --------------------------------------------------------------------------*/
+
#include "Rts.h"
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
#include "OSThreads.h"
#include "RtsUtils.h"
-#if defined(HAVE_PTHREAD_H) && !defined(WANT_NATIVE_WIN32_THREADS)
-/*
- * This (allegedly) OS threads independent layer was initially
- * abstracted away from code that used Pthreads, so the functions
- * provided here are mostly just wrappers to the Pthreads API.
- *
- */
-
-void
-initCondition( Condition* pCond )
-{
- pthread_cond_init(pCond, NULL);
- return;
-}
-
-void
-closeCondition( Condition* pCond )
-{
- pthread_cond_destroy(pCond);
- return;
-}
-
-rtsBool
-broadcastCondition ( Condition* pCond )
-{
- return (pthread_cond_broadcast(pCond) == 0);
-}
-
-rtsBool
-signalCondition ( Condition* pCond )
-{
- return (pthread_cond_signal(pCond) == 0);
-}
-
-rtsBool
-waitCondition ( Condition* pCond, Mutex* pMut )
-{
- return (pthread_cond_wait(pCond,pMut) == 0);
-}
-
-void
-yieldThread()
-{
- sched_yield();
- return;
-}
-
-void
-shutdownThread()
-{
- pthread_exit(NULL);
-}
-
-/* Don't need the argument nor the result, at least not yet. */
-static void* startProcWrapper(void* pProc);
-static void*
-startProcWrapper(void* pProc)
-{
- ((void (*)(void))pProc)();
- return NULL;
-}
-
-
-int
-createOSThread ( OSThreadId* pId, void (*startProc)(void))
-{
- int result = pthread_create(pId, NULL, startProcWrapper, (void*)startProc);
- if(!result)
- pthread_detach(*pId);
- return result;
-}
-
-OSThreadId
-osThreadId()
-{
- return pthread_self();
-}
-
-void
-initMutex(Mutex* pMut)
-{
- pthread_mutex_init(pMut,NULL);
- return;
-}
-
-static void *
-forkOS_createThreadWrapper ( void * entry )
-{
- rts_lock();
- rts_evalStableIO((HsStablePtr) entry, NULL);
- rts_unlock();
- return NULL;
-}
-
-int
-forkOS_createThread ( HsStablePtr entry )
-{
- pthread_t tid;
- int result = pthread_create(&tid, NULL,
- forkOS_createThreadWrapper, (void*)entry);
- if(!result)
- pthread_detach(tid);
- return result;
-}
-
-#elif defined(HAVE_WINDOWS_H)
/* For reasons not yet clear, the entire contents of process.h is protected
* by __STRICT_ANSI__ not being defined.
*/
_endthreadex(0);
}
-static unsigned __stdcall startProcWrapper(void* pReal);
-static unsigned __stdcall
-startProcWrapper(void* pReal)
-{
- ((void (*)(void))pReal)();
- return 0;
-}
-
int
-createOSThread ( OSThreadId* pId, void (*startProc)(void))
+createOSThread (OSThreadId* pId, OSThreadProc *startProc, void *param)
{
return (_beginthreadex ( NULL, /* default security attributes */
0,
- startProcWrapper,
- (void*)startProc,
+ startProc,
+ param,
0,
(unsigned*)pId) == 0);
}
return;
}
+void
+newThreadLocalKey (ThreadLocalKey *key)
+{
+ DWORD r;
+ r = TlsAlloc();
+ if (r == TLS_OUT_OF_INDEXES) {
+ barf("newThreadLocalKey: out of keys");
+ }
+ *key = r;
+}
+
+void *
+getThreadLocalVar (ThreadLocalKey *key)
+{
+ void *r;
+ r = TlsGetValue(*key);
+ if (r == NULL) {
+ barf("getThreadLocalVar: key not found");
+ }
+ return r;
+}
+
+void
+setThreadLocalVar (ThreadLocalKey *key, void *value)
+{
+ BOOL b;
+ b = TlsSetValue(*key, value);
+ if (!b) {
+ barf("setThreadLocalVar: %d", GetLastError());
+ }
+}
+
+
static unsigned __stdcall
forkOS_createThreadWrapper ( void * entry )
{
- rts_lock();
- rts_evalStableIO((HsStablePtr) entry, NULL);
- rts_unlock();
+ Capability *cap;
+ cap = rts_lock();
+ rts_evalStableIO(cap, (HsStablePtr) entry, NULL);
+ rts_unlock(cap);
return 0;
}
#endif /* defined(HAVE_PTHREAD_H) */
-#else /* !defined(RTS_SUPPORTS_THREADS) */
+#else /* !defined(THREADED_RTS) */
int
forkOS_createThread ( HsStablePtr entry STG_UNUSED )
return -1;
}
-#endif /* !defined(RTS_SUPPORTS_THREADS) */
-
+#endif /* !defined(THREADED_RTS) */
+++ /dev/null
-/*
- * RTS periodic timers (win32)
- */
-#ifndef __TICKER_H__
-#define __TICKER_H__
-extern int startTicker( nat ms, TickProc handle_tick );
-extern int stopTicker ( void );
-#endif /* __TICKER_H__ */
-