Add 'par' and sparking support to the SMP implementation.
/* -----------------------------------------------------------------------------
- * $Id: PrimOps.h,v 1.42 2000/01/07 10:27:33 sewardj Exp $
+ * $Id: PrimOps.h,v 1.43 2000/01/12 15:15:17 simonmar Exp $
*
* (c) The GHC Team, 1998-1999
*
extern int cmp_thread(const StgTSO *tso1, const StgTSO *tso2);
+#if defined(SMP) || defined(PAR)
+/* par# primop: if 'node' is worth sparking and there is room in the
+ * task's spark pool, push it; 'r' is always set to 1.  NB. macro
+ * arguments must be fully parenthesised at each use.
+ */
+#define parzh(r,node) \
+{ \
+ if (closure_SHOULD_SPARK((StgClosure *)(node)) && \
+ SparkTl < SparkLim) { \
+ *SparkTl++ = (StgClosure *)(node); \
+ } \
+ r = 1; \
+}
+#else
+#define parzh(r,node) r = 1
+#endif
+
/* Hmm, I'll think about these later. */
/* -----------------------------------------------------------------------------
Pointer equality
/* -----------------------------------------------------------------------------
- * $Id: Regs.h,v 1.7 1999/11/09 15:57:39 simonmar Exp $
+ * $Id: Regs.h,v 1.8 2000/01/12 15:15:17 simonmar Exp $
*
* (c) The GHC Team, 1998-1999
*
* 2) caller-saves registers are saved across a CCall
*/
+/* Per-task pool of sparked closures.  [base..lim) is the allocated
+ * storage; live sparks occupy [hd..tl) (hd == tl means the pool is
+ * empty).  New sparks are pushed at tl (see parzh in PrimOps.h).
+ */
+typedef struct StgSparkPool_ {
+  StgClosure **base;		/* start of the pool's storage */
+  StgClosure **lim;		/* one past the end of the storage */
+  StgClosure **hd;		/* presumably the oldest spark, next to be taken -- confirm in Sparks.c */
+  StgClosure **tl;		/* next free slot; sparks are pushed here */
+} StgSparkPool;
+
typedef struct StgRegTable_ {
StgUnion rR1;
StgUnion rR2;
StgTSO *rCurrentTSO;
struct _bdescr *rNursery;
struct _bdescr *rCurrentNursery;
-#ifdef SMP
- struct StgRegTable_ *link;
+#if defined(SMP) || defined(PAR)
+ StgSparkPool rSparks; /* per-task spark pool */
+#endif
+#if defined(SMP)
+ struct StgRegTable_ *link; /* per-task register tables are linked together */
#endif
} StgRegTable;
#define SAVE_CurrentTSO (BaseReg->rCurrentTSO)
#define SAVE_CurrentNursery (BaseReg->rCurrentNursery)
+#if defined(SMP) || defined(PAR)
+#define SAVE_SparkHd (BaseReg->rSparks.hd)
+#define SAVE_SparkTl (BaseReg->rSparks.tl)
+#define SAVE_SparkBase (BaseReg->rSparks.base)
+#define SAVE_SparkLim (BaseReg->rSparks.lim)
+#endif
/* We sometimes need to save registers across a C-call, eg. if they
* are clobbered in the standard calling convention. We define the
#define CurrentNursery (BaseReg->rCurrentNursery)
#endif
+/* The spark registers point into the current task's spark pool, whose
+ * fields are StgClosure ** (see StgSparkPool in Regs.h), so declare
+ * them with that type -- not bdescr *, which was copy-pasted from the
+ * heap-pointer register declarations.
+ */
+#ifdef REG_SparkHd
+GLOBAL_REG_DECL(StgClosure **,SparkHd,REG_SparkHd)
+#else
+#define SparkHd (BaseReg->rSparks.hd)
+#endif
+
+#ifdef REG_SparkTl
+GLOBAL_REG_DECL(StgClosure **,SparkTl,REG_SparkTl)
+#else
+#define SparkTl (BaseReg->rSparks.tl)
+#endif
+
+#ifdef REG_SparkBase
+GLOBAL_REG_DECL(StgClosure **,SparkBase,REG_SparkBase)
+#else
+#define SparkBase (BaseReg->rSparks.base)
+#endif
+
+#ifdef REG_SparkLim
+GLOBAL_REG_DECL(StgClosure **,SparkLim,REG_SparkLim)
+#else
+#define SparkLim (BaseReg->rSparks.lim)
+#endif
+
+
/* -----------------------------------------------------------------------------
For any registers which are denoted "caller-saves" by the C calling
convention, we have to emit code to save and restore them across C
#define CALLER_RESTORE_CurrentNursery /* nothing */
#endif
+#ifdef CALLER_SAVES_SparkHd
+#define CALLER_SAVE_SparkHd SAVE_SparkHd = SparkHd;
+#define CALLER_RESTORE_SparkHd SparkHd = SAVE_SparkHd;
+#else
+#define CALLER_SAVE_SparkHd /* nothing */
+#define CALLER_RESTORE_SparkHd /* nothing */
+#endif
+
+#ifdef CALLER_SAVES_SparkTl
+#define CALLER_SAVE_SparkTl SAVE_SparkTl = SparkTl;
+#define CALLER_RESTORE_SparkTl SparkTl = SAVE_SparkTl;
+#else
+#define CALLER_SAVE_SparkTl /* nothing */
+#define CALLER_RESTORE_SparkTl /* nothing */
+#endif
+
+#ifdef CALLER_SAVES_SparkBase
+#define CALLER_SAVE_SparkBase SAVE_SparkBase = SparkBase;
+#define CALLER_RESTORE_SparkBase SparkBase = SAVE_SparkBase;
+#else
+#define CALLER_SAVE_SparkBase /* nothing */
+#define CALLER_RESTORE_SparkBase /* nothing */
+#endif
+
+#ifdef CALLER_SAVES_SparkLim
+#define CALLER_SAVE_SparkLim SAVE_SparkLim = SparkLim;
+#define CALLER_RESTORE_SparkLim SparkLim = SAVE_SparkLim;
+#else
+#define CALLER_SAVE_SparkLim /* nothing */
+#define CALLER_RESTORE_SparkLim /* nothing */
+#endif
+
#endif /* IN_STG_CODE */
/* ----------------------------------------------------------------------------
CALLER_SAVE_Hp \
CALLER_SAVE_HpLim \
CALLER_SAVE_CurrentTSO \
- CALLER_SAVE_CurrentNursery
+ CALLER_SAVE_CurrentNursery \
+ CALLER_SAVE_SparkHd \
+ CALLER_SAVE_SparkTl \
+ CALLER_SAVE_SparkBase \
+ CALLER_SAVE_SparkLim
#define CALLER_RESTORE_USER \
CALLER_RESTORE_R1 \
CALLER_RESTORE_Hp \
CALLER_RESTORE_HpLim \
CALLER_RESTORE_CurrentTSO \
- CALLER_RESTORE_CurrentNursery
+ CALLER_RESTORE_CurrentNursery \
+ CALLER_RESTORE_SparkHd \
+ CALLER_RESTORE_SparkTl \
+ CALLER_RESTORE_SparkBase \
+ CALLER_RESTORE_SparkLim
#else /* not IN_STG_CODE */
/* -----------------------------------------------------------------------------
- * $Id: Rts.h,v 1.9 1999/11/09 15:47:08 simonmar Exp $
+ * $Id: Rts.h,v 1.10 2000/01/12 15:15:17 simonmar Exp $
*
* (c) The GHC Team, 1998-1999
*
Assertions and Debuggery
-------------------------------------------------------------------------- */
-#ifndef DEBUG
+#ifdef DEBUG
+#define IF_DEBUG(c,s) if (RtsFlags.DebugFlags.c) { s; }
+#else
#define IF_DEBUG(c,s) doNothing()
+#endif
+
+#if defined(GRAN) && defined(DEBUG)
+#define IF_GRAN_DEBUG(c,s) if (RtsFlags.GranFlags.Debug.c) { s; }
#else
-#define IF_DEBUG(c,s) if (RtsFlags.DebugFlags.c) { s; }
+#define IF_GRAN_DEBUG(c,s) doNothing()
+#endif
+
+#if defined(PAR) && defined(DEBUG)
+#define IF_PAR_DEBUG(c,s) if (RtsFlags.ParFlags.Debug.c) { s; }
+#else
+#define IF_PAR_DEBUG(c,s) doNothing()
#endif
/* -----------------------------------------------------------------------------
/* -----------------------------------------------------------------------------
- * $Id: RtsFlags.c,v 1.21 1999/11/29 12:02:44 keithw Exp $
+ * $Id: RtsFlags.c,v 1.22 2000/01/12 15:15:17 simonmar Exp $
*
* (c) The AQUA Project, Glasgow University, 1994-1997
* (c) The GHC Team, 1998-1999
#endif
RtsFlags.ConcFlags.ctxtSwitchTime = CS_MIN_MILLISECS; /* In milliseconds */
+
#ifdef SMP
- RtsFlags.ConcFlags.nNodes = 1;
+ RtsFlags.ParFlags.nNodes = 1;
#endif
+
#ifdef PAR
RtsFlags.ParFlags.parallelStats = rtsFalse;
RtsFlags.ParFlags.granSimStats = rtsFalse;
RtsFlags.ParFlags.granSimStats_Binary = rtsFalse;
-
RtsFlags.ParFlags.outputDisabled = rtsFalse;
-
RtsFlags.ParFlags.packBufferSize = 1024;
+#endif
+
+#if defined(PAR) || defined(SMP)
RtsFlags.ParFlags.maxLocalSparks = 4096;
-#endif /* PAR */
+#endif
#ifdef GRAN
RtsFlags.GranFlags.granSimStats = rtsFalse;
" -qb Enable binary activity profile (output file /tmp/<program>.gb)",
" -Q<size> Set pack-buffer size (default: 1024)",
# endif
+# if defined(SMP) || defined(PAR)
+" -e<n> Maximum number of outstanding local sparks (default: 4096)",
+#endif
# ifdef PAR
" -d Turn on PVM-ish debugging",
" -O Disable output for performance measurement",
for (arg = 0; arg < *rts_argc; arg++) {
if (rts_argv[arg][0] != '-') {
fflush(stdout);
- fprintf(stderr, "setupRtsFlags: Unexpected RTS argument: %s\n",
- rts_argv[arg]);
+ prog_belch("unexpected RTS argument: %s", rts_argv[arg]);
error = rtsTrue;
} else {
# define TICKY_BUILD_ONLY(x) x
#else
# define TICKY_BUILD_ONLY(x) \
-fprintf(stderr, "setupRtsFlags: GHC not built for: ticky-ticky stats\n"); \
+prog_belch("GHC not built for: ticky-ticky stats"); \
error = rtsTrue;
#endif
# define COST_CENTRE_USING_BUILD_ONLY(x) x
#else
# define COST_CENTRE_USING_BUILD_ONLY(x) \
-fprintf(stderr, "setupRtsFlags: GHC not built for: -prof or -parallel\n"); \
+prog_belch("GHC not built for: -prof or -parallel"); \
error = rtsTrue;
#endif
# define PROFILING_BUILD_ONLY(x) x
#else
# define PROFILING_BUILD_ONLY(x) \
-fprintf(stderr, "setupRtsFlags: GHC not built for: -prof\n"); \
+prog_belch("GHC not built for: -prof"); \
+error = rtsTrue;
+#endif
+
+#ifdef SMP
+# define SMP_BUILD_ONLY(x) x
+#else
+/* The SMP-only branch should name the SMP way, not -parallel (that
+ * message belongs to PAR_BUILD_ONLY below).
+ */
+# define SMP_BUILD_ONLY(x) \
+prog_belch("GHC not built for: -smp"); \
error = rtsTrue;
#endif
# define PAR_BUILD_ONLY(x) x
#else
# define PAR_BUILD_ONLY(x) \
-fprintf(stderr, "setupRtsFlags: GHC not built for: -parallel\n"); \
+prog_belch("GHC not built for: -parallel"); \
+error = rtsTrue;
+#endif
+
+#if defined(SMP) || defined(PAR)
+# define PAR_OR_SMP_BUILD_ONLY(x) x
+#else
+# define PAR_OR_SMP_BUILD_ONLY(x) \
+prog_belch("GHC not built for: -parallel or -smp"); \
error = rtsTrue;
#endif
# define GRAN_BUILD_ONLY(x) x
#else
# define GRAN_BUILD_ONLY(x) \
-fprintf(stderr, "setupRtsFlags: GHC not built for: -gransim\n"); \
+prog_belch("GHC not built for: -gransim"); \
error = rtsTrue;
#endif
RtsFlags.ProfFlags.doHeapProfile = HEAP_BY_CLOSURE_TYPE;
break;
default:
- fprintf(stderr, "Invalid heap profile option: %s\n",
- rts_argv[arg]);
+ prog_belch("invalid heap profile option: %s",rts_argv[arg]);
error = rtsTrue;
}
#else
}
break;
default:
- fprintf(stderr, "Invalid heap profile option: %s\n",
- rts_argv[arg]);
+ prog_belch("invalid heap profile option: %s",rts_argv[arg]);
error = rtsTrue;
}
)
case CCchar:
max_cc_no = (hash_t) decode(rts_argv[arg]+3);
if (max_cc_no == 0) {
- fprintf(stderr, "Bad number of cost centres %s\n", rts_argv[arg]);
- error = rtsTrue;
+ prog_belch("bad number of cost centres %s", rts_argv[arg]);
+ error = rtsTrue;
}
break;
case MODchar:
max_mod_no = (hash_t) decode(rts_argv[arg]+3);
if (max_mod_no == 0) {
- fprintf(stderr, "Bad number of modules %s\n", rts_argv[arg]);
- error = rtsTrue;
+ prog_belch("bad number of modules %s", rts_argv[arg]);
+ error = rtsTrue;
}
break;
case GRPchar:
max_grp_no = (hash_t) decode(rts_argv[arg]+3);
if (max_grp_no == 0) {
- fprintf(stderr, "Bad number of groups %s\n", rts_argv[arg]);
- error = rtsTrue;
+ prog_belch("bad number of groups %s", rts_argv[arg]);
+ error = rtsTrue;
}
break;
case DESCRchar:
max_descr_no = (hash_t) decode(rts_argv[arg]+3);
if (max_descr_no == 0) {
- fprintf(stderr, "Bad number of closure descriptions %s\n", rts_argv[arg]);
+ prog_belch("bad number of closure descriptions %s",
+ rts_argv[arg]);
error = rtsTrue;
}
break;
case TYPEchar:
max_type_no = (hash_t) decode(rts_argv[arg]+3);
if (max_type_no == 0) {
- fprintf(stderr, "Bad number of type descriptions %s\n", rts_argv[arg]);
+ prog_belch("bad number of type descriptions %s",
+ rts_argv[arg]);
error = rtsTrue;
}
break;
default:
- fprintf(stderr, "Invalid index table size option: %s\n",
- rts_argv[arg]);
+ prog_belch("invalid index table size option: %s",
+ rts_argv[arg]);
error = rtsTrue;
}
) break;
if (! left || ! right ||
strrchr(rts_argv[arg], '{') != left ||
strchr(rts_argv[arg], '}') != right) {
- fprintf(stderr, "Invalid heap profiling selection bracketing\n %s\n", rts_argv[arg]);
- error = rtsTrue;
+ prog_belch("invalid heap profiling selection bracketing: %s",
+ rts_argv[arg]);
+ error = rtsTrue;
} else {
*right = '\0';
switch (rts_argv[arg][1]) {
#ifdef SMP
case 'N':
+ SMP_BUILD_ONLY(
if (rts_argv[arg][2] != '\0') {
- RtsFlags.ConcFlags.nNodes
+ RtsFlags.ParFlags.nNodes
= strtol(rts_argv[arg]+2, (char **) NULL, 10);
- if (RtsFlags.ConcFlags.nNodes <= 0) {
- fprintf(stderr, "setupRtsFlags: bad value for -N\n");
- error = rtsTrue;
+ if (RtsFlags.ParFlags.nNodes <= 0) {
+ prog_belch("bad value for -N");
+ error = rtsTrue;
}
}
- break;
+ ) break;
#endif
/* =========== PARALLEL =========================== */
case 'e':
- PAR_BUILD_ONLY(
- if (rts_argv[arg][2] != '\0') { /* otherwise, stick w/ the default */
-
+ PAR_OR_SMP_BUILD_ONLY(
+ if (rts_argv[arg][2] != '\0') {
RtsFlags.ParFlags.maxLocalSparks
= strtol(rts_argv[arg]+2, (char **) NULL, 10);
-
if (RtsFlags.ParFlags.maxLocalSparks <= 0) {
- fprintf(stderr, "setupRtsFlags: bad value for -e\n");
- error = rtsTrue;
+ prog_belch("bad value for -e");
+ error = rtsTrue;
}
}
) break;
if (rts_argv[arg][2] != '\0') {
RtsFlags.ParFlags.packBufferSize = decode(rts_argv[arg]+2);
} else {
- fprintf(stderr, "setupRtsFlags: missing size of PackBuffer (for -Q)\n");
- error = rtsTrue;
+ prog_belch("missing size of PackBuffer (for -Q)");
+ error = rtsTrue;
}
) break;
case 'x': /* Extend the argument space */
switch(rts_argv[arg][2]) {
case '\0':
- fprintf(stderr, "setupRtsFlags: Incomplete RTS option: %s\n",rts_argv[arg]);
+ prog_belch("incomplete RTS option: %s",rts_argv[arg]);
error = rtsTrue;
break;
/* The option prefix '-xx' is reserved for future extension. KSW 1999-11. */
default:
- fprintf(stderr, "setupRtsFlags: Unknown RTS option: %s\n",rts_argv[arg]);
+ prog_belch("unknown RTS option: %s",rts_argv[arg]);
error = rtsTrue;
break;
}
/* =========== OH DEAR ============================ */
default:
- fprintf(stderr, "setupRtsFlags: Unknown RTS option: %s\n",rts_argv[arg]);
+ prog_belch("unknown RTS option: %s",rts_argv[arg]);
error = rtsTrue;
break;
}
fflush(stdout);
for (p = usage_text; *p; p++)
- fprintf(stderr, "%s\n", *p);
+ belch("%s", *p);
stg_exit(EXIT_FAILURE);
}
-
}
static FILE * /* return NULL on error */
/* -----------------------------------------------------------------------------
- * $Id: RtsFlags.h,v 1.18 1999/11/29 12:02:45 keithw Exp $
+ * $Id: RtsFlags.h,v 1.19 2000/01/12 15:15:17 simonmar Exp $
*
* (c) The GHC Team, 1998-1999
*
#endif /* DEBUG || PROFILING */
struct CONCURRENT_FLAGS {
- int ctxtSwitchTime; /* in milliseconds */
-#ifdef SMP
- nat nNodes; /* number of threads to run simultaneously */
-#endif
+ int ctxtSwitchTime; /* in milliseconds */
};
#ifdef PAR
struct PAR_FLAGS {
- rtsBool parallelStats; /* Gather parallel statistics */
- rtsBool granSimStats; /* Full .gr profile (rtsTrue) or only END events? */
- rtsBool granSimStats_Binary;
-
- rtsBool outputDisabled; /* Disable output for performance purposes */
-
- unsigned int packBufferSize;
- unsigned int maxLocalSparks;
+ rtsBool parallelStats; /* Gather parallel statistics */
+ rtsBool granSimStats; /* Full .gr profile (rtsTrue) or only END events? */
+ rtsBool granSimStats_Binary;
+
+ rtsBool outputDisabled; /* Disable output for performance purposes */
+
+ unsigned int packBufferSize;
+ unsigned int maxLocalSparks;
};
-
#endif /* PAR */
+#ifdef SMP
+struct PAR_FLAGS {
+ nat nNodes; /* number of threads to run simultaneously */
+ unsigned int maxLocalSparks;
+};
+#endif
+
#ifdef GRAN
struct GRAN_FLAGS {
rtsBool granSimStats; /* Full .gr profile (rtsTrue) or only END events? */
#if defined(PROFILING) || defined(DEBUG)
struct PROFILING_FLAGS ProfFlags;
#endif
-#ifdef PAR
+#if defined(SMP) || defined(PAR)
struct PAR_FLAGS ParFlags;
#endif
#ifdef GRAN
/* -----------------------------------------------------------------------------
- * $Id: RtsUtils.c,v 1.10 1999/11/09 10:46:26 simonmar Exp $
+ * $Id: RtsUtils.c,v 1.11 2000/01/12 15:15:17 simonmar Exp $
*
* (c) The GHC Team, 1998-1999
*
stg_exit(EXIT_FAILURE);
}
+/* Like belch(), but prefixes the message with the program name
+ * (prog_argv[0]) when available.  Writes to stderr and appends a
+ * newline.
+ */
+void prog_belch(char *s, ...)
+{
+  va_list ap;
+  va_start(ap,s);
+  /* don't fflush(stdout); WORKAROUND bug in Linux glibc */
+  if (prog_argv != NULL && prog_argv[0] != NULL) {
+    fprintf(stderr, "%s: ", prog_argv[0]);
+  }
+  vfprintf(stderr, s, ap);
+  fprintf(stderr, "\n");
+  va_end(ap);			/* every va_start must be paired with va_end */
+}
+
void belch(char *s, ...)
{
va_list ap;
/* -----------------------------------------------------------------------------
- * $Id: RtsUtils.h,v 1.5 1999/11/09 10:46:25 simonmar Exp $
+ * $Id: RtsUtils.h,v 1.6 2000/01/12 15:15:17 simonmar Exp $
*
* (c) The GHC Team, 1998-1999
*
extern void *stgReallocWords(void *p, int n, char *msg);
extern void barf(char *s, ...) __attribute__((__noreturn__)) ;
extern void belch(char *s, ...);
+extern void prog_belch(char *s, ...);
extern void _stgAssert (char *filename, unsigned int linenum);
/* -----------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.38 1999/12/01 16:13:25 simonmar Exp $
+ * $Id: Schedule.c,v 1.39 2000/01/12 15:15:17 simonmar Exp $
*
* (c) The GHC Team, 1998-1999
*
#include "Profiling.h"
#include "Sanity.h"
#include "Stats.h"
+#include "Sparks.h"
+
+#include <stdarg.h>
/* Main threads:
*
/* flag set by signal handler to precipitate a context switch */
nat context_switch;
+
/* if this flag is set as well, give up execution */
-static nat interrupted;
+rtsBool interrupted;
/* Next thread ID to allocate.
* Locks required: sched_mutex
void addToBlockedQueue ( StgTSO *tso );
static void schedule ( void );
-static void initThread ( StgTSO *tso, nat stack_size );
void interruptStgRts ( void );
+static StgTSO * createThread_ ( nat size, rtsBool have_lock );
+
+#ifdef DEBUG
+static void sched_belch(char *s, ...);
+#endif
#ifdef SMP
pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
* threads.
*/
if (interrupted) {
- IF_DEBUG(scheduler,belch("schedule: interrupted"));
+ IF_DEBUG(scheduler, sched_belch("interrupted"));
for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
deleteThread(t);
}
StgMainThread *m, **prev;
prev = &main_threads;
for (m = main_threads; m != NULL; m = m->link) {
- if (m->tso->whatNext == ThreadComplete) {
+ switch (m->tso->whatNext) {
+ case ThreadComplete:
if (m->ret) {
*(m->ret) = (StgClosure *)m->tso->sp[0];
}
*prev = m->link;
m->stat = Success;
pthread_cond_broadcast(&m->wakeup);
- }
- if (m->tso->whatNext == ThreadKilled) {
+ break;
+ case ThreadKilled:
*prev = m->link;
m->stat = Killed;
pthread_cond_broadcast(&m->wakeup);
+ break;
+ default:
+ break;
}
}
}
}
#endif
+ /* Top up the run queue from our spark pool. We try to make the
+ * number of threads in the run queue equal to the number of
+ * free capabilities.
+ */
+#if defined(SMP) || defined(PAR)
+ {
+ nat n = n_free_capabilities;
+ StgTSO *tso = run_queue_hd;
+
+ /* Count the run queue */
+ while (n > 0 && tso != END_TSO_QUEUE) {
+ tso = tso->link;
+ n--;
+ }
+
+ for (; n > 0; n--) {
+ StgClosure *spark;
+ spark = findSpark();
+ if (spark == NULL) {
+ break; /* no more sparks in the pool */
+ } else {
+ StgTSO *tso;
+ tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
+ pushClosure(tso,spark);
+ PUSH_ON_RUN_QUEUE(tso);
+#ifdef ToDo
+ advisory_thread_count++;
+#endif
+
+ IF_DEBUG(scheduler,
+ sched_belch("turning spark of closure %p into a thread",
+ (StgClosure *)spark));
+ }
+ }
+ /* We need to wake up the other tasks if we just created some
+ * work for them.
+ */
+ if (n_free_capabilities - n > 1) {
+ pthread_cond_signal(&thread_ready_cond);
+ }
+ }
+#endif /* SMP || PAR */
+
/* Check whether any waiting threads need to be woken up. If the
* run queue is empty, and there are no other tasks running, we
* can wait indefinitely for something to happen.
awaitEvent(
(run_queue_hd == END_TSO_QUEUE)
#ifdef SMP
- && (n_free_capabilities == RtsFlags.ConcFlags.nNodes)
+ && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
#endif
);
}
#ifdef SMP
if (blocked_queue_hd == END_TSO_QUEUE
&& run_queue_hd == END_TSO_QUEUE
- && (n_free_capabilities == RtsFlags.ConcFlags.nNodes)
+ && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
) {
StgMainThread *m;
for (m = main_threads; m != NULL; m = m->link) {
* completed.
*/
if (ready_to_gc) {
- IF_DEBUG(scheduler,fprintf(stderr,"schedule (task %ld): waiting for GC\n",
- pthread_self()););
+ IF_DEBUG(scheduler,sched_belch("waiting for GC"));
pthread_cond_wait(&gc_pending_cond, &sched_mutex);
}
* capability.
*/
while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
- IF_DEBUG(scheduler,
- fprintf(stderr, "schedule (task %ld): waiting for work\n",
- pthread_self()););
+ IF_DEBUG(scheduler, sched_belch("waiting for work"));
pthread_cond_wait(&thread_ready_cond, &sched_mutex);
- IF_DEBUG(scheduler,
- fprintf(stderr, "schedule (task %ld): work now available\n",
- pthread_self()););
+ IF_DEBUG(scheduler, sched_belch("work now available"));
}
#endif
RELEASE_LOCK(&sched_mutex);
-#ifdef SMP
- IF_DEBUG(scheduler,fprintf(stderr,"schedule (task %ld): running thread %d\n", pthread_self(),t->id));
-#else
- IF_DEBUG(scheduler,fprintf(stderr,"schedule: running thread %d\n",t->id));
-#endif
+ IF_DEBUG(scheduler,sched_belch("running thread %d", t->id));
/* Run the current thread
*/
#ifdef INTERPRETER
{
StgClosure* c;
- IF_DEBUG(scheduler,belch("schedule: entering Hugs"));
+ IF_DEBUG(scheduler,sched_belch("entering Hugs"));
c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
cap->rCurrentTSO->sp += 1;
ret = enter(cap,c);
#endif
#ifdef SMP
- if (ready_to_gc && n_free_capabilities == RtsFlags.ConcFlags.nNodes) {
+ if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) {
#else
if (ready_to_gc) {
#endif
* broadcast on gc_pending_cond afterward.
*/
#ifdef SMP
- IF_DEBUG(scheduler,belch("schedule (task %ld): doing GC", pthread_self()));
+ IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
GarbageCollect(GetRoots);
ready_to_gc = rtsFalse;
void deleteAllThreads ( void )
{
StgTSO* t;
- IF_DEBUG(scheduler,belch("deleteAllThreads()"));
+ IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
deleteThread(t);
}
ACQUIRE_LOCK(&sched_mutex);
-#ifdef SMP
- IF_DEBUG(scheduler,
- fprintf(stderr, "schedule (task %ld): thread %d did a _ccall_gc\n",
- pthread_self(), cap->rCurrentTSO->id));
-#else
IF_DEBUG(scheduler,
- fprintf(stderr, "schedule: thread %d did a _ccall_gc\n",
- cap->rCurrentTSO->id));
-#endif
+ sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
threadPaused(cap->rCurrentTSO);
cap->rCurrentTSO->link = suspended_ccalling_threads;
#ifdef SMP
while (free_capabilities == NULL) {
- IF_DEBUG(scheduler,
- fprintf(stderr,"schedule (task %ld): waiting to resume\n",
- pthread_self()));
+ IF_DEBUG(scheduler, sched_belch("waiting to resume"));
pthread_cond_wait(&thread_ready_cond, &sched_mutex);
- IF_DEBUG(scheduler,fprintf(stderr,
- "schedule (task %ld): resuming thread %d\n",
- pthread_self(), tso->id));
+ IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
}
cap = free_capabilities;
free_capabilities = cap->link;
-------------------------------------------------------------------------- */
StgTSO *
-createThread(nat stack_size)
+createThread(nat size)
+{
+ return createThread_(size, rtsFalse);
+}
+
+static StgTSO *
+createThread_(nat size, rtsBool have_lock)
{
StgTSO *tso;
+ nat stack_size;
/* catch ridiculously small stack sizes */
- if (stack_size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
- stack_size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
+ if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
+ size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
}
- tso = (StgTSO *)allocate(stack_size);
- TICK_ALLOC_TSO(stack_size-sizeofW(StgTSO),0);
+ tso = (StgTSO *)allocate(size);
+ TICK_ALLOC_TSO(size-sizeofW(StgTSO),0);
- initThread(tso, stack_size - TSO_STRUCT_SIZEW);
- return tso;
-}
+ stack_size = size - TSO_STRUCT_SIZEW;
-void
-initThread(StgTSO *tso, nat stack_size)
-{
SET_HDR(tso, &TSO_info, CCS_MAIN);
- tso->whatNext = ThreadEnterGHC;
+ tso->whatNext = ThreadEnterGHC;
/* tso->id needs to be unique. For now we use a heavyweight mutex to
protect the increment operation on next_thread_id.
In future, we could use an atomic increment instead.
*/
- ACQUIRE_LOCK(&sched_mutex);
+ if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
tso->id = next_thread_id++;
- RELEASE_LOCK(&sched_mutex);
+ if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
tso->why_blocked = NotBlocked;
tso->blocked_exceptions = NULL;
SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN);
tso->su = (StgUpdateFrame*)tso->sp;
- IF_DEBUG(scheduler,belch("schedule: Initialised thread %ld, stack size = %lx words",
- tso->id, tso->stack_size));
-
+ IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
+ tso->id, tso->stack_size));
+ return tso;
}
Capability *cap, *prev;
cap = NULL;
prev = NULL;
- for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
cap->link = prev;
prev = cap;
}
free_capabilities = cap;
- n_free_capabilities = RtsFlags.ConcFlags.nNodes;
+ n_free_capabilities = RtsFlags.ParFlags.nNodes;
}
IF_DEBUG(scheduler,fprintf(stderr,"schedule: Allocated %d capabilities\n",
n_free_capabilities););
#endif
+
+ initSparkPools();
}
#ifdef SMP
pthread_t tid;
/* make some space for saving all the thread ids */
- task_ids = stgMallocBytes(RtsFlags.ConcFlags.nNodes * sizeof(task_info),
+ task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
"initScheduler:task_ids");
/* and create all the threads */
- for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
r = pthread_create(&tid,NULL,taskStart,NULL);
if (r != 0) {
barf("startTasks: Can't create new Posix thread");
exitScheduler( void )
{
#ifdef SMP
- nat i;
+ nat i;
/* Don't want to use pthread_cancel, since we'd have to install
* these silly exception handlers (pthread_cleanup_{push,pop}) around
*/
#if 0
/* Cancel all our tasks */
- for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
pthread_cancel(task_ids[i].id);
}
/* Wait for all the tasks to terminate */
- for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
IF_DEBUG(scheduler,fprintf(stderr,"schedule: waiting for task %ld\n",
task_ids[i].id));
pthread_join(task_ids[i].id, NULL);
/* Send 'em all a SIGHUP. That should shut 'em up.
*/
- await_death = RtsFlags.ConcFlags.nNodes;
- for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+ await_death = RtsFlags.ParFlags.nNodes;
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
pthread_kill(task_ids[i].id,SIGTERM);
}
while (await_death > 0) {
#ifdef SMP
pthread_cond_destroy(&m->wakeup);
#endif
+
+ IF_DEBUG(scheduler, fprintf(stderr, "schedule: main thread (%d) finished\n",
+ m->tso->id));
free(m);
RELEASE_LOCK(&sched_mutex);
+
return stat;
}
}
suspended_ccalling_threads =
(StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
+
+#if defined(SMP) || defined(PAR) || defined(GRAN)
+ markSparkQueue();
+#endif
}
/* -----------------------------------------------------------------------------
next = tso->link;
PUSH_ON_RUN_QUEUE(tso);
THREAD_RUNNABLE();
-#ifdef SMP
- IF_DEBUG(scheduler,belch("schedule (task %ld): waking up thread %ld",
- pthread_self(), tso->id));
-#else
- IF_DEBUG(scheduler,belch("schedule: waking up thread %ld", tso->id));
-#endif
+ IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
return next;
}
return;
}
- IF_DEBUG(scheduler, belch("schedule: Raising exception in thread %ld.", tso->id));
+ IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
/* Remove it from any blocking queues */
unblockThread(tso);
barf("raiseAsync");
}
+/* -----------------------------------------------------------------------------
+ Debuggery...
+ -------------------------------------------------------------------------- */
+
+#ifdef DEBUG
+/* Debug-only logging helper: like belch(), but tags the message with
+ * "scheduler" and, under SMP, the calling task's pthread id.
+ */
+static void
+sched_belch(char *s, ...)
+{
+  va_list ap;
+  va_start(ap,s);
+#ifdef SMP
+  fprintf(stderr, "scheduler (task %ld): ", pthread_self());
+#else
+  fprintf(stderr, "scheduler: ");
+#endif
+  vfprintf(stderr, s, ap);
+  fprintf(stderr, "\n");
+  va_end(ap);			/* every va_start must be paired with va_end */
+}
+#endif
/* -----------------------------------------------------------------------------
- * $Id: Schedule.h,v 1.11 1999/11/11 17:24:49 sewardj Exp $
+ * $Id: Schedule.h,v 1.12 2000/01/12 15:15:18 simonmar Exp $
*
* (c) The GHC Team 1998-1999
*
* Locks required : sched_mutex
*/
extern nat context_switch;
+extern rtsBool interrupted;
extern nat ticks_since_select;
/* -----------------------------------------------------------------------------
- * $Id: Select.c,v 1.5 1999/11/24 16:39:33 simonmar Exp $
+ * $Id: Select.c,v 1.6 2000/01/12 15:15:18 simonmar Exp $
*
* (c) The GHC Team 1995-1999
*
case BlockedOnDelay:
{
- if ((int)tso->block_info.delay < min)
+ if (tso->block_info.delay < min)
min = tso->block_info.delay;
continue;
}
gettimeofday(&tv_before, (struct timezone *) NULL);
#endif
- while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv)) < 0) {
+ while (!interrupted &&
+ (numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv)) < 0) {
if (errno != EINTR) {
/* fflush(stdout); */
+ perror("select");
fprintf(stderr, "awaitEvent: select failed\n");
stg_exit(EXIT_FAILURE);
}
ACQUIRE_LOCK(&sched_mutex);
+
/* We got a signal; could be one of ours. If so, we need
* to start up the signal handler straight away, otherwise
* we could block for a long time before the signal is
*/
if (signals_pending()) {
start_signal_handlers();
- return;
+ RELEASE_LOCK(&sched_mutex);
+ break;
}
/* If new runnable threads have arrived, stop waiting for
* I/O and run them.
*/
if (run_queue_hd != END_TSO_QUEUE) {
- return;
+ RELEASE_LOCK(&sched_mutex);
+ break;
}
+
RELEASE_LOCK(&sched_mutex);
}
- if (numFound != 0) {
- /*
- File descriptors ready, but we don't know how much time was spent
- in the select(). To interpolate, we compare the time before
- and after the select().
- */
-
#ifdef linux_TARGET_OS
- /* on Linux, tv is set to indicate the amount of time not
- * slept, so we don't need to gettimeofday() to find out.
- */
- delta += min - (tv.tv_sec * 1000000 + tv.tv_usec);
+ /* on Linux, tv is set to indicate the amount of time not
+ * slept, so we don't need to gettimeofday() to find out.
+ */
+ delta += min - (tv.tv_sec * 1000000 + tv.tv_usec);
#else
- gettimeofday(&tv_after, (struct timezone *) NULL);
- delta += (tv_after.tv_sec - tv_before.tv_sec) * 1000000 +
- tv_after.tv_usec - tv_before.tv_usec;
+ gettimeofday(&tv_after, (struct timezone *) NULL);
+ delta += (tv_after.tv_sec - tv_before.tv_sec) * 1000000 +
+ tv_after.tv_usec - tv_before.tv_usec;
+#endif
+
+#if 0
+ if (delta != 0) { fprintf(stderr,"waited: %d %d %d\n", min, delta,
+ interrupted); }
#endif
- } else {
- delta += min;
- }
ACQUIRE_LOCK(&sched_mutex);
/* -----------------------------------------------------------------------------
- * $Id: Signals.c,v 1.10 1999/11/09 15:46:57 simonmar Exp $
+ * $Id: Signals.c,v 1.11 2000/01/12 15:15:18 simonmar Exp $
*
* (c) The GHC Team, 1998-1999
*
next_pending_handler--;
- /* create*Thread puts the thread on the head of the runnable
- * queue, hence it will be run next. Poor man's priority
- * scheduling.
- */
createIOThread(RtsFlags.GcFlags.initialStkSize,
(StgClosure *) *next_pending_handler);
}
--- /dev/null
+/* -----------------------------------------------------------------------------
+ * $Id: Sparks.c,v 1.1 2000/01/12 15:15:18 simonmar Exp $
+ *
+ * (c) The GHC Team, 2000
+ *
+ * Sparking support for PAR and SMP versions of the RTS.
+ *
+ * ---------------------------------------------------------------------------*/
+
+#if defined(SMP) || defined(PAR)
+
+#include "Rts.h"
+#include "Schedule.h"
+#include "SchedAPI.h"
+#include "Sparks.h"
+#include "Storage.h"
+#include "RtsFlags.h"
+#include "RtsUtils.h"
+
+static void slide_spark_pool( StgSparkPool *pool );
+
+/* initSparkPools: allocate and initialise the spark pool(s).
+ * Each pool is a fixed-size array of RtsFlags.ParFlags.maxLocalSparks
+ * closure pointers; hd/tl delimit the live sparks, base/lim the storage.
+ * NOTE(review): under SMP this walks free_capabilities only — assumes it
+ * is called at startup, before any task has grabbed a capability; confirm.
+ */
+void
+initSparkPools( void )
+{
+  Capability *cap;
+  StgSparkPool *pool;
+
+#ifdef SMP
+  /* walk over the capabilities, allocating a spark pool for each one */
+  for (cap = free_capabilities; cap != NULL; cap = cap->link) {
+#else
+  /* allocate a single spark pool, attached to the main register table */
+  cap = &MainRegTable;
+  {
+#endif
+    pool = &(cap->rSparks);
+
+    pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
+				* sizeof(StgClosure *),
+				"initSparkPools");
+    /* empty pool: hd == tl at the base, lim one past the end */
+    pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
+    pool->hd = pool->base;
+    pool->tl = pool->base;
+  }
+}
+
+/* findSpark: return the next spark worth evaluating, or NULL if none.
+ * Entries for which closure_SHOULD_SPARK() no longer holds are discarded
+ * as we scan (hd is advanced past them permanently).  When a pool is
+ * exhausted we compact it with slide_spark_pool() before moving on.
+ */
+StgClosure *
+findSpark( void )
+{
+  Capability *cap;
+  StgSparkPool *pool;
+  StgClosure *spark;
+
+#ifdef SMP
+  /* walk over the capabilities, searching each one's spark pool in turn */
+  for (cap = free_capabilities; cap != NULL; cap = cap->link) {
+#else
+  /* just the one spark pool, attached to the main register table */
+  cap = &MainRegTable;
+  {
+#endif
+    pool = &(cap->rSparks);
+    while (pool->hd < pool->tl) {
+      spark = *pool->hd++;
+      if (closure_SHOULD_SPARK(spark))
+	return spark;
+    }
+    /* nothing usable here: drop the dead entries so space is reclaimed */
+    slide_spark_pool(pool);
+  }
+  return NULL;
+}
+
+/* add_to_spark_queue: try to enqueue a spark for `closure' in `pool'.
+ * Returns rtsTrue on success; rtsFalse if the closure is not worth
+ * sparking (closure_SHOULD_SPARK fails) or the pool is still full
+ * after compaction.
+ */
+rtsBool
+add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
+{
+  /* pool full: compact out dead sparks to make room before giving up */
+  if (pool->tl == pool->lim)
+    slide_spark_pool(pool);
+
+  if (closure_SHOULD_SPARK(closure) &&
+      pool->tl < pool->lim) {
+    *(pool->tl++) = closure;
+    return rtsTrue;
+  } else {
+    return rtsFalse;
+  }
+}
+
+/* slide_spark_pool: compact the live sparks down to the base of the
+ * pool, dropping any entry for which closure_SHOULD_SPARK() no longer
+ * holds.  Afterwards hd == base and tl points past the last live spark,
+ * freeing up slots at the top for new sparks.
+ */
+static void
+slide_spark_pool( StgSparkPool *pool )
+{
+  StgClosure **sparkp, **to_sparkp;
+
+  sparkp = pool->hd;
+  to_sparkp = pool->base;
+  while (sparkp < pool->tl) {
+    ASSERT(to_sparkp<=sparkp);
+    ASSERT(*sparkp!=NULL);
+    ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
+
+    if (closure_SHOULD_SPARK(*sparkp)) {
+      *to_sparkp++ = *sparkp++;
+    } else {
+      sparkp++;		/* dead spark: skip it */
+    }
+  }
+  pool->hd = pool->base;
+  pool->tl = to_sparkp;
+}
+
+/* spark_queue_len: number of sparks currently queued in `pool'. */
+static inline nat
+spark_queue_len( StgSparkPool *pool )
+{
+  return (nat) (pool->tl - pool->hd);
+}
+
+/* Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
+ implicit slide i.e. after marking all sparks are at the beginning of the
+ spark pool and the spark pool only contains sparkable closures
+*/
+/* markSparkQueue: mark every closure reachable from the spark pools as
+ * a GC root, compacting each pool as we go (see slide_spark_pool).
+ * NOTE(review): under SMP only pools on the free_capabilities list are
+ * walked — assumes all tasks are stopped at GC time, so every
+ * capability is on the free list; confirm against the scheduler.
+ */
+void
+markSparkQueue( void )
+{
+  StgClosure **sparkp, **to_sparkp;
+#ifdef DEBUG
+  nat n, pruned_sparks;		/* counters for the debug belch only */
+#endif
+  StgSparkPool *pool;
+  Capability *cap;
+
+#ifdef SMP
+  /* walk over the capabilities, marking each one's spark pool */
+  for (cap = free_capabilities; cap != NULL; cap = cap->link) {
+#else
+  /* just the one spark pool, attached to the main register table */
+  cap = &MainRegTable;
+  {
+#endif
+    pool = &(cap->rSparks);
+
+#ifdef DEBUG
+    n = 0;
+    pruned_sparks = 0;
+#endif
+
+    /* compact live sparks down to the base, marking as we copy */
+    sparkp = pool->hd;
+    to_sparkp = pool->base;
+    while (sparkp < pool->tl) {
+      ASSERT(to_sparkp<=sparkp);
+      ASSERT(*sparkp!=NULL);
+      ASSERT(LOOKS_LIKE_GHC_INFO(((StgClosure *)*sparkp)->header.info));
+      // ToDo?: statistics gathering here (also for GUM!)
+      if (closure_SHOULD_SPARK(*sparkp)) {
+	*to_sparkp = MarkRoot(*sparkp);
+	to_sparkp++;
+#ifdef DEBUG
+	n++;
+#endif
+      } else {
+#ifdef DEBUG
+	pruned_sparks++;	/* dead spark: dropped, not marked */
+#endif
+      }
+      sparkp++;
+    }
+    pool->hd = pool->base;
+    pool->tl = to_sparkp;
+
+#if defined(SMP)
+    IF_DEBUG(scheduler,
+	     belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
+		   n, pruned_sparks, pthread_self()));
+#elif defined(PAR)
+    IF_DEBUG(scheduler,
+	     belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
+		   n, pruned_sparks, mytid));
+#else
+    IF_DEBUG(scheduler,
+	     belch("markSparkQueue: marked %d sparks and pruned %d sparks",
+		   n, pruned_sparks));
+#endif
+
+    IF_DEBUG(scheduler,
+	     belch("markSparkQueue: new spark queue len=%d; (hd=%p; tl=%p)",
+		   spark_queue_len(pool), pool->hd, pool->tl));
+
+  }
+}
+
+#endif /* SMP || PAR */
+
+#if defined(GRAN)

+/* ToDo: sparking support for the GRAN version of the RTS. */

+#endif
--- /dev/null
+/* -----------------------------------------------------------------------------
+ * $Id: Sparks.h,v 1.1 2000/01/12 15:15:18 simonmar Exp $
+ *
+ * (c) The GHC Team, 2000
+ *
+ * Sparking support for PAR and SMP versions of the RTS.
+ *
+ * ---------------------------------------------------------------------------*/
+
+/* Allocate and initialise the spark pool(s); call once at RTS startup. */
+void initSparkPools( void );
+
+/* Mark all closures in the spark pools as GC roots; does an implicit
+ * compacting slide of each pool, pruning dead sparks. */
+void markSparkQueue( void );
+
+/* Return the next sparkable closure, or NULL if none remain. */
+StgClosure * findSpark( void );
+
+/* Add a spark for `closure' to `pool'; rtsTrue on success, rtsFalse if
+ * the closure is not worth sparking or the pool is full. */
+rtsBool add_to_spark_queue( StgClosure *closure, StgSparkPool *pool );
/* -----------------------------------------------------------------------------
- * $Id: Stats.c,v 1.19 1999/12/03 15:55:29 chak Exp $
+ * $Id: Stats.c,v 1.20 2000/01/12 15:15:18 simonmar Exp $
*
* (c) The GHC Team, 1998-1999
*
gc_time,
gc_etime,
time,
- etime,
+ etime - ElapsedTimeStart,
faults - GC_start_faults,
GC_start_faults - GC_end_faults,
gen);
nat i;
pthread_t me = pthread_self();
- for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
if (me == task_ids[i].id) {
task_ids[i].gc_time += gc_time;
task_ids[i].gc_etime += gc_etime;
nat i;
pthread_t me = pthread_self();
- for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
if (task_ids[i].id == me) {
task_ids[i].mut_time = usertime() - task_ids[i].gc_time;
task_ids[i].mut_etime = elapsedtime()
{
nat i;
MutUserTime = 0.0;
- for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
MutUserTime += task_ids[i].mut_time;
fprintf(sf, " Task %2d: MUT time: %6.2fs (%6.2fs elapsed)\n"
" GC time: %6.2fs (%6.2fs elapsed)\n\n",
/* -----------------------------------------------------------------------------
- * $Id: Storage.c,v 1.21 1999/11/09 15:46:59 simonmar Exp $
+ * $Id: Storage.c,v 1.22 2000/01/12 15:15:18 simonmar Exp $
*
* (c) The GHC Team, 1998-1999
*
Capability *cap;
/* All tasks must be stopped */
- ASSERT(n_free_capabilities == RtsFlags.ConcFlags.nNodes);
+ ASSERT(n_free_capabilities == RtsFlags.ParFlags.nNodes);
for (cap = free_capabilities; cap != NULL; cap = cap->link) {
for (bd = cap->rNursery; bd; bd = bd->link) {
capabilities are owned by the scheduler, though: one or more
tasks might have been stopped while they were running (non-main)
threads. */
- /* ASSERT(n_free_capabilities == RtsFlags.ConcFlags.nNodes); */
+ /* ASSERT(n_free_capabilities == RtsFlags.ParFlags.nNodes); */
allocated =
n_free_capabilities * RtsFlags.GcFlags.minAllocAreaSize * BLOCK_SIZE_W