/* -----------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.38 1999/12/01 16:13:25 simonmar Exp $
+ * $Id: Schedule.c,v 1.39 2000/01/12 15:15:17 simonmar Exp $
*
* (c) The GHC Team, 1998-1999
*
#include "Profiling.h"
#include "Sanity.h"
#include "Stats.h"
+#include "Sparks.h"
+
+#include <stdarg.h>
/* Main threads:
*
/* flag set by signal handler to precipitate a context switch */
nat context_switch;
+
/* if this flag is set as well, give up execution */
-static nat interrupted;
+rtsBool interrupted;
/* Next thread ID to allocate.
* Locks required: sched_mutex
void addToBlockedQueue ( StgTSO *tso );
static void schedule ( void );
-static void initThread ( StgTSO *tso, nat stack_size );
void interruptStgRts ( void );
+static StgTSO * createThread_ ( nat size, rtsBool have_lock );
+
+#ifdef DEBUG
+static void sched_belch(char *s, ...);
+#endif
#ifdef SMP
pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
* threads.
*/
if (interrupted) {
- IF_DEBUG(scheduler,belch("schedule: interrupted"));
+ IF_DEBUG(scheduler, sched_belch("interrupted"));
for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
deleteThread(t);
}
StgMainThread *m, **prev;
prev = &main_threads;
for (m = main_threads; m != NULL; m = m->link) {
- if (m->tso->whatNext == ThreadComplete) {
+ switch (m->tso->whatNext) {
+ case ThreadComplete:
if (m->ret) {
*(m->ret) = (StgClosure *)m->tso->sp[0];
}
*prev = m->link;
m->stat = Success;
pthread_cond_broadcast(&m->wakeup);
- }
- if (m->tso->whatNext == ThreadKilled) {
+ break;
+ case ThreadKilled:
*prev = m->link;
m->stat = Killed;
pthread_cond_broadcast(&m->wakeup);
+ break;
+ default:
+ break;
}
}
}
}
#endif
+ /* Top up the run queue from our spark pool. We try to make the
+ * number of threads in the run queue equal to the number of
+ * free capabilities.
+ */
+#if defined(SMP) || defined(PAR)
+ {
+ nat n = n_free_capabilities;
+ StgTSO *tso = run_queue_hd;
+
+ /* Count the run queue */
+ while (n > 0 && tso != END_TSO_QUEUE) {
+ tso = tso->link;
+ n--;
+ }
+
+ for (; n > 0; n--) {
+ StgClosure *spark;
+ spark = findSpark();
+ if (spark == NULL) {
+ break; /* no more sparks in the pool */
+ } else {
+ StgTSO *tso;
+ tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
+ pushClosure(tso,spark);
+ PUSH_ON_RUN_QUEUE(tso);
+#ifdef ToDo
+ advisory_thread_count++;
+#endif
+
+ IF_DEBUG(scheduler,
+ sched_belch("turning spark of closure %p into a thread",
+ (StgClosure *)spark));
+ }
+ }
+ /* We need to wake up the other tasks if we just created some
+ * work for them.
+ */
+ if (n_free_capabilities - n > 1) {
+ pthread_cond_signal(&thread_ready_cond);
+ }
+ }
+#endif /* SMP || PAR */
+
/* Check whether any waiting threads need to be woken up. If the
* run queue is empty, and there are no other tasks running, we
* can wait indefinitely for something to happen.
awaitEvent(
(run_queue_hd == END_TSO_QUEUE)
#ifdef SMP
- && (n_free_capabilities == RtsFlags.ConcFlags.nNodes)
+ && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
#endif
);
}
#ifdef SMP
if (blocked_queue_hd == END_TSO_QUEUE
&& run_queue_hd == END_TSO_QUEUE
- && (n_free_capabilities == RtsFlags.ConcFlags.nNodes)
+ && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
) {
StgMainThread *m;
for (m = main_threads; m != NULL; m = m->link) {
* completed.
*/
if (ready_to_gc) {
- IF_DEBUG(scheduler,fprintf(stderr,"schedule (task %ld): waiting for GC\n",
- pthread_self()););
+ IF_DEBUG(scheduler,sched_belch("waiting for GC"));
pthread_cond_wait(&gc_pending_cond, &sched_mutex);
}
* capability.
*/
while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
- IF_DEBUG(scheduler,
- fprintf(stderr, "schedule (task %ld): waiting for work\n",
- pthread_self()););
+ IF_DEBUG(scheduler, sched_belch("waiting for work"));
pthread_cond_wait(&thread_ready_cond, &sched_mutex);
- IF_DEBUG(scheduler,
- fprintf(stderr, "schedule (task %ld): work now available\n",
- pthread_self()););
+ IF_DEBUG(scheduler, sched_belch("work now available"));
}
#endif
RELEASE_LOCK(&sched_mutex);
-#ifdef SMP
- IF_DEBUG(scheduler,fprintf(stderr,"schedule (task %ld): running thread %d\n", pthread_self(),t->id));
-#else
- IF_DEBUG(scheduler,fprintf(stderr,"schedule: running thread %d\n",t->id));
-#endif
+ IF_DEBUG(scheduler,sched_belch("running thread %d", t->id));
/* Run the current thread
*/
#ifdef INTERPRETER
{
StgClosure* c;
- IF_DEBUG(scheduler,belch("schedule: entering Hugs"));
+ IF_DEBUG(scheduler,sched_belch("entering Hugs"));
c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
cap->rCurrentTSO->sp += 1;
ret = enter(cap,c);
#endif
#ifdef SMP
- if (ready_to_gc && n_free_capabilities == RtsFlags.ConcFlags.nNodes) {
+ if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) {
#else
if (ready_to_gc) {
#endif
* broadcast on gc_pending_cond afterward.
*/
#ifdef SMP
- IF_DEBUG(scheduler,belch("schedule (task %ld): doing GC", pthread_self()));
+ IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
GarbageCollect(GetRoots);
ready_to_gc = rtsFalse;
void deleteAllThreads ( void )
{
StgTSO* t;
- IF_DEBUG(scheduler,belch("deleteAllThreads()"));
+ IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
deleteThread(t);
}
ACQUIRE_LOCK(&sched_mutex);
-#ifdef SMP
- IF_DEBUG(scheduler,
- fprintf(stderr, "schedule (task %ld): thread %d did a _ccall_gc\n",
- pthread_self(), cap->rCurrentTSO->id));
-#else
IF_DEBUG(scheduler,
- fprintf(stderr, "schedule: thread %d did a _ccall_gc\n",
- cap->rCurrentTSO->id));
-#endif
+ sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
threadPaused(cap->rCurrentTSO);
cap->rCurrentTSO->link = suspended_ccalling_threads;
#ifdef SMP
while (free_capabilities == NULL) {
- IF_DEBUG(scheduler,
- fprintf(stderr,"schedule (task %ld): waiting to resume\n",
- pthread_self()));
+ IF_DEBUG(scheduler, sched_belch("waiting to resume"));
pthread_cond_wait(&thread_ready_cond, &sched_mutex);
- IF_DEBUG(scheduler,fprintf(stderr,
- "schedule (task %ld): resuming thread %d\n",
- pthread_self(), tso->id));
+ IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
}
cap = free_capabilities;
free_capabilities = cap->link;
-------------------------------------------------------------------------- */
StgTSO *
-createThread(nat stack_size)
+createThread(nat size)
+{
+ return createThread_(size, rtsFalse);
+}
+
+static StgTSO *
+createThread_(nat size, rtsBool have_lock)
{
StgTSO *tso;
+ nat stack_size;
/* catch ridiculously small stack sizes */
- if (stack_size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
- stack_size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
+ if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
+ size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
}
- tso = (StgTSO *)allocate(stack_size);
- TICK_ALLOC_TSO(stack_size-sizeofW(StgTSO),0);
+ tso = (StgTSO *)allocate(size);
+ TICK_ALLOC_TSO(size-sizeofW(StgTSO),0);
- initThread(tso, stack_size - TSO_STRUCT_SIZEW);
- return tso;
-}
+ stack_size = size - TSO_STRUCT_SIZEW;
-void
-initThread(StgTSO *tso, nat stack_size)
-{
SET_HDR(tso, &TSO_info, CCS_MAIN);
- tso->whatNext = ThreadEnterGHC;
+ tso->whatNext = ThreadEnterGHC;
/* tso->id needs to be unique. For now we use a heavyweight mutex to
protect the increment operation on next_thread_id.
In future, we could use an atomic increment instead.
*/
- ACQUIRE_LOCK(&sched_mutex);
+ if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
tso->id = next_thread_id++;
- RELEASE_LOCK(&sched_mutex);
+ if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
tso->why_blocked = NotBlocked;
tso->blocked_exceptions = NULL;
SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN);
tso->su = (StgUpdateFrame*)tso->sp;
- IF_DEBUG(scheduler,belch("schedule: Initialised thread %ld, stack size = %lx words",
- tso->id, tso->stack_size));
-
+ IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
+ tso->id, tso->stack_size));
+ return tso;
}
Capability *cap, *prev;
cap = NULL;
prev = NULL;
- for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
cap->link = prev;
prev = cap;
}
free_capabilities = cap;
- n_free_capabilities = RtsFlags.ConcFlags.nNodes;
+ n_free_capabilities = RtsFlags.ParFlags.nNodes;
}
IF_DEBUG(scheduler,fprintf(stderr,"schedule: Allocated %d capabilities\n",
n_free_capabilities););
#endif
+
+ initSparkPools();
}
#ifdef SMP
pthread_t tid;
/* make some space for saving all the thread ids */
- task_ids = stgMallocBytes(RtsFlags.ConcFlags.nNodes * sizeof(task_info),
+ task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
"initScheduler:task_ids");
/* and create all the threads */
- for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
r = pthread_create(&tid,NULL,taskStart,NULL);
if (r != 0) {
barf("startTasks: Can't create new Posix thread");
exitScheduler( void )
{
#ifdef SMP
- nat i;
+ nat i;
/* Don't want to use pthread_cancel, since we'd have to install
* these silly exception handlers (pthread_cleanup_{push,pop}) around
*/
#if 0
/* Cancel all our tasks */
- for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
pthread_cancel(task_ids[i].id);
}
/* Wait for all the tasks to terminate */
- for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
IF_DEBUG(scheduler,fprintf(stderr,"schedule: waiting for task %ld\n",
task_ids[i].id));
pthread_join(task_ids[i].id, NULL);
/* Send 'em all a SIGHUP. That should shut 'em up.
*/
- await_death = RtsFlags.ConcFlags.nNodes;
- for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+ await_death = RtsFlags.ParFlags.nNodes;
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
pthread_kill(task_ids[i].id,SIGTERM);
}
while (await_death > 0) {
#ifdef SMP
pthread_cond_destroy(&m->wakeup);
#endif
+
+ IF_DEBUG(scheduler, fprintf(stderr, "schedule: main thread (%d) finished\n",
+ m->tso->id));
free(m);
RELEASE_LOCK(&sched_mutex);
+
return stat;
}
}
suspended_ccalling_threads =
(StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
+
+#if defined(SMP) || defined(PAR) || defined(GRAN)
+ markSparkQueue();
+#endif
}
/* -----------------------------------------------------------------------------
next = tso->link;
PUSH_ON_RUN_QUEUE(tso);
THREAD_RUNNABLE();
-#ifdef SMP
- IF_DEBUG(scheduler,belch("schedule (task %ld): waking up thread %ld",
- pthread_self(), tso->id));
-#else
- IF_DEBUG(scheduler,belch("schedule: waking up thread %ld", tso->id));
-#endif
+ IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
return next;
}
return;
}
- IF_DEBUG(scheduler, belch("schedule: Raising exception in thread %ld.", tso->id));
+ IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
/* Remove it from any blocking queues */
unblockThread(tso);
barf("raiseAsync");
}
+/* -----------------------------------------------------------------------------
+ Debuggery...
+ -------------------------------------------------------------------------- */
+
+#ifdef DEBUG
+static void
+sched_belch(char *s, ...)
+{
+ va_list ap;
+ va_start(ap,s);
+#ifdef SMP
+ fprintf(stderr, "scheduler (task %ld): ", pthread_self());
+#else
+ fprintf(stderr, "scheduler: ");
+#endif
+ vfprintf(stderr, s, ap);
+ fprintf(stderr, "\n");
+}
+#endif