/* -----------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.22 1999/06/25 09:17:58 simonmar Exp $
+ * $Id: Schedule.c,v 1.30 1999/11/08 15:30:39 sewardj Exp $
*
* (c) The GHC Team, 1998-1999
*
*
* ---------------------------------------------------------------------------*/
+/* Version with scheduler monitor support for SMPs.
+
+ This design provides a high-level API to create and schedule threads etc.
+ as documented in the SMP design document.
+
+ It uses a monitor design controlled by a single mutex to exercise control
+ over accesses to shared data structures, and builds on the Posix threads
+ library.
+
+ The majority of state is shared. In order to keep essential per-task state,
+ there is a Capability structure, which contains all the information
+ needed to run a thread: its STG registers, a pointer to its TSO, a
+ nursery etc. During STG execution, a pointer to the capability is
+ kept in a register (BaseReg).
+
+ In a non-SMP build, there is one global capability, namely MainRegTable.
+
+ SDM & KH, 10/99
+*/
+
#include "Rts.h"
#include "SchedAPI.h"
#include "RtsUtils.h"
#include "Signals.h"
#include "Profiling.h"
#include "Sanity.h"
+#include "Stats.h"
+
+/* Main threads:
+ *
+ * These are the threads which clients have requested that we run.
+ *
+ * In an SMP build, we might have several concurrent clients all
+ * waiting for results, and each one will wait on a condition variable
+ * until the result is available.
+ *
+ * In non-SMP, clients are strictly nested: the first client calls
+ * into the RTS, which might call out again to C with a _ccall_GC, and
+ * eventually re-enter the RTS.
+ *
+ * Main threads information is kept in a linked list:
+ */
+typedef struct StgMainThread_ {
+ StgTSO * tso;
+ SchedulerStatus stat;
+ StgClosure ** ret;
+#ifdef SMP
+ pthread_cond_t wakeup;
+#endif
+ struct StgMainThread_ *link;
+} StgMainThread;
+/* Main thread queue.
+ * Locks required: sched_mutex.
+ */
+static StgMainThread *main_threads;
+
+/* Thread queues.
+ * Locks required: sched_mutex.
+ */
StgTSO *run_queue_hd, *run_queue_tl;
StgTSO *blocked_queue_hd, *blocked_queue_tl;
-StgTSO *ccalling_threads;
-#define MAX_SCHEDULE_NESTING 256
-nat next_main_thread;
-StgTSO *main_threads[MAX_SCHEDULE_NESTING];
+/* Threads suspended in _ccall_GC.
+ * Locks required: sched_mutex.
+ */
+static StgTSO *suspended_ccalling_threads;
+
+#ifndef SMP
+static rtsBool in_ccall_gc;
+#endif
static void GetRoots(void);
static StgTSO *threadStackOverflow(StgTSO *tso);
+/* KH: The following two flags are shared memory locations. There is no need
+ to lock them, since they are only unset at the end of a scheduler
+ operation.
+*/
+
/* flag set by signal handler to precipitate a context switch */
nat context_switch;
/* if this flag is set as well, give up execution */
static nat interrupted;
-/* Next thread ID to allocate */
+/* Next thread ID to allocate.
+ * Locks required: sched_mutex
+ */
StgThreadID next_thread_id = 1;
/*
* Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
* thread. If CurrentTSO == NULL, then we're at the scheduler level.
*/
-StgTSO *CurrentTSO;
-StgRegTable MainRegTable;
-
-/*
- * The thread state for the main thread.
- */
-StgTSO *MainTSO;
-
+
/* The smallest stack size that makes any sense is:
* RESERVED_STACK_WORDS (so we can get back from the stack overflow)
* + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
+/* Free capability list.
+ * Locks required: sched_mutex.
+ */
+#ifdef SMP
+Capability *free_capabilities; /* Available capabilities for running threads */
+nat n_free_capabilities; /* total number of available capabilities */
+#else
+Capability MainRegTable; /* for non-SMP, we have one global capability */
+#endif
+
+rtsBool ready_to_gc;
+
+/* All our current task ids, saved in case we need to kill them later.
+ */
+#ifdef SMP
+task_info *task_ids;
+#endif
+
+void addToBlockedQueue ( StgTSO *tso );
+
+static void schedule ( void );
+static void initThread ( StgTSO *tso, nat stack_size );
+ void interruptStgRts ( void );
+
+#ifdef SMP
+pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER;
+pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER;
+
+nat await_death;
+#endif
+
+/* -----------------------------------------------------------------------------
+ Main scheduling loop.
+
+ We use round-robin scheduling, each thread returning to the
+ scheduler loop when one of these conditions is detected:
+
+ * out of heap space
+ * timer expires (thread yields)
+ * thread blocks
+ * thread ends
+ * stack overflow
+
+ Locking notes: we acquire the scheduler lock once at the beginning
+ of the scheduler loop, and release it when
+
+ * running a thread, or
+ * waiting for work, or
+ * waiting for a GC to complete.
+
+ -------------------------------------------------------------------------- */
+
+static void
+schedule( void )
+{
+ StgTSO *t;
+ Capability *cap;
+ StgThreadReturnCode ret;
+
+ ACQUIRE_LOCK(&sched_mutex);
+
+ while (1) {
+
+ /* Check whether any waiting threads need to be woken up.
+ * If the run queue is empty, we can wait indefinitely for
+ * something to happen.
+ */
+ if (blocked_queue_hd != END_TSO_QUEUE) {
+ awaitEvent(run_queue_hd == END_TSO_QUEUE);
+ }
+
+ /* check for signals each time around the scheduler */
+#ifndef __MINGW32__
+ if (signals_pending()) {
+ start_signal_handlers();
+ }
+#endif
+
+#ifdef SMP
+ /* If there's a GC pending, don't do anything until it has
+ * completed.
+ */
+ if (ready_to_gc) {
+ IF_DEBUG(scheduler,fprintf(stderr,"schedule (task %ld): waiting for GC\n",
+ pthread_self()););
+ pthread_cond_wait(&gc_pending_cond, &sched_mutex);
+ }
+
+ /* block until we've got a thread on the run queue and a free
+ * capability.
+ */
+ while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
+ IF_DEBUG(scheduler,
+ fprintf(stderr, "schedule (task %ld): waiting for work\n",
+ pthread_self()););
+ pthread_cond_wait(&thread_ready_cond, &sched_mutex);
+ IF_DEBUG(scheduler,
+ fprintf(stderr, "schedule (task %ld): work now available\n",
+ pthread_self()););
+ }
+#endif
+
+ /* grab a thread from the run queue
+ */
+ t = POP_RUN_QUEUE();
+
+ /* grab a capability
+ */
+#ifdef SMP
+ cap = free_capabilities;
+ free_capabilities = cap->link;
+ n_free_capabilities--;
+#else
+ cap = &MainRegTable;
+#endif
+
+ cap->rCurrentTSO = t;
+
+ /* set the context_switch flag
+ */
+ if (run_queue_hd == END_TSO_QUEUE)
+ context_switch = 0;
+ else
+ context_switch = 1;
+
+ RELEASE_LOCK(&sched_mutex);
+
+#ifdef SMP
+ IF_DEBUG(scheduler,fprintf(stderr,"schedule (task %ld): running thread %d\n", pthread_self(),t->id));
+#else
+ IF_DEBUG(scheduler,fprintf(stderr,"schedule: running thread %d\n",t->id));
+#endif
+
+ /* Run the current thread
+ */
+ switch (cap->rCurrentTSO->whatNext) {
+ case ThreadKilled:
+ case ThreadComplete:
+ /* Thread already finished, return to scheduler. */
+ ret = ThreadFinished;
+ break;
+ case ThreadEnterGHC:
+ ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
+ break;
+ case ThreadRunGHC:
+ ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
+ break;
+ case ThreadEnterHugs:
+#ifdef INTERPRETER
+ {
+ StgClosure* c;
+ IF_DEBUG(scheduler,belch("schedule: entering Hugs"));
+ c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
+ cap->rCurrentTSO->sp += 1;
+ ret = enter(cap,c);
+ break;
+ }
+#else
+ barf("Panic: entered a BCO but no bytecode interpreter in this build");
+#endif
+ default:
+ barf("schedule: invalid whatNext field");
+ }
+
+ /* Costs for the scheduler are assigned to CCS_SYSTEM */
+#ifdef PROFILING
+ CCCS = CCS_SYSTEM;
+#endif
+
+ ACQUIRE_LOCK(&sched_mutex);
+
+#ifdef SMP
+ IF_DEBUG(scheduler,fprintf(stderr,"schedule (task %ld): ", pthread_self()););
+#else
+ IF_DEBUG(scheduler,fprintf(stderr,"schedule: "););
+#endif
+ t = cap->rCurrentTSO;
+
+ switch (ret) {
+ case HeapOverflow:
+ /* make all the running tasks block on a condition variable,
+ * maybe set context_switch and wait till they all pile in,
+ * then have them wait on a GC condition variable.
+ */
+ IF_DEBUG(scheduler,belch("thread %ld stopped: HeapOverflow", t->id));
+ threadPaused(t);
+
+ ready_to_gc = rtsTrue;
+ context_switch = 1; /* stop other threads ASAP */
+ PUSH_ON_RUN_QUEUE(t);
+ break;
+
+ case StackOverflow:
+ /* just adjust the stack for this thread, then pop it back
+ * on the run queue.
+ */
+ IF_DEBUG(scheduler,belch("thread %ld stopped, StackOverflow", t->id));
+ threadPaused(t);
+ {
+ StgMainThread *m;
+ /* enlarge the stack */
+ StgTSO *new_t = threadStackOverflow(t);
+
+ /* This TSO has moved, so update any pointers to it from the
+ * main thread stack. It better not be on any other queues...
+ * (it shouldn't be)
+ */
+ for (m = main_threads; m != NULL; m = m->link) {
+ if (m->tso == t) {
+ m->tso = new_t;
+ }
+ }
+ PUSH_ON_RUN_QUEUE(new_t);
+ }
+ break;
+
+ case ThreadYielding:
+ /* put the thread back on the run queue. Then, if we're ready to
+ * GC, check whether this is the last task to stop. If so, wake
+ * up the GC thread. getThread will block during a GC until the
+ * GC is finished.
+ */
+ IF_DEBUG(scheduler,
+ if (t->whatNext == ThreadEnterHugs) {
+ /* ToDo: or maybe a timer expired when we were in Hugs?
+ * or maybe someone hit ctrl-C
+ */
+ belch("thread %ld stopped to switch to Hugs", t->id);
+ } else {
+ belch("thread %ld stopped, yielding", t->id);
+ }
+ );
+ threadPaused(t);
+ APPEND_TO_RUN_QUEUE(t);
+ break;
+
+ case ThreadBlocked:
+ /* don't need to do anything. Either the thread is blocked on
+ * I/O, in which case we'll have called addToBlockedQueue
+ * previously, or it's blocked on an MVar or Blackhole, in which
+ * case it'll be on the relevant queue already.
+ */
+ IF_DEBUG(scheduler,
+ fprintf(stderr, "thread %d stopped, ", t->id);
+ printThreadBlockage(t);
+ fprintf(stderr, "\n"));
+ threadPaused(t);
+ break;
+
+ case ThreadFinished:
+ /* Need to check whether this was a main thread, and if so, signal
+ * the task that started it with the return value. If we have no
+ * more main threads, we probably need to stop all the tasks until
+ * we get a new one.
+ */
+ IF_DEBUG(scheduler,belch("thread %ld finished", t->id));
+ t->whatNext = ThreadComplete;
+ break;
+
+ default:
+ barf("doneThread: invalid thread return code");
+ }
+
+#ifdef SMP
+ cap->link = free_capabilities;
+ free_capabilities = cap;
+ n_free_capabilities++;
+#endif
+
+#ifdef SMP
+ if (ready_to_gc && n_free_capabilities == RtsFlags.ConcFlags.nNodes) {
+#else
+ if (ready_to_gc) {
+#endif
+ /* everybody back, start the GC.
+ * Could do it in this thread, or signal a condition var
+ * to do it in another thread. Either way, we need to
+ * broadcast on gc_pending_cond afterward.
+ */
+#ifdef SMP
+ IF_DEBUG(scheduler,belch("schedule (task %ld): doing GC", pthread_self()));
+#endif
+ GarbageCollect(GetRoots);
+ ready_to_gc = rtsFalse;
+#ifdef SMP
+ pthread_cond_broadcast(&gc_pending_cond);
+#endif
+ }
+
+ /* Go through the list of main threads and wake up any
+ * clients whose computations have finished. ToDo: this
+ * should be done more efficiently without a linear scan
+ * of the main threads list, somehow...
+ */
+#ifdef SMP
+ {
+ StgMainThread *m, **prev;
+ prev = &main_threads;
+ for (m = main_threads; m != NULL; m = m->link) {
+ if (m->tso->whatNext == ThreadComplete) {
+ if (m->ret) {
+ *(m->ret) = (StgClosure *)m->tso->sp[0];
+ }
+ *prev = m->link;
+ m->stat = Success;
+ pthread_cond_broadcast(&m->wakeup);
+ }
+ if (m->tso->whatNext == ThreadKilled) {
+ *prev = m->link;
+ m->stat = Killed;
+ pthread_cond_broadcast(&m->wakeup);
+ }
+ }
+ }
+#else
+ /* If our main thread has finished or been killed, return.
+ * If we were re-entered as a result of a _ccall_gc, then
+ * pop the blocked thread off the ccalling_threads stack back
+ * into CurrentTSO.
+ */
+ {
+ StgMainThread *m = main_threads;
+ if (m->tso->whatNext == ThreadComplete
+ || m->tso->whatNext == ThreadKilled) {
+ main_threads = main_threads->link;
+ if (m->tso->whatNext == ThreadComplete) {
+ /* we finished successfully, fill in the return value */
+ if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
+ m->stat = Success;
+ return;
+ } else {
+ m->stat = Killed;
+ return;
+ }
+ }
+ }
+#endif
+
+ } /* end of while(1) */
+}
+
+/* -----------------------------------------------------------------------------
+ * Suspending & resuming Haskell threads.
+ *
+ * When making a "safe" call to C (aka _ccall_GC), the task gives back
+ * its capability before calling the C function. This allows another
+ * task to pick up the capability and carry on running Haskell
+ * threads. It also means that if the C call blocks, it won't lock
+ * the whole system.
+ *
+ * The Haskell thread making the C call is put to sleep for the
+ * duration of the call, on the susepended_ccalling_threads queue. We
+ * give out a token to the task, which it can use to resume the thread
+ * on return from the C function.
+ * -------------------------------------------------------------------------- */
+
+StgInt
+suspendThread( Capability *cap )
+{
+ nat tok;
+
+ ACQUIRE_LOCK(&sched_mutex);
+
+#ifdef SMP
+ IF_DEBUG(scheduler,
+ fprintf(stderr, "schedule (task %ld): thread %d did a _ccall_gc\n",
+ pthread_self(), cap->rCurrentTSO->id));
+#else
+ IF_DEBUG(scheduler,
+ fprintf(stderr, "schedule: thread %d did a _ccall_gc\n",
+ cap->rCurrentTSO->id));
+#endif
+
+ threadPaused(cap->rCurrentTSO);
+ cap->rCurrentTSO->link = suspended_ccalling_threads;
+ suspended_ccalling_threads = cap->rCurrentTSO;
+
+ /* Use the thread ID as the token; it should be unique */
+ tok = cap->rCurrentTSO->id;
+
+#ifdef SMP
+ cap->link = free_capabilities;
+ free_capabilities = cap;
+ n_free_capabilities++;
+#endif
+
+ RELEASE_LOCK(&sched_mutex);
+ return tok;
+}
+
+Capability *
+resumeThread( StgInt tok )
+{
+ StgTSO *tso, **prev;
+ Capability *cap;
+
+ ACQUIRE_LOCK(&sched_mutex);
+
+ prev = &suspended_ccalling_threads;
+ for (tso = suspended_ccalling_threads;
+ tso != END_TSO_QUEUE;
+ prev = &tso->link, tso = tso->link) {
+ if (tso->id == (StgThreadID)tok) {
+ *prev = tso->link;
+ break;
+ }
+ }
+ if (tso == END_TSO_QUEUE) {
+ barf("resumeThread: thread not found");
+ }
+
+#ifdef SMP
+ while (free_capabilities == NULL) {
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"schedule (task %ld): waiting to resume\n",
+ pthread_self()));
+ pthread_cond_wait(&thread_ready_cond, &sched_mutex);
+ IF_DEBUG(scheduler,fprintf(stderr,
+ "schedule (task %ld): resuming thread %d\n",
+ pthread_self(), tso->id));
+ }
+ cap = free_capabilities;
+ free_capabilities = cap->link;
+ n_free_capabilities--;
+#else
+ cap = &MainRegTable;
+#endif
+
+ cap->rCurrentTSO = tso;
+
+ RELEASE_LOCK(&sched_mutex);
+ return cap;
+}
+
/* -----------------------------------------------------------------------------
* Static functions
* -------------------------------------------------------------------------- */
static void unblockThread(StgTSO *tso);
/* -----------------------------------------------------------------------------
+ * Comparing Thread ids.
+ *
+ * This is used from STG land in the implementation of the
+ * instances of Eq/Ord for ThreadIds.
+ * -------------------------------------------------------------------------- */
+
+int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
+{
+ StgThreadID id1 = tso1->id;
+ StgThreadID id2 = tso2->id;
+
+ if (id1 < id2) return (-1);
+ if (id1 > id2) return 1;
+ return 0;
+}
+
+/* -----------------------------------------------------------------------------
Create a new thread.
The new thread starts with the given stack size. Before the
{
SET_INFO(tso,&TSO_info);
tso->whatNext = ThreadEnterGHC;
- tso->id = next_thread_id++;
- tso->blocked_on = NULL;
+
+ /* tso->id needs to be unique. For now we use a heavyweight mutex to
+ protect the increment operation on next_thread_id.
+ In future, we could use an atomic increment instead.
+ */
+
+ ACQUIRE_LOCK(&sched_mutex);
+ tso->id = next_thread_id++;
+ RELEASE_LOCK(&sched_mutex);
+
+ tso->why_blocked = NotBlocked;
tso->splim = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
tso->stack_size = stack_size;
SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN);
tso->su = (StgUpdateFrame*)tso->sp;
- IF_DEBUG(scheduler,belch("Initialised thread %ld, stack size = %lx words\n",
+ IF_DEBUG(scheduler,belch("schedule: Initialised thread %ld, stack size = %lx words",
tso->id, tso->stack_size));
- /* Put the new thread on the head of the runnable queue.
- * The caller of createThread better push an appropriate closure
- * on this thread's stack before the scheduler is invoked.
+}
+
+
+/* -----------------------------------------------------------------------------
+ * scheduleThread()
+ *
+ * scheduleThread puts a thread on the head of the runnable queue.
+ * This will usually be done immediately after a thread is created.
+ * The caller of scheduleThread must create the thread using e.g.
+ * createThread and push an appropriate closure
+ * on this thread's stack before the scheduler is invoked.
+ * -------------------------------------------------------------------------- */
+
+void
+scheduleThread(StgTSO *tso)
+{
+ ACQUIRE_LOCK(&sched_mutex);
+
+ /* Put the new thread on the head of the runnable queue. The caller
+ * better push an appropriate closure on this thread's stack
+ * beforehand. In the SMP case, the thread may start running as
+ * soon as we release the scheduler lock below.
*/
- tso->link = run_queue_hd;
- run_queue_hd = tso;
- if (run_queue_tl == END_TSO_QUEUE) {
- run_queue_tl = tso;
- }
+ PUSH_ON_RUN_QUEUE(tso);
+ THREAD_RUNNABLE();
IF_DEBUG(scheduler,printTSO(tso));
+ RELEASE_LOCK(&sched_mutex);
}
+
+/* -----------------------------------------------------------------------------
+ * startTasks()
+ *
+ * Start up Posix threads to run each of the scheduler tasks.
+ * I believe the task ids are not needed in the system as defined.
+ * KH @ 25/10/99
+ * -------------------------------------------------------------------------- */
+
+#ifdef SMP
+static void *
+taskStart( void *arg STG_UNUSED )
+{
+ schedule();
+ return NULL;
+}
+#endif
+
/* -----------------------------------------------------------------------------
* initScheduler()
*
* Initialise the scheduler. This resets all the queues - if the
* queues contained any threads, they'll be garbage collected at the
* next pass.
+ *
+ * This now calls startTasks(), so should only be called once! KH @ 25/10/99
* -------------------------------------------------------------------------- */
+#ifdef SMP
+static void
+term_handler(int sig STG_UNUSED)
+{
+ nat i;
+ pthread_t me = pthread_self();
+
+ for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+ if (task_ids[i].id == me) {
+ task_ids[i].mut_time = usertime() - task_ids[i].gc_time;
+ if (task_ids[i].mut_time < 0.0) {
+ task_ids[i].mut_time = 0.0;
+ }
+ }
+ }
+ ACQUIRE_LOCK(&term_mutex);
+ await_death--;
+ RELEASE_LOCK(&term_mutex);
+ pthread_exit(NULL);
+}
+#endif
+
void initScheduler(void)
{
run_queue_hd = END_TSO_QUEUE;
run_queue_tl = END_TSO_QUEUE;
blocked_queue_hd = END_TSO_QUEUE;
blocked_queue_tl = END_TSO_QUEUE;
- ccalling_threads = END_TSO_QUEUE;
- next_main_thread = 0;
+
+ suspended_ccalling_threads = END_TSO_QUEUE;
+
+ main_threads = NULL;
context_switch = 0;
interrupted = 0;
enteredCAFs = END_CAF_LIST;
+
+ /* Install the SIGHUP handler */
+#ifdef SMP
+ {
+ struct sigaction action,oact;
+
+ action.sa_handler = term_handler;
+ sigemptyset(&action.sa_mask);
+ action.sa_flags = 0;
+ if (sigaction(SIGTERM, &action, &oact) != 0) {
+ barf("can't install TERM handler");
+ }
+ }
+#endif
+
+#ifdef SMP
+ /* Allocate N Capabilities */
+ {
+ nat i;
+ Capability *cap, *prev;
+ cap = NULL;
+ prev = NULL;
+ for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+ cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
+ cap->link = prev;
+ prev = cap;
+ }
+ free_capabilities = cap;
+ n_free_capabilities = RtsFlags.ConcFlags.nNodes;
+ }
+ IF_DEBUG(scheduler,fprintf(stderr,"schedule: Allocated %d capabilities\n",
+ n_free_capabilities););
+#endif
}
-/* -----------------------------------------------------------------------------
- Main scheduling loop.
+#ifdef SMP
+void
+startTasks( void )
+{
+ nat i;
+ int r;
+ pthread_t tid;
+
+ /* make some space for saving all the thread ids */
+ task_ids = stgMallocBytes(RtsFlags.ConcFlags.nNodes * sizeof(task_info),
+ "initScheduler:task_ids");
+
+ /* and create all the threads */
+ for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+ r = pthread_create(&tid,NULL,taskStart,NULL);
+ if (r != 0) {
+ barf("startTasks: Can't create new Posix thread");
+ }
+ task_ids[i].id = tid;
+ IF_DEBUG(scheduler,fprintf(stderr,"schedule: Started task: %ld\n",tid););
+ }
+}
+#endif
- We use round-robin scheduling, each thread returning to the
- scheduler loop when one of these conditions is detected:
+void
+exitScheduler( void )
+{
+#ifdef SMP
+ nat i;
- * stack overflow
- * out of heap space
- * timer expires (thread yields)
- * thread blocks
- * thread ends
+ /* Don't want to use pthread_cancel, since we'd have to install
+ * these silly exception handlers (pthread_cleanup_{push,pop}) around
+ * all our locks.
+ */
+#if 0
+ /* Cancel all our tasks */
+ for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+ pthread_cancel(task_ids[i].id);
+ }
+
+ /* Wait for all the tasks to terminate */
+ for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+ IF_DEBUG(scheduler,fprintf(stderr,"schedule: waiting for task %ld\n",
+ task_ids[i].id));
+ pthread_join(task_ids[i].id, NULL);
+ }
+#endif
+
+ /* Send 'em all a SIGHUP. That should shut 'em up.
+ */
+ await_death = RtsFlags.ConcFlags.nNodes;
+ for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) {
+ pthread_kill(task_ids[i].id,SIGTERM);
+ }
+ while (await_death > 0) {
+ sched_yield();
+ }
+#endif
+}
+
+/* -----------------------------------------------------------------------------
+ Managing the per-task allocation areas.
+
+ Each capability comes with an allocation area. These are
+ fixed-length block lists into which allocation can be done.
+
+ ToDo: no support for two-space collection at the moment???
-------------------------------------------------------------------------- */
+/* -----------------------------------------------------------------------------
+ * waitThread is the external interface for running a new computataion
+ * and waiting for the result.
+ *
+ * In the non-SMP case, we create a new main thread, push it on the
+ * main-thread stack, and invoke the scheduler to run it. The
+ * scheduler will return when the top main thread on the stack has
+ * completed or died, and fill in the necessary fields of the
+ * main_thread structure.
+ *
+ * In the SMP case, we create a main thread as before, but we then
+ * create a new condition variable and sleep on it. When our new
+ * main thread has completed, we'll be woken up and the status/result
+ * will be in the main_thread struct.
+ * -------------------------------------------------------------------------- */
+
+SchedulerStatus
+waitThread(StgTSO *tso, /*out*/StgClosure **ret)
+{
+ StgMainThread *m;
+ SchedulerStatus stat;
+
+ ACQUIRE_LOCK(&sched_mutex);
+
+ m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
+
+ m->tso = tso;
+ m->ret = ret;
+ m->stat = NoStatus;
+#ifdef SMP
+ pthread_cond_init(&m->wakeup, NULL);
+#endif
+
+ m->link = main_threads;
+ main_threads = m;
+
+#ifdef SMP
+ pthread_cond_wait(&m->wakeup, &sched_mutex);
+#else
+ schedule();
+#endif
+
+ stat = m->stat;
+ ASSERT(stat != NoStatus);
+
+#ifdef SMP
+ pthread_cond_destroy(&m->wakeup);
+#endif
+ free(m);
+
+ RELEASE_LOCK(&sched_mutex);
+ return stat;
+}
+
+
+#if 0
SchedulerStatus schedule(StgTSO *main, StgClosure **ret_val)
{
StgTSO *t;
/* Take a thread from the run queue.
*/
- t = run_queue_hd;
- if (t != END_TSO_QUEUE) {
- run_queue_hd = t->link;
- t->link = END_TSO_QUEUE;
- if (run_queue_hd == END_TSO_QUEUE) {
- run_queue_tl = END_TSO_QUEUE;
- }
- }
+ t = POP_RUN_QUEUE();
while (t != END_TSO_QUEUE) {
CurrentTSO = t;
/* If we have more threads on the run queue, set up a context
* switch at some point in the future.
*/
- if (run_queue_hd != END_TSO_QUEUE) {
+ if (run_queue_hd != END_TSO_QUEUE || blocked_queue_hd != END_TSO_QUEUE) {
context_switch = 1;
} else {
context_switch = 0;
/* Put the thread back on the run queue, at the end.
* t->link is already set to END_TSO_QUEUE.
*/
- ASSERT(t->link == END_TSO_QUEUE);
- if (run_queue_tl == END_TSO_QUEUE) {
- run_queue_hd = run_queue_tl = t;
- } else {
- ASSERT(get_itbl(run_queue_tl)->type == TSO);
- if (run_queue_hd == run_queue_tl) {
- run_queue_hd->link = t;
- run_queue_tl = t;
- } else {
- run_queue_tl->link = t;
- run_queue_tl = t;
- }
- }
+ APPEND_TO_RUN_QUEUE(t);
break;
case ThreadBlocked:
- IF_DEBUG(scheduler,belch("Thread %ld stopped, blocking\n", t->id));
+ IF_DEBUG(scheduler,
+ fprintf(stderr, "Thread %d stopped, ", t->id);
+ printThreadBlockage(t);
+ fprintf(stderr, "\n"));
threadPaused(t);
/* assume the thread has put itself on some blocked queue
* somewhere.
break;
case ThreadFinished:
- IF_DEBUG(scheduler,belch("Thread %ld finished\n", t->id));
+ IF_DEBUG(scheduler,fprintf(stderr,"thread %ld finished\n", t->id));
t->whatNext = ThreadComplete;
break;
}
next_thread:
- t = run_queue_hd;
- if (t != END_TSO_QUEUE) {
- run_queue_hd = t->link;
- t->link = END_TSO_QUEUE;
- if (run_queue_hd == END_TSO_QUEUE) {
- run_queue_tl = END_TSO_QUEUE;
- }
+ /* Checked whether any waiting threads need to be woken up.
+ * If the run queue is empty, we can wait indefinitely for
+ * something to happen.
+ */
+ if (blocked_queue_hd != END_TSO_QUEUE) {
+ awaitEvent(run_queue_hd == END_TSO_QUEUE);
}
+
+ t = POP_RUN_QUEUE();
}
- if (blocked_queue_hd != END_TSO_QUEUE) {
- return AllBlocked;
- } else {
- return Deadlock;
+ /* If we got to here, then we ran out of threads to run, but the
+ * main thread hasn't finished yet. It must be blocked on an MVar
+ * or a black hole somewhere, so we return deadlock.
+ */
+ return Deadlock;
+}
+#endif
+
+/* -----------------------------------------------------------------------------
+ Debugging: why is a thread blocked
+ -------------------------------------------------------------------------- */
+
+#ifdef DEBUG
+void printThreadBlockage(StgTSO *tso)
+{
+ switch (tso->why_blocked) {
+ case BlockedOnRead:
+ fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
+ break;
+ case BlockedOnWrite:
+ fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
+ break;
+ case BlockedOnDelay:
+ fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
+ break;
+ case BlockedOnMVar:
+ fprintf(stderr,"blocked on an MVar");
+ break;
+ case BlockedOnBlackHole:
+ fprintf(stderr,"blocked on a black hole");
+ break;
+ case NotBlocked:
+ fprintf(stderr,"not blocked");
+ break;
}
}
+#endif
/* -----------------------------------------------------------------------------
Where are the roots that we know about?
-------------------------------------------------------------------------- */
+/* This has to be protected either by the scheduler monitor, or by the
+ garbage collection monitor (probably the latter).
+ KH @ 25/10/99
+*/
+
static void GetRoots(void)
{
- nat i;
+ StgMainThread *m;
run_queue_hd = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
run_queue_tl = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
blocked_queue_hd = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
blocked_queue_tl = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
- ccalling_threads = (StgTSO *)MarkRoot((StgClosure *)ccalling_threads);
-
- for (i = 0; i < next_main_thread; i++) {
- main_threads[i] = (StgTSO *)MarkRoot((StgClosure *)main_threads[i]);
+ for (m = main_threads; m != NULL; m = m->link) {
+ m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
}
+ suspended_ccalling_threads =
+ (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
}
/* -----------------------------------------------------------------------------
It might be useful to provide an interface whereby the programmer
can specify more roots (ToDo).
+
+ This needs to be protected by the GC condition variable above. KH.
-------------------------------------------------------------------------- */
void (*extra_roots)(void);
new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
- IF_DEBUG(scheduler, fprintf(stderr,"increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
+ IF_DEBUG(scheduler, fprintf(stderr,"schedule: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
dest = (StgTSO *)allocate(new_tso_size);
TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
tso->whatNext = ThreadKilled;
tso->sp = (P_)&(tso->stack[tso->stack_size]);
tso->su = (StgUpdateFrame *)tso->sp;
- tso->blocked_on = NULL;
+ tso->why_blocked = NotBlocked;
dest->mut_link = NULL;
IF_DEBUG(sanity,checkTSO(tso));
#if 0
IF_DEBUG(scheduler,printTSO(dest));
#endif
+
+#if 0
+ /* This will no longer work: KH */
if (tso == MainTSO) { /* hack */
MainTSO = dest;
}
+#endif
return dest;
}
/* -----------------------------------------------------------------------------
- Wake up a queue that was blocked on some resource (usually a
- computation in progress).
+ Wake up a queue that was blocked on some resource.
-------------------------------------------------------------------------- */
-void awaken_blocked_queue(StgTSO *q)
+static StgTSO *
+unblockOneLocked(StgTSO *tso)
{
- StgTSO *tso;
+ StgTSO *next;
- while (q != END_TSO_QUEUE) {
- ASSERT(get_itbl(q)->type == TSO);
- tso = q;
- q = tso->link;
- PUSH_ON_RUN_QUEUE(tso);
- tso->blocked_on = NULL;
- IF_DEBUG(scheduler,belch("Waking up thread %ld", tso->id));
+ ASSERT(get_itbl(tso)->type == TSO);
+ ASSERT(tso->why_blocked != NotBlocked);
+ tso->why_blocked = NotBlocked;
+ next = tso->link;
+ PUSH_ON_RUN_QUEUE(tso);
+ THREAD_RUNNABLE();
+#ifdef SMP
+ IF_DEBUG(scheduler,belch("schedule (task %ld): waking up thread %ld",
+ pthread_self(), tso->id));
+#else
+ IF_DEBUG(scheduler,belch("schedule: waking up thread %ld", tso->id));
+#endif
+ return next;
+}
+
+inline StgTSO *
+unblockOne(StgTSO *tso)
+{
+ ACQUIRE_LOCK(&sched_mutex);
+ tso = unblockOneLocked(tso);
+ RELEASE_LOCK(&sched_mutex);
+ return tso;
+}
+
+void
+awakenBlockedQueue(StgTSO *tso)
+{
+ ACQUIRE_LOCK(&sched_mutex);
+ while (tso != END_TSO_QUEUE) {
+ tso = unblockOneLocked(tso);
}
+ RELEASE_LOCK(&sched_mutex);
}
/* -----------------------------------------------------------------------------
{
StgTSO *t, **last;
- if (tso->blocked_on == NULL) {
- return; /* not blocked */
- }
+ ACQUIRE_LOCK(&sched_mutex);
+ switch (tso->why_blocked) {
- switch (get_itbl(tso->blocked_on)->type) {
+ case NotBlocked:
+ return; /* not blocked */
- case MVAR:
+ case BlockedOnMVar:
+ ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
{
StgTSO *last_tso = END_TSO_QUEUE;
- StgMVar *mvar = (StgMVar *)(tso->blocked_on);
+ StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
last = &mvar->head;
for (t = mvar->head; t != END_TSO_QUEUE;
barf("unblockThread (MVAR): TSO not found");
}
- case BLACKHOLE_BQ:
+ case BlockedOnBlackHole:
+ ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
{
- StgBlockingQueue *bq = (StgBlockingQueue *)(tso->blocked_on);
+ StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
last = &bq->blocking_queue;
for (t = bq->blocking_queue; t != END_TSO_QUEUE;
barf("unblockThread (BLACKHOLE): TSO not found");
}
+ case BlockedOnDelay:
+ case BlockedOnRead:
+ case BlockedOnWrite:
+ {
+ last = &blocked_queue_hd;
+ for (t = blocked_queue_hd; t != END_TSO_QUEUE;
+ last = &t->link, t = t->link) {
+ if (t == tso) {
+ *last = tso->link;
+ if (blocked_queue_tl == t) {
+ blocked_queue_tl = tso->link;
+ }
+ goto done;
+ }
+ }
+ barf("unblockThread (I/O): TSO not found");
+ }
+
default:
barf("unblockThread");
}
done:
tso->link = END_TSO_QUEUE;
- tso->blocked_on = NULL;
+ tso->why_blocked = NotBlocked;
+ tso->block_info.closure = NULL;
PUSH_ON_RUN_QUEUE(tso);
+ RELEASE_LOCK(&sched_mutex);
}
/* -----------------------------------------------------------------------------
return;
}
- IF_DEBUG(scheduler, belch("Raising exception in thread %ld.", tso->id));
+ IF_DEBUG(scheduler, belch("schedule: Raising exception in thread %ld.", tso->id));
/* Remove it from any blocking queues */
unblockThread(tso);
TICK_ALLOC_UP_THK(words+1,0);
IF_DEBUG(scheduler,
- fprintf(stderr, "Updating ");
+ fprintf(stderr, "schedule: Updating ");
printPtr((P_)su->updatee);
fprintf(stderr, " with ");
printObj((StgClosure *)ap);
o->payload[1] = cf->handler;
IF_DEBUG(scheduler,
- fprintf(stderr, "Built ");
+ fprintf(stderr, "schedule: Built ");
printObj((StgClosure *)o);
);
payloadCPtr(o,0) = (StgClosure *)ap;
IF_DEBUG(scheduler,
- fprintf(stderr, "Built ");
+ fprintf(stderr, "schedule: Built ");
printObj((StgClosure *)o);
);
}
barf("raiseAsync");
}
+