/* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.106 2001/11/08 16:17:35 simonmar Exp $
+ * $Id: Schedule.c,v 1.114 2002/01/31 11:18:07 sof Exp $
*
* (c) The GHC Team, 1998-2000
*
*
* WAY Name CPP flag What's it for
* --------------------------------------
- * mp GUM PAR Parallel execution on a distributed memory machine
- * s SMP SMP Parallel execution on a shared memory machine
- * mg GranSim GRAN Simulation of parallel execution
- * md GUM/GdH DIST Distributed execution (based on GUM)
+ * mp GUM PAR Parallel execution on a distributed memory machine
+ * s SMP SMP Parallel execution on a shared memory machine
+ * mg GranSim GRAN Simulation of parallel execution
+ * md GUM/GdH DIST Distributed execution (based on GUM)
+ *
* --------------------------------------------------------------------------*/
//@node Main scheduling code, , ,
#include "Stats.h"
#include "Itimer.h"
#include "Prelude.h"
+#ifdef PROFILING
+#include "Proftimer.h"
+#include "ProfHeap.h"
+#endif
#if defined(GRAN) || defined(PAR)
# include "GranSimRts.h"
# include "GranSim.h"
# include "HLC.h"
#endif
#include "Sparks.h"
+#include "Capability.h"
+#include "OSThreads.h"
#include <stdarg.h>
*
* These are the threads which clients have requested that we run.
*
- * In an SMP build, we might have several concurrent clients all
+ * In a 'threaded' build, we might have several concurrent clients all
* waiting for results, and each one will wait on a condition variable
* until the result is available.
*
StgTSO * tso;
SchedulerStatus stat;
StgClosure ** ret;
-#ifdef SMP
- pthread_cond_t wakeup;
+#if defined(RTS_SUPPORTS_THREADS)
+ CondVar wakeup;
#endif
struct StgMainThread_ *link;
} StgMainThread;
*/
static StgTSO *suspended_ccalling_threads;
-static void GetRoots(evac_fn);
static StgTSO *threadStackOverflow(StgTSO *tso);
/* KH: The following two flags are shared memory locations. There is no need
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
-/* Free capability list.
- * Locks required: sched_mutex.
- */
-#ifdef SMP
-Capability *free_capabilities; /* Available capabilities for running threads */
-nat n_free_capabilities; /* total number of available capabilities */
-#else
+
+#if !defined(SMP)
Capability MainCapability; /* for non-SMP, we have one global capability */
#endif
/* All our current task ids, saved in case we need to kill them later.
*/
-#ifdef SMP
+#if defined(SMP)
//@cindex task_ids
task_info *task_ids;
#endif
static void sched_belch(char *s, ...);
#endif
-#ifdef SMP
-//@cindex sched_mutex
-//@cindex term_mutex
-//@cindex thread_ready_cond
-//@cindex gc_pending_cond
-pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
-pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER;
-pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER;
-pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER;
+#if defined(RTS_SUPPORTS_THREADS)
+/* ToDo: carefully document the invariants that go together
+ * with these synchronisation objects.
+ */
+MutexVar sched_mutex = INIT_MUTEX_VAR;
+MutexVar term_mutex = INIT_MUTEX_VAR;
+CondVar thread_ready_cond = INIT_COND_VAR;
+CondVar gc_pending_cond = INIT_COND_VAR;
nat await_death;
#endif
};
#endif
-#ifdef PAR
+#if defined(PAR)
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);
#endif
* should be done more efficiently without a linear scan
* of the main threads list, somehow...
*/
-#ifdef SMP
+#if defined(RTS_SUPPORTS_THREADS)
{
StgMainThread *m, **prev;
prev = &main_threads;
}
*prev = m->link;
m->stat = Success;
- pthread_cond_broadcast(&m->wakeup);
+ broadcastCondVar(&m->wakeup);
break;
case ThreadKilled:
if (m->ret) *(m->ret) = NULL;
} else {
m->stat = Killed;
}
- pthread_cond_broadcast(&m->wakeup);
+ broadcastCondVar(&m->wakeup);
break;
default:
break;
}
}
-#else // not SMP
+#else /* not threaded */
# if defined(PAR)
/* in GUM do this only on the Main PE */
/* Top up the run queue from our spark pool. We try to make the
* number of threads in the run queue equal to the number of
* free capabilities.
+ *
+ * Disable spark support in SMP for now, non-essential & requires
+ * a little bit of work to make it compile cleanly. -- sof 1/02.
*/
-#if defined(SMP)
+#if 0 /* defined(SMP) */
{
- nat n = n_free_capabilities;
+ nat n = getFreeCapabilities();
StgTSO *tso = run_queue_hd;
/* Count the run queue */
/* We need to wake up the other tasks if we just created some
* work for them.
*/
- if (n_free_capabilities - n > 1) {
- pthread_cond_signal(&thread_ready_cond);
+ if (getFreeCapabilities() - n > 1) {
+ signalCondVar ( &thread_ready_cond );
}
}
#endif // SMP
if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
awaitEvent(
(run_queue_hd == END_TSO_QUEUE)
-#ifdef SMP
- && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
+#if defined(SMP)
+ && allFreeCapabilities()
#endif
);
}
if (blocked_queue_hd == END_TSO_QUEUE
&& run_queue_hd == END_TSO_QUEUE
&& sleeping_queue == END_TSO_QUEUE
-#ifdef SMP
- && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
+#if defined(SMP)
+ && allFreeCapabilities()
#endif
)
{
if (blocked_queue_hd == END_TSO_QUEUE
&& run_queue_hd == END_TSO_QUEUE
&& sleeping_queue == END_TSO_QUEUE) {
+
IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
detectBlackHoles();
+
+ /* No black holes, so probably a real deadlock. Send the
+ * current main thread the Deadlock exception (or in the SMP
+ * build, send *all* main threads the deadlock exception,
+ * since none of them can make progress).
+ */
if (run_queue_hd == END_TSO_QUEUE) {
- StgMainThread *m = main_threads;
-#ifdef SMP
- for (; m != NULL; m = m->link) {
- deleteThread(m->tso);
- m->ret = NULL;
- m->stat = Deadlock;
- pthread_cond_broadcast(&m->wakeup);
+ StgMainThread *m;
+#if defined(RTS_SUPPORTS_THREADS)
+ for (m = main_threads; m != NULL; m = m->link) {
+ switch (m->tso->why_blocked) {
+ case BlockedOnBlackHole:
+ raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
+ break;
+ case BlockedOnException:
+ case BlockedOnMVar:
+ raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
+ break;
+ default:
+ barf("deadlock: main thread blocked in a strange way");
+ }
}
- main_threads = NULL;
#else
- deleteThread(m->tso);
- m->ret = NULL;
- m->stat = Deadlock;
- main_threads = m->link;
- return;
+ m = main_threads;
+ switch (m->tso->why_blocked) {
+ case BlockedOnBlackHole:
+ raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
+ break;
+ case BlockedOnException:
+ case BlockedOnMVar:
+ raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
+ break;
+ default:
+ barf("deadlock: main thread blocked in a strange way");
+ }
#endif
}
+#if !defined(RTS_SUPPORTS_THREADS)
+ ASSERT( run_queue_hd != END_TSO_QUEUE );
+#endif
}
}
#elif defined(PAR)
/* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
#endif
-#ifdef SMP
+#if defined(SMP)
/* If there's a GC pending, don't do anything until it has
* completed.
*/
if (ready_to_gc) {
IF_DEBUG(scheduler,sched_belch("waiting for GC"));
- pthread_cond_wait(&gc_pending_cond, &sched_mutex);
+ waitCondVar ( &gc_pending_cond, &sched_mutex );
}
-
+#endif
+
+#if defined(RTS_SUPPORTS_THREADS)
/* block until we've got a thread on the run queue and a free
* capability.
*/
- while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
+ while ( run_queue_hd == END_TSO_QUEUE
+#if defined(SMP)
+ || noFreeCapabilities()
+#endif
+ ) {
IF_DEBUG(scheduler, sched_belch("waiting for work"));
- pthread_cond_wait(&thread_ready_cond, &sched_mutex);
+ waitCondVar ( &thread_ready_cond, &sched_mutex );
IF_DEBUG(scheduler, sched_belch("work now available"));
}
#endif
#endif
- /* grab a capability
- */
#ifdef SMP
- cap = free_capabilities;
- free_capabilities = cap->link;
- n_free_capabilities--;
+ grabCapability(&cap);
#else
cap = &MainCapability;
#endif
* the user specified "context switch as often as possible", with
* +RTS -C0
*/
- if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
- && (run_queue_hd != END_TSO_QUEUE
- || blocked_queue_hd != END_TSO_QUEUE
- || sleeping_queue != END_TSO_QUEUE))
+ if (
+#ifdef PROFILING
+ RtsFlags.ProfFlags.profileInterval == 0 ||
+#endif
+ (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
+ && (run_queue_hd != END_TSO_QUEUE
+ || blocked_queue_hd != END_TSO_QUEUE
+ || sleeping_queue != END_TSO_QUEUE)))
context_switch = 1;
else
context_switch = 0;
IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...",
t->id, t, whatNext_strs[t->what_next]));
+#ifdef PROFILING
+ startHeapProfTimer();
+#endif
+
/* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
/* Run the current thread
*/
/* Costs for the scheduler are assigned to CCS_SYSTEM */
#ifdef PROFILING
+ stopHeapProfTimer();
CCCS = CCS_SYSTEM;
#endif
ACQUIRE_LOCK(&sched_mutex);
#ifdef SMP
- IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
+ IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
#elif !defined(GRAN) && !defined(PAR)
IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
#endif
}
#ifdef SMP
- cap->link = free_capabilities;
- free_capabilities = cap;
- n_free_capabilities++;
+ grabCapability(&cap);
+#endif
+
+#ifdef PROFILING
+ if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
+ GarbageCollect(GetRoots, rtsTrue);
+ heapCensus();
+ performHeapProfile = rtsFalse;
+ ready_to_gc = rtsFalse; // we already GC'd
+ }
#endif
#ifdef SMP
- if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes)
+ if (ready_to_gc && allFreeCapabilities() )
#else
if (ready_to_gc)
#endif
* to do it in another thread. Either way, we need to
* broadcast on gc_pending_cond afterward.
*/
-#ifdef SMP
+#if defined(RTS_SUPPORTS_THREADS)
IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
GarbageCollect(GetRoots,rtsFalse);
ready_to_gc = rtsFalse;
#ifdef SMP
- pthread_cond_broadcast(&gc_pending_cond);
+ broadcastCondVar(&gc_pending_cond);
#endif
#if defined(GRAN)
/* add a ContinueThread event to continue execution of current thread */
tok = cap->r.rCurrentTSO->id;
#ifdef SMP
- cap->link = free_capabilities;
- free_capabilities = cap;
- n_free_capabilities++;
+ /* Hand back capability */
+ releaseCapability(&cap);
#endif
RELEASE_LOCK(&sched_mutex);
tso->link = END_TSO_QUEUE;
#ifdef SMP
- while (free_capabilities == NULL) {
+ while ( noFreeCapabilities() ) {
IF_DEBUG(scheduler, sched_belch("waiting to resume"));
- pthread_cond_wait(&thread_ready_cond, &sched_mutex);
+ waitCondVar(&thread_ready_cond, &sched_mutex);
IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
}
- cap = free_capabilities;
- free_capabilities = cap->link;
- n_free_capabilities--;
+ grabCapability(&cap);
#else
cap = &MainCapability;
#endif
stack_size = size - TSO_STRUCT_SIZEW;
tso = (StgTSO *)allocate(size);
- TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
+ TICK_ALLOC_TSO(stack_size, 0);
SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
#if defined(GRAN)
void
scheduleThread(StgTSO *tso)
{
- if (tso==END_TSO_QUEUE){
- schedule();
- return;
- }
-
ACQUIRE_LOCK(&sched_mutex);
/* Put the new thread on the head of the runnable queue. The caller
void
taskStart(void) /* ( void *arg STG_UNUSED) */
{
- scheduleThread(END_TSO_QUEUE);
+ schedule();
}
#endif
ACQUIRE_LOCK(&term_mutex);
await_death--;
RELEASE_LOCK(&term_mutex);
- pthread_exit(NULL);
+ shutdownThread();
}
#endif
-static void
-initCapability( Capability *cap )
-{
- cap->f.stgChk0 = (F_)__stg_chk_0;
- cap->f.stgChk1 = (F_)__stg_chk_1;
- cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
- cap->f.stgUpdatePAP = (F_)__stg_update_PAP;
-}
-
void
initScheduler(void)
{
#ifdef SMP
/* Allocate N Capabilities */
- {
- nat i;
- Capability *cap, *prev;
- cap = NULL;
- prev = NULL;
- for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
- cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
- initCapability(cap);
- cap->link = prev;
- prev = cap;
- }
- free_capabilities = cap;
- n_free_capabilities = RtsFlags.ParFlags.nNodes;
- }
- IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
- n_free_capabilities););
+ initCapabilities(RtsFlags.ParFlags.nNodes);
#else
initCapability(&MainCapability);
#endif
-#if defined(SMP) || defined(PAR)
+#if /* defined(SMP) ||*/ defined(PAR)
initSparkPools();
#endif
}
{
nat i;
int r;
- pthread_t tid;
+ OSThreadId tid;
/* make some space for saving all the thread ids */
task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
/* and create all the threads */
for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
- r = pthread_create(&tid,NULL,taskStart,NULL);
+ r = createOSThread(&tid,taskStart);
if (r != 0) {
barf("startTasks: Can't create new Posix thread");
}
task_ids[i].mut_etime = 0.0;
task_ids[i].gc_time = 0.0;
task_ids[i].gc_etime = 0.0;
- task_ids[i].elapsedtimestart = elapsedtime();
+ task_ids[i].elapsedtimestart = stat_getElapsedTime();
IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
}
}
m->tso = tso;
m->ret = ret;
m->stat = NoStatus;
-#ifdef SMP
- pthread_cond_init(&m->wakeup, NULL);
+#if defined(RTS_SUPPORTS_THREADS)
+ initCondVar(&m->wakeup);
#endif
m->link = main_threads;
#ifdef SMP
do {
- pthread_cond_wait(&m->wakeup, &sched_mutex);
+ waitCondVar(&m->wakeup, &sched_mutex);
} while (m->stat == NoStatus);
#elif defined(GRAN)
/* GranSim specific init */
stat = m->stat;
-#ifdef SMP
- pthread_cond_destroy(&m->wakeup);
+#if defined(RTS_SUPPORTS_THREADS)
+ closeCondVar(&m->wakeup);
#endif
IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
KH @ 25/10/99
*/
-static void
+void
GetRoots(evac_fn evac)
{
StgMainThread *m;
evac((StgClosure **)&suspended_ccalling_threads);
}
-#if defined(SMP) || defined(PAR) || defined(GRAN)
+#if defined(PAR) || defined(GRAN)
markSparkQueue(evac);
#endif
}
IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
dest = (StgTSO *)allocate(new_tso_size);
- TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
+ TICK_ALLOC_TSO(new_stack_size,0);
/* copy the TSO block and the old stack into the new area */
memcpy(dest,tso,TSO_STRUCT_SIZE);
va_list ap;
va_start(ap,s);
#ifdef SMP
- fprintf(stderr, "scheduler (task %ld): ", pthread_self());
+ fprintf(stderr, "scheduler (task %ld): ", osThreadId());
#elif defined(PAR)
fprintf(stderr, "== ");
#else
//* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
//* context_switch:: @cindex\s-+context_switch
//* createThread:: @cindex\s-+createThread
-//* free_capabilities:: @cindex\s-+free_capabilities
//* gc_pending_cond:: @cindex\s-+gc_pending_cond
//* initScheduler:: @cindex\s-+initScheduler
//* interrupted:: @cindex\s-+interrupted
-//* n_free_capabilities:: @cindex\s-+n_free_capabilities
//* next_thread_id:: @cindex\s-+next_thread_id
//* print_bq:: @cindex\s-+print_bq
//* run_queue_hd:: @cindex\s-+run_queue_hd