--- /dev/null
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2001
+ *
+ * Accessing OS threads functionality in a (mostly) OS-independent
+ * manner.
+ *
+ *
+ * --------------------------------------------------------------------------*/
+#include "Rts.h"
+#if defined(RTS_SUPPORTS_THREADS)
+#include "OSThreads.h"
+
+
+#if defined(HAVE_PTHREAD_H) && !defined(WANT_NATIVE_WIN32_THREADS)
+/*
+ * This (allegedly) OS threads independent layer was initially
+ * abstracted away from code that used Pthreads, so the functions
+ * provided here are mostly just wrappers to the Pthreads API.
+ *
+ */
+
+void initCondVar( CondVar* pCond )
+{
+ pthread_cond_init(pCond, NULL);
+ return;
+}
+
+void closeCondVar( CondVar* pCond )
+{
+ pthread_cond_destroy(pCond);
+ return;
+}
+
+rtsBool
+broadcastCondVar ( CondVar* pCond )
+{
+ return (pthread_cond_broadcast(pCond) == 0);
+}
+
+rtsBool
+signalCondVar ( CondVar* pCond )
+{
+ return (pthread_cond_signal(pCond) == 0);
+}
+
+rtsBool
+waitCondVar ( CondVar* pCond, MutexVar* pMut )
+{
+ return (pthread_cond_wait(pCond,pMut) == 0);
+}
+
+void shutdownThread()
+{
+ pthread_exit(NULL);
+}
+
+int createOSThread ( OSThreadId* pId, void *(*startProc)(void*))
+{
+ return pthread_create(pId, NULL, startProc, NULL);
+}
+
+OSThreadId osThreadId()
+{
+ return pthread_self();
+}
+
+void initMutexVar (MutexVar* pMut)
+{
+ pthread_mutex_init(pMut,NULL);
+ return;
+}
+
+#elif defined(HAVE_WINDOWS_H)
+/* Win32 threads and synchronisation objects */
+
+/* A CondVar is represented by a Win32 Event object,
+ * a MutexVar by a Mutex kernel object.
+ */
+
+void initCondVar( CondVar* pCond )
+{
+ HANDLE h = CreateEvent(NULL,
+ TRUE, /* manual reset */
+ TRUE, /* initially signalled */
+ NULL); /* unnamed => process-local. */
+
+
+
+ pthread_cond_init(pCond, NULL);
+ return;
+}
+
+void closeCondVar( CondVar* pCond )
+{
+ pthread_cond_destroy(pCond);
+ return;
+}
+
+rtsBool
+broadcastCondVar ( CondVar* pCond )
+{
+ return (pthread_cond_broadcast(pCond) == 0);
+}
+
+rtsBool
+signalCondVar ( CondVar* pCond )
+{
+ return (pthread_cond_signal(pCond) == 0);
+}
+
+rtsBool
+waitCondVar ( CondVar* pCond, MutexVar* pMut )
+{
+ return (pthread_cond_wait(pCond,pMut) == 0);
+}
+
+void shutdownThread()
+{
+ pthread_exit(NULL);
+}
+
+int createOSThread ( OSThreadId* pId, void *(*startProc)(void*))
+{
+ return pthread_create(pId, NULL, startProc, NULL);
+}
+
+OSThreadId osThreadId()
+{
+ return pthread_self();
+}
+
+void initMutexVar (MutexVar* pMut)
+{
+ pthread_mutex_init(pMut);
+ return;
+}
+
+#endif /* defined(HAVE_PTHREAD_H) */
+
+#endif /* defined(RTS_SUPPORTS_THREADS) */
/* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.113 2002/01/24 07:50:02 sof Exp $
+ * $Id: Schedule.c,v 1.114 2002/01/31 11:18:07 sof Exp $
*
* (c) The GHC Team, 1998-2000
*
*
* WAY Name CPP flag What's it for
* --------------------------------------
- * mp GUM PAR Parallel execution on a distributed memory machine
- * s SMP SMP Parallel execution on a shared memory machine
- * mg GranSim GRAN Simulation of parallel execution
- * md GUM/GdH DIST Distributed execution (based on GUM)
+ * mp GUM PAR Parallel execution on a distributed memory machine
+ * s SMP SMP Parallel execution on a shared memory machine
+ * mg GranSim GRAN Simulation of parallel execution
+ * md GUM/GdH DIST Distributed execution (based on GUM)
+ *
* --------------------------------------------------------------------------*/
//@node Main scheduling code, , ,
# include "HLC.h"
#endif
#include "Sparks.h"
+#include "Capability.h"
+#include "OSThreads.h"
#include <stdarg.h>
*
* These are the threads which clients have requested that we run.
*
- * In an SMP build, we might have several concurrent clients all
+ * In a 'threaded' build, we might have several concurrent clients all
* waiting for results, and each one will wait on a condition variable
* until the result is available.
*
StgTSO * tso;
SchedulerStatus stat;
StgClosure ** ret;
-#ifdef SMP
- pthread_cond_t wakeup;
+#if defined(RTS_SUPPORTS_THREADS)
+ CondVar wakeup;
#endif
struct StgMainThread_ *link;
} StgMainThread;
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
-/* Free capability list.
- * Locks required: sched_mutex.
- */
-#ifdef SMP
-Capability *free_capabilities; /* Available capabilities for running threads */
-nat n_free_capabilities; /* total number of available capabilities */
-#else
+
+#if !defined(SMP)
Capability MainCapability; /* for non-SMP, we have one global capability */
#endif
/* All our current task ids, saved in case we need to kill them later.
*/
-#ifdef SMP
+#if defined(SMP)
//@cindex task_ids
task_info *task_ids;
#endif
static void sched_belch(char *s, ...);
#endif
-#ifdef SMP
-//@cindex sched_mutex
-//@cindex term_mutex
-//@cindex thread_ready_cond
-//@cindex gc_pending_cond
-pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
-pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER;
-pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER;
-pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER;
+#if defined(RTS_SUPPORTS_THREADS)
+/* ToDo: carefully document the invariants that go together
+ * with these synchronisation objects.
+ */
+MutexVar sched_mutex = INIT_MUTEX_VAR;
+MutexVar term_mutex = INIT_MUTEX_VAR;
+CondVar thread_ready_cond = INIT_COND_VAR;
+CondVar gc_pending_cond = INIT_COND_VAR;
nat await_death;
#endif
* should be done more efficiently without a linear scan
* of the main threads list, somehow...
*/
-#ifdef SMP
+#if defined(RTS_SUPPORTS_THREADS)
{
StgMainThread *m, **prev;
prev = &main_threads;
}
*prev = m->link;
m->stat = Success;
- pthread_cond_broadcast(&m->wakeup);
+ broadcastCondVar(&m->wakeup);
break;
case ThreadKilled:
if (m->ret) *(m->ret) = NULL;
} else {
m->stat = Killed;
}
- pthread_cond_broadcast(&m->wakeup);
+ broadcastCondVar(&m->wakeup);
break;
default:
break;
}
}
-#else // not SMP
+#else /* not threaded */
# if defined(PAR)
/* in GUM do this only on the Main PE */
*/
#if 0 /* defined(SMP) */
{
- nat n = n_free_capabilities;
+ nat n = getFreeCapabilities();
StgTSO *tso = run_queue_hd;
/* Count the run queue */
/* We need to wake up the other tasks if we just created some
* work for them.
*/
- if (n_free_capabilities - n > 1) {
- pthread_cond_signal(&thread_ready_cond);
+ if (getFreeCapabilities() - n > 1) {
+ signalCondVar ( &thread_ready_cond );
}
}
#endif // SMP
if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
awaitEvent(
(run_queue_hd == END_TSO_QUEUE)
-#ifdef SMP
- && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
+#if defined(SMP)
+ && allFreeCapabilities()
#endif
);
}
if (blocked_queue_hd == END_TSO_QUEUE
&& run_queue_hd == END_TSO_QUEUE
&& sleeping_queue == END_TSO_QUEUE
-#ifdef SMP
- && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
+#if defined(SMP)
+ && allFreeCapabilities()
#endif
)
{
IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
detectBlackHoles();
- // No black holes, so probably a real deadlock. Send the
- // current main thread the Deadlock exception (or in the SMP
- // build, send *all* main threads the deadlock exception,
- // since none of them can make progress).
+ /* No black holes, so probably a real deadlock. Send the
+ * current main thread the Deadlock exception (or in the SMP
+ * build, send *all* main threads the deadlock exception,
+ * since none of them can make progress).
+ */
if (run_queue_hd == END_TSO_QUEUE) {
StgMainThread *m;
-#ifdef SMP
+#if defined(RTS_SUPPORTS_THREADS)
for (m = main_threads; m != NULL; m = m->link) {
switch (m->tso->why_blocked) {
case BlockedOnBlackHole:
}
#endif
}
+#if !defined(RTS_SUPPORTS_THREADS)
ASSERT( run_queue_hd != END_TSO_QUEUE );
+#endif
}
}
#elif defined(PAR)
/* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
#endif
-#ifdef SMP
+#if defined(SMP)
/* If there's a GC pending, don't do anything until it has
* completed.
*/
if (ready_to_gc) {
IF_DEBUG(scheduler,sched_belch("waiting for GC"));
- pthread_cond_wait(&gc_pending_cond, &sched_mutex);
+ waitCondVar ( &gc_pending_cond, &sched_mutex );
}
-
+#endif
+
+#if defined(RTS_SUPPORTS_THREADS)
/* block until we've got a thread on the run queue and a free
* capability.
*/
- while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
+ while ( run_queue_hd == END_TSO_QUEUE
+#if defined(SMP)
+ || noFreeCapabilities()
+#endif
+ ) {
IF_DEBUG(scheduler, sched_belch("waiting for work"));
- pthread_cond_wait(&thread_ready_cond, &sched_mutex);
+ waitCondVar ( &thread_ready_cond, &sched_mutex );
IF_DEBUG(scheduler, sched_belch("work now available"));
}
#endif
#endif
- /* grab a capability
- */
#ifdef SMP
- cap = free_capabilities;
- free_capabilities = cap->link;
- n_free_capabilities--;
+ grabCapability(&cap);
#else
cap = &MainCapability;
#endif
ACQUIRE_LOCK(&sched_mutex);
#ifdef SMP
- IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
+ IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
#elif !defined(GRAN) && !defined(PAR)
IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
#endif
}
#ifdef SMP
- cap->link = free_capabilities;
- free_capabilities = cap;
- n_free_capabilities++;
+ grabCapability(&cap);
#endif
#ifdef PROFILING
#endif
#ifdef SMP
- if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes)
+ if (ready_to_gc && allFreeCapabilities() )
#else
if (ready_to_gc)
#endif
* to do it in another thread. Either way, we need to
* broadcast on gc_pending_cond afterward.
*/
-#ifdef SMP
+#if defined(RTS_SUPPORTS_THREADS)
IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
GarbageCollect(GetRoots,rtsFalse);
ready_to_gc = rtsFalse;
#ifdef SMP
- pthread_cond_broadcast(&gc_pending_cond);
+ broadcastCondVar(&gc_pending_cond);
#endif
#if defined(GRAN)
/* add a ContinueThread event to continue execution of current thread */
tok = cap->r.rCurrentTSO->id;
#ifdef SMP
- cap->link = free_capabilities;
- free_capabilities = cap;
- n_free_capabilities++;
+ /* Hand back capability */
+ releaseCapability(&cap);
#endif
RELEASE_LOCK(&sched_mutex);
tso->link = END_TSO_QUEUE;
#ifdef SMP
- while (free_capabilities == NULL) {
+ while ( noFreeCapabilities() ) {
IF_DEBUG(scheduler, sched_belch("waiting to resume"));
- pthread_cond_wait(&thread_ready_cond, &sched_mutex);
+ waitCondVar(&thread_ready_cond, &sched_mutex);
IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
}
- cap = free_capabilities;
- free_capabilities = cap->link;
- n_free_capabilities--;
+ grabCapability(&cap);
#else
cap = &MainCapability;
#endif
ACQUIRE_LOCK(&term_mutex);
await_death--;
RELEASE_LOCK(&term_mutex);
- pthread_exit(NULL);
+ shutdownThread();
}
#endif
-static void
-initCapability( Capability *cap )
-{
- cap->f.stgChk0 = (F_)__stg_chk_0;
- cap->f.stgChk1 = (F_)__stg_chk_1;
- cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
- cap->f.stgUpdatePAP = (F_)__stg_update_PAP;
-}
-
void
initScheduler(void)
{
#ifdef SMP
/* Allocate N Capabilities */
- {
- nat i;
- Capability *cap, *prev;
- cap = NULL;
- prev = NULL;
- for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
- cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
- initCapability(cap);
- cap->link = prev;
- prev = cap;
- }
- free_capabilities = cap;
- n_free_capabilities = RtsFlags.ParFlags.nNodes;
- }
- IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
- n_free_capabilities););
+ initCapabilities(RtsFlags.ParFlags.nNodes);
#else
initCapability(&MainCapability);
#endif
{
nat i;
int r;
- pthread_t tid;
+ OSThreadId tid;
/* make some space for saving all the thread ids */
task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
/* and create all the threads */
for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
- r = pthread_create(&tid,NULL,taskStart,NULL);
+ r = createOSThread(&tid,taskStart);
if (r != 0) {
barf("startTasks: Can't create new Posix thread");
}
m->tso = tso;
m->ret = ret;
m->stat = NoStatus;
-#ifdef SMP
- pthread_cond_init(&m->wakeup, NULL);
+#if defined(RTS_SUPPORTS_THREADS)
+ initCondVar(&m->wakeup);
#endif
m->link = main_threads;
#ifdef SMP
do {
- pthread_cond_wait(&m->wakeup, &sched_mutex);
+ waitCondVar(&m->wakeup, &sched_mutex);
} while (m->stat == NoStatus);
#elif defined(GRAN)
/* GranSim specific init */
stat = m->stat;
-#ifdef SMP
- pthread_cond_destroy(&m->wakeup);
+#if defined(RTS_SUPPORTS_THREADS)
+ closeCondVar(&m->wakeup);
#endif
IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
va_list ap;
va_start(ap,s);
#ifdef SMP
- fprintf(stderr, "scheduler (task %ld): ", pthread_self());
+ fprintf(stderr, "scheduler (task %ld): ", osThreadId());
#elif defined(PAR)
fprintf(stderr, "== ");
#else
//* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
//* context_switch:: @cindex\s-+context_switch
//* createThread:: @cindex\s-+createThread
-//* free_capabilities:: @cindex\s-+free_capabilities
//* gc_pending_cond:: @cindex\s-+gc_pending_cond
//* initScheduler:: @cindex\s-+initScheduler
//* interrupted:: @cindex\s-+interrupted
-//* n_free_capabilities:: @cindex\s-+n_free_capabilities
//* next_thread_id:: @cindex\s-+next_thread_id
//* print_bq:: @cindex\s-+print_bq
//* run_queue_hd:: @cindex\s-+run_queue_hd