/* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.164 2003/03/25 17:26:09 sof Exp $
+ * $Id: Schedule.c,v 1.185 2004/02/25 17:35:44 simonmar Exp $
*
- * (c) The GHC Team, 1998-2000
+ * (c) The GHC Team, 1998-2003
*
* Scheduler
*
*
* WAY Name CPP flag What's it for
* --------------------------------------
- * mp GUM PAR Parallel execution on a distributed memory machine
+ * mp GUM PAR Parallel execution on a distrib. memory machine
* s SMP SMP Parallel execution on a shared memory machine
* mg GranSim GRAN Simulation of parallel execution
* md GUM/GdH DIST Distributed execution (based on GUM)
*
* --------------------------------------------------------------------------*/
-//@node Main scheduling code, , ,
-//@section Main scheduling code
-
/*
- * Version with scheduler monitor support for SMPs (WAY=s):
-
- This design provides a high-level API to create and schedule threads etc.
- as documented in the SMP design document.
-
- It uses a monitor design controlled by a single mutex to exercise control
- over accesses to shared data structures, and builds on the Posix threads
- library.
-
- The majority of state is shared. In order to keep essential per-task state,
- there is a Capability structure, which contains all the information
- needed to run a thread: its STG registers, a pointer to its TSO, a
- nursery etc. During STG execution, a pointer to the capability is
- kept in a register (BaseReg).
-
- In a non-SMP build, there is one global capability, namely MainRegTable.
-
- SDM & KH, 10/99
-
* Version with support for distributed memory parallelism aka GUM (WAY=mp):
The main scheduling loop in GUM iterates until a finish message is received.
over the events in the global event queue. -- HWL
*/
-//@menu
-//* Includes::
-//* Variables and Data structures::
-//* Main scheduling loop::
-//* Suspend and Resume::
-//* Run queue code::
-//* Garbage Collextion Routines::
-//* Blocking Queue Routines::
-//* Exception Handling Routines::
-//* Debugging Routines::
-//* Index::
-//@end menu
-
-//@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
-//@subsection Includes
-
#include "PosixSource.h"
#include "Rts.h"
#include "SchedAPI.h"
#include <stdlib.h>
#include <stdarg.h>
-//@node Variables and Data structures, Prototypes, Includes, Main scheduling code
-//@subsection Variables and Data structures
+#ifdef HAVE_ERRNO_H
+#include <errno.h>
+#endif
+
+#ifdef THREADED_RTS
+#define USED_IN_THREADED_RTS
+#else
+#define USED_IN_THREADED_RTS STG_UNUSED
+#endif
+
+#ifdef RTS_SUPPORTS_THREADS
+#define USED_WHEN_RTS_SUPPORTS_THREADS
+#else
+#define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
+#endif
/* Main thread queue.
* Locks required: sched_mutex.
*/
StgMainThread *main_threads = NULL;
-#ifdef THREADED_RTS
-// Pointer to the thread that executes main
-// When this thread is finished, the program terminates
-// by calling shutdownHaskellAndExit.
-// It would be better to add a call to shutdownHaskellAndExit
-// to the Main.main wrapper and to remove this hack.
-StgMainThread *main_main_thread = NULL;
-#endif
-
/* Thread queues.
* Locks required: sched_mutex.
*/
*/
/* flag set by signal handler to precipitate a context switch */
-//@cindex context_switch
nat context_switch = 0;
/* if this flag is set as well, give up execution */
-//@cindex interrupted
rtsBool interrupted = rtsFalse;
/* Next thread ID to allocate.
* Locks required: thread_id_mutex
*/
-//@cindex next_thread_id
static StgThreadID next_thread_id = 1;
/*
/* The smallest stack size that makes any sense is:
* RESERVED_STACK_WORDS (so we can get back from the stack overflow)
* + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
- * + 1 (the realworld token for an IO thread)
* + 1 (the closure to enter)
+ * + 1 (stg_ap_v_ret)
+ * + 1 (spare slot req'd by stg_ap_v_ret)
*
* A thread with this stack will bomb immediately with a stack
* overflow, which will increase its stack size.
*/
-#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
+#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
#if defined(GRAN)
void addToBlockedQueue ( StgTSO *tso );
-static void schedule ( void );
+static void schedule ( StgMainThread *mainThread, Capability *initialCapability );
void interruptStgRts ( void );
static void detectBlackHoles ( void );
-#ifdef DEBUG
-static void sched_belch(char *s, ...);
-#endif
-
#if defined(RTS_SUPPORTS_THREADS)
/* ToDo: carefully document the invariants that go together
* with these synchronisation objects.
*/
Mutex thread_id_mutex = INIT_MUTEX_VAR;
-
-# if defined(SMP)
-static Condition gc_pending_cond = INIT_COND_VAR;
-nat await_death;
-# endif
-
#endif /* RTS_SUPPORTS_THREADS */
#if defined(PAR)
StgTSO * activateSpark (rtsSpark spark);
#endif
-/*
- * The thread state for the main thread.
-// ToDo: check whether not needed any more
-StgTSO *MainTSO;
- */
+/* ----------------------------------------------------------------------------
+ * Starting Tasks
+ * ------------------------------------------------------------------------- */
+
+#if defined(RTS_SUPPORTS_THREADS)
+static rtsBool startingWorkerThread = rtsFalse;
-#if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
static void taskStart(void);
static void
taskStart(void)
{
- schedule();
+ ACQUIRE_LOCK(&sched_mutex);
+ schedule(NULL,NULL);
+ RELEASE_LOCK(&sched_mutex);
}
-#endif
-#if defined(RTS_SUPPORTS_THREADS)
void
-startSchedulerTask(void)
+startSchedulerTaskIfNecessary(void)
{
- startTask(taskStart);
+ if(run_queue_hd != END_TSO_QUEUE
+ || blocked_queue_hd != END_TSO_QUEUE
+ || sleeping_queue != END_TSO_QUEUE)
+ {
+ if(!startingWorkerThread)
+ { // we don't want to start another worker thread
+ // just because the last one hasn't yet reached the
+ // "waiting for capability" state
+ startingWorkerThread = rtsTrue;
+ startTask(taskStart);
+ }
+ }
}
#endif
-//@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
-//@subsection Main scheduling loop
-
/* ---------------------------------------------------------------------------
Main scheduling loop.
This is not the ugliest code you could imagine, but it's bloody close.
------------------------------------------------------------------------ */
-//@cindex schedule
static void
-schedule( void )
+schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
+ Capability *initialCapability )
{
StgTSO *t;
- Capability *cap;
+ Capability *cap = initialCapability;
StgThreadReturnCode ret;
#if defined(GRAN)
rtsEvent *event;
rtsBool was_interrupted = rtsFalse;
StgTSOWhatNext prev_what_next;
- ACQUIRE_LOCK(&sched_mutex);
+ // Pre-condition: sched_mutex is held.
#if defined(RTS_SUPPORTS_THREADS)
- waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
- IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): entering RTS", osThreadId()));
+ //
+ // in the threaded case, the capability is either passed in via the
+ // initialCapability parameter, or initialized inside the scheduler
+ // loop
+ //
+ IF_DEBUG(scheduler,
+ sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
+ mainThread, initialCapability);
+ );
#else
- /* simply initialise it in the non-threaded case */
+ // simply initialise it in the non-threaded case
grabCapability(&cap);
#endif
while (!receivedFinish) { /* set by processMessages */
/* when receiving PP_FINISH message */
-#else
+
+#else // everything except GRAN and PAR
while (1) {
#endif
- IF_DEBUG(scheduler, printAllThreads());
+ IF_DEBUG(scheduler, printAllThreads());
#if defined(RTS_SUPPORTS_THREADS)
- /* Check to see whether there are any worker threads
- waiting to deposit external call results. If so,
- yield our capability */
- yieldToReturningWorker(&sched_mutex, &cap);
+ // Yield the capability to higher-priority tasks if necessary.
+ //
+ if (cap != NULL) {
+ yieldCapability(&cap);
+ }
+
+ // If we do not currently hold a capability, we wait for one
+ //
+ if (cap == NULL) {
+ waitForCapability(&sched_mutex, &cap,
+ mainThread ? &mainThread->bound_thread_cond : NULL);
+ }
+
+ // We now have a capability...
#endif
- /* If we're interrupted (the user pressed ^C, or some other
- * termination condition occurred), kill all the currently running
- * threads.
- */
+ //
+ // If we're interrupted (the user pressed ^C, or some other
+ // termination condition occurred), kill all the currently running
+ // threads.
+ //
if (interrupted) {
- IF_DEBUG(scheduler, sched_belch("interrupted"));
- interrupted = rtsFalse;
- was_interrupted = rtsTrue;
+ IF_DEBUG(scheduler, sched_belch("interrupted"));
+ interrupted = rtsFalse;
+ was_interrupted = rtsTrue;
#if defined(RTS_SUPPORTS_THREADS)
- // In the threaded RTS, deadlock detection doesn't work,
- // so just exit right away.
- prog_belch("interrupted");
- releaseCapability(cap);
- startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit
- RELEASE_LOCK(&sched_mutex);
- shutdownHaskellAndExit(EXIT_SUCCESS);
+ // In the threaded RTS, deadlock detection doesn't work,
+ // so just exit right away.
+ prog_belch("interrupted");
+ releaseCapability(cap);
+ RELEASE_LOCK(&sched_mutex);
+ shutdownHaskellAndExit(EXIT_SUCCESS);
#else
- deleteAllThreads();
+ deleteAllThreads();
#endif
}
- /* Go through the list of main threads and wake up any
- * clients whose computations have finished. ToDo: this
- * should be done more efficiently without a linear scan
- * of the main threads list, somehow...
- */
+ //
+ // Go through the list of main threads and wake up any
+ // clients whose computations have finished. ToDo: this
+ // should be done more efficiently without a linear scan
+ // of the main threads list, somehow...
+ //
#if defined(RTS_SUPPORTS_THREADS)
{
- StgMainThread *m, **prev;
- prev = &main_threads;
- for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
- switch (m->tso->what_next) {
- case ThreadComplete:
- if (m->ret) {
- // NOTE: return val is tso->sp[1] (see StgStartup.hc)
- *(m->ret) = (StgClosure *)m->tso->sp[1];
- }
- *prev = m->link;
- m->stat = Success;
- broadcastCondition(&m->wakeup);
-#ifdef DEBUG
- removeThreadLabel((StgWord)m->tso);
-#endif
- if(m == main_main_thread)
- {
- releaseCapability(cap);
- startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit
- RELEASE_LOCK(&sched_mutex);
- shutdownHaskellAndExit(EXIT_SUCCESS);
- }
- break;
- case ThreadKilled:
- if (m->ret) *(m->ret) = NULL;
- *prev = m->link;
- if (was_interrupted) {
- m->stat = Interrupted;
- } else {
- m->stat = Killed;
- }
- broadcastCondition(&m->wakeup);
+ StgMainThread *m, **prev;
+ prev = &main_threads;
+ for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
+ if (m->tso->what_next == ThreadComplete
+ || m->tso->what_next == ThreadKilled)
+ {
+ if (m == mainThread)
+ {
+ if (m->tso->what_next == ThreadComplete)
+ {
+ if (m->ret)
+ {
+ // NOTE: return val is tso->sp[1] (see StgStartup.hc)
+ *(m->ret) = (StgClosure *)m->tso->sp[1];
+ }
+ m->stat = Success;
+ }
+ else
+ {
+ if (m->ret)
+ {
+ *(m->ret) = NULL;
+ }
+ if (was_interrupted)
+ {
+ m->stat = Interrupted;
+ }
+ else
+ {
+ m->stat = Killed;
+ }
+ }
+ *prev = m->link;
+
#ifdef DEBUG
- removeThreadLabel((StgWord)m->tso);
+ removeThreadLabel((StgWord)m->tso->id);
#endif
- if(m == main_main_thread)
- {
releaseCapability(cap);
- startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit
- RELEASE_LOCK(&sched_mutex);
- shutdownHaskellAndExit(EXIT_SUCCESS);
+ return;
+ }
+ else
+ {
+ // The current OS thread can not handle the fact that
+ // the Haskell thread "m" has ended. "m" is bound;
+ // the scheduler loop in it's bound OS thread has to
+ // return, so let's pass our capability directly to
+ // that thread.
+ passCapability(&m->bound_thread_cond);
+ continue;
+ }
}
- break;
- default:
- break;
}
- }
}
-
+
#else /* not threaded */
# if defined(PAR)
if (m->tso->what_next == ThreadComplete
|| m->tso->what_next == ThreadKilled) {
#ifdef DEBUG
- removeThreadLabel((StgWord)m->tso);
+ removeThreadLabel((StgWord)m->tso->id);
#endif
main_threads = main_threads->link;
if (m->tso->what_next == ThreadComplete) {
}
#endif
- /* Top up the run queue from our spark pool. We try to make the
- * number of threads in the run queue equal to the number of
- * free capabilities.
- *
- * Disable spark support in SMP for now, non-essential & requires
- * a little bit of work to make it compile cleanly. -- sof 1/02.
- */
-#if 0 /* defined(SMP) */
- {
- nat n = getFreeCapabilities();
- StgTSO *tso = run_queue_hd;
- /* Count the run queue */
- while (n > 0 && tso != END_TSO_QUEUE) {
- tso = tso->link;
- n--;
- }
-
- for (; n > 0; n--) {
- StgClosure *spark;
- spark = findSpark(rtsFalse);
- if (spark == NULL) {
- break; /* no more sparks in the pool */
- } else {
- /* I'd prefer this to be done in activateSpark -- HWL */
- /* tricky - it needs to hold the scheduler lock and
- * not try to re-acquire it -- SDM */
- createSparkThread(spark);
- IF_DEBUG(scheduler,
- sched_belch("==^^ turning spark of closure %p into a thread",
- (StgClosure *)spark));
- }
- }
- /* We need to wake up the other tasks if we just created some
- * work for them.
- */
- if (getFreeCapabilities() - n > 1) {
- signalCondition( &thread_ready_cond );
- }
- }
-#endif // SMP
-
- /* check for signals each time around the scheduler */
-#ifndef mingw32_TARGET_OS
+#if defined(RTS_USER_SIGNALS)
+ // check for signals each time around the scheduler
if (signals_pending()) {
RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
startSignalHandlers();
* can wait indefinitely for something to happen.
*/
if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
-#if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
+#if defined(RTS_SUPPORTS_THREADS)
|| EMPTY_RUN_QUEUE()
#endif
)
{
- awaitEvent( EMPTY_RUN_QUEUE()
-#if defined(SMP)
- && allFreeCapabilities()
-#endif
- );
+ awaitEvent( EMPTY_RUN_QUEUE() );
}
/* we can be interrupted while waiting for I/O... */
if (interrupted) continue;
* inform all the main threads.
*/
#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
- if ( EMPTY_THREAD_QUEUES()
-#if defined(RTS_SUPPORTS_THREADS)
- && EMPTY_QUEUE(suspended_ccalling_threads)
-#endif
-#ifdef SMP
- && allFreeCapabilities()
-#endif
- )
+ if ( EMPTY_THREAD_QUEUES() )
{
IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
-#if defined(THREADED_RTS)
- /* and SMP mode ..? */
- releaseCapability(cap);
-#endif
// Garbage collection can release some new threads due to
// either (a) finalizers or (b) threads resurrected because
// they are about to be send BlockedOnDeadMVar. Any threads
if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
-#ifndef mingw32_TARGET_OS
+#if defined(RTS_USER_SIGNALS)
/* If we have user-installed signal handlers, then wait
* for signals to arrive rather then bombing out with a
* deadlock.
*/
-#if defined(RTS_SUPPORTS_THREADS)
- if ( 0 ) { /* hmm..what to do? Simply stop waiting for
- a signal with no runnable threads (or I/O
- suspended ones) leads nowhere quick.
- For now, simply shut down when we reach this
- condition.
-
- ToDo: define precisely under what conditions
- the Scheduler should shut down in an MT setting.
- */
-#else
if ( anyUserHandlers() ) {
-#endif
IF_DEBUG(scheduler,
sched_belch("still deadlocked, waiting for signals..."));
*/
{
StgMainThread *m;
-#if defined(RTS_SUPPORTS_THREADS)
- for (m = main_threads; m != NULL; m = m->link) {
- switch (m->tso->why_blocked) {
- case BlockedOnBlackHole:
- raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
- break;
- case BlockedOnException:
- case BlockedOnMVar:
- raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
- break;
- default:
- barf("deadlock: main thread blocked in a strange way");
- }
- }
-#else
m = main_threads;
switch (m->tso->why_blocked) {
case BlockedOnBlackHole:
default:
barf("deadlock: main thread blocked in a strange way");
}
-#endif
}
-
-#if defined(RTS_SUPPORTS_THREADS)
- /* ToDo: revisit conditions (and mechanism) for shutting
- down a multi-threaded world */
- IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
- RELEASE_LOCK(&sched_mutex);
- shutdownHaskell();
- return;
-#endif
}
not_deadlocked:
#elif defined(RTS_SUPPORTS_THREADS)
- /* ToDo: add deadlock detection in threaded RTS */
+ // ToDo: add deadlock detection in threaded RTS
#elif defined(PAR)
- /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
+ // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
#endif
-#if defined(SMP)
- /* If there's a GC pending, don't do anything until it has
- * completed.
- */
- if (ready_to_gc) {
- IF_DEBUG(scheduler,sched_belch("waiting for GC"));
- waitCondition( &gc_pending_cond, &sched_mutex );
- }
-#endif
-
#if defined(RTS_SUPPORTS_THREADS)
-#if defined(SMP)
- /* block until we've got a thread on the run queue and a free
- * capability.
- *
- */
- if ( EMPTY_RUN_QUEUE() ) {
- /* Give up our capability */
- releaseCapability(cap);
-
- /* If we're in the process of shutting down (& running the
- * a batch of finalisers), don't wait around.
- */
- if ( shutting_down_scheduler ) {
- RELEASE_LOCK(&sched_mutex);
- return;
- }
- IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
- waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
- IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
- }
-#else
if ( EMPTY_RUN_QUEUE() ) {
continue; // nothing to do
}
#endif
-#endif
#if defined(GRAN)
if (RtsFlags.GranFlags.Light)
/* in a GranSim setup the TSO stays on the run queue */
t = CurrentTSO;
/* Take a thread from the run queue. */
- t = POP_RUN_QUEUE(); // take_off_run_queue(t);
+ POP_RUN_QUEUE(t); // take_off_run_queue(t);
IF_DEBUG(gran,
fprintf(stderr, "GRAN: About to run current thread, which is\n");
ASSERT(run_queue_hd != END_TSO_QUEUE);
/* Take a thread from the run queue, if we have work */
- t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
+ POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
IF_DEBUG(sanity,checkTSO(t));
/* ToDo: write something to the log-file
# endif
#else /* !GRAN && !PAR */
- /* grab a thread from the run queue */
+ // grab a thread from the run queue
ASSERT(run_queue_hd != END_TSO_QUEUE);
- t = POP_RUN_QUEUE();
+ POP_RUN_QUEUE(t);
+
// Sanity check the thread we're about to run. This can be
// expensive if there is lots of thread switching going on...
IF_DEBUG(sanity,checkTSO(t));
#endif
+#ifdef THREADED_RTS
+ {
+ StgMainThread *m;
+ for(m = main_threads; m; m = m->link)
+ {
+ if(m->tso == t)
+ break;
+ }
+
+ if(m)
+ {
+ if(m == mainThread)
+ {
+ IF_DEBUG(scheduler,
+ sched_belch("### Running thread %d in bound thread", t->id));
+ // yes, the Haskell thread is bound to the current native thread
+ }
+ else
+ {
+ IF_DEBUG(scheduler,
+ sched_belch("### thread %d bound to another OS thread", t->id));
+ // no, bound to a different Haskell thread: pass to that thread
+ PUSH_ON_RUN_QUEUE(t);
+ passCapability(&m->bound_thread_cond);
+ continue;
+ }
+ }
+ else
+ {
+ if(mainThread != NULL)
+ // The thread we want to run is bound.
+ {
+ IF_DEBUG(scheduler,
+ sched_belch("### this OS thread cannot run thread %d", t->id));
+ // no, the current native thread is bound to a different
+ // Haskell thread, so pass it to any worker thread
+ PUSH_ON_RUN_QUEUE(t);
+ passCapabilityToWorker();
+ continue;
+ }
+ }
+ }
+#endif
+
cap->r.rCurrentTSO = t;
/* context switches are now initiated by the timer signal, unless
ret = ThreadFinished;
break;
case ThreadRunGHC:
+ errno = t->saved_errno;
ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
+ t->saved_errno = errno;
break;
case ThreadInterpret:
ret = interpretBCO(cap);
ACQUIRE_LOCK(&sched_mutex);
#ifdef RTS_SUPPORTS_THREADS
- IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
+ IF_DEBUG(scheduler,fprintf(stderr,"sched (task %p): ", osThreadId()););
#elif !defined(GRAN) && !defined(PAR)
- IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
+ IF_DEBUG(scheduler,fprintf(stderr,"sched: "););
#endif
t = cap->r.rCurrentTSO;
t->id, whatNext_strs[t->what_next]);
printThreadBlockage(t);
fprintf(stderr, "\n"));
+ fflush(stderr);
/* Only for dumping event to log file
ToDo: do I need this in GranSim, too?
#endif
threadPaused(t);
break;
-
+
case ThreadFinished:
/* Need to check whether this was a main thread, and if so, signal
* the task that started it with the return value. If we have no
}
#endif
- if (ready_to_gc
-#ifdef SMP
- && allFreeCapabilities()
-#endif
- ) {
+ if (ready_to_gc) {
/* everybody back, start the GC.
* Could do it in this thread, or signal a condition var
* to do it in another thread. Either way, we need to
#endif
GarbageCollect(GetRoots,rtsFalse);
ready_to_gc = rtsFalse;
-#ifdef SMP
- broadcastCondition(&gc_pending_cond);
-#endif
#if defined(GRAN)
/* add a ContinueThread event to continue execution of current thread */
new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
}
/* ---------------------------------------------------------------------------
+ * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
+ * used by Control.Concurrent for error checking.
+ * ------------------------------------------------------------------------- */
+
+StgBool
+rtsSupportsBoundThreads(void)
+{
+#ifdef THREADED_RTS
+ return rtsTrue;
+#else
+ return rtsFalse;
+#endif
+}
+
+/* ---------------------------------------------------------------------------
+ * isThreadBound(tso): check whether tso is bound to an OS thread.
+ * ------------------------------------------------------------------------- */
+
+StgBool
+isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
+{
+#ifdef THREADED_RTS
+ StgMainThread *m;
+ for(m = main_threads; m; m = m->link)
+ {
+ if(m->tso == tso)
+ return rtsTrue;
+ }
+#endif
+ return rtsFalse;
+}
+
+/* ---------------------------------------------------------------------------
* Singleton fork(). Do not copy any running threads.
* ------------------------------------------------------------------------- */
-StgInt forkProcess(StgTSO* tso) {
+static void
+deleteThreadImmediately(StgTSO *tso);
+StgInt
+forkProcess(HsStablePtr *entry)
+{
#ifndef mingw32_TARGET_OS
pid_t pid;
StgTSO* t,*next;
StgMainThread *m;
- rtsBool doKill;
+ SchedulerStatus rc;
IF_DEBUG(scheduler,sched_belch("forking!"));
+ rts_lock(); // This not only acquires sched_mutex, it also
+ // makes sure that no other threads are running
pid = fork();
+
if (pid) { /* parent */
/* just return the pid */
+ rts_unlock();
+ return pid;
} else { /* child */
- /* wipe all other threads */
- run_queue_hd = run_queue_tl = tso;
- tso->link = END_TSO_QUEUE;
-
- /* When clearing out the threads, we need to ensure
- that a 'main thread' is left behind; if there isn't,
- the Scheduler will shutdown next time it is entered.
-
- ==> we don't kill a thread that's on the main_threads
- list (nor the current thread.)
- [ Attempts at implementing the more ambitious scheme of
- killing the main_threads also, and then adding the
- current thread onto the main_threads list if it wasn't
- there already, failed -- waitThread() (for one) wasn't
- up to it. If it proves to be desirable to also kill
- the main threads, then this scheme will have to be
- revisited (and fully debugged!)
-
- -- sof 7/2002
- ]
- */
- /* DO NOT TOUCH THE QUEUES directly because most of the code around
- us is picky about finding the thread still in its queue when
- handling the deleteThread() */
-
- for (t = all_threads; t != END_TSO_QUEUE; t = next) {
- next = t->link;
- /* Don't kill the current thread.. */
- if (t->id == tso->id) continue;
- doKill=rtsTrue;
- /* ..or a main thread */
- for (m = main_threads; m != NULL; m = m->link) {
- if (m->tso->id == t->id) {
- doKill=rtsFalse;
- break;
- }
+ // delete all threads
+ run_queue_hd = run_queue_tl = END_TSO_QUEUE;
+
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ next = t->link;
+
+ // don't allow threads to catch the ThreadKilled exception
+ deleteThreadImmediately(t);
}
- if (doKill) {
- deleteThread(t);
+
+ // wipe the main thread list
+ while((m = main_threads) != NULL) {
+ main_threads = m->link;
+#ifdef THREADED_RTS
+ closeCondition(&m->bound_thread_cond);
+#endif
+ stgFree(m);
}
+
+#ifdef RTS_SUPPORTS_THREADS
+ resetTaskManagerAfterFork(); // tell startTask() and friends that
+ startingWorkerThread = rtsFalse; // we have no worker threads any more
+ resetWorkerWakeupPipeAfterFork();
+#endif
+
+ rc = rts_evalStableIO(entry, NULL); // run the action
+ rts_checkSchedStatus("forkProcess",rc);
+
+ rts_unlock();
+
+ hs_exit(); // clean up and exit
+ stg_exit(0);
}
- }
- return pid;
#else /* mingw32 */
- barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
- /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */
+ barf("forkProcess#: primop not implemented for mingw32, sorry!\n");
return -1;
#endif /* mingw32 */
}
* Locks: sched_mutex held.
* ------------------------------------------------------------------------- */
-void deleteAllThreads ( void )
+void
+deleteAllThreads ( void )
{
StgTSO* t, *next;
IF_DEBUG(scheduler,sched_belch("deleting all threads"));
/* startThread and insertThread are now in GranSim.c -- HWL */
-//@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
-//@subsection Suspend and Resume
-
/* ---------------------------------------------------------------------------
* Suspending & resuming Haskell threads.
*
StgInt
suspendThread( StgRegTable *reg,
rtsBool concCall
-#if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
+#if !defined(DEBUG)
STG_UNUSED
#endif
)
{
nat tok;
Capability *cap;
+ int saved_errno = errno;
/* assume that *reg is a pointer to the StgRegTable part
* of a Capability.
*/
- cap = (Capability *)((void *)reg - sizeof(StgFunTable));
+ cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
ACQUIRE_LOCK(&sched_mutex);
/* Preparing to leave the RTS, so ensure there's a native thread/task
waiting to take over.
*/
- IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId()));
- //if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult
- startTask(taskStart);
- //}
+ IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
#endif
/* Other threads _might_ be available for execution; signal this */
THREAD_RUNNABLE();
RELEASE_LOCK(&sched_mutex);
+
+ errno = saved_errno;
return tok;
}
StgRegTable *
resumeThread( StgInt tok,
- rtsBool concCall
-#if !defined(RTS_SUPPORTS_THREADS)
- STG_UNUSED
-#endif
- )
+ rtsBool concCall STG_UNUSED )
{
StgTSO *tso, **prev;
Capability *cap;
+ int saved_errno = errno;
#if defined(RTS_SUPPORTS_THREADS)
/* Wait for permission to re-enter the RTS with the result. */
ACQUIRE_LOCK(&sched_mutex);
- grabReturnCapability(&sched_mutex, &cap);
+ waitForReturnCapability(&sched_mutex, &cap);
- IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): re-entering RTS", tok, osThreadId()));
+ IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
#else
grabCapability(&cap);
#endif
tso->why_blocked = NotBlocked;
cap->r.rCurrentTSO = tso;
-#if defined(RTS_SUPPORTS_THREADS)
RELEASE_LOCK(&sched_mutex);
-#endif
+ errno = saved_errno;
return &cap->r;
}
buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
strncpy(buf,label,len);
/* Update will free the old memory for us */
- updateThreadLabel((StgWord)tso,buf);
+ updateThreadLabel(((StgTSO *)tso)->id,buf);
}
#endif /* DEBUG */
currently pri (priority) is only used in a GRAN setup -- HWL
------------------------------------------------------------------------ */
-//@cindex createThread
#if defined(GRAN)
/* currently pri (priority) is only used in a GRAN setup -- HWL */
StgTSO *
tso->why_blocked = NotBlocked;
tso->blocked_exceptions = NULL;
+ tso->saved_errno = 0;
+
tso->stack_size = stack_size;
tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
- TSO_STRUCT_SIZEW;
ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
*/
#if defined(PAR)
-//@cindex activateSpark
StgTSO *
activateSpark (rtsSpark spark)
{
}
#endif
-static SchedulerStatus waitThread_(/*out*/StgMainThread* m
-#if defined(THREADED_RTS)
- , rtsBool blockWaiting
-#endif
+static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
+ Capability *initialCapability
);
RELEASE_LOCK(&sched_mutex);
}
+static Condition bound_cond_cache = NULL;
+
SchedulerStatus
-scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
-{ // Precondition: sched_mutex must be held
- StgMainThread *m;
+scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
+ Capability *initialCapability)
+{
+ // Precondition: sched_mutex must be held
+ StgMainThread *m;
- m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
- m->tso = tso;
- m->ret = ret;
- m->stat = NoStatus;
+ m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
+ m->tso = tso;
+ m->ret = ret;
+ m->stat = NoStatus;
#if defined(RTS_SUPPORTS_THREADS)
- initCondition(&m->wakeup);
+ // Allocating a new condition for each thread is expensive, so we
+ // cache one. This is a pretty feeble hack, but it helps speed up
+ // consecutive call-ins quite a bit.
+ if (bound_cond_cache != NULL) {
+ m->bound_thread_cond = bound_cond_cache;
+ bound_cond_cache = NULL;
+ } else {
+ initCondition(&m->bound_thread_cond);
+ }
#endif
- /* Put the thread on the main-threads list prior to scheduling the TSO.
- Failure to do so introduces a race condition in the MT case (as
- identified by Wolfgang Thaller), whereby the new task/OS thread
- created by scheduleThread_() would complete prior to the thread
- that spawned it managed to put 'itself' on the main-threads list.
- The upshot of it all being that the worker thread wouldn't get to
- signal the completion of the its work item for the main thread to
- see (==> it got stuck waiting.) -- sof 6/02.
- */
- IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)\n", tso->id));
-
- m->link = main_threads;
- main_threads = m;
+ /* Put the thread on the main-threads list prior to scheduling the TSO.
+ Failure to do so introduces a race condition in the MT case (as
+ identified by Wolfgang Thaller), whereby the new task/OS thread
+ created by scheduleThread_() would complete prior to the thread
+ that spawned it managed to put 'itself' on the main-threads list.
+ The upshot of it all being that the worker thread wouldn't get to
+ signal the completion of the its work item for the main thread to
+ see (==> it got stuck waiting.) -- sof 6/02.
+ */
+ IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
+
+ m->link = main_threads;
+ main_threads = m;
+
+ scheduleThread_(tso);
- scheduleThread_(tso);
-#if defined(THREADED_RTS)
- return waitThread_(m, rtsTrue);
-#else
- return waitThread_(m);
-#endif
+ return waitThread_(m, initialCapability);
}
/* ---------------------------------------------------------------------------
*
* ------------------------------------------------------------------------ */
-#ifdef SMP
-static void
-term_handler(int sig STG_UNUSED)
-{
- stat_workerStop();
- ACQUIRE_LOCK(&term_mutex);
- await_death--;
- RELEASE_LOCK(&term_mutex);
- shutdownThread();
-}
-#endif
-
void
initScheduler(void)
{
initCondition(&thread_ready_cond);
#endif
-#if defined(SMP)
- initCondition(&gc_pending_cond);
-#endif
-
#if defined(RTS_SUPPORTS_THREADS)
ACQUIRE_LOCK(&sched_mutex);
#endif
- /* Install the SIGHUP handler */
-#if defined(SMP)
- {
- struct sigaction action,oact;
-
- action.sa_handler = term_handler;
- sigemptyset(&action.sa_mask);
- action.sa_flags = 0;
- if (sigaction(SIGTERM, &action, &oact) != 0) {
- barf("can't install TERM handler");
- }
- }
-#endif
-
/* A capability holds the state a native thread needs in
* order to execute STG code. At least one capability is
* floating around (only SMP builds have more than one).
#if defined(RTS_SUPPORTS_THREADS)
/* start our haskell execution tasks */
-# if defined(SMP)
- startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
-# else
startTaskManager(0,taskStart);
-# endif
#endif
#if /* defined(SMP) ||*/ defined(PAR)
initSparkPools();
#endif
-#if defined(RTS_SUPPORTS_THREADS)
RELEASE_LOCK(&sched_mutex);
-#endif
}
shutting_down_scheduler = rtsTrue;
}
-/* -----------------------------------------------------------------------------
+/* ----------------------------------------------------------------------------
Managing the per-task allocation areas.
Each capability comes with an allocation area. These are
fixed-length block lists into which allocation can be done.
ToDo: no support for two-space collection at the moment???
- -------------------------------------------------------------------------- */
-
-/* -----------------------------------------------------------------------------
- * waitThread is the external interface for running a new computation
- * and waiting for the result.
- *
- * In the non-SMP case, we create a new main thread, push it on the
- * main-thread stack, and invoke the scheduler to run it. The
- * scheduler will return when the top main thread on the stack has
- * completed or died, and fill in the necessary fields of the
- * main_thread structure.
- *
- * In the SMP case, we create a main thread as before, but we then
- * create a new condition variable and sleep on it. When our new
- * main thread has completed, we'll be woken up and the status/result
- * will be in the main_thread struct.
- * -------------------------------------------------------------------------- */
-
-int
-howManyThreadsAvail ( void )
-{
- int i = 0;
- StgTSO* q;
- for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
- i++;
- for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
- i++;
- for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
- i++;
- return i;
-}
-
-void
-finishAllThreads ( void )
-{
- do {
- while (run_queue_hd != END_TSO_QUEUE) {
- waitThread ( run_queue_hd, NULL);
- }
- while (blocked_queue_hd != END_TSO_QUEUE) {
- waitThread ( blocked_queue_hd, NULL);
- }
- while (sleeping_queue != END_TSO_QUEUE) {
- waitThread ( blocked_queue_hd, NULL);
- }
- } while
- (blocked_queue_hd != END_TSO_QUEUE ||
- run_queue_hd != END_TSO_QUEUE ||
- sleeping_queue != END_TSO_QUEUE);
-}
-
-SchedulerStatus
-waitThread(StgTSO *tso, /*out*/StgClosure **ret)
-{
- StgMainThread *m;
- SchedulerStatus stat;
-
- m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
- m->tso = tso;
- m->ret = ret;
- m->stat = NoStatus;
-#if defined(RTS_SUPPORTS_THREADS)
- initCondition(&m->wakeup);
-#endif
-
- /* see scheduleWaitThread() comment */
- ACQUIRE_LOCK(&sched_mutex);
- m->link = main_threads;
- main_threads = m;
-
- IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
-#if defined(THREADED_RTS)
- stat = waitThread_(m, rtsFalse);
-#else
- stat = waitThread_(m);
-#endif
- RELEASE_LOCK(&sched_mutex);
- return stat;
-}
+ ------------------------------------------------------------------------- */
static
SchedulerStatus
-waitThread_(StgMainThread* m
-#if defined(THREADED_RTS)
- , rtsBool blockWaiting
-#endif
- )
+waitThread_(StgMainThread* m, Capability *initialCapability)
{
SchedulerStatus stat;
// Precondition: sched_mutex must be held.
- IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
-
-#if defined(RTS_SUPPORTS_THREADS)
+ IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
-# if defined(THREADED_RTS)
- if (!blockWaiting) {
- /* In the threaded case, the OS thread that called main()
- * gets to enter the RTS directly without going via another
- * task/thread.
- */
- main_main_thread = m;
- RELEASE_LOCK(&sched_mutex);
- schedule();
- ACQUIRE_LOCK(&sched_mutex);
- main_main_thread = NULL;
- ASSERT(m->stat != NoStatus);
- } else
-# endif
- {
- do {
- waitCondition(&m->wakeup, &sched_mutex);
- } while (m->stat == NoStatus);
- }
-#elif defined(GRAN)
+#if defined(GRAN)
/* GranSim specific init */
CurrentTSO = m->tso; // the TSO to run
procStatus[MainProc] = Busy; // status of main PE
CurrentProc = MainProc; // PE to run it on
-
- RELEASE_LOCK(&sched_mutex);
- schedule();
+ schedule(m,initialCapability);
#else
- RELEASE_LOCK(&sched_mutex);
- schedule();
+ schedule(m,initialCapability);
ASSERT(m->stat != NoStatus);
#endif
stat = m->stat;
#if defined(RTS_SUPPORTS_THREADS)
- closeCondition(&m->wakeup);
+ // Free the condition variable, returning it to the cache if possible.
+ if (bound_cond_cache == NULL) {
+ bound_cond_cache = m->bound_thread_cond;
+ } else {
+ closeCondition(&m->bound_thread_cond);
+ }
#endif
- IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
- m->tso->id));
- free(m);
+ IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
+ stgFree(m);
// Postcondition: sched_mutex still held
return stat;
}
-//@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
-//@subsection Run queue code
-
-#if 0
-/*
- NB: In GranSim we have many run queues; run_queue_hd is actually a macro
- unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
- implicit global variable that has to be correct when calling these
- fcts -- HWL
-*/
-
-/* Put the new thread on the head of the runnable queue.
- * The caller of createThread better push an appropriate closure
- * on this thread's stack before the scheduler is invoked.
- */
-static /* inline */ void
-add_to_run_queue(tso)
-StgTSO* tso;
-{
- ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
- tso->link = run_queue_hd;
- run_queue_hd = tso;
- if (run_queue_tl == END_TSO_QUEUE) {
- run_queue_tl = tso;
- }
-}
-
-/* Put the new thread at the end of the runnable queue. */
-static /* inline */ void
-push_on_run_queue(tso)
-StgTSO* tso;
-{
- ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
- ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
- ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
- if (run_queue_hd == END_TSO_QUEUE) {
- run_queue_hd = tso;
- } else {
- run_queue_tl->link = tso;
- }
- run_queue_tl = tso;
-}
-
-/*
- Should be inlined because it's used very often in schedule. The tso
- argument is actually only needed in GranSim, where we want to have the
- possibility to schedule *any* TSO on the run queue, irrespective of the
- actual ordering. Therefore, if tso is not the nil TSO then we traverse
- the run queue and dequeue the tso, adjusting the links in the queue.
-*/
-//@cindex take_off_run_queue
-static /* inline */ StgTSO*
-take_off_run_queue(StgTSO *tso) {
- StgTSO *t, *prev;
-
- /*
- qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
-
- if tso is specified, unlink that tso from the run_queue (doesn't have
- to be at the beginning of the queue); GranSim only
- */
- if (tso!=END_TSO_QUEUE) {
- /* find tso in queue */
- for (t=run_queue_hd, prev=END_TSO_QUEUE;
- t!=END_TSO_QUEUE && t!=tso;
- prev=t, t=t->link)
- /* nothing */ ;
- ASSERT(t==tso);
- /* now actually dequeue the tso */
- if (prev!=END_TSO_QUEUE) {
- ASSERT(run_queue_hd!=t);
- prev->link = t->link;
- } else {
- /* t is at beginning of thread queue */
- ASSERT(run_queue_hd==t);
- run_queue_hd = t->link;
- }
- /* t is at end of thread queue */
- if (t->link==END_TSO_QUEUE) {
- ASSERT(t==run_queue_tl);
- run_queue_tl = prev;
- } else {
- ASSERT(run_queue_tl!=t);
- }
- t->link = END_TSO_QUEUE;
- } else {
- /* take tso from the beginning of the queue; std concurrent code */
- t = run_queue_hd;
- if (t != END_TSO_QUEUE) {
- run_queue_hd = t->link;
- t->link = END_TSO_QUEUE;
- if (run_queue_hd == END_TSO_QUEUE) {
- run_queue_tl = END_TSO_QUEUE;
- }
- }
- }
- return t;
-}
-
-#endif /* 0 */
-
-//@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
-//@subsection Garbage Collextion Routines
-
/* ---------------------------------------------------------------------------
Where are the roots that we know about?
*/
void
-GetRoots(evac_fn evac)
+GetRoots( evac_fn evac )
{
#if defined(GRAN)
{
markSparkQueue(evac);
#endif
-#ifndef mingw32_TARGET_OS
+#if defined(RTS_USER_SIGNALS)
// mark the signal handlers (signals should be already blocked)
markSignalHandlers(evac);
#endif
static StgTSO *
threadStackOverflow(StgTSO *tso)
{
- nat new_stack_size, new_tso_size, diff, stack_words;
+ nat new_stack_size, new_tso_size, stack_words;
StgPtr new_sp;
StgTSO *dest;
if (tso->stack_size >= tso->max_stack_size) {
IF_DEBUG(gc,
- belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
+ belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
tso->id, tso, tso->stack_size, tso->max_stack_size);
/* If we're debugging, just print out the top of the stack */
printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
- IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
+ IF_DEBUG(scheduler, fprintf(stderr,"== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
dest = (StgTSO *)allocate(new_tso_size);
TICK_ALLOC_TSO(new_stack_size,0);
memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
/* relocate the stack pointers... */
- diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
- dest->sp = new_sp;
+ dest->sp = new_sp;
dest->stack_size = new_stack_size;
/* Mark the old TSO as relocated. We have to check for relocated
return dest;
}
-//@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
-//@subsection Blocking Queue Routines
-
/* ---------------------------------------------------------------------------
Wake up a queue that was blocked on some resource.
------------------------------------------------------------------------ */
#if defined(GRAN)
-static inline void
+STATIC_INLINE void
unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
{
}
#elif defined(PAR)
-static inline void
+STATIC_INLINE void
unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
{
/* write RESUME events to log file and
#endif
#if defined(GRAN) || defined(PAR)
-inline StgBlockingQueueElement *
+INLINE_ME StgBlockingQueueElement *
unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
{
ACQUIRE_LOCK(&sched_mutex);
return bqe;
}
#else
-inline StgTSO *
+INLINE_ME StgTSO *
unblockOne(StgTSO *tso)
{
ACQUIRE_LOCK(&sched_mutex);
}
#endif
-//@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
-//@subsection Exception Handling Routines
-
/* ---------------------------------------------------------------------------
Interrupt execution
- usually called inside a signal handler so it mustn't do anything fancy.
{
interrupted = 1;
context_switch = 1;
+#ifdef RTS_SUPPORTS_THREADS
+ wakeBlockedWorkerThread();
+#endif
}
/* -----------------------------------------------------------------------------
case BlockedOnRead:
case BlockedOnWrite:
+#if defined(mingw32_TARGET_OS)
+ case BlockedOnDoProc:
+#endif
{
/* take TSO off blocked_queue */
StgBlockingQueueElement *prev = NULL;
goto done;
}
}
- barf("unblockThread (I/O): TSO not found");
+ barf("unblockThread (delay): TSO not found");
}
default:
case BlockedOnRead:
case BlockedOnWrite:
+#if defined(mingw32_TARGET_OS)
+ case BlockedOnDoProc:
+#endif
{
StgTSO *prev = NULL;
for (t = blocked_queue_hd; t != END_TSO_QUEUE;
goto done;
}
}
- barf("unblockThread (I/O): TSO not found");
+ barf("unblockThread (delay): TSO not found");
}
default:
raiseAsync(tso,NULL);
}
+static void
+deleteThreadImmediately(StgTSO *tso)
+{ // for forkProcess only:
+ // delete thread without giving it a chance to catch the KillThread exception
+
+ if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
+ return;
+ }
+#if defined(RTS_SUPPORTS_THREADS)
+ if (tso->why_blocked != BlockedOnCCall
+ && tso->why_blocked != BlockedOnCCall_NoUnblockExc)
+#endif
+ unblockThread(tso);
+ tso->what_next = ThreadKilled;
+}
+
void
raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
{
TICK_ALLOC_UP_THK(words+1,0);
IF_DEBUG(scheduler,
- fprintf(stderr, "scheduler: Updating ");
+ fprintf(stderr, "sched: Updating ");
printPtr((P_)((StgUpdateFrame *)frame)->updatee);
fprintf(stderr, " with ");
printObj((StgClosure *)ap);
}
}
-//@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
-//@subsection Debugging Routines
-
-/* -----------------------------------------------------------------------------
+/* ----------------------------------------------------------------------------
* Debugging: why is a thread blocked
* [Also provides useful information when debugging threaded programs
* at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
- -------------------------------------------------------------------------- */
+ ------------------------------------------------------------------------- */
static
void
case BlockedOnWrite:
fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
break;
+#if defined(mingw32_TARGET_OS)
+ case BlockedOnDoProc:
+ fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
+ break;
+#endif
case BlockedOnDelay:
fprintf(stderr,"is blocked until %d", tso->block_info.target);
break;
for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
- label = lookupThreadLabel((StgWord)t);
+ label = lookupThreadLabel(t->id);
if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
printThreadStatus(t);
fprintf(stderr,"\n");
/*
Print a whole blocking queue attached to node (debugging only).
*/
-//@cindex print_bq
# if defined(PAR)
void
print_bq (StgClosure *node)
}
#endif
-static void
+void
sched_belch(char *s, ...)
{
va_list ap;
va_start(ap,s);
-#ifdef SMP
- fprintf(stderr, "scheduler (task %ld): ", osThreadId());
+#ifdef RTS_SUPPORTS_THREADS
+ fprintf(stderr, "sched (task %p): ", osThreadId());
#elif defined(PAR)
fprintf(stderr, "== ");
#else
- fprintf(stderr, "scheduler: ");
+ fprintf(stderr, "sched: ");
#endif
vfprintf(stderr, s, ap);
fprintf(stderr, "\n");
+ fflush(stderr);
va_end(ap);
}
#endif /* DEBUG */
-
-
-//@node Index, , Debugging Routines, Main scheduling code
-//@subsection Index
-
-//@index
-//* StgMainThread:: @cindex\s-+StgMainThread
-//* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
-//* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
-//* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
-//* context_switch:: @cindex\s-+context_switch
-//* createThread:: @cindex\s-+createThread
-//* gc_pending_cond:: @cindex\s-+gc_pending_cond
-//* initScheduler:: @cindex\s-+initScheduler
-//* interrupted:: @cindex\s-+interrupted
-//* next_thread_id:: @cindex\s-+next_thread_id
-//* print_bq:: @cindex\s-+print_bq
-//* run_queue_hd:: @cindex\s-+run_queue_hd
-//* run_queue_tl:: @cindex\s-+run_queue_tl
-//* sched_mutex:: @cindex\s-+sched_mutex
-//* schedule:: @cindex\s-+schedule
-//* take_off_run_queue:: @cindex\s-+take_off_run_queue
-//* term_mutex:: @cindex\s-+term_mutex
-//@end index