/* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.160 2002/12/13 15:16:29 simonmar Exp $
+ * $Id: Schedule.c,v 1.179 2003/10/05 20:18:36 panne Exp $
*
* (c) The GHC Team, 1998-2000
*
#include "Interpreter.h"
#include "Exception.h"
#include "Printer.h"
-#include "Main.h"
#include "Signals.h"
#include "Sanity.h"
#include "Stats.h"
-#include "Itimer.h"
+#include "Timer.h"
#include "Prelude.h"
#include "ThreadLabels.h"
#ifdef PROFILING
#include <stdlib.h>
#include <stdarg.h>
+#ifdef HAVE_ERRNO_H
+#include <errno.h>
+#endif
+
+#ifdef THREADED_RTS
+#define USED_IN_THREADED_RTS
+#else
+#define USED_IN_THREADED_RTS STG_UNUSED
+#endif
+
+#ifdef RTS_SUPPORTS_THREADS
+#define USED_WHEN_RTS_SUPPORTS_THREADS
+#else
+#define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
+#endif
+
//@node Variables and Data structures, Prototypes, Includes, Main scheduling code
//@subsection Variables and Data structures
/* The smallest stack size that makes any sense is:
* RESERVED_STACK_WORDS (so we can get back from the stack overflow)
* + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
- * + 1 (the realworld token for an IO thread)
* + 1 (the closure to enter)
+ * + 1 (stg_ap_v_ret)
+ * + 1 (spare slot req'd by stg_ap_v_ret)
*
* A thread with this stack will bomb immediately with a stack
* overflow, which will increase its stack size.
*/
-#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
+#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
#if defined(GRAN)
void addToBlockedQueue ( StgTSO *tso );
-static void schedule ( void );
+static void schedule ( StgMainThread *mainThread, Capability *initialCapability );
void interruptStgRts ( void );
static void detectBlackHoles ( void );
StgTSO *MainTSO;
*/
-#if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
+#if defined(RTS_SUPPORTS_THREADS)
+static rtsBool startingWorkerThread = rtsFalse;
+
static void taskStart(void);
static void
taskStart(void)
{
- schedule();
+ Capability *cap;
+
+ ACQUIRE_LOCK(&sched_mutex);
+ startingWorkerThread = rtsFalse;
+ waitForWorkCapability(&sched_mutex, &cap, NULL);
+ RELEASE_LOCK(&sched_mutex);
+
+ schedule(NULL,cap);
}
-#endif
-
-
+void
+startSchedulerTaskIfNecessary(void)
+{
+ if(run_queue_hd != END_TSO_QUEUE
+ || blocked_queue_hd != END_TSO_QUEUE
+ || sleeping_queue != END_TSO_QUEUE)
+ {
+ if(!startingWorkerThread)
+ { // we don't want to start another worker thread
+ // just because the last one hasn't yet reached the
+ // "waiting for capability" state
+ startingWorkerThread = rtsTrue;
+ startTask(taskStart);
+ }
+ }
+}
+#endif
//@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
//@subsection Main scheduling loop
------------------------------------------------------------------------ */
//@cindex schedule
static void
-schedule( void )
+schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
+ Capability *initialCapability )
{
StgTSO *t;
- Capability *cap;
+ Capability *cap = initialCapability;
StgThreadReturnCode ret;
#if defined(GRAN)
rtsEvent *event;
ACQUIRE_LOCK(&sched_mutex);
#if defined(RTS_SUPPORTS_THREADS)
- waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
+ /* in the threaded case, the capability is either passed in via the initialCapability
+ parameter, or initialized inside the scheduler loop */
+
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### NEW SCHEDULER LOOP in os thread %u(%p)\n",
+ osThreadId(), osThreadId()));
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### main thread: %p\n",mainThread));
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### initial cap: %p\n",initialCapability));
#else
/* simply initialise it in the non-threaded case */
grabCapability(&cap);
#if defined(RTS_SUPPORTS_THREADS)
/* Check to see whether there are any worker threads
waiting to deposit external call results. If so,
- yield our capability */
- yieldToReturningWorker(&sched_mutex, &cap);
+ yield our capability... if we have a capability, that is. */
+ if(cap)
+ yieldToReturningWorker(&sched_mutex, &cap,
+ mainThread ? &mainThread->bound_thread_cond : NULL);
+
+ /* If we do not currently hold a capability, we wait for one */
+ if(!cap)
+ {
+ waitForWorkCapability(&sched_mutex, &cap,
+ mainThread ? &mainThread->bound_thread_cond : NULL);
+ IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): got cap",
+ osThreadId()));
+ }
#endif
/* If we're interrupted (the user pressed ^C, or some other
*/
if (interrupted) {
IF_DEBUG(scheduler, sched_belch("interrupted"));
- deleteAllThreads();
interrupted = rtsFalse;
was_interrupted = rtsTrue;
+#if defined(RTS_SUPPORTS_THREADS)
+ // In the threaded RTS, deadlock detection doesn't work,
+ // so just exit right away.
+ prog_belch("interrupted");
+ releaseCapability(cap);
+ RELEASE_LOCK(&sched_mutex);
+ shutdownHaskellAndExit(EXIT_SUCCESS);
+#else
+ deleteAllThreads();
+#endif
}
/* Go through the list of main threads and wake up any
*/
#if defined(RTS_SUPPORTS_THREADS)
{
- StgMainThread *m, **prev;
- prev = &main_threads;
- for (m = main_threads; m != NULL; m = m->link) {
- switch (m->tso->what_next) {
- case ThreadComplete:
- if (m->ret) {
- // NOTE: return val is tso->sp[1] (see StgStartup.hc)
- *(m->ret) = (StgClosure *)m->tso->sp[1];
- }
- *prev = m->link;
- m->stat = Success;
- broadcastCondition(&m->wakeup);
-#ifdef DEBUG
- removeThreadLabel((StgWord)m->tso);
-#endif
- break;
- case ThreadKilled:
- if (m->ret) *(m->ret) = NULL;
- *prev = m->link;
- if (was_interrupted) {
- m->stat = Interrupted;
- } else {
- m->stat = Killed;
- }
- broadcastCondition(&m->wakeup);
+ StgMainThread *m, **prev;
+ prev = &main_threads;
+ for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
+ if (m->tso->what_next == ThreadComplete
+ || m->tso->what_next == ThreadKilled)
+ {
+ if(m == mainThread)
+ {
+ if(m->tso->what_next == ThreadComplete)
+ {
+ if (m->ret)
+ {
+ // NOTE: return val is tso->sp[1] (see StgStartup.hc)
+ *(m->ret) = (StgClosure *)m->tso->sp[1];
+ }
+ m->stat = Success;
+ }
+ else
+ {
+ if (m->ret)
+ {
+ *(m->ret) = NULL;
+ }
+ if (was_interrupted)
+ {
+ m->stat = Interrupted;
+ }
+ else
+ {
+ m->stat = Killed;
+ }
+ }
+ *prev = m->link;
+
#ifdef DEBUG
- removeThreadLabel((StgWord)m->tso);
-#endif
- break;
- default:
- break;
+ removeThreadLabel((StgWord)m->tso);
+#endif
+ releaseCapability(cap);
+ RELEASE_LOCK(&sched_mutex);
+ return;
+ }
+ else
+ {
+ // The current OS thread can not handle the fact that the Haskell
+ // thread "m" has ended.
+ // "m" is bound; the scheduler loop in it's bound OS thread has
+ // to return, so let's pass our capability directly to that thread.
+ passCapability(&sched_mutex, cap, &m->bound_thread_cond);
+ cap = NULL;
+ }
+ }
}
- }
}
-
+
+ if(!cap) // If we gave our capability away,
+ continue; // go to the top to get it back
+
#else /* not threaded */
# if defined(PAR)
#endif // SMP
/* check for signals each time around the scheduler */
-#ifndef mingw32_TARGET_OS
+#if defined(RTS_USER_SIGNALS)
if (signals_pending()) {
RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
startSignalHandlers();
/* Check whether any waiting threads need to be woken up. If the
* run queue is empty, and there are no other tasks running, we
* can wait indefinitely for something to happen.
- * ToDo: what if another client comes along & requests another
- * main thread?
*/
- if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
+ if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
+#if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
+ || EMPTY_RUN_QUEUE()
+#endif
+ )
+ {
awaitEvent( EMPTY_RUN_QUEUE()
#if defined(SMP)
&& allFreeCapabilities()
* If no threads are black holed, we have a deadlock situation, so
* inform all the main threads.
*/
-#ifndef PAR
+#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
if ( EMPTY_THREAD_QUEUES()
#if defined(RTS_SUPPORTS_THREADS)
&& EMPTY_QUEUE(suspended_ccalling_threads)
if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
-#ifndef mingw32_TARGET_OS
+#if defined(RTS_USER_SIGNALS)
/* If we have user-installed signal handlers, then wait
* for signals to arrive rather then bombing out with a
* deadlock.
}
not_deadlocked:
+#elif defined(RTS_SUPPORTS_THREADS)
+ /* ToDo: add deadlock detection in threaded RTS */
#elif defined(PAR)
/* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
#endif
#endif
#if defined(RTS_SUPPORTS_THREADS)
+#if defined(SMP)
/* block until we've got a thread on the run queue and a free
* capability.
*
waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
}
+#else
+ if ( EMPTY_RUN_QUEUE() ) {
+ continue; // nothing to do
+ }
+#endif
#endif
#if defined(GRAN)
// expensive if there is lots of thread switching going on...
IF_DEBUG(sanity,checkTSO(t));
#endif
-
+
+#ifdef THREADED_RTS
+ {
+ StgMainThread *m;
+ for(m = main_threads; m; m = m->link)
+ {
+ if(m->tso == t)
+ break;
+ }
+
+ if(m)
+ {
+ if(m == mainThread)
+ {
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### Running TSO %p in bound OS thread %u\n",
+ t, osThreadId()));
+ // yes, the Haskell thread is bound to the current native thread
+ }
+ else
+ {
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### TSO %p bound to other OS thread than %u\n",
+ t, osThreadId()));
+ // no, bound to a different Haskell thread: pass to that thread
+ PUSH_ON_RUN_QUEUE(t);
+ passCapability(&sched_mutex,cap,&m->bound_thread_cond);
+ cap = NULL;
+ continue;
+ }
+ }
+ else
+ {
+ // The thread we want to run is not bound.
+ if(mainThread == NULL)
+ {
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### Running TSO %p in worker OS thread %u\n",
+ t, osThreadId()));
+ // if we are a worker thread,
+ // we may run it here
+ }
+ else
+ {
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### TSO %p is not appropriate for main thread %p in OS thread %u\n",
+ t, mainThread, osThreadId()));
+ // no, the current native thread is bound to a different
+ // Haskell thread, so pass it to any worker thread
+ PUSH_ON_RUN_QUEUE(t);
+ passCapabilityToWorker(&sched_mutex, cap);
+ cap = NULL;
+ continue;
+ }
+ }
+ }
+#endif
+
cap->r.rCurrentTSO = t;
/* context switches are now initiated by the timer signal, unless
else
context_switch = 0;
+run_thread:
+
RELEASE_LOCK(&sched_mutex);
IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
/* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
/* Run the current thread
*/
- run_thread:
prev_what_next = t->what_next;
switch (prev_what_next) {
case ThreadKilled:
ret = ThreadFinished;
break;
case ThreadRunGHC:
+ errno = t->saved_errno;
ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
+ t->saved_errno = errno;
break;
case ThreadInterpret:
ret = interpretBCO(cap);
#endif
ACQUIRE_LOCK(&sched_mutex);
-
-#ifdef SMP
- IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
+
+#ifdef RTS_SUPPORTS_THREADS
+ IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %p): ", osThreadId()););
#elif !defined(GRAN) && !defined(PAR)
IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
#endif
}
/* ---------------------------------------------------------------------------
+ * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
+ * used by Control.Concurrent for error checking.
+ * ------------------------------------------------------------------------- */
+
+StgBool
+rtsSupportsBoundThreads(void)
+{
+#ifdef THREADED_RTS
+ return rtsTrue;
+#else
+ return rtsFalse;
+#endif
+}
+
+/* ---------------------------------------------------------------------------
+ * isThreadBound(tso): check whether tso is bound to an OS thread.
+ * ------------------------------------------------------------------------- */
+
+StgBool
+isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
+{
+#ifdef THREADED_RTS
+ StgMainThread *m;
+ for(m = main_threads; m; m = m->link)
+ {
+ if(m->tso == tso)
+ return rtsTrue;
+ }
+#endif
+ return rtsFalse;
+}
+
+/* ---------------------------------------------------------------------------
* Singleton fork(). Do not copy any running threads.
* ------------------------------------------------------------------------- */
-StgInt forkProcess(StgTSO* tso) {
+static void
+deleteThreadImmediately(StgTSO *tso);
+StgInt
+forkProcess(HsStablePtr *entry)
+{
#ifndef mingw32_TARGET_OS
pid_t pid;
StgTSO* t,*next;
StgMainThread *m;
- rtsBool doKill;
+ SchedulerStatus rc;
IF_DEBUG(scheduler,sched_belch("forking!"));
+ rts_lock(); // This not only acquires sched_mutex, it also
+ // makes sure that no other threads are running
pid = fork();
+
if (pid) { /* parent */
/* just return the pid */
+ rts_unlock();
+ return pid;
} else { /* child */
- /* wipe all other threads */
- run_queue_hd = run_queue_tl = tso;
- tso->link = END_TSO_QUEUE;
-
- /* When clearing out the threads, we need to ensure
- that a 'main thread' is left behind; if there isn't,
- the Scheduler will shutdown next time it is entered.
-
- ==> we don't kill a thread that's on the main_threads
- list (nor the current thread.)
- [ Attempts at implementing the more ambitious scheme of
- killing the main_threads also, and then adding the
- current thread onto the main_threads list if it wasn't
- there already, failed -- waitThread() (for one) wasn't
- up to it. If it proves to be desirable to also kill
- the main threads, then this scheme will have to be
- revisited (and fully debugged!)
-
- -- sof 7/2002
- ]
- */
- /* DO NOT TOUCH THE QUEUES directly because most of the code around
- us is picky about finding the thread still in its queue when
- handling the deleteThread() */
-
- for (t = all_threads; t != END_TSO_QUEUE; t = next) {
- next = t->link;
- /* Don't kill the current thread.. */
- if (t->id == tso->id) continue;
- doKill=rtsTrue;
- /* ..or a main thread */
- for (m = main_threads; m != NULL; m = m->link) {
- if (m->tso->id == t->id) {
- doKill=rtsFalse;
- break;
- }
+ // delete all threads
+ run_queue_hd = run_queue_tl = END_TSO_QUEUE;
+
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ next = t->link;
+
+ // don't allow threads to catch the ThreadKilled exception
+ deleteThreadImmediately(t);
}
- if (doKill) {
- deleteThread(t);
+
+ // wipe the main thread list
+ while((m = main_threads) != NULL) {
+ main_threads = m->link;
+#ifdef THREADED_RTS
+ closeCondition(&m->bound_thread_cond);
+#endif
+ stgFree(m);
}
+
+#ifdef RTS_SUPPORTS_THREADS
+ resetTaskManagerAfterFork(); // tell startTask() and friends that
+ startingWorkerThread = rtsFalse; // we have no worker threads any more
+ resetWorkerWakeupPipeAfterFork();
+#endif
+
+ rc = rts_evalStableIO(entry, NULL); // run the action
+ rts_checkSchedStatus("forkProcess",rc);
+
+ rts_unlock();
+
+ hs_exit(); // clean up and exit
+ stg_exit(0);
}
- }
- return pid;
#else /* mingw32 */
- barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
- /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */
+ barf("forkProcess#: primop not implemented for mingw32, sorry!\n");
return -1;
#endif /* mingw32 */
}
* Locks: sched_mutex held.
* ------------------------------------------------------------------------- */
-void deleteAllThreads ( void )
+void
+deleteAllThreads ( void )
{
StgTSO* t, *next;
IF_DEBUG(scheduler,sched_belch("deleting all threads"));
{
nat tok;
Capability *cap;
+ int saved_errno = errno;
/* assume that *reg is a pointer to the StgRegTable part
* of a Capability.
suspended_ccalling_threads = cap->r.rCurrentTSO;
#if defined(RTS_SUPPORTS_THREADS)
- cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
+ if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
+ {
+ cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
+ cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
+ }
+ else
+ {
+ cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
+ }
#endif
/* Use the thread ID as the token; it should be unique */
#if defined(RTS_SUPPORTS_THREADS)
/* Preparing to leave the RTS, so ensure there's a native thread/task
waiting to take over.
-
- ToDo: optimise this and only create a new task if there's a need
- for one (i.e., if there's only one Concurrent Haskell thread alive,
- there's no need to create a new task).
*/
- IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
- if (concCall) {
- startTask(taskStart);
- }
+ IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId()));
#endif
/* Other threads _might_ be available for execution; signal this */
THREAD_RUNNABLE();
RELEASE_LOCK(&sched_mutex);
+
+ errno = saved_errno;
return tok;
}
StgRegTable *
resumeThread( StgInt tok,
- rtsBool concCall
-#if !defined(RTS_SUPPORTS_THREADS)
- STG_UNUSED
-#endif
- )
+ rtsBool concCall STG_UNUSED )
{
StgTSO *tso, **prev;
Capability *cap;
+ int saved_errno = errno;
#if defined(RTS_SUPPORTS_THREADS)
/* Wait for permission to re-enter the RTS with the result. */
- if ( concCall ) {
- ACQUIRE_LOCK(&sched_mutex);
- grabReturnCapability(&sched_mutex, &cap);
- } else {
- grabCapability(&cap);
- }
+ ACQUIRE_LOCK(&sched_mutex);
+ grabReturnCapability(&sched_mutex, &cap);
+
+ IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): re-entering RTS", tok, osThreadId()));
#else
grabCapability(&cap);
#endif
barf("resumeThread: thread not found");
}
tso->link = END_TSO_QUEUE;
+
+#if defined(RTS_SUPPORTS_THREADS)
+ if(tso->why_blocked == BlockedOnCCall)
+ {
+ awakenBlockedQueueNoLock(tso->blocked_exceptions);
+ tso->blocked_exceptions = NULL;
+ }
+#endif
+
/* Reset blocking status */
tso->why_blocked = NotBlocked;
cap->r.rCurrentTSO = tso;
+#if defined(RTS_SUPPORTS_THREADS)
RELEASE_LOCK(&sched_mutex);
+#endif
+ errno = saved_errno;
return &cap->r;
}
/* Caveat: Once set, you can only set the thread name to "" */
len = strlen(label)+1;
- buf = malloc(len);
- if (buf == NULL) {
- fprintf(stderr,"insufficient memory for labelThread!\n");
- } else
- strncpy(buf,label,len);
+ buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
+ strncpy(buf,label,len);
/* Update will free the old memory for us */
updateThreadLabel((StgWord)tso,buf);
}
tso->why_blocked = NotBlocked;
tso->blocked_exceptions = NULL;
+ tso->saved_errno = 0;
+
tso->stack_size = stack_size;
tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
- TSO_STRUCT_SIZEW;
}
#endif
-static SchedulerStatus waitThread_(/*out*/StgMainThread* m
-#if defined(THREADED_RTS)
- , rtsBool blockWaiting
-#endif
+static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
+ Capability *initialCapability
);
* on this thread's stack before the scheduler is invoked.
* ------------------------------------------------------------------------ */
-static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
+static void scheduleThread_ (StgTSO* tso);
void
-scheduleThread_(StgTSO *tso
- , rtsBool createTask
-#if !defined(THREADED_RTS)
- STG_UNUSED
-#endif
- )
+scheduleThread_(StgTSO *tso)
{
// Precondition: sched_mutex must be held.
* soon as we release the scheduler lock below.
*/
PUSH_ON_RUN_QUEUE(tso);
-#if defined(THREADED_RTS)
- /* If main() is scheduling a thread, don't bother creating a
- * new task.
- */
- if ( createTask ) {
- startTask(taskStart);
- }
-#endif
THREAD_RUNNABLE();
#if 0
void scheduleThread(StgTSO* tso)
{
ACQUIRE_LOCK(&sched_mutex);
- scheduleThread_(tso, rtsFalse);
+ scheduleThread_(tso);
RELEASE_LOCK(&sched_mutex);
}
SchedulerStatus
-scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
-{
+scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCapability)
+{ // Precondition: sched_mutex must be held
StgMainThread *m;
m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
m->ret = ret;
m->stat = NoStatus;
#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
+ initCondition(&m->bound_thread_cond);
+#else
initCondition(&m->wakeup);
#endif
+#endif
/* Put the thread on the main-threads list prior to scheduling the TSO.
Failure to do so introduces a race condition in the MT case (as
signal the completion of the its work item for the main thread to
see (==> it got stuck waiting.) -- sof 6/02.
*/
- ACQUIRE_LOCK(&sched_mutex);
IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)\n", tso->id));
m->link = main_threads;
main_threads = m;
- scheduleThread_(tso, rtsTrue);
-#if defined(THREADED_RTS)
- return waitThread_(m, rtsTrue); // waitThread_ releases sched_mutex
-#else
- return waitThread_(m);
-#endif
+ scheduleThread_(tso);
+
+ return waitThread_(m, initialCapability);
}
/* ---------------------------------------------------------------------------
{
do {
while (run_queue_hd != END_TSO_QUEUE) {
- waitThread ( run_queue_hd, NULL);
+ waitThread ( run_queue_hd, NULL, NULL );
}
while (blocked_queue_hd != END_TSO_QUEUE) {
- waitThread ( blocked_queue_hd, NULL);
+ waitThread ( blocked_queue_hd, NULL, NULL );
}
while (sleeping_queue != END_TSO_QUEUE) {
- waitThread ( blocked_queue_hd, NULL);
+ waitThread ( blocked_queue_hd, NULL, NULL );
}
} while
(blocked_queue_hd != END_TSO_QUEUE ||
}
SchedulerStatus
-waitThread(StgTSO *tso, /*out*/StgClosure **ret)
+waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability)
{
StgMainThread *m;
+ SchedulerStatus stat;
m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
m->tso = tso;
m->ret = ret;
m->stat = NoStatus;
#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
+ initCondition(&m->bound_thread_cond);
+#else
initCondition(&m->wakeup);
#endif
+#endif
/* see scheduleWaitThread() comment */
ACQUIRE_LOCK(&sched_mutex);
main_threads = m;
IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
-#if defined(THREADED_RTS)
- return waitThread_(m, rtsFalse); // waitThread_ releases sched_mutex
-#else
- return waitThread_(m);
-#endif
+
+ stat = waitThread_(m,initialCapability);
+
+ RELEASE_LOCK(&sched_mutex);
+ return stat;
}
static
SchedulerStatus
-waitThread_(StgMainThread* m
-#if defined(THREADED_RTS)
- , rtsBool blockWaiting
-#endif
- )
+waitThread_(StgMainThread* m, Capability *initialCapability)
{
SchedulerStatus stat;
// Precondition: sched_mutex must be held.
IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
-#if defined(RTS_SUPPORTS_THREADS)
-
-# if defined(THREADED_RTS)
- if (!blockWaiting) {
- /* In the threaded case, the OS thread that called main()
- * gets to enter the RTS directly without going via another
- * task/thread.
- */
- RELEASE_LOCK(&sched_mutex);
- schedule();
- ASSERT(m->stat != NoStatus);
- } else
-# endif
- {
+#if defined(RTS_SUPPORTS_THREADS) && !defined(THREADED_RTS)
+ { // FIXME: does this still make sense?
+ // It's not for the threaded rts => SMP only
do {
waitCondition(&m->wakeup, &sched_mutex);
} while (m->stat == NoStatus);
CurrentProc = MainProc; // PE to run it on
RELEASE_LOCK(&sched_mutex);
- schedule();
+ schedule(m,initialCapability);
#else
RELEASE_LOCK(&sched_mutex);
- schedule();
+ schedule(m,initialCapability);
+ ACQUIRE_LOCK(&sched_mutex);
ASSERT(m->stat != NoStatus);
#endif
stat = m->stat;
#if defined(RTS_SUPPORTS_THREADS)
+#if defined(THREADED_RTS)
+ closeCondition(&m->bound_thread_cond);
+#else
closeCondition(&m->wakeup);
#endif
+#endif
IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
m->tso->id));
- free(m);
+ stgFree(m);
-#if defined(THREADED_RTS)
- if (blockWaiting)
-#endif
- RELEASE_LOCK(&sched_mutex);
-
- // Postcondition: sched_mutex must not be held
+ // Postcondition: sched_mutex still held
return stat;
}
markSparkQueue(evac);
#endif
-#ifndef mingw32_TARGET_OS
+#if defined(RTS_USER_SIGNALS)
// mark the signal handlers (signals should be already blocked)
markSignalHandlers(evac);
#endif
static StgTSO *
threadStackOverflow(StgTSO *tso)
{
- nat new_stack_size, new_tso_size, diff, stack_words;
+ nat new_stack_size, new_tso_size, stack_words;
StgPtr new_sp;
StgTSO *dest;
memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
/* relocate the stack pointers... */
- diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
- dest->sp = new_sp;
+ dest->sp = new_sp;
dest->stack_size = new_stack_size;
/* Mark the old TSO as relocated. We have to check for relocated
}
#else /* !GRAN && !PAR */
+
+#ifdef RTS_SUPPORTS_THREADS
+void
+awakenBlockedQueueNoLock(StgTSO *tso)
+{
+ while (tso != END_TSO_QUEUE) {
+ tso = unblockOneLocked(tso);
+ }
+}
+#endif
+
void
awakenBlockedQueue(StgTSO *tso)
{
{
interrupted = 1;
context_switch = 1;
+#ifdef RTS_SUPPORTS_THREADS
+ wakeBlockedWorkerThread();
+#endif
}
/* -----------------------------------------------------------------------------
case BlockedOnRead:
case BlockedOnWrite:
+#if defined(mingw32_TARGET_OS)
+ case BlockedOnDoProc:
+#endif
{
/* take TSO off blocked_queue */
StgBlockingQueueElement *prev = NULL;
goto done;
}
}
- barf("unblockThread (I/O): TSO not found");
+ barf("unblockThread (delay): TSO not found");
}
default:
case BlockedOnRead:
case BlockedOnWrite:
+#if defined(mingw32_TARGET_OS)
+ case BlockedOnDoProc:
+#endif
{
StgTSO *prev = NULL;
for (t = blocked_queue_hd; t != END_TSO_QUEUE;
goto done;
}
}
- barf("unblockThread (I/O): TSO not found");
+ barf("unblockThread (delay): TSO not found");
}
default:
raiseAsync(tso,NULL);
}
+static void
+deleteThreadImmediately(StgTSO *tso)
+{ // for forkProcess only:
+ // delete thread without giving it a chance to catch the KillThread exception
+
+ if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
+ return;
+ }
+#if defined(RTS_SUPPORTS_THREADS)
+ if (tso->why_blocked != BlockedOnCCall
+ && tso->why_blocked != BlockedOnCCall_NoUnblockExc)
+#endif
+ unblockThread(tso);
+ tso->what_next = ThreadKilled;
+}
+
void
raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
{
if (tso->why_blocked != BlockedOnBlackHole) {
continue;
}
-
blocked_on = tso->block_info.closure;
frame = (StgClosure *)tso->sp;
while(1) {
info = get_ret_itbl(frame);
switch (info->i.type) {
-
case UPDATE_FRAME:
if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
/* We are blocking on one of our own computations, so
case BlockedOnWrite:
fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
break;
+#if defined(mingw32_TARGET_OS)
+ case BlockedOnDoProc:
+ fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
+ break;
+#endif
case BlockedOnDelay:
fprintf(stderr,"is blocked until %d", tso->block_info.target);
break;
case BlockedOnCCall:
fprintf(stderr,"is blocked on an external call");
break;
+ case BlockedOnCCall_NoUnblockExc:
+ fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
+ break;
#endif
default:
barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",