/* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.173 2003/08/15 12:43:57 simonmar Exp $
+ * $Id: Schedule.c,v 1.174 2003/09/21 22:20:56 wolfgang Exp $
*
* (c) The GHC Team, 1998-2000
*
#include <stdlib.h>
#include <stdarg.h>
+#ifdef HAVE_ERRNO_H
+#include <errno.h>
+#endif
+
//@node Variables and Data structures, Prototypes, Includes, Main scheduling code
//@subsection Variables and Data structures
*/
StgMainThread *main_threads = NULL;
-#ifdef THREADED_RTS
-// Pointer to the thread that executes main
-// When this thread is finished, the program terminates
-// by calling shutdownHaskellAndExit.
-// It would be better to add a call to shutdownHaskellAndExit
-// to the Main.main wrapper and to remove this hack.
-StgMainThread *main_main_thread = NULL;
-#endif
-
/* Thread queues.
* Locks required: sched_mutex.
*/
void addToBlockedQueue ( StgTSO *tso );
-static void schedule ( void );
+static void schedule ( StgMainThread *mainThread, Capability *initialCapability );
void interruptStgRts ( void );
static void detectBlackHoles ( void );
static void
taskStart(void)
{
- schedule();
+ schedule(NULL,NULL);
}
#endif
------------------------------------------------------------------------ */
//@cindex schedule
static void
-schedule( void )
+schedule( StgMainThread *mainThread, Capability *initialCapability )
{
StgTSO *t;
- Capability *cap;
+ Capability *cap = initialCapability;
StgThreadReturnCode ret;
#if defined(GRAN)
rtsEvent *event;
ACQUIRE_LOCK(&sched_mutex);
#if defined(RTS_SUPPORTS_THREADS)
- waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
- IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): entering RTS", osThreadId()));
+ /* in the threaded case, the capability is either passed in via the initialCapability
+ parameter, or initialized inside the scheduler loop */
+
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### NEW SCHEDULER LOOP in os thread %u(%p)\n",
+ osThreadId(), osThreadId()));
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### main thread: %p\n",mainThread));
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### initial cap: %p\n",initialCapability));
#else
/* simply initialise it in the non-threaded case */
grabCapability(&cap);
#if defined(RTS_SUPPORTS_THREADS)
/* Check to see whether there are any worker threads
waiting to deposit external call results. If so,
- yield our capability */
- yieldToReturningWorker(&sched_mutex, &cap);
+ yield our capability... if we have a capability, that is. */
+ if(cap)
+ yieldToReturningWorker(&sched_mutex, &cap,
+ mainThread ? &mainThread->bound_thread_cond : NULL);
+
+ /* If we do not currently hold a capability, we wait for one */
+ if(!cap)
+ {
+ waitForWorkCapability(&sched_mutex, &cap,
+ mainThread ? &mainThread->bound_thread_cond : NULL);
+ IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): got cap",
+ osThreadId()));
+ }
#endif
/* If we're interrupted (the user pressed ^C, or some other
*/
#if defined(RTS_SUPPORTS_THREADS)
{
- StgMainThread *m, **prev;
- prev = &main_threads;
- for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
- switch (m->tso->what_next) {
- case ThreadComplete:
- if (m->ret) {
- // NOTE: return val is tso->sp[1] (see StgStartup.hc)
- *(m->ret) = (StgClosure *)m->tso->sp[1];
- }
- *prev = m->link;
- m->stat = Success;
- broadcastCondition(&m->wakeup);
-#ifdef DEBUG
- removeThreadLabel((StgWord)m->tso);
-#endif
- if(m == main_main_thread)
- {
- releaseCapability(cap);
- startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit
- RELEASE_LOCK(&sched_mutex);
- shutdownHaskellAndExit(EXIT_SUCCESS);
- }
- break;
- case ThreadKilled:
- if (m->ret) *(m->ret) = NULL;
- *prev = m->link;
- if (was_interrupted) {
- m->stat = Interrupted;
- } else {
- m->stat = Killed;
- }
- broadcastCondition(&m->wakeup);
+ StgMainThread *m, **prev;
+ prev = &main_threads;
+ for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
+ if (m->tso->what_next == ThreadComplete
+ || m->tso->what_next == ThreadKilled)
+ {
+ if(m == mainThread)
+ {
+ if(m->tso->what_next == ThreadComplete)
+ {
+ if (m->ret)
+ {
+ // NOTE: return val is tso->sp[1] (see StgStartup.hc)
+ *(m->ret) = (StgClosure *)m->tso->sp[1];
+ }
+ m->stat = Success;
+ }
+ else
+ {
+ if (m->ret)
+ {
+ *(m->ret) = NULL;
+ }
+ if (was_interrupted)
+ {
+ m->stat = Interrupted;
+ }
+ else
+ {
+ m->stat = Killed;
+ }
+ }
+ *prev = m->link;
+
#ifdef DEBUG
- removeThreadLabel((StgWord)m->tso);
+ removeThreadLabel((StgWord)m->tso);
#endif
- if(m == main_main_thread)
- {
releaseCapability(cap);
- startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit
RELEASE_LOCK(&sched_mutex);
- shutdownHaskellAndExit(EXIT_SUCCESS);
+ return;
+ }
+ else
+ {
+ // The current OS thread can not handle the fact that the Haskell
+ // thread "m" has ended.
+ // "m" is bound; the scheduler loop in it's bound OS thread has
+ // to return, so let's pass our capability directly to that thread.
+ passCapability(&sched_mutex, cap, &m->bound_thread_cond);
+ cap = NULL;
+ }
}
- break;
- default:
- break;
}
- }
}
-
+
+ if(!cap) // If we gave our capability away,
+ continue; // go to the top to get it back
+
#else /* not threaded */
# if defined(PAR)
IF_DEBUG(sanity,checkTSO(t));
#endif
+#ifdef THREADED_RTS
+ {
+ StgMainThread *m;
+ for(m = main_threads; m; m = m->link)
+ {
+ if(m->tso == t)
+ break;
+ }
+
+ if(m)
+ {
+ if(m == mainThread)
+ {
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### Running TSO %p in bound OS thread %u\n",
+ t, osThreadId()));
+ // yes, the Haskell thread is bound to the current native thread
+ }
+ else
+ {
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### TSO %p bound to other OS thread than %u\n",
+ t, osThreadId()));
+ // no, bound to a different Haskell thread: pass to that thread
+ PUSH_ON_RUN_QUEUE(t);
+ passCapability(&sched_mutex,cap,&m->bound_thread_cond);
+ cap = NULL;
+ continue;
+ }
+ }
+ else
+ {
+ // The thread we want to run is not bound.
+ if(mainThread == NULL)
+ {
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### Running TSO %p in worker OS thread %u\n",
+ t, osThreadId()));
+ // if we are a worker thread,
+ // we may run it here
+ }
+ else
+ {
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"### TSO %p is not appropriate for main thread %p in OS thread %u\n",
+ t, mainThread, osThreadId()));
+ // no, the current native thread is bound to a different
+ // Haskell thread, so pass it to any worker thread
+ PUSH_ON_RUN_QUEUE(t);
+ releaseCapability(cap);
+ cap = NULL;
+ continue;
+ }
+ }
+ }
+#endif
+
cap->r.rCurrentTSO = t;
/* context switches are now initiated by the timer signal, unless
ret = ThreadFinished;
break;
case ThreadRunGHC:
+ errno = t->saved_errno;
ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
+ t->saved_errno = errno;
break;
case ThreadInterpret:
ret = interpretBCO(cap);
ACQUIRE_LOCK(&sched_mutex);
#ifdef RTS_SUPPORTS_THREADS
- IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
+ IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %p): ", osThreadId()););
#elif !defined(GRAN) && !defined(PAR)
IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
#endif
}
/* ---------------------------------------------------------------------------
+ * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
+ * used by Control.Concurrent for error checking.
+ * ------------------------------------------------------------------------- */
+
+StgBool
+rtsSupportsBoundThreads(void)
+{
+#ifdef THREADED_RTS
+ return rtsTrue;
+#else
+ return rtsFalse;
+#endif
+}
+
+/* ---------------------------------------------------------------------------
+ * isThreadBound(tso): check whether tso is bound to an OS thread.
+ * ------------------------------------------------------------------------- */
+
+StgBool
+isThreadBound(StgTSO* tso)
+{
+#ifdef THREADED_RTS
+ StgMainThread *m;
+ for(m = main_threads; m; m = m->link)
+ {
+ if(m->tso == tso)
+ return rtsTrue;
+ }
+#endif
+ return rtsFalse;
+}
+
+/* ---------------------------------------------------------------------------
* Singleton fork(). Do not copy any running threads.
* ------------------------------------------------------------------------- */
+static void
+deleteThreadImmediately(StgTSO *tso);
+
StgInt
forkProcess(StgTSO* tso)
{
#ifndef mingw32_TARGET_OS
pid_t pid;
StgTSO* t,*next;
- StgMainThread *m;
- rtsBool doKill;
IF_DEBUG(scheduler,sched_belch("forking!"));
+ ACQUIRE_LOCK(&sched_mutex);
pid = fork();
if (pid) { /* parent */
/* just return the pid */
} else { /* child */
+#ifdef THREADED_RTS
+ /* wipe all other threads */
+ run_queue_hd = run_queue_tl = END_TSO_QUEUE;
+ tso->link = END_TSO_QUEUE;
+
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ next = t->link;
+
+ /* Don't kill the current thread.. */
+ if (t->id == tso->id) {
+ continue;
+ }
+
+ if (isThreadBound(t)) {
+ // If the thread is bound, the OS thread that the thread is bound to
+ // no longer exists after the fork() system call.
+ // The bound Haskell thread is therefore unable to run at all;
+ // we must not give it a chance to survive by catching the
+ // ThreadKilled exception. So we kill it "brutally" rather than
+ // using deleteThread.
+ deleteThreadImmediately(t);
+ } else {
+ deleteThread(t);
+ }
+ }
+
+ if (isThreadBound(tso)) {
+ } else {
+ // If the current is not bound, then we should make it so.
+ // The OS thread left over by fork() is special in that the process
+ // will terminate as soon as the thread terminates;
+ // we'd expect forkProcess to behave similarily.
+ // FIXME - we don't do this.
+ }
+#else
+ StgMainThread *m;
+ rtsBool doKill;
/* wipe all other threads */
run_queue_hd = run_queue_tl = END_TSO_QUEUE;
tso->link = END_TSO_QUEUE;
deleteThread(t);
}
}
+#endif
}
+ RELEASE_LOCK(&sched_mutex);
return pid;
#else /* mingw32 */
barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
{
nat tok;
Capability *cap;
+ int saved_errno = errno;
/* assume that *reg is a pointer to the StgRegTable part
* of a Capability.
/* Other threads _might_ be available for execution; signal this */
THREAD_RUNNABLE();
RELEASE_LOCK(&sched_mutex);
+
+ errno = saved_errno;
return tok;
}
{
StgTSO *tso, **prev;
Capability *cap;
+ int saved_errno = errno;
#if defined(RTS_SUPPORTS_THREADS)
/* Wait for permission to re-enter the RTS with the result. */
#if defined(RTS_SUPPORTS_THREADS)
RELEASE_LOCK(&sched_mutex);
#endif
+ errno = saved_errno;
return &cap->r;
}
tso->why_blocked = NotBlocked;
tso->blocked_exceptions = NULL;
+ tso->saved_errno = 0;
+
tso->stack_size = stack_size;
tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
- TSO_STRUCT_SIZEW;
}
#endif
-static SchedulerStatus waitThread_(/*out*/StgMainThread* m
-#if defined(THREADED_RTS)
- , rtsBool blockWaiting
-#endif
+static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
+ Capability *initialCapability
);
}
SchedulerStatus
-scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
+scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCapability)
{ // Precondition: sched_mutex must be held
StgMainThread *m;
m->stat = NoStatus;
#if defined(RTS_SUPPORTS_THREADS)
initCondition(&m->wakeup);
+#if defined(THREADED_RTS)
+ initCondition(&m->bound_thread_cond);
+#endif
#endif
/* Put the thread on the main-threads list prior to scheduling the TSO.
main_threads = m;
scheduleThread_(tso);
-#if defined(THREADED_RTS)
- return waitThread_(m, rtsTrue);
-#else
- return waitThread_(m);
-#endif
+
+ return waitThread_(m, initialCapability);
}
/* ---------------------------------------------------------------------------
{
do {
while (run_queue_hd != END_TSO_QUEUE) {
- waitThread ( run_queue_hd, NULL);
+ waitThread ( run_queue_hd, NULL, NULL );
}
while (blocked_queue_hd != END_TSO_QUEUE) {
- waitThread ( blocked_queue_hd, NULL);
+ waitThread ( blocked_queue_hd, NULL, NULL );
}
while (sleeping_queue != END_TSO_QUEUE) {
- waitThread ( blocked_queue_hd, NULL);
+ waitThread ( blocked_queue_hd, NULL, NULL );
}
} while
(blocked_queue_hd != END_TSO_QUEUE ||
}
SchedulerStatus
-waitThread(StgTSO *tso, /*out*/StgClosure **ret)
+waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability)
{
StgMainThread *m;
SchedulerStatus stat;
m->stat = NoStatus;
#if defined(RTS_SUPPORTS_THREADS)
initCondition(&m->wakeup);
+#if defined(THREADED_RTS)
+ initCondition(&m->bound_thread_cond);
+#endif
#endif
/* see scheduleWaitThread() comment */
main_threads = m;
IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
-#if defined(THREADED_RTS)
- stat = waitThread_(m, rtsFalse);
-#else
- stat = waitThread_(m);
-#endif
+
+ stat = waitThread_(m,initialCapability);
+
RELEASE_LOCK(&sched_mutex);
return stat;
}
static
SchedulerStatus
-waitThread_(StgMainThread* m
-#if defined(THREADED_RTS)
- , rtsBool blockWaiting
-#endif
- )
+waitThread_(StgMainThread* m, Capability *initialCapability)
{
SchedulerStatus stat;
// Precondition: sched_mutex must be held.
IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
-#if defined(RTS_SUPPORTS_THREADS)
-
-# if defined(THREADED_RTS)
- if (!blockWaiting) {
- /* In the threaded case, the OS thread that called main()
- * gets to enter the RTS directly without going via another
- * task/thread.
- */
- main_main_thread = m;
- RELEASE_LOCK(&sched_mutex);
- schedule();
- ACQUIRE_LOCK(&sched_mutex);
- main_main_thread = NULL;
- ASSERT(m->stat != NoStatus);
- } else
-# endif
- {
+#if defined(RTS_SUPPORTS_THREADS) && !defined(THREADED_RTS)
+ { // FIXME: does this still make sense?
+ // It's not for the threaded rts => SMP only
do {
waitCondition(&m->wakeup, &sched_mutex);
} while (m->stat == NoStatus);
CurrentProc = MainProc; // PE to run it on
RELEASE_LOCK(&sched_mutex);
- schedule();
+ schedule(m,initialCapability);
#else
RELEASE_LOCK(&sched_mutex);
- schedule();
+ schedule(m,initialCapability);
+ ACQUIRE_LOCK(&sched_mutex);
ASSERT(m->stat != NoStatus);
#endif
#if defined(RTS_SUPPORTS_THREADS)
closeCondition(&m->wakeup);
+#if defined(THREADED_RTS)
+ closeCondition(&m->bound_thread_cond);
+#endif
#endif
IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
raiseAsync(tso,NULL);
}
+static void
+deleteThreadImmediately(StgTSO *tso)
+{ // for forkProcess only:
+ // delete thread without giving it a chance to catch the KillThread exception
+
+ if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
+ return;
+ }
+ unblockThread(tso);
+ tso->what_next = ThreadKilled;
+}
+
void
raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
{