/* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.160 2002/12/13 15:16:29 simonmar Exp $
+ * $Id: Schedule.c,v 1.172 2003/07/12 00:09:15 sof Exp $
*
* (c) The GHC Team, 1998-2000
*
#include "Interpreter.h"
#include "Exception.h"
#include "Printer.h"
-#include "Main.h"
#include "Signals.h"
#include "Sanity.h"
#include "Stats.h"
-#include "Itimer.h"
+#include "Timer.h"
#include "Prelude.h"
#include "ThreadLabels.h"
#ifdef PROFILING
*/
StgMainThread *main_threads = NULL;
+#ifdef THREADED_RTS
+// Pointer to the thread that executes main
+// When this thread is finished, the program terminates
+// by calling shutdownHaskellAndExit.
+// It would be better to add a call to shutdownHaskellAndExit
+// to the Main.main wrapper and to remove this hack.
+StgMainThread *main_main_thread = NULL;
+#endif
+
/* Thread queues.
* Locks required: sched_mutex.
*/
/* The smallest stack size that makes any sense is:
* RESERVED_STACK_WORDS (so we can get back from the stack overflow)
* + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
- * + 1 (the realworld token for an IO thread)
* + 1 (the closure to enter)
+ * + 1 (stg_ap_v_ret)
+ * + 1 (spare slot req'd by stg_ap_v_ret)
*
* A thread with this stack will bomb immediately with a stack
* overflow, which will increase its stack size.
*/
-#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
+#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
#if defined(GRAN)
}
#endif
-
-
+#if defined(RTS_SUPPORTS_THREADS)
+/* Spawn an additional worker task (native OS thread) whose entry point
+ * is taskStart, making another scheduler available.  Threaded RTS only. */
+void
+startSchedulerTask(void)
+{
+ startTask(taskStart);
+}
+#endif
//@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
//@subsection Main scheduling loop
#if defined(RTS_SUPPORTS_THREADS)
waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
+ IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): entering RTS", osThreadId()));
#else
/* simply initialise it in the non-threaded case */
grabCapability(&cap);
*/
if (interrupted) {
IF_DEBUG(scheduler, sched_belch("interrupted"));
- deleteAllThreads();
interrupted = rtsFalse;
was_interrupted = rtsTrue;
+#if defined(RTS_SUPPORTS_THREADS)
+ // In the threaded RTS, deadlock detection doesn't work,
+ // so just exit right away.
+ prog_belch("interrupted");
+ releaseCapability(cap);
+ startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit
+ RELEASE_LOCK(&sched_mutex);
+ shutdownHaskellAndExit(EXIT_SUCCESS);
+#else
+ deleteAllThreads();
+#endif
}
/* Go through the list of main threads and wake up any
{
StgMainThread *m, **prev;
prev = &main_threads;
- for (m = main_threads; m != NULL; m = m->link) {
+ for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
switch (m->tso->what_next) {
case ThreadComplete:
if (m->ret) {
#ifdef DEBUG
removeThreadLabel((StgWord)m->tso);
#endif
+ if(m == main_main_thread)
+ {
+ releaseCapability(cap);
+ startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit
+ RELEASE_LOCK(&sched_mutex);
+ shutdownHaskellAndExit(EXIT_SUCCESS);
+ }
break;
case ThreadKilled:
if (m->ret) *(m->ret) = NULL;
#ifdef DEBUG
removeThreadLabel((StgWord)m->tso);
#endif
+ if(m == main_main_thread)
+ {
+ releaseCapability(cap);
+ startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit
+ RELEASE_LOCK(&sched_mutex);
+ shutdownHaskellAndExit(EXIT_SUCCESS);
+ }
break;
default:
break;
#endif // SMP
/* check for signals each time around the scheduler */
-#ifndef mingw32_TARGET_OS
+#if defined(RTS_USER_SIGNALS)
if (signals_pending()) {
RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
startSignalHandlers();
/* Check whether any waiting threads need to be woken up. If the
* run queue is empty, and there are no other tasks running, we
* can wait indefinitely for something to happen.
- * ToDo: what if another client comes along & requests another
- * main thread?
*/
- if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
+ if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
+#if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
+ || EMPTY_RUN_QUEUE()
+#endif
+ )
+ {
awaitEvent( EMPTY_RUN_QUEUE()
#if defined(SMP)
&& allFreeCapabilities()
* If no threads are black holed, we have a deadlock situation, so
* inform all the main threads.
*/
-#ifndef PAR
+#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
if ( EMPTY_THREAD_QUEUES()
#if defined(RTS_SUPPORTS_THREADS)
&& EMPTY_QUEUE(suspended_ccalling_threads)
if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
-#ifndef mingw32_TARGET_OS
+#if defined(RTS_USER_SIGNALS)
/* If we have user-installed signal handlers, then wait
* for signals to arrive rather than bombing out with a
* deadlock.
}
not_deadlocked:
+#elif defined(RTS_SUPPORTS_THREADS)
+ /* ToDo: add deadlock detection in threaded RTS */
#elif defined(PAR)
/* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
#endif
#endif
#if defined(RTS_SUPPORTS_THREADS)
+#if defined(SMP)
/* block until we've got a thread on the run queue and a free
* capability.
*
waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
}
+#else
+ if ( EMPTY_RUN_QUEUE() ) {
+ continue; // nothing to do
+ }
+#endif
#endif
#if defined(GRAN)
// expensive if there is lots of thread switching going on...
IF_DEBUG(sanity,checkTSO(t));
#endif
-
+
cap->r.rCurrentTSO = t;
/* context switches are now initiated by the timer signal, unless
else
context_switch = 0;
+run_thread:
+
RELEASE_LOCK(&sched_mutex);
IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
/* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
/* Run the current thread
*/
- run_thread:
prev_what_next = t->what_next;
switch (prev_what_next) {
case ThreadKilled:
#endif
ACQUIRE_LOCK(&sched_mutex);
-
-#ifdef SMP
+
+#ifdef RTS_SUPPORTS_THREADS
IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
#elif !defined(GRAN) && !defined(PAR)
IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
* Singleton fork(). Do not copy any running threads.
* ------------------------------------------------------------------------- */
-StgInt forkProcess(StgTSO* tso) {
-
+StgInt
+forkProcess(StgTSO* tso)
+{
#ifndef mingw32_TARGET_OS
pid_t pid;
StgTSO* t,*next;
* Locks: sched_mutex held.
* ------------------------------------------------------------------------- */
-void deleteAllThreads ( void )
+void
+deleteAllThreads ( void )
{
StgTSO* t, *next;
IF_DEBUG(scheduler,sched_belch("deleting all threads"));
suspended_ccalling_threads = cap->r.rCurrentTSO;
#if defined(RTS_SUPPORTS_THREADS)
- cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
+ if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
+ {
+ cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
+ cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
+ }
+ else
+ {
+ cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
+ }
#endif
/* Use the thread ID as the token; it should be unique */
#if defined(RTS_SUPPORTS_THREADS)
/* Preparing to leave the RTS, so ensure there's a native thread/task
waiting to take over.
-
- ToDo: optimise this and only create a new task if there's a need
- for one (i.e., if there's only one Concurrent Haskell thread alive,
- there's no need to create a new task).
*/
- IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
- if (concCall) {
- startTask(taskStart);
- }
+ IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId()));
+ //if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult
+ startTask(taskStart);
+ //}
#endif
/* Other threads _might_ be available for execution; signal this */
StgRegTable *
resumeThread( StgInt tok,
- rtsBool concCall
-#if !defined(RTS_SUPPORTS_THREADS)
- STG_UNUSED
-#endif
- )
+ rtsBool concCall STG_UNUSED )
{
StgTSO *tso, **prev;
Capability *cap;
#if defined(RTS_SUPPORTS_THREADS)
/* Wait for permission to re-enter the RTS with the result. */
- if ( concCall ) {
- ACQUIRE_LOCK(&sched_mutex);
- grabReturnCapability(&sched_mutex, &cap);
- } else {
- grabCapability(&cap);
- }
+ ACQUIRE_LOCK(&sched_mutex);
+ grabReturnCapability(&sched_mutex, &cap);
+
+ IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): re-entering RTS", tok, osThreadId()));
#else
grabCapability(&cap);
#endif
barf("resumeThread: thread not found");
}
tso->link = END_TSO_QUEUE;
+
+#if defined(RTS_SUPPORTS_THREADS)
+ if(tso->why_blocked == BlockedOnCCall)
+ {
+ awakenBlockedQueueNoLock(tso->blocked_exceptions);
+ tso->blocked_exceptions = NULL;
+ }
+#endif
+
/* Reset blocking status */
tso->why_blocked = NotBlocked;
cap->r.rCurrentTSO = tso;
+#if defined(RTS_SUPPORTS_THREADS)
RELEASE_LOCK(&sched_mutex);
+#endif
return &cap->r;
}
/* Caveat: Once set, you can only set the thread name to "" */
len = strlen(label)+1;
- buf = malloc(len);
- if (buf == NULL) {
- fprintf(stderr,"insufficient memory for labelThread!\n");
- } else
- strncpy(buf,label,len);
+ buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
+ strncpy(buf,label,len);
/* Update will free the old memory for us */
updateThreadLabel((StgWord)tso,buf);
}
* on this thread's stack before the scheduler is invoked.
* ------------------------------------------------------------------------ */
-static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
+static void scheduleThread_ (StgTSO* tso);
void
-scheduleThread_(StgTSO *tso
- , rtsBool createTask
-#if !defined(THREADED_RTS)
- STG_UNUSED
-#endif
- )
+scheduleThread_(StgTSO *tso)
{
// Precondition: sched_mutex must be held.
* soon as we release the scheduler lock below.
*/
PUSH_ON_RUN_QUEUE(tso);
-#if defined(THREADED_RTS)
- /* If main() is scheduling a thread, don't bother creating a
- * new task.
- */
- if ( createTask ) {
- startTask(taskStart);
- }
-#endif
THREAD_RUNNABLE();
#if 0
void scheduleThread(StgTSO* tso)
{
ACQUIRE_LOCK(&sched_mutex);
- scheduleThread_(tso, rtsFalse);
+ scheduleThread_(tso);
RELEASE_LOCK(&sched_mutex);
}
SchedulerStatus
scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
-{
+{ // Precondition: sched_mutex must be held
StgMainThread *m;
m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
signal the completion of its work item for the main thread to
see (==> it got stuck waiting.) -- sof 6/02.
*/
- ACQUIRE_LOCK(&sched_mutex);
IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)\n", tso->id));
m->link = main_threads;
main_threads = m;
- scheduleThread_(tso, rtsTrue);
+ scheduleThread_(tso);
#if defined(THREADED_RTS)
- return waitThread_(m, rtsTrue); // waitThread_ releases sched_mutex
+ return waitThread_(m, rtsTrue);
#else
return waitThread_(m);
#endif
waitThread(StgTSO *tso, /*out*/StgClosure **ret)
{
StgMainThread *m;
+ SchedulerStatus stat;
m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
m->tso = tso;
IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
#if defined(THREADED_RTS)
- return waitThread_(m, rtsFalse); // waitThread_ releases sched_mutex
+ stat = waitThread_(m, rtsFalse);
#else
- return waitThread_(m);
+ stat = waitThread_(m);
#endif
+ RELEASE_LOCK(&sched_mutex);
+ return stat;
}
static
* gets to enter the RTS directly without going via another
* task/thread.
*/
+ main_main_thread = m;
RELEASE_LOCK(&sched_mutex);
schedule();
+ ACQUIRE_LOCK(&sched_mutex);
+ main_main_thread = NULL;
ASSERT(m->stat != NoStatus);
} else
# endif
IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
m->tso->id));
- free(m);
+ stgFree(m);
-#if defined(THREADED_RTS)
- if (blockWaiting)
-#endif
- RELEASE_LOCK(&sched_mutex);
-
- // Postcondition: sched_mutex must not be held
+ // Postcondition: sched_mutex still held
return stat;
}
markSparkQueue(evac);
#endif
-#ifndef mingw32_TARGET_OS
+#if defined(RTS_USER_SIGNALS)
// mark the signal handlers (signals should be already blocked)
markSignalHandlers(evac);
#endif
static StgTSO *
threadStackOverflow(StgTSO *tso)
{
- nat new_stack_size, new_tso_size, diff, stack_words;
+ nat new_stack_size, new_tso_size, stack_words;
StgPtr new_sp;
StgTSO *dest;
memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
/* relocate the stack pointers... */
- diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
- dest->sp = new_sp;
+ dest->sp = new_sp;
dest->stack_size = new_stack_size;
/* Mark the old TSO as relocated. We have to check for relocated
}
#else /* !GRAN && !PAR */
+
+#ifdef RTS_SUPPORTS_THREADS
+/* Wake every TSO on the given blocked queue, without taking any lock
+ * itself.  NOTE(review): it delegates to unblockOneLocked(), whose name
+ * suggests the caller must already hold sched_mutex (as resumeThread()
+ * does when it calls this) -- confirm before calling from elsewhere. */
+void
+awakenBlockedQueueNoLock(StgTSO *tso)
+{
+ while (tso != END_TSO_QUEUE) {
+ tso = unblockOneLocked(tso);
+ }
+}
+#endif
+
void
awakenBlockedQueue(StgTSO *tso)
{
case BlockedOnRead:
case BlockedOnWrite:
+#if defined(mingw32_TARGET_OS)
+ case BlockedOnDoProc:
+#endif
{
/* take TSO off blocked_queue */
StgBlockingQueueElement *prev = NULL;
goto done;
}
}
- barf("unblockThread (I/O): TSO not found");
+ barf("unblockThread (delay): TSO not found");
}
default:
case BlockedOnRead:
case BlockedOnWrite:
+#if defined(mingw32_TARGET_OS)
+ case BlockedOnDoProc:
+#endif
{
StgTSO *prev = NULL;
for (t = blocked_queue_hd; t != END_TSO_QUEUE;
goto done;
}
}
- barf("unblockThread (I/O): TSO not found");
+ barf("unblockThread (delay): TSO not found");
}
default:
if (tso->why_blocked != BlockedOnBlackHole) {
continue;
}
-
blocked_on = tso->block_info.closure;
frame = (StgClosure *)tso->sp;
while(1) {
info = get_ret_itbl(frame);
switch (info->i.type) {
-
case UPDATE_FRAME:
if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
/* We are blocking on one of our own computations, so
case BlockedOnWrite:
fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
break;
+#if defined(mingw32_TARGET_OS)
+ case BlockedOnDoProc:
+ fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
+ break;
+#endif
case BlockedOnDelay:
fprintf(stderr,"is blocked until %d", tso->block_info.target);
break;
case BlockedOnCCall:
fprintf(stderr,"is blocked on an external call");
break;
+ case BlockedOnCCall_NoUnblockExc:
+ fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
+ break;
#endif
default:
barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",