X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=92d2cf6de4179cdb1c67e3f56a01216266692409;hb=91fd21013ba04932b16eb9633d9ab12da0e8b808;hp=c0b47208c0ba72c56bf9657f9c54a77bfa60bd29;hpb=20fc2f0ced64a12d8e44956931b2ac341ed2186f;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index c0b4720..92d2cf6 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,5 +1,5 @@ /* --------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.94 2001/03/22 03:51:10 hwloidl Exp $ + * $Id: Schedule.c,v 1.118 2002/02/06 01:29:27 sof Exp $ * * (c) The GHC Team, 1998-2000 * @@ -10,10 +10,11 @@ * * WAY Name CPP flag What's it for * -------------------------------------- - * mp GUM PAR Parallel execution on a distributed memory machine - * s SMP SMP Parallel execution on a shared memory machine - * mg GranSim GRAN Simulation of parallel execution - * md GUM/GdH DIST Distributed execution (based on GUM) + * mp GUM PAR Parallel execution on a distributed memory machine + * s SMP SMP Parallel execution on a shared memory machine + * mg GranSim GRAN Simulation of parallel execution + * md GUM/GdH DIST Distributed execution (based on GUM) + * * --------------------------------------------------------------------------*/ //@node Main scheduling code, , , @@ -74,6 +75,7 @@ //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code //@subsection Includes +#include "PosixSource.h" #include "Rts.h" #include "SchedAPI.h" #include "RtsUtils.h" @@ -81,7 +83,6 @@ #include "Storage.h" #include "StgRun.h" #include "StgStartup.h" -#include "GC.h" #include "Hooks.h" #include "Schedule.h" #include "StgMiscClosures.h" @@ -95,6 +96,10 @@ #include "Stats.h" #include "Itimer.h" #include "Prelude.h" +#ifdef PROFILING +#include "Proftimer.h" +#include "ProfHeap.h" +#endif #if defined(GRAN) || defined(PAR) # include "GranSimRts.h" # include "GranSim.h" @@ -105,6 +110,9 @@ # include "HLC.h" #endif #include "Sparks.h" +#include "Capability.h" +#include "OSThreads.h" +#include "Task.h" #include @@ -115,7 +123,7 @@ * * These are the threads which clients have requested that we run. * - * In an SMP build, we might have several concurrent clients all + * In a 'threaded' build, we might have several concurrent clients all * waiting for results, and each one will wait on a condition variable * until the result is available. * @@ -130,8 +138,8 @@ typedef struct StgMainThread_ { StgTSO * tso; SchedulerStatus stat; StgClosure ** ret; -#ifdef SMP - pthread_cond_t wakeup; +#if defined(RTS_SUPPORTS_THREADS) + Condition wakeup; #endif struct StgMainThread_ *link; } StgMainThread; @@ -181,7 +189,6 @@ StgTSO *all_threads; */ static StgTSO *suspended_ccalling_threads; -static void GetRoots(void); static StgTSO *threadStackOverflow(StgTSO *tso); /* KH: The following two flags are shared memory locations. There is no need @@ -221,18 +228,6 @@ StgThreadID next_thread_id = 1; #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2) -/* Free capability list. - * Locks required: sched_mutex. 
- */
-#ifdef SMP
-//@cindex free_capabilities
-//@cindex n_free_capabilities
-Capability *free_capabilities; /* Available capabilities for running threads */
-nat n_free_capabilities; /* total number of available capabilities */
-#else
-//@cindex MainRegTable
-Capability MainRegTable; /* for non-SMP, we have one global capability */
-#endif
 
 #if defined(GRAN)
 StgTSO *CurrentTSO;
@@ -246,13 +241,6 @@ StgTSO dummy_tso;
 
 rtsBool ready_to_gc;
 
-/* All our current task ids, saved in case we need to kill them later.
- */
-#ifdef SMP
-//@cindex task_ids
-task_info *task_ids;
-#endif
-
 void addToBlockedQueue ( StgTSO *tso );
 
 static void schedule ( void );
@@ -269,15 +257,60 @@ static void detectBlackHoles ( void );
 static void sched_belch(char *s, ...);
 #endif
 
-#ifdef SMP
-//@cindex sched_mutex
-//@cindex term_mutex
-//@cindex thread_ready_cond
-//@cindex gc_pending_cond
-pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
-pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER;
-pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER;
-pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER;
+#if defined(RTS_SUPPORTS_THREADS)
+/* ToDo: carefully document the invariants that go together
+ * with these synchronisation objects.
+ */
+Mutex sched_mutex = INIT_MUTEX_VAR;
+Mutex term_mutex = INIT_MUTEX_VAR;
+#if defined(THREADED_RTS)
+/*
+ * The rts_mutex is the 'big lock' that the active native
+ * thread within the RTS holds while executing code.
+ * It is given up when the thread makes a transition out of
+ * the RTS (e.g., to perform an external C call), hopefully
+ * for another thread to take over its chores and enter
+ * the RTS.
+ *
+ */
+Mutex rts_mutex = INIT_MUTEX_VAR;
+/*
+ * When a native thread has completed executing an external
+ * call, it needs to communicate the result back to the
+ * (Haskell) thread that made the call. Do this as follows:
+ *
+ * - in resumeThread(), the thread increments the counter
+ *   threads_waiting, and then blocks on the 'big' RTS lock.
+ * - upon entry to the scheduler, the thread that's currently
+ *   holding the RTS lock checks threads_waiting. If there
+ *   are native threads waiting, it gives up its RTS lock
+ *   and tries to re-grab the RTS lock [perhaps after having
+ *   waited for a bit..?]
+ * - care must be taken to deal with the case where more than
+ *   one external thread is waiting on the lock. [ToDo: more]
+ *
+ */
+
+static nat threads_waiting = 0;
+/*
+ * thread_ready_aux_mutex is used to handle the scenario where
+ * the RTS-executing thread runs out of work, but there are
+ * active external threads. The RTS-executing thread gives up
+ * its RTS mutex, and blocks waiting for the thread_ready_cond.
+ * Unfortunately, a condition variable needs to be associated
+ * with a mutex in pthreads, so thread_ready_aux_mutex is
+ * used for just this purpose.
+ *
+ */
+Mutex thread_ready_aux_mutex = INIT_MUTEX_VAR;
+#endif
+
+
+/* thread_ready_cond: when signalled, a thread has
+ * become runnable. When used? 
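+ * (Going by this patch: schedule() waits on it when a task finds
+ * the run queue empty (under SMP, also when no capability is free),
+ * and resumeThread() broadcasts on it when a returning external
+ * call frees up the RTS.)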
+ */ +Condition thread_ready_cond = INIT_COND_VAR; +Condition gc_pending_cond = INIT_COND_VAR; nat await_death; #endif @@ -306,7 +339,7 @@ char *threadReturnCode_strs[] = { }; #endif -#ifdef PAR +#if defined(PAR) StgTSO * createSparkThread(rtsSpark spark); StgTSO * activateSpark (rtsSpark spark); #endif @@ -317,6 +350,24 @@ StgTSO * activateSpark (rtsSpark spark); StgTSO *MainTSO; */ +#if defined(PAR) || defined(RTS_SUPPORTS_THREADS) +static void taskStart(void); +static void +taskStart(void) +{ + /* threads start up using 'taskStart', so make them + them grab the RTS lock. */ +#if defined(THREADED_RTS) + ACQUIRE_LOCK(&rts_mutex); + taskNotAvailable(); +#endif + schedule(); +} +#endif + + + + //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code //@subsection Main scheduling loop @@ -377,6 +428,28 @@ schedule( void ) rtsBool was_interrupted = rtsFalse; ACQUIRE_LOCK(&sched_mutex); + +#if defined(THREADED_RTS) + /* ToDo: consider SMP support */ + if (threads_waiting > 0) { + /* (At least) one native thread is waiting to + * deposit the result of an external call. So, + * give up our RTS executing privileges and let + * one of them continue. + * + */ + taskAvailable(); + RELEASE_LOCK(&sched_mutex); + IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (threads_waiting=%d)\n", osThreadId(), threads_waiting)); + RELEASE_LOCK(&rts_mutex); + /* ToDo: come up with mechanism that guarantees that + * the main thread doesn't loop here. + */ + yieldThread(); + /* ToDo: longjmp() */ + taskStart(); + } +#endif #if defined(GRAN) @@ -430,7 +503,7 @@ schedule( void ) * should be done more efficiently without a linear scan * of the main threads list, somehow... */ -#ifdef SMP +#if defined(RTS_SUPPORTS_THREADS) { StgMainThread *m, **prev; prev = &main_threads; @@ -442,16 +515,17 @@ schedule( void ) } *prev = m->link; m->stat = Success; - pthread_cond_broadcast(&m->wakeup); + broadcastCondition(&m->wakeup); break; case ThreadKilled: + if (m->ret) *(m->ret) = NULL; *prev = m->link; if (was_interrupted) { m->stat = Interrupted; } else { m->stat = Killed; } - pthread_cond_broadcast(&m->wakeup); + broadcastCondition(&m->wakeup); break; default: break; @@ -459,7 +533,8 @@ schedule( void ) } } -#else +#else /* not threaded */ + # if defined(PAR) /* in GUM do this only on the Main PE */ if (IAmMainThread) @@ -477,6 +552,7 @@ schedule( void ) m->stat = Success; return; } else { + if (m->ret) { *(m->ret) = NULL; }; if (was_interrupted) { m->stat = Interrupted; } else { @@ -491,10 +567,13 @@ schedule( void ) /* Top up the run queue from our spark pool. We try to make the * number of threads in the run queue equal to the number of * free capabilities. + * + * Disable spark support in SMP for now, non-essential & requires + * a little bit of work to make it compile cleanly. -- sof 1/02. */ -#if defined(SMP) +#if 0 /* defined(SMP) */ { - nat n = n_free_capabilities; + nat n = getFreeCapabilities(); StgTSO *tso = run_queue_hd; /* Count the run queue */ @@ -521,11 +600,18 @@ schedule( void ) /* We need to wake up the other tasks if we just created some * work for them. */ - if (n_free_capabilities - n > 1) { - pthread_cond_signal(&thread_ready_cond); + if (getFreeCapabilities() - n > 1) { + signalCondition( &thread_ready_cond ); } } -#endif /* SMP */ +#endif // SMP + + /* check for signals each time around the scheduler */ +#ifndef mingw32_TARGET_OS + if (signals_pending()) { + startSignalHandlers(); + } +#endif /* Check whether any waiting threads need to be woken up. 
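     * (I/O-blocked threads sit on blocked_queue_hd and sleeping
     * threads on sleeping_queue; awaitEvent() below services both.)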
If the * run queue is empty, and there are no other tasks running, we @@ -536,21 +622,14 @@ schedule( void ) if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) { awaitEvent( (run_queue_hd == END_TSO_QUEUE) -#ifdef SMP - && (n_free_capabilities == RtsFlags.ParFlags.nNodes) +#if defined(SMP) + && allFreeCapabilities() #endif ); } /* we can be interrupted while waiting for I/O... */ if (interrupted) continue; - /* check for signals each time around the scheduler */ -#ifndef mingw32_TARGET_OS - if (signals_pending()) { - start_signal_handlers(); - } -#endif - /* * Detect deadlock: when we have no threads to run, there are no * threads waiting on I/O or sleeping, and all the other tasks are @@ -562,60 +641,118 @@ schedule( void ) * If no threads are black holed, we have a deadlock situation, so * inform all the main threads. */ -#ifdef SMP +#ifndef PAR if (blocked_queue_hd == END_TSO_QUEUE && run_queue_hd == END_TSO_QUEUE && sleeping_queue == END_TSO_QUEUE - && (n_free_capabilities == RtsFlags.ParFlags.nNodes)) +#if defined(SMP) + && allFreeCapabilities() +#elif defined(THREADED_RTS) + && suspended_ccalling_threads == END_TSO_QUEUE +#endif + ) { - IF_DEBUG(scheduler, sched_belch("deadlocked, checking for black holes...")); - detectBlackHoles(); - if (run_queue_hd == END_TSO_QUEUE) { - StgMainThread *m; - for (m = main_threads; m != NULL; m = m->link) { - m->ret = NULL; - m->stat = Deadlock; - pthread_cond_broadcast(&m->wakeup); + IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC...")); + RELEASE_LOCK(&sched_mutex); + GarbageCollect(GetRoots,rtsTrue); + ACQUIRE_LOCK(&sched_mutex); + IF_DEBUG(scheduler, sched_belch("GC done.")); + if (blocked_queue_hd == END_TSO_QUEUE + && run_queue_hd == END_TSO_QUEUE + && sleeping_queue == END_TSO_QUEUE) { + + IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes...")); + detectBlackHoles(); + + /* No black holes, so probably a real deadlock. Send the + * current main thread the Deadlock exception (or in the SMP + * build, send *all* main threads the deadlock exception, + * since none of them can make progress). + */ + if (run_queue_hd == END_TSO_QUEUE) { + StgMainThread *m; +#if defined(RTS_SUPPORTS_THREADS) + for (m = main_threads; m != NULL; m = m->link) { + switch (m->tso->why_blocked) { + case BlockedOnBlackHole: + raiseAsync(m->tso, (StgClosure *)NonTermination_closure); + break; + case BlockedOnException: + case BlockedOnMVar: + raiseAsync(m->tso, (StgClosure *)Deadlock_closure); + break; + default: + barf("deadlock: main thread blocked in a strange way"); + } + } +#else + m = main_threads; + switch (m->tso->why_blocked) { + case BlockedOnBlackHole: + raiseAsync(m->tso, (StgClosure *)NonTermination_closure); + break; + case BlockedOnException: + case BlockedOnMVar: + raiseAsync(m->tso, (StgClosure *)Deadlock_closure); + break; + default: + barf("deadlock: main thread blocked in a strange way"); + } +#endif + } +#if defined(RTS_SUPPORTS_THREADS) + if ( run_queue_hd == END_TSO_QUEUE ) { + IF_DEBUG(scheduler, sched_belch("all done, it seems...shut down.")); + shutdownHaskellAndExit(0); + } - main_threads = NULL; +#endif + ASSERT( run_queue_hd != END_TSO_QUEUE ); } } #elif defined(PAR) /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */ -#else /* ! 
SMP */ - if (blocked_queue_hd == END_TSO_QUEUE - && run_queue_hd == END_TSO_QUEUE - && sleeping_queue == END_TSO_QUEUE) - { - IF_DEBUG(scheduler, sched_belch("deadlocked, checking for black holes...")); - detectBlackHoles(); - if (run_queue_hd == END_TSO_QUEUE) { - StgMainThread *m = main_threads; - m->ret = NULL; - m->stat = Deadlock; - main_threads = m->link; - return; - } - } #endif -#ifdef SMP +#if defined(SMP) /* If there's a GC pending, don't do anything until it has * completed. */ if (ready_to_gc) { IF_DEBUG(scheduler,sched_belch("waiting for GC")); - pthread_cond_wait(&gc_pending_cond, &sched_mutex); + waitCondition( &gc_pending_cond, &sched_mutex ); } - +#endif + +#if defined(SMP) /* block until we've got a thread on the run queue and a free * capability. */ - while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) { + while ( run_queue_hd == END_TSO_QUEUE + || noFreeCapabilities() + ) { IF_DEBUG(scheduler, sched_belch("waiting for work")); - pthread_cond_wait(&thread_ready_cond, &sched_mutex); + waitCondition( &thread_ready_cond, &sched_mutex ); IF_DEBUG(scheduler, sched_belch("work now available")); } +#elif defined(THREADED_RTS) + if ( run_queue_hd == END_TSO_QUEUE ) { + /* no work available, wait for external calls to complete. */ + IF_DEBUG(scheduler, sched_belch("worker thread (%d): waiting for external thread to complete..", osThreadId())); + taskAvailable(); + RELEASE_LOCK(&sched_mutex); + RELEASE_LOCK(&rts_mutex); + + /* Sigh - need to have a mutex locked in order to wait on the + condition variable. */ + ACQUIRE_LOCK(&thread_ready_aux_mutex); + waitCondition(&thread_ready_cond, &thread_ready_aux_mutex); + RELEASE_LOCK(&thread_ready_aux_mutex); + + IF_DEBUG(scheduler, sched_belch("worker thread (%d): re-awakened from no-work slumber..\n", osThreadId())); + /* ToDo: longjmp() */ + taskStart(); + } #endif #if defined(GRAN) @@ -897,30 +1034,26 @@ schedule( void ) */ ASSERT(run_queue_hd != END_TSO_QUEUE); t = POP_RUN_QUEUE(); + // Sanity check the thread we're about to run. This can be + // expensive if there is lots of thread switching going on... IF_DEBUG(sanity,checkTSO(t)); - #endif - /* grab a capability - */ -#ifdef SMP - cap = free_capabilities; - free_capabilities = cap->link; - n_free_capabilities--; -#else - cap = &MainRegTable; -#endif - - cap->rCurrentTSO = t; + grabCapability(&cap); + cap->r.rCurrentTSO = t; /* context switches are now initiated by the timer signal, unless * the user specified "context switch as often as possible", with * +RTS -C0 */ - if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0 - && (run_queue_hd != END_TSO_QUEUE - || blocked_queue_hd != END_TSO_QUEUE - || sleeping_queue != END_TSO_QUEUE)) + if ( +#ifdef PROFILING + RtsFlags.ProfFlags.profileInterval == 0 || +#endif + (RtsFlags.ConcFlags.ctxtSwitchTicks == 0 + && (run_queue_hd != END_TSO_QUEUE + || blocked_queue_hd != END_TSO_QUEUE + || sleeping_queue != END_TSO_QUEUE))) context_switch = 1; else context_switch = 0; @@ -930,20 +1063,24 @@ schedule( void ) IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", t->id, t, whatNext_strs[t->what_next])); +#ifdef PROFILING + startHeapProfTimer(); +#endif + /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */ /* Run the current thread */ - switch (cap->rCurrentTSO->what_next) { + switch (cap->r.rCurrentTSO->what_next) { case ThreadKilled: case ThreadComplete: /* Thread already finished, return to scheduler. 
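       * (A finished thread can still turn up on the run queue if,
       * say, it was killed by an asynchronous exception.)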
*/ ret = ThreadFinished; break; case ThreadEnterGHC: - ret = StgRun((StgFunPtr) stg_enterStackTop, cap); + ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r); break; case ThreadRunGHC: - ret = StgRun((StgFunPtr) stg_returnToStackTop, cap); + ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r); break; case ThreadEnterInterp: ret = interpretBCO(cap); @@ -955,17 +1092,18 @@ schedule( void ) /* Costs for the scheduler are assigned to CCS_SYSTEM */ #ifdef PROFILING + stopHeapProfTimer(); CCCS = CCS_SYSTEM; #endif ACQUIRE_LOCK(&sched_mutex); #ifdef SMP - IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self());); + IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId());); #elif !defined(GRAN) && !defined(PAR) IF_DEBUG(scheduler,fprintf(stderr,"scheduler: ");); #endif - t = cap->rCurrentTSO; + t = cap->r.rCurrentTSO; #if defined(PAR) /* HACK 675: if the last thread didn't yield, make sure to print a @@ -978,14 +1116,65 @@ schedule( void ) switch (ret) { case HeapOverflow: #if defined(GRAN) - IF_DEBUG(gran, - DumpGranEvent(GR_DESCHEDULE, t)); + IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t)); globalGranStats.tot_heapover++; #elif defined(PAR) - // IF_DEBUG(par, - //DumpGranEvent(GR_DESCHEDULE, t); globalParStats.tot_heapover++; #endif + + // did the task ask for a large block? + if (cap->r.rHpAlloc > BLOCK_SIZE_W) { + // if so, get one and push it on the front of the nursery. + bdescr *bd; + nat blocks; + + blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE; + + IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", + t->id, t, + whatNext_strs[t->what_next], blocks)); + + // don't do this if it would push us over the + // alloc_blocks_lim limit; we'll GC first. + if (alloc_blocks + blocks < alloc_blocks_lim) { + + alloc_blocks += blocks; + bd = allocGroup( blocks ); + + // link the new group into the list + bd->link = cap->r.rCurrentNursery; + bd->u.back = cap->r.rCurrentNursery->u.back; + if (cap->r.rCurrentNursery->u.back != NULL) { + cap->r.rCurrentNursery->u.back->link = bd; + } else { + ASSERT(g0s0->blocks == cap->r.rCurrentNursery && + g0s0->blocks == cap->r.rNursery); + cap->r.rNursery = g0s0->blocks = bd; + } + cap->r.rCurrentNursery->u.back = bd; + + // initialise it as a nursery block + bd->step = g0s0; + bd->gen_no = 0; + bd->flags = 0; + bd->free = bd->start; + + // don't forget to update the block count in g0s0. + g0s0->n_blocks += blocks; + ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks); + + // now update the nursery to point to the new block + cap->r.rCurrentNursery = bd; + + // we might be unlucky and have another thread get on the + // run queue before us and steal the large block, but in that + // case the thread will just end up requesting another large + // block. + PUSH_ON_RUN_QUEUE(t); + break; + } + } + /* make all the running tasks block on a condition variable, * maybe set context_switch and wait till they all pile in, * then have them wait on a GC condition variable. 
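       * (ready_to_gc and gc_pending_cond implement this below: tasks
       * wait on gc_pending_cond while a GC is pending, and the task
       * that performs the GC broadcasts on it afterwards.)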
@@ -1200,13 +1389,20 @@ schedule( void ) } #ifdef SMP - cap->link = free_capabilities; - free_capabilities = cap; - n_free_capabilities++; + grabCapability(&cap); +#endif + +#ifdef PROFILING + if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) { + GarbageCollect(GetRoots, rtsTrue); + heapCensus(); + performHeapProfile = rtsFalse; + ready_to_gc = rtsFalse; // we already GC'd + } #endif #ifdef SMP - if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) + if (ready_to_gc && allFreeCapabilities() ) #else if (ready_to_gc) #endif @@ -1216,13 +1412,13 @@ schedule( void ) * to do it in another thread. Either way, we need to * broadcast on gc_pending_cond afterward. */ -#ifdef SMP +#if defined(RTS_SUPPORTS_THREADS) IF_DEBUG(scheduler,sched_belch("doing GC")); #endif GarbageCollect(GetRoots,rtsFalse); ready_to_gc = rtsFalse; #ifdef SMP - pthread_cond_broadcast(&gc_pending_cond); + broadcastCondition(&gc_pending_cond); #endif #if defined(GRAN) /* add a ContinueThread event to continue execution of current thread */ @@ -1235,24 +1431,20 @@ schedule( void ) G_CURR_THREADQ(0)); #endif /* GRAN */ } + #if defined(GRAN) next_thread: IF_GRAN_DEBUG(unused, print_eventq(EventHd)); event = get_next_event(); - #elif defined(PAR) next_thread: /* ToDo: wait for next message to arrive rather than busy wait */ - -#else /* GRAN */ - /* not any more - next_thread: - t = take_off_run_queue(END_TSO_QUEUE); - */ #endif /* GRAN */ + } /* end of while(1) */ + IF_PAR_DEBUG(verbose, belch("== Leaving schedule() after having received Finish")); } @@ -1266,15 +1458,18 @@ schedule( void ) void deleteAllThreads ( void ) { - StgTSO* t; + StgTSO* t, *next; IF_DEBUG(scheduler,sched_belch("deleting all threads")); - for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) { + for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) { + next = t->link; deleteThread(t); } - for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) { + for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) { + next = t->link; deleteThread(t); } - for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) { + for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) { + next = t->link; deleteThread(t); } run_queue_hd = run_queue_tl = END_TSO_QUEUE; @@ -1284,6 +1479,7 @@ void deleteAllThreads ( void ) /* startThread and insertThread are now in GranSim.c -- HWL */ + //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code //@subsection Suspend and Resume @@ -1303,40 +1499,75 @@ void deleteAllThreads ( void ) * ------------------------------------------------------------------------- */ StgInt -suspendThread( Capability *cap ) +suspendThread( StgRegTable *reg ) { nat tok; + Capability *cap; + + /* assume that *reg is a pointer to the StgRegTable part + * of a Capability. 
+ */ + cap = (Capability *)((void *)reg - sizeof(StgFunTable)); ACQUIRE_LOCK(&sched_mutex); IF_DEBUG(scheduler, - sched_belch("thread %d did a _ccall_gc", cap->rCurrentTSO->id)); + sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id)); - threadPaused(cap->rCurrentTSO); - cap->rCurrentTSO->link = suspended_ccalling_threads; - suspended_ccalling_threads = cap->rCurrentTSO; + threadPaused(cap->r.rCurrentTSO); + cap->r.rCurrentTSO->link = suspended_ccalling_threads; + suspended_ccalling_threads = cap->r.rCurrentTSO; /* Use the thread ID as the token; it should be unique */ - tok = cap->rCurrentTSO->id; + tok = cap->r.rCurrentTSO->id; -#ifdef SMP - cap->link = free_capabilities; - free_capabilities = cap; - n_free_capabilities++; -#endif + /* Hand back capability */ + releaseCapability(&cap); + +#if defined(RTS_SUPPORTS_THREADS) && !defined(SMP) + /* Preparing to leave the RTS, so ensure there's a native thread/task + waiting to take over. + + ToDo: optimise this and only create a new task if there's a need + for one (i.e., if there's only one Concurrent Haskell thread alive, + there's no need to create a new task). + */ + IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS\n", tok)); + startTask(taskStart); +#endif + RELEASE_LOCK(&sched_mutex); + RELEASE_LOCK(&rts_mutex); return tok; } -Capability * +StgRegTable * resumeThread( StgInt tok ) { StgTSO *tso, **prev; Capability *cap; +#if defined(THREADED_RTS) + IF_DEBUG(scheduler, sched_belch("thread %d returning, waiting for sched. lock.\n", tok)); ACQUIRE_LOCK(&sched_mutex); + threads_waiting++; + IF_DEBUG(scheduler, sched_belch("thread %d returning, threads waiting: %d.\n", tok, threads_waiting)); + RELEASE_LOCK(&sched_mutex); + + IF_DEBUG(scheduler, sched_belch("thread %d waiting for RTS lock...\n", tok)); + ACQUIRE_LOCK(&rts_mutex); + threads_waiting--; + taskNotAvailable(); + IF_DEBUG(scheduler, sched_belch("thread %d acquired RTS lock...\n", tok)); +#endif + +#if defined(THREADED_RTS) + /* Free up any RTS-blocked threads. */ + broadcastCondition(&thread_ready_cond); +#endif + /* Remove the thread off of the suspended list */ prev = &suspended_ccalling_threads; for (tso = suspended_ccalling_threads; tso != END_TSO_QUEUE; @@ -1351,23 +1582,19 @@ resumeThread( StgInt tok ) } tso->link = END_TSO_QUEUE; -#ifdef SMP - while (free_capabilities == NULL) { +#if defined(SMP) + while ( noFreeCapabilities() ) { IF_DEBUG(scheduler, sched_belch("waiting to resume")); - pthread_cond_wait(&thread_ready_cond, &sched_mutex); + waitCondition(&thread_ready_cond, &sched_mutex); IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id)); } - cap = free_capabilities; - free_capabilities = cap->link; - n_free_capabilities--; -#else - cap = &MainRegTable; #endif - cap->rCurrentTSO = tso; + grabCapability(&cap); - RELEASE_LOCK(&sched_mutex); - return cap; + cap->r.rCurrentTSO = tso; + + return &cap->r; } @@ -1394,6 +1621,16 @@ int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) } /* --------------------------------------------------------------------------- + * Fetching the ThreadID from an StgTSO. + * + * This is used in the implementation of Show for ThreadIds. + * ------------------------------------------------------------------------ */ +int rts_getThreadId(const StgTSO *tso) +{ + return tso->id; +} + +/* --------------------------------------------------------------------------- Create a new thread. The new thread starts with the given stack size. 
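   (The requested size includes the TSO header, so the usable stack
   is size - TSO_STRUCT_SIZEW words; see createThread_ below.)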
Before the @@ -1459,7 +1696,7 @@ createThread_(nat size, rtsBool have_lock) stack_size = size - TSO_STRUCT_SIZEW; tso = (StgTSO *)allocate(size); - TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0); + TICK_ALLOC_TSO(stack_size, 0); SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM); #if defined(GRAN) @@ -1478,7 +1715,6 @@ createThread_(nat size, rtsBool have_lock) tso->why_blocked = NotBlocked; tso->blocked_exceptions = NULL; - //tso->splim = (P_)&(tso->stack) + RESERVED_STACK_WORDS; tso->stack_size = stack_size; tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) - TSO_STRUCT_SIZEW; @@ -1665,11 +1901,6 @@ activateSpark (rtsSpark spark) void scheduleThread(StgTSO *tso) { - if (tso==END_TSO_QUEUE){ - schedule(); - return; - } - ACQUIRE_LOCK(&sched_mutex); /* Put the new thread on the head of the runnable queue. The caller @@ -1687,29 +1918,12 @@ scheduleThread(StgTSO *tso) } /* --------------------------------------------------------------------------- - * startTasks() - * - * Start up Posix threads to run each of the scheduler tasks. - * I believe the task ids are not needed in the system as defined. - * KH @ 25/10/99 - * ------------------------------------------------------------------------ */ - -#if defined(PAR) || defined(SMP) -void -taskStart(void) /* ( void *arg STG_UNUSED) */ -{ - scheduleThread(END_TSO_QUEUE); -} -#endif - -/* --------------------------------------------------------------------------- * initScheduler() * * Initialise the scheduler. This resets all the queues - if the * queues contained any threads, they'll be garbage collected at the * next pass. * - * This now calls startTasks(), so should only be called once! KH @ 25/10/99 * ------------------------------------------------------------------------ */ #ifdef SMP @@ -1720,11 +1934,10 @@ term_handler(int sig STG_UNUSED) ACQUIRE_LOCK(&term_mutex); await_death--; RELEASE_LOCK(&term_mutex); - pthread_exit(NULL); + shutdownThread(); } #endif -//@cindex initScheduler void initScheduler(void) { @@ -1757,6 +1970,27 @@ initScheduler(void) RtsFlags.ConcFlags.ctxtSwitchTicks = RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS; + +#if defined(RTS_SUPPORTS_THREADS) + /* Initialise the mutex and condition variables used by + * the scheduler. */ + initMutex(&rts_mutex); + initMutex(&sched_mutex); + initMutex(&term_mutex); +#if defined(THREADED_RTS) + initMutex(&thread_ready_aux_mutex); +#endif + + initCondition(&thread_ready_cond); + initCondition(&gc_pending_cond); +#endif + +#if defined(THREADED_RTS) + /* Grab big lock */ + ACQUIRE_LOCK(&rts_mutex); + IF_DEBUG(scheduler, + sched_belch("worker thread (%d): acquired RTS lock\n", osThreadId())); +#endif /* Install the SIGHUP handler */ #ifdef SMP @@ -1772,92 +2006,31 @@ initScheduler(void) } #endif -#ifdef SMP - /* Allocate N Capabilities */ - { - nat i; - Capability *cap, *prev; - cap = NULL; - prev = NULL; - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities"); - cap->link = prev; - prev = cap; - } - free_capabilities = cap; - n_free_capabilities = RtsFlags.ParFlags.nNodes; - } - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n", - n_free_capabilities);); + /* A capability holds the state a native thread needs in + * order to execute STG code. At least one capability is + * floating around (only SMP builds have more than one). 
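+   * (In the threaded RTS, that one capability is handed from one
+   * native thread to the next as they take over running the RTS.)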
+ */ + initCapabilities(); + +#if defined(RTS_SUPPORTS_THREADS) + /* start our haskell execution tasks */ +# if defined(SMP) + startTaskManager(RtsFlags.ParFlags.nNodes, taskStart); +# else + startTaskManager(0,taskStart); +# endif #endif -#if defined(SMP) || defined(PAR) +#if /* defined(SMP) ||*/ defined(PAR) initSparkPools(); #endif } -#ifdef SMP -void -startTasks( void ) -{ - nat i; - int r; - pthread_t tid; - - /* make some space for saving all the thread ids */ - task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info), - "initScheduler:task_ids"); - - /* and create all the threads */ - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - r = pthread_create(&tid,NULL,taskStart,NULL); - if (r != 0) { - barf("startTasks: Can't create new Posix thread"); - } - task_ids[i].id = tid; - task_ids[i].mut_time = 0.0; - task_ids[i].mut_etime = 0.0; - task_ids[i].gc_time = 0.0; - task_ids[i].gc_etime = 0.0; - task_ids[i].elapsedtimestart = elapsedtime(); - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid);); - } -} -#endif - void exitScheduler( void ) { -#ifdef SMP - nat i; - - /* Don't want to use pthread_cancel, since we'd have to install - * these silly exception handlers (pthread_cleanup_{push,pop}) around - * all our locks. - */ -#if 0 - /* Cancel all our tasks */ - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - pthread_cancel(task_ids[i].id); - } - - /* Wait for all the tasks to terminate */ - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", - task_ids[i].id)); - pthread_join(task_ids[i].id, NULL); - } -#endif - - /* Send 'em all a SIGHUP. That should shut 'em up. - */ - await_death = RtsFlags.ParFlags.nNodes; - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - pthread_kill(task_ids[i].id,SIGTERM); - } - while (await_death > 0) { - sched_yield(); - } +#if defined(RTS_SUPPORTS_THREADS) + stopTaskManager(); #endif } @@ -1932,8 +2105,8 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) m->tso = tso; m->ret = ret; m->stat = NoStatus; -#ifdef SMP - pthread_cond_init(&m->wakeup, NULL); +#if defined(RTS_SUPPORTS_THREADS) + initCondition(&m->wakeup); #endif m->link = main_threads; @@ -1944,7 +2117,7 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) #ifdef SMP do { - pthread_cond_wait(&m->wakeup, &sched_mutex); + waitCondition(&m->wakeup, &sched_mutex); } while (m->stat == NoStatus); #elif defined(GRAN) /* GranSim specific init */ @@ -1954,14 +2127,15 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) schedule(); #else + RELEASE_LOCK(&sched_mutex); schedule(); ASSERT(m->stat != NoStatus); #endif stat = m->stat; -#ifdef SMP - pthread_cond_destroy(&m->wakeup); +#if defined(RTS_SUPPORTS_THREADS) + closeCondition(&m->wakeup); #endif IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", @@ -2093,7 +2267,8 @@ take_off_run_queue(StgTSO *tso) { KH @ 25/10/99 */ -static void GetRoots(void) +void +GetRoots(evac_fn evac) { StgMainThread *m; @@ -2102,16 +2277,16 @@ static void GetRoots(void) nat i; for (i=0; i<=RtsFlags.GranFlags.proc; i++) { if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL))) - run_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]); + evac((StgClosure **)&run_queue_hds[i]); if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL))) - run_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]); + evac((StgClosure **)&run_queue_tls[i]); if ((blocked_queue_hds[i] != END_TSO_QUEUE) && 
((blocked_queue_hds[i] != NULL))) - blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]); + evac((StgClosure **)&blocked_queue_hds[i]); if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL))) - blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]); + evac((StgClosure **)&blocked_queue_tls[i]); if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL))) - ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]); + evac((StgClosure **)&ccalling_threads[i]); } } @@ -2119,31 +2294,31 @@ static void GetRoots(void) #else /* !GRAN */ if (run_queue_hd != END_TSO_QUEUE) { - ASSERT(run_queue_tl != END_TSO_QUEUE); - run_queue_hd = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd); - run_queue_tl = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl); + ASSERT(run_queue_tl != END_TSO_QUEUE); + evac((StgClosure **)&run_queue_hd); + evac((StgClosure **)&run_queue_tl); } - + if (blocked_queue_hd != END_TSO_QUEUE) { - ASSERT(blocked_queue_tl != END_TSO_QUEUE); - blocked_queue_hd = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd); - blocked_queue_tl = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl); + ASSERT(blocked_queue_tl != END_TSO_QUEUE); + evac((StgClosure **)&blocked_queue_hd); + evac((StgClosure **)&blocked_queue_tl); } - + if (sleeping_queue != END_TSO_QUEUE) { - sleeping_queue = (StgTSO *)MarkRoot((StgClosure *)sleeping_queue); + evac((StgClosure **)&sleeping_queue); } #endif for (m = main_threads; m != NULL; m = m->link) { - m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso); + evac((StgClosure **)&m->tso); + } + if (suspended_ccalling_threads != END_TSO_QUEUE) { + evac((StgClosure **)&suspended_ccalling_threads); } - if (suspended_ccalling_threads != END_TSO_QUEUE) - suspended_ccalling_threads = - (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads); -#if defined(SMP) || defined(PAR) || defined(GRAN) - markSparkQueue(); +#if defined(PAR) || defined(GRAN) + markSparkQueue(evac); #endif } @@ -2160,7 +2335,7 @@ static void GetRoots(void) This needs to be protected by the GC condition variable above. KH. 
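   (performGCWithRoots() below is that interface: the caller supplies
   an extra root-marking function of type evac_fn.)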
-------------------------------------------------------------------------- */ -void (*extra_roots)(void); +void (*extra_roots)(evac_fn); void performGC(void) @@ -2175,17 +2350,16 @@ performMajorGC(void) } static void -AllRoots(void) +AllRoots(evac_fn evac) { - GetRoots(); /* the scheduler's roots */ - extra_roots(); /* the user's roots */ + GetRoots(evac); // the scheduler's roots + extra_roots(evac); // the user's roots } void -performGCWithRoots(void (*get_roots)(void)) +performGCWithRoots(void (*get_roots)(evac_fn)) { extra_roots = get_roots; - GarbageCollect(AllRoots,rtsFalse); } @@ -2233,7 +2407,7 @@ threadStackOverflow(StgTSO *tso) IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size)); dest = (StgTSO *)allocate(new_tso_size); - TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0); + TICK_ALLOC_TSO(new_stack_size,0); /* copy the TSO block and the old stack into the new area */ memcpy(dest,tso,TSO_STRUCT_SIZE); @@ -2245,11 +2419,10 @@ threadStackOverflow(StgTSO *tso) diff = (P_)new_sp - (P_)tso->sp; /* In *words* */ dest->su = (StgUpdateFrame *) ((P_)dest->su + diff); dest->sp = new_sp; - //dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso); dest->stack_size = new_stack_size; /* and relocate the update frame list */ - relocate_TSO(tso, dest); + relocate_stack(dest, diff); /* Mark the old TSO as relocated. We have to check for relocated * TSOs in the garbage collector and any primops that deal with TSOs. @@ -2923,7 +3096,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception) } while (1) { - int words = ((P_)su - (P_)sp) - 1; + nat words = ((P_)su - (P_)sp) - 1; nat i; StgAP_UPD * ap; @@ -3467,7 +3640,7 @@ sched_belch(char *s, ...) va_list ap; va_start(ap,s); #ifdef SMP - fprintf(stderr, "scheduler (task %ld): ", pthread_self()); + fprintf(stderr, "scheduler (task %ld): ", osThreadId()); #elif defined(PAR) fprintf(stderr, "== "); #else @@ -3491,11 +3664,9 @@ sched_belch(char *s, ...) //* blocked_queue_tl:: @cindex\s-+blocked_queue_tl //* context_switch:: @cindex\s-+context_switch //* createThread:: @cindex\s-+createThread -//* free_capabilities:: @cindex\s-+free_capabilities //* gc_pending_cond:: @cindex\s-+gc_pending_cond //* initScheduler:: @cindex\s-+initScheduler //* interrupted:: @cindex\s-+interrupted -//* n_free_capabilities:: @cindex\s-+n_free_capabilities //* next_thread_id:: @cindex\s-+next_thread_id //* print_bq:: @cindex\s-+print_bq //* run_queue_hd:: @cindex\s-+run_queue_hd @@ -3503,7 +3674,6 @@ sched_belch(char *s, ...) //* sched_mutex:: @cindex\s-+sched_mutex //* schedule:: @cindex\s-+schedule //* take_off_run_queue:: @cindex\s-+take_off_run_queue -//* task_ids:: @cindex\s-+task_ids //* term_mutex:: @cindex\s-+term_mutex //* thread_ready_cond:: @cindex\s-+thread_ready_cond //@end index
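
A usage sketch for the new suspendThread()/resumeThread() pair above:
roughly the bracket a "safe" foreign-call wrapper places around an
external call. This is illustrative only; call_foo_safely and foo are
invented names, and only the two RTS entry points and their signatures
come from this patch.

/* Hedged sketch, assuming the RTS headers are in scope. */
extern int foo(int arg);         /* stands for any blocking external call */

static StgRegTable *
call_foo_safely( StgRegTable *reg, int arg, int *res )
{
    StgInt tok;

    tok  = suspendThread(reg);   /* park the TSO, release the capability,
                                    and (threaded RTS) hand off the lock */
    *res = foo(arg);             /* the external call; free to block */
    return resumeThread(tok);    /* queue up to re-enter the RTS, then
                                    reacquire our TSO and a capability */
}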