X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=b3d34463d39746b91d28eb5ae25787d3e68b4a8d;hb=b77098031265b55b28ddf6f3fe89c429c66cceb7;hp=d73559e204ad41c1a19c97434d87c3a117109b37;hpb=e289780ec23a1934f39d214d60705f8f26d1763d;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index d73559e..b3d3446 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,5 +1,5 @@ /* --------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.122 2002/02/13 08:48:06 sof Exp $ + * $Id: Schedule.c,v 1.128 2002/02/15 20:58:14 sof Exp $ * * (c) The GHC Team, 1998-2000 * @@ -266,68 +266,6 @@ static void sched_belch(char *s, ...); Mutex sched_mutex = INIT_MUTEX_VAR; Mutex term_mutex = INIT_MUTEX_VAR; - -/* - * When a native thread has completed executing an external - * call, it needs to communicate the result back to the - * (Haskell) thread that made the call. Do this as follows: - * - * - in resumeThread(), the thread increments the counter - * rts_n_returning_workers, and then blocks waiting on the - * condition returning_worker_cond. - * - upon entry to the scheduler, a worker/task checks - * rts_n_returning_workers. If it is > 0, worker threads - * are waiting to return, so it gives up its capability - * to let a worker deposit its result. - * - the worker thread that gave up its capability then tries - * to re-grab a capability and re-enter the Scheduler. - */ - - -/* thread_ready_cond: when signalled, a thread has become runnable for a - * task to execute. - * - * In the non-SMP case, it also implies that the thread that is woken up has - * exclusive access to the RTS and all its data structures (that are not - * under sched_mutex's control). - * - * thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold. - * - */ -Condition thread_ready_cond = INIT_COND_VAR; -#if 0 -/* For documentation purposes only */ -#define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE()) -#endif - -/* - * To be able to make an informed decision about whether or not - * to create a new task when making an external call, keep track of - * the number of tasks currently blocked waiting on thread_ready_cond. - * (if > 0 => no need for a new task, just unblock an existing one). - */ -nat rts_n_waiting_tasks = 0; - -/* returning_worker_cond: when a worker thread returns from executing an - * external call, it needs to wait for an RTS Capability before passing - * on the result of the call to the Haskell thread that made it. - * - * returning_worker_cond is signalled in Capability.releaseCapability(). - * - */ -Condition returning_worker_cond = INIT_COND_VAR; - -/* - * To avoid starvation of threads blocked on worker_thread_cond, - * the task(s) that enter the Scheduler will check to see whether - * there are one or more worker threads blocked waiting on - * returning_worker_cond. - * - * Locks needed: sched_mutex - */ -nat rts_n_waiting_workers = 0; - - # if defined(SMP) static Condition gc_pending_cond = INIT_COND_VAR; nat await_death; @@ -440,41 +378,19 @@ schedule( void ) # endif #endif rtsBool was_interrupted = rtsFalse; - -#if defined(RTS_SUPPORTS_THREADS) -schedule_start: -#endif -#if defined(RTS_SUPPORTS_THREADS) ACQUIRE_LOCK(&sched_mutex); -#endif #if defined(RTS_SUPPORTS_THREADS) - /* ToDo: consider SMP support */ - if ( rts_n_waiting_workers > 0 && noCapabilities() ) { - /* (At least) one native thread is waiting to - * deposit the result of an external call. So, - * be nice and hand over our capability. 
- */ - IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (waiting workers: %d)\n", osThreadId(), rts_n_waiting_workers)); - releaseCapability(cap); - RELEASE_LOCK(&sched_mutex); - - yieldThread(); - goto schedule_start; - } -#endif + /* Check to see whether there are any worker threads + waiting to deposit external call results. If so, + yield our capability */ + yieldToReturningWorker(&sched_mutex, cap); -#if defined(RTS_SUPPORTS_THREADS) - while ( noCapabilities() ) { - rts_n_waiting_tasks++; - waitCondition(&thread_ready_cond, &sched_mutex); - rts_n_waiting_tasks--; - } + waitForWorkCapability(&sched_mutex, &cap, rtsFalse); #endif #if defined(GRAN) - /* set up first event to get things going */ /* ToDo: assign costs for system setup and init MainTSO ! */ new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc], @@ -731,7 +647,6 @@ schedule_start: if ( EMPTY_RUN_QUEUE() ) { IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down.")); shutdownHaskellAndExit(0); - } #endif ASSERT( !EMPTY_RUN_QUEUE() ); @@ -759,18 +674,19 @@ schedule_start: if ( EMPTY_RUN_QUEUE() ) { /* Give up our capability */ releaseCapability(cap); - while ( noCapabilities() || EMPTY_RUN_QUEUE() ) { - IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId())); - rts_n_waiting_tasks++; - waitCondition( &thread_ready_cond, &sched_mutex ); - rts_n_waiting_tasks--; - IF_DEBUG(scheduler, sched_belch("thread %d: work now available %d %d", osThreadId(), getFreeCapabilities(),EMPTY_RUN_QUEUE())); + IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId())); + waitForWorkCapability(&sched_mutex, &cap, rtsTrue); + IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId())); +#if 0 + while ( EMPTY_RUN_QUEUE() ) { + waitForWorkCapability(&sched_mutex, &cap); + IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId())); } +#endif } #endif #if defined(GRAN) - if (RtsFlags.GranFlags.Light) GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc @@ -1016,7 +932,7 @@ schedule_start: belch("--=^ %d threads, %d sparks on [%#x]", run_queue_len(), spark_queue_len(pool), CURRENT_PROC)); -#if 1 +# if 1 if (0 && RtsFlags.ParFlags.ParStats.Full && t && LastTSO && t->id != LastTSO->id && LastTSO->why_blocked == NotBlocked && @@ -1041,7 +957,7 @@ schedule_start: emitSchedule = rtsFalse; } -#endif +# endif #else /* !GRAN && !PAR */ /* grab a thread from the run queue */ @@ -1415,12 +1331,11 @@ schedule_start: } #endif + if (ready_to_gc #ifdef SMP - if (ready_to_gc && allFreeCapabilities() ) -#else - if (ready_to_gc) + && allFreeCapabilities() #endif - { + ) { /* everybody back, start the GC. * Could do it in this thread, or signal a condition var * to do it in another thread. Either way, we need to @@ -1550,10 +1465,11 @@ suspendThread( StgRegTable *reg ) for one (i.e., if there's only one Concurrent Haskell thread alive, there's no need to create a new task). */ - IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS\n", tok)); + IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok)); startTask(taskStart); #endif + /* Other threads _might_ be available for execution; signal this */ THREAD_RUNNABLE(); RELEASE_LOCK(&sched_mutex); return tok; @@ -1566,21 +1482,10 @@ resumeThread( StgInt tok ) Capability *cap; #if defined(RTS_SUPPORTS_THREADS) - IF_DEBUG(scheduler, sched_belch("worker %d: returning, waiting for sched. 
lock.\n", tok)); - ACQUIRE_LOCK(&sched_mutex); - rts_n_waiting_workers++; - IF_DEBUG(scheduler, sched_belch("worker %d: returning; workers waiting: %d.\n", tok, rts_n_waiting_workers)); - - /* - * Wait for the go ahead - */ - IF_DEBUG(scheduler, sched_belch("worker %d: waiting for capability %d...\n", tok, rts_n_free_capabilities)); - while ( noCapabilities() ) { - waitCondition(&returning_worker_cond, &sched_mutex); - } - rts_n_waiting_workers--; - - IF_DEBUG(scheduler, sched_belch("worker %d: acquired capability...\n", tok)); + /* Wait for permission to re-enter the RTS with the result. */ + grabReturnCapability(&sched_mutex, &cap); +#else + grabCapability(&cap); #endif /* Remove the thread off of the suspended list */ @@ -1597,28 +1502,12 @@ resumeThread( StgInt tok ) barf("resumeThread: thread not found"); } tso->link = END_TSO_QUEUE; - -#if defined(RTS_SUPPORTS_THREADS) - /* Is it clever to block here with the TSO off the list, - * but not hooked up to a capability? - */ - while ( noCapabilities() ) { - IF_DEBUG(scheduler, sched_belch("waiting to resume")); - rts_n_waiting_tasks++; - waitCondition(&thread_ready_cond, &sched_mutex); - rts_n_waiting_tasks--; - IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id)); - } -#endif - - grabCapability(&cap); - RELEASE_LOCK(&sched_mutex); - /* Reset blocking status */ tso->why_blocked = NotBlocked; - cap->r.rCurrentTSO = tso; + RELEASE_LOCK(&sched_mutex); + cap->r.rCurrentTSO = tso; return &cap->r; } @@ -1923,10 +1812,13 @@ activateSpark (rtsSpark spark) * on this thread's stack before the scheduler is invoked. * ------------------------------------------------------------------------ */ +static void scheduleThread_ (StgTSO* tso, rtsBool createTask); + void scheduleThread_(StgTSO *tso -#if defined(THREADED_RTS) , rtsBool createTask +#if !defined(THREADED_RTS) + STG_UNUSED #endif ) { @@ -1956,11 +1848,12 @@ scheduleThread_(StgTSO *tso void scheduleThread(StgTSO* tso) { -#if defined(THREADED_RTS) + return scheduleThread_(tso, rtsFalse); +} + +void scheduleExtThread(StgTSO* tso) +{ return scheduleThread_(tso, rtsTrue); -#else - return scheduleThread_(tso); -#endif } /* --------------------------------------------------------------------------- @@ -2024,7 +1917,6 @@ initScheduler(void) initMutex(&term_mutex); initCondition(&thread_ready_cond); - initCondition(&returning_worker_cond); #endif #if defined(SMP) @@ -3181,51 +3073,39 @@ raiseAsync(StgTSO *tso, StgClosure *exception) StgAP_UPD * ap; /* If we find a CATCH_FRAME, and we've got an exception to raise, - * then build PAP(handler,exception,realworld#), and leave it on - * top of the stack ready to enter. + * then build the THUNK raise(exception), and leave it on + * top of the CATCH_FRAME ready to enter. */ if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) { StgCatchFrame *cf = (StgCatchFrame *)su; + StgClosure *raise; + /* we've got an exception to raise, so let's pass it to the * handler in this frame. */ - ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2); - TICK_ALLOC_UPD_PAP(3,0); - SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs); - - ap->n_args = 2; - ap->fun = cf->handler; /* :: Exception -> IO a */ - ap->payload[0] = exception; - ap->payload[1] = ARG_TAG(0); /* realworld token */ - - /* throw away the stack from Sp up to and including the - * CATCH_FRAME. - */ - sp = (P_)su + sizeofW(StgCatchFrame) - 1; - tso->su = cf->link; - - /* Restore the blocked/unblocked state for asynchronous exceptions - * at the CATCH_FRAME. 
- * - * If exceptions were unblocked at the catch, arrange that they - * are unblocked again after executing the handler by pushing an - * unblockAsyncExceptions_ret stack frame. + raise = (StgClosure *)allocate(sizeofW(StgClosure)+1); + TICK_ALLOC_SE_THK(1,0); + SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs); + raise->payload[0] = exception; + + /* throw away the stack from Sp up to the CATCH_FRAME. */ - if (!cf->exceptions_blocked) { - *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info; - } - - /* Ensure that async exceptions are blocked when running the handler. + sp = (P_)su - 1; + + /* Ensure that async excpetions are blocked now, so we don't get + * a surprise exception before we get around to executing the + * handler. */ if (tso->blocked_exceptions == NULL) { - tso->blocked_exceptions = END_TSO_QUEUE; + tso->blocked_exceptions = END_TSO_QUEUE; } - - /* Put the newly-built PAP on top of the stack, ready to execute + + /* Put the newly-built THUNK on top of the stack, ready to execute * when the thread restarts. */ - sp[0] = (W_)ap; + sp[0] = (W_)raise; tso->sp = sp; + tso->su = su; tso->what_next = ThreadEnterGHC; IF_DEBUG(sanity, checkTSO(tso)); return; @@ -3742,7 +3622,6 @@ sched_belch(char *s, ...) //@subsection Index //@index -//* MainRegTable:: @cindex\s-+MainRegTable //* StgMainThread:: @cindex\s-+StgMainThread //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd @@ -3760,5 +3639,4 @@ sched_belch(char *s, ...) //* schedule:: @cindex\s-+schedule //* take_off_run_queue:: @cindex\s-+take_off_run_queue //* term_mutex:: @cindex\s-+term_mutex -//* thread_ready_cond:: @cindex\s-+thread_ready_cond //@end index