X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FCapability.c;h=1d282f090218bb7f285025b7b54fdea3392bcd61;hb=3cdb0ada5aecbcbe940bacf577c02c41bc65c629;hp=51a42ef468470d9dc750606b3a90c7c54a126794;hpb=0065d5ab628975892cea1ec7303f968c3338cbe1;p=ghc-hetmet.git diff --git a/rts/Capability.c b/rts/Capability.c index 51a42ef..1d282f0 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -23,8 +23,10 @@ #include "STM.h" #include "OSThreads.h" #include "Capability.h" +#include "Storage.h" #include "Schedule.h" #include "Sparks.h" +#include "Trace.h" // one global capability, this is the Capability for non-threaded // builds, and for +RTS -N1 @@ -152,7 +154,8 @@ initCapability( Capability *cap, nat i ) cap->mut_lists[g] = NULL; } - cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE; + cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE; + cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE; cap->free_trec_chunks = END_STM_CHUNK_LIST; cap->free_trec_headers = NO_TREC; cap->transaction_tokens = 0; @@ -196,8 +199,7 @@ initCapabilities( void ) initCapability(&capabilities[i], i); } - IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", - n_capabilities)); + debugTrace(DEBUG_sched, "allocated %d capabilities", n_capabilities); #else /* !THREADED_RTS */ @@ -233,10 +235,10 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task) { ASSERT_LOCK_HELD(&cap->lock); ASSERT(task->cap == cap); - IF_DEBUG(scheduler, - sched_belch("passing capability %d to %s %p", - cap->no, task->tso ? "bound task" : "worker", - (void *)task->id)); + trace(TRACE_sched | DEBUG_sched, + "passing capability %d to %s %p", + cap->no, task->tso ? "bound task" : "worker", + (void *)task->id); ACQUIRE_LOCK(&task->lock); task->wakeup = rtsTrue; // the wakeup flag is needed because signalCondition() doesn't @@ -291,8 +293,8 @@ releaseCapability_ (Capability* cap) // are threads that need to be completed. If the system is // shutting down, we never create a new worker. if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) { - IF_DEBUG(scheduler, - sched_belch("starting new worker on capability %d", cap->no)); + debugTrace(DEBUG_sched, + "starting new worker on capability %d", cap->no); startWorkerTask(cap, workerStart); return; } @@ -310,7 +312,7 @@ releaseCapability_ (Capability* cap) } last_free_capability = cap; - IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no)); + trace(TRACE_sched | DEBUG_sched, "freeing capability %d", cap->no); } void @@ -396,8 +398,7 @@ waitForReturnCapability (Capability **pCap, Task *task) ACQUIRE_LOCK(&cap->lock); - IF_DEBUG(scheduler, - sched_belch("returning; I want capability %d", cap->no)); + debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no); if (!cap->running_task) { // It's free; just grab it @@ -435,8 +436,7 @@ waitForReturnCapability (Capability **pCap, Task *task) ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); - IF_DEBUG(scheduler, - sched_belch("returning; got capability %d", cap->no)); + trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no); *pCap = cap; #endif @@ -455,7 +455,7 @@ yieldCapability (Capability** pCap, Task *task) // The fast path has no locking, if we don't enter this while loop while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) { - IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no)); + debugTrace(DEBUG_sched, "giving up capability %d", cap->no); // We must now release the capability and wait to be woken up // again. @@ -470,10 +470,12 @@ yieldCapability (Capability** pCap, Task *task) task->wakeup = rtsFalse; RELEASE_LOCK(&task->lock); - IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no)); + debugTrace(DEBUG_sched, "woken up on capability %d", cap->no); + ACQUIRE_LOCK(&cap->lock); if (cap->running_task != NULL) { - IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no)); + debugTrace(DEBUG_sched, + "capability %d is owned by another task", cap->no); RELEASE_LOCK(&cap->lock); continue; } @@ -495,7 +497,7 @@ yieldCapability (Capability** pCap, Task *task) break; } - IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no)); + trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no); ASSERT(cap->running_task == task); } @@ -518,8 +520,10 @@ wakeupThreadOnCapability (Capability *cap, StgTSO *tso) { ASSERT(tso->cap == cap); ASSERT(tso->bound ? tso->bound->cap == cap : 1); + ASSERT_LOCK_HELD(&cap->lock); + + tso->cap = cap; - ACQUIRE_LOCK(&cap->lock); if (cap->running_task == NULL) { // nobody is running this Capability, we can add our thread // directly onto the run queue and start up a Task to run it. @@ -527,6 +531,7 @@ wakeupThreadOnCapability (Capability *cap, StgTSO *tso) // start it up cap->running_task = myTask(); // precond for releaseCapability_() + trace(TRACE_sched, "resuming capability %d", cap->no); releaseCapability_(cap); } else { appendToWakeupQueue(cap,tso); @@ -534,6 +539,33 @@ wakeupThreadOnCapability (Capability *cap, StgTSO *tso) // freed without first checking the wakeup queue (see // releaseCapability_). } +} + +void +wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso) +{ + ACQUIRE_LOCK(&cap->lock); + migrateThreadToCapability (cap, tso); + RELEASE_LOCK(&cap->lock); +} + +void +migrateThreadToCapability (Capability *cap, StgTSO *tso) +{ + // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability) + if (tso->bound) { + ASSERT(tso->bound->cap == tso->cap); + tso->bound->cap = cap; + } + tso->cap = cap; + wakeupThreadOnCapability(cap,tso); +} + +void +migrateThreadToCapability_lock (Capability *cap, StgTSO *tso) +{ + ACQUIRE_LOCK(&cap->lock); + migrateThreadToCapability (cap, tso); RELEASE_LOCK(&cap->lock); } @@ -557,6 +589,7 @@ prodCapabilities(rtsBool all) ACQUIRE_LOCK(&cap->lock); if (!cap->running_task) { if (cap->spare_workers) { + trace(TRACE_sched, "resuming capability %d", cap->no); task = cap->spare_workers; ASSERT(!task->stopped); giveCapabilityToTask(cap,task); @@ -615,29 +648,42 @@ shutdownCapability (Capability *cap, Task *task) task->cap = cap; - for (i = 0; i < 50; i++) { - IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i)); + // Loop indefinitely until all the workers have exited and there + // are no Haskell threads left. We used to bail out after 50 + // iterations of this loop, but that occasionally left a worker + // running which caused problems later (the closeMutex() below + // isn't safe, for one thing). + + for (i = 0; /* i < 50 */; i++) { + debugTrace(DEBUG_sched, + "shutting down capability %d, attempt %d", cap->no, i); ACQUIRE_LOCK(&cap->lock); if (cap->running_task) { RELEASE_LOCK(&cap->lock); - IF_DEBUG(scheduler, sched_belch("not owner, yielding")); + debugTrace(DEBUG_sched, "not owner, yielding"); yieldThread(); continue; } cap->running_task = task; if (!emptyRunQueue(cap) || cap->spare_workers) { - IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding")); + debugTrace(DEBUG_sched, + "runnable threads or workers still alive, yielding"); releaseCapability_(cap); // this will wake up a worker RELEASE_LOCK(&cap->lock); yieldThread(); continue; } - IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no)); + debugTrace(DEBUG_sched, "capability %d is stopped.", cap->no); RELEASE_LOCK(&cap->lock); break; } // we now have the Capability, its run queue and spare workers // list are both empty. + + // ToDo: we can't drop this mutex, because there might still be + // threads performing foreign calls that will eventually try to + // return via resumeThread() and attempt to grab cap->lock. + // closeMutex(&cap->lock); } /* ----------------------------------------------------------------------------