X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=rts%2FCapability.c;h=ffaa372f98abe16f2eecc935c00f09140c73af24;hp=51a42ef468470d9dc750606b3a90c7c54a126794;hb=c004ec62b41aa2137b5b5e298ca562609b0de92e;hpb=0065d5ab628975892cea1ec7303f968c3338cbe1 diff --git a/rts/Capability.c b/rts/Capability.c index 51a42ef..ffaa372 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -25,6 +25,7 @@ #include "Capability.h" #include "Schedule.h" #include "Sparks.h" +#include "Trace.h" // one global capability, this is the Capability for non-threaded // builds, and for +RTS -N1 @@ -152,7 +153,8 @@ initCapability( Capability *cap, nat i ) cap->mut_lists[g] = NULL; } - cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE; + cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE; + cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE; cap->free_trec_chunks = END_STM_CHUNK_LIST; cap->free_trec_headers = NO_TREC; cap->transaction_tokens = 0; @@ -196,8 +198,7 @@ initCapabilities( void ) initCapability(&capabilities[i], i); } - IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", - n_capabilities)); + debugTrace(DEBUG_sched, "allocated %d capabilities", n_capabilities); #else /* !THREADED_RTS */ @@ -233,10 +234,10 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task) { ASSERT_LOCK_HELD(&cap->lock); ASSERT(task->cap == cap); - IF_DEBUG(scheduler, - sched_belch("passing capability %d to %s %p", - cap->no, task->tso ? "bound task" : "worker", - (void *)task->id)); + trace(TRACE_sched | DEBUG_sched, + "passing capability %d to %s %p", + cap->no, task->tso ? "bound task" : "worker", + (void *)task->id); ACQUIRE_LOCK(&task->lock); task->wakeup = rtsTrue; // the wakeup flag is needed because signalCondition() doesn't @@ -291,8 +292,8 @@ releaseCapability_ (Capability* cap) // are threads that need to be completed. If the system is // shutting down, we never create a new worker. if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) { - IF_DEBUG(scheduler, - sched_belch("starting new worker on capability %d", cap->no)); + debugTrace(DEBUG_sched, + "starting new worker on capability %d", cap->no); startWorkerTask(cap, workerStart); return; } @@ -310,7 +311,7 @@ releaseCapability_ (Capability* cap) } last_free_capability = cap; - IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no)); + trace(TRACE_sched | DEBUG_sched, "freeing capability %d", cap->no); } void @@ -396,8 +397,7 @@ waitForReturnCapability (Capability **pCap, Task *task) ACQUIRE_LOCK(&cap->lock); - IF_DEBUG(scheduler, - sched_belch("returning; I want capability %d", cap->no)); + debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no); if (!cap->running_task) { // It's free; just grab it @@ -435,8 +435,7 @@ waitForReturnCapability (Capability **pCap, Task *task) ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); - IF_DEBUG(scheduler, - sched_belch("returning; got capability %d", cap->no)); + trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no); *pCap = cap; #endif @@ -455,7 +454,7 @@ yieldCapability (Capability** pCap, Task *task) // The fast path has no locking, if we don't enter this while loop while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) { - IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no)); + debugTrace(DEBUG_sched, "giving up capability %d", cap->no); // We must now release the capability and wait to be woken up // again. @@ -470,10 +469,12 @@ yieldCapability (Capability** pCap, Task *task) task->wakeup = rtsFalse; RELEASE_LOCK(&task->lock); - IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no)); + debugTrace(DEBUG_sched, "woken up on capability %d", cap->no); + ACQUIRE_LOCK(&cap->lock); if (cap->running_task != NULL) { - IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no)); + debugTrace(DEBUG_sched, + "capability %d is owned by another task", cap->no); RELEASE_LOCK(&cap->lock); continue; } @@ -495,7 +496,7 @@ yieldCapability (Capability** pCap, Task *task) break; } - IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no)); + trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no); ASSERT(cap->running_task == task); } @@ -518,8 +519,10 @@ wakeupThreadOnCapability (Capability *cap, StgTSO *tso) { ASSERT(tso->cap == cap); ASSERT(tso->bound ? tso->bound->cap == cap : 1); + ASSERT_LOCK_HELD(&cap->lock); + + tso->cap = cap; - ACQUIRE_LOCK(&cap->lock); if (cap->running_task == NULL) { // nobody is running this Capability, we can add our thread // directly onto the run queue and start up a Task to run it. @@ -527,6 +530,7 @@ wakeupThreadOnCapability (Capability *cap, StgTSO *tso) // start it up cap->running_task = myTask(); // precond for releaseCapability_() + trace(TRACE_sched, "resuming capability %d", cap->no); releaseCapability_(cap); } else { appendToWakeupQueue(cap,tso); @@ -534,6 +538,33 @@ wakeupThreadOnCapability (Capability *cap, StgTSO *tso) // freed without first checking the wakeup queue (see // releaseCapability_). } +} + +void +wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso) +{ + ACQUIRE_LOCK(&cap->lock); + migrateThreadToCapability (cap, tso); + RELEASE_LOCK(&cap->lock); +} + +void +migrateThreadToCapability (Capability *cap, StgTSO *tso) +{ + // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability) + if (tso->bound) { + ASSERT(tso->bound->cap == tso->cap); + tso->bound->cap = cap; + } + tso->cap = cap; + wakeupThreadOnCapability(cap,tso); +} + +void +migrateThreadToCapability_lock (Capability *cap, StgTSO *tso) +{ + ACQUIRE_LOCK(&cap->lock); + migrateThreadToCapability (cap, tso); RELEASE_LOCK(&cap->lock); } @@ -557,6 +588,7 @@ prodCapabilities(rtsBool all) ACQUIRE_LOCK(&cap->lock); if (!cap->running_task) { if (cap->spare_workers) { + trace(TRACE_sched, "resuming capability %d", cap->no); task = cap->spare_workers; ASSERT(!task->stopped); giveCapabilityToTask(cap,task); @@ -607,7 +639,7 @@ prodOneCapability (void) * ------------------------------------------------------------------------- */ void -shutdownCapability (Capability *cap, Task *task) +shutdownCapability (Capability *cap, Task *task, rtsBool safe) { nat i; @@ -615,29 +647,83 @@ shutdownCapability (Capability *cap, Task *task) task->cap = cap; - for (i = 0; i < 50; i++) { - IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i)); + // Loop indefinitely until all the workers have exited and there + // are no Haskell threads left. We used to bail out after 50 + // iterations of this loop, but that occasionally left a worker + // running which caused problems later (the closeMutex() below + // isn't safe, for one thing). + + for (i = 0; /* i < 50 */; i++) { + debugTrace(DEBUG_sched, + "shutting down capability %d, attempt %d", cap->no, i); ACQUIRE_LOCK(&cap->lock); if (cap->running_task) { RELEASE_LOCK(&cap->lock); - IF_DEBUG(scheduler, sched_belch("not owner, yielding")); + debugTrace(DEBUG_sched, "not owner, yielding"); yieldThread(); continue; } cap->running_task = task; + + if (cap->spare_workers) { + // Look for workers that have died without removing + // themselves from the list; this could happen if the OS + // summarily killed the thread, for example. This + // actually happens on Windows when the system is + // terminating the program, and the RTS is running in a + // DLL. + Task *t, *prev; + prev = NULL; + for (t = cap->spare_workers; t != NULL; t = t->next) { + if (!osThreadIsAlive(t->id)) { + debugTrace(DEBUG_sched, + "worker thread %p has died unexpectedly", (void *)t->id); + if (!prev) { + cap->spare_workers = t->next; + } else { + prev->next = t->next; + } + prev = t; + } + } + } + if (!emptyRunQueue(cap) || cap->spare_workers) { - IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding")); + debugTrace(DEBUG_sched, + "runnable threads or workers still alive, yielding"); releaseCapability_(cap); // this will wake up a worker RELEASE_LOCK(&cap->lock); yieldThread(); continue; } - IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no)); + + // If "safe", then busy-wait for any threads currently doing + // foreign calls. If we're about to unload this DLL, for + // example, we need to be sure that there are no OS threads + // that will try to return to code that has been unloaded. + // We can be a bit more relaxed when this is a standalone + // program that is about to terminate, and let safe=false. + if (cap->suspended_ccalling_tasks && safe) { + debugTrace(DEBUG_sched, + "thread(s) are involved in foreign calls, yielding"); + cap->running_task = NULL; + RELEASE_LOCK(&cap->lock); + yieldThread(); + continue; + } + + debugTrace(DEBUG_sched, "capability %d is stopped.", cap->no); + freeCapability(cap); RELEASE_LOCK(&cap->lock); break; } // we now have the Capability, its run queue and spare workers // list are both empty. + + // ToDo: we can't drop this mutex, because there might still be + // threads performing foreign calls that will eventually try to + // return via resumeThread() and attempt to grab cap->lock. + // closeMutex(&cap->lock); } /* ---------------------------------------------------------------------------- @@ -665,4 +751,11 @@ tryGrabCapability (Capability *cap, Task *task) #endif /* THREADED_RTS */ +void +freeCapability (Capability *cap) { + stgFree(cap->mut_lists); +#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) + freeSparkPool(&cap->r.rSparks); +#endif +}