X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FCapability.c;h=444532480d1148e83b56a3ba784a3a0476893a29;hb=27de38efce6d73d2a0209f803cfa98c82773e773;hp=7fc1c577c4a170e2bf6ca6f774e0e55c25be574e;hpb=ac548e9fd9014cc372dcab31eb4c0392ee80ed4e;p=ghc-hetmet.git diff --git a/rts/Capability.c b/rts/Capability.c index 7fc1c57..4445324 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -153,7 +153,8 @@ initCapability( Capability *cap, nat i ) cap->mut_lists[g] = NULL; } - cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE; + cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE; + cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE; cap->free_trec_chunks = END_STM_CHUNK_LIST; cap->free_trec_headers = NO_TREC; cap->transaction_tokens = 0; @@ -638,7 +639,7 @@ prodOneCapability (void) * ------------------------------------------------------------------------- */ void -shutdownCapability (Capability *cap, Task *task) +shutdownCapability (Capability *cap, Task *task, rtsBool safe) { nat i; @@ -663,6 +664,30 @@ shutdownCapability (Capability *cap, Task *task) continue; } cap->running_task = task; + + if (cap->spare_workers) { + // Look for workers that have died without removing + // themselves from the list; this could happen if the OS + // summarily killed the thread, for example. This + // actually happens on Windows when the system is + // terminating the program, and the RTS is running in a + // DLL. + Task *t, *prev; + prev = NULL; + for (t = cap->spare_workers; t != NULL; t = t->next) { + if (!osThreadIsAlive(t->id)) { + debugTrace(DEBUG_sched, + "worker thread %p has died unexpectedly", (void *)t->id); + if (!prev) { + cap->spare_workers = t->next; + } else { + prev->next = t->next; + } + prev = t; + } + } + } + if (!emptyRunQueue(cap) || cap->spare_workers) { debugTrace(DEBUG_sched, "runnable threads or workers still alive, yielding"); @@ -671,15 +696,34 @@ shutdownCapability (Capability *cap, Task *task) yieldThread(); continue; } + + // If "safe", then busy-wait for any threads currently doing + // foreign calls. If we're about to unload this DLL, for + // example, we need to be sure that there are no OS threads + // that will try to return to code that has been unloaded. + // We can be a bit more relaxed when this is a standalone + // program that is about to terminate, and let safe=false. + if (cap->suspended_ccalling_tasks && safe) { + debugTrace(DEBUG_sched, + "thread(s) are involved in foreign calls, yielding"); + cap->running_task = NULL; + RELEASE_LOCK(&cap->lock); + yieldThread(); + continue; + } + debugTrace(DEBUG_sched, "capability %d is stopped.", cap->no); + freeCapability(cap); RELEASE_LOCK(&cap->lock); break; } // we now have the Capability, its run queue and spare workers // list are both empty. - // We end up here only in THREADED_RTS - closeMutex(&cap->lock); + // ToDo: we can't drop this mutex, because there might still be + // threads performing foreign calls that will eventually try to + // return via resumeThread() and attempt to grab cap->lock. + // closeMutex(&cap->lock); } /* ---------------------------------------------------------------------------- @@ -707,4 +751,75 @@ tryGrabCapability (Capability *cap, Task *task) #endif /* THREADED_RTS */ +void +freeCapability (Capability *cap) { + stgFree(cap->mut_lists); +#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) + freeSparkPool(&cap->r.rSparks); +#endif +} +/* --------------------------------------------------------------------------- + Mark everything directly reachable from the Capabilities. When + using multiple GC threads, each GC thread marks all Capabilities + for which (c `mod` n == 0), for Capability c and thread n. + ------------------------------------------------------------------------ */ + +void +markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta) +{ + nat i; + Capability *cap; + Task *task; + + // Each GC thread is responsible for following roots from the + // Capability of the same number. There will usually be the same + // or fewer Capabilities as GC threads, but just in case there + // are more, we mark every Capability whose number is the GC + // thread's index plus a multiple of the number of GC threads. + for (i = i0; i < n_capabilities; i += delta) { + cap = &capabilities[i]; + evac(user, (StgClosure **)(void *)&cap->run_queue_hd); + evac(user, (StgClosure **)(void *)&cap->run_queue_tl); +#if defined(THREADED_RTS) + evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd); + evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl); +#endif + for (task = cap->suspended_ccalling_tasks; task != NULL; + task=task->next) { + debugTrace(DEBUG_sched, + "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id); + evac(user, (StgClosure **)(void *)&task->suspended_tso); + } + +#if defined(THREADED_RTS) + markSparkQueue (evac, user, cap); +#endif + } + +#if !defined(THREADED_RTS) + evac(user, (StgClosure **)(void *)&blocked_queue_hd); + evac(user, (StgClosure **)(void *)&blocked_queue_tl); + evac(user, (StgClosure **)(void *)&sleeping_queue); +#endif +} + +// This function is used by the compacting GC to thread all the +// pointers from spark queues. +void +traverseSparkQueues (evac_fn evac USED_IF_THREADS, void *user USED_IF_THREADS) +{ +#if defined(THREADED_RTS) + nat i; + for (i = 0; i < n_capabilities; i++) { + traverseSparkQueue (evac, user, &capabilities[i]); + } +#endif // THREADED_RTS + +} + +void +markCapabilities (evac_fn evac, void *user) +{ + markSomeCapabilities(evac, user, 0, 1); +}