From 200c73fdfea734765c48309cc8dcbcf44b69c8c5 Mon Sep 17 00:00:00 2001
From: Simon Marlow
Date: Wed, 16 Apr 2008 23:44:20 +0000
Subject: [PATCH] Don't traverse the entire list of threads on every GC (phase 1)

Instead of keeping a single list of all threads, keep one per step and
only look at the threads belonging to steps that we are collecting.
---
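Notes (below the "---" cut, so they are dropped when the patch is applied):

Each step now owns a singly-linked list of the TSOs that belong to it,
threaded through tso->global_link, plus an old_threads field used as
scratch space during GC; a collection then walks only the lists of the
steps it is actually collecting.  The diff iterates the lists via
all_steps/total_steps, which this patch assumes are already available.
The sketch below is a simplified model of that structure, not code from
the tree: the cut-down TSO/Step types and the helper names
registerThread and walkCollectedSteps are invented for illustration.

    /* Simplified model of the per-step thread lists (illustration only). */
    #include <stdio.h>

    typedef struct TSO_ {
        struct TSO_ *global_link;   /* links threads within one step's list */
        int          id;
    } TSO;

    typedef struct Step_ {
        TSO *threads;               /* threads living in this step */
        TSO *old_threads;           /* scratch list rebuilt during GC */
    } Step;

    #define END_TSO_QUEUE ((TSO *)NULL)   /* stand-in for the RTS sentinel */

    /* New threads are consed onto their step's list; the patch does this
     * in createThread(), always onto generation 0, step 0 (g0s0). */
    static void registerThread(Step *stp, TSO *tso)
    {
        tso->global_link = stp->threads;
        stp->threads = tso;
    }

    /* The payoff: walk only the steps being collected, instead of one
     * global all_threads list spanning every generation. */
    static void walkCollectedSteps(Step *steps, int n_collected)
    {
        int s;
        TSO *t;
        for (s = 0; s < n_collected; s++) {
            for (t = steps[s].threads; t != END_TSO_QUEUE; t = t->global_link) {
                printf("step %d: thread %d\n", s, t->id);
            }
        }
    }

    int main(void)
    {
        Step steps[2] = { { END_TSO_QUEUE, END_TSO_QUEUE },
                          { END_TSO_QUEUE, END_TSO_QUEUE } };
        TSO a = { END_TSO_QUEUE, 1 }, b = { END_TSO_QUEUE, 2 };
        registerThread(&steps[0], &a);
        registerThread(&steps[1], &b);
        walkCollectedSteps(steps, 1);   /* a minor GC visits step 0 only */
        return 0;
    }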
 includes/Storage.h |    3 ++
 rts/Sanity.c       |   16 ++++---
 rts/Schedule.c     |   39 ++++++++++------
 rts/Schedule.h     |    5 --
 rts/Threads.c      |   10 ++--
 rts/sm/Compact.c   |    4 +-
 rts/sm/GC.c        |   11 +++--
 rts/sm/MarkWeak.c  |  128 +++++++++++++++++++++++++++++-----------------------
 rts/sm/Storage.c   |    2 +
 9 files changed, 128 insertions(+), 90 deletions(-)

diff --git a/includes/Storage.h b/includes/Storage.h
index 90e364c..c9cbd9c 100644
--- a/includes/Storage.h
+++ b/includes/Storage.h
@@ -69,6 +69,8 @@ typedef struct step_ {
     bdescr *     large_objects;  // large objects (doubly linked)
     unsigned int n_large_blocks; // no. of blocks used by large objs
 
+    StgTSO *     threads;        // threads in this step
+                                 // linked via global_link
 
     // ------------------------------------
     // Fields below are used during GC only
@@ -100,6 +102,7 @@ typedef struct step_ {
 
     bdescr *     bitmap;         // bitmap for compacting collection
 
+    StgTSO *     old_threads;
 } step;
 
diff --git a/rts/Sanity.c b/rts/Sanity.c
index e90a573..b8bf5d4 100644
--- a/rts/Sanity.c
+++ b/rts/Sanity.c
@@ -781,13 +781,17 @@ checkThreadQsSanity (rtsBool check_TSO_too)
 void
 checkGlobalTSOList (rtsBool checkTSOs)
 {
-  extern  StgTSO *all_threads;
   StgTSO *tso;
-  for (tso=all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
-      ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso));
-      ASSERT(get_itbl(tso)->type == TSO);
-      if (checkTSOs)
-          checkTSO(tso);
+  nat s;
+
+  for (s = 0; s < total_steps; s++) {
+      for (tso=all_steps[s].threads; tso != END_TSO_QUEUE;
+           tso = tso->global_link) {
+          ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso));
+          ASSERT(get_itbl(tso)->type == TSO);
+          if (checkTSOs)
+              checkTSO(tso);
+      }
   }
 }
 
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 04ab41c..915530f 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -118,12 +118,6 @@ StgTSO *blackhole_queue = NULL;
  */
 rtsBool blackholes_need_checking = rtsFalse;
 
-/* Linked list of all threads.
- * Used for detecting garbage collected threads.
- * LOCK: sched_mutex+capability, or all capabilities
- */
-StgTSO *all_threads = NULL;
-
 /* flag set by signal handler to precipitate a context switch
  * LOCK: none (just an advisory flag)
  */
@@ -1898,7 +1892,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
       // point where we can deal with this.  Leaving it on the run
       // queue also ensures that the garbage collector knows about
       // this thread and its return value (it gets dropped from the
-      // all_threads list so there's no other way to find it).
+      // step->threads list so there's no other way to find it).
       appendToRunQueue(cap,t);
       return rtsFalse;
 #else
@@ -2016,8 +2010,10 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
      */
     {
         StgTSO *next;
+        nat s;
 
-        for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+        for (s = 0; s < total_steps; s++) {
+          for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
             if (t->what_next == ThreadRelocated) {
                 next = t->_link;
             } else {
@@ -2052,6 +2048,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
                 }
             }
         }
+        }
     }
 }
 
@@ -2133,6 +2130,7 @@ forkProcess(HsStablePtr *entry
     pid_t pid;
     StgTSO* t,*next;
     Capability *cap;
+    nat s;
 
 #if defined(THREADED_RTS)
     if (RtsFlags.ParFlags.nNodes > 1) {
@@ -2180,7 +2178,8 @@ forkProcess(HsStablePtr *entry
         // all Tasks, because they correspond to OS threads that are
         // now gone.
 
-        for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+        for (s = 0; s < total_steps; s++) {
+          for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
             if (t->what_next == ThreadRelocated) {
                 next = t->_link;
             } else {
@@ -2190,6 +2189,7 @@ forkProcess(HsStablePtr *entry
             // threads may be evaluating thunks that we need later.
             deleteThread_(cap,t);
           }
+          }
         }
 
         // Empty the run queue.  It seems tempting to let all the
@@ -2203,9 +2203,11 @@ forkProcess(HsStablePtr *entry
         // don't exist now:
         cap->suspended_ccalling_tasks = NULL;
 
-        // Empty the all_threads list.  Otherwise, the garbage
+        // Empty the threads lists.  Otherwise, the garbage
         // collector may attempt to resurrect some of these threads.
-        all_threads = END_TSO_QUEUE;
+        for (s = 0; s < total_steps; s++) {
+            all_steps[s].threads = END_TSO_QUEUE;
+        }
 
         // Wipe the task list, except the current Task.
         ACQUIRE_LOCK(&sched_mutex);
@@ -2255,14 +2257,18 @@ deleteAllThreads ( Capability *cap )
     // NOTE: only safe to call if we own all capabilities.
 
     StgTSO* t, *next;
+    nat s;
+
    debugTrace(DEBUG_sched,"deleting all threads");
-    for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+    for (s = 0; s < total_steps; s++) {
+      for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
        if (t->what_next == ThreadRelocated) {
            next = t->_link;
        } else {
            next = t->global_link;
            deleteThread(cap,t);
        }
+      }
     }
 
     // The run queue now contains a bunch of ThreadKilled threads.  We
@@ -2572,7 +2578,6 @@ initScheduler(void)
 #endif
 
     blackhole_queue   = END_TSO_QUEUE;
-    all_threads       = END_TSO_QUEUE;
 
     context_switch = 0;
     sched_state    = SCHED_RUNNING;
@@ -3143,11 +3148,15 @@ resurrectThreads (StgTSO *threads)
 {
     StgTSO *tso, *next;
     Capability *cap;
+    step *step;
 
     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
         next = tso->global_link;
-        tso->global_link = all_threads;
-        all_threads = tso;
+
+        step = Bdescr((P_)tso)->step;
+        tso->global_link = step->threads;
+        step->threads = tso;
+
         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
 
         // Wake up the thread on the Capability it was last on
diff --git a/rts/Schedule.h b/rts/Schedule.h
index 32b7e59..89ac112 100644
--- a/rts/Schedule.h
+++ b/rts/Schedule.h
@@ -133,11 +133,6 @@ extern  StgTSO *RTS_VAR(sleeping_queue);
 #endif
 #endif
 
-/* Linked list of all threads.
- * Locks required  : sched_mutex
- */
-extern  StgTSO *RTS_VAR(all_threads);
-
 /* Set to rtsTrue if there are threads on the blackhole_queue, and
  * it is possible that one or more of them may be available to run.
 * This flag is set to rtsFalse after we've checked the queue, and
diff --git a/rts/Threads.c b/rts/Threads.c
index efdf772..b7f62c8 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -145,8 +145,8 @@ createThread(Capability *cap, nat size)
      */
     ACQUIRE_LOCK(&sched_mutex);
     tso->id = next_thread_id++;  // while we have the mutex
-    tso->global_link = all_threads;
-    all_threads = tso;
+    tso->global_link = g0s0->threads;
+    g0s0->threads = tso;
     RELEASE_LOCK(&sched_mutex);
 
 #if defined(DIST)
@@ -771,7 +771,7 @@ void
 printAllThreads(void)
 {
   StgTSO *t, *next;
-  nat i;
+  nat i, s;
   Capability *cap;
 
 # if defined(GRAN)
@@ -799,7 +799,8 @@ printAllThreads(void)
   }
 
   debugBelch("other threads:\n");
-  for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+  for (s = 0; s < total_steps; s++) {
+    for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
       if (t->why_blocked != NotBlocked) {
           printThreadStatus(t);
       }
@@ -808,6 +809,7 @@ printAllThreads(void)
       } else {
           next = t->global_link;
       }
+    }
   }
 }
 
diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c
index fa6efa9..c5f0c37 100644
--- a/rts/sm/Compact.c
+++ b/rts/sm/Compact.c
@@ -986,7 +986,9 @@ compact(StgClosure *static_objects)
     }
 
     // the global thread list
-    thread((void *)&all_threads);
+    for (s = 0; s < total_steps; s++) {
+        thread((void *)&all_steps[s].threads);
+    }
 
     // any threads resurrected during this GC
     thread((void *)&resurrected_threads);
diff --git a/rts/sm/GC.c b/rts/sm/GC.c
index f869ec4..3cb71fa 100644
--- a/rts/sm/GC.c
+++ b/rts/sm/GC.c
@@ -1078,14 +1078,19 @@ init_collected_gen (nat g, nat n_threads)
 
     for (s = 0; s < generations[g].n_steps; s++) {
 
+        stp = &generations[g].steps[s];
+        ASSERT(stp->gen_no == g);
+
+        // we'll construct a new list of threads in this step
+        // during GC, so throw away the current list.
+        stp->old_threads = stp->threads;
+        stp->threads = END_TSO_QUEUE;
+
         // generation 0, step 0 doesn't need to-space
         if (g == 0 && s == 0 && RtsFlags.GcFlags.generations > 1) {
             continue;
         }
 
-        stp = &generations[g].steps[s];
-        ASSERT(stp->gen_no == g);
-
         // deprecate the existing blocks
         stp->old_blocks   = stp->blocks;
         stp->n_old_blocks = stp->n_blocks;
diff --git a/rts/sm/MarkWeak.c b/rts/sm/MarkWeak.c
index ce88466..078919d 100644
--- a/rts/sm/MarkWeak.c
+++ b/rts/sm/MarkWeak.c
@@ -77,7 +77,6 @@ StgWeak *old_weak_ptr_list; // also pending finaliser list
 
 /* List of all threads during GC */
 StgTSO *resurrected_threads;
-static StgTSO *old_all_threads;
 
 void
 initWeakForGC(void)
@@ -85,12 +84,6 @@ initWeakForGC(void)
     old_weak_ptr_list = weak_ptr_list;
     weak_ptr_list = NULL;
     weak_stage = WeakPtrs;
-
-    /* The all_threads list is like the weak_ptr_list.
-     * See traverseWeakPtrList() for the details.
-     */
-    old_all_threads = all_threads;
-    all_threads = END_TSO_QUEUE;
     resurrected_threads = END_TSO_QUEUE;
 }
 
@@ -185,53 +178,67 @@ traverseWeakPtrList(void)
      * the weak ptr list.  If we discover any threads that are about to
     * become garbage, we wake them up and administer an exception.
      */
-  {
-      StgTSO *t, *tmp, *next, **prev;
+    {
+        StgTSO *t, *tmp, *next, **prev;
+        nat g, s;
+        step *stp;
 
-      prev = &old_all_threads;
-      for (t = old_all_threads; t != END_TSO_QUEUE; t = next) {
-
-          tmp = (StgTSO *)isAlive((StgClosure *)t);
+        // Traverse thread lists for generations we collected...
+        for (g = 0; g <= N; g++) {
+            for (s = 0; s < generations[g].n_steps; s++) {
+                stp = &generations[g].steps[s];
+
+                prev = &stp->old_threads;
+
+                for (t = stp->old_threads; t != END_TSO_QUEUE; t = next) {
 
-          if (tmp != NULL) {
-              t = tmp;
-          }
+                    tmp = (StgTSO *)isAlive((StgClosure *)t);
 
-          ASSERT(get_itbl(t)->type == TSO);
-          switch (t->what_next) {
-          case ThreadRelocated:
-              next = t->_link;
-              *prev = next;
-              continue;
-          case ThreadKilled:
-          case ThreadComplete:
-              // finshed or died.  The thread might still be alive, but we
-              // don't keep it on the all_threads list.  Don't forget to
-              // stub out its global_link field.
-              next = t->global_link;
-              t->global_link = END_TSO_QUEUE;
-              *prev = next;
-              continue;
-          default:
-              ;
-          }
+                    if (tmp != NULL) {
+                        t = tmp;
+                    }
+
+                    ASSERT(get_itbl(t)->type == TSO);
+                    switch (t->what_next) {
+                    case ThreadRelocated:
+                        next = t->_link;
+                        *prev = next;
+                        continue;
+                    case ThreadKilled:
+                    case ThreadComplete:
+                        // finished or died.  The thread might still
+                        // be alive, but we don't keep it on the
+                        // step's threads list.  Don't forget to
+                        // stub out its global_link field.
+                        next = t->global_link;
+                        t->global_link = END_TSO_QUEUE;
+                        *prev = next;
+                        continue;
+                    default:
+                        ;
+                    }
 
-          if (tmp == NULL) {
-              // not alive (yet): leave this thread on the
-              // old_all_threads list.
-              prev = &(t->global_link);
-              next = t->global_link;
-          }
-          else {
-              // alive: move this thread onto the all_threads list.
-              next = t->global_link;
-              t->global_link = all_threads;
-              all_threads = t;
-              *prev = next;
-          }
-      }
+                    if (tmp == NULL) {
+                        // not alive (yet): leave this thread on the
+                        // old_threads list.
+                        prev = &(t->global_link);
+                        next = t->global_link;
+                    }
+                    else {
+                        step *new_step;
+                        // alive: move this thread onto the correct
+                        // threads list.
+                        next = t->global_link;
+                        new_step = Bdescr((P_)t)->step;
+                        t->global_link = new_step->threads;
+                        new_step->threads = t;
+                        *prev = next;
+                    }
+                }
+            }
+        }
     }
-    
+
     /* If we evacuated any threads, we need to go back to the scavenger.
      */
     if (flag) return rtsTrue;
 
@@ -239,14 +246,23 @@
     /* And resurrect any threads which were about to become garbage.
      */
     {
+        nat g, s;
+        step *stp;
         StgTSO *t, *tmp, *next;
-        for (t = old_all_threads; t != END_TSO_QUEUE; t = next) {
-            next = t->global_link;
-            tmp = t;
-            evacuate((StgClosure **)&tmp);
-            tmp->global_link = resurrected_threads;
-            resurrected_threads = tmp;
-        }
+
+        for (g = 0; g <= N; g++) {
+            for (s = 0; s < generations[g].n_steps; s++) {
+                stp = &generations[g].steps[s];
+
+                for (t = stp->old_threads; t != END_TSO_QUEUE; t = next) {
+                    next = t->global_link;
+                    tmp = t;
+                    evacuate((StgClosure **)&tmp);
+                    tmp->global_link = resurrected_threads;
+                    resurrected_threads = tmp;
+                }
+            }
+        }
     }
 
     /* Finally, we can update the blackhole_queue.  This queue
diff --git a/rts/sm/Storage.c b/rts/sm/Storage.c
index bd321b3..db0299c 100644
--- a/rts/sm/Storage.c
+++ b/rts/sm/Storage.c
@@ -101,6 +101,8 @@ initStep (step *stp, int g, int s)
     initSpinLock(&stp->sync_todo);
     initSpinLock(&stp->sync_large_objects);
 #endif
+    stp->threads = END_TSO_QUEUE;
+    stp->old_threads = END_TSO_QUEUE;
 }
 
 void
-- 
1.7.10.4
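A closing note on the GC handshake (commentary after the version
trailer, not part of the patch): init_collected_gen() parks each
collected step's list in old_threads and empties threads;
traverseWeakPtrList() then moves every thread found alive onto the
threads list of whichever step it now lives in, found via
Bdescr((P_)t)->step; whatever is still on old_threads after that is
unreachable, and the resurrect pass evacuates it onto
resurrected_threads instead.  Below is a minimal sketch of the two
moves, reusing the cut-down TSO/Step types from the note above the
diffstat; gcStartStep and moveLiveThread are invented names.

    /* Illustration only; mirrors init_collected_gen() in rts/sm/GC.c. */
    static void gcStartStep(Step *stp)
    {
        /* park the current list; it is rebuilt as live threads are found */
        stp->old_threads = stp->threads;
        stp->threads = END_TSO_QUEUE;
    }

    /* Illustration only; mirrors the live-thread case in
     * traverseWeakPtrList() in rts/sm/MarkWeak.c, where the destination
     * step comes from the thread's block descriptor after evacuation. */
    static void moveLiveThread(TSO *t, Step *new_step)
    {
        /* a live thread joins the list of the step it was moved to */
        t->global_link = new_step->threads;
        new_step->threads = t;
    }

Keeping old_threads per step, rather than one global old_all_threads,
is what lets the weak-pointer and resurrect passes loop over
generations 0..N only, instead of touching every thread in the system.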