Don't traverse the entire list of threads on every GC (phase 1)
author    Simon Marlow <simonmarhaskell@gmail.com>
          Wed, 16 Apr 2008 23:44:20 +0000
committer Simon Marlow <simonmarhaskell@gmail.com>
          Wed, 16 Apr 2008 23:44:20 +0000
Instead of keeping a single list of all threads, keep one list per
step and look only at the threads belonging to the steps that we are
collecting.
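
As a minimal, self-contained sketch of the idea (hypothetical
simplified types, not the real RTS definitions, and NULL in place of
END_TSO_QUEUE): each step owns the head of a singly linked list of its
threads, and a collection of generations 0..n walks only the lists
belonging to the steps it is actually collecting, so threads sitting
in an old generation cost nothing during a minor GC.

    /* sketch: per-step thread lists (illustrative model only) */
    #include <stdio.h>
    #include <stdlib.h>

    typedef struct TSO_ {
        int          id;
        struct TSO_ *global_link;   /* next thread on the same list */
    } TSO;

    typedef struct step_ {
        TSO *threads;               /* threads in this step */
    } step;

    enum { N_GENS = 3, N_STEPS = 2 };
    static step steps[N_GENS][N_STEPS];

    /* new threads are born in generation 0, step 0 (cf. createThread) */
    static void new_thread(int id)
    {
        TSO *t = malloc(sizeof *t);
        t->id = id;
        t->global_link = steps[0][0].threads;
        steps[0][0].threads = t;
    }

    /* visit only the threads of generations 0..n: the point of the
       patch is that a minor GC (n == 0) never touches the rest */
    static void visit_collected(int n)
    {
        for (int g = 0; g <= n; g++)
            for (int s = 0; s < N_STEPS; s++)
                for (TSO *t = steps[g][s].threads; t; t = t->global_link)
                    printf("gen %d step %d: thread %d\n", g, s, t->id);
    }

    int main(void)
    {
        for (int i = 0; i < 4; i++) new_thread(i);
        visit_collected(0);         /* a "minor GC" of generation 0 */
        return 0;
    }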

includes/Storage.h
rts/Sanity.c
rts/Schedule.c
rts/Schedule.h
rts/Threads.c
rts/sm/Compact.c
rts/sm/GC.c
rts/sm/MarkWeak.c
rts/sm/Storage.c

diff --git a/includes/Storage.h b/includes/Storage.h
index 90e364c..c9cbd9c 100644
--- a/includes/Storage.h
+++ b/includes/Storage.h
@@ -69,6 +69,8 @@ typedef struct step_ {
     bdescr *             large_objects;         // large objects (doubly linked)
     unsigned int         n_large_blocks; // no. of blocks used by large objs
 
+    StgTSO *             threads;       // threads in this step
+                                        // linked via global_link
 
     // ------------------------------------
     // Fields below are used during GC only
@@ -100,6 +102,7 @@ typedef struct step_ {
 
     bdescr *     bitmap;               // bitmap for compacting collection
 
+    StgTSO *     old_threads;      // saved list of this step's threads
+                                   // at the start of GC; consumed as
+                                   // threads are proven alive during GC
 
 } step;
 
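The new pair of fields mirrors weak_ptr_list/old_weak_ptr_list: at the
start of a collection, each collected step's list is moved aside into
old_threads and an empty threads list is started; threads are then
re-linked onto a (possibly different) step's threads list as they are
proven alive. A sketch of the hand-off, again under hypothetical
simplified types with NULL in place of END_TSO_QUEUE:

    typedef struct TSO_ { struct TSO_ *global_link; } TSO;

    typedef struct step_ {
        TSO *threads;       /* live list, valid between collections */
        TSO *old_threads;   /* pre-GC list, consumed during GC      */
    } step;

    /* cf. init_collected_gen: stash the current list and start a
       fresh one; survivors get re-linked as they are discovered */
    static void begin_gc_for_step(step *stp)
    {
        stp->old_threads = stp->threads;
        stp->threads     = NULL;
    }
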
diff --git a/rts/Sanity.c b/rts/Sanity.c
index e90a573..b8bf5d4 100644
--- a/rts/Sanity.c
+++ b/rts/Sanity.c
@@ -781,13 +781,17 @@ checkThreadQsSanity (rtsBool check_TSO_too)
 void
 checkGlobalTSOList (rtsBool checkTSOs)
 {
-  extern  StgTSO *all_threads;
   StgTSO *tso;
-  for (tso=all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
-      ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso));
-      ASSERT(get_itbl(tso)->type == TSO);
-      if (checkTSOs)
-         checkTSO(tso);
+  nat s;
+
+  for (s = 0; s < total_steps; s++) {
+      for (tso=all_steps[s].threads; tso != END_TSO_QUEUE; 
+           tso = tso->global_link) {
+          ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso));
+          ASSERT(get_itbl(tso)->type == TSO);
+          if (checkTSOs)
+              checkTSO(tso);
+      }
   }
 }
 
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 04ab41c..915530f 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -118,12 +118,6 @@ StgTSO *blackhole_queue = NULL;
  */
 rtsBool blackholes_need_checking = rtsFalse;
 
-/* Linked list of all threads.
- * Used for detecting garbage collected threads.
- * LOCK: sched_mutex+capability, or all capabilities
- */
-StgTSO *all_threads = NULL;
-
 /* flag set by signal handler to precipitate a context switch
  * LOCK: none (just an advisory flag)
  */
@@ -1898,7 +1892,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
              // point where we can deal with this.  Leaving it on the run
              // queue also ensures that the garbage collector knows about
              // this thread and its return value (it gets dropped from the
-             // all_threads list so there's no other way to find it).
+             // step->threads list so there's no other way to find it).
              appendToRunQueue(cap,t);
              return rtsFalse;
 #else
@@ -2016,8 +2010,10 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
      */
     { 
        StgTSO *next;
+        nat s;
 
-       for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+        for (s = 0; s < total_steps; s++) {
+          for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
            if (t->what_next == ThreadRelocated) {
                next = t->_link;
            } else {
@@ -2052,6 +2048,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
                    }
                }
            }
+          }
        }
     }
     
@@ -2133,6 +2130,7 @@ forkProcess(HsStablePtr *entry
     pid_t pid;
     StgTSO* t,*next;
     Capability *cap;
+    nat s;
     
 #if defined(THREADED_RTS)
     if (RtsFlags.ParFlags.nNodes > 1) {
@@ -2180,7 +2178,8 @@ forkProcess(HsStablePtr *entry
        // all Tasks, because they correspond to OS threads that are
        // now gone.
 
-       for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+        for (s = 0; s < total_steps; s++) {
+          for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
            if (t->what_next == ThreadRelocated) {
                next = t->_link;
            } else {
@@ -2190,6 +2189,7 @@ forkProcess(HsStablePtr *entry
                // threads may be evaluating thunks that we need later.
                deleteThread_(cap,t);
            }
+          }
        }
        
        // Empty the run queue.  It seems tempting to let all the
@@ -2203,9 +2203,11 @@ forkProcess(HsStablePtr *entry
        // don't exist now:
        cap->suspended_ccalling_tasks = NULL;
 
-       // Empty the all_threads list.  Otherwise, the garbage
+       // Empty the threads lists.  Otherwise, the garbage
        // collector may attempt to resurrect some of these threads.
-       all_threads = END_TSO_QUEUE;
+        for (s = 0; s < total_steps; s++) {
+            all_steps[s].threads = END_TSO_QUEUE;
+        }
 
        // Wipe the task list, except the current Task.
        ACQUIRE_LOCK(&sched_mutex);
@@ -2255,14 +2257,18 @@ deleteAllThreads ( Capability *cap )
     // NOTE: only safe to call if we own all capabilities.
 
     StgTSO* t, *next;
+    nat s;
+
     debugTrace(DEBUG_sched,"deleting all threads");
-    for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+    for (s = 0; s < total_steps; s++) {
+      for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
        if (t->what_next == ThreadRelocated) {
            next = t->_link;
        } else {
            next = t->global_link;
            deleteThread(cap,t);
        }
+      }
     }      
 
     // The run queue now contains a bunch of ThreadKilled threads.  We
@@ -2572,7 +2578,6 @@ initScheduler(void)
 #endif
 
   blackhole_queue   = END_TSO_QUEUE;
-  all_threads       = END_TSO_QUEUE;
 
   context_switch = 0;
   sched_state    = SCHED_RUNNING;
@@ -3143,11 +3148,15 @@ resurrectThreads (StgTSO *threads)
 {
     StgTSO *tso, *next;
     Capability *cap;
+    step *step;
 
     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
        next = tso->global_link;
-       tso->global_link = all_threads;
-       all_threads = tso;
+
+        step = Bdescr((P_)tso)->step;
+       tso->global_link = step->threads;
+       step->threads = tso;
+
        debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
        
        // Wake up the thread on the Capability it was last on
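
resurrectThreads can no longer push onto one global list, so it
recovers the owning list from the thread pointer itself: Bdescr() maps
a heap address to its block descriptor, and the descriptor records the
step that owns the block. A sketch of the re-linking idiom, with an
explicit step pointer standing in for the real Bdescr((P_)tso)->step
lookup (hypothetical simplified types as before):

    typedef struct step_ { struct TSO_ *threads; } step;

    typedef struct TSO_ {
        step        *stp;          /* stands in for Bdescr((P_)tso)->step */
        struct TSO_ *global_link;
    } TSO;

    static void relink_onto_owning_step(TSO *tso)
    {
        step *stp = tso->stp;
        tso->global_link = stp->threads;
        stp->threads     = tso;
    }
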
diff --git a/rts/Schedule.h b/rts/Schedule.h
index 32b7e59..89ac112 100644
--- a/rts/Schedule.h
+++ b/rts/Schedule.h
@@ -133,11 +133,6 @@ extern  StgTSO *RTS_VAR(sleeping_queue);
 #endif
 #endif
 
-/* Linked list of all threads.
- * Locks required  : sched_mutex
- */
-extern  StgTSO *RTS_VAR(all_threads);
-
 /* Set to rtsTrue if there are threads on the blackhole_queue, and
  * it is possible that one or more of them may be available to run.
  * This flag is set to rtsFalse after we've checked the queue, and
diff --git a/rts/Threads.c b/rts/Threads.c
index efdf772..b7f62c8 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -145,8 +145,8 @@ createThread(Capability *cap, nat size)
      */
     ACQUIRE_LOCK(&sched_mutex);
     tso->id = next_thread_id++;  // while we have the mutex
-    tso->global_link = all_threads;
-    all_threads = tso;
+    tso->global_link = g0s0->threads;
+    g0s0->threads = tso;
     RELEASE_LOCK(&sched_mutex);
     
 #if defined(DIST)
@@ -771,7 +771,7 @@ void
 printAllThreads(void)
 {
   StgTSO *t, *next;
-  nat i;
+  nat i, s;
   Capability *cap;
 
 # if defined(GRAN)
@@ -799,7 +799,8 @@ printAllThreads(void)
   }
 
   debugBelch("other threads:\n");
-  for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+  for (s = 0; s < total_steps; s++) {
+    for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
       if (t->why_blocked != NotBlocked) {
          printThreadStatus(t);
       }
@@ -808,6 +809,7 @@ printAllThreads(void)
       } else {
          next = t->global_link;
       }
+    }
   }
 }
 
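createThread allocates the new TSO in generation 0, step 0, so a
thread starts life on g0s0->threads and moves to an older step's list
only when a collection copies it there (see the rts/sm/MarkWeak.c
changes below). In the model from the first sketch:

    new_thread(42);     /* lands on steps[0][0].threads; a later GC
                           may re-link it onto an older step's list */
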
diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c
index fa6efa9..c5f0c37 100644
--- a/rts/sm/Compact.c
+++ b/rts/sm/Compact.c
@@ -986,7 +986,9 @@ compact(StgClosure *static_objects)
     }
 
     // the global thread list
-    thread((void *)&all_threads);
+    for (s = 0; s < total_steps; s++) {
+        thread((void *)&all_steps[s].threads);
+    }
 
     // any threads resurrected during this GC
     thread((void *)&resurrected_threads);
diff --git a/rts/sm/GC.c b/rts/sm/GC.c
index f869ec4..3cb71fa 100644
--- a/rts/sm/GC.c
+++ b/rts/sm/GC.c
@@ -1078,14 +1078,19 @@ init_collected_gen (nat g, nat n_threads)
 
     for (s = 0; s < generations[g].n_steps; s++) {
 
+       stp = &generations[g].steps[s];
+       ASSERT(stp->gen_no == g);
+
+        // We'll construct a new list of threads in this step
+        // during GC, so throw away the current list.
+        stp->old_threads = stp->threads;
+        stp->threads = END_TSO_QUEUE;
+
        // generation 0, step 0 doesn't need to-space 
        if (g == 0 && s == 0 && RtsFlags.GcFlags.generations > 1) { 
            continue; 
        }
        
-       stp = &generations[g].steps[s];
-       ASSERT(stp->gen_no == g);
-
        // deprecate the existing blocks
        stp->old_blocks   = stp->blocks;
        stp->n_old_blocks = stp->n_blocks;
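
Note that the stash happens before the early continue: generation 0,
step 0 has no to-space when there is more than one generation, but its
thread list still has to be set aside, so the assertion and the
old_threads hand-off were hoisted above the g == 0 && s == 0 check:

    /* order matters: move the list aside first, then decide whether
       this step needs a to-space -- even g0s0 (which has none when
       generations > 1) owns threads that must be accounted for */
    stp->old_threads = stp->threads;
    stp->threads     = END_TSO_QUEUE;
    if (g == 0 && s == 0 && RtsFlags.GcFlags.generations > 1)
        continue;
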
diff --git a/rts/sm/MarkWeak.c b/rts/sm/MarkWeak.c
index ce88466..078919d 100644
--- a/rts/sm/MarkWeak.c
+++ b/rts/sm/MarkWeak.c
@@ -77,7 +77,6 @@ StgWeak *old_weak_ptr_list; // also pending finaliser list
 /* List of all threads during GC
  */
 StgTSO *resurrected_threads;
-static StgTSO *old_all_threads;
 
 void
 initWeakForGC(void)
@@ -85,12 +84,6 @@ initWeakForGC(void)
     old_weak_ptr_list = weak_ptr_list;
     weak_ptr_list = NULL;
     weak_stage = WeakPtrs;
-
-    /* The all_threads list is like the weak_ptr_list.  
-     * See traverseWeakPtrList() for the details.
-     */
-    old_all_threads = all_threads;
-    all_threads = END_TSO_QUEUE;
     resurrected_threads = END_TSO_QUEUE;
 }
 
@@ -185,53 +178,67 @@ traverseWeakPtrList(void)
        * the weak ptr list.  If we discover any threads that are about to
        * become garbage, we wake them up and administer an exception.
        */
-      {
-         StgTSO *t, *tmp, *next, **prev;
+     {
+          StgTSO *t, *tmp, *next, **prev;
+          nat g, s;
+          step *stp;
          
-         prev = &old_all_threads;
-         for (t = old_all_threads; t != END_TSO_QUEUE; t = next) {
-             
-             tmp = (StgTSO *)isAlive((StgClosure *)t);
+          // Traverse thread lists for generations we collected...
+          for (g = 0; g <= N; g++) {
+              for (s = 0; s < generations[g].n_steps; s++) {
+                  stp = &generations[g].steps[s];
+
+                  prev = &stp->old_threads;
+
+                  for (t = stp->old_threads; t != END_TSO_QUEUE; t = next) {
              
-             if (tmp != NULL) {
-                 t = tmp;
-             }
+                      tmp = (StgTSO *)isAlive((StgClosure *)t);
              
-             ASSERT(get_itbl(t)->type == TSO);
-             switch (t->what_next) {
-             case ThreadRelocated:
-                 next = t->_link;
-                 *prev = next;
-                 continue;
-             case ThreadKilled:
-             case ThreadComplete:
-                 // finshed or died.  The thread might still be alive, but we
-                 // don't keep it on the all_threads list.  Don't forget to
-                 // stub out its global_link field.
-                 next = t->global_link;
-                 t->global_link = END_TSO_QUEUE;
-                 *prev = next;
-                 continue;
-             default:
-                 ;
-             }
+                      if (tmp != NULL) {
+                          t = tmp;
+                      }
+
+                      ASSERT(get_itbl(t)->type == TSO);
+                      switch (t->what_next) {
+                      case ThreadRelocated:
+                          next = t->_link;
+                          *prev = next;
+                          continue;
+                      case ThreadKilled:
+                      case ThreadComplete:
+                          // finished or died.  The thread might still
+                          // be alive, but we don't keep it on the
+                          // step's threads list.  Don't forget to
+                          // stub out its global_link field.
+                          next = t->global_link;
+                          t->global_link = END_TSO_QUEUE;
+                          *prev = next;
+                          continue;
+                      default:
+                          ;
+                      }
              
-             if (tmp == NULL) {
-                 // not alive (yet): leave this thread on the
-                 // old_all_threads list.
-                 prev = &(t->global_link);
-                 next = t->global_link;
-             } 
-             else {
-                 // alive: move this thread onto the all_threads list.
-                 next = t->global_link;
-                 t->global_link = all_threads;
-                 all_threads  = t;
-                 *prev = next;
-             }
-         }
+                      if (tmp == NULL) {
+                          // not alive (yet): leave this thread on the
+                          // step's old_threads list.
+                          prev = &(t->global_link);
+                          next = t->global_link;
+                      } 
+                      else {
+                          step *new_step;
+                          // alive: move this thread onto the correct
+                          // threads list.
+                          next = t->global_link;
+                          new_step = Bdescr((P_)t)->step;
+                          t->global_link = new_step->threads;
+                          new_step->threads  = t;
+                          *prev = next;
+                      }
+                  }
+              }
+          }
       }
-      
+
       /* If we evacuated any threads, we need to go back to the scavenger.
        */
       if (flag) return rtsTrue;
@@ -239,14 +246,23 @@ traverseWeakPtrList(void)
       /* And resurrect any threads which were about to become garbage.
        */
       {
+          nat g, s;
+          step *stp;
          StgTSO *t, *tmp, *next;
-         for (t = old_all_threads; t != END_TSO_QUEUE; t = next) {
-             next = t->global_link;
-             tmp = t;
-             evacuate((StgClosure **)&tmp);
-             tmp->global_link = resurrected_threads;
-             resurrected_threads = tmp;
-         }
+
+          for (g = 0; g <= N; g++) {
+              for (s = 0; s < generations[g].n_steps; s++) {
+                  stp = &generations[g].steps[s];
+
+                  for (t = stp->old_threads; t != END_TSO_QUEUE; t = next) {
+                      next = t->global_link;
+                      tmp = t;
+                      evacuate((StgClosure **)&tmp);
+                      tmp->global_link = resurrected_threads;
+                      resurrected_threads = tmp;
+                  }
+              }
+          }
       }
       
       /* Finally, we can update the blackhole_queue.  This queue
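
The loop above gives threads the same tentative-liveness treatment as
weak pointers: a thread not yet proven alive stays on old_threads for
the next round of the fixpoint, a live one moves onto the threads list
of whichever step now holds it, and whatever remains when the fixpoint
settles is evacuated onto resurrected_threads. A compressed,
self-contained model of one round, with a plain int standing in for
the real isAlive() test and NULL for END_TSO_QUEUE:

    #include <stddef.h>

    typedef struct TSO_ {
        int          alive;          /* stands in for isAlive()  */
        struct TSO_ *global_link;
    } TSO;

    /* one pass: unlink live threads from *old_threads and push them
       onto *live; dead-so-far threads stay put for the next pass */
    static void partition_round(TSO **old_threads, TSO **live)
    {
        TSO **prev = old_threads;
        TSO  *next;
        for (TSO *t = *old_threads; t != NULL; t = next) {
            next = t->global_link;
            if (t->alive) {
                *prev = next;            /* unlink from old_threads */
                t->global_link = *live;  /* push onto the live list */
                *live = t;
            } else {
                prev = &t->global_link;  /* leave on old_threads    */
            }
        }
    }
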
diff --git a/rts/sm/Storage.c b/rts/sm/Storage.c
index bd321b3..db0299c 100644
--- a/rts/sm/Storage.c
+++ b/rts/sm/Storage.c
@@ -101,6 +101,8 @@ initStep (step *stp, int g, int s)
     initSpinLock(&stp->sync_todo);
     initSpinLock(&stp->sync_large_objects);
 #endif
+    stp->threads = END_TSO_QUEUE;
+    stp->old_threads = END_TSO_QUEUE;
 }
 
 void