[project @ 2005-04-19 13:39:41 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
index 6cac6c1..a5f133e 100644 (file)
@@ -177,11 +177,6 @@ int context_switch = 0;
 /* if this flag is set as well, give up execution */
 rtsBool interrupted = rtsFalse;
 
-/* If this flag is set, we are running Haskell code.  Used to detect
- * uses of 'foreign import unsafe' that should be 'safe'.
- */
-static rtsBool in_haskell = rtsFalse;
-
 /* Next thread ID to allocate.
  * Locks required: thread_id_mutex
  */
@@ -217,12 +212,6 @@ StgTSO *CurrentTSO;
  */
 StgTSO dummy_tso;
 
-# if defined(SMP)
-static Condition gc_pending_cond = INIT_COND_VAR;
-# endif
-
-static rtsBool ready_to_gc;
-
 /*
  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
  * in an MT setting, needed to signal that a worker thread shouldn't hang around
@@ -295,8 +284,8 @@ static rtsBool scheduleHandleYield( StgTSO *t, nat prev_what_next );
 static void scheduleHandleThreadBlocked( StgTSO *t );
 static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread, 
                                             Capability *cap, StgTSO *t );
-static void scheduleDoHeapProfile(void);
-static void scheduleDoGC(void);
+static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
+static void scheduleDoGC(Capability *cap);
 
 static void unblockThread(StgTSO *tso);
 static rtsBool checkBlackHoles(void);
@@ -324,13 +313,13 @@ StgTSO * activateSpark (rtsSpark spark);
  * ------------------------------------------------------------------------- */
 
 #if defined(RTS_SUPPORTS_THREADS)
-static rtsBool startingWorkerThread = rtsFalse;
+static nat startingWorkerThread = 0;
 
 static void
 taskStart(void)
 {
   ACQUIRE_LOCK(&sched_mutex);
-  startingWorkerThread = rtsFalse;
+  startingWorkerThread--;
   schedule(NULL,NULL);
   taskStop();
   RELEASE_LOCK(&sched_mutex);
@@ -341,14 +330,14 @@ startSchedulerTaskIfNecessary(void)
 {
     if ( !EMPTY_RUN_QUEUE()
         && !shutting_down_scheduler // not if we're shutting down
-        && !startingWorkerThread )
+        && startingWorkerThread==0)
     {
        // we don't want to start another worker thread
        // just because the last one hasn't yet reached the
        // "waiting for capability" state
-       startingWorkerThread = rtsTrue;
+       startingWorkerThread++;
        if (!maybeStartNewWorker(taskStart)) {
-           startingWorkerThread = rtsFalse;
+           startingWorkerThread--;
        }
     }
 }
@@ -429,6 +418,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
 # endif
 #endif
   nat prev_what_next;
+  rtsBool ready_to_gc;
   
   // Pre-condition: sched_mutex is held.
   // We might have a capability, passed in as initialCapability.
@@ -467,19 +457,6 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
 
       IF_DEBUG(scheduler, printAllThreads());
 
-#if defined(SMP)
-      // 
-      // Wait until GC has completed, if necessary.
-      //
-      if (ready_to_gc) {
-         if (cap != NULL) {
-             releaseCapability(cap);
-             IF_DEBUG(scheduler,sched_belch("waiting for GC"));
-             waitCondition( &gc_pending_cond, &sched_mutex );
-         }
-      }
-#endif
-
 #if defined(RTS_SUPPORTS_THREADS)
       // Yield the capability to higher-priority tasks if necessary.
       //
@@ -500,7 +477,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
     // Check whether we have re-entered the RTS from Haskell without
     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
     // call).
-    if (in_haskell) {
+    if (cap->r.rInHaskell) {
          errorBelch("schedule: re-entered unsafely.\n"
                     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
          stg_exit(1);
@@ -689,7 +666,7 @@ run_thread:
     prev_what_next = t->what_next;
 
     errno = t->saved_errno;
-    in_haskell = rtsTrue;
+    cap->r.rInHaskell = rtsTrue;
 
     switch (prev_what_next) {
 
@@ -717,7 +694,7 @@ run_thread:
        blackholes_need_checking = rtsTrue;
     }
 
-    in_haskell = rtsFalse;
+    cap->r.rInHaskell = rtsFalse;
 
     // The TSO might have moved, eg. if it re-entered the RTS and a GC
     // happened.  So find the new location:
@@ -744,6 +721,8 @@ run_thread:
     
     schedulePostRunThread();
 
+    ready_to_gc = rtsFalse;
+
     switch (ret) {
     case HeapOverflow:
        ready_to_gc = scheduleHandleHeapOverflow(cap,t);
@@ -773,8 +752,8 @@ run_thread:
       barf("schedule: invalid thread return code %d", (int)ret);
     }
 
-    scheduleDoHeapProfile();
-    scheduleDoGC();
+    if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
+    if (ready_to_gc) { scheduleDoGC(cap); }
   } /* end of while() */
 
   IF_PAR_DEBUG(verbose,
@@ -816,7 +795,7 @@ schedulePreLoop(void)
 static void
 scheduleStartSignalHandlers(void)
 {
-#if defined(RTS_USER_SIGNALS)
+#if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
     if (signals_pending()) {
       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
       startSignalHandlers();
@@ -921,6 +900,7 @@ scheduleDetectDeadlock(void)
            StgMainThread *m;
            m = main_threads;
            switch (m->tso->why_blocked) {
+           case BlockedOnSTM:
            case BlockedOnBlackHole:
            case BlockedOnException:
            case BlockedOnMVar:
@@ -1482,9 +1462,12 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
            if (cap->r.rCurrentNursery->u.back != NULL) {
                cap->r.rCurrentNursery->u.back->link = bd;
            } else {
+#if !defined(SMP)
                ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
-                      g0s0->blocks == cap->r.rNursery);
-               cap->r.rNursery = g0s0->blocks = bd;
+                      g0s0 == cap->r.rNursery);
+               g0s0->blocks = bd;
+#endif
+               cap->r.rNursery->blocks = bd;
            }             
            cap->r.rCurrentNursery->u.back = bd;
            
@@ -1504,11 +1487,14 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
                }
            }
            
+#if !defined(SMP)
            // don't forget to update the block count in g0s0.
            g0s0->n_blocks += blocks;
+
            // This assert can be a killer if the app is doing lots
            // of large block allocations.
            ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
+#endif
            
            // now update the nursery to point to the new block
            cap->r.rCurrentNursery = bd;
@@ -1847,10 +1833,10 @@ scheduleHandleThreadFinished( StgMainThread *mainThread
  * Perform a heap census, if PROFILING
  * -------------------------------------------------------------------------- */
 
-static void
-scheduleDoHeapProfile(void)
+static rtsBool
+scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
 {
-#ifdef PROFILING
+#if defined(PROFILING)
     // When we have +RTS -i0 and we're heap profiling, do a census at
     // every GC.  This lets us get repeatable runs for debugging.
     if (performHeapProfile ||
@@ -1859,9 +1845,10 @@ scheduleDoHeapProfile(void)
        GarbageCollect(GetRoots, rtsTrue);
        heapCensus();
        performHeapProfile = rtsFalse;
-       ready_to_gc = rtsFalse; // we already GC'd
+       return rtsTrue;  // true <=> we already GC'd
     }
 #endif
+    return rtsFalse;
 }
 
 /* -----------------------------------------------------------------------------
@@ -1870,66 +1857,97 @@ scheduleDoHeapProfile(void)
  * -------------------------------------------------------------------------- */
 
 static void
-scheduleDoGC(void)
+scheduleDoGC( Capability *cap STG_UNUSED )
 {
     StgTSO *t;
+#ifdef SMP
+    static rtsBool waiting_for_gc;
+    int n_capabilities = RtsFlags.ParFlags.nNodes - 1; 
+           // subtract one because we're already holding one.
+    Capability *caps[n_capabilities];
+#endif
 
 #ifdef SMP
-    // The last task to stop actually gets to do the GC.  The rest
-    // of the tasks release their capabilities and wait gc_pending_cond.
-    if (ready_to_gc && allFreeCapabilities())
-#else
-    if (ready_to_gc)
+    // In order to GC, there must be no threads running Haskell code.
+    // Therefore, the GC thread needs to hold *all* the capabilities,
+    // and release them after the GC has completed.  
+    //
+    // This seems to be the simplest way: previous attempts involved
+    // making all the threads with capabilities give up their
+    // capabilities and sleep except for the *last* one, which
+    // actually did the GC.  But it's quite hard to arrange for all
+    // the other tasks to sleep and stay asleep.
+    //
+       
+    // Someone else is already trying to GC
+    if (waiting_for_gc) return;
+    waiting_for_gc = rtsTrue;
+
+    caps[n_capabilities] = cap;
+    while (n_capabilities > 0) {
+       IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
+       waitForReturnCapability(&sched_mutex, &cap);
+       n_capabilities--;
+       caps[n_capabilities] = cap;
+    }
+
+    waiting_for_gc = rtsFalse;
 #endif
-    {
-       /* Kick any transactions which are invalid back to their
-        * atomically frames.  When next scheduled they will try to
-        * commit, this commit will fail and they will retry.
-        */
-       for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
-           if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
-               if (!stmValidateTransaction (t -> trec)) {
-                   IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
-                   
-                   // strip the stack back to the ATOMICALLY_FRAME, aborting
-                   // the (nested) transaction, and saving the stack of any
-                   // partially-evaluated thunks on the heap.
-                   raiseAsync_(t, NULL, rtsTrue);
-                   
+
+    /* Kick any transactions which are invalid back to their
+     * atomically frames.  When next scheduled they will try to
+     * commit, this commit will fail and they will retry.
+     */
+    for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
+       if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+           if (!stmValidateTransaction (t -> trec)) {
+               IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
+               
+               // strip the stack back to the ATOMICALLY_FRAME, aborting
+               // the (nested) transaction, and saving the stack of any
+               // partially-evaluated thunks on the heap.
+               raiseAsync_(t, NULL, rtsTrue);
+               
 #ifdef REG_R1
-                   ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+               ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
 #endif
-               }
            }
        }
-
-       // so this happens periodically:
-       scheduleCheckBlackHoles();
-
-       /* everybody back, start the GC.
-        * Could do it in this thread, or signal a condition var
-        * to do it in another thread.  Either way, we need to
-        * broadcast on gc_pending_cond afterward.
-        */
+    }
+    
+    // so this happens periodically:
+    scheduleCheckBlackHoles();
+    
+    /* everybody back, start the GC.
+     * Could do it in this thread, or signal a condition var
+     * to do it in another thread.  Either way, we need to
+     * broadcast on gc_pending_cond afterward.
+     */
 #if defined(RTS_SUPPORTS_THREADS)
-       IF_DEBUG(scheduler,sched_belch("doing GC"));
+    IF_DEBUG(scheduler,sched_belch("doing GC"));
 #endif
-       GarbageCollect(GetRoots,rtsFalse);
-       ready_to_gc = rtsFalse;
+    GarbageCollect(GetRoots,rtsFalse);
+    
 #if defined(SMP)
-       broadcastCondition(&gc_pending_cond);
+    {
+       // release our stash of capabilities.
+       nat i;
+       for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) {
+           releaseCapability(caps[i]);
+       }
+    }
 #endif
+
 #if defined(GRAN)
-       /* add a ContinueThread event to continue execution of current thread */
-       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
-                 ContinueThread,
-                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
-       IF_GRAN_DEBUG(bq, 
-                     debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
-                     G_EVENTQ(0);
-                     G_CURR_THREADQ(0));
+    /* add a ContinueThread event to continue execution of current thread */
+    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
+             ContinueThread,
+             t, (StgClosure*)NULL, (rtsSpark*)NULL);
+    IF_GRAN_DEBUG(bq, 
+                 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
+                 G_EVENTQ(0);
+                 G_CURR_THREADQ(0));
 #endif /* GRAN */
-    }
 }
 
 /* ---------------------------------------------------------------------------
@@ -2116,6 +2134,7 @@ suspendThread( StgRegTable *reg )
   tok = cap->r.rCurrentTSO->id;
 
   /* Hand back capability */
+  cap->r.rInHaskell = rtsFalse;
   releaseCapability(cap);
   
 #if defined(RTS_SUPPORTS_THREADS)
@@ -2125,7 +2144,6 @@ suspendThread( StgRegTable *reg )
   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
 #endif
 
-  in_haskell = rtsFalse;
   RELEASE_LOCK(&sched_mutex);
   
   errno = saved_errno;
@@ -2173,7 +2191,7 @@ resumeThread( StgInt tok )
   tso->why_blocked  = NotBlocked;
 
   cap->r.rCurrentTSO = tso;
-  in_haskell = rtsTrue;
+  cap->r.rInHaskell = rtsTrue;
   RELEASE_LOCK(&sched_mutex);
   errno = saved_errno;
   return &cap->r;
@@ -2615,6 +2633,7 @@ initScheduler(void)
 
 #if defined(SMP)
   /* eagerly start some extra workers */
+  startingWorkerThread = RtsFlags.ParFlags.nNodes;
   startTasks(RtsFlags.ParFlags.nNodes, taskStart);
 #endif