[project @ 2005-07-26 10:11:37 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
index 8e1a43e..d35bac5 100644 (file)
@@ -288,7 +288,7 @@ static void scheduleHandleThreadBlocked( StgTSO *t );
 static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread, 
                                             Capability *cap, StgTSO *t );
 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
-static void scheduleDoGC(Capability *cap);
+static void scheduleDoGC(rtsBool force_major);
 
 static void unblockThread(StgTSO *tso);
 static rtsBool checkBlackHoles(void);
@@ -305,6 +305,7 @@ static void raiseAsync_(StgTSO *tso, StgClosure *exception,
 
 static void printThreadBlockage(StgTSO *tso);
 static void printThreadStatus(StgTSO *tso);
+void printThreadQueue(StgTSO *tso);
 
 #if defined(PARALLEL_HASKELL)
 StgTSO * createSparkThread(rtsSpark spark);
@@ -462,7 +463,8 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
       // Yield the capability to higher-priority tasks if necessary.
       //
       if (cap != NULL) {
-         yieldCapability(&cap);
+         yieldCapability(&cap, 
+                         mainThread ? &mainThread->bound_thread_cond : NULL );
       }
 
       // If we do not currently hold a capability, we wait for one
@@ -474,7 +476,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
 
       // We now have a capability...
 #endif
-      
+
 #if 0 /* extra sanity checking */
       { 
          StgMainThread *m;
@@ -626,21 +628,19 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
            sched_belch("### thread %d bound to another OS thread", t->id));
          // no, bound to a different Haskell thread: pass to that thread
          PUSH_ON_RUN_QUEUE(t);
-         passCapability(&m->bound_thread_cond);
          continue;
        }
       }
       else
       {
        if(mainThread != NULL)
-        // The thread we want to run is bound.
+        // The thread we want to run is unbound.
        {
          IF_DEBUG(scheduler,
            sched_belch("### this OS thread cannot run thread %d", t->id));
          // no, the current native thread is bound to a different
          // Haskell thread, so pass it to any worker thread
          PUSH_ON_RUN_QUEUE(t);
-         passCapabilityToWorker();
          continue; 
        }
       }
@@ -771,7 +771,7 @@ run_thread:
     }
 
     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
-    if (ready_to_gc) { scheduleDoGC(cap); }
+    if (ready_to_gc) { scheduleDoGC(rtsFalse); }
   } /* end of while() */
 
   IF_PAR_DEBUG(verbose,
@@ -840,8 +840,9 @@ scheduleCheckBlockedThreads(void)
 #if defined(RTS_SUPPORTS_THREADS)
        // We shouldn't be here...
        barf("schedule: awaitEvent() in threaded RTS");
-#endif
+#else
        awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
+#endif
     }
 }
 
@@ -866,7 +867,7 @@ scheduleCheckBlackHoles( void )
  * ------------------------------------------------------------------------- */
 
 static void
-scheduleDetectDeadlock(void)
+scheduleDetectDeadlock()
 {
 
 #if defined(PARALLEL_HASKELL)
@@ -900,7 +901,7 @@ scheduleDetectDeadlock(void)
        // exception.  Any threads thus released will be immediately
        // runnable.
 
-       GarbageCollect(GetRoots,rtsTrue);
+       scheduleDoGC( rtsTrue/*force  major GC*/ );
        recent_activity = ACTIVITY_DONE_GC;
        if ( !EMPTY_RUN_QUEUE() ) return;
 
@@ -1883,10 +1884,11 @@ scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
  * -------------------------------------------------------------------------- */
 
 static void
-scheduleDoGC( Capability *cap STG_UNUSED )
+scheduleDoGC( rtsBool force_major )
 {
     StgTSO *t;
 #ifdef SMP
+    Capability *cap;
     static rtsBool waiting_for_gc;
     int n_capabilities = RtsFlags.ParFlags.nNodes - 1; 
            // subtract one because we're already holding one.
@@ -1904,12 +1906,16 @@ scheduleDoGC( Capability *cap STG_UNUSED )
     // actually did the GC.  But it's quite hard to arrange for all
     // the other tasks to sleep and stay asleep.
     //
+    // This does mean that there will be multiple entries in the 
+    // thread->capability hash table for the current thread, but
+    // they will be removed as normal when the capabilities are
+    // released again.
+    //
        
     // Someone else is already trying to GC
     if (waiting_for_gc) return;
     waiting_for_gc = rtsTrue;
 
-    caps[n_capabilities] = cap;
     while (n_capabilities > 0) {
        IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
        waitForReturnCapability(&sched_mutex, &cap);
@@ -1924,19 +1930,28 @@ scheduleDoGC( Capability *cap STG_UNUSED )
      * atomically frames.  When next scheduled they will try to
      * commit, this commit will fail and they will retry.
      */
-    for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
-       if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
-           if (!stmValidateTransaction (t -> trec)) {
-               IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
-               
-               // strip the stack back to the ATOMICALLY_FRAME, aborting
-               // the (nested) transaction, and saving the stack of any
-               // partially-evaluated thunks on the heap.
-               raiseAsync_(t, NULL, rtsTrue);
-               
+    { 
+       StgTSO *next;
+
+       for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+           if (t->what_next == ThreadRelocated) {
+               next = t->link;
+           } else {
+               next = t->global_link;
+               if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+                   if (!stmValidateNestOfTransactions (t -> trec)) {
+                       IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
+                       
+                       // strip the stack back to the ATOMICALLY_FRAME, aborting
+                       // the (nested) transaction, and saving the stack of any
+                       // partially-evaluated thunks on the heap.
+                       raiseAsync_(t, NULL, rtsTrue);
+                       
 #ifdef REG_R1
-               ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+                       ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
 #endif
+                   }
+               }
            }
        }
     }
@@ -1954,7 +1969,7 @@ scheduleDoGC( Capability *cap STG_UNUSED )
 #if defined(RTS_SUPPORTS_THREADS)
     IF_DEBUG(scheduler,sched_belch("doing GC"));
 #endif
-    GarbageCollect(GetRoots,rtsFalse);
+    GarbageCollect(GetRoots, force_major);
     
 #if defined(SMP)
     {
@@ -3226,6 +3241,8 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
 void
 awakenBlockedQueueNoLock(StgTSO *tso)
 {
+  if (tso == NULL) return; // hack; see bug #1235728, and comments in
+                          // Exception.cmm
   while (tso != END_TSO_QUEUE) {
     tso = unblockOneLocked(tso);
   }
@@ -3234,6 +3251,8 @@ awakenBlockedQueueNoLock(StgTSO *tso)
 void
 awakenBlockedQueue(StgTSO *tso)
 {
+  if (tso == NULL) return; // hack; see bug #1235728, and comments in
+                          // Exception.cmm
   ACQUIRE_LOCK(&sched_mutex);
   while (tso != END_TSO_QUEUE) {
     tso = unblockOneLocked(tso);
@@ -4088,7 +4107,7 @@ printThreadBlockage(StgTSO *tso)
     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
     break;
   case BlockedOnMVar:
-    debugBelch("is blocked on an MVar");
+    debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
     break;
   case BlockedOnException:
     debugBelch("is blocked on delivering an exception to thread %d",
@@ -4162,7 +4181,7 @@ printAllThreads(void)
 # endif
 
   for (t = all_threads; t != END_TSO_QUEUE; ) {
-    debugBelch("\tthread %d @ %p ", t->id, (void *)t);
+    debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
 #if defined(DEBUG)
     {
       void *label = lookupThreadLabel(t->id);
@@ -4179,9 +4198,27 @@ printAllThreads(void)
     }
   }
 }
-    
+
 #ifdef DEBUG
 
+// useful from gdb
+void 
+printThreadQueue(StgTSO *t)
+{
+    nat i = 0;
+    for (; t != END_TSO_QUEUE; t = t->link) {
+       debugBelch("\tthread %d @ %p ", t->id, (void *)t);
+       if (t->what_next == ThreadRelocated) {
+           debugBelch("has been relocated...\n");
+       } else {
+           printThreadStatus(t);
+           debugBelch("\n");
+       }
+       i++;
+    }
+    debugBelch("%d threads on queue\n", i);
+}
+
 /* 
    Print a whole blocking queue attached to node (debugging only).
 */