[project @ 2005-10-20 11:45:19 by simonmar]
[ghc-hetmet.git] ghc/rts/Schedule.c
index 7f85690..b78f9d2 100644
@@ -463,7 +463,8 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
       // Yield the capability to higher-priority tasks if necessary.
       //
       if (cap != NULL) {
-         yieldCapability(&cap);
+         yieldCapability(&cap, 
+                         mainThread ? &mainThread->bound_thread_cond : NULL );
       }
 
       // If we do not currently hold a capability, we wait for one
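
For context, the first hunk changes the arity of yieldCapability(). A
hedged sketch of the assumed new prototype (the real declaration lives
in Capability.h, outside this diff); the extra argument names the
condition a bound main thread sleeps on, and workers with no bound
thread pass NULL:

    void yieldCapability(Capability **pCap, Condition *pThreadCond);

    /* worker task, no bound Haskell thread: */
    yieldCapability(&cap, NULL);
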
@@ -475,7 +476,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
 
       // We now have a capability...
 #endif
-      
+
 #if 0 /* extra sanity checking */
       { 
          StgMainThread *m;
@@ -627,21 +628,19 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
            sched_belch("### thread %d bound to another OS thread", t->id));
          // no, bound to a different Haskell thread: pass to that thread
          PUSH_ON_RUN_QUEUE(t);
-         passCapability(&m->bound_thread_cond);
          continue;
        }
       }
       else
       {
        if(mainThread != NULL)
-        // The thread we want to run is bound.
+        // The thread we want to run is unbound.
        {
          IF_DEBUG(scheduler,
            sched_belch("### this OS thread cannot run thread %d", t->id));
          // no, the current native thread is bound to a different
          // Haskell thread, so pass it to any worker thread
          PUSH_ON_RUN_QUEUE(t);
-         passCapabilityToWorker();
          continue; 
        }
       }
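
With the explicit passCapability()/passCapabilityToWorker() calls gone,
the hand-off presumably happens at the top of the scheduler loop
instead: after PUSH_ON_RUN_QUEUE(t) and `continue`, control reaches the
extended yieldCapability() from the first hunk, which can name the
bound thread's condition variable. A sketch of that flow (an inference
from this patch, not stated in it):

    /* top of schedule(), first hunk above: */
    yieldCapability(&cap,
                    mainThread ? &mainThread->bound_thread_cond : NULL);
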
@@ -708,12 +707,6 @@ run_thread:
     cap = myCapability();
 #endif
 
-    // We have run some Haskell code: there might be blackhole-blocked
-    // threads to wake up now.
-    if ( blackhole_queue != END_TSO_QUEUE ) {
-       blackholes_need_checking = rtsTrue;
-    }
-
     cap->r.rInHaskell = rtsFalse;
 
     // The TSO might have moved, eg. if it re-entered the RTS and a GC
@@ -732,6 +725,12 @@ run_thread:
 #endif
     
     ACQUIRE_LOCK(&sched_mutex);
+
+    // We have run some Haskell code: there might be blackhole-blocked
+    // threads to wake up now.
+    if ( blackhole_queue != END_TSO_QUEUE ) {
+       blackholes_need_checking = rtsTrue;
+    }
     
 #if defined(RTS_SUPPORTS_THREADS)
     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
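
A plausible reason for moving the flag update below ACQUIRE_LOCK,
inferred from scheduleCheckBlockedThreads() in the next hunk:
blackholes_need_checking is consulted under sched_mutex when deciding
whether awaitEvent() may block, so writing it under the same lock
avoids a missed wakeup between that test and the sleep:

    /* reader side, under sched_mutex (next hunk): */
    awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
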
@@ -841,8 +840,9 @@ scheduleCheckBlockedThreads(void)
 #if defined(RTS_SUPPORTS_THREADS)
        // We shouldn't be here...
        barf("schedule: awaitEvent() in threaded RTS");
-#endif
+#else
        awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
+#endif
     }
 }
 
@@ -1486,7 +1486,9 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
            cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
                                               // if the nursery has only one block.
            
+           ACQUIRE_SM_LOCK
            bd = allocGroup( blocks );
+           RELEASE_SM_LOCK
            cap->r.rNursery->n_blocks += blocks;
            
            // link the new group into the list
@@ -1906,6 +1908,11 @@ scheduleDoGC( rtsBool force_major )
     // actually did the GC.  But it's quite hard to arrange for all
     // the other tasks to sleep and stay asleep.
     //
+    // This does mean that there will be multiple entries in the 
+    // thread->capability hash table for the current thread, but
+    // they will be removed as normal when the capabilities are
+    // released again.
+    //
        
     // Someone else is already trying to GC
     if (waiting_for_gc) return;
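
The capability-grabbing code this comment refers to sits outside the
hunk; a hedged sketch of what it presumably looks like, modeled on the
grab loop added to exitScheduler() later in this patch. Each
waitForReturnCapability() call registers the current OS thread in the
thread->capability table, hence the multiple entries noted above:

    #ifdef SMP
    {
        Capability *cap;
        int n = RtsFlags.ParFlags.nNodes;
        while (n-- > 0) {
            /* one hash-table entry per grab for this thread */
            waitForReturnCapability(&sched_mutex, &cap);
        }
    }
    #endif
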
@@ -1925,19 +1932,28 @@ scheduleDoGC( rtsBool force_major )
      * atomically frames.  When next scheduled they will try to
      * commit; this commit will fail and they will retry.
      */
-    for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
-       if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
-           if (!stmValidateTransaction (t -> trec)) {
-               IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
-               
-               // strip the stack back to the ATOMICALLY_FRAME, aborting
-               // the (nested) transaction, and saving the stack of any
-               // partially-evaluated thunks on the heap.
-               raiseAsync_(t, NULL, rtsTrue);
-               
+    { 
+       StgTSO *next;
+
+       for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+           if (t->what_next == ThreadRelocated) {
+               next = t->link;
+           } else {
+               next = t->global_link;
+               if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+                   if (!stmValidateNestOfTransactions (t -> trec)) {
+                       IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
+                       
+                       // strip the stack back to the ATOMICALLY_FRAME, aborting
+                       // the (nested) transaction, and saving the stack of any
+                       // partially-evaluated thunks on the heap.
+                       raiseAsync_(t, NULL, rtsTrue);
+                       
 #ifdef REG_R1
-               ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+                       ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
 #endif
+                   }
+               }
            }
        }
     }
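
The reworked loop computes `next` before touching each TSO, because
raiseAsync_() may leave `t` unlinked or moved; a ThreadRelocated TSO
chains through `link` (its forwarding pointer) while live TSOs chain
through `global_link`. A standalone sketch of the same
save-the-successor pattern, in generic C rather than RTS code:

    #include <stdlib.h>

    typedef struct Node { int valid; struct Node *next; } Node;

    /* Remove every invalid node; the successor is saved before the
     * node can be freed, mirroring the loop above. */
    static void sweep(Node **headp) {
        Node **prev = headp, *t, *next;
        for (t = *headp; t != NULL; t = next) {
            next = t->next;          /* saved first: t may die below */
            if (!t->valid) {
                *prev = next;        /* unlink t */
                free(t);
            } else {
                prev = &t->next;
            }
        }
    }
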
@@ -2297,7 +2313,6 @@ StgTSO *
 createThread(nat size)
 #endif
 {
-
     StgTSO *tso;
     nat stack_size;
 
@@ -2528,8 +2543,8 @@ activateSpark (rtsSpark spark)
  * on this thread's stack before the scheduler is invoked.
  * ------------------------------------------------------------------------ */
 
-static void
-scheduleThread_(StgTSO *tso)
+void
+scheduleThreadLocked(StgTSO *tso)
 {
   // The thread goes at the *end* of the run-queue, to avoid possible
   // starvation of any threads already on the queue.
@@ -2541,7 +2556,7 @@ void
 scheduleThread(StgTSO* tso)
 {
   ACQUIRE_LOCK(&sched_mutex);
-  scheduleThread_(tso);
+  scheduleThreadLocked(tso);
   RELEASE_LOCK(&sched_mutex);
 }
 
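
A note on the naming convention after this rename: the Locked suffix
presumably marks entry points whose caller must already hold
sched_mutex, while the unsuffixed wrapper takes the lock itself, as the
hunk above shows:

    scheduleThread(tso);         /* acquires and releases sched_mutex */

    ACQUIRE_LOCK(&sched_mutex);
    scheduleThreadLocked(tso);   /* caller already holds sched_mutex  */
    RELEASE_LOCK(&sched_mutex);
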
@@ -2682,9 +2697,62 @@ exitScheduler( void )
 {
     interrupted = rtsTrue;
     shutting_down_scheduler = rtsTrue;
+
 #if defined(RTS_SUPPORTS_THREADS)
     if (threadIsTask(osThreadId())) { taskStop(); }
     stopTaskManager();
+    //
+    // What can we do here?  There are a bunch of worker threads, it
+    // might be nice to let them exit cleanly.  There may be some main
+    // threads in the run queue; we should let them return to their
+    // callers with an Interrupted state.  We can't in general wait
+    // for all the running Tasks to stop, because some might be off in
+    // a C call that is blocked.
+    // 
+    // Letting the run queue drain is the safest thing.  That lets any
+    // main threads that can return do so, and cleans up all the
+    // runnable threads.  Then we grab all the Capabilities to stop
+    // anything unexpected happening while we shut down.
+    //
+    // ToDo: this doesn't let us get the time stats from the worker
+    // tasks, because they haven't called taskStop().
+    //
+    ACQUIRE_LOCK(&sched_mutex);
+    { 
+       nat i;
+       for (i = 1000; i > 0; i--) {
+           if (EMPTY_RUN_QUEUE()) {
+               IF_DEBUG(scheduler, sched_belch("run queue is empty"));
+               break;
+           }
+           IF_DEBUG(scheduler, sched_belch("yielding"));
+           RELEASE_LOCK(&sched_mutex);
+           prodWorker();
+           yieldThread();
+           ACQUIRE_LOCK(&sched_mutex);
+       }
+    }
+
+#ifdef SMP
+    {
+       Capability *cap;
+       int n_capabilities = RtsFlags.ParFlags.nNodes; 
+       Capability *caps[n_capabilities];
+       nat i;
+
+       while (n_capabilities > 0) {
+           IF_DEBUG(scheduler, sched_belch("exitScheduler: grabbing all the capabilities (%d left)", n_capabilities));
+           waitForReturnCapability(&sched_mutex, &cap);
+           n_capabilities--;
+           caps[n_capabilities] = cap;
+       }
+    }
+#else
+    {
+       Capability *cap;
+       waitForReturnCapability(&sched_mutex, &cap);
+    }
+#endif
 #endif
 }
 
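
A design note inferred from the comment above: running Haskell code
requires holding a Capability, so once this function owns all of them
no further Haskell can execute during teardown. The 1000-iteration
drain is a give-up cap rather than a guarantee, which is presumably why
the Capabilities are grabbed afterwards regardless:

    /* assumed caller, approximately (RtsStartup.c:hs_exit): */
    exitScheduler();       /* drain run queue, grab all Capabilities */
    /* ... storage manager etc. can now be torn down safely ... */
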
@@ -3227,6 +3295,8 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
 void
 awakenBlockedQueueNoLock(StgTSO *tso)
 {
+  if (tso == NULL) return; // hack; see bug #1235728, and comments in
+                          // Exception.cmm
   while (tso != END_TSO_QUEUE) {
     tso = unblockOneLocked(tso);
   }
@@ -3235,6 +3305,8 @@ awakenBlockedQueueNoLock(StgTSO *tso)
 void
 awakenBlockedQueue(StgTSO *tso)
 {
+  if (tso == NULL) return; // hack; see bug #1235728, and comments in
+                          // Exception.cmm
   ACQUIRE_LOCK(&sched_mutex);
   while (tso != END_TSO_QUEUE) {
     tso = unblockOneLocked(tso);
@@ -3593,6 +3665,7 @@ checkBlackHoles( void )
        ASSERT(t->why_blocked == BlockedOnBlackHole);
        type = get_itbl(t->block_info.closure)->type;
        if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
+           IF_DEBUG(sanity,checkTSO(t));
            t = unblockOneLocked(t);
            *prev = t;
            any_woke_up = rtsTrue;
@@ -3794,7 +3867,7 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
            // we've got an exception to raise, so let's pass it to the
            // handler in this frame.
            //
-           raise = (StgThunk *)allocate(sizeofW(StgThunk)+1);
+           raise = (StgThunk *)allocate(sizeofW(StgThunk)+MIN_UPD_SIZE);
            TICK_ALLOC_SE_THK(1,0);
            SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
            raise->payload[0] = exception;
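
The size bump is presumably because MIN_UPD_SIZE is the smallest
payload, in words, that an updatable closure may have: the thunk must
be large enough to be overwritten in place (e.g. by a blackhole or an
indirection) once entered. The raise thunk itself needs only one
payload word for the exception:

    /* header + enough payload for both the exception and a later
     * in-place update; MIN_UPD_SIZE >= 1 covers payload[0]: */
    raise = (StgThunk *)allocate(sizeofW(StgThunk) + MIN_UPD_SIZE);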