[project @ 2005-10-27 15:26:06 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
index 5cad5b2..2bc7472 100644 (file)
@@ -210,6 +210,7 @@ static Capability *schedule (Capability *initialCapability, Task *task);
 // scheduler clearer.
 //
 static void schedulePreLoop (void);
+static void schedulePushWork(Capability *cap, Task *task);
 static void scheduleStartSignalHandlers (void);
 static void scheduleCheckBlockedThreads (Capability *cap);
 static void scheduleCheckBlackHoles (Capability *cap);
@@ -367,10 +368,6 @@ schedule (Capability *initialCapability, Task *task)
 
   while (TERMINATION_CONDITION) {
 
-      ASSERT(cap->running_task == task);
-      ASSERT(task->cap == cap);
-      ASSERT(myTask() == task);
-
 #if defined(GRAN)
       /* Choose the processor with the next event */
       CurrentProc = event->proc;
@@ -383,11 +380,16 @@ schedule (Capability *initialCapability, Task *task)
          // thread for a bit, even if there are others banging at the
          // door.
          first = rtsFalse;
+         ASSERT_CAPABILITY_INVARIANTS(cap,task);
       } else {
          // Yield the capability to higher-priority tasks if necessary.
          yieldCapability(&cap, task);
       }
 #endif
+      
+#ifdef SMP
+      schedulePushWork(cap,task);
+#endif         
 
     // Check whether we have re-entered the RTS from Haskell without
     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
@@ -408,11 +410,12 @@ schedule (Capability *initialCapability, Task *task)
        deleteRunQueue(cap);
        if (shutting_down_scheduler) {
            IF_DEBUG(scheduler, sched_belch("shutting down"));
-           if (task->tso) { // we are bound
-               task->stat = Interrupted;
-               task->ret  = NULL;
+           // If we are a worker, just exit.  If we're a bound thread
+           // then we will exit below when we've removed our TSO from
+           // the run queue.
+           if (task->tso == NULL) {
+               return cap;
            }
-           return cap;
        } else {
            IF_DEBUG(scheduler, sched_belch("interrupted"));
        }
@@ -574,36 +577,54 @@ run_thread:
     recent_activity = ACTIVITY_YES;
 
     switch (prev_what_next) {
-
+       
     case ThreadKilled:
     case ThreadComplete:
        /* Thread already finished, return to scheduler. */
        ret = ThreadFinished;
        break;
-
+       
     case ThreadRunGHC:
-       ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
+    {
+       StgRegTable *r;
+       r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
+       cap = regTableToCapability(r);
+       ret = r->rRet;
        break;
-
+    }
+    
     case ThreadInterpret:
-       ret = interpretBCO(cap);
+       cap = interpretBCO(cap);
+       ret = cap->r.rRet;
        break;
-
+       
     default:
-      barf("schedule: invalid what_next field");
+       barf("schedule: invalid what_next field");
     }
 
-    // in SMP mode, we might return with a different capability than
-    // we started with, if the Haskell thread made a foreign call.  So
-    // let's find out what our current Capability is:
-    cap = task->cap;
-
     cap->in_haskell = rtsFalse;
 
     // The TSO might have moved, eg. if it re-entered the RTS and a GC
     // happened.  So find the new location:
     t = cap->r.rCurrentTSO;
 
+#ifdef SMP
+    // If ret is ThreadBlocked, and this Task is bound to the TSO that
+    // blocked, we are in limbo - the TSO is now owned by whatever it
+    // is blocked on, and may in fact already have been woken up,
+    // perhaps even on a different Capability.  It may be the case
+    // that task->cap != cap.  We better yield this Capability
+    // immediately and return to normality.
+    if (ret == ThreadBlocked) {
+       IF_DEBUG(scheduler,
+                debugBelch("--<< thread %d (%s) stopped: blocked\n",
+                           t->id, whatNext_strs[t->what_next]));
+       continue;
+    }
+#endif
+
+    ASSERT_CAPABILITY_INVARIANTS(cap,task);
+
     // And save the current errno in this thread.
     t->saved_errno = errno;
 
@@ -654,6 +675,7 @@ run_thread:
 
     case ThreadFinished:
        if (scheduleHandleThreadFinished(cap, task, t)) return cap;
+       ASSERT_CAPABILITY_INVARIANTS(cap,task);
        break;
 
     default:
@@ -694,6 +716,87 @@ schedulePreLoop(void)
 #endif
 }
 
+/* -----------------------------------------------------------------------------
+ * schedulePushWork()
+ *
+ * Push work to other Capabilities if we have some.
+ * -------------------------------------------------------------------------- */
+
+static void
+schedulePushWork(Capability *cap, Task *task)
+{
+#ifdef SMP
+    Capability *free_caps[n_capabilities], *cap0;
+    nat i, n_free_caps;
+
+    // Check whether we have more threads on our run queue that we
+    // could hand to another Capability.
+    if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {
+       return;
+    }
+
+    // First grab as many free Capabilities as we can.
+    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
+       cap0 = &capabilities[i];
+       if (cap != cap0 && tryGrabCapability(cap0,task)) {
+           if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
+               // it already has some work, we just grabbed it at 
+               // the wrong moment.  Or maybe it's deadlocked!
+               releaseCapability(cap0);
+           } else {
+               free_caps[n_free_caps++] = cap0;
+           }
+       }
+    }
+
+    // we now have n_free_caps free capabilities stashed in
+    // free_caps[].  Share our run queue equally with them.  This is
+    // probably the simplest thing we could do; improvements we might
+    // want to do include:
+    //
+    //   - giving high priority to moving relatively new threads, on 
+    //     the grounds that they haven't had time to build up a
+    //     working set in the cache on this CPU/Capability.
+    //
+    //   - giving low priority to moving long-lived threads
+
+    if (n_free_caps > 0) {
+       StgTSO *prev, *t, *next;
+       IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
+
+       prev = cap->run_queue_hd;
+       t = prev->link;
+       prev->link = END_TSO_QUEUE;
+       i = 0;
+       for (; t != END_TSO_QUEUE; t = next) {
+           next = t->link;
+           t->link = END_TSO_QUEUE;
+           if (t->what_next == ThreadRelocated) {
+               prev->link = t;
+               prev = t;
+           } else if (i == n_free_caps) {
+               i = 0;
+               // keep one for us
+               prev->link = t;
+               prev = t;
+           } else {
+               appendToRunQueue(free_caps[i],t);
+               if (t->bound) { t->bound->cap = free_caps[i]; }
+               i++;
+           }
+       }
+       cap->run_queue_tl = prev;
+
+       // release the capabilities
+       for (i = 0; i < n_free_caps; i++) {
+           task->cap = free_caps[i];
+           releaseCapability(free_caps[i]);
+       }
+    }
+    task->cap = cap; // reset to point to our Capability.
+#endif
+}
+
 /* ----------------------------------------------------------------------------
  * Start any pending signal handlers
  * ------------------------------------------------------------------------- */
@@ -1764,7 +1867,13 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
     //
        
     was_waiting = cas(&waiting_for_gc, 0, 1);
-    if (was_waiting) return;
+    if (was_waiting) {
+       do {
+           IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
+           yieldCapability(&cap,task);
+       } while (waiting_for_gc);
+       return;
+    }
 
     for (i=0; i < n_capabilities; i++) {
        IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
@@ -1776,6 +1885,7 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
            // all the Capabilities, but even so it's a slightly
            // unsavoury invariant.
            task->cap = pcap;
+           context_switch = 1;
            waitForReturnCapability(&pcap, task);
            if (pcap != &capabilities[i]) {
                barf("scheduleDoGC: got the wrong capability");
@@ -1918,6 +2028,7 @@ forkProcess(HsStablePtr *entry
     if (pid) { // parent
        
        // just return the pid
+       rts_unlock(cap);
        return pid;
        
     } else { // child
@@ -2446,6 +2557,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
     cap = schedule(cap,task);
 
     ASSERT(task->stat != NoStatus);
+    ASSERT_CAPABILITY_INVARIANTS(cap,task);
 
     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
     return cap;
@@ -3973,25 +4085,37 @@ printThreadBlockage(StgTSO *tso)
   }
 }
 
-static void
-printThreadStatus(StgTSO *tso)
+void
+printThreadStatus(StgTSO *t)
 {
-  switch (tso->what_next) {
-  case ThreadKilled:
-    debugBelch("has been killed");
-    break;
-  case ThreadComplete:
-    debugBelch("has completed");
-    break;
-  default:
-    printThreadBlockage(tso);
-  }
+    debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
+    {
+      void *label = lookupThreadLabel(t->id);
+      if (label) debugBelch("[\"%s\"] ",(char *)label);
+    }
+    if (t->what_next == ThreadRelocated) {
+       debugBelch("has been relocated...\n");
+    } else {
+       switch (t->what_next) {
+       case ThreadKilled:
+           debugBelch("has been killed");
+           break;
+       case ThreadComplete:
+           debugBelch("has completed");
+           break;
+       default:
+           printThreadBlockage(t);
+       }
+       debugBelch("\n");
+    }
 }
 
 void
 printAllThreads(void)
 {
-  StgTSO *t;
+  StgTSO *t, *next;
+  nat i;
+  Capability *cap;
 
 # if defined(GRAN)
   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
@@ -4009,20 +4133,23 @@ printAllThreads(void)
   debugBelch("all threads:\n");
 # endif
 
-  for (t = all_threads; t != END_TSO_QUEUE; ) {
-    debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
-    {
-      void *label = lookupThreadLabel(t->id);
-      if (label) debugBelch("[\"%s\"] ",(char *)label);
-    }
-    if (t->what_next == ThreadRelocated) {
-       debugBelch("has been relocated...\n");
-       t = t->link;
-    } else {
-       printThreadStatus(t);
-       debugBelch("\n");
-       t = t->global_link;
-    }
+  for (i = 0; i < n_capabilities; i++) {
+      cap = &capabilities[i];
+      debugBelch("threads on capability %d:\n", cap->no);
+      for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
+         printThreadStatus(t);
+      }
+  }
+
+  for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+      if (t->why_blocked != NotBlocked) {
+         printThreadStatus(t);
+      }
+      if (t->what_next == ThreadRelocated) {
+         next = t->link;
+      } else {
+         next = t->global_link;
+      }
   }
 }
 
@@ -4032,13 +4159,7 @@ printThreadQueue(StgTSO *t)
 {
     nat i = 0;
     for (; t != END_TSO_QUEUE; t = t->link) {
-       debugBelch("\tthread %d @ %p ", t->id, (void *)t);
-       if (t->what_next == ThreadRelocated) {
-           debugBelch("has been relocated...\n");
-       } else {
-           printThreadStatus(t);
-           debugBelch("\n");
-       }
+       printThreadStatus(t);
        i++;
     }
     debugBelch("%d threads on queue\n", i);