[project @ 2005-11-02 12:26:21 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
index a7696ed..15d2181 100644 (file)
@@ -210,6 +210,9 @@ static Capability *schedule (Capability *initialCapability, Task *task);
 // scheduler clearer.
 //
 static void schedulePreLoop (void);
+#if defined(SMP)
+static void schedulePushWork(Capability *cap, Task *task);
+#endif
 static void scheduleStartSignalHandlers (void);
 static void scheduleCheckBlockedThreads (Capability *cap);
 static void scheduleCheckBlackHoles (Capability *cap);
@@ -339,7 +342,9 @@ schedule (Capability *initialCapability, Task *task)
 #endif
   nat prev_what_next;
   rtsBool ready_to_gc;
+#if defined(THREADED_RTS)
   rtsBool first = rtsTrue;
+#endif
   
   cap = initialCapability;
 
@@ -379,15 +384,16 @@ schedule (Capability *initialCapability, Task *task)
          // thread for a bit, even if there are others banging at the
          // door.
          first = rtsFalse;
+         ASSERT_CAPABILITY_INVARIANTS(cap,task);
       } else {
          // Yield the capability to higher-priority tasks if necessary.
          yieldCapability(&cap, task);
       }
 #endif
       
-      ASSERT(cap->running_task == task);
-      ASSERT(task->cap == cap);
-      ASSERT(myTask() == task);
+#ifdef SMP
+      schedulePushWork(cap,task);
+#endif         
 
     // Check whether we have re-entered the RTS from Haskell without
     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
@@ -602,6 +608,10 @@ run_thread:
 
     cap->in_haskell = rtsFalse;
 
+    // The TSO might have moved, eg. if it re-entered the RTS and a GC
+    // happened.  So find the new location:
+    t = cap->r.rCurrentTSO;
+
 #ifdef SMP
     // If ret is ThreadBlocked, and this Task is bound to the TSO that
     // blocked, we are in limbo - the TSO is now owned by whatever it
@@ -609,16 +619,15 @@ run_thread:
     // perhaps even on a different Capability.  It may be the case
     // that task->cap != cap.  We better yield this Capability
     // immediately and return to normaility.
-    if (ret == ThreadBlocked) continue;
+    if (ret == ThreadBlocked) {
+       IF_DEBUG(scheduler,
+                debugBelch("--<< thread %d (%s) stopped: blocked\n",
+                           t->id, whatNext_strs[t->what_next]));
+       continue;
+    }
 #endif
 
-    ASSERT(cap->running_task == task);
-    ASSERT(task->cap == cap);
-    ASSERT(myTask() == task);
-
-    // The TSO might have moved, eg. if it re-entered the RTS and a GC
-    // happened.  So find the new location:
-    t = cap->r.rCurrentTSO;
+    ASSERT_CAPABILITY_INVARIANTS(cap,task);
 
     // And save the current errno in this thread.
     t->saved_errno = errno;
@@ -670,6 +679,7 @@ run_thread:
 
     case ThreadFinished:
        if (scheduleHandleThreadFinished(cap, task, t)) return cap;
+       ASSERT_CAPABILITY_INVARIANTS(cap,task);
        break;
 
     default:
@@ -710,6 +720,88 @@ schedulePreLoop(void)
 #endif
 }
 
+/* -----------------------------------------------------------------------------
+ * schedulePushWork()
+ *
+ * Push work to other Capabilities if we have some.
+ * -------------------------------------------------------------------------- */
+
+#ifdef SMP
+static void
+schedulePushWork(Capability *cap USED_WHEN_SMP, 
+                Task *task      USED_WHEN_SMP)
+{
+    Capability *free_caps[n_capabilities], *cap0;
+    nat i, n_free_caps;
+
+    // Check whether we have more threads on our run queue that we
+    // could hand to another Capability.
+    if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {
+       return;
+    }
+
+    // First grab as many free Capabilities as we can.
+    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
+       cap0 = &capabilities[i];
+       if (cap != cap0 && tryGrabCapability(cap0,task)) {
+           if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
+               // it already has some work, we just grabbed it at 
+               // the wrong moment.  Or maybe it's deadlocked!
+               releaseCapability(cap0);
+           } else {
+               free_caps[n_free_caps++] = cap0;
+           }
+       }
+    }
+
+    // we now have n_free_caps free capabilities stashed in
+    // free_caps[].  Share our run queue equally with them.  This is
+    // probably the simplest thing we could do; improvements we might
+    // want to do include:
+    //
+    //   - giving high priority to moving relatively new threads, on 
+    //     the grounds that they haven't had time to build up a
+    //     working set in the cache on this CPU/Capability.
+    //
+    //   - giving low priority to moving long-lived threads
+
+    if (n_free_caps > 0) {
+       StgTSO *prev, *t, *next;
+       IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
+
+       prev = cap->run_queue_hd;
+       t = prev->link;
+       prev->link = END_TSO_QUEUE;
+       i = 0;
+       for (; t != END_TSO_QUEUE; t = next) {
+           next = t->link;
+           t->link = END_TSO_QUEUE;
+           if (t->what_next == ThreadRelocated) {
+               prev->link = t;
+               prev = t;
+           } else if (i == n_free_caps) {
+               i = 0;
+               // keep one for us
+               prev->link = t;
+               prev = t;
+           } else {
+               appendToRunQueue(free_caps[i],t);
+               if (t->bound) { t->bound->cap = free_caps[i]; }
+               i++;
+           }
+       }
+       cap->run_queue_tl = prev;
+
+       // release the capabilities
+       for (i = 0; i < n_free_caps; i++) {
+           task->cap = free_caps[i];
+           releaseCapability(free_caps[i]);
+       }
+    }
+    task->cap = cap; // reset to point to our Capability.
+}
+#endif
+
 /* ----------------------------------------------------------------------------
  * Start any pending signal handlers
  * ------------------------------------------------------------------------- */
@@ -1467,11 +1559,10 @@ scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
        /* enlarge the stack */
        StgTSO *new_t = threadStackOverflow(cap, t);
        
-       /* This TSO has moved, so update any pointers to it from the
-        * main thread stack.  It better not be on any other queues...
-        * (it shouldn't be).
+       /* The TSO attached to this Task may have moved, so update the
+        * pointer to it.
         */
-       if (task->tso != NULL) {
+       if (task->tso == t) {
            task->tso = new_t;
        }
        pushOnRunQueue(cap,new_t);
@@ -1780,7 +1871,13 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
     //
        
     was_waiting = cas(&waiting_for_gc, 0, 1);
-    if (was_waiting) return;
+    if (was_waiting) {
+       do {
+           IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
+           yieldCapability(&cap,task);
+       } while (waiting_for_gc);
+       return;
+    }
 
     for (i=0; i < n_capabilities; i++) {
        IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
@@ -1792,6 +1889,7 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
            // all the Capabilities, but even so it's a slightly
            // unsavoury invariant.
            task->cap = pcap;
+           context_switch = 1;
            waitForReturnCapability(&pcap, task);
            if (pcap != &capabilities[i]) {
                barf("scheduleDoGC: got the wrong capability");
@@ -1934,6 +2032,7 @@ forkProcess(HsStablePtr *entry
     if (pid) { // parent
        
        // just return the pid
+       rts_unlock(cap);
        return pid;
        
     } else { // child
@@ -2462,6 +2561,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
     cap = schedule(cap,task);
 
     ASSERT(task->stat != NoStatus);
+    ASSERT_CAPABILITY_INVARIANTS(cap,task);
 
     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
     return cap;
@@ -2767,7 +2867,7 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
 
-  IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", tso->stack_size, new_stack_size));
+  IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
 
   dest = (StgTSO *)allocate(new_tso_size);
   TICK_ALLOC_TSO(new_stack_size,0);
@@ -3989,25 +4089,37 @@ printThreadBlockage(StgTSO *tso)
   }
 }
 
-static void
-printThreadStatus(StgTSO *tso)
+void
+printThreadStatus(StgTSO *t)
 {
-  switch (tso->what_next) {
-  case ThreadKilled:
-    debugBelch("has been killed");
-    break;
-  case ThreadComplete:
-    debugBelch("has completed");
-    break;
-  default:
-    printThreadBlockage(tso);
-  }
+    debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
+    {
+      void *label = lookupThreadLabel(t->id);
+      if (label) debugBelch("[\"%s\"] ",(char *)label);
+    }
+    if (t->what_next == ThreadRelocated) {
+       debugBelch("has been relocated...\n");
+    } else {
+       switch (t->what_next) {
+       case ThreadKilled:
+           debugBelch("has been killed");
+           break;
+       case ThreadComplete:
+           debugBelch("has completed");
+           break;
+       default:
+           printThreadBlockage(t);
+       }
+       debugBelch("\n");
+    }
 }
 
 void
 printAllThreads(void)
 {
-  StgTSO *t;
+  StgTSO *t, *next;
+  nat i;
+  Capability *cap;
 
 # if defined(GRAN)
   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
@@ -4025,20 +4137,23 @@ printAllThreads(void)
   debugBelch("all threads:\n");
 # endif
 
-  for (t = all_threads; t != END_TSO_QUEUE; ) {
-    debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
-    {
-      void *label = lookupThreadLabel(t->id);
-      if (label) debugBelch("[\"%s\"] ",(char *)label);
-    }
-    if (t->what_next == ThreadRelocated) {
-       debugBelch("has been relocated...\n");
-       t = t->link;
-    } else {
-       printThreadStatus(t);
-       debugBelch("\n");
-       t = t->global_link;
-    }
+  for (i = 0; i < n_capabilities; i++) {
+      cap = &capabilities[i];
+      debugBelch("threads on capability %d:\n", cap->no);
+      for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
+         printThreadStatus(t);
+      }
+  }
+
+  for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+      if (t->why_blocked != NotBlocked) {
+         printThreadStatus(t);
+      }
+      if (t->what_next == ThreadRelocated) {
+         next = t->link;
+      } else {
+         next = t->global_link;
+      }
   }
 }
 
@@ -4048,13 +4163,7 @@ printThreadQueue(StgTSO *t)
 {
     nat i = 0;
     for (; t != END_TSO_QUEUE; t = t->link) {
-       debugBelch("\tthread %d @ %p ", t->id, (void *)t);
-       if (t->what_next == ThreadRelocated) {
-           debugBelch("has been relocated...\n");
-       } else {
-           printThreadStatus(t);
-           debugBelch("\n");
-       }
+       printThreadStatus(t);
        i++;
     }
     debugBelch("%d threads on queue\n", i);