Work stealing for sparks
[ghc-hetmet.git] / rts / Schedule.c
index a41dd67..09150fd 100644 (file)
@@ -89,11 +89,6 @@ StgTSO *blackhole_queue = NULL;
  */
 rtsBool blackholes_need_checking = rtsFalse;
 
-/* flag set by signal handler to precipitate a context switch
- * LOCK: none (just an advisory flag)
- */
-int context_switch = 0;
-
 /* flag that tracks whether we have done any execution in this time slice.
  * LOCK: currently none, perhaps we should lock (but needs to be
  * updated in the fast path of the scheduler).
@@ -142,20 +137,20 @@ static Capability *schedule (Capability *initialCapability, Task *task);
 // scheduler clearer.
 //
 static void schedulePreLoop (void);
-#if defined(THREADED_RTS)
-static void schedulePushWork(Capability *cap, Task *task);
-#endif
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
 static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
-#if defined(PARALLEL_HASKELL)
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+static void schedulePushWork(Capability *cap, Task *task);
 static rtsBool scheduleGetRemoteWork(Capability *cap);
+#if defined(PARALLEL_HASKELL)
 static void scheduleSendPendingMessages(void);
+#endif
 static void scheduleActivateSpark(Capability *cap);
 #endif
-static void schedulePostRunThread(StgTSO *t);
+static void schedulePostRunThread(Capability *cap, StgTSO *t);
 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
                                         StgTSO *t);
@@ -296,13 +291,15 @@ schedule (Capability *initialCapability, Task *task)
       } else {
          // Yield the capability to higher-priority tasks if necessary.
          yieldCapability(&cap, task);
+         /* inside yieldCapability, attempts to steal work from other
+            capabilities, unless the capability has own work. 
+            See (REMARK) below.
+         */
       }
 #endif
-      
-#if defined(THREADED_RTS)
-      schedulePushWork(cap,task);
-#endif
 
+    /* THIS WAS THE PLACE FOR THREADED_RTS::schedulePushWork(cap,task) */
+      
     // Check whether we have re-entered the RTS from Haskell without
     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
     // call).
@@ -370,21 +367,7 @@ schedule (Capability *initialCapability, Task *task)
        barf("sched_state: %d", sched_state);
     }
 
-#if defined(THREADED_RTS)
-    // If the run queue is empty, take a spark and turn it into a thread.
-    {
-       if (emptyRunQueue(cap)) {
-           StgClosure *spark;
-           spark = findSpark(cap);
-           if (spark != NULL) {
-               debugTrace(DEBUG_sched,
-                          "turning spark of closure %p into a thread",
-                          (StgClosure *)spark);
-               createSparkThread(cap,spark);     
-           }
-       }
-    }
-#endif // THREADED_RTS
+    /* this was the place to activate a spark, now below... */
 
     scheduleStartSignalHandlers(cap);
 
@@ -398,11 +381,19 @@ schedule (Capability *initialCapability, Task *task)
 
     scheduleCheckBlockedThreads(cap);
 
-#if defined(PARALLEL_HASKELL)
-    /* message processing and work distribution goes here */ 
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+    /* work distribution in multithreaded and parallel systems 
 
+       REMARK: IMHO best location for work-stealing as well.
+       tests above might yield some new jobs, so no need to steal a
+       spark in some cases. I believe the yieldCapability.. above
+       should be moved here.
+    */
+
+#if defined(PARALLEL_HASKELL)
     /* if messages have been buffered... a NOOP in THREADED_RTS */
     scheduleSendPendingMessages();
+#endif
 
     /* If the run queue is empty,...*/
     if (emptyRunQueue(cap)) {
@@ -411,6 +402,7 @@ schedule (Capability *initialCapability, Task *task)
 
        /* if this did not work, try to steal a spark from someone else */
       if (emptyRunQueue(cap)) {
+#if defined(PARALLEL_HASKELL)
        receivedFinish = scheduleGetRemoteWork(cap);
        continue; //  a new round, (hopefully) with new work
        /* 
@@ -419,10 +411,20 @@ schedule (Capability *initialCapability, Task *task)
                        b) (blocking) awaits and receives messages
           
           in Eden, this is only the blocking receive, as b) in GUM.
+
+          in Threaded-RTS, this does plain nothing. Stealing routine
+               is inside Capability.c and called from
+               yieldCapability() at the very beginning, see REMARK.
        */
+#endif
       }
-    } 
+    } else { /* i.e. run queue was (initially) not empty */
+      schedulePushWork(cap,task);
+      /* work pushing, currently relevant only for THREADED_RTS:
+        (pushes threads, wakes up idle capabilities for stealing) */
+    }
 
+#if defined(PARALLEL_HASKELL)
     /* since we perform a blocking receive and continue otherwise,
        either we never reach here or we definitely have work! */
     // from here: non-empty run queue
@@ -435,7 +437,9 @@ schedule (Capability *initialCapability, Task *task)
                                above, waits for messages as well! */
       processMessages(cap, &receivedFinish);
     }
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL: non-empty run queue!
+
+#endif /* THREADED_RTS || PARALLEL_HASKELL */
 
     scheduleDetectDeadlock(cap,task);
 #if defined(THREADED_RTS)
@@ -504,7 +508,7 @@ schedule (Capability *initialCapability, Task *task)
      */
     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
        && !emptyThreadQueues(cap)) {
-       context_switch = 1;
+       cap->context_switch = 1;
     }
         
 run_thread:
@@ -627,7 +631,7 @@ run_thread:
     CCCS = CCS_SYSTEM;
 #endif
     
-    schedulePostRunThread(t);
+    schedulePostRunThread(cap,t);
 
     t = threadStackUnderflow(task,t);
 
@@ -684,11 +688,15 @@ schedulePreLoop(void)
  * Push work to other Capabilities if we have some.
  * -------------------------------------------------------------------------- */
 
-#if defined(THREADED_RTS)
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
 static void
 schedulePushWork(Capability *cap USED_IF_THREADS, 
                 Task *task      USED_IF_THREADS)
 {
+  /* following code not for PARALLEL_HASKELL. I kept the call general,
+     future GUM versions might use pushing in a distributed setup */
+#if defined(THREADED_RTS)
+
     Capability *free_caps[n_capabilities], *cap0;
     nat i, n_free_caps;
 
@@ -731,7 +739,12 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
        StgTSO *prev, *t, *next;
        rtsBool pushed_to_all;
 
-       debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
+       debugTrace(DEBUG_sched, 
+                  "cap %d: %s and %d free capabilities, sharing...", 
+                  cap->no, 
+                  (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
+                  "excess threads on run queue":"sparks to share (>=2)",
+                  n_free_caps);
 
        i = 0;
        pushed_to_all = rtsFalse;
@@ -765,6 +778,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
            cap->run_queue_tl = prev;
        }
 
+#ifdef SPARK_PUSHING
+       /* JB I left this code in place, it would work but is not necessary */
+
        // If there are some free capabilities that we didn't push any
        // threads to, then try to push a spark to each one.
        if (!pushed_to_all) {
@@ -780,16 +796,23 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                }
            }
        }
+#endif /* SPARK_PUSHING */
 
        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseCapability(free_caps[i]);
        }
+       // now wake them all up, and they might steal sparks if
+       // the did not get a thread
+       prodAllCapabilities();
     }
     task->cap = cap; // reset to point to our Capability.
+
+#endif /* THREADED_RTS */
+
 }
-#endif
+#endif /* THREADED_RTS || PARALLEL_HASKELL */
 
 /* ----------------------------------------------------------------------------
  * Start any pending signal handlers
@@ -970,7 +993,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
  * ------------------------------------------------------------------------- */
 
 #if defined(PARALLEL_HASKELL)
-static StgTSO *
+static void
 scheduleSendPendingMessages(void)
 {
 
@@ -989,10 +1012,10 @@ scheduleSendPendingMessages(void)
 #endif
 
 /* ----------------------------------------------------------------------------
- * Activate spark threads (PARALLEL_HASKELL only)
+ * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
  * ------------------------------------------------------------------------- */
 
-#if defined(PARALLEL_HASKELL)
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
 static void
 scheduleActivateSpark(Capability *cap)
 {
@@ -1017,14 +1040,14 @@ scheduleActivateSpark(Capability *cap)
       createSparkThread(cap,spark); // defined in Sparks.c
     }
 }
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL || THREADED_RTS
 
 /* ----------------------------------------------------------------------------
  * Get work from a remote node (PARALLEL_HASKELL only)
  * ------------------------------------------------------------------------- */
     
-#if defined(PARALLEL_HASKELL)
-static rtsBool
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+static rtsBool /* return value used in PARALLEL_HASKELL only */
 scheduleGetRemoteWork(Capability *cap)
 {
 #if defined(PARALLEL_HASKELL)
@@ -1062,14 +1085,14 @@ scheduleGetRemoteWork(Capability *cap)
 
 #endif /* PARALLEL_HASKELL */
 }
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL || THREADED_RTS
 
 /* ----------------------------------------------------------------------------
  * After running a thread...
  * ------------------------------------------------------------------------- */
 
 static void
-schedulePostRunThread (StgTSO *t)
+schedulePostRunThread (Capability *cap, StgTSO *t)
 {
     // We have to be able to catch transactions that are in an
     // infinite loop as a result of seeing an inconsistent view of
@@ -1090,8 +1113,7 @@ schedulePostRunThread (StgTSO *t)
             // ATOMICALLY_FRAME, aborting the (nested)
             // transaction, and saving the stack of any
             // partially-evaluated thunks on the heap.
-            throwToSingleThreaded_(&capabilities[0], t, 
-                                   NULL, rtsTrue, NULL);
+            throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
             
             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
         }
@@ -1179,12 +1201,12 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
               "--<< thread %ld (%s) stopped: HeapOverflow",
               (long)t->id, whatNext_strs[t->what_next]);
 
-    if (context_switch) {
+    if (cap->context_switch) {
         // Sometimes we miss a context switch, e.g. when calling
         // primitives in a tight loop, MAYBE_GC() doesn't check the
         // context switch flag, and we end up waiting for a GC.
         // See #1984, and concurrent/should_run/1984
-        context_switch = 0;
+        cap->context_switch = 0;
         addToRunQueue(cap,t);
     } else {
         pushOnRunQueue(cap,t);
@@ -1234,7 +1256,7 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
     // the CPU because the tick always arrives during GC).  This way
     // penalises threads that do a lot of allocation, but that seems
     // better than the alternative.
-    context_switch = 0;
+    cap->context_switch = 0;
     
     /* put the thread back on the run queue.  Then, if we're ready to
      * GC, check whether this is the last task to stop.  If so, wake
@@ -1435,6 +1457,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
        return cap;  // NOTE: task->cap might have changed here
     }
 
+    setContextSwitches();
     for (i=0; i < n_capabilities; i++) {
        debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
        if (cap != &capabilities[i]) {
@@ -1445,7 +1468,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
            // all the Capabilities, but even so it's a slightly
            // unsavoury invariant.
            task->cap = pcap;
-           context_switch = 1;
            waitForReturnCapability(&pcap, task);
            if (pcap != &capabilities[i]) {
                barf("scheduleDoGC: got the wrong capability");
@@ -1489,6 +1511,14 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
        performHeapProfile = rtsFalse;
     }
 
+#ifdef SPARKBALANCE
+    /* JB 
+       Once we are all together... this would be the place to balance all
+       spark pools. No concurrent stealing or adding of new sparks can
+       occur. Should be defined in Sparks.c. */
+    balanceSparkPoolsCaps(n_capabilities, capabilities);
+#endif
+
 #if defined(THREADED_RTS)
     // release our stash of capabilities.
     for (i = 0; i < n_capabilities; i++) {
@@ -1954,7 +1984,6 @@ initScheduler(void)
 
   blackhole_queue   = END_TSO_QUEUE;
 
-  context_switch = 0;
   sched_state    = SCHED_RUNNING;
   recent_activity = ACTIVITY_YES;
 
@@ -2132,10 +2161,17 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
   }
 
   /* Try to double the current stack size.  If that takes us over the
-   * maximum stack size for this thread, then use the maximum instead.
-   * Finally round up so the TSO ends up as a whole number of blocks.
+   * maximum stack size for this thread, then use the maximum instead
+   * (that is, unless we're already at or over the max size and we
+   * can't raise the StackOverflow exception (see above), in which
+   * case just double the size). Finally round up so the TSO ends up as
+   * a whole number of blocks.
    */
-  new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
+  if (tso->stack_size >= tso->max_stack_size) {
+      new_stack_size = tso->stack_size * 2;
+  } else { 
+      new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
+  }
   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
                                       TSO_STRUCT_SIZE)/sizeof(W_);
   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
@@ -2192,7 +2228,7 @@ static StgTSO *
 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
 {
     bdescr *bd, *new_bd;
-    lnat new_tso_size_w, tso_size_w;
+    lnat free_w, tso_size_w;
     StgTSO *new_tso;
 
     tso_size_w = tso_sizeW(tso);
@@ -2207,19 +2243,19 @@ threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
     // while we are moving the TSO:
     lockClosure((StgClosure *)tso);
 
-    new_tso_size_w = round_to_mblocks(tso_size_w/2);
-
-    debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
-               (long)tso->id, tso_size_w, new_tso_size_w);
+    // this is the number of words we'll free
+    free_w = round_to_mblocks(tso_size_w/2);
 
     bd = Bdescr((StgPtr)tso);
-    new_bd = splitLargeBlock(bd, new_tso_size_w / BLOCK_SIZE_W);
-    new_bd->free = bd->free;
+    new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
     bd->free = bd->start + TSO_STRUCT_SIZEW;
 
     new_tso = (StgTSO *)new_bd->start;
     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
-    new_tso->stack_size = new_tso_size_w - TSO_STRUCT_SIZEW;
+    new_tso->stack_size = new_bd->free - new_tso->stack;
+
+    debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
+               (long)tso->id, tso_size_w, tso_sizeW(new_tso));
 
     tso->what_next = ThreadRelocated;
     tso->_link = new_tso; // no write barrier reqd: same generation
@@ -2247,7 +2283,7 @@ void
 interruptStgRts(void)
 {
     sched_state = SCHED_INTERRUPTING;
-    context_switch = 1;
+    setContextSwitches();
     wakeUpRts();
 }