Add some more flexibility to the multiproc scheduler
authorSimon Marlow <simonmar@microsoft.com>
Fri, 24 Mar 2006 16:45:32 +0000 (16:45 +0000)
committerSimon Marlow <simonmar@microsoft.com>
Fri, 24 Mar 2006 16:45:32 +0000 (16:45 +0000)
There are two new options in the -threaded RTS:

  -qm       Don't automatically migrate threads between CPUs
  -qw       Migrate a thread to the current CPU when it is woken up

previously both of these were effectively off, i.e. threads were
migrated between CPUs willy-milly, and threads were always migrated to
the current CPU when woken up.  This is the first step in tweaking the
scheduling for more effective work balancing, there will no doubt be
more to come.

ghc/includes/RtsFlags.h
ghc/includes/TSO.h
ghc/rts/Apply.cmm
ghc/rts/Capability.c
ghc/rts/Capability.h
ghc/rts/Exception.cmm
ghc/rts/RtsFlags.c
ghc/rts/Schedule.c
ghc/rts/Schedule.h

index c997b22..17d2363 100644 (file)
@@ -164,6 +164,8 @@ struct PAR_FLAGS {
 #ifdef THREADED_RTS
 struct PAR_FLAGS {
   nat            nNodes;         /* number of threads to run simultaneously */
+  rtsBool        migrate;        /* migrate threads between capabilities */
+  rtsBool        wakeupMigrate;  /* migrate a thread on wakeup */
   unsigned int  maxLocalSparks;
 };
 #endif /* THREADED_RTS */
index 82ecacb..14c47ab 100644 (file)
@@ -93,6 +93,8 @@ typedef StgWord32 StgThreadID;
  */
 #define TSO_DIRTY   1
 
+#define tsoDirty(tso) ((tso)->flags & TSO_DIRTY)
+
 /*
  * Type returned after running a thread.  Values of this type
  * include HeapOverflow, StackOverflow etc.  See Constants.h for the
@@ -134,43 +136,44 @@ typedef union {
  */
 
 typedef struct StgTSO_ {
-  StgHeader          header;
-
-  struct StgTSO_*    link;          /* Links threads onto blocking queues */
-  struct StgTSO_*    global_link;    /* Links all threads together */
-  
-  StgWord16          what_next;      /* Values defined in Constants.h */
-  StgWord16          why_blocked;    /* Values defined in Constants.h */
-  StgWord32          flags;
-  StgTSOBlockInfo    block_info;
-  struct StgTSO_*    blocked_exceptions;
-  StgThreadID        id;
-  int                saved_errno;
-  struct Task_*      bound;          // non-NULL for a bound thread
-  struct StgTRecHeader_ *trec;       /* STM transaction record */
-  
+    StgHeader               header;
+
+    struct StgTSO_*         link;       /* Links threads onto blocking queues */
+    struct StgTSO_*         global_link;    /* Links all threads together */
+    
+    StgWord16               what_next;      /* Values defined in Constants.h */
+    StgWord16               why_blocked;    /* Values defined in Constants.h */
+    StgWord32               flags;
+    StgTSOBlockInfo         block_info;
+    struct StgTSO_*         blocked_exceptions;
+    StgThreadID             id;
+    int                     saved_errno;
+    struct Task_*           bound;
+    struct Capability_*     cap;
+    struct StgTRecHeader_ * trec;       /* STM transaction record */
+
 #ifdef TICKY_TICKY
-  /* TICKY-specific stuff would go here. */
+    /* TICKY-specific stuff would go here. */
 #endif
 #ifdef PROFILING
-   StgTSOProfInfo prof;
+    StgTSOProfInfo prof;
 #endif
 #ifdef PAR
-   StgTSOParInfo par;
+    StgTSOParInfo par;
 #endif
 #ifdef GRAN
-   StgTSOGranInfo gran;
+    StgTSOGranInfo gran;
 #endif
 #ifdef DIST
-   StgTSODistInfo dist;
+    StgTSODistInfo dist;
 #endif
 
-  /* The thread stack... */
-  StgWord           stack_size;     /* stack size in *words* */
-  StgWord            max_stack_size; /* maximum stack size in *words* */
-  StgPtr             sp;
-  
-  StgWord            stack[FLEXIBLE_ARRAY];
+    /* The thread stack... */
+    StgWord32         stack_size;     /* stack size in *words* */
+    StgWord32          max_stack_size; /* maximum stack size in *words* */
+    StgPtr             sp;
+    
+    StgWord            stack[FLEXIBLE_ARRAY];
 } StgTSO;
 
 /* -----------------------------------------------------------------------------
index 6678a63..e0ca039 100644 (file)
@@ -32,7 +32,7 @@ stg_ap_0_fast
     IF_DEBUG(sanity,
        foreign "C" checkStackChunk(Sp "ptr",
                                    CurrentTSO + TSO_OFFSET_StgTSO_stack +
-                                   WDS(StgTSO_stack_size(CurrentTSO)) "ptr") [R1]);
+                                   WDS(TO_W_(StgTSO_stack_size(CurrentTSO))) "ptr") [R1]);
 
     ENTER();
 }
index 143eefe..51a42ef 100644 (file)
@@ -67,7 +67,9 @@ anyWorkForMe( Capability *cap, Task *task )
        // other global condition to check, such as threads blocked on
        // blackholes).
        if (emptyRunQueue(cap)) {
-           return !emptySparkPoolCap(cap) || globalWorkToDo();
+           return !emptySparkPoolCap(cap)
+               || !emptyWakeupQueue(cap)
+               || globalWorkToDo();
        } else
            return cap->run_queue_hd->bound == NULL;
     }
@@ -135,6 +137,8 @@ initCapability( Capability *cap, nat i )
     cap->suspended_ccalling_tasks = NULL;
     cap->returning_tasks_hd = NULL;
     cap->returning_tasks_tl = NULL;
+    cap->wakeup_queue_hd    = END_TSO_QUEUE;
+    cap->wakeup_queue_tl    = END_TSO_QUEUE;
 #endif
 
     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
@@ -296,7 +300,8 @@ releaseCapability_ (Capability* cap)
 
     // If we have an unbound thread on the run queue, or if there's
     // anything else to do, give the Capability to a worker thread.
-    if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
+    if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap)
+             || !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue;
@@ -502,6 +507,37 @@ yieldCapability (Capability** pCap, Task *task)
 }
 
 /* ----------------------------------------------------------------------------
+ * Wake up a thread on a Capability.
+ *
+ * This is used when the current Task is running on a Capability and
+ * wishes to wake up a thread on a different Capability.
+ * ------------------------------------------------------------------------- */
+
+void
+wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
+{
+    ASSERT(tso->cap == cap);
+    ASSERT(tso->bound ? tso->bound->cap == cap : 1);
+
+    ACQUIRE_LOCK(&cap->lock);
+    if (cap->running_task == NULL) {
+       // nobody is running this Capability, we can add our thread
+       // directly onto the run queue and start up a Task to run it.
+       appendToRunQueue(cap,tso);
+
+       // start it up
+       cap->running_task = myTask(); // precond for releaseCapability_()
+       releaseCapability_(cap);
+    } else {
+       appendToWakeupQueue(cap,tso);
+       // someone is running on this Capability, so it cannot be
+       // freed without first checking the wakeup queue (see
+       // releaseCapability_).
+    }
+    RELEASE_LOCK(&cap->lock);
+}
+
+/* ----------------------------------------------------------------------------
  * prodCapabilities
  *
  * Used to indicate that the interrupted flag is now set, or some
index 50e0b94..a2551d0 100644 (file)
@@ -70,7 +70,7 @@ struct Capability_ {
     // Worker Tasks waiting in the wings.  Singly-linked.
     Task *spare_workers;
 
-    // This lock protects running_task and returning_tasks_{hd,tl}.
+    // This lock protects running_task, returning_tasks_{hd,tl}, wakeup_queue.
     Mutex lock;
 
     // Tasks waiting to return from a foreign call, or waiting to make
@@ -80,6 +80,12 @@ struct Capability_ {
     // check whether it is NULL without taking the lock, however.
     Task *returning_tasks_hd; // Singly-linked, with head/tail
     Task *returning_tasks_tl;
+
+    // A list of threads to append to this Capability's run queue at
+    // the earliest opportunity.  These are threads that have been
+    // woken up by another Capability.
+    StgTSO *wakeup_queue_hd;
+    StgTSO *wakeup_queue_tl;
 #endif
 
     // Per-capability STM-related data
@@ -189,6 +195,11 @@ void yieldCapability (Capability** pCap, Task *task);
 //
 void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);
 
+// Wakes up a thread on a Capability (probably a different Capability
+// from the one held by the current Task).
+//
+void wakeupThreadOnCapability (Capability *cap, StgTSO *tso);
+
 // Wakes up a worker thread on just one Capability, used when we
 // need to service some global event.
 //
index 4bb9e48..b5c2962 100644 (file)
@@ -380,7 +380,7 @@ retry_pop_stack:
         * entry code in StgStartup.cmm.
         */
        Sp = CurrentTSO + TSO_OFFSET_StgTSO_stack 
-               + WDS(StgTSO_stack_size(CurrentTSO)) - WDS(2);
+               + WDS(TO_W_(StgTSO_stack_size(CurrentTSO))) - WDS(2);
        Sp(1) = R1;             /* save the exception */
        Sp(0) = stg_enter_info; /* so that GC can traverse this stack */
        StgTSO_what_next(CurrentTSO) = ThreadKilled::I16;
index f24912f..0f83b33 100644 (file)
@@ -219,6 +219,8 @@ void initRtsFlagsDefaults(void)
 
 #ifdef THREADED_RTS
     RtsFlags.ParFlags.nNodes           = 1;
+    RtsFlags.ParFlags.migrate           = rtsTrue;
+    RtsFlags.ParFlags.wakeupMigrate     = rtsFalse;
 #endif
 
 #ifdef PAR
@@ -437,6 +439,8 @@ usage_text[] = {
 #endif /* DEBUG */
 #if defined(THREADED_RTS)
 "  -N<n>     Use <n> OS threads (default: 1)",
+"  -qm       Don't automatically migrate threads between CPUs",
+"  -qw       Migrate a thread to the current CPU when it is woken up",
 #endif
 #if defined(THREADED_RTS) || defined(PAR)
 "  -e<size>  Size of spark pools (default 100)",
@@ -1049,6 +1053,25 @@ error = rtsTrue;
                    }
                }
                ) break;
+
+           case 'q':
+                   switch (rts_argv[arg][2]) {
+                   case '\0':
+                       errorBelch("incomplete RTS option: %s",rts_argv[arg]);
+                       error = rtsTrue;
+                       break;
+                   case 'm':
+                       RtsFlags.ParFlags.migrate = rtsFalse;
+                       break;
+                   case 'w':
+                       RtsFlags.ParFlags.wakeupMigrate = rtsTrue;
+                       break;
+                   default:
+                       errorBelch("unknown RTS option: %s",rts_argv[arg]);
+                       error = rtsTrue;
+                       break;
+                   }
+                   break;
 #endif
              /* =========== PARALLEL =========================== */
              case 'e':
@@ -1063,10 +1086,12 @@ error = rtsTrue;
                }
                ) break;
 
+#ifdef PAR
              case 'q':
                PAR_BUILD_ONLY(
                  process_par_option(arg, rts_argc, rts_argv, &error);
                ) break;
+#endif
 
              /* =========== GRAN =============================== */
 
index 39aa5e2..d49d4ed 100644 (file)
@@ -482,6 +482,21 @@ schedule (Capability *initialCapability, Task *task)
     // list each time around the scheduler.
     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
 
+    // Any threads that were woken up by other Capabilities get
+    // appended to our run queue.
+    if (!emptyWakeupQueue(cap)) {
+       ACQUIRE_LOCK(&cap->lock);
+       if (emptyRunQueue(cap)) {
+           cap->run_queue_hd = cap->wakeup_queue_hd;
+           cap->run_queue_tl = cap->wakeup_queue_tl;
+       } else {
+           cap->run_queue_tl->link = cap->wakeup_queue_hd;
+           cap->run_queue_tl = cap->wakeup_queue_tl;
+       }
+       cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
+       RELEASE_LOCK(&cap->lock);
+    }
+
     scheduleCheckBlockedThreads(cap);
 
     scheduleDetectDeadlock(cap,task);
@@ -604,6 +619,7 @@ run_thread:
     // Run the current thread 
 
     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+    ASSERT(t->cap == cap);
 
     prev_what_next = t->what_next;
 
@@ -674,6 +690,7 @@ run_thread:
 #endif
 
     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+    ASSERT(t->cap == cap);
 
     // ----------------------------------------------------------------------
     
@@ -772,6 +789,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
     Capability *free_caps[n_capabilities], *cap0;
     nat i, n_free_caps;
 
+    // migration can be turned off with +RTS -qg
+    if (!RtsFlags.ParFlags.migrate) return;
+
     // Check whether we have more threads on our run queue, or sparks
     // in our pool, that we could hand to another Capability.
     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
@@ -834,6 +854,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                    IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
                    appendToRunQueue(free_caps[i],t);
                    if (t->bound) { t->bound->cap = free_caps[i]; }
+                   t->cap = free_caps[i];
                    i++;
                }
            }
@@ -2491,6 +2512,7 @@ createThread(Capability *cap, nat size)
     
     tso->saved_errno = 0;
     tso->bound = NULL;
+    tso->cap = cap;
     
     tso->stack_size     = stack_size;
     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
@@ -2698,6 +2720,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
     // This TSO is now a bound thread; make the Task and TSO
     // point to each other.
     tso->bound = task;
+    tso->cap = cap;
 
     task->tso = tso;
     task->ret = ret;
@@ -2905,16 +2928,21 @@ GetRoots( evac_fn evac )
 
     for (i = 0; i < n_capabilities; i++) {
        cap = &capabilities[i];
-       evac((StgClosure **)&cap->run_queue_hd);
-       evac((StgClosure **)&cap->run_queue_tl);
-       
+       evac((StgClosure **)(void *)&cap->run_queue_hd);
+       evac((StgClosure **)(void *)&cap->run_queue_tl);
+#if defined(THREADED_RTS)
+       evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
+       evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
+#endif
        for (task = cap->suspended_ccalling_tasks; task != NULL; 
             task=task->next) {
            IF_DEBUG(scheduler,sched_belch("evac'ing suspended TSO %d", task->suspended_tso->id));
-           evac((StgClosure **)&task->suspended_tso);
+           evac((StgClosure **)(void *)&task->suspended_tso);
        }
+
     }
     
+
 #if !defined(THREADED_RTS)
     evac((StgClosure **)(void *)&blocked_queue_hd);
     evac((StgClosure **)(void *)&blocked_queue_tl);
@@ -3211,21 +3239,29 @@ unblockOne(Capability *cap, StgTSO *tso)
 
   ASSERT(get_itbl(tso)->type == TSO);
   ASSERT(tso->why_blocked != NotBlocked);
+
   tso->why_blocked = NotBlocked;
   next = tso->link;
   tso->link = END_TSO_QUEUE;
 
-  // We might have just migrated this TSO to our Capability:
-  if (tso->bound) {
-      tso->bound->cap = cap;
+  if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) {
+      // We are waking up this thread on the current Capability, which
+      // might involve migrating it from the Capability it was last on.
+      if (tso->bound) {
+         ASSERT(tso->bound->cap == tso->cap);
+         tso->bound->cap = cap;
+      }
+      tso->cap = cap;
+      appendToRunQueue(cap,tso);
+      // we're holding a newly woken thread, make sure we context switch
+      // quickly so we can migrate it if necessary.
+      context_switch = 1;
+  } else {
+      // we'll try to wake it up on the Capability it was last on.
+      wakeupThreadOnCapability(tso->cap, tso);
   }
 
-  appendToRunQueue(cap,tso);
-
-  // we're holding a newly woken thread, make sure we context switch
-  // quickly so we can migrate it if necessary.
-  context_switch = 1;
-  IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
+  IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
   return next;
 }
 
@@ -3675,6 +3711,7 @@ unblockThread(Capability *cap, StgTSO *tso)
   if (tso->bound) {
       tso->bound->cap = cap;
   }
+  tso->cap = cap;
 }
 #endif
 
@@ -4171,13 +4208,8 @@ resurrectThreads (StgTSO *threads)
        all_threads = tso;
        IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
        
-       // Wake up the thread on the Capability it was last on for a
-       // bound thread, or last_free_capability otherwise.
-       if (tso->bound) {
-           cap = tso->bound->cap;
-       } else {
-           cap = last_free_capability;
-       }
+       // Wake up the thread on the Capability it was last on
+       cap = tso->cap;
        
        switch (tso->why_blocked) {
        case BlockedOnMVar:
index 63dfeb7..d11162e 100644 (file)
@@ -259,6 +259,20 @@ appendToBlockedQueue(StgTSO *tso)
 }
 #endif
 
+#if defined(THREADED_RTS)
+STATIC_INLINE void
+appendToWakeupQueue (Capability *cap, StgTSO *tso)
+{
+    ASSERT(tso->link == END_TSO_QUEUE);
+    if (cap->wakeup_queue_hd == END_TSO_QUEUE) {
+       cap->wakeup_queue_hd = tso;
+    } else {
+       cap->wakeup_queue_tl->link = tso;
+    }
+    cap->wakeup_queue_tl = tso;
+}
+#endif
+
 /* Check whether various thread queues are empty
  */
 STATIC_INLINE rtsBool
@@ -273,6 +287,14 @@ emptyRunQueue(Capability *cap)
     return emptyQueue(cap->run_queue_hd);
 }
 
+#if defined(THREADED_RTS)
+STATIC_INLINE rtsBool
+emptyWakeupQueue(Capability *cap)
+{
+    return emptyQueue(cap->wakeup_queue_hd);
+}
+#endif
+
 #if !defined(THREADED_RTS)
 #define EMPTY_BLOCKED_QUEUE()  (emptyQueue(blocked_queue_hd))
 #define EMPTY_SLEEPING_QUEUE() (emptyQueue(sleeping_queue))