Add some more flexibility to the multiproc scheduler
[ghc-hetmet.git] / ghc / rts / Schedule.c
index 39aa5e2..d49d4ed 100644 (file)
@@ -482,6 +482,21 @@ schedule (Capability *initialCapability, Task *task)
     // list each time around the scheduler.
     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
 
+    // Any threads that were woken up by other Capabilities get
+    // appended to our run queue.
+    if (!emptyWakeupQueue(cap)) {
+       ACQUIRE_LOCK(&cap->lock);
+       if (emptyRunQueue(cap)) {
+           cap->run_queue_hd = cap->wakeup_queue_hd;
+           cap->run_queue_tl = cap->wakeup_queue_tl;
+       } else {
+           cap->run_queue_tl->link = cap->wakeup_queue_hd;
+           cap->run_queue_tl = cap->wakeup_queue_tl;
+       }
+       cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
+       RELEASE_LOCK(&cap->lock);
+    }
+
     scheduleCheckBlockedThreads(cap);
 
     scheduleDetectDeadlock(cap,task);
@@ -604,6 +619,7 @@ run_thread:
     // Run the current thread 
 
     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+    ASSERT(t->cap == cap);
 
     prev_what_next = t->what_next;
 
@@ -674,6 +690,7 @@ run_thread:
 #endif
 
     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+    ASSERT(t->cap == cap);
 
     // ----------------------------------------------------------------------
     
@@ -772,6 +789,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
     Capability *free_caps[n_capabilities], *cap0;
     nat i, n_free_caps;
 
+    // migration can be turned off with +RTS -qg
+    if (!RtsFlags.ParFlags.migrate) return;
+
     // Check whether we have more threads on our run queue, or sparks
     // in our pool, that we could hand to another Capability.
     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
@@ -834,6 +854,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                    IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
                    appendToRunQueue(free_caps[i],t);
                    if (t->bound) { t->bound->cap = free_caps[i]; }
+                   t->cap = free_caps[i];
                    i++;
                }
            }
@@ -2491,6 +2512,7 @@ createThread(Capability *cap, nat size)
     
     tso->saved_errno = 0;
     tso->bound = NULL;
+    tso->cap = cap;
     
     tso->stack_size     = stack_size;
     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
@@ -2698,6 +2720,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
     // This TSO is now a bound thread; make the Task and TSO
     // point to each other.
     tso->bound = task;
+    tso->cap = cap;
 
     task->tso = tso;
     task->ret = ret;
@@ -2905,16 +2928,21 @@ GetRoots( evac_fn evac )
 
     for (i = 0; i < n_capabilities; i++) {
        cap = &capabilities[i];
-       evac((StgClosure **)&cap->run_queue_hd);
-       evac((StgClosure **)&cap->run_queue_tl);
-       
+       evac((StgClosure **)(void *)&cap->run_queue_hd);
+       evac((StgClosure **)(void *)&cap->run_queue_tl);
+#if defined(THREADED_RTS)
+       evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
+       evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
+#endif
        for (task = cap->suspended_ccalling_tasks; task != NULL; 
             task=task->next) {
            IF_DEBUG(scheduler,sched_belch("evac'ing suspended TSO %d", task->suspended_tso->id));
-           evac((StgClosure **)&task->suspended_tso);
+           evac((StgClosure **)(void *)&task->suspended_tso);
        }
+
     }
     
+
 #if !defined(THREADED_RTS)
     evac((StgClosure **)(void *)&blocked_queue_hd);
     evac((StgClosure **)(void *)&blocked_queue_tl);
@@ -3211,21 +3239,29 @@ unblockOne(Capability *cap, StgTSO *tso)
 
   ASSERT(get_itbl(tso)->type == TSO);
   ASSERT(tso->why_blocked != NotBlocked);
+
   tso->why_blocked = NotBlocked;
   next = tso->link;
   tso->link = END_TSO_QUEUE;
 
-  // We might have just migrated this TSO to our Capability:
-  if (tso->bound) {
-      tso->bound->cap = cap;
+  if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) {
+      // We are waking up this thread on the current Capability, which
+      // might involve migrating it from the Capability it was last on.
+      if (tso->bound) {
+         ASSERT(tso->bound->cap == tso->cap);
+         tso->bound->cap = cap;
+      }
+      tso->cap = cap;
+      appendToRunQueue(cap,tso);
+      // we're holding a newly woken thread, make sure we context switch
+      // quickly so we can migrate it if necessary.
+      context_switch = 1;
+  } else {
+      // we'll try to wake it up on the Capability it was last on.
+      wakeupThreadOnCapability(tso->cap, tso);
   }
 
-  appendToRunQueue(cap,tso);
-
-  // we're holding a newly woken thread, make sure we context switch
-  // quickly so we can migrate it if necessary.
-  context_switch = 1;
-  IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
+  IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
   return next;
 }
 
@@ -3675,6 +3711,7 @@ unblockThread(Capability *cap, StgTSO *tso)
   if (tso->bound) {
       tso->bound->cap = cap;
   }
+  tso->cap = cap;
 }
 #endif
 
@@ -4171,13 +4208,8 @@ resurrectThreads (StgTSO *threads)
        all_threads = tso;
        IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
        
-       // Wake up the thread on the Capability it was last on for a
-       // bound thread, or last_free_capability otherwise.
-       if (tso->bound) {
-           cap = tso->bound->cap;
-       } else {
-           cap = last_free_capability;
-       }
+       // Wake up the thread on the Capability it was last on
+       cap = tso->cap;
        
        switch (tso->why_blocked) {
        case BlockedOnMVar: