Add a new primitive forkOn#, for forking a thread on a specific Capability
[ghc-hetmet.git] / ghc / rts / Schedule.c
index d49d4ed..52fd4d5 100644 (file)
@@ -204,6 +204,7 @@ static void schedulePushWork(Capability *cap, Task *task);
 #endif
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
+static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
 static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
 #if defined(GRAN)
@@ -482,20 +483,7 @@ schedule (Capability *initialCapability, Task *task)
     // list each time around the scheduler.
     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
 
-    // Any threads that were woken up by other Capabilities get
-    // appended to our run queue.
-    if (!emptyWakeupQueue(cap)) {
-       ACQUIRE_LOCK(&cap->lock);
-       if (emptyRunQueue(cap)) {
-           cap->run_queue_hd = cap->wakeup_queue_hd;
-           cap->run_queue_tl = cap->wakeup_queue_tl;
-       } else {
-           cap->run_queue_tl->link = cap->wakeup_queue_hd;
-           cap->run_queue_tl = cap->wakeup_queue_tl;
-       }
-       cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
-       RELEASE_LOCK(&cap->lock);
-    }
+    scheduleCheckWakeupThreads(cap);
 
     scheduleCheckBlockedThreads(cap);
 
@@ -841,7 +829,8 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                next = t->link;
                t->link = END_TSO_QUEUE;
                if (t->what_next == ThreadRelocated
-                   || t->bound == task) { // don't move my bound thread
+                   || t->bound == task // don't move my bound thread
+                   || tsoLocked(t)) {  // don't move a locked thread
                    prev->link = t;
                    prev = t;
                } else if (i == n_free_caps) {
@@ -928,6 +917,31 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
 
 
 /* ----------------------------------------------------------------------------
+ * Check for threads woken up by other Capabilities
+ * ------------------------------------------------------------------------- */
+
+static void
+scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+    // Any threads that were woken up by other Capabilities get
+    // appended to our run queue.
+    if (!emptyWakeupQueue(cap)) {
+       ACQUIRE_LOCK(&cap->lock);
+       if (emptyRunQueue(cap)) {
+           cap->run_queue_hd = cap->wakeup_queue_hd;
+           cap->run_queue_tl = cap->wakeup_queue_tl;
+       } else {
+           cap->run_queue_tl->link = cap->wakeup_queue_hd;
+           cap->run_queue_tl = cap->wakeup_queue_tl;
+       }
+       cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
+       RELEASE_LOCK(&cap->lock);
+    }
+#endif
+}
+
+/* ----------------------------------------------------------------------------
  * Check for threads blocked on BLACKHOLEs that can be woken up
  * ------------------------------------------------------------------------- */
 static void
@@ -2709,6 +2723,28 @@ scheduleThread(Capability *cap, StgTSO *tso)
     appendToRunQueue(cap,tso);
 }
 
+void
+scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
+{
+#if defined(THREADED_RTS)
+    tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
+                             // move this thread from now on.
+    cpu %= RtsFlags.ParFlags.nNodes;
+    if (cpu == cap->no) {
+       appendToRunQueue(cap,tso);
+    } else {
+       Capability *target_cap = &capabilities[cpu];
+       if (tso->bound) {
+           tso->bound->cap = target_cap;
+       }
+       tso->cap = target_cap;
+       wakeupThreadOnCapability(target_cap,tso);
+    }
+#else
+    appendToRunQueue(cap,tso);
+#endif
+}
+
 Capability *
 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
 {
@@ -3244,7 +3280,8 @@ unblockOne(Capability *cap, StgTSO *tso)
   next = tso->link;
   tso->link = END_TSO_QUEUE;
 
-  if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) {
+#if defined(THREADED_RTS)
+  if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) {
       // We are waking up this thread on the current Capability, which
       // might involve migrating it from the Capability it was last on.
       if (tso->bound) {
@@ -3260,6 +3297,10 @@ unblockOne(Capability *cap, StgTSO *tso)
       // we'll try to wake it up on the Capability it was last on.
       wakeupThreadOnCapability(tso->cap, tso);
   }
+#else
+  appendToRunQueue(cap,tso);
+  context_switch = 1;
+#endif
 
   IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
   return next;