Add a new primitive forkOn#, for forking a thread on a specific Capability
author    Simon Marlow <simonmar@microsoft.com>
          Mon, 27 Mar 2006 12:41:51 +0000 (12:41 +0000)
committer Simon Marlow <simonmar@microsoft.com>
          Mon, 27 Mar 2006 12:41:51 +0000 (12:41 +0000)
This gives some control over affinity, while we figure out the best
way to automatically schedule threads to make best use of the
available parallelism.

In addition to the primitive, there is also:

  GHC.Conc.forkOnIO :: Int -> IO () -> IO ThreadId

where 'forkOnIO i m' creates a thread running m on Capability (i `rem` N);
N is the number of available Capabilities, as set by +RTS -N.

Unlike normal threads, threads forked by forkOnIO do not automatically
migrate to free Capabilities.  Even so, if you're using forkOnIO
exclusively it's a good idea to run with +RTS -qm to disable work
pushing, because work pushing currently takes too long when the run
queues are large (something we need to fix).
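
As a usage sketch (not part of this patch): the loop below forks one
worker per Capability and waits for them all.  The Capability count is
hard-coded and assumed to match the +RTS -N value; the worker body is a
placeholder.

  import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
  import GHC.Conc (forkOnIO)

  main :: IO ()
  main = do
      let n = 4                                 -- assumed to match +RTS -N4
      dones <- mapM (const newEmptyMVar) [1 .. n]
      sequence_ [ forkOnIO i (worker i >> putMVar d ())
                | (i, d) <- zip [0 ..] dones ]
      mapM_ takeMVar dones                      -- wait for every worker
    where
      -- placeholder per-capability work
      worker :: Int -> IO ()
      worker i = putStrLn ("worker pinned to capability " ++ show i)

Built with -threaded and run with +RTS -N4 -qm, each worker should stay
on its own Capability for its whole lifetime.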

ghc/compiler/prelude/primops.txt.pp
ghc/includes/StgMiscClosures.h
ghc/includes/TSO.h
ghc/rts/PrimOps.cmm
ghc/rts/Schedule.c
ghc/rts/Schedule.h

diff --git a/ghc/compiler/prelude/primops.txt.pp b/ghc/compiler/prelude/primops.txt.pp
index ecde882..13b4b6c 100644
@@ -1441,6 +1441,13 @@ primop  ForkOp "fork#" GenPrimOp
    has_side_effects = True
    out_of_line      = True
 
+primop  ForkOnOp "forkOn#" GenPrimOp
+   Int# -> a -> State# RealWorld -> (# State# RealWorld, ThreadId# #)
+   with
+   usage            = { mangle ForkOnOp [mkO, mkP] mkR }
+   has_side_effects = True
+   out_of_line      = True
+
 primop  KillThreadOp "killThread#"  GenPrimOp
    ThreadId# -> a -> State# RealWorld -> State# RealWorld
    with
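
For reference, the Haskell-level wrapper over the new primop would look
roughly like this.  This is a sketch only: the module locations, the
ThreadId/IO constructors and the omitted child exception handler are
assumptions about the surrounding GHC.Conc code, not part of this patch.

  {-# LANGUAGE MagicHash, UnboxedTuples #-}

  import GHC.Exts   (Int (I#), forkOn#)
  import GHC.Conc   (ThreadId (ThreadId))
  import GHC.IOBase (IO (IO))

  -- Unbox the CPU argument and hand the action to the new primop.  The
  -- real wrapper would also install the same child exception handler
  -- that forkIO uses; that plumbing is omitted here.
  forkOnIO :: Int -> IO () -> IO ThreadId
  forkOnIO (I# cpu) action = IO $ \ s ->
      case forkOn# cpu action s of
        (# s1, tid #) -> (# s1, ThreadId tid #)
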
diff --git a/ghc/includes/StgMiscClosures.h b/ghc/includes/StgMiscClosures.h
index 62a7ed3..4a6a7c4 100644
@@ -579,6 +579,7 @@ RTS_FUN(makeStablePtrzh_fast);
 RTS_FUN(deRefStablePtrzh_fast);
 
 RTS_FUN(forkzh_fast);
+RTS_FUN(forkOnzh_fast);
 RTS_FUN(yieldzh_fast);
 RTS_FUN(killThreadzh_fast);
 RTS_FUN(blockAsyncExceptionszh_fast);
diff --git a/ghc/includes/TSO.h b/ghc/includes/TSO.h
index 14c47ab..d096d40 100644
@@ -93,7 +93,13 @@ typedef StgWord32 StgThreadID;
  */
 #define TSO_DIRTY   1
 
-#define tsoDirty(tso) ((tso)->flags & TSO_DIRTY)
+/*
+ * TSO_LOCKED is set when a TSO is locked to a particular Capability.
+ */
+#define TSO_LOCKED  2
+
+#define tsoDirty(tso)  ((tso)->flags & TSO_DIRTY)
+#define tsoLocked(tso) ((tso)->flags & TSO_LOCKED)
 
 /*
  * Type returned after running a thread.  Values of this type
diff --git a/ghc/rts/PrimOps.cmm b/ghc/rts/PrimOps.cmm
index 23bc22e..f1c214e 100644
@@ -876,19 +876,45 @@ decodeDoublezh_fast
 forkzh_fast
 {
   /* args: R1 = closure to spark */
-  
+
   MAYBE_GC(R1_PTR, forkzh_fast);
 
-  // create it right now, return ThreadID in R1
-  "ptr" R1 = foreign "C" createIOThread( MyCapability() "ptr", 
+  W_ closure;
+  W_ threadid;
+  closure = R1;
+
+  "ptr" threadid = foreign "C" createIOThread( MyCapability() "ptr", 
+                               RtsFlags_GcFlags_initialStkSize(RtsFlags), 
+                               closure "ptr") [];
+  foreign "C" scheduleThread(MyCapability() "ptr", threadid "ptr") [];
+
+  // switch at the earliest opportunity
+  CInt[context_switch] = 1 :: CInt;
+  
+  RET_P(threadid);
+}
+
+forkOnzh_fast
+{
+  /* args: R1 = cpu, R2 = closure to spark */
+
+  MAYBE_GC(R2_PTR, forkOnzh_fast);
+
+  W_ cpu;
+  W_ closure;
+  W_ threadid;
+  cpu = R1;
+  closure = R2;
+
+  "ptr" threadid = foreign "C" createIOThread( MyCapability() "ptr", 
                                RtsFlags_GcFlags_initialStkSize(RtsFlags), 
-                               R1 "ptr") [R1];
-  foreign "C" scheduleThread(MyCapability() "ptr", R1 "ptr") [R1];
+                               closure "ptr") [];
+  foreign "C" scheduleThreadOn(MyCapability() "ptr", cpu, threadid "ptr") [];
 
   // switch at the earliest opportunity
   CInt[context_switch] = 1 :: CInt;
   
-  RET_P(R1);
+  RET_P(threadid);
 }
 
 yieldzh_fast
diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c
index d49d4ed..52fd4d5 100644
@@ -204,6 +204,7 @@ static void schedulePushWork(Capability *cap, Task *task);
 #endif
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
+static void scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS);
 static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
 #if defined(GRAN)
@@ -482,20 +483,7 @@ schedule (Capability *initialCapability, Task *task)
     // list each time around the scheduler.
     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
 
-    // Any threads that were woken up by other Capabilities get
-    // appended to our run queue.
-    if (!emptyWakeupQueue(cap)) {
-       ACQUIRE_LOCK(&cap->lock);
-       if (emptyRunQueue(cap)) {
-           cap->run_queue_hd = cap->wakeup_queue_hd;
-           cap->run_queue_tl = cap->wakeup_queue_tl;
-       } else {
-           cap->run_queue_tl->link = cap->wakeup_queue_hd;
-           cap->run_queue_tl = cap->wakeup_queue_tl;
-       }
-       cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
-       RELEASE_LOCK(&cap->lock);
-    }
+    scheduleCheckWakeupThreads(cap);
 
     scheduleCheckBlockedThreads(cap);
 
@@ -841,7 +829,8 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                next = t->link;
                t->link = END_TSO_QUEUE;
                if (t->what_next == ThreadRelocated
-                   || t->bound == task) { // don't move my bound thread
+                   || t->bound == task // don't move my bound thread
+                   || tsoLocked(t)) {  // don't move a locked thread
                    prev->link = t;
                    prev = t;
                } else if (i == n_free_caps) {
@@ -928,6 +917,31 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
 
 
 /* ----------------------------------------------------------------------------
+ * Check for threads woken up by other Capabilities
+ * ------------------------------------------------------------------------- */
+
+static void
+scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+    // Any threads that were woken up by other Capabilities get
+    // appended to our run queue.
+    if (!emptyWakeupQueue(cap)) {
+       ACQUIRE_LOCK(&cap->lock);
+       if (emptyRunQueue(cap)) {
+           cap->run_queue_hd = cap->wakeup_queue_hd;
+           cap->run_queue_tl = cap->wakeup_queue_tl;
+       } else {
+           cap->run_queue_tl->link = cap->wakeup_queue_hd;
+           cap->run_queue_tl = cap->wakeup_queue_tl;
+       }
+       cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
+       RELEASE_LOCK(&cap->lock);
+    }
+#endif
+}
+
+/* ----------------------------------------------------------------------------
  * Check for threads blocked on BLACKHOLEs that can be woken up
  * ------------------------------------------------------------------------- */
 static void
@@ -2709,6 +2723,28 @@ scheduleThread(Capability *cap, StgTSO *tso)
     appendToRunQueue(cap,tso);
 }
 
+void
+scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
+{
+#if defined(THREADED_RTS)
+    tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
+                             // move this thread from now on.
+    cpu %= RtsFlags.ParFlags.nNodes;
+    if (cpu == cap->no) {
+       appendToRunQueue(cap,tso);
+    } else {
+       Capability *target_cap = &capabilities[cpu];
+       if (tso->bound) {
+           tso->bound->cap = target_cap;
+       }
+       tso->cap = target_cap;
+       wakeupThreadOnCapability(target_cap,tso);
+    }
+#else
+    appendToRunQueue(cap,tso);
+#endif
+}
+
 Capability *
 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
 {
@@ -3244,7 +3280,8 @@ unblockOne(Capability *cap, StgTSO *tso)
   next = tso->link;
   tso->link = END_TSO_QUEUE;
 
-  if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) {
+#if defined(THREADED_RTS)
+  if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) {
       // We are waking up this thread on the current Capability, which
       // might involve migrating it from the Capability it was last on.
       if (tso->bound) {
@@ -3260,6 +3297,10 @@ unblockOne(Capability *cap, StgTSO *tso)
       // we'll try to wake it up on the Capability it was last on.
       wakeupThreadOnCapability(tso->cap, tso);
   }
+#else
+  appendToRunQueue(cap,tso);
+  context_switch = 1;
+#endif
 
   IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
   return next;
diff --git a/ghc/rts/Schedule.h b/ghc/rts/Schedule.h
index d11162e..37b0794 100644
 void initScheduler (void);
 void exitScheduler (void);
 
-// Place a new thread on the run queue of the specified Capability
+// Place a new thread on the run queue of the current Capability
 void scheduleThread (Capability *cap, StgTSO *tso);
 
+// Place a new thread on the run queue of a specified Capability
+// (cap is the currently owned Capability, cpu is the number of
+// the desired Capability).
+void scheduleThreadOn(Capability *cap, StgWord cpu, StgTSO *tso);
+
 /* awakenBlockedQueue()
  *
  * Takes a pointer to the beginning of a blocked TSO queue, and