From: Simon Marlow Date: Mon, 27 Mar 2006 12:41:51 +0000 (+0000) Subject: Add a new primitive forkOn#, for forking a thread on a specific Capability X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=commitdiff_plain;h=c520a3a2752ffcec5710a88a8a2e219c20edfc8a Add a new primitive forkOn#, for forking a thread on a specific Capability This gives some control over affinity, while we figure out the best way to automatically schedule threads to make best use of the available parallelism. In addition to the primitive, there is also: GHC.Conc.forkOnIO :: Int -> IO () -> IO ThreadId where 'forkOnIO i m' creates a thread on Capability (i `rem` N), where N is the number of available Capabilities set by +RTS -N. Unlike normal threads, threads forked by forkOnIO do not automatically migrate when there are free Capabilities. Still, if you're using forkOnIO exclusively, it's a good idea to do +RTS -qm to disable work pushing anyway (work pushing takes too much time when the run queues are large; this is something we need to fix). 
--- diff --git a/ghc/compiler/prelude/primops.txt.pp b/ghc/compiler/prelude/primops.txt.pp index ecde882..13b4b6c 100644 --- a/ghc/compiler/prelude/primops.txt.pp +++ b/ghc/compiler/prelude/primops.txt.pp @@ -1441,6 +1441,13 @@ primop ForkOp "fork#" GenPrimOp has_side_effects = True out_of_line = True +primop ForkOnOp "forkOn#" GenPrimOp + Int# -> a -> State# RealWorld -> (# State# RealWorld, ThreadId# #) + with + usage = { mangle ForkOnOp [mkO, mkP] mkR } + has_side_effects = True + out_of_line = True + primop KillThreadOp "killThread#" GenPrimOp ThreadId# -> a -> State# RealWorld -> State# RealWorld with diff --git a/ghc/includes/StgMiscClosures.h b/ghc/includes/StgMiscClosures.h index 62a7ed3..4a6a7c4 100644 --- a/ghc/includes/StgMiscClosures.h +++ b/ghc/includes/StgMiscClosures.h @@ -579,6 +579,7 @@ RTS_FUN(makeStablePtrzh_fast); RTS_FUN(deRefStablePtrzh_fast); RTS_FUN(forkzh_fast); +RTS_FUN(forkOnzh_fast); RTS_FUN(yieldzh_fast); RTS_FUN(killThreadzh_fast); RTS_FUN(blockAsyncExceptionszh_fast); diff --git a/ghc/includes/TSO.h b/ghc/includes/TSO.h index 14c47ab..d096d40 100644 --- a/ghc/includes/TSO.h +++ b/ghc/includes/TSO.h @@ -93,7 +93,13 @@ typedef StgWord32 StgThreadID; */ #define TSO_DIRTY 1 -#define tsoDirty(tso) ((tso)->flags & TSO_DIRTY) +/* + * TSO_LOCKED is set when a TSO is locked to a particular Capability. + */ +#define TSO_LOCKED 2 + +#define tsoDirty(tso) ((tso)->flags & TSO_DIRTY) +#define tsoLocked(tso) ((tso)->flags & TSO_LOCKED) /* * Type returned after running a thread. 
Values of this type diff --git a/ghc/rts/PrimOps.cmm b/ghc/rts/PrimOps.cmm index 23bc22e..f1c214e 100644 --- a/ghc/rts/PrimOps.cmm +++ b/ghc/rts/PrimOps.cmm @@ -876,19 +876,45 @@ decodeDoublezh_fast forkzh_fast { /* args: R1 = closure to spark */ - + MAYBE_GC(R1_PTR, forkzh_fast); - // create it right now, return ThreadID in R1 - "ptr" R1 = foreign "C" createIOThread( MyCapability() "ptr", + W_ closure; + W_ threadid; + closure = R1; + + "ptr" threadid = foreign "C" createIOThread( MyCapability() "ptr", + RtsFlags_GcFlags_initialStkSize(RtsFlags), + closure "ptr") []; + foreign "C" scheduleThread(MyCapability() "ptr", threadid "ptr") []; + + // switch at the earliest opportunity + CInt[context_switch] = 1 :: CInt; + + RET_P(threadid); +} + +forkOnzh_fast +{ + /* args: R1 = cpu, R2 = closure to spark */ + + MAYBE_GC(R2_PTR, forkOnzh_fast); + + W_ cpu; + W_ closure; + W_ threadid; + cpu = R1; + closure = R2; + + "ptr" threadid = foreign "C" createIOThread( MyCapability() "ptr", RtsFlags_GcFlags_initialStkSize(RtsFlags), - R1 "ptr") [R1]; - foreign "C" scheduleThread(MyCapability() "ptr", R1 "ptr") [R1]; + closure "ptr") []; + foreign "C" scheduleThreadOn(MyCapability() "ptr", cpu, threadid "ptr") []; // switch at the earliest opportunity CInt[context_switch] = 1 :: CInt; - RET_P(R1); + RET_P(threadid); } yieldzh_fast diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index d49d4ed..52fd4d5 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -204,6 +204,7 @@ static void schedulePushWork(Capability *cap, Task *task); #endif static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); +static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS); static void scheduleCheckBlackHoles (Capability *cap); static void scheduleDetectDeadlock (Capability *cap, Task *task); #if defined(GRAN) @@ -482,20 +483,7 @@ schedule (Capability *initialCapability, Task *task) // list each time around the 
scheduler. if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); } - // Any threads that were woken up by other Capabilities get - // appended to our run queue. - if (!emptyWakeupQueue(cap)) { - ACQUIRE_LOCK(&cap->lock); - if (emptyRunQueue(cap)) { - cap->run_queue_hd = cap->wakeup_queue_hd; - cap->run_queue_tl = cap->wakeup_queue_tl; - } else { - cap->run_queue_tl->link = cap->wakeup_queue_hd; - cap->run_queue_tl = cap->wakeup_queue_tl; - } - cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE; - RELEASE_LOCK(&cap->lock); - } + scheduleCheckWakeupThreads(cap); scheduleCheckBlockedThreads(cap); @@ -841,7 +829,8 @@ schedulePushWork(Capability *cap USED_IF_THREADS, next = t->link; t->link = END_TSO_QUEUE; if (t->what_next == ThreadRelocated - || t->bound == task) { // don't move my bound thread + || t->bound == task // don't move my bound thread + || tsoLocked(t)) { // don't move a locked thread prev->link = t; prev = t; } else if (i == n_free_caps) { @@ -928,6 +917,31 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS) /* ---------------------------------------------------------------------------- + * Check for threads woken up by other Capabilities + * ------------------------------------------------------------------------- */ + +static void +scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS) +{ +#if defined(THREADED_RTS) + // Any threads that were woken up by other Capabilities get + // appended to our run queue. 
+ if (!emptyWakeupQueue(cap)) { + ACQUIRE_LOCK(&cap->lock); + if (emptyRunQueue(cap)) { + cap->run_queue_hd = cap->wakeup_queue_hd; + cap->run_queue_tl = cap->wakeup_queue_tl; + } else { + cap->run_queue_tl->link = cap->wakeup_queue_hd; + cap->run_queue_tl = cap->wakeup_queue_tl; + } + cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE; + RELEASE_LOCK(&cap->lock); + } +#endif +} + +/* ---------------------------------------------------------------------------- * Check for threads blocked on BLACKHOLEs that can be woken up * ------------------------------------------------------------------------- */ static void @@ -2709,6 +2723,28 @@ scheduleThread(Capability *cap, StgTSO *tso) appendToRunQueue(cap,tso); } +void +scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso) +{ +#if defined(THREADED_RTS) + tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't + // move this thread from now on. + cpu %= RtsFlags.ParFlags.nNodes; + if (cpu == cap->no) { + appendToRunQueue(cap,tso); + } else { + Capability *target_cap = &capabilities[cpu]; + if (tso->bound) { + tso->bound->cap = target_cap; + } + tso->cap = target_cap; + wakeupThreadOnCapability(target_cap,tso); + } +#else + appendToRunQueue(cap,tso); +#endif +} + Capability * scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) { @@ -3244,7 +3280,8 @@ unblockOne(Capability *cap, StgTSO *tso) next = tso->link; tso->link = END_TSO_QUEUE; - if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) { +#if defined(THREADED_RTS) + if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) { // We are waking up this thread on the current Capability, which // might involve migrating it from the Capability it was last on. if (tso->bound) { @@ -3260,6 +3297,10 @@ unblockOne(Capability *cap, StgTSO *tso) // we'll try to wake it up on the Capability it was last on. 
wakeupThreadOnCapability(tso->cap, tso); } +#else + appendToRunQueue(cap,tso); + context_switch = 1; +#endif IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no)); return next; diff --git a/ghc/rts/Schedule.h b/ghc/rts/Schedule.h index d11162e..37b0794 100644 --- a/ghc/rts/Schedule.h +++ b/ghc/rts/Schedule.h @@ -20,9 +20,14 @@ void initScheduler (void); void exitScheduler (void); -// Place a new thread on the run queue of the specified Capability +// Place a new thread on the run queue of the current Capability void scheduleThread (Capability *cap, StgTSO *tso); +// Place a new thread on the run queue of a specified Capability +// (cap is the currently owned Capability, cpu is the number of +// the desired Capability). +void scheduleThreadOn(Capability *cap, StgWord cpu, StgTSO *tso); + /* awakenBlockedQueue() * * Takes a pointer to the beginning of a blocked TSO queue, and