X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FSchedule.c;h=09150fd8b51389e4deafc0c6282539a77554b484;hb=cf9650f2a1690c04051c716124bb0350adc74ae7;hp=a41dd676d41a7c32c22cae82a0424e4a2b8341b0;hpb=9f076a02084843d54bcb6d8c63c443e9af820b67;p=ghc-hetmet.git

diff --git a/rts/Schedule.c b/rts/Schedule.c
index a41dd67..09150fd 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -89,11 +89,6 @@ StgTSO *blackhole_queue = NULL;
  */
 rtsBool blackholes_need_checking = rtsFalse;
 
-/* flag set by signal handler to precipitate a context switch
- * LOCK: none (just an advisory flag)
- */
-int context_switch = 0;
-
 /* flag that tracks whether we have done any execution in this time slice.
  * LOCK: currently none, perhaps we should lock (but needs to be
  * updated in the fast path of the scheduler).
@@ -142,20 +137,20 @@ static Capability *schedule (Capability *initialCapability, Task *task);
 // scheduler clearer.
 //
 static void schedulePreLoop (void);
-#if defined(THREADED_RTS)
-static void schedulePushWork(Capability *cap, Task *task);
-#endif
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
 static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
-#if defined(PARALLEL_HASKELL)
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+static void schedulePushWork(Capability *cap, Task *task);
 static rtsBool scheduleGetRemoteWork(Capability *cap);
+#if defined(PARALLEL_HASKELL)
 static void scheduleSendPendingMessages(void);
+#endif
 static void scheduleActivateSpark(Capability *cap);
 #endif
-static void schedulePostRunThread(StgTSO *t);
+static void schedulePostRunThread(Capability *cap, StgTSO *t);
 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
 static void scheduleHandleStackOverflow( Capability *cap, Task *task,
                                          StgTSO *t);
@@ -296,13 +291,15 @@ schedule (Capability *initialCapability, Task *task)
     } else {
         // Yield the capability to higher-priority tasks if necessary.
         yieldCapability(&cap, task);
+        /* inside yieldCapability, the task attempts to steal work from
+           other capabilities, unless this capability has work of its own.
+           See (REMARK) below.
+        */
     }
 #endif
-
-#if defined(THREADED_RTS)
-    schedulePushWork(cap,task);
-#endif
+    /* THIS WAS THE PLACE FOR THREADED_RTS::schedulePushWork(cap,task) */
+
     // Check whether we have re-entered the RTS from Haskell without
     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
     // call).
@@ -370,21 +367,7 @@ schedule (Capability *initialCapability, Task *task)
         barf("sched_state: %d", sched_state);
     }
 
-#if defined(THREADED_RTS)
-    // If the run queue is empty, take a spark and turn it into a thread.
-    {
-        if (emptyRunQueue(cap)) {
-            StgClosure *spark;
-            spark = findSpark(cap);
-            if (spark != NULL) {
-                debugTrace(DEBUG_sched,
-                           "turning spark of closure %p into a thread",
-                           (StgClosure *)spark);
-                createSparkThread(cap,spark);
-            }
-        }
-    }
-#endif // THREADED_RTS
+    /* this was the place to activate a spark, now below... */
 
     scheduleStartSignalHandlers(cap);
 
@@ -398,11 +381,19 @@ schedule (Capability *initialCapability, Task *task)
 
     scheduleCheckBlockedThreads(cap);
 
-#if defined(PARALLEL_HASKELL)
-    /* message processing and work distribution goes here */
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+    /* work distribution in multithreaded and parallel systems
+       REMARK: IMHO this is also the best location for work-stealing. The
+       tests above might yield some new jobs, so in some cases there is
+       no need to steal a spark. I believe the yieldCapability() call
+       above should be moved here.
+    */
 
+#if defined(PARALLEL_HASKELL)
     /* if messages have been buffered... a NOOP in THREADED_RTS */
     scheduleSendPendingMessages();
+#endif
 
     /* If the run queue is empty,...*/
     if (emptyRunQueue(cap)) {
 
@@ -411,6 +402,7 @@ schedule (Capability *initialCapability, Task *task)
 
         /* if this did not work, try to steal a spark from someone else */
         if (emptyRunQueue(cap)) {
+#if defined(PARALLEL_HASKELL)
             receivedFinish = scheduleGetRemoteWork(cap);
             continue; //  a new round, (hopefully) with new work
             /*
@@ -419,10 +411,20 @@ schedule (Capability *initialCapability, Task *task)
                b) (blocking) awaits and receives messages
 
                in Eden, this is only the blocking receive, as
               b) in GUM.
+
+               in the Threaded RTS, this does nothing at all. The stealing
+               routine is inside Capability.c and is called from
+               yieldCapability() at the very beginning; see REMARK.
             */
+#endif
         }
-    }
+    } else { /* i.e. run queue was (initially) not empty */
+        schedulePushWork(cap,task);
+        /* work pushing, currently relevant only for THREADED_RTS:
+           (pushes threads, wakes up idle capabilities for stealing) */
+    }
 
+#if defined(PARALLEL_HASKELL)
     /* since we perform a blocking receive and continue otherwise,
        either we never reach here or we definitely have work! */
     // from here: non-empty run queue
@@ -435,7 +437,9 @@ schedule (Capability *initialCapability, Task *task)
            above, waits for messages as well! */
         processMessages(cap, &receivedFinish);
     }
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL: non-empty run queue!
+
+#endif /* THREADED_RTS || PARALLEL_HASKELL */
 
     scheduleDetectDeadlock(cap,task);
 #if defined(THREADED_RTS)
@@ -504,7 +508,7 @@ schedule (Capability *initialCapability, Task *task)
      */
     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
         && !emptyThreadQueues(cap)) {
-        context_switch = 1;
+        cap->context_switch = 1;
     }
 
 run_thread:
@@ -627,7 +631,7 @@ run_thread:
     CCCS = CCS_SYSTEM;
 #endif
 
-    schedulePostRunThread(t);
+    schedulePostRunThread(cap,t);
 
     t = threadStackUnderflow(task,t);
 
@@ -684,11 +688,15 @@ schedulePreLoop(void)
  * Push work to other Capabilities if we have some.
  * -------------------------------------------------------------------------- */
 
-#if defined(THREADED_RTS)
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
 static void
 schedulePushWork(Capability *cap USED_IF_THREADS,
                  Task *task      USED_IF_THREADS)
 {
+  /* The following code is not for PARALLEL_HASKELL. The call is kept
+     general; future GUM versions might use pushing in a distributed setup */
+#if defined(THREADED_RTS)
+
     Capability *free_caps[n_capabilities], *cap0;
     nat i, n_free_caps;
 
@@ -731,7 +739,12 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
         StgTSO *prev, *t, *next;
         rtsBool pushed_to_all;
 
-        debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
+        debugTrace(DEBUG_sched,
+                   "cap %d: %s and %d free capabilities, sharing...",
+                   cap->no,
+                   (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
+                   "excess threads on run queue":"sparks to share (>=2)",
+                   n_free_caps);
 
         i = 0;
         pushed_to_all = rtsFalse;
@@ -765,6 +778,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                 cap->run_queue_tl = prev;
             }
         }
 
+#ifdef SPARK_PUSHING
+        /* JB: I left this code in place; it would work, but is not necessary */
+
         // If there are some free capabilities that we didn't push any
         // threads to, then try to push a spark to each one.
         if (!pushed_to_all) {
@@ -780,16 +796,23 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
             }
         }
     }
+#endif /* SPARK_PUSHING */
 
     // release the capabilities
     for (i = 0; i < n_free_caps; i++) {
         task->cap = free_caps[i];
         releaseCapability(free_caps[i]);
     }
+    // now wake them all up, and they might steal sparks if
+    // they did not get a thread
+    prodAllCapabilities();
     }
     task->cap = cap; // reset to point to our Capability.
+
+#endif /* THREADED_RTS */
+
 }
-#endif
+#endif /* THREADED_RTS || PARALLEL_HASKELL */
 
 /* ----------------------------------------------------------------------------
  * Start any pending signal handlers
  * ------------------------------------------------------------------------- */
@@ -970,7 +993,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
  * ------------------------------------------------------------------------- */
 
 #if defined(PARALLEL_HASKELL)
-static StgTSO *
+static void
 scheduleSendPendingMessages(void)
 {
 
@@ -989,10 +1012,10 @@ scheduleSendPendingMessages(void)
 #endif
 
 /* ----------------------------------------------------------------------------
- * Activate spark threads (PARALLEL_HASKELL only)
+ * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
 * ------------------------------------------------------------------------- */
 
-#if defined(PARALLEL_HASKELL)
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
 static void
 scheduleActivateSpark(Capability *cap)
 {
@@ -1017,14 +1040,14 @@ scheduleActivateSpark(Capability *cap)
             createSparkThread(cap,spark); // defined in Sparks.c
         }
     }
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL || THREADED_RTS
 
 /* ----------------------------------------------------------------------------
  * Get work from a remote node (PARALLEL_HASKELL only)
  * ------------------------------------------------------------------------- */
 
-#if defined(PARALLEL_HASKELL)
-static rtsBool
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+static rtsBool /* return value used in PARALLEL_HASKELL only */
 scheduleGetRemoteWork(Capability *cap)
 {
 #if defined(PARALLEL_HASKELL)
@@ -1062,14 +1085,14 @@ scheduleGetRemoteWork(Capability *cap)
 
 #endif /* PARALLEL_HASKELL */
 }
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL || THREADED_RTS
 
 /* ----------------------------------------------------------------------------
 * After running a thread...
 * ------------------------------------------------------------------------- */
 
 static void
-schedulePostRunThread (StgTSO *t)
+schedulePostRunThread (Capability *cap, StgTSO *t)
 {
     // We have to be able to catch transactions that are in an
     // infinite loop as a result of seeing an inconsistent view of
@@ -1090,8 +1113,7 @@ schedulePostRunThread (StgTSO *t)
             // ATOMICALLY_FRAME, aborting the (nested)
             // transaction, and saving the stack of any
             // partially-evaluated thunks on the heap.
-            throwToSingleThreaded_(&capabilities[0], t,
-                                   NULL, rtsTrue, NULL);
+            throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
 
             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
         }
@@ -1179,12 +1201,12 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
                    "--<< thread %ld (%s) stopped: HeapOverflow",
                    (long)t->id, whatNext_strs[t->what_next]);
 
-    if (context_switch) {
+    if (cap->context_switch) {
         // Sometimes we miss a context switch, e.g. when calling
         // primitives in a tight loop, MAYBE_GC() doesn't check the
         // context switch flag, and we end up waiting for a GC.
        // See #1984, and concurrent/should_run/1984
-        context_switch = 0;
+        cap->context_switch = 0;
         addToRunQueue(cap,t);
     } else {
         pushOnRunQueue(cap,t);
@@ -1234,7 +1256,7 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
     // the CPU because the tick always arrives during GC). This way
     // penalises threads that do a lot of allocation, but that seems
     // better than the alternative.
-    context_switch = 0;
+    cap->context_switch = 0;
 
     /* put the thread back on the run queue.  Then, if we're ready to
      * GC, check whether this is the last task to stop.  If so, wake
@@ -1435,6 +1457,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
            return cap;  // NOTE: task->cap might have changed here
        }
 
+       setContextSwitches();
        for (i=0; i < n_capabilities; i++) {
            debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
            if (cap != &capabilities[i]) {
@@ -1445,7 +1468,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
                // all the Capabilities, but even so it's a slightly
                // unsavoury invariant.
                task->cap = pcap;
-               context_switch = 1;
                waitForReturnCapability(&pcap, task);
                if (pcap != &capabilities[i]) {
                    barf("scheduleDoGC: got the wrong capability");
@@ -1489,6 +1511,14 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
        performHeapProfile = rtsFalse;
     }
 
+#ifdef SPARKBALANCE
+    /* JB:
+       Once we are all together... this would be the place to balance all
+       spark pools. No concurrent stealing or adding of new sparks can
+       occur. Should be defined in Sparks.c. */
+    balanceSparkPoolsCaps(n_capabilities, capabilities);
+#endif
+
 #if defined(THREADED_RTS)
     // release our stash of capabilities.
     for (i = 0; i < n_capabilities; i++) {
@@ -1954,7 +1984,6 @@ initScheduler(void)
 
   blackhole_queue   = END_TSO_QUEUE;
 
-  context_switch = 0;
   sched_state    = SCHED_RUNNING;
   recent_activity = ACTIVITY_YES;
 
@@ -2132,10 +2161,17 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
   }
 
   /* Try to double the current stack size.  If that takes us over the
-   * maximum stack size for this thread, then use the maximum instead.
-   * Finally round up so the TSO ends up as a whole number of blocks.
+   * maximum stack size for this thread, then use the maximum instead
+   * (that is, unless we're already at or over the max size and we
+   * can't raise the StackOverflow exception (see above), in which
+   * case just double the size). Finally round up so the TSO ends up as
+   * a whole number of blocks.
   */
-  new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
+  if (tso->stack_size >= tso->max_stack_size) {
+      new_stack_size = tso->stack_size * 2;
+  } else {
+      new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
+  }
   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
                                         TSO_STRUCT_SIZE)/sizeof(W_);
   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
@@ -2192,7 +2228,7 @@ static StgTSO *
 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
 {
     bdescr *bd, *new_bd;
-    lnat new_tso_size_w, tso_size_w;
+    lnat free_w, tso_size_w;
     StgTSO *new_tso;
 
     tso_size_w = tso_sizeW(tso);
@@ -2207,19 +2243,19 @@ threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
     // while we are moving the TSO:
     lockClosure((StgClosure *)tso);
 
-    new_tso_size_w = round_to_mblocks(tso_size_w/2);
-
-    debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
-               (long)tso->id, tso_size_w, new_tso_size_w);
+    // this is the number of words we'll free
+    free_w = round_to_mblocks(tso_size_w/2);
 
     bd = Bdescr((StgPtr)tso);
-    new_bd = splitLargeBlock(bd, new_tso_size_w / BLOCK_SIZE_W);
-    new_bd->free = bd->free;
+    new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
     bd->free = bd->start + TSO_STRUCT_SIZEW;
 
     new_tso = (StgTSO *)new_bd->start;
     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
-    new_tso->stack_size = new_tso_size_w - TSO_STRUCT_SIZEW;
+    new_tso->stack_size = new_bd->free - new_tso->stack;
+
+    debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
+               (long)tso->id, tso_size_w, tso_sizeW(new_tso));
 
     tso->what_next = ThreadRelocated;
     tso->_link = new_tso; // no write barrier reqd: same generation
@@ -2247,7 +2283,7 @@ void
 interruptStgRts(void)
 {
     sched_state = SCHED_INTERRUPTING;
-    context_switch = 1;
+    setContextSwitches();
     wakeUpRts();
 }
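
Note on the context-switch change: the patch deletes the global advisory flag
int context_switch and gives each Capability its own context_switch field; the
scheduler loop, the HeapOverflow path and the yield path above now test and
clear cap->context_switch instead. Where the old code set the global flag for
everyone (interruptStgRts, and grabbing all capabilities before a GC), it now
calls setContextSwitches(). That helper is not part of this file; the following
is only a minimal sketch of its presumed implementation, assuming nothing
beyond the capabilities[] array and n_capabilities count already used above:

/* Sketch only -- the real helper would live in Capability.c. All the
 * call sites above rely on is that the per-capability flag ends up set
 * on every Capability, which is what this loop does. */
void
setContextSwitches (void)
{
    nat i;
    for (i = 0; i < n_capabilities; i++) {
        capabilities[i].context_switch = 1;
    }
}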
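Note on work-stealing: as the REMARK comments explain, an idle capability no
longer turns sparks into threads inside the scheduler loop; it acquires work
from within yieldCapability(), while a capability with a non-empty run queue
only pushes work (schedulePushWork). The stealing routine itself sits in
Capability.c/Sparks.c and is not shown in this diff; below is a rough sketch of
its likely shape, where findSparkAnywhere, findSparkLocally and tryStealSpark
are illustrative names rather than the patch's actual identifiers:

/* Sketch only: try the local spark pool first, then attempt to steal
 * from the spark pools of all other capabilities. */
static StgClosure *
findSparkAnywhere (Capability *cap)
{
    StgClosure *spark;
    nat i;

    spark = findSparkLocally(cap);          // pop from our own pool
    if (spark != NULL) return spark;

    for (i = 0; i < n_capabilities; i++) {  // then rob the neighbours
        if (&capabilities[i] == cap) continue;
        spark = tryStealSpark(&capabilities[i]);
        if (spark != NULL) return spark;
    }
    return NULL;                            // no work anywhere
}

A spark found this way is turned into a thread with createSparkThread(cap,
spark), exactly as scheduleActivateSpark() does above.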