*/
rtsBool blackholes_need_checking = rtsFalse;
-/* flag set by signal handler to precipitate a context switch
- * LOCK: none (just an advisory flag)
- */
-int context_switch = 0;
-
/* flag that tracks whether we have done any execution in this time slice.
* LOCK: currently none, perhaps we should lock (but needs to be
* updated in the fast path of the scheduler).
// scheduler clearer.
//
static void schedulePreLoop (void);
-#if defined(THREADED_RTS)
-static void schedulePushWork(Capability *cap, Task *task);
-#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
-#if defined(PARALLEL_HASKELL)
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+static void schedulePushWork(Capability *cap, Task *task);
static rtsBool scheduleGetRemoteWork(Capability *cap);
+#if defined(PARALLEL_HASKELL)
static void scheduleSendPendingMessages(void);
+#endif
static void scheduleActivateSpark(Capability *cap);
#endif
-static void schedulePostRunThread(StgTSO *t);
+static void schedulePostRunThread(Capability *cap, StgTSO *t);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task,
StgTSO *t);
} else {
// Yield the capability to higher-priority tasks if necessary.
yieldCapability(&cap, task);
+ /* Inside yieldCapability, the task attempts to steal work from
+ other capabilities, unless this capability has work of its own.
+ See (REMARK) below.
+ */
}
#endif
-
-#if defined(THREADED_RTS)
- schedulePushWork(cap,task);
-#endif
+ /* THIS WAS THE PLACE FOR THREADED_RTS::schedulePushWork(cap,task) */
+
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
// call).
barf("sched_state: %d", sched_state);
}
-#if defined(THREADED_RTS)
- // If the run queue is empty, take a spark and turn it into a thread.
- {
- if (emptyRunQueue(cap)) {
- StgClosure *spark;
- spark = findSpark(cap);
- if (spark != NULL) {
- debugTrace(DEBUG_sched,
- "turning spark of closure %p into a thread",
- (StgClosure *)spark);
- createSparkThread(cap,spark);
- }
- }
- }
-#endif // THREADED_RTS
+ /* this was the place to activate a spark, now below... */
scheduleStartSignalHandlers(cap);
scheduleCheckBlockedThreads(cap);
-#if defined(PARALLEL_HASKELL)
- /* message processing and work distribution goes here */
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+ /* Work distribution in multithreaded and parallel systems.
+ REMARK: this is arguably the best location for work-stealing as
+ well. The tests above might yield some new jobs, so there is no
+ need to steal a spark in some cases. The yieldCapability call
+ above should probably be moved here.
+ */
+
+#if defined(PARALLEL_HASKELL)
/* if messages have been buffered... a NOOP in THREADED_RTS */
scheduleSendPendingMessages();
+#endif
/* If the run queue is empty,...*/
if (emptyRunQueue(cap)) {
/* if this did not work, try to steal a spark from someone else */
if (emptyRunQueue(cap)) {
+#if defined(PARALLEL_HASKELL)
receivedFinish = scheduleGetRemoteWork(cap);
continue; // a new round, (hopefully) with new work
/*
b) (blocking) awaits and receives messages
in Eden, this is only the blocking receive, as b) in GUM.
+
+ in THREADED_RTS, this does nothing at all. The stealing routine
+ is inside Capability.c and is called from
+ yieldCapability() at the very beginning; see REMARK.
*/
+#endif
}
- }
+ } else { /* i.e. run queue was (initially) not empty */
+ schedulePushWork(cap,task);
+ /* work pushing, currently relevant only for THREADED_RTS:
+ (pushes threads, wakes up idle capabilities for stealing) */
+ }
+#if defined(PARALLEL_HASKELL)
/* since we perform a blocking receive and continue otherwise,
either we never reach here or we definitely have work! */
// from here: non-empty run queue
above, waits for messages as well! */
processMessages(cap, &receivedFinish);
}
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL: non-empty run queue!
+
+#endif /* THREADED_RTS || PARALLEL_HASKELL */
scheduleDetectDeadlock(cap,task);
#if defined(THREADED_RTS)
*/
if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
&& !emptyThreadQueues(cap)) {
- context_switch = 1;
+ cap->context_switch = 1;
}
run_thread:
CCCS = CCS_SYSTEM;
#endif
- schedulePostRunThread(t);
+ schedulePostRunThread(cap,t);
t = threadStackUnderflow(task,t);
* Push work to other Capabilities if we have some.
* -------------------------------------------------------------------------- */
-#if defined(THREADED_RTS)
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
static void
schedulePushWork(Capability *cap USED_IF_THREADS,
Task *task USED_IF_THREADS)
{
+ /* The following code is not for PARALLEL_HASKELL. The call is kept
+ general, since future GUM versions might use pushing in a
+ distributed setup. */
+#if defined(THREADED_RTS)
+
Capability *free_caps[n_capabilities], *cap0;
nat i, n_free_caps;
StgTSO *prev, *t, *next;
rtsBool pushed_to_all;
- debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
+ debugTrace(DEBUG_sched,
+ "cap %d: %s and %d free capabilities, sharing...",
+ cap->no,
+ (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
+ "excess threads on run queue":"sparks to share (>=2)",
+ n_free_caps);
i = 0;
pushed_to_all = rtsFalse;
cap->run_queue_tl = prev;
}
+#ifdef SPARK_PUSHING
+ /* JB: this code is left in place; it would work, but it is not necessary. */
+
// If there are some free capabilities that we didn't push any
// threads to, then try to push a spark to each one.
if (!pushed_to_all) {
}
}
}
+#endif /* SPARK_PUSHING */
// release the capabilities
for (i = 0; i < n_free_caps; i++) {
task->cap = free_caps[i];
releaseCapability(free_caps[i]);
}
+ // Now wake them all up; they may steal sparks if
+ // they did not get a thread.
+ prodAllCapabilities();
}
task->cap = cap; // reset to point to our Capability.
+
+#endif /* THREADED_RTS */
+
}
-#endif
+#endif /* THREADED_RTS || PARALLEL_HASKELL */
/* ----------------------------------------------------------------------------
* Start any pending signal handlers
* ------------------------------------------------------------------------- */
#if defined(PARALLEL_HASKELL)
-static StgTSO *
+static void
scheduleSendPendingMessages(void)
{
#endif
/* ----------------------------------------------------------------------------
- * Activate spark threads (PARALLEL_HASKELL only)
+ * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
* ------------------------------------------------------------------------- */
-#if defined(PARALLEL_HASKELL)
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
static void
scheduleActivateSpark(Capability *cap)
{
createSparkThread(cap,spark); // defined in Sparks.c
}
}
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL || THREADED_RTS
/* ----------------------------------------------------------------------------
* Get work from a remote node (PARALLEL_HASKELL only)
* ------------------------------------------------------------------------- */
-#if defined(PARALLEL_HASKELL)
-static rtsBool
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+static rtsBool /* return value used in PARALLEL_HASKELL only */
scheduleGetRemoteWork(Capability *cap)
{
#if defined(PARALLEL_HASKELL)
#endif /* PARALLEL_HASKELL */
}
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL || THREADED_RTS
/* ----------------------------------------------------------------------------
* After running a thread...
* ------------------------------------------------------------------------- */
static void
-schedulePostRunThread (StgTSO *t)
+schedulePostRunThread (Capability *cap, StgTSO *t)
{
// We have to be able to catch transactions that are in an
// infinite loop as a result of seeing an inconsistent view of
// ATOMICALLY_FRAME, aborting the (nested)
// transaction, and saving the stack of any
// partially-evaluated thunks on the heap.
- throwToSingleThreaded_(&capabilities[0], t,
- NULL, rtsTrue, NULL);
+ throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
}
"--<< thread %ld (%s) stopped: HeapOverflow",
(long)t->id, whatNext_strs[t->what_next]);
- if (context_switch) {
+ if (cap->context_switch) {
// Sometimes we miss a context switch, e.g. when calling
// primitives in a tight loop, MAYBE_GC() doesn't check the
// context switch flag, and we end up waiting for a GC.
// See #1984, and concurrent/should_run/1984
- context_switch = 0;
+ cap->context_switch = 0;
addToRunQueue(cap,t);
} else {
pushOnRunQueue(cap,t);
// the CPU because the tick always arrives during GC). This way
// penalises threads that do a lot of allocation, but that seems
// better than the alternative.
- context_switch = 0;
+ cap->context_switch = 0;
/* put the thread back on the run queue. Then, if we're ready to
* GC, check whether this is the last task to stop. If so, wake
return cap; // NOTE: task->cap might have changed here
}
+ setContextSwitches();
for (i=0; i < n_capabilities; i++) {
debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
if (cap != &capabilities[i]) {
// all the Capabilities, but even so it's a slightly
// unsavoury invariant.
task->cap = pcap;
- context_switch = 1;
waitForReturnCapability(&pcap, task);
if (pcap != &capabilities[i]) {
barf("scheduleDoGC: got the wrong capability");
performHeapProfile = rtsFalse;
}
+#ifdef SPARKBALANCE
+ /* JB
+ Once all capabilities have stopped, this would be the place to
+ balance all spark pools. No concurrent stealing or adding of new
+ sparks can occur here. Should be defined in Sparks.c. */
+ balanceSparkPoolsCaps(n_capabilities, capabilities);
+#endif
+
#if defined(THREADED_RTS)
// release our stash of capabilities.
for (i = 0; i < n_capabilities; i++) {
blackhole_queue = END_TSO_QUEUE;
- context_switch = 0;
sched_state = SCHED_RUNNING;
recent_activity = ACTIVITY_YES;
}
/* Try to double the current stack size. If that takes us over the
- * maximum stack size for this thread, then use the maximum instead.
- * Finally round up so the TSO ends up as a whole number of blocks.
+ * maximum stack size for this thread, then use the maximum instead
+ * (that is, unless we're already at or over the max size and we
+ * can't raise the StackOverflow exception (see above), in which
+ * case just double the size). Finally round up so the TSO ends up as
+ * a whole number of blocks.
*/
- new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
+ if (tso->stack_size >= tso->max_stack_size) {
+ new_stack_size = tso->stack_size * 2;
+ } else {
+ new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
+ }
new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
TSO_STRUCT_SIZE)/sizeof(W_);
new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
{
bdescr *bd, *new_bd;
- lnat new_tso_size_w, tso_size_w;
+ lnat free_w, tso_size_w;
StgTSO *new_tso;
tso_size_w = tso_sizeW(tso);
// while we are moving the TSO:
lockClosure((StgClosure *)tso);
- new_tso_size_w = round_to_mblocks(tso_size_w/2);
-
- debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
- (long)tso->id, tso_size_w, new_tso_size_w);
+ // this is the number of words we'll free
+ free_w = round_to_mblocks(tso_size_w/2);
bd = Bdescr((StgPtr)tso);
- new_bd = splitLargeBlock(bd, new_tso_size_w / BLOCK_SIZE_W);
- new_bd->free = bd->free;
+ new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
bd->free = bd->start + TSO_STRUCT_SIZEW;
new_tso = (StgTSO *)new_bd->start;
memcpy(new_tso,tso,TSO_STRUCT_SIZE);
- new_tso->stack_size = new_tso_size_w - TSO_STRUCT_SIZEW;
+ new_tso->stack_size = new_bd->free - new_tso->stack;
+
+ debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
+ (long)tso->id, tso_size_w, tso_sizeW(new_tso));
tso->what_next = ThreadRelocated;
tso->_link = new_tso; // no write barrier reqd: same generation
interruptStgRts(void)
{
sched_state = SCHED_INTERRUPTING;
- context_switch = 1;
+ setContextSwitches();
wakeUpRts();
}