#include "Proftimer.h"
#include "ProfHeap.h"
#include "GC.h"
+#include "Weak.h"
+#include "EventLog.h"
/* PARALLEL_HASKELL includes go here */
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
-#if defined(PARALLEL_HASKELL)
-static rtsBool scheduleGetRemoteWork(Capability *cap);
-static void scheduleSendPendingMessages(void);
-#endif
-#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
-#if defined(PARALLEL_HASKELL)
- if (RtsFlags.ParFlags.doFairScheduling) {
- // this does round-robin scheduling; good for concurrency
- appendToRunQueue(cap,t);
- } else {
- // this does unfair scheduling; good for parallelism
- pushOnRunQueue(cap,t);
- }
-#else
// this does round-robin scheduling; good for concurrency
appendToRunQueue(cap,t);
-#endif
}
/* ---------------------------------------------------------------------------
StgTSO *t;
Capability *cap;
StgThreadReturnCode ret;
-#if defined(PARALLEL_HASKELL)
- rtsBool receivedFinish = rtsFalse;
-#endif
nat prev_what_next;
rtsBool ready_to_gc;
#if defined(THREADED_RTS)
"### NEW SCHEDULER LOOP (task: %p, cap: %p)",
task, initialCapability);
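+  // running_finalizers is set by the RTS for the duration of a C
+  // finalizer; reaching the scheduler with it set means a finalizer
+  // has called back into Haskell, which is an error.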
+ if (running_finalizers) {
+ errorBelch("error: a C finalizer called back into Haskell.\n"
+ " This was previously allowed, but is disallowed in GHC 6.10.2 and later.\n"
+ " To create finalizers that may call back into Haskll, use\n"
+ " Foreign.Concurrent.newForeignPtr instead of Foreign.newForeignPtr.");
+ stg_exit(EXIT_FAILURE);
+ }
+
schedulePreLoop();
// -----------------------------------------------------------
// Scheduler loop starts here:
-#if defined(PARALLEL_HASKELL)
-#define TERMINATION_CONDITION (!receivedFinish)
-#else
-#define TERMINATION_CONDITION rtsTrue
-#endif
-
- while (TERMINATION_CONDITION) {
+ while (1) {
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
(pushes threads, wakes up idle capabilities for stealing) */
schedulePushWork(cap,task);
-#if defined(PARALLEL_HASKELL)
- /* since we perform a blocking receive and continue otherwise,
- either we never reach here or we definitely have work! */
- // from here: non-empty run queue
- ASSERT(!emptyRunQueue(cap));
-
- if (PacketsWaiting()) { /* now process incoming messages, if any
- pending...
-
- CAUTION: scheduleGetRemoteWork called
- above, waits for messages as well! */
- processMessages(cap, &receivedFinish);
- }
-#endif // PARALLEL_HASKELL: non-empty run queue!
-
scheduleDetectDeadlock(cap,task);
#if defined(THREADED_RTS)
}
#endif
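+    // record in the event log that thread t is about to run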
+ postEvent(cap, EVENT_RUN_THREAD, t->id, 0);
+
switch (prev_what_next) {
case ThreadKilled:
t->saved_winerror = GetLastError();
#endif
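+    // record in the event log why the thread stopped; ret carries the reason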
+ postEvent (cap, EVENT_STOP_THREAD, t->id, ret);
+
#if defined(THREADED_RTS)
// If ret is ThreadBlocked, and this Task is bound to the TSO that
// blocked, we are in limbo - the TSO is now owned by whatever it
schedulePostRunThread(cap,t);
- t = threadStackUnderflow(task,t);
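+    // don't call threadStackUnderflow() for a thread that just
+    // overflowed its stack: underflow may shrink the stack, and the
+    // overflow is about to be handled by enlarging it.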
+ if (ret != StackOverflow) {
+ t = threadStackUnderflow(task,t);
+ }
ready_to_gc = rtsFalse;
scheduleCheckBlockedThreads(cap);
-#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+#if defined(THREADED_RTS)
if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
#endif
-
-#if defined(PARALLEL_HASKELL)
- // if messages have been buffered...
- scheduleSendPendingMessages();
-#endif
-
-#if defined(PARALLEL_HASKELL)
- if (emptyRunQueue(cap)) {
- receivedFinish = scheduleGetRemoteWork(cap);
- continue; // a new round, (hopefully) with new work
- /*
- in GUM, this a) sends out a FISH and returns IF no fish is
- out already
- b) (blocking) awaits and receives messages
-
- in Eden, this is only the blocking receive, as b) in GUM.
- */
- }
-#endif
}
#if defined(THREADED_RTS)
// if we have work, and we don't need to give up the Capability, continue.
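+        // (threads on the wakeup queue were woken up for us by other
+        // Capabilities, so they count as work here too)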
if (!shouldYieldCapability(cap,task) &&
(!emptyRunQueue(cap) ||
+ !emptyWakeupQueue(cap) ||
blackholes_need_checking ||
sched_state >= SCHED_INTERRUPTING))
return;
} else {
debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
appendToRunQueue(free_caps[i],t);
+
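+ // record the migration of thread t to free_caps[i] in the event log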
+ postEvent (cap, EVENT_MIGRATE_THREAD, t->id, free_caps[i]->no);
+
if (t->bound) { t->bound->cap = free_caps[i]; }
t->cap = free_caps[i];
i++;
spark = tryStealSpark(cap->sparks);
if (spark != NULL) {
debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
+
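+ // record the steal; the last field names the capability the spark came from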
+ postEvent(free_caps[i], EVENT_STEAL_SPARK, t->id, cap->no);
+
newSpark(&(free_caps[i]->r), spark);
}
}
static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{
-
-#if defined(PARALLEL_HASKELL)
- // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
- return;
-#endif
-
/*
* Detect deadlock: when we have no threads to run, there are no
* threads blocked, waiting for I/O, or sleeping, and all the
- * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
+ * Activate spark threads (THREADED_RTS)
* ------------------------------------------------------------------------- */
-#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+#if defined(THREADED_RTS)
static void
scheduleActivateSpark(Capability *cap)
{
-#endif // PARALLEL_HASKELL || THREADED_RTS
+#endif // THREADED_RTS
/* ----------------------------------------------------------------------------
- * Get work from a remote node (PARALLEL_HASKELL only)
- * ------------------------------------------------------------------------- */
-
-#if defined(PARALLEL_HASKELL)
-static rtsBool /* return value used in PARALLEL_HASKELL only */
-scheduleGetRemoteWork (Capability *cap STG_UNUSED)
-{
-#if defined(PARALLEL_HASKELL)
- rtsBool receivedFinish = rtsFalse;
-
- // idle() , i.e. send all buffers, wait for work
- if (RtsFlags.ParFlags.BufferTime) {
- IF_PAR_DEBUG(verbose,
- debugBelch("...send all pending data,"));
- {
- nat i;
- for (i=1; i<=nPEs; i++)
- sendImmediately(i); // send all messages away immediately
- }
- }
-
- /* this would be the place for fishing in GUM...
-
- if (no-earlier-fish-around)
- sendFish(choosePe());
- */
-
- // Eden:just look for incoming messages (blocking receive)
- IF_PAR_DEBUG(verbose,
- debugBelch("...wait for incoming messages...\n"));
- processMessages(cap, &receivedFinish); // blocking receive...
-
-
- return receivedFinish;
- // reenter scheduling look after having received something
-
-#else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
-
- return rtsFalse; /* return value unused in THREADED_RTS */
-
-#endif /* PARALLEL_HASKELL */
-}
-#endif // PARALLEL_HASKELL || THREADED_RTS
-
-/* ----------------------------------------------------------------------------
* After running a thread...
* ------------------------------------------------------------------------- */
"--<< thread %ld (%s) stopped: HeapOverflow",
(long)t->id, whatNext_strs[t->what_next]);
- if (cap->context_switch) {
+ if (cap->r.rHpLim == NULL || cap->context_switch) {
// Sometimes we miss a context switch, e.g. when calling
// primitives in a tight loop, MAYBE_GC() doesn't check the
// context switch flag, and we end up waiting for a GC.
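+ // Also check rHpLim: zeroing HpLim can be used to force a running
+ // thread back into the scheduler, so treat that case the same way.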
static void
scheduleHandleThreadBlocked( StgTSO *t
-#if !defined(GRAN) && !defined(DEBUG)
+#if !defined(DEBUG)
STG_UNUSED
#endif
)
if (gc_type == PENDING_GC_SEQ)
{
+ postEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0);
// single-threaded GC: grab all the capabilities
for (i=0; i < n_capabilities; i++) {
debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
{
// multi-threaded GC: make sure all the Capabilities donate one
// GC thread each.
+ postEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0);
debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
waitForGcThreads(cap);
heap_census = scheduleNeedHeapProfile(rtsTrue);
+#if defined(THREADED_RTS)
+ postEvent(cap, EVENT_GC_START, 0, 0);
+ debugTrace(DEBUG_sched, "doing GC");
+ // reset waiting_for_gc *before* GC, so that when the GC threads
+ // emerge they don't immediately re-enter the GC.
+ waiting_for_gc = 0;
+ GarbageCollect(force_major || heap_census, gc_type, cap);
+#else
+ GarbageCollect(force_major || heap_census, 0, cap);
+#endif
+ postEvent(cap, EVENT_GC_END, 0, 0);
+
if (recent_activity == ACTIVITY_INACTIVE && force_major)
{
// We are doing a GC because the system has been idle for a
// timeslice and we need to check for deadlock. Record the
// fact that we've done a GC and turn off the timer signal;
// it will get re-enabled if we run any threads after the GC.
- //
- // Note: this is done before GC, because after GC there might
- // be threads already running (GarbageCollect() releases the
- // GC threads when it completes), so we risk turning off the
- // timer signal when it should really be on.
recent_activity = ACTIVITY_DONE_GC;
stopTimer();
}
+ else
+ {
+ // the GC might have taken long enough for the timer to set
+ // recent_activity = ACTIVITY_INACTIVE, but we aren't
+ // necessarily deadlocked:
+ recent_activity = ACTIVITY_YES;
+ }
#if defined(THREADED_RTS)
- debugTrace(DEBUG_sched, "doing GC");
- // reset waiting_for_gc *before* GC, so that when the GC threads
- // emerge they don't immediately re-enter the GC.
- waiting_for_gc = 0;
- GarbageCollect(force_major || heap_census, gc_type, cap);
-#else
- GarbageCollect(force_major || heap_census, 0, cap);
+ if (gc_type == PENDING_GC_PAR)
+ {
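+ // release the GC worker threads we grabbed with waitForGcThreads()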
+ releaseGCThreads(cap);
+ }
#endif
if (heap_census) {
task = cap->running_task;
tso = cap->r.rCurrentTSO;
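+ // the thread stops running for the duration of the safe foreign call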
+ postEvent(cap, EVENT_STOP_THREAD, tso->id, THREAD_SUSPENDED_FOREIGN_CALL);
debugTrace(DEBUG_sched,
"thread %lu did a safe foreign call",
(unsigned long)cap->r.rCurrentTSO->id);
tso = task->suspended_tso;
task->suspended_tso = NULL;
tso->_link = END_TSO_QUEUE; // no write barrier reqd
+
+ postEvent(cap, EVENT_RUN_THREAD, tso->id, 0);
debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
if (tso->why_blocked == BlockedOnCCall) {
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
+ postEvent (cap, EVENT_MIGRATE_THREAD, tso->id, capabilities[cpu].no);
wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
}
#else
cap = task->cap;
RELEASE_LOCK(&task->lock);
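+ // optionally bind this worker's OS thread to a CPU, if thread
+ // affinity was requested via the RTS flags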
+ if (RtsFlags.ParFlags.setAffinity) {
+ setThreadAffinity(cap->no, n_capabilities);
+ }
+
// set the thread-local pointer to the Task:
taskEnter(task);
initTaskManager();
-#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+#if defined(THREADED_RTS)
initSparkPools();
#endif
}
#endif
- trace(TRACE_sched, "start: %d capabilities", n_capabilities);
-
RELEASE_LOCK(&sched_mutex);
}
{
Task *task = NULL;
-#if defined(THREADED_RTS)
- ACQUIRE_LOCK(&sched_mutex);
task = newBoundTask();
- RELEASE_LOCK(&sched_mutex);
-#endif
// If we haven't killed all the threads yet, do it now.
if (sched_state < SCHED_SHUTTING_DOWN) {
sched_state = SCHED_INTERRUPTING;
-#if defined(THREADED_RTS)
waitForReturnCapability(&task->cap,task);
scheduleDoGC(task->cap,task,rtsFalse);
releaseCapability(task->cap);
-#else
- scheduleDoGC(&MainCapability,task,rtsFalse);
-#endif
}
sched_state = SCHED_SHUTTING_DOWN;
// We must grab a new Task here, because the existing Task may be
// associated with a particular Capability, and chained onto the
// suspended_ccalling_tasks queue.
- ACQUIRE_LOCK(&sched_mutex);
task = newBoundTask();
- RELEASE_LOCK(&sched_mutex);
waitForReturnCapability(&task->cap,task);
scheduleDoGC(task->cap,task,force_major);
tso->sp = (P_)&(tso->stack[tso->stack_size]);
tso->why_blocked = NotBlocked;
- IF_PAR_DEBUG(verbose,
- debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
- tso->id, tso, tso->stack_size);
- /* If we're debugging, just print out the top of the stack */
- printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
- tso->sp+64)));
-
unlockTSO(dest);
unlockTSO(tso);
tso_size_w = tso_sizeW(tso);
- if (tso_size_w < MBLOCK_SIZE_W ||
+ if (tso_size_w < MBLOCK_SIZE_W ||
+ // TSO is less than 2 mblocks (since the first mblock is
+ // shorter than MBLOCK_SIZE_W)
+ (tso_size_w - BLOCKS_PER_MBLOCK*BLOCK_SIZE_W) % MBLOCK_SIZE_W != 0 ||
+ // or TSO is not a whole number of megablocks (ensuring
+ // precondition of splitLargeBlock() below)
+ (tso_size_w <= round_up_to_mblocks(RtsFlags.GcFlags.initialStkSize)) ||
+ // or TSO is smaller than the minimum stack size (rounded up)
(nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4)
+ // or stack is using more than 1/4 of the available space
{
+ // then do nothing
return tso;
}
prev = &blackhole_queue;
t = blackhole_queue;
while (t != END_TSO_QUEUE) {
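+ // a ThreadRelocated TSO is only a forwarding pointer: its _link
+ // field points to the new TSO, so follow it instead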
+ if (t->what_next == ThreadRelocated) {
+ t = t->_link;
+ continue;
+ }
ASSERT(t->why_blocked == BlockedOnBlackHole);
type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
if (type != BLACKHOLE && type != CAF_BLACKHOLE) {