#ifdef THREADED_RTS
struct PAR_FLAGS {
nat nNodes; /* number of threads to run simultaneously */
+ rtsBool migrate; /* migrate threads between capabilities */
+ rtsBool wakeupMigrate; /* migrate a thread on wakeup */
unsigned int maxLocalSparks;
};
#endif /* THREADED_RTS */
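
/* A hedged sketch (not part of the patch): the scheduler consults these
 * flags roughly as below; the real call sites are schedulePushWork() and
 * unblockOne(), further down.
 *
 *   if (!RtsFlags.ParFlags.migrate) return;                      // -qm
 *   if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) ...  // -qw
 */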
#define TSO_DIRTY 1
+#define tsoDirty(tso) ((tso)->flags & TSO_DIRTY)
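+/* Illustrative companions (an assumption, not in this patch): the flag
+ * would be set and cleared with ordinary bit operations on tso->flags:
+ *
+ *   #define setTSODirty(tso)    ((tso)->flags |= TSO_DIRTY)
+ *   #define clearTSODirty(tso)  ((tso)->flags &= ~TSO_DIRTY)
+ */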
+
/*
* Type returned after running a thread. Values of this type
* include HeapOverflow, StackOverflow etc. See Constants.h for the
* full list.
*/
typedef struct StgTSO_ {
- StgHeader header;
-
- struct StgTSO_* link; /* Links threads onto blocking queues */
- struct StgTSO_* global_link; /* Links all threads together */
-
- StgWord16 what_next; /* Values defined in Constants.h */
- StgWord16 why_blocked; /* Values defined in Constants.h */
- StgWord32 flags;
- StgTSOBlockInfo block_info;
- struct StgTSO_* blocked_exceptions;
- StgThreadID id;
- int saved_errno;
- struct Task_* bound; // non-NULL for a bound thread
- struct StgTRecHeader_ *trec; /* STM transaction record */
-
+ StgHeader header;
+
+ struct StgTSO_* link; /* Links threads onto blocking queues */
+ struct StgTSO_* global_link; /* Links all threads together */
+
+ StgWord16 what_next; /* Values defined in Constants.h */
+ StgWord16 why_blocked; /* Values defined in Constants.h */
+ StgWord32 flags;
+ StgTSOBlockInfo block_info;
+ struct StgTSO_* blocked_exceptions;
+ StgThreadID id;
+ int saved_errno;
+ struct Task_* bound; /* non-NULL for a bound thread */
+ struct Capability_* cap; /* Capability this TSO was last on */
+ struct StgTRecHeader_ * trec; /* STM transaction record */
+
#ifdef TICKY_TICKY
- /* TICKY-specific stuff would go here. */
+ /* TICKY-specific stuff would go here. */
#endif
#ifdef PROFILING
- StgTSOProfInfo prof;
+ StgTSOProfInfo prof;
#endif
#ifdef PAR
- StgTSOParInfo par;
+ StgTSOParInfo par;
#endif
#ifdef GRAN
- StgTSOGranInfo gran;
+ StgTSOGranInfo gran;
#endif
#ifdef DIST
- StgTSODistInfo dist;
+ StgTSODistInfo dist;
#endif
- /* The thread stack... */
- StgWord stack_size; /* stack size in *words* */
- StgWord max_stack_size; /* maximum stack size in *words* */
- StgPtr sp;
-
- StgWord stack[FLEXIBLE_ARRAY];
+ /* The thread stack... */
+ StgWord32 stack_size; /* stack size in *words* */
+ StgWord32 max_stack_size; /* maximum stack size in *words* */
+ StgPtr sp;
+
+ StgWord stack[FLEXIBLE_ARRAY];
} StgTSO;
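
/* Illustrative only (a sketch, not part of the patch): the stack grows
 * downwards from the end of the flexible array, with sp pointing at the
 * topmost word in use, so the words currently occupied on the stack are:
 *
 *   StgWord words_used = (tso->stack + tso->stack_size) - tso->sp;
 */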
/* -----------------------------------------------------------------------------
IF_DEBUG(sanity,
foreign "C" checkStackChunk(Sp "ptr",
CurrentTSO + TSO_OFFSET_StgTSO_stack +
- WDS(StgTSO_stack_size(CurrentTSO)) "ptr") [R1]);
+ WDS(TO_W_(StgTSO_stack_size(CurrentTSO))) "ptr") [R1]);
ENTER();
}
// other global condition to check, such as threads blocked on
// blackholes).
if (emptyRunQueue(cap)) {
- return !emptySparkPoolCap(cap) || globalWorkToDo();
+ return !emptySparkPoolCap(cap)
+ || !emptyWakeupQueue(cap)
+ || globalWorkToDo();
} else
return cap->run_queue_hd->bound == NULL;
}
cap->suspended_ccalling_tasks = NULL;
cap->returning_tasks_hd = NULL;
cap->returning_tasks_tl = NULL;
+ cap->wakeup_queue_hd = END_TSO_QUEUE;
+ cap->wakeup_queue_tl = END_TSO_QUEUE;
#endif
cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
// If we have an unbound thread on the run queue, or if there's
// anything else to do, give the Capability to a worker thread.
- if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
+ if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap)
+ || !emptySparkPoolCap(cap) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
// The worker Task pops itself from the queue;
}
/* ----------------------------------------------------------------------------
+ * Wake up a thread on a Capability.
+ *
+ * This is used when the current Task is running on a Capability and
+ * wishes to wake up a thread on a different Capability.
+ * ------------------------------------------------------------------------- */
+
+void
+wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
+{
+ ASSERT(tso->cap == cap);
+ ASSERT(tso->bound ? tso->bound->cap == cap : 1);
+
+ ACQUIRE_LOCK(&cap->lock);
+ if (cap->running_task == NULL) {
+ // nobody is running this Capability, we can add our thread
+ // directly onto the run queue and start up a Task to run it.
+ appendToRunQueue(cap,tso);
+
+ // start it up
+ cap->running_task = myTask(); // precond for releaseCapability_()
+ releaseCapability_(cap);
+ } else {
+ appendToWakeupQueue(cap,tso);
+ // someone is running on this Capability, so it cannot be
+ // freed without first checking the wakeup queue (see
+ // releaseCapability_).
+ }
+ RELEASE_LOCK(&cap->lock);
+}
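+
+/* A typical call site (a sketch; the real one is in unblockOne(), below):
+ * a Task that unblocks a TSO belonging to another Capability hands the
+ * TSO back rather than migrating it:
+ *
+ *   if (tso->cap != cap) {
+ *       wakeupThreadOnCapability(tso->cap, tso);
+ *   }
+ */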
+
+/* ----------------------------------------------------------------------------
* prodCapabilities
*
* Used to indicate that the interrupted flag is now set, or some
// Worker Tasks waiting in the wings. Singly-linked.
Task *spare_workers;
- // This lock protects running_task and returning_tasks_{hd,tl}.
+ // This lock protects running_task, returning_tasks_{hd,tl}, and wakeup_queue_{hd,tl}.
Mutex lock;
// Tasks waiting to return from a foreign call, or waiting to make
// check whether it is NULL without taking the lock, however.
Task *returning_tasks_hd; // Singly-linked, with head/tail
Task *returning_tasks_tl;
+
+ // A list of threads to append to this Capability's run queue at
+ // the earliest opportunity. These are threads that have been
+ // woken up by another Capability.
+ StgTSO *wakeup_queue_hd;
+ StgTSO *wakeup_queue_tl;
#endif
// Per-capability STM-related data
//
void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);
+// Wakes up a thread on a Capability (probably a different Capability
+// from the one held by the current Task).
+//
+void wakeupThreadOnCapability (Capability *cap, StgTSO *tso);
+
// Wakes up a worker thread on just one Capability, used when we
// need to service some global event.
//
* entry code in StgStartup.cmm.
*/
Sp = CurrentTSO + TSO_OFFSET_StgTSO_stack
- + WDS(StgTSO_stack_size(CurrentTSO)) - WDS(2);
+ + WDS(TO_W_(StgTSO_stack_size(CurrentTSO))) - WDS(2);
Sp(1) = R1; /* save the exception */
Sp(0) = stg_enter_info; /* so that GC can traverse this stack */
StgTSO_what_next(CurrentTSO) = ThreadKilled::I16;
#ifdef THREADED_RTS
RtsFlags.ParFlags.nNodes = 1;
+ RtsFlags.ParFlags.migrate = rtsTrue;
+ RtsFlags.ParFlags.wakeupMigrate = rtsFalse;
#endif
#ifdef PAR
#endif /* DEBUG */
#if defined(THREADED_RTS)
" -N<n> Use <n> OS threads (default: 1)",
+" -qm Don't automatically migrate threads between CPUs",
+" -qw Migrate a thread to the current CPU when it is woken up",
#endif
#if defined(THREADED_RTS) || defined(PAR)
" -e<size> Size of spark pools (default 100)",
}
}
) break;
+
+ case 'q':
+ switch (rts_argv[arg][2]) {
+ case '\0':
+ errorBelch("incomplete RTS option: %s",rts_argv[arg]);
+ error = rtsTrue;
+ break;
+ case 'm':
+ RtsFlags.ParFlags.migrate = rtsFalse;
+ break;
+ case 'w':
+ RtsFlags.ParFlags.wakeupMigrate = rtsTrue;
+ break;
+ default:
+ errorBelch("unknown RTS option: %s",rts_argv[arg]);
+ error = rtsTrue;
+ break;
+ }
+ break;
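+
+      /* Illustrative usage (not part of the patch): with the cases above,
+       * a threaded program would be invoked as, e.g.:
+       *
+       *   ./prog +RTS -N2 -qm -RTS   -- 2 Capabilities, no load balancing
+       *   ./prog +RTS -N2 -qw -RTS   -- pull woken threads to the waking CPU
+       */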
#endif
/* =========== PARALLEL =========================== */
case 'e':
}
) break;
+#ifdef PAR
case 'q':
PAR_BUILD_ONLY(
process_par_option(arg, rts_argc, rts_argv, &error);
) break;
+#endif
/* =========== GRAN =============================== */
// list each time around the scheduler.
if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
+ // Any threads that were woken up by other Capabilities get
+ // appended to our run queue.
+ if (!emptyWakeupQueue(cap)) {
+ ACQUIRE_LOCK(&cap->lock);
+ if (emptyRunQueue(cap)) {
+ cap->run_queue_hd = cap->wakeup_queue_hd;
+ cap->run_queue_tl = cap->wakeup_queue_tl;
+ } else {
+ cap->run_queue_tl->link = cap->wakeup_queue_hd;
+ cap->run_queue_tl = cap->wakeup_queue_tl;
+ }
+ cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
+ RELEASE_LOCK(&cap->lock);
+ }
+
scheduleCheckBlockedThreads(cap);
scheduleDetectDeadlock(cap,task);
// Run the current thread
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+ ASSERT(t->cap == cap);
prev_what_next = t->what_next;
#endif
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+ ASSERT(t->cap == cap);
// ----------------------------------------------------------------------
Capability *free_caps[n_capabilities], *cap0;
nat i, n_free_caps;
+ // migration can be turned off with +RTS -qm
+ if (!RtsFlags.ParFlags.migrate) return;
+
// Check whether we have more threads on our run queue, or sparks
// in our pool, that we could hand to another Capability.
if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
appendToRunQueue(free_caps[i],t);
if (t->bound) { t->bound->cap = free_caps[i]; }
+ t->cap = free_caps[i];
i++;
}
}
tso->saved_errno = 0;
tso->bound = NULL;
+ tso->cap = cap;
tso->stack_size = stack_size;
tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
// This TSO is now a bound thread; make the Task and TSO
// point to each other.
tso->bound = task;
+ tso->cap = cap;
task->tso = tso;
task->ret = ret;
for (i = 0; i < n_capabilities; i++) {
cap = &capabilities[i];
- evac((StgClosure **)&cap->run_queue_hd);
- evac((StgClosure **)&cap->run_queue_tl);
-
+ evac((StgClosure **)(void *)&cap->run_queue_hd);
+ evac((StgClosure **)(void *)&cap->run_queue_tl);
+#if defined(THREADED_RTS)
+ evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
+ evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
+#endif
for (task = cap->suspended_ccalling_tasks; task != NULL;
task=task->next) {
IF_DEBUG(scheduler,sched_belch("evac'ing suspended TSO %d", task->suspended_tso->id));
- evac((StgClosure **)&task->suspended_tso);
+ evac((StgClosure **)(void *)&task->suspended_tso);
}
+
}
+
#if !defined(THREADED_RTS)
evac((StgClosure **)(void *)&blocked_queue_hd);
evac((StgClosure **)(void *)&blocked_queue_tl);
ASSERT(get_itbl(tso)->type == TSO);
ASSERT(tso->why_blocked != NotBlocked);
+
tso->why_blocked = NotBlocked;
next = tso->link;
tso->link = END_TSO_QUEUE;
- // We might have just migrated this TSO to our Capability:
- if (tso->bound) {
- tso->bound->cap = cap;
+ if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) {
+ // We are waking up this thread on the current Capability, which
+ // might involve migrating it from the Capability it was last on.
+ if (tso->bound) {
+ ASSERT(tso->bound->cap == tso->cap);
+ tso->bound->cap = cap;
+ }
+ tso->cap = cap;
+ appendToRunQueue(cap,tso);
+ // we're holding a newly woken thread, make sure we context switch
+ // quickly so we can migrate it if necessary.
+ context_switch = 1;
+ } else {
+ // we'll try to wake it up on the Capability it was last on.
+ wakeupThreadOnCapability(tso->cap, tso);
}
- appendToRunQueue(cap,tso);
-
- // we're holding a newly woken thread, make sure we context switch
- // quickly so we can migrate it if necessary.
- context_switch = 1;
- IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
+ IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
return next;
}
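
/* Worked illustration (hedged, not part of the patch): with Capabilities
 * A and B, a thread whose tso->cap is B but which is unblocked by a Task
 * holding A takes the else branch above and is queued on B via
 * wakeupThreadOnCapability(); with +RTS -qw (wakeupMigrate) it would
 * instead be appended directly to A's run queue, migrating it to A. */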
if (tso->bound) {
tso->bound->cap = cap;
}
+ tso->cap = cap;
}
#endif
all_threads = tso;
IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
- // Wake up the thread on the Capability it was last on for a
- // bound thread, or last_free_capability otherwise.
- if (tso->bound) {
- cap = tso->bound->cap;
- } else {
- cap = last_free_capability;
- }
+ // Wake up the thread on the Capability it was last on
+ cap = tso->cap;
switch (tso->why_blocked) {
case BlockedOnMVar:
}
#endif
+#if defined(THREADED_RTS)
+STATIC_INLINE void
+appendToWakeupQueue (Capability *cap, StgTSO *tso)
+{
+ ASSERT(tso->link == END_TSO_QUEUE);
+ if (cap->wakeup_queue_hd == END_TSO_QUEUE) {
+ cap->wakeup_queue_hd = tso;
+ } else {
+ cap->wakeup_queue_tl->link = tso;
+ }
+ cap->wakeup_queue_tl = tso;
+}
+#endif
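+
+/* The matching dequeue is the wakeup-queue drain in schedule() (above),
+ * which splices the whole queue onto the run queue under cap->lock,
+ * roughly:
+ *
+ *   cap->run_queue_tl->link = cap->wakeup_queue_hd;
+ *   cap->run_queue_tl = cap->wakeup_queue_tl;
+ *   cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
+ */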
+
/* Check whether various thread queues are empty
*/
STATIC_INLINE rtsBool
return emptyQueue(cap->run_queue_hd);
}
+#if defined(THREADED_RTS)
+STATIC_INLINE rtsBool
+emptyWakeupQueue(Capability *cap)
+{
+ return emptyQueue(cap->wakeup_queue_hd);
+}
+#endif
+
#if !defined(THREADED_RTS)
#define EMPTY_BLOCKED_QUEUE() (emptyQueue(blocked_queue_hd))
#define EMPTY_SLEEPING_QUEUE() (emptyQueue(sleeping_queue))