// this list.
Task *suspended_ccalling_tasks;
+ // One mutable list per generation, so we don't need to take any
+ // locks when updating an old-generation thunk. These
+ // mini-mut-lists are moved onto the respective gen->mut_list at
+ // each GC.
+ bdescr **mut_lists;
+
#if defined(THREADED_RTS)
// Worker Tasks waiting in the wings. Singly-linked.
Task *spare_workers;
//
void waitForReturnCapability (Capability **cap/*in/out*/, Task *task);
+INLINE_HEADER void recordMutableCap (StgClosure *p, Capability *cap, nat gen);
+
#if defined(THREADED_RTS)
// Gives up the current capability IFF there is a higher-priority
//
void shutdownCapability (Capability *cap, Task *task);
+// Attempt to gain control of a Capability if it is free.
+//
+rtsBool tryGrabCapability (Capability *cap, Task *task);
+
#else // !THREADED_RTS
// Grab a capability. (Only in the non-threaded RTS; in the threaded
#endif /* !THREADED_RTS */
+/* -----------------------------------------------------------------------------
+ * INLINE functions... private below here
+ * -------------------------------------------------------------------------- */
+
+INLINE_HEADER void
+recordMutableCap (StgClosure *p, Capability *cap, nat gen)
+{
+ bdescr *bd;
+
+ bd = cap->mut_lists[gen];
+ if (bd->free >= bd->start + BLOCK_SIZE_W) {
+ bdescr *new_bd;
+ new_bd = allocBlock_lock();
+ new_bd->link = bd;
+ bd = new_bd;
+ cap->mut_lists[gen] = bd;
+ }
+ *bd->free++ = (StgWord)p;
+}
+
#endif /* CAPABILITY_H */
// scheduler clearer.
//
static void schedulePreLoop (void);
+static void schedulePushWork(Capability *cap, Task *task);
static void scheduleStartSignalHandlers (void);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckBlackHoles (Capability *cap);
}
#endif
+#ifdef SMP
+ schedulePushWork(cap,task);
+#endif
+
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
// call).
cap->in_haskell = rtsFalse;
+ // The TSO might have moved, eg. if it re-entered the RTS and a GC
+ // happened. So find the new location:
+ t = cap->r.rCurrentTSO;
+
#ifdef SMP
// If ret is ThreadBlocked, and this Task is bound to the TSO that
// blocked, we are in limbo - the TSO is now owned by whatever it
// perhaps even on a different Capability. It may be the case
// that task->cap != cap. We better yield this Capability
// immediately and return to normaility.
- if (ret == ThreadBlocked) continue;
+ if (ret == ThreadBlocked) {
+ IF_DEBUG(scheduler,
+ debugBelch("--<< thread %d (%s) stopped: blocked\n",
+ t->id, whatNext_strs[t->what_next]));
+ continue;
+ }
#endif
ASSERT_CAPABILITY_INVARIANTS(cap,task);
- // The TSO might have moved, eg. if it re-entered the RTS and a GC
- // happened. So find the new location:
- t = cap->r.rCurrentTSO;
-
// And save the current errno in this thread.
t->saved_errno = errno;
case ThreadFinished:
if (scheduleHandleThreadFinished(cap, task, t)) return cap;
+ ASSERT_CAPABILITY_INVARIANTS(cap,task);
break;
default:
#endif
}
+/* -----------------------------------------------------------------------------
+ * schedulePushWork()
+ *
+ * Push work to other Capabilities if we have some.
+ * -------------------------------------------------------------------------- */
+
+static void
+schedulePushWork(Capability *cap, Task *task)
+{
+#ifdef SMP
+ Capability *free_caps[n_capabilities], *cap0;
+ nat i, n_free_caps;
+
+ // Check whether we have more threads on our run queue that we
+ // could hand to another Capability.
+ if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {
+ return;
+ }
+
+ // First grab as many free Capabilities as we can.
+ for (i=0, n_free_caps=0; i < n_capabilities; i++) {
+ cap0 = &capabilities[i];
+ if (cap != cap0 && tryGrabCapability(cap0,task)) {
+ if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
+ // it already has some work, we just grabbed it at
+ // the wrong moment. Or maybe it's deadlocked!
+ releaseCapability(cap0);
+ } else {
+ free_caps[n_free_caps++] = cap0;
+ }
+ }
+ }
+
+ // we now have n_free_caps free capabilities stashed in
+ // free_caps[]. Share our run queue equally with them. This is
+ // probably the simplest thing we could do; improvements we might
+ // want to do include:
+ //
+ // - giving high priority to moving relatively new threads, on
+ // the gournds that they haven't had time to build up a
+ // working set in the cache on this CPU/Capability.
+ //
+ // - giving low priority to moving long-lived threads
+
+ if (n_free_caps > 0) {
+ StgTSO *prev, *t, *next;
+ IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
+
+ prev = cap->run_queue_hd;
+ t = prev->link;
+ prev->link = END_TSO_QUEUE;
+ i = 0;
+ for (; t != END_TSO_QUEUE; t = next) {
+ next = t->link;
+ t->link = END_TSO_QUEUE;
+ if (t->what_next == ThreadRelocated) {
+ prev->link = t;
+ prev = t;
+ } else if (i == n_free_caps) {
+ i = 0;
+ // keep one for us
+ prev->link = t;
+ prev = t;
+ } else {
+ appendToRunQueue(free_caps[i],t);
+ if (t->bound) { t->bound->cap = free_caps[i]; }
+ i++;
+ }
+ }
+ cap->run_queue_tl = prev;
+
+ // release the capabilities
+ for (i = 0; i < n_free_caps; i++) {
+ task->cap = free_caps[i];
+ releaseCapability(free_caps[i]);
+ }
+ }
+ task->cap = cap; // reset to point to our Capability.
+#endif
+}
+
/* ----------------------------------------------------------------------------
* Start any pending signal handlers
* ------------------------------------------------------------------------- */
//
was_waiting = cas(&waiting_for_gc, 0, 1);
- if (was_waiting) return;
+ if (was_waiting) {
+ do {
+ IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
+ yieldCapability(&cap,task);
+ } while (waiting_for_gc);
+ return;
+ }
for (i=0; i < n_capabilities; i++) {
IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
// all the Capabilities, but even so it's a slightly
// unsavoury invariant.
task->cap = pcap;
+ context_switch = 1;
waitForReturnCapability(&pcap, task);
if (pcap != &capabilities[i]) {
barf("scheduleDoGC: got the wrong capability");
}
}
-static void
-printThreadStatus(StgTSO *tso)
+void
+printThreadStatus(StgTSO *t)
{
- switch (tso->what_next) {
- case ThreadKilled:
- debugBelch("has been killed");
- break;
- case ThreadComplete:
- debugBelch("has completed");
- break;
- default:
- printThreadBlockage(tso);
- }
+ debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
+ {
+ void *label = lookupThreadLabel(t->id);
+ if (label) debugBelch("[\"%s\"] ",(char *)label);
+ }
+ if (t->what_next == ThreadRelocated) {
+ debugBelch("has been relocated...\n");
+ } else {
+ switch (t->what_next) {
+ case ThreadKilled:
+ debugBelch("has been killed");
+ break;
+ case ThreadComplete:
+ debugBelch("has completed");
+ break;
+ default:
+ printThreadBlockage(t);
+ }
+ debugBelch("\n");
+ }
}
void
printAllThreads(void)
{
- StgTSO *t;
+ StgTSO *t, *next;
+ nat i;
+ Capability *cap;
# if defined(GRAN)
char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
debugBelch("all threads:\n");
# endif
- for (t = all_threads; t != END_TSO_QUEUE; ) {
- debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
- {
- void *label = lookupThreadLabel(t->id);
- if (label) debugBelch("[\"%s\"] ",(char *)label);
- }
- if (t->what_next == ThreadRelocated) {
- debugBelch("has been relocated...\n");
- t = t->link;
- } else {
- printThreadStatus(t);
- debugBelch("\n");
- t = t->global_link;
- }
+ for (i = 0; i < n_capabilities; i++) {
+ cap = &capabilities[i];
+ debugBelch("threads on capability %d:\n", cap->no);
+ for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
+ printThreadStatus(t);
+ }
+ }
+
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ if (t->why_blocked != NotBlocked) {
+ printThreadStatus(t);
+ }
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ }
}
}
{
nat i = 0;
for (; t != END_TSO_QUEUE; t = t->link) {
- debugBelch("\tthread %d @ %p ", t->id, (void *)t);
- if (t->what_next == ThreadRelocated) {
- debugBelch("has been relocated...\n");
- } else {
- printThreadStatus(t);
- debugBelch("\n");
- }
+ printThreadStatus(t);
i++;
}
debugBelch("%d threads on queue\n", i);