#include "Capability.h"
#include "Task.h"
#include "AwaitEvent.h"
+#if defined(mingw32_HOST_OS)
+#include "win32/IOManager.h"
+#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
// scheduler clearer.
//
static void schedulePreLoop (void);
-static void scheduleStartSignalHandlers (void);
+#if defined(SMP)
+static void schedulePushWork(Capability *cap, Task *task);
+#endif
+static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
#endif
nat prev_what_next;
rtsBool ready_to_gc;
+#if defined(THREADED_RTS)
rtsBool first = rtsTrue;
+#endif
cap = initialCapability;
}
#endif
+#ifdef SMP
+ schedulePushWork(cap,task);
+#endif
+
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
// call).
// If we are a worker, just exit. If we're a bound thread
// then we will exit below when we've removed our TSO from
// the run queue.
- if (task->tso == NULL) {
+ if (task->tso == NULL && emptyRunQueue(cap)) {
return cap;
}
} else {
}
#endif // SMP
- scheduleStartSignalHandlers();
+ scheduleStartSignalHandlers(cap);
// Only check the black holes here if we've nothing else to do.
// During normal execution, the black hole list only gets checked
cap->in_haskell = rtsFalse;
+ // The TSO might have moved, eg. if it re-entered the RTS and a GC
+ // happened. So find the new location:
+ t = cap->r.rCurrentTSO;
+
#ifdef SMP
// If ret is ThreadBlocked, and this Task is bound to the TSO that
// blocked, we are in limbo - the TSO is now owned by whatever it
// perhaps even on a different Capability. It may be the case
// that task->cap != cap. We better yield this Capability
// immediately and return to normality.
- if (ret == ThreadBlocked) continue;
+ if (ret == ThreadBlocked) {
+ IF_DEBUG(scheduler,
+ debugBelch("--<< thread %d (%s) stopped: blocked\n",
+ t->id, whatNext_strs[t->what_next]));
+ continue;
+ }
#endif
ASSERT_CAPABILITY_INVARIANTS(cap,task);
- // The TSO might have moved, eg. if it re-entered the RTS and a GC
- // happened. So find the new location:
- t = cap->r.rCurrentTSO;
-
// And save the current errno in this thread.
t->saved_errno = errno;
case ThreadFinished:
if (scheduleHandleThreadFinished(cap, task, t)) return cap;
+ ASSERT_CAPABILITY_INVARIANTS(cap,task);
break;
default:
#endif
}
/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 *
 * If our run queue holds more than one thread, grab any currently idle
 * Capabilities and deal the surplus threads out to them round-robin.
 * Threads bound to this Task, and relocated TSOs, are never moved.
 * On return, task->cap points at our own Capability again.
 * -------------------------------------------------------------------------- */

#ifdef SMP
static void
schedulePushWork(Capability *cap USED_WHEN_SMP,
                 Task *task USED_WHEN_SMP)
{
    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // Nothing to share unless we have at least two threads queued:
    // the head thread is the one we are about to run ourselves.
    if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {
        return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && tryGrabCapability(cap0,task)) {
            // BUGFIX: inspect cap0's queues, not our own (was
            // cap->returning_tasks_hd) -- we are asking whether *that*
            // Capability already has work of its own.
            if (!emptyRunQueue(cap0) || cap0->returning_tasks_hd != NULL) {
                // it already has some work, we just grabbed it at
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
            } else {
                free_caps[n_free_caps++] = cap0;
            }
        }
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;
        IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));

        // Detach everything after the head thread, then rebuild the
        // queue: kept threads are re-linked after `prev`, the rest are
        // dealt to free_caps[] round-robin, keeping every
        // (n_free_caps+1)-th thread for ourselves.
        prev = cap->run_queue_hd;
        t = prev->link;
        prev->link = END_TSO_QUEUE;
        i = 0;
        for (; t != END_TSO_QUEUE; t = next) {
            next = t->link;
            t->link = END_TSO_QUEUE;
            if (t->what_next == ThreadRelocated
                || t->bound == task) { // don't move my bound thread
                prev->link = t;
                prev = t;
            } else if (i == n_free_caps) {
                i = 0;
                // keep one for us
                prev->link = t;
                prev = t;
            } else {
                appendToRunQueue(free_caps[i],t);
                // a bound thread carries its Task along to the new
                // Capability
                if (t->bound) { t->bound->cap = free_caps[i]; }
                i++;
            }
        }
        cap->run_queue_tl = prev;

        // release the capabilities; task->cap is pointed at each one
        // in turn while releasing it.
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseCapability(free_caps[i]);
        }
    }
    task->cap = cap; // reset to point to our Capability.
}
#endif
+
/* ----------------------------------------------------------------------------
* Start any pending signal handlers
* ------------------------------------------------------------------------- */
static void
-scheduleStartSignalHandlers(void)
+scheduleStartSignalHandlers(Capability *cap)
{
-#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
+#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
if (signals_pending()) { // safe outside the lock
- startSignalHandlers();
+ startSignalHandlers(cap);
}
#endif
}
if ( !emptyRunQueue(cap) ) return;
-#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
+#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
/* If we have user-installed signal handlers, then wait
* for signals to arrive rather then bombing out with a
* deadlock.
awaitUserSignals();
if (signals_pending()) {
- startSignalHandlers();
+ startSignalHandlers(cap);
}
// either we have threads to run, or we were interrupted:
/* enlarge the stack */
StgTSO *new_t = threadStackOverflow(cap, t);
- /* This TSO has moved, so update any pointers to it from the
- * main thread stack. It better not be on any other queues...
- * (it shouldn't be).
+ /* The TSO attached to this Task may have moved, so update the
+ * pointer to it.
*/
- if (task->tso != NULL) {
+ if (task->tso == t) {
task->tso = new_t;
}
pushOnRunQueue(cap,new_t);
//
was_waiting = cas(&waiting_for_gc, 0, 1);
- if (was_waiting) return;
+ if (was_waiting) {
+ do {
+ IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
+ yieldCapability(&cap,task);
+ } while (waiting_for_gc);
+ return;
+ }
for (i=0; i < n_capabilities; i++) {
IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
// all the Capabilities, but even so it's a slightly
// unsavoury invariant.
task->cap = pcap;
+ context_switch = 1;
waitForReturnCapability(&pcap, task);
if (pcap != &capabilities[i]) {
barf("scheduleDoGC: got the wrong capability");
new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
- IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", tso->stack_size, new_stack_size));
+ IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
dest = (StgTSO *)allocate(new_tso_size);
TICK_ALLOC_TSO(new_stack_size,0);
}
}
-static void
-printThreadStatus(StgTSO *tso)
+void
+printThreadStatus(StgTSO *t)
{
- switch (tso->what_next) {
- case ThreadKilled:
- debugBelch("has been killed");
- break;
- case ThreadComplete:
- debugBelch("has completed");
- break;
- default:
- printThreadBlockage(tso);
- }
+ debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
+ {
+ void *label = lookupThreadLabel(t->id);
+ if (label) debugBelch("[\"%s\"] ",(char *)label);
+ }
+ if (t->what_next == ThreadRelocated) {
+ debugBelch("has been relocated...\n");
+ } else {
+ switch (t->what_next) {
+ case ThreadKilled:
+ debugBelch("has been killed");
+ break;
+ case ThreadComplete:
+ debugBelch("has completed");
+ break;
+ default:
+ printThreadBlockage(t);
+ }
+ debugBelch("\n");
+ }
}
void
printAllThreads(void)
{
- StgTSO *t;
+ StgTSO *t, *next;
+ nat i;
+ Capability *cap;
# if defined(GRAN)
char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
debugBelch("all threads:\n");
# endif
- for (t = all_threads; t != END_TSO_QUEUE; ) {
- debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
- {
- void *label = lookupThreadLabel(t->id);
- if (label) debugBelch("[\"%s\"] ",(char *)label);
- }
- if (t->what_next == ThreadRelocated) {
- debugBelch("has been relocated...\n");
- t = t->link;
- } else {
- printThreadStatus(t);
- debugBelch("\n");
- t = t->global_link;
- }
+ for (i = 0; i < n_capabilities; i++) {
+ cap = &capabilities[i];
+ debugBelch("threads on capability %d:\n", cap->no);
+ for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
+ printThreadStatus(t);
+ }
+ }
+
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ if (t->why_blocked != NotBlocked) {
+ printThreadStatus(t);
+ }
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ }
}
}
{
nat i = 0;
for (; t != END_TSO_QUEUE; t = t->link) {
- debugBelch("\tthread %d @ %p ", t->id, (void *)t);
- if (t->what_next == ThreadRelocated) {
- debugBelch("has been relocated...\n");
- } else {
- printThreadStatus(t);
- debugBelch("\n");
- }
+ printThreadStatus(t);
i++;
}
debugBelch("%d threads on queue\n", i);