forkzh_fast
{
/* args: R1 = closure to spark */
-
+
MAYBE_GC(R1_PTR, forkzh_fast);
- // create it right now, return ThreadID in R1
- "ptr" R1 = foreign "C" createIOThread( MyCapability() "ptr",
+ // Copy the closure out of R1 into a local: the foreign calls below
+ // declare no live registers ("[]"), so R1 is not preserved across them.
+ W_ closure;
+ W_ threadid;
+ closure = R1;
+
+ // Create the new thread with the default initial stack size, then hand
+ // it to the scheduler on the current Capability.
+ "ptr" threadid = foreign "C" createIOThread( MyCapability() "ptr",
+ RtsFlags_GcFlags_initialStkSize(RtsFlags),
+ closure "ptr") [];
+ foreign "C" scheduleThread(MyCapability() "ptr", threadid "ptr") [];
+
+ // switch at the earliest opportunity
+ CInt[context_switch] = 1 :: CInt;
+
+ // Return the ThreadId# of the freshly created thread.
+ RET_P(threadid);
+}
+
+forkOnzh_fast
+{
+ /* args: R1 = cpu, R2 = closure to spark */
+
+ MAYBE_GC(R2_PTR, forkOnzh_fast);
+
+ // Copy args out of the argument registers: the foreign calls below
+ // declare no live registers ("[]"), so R1/R2 are not preserved.
+ W_ cpu;
+ W_ closure;
+ W_ threadid;
+ cpu = R1;
+ closure = R2;
+
+ // Create the thread, then schedule it pinned to the requested
+ // Capability (scheduleThreadOn reduces cpu modulo the capability
+ // count and sets TSO_LOCKED).
+ "ptr" threadid = foreign "C" createIOThread( MyCapability() "ptr",
RtsFlags_GcFlags_initialStkSize(RtsFlags),
- R1 "ptr") [R1];
- foreign "C" scheduleThread(MyCapability() "ptr", R1 "ptr") [R1];
+ closure "ptr") [];
+ foreign "C" scheduleThreadOn(MyCapability() "ptr", cpu, threadid "ptr") [];
// switch at the earliest opportunity
CInt[context_switch] = 1 :: CInt;
- RET_P(R1);
+ // Return the ThreadId# of the freshly created thread.
+ RET_P(threadid);
}
yieldzh_fast
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
+static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
#if defined(GRAN)
// list each time around the scheduler.
if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
- // Any threads that were woken up by other Capabilities get
- // appended to our run queue.
- if (!emptyWakeupQueue(cap)) {
- ACQUIRE_LOCK(&cap->lock);
- if (emptyRunQueue(cap)) {
- cap->run_queue_hd = cap->wakeup_queue_hd;
- cap->run_queue_tl = cap->wakeup_queue_tl;
- } else {
- cap->run_queue_tl->link = cap->wakeup_queue_hd;
- cap->run_queue_tl = cap->wakeup_queue_tl;
- }
- cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
- RELEASE_LOCK(&cap->lock);
- }
+ scheduleCheckWakeupThreads(cap);
scheduleCheckBlockedThreads(cap);
next = t->link;
t->link = END_TSO_QUEUE;
if (t->what_next == ThreadRelocated
- || t->bound == task) { // don't move my bound thread
+ || t->bound == task // don't move my bound thread
+ || tsoLocked(t)) { // don't move a locked thread
prev->link = t;
prev = t;
} else if (i == n_free_caps) {
/* ----------------------------------------------------------------------------
+ * Check for threads woken up by other Capabilities
+ * ------------------------------------------------------------------------- */
+
+static void
+scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
+{
+// NOTE(review): the forward declaration earlier in this patch uses
+// USED_IF_NOT_THREADS; cap is only read under THREADED_RTS here, so the
+// two annotations look inconsistent — confirm which one is intended.
+#if defined(THREADED_RTS)
+ // Any threads that were woken up by other Capabilities get
+ // appended to our run queue.
+ if (!emptyWakeupQueue(cap)) {
+ // cap->lock guards the wakeup queue against concurrent writers
+ // (other Capabilities pushing woken threads onto it).
+ ACQUIRE_LOCK(&cap->lock);
+ if (emptyRunQueue(cap)) {
+ // Run queue empty: the wakeup queue becomes the run queue.
+ cap->run_queue_hd = cap->wakeup_queue_hd;
+ cap->run_queue_tl = cap->wakeup_queue_tl;
+ } else {
+ // Otherwise splice the wakeup queue onto the tail.
+ cap->run_queue_tl->link = cap->wakeup_queue_hd;
+ cap->run_queue_tl = cap->wakeup_queue_tl;
+ }
+ // Reset the wakeup queue to empty now that it has been drained.
+ cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
+ RELEASE_LOCK(&cap->lock);
+ }
+#endif
+}
+
+/* ----------------------------------------------------------------------------
* Check for threads blocked on BLACKHOLEs that can be woken up
* ------------------------------------------------------------------------- */
static void
appendToRunQueue(cap,tso);
}
+// Schedule a thread on a specific Capability (backs the forkOn# primop).
+// The TSO is marked TSO_LOCKED so the load balancer will not migrate it
+// later (see the tsoLocked() check added to the run-queue migration loop
+// in this same patch).
+void
+scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
+{
+#if defined(THREADED_RTS)
+ tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
+ // move this thread from now on.
+ // Map the requested cpu into the range of available Capabilities.
+ cpu %= RtsFlags.ParFlags.nNodes;
+ if (cpu == cap->no) {
+ // Target is the current Capability: enqueue locally.
+ appendToRunQueue(cap,tso);
+ } else {
+ // Otherwise re-home the TSO (and its bound Task, if any) to the
+ // target Capability and wake it up over there.
+ Capability *target_cap = &capabilities[cpu];
+ if (tso->bound) {
+ tso->bound->cap = target_cap;
+ }
+ tso->cap = target_cap;
+ wakeupThreadOnCapability(target_cap,tso);
+ }
+#else
+ // Non-threaded RTS: only one Capability exists, so cpu is ignored.
+ appendToRunQueue(cap,tso);
+#endif
+}
+
Capability *
scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
{
next = tso->link;
tso->link = END_TSO_QUEUE;
- if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) {
+#if defined(THREADED_RTS)
+ if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) {
// We are waking up this thread on the current Capability, which
// might involve migrating it from the Capability it was last on.
if (tso->bound) {
// we'll try to wake it up on the Capability it was last on.
wakeupThreadOnCapability(tso->cap, tso);
}
+#else
+ appendToRunQueue(cap,tso);
+ context_switch = 1;
+#endif
IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
return next;