#include "PosixSource.h"
#include "Rts.h"
-#include "SchedAPI.h"
-#include "Storage.h"
+
+#include "Capability.h"
+#include "Updates.h"
#include "Threads.h"
-#include "RtsFlags.h"
#include "STM.h"
#include "Schedule.h"
#include "Trace.h"
#include "ThreadLabels.h"
+#include "Updates.h"
+#include "Messages.h"
+#include "sm/Storage.h"
/* Next thread ID to allocate.
* LOCK: sched_mutex
}
size = round_to_mblocks(size);
- tso = (StgTSO *)allocateLocal(cap, size);
+ tso = (StgTSO *)allocate(cap, size);
stack_size = size - TSO_STRUCT_SIZEW;
TICK_ALLOC_TSO(stack_size, 0);
tso->what_next = ThreadRunGHC;
tso->why_blocked = NotBlocked;
- tso->blocked_exceptions = END_TSO_QUEUE;
- tso->flags = TSO_DIRTY;
+ tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
+ tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
+ tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
+ tso->flags = 0;
+ tso->dirty = 1;
tso->saved_errno = 0;
tso->bound = NULL;
*/
ACQUIRE_LOCK(&sched_mutex);
tso->id = next_thread_id++; // while we have the mutex
- tso->global_link = g0s0->threads;
- g0s0->threads = tso;
+ tso->global_link = g0->threads;
+ g0->threads = tso;
RELEASE_LOCK(&sched_mutex);
- postEvent (cap, EVENT_CREATE_THREAD, tso->id, 0);
+ // ToDo: report the stack size in the event?
+ traceEventCreateThread(cap, tso);
- debugTrace(DEBUG_sched,
- "created thread %ld, stack size = %lx words",
- (long)tso->id, (long)tso->stack_size);
return tso;
}
Fails fatally if the TSO is not on the queue.
-------------------------------------------------------------------------- */
-void
+rtsBool // returns True if we modified queue
removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
{
StgTSO *t, *prev;
if (t == tso) {
if (prev) {
setTSOLink(cap,prev,t->_link);
+ return rtsFalse;
} else {
*queue = t->_link;
+ return rtsTrue;
}
- return;
}
}
barf("removeThreadFromQueue: not found");
}
-void
+rtsBool // returns True if we modified head or tail
removeThreadFromDeQueue (Capability *cap,
StgTSO **head, StgTSO **tail, StgTSO *tso)
{
StgTSO *t, *prev;
+ rtsBool flag = rtsFalse;
prev = NULL;
for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
if (t == tso) {
if (prev) {
setTSOLink(cap,prev,t->_link);
+ flag = rtsFalse;
} else {
*head = t->_link;
+ flag = rtsTrue;
}
if (*tail == tso) {
if (prev) {
} else {
*tail = END_TSO_QUEUE;
}
- }
- return;
+ return rtsTrue;
+ } else {
+ return flag;
+ }
}
}
barf("removeThreadFromMVarQueue: not found");
}
-void
-removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
-{
- removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
-}
-
/* ----------------------------------------------------------------------------
- unblockOne()
+ tryWakeupThread()
+
+ Attempt to wake up a thread. tryWakeupThread is idempotent: it is
+ always safe to call it too many times, but it is not safe in
+ general to omit a call.
- unblock a single thread.
------------------------------------------------------------------------- */
-StgTSO *
-unblockOne (Capability *cap, StgTSO *tso)
+void
+tryWakeupThread (Capability *cap, StgTSO *tso)
{
- return unblockOne_(cap,tso,rtsTrue); // allow migration
+ tryWakeupThread_(cap, deRefTSO(tso));
}
-StgTSO *
-unblockOne_ (Capability *cap, StgTSO *tso,
- rtsBool allow_migrate USED_IF_THREADS)
+void
+tryWakeupThread_ (Capability *cap, StgTSO *tso)
{
- StgTSO *next;
+ traceEventThreadWakeup (cap, tso, tso->cap->no);
- // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
- ASSERT(tso->why_blocked != NotBlocked);
+#ifdef THREADED_RTS
+ if (tso->cap != cap)
+ {
+ MessageWakeup *msg;
+ msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
+ SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
+ msg->tso = tso;
+ sendMessage(cap, tso->cap, (Message*)msg);
+ debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
+ (lnat)tso->id, tso->cap->no);
+ return;
+ }
+#endif
- tso->why_blocked = NotBlocked;
- next = tso->_link;
- tso->_link = END_TSO_QUEUE;
+ switch (tso->why_blocked)
+ {
+ case BlockedOnMVar:
+ {
+ if (tso->_link == END_TSO_QUEUE) {
+ tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
+ goto unblock;
+ } else {
+ return;
+ }
+ }
-#if defined(THREADED_RTS)
- if (tso->cap == cap || (!tsoLocked(tso) &&
- allow_migrate &&
- RtsFlags.ParFlags.wakeupMigrate)) {
- // We are waking up this thread on the current Capability, which
- // might involve migrating it from the Capability it was last on.
- if (tso->bound) {
- ASSERT(tso->bound->cap == tso->cap);
- tso->bound->cap = cap;
- }
+ case BlockedOnMsgThrowTo:
+ {
+ const StgInfoTable *i;
+
+ i = lockClosure(tso->block_info.closure);
+ unlockClosure(tso->block_info.closure, i);
+ if (i != &stg_MSG_NULL_info) {
+ debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
+ (lnat)tso->id, tso->block_info.throwto->header.info);
+ return;
+ }
- tso->cap = cap;
- appendToRunQueue(cap,tso);
+ // remove the block frame from the stack
+ ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
+ tso->sp += 3;
+ goto unblock;
+ }
- // context-switch soonish so we can migrate the new thread if
- // necessary. NB. not contextSwitchCapability(cap), which would
- // force a context switch immediately.
- cap->context_switch = 1;
- } else {
- // we'll try to wake it up on the Capability it was last on.
- wakeupThreadOnCapability(cap, tso->cap, tso);
- }
-#else
- appendToRunQueue(cap,tso);
+ case BlockedOnBlackHole:
+ case BlockedOnSTM:
+ case ThreadMigrating:
+ goto unblock;
- // context-switch soonish so we can migrate the new thread if
- // necessary. NB. not contextSwitchCapability(cap), which would
- // force a context switch immediately.
- cap->context_switch = 1;
-#endif
+ default:
+ // otherwise, do nothing
+ return;
+ }
- postEvent (cap, EVENT_THREAD_WAKEUP, tso->id, tso->cap->no);
+unblock:
+ // just run the thread now, if the BH is not really available,
+ // we'll block again.
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap,tso);
+}
- debugTrace(DEBUG_sched, "waking up thread %ld on cap %d",
- (long)tso->id, tso->cap->no);
+/* ----------------------------------------------------------------------------
+ migrateThread
+ ------------------------------------------------------------------------- */
- return next;
+void
+migrateThread (Capability *from, StgTSO *tso, Capability *to)
+{
+ traceEventMigrateThread (from, tso, to->no);
+ // ThreadMigrating tells the target cap that it needs to be added to
+ // the run queue when it receives the MSG_TRY_WAKEUP.
+ tso->why_blocked = ThreadMigrating;
+ tso->cap = to;
+ tryWakeupThread(from, tso);
}
/* ----------------------------------------------------------------------------
------------------------------------------------------------------------- */
void
-awakenBlockedQueue(Capability *cap, StgTSO *tso)
+wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
{
- while (tso != END_TSO_QUEUE) {
- tso = unblockOne(cap,tso);
+ MessageBlackHole *msg;
+ const StgInfoTable *i;
+
+ ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info ||
+ bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info );
+
+ for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE;
+ msg = msg->link) {
+ i = msg->header.info;
+ if (i != &stg_IND_info) {
+ ASSERT(i == &stg_MSG_BLACKHOLE_info);
+ tryWakeupThread(cap,msg->tso);
+ }
+ }
+
+ // overwrite the BQ with an indirection so it will be
+ // collected at the next GC.
+#if defined(DEBUG) && !defined(THREADED_RTS)
+ // XXX FILL_SLOP, but not if THREADED_RTS because in that case
+ // another thread might be looking at this BLOCKING_QUEUE and
+ // checking the owner field at the same time.
+ bq->bh = 0; bq->queue = 0; bq->owner = 0;
+#endif
+ OVERWRITE_INFO(bq, &stg_IND_info);
+}
+
+// If we update a closure that we know we BLACKHOLE'd, and the closure
+// no longer points to the current TSO as its owner, then there may be
+// an orphaned BLOCKING_QUEUE closure with blocked threads attached to
+// it. We therefore traverse the BLOCKING_QUEUEs attached to the
+// current TSO to see if any can now be woken up.
+void
+checkBlockingQueues (Capability *cap, StgTSO *tso)
+{
+ StgBlockingQueue *bq, *next;
+ StgClosure *p;
+
+ debugTraceCap(DEBUG_sched, cap,
+ "collision occurred; checking blocking queues for thread %ld",
+ (lnat)tso->id);
+
+ for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
+ next = bq->link;
+
+ if (bq->header.info == &stg_IND_info) {
+ // ToDo: could short it out right here, to avoid
+ // traversing this IND multiple times.
+ continue;
+ }
+
+ p = bq->bh;
+
+ if (p->header.info != &stg_BLACKHOLE_info ||
+ ((StgInd *)p)->indirectee != (StgClosure*)bq)
+ {
+ wakeBlockingQueue(cap,bq);
+ }
+ }
+}
+
+/* ----------------------------------------------------------------------------
+ updateThunk
+
+ Update a thunk with a value. In order to do this, we need to know
+ which TSO owns (or is evaluating) the thunk, in case we need to
+ awaken any threads that are blocked on it.
+ ------------------------------------------------------------------------- */
+
+void
+updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
+{
+ StgClosure *v;
+ StgTSO *owner;
+ const StgInfoTable *i;
+
+ i = thunk->header.info;
+ if (i != &stg_BLACKHOLE_info &&
+ i != &stg_CAF_BLACKHOLE_info &&
+ i != &__stg_EAGER_BLACKHOLE_info &&
+ i != &stg_WHITEHOLE_info) {
+ updateWithIndirection(cap, thunk, val);
+ return;
+ }
+
+ v = ((StgInd*)thunk)->indirectee;
+
+ updateWithIndirection(cap, thunk, val);
+
+ i = v->header.info;
+ if (i == &stg_TSO_info) {
+ owner = deRefTSO((StgTSO*)v);
+ if (owner != tso) {
+ checkBlockingQueues(cap, tso);
+ }
+ return;
+ }
+
+ if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
+ i != &stg_BLOCKING_QUEUE_DIRTY_info) {
+ checkBlockingQueues(cap, tso);
+ return;
+ }
+
+ owner = deRefTSO(((StgBlockingQueue*)v)->owner);
+
+ if (owner != tso) {
+ checkBlockingQueues(cap, tso);
+ } else {
+ wakeBlockingQueue(cap, (StgBlockingQueue*)v);
}
}
case BlockedOnMVar:
debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
break;
- case BlockedOnException:
- debugBelch("is blocked on delivering an exception to thread %lu",
- (unsigned long)tso->block_info.tso->id);
- break;
case BlockedOnBlackHole:
- debugBelch("is blocked on a black hole");
+ debugBelch("is blocked on a black hole %p",
+ ((StgBlockingQueue*)tso->block_info.bh->bh));
+ break;
+ case BlockedOnMsgThrowTo:
+ debugBelch("is blocked on a throwto message");
break;
case NotBlocked:
debugBelch("is not blocked");
break;
+ case ThreadMigrating:
+ debugBelch("is runnable, but not on the run queue");
+ break;
case BlockedOnCCall:
debugBelch("is blocked on an external call");
break;
}
}
+
void
printThreadStatus(StgTSO *t)
{
default:
printThreadBlockage(t);
}
- if (t->flags & TSO_DIRTY) {
+ if (t->dirty) {
debugBelch(" (TSO_DIRTY)");
} else if (t->flags & TSO_LINK_DIRTY) {
debugBelch(" (TSO_LINK_DIRTY)");
printAllThreads(void)
{
StgTSO *t, *next;
- nat i, s;
+ nat i, g;
Capability *cap;
-# if defined(GRAN)
- char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
- ullong_format_string(TIME_ON_PROC(CurrentProc),
- time_string, rtsFalse/*no commas!*/);
-
- debugBelch("all threads at [%s]:\n", time_string);
-# elif defined(PARALLEL_HASKELL)
- char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
- ullong_format_string(CURRENT_TIME,
- time_string, rtsFalse/*no commas!*/);
-
- debugBelch("all threads at [%s]:\n", time_string);
-# else
debugBelch("all threads:\n");
-# endif
for (i = 0; i < n_capabilities; i++) {
cap = &capabilities[i];
}
debugBelch("other threads:\n");
- for (s = 0; s < total_steps; s++) {
- for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
+ for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+ for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
if (t->why_blocked != NotBlocked) {
printThreadStatus(t);
}