+/* -----------------------------------------------------------------------------
+ Unblock a thread
+
+ This is for use when we raise an exception in another thread, which
+ may be blocked.
+ This has nothing to do with the UnblockThread event in GranSim. -- HWL
+ -------------------------------------------------------------------------- */
+
+static void
+unblockThread(StgTSO *tso)
+{
+ StgTSO *t, **last;
+
+ ACQUIRE_LOCK(&sched_mutex);
+ switch (tso->why_blocked) {
+
+ case NotBlocked:
+ return; /* not blocked */
+
+ case BlockedOnMVar:
+ ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
+ {
+ StgTSO *last_tso = END_TSO_QUEUE;
+ StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
+
+ last = &mvar->head;
+ for (t = mvar->head; t != END_TSO_QUEUE;
+ last = &t->link, last_tso = t, t = t->link) {
+ if (t == tso) {
+ *last = tso->link;
+ if (mvar->tail == tso) {
+ mvar->tail = last_tso;
+ }
+ goto done;
+ }
+ }
+ barf("unblockThread (MVAR): TSO not found");
+ }
+
+ case BlockedOnBlackHole:
+ ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
+ {
+ StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
+
+ last = &bq->blocking_queue;
+ for (t = bq->blocking_queue; t != END_TSO_QUEUE;
+ last = &t->link, t = t->link) {
+ if (t == tso) {
+ *last = tso->link;
+ goto done;
+ }
+ }
+ barf("unblockThread (BLACKHOLE): TSO not found");
+ }
+
+ case BlockedOnException:
+ {
+ StgTSO *target = tso->block_info.tso;
+
+ ASSERT(get_itbl(target)->type == TSO);
+ ASSERT(target->blocked_exceptions != NULL);
+
+ last = &target->blocked_exceptions;
+ for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
+ last = &t->link, t = t->link) {
+ ASSERT(get_itbl(t)->type == TSO);
+ if (t == tso) {
+ *last = tso->link;
+ goto done;
+ }
+ }
+ barf("unblockThread (Exception): TSO not found");
+ }
+
+ case BlockedOnDelay:
+ case BlockedOnRead:
+ case BlockedOnWrite:
+ {
+ StgTSO *prev = NULL;
+ for (t = blocked_queue_hd; t != END_TSO_QUEUE;
+ prev = t, t = t->link) {
+ if (t == tso) {
+ if (prev == NULL) {
+ blocked_queue_hd = t->link;
+ if (blocked_queue_tl == t) {
+ blocked_queue_tl = END_TSO_QUEUE;
+ }
+ } else {
+ prev->link = t->link;
+ if (blocked_queue_tl == t) {
+ blocked_queue_tl = prev;
+ }
+ }
+ goto done;
+ }
+ }
+ barf("unblockThread (I/O): TSO not found");
+ }
+
+ default:
+ barf("unblockThread");
+ }
+
+ done:
+ tso->link = END_TSO_QUEUE;
+ tso->why_blocked = NotBlocked;
+ tso->block_info.closure = NULL;
+ PUSH_ON_RUN_QUEUE(tso);
+ RELEASE_LOCK(&sched_mutex);
+}
+
+/* -----------------------------------------------------------------------------
+ * raiseAsync()
+ *
+ * The following function implements the magic for raising an
+ * asynchronous exception in an existing thread.
+ *
+ * We first remove the thread from any queue on which it might be
+ * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
+ *
+ * We strip the stack down to the innermost CATCH_FRAME, building
+ * thunks in the heap for all the active computations, so they can
+ * be restarted if necessary. When we reach a CATCH_FRAME, we build
+ * an application of the handler to the exception, and push it on
+ * the top of the stack.
+ *
+ * How exactly do we save all the active computations? We create an
+ * AP_UPD for every UpdateFrame on the stack. Entering one of these
+ * AP_UPDs pushes everything from the corresponding update frame
+ * upwards onto the stack. (Actually, it pushes everything up to the
+ * next update frame plus a pointer to the next AP_UPD object.
+ * Entering the next AP_UPD object pushes more onto the stack until we
+ * reach the last AP_UPD object - at which point the stack should look
+ * exactly as it did when we killed the TSO and we can continue
+ * execution by entering the closure on top of the stack.
+ *
+ * We can also kill a thread entirely - this happens if either (a) the
+ * exception passed to raiseAsync is NULL, or (b) there's no
+ * CATCH_FRAME on the stack. In either case, we strip the entire
+ * stack and replace the thread with a zombie.
+ *
+ * -------------------------------------------------------------------------- */
+
+void
+deleteThread(StgTSO *tso)
+{
+ raiseAsync(tso,NULL);
+}
+
+void
+raiseAsync(StgTSO *tso, StgClosure *exception)
+{
+ StgUpdateFrame* su = tso->su;
+ StgPtr sp = tso->sp;
+
+ /* Thread already dead? */
+ if (tso->whatNext == ThreadComplete || tso->whatNext == ThreadKilled) {
+ return;
+ }
+
+ IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
+
+ /* Remove it from any blocking queues */
+ unblockThread(tso);
+
+ /* The stack freezing code assumes there's a closure pointer on
+ * the top of the stack. This isn't always the case with compiled
+ * code, so we have to push a dummy closure on the top which just
+ * returns to the next return address on the stack.
+ */
+ if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
+ *(--sp) = (W_)&dummy_ret_closure;
+ }
+
+ while (1) {
+ int words = ((P_)su - (P_)sp) - 1;
+ nat i;
+ StgAP_UPD * ap;
+
+ /* If we find a CATCH_FRAME, and we've got an exception to raise,
+ * then build PAP(handler,exception,realworld#), and leave it on
+ * top of the stack ready to enter.
+ */
+ if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
+ StgCatchFrame *cf = (StgCatchFrame *)su;
+ /* we've got an exception to raise, so let's pass it to the
+ * handler in this frame.
+ */
+ ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
+ TICK_ALLOC_UPD_PAP(3,0);
+ SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
+
+ ap->n_args = 2;
+ ap->fun = cf->handler; /* :: Exception -> IO a */
+ ap->payload[0] = (P_)exception;
+ ap->payload[1] = ARG_TAG(0); /* realworld token */
+
+ /* throw away the stack from Sp up to and including the
+ * CATCH_FRAME.
+ */
+ sp = (P_)su + sizeofW(StgCatchFrame) - 1;
+ tso->su = cf->link;
+
+ /* Restore the blocked/unblocked state for asynchronous exceptions
+ * at the CATCH_FRAME.
+ *
+ * If exceptions were unblocked at the catch, arrange that they
+ * are unblocked again after executing the handler by pushing an
+ * unblockAsyncExceptions_ret stack frame.
+ */
+ if (!cf->exceptions_blocked) {
+ *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
+ }
+
+ /* Ensure that async exceptions are blocked when running the handler.
+ */
+ if (tso->blocked_exceptions == NULL) {
+ tso->blocked_exceptions = END_TSO_QUEUE;
+ }
+
+ /* Put the newly-built PAP on top of the stack, ready to execute
+ * when the thread restarts.
+ */
+ sp[0] = (W_)ap;
+ tso->sp = sp;
+ tso->whatNext = ThreadEnterGHC;
+ return;
+ }
+
+ /* First build an AP_UPD consisting of the stack chunk above the
+ * current update frame, with the top word on the stack as the
+ * fun field.
+ */
+ ap = (StgAP_UPD *)allocate(AP_sizeW(words));
+
+ ASSERT(words >= 0);
+
+ ap->n_args = words;
+ ap->fun = (StgClosure *)sp[0];
+ sp++;
+ for(i=0; i < (nat)words; ++i) {
+ ap->payload[i] = (P_)*sp++;
+ }
+
+ switch (get_itbl(su)->type) {
+
+ case UPDATE_FRAME:
+ {
+ SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */);
+ TICK_ALLOC_UP_THK(words+1,0);
+
+ IF_DEBUG(scheduler,
+ fprintf(stderr, "scheduler: Updating ");
+ printPtr((P_)su->updatee);
+ fprintf(stderr, " with ");
+ printObj((StgClosure *)ap);
+ );
+
+ /* Replace the updatee with an indirection - happily
+ * this will also wake up any threads currently
+ * waiting on the result.
+ */
+ UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */
+ su = su->link;
+ sp += sizeofW(StgUpdateFrame) -1;
+ sp[0] = (W_)ap; /* push onto stack */
+ break;
+ }
+
+ case CATCH_FRAME:
+ {
+ StgCatchFrame *cf = (StgCatchFrame *)su;
+ StgClosure* o;
+
+ /* We want a PAP, not an AP_UPD. Fortunately, the
+ * layout's the same.
+ */
+ SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
+ TICK_ALLOC_UPD_PAP(words+1,0);
+
+ /* now build o = FUN(catch,ap,handler) */
+ o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
+ TICK_ALLOC_FUN(2,0);
+ SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
+ o->payload[0] = (StgClosure *)ap;
+ o->payload[1] = cf->handler;
+
+ IF_DEBUG(scheduler,
+ fprintf(stderr, "scheduler: Built ");
+ printObj((StgClosure *)o);
+ );
+
+ /* pop the old handler and put o on the stack */
+ su = cf->link;
+ sp += sizeofW(StgCatchFrame) - 1;
+ sp[0] = (W_)o;
+ break;
+ }
+
+ case SEQ_FRAME:
+ {
+ StgSeqFrame *sf = (StgSeqFrame *)su;
+ StgClosure* o;
+
+ SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
+ TICK_ALLOC_UPD_PAP(words+1,0);
+
+ /* now build o = FUN(seq,ap) */
+ o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
+ TICK_ALLOC_SE_THK(1,0);
+ SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
+ payloadCPtr(o,0) = (StgClosure *)ap;
+
+ IF_DEBUG(scheduler,
+ fprintf(stderr, "scheduler: Built ");
+ printObj((StgClosure *)o);
+ );
+
+ /* pop the old handler and put o on the stack */
+ su = sf->link;
+ sp += sizeofW(StgSeqFrame) - 1;
+ sp[0] = (W_)o;
+ break;
+ }
+
+ case STOP_FRAME:
+ /* We've stripped the entire stack, the thread is now dead. */
+ sp += sizeofW(StgStopFrame) - 1;
+ sp[0] = (W_)exception; /* save the exception */
+ tso->whatNext = ThreadKilled;
+ tso->su = (StgUpdateFrame *)(sp+1);
+ tso->sp = sp;
+ return;
+
+ default:
+ barf("raiseAsync");
+ }
+ }
+ barf("raiseAsync");
+}
+
+//@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
+//@subsection Debugging Routines
+
+/* -----------------------------------------------------------------------------
+ Debugging: why is a thread blocked
+ -------------------------------------------------------------------------- */
+
+#ifdef DEBUG
+
+void printThreadBlockage(StgTSO *tso)
+{
+ switch (tso->why_blocked) {
+ case BlockedOnRead:
+ fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
+ break;
+ case BlockedOnWrite:
+ fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
+ break;
+ case BlockedOnDelay:
+ fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
+ break;
+ case BlockedOnMVar:
+ fprintf(stderr,"blocked on an MVar");
+ break;
+ case BlockedOnException:
+ fprintf(stderr,"blocked on delivering an exception to thread %d",
+ tso->block_info.tso->id);
+ break;
+ case BlockedOnBlackHole:
+ fprintf(stderr,"blocked on a black hole");
+ break;
+ case NotBlocked:
+ fprintf(stderr,"not blocked");
+ break;
+#if defined(PAR)
+ case BlockedOnGA:
+ fprintf(stderr,"blocked on global address");
+ break;
+#endif
+ }
+}
+
+/*
+ Print a whole blocking queue attached to node (debugging only).
+*/
+//@cindex print_bq
+# if defined(PAR)
+void
+print_bq (StgClosure *node)
+{
+ StgBlockingQueueElement *bqe;
+ StgTSO *tso;
+ rtsBool end;
+
+ fprintf(stderr,"## BQ of closure %p (%s): ",
+ node, info_type(node));
+
+ /* should cover all closures that may have a blocking queue */
+ ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
+ get_itbl(node)->type == FETCH_ME_BQ ||
+ get_itbl(node)->type == RBH);
+
+ ASSERT(node!=(StgClosure*)NULL); // sanity check
+ /*
+ NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
+ */
+ for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
+ !end; // iterate until bqe points to a CONSTR
+ end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
+ ASSERT(bqe != END_BQ_QUEUE); // sanity check
+ ASSERT(bqe != (StgTSO*)NULL); // sanity check
+ /* types of closures that may appear in a blocking queue */
+ ASSERT(get_itbl(bqe)->type == TSO ||
+ get_itbl(bqe)->type == BLOCKED_FETCH ||
+ get_itbl(bqe)->type == CONSTR);
+ /* only BQs of an RBH end with an RBH_Save closure */
+ ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
+
+ switch (get_itbl(bqe)->type) {
+ case TSO:
+ fprintf(stderr," TSO %d (%x),",
+ ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
+ break;
+ case BLOCKED_FETCH:
+ fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
+ ((StgBlockedFetch *)bqe)->node,
+ ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
+ ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
+ ((StgBlockedFetch *)bqe)->ga.weight);
+ break;
+ case CONSTR:
+ fprintf(stderr," %s (IP %p),",
+ (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
+ get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
+ get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
+ "RBH_Save_?"), get_itbl(bqe));
+ break;
+ default:
+ barf("Unexpected closure type %s in blocking queue of %p (%s)",
+ info_type(bqe), node, info_type(node));
+ break;
+ }
+ } /* for */
+ fputc('\n', stderr);
+}
+# elif defined(GRAN)
+void
+print_bq (StgClosure *node)
+{
+ StgBlockingQueueElement *bqe;
+ StgTSO *tso;
+ PEs node_loc, tso_loc;
+ rtsBool end;
+
+ /* should cover all closures that may have a blocking queue */
+ ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
+ get_itbl(node)->type == FETCH_ME_BQ ||
+ get_itbl(node)->type == RBH);
+
+ ASSERT(node!=(StgClosure*)NULL); // sanity check
+ node_loc = where_is(node);
+
+ fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
+ node, info_type(node), node_loc);
+
+ /*
+ NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
+ */
+ for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
+ !end; // iterate until bqe points to a CONSTR
+ end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
+ ASSERT(bqe != END_BQ_QUEUE); // sanity check
+ ASSERT(bqe != (StgTSO*)NULL); // sanity check
+ /* types of closures that may appear in a blocking queue */
+ ASSERT(get_itbl(bqe)->type == TSO ||
+ get_itbl(bqe)->type == CONSTR);
+ /* only BQs of an RBH end with an RBH_Save closure */
+ ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
+
+ tso_loc = where_is((StgClosure *)bqe);
+ switch (get_itbl(bqe)->type) {
+ case TSO:
+ fprintf(stderr," TSO %d (%x) on [PE %d],",
+ ((StgTSO *)bqe)->id, ((StgTSO *)bqe), tso_loc);
+ break;
+ case CONSTR:
+ fprintf(stderr," %s (IP %p),",
+ (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
+ get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
+ get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
+ "RBH_Save_?"), get_itbl(bqe));
+ break;
+ default:
+ barf("Unexpected closure type %s in blocking queue of %p (%s)",
+ info_type(bqe), node, info_type(node));
+ break;
+ }
+ } /* for */
+ fputc('\n', stderr);
+}
+#else
+/*
+ Nice and easy: only TSOs on the blocking queue
+*/
+void
+print_bq (StgClosure *node)
+{
+ StgTSO *tso;
+
+ ASSERT(node!=(StgClosure*)NULL); // sanity check
+ for (tso = ((StgBlockingQueue*)node)->blocking_queue;
+ tso != END_TSO_QUEUE;
+ tso=tso->link) {
+ ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
+ ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
+ fprintf(stderr," TSO %d (%p),", tso->id, tso);
+ }
+ fputc('\n', stderr);
+}
+# endif
+
+#if defined(PAR)
+static nat
+run_queue_len(void)
+{
+ nat i;
+ StgTSO *tso;
+
+ for (i=0, tso=run_queue_hd;
+ tso != END_TSO_QUEUE;
+ i++, tso=tso->link)
+ /* nothing */
+
+ return i;
+}
+#endif
+
+static void
+sched_belch(char *s, ...)
+{
+ va_list ap;
+ va_start(ap,s);
+#ifdef SMP
+ fprintf(stderr, "scheduler (task %ld): ", pthread_self());
+#else
+ fprintf(stderr, "scheduler: ");
+#endif
+ vfprintf(stderr, s, ap);
+ fprintf(stderr, "\n");
+}
+
+#endif /* DEBUG */
+
+//@node Index, , Debugging Routines, Main scheduling code
+//@subsection Index
+
+//@index
+//* MainRegTable:: @cindex\s-+MainRegTable
+//* StgMainThread:: @cindex\s-+StgMainThread
+//* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
+//* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
+//* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
+//* context_switch:: @cindex\s-+context_switch
+//* createThread:: @cindex\s-+createThread
+//* free_capabilities:: @cindex\s-+free_capabilities
+//* gc_pending_cond:: @cindex\s-+gc_pending_cond
+//* initScheduler:: @cindex\s-+initScheduler
+//* interrupted:: @cindex\s-+interrupted
+//* n_free_capabilities:: @cindex\s-+n_free_capabilities
+//* next_thread_id:: @cindex\s-+next_thread_id
+//* print_bq:: @cindex\s-+print_bq
+//* run_queue_hd:: @cindex\s-+run_queue_hd
+//* run_queue_tl:: @cindex\s-+run_queue_tl
+//* sched_mutex:: @cindex\s-+sched_mutex
+//* schedule:: @cindex\s-+schedule
+//* take_off_run_queue:: @cindex\s-+take_off_run_queue
+//* task_ids:: @cindex\s-+task_ids
+//* term_mutex:: @cindex\s-+term_mutex
+//* thread_ready_cond:: @cindex\s-+thread_ready_cond
+//@end index