// We shouldn't be here...
barf("schedule: awaitEvent() in threaded RTS");
#else
- awaitEvent( EMPTY_RUN_QUEUE() );
+ awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
#endif
+ }
}
/*
ngoq Dogh!
ASSERT(procStatus[CurrentProc]==Busy ||
- ((procStatus[Curren
+ ((procStatus[CurrentProc]==Fetching) &&
+ (t->block_info.closure!=(StgClosure*)NULL)));
+ if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
+ !(!RtsFlags.GranFlags.DoAsyncFetch &&
+ procStatus[CurrentProc]==Fetching))
+ procStatus[CurrentProc] = Idle;
+ */
+#elif defined(PAR)
+ IF_DEBUG(scheduler,
+ debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
+ t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
+ IF_PAR_DEBUG(bq,
+
+ if (t->block_info.closure!=(StgClosure*)NULL)
+ print_bq(t->block_info.closure));
+
+ /* Send a fetch (if BlockedOnGA) and dump event to log file */
+ blockThread(t);
+
+ /* whatever we schedule next, we must log that schedule */
+ emitSchedule = rtsTrue;
+
+#else /* !GRAN */
+
+ // We don't need to do anything. The thread is blocked, and it
+ // has tidied up its stack and placed itself on whatever queue
+ // it needs to be on.
+
+#if !defined(SMP)
+ ASSERT(t->why_blocked != NotBlocked);
+ // This might not be true under SMP: we don't have
+ // exclusive access to this TSO, so someone might have
+ // woken it up by now. This actually happens: try
+ // conc023 +RTS -N2.
+#endif
+
+ IF_DEBUG(scheduler,
+ debugBelch("--<< thread %d (%s) stopped: ",
+ t->id, whatNext_strs[t->what_next]);
+ printThreadBlockage(t);
+ debugBelch("\n"));
+
+ /* Only for dumping event to log file
+ ToDo: do I need this in GranSim, too?
+ blockThread(t);
+ */
+#endif
+}
+
+/* -----------------------------------------------------------------------------
+ * Handle a thread that returned to the scheduler with ThreadFinished
+ * ASSUMES: sched_mutex
+ * -------------------------------------------------------------------------- */
+
+static rtsBool
+scheduleHandleThreadFinished( StgMainThread *mainThread
+ USED_WHEN_RTS_SUPPORTS_THREADS,
+ Capability *cap,
+ StgTSO *t )
+{
+ /* Need to check whether this was a main thread, and if so,
+ * return with the return value.
+ *
+ * We also end up here if the thread kills itself with an
+ * uncaught exception, see Exception.cmm.
+ */
+ IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n",
+ t->id, whatNext_strs[t->what_next]));
+
+#if defined(GRAN)
+ endThread(t, CurrentProc); // clean-up the thread
+#elif defined(PARALLEL_HASKELL)
+ /* For now all are advisory -- HWL */
+ //if(t->priority==AdvisoryPriority) ??
+ advisory_thread_count--; // JB: Caution with this counter, buggy!
+
+# if defined(DIST)
+ if(t->dist.priority==RevalPriority)
+ FinishReval(t);
+# endif
+
+# if defined(EDENOLD)
+ // the thread could still have an outport... (BUG)
+ if (t->eden.outport != -1) {
+ // delete the outport for the tso which has finished...
+ IF_PAR_DEBUG(eden_ports,
+ debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
+ t->eden.outport, t->id));
+ deleteOPT(t);
+ }
+ // thread still in the process (HEAVY BUG! since outport has just been closed...)
+ if (t->eden.epid != -1) {
+ IF_PAR_DEBUG(eden_ports,
+ debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
+ t->id, t->eden.epid));
+ removeTSOfromProcess(t);
+ }
+# endif
+
+# if defined(PAR)
+ if (RtsFlags.ParFlags.ParStats.Full &&
+ !RtsFlags.ParFlags.ParStats.Suppressed)
+ DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
+
+ // t->par only contains statistics: left out for now...
+ IF_PAR_DEBUG(fish,
+ debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
+ t->id,t,t->par.sparkname));
+# endif
+#endif // PARALLEL_HASKELL
+
+ //
+ // Check whether the thread that just completed was a main
+ // thread, and if so return with the result.
+ //
+ // There is an assumption here that all thread completion goes
+ // through this point; we need to make sure that if a thread
+ // ends up in the ThreadKilled state, that it stays on the run
+ // queue so it can be dealt with here.
+ //
+ if (
+#if defined(RTS_SUPPORTS_THREADS)
+ mainThread != NULL
+#else
+ mainThread->tso == t
+#endif
+ )
+ {
+ // We are a bound thread: this must be our thread that just
+ // completed.
+ ASSERT(mainThread->tso == t);
+
+ if (t->what_next == ThreadComplete) {
+ if (mainThread->ret) {
+ // NOTE: return val is tso->sp[1] (see StgStartup.hc)
+ *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1];
+ }
+ mainThread->stat = Success;
+ } else {
+ if (mainThread->ret) {
+ *(mainThread->ret) = NULL;
+ }
+ if (interrupted) {
+ mainThread->stat = Interrupted;
+ } else {
+ mainThread->stat = Killed;
+ }
+ }
+#ifdef DEBUG
+ removeThreadLabel((StgWord)mainThread->tso->id);
+#endif
+ if (mainThread->prev == NULL) {
+ ASSERT(mainThread == main_threads);
+ main_threads = mainThread->link;
+ } else {
+ mainThread->prev->link = mainThread->link;
+ }
+ if (mainThread->link != NULL) {
+ mainThread->link->prev = mainThread->prev;
+ }
+ releaseCapability(cap);
+ return rtsTrue; // tells schedule() to return
+ }
+
+#ifdef RTS_SUPPORTS_THREADS
+ ASSERT(t->main == NULL);
+#else
+ if (t->main != NULL) {
+ // Must be a main thread that is not the topmost one. Leave
+ // it on the run queue until the stack has unwound to the
+ // point where we can deal with this. Leaving it on the run
+ // queue also ensures that the garbage collector knows about
+ // this thread and its return value (it gets dropped from the
+ // all_threads list so there's no other way to find it).
+ APPEND_TO_RUN_QUEUE(t);
+ }
+#endif
+ return rtsFalse;
+}
+
+/* -----------------------------------------------------------------------------
+ * Perform a heap census, if PROFILING
+ * -------------------------------------------------------------------------- */
+
+static rtsBool
+scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
+{
+#if defined(PROFILING)
+ // When we have +RTS -i0 and we're heap profiling, do a census at
+ // every GC. This lets us get repeatable runs for debugging.
+ if (performHeapProfile ||
+ (RtsFlags.ProfFlags.profileInterval==0 &&
+ RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
+ GarbageCollect(GetRoots, rtsTrue);
+ heapCensus();
+ performHeapProfile = rtsFalse;
+ return rtsTrue; // true <=> we already GC'd
+ }
+#endif
+ return rtsFalse;
+}
+
+/* -----------------------------------------------------------------------------
+ * Perform a garbage collection if necessary
+ * ASSUMES: sched_mutex
+ * -------------------------------------------------------------------------- */
+
+static void
+scheduleDoGC( rtsBool force_major )
+{
+ StgTSO *t;
+#ifdef SMP
+ Capability *cap;
+ static rtsBool waiting_for_gc;
+ int n_capabilities = RtsFlags.ParFlags.nNodes - 1;
+ // subtract one because we're already holding one.
+ Capability *caps[n_capabilities];
+#endif
+
+#ifdef SMP
+ // In order to GC, there must be no threads running Haskell code.
+ // Therefore, the GC thread needs to hold *all* the capabilities,
+ // and release them after the GC has completed.
+ //
+ // This seems to be the simplest way: previous attempts involved
+ // making all the threads with capabilities give up their
+ // capabilities and sleep except for the *last* one, which
+ // actually did the GC. But it's quite hard to arrange for all
+ // the other tasks to sleep and stay asleep.
+ //
+
+ // Someone else is already trying to GC
+ if (waiting_for_gc) return;
+ waiting_for_gc = rtsTrue;
+
+ while (n_capabilities > 0) {
+ IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
+ waitForReturnCapability(&sched_mutex, &cap);
+ n_capabilities--;
+ caps[n_capabilities] = cap;
+ }
+
+ waiting_for_gc = rtsFalse;
+#endif
+
+ /* Kick any transactions which are invalid back to their
+ * atomically frames. When next scheduled they will try to
+ * commit, this commit will fail and they will retry.
+ */
+ for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
+ if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+ if (!stmValidateTransaction (t -> trec)) {
+ IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
+
+ // strip the stack back to the ATOMICALLY_FRAME, aborting
+ // the (nested) transaction, and saving the stack of any
+ // partially-evaluated thunks on the heap.
+ raiseAsync_(t, NULL, rtsTrue);
+
+#ifdef REG_R1
+ ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+#endif
+ }
+ }
+ }
+
+ // so this happens periodically:
+ scheduleCheckBlackHoles();
+
+ IF_DEBUG(scheduler, printAllThreads());
+
+ /* everybody back, start the GC.
+ * Could do it in this thread, or signal a condition var
+ * to do it in another thread. Either way, we need to
+ * broadcast on gc_pending_cond afterward.
+ */
+#if defined(RTS_SUPPORTS_THREADS)
+ IF_DEBUG(scheduler,sched_belch("doing GC"));
+#endif
+ GarbageCollect(GetRoots, force_major);
+
+#if defined(SMP)
+ {
+ // release our stash of capabilities.
+ nat i;
+ for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) {
+ releaseCapability(caps[i]);
+ }
+ }
+#endif
+
+#if defined(GRAN)
+ /* add a ContinueThread event to continue execution of current thread */
+ new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
+ ContinueThread,
+ t, (StgClosure*)NULL, (rtsSpark*)NULL);
+ IF_GRAN_DEBUG(bq,
+ debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
+ G_EVENTQ(0);
+ G_CURR_THREADQ(0));
+#endif /* GRAN */
+}
+
+/* ---------------------------------------------------------------------------
+ * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
+ * used by Control.Concurrent for error checking.
+ * ------------------------------------------------------------------------- */
+
+StgBool
+rtsSupportsBoundThreads(void)
+{
+#if defined(RTS_SUPPORTS_THREADS)
+ return rtsTrue;
+#else
+ return rtsFalse;
+#endif
+}
+
+/* ---------------------------------------------------------------------------
+ * isThreadBound(tso): check whether tso is bound to an OS thread.
+ * ------------------------------------------------------------------------- */
+
+StgBool
+isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
+{
+#if defined(RTS_SUPPORTS_THREADS)
+ return (tso->main != NULL);
+#endif
+ return rtsFalse;
+}
+
+/* ---------------------------------------------------------------------------
+ * Singleton fork(). Do not copy any running threads.
+ * ------------------------------------------------------------------------- */
+
+#ifndef mingw32_HOST_OS
+#define FORKPROCESS_PRIMOP_SUPPORTED
+#endif
+
+#ifdef FORKPROCESS_PRIMOP_SUPPORTED
+static void
+deleteThreadImmediately(StgTSO *tso);
+#endif
+StgInt
+forkProcess(HsStablePtr *entry
+#ifndef FORKPROCESS_PRIMOP_SUPPORTED
+ STG_UNUSED
+#endif
+ )
+{
+#ifdef FORKPROCESS_PRIMOP_SUPPORTED
+ pid_t pid;
+ StgTSO* t,*next;
+ StgMainThread *m;
+ SchedulerStatus rc;
+
+ IF_DEBUG(scheduler,sched_belch("forking!"));
+ rts_lock(); // This not only acquires sched_mutex, it also
+ // makes sure that no other threads are running
+
+ pid = fork();
+
+ if (pid) { /* parent */
+
+ /* just return the pid */
+ rts_unlock();
+ return pid;
+
+ } else { /* child */
+
+
+ // delete all threads
+ run_queue_hd = run_queue_tl = END_TSO_QUEUE;
+
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ next = t->link;
+
+ // don't allow threads to catch the ThreadKilled exception
+ deleteThreadImmediately(t);
+ }
+
+ // wipe the main thread list
+ while((m = main_threads) != NULL) {
+ main_threads = m->link;
+# ifdef THREADED_RTS
+ closeCondition(&m->bound_thread_cond);
+# endif
+ stgFree(m);
+ }
+
+ rc = rts_evalStableIO(entry, NULL); // run the action
+ rts_checkSchedStatus("forkProcess",rc);
+
+ rts_unlock();
+
+ hs_exit(); // clean up and exit
+ stg_exit(0);
+ }
+#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
+ barf("forkProcess#: primop not supported, sorry!\n");
+ return -1;
+#endif
+}
+
+/* ---------------------------------------------------------------------------
+ * deleteAllThreads(): kill all the live threads.
+ *
+ * This is used when we catch a user interrupt (^C), before performing
+ * any necessary cleanups and running finalizers.
+ *
+ * Locks: sched_mutex held.
+ * ------------------------------------------------------------------------- */
+
+void
+deleteAllThreads ( void )
+{
+ StgTSO* t, *next;
+ IF_DEBUG(scheduler,sched_belch("deleting all threads"));
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ deleteThread(t);
+ }
+ }
+
+ // The run queue now contains a bunch of ThreadKilled threads. We
+ // must not throw these away: the main thread(s) will be in there
+ // somewhere, and the main scheduler loop has to deal with it.
+ // Also, the run queue is the only thing keeping these threads from
+ // being GC'd, and we don't want the "main thread has been GC'd" panic.
+
+ ASSERT(blocked_queue_hd == END_TSO_QUEUE);
+ ASSERT(blackhole_queue == END_TSO_QUEUE);
+ ASSERT(sleeping_queue == END_TSO_QUEUE);
+}
+
+/* startThread and insertThread are now in GranSim.c -- HWL */
+
+
+/* ---------------------------------------------------------------------------
+ * Suspending & resuming Haskell threads.
+ *
+ * When making a "safe" call to C (aka _ccall_GC), the task gives back
+ * its capability before calling the C function. This allows another
+ * task to pick up the capability and carry on running Haskell
+ * threads. It also means that if the C call blocks, it won't lock
+ * the whole system.
+ *
+ * The Haskell thread making the C call is put to sleep for the
+ * duration of the call, on the susepended_ccalling_threads queue. We
+ * give out a token to the task, which it can use to resume the thread
+ * on return from the C function.
+ * ------------------------------------------------------------------------- */
+
+StgInt
+suspendThread( StgRegTable *reg )
+{
+ nat tok;
+ Capability *cap;
+ int saved_errno = errno;
+
+ /* assume that *reg is a pointer to the StgRegTable part
+ * of a Capability.
+ */
+ cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
+
+ ACQUIRE_LOCK(&sched_mutex);
+
+ IF_DEBUG(scheduler,
+ sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
+
+ // XXX this might not be necessary --SDM
+ cap->r.rCurrentTSO->what_next = ThreadRunGHC;
+
+ threadPaused(cap->r.rCurrentTSO);
+ cap->r.rCurrentTSO->link = suspended_ccalling_threads;
+ suspended_ccalling_threads = cap->r.rCurrentTSO;
+
+ if(cap->r.rCurrentTSO->blocked_exceptions == NULL) {
+ cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
+ cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
+ } else {
+ cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
+ }
+
+ /* Use the thread ID as the token; it should be unique */
+ tok = cap->r.rCurrentTSO->id;
+
+ /* Hand back capability */
+ cap->r.rInHaskell = rtsFalse;
+ releaseCapability(cap);
+
+#if defined(RTS_SUPPORTS_THREADS)
+ /* Preparing to leave the RTS, so ensure there's a native thread/task
+ waiting to take over.
+ */
+ IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
+#endif
+
+ RELEASE_LOCK(&sched_mutex);
+
+ errno = saved_errno;
+ return tok;
+}
+
+StgRegTable *
+resumeThread( StgInt tok )
+{
+ StgTSO *tso, **prev;
+ Capability *cap;
+ int saved_errno = errno;
+
+#if defined(RTS_SUPPORTS_THREADS)
+ /* Wait for permission to re-enter the RTS with the result. */
+ ACQUIRE_LOCK(&sched_mutex);
+ waitForReturnCapability(&sched_mutex, &cap);
+
+ IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
+#else
+ grabCapability(&cap);
+#endif
+
+ /* Remove the thread off of the suspended list */
+ prev = &suspended_ccalling_threads;
+ for (tso = suspended_ccalling_threads;
+ tso != END_TSO_QUEUE;
+ prev = &tso->link, tso = tso->link) {
+ if (tso->id == (StgThreadID)tok) {
+ *prev = tso->link;
+ break;
+ }
+ }
+ if (tso == END_TSO_QUEUE) {
+ barf("resumeThread: thread not found");
+ }
+ tso->link = END_TSO_QUEUE;
+
+ if(tso->why_blocked == BlockedOnCCall) {
+ awakenBlockedQueueNoLock(tso->blocked_exceptions);
+ tso->blocked_exceptions = NULL;
+ }
+
+ /* Reset blocking status */
+ tso->why_blocked = NotBlocked;
+
+ cap->r.rCurrentTSO = tso;
+ cap->r.rInHaskell = rtsTrue;
+ RELEASE_LOCK(&sched_mutex);
+ errno = saved_errno;
+ return &cap->r;
+}
+
+/* ---------------------------------------------------------------------------
+ * Comparing Thread ids.
+ *
+ * This is used from STG land in the implementation of the
+ * instances of Eq/Ord for ThreadIds.
+ * ------------------------------------------------------------------------ */
+
+int
+cmp_thread(StgPtr tso1, StgPtr tso2)
+{
+ StgThreadID id1 = ((StgTSO *)tso1)->id;
+ StgThreadID id2 = ((StgTSO *)tso2)->id;
+
+ if (id1 < id2) return (-1);
+ if (id1 > id2) return 1;
+ return 0;
+}
+
+/* ---------------------------------------------------------------------------
+ * Fetching the ThreadID from an StgTSO.
+ *
+ * This is used in the implementation of Show for ThreadIds.
+ * ------------------------------------------------------------------------ */
+int
+rts_getThreadId(StgPtr tso)
+{
+ return ((StgTSO *)tso)->id;
+}
+
+#ifdef DEBUG
+void
+labelThread(StgPtr tso, char *label)
+{
+ int len;
+ void *buf;
+
+ /* Caveat: Once set, you can only set the thread name to "" */
+ len = strlen(label)+1;
+ buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
+ strncpy(buf,label,len);
+ /* Update will free the old memory for us */
+ updateThreadLabel(((StgTSO *)tso)->id,buf);
+}
+#endif /* DEBUG */
+
+/* ---------------------------------------------------------------------------
+ Create a new thread.
+
+ The new thread starts with the given stack size. Before the
+ scheduler can run, however, this thread needs to have a closure
+ (and possibly some arguments) pushed on its stack. See
+ pushClosure() in Schedule.h.
+
+ createGenThread() and createIOThread() (in SchedAPI.h) are
+ convenient packaged versions of this function.
+
+ currently pri (priority) is only used in a GRAN setup -- HWL
+ ------------------------------------------------------------------------ */
+#if defined(GRAN)
+/* currently pri (priority) is only used in a GRAN setup -- HWL */
+StgTSO *
+createThread(nat size, StgInt pri)
+#else
+StgTSO *
+createThread(nat size)
+#endif
+{
+
+ StgTSO *tso;
+ nat stack_size;
+
+ /* First check whether we should create a thread at all */
+#if defined(PARALLEL_HASKELL)
+ /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
+ if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
+ threadsIgnored++;
+ debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
+ RtsFlags.ParFlags.maxThreads, advisory_thread_count);
+ return END_TSO_QUEUE;
+ }
+ threadsCreated++;
+#endif
+
+#if defined(GRAN)
+ ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
+#endif
+
+ // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
+
+ /* catch ridiculously small stack sizes */
+ if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
+ size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
+ }
+
+ stack_size = size - TSO_STRUCT_SIZEW;
+
+ tso = (StgTSO *)allocate(size);
+ TICK_ALLOC_TSO(stack_size, 0);
+
+ SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
+#if defined(GRAN)
+ SET_GRAN_HDR(tso, ThisPE);
+#endif
+
+ // Always start with the compiled code evaluator
+ tso->what_next = ThreadRunGHC;
+
+ tso->id = next_thread_id++;
+ tso->why_blocked = NotBlocked;
+ tso->blocked_exceptions = NULL;
+
+ tso->saved_errno = 0;
+ tso->main = NULL;
+
+ tso->stack_size = stack_size;
+ tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
+ - TSO_STRUCT_SIZEW;
+ tso->sp = (P_)&(tso->stack) + stack_size;
+
+ tso->trec = NO_TREC;
+
+#ifdef PROFILING
+ tso->prof.CCCS = CCS_MAIN;
+#endif
+
+ /* put a stop frame on the stack */
+ tso->sp -= sizeofW(StgStopFrame);
+ SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
+ tso->link = END_TSO_QUEUE;
+
+ // ToDo: check this
+#if defined(GRAN)
+ /* uses more flexible routine in GranSim */
+ insertThread(tso, CurrentProc);
+#else
+ /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
* from its creation
*/
#endif
{
switch (tso->what_next) {
case ThreadKilled:
- debugBelch("has
+ debugBelch("has been killed");
+ break;
+ case ThreadComplete:
+ debugBelch("has completed");
+ break;
+ default:
+ printThreadBlockage(tso);
+ }
+}
+
+void
+printAllThreads(void)
+{
+ StgTSO *t;
+
+# if defined(GRAN)
+ char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
+ ullong_format_string(TIME_ON_PROC(CurrentProc),
+ time_string, rtsFalse/*no commas!*/);
+
+ debugBelch("all threads at [%s]:\n", time_string);
+# elif defined(PARALLEL_HASKELL)
+ char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
+ ullong_format_string(CURRENT_TIME,
+ time_string, rtsFalse/*no commas!*/);
+
+ debugBelch("all threads at [%s]:\n", time_string);
+# else
+ debugBelch("all threads:\n");
+# endif
+
+ for (t = all_threads; t != END_TSO_QUEUE; ) {
+ debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
+#if defined(DEBUG)
+ {
+ void *label = lookupThreadLabel(t->id);
+ if (label) debugBelch("[\"%s\"] ",(char *)label);
+ }
+#endif
+ if (t->what_next == ThreadRelocated) {
+ debugBelch("has been relocated...\n");
+ t = t->link;
+ } else {
+ printThreadStatus(t);
+ debugBelch("\n");
+ t = t->global_link;
+ }
+ }
+}
+
+#ifdef DEBUG
+
+// useful from gdb
+void
+printThreadQueue(StgTSO *t)
+{
+ nat i = 0;
+ for (; t != END_TSO_QUEUE; t = t->link) {
+ debugBelch("\tthread %d @ %p ", t->id, (void *)t);
+ if (t->what_next == ThreadRelocated) {
+ debugBelch("has been relocated...\n");
+ } else {
+ printThreadStatus(t);
+ debugBelch("\n");
+ }
+ i++;
+ }
+ debugBelch("%d threads on queue\n", i);
+}
+
+/*
+ Print a whole blocking queue attached to node (debugging only).
+*/
+# if defined(PARALLEL_HASKELL)
+void
+print_bq (StgClosure *node)
+{
+ StgBlockingQueueElement *bqe;
+ StgTSO *tso;
+ rtsBool end;
+
+ debugBelch("## BQ of closure %p (%s): ",
+ node, info_type(node));
+
+ /* should cover all closures that may have a blocking queue */
+ ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
+ get_itbl(node)->type == FETCH_ME_BQ ||
+ get_itbl(node)->type == RBH ||
+ get_itbl(node)->type == MVAR);
+
+ ASSERT(node!=(StgClosure*)NULL); // sanity check
+
+ print_bqe(((StgBlockingQueue*)node)->blocking_queue);
+}
+
+/*
+ Print a whole blocking queue starting with the element bqe.
+*/
+void
+print_bqe (StgBlockingQueueElement *bqe)
+{
+ rtsBool end;
+
+ /*
+ NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
+ */
+ for (end = (bqe==END_BQ_QUEUE);
+ !end; // iterate until bqe points to a CONSTR
+ end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
+ bqe = end ? END_BQ_QUEUE : bqe->link) {
+ ASSERT(bqe != END_BQ_QUEUE); // sanity check
+ ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
+ /* types of closures that may appear in a blocking queue */
+ ASSERT(get_itbl(bqe)->type == TSO ||
+ get_itbl(bqe)->type == BLOCKED_FETCH ||
+ get_itbl(bqe)->type == CONSTR);
+ /* only BQs of an RBH end with an RBH_Save closure */
+ //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
+
+ switch (get_itbl(bqe)->type) {
+ case TSO:
+ debugBelch(" TSO %u (%x),",
+ ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
+ break;
+ case BLOCKED_FETCH:
+ debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
+ ((StgBlockedFetch *)bqe)->node,
+ ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
+ ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
+ ((StgBlockedFetch *)bqe)->ga.weight);
+ break;
+ case CONSTR:
+ debugBelch(" %s (IP %p),",
+ (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
+ get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
+ get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
+ "RBH_Save_?"), get_itbl(bqe));
+ break;
+ default:
+ barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
+ info_type((StgClosure *)bqe)); // , node, info_type(node));
+ break;
+ }
+ } /* for */
+ debugBelch("\n");
+}
+# elif defined(GRAN)
+void
+print_bq (StgClosure *node)
+{
+ StgBlockingQueueElement *bqe;
+ PEs node_loc, tso_loc;
+ rtsBool end;
/* should cover all closures that may have a blocking queue */
ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||