From 6270f2b9ba3b65510df40a1cfef688098c0b234b Mon Sep 17 00:00:00 2001
From: simonmar
Date: Wed, 25 May 2005 08:33:15 +0000
Subject: [PATCH] [project @ 2005-05-25 08:33:15 by simonmar]

something very strange happened with previous commit; try again
---
 ghc/rts/Schedule.c | 848 +++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 845 insertions(+), 3 deletions(-)

diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c
index dcd32da..e646679 100644
--- a/ghc/rts/Schedule.c
+++ b/ghc/rts/Schedule.c
@@ -841,8 +841,9 @@ scheduleCheckBlockedThreads(void)
     // We shouldn't be here...
     barf("schedule: awaitEvent() in threaded RTS");
 #else
-    awaitEvent( EMPTY_RUN_QUEUE() );
+    awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
 #endif
+    }
 }
@@ -1675,7 +1676,696 @@ scheduleHandleThreadBlocked( StgTSO *t
 /* ngoq Dogh!
       ASSERT(procStatus[CurrentProc]==Busy ||
-	      ((procStatus[Curren
+	      ((procStatus[CurrentProc]==Fetching) &&
+	       (t->block_info.closure!=(StgClosure*)NULL)));
+      if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
+	  !(!RtsFlags.GranFlags.DoAsyncFetch &&
+	    procStatus[CurrentProc]==Fetching))
+	  procStatus[CurrentProc] = Idle;
+    */
+#elif defined(PAR)
+    IF_DEBUG(scheduler,
+	     debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
+			t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
+    IF_PAR_DEBUG(bq,
+
+		 if (t->block_info.closure!=(StgClosure*)NULL)
+		 print_bq(t->block_info.closure));
+
+    /* Send a fetch (if BlockedOnGA) and dump event to log file */
+    blockThread(t);
+
+    /* whatever we schedule next, we must log that schedule */
+    emitSchedule = rtsTrue;
+
+#else /* !GRAN */
+
+      // We don't need to do anything.  The thread is blocked, and it
+      // has tidied up its stack and placed itself on whatever queue
+      // it needs to be on.
+
+#if !defined(SMP)
+    ASSERT(t->why_blocked != NotBlocked);
+	     // This might not be true under SMP: we don't have
+	     // exclusive access to this TSO, so someone might have
+	     // woken it up by now.  This actually happens: try
+	     // conc023 +RTS -N2.
+#endif
+
+    IF_DEBUG(scheduler,
+	     debugBelch("--<< thread %d (%s) stopped: ",
+			t->id, whatNext_strs[t->what_next]);
+	     printThreadBlockage(t);
+	     debugBelch("\n"));
+
+    /* Only for dumping event to log file
+       ToDo: do I need this in GranSim, too?
+       blockThread(t);
+    */
+#endif
+}
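+
+/* (Descriptive note, for orientation: a thread that runs, say, takeMVar
+ *  on an empty MVar parks itself on the MVar's blocking queue, sets
+ *  why_blocked = BlockedOnMVar, and returns to the scheduler with
+ *  ThreadBlocked -- which is how it reaches the function above.)
+ */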
+
+/* -----------------------------------------------------------------------------
+ * Handle a thread that returned to the scheduler with ThreadFinished
+ * ASSUMES: sched_mutex
+ * -------------------------------------------------------------------------- */
+
+static rtsBool
+scheduleHandleThreadFinished( StgMainThread *mainThread
+			      USED_WHEN_RTS_SUPPORTS_THREADS,
+			      Capability *cap,
+			      StgTSO *t )
+{
+    /* Need to check whether this was a main thread, and if so,
+     * return with the return value.
+     *
+     * We also end up here if the thread kills itself with an
+     * uncaught exception, see Exception.cmm.
+     */
+    IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n",
+				  t->id, whatNext_strs[t->what_next]));
+
+#if defined(GRAN)
+    endThread(t, CurrentProc); // clean-up the thread
+#elif defined(PARALLEL_HASKELL)
+    /* For now all are advisory -- HWL */
+    //if(t->priority==AdvisoryPriority) ??
+    advisory_thread_count--; // JB: Caution with this counter, buggy!
+
+# if defined(DIST)
+    if(t->dist.priority==RevalPriority)
+	FinishReval(t);
+# endif
+
+# if defined(EDENOLD)
+    // the thread could still have an outport... (BUG)
+    if (t->eden.outport != -1) {
+	// delete the outport for the tso which has finished...
+	IF_PAR_DEBUG(eden_ports,
+		     debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
+				t->eden.outport, t->id));
+	deleteOPT(t);
+    }
+    // thread still in the process (HEAVY BUG! since outport has just been closed...)
+    if (t->eden.epid != -1) {
+	IF_PAR_DEBUG(eden_ports,
+		     debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
+				t->id, t->eden.epid));
+	removeTSOfromProcess(t);
+    }
+# endif
+
+# if defined(PAR)
+    if (RtsFlags.ParFlags.ParStats.Full &&
+	!RtsFlags.ParFlags.ParStats.Suppressed)
+	DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
+
+    // t->par only contains statistics: left out for now...
+    IF_PAR_DEBUG(fish,
+		 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
+			    t->id,t,t->par.sparkname));
+# endif
+#endif // PARALLEL_HASKELL
+
+    //
+    // Check whether the thread that just completed was a main
+    // thread, and if so return with the result.
+    //
+    // There is an assumption here that all thread completion goes
+    // through this point; we need to make sure that if a thread
+    // ends up in the ThreadKilled state, that it stays on the run
+    // queue so it can be dealt with here.
+    //
+    if (
+#if defined(RTS_SUPPORTS_THREADS)
+	mainThread != NULL
+#else
+	mainThread->tso == t
+#endif
+	)
+    {
+	// We are a bound thread: this must be our thread that just
+	// completed.
+	ASSERT(mainThread->tso == t);
+
+	if (t->what_next == ThreadComplete) {
+	    if (mainThread->ret) {
+		// NOTE: return val is tso->sp[1] (see StgStartup.hc)
+		*(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1];
+	    }
+	    mainThread->stat = Success;
+	} else {
+	    if (mainThread->ret) {
+		*(mainThread->ret) = NULL;
+	    }
+	    if (interrupted) {
+		mainThread->stat = Interrupted;
+	    } else {
+		mainThread->stat = Killed;
+	    }
+	}
+#ifdef DEBUG
+	removeThreadLabel((StgWord)mainThread->tso->id);
+#endif
+	if (mainThread->prev == NULL) {
+	    ASSERT(mainThread == main_threads);
+	    main_threads = mainThread->link;
+	} else {
+	    mainThread->prev->link = mainThread->link;
+	}
+	if (mainThread->link != NULL) {
+	    mainThread->link->prev = mainThread->prev;
+	}
+	releaseCapability(cap);
+	return rtsTrue; // tells schedule() to return
+    }
+
+#ifdef RTS_SUPPORTS_THREADS
+    ASSERT(t->main == NULL);
+#else
+    if (t->main != NULL) {
+	// Must be a main thread that is not the topmost one.  Leave
+	// it on the run queue until the stack has unwound to the
+	// point where we can deal with this.  Leaving it on the run
+	// queue also ensures that the garbage collector knows about
+	// this thread and its return value (it gets dropped from the
+	// all_threads list so there's no other way to find it).
+	APPEND_TO_RUN_QUEUE(t);
+    }
+#endif
+    return rtsFalse;
+}
+
+/* -----------------------------------------------------------------------------
+ * Perform a heap census, if PROFILING
+ * -------------------------------------------------------------------------- */
+
+static rtsBool
+scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
+{
+#if defined(PROFILING)
+    // When we have +RTS -i0 and we're heap profiling, do a census at
+    // every GC.  This lets us get repeatable runs for debugging.
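+    // (For example -- an illustrative invocation, not from this patch --
+    // a profiled build run as
+    //     prog +RTS -hc -i0 -RTS
+    // should produce one census per GC rather than one per interval tick.)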
+    if (performHeapProfile ||
+	(RtsFlags.ProfFlags.profileInterval==0 &&
+	 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
+	GarbageCollect(GetRoots, rtsTrue);
+	heapCensus();
+	performHeapProfile = rtsFalse;
+	return rtsTrue;  // true <=> we already GC'd
+    }
+#endif
+    return rtsFalse;
+}
+
+/* -----------------------------------------------------------------------------
+ * Perform a garbage collection if necessary
+ * ASSUMES: sched_mutex
+ * -------------------------------------------------------------------------- */
+
+static void
+scheduleDoGC( rtsBool force_major )
+{
+    StgTSO *t;
+#ifdef SMP
+    Capability *cap;
+    static rtsBool waiting_for_gc;
+    int n_capabilities = RtsFlags.ParFlags.nNodes - 1;
+	    // subtract one because we're already holding one.
+    Capability *caps[n_capabilities];
+#endif
+
+#ifdef SMP
+    // In order to GC, there must be no threads running Haskell code.
+    // Therefore, the GC thread needs to hold *all* the capabilities,
+    // and release them after the GC has completed.
+    //
+    // This seems to be the simplest way: previous attempts involved
+    // making all the threads with capabilities give up their
+    // capabilities and sleep except for the *last* one, which
+    // actually did the GC.  But it's quite hard to arrange for all
+    // the other tasks to sleep and stay asleep.
+    //
+
+    // Someone else is already trying to GC
+    if (waiting_for_gc) return;
+    waiting_for_gc = rtsTrue;
+
+    while (n_capabilities > 0) {
+	IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilities (%d left)", n_capabilities));
+	waitForReturnCapability(&sched_mutex, &cap);
+	n_capabilities--;
+	caps[n_capabilities] = cap;
+    }
+
+    waiting_for_gc = rtsFalse;
+#endif
+
+    /* Kick any transactions which are invalid back to their
+     * atomically frames.  When next scheduled they will try to
+     * commit; this commit will fail and they will retry.
+     */
+    for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
+	if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+	    if (!stmValidateTransaction (t -> trec)) {
+		IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
+
+		// strip the stack back to the ATOMICALLY_FRAME, aborting
+		// the (nested) transaction, and saving the stack of any
+		// partially-evaluated thunks on the heap.
+		raiseAsync_(t, NULL, rtsTrue);
+
+#ifdef REG_R1
+		ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+#endif
+	    }
+	}
+    }
+
+    // so this happens periodically:
+    scheduleCheckBlackHoles();
+
+    IF_DEBUG(scheduler, printAllThreads());
+
+    /* everybody back, start the GC.
+     * Could do it in this thread, or signal a condition var
+     * to do it in another thread.  Either way, we need to
+     * broadcast on gc_pending_cond afterward.
+     */
+#if defined(RTS_SUPPORTS_THREADS)
+    IF_DEBUG(scheduler,sched_belch("doing GC"));
+#endif
+    GarbageCollect(GetRoots, force_major);
+
+#if defined(SMP)
+    {
+	// release our stash of capabilities.
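+	// (One releaseCapability() for each Capability collected by the
+	//  waitForReturnCapability() loop above; the capability this
+	//  thread was already holding is not in caps[].)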
+	nat i;
+	for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) {
+	    releaseCapability(caps[i]);
+	}
+    }
+#endif
+
+#if defined(GRAN)
+    /* add a ContinueThread event to continue execution of current thread */
+    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
+	      ContinueThread,
+	      t, (StgClosure*)NULL, (rtsSpark*)NULL);
+    IF_GRAN_DEBUG(bq,
+		  debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
+		  G_EVENTQ(0);
+		  G_CURR_THREADQ(0));
+#endif /* GRAN */
+}
+
+/* ---------------------------------------------------------------------------
+ * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
+ * used by Control.Concurrent for error checking.
+ * ------------------------------------------------------------------------- */
+
+StgBool
+rtsSupportsBoundThreads(void)
+{
+#if defined(RTS_SUPPORTS_THREADS)
+    return rtsTrue;
+#else
+    return rtsFalse;
+#endif
+}
+
+/* ---------------------------------------------------------------------------
+ * isThreadBound(tso): check whether tso is bound to an OS thread.
+ * ------------------------------------------------------------------------- */
+
+StgBool
+isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
+{
+#if defined(RTS_SUPPORTS_THREADS)
+    return (tso->main != NULL);
+#endif
+    return rtsFalse;
+}
+
+/* ---------------------------------------------------------------------------
+ * Singleton fork(). Do not copy any running threads.
+ * ------------------------------------------------------------------------- */
+
+#ifndef mingw32_HOST_OS
+#define FORKPROCESS_PRIMOP_SUPPORTED
+#endif
+
+#ifdef FORKPROCESS_PRIMOP_SUPPORTED
+static void
+deleteThreadImmediately(StgTSO *tso);
+#endif
+StgInt
+forkProcess(HsStablePtr *entry
+#ifndef FORKPROCESS_PRIMOP_SUPPORTED
+	    STG_UNUSED
+#endif
+	   )
+{
+#ifdef FORKPROCESS_PRIMOP_SUPPORTED
+    pid_t pid;
+    StgTSO* t,*next;
+    StgMainThread *m;
+    SchedulerStatus rc;
+
+    IF_DEBUG(scheduler,sched_belch("forking!"));
+    rts_lock(); // This not only acquires sched_mutex, it also
+		// makes sure that no other threads are running
+
+    pid = fork();
+
+    if (pid) { /* parent */
+
+	/* just return the pid */
+	rts_unlock();
+	return pid;
+
+    } else { /* child */
+
+
+	// delete all threads
+	run_queue_hd = run_queue_tl = END_TSO_QUEUE;
+
+	for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+	    next = t->link;
+
+	    // don't allow threads to catch the ThreadKilled exception
+	    deleteThreadImmediately(t);
+	}
+
+	// wipe the main thread list
+	while((m = main_threads) != NULL) {
+	    main_threads = m->link;
+# ifdef THREADED_RTS
+	    closeCondition(&m->bound_thread_cond);
+# endif
+	    stgFree(m);
+	}
+
+	rc = rts_evalStableIO(entry, NULL);  // run the action
+	rts_checkSchedStatus("forkProcess",rc);
+
+	rts_unlock();
+
+	hs_exit();      // clean up and exit
+	stg_exit(0);
+    }
+#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
+    barf("forkProcess#: primop not supported, sorry!\n");
+    return -1;
+#endif
+}
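+
+/* (Summary, for orientation: this is the RTS side of the forkProcess#
+ *  primop.  In the parent it behaves like plain fork() and returns the
+ *  child's pid; in the child it deletes every thread, runs the entry
+ *  action via rts_evalStableIO(), and exits -- so in the child the
+ *  call never returns.)
+ */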
+ * ------------------------------------------------------------------------- */ + +void +deleteAllThreads ( void ) +{ + StgTSO* t, *next; + IF_DEBUG(scheduler,sched_belch("deleting all threads")); + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + if (t->what_next == ThreadRelocated) { + next = t->link; + } else { + next = t->global_link; + deleteThread(t); + } + } + + // The run queue now contains a bunch of ThreadKilled threads. We + // must not throw these away: the main thread(s) will be in there + // somewhere, and the main scheduler loop has to deal with it. + // Also, the run queue is the only thing keeping these threads from + // being GC'd, and we don't want the "main thread has been GC'd" panic. + + ASSERT(blocked_queue_hd == END_TSO_QUEUE); + ASSERT(blackhole_queue == END_TSO_QUEUE); + ASSERT(sleeping_queue == END_TSO_QUEUE); +} + +/* startThread and insertThread are now in GranSim.c -- HWL */ + + +/* --------------------------------------------------------------------------- + * Suspending & resuming Haskell threads. + * + * When making a "safe" call to C (aka _ccall_GC), the task gives back + * its capability before calling the C function. This allows another + * task to pick up the capability and carry on running Haskell + * threads. It also means that if the C call blocks, it won't lock + * the whole system. + * + * The Haskell thread making the C call is put to sleep for the + * duration of the call, on the susepended_ccalling_threads queue. We + * give out a token to the task, which it can use to resume the thread + * on return from the C function. + * ------------------------------------------------------------------------- */ + +StgInt +suspendThread( StgRegTable *reg ) +{ + nat tok; + Capability *cap; + int saved_errno = errno; + + /* assume that *reg is a pointer to the StgRegTable part + * of a Capability. + */ + cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable))); + + ACQUIRE_LOCK(&sched_mutex); + + IF_DEBUG(scheduler, + sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id)); + + // XXX this might not be necessary --SDM + cap->r.rCurrentTSO->what_next = ThreadRunGHC; + + threadPaused(cap->r.rCurrentTSO); + cap->r.rCurrentTSO->link = suspended_ccalling_threads; + suspended_ccalling_threads = cap->r.rCurrentTSO; + + if(cap->r.rCurrentTSO->blocked_exceptions == NULL) { + cap->r.rCurrentTSO->why_blocked = BlockedOnCCall; + cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE; + } else { + cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc; + } + + /* Use the thread ID as the token; it should be unique */ + tok = cap->r.rCurrentTSO->id; + + /* Hand back capability */ + cap->r.rInHaskell = rtsFalse; + releaseCapability(cap); + +#if defined(RTS_SUPPORTS_THREADS) + /* Preparing to leave the RTS, so ensure there's a native thread/task + waiting to take over. + */ + IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok)); +#endif + + RELEASE_LOCK(&sched_mutex); + + errno = saved_errno; + return tok; +} + +StgRegTable * +resumeThread( StgInt tok ) +{ + StgTSO *tso, **prev; + Capability *cap; + int saved_errno = errno; + +#if defined(RTS_SUPPORTS_THREADS) + /* Wait for permission to re-enter the RTS with the result. 
+
+StgRegTable *
+resumeThread( StgInt tok )
+{
+    StgTSO *tso, **prev;
+    Capability *cap;
+    int saved_errno = errno;
+
+#if defined(RTS_SUPPORTS_THREADS)
+    /* Wait for permission to re-enter the RTS with the result.
+     */
+    ACQUIRE_LOCK(&sched_mutex);
+    waitForReturnCapability(&sched_mutex, &cap);
+
+    IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
+#else
+    grabCapability(&cap);
+#endif
+
+    /* Remove the thread off of the suspended list */
+    prev = &suspended_ccalling_threads;
+    for (tso = suspended_ccalling_threads;
+	 tso != END_TSO_QUEUE;
+	 prev = &tso->link, tso = tso->link) {
+	if (tso->id == (StgThreadID)tok) {
+	    *prev = tso->link;
+	    break;
+	}
+    }
+    if (tso == END_TSO_QUEUE) {
+	barf("resumeThread: thread not found");
+    }
+    tso->link = END_TSO_QUEUE;
+
+    if(tso->why_blocked == BlockedOnCCall) {
+	awakenBlockedQueueNoLock(tso->blocked_exceptions);
+	tso->blocked_exceptions = NULL;
+    }
+
+    /* Reset blocking status */
+    tso->why_blocked = NotBlocked;
+
+    cap->r.rCurrentTSO = tso;
+    cap->r.rInHaskell = rtsTrue;
+    RELEASE_LOCK(&sched_mutex);
+    errno = saved_errno;
+    return &cap->r;
+}
+
+/* ---------------------------------------------------------------------------
+ * Comparing Thread ids.
+ *
+ * This is used from STG land in the implementation of the
+ * instances of Eq/Ord for ThreadIds.
+ * ------------------------------------------------------------------------ */
+
+int
+cmp_thread(StgPtr tso1, StgPtr tso2)
+{
+    StgThreadID id1 = ((StgTSO *)tso1)->id;
+    StgThreadID id2 = ((StgTSO *)tso2)->id;
+
+    if (id1 < id2) return (-1);
+    if (id1 > id2) return 1;
+    return 0;
+}
+
+/* ---------------------------------------------------------------------------
+ * Fetching the ThreadID from an StgTSO.
+ *
+ * This is used in the implementation of Show for ThreadIds.
+ * ------------------------------------------------------------------------ */
+int
+rts_getThreadId(StgPtr tso)
+{
+    return ((StgTSO *)tso)->id;
+}
+
+#ifdef DEBUG
+void
+labelThread(StgPtr tso, char *label)
+{
+    int len;
+    void *buf;
+
+    /* Caveat: Once set, you can only set the thread name to "" */
+    len = strlen(label)+1;
+    buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
+    strncpy(buf,label,len);
+    /* Update will free the old memory for us */
+    updateThreadLabel(((StgTSO *)tso)->id,buf);
+}
+#endif /* DEBUG */
+
+/* ---------------------------------------------------------------------------
+   Create a new thread.
+
+   The new thread starts with the given stack size.  Before the
+   scheduler can run, however, this thread needs to have a closure
+   (and possibly some arguments) pushed on its stack.  See
+   pushClosure() in Schedule.h.
+
+   createGenThread() and createIOThread() (in SchedAPI.h) are
+   convenient packaged versions of this function.
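+
+   For example (a sketch only; see SchedAPI.h for the real helpers):
+
+       tso = createThread(stack_size);
+       pushClosure(tso, (W_)closure);   // roughly what createGenThread() does
+       scheduleThread(tso);             // then hand it to the run queue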
+
+   currently pri (priority) is only used in a GRAN setup -- HWL
+   ------------------------------------------------------------------------ */
+#if defined(GRAN)
+/* currently pri (priority) is only used in a GRAN setup -- HWL */
+StgTSO *
+createThread(nat size, StgInt pri)
+#else
+StgTSO *
+createThread(nat size)
+#endif
+{
+
+    StgTSO *tso;
+    nat stack_size;
+
+    /* First check whether we should create a thread at all */
+#if defined(PARALLEL_HASKELL)
+    /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
+    if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
+	threadsIgnored++;
+	debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
+		   RtsFlags.ParFlags.maxThreads, advisory_thread_count);
+	return END_TSO_QUEUE;
+    }
+    threadsCreated++;
+#endif
+
+#if defined(GRAN)
+    ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
+#endif
+
+    // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
+
+    /* catch ridiculously small stack sizes */
+    if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
+	size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
+    }
+
+    stack_size = size - TSO_STRUCT_SIZEW;
+
+    tso = (StgTSO *)allocate(size);
+    TICK_ALLOC_TSO(stack_size, 0);
+
+    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
+#if defined(GRAN)
+    SET_GRAN_HDR(tso, ThisPE);
+#endif
+
+    // Always start with the compiled code evaluator
+    tso->what_next = ThreadRunGHC;
+
+    tso->id = next_thread_id++;
+    tso->why_blocked = NotBlocked;
+    tso->blocked_exceptions = NULL;
+
+    tso->saved_errno = 0;
+    tso->main = NULL;
+
+    tso->stack_size = stack_size;
+    tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
+			  - TSO_STRUCT_SIZEW;
+    tso->sp = (P_)&(tso->stack) + stack_size;
+
+    tso->trec = NO_TREC;
+
+#ifdef PROFILING
+    tso->prof.CCCS = CCS_MAIN;
+#endif
+
+    /* put a stop frame on the stack */
+    tso->sp -= sizeofW(StgStopFrame);
+    SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
+    tso->link = END_TSO_QUEUE;
+
+    // ToDo: check this
+#if defined(GRAN)
+    /* uses more flexible routine in GranSim */
+    insertThread(tso, CurrentProc);
+#else
+    /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
+     * from its creation
+     */
+#endif
@@ -3441,7 +4131,159 @@ printThreadStatus(StgTSO *tso)
 {
   switch (tso->what_next) {
   case ThreadKilled:
-      debugBelch("has
+      debugBelch("has been killed");
+      break;
+  case ThreadComplete:
+      debugBelch("has completed");
+      break;
+  default:
+      printThreadBlockage(tso);
+  }
+}
+
+void
+printAllThreads(void)
+{
+  StgTSO *t;
+
+# if defined(GRAN)
+  char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
+  ullong_format_string(TIME_ON_PROC(CurrentProc),
+		       time_string, rtsFalse/*no commas!*/);
+
+  debugBelch("all threads at [%s]:\n", time_string);
+# elif defined(PARALLEL_HASKELL)
+  char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
+  ullong_format_string(CURRENT_TIME,
+		       time_string, rtsFalse/*no commas!*/);
+
+  debugBelch("all threads at [%s]:\n", time_string);
+# else
+  debugBelch("all threads:\n");
+# endif
+
+  for (t = all_threads; t != END_TSO_QUEUE; ) {
+    debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
+#if defined(DEBUG)
+    {
+      void *label = lookupThreadLabel(t->id);
+      if (label) debugBelch("[\"%s\"] ",(char *)label);
+    }
+#endif
+    if (t->what_next == ThreadRelocated) {
+	debugBelch("has been relocated...\n");
+	t = t->link;
+    } else {
+	printThreadStatus(t);
+	debugBelch("\n");
+	t = t->global_link;
+    }
+  }
+}
+
+#ifdef DEBUG
+
+// useful from gdb
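+//   e.g., from a gdb session on a DEBUG RTS (illustrative):
+//       (gdb) call printThreadQueue(run_queue_hd)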
+void
+printThreadQueue(StgTSO *t)
+{
+    nat i = 0;
+    for (; t != END_TSO_QUEUE; t = t->link) {
+	debugBelch("\tthread %d @ %p ", t->id, (void *)t);
+	if (t->what_next == ThreadRelocated) {
+	    debugBelch("has been relocated...\n");
+	} else {
+	    printThreadStatus(t);
+	    debugBelch("\n");
+	}
+	i++;
+    }
+    debugBelch("%d threads on queue\n", i);
+}
+
+/*
+   Print a whole blocking queue attached to node (debugging only).
+*/
+# if defined(PARALLEL_HASKELL)
+void
+print_bq (StgClosure *node)
+{
+  StgBlockingQueueElement *bqe;
+  StgTSO *tso;
+  rtsBool end;
+
+  debugBelch("## BQ of closure %p (%s): ",
+	     node, info_type(node));
+
+  /* should cover all closures that may have a blocking queue */
+  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
+	 get_itbl(node)->type == FETCH_ME_BQ ||
+	 get_itbl(node)->type == RBH ||
+	 get_itbl(node)->type == MVAR);
+
+  ASSERT(node!=(StgClosure*)NULL);  // sanity check
+
+  print_bqe(((StgBlockingQueue*)node)->blocking_queue);
+}
+
+/*
+   Print a whole blocking queue starting with the element bqe.
+*/
+void
+print_bqe (StgBlockingQueueElement *bqe)
+{
+  rtsBool end;
+
+  /*
+     NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
+  */
+  for (end = (bqe==END_BQ_QUEUE);
+       !end; // iterate until bqe points to a CONSTR
+       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
+       bqe = end ? END_BQ_QUEUE : bqe->link) {
+    ASSERT(bqe != END_BQ_QUEUE);                     // sanity check
+    ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
+    /* types of closures that may appear in a blocking queue */
+    ASSERT(get_itbl(bqe)->type == TSO ||
+	   get_itbl(bqe)->type == BLOCKED_FETCH ||
+	   get_itbl(bqe)->type == CONSTR);
+    /* only BQs of an RBH end with an RBH_Save closure */
+    //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
+
+    switch (get_itbl(bqe)->type) {
+    case TSO:
+      debugBelch(" TSO %u (%x),",
+		 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
+      break;
+    case BLOCKED_FETCH:
+      debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
+		 ((StgBlockedFetch *)bqe)->node,
+		 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
+		 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
+		 ((StgBlockedFetch *)bqe)->ga.weight);
+      break;
+    case CONSTR:
+      debugBelch(" %s (IP %p),",
+		 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
+		  get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
+		  get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
+		  "RBH_Save_?"), get_itbl(bqe));
+      break;
+    default:
+      barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
+	   info_type((StgClosure *)bqe)); // , node, info_type(node));
+      break;
+    }
+  } /* for */
+  debugBelch("\n");
+}
+# elif defined(GRAN)
+void
+print_bq (StgClosure *node)
+{
+  StgBlockingQueueElement *bqe;
+  PEs node_loc, tso_loc;
+  rtsBool end;
+
+  /* should cover all closures that may have a blocking queue */
+  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
-- 
1.7.10.4