1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2006
5 * Thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
18 #include "ThreadLabels.h"
20 /* Next thread ID to allocate.
23 static StgThreadID next_thread_id = 1;
25 /* The smallest stack size that makes any sense is:
26 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
27 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
28 * + 1 (the closure to enter)
30 * + 1 (spare slot req'd by stg_ap_v_ret)
32 * A thread with this stack will bomb immediately with a stack
33 * overflow, which will increase its stack size.
35 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
37 /* ---------------------------------------------------------------------------
40 The new thread starts with the given stack size. Before the
41 scheduler can run, however, this thread needs to have a closure
42 (and possibly some arguments) pushed on its stack. See
43 pushClosure() in Schedule.h.
45 createGenThread() and createIOThread() (in SchedAPI.h) are
46 convenient packaged versions of this function.
48 currently pri (priority) is only used in a GRAN setup -- HWL
49 ------------------------------------------------------------------------ */
51 /* currently pri (priority) is only used in a GRAN setup -- HWL */
53 createThread(nat size, StgInt pri)
56 createThread(Capability *cap, nat size)
62 /* sched_mutex is *not* required */
64 /* First check whether we should create a thread at all */
65 #if defined(PARALLEL_HASKELL)
66 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
67 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
69 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
70 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
77 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
80 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
82 /* catch ridiculously small stack sizes */
83 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
84 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
87 stack_size = size - TSO_STRUCT_SIZEW;
89 tso = (StgTSO *)allocateLocal(cap, size);
90 TICK_ALLOC_TSO(stack_size, 0);
92 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
94 SET_GRAN_HDR(tso, ThisPE);
97 // Always start with the compiled code evaluator
98 tso->what_next = ThreadRunGHC;
100 tso->why_blocked = NotBlocked;
101 tso->blocked_exceptions = END_TSO_QUEUE;
102 tso->flags = TSO_DIRTY;
104 tso->saved_errno = 0;
108 tso->stack_size = stack_size;
109 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
111 tso->sp = (P_)&(tso->stack) + stack_size;
116 tso->prof.CCCS = CCS_MAIN;
119 /* put a stop frame on the stack */
120 tso->sp -= sizeofW(StgStopFrame);
121 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
122 tso->_link = END_TSO_QUEUE;
126 /* uses more flexible routine in GranSim */
127 insertThread(tso, CurrentProc);
129 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
135 if (RtsFlags.GranFlags.GranSimStats.Full)
136 DumpGranEvent(GR_START,tso);
137 #elif defined(PARALLEL_HASKELL)
138 if (RtsFlags.ParFlags.ParStats.Full)
139 DumpGranEvent(GR_STARTQ,tso);
140 /* HACk to avoid SCHEDULE
144 /* Link the new thread on the global thread list.
146 ACQUIRE_LOCK(&sched_mutex);
147 tso->id = next_thread_id++; // while we have the mutex
148 tso->global_link = g0s0->threads;
150 RELEASE_LOCK(&sched_mutex);
153 tso->dist.priority = MandatoryPriority; //by default that is...
159 tso->gran.magic = TSO_MAGIC; // debugging only
161 tso->gran.sparkname = 0;
162 tso->gran.startedat = CURRENT_TIME;
163 tso->gran.exported = 0;
164 tso->gran.basicblocks = 0;
165 tso->gran.allocs = 0;
166 tso->gran.exectime = 0;
167 tso->gran.fetchtime = 0;
168 tso->gran.fetchcount = 0;
169 tso->gran.blocktime = 0;
170 tso->gran.blockcount = 0;
171 tso->gran.blockedat = 0;
172 tso->gran.globalsparks = 0;
173 tso->gran.localsparks = 0;
174 if (RtsFlags.GranFlags.Light)
175 tso->gran.clock = Now; /* local clock */
179 IF_DEBUG(gran,printTSO(tso));
180 #elif defined(PARALLEL_HASKELL)
182 tso->par.magic = TSO_MAGIC; // debugging only
184 tso->par.sparkname = 0;
185 tso->par.startedat = CURRENT_TIME;
186 tso->par.exported = 0;
187 tso->par.basicblocks = 0;
189 tso->par.exectime = 0;
190 tso->par.fetchtime = 0;
191 tso->par.fetchcount = 0;
192 tso->par.blocktime = 0;
193 tso->par.blockcount = 0;
194 tso->par.blockedat = 0;
195 tso->par.globalsparks = 0;
196 tso->par.localsparks = 0;
200 globalGranStats.tot_threads_created++;
201 globalGranStats.threads_created_on_PE[CurrentProc]++;
202 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
203 globalGranStats.tot_sq_probes++;
204 #elif defined(PARALLEL_HASKELL)
205 // collect parallel global statistics (currently done together with GC stats)
206 if (RtsFlags.ParFlags.ParStats.Global &&
207 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
208 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
209 globalParStats.tot_threads_created++;
214 debugTrace(GRAN_DEBUG_pri,
215 "==__ schedule: Created TSO %d (%p);",
216 CurrentProc, tso, tso->id);
217 #elif defined(PARALLEL_HASKELL)
218 debugTrace(PAR_DEBUG_verbose,
219 "==__ schedule: Created TSO %d (%p); %d threads active",
220 (long)tso->id, tso, advisory_thread_count);
222 debugTrace(DEBUG_sched,
223 "created thread %ld, stack size = %lx words",
224 (long)tso->id, (long)tso->stack_size);
231 all parallel thread creation calls should fall through the following routine.
234 createThreadFromSpark(rtsSpark spark)
236 ASSERT(spark != (rtsSpark)NULL);
237 // JB: TAKE CARE OF THIS COUNTER! BUGGY
238 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
240 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
241 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
242 return END_TSO_QUEUE;
246 tso = createThread(RtsFlags.GcFlags.initialStkSize);
247 if (tso==END_TSO_QUEUE)
248 barf("createSparkThread: Cannot create TSO");
250 tso->priority = AdvisoryPriority;
252 pushClosure(tso,spark);
254 advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
260 /* ---------------------------------------------------------------------------
261 * Comparing Thread ids.
263 * This is used from STG land in the implementation of the
264 * instances of Eq/Ord for ThreadIds.
265 * ------------------------------------------------------------------------ */
268 cmp_thread(StgPtr tso1, StgPtr tso2)
270 StgThreadID id1 = ((StgTSO *)tso1)->id;
271 StgThreadID id2 = ((StgTSO *)tso2)->id;
273 if (id1 < id2) return (-1);
274 if (id1 > id2) return 1;
278 /* ---------------------------------------------------------------------------
279 * Fetching the ThreadID from an StgTSO.
281 * This is used in the implementation of Show for ThreadIds.
282 * ------------------------------------------------------------------------ */
284 rts_getThreadId(StgPtr tso)
286 return ((StgTSO *)tso)->id;
289 /* -----------------------------------------------------------------------------
290 Remove a thread from a queue.
291 Fails fatally if the TSO is not on the queue.
292 -------------------------------------------------------------------------- */
295 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
300 for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
303 setTSOLink(cap,prev,t->_link);
310 barf("removeThreadFromQueue: not found");
314 removeThreadFromDeQueue (Capability *cap,
315 StgTSO **head, StgTSO **tail, StgTSO *tso)
320 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
323 setTSOLink(cap,prev,t->_link);
331 *tail = END_TSO_QUEUE;
337 barf("removeThreadFromMVarQueue: not found");
341 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
343 removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
346 /* ----------------------------------------------------------------------------
349 unblock a single thread.
350 ------------------------------------------------------------------------- */
354 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
357 #elif defined(PARALLEL_HASKELL)
359 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
361 /* write RESUME events to log file and
362 update blocked and fetch time (depending on type of the orig closure) */
363 if (RtsFlags.ParFlags.ParStats.Full) {
364 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
365 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
366 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
368 emitSchedule = rtsTrue;
370 switch (get_itbl(node)->type) {
372 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
377 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
384 barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
391 StgBlockingQueueElement *
392 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
395 PEs node_loc, tso_loc;
397 node_loc = where_is(node); // should be lifted out of loop
398 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
399 tso_loc = where_is((StgClosure *)tso);
400 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
401 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
402 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
403 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
404 // insertThread(tso, node_loc);
405 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
407 tso, node, (rtsSpark*)NULL);
408 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
411 } else { // TSO is remote (actually should be FMBQ)
412 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
413 RtsFlags.GranFlags.Costs.gunblocktime +
414 RtsFlags.GranFlags.Costs.latency;
415 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
417 tso, node, (rtsSpark*)NULL);
418 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
421 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
423 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
424 (node_loc==tso_loc ? "Local" : "Global"),
425 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
426 tso->block_info.closure = NULL;
427 debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)",
430 #elif defined(PARALLEL_HASKELL)
431 StgBlockingQueueElement *
432 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
434 StgBlockingQueueElement *next;
436 switch (get_itbl(bqe)->type) {
438 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
439 /* if it's a TSO just push it onto the run_queue */
441 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
442 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
444 unblockCount(bqe, node);
445 /* reset blocking status after dumping event */
446 ((StgTSO *)bqe)->why_blocked = NotBlocked;
450 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
452 bqe->link = (StgBlockingQueueElement *)PendingFetches;
453 PendingFetches = (StgBlockedFetch *)bqe;
457 /* can ignore this case in a non-debugging setup;
458 see comments on RBHSave closures above */
460 /* check that the closure is an RBHSave closure */
461 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
462 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
463 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
467 barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
468 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
472 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
478 unblockOne (Capability *cap, StgTSO *tso)
480 return unblockOne_(cap,tso,rtsTrue); // allow migration
484 unblockOne_ (Capability *cap, StgTSO *tso,
485 rtsBool allow_migrate USED_IF_THREADS)
489 // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
490 ASSERT(tso->why_blocked != NotBlocked);
492 tso->why_blocked = NotBlocked;
494 tso->_link = END_TSO_QUEUE;
496 #if defined(THREADED_RTS)
497 if (tso->cap == cap || (!tsoLocked(tso) &&
499 RtsFlags.ParFlags.wakeupMigrate)) {
500 // We are waking up this thread on the current Capability, which
501 // might involve migrating it from the Capability it was last on.
503 ASSERT(tso->bound->cap == tso->cap);
504 tso->bound->cap = cap;
507 appendToRunQueue(cap,tso);
508 // we're holding a newly woken thread, make sure we context switch
509 // quickly so we can migrate it if necessary.
512 // we'll try to wake it up on the Capability it was last on.
513 wakeupThreadOnCapability_lock(tso->cap, tso);
516 appendToRunQueue(cap,tso);
520 debugTrace(DEBUG_sched,
521 "waking up thread %ld on cap %d",
522 (long)tso->id, tso->cap->no);
527 /* ----------------------------------------------------------------------------
530 wakes up all the threads on the specified queue.
531 ------------------------------------------------------------------------- */
535 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
537 StgBlockingQueueElement *bqe;
542 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
543 node, CurrentProc, CurrentTime[CurrentProc],
544 CurrentTSO->id, CurrentTSO));
546 node_loc = where_is(node);
548 ASSERT(q == END_BQ_QUEUE ||
549 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
550 get_itbl(q)->type == CONSTR); // closure (type constructor)
551 ASSERT(is_unique(node));
553 /* FAKE FETCH: magically copy the node to the tso's proc;
554 no Fetch necessary because in reality the node should not have been
555 moved to the other PE in the first place
557 if (CurrentProc!=node_loc) {
559 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
560 node, node_loc, CurrentProc, CurrentTSO->id,
561 // CurrentTSO, where_is(CurrentTSO),
562 node->header.gran.procs));
563 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
565 debugBelch("## new bitmask of node %p is %#x\n",
566 node, node->header.gran.procs));
567 if (RtsFlags.GranFlags.GranSimStats.Global) {
568 globalGranStats.tot_fake_fetches++;
573 // ToDo: check: ASSERT(CurrentProc==node_loc);
574 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
577 bqe points to the current element in the queue
578 next points to the next element in the queue
580 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
581 //tso_loc = where_is(tso);
583 bqe = unblockOne(bqe, node);
586 /* if this is the BQ of an RBH, we have to put back the info ripped out of
587 the closure to make room for the anchor of the BQ */
588 if (bqe!=END_BQ_QUEUE) {
589 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
591 ASSERT((info_ptr==&RBH_Save_0_info) ||
592 (info_ptr==&RBH_Save_1_info) ||
593 (info_ptr==&RBH_Save_2_info));
595 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
596 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
597 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
600 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
601 node, info_type(node)));
604 /* statistics gathering */
605 if (RtsFlags.GranFlags.GranSimStats.Global) {
606 // globalGranStats.tot_bq_processing_time += bq_processing_time;
607 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
608 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
609 globalGranStats.tot_awbq++; // total no. of bqs awakened
612 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
613 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
615 #elif defined(PARALLEL_HASKELL)
617 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
619 StgBlockingQueueElement *bqe;
621 IF_PAR_DEBUG(verbose,
622 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
626 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
627 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
632 ASSERT(q == END_BQ_QUEUE ||
633 get_itbl(q)->type == TSO ||
634 get_itbl(q)->type == BLOCKED_FETCH ||
635 get_itbl(q)->type == CONSTR);
638 while (get_itbl(bqe)->type==TSO ||
639 get_itbl(bqe)->type==BLOCKED_FETCH) {
640 bqe = unblockOne(bqe, node);
644 #else /* !GRAN && !PARALLEL_HASKELL */
647 awakenBlockedQueue(Capability *cap, StgTSO *tso)
649 while (tso != END_TSO_QUEUE) {
650 tso = unblockOne(cap,tso);
656 /* ---------------------------------------------------------------------------
657 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
658 * used by Control.Concurrent for error checking.
659 * ------------------------------------------------------------------------- */
662 rtsSupportsBoundThreads(void)
664 #if defined(THREADED_RTS)
667 return HS_BOOL_FALSE;
671 /* ---------------------------------------------------------------------------
672 * isThreadBound(tso): check whether tso is bound to an OS thread.
673 * ------------------------------------------------------------------------- */
676 isThreadBound(StgTSO* tso USED_IF_THREADS)
678 #if defined(THREADED_RTS)
679 return (tso->bound != NULL);
684 /* ----------------------------------------------------------------------------
685 * Debugging: why is a thread blocked
686 * ------------------------------------------------------------------------- */
690 printThreadBlockage(StgTSO *tso)
692 switch (tso->why_blocked) {
694 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
697 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
699 #if defined(mingw32_HOST_OS)
700 case BlockedOnDoProc:
701 debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
705 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
708 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
710 case BlockedOnException:
711 debugBelch("is blocked on delivering an exception to thread %lu",
712 (unsigned long)tso->block_info.tso->id);
714 case BlockedOnBlackHole:
715 debugBelch("is blocked on a black hole");
718 debugBelch("is not blocked");
720 #if defined(PARALLEL_HASKELL)
722 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
723 tso->block_info.closure, info_type(tso->block_info.closure));
725 case BlockedOnGA_NoSend:
726 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
727 tso->block_info.closure, info_type(tso->block_info.closure));
731 debugBelch("is blocked on an external call");
733 case BlockedOnCCall_NoUnblockExc:
734 debugBelch("is blocked on an external call (exceptions were already blocked)");
737 debugBelch("is blocked on an STM operation");
740 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
741 tso->why_blocked, tso->id, tso);
746 printThreadStatus(StgTSO *t)
748 debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
750 void *label = lookupThreadLabel(t->id);
751 if (label) debugBelch("[\"%s\"] ",(char *)label);
753 if (t->what_next == ThreadRelocated) {
754 debugBelch("has been relocated...\n");
756 switch (t->what_next) {
758 debugBelch("has been killed");
761 debugBelch("has completed");
764 printThreadBlockage(t);
771 printAllThreads(void)
778 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
779 ullong_format_string(TIME_ON_PROC(CurrentProc),
780 time_string, rtsFalse/*no commas!*/);
782 debugBelch("all threads at [%s]:\n", time_string);
783 # elif defined(PARALLEL_HASKELL)
784 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
785 ullong_format_string(CURRENT_TIME,
786 time_string, rtsFalse/*no commas!*/);
788 debugBelch("all threads at [%s]:\n", time_string);
790 debugBelch("all threads:\n");
793 for (i = 0; i < n_capabilities; i++) {
794 cap = &capabilities[i];
795 debugBelch("threads on capability %d:\n", cap->no);
796 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
797 printThreadStatus(t);
801 debugBelch("other threads:\n");
802 for (s = 0; s < total_steps; s++) {
803 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
804 if (t->why_blocked != NotBlocked) {
805 printThreadStatus(t);
807 if (t->what_next == ThreadRelocated) {
810 next = t->global_link;
818 printThreadQueue(StgTSO *t)
821 for (; t != END_TSO_QUEUE; t = t->_link) {
822 printThreadStatus(t);
825 debugBelch("%d threads on queue\n", i);
829 Print a whole blocking queue attached to node (debugging only).
831 # if defined(PARALLEL_HASKELL)
833 print_bq (StgClosure *node)
835 StgBlockingQueueElement *bqe;
839 debugBelch("## BQ of closure %p (%s): ",
840 node, info_type(node));
842 /* should cover all closures that may have a blocking queue */
843 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
844 get_itbl(node)->type == FETCH_ME_BQ ||
845 get_itbl(node)->type == RBH ||
846 get_itbl(node)->type == MVAR);
848 ASSERT(node!=(StgClosure*)NULL); // sanity check
850 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
854 Print a whole blocking queue starting with the element bqe.
857 print_bqe (StgBlockingQueueElement *bqe)
862 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
864 for (end = (bqe==END_BQ_QUEUE);
865 !end; // iterate until bqe points to a CONSTR
866 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
867 bqe = end ? END_BQ_QUEUE : bqe->link) {
868 ASSERT(bqe != END_BQ_QUEUE); // sanity check
869 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
870 /* types of closures that may appear in a blocking queue */
871 ASSERT(get_itbl(bqe)->type == TSO ||
872 get_itbl(bqe)->type == BLOCKED_FETCH ||
873 get_itbl(bqe)->type == CONSTR);
874 /* only BQs of an RBH end with an RBH_Save closure */
875 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
877 switch (get_itbl(bqe)->type) {
879 debugBelch(" TSO %u (%x),",
880 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
883 debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
884 ((StgBlockedFetch *)bqe)->node,
885 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
886 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
887 ((StgBlockedFetch *)bqe)->ga.weight);
890 debugBelch(" %s (IP %p),",
891 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
892 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
893 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
894 "RBH_Save_?"), get_itbl(bqe));
897 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
898 info_type((StgClosure *)bqe)); // , node, info_type(node));
906 print_bq (StgClosure *node)
908 StgBlockingQueueElement *bqe;
909 PEs node_loc, tso_loc;
912 /* should cover all closures that may have a blocking queue */
913 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
914 get_itbl(node)->type == FETCH_ME_BQ ||
915 get_itbl(node)->type == RBH);
917 ASSERT(node!=(StgClosure*)NULL); // sanity check
918 node_loc = where_is(node);
920 debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
921 node, info_type(node), node_loc);
924 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
926 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
927 !end; // iterate until bqe points to a CONSTR
928 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
929 ASSERT(bqe != END_BQ_QUEUE); // sanity check
930 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
931 /* types of closures that may appear in a blocking queue */
932 ASSERT(get_itbl(bqe)->type == TSO ||
933 get_itbl(bqe)->type == CONSTR);
934 /* only BQs of an RBH end with an RBH_Save closure */
935 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
937 tso_loc = where_is((StgClosure *)bqe);
938 switch (get_itbl(bqe)->type) {
940 debugBelch(" TSO %d (%p) on [PE %d],",
941 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
944 debugBelch(" %s (IP %p),",
945 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
946 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
947 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
948 "RBH_Save_?"), get_itbl(bqe));
951 barf("Unexpected closure type %s in blocking queue of %p (%s)",
952 info_type((StgClosure *)bqe), node, info_type(node));
960 #if defined(PARALLEL_HASKELL)
967 for (i=0, tso=run_queue_hd;
968 tso != END_TSO_QUEUE;
969 i++, tso=tso->link) {