1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2006
5 * Thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
18 #include "ThreadLabels.h"
/* NOTE(review): this excerpt is a numbered listing of GHC RTS Threads.c
 * with many original lines elided (the leading integers are the original
 * file's line numbers).  Code below is kept verbatim; some comment
 * terminators fell on elided lines. */
/* Monotonically increasing thread id counter; incremented under
 * sched_mutex in createThread (see the tso->id assignment below). */
20 /* Next thread ID to allocate.
23 static StgThreadID next_thread_id = 1;
/* Minimum viable stack, in words: reserved words + the stop frame + 3
 * (closure to enter, its argument slot, and the spare slot for
 * stg_ap_v_ret, per the itemised comment). */
25 /* The smallest stack size that makes any sense is:
26 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
27 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
28 * + 1 (the closure to enter)
30 * + 1 (spare slot req'd by stg_ap_v_ret)
32 * A thread with this stack will bomb immediately with a stack
33 * overflow, which will increase its stack size.
35 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
/* createThread: allocate and initialise a new TSO on the given
 * Capability's heap.  The caller must subsequently push a closure (and
 * arguments) onto the new stack before the scheduler can run it.
 * NOTE(review): two alternative signatures appear below; the
 * #if defined(GRAN||PARALLEL_HASKELL) / #else / #endif lines selecting
 * between them were elided from this listing, as were the function's
 * opening brace and local declarations (tso, stack_size). */
37 /* ---------------------------------------------------------------------------
40 The new thread starts with the given stack size. Before the
41 scheduler can run, however, this thread needs to have a closure
42 (and possibly some arguments) pushed on its stack. See
43 pushClosure() in Schedule.h.
45 createGenThread() and createIOThread() (in SchedAPI.h) are
46 convenient packaged versions of this function.
48 currently pri (priority) is only used in a GRAN setup -- HWL
49 ------------------------------------------------------------------------ */
51 /* currently pri (priority) is only used in a GRAN setup -- HWL */
53 createThread(nat size, StgInt pri)
56 createThread(Capability *cap, nat size)
62 /* sched_mutex is *not* required */
64 /* First check whether we should create a thread at all */
65 #if defined(PARALLEL_HASKELL)
/* PARALLEL_HASKELL: refuse creation when the advisory thread count has
 * reached the configured maximum. */
66 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
67 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
69 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
70 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
77 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
80 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
/* Clamp the request so there is always room for the TSO header plus
 * the minimum usable stack (MIN_STACK_WORDS). */
82 /* catch ridiculously small stack sizes */
83 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
84 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
87 stack_size = size - TSO_STRUCT_SIZEW;
/* Allocate the TSO locally on this Capability and set up its header
 * and initial scheduling state. */
89 tso = (StgTSO *)allocateLocal(cap, size);
90 TICK_ALLOC_TSO(stack_size, 0);
92 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
94 SET_GRAN_HDR(tso, ThisPE);
97 // Always start with the compiled code evaluator
98 tso->what_next = ThreadRunGHC;
100 tso->why_blocked = NotBlocked;
101 tso->blocked_exceptions = END_TSO_QUEUE;
102 tso->flags = TSO_DIRTY;
104 tso->saved_errno = 0;
/* Stack grows downwards: sp starts at the top of the stack area. */
108 tso->stack_size = stack_size;
109 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
111 tso->sp = (P_)&(tso->stack) + stack_size;
116 tso->prof.CCCS = CCS_MAIN;
/* Seed the stack with a stop frame so the thread terminates cleanly
 * when its computation finishes. */
119 /* put a stop frame on the stack */
120 tso->sp -= sizeofW(StgStopFrame);
121 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
122 tso->_link = END_TSO_QUEUE;
126 /* uses more flexible routine in GranSim */
127 insertThread(tso, CurrentProc);
129 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
135 if (RtsFlags.GranFlags.GranSimStats.Full)
136 DumpGranEvent(GR_START,tso);
137 #elif defined(PARALLEL_HASKELL)
138 if (RtsFlags.ParFlags.ParStats.Full)
139 DumpGranEvent(GR_STARTQ,tso);
140 /* HACk to avoid SCHEDULE
/* Assign the id and link onto the global thread list while holding
 * sched_mutex, so id allocation and list insertion are atomic. */
144 /* Link the new thread on the global thread list.
146 ACQUIRE_LOCK(&sched_mutex);
147 tso->id = next_thread_id++; // while we have the mutex
148 tso->global_link = g0s0->threads;
150 RELEASE_LOCK(&sched_mutex);
153 tso->dist.priority = MandatoryPriority; //by default that is...
/* GRAN-only bookkeeping: zero the per-thread simulation counters.
 * NOTE(review): the #if defined(GRAN) guard line was elided. */
159 tso->gran.magic = TSO_MAGIC; // debugging only
161 tso->gran.sparkname = 0;
162 tso->gran.startedat = CURRENT_TIME;
163 tso->gran.exported = 0;
164 tso->gran.basicblocks = 0;
165 tso->gran.allocs = 0;
166 tso->gran.exectime = 0;
167 tso->gran.fetchtime = 0;
168 tso->gran.fetchcount = 0;
169 tso->gran.blocktime = 0;
170 tso->gran.blockcount = 0;
171 tso->gran.blockedat = 0;
172 tso->gran.globalsparks = 0;
173 tso->gran.localsparks = 0;
174 if (RtsFlags.GranFlags.Light)
175 tso->gran.clock = Now; /* local clock */
179 IF_DEBUG(gran,printTSO(tso));
180 #elif defined(PARALLEL_HASKELL)
/* PARALLEL_HASKELL bookkeeping: same counters under tso->par. */
182 tso->par.magic = TSO_MAGIC; // debugging only
184 tso->par.sparkname = 0;
185 tso->par.startedat = CURRENT_TIME;
186 tso->par.exported = 0;
187 tso->par.basicblocks = 0;
189 tso->par.exectime = 0;
190 tso->par.fetchtime = 0;
191 tso->par.fetchcount = 0;
192 tso->par.blocktime = 0;
193 tso->par.blockcount = 0;
194 tso->par.blockedat = 0;
195 tso->par.globalsparks = 0;
196 tso->par.localsparks = 0;
/* Global statistics gathering for the simulators. */
200 globalGranStats.tot_threads_created++;
201 globalGranStats.threads_created_on_PE[CurrentProc]++;
202 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
203 globalGranStats.tot_sq_probes++;
204 #elif defined(PARALLEL_HASKELL)
205 // collect parallel global statistics (currently done together with GC stats)
206 if (RtsFlags.ParFlags.ParStats.Global &&
207 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
208 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
209 globalParStats.tot_threads_created++;
/* Trace messages for the different build variants; the trailing
 * "return tso;" and closing brace fell on elided lines. */
214 debugTrace(GRAN_DEBUG_pri,
215 "==__ schedule: Created TSO %d (%p);",
216 CurrentProc, tso, tso->id);
217 #elif defined(PARALLEL_HASKELL)
218 debugTrace(PAR_DEBUG_verbose,
219 "==__ schedule: Created TSO %d (%p); %d threads active",
220 (long)tso->id, tso, advisory_thread_count);
222 debugTrace(DEBUG_sched,
223 "created thread %ld, stack size = %lx words",
224 (long)tso->id, (long)tso->stack_size);
/* createThreadFromSpark: turn a spark into a runnable thread
 * (parallel builds only).  Enforces the maxThreads limit, creates a TSO
 * with the initial stack size, gives it advisory priority, and pushes
 * the spark closure onto its stack.
 * NOTE(review): function return type, opening brace, and several
 * closing braces fall on elided lines; advisory_thread_count handling
 * is flagged buggy by the original author (JB comments below). */
231 all parallel thread creation calls should fall through the following routine.
234 createThreadFromSpark(rtsSpark spark)
236 ASSERT(spark != (rtsSpark)NULL);
237 // JB: TAKE CARE OF THIS COUNTER! BUGGY
238 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
240 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
241 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
242 return END_TSO_QUEUE;
/* NOTE(review): this call passes only a size, unlike the Capability-
 * taking variant above — presumably the non-threaded signature;
 * confirm against the elided #if structure. */
246 tso = createThread(RtsFlags.GcFlags.initialStkSize);
247 if (tso==END_TSO_QUEUE)
248 barf("createSparkThread: Cannot create TSO");
250 tso->priority = AdvisoryPriority;
252 pushClosure(tso,spark);
254 advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
/* cmp_thread: three-way comparison of two TSOs by thread id, used by
 * the Eq/Ord instances for ThreadId.  Returns -1 / 1 for less / greater;
 * the equal case ("return 0;"), the return-type line, and the braces
 * fall on elided lines of this listing. */
260 /* ---------------------------------------------------------------------------
261 * Comparing Thread ids.
263 * This is used from STG land in the implementation of the
264 * instances of Eq/Ord for ThreadIds.
265 * ------------------------------------------------------------------------ */
268 cmp_thread(StgPtr tso1, StgPtr tso2)
270 StgThreadID id1 = ((StgTSO *)tso1)->id;
271 StgThreadID id2 = ((StgTSO *)tso2)->id;
273 if (id1 < id2) return (-1);
274 if (id1 > id2) return 1;
/* rts_getThreadId: fetch the thread id out of a TSO; used by the Show
 * instance for ThreadId.  Return-type line and braces elided. */
278 /* ---------------------------------------------------------------------------
279 * Fetching the ThreadID from an StgTSO.
281 * This is used in the implementation of Show for ThreadIds.
282 * ------------------------------------------------------------------------ */
284 rts_getThreadId(StgPtr tso)
286 return ((StgTSO *)tso)->id;
/* removeThreadFromQueue: unlink tso from a singly-linked TSO queue.
 * Walks the queue keeping a prev pointer; on a match in the middle it
 * patches prev->_link via setTSOLink (which records a write for the
 * generational GC).  barfs if tso is not found.
 * NOTE(review): local declarations, the head-of-queue case, and the
 * match/return lines are elided here. */
289 /* -----------------------------------------------------------------------------
290 Remove a thread from a queue.
291 Fails fatally if the TSO is not on the queue.
292 -------------------------------------------------------------------------- */
295 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
300 for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
303 setTSOLink(cap,prev,t->_link);
310 barf("removeThreadFromQueue: not found");
/* removeThreadFromDeQueue: like removeThreadFromQueue but for a queue
 * with both head and tail pointers; when the removed element was the
 * tail, the tail is reset (the END_TSO_QUEUE store below — the
 * surrounding conditional is elided).  barfs if not found; note the
 * barf message still says "MVarQueue" (original inconsistency, kept
 * verbatim). */
314 removeThreadFromDeQueue (Capability *cap,
315 StgTSO **head, StgTSO **tail, StgTSO *tso)
320 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
323 setTSOLink(cap,prev,t->_link);
331 *tail = END_TSO_QUEUE;
337 barf("removeThreadFromMVarQueue: not found");
/* removeThreadFromMVarQueue: thin wrapper — remove tso from the given
 * MVar's head/tail blocked-thread queue. */
341 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
343 removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
/* unblockCount (PARALLEL_HASKELL variant): log a RESUME event and
 * charge the blocked time (or fetch time, depending on the blocking
 * closure's type) to the TSO's par statistics.  A GRAN declaration of
 * the same name precedes the #elif; the switch case labels and #endif
 * lines are elided from this listing. */
346 /* ----------------------------------------------------------------------------
349 unblock a single thread.
350 ------------------------------------------------------------------------- */
354 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
357 #elif defined(PARALLEL_HASKELL)
359 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
361 /* write RESUME events to log file and
362 update blocked and fetch time (depending on type of the orig closure) */
363 if (RtsFlags.ParFlags.ParStats.Full) {
364 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
365 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
366 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
368 emitSchedule = rtsTrue;
/* Dispatch on the closure the thread was blocked on: fetch-type
 * closures add to fetchtime, others to blocktime (case labels elided). */
370 switch (get_itbl(node)->type) {
372 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
377 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
384 barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
/* unblockOne (GRAN variant): wake one element of a blocking queue in
 * the GranSim simulator.  Charges simulated unblock costs depending on
 * whether the TSO is local to the node's PE or remote, schedules a
 * resume/unblock event via new_event, and clears the link and
 * block_info fields.  Event-name arguments to new_event and the
 * function's braces/return fall on elided lines. */
391 StgBlockingQueueElement *
392 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
395 PEs node_loc, tso_loc;
397 node_loc = where_is(node); // should be lifted out of loop
398 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
399 tso_loc = where_is((StgClosure *)tso);
400 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
401 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
402 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
/* Local wakeup: only the lightweight unblock cost is charged. */
403 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
404 // insertThread(tso, node_loc);
405 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
407 tso, node, (rtsSpark*)NULL);
408 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
411 } else { // TSO is remote (actually should be FMBQ)
/* Remote wakeup: pack + global unblock + network latency costs. */
412 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
413 RtsFlags.GranFlags.Costs.gunblocktime +
414 RtsFlags.GranFlags.Costs.latency;
415 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
417 tso, node, (rtsSpark*)NULL);
418 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
421 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
423 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
424 (node_loc==tso_loc ? "Local" : "Global"),
425 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
426 tso->block_info.closure = NULL;
427 debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)",
/* unblockOne (PARALLEL_HASKELL variant): wake one blocking-queue
 * element, dispatching on its closure type — TSOs go back on the run
 * queue, BLOCKED_FETCH nodes go onto PendingFetches, and CONSTR
 * (RBHSave) closures are asserted and ignored.  Returns the next queue
 * element; the "next = bqe->link" captures, case labels, and
 * return statement are on elided lines. */
430 #elif defined(PARALLEL_HASKELL)
431 StgBlockingQueueElement *
432 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
434 StgBlockingQueueElement *next;
436 switch (get_itbl(bqe)->type) {
438 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
439 /* if it's a TSO just push it onto the run_queue */
441 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
442 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
/* Record the resume event / timing before clearing blocked status. */
444 unblockCount(bqe, node);
445 /* reset blocking status after dumping event */
446 ((StgTSO *)bqe)->why_blocked = NotBlocked;
450 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
452 bqe->link = (StgBlockingQueueElement *)PendingFetches;
453 PendingFetches = (StgBlockedFetch *)bqe;
457 /* can ignore this case in a non-debugging setup;
458 see comments on RBHSave closures above */
460 /* check that the closure is an RBHSave closure */
461 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
462 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
463 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
467 barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
468 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
472 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
/* unblockOne (standard/threaded variant): public wrapper that allows
 * migration, delegating to unblockOne_. */
478 unblockOne (Capability *cap, StgTSO *tso)
480 return unblockOne_(cap,tso,rtsTrue); // allow migration
/* unblockOne_: mark tso runnable and put it on a run queue.  Under
 * THREADED_RTS, if the TSO belongs to this Capability (or is unlocked
 * and wakeupMigrate is set) it is migrated here and appended to this
 * cap's run queue with a context-switch request; otherwise it is woken
 * on the Capability it was last on.  The non-threaded path simply
 * appends to the run queue.  NOTE(review): return type, local "next"
 * handling, #else/#endif lines, and the bound-thread conditional
 * around the tso->bound reassignment are elided. */
484 unblockOne_ (Capability *cap, StgTSO *tso,
485 rtsBool allow_migrate USED_IF_THREADS)
489 // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
490 ASSERT(tso->why_blocked != NotBlocked);
492 tso->why_blocked = NotBlocked;
494 tso->_link = END_TSO_QUEUE;
496 #if defined(THREADED_RTS)
497 if (tso->cap == cap || (!tsoLocked(tso) &&
499 RtsFlags.ParFlags.wakeupMigrate)) {
500 // We are waking up this thread on the current Capability, which
501 // might involve migrating it from the Capability it was last on.
/* Keep a bound thread's Task pinned to the Capability it runs on. */
503 ASSERT(tso->bound->cap == tso->cap);
504 tso->bound->cap = cap;
507 appendToRunQueue(cap,tso);
508 // we're holding a newly woken thread, make sure we context switch
509 // quickly so we can migrate it if necessary.
510 cap->context_switch = 1;
512 // we'll try to wake it up on the Capability it was last on.
513 wakeupThreadOnCapability(cap, tso->cap, tso);
516 appendToRunQueue(cap,tso);
517 cap->context_switch = 1;
520 debugTrace(DEBUG_sched,
521 "waking up thread %ld on cap %d",
522 (long)tso->id, tso->cap->no);
/* awakenBlockedQueue (GRAN variant): wake every TSO on a node's
 * blocking queue in the GranSim simulator.  Performs a "fake fetch"
 * bitmask fixup if the node is not on the current PE, walks the queue
 * via unblockOne, restores an RBH's ripped-out info from the trailing
 * RBHSave closure, and records statistics.  NOTE(review): local
 * declarations (len, node_loc, info_ptr), several IF_GRAN_DEBUG
 * wrappers, and closing braces are elided from this listing. */
527 /* ----------------------------------------------------------------------------
530 wakes up all the threads on the specified queue.
531 ------------------------------------------------------------------------- */
535 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
537 StgBlockingQueueElement *bqe;
542 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
543 node, CurrentProc, CurrentTime[CurrentProc],
544 CurrentTSO->id, CurrentTSO));
546 node_loc = where_is(node);
548 ASSERT(q == END_BQ_QUEUE ||
549 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
550 get_itbl(q)->type == CONSTR); // closure (type constructor)
551 ASSERT(is_unique(node));
/* Simulator shortcut: pretend the node already lives on this PE by
 * OR-ing this processor into its location bitmask. */
553 /* FAKE FETCH: magically copy the node to the tso's proc;
554 no Fetch necessary because in reality the node should not have been
555 moved to the other PE in the first place
557 if (CurrentProc!=node_loc) {
559 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
560 node, node_loc, CurrentProc, CurrentTSO->id,
561 // CurrentTSO, where_is(CurrentTSO),
562 node->header.gran.procs));
563 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
565 debugBelch("## new bitmask of node %p is %#x\n",
566 node, node->header.gran.procs));
567 if (RtsFlags.GranFlags.GranSimStats.Global) {
568 globalGranStats.tot_fake_fetches++;
/* Walk the queue: unblockOne returns the next element; stop when we
 * hit the non-TSO tail (END_BQ_QUEUE or an RBHSave CONSTR). */
573 // ToDo: check: ASSERT(CurrentProc==node_loc);
574 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
577 bqe points to the current element in the queue
578 next points to the next element in the queue
580 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
581 //tso_loc = where_is(tso);
583 bqe = unblockOne(bqe, node);
/* An RBH's blocking queue ends in an RBHSave closure holding the two
 * words displaced when the closure was turned into an RBH; put them
 * back now that the queue is empty. */
586 /* if this is the BQ of an RBH, we have to put back the info ripped out of
587 the closure to make room for the anchor of the BQ */
588 if (bqe!=END_BQ_QUEUE) {
589 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
591 ASSERT((info_ptr==&RBH_Save_0_info) ||
592 (info_ptr==&RBH_Save_1_info) ||
593 (info_ptr==&RBH_Save_2_info));
595 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
596 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
597 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
600 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
601 node, info_type(node)));
604 /* statistics gathering */
605 if (RtsFlags.GranFlags.GranSimStats.Global) {
606 // globalGranStats.tot_bq_processing_time += bq_processing_time;
607 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
608 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
609 globalGranStats.tot_awbq++; // total no. of bqs awakened
612 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
613 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
/* awakenBlockedQueue (PARALLEL_HASKELL variant): wake everything on a
 * node's blocking queue.  Early-outs when the queue is empty or just a
 * CONSTR (RBHSave) anchor; otherwise repeatedly calls unblockOne while
 * elements are TSOs or BLOCKED_FETCHes.  bqe initialisation from q and
 * the closing braces are elided. */
615 #elif defined(PARALLEL_HASKELL)
617 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
619 StgBlockingQueueElement *bqe;
621 IF_PAR_DEBUG(verbose,
622 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
626 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
627 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
632 ASSERT(q == END_BQ_QUEUE ||
633 get_itbl(q)->type == TSO ||
634 get_itbl(q)->type == BLOCKED_FETCH ||
635 get_itbl(q)->type == CONSTR);
638 while (get_itbl(bqe)->type==TSO ||
639 get_itbl(bqe)->type==BLOCKED_FETCH) {
640 bqe = unblockOne(bqe, node);
/* awakenBlockedQueue (standard variant): walk a TSO chain, waking each
 * thread; unblockOne returns the next link.  Return type and braces
 * fall on elided lines. */
644 #else /* !GRAN && !PARALLEL_HASKELL */
647 awakenBlockedQueue(Capability *cap, StgTSO *tso)
649 while (tso != END_TSO_QUEUE) {
650 tso = unblockOne(cap,tso);
/* rtsSupportsBoundThreads: HS_BOOL_TRUE iff built THREADED_RTS (the
 * TRUE branch falls on an elided line); queried by Control.Concurrent
 * for error checking. */
656 /* ---------------------------------------------------------------------------
657 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
658 * used by Control.Concurrent for error checking.
659 * ------------------------------------------------------------------------- */
662 rtsSupportsBoundThreads(void)
664 #if defined(THREADED_RTS)
667 return HS_BOOL_FALSE;
/* isThreadBound: a TSO is bound when it has an associated Task
 * (tso->bound != NULL); in non-threaded builds the answer is
 * presumably always false (the #else branch is elided). */
671 /* ---------------------------------------------------------------------------
672 * isThreadBound(tso): check whether tso is bound to an OS thread.
673 * ------------------------------------------------------------------------- */
676 isThreadBound(StgTSO* tso USED_IF_THREADS)
678 #if defined(THREADED_RTS)
679 return (tso->bound != NULL);
/* printThreadBlockage: debugging aid — print a human-readable reason
 * for why a TSO is blocked, switching on tso->why_blocked.  The case
 * labels for several branches (BlockedOnRead/Write/Delay/MVar/STM etc.)
 * and the break statements fall on elided lines; messages below make
 * the intended branch clear. */
684 /* ----------------------------------------------------------------------------
685 * Debugging: why is a thread blocked
686 * ------------------------------------------------------------------------- */
690 printThreadBlockage(StgTSO *tso)
692 switch (tso->why_blocked) {
694 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
697 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
699 #if defined(mingw32_HOST_OS)
700 case BlockedOnDoProc:
701 debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
705 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
708 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
710 case BlockedOnException:
711 debugBelch("is blocked on delivering an exception to thread %lu",
712 (unsigned long)tso->block_info.tso->id);
714 case BlockedOnBlackHole:
715 debugBelch("is blocked on a black hole");
718 debugBelch("is not blocked");
720 #if defined(PARALLEL_HASKELL)
722 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
723 tso->block_info.closure, info_type(tso->block_info.closure));
725 case BlockedOnGA_NoSend:
726 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
727 tso->block_info.closure, info_type(tso->block_info.closure));
731 debugBelch("is blocked on an external call");
733 case BlockedOnCCall_NoUnblockExc:
734 debugBelch("is blocked on an external call (exceptions were already blocked)");
737 debugBelch("is blocked on an STM operation");
/* default: an unexpected why_blocked value is a fatal RTS error. */
740 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
741 tso->why_blocked, tso->id, tso);
/* printThreadStatus: debugging aid — print one line describing a TSO:
 * its id/address, optional user label, and its status (relocated /
 * killed / complete / blockage reason) plus dirty-flag annotations.
 * The switch case labels and the #if around lookupThreadLabel are
 * elided in this listing. */
746 printThreadStatus(StgTSO *t)
748 debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
750 void *label = lookupThreadLabel(t->id);
751 if (label) debugBelch("[\"%s\"] ",(char *)label);
753 if (t->what_next == ThreadRelocated) {
754 debugBelch("has been relocated...\n");
756 switch (t->what_next) {
758 debugBelch("has been killed");
761 debugBelch("has completed");
764 printThreadBlockage(t);
766 if (t->flags & TSO_DIRTY) {
767 debugBelch(" (TSO_DIRTY)");
768 } else if (t->flags & TSO_LINK_DIRTY) {
769 debugBelch(" (TSO_LINK_DIRTY)");
/* printAllThreads: debugging aid — dump every thread's status: first
 * the run queue of each Capability, then all threads reachable from
 * the per-step global thread lists (skipping unblocked ones there to
 * avoid double-printing runnable threads).  GRAN/PAR builds prefix the
 * dump with a formatted timestamp.  Local declarations (i, s, t, next,
 * cap) and several braces are elided in this listing. */
776 printAllThreads(void)
783 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
784 ullong_format_string(TIME_ON_PROC(CurrentProc),
785 time_string, rtsFalse/*no commas!*/);
787 debugBelch("all threads at [%s]:\n", time_string);
788 # elif defined(PARALLEL_HASKELL)
789 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
790 ullong_format_string(CURRENT_TIME,
791 time_string, rtsFalse/*no commas!*/);
793 debugBelch("all threads at [%s]:\n", time_string);
795 debugBelch("all threads:\n");
/* Per-Capability run queues. */
798 for (i = 0; i < n_capabilities; i++) {
799 cap = &capabilities[i];
800 debugBelch("threads on capability %d:\n", cap->no);
801 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
802 printThreadStatus(t);
/* Everything else: walk the global thread list of each step,
 * following global_link (relocated TSOs are handled — the _link
 * branch body is elided). */
806 debugBelch("other threads:\n");
807 for (s = 0; s < total_steps; s++) {
808 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
809 if (t->why_blocked != NotBlocked) {
810 printThreadStatus(t);
812 if (t->what_next == ThreadRelocated) {
815 next = t->global_link;
/* printThreadQueue: debugging aid — print the status of every TSO on
 * a _link-chained queue, then the count.  The counter "i" declaration
 * and increment fall on elided lines. */
823 printThreadQueue(StgTSO *t)
826 for (; t != END_TSO_QUEUE; t = t->_link) {
827 printThreadStatus(t);
830 debugBelch("%d threads on queue\n", i);
/* print_bq (PARALLEL_HASKELL variant): debugging aid — print the
 * blocking queue attached to a closure.  Sanity-checks that the node
 * is a closure type that can carry a blocking queue, then delegates
 * the element walk to print_bqe.  Return type and braces elided. */
834 Print a whole blocking queue attached to node (debugging only).
836 # if defined(PARALLEL_HASKELL)
838 print_bq (StgClosure *node)
840 StgBlockingQueueElement *bqe;
844 debugBelch("## BQ of closure %p (%s): ",
845 node, info_type(node));
847 /* should cover all closures that may have a blocking queue */
848 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
849 get_itbl(node)->type == FETCH_ME_BQ ||
850 get_itbl(node)->type == RBH ||
851 get_itbl(node)->type == MVAR);
853 ASSERT(node!=(StgClosure*)NULL); // sanity check
855 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
/* print_bqe: debugging aid — walk a blocking queue starting at bqe,
 * printing a short description of each element (TSO, BLOCKED_FETCH,
 * or terminating RBHSave CONSTR).  The loop's "end" flag stops the
 * walk at a CONSTR or at the end of the chain.  Function return type,
 * local declarations, case labels and closing braces are elided. */
859 Print a whole blocking queue starting with the element bqe.
862 print_bqe (StgBlockingQueueElement *bqe)
867 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
/* Loop invariant: bqe is non-NULL and one of the legal queue element
 * types until "end" becomes true. */
869 for (end = (bqe==END_BQ_QUEUE);
870 !end; // iterate until bqe points to a CONSTR
871 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
872 bqe = end ? END_BQ_QUEUE : bqe->link) {
873 ASSERT(bqe != END_BQ_QUEUE); // sanity check
874 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
875 /* types of closures that may appear in a blocking queue */
876 ASSERT(get_itbl(bqe)->type == TSO ||
877 get_itbl(bqe)->type == BLOCKED_FETCH ||
878 get_itbl(bqe)->type == CONSTR);
879 /* only BQs of an RBH end with an RBH_Save closure */
880 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
882 switch (get_itbl(bqe)->type) {
884 debugBelch(" TSO %u (%x),",
885 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
888 debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
889 ((StgBlockedFetch *)bqe)->node,
890 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
891 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
892 ((StgBlockedFetch *)bqe)->ga.weight);
/* CONSTR case: identify which RBH_Save variant terminates the queue. */
895 debugBelch(" %s (IP %p),",
896 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
897 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
898 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
899 "RBH_Save_?"), get_itbl(bqe));
902 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
903 info_type((StgClosure *)bqe)); // , node, info_type(node));
/* print_bq (GRAN variant): debugging aid — print the blocking queue of
 * a closure in the GranSim build, including the PE location of the
 * node and of each queued TSO.  Structure mirrors print_bqe above but
 * inlines the walk.  Return type, "end" declaration, case labels and
 * closing braces are elided in this listing. */
911 print_bq (StgClosure *node)
913 StgBlockingQueueElement *bqe;
914 PEs node_loc, tso_loc;
917 /* should cover all closures that may have a blocking queue */
918 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
919 get_itbl(node)->type == FETCH_ME_BQ ||
920 get_itbl(node)->type == RBH);
922 ASSERT(node!=(StgClosure*)NULL); // sanity check
923 node_loc = where_is(node);
925 debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
926 node, info_type(node), node_loc);
929 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
931 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
932 !end; // iterate until bqe points to a CONSTR
933 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
934 ASSERT(bqe != END_BQ_QUEUE); // sanity check
935 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
936 /* types of closures that may appear in a blocking queue */
937 ASSERT(get_itbl(bqe)->type == TSO ||
938 get_itbl(bqe)->type == CONSTR);
939 /* only BQs of an RBH end with an RBH_Save closure */
940 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
942 tso_loc = where_is((StgClosure *)bqe);
943 switch (get_itbl(bqe)->type) {
945 debugBelch(" TSO %d (%p) on [PE %d],",
946 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
949 debugBelch(" %s (IP %p),",
950 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
951 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
952 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
953 "RBH_Save_?"), get_itbl(bqe));
956 barf("Unexpected closure type %s in blocking queue of %p (%s)",
957 info_type((StgClosure *)bqe), node, info_type(node));
965 #if defined(PARALLEL_HASKELL)
972 for (i=0, tso=run_queue_hd;
973 tso != END_TSO_QUEUE;
974 i++, tso=tso->link) {