1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2006
5 * Thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
18 #include "ThreadLabels.h"
20 /* Next thread ID to allocate.
/* NOTE(review): incremented under sched_mutex in createThread below. */
23 static StgThreadID next_thread_id = 1;
25 /* The smallest stack size that makes any sense is:
26 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
27 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
28 * + 1 (the closure to enter)
30 * + 1 (spare slot req'd by stg_ap_v_ret)
32 * A thread with this stack will bomb immediately with a stack
33 * overflow, which will increase its stack size.
35 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
37 /* ---------------------------------------------------------------------------
40 The new thread starts with the given stack size. Before the
41 scheduler can run, however, this thread needs to have a closure
42 (and possibly some arguments) pushed on its stack. See
43 pushClosure() in Schedule.h.
45 createGenThread() and createIOThread() (in SchedAPI.h) are
46 convenient packaged versions of this function.
48 currently pri (priority) is only used in a GRAN setup -- HWL
49 ------------------------------------------------------------------------ */
51 /* currently pri (priority) is only used in a GRAN setup -- HWL */
/*
 * createThread: allocate and initialise a fresh TSO (thread state object).
 * NOTE(review): this listing shows two alternative signatures — the
 * GRAN/PARALLEL form (nat size, StgInt pri) and the threaded/storage form
 * (Capability *cap, nat size); the selecting #if lines are elided here.
 */
53 createThread(nat size, StgInt pri)
56 createThread(Capability *cap, nat size)
62 /* sched_mutex is *not* required */
64 /* First check whether we should create a thread at all */
65 #if defined(PARALLEL_HASKELL)
66 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
67 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
69 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
70 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
77 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
80 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
/* round undersized requests up to the minimum viable stack — see the
   MIN_STACK_WORDS commentary at the top of the file */
82 /* catch ridiculously small stack sizes */
83 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
84 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
87 stack_size = size - TSO_STRUCT_SIZEW;
/* the TSO is allocated from the capability's local allocation area */
89 tso = (StgTSO *)allocateLocal(cap, size);
90 TICK_ALLOC_TSO(stack_size, 0);
92 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
94 SET_GRAN_HDR(tso, ThisPE);
97 // Always start with the compiled code evaluator
98 tso->what_next = ThreadRunGHC;
100 tso->why_blocked = NotBlocked;
101 tso->blocked_exceptions = END_TSO_QUEUE;
102 tso->flags = TSO_DIRTY;
104 tso->saved_errno = 0;
108 tso->stack_size = stack_size;
109 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
/* stack pointer starts at the top: the stack grows downwards */
111 tso->sp = (P_)&(tso->stack) + stack_size;
116 tso->prof.CCCS = CCS_MAIN;
119 /* put a stop frame on the stack */
120 tso->sp -= sizeofW(StgStopFrame);
121 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
122 tso->link = END_TSO_QUEUE;
126 /* uses more flexible routine in GranSim */
127 insertThread(tso, CurrentProc);
129 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
135 if (RtsFlags.GranFlags.GranSimStats.Full)
136 DumpGranEvent(GR_START,tso);
137 #elif defined(PARALLEL_HASKELL)
138 if (RtsFlags.ParFlags.ParStats.Full)
139 DumpGranEvent(GR_STARTQ,tso);
140 /* HACk to avoid SCHEDULE
144 /* Link the new thread on the global thread list.
/* thread id and the global all_threads list are shared state, hence the
   lock around both updates */
146 ACQUIRE_LOCK(&sched_mutex);
147 tso->id = next_thread_id++; // while we have the mutex
148 tso->global_link = all_threads;
150 RELEASE_LOCK(&sched_mutex);
153 tso->dist.priority = MandatoryPriority; //by default that is...
/* GranSim-only per-thread statistics fields, zero-initialised */
159 tso->gran.magic = TSO_MAGIC; // debugging only
161 tso->gran.sparkname = 0;
162 tso->gran.startedat = CURRENT_TIME;
163 tso->gran.exported = 0;
164 tso->gran.basicblocks = 0;
165 tso->gran.allocs = 0;
166 tso->gran.exectime = 0;
167 tso->gran.fetchtime = 0;
168 tso->gran.fetchcount = 0;
169 tso->gran.blocktime = 0;
170 tso->gran.blockcount = 0;
171 tso->gran.blockedat = 0;
172 tso->gran.globalsparks = 0;
173 tso->gran.localsparks = 0;
174 if (RtsFlags.GranFlags.Light)
175 tso->gran.clock = Now; /* local clock */
179 IF_DEBUG(gran,printTSO(tso));
180 #elif defined(PARALLEL_HASKELL)
/* PARALLEL_HASKELL-only per-thread statistics fields, zero-initialised */
182 tso->par.magic = TSO_MAGIC; // debugging only
184 tso->par.sparkname = 0;
185 tso->par.startedat = CURRENT_TIME;
186 tso->par.exported = 0;
187 tso->par.basicblocks = 0;
189 tso->par.exectime = 0;
190 tso->par.fetchtime = 0;
191 tso->par.fetchcount = 0;
192 tso->par.blocktime = 0;
193 tso->par.blockcount = 0;
194 tso->par.blockedat = 0;
195 tso->par.globalsparks = 0;
196 tso->par.localsparks = 0;
/* global (per-run) creation statistics for the simulators */
200 globalGranStats.tot_threads_created++;
201 globalGranStats.threads_created_on_PE[CurrentProc]++;
202 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
203 globalGranStats.tot_sq_probes++;
204 #elif defined(PARALLEL_HASKELL)
205 // collect parallel global statistics (currently done together with GC stats)
206 if (RtsFlags.ParFlags.ParStats.Global &&
207 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
208 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
209 globalParStats.tot_threads_created++;
/* per-variant creation trace messages */
214 debugTrace(GRAN_DEBUG_pri,
215 "==__ schedule: Created TSO %d (%p);",
216 CurrentProc, tso, tso->id)
217 #elif defined(PARALLEL_HASKELL)
218 debugTrace(PAR_DEBUG_verbose,
219 "==__ schedule: Created TSO %d (%p); %d threads active",
220 (long)tso->id, tso, advisory_thread_count);
222 debugTrace(DEBUG_sched,
223 "created thread %ld, stack size = %lx words",
224 (long)tso->id, (long)tso->stack_size);
231 all parallel thread creation calls should fall through the following routine.
/*
 * createThreadFromSpark (PARALLEL_HASKELL only): turn a spark into a
 * runnable thread, subject to the ParFlags.maxThreads advisory limit.
 */
234 createThreadFromSpark(rtsSpark spark)
236 ASSERT(spark != (rtsSpark)NULL);
237 // JB: TAKE CARE OF THIS COUNTER! BUGGY
/* NOTE(review): at the limit this both barfs (fatal) and returns
   END_TSO_QUEUE — the enclosing conditional structure is elided here */
238 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
240 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
241 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
242 return END_TSO_QUEUE;
/* create the thread with the default initial stack, then push the spark
   closure so the scheduler will evaluate it */
246 tso = createThread(RtsFlags.GcFlags.initialStkSize);
247 if (tso==END_TSO_QUEUE)
248 barf("createSparkThread: Cannot create TSO");
250 tso->priority = AdvisoryPriority;
252 pushClosure(tso,spark);
254 advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
260 /* ---------------------------------------------------------------------------
261 * Comparing Thread ids.
263 * This is used from STG land in the implementation of the
264 * instances of Eq/Ord for ThreadIds.
265 * ------------------------------------------------------------------------ */
/* three-way comparison of two TSOs by thread id: negative, zero, positive */
268 cmp_thread(StgPtr tso1, StgPtr tso2)
270 StgThreadID id1 = ((StgTSO *)tso1)->id;
271 StgThreadID id2 = ((StgTSO *)tso2)->id;
273 if (id1 < id2) return (-1);
274 if (id1 > id2) return 1;
/* NOTE(review): the equal-ids return (presumably 0) is elided from this
   listing */
278 /* ---------------------------------------------------------------------------
279 * Fetching the ThreadID from an StgTSO.
281 * This is used in the implementation of Show for ThreadIds.
282 * ------------------------------------------------------------------------ */
/* return the id field of the given TSO (no locking: id is immutable
   after creation) */
284 rts_getThreadId(StgPtr tso)
286 return ((StgTSO *)tso)->id;
289 /* -----------------------------------------------------------------------------
290 Remove a thread from a queue.
291 Fails fatally if the TSO is not on the queue.
292 -------------------------------------------------------------------------- */
/* unlink tso from a singly-linked TSO queue threaded through tso->link */
295 removeThreadFromQueue (StgTSO **queue, StgTSO *tso)
/* walk with a trailing 'prev' pointer so the match can be spliced out;
   the head-of-queue special case is elided from this listing */
300 for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->link) {
303 prev->link = t->link;
/* falling off the end of the queue is a fatal error */
310 barf("removeThreadFromQueue: not found");
/* unlink tso from a double-ended queue described by head and tail
   pointers; fatal if the TSO is not found */
314 removeThreadFromDeQueue (StgTSO **head, StgTSO **tail, StgTSO *tso)
319 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->link) {
322 prev->link = t->link;
/* when the removed element was the last one the tail must be reset;
   NOTE(review): the guarding conditionals are elided from this listing */
330 *tail = END_TSO_QUEUE;
336 barf("removeThreadFromMVarQueue: not found");
/* remove tso from the blocking queue of the given MVar; delegates to the
   generic double-ended-queue removal */
340 removeThreadFromMVarQueue (StgMVar *mvar, StgTSO *tso)
342 removeThreadFromDeQueue (&mvar->head, &mvar->tail, tso);
345 /* ----------------------------------------------------------------------------
348 unblock a single thread.
349 ------------------------------------------------------------------------- */
/*
 * unblockCount (GRAN / PARALLEL_HASKELL builds): log a RESUME event for
 * the thread being woken and account its blocked/fetch time, depending on
 * the type of the closure it was blocked on.
 */
353 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
356 #elif defined(PARALLEL_HASKELL)
358 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
360 /* write RESUME events to log file and
361 update blocked and fetch time (depending on type of the orig closure) */
362 if (RtsFlags.ParFlags.ParStats.Full) {
363 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
364 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
365 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
367 emitSchedule = rtsTrue;
/* dispatch on the closure type; the case labels are elided from this
   listing — fetch-me closures add to fetchtime, blackholes to blocktime */
369 switch (get_itbl(node)->type) {
371 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
376 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
383 barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
/*
 * unblockOne (GRAN variant): wake the TSO at the head of a blocking queue
 * attached to 'node', scheduling a wakeup event either locally or — if the
 * TSO lives on another PE — with communication costs added.  Returns the
 * next element of the blocking queue.
 */
390 StgBlockingQueueElement *
391 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
394 PEs node_loc, tso_loc;
396 node_loc = where_is(node); // should be lifted out of loop
397 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
398 tso_loc = where_is((StgClosure *)tso);
399 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
400 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
401 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
/* local wakeup: only the local unblock cost is charged */
402 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
403 // insertThread(tso, node_loc);
404 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
406 tso, node, (rtsSpark*)NULL);
407 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
410 } else { // TSO is remote (actually should be FMBQ)
/* remote wakeup: charge packing, global unblock and network latency */
411 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
412 RtsFlags.GranFlags.Costs.gunblocktime +
413 RtsFlags.GranFlags.Costs.latency;
414 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
416 tso, node, (rtsSpark*)NULL);
417 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
420 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
422 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
423 (node_loc==tso_loc ? "Local" : "Global"),
424 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
425 tso->block_info.closure = NULL;
426 debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)",
429 #elif defined(PARALLEL_HASKELL)
/*
 * unblockOne (PARALLEL_HASKELL variant): dispatch on the type of the
 * blocking-queue element — TSOs go back on the run queue, blocked fetches
 * go on the PendingFetches list, and a trailing CONSTR must be one of the
 * RBHSave closures.  Returns the next element of the queue.
 */
430 StgBlockingQueueElement *
431 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
433 StgBlockingQueueElement *next;
435 switch (get_itbl(bqe)->type) {
437 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
438 /* if it's a TSO just push it onto the run_queue */
440 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
441 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
443 unblockCount(bqe, node);
444 /* reset blocking status after dumping event */
445 ((StgTSO *)bqe)->why_blocked = NotBlocked;
449 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
451 bqe->link = (StgBlockingQueueElement *)PendingFetches;
452 PendingFetches = (StgBlockedFetch *)bqe;
456 /* can ignore this case in a non-debugging setup;
457 see comments on RBHSave closures above */
459 /* check that the closure is an RBHSave closure */
460 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
461 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
462 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
466 barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
467 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
471 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
/* unblockOne (threaded/storage builds): wake tso on this capability,
   permitting migration from the capability it was last on */
477 unblockOne (Capability *cap, StgTSO *tso)
479 return unblockOne_(cap,tso,rtsTrue); // allow migration
/*
 * unblockOne_: clear tso's blocked state and make it runnable, either on
 * this capability (possibly migrating a bound thread's capability field)
 * or, in the THREADED_RTS case, by signalling the capability it last ran
 * on.  Returns the next thread (per the elided tail of the function).
 */
483 unblockOne_ (Capability *cap, StgTSO *tso,
484 rtsBool allow_migrate USED_IF_THREADS)
488 // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
489 ASSERT(tso->why_blocked != NotBlocked);
491 tso->why_blocked = NotBlocked;
493 tso->link = END_TSO_QUEUE;
495 #if defined(THREADED_RTS)
/* wake locally when the TSO already belongs to this capability, or when
   it is unlocked and migration-on-wakeup is enabled */
496 if (tso->cap == cap || (!tsoLocked(tso) &&
498 RtsFlags.ParFlags.wakeupMigrate)) {
499 // We are waking up this thread on the current Capability, which
500 // might involve migrating it from the Capability it was last on.
/* a bound thread drags its bound-task record to the new capability */
502 ASSERT(tso->bound->cap == tso->cap);
503 tso->bound->cap = cap;
506 appendToRunQueue(cap,tso);
507 // we're holding a newly woken thread, make sure we context switch
508 // quickly so we can migrate it if necessary.
511 // we'll try to wake it up on the Capability it was last on.
512 wakeupThreadOnCapability_lock(tso->cap, tso);
/* non-THREADED path: just put it on this capability's run queue */
515 appendToRunQueue(cap,tso);
519 debugTrace(DEBUG_sched,
520 "waking up thread %ld on cap %d",
521 (long)tso->id, tso->cap->no);
526 /* ----------------------------------------------------------------------------
529 wakes up all the threads on the specified queue.
530 ------------------------------------------------------------------------- */
/*
 * awakenBlockedQueue (GRAN variant): wake every TSO on the blocking queue
 * of 'node', handling the fake-fetch case where the node is not on the
 * current PE, restoring an RBH's saved header fields from the trailing
 * RBHSave closure, and recording queue-length statistics.
 */
534 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
536 StgBlockingQueueElement *bqe;
541 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
542 node, CurrentProc, CurrentTime[CurrentProc],
543 CurrentTSO->id, CurrentTSO));
545 node_loc = where_is(node);
547 ASSERT(q == END_BQ_QUEUE ||
548 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
549 get_itbl(q)->type == CONSTR); // closure (type constructor)
550 ASSERT(is_unique(node));
552 /* FAKE FETCH: magically copy the node to the tso's proc;
553 no Fetch necessary because in reality the node should not have been
554 moved to the other PE in the first place
556 if (CurrentProc!=node_loc) {
558 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
559 node, node_loc, CurrentProc, CurrentTSO->id,
560 // CurrentTSO, where_is(CurrentTSO),
561 node->header.gran.procs));
/* mark the node as (also) resident on the current PE */
562 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
564 debugBelch("## new bitmask of node %p is %#x\n",
565 node, node->header.gran.procs));
566 if (RtsFlags.GranFlags.GranSimStats.Global) {
567 globalGranStats.tot_fake_fetches++;
572 // ToDo: check: ASSERT(CurrentProc==node_loc);
/* wake TSOs one by one until the queue ends or we hit the RBHSave
   CONSTR that terminates an RBH's queue */
573 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
576 bqe points to the current element in the queue
577 next points to the next element in the queue
579 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
580 //tso_loc = where_is(tso);
582 bqe = unblockOne(bqe, node);
585 /* if this is the BQ of an RBH, we have to put back the info ripped out of
586 the closure to make room for the anchor of the BQ */
587 if (bqe!=END_BQ_QUEUE) {
588 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
590 ASSERT((info_ptr==&RBH_Save_0_info) ||
591 (info_ptr==&RBH_Save_1_info) ||
592 (info_ptr==&RBH_Save_2_info));
594 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
595 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
596 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
599 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
600 node, info_type(node)));
603 /* statistics gathering */
604 if (RtsFlags.GranFlags.GranSimStats.Global) {
605 // globalGranStats.tot_bq_processing_time += bq_processing_time;
606 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
607 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
608 globalGranStats.tot_awbq++; // total no. of bqs awakened
611 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
612 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
614 #elif defined(PARALLEL_HASKELL)
/*
 * awakenBlockedQueue (PARALLEL_HASKELL variant): wake every TSO and
 * BLOCKED_FETCH on the blocking queue of 'node'; a queue consisting only
 * of a CONSTR (RBHSave) is treated as empty.
 */
616 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
618 StgBlockingQueueElement *bqe;
620 IF_PAR_DEBUG(verbose,
621 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
/* nothing to do when the queue is empty or holds only the RBHSave */
625 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
626 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
631 ASSERT(q == END_BQ_QUEUE ||
632 get_itbl(q)->type == TSO ||
633 get_itbl(q)->type == BLOCKED_FETCH ||
634 get_itbl(q)->type == CONSTR);
637 while (get_itbl(bqe)->type==TSO ||
638 get_itbl(bqe)->type==BLOCKED_FETCH) {
639 bqe = unblockOne(bqe, node);
643 #else /* !GRAN && !PARALLEL_HASKELL */
/* awakenBlockedQueue (ordinary builds): wake every thread on a simple
   TSO queue, one at a time via unblockOne */
646 awakenBlockedQueue(Capability *cap, StgTSO *tso)
648 while (tso != END_TSO_QUEUE) {
649 tso = unblockOne(cap,tso);
655 /* ---------------------------------------------------------------------------
656 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
657 * used by Control.Concurrent for error checking.
658 * ------------------------------------------------------------------------- */
661 rtsSupportsBoundThreads(void)
663 #if defined(THREADED_RTS)
/* NOTE(review): the THREADED_RTS branch (returning HS_BOOL_TRUE) and the
   #else are elided from this listing; only the FALSE return is visible */
666 return HS_BOOL_FALSE;
670 /* ---------------------------------------------------------------------------
671 * isThreadBound(tso): check whether tso is bound to an OS thread.
672 * ------------------------------------------------------------------------- */
675 isThreadBound(StgTSO* tso USED_IF_THREADS)
677 #if defined(THREADED_RTS)
/* a TSO is bound iff it carries a bound-task record; the non-THREADED
   branch (always false) is elided from this listing */
678 return (tso->bound != NULL);
683 /* ----------------------------------------------------------------------------
684 * Debugging: why is a thread blocked
685 * ------------------------------------------------------------------------- */
/*
 * printThreadBlockage: print a human-readable description of tso's
 * why_blocked state to the debug log.  NOTE(review): most case labels and
 * breaks are elided from this listing; each debugBelch below belongs to
 * one case of the switch.
 */
689 printThreadBlockage(StgTSO *tso)
691 switch (tso->why_blocked) {
693 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
696 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
698 #if defined(mingw32_HOST_OS)
699 case BlockedOnDoProc:
700 debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
704 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
707 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
709 case BlockedOnException:
710 debugBelch("is blocked on delivering an exception to thread %lu",
711 (unsigned long)tso->block_info.tso->id);
713 case BlockedOnBlackHole:
714 debugBelch("is blocked on a black hole");
717 debugBelch("is not blocked");
719 #if defined(PARALLEL_HASKELL)
721 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
722 tso->block_info.closure, info_type(tso->block_info.closure));
724 case BlockedOnGA_NoSend:
725 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
726 tso->block_info.closure, info_type(tso->block_info.closure));
730 debugBelch("is blocked on an external call");
732 case BlockedOnCCall_NoUnblockExc:
733 debugBelch("is blocked on an external call (exceptions were already blocked)");
736 debugBelch("is blocked on an STM operation");
/* unknown blocking reason: fatal, as it indicates TSO corruption */
739 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
740 tso->why_blocked, tso->id, tso);
/* printThreadStatus: one-line debug summary of a thread — its id, label
   (if any), and either its what_next state or its blocking reason */
745 printThreadStatus(StgTSO *t)
747 debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
749 void *label = lookupThreadLabel(t->id);
750 if (label) debugBelch("[\"%s\"] ",(char *)label);
/* a relocated TSO has been moved by stack growth; the real TSO is
   elsewhere */
752 if (t->what_next == ThreadRelocated) {
753 debugBelch("has been relocated...\n");
755 switch (t->what_next) {
757 debugBelch("has been killed");
760 debugBelch("has completed");
/* otherwise the thread is runnable or blocked; show why */
763 printThreadBlockage(t);
/*
 * printAllThreads: debug dump of every thread — per-capability run queues
 * first, then any other threads found on the global all_threads list.
 */
770 printAllThreads(void)
/* GRAN / PARALLEL builds prefix the dump with the simulated time */
777 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
778 ullong_format_string(TIME_ON_PROC(CurrentProc),
779 time_string, rtsFalse/*no commas!*/);
781 debugBelch("all threads at [%s]:\n", time_string);
782 # elif defined(PARALLEL_HASKELL)
783 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
784 ullong_format_string(CURRENT_TIME,
785 time_string, rtsFalse/*no commas!*/);
787 debugBelch("all threads at [%s]:\n", time_string);
789 debugBelch("all threads:\n");
792 for (i = 0; i < n_capabilities; i++) {
793 cap = &capabilities[i];
794 debugBelch("threads on capability %d:\n", cap->no);
795 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
796 printThreadStatus(t);
/* threads not on any run queue (blocked ones) live only on the global
   list, linked via global_link */
800 debugBelch("other threads:\n");
801 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
802 if (t->why_blocked != NotBlocked) {
803 printThreadStatus(t);
805 if (t->what_next == ThreadRelocated) {
808 next = t->global_link;
/* printThreadQueue: debug-print each thread on a TSO queue, then the
   count of entries */
815 printThreadQueue(StgTSO *t)
818 for (; t != END_TSO_QUEUE; t = t->link) {
819 printThreadStatus(t);
822 debugBelch("%d threads on queue\n", i);
826 Print a whole blocking queue attached to node (debugging only).
828 # if defined(PARALLEL_HASKELL)
/*
 * print_bq (PARALLEL_HASKELL variant): sanity-check that 'node' is a kind
 * of closure that can carry a blocking queue, then delegate the printing
 * of the queue itself to print_bqe.
 */
830 print_bq (StgClosure *node)
832 StgBlockingQueueElement *bqe;
836 debugBelch("## BQ of closure %p (%s): ",
837 node, info_type(node));
839 /* should cover all closures that may have a blocking queue */
840 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
841 get_itbl(node)->type == FETCH_ME_BQ ||
842 get_itbl(node)->type == RBH ||
843 get_itbl(node)->type == MVAR);
845 ASSERT(node!=(StgClosure*)NULL); // sanity check
847 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
851 Print a whole blocking queue starting with the element bqe.
/*
 * print_bqe: iterate over a blocking queue printing each element — TSOs,
 * blocked fetches, and the trailing RBHSave CONSTR that terminates an
 * RBH's queue.  Debugging only.
 */
854 print_bqe (StgBlockingQueueElement *bqe)
859 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
/* loop until a CONSTR (RBHSave) or the end-of-queue sentinel is reached;
   'end' is computed before advancing so the CONSTR itself is printed */
861 for (end = (bqe==END_BQ_QUEUE);
862 !end; // iterate until bqe points to a CONSTR
863 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
864 bqe = end ? END_BQ_QUEUE : bqe->link) {
865 ASSERT(bqe != END_BQ_QUEUE); // sanity check
866 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
867 /* types of closures that may appear in a blocking queue */
868 ASSERT(get_itbl(bqe)->type == TSO ||
869 get_itbl(bqe)->type == BLOCKED_FETCH ||
870 get_itbl(bqe)->type == CONSTR);
871 /* only BQs of an RBH end with an RBH_Save closure */
872 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
874 switch (get_itbl(bqe)->type) {
876 debugBelch(" TSO %u (%x),",
877 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
/* a blocked fetch records the node it is fetching and its global addr */
880 debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
881 ((StgBlockedFetch *)bqe)->node,
882 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
883 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
884 ((StgBlockedFetch *)bqe)->ga.weight);
/* identify which of the three RBHSave info tables terminates the queue */
887 debugBelch(" %s (IP %p),",
888 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
889 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
890 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
891 "RBH_Save_?"), get_itbl(bqe));
894 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
895 info_type((StgClosure *)bqe)); // , node, info_type(node));
/*
 * print_bq (GRAN variant): print the blocking queue attached to 'node',
 * including the PE each element resides on.  Debugging only.
 */
903 print_bq (StgClosure *node)
905 StgBlockingQueueElement *bqe;
906 PEs node_loc, tso_loc;
909 /* should cover all closures that may have a blocking queue */
910 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
911 get_itbl(node)->type == FETCH_ME_BQ ||
912 get_itbl(node)->type == RBH);
914 ASSERT(node!=(StgClosure*)NULL); // sanity check
915 node_loc = where_is(node);
917 debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
918 node, info_type(node), node_loc);
921 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
/* same iteration scheme as print_bqe: stop at a CONSTR (RBHSave) or the
   end-of-queue sentinel, printing the terminator as well */
923 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
924 !end; // iterate until bqe points to a CONSTR
925 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
926 ASSERT(bqe != END_BQ_QUEUE); // sanity check
927 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
928 /* types of closures that may appear in a blocking queue */
929 ASSERT(get_itbl(bqe)->type == TSO ||
930 get_itbl(bqe)->type == CONSTR);
931 /* only BQs of an RBH end with an RBH_Save closure */
932 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
934 tso_loc = where_is((StgClosure *)bqe);
935 switch (get_itbl(bqe)->type) {
937 debugBelch(" TSO %d (%p) on [PE %d],",
938 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
941 debugBelch(" %s (IP %p),",
942 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
943 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
944 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
945 "RBH_Save_?"), get_itbl(bqe));
948 barf("Unexpected closure type %s in blocking queue of %p (%s)",
949 info_type((StgClosure *)bqe), node, info_type(node));
957 #if defined(PARALLEL_HASKELL)
964 for (i=0, tso=run_queue_hd;
965 tso != END_TSO_QUEUE;
966 i++, tso=tso->link) {