1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2006
5 * Thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
18 #include "ThreadLabels.h"
20 /* Next thread ID to allocate.
 * NOTE(review): the closing comment marker was lost in this truncated copy;
 * restored here so the definition below is not swallowed by the comment.
 * The counter is incremented under sched_mutex (see the
 * ACQUIRE_LOCK/next_thread_id++ code in createThread below).
 */
23 static StgThreadID next_thread_id = 1;
25 /* The smallest stack size that makes any sense is:
26 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
27 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
28 * + 1 (the closure to enter)
30 * + 1 (spare slot req'd by stg_ap_v_ret)
32 * A thread with this stack will bomb immediately with a stack
33 * overflow, which will increase its stack size.
 * NOTE(review): closing marker restored — it was missing in this copy.
 * The "+ 3" below covers the closure-to-enter slot, the stg_ap_v_ret spare
 * slot, and one more word listed on a line lost from this copy.
 */
35 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
37 /* ---------------------------------------------------------------------------
40 The new thread starts with the given stack size. Before the
41 scheduler can run, however, this thread needs to have a closure
42 (and possibly some arguments) pushed on its stack. See
43 pushClosure() in Schedule.h.
45 createGenThread() and createIOThread() (in SchedAPI.h) are
46 convenient packaged versions of this function.
48 currently pri (priority) is only used in a GRAN setup -- HWL
49 ------------------------------------------------------------------------ */
/* createThread: allocate a TSO with allocateLocal, set its header and
 * scheduler fields, carve a stack out of the allocation (stop frame pushed
 * on top), assign an id under sched_mutex, and link it on the global thread
 * list.  Two signatures survive below from an #if GRAN / #else pair.
 *
 * NOTE(review): this block is badly truncated in this copy — function
 * braces, local declarations, #if/#endif pairs and return statements are
 * missing, and every surviving line carries a stray leading line number.
 * The code is left byte-identical; restore the complete definition from
 * upstream GHC rts/Threads.c before attempting to build.
 */
51 /* currently pri (priority) is only used in a GRAN setup -- HWL */
53 createThread(nat size, StgInt pri)
56 createThread(Capability *cap, nat size)
62 /* sched_mutex is *not* required */
64 /* First check whether we should create a thread at all */
65 #if defined(PARALLEL_HASKELL)
66 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
67 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
69 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
70 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
77 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
80 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
82 /* catch ridiculously small stack sizes */
83 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
84 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
87 stack_size = size - TSO_STRUCT_SIZEW;
89 tso = (StgTSO *)allocateLocal(cap, size);
90 TICK_ALLOC_TSO(stack_size, 0);
92 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
94 SET_GRAN_HDR(tso, ThisPE);
97 // Always start with the compiled code evaluator
98 tso->what_next = ThreadRunGHC;
100 tso->why_blocked = NotBlocked;
101 tso->blocked_exceptions = END_TSO_QUEUE;
102 tso->flags = TSO_DIRTY;
104 tso->saved_errno = 0;
108 tso->stack_size = stack_size;
109 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
111 tso->sp = (P_)&(tso->stack) + stack_size;
116 tso->prof.CCCS = CCS_MAIN;
119 /* put a stop frame on the stack */
120 tso->sp -= sizeofW(StgStopFrame);
121 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
122 tso->_link = END_TSO_QUEUE;
126 /* uses more flexible routine in GranSim */
127 insertThread(tso, CurrentProc);
129 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
135 if (RtsFlags.GranFlags.GranSimStats.Full)
136 DumpGranEvent(GR_START,tso);
137 #elif defined(PARALLEL_HASKELL)
138 if (RtsFlags.ParFlags.ParStats.Full)
139 DumpGranEvent(GR_STARTQ,tso);
140 /* HACk to avoid SCHEDULE
144 /* Link the new thread on the global thread list.
146 ACQUIRE_LOCK(&sched_mutex);
147 tso->id = next_thread_id++; // while we have the mutex
148 tso->global_link = g0s0->threads;
150 RELEASE_LOCK(&sched_mutex);
153 tso->dist.priority = MandatoryPriority; //by default that is...
159 tso->gran.magic = TSO_MAGIC; // debugging only
161 tso->gran.sparkname = 0;
162 tso->gran.startedat = CURRENT_TIME;
163 tso->gran.exported = 0;
164 tso->gran.basicblocks = 0;
165 tso->gran.allocs = 0;
166 tso->gran.exectime = 0;
167 tso->gran.fetchtime = 0;
168 tso->gran.fetchcount = 0;
169 tso->gran.blocktime = 0;
170 tso->gran.blockcount = 0;
171 tso->gran.blockedat = 0;
172 tso->gran.globalsparks = 0;
173 tso->gran.localsparks = 0;
174 if (RtsFlags.GranFlags.Light)
175 tso->gran.clock = Now; /* local clock */
179 IF_DEBUG(gran,printTSO(tso));
180 #elif defined(PARALLEL_HASKELL)
182 tso->par.magic = TSO_MAGIC; // debugging only
184 tso->par.sparkname = 0;
185 tso->par.startedat = CURRENT_TIME;
186 tso->par.exported = 0;
187 tso->par.basicblocks = 0;
189 tso->par.exectime = 0;
190 tso->par.fetchtime = 0;
191 tso->par.fetchcount = 0;
192 tso->par.blocktime = 0;
193 tso->par.blockcount = 0;
194 tso->par.blockedat = 0;
195 tso->par.globalsparks = 0;
196 tso->par.localsparks = 0;
200 globalGranStats.tot_threads_created++;
201 globalGranStats.threads_created_on_PE[CurrentProc]++;
202 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
203 globalGranStats.tot_sq_probes++;
204 #elif defined(PARALLEL_HASKELL)
205 // collect parallel global statistics (currently done together with GC stats)
206 if (RtsFlags.ParFlags.ParStats.Global &&
207 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
208 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
209 globalParStats.tot_threads_created++;
214 debugTrace(GRAN_DEBUG_pri,
215 "==__ schedule: Created TSO %d (%p);",
216 CurrentProc, tso, tso->id);
217 #elif defined(PARALLEL_HASKELL)
218 debugTrace(PAR_DEBUG_verbose,
219 "==__ schedule: Created TSO %d (%p); %d threads active",
220 (long)tso->id, tso, advisory_thread_count);
222 debugTrace(DEBUG_sched,
223 "created thread %ld, stack size = %lx words",
224 (long)tso->id, (long)tso->stack_size);
231 /* All parallel thread creation calls should fall through the following
 * routine.  (NOTE(review): the comment delimiters around this sentence were
 * lost in this truncated copy; restored so the text does not parse as code.) */
/* createThreadFromSpark: (PARALLEL_HASKELL only) turn a spark into a real
 * thread — refuse past ParFlags.maxThreads, create a TSO at the initial
 * stack size, give it advisory priority, and push the spark closure on it.
 *
 * NOTE(review): truncated copy — return type, braces, closing statements and
 * the advisory_thread_count bookkeeping context are missing; restore from
 * upstream GHC before building.  The "BUGGY" counter comments are original.
 */
234 createThreadFromSpark(rtsSpark spark)
236 ASSERT(spark != (rtsSpark)NULL);
237 // JB: TAKE CARE OF THIS COUNTER! BUGGY
238 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
240 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
241 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
242 return END_TSO_QUEUE;
246 tso = createThread(RtsFlags.GcFlags.initialStkSize);
247 if (tso==END_TSO_QUEUE)
248 barf("createSparkThread: Cannot create TSO");
250 tso->priority = AdvisoryPriority;
252 pushClosure(tso,spark);
254 advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
260 /* ---------------------------------------------------------------------------
261 * Comparing Thread ids.
263 * This is used from STG land in the implementation of the
264 * instances of Eq/Ord for ThreadIds.
265 * ------------------------------------------------------------------------ */
268 cmp_thread(StgPtr tso1, StgPtr tso2)
270 StgThreadID id1 = ((StgTSO *)tso1)->id;
271 StgThreadID id2 = ((StgTSO *)tso2)->id;
273 if (id1 < id2) return (-1);
274 if (id1 > id2) return 1;
278 /* ---------------------------------------------------------------------------
279 * Fetching the ThreadID from an StgTSO.
281 * This is used in the implementation of Show for ThreadIds.
282 * ------------------------------------------------------------------------ */
284 rts_getThreadId(StgPtr tso)
286 return ((StgTSO *)tso)->id;
289 /* -----------------------------------------------------------------------------
290 Remove a thread from a queue.
291 Fails fatally if the TSO is not on the queue.
292 -------------------------------------------------------------------------- */
295 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
300 for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
303 setTSOLink(cap,prev,t->_link);
310 barf("removeThreadFromQueue: not found");
314 removeThreadFromDeQueue (Capability *cap,
315 StgTSO **head, StgTSO **tail, StgTSO *tso)
320 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
323 setTSOLink(cap,prev,t->_link);
331 *tail = END_TSO_QUEUE;
337 barf("removeThreadFromMVarQueue: not found");
341 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
343 removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
346 /* ----------------------------------------------------------------------------
349 unblock a single thread.
350 ------------------------------------------------------------------------- */
/* unblockCount + the GRAN and PARALLEL_HASKELL variants of unblockOne:
 * simulator/parallel-only bookkeeping (RESUME event logging, fetch/block
 * time accounting) and single-element wakeup from a blocking queue.
 *
 * NOTE(review): this whole region is badly truncated — function bodies,
 * case labels, braces and #endif's are missing, and each surviving line
 * carries a stray leading line number.  Left byte-identical; restore from
 * upstream GHC rts/Threads.c.  This code is dead unless built with the
 * (long-bitrotted) GRAN/PARALLEL_HASKELL configurations.
 */
354 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
357 #elif defined(PARALLEL_HASKELL)
359 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
361 /* write RESUME events to log file and
362 update blocked and fetch time (depending on type of the orig closure) */
363 if (RtsFlags.ParFlags.ParStats.Full) {
364 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
365 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
366 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
368 emitSchedule = rtsTrue;
370 switch (get_itbl(node)->type) {
372 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
377 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
384 barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
391 StgBlockingQueueElement *
392 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
395 PEs node_loc, tso_loc;
397 node_loc = where_is(node); // should be lifted out of loop
398 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
399 tso_loc = where_is((StgClosure *)tso);
400 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
401 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
402 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
403 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
404 // insertThread(tso, node_loc);
405 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
407 tso, node, (rtsSpark*)NULL);
408 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
411 } else { // TSO is remote (actually should be FMBQ)
412 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
413 RtsFlags.GranFlags.Costs.gunblocktime +
414 RtsFlags.GranFlags.Costs.latency;
415 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
417 tso, node, (rtsSpark*)NULL);
418 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
421 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
423 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
424 (node_loc==tso_loc ? "Local" : "Global"),
425 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
426 tso->block_info.closure = NULL;
427 debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)",
430 #elif defined(PARALLEL_HASKELL)
431 StgBlockingQueueElement *
432 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
434 StgBlockingQueueElement *next;
436 switch (get_itbl(bqe)->type) {
438 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
439 /* if it's a TSO just push it onto the run_queue */
441 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
442 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
444 unblockCount(bqe, node);
445 /* reset blocking status after dumping event */
446 ((StgTSO *)bqe)->why_blocked = NotBlocked;
450 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
452 bqe->link = (StgBlockingQueueElement *)PendingFetches;
453 PendingFetches = (StgBlockedFetch *)bqe;
457 /* can ignore this case in a non-debugging setup;
458 see comments on RBHSave closures above */
460 /* check that the closure is an RBHSave closure */
461 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
462 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
463 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
467 barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
468 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
472 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
478 unblockOne (Capability *cap, StgTSO *tso)
480 return unblockOne_(cap,tso,rtsTrue); // allow migration
/* unblockOne_: the worker behind unblockOne — clear the blocked status,
 * unlink the TSO, and either append it to this Capability's run queue
 * (migrating a bound thread's Capability pointer when allowed) or hand it
 * to the Capability it was last on via wakeupThreadOnCapability.
 *
 * NOTE(review): truncated copy — the return type, local declarations,
 * the return of the saved _link, several braces and the #else/#endif of
 * the THREADED_RTS conditional are missing.  Left byte-identical; restore
 * from upstream GHC rts/Threads.c.
 */
484 unblockOne_ (Capability *cap, StgTSO *tso,
485 rtsBool allow_migrate USED_IF_THREADS)
489 // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
490 ASSERT(tso->why_blocked != NotBlocked);
492 tso->why_blocked = NotBlocked;
494 tso->_link = END_TSO_QUEUE;
496 #if defined(THREADED_RTS)
497 if (tso->cap == cap || (!tsoLocked(tso) &&
499 RtsFlags.ParFlags.wakeupMigrate)) {
500 // We are waking up this thread on the current Capability, which
501 // might involve migrating it from the Capability it was last on.
503 ASSERT(tso->bound->cap == tso->cap);
504 tso->bound->cap = cap;
507 appendToRunQueue(cap,tso);
509 // context-switch soonish so we can migrate the new thread if
510 // necessary. NB. not contextSwitchCapability(cap), which would
511 // force a context switch immediately.
512 cap->context_switch = 1;
514 // we'll try to wake it up on the Capability it was last on.
515 wakeupThreadOnCapability(cap, tso->cap, tso);
518 appendToRunQueue(cap,tso);
520 // context-switch soonish so we can migrate the new thread if
521 // necessary. NB. not contextSwitchCapability(cap), which would
522 // force a context switch immediately.
523 cap->context_switch = 1;
526 debugTrace(DEBUG_sched,
527 "waking up thread %ld on cap %d",
528 (long)tso->id, tso->cap->no);
533 /* ----------------------------------------------------------------------------
536 wakes up all the threads on the specified queue.
537 ------------------------------------------------------------------------- */
/* GRAN and PARALLEL_HASKELL variants of awakenBlockedQueue: walk a
 * blocking queue attached to node, waking each element via unblockOne,
 * with GranSim fake-fetch adjustment, RBH_Save restoration and statistics.
 *
 * NOTE(review): badly truncated — braces, locals, IF_GRAN_DEBUG wrappers
 * and #endif's are missing, and each surviving line carries a stray
 * leading line number.  Left byte-identical; restore from upstream GHC.
 * Dead code unless built for the GRAN/PARALLEL_HASKELL configurations.
 */
541 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
543 StgBlockingQueueElement *bqe;
548 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
549 node, CurrentProc, CurrentTime[CurrentProc],
550 CurrentTSO->id, CurrentTSO));
552 node_loc = where_is(node);
554 ASSERT(q == END_BQ_QUEUE ||
555 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
556 get_itbl(q)->type == CONSTR); // closure (type constructor)
557 ASSERT(is_unique(node));
559 /* FAKE FETCH: magically copy the node to the tso's proc;
560 no Fetch necessary because in reality the node should not have been
561 moved to the other PE in the first place
563 if (CurrentProc!=node_loc) {
565 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
566 node, node_loc, CurrentProc, CurrentTSO->id,
567 // CurrentTSO, where_is(CurrentTSO),
568 node->header.gran.procs));
569 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
571 debugBelch("## new bitmask of node %p is %#x\n",
572 node, node->header.gran.procs));
573 if (RtsFlags.GranFlags.GranSimStats.Global) {
574 globalGranStats.tot_fake_fetches++;
579 // ToDo: check: ASSERT(CurrentProc==node_loc);
580 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
583 bqe points to the current element in the queue
584 next points to the next element in the queue
586 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
587 //tso_loc = where_is(tso);
589 bqe = unblockOne(bqe, node);
592 /* if this is the BQ of an RBH, we have to put back the info ripped out of
593 the closure to make room for the anchor of the BQ */
594 if (bqe!=END_BQ_QUEUE) {
595 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
597 ASSERT((info_ptr==&RBH_Save_0_info) ||
598 (info_ptr==&RBH_Save_1_info) ||
599 (info_ptr==&RBH_Save_2_info));
601 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
602 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
603 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
606 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
607 node, info_type(node)));
610 /* statistics gathering */
611 if (RtsFlags.GranFlags.GranSimStats.Global) {
612 // globalGranStats.tot_bq_processing_time += bq_processing_time;
613 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
614 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
615 globalGranStats.tot_awbq++; // total no. of bqs awakened
618 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
619 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
621 #elif defined(PARALLEL_HASKELL)
623 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
625 StgBlockingQueueElement *bqe;
627 IF_PAR_DEBUG(verbose,
628 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
632 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
633 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
638 ASSERT(q == END_BQ_QUEUE ||
639 get_itbl(q)->type == TSO ||
640 get_itbl(q)->type == BLOCKED_FETCH ||
641 get_itbl(q)->type == CONSTR);
644 while (get_itbl(bqe)->type==TSO ||
645 get_itbl(bqe)->type==BLOCKED_FETCH) {
646 bqe = unblockOne(bqe, node);
650 #else /* !GRAN && !PARALLEL_HASKELL */
653 awakenBlockedQueue(Capability *cap, StgTSO *tso)
655 while (tso != END_TSO_QUEUE) {
656 tso = unblockOne(cap,tso);
662 /* ---------------------------------------------------------------------------
663 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
664 * used by Control.Concurrent for error checking.
665 * ------------------------------------------------------------------------- */
668 rtsSupportsBoundThreads(void)
670 #if defined(THREADED_RTS)
673 return HS_BOOL_FALSE;
677 /* ---------------------------------------------------------------------------
678 * isThreadBound(tso): check whether tso is bound to an OS thread.
679 * ------------------------------------------------------------------------- */
682 isThreadBound(StgTSO* tso USED_IF_THREADS)
684 #if defined(THREADED_RTS)
685 return (tso->bound != NULL);
690 /* ----------------------------------------------------------------------------
691 * Debugging: why is a thread blocked
692 * ------------------------------------------------------------------------- */
/* Debug printers: printThreadBlockage (why a TSO is blocked, one debugBelch
 * per why_blocked value), printThreadStatus (one-line summary incl. label
 * and dirty flags), and printAllThreads (all capabilities' run queues plus
 * blocked/other threads per step).
 *
 * NOTE(review): truncated copy — return types, braces, most case labels,
 * break statements and local declarations are missing, and each surviving
 * line carries a stray leading line number.  Left byte-identical; restore
 * from upstream GHC rts/Threads.c.
 */
696 printThreadBlockage(StgTSO *tso)
698 switch (tso->why_blocked) {
700 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
703 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
705 #if defined(mingw32_HOST_OS)
706 case BlockedOnDoProc:
707 debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
711 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
714 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
716 case BlockedOnException:
717 debugBelch("is blocked on delivering an exception to thread %lu",
718 (unsigned long)tso->block_info.tso->id);
720 case BlockedOnBlackHole:
721 debugBelch("is blocked on a black hole");
724 debugBelch("is not blocked");
726 #if defined(PARALLEL_HASKELL)
728 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
729 tso->block_info.closure, info_type(tso->block_info.closure));
731 case BlockedOnGA_NoSend:
732 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
733 tso->block_info.closure, info_type(tso->block_info.closure));
737 debugBelch("is blocked on an external call");
739 case BlockedOnCCall_NoUnblockExc:
740 debugBelch("is blocked on an external call (exceptions were already blocked)");
743 debugBelch("is blocked on an STM operation");
746 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
747 tso->why_blocked, tso->id, tso);
752 printThreadStatus(StgTSO *t)
754 debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
756 void *label = lookupThreadLabel(t->id);
757 if (label) debugBelch("[\"%s\"] ",(char *)label);
759 if (t->what_next == ThreadRelocated) {
760 debugBelch("has been relocated...\n");
762 switch (t->what_next) {
764 debugBelch("has been killed");
767 debugBelch("has completed");
770 printThreadBlockage(t);
772 if (t->flags & TSO_DIRTY) {
773 debugBelch(" (TSO_DIRTY)");
774 } else if (t->flags & TSO_LINK_DIRTY) {
775 debugBelch(" (TSO_LINK_DIRTY)");
782 printAllThreads(void)
789 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
790 ullong_format_string(TIME_ON_PROC(CurrentProc),
791 time_string, rtsFalse/*no commas!*/);
793 debugBelch("all threads at [%s]:\n", time_string);
794 # elif defined(PARALLEL_HASKELL)
795 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
796 ullong_format_string(CURRENT_TIME,
797 time_string, rtsFalse/*no commas!*/);
799 debugBelch("all threads at [%s]:\n", time_string);
801 debugBelch("all threads:\n");
804 for (i = 0; i < n_capabilities; i++) {
805 cap = &capabilities[i];
806 debugBelch("threads on capability %d:\n", cap->no);
807 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
808 printThreadStatus(t);
812 debugBelch("other threads:\n");
813 for (s = 0; s < total_steps; s++) {
814 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
815 if (t->why_blocked != NotBlocked) {
816 printThreadStatus(t);
818 if (t->what_next == ThreadRelocated) {
821 next = t->global_link;
829 printThreadQueue(StgTSO *t)
832 for (; t != END_TSO_QUEUE; t = t->_link) {
833 printThreadStatus(t);
836 debugBelch("%d threads on queue\n", i);
840 /* Print a whole blocking queue attached to node (debugging only).
 * (NOTE(review): the comment delimiters around this sentence were lost in
 * this truncated copy; restored so the text does not parse as code.) */
842 # if defined(PARALLEL_HASKELL)
/* PARALLEL_HASKELL print_bq/print_bqe and the GRAN print_bq variant:
 * debug-dump the blocking queue hanging off a closure, asserting the
 * closure and queue-element types along the way.
 *
 * NOTE(review): truncated copy — return types, braces, case labels, break
 * statements and the opening delimiters of several comments are missing,
 * and each surviving line carries a stray leading line number.  Left
 * byte-identical; restore from upstream GHC rts/Threads.c.  Dead code
 * unless built for the GRAN/PARALLEL_HASKELL configurations.
 */
844 print_bq (StgClosure *node)
846 StgBlockingQueueElement *bqe;
850 debugBelch("## BQ of closure %p (%s): ",
851 node, info_type(node));
853 /* should cover all closures that may have a blocking queue */
854 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
855 get_itbl(node)->type == FETCH_ME_BQ ||
856 get_itbl(node)->type == RBH ||
857 get_itbl(node)->type == MVAR);
859 ASSERT(node!=(StgClosure*)NULL); // sanity check
861 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
865 Print a whole blocking queue starting with the element bqe.
868 print_bqe (StgBlockingQueueElement *bqe)
873 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
875 for (end = (bqe==END_BQ_QUEUE);
876 !end; // iterate until bqe points to a CONSTR
877 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
878 bqe = end ? END_BQ_QUEUE : bqe->link) {
879 ASSERT(bqe != END_BQ_QUEUE); // sanity check
880 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
881 /* types of closures that may appear in a blocking queue */
882 ASSERT(get_itbl(bqe)->type == TSO ||
883 get_itbl(bqe)->type == BLOCKED_FETCH ||
884 get_itbl(bqe)->type == CONSTR);
885 /* only BQs of an RBH end with an RBH_Save closure */
886 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
888 switch (get_itbl(bqe)->type) {
890 debugBelch(" TSO %u (%x),",
891 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
894 debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
895 ((StgBlockedFetch *)bqe)->node,
896 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
897 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
898 ((StgBlockedFetch *)bqe)->ga.weight);
901 debugBelch(" %s (IP %p),",
902 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
903 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
904 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
905 "RBH_Save_?"), get_itbl(bqe));
908 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
909 info_type((StgClosure *)bqe)); // , node, info_type(node));
917 print_bq (StgClosure *node)
919 StgBlockingQueueElement *bqe;
920 PEs node_loc, tso_loc;
923 /* should cover all closures that may have a blocking queue */
924 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
925 get_itbl(node)->type == FETCH_ME_BQ ||
926 get_itbl(node)->type == RBH);
928 ASSERT(node!=(StgClosure*)NULL); // sanity check
929 node_loc = where_is(node);
931 debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
932 node, info_type(node), node_loc);
935 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
937 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
938 !end; // iterate until bqe points to a CONSTR
939 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
940 ASSERT(bqe != END_BQ_QUEUE); // sanity check
941 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
942 /* types of closures that may appear in a blocking queue */
943 ASSERT(get_itbl(bqe)->type == TSO ||
944 get_itbl(bqe)->type == CONSTR);
945 /* only BQs of an RBH end with an RBH_Save closure */
946 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
948 tso_loc = where_is((StgClosure *)bqe);
949 switch (get_itbl(bqe)->type) {
951 debugBelch(" TSO %d (%p) on [PE %d],",
952 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
955 debugBelch(" %s (IP %p),",
956 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
957 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
958 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
959 "RBH_Save_?"), get_itbl(bqe));
962 barf("Unexpected closure type %s in blocking queue of %p (%s)",
963 info_type((StgClosure *)bqe), node, info_type(node));
971 #if defined(PARALLEL_HASKELL)
978 for (i=0, tso=run_queue_hd;
979 tso != END_TSO_QUEUE;
980 i++, tso=tso->link) {