1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2006
5 * Thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
18 #include "ThreadLabels.h"
/* NOTE(review): this view of the file is elided (embedded line numbers are
   non-contiguous); comments below annotate only the visible text. */
20 /* Next thread ID to allocate.
23 static StgThreadID next_thread_id = 1;
25 /* The smallest stack size that makes any sense is:
26 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
27 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
28 * + 1 (the closure to enter)
30 * + 1 (spare slot req'd by stg_ap_v_ret)
32 * A thread with this stack will bomb immediately with a stack
33 * overflow, which will increase its stack size.
/* MIN_STACK_WORDS: lower bound enforced by createThread() before it rounds
   the requested size; the "+ 3" corresponds to the 1-word items enumerated
   above (one of the three items is elided from this view -- confirm against
   the full source). */
35 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
37 /* ---------------------------------------------------------------------------
40 The new thread starts with the given stack size. Before the
41 scheduler can run, however, this thread needs to have a closure
42 (and possibly some arguments) pushed on its stack. See
43 pushClosure() in Schedule.h.
45 createGenThread() and createIOThread() (in SchedAPI.h) are
46 convenient packaged versions of this function.
48 currently pri (priority) is only used in a GRAN setup -- HWL
49 ------------------------------------------------------------------------ */
51 /* currently pri (priority) is only used in a GRAN setup -- HWL */
/* NOTE(review): two alternative signatures follow; the #if/#else lines that
   select between them (GRAN vs. threaded/sequential build) are elided from
   this view. */
53 createThread(nat size, StgInt pri)
56 createThread(Capability *cap, nat size)
62 /* sched_mutex is *not* required */
64 /* First check whether we should create a thread at all */
65 #if defined(PARALLEL_HASKELL)
66 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
67 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
69 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
70 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
77 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
80 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
/* Clamp the request so the new TSO always has room for the stop frame and
   the minimal evaluation slots (see MIN_STACK_WORDS above). */
82 /* catch ridiculously small stack sizes */
83 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
84 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
87 size = round_to_mblocks(size);
88 tso = (StgTSO *)allocateLocal(cap, size);
90 stack_size = size - TSO_STRUCT_SIZEW;
91 TICK_ALLOC_TSO(stack_size, 0);
93 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
95 SET_GRAN_HDR(tso, ThisPE);
/* Initialise the TSO's scheduler-visible fields: runnable, no pending
   exceptions, and dirty (not yet recorded in a remembered set). */
98 // Always start with the compiled code evaluator
99 tso->what_next = ThreadRunGHC;
101 tso->why_blocked = NotBlocked;
102 tso->blocked_exceptions = END_TSO_QUEUE;
103 tso->flags = TSO_DIRTY;
105 tso->saved_errno = 0;
109 tso->stack_size = stack_size;
110 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
112 tso->sp = (P_)&(tso->stack) + stack_size;
117 tso->prof.CCCS = CCS_MAIN;
120 /* put a stop frame on the stack */
121 tso->sp -= sizeofW(StgStopFrame);
122 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
123 tso->_link = END_TSO_QUEUE;
127 /* uses more flexible routine in GranSim */
128 insertThread(tso, CurrentProc);
130 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
136 if (RtsFlags.GranFlags.GranSimStats.Full)
137 DumpGranEvent(GR_START,tso);
138 #elif defined(PARALLEL_HASKELL)
139 if (RtsFlags.ParFlags.ParStats.Full)
140 DumpGranEvent(GR_STARTQ,tso);
141 /* HACk to avoid SCHEDULE
/* Thread id allocation and insertion into the global thread list are done
   together under sched_mutex so the id counter and the list stay consistent. */
145 /* Link the new thread on the global thread list.
147 ACQUIRE_LOCK(&sched_mutex);
148 tso->id = next_thread_id++; // while we have the mutex
149 tso->global_link = g0s0->threads;
151 RELEASE_LOCK(&sched_mutex);
154 tso->dist.priority = MandatoryPriority; //by default that is...
160 tso->gran.magic = TSO_MAGIC; // debugging only
162 tso->gran.sparkname = 0;
163 tso->gran.startedat = CURRENT_TIME;
164 tso->gran.exported = 0;
165 tso->gran.basicblocks = 0;
166 tso->gran.allocs = 0;
167 tso->gran.exectime = 0;
168 tso->gran.fetchtime = 0;
169 tso->gran.fetchcount = 0;
170 tso->gran.blocktime = 0;
171 tso->gran.blockcount = 0;
172 tso->gran.blockedat = 0;
173 tso->gran.globalsparks = 0;
174 tso->gran.localsparks = 0;
175 if (RtsFlags.GranFlags.Light)
176 tso->gran.clock = Now; /* local clock */
180 IF_DEBUG(gran,printTSO(tso));
181 #elif defined(PARALLEL_HASKELL)
183 tso->par.magic = TSO_MAGIC; // debugging only
185 tso->par.sparkname = 0;
186 tso->par.startedat = CURRENT_TIME;
187 tso->par.exported = 0;
188 tso->par.basicblocks = 0;
190 tso->par.exectime = 0;
191 tso->par.fetchtime = 0;
192 tso->par.fetchcount = 0;
193 tso->par.blocktime = 0;
194 tso->par.blockcount = 0;
195 tso->par.blockedat = 0;
196 tso->par.globalsparks = 0;
197 tso->par.localsparks = 0;
201 globalGranStats.tot_threads_created++;
202 globalGranStats.threads_created_on_PE[CurrentProc]++;
203 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
204 globalGranStats.tot_sq_probes++;
205 #elif defined(PARALLEL_HASKELL)
206 // collect parallel global statistics (currently done together with GC stats)
207 if (RtsFlags.ParFlags.ParStats.Global &&
208 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
209 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
210 globalParStats.tot_threads_created++;
214 postEvent (cap, EVENT_CREATE_THREAD, tso->id, 0);
/* NOTE(review): the GRAN debugTrace below passes three value arguments
   (CurrentProc, tso, tso->id) for a format string with only two
   conversions ("%d (%p)") -- likely a missing "%d" or a stray argument;
   confirm against the full source before changing. */
217 debugTrace(GRAN_DEBUG_pri,
218 "==__ schedule: Created TSO %d (%p);",
219 CurrentProc, tso, tso->id);
220 #elif defined(PARALLEL_HASKELL)
221 debugTrace(PAR_DEBUG_verbose,
222 "==__ schedule: Created TSO %d (%p); %d threads active",
223 (long)tso->id, tso, advisory_thread_count);
225 debugTrace(DEBUG_sched,
226 "created thread %ld, stack size = %lx words",
227 (long)tso->id, (long)tso->stack_size);
/* createThreadFromSpark: turn a spark into a runnable TSO (parallel builds).
   NOTE(review): several lines are elided from this view, including the
   braces around the over-limit branch. As shown, barf() (which does not
   return) precedes the "return END_TSO_QUEUE" -- one of the two must be
   dead; confirm against the full source. The advisory_thread_count
   bookkeeping is flagged as buggy by the original author ("JB: TAKE CARE"). */
234 all parallel thread creation calls should fall through the following routine.
237 createThreadFromSpark(rtsSpark spark)
239 ASSERT(spark != (rtsSpark)NULL);
240 // JB: TAKE CARE OF THIS COUNTER! BUGGY
241 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
243 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
244 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
245 return END_TSO_QUEUE;
/* Allocate the TSO with the default initial stack, then push the spark
   closure so the scheduler can enter it. */
249 tso = createThread(RtsFlags.GcFlags.initialStkSize);
250 if (tso==END_TSO_QUEUE)
251 barf("createSparkThread: Cannot create TSO");
253 tso->priority = AdvisoryPriority;
255 pushClosure(tso,spark);
257 advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
263 /* ---------------------------------------------------------------------------
264 * Comparing Thread ids.
266 * This is used from STG land in the implementation of the
267 * instances of Eq/Ord for ThreadIds.
268 * ------------------------------------------------------------------------ */
/* Three-way compare of two TSOs by thread id: <0, 0, or >0.
   (The equal case's "return 0" is on a line elided from this view.) */
271 cmp_thread(StgPtr tso1, StgPtr tso2)
273 StgThreadID id1 = ((StgTSO *)tso1)->id;
274 StgThreadID id2 = ((StgTSO *)tso2)->id;
276 if (id1 < id2) return (-1);
277 if (id1 > id2) return 1;
281 /* ---------------------------------------------------------------------------
282 * Fetching the ThreadID from an StgTSO.
284 * This is used in the implementation of Show for ThreadIds.
285 * ------------------------------------------------------------------------ */
/* Simple accessor: the caller passes an StgTSO as an untyped StgPtr. */
287 rts_getThreadId(StgPtr tso)
289 return ((StgTSO *)tso)->id;
292 /* -----------------------------------------------------------------------------
293 Remove a thread from a queue.
294 Fails fatally if the TSO is not on the queue.
295 -------------------------------------------------------------------------- */
/* Walk a singly-linked TSO queue (via _link) looking for tso; unlink it
   with setTSOLink so the write barrier fires on the predecessor.
   (The head-of-queue case and loop body braces are elided from this view.) */
298 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
303 for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
306 setTSOLink(cap,prev,t->_link);
313 barf("removeThreadFromQueue: not found");
/* Like removeThreadFromQueue, but for a queue with both head and tail
   pointers: if the unlinked element was the tail, reset the tail.
   NOTE(review): the barf message below says "removeThreadFromMVarQueue"
   although this function is removeThreadFromDeQueue -- misleading on
   failure; confirm/fix against the full source. */
317 removeThreadFromDeQueue (Capability *cap,
318 StgTSO **head, StgTSO **tail, StgTSO *tso)
323 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
326 setTSOLink(cap,prev,t->_link);
334 *tail = END_TSO_QUEUE;
340 barf("removeThreadFromMVarQueue: not found");
/* Thin wrapper: an MVar's blocked-thread queue is a head/tail pair, so
   delegate to the double-ended queue remover. */
344 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
346 removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
349 /* ----------------------------------------------------------------------------
352 unblock a single thread.
353 ------------------------------------------------------------------------- */
/* unblockCount: bookkeeping only -- log a RESUME event and charge the
   blocked/fetch time to the TSO, depending on the closure it was blocked
   on. Two conditional variants (GRAN/PARALLEL); the GRAN body and the
   switch's case labels are elided from this view. */
357 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
360 #elif defined(PARALLEL_HASKELL)
362 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
364 /* write RESUME events to log file and
365 update blocked and fetch time (depending on type of the orig closure) */
366 if (RtsFlags.ParFlags.ParStats.Full) {
367 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
368 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
369 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
371 emitSchedule = rtsTrue;
373 switch (get_itbl(node)->type) {
375 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
380 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
387 barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
/* unblockOne (GRAN variant): wake the first element of node's blocking
   queue. Local TSOs are resumed directly; remote ones incur simulated
   messaging costs (pack + unblock + latency) and a scheduled event.
   Returns the next queue element. Many lines (event names, case labels,
   closing braces) are elided from this view. */
394 StgBlockingQueueElement *
395 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
398 PEs node_loc, tso_loc;
400 node_loc = where_is(node); // should be lifted out of loop
401 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
402 tso_loc = where_is((StgClosure *)tso);
403 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
404 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
405 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
406 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
407 // insertThread(tso, node_loc);
408 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
410 tso, node, (rtsSpark*)NULL);
411 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
414 } else { // TSO is remote (actually should be FMBQ)
415 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
416 RtsFlags.GranFlags.Costs.gunblocktime +
417 RtsFlags.GranFlags.Costs.latency;
418 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
420 tso, node, (rtsSpark*)NULL);
421 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
424 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
426 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
427 (node_loc==tso_loc ? "Local" : "Global"),
428 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
429 tso->block_info.closure = NULL;
430 debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)",
433 #elif defined(PARALLEL_HASKELL)
/* unblockOne (PARALLEL variant): dispatch on the element type -- a TSO is
   appended to the run queue, a BLOCKED_FETCH goes onto PendingFetches, and
   a CONSTR must be one of the RBHSave closures marking the queue's end. */
434 StgBlockingQueueElement *
435 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
437 StgBlockingQueueElement *next;
439 switch (get_itbl(bqe)->type) {
441 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
442 /* if it's a TSO just push it onto the run_queue */
444 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
445 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
447 unblockCount(bqe, node);
448 /* reset blocking status after dumping event */
449 ((StgTSO *)bqe)->why_blocked = NotBlocked;
453 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
455 bqe->link = (StgBlockingQueueElement *)PendingFetches;
456 PendingFetches = (StgBlockedFetch *)bqe;
460 /* can ignore this case in a non-debugging setup;
461 see comments on RBHSave closures above */
463 /* check that the closure is an RBHSave closure */
464 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
465 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
466 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
470 barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
471 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
475 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
/* unblockOne (threaded/sequential variant): public entry point; always
   permits migrating the woken thread to this Capability. */
481 unblockOne (Capability *cap, StgTSO *tso)
483 return unblockOne_(cap,tso,rtsTrue); // allow migration
/* unblockOne_: mark tso runnable and put it on a run queue. If the TSO
   belongs to this Capability (or migration is allowed/configured) it is
   appended here, re-binding a bound thread's Capability as needed;
   otherwise it is handed to the Capability it last ran on. Returns the
   next TSO on the original queue (return statement elided from this view). */
487 unblockOne_ (Capability *cap, StgTSO *tso,
488 rtsBool allow_migrate USED_IF_THREADS)
492 // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
493 ASSERT(tso->why_blocked != NotBlocked);
495 tso->why_blocked = NotBlocked;
497 tso->_link = END_TSO_QUEUE;
499 #if defined(THREADED_RTS)
500 if (tso->cap == cap || (!tsoLocked(tso) &&
502 RtsFlags.ParFlags.wakeupMigrate)) {
503 // We are waking up this thread on the current Capability, which
504 // might involve migrating it from the Capability it was last on.
506 ASSERT(tso->bound->cap == tso->cap);
507 tso->bound->cap = cap;
511 appendToRunQueue(cap,tso);
513 // context-switch soonish so we can migrate the new thread if
514 // necessary. NB. not contextSwitchCapability(cap), which would
515 // force a context switch immediately.
516 cap->context_switch = 1;
518 // we'll try to wake it up on the Capability it was last on.
519 wakeupThreadOnCapability(cap, tso->cap, tso);
522 appendToRunQueue(cap,tso);
524 // context-switch soonish so we can migrate the new thread if
525 // necessary. NB. not contextSwitchCapability(cap), which would
526 // force a context switch immediately.
527 cap->context_switch = 1;
530 postEvent (cap, EVENT_THREAD_WAKEUP, tso->id, tso->cap->no);
532 debugTrace(DEBUG_sched, "waking up thread %ld on cap %d",
533 (long)tso->id, tso->cap->no);
538 /* ----------------------------------------------------------------------------
541 wakes up all the threads on the specified queue.
542 ------------------------------------------------------------------------- */
/* awakenBlockedQueue (GRAN variant): wake every TSO on node's blocking
   queue via unblockOne, handling the "fake fetch" case where the node
   appears on a different PE, then restore the RBHSave payload into the RBH
   and gather statistics. Many lines (#if selector, local declarations,
   braces) are elided from this view. */
546 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
548 StgBlockingQueueElement *bqe;
553 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
554 node, CurrentProc, CurrentTime[CurrentProc],
555 CurrentTSO->id, CurrentTSO));
557 node_loc = where_is(node);
559 ASSERT(q == END_BQ_QUEUE ||
560 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
561 get_itbl(q)->type == CONSTR); // closure (type constructor)
562 ASSERT(is_unique(node));
564 /* FAKE FETCH: magically copy the node to the tso's proc;
565 no Fetch necessary because in reality the node should not have been
566 moved to the other PE in the first place
568 if (CurrentProc!=node_loc) {
570 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
571 node, node_loc, CurrentProc, CurrentTSO->id,
572 // CurrentTSO, where_is(CurrentTSO),
573 node->header.gran.procs));
574 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
576 debugBelch("## new bitmask of node %p is %#x\n",
577 node, node->header.gran.procs));
578 if (RtsFlags.GranFlags.GranSimStats.Global) {
579 globalGranStats.tot_fake_fetches++;
584 // ToDo: check: ASSERT(CurrentProc==node_loc);
585 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
588 bqe points to the current element in the queue
589 next points to the next element in the queue
591 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
592 //tso_loc = where_is(tso);
594 bqe = unblockOne(bqe, node);
597 /* if this is the BQ of an RBH, we have to put back the info ripped out of
598 the closure to make room for the anchor of the BQ */
599 if (bqe!=END_BQ_QUEUE) {
600 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
602 ASSERT((info_ptr==&RBH_Save_0_info) ||
603 (info_ptr==&RBH_Save_1_info) ||
604 (info_ptr==&RBH_Save_2_info));
606 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
607 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
608 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
611 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
612 node, info_type(node)));
615 /* statistics gathering */
616 if (RtsFlags.GranFlags.GranSimStats.Global) {
617 // globalGranStats.tot_bq_processing_time += bq_processing_time;
618 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
619 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
620 globalGranStats.tot_awbq++; // total no. of bqs awakened
623 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
624 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
625 #elif defined(PARALLEL_HASKELL)
/* awakenBlockedQueue (PARALLEL variant): wake TSOs and BLOCKED_FETCHes
   until the queue terminator (a CONSTR/RBHSave or END_BQ_QUEUE). */
628 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
630 StgBlockingQueueElement *bqe;
632 IF_PAR_DEBUG(verbose,
633 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
637 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
638 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
643 ASSERT(q == END_BQ_QUEUE ||
644 get_itbl(q)->type == TSO ||
645 get_itbl(q)->type == BLOCKED_FETCH ||
646 get_itbl(q)->type == CONSTR);
649 while (get_itbl(bqe)->type==TSO ||
650 get_itbl(bqe)->type==BLOCKED_FETCH) {
651 bqe = unblockOne(bqe, node);
655 #else /* !GRAN && !PARALLEL_HASKELL */
/* awakenBlockedQueue (standard variant): simply unblock each TSO in turn;
   unblockOne returns the next element of the queue. */
658 awakenBlockedQueue(Capability *cap, StgTSO *tso)
660 while (tso != END_TSO_QUEUE) {
661 tso = unblockOne(cap,tso);
667 /* ---------------------------------------------------------------------------
668 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
669 * used by Control.Concurrent for error checking.
670 * ------------------------------------------------------------------------- */
/* Compile-time answer: HS_BOOL_TRUE in a THREADED_RTS build (that branch's
   return is elided from this view), HS_BOOL_FALSE otherwise. */
673 rtsSupportsBoundThreads(void)
675 #if defined(THREADED_RTS)
678 return HS_BOOL_FALSE;
682 /* ---------------------------------------------------------------------------
683 * isThreadBound(tso): check whether tso is bound to an OS thread.
684 * ------------------------------------------------------------------------- */
/* In a threaded RTS a thread is bound iff it carries a Task (tso->bound);
   the non-threaded branch (always false) is elided from this view. */
687 isThreadBound(StgTSO* tso USED_IF_THREADS)
689 #if defined(THREADED_RTS)
690 return (tso->bound != NULL);
695 /* ----------------------------------------------------------------------------
696 * Debugging: why is a thread blocked
697 * ------------------------------------------------------------------------- */
/* Print a one-line human-readable description of tso->why_blocked.
   Each message corresponds to a case label elided from this view
   (BlockedOnRead/Write/Delay/MVar/BlackHole/STM/CCall/GA/...); the final
   barf catches unknown states. */
701 printThreadBlockage(StgTSO *tso)
703 switch (tso->why_blocked) {
705 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
708 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
710 #if defined(mingw32_HOST_OS)
711 case BlockedOnDoProc:
712 debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
716 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
719 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
721 case BlockedOnException:
722 debugBelch("is blocked on delivering an exception to thread %lu",
723 (unsigned long)tso->block_info.tso->id);
725 case BlockedOnBlackHole:
726 debugBelch("is blocked on a black hole");
729 debugBelch("is not blocked");
731 #if defined(PARALLEL_HASKELL)
733 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
734 tso->block_info.closure, info_type(tso->block_info.closure));
736 case BlockedOnGA_NoSend:
737 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
738 tso->block_info.closure, info_type(tso->block_info.closure));
742 debugBelch("is blocked on an external call");
744 case BlockedOnCCall_NoUnblockExc:
745 debugBelch("is blocked on an external call (exceptions were already blocked)");
748 debugBelch("is blocked on an STM operation");
751 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
752 tso->why_blocked, tso->id, tso);
/* Print one status line for a thread: id, address, optional user label,
   then either its relocation notice, termination state, or (via
   printThreadBlockage) why it is blocked, plus dirty-flag annotations. */
757 printThreadStatus(StgTSO *t)
759 debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
761 void *label = lookupThreadLabel(t->id);
762 if (label) debugBelch("[\"%s\"] ",(char *)label);
764 if (t->what_next == ThreadRelocated) {
765 debugBelch("has been relocated...\n");
767 switch (t->what_next) {
769 debugBelch("has been killed");
772 debugBelch("has completed");
775 printThreadBlockage(t);
777 if (t->flags & TSO_DIRTY) {
778 debugBelch(" (TSO_DIRTY)");
779 } else if (t->flags & TSO_LINK_DIRTY) {
780 debugBelch(" (TSO_LINK_DIRTY)");
/* Debugging dump of every thread the RTS knows about: first each
   Capability's run queue, then blocked/other threads found by walking
   every step's global thread list (following global_link, and the
   relocation link for relocated TSOs -- that line is elided here). */
787 printAllThreads(void)
794 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
795 ullong_format_string(TIME_ON_PROC(CurrentProc),
796 time_string, rtsFalse/*no commas!*/);
798 debugBelch("all threads at [%s]:\n", time_string);
799 # elif defined(PARALLEL_HASKELL)
800 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
801 ullong_format_string(CURRENT_TIME,
802 time_string, rtsFalse/*no commas!*/);
804 debugBelch("all threads at [%s]:\n", time_string);
806 debugBelch("all threads:\n");
809 for (i = 0; i < n_capabilities; i++) {
810 cap = &capabilities[i];
811 debugBelch("threads on capability %d:\n", cap->no);
812 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
813 printThreadStatus(t);
817 debugBelch("other threads:\n");
818 for (s = 0; s < total_steps; s++) {
819 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
820 if (t->why_blocked != NotBlocked) {
821 printThreadStatus(t);
823 if (t->what_next == ThreadRelocated) {
826 next = t->global_link;
/* Debugging: print the status of each TSO on a _link-chained queue,
   counting them (the counter increment line is elided from this view). */
834 printThreadQueue(StgTSO *t)
837 for (; t != END_TSO_QUEUE; t = t->_link) {
838 printThreadStatus(t);
841 debugBelch("%d threads on queue\n", i);
845 Print a whole blocking queue attached to node (debugging only).
847 # if defined(PARALLEL_HASKELL)
/* print_bq (PARALLEL variant): sanity-check that node is a closure kind
   that can carry a blocking queue, then delegate to print_bqe for the
   actual traversal. */
849 print_bq (StgClosure *node)
851 StgBlockingQueueElement *bqe;
855 debugBelch("## BQ of closure %p (%s): ",
856 node, info_type(node));
858 /* should cover all closures that may have a blocking queue */
859 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
860 get_itbl(node)->type == FETCH_ME_BQ ||
861 get_itbl(node)->type == RBH ||
862 get_itbl(node)->type == MVAR);
864 ASSERT(node!=(StgClosure*)NULL); // sanity check
866 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
870 Print a whole blocking queue starting with the element bqe.
/* print_bqe: walk the queue printing each element; iteration stops at a
   CONSTR (an RBHSave closure marks the end of an RBH's queue) or at
   END_BQ_QUEUE. The loop's update clause computes the end condition
   before advancing so the terminator itself is still printed. */
873 print_bqe (StgBlockingQueueElement *bqe)
878 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
880 for (end = (bqe==END_BQ_QUEUE);
881 !end; // iterate until bqe points to a CONSTR
882 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
883 bqe = end ? END_BQ_QUEUE : bqe->link) {
884 ASSERT(bqe != END_BQ_QUEUE); // sanity check
885 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
886 /* types of closures that may appear in a blocking queue */
887 ASSERT(get_itbl(bqe)->type == TSO ||
888 get_itbl(bqe)->type == BLOCKED_FETCH ||
889 get_itbl(bqe)->type == CONSTR);
890 /* only BQs of an RBH end with an RBH_Save closure */
891 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
893 switch (get_itbl(bqe)->type) {
895 debugBelch(" TSO %u (%x),",
896 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
899 debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
900 ((StgBlockedFetch *)bqe)->node,
901 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
902 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
903 ((StgBlockedFetch *)bqe)->ga.weight);
906 debugBelch(" %s (IP %p),",
907 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
908 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
909 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
910 "RBH_Save_?"), get_itbl(bqe));
913 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
914 info_type((StgClosure *)bqe)); // , node, info_type(node));
/* print_bq (GRAN variant): like the parallel version but inlines the
   queue walk, and additionally reports which PE each element lives on
   (via where_is). Same CONSTR/RBHSave termination convention. */
922 print_bq (StgClosure *node)
924 StgBlockingQueueElement *bqe;
925 PEs node_loc, tso_loc;
928 /* should cover all closures that may have a blocking queue */
929 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
930 get_itbl(node)->type == FETCH_ME_BQ ||
931 get_itbl(node)->type == RBH);
933 ASSERT(node!=(StgClosure*)NULL); // sanity check
934 node_loc = where_is(node);
936 debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
937 node, info_type(node), node_loc);
940 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
942 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
943 !end; // iterate until bqe points to a CONSTR
944 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
945 ASSERT(bqe != END_BQ_QUEUE); // sanity check
946 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
947 /* types of closures that may appear in a blocking queue */
948 ASSERT(get_itbl(bqe)->type == TSO ||
949 get_itbl(bqe)->type == CONSTR);
950 /* only BQs of an RBH end with an RBH_Save closure */
951 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
953 tso_loc = where_is((StgClosure *)bqe);
954 switch (get_itbl(bqe)->type) {
956 debugBelch(" TSO %d (%p) on [PE %d],",
957 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
960 debugBelch(" %s (IP %p),",
961 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
962 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
963 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
964 "RBH_Save_?"), get_itbl(bqe));
967 barf("Unexpected closure type %s in blocking queue of %p (%s)",
968 info_type((StgClosure *)bqe), node, info_type(node));
976 #if defined(PARALLEL_HASKELL)
983 for (i=0, tso=run_queue_hd;
984 tso != END_TSO_QUEUE;
985 i++, tso=tso->link) {