1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2006
5 * Thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
18 #include "ThreadLabels.h"
// NOTE(review): this chunk is a lossy extract — interior lines (including the
// closing "*/" of the comment below) are missing; annotations here are
// review notes only, not a claim the extract compiles.
// next_thread_id: monotonically increasing TSO id counter; incremented while
// holding sched_mutex in createThread (see "tso->id = next_thread_id++" below).
20 /* Next thread ID to allocate.
23 static StgThreadID next_thread_id = 1;
25 /* The smallest stack size that makes any sense is:
26 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
27 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
28 * + 1 (the closure to enter)
30 * + 1 (spare slot req'd by stg_ap_v_ret)
32 * A thread with this stack will bomb immediately with a stack
33 * overflow, which will increase its stack size.
35 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
// createThread: allocate and initialise a fresh StgTSO on the given
// Capability's nursery (allocateLocal), push a stop frame, assign a thread id
// under sched_mutex, and link the TSO onto the global thread list.
// Two signatures appear below because this extract interleaves the
// GRAN/PARALLEL_HASKELL variant (size + priority) with the ordinary variant
// (Capability + size); the selecting #if/#else lines are missing from this
// extract — TODO confirm against the full file.
// NOTE(review): many interior lines (braces, #endif, returns) are absent here;
// the code lines below are kept byte-identical.
37 /* ---------------------------------------------------------------------------
40 The new thread starts with the given stack size. Before the
41 scheduler can run, however, this thread needs to have a closure
42 (and possibly some arguments) pushed on its stack. See
43 pushClosure() in Schedule.h.
45 createGenThread() and createIOThread() (in SchedAPI.h) are
46 convenient packaged versions of this function.
48 currently pri (priority) is only used in a GRAN setup -- HWL
49 ------------------------------------------------------------------------ */
51 /* currently pri (priority) is only used in a GRAN setup -- HWL */
53 createThread(nat size, StgInt pri)
56 createThread(Capability *cap, nat size)
62 /* sched_mutex is *not* required */
64 /* First check whether we should create a thread at all */
65 #if defined(PARALLEL_HASKELL)
66 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
67 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
69 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
70 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
77 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
80 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
// Clamp ridiculously small requests up to the minimum viable stack; a thread
// at exactly MIN_STACK_WORDS overflows immediately and grows its stack.
82 /* catch ridiculously small stack sizes */
83 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
84 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
87 stack_size = size - TSO_STRUCT_SIZEW;
89 tso = (StgTSO *)allocateLocal(cap, size);
90 TICK_ALLOC_TSO(stack_size, 0);
92 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
94 SET_GRAN_HDR(tso, ThisPE);
97 // Always start with the compiled code evaluator
98 tso->what_next = ThreadRunGHC;
100 tso->why_blocked = NotBlocked;
101 tso->blocked_exceptions = END_TSO_QUEUE;
// New TSOs start dirty so the GC knows to scan them without a write barrier.
102 tso->flags = TSO_DIRTY;
104 tso->saved_errno = 0;
108 tso->stack_size = stack_size;
109 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
// sp starts at the top of the stack area (stacks grow downwards).
111 tso->sp = (P_)&(tso->stack) + stack_size;
116 tso->prof.CCCS = CCS_MAIN;
119 /* put a stop frame on the stack */
120 tso->sp -= sizeofW(StgStopFrame);
121 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
122 tso->_link = END_TSO_QUEUE;
126 /* uses more flexible routine in GranSim */
127 insertThread(tso, CurrentProc);
129 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
135 if (RtsFlags.GranFlags.GranSimStats.Full)
136 DumpGranEvent(GR_START,tso);
137 #elif defined(PARALLEL_HASKELL)
138 if (RtsFlags.ParFlags.ParStats.Full)
139 DumpGranEvent(GR_STARTQ,tso);
140 /* HACK to avoid SCHEDULE
// Thread id assignment and global-list linkage are serialised on sched_mutex;
// next_thread_id must only be bumped while the mutex is held.
144 /* Link the new thread on the global thread list.
146 ACQUIRE_LOCK(&sched_mutex);
147 tso->id = next_thread_id++; // while we have the mutex
148 tso->global_link = g0s0->threads;
150 RELEASE_LOCK(&sched_mutex);
153 tso->dist.priority = MandatoryPriority; //by default that is...
// GRAN-only bookkeeping: zero all per-thread simulation statistics.
159 tso->gran.magic = TSO_MAGIC; // debugging only
161 tso->gran.sparkname = 0;
162 tso->gran.startedat = CURRENT_TIME;
163 tso->gran.exported = 0;
164 tso->gran.basicblocks = 0;
165 tso->gran.allocs = 0;
166 tso->gran.exectime = 0;
167 tso->gran.fetchtime = 0;
168 tso->gran.fetchcount = 0;
169 tso->gran.blocktime = 0;
170 tso->gran.blockcount = 0;
171 tso->gran.blockedat = 0;
172 tso->gran.globalsparks = 0;
173 tso->gran.localsparks = 0;
174 if (RtsFlags.GranFlags.Light)
175 tso->gran.clock = Now; /* local clock */
179 IF_DEBUG(gran,printTSO(tso));
180 #elif defined(PARALLEL_HASKELL)
// PARALLEL_HASKELL-only bookkeeping: same zeroing, par namespace.
182 tso->par.magic = TSO_MAGIC; // debugging only
184 tso->par.sparkname = 0;
185 tso->par.startedat = CURRENT_TIME;
186 tso->par.exported = 0;
187 tso->par.basicblocks = 0;
189 tso->par.exectime = 0;
190 tso->par.fetchtime = 0;
191 tso->par.fetchcount = 0;
192 tso->par.blocktime = 0;
193 tso->par.blockcount = 0;
194 tso->par.blockedat = 0;
195 tso->par.globalsparks = 0;
196 tso->par.localsparks = 0;
200 globalGranStats.tot_threads_created++;
201 globalGranStats.threads_created_on_PE[CurrentProc]++;
202 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
203 globalGranStats.tot_sq_probes++;
204 #elif defined(PARALLEL_HASKELL)
205 // collect parallel global statistics (currently done together with GC stats)
206 if (RtsFlags.ParFlags.ParStats.Global &&
207 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
208 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
209 globalParStats.tot_threads_created++;
// Event log + scheduler trace for the new thread.
213 postEvent (cap, EVENT_CREATE_THREAD, tso->id, 0);
216 debugTrace(GRAN_DEBUG_pri,
217 "==__ schedule: Created TSO %d (%p);",
218 CurrentProc, tso, tso->id);
219 #elif defined(PARALLEL_HASKELL)
220 debugTrace(PAR_DEBUG_verbose,
221 "==__ schedule: Created TSO %d (%p); %d threads active",
222 (long)tso->id, tso, advisory_thread_count);
224 debugTrace(DEBUG_sched,
225 "created thread %ld, stack size = %lx words",
226 (long)tso->id, (long)tso->stack_size);
// createThreadFromSpark (PARALLEL_HASKELL only): turn a spark into a runnable
// TSO — refuse (barf + return END_TSO_QUEUE) once maxThreads is reached,
// otherwise createThread with the initial stack size, give it advisory
// priority, push the spark closure, and bump advisory_thread_count.
// NOTE(review): the counter is flagged BUGGY by the original author (twice);
// increments/decrements are apparently not balanced — do not rely on it.
233 all parallel thread creation calls should fall through the following routine.
236 createThreadFromSpark(rtsSpark spark)
238 ASSERT(spark != (rtsSpark)NULL);
239 // JB: TAKE CARE OF THIS COUNTER! BUGGY
240 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
242 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
243 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
244 return END_TSO_QUEUE;
248 tso = createThread(RtsFlags.GcFlags.initialStkSize);
249 if (tso==END_TSO_QUEUE)
250 barf("createSparkThread: Cannot create TSO");
252 tso->priority = AdvisoryPriority;
254 pushClosure(tso,spark);
256 advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
// cmp_thread: three-way comparison of two TSOs by thread id, used by the
// Eq/Ord instances for ThreadId in the libraries. Returns -1 / 1 for
// less-than / greater-than; the equal case ("return 0;") is not visible in
// this extract but must exist in the full source — TODO confirm.
262 /* ---------------------------------------------------------------------------
263 * Comparing Thread ids.
265 * This is used from STG land in the implementation of the
266 * instances of Eq/Ord for ThreadIds.
267 * ------------------------------------------------------------------------ */
270 cmp_thread(StgPtr tso1, StgPtr tso2)
272 StgThreadID id1 = ((StgTSO *)tso1)->id;
273 StgThreadID id2 = ((StgTSO *)tso2)->id;
275 if (id1 < id2) return (-1);
276 if (id1 > id2) return 1;
// rts_getThreadId: trivial accessor returning the TSO's thread id; used by
// the Show instance for ThreadId.
280 /* ---------------------------------------------------------------------------
281 * Fetching the ThreadID from an StgTSO.
283 * This is used in the implementation of Show for ThreadIds.
284 * ------------------------------------------------------------------------ */
286 rts_getThreadId(StgPtr tso)
288 return ((StgTSO *)tso)->id;
// removeThreadFromQueue: unlink tso from a singly linked TSO queue (linked
// via _link), using setTSOLink so the GC write barrier sees the mutation.
// barfs (fatal) if tso is not on the queue. The head-removal branch is not
// visible in this extract — TODO confirm against the full file.
291 /* -----------------------------------------------------------------------------
292 Remove a thread from a queue.
293 Fails fatally if the TSO is not on the queue.
294 -------------------------------------------------------------------------- */
297 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
302 for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
305 setTSOLink(cap,prev,t->_link);
312 barf("removeThreadFromQueue: not found");
// removeThreadFromDeQueue: like removeThreadFromQueue but for a queue with
// both head and tail pointers; also resets *tail when the removed element was
// the last one. removeThreadFromMVarQueue is a thin wrapper applying it to an
// MVar's blocked-thread queue.
// NOTE(review): the barf message says "removeThreadFromMVarQueue" although it
// is inside removeThreadFromDeQueue — misleading but harmless; kept as-is
// since doc_update must not change runtime strings.
316 removeThreadFromDeQueue (Capability *cap,
317 StgTSO **head, StgTSO **tail, StgTSO *tso)
322 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
325 setTSOLink(cap,prev,t->_link);
333 *tail = END_TSO_QUEUE;
339 barf("removeThreadFromMVarQueue: not found");
343 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
345 removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
// unblockCount (GRAN / PARALLEL_HASKELL variants): statistics hook run when a
// blocked entity is resumed — dumps a GR_RESUMEQ event and charges the
// blocked/fetch time to the TSO depending on the blocking closure's type.
// The PARALLEL_HASKELL unblockOne below it moves a queue element (TSO,
// BLOCKED_FETCH, or RBHSave CONSTR) off a blocking queue: TSOs go to the run
// queue, BLOCKED_FETCHes to PendingFetches.
// NOTE(review): large parts of the switch/case scaffolding are missing from
// this extract; annotations are review notes only.
348 /* ----------------------------------------------------------------------------
351 unblock a single thread.
352 ------------------------------------------------------------------------- */
356 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
359 #elif defined(PARALLEL_HASKELL)
361 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
363 /* write RESUME events to log file and
364 update blocked and fetch time (depending on type of the orig closure) */
365 if (RtsFlags.ParFlags.ParStats.Full) {
366 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
367 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
368 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
370 emitSchedule = rtsTrue;
// Charge time according to what the thread was blocked on: fetch time for
// remote-fetch closures, plain block time otherwise.
372 switch (get_itbl(node)->type) {
374 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
379 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
386 barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
// GRAN variant of unblockOne: cost-model bookkeeping differs for a TSO local
// to the node's PE versus a remote one (pack/unblock/latency costs), then a
// new_event is scheduled and the TSO's link is cleared.
393 StgBlockingQueueElement *
394 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
397 PEs node_loc, tso_loc;
399 node_loc = where_is(node); // should be lifted out of loop
400 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
401 tso_loc = where_is((StgClosure *)tso);
402 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
403 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
404 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
405 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
406 // insertThread(tso, node_loc);
407 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
409 tso, node, (rtsSpark*)NULL);
410 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
413 } else { // TSO is remote (actually should be FMBQ)
414 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
415 RtsFlags.GranFlags.Costs.gunblocktime +
416 RtsFlags.GranFlags.Costs.latency;
417 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
419 tso, node, (rtsSpark*)NULL);
420 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
423 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
425 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
426 (node_loc==tso_loc ? "Local" : "Global"),
427 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
428 tso->block_info.closure = NULL;
429 debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)",
432 #elif defined(PARALLEL_HASKELL)
433 StgBlockingQueueElement *
434 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
436 StgBlockingQueueElement *next;
// Dispatch on what kind of element sits in the blocking queue.
438 switch (get_itbl(bqe)->type) {
440 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
441 /* if it's a TSO just push it onto the run_queue */
443 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
444 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
446 unblockCount(bqe, node);
447 /* reset blocking status after dumping event */
448 ((StgTSO *)bqe)->why_blocked = NotBlocked;
452 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
454 bqe->link = (StgBlockingQueueElement *)PendingFetches;
455 PendingFetches = (StgBlockedFetch *)bqe;
459 /* can ignore this case in a non-debugging setup;
460 see comments on RBHSave closures above */
462 /* check that the closure is an RBHSave closure */
463 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
464 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
465 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
469 barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
470 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
474 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
// unblockOne / unblockOne_ (ordinary RTS): wake one TSO. unblockOne is the
// public entry, delegating with allow_migrate = rtsTrue. In the THREADED_RTS
// case: if the TSO belongs to this Capability (or is unlocked and migration
// is enabled via +RTS wakeup flags), it is migrated here — rebinding a bound
// thread's Capability — appended to this run queue, and a soonish context
// switch is requested; otherwise the wakeup is forwarded to the TSO's own
// Capability via wakeupThreadOnCapability. The non-threaded branch simply
// appends to the run queue. Emits EVENT_THREAD_WAKEUP and a sched trace.
480 unblockOne (Capability *cap, StgTSO *tso)
482 return unblockOne_(cap,tso,rtsTrue); // allow migration
486 unblockOne_ (Capability *cap, StgTSO *tso,
487 rtsBool allow_migrate USED_IF_THREADS)
491 // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
492 ASSERT(tso->why_blocked != NotBlocked);
494 tso->why_blocked = NotBlocked;
496 tso->_link = END_TSO_QUEUE;
498 #if defined(THREADED_RTS)
499 if (tso->cap == cap || (!tsoLocked(tso) &&
501 RtsFlags.ParFlags.wakeupMigrate)) {
502 // We are waking up this thread on the current Capability, which
503 // might involve migrating it from the Capability it was last on.
505 ASSERT(tso->bound->cap == tso->cap);
506 tso->bound->cap = cap;
510 appendToRunQueue(cap,tso);
512 // context-switch soonish so we can migrate the new thread if
513 // necessary. NB. not contextSwitchCapability(cap), which would
514 // force a context switch immediately.
515 cap->context_switch = 1;
517 // we'll try to wake it up on the Capability it was last on.
518 wakeupThreadOnCapability(cap, tso->cap, tso);
521 appendToRunQueue(cap,tso);
523 // context-switch soonish so we can migrate the new thread if
524 // necessary. NB. not contextSwitchCapability(cap), which would
525 // force a context switch immediately.
526 cap->context_switch = 1;
529 postEvent (cap, EVENT_THREAD_WAKEUP, tso->id, tso->cap->no);
531 debugTrace(DEBUG_sched, "waking up thread %ld on cap %d",
532 (long)tso->id, tso->cap->no);
// awakenBlockedQueue: wake every thread on a blocking queue. Three variants:
//  - GRAN: walks the queue via unblockOne, handles a "fake fetch" when the
//    node is on a different PE, and — if the queue belonged to an RBH — puts
//    the saved header words back from the trailing RBHSave closure; gathers
//    global GranSim statistics.
//  - PARALLEL_HASKELL: walks TSO/BLOCKED_FETCH elements through unblockOne,
//    treating a CONSTR (RBHSave) or empty queue as nothing-to-do.
//  - ordinary RTS: simple loop unblocking each TSO via unblockOne(cap, tso).
// NOTE(review): the #if selecting the GRAN variant is outside this extract;
// loop bodies are partially missing.
537 /* ----------------------------------------------------------------------------
540 wakes up all the threads on the specified queue.
541 ------------------------------------------------------------------------- */
545 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
547 StgBlockingQueueElement *bqe;
552 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
553 node, CurrentProc, CurrentTime[CurrentProc],
554 CurrentTSO->id, CurrentTSO));
556 node_loc = where_is(node);
558 ASSERT(q == END_BQ_QUEUE ||
559 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
560 get_itbl(q)->type == CONSTR); // closure (type constructor)
561 ASSERT(is_unique(node));
563 /* FAKE FETCH: magically copy the node to the tso's proc;
564 no Fetch necessary because in reality the node should not have been
565 moved to the other PE in the first place
567 if (CurrentProc!=node_loc) {
569 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
570 node, node_loc, CurrentProc, CurrentTSO->id,
571 // CurrentTSO, where_is(CurrentTSO),
572 node->header.gran.procs));
573 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
575 debugBelch("## new bitmask of node %p is %#x\n",
576 node, node->header.gran.procs));
577 if (RtsFlags.GranFlags.GranSimStats.Global) {
578 globalGranStats.tot_fake_fetches++;
583 // ToDo: check: ASSERT(CurrentProc==node_loc);
584 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
587 bqe points to the current element in the queue
588 next points to the next element in the queue
590 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
591 //tso_loc = where_is(tso);
593 bqe = unblockOne(bqe, node);
596 /* if this is the BQ of an RBH, we have to put back the info ripped out of
597 the closure to make room for the anchor of the BQ */
598 if (bqe!=END_BQ_QUEUE) {
599 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
601 ASSERT((info_ptr==&RBH_Save_0_info) ||
602 (info_ptr==&RBH_Save_1_info) ||
603 (info_ptr==&RBH_Save_2_info));
605 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
606 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
607 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
610 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
611 node, info_type(node)));
614 /* statistics gathering */
615 if (RtsFlags.GranFlags.GranSimStats.Global) {
616 // globalGranStats.tot_bq_processing_time += bq_processing_time;
617 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
618 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
619 globalGranStats.tot_awbq++; // total no. of bqs awakened
622 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
623 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
625 #elif defined(PARALLEL_HASKELL)
627 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
629 StgBlockingQueueElement *bqe;
631 IF_PAR_DEBUG(verbose,
632 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
636 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
637 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
642 ASSERT(q == END_BQ_QUEUE ||
643 get_itbl(q)->type == TSO ||
644 get_itbl(q)->type == BLOCKED_FETCH ||
645 get_itbl(q)->type == CONSTR);
648 while (get_itbl(bqe)->type==TSO ||
649 get_itbl(bqe)->type==BLOCKED_FETCH) {
650 bqe = unblockOne(bqe, node);
654 #else /* !GRAN && !PARALLEL_HASKELL */
657 awakenBlockedQueue(Capability *cap, StgTSO *tso)
659 while (tso != END_TSO_QUEUE) {
660 tso = unblockOne(cap,tso);
// rtsSupportsBoundThreads: compile-time capability query for the libraries —
// the THREADED_RTS branch (presumably returning HS_BOOL_TRUE) is not visible
// in this extract; the fallback returns HS_BOOL_FALSE.
666 /* ---------------------------------------------------------------------------
667 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
668 * used by Control.Concurrent for error checking.
669 * ------------------------------------------------------------------------- */
672 rtsSupportsBoundThreads(void)
674 #if defined(THREADED_RTS)
677 return HS_BOOL_FALSE;
// isThreadBound: a TSO is bound iff it has an associated Task (tso->bound).
// Only meaningful under THREADED_RTS; the non-threaded fallback (presumably
// returning rtsFalse) is outside this extract.
681 /* ---------------------------------------------------------------------------
682 * isThreadBound(tso): check whether tso is bound to an OS thread.
683 * ------------------------------------------------------------------------- */
686 isThreadBound(StgTSO* tso USED_IF_THREADS)
688 #if defined(THREADED_RTS)
689 return (tso->bound != NULL);
// printThreadBlockage (debug only): print a human-readable reason for a
// TSO's why_blocked state. One debugBelch per blocking reason; the case
// labels for several branches are missing from this extract. barfs on an
// unknown why_blocked value.
694 /* ----------------------------------------------------------------------------
695 * Debugging: why is a thread blocked
696 * ------------------------------------------------------------------------- */
700 printThreadBlockage(StgTSO *tso)
702 switch (tso->why_blocked) {
704 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
707 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
709 #if defined(mingw32_HOST_OS)
710 case BlockedOnDoProc:
711 debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
715 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
718 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
720 case BlockedOnException:
721 debugBelch("is blocked on delivering an exception to thread %lu",
722 (unsigned long)tso->block_info.tso->id);
724 case BlockedOnBlackHole:
725 debugBelch("is blocked on a black hole");
728 debugBelch("is not blocked");
730 #if defined(PARALLEL_HASKELL)
732 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
733 tso->block_info.closure, info_type(tso->block_info.closure));
735 case BlockedOnGA_NoSend:
736 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
737 tso->block_info.closure, info_type(tso->block_info.closure));
741 debugBelch("is blocked on an external call");
743 case BlockedOnCCall_NoUnblockExc:
744 debugBelch("is blocked on an external call (exceptions were already blocked)");
747 debugBelch("is blocked on an STM operation");
750 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
751 tso->why_blocked, tso->id, tso);
// printThreadStatus (debug only): one-line summary of a TSO — id, address,
// optional user label, then its what_next state (relocated / killed /
// complete / otherwise its blockage via printThreadBlockage) plus dirty
// flags.
756 printThreadStatus(StgTSO *t)
758 debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
760 void *label = lookupThreadLabel(t->id);
761 if (label) debugBelch("[\"%s\"] ",(char *)label);
763 if (t->what_next == ThreadRelocated) {
764 debugBelch("has been relocated...\n");
766 switch (t->what_next) {
768 debugBelch("has been killed");
771 debugBelch("has completed");
774 printThreadBlockage(t);
776 if (t->flags & TSO_DIRTY) {
777 debugBelch(" (TSO_DIRTY)");
778 } else if (t->flags & TSO_LINK_DIRTY) {
779 debugBelch(" (TSO_LINK_DIRTY)");
// printAllThreads (debug only): dump every thread in the system — first each
// Capability's run queue, then all threads on the step lists (skipping
// unblocked ones there, since runnable threads already appeared above).
// The GRAN / PARALLEL_HASKELL branches prefix the dump with a formatted
// timestamp.
786 printAllThreads(void)
793 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
794 ullong_format_string(TIME_ON_PROC(CurrentProc),
795 time_string, rtsFalse/*no commas!*/);
797 debugBelch("all threads at [%s]:\n", time_string);
798 # elif defined(PARALLEL_HASKELL)
799 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
800 ullong_format_string(CURRENT_TIME,
801 time_string, rtsFalse/*no commas!*/);
803 debugBelch("all threads at [%s]:\n", time_string);
805 debugBelch("all threads:\n");
808 for (i = 0; i < n_capabilities; i++) {
809 cap = &capabilities[i];
810 debugBelch("threads on capability %d:\n", cap->no);
811 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
812 printThreadStatus(t);
816 debugBelch("other threads:\n");
817 for (s = 0; s < total_steps; s++) {
818 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
819 if (t->why_blocked != NotBlocked) {
820 printThreadStatus(t);
// Relocated TSOs are followed through their link; 'next' advances via the
// global_link chain (the relocation-follow line is missing in this extract).
822 if (t->what_next == ThreadRelocated) {
825 next = t->global_link;
// printThreadQueue (debug only): print each TSO on a _link-chained queue and
// then the count (the counter 'i' is incremented on a line not visible in
// this extract).
833 printThreadQueue(StgTSO *t)
836 for (; t != END_TSO_QUEUE; t = t->_link) {
837 printThreadStatus(t);
840 debugBelch("%d threads on queue\n", i);
// print_bq / print_bqe (debug only, PARALLEL_HASKELL and GRAN variants):
// pretty-print a blocking queue attached to a closure. The PAR variant
// delegates element printing to print_bqe, which walks the queue until it
// hits a CONSTR (the RBHSave terminator of an RBH's queue) or the end,
// printing TSOs, BLOCKED_FETCHes, and RBHSave closures. The GRAN variant
// inlines the walk and additionally prints each TSO's PE location.
// NOTE(review): both loops share a subtle termination idiom — 'end' is
// recomputed in the for-update clause so the element that *is* the CONSTR
// still gets printed before the loop stops.
844 Print a whole blocking queue attached to node (debugging only).
846 # if defined(PARALLEL_HASKELL)
848 print_bq (StgClosure *node)
850 StgBlockingQueueElement *bqe;
854 debugBelch("## BQ of closure %p (%s): ",
855 node, info_type(node));
857 /* should cover all closures that may have a blocking queue */
858 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
859 get_itbl(node)->type == FETCH_ME_BQ ||
860 get_itbl(node)->type == RBH ||
861 get_itbl(node)->type == MVAR);
863 ASSERT(node!=(StgClosure*)NULL); // sanity check
865 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
869 Print a whole blocking queue starting with the element bqe.
872 print_bqe (StgBlockingQueueElement *bqe)
877 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
879 for (end = (bqe==END_BQ_QUEUE);
880 !end; // iterate until bqe points to a CONSTR
881 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
882 bqe = end ? END_BQ_QUEUE : bqe->link) {
883 ASSERT(bqe != END_BQ_QUEUE); // sanity check
884 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
885 /* types of closures that may appear in a blocking queue */
886 ASSERT(get_itbl(bqe)->type == TSO ||
887 get_itbl(bqe)->type == BLOCKED_FETCH ||
888 get_itbl(bqe)->type == CONSTR);
889 /* only BQs of an RBH end with an RBH_Save closure */
890 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
892 switch (get_itbl(bqe)->type) {
894 debugBelch(" TSO %u (%x),",
895 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
898 debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
899 ((StgBlockedFetch *)bqe)->node,
900 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
901 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
902 ((StgBlockedFetch *)bqe)->ga.weight);
905 debugBelch(" %s (IP %p),",
906 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
907 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
908 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
909 "RBH_Save_?"), get_itbl(bqe));
912 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
913 info_type((StgClosure *)bqe)); // , node, info_type(node));
// GRAN variant of print_bq below: same walk, but also resolves and prints
// each element's PE via where_is.
921 print_bq (StgClosure *node)
923 StgBlockingQueueElement *bqe;
924 PEs node_loc, tso_loc;
927 /* should cover all closures that may have a blocking queue */
928 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
929 get_itbl(node)->type == FETCH_ME_BQ ||
930 get_itbl(node)->type == RBH);
932 ASSERT(node!=(StgClosure*)NULL); // sanity check
933 node_loc = where_is(node);
935 debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
936 node, info_type(node), node_loc);
939 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
941 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
942 !end; // iterate until bqe points to a CONSTR
943 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
944 ASSERT(bqe != END_BQ_QUEUE); // sanity check
945 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
946 /* types of closures that may appear in a blocking queue */
947 ASSERT(get_itbl(bqe)->type == TSO ||
948 get_itbl(bqe)->type == CONSTR);
949 /* only BQs of an RBH end with an RBH_Save closure */
950 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
952 tso_loc = where_is((StgClosure *)bqe);
953 switch (get_itbl(bqe)->type) {
955 debugBelch(" TSO %d (%p) on [PE %d],",
956 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
959 debugBelch(" %s (IP %p),",
960 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
961 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
962 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
963 "RBH_Save_?"), get_itbl(bqe));
966 barf("Unexpected closure type %s in blocking queue of %p (%s)",
967 info_type((StgClosure *)bqe), node, info_type(node));
975 #if defined(PARALLEL_HASKELL)
982 for (i=0, tso=run_queue_hd;
983 tso != END_TSO_QUEUE;
984 i++, tso=tso->link) {