Round stack size to a whole number of megablocks
[ghc-hetmet.git] / rts / Threads.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2006
4  *
5  * Thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "Storage.h"
13 #include "Threads.h"
14 #include "RtsFlags.h"
15 #include "STM.h"
16 #include "Schedule.h"
17 #include "Trace.h"
18 #include "ThreadLabels.h"
19
20 /* Next thread ID to allocate.
21  * LOCK: sched_mutex
22  */
23 static StgThreadID next_thread_id = 1;
24
25 /* The smallest stack size that makes any sense is:
26  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
27  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
28  *  + 1                       (the closure to enter)
29  *  + 1                       (stg_ap_v_ret)
30  *  + 1                       (spare slot req'd by stg_ap_v_ret)
31  *
32  * A thread with this stack will bomb immediately with a stack
33  * overflow, which will increase its stack size.  
34  */
35 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
36
37 /* ---------------------------------------------------------------------------
38    Create a new thread.
39
40    The new thread starts with the given stack size.  Before the
41    scheduler can run, however, this thread needs to have a closure
42    (and possibly some arguments) pushed on its stack.  See
43    pushClosure() in Schedule.h.
44
45    createGenThread() and createIOThread() (in SchedAPI.h) are
46    convenient packaged versions of this function.
47
48    currently pri (priority) is only used in a GRAN setup -- HWL
49    ------------------------------------------------------------------------ */
50 #if defined(GRAN)
51 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
52 StgTSO *
53 createThread(nat size, StgInt pri)
54 #else
55 StgTSO *
56 createThread(Capability *cap, nat size)
57 #endif
58 {
59     StgTSO *tso;
60     nat stack_size;
61
62     /* sched_mutex is *not* required */
63
64     /* First check whether we should create a thread at all */
65 #if defined(PARALLEL_HASKELL)
66     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
67     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
68         threadsIgnored++;
69         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
70                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
71         return END_TSO_QUEUE;
72     }
73     threadsCreated++;
74 #endif
75
76 #if defined(GRAN)
77     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
78 #endif
79
80     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
81
82     /* catch ridiculously small stack sizes */
83     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
84         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
85     }
86
87     stack_size = round_to_mblocks(size) - TSO_STRUCT_SIZEW;
88     
89     tso = (StgTSO *)allocateLocal(cap, size);
90     TICK_ALLOC_TSO(stack_size, 0);
91
92     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
93 #if defined(GRAN)
94     SET_GRAN_HDR(tso, ThisPE);
95 #endif
96
97     // Always start with the compiled code evaluator
98     tso->what_next = ThreadRunGHC;
99
100     tso->why_blocked  = NotBlocked;
101     tso->blocked_exceptions = END_TSO_QUEUE;
102     tso->flags = TSO_DIRTY;
103     
104     tso->saved_errno = 0;
105     tso->bound = NULL;
106     tso->cap = cap;
107     
108     tso->stack_size     = stack_size;
109     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
110                           - TSO_STRUCT_SIZEW;
111     tso->sp             = (P_)&(tso->stack) + stack_size;
112
113     tso->trec = NO_TREC;
114     
115 #ifdef PROFILING
116     tso->prof.CCCS = CCS_MAIN;
117 #endif
118     
119   /* put a stop frame on the stack */
120     tso->sp -= sizeofW(StgStopFrame);
121     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
122     tso->_link = END_TSO_QUEUE;
123     
124   // ToDo: check this
125 #if defined(GRAN)
126     /* uses more flexible routine in GranSim */
127     insertThread(tso, CurrentProc);
128 #else
129     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
130      * from its creation
131      */
132 #endif
133     
134 #if defined(GRAN) 
135     if (RtsFlags.GranFlags.GranSimStats.Full) 
136         DumpGranEvent(GR_START,tso);
137 #elif defined(PARALLEL_HASKELL)
138     if (RtsFlags.ParFlags.ParStats.Full) 
139         DumpGranEvent(GR_STARTQ,tso);
140     /* HACk to avoid SCHEDULE 
141        LastTSO = tso; */
142 #endif
143     
144     /* Link the new thread on the global thread list.
145      */
146     ACQUIRE_LOCK(&sched_mutex);
147     tso->id = next_thread_id++;  // while we have the mutex
148     tso->global_link = g0s0->threads;
149     g0s0->threads = tso;
150     RELEASE_LOCK(&sched_mutex);
151     
152 #if defined(DIST)
153     tso->dist.priority = MandatoryPriority; //by default that is...
154 #endif
155     
156 #if defined(GRAN)
157     tso->gran.pri = pri;
158 # if defined(DEBUG)
159     tso->gran.magic = TSO_MAGIC; // debugging only
160 # endif
161     tso->gran.sparkname   = 0;
162     tso->gran.startedat   = CURRENT_TIME; 
163     tso->gran.exported    = 0;
164     tso->gran.basicblocks = 0;
165     tso->gran.allocs      = 0;
166     tso->gran.exectime    = 0;
167     tso->gran.fetchtime   = 0;
168     tso->gran.fetchcount  = 0;
169     tso->gran.blocktime   = 0;
170     tso->gran.blockcount  = 0;
171     tso->gran.blockedat   = 0;
172     tso->gran.globalsparks = 0;
173     tso->gran.localsparks  = 0;
174     if (RtsFlags.GranFlags.Light)
175         tso->gran.clock  = Now; /* local clock */
176     else
177         tso->gran.clock  = 0;
178     
179     IF_DEBUG(gran,printTSO(tso));
180 #elif defined(PARALLEL_HASKELL)
181 # if defined(DEBUG)
182     tso->par.magic = TSO_MAGIC; // debugging only
183 # endif
184     tso->par.sparkname   = 0;
185     tso->par.startedat   = CURRENT_TIME; 
186     tso->par.exported    = 0;
187     tso->par.basicblocks = 0;
188     tso->par.allocs      = 0;
189     tso->par.exectime    = 0;
190     tso->par.fetchtime   = 0;
191     tso->par.fetchcount  = 0;
192     tso->par.blocktime   = 0;
193     tso->par.blockcount  = 0;
194     tso->par.blockedat   = 0;
195     tso->par.globalsparks = 0;
196     tso->par.localsparks  = 0;
197 #endif
198     
199 #if defined(GRAN)
200     globalGranStats.tot_threads_created++;
201     globalGranStats.threads_created_on_PE[CurrentProc]++;
202     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
203     globalGranStats.tot_sq_probes++;
204 #elif defined(PARALLEL_HASKELL)
205     // collect parallel global statistics (currently done together with GC stats)
206     if (RtsFlags.ParFlags.ParStats.Global &&
207         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
208         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
209         globalParStats.tot_threads_created++;
210     }
211 #endif 
212     
213     postEvent (cap, EVENT_CREATE_THREAD, tso->id, 0);
214
215 #if defined(GRAN)
216     debugTrace(GRAN_DEBUG_pri,
217                "==__ schedule: Created TSO %d (%p);",
218                CurrentProc, tso, tso->id);
219 #elif defined(PARALLEL_HASKELL)
220     debugTrace(PAR_DEBUG_verbose,
221                "==__ schedule: Created TSO %d (%p); %d threads active",
222                (long)tso->id, tso, advisory_thread_count);
223 #else
224     debugTrace(DEBUG_sched,
225                "created thread %ld, stack size = %lx words", 
226                (long)tso->id, (long)tso->stack_size);
227 #endif    
228     return tso;
229 }
230
231 #if defined(PAR)
232 /* RFP:
233    all parallel thread creation calls should fall through the following routine.
234 */
235 StgTSO *
236 createThreadFromSpark(rtsSpark spark) 
237 { StgTSO *tso;
238   ASSERT(spark != (rtsSpark)NULL);
239 // JB: TAKE CARE OF THIS COUNTER! BUGGY
240   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
241   { threadsIgnored++;
242     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
243           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
244     return END_TSO_QUEUE;
245   }
246   else
247   { threadsCreated++;
248     tso = createThread(RtsFlags.GcFlags.initialStkSize);
249     if (tso==END_TSO_QUEUE)     
250       barf("createSparkThread: Cannot create TSO");
251 #if defined(DIST)
252     tso->priority = AdvisoryPriority;
253 #endif
254     pushClosure(tso,spark);
255     addToRunQueue(tso);
256     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
257   }
258   return tso;
259 }
260 #endif
261
262 /* ---------------------------------------------------------------------------
263  * Comparing Thread ids.
264  *
265  * This is used from STG land in the implementation of the
266  * instances of Eq/Ord for ThreadIds.
267  * ------------------------------------------------------------------------ */
268
269 int
270 cmp_thread(StgPtr tso1, StgPtr tso2) 
271
272   StgThreadID id1 = ((StgTSO *)tso1)->id; 
273   StgThreadID id2 = ((StgTSO *)tso2)->id;
274  
275   if (id1 < id2) return (-1);
276   if (id1 > id2) return 1;
277   return 0;
278 }
279
280 /* ---------------------------------------------------------------------------
281  * Fetching the ThreadID from an StgTSO.
282  *
283  * This is used in the implementation of Show for ThreadIds.
284  * ------------------------------------------------------------------------ */
285 int
286 rts_getThreadId(StgPtr tso) 
287 {
288   return ((StgTSO *)tso)->id;
289 }
290
291 /* -----------------------------------------------------------------------------
292    Remove a thread from a queue.
293    Fails fatally if the TSO is not on the queue.
294    -------------------------------------------------------------------------- */
295
296 void
297 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
298 {
299     StgTSO *t, *prev;
300
301     prev = NULL;
302     for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
303         if (t == tso) {
304             if (prev) {
305                 setTSOLink(cap,prev,t->_link);
306             } else {
307                 *queue = t->_link;
308             }
309             return;
310         }
311     }
312     barf("removeThreadFromQueue: not found");
313 }
314
315 void
316 removeThreadFromDeQueue (Capability *cap, 
317                          StgTSO **head, StgTSO **tail, StgTSO *tso)
318 {
319     StgTSO *t, *prev;
320
321     prev = NULL;
322     for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
323         if (t == tso) {
324             if (prev) {
325                 setTSOLink(cap,prev,t->_link);
326             } else {
327                 *head = t->_link;
328             }
329             if (*tail == tso) {
330                 if (prev) {
331                     *tail = prev;
332                 } else {
333                     *tail = END_TSO_QUEUE;
334                 }
335             }
336             return;
337         }
338     }
339     barf("removeThreadFromMVarQueue: not found");
340 }
341
342 void
343 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
344 {
345     removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
346 }
347
348 /* ----------------------------------------------------------------------------
349    unblockOne()
350
351    unblock a single thread.
352    ------------------------------------------------------------------------- */
353
354 #if defined(GRAN)
355 STATIC_INLINE void
356 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
357 {
358 }
359 #elif defined(PARALLEL_HASKELL)
360 STATIC_INLINE void
361 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
362 {
363   /* write RESUME events to log file and
364      update blocked and fetch time (depending on type of the orig closure) */
365   if (RtsFlags.ParFlags.ParStats.Full) {
366     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
367                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
368                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
369     if (emptyRunQueue())
370       emitSchedule = rtsTrue;
371
372     switch (get_itbl(node)->type) {
373         case FETCH_ME_BQ:
374           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
375           break;
376         case RBH:
377         case FETCH_ME:
378         case BLACKHOLE_BQ:
379           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
380           break;
381 #ifdef DIST
382         case MVAR:
383           break;
384 #endif    
385         default:
386           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
387         }
388       }
389 }
390 #endif
391
392 #if defined(GRAN)
393 StgBlockingQueueElement *
394 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
395 {
396     StgTSO *tso;
397     PEs node_loc, tso_loc;
398
399     node_loc = where_is(node); // should be lifted out of loop
400     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
401     tso_loc = where_is((StgClosure *)tso);
402     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
403       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
404       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
405       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
406       // insertThread(tso, node_loc);
407       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
408                 ResumeThread,
409                 tso, node, (rtsSpark*)NULL);
410       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
411       // len_local++;
412       // len++;
413     } else { // TSO is remote (actually should be FMBQ)
414       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
415                                   RtsFlags.GranFlags.Costs.gunblocktime +
416                                   RtsFlags.GranFlags.Costs.latency;
417       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
418                 UnblockThread,
419                 tso, node, (rtsSpark*)NULL);
420       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
421       // len++;
422     }
423     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
424     IF_GRAN_DEBUG(bq,
425                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
426                           (node_loc==tso_loc ? "Local" : "Global"), 
427                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
428     tso->block_info.closure = NULL;
429     debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)", 
430                tso->id, tso));
431 }
432 #elif defined(PARALLEL_HASKELL)
433 StgBlockingQueueElement *
434 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
435 {
436     StgBlockingQueueElement *next;
437
438     switch (get_itbl(bqe)->type) {
439     case TSO:
440       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
441       /* if it's a TSO just push it onto the run_queue */
442       next = bqe->link;
443       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
444       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
445       threadRunnable();
446       unblockCount(bqe, node);
447       /* reset blocking status after dumping event */
448       ((StgTSO *)bqe)->why_blocked = NotBlocked;
449       break;
450
451     case BLOCKED_FETCH:
452       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
453       next = bqe->link;
454       bqe->link = (StgBlockingQueueElement *)PendingFetches;
455       PendingFetches = (StgBlockedFetch *)bqe;
456       break;
457
458 # if defined(DEBUG)
459       /* can ignore this case in a non-debugging setup; 
460          see comments on RBHSave closures above */
461     case CONSTR:
462       /* check that the closure is an RBHSave closure */
463       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
464              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
465              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
466       break;
467
468     default:
469       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
470            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
471            (StgClosure *)bqe);
472 # endif
473     }
474   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
475   return next;
476 }
477 #endif
478
479 StgTSO *
480 unblockOne (Capability *cap, StgTSO *tso)
481 {
482     return unblockOne_(cap,tso,rtsTrue); // allow migration
483 }
484
485 StgTSO *
486 unblockOne_ (Capability *cap, StgTSO *tso, 
487              rtsBool allow_migrate USED_IF_THREADS)
488 {
489   StgTSO *next;
490
491   // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
492   ASSERT(tso->why_blocked != NotBlocked);
493
494   tso->why_blocked = NotBlocked;
495   next = tso->_link;
496   tso->_link = END_TSO_QUEUE;
497
498 #if defined(THREADED_RTS)
499   if (tso->cap == cap || (!tsoLocked(tso) && 
500                           allow_migrate && 
501                           RtsFlags.ParFlags.wakeupMigrate)) {
502       // We are waking up this thread on the current Capability, which
503       // might involve migrating it from the Capability it was last on.
504       if (tso->bound) {
505           ASSERT(tso->bound->cap == tso->cap);
506           tso->bound->cap = cap;
507       }
508
509       tso->cap = cap;
510       appendToRunQueue(cap,tso);
511
512       // context-switch soonish so we can migrate the new thread if
513       // necessary.  NB. not contextSwitchCapability(cap), which would
514       // force a context switch immediately.
515       cap->context_switch = 1;
516   } else {
517       // we'll try to wake it up on the Capability it was last on.
518       wakeupThreadOnCapability(cap, tso->cap, tso);
519   }
520 #else
521   appendToRunQueue(cap,tso);
522
523   // context-switch soonish so we can migrate the new thread if
524   // necessary.  NB. not contextSwitchCapability(cap), which would
525   // force a context switch immediately.
526   cap->context_switch = 1;
527 #endif
528
529   postEvent (cap, EVENT_THREAD_WAKEUP, tso->id, tso->cap->no);
530
531   debugTrace(DEBUG_sched, "waking up thread %ld on cap %d",
532              (long)tso->id, tso->cap->no);
533
534   return next;
535 }
536
537 /* ----------------------------------------------------------------------------
538    awakenBlockedQueue
539
540    wakes up all the threads on the specified queue.
541    ------------------------------------------------------------------------- */
542
543 #if defined(GRAN)
544 void
545 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
546 {
547   StgBlockingQueueElement *bqe;
548   PEs node_loc;
549   nat len = 0; 
550
551   IF_GRAN_DEBUG(bq, 
552                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
553                       node, CurrentProc, CurrentTime[CurrentProc], 
554                       CurrentTSO->id, CurrentTSO));
555
556   node_loc = where_is(node);
557
558   ASSERT(q == END_BQ_QUEUE ||
559          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
560          get_itbl(q)->type == CONSTR); // closure (type constructor)
561   ASSERT(is_unique(node));
562
563   /* FAKE FETCH: magically copy the node to the tso's proc;
564      no Fetch necessary because in reality the node should not have been 
565      moved to the other PE in the first place
566   */
567   if (CurrentProc!=node_loc) {
568     IF_GRAN_DEBUG(bq, 
569                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
570                         node, node_loc, CurrentProc, CurrentTSO->id, 
571                         // CurrentTSO, where_is(CurrentTSO),
572                         node->header.gran.procs));
573     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
574     IF_GRAN_DEBUG(bq, 
575                   debugBelch("## new bitmask of node %p is %#x\n",
576                         node, node->header.gran.procs));
577     if (RtsFlags.GranFlags.GranSimStats.Global) {
578       globalGranStats.tot_fake_fetches++;
579     }
580   }
581
582   bqe = q;
583   // ToDo: check: ASSERT(CurrentProc==node_loc);
584   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
585     //next = bqe->link;
586     /* 
587        bqe points to the current element in the queue
588        next points to the next element in the queue
589     */
590     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
591     //tso_loc = where_is(tso);
592     len++;
593     bqe = unblockOne(bqe, node);
594   }
595
596   /* if this is the BQ of an RBH, we have to put back the info ripped out of
597      the closure to make room for the anchor of the BQ */
598   if (bqe!=END_BQ_QUEUE) {
599     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
600     /*
601     ASSERT((info_ptr==&RBH_Save_0_info) ||
602            (info_ptr==&RBH_Save_1_info) ||
603            (info_ptr==&RBH_Save_2_info));
604     */
605     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
606     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
607     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
608
609     IF_GRAN_DEBUG(bq,
610                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
611                         node, info_type(node)));
612   }
613
614   /* statistics gathering */
615   if (RtsFlags.GranFlags.GranSimStats.Global) {
616     // globalGranStats.tot_bq_processing_time += bq_processing_time;
617     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
618     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
619     globalGranStats.tot_awbq++;             // total no. of bqs awakened
620   }
621   IF_GRAN_DEBUG(bq,
622                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
623                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
624 }
625 #elif defined(PARALLEL_HASKELL)
626 void 
627 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
628 {
629   StgBlockingQueueElement *bqe;
630
631   IF_PAR_DEBUG(verbose, 
632                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
633                      node, mytid));
634 #ifdef DIST  
635   //RFP
636   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
637     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
638     return;
639   }
640 #endif
641   
642   ASSERT(q == END_BQ_QUEUE ||
643          get_itbl(q)->type == TSO ||           
644          get_itbl(q)->type == BLOCKED_FETCH || 
645          get_itbl(q)->type == CONSTR); 
646
647   bqe = q;
648   while (get_itbl(bqe)->type==TSO || 
649          get_itbl(bqe)->type==BLOCKED_FETCH) {
650     bqe = unblockOne(bqe, node);
651   }
652 }
653
654 #else   /* !GRAN && !PARALLEL_HASKELL */
655
656 void
657 awakenBlockedQueue(Capability *cap, StgTSO *tso)
658 {
659     while (tso != END_TSO_QUEUE) {
660         tso = unblockOne(cap,tso);
661     }
662 }
663 #endif
664
665
666 /* ---------------------------------------------------------------------------
667  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
668  * used by Control.Concurrent for error checking.
669  * ------------------------------------------------------------------------- */
670  
671 HsBool
672 rtsSupportsBoundThreads(void)
673 {
674 #if defined(THREADED_RTS)
675   return HS_BOOL_TRUE;
676 #else
677   return HS_BOOL_FALSE;
678 #endif
679 }
680
681 /* ---------------------------------------------------------------------------
682  * isThreadBound(tso): check whether tso is bound to an OS thread.
683  * ------------------------------------------------------------------------- */
684  
685 StgBool
686 isThreadBound(StgTSO* tso USED_IF_THREADS)
687 {
688 #if defined(THREADED_RTS)
689   return (tso->bound != NULL);
690 #endif
691   return rtsFalse;
692 }
693
694 /* ----------------------------------------------------------------------------
695  * Debugging: why is a thread blocked
696  * ------------------------------------------------------------------------- */
697
698 #if DEBUG
699 void
700 printThreadBlockage(StgTSO *tso)
701 {
702   switch (tso->why_blocked) {
703   case BlockedOnRead:
704     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
705     break;
706   case BlockedOnWrite:
707     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
708     break;
709 #if defined(mingw32_HOST_OS)
710     case BlockedOnDoProc:
711     debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
712     break;
713 #endif
714   case BlockedOnDelay:
715     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
716     break;
717   case BlockedOnMVar:
718     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
719     break;
720   case BlockedOnException:
721     debugBelch("is blocked on delivering an exception to thread %lu",
722                (unsigned long)tso->block_info.tso->id);
723     break;
724   case BlockedOnBlackHole:
725     debugBelch("is blocked on a black hole");
726     break;
727   case NotBlocked:
728     debugBelch("is not blocked");
729     break;
730 #if defined(PARALLEL_HASKELL)
731   case BlockedOnGA:
732     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
733             tso->block_info.closure, info_type(tso->block_info.closure));
734     break;
735   case BlockedOnGA_NoSend:
736     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
737             tso->block_info.closure, info_type(tso->block_info.closure));
738     break;
739 #endif
740   case BlockedOnCCall:
741     debugBelch("is blocked on an external call");
742     break;
743   case BlockedOnCCall_NoUnblockExc:
744     debugBelch("is blocked on an external call (exceptions were already blocked)");
745     break;
746   case BlockedOnSTM:
747     debugBelch("is blocked on an STM operation");
748     break;
749   default:
750     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
751          tso->why_blocked, tso->id, tso);
752   }
753 }
754
755 void
756 printThreadStatus(StgTSO *t)
757 {
758   debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
759     {
760       void *label = lookupThreadLabel(t->id);
761       if (label) debugBelch("[\"%s\"] ",(char *)label);
762     }
763     if (t->what_next == ThreadRelocated) {
764         debugBelch("has been relocated...\n");
765     } else {
766         switch (t->what_next) {
767         case ThreadKilled:
768             debugBelch("has been killed");
769             break;
770         case ThreadComplete:
771             debugBelch("has completed");
772             break;
773         default:
774             printThreadBlockage(t);
775         }
776         if (t->flags & TSO_DIRTY) {
777             debugBelch(" (TSO_DIRTY)");
778         } else if (t->flags & TSO_LINK_DIRTY) {
779             debugBelch(" (TSO_LINK_DIRTY)");
780         }
781         debugBelch("\n");
782     }
783 }
784
785 void
786 printAllThreads(void)
787 {
788   StgTSO *t, *next;
789   nat i, s;
790   Capability *cap;
791
792 # if defined(GRAN)
793   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
794   ullong_format_string(TIME_ON_PROC(CurrentProc), 
795                        time_string, rtsFalse/*no commas!*/);
796
797   debugBelch("all threads at [%s]:\n", time_string);
798 # elif defined(PARALLEL_HASKELL)
799   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
800   ullong_format_string(CURRENT_TIME,
801                        time_string, rtsFalse/*no commas!*/);
802
803   debugBelch("all threads at [%s]:\n", time_string);
804 # else
805   debugBelch("all threads:\n");
806 # endif
807
808   for (i = 0; i < n_capabilities; i++) {
809       cap = &capabilities[i];
810       debugBelch("threads on capability %d:\n", cap->no);
811       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
812           printThreadStatus(t);
813       }
814   }
815
816   debugBelch("other threads:\n");
817   for (s = 0; s < total_steps; s++) {
818     for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
819       if (t->why_blocked != NotBlocked) {
820           printThreadStatus(t);
821       }
822       if (t->what_next == ThreadRelocated) {
823           next = t->_link;
824       } else {
825           next = t->global_link;
826       }
827     }
828   }
829 }
830
831 // useful from gdb
832 void 
833 printThreadQueue(StgTSO *t)
834 {
835     nat i = 0;
836     for (; t != END_TSO_QUEUE; t = t->_link) {
837         printThreadStatus(t);
838         i++;
839     }
840     debugBelch("%d threads on queue\n", i);
841 }
842
843 /* 
844    Print a whole blocking queue attached to node (debugging only).
845 */
846 # if defined(PARALLEL_HASKELL)
847 void 
848 print_bq (StgClosure *node)
849 {
850   StgBlockingQueueElement *bqe;
851   StgTSO *tso;
852   rtsBool end;
853
854   debugBelch("## BQ of closure %p (%s): ",
855           node, info_type(node));
856
857   /* should cover all closures that may have a blocking queue */
858   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
859          get_itbl(node)->type == FETCH_ME_BQ ||
860          get_itbl(node)->type == RBH ||
861          get_itbl(node)->type == MVAR);
862     
863   ASSERT(node!=(StgClosure*)NULL);         // sanity check
864
865   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
866 }
867
868 /* 
869    Print a whole blocking queue starting with the element bqe.
870 */
871 void 
872 print_bqe (StgBlockingQueueElement *bqe)
873 {
874   rtsBool end;
875
876   /* 
877      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
878   */
879   for (end = (bqe==END_BQ_QUEUE);
880        !end; // iterate until bqe points to a CONSTR
881        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
882        bqe = end ? END_BQ_QUEUE : bqe->link) {
883     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
884     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
885     /* types of closures that may appear in a blocking queue */
886     ASSERT(get_itbl(bqe)->type == TSO ||           
887            get_itbl(bqe)->type == BLOCKED_FETCH || 
888            get_itbl(bqe)->type == CONSTR); 
889     /* only BQs of an RBH end with an RBH_Save closure */
890     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
891
892     switch (get_itbl(bqe)->type) {
893     case TSO:
894       debugBelch(" TSO %u (%x),",
895               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
896       break;
897     case BLOCKED_FETCH:
898       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
899               ((StgBlockedFetch *)bqe)->node, 
900               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
901               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
902               ((StgBlockedFetch *)bqe)->ga.weight);
903       break;
904     case CONSTR:
905       debugBelch(" %s (IP %p),",
906               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
907                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
908                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
909                "RBH_Save_?"), get_itbl(bqe));
910       break;
911     default:
912       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
913            info_type((StgClosure *)bqe)); // , node, info_type(node));
914       break;
915     }
916   } /* for */
917   debugBelch("\n");
918 }
919 # elif defined(GRAN)
920 void 
921 print_bq (StgClosure *node)
922 {
923   StgBlockingQueueElement *bqe;
924   PEs node_loc, tso_loc;
925   rtsBool end;
926
927   /* should cover all closures that may have a blocking queue */
928   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
929          get_itbl(node)->type == FETCH_ME_BQ ||
930          get_itbl(node)->type == RBH);
931     
932   ASSERT(node!=(StgClosure*)NULL);         // sanity check
933   node_loc = where_is(node);
934
935   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
936           node, info_type(node), node_loc);
937
938   /* 
939      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
940   */
941   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
942        !end; // iterate until bqe points to a CONSTR
943        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
944     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
945     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
946     /* types of closures that may appear in a blocking queue */
947     ASSERT(get_itbl(bqe)->type == TSO ||           
948            get_itbl(bqe)->type == CONSTR); 
949     /* only BQs of an RBH end with an RBH_Save closure */
950     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
951
952     tso_loc = where_is((StgClosure *)bqe);
953     switch (get_itbl(bqe)->type) {
954     case TSO:
955       debugBelch(" TSO %d (%p) on [PE %d],",
956               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
957       break;
958     case CONSTR:
959       debugBelch(" %s (IP %p),",
960               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
961                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
962                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
963                "RBH_Save_?"), get_itbl(bqe));
964       break;
965     default:
966       barf("Unexpected closure type %s in blocking queue of %p (%s)",
967            info_type((StgClosure *)bqe), node, info_type(node));
968       break;
969     }
970   } /* for */
971   debugBelch("\n");
972 }
973 # endif
974
975 #if defined(PARALLEL_HASKELL)
976 nat
977 run_queue_len(void)
978 {
979     nat i;
980     StgTSO *tso;
981     
982     for (i=0, tso=run_queue_hd; 
983          tso != END_TSO_QUEUE;
984          i++, tso=tso->link) {
985         /* nothing */
986     }
987         
988     return i;
989 }
990 #endif
991
992 #endif /* DEBUG */