Fix bug in previous change: allocate the correct size
[ghc-hetmet.git] / rts / Threads.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2006
4  *
5  * Thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "Storage.h"
13 #include "Threads.h"
14 #include "RtsFlags.h"
15 #include "STM.h"
16 #include "Schedule.h"
17 #include "Trace.h"
18 #include "ThreadLabels.h"
19
20 /* Next thread ID to allocate.
21  * LOCK: sched_mutex
22  */
23 static StgThreadID next_thread_id = 1;
24
25 /* The smallest stack size that makes any sense is:
26  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
27  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
28  *  + 1                       (the closure to enter)
29  *  + 1                       (stg_ap_v_ret)
30  *  + 1                       (spare slot req'd by stg_ap_v_ret)
31  *
32  * A thread with this stack will bomb immediately with a stack
33  * overflow, which will increase its stack size.  
34  */
35 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
36
37 /* ---------------------------------------------------------------------------
38    Create a new thread.
39
40    The new thread starts with the given stack size.  Before the
41    scheduler can run, however, this thread needs to have a closure
42    (and possibly some arguments) pushed on its stack.  See
43    pushClosure() in Schedule.h.
44
45    createGenThread() and createIOThread() (in SchedAPI.h) are
46    convenient packaged versions of this function.
47
48    currently pri (priority) is only used in a GRAN setup -- HWL
49    ------------------------------------------------------------------------ */
50 #if defined(GRAN)
51 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
52 StgTSO *
53 createThread(nat size, StgInt pri)
54 #else
55 StgTSO *
56 createThread(Capability *cap, nat size)
57 #endif
58 {
59     StgTSO *tso;
60     nat stack_size;
61
62     /* sched_mutex is *not* required */
63
64     /* First check whether we should create a thread at all */
65 #if defined(PARALLEL_HASKELL)
66     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
67     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
68         threadsIgnored++;
69         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
70                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
71         return END_TSO_QUEUE;
72     }
73     threadsCreated++;
74 #endif
75
76 #if defined(GRAN)
77     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
78 #endif
79
80     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
81
82     /* catch ridiculously small stack sizes */
83     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
84         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
85     }
86
87     size = round_to_mblocks(size);
88     tso = (StgTSO *)allocateLocal(cap, size);
89
90     stack_size = size - TSO_STRUCT_SIZEW;
91     TICK_ALLOC_TSO(stack_size, 0);
92
93     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
94 #if defined(GRAN)
95     SET_GRAN_HDR(tso, ThisPE);
96 #endif
97
98     // Always start with the compiled code evaluator
99     tso->what_next = ThreadRunGHC;
100
101     tso->why_blocked  = NotBlocked;
102     tso->blocked_exceptions = END_TSO_QUEUE;
103     tso->flags = TSO_DIRTY;
104     
105     tso->saved_errno = 0;
106     tso->bound = NULL;
107     tso->cap = cap;
108     
109     tso->stack_size     = stack_size;
110     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
111                           - TSO_STRUCT_SIZEW;
112     tso->sp             = (P_)&(tso->stack) + stack_size;
113
114     tso->trec = NO_TREC;
115     
116 #ifdef PROFILING
117     tso->prof.CCCS = CCS_MAIN;
118 #endif
119     
120   /* put a stop frame on the stack */
121     tso->sp -= sizeofW(StgStopFrame);
122     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
123     tso->_link = END_TSO_QUEUE;
124     
125   // ToDo: check this
126 #if defined(GRAN)
127     /* uses more flexible routine in GranSim */
128     insertThread(tso, CurrentProc);
129 #else
130     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
131      * from its creation
132      */
133 #endif
134     
135 #if defined(GRAN) 
136     if (RtsFlags.GranFlags.GranSimStats.Full) 
137         DumpGranEvent(GR_START,tso);
138 #elif defined(PARALLEL_HASKELL)
139     if (RtsFlags.ParFlags.ParStats.Full) 
140         DumpGranEvent(GR_STARTQ,tso);
141     /* HACk to avoid SCHEDULE 
142        LastTSO = tso; */
143 #endif
144     
145     /* Link the new thread on the global thread list.
146      */
147     ACQUIRE_LOCK(&sched_mutex);
148     tso->id = next_thread_id++;  // while we have the mutex
149     tso->global_link = g0s0->threads;
150     g0s0->threads = tso;
151     RELEASE_LOCK(&sched_mutex);
152     
153 #if defined(DIST)
154     tso->dist.priority = MandatoryPriority; //by default that is...
155 #endif
156     
157 #if defined(GRAN)
158     tso->gran.pri = pri;
159 # if defined(DEBUG)
160     tso->gran.magic = TSO_MAGIC; // debugging only
161 # endif
162     tso->gran.sparkname   = 0;
163     tso->gran.startedat   = CURRENT_TIME; 
164     tso->gran.exported    = 0;
165     tso->gran.basicblocks = 0;
166     tso->gran.allocs      = 0;
167     tso->gran.exectime    = 0;
168     tso->gran.fetchtime   = 0;
169     tso->gran.fetchcount  = 0;
170     tso->gran.blocktime   = 0;
171     tso->gran.blockcount  = 0;
172     tso->gran.blockedat   = 0;
173     tso->gran.globalsparks = 0;
174     tso->gran.localsparks  = 0;
175     if (RtsFlags.GranFlags.Light)
176         tso->gran.clock  = Now; /* local clock */
177     else
178         tso->gran.clock  = 0;
179     
180     IF_DEBUG(gran,printTSO(tso));
181 #elif defined(PARALLEL_HASKELL)
182 # if defined(DEBUG)
183     tso->par.magic = TSO_MAGIC; // debugging only
184 # endif
185     tso->par.sparkname   = 0;
186     tso->par.startedat   = CURRENT_TIME; 
187     tso->par.exported    = 0;
188     tso->par.basicblocks = 0;
189     tso->par.allocs      = 0;
190     tso->par.exectime    = 0;
191     tso->par.fetchtime   = 0;
192     tso->par.fetchcount  = 0;
193     tso->par.blocktime   = 0;
194     tso->par.blockcount  = 0;
195     tso->par.blockedat   = 0;
196     tso->par.globalsparks = 0;
197     tso->par.localsparks  = 0;
198 #endif
199     
200 #if defined(GRAN)
201     globalGranStats.tot_threads_created++;
202     globalGranStats.threads_created_on_PE[CurrentProc]++;
203     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
204     globalGranStats.tot_sq_probes++;
205 #elif defined(PARALLEL_HASKELL)
206     // collect parallel global statistics (currently done together with GC stats)
207     if (RtsFlags.ParFlags.ParStats.Global &&
208         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
209         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
210         globalParStats.tot_threads_created++;
211     }
212 #endif 
213     
214     postEvent (cap, EVENT_CREATE_THREAD, tso->id, 0);
215
216 #if defined(GRAN)
217     debugTrace(GRAN_DEBUG_pri,
218                "==__ schedule: Created TSO %d (%p);",
219                CurrentProc, tso, tso->id);
220 #elif defined(PARALLEL_HASKELL)
221     debugTrace(PAR_DEBUG_verbose,
222                "==__ schedule: Created TSO %d (%p); %d threads active",
223                (long)tso->id, tso, advisory_thread_count);
224 #else
225     debugTrace(DEBUG_sched,
226                "created thread %ld, stack size = %lx words", 
227                (long)tso->id, (long)tso->stack_size);
228 #endif    
229     return tso;
230 }
231
232 #if defined(PAR)
233 /* RFP:
234    all parallel thread creation calls should fall through the following routine.
235 */
236 StgTSO *
237 createThreadFromSpark(rtsSpark spark) 
238 { StgTSO *tso;
239   ASSERT(spark != (rtsSpark)NULL);
240 // JB: TAKE CARE OF THIS COUNTER! BUGGY
241   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
242   { threadsIgnored++;
243     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
244           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
245     return END_TSO_QUEUE;
246   }
247   else
248   { threadsCreated++;
249     tso = createThread(RtsFlags.GcFlags.initialStkSize);
250     if (tso==END_TSO_QUEUE)     
251       barf("createSparkThread: Cannot create TSO");
252 #if defined(DIST)
253     tso->priority = AdvisoryPriority;
254 #endif
255     pushClosure(tso,spark);
256     addToRunQueue(tso);
257     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
258   }
259   return tso;
260 }
261 #endif
262
263 /* ---------------------------------------------------------------------------
264  * Comparing Thread ids.
265  *
266  * This is used from STG land in the implementation of the
267  * instances of Eq/Ord for ThreadIds.
268  * ------------------------------------------------------------------------ */
269
270 int
271 cmp_thread(StgPtr tso1, StgPtr tso2) 
272
273   StgThreadID id1 = ((StgTSO *)tso1)->id; 
274   StgThreadID id2 = ((StgTSO *)tso2)->id;
275  
276   if (id1 < id2) return (-1);
277   if (id1 > id2) return 1;
278   return 0;
279 }
280
281 /* ---------------------------------------------------------------------------
282  * Fetching the ThreadID from an StgTSO.
283  *
284  * This is used in the implementation of Show for ThreadIds.
285  * ------------------------------------------------------------------------ */
286 int
287 rts_getThreadId(StgPtr tso) 
288 {
289   return ((StgTSO *)tso)->id;
290 }
291
292 /* -----------------------------------------------------------------------------
293    Remove a thread from a queue.
294    Fails fatally if the TSO is not on the queue.
295    -------------------------------------------------------------------------- */
296
297 void
298 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
299 {
300     StgTSO *t, *prev;
301
302     prev = NULL;
303     for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
304         if (t == tso) {
305             if (prev) {
306                 setTSOLink(cap,prev,t->_link);
307             } else {
308                 *queue = t->_link;
309             }
310             return;
311         }
312     }
313     barf("removeThreadFromQueue: not found");
314 }
315
316 void
317 removeThreadFromDeQueue (Capability *cap, 
318                          StgTSO **head, StgTSO **tail, StgTSO *tso)
319 {
320     StgTSO *t, *prev;
321
322     prev = NULL;
323     for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
324         if (t == tso) {
325             if (prev) {
326                 setTSOLink(cap,prev,t->_link);
327             } else {
328                 *head = t->_link;
329             }
330             if (*tail == tso) {
331                 if (prev) {
332                     *tail = prev;
333                 } else {
334                     *tail = END_TSO_QUEUE;
335                 }
336             }
337             return;
338         }
339     }
340     barf("removeThreadFromMVarQueue: not found");
341 }
342
343 void
344 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
345 {
346     removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
347 }
348
349 /* ----------------------------------------------------------------------------
350    unblockOne()
351
352    unblock a single thread.
353    ------------------------------------------------------------------------- */
354
355 #if defined(GRAN)
356 STATIC_INLINE void
357 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
358 {
359 }
360 #elif defined(PARALLEL_HASKELL)
361 STATIC_INLINE void
362 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
363 {
364   /* write RESUME events to log file and
365      update blocked and fetch time (depending on type of the orig closure) */
366   if (RtsFlags.ParFlags.ParStats.Full) {
367     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
368                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
369                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
370     if (emptyRunQueue())
371       emitSchedule = rtsTrue;
372
373     switch (get_itbl(node)->type) {
374         case FETCH_ME_BQ:
375           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
376           break;
377         case RBH:
378         case FETCH_ME:
379         case BLACKHOLE_BQ:
380           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
381           break;
382 #ifdef DIST
383         case MVAR:
384           break;
385 #endif    
386         default:
387           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
388         }
389       }
390 }
391 #endif
392
393 #if defined(GRAN)
394 StgBlockingQueueElement *
395 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
396 {
397     StgTSO *tso;
398     PEs node_loc, tso_loc;
399
400     node_loc = where_is(node); // should be lifted out of loop
401     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
402     tso_loc = where_is((StgClosure *)tso);
403     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
404       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
405       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
406       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
407       // insertThread(tso, node_loc);
408       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
409                 ResumeThread,
410                 tso, node, (rtsSpark*)NULL);
411       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
412       // len_local++;
413       // len++;
414     } else { // TSO is remote (actually should be FMBQ)
415       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
416                                   RtsFlags.GranFlags.Costs.gunblocktime +
417                                   RtsFlags.GranFlags.Costs.latency;
418       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
419                 UnblockThread,
420                 tso, node, (rtsSpark*)NULL);
421       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
422       // len++;
423     }
424     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
425     IF_GRAN_DEBUG(bq,
426                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
427                           (node_loc==tso_loc ? "Local" : "Global"), 
428                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
429     tso->block_info.closure = NULL;
430     debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)", 
431                tso->id, tso));
432 }
433 #elif defined(PARALLEL_HASKELL)
434 StgBlockingQueueElement *
435 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
436 {
437     StgBlockingQueueElement *next;
438
439     switch (get_itbl(bqe)->type) {
440     case TSO:
441       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
442       /* if it's a TSO just push it onto the run_queue */
443       next = bqe->link;
444       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
445       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
446       threadRunnable();
447       unblockCount(bqe, node);
448       /* reset blocking status after dumping event */
449       ((StgTSO *)bqe)->why_blocked = NotBlocked;
450       break;
451
452     case BLOCKED_FETCH:
453       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
454       next = bqe->link;
455       bqe->link = (StgBlockingQueueElement *)PendingFetches;
456       PendingFetches = (StgBlockedFetch *)bqe;
457       break;
458
459 # if defined(DEBUG)
460       /* can ignore this case in a non-debugging setup; 
461          see comments on RBHSave closures above */
462     case CONSTR:
463       /* check that the closure is an RBHSave closure */
464       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
465              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
466              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
467       break;
468
469     default:
470       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
471            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
472            (StgClosure *)bqe);
473 # endif
474     }
475   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
476   return next;
477 }
478 #endif
479
480 StgTSO *
481 unblockOne (Capability *cap, StgTSO *tso)
482 {
483     return unblockOne_(cap,tso,rtsTrue); // allow migration
484 }
485
486 StgTSO *
487 unblockOne_ (Capability *cap, StgTSO *tso, 
488              rtsBool allow_migrate USED_IF_THREADS)
489 {
490   StgTSO *next;
491
492   // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
493   ASSERT(tso->why_blocked != NotBlocked);
494
495   tso->why_blocked = NotBlocked;
496   next = tso->_link;
497   tso->_link = END_TSO_QUEUE;
498
499 #if defined(THREADED_RTS)
500   if (tso->cap == cap || (!tsoLocked(tso) && 
501                           allow_migrate && 
502                           RtsFlags.ParFlags.wakeupMigrate)) {
503       // We are waking up this thread on the current Capability, which
504       // might involve migrating it from the Capability it was last on.
505       if (tso->bound) {
506           ASSERT(tso->bound->cap == tso->cap);
507           tso->bound->cap = cap;
508       }
509
510       tso->cap = cap;
511       appendToRunQueue(cap,tso);
512
513       // context-switch soonish so we can migrate the new thread if
514       // necessary.  NB. not contextSwitchCapability(cap), which would
515       // force a context switch immediately.
516       cap->context_switch = 1;
517   } else {
518       // we'll try to wake it up on the Capability it was last on.
519       wakeupThreadOnCapability(cap, tso->cap, tso);
520   }
521 #else
522   appendToRunQueue(cap,tso);
523
524   // context-switch soonish so we can migrate the new thread if
525   // necessary.  NB. not contextSwitchCapability(cap), which would
526   // force a context switch immediately.
527   cap->context_switch = 1;
528 #endif
529
530   postEvent (cap, EVENT_THREAD_WAKEUP, tso->id, tso->cap->no);
531
532   debugTrace(DEBUG_sched, "waking up thread %ld on cap %d",
533              (long)tso->id, tso->cap->no);
534
535   return next;
536 }
537
538 /* ----------------------------------------------------------------------------
539    awakenBlockedQueue
540
541    wakes up all the threads on the specified queue.
542    ------------------------------------------------------------------------- */
543
544 #if defined(GRAN)
545 void
546 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
547 {
548   StgBlockingQueueElement *bqe;
549   PEs node_loc;
550   nat len = 0; 
551
552   IF_GRAN_DEBUG(bq, 
553                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
554                       node, CurrentProc, CurrentTime[CurrentProc], 
555                       CurrentTSO->id, CurrentTSO));
556
557   node_loc = where_is(node);
558
559   ASSERT(q == END_BQ_QUEUE ||
560          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
561          get_itbl(q)->type == CONSTR); // closure (type constructor)
562   ASSERT(is_unique(node));
563
564   /* FAKE FETCH: magically copy the node to the tso's proc;
565      no Fetch necessary because in reality the node should not have been 
566      moved to the other PE in the first place
567   */
568   if (CurrentProc!=node_loc) {
569     IF_GRAN_DEBUG(bq, 
570                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
571                         node, node_loc, CurrentProc, CurrentTSO->id, 
572                         // CurrentTSO, where_is(CurrentTSO),
573                         node->header.gran.procs));
574     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
575     IF_GRAN_DEBUG(bq, 
576                   debugBelch("## new bitmask of node %p is %#x\n",
577                         node, node->header.gran.procs));
578     if (RtsFlags.GranFlags.GranSimStats.Global) {
579       globalGranStats.tot_fake_fetches++;
580     }
581   }
582
583   bqe = q;
584   // ToDo: check: ASSERT(CurrentProc==node_loc);
585   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
586     //next = bqe->link;
587     /* 
588        bqe points to the current element in the queue
589        next points to the next element in the queue
590     */
591     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
592     //tso_loc = where_is(tso);
593     len++;
594     bqe = unblockOne(bqe, node);
595   }
596
597   /* if this is the BQ of an RBH, we have to put back the info ripped out of
598      the closure to make room for the anchor of the BQ */
599   if (bqe!=END_BQ_QUEUE) {
600     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
601     /*
602     ASSERT((info_ptr==&RBH_Save_0_info) ||
603            (info_ptr==&RBH_Save_1_info) ||
604            (info_ptr==&RBH_Save_2_info));
605     */
606     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
607     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
608     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
609
610     IF_GRAN_DEBUG(bq,
611                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
612                         node, info_type(node)));
613   }
614
615   /* statistics gathering */
616   if (RtsFlags.GranFlags.GranSimStats.Global) {
617     // globalGranStats.tot_bq_processing_time += bq_processing_time;
618     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
619     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
620     globalGranStats.tot_awbq++;             // total no. of bqs awakened
621   }
622   IF_GRAN_DEBUG(bq,
623                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
624                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
625 }
626 #elif defined(PARALLEL_HASKELL)
627 void 
628 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
629 {
630   StgBlockingQueueElement *bqe;
631
632   IF_PAR_DEBUG(verbose, 
633                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
634                      node, mytid));
635 #ifdef DIST  
636   //RFP
637   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
638     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
639     return;
640   }
641 #endif
642   
643   ASSERT(q == END_BQ_QUEUE ||
644          get_itbl(q)->type == TSO ||           
645          get_itbl(q)->type == BLOCKED_FETCH || 
646          get_itbl(q)->type == CONSTR); 
647
648   bqe = q;
649   while (get_itbl(bqe)->type==TSO || 
650          get_itbl(bqe)->type==BLOCKED_FETCH) {
651     bqe = unblockOne(bqe, node);
652   }
653 }
654
655 #else   /* !GRAN && !PARALLEL_HASKELL */
656
657 void
658 awakenBlockedQueue(Capability *cap, StgTSO *tso)
659 {
660     while (tso != END_TSO_QUEUE) {
661         tso = unblockOne(cap,tso);
662     }
663 }
664 #endif
665
666
667 /* ---------------------------------------------------------------------------
668  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
669  * used by Control.Concurrent for error checking.
670  * ------------------------------------------------------------------------- */
671  
672 HsBool
673 rtsSupportsBoundThreads(void)
674 {
675 #if defined(THREADED_RTS)
676   return HS_BOOL_TRUE;
677 #else
678   return HS_BOOL_FALSE;
679 #endif
680 }
681
682 /* ---------------------------------------------------------------------------
683  * isThreadBound(tso): check whether tso is bound to an OS thread.
684  * ------------------------------------------------------------------------- */
685  
686 StgBool
687 isThreadBound(StgTSO* tso USED_IF_THREADS)
688 {
689 #if defined(THREADED_RTS)
690   return (tso->bound != NULL);
691 #endif
692   return rtsFalse;
693 }
694
695 /* ----------------------------------------------------------------------------
696  * Debugging: why is a thread blocked
697  * ------------------------------------------------------------------------- */
698
699 #if DEBUG
700 void
701 printThreadBlockage(StgTSO *tso)
702 {
703   switch (tso->why_blocked) {
704   case BlockedOnRead:
705     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
706     break;
707   case BlockedOnWrite:
708     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
709     break;
710 #if defined(mingw32_HOST_OS)
711     case BlockedOnDoProc:
712     debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
713     break;
714 #endif
715   case BlockedOnDelay:
716     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
717     break;
718   case BlockedOnMVar:
719     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
720     break;
721   case BlockedOnException:
722     debugBelch("is blocked on delivering an exception to thread %lu",
723                (unsigned long)tso->block_info.tso->id);
724     break;
725   case BlockedOnBlackHole:
726     debugBelch("is blocked on a black hole");
727     break;
728   case NotBlocked:
729     debugBelch("is not blocked");
730     break;
731 #if defined(PARALLEL_HASKELL)
732   case BlockedOnGA:
733     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
734             tso->block_info.closure, info_type(tso->block_info.closure));
735     break;
736   case BlockedOnGA_NoSend:
737     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
738             tso->block_info.closure, info_type(tso->block_info.closure));
739     break;
740 #endif
741   case BlockedOnCCall:
742     debugBelch("is blocked on an external call");
743     break;
744   case BlockedOnCCall_NoUnblockExc:
745     debugBelch("is blocked on an external call (exceptions were already blocked)");
746     break;
747   case BlockedOnSTM:
748     debugBelch("is blocked on an STM operation");
749     break;
750   default:
751     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
752          tso->why_blocked, tso->id, tso);
753   }
754 }
755
756 void
757 printThreadStatus(StgTSO *t)
758 {
759   debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
760     {
761       void *label = lookupThreadLabel(t->id);
762       if (label) debugBelch("[\"%s\"] ",(char *)label);
763     }
764     if (t->what_next == ThreadRelocated) {
765         debugBelch("has been relocated...\n");
766     } else {
767         switch (t->what_next) {
768         case ThreadKilled:
769             debugBelch("has been killed");
770             break;
771         case ThreadComplete:
772             debugBelch("has completed");
773             break;
774         default:
775             printThreadBlockage(t);
776         }
777         if (t->flags & TSO_DIRTY) {
778             debugBelch(" (TSO_DIRTY)");
779         } else if (t->flags & TSO_LINK_DIRTY) {
780             debugBelch(" (TSO_LINK_DIRTY)");
781         }
782         debugBelch("\n");
783     }
784 }
785
786 void
787 printAllThreads(void)
788 {
789   StgTSO *t, *next;
790   nat i, s;
791   Capability *cap;
792
793 # if defined(GRAN)
794   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
795   ullong_format_string(TIME_ON_PROC(CurrentProc), 
796                        time_string, rtsFalse/*no commas!*/);
797
798   debugBelch("all threads at [%s]:\n", time_string);
799 # elif defined(PARALLEL_HASKELL)
800   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
801   ullong_format_string(CURRENT_TIME,
802                        time_string, rtsFalse/*no commas!*/);
803
804   debugBelch("all threads at [%s]:\n", time_string);
805 # else
806   debugBelch("all threads:\n");
807 # endif
808
809   for (i = 0; i < n_capabilities; i++) {
810       cap = &capabilities[i];
811       debugBelch("threads on capability %d:\n", cap->no);
812       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
813           printThreadStatus(t);
814       }
815   }
816
817   debugBelch("other threads:\n");
818   for (s = 0; s < total_steps; s++) {
819     for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
820       if (t->why_blocked != NotBlocked) {
821           printThreadStatus(t);
822       }
823       if (t->what_next == ThreadRelocated) {
824           next = t->_link;
825       } else {
826           next = t->global_link;
827       }
828     }
829   }
830 }
831
832 // useful from gdb
833 void 
834 printThreadQueue(StgTSO *t)
835 {
836     nat i = 0;
837     for (; t != END_TSO_QUEUE; t = t->_link) {
838         printThreadStatus(t);
839         i++;
840     }
841     debugBelch("%d threads on queue\n", i);
842 }
843
844 /* 
845    Print a whole blocking queue attached to node (debugging only).
846 */
847 # if defined(PARALLEL_HASKELL)
848 void 
849 print_bq (StgClosure *node)
850 {
851   StgBlockingQueueElement *bqe;
852   StgTSO *tso;
853   rtsBool end;
854
855   debugBelch("## BQ of closure %p (%s): ",
856           node, info_type(node));
857
858   /* should cover all closures that may have a blocking queue */
859   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
860          get_itbl(node)->type == FETCH_ME_BQ ||
861          get_itbl(node)->type == RBH ||
862          get_itbl(node)->type == MVAR);
863     
864   ASSERT(node!=(StgClosure*)NULL);         // sanity check
865
866   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
867 }
868
869 /* 
870    Print a whole blocking queue starting with the element bqe.
871 */
872 void 
873 print_bqe (StgBlockingQueueElement *bqe)
874 {
875   rtsBool end;
876
877   /* 
878      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
879   */
880   for (end = (bqe==END_BQ_QUEUE);
881        !end; // iterate until bqe points to a CONSTR
882        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
883        bqe = end ? END_BQ_QUEUE : bqe->link) {
884     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
885     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
886     /* types of closures that may appear in a blocking queue */
887     ASSERT(get_itbl(bqe)->type == TSO ||           
888            get_itbl(bqe)->type == BLOCKED_FETCH || 
889            get_itbl(bqe)->type == CONSTR); 
890     /* only BQs of an RBH end with an RBH_Save closure */
891     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
892
893     switch (get_itbl(bqe)->type) {
894     case TSO:
895       debugBelch(" TSO %u (%x),",
896               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
897       break;
898     case BLOCKED_FETCH:
899       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
900               ((StgBlockedFetch *)bqe)->node, 
901               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
902               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
903               ((StgBlockedFetch *)bqe)->ga.weight);
904       break;
905     case CONSTR:
906       debugBelch(" %s (IP %p),",
907               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
908                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
909                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
910                "RBH_Save_?"), get_itbl(bqe));
911       break;
912     default:
913       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
914            info_type((StgClosure *)bqe)); // , node, info_type(node));
915       break;
916     }
917   } /* for */
918   debugBelch("\n");
919 }
920 # elif defined(GRAN)
921 void 
922 print_bq (StgClosure *node)
923 {
924   StgBlockingQueueElement *bqe;
925   PEs node_loc, tso_loc;
926   rtsBool end;
927
928   /* should cover all closures that may have a blocking queue */
929   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
930          get_itbl(node)->type == FETCH_ME_BQ ||
931          get_itbl(node)->type == RBH);
932     
933   ASSERT(node!=(StgClosure*)NULL);         // sanity check
934   node_loc = where_is(node);
935
936   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
937           node, info_type(node), node_loc);
938
939   /* 
940      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
941   */
942   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
943        !end; // iterate until bqe points to a CONSTR
944        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
945     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
946     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
947     /* types of closures that may appear in a blocking queue */
948     ASSERT(get_itbl(bqe)->type == TSO ||           
949            get_itbl(bqe)->type == CONSTR); 
950     /* only BQs of an RBH end with an RBH_Save closure */
951     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
952
953     tso_loc = where_is((StgClosure *)bqe);
954     switch (get_itbl(bqe)->type) {
955     case TSO:
956       debugBelch(" TSO %d (%p) on [PE %d],",
957               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
958       break;
959     case CONSTR:
960       debugBelch(" %s (IP %p),",
961               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
962                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
963                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
964                "RBH_Save_?"), get_itbl(bqe));
965       break;
966     default:
967       barf("Unexpected closure type %s in blocking queue of %p (%s)",
968            info_type((StgClosure *)bqe), node, info_type(node));
969       break;
970     }
971   } /* for */
972   debugBelch("\n");
973 }
974 # endif
975
976 #if defined(PARALLEL_HASKELL)
977 nat
978 run_queue_len(void)
979 {
980     nat i;
981     StgTSO *tso;
982     
983     for (i=0, tso=run_queue_hd; 
984          tso != END_TSO_QUEUE;
985          i++, tso=tso->link) {
986         /* nothing */
987     }
988         
989     return i;
990 }
991 #endif
992
993 #endif /* DEBUG */