Instead of a separate context-switch flag, set HpLim to zero
[ghc-hetmet.git] / rts / Threads.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2006
4  *
5  * Thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "Storage.h"
13 #include "Threads.h"
14 #include "RtsFlags.h"
15 #include "STM.h"
16 #include "Schedule.h"
17 #include "Trace.h"
18 #include "ThreadLabels.h"
19
20 /* Next thread ID to allocate.
21  * LOCK: sched_mutex
22  */
23 static StgThreadID next_thread_id = 1;
24
25 /* The smallest stack size that makes any sense is:
26  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
27  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
28  *  + 1                       (the closure to enter)
29  *  + 1                       (stg_ap_v_ret)
30  *  + 1                       (spare slot req'd by stg_ap_v_ret)
31  *
32  * A thread with this stack will bomb immediately with a stack
33  * overflow, which will increase its stack size.  
34  */
35 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
36
37 /* ---------------------------------------------------------------------------
38    Create a new thread.
39
40    The new thread starts with the given stack size.  Before the
41    scheduler can run, however, this thread needs to have a closure
42    (and possibly some arguments) pushed on its stack.  See
43    pushClosure() in Schedule.h.
44
45    createGenThread() and createIOThread() (in SchedAPI.h) are
46    convenient packaged versions of this function.
47
48    currently pri (priority) is only used in a GRAN setup -- HWL
49    ------------------------------------------------------------------------ */
50 #if defined(GRAN)
51 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
52 StgTSO *
53 createThread(nat size, StgInt pri)
54 #else
55 StgTSO *
56 createThread(Capability *cap, nat size)
57 #endif
58 {
59     StgTSO *tso;
60     nat stack_size;
61
62     /* sched_mutex is *not* required */
63
64     /* First check whether we should create a thread at all */
65 #if defined(PARALLEL_HASKELL)
66     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
67     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
68         threadsIgnored++;
69         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
70                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
71         return END_TSO_QUEUE;
72     }
73     threadsCreated++;
74 #endif
75
76 #if defined(GRAN)
77     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
78 #endif
79
80     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
81
82     /* catch ridiculously small stack sizes */
83     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
84         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
85     }
86
87     stack_size = size - TSO_STRUCT_SIZEW;
88     
89     tso = (StgTSO *)allocateLocal(cap, size);
90     TICK_ALLOC_TSO(stack_size, 0);
91
92     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
93 #if defined(GRAN)
94     SET_GRAN_HDR(tso, ThisPE);
95 #endif
96
97     // Always start with the compiled code evaluator
98     tso->what_next = ThreadRunGHC;
99
100     tso->why_blocked  = NotBlocked;
101     tso->blocked_exceptions = END_TSO_QUEUE;
102     tso->flags = TSO_DIRTY;
103     
104     tso->saved_errno = 0;
105     tso->bound = NULL;
106     tso->cap = cap;
107     
108     tso->stack_size     = stack_size;
109     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
110                           - TSO_STRUCT_SIZEW;
111     tso->sp             = (P_)&(tso->stack) + stack_size;
112
113     tso->trec = NO_TREC;
114     
115 #ifdef PROFILING
116     tso->prof.CCCS = CCS_MAIN;
117 #endif
118     
119   /* put a stop frame on the stack */
120     tso->sp -= sizeofW(StgStopFrame);
121     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
122     tso->_link = END_TSO_QUEUE;
123     
124   // ToDo: check this
125 #if defined(GRAN)
126     /* uses more flexible routine in GranSim */
127     insertThread(tso, CurrentProc);
128 #else
129     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
130      * from its creation
131      */
132 #endif
133     
134 #if defined(GRAN) 
135     if (RtsFlags.GranFlags.GranSimStats.Full) 
136         DumpGranEvent(GR_START,tso);
137 #elif defined(PARALLEL_HASKELL)
138     if (RtsFlags.ParFlags.ParStats.Full) 
139         DumpGranEvent(GR_STARTQ,tso);
140     /* HACk to avoid SCHEDULE 
141        LastTSO = tso; */
142 #endif
143     
144     /* Link the new thread on the global thread list.
145      */
146     ACQUIRE_LOCK(&sched_mutex);
147     tso->id = next_thread_id++;  // while we have the mutex
148     tso->global_link = g0s0->threads;
149     g0s0->threads = tso;
150     RELEASE_LOCK(&sched_mutex);
151     
152 #if defined(DIST)
153     tso->dist.priority = MandatoryPriority; //by default that is...
154 #endif
155     
156 #if defined(GRAN)
157     tso->gran.pri = pri;
158 # if defined(DEBUG)
159     tso->gran.magic = TSO_MAGIC; // debugging only
160 # endif
161     tso->gran.sparkname   = 0;
162     tso->gran.startedat   = CURRENT_TIME; 
163     tso->gran.exported    = 0;
164     tso->gran.basicblocks = 0;
165     tso->gran.allocs      = 0;
166     tso->gran.exectime    = 0;
167     tso->gran.fetchtime   = 0;
168     tso->gran.fetchcount  = 0;
169     tso->gran.blocktime   = 0;
170     tso->gran.blockcount  = 0;
171     tso->gran.blockedat   = 0;
172     tso->gran.globalsparks = 0;
173     tso->gran.localsparks  = 0;
174     if (RtsFlags.GranFlags.Light)
175         tso->gran.clock  = Now; /* local clock */
176     else
177         tso->gran.clock  = 0;
178     
179     IF_DEBUG(gran,printTSO(tso));
180 #elif defined(PARALLEL_HASKELL)
181 # if defined(DEBUG)
182     tso->par.magic = TSO_MAGIC; // debugging only
183 # endif
184     tso->par.sparkname   = 0;
185     tso->par.startedat   = CURRENT_TIME; 
186     tso->par.exported    = 0;
187     tso->par.basicblocks = 0;
188     tso->par.allocs      = 0;
189     tso->par.exectime    = 0;
190     tso->par.fetchtime   = 0;
191     tso->par.fetchcount  = 0;
192     tso->par.blocktime   = 0;
193     tso->par.blockcount  = 0;
194     tso->par.blockedat   = 0;
195     tso->par.globalsparks = 0;
196     tso->par.localsparks  = 0;
197 #endif
198     
199 #if defined(GRAN)
200     globalGranStats.tot_threads_created++;
201     globalGranStats.threads_created_on_PE[CurrentProc]++;
202     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
203     globalGranStats.tot_sq_probes++;
204 #elif defined(PARALLEL_HASKELL)
205     // collect parallel global statistics (currently done together with GC stats)
206     if (RtsFlags.ParFlags.ParStats.Global &&
207         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
208         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
209         globalParStats.tot_threads_created++;
210     }
211 #endif 
212     
213 #if defined(GRAN)
214     debugTrace(GRAN_DEBUG_pri,
215                "==__ schedule: Created TSO %d (%p);",
216                CurrentProc, tso, tso->id);
217 #elif defined(PARALLEL_HASKELL)
218     debugTrace(PAR_DEBUG_verbose,
219                "==__ schedule: Created TSO %d (%p); %d threads active",
220                (long)tso->id, tso, advisory_thread_count);
221 #else
222     debugTrace(DEBUG_sched,
223                "created thread %ld, stack size = %lx words", 
224                (long)tso->id, (long)tso->stack_size);
225 #endif    
226     return tso;
227 }
228
229 #if defined(PAR)
230 /* RFP:
231    all parallel thread creation calls should fall through the following routine.
232 */
233 StgTSO *
234 createThreadFromSpark(rtsSpark spark) 
235 { StgTSO *tso;
236   ASSERT(spark != (rtsSpark)NULL);
237 // JB: TAKE CARE OF THIS COUNTER! BUGGY
238   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
239   { threadsIgnored++;
240     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
241           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
242     return END_TSO_QUEUE;
243   }
244   else
245   { threadsCreated++;
246     tso = createThread(RtsFlags.GcFlags.initialStkSize);
247     if (tso==END_TSO_QUEUE)     
248       barf("createSparkThread: Cannot create TSO");
249 #if defined(DIST)
250     tso->priority = AdvisoryPriority;
251 #endif
252     pushClosure(tso,spark);
253     addToRunQueue(tso);
254     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
255   }
256   return tso;
257 }
258 #endif
259
260 /* ---------------------------------------------------------------------------
261  * Comparing Thread ids.
262  *
263  * This is used from STG land in the implementation of the
264  * instances of Eq/Ord for ThreadIds.
265  * ------------------------------------------------------------------------ */
266
267 int
268 cmp_thread(StgPtr tso1, StgPtr tso2) 
269
270   StgThreadID id1 = ((StgTSO *)tso1)->id; 
271   StgThreadID id2 = ((StgTSO *)tso2)->id;
272  
273   if (id1 < id2) return (-1);
274   if (id1 > id2) return 1;
275   return 0;
276 }
277
278 /* ---------------------------------------------------------------------------
279  * Fetching the ThreadID from an StgTSO.
280  *
281  * This is used in the implementation of Show for ThreadIds.
282  * ------------------------------------------------------------------------ */
283 int
284 rts_getThreadId(StgPtr tso) 
285 {
286   return ((StgTSO *)tso)->id;
287 }
288
289 /* -----------------------------------------------------------------------------
290    Remove a thread from a queue.
291    Fails fatally if the TSO is not on the queue.
292    -------------------------------------------------------------------------- */
293
294 void
295 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
296 {
297     StgTSO *t, *prev;
298
299     prev = NULL;
300     for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
301         if (t == tso) {
302             if (prev) {
303                 setTSOLink(cap,prev,t->_link);
304             } else {
305                 *queue = t->_link;
306             }
307             return;
308         }
309     }
310     barf("removeThreadFromQueue: not found");
311 }
312
313 void
314 removeThreadFromDeQueue (Capability *cap, 
315                          StgTSO **head, StgTSO **tail, StgTSO *tso)
316 {
317     StgTSO *t, *prev;
318
319     prev = NULL;
320     for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
321         if (t == tso) {
322             if (prev) {
323                 setTSOLink(cap,prev,t->_link);
324             } else {
325                 *head = t->_link;
326             }
327             if (*tail == tso) {
328                 if (prev) {
329                     *tail = prev;
330                 } else {
331                     *tail = END_TSO_QUEUE;
332                 }
333             }
334             return;
335         }
336     }
337     barf("removeThreadFromMVarQueue: not found");
338 }
339
340 void
341 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
342 {
343     removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
344 }
345
346 /* ----------------------------------------------------------------------------
347    unblockOne()
348
349    unblock a single thread.
350    ------------------------------------------------------------------------- */
351
352 #if defined(GRAN)
353 STATIC_INLINE void
354 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
355 {
356 }
357 #elif defined(PARALLEL_HASKELL)
358 STATIC_INLINE void
359 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
360 {
361   /* write RESUME events to log file and
362      update blocked and fetch time (depending on type of the orig closure) */
363   if (RtsFlags.ParFlags.ParStats.Full) {
364     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
365                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
366                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
367     if (emptyRunQueue())
368       emitSchedule = rtsTrue;
369
370     switch (get_itbl(node)->type) {
371         case FETCH_ME_BQ:
372           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
373           break;
374         case RBH:
375         case FETCH_ME:
376         case BLACKHOLE_BQ:
377           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
378           break;
379 #ifdef DIST
380         case MVAR:
381           break;
382 #endif    
383         default:
384           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
385         }
386       }
387 }
388 #endif
389
390 #if defined(GRAN)
391 StgBlockingQueueElement *
392 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
393 {
394     StgTSO *tso;
395     PEs node_loc, tso_loc;
396
397     node_loc = where_is(node); // should be lifted out of loop
398     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
399     tso_loc = where_is((StgClosure *)tso);
400     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
401       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
402       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
403       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
404       // insertThread(tso, node_loc);
405       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
406                 ResumeThread,
407                 tso, node, (rtsSpark*)NULL);
408       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
409       // len_local++;
410       // len++;
411     } else { // TSO is remote (actually should be FMBQ)
412       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
413                                   RtsFlags.GranFlags.Costs.gunblocktime +
414                                   RtsFlags.GranFlags.Costs.latency;
415       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
416                 UnblockThread,
417                 tso, node, (rtsSpark*)NULL);
418       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
419       // len++;
420     }
421     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
422     IF_GRAN_DEBUG(bq,
423                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
424                           (node_loc==tso_loc ? "Local" : "Global"), 
425                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
426     tso->block_info.closure = NULL;
427     debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)", 
428                tso->id, tso));
429 }
430 #elif defined(PARALLEL_HASKELL)
431 StgBlockingQueueElement *
432 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
433 {
434     StgBlockingQueueElement *next;
435
436     switch (get_itbl(bqe)->type) {
437     case TSO:
438       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
439       /* if it's a TSO just push it onto the run_queue */
440       next = bqe->link;
441       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
442       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
443       threadRunnable();
444       unblockCount(bqe, node);
445       /* reset blocking status after dumping event */
446       ((StgTSO *)bqe)->why_blocked = NotBlocked;
447       break;
448
449     case BLOCKED_FETCH:
450       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
451       next = bqe->link;
452       bqe->link = (StgBlockingQueueElement *)PendingFetches;
453       PendingFetches = (StgBlockedFetch *)bqe;
454       break;
455
456 # if defined(DEBUG)
457       /* can ignore this case in a non-debugging setup; 
458          see comments on RBHSave closures above */
459     case CONSTR:
460       /* check that the closure is an RBHSave closure */
461       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
462              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
463              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
464       break;
465
466     default:
467       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
468            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
469            (StgClosure *)bqe);
470 # endif
471     }
472   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
473   return next;
474 }
475 #endif
476
477 StgTSO *
478 unblockOne (Capability *cap, StgTSO *tso)
479 {
480     return unblockOne_(cap,tso,rtsTrue); // allow migration
481 }
482
483 StgTSO *
484 unblockOne_ (Capability *cap, StgTSO *tso, 
485              rtsBool allow_migrate USED_IF_THREADS)
486 {
487   StgTSO *next;
488
489   // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
490   ASSERT(tso->why_blocked != NotBlocked);
491
492   tso->why_blocked = NotBlocked;
493   next = tso->_link;
494   tso->_link = END_TSO_QUEUE;
495
496 #if defined(THREADED_RTS)
497   if (tso->cap == cap || (!tsoLocked(tso) && 
498                           allow_migrate && 
499                           RtsFlags.ParFlags.wakeupMigrate)) {
500       // We are waking up this thread on the current Capability, which
501       // might involve migrating it from the Capability it was last on.
502       if (tso->bound) {
503           ASSERT(tso->bound->cap == tso->cap);
504           tso->bound->cap = cap;
505       }
506       tso->cap = cap;
507       appendToRunQueue(cap,tso);
508
509       // context-switch soonish so we can migrate the new thread if
510       // necessary.  NB. not contextSwitchCapability(cap), which would
511       // force a context switch immediately.
512       cap->context_switch = 1;
513   } else {
514       // we'll try to wake it up on the Capability it was last on.
515       wakeupThreadOnCapability(cap, tso->cap, tso);
516   }
517 #else
518   appendToRunQueue(cap,tso);
519
520   // context-switch soonish so we can migrate the new thread if
521   // necessary.  NB. not contextSwitchCapability(cap), which would
522   // force a context switch immediately.
523   cap->context_switch = 1;
524 #endif
525
526   debugTrace(DEBUG_sched,
527              "waking up thread %ld on cap %d",
528              (long)tso->id, tso->cap->no);
529
530   return next;
531 }
532
533 /* ----------------------------------------------------------------------------
534    awakenBlockedQueue
535
536    wakes up all the threads on the specified queue.
537    ------------------------------------------------------------------------- */
538
539 #if defined(GRAN)
540 void
541 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
542 {
543   StgBlockingQueueElement *bqe;
544   PEs node_loc;
545   nat len = 0; 
546
547   IF_GRAN_DEBUG(bq, 
548                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
549                       node, CurrentProc, CurrentTime[CurrentProc], 
550                       CurrentTSO->id, CurrentTSO));
551
552   node_loc = where_is(node);
553
554   ASSERT(q == END_BQ_QUEUE ||
555          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
556          get_itbl(q)->type == CONSTR); // closure (type constructor)
557   ASSERT(is_unique(node));
558
559   /* FAKE FETCH: magically copy the node to the tso's proc;
560      no Fetch necessary because in reality the node should not have been 
561      moved to the other PE in the first place
562   */
563   if (CurrentProc!=node_loc) {
564     IF_GRAN_DEBUG(bq, 
565                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
566                         node, node_loc, CurrentProc, CurrentTSO->id, 
567                         // CurrentTSO, where_is(CurrentTSO),
568                         node->header.gran.procs));
569     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
570     IF_GRAN_DEBUG(bq, 
571                   debugBelch("## new bitmask of node %p is %#x\n",
572                         node, node->header.gran.procs));
573     if (RtsFlags.GranFlags.GranSimStats.Global) {
574       globalGranStats.tot_fake_fetches++;
575     }
576   }
577
578   bqe = q;
579   // ToDo: check: ASSERT(CurrentProc==node_loc);
580   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
581     //next = bqe->link;
582     /* 
583        bqe points to the current element in the queue
584        next points to the next element in the queue
585     */
586     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
587     //tso_loc = where_is(tso);
588     len++;
589     bqe = unblockOne(bqe, node);
590   }
591
592   /* if this is the BQ of an RBH, we have to put back the info ripped out of
593      the closure to make room for the anchor of the BQ */
594   if (bqe!=END_BQ_QUEUE) {
595     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
596     /*
597     ASSERT((info_ptr==&RBH_Save_0_info) ||
598            (info_ptr==&RBH_Save_1_info) ||
599            (info_ptr==&RBH_Save_2_info));
600     */
601     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
602     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
603     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
604
605     IF_GRAN_DEBUG(bq,
606                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
607                         node, info_type(node)));
608   }
609
610   /* statistics gathering */
611   if (RtsFlags.GranFlags.GranSimStats.Global) {
612     // globalGranStats.tot_bq_processing_time += bq_processing_time;
613     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
614     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
615     globalGranStats.tot_awbq++;             // total no. of bqs awakened
616   }
617   IF_GRAN_DEBUG(bq,
618                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
619                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
620 }
621 #elif defined(PARALLEL_HASKELL)
622 void 
623 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
624 {
625   StgBlockingQueueElement *bqe;
626
627   IF_PAR_DEBUG(verbose, 
628                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
629                      node, mytid));
630 #ifdef DIST  
631   //RFP
632   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
633     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
634     return;
635   }
636 #endif
637   
638   ASSERT(q == END_BQ_QUEUE ||
639          get_itbl(q)->type == TSO ||           
640          get_itbl(q)->type == BLOCKED_FETCH || 
641          get_itbl(q)->type == CONSTR); 
642
643   bqe = q;
644   while (get_itbl(bqe)->type==TSO || 
645          get_itbl(bqe)->type==BLOCKED_FETCH) {
646     bqe = unblockOne(bqe, node);
647   }
648 }
649
650 #else   /* !GRAN && !PARALLEL_HASKELL */
651
652 void
653 awakenBlockedQueue(Capability *cap, StgTSO *tso)
654 {
655     while (tso != END_TSO_QUEUE) {
656         tso = unblockOne(cap,tso);
657     }
658 }
659 #endif
660
661
662 /* ---------------------------------------------------------------------------
663  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
664  * used by Control.Concurrent for error checking.
665  * ------------------------------------------------------------------------- */
666  
667 HsBool
668 rtsSupportsBoundThreads(void)
669 {
670 #if defined(THREADED_RTS)
671   return HS_BOOL_TRUE;
672 #else
673   return HS_BOOL_FALSE;
674 #endif
675 }
676
677 /* ---------------------------------------------------------------------------
678  * isThreadBound(tso): check whether tso is bound to an OS thread.
679  * ------------------------------------------------------------------------- */
680  
681 StgBool
682 isThreadBound(StgTSO* tso USED_IF_THREADS)
683 {
684 #if defined(THREADED_RTS)
685   return (tso->bound != NULL);
686 #endif
687   return rtsFalse;
688 }
689
690 /* ----------------------------------------------------------------------------
691  * Debugging: why is a thread blocked
692  * ------------------------------------------------------------------------- */
693
694 #if DEBUG
695 void
696 printThreadBlockage(StgTSO *tso)
697 {
698   switch (tso->why_blocked) {
699   case BlockedOnRead:
700     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
701     break;
702   case BlockedOnWrite:
703     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
704     break;
705 #if defined(mingw32_HOST_OS)
706     case BlockedOnDoProc:
707     debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
708     break;
709 #endif
710   case BlockedOnDelay:
711     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
712     break;
713   case BlockedOnMVar:
714     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
715     break;
716   case BlockedOnException:
717     debugBelch("is blocked on delivering an exception to thread %lu",
718                (unsigned long)tso->block_info.tso->id);
719     break;
720   case BlockedOnBlackHole:
721     debugBelch("is blocked on a black hole");
722     break;
723   case NotBlocked:
724     debugBelch("is not blocked");
725     break;
726 #if defined(PARALLEL_HASKELL)
727   case BlockedOnGA:
728     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
729             tso->block_info.closure, info_type(tso->block_info.closure));
730     break;
731   case BlockedOnGA_NoSend:
732     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
733             tso->block_info.closure, info_type(tso->block_info.closure));
734     break;
735 #endif
736   case BlockedOnCCall:
737     debugBelch("is blocked on an external call");
738     break;
739   case BlockedOnCCall_NoUnblockExc:
740     debugBelch("is blocked on an external call (exceptions were already blocked)");
741     break;
742   case BlockedOnSTM:
743     debugBelch("is blocked on an STM operation");
744     break;
745   default:
746     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
747          tso->why_blocked, tso->id, tso);
748   }
749 }
750
751 void
752 printThreadStatus(StgTSO *t)
753 {
754   debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
755     {
756       void *label = lookupThreadLabel(t->id);
757       if (label) debugBelch("[\"%s\"] ",(char *)label);
758     }
759     if (t->what_next == ThreadRelocated) {
760         debugBelch("has been relocated...\n");
761     } else {
762         switch (t->what_next) {
763         case ThreadKilled:
764             debugBelch("has been killed");
765             break;
766         case ThreadComplete:
767             debugBelch("has completed");
768             break;
769         default:
770             printThreadBlockage(t);
771         }
772         if (t->flags & TSO_DIRTY) {
773             debugBelch(" (TSO_DIRTY)");
774         } else if (t->flags & TSO_LINK_DIRTY) {
775             debugBelch(" (TSO_LINK_DIRTY)");
776         }
777         debugBelch("\n");
778     }
779 }
780
781 void
782 printAllThreads(void)
783 {
784   StgTSO *t, *next;
785   nat i, s;
786   Capability *cap;
787
788 # if defined(GRAN)
789   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
790   ullong_format_string(TIME_ON_PROC(CurrentProc), 
791                        time_string, rtsFalse/*no commas!*/);
792
793   debugBelch("all threads at [%s]:\n", time_string);
794 # elif defined(PARALLEL_HASKELL)
795   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
796   ullong_format_string(CURRENT_TIME,
797                        time_string, rtsFalse/*no commas!*/);
798
799   debugBelch("all threads at [%s]:\n", time_string);
800 # else
801   debugBelch("all threads:\n");
802 # endif
803
804   for (i = 0; i < n_capabilities; i++) {
805       cap = &capabilities[i];
806       debugBelch("threads on capability %d:\n", cap->no);
807       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
808           printThreadStatus(t);
809       }
810   }
811
812   debugBelch("other threads:\n");
813   for (s = 0; s < total_steps; s++) {
814     for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
815       if (t->why_blocked != NotBlocked) {
816           printThreadStatus(t);
817       }
818       if (t->what_next == ThreadRelocated) {
819           next = t->_link;
820       } else {
821           next = t->global_link;
822       }
823     }
824   }
825 }
826
827 // useful from gdb
828 void 
829 printThreadQueue(StgTSO *t)
830 {
831     nat i = 0;
832     for (; t != END_TSO_QUEUE; t = t->_link) {
833         printThreadStatus(t);
834         i++;
835     }
836     debugBelch("%d threads on queue\n", i);
837 }
838
839 /* 
840    Print a whole blocking queue attached to node (debugging only).
841 */
842 # if defined(PARALLEL_HASKELL)
843 void 
844 print_bq (StgClosure *node)
845 {
846   StgBlockingQueueElement *bqe;
847   StgTSO *tso;
848   rtsBool end;
849
850   debugBelch("## BQ of closure %p (%s): ",
851           node, info_type(node));
852
853   /* should cover all closures that may have a blocking queue */
854   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
855          get_itbl(node)->type == FETCH_ME_BQ ||
856          get_itbl(node)->type == RBH ||
857          get_itbl(node)->type == MVAR);
858     
859   ASSERT(node!=(StgClosure*)NULL);         // sanity check
860
861   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
862 }
863
864 /* 
865    Print a whole blocking queue starting with the element bqe.
866 */
867 void 
868 print_bqe (StgBlockingQueueElement *bqe)
869 {
870   rtsBool end;
871
872   /* 
873      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
874   */
875   for (end = (bqe==END_BQ_QUEUE);
876        !end; // iterate until bqe points to a CONSTR
877        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
878        bqe = end ? END_BQ_QUEUE : bqe->link) {
879     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
880     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
881     /* types of closures that may appear in a blocking queue */
882     ASSERT(get_itbl(bqe)->type == TSO ||           
883            get_itbl(bqe)->type == BLOCKED_FETCH || 
884            get_itbl(bqe)->type == CONSTR); 
885     /* only BQs of an RBH end with an RBH_Save closure */
886     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
887
888     switch (get_itbl(bqe)->type) {
889     case TSO:
890       debugBelch(" TSO %u (%x),",
891               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
892       break;
893     case BLOCKED_FETCH:
894       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
895               ((StgBlockedFetch *)bqe)->node, 
896               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
897               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
898               ((StgBlockedFetch *)bqe)->ga.weight);
899       break;
900     case CONSTR:
901       debugBelch(" %s (IP %p),",
902               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
903                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
904                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
905                "RBH_Save_?"), get_itbl(bqe));
906       break;
907     default:
908       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
909            info_type((StgClosure *)bqe)); // , node, info_type(node));
910       break;
911     }
912   } /* for */
913   debugBelch("\n");
914 }
915 # elif defined(GRAN)
916 void 
917 print_bq (StgClosure *node)
918 {
919   StgBlockingQueueElement *bqe;
920   PEs node_loc, tso_loc;
921   rtsBool end;
922
923   /* should cover all closures that may have a blocking queue */
924   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
925          get_itbl(node)->type == FETCH_ME_BQ ||
926          get_itbl(node)->type == RBH);
927     
928   ASSERT(node!=(StgClosure*)NULL);         // sanity check
929   node_loc = where_is(node);
930
931   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
932           node, info_type(node), node_loc);
933
934   /* 
935      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
936   */
937   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
938        !end; // iterate until bqe points to a CONSTR
939        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
940     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
941     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
942     /* types of closures that may appear in a blocking queue */
943     ASSERT(get_itbl(bqe)->type == TSO ||           
944            get_itbl(bqe)->type == CONSTR); 
945     /* only BQs of an RBH end with an RBH_Save closure */
946     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
947
948     tso_loc = where_is((StgClosure *)bqe);
949     switch (get_itbl(bqe)->type) {
950     case TSO:
951       debugBelch(" TSO %d (%p) on [PE %d],",
952               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
953       break;
954     case CONSTR:
955       debugBelch(" %s (IP %p),",
956               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
957                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
958                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
959                "RBH_Save_?"), get_itbl(bqe));
960       break;
961     default:
962       barf("Unexpected closure type %s in blocking queue of %p (%s)",
963            info_type((StgClosure *)bqe), node, info_type(node));
964       break;
965     }
966   } /* for */
967   debugBelch("\n");
968 }
969 # endif
970
971 #if defined(PARALLEL_HASKELL)
972 nat
973 run_queue_len(void)
974 {
975     nat i;
976     StgTSO *tso;
977     
978     for (i=0, tso=run_queue_hd; 
979          tso != END_TSO_QUEUE;
980          i++, tso=tso->link) {
981         /* nothing */
982     }
983         
984     return i;
985 }
986 #endif
987
988 #endif /* DEBUG */