rename StgSync to SpinLock
[ghc-hetmet.git] / rts / Threads.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2006
4  *
5  * Thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "Storage.h"
13 #include "Threads.h"
14 #include "RtsFlags.h"
15 #include "STM.h"
16 #include "Schedule.h"
17 #include "Trace.h"
18 #include "ThreadLabels.h"
19
20 /* Next thread ID to allocate.
21  * LOCK: sched_mutex
22  */
23 static StgThreadID next_thread_id = 1;
24
25 /* The smallest stack size that makes any sense is:
26  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
27  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
28  *  + 1                       (the closure to enter)
29  *  + 1                       (stg_ap_v_ret)
30  *  + 1                       (spare slot req'd by stg_ap_v_ret)
31  *
32  * A thread with this stack will bomb immediately with a stack
33  * overflow, which will increase its stack size.  
34  */
35 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
36
37 /* ---------------------------------------------------------------------------
38    Create a new thread.
39
40    The new thread starts with the given stack size.  Before the
41    scheduler can run, however, this thread needs to have a closure
42    (and possibly some arguments) pushed on its stack.  See
43    pushClosure() in Schedule.h.
44
45    createGenThread() and createIOThread() (in SchedAPI.h) are
46    convenient packaged versions of this function.
47
48    currently pri (priority) is only used in a GRAN setup -- HWL
49    ------------------------------------------------------------------------ */
50 #if defined(GRAN)
51 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
52 StgTSO *
53 createThread(nat size, StgInt pri)
54 #else
55 StgTSO *
56 createThread(Capability *cap, nat size)
57 #endif
58 {
59     StgTSO *tso;
60     nat stack_size;
61
62     /* sched_mutex is *not* required */
63
64     /* First check whether we should create a thread at all */
65 #if defined(PARALLEL_HASKELL)
66     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
67     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
68         threadsIgnored++;
69         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
70                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
71         return END_TSO_QUEUE;
72     }
73     threadsCreated++;
74 #endif
75
76 #if defined(GRAN)
77     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
78 #endif
79
80     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
81
82     /* catch ridiculously small stack sizes */
83     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
84         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
85     }
86
87     stack_size = size - TSO_STRUCT_SIZEW;
88     
89     tso = (StgTSO *)allocateLocal(cap, size);
90     TICK_ALLOC_TSO(stack_size, 0);
91
92     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
93 #if defined(GRAN)
94     SET_GRAN_HDR(tso, ThisPE);
95 #endif
96
97     // Always start with the compiled code evaluator
98     tso->what_next = ThreadRunGHC;
99
100     tso->why_blocked  = NotBlocked;
101     tso->blocked_exceptions = END_TSO_QUEUE;
102     tso->flags = TSO_DIRTY;
103     
104     tso->saved_errno = 0;
105     tso->bound = NULL;
106     tso->cap = cap;
107     
108     tso->stack_size     = stack_size;
109     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
110                           - TSO_STRUCT_SIZEW;
111     tso->sp             = (P_)&(tso->stack) + stack_size;
112
113     tso->trec = NO_TREC;
114     
115 #ifdef PROFILING
116     tso->prof.CCCS = CCS_MAIN;
117 #endif
118     
119   /* put a stop frame on the stack */
120     tso->sp -= sizeofW(StgStopFrame);
121     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
122     tso->link = END_TSO_QUEUE;
123     
124   // ToDo: check this
125 #if defined(GRAN)
126     /* uses more flexible routine in GranSim */
127     insertThread(tso, CurrentProc);
128 #else
129     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
130      * from its creation
131      */
132 #endif
133     
134 #if defined(GRAN) 
135     if (RtsFlags.GranFlags.GranSimStats.Full) 
136         DumpGranEvent(GR_START,tso);
137 #elif defined(PARALLEL_HASKELL)
138     if (RtsFlags.ParFlags.ParStats.Full) 
139         DumpGranEvent(GR_STARTQ,tso);
140     /* HACk to avoid SCHEDULE 
141        LastTSO = tso; */
142 #endif
143     
144     /* Link the new thread on the global thread list.
145      */
146     ACQUIRE_LOCK(&sched_mutex);
147     tso->id = next_thread_id++;  // while we have the mutex
148     tso->global_link = all_threads;
149     all_threads = tso;
150     RELEASE_LOCK(&sched_mutex);
151     
152 #if defined(DIST)
153     tso->dist.priority = MandatoryPriority; //by default that is...
154 #endif
155     
156 #if defined(GRAN)
157     tso->gran.pri = pri;
158 # if defined(DEBUG)
159     tso->gran.magic = TSO_MAGIC; // debugging only
160 # endif
161     tso->gran.sparkname   = 0;
162     tso->gran.startedat   = CURRENT_TIME; 
163     tso->gran.exported    = 0;
164     tso->gran.basicblocks = 0;
165     tso->gran.allocs      = 0;
166     tso->gran.exectime    = 0;
167     tso->gran.fetchtime   = 0;
168     tso->gran.fetchcount  = 0;
169     tso->gran.blocktime   = 0;
170     tso->gran.blockcount  = 0;
171     tso->gran.blockedat   = 0;
172     tso->gran.globalsparks = 0;
173     tso->gran.localsparks  = 0;
174     if (RtsFlags.GranFlags.Light)
175         tso->gran.clock  = Now; /* local clock */
176     else
177         tso->gran.clock  = 0;
178     
179     IF_DEBUG(gran,printTSO(tso));
180 #elif defined(PARALLEL_HASKELL)
181 # if defined(DEBUG)
182     tso->par.magic = TSO_MAGIC; // debugging only
183 # endif
184     tso->par.sparkname   = 0;
185     tso->par.startedat   = CURRENT_TIME; 
186     tso->par.exported    = 0;
187     tso->par.basicblocks = 0;
188     tso->par.allocs      = 0;
189     tso->par.exectime    = 0;
190     tso->par.fetchtime   = 0;
191     tso->par.fetchcount  = 0;
192     tso->par.blocktime   = 0;
193     tso->par.blockcount  = 0;
194     tso->par.blockedat   = 0;
195     tso->par.globalsparks = 0;
196     tso->par.localsparks  = 0;
197 #endif
198     
199 #if defined(GRAN)
200     globalGranStats.tot_threads_created++;
201     globalGranStats.threads_created_on_PE[CurrentProc]++;
202     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
203     globalGranStats.tot_sq_probes++;
204 #elif defined(PARALLEL_HASKELL)
205     // collect parallel global statistics (currently done together with GC stats)
206     if (RtsFlags.ParFlags.ParStats.Global &&
207         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
208         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
209         globalParStats.tot_threads_created++;
210     }
211 #endif 
212     
213 #if defined(GRAN)
214     debugTrace(GRAN_DEBUG_pri,
215                "==__ schedule: Created TSO %d (%p);",
216                CurrentProc, tso, tso->id);
217 #elif defined(PARALLEL_HASKELL)
218     debugTrace(PAR_DEBUG_verbose,
219                "==__ schedule: Created TSO %d (%p); %d threads active",
220                (long)tso->id, tso, advisory_thread_count);
221 #else
222     debugTrace(DEBUG_sched,
223                "created thread %ld, stack size = %lx words", 
224                (long)tso->id, (long)tso->stack_size);
225 #endif    
226     return tso;
227 }
228
229 #if defined(PAR)
230 /* RFP:
231    all parallel thread creation calls should fall through the following routine.
232 */
233 StgTSO *
234 createThreadFromSpark(rtsSpark spark) 
235 { StgTSO *tso;
236   ASSERT(spark != (rtsSpark)NULL);
237 // JB: TAKE CARE OF THIS COUNTER! BUGGY
238   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
239   { threadsIgnored++;
240     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
241           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
242     return END_TSO_QUEUE;
243   }
244   else
245   { threadsCreated++;
246     tso = createThread(RtsFlags.GcFlags.initialStkSize);
247     if (tso==END_TSO_QUEUE)     
248       barf("createSparkThread: Cannot create TSO");
249 #if defined(DIST)
250     tso->priority = AdvisoryPriority;
251 #endif
252     pushClosure(tso,spark);
253     addToRunQueue(tso);
254     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
255   }
256   return tso;
257 }
258 #endif
259
260 /* ---------------------------------------------------------------------------
261  * Comparing Thread ids.
262  *
263  * This is used from STG land in the implementation of the
264  * instances of Eq/Ord for ThreadIds.
265  * ------------------------------------------------------------------------ */
266
267 int
268 cmp_thread(StgPtr tso1, StgPtr tso2) 
269
270   StgThreadID id1 = ((StgTSO *)tso1)->id; 
271   StgThreadID id2 = ((StgTSO *)tso2)->id;
272  
273   if (id1 < id2) return (-1);
274   if (id1 > id2) return 1;
275   return 0;
276 }
277
278 /* ---------------------------------------------------------------------------
279  * Fetching the ThreadID from an StgTSO.
280  *
281  * This is used in the implementation of Show for ThreadIds.
282  * ------------------------------------------------------------------------ */
283 int
284 rts_getThreadId(StgPtr tso) 
285 {
286   return ((StgTSO *)tso)->id;
287 }
288
289 /* -----------------------------------------------------------------------------
290    Remove a thread from a queue.
291    Fails fatally if the TSO is not on the queue.
292    -------------------------------------------------------------------------- */
293
294 void
295 removeThreadFromQueue (StgTSO **queue, StgTSO *tso)
296 {
297     StgTSO *t, *prev;
298
299     prev = NULL;
300     for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->link) {
301         if (t == tso) {
302             if (prev) {
303                 prev->link = t->link;
304             } else {
305                 *queue = t->link;
306             }
307             return;
308         }
309     }
310     barf("removeThreadFromQueue: not found");
311 }
312
313 void
314 removeThreadFromDeQueue (StgTSO **head, StgTSO **tail, StgTSO *tso)
315 {
316     StgTSO *t, *prev;
317
318     prev = NULL;
319     for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->link) {
320         if (t == tso) {
321             if (prev) {
322                 prev->link = t->link;
323             } else {
324                 *head = t->link;
325             }
326             if (*tail == tso) {
327                 if (prev) {
328                     *tail = prev;
329                 } else {
330                     *tail = END_TSO_QUEUE;
331                 }
332             }
333             return;
334         }
335     }
336     barf("removeThreadFromMVarQueue: not found");
337 }
338
339 void
340 removeThreadFromMVarQueue (StgMVar *mvar, StgTSO *tso)
341 {
342     removeThreadFromDeQueue (&mvar->head, &mvar->tail, tso);
343 }
344
345 /* ----------------------------------------------------------------------------
346    unblockOne()
347
348    unblock a single thread.
349    ------------------------------------------------------------------------- */
350
351 #if defined(GRAN)
352 STATIC_INLINE void
353 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
354 {
355 }
356 #elif defined(PARALLEL_HASKELL)
357 STATIC_INLINE void
358 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
359 {
360   /* write RESUME events to log file and
361      update blocked and fetch time (depending on type of the orig closure) */
362   if (RtsFlags.ParFlags.ParStats.Full) {
363     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
364                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
365                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
366     if (emptyRunQueue())
367       emitSchedule = rtsTrue;
368
369     switch (get_itbl(node)->type) {
370         case FETCH_ME_BQ:
371           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
372           break;
373         case RBH:
374         case FETCH_ME:
375         case BLACKHOLE_BQ:
376           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
377           break;
378 #ifdef DIST
379         case MVAR:
380           break;
381 #endif    
382         default:
383           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
384         }
385       }
386 }
387 #endif
388
389 #if defined(GRAN)
390 StgBlockingQueueElement *
391 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
392 {
393     StgTSO *tso;
394     PEs node_loc, tso_loc;
395
396     node_loc = where_is(node); // should be lifted out of loop
397     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
398     tso_loc = where_is((StgClosure *)tso);
399     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
400       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
401       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
402       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
403       // insertThread(tso, node_loc);
404       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
405                 ResumeThread,
406                 tso, node, (rtsSpark*)NULL);
407       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
408       // len_local++;
409       // len++;
410     } else { // TSO is remote (actually should be FMBQ)
411       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
412                                   RtsFlags.GranFlags.Costs.gunblocktime +
413                                   RtsFlags.GranFlags.Costs.latency;
414       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
415                 UnblockThread,
416                 tso, node, (rtsSpark*)NULL);
417       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
418       // len++;
419     }
420     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
421     IF_GRAN_DEBUG(bq,
422                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
423                           (node_loc==tso_loc ? "Local" : "Global"), 
424                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
425     tso->block_info.closure = NULL;
426     debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)", 
427                tso->id, tso));
428 }
429 #elif defined(PARALLEL_HASKELL)
430 StgBlockingQueueElement *
431 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
432 {
433     StgBlockingQueueElement *next;
434
435     switch (get_itbl(bqe)->type) {
436     case TSO:
437       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
438       /* if it's a TSO just push it onto the run_queue */
439       next = bqe->link;
440       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
441       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
442       threadRunnable();
443       unblockCount(bqe, node);
444       /* reset blocking status after dumping event */
445       ((StgTSO *)bqe)->why_blocked = NotBlocked;
446       break;
447
448     case BLOCKED_FETCH:
449       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
450       next = bqe->link;
451       bqe->link = (StgBlockingQueueElement *)PendingFetches;
452       PendingFetches = (StgBlockedFetch *)bqe;
453       break;
454
455 # if defined(DEBUG)
456       /* can ignore this case in a non-debugging setup; 
457          see comments on RBHSave closures above */
458     case CONSTR:
459       /* check that the closure is an RBHSave closure */
460       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
461              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
462              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
463       break;
464
465     default:
466       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
467            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
468            (StgClosure *)bqe);
469 # endif
470     }
471   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
472   return next;
473 }
474 #endif
475
476 StgTSO *
477 unblockOne (Capability *cap, StgTSO *tso)
478 {
479     return unblockOne_(cap,tso,rtsTrue); // allow migration
480 }
481
482 StgTSO *
483 unblockOne_ (Capability *cap, StgTSO *tso, 
484              rtsBool allow_migrate USED_IF_THREADS)
485 {
486   StgTSO *next;
487
488   // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
489   ASSERT(tso->why_blocked != NotBlocked);
490
491   tso->why_blocked = NotBlocked;
492   next = tso->link;
493   tso->link = END_TSO_QUEUE;
494
495 #if defined(THREADED_RTS)
496   if (tso->cap == cap || (!tsoLocked(tso) && 
497                           allow_migrate && 
498                           RtsFlags.ParFlags.wakeupMigrate)) {
499       // We are waking up this thread on the current Capability, which
500       // might involve migrating it from the Capability it was last on.
501       if (tso->bound) {
502           ASSERT(tso->bound->cap == tso->cap);
503           tso->bound->cap = cap;
504       }
505       tso->cap = cap;
506       appendToRunQueue(cap,tso);
507       // we're holding a newly woken thread, make sure we context switch
508       // quickly so we can migrate it if necessary.
509       context_switch = 1;
510   } else {
511       // we'll try to wake it up on the Capability it was last on.
512       wakeupThreadOnCapability_lock(tso->cap, tso);
513   }
514 #else
515   appendToRunQueue(cap,tso);
516   context_switch = 1;
517 #endif
518
519   debugTrace(DEBUG_sched,
520              "waking up thread %ld on cap %d",
521              (long)tso->id, tso->cap->no);
522
523   return next;
524 }
525
526 /* ----------------------------------------------------------------------------
527    awakenBlockedQueue
528
529    wakes up all the threads on the specified queue.
530    ------------------------------------------------------------------------- */
531
532 #if defined(GRAN)
533 void
534 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
535 {
536   StgBlockingQueueElement *bqe;
537   PEs node_loc;
538   nat len = 0; 
539
540   IF_GRAN_DEBUG(bq, 
541                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
542                       node, CurrentProc, CurrentTime[CurrentProc], 
543                       CurrentTSO->id, CurrentTSO));
544
545   node_loc = where_is(node);
546
547   ASSERT(q == END_BQ_QUEUE ||
548          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
549          get_itbl(q)->type == CONSTR); // closure (type constructor)
550   ASSERT(is_unique(node));
551
552   /* FAKE FETCH: magically copy the node to the tso's proc;
553      no Fetch necessary because in reality the node should not have been 
554      moved to the other PE in the first place
555   */
556   if (CurrentProc!=node_loc) {
557     IF_GRAN_DEBUG(bq, 
558                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
559                         node, node_loc, CurrentProc, CurrentTSO->id, 
560                         // CurrentTSO, where_is(CurrentTSO),
561                         node->header.gran.procs));
562     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
563     IF_GRAN_DEBUG(bq, 
564                   debugBelch("## new bitmask of node %p is %#x\n",
565                         node, node->header.gran.procs));
566     if (RtsFlags.GranFlags.GranSimStats.Global) {
567       globalGranStats.tot_fake_fetches++;
568     }
569   }
570
571   bqe = q;
572   // ToDo: check: ASSERT(CurrentProc==node_loc);
573   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
574     //next = bqe->link;
575     /* 
576        bqe points to the current element in the queue
577        next points to the next element in the queue
578     */
579     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
580     //tso_loc = where_is(tso);
581     len++;
582     bqe = unblockOne(bqe, node);
583   }
584
585   /* if this is the BQ of an RBH, we have to put back the info ripped out of
586      the closure to make room for the anchor of the BQ */
587   if (bqe!=END_BQ_QUEUE) {
588     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
589     /*
590     ASSERT((info_ptr==&RBH_Save_0_info) ||
591            (info_ptr==&RBH_Save_1_info) ||
592            (info_ptr==&RBH_Save_2_info));
593     */
594     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
595     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
596     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
597
598     IF_GRAN_DEBUG(bq,
599                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
600                         node, info_type(node)));
601   }
602
603   /* statistics gathering */
604   if (RtsFlags.GranFlags.GranSimStats.Global) {
605     // globalGranStats.tot_bq_processing_time += bq_processing_time;
606     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
607     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
608     globalGranStats.tot_awbq++;             // total no. of bqs awakened
609   }
610   IF_GRAN_DEBUG(bq,
611                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
612                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
613 }
614 #elif defined(PARALLEL_HASKELL)
615 void 
616 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
617 {
618   StgBlockingQueueElement *bqe;
619
620   IF_PAR_DEBUG(verbose, 
621                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
622                      node, mytid));
623 #ifdef DIST  
624   //RFP
625   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
626     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
627     return;
628   }
629 #endif
630   
631   ASSERT(q == END_BQ_QUEUE ||
632          get_itbl(q)->type == TSO ||           
633          get_itbl(q)->type == BLOCKED_FETCH || 
634          get_itbl(q)->type == CONSTR); 
635
636   bqe = q;
637   while (get_itbl(bqe)->type==TSO || 
638          get_itbl(bqe)->type==BLOCKED_FETCH) {
639     bqe = unblockOne(bqe, node);
640   }
641 }
642
643 #else   /* !GRAN && !PARALLEL_HASKELL */
644
645 void
646 awakenBlockedQueue(Capability *cap, StgTSO *tso)
647 {
648     while (tso != END_TSO_QUEUE) {
649         tso = unblockOne(cap,tso);
650     }
651 }
652 #endif
653
654
655 /* ---------------------------------------------------------------------------
656  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
657  * used by Control.Concurrent for error checking.
658  * ------------------------------------------------------------------------- */
659  
660 HsBool
661 rtsSupportsBoundThreads(void)
662 {
663 #if defined(THREADED_RTS)
664   return HS_BOOL_TRUE;
665 #else
666   return HS_BOOL_FALSE;
667 #endif
668 }
669
670 /* ---------------------------------------------------------------------------
671  * isThreadBound(tso): check whether tso is bound to an OS thread.
672  * ------------------------------------------------------------------------- */
673  
674 StgBool
675 isThreadBound(StgTSO* tso USED_IF_THREADS)
676 {
677 #if defined(THREADED_RTS)
678   return (tso->bound != NULL);
679 #endif
680   return rtsFalse;
681 }
682
683 /* ----------------------------------------------------------------------------
684  * Debugging: why is a thread blocked
685  * ------------------------------------------------------------------------- */
686
687 #if DEBUG
688 void
689 printThreadBlockage(StgTSO *tso)
690 {
691   switch (tso->why_blocked) {
692   case BlockedOnRead:
693     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
694     break;
695   case BlockedOnWrite:
696     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
697     break;
698 #if defined(mingw32_HOST_OS)
699     case BlockedOnDoProc:
700     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
701     break;
702 #endif
703   case BlockedOnDelay:
704     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
705     break;
706   case BlockedOnMVar:
707     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
708     break;
709   case BlockedOnException:
710     debugBelch("is blocked on delivering an exception to thread %lu",
711                (unsigned long)tso->block_info.tso->id);
712     break;
713   case BlockedOnBlackHole:
714     debugBelch("is blocked on a black hole");
715     break;
716   case NotBlocked:
717     debugBelch("is not blocked");
718     break;
719 #if defined(PARALLEL_HASKELL)
720   case BlockedOnGA:
721     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
722             tso->block_info.closure, info_type(tso->block_info.closure));
723     break;
724   case BlockedOnGA_NoSend:
725     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
726             tso->block_info.closure, info_type(tso->block_info.closure));
727     break;
728 #endif
729   case BlockedOnCCall:
730     debugBelch("is blocked on an external call");
731     break;
732   case BlockedOnCCall_NoUnblockExc:
733     debugBelch("is blocked on an external call (exceptions were already blocked)");
734     break;
735   case BlockedOnSTM:
736     debugBelch("is blocked on an STM operation");
737     break;
738   default:
739     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
740          tso->why_blocked, tso->id, tso);
741   }
742 }
743
744 void
745 printThreadStatus(StgTSO *t)
746 {
747   debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
748     {
749       void *label = lookupThreadLabel(t->id);
750       if (label) debugBelch("[\"%s\"] ",(char *)label);
751     }
752     if (t->what_next == ThreadRelocated) {
753         debugBelch("has been relocated...\n");
754     } else {
755         switch (t->what_next) {
756         case ThreadKilled:
757             debugBelch("has been killed");
758             break;
759         case ThreadComplete:
760             debugBelch("has completed");
761             break;
762         default:
763             printThreadBlockage(t);
764         }
765         debugBelch("\n");
766     }
767 }
768
769 void
770 printAllThreads(void)
771 {
772   StgTSO *t, *next;
773   nat i;
774   Capability *cap;
775
776 # if defined(GRAN)
777   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
778   ullong_format_string(TIME_ON_PROC(CurrentProc), 
779                        time_string, rtsFalse/*no commas!*/);
780
781   debugBelch("all threads at [%s]:\n", time_string);
782 # elif defined(PARALLEL_HASKELL)
783   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
784   ullong_format_string(CURRENT_TIME,
785                        time_string, rtsFalse/*no commas!*/);
786
787   debugBelch("all threads at [%s]:\n", time_string);
788 # else
789   debugBelch("all threads:\n");
790 # endif
791
792   for (i = 0; i < n_capabilities; i++) {
793       cap = &capabilities[i];
794       debugBelch("threads on capability %d:\n", cap->no);
795       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
796           printThreadStatus(t);
797       }
798   }
799
800   debugBelch("other threads:\n");
801   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
802       if (t->why_blocked != NotBlocked) {
803           printThreadStatus(t);
804       }
805       if (t->what_next == ThreadRelocated) {
806           next = t->link;
807       } else {
808           next = t->global_link;
809       }
810   }
811 }
812
813 // useful from gdb
814 void 
815 printThreadQueue(StgTSO *t)
816 {
817     nat i = 0;
818     for (; t != END_TSO_QUEUE; t = t->link) {
819         printThreadStatus(t);
820         i++;
821     }
822     debugBelch("%d threads on queue\n", i);
823 }
824
825 /* 
826    Print a whole blocking queue attached to node (debugging only).
827 */
828 # if defined(PARALLEL_HASKELL)
829 void 
830 print_bq (StgClosure *node)
831 {
832   StgBlockingQueueElement *bqe;
833   StgTSO *tso;
834   rtsBool end;
835
836   debugBelch("## BQ of closure %p (%s): ",
837           node, info_type(node));
838
839   /* should cover all closures that may have a blocking queue */
840   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
841          get_itbl(node)->type == FETCH_ME_BQ ||
842          get_itbl(node)->type == RBH ||
843          get_itbl(node)->type == MVAR);
844     
845   ASSERT(node!=(StgClosure*)NULL);         // sanity check
846
847   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
848 }
849
850 /* 
851    Print a whole blocking queue starting with the element bqe.
852 */
853 void 
854 print_bqe (StgBlockingQueueElement *bqe)
855 {
856   rtsBool end;
857
858   /* 
859      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
860   */
861   for (end = (bqe==END_BQ_QUEUE);
862        !end; // iterate until bqe points to a CONSTR
863        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
864        bqe = end ? END_BQ_QUEUE : bqe->link) {
865     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
866     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
867     /* types of closures that may appear in a blocking queue */
868     ASSERT(get_itbl(bqe)->type == TSO ||           
869            get_itbl(bqe)->type == BLOCKED_FETCH || 
870            get_itbl(bqe)->type == CONSTR); 
871     /* only BQs of an RBH end with an RBH_Save closure */
872     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
873
874     switch (get_itbl(bqe)->type) {
875     case TSO:
876       debugBelch(" TSO %u (%x),",
877               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
878       break;
879     case BLOCKED_FETCH:
880       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
881               ((StgBlockedFetch *)bqe)->node, 
882               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
883               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
884               ((StgBlockedFetch *)bqe)->ga.weight);
885       break;
886     case CONSTR:
887       debugBelch(" %s (IP %p),",
888               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
889                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
890                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
891                "RBH_Save_?"), get_itbl(bqe));
892       break;
893     default:
894       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
895            info_type((StgClosure *)bqe)); // , node, info_type(node));
896       break;
897     }
898   } /* for */
899   debugBelch("\n");
900 }
901 # elif defined(GRAN)
902 void 
903 print_bq (StgClosure *node)
904 {
905   StgBlockingQueueElement *bqe;
906   PEs node_loc, tso_loc;
907   rtsBool end;
908
909   /* should cover all closures that may have a blocking queue */
910   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
911          get_itbl(node)->type == FETCH_ME_BQ ||
912          get_itbl(node)->type == RBH);
913     
914   ASSERT(node!=(StgClosure*)NULL);         // sanity check
915   node_loc = where_is(node);
916
917   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
918           node, info_type(node), node_loc);
919
920   /* 
921      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
922   */
923   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
924        !end; // iterate until bqe points to a CONSTR
925        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
926     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
927     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
928     /* types of closures that may appear in a blocking queue */
929     ASSERT(get_itbl(bqe)->type == TSO ||           
930            get_itbl(bqe)->type == CONSTR); 
931     /* only BQs of an RBH end with an RBH_Save closure */
932     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
933
934     tso_loc = where_is((StgClosure *)bqe);
935     switch (get_itbl(bqe)->type) {
936     case TSO:
937       debugBelch(" TSO %d (%p) on [PE %d],",
938               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
939       break;
940     case CONSTR:
941       debugBelch(" %s (IP %p),",
942               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
943                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
944                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
945                "RBH_Save_?"), get_itbl(bqe));
946       break;
947     default:
948       barf("Unexpected closure type %s in blocking queue of %p (%s)",
949            info_type((StgClosure *)bqe), node, info_type(node));
950       break;
951     }
952   } /* for */
953   debugBelch("\n");
954 }
955 # endif
956
957 #if defined(PARALLEL_HASKELL)
958 nat
959 run_queue_len(void)
960 {
961     nat i;
962     StgTSO *tso;
963     
964     for (i=0, tso=run_queue_hd; 
965          tso != END_TSO_QUEUE;
966          i++, tso=tso->link) {
967         /* nothing */
968     }
969         
970     return i;
971 }
972 #endif
973
974 #endif /* DEBUG */