Don't traverse the entire list of threads on every GC (phase 1)
[ghc-hetmet.git] / rts / Threads.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2006
4  *
5  * Thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "Storage.h"
13 #include "Threads.h"
14 #include "RtsFlags.h"
15 #include "STM.h"
16 #include "Schedule.h"
17 #include "Trace.h"
18 #include "ThreadLabels.h"
19
20 /* Next thread ID to allocate.
21  * LOCK: sched_mutex
22  */
23 static StgThreadID next_thread_id = 1;
24
25 /* The smallest stack size that makes any sense is:
26  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
27  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
28  *  + 1                       (the closure to enter)
29  *  + 1                       (stg_ap_v_ret)
30  *  + 1                       (spare slot req'd by stg_ap_v_ret)
31  *
32  * A thread with this stack will bomb immediately with a stack
33  * overflow, which will increase its stack size.  
34  */
35 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
36
37 /* ---------------------------------------------------------------------------
38    Create a new thread.
39
40    The new thread starts with the given stack size.  Before the
41    scheduler can run, however, this thread needs to have a closure
42    (and possibly some arguments) pushed on its stack.  See
43    pushClosure() in Schedule.h.
44
45    createGenThread() and createIOThread() (in SchedAPI.h) are
46    convenient packaged versions of this function.
47
48    currently pri (priority) is only used in a GRAN setup -- HWL
49    ------------------------------------------------------------------------ */
50 #if defined(GRAN)
51 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
52 StgTSO *
53 createThread(nat size, StgInt pri)
54 #else
55 StgTSO *
56 createThread(Capability *cap, nat size)
57 #endif
58 {
59     StgTSO *tso;
60     nat stack_size;
61
62     /* sched_mutex is *not* required */
63
64     /* First check whether we should create a thread at all */
65 #if defined(PARALLEL_HASKELL)
66     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
67     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
68         threadsIgnored++;
69         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
70                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
71         return END_TSO_QUEUE;
72     }
73     threadsCreated++;
74 #endif
75
76 #if defined(GRAN)
77     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
78 #endif
79
80     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
81
82     /* catch ridiculously small stack sizes */
83     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
84         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
85     }
86
87     stack_size = size - TSO_STRUCT_SIZEW;
88     
89     tso = (StgTSO *)allocateLocal(cap, size);
90     TICK_ALLOC_TSO(stack_size, 0);
91
92     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
93 #if defined(GRAN)
94     SET_GRAN_HDR(tso, ThisPE);
95 #endif
96
97     // Always start with the compiled code evaluator
98     tso->what_next = ThreadRunGHC;
99
100     tso->why_blocked  = NotBlocked;
101     tso->blocked_exceptions = END_TSO_QUEUE;
102     tso->flags = TSO_DIRTY;
103     
104     tso->saved_errno = 0;
105     tso->bound = NULL;
106     tso->cap = cap;
107     
108     tso->stack_size     = stack_size;
109     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
110                           - TSO_STRUCT_SIZEW;
111     tso->sp             = (P_)&(tso->stack) + stack_size;
112
113     tso->trec = NO_TREC;
114     
115 #ifdef PROFILING
116     tso->prof.CCCS = CCS_MAIN;
117 #endif
118     
119   /* put a stop frame on the stack */
120     tso->sp -= sizeofW(StgStopFrame);
121     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
122     tso->_link = END_TSO_QUEUE;
123     
124   // ToDo: check this
125 #if defined(GRAN)
126     /* uses more flexible routine in GranSim */
127     insertThread(tso, CurrentProc);
128 #else
129     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
130      * from its creation
131      */
132 #endif
133     
134 #if defined(GRAN) 
135     if (RtsFlags.GranFlags.GranSimStats.Full) 
136         DumpGranEvent(GR_START,tso);
137 #elif defined(PARALLEL_HASKELL)
138     if (RtsFlags.ParFlags.ParStats.Full) 
139         DumpGranEvent(GR_STARTQ,tso);
140     /* HACk to avoid SCHEDULE 
141        LastTSO = tso; */
142 #endif
143     
144     /* Link the new thread on the global thread list.
145      */
146     ACQUIRE_LOCK(&sched_mutex);
147     tso->id = next_thread_id++;  // while we have the mutex
148     tso->global_link = g0s0->threads;
149     g0s0->threads = tso;
150     RELEASE_LOCK(&sched_mutex);
151     
152 #if defined(DIST)
153     tso->dist.priority = MandatoryPriority; //by default that is...
154 #endif
155     
156 #if defined(GRAN)
157     tso->gran.pri = pri;
158 # if defined(DEBUG)
159     tso->gran.magic = TSO_MAGIC; // debugging only
160 # endif
161     tso->gran.sparkname   = 0;
162     tso->gran.startedat   = CURRENT_TIME; 
163     tso->gran.exported    = 0;
164     tso->gran.basicblocks = 0;
165     tso->gran.allocs      = 0;
166     tso->gran.exectime    = 0;
167     tso->gran.fetchtime   = 0;
168     tso->gran.fetchcount  = 0;
169     tso->gran.blocktime   = 0;
170     tso->gran.blockcount  = 0;
171     tso->gran.blockedat   = 0;
172     tso->gran.globalsparks = 0;
173     tso->gran.localsparks  = 0;
174     if (RtsFlags.GranFlags.Light)
175         tso->gran.clock  = Now; /* local clock */
176     else
177         tso->gran.clock  = 0;
178     
179     IF_DEBUG(gran,printTSO(tso));
180 #elif defined(PARALLEL_HASKELL)
181 # if defined(DEBUG)
182     tso->par.magic = TSO_MAGIC; // debugging only
183 # endif
184     tso->par.sparkname   = 0;
185     tso->par.startedat   = CURRENT_TIME; 
186     tso->par.exported    = 0;
187     tso->par.basicblocks = 0;
188     tso->par.allocs      = 0;
189     tso->par.exectime    = 0;
190     tso->par.fetchtime   = 0;
191     tso->par.fetchcount  = 0;
192     tso->par.blocktime   = 0;
193     tso->par.blockcount  = 0;
194     tso->par.blockedat   = 0;
195     tso->par.globalsparks = 0;
196     tso->par.localsparks  = 0;
197 #endif
198     
199 #if defined(GRAN)
200     globalGranStats.tot_threads_created++;
201     globalGranStats.threads_created_on_PE[CurrentProc]++;
202     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
203     globalGranStats.tot_sq_probes++;
204 #elif defined(PARALLEL_HASKELL)
205     // collect parallel global statistics (currently done together with GC stats)
206     if (RtsFlags.ParFlags.ParStats.Global &&
207         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
208         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
209         globalParStats.tot_threads_created++;
210     }
211 #endif 
212     
213 #if defined(GRAN)
214     debugTrace(GRAN_DEBUG_pri,
215                "==__ schedule: Created TSO %d (%p);",
216                CurrentProc, tso, tso->id);
217 #elif defined(PARALLEL_HASKELL)
218     debugTrace(PAR_DEBUG_verbose,
219                "==__ schedule: Created TSO %d (%p); %d threads active",
220                (long)tso->id, tso, advisory_thread_count);
221 #else
222     debugTrace(DEBUG_sched,
223                "created thread %ld, stack size = %lx words", 
224                (long)tso->id, (long)tso->stack_size);
225 #endif    
226     return tso;
227 }
228
229 #if defined(PAR)
230 /* RFP:
231    all parallel thread creation calls should fall through the following routine.
232 */
233 StgTSO *
234 createThreadFromSpark(rtsSpark spark) 
235 { StgTSO *tso;
236   ASSERT(spark != (rtsSpark)NULL);
237 // JB: TAKE CARE OF THIS COUNTER! BUGGY
238   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
239   { threadsIgnored++;
240     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
241           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
242     return END_TSO_QUEUE;
243   }
244   else
245   { threadsCreated++;
246     tso = createThread(RtsFlags.GcFlags.initialStkSize);
247     if (tso==END_TSO_QUEUE)     
248       barf("createSparkThread: Cannot create TSO");
249 #if defined(DIST)
250     tso->priority = AdvisoryPriority;
251 #endif
252     pushClosure(tso,spark);
253     addToRunQueue(tso);
254     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
255   }
256   return tso;
257 }
258 #endif
259
260 /* ---------------------------------------------------------------------------
261  * Comparing Thread ids.
262  *
263  * This is used from STG land in the implementation of the
264  * instances of Eq/Ord for ThreadIds.
265  * ------------------------------------------------------------------------ */
266
267 int
268 cmp_thread(StgPtr tso1, StgPtr tso2) 
269
270   StgThreadID id1 = ((StgTSO *)tso1)->id; 
271   StgThreadID id2 = ((StgTSO *)tso2)->id;
272  
273   if (id1 < id2) return (-1);
274   if (id1 > id2) return 1;
275   return 0;
276 }
277
278 /* ---------------------------------------------------------------------------
279  * Fetching the ThreadID from an StgTSO.
280  *
281  * This is used in the implementation of Show for ThreadIds.
282  * ------------------------------------------------------------------------ */
283 int
284 rts_getThreadId(StgPtr tso) 
285 {
286   return ((StgTSO *)tso)->id;
287 }
288
289 /* -----------------------------------------------------------------------------
290    Remove a thread from a queue.
291    Fails fatally if the TSO is not on the queue.
292    -------------------------------------------------------------------------- */
293
294 void
295 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
296 {
297     StgTSO *t, *prev;
298
299     prev = NULL;
300     for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
301         if (t == tso) {
302             if (prev) {
303                 setTSOLink(cap,prev,t->_link);
304             } else {
305                 *queue = t->_link;
306             }
307             return;
308         }
309     }
310     barf("removeThreadFromQueue: not found");
311 }
312
313 void
314 removeThreadFromDeQueue (Capability *cap, 
315                          StgTSO **head, StgTSO **tail, StgTSO *tso)
316 {
317     StgTSO *t, *prev;
318
319     prev = NULL;
320     for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
321         if (t == tso) {
322             if (prev) {
323                 setTSOLink(cap,prev,t->_link);
324             } else {
325                 *head = t->_link;
326             }
327             if (*tail == tso) {
328                 if (prev) {
329                     *tail = prev;
330                 } else {
331                     *tail = END_TSO_QUEUE;
332                 }
333             }
334             return;
335         }
336     }
337     barf("removeThreadFromMVarQueue: not found");
338 }
339
340 void
341 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
342 {
343     removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
344 }
345
346 /* ----------------------------------------------------------------------------
347    unblockOne()
348
349    unblock a single thread.
350    ------------------------------------------------------------------------- */
351
352 #if defined(GRAN)
353 STATIC_INLINE void
354 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
355 {
356 }
357 #elif defined(PARALLEL_HASKELL)
358 STATIC_INLINE void
359 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
360 {
361   /* write RESUME events to log file and
362      update blocked and fetch time (depending on type of the orig closure) */
363   if (RtsFlags.ParFlags.ParStats.Full) {
364     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
365                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
366                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
367     if (emptyRunQueue())
368       emitSchedule = rtsTrue;
369
370     switch (get_itbl(node)->type) {
371         case FETCH_ME_BQ:
372           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
373           break;
374         case RBH:
375         case FETCH_ME:
376         case BLACKHOLE_BQ:
377           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
378           break;
379 #ifdef DIST
380         case MVAR:
381           break;
382 #endif    
383         default:
384           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
385         }
386       }
387 }
388 #endif
389
390 #if defined(GRAN)
391 StgBlockingQueueElement *
392 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
393 {
394     StgTSO *tso;
395     PEs node_loc, tso_loc;
396
397     node_loc = where_is(node); // should be lifted out of loop
398     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
399     tso_loc = where_is((StgClosure *)tso);
400     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
401       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
402       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
403       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
404       // insertThread(tso, node_loc);
405       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
406                 ResumeThread,
407                 tso, node, (rtsSpark*)NULL);
408       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
409       // len_local++;
410       // len++;
411     } else { // TSO is remote (actually should be FMBQ)
412       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
413                                   RtsFlags.GranFlags.Costs.gunblocktime +
414                                   RtsFlags.GranFlags.Costs.latency;
415       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
416                 UnblockThread,
417                 tso, node, (rtsSpark*)NULL);
418       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
419       // len++;
420     }
421     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
422     IF_GRAN_DEBUG(bq,
423                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
424                           (node_loc==tso_loc ? "Local" : "Global"), 
425                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
426     tso->block_info.closure = NULL;
427     debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)", 
428                tso->id, tso));
429 }
430 #elif defined(PARALLEL_HASKELL)
431 StgBlockingQueueElement *
432 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
433 {
434     StgBlockingQueueElement *next;
435
436     switch (get_itbl(bqe)->type) {
437     case TSO:
438       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
439       /* if it's a TSO just push it onto the run_queue */
440       next = bqe->link;
441       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
442       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
443       threadRunnable();
444       unblockCount(bqe, node);
445       /* reset blocking status after dumping event */
446       ((StgTSO *)bqe)->why_blocked = NotBlocked;
447       break;
448
449     case BLOCKED_FETCH:
450       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
451       next = bqe->link;
452       bqe->link = (StgBlockingQueueElement *)PendingFetches;
453       PendingFetches = (StgBlockedFetch *)bqe;
454       break;
455
456 # if defined(DEBUG)
457       /* can ignore this case in a non-debugging setup; 
458          see comments on RBHSave closures above */
459     case CONSTR:
460       /* check that the closure is an RBHSave closure */
461       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
462              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
463              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
464       break;
465
466     default:
467       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
468            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
469            (StgClosure *)bqe);
470 # endif
471     }
472   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
473   return next;
474 }
475 #endif
476
477 StgTSO *
478 unblockOne (Capability *cap, StgTSO *tso)
479 {
480     return unblockOne_(cap,tso,rtsTrue); // allow migration
481 }
482
483 StgTSO *
484 unblockOne_ (Capability *cap, StgTSO *tso, 
485              rtsBool allow_migrate USED_IF_THREADS)
486 {
487   StgTSO *next;
488
489   // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
490   ASSERT(tso->why_blocked != NotBlocked);
491
492   tso->why_blocked = NotBlocked;
493   next = tso->_link;
494   tso->_link = END_TSO_QUEUE;
495
496 #if defined(THREADED_RTS)
497   if (tso->cap == cap || (!tsoLocked(tso) && 
498                           allow_migrate && 
499                           RtsFlags.ParFlags.wakeupMigrate)) {
500       // We are waking up this thread on the current Capability, which
501       // might involve migrating it from the Capability it was last on.
502       if (tso->bound) {
503           ASSERT(tso->bound->cap == tso->cap);
504           tso->bound->cap = cap;
505       }
506       tso->cap = cap;
507       appendToRunQueue(cap,tso);
508       // we're holding a newly woken thread, make sure we context switch
509       // quickly so we can migrate it if necessary.
510       context_switch = 1;
511   } else {
512       // we'll try to wake it up on the Capability it was last on.
513       wakeupThreadOnCapability_lock(tso->cap, tso);
514   }
515 #else
516   appendToRunQueue(cap,tso);
517   context_switch = 1;
518 #endif
519
520   debugTrace(DEBUG_sched,
521              "waking up thread %ld on cap %d",
522              (long)tso->id, tso->cap->no);
523
524   return next;
525 }
526
527 /* ----------------------------------------------------------------------------
528    awakenBlockedQueue
529
530    wakes up all the threads on the specified queue.
531    ------------------------------------------------------------------------- */
532
533 #if defined(GRAN)
534 void
535 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
536 {
537   StgBlockingQueueElement *bqe;
538   PEs node_loc;
539   nat len = 0; 
540
541   IF_GRAN_DEBUG(bq, 
542                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
543                       node, CurrentProc, CurrentTime[CurrentProc], 
544                       CurrentTSO->id, CurrentTSO));
545
546   node_loc = where_is(node);
547
548   ASSERT(q == END_BQ_QUEUE ||
549          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
550          get_itbl(q)->type == CONSTR); // closure (type constructor)
551   ASSERT(is_unique(node));
552
553   /* FAKE FETCH: magically copy the node to the tso's proc;
554      no Fetch necessary because in reality the node should not have been 
555      moved to the other PE in the first place
556   */
557   if (CurrentProc!=node_loc) {
558     IF_GRAN_DEBUG(bq, 
559                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
560                         node, node_loc, CurrentProc, CurrentTSO->id, 
561                         // CurrentTSO, where_is(CurrentTSO),
562                         node->header.gran.procs));
563     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
564     IF_GRAN_DEBUG(bq, 
565                   debugBelch("## new bitmask of node %p is %#x\n",
566                         node, node->header.gran.procs));
567     if (RtsFlags.GranFlags.GranSimStats.Global) {
568       globalGranStats.tot_fake_fetches++;
569     }
570   }
571
572   bqe = q;
573   // ToDo: check: ASSERT(CurrentProc==node_loc);
574   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
575     //next = bqe->link;
576     /* 
577        bqe points to the current element in the queue
578        next points to the next element in the queue
579     */
580     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
581     //tso_loc = where_is(tso);
582     len++;
583     bqe = unblockOne(bqe, node);
584   }
585
586   /* if this is the BQ of an RBH, we have to put back the info ripped out of
587      the closure to make room for the anchor of the BQ */
588   if (bqe!=END_BQ_QUEUE) {
589     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
590     /*
591     ASSERT((info_ptr==&RBH_Save_0_info) ||
592            (info_ptr==&RBH_Save_1_info) ||
593            (info_ptr==&RBH_Save_2_info));
594     */
595     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
596     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
597     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
598
599     IF_GRAN_DEBUG(bq,
600                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
601                         node, info_type(node)));
602   }
603
604   /* statistics gathering */
605   if (RtsFlags.GranFlags.GranSimStats.Global) {
606     // globalGranStats.tot_bq_processing_time += bq_processing_time;
607     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
608     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
609     globalGranStats.tot_awbq++;             // total no. of bqs awakened
610   }
611   IF_GRAN_DEBUG(bq,
612                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
613                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
614 }
615 #elif defined(PARALLEL_HASKELL)
616 void 
617 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
618 {
619   StgBlockingQueueElement *bqe;
620
621   IF_PAR_DEBUG(verbose, 
622                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
623                      node, mytid));
624 #ifdef DIST  
625   //RFP
626   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
627     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
628     return;
629   }
630 #endif
631   
632   ASSERT(q == END_BQ_QUEUE ||
633          get_itbl(q)->type == TSO ||           
634          get_itbl(q)->type == BLOCKED_FETCH || 
635          get_itbl(q)->type == CONSTR); 
636
637   bqe = q;
638   while (get_itbl(bqe)->type==TSO || 
639          get_itbl(bqe)->type==BLOCKED_FETCH) {
640     bqe = unblockOne(bqe, node);
641   }
642 }
643
644 #else   /* !GRAN && !PARALLEL_HASKELL */
645
646 void
647 awakenBlockedQueue(Capability *cap, StgTSO *tso)
648 {
649     while (tso != END_TSO_QUEUE) {
650         tso = unblockOne(cap,tso);
651     }
652 }
653 #endif
654
655
656 /* ---------------------------------------------------------------------------
657  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
658  * used by Control.Concurrent for error checking.
659  * ------------------------------------------------------------------------- */
660  
661 HsBool
662 rtsSupportsBoundThreads(void)
663 {
664 #if defined(THREADED_RTS)
665   return HS_BOOL_TRUE;
666 #else
667   return HS_BOOL_FALSE;
668 #endif
669 }
670
671 /* ---------------------------------------------------------------------------
672  * isThreadBound(tso): check whether tso is bound to an OS thread.
673  * ------------------------------------------------------------------------- */
674  
675 StgBool
676 isThreadBound(StgTSO* tso USED_IF_THREADS)
677 {
678 #if defined(THREADED_RTS)
679   return (tso->bound != NULL);
680 #endif
681   return rtsFalse;
682 }
683
684 /* ----------------------------------------------------------------------------
685  * Debugging: why is a thread blocked
686  * ------------------------------------------------------------------------- */
687
688 #if DEBUG
689 void
690 printThreadBlockage(StgTSO *tso)
691 {
692   switch (tso->why_blocked) {
693   case BlockedOnRead:
694     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
695     break;
696   case BlockedOnWrite:
697     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
698     break;
699 #if defined(mingw32_HOST_OS)
700     case BlockedOnDoProc:
701     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
702     break;
703 #endif
704   case BlockedOnDelay:
705     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
706     break;
707   case BlockedOnMVar:
708     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
709     break;
710   case BlockedOnException:
711     debugBelch("is blocked on delivering an exception to thread %lu",
712                (unsigned long)tso->block_info.tso->id);
713     break;
714   case BlockedOnBlackHole:
715     debugBelch("is blocked on a black hole");
716     break;
717   case NotBlocked:
718     debugBelch("is not blocked");
719     break;
720 #if defined(PARALLEL_HASKELL)
721   case BlockedOnGA:
722     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
723             tso->block_info.closure, info_type(tso->block_info.closure));
724     break;
725   case BlockedOnGA_NoSend:
726     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
727             tso->block_info.closure, info_type(tso->block_info.closure));
728     break;
729 #endif
730   case BlockedOnCCall:
731     debugBelch("is blocked on an external call");
732     break;
733   case BlockedOnCCall_NoUnblockExc:
734     debugBelch("is blocked on an external call (exceptions were already blocked)");
735     break;
736   case BlockedOnSTM:
737     debugBelch("is blocked on an STM operation");
738     break;
739   default:
740     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
741          tso->why_blocked, tso->id, tso);
742   }
743 }
744
745 void
746 printThreadStatus(StgTSO *t)
747 {
748   debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
749     {
750       void *label = lookupThreadLabel(t->id);
751       if (label) debugBelch("[\"%s\"] ",(char *)label);
752     }
753     if (t->what_next == ThreadRelocated) {
754         debugBelch("has been relocated...\n");
755     } else {
756         switch (t->what_next) {
757         case ThreadKilled:
758             debugBelch("has been killed");
759             break;
760         case ThreadComplete:
761             debugBelch("has completed");
762             break;
763         default:
764             printThreadBlockage(t);
765         }
766         debugBelch("\n");
767     }
768 }
769
770 void
771 printAllThreads(void)
772 {
773   StgTSO *t, *next;
774   nat i, s;
775   Capability *cap;
776
777 # if defined(GRAN)
778   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
779   ullong_format_string(TIME_ON_PROC(CurrentProc), 
780                        time_string, rtsFalse/*no commas!*/);
781
782   debugBelch("all threads at [%s]:\n", time_string);
783 # elif defined(PARALLEL_HASKELL)
784   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
785   ullong_format_string(CURRENT_TIME,
786                        time_string, rtsFalse/*no commas!*/);
787
788   debugBelch("all threads at [%s]:\n", time_string);
789 # else
790   debugBelch("all threads:\n");
791 # endif
792
793   for (i = 0; i < n_capabilities; i++) {
794       cap = &capabilities[i];
795       debugBelch("threads on capability %d:\n", cap->no);
796       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
797           printThreadStatus(t);
798       }
799   }
800
801   debugBelch("other threads:\n");
802   for (s = 0; s < total_steps; s++) {
803     for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
804       if (t->why_blocked != NotBlocked) {
805           printThreadStatus(t);
806       }
807       if (t->what_next == ThreadRelocated) {
808           next = t->_link;
809       } else {
810           next = t->global_link;
811       }
812     }
813   }
814 }
815
816 // useful from gdb
817 void 
818 printThreadQueue(StgTSO *t)
819 {
820     nat i = 0;
821     for (; t != END_TSO_QUEUE; t = t->_link) {
822         printThreadStatus(t);
823         i++;
824     }
825     debugBelch("%d threads on queue\n", i);
826 }
827
828 /* 
829    Print a whole blocking queue attached to node (debugging only).
830 */
831 # if defined(PARALLEL_HASKELL)
832 void 
833 print_bq (StgClosure *node)
834 {
835   StgBlockingQueueElement *bqe;
836   StgTSO *tso;
837   rtsBool end;
838
839   debugBelch("## BQ of closure %p (%s): ",
840           node, info_type(node));
841
842   /* should cover all closures that may have a blocking queue */
843   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
844          get_itbl(node)->type == FETCH_ME_BQ ||
845          get_itbl(node)->type == RBH ||
846          get_itbl(node)->type == MVAR);
847     
848   ASSERT(node!=(StgClosure*)NULL);         // sanity check
849
850   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
851 }
852
853 /* 
854    Print a whole blocking queue starting with the element bqe.
855 */
856 void 
857 print_bqe (StgBlockingQueueElement *bqe)
858 {
859   rtsBool end;
860
861   /* 
862      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
863   */
864   for (end = (bqe==END_BQ_QUEUE);
865        !end; // iterate until bqe points to a CONSTR
866        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
867        bqe = end ? END_BQ_QUEUE : bqe->link) {
868     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
869     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
870     /* types of closures that may appear in a blocking queue */
871     ASSERT(get_itbl(bqe)->type == TSO ||           
872            get_itbl(bqe)->type == BLOCKED_FETCH || 
873            get_itbl(bqe)->type == CONSTR); 
874     /* only BQs of an RBH end with an RBH_Save closure */
875     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
876
877     switch (get_itbl(bqe)->type) {
878     case TSO:
879       debugBelch(" TSO %u (%x),",
880               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
881       break;
882     case BLOCKED_FETCH:
883       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
884               ((StgBlockedFetch *)bqe)->node, 
885               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
886               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
887               ((StgBlockedFetch *)bqe)->ga.weight);
888       break;
889     case CONSTR:
890       debugBelch(" %s (IP %p),",
891               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
892                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
893                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
894                "RBH_Save_?"), get_itbl(bqe));
895       break;
896     default:
897       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
898            info_type((StgClosure *)bqe)); // , node, info_type(node));
899       break;
900     }
901   } /* for */
902   debugBelch("\n");
903 }
904 # elif defined(GRAN)
905 void 
906 print_bq (StgClosure *node)
907 {
908   StgBlockingQueueElement *bqe;
909   PEs node_loc, tso_loc;
910   rtsBool end;
911
912   /* should cover all closures that may have a blocking queue */
913   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
914          get_itbl(node)->type == FETCH_ME_BQ ||
915          get_itbl(node)->type == RBH);
916     
917   ASSERT(node!=(StgClosure*)NULL);         // sanity check
918   node_loc = where_is(node);
919
920   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
921           node, info_type(node), node_loc);
922
923   /* 
924      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
925   */
926   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
927        !end; // iterate until bqe points to a CONSTR
928        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
929     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
930     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
931     /* types of closures that may appear in a blocking queue */
932     ASSERT(get_itbl(bqe)->type == TSO ||           
933            get_itbl(bqe)->type == CONSTR); 
934     /* only BQs of an RBH end with an RBH_Save closure */
935     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
936
937     tso_loc = where_is((StgClosure *)bqe);
938     switch (get_itbl(bqe)->type) {
939     case TSO:
940       debugBelch(" TSO %d (%p) on [PE %d],",
941               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
942       break;
943     case CONSTR:
944       debugBelch(" %s (IP %p),",
945               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
946                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
947                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
948                "RBH_Save_?"), get_itbl(bqe));
949       break;
950     default:
951       barf("Unexpected closure type %s in blocking queue of %p (%s)",
952            info_type((StgClosure *)bqe), node, info_type(node));
953       break;
954     }
955   } /* for */
956   debugBelch("\n");
957 }
958 # endif
959
960 #if defined(PARALLEL_HASKELL)
961 nat
962 run_queue_len(void)
963 {
964     nat i;
965     StgTSO *tso;
966     
967     for (i=0, tso=run_queue_hd; 
968          tso != END_TSO_QUEUE;
969          i++, tso=tso->link) {
970         /* nothing */
971     }
972         
973     return i;
974 }
975 #endif
976
977 #endif /* DEBUG */