Add a couple of missing tests for EAGER_BLACKHOLE
[ghc-hetmet.git] / rts / Threads.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2006
4  *
5  * Thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "Capability.h"
13 #include "Updates.h"
14 #include "Threads.h"
15 #include "STM.h"
16 #include "Schedule.h"
17 #include "Trace.h"
18 #include "ThreadLabels.h"
19 #include "Updates.h"
20 #include "Messages.h"
21 #include "sm/Storage.h"
22
23 /* Next thread ID to allocate.
24  * LOCK: sched_mutex
25  */
26 static StgThreadID next_thread_id = 1;
27
28 /* The smallest stack size that makes any sense is:
29  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
30  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
31  *  + 1                       (the closure to enter)
32  *  + 1                       (stg_ap_v_ret)
33  *  + 1                       (spare slot req'd by stg_ap_v_ret)
34  *
35  * A thread with this stack will bomb immediately with a stack
36  * overflow, which will increase its stack size.  
37  */
38 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
39
40 /* ---------------------------------------------------------------------------
41    Create a new thread.
42
43    The new thread starts with the given stack size.  Before the
44    scheduler can run, however, this thread needs to have a closure
45    (and possibly some arguments) pushed on its stack.  See
46    pushClosure() in Schedule.h.
47
48    createGenThread() and createIOThread() (in SchedAPI.h) are
49    convenient packaged versions of this function.
50
51    currently pri (priority) is only used in a GRAN setup -- HWL
52    ------------------------------------------------------------------------ */
53 StgTSO *
54 createThread(Capability *cap, nat size)
55 {
56     StgTSO *tso;
57     nat stack_size;
58
59     /* sched_mutex is *not* required */
60
61     /* First check whether we should create a thread at all */
62
63     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
64
65     /* catch ridiculously small stack sizes */
66     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
67         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
68     }
69
70     size = round_to_mblocks(size);
71     tso = (StgTSO *)allocate(cap, size);
72
73     stack_size = size - TSO_STRUCT_SIZEW;
74     TICK_ALLOC_TSO(stack_size, 0);
75
76     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
77
78     // Always start with the compiled code evaluator
79     tso->what_next = ThreadRunGHC;
80
81     tso->why_blocked  = NotBlocked;
82     tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
83     tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
84     tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
85     tso->flags = 0;
86     tso->dirty = 1;
87     
88     tso->saved_errno = 0;
89     tso->bound = NULL;
90     tso->cap = cap;
91     
92     tso->stack_size     = stack_size;
93     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
94                           - TSO_STRUCT_SIZEW;
95     tso->sp             = (P_)&(tso->stack) + stack_size;
96
97     tso->trec = NO_TREC;
98     
99 #ifdef PROFILING
100     tso->prof.CCCS = CCS_MAIN;
101 #endif
102     
103   /* put a stop frame on the stack */
104     tso->sp -= sizeofW(StgStopFrame);
105     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
106     tso->_link = END_TSO_QUEUE;
107     
108     /* Link the new thread on the global thread list.
109      */
110     ACQUIRE_LOCK(&sched_mutex);
111     tso->id = next_thread_id++;  // while we have the mutex
112     tso->global_link = g0->threads;
113     g0->threads = tso;
114     RELEASE_LOCK(&sched_mutex);
115     
116     // ToDo: report the stack size in the event?
117     traceEventCreateThread(cap, tso);
118
119     return tso;
120 }
121
122 /* ---------------------------------------------------------------------------
123  * Comparing Thread ids.
124  *
125  * This is used from STG land in the implementation of the
126  * instances of Eq/Ord for ThreadIds.
127  * ------------------------------------------------------------------------ */
128
129 int
130 cmp_thread(StgPtr tso1, StgPtr tso2) 
131
132   StgThreadID id1 = ((StgTSO *)tso1)->id; 
133   StgThreadID id2 = ((StgTSO *)tso2)->id;
134  
135   if (id1 < id2) return (-1);
136   if (id1 > id2) return 1;
137   return 0;
138 }
139
140 /* ---------------------------------------------------------------------------
141  * Fetching the ThreadID from an StgTSO.
142  *
143  * This is used in the implementation of Show for ThreadIds.
144  * ------------------------------------------------------------------------ */
145 int
146 rts_getThreadId(StgPtr tso) 
147 {
148   return ((StgTSO *)tso)->id;
149 }
150
151 /* -----------------------------------------------------------------------------
152    Remove a thread from a queue.
153    Fails fatally if the TSO is not on the queue.
154    -------------------------------------------------------------------------- */
155
156 rtsBool // returns True if we modified queue
157 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
158 {
159     StgTSO *t, *prev;
160
161     prev = NULL;
162     for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
163         if (t == tso) {
164             if (prev) {
165                 setTSOLink(cap,prev,t->_link);
166                 return rtsFalse;
167             } else {
168                 *queue = t->_link;
169                 return rtsTrue;
170             }
171         }
172     }
173     barf("removeThreadFromQueue: not found");
174 }
175
176 rtsBool // returns True if we modified head or tail
177 removeThreadFromDeQueue (Capability *cap, 
178                          StgTSO **head, StgTSO **tail, StgTSO *tso)
179 {
180     StgTSO *t, *prev;
181     rtsBool flag = rtsFalse;
182
183     prev = NULL;
184     for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
185         if (t == tso) {
186             if (prev) {
187                 setTSOLink(cap,prev,t->_link);
188                 flag = rtsFalse;
189             } else {
190                 *head = t->_link;
191                 flag = rtsTrue;
192             }
193             if (*tail == tso) {
194                 if (prev) {
195                     *tail = prev;
196                 } else {
197                     *tail = END_TSO_QUEUE;
198                 }
199                 return rtsTrue;
200             } else {
201                 return flag;
202             }
203         }
204     }
205     barf("removeThreadFromMVarQueue: not found");
206 }
207
208 /* ----------------------------------------------------------------------------
209    tryWakeupThread()
210
211    Attempt to wake up a thread.  tryWakeupThread is idempotent: it is
212    always safe to call it too many times, but it is not safe in
213    general to omit a call.
214
215    ------------------------------------------------------------------------- */
216
217 void
218 tryWakeupThread (Capability *cap, StgTSO *tso)
219 {
220     tryWakeupThread_(cap, deRefTSO(tso));
221 }
222
223 void
224 tryWakeupThread_ (Capability *cap, StgTSO *tso)
225 {
226     traceEventThreadWakeup (cap, tso, tso->cap->no);
227
228 #ifdef THREADED_RTS
229     if (tso->cap != cap)
230     {
231         MessageWakeup *msg;
232         msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
233         SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
234         msg->tso = tso;
235         sendMessage(cap, tso->cap, (Message*)msg);
236         debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
237                       (lnat)tso->id, tso->cap->no);
238         return;
239     }
240 #endif
241
242     switch (tso->why_blocked)
243     {
244     case BlockedOnMVar:
245     {
246         if (tso->_link == END_TSO_QUEUE) {
247             tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
248             goto unblock;
249         } else {
250             return;
251         }
252     }
253
254     case BlockedOnMsgThrowTo:
255     {
256         const StgInfoTable *i;
257         
258         i = lockClosure(tso->block_info.closure);
259         unlockClosure(tso->block_info.closure, i);
260         if (i != &stg_MSG_NULL_info) {
261             debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
262                           (lnat)tso->id, tso->block_info.throwto->header.info);
263             return;
264         }
265
266         // remove the block frame from the stack
267         ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
268         tso->sp += 3;
269         goto unblock;
270     }
271
272     case BlockedOnBlackHole:
273     case BlockedOnSTM:
274     case ThreadMigrating:
275         goto unblock;
276
277     default:
278         // otherwise, do nothing
279         return;
280     }
281
282 unblock:
283     // just run the thread now, if the BH is not really available,
284     // we'll block again.
285     tso->why_blocked = NotBlocked;
286     appendToRunQueue(cap,tso);
287 }
288
289 /* ----------------------------------------------------------------------------
290    migrateThread
291    ------------------------------------------------------------------------- */
292
293 void
294 migrateThread (Capability *from, StgTSO *tso, Capability *to)
295 {
296     traceEventMigrateThread (from, tso, to->no);
297     // ThreadMigrating tells the target cap that it needs to be added to
298     // the run queue when it receives the MSG_TRY_WAKEUP.
299     tso->why_blocked = ThreadMigrating;
300     tso->cap = to;
301     tryWakeupThread(from, tso);
302 }
303
304 /* ----------------------------------------------------------------------------
305    awakenBlockedQueue
306
307    wakes up all the threads on the specified queue.
308    ------------------------------------------------------------------------- */
309
310 void
311 wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
312 {
313     MessageBlackHole *msg;
314     const StgInfoTable *i;
315
316     ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info  ||
317            bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info  );
318
319     for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE; 
320          msg = msg->link) {
321         i = msg->header.info;
322         if (i != &stg_IND_info) {
323             ASSERT(i == &stg_MSG_BLACKHOLE_info);
324             tryWakeupThread(cap,msg->tso);
325         }
326     }
327
328     // overwrite the BQ with an indirection so it will be
329     // collected at the next GC.
330 #if defined(DEBUG) && !defined(THREADED_RTS)
331     // XXX FILL_SLOP, but not if THREADED_RTS because in that case
332     // another thread might be looking at this BLOCKING_QUEUE and
333     // checking the owner field at the same time.
334     bq->bh = 0; bq->queue = 0; bq->owner = 0;
335 #endif
336     OVERWRITE_INFO(bq, &stg_IND_info);
337 }
338
339 // If we update a closure that we know we BLACKHOLE'd, and the closure
340 // no longer points to the current TSO as its owner, then there may be
341 // an orphaned BLOCKING_QUEUE closure with blocked threads attached to
342 // it.  We therefore traverse the BLOCKING_QUEUEs attached to the
343 // current TSO to see if any can now be woken up.
344 void
345 checkBlockingQueues (Capability *cap, StgTSO *tso)
346 {
347     StgBlockingQueue *bq, *next;
348     StgClosure *p;
349
350     debugTraceCap(DEBUG_sched, cap,
351                   "collision occurred; checking blocking queues for thread %ld",
352                   (lnat)tso->id);
353     
354     for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
355         next = bq->link;
356
357         if (bq->header.info == &stg_IND_info) {
358             // ToDo: could short it out right here, to avoid
359             // traversing this IND multiple times.
360             continue;
361         }
362         
363         p = bq->bh;
364
365         if (p->header.info != &stg_BLACKHOLE_info ||
366             ((StgInd *)p)->indirectee != (StgClosure*)bq)
367         {
368             wakeBlockingQueue(cap,bq);
369         }   
370     }
371 }
372
373 /* ----------------------------------------------------------------------------
374    updateThunk
375
376    Update a thunk with a value.  In order to do this, we need to know
377    which TSO owns (or is evaluating) the thunk, in case we need to
378    awaken any threads that are blocked on it.
379    ------------------------------------------------------------------------- */
380
381 void
382 updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
383 {
384     StgClosure *v;
385     StgTSO *owner;
386     const StgInfoTable *i;
387
388     i = thunk->header.info;
389     if (i != &stg_BLACKHOLE_info &&
390         i != &stg_CAF_BLACKHOLE_info &&
391         i != &__stg_EAGER_BLACKHOLE_info &&
392         i != &stg_WHITEHOLE_info) {
393         updateWithIndirection(cap, thunk, val);
394         return;
395     }
396     
397     v = ((StgInd*)thunk)->indirectee;
398
399     updateWithIndirection(cap, thunk, val);
400
401     i = v->header.info;
402     if (i == &stg_TSO_info) {
403         owner = deRefTSO((StgTSO*)v);
404         if (owner != tso) {
405             checkBlockingQueues(cap, tso);
406         }
407         return;
408     }
409
410     if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
411         i != &stg_BLOCKING_QUEUE_DIRTY_info) {
412         checkBlockingQueues(cap, tso);
413         return;
414     }
415
416     owner = deRefTSO(((StgBlockingQueue*)v)->owner);
417
418     if (owner != tso) {
419         checkBlockingQueues(cap, tso);
420     } else {
421         wakeBlockingQueue(cap, (StgBlockingQueue*)v);
422     }
423 }
424
425 /* ---------------------------------------------------------------------------
426  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
427  * used by Control.Concurrent for error checking.
428  * ------------------------------------------------------------------------- */
429  
430 HsBool
431 rtsSupportsBoundThreads(void)
432 {
433 #if defined(THREADED_RTS)
434   return HS_BOOL_TRUE;
435 #else
436   return HS_BOOL_FALSE;
437 #endif
438 }
439
440 /* ---------------------------------------------------------------------------
441  * isThreadBound(tso): check whether tso is bound to an OS thread.
442  * ------------------------------------------------------------------------- */
443  
444 StgBool
445 isThreadBound(StgTSO* tso USED_IF_THREADS)
446 {
447 #if defined(THREADED_RTS)
448   return (tso->bound != NULL);
449 #endif
450   return rtsFalse;
451 }
452
453 /* ----------------------------------------------------------------------------
454  * Debugging: why is a thread blocked
455  * ------------------------------------------------------------------------- */
456
457 #if DEBUG
458 void
459 printThreadBlockage(StgTSO *tso)
460 {
461   switch (tso->why_blocked) {
462   case BlockedOnRead:
463     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
464     break;
465   case BlockedOnWrite:
466     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
467     break;
468 #if defined(mingw32_HOST_OS)
469     case BlockedOnDoProc:
470     debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
471     break;
472 #endif
473   case BlockedOnDelay:
474     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
475     break;
476   case BlockedOnMVar:
477     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
478     break;
479   case BlockedOnBlackHole:
480       debugBelch("is blocked on a black hole %p", 
481                  ((StgBlockingQueue*)tso->block_info.bh->bh));
482     break;
483   case BlockedOnMsgThrowTo:
484     debugBelch("is blocked on a throwto message");
485     break;
486   case NotBlocked:
487     debugBelch("is not blocked");
488     break;
489   case ThreadMigrating:
490     debugBelch("is runnable, but not on the run queue");
491     break;
492   case BlockedOnCCall:
493     debugBelch("is blocked on an external call");
494     break;
495   case BlockedOnCCall_NoUnblockExc:
496     debugBelch("is blocked on an external call (exceptions were already blocked)");
497     break;
498   case BlockedOnSTM:
499     debugBelch("is blocked on an STM operation");
500     break;
501   default:
502     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
503          tso->why_blocked, tso->id, tso);
504   }
505 }
506
507
508 void
509 printThreadStatus(StgTSO *t)
510 {
511   debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
512     {
513       void *label = lookupThreadLabel(t->id);
514       if (label) debugBelch("[\"%s\"] ",(char *)label);
515     }
516     if (t->what_next == ThreadRelocated) {
517         debugBelch("has been relocated...\n");
518     } else {
519         switch (t->what_next) {
520         case ThreadKilled:
521             debugBelch("has been killed");
522             break;
523         case ThreadComplete:
524             debugBelch("has completed");
525             break;
526         default:
527             printThreadBlockage(t);
528         }
529         if (t->dirty) {
530             debugBelch(" (TSO_DIRTY)");
531         } else if (t->flags & TSO_LINK_DIRTY) {
532             debugBelch(" (TSO_LINK_DIRTY)");
533         }
534         debugBelch("\n");
535     }
536 }
537
538 void
539 printAllThreads(void)
540 {
541   StgTSO *t, *next;
542   nat i, g;
543   Capability *cap;
544
545   debugBelch("all threads:\n");
546
547   for (i = 0; i < n_capabilities; i++) {
548       cap = &capabilities[i];
549       debugBelch("threads on capability %d:\n", cap->no);
550       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
551           printThreadStatus(t);
552       }
553   }
554
555   debugBelch("other threads:\n");
556   for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
557     for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
558       if (t->why_blocked != NotBlocked) {
559           printThreadStatus(t);
560       }
561       if (t->what_next == ThreadRelocated) {
562           next = t->_link;
563       } else {
564           next = t->global_link;
565       }
566     }
567   }
568 }
569
570 // useful from gdb
571 void 
572 printThreadQueue(StgTSO *t)
573 {
574     nat i = 0;
575     for (; t != END_TSO_QUEUE; t = t->_link) {
576         printThreadStatus(t);
577         i++;
578     }
579     debugBelch("%d threads on queue\n", i);
580 }
581
582 #endif /* DEBUG */