0dee734baa0edfc15903a437c5e191c266c33067
[ghc-hetmet.git] / rts / Threads.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2006
4  *
5  * Thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "Capability.h"
13 #include "Updates.h"
14 #include "Threads.h"
15 #include "STM.h"
16 #include "Schedule.h"
17 #include "Trace.h"
18 #include "ThreadLabels.h"
19 #include "Updates.h"
20 #include "Messages.h"
21 #include "sm/Storage.h"
22
23 /* Next thread ID to allocate.
24  * LOCK: sched_mutex
25  */
26 static StgThreadID next_thread_id = 1;
27
28 /* The smallest stack size that makes any sense is:
29  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
30  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
31  *  + 1                       (the closure to enter)
32  *  + 1                       (stg_ap_v_ret)
33  *  + 1                       (spare slot req'd by stg_ap_v_ret)
34  *
35  * A thread with this stack will bomb immediately with a stack
36  * overflow, which will increase its stack size.  
37  */
38 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
39
40 /* ---------------------------------------------------------------------------
41    Create a new thread.
42
43    The new thread starts with the given stack size.  Before the
44    scheduler can run, however, this thread needs to have a closure
45    (and possibly some arguments) pushed on its stack.  See
46    pushClosure() in Schedule.h.
47
48    createGenThread() and createIOThread() (in SchedAPI.h) are
49    convenient packaged versions of this function.
50
51    currently pri (priority) is only used in a GRAN setup -- HWL
52    ------------------------------------------------------------------------ */
53 StgTSO *
54 createThread(Capability *cap, nat size)
55 {
56     StgTSO *tso;
57     nat stack_size;
58
59     /* sched_mutex is *not* required */
60
61     /* First check whether we should create a thread at all */
62
63     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
64
65     /* catch ridiculously small stack sizes */
66     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
67         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
68     }
69
70     size = round_to_mblocks(size);
71     tso = (StgTSO *)allocate(cap, size);
72
73     stack_size = size - TSO_STRUCT_SIZEW;
74     TICK_ALLOC_TSO(stack_size, 0);
75
76     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
77
78     // Always start with the compiled code evaluator
79     tso->what_next = ThreadRunGHC;
80
81     tso->why_blocked  = NotBlocked;
82     tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
83     tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
84     tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
85     tso->flags = 0;
86     tso->dirty = 1;
87     
88     tso->saved_errno = 0;
89     tso->bound = NULL;
90     tso->cap = cap;
91     
92     tso->stack_size     = stack_size;
93     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
94                           - TSO_STRUCT_SIZEW;
95     tso->sp             = (P_)&(tso->stack) + stack_size;
96
97     tso->trec = NO_TREC;
98     
99 #ifdef PROFILING
100     tso->prof.CCCS = CCS_MAIN;
101 #endif
102     
103   /* put a stop frame on the stack */
104     tso->sp -= sizeofW(StgStopFrame);
105     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
106     tso->_link = END_TSO_QUEUE;
107     
108     /* Link the new thread on the global thread list.
109      */
110     ACQUIRE_LOCK(&sched_mutex);
111     tso->id = next_thread_id++;  // while we have the mutex
112     tso->global_link = g0->threads;
113     g0->threads = tso;
114     RELEASE_LOCK(&sched_mutex);
115     
116     // ToDo: report the stack size in the event?
117     traceEventCreateThread(cap, tso);
118
119     return tso;
120 }
121
122 /* ---------------------------------------------------------------------------
123  * Comparing Thread ids.
124  *
125  * This is used from STG land in the implementation of the
126  * instances of Eq/Ord for ThreadIds.
127  * ------------------------------------------------------------------------ */
128
129 int
130 cmp_thread(StgPtr tso1, StgPtr tso2) 
131
132   StgThreadID id1 = ((StgTSO *)tso1)->id; 
133   StgThreadID id2 = ((StgTSO *)tso2)->id;
134  
135   if (id1 < id2) return (-1);
136   if (id1 > id2) return 1;
137   return 0;
138 }
139
140 /* ---------------------------------------------------------------------------
141  * Fetching the ThreadID from an StgTSO.
142  *
143  * This is used in the implementation of Show for ThreadIds.
144  * ------------------------------------------------------------------------ */
145 int
146 rts_getThreadId(StgPtr tso) 
147 {
148   return ((StgTSO *)tso)->id;
149 }
150
151 /* -----------------------------------------------------------------------------
152    Remove a thread from a queue.
153    Fails fatally if the TSO is not on the queue.
154    -------------------------------------------------------------------------- */
155
156 rtsBool // returns True if we modified queue
157 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
158 {
159     StgTSO *t, *prev;
160
161     prev = NULL;
162     for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
163         if (t == tso) {
164             if (prev) {
165                 setTSOLink(cap,prev,t->_link);
166                 return rtsFalse;
167             } else {
168                 *queue = t->_link;
169                 return rtsTrue;
170             }
171         }
172     }
173     barf("removeThreadFromQueue: not found");
174 }
175
176 rtsBool // returns True if we modified head or tail
177 removeThreadFromDeQueue (Capability *cap, 
178                          StgTSO **head, StgTSO **tail, StgTSO *tso)
179 {
180     StgTSO *t, *prev;
181     rtsBool flag = rtsFalse;
182
183     prev = NULL;
184     for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
185         if (t == tso) {
186             if (prev) {
187                 setTSOLink(cap,prev,t->_link);
188                 flag = rtsFalse;
189             } else {
190                 *head = t->_link;
191                 flag = rtsTrue;
192             }
193             if (*tail == tso) {
194                 if (prev) {
195                     *tail = prev;
196                 } else {
197                     *tail = END_TSO_QUEUE;
198                 }
199                 return rtsTrue;
200             } else {
201                 return flag;
202             }
203         }
204     }
205     barf("removeThreadFromMVarQueue: not found");
206 }
207
208 /* ----------------------------------------------------------------------------
209    tryWakeupThread()
210
211    Attempt to wake up a thread.  tryWakeupThread is idempotent: it is
212    always safe to call it too many times, but it is not safe in
213    general to omit a call.
214
215    ------------------------------------------------------------------------- */
216
217 void
218 tryWakeupThread (Capability *cap, StgTSO *tso)
219 {
220     tryWakeupThread_(cap, deRefTSO(tso));
221 }
222
223 void
224 tryWakeupThread_ (Capability *cap, StgTSO *tso)
225 {
226     traceEventThreadWakeup (cap, tso, tso->cap->no);
227
228 #ifdef THREADED_RTS
229     if (tso->cap != cap)
230     {
231         MessageWakeup *msg;
232         msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
233         SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
234         msg->tso = tso;
235         sendMessage(cap, tso->cap, (Message*)msg);
236         debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
237                       (lnat)tso->id, tso->cap->no);
238         return;
239     }
240 #endif
241
242     switch (tso->why_blocked)
243     {
244     case BlockedOnMVar:
245     {
246         if (tso->_link == END_TSO_QUEUE) {
247             tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
248             goto unblock;
249         } else {
250             return;
251         }
252     }
253
254     case BlockedOnMsgThrowTo:
255     {
256         const StgInfoTable *i;
257         
258         i = lockClosure(tso->block_info.closure);
259         unlockClosure(tso->block_info.closure, i);
260         if (i != &stg_MSG_NULL_info) {
261             debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
262                           (lnat)tso->id, tso->block_info.throwto->header.info);
263             return;
264         }
265
266         // remove the block frame from the stack
267         ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
268         tso->sp += 3;
269         goto unblock;
270     }
271
272     case BlockedOnBlackHole:
273     case BlockedOnSTM:
274     case ThreadMigrating:
275         goto unblock;
276
277     default:
278         // otherwise, do nothing
279         return;
280     }
281
282 unblock:
283     // just run the thread now, if the BH is not really available,
284     // we'll block again.
285     tso->why_blocked = NotBlocked;
286     appendToRunQueue(cap,tso);
287
288     // We used to set the context switch flag here, which would
289     // trigger a context switch a short time in the future (at the end
290     // of the current nursery block).  The idea is that we have just
291     // woken up a thread, so we may need to load-balance and migrate
292     // threads to other CPUs.  On the other hand, setting the context
293     // switch flag here unfairly penalises the current thread by
294     // yielding its time slice too early.
295     //
296     // The synthetic benchmark nofib/smp/chan can be used to show the
297     // difference quite clearly.
298
299     // cap->context_switch = 1;
300 }
301
302 /* ----------------------------------------------------------------------------
303    migrateThread
304    ------------------------------------------------------------------------- */
305
306 void
307 migrateThread (Capability *from, StgTSO *tso, Capability *to)
308 {
309     traceEventMigrateThread (from, tso, to->no);
310     // ThreadMigrating tells the target cap that it needs to be added to
311     // the run queue when it receives the MSG_TRY_WAKEUP.
312     tso->why_blocked = ThreadMigrating;
313     tso->cap = to;
314     tryWakeupThread(from, tso);
315 }
316
317 /* ----------------------------------------------------------------------------
318    awakenBlockedQueue
319
320    wakes up all the threads on the specified queue.
321    ------------------------------------------------------------------------- */
322
323 void
324 wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
325 {
326     MessageBlackHole *msg;
327     const StgInfoTable *i;
328
329     ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info  ||
330            bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info  );
331
332     for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE; 
333          msg = msg->link) {
334         i = msg->header.info;
335         if (i != &stg_IND_info) {
336             ASSERT(i == &stg_MSG_BLACKHOLE_info);
337             tryWakeupThread(cap,msg->tso);
338         }
339     }
340
341     // overwrite the BQ with an indirection so it will be
342     // collected at the next GC.
343 #if defined(DEBUG) && !defined(THREADED_RTS)
344     // XXX FILL_SLOP, but not if THREADED_RTS because in that case
345     // another thread might be looking at this BLOCKING_QUEUE and
346     // checking the owner field at the same time.
347     bq->bh = 0; bq->queue = 0; bq->owner = 0;
348 #endif
349     OVERWRITE_INFO(bq, &stg_IND_info);
350 }
351
352 // If we update a closure that we know we BLACKHOLE'd, and the closure
353 // no longer points to the current TSO as its owner, then there may be
354 // an orphaned BLOCKING_QUEUE closure with blocked threads attached to
355 // it.  We therefore traverse the BLOCKING_QUEUEs attached to the
356 // current TSO to see if any can now be woken up.
357 void
358 checkBlockingQueues (Capability *cap, StgTSO *tso)
359 {
360     StgBlockingQueue *bq, *next;
361     StgClosure *p;
362
363     debugTraceCap(DEBUG_sched, cap,
364                   "collision occurred; checking blocking queues for thread %ld",
365                   (lnat)tso->id);
366     
367     for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
368         next = bq->link;
369
370         if (bq->header.info == &stg_IND_info) {
371             // ToDo: could short it out right here, to avoid
372             // traversing this IND multiple times.
373             continue;
374         }
375         
376         p = bq->bh;
377
378         if (p->header.info != &stg_BLACKHOLE_info ||
379             ((StgInd *)p)->indirectee != (StgClosure*)bq)
380         {
381             wakeBlockingQueue(cap,bq);
382         }   
383     }
384 }
385
386 /* ----------------------------------------------------------------------------
387    updateThunk
388
389    Update a thunk with a value.  In order to do this, we need to know
390    which TSO owns (or is evaluating) the thunk, in case we need to
391    awaken any threads that are blocked on it.
392    ------------------------------------------------------------------------- */
393
394 void
395 updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
396 {
397     StgClosure *v;
398     StgTSO *owner;
399     const StgInfoTable *i;
400
401     i = thunk->header.info;
402     if (i != &stg_BLACKHOLE_info &&
403         i != &stg_CAF_BLACKHOLE_info &&
404         i != &__stg_EAGER_BLACKHOLE_info &&
405         i != &stg_WHITEHOLE_info) {
406         updateWithIndirection(cap, thunk, val);
407         return;
408     }
409     
410     v = ((StgInd*)thunk)->indirectee;
411
412     updateWithIndirection(cap, thunk, val);
413
414     i = v->header.info;
415     if (i == &stg_TSO_info) {
416         owner = deRefTSO((StgTSO*)v);
417         if (owner != tso) {
418             checkBlockingQueues(cap, tso);
419         }
420         return;
421     }
422
423     if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
424         i != &stg_BLOCKING_QUEUE_DIRTY_info) {
425         checkBlockingQueues(cap, tso);
426         return;
427     }
428
429     owner = deRefTSO(((StgBlockingQueue*)v)->owner);
430
431     if (owner != tso) {
432         checkBlockingQueues(cap, tso);
433     } else {
434         wakeBlockingQueue(cap, (StgBlockingQueue*)v);
435     }
436 }
437
438 /* ---------------------------------------------------------------------------
439  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
440  * used by Control.Concurrent for error checking.
441  * ------------------------------------------------------------------------- */
442  
443 HsBool
444 rtsSupportsBoundThreads(void)
445 {
446 #if defined(THREADED_RTS)
447   return HS_BOOL_TRUE;
448 #else
449   return HS_BOOL_FALSE;
450 #endif
451 }
452
453 /* ---------------------------------------------------------------------------
454  * isThreadBound(tso): check whether tso is bound to an OS thread.
455  * ------------------------------------------------------------------------- */
456  
457 StgBool
458 isThreadBound(StgTSO* tso USED_IF_THREADS)
459 {
460 #if defined(THREADED_RTS)
461   return (tso->bound != NULL);
462 #endif
463   return rtsFalse;
464 }
465
466 /* ----------------------------------------------------------------------------
467  * Debugging: why is a thread blocked
468  * ------------------------------------------------------------------------- */
469
470 #if DEBUG
471 void
472 printThreadBlockage(StgTSO *tso)
473 {
474   switch (tso->why_blocked) {
475   case BlockedOnRead:
476     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
477     break;
478   case BlockedOnWrite:
479     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
480     break;
481 #if defined(mingw32_HOST_OS)
482     case BlockedOnDoProc:
483     debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
484     break;
485 #endif
486   case BlockedOnDelay:
487     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
488     break;
489   case BlockedOnMVar:
490     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
491     break;
492   case BlockedOnBlackHole:
493       debugBelch("is blocked on a black hole %p", 
494                  ((StgBlockingQueue*)tso->block_info.bh->bh));
495     break;
496   case BlockedOnMsgThrowTo:
497     debugBelch("is blocked on a throwto message");
498     break;
499   case NotBlocked:
500     debugBelch("is not blocked");
501     break;
502   case ThreadMigrating:
503     debugBelch("is runnable, but not on the run queue");
504     break;
505   case BlockedOnCCall:
506     debugBelch("is blocked on an external call");
507     break;
508   case BlockedOnCCall_Interruptible:
509     debugBelch("is blocked on an external call (but may be interrupted)");
510     break;
511   case BlockedOnSTM:
512     debugBelch("is blocked on an STM operation");
513     break;
514   default:
515     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
516          tso->why_blocked, tso->id, tso);
517   }
518 }
519
520
521 void
522 printThreadStatus(StgTSO *t)
523 {
524   debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
525     {
526       void *label = lookupThreadLabel(t->id);
527       if (label) debugBelch("[\"%s\"] ",(char *)label);
528     }
529     if (t->what_next == ThreadRelocated) {
530         debugBelch("has been relocated...\n");
531     } else {
532         switch (t->what_next) {
533         case ThreadKilled:
534             debugBelch("has been killed");
535             break;
536         case ThreadComplete:
537             debugBelch("has completed");
538             break;
539         default:
540             printThreadBlockage(t);
541         }
542         if (t->dirty) {
543             debugBelch(" (TSO_DIRTY)");
544         } else if (t->flags & TSO_LINK_DIRTY) {
545             debugBelch(" (TSO_LINK_DIRTY)");
546         }
547         debugBelch("\n");
548     }
549 }
550
551 void
552 printAllThreads(void)
553 {
554   StgTSO *t, *next;
555   nat i, g;
556   Capability *cap;
557
558   debugBelch("all threads:\n");
559
560   for (i = 0; i < n_capabilities; i++) {
561       cap = &capabilities[i];
562       debugBelch("threads on capability %d:\n", cap->no);
563       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
564           printThreadStatus(t);
565       }
566   }
567
568   debugBelch("other threads:\n");
569   for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
570     for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
571       if (t->why_blocked != NotBlocked) {
572           printThreadStatus(t);
573       }
574       if (t->what_next == ThreadRelocated) {
575           next = t->_link;
576       } else {
577           next = t->global_link;
578       }
579     }
580   }
581 }
582
583 // useful from gdb
584 void 
585 printThreadQueue(StgTSO *t)
586 {
587     nat i = 0;
588     for (; t != END_TSO_QUEUE; t = t->_link) {
589         printThreadStatus(t);
590         i++;
591     }
592     debugBelch("%d threads on queue\n", i);
593 }
594
595 #endif /* DEBUG */