/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2006
 *
 * Thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"

#include "Capability.h"
#include "Updates.h"
#include "Threads.h"
#include "STM.h"
#include "Schedule.h"
#include "Trace.h"
#include "ThreadLabels.h"
#include "Messages.h"
#include "RaiseAsync.h"
#include "Prelude.h"
#include "Printer.h"
#include "sm/Sanity.h"
#include "sm/Storage.h"

#include <string.h>

/* Next thread ID to allocate.
 * LOCK: sched_mutex
 */
static StgThreadID next_thread_id = 1;

/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
 *  + 1                       (stg_ap_v_ret)
 *  + 1                       (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)

/* ---------------------------------------------------------------------------
   Create a new thread.

   The new thread starts with the given stack size.  Before the
   scheduler can run, however, this thread needs to have a closure
   (and possibly some arguments) pushed on its stack.  See
   pushClosure() in Schedule.h.

   createGenThread() and createIOThread() (in SchedAPI.h) are
   convenient packaged versions of this function.
   ------------------------------------------------------------------------ */
StgTSO *
createThread(Capability *cap, nat size)
{
    StgTSO *tso;
    StgStack *stack;
    nat stack_size;

    /* sched_mutex is *not* required */

    /* catch ridiculously small stack sizes */
    if (size < MIN_STACK_WORDS + sizeofW(StgStack)) {
        size = MIN_STACK_WORDS + sizeofW(StgStack);
    }

    /* The size argument we are given includes all the per-thread
     * overheads:
     *
     *    - The TSO structure
     *    - The STACK header
     *
     * This is so that we can use a nice round power of 2 for the
     * default stack size (e.g. 1k), and if we're allocating lots of
     * threads back-to-back they'll fit nicely in a block.  It's a bit
     * of a benchmark hack, but it doesn't do any harm.
     */
    stack_size = round_to_mblocks(size - sizeofW(StgTSO));
    stack = (StgStack *)allocate(cap, stack_size);
    TICK_ALLOC_STACK(stack_size);
    SET_HDR(stack, &stg_STACK_info, CCS_SYSTEM);
    stack->stack_size   = stack_size - sizeofW(StgStack);
    stack->sp           = stack->stack + stack->stack_size;
    stack->dirty        = 1;

    tso = (StgTSO *)allocate(cap, sizeofW(StgTSO));
    TICK_ALLOC_TSO();
    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);

    // Always start with the compiled code evaluator
    tso->what_next = ThreadRunGHC;
    tso->why_blocked = NotBlocked;
    tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
    tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
    tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
    tso->flags = 0;
    tso->dirty = 1;
    tso->_link = END_TSO_QUEUE;

    tso->saved_errno = 0;
    tso->bound = NULL;
    tso->cap = cap;

    tso->stackobj       = stack;
    tso->tot_stack_size = stack->stack_size;

    tso->trec = NO_TREC;

#ifdef PROFILING
    tso->prof.CCCS = CCS_MAIN;
#endif

    // put a stop frame on the stack
    stack->sp -= sizeofW(StgStopFrame);
    SET_HDR((StgClosure*)stack->sp,
            (StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);

    /* Link the new thread on the global thread list.
     */
    ACQUIRE_LOCK(&sched_mutex);
    tso->id = next_thread_id++;  // while we have the mutex
    tso->global_link = g0->threads;
    g0->threads = tso;
    RELEASE_LOCK(&sched_mutex);

    // ToDo: report the stack size in the event?
    traceEventCreateThread(cap, tso);

    return tso;
}
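
/* Usage sketch (illustrative only, not code from this file): callers
 * normally go through the packaged helpers rather than calling
 * createThread() directly.  createIOThread(), for instance, pushes the
 * closure and an argument-application frame before the scheduler first
 * runs the thread, roughly:
 *
 *   StgTSO *t = createThread(cap, stack_size);
 *   pushClosure(t, (W_)&stg_ap_v_info);   // the stg_ap_v application frame
 *   pushClosure(t, (W_)closure);          // the IO action to run
 *
 * See createGenThread() and createIOThread() in SchedAPI.h for the real
 * definitions.
 */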

/* ---------------------------------------------------------------------------
 * Comparing Thread ids.
 *
 * This is used from STG land in the implementation of the
 * instances of Eq/Ord for ThreadIds.
 * ------------------------------------------------------------------------ */
int
cmp_thread(StgPtr tso1, StgPtr tso2)
{
  StgThreadID id1 = ((StgTSO *)tso1)->id;
  StgThreadID id2 = ((StgTSO *)tso2)->id;

  if (id1 < id2) return (-1);
  if (id1 > id2) return 1;
  return 0;
}

/* ---------------------------------------------------------------------------
 * Fetching the ThreadID from an StgTSO.
 *
 * This is used in the implementation of Show for ThreadIds.
 * ------------------------------------------------------------------------ */
int
rts_getThreadId(StgPtr tso)
{
  return ((StgTSO *)tso)->id;
}

/* -----------------------------------------------------------------------------
   Remove a thread from a queue.
   Fails fatally if the TSO is not on the queue.
   -------------------------------------------------------------------------- */

rtsBool // returns True if we modified queue
removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
{
    StgTSO *t, *prev;

    prev = NULL;
    for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
                t->_link = END_TSO_QUEUE;
                return rtsFalse;
            } else {
                *queue = t->_link;
                t->_link = END_TSO_QUEUE;
                return rtsTrue;
            }
        }
    }
    barf("removeThreadFromQueue: not found");
}

rtsBool // returns True if we modified head or tail
removeThreadFromDeQueue (Capability *cap,
                         StgTSO **head, StgTSO **tail, StgTSO *tso)
{
    StgTSO *t, *prev;
    rtsBool flag = rtsFalse;

    prev = NULL;
    for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
                flag = rtsFalse;
            } else {
                *head = t->_link;
                flag = rtsTrue;
            }
            t->_link = END_TSO_QUEUE;
            if (*tail == tso) {
                if (prev) {
                    *tail = prev;
                } else {
                    *tail = END_TSO_QUEUE;
                }
                return rtsTrue;
            } else {
                return flag;
            }
        }
    }
    barf("removeThreadFromDeQueue: not found");
}

/* ----------------------------------------------------------------------------
   tryWakeupThread()

   Attempt to wake up a thread.  tryWakeupThread is idempotent: it is
   always safe to call it too many times, but it is not safe in
   general to omit a call.

   ------------------------------------------------------------------------- */

void
tryWakeupThread (Capability *cap, StgTSO *tso)
{
    traceEventThreadWakeup (cap, tso, tso->cap->no);

#ifdef THREADED_RTS
    if (tso->cap != cap)
    {
        MessageWakeup *msg;
        msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
        SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
        msg->tso = tso;
        sendMessage(cap, tso->cap, (Message*)msg);
        debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
                      (lnat)tso->id, tso->cap->no);
        return;
    }
#endif

    switch (tso->why_blocked)
    {
    case BlockedOnMVar:
    {
        if (tso->_link == END_TSO_QUEUE) {
            tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
            goto unblock;
        } else {
            return;
        }
    }

    case BlockedOnMsgThrowTo:
    {
        const StgInfoTable *i;

        i = lockClosure(tso->block_info.closure);
        unlockClosure(tso->block_info.closure, i);
        if (i != &stg_MSG_NULL_info) {
            debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
                          (lnat)tso->id, tso->block_info.throwto->header.info);
            return;
        }

        // remove the block frame from the stack
        ASSERT(tso->stackobj->sp[0] == (StgWord)&stg_block_throwto_info);
        tso->stackobj->sp += 3;
        goto unblock;
    }

    case BlockedOnBlackHole:
    case BlockedOnSTM:
    case ThreadMigrating:
        goto unblock;

    default:
        // otherwise, do nothing
        return;
    }

unblock:
    // just run the thread now, if the BH is not really available,
    // we'll block again.
    tso->why_blocked = NotBlocked;
    appendToRunQueue(cap,tso);

    // We used to set the context switch flag here, which would
    // trigger a context switch a short time in the future (at the end
    // of the current nursery block).  The idea is that we have just
    // woken up a thread, so we may need to load-balance and migrate
    // threads to other CPUs.  On the other hand, setting the context
    // switch flag here unfairly penalises the current thread by
    // yielding its time slice too early.
    //
    // The synthetic benchmark nofib/smp/chan can be used to show the
    // difference quite clearly.

    // cap->context_switch = 1;
}
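
/* A note on the idempotence property (hypothetical call sites, not code
 * from this file): two sources may race to wake the same TSO, e.g. an
 * MVar put and a throwTo.  The second call is harmless, because by then
 * tso->why_blocked is NotBlocked, which hits the "do nothing" default
 * case above:
 *
 *   tryWakeupThread(cap, tso);   // unblocks tso, appends to run queue
 *   tryWakeupThread(cap, tso);   // no-op
 */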

/* ----------------------------------------------------------------------------
   migrateThread
   ------------------------------------------------------------------------- */

void
migrateThread (Capability *from, StgTSO *tso, Capability *to)
{
    traceEventMigrateThread (from, tso, to->no);
    // ThreadMigrating tells the target cap that it needs to be added to
    // the run queue when it receives the MSG_TRY_WAKEUP.
    tso->why_blocked = ThreadMigrating;
    tso->cap = to;
    tryWakeupThread(from, tso);
}
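
/* Call-site sketch (hedged; the real caller is schedulePushWork() in
 * Schedule.c): the scheduler pushes surplus work to an idle capability
 * roughly like this:
 *
 *   if (!emptyRunQueue(cap) && emptyRunQueue(free_cap)) {
 *       StgTSO *t = popRunQueue(cap);
 *       migrateThread(cap, t, free_cap);
 *   }
 */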

/* ----------------------------------------------------------------------------
   wakeBlockingQueue

   wakes up all the threads on the specified queue.
   ------------------------------------------------------------------------- */

void
wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
{
    MessageBlackHole *msg;
    const StgInfoTable *i;

    ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info ||
           bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info );

    for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE;
         msg = msg->link) {
        i = msg->header.info;
        if (i != &stg_IND_info) {
            ASSERT(i == &stg_MSG_BLACKHOLE_info);
            tryWakeupThread(cap,msg->tso);
        }
    }

    // overwrite the BQ with an indirection so it will be
    // collected at the next GC.
#if defined(DEBUG) && !defined(THREADED_RTS)
    // XXX FILL_SLOP, but not if THREADED_RTS because in that case
    // another thread might be looking at this BLOCKING_QUEUE and
    // checking the owner field at the same time.
    bq->bh = 0; bq->queue = 0; bq->owner = 0;
#endif
    OVERWRITE_INFO(bq, &stg_IND_info);
}

// If we update a closure that we know we BLACKHOLE'd, and the closure
// no longer points to the current TSO as its owner, then there may be
// an orphaned BLOCKING_QUEUE closure with blocked threads attached to
// it.  We therefore traverse the BLOCKING_QUEUEs attached to the
// current TSO to see if any can now be woken up.
void
checkBlockingQueues (Capability *cap, StgTSO *tso)
{
    StgBlockingQueue *bq, *next;
    StgClosure *p;

    debugTraceCap(DEBUG_sched, cap,
                  "collision occurred; checking blocking queues for thread %ld",
                  (lnat)tso->id);

    for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
        next = bq->link;

        if (bq->header.info == &stg_IND_info) {
            // ToDo: could short it out right here, to avoid
            // traversing this IND multiple times.
            continue;
        }

        p = bq->bh;

        if (p->header.info != &stg_BLACKHOLE_info ||
            ((StgInd *)p)->indirectee != (StgClosure*)bq)
        {
            wakeBlockingQueue(cap,bq);
        }
    }
}
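
/* Example scenario (illustrative): thread A blackholes a thunk T; thread
 * B then blocks on T, creating a BLOCKING_QUEUE owned by A and linked on
 * A's tso->bq list.  If T is subsequently updated behind A's back (a
 * collision under lazy blackholing), T no longer points to A's
 * BLOCKING_QUEUE, which would leave B stranded; the traversal above
 * notices this and wakes B up.
 */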

/* ----------------------------------------------------------------------------
   updateThunk

   Update a thunk with a value.  In order to do this, we need to know
   which TSO owns (or is evaluating) the thunk, in case we need to
   awaken any threads that are blocked on it.
   ------------------------------------------------------------------------- */

void
updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
{
    StgClosure *v;
    StgTSO *owner;
    const StgInfoTable *i;

    i = thunk->header.info;
    if (i != &stg_BLACKHOLE_info &&
        i != &stg_CAF_BLACKHOLE_info &&
        i != &__stg_EAGER_BLACKHOLE_info &&
        i != &stg_WHITEHOLE_info) {
        updateWithIndirection(cap, thunk, val);
        return;
    }

    v = ((StgInd*)thunk)->indirectee;

    updateWithIndirection(cap, thunk, val);

    i = v->header.info;
    if (i == &stg_TSO_info) {
        owner = (StgTSO*)v;
        if (owner != tso) {
            checkBlockingQueues(cap, tso);
        }
        return;
    }

    if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
        i != &stg_BLOCKING_QUEUE_DIRTY_info) {
        checkBlockingQueues(cap, tso);
        return;
    }

    owner = ((StgBlockingQueue*)v)->owner;

    if (owner != tso) {
        checkBlockingQueues(cap, tso);
    } else {
        wakeBlockingQueue(cap, (StgBlockingQueue*)v);
    }
}

/* ---------------------------------------------------------------------------
 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
 * used by Control.Concurrent for error checking.
 * ------------------------------------------------------------------------- */
HsBool
rtsSupportsBoundThreads(void)
{
#if defined(THREADED_RTS)
  return HS_BOOL_TRUE;
#else
  return HS_BOOL_FALSE;
#endif
}

/* ---------------------------------------------------------------------------
 * isThreadBound(tso): check whether tso is bound to an OS thread.
 * ------------------------------------------------------------------------- */
StgBool
isThreadBound(StgTSO* tso USED_IF_THREADS)
{
#if defined(THREADED_RTS)
  return (tso->bound != NULL);
#endif
  return rtsFalse;
}

/* -----------------------------------------------------------------------------
   Stack overflow

   If the thread has reached its maximum stack size, then raise the
   StackOverflow exception in the offending thread.  Otherwise
   relocate the TSO into a larger chunk of memory and adjust its stack
   size appropriately.
   -------------------------------------------------------------------------- */

void
threadStackOverflow (Capability *cap, StgTSO *tso)
{
    StgStack *new_stack, *old_stack;
    StgUnderflowFrame *frame;

    IF_DEBUG(sanity,checkTSO(tso));

    if (tso->tot_stack_size >= RtsFlags.GcFlags.maxStkSize
        && !(tso->flags & TSO_BLOCKEX)) {
        // NB. never raise a StackOverflow exception if the thread is
        // inside Control.Exception.block.  It is impractical to protect
        // against stack overflow exceptions, since virtually anything
        // can raise one (even 'catch'), so this is the only sensible
        // thing to do here.  See bug #767.
        //

        if (tso->flags & TSO_SQUEEZED) {
            return;
        }
        // #3677: In a stack overflow situation, stack squeezing may
        // reduce the stack size, but we don't know whether it has been
        // reduced enough for the stack check to succeed if we try
        // again.  Fortunately stack squeezing is idempotent, so all we
        // need to do is record whether *any* squeezing happened.  If we
        // are at the stack's absolute -K limit, and stack squeezing
        // happened, then we try running the thread again.  The
        // TSO_SQUEEZED flag is set by threadPaused() to tell us whether
        // squeezing happened or not.

        debugTrace(DEBUG_gc,
                   "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
                   (long)tso->id, tso, (long)tso->stackobj->stack_size,
                   RtsFlags.GcFlags.maxStkSize);

        IF_DEBUG(gc,
                 /* If we're debugging, just print out the top of the stack */
                 printStackChunk(tso->stackobj->sp,
                                 stg_min(tso->stackobj->stack + tso->stackobj->stack_size,
                                         tso->stackobj->sp+64)));

        // Send this thread the StackOverflow exception
        throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
        return;
    }

    // We also want to avoid enlarging the stack if squeezing has
    // already released some of it.  However, we don't want to get into
    // a pathological situation where a thread has a nearly full stack
    // (near its current limit, but not near the absolute -K limit),
    // keeps allocating a little bit, squeezing removes a little bit,
    // and then it runs again.  So to avoid this, if we squeezed *and*
    // there is still less than BLOCK_SIZE_W words free, then we enlarge
    // the stack anyway.
    if ((tso->flags & TSO_SQUEEZED) &&
        ((W_)(tso->stackobj->sp - tso->stackobj->stack) >= BLOCK_SIZE_W)) {
        return;
    }

    debugTraceCap(DEBUG_sched, cap,
                  "allocating new stack chunk of size %d bytes",
                  RtsFlags.GcFlags.stkChunkSize * sizeof(W_));

    old_stack = tso->stackobj;

    new_stack = (StgStack*) allocate(cap, RtsFlags.GcFlags.stkChunkSize);
    SET_HDR(new_stack, &stg_STACK_info, CCS_SYSTEM);
    TICK_ALLOC_STACK(RtsFlags.GcFlags.stkChunkSize);

    new_stack->dirty = 0; // begin clean, we'll mark it dirty below
    new_stack->stack_size = RtsFlags.GcFlags.stkChunkSize - sizeofW(StgStack);
    new_stack->sp = new_stack->stack + new_stack->stack_size;

    tso->tot_stack_size += new_stack->stack_size;

    new_stack->sp -= sizeofW(StgUnderflowFrame);
    frame = (StgUnderflowFrame*)new_stack->sp;
    frame->info = &stg_stack_underflow_frame_info;
    frame->next_chunk = old_stack;

    {
        StgWord *sp;
        nat chunk_words, size;

        // find the boundary of the chunk of old stack we're going to
        // copy to the new stack.  We skip over stack frames until we
        // reach the smaller of
        //
        //    * the chunk buffer size (+RTS -kb)
        //    * the end of the old stack
        //
        for (sp = old_stack->sp;
             sp < stg_min(old_stack->sp + RtsFlags.GcFlags.stkChunkBufferSize,
                          old_stack->stack + old_stack->stack_size); )
        {
            size = stack_frame_sizeW((StgClosure*)sp);

            // if including this frame would exceed the size of the
            // new stack (taking into account the underflow frame),
            // then stop at the previous frame.
            if (sp + size > old_stack->stack + (new_stack->stack_size -
                                                sizeofW(StgUnderflowFrame))) {
                break;
            }
            sp += size;
        }

        // copy the stack chunk between tso->sp and sp to
        //   new_tso->sp + (tso->sp - sp)
        chunk_words = sp - old_stack->sp;

        memcpy(/* dest   */ new_stack->sp - chunk_words,
               /* source */ old_stack->sp,
               /* size   */ chunk_words * sizeof(W_));

        old_stack->sp += chunk_words;
        new_stack->sp -= chunk_words;
    }

    // if the old stack chunk is now empty, discard it.  With the
    // default settings, -ki1k -kb1k, this means the first stack chunk
    // will be discarded after the first overflow, being replaced by a
    // non-moving 32k chunk.
    if (old_stack->sp == old_stack->stack + old_stack->stack_size) {
        frame->next_chunk = new_stack;
    }

    tso->stackobj = new_stack;

    // we're about to run it, better mark it dirty
    dirty_STACK(cap, new_stack);

    IF_DEBUG(sanity,checkTSO(tso));
    // IF_DEBUG(scheduler,printTSO(new_tso));
}
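
/* After an overflow, the thread's stack is a chain of chunks linked by
 * underflow frames (illustrative diagram; each box is one StgStack):
 *
 *    tso->stackobj
 *         |
 *         v
 *    +-----------------+                  +-----------------+
 *    | copied frames   |                  | older frames    |
 *    | UNDERFLOW_FRAME | --next_chunk---> | ...             |
 *    +-----------------+                  +-----------------+
 *        new chunk                            old chunk
 *
 * When execution returns past the underflow frame, threadStackUnderflow()
 * below re-installs the old chunk as tso->stackobj.
 */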

/* ---------------------------------------------------------------------------
   Stack underflow - called from the stg_stack_underflow_frame
   ------------------------------------------------------------------------ */

nat // returns offset to the return address
threadStackUnderflow (Capability *cap, StgTSO *tso)
{
    StgStack *new_stack, *old_stack;
    StgUnderflowFrame *frame;
    nat retvals;

    debugTraceCap(DEBUG_sched, cap, "stack underflow");

    old_stack = tso->stackobj;

    frame = (StgUnderflowFrame*)(old_stack->stack + old_stack->stack_size
                                 - sizeofW(StgUnderflowFrame));
    ASSERT(frame->info == &stg_stack_underflow_frame_info);

    new_stack = (StgStack*)frame->next_chunk;
    tso->stackobj = new_stack;

    retvals = (P_)frame - old_stack->sp;
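
    // Layout at this point (illustrative): the underflow frame sits at
    // the very bottom (highest address) of the old chunk, so any words
    // between old_stack->sp and the frame are return values that must
    // travel back to the previous chunk:
    //
    //   old_stack->sp -> | retvals ... | UNDERFLOW_FRAME |  <- chunk end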
    if (retvals != 0)
    {
        // we have some return values to copy to the old stack
        if ((new_stack->sp - new_stack->stack) < retvals)
        {
            barf("threadStackUnderflow: not enough space for return values");
        }

        new_stack->sp -= retvals;

        memcpy(/* dest */ new_stack->sp,
               /* src  */ old_stack->sp,
               /* size */ retvals * sizeof(W_));
    }

    // empty the old stack.  The GC may still visit this object
    // because it is on the mutable list.
    old_stack->sp = old_stack->stack + old_stack->stack_size;

    // restore the stack parameters, and update tot_stack_size
    tso->tot_stack_size -= old_stack->stack_size;

    // we're about to run it, better mark it dirty
    dirty_STACK(cap, new_stack);

    return retvals;
}

/* ----------------------------------------------------------------------------
 * Debugging: why is a thread blocked
 * ------------------------------------------------------------------------- */

#if DEBUG
void
printThreadBlockage(StgTSO *tso)
{
  switch (tso->why_blocked) {
  case BlockedOnRead:
    debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
    break;
  case BlockedOnWrite:
    debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
    break;
#if defined(mingw32_HOST_OS)
  case BlockedOnDoProc:
    debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
    break;
#endif
  case BlockedOnDelay:
    debugBelch("is blocked until %ld", (long)(tso->block_info.target));
    break;
  case BlockedOnMVar:
    debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
    break;
  case BlockedOnBlackHole:
    debugBelch("is blocked on a black hole %p",
               ((StgBlockingQueue*)tso->block_info.bh->bh));
    break;
  case BlockedOnMsgThrowTo:
    debugBelch("is blocked on a throwto message");
    break;
  case NotBlocked:
    debugBelch("is not blocked");
    break;
  case ThreadMigrating:
    debugBelch("is runnable, but not on the run queue");
    break;
  case BlockedOnCCall:
    debugBelch("is blocked on an external call");
    break;
  case BlockedOnCCall_Interruptible:
    debugBelch("is blocked on an external call (but may be interrupted)");
    break;
  case BlockedOnSTM:
    debugBelch("is blocked on an STM operation");
    break;
  default:
    barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
         tso->why_blocked, tso->id, tso);
  }
}

void
printThreadStatus(StgTSO *t)
{
  debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
  {
    void *label = lookupThreadLabel(t->id);
    if (label) debugBelch("[\"%s\"] ",(char *)label);
  }
  switch (t->what_next) {
  case ThreadKilled:
    debugBelch("has been killed");
    break;
  case ThreadComplete:
    debugBelch("has completed");
    break;
  default:
    printThreadBlockage(t);
  }
  if (t->dirty) {
    debugBelch(" (TSO_DIRTY)");
  }
  debugBelch("\n");
}

void
printAllThreads(void)
{
  StgTSO *t, *next;
  nat i, g;
  Capability *cap;

  debugBelch("all threads:\n");

  for (i = 0; i < n_capabilities; i++) {
      cap = &capabilities[i];
      debugBelch("threads on capability %d:\n", cap->no);
      for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
          printThreadStatus(t);
      }
  }

  debugBelch("other threads:\n");
  for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
    for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
      if (t->why_blocked != NotBlocked) {
          printThreadStatus(t);
      }
      next = t->global_link;
    }
  }
}

// useful from gdb
void
printThreadQueue(StgTSO *t)
{
    nat i = 0;
    for (; t != END_TSO_QUEUE; t = t->_link) {
        printThreadStatus(t);
        i++;
    }
    debugBelch("%d threads on queue\n", i);
}

#endif /* DEBUG */