/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2006
 *
 * Thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"

#include "Capability.h"
#include "Updates.h"
#include "Threads.h"
#include "STM.h"
#include "Schedule.h"
#include "Trace.h"
#include "ThreadLabels.h"
#include "Messages.h"
#include "RaiseAsync.h"
#include "Prelude.h"
#include "Printer.h"
#include "sm/Sanity.h"
#include "sm/Storage.h"

#include <string.h>

/* Next thread ID to allocate.
 * LOCK: sched_mutex
 */
static StgThreadID next_thread_id = 1;

/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
 *  + 1                       (stg_ap_v_ret)
 *  + 1                       (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
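
/* Illustrative arithmetic (an added note, not from the original source):
 * the "+ 3" is the three one-word items itemised above.  On a platform
 * where, say, RESERVED_STACK_WORDS is 21 and the stop frame is one word,
 * MIN_STACK_WORDS comes to 25 words: enough for the stop frame and the
 * entry closure but no real work, so the thread's first stack check
 * fails and threadStackOverflow() below grows the stack immediately.
 */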

/* ---------------------------------------------------------------------------
   Create a new thread.

   The new thread starts with the given stack size.  Before the
   scheduler can run, however, this thread needs to have a closure
   (and possibly some arguments) pushed on its stack.  See
   pushClosure() in Schedule.h.

   createGenThread() and createIOThread() (in SchedAPI.h) are
   convenient packaged versions of this function.

   currently pri (priority) is only used in a GRAN setup -- HWL
   ------------------------------------------------------------------------ */
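
/* Minimal usage sketch (an illustration with assumed helpers, not code
 * from this file; see createIOThread() for the real packaging): create
 * the TSO, push an "enter this closure" sequence, then hand the thread
 * to the scheduler.
 *
 *     StgTSO *t = createThread(cap, RtsFlags.GcFlags.initialStkSize);
 *     pushClosure(t, (W_)&stg_ap_v_info);   // apply to the void token
 *     pushClosure(t, (W_)closure);          // the IO () closure to run
 *     pushClosure(t, (W_)&stg_enter_info);  // enter the closure
 *     scheduleThread(cap, t);
 */
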
StgTSO *
createThread(Capability *cap, nat size)
{
    StgTSO *tso;
    StgStack *stack;
    nat stack_size;

    /* sched_mutex is *not* required */

    /* catch ridiculously small stack sizes */
    if (size < MIN_STACK_WORDS + sizeofW(StgStack)) {
        size = MIN_STACK_WORDS + sizeofW(StgStack);
    }

    /* The size argument we are given includes all the per-thread
     * overheads:
     *
     *    - The TSO structure
     *    - The STACK header
     *
     * This is so that we can use a nice round power of 2 for the
     * default stack size (e.g. 1k), and if we're allocating lots of
     * threads back-to-back they'll fit nicely in a block.  It's a bit
     * of a benchmark hack, but it doesn't do any harm.
     */
    stack_size = round_to_mblocks(size - sizeofW(StgTSO));
    stack = (StgStack *)allocate(cap, stack_size);
    TICK_ALLOC_STACK(stack_size);
    SET_HDR(stack, &stg_STACK_info, CCS_SYSTEM);
    stack->stack_size   = stack_size - sizeofW(StgStack);
    stack->sp           = stack->stack + stack->stack_size;
    stack->dirty        = 1;
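
    /* Resulting layout (sketch, added for orientation): the STACK
     * header is followed by stack_size words of stack; sp starts at
     * stack + stack_size and grows downwards towards the header, so
     * running out of stack means sp is approaching the header end of
     * the chunk.
     */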

    tso = (StgTSO *)allocate(cap, sizeofW(StgTSO));
    TICK_ALLOC_TSO();
    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);

    // Always start with the compiled code evaluator
    tso->what_next = ThreadRunGHC;
    tso->why_blocked = NotBlocked;
    tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
    tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
    tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
    tso->flags = 0;
    tso->dirty = 1;
    tso->_link = END_TSO_QUEUE;

    tso->saved_errno = 0;
    tso->bound = NULL;
    tso->cap = cap;

    tso->stackobj       = stack;
    tso->tot_stack_size = stack->stack_size;

    tso->trec = NO_TREC;

#ifdef PROFILING
    tso->prof.CCCS = CCS_MAIN;
#endif

    // put a stop frame on the stack
    stack->sp -= sizeofW(StgStopFrame);
    SET_HDR((StgClosure*)stack->sp,
            (StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);

    /* Link the new thread on the global thread list.
     */
    ACQUIRE_LOCK(&sched_mutex);
    tso->id = next_thread_id++;  // while we have the mutex
    tso->global_link = g0->threads;
    g0->threads = tso;
    RELEASE_LOCK(&sched_mutex);

    // ToDo: report the stack size in the event?
    traceEventCreateThread(cap, tso);

    return tso;
}

/* ---------------------------------------------------------------------------
 * Comparing Thread ids.
 *
 * This is used from STG land in the implementation of the
 * instances of Eq/Ord for ThreadIds.
 * ------------------------------------------------------------------------ */

int
cmp_thread(StgPtr tso1, StgPtr tso2)
{
  StgThreadID id1 = ((StgTSO *)tso1)->id;
  StgThreadID id2 = ((StgTSO *)tso2)->id;

  if (id1 < id2) return (-1);
  if (id1 > id2) return 1;
  return 0;
}

/* ---------------------------------------------------------------------------
 * Fetching the ThreadID from an StgTSO.
 *
 * This is used in the implementation of Show for ThreadIds.
 * ------------------------------------------------------------------------ */
int
rts_getThreadId(StgPtr tso)
{
  return ((StgTSO *)tso)->id;
}

/* -----------------------------------------------------------------------------
   Remove a thread from a queue.
   Fails fatally if the TSO is not on the queue.
   -------------------------------------------------------------------------- */

rtsBool // returns True if we modified queue
removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
{
    StgTSO *t, *prev;

    prev = NULL;
    for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
                t->_link = END_TSO_QUEUE;
                return rtsFalse;
            } else {
                *queue = t->_link;
                t->_link = END_TSO_QUEUE;
                return rtsTrue;
            }
        }
    }
    barf("removeThreadFromQueue: not found");
}

rtsBool // returns True if we modified head or tail
removeThreadFromDeQueue (Capability *cap,
                         StgTSO **head, StgTSO **tail, StgTSO *tso)
{
    StgTSO *t, *prev;
    rtsBool flag = rtsFalse;

    prev = NULL;
    for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
                flag = rtsFalse;
            } else {
                *head = t->_link;
                flag = rtsTrue;
            }
            t->_link = END_TSO_QUEUE;
            if (*tail == tso) {
                if (prev) {
                    *tail = prev;
                } else {
                    *tail = END_TSO_QUEUE;
                }
                return rtsTrue;
            } else {
                return flag;
            }
        }
    }
    barf("removeThreadFromDeQueue: not found");
}

/* ----------------------------------------------------------------------------
   tryWakeupThread()

   Attempt to wake up a thread.  tryWakeupThread is idempotent: it is
   always safe to call it too many times, but it is not safe in
   general to omit a call.
   ------------------------------------------------------------------------- */
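
/* Example of the idempotence (an added scenario, not extra behaviour): a
 * thread blocked in takeMVar may be woken both by a putMVar and,
 * concurrently, by an asynchronous exception.  Whichever call arrives
 * second finds why_blocked == NotBlocked, falls into the default case of
 * the switch below, and does nothing.
 */
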
void
tryWakeupThread (Capability *cap, StgTSO *tso)
{
    traceEventThreadWakeup (cap, tso, tso->cap->no);

#ifdef THREADED_RTS
    if (tso->cap != cap)
    {
        MessageWakeup *msg;
        msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
        SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
        msg->tso = tso;
        sendMessage(cap, tso->cap, (Message*)msg);
        debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
                      (lnat)tso->id, tso->cap->no);
        return;
    }
#endif

    switch (tso->why_blocked)
    {
    case BlockedOnMVar:
    {
        if (tso->_link == END_TSO_QUEUE) {
            tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
            goto unblock;
        } else {
            return;
        }
    }

    case BlockedOnMsgThrowTo:
    {
        const StgInfoTable *i;

        i = lockClosure(tso->block_info.closure);
        unlockClosure(tso->block_info.closure, i);
        if (i != &stg_MSG_NULL_info) {
            debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
                          (lnat)tso->id, tso->block_info.throwto->header.info);
            return;
        }

        // remove the block frame from the stack
        ASSERT(tso->stackobj->sp[0] == (StgWord)&stg_block_throwto_info);
        tso->stackobj->sp += 3;
        goto unblock;
    }

    case BlockedOnBlackHole:
    case BlockedOnSTM:
    case ThreadMigrating:
        goto unblock;

    default:
        // otherwise, do nothing
        return;
    }

unblock:
    // just run the thread now, if the BH is not really available,
    // we'll block again.
    tso->why_blocked = NotBlocked;
    appendToRunQueue(cap,tso);

    // We used to set the context switch flag here, which would
    // trigger a context switch a short time in the future (at the end
    // of the current nursery block).  The idea is that we have just
    // woken up a thread, so we may need to load-balance and migrate
    // threads to other CPUs.  On the other hand, setting the context
    // switch flag here unfairly penalises the current thread by
    // yielding its time slice too early.
    //
    // The synthetic benchmark nofib/smp/chan can be used to show the
    // difference quite clearly.
    //
    // cap->context_switch = 1;
}

/* ----------------------------------------------------------------------------
   migrateThread
   ------------------------------------------------------------------------- */

void
migrateThread (Capability *from, StgTSO *tso, Capability *to)
{
    traceEventMigrateThread (from, tso, to->no);
    // ThreadMigrating tells the target cap that it needs to be added to
    // the run queue when it receives the MSG_TRY_WAKEUP.
    tso->why_blocked = ThreadMigrating;
    tso->cap = to;
    tryWakeupThread(from, tso);
}

/* ----------------------------------------------------------------------------
   wakeBlockingQueue

   wakes up all the threads on the specified queue.
   ------------------------------------------------------------------------- */

void
wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
{
    MessageBlackHole *msg;
    const StgInfoTable *i;

    ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info ||
           bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info );

    for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE;
         msg = msg->link) {
        i = msg->header.info;
        if (i != &stg_IND_info) {
            ASSERT(i == &stg_MSG_BLACKHOLE_info);
            tryWakeupThread(cap,msg->tso);
        }
    }

    // overwrite the BQ with an indirection so it will be
    // collected at the next GC.
#if defined(DEBUG) && !defined(THREADED_RTS)
    // XXX FILL_SLOP, but not if THREADED_RTS because in that case
    // another thread might be looking at this BLOCKING_QUEUE and
    // checking the owner field at the same time.
    bq->bh = 0; bq->queue = 0; bq->owner = 0;
#endif
    OVERWRITE_INFO(bq, &stg_IND_info);
}

// If we update a closure that we know we BLACKHOLE'd, and the closure
// no longer points to the current TSO as its owner, then there may be
// an orphaned BLOCKING_QUEUE closure with blocked threads attached to
// it.  We therefore traverse the BLOCKING_QUEUEs attached to the
// current TSO to see if any can now be woken up.
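//
// Example scenario (an added illustration): thread A blackholes a thunk
// and thread B blocks on it, creating a BLOCKING_QUEUE owned by A.  If
// another thread then updates the same thunk first (a duplicate-work
// collision), the blackhole no longer points at A's queue, so the queue
// is orphaned; B would sleep forever unless A walks its tso->bq list
// here and wakes it.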
void
checkBlockingQueues (Capability *cap, StgTSO *tso)
{
    StgBlockingQueue *bq, *next;
    StgClosure *p;

    debugTraceCap(DEBUG_sched, cap,
                  "collision occurred; checking blocking queues for thread %ld",
                  (lnat)tso->id);

    for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
        next = bq->link;

        if (bq->header.info == &stg_IND_info) {
            // ToDo: could short it out right here, to avoid
            // traversing this IND multiple times.
            continue;
        }

        p = bq->bh;

        if (p->header.info != &stg_BLACKHOLE_info ||
            ((StgInd *)p)->indirectee != (StgClosure*)bq)
        {
            wakeBlockingQueue(cap,bq);
        }
    }
}

/* ----------------------------------------------------------------------------
   updateThunk

   Update a thunk with a value.  In order to do this, we need to know
   which TSO owns (or is evaluating) the thunk, in case we need to
   awaken any threads that are blocked on it.
   ------------------------------------------------------------------------- */
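
/* Dispatch sketch for the function below (a reading aid added here; the
 * code is authoritative):
 *
 *   thunk is not (any kind of) blackhole  -> plain update, done
 *   indirectee is our own TSO             -> plain update, nothing queued
 *   indirectee is another TSO             -> we lost a race: update, then
 *                                            checkBlockingQueues()
 *   indirectee is a BLOCKING_QUEUE we own -> update, wakeBlockingQueue()
 *   anything else                         -> update, checkBlockingQueues()
 */
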
void
updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
{
    StgClosure *v;
    StgTSO *owner;
    const StgInfoTable *i;

    i = thunk->header.info;
    if (i != &stg_BLACKHOLE_info &&
        i != &stg_CAF_BLACKHOLE_info &&
        i != &__stg_EAGER_BLACKHOLE_info &&
        i != &stg_WHITEHOLE_info) {
        updateWithIndirection(cap, thunk, val);
        return;
    }

    v = ((StgInd*)thunk)->indirectee;

    updateWithIndirection(cap, thunk, val);

    i = v->header.info;
    if (i == &stg_TSO_info) {
        // the thunk was blackholed with a TSO: if it wasn't our TSO,
        // we lost a race, and there may be orphaned blocking queues
        if (v != (StgClosure*)tso) {
            checkBlockingQueues(cap, tso);
        }
        return;
    }

    if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
        i != &stg_BLOCKING_QUEUE_DIRTY_info) {
        checkBlockingQueues(cap, tso);
        return;
    }

    owner = ((StgBlockingQueue*)v)->owner;

    if (owner != tso) {
        checkBlockingQueues(cap, tso);
    } else {
        wakeBlockingQueue(cap, (StgBlockingQueue*)v);
    }
}

/* ---------------------------------------------------------------------------
 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
 * used by Control.Concurrent for error checking.
 * ------------------------------------------------------------------------- */

HsBool
rtsSupportsBoundThreads(void)
{
#if defined(THREADED_RTS)
  return HS_BOOL_TRUE;
#else
  return HS_BOOL_FALSE;
#endif
}

/* ---------------------------------------------------------------------------
 * isThreadBound(tso): check whether tso is bound to an OS thread.
 * ------------------------------------------------------------------------- */

StgBool
isThreadBound(StgTSO* tso USED_IF_THREADS)
{
#if defined(THREADED_RTS)
  return (tso->bound != NULL);
#else
  return rtsFalse;
#endif
}

/* -----------------------------------------------------------------------------
   Stack overflow

   If the thread has reached its maximum stack size, then raise the
   StackOverflow exception in the offending thread.  Otherwise
   relocate the TSO into a larger chunk of memory and adjust its stack
   size appropriately.
   -------------------------------------------------------------------------- */

void
threadStackOverflow (Capability *cap, StgTSO *tso)
{
    StgStack *new_stack, *old_stack;
    StgUnderflowFrame *frame;
    lnat chunk_size;

    IF_DEBUG(sanity,checkTSO(tso));

    if (tso->tot_stack_size >= RtsFlags.GcFlags.maxStkSize
        && !(tso->flags & TSO_BLOCKEX)) {
        // NB. never raise a StackOverflow exception if the thread is
        // inside Control.Exception.block.  It is impractical to protect
        // against stack overflow exceptions, since virtually anything
        // can raise one (even 'catch'), so this is the only sensible
        // thing to do here.  See bug #767.
        //
        if (tso->flags & TSO_SQUEEZED) {
            return;
        }
        // #3677: In a stack overflow situation, stack squeezing may
        // reduce the stack size, but we don't know whether it has been
        // reduced enough for the stack check to succeed if we try
        // again.  Fortunately stack squeezing is idempotent, so all we
        // need to do is record whether *any* squeezing happened.  If we
        // are at the stack's absolute -K limit, and stack squeezing
        // happened, then we try running the thread again.  The
        // TSO_SQUEEZED flag is set by threadPaused() to tell us whether
        // squeezing happened or not.

        debugTrace(DEBUG_gc,
                   "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
                   (long)tso->id, tso, (long)tso->stackobj->stack_size,
                   (long)RtsFlags.GcFlags.maxStkSize);
        IF_DEBUG(gc,
                 /* If we're debugging, just print out the top of the stack */
                 printStackChunk(tso->stackobj->sp,
                                 stg_min(tso->stackobj->stack + tso->stackobj->stack_size,
                                         tso->stackobj->sp+64)));

        // Send this thread the StackOverflow exception
        throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
        return;
    }

    // We also want to avoid enlarging the stack if squeezing has
    // already released some of it.  However, we don't want to get into
    // a pathological situation where a thread has a nearly full stack
    // (near its current limit, but not near the absolute -K limit),
    // keeps allocating a little bit, squeezing removes a little bit,
    // and then it runs again.  So to avoid this, if we squeezed *and*
    // there is still less than BLOCK_SIZE_W words free, then we enlarge
    // the stack anyway.
    if ((tso->flags & TSO_SQUEEZED) &&
        ((W_)(tso->stackobj->sp - tso->stackobj->stack) >= BLOCK_SIZE_W)) {
        return;
    }

    old_stack = tso->stackobj;

    // If we used less than half of the previous stack chunk, then we
    // must have failed a stack check for a large amount of stack.  In
    // this case we allocate a double-sized chunk to try to
    // accommodate the large stack request.  If that also fails, the
    // next chunk will be 4x normal size, and so on.
    //
    // It would be better to have the mutator tell us how much stack
    // was needed, as we do with heap allocations, but this works for
    // now.
    //
    if (old_stack->sp > old_stack->stack + old_stack->stack_size / 2)
    {
        chunk_size = 2 * (old_stack->stack_size + sizeofW(StgStack));
    }
    else
    {
        chunk_size = RtsFlags.GcFlags.stkChunkSize;
    }
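
    // Worked example (added, illustrative only): a thread that overflows
    // while more than half of its current chunk is still unused must have
    // asked for a large amount of stack at once, so it gets twice the old
    // chunk size; if that repeats, the next chunk is 4x, then 8x.  An
    // ordinary overflow just gets a fresh chunk of the flat default size
    // (the 32k chunk mentioned in the comment further down).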

    debugTraceCap(DEBUG_sched, cap,
                  "allocating new stack chunk of size %ld bytes",
                  (long)(chunk_size * sizeof(W_)));

    new_stack = (StgStack*) allocate(cap, chunk_size);
    SET_HDR(new_stack, &stg_STACK_info, CCS_SYSTEM);
    TICK_ALLOC_STACK(chunk_size);

    new_stack->dirty = 0; // begin clean, we'll mark it dirty below
    new_stack->stack_size = chunk_size - sizeofW(StgStack);
    new_stack->sp = new_stack->stack + new_stack->stack_size;

    tso->tot_stack_size += new_stack->stack_size;

    new_stack->sp -= sizeofW(StgUnderflowFrame);
    frame = (StgUnderflowFrame*)new_stack->sp;
    frame->info = &stg_stack_underflow_frame_info;
    frame->next_chunk = old_stack;

    {
        StgWord *sp;
        nat chunk_words, size;

        // find the boundary of the chunk of old stack we're going to
        // copy to the new stack.  We skip over stack frames until we
        // reach the smaller of
        //
        //    * the chunk buffer size (+RTS -kb)
        //    * the end of the old stack
        //
        for (sp = old_stack->sp;
             sp < stg_min(old_stack->sp + RtsFlags.GcFlags.stkChunkBufferSize,
                          old_stack->stack + old_stack->stack_size); )
        {
            size = stack_frame_sizeW((StgClosure*)sp);

            // if including this frame would exceed the size of the
            // new stack (taking into account the underflow frame),
            // then stop at the previous frame.
            if (sp + size > old_stack->stack + (new_stack->stack_size -
                                                sizeofW(StgUnderflowFrame))) {
                break;
            }
            sp += size;
        }

        // copy the stack chunk between tso->sp and sp to
        //   new_tso->sp + (tso->sp - sp)
        chunk_words = sp - old_stack->sp;

        memcpy(/* dest   */ new_stack->sp - chunk_words,
               /* source */ old_stack->sp,
               /* size   */ chunk_words * sizeof(W_));

        old_stack->sp += chunk_words;
        new_stack->sp -= chunk_words;
    }
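
    // Note (added): the loop above advances sp a whole frame at a time,
    // so the split always falls on a frame boundary; both the copied
    // prefix in new_stack and the frames left behind in old_stack remain
    // independently walkable by the GC and the underflow machinery.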

    // if the old stack chunk is now empty, discard it.  With the
    // default settings, -ki1k -kb1k, this means the first stack chunk
    // will be discarded after the first overflow, being replaced by a
    // non-moving 32k chunk.
    if (old_stack->sp == old_stack->stack + old_stack->stack_size) {
        frame->next_chunk = new_stack;
    }

    tso->stackobj = new_stack;

    // we're about to run it, better mark it dirty
    dirty_STACK(cap, new_stack);

    IF_DEBUG(sanity,checkTSO(tso));
    // IF_DEBUG(scheduler,printTSO(new_tso));
}

/* ---------------------------------------------------------------------------
   Stack underflow - called from the stg_stack_underflow_frame
   ------------------------------------------------------------------------ */

nat // returns offset to the return address
threadStackUnderflow (Capability *cap, StgTSO *tso)
{
    StgStack *new_stack, *old_stack;
    StgUnderflowFrame *frame;
    nat retvals;

    debugTraceCap(DEBUG_sched, cap, "stack underflow");

    old_stack = tso->stackobj;

    frame = (StgUnderflowFrame*)(old_stack->stack + old_stack->stack_size
                                 - sizeofW(StgUnderflowFrame));
    ASSERT(frame->info == &stg_stack_underflow_frame_info);

    new_stack = (StgStack*)frame->next_chunk;
    tso->stackobj = new_stack;

    // the words between sp and the underflow frame are return values
    // that must be carried down to the chunk we are resuming
    retvals = (P_)frame - old_stack->sp;
    if (retvals != 0)
    {
        // we have some return values to copy down to the next chunk
        if ((nat)(new_stack->sp - new_stack->stack) < retvals)
        {
            barf("threadStackUnderflow: not enough space for return values");
        }

        new_stack->sp -= retvals;

        memcpy(/* dest */ new_stack->sp,
               /* src  */ old_stack->sp,
               /* size */ retvals * sizeof(W_));
    }

    // empty the old stack.  The GC may still visit this object
    // because it is on the mutable list.
    old_stack->sp = old_stack->stack + old_stack->stack_size;

    // update tot_stack_size now that the old chunk is gone
    tso->tot_stack_size -= old_stack->stack_size;

    // we're about to run it, better mark it dirty
    dirty_STACK(cap, new_stack);

    return retvals;
}

/* ----------------------------------------------------------------------------
 * Debugging: why is a thread blocked
 * ------------------------------------------------------------------------- */

#if DEBUG
void
printThreadBlockage(StgTSO *tso)
{
  switch (tso->why_blocked) {
  case BlockedOnRead:
    debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
    break;
  case BlockedOnWrite:
    debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
    break;
#if defined(mingw32_HOST_OS)
  case BlockedOnDoProc:
    debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
    break;
#endif
  case BlockedOnDelay:
    debugBelch("is blocked until %ld", (long)(tso->block_info.target));
    break;
  case BlockedOnMVar:
    debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
    break;
  case BlockedOnBlackHole:
    debugBelch("is blocked on a black hole %p",
               ((StgBlockingQueue*)tso->block_info.bh->bh));
    break;
  case BlockedOnMsgThrowTo:
    debugBelch("is blocked on a throwto message");
    break;
  case NotBlocked:
    debugBelch("is not blocked");
    break;
  case ThreadMigrating:
    debugBelch("is runnable, but not on the run queue");
    break;
  case BlockedOnCCall:
    debugBelch("is blocked on an external call");
    break;
  case BlockedOnCCall_Interruptible:
    debugBelch("is blocked on an external call (but may be interrupted)");
    break;
  case BlockedOnSTM:
    debugBelch("is blocked on an STM operation");
    break;
  default:
    barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %lu (%p)",
         tso->why_blocked, (unsigned long)tso->id, (void *)tso);
  }
}

void
printThreadStatus(StgTSO *t)
{
  debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
  {
    void *label = lookupThreadLabel(t->id);
    if (label) debugBelch("[\"%s\"] ",(char *)label);
  }
  switch (t->what_next) {
  case ThreadKilled:
    debugBelch("has been killed");
    break;
  case ThreadComplete:
    debugBelch("has completed");
    break;
  default:
    printThreadBlockage(t);
  }
  if (t->dirty) {
    debugBelch(" (TSO_DIRTY)");
  }
  debugBelch("\n");
}

void
printAllThreads(void)
{
  StgTSO *t, *next;
  nat i, g;
  Capability *cap;

  debugBelch("all threads:\n");

  for (i = 0; i < n_capabilities; i++) {
      cap = &capabilities[i];
      debugBelch("threads on capability %d:\n", cap->no);
      for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
          printThreadStatus(t);
      }
  }

  debugBelch("other threads:\n");
  for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
    for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
      if (t->why_blocked != NotBlocked) {
          printThreadStatus(t);
      }
      next = t->global_link;
    }
  }
}

// useful from gdb
void
printThreadQueue(StgTSO *t)
{
    nat i = 0;
    for (; t != END_TSO_QUEUE; t = t->_link) {
        printThreadStatus(t);
        i++;
    }
    debugBelch("%d threads on queue\n", i);
}

#endif /* DEBUG */