1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2006
5 * Thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
12 #include "Capability.h"
18 #include "ThreadLabels.h"
21 #include "sm/Storage.h"
23 /* Next thread ID to allocate.
   Monotonically increasing; incremented under sched_mutex in
   createThread() below, so IDs are unique across capabilities. */
26 static StgThreadID next_thread_id = 1;
28 /* The smallest stack size that makes any sense is:
29 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
30 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
31 * + 1 (the closure to enter)
33 * + 1 (spare slot req'd by stg_ap_v_ret)
35 * A thread with this stack will bomb immediately with a stack
36 * overflow, which will increase its stack size.
   (the "+ 3" below accounts for the closure, the return slot, and the
   spare slot itemised above) */
38 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
40 /* ---------------------------------------------------------------------------
43 The new thread starts with the given stack size. Before the
44 scheduler can run, however, this thread needs to have a closure
45 (and possibly some arguments) pushed on its stack. See
46 pushClosure() in Schedule.h.
48 createGenThread() and createIOThread() (in SchedAPI.h) are
49 convenient packaged versions of this function.
51 currently pri (priority) is only used in a GRAN setup -- HWL
52 ------------------------------------------------------------------------ */
   /* createThread: allocate and initialise a fresh TSO on the given
      capability.  NOTE(review): the return type, local declarations and
      final return fall on lines not visible in this chunk — presumably
      it returns the new StgTSO*; confirm against the full file. */
54 createThread(Capability *cap, nat size)
59 /* sched_mutex is *not* required */
61 /* First check whether we should create a thread at all */
63 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
65 /* catch ridiculously small stack sizes */
66 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
67 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
   /* round up so the TSO occupies a whole number of megablocks;
      the stack then fills everything after the TSO header */
70 size = round_to_mblocks(size);
71 tso = (StgTSO *)allocate(cap, size);
73 stack_size = size - TSO_STRUCT_SIZEW;
74 TICK_ALLOC_TSO(stack_size, 0);
76 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
78 // Always start with the compiled code evaluator
79 tso->what_next = ThreadRunGHC;
   /* thread starts runnable, with all queue fields set to the
      distinguished empty-queue sentinels */
81 tso->why_blocked = NotBlocked;
82 tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
83 tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
84 tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
92 tso->stack_size = stack_size;
93 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
   /* stack grows downwards: sp starts one word past the top */
95 tso->sp = (P_)&(tso->stack) + stack_size;
100 tso->prof.CCCS = CCS_MAIN;
103 /* put a stop frame on the stack */
104 tso->sp -= sizeofW(StgStopFrame);
105 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
106 tso->_link = END_TSO_QUEUE;
108 /* Link the new thread on the global thread list.
    sched_mutex protects both the ID counter and g0->threads. */
110 ACQUIRE_LOCK(&sched_mutex);
111 tso->id = next_thread_id++; // while we have the mutex
112 tso->global_link = g0->threads;
114 RELEASE_LOCK(&sched_mutex);
116 // ToDo: report the stack size in the event?
117 traceEventCreateThread(cap, tso);
122 /* ---------------------------------------------------------------------------
123 * Comparing Thread ids.
125 * This is used from STG land in the implementation of the
126 * instances of Eq/Ord for ThreadIds.
127 * ------------------------------------------------------------------------ */
    /* Three-way comparison of two TSOs by thread id: -1 / 0 / 1.
       NOTE(review): the `return 0` (equal) case and the return type are
       on lines not visible in this chunk — confirm in the full file. */
130 cmp_thread(StgPtr tso1, StgPtr tso2)
132 StgThreadID id1 = ((StgTSO *)tso1)->id;
133 StgThreadID id2 = ((StgTSO *)tso2)->id;
135 if (id1 < id2) return (-1);
136 if (id1 > id2) return 1;
140 /* ---------------------------------------------------------------------------
141 * Fetching the ThreadID from an StgTSO.
143 * This is used in the implementation of Show for ThreadIds.
144 * ------------------------------------------------------------------------ */
    /* Simple accessor: read the id field of the TSO pointed to by tso. */
146 rts_getThreadId(StgPtr tso)
148 return ((StgTSO *)tso)->id;
151 /* -----------------------------------------------------------------------------
152 Remove a thread from a queue.
153 Fails fatally if the TSO is not on the queue.
154 -------------------------------------------------------------------------- */
156 rtsBool // returns True if we modified queue
157 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
    /* Walk the singly-linked _link chain keeping a trailing `prev`
       pointer; the match test (t == tso) lies on an elided line. */
162 for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
    /* unlink: make prev skip over t (setTSOLink also handles the
       generational write barrier — presumably; confirm in Storage) */
165 setTSOLink(cap,prev,t->_link);
    /* falling off the loop means tso was not on the queue: fatal */
173 barf("removeThreadFromQueue: not found");
176 rtsBool // returns True if we modified head or tail
177 removeThreadFromDeQueue (Capability *cap,
178 StgTSO **head, StgTSO **tail, StgTSO *tso)
    /* Like removeThreadFromQueue, but for a queue with both head and
       tail pointers; `flag` records whether *head was modified. */
181 rtsBool flag = rtsFalse;
184 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
187 setTSOLink(cap,prev,t->_link);
    /* if the removed element was the tail, the tail becomes empty
       (the case where prev becomes the new tail is on elided lines) */
197 *tail = END_TSO_QUEUE;
    /* NOTE(review): message says "removeThreadFromMVarQueue" but this
       function is removeThreadFromDeQueue — stale string; worth fixing
       when editing the full file. */
205 barf("removeThreadFromMVarQueue: not found");
208 /* ----------------------------------------------------------------------------
211 Attempt to wake up a thread. tryWakeupThread is idempotent: it is
212 always safe to call it too many times, but it is not safe in
213 general to omit a call.
215 ------------------------------------------------------------------------- */
    /* Public entry point: resolve the TSO via deRefTSO (presumably
       following ThreadRelocated forwarding — confirm) then delegate. */
218 tryWakeupThread (Capability *cap, StgTSO *tso)
220 tryWakeupThread_(cap, deRefTSO(tso));
    /* Worker for tryWakeupThread, taking an already-dereferenced TSO.
       If the TSO lives on another capability, send it a MSG_TRY_WAKEUP
       message instead of touching it directly; otherwise inspect
       why_blocked and, when safe, put it back on our run queue. */
224 tryWakeupThread_ (Capability *cap, StgTSO *tso)
226 traceEventThreadWakeup (cap, tso, tso->cap->no);
    /* remote case: allocate a wakeup message and post it to the TSO's
       owning capability (the tso->cap != cap test is on elided lines) */
232 msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
233 SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
235 sendMessage(cap, tso->cap, (Message*)msg);
236 debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
237 (lnat)tso->id, tso->cap->no);
    /* local case: dispatch on the blocking reason */
242 switch (tso->why_blocked)
    /* (case label elided) if nothing is linked behind us, clear the
       block_info closure before requeueing */
246 if (tso->_link == END_TSO_QUEUE) {
247 tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
254 case BlockedOnMsgThrowTo:
256 const StgInfoTable *i;
    /* lock/unlock pair reads a consistent info pointer for the
       throwto message closure */
258 i = lockClosure(tso->block_info.closure);
259 unlockClosure(tso->block_info.closure, i);
    /* a non-NULL message means the throwto is still pending: the
       thread stays blocked */
260 if (i != &stg_MSG_NULL_info) {
261 debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
262 (lnat)tso->id, tso->block_info.throwto->header.info);
266 // remove the block frame from the stack
267 ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
272 case BlockedOnBlackHole:
274 case ThreadMigrating:
278 // otherwise, do nothing
283 // just run the thread now, if the BH is not really available,
284 // we'll block again.
285 tso->why_blocked = NotBlocked;
286 appendToRunQueue(cap,tso);
288 // We used to set the context switch flag here, which would
289 // trigger a context switch a short time in the future (at the end
290 // of the current nursery block). The idea is that we have just
291 // woken up a thread, so we may need to load-balance and migrate
292 // threads to other CPUs. On the other hand, setting the context
293 // switch flag here unfairly penalises the current thread by
294 // yielding its time slice too early.
296 // The synthetic benchmark nofib/smp/chan can be used to show the
297 // difference quite clearly.
299 // cap->context_switch = 1;
302 /* ----------------------------------------------------------------------------
304 ------------------------------------------------------------------------- */
    /* Move a thread from capability `from` to capability `to`: mark it
       ThreadMigrating and let the wakeup-message machinery deliver it. */
307 migrateThread (Capability *from, StgTSO *tso, Capability *to)
309 traceEventMigrateThread (from, tso, to->no);
310 // ThreadMigrating tells the target cap that it needs to be added to
311 // the run queue when it receives the MSG_TRY_WAKEUP.
312 tso->why_blocked = ThreadMigrating;
    /* NOTE(review): the assignment tso->cap = to presumably sits on an
       elided line before this call — otherwise tryWakeupThread would
       not route the message to `to`; confirm in the full file. */
314 tryWakeupThread(from, tso);
317 /* ----------------------------------------------------------------------------
320 wakes up all the threads on the specified queue.
321 ------------------------------------------------------------------------- */
324 wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
326 MessageBlackHole *msg;
327 const StgInfoTable *i;
    /* the queue must still be a live BLOCKING_QUEUE (clean or dirty) */
329 ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info ||
330 bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info );
    /* walk the message chain; the loop increment is on an elided line
       (presumably msg = msg->link) */
332 for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE;
    /* a message overwritten with an IND has been cancelled; only wake
       threads behind live MSG_BLACKHOLE messages */
334 i = msg->header.info;
335 if (i != &stg_IND_info) {
336 ASSERT(i == &stg_MSG_BLACKHOLE_info);
337 tryWakeupThread(cap,msg->tso);
341 // overwrite the BQ with an indirection so it will be
342 // collected at the next GC.
343 #if defined(DEBUG) && !defined(THREADED_RTS)
344 // XXX FILL_SLOP, but not if THREADED_RTS because in that case
345 // another thread might be looking at this BLOCKING_QUEUE and
346 // checking the owner field at the same time.
347 bq->bh = 0; bq->queue = 0; bq->owner = 0;
349 OVERWRITE_INFO(bq, &stg_IND_info);
352 // If we update a closure that we know we BLACKHOLE'd, and the closure
353 // no longer points to the current TSO as its owner, then there may be
354 // an orphaned BLOCKING_QUEUE closure with blocked threads attached to
355 // it. We therefore traverse the BLOCKING_QUEUEs attached to the
356 // current TSO to see if any can now be woken up.
358 checkBlockingQueues (Capability *cap, StgTSO *tso)
360 StgBlockingQueue *bq, *next;
363 debugTraceCap(DEBUG_sched, cap,
364 "collision occurred; checking blocking queues for thread %ld",
    /* iterate the TSO's chain of blocking queues; `next` must be read
       before bq is possibly overwritten (next = bq->link, elided) */
367 for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
    /* an IND header means this BQ was already woken/collected */
370 if (bq->header.info == &stg_IND_info) {
371 // ToDo: could short it out right here, to avoid
372 // traversing this IND multiple times.
    /* wake the queue only if the blackhole no longer points at it,
       i.e. the thunk has since been updated with a value */
378 if (p->header.info != &stg_BLACKHOLE_info ||
379 ((StgInd *)p)->indirectee != (StgClosure*)bq)
381 wakeBlockingQueue(cap,bq);
386 /* ----------------------------------------------------------------------------
389 Update a thunk with a value. In order to do this, we need to know
390 which TSO owns (or is evaluating) the thunk, in case we need to
391 awaken any threads that are blocked on it.
392 ------------------------------------------------------------------------- */
395 updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
399 const StgInfoTable *i;
401 i = thunk->header.info;
    /* fast path: not any flavour of blackhole, so nobody can be
       blocked on it — just overwrite with an indirection */
402 if (i != &stg_BLACKHOLE_info &&
403 i != &stg_CAF_BLACKHOLE_info &&
404 i != &__stg_EAGER_BLACKHOLE_info &&
405 i != &stg_WHITEHOLE_info) {
406 updateWithIndirection(cap, thunk, val);
    /* blackhole: read the indirectee to find the owner before updating */
410 v = ((StgInd*)thunk)->indirectee;
412 updateWithIndirection(cap, thunk, val);
    /* indirectee is a TSO: the owner claimed the thunk but no queue
       has formed yet */
415 if (i == &stg_TSO_info) {
416 owner = deRefTSO((StgTSO*)v);
    /* not our TSO: another thread won the race; scan our queues
       (the owner != tso test is on an elided line) */
418 checkBlockingQueues(cap, tso);
    /* indirectee is neither TSO nor BLOCKING_QUEUE: fall back to a
       conservative scan of our own queues */
423 if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
424 i != &stg_BLOCKING_QUEUE_DIRTY_info) {
425 checkBlockingQueues(cap, tso);
429 owner = deRefTSO(((StgBlockingQueue*)v)->owner);
432 checkBlockingQueues(cap, tso);
    /* the queue belongs to us: wake everyone waiting on the thunk */
434 wakeBlockingQueue(cap, (StgBlockingQueue*)v);
438 /* ---------------------------------------------------------------------------
439 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
440 * used by Control.Concurrent for error checking.
441 * ------------------------------------------------------------------------- */
444 rtsSupportsBoundThreads(void)
    /* the THREADED_RTS branch (presumably returning HS_BOOL_TRUE) is on
       lines not visible in this chunk */
446 #if defined(THREADED_RTS)
449 return HS_BOOL_FALSE;
453 /* ---------------------------------------------------------------------------
454 * isThreadBound(tso): check whether tso is bound to an OS thread.
455 * ------------------------------------------------------------------------- */
458 isThreadBound(StgTSO* tso USED_IF_THREADS)
460 #if defined(THREADED_RTS)
    /* a TSO is bound iff its `bound` field points at an InCall/Task;
       the non-threaded branch is on elided lines */
461 return (tso->bound != NULL);
466 /* ----------------------------------------------------------------------------
467 * Debugging: why is a thread blocked
468 * ------------------------------------------------------------------------- */
    /* Print (via debugBelch) a human-readable reason for tso->why_blocked.
       Most case labels are on elided lines; the belch calls map 1:1 to
       the WhyBlocked constructors. */
472 printThreadBlockage(StgTSO *tso)
474 switch (tso->why_blocked) {
476 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
479 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
481 #if defined(mingw32_HOST_OS)
482 case BlockedOnDoProc:
483 debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
487 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
490 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
492 case BlockedOnBlackHole:
493 debugBelch("is blocked on a black hole %p",
494 ((StgBlockingQueue*)tso->block_info.bh->bh));
496 case BlockedOnMsgThrowTo:
497 debugBelch("is blocked on a throwto message");
500 debugBelch("is not blocked");
502 case ThreadMigrating:
503 debugBelch("is runnable, but not on the run queue");
506 debugBelch("is blocked on an external call");
508 case BlockedOnCCall_Interruptible:
509 debugBelch("is blocked on an external call (but may be interrupted)");
512 debugBelch("is blocked on an STM operation");
    /* NOTE(review): the last "%d" receives `tso`, a pointer — should be
       "%p" (and the id argument may be wider than int); format/argument
       mismatch is undefined behaviour, fix when editing the full file. */
515 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
516 tso->why_blocked, tso->id, tso);
    /* Debug helper: print one line describing thread t — its id, address,
       optional label, and its what_next / blockage status plus dirty flags. */
522 printThreadStatus(StgTSO *t)
524 debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
526 void *label = lookupThreadLabel(t->id);
527 if (label) debugBelch("[\"%s\"] ",(char *)label);
    /* a relocated TSO is just a forwarding pointer; don't inspect further */
529 if (t->what_next == ThreadRelocated) {
530 debugBelch("has been relocated...\n");
532 switch (t->what_next) {
534 debugBelch("has been killed");
537 debugBelch("has completed");
    /* default: delegate the blocked/runnable description */
540 printThreadBlockage(t);
543 debugBelch(" (TSO_DIRTY)");
544 } else if (t->flags & TSO_LINK_DIRTY) {
545 debugBelch(" (TSO_LINK_DIRTY)");
    /* Debug helper: dump every thread in the system — first each
       capability's run queue, then all blocked threads found on the
       per-generation thread lists. */
552 printAllThreads(void)
558 debugBelch("all threads:\n");
560 for (i = 0; i < n_capabilities; i++) {
561 cap = &capabilities[i];
562 debugBelch("threads on capability %d:\n", cap->no);
563 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
564 printThreadStatus(t);
568 debugBelch("other threads:\n");
569 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
570 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
    /* only blocked threads: runnable ones were already printed above */
571 if (t->why_blocked != NotBlocked) {
572 printThreadStatus(t);
    /* relocated TSOs chain through _link; otherwise follow global_link
       (the _link branch body is on elided lines) */
574 if (t->what_next == ThreadRelocated) {
577 next = t->global_link;
    /* Debug helper: print every thread on a _link-chained queue and a
       final count (i is incremented on an elided line in the loop body). */
585 printThreadQueue(StgTSO *t)
588 for (; t != END_TSO_QUEUE; t = t->_link) {
589 printThreadStatus(t);
592 debugBelch("%d threads on queue\n", i);