1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2006
5 * Thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
12 #include "Capability.h"
18 #include "ThreadLabels.h"
21 #include "sm/Storage.h"
23 /* Next thread ID to allocate. */
26 static StgThreadID next_thread_id = 1;
28 /* The smallest stack size that makes any sense is:
29 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
30 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
31 * + 1 (the closure to enter)
33 * + 1 (spare slot req'd by stg_ap_v_ret)
35 * A thread with this stack will bomb immediately with a stack
36 * overflow, which will increase its stack size. */
38 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
40 /* ---------------------------------------------------------------------------
43 The new thread starts with the given stack size. Before the
44 scheduler can run, however, this thread needs to have a closure
45 (and possibly some arguments) pushed on its stack. See
46 pushClosure() in Schedule.h.
48 createGenThread() and createIOThread() (in SchedAPI.h) are
49 convenient packaged versions of this function.
51 currently pri (priority) is only used in a GRAN setup -- HWL
52 ------------------------------------------------------------------------ */
/* Create a new thread: allocate a TSO with a stack of (at least) `size`
 * words on the given capability, initialise its fields, and link it onto
 * the global thread list under sched_mutex.
 * NOTE(review): this chunk elides some original lines (the embedded line
 * numbers jump) — the return-type line is not visible; presumably returns
 * the new StgTSO* — confirm against the full source. */
54 createThread(Capability *cap, nat size)
59 /* sched_mutex is *not* required */
61 /* First check whether we should create a thread at all */
63 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
65 /* catch ridiculously small stack sizes */
/* Clamp the request so the thread can at least push a stop frame and
 * take a stack overflow; see MIN_STACK_WORDS above. */
66 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
67 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
/* TSO and its stack are allocated as one object; round to megablocks. */
70 size = round_to_mblocks(size);
71 tso = (StgTSO *)allocate(cap, size);
73 stack_size = size - TSO_STRUCT_SIZEW;
74 TICK_ALLOC_TSO(stack_size, 0);
76 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
78 // Always start with the compiled code evaluator
79 tso->what_next = ThreadRunGHC;
/* Fresh thread: not blocked, with empty blocking/exception queues. */
81 tso->why_blocked = NotBlocked;
82 tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
83 tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
84 tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
92 tso->stack_size = stack_size;
93 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
/* The stack grows downwards: sp starts one word past the top. */
95 tso->sp = (P_)&(tso->stack) + stack_size;
/* Profiling builds only, presumably — the surrounding #ifdef is elided. */
100 tso->prof.CCCS = CCS_MAIN;
103 /* put a stop frame on the stack */
104 tso->sp -= sizeofW(StgStopFrame);
105 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
106 tso->_link = END_TSO_QUEUE;
108 /* Link the new thread on the global thread list. */
110 ACQUIRE_LOCK(&sched_mutex);
111 tso->id = next_thread_id++; // while we have the mutex
112 tso->global_link = g0->threads;
114 RELEASE_LOCK(&sched_mutex);
116 // ToDo: report the stack size in the event?
117 traceEventCreateThread(cap, tso);
122 /* ---------------------------------------------------------------------------
123 * Comparing Thread ids.
125 * This is used from STG land in the implementation of the
126 * instances of Eq/Ord for ThreadIds.
127 * ------------------------------------------------------------------------ */
/* Three-way comparison of two TSOs by thread id, for Eq/Ord on ThreadId.
 * Returns -1 if id1 < id2, 1 if id1 > id2.
 * NOTE(review): the return-type line and the equal case (presumably
 * `return 0;`) are elided in this chunk — confirm against full source. */
130 cmp_thread(StgPtr tso1, StgPtr tso2)
132 StgThreadID id1 = ((StgTSO *)tso1)->id;
133 StgThreadID id2 = ((StgTSO *)tso2)->id;
135 if (id1 < id2) return (-1);
136 if (id1 > id2) return 1;
140 /* ---------------------------------------------------------------------------
141 * Fetching the ThreadID from an StgTSO.
143 * This is used in the implementation of Show for ThreadIds.
144 * ------------------------------------------------------------------------ */
/* Return the thread id of the given TSO (used by Show for ThreadIds). */
146 rts_getThreadId(StgPtr tso)
148 return ((StgTSO *)tso)->id;
151 /* -----------------------------------------------------------------------------
152 Remove a thread from a queue.
153 Fails fatally if the TSO is not on the queue.
154 -------------------------------------------------------------------------- */
/* Unlink `tso` from the singly-linked queue `*queue`.
 * barfs (fatal) if the TSO is not found on the queue. */
156 rtsBool // returns True if we modified queue
157 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
/* Walk the queue; `prev` trails `t` so we can splice around a match. */
162 for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
/* Mid-queue case: relink prev past t, with the TSO-link write barrier. */
165 setTSOLink(cap,prev,t->_link);
166 t->_link = END_TSO_QUEUE;
/* Head-of-queue case, presumably — the branch structure is elided here. */
170 t->_link = END_TSO_QUEUE;
/* Fell off the end: caller guaranteed the TSO was on this queue. */
175 barf("removeThreadFromQueue: not found");
/* Unlink `tso` from a double-ended queue described by `*head`/`*tail`,
 * fixing up the tail pointer if the removed element was last.
 * barfs (fatal) if the TSO is not found on the queue.
 * Fix: the barf message named the wrong function
 * ("removeThreadFromMVarQueue") — corrected to this function's name so
 * the fatal diagnostic points at the right code. */
178 rtsBool // returns True if we modified head or tail
179 removeThreadFromDeQueue (Capability *cap,
180 StgTSO **head, StgTSO **tail, StgTSO *tso)
183 rtsBool flag = rtsFalse;
/* Walk from the head; `prev` trails `t` so we can splice around a match. */
186 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
/* Mid-queue case: relink prev past t, with the TSO-link write barrier. */
189 setTSOLink(cap,prev,t->_link);
195 t->_link = END_TSO_QUEUE;
/* Removed the last element: the queue is now empty at the tail end. */
200 *tail = END_TSO_QUEUE;
/* Fell off the end: caller guaranteed the TSO was on this queue. */
208 barf("removeThreadFromDeQueue: not found");
211 /* ----------------------------------------------------------------------------
214 Attempt to wake up a thread. tryWakeupThread is idempotent: it is
215 always safe to call it too many times, but it is not safe in
216 general to omit a call.
218 ------------------------------------------------------------------------- */
/* Public entry point: resolve a possibly-relocated TSO via deRefTSO,
 * then delegate to tryWakeupThread_ for the real work.  Idempotent —
 * see the header comment above. */
221 tryWakeupThread (Capability *cap, StgTSO *tso)
223 tryWakeupThread_(cap, deRefTSO(tso));
/* Worker for tryWakeupThread: expects `tso` already dereferenced.
 * Either sends a MSG_TRY_WAKEUP to the TSO's owning capability, or (when
 * we own it) unblocks it locally based on why_blocked.
 * NOTE(review): several original lines are elided here (the embedded
 * numbers jump) — in particular the `tso->cap != cap` test and
 * THREADED_RTS guard that presumably surround the message send, and some
 * case labels of the switch.  Confirm against the full source. */
227 tryWakeupThread_ (Capability *cap, StgTSO *tso)
229 traceEventThreadWakeup (cap, tso, tso->cap->no);
/* TSO owned by another capability: don't touch it directly, send its
 * owner a wakeup message instead. */
235 msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
236 SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
238 sendMessage(cap, tso->cap, (Message*)msg);
239 debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
240 (lnat)tso->id, tso->cap->no);
/* We own the TSO: act according to what it is blocked on. */
245 switch (tso->why_blocked)
/* Presumably the BlockedOnMVar case (label elided): a TSO with no link
 * has already been removed from the MVar's queue by a concurrent take. */
249 if (tso->_link == END_TSO_QUEUE) {
250 tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
257 case BlockedOnMsgThrowTo:
259 const StgInfoTable *i;
/* Lock/unlock the message closure to synchronise with a concurrent
 * revocation; a revoked message has been overwritten with MSG_NULL. */
261 i = lockClosure(tso->block_info.closure);
262 unlockClosure(tso->block_info.closure, i);
263 if (i != &stg_MSG_NULL_info) {
264 debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
265 (lnat)tso->id, tso->block_info.throwto->header.info);
269 // remove the block frame from the stack
270 ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
275 case BlockedOnBlackHole:
277 case ThreadMigrating:
281 // otherwise, do nothing
/* Common unblock path (its label is elided in this chunk): */
286 // just run the thread now, if the BH is not really available,
287 // we'll block again.
288 tso->why_blocked = NotBlocked;
289 appendToRunQueue(cap,tso);
291 // We used to set the context switch flag here, which would
292 // trigger a context switch a short time in the future (at the end
293 // of the current nursery block). The idea is that we have just
294 // woken up a thread, so we may need to load-balance and migrate
295 // threads to other CPUs. On the other hand, setting the context
296 // switch flag here unfairly penalises the current thread by
297 // yielding its time slice too early.
299 // The synthetic benchmark nofib/smp/chan can be used to show the
300 // difference quite clearly.
302 // cap->context_switch = 1;
305 /* ----------------------------------------------------------------------------
307 ------------------------------------------------------------------------- */
/* Move a thread from capability `from` to capability `to` by marking it
 * ThreadMigrating and poking the target with a wakeup. */
310 migrateThread (Capability *from, StgTSO *tso, Capability *to)
312 traceEventMigrateThread (from, tso, to->no);
313 // ThreadMigrating tells the target cap that it needs to be added to
314 // the run queue when it receives the MSG_TRY_WAKEUP.
315 tso->why_blocked = ThreadMigrating;
/* NOTE(review): an elided line here presumably reassigns tso->cap to
 * `to` before the wakeup — confirm against the full source. */
317 tryWakeupThread(from, tso);
320 /* ----------------------------------------------------------------------------
323 wakes up all the threads on the specified queue.
324 ------------------------------------------------------------------------- */
/* Wake every thread attached to the BLOCKING_QUEUE `bq`, then retire the
 * queue by overwriting it with an indirection so the GC collects it. */
327 wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
329 MessageBlackHole *msg;
330 const StgInfoTable *i;
332 ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info ||
333 bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info );
/* Walk the chain of MessageBlackHoles hanging off the queue.
 * (The loop's increment expression is on an elided line.) */
335 for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE;
337 i = msg->header.info;
/* An IND header means the message was already revoked — skip it. */
338 if (i != &stg_IND_info) {
339 ASSERT(i == &stg_MSG_BLACKHOLE_info);
340 tryWakeupThread(cap,msg->tso);
344 // overwrite the BQ with an indirection so it will be
345 // collected at the next GC.
346 #if defined(DEBUG) && !defined(THREADED_RTS)
347 // XXX FILL_SLOP, but not if THREADED_RTS because in that case
348 // another thread might be looking at this BLOCKING_QUEUE and
349 // checking the owner field at the same time.
350 bq->bh = 0; bq->queue = 0; bq->owner = 0;
/* NOTE(review): the matching #endif is on an elided line. */
352 OVERWRITE_INFO(bq, &stg_IND_info);
355 // If we update a closure that we know we BLACKHOLE'd, and the closure
356 // no longer points to the current TSO as its owner, then there may be
357 // an orphaned BLOCKING_QUEUE closure with blocked threads attached to
358 // it. We therefore traverse the BLOCKING_QUEUEs attached to the
359 // current TSO to see if any can now be woken up.
/* Traverse the BLOCKING_QUEUEs owned by `tso` and wake any whose
 * black hole has since been updated — see the comment block above. */
361 checkBlockingQueues (Capability *cap, StgTSO *tso)
363 StgBlockingQueue *bq, *next;
366 debugTraceCap(DEBUG_sched, cap,
367 "collision occurred; checking blocking queues for thread %ld",
/* `next` is presumably loaded from bq->link on an elided line, since bq
 * may be overwritten with an IND inside the loop. */
370 for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
/* Already turned into an indirection: this queue was retired earlier. */
373 if (bq->header.info == &stg_IND_info) {
374 // ToDo: could short it out right here, to avoid
375 // traversing this IND multiple times.
/* If the closure is no longer a BLACKHOLE pointing at this queue, the
 * thunk was updated — the blocked threads can run now. */
381 if (p->header.info != &stg_BLACKHOLE_info ||
382 ((StgInd *)p)->indirectee != (StgClosure*)bq)
384 wakeBlockingQueue(cap,bq);
389 /* ----------------------------------------------------------------------------
392 Update a thunk with a value. In order to do this, we need to know
393 which TSO owns (or is evaluating) the thunk, in case we need to
394 awaken any threads that are blocked on it.
395 ------------------------------------------------------------------------- */
/* Update `thunk` with value `val`, waking any threads blocked on it.
 * The thunk's indirectee tells us who owns/was evaluating it: a TSO, a
 * BLOCKING_QUEUE, or something else entirely (a race with another
 * updater).  NOTE(review): returns and some branch structure fall on
 * elided lines — confirm control flow against the full source. */
398 updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
402 const StgInfoTable *i;
404 i = thunk->header.info;
/* Not any flavour of black hole: nobody can be blocked on it, so a
 * plain indirection update suffices. */
405 if (i != &stg_BLACKHOLE_info &&
406 i != &stg_CAF_BLACKHOLE_info &&
407 i != &__stg_EAGER_BLACKHOLE_info &&
408 i != &stg_WHITEHOLE_info) {
409 updateWithIndirection(cap, thunk, val);
/* It is a black hole: inspect what it points at before updating. */
413 v = ((StgInd*)thunk)->indirectee;
415 updateWithIndirection(cap, thunk, val);
/* Indirectee is a TSO: that thread claimed the thunk. */
418 if (i == &stg_TSO_info) {
419 owner = deRefTSO((StgTSO*)v);
/* Owned by another thread (or raced): scan our own queues instead. */
421 checkBlockingQueues(cap, tso);
/* Indirectee is neither TSO nor BLOCKING_QUEUE: we lost a race with
 * another updater; just check our queues. */
426 if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
427 i != &stg_BLOCKING_QUEUE_DIRTY_info) {
428 checkBlockingQueues(cap, tso);
432 owner = deRefTSO(((StgBlockingQueue*)v)->owner);
/* Queue owned by someone else: leave it; check our own queues. */
435 checkBlockingQueues(cap, tso);
/* Our queue: wake everything blocked on this thunk. */
437 wakeBlockingQueue(cap, (StgBlockingQueue*)v);
441 /* ---------------------------------------------------------------------------
442 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
443 * used by Control.Concurrent for error checking.
444 * ------------------------------------------------------------------------- */
/* Report whether this RTS build supports bound threads (THREADED_RTS).
 * NOTE(review): the HS_BOOL_TRUE branch and #else/#endif fall on elided
 * lines — only the non-threaded return is visible here. */
447 rtsSupportsBoundThreads(void)
449 #if defined(THREADED_RTS)
452 return HS_BOOL_FALSE;
456 /* ---------------------------------------------------------------------------
457 * isThreadBound(tso): check whether tso is bound to an OS thread.
458 * ------------------------------------------------------------------------- */
/* A thread is bound iff it has an associated bound-task record.
 * NOTE(review): the non-threaded branch and #endif are on elided lines. */
461 isThreadBound(StgTSO* tso USED_IF_THREADS)
463 #if defined(THREADED_RTS)
464 return (tso->bound != NULL);
469 /* ----------------------------------------------------------------------------
470 * Debugging: why is a thread blocked
471 * ------------------------------------------------------------------------- */
/* Debugging: print (via debugBelch) a description of why `tso` is
 * blocked.  Case labels for several branches fall on elided lines.
 * Fix: the barf format printed the `tso` pointer with %d, which is
 * undefined behavior and truncates on LP64 platforms — use %p with a
 * (void *) cast instead. */
475 printThreadBlockage(StgTSO *tso)
477 switch (tso->why_blocked) {
479 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
482 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
484 #if defined(mingw32_HOST_OS)
485 case BlockedOnDoProc:
486 debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
490 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
493 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
495 case BlockedOnBlackHole:
496 debugBelch("is blocked on a black hole %p",
497 ((StgBlockingQueue*)tso->block_info.bh->bh));
499 case BlockedOnMsgThrowTo:
500 debugBelch("is blocked on a throwto message");
503 debugBelch("is not blocked");
505 case ThreadMigrating:
506 debugBelch("is runnable, but not on the run queue");
509 debugBelch("is blocked on an external call");
511 case BlockedOnCCall_Interruptible:
512 debugBelch("is blocked on an external call (but may be interrupted)");
515 debugBelch("is blocked on an STM operation");
/* Unknown why_blocked value: fatal, with the offending TSO's address. */
518 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
519 tso->why_blocked, tso->id, (void *)tso);
/* Debugging: print a one-line status summary for thread `t` — its id,
 * address, optional label, what_next state / blockage, and dirty flags. */
525 printThreadStatus(StgTSO *t)
527 debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
529 void *label = lookupThreadLabel(t->id);
530 if (label) debugBelch("[\"%s\"] ",(char *)label);
/* A relocated TSO is just a forwarding pointer; nothing more to show. */
532 if (t->what_next == ThreadRelocated) {
533 debugBelch("has been relocated...\n");
535 switch (t->what_next) {
537 debugBelch("has been killed");
540 debugBelch("has completed");
/* Default/running case, presumably (label elided): describe blockage. */
543 printThreadStatus(t);
543 is unreachable — see note below
/* Debugging: dump the status of every thread — first each capability's
 * run queue, then all blocked threads found on the generation lists. */
555 printAllThreads(void)
561 debugBelch("all threads:\n");
563 for (i = 0; i < n_capabilities; i++) {
564 cap = &capabilities[i];
565 debugBelch("threads on capability %d:\n", cap->no);
566 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
567 printThreadStatus(t);
/* Threads not on a run queue live on the per-generation lists;
 * only print the ones that are actually blocked. */
571 debugBelch("other threads:\n");
572 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
573 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
574 if (t->why_blocked != NotBlocked) {
575 printThreadStatus(t);
/* A relocated TSO forwards its global_link; follow it.
 * NOTE(review): the relocated-case body is on elided lines. */
577 if (t->what_next == ThreadRelocated) {
580 next = t->global_link;
/* Debugging: print the status of every thread on the queue headed by
 * `t`, then a count.  `i` is presumably declared/incremented on elided
 * lines — confirm against the full source. */
588 printThreadQueue(StgTSO *t)
591 for (; t != END_TSO_QUEUE; t = t->_link) {
592 printThreadStatus(t);
595 debugBelch("%d threads on queue\n", i);