1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2006
5 * Thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
12 #include "Capability.h"
18 #include "ThreadLabels.h"
21 #include "sm/Storage.h"
23 /* Next thread ID to allocate.
/* NOTE(review): protected by sched_mutex — see the "while we have the
 * mutex" increment in createThread below. */
26 static StgThreadID next_thread_id = 1;
28 /* The smallest stack size that makes any sense is:
29 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
30 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
31 * + 1 (the closure to enter)
33 * + 1 (spare slot req'd by stg_ap_v_ret)
35 * A thread with this stack will bomb immediately with a stack
36 * overflow, which will increase its stack size.
/* The "+ 3" accounts for the closure-to-enter slot, the stg_ap_v_ret
 * spare slot, and one more word itemised in the elided comment lines. */
38 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
40 /* ---------------------------------------------------------------------------
43 The new thread starts with the given stack size. Before the
44 scheduler can run, however, this thread needs to have a closure
45 (and possibly some arguments) pushed on its stack. See
46 pushClosure() in Schedule.h.
48 createGenThread() and createIOThread() (in SchedAPI.h) are
49 convenient packaged versions of this function.
51 currently pri (priority) is only used in a GRAN setup -- HWL
52 ------------------------------------------------------------------------ */
/* Allocate and initialise a new thread (TSO) whose total size is at
 * least `size` words: clamp undersized requests, round up to
 * megablocks, allocate on this capability, fill in the TSO fields,
 * push a stop frame, and link the thread into the global thread list.
 * NOTE(review): this listing is gappy (original line numbers skip) —
 * the return type and the trailing `return tso;` are elided;
 * presumably the function returns the new StgTSO*.  TODO confirm
 * against the full source. */
54 createThread(Capability *cap, nat size)
59 /* sched_mutex is *not* required */
61 /* First check whether we should create a thread at all */
63 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
65 /* catch ridiculously small stack sizes */
/* Clamp to the minimum viable stack; a thread this small will bomb
 * with a stack overflow immediately and grow its stack (see the
 * MIN_STACK_WORDS comment above). */
66 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
67 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
70 size = round_to_mblocks(size);
71 tso = (StgTSO *)allocate(cap, size);
/* The stack is what remains of the allocation after the TSO header. */
73 stack_size = size - TSO_STRUCT_SIZEW;
74 TICK_ALLOC_TSO(stack_size, 0);
76 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
78 // Always start with the compiled code evaluator
79 tso->what_next = ThreadRunGHC;
/* A fresh thread is runnable, with empty block info, no blocked
 * exceptions, and no blocking queues. */
81 tso->why_blocked = NotBlocked;
82 tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
83 tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
84 tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
92 tso->stack_size = stack_size;
93 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
/* Stack grows downwards: sp starts just past the top of the stack. */
95 tso->sp = (P_)&(tso->stack) + stack_size;
100 tso->prof.CCCS = CCS_MAIN;
103 /* put a stop frame on the stack */
104 tso->sp -= sizeofW(StgStopFrame);
105 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
106 tso->_link = END_TSO_QUEUE;
/* sched_mutex protects both next_thread_id and g0->threads. */
108 /* Link the new thread on the global thread list.
110 ACQUIRE_LOCK(&sched_mutex);
111 tso->id = next_thread_id++; // while we have the mutex
112 tso->global_link = g0->threads;
114 RELEASE_LOCK(&sched_mutex);
116 // ToDo: report the stack size in the event?
117 traceEventCreateThread(cap, tso);
122 /* ---------------------------------------------------------------------------
123 * Comparing Thread ids.
125 * This is used from STG land in the implementation of the
126 * instances of Eq/Ord for ThreadIds.
127 * ------------------------------------------------------------------------ */
/* Three-way comparison of two thread ids; called from STG land to
 * implement the Eq/Ord instances for ThreadId (see comment above).
 * NOTE(review): gappy listing — the return type, braces, and the
 * equal case (presumably `return 0;`) are elided. */
130 cmp_thread(StgPtr tso1, StgPtr tso2)
132 StgThreadID id1 = ((StgTSO *)tso1)->id;
133 StgThreadID id2 = ((StgTSO *)tso2)->id;
135 if (id1 < id2) return (-1);
136 if (id1 > id2) return 1;
140 /* ---------------------------------------------------------------------------
141 * Fetching the ThreadID from an StgTSO.
143 * This is used in the implementation of Show for ThreadIds.
144 * ------------------------------------------------------------------------ */
/* Return the ThreadID stored in the given TSO; used by the Show
 * instance for ThreadId (see comment above). */
146 rts_getThreadId(StgPtr tso)
148 return ((StgTSO *)tso)->id;
151 /* -----------------------------------------------------------------------------
152 Remove a thread from a queue.
153 Fails fatally if the TSO is not on the queue.
154 -------------------------------------------------------------------------- */
/* Unlink `tso` from the singly-linked queue rooted at *queue; fatal
 * (barf) if it is not present.
 * NOTE(review): gappy listing — the match test, the head-of-queue
 * update, and the return statements are elided; per the comment below,
 * the result records whether *queue itself was modified. */
156 rtsBool // returns True if we modified queue
157 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
/* Walk with a trailing `prev` pointer so the found node can be
 * spliced out via setTSOLink (which also dirties prev for the GC). */
162 for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
165 setTSOLink(cap,prev,t->_link);
173 barf("removeThreadFromQueue: not found");
/* Unlink `tso` from a double-ended queue described by *head / *tail;
 * fatal if it is not present.  NOTE(review): gappy listing — the
 * match test, head update, tail comparison, and returns are elided. */
176 rtsBool // returns True if we modified head or tail
177 removeThreadFromDeQueue (Capability *cap,
178 StgTSO **head, StgTSO **tail, StgTSO *tso)
/* `flag` records whether *head was updated, so the caller knows its
 * queue pointers changed. */
181 rtsBool flag = rtsFalse;
184 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
187 setTSOLink(cap,prev,t->_link);
/* If the removed node was the last element, reset *tail to empty. */
197 *tail = END_TSO_QUEUE;
/* NOTE(review): the message says "removeThreadFromMVarQueue" but this
 * function is removeThreadFromDeQueue — stale string worth fixing
 * upstream (left unchanged here; it is runtime-visible text). */
205 barf("removeThreadFromMVarQueue: not found");
208 /* ----------------------------------------------------------------------------
211 Attempt to wake up a thread. tryWakeupThread is idempotent: it is
212 always safe to call it too many times, but it is not safe in
213 general to omit a call.
215 ------------------------------------------------------------------------- */
/* Attempt to wake a blocked thread.  Idempotent (see comment above):
 * safe to call repeatedly.  NOTE(review): gappy listing — the guard
 * separating the remote-capability path from the local path, several
 * case labels, and the unlock/return statements are elided. */
218 tryWakeupThread (Capability *cap, StgTSO *tso_)
/* deRefTSO: presumably follows ThreadRelocated indirections to the
 * current TSO — TODO confirm against full source. */
220 StgTSO *tso = deRefTSO(tso_);
222 traceEventThreadWakeup (cap, tso, tso->cap->no);
/* Remote path (guard elided, presumably tso->cap != cap): allocate a
 * MSG_TRY_WAKEUP message and send it to the owning capability rather
 * than touching the foreign TSO directly. */
228 msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
229 SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
231 sendMessage(cap, tso->cap, (Message*)msg);
232 debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
233 (lnat)tso->id, tso->cap->no);
/* Local path: dispatch on the blocking reason. */
238 switch (tso->why_blocked)
/* (case label elided) only proceed if the TSO is no longer linked on
 * some queue; then clear its block info. */
242 if (tso->_link == END_TSO_QUEUE) {
243 tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
250 case BlockedOnMsgThrowTo:
252 const StgInfoTable *i;
/* Lock/unlock the throwto message to synchronise with a concurrent
 * thrower; if it has not been nulled out (MSG_NULL), the thread is
 * genuinely still blocked and must not be woken. */
254 i = lockClosure(tso->block_info.closure);
255 unlockClosure(tso->block_info.closure, i);
256 if (i != &stg_MSG_NULL_info) {
257 debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
258 (lnat)tso->id, tso->block_info.throwto->header.info);
262 // remove the block frame from the stack
263 ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
268 case BlockedOnBlackHole:
270 case ThreadMigrating:
274 // otherwise, do nothing
279 // just run the thread now, if the BH is not really available,
280 // we'll block again.
/* Common wakeup tail (label elided): mark runnable and enqueue on
 * this capability's run queue. */
281 tso->why_blocked = NotBlocked;
282 appendToRunQueue(cap,tso);
285 /* ----------------------------------------------------------------------------
287 ------------------------------------------------------------------------- */
/* Move a thread from capability `from` to `to`: trace the event, mark
 * the TSO ThreadMigrating, and fire tryWakeupThread, which will send
 * a MSG_TRY_WAKEUP to the target capability.
 * NOTE(review): gappy listing — the assignment pointing the TSO at
 * the destination capability is presumably elided; TODO confirm. */
290 migrateThread (Capability *from, StgTSO *tso, Capability *to)
292 traceEventMigrateThread (from, tso, to->no);
293 // ThreadMigrating tells the target cap that it needs to be added to
294 // the run queue when it receives the MSG_TRY_WAKEUP.
295 tso->why_blocked = ThreadMigrating;
297 tryWakeupThread(from, tso);
300 /* ----------------------------------------------------------------------------
303 wakes up all the threads on the specified queue.
304 ------------------------------------------------------------------------- */
/* Wake every thread queued on blocking queue `bq`, then overwrite the
 * BQ itself with an indirection so the next GC collects it.
 * NOTE(review): gappy listing — the loop-advance expression, closing
 * braces, and the #endif matching the #if below are elided. */
307 wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
309 MessageBlackHole *msg;
310 const StgInfoTable *i;
312 ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info ||
313 bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info );
/* Walk the message list; entries already turned into INDs are
 * revoked messages and are skipped. */
315 for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE;
317 i = msg->header.info;
318 if (i != &stg_IND_info) {
319 ASSERT(i == &stg_MSG_BLACKHOLE_info);
320 tryWakeupThread(cap,msg->tso);
324 // overwrite the BQ with an indirection so it will be
325 // collected at the next GC.
326 #if defined(DEBUG) && !defined(THREADED_RTS)
327 // XXX FILL_SLOP, but not if THREADED_RTS because in that case
328 // another thread might be looking at this BLOCKING_QUEUE and
329 // checking the owner field at the same time.
330 bq->bh = 0; bq->queue = 0; bq->owner = 0;
332 OVERWRITE_INFO(bq, &stg_IND_info);
335 // If we update a closure that we know we BLACKHOLE'd, and the closure
336 // no longer points to the current TSO as its owner, then there may be
337 // an orphaned BLOCKING_QUEUE closure with blocked threads attached to
338 // it. We therefore traverse the BLOCKING_QUEUEs attached to the
339 // current TSO to see if any can now be woken up.
/* Traverse the BLOCKING_QUEUEs attached to `tso` (see comment above)
 * and wake any whose blackhole has since been updated.
 * NOTE(review): gappy listing — the `next = bq->link`-style advance,
 * the binding of `p` (the blackhole closure), and closing braces are
 * elided. */
341 checkBlockingQueues (Capability *cap, StgTSO *tso)
343 StgBlockingQueue *bq, *next;
346 debugTraceCap(DEBUG_sched, cap,
347 "collision occurred; checking blocking queues for thread %ld",
/* Walk the TSO's list of owned blocking queues. */
350 for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
/* An IND here means this queue was already woken and overwritten. */
353 if (bq->header.info == &stg_IND_info) {
354 // ToDo: could short it out right here, to avoid
355 // traversing this IND multiple times.
/* If the closure is no longer a BLACKHOLE pointing at this queue, its
 * value has arrived: wake everyone blocked on it. */
361 if (p->header.info != &stg_BLACKHOLE_info ||
362 ((StgInd *)p)->indirectee != (StgClosure*)bq)
364 wakeBlockingQueue(cap,bq);
369 /* ----------------------------------------------------------------------------
372 Update a thunk with a value. In order to do this, we need to know
373 which TSO owns (or is evaluating) the thunk, in case we need to
374 awaken any threads that are blocked on it.
375 ------------------------------------------------------------------------- */
/* Update `thunk` with value `val` on behalf of `tso` (see comment
 * above).  NOTE(review): gappy listing — declarations of `v` and
 * `owner`, several guards/returns, and closing braces are elided. */
378 updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
382 const StgInfoTable *i;
384 i = thunk->header.info;
/* Fast path: the thunk was never blackholed, so no thread can be
 * blocked on it — a plain indirection update suffices. */
385 if (i != &stg_BLACKHOLE_info &&
386 i != &stg_CAF_BLACKHOLE_info &&
387 i != &stg_WHITEHOLE_info) {
388 updateWithIndirection(cap, thunk, val);
/* Blackholed: grab the indirectee (owner info) before updating. */
392 v = ((StgInd*)thunk)->indirectee;
394 updateWithIndirection(cap, thunk, val);
/* Owner recorded directly as a TSO: if it is not us (guard elided),
 * we raced with another thread — scan our own blocking queues for
 * orphaned queues that can now be woken. */
397 if (i == &stg_TSO_info) {
398 owner = deRefTSO((StgTSO*)v);
400 checkBlockingQueues(cap, tso);
/* Indirectee is neither TSO nor BLOCKING_QUEUE: nothing to wake
 * directly, but still check our queues for orphans. */
405 if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
406 i != &stg_BLOCKING_QUEUE_DIRTY_info) {
407 checkBlockingQueues(cap, tso);
/* BLOCKING_QUEUE owned by another thread → check our own queues;
 * owned by us → wake the threads blocked on it. */
411 owner = deRefTSO(((StgBlockingQueue*)v)->owner);
414 checkBlockingQueues(cap, tso);
416 wakeBlockingQueue(cap, (StgBlockingQueue*)v);
420 /* ---------------------------------------------------------------------------
421 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
422 * used by Control.Concurrent for error checking.
423 * ------------------------------------------------------------------------- */
/* Report whether this RTS build supports bound threads (see comment
 * above; used by Control.Concurrent for error checking).
 * NOTE(review): the THREADED_RTS branch (presumably `return
 * HS_BOOL_TRUE;`) and the #else/#endif are elided in this listing. */
426 rtsSupportsBoundThreads(void)
428 #if defined(THREADED_RTS)
431 return HS_BOOL_FALSE;
435 /* ---------------------------------------------------------------------------
436 * isThreadBound(tso): check whether tso is bound to an OS thread.
437 * ------------------------------------------------------------------------- */
/* Check whether `tso` is bound to an OS thread (see comment above).
 * NOTE(review): the #else branch for the non-threaded RTS (presumably
 * returning false, since `bound` only exists when THREADED) and the
 * #endif are elided in this listing. */
440 isThreadBound(StgTSO* tso USED_IF_THREADS)
442 #if defined(THREADED_RTS)
443 return (tso->bound != NULL);
448 /* ----------------------------------------------------------------------------
449 * Debugging: why is a thread blocked
450 * ------------------------------------------------------------------------- */
/* Debug helper: print a human-readable description of why `tso` is
 * blocked.  NOTE(review): several case labels (read/write fd, delay,
 * MVar, NotBlocked, foreign call, STM, default) and the break
 * statements are elided in this gappy listing. */
454 printThreadBlockage(StgTSO *tso)
456 switch (tso->why_blocked) {
458 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
461 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
463 #if defined(mingw32_HOST_OS)
464 case BlockedOnDoProc:
465 debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
469 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
472 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
474 case BlockedOnBlackHole:
475 debugBelch("is blocked on a black hole %p",
476 ((StgBlockingQueue*)tso->block_info.bh->bh));
478 case BlockedOnMsgThrowTo:
479 debugBelch("is blocked on a throwto message");
482 debugBelch("is not blocked");
484 case ThreadMigrating:
485 debugBelch("is runnable, but not on the run queue");
488 debugBelch("is blocked on an external call");
490 case BlockedOnCCall_NoUnblockExc:
491 debugBelch("is blocked on an external call (exceptions were already blocked)");
494 debugBelch("is blocked on an STM operation");
/* NOTE(review): format/argument mismatch — the last `%d` is given the
 * TSO pointer `tso`; should be %p (and tso->id's width checked)
 * upstream.  Left as-is: runtime-visible string, and the block is too
 * gutted here to rewrite safely. */
497 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
498 tso->why_blocked, tso->id, tso);
/* Debug helper: print one thread's id, label, and what_next /
 * blockage status, plus its dirty flags.
 * NOTE(review): gappy listing — case labels for the what_next switch,
 * the TSO_DIRTY guard, and closing braces/newline output are elided. */
504 printThreadStatus(StgTSO *t)
506 debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
/* Show the user-supplied thread label, if one was registered. */
508 void *label = lookupThreadLabel(t->id);
509 if (label) debugBelch("[\"%s\"] ",(char *)label);
/* A relocated TSO is just a forwarding stub; don't inspect further. */
511 if (t->what_next == ThreadRelocated) {
512 debugBelch("has been relocated...\n");
514 switch (t->what_next) {
516 debugBelch("has been killed");
519 debugBelch("has completed");
/* Default: still alive — describe the blocking reason instead. */
522 printThreadBlockage(t);
525 debugBelch(" (TSO_DIRTY)");
526 } else if (t->flags & TSO_LINK_DIRTY) {
527 debugBelch(" (TSO_LINK_DIRTY)");
/* Debug helper: dump every thread — first each capability's run
 * queue, then blocked/other threads found per generation.
 * NOTE(review): gappy listing — local declarations (i, g, cap, t,
 * next), closing braces, and part of the ThreadRelocated branch are
 * elided. */
534 printAllThreads(void)
540 debugBelch("all threads:\n");
542 for (i = 0; i < n_capabilities; i++) {
543 cap = &capabilities[i];
544 debugBelch("threads on capability %d:\n", cap->no);
545 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
546 printThreadStatus(t);
/* Threads not on any run queue: walk each generation's thread list,
 * printing only the ones that are actually blocked. */
550 debugBelch("other threads:\n");
551 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
552 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
553 if (t->why_blocked != NotBlocked) {
554 printThreadStatus(t);
/* ThreadRelocated handling elided here — presumably `next` follows
 * t->_link in that branch; TODO confirm against full source. */
556 if (t->what_next == ThreadRelocated) {
559 next = t->global_link;
/* Debug helper: print the status of every TSO on a linked queue,
 * followed by a count.  NOTE(review): the declaration and increment
 * of the counter `i` are elided in this gappy listing. */
567 printThreadQueue(StgTSO *t)
570 for (; t != END_TSO_QUEUE; t = t->_link) {
571 printThreadStatus(t);
574 debugBelch("%d threads on queue\n", i);