1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2006
5 * Thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
12 #include "Capability.h"
18 #include "ThreadLabels.h"
21 #include "sm/Storage.h"
23 /* Next thread ID to allocate.
// NOTE(review): read-modify-written under sched_mutex in createThread
// ("while we have the mutex"), so no atomic is needed here.
26 static StgThreadID next_thread_id = 1;
28 /* The smallest stack size that makes any sense is:
29 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
30 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
31 * + 1 (the closure to enter)
33 * + 1 (spare slot req'd by stg_ap_v_ret)
35 * A thread with this stack will bomb immediately with a stack
36 * overflow, which will increase its stack size.
/* NOTE(review): the "+ 3" below should equal the sum of the "+ 1" items
 * enumerated above; this excerpt elides at least one of those lines, so
 * confirm the count against the full comment in the original file. */
38 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
40 /* ---------------------------------------------------------------------------
43    The new thread starts with the given stack size. Before the
44    scheduler can run, however, this thread needs to have a closure
45    (and possibly some arguments) pushed on its stack. See
46    pushClosure() in Schedule.h.
48    createGenThread() and createIOThread() (in SchedAPI.h) are
49    convenient packaged versions of this function.
51    currently pri (priority) is only used in a GRAN setup -- HWL
52    ------------------------------------------------------------------------ */
// Allocates a fresh TSO of (at least) `size` words on capability `cap`,
// initialises its fields, pushes a stop frame, and links it onto the
// global thread list.  (The return type/statement and several
// intermediate lines are elided from this excerpt.)
54 createThread(Capability *cap, nat size)
59 /* sched_mutex is *not* required */
61 /* First check whether we should create a thread at all */
63 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
65 /* catch ridiculously small stack sizes */
66 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
67 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
// NOTE(review): round_to_mblocks presumably rounds the allocation up to
// a whole number of megablocks -- confirm; the branch structure around
// this call is elided here.
70 size = round_to_mblocks(size);
71 tso = (StgTSO *)allocate(cap, size);
// The usable stack is everything after the TSO struct itself.
73 stack_size = size - TSO_STRUCT_SIZEW;
74 TICK_ALLOC_TSO(stack_size, 0);
76 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
78 // Always start with the compiled code evaluator
79 tso->what_next = ThreadRunGHC;
// A new thread starts runnable, with empty blocking/queue fields.
81 tso->why_blocked = NotBlocked;
82 tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
83 tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
84 tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
92 tso->stack_size = stack_size;
// NOTE(review): no terminating ';' on this line -- the statement
// continues on a line elided from this excerpt.
93 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
// Stack grows downwards: sp starts one-past-the-end of the stack area.
95 tso->sp = (P_)&(tso->stack) + stack_size;
// Presumably only compiled under PROFILING -- the guard is elided here.
100 tso->prof.CCCS = CCS_MAIN;
103 /* put a stop frame on the stack */
104 tso->sp -= sizeofW(StgStopFrame);
105 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
106 tso->_link = END_TSO_QUEUE;
108 /* Link the new thread on the global thread list.
// sched_mutex serialises thread-id allocation and the splice onto
// g0->threads (the store back into g0->threads is on an elided line).
110 ACQUIRE_LOCK(&sched_mutex);
111 tso->id = next_thread_id++; // while we have the mutex
112 tso->global_link = g0->threads;
114 RELEASE_LOCK(&sched_mutex);
116 // ToDo: report the stack size in the event?
117 traceEventCreateThread(cap, tso);
122 /* ---------------------------------------------------------------------------
123  * Comparing Thread ids.
125  * This is used from STG land in the implementation of the
126  * instances of Eq/Ord for ThreadIds.
127  * ------------------------------------------------------------------------ */
// Three-way comparison of two TSOs by thread id.
130 cmp_thread(StgPtr tso1, StgPtr tso2)
132 StgThreadID id1 = ((StgTSO *)tso1)->id;
133 StgThreadID id2 = ((StgTSO *)tso2)->id;
135 if (id1 < id2) return (-1);
136 if (id1 > id2) return 1;
// NOTE(review): the equal case (return 0) falls on a line elided from
// this excerpt -- confirm against the full source.
140 /* ---------------------------------------------------------------------------
141  * Fetching the ThreadID from an StgTSO.
143  * This is used in the implementation of Show for ThreadIds.
144  * ------------------------------------------------------------------------ */
146 rts_getThreadId(StgPtr tso)
// Simple accessor: returns the id assigned in createThread.
148 return ((StgTSO *)tso)->id;
151 /* -----------------------------------------------------------------------------
152    Remove a thread from a queue.
153    Fails fatally if the TSO is not on the queue.
154    -------------------------------------------------------------------------- */
156 rtsBool // returns True if we modified queue
157 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
// Classic prev/cur walk of the singly-linked _link chain.
162 for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
// Unlink an interior node.  setTSOLink (rather than a raw store) is
// presumably used so the mutated TSO is marked dirty for the GC write
// barrier -- TODO confirm against the Storage API.
165 setTSOLink(cap,prev,t->_link);
// Falling off the end of the queue is a fatal RTS invariant violation.
173 barf("removeThreadFromQueue: not found");
176 rtsBool // returns True if we modified head or tail
177 removeThreadFromDeQueue (Capability *cap,
178 StgTSO **head, StgTSO **tail, StgTSO *tso)
// Like removeThreadFromQueue, but for a queue with explicit head and
// tail pointers; `flag` records whether *head was modified.
181 rtsBool flag = rtsFalse;
184 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
187 setTSOLink(cap,prev,t->_link);
// If the removed TSO was the only element, the tail becomes empty too.
197 *tail = END_TSO_QUEUE;
// NOTE(review): the message says "removeThreadFromMVarQueue" but this
// function is removeThreadFromDeQueue -- looks like a stale copy/paste.
// Fixing the string is a behavioural change, so it is only flagged here.
205 barf("removeThreadFromMVarQueue: not found");
208 /* ----------------------------------------------------------------------------
211    Attempt to wake up a thread. tryWakeupThread is idempotent: it is
212    always safe to call it too many times, but it is not safe in
213    general to omit a call.
215    ------------------------------------------------------------------------- */
218 tryWakeupThread (Capability *cap, StgTSO *tso)
221 traceEventThreadWakeup (cap, tso, tso->cap->no);
// Cross-capability case: the TSO belongs to another capability, so we
// must not touch it directly; send it a MSG_TRY_WAKEUP instead.
// (The `tso->cap != cap` guard appears to be on elided lines -- confirm.)
227 msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
228 SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
230 sendMessage(cap, tso->cap, (Message*)msg);
231 debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
232 (lnat)tso->id, tso->cap->no);
// Local case: decide per blocking reason whether the thread can be made
// runnable now (several case labels are elided from this excerpt).
237 switch (tso->why_blocked)
// Only safe to requeue once the TSO is no longer linked on the
// blocking queue (its _link has been reset to END_TSO_QUEUE).
241 if (tso->_link == END_TSO_QUEUE) {
242 tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
249 case BlockedOnMsgThrowTo:
251 const StgInfoTable *i;
// Lock/unlock purely to read a consistent header: if the throwto
// message has not been overwritten with stg_MSG_NULL_info, the thread
// is still genuinely blocked and must not be woken.
253 i = lockClosure(tso->block_info.closure);
254 unlockClosure(tso->block_info.closure, i);
255 if (i != &stg_MSG_NULL_info) {
256 debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
257 (lnat)tso->id, tso->block_info.throwto->header.info);
261 // remove the block frame from the stack
262 ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
267 case BlockedOnBlackHole:
269 case ThreadMigrating:
273 // otherwise, do nothing
// Unblock path (the label/early-returns are elided): mark the thread
// runnable and put it on this capability's run queue.
278 // just run the thread now, if the BH is not really available,
279 // we'll block again.
280 tso->why_blocked = NotBlocked;
281 appendToRunQueue(cap,tso);
284 /* ----------------------------------------------------------------------------
286    ------------------------------------------------------------------------- */
// Move tso from capability `from` to capability `to`: mark it
// ThreadMigrating, then send a wakeup so the target adds it to its run
// queue on receipt of the MSG_TRY_WAKEUP.
289 migrateThread (Capability *from, StgTSO *tso, Capability *to)
291 traceEventMigrateThread (from, tso, to->no);
292 // ThreadMigrating tells the target cap that it needs to be added to
293 // the run queue when it receives the MSG_TRY_WAKEUP.
294 tso->what_next = ThreadMigrating;
// NOTE(review): the assignment tso->cap = to is not visible in this
// excerpt -- presumably it happens on an elided line before this call.
296 tryWakeupThread(from, tso);
299 /* ----------------------------------------------------------------------------
302    wakes up all the threads on the specified queue.
303    ------------------------------------------------------------------------- */
306 wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
308 MessageBlackHole *msg;
309 const StgInfoTable *i;
// Sanity: this must really be a BLOCKING_QUEUE (clean or dirty).
311 ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info ||
312 bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info );
314 for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE;
// A message whose header is stg_IND_info has already been revoked or
// handled, so only live MSG_BLACKHOLEs wake their thread -- TODO
// confirm the IND convention against the message-handling code.
316 i = msg->header.info;
317 if (i != &stg_IND_info) {
318 ASSERT(i == &stg_MSG_BLACKHOLE_info);
319 tryWakeupThread(cap,msg->tso);
323 // overwrite the BQ with an indirection so it will be
324 // collected at the next GC.
325 #if defined(DEBUG) && !defined(THREADED_RTS)
326 // XXX FILL_SLOP, but not if THREADED_RTS because in that case
327 // another thread might be looking at this BLOCKING_QUEUE and
328 // checking the owner field at the same time.
329 bq->bh = 0; bq->queue = 0; bq->owner = 0;
// (The matching #endif is on a line elided from this excerpt.)
331 OVERWRITE_INFO(bq, &stg_IND_info);
334 // If we update a closure that we know we BLACKHOLE'd, and the closure
335 // no longer points to the current TSO as its owner, then there may be
336 // an orphaned BLOCKING_QUEUE closure with blocked threads attached to
337 // it. We therefore traverse the BLOCKING_QUEUEs attached to the
338 // current TSO to see if any can now be woken up.
340 checkBlockingQueues (Capability *cap, StgTSO *tso)
342 StgBlockingQueue *bq, *next;
345 debugTraceCap(DEBUG_sched, cap,
346 "collision occurred; checking blocking queues for thread %ld",
// Walk the chain of BLOCKING_QUEUEs hanging off this TSO (`next` is
// read on an elided line, presumably bq->link).
349 for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
// An IND header means this BQ was already dismantled; skip it.
352 if (bq->header.info == &stg_IND_info) {
353 // ToDo: could short it out right here, to avoid
354 // traversing this IND multiple times.
// NOTE(review): `p` is bound on an elided line -- presumably
// p = bq->bh, the blackhole this queue hangs off.  If that closure is
// no longer a BLACKHOLE pointing back at this BQ, its value has
// arrived, so every thread on the queue can be woken.
360 if (p->header.info != &stg_BLACKHOLE_info ||
361 ((StgInd *)p)->indirectee != (StgClosure*)bq)
363 wakeBlockingQueue(cap,bq);
368 /* ----------------------------------------------------------------------------
371    Update a thunk with a value. In order to do this, we need to know
372    which TSO owns (or is evaluating) the thunk, in case we need to
373    awaken any threads that are blocked on it.
374    ------------------------------------------------------------------------- */
377 updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
381 const StgInfoTable *i;
383 i = thunk->header.info;
// Fast path: the thunk was never blackholed, so no thread can be
// blocked on it -- just overwrite it with an indirection to val.
384 if (i != &stg_BLACKHOLE_info &&
385 i != &stg_CAF_BLACKHOLE_info &&
386 i != &stg_WHITEHOLE_info) {
387 updateWithIndirection(cap, thunk, val);
// The thunk is a blackhole; v is its payload: either the owning TSO
// directly, or a BLOCKING_QUEUE of waiters.
391 v = ((StgInd*)thunk)->indirectee;
393 updateWithIndirection(cap, thunk, val);
// NOTE(review): `i` is presumably re-read from v's header on an elided
// line before these tests -- confirm against the full source.
396 if (i == &stg_TSO_info) {
// Blackhole owned by a bare TSO (no waiters queued).  If the owner is
// not the updating thread, another thread claimed this thunk first (a
// collision), so scan our own blocking queues for orphans.
397 owner = deRefTSO((StgTSO*)v);
399 checkBlockingQueues(cap, tso);
// Payload is neither a TSO nor a BLOCKING_QUEUE: treat as a collision.
404 if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
405 i != &stg_BLOCKING_QUEUE_DIRTY_info) {
406 checkBlockingQueues(cap, tso);
// BLOCKING_QUEUE case: wake its threads if we are the owner,
// otherwise handle the ownership collision as above.
410 owner = deRefTSO(((StgBlockingQueue*)v)->owner);
413 checkBlockingQueues(cap, tso);
415 wakeBlockingQueue(cap, (StgBlockingQueue*)v);
419 /* ---------------------------------------------------------------------------
420  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
421  * used by Control.Concurrent for error checking.
422  * ------------------------------------------------------------------------- */
425 rtsSupportsBoundThreads(void)
427 #if defined(THREADED_RTS)
// Non-threaded fallback: bound threads unsupported.  (The THREADED_RTS
// branch and the #else/#endif are on lines elided from this excerpt.)
430 return HS_BOOL_FALSE;
434 /* ---------------------------------------------------------------------------
435  * isThreadBound(tso): check whether tso is bound to an OS thread.
436  * ------------------------------------------------------------------------- */
439 isThreadBound(StgTSO* tso USED_IF_THREADS)
441 #if defined(THREADED_RTS)
// A bound TSO carries a non-NULL `bound` pointer.  The non-threaded
// branch (presumably always false) is elided from this excerpt.
442 return (tso->bound != NULL);
447 /* ----------------------------------------------------------------------------
448  * Debugging: why is a thread blocked
449  * ------------------------------------------------------------------------- */
// Prints a human-readable description of tso->why_blocked.  Several
// `case` labels and `break`s fall on lines elided from this excerpt.
453 printThreadBlockage(StgTSO *tso)
455 switch (tso->why_blocked) {
457 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
460 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
462 #if defined(mingw32_HOST_OS)
463 case BlockedOnDoProc:
464 debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
468 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
471 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
473 case BlockedOnBlackHole:
474 debugBelch("is blocked on a black hole %p",
475 ((StgBlockingQueue*)tso->block_info.bh->bh));
477 case BlockedOnMsgThrowTo:
478 debugBelch("is blocked on a throwto message");
481 debugBelch("is not blocked");
483 case ThreadMigrating:
484 debugBelch("is runnable, but not on the run queue");
487 debugBelch("is blocked on an external call");
489 case BlockedOnCCall_NoUnblockExc:
490 debugBelch("is blocked on an external call (exceptions were already blocked)");
493 debugBelch("is blocked on an STM operation");
// NOTE(review): the final %d receives the pointer `tso` -- a format/
// argument type mismatch (undefined behaviour on LP64).  Should be %p;
// fixing the format string is a behavioural change, so only flagged.
496 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
497 tso->why_blocked, tso->id, tso);
// Debug dump of one thread: id, address, optional label, and status.
503 printThreadStatus(StgTSO *t)
505 debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
507 void *label = lookupThreadLabel(t->id);
508 if (label) debugBelch("[\"%s\"] ",(char *)label);
// A relocated TSO is just a forwarding stub; don't inspect it further.
510 if (t->what_next == ThreadRelocated) {
511 debugBelch("has been relocated...\n");
513 switch (t->what_next) {
515 debugBelch("has been killed");
518 debugBelch("has completed");
// Default case (label elided): delegate the why-blocked explanation.
521 printThreadBlockage(t);
// Append GC dirtiness flags, if set.
524 debugBelch(" (TSO_DIRTY)");
525 } else if (t->flags & TSO_LINK_DIRTY) {
526 debugBelch(" (TSO_LINK_DIRTY)");
// Debug dump of all threads: first each capability's run queue, then
// blocked threads found on the per-generation thread lists.
533 printAllThreads(void)
539 debugBelch("all threads:\n");
541 for (i = 0; i < n_capabilities; i++) {
542 cap = &capabilities[i];
543 debugBelch("threads on capability %d:\n", cap->no);
544 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
545 printThreadStatus(t);
549 debugBelch("other threads:\n");
550 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
551 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
// Only blocked threads are interesting here; runnable ones were
// already printed from the run queues above.
552 if (t->why_blocked != NotBlocked) {
553 printThreadStatus(t);
// NOTE(review): the ThreadRelocated branch body is elided in this
// excerpt; in the normal case the walk advances via global_link.
555 if (t->what_next == ThreadRelocated) {
558 next = t->global_link;
// Debug dump of a single TSO queue, printing each element and a count.
// (The declaration/increment of `i`, the return type, and the closing
// brace fall outside or on elided lines of this excerpt.)
566 printThreadQueue(StgTSO *t)
569 for (; t != END_TSO_QUEUE; t = t->_link) {
570 printThreadStatus(t);
573 debugBelch("%d threads on queue\n", i);