1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2006
5 * Thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
12 #include "Capability.h"
18 #include "ThreadLabels.h"
21 #include "sm/Storage.h"
23 /* Next thread ID to allocate.
 * Incremented in createThread() while sched_mutex is held (see the
 * "while we have the mutex" comment there). */
26 static StgThreadID next_thread_id = 1;
28 /* The smallest stack size that makes any sense is:
29 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
30 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
31 * + 1 (the closure to enter)
33 * + 1 (spare slot req'd by stg_ap_v_ret)
35 * A thread with this stack will bomb immediately with a stack
36 * overflow, which will increase its stack size.
 *
 * NOTE(review): the macro adds 3 words but only two "+1" items are
 * visible here; one list entry appears to be elided in this extract.
 */
38 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
40 /* ---------------------------------------------------------------------------
43 The new thread starts with the given stack size. Before the
44 scheduler can run, however, this thread needs to have a closure
45 (and possibly some arguments) pushed on its stack. See
46 pushClosure() in Schedule.h.
48 createGenThread() and createIOThread() (in SchedAPI.h) are
49 convenient packaged versions of this function.
51 currently pri (priority) is only used in a GRAN setup -- HWL
52 ------------------------------------------------------------------------ */
/*
 * createThread: allocate and initialise a fresh TSO on `cap`, give it a
 * stop frame and a unique id, and link it onto the global thread list
 * (g0->threads).  `size` is the requested total size in words (TSO
 * header + stack) and is clamped to a sane minimum before allocation.
 * NOTE(review): several lines of this function (return type, local
 * declarations, some field initialisation, the final return) are elided
 * in this extract -- consult the full source before editing.
 */
54 createThread(Capability *cap, nat size)
59 /* sched_mutex is *not* required */
61 /* First check whether we should create a thread at all */
63 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
65 /* catch ridiculously small stack sizes */
66 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
67 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
/* presumably rounds the request up to a whole number of mblocks --
 * confirm against round_to_mblocks() in the storage manager */
70 size = round_to_mblocks(size);
71 tso = (StgTSO *)allocate(cap, size);
73 stack_size = size - TSO_STRUCT_SIZEW;
74 TICK_ALLOC_TSO(stack_size, 0);
76 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
78 // Always start with the compiled code evaluator
79 tso->what_next = ThreadRunGHC;
/* Not blocked on anything yet; the blocking/queue fields start empty. */
81 tso->why_blocked = NotBlocked;
82 tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
83 tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
84 tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
92 tso->stack_size = stack_size;
93 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
/* The stack grows downwards: sp starts one word past the top. */
95 tso->sp = (P_)&(tso->stack) + stack_size;
100 tso->prof.CCCS = CCS_MAIN;
103 /* put a stop frame on the stack */
104 tso->sp -= sizeofW(StgStopFrame);
105 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
106 tso->_link = END_TSO_QUEUE;
108 /* Link the new thread on the global thread list.
 * NOTE(review): the remainder of this comment is elided in this
 * extract. */
110 ACQUIRE_LOCK(&sched_mutex);
111 tso->id = next_thread_id++; // while we have the mutex
112 tso->global_link = g0->threads;
114 RELEASE_LOCK(&sched_mutex);
116 // ToDo: report the stack size in the event?
117 traceEventCreateThread(cap, tso);
122 /* ---------------------------------------------------------------------------
123 * Comparing Thread ids.
125 * This is used from STG land in the implementation of the
126 * instances of Eq/Ord for ThreadIds.
127 * ------------------------------------------------------------------------ */
/* Three-way comparison of two TSOs by thread id: negative, zero or
 * positive, strcmp-style.  NOTE(review): the equal-ids return path and
 * the return type are elided in this extract. */
130 cmp_thread(StgPtr tso1, StgPtr tso2)
132 StgThreadID id1 = ((StgTSO *)tso1)->id;
133 StgThreadID id2 = ((StgTSO *)tso2)->id;
135 if (id1 < id2) return (-1);
136 if (id1 > id2) return 1;
140 /* ---------------------------------------------------------------------------
141 * Fetching the ThreadID from an StgTSO.
143 * This is used in the implementation of Show for ThreadIds.
144 * ------------------------------------------------------------------------ */
/* Return the `id` field of the given TSO (argument arrives as an
 * untyped StgPtr from Haskell land). */
146 rts_getThreadId(StgPtr tso)
148 return ((StgTSO *)tso)->id;
151 /* -----------------------------------------------------------------------------
152 Remove a thread from a queue.
153 Fails fatally if the TSO is not on the queue.
154 -------------------------------------------------------------------------- */
/* Walk the singly-linked _link chain looking for `tso`; when found,
 * splice it out.  The unlink goes through setTSOLink, presumably so the
 * mutation is notified to the GC (dirtying `prev`) -- confirm in the
 * storage manager.  NOTE(review): the match test, the head-of-queue
 * case and the return statements are elided in this extract. */
156 rtsBool // returns True if we modified queue
157 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
162 for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
165 setTSOLink(cap,prev,t->_link);
/* Fell off the end of the queue without finding tso: fatal RTS error. */
173 barf("removeThreadFromQueue: not found");
/* Like removeThreadFromQueue, but for a double-ended queue represented
 * by separate head and tail pointers (as used by MVars): if the removed
 * element was the tail, the tail pointer is repaired too.
 * NOTE(review): the match test, head/tail update logic and returns are
 * elided in this extract. */
176 rtsBool // returns True if we modified head or tail
177 removeThreadFromDeQueue (Capability *cap,
178 StgTSO **head, StgTSO **tail, StgTSO *tso)
181 rtsBool flag = rtsFalse;
184 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
187 setTSOLink(cap,prev,t->_link);
/* Removed element was the tail: queue is now tail-less at this point. */
197 *tail = END_TSO_QUEUE;
/* NOTE(review): this message names the MVar-queue wrapper rather than
 * this function (removeThreadFromDeQueue) -- misleading in error logs. */
205 barf("removeThreadFromMVarQueue: not found");
/* Remove `tso` from the given MVar's blocked-thread queue and clear its
 * link field.  Thin wrapper over removeThreadFromDeQueue. */
209 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
211 // caller must do the write barrier, because replacing the info
212 // pointer will unlock the MVar.
213 removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
214 tso->_link = END_TSO_QUEUE;
217 /* ----------------------------------------------------------------------------
220 unblock a single thread.
221 ------------------------------------------------------------------------- */
/* Convenience wrapper: unblock `tso`, permitting migration to this
 * Capability (rtsTrue = allow_migrate).  See unblockOne_ below. */
224 unblockOne (Capability *cap, StgTSO *tso)
226 return unblockOne_(cap,tso,rtsTrue); // allow migration
/* Unblock a single thread, either onto this Capability's run queue or,
 * in the THREADED_RTS, by forwarding the wakeup to the Capability the
 * thread last ran on (wakeupThreadOnCapability).
 * NOTE(review): the return type, several locals, branch bodies and the
 * #else/#endif structure are elided in this extract -- the visible
 * lines do not show the complete control flow. */
230 unblockOne_ (Capability *cap, StgTSO *tso,
231 rtsBool allow_migrate USED_IF_THREADS)
235 // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
/* Caller must only hand us a thread that is actually blocked. */
236 ASSERT(tso->why_blocked != NotBlocked);
237 ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
238 tso->block_info.closure->header.info == &stg_IND_info);
/* Detach the thread from whatever queue it was linked into. */
241 tso->_link = END_TSO_QUEUE;
243 #if defined(THREADED_RTS)
244 if (tso->cap == cap || (!tsoLocked(tso) &&
246 RtsFlags.ParFlags.wakeupMigrate)) {
247 // We are waking up this thread on the current Capability, which
248 // might involve migrating it from the Capability it was last on.
/* A bound thread's Task must follow it to the new Capability. */
250 ASSERT(tso->bound->task->cap == tso->cap);
251 tso->bound->task->cap = cap;
256 tso->why_blocked = NotBlocked;
257 appendToRunQueue(cap,tso);
259 // context-switch soonish so we can migrate the new thread if
260 // necessary. NB. not contextSwitchCapability(cap), which would
261 // force a context switch immediately.
262 cap->context_switch = 1;
264 // we'll try to wake it up on the Capability it was last on.
265 wakeupThreadOnCapability(cap, tso->cap, tso);
/* NOTE(review): these lines appear to be the non-THREADED_RTS path
 * (mirroring the threaded case above); the #else separating them is
 * elided in this extract. */
268 tso->why_blocked = NotBlocked;
269 appendToRunQueue(cap,tso);
271 // context-switch soonish so we can migrate the new thread if
272 // necessary. NB. not contextSwitchCapability(cap), which would
273 // force a context switch immediately.
274 cap->context_switch = 1;
277 traceEventThreadWakeup (cap, tso, tso->cap->no);
/* Attempt to wake `tso`: if it lives on another Capability, send that
 * Capability a MSG_TRY_WAKEUP message; otherwise dispatch on
 * why_blocked and, where safe, put the thread straight on our run
 * queue.  NOTE(review): the return type, locals, the owning-capability
 * test, other switch cases and closing braces are elided in this
 * extract. */
283 tryWakeupThread (Capability *cap, StgTSO *tso)
/* Thread belongs to another Capability: delegate the wakeup. */
289 msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
290 SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
292 sendMessage(cap, tso->cap, (Message*)msg);
297 switch (tso->why_blocked)
299 case BlockedOnBlackHole:
302 // just run the thread now, if the BH is not really available,
303 // we'll block again.
304 tso->why_blocked = NotBlocked;
305 appendToRunQueue(cap,tso);
309 // otherwise, do nothing
314 /* ----------------------------------------------------------------------------
317 wakes up all the threads on the specified queue.
318 ------------------------------------------------------------------------- */
/* Wake every thread recorded in a BLOCKING_QUEUE's message list, then
 * kill the queue by overwriting it with an indirection so the GC can
 * reclaim it.  Messages whose info pointer has already become an IND
 * are skipped (they were revoked).
 * NOTE(review): the loop's link-advance expression, some braces and an
 * #endif are elided in this extract. */
321 wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
323 MessageBlackHole *msg;
324 const StgInfoTable *i;
/* Only valid on a live (clean or dirty) BLOCKING_QUEUE. */
326 ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info ||
327 bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info );
329 for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE;
331 i = msg->header.info;
332 if (i != &stg_IND_info) {
333 ASSERT(i == &stg_MSG_BLACKHOLE_info);
334 tryWakeupThread(cap,msg->tso);
338 // overwrite the BQ with an indirection so it will be
339 // collected at the next GC.
340 #if defined(DEBUG) && !defined(THREADED_RTS)
341 // XXX FILL_SLOP, but not if THREADED_RTS because in that case
342 // another thread might be looking at this BLOCKING_QUEUE and
343 // checking the owner field at the same time.
344 bq->bh = 0; bq->queue = 0; bq->owner = 0;
346 OVERWRITE_INFO(bq, &stg_IND_info);
349 // If we update a closure that we know we BLACKHOLE'd, and the closure
350 // no longer points to the current TSO as its owner, then there may be
351 // an orphaned BLOCKING_QUEUE closure with blocked threads attached to
352 // it. We therefore traverse the BLOCKING_QUEUEs attached to the
353 // current TSO to see if any can now be woken up.
/* Walk tso->bq; for each queue whose black hole no longer points back
 * at it (i.e. the queue is orphaned), wake all its blocked threads.
 * NOTE(review): the return type, the `p` declaration/initialisation
 * (presumably p = bq->bh), the link advance and closing braces are
 * elided in this extract. */
355 checkBlockingQueues (Capability *cap, StgTSO *tso)
357 StgBlockingQueue *bq, *next;
360 debugTraceCap(DEBUG_sched, cap,
361 "collision occurred; checking blocking queues for thread %ld",
364 for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
/* Already dead (turned into an indirection by wakeBlockingQueue). */
367 if (bq->header.info == &stg_IND_info) {
368 // ToDo: could short it out right here, to avoid
369 // traversing this IND multiple times.
/* Queue no longer reachable from its black hole: wake everyone. */
375 if (p->header.info != &stg_BLACKHOLE_info ||
376 ((StgInd *)p)->indirectee != (StgClosure*)bq)
378 wakeBlockingQueue(cap,bq);
383 /* ----------------------------------------------------------------------------
386 Update a thunk with a value. In order to do this, we need to know
387 which TSO owns (or is evaluating) the thunk, in case we need to
388 awaken any threads that are blocked on it.
389 ------------------------------------------------------------------------- */
/* Overwrite `thunk` with an indirection to `val`.  If the thunk was a
 * black hole owned by another thread (a "collision"), check this TSO's
 * blocking queues, or wake the blocking queue attached to the black
 * hole directly.  NOTE(review): locals (v, owner), several early
 * returns, a re-read of v->header.info, and closing braces are elided
 * in this extract. */
392 updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
396 const StgInfoTable *i;
398 i = thunk->header.info;
/* Common case: not a black hole of any flavour -- just update. */
399 if (i != &stg_BLACKHOLE_info &&
400 i != &stg_CAF_BLACKHOLE_info &&
401 i != &stg_WHITEHOLE_info) {
402 updateWithIndirection(cap, thunk, val);
/* It was a black hole: grab its indirectee before updating. */
406 v = ((StgInd*)thunk)->indirectee;
408 updateWithIndirection(cap, thunk, val);
/* Black hole pointed directly at a TSO: the owner field. */
411 if (i == &stg_TSO_info) {
412 owner = deRefTSO((StgTSO*)v);
/* Owned by someone else -- look for orphaned queues on our TSO. */
414 checkBlockingQueues(cap, tso);
/* Not a blocking queue either: nothing attached here, but there may
 * still be orphaned queues on our own TSO. */
419 if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
420 i != &stg_BLOCKING_QUEUE_DIRTY_info) {
421 checkBlockingQueues(cap, tso);
425 owner = deRefTSO(((StgBlockingQueue*)v)->owner);
428 checkBlockingQueues(cap, tso);
/* We own the queue: wake all the threads blocked on this thunk. */
430 wakeBlockingQueue(cap, (StgBlockingQueue*)v);
434 /* ----------------------------------------------------------------------------
435 * Wake up a thread on a Capability.
437 * This is used when the current Task is running on a Capability and
438 * wishes to wake up a thread on a different Capability.
439 * ------------------------------------------------------------------------- */
/* Migrate `tso` to `other_cap` and send that Capability a MSG_WAKEUP
 * message carrying the thread.  THREADED_RTS only (note the #endif
 * below).  NOTE(review): the return type, the msg declaration, lock
 * acquisition around other_cap, the bound-thread conditional and some
 * braces are elided in this extract. */
444 wakeupThreadOnCapability (Capability *cap,
445 Capability *other_cap,
450 // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
/* A bound thread's Task must move with it. */
452 ASSERT(tso->bound->task->cap == tso->cap);
453 tso->bound->task->cap = other_cap;
455 tso->cap = other_cap;
457 ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
458 tso->block_info.closure->header.info == &stg_IND_info);
/* Must not already have a pending wakeup message attached. */
460 ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
462 msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
463 SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM);
/* Record the in-flight message on the TSO and mark it as such. */
465 tso->block_info.closure = (StgClosure *)msg;
468 tso->why_blocked = BlockedOnMsgWakeup;
470 sendMessage(cap, other_cap, (Message*)msg);
473 #endif /* THREADED_RTS */
475 /* ---------------------------------------------------------------------------
476 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
477 * used by Control.Concurrent for error checking.
478 * ------------------------------------------------------------------------- */
/* Returns an HsBool: true only when built with THREADED_RTS.
 * NOTE(review): the THREADED_RTS branch (presumably returning
 * HS_BOOL_TRUE) and the #else/#endif are elided in this extract --
 * only the FALSE branch is visible. */
481 rtsSupportsBoundThreads(void)
483 #if defined(THREADED_RTS)
486 return HS_BOOL_FALSE;
490 /* ---------------------------------------------------------------------------
491 * isThreadBound(tso): check whether tso is bound to an OS thread.
492 * ------------------------------------------------------------------------- */
/* A thread is bound iff it has an associated bound Task.
 * NOTE(review): the non-THREADED_RTS branch (presumably returning
 * false) and the #else/#endif are elided in this extract. */
495 isThreadBound(StgTSO* tso USED_IF_THREADS)
497 #if defined(THREADED_RTS)
498 return (tso->bound != NULL);
503 /* ----------------------------------------------------------------------------
504 * Debugging: why is a thread blocked
505 * ------------------------------------------------------------------------- */
/* Print (via debugBelch) a human-readable description of why `tso` is
 * blocked, dispatching on why_blocked.  Debug-only helper.
 * NOTE(review): several `case` labels (e.g. for the read/write/delay/
 * MVar/foreign-call/STM branches) and the `break`s are elided in this
 * extract -- only the bodies are visible for some cases. */
509 printThreadBlockage(StgTSO *tso)
511 switch (tso->why_blocked) {
513 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
516 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
518 #if defined(mingw32_HOST_OS)
519 case BlockedOnDoProc:
520 debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
524 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
527 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
529 case BlockedOnBlackHole:
530 debugBelch("is blocked on a black hole %p",
531 ((StgBlockingQueue*)tso->block_info.bh->bh));
533 case BlockedOnMsgWakeup:
534 debugBelch("is blocked on a wakeup message");
536 case BlockedOnMsgThrowTo:
537 debugBelch("is blocked on a throwto message");
540 debugBelch("is not blocked");
543 debugBelch("is blocked on an external call");
545 case BlockedOnCCall_NoUnblockExc:
546 debugBelch("is blocked on an external call (exceptions were already blocked)");
549 debugBelch("is blocked on an STM operation");
/* Unknown why_blocked value: fatal, with the raw value for diagnosis. */
552 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
553 tso->why_blocked, tso->id, tso);
/* Print a one-line status for thread `t`: id, address, optional label,
 * and either its what_next state or its blockage (via
 * printThreadBlockage), plus dirty-flag annotations.
 * NOTE(review): the return type, some case labels, the `else` joining
 * the relocated check to the switch, and the flags test for TSO_DIRTY
 * are elided in this extract. */
559 printThreadStatus(StgTSO *t)
561 debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
563 void *label = lookupThreadLabel(t->id);
564 if (label) debugBelch("[\"%s\"] ",(char *)label);
/* A relocated TSO is a forwarding stub; don't inspect it further. */
566 if (t->what_next == ThreadRelocated) {
567 debugBelch("has been relocated...\n");
569 switch (t->what_next) {
571 debugBelch("has been killed");
574 debugBelch("has completed");
/* Default-ish case: describe why the thread is blocked. */
577 printThreadBlockage(t);
580 debugBelch(" (TSO_DIRTY)");
581 } else if (t->flags & TSO_LINK_DIRTY) {
582 debugBelch(" (TSO_LINK_DIRTY)");
/* Debug dump of every thread in the system: first each Capability's
 * run queue, then all blocked threads found on the per-generation
 * global thread lists.  NOTE(review): the return type, local
 * declarations (i, g, cap, t, next), the ThreadRelocated handling in
 * the second loop and closing braces are elided in this extract. */
589 printAllThreads(void)
595 debugBelch("all threads:\n");
597 for (i = 0; i < n_capabilities; i++) {
598 cap = &capabilities[i];
599 debugBelch("threads on capability %d:\n", cap->no);
600 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
601 printThreadStatus(t);
605 debugBelch("other threads:\n");
606 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
607 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
/* Runnable threads were already printed above via the run queues;
 * only report the blocked ones here. */
608 if (t->why_blocked != NotBlocked) {
609 printThreadStatus(t);
611 if (t->what_next == ThreadRelocated) {
614 next = t->global_link;
/* Debug dump of a single _link-chained thread queue starting at `t`,
 * followed by a count of the threads printed.  NOTE(review): the
 * return type, the declaration/increment of the counter `i` and the
 * loop's closing brace are elided in this extract. */
622 printThreadQueue(StgTSO *t)
625 for (; t != END_TSO_QUEUE; t = t->_link) {
626 printThreadStatus(t);
629 debugBelch("%d threads on queue\n", i);