1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2006
5 * Thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
12 #include "Capability.h"
18 #include "ThreadLabels.h"
21 #include "sm/Storage.h"
/* Next thread ID to allocate.
 * Incremented in createThread while sched_mutex is held, so IDs are
 * unique across all Capabilities.
 */
static StgThreadID next_thread_id = 1;

/* The smallest stack size that makes any sense is:
 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
 * + 1 (the closure to enter)
 * + 1 (spare slot req'd by stg_ap_v_ret)
 * (the macro adds 3 words beyond the stop frame; one bullet of the
 * original list -- presumably the return slot -- is not visible in
 * this excerpt)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
40 /* ---------------------------------------------------------------------------
43 The new thread starts with the given stack size. Before the
44 scheduler can run, however, this thread needs to have a closure
45 (and possibly some arguments) pushed on its stack. See
46 pushClosure() in Schedule.h.
48 createGenThread() and createIOThread() (in SchedAPI.h) are
49 convenient packaged versions of this function.
   NOTE: a 'pri' (priority) argument was only ever used in a GRAN
   setup and is not a parameter of this function -- HWL
52 ------------------------------------------------------------------------ */
/* createThread: allocate and initialise a fresh TSO with a 'size'-word
 * allocation on Capability 'cap'.  The new thread's stack holds only a
 * stg_stop_thread frame; the caller must push an entry closure (see
 * pushClosure() in Schedule.h) before the scheduler can run it.
 * NOTE(review): the return type and several lines of this definition
 * are not visible in this excerpt.
 */
createThread(Capability *cap, nat size)
    /* sched_mutex is *not* required */
    /* First check whether we should create a thread at all */
    // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
    /* catch ridiculously small stack sizes */
    if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
        size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
    // presumably rounds the request to an mblock boundary -- see
    // round_to_mblocks in the storage manager
    size = round_to_mblocks(size);
    tso = (StgTSO *)allocate(cap, size);
    // the usable stack is whatever remains after the TSO header
    stack_size = size - TSO_STRUCT_SIZEW;
    TICK_ALLOC_TSO(stack_size, 0);
    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
    // Always start with the compiled code evaluator
    tso->what_next = ThreadRunGHC;
    // the new thread starts runnable, with empty block/queue fields
    tso->why_blocked = NotBlocked;
    tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
    tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
    tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
    tso->stack_size = stack_size;
    tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
    // sp starts at the top of the (empty) stack
    tso->sp = (P_)&(tso->stack) + stack_size;
    tso->prof.CCCS = CCS_MAIN;
    /* put a stop frame on the stack */
    tso->sp -= sizeofW(StgStopFrame);
    SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
    tso->_link = END_TSO_QUEUE;
    /* Link the new thread on the global thread list. */
    ACQUIRE_LOCK(&sched_mutex);
    tso->id = next_thread_id++; // while we have the mutex
    tso->global_link = g0->threads;
    RELEASE_LOCK(&sched_mutex);
    // ToDo: report the stack size in the event?
    traceEventCreateThread(cap, tso);
122 /* ---------------------------------------------------------------------------
123 * Comparing Thread ids.
125 * This is used from STG land in the implementation of the
126 * instances of Eq/Ord for ThreadIds.
127 * ------------------------------------------------------------------------ */
/* cmp_thread: three-way comparison of two TSOs by thread id.
 * Returns -1 if id1 < id2, 1 if id1 > id2 (the equal case's return is
 * on a line not visible here).  Used from STG land to implement the
 * Eq/Ord instances for ThreadId.
 */
cmp_thread(StgPtr tso1, StgPtr tso2)
    StgThreadID id1 = ((StgTSO *)tso1)->id;
    StgThreadID id2 = ((StgTSO *)tso2)->id;
    if (id1 < id2) return (-1);
    if (id1 > id2) return 1;
140 /* ---------------------------------------------------------------------------
141 * Fetching the ThreadID from an StgTSO.
143 * This is used in the implementation of Show for ThreadIds.
144 * ------------------------------------------------------------------------ */
/* rts_getThreadId: fetch the ThreadID out of a TSO.
 * Used in the implementation of Show for ThreadId.
 */
rts_getThreadId(StgPtr tso)
    return ((StgTSO *)tso)->id;
151 /* -----------------------------------------------------------------------------
152 Remove a thread from a queue.
153 Fails fatally if the TSO is not on the queue.
154 -------------------------------------------------------------------------- */
/* removeThreadFromQueue: unlink 'tso' from the singly-linked run queue
 * rooted at *queue.  Fails fatally (barf) if the TSO is not found.
 */
rtsBool // returns True if we modified queue
removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
    // walk the queue, keeping the predecessor so we can unlink
    for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        // unlink: make the predecessor skip over 't'
        setTSOLink(cap,prev,t->_link);
    // reaching the end without finding 'tso' is an RTS invariant breach
    barf("removeThreadFromQueue: not found");
176 rtsBool // returns True if we modified head or tail
177 removeThreadFromDeQueue (Capability *cap,
178 StgTSO **head, StgTSO **tail, StgTSO *tso)
181 rtsBool flag = rtsFalse;
184 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
187 setTSOLink(cap,prev,t->_link);
197 *tail = END_TSO_QUEUE;
205 barf("removeThreadFromMVarQueue: not found");
/* removeThreadFromMVarQueue: unlink 'tso' from the given MVar's
 * blocked-thread queue and clear its link field.
 */
removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
    // caller must do the write barrier, because replacing the info
    // pointer will unlock the MVar.
    removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
    tso->_link = END_TSO_QUEUE;
217 /* ----------------------------------------------------------------------------
220 unblock a single thread.
221 ------------------------------------------------------------------------- */
/* unblockOne: wake a single blocked thread, permitting it to migrate
 * to this Capability.  Thin wrapper over unblockOne_.
 */
unblockOne (Capability *cap, StgTSO *tso)
    return unblockOne_(cap,tso,rtsTrue); // allow migration
/* unblockOne_: wake the blocked thread 'tso', either by appending it to
 * this Capability's run queue (possibly migrating it here) or by sending
 * a wakeup message to the Capability it was last on.
 * NOTE(review): parts of the branch structure are not visible in this
 * excerpt; the duplicated append/context-switch sequence below belongs
 * to a branch whose delimiters are elided.
 */
unblockOne_ (Capability *cap, StgTSO *tso,
             rtsBool allow_migrate USED_IF_THREADS)
    // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
    ASSERT(tso->why_blocked != NotBlocked);
    ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
           tso->block_info.closure->header.info == &stg_IND_info);
    tso->_link = END_TSO_QUEUE;
#if defined(THREADED_RTS)
    if (tso->cap == cap || (!tsoLocked(tso) &&
                            RtsFlags.ParFlags.wakeupMigrate)) {
        // We are waking up this thread on the current Capability, which
        // might involve migrating it from the Capability it was last on.
        // bound threads: keep the bound Task's notion of its Capability
        // in sync with the TSO's
        ASSERT(tso->bound->task->cap == tso->cap);
        tso->bound->task->cap = cap;
        tso->why_blocked = NotBlocked;
        appendToRunQueue(cap,tso);
        // context-switch soonish so we can migrate the new thread if
        // necessary. NB. not contextSwitchCapability(cap), which would
        // force a context switch immediately.
        cap->context_switch = 1;
        // we'll try to wake it up on the Capability it was last on.
        wakeupThreadOnCapability(cap, tso->cap, tso);
    tso->why_blocked = NotBlocked;
    appendToRunQueue(cap,tso);
    // context-switch soonish so we can migrate the new thread if
    // necessary. NB. not contextSwitchCapability(cap), which would
    // force a context switch immediately.
    cap->context_switch = 1;
    traceEventThreadWakeup (cap, tso, tso->cap->no);
/* tryWakeupThread: attempt to wake 'tso'.  If the thread lives on a
 * different Capability, send it a MSG_TRY_WAKEUP message instead of
 * touching it directly; otherwise inspect why_blocked and unblock it
 * where possible.
 * NOTE(review): the enclosing braces / some case labels of the switch
 * are not visible in this excerpt.
 */
tryWakeupThread (Capability *cap, StgTSO *tso)
    // remote Capability: allocate and send a try-wakeup message
    msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
    SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
    sendMessage(cap, tso->cap, (Message*)msg);
    debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
                  (lnat)tso->id, tso->cap->no);
    switch (tso->why_blocked)
    case BlockedOnMsgThrowTo:
        const StgInfoTable *i;
        // lock/unlock the message closure to read a consistent info ptr
        i = lockClosure(tso->block_info.closure);
        unlockClosure(tso->block_info.closure, i);
        if (i != &stg_MSG_NULL_info) {
            debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
                          (lnat)tso->id, tso->block_info.throwto->header.info);
            break; // still blocked
        // remove the block frame from the stack
        ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
    case BlockedOnBlackHole:
        // just run the thread now, if the BH is not really available,
        // we'll block again.
        tso->why_blocked = NotBlocked;
        appendToRunQueue(cap,tso);
        // otherwise, do nothing
333 /* ----------------------------------------------------------------------------
336 wakes up all the threads on the specified queue.
337 ------------------------------------------------------------------------- */
/* wakeBlockingQueue: wake every thread attached to the BLOCKING_QUEUE
 * 'bq' (via tryWakeupThread), then overwrite the queue with an
 * indirection so the GC can collect it.
 */
wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
    MessageBlackHole *msg;
    const StgInfoTable *i;
    // a BQ we wake must be in one of the two live BLOCKING_QUEUE states
    ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info ||
           bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info );
    for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE;
        i = msg->header.info;
        // IND marks a message that was already revoked; skip it
        if (i != &stg_IND_info) {
            ASSERT(i == &stg_MSG_BLACKHOLE_info);
            tryWakeupThread(cap,msg->tso);
    // overwrite the BQ with an indirection so it will be
    // collected at the next GC.
#if defined(DEBUG) && !defined(THREADED_RTS)
    // XXX FILL_SLOP, but not if THREADED_RTS because in that case
    // another thread might be looking at this BLOCKING_QUEUE and
    // checking the owner field at the same time.
    bq->bh = 0; bq->queue = 0; bq->owner = 0;
    OVERWRITE_INFO(bq, &stg_IND_info);
368 // If we update a closure that we know we BLACKHOLE'd, and the closure
369 // no longer points to the current TSO as its owner, then there may be
370 // an orphaned BLOCKING_QUEUE closure with blocked threads attached to
371 // it. We therefore traverse the BLOCKING_QUEUEs attached to the
372 // current TSO to see if any can now be woken up.
/* checkBlockingQueues: traverse the BLOCKING_QUEUEs hanging off 'tso'
 * and wake any whose black hole no longer points back at the queue
 * (i.e. the queue has been orphaned by an update elsewhere).
 */
checkBlockingQueues (Capability *cap, StgTSO *tso)
    StgBlockingQueue *bq, *next;
    debugTraceCap(DEBUG_sched, cap,
                  "collision occurred; checking blocking queues for thread %ld",
    for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
        // an IND here means this BQ entry was already dealt with
        if (bq->header.info == &stg_IND_info) {
            // ToDo: could short it out right here, to avoid
            // traversing this IND multiple times.
        // the BH no longer refers to this BQ: the queue is orphaned,
        // so its blocked threads can be woken now
        if (p->header.info != &stg_BLACKHOLE_info ||
            ((StgInd *)p)->indirectee != (StgClosure*)bq)
        wakeBlockingQueue(cap,bq);
402 /* ----------------------------------------------------------------------------
405 Update a thunk with a value. In order to do this, we need to know
406 which TSO owns (or is evaluating) the thunk, in case we need to
407 awaken any threads that are blocked on it.
408 ------------------------------------------------------------------------- */
/* updateThunk: update 'thunk' with the value 'val' on behalf of 'tso'.
 * If the thunk was black-holed and is owned by another TSO or carries a
 * BLOCKING_QUEUE, the blocked threads attached to it must be woken (or
 * 'tso''s own queues re-checked for orphans).
 * NOTE(review): several branch delimiters of this function are not
 * visible in this excerpt.
 */
updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
    const StgInfoTable *i;
    i = thunk->header.info;
    // not black-holed at all: a plain indirection update suffices
    if (i != &stg_BLACKHOLE_info &&
        i != &stg_CAF_BLACKHOLE_info &&
        i != &stg_WHITEHOLE_info) {
        updateWithIndirection(cap, thunk, val);
    // black hole: look at what it points to before updating
    v = ((StgInd*)thunk)->indirectee;
    updateWithIndirection(cap, thunk, val);
    // indirectee is a TSO: that TSO claimed the thunk
    if (i == &stg_TSO_info) {
        owner = deRefTSO((StgTSO*)v);
        checkBlockingQueues(cap, tso);
    // not a blocking queue either: nothing attached, but our own
    // queues may hold orphans -- re-check them
    if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
        i != &stg_BLOCKING_QUEUE_DIRTY_info) {
        checkBlockingQueues(cap, tso);
    owner = deRefTSO(((StgBlockingQueue*)v)->owner);
    // we own the BQ: wake everyone blocked on it; otherwise re-check
    checkBlockingQueues(cap, tso);
    wakeBlockingQueue(cap, (StgBlockingQueue*)v);
453 /* ----------------------------------------------------------------------------
454 * Wake up a thread on a Capability.
456 * This is used when the current Task is running on a Capability and
457 * wishes to wake up a thread on a different Capability.
458 * ------------------------------------------------------------------------- */
/* wakeupThreadOnCapability: wake 'tso' on 'other_cap' from the current
 * Task (running on 'cap') by migrating the TSO's ownership and sending
 * a MSG_WAKEUP message.
 * NOTE(review): the parameter list's final line and some body lines are
 * not visible in this excerpt.
 */
wakeupThreadOnCapability (Capability *cap,
                          Capability *other_cap,
    // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
    // bound thread: keep the bound Task's Capability in sync
    ASSERT(tso->bound->task->cap == tso->cap);
    tso->bound->task->cap = other_cap;
    tso->cap = other_cap;
    ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
           tso->block_info.closure->header.info == &stg_IND_info);
    // must not already be waiting on a wakeup message
    ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
    msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
    SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM);
    // record the message as the thing we are blocked on
    tso->block_info.closure = (StgClosure *)msg;
    tso->why_blocked = BlockedOnMsgWakeup;
    sendMessage(cap, other_cap, (Message*)msg);
#endif /* THREADED_RTS */
494 /* ---------------------------------------------------------------------------
495 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
496 * used by Control.Concurrent for error checking.
497 * ------------------------------------------------------------------------- */
/* rtsSupportsBoundThreads: is the RTS built to support bound threads?
 * Used by Control.Concurrent for error checking.  (The THREADED_RTS
 * branch returning HS_BOOL_TRUE is on lines not visible here.)
 */
rtsSupportsBoundThreads(void)
#if defined(THREADED_RTS)
    return HS_BOOL_FALSE;
509 /* ---------------------------------------------------------------------------
510 * isThreadBound(tso): check whether tso is bound to an OS thread.
511 * ------------------------------------------------------------------------- */
/* isThreadBound: check whether 'tso' is bound to an OS thread.
 * In a non-threaded RTS the parameter is unused (USED_IF_THREADS).
 */
isThreadBound(StgTSO* tso USED_IF_THREADS)
#if defined(THREADED_RTS)
    return (tso->bound != NULL);
522 /* ----------------------------------------------------------------------------
523 * Debugging: why is a thread blocked
524 * ------------------------------------------------------------------------- */
528 printThreadBlockage(StgTSO *tso)
530 switch (tso->why_blocked) {
532 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
535 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
537 #if defined(mingw32_HOST_OS)
538 case BlockedOnDoProc:
539 debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
543 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
546 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
548 case BlockedOnBlackHole:
549 debugBelch("is blocked on a black hole %p",
550 ((StgBlockingQueue*)tso->block_info.bh->bh));
552 case BlockedOnMsgWakeup:
553 debugBelch("is blocked on a wakeup message");
555 case BlockedOnMsgThrowTo:
556 debugBelch("is blocked on a throwto message");
559 debugBelch("is not blocked");
562 debugBelch("is blocked on an external call");
564 case BlockedOnCCall_NoUnblockExc:
565 debugBelch("is blocked on an external call (exceptions were already blocked)");
568 debugBelch("is blocked on an STM operation");
571 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
572 tso->why_blocked, tso->id, tso);
/* printThreadStatus: debugging aid -- print one line describing thread
 * 't': its id, address, optional label, and its run/blocked status,
 * plus dirtiness flags.
 * NOTE(review): several case labels/breaks and closing braces are not
 * visible in this excerpt.
 */
printThreadStatus(StgTSO *t)
    debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
    // show the user-assigned thread label, if any
    void *label = lookupThreadLabel(t->id);
    if (label) debugBelch("[\"%s\"] ",(char *)label);
    if (t->what_next == ThreadRelocated) {
        debugBelch("has been relocated...\n");
    switch (t->what_next) {
        debugBelch("has been killed");
        debugBelch("has completed");
    printThreadBlockage(t);
    debugBelch(" (TSO_DIRTY)");
    } else if (t->flags & TSO_LINK_DIRTY) {
        debugBelch(" (TSO_LINK_DIRTY)");
/* printAllThreads: debugging aid -- dump the status of every thread:
 * first each Capability's run queue, then the remaining (blocked)
 * threads found on the per-generation global thread lists.
 */
printAllThreads(void)
    debugBelch("all threads:\n");
    // per-Capability run queues
    for (i = 0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        debugBelch("threads on capability %d:\n", cap->no);
        for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
            printThreadStatus(t);
    // everything else, reached via the global thread lists
    debugBelch("other threads:\n");
    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
            // runnable threads already appeared on a run queue above
            if (t->why_blocked != NotBlocked) {
                printThreadStatus(t);
            if (t->what_next == ThreadRelocated) {
            // advance via the global link (t may be relocated/printed)
            next = t->global_link;
/* printThreadQueue: debugging aid -- print the status of every thread
 * on the queue starting at 't', followed by a count of the threads
 * printed (the counter increment is on a line not visible here).
 */
printThreadQueue(StgTSO *t)
    for (; t != END_TSO_QUEUE; t = t->_link) {
        printThreadStatus(t);
    debugBelch("%d threads on queue\n", i);