1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2006
5 * Thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
18 #include "ThreadLabels.h"
20 /* Next thread ID to allocate.
23 static StgThreadID next_thread_id = 1;
25 /* The smallest stack size that makes any sense is:
26 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
27 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
28 * + 1 (the closure to enter)
30 * + 1 (spare slot req'd by stg_ap_v_ret)
32 * A thread with this stack will bomb immediately with a stack
33 * overflow, which will increase its stack size.
35 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
37 /* ---------------------------------------------------------------------------
40 The new thread starts with the given stack size. Before the
41 scheduler can run, however, this thread needs to have a closure
42 (and possibly some arguments) pushed on its stack. See
43 pushClosure() in Schedule.h.
45 createGenThread() and createIOThread() (in SchedAPI.h) are
46 convenient packaged versions of this function.
48 currently pri (priority) is only used in a GRAN setup -- HWL
49 ------------------------------------------------------------------------ */
51 createThread(Capability *cap, nat size)
56 /* sched_mutex is *not* required */
58 /* First check whether we should create a thread at all */
60 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
62 /* catch ridiculously small stack sizes */
63 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
64 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
67 size = round_to_mblocks(size);
68 tso = (StgTSO *)allocateLocal(cap, size);
70 stack_size = size - TSO_STRUCT_SIZEW;
71 TICK_ALLOC_TSO(stack_size, 0);
73 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
75 // Always start with the compiled code evaluator
76 tso->what_next = ThreadRunGHC;
78 tso->why_blocked = NotBlocked;
79 tso->blocked_exceptions = END_TSO_QUEUE;
80 tso->flags = TSO_DIRTY;
86 tso->stack_size = stack_size;
87 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
89 tso->sp = (P_)&(tso->stack) + stack_size;
94 tso->prof.CCCS = CCS_MAIN;
97 /* put a stop frame on the stack */
98 tso->sp -= sizeofW(StgStopFrame);
99 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
100 tso->_link = END_TSO_QUEUE;
102 /* Link the new thread on the global thread list.
104 ACQUIRE_LOCK(&sched_mutex);
105 tso->id = next_thread_id++; // while we have the mutex
106 tso->global_link = g0s0->threads;
108 RELEASE_LOCK(&sched_mutex);
110 postEvent (cap, EVENT_CREATE_THREAD, tso->id, 0);
112 debugTrace(DEBUG_sched,
113 "created thread %ld, stack size = %lx words",
114 (long)tso->id, (long)tso->stack_size);
118 /* ---------------------------------------------------------------------------
119 * Comparing Thread ids.
121 * This is used from STG land in the implementation of the
122 * instances of Eq/Ord for ThreadIds.
123 * ------------------------------------------------------------------------ */
126 cmp_thread(StgPtr tso1, StgPtr tso2)
128 StgThreadID id1 = ((StgTSO *)tso1)->id;
129 StgThreadID id2 = ((StgTSO *)tso2)->id;
131 if (id1 < id2) return (-1);
132 if (id1 > id2) return 1;
136 /* ---------------------------------------------------------------------------
137 * Fetching the ThreadID from an StgTSO.
139 * This is used in the implementation of Show for ThreadIds.
140 * ------------------------------------------------------------------------ */
142 rts_getThreadId(StgPtr tso)
144 return ((StgTSO *)tso)->id;
147 /* -----------------------------------------------------------------------------
148 Remove a thread from a queue.
149 Fails fatally if the TSO is not on the queue.
150 -------------------------------------------------------------------------- */
153 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
158 for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
161 setTSOLink(cap,prev,t->_link);
168 barf("removeThreadFromQueue: not found");
172 removeThreadFromDeQueue (Capability *cap,
173 StgTSO **head, StgTSO **tail, StgTSO *tso)
178 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
181 setTSOLink(cap,prev,t->_link);
189 *tail = END_TSO_QUEUE;
195 barf("removeThreadFromMVarQueue: not found");
199 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
201 removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
204 /* ----------------------------------------------------------------------------
207 unblock a single thread.
208 ------------------------------------------------------------------------- */
211 unblockOne (Capability *cap, StgTSO *tso)
213 return unblockOne_(cap,tso,rtsTrue); // allow migration
217 unblockOne_ (Capability *cap, StgTSO *tso,
218 rtsBool allow_migrate USED_IF_THREADS)
222 // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
223 ASSERT(tso->why_blocked != NotBlocked);
225 tso->why_blocked = NotBlocked;
227 tso->_link = END_TSO_QUEUE;
229 #if defined(THREADED_RTS)
230 if (tso->cap == cap || (!tsoLocked(tso) &&
232 RtsFlags.ParFlags.wakeupMigrate)) {
233 // We are waking up this thread on the current Capability, which
234 // might involve migrating it from the Capability it was last on.
236 ASSERT(tso->bound->cap == tso->cap);
237 tso->bound->cap = cap;
241 appendToRunQueue(cap,tso);
243 // context-switch soonish so we can migrate the new thread if
244 // necessary. NB. not contextSwitchCapability(cap), which would
245 // force a context switch immediately.
246 cap->context_switch = 1;
248 // we'll try to wake it up on the Capability it was last on.
249 wakeupThreadOnCapability(cap, tso->cap, tso);
252 appendToRunQueue(cap,tso);
254 // context-switch soonish so we can migrate the new thread if
255 // necessary. NB. not contextSwitchCapability(cap), which would
256 // force a context switch immediately.
257 cap->context_switch = 1;
260 postEvent (cap, EVENT_THREAD_WAKEUP, tso->id, tso->cap->no);
262 debugTrace(DEBUG_sched, "waking up thread %ld on cap %d",
263 (long)tso->id, tso->cap->no);
268 /* ----------------------------------------------------------------------------
271 wakes up all the threads on the specified queue.
272 ------------------------------------------------------------------------- */
275 awakenBlockedQueue(Capability *cap, StgTSO *tso)
277 while (tso != END_TSO_QUEUE) {
278 tso = unblockOne(cap,tso);
282 /* ---------------------------------------------------------------------------
283 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
284 * used by Control.Concurrent for error checking.
285 * ------------------------------------------------------------------------- */
288 rtsSupportsBoundThreads(void)
290 #if defined(THREADED_RTS)
293 return HS_BOOL_FALSE;
297 /* ---------------------------------------------------------------------------
298 * isThreadBound(tso): check whether tso is bound to an OS thread.
299 * ------------------------------------------------------------------------- */
302 isThreadBound(StgTSO* tso USED_IF_THREADS)
304 #if defined(THREADED_RTS)
305 return (tso->bound != NULL);
310 /* ----------------------------------------------------------------------------
311 * Debugging: why is a thread blocked
312 * ------------------------------------------------------------------------- */
316 printThreadBlockage(StgTSO *tso)
318 switch (tso->why_blocked) {
320 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
323 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
325 #if defined(mingw32_HOST_OS)
326 case BlockedOnDoProc:
327 debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
331 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
334 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
336 case BlockedOnException:
337 debugBelch("is blocked on delivering an exception to thread %lu",
338 (unsigned long)tso->block_info.tso->id);
340 case BlockedOnBlackHole:
341 debugBelch("is blocked on a black hole");
344 debugBelch("is not blocked");
347 debugBelch("is blocked on an external call");
349 case BlockedOnCCall_NoUnblockExc:
350 debugBelch("is blocked on an external call (exceptions were already blocked)");
353 debugBelch("is blocked on an STM operation");
356 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
357 tso->why_blocked, tso->id, tso);
362 printThreadStatus(StgTSO *t)
364 debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
366 void *label = lookupThreadLabel(t->id);
367 if (label) debugBelch("[\"%s\"] ",(char *)label);
369 if (t->what_next == ThreadRelocated) {
370 debugBelch("has been relocated...\n");
372 switch (t->what_next) {
374 debugBelch("has been killed");
377 debugBelch("has completed");
380 printThreadBlockage(t);
382 if (t->flags & TSO_DIRTY) {
383 debugBelch(" (TSO_DIRTY)");
384 } else if (t->flags & TSO_LINK_DIRTY) {
385 debugBelch(" (TSO_LINK_DIRTY)");
392 printAllThreads(void)
399 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
400 ullong_format_string(TIME_ON_PROC(CurrentProc),
401 time_string, rtsFalse/*no commas!*/);
403 debugBelch("all threads at [%s]:\n", time_string);
404 # elif defined(PARALLEL_HASKELL)
405 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
406 ullong_format_string(CURRENT_TIME,
407 time_string, rtsFalse/*no commas!*/);
409 debugBelch("all threads at [%s]:\n", time_string);
411 debugBelch("all threads:\n");
414 for (i = 0; i < n_capabilities; i++) {
415 cap = &capabilities[i];
416 debugBelch("threads on capability %d:\n", cap->no);
417 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
418 printThreadStatus(t);
422 debugBelch("other threads:\n");
423 for (s = 0; s < total_steps; s++) {
424 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
425 if (t->why_blocked != NotBlocked) {
426 printThreadStatus(t);
428 if (t->what_next == ThreadRelocated) {
431 next = t->global_link;
439 printThreadQueue(StgTSO *t)
442 for (; t != END_TSO_QUEUE; t = t->_link) {
443 printThreadStatus(t);
446 debugBelch("%d threads on queue\n", i);