1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2006
5 * Thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
16 #include "ThreadLabels.h"
18 /* Next thread ID to allocate.
21 static StgThreadID next_thread_id = 1;
23 /* The smallest stack size that makes any sense is:
24 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
25 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
26 * + 1 (the closure to enter)
28 * + 1 (spare slot req'd by stg_ap_v_ret)
30 * A thread with this stack will bomb immediately with a stack
31 * overflow, which will increase its stack size.
33 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
35 /* ---------------------------------------------------------------------------
38 The new thread starts with the given stack size. Before the
39 scheduler can run, however, this thread needs to have a closure
40 (and possibly some arguments) pushed on its stack. See
41 pushClosure() in Schedule.h.
43 createGenThread() and createIOThread() (in SchedAPI.h) are
44 convenient packaged versions of this function.
46 currently pri (priority) is only used in a GRAN setup -- HWL
47 ------------------------------------------------------------------------ */
49 createThread(Capability *cap, nat size)
54 /* sched_mutex is *not* required */
56 /* First check whether we should create a thread at all */
58 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
60 /* catch ridiculously small stack sizes */
61 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
62 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
65 size = round_to_mblocks(size);
66 tso = (StgTSO *)allocateLocal(cap, size);
68 stack_size = size - TSO_STRUCT_SIZEW;
69 TICK_ALLOC_TSO(stack_size, 0);
71 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
73 // Always start with the compiled code evaluator
74 tso->what_next = ThreadRunGHC;
76 tso->why_blocked = NotBlocked;
77 tso->blocked_exceptions = END_TSO_QUEUE;
78 tso->flags = TSO_DIRTY;
84 tso->stack_size = stack_size;
85 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
87 tso->sp = (P_)&(tso->stack) + stack_size;
92 tso->prof.CCCS = CCS_MAIN;
95 /* put a stop frame on the stack */
96 tso->sp -= sizeofW(StgStopFrame);
97 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
98 tso->_link = END_TSO_QUEUE;
100 /* Link the new thread on the global thread list.
102 ACQUIRE_LOCK(&sched_mutex);
103 tso->id = next_thread_id++; // while we have the mutex
104 tso->global_link = g0s0->threads;
106 RELEASE_LOCK(&sched_mutex);
108 postEvent (cap, EVENT_CREATE_THREAD, tso->id, 0);
110 debugTrace(DEBUG_sched,
111 "created thread %ld, stack size = %lx words",
112 (long)tso->id, (long)tso->stack_size);
116 /* ---------------------------------------------------------------------------
117 * Comparing Thread ids.
119 * This is used from STG land in the implementation of the
120 * instances of Eq/Ord for ThreadIds.
121 * ------------------------------------------------------------------------ */
124 cmp_thread(StgPtr tso1, StgPtr tso2)
126 StgThreadID id1 = ((StgTSO *)tso1)->id;
127 StgThreadID id2 = ((StgTSO *)tso2)->id;
129 if (id1 < id2) return (-1);
130 if (id1 > id2) return 1;
134 /* ---------------------------------------------------------------------------
135 * Fetching the ThreadID from an StgTSO.
137 * This is used in the implementation of Show for ThreadIds.
138 * ------------------------------------------------------------------------ */
140 rts_getThreadId(StgPtr tso)
142 return ((StgTSO *)tso)->id;
145 /* -----------------------------------------------------------------------------
146 Remove a thread from a queue.
147 Fails fatally if the TSO is not on the queue.
148 -------------------------------------------------------------------------- */
151 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
156 for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
159 setTSOLink(cap,prev,t->_link);
166 barf("removeThreadFromQueue: not found");
170 removeThreadFromDeQueue (Capability *cap,
171 StgTSO **head, StgTSO **tail, StgTSO *tso)
176 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
179 setTSOLink(cap,prev,t->_link);
187 *tail = END_TSO_QUEUE;
193 barf("removeThreadFromMVarQueue: not found");
197 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
199 removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
202 /* ----------------------------------------------------------------------------
205 unblock a single thread.
206 ------------------------------------------------------------------------- */
209 unblockOne (Capability *cap, StgTSO *tso)
211 return unblockOne_(cap,tso,rtsTrue); // allow migration
215 unblockOne_ (Capability *cap, StgTSO *tso,
216 rtsBool allow_migrate USED_IF_THREADS)
220 // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
221 ASSERT(tso->why_blocked != NotBlocked);
223 tso->why_blocked = NotBlocked;
225 tso->_link = END_TSO_QUEUE;
227 #if defined(THREADED_RTS)
228 if (tso->cap == cap || (!tsoLocked(tso) &&
230 RtsFlags.ParFlags.wakeupMigrate)) {
231 // We are waking up this thread on the current Capability, which
232 // might involve migrating it from the Capability it was last on.
234 ASSERT(tso->bound->cap == tso->cap);
235 tso->bound->cap = cap;
239 appendToRunQueue(cap,tso);
241 // context-switch soonish so we can migrate the new thread if
242 // necessary. NB. not contextSwitchCapability(cap), which would
243 // force a context switch immediately.
244 cap->context_switch = 1;
246 // we'll try to wake it up on the Capability it was last on.
247 wakeupThreadOnCapability(cap, tso->cap, tso);
250 appendToRunQueue(cap,tso);
252 // context-switch soonish so we can migrate the new thread if
253 // necessary. NB. not contextSwitchCapability(cap), which would
254 // force a context switch immediately.
255 cap->context_switch = 1;
258 postEvent (cap, EVENT_THREAD_WAKEUP, tso->id, tso->cap->no);
260 debugTrace(DEBUG_sched, "waking up thread %ld on cap %d",
261 (long)tso->id, tso->cap->no);
266 /* ----------------------------------------------------------------------------
269 wakes up all the threads on the specified queue.
270 ------------------------------------------------------------------------- */
273 awakenBlockedQueue(Capability *cap, StgTSO *tso)
275 while (tso != END_TSO_QUEUE) {
276 tso = unblockOne(cap,tso);
280 /* ---------------------------------------------------------------------------
281 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
282 * used by Control.Concurrent for error checking.
283 * ------------------------------------------------------------------------- */
286 rtsSupportsBoundThreads(void)
288 #if defined(THREADED_RTS)
291 return HS_BOOL_FALSE;
295 /* ---------------------------------------------------------------------------
296 * isThreadBound(tso): check whether tso is bound to an OS thread.
297 * ------------------------------------------------------------------------- */
300 isThreadBound(StgTSO* tso USED_IF_THREADS)
302 #if defined(THREADED_RTS)
303 return (tso->bound != NULL);
308 /* ----------------------------------------------------------------------------
309 * Debugging: why is a thread blocked
310 * ------------------------------------------------------------------------- */
314 printThreadBlockage(StgTSO *tso)
316 switch (tso->why_blocked) {
318 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
321 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
323 #if defined(mingw32_HOST_OS)
324 case BlockedOnDoProc:
325 debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
329 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
332 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
334 case BlockedOnException:
335 debugBelch("is blocked on delivering an exception to thread %lu",
336 (unsigned long)tso->block_info.tso->id);
338 case BlockedOnBlackHole:
339 debugBelch("is blocked on a black hole");
342 debugBelch("is not blocked");
345 debugBelch("is blocked on an external call");
347 case BlockedOnCCall_NoUnblockExc:
348 debugBelch("is blocked on an external call (exceptions were already blocked)");
351 debugBelch("is blocked on an STM operation");
354 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
355 tso->why_blocked, tso->id, tso);
360 printThreadStatus(StgTSO *t)
362 debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
364 void *label = lookupThreadLabel(t->id);
365 if (label) debugBelch("[\"%s\"] ",(char *)label);
367 if (t->what_next == ThreadRelocated) {
368 debugBelch("has been relocated...\n");
370 switch (t->what_next) {
372 debugBelch("has been killed");
375 debugBelch("has completed");
378 printThreadBlockage(t);
380 if (t->flags & TSO_DIRTY) {
381 debugBelch(" (TSO_DIRTY)");
382 } else if (t->flags & TSO_LINK_DIRTY) {
383 debugBelch(" (TSO_LINK_DIRTY)");
390 printAllThreads(void)
397 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
398 ullong_format_string(TIME_ON_PROC(CurrentProc),
399 time_string, rtsFalse/*no commas!*/);
401 debugBelch("all threads at [%s]:\n", time_string);
402 # elif defined(PARALLEL_HASKELL)
403 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
404 ullong_format_string(CURRENT_TIME,
405 time_string, rtsFalse/*no commas!*/);
407 debugBelch("all threads at [%s]:\n", time_string);
409 debugBelch("all threads:\n");
412 for (i = 0; i < n_capabilities; i++) {
413 cap = &capabilities[i];
414 debugBelch("threads on capability %d:\n", cap->no);
415 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
416 printThreadStatus(t);
420 debugBelch("other threads:\n");
421 for (s = 0; s < total_steps; s++) {
422 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
423 if (t->why_blocked != NotBlocked) {
424 printThreadStatus(t);
426 if (t->what_next == ThreadRelocated) {
429 next = t->global_link;
437 printThreadQueue(StgTSO *t)
440 for (; t != END_TSO_QUEUE; t = t->_link) {
441 printThreadStatus(t);
444 debugBelch("%d threads on queue\n", i);