Unify event logging and debug tracing.
[ghc-hetmet.git] / rts / Threads.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2006
4  *
5  * Thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "Threads.h"
13 #include "STM.h"
14 #include "Schedule.h"
15 #include "Trace.h"
16 #include "ThreadLabels.h"
17
18 /* Next thread ID to allocate.
19  * LOCK: sched_mutex
20  */
21 static StgThreadID next_thread_id = 1;
22
23 /* The smallest stack size that makes any sense is:
24  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
25  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
26  *  + 1                       (the closure to enter)
27  *  + 1                       (stg_ap_v_ret)
28  *  + 1                       (spare slot req'd by stg_ap_v_ret)
29  *
30  * A thread with this stack will bomb immediately with a stack
31  * overflow, which will increase its stack size.  
32  */
33 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
34
35 /* ---------------------------------------------------------------------------
36    Create a new thread.
37
38    The new thread starts with the given stack size.  Before the
39    scheduler can run, however, this thread needs to have a closure
40    (and possibly some arguments) pushed on its stack.  See
41    pushClosure() in Schedule.h.
42
43    createGenThread() and createIOThread() (in SchedAPI.h) are
44    convenient packaged versions of this function.
45
46    currently pri (priority) is only used in a GRAN setup -- HWL
47    ------------------------------------------------------------------------ */
48 StgTSO *
49 createThread(Capability *cap, nat size)
50 {
51     StgTSO *tso;
52     nat stack_size;
53
54     /* sched_mutex is *not* required */
55
56     /* First check whether we should create a thread at all */
57
58     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
59
60     /* catch ridiculously small stack sizes */
61     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
62         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
63     }
64
65     size = round_to_mblocks(size);
66     tso = (StgTSO *)allocateLocal(cap, size);
67
68     stack_size = size - TSO_STRUCT_SIZEW;
69     TICK_ALLOC_TSO(stack_size, 0);
70
71     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
72
73     // Always start with the compiled code evaluator
74     tso->what_next = ThreadRunGHC;
75
76     tso->why_blocked  = NotBlocked;
77     tso->blocked_exceptions = END_TSO_QUEUE;
78     tso->flags = 0;
79     tso->dirty = 1;
80     
81     tso->saved_errno = 0;
82     tso->bound = NULL;
83     tso->cap = cap;
84     
85     tso->stack_size     = stack_size;
86     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
87                           - TSO_STRUCT_SIZEW;
88     tso->sp             = (P_)&(tso->stack) + stack_size;
89
90     tso->trec = NO_TREC;
91     
92 #ifdef PROFILING
93     tso->prof.CCCS = CCS_MAIN;
94 #endif
95     
96   /* put a stop frame on the stack */
97     tso->sp -= sizeofW(StgStopFrame);
98     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
99     tso->_link = END_TSO_QUEUE;
100     
101     /* Link the new thread on the global thread list.
102      */
103     ACQUIRE_LOCK(&sched_mutex);
104     tso->id = next_thread_id++;  // while we have the mutex
105     tso->global_link = g0s0->threads;
106     g0s0->threads = tso;
107     RELEASE_LOCK(&sched_mutex);
108     
109     // ToDo: report the stack size in the event?
110     traceSchedEvent (cap, EVENT_CREATE_THREAD, tso, tso->stack_size);
111
112     return tso;
113 }
114
115 /* ---------------------------------------------------------------------------
116  * Comparing Thread ids.
117  *
118  * This is used from STG land in the implementation of the
119  * instances of Eq/Ord for ThreadIds.
120  * ------------------------------------------------------------------------ */
121
122 int
123 cmp_thread(StgPtr tso1, StgPtr tso2) 
124
125   StgThreadID id1 = ((StgTSO *)tso1)->id; 
126   StgThreadID id2 = ((StgTSO *)tso2)->id;
127  
128   if (id1 < id2) return (-1);
129   if (id1 > id2) return 1;
130   return 0;
131 }
132
133 /* ---------------------------------------------------------------------------
134  * Fetching the ThreadID from an StgTSO.
135  *
136  * This is used in the implementation of Show for ThreadIds.
137  * ------------------------------------------------------------------------ */
138 int
139 rts_getThreadId(StgPtr tso) 
140 {
141   return ((StgTSO *)tso)->id;
142 }
143
144 /* -----------------------------------------------------------------------------
145    Remove a thread from a queue.
146    Fails fatally if the TSO is not on the queue.
147    -------------------------------------------------------------------------- */
148
149 void
150 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
151 {
152     StgTSO *t, *prev;
153
154     prev = NULL;
155     for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
156         if (t == tso) {
157             if (prev) {
158                 setTSOLink(cap,prev,t->_link);
159             } else {
160                 *queue = t->_link;
161             }
162             return;
163         }
164     }
165     barf("removeThreadFromQueue: not found");
166 }
167
168 void
169 removeThreadFromDeQueue (Capability *cap, 
170                          StgTSO **head, StgTSO **tail, StgTSO *tso)
171 {
172     StgTSO *t, *prev;
173
174     prev = NULL;
175     for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
176         if (t == tso) {
177             if (prev) {
178                 setTSOLink(cap,prev,t->_link);
179             } else {
180                 *head = t->_link;
181             }
182             if (*tail == tso) {
183                 if (prev) {
184                     *tail = prev;
185                 } else {
186                     *tail = END_TSO_QUEUE;
187                 }
188             }
189             return;
190         }
191     }
192     barf("removeThreadFromMVarQueue: not found");
193 }
194
195 void
196 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
197 {
198     removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
199 }
200
201 /* ----------------------------------------------------------------------------
202    unblockOne()
203
204    unblock a single thread.
205    ------------------------------------------------------------------------- */
206
207 StgTSO *
208 unblockOne (Capability *cap, StgTSO *tso)
209 {
210     return unblockOne_(cap,tso,rtsTrue); // allow migration
211 }
212
213 StgTSO *
214 unblockOne_ (Capability *cap, StgTSO *tso, 
215              rtsBool allow_migrate USED_IF_THREADS)
216 {
217   StgTSO *next;
218
219   // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
220   ASSERT(tso->why_blocked != NotBlocked);
221
222   tso->why_blocked = NotBlocked;
223   next = tso->_link;
224   tso->_link = END_TSO_QUEUE;
225
226 #if defined(THREADED_RTS)
227   if (tso->cap == cap || (!tsoLocked(tso) && 
228                           allow_migrate && 
229                           RtsFlags.ParFlags.wakeupMigrate)) {
230       // We are waking up this thread on the current Capability, which
231       // might involve migrating it from the Capability it was last on.
232       if (tso->bound) {
233           ASSERT(tso->bound->cap == tso->cap);
234           tso->bound->cap = cap;
235       }
236
237       tso->cap = cap;
238       appendToRunQueue(cap,tso);
239
240       // context-switch soonish so we can migrate the new thread if
241       // necessary.  NB. not contextSwitchCapability(cap), which would
242       // force a context switch immediately.
243       cap->context_switch = 1;
244   } else {
245       // we'll try to wake it up on the Capability it was last on.
246       wakeupThreadOnCapability(cap, tso->cap, tso);
247   }
248 #else
249   appendToRunQueue(cap,tso);
250
251   // context-switch soonish so we can migrate the new thread if
252   // necessary.  NB. not contextSwitchCapability(cap), which would
253   // force a context switch immediately.
254   cap->context_switch = 1;
255 #endif
256
257   traceSchedEvent (cap, EVENT_THREAD_WAKEUP, tso, tso->cap->no);
258
259   return next;
260 }
261
262 /* ----------------------------------------------------------------------------
263    awakenBlockedQueue
264
265    wakes up all the threads on the specified queue.
266    ------------------------------------------------------------------------- */
267
268 void
269 awakenBlockedQueue(Capability *cap, StgTSO *tso)
270 {
271     while (tso != END_TSO_QUEUE) {
272         tso = unblockOne(cap,tso);
273     }
274 }
275
276 /* ---------------------------------------------------------------------------
277  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
278  * used by Control.Concurrent for error checking.
279  * ------------------------------------------------------------------------- */
280  
281 HsBool
282 rtsSupportsBoundThreads(void)
283 {
284 #if defined(THREADED_RTS)
285   return HS_BOOL_TRUE;
286 #else
287   return HS_BOOL_FALSE;
288 #endif
289 }
290
291 /* ---------------------------------------------------------------------------
292  * isThreadBound(tso): check whether tso is bound to an OS thread.
293  * ------------------------------------------------------------------------- */
294  
295 StgBool
296 isThreadBound(StgTSO* tso USED_IF_THREADS)
297 {
298 #if defined(THREADED_RTS)
299   return (tso->bound != NULL);
300 #endif
301   return rtsFalse;
302 }
303
304 /* ----------------------------------------------------------------------------
305  * Debugging: why is a thread blocked
306  * ------------------------------------------------------------------------- */
307
308 #if DEBUG
309 void
310 printThreadBlockage(StgTSO *tso)
311 {
312   switch (tso->why_blocked) {
313   case BlockedOnRead:
314     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
315     break;
316   case BlockedOnWrite:
317     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
318     break;
319 #if defined(mingw32_HOST_OS)
320     case BlockedOnDoProc:
321     debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
322     break;
323 #endif
324   case BlockedOnDelay:
325     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
326     break;
327   case BlockedOnMVar:
328     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
329     break;
330   case BlockedOnException:
331     debugBelch("is blocked on delivering an exception to thread %lu",
332                (unsigned long)tso->block_info.tso->id);
333     break;
334   case BlockedOnBlackHole:
335     debugBelch("is blocked on a black hole");
336     break;
337   case NotBlocked:
338     debugBelch("is not blocked");
339     break;
340   case BlockedOnCCall:
341     debugBelch("is blocked on an external call");
342     break;
343   case BlockedOnCCall_NoUnblockExc:
344     debugBelch("is blocked on an external call (exceptions were already blocked)");
345     break;
346   case BlockedOnSTM:
347     debugBelch("is blocked on an STM operation");
348     break;
349   default:
350     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
351          tso->why_blocked, tso->id, tso);
352   }
353 }
354
355
356 void
357 printThreadStatus(StgTSO *t)
358 {
359   debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
360     {
361       void *label = lookupThreadLabel(t->id);
362       if (label) debugBelch("[\"%s\"] ",(char *)label);
363     }
364     if (t->what_next == ThreadRelocated) {
365         debugBelch("has been relocated...\n");
366     } else {
367         switch (t->what_next) {
368         case ThreadKilled:
369             debugBelch("has been killed");
370             break;
371         case ThreadComplete:
372             debugBelch("has completed");
373             break;
374         default:
375             printThreadBlockage(t);
376         }
377         if (t->dirty) {
378             debugBelch(" (TSO_DIRTY)");
379         } else if (t->flags & TSO_LINK_DIRTY) {
380             debugBelch(" (TSO_LINK_DIRTY)");
381         }
382         debugBelch("\n");
383     }
384 }
385
386 void
387 printAllThreads(void)
388 {
389   StgTSO *t, *next;
390   nat i, s;
391   Capability *cap;
392
393 # if defined(GRAN)
394   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
395   ullong_format_string(TIME_ON_PROC(CurrentProc), 
396                        time_string, rtsFalse/*no commas!*/);
397
398   debugBelch("all threads at [%s]:\n", time_string);
399 # elif defined(PARALLEL_HASKELL)
400   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
401   ullong_format_string(CURRENT_TIME,
402                        time_string, rtsFalse/*no commas!*/);
403
404   debugBelch("all threads at [%s]:\n", time_string);
405 # else
406   debugBelch("all threads:\n");
407 # endif
408
409   for (i = 0; i < n_capabilities; i++) {
410       cap = &capabilities[i];
411       debugBelch("threads on capability %d:\n", cap->no);
412       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
413           printThreadStatus(t);
414       }
415   }
416
417   debugBelch("other threads:\n");
418   for (s = 0; s < total_steps; s++) {
419     for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
420       if (t->why_blocked != NotBlocked) {
421           printThreadStatus(t);
422       }
423       if (t->what_next == ThreadRelocated) {
424           next = t->_link;
425       } else {
426           next = t->global_link;
427       }
428     }
429   }
430 }
431
432 // useful from gdb
433 void 
434 printThreadQueue(StgTSO *t)
435 {
436     nat i = 0;
437     for (; t != END_TSO_QUEUE; t = t->_link) {
438         printThreadStatus(t);
439         i++;
440     }
441     debugBelch("%d threads on queue\n", i);
442 }
443
444 #endif /* DEBUG */