RTS tidyup sweep, first phase
[ghc-hetmet.git] / rts / Threads.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2006
4  *
5  * Thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "Threads.h"
13 #include "STM.h"
14 #include "Schedule.h"
15 #include "Trace.h"
16 #include "ThreadLabels.h"
17
18 /* Next thread ID to allocate.
19  * LOCK: sched_mutex
20  */
21 static StgThreadID next_thread_id = 1;
22
23 /* The smallest stack size that makes any sense is:
24  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
25  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
26  *  + 1                       (the closure to enter)
27  *  + 1                       (stg_ap_v_ret)
28  *  + 1                       (spare slot req'd by stg_ap_v_ret)
29  *
30  * A thread with this stack will bomb immediately with a stack
31  * overflow, which will increase its stack size.  
32  */
33 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
34
35 /* ---------------------------------------------------------------------------
36    Create a new thread.
37
38    The new thread starts with the given stack size.  Before the
39    scheduler can run, however, this thread needs to have a closure
40    (and possibly some arguments) pushed on its stack.  See
41    pushClosure() in Schedule.h.
42
43    createGenThread() and createIOThread() (in SchedAPI.h) are
44    convenient packaged versions of this function.
45
46    currently pri (priority) is only used in a GRAN setup -- HWL
47    ------------------------------------------------------------------------ */
48 StgTSO *
49 createThread(Capability *cap, nat size)
50 {
51     StgTSO *tso;
52     nat stack_size;
53
54     /* sched_mutex is *not* required */
55
56     /* First check whether we should create a thread at all */
57
58     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
59
60     /* catch ridiculously small stack sizes */
61     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
62         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
63     }
64
65     size = round_to_mblocks(size);
66     tso = (StgTSO *)allocateLocal(cap, size);
67
68     stack_size = size - TSO_STRUCT_SIZEW;
69     TICK_ALLOC_TSO(stack_size, 0);
70
71     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
72
73     // Always start with the compiled code evaluator
74     tso->what_next = ThreadRunGHC;
75
76     tso->why_blocked  = NotBlocked;
77     tso->blocked_exceptions = END_TSO_QUEUE;
78     tso->flags = TSO_DIRTY;
79     
80     tso->saved_errno = 0;
81     tso->bound = NULL;
82     tso->cap = cap;
83     
84     tso->stack_size     = stack_size;
85     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
86                           - TSO_STRUCT_SIZEW;
87     tso->sp             = (P_)&(tso->stack) + stack_size;
88
89     tso->trec = NO_TREC;
90     
91 #ifdef PROFILING
92     tso->prof.CCCS = CCS_MAIN;
93 #endif
94     
95   /* put a stop frame on the stack */
96     tso->sp -= sizeofW(StgStopFrame);
97     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
98     tso->_link = END_TSO_QUEUE;
99     
100     /* Link the new thread on the global thread list.
101      */
102     ACQUIRE_LOCK(&sched_mutex);
103     tso->id = next_thread_id++;  // while we have the mutex
104     tso->global_link = g0s0->threads;
105     g0s0->threads = tso;
106     RELEASE_LOCK(&sched_mutex);
107     
108     postEvent (cap, EVENT_CREATE_THREAD, tso->id, 0);
109
110     debugTrace(DEBUG_sched,
111                "created thread %ld, stack size = %lx words", 
112                (long)tso->id, (long)tso->stack_size);
113     return tso;
114 }
115
116 /* ---------------------------------------------------------------------------
117  * Comparing Thread ids.
118  *
119  * This is used from STG land in the implementation of the
120  * instances of Eq/Ord for ThreadIds.
121  * ------------------------------------------------------------------------ */
122
123 int
124 cmp_thread(StgPtr tso1, StgPtr tso2) 
125
126   StgThreadID id1 = ((StgTSO *)tso1)->id; 
127   StgThreadID id2 = ((StgTSO *)tso2)->id;
128  
129   if (id1 < id2) return (-1);
130   if (id1 > id2) return 1;
131   return 0;
132 }
133
134 /* ---------------------------------------------------------------------------
135  * Fetching the ThreadID from an StgTSO.
136  *
137  * This is used in the implementation of Show for ThreadIds.
138  * ------------------------------------------------------------------------ */
139 int
140 rts_getThreadId(StgPtr tso) 
141 {
142   return ((StgTSO *)tso)->id;
143 }
144
145 /* -----------------------------------------------------------------------------
146    Remove a thread from a queue.
147    Fails fatally if the TSO is not on the queue.
148    -------------------------------------------------------------------------- */
149
150 void
151 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
152 {
153     StgTSO *t, *prev;
154
155     prev = NULL;
156     for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
157         if (t == tso) {
158             if (prev) {
159                 setTSOLink(cap,prev,t->_link);
160             } else {
161                 *queue = t->_link;
162             }
163             return;
164         }
165     }
166     barf("removeThreadFromQueue: not found");
167 }
168
169 void
170 removeThreadFromDeQueue (Capability *cap, 
171                          StgTSO **head, StgTSO **tail, StgTSO *tso)
172 {
173     StgTSO *t, *prev;
174
175     prev = NULL;
176     for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
177         if (t == tso) {
178             if (prev) {
179                 setTSOLink(cap,prev,t->_link);
180             } else {
181                 *head = t->_link;
182             }
183             if (*tail == tso) {
184                 if (prev) {
185                     *tail = prev;
186                 } else {
187                     *tail = END_TSO_QUEUE;
188                 }
189             }
190             return;
191         }
192     }
193     barf("removeThreadFromMVarQueue: not found");
194 }
195
196 void
197 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
198 {
199     removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
200 }
201
202 /* ----------------------------------------------------------------------------
203    unblockOne()
204
205    unblock a single thread.
206    ------------------------------------------------------------------------- */
207
208 StgTSO *
209 unblockOne (Capability *cap, StgTSO *tso)
210 {
211     return unblockOne_(cap,tso,rtsTrue); // allow migration
212 }
213
214 StgTSO *
215 unblockOne_ (Capability *cap, StgTSO *tso, 
216              rtsBool allow_migrate USED_IF_THREADS)
217 {
218   StgTSO *next;
219
220   // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
221   ASSERT(tso->why_blocked != NotBlocked);
222
223   tso->why_blocked = NotBlocked;
224   next = tso->_link;
225   tso->_link = END_TSO_QUEUE;
226
227 #if defined(THREADED_RTS)
228   if (tso->cap == cap || (!tsoLocked(tso) && 
229                           allow_migrate && 
230                           RtsFlags.ParFlags.wakeupMigrate)) {
231       // We are waking up this thread on the current Capability, which
232       // might involve migrating it from the Capability it was last on.
233       if (tso->bound) {
234           ASSERT(tso->bound->cap == tso->cap);
235           tso->bound->cap = cap;
236       }
237
238       tso->cap = cap;
239       appendToRunQueue(cap,tso);
240
241       // context-switch soonish so we can migrate the new thread if
242       // necessary.  NB. not contextSwitchCapability(cap), which would
243       // force a context switch immediately.
244       cap->context_switch = 1;
245   } else {
246       // we'll try to wake it up on the Capability it was last on.
247       wakeupThreadOnCapability(cap, tso->cap, tso);
248   }
249 #else
250   appendToRunQueue(cap,tso);
251
252   // context-switch soonish so we can migrate the new thread if
253   // necessary.  NB. not contextSwitchCapability(cap), which would
254   // force a context switch immediately.
255   cap->context_switch = 1;
256 #endif
257
258   postEvent (cap, EVENT_THREAD_WAKEUP, tso->id, tso->cap->no);
259
260   debugTrace(DEBUG_sched, "waking up thread %ld on cap %d",
261              (long)tso->id, tso->cap->no);
262
263   return next;
264 }
265
266 /* ----------------------------------------------------------------------------
267    awakenBlockedQueue
268
269    wakes up all the threads on the specified queue.
270    ------------------------------------------------------------------------- */
271
272 void
273 awakenBlockedQueue(Capability *cap, StgTSO *tso)
274 {
275     while (tso != END_TSO_QUEUE) {
276         tso = unblockOne(cap,tso);
277     }
278 }
279
280 /* ---------------------------------------------------------------------------
281  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
282  * used by Control.Concurrent for error checking.
283  * ------------------------------------------------------------------------- */
284  
285 HsBool
286 rtsSupportsBoundThreads(void)
287 {
288 #if defined(THREADED_RTS)
289   return HS_BOOL_TRUE;
290 #else
291   return HS_BOOL_FALSE;
292 #endif
293 }
294
295 /* ---------------------------------------------------------------------------
296  * isThreadBound(tso): check whether tso is bound to an OS thread.
297  * ------------------------------------------------------------------------- */
298  
299 StgBool
300 isThreadBound(StgTSO* tso USED_IF_THREADS)
301 {
302 #if defined(THREADED_RTS)
303   return (tso->bound != NULL);
304 #endif
305   return rtsFalse;
306 }
307
308 /* ----------------------------------------------------------------------------
309  * Debugging: why is a thread blocked
310  * ------------------------------------------------------------------------- */
311
312 #if DEBUG
313 void
314 printThreadBlockage(StgTSO *tso)
315 {
316   switch (tso->why_blocked) {
317   case BlockedOnRead:
318     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
319     break;
320   case BlockedOnWrite:
321     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
322     break;
323 #if defined(mingw32_HOST_OS)
324     case BlockedOnDoProc:
325     debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
326     break;
327 #endif
328   case BlockedOnDelay:
329     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
330     break;
331   case BlockedOnMVar:
332     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
333     break;
334   case BlockedOnException:
335     debugBelch("is blocked on delivering an exception to thread %lu",
336                (unsigned long)tso->block_info.tso->id);
337     break;
338   case BlockedOnBlackHole:
339     debugBelch("is blocked on a black hole");
340     break;
341   case NotBlocked:
342     debugBelch("is not blocked");
343     break;
344   case BlockedOnCCall:
345     debugBelch("is blocked on an external call");
346     break;
347   case BlockedOnCCall_NoUnblockExc:
348     debugBelch("is blocked on an external call (exceptions were already blocked)");
349     break;
350   case BlockedOnSTM:
351     debugBelch("is blocked on an STM operation");
352     break;
353   default:
354     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
355          tso->why_blocked, tso->id, tso);
356   }
357 }
358
359 void
360 printThreadStatus(StgTSO *t)
361 {
362   debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
363     {
364       void *label = lookupThreadLabel(t->id);
365       if (label) debugBelch("[\"%s\"] ",(char *)label);
366     }
367     if (t->what_next == ThreadRelocated) {
368         debugBelch("has been relocated...\n");
369     } else {
370         switch (t->what_next) {
371         case ThreadKilled:
372             debugBelch("has been killed");
373             break;
374         case ThreadComplete:
375             debugBelch("has completed");
376             break;
377         default:
378             printThreadBlockage(t);
379         }
380         if (t->flags & TSO_DIRTY) {
381             debugBelch(" (TSO_DIRTY)");
382         } else if (t->flags & TSO_LINK_DIRTY) {
383             debugBelch(" (TSO_LINK_DIRTY)");
384         }
385         debugBelch("\n");
386     }
387 }
388
389 void
390 printAllThreads(void)
391 {
392   StgTSO *t, *next;
393   nat i, s;
394   Capability *cap;
395
396 # if defined(GRAN)
397   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
398   ullong_format_string(TIME_ON_PROC(CurrentProc), 
399                        time_string, rtsFalse/*no commas!*/);
400
401   debugBelch("all threads at [%s]:\n", time_string);
402 # elif defined(PARALLEL_HASKELL)
403   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
404   ullong_format_string(CURRENT_TIME,
405                        time_string, rtsFalse/*no commas!*/);
406
407   debugBelch("all threads at [%s]:\n", time_string);
408 # else
409   debugBelch("all threads:\n");
410 # endif
411
412   for (i = 0; i < n_capabilities; i++) {
413       cap = &capabilities[i];
414       debugBelch("threads on capability %d:\n", cap->no);
415       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
416           printThreadStatus(t);
417       }
418   }
419
420   debugBelch("other threads:\n");
421   for (s = 0; s < total_steps; s++) {
422     for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
423       if (t->why_blocked != NotBlocked) {
424           printThreadStatus(t);
425       }
426       if (t->what_next == ThreadRelocated) {
427           next = t->_link;
428       } else {
429           next = t->global_link;
430       }
431     }
432   }
433 }
434
435 // useful from gdb
436 void 
437 printThreadQueue(StgTSO *t)
438 {
439     nat i = 0;
440     for (; t != END_TSO_QUEUE; t = t->_link) {
441         printThreadStatus(t);
442         i++;
443     }
444     debugBelch("%d threads on queue\n", i);
445 }
446
447 #endif /* DEBUG */