Fix #3429: a tricky race condition
[ghc-hetmet.git] / rts / Threads.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2006
4  *
5  * Thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "Threads.h"
13 #include "STM.h"
14 #include "Schedule.h"
15 #include "Trace.h"
16 #include "ThreadLabels.h"
17
18 /* Next thread ID to allocate.
19  * LOCK: sched_mutex
20  */
21 static StgThreadID next_thread_id = 1;
22
23 /* The smallest stack size that makes any sense is:
24  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
25  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
26  *  + 1                       (the closure to enter)
27  *  + 1                       (stg_ap_v_ret)
28  *  + 1                       (spare slot req'd by stg_ap_v_ret)
29  *
30  * A thread with this stack will bomb immediately with a stack
31  * overflow, which will increase its stack size.  
32  */
33 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
34
35 /* ---------------------------------------------------------------------------
36    Create a new thread.
37
38    The new thread starts with the given stack size.  Before the
39    scheduler can run, however, this thread needs to have a closure
40    (and possibly some arguments) pushed on its stack.  See
41    pushClosure() in Schedule.h.
42
43    createGenThread() and createIOThread() (in SchedAPI.h) are
44    convenient packaged versions of this function.
45
46    currently pri (priority) is only used in a GRAN setup -- HWL
47    ------------------------------------------------------------------------ */
48 StgTSO *
49 createThread(Capability *cap, nat size)
50 {
51     StgTSO *tso;
52     nat stack_size;
53
54     /* sched_mutex is *not* required */
55
56     /* First check whether we should create a thread at all */
57
58     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
59
60     /* catch ridiculously small stack sizes */
61     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
62         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
63     }
64
65     size = round_to_mblocks(size);
66     tso = (StgTSO *)allocateLocal(cap, size);
67
68     stack_size = size - TSO_STRUCT_SIZEW;
69     TICK_ALLOC_TSO(stack_size, 0);
70
71     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
72
73     // Always start with the compiled code evaluator
74     tso->what_next = ThreadRunGHC;
75
76     tso->why_blocked  = NotBlocked;
77     tso->blocked_exceptions = END_TSO_QUEUE;
78     tso->flags = 0;
79     tso->dirty = 1;
80     
81     tso->saved_errno = 0;
82     tso->bound = NULL;
83     tso->cap = cap;
84     
85     tso->stack_size     = stack_size;
86     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
87                           - TSO_STRUCT_SIZEW;
88     tso->sp             = (P_)&(tso->stack) + stack_size;
89
90     tso->trec = NO_TREC;
91     
92 #ifdef PROFILING
93     tso->prof.CCCS = CCS_MAIN;
94 #endif
95     
96   /* put a stop frame on the stack */
97     tso->sp -= sizeofW(StgStopFrame);
98     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
99     tso->_link = END_TSO_QUEUE;
100     
101     /* Link the new thread on the global thread list.
102      */
103     ACQUIRE_LOCK(&sched_mutex);
104     tso->id = next_thread_id++;  // while we have the mutex
105     tso->global_link = g0s0->threads;
106     g0s0->threads = tso;
107     RELEASE_LOCK(&sched_mutex);
108     
109     postEvent (cap, EVENT_CREATE_THREAD, tso->id, 0);
110
111     debugTrace(DEBUG_sched,
112                "created thread %ld, stack size = %lx words", 
113                (long)tso->id, (long)tso->stack_size);
114     return tso;
115 }
116
117 /* ---------------------------------------------------------------------------
118  * Comparing Thread ids.
119  *
120  * This is used from STG land in the implementation of the
121  * instances of Eq/Ord for ThreadIds.
122  * ------------------------------------------------------------------------ */
123
124 int
125 cmp_thread(StgPtr tso1, StgPtr tso2) 
126
127   StgThreadID id1 = ((StgTSO *)tso1)->id; 
128   StgThreadID id2 = ((StgTSO *)tso2)->id;
129  
130   if (id1 < id2) return (-1);
131   if (id1 > id2) return 1;
132   return 0;
133 }
134
135 /* ---------------------------------------------------------------------------
136  * Fetching the ThreadID from an StgTSO.
137  *
138  * This is used in the implementation of Show for ThreadIds.
139  * ------------------------------------------------------------------------ */
140 int
141 rts_getThreadId(StgPtr tso) 
142 {
143   return ((StgTSO *)tso)->id;
144 }
145
146 /* -----------------------------------------------------------------------------
147    Remove a thread from a queue.
148    Fails fatally if the TSO is not on the queue.
149    -------------------------------------------------------------------------- */
150
151 void
152 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
153 {
154     StgTSO *t, *prev;
155
156     prev = NULL;
157     for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
158         if (t == tso) {
159             if (prev) {
160                 setTSOLink(cap,prev,t->_link);
161             } else {
162                 *queue = t->_link;
163             }
164             return;
165         }
166     }
167     barf("removeThreadFromQueue: not found");
168 }
169
170 void
171 removeThreadFromDeQueue (Capability *cap, 
172                          StgTSO **head, StgTSO **tail, StgTSO *tso)
173 {
174     StgTSO *t, *prev;
175
176     prev = NULL;
177     for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
178         if (t == tso) {
179             if (prev) {
180                 setTSOLink(cap,prev,t->_link);
181             } else {
182                 *head = t->_link;
183             }
184             if (*tail == tso) {
185                 if (prev) {
186                     *tail = prev;
187                 } else {
188                     *tail = END_TSO_QUEUE;
189                 }
190             }
191             return;
192         }
193     }
194     barf("removeThreadFromMVarQueue: not found");
195 }
196
197 void
198 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
199 {
200     removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
201 }
202
203 /* ----------------------------------------------------------------------------
204    unblockOne()
205
206    unblock a single thread.
207    ------------------------------------------------------------------------- */
208
209 StgTSO *
210 unblockOne (Capability *cap, StgTSO *tso)
211 {
212     return unblockOne_(cap,tso,rtsTrue); // allow migration
213 }
214
215 StgTSO *
216 unblockOne_ (Capability *cap, StgTSO *tso, 
217              rtsBool allow_migrate USED_IF_THREADS)
218 {
219   StgTSO *next;
220
221   // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
222   ASSERT(tso->why_blocked != NotBlocked);
223
224   tso->why_blocked = NotBlocked;
225   next = tso->_link;
226   tso->_link = END_TSO_QUEUE;
227
228 #if defined(THREADED_RTS)
229   if (tso->cap == cap || (!tsoLocked(tso) && 
230                           allow_migrate && 
231                           RtsFlags.ParFlags.wakeupMigrate)) {
232       // We are waking up this thread on the current Capability, which
233       // might involve migrating it from the Capability it was last on.
234       if (tso->bound) {
235           ASSERT(tso->bound->cap == tso->cap);
236           tso->bound->cap = cap;
237       }
238
239       tso->cap = cap;
240       appendToRunQueue(cap,tso);
241
242       // context-switch soonish so we can migrate the new thread if
243       // necessary.  NB. not contextSwitchCapability(cap), which would
244       // force a context switch immediately.
245       cap->context_switch = 1;
246   } else {
247       // we'll try to wake it up on the Capability it was last on.
248       wakeupThreadOnCapability(cap, tso->cap, tso);
249   }
250 #else
251   appendToRunQueue(cap,tso);
252
253   // context-switch soonish so we can migrate the new thread if
254   // necessary.  NB. not contextSwitchCapability(cap), which would
255   // force a context switch immediately.
256   cap->context_switch = 1;
257 #endif
258
259   postEvent (cap, EVENT_THREAD_WAKEUP, tso->id, tso->cap->no);
260
261   debugTrace(DEBUG_sched, "waking up thread %ld on cap %d",
262              (long)tso->id, tso->cap->no);
263
264   return next;
265 }
266
267 /* ----------------------------------------------------------------------------
268    awakenBlockedQueue
269
270    wakes up all the threads on the specified queue.
271    ------------------------------------------------------------------------- */
272
273 void
274 awakenBlockedQueue(Capability *cap, StgTSO *tso)
275 {
276     while (tso != END_TSO_QUEUE) {
277         tso = unblockOne(cap,tso);
278     }
279 }
280
281 /* ---------------------------------------------------------------------------
282  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
283  * used by Control.Concurrent for error checking.
284  * ------------------------------------------------------------------------- */
285  
286 HsBool
287 rtsSupportsBoundThreads(void)
288 {
289 #if defined(THREADED_RTS)
290   return HS_BOOL_TRUE;
291 #else
292   return HS_BOOL_FALSE;
293 #endif
294 }
295
296 /* ---------------------------------------------------------------------------
297  * isThreadBound(tso): check whether tso is bound to an OS thread.
298  * ------------------------------------------------------------------------- */
299  
300 StgBool
301 isThreadBound(StgTSO* tso USED_IF_THREADS)
302 {
303 #if defined(THREADED_RTS)
304   return (tso->bound != NULL);
305 #endif
306   return rtsFalse;
307 }
308
309 /* ----------------------------------------------------------------------------
310  * Debugging: why is a thread blocked
311  * ------------------------------------------------------------------------- */
312
313 #if DEBUG
314 void
315 printThreadBlockage(StgTSO *tso)
316 {
317   switch (tso->why_blocked) {
318   case BlockedOnRead:
319     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
320     break;
321   case BlockedOnWrite:
322     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
323     break;
324 #if defined(mingw32_HOST_OS)
325     case BlockedOnDoProc:
326     debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
327     break;
328 #endif
329   case BlockedOnDelay:
330     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
331     break;
332   case BlockedOnMVar:
333     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
334     break;
335   case BlockedOnException:
336     debugBelch("is blocked on delivering an exception to thread %lu",
337                (unsigned long)tso->block_info.tso->id);
338     break;
339   case BlockedOnBlackHole:
340     debugBelch("is blocked on a black hole");
341     break;
342   case NotBlocked:
343     debugBelch("is not blocked");
344     break;
345   case BlockedOnCCall:
346     debugBelch("is blocked on an external call");
347     break;
348   case BlockedOnCCall_NoUnblockExc:
349     debugBelch("is blocked on an external call (exceptions were already blocked)");
350     break;
351   case BlockedOnSTM:
352     debugBelch("is blocked on an STM operation");
353     break;
354   default:
355     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
356          tso->why_blocked, tso->id, tso);
357   }
358 }
359
360 void
361 printThreadStatus(StgTSO *t)
362 {
363   debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
364     {
365       void *label = lookupThreadLabel(t->id);
366       if (label) debugBelch("[\"%s\"] ",(char *)label);
367     }
368     if (t->what_next == ThreadRelocated) {
369         debugBelch("has been relocated...\n");
370     } else {
371         switch (t->what_next) {
372         case ThreadKilled:
373             debugBelch("has been killed");
374             break;
375         case ThreadComplete:
376             debugBelch("has completed");
377             break;
378         default:
379             printThreadBlockage(t);
380         }
381         if (t->dirty) {
382             debugBelch(" (TSO_DIRTY)");
383         } else if (t->flags & TSO_LINK_DIRTY) {
384             debugBelch(" (TSO_LINK_DIRTY)");
385         }
386         debugBelch("\n");
387     }
388 }
389
390 void
391 printAllThreads(void)
392 {
393   StgTSO *t, *next;
394   nat i, s;
395   Capability *cap;
396
397 # if defined(GRAN)
398   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
399   ullong_format_string(TIME_ON_PROC(CurrentProc), 
400                        time_string, rtsFalse/*no commas!*/);
401
402   debugBelch("all threads at [%s]:\n", time_string);
403 # elif defined(PARALLEL_HASKELL)
404   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
405   ullong_format_string(CURRENT_TIME,
406                        time_string, rtsFalse/*no commas!*/);
407
408   debugBelch("all threads at [%s]:\n", time_string);
409 # else
410   debugBelch("all threads:\n");
411 # endif
412
413   for (i = 0; i < n_capabilities; i++) {
414       cap = &capabilities[i];
415       debugBelch("threads on capability %d:\n", cap->no);
416       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
417           printThreadStatus(t);
418       }
419   }
420
421   debugBelch("other threads:\n");
422   for (s = 0; s < total_steps; s++) {
423     for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
424       if (t->why_blocked != NotBlocked) {
425           printThreadStatus(t);
426       }
427       if (t->what_next == ThreadRelocated) {
428           next = t->_link;
429       } else {
430           next = t->global_link;
431       }
432     }
433   }
434 }
435
436 // useful from gdb
437 void 
438 printThreadQueue(StgTSO *t)
439 {
440     nat i = 0;
441     for (; t != END_TSO_QUEUE; t = t->_link) {
442         printThreadStatus(t);
443         i++;
444     }
445     debugBelch("%d threads on queue\n", i);
446 }
447
448 #endif /* DEBUG */