Use message-passing to implement throwTo in the RTS
[ghc-hetmet.git] / rts / Threads.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2006
4  *
5  * Thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "Threads.h"
13 #include "STM.h"
14 #include "Schedule.h"
15 #include "Trace.h"
16 #include "ThreadLabels.h"
17
18 /* Next thread ID to allocate.
19  * LOCK: sched_mutex
20  */
21 static StgThreadID next_thread_id = 1;
22
23 /* The smallest stack size that makes any sense is:
24  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
25  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
26  *  + 1                       (the closure to enter)
27  *  + 1                       (stg_ap_v_ret)
28  *  + 1                       (spare slot req'd by stg_ap_v_ret)
29  *
30  * A thread with this stack will bomb immediately with a stack
31  * overflow, which will increase its stack size.  
32  */
33 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
34
35 /* ---------------------------------------------------------------------------
36    Create a new thread.
37
38    The new thread starts with the given stack size.  Before the
39    scheduler can run, however, this thread needs to have a closure
40    (and possibly some arguments) pushed on its stack.  See
41    pushClosure() in Schedule.h.
42
43    createGenThread() and createIOThread() (in SchedAPI.h) are
44    convenient packaged versions of this function.
45
46    currently pri (priority) is only used in a GRAN setup -- HWL
47    ------------------------------------------------------------------------ */
48 StgTSO *
49 createThread(Capability *cap, nat size)
50 {
51     StgTSO *tso;
52     nat stack_size;
53
54     /* sched_mutex is *not* required */
55
56     /* First check whether we should create a thread at all */
57
58     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
59
60     /* catch ridiculously small stack sizes */
61     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
62         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
63     }
64
65     size = round_to_mblocks(size);
66     tso = (StgTSO *)allocate(cap, size);
67
68     stack_size = size - TSO_STRUCT_SIZEW;
69     TICK_ALLOC_TSO(stack_size, 0);
70
71     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
72
73     // Always start with the compiled code evaluator
74     tso->what_next = ThreadRunGHC;
75
76     tso->why_blocked  = NotBlocked;
77     tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
78     tso->flags = 0;
79     tso->dirty = 1;
80     
81     tso->saved_errno = 0;
82     tso->bound = NULL;
83     tso->cap = cap;
84     
85     tso->stack_size     = stack_size;
86     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
87                           - TSO_STRUCT_SIZEW;
88     tso->sp             = (P_)&(tso->stack) + stack_size;
89
90     tso->trec = NO_TREC;
91     
92 #ifdef PROFILING
93     tso->prof.CCCS = CCS_MAIN;
94 #endif
95     
96   /* put a stop frame on the stack */
97     tso->sp -= sizeofW(StgStopFrame);
98     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
99     tso->_link = END_TSO_QUEUE;
100     
101     /* Link the new thread on the global thread list.
102      */
103     ACQUIRE_LOCK(&sched_mutex);
104     tso->id = next_thread_id++;  // while we have the mutex
105     tso->global_link = g0->threads;
106     g0->threads = tso;
107     RELEASE_LOCK(&sched_mutex);
108     
109     // ToDo: report the stack size in the event?
110     traceEventCreateThread(cap, tso);
111
112     return tso;
113 }
114
115 /* ---------------------------------------------------------------------------
116  * Comparing Thread ids.
117  *
118  * This is used from STG land in the implementation of the
119  * instances of Eq/Ord for ThreadIds.
120  * ------------------------------------------------------------------------ */
121
122 int
123 cmp_thread(StgPtr tso1, StgPtr tso2) 
124
125   StgThreadID id1 = ((StgTSO *)tso1)->id; 
126   StgThreadID id2 = ((StgTSO *)tso2)->id;
127  
128   if (id1 < id2) return (-1);
129   if (id1 > id2) return 1;
130   return 0;
131 }
132
133 /* ---------------------------------------------------------------------------
134  * Fetching the ThreadID from an StgTSO.
135  *
136  * This is used in the implementation of Show for ThreadIds.
137  * ------------------------------------------------------------------------ */
138 int
139 rts_getThreadId(StgPtr tso) 
140 {
141   return ((StgTSO *)tso)->id;
142 }
143
144 /* -----------------------------------------------------------------------------
145    Remove a thread from a queue.
146    Fails fatally if the TSO is not on the queue.
147    -------------------------------------------------------------------------- */
148
149 void
150 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
151 {
152     StgTSO *t, *prev;
153
154     prev = NULL;
155     for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
156         if (t == tso) {
157             if (prev) {
158                 setTSOLink(cap,prev,t->_link);
159             } else {
160                 *queue = t->_link;
161             }
162             return;
163         }
164     }
165     barf("removeThreadFromQueue: not found");
166 }
167
168 void
169 removeThreadFromDeQueue (Capability *cap, 
170                          StgTSO **head, StgTSO **tail, StgTSO *tso)
171 {
172     StgTSO *t, *prev;
173
174     prev = NULL;
175     for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
176         if (t == tso) {
177             if (prev) {
178                 setTSOLink(cap,prev,t->_link);
179             } else {
180                 *head = t->_link;
181             }
182             if (*tail == tso) {
183                 if (prev) {
184                     *tail = prev;
185                 } else {
186                     *tail = END_TSO_QUEUE;
187                 }
188             }
189             return;
190         }
191     }
192     barf("removeThreadFromMVarQueue: not found");
193 }
194
195 void
196 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
197 {
198     removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
199 }
200
201 /* ----------------------------------------------------------------------------
202    unblockOne()
203
204    unblock a single thread.
205    ------------------------------------------------------------------------- */
206
207 StgTSO *
208 unblockOne (Capability *cap, StgTSO *tso)
209 {
210     return unblockOne_(cap,tso,rtsTrue); // allow migration
211 }
212
213 StgTSO *
214 unblockOne_ (Capability *cap, StgTSO *tso, 
215              rtsBool allow_migrate USED_IF_THREADS)
216 {
217   StgTSO *next;
218
219   // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
220   ASSERT(tso->why_blocked != NotBlocked);
221   ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
222          tso->block_info.closure->header.info == &stg_IND_info);
223
224   next = tso->_link;
225   tso->_link = END_TSO_QUEUE;
226
227 #if defined(THREADED_RTS)
228   if (tso->cap == cap || (!tsoLocked(tso) && 
229                           allow_migrate && 
230                           RtsFlags.ParFlags.wakeupMigrate)) {
231       // We are waking up this thread on the current Capability, which
232       // might involve migrating it from the Capability it was last on.
233       if (tso->bound) {
234           ASSERT(tso->bound->task->cap == tso->cap);
235           tso->bound->task->cap = cap;
236       }
237
238       tso->cap = cap;
239       write_barrier();
240       tso->why_blocked = NotBlocked;
241       appendToRunQueue(cap,tso);
242
243       // context-switch soonish so we can migrate the new thread if
244       // necessary.  NB. not contextSwitchCapability(cap), which would
245       // force a context switch immediately.
246       cap->context_switch = 1;
247   } else {
248       // we'll try to wake it up on the Capability it was last on.
249       wakeupThreadOnCapability(cap, tso->cap, tso);
250   }
251 #else
252   tso->why_blocked = NotBlocked;
253   appendToRunQueue(cap,tso);
254
255   // context-switch soonish so we can migrate the new thread if
256   // necessary.  NB. not contextSwitchCapability(cap), which would
257   // force a context switch immediately.
258   cap->context_switch = 1;
259 #endif
260
261   traceEventThreadWakeup (cap, tso, tso->cap->no);
262
263   return next;
264 }
265
266 /* ----------------------------------------------------------------------------
267    awakenBlockedQueue
268
269    wakes up all the threads on the specified queue.
270    ------------------------------------------------------------------------- */
271
272 void
273 awakenBlockedQueue(Capability *cap, StgTSO *tso)
274 {
275     while (tso != END_TSO_QUEUE) {
276         tso = unblockOne(cap,tso);
277     }
278 }
279
280 /* ---------------------------------------------------------------------------
281  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
282  * used by Control.Concurrent for error checking.
283  * ------------------------------------------------------------------------- */
284  
285 HsBool
286 rtsSupportsBoundThreads(void)
287 {
288 #if defined(THREADED_RTS)
289   return HS_BOOL_TRUE;
290 #else
291   return HS_BOOL_FALSE;
292 #endif
293 }
294
295 /* ---------------------------------------------------------------------------
296  * isThreadBound(tso): check whether tso is bound to an OS thread.
297  * ------------------------------------------------------------------------- */
298  
299 StgBool
300 isThreadBound(StgTSO* tso USED_IF_THREADS)
301 {
302 #if defined(THREADED_RTS)
303   return (tso->bound != NULL);
304 #endif
305   return rtsFalse;
306 }
307
308 /* ----------------------------------------------------------------------------
309  * Debugging: why is a thread blocked
310  * ------------------------------------------------------------------------- */
311
312 #if DEBUG
313 void
314 printThreadBlockage(StgTSO *tso)
315 {
316   switch (tso->why_blocked) {
317   case BlockedOnRead:
318     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
319     break;
320   case BlockedOnWrite:
321     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
322     break;
323 #if defined(mingw32_HOST_OS)
324     case BlockedOnDoProc:
325     debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
326     break;
327 #endif
328   case BlockedOnDelay:
329     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
330     break;
331   case BlockedOnMVar:
332     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
333     break;
334   case BlockedOnBlackHole:
335     debugBelch("is blocked on a black hole");
336     break;
337   case BlockedOnMsgWakeup:
338     debugBelch("is blocked on a wakeup message");
339     break;
340   case BlockedOnMsgThrowTo:
341     debugBelch("is blocked on a throwto message");
342     break;
343   case NotBlocked:
344     debugBelch("is not blocked");
345     break;
346   case BlockedOnCCall:
347     debugBelch("is blocked on an external call");
348     break;
349   case BlockedOnCCall_NoUnblockExc:
350     debugBelch("is blocked on an external call (exceptions were already blocked)");
351     break;
352   case BlockedOnSTM:
353     debugBelch("is blocked on an STM operation");
354     break;
355   default:
356     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
357          tso->why_blocked, tso->id, tso);
358   }
359 }
360
361
362 void
363 printThreadStatus(StgTSO *t)
364 {
365   debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
366     {
367       void *label = lookupThreadLabel(t->id);
368       if (label) debugBelch("[\"%s\"] ",(char *)label);
369     }
370     if (t->what_next == ThreadRelocated) {
371         debugBelch("has been relocated...\n");
372     } else {
373         switch (t->what_next) {
374         case ThreadKilled:
375             debugBelch("has been killed");
376             break;
377         case ThreadComplete:
378             debugBelch("has completed");
379             break;
380         default:
381             printThreadBlockage(t);
382         }
383         if (t->dirty) {
384             debugBelch(" (TSO_DIRTY)");
385         } else if (t->flags & TSO_LINK_DIRTY) {
386             debugBelch(" (TSO_LINK_DIRTY)");
387         }
388         debugBelch("\n");
389     }
390 }
391
392 void
393 printAllThreads(void)
394 {
395   StgTSO *t, *next;
396   nat i, g;
397   Capability *cap;
398
399   debugBelch("all threads:\n");
400
401   for (i = 0; i < n_capabilities; i++) {
402       cap = &capabilities[i];
403       debugBelch("threads on capability %d:\n", cap->no);
404       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
405           printThreadStatus(t);
406       }
407   }
408
409   debugBelch("other threads:\n");
410   for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
411     for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
412       if (t->why_blocked != NotBlocked) {
413           printThreadStatus(t);
414       }
415       if (t->what_next == ThreadRelocated) {
416           next = t->_link;
417       } else {
418           next = t->global_link;
419       }
420     }
421   }
422 }
423
424 // useful from gdb
425 void 
426 printThreadQueue(StgTSO *t)
427 {
428     nat i = 0;
429     for (; t != END_TSO_QUEUE; t = t->_link) {
430         printThreadStatus(t);
431         i++;
432     }
433     debugBelch("%d threads on queue\n", i);
434 }
435
436 #endif /* DEBUG */