5723eace8ce589693851d6f2ae6c98cfc391a2e5
[ghc-hetmet.git] / rts / Threads.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2006
4  *
5  * Thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "Capability.h"
13 #include "Updates.h"
14 #include "Threads.h"
15 #include "STM.h"
16 #include "Schedule.h"
17 #include "Trace.h"
18 #include "ThreadLabels.h"
19 #include "Updates.h"
20 #include "Messages.h"
21 #include "sm/Storage.h"
22
23 /* Next thread ID to allocate.
24  * LOCK: sched_mutex
25  */
26 static StgThreadID next_thread_id = 1;
27
28 /* The smallest stack size that makes any sense is:
29  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
30  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
31  *  + 1                       (the closure to enter)
32  *  + 1                       (stg_ap_v_ret)
33  *  + 1                       (spare slot req'd by stg_ap_v_ret)
34  *
35  * A thread with this stack will bomb immediately with a stack
36  * overflow, which will increase its stack size.  
37  */
38 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
39
40 /* ---------------------------------------------------------------------------
41    Create a new thread.
42
43    The new thread starts with the given stack size.  Before the
44    scheduler can run, however, this thread needs to have a closure
45    (and possibly some arguments) pushed on its stack.  See
46    pushClosure() in Schedule.h.
47
48    createGenThread() and createIOThread() (in SchedAPI.h) are
49    convenient packaged versions of this function.
50
51    currently pri (priority) is only used in a GRAN setup -- HWL
52    ------------------------------------------------------------------------ */
53 StgTSO *
54 createThread(Capability *cap, nat size)
55 {
56     StgTSO *tso;
57     nat stack_size;
58
59     /* sched_mutex is *not* required */
60
61     /* First check whether we should create a thread at all */
62
63     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
64
65     /* catch ridiculously small stack sizes */
66     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
67         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
68     }
69
70     size = round_to_mblocks(size);
71     tso = (StgTSO *)allocate(cap, size);
72
73     stack_size = size - TSO_STRUCT_SIZEW;
74     TICK_ALLOC_TSO(stack_size, 0);
75
76     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
77
78     // Always start with the compiled code evaluator
79     tso->what_next = ThreadRunGHC;
80
81     tso->why_blocked  = NotBlocked;
82     tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
83     tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
84     tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
85     tso->flags = 0;
86     tso->dirty = 1;
87     
88     tso->saved_errno = 0;
89     tso->bound = NULL;
90     tso->cap = cap;
91     
92     tso->stack_size     = stack_size;
93     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
94                           - TSO_STRUCT_SIZEW;
95     tso->sp             = (P_)&(tso->stack) + stack_size;
96
97     tso->trec = NO_TREC;
98     
99 #ifdef PROFILING
100     tso->prof.CCCS = CCS_MAIN;
101 #endif
102     
103   /* put a stop frame on the stack */
104     tso->sp -= sizeofW(StgStopFrame);
105     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
106     tso->_link = END_TSO_QUEUE;
107     
108     /* Link the new thread on the global thread list.
109      */
110     ACQUIRE_LOCK(&sched_mutex);
111     tso->id = next_thread_id++;  // while we have the mutex
112     tso->global_link = g0->threads;
113     g0->threads = tso;
114     RELEASE_LOCK(&sched_mutex);
115     
116     // ToDo: report the stack size in the event?
117     traceEventCreateThread(cap, tso);
118
119     return tso;
120 }
121
122 /* ---------------------------------------------------------------------------
123  * Comparing Thread ids.
124  *
125  * This is used from STG land in the implementation of the
126  * instances of Eq/Ord for ThreadIds.
127  * ------------------------------------------------------------------------ */
128
129 int
130 cmp_thread(StgPtr tso1, StgPtr tso2) 
131
132   StgThreadID id1 = ((StgTSO *)tso1)->id; 
133   StgThreadID id2 = ((StgTSO *)tso2)->id;
134  
135   if (id1 < id2) return (-1);
136   if (id1 > id2) return 1;
137   return 0;
138 }
139
140 /* ---------------------------------------------------------------------------
141  * Fetching the ThreadID from an StgTSO.
142  *
143  * This is used in the implementation of Show for ThreadIds.
144  * ------------------------------------------------------------------------ */
145 int
146 rts_getThreadId(StgPtr tso) 
147 {
148   return ((StgTSO *)tso)->id;
149 }
150
151 /* -----------------------------------------------------------------------------
152    Remove a thread from a queue.
153    Fails fatally if the TSO is not on the queue.
154    -------------------------------------------------------------------------- */
155
156 rtsBool // returns True if we modified queue
157 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
158 {
159     StgTSO *t, *prev;
160
161     prev = NULL;
162     for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
163         if (t == tso) {
164             if (prev) {
165                 setTSOLink(cap,prev,t->_link);
166                 return rtsFalse;
167             } else {
168                 *queue = t->_link;
169                 return rtsTrue;
170             }
171         }
172     }
173     barf("removeThreadFromQueue: not found");
174 }
175
176 rtsBool // returns True if we modified head or tail
177 removeThreadFromDeQueue (Capability *cap, 
178                          StgTSO **head, StgTSO **tail, StgTSO *tso)
179 {
180     StgTSO *t, *prev;
181     rtsBool flag = rtsFalse;
182
183     prev = NULL;
184     for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
185         if (t == tso) {
186             if (prev) {
187                 setTSOLink(cap,prev,t->_link);
188                 flag = rtsFalse;
189             } else {
190                 *head = t->_link;
191                 flag = rtsTrue;
192             }
193             if (*tail == tso) {
194                 if (prev) {
195                     *tail = prev;
196                 } else {
197                     *tail = END_TSO_QUEUE;
198                 }
199                 return rtsTrue;
200             } else {
201                 return flag;
202             }
203         }
204     }
205     barf("removeThreadFromMVarQueue: not found");
206 }
207
208 /* ----------------------------------------------------------------------------
209    tryWakeupThread()
210
211    Attempt to wake up a thread.  tryWakeupThread is idempotent: it is
212    always safe to call it too many times, but it is not safe in
213    general to omit a call.
214
215    ------------------------------------------------------------------------- */
216
217 void
218 tryWakeupThread (Capability *cap, StgTSO *tso_)
219 {
220     StgTSO *tso = deRefTSO(tso_);
221
222     traceEventThreadWakeup (cap, tso, tso->cap->no);
223
224 #ifdef THREADED_RTS
225     if (tso->cap != cap)
226     {
227         MessageWakeup *msg;
228         msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
229         SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
230         msg->tso = tso;
231         sendMessage(cap, tso->cap, (Message*)msg);
232         debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
233                       (lnat)tso->id, tso->cap->no);
234         return;
235     }
236 #endif
237
238     switch (tso->why_blocked)
239     {
240     case BlockedOnMVar:
241     {
242         if (tso->_link == END_TSO_QUEUE) {
243             tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
244             goto unblock;
245         } else {
246             return;
247         }
248     }
249
250     case BlockedOnMsgThrowTo:
251     {
252         const StgInfoTable *i;
253         
254         i = lockClosure(tso->block_info.closure);
255         unlockClosure(tso->block_info.closure, i);
256         if (i != &stg_MSG_NULL_info) {
257             debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
258                           (lnat)tso->id, tso->block_info.throwto->header.info);
259             return;
260         }
261
262         // remove the block frame from the stack
263         ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
264         tso->sp += 3;
265         goto unblock;
266     }
267
268     case BlockedOnBlackHole:
269     case BlockedOnSTM:
270     case ThreadMigrating:
271         goto unblock;
272
273     default:
274         // otherwise, do nothing
275         return;
276     }
277
278 unblock:
279     // just run the thread now, if the BH is not really available,
280     // we'll block again.
281     tso->why_blocked = NotBlocked;
282     appendToRunQueue(cap,tso);
283 }
284
285 /* ----------------------------------------------------------------------------
286    migrateThread
287    ------------------------------------------------------------------------- */
288
289 void
290 migrateThread (Capability *from, StgTSO *tso, Capability *to)
291 {
292     traceEventMigrateThread (from, tso, to->no);
293     // ThreadMigrating tells the target cap that it needs to be added to
294     // the run queue when it receives the MSG_TRY_WAKEUP.
295     tso->why_blocked = ThreadMigrating;
296     tso->cap = to;
297     tryWakeupThread(from, tso);
298 }
299
300 /* ----------------------------------------------------------------------------
301    awakenBlockedQueue
302
303    wakes up all the threads on the specified queue.
304    ------------------------------------------------------------------------- */
305
306 void
307 wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
308 {
309     MessageBlackHole *msg;
310     const StgInfoTable *i;
311
312     ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info  ||
313            bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info  );
314
315     for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE; 
316          msg = msg->link) {
317         i = msg->header.info;
318         if (i != &stg_IND_info) {
319             ASSERT(i == &stg_MSG_BLACKHOLE_info);
320             tryWakeupThread(cap,msg->tso);
321         }
322     }
323
324     // overwrite the BQ with an indirection so it will be
325     // collected at the next GC.
326 #if defined(DEBUG) && !defined(THREADED_RTS)
327     // XXX FILL_SLOP, but not if THREADED_RTS because in that case
328     // another thread might be looking at this BLOCKING_QUEUE and
329     // checking the owner field at the same time.
330     bq->bh = 0; bq->queue = 0; bq->owner = 0;
331 #endif
332     OVERWRITE_INFO(bq, &stg_IND_info);
333 }
334
335 // If we update a closure that we know we BLACKHOLE'd, and the closure
336 // no longer points to the current TSO as its owner, then there may be
337 // an orphaned BLOCKING_QUEUE closure with blocked threads attached to
338 // it.  We therefore traverse the BLOCKING_QUEUEs attached to the
339 // current TSO to see if any can now be woken up.
340 void
341 checkBlockingQueues (Capability *cap, StgTSO *tso)
342 {
343     StgBlockingQueue *bq, *next;
344     StgClosure *p;
345
346     debugTraceCap(DEBUG_sched, cap,
347                   "collision occurred; checking blocking queues for thread %ld",
348                   (lnat)tso->id);
349     
350     for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
351         next = bq->link;
352
353         if (bq->header.info == &stg_IND_info) {
354             // ToDo: could short it out right here, to avoid
355             // traversing this IND multiple times.
356             continue;
357         }
358         
359         p = bq->bh;
360
361         if (p->header.info != &stg_BLACKHOLE_info ||
362             ((StgInd *)p)->indirectee != (StgClosure*)bq)
363         {
364             wakeBlockingQueue(cap,bq);
365         }   
366     }
367 }
368
369 /* ----------------------------------------------------------------------------
370    updateThunk
371
372    Update a thunk with a value.  In order to do this, we need to know
373    which TSO owns (or is evaluating) the thunk, in case we need to
374    awaken any threads that are blocked on it.
375    ------------------------------------------------------------------------- */
376
377 void
378 updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
379 {
380     StgClosure *v;
381     StgTSO *owner;
382     const StgInfoTable *i;
383
384     i = thunk->header.info;
385     if (i != &stg_BLACKHOLE_info &&
386         i != &stg_CAF_BLACKHOLE_info &&
387         i != &stg_WHITEHOLE_info) {
388         updateWithIndirection(cap, thunk, val);
389         return;
390     }
391     
392     v = ((StgInd*)thunk)->indirectee;
393
394     updateWithIndirection(cap, thunk, val);
395
396     i = v->header.info;
397     if (i == &stg_TSO_info) {
398         owner = deRefTSO((StgTSO*)v);
399         if (owner != tso) {
400             checkBlockingQueues(cap, tso);
401         }
402         return;
403     }
404
405     if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
406         i != &stg_BLOCKING_QUEUE_DIRTY_info) {
407         checkBlockingQueues(cap, tso);
408         return;
409     }
410
411     owner = deRefTSO(((StgBlockingQueue*)v)->owner);
412
413     if (owner != tso) {
414         checkBlockingQueues(cap, tso);
415     } else {
416         wakeBlockingQueue(cap, (StgBlockingQueue*)v);
417     }
418 }
419
420 /* ---------------------------------------------------------------------------
421  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
422  * used by Control.Concurrent for error checking.
423  * ------------------------------------------------------------------------- */
424  
425 HsBool
426 rtsSupportsBoundThreads(void)
427 {
428 #if defined(THREADED_RTS)
429   return HS_BOOL_TRUE;
430 #else
431   return HS_BOOL_FALSE;
432 #endif
433 }
434
435 /* ---------------------------------------------------------------------------
436  * isThreadBound(tso): check whether tso is bound to an OS thread.
437  * ------------------------------------------------------------------------- */
438  
439 StgBool
440 isThreadBound(StgTSO* tso USED_IF_THREADS)
441 {
442 #if defined(THREADED_RTS)
443   return (tso->bound != NULL);
444 #endif
445   return rtsFalse;
446 }
447
448 /* ----------------------------------------------------------------------------
449  * Debugging: why is a thread blocked
450  * ------------------------------------------------------------------------- */
451
452 #if DEBUG
453 void
454 printThreadBlockage(StgTSO *tso)
455 {
456   switch (tso->why_blocked) {
457   case BlockedOnRead:
458     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
459     break;
460   case BlockedOnWrite:
461     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
462     break;
463 #if defined(mingw32_HOST_OS)
464     case BlockedOnDoProc:
465     debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
466     break;
467 #endif
468   case BlockedOnDelay:
469     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
470     break;
471   case BlockedOnMVar:
472     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
473     break;
474   case BlockedOnBlackHole:
475       debugBelch("is blocked on a black hole %p", 
476                  ((StgBlockingQueue*)tso->block_info.bh->bh));
477     break;
478   case BlockedOnMsgThrowTo:
479     debugBelch("is blocked on a throwto message");
480     break;
481   case NotBlocked:
482     debugBelch("is not blocked");
483     break;
484   case ThreadMigrating:
485     debugBelch("is runnable, but not on the run queue");
486     break;
487   case BlockedOnCCall:
488     debugBelch("is blocked on an external call");
489     break;
490   case BlockedOnCCall_NoUnblockExc:
491     debugBelch("is blocked on an external call (exceptions were already blocked)");
492     break;
493   case BlockedOnSTM:
494     debugBelch("is blocked on an STM operation");
495     break;
496   default:
497     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
498          tso->why_blocked, tso->id, tso);
499   }
500 }
501
502
503 void
504 printThreadStatus(StgTSO *t)
505 {
506   debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
507     {
508       void *label = lookupThreadLabel(t->id);
509       if (label) debugBelch("[\"%s\"] ",(char *)label);
510     }
511     if (t->what_next == ThreadRelocated) {
512         debugBelch("has been relocated...\n");
513     } else {
514         switch (t->what_next) {
515         case ThreadKilled:
516             debugBelch("has been killed");
517             break;
518         case ThreadComplete:
519             debugBelch("has completed");
520             break;
521         default:
522             printThreadBlockage(t);
523         }
524         if (t->dirty) {
525             debugBelch(" (TSO_DIRTY)");
526         } else if (t->flags & TSO_LINK_DIRTY) {
527             debugBelch(" (TSO_LINK_DIRTY)");
528         }
529         debugBelch("\n");
530     }
531 }
532
533 void
534 printAllThreads(void)
535 {
536   StgTSO *t, *next;
537   nat i, g;
538   Capability *cap;
539
540   debugBelch("all threads:\n");
541
542   for (i = 0; i < n_capabilities; i++) {
543       cap = &capabilities[i];
544       debugBelch("threads on capability %d:\n", cap->no);
545       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
546           printThreadStatus(t);
547       }
548   }
549
550   debugBelch("other threads:\n");
551   for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
552     for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
553       if (t->why_blocked != NotBlocked) {
554           printThreadStatus(t);
555       }
556       if (t->what_next == ThreadRelocated) {
557           next = t->_link;
558       } else {
559           next = t->global_link;
560       }
561     }
562   }
563 }
564
565 // useful from gdb
566 void 
567 printThreadQueue(StgTSO *t)
568 {
569     nat i = 0;
570     for (; t != END_TSO_QUEUE; t = t->_link) {
571         printThreadStatus(t);
572         i++;
573     }
574     debugBelch("%d threads on queue\n", i);
575 }
576
577 #endif /* DEBUG */