removeThreadFromQueue: stub out the link field before returning (#4813)
[ghc-hetmet.git] / rts / Threads.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2006
4  *
5  * Thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "Capability.h"
13 #include "Updates.h"
14 #include "Threads.h"
15 #include "STM.h"
16 #include "Schedule.h"
17 #include "Trace.h"
18 #include "ThreadLabels.h"
19 #include "Updates.h"
20 #include "Messages.h"
21 #include "sm/Storage.h"
22
23 /* Next thread ID to allocate.
24  * LOCK: sched_mutex
25  */
26 static StgThreadID next_thread_id = 1;
27
28 /* The smallest stack size that makes any sense is:
29  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
30  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
31  *  + 1                       (the closure to enter)
32  *  + 1                       (stg_ap_v_ret)
33  *  + 1                       (spare slot req'd by stg_ap_v_ret)
34  *
35  * A thread with this stack will bomb immediately with a stack
36  * overflow, which will increase its stack size.  
37  */
38 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
39
40 /* ---------------------------------------------------------------------------
41    Create a new thread.
42
43    The new thread starts with the given stack size.  Before the
44    scheduler can run, however, this thread needs to have a closure
45    (and possibly some arguments) pushed on its stack.  See
46    pushClosure() in Schedule.h.
47
48    createGenThread() and createIOThread() (in SchedAPI.h) are
49    convenient packaged versions of this function.
50
51    currently pri (priority) is only used in a GRAN setup -- HWL
52    ------------------------------------------------------------------------ */
53 StgTSO *
54 createThread(Capability *cap, nat size)
55 {
56     StgTSO *tso;
57     nat stack_size;
58
59     /* sched_mutex is *not* required */
60
61     /* First check whether we should create a thread at all */
62
63     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
64
65     /* catch ridiculously small stack sizes */
66     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
67         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
68     }
69
70     size = round_to_mblocks(size);
71     tso = (StgTSO *)allocate(cap, size);
72
73     stack_size = size - TSO_STRUCT_SIZEW;
74     TICK_ALLOC_TSO(stack_size, 0);
75
76     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
77
78     // Always start with the compiled code evaluator
79     tso->what_next = ThreadRunGHC;
80
81     tso->why_blocked  = NotBlocked;
82     tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
83     tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
84     tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
85     tso->flags = 0;
86     tso->dirty = 1;
87     
88     tso->saved_errno = 0;
89     tso->bound = NULL;
90     tso->cap = cap;
91     
92     tso->stack_size     = stack_size;
93     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
94                           - TSO_STRUCT_SIZEW;
95     tso->sp             = (P_)&(tso->stack) + stack_size;
96
97     tso->trec = NO_TREC;
98     
99 #ifdef PROFILING
100     tso->prof.CCCS = CCS_MAIN;
101 #endif
102     
103   /* put a stop frame on the stack */
104     tso->sp -= sizeofW(StgStopFrame);
105     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
106     tso->_link = END_TSO_QUEUE;
107     
108     /* Link the new thread on the global thread list.
109      */
110     ACQUIRE_LOCK(&sched_mutex);
111     tso->id = next_thread_id++;  // while we have the mutex
112     tso->global_link = g0->threads;
113     g0->threads = tso;
114     RELEASE_LOCK(&sched_mutex);
115     
116     // ToDo: report the stack size in the event?
117     traceEventCreateThread(cap, tso);
118
119     return tso;
120 }
121
122 /* ---------------------------------------------------------------------------
123  * Comparing Thread ids.
124  *
125  * This is used from STG land in the implementation of the
126  * instances of Eq/Ord for ThreadIds.
127  * ------------------------------------------------------------------------ */
128
129 int
130 cmp_thread(StgPtr tso1, StgPtr tso2) 
131
132   StgThreadID id1 = ((StgTSO *)tso1)->id; 
133   StgThreadID id2 = ((StgTSO *)tso2)->id;
134  
135   if (id1 < id2) return (-1);
136   if (id1 > id2) return 1;
137   return 0;
138 }
139
140 /* ---------------------------------------------------------------------------
141  * Fetching the ThreadID from an StgTSO.
142  *
143  * This is used in the implementation of Show for ThreadIds.
144  * ------------------------------------------------------------------------ */
145 int
146 rts_getThreadId(StgPtr tso) 
147 {
148   return ((StgTSO *)tso)->id;
149 }
150
151 /* -----------------------------------------------------------------------------
152    Remove a thread from a queue.
153    Fails fatally if the TSO is not on the queue.
154    -------------------------------------------------------------------------- */
155
156 rtsBool // returns True if we modified queue
157 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
158 {
159     StgTSO *t, *prev;
160
161     prev = NULL;
162     for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
163         if (t == tso) {
164             if (prev) {
165                 setTSOLink(cap,prev,t->_link);
166                 t->_link = END_TSO_QUEUE;
167                 return rtsFalse;
168             } else {
169                 *queue = t->_link;
170                 t->_link = END_TSO_QUEUE;
171                 return rtsTrue;
172             }
173         }
174     }
175     barf("removeThreadFromQueue: not found");
176 }
177
178 rtsBool // returns True if we modified head or tail
179 removeThreadFromDeQueue (Capability *cap, 
180                          StgTSO **head, StgTSO **tail, StgTSO *tso)
181 {
182     StgTSO *t, *prev;
183     rtsBool flag = rtsFalse;
184
185     prev = NULL;
186     for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
187         if (t == tso) {
188             if (prev) {
189                 setTSOLink(cap,prev,t->_link);
190                 flag = rtsFalse;
191             } else {
192                 *head = t->_link;
193                 flag = rtsTrue;
194             }
195             t->_link = END_TSO_QUEUE;
196             if (*tail == tso) {
197                 if (prev) {
198                     *tail = prev;
199                 } else {
200                     *tail = END_TSO_QUEUE;
201                 }
202                 return rtsTrue;
203             } else {
204                 return flag;
205             }
206         }
207     }
208     barf("removeThreadFromMVarQueue: not found");
209 }
210
211 /* ----------------------------------------------------------------------------
212    tryWakeupThread()
213
214    Attempt to wake up a thread.  tryWakeupThread is idempotent: it is
215    always safe to call it too many times, but it is not safe in
216    general to omit a call.
217
218    ------------------------------------------------------------------------- */
219
220 void
221 tryWakeupThread (Capability *cap, StgTSO *tso)
222 {
223     tryWakeupThread_(cap, deRefTSO(tso));
224 }
225
226 void
227 tryWakeupThread_ (Capability *cap, StgTSO *tso)
228 {
229     traceEventThreadWakeup (cap, tso, tso->cap->no);
230
231 #ifdef THREADED_RTS
232     if (tso->cap != cap)
233     {
234         MessageWakeup *msg;
235         msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
236         SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
237         msg->tso = tso;
238         sendMessage(cap, tso->cap, (Message*)msg);
239         debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
240                       (lnat)tso->id, tso->cap->no);
241         return;
242     }
243 #endif
244
245     switch (tso->why_blocked)
246     {
247     case BlockedOnMVar:
248     {
249         if (tso->_link == END_TSO_QUEUE) {
250             tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
251             goto unblock;
252         } else {
253             return;
254         }
255     }
256
257     case BlockedOnMsgThrowTo:
258     {
259         const StgInfoTable *i;
260         
261         i = lockClosure(tso->block_info.closure);
262         unlockClosure(tso->block_info.closure, i);
263         if (i != &stg_MSG_NULL_info) {
264             debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
265                           (lnat)tso->id, tso->block_info.throwto->header.info);
266             return;
267         }
268
269         // remove the block frame from the stack
270         ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
271         tso->sp += 3;
272         goto unblock;
273     }
274
275     case BlockedOnBlackHole:
276     case BlockedOnSTM:
277     case ThreadMigrating:
278         goto unblock;
279
280     default:
281         // otherwise, do nothing
282         return;
283     }
284
285 unblock:
286     // just run the thread now, if the BH is not really available,
287     // we'll block again.
288     tso->why_blocked = NotBlocked;
289     appendToRunQueue(cap,tso);
290
291     // We used to set the context switch flag here, which would
292     // trigger a context switch a short time in the future (at the end
293     // of the current nursery block).  The idea is that we have just
294     // woken up a thread, so we may need to load-balance and migrate
295     // threads to other CPUs.  On the other hand, setting the context
296     // switch flag here unfairly penalises the current thread by
297     // yielding its time slice too early.
298     //
299     // The synthetic benchmark nofib/smp/chan can be used to show the
300     // difference quite clearly.
301
302     // cap->context_switch = 1;
303 }
304
305 /* ----------------------------------------------------------------------------
306    migrateThread
307    ------------------------------------------------------------------------- */
308
309 void
310 migrateThread (Capability *from, StgTSO *tso, Capability *to)
311 {
312     traceEventMigrateThread (from, tso, to->no);
313     // ThreadMigrating tells the target cap that it needs to be added to
314     // the run queue when it receives the MSG_TRY_WAKEUP.
315     tso->why_blocked = ThreadMigrating;
316     tso->cap = to;
317     tryWakeupThread(from, tso);
318 }
319
320 /* ----------------------------------------------------------------------------
321    awakenBlockedQueue
322
323    wakes up all the threads on the specified queue.
324    ------------------------------------------------------------------------- */
325
326 void
327 wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
328 {
329     MessageBlackHole *msg;
330     const StgInfoTable *i;
331
332     ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info  ||
333            bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info  );
334
335     for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE; 
336          msg = msg->link) {
337         i = msg->header.info;
338         if (i != &stg_IND_info) {
339             ASSERT(i == &stg_MSG_BLACKHOLE_info);
340             tryWakeupThread(cap,msg->tso);
341         }
342     }
343
344     // overwrite the BQ with an indirection so it will be
345     // collected at the next GC.
346 #if defined(DEBUG) && !defined(THREADED_RTS)
347     // XXX FILL_SLOP, but not if THREADED_RTS because in that case
348     // another thread might be looking at this BLOCKING_QUEUE and
349     // checking the owner field at the same time.
350     bq->bh = 0; bq->queue = 0; bq->owner = 0;
351 #endif
352     OVERWRITE_INFO(bq, &stg_IND_info);
353 }
354
355 // If we update a closure that we know we BLACKHOLE'd, and the closure
356 // no longer points to the current TSO as its owner, then there may be
357 // an orphaned BLOCKING_QUEUE closure with blocked threads attached to
358 // it.  We therefore traverse the BLOCKING_QUEUEs attached to the
359 // current TSO to see if any can now be woken up.
360 void
361 checkBlockingQueues (Capability *cap, StgTSO *tso)
362 {
363     StgBlockingQueue *bq, *next;
364     StgClosure *p;
365
366     debugTraceCap(DEBUG_sched, cap,
367                   "collision occurred; checking blocking queues for thread %ld",
368                   (lnat)tso->id);
369     
370     for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
371         next = bq->link;
372
373         if (bq->header.info == &stg_IND_info) {
374             // ToDo: could short it out right here, to avoid
375             // traversing this IND multiple times.
376             continue;
377         }
378         
379         p = bq->bh;
380
381         if (p->header.info != &stg_BLACKHOLE_info ||
382             ((StgInd *)p)->indirectee != (StgClosure*)bq)
383         {
384             wakeBlockingQueue(cap,bq);
385         }   
386     }
387 }
388
389 /* ----------------------------------------------------------------------------
390    updateThunk
391
392    Update a thunk with a value.  In order to do this, we need to know
393    which TSO owns (or is evaluating) the thunk, in case we need to
394    awaken any threads that are blocked on it.
395    ------------------------------------------------------------------------- */
396
397 void
398 updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
399 {
400     StgClosure *v;
401     StgTSO *owner;
402     const StgInfoTable *i;
403
404     i = thunk->header.info;
405     if (i != &stg_BLACKHOLE_info &&
406         i != &stg_CAF_BLACKHOLE_info &&
407         i != &__stg_EAGER_BLACKHOLE_info &&
408         i != &stg_WHITEHOLE_info) {
409         updateWithIndirection(cap, thunk, val);
410         return;
411     }
412     
413     v = ((StgInd*)thunk)->indirectee;
414
415     updateWithIndirection(cap, thunk, val);
416
417     i = v->header.info;
418     if (i == &stg_TSO_info) {
419         owner = deRefTSO((StgTSO*)v);
420         if (owner != tso) {
421             checkBlockingQueues(cap, tso);
422         }
423         return;
424     }
425
426     if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
427         i != &stg_BLOCKING_QUEUE_DIRTY_info) {
428         checkBlockingQueues(cap, tso);
429         return;
430     }
431
432     owner = deRefTSO(((StgBlockingQueue*)v)->owner);
433
434     if (owner != tso) {
435         checkBlockingQueues(cap, tso);
436     } else {
437         wakeBlockingQueue(cap, (StgBlockingQueue*)v);
438     }
439 }
440
441 /* ---------------------------------------------------------------------------
442  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
443  * used by Control.Concurrent for error checking.
444  * ------------------------------------------------------------------------- */
445  
446 HsBool
447 rtsSupportsBoundThreads(void)
448 {
449 #if defined(THREADED_RTS)
450   return HS_BOOL_TRUE;
451 #else
452   return HS_BOOL_FALSE;
453 #endif
454 }
455
456 /* ---------------------------------------------------------------------------
457  * isThreadBound(tso): check whether tso is bound to an OS thread.
458  * ------------------------------------------------------------------------- */
459  
460 StgBool
461 isThreadBound(StgTSO* tso USED_IF_THREADS)
462 {
463 #if defined(THREADED_RTS)
464   return (tso->bound != NULL);
465 #endif
466   return rtsFalse;
467 }
468
469 /* ----------------------------------------------------------------------------
470  * Debugging: why is a thread blocked
471  * ------------------------------------------------------------------------- */
472
473 #if DEBUG
474 void
475 printThreadBlockage(StgTSO *tso)
476 {
477   switch (tso->why_blocked) {
478   case BlockedOnRead:
479     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
480     break;
481   case BlockedOnWrite:
482     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
483     break;
484 #if defined(mingw32_HOST_OS)
485     case BlockedOnDoProc:
486     debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
487     break;
488 #endif
489   case BlockedOnDelay:
490     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
491     break;
492   case BlockedOnMVar:
493     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
494     break;
495   case BlockedOnBlackHole:
496       debugBelch("is blocked on a black hole %p", 
497                  ((StgBlockingQueue*)tso->block_info.bh->bh));
498     break;
499   case BlockedOnMsgThrowTo:
500     debugBelch("is blocked on a throwto message");
501     break;
502   case NotBlocked:
503     debugBelch("is not blocked");
504     break;
505   case ThreadMigrating:
506     debugBelch("is runnable, but not on the run queue");
507     break;
508   case BlockedOnCCall:
509     debugBelch("is blocked on an external call");
510     break;
511   case BlockedOnCCall_Interruptible:
512     debugBelch("is blocked on an external call (but may be interrupted)");
513     break;
514   case BlockedOnSTM:
515     debugBelch("is blocked on an STM operation");
516     break;
517   default:
518     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
519          tso->why_blocked, tso->id, tso);
520   }
521 }
522
523
524 void
525 printThreadStatus(StgTSO *t)
526 {
527   debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
528     {
529       void *label = lookupThreadLabel(t->id);
530       if (label) debugBelch("[\"%s\"] ",(char *)label);
531     }
532     if (t->what_next == ThreadRelocated) {
533         debugBelch("has been relocated...\n");
534     } else {
535         switch (t->what_next) {
536         case ThreadKilled:
537             debugBelch("has been killed");
538             break;
539         case ThreadComplete:
540             debugBelch("has completed");
541             break;
542         default:
543             printThreadBlockage(t);
544         }
545         if (t->dirty) {
546             debugBelch(" (TSO_DIRTY)");
547         } else if (t->flags & TSO_LINK_DIRTY) {
548             debugBelch(" (TSO_LINK_DIRTY)");
549         }
550         debugBelch("\n");
551     }
552 }
553
554 void
555 printAllThreads(void)
556 {
557   StgTSO *t, *next;
558   nat i, g;
559   Capability *cap;
560
561   debugBelch("all threads:\n");
562
563   for (i = 0; i < n_capabilities; i++) {
564       cap = &capabilities[i];
565       debugBelch("threads on capability %d:\n", cap->no);
566       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
567           printThreadStatus(t);
568       }
569   }
570
571   debugBelch("other threads:\n");
572   for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
573     for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
574       if (t->why_blocked != NotBlocked) {
575           printThreadStatus(t);
576       }
577       if (t->what_next == ThreadRelocated) {
578           next = t->_link;
579       } else {
580           next = t->global_link;
581       }
582     }
583   }
584 }
585
586 // useful from gdb
587 void 
588 printThreadQueue(StgTSO *t)
589 {
590     nat i = 0;
591     for (; t != END_TSO_QUEUE; t = t->_link) {
592         printThreadStatus(t);
593         i++;
594     }
595     debugBelch("%d threads on queue\n", i);
596 }
597
598 #endif /* DEBUG */