fix bug in migrateThread()
[ghc-hetmet.git] / rts / Threads.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2006
4  *
5  * Thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "Capability.h"
13 #include "Updates.h"
14 #include "Threads.h"
15 #include "STM.h"
16 #include "Schedule.h"
17 #include "Trace.h"
18 #include "ThreadLabels.h"
19 #include "Updates.h"
20 #include "Messages.h"
21 #include "sm/Storage.h"
22
23 /* Next thread ID to allocate.
24  * LOCK: sched_mutex
25  */
26 static StgThreadID next_thread_id = 1;
27
28 /* The smallest stack size that makes any sense is:
29  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
30  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
31  *  + 1                       (the closure to enter)
32  *  + 1                       (stg_ap_v_ret)
33  *  + 1                       (spare slot req'd by stg_ap_v_ret)
34  *
35  * A thread with this stack will bomb immediately with a stack
36  * overflow, which will increase its stack size.  
37  */
38 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
39
40 /* ---------------------------------------------------------------------------
41    Create a new thread.
42
43    The new thread starts with the given stack size.  Before the
44    scheduler can run, however, this thread needs to have a closure
45    (and possibly some arguments) pushed on its stack.  See
46    pushClosure() in Schedule.h.
47
48    createGenThread() and createIOThread() (in SchedAPI.h) are
49    convenient packaged versions of this function.
50
51    currently pri (priority) is only used in a GRAN setup -- HWL
52    ------------------------------------------------------------------------ */
53 StgTSO *
54 createThread(Capability *cap, nat size)
55 {
56     StgTSO *tso;
57     nat stack_size;
58
59     /* sched_mutex is *not* required */
60
61     /* First check whether we should create a thread at all */
62
63     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
64
65     /* catch ridiculously small stack sizes */
66     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
67         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
68     }
69
70     size = round_to_mblocks(size);
71     tso = (StgTSO *)allocate(cap, size);
72
73     stack_size = size - TSO_STRUCT_SIZEW;
74     TICK_ALLOC_TSO(stack_size, 0);
75
76     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
77
78     // Always start with the compiled code evaluator
79     tso->what_next = ThreadRunGHC;
80
81     tso->why_blocked  = NotBlocked;
82     tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
83     tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
84     tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
85     tso->flags = 0;
86     tso->dirty = 1;
87     
88     tso->saved_errno = 0;
89     tso->bound = NULL;
90     tso->cap = cap;
91     
92     tso->stack_size     = stack_size;
93     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
94                           - TSO_STRUCT_SIZEW;
95     tso->sp             = (P_)&(tso->stack) + stack_size;
96
97     tso->trec = NO_TREC;
98     
99 #ifdef PROFILING
100     tso->prof.CCCS = CCS_MAIN;
101 #endif
102     
103   /* put a stop frame on the stack */
104     tso->sp -= sizeofW(StgStopFrame);
105     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
106     tso->_link = END_TSO_QUEUE;
107     
108     /* Link the new thread on the global thread list.
109      */
110     ACQUIRE_LOCK(&sched_mutex);
111     tso->id = next_thread_id++;  // while we have the mutex
112     tso->global_link = g0->threads;
113     g0->threads = tso;
114     RELEASE_LOCK(&sched_mutex);
115     
116     // ToDo: report the stack size in the event?
117     traceEventCreateThread(cap, tso);
118
119     return tso;
120 }
121
122 /* ---------------------------------------------------------------------------
123  * Comparing Thread ids.
124  *
125  * This is used from STG land in the implementation of the
126  * instances of Eq/Ord for ThreadIds.
127  * ------------------------------------------------------------------------ */
128
129 int
130 cmp_thread(StgPtr tso1, StgPtr tso2) 
131
132   StgThreadID id1 = ((StgTSO *)tso1)->id; 
133   StgThreadID id2 = ((StgTSO *)tso2)->id;
134  
135   if (id1 < id2) return (-1);
136   if (id1 > id2) return 1;
137   return 0;
138 }
139
140 /* ---------------------------------------------------------------------------
141  * Fetching the ThreadID from an StgTSO.
142  *
143  * This is used in the implementation of Show for ThreadIds.
144  * ------------------------------------------------------------------------ */
145 int
146 rts_getThreadId(StgPtr tso) 
147 {
148   return ((StgTSO *)tso)->id;
149 }
150
151 /* -----------------------------------------------------------------------------
152    Remove a thread from a queue.
153    Fails fatally if the TSO is not on the queue.
154    -------------------------------------------------------------------------- */
155
156 rtsBool // returns True if we modified queue
157 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
158 {
159     StgTSO *t, *prev;
160
161     prev = NULL;
162     for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
163         if (t == tso) {
164             if (prev) {
165                 setTSOLink(cap,prev,t->_link);
166                 return rtsFalse;
167             } else {
168                 *queue = t->_link;
169                 return rtsTrue;
170             }
171         }
172     }
173     barf("removeThreadFromQueue: not found");
174 }
175
176 rtsBool // returns True if we modified head or tail
177 removeThreadFromDeQueue (Capability *cap, 
178                          StgTSO **head, StgTSO **tail, StgTSO *tso)
179 {
180     StgTSO *t, *prev;
181     rtsBool flag = rtsFalse;
182
183     prev = NULL;
184     for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
185         if (t == tso) {
186             if (prev) {
187                 setTSOLink(cap,prev,t->_link);
188                 flag = rtsFalse;
189             } else {
190                 *head = t->_link;
191                 flag = rtsTrue;
192             }
193             if (*tail == tso) {
194                 if (prev) {
195                     *tail = prev;
196                 } else {
197                     *tail = END_TSO_QUEUE;
198                 }
199                 return rtsTrue;
200             } else {
201                 return flag;
202             }
203         }
204     }
205     barf("removeThreadFromMVarQueue: not found");
206 }
207
208 /* ----------------------------------------------------------------------------
209    tryWakeupThread()
210
211    Attempt to wake up a thread.  tryWakeupThread is idempotent: it is
212    always safe to call it too many times, but it is not safe in
213    general to omit a call.
214
215    ------------------------------------------------------------------------- */
216
217 void
218 tryWakeupThread (Capability *cap, StgTSO *tso)
219 {
220
221     traceEventThreadWakeup (cap, tso, tso->cap->no);
222
223 #ifdef THREADED_RTS
224     if (tso->cap != cap)
225     {
226         MessageWakeup *msg;
227         msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
228         SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
229         msg->tso = tso;
230         sendMessage(cap, tso->cap, (Message*)msg);
231         debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
232                       (lnat)tso->id, tso->cap->no);
233         return;
234     }
235 #endif
236
237     switch (tso->why_blocked)
238     {
239     case BlockedOnMVar:
240     {
241         if (tso->_link == END_TSO_QUEUE) {
242             tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
243             goto unblock;
244         } else {
245             return;
246         }
247     }
248
249     case BlockedOnMsgThrowTo:
250     {
251         const StgInfoTable *i;
252         
253         i = lockClosure(tso->block_info.closure);
254         unlockClosure(tso->block_info.closure, i);
255         if (i != &stg_MSG_NULL_info) {
256             debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
257                           (lnat)tso->id, tso->block_info.throwto->header.info);
258             return;
259         }
260
261         // remove the block frame from the stack
262         ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
263         tso->sp += 3;
264         goto unblock;
265     }
266
267     case BlockedOnBlackHole:
268     case BlockedOnSTM:
269     case ThreadMigrating:
270         goto unblock;
271
272     default:
273         // otherwise, do nothing
274         return;
275     }
276
277 unblock:
278     // just run the thread now, if the BH is not really available,
279     // we'll block again.
280     tso->why_blocked = NotBlocked;
281     appendToRunQueue(cap,tso);
282 }
283
284 /* ----------------------------------------------------------------------------
285    migrateThread
286    ------------------------------------------------------------------------- */
287
288 void
289 migrateThread (Capability *from, StgTSO *tso, Capability *to)
290 {
291     traceEventMigrateThread (from, tso, to->no);
292     // ThreadMigrating tells the target cap that it needs to be added to
293     // the run queue when it receives the MSG_TRY_WAKEUP.
294     tso->why_blocked = ThreadMigrating;
295     tso->cap = to;
296     tryWakeupThread(from, tso);
297 }
298
299 /* ----------------------------------------------------------------------------
300    awakenBlockedQueue
301
302    wakes up all the threads on the specified queue.
303    ------------------------------------------------------------------------- */
304
305 void
306 wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
307 {
308     MessageBlackHole *msg;
309     const StgInfoTable *i;
310
311     ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info  ||
312            bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info  );
313
314     for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE; 
315          msg = msg->link) {
316         i = msg->header.info;
317         if (i != &stg_IND_info) {
318             ASSERT(i == &stg_MSG_BLACKHOLE_info);
319             tryWakeupThread(cap,msg->tso);
320         }
321     }
322
323     // overwrite the BQ with an indirection so it will be
324     // collected at the next GC.
325 #if defined(DEBUG) && !defined(THREADED_RTS)
326     // XXX FILL_SLOP, but not if THREADED_RTS because in that case
327     // another thread might be looking at this BLOCKING_QUEUE and
328     // checking the owner field at the same time.
329     bq->bh = 0; bq->queue = 0; bq->owner = 0;
330 #endif
331     OVERWRITE_INFO(bq, &stg_IND_info);
332 }
333
334 // If we update a closure that we know we BLACKHOLE'd, and the closure
335 // no longer points to the current TSO as its owner, then there may be
336 // an orphaned BLOCKING_QUEUE closure with blocked threads attached to
337 // it.  We therefore traverse the BLOCKING_QUEUEs attached to the
338 // current TSO to see if any can now be woken up.
339 void
340 checkBlockingQueues (Capability *cap, StgTSO *tso)
341 {
342     StgBlockingQueue *bq, *next;
343     StgClosure *p;
344
345     debugTraceCap(DEBUG_sched, cap,
346                   "collision occurred; checking blocking queues for thread %ld",
347                   (lnat)tso->id);
348     
349     for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
350         next = bq->link;
351
352         if (bq->header.info == &stg_IND_info) {
353             // ToDo: could short it out right here, to avoid
354             // traversing this IND multiple times.
355             continue;
356         }
357         
358         p = bq->bh;
359
360         if (p->header.info != &stg_BLACKHOLE_info ||
361             ((StgInd *)p)->indirectee != (StgClosure*)bq)
362         {
363             wakeBlockingQueue(cap,bq);
364         }   
365     }
366 }
367
368 /* ----------------------------------------------------------------------------
369    updateThunk
370
371    Update a thunk with a value.  In order to do this, we need to know
372    which TSO owns (or is evaluating) the thunk, in case we need to
373    awaken any threads that are blocked on it.
374    ------------------------------------------------------------------------- */
375
376 void
377 updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
378 {
379     StgClosure *v;
380     StgTSO *owner;
381     const StgInfoTable *i;
382
383     i = thunk->header.info;
384     if (i != &stg_BLACKHOLE_info &&
385         i != &stg_CAF_BLACKHOLE_info &&
386         i != &stg_WHITEHOLE_info) {
387         updateWithIndirection(cap, thunk, val);
388         return;
389     }
390     
391     v = ((StgInd*)thunk)->indirectee;
392
393     updateWithIndirection(cap, thunk, val);
394
395     i = v->header.info;
396     if (i == &stg_TSO_info) {
397         owner = deRefTSO((StgTSO*)v);
398         if (owner != tso) {
399             checkBlockingQueues(cap, tso);
400         }
401         return;
402     }
403
404     if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
405         i != &stg_BLOCKING_QUEUE_DIRTY_info) {
406         checkBlockingQueues(cap, tso);
407         return;
408     }
409
410     owner = deRefTSO(((StgBlockingQueue*)v)->owner);
411
412     if (owner != tso) {
413         checkBlockingQueues(cap, tso);
414     } else {
415         wakeBlockingQueue(cap, (StgBlockingQueue*)v);
416     }
417 }
418
419 /* ---------------------------------------------------------------------------
420  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
421  * used by Control.Concurrent for error checking.
422  * ------------------------------------------------------------------------- */
423  
424 HsBool
425 rtsSupportsBoundThreads(void)
426 {
427 #if defined(THREADED_RTS)
428   return HS_BOOL_TRUE;
429 #else
430   return HS_BOOL_FALSE;
431 #endif
432 }
433
434 /* ---------------------------------------------------------------------------
435  * isThreadBound(tso): check whether tso is bound to an OS thread.
436  * ------------------------------------------------------------------------- */
437  
438 StgBool
439 isThreadBound(StgTSO* tso USED_IF_THREADS)
440 {
441 #if defined(THREADED_RTS)
442   return (tso->bound != NULL);
443 #endif
444   return rtsFalse;
445 }
446
447 /* ----------------------------------------------------------------------------
448  * Debugging: why is a thread blocked
449  * ------------------------------------------------------------------------- */
450
451 #if DEBUG
452 void
453 printThreadBlockage(StgTSO *tso)
454 {
455   switch (tso->why_blocked) {
456   case BlockedOnRead:
457     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
458     break;
459   case BlockedOnWrite:
460     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
461     break;
462 #if defined(mingw32_HOST_OS)
463     case BlockedOnDoProc:
464     debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
465     break;
466 #endif
467   case BlockedOnDelay:
468     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
469     break;
470   case BlockedOnMVar:
471     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
472     break;
473   case BlockedOnBlackHole:
474       debugBelch("is blocked on a black hole %p", 
475                  ((StgBlockingQueue*)tso->block_info.bh->bh));
476     break;
477   case BlockedOnMsgThrowTo:
478     debugBelch("is blocked on a throwto message");
479     break;
480   case NotBlocked:
481     debugBelch("is not blocked");
482     break;
483   case ThreadMigrating:
484     debugBelch("is runnable, but not on the run queue");
485     break;
486   case BlockedOnCCall:
487     debugBelch("is blocked on an external call");
488     break;
489   case BlockedOnCCall_NoUnblockExc:
490     debugBelch("is blocked on an external call (exceptions were already blocked)");
491     break;
492   case BlockedOnSTM:
493     debugBelch("is blocked on an STM operation");
494     break;
495   default:
496     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
497          tso->why_blocked, tso->id, tso);
498   }
499 }
500
501
502 void
503 printThreadStatus(StgTSO *t)
504 {
505   debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
506     {
507       void *label = lookupThreadLabel(t->id);
508       if (label) debugBelch("[\"%s\"] ",(char *)label);
509     }
510     if (t->what_next == ThreadRelocated) {
511         debugBelch("has been relocated...\n");
512     } else {
513         switch (t->what_next) {
514         case ThreadKilled:
515             debugBelch("has been killed");
516             break;
517         case ThreadComplete:
518             debugBelch("has completed");
519             break;
520         default:
521             printThreadBlockage(t);
522         }
523         if (t->dirty) {
524             debugBelch(" (TSO_DIRTY)");
525         } else if (t->flags & TSO_LINK_DIRTY) {
526             debugBelch(" (TSO_LINK_DIRTY)");
527         }
528         debugBelch("\n");
529     }
530 }
531
532 void
533 printAllThreads(void)
534 {
535   StgTSO *t, *next;
536   nat i, g;
537   Capability *cap;
538
539   debugBelch("all threads:\n");
540
541   for (i = 0; i < n_capabilities; i++) {
542       cap = &capabilities[i];
543       debugBelch("threads on capability %d:\n", cap->no);
544       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
545           printThreadStatus(t);
546       }
547   }
548
549   debugBelch("other threads:\n");
550   for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
551     for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
552       if (t->why_blocked != NotBlocked) {
553           printThreadStatus(t);
554       }
555       if (t->what_next == ThreadRelocated) {
556           next = t->_link;
557       } else {
558           next = t->global_link;
559       }
560     }
561   }
562 }
563
564 // useful from gdb
565 void 
566 printThreadQueue(StgTSO *t)
567 {
568     nat i = 0;
569     for (; t != END_TSO_QUEUE; t = t->_link) {
570         printThreadStatus(t);
571         i++;
572     }
573     debugBelch("%d threads on queue\n", i);
574 }
575
576 #endif /* DEBUG */