New implementation of BLACKHOLEs
[ghc-hetmet.git] / rts / Threads.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2006
4  *
5  * Thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "Capability.h"
13 #include "Updates.h"
14 #include "Threads.h"
15 #include "STM.h"
16 #include "Schedule.h"
17 #include "Trace.h"
18 #include "ThreadLabels.h"
19 #include "Updates.h"
20 #include "Messages.h"
21 #include "sm/Storage.h"
22
23 /* Next thread ID to allocate.
24  * LOCK: sched_mutex
25  */
26 static StgThreadID next_thread_id = 1;
27
28 /* The smallest stack size that makes any sense is:
29  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
30  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
31  *  + 1                       (the closure to enter)
32  *  + 1                       (stg_ap_v_ret)
33  *  + 1                       (spare slot req'd by stg_ap_v_ret)
34  *
35  * A thread with this stack will bomb immediately with a stack
36  * overflow, which will increase its stack size.  
37  */
38 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
39
40 /* ---------------------------------------------------------------------------
41    Create a new thread.
42
43    The new thread starts with the given stack size.  Before the
44    scheduler can run, however, this thread needs to have a closure
45    (and possibly some arguments) pushed on its stack.  See
46    pushClosure() in Schedule.h.
47
48    createGenThread() and createIOThread() (in SchedAPI.h) are
49    convenient packaged versions of this function.
50
51    currently pri (priority) is only used in a GRAN setup -- HWL
52    ------------------------------------------------------------------------ */
53 StgTSO *
54 createThread(Capability *cap, nat size)
55 {
56     StgTSO *tso;
57     nat stack_size;
58
59     /* sched_mutex is *not* required */
60
61     /* First check whether we should create a thread at all */
62
63     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
64
65     /* catch ridiculously small stack sizes */
66     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
67         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
68     }
69
70     size = round_to_mblocks(size);
71     tso = (StgTSO *)allocate(cap, size);
72
73     stack_size = size - TSO_STRUCT_SIZEW;
74     TICK_ALLOC_TSO(stack_size, 0);
75
76     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
77
78     // Always start with the compiled code evaluator
79     tso->what_next = ThreadRunGHC;
80
81     tso->why_blocked  = NotBlocked;
82     tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
83     tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
84     tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
85     tso->flags = 0;
86     tso->dirty = 1;
87     
88     tso->saved_errno = 0;
89     tso->bound = NULL;
90     tso->cap = cap;
91     
92     tso->stack_size     = stack_size;
93     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
94                           - TSO_STRUCT_SIZEW;
95     tso->sp             = (P_)&(tso->stack) + stack_size;
96
97     tso->trec = NO_TREC;
98     
99 #ifdef PROFILING
100     tso->prof.CCCS = CCS_MAIN;
101 #endif
102     
103   /* put a stop frame on the stack */
104     tso->sp -= sizeofW(StgStopFrame);
105     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
106     tso->_link = END_TSO_QUEUE;
107     
108     /* Link the new thread on the global thread list.
109      */
110     ACQUIRE_LOCK(&sched_mutex);
111     tso->id = next_thread_id++;  // while we have the mutex
112     tso->global_link = g0->threads;
113     g0->threads = tso;
114     RELEASE_LOCK(&sched_mutex);
115     
116     // ToDo: report the stack size in the event?
117     traceEventCreateThread(cap, tso);
118
119     return tso;
120 }
121
122 /* ---------------------------------------------------------------------------
123  * Comparing Thread ids.
124  *
125  * This is used from STG land in the implementation of the
126  * instances of Eq/Ord for ThreadIds.
127  * ------------------------------------------------------------------------ */
128
129 int
130 cmp_thread(StgPtr tso1, StgPtr tso2) 
131
132   StgThreadID id1 = ((StgTSO *)tso1)->id; 
133   StgThreadID id2 = ((StgTSO *)tso2)->id;
134  
135   if (id1 < id2) return (-1);
136   if (id1 > id2) return 1;
137   return 0;
138 }
139
140 /* ---------------------------------------------------------------------------
141  * Fetching the ThreadID from an StgTSO.
142  *
143  * This is used in the implementation of Show for ThreadIds.
144  * ------------------------------------------------------------------------ */
145 int
146 rts_getThreadId(StgPtr tso) 
147 {
148   return ((StgTSO *)tso)->id;
149 }
150
151 /* -----------------------------------------------------------------------------
152    Remove a thread from a queue.
153    Fails fatally if the TSO is not on the queue.
154    -------------------------------------------------------------------------- */
155
156 rtsBool // returns True if we modified queue
157 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
158 {
159     StgTSO *t, *prev;
160
161     prev = NULL;
162     for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
163         if (t == tso) {
164             if (prev) {
165                 setTSOLink(cap,prev,t->_link);
166                 return rtsFalse;
167             } else {
168                 *queue = t->_link;
169                 return rtsTrue;
170             }
171         }
172     }
173     barf("removeThreadFromQueue: not found");
174 }
175
176 rtsBool // returns True if we modified head or tail
177 removeThreadFromDeQueue (Capability *cap, 
178                          StgTSO **head, StgTSO **tail, StgTSO *tso)
179 {
180     StgTSO *t, *prev;
181     rtsBool flag = rtsFalse;
182
183     prev = NULL;
184     for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
185         if (t == tso) {
186             if (prev) {
187                 setTSOLink(cap,prev,t->_link);
188                 flag = rtsFalse;
189             } else {
190                 *head = t->_link;
191                 flag = rtsTrue;
192             }
193             if (*tail == tso) {
194                 if (prev) {
195                     *tail = prev;
196                 } else {
197                     *tail = END_TSO_QUEUE;
198                 }
199                 return rtsTrue;
200             } else {
201                 return flag;
202             }
203         }
204     }
205     barf("removeThreadFromMVarQueue: not found");
206 }
207
208 void
209 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
210 {
211     // caller must do the write barrier, because replacing the info
212     // pointer will unlock the MVar.
213     removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
214     tso->_link = END_TSO_QUEUE;
215 }
216
217 /* ----------------------------------------------------------------------------
218    unblockOne()
219
220    unblock a single thread.
221    ------------------------------------------------------------------------- */
222
223 StgTSO *
224 unblockOne (Capability *cap, StgTSO *tso)
225 {
226     return unblockOne_(cap,tso,rtsTrue); // allow migration
227 }
228
229 StgTSO *
230 unblockOne_ (Capability *cap, StgTSO *tso, 
231              rtsBool allow_migrate USED_IF_THREADS)
232 {
233   StgTSO *next;
234
235   // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
236   ASSERT(tso->why_blocked != NotBlocked);
237   ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
238          tso->block_info.closure->header.info == &stg_IND_info);
239
240   next = tso->_link;
241   tso->_link = END_TSO_QUEUE;
242
243 #if defined(THREADED_RTS)
244   if (tso->cap == cap || (!tsoLocked(tso) && 
245                           allow_migrate && 
246                           RtsFlags.ParFlags.wakeupMigrate)) {
247       // We are waking up this thread on the current Capability, which
248       // might involve migrating it from the Capability it was last on.
249       if (tso->bound) {
250           ASSERT(tso->bound->task->cap == tso->cap);
251           tso->bound->task->cap = cap;
252       }
253
254       tso->cap = cap;
255       write_barrier();
256       tso->why_blocked = NotBlocked;
257       appendToRunQueue(cap,tso);
258
259       // context-switch soonish so we can migrate the new thread if
260       // necessary.  NB. not contextSwitchCapability(cap), which would
261       // force a context switch immediately.
262       cap->context_switch = 1;
263   } else {
264       // we'll try to wake it up on the Capability it was last on.
265       wakeupThreadOnCapability(cap, tso->cap, tso);
266   }
267 #else
268   tso->why_blocked = NotBlocked;
269   appendToRunQueue(cap,tso);
270
271   // context-switch soonish so we can migrate the new thread if
272   // necessary.  NB. not contextSwitchCapability(cap), which would
273   // force a context switch immediately.
274   cap->context_switch = 1;
275 #endif
276
277   traceEventThreadWakeup (cap, tso, tso->cap->no);
278
279   return next;
280 }
281
282 void
283 tryWakeupThread (Capability *cap, StgTSO *tso)
284 {
285 #ifdef THREADED_RTS
286     if (tso->cap != cap)
287     {
288         MessageWakeup *msg;
289         msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
290         SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
291         msg->tso = tso;
292         sendMessage(cap, tso->cap, (Message*)msg);
293         return;
294     }
295 #endif
296
297     switch (tso->why_blocked)
298     {
299     case BlockedOnBlackHole:
300     case BlockedOnSTM:
301     {
302         // just run the thread now, if the BH is not really available,
303         // we'll block again.
304         tso->why_blocked = NotBlocked;
305         appendToRunQueue(cap,tso);
306         break;
307     }
308     default:
309         // otherwise, do nothing
310         break;
311     }
312 }
313
314 /* ----------------------------------------------------------------------------
315    awakenBlockedQueue
316
317    wakes up all the threads on the specified queue.
318    ------------------------------------------------------------------------- */
319
320 void
321 wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
322 {
323     MessageBlackHole *msg;
324     const StgInfoTable *i;
325
326     ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info  ||
327            bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info  );
328
329     for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE; 
330          msg = msg->link) {
331         i = msg->header.info;
332         if (i != &stg_IND_info) {
333             ASSERT(i == &stg_MSG_BLACKHOLE_info);
334             tryWakeupThread(cap,msg->tso);
335         }
336     }
337
338     // overwrite the BQ with an indirection so it will be
339     // collected at the next GC.
340 #if defined(DEBUG) && !defined(THREADED_RTS)
341     // XXX FILL_SLOP, but not if THREADED_RTS because in that case
342     // another thread might be looking at this BLOCKING_QUEUE and
343     // checking the owner field at the same time.
344     bq->bh = 0; bq->queue = 0; bq->owner = 0;
345 #endif
346     OVERWRITE_INFO(bq, &stg_IND_info);
347 }
348
349 // If we update a closure that we know we BLACKHOLE'd, and the closure
350 // no longer points to the current TSO as its owner, then there may be
351 // an orphaned BLOCKING_QUEUE closure with blocked threads attached to
352 // it.  We therefore traverse the BLOCKING_QUEUEs attached to the
353 // current TSO to see if any can now be woken up.
354 void
355 checkBlockingQueues (Capability *cap, StgTSO *tso)
356 {
357     StgBlockingQueue *bq, *next;
358     StgClosure *p;
359
360     debugTraceCap(DEBUG_sched, cap,
361                   "collision occurred; checking blocking queues for thread %ld",
362                   (lnat)tso->id);
363     
364     for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
365         next = bq->link;
366
367         if (bq->header.info == &stg_IND_info) {
368             // ToDo: could short it out right here, to avoid
369             // traversing this IND multiple times.
370             continue;
371         }
372         
373         p = bq->bh;
374
375         if (p->header.info != &stg_BLACKHOLE_info ||
376             ((StgInd *)p)->indirectee != (StgClosure*)bq)
377         {
378             wakeBlockingQueue(cap,bq);
379         }   
380     }
381 }
382
383 /* ----------------------------------------------------------------------------
384    updateThunk
385
386    Update a thunk with a value.  In order to do this, we need to know
387    which TSO owns (or is evaluating) the thunk, in case we need to
388    awaken any threads that are blocked on it.
389    ------------------------------------------------------------------------- */
390
391 void
392 updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
393 {
394     StgClosure *v;
395     StgTSO *owner;
396     const StgInfoTable *i;
397
398     i = thunk->header.info;
399     if (i != &stg_BLACKHOLE_info &&
400         i != &stg_CAF_BLACKHOLE_info &&
401         i != &stg_WHITEHOLE_info) {
402         updateWithIndirection(cap, thunk, val);
403         return;
404     }
405     
406     v = ((StgInd*)thunk)->indirectee;
407
408     updateWithIndirection(cap, thunk, val);
409
410     i = v->header.info;
411     if (i == &stg_TSO_info) {
412         owner = deRefTSO((StgTSO*)v);
413         if (owner != tso) {
414             checkBlockingQueues(cap, tso);
415         }
416         return;
417     }
418
419     if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
420         i != &stg_BLOCKING_QUEUE_DIRTY_info) {
421         checkBlockingQueues(cap, tso);
422         return;
423     }
424
425     owner = deRefTSO(((StgBlockingQueue*)v)->owner);
426
427     if (owner != tso) {
428         checkBlockingQueues(cap, tso);
429     } else {
430         wakeBlockingQueue(cap, (StgBlockingQueue*)v);
431     }
432 }
433
434 /* ----------------------------------------------------------------------------
435  * Wake up a thread on a Capability.
436  *
437  * This is used when the current Task is running on a Capability and
438  * wishes to wake up a thread on a different Capability.
439  * ------------------------------------------------------------------------- */
440
441 #ifdef THREADED_RTS
442
443 void
444 wakeupThreadOnCapability (Capability *cap,
445                           Capability *other_cap, 
446                           StgTSO *tso)
447 {
448     MessageWakeup *msg;
449
450     // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
451     if (tso->bound) {
452         ASSERT(tso->bound->task->cap == tso->cap);
453         tso->bound->task->cap = other_cap;
454     }
455     tso->cap = other_cap;
456
457     ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
458            tso->block_info.closure->header.info == &stg_IND_info);
459
460     ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
461
462     msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
463     SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM);
464     msg->tso = tso;
465     tso->block_info.closure = (StgClosure *)msg;
466     dirty_TSO(cap, tso);
467     write_barrier();
468     tso->why_blocked = BlockedOnMsgWakeup;
469
470     sendMessage(cap, other_cap, (Message*)msg);
471 }
472
473 #endif /* THREADED_RTS */
474
475 /* ---------------------------------------------------------------------------
476  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
477  * used by Control.Concurrent for error checking.
478  * ------------------------------------------------------------------------- */
479  
480 HsBool
481 rtsSupportsBoundThreads(void)
482 {
483 #if defined(THREADED_RTS)
484   return HS_BOOL_TRUE;
485 #else
486   return HS_BOOL_FALSE;
487 #endif
488 }
489
490 /* ---------------------------------------------------------------------------
491  * isThreadBound(tso): check whether tso is bound to an OS thread.
492  * ------------------------------------------------------------------------- */
493  
494 StgBool
495 isThreadBound(StgTSO* tso USED_IF_THREADS)
496 {
497 #if defined(THREADED_RTS)
498   return (tso->bound != NULL);
499 #endif
500   return rtsFalse;
501 }
502
503 /* ----------------------------------------------------------------------------
504  * Debugging: why is a thread blocked
505  * ------------------------------------------------------------------------- */
506
507 #if DEBUG
508 void
509 printThreadBlockage(StgTSO *tso)
510 {
511   switch (tso->why_blocked) {
512   case BlockedOnRead:
513     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
514     break;
515   case BlockedOnWrite:
516     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
517     break;
518 #if defined(mingw32_HOST_OS)
519     case BlockedOnDoProc:
520     debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
521     break;
522 #endif
523   case BlockedOnDelay:
524     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
525     break;
526   case BlockedOnMVar:
527     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
528     break;
529   case BlockedOnBlackHole:
530       debugBelch("is blocked on a black hole %p", 
531                  ((StgBlockingQueue*)tso->block_info.bh->bh));
532     break;
533   case BlockedOnMsgWakeup:
534     debugBelch("is blocked on a wakeup message");
535     break;
536   case BlockedOnMsgThrowTo:
537     debugBelch("is blocked on a throwto message");
538     break;
539   case NotBlocked:
540     debugBelch("is not blocked");
541     break;
542   case BlockedOnCCall:
543     debugBelch("is blocked on an external call");
544     break;
545   case BlockedOnCCall_NoUnblockExc:
546     debugBelch("is blocked on an external call (exceptions were already blocked)");
547     break;
548   case BlockedOnSTM:
549     debugBelch("is blocked on an STM operation");
550     break;
551   default:
552     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
553          tso->why_blocked, tso->id, tso);
554   }
555 }
556
557
558 void
559 printThreadStatus(StgTSO *t)
560 {
561   debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
562     {
563       void *label = lookupThreadLabel(t->id);
564       if (label) debugBelch("[\"%s\"] ",(char *)label);
565     }
566     if (t->what_next == ThreadRelocated) {
567         debugBelch("has been relocated...\n");
568     } else {
569         switch (t->what_next) {
570         case ThreadKilled:
571             debugBelch("has been killed");
572             break;
573         case ThreadComplete:
574             debugBelch("has completed");
575             break;
576         default:
577             printThreadBlockage(t);
578         }
579         if (t->dirty) {
580             debugBelch(" (TSO_DIRTY)");
581         } else if (t->flags & TSO_LINK_DIRTY) {
582             debugBelch(" (TSO_LINK_DIRTY)");
583         }
584         debugBelch("\n");
585     }
586 }
587
588 void
589 printAllThreads(void)
590 {
591   StgTSO *t, *next;
592   nat i, g;
593   Capability *cap;
594
595   debugBelch("all threads:\n");
596
597   for (i = 0; i < n_capabilities; i++) {
598       cap = &capabilities[i];
599       debugBelch("threads on capability %d:\n", cap->no);
600       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
601           printThreadStatus(t);
602       }
603   }
604
605   debugBelch("other threads:\n");
606   for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
607     for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
608       if (t->why_blocked != NotBlocked) {
609           printThreadStatus(t);
610       }
611       if (t->what_next == ThreadRelocated) {
612           next = t->_link;
613       } else {
614           next = t->global_link;
615       }
616     }
617   }
618 }
619
620 // useful from gdb
621 void 
622 printThreadQueue(StgTSO *t)
623 {
624     nat i = 0;
625     for (; t != END_TSO_QUEUE; t = t->_link) {
626         printThreadStatus(t);
627         i++;
628     }
629     debugBelch("%d threads on queue\n", i);
630 }
631
632 #endif /* DEBUG */