change throwTo to use tryWakeupThread rather than unblockOne
[ghc-hetmet.git] / rts / Threads.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2006
4  *
5  * Thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "Capability.h"
13 #include "Updates.h"
14 #include "Threads.h"
15 #include "STM.h"
16 #include "Schedule.h"
17 #include "Trace.h"
18 #include "ThreadLabels.h"
19 #include "Updates.h"
20 #include "Messages.h"
21 #include "sm/Storage.h"
22
23 /* Next thread ID to allocate.
24  * LOCK: sched_mutex
25  */
26 static StgThreadID next_thread_id = 1;
27
28 /* The smallest stack size that makes any sense is:
29  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
30  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
31  *  + 1                       (the closure to enter)
32  *  + 1                       (stg_ap_v_ret)
33  *  + 1                       (spare slot req'd by stg_ap_v_ret)
34  *
35  * A thread with this stack will bomb immediately with a stack
36  * overflow, which will increase its stack size.  
37  */
38 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
39
40 /* ---------------------------------------------------------------------------
41    Create a new thread.
42
43    The new thread starts with the given stack size.  Before the
44    scheduler can run, however, this thread needs to have a closure
45    (and possibly some arguments) pushed on its stack.  See
46    pushClosure() in Schedule.h.
47
48    createGenThread() and createIOThread() (in SchedAPI.h) are
49    convenient packaged versions of this function.
50
51    currently pri (priority) is only used in a GRAN setup -- HWL
52    ------------------------------------------------------------------------ */
53 StgTSO *
54 createThread(Capability *cap, nat size)
55 {
56     StgTSO *tso;
57     nat stack_size;
58
59     /* sched_mutex is *not* required */
60
61     /* First check whether we should create a thread at all */
62
63     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
64
65     /* catch ridiculously small stack sizes */
66     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
67         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
68     }
69
70     size = round_to_mblocks(size);
71     tso = (StgTSO *)allocate(cap, size);
72
73     stack_size = size - TSO_STRUCT_SIZEW;
74     TICK_ALLOC_TSO(stack_size, 0);
75
76     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
77
78     // Always start with the compiled code evaluator
79     tso->what_next = ThreadRunGHC;
80
81     tso->why_blocked  = NotBlocked;
82     tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
83     tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
84     tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
85     tso->flags = 0;
86     tso->dirty = 1;
87     
88     tso->saved_errno = 0;
89     tso->bound = NULL;
90     tso->cap = cap;
91     
92     tso->stack_size     = stack_size;
93     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
94                           - TSO_STRUCT_SIZEW;
95     tso->sp             = (P_)&(tso->stack) + stack_size;
96
97     tso->trec = NO_TREC;
98     
99 #ifdef PROFILING
100     tso->prof.CCCS = CCS_MAIN;
101 #endif
102     
103   /* put a stop frame on the stack */
104     tso->sp -= sizeofW(StgStopFrame);
105     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
106     tso->_link = END_TSO_QUEUE;
107     
108     /* Link the new thread on the global thread list.
109      */
110     ACQUIRE_LOCK(&sched_mutex);
111     tso->id = next_thread_id++;  // while we have the mutex
112     tso->global_link = g0->threads;
113     g0->threads = tso;
114     RELEASE_LOCK(&sched_mutex);
115     
116     // ToDo: report the stack size in the event?
117     traceEventCreateThread(cap, tso);
118
119     return tso;
120 }
121
122 /* ---------------------------------------------------------------------------
123  * Comparing Thread ids.
124  *
125  * This is used from STG land in the implementation of the
126  * instances of Eq/Ord for ThreadIds.
127  * ------------------------------------------------------------------------ */
128
129 int
130 cmp_thread(StgPtr tso1, StgPtr tso2) 
131
132   StgThreadID id1 = ((StgTSO *)tso1)->id; 
133   StgThreadID id2 = ((StgTSO *)tso2)->id;
134  
135   if (id1 < id2) return (-1);
136   if (id1 > id2) return 1;
137   return 0;
138 }
139
140 /* ---------------------------------------------------------------------------
141  * Fetching the ThreadID from an StgTSO.
142  *
143  * This is used in the implementation of Show for ThreadIds.
144  * ------------------------------------------------------------------------ */
145 int
146 rts_getThreadId(StgPtr tso) 
147 {
148   return ((StgTSO *)tso)->id;
149 }
150
151 /* -----------------------------------------------------------------------------
152    Remove a thread from a queue.
153    Fails fatally if the TSO is not on the queue.
154    -------------------------------------------------------------------------- */
155
156 rtsBool // returns True if we modified queue
157 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
158 {
159     StgTSO *t, *prev;
160
161     prev = NULL;
162     for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
163         if (t == tso) {
164             if (prev) {
165                 setTSOLink(cap,prev,t->_link);
166                 return rtsFalse;
167             } else {
168                 *queue = t->_link;
169                 return rtsTrue;
170             }
171         }
172     }
173     barf("removeThreadFromQueue: not found");
174 }
175
176 rtsBool // returns True if we modified head or tail
177 removeThreadFromDeQueue (Capability *cap, 
178                          StgTSO **head, StgTSO **tail, StgTSO *tso)
179 {
180     StgTSO *t, *prev;
181     rtsBool flag = rtsFalse;
182
183     prev = NULL;
184     for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
185         if (t == tso) {
186             if (prev) {
187                 setTSOLink(cap,prev,t->_link);
188                 flag = rtsFalse;
189             } else {
190                 *head = t->_link;
191                 flag = rtsTrue;
192             }
193             if (*tail == tso) {
194                 if (prev) {
195                     *tail = prev;
196                 } else {
197                     *tail = END_TSO_QUEUE;
198                 }
199                 return rtsTrue;
200             } else {
201                 return flag;
202             }
203         }
204     }
205     barf("removeThreadFromMVarQueue: not found");
206 }
207
208 void
209 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
210 {
211     // caller must do the write barrier, because replacing the info
212     // pointer will unlock the MVar.
213     removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
214     tso->_link = END_TSO_QUEUE;
215 }
216
217 /* ----------------------------------------------------------------------------
218    unblockOne()
219
220    unblock a single thread.
221    ------------------------------------------------------------------------- */
222
223 StgTSO *
224 unblockOne (Capability *cap, StgTSO *tso)
225 {
226     return unblockOne_(cap,tso,rtsTrue); // allow migration
227 }
228
229 StgTSO *
230 unblockOne_ (Capability *cap, StgTSO *tso, 
231              rtsBool allow_migrate USED_IF_THREADS)
232 {
233   StgTSO *next;
234
235   // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
236   ASSERT(tso->why_blocked != NotBlocked);
237   ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
238          tso->block_info.closure->header.info == &stg_IND_info);
239
240   next = tso->_link;
241   tso->_link = END_TSO_QUEUE;
242
243 #if defined(THREADED_RTS)
244   if (tso->cap == cap || (!tsoLocked(tso) && 
245                           allow_migrate && 
246                           RtsFlags.ParFlags.wakeupMigrate)) {
247       // We are waking up this thread on the current Capability, which
248       // might involve migrating it from the Capability it was last on.
249       if (tso->bound) {
250           ASSERT(tso->bound->task->cap == tso->cap);
251           tso->bound->task->cap = cap;
252       }
253
254       tso->cap = cap;
255       write_barrier();
256       tso->why_blocked = NotBlocked;
257       appendToRunQueue(cap,tso);
258
259       // context-switch soonish so we can migrate the new thread if
260       // necessary.  NB. not contextSwitchCapability(cap), which would
261       // force a context switch immediately.
262       cap->context_switch = 1;
263   } else {
264       // we'll try to wake it up on the Capability it was last on.
265       wakeupThreadOnCapability(cap, tso->cap, tso);
266   }
267 #else
268   tso->why_blocked = NotBlocked;
269   appendToRunQueue(cap,tso);
270
271   // context-switch soonish so we can migrate the new thread if
272   // necessary.  NB. not contextSwitchCapability(cap), which would
273   // force a context switch immediately.
274   cap->context_switch = 1;
275 #endif
276
277   traceEventThreadWakeup (cap, tso, tso->cap->no);
278
279   return next;
280 }
281
282 void
283 tryWakeupThread (Capability *cap, StgTSO *tso)
284 {
285 #ifdef THREADED_RTS
286     if (tso->cap != cap)
287     {
288         MessageWakeup *msg;
289         msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
290         SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
291         msg->tso = tso;
292         sendMessage(cap, tso->cap, (Message*)msg);
293         debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
294                       (lnat)tso->id, tso->cap->no);
295         return;
296     }
297 #endif
298
299     switch (tso->why_blocked)
300     {
301     case BlockedOnMsgThrowTo:
302     {
303         const StgInfoTable *i;
304         
305         i = lockClosure(tso->block_info.closure);
306         unlockClosure(tso->block_info.closure, i);
307         if (i != &stg_MSG_NULL_info) {
308             debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
309                           (lnat)tso->id, tso->block_info.throwto->header.info);
310             break; // still blocked
311         }
312
313         // remove the block frame from the stack
314         ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
315         tso->sp += 3;
316         // fall through...
317     }
318     case BlockedOnBlackHole:
319     case BlockedOnSTM:
320     {
321         // just run the thread now, if the BH is not really available,
322         // we'll block again.
323         tso->why_blocked = NotBlocked;
324         appendToRunQueue(cap,tso);
325         break;
326     }
327     default:
328         // otherwise, do nothing
329         break;
330     }
331 }
332
333 /* ----------------------------------------------------------------------------
334    awakenBlockedQueue
335
336    wakes up all the threads on the specified queue.
337    ------------------------------------------------------------------------- */
338
339 void
340 wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
341 {
342     MessageBlackHole *msg;
343     const StgInfoTable *i;
344
345     ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info  ||
346            bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info  );
347
348     for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE; 
349          msg = msg->link) {
350         i = msg->header.info;
351         if (i != &stg_IND_info) {
352             ASSERT(i == &stg_MSG_BLACKHOLE_info);
353             tryWakeupThread(cap,msg->tso);
354         }
355     }
356
357     // overwrite the BQ with an indirection so it will be
358     // collected at the next GC.
359 #if defined(DEBUG) && !defined(THREADED_RTS)
360     // XXX FILL_SLOP, but not if THREADED_RTS because in that case
361     // another thread might be looking at this BLOCKING_QUEUE and
362     // checking the owner field at the same time.
363     bq->bh = 0; bq->queue = 0; bq->owner = 0;
364 #endif
365     OVERWRITE_INFO(bq, &stg_IND_info);
366 }
367
368 // If we update a closure that we know we BLACKHOLE'd, and the closure
369 // no longer points to the current TSO as its owner, then there may be
370 // an orphaned BLOCKING_QUEUE closure with blocked threads attached to
371 // it.  We therefore traverse the BLOCKING_QUEUEs attached to the
372 // current TSO to see if any can now be woken up.
373 void
374 checkBlockingQueues (Capability *cap, StgTSO *tso)
375 {
376     StgBlockingQueue *bq, *next;
377     StgClosure *p;
378
379     debugTraceCap(DEBUG_sched, cap,
380                   "collision occurred; checking blocking queues for thread %ld",
381                   (lnat)tso->id);
382     
383     for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
384         next = bq->link;
385
386         if (bq->header.info == &stg_IND_info) {
387             // ToDo: could short it out right here, to avoid
388             // traversing this IND multiple times.
389             continue;
390         }
391         
392         p = bq->bh;
393
394         if (p->header.info != &stg_BLACKHOLE_info ||
395             ((StgInd *)p)->indirectee != (StgClosure*)bq)
396         {
397             wakeBlockingQueue(cap,bq);
398         }   
399     }
400 }
401
402 /* ----------------------------------------------------------------------------
403    updateThunk
404
405    Update a thunk with a value.  In order to do this, we need to know
406    which TSO owns (or is evaluating) the thunk, in case we need to
407    awaken any threads that are blocked on it.
408    ------------------------------------------------------------------------- */
409
410 void
411 updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
412 {
413     StgClosure *v;
414     StgTSO *owner;
415     const StgInfoTable *i;
416
417     i = thunk->header.info;
418     if (i != &stg_BLACKHOLE_info &&
419         i != &stg_CAF_BLACKHOLE_info &&
420         i != &stg_WHITEHOLE_info) {
421         updateWithIndirection(cap, thunk, val);
422         return;
423     }
424     
425     v = ((StgInd*)thunk)->indirectee;
426
427     updateWithIndirection(cap, thunk, val);
428
429     i = v->header.info;
430     if (i == &stg_TSO_info) {
431         owner = deRefTSO((StgTSO*)v);
432         if (owner != tso) {
433             checkBlockingQueues(cap, tso);
434         }
435         return;
436     }
437
438     if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
439         i != &stg_BLOCKING_QUEUE_DIRTY_info) {
440         checkBlockingQueues(cap, tso);
441         return;
442     }
443
444     owner = deRefTSO(((StgBlockingQueue*)v)->owner);
445
446     if (owner != tso) {
447         checkBlockingQueues(cap, tso);
448     } else {
449         wakeBlockingQueue(cap, (StgBlockingQueue*)v);
450     }
451 }
452
453 /* ----------------------------------------------------------------------------
454  * Wake up a thread on a Capability.
455  *
456  * This is used when the current Task is running on a Capability and
457  * wishes to wake up a thread on a different Capability.
458  * ------------------------------------------------------------------------- */
459
460 #ifdef THREADED_RTS
461
462 void
463 wakeupThreadOnCapability (Capability *cap,
464                           Capability *other_cap, 
465                           StgTSO *tso)
466 {
467     MessageWakeup *msg;
468
469     // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
470     if (tso->bound) {
471         ASSERT(tso->bound->task->cap == tso->cap);
472         tso->bound->task->cap = other_cap;
473     }
474     tso->cap = other_cap;
475
476     ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
477            tso->block_info.closure->header.info == &stg_IND_info);
478
479     ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
480
481     msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
482     SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM);
483     msg->tso = tso;
484     tso->block_info.closure = (StgClosure *)msg;
485     dirty_TSO(cap, tso);
486     write_barrier();
487     tso->why_blocked = BlockedOnMsgWakeup;
488
489     sendMessage(cap, other_cap, (Message*)msg);
490 }
491
492 #endif /* THREADED_RTS */
493
494 /* ---------------------------------------------------------------------------
495  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
496  * used by Control.Concurrent for error checking.
497  * ------------------------------------------------------------------------- */
498  
499 HsBool
500 rtsSupportsBoundThreads(void)
501 {
502 #if defined(THREADED_RTS)
503   return HS_BOOL_TRUE;
504 #else
505   return HS_BOOL_FALSE;
506 #endif
507 }
508
509 /* ---------------------------------------------------------------------------
510  * isThreadBound(tso): check whether tso is bound to an OS thread.
511  * ------------------------------------------------------------------------- */
512  
513 StgBool
514 isThreadBound(StgTSO* tso USED_IF_THREADS)
515 {
516 #if defined(THREADED_RTS)
517   return (tso->bound != NULL);
518 #endif
519   return rtsFalse;
520 }
521
522 /* ----------------------------------------------------------------------------
523  * Debugging: why is a thread blocked
524  * ------------------------------------------------------------------------- */
525
526 #if DEBUG
527 void
528 printThreadBlockage(StgTSO *tso)
529 {
530   switch (tso->why_blocked) {
531   case BlockedOnRead:
532     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
533     break;
534   case BlockedOnWrite:
535     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
536     break;
537 #if defined(mingw32_HOST_OS)
538     case BlockedOnDoProc:
539     debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
540     break;
541 #endif
542   case BlockedOnDelay:
543     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
544     break;
545   case BlockedOnMVar:
546     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
547     break;
548   case BlockedOnBlackHole:
549       debugBelch("is blocked on a black hole %p", 
550                  ((StgBlockingQueue*)tso->block_info.bh->bh));
551     break;
552   case BlockedOnMsgWakeup:
553     debugBelch("is blocked on a wakeup message");
554     break;
555   case BlockedOnMsgThrowTo:
556     debugBelch("is blocked on a throwto message");
557     break;
558   case NotBlocked:
559     debugBelch("is not blocked");
560     break;
561   case BlockedOnCCall:
562     debugBelch("is blocked on an external call");
563     break;
564   case BlockedOnCCall_NoUnblockExc:
565     debugBelch("is blocked on an external call (exceptions were already blocked)");
566     break;
567   case BlockedOnSTM:
568     debugBelch("is blocked on an STM operation");
569     break;
570   default:
571     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
572          tso->why_blocked, tso->id, tso);
573   }
574 }
575
576
577 void
578 printThreadStatus(StgTSO *t)
579 {
580   debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
581     {
582       void *label = lookupThreadLabel(t->id);
583       if (label) debugBelch("[\"%s\"] ",(char *)label);
584     }
585     if (t->what_next == ThreadRelocated) {
586         debugBelch("has been relocated...\n");
587     } else {
588         switch (t->what_next) {
589         case ThreadKilled:
590             debugBelch("has been killed");
591             break;
592         case ThreadComplete:
593             debugBelch("has completed");
594             break;
595         default:
596             printThreadBlockage(t);
597         }
598         if (t->dirty) {
599             debugBelch(" (TSO_DIRTY)");
600         } else if (t->flags & TSO_LINK_DIRTY) {
601             debugBelch(" (TSO_LINK_DIRTY)");
602         }
603         debugBelch("\n");
604     }
605 }
606
607 void
608 printAllThreads(void)
609 {
610   StgTSO *t, *next;
611   nat i, g;
612   Capability *cap;
613
614   debugBelch("all threads:\n");
615
616   for (i = 0; i < n_capabilities; i++) {
617       cap = &capabilities[i];
618       debugBelch("threads on capability %d:\n", cap->no);
619       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
620           printThreadStatus(t);
621       }
622   }
623
624   debugBelch("other threads:\n");
625   for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
626     for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
627       if (t->why_blocked != NotBlocked) {
628           printThreadStatus(t);
629       }
630       if (t->what_next == ThreadRelocated) {
631           next = t->_link;
632       } else {
633           next = t->global_link;
634       }
635     }
636   }
637 }
638
639 // useful from gdb
640 void 
641 printThreadQueue(StgTSO *t)
642 {
643     nat i = 0;
644     for (; t != END_TSO_QUEUE; t = t->_link) {
645         printThreadStatus(t);
646         i++;
647     }
648     debugBelch("%d threads on queue\n", i);
649 }
650
651 #endif /* DEBUG */