1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 1998-2005
5 * The scheduler and thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
24 #include "RtsSignals.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
34 #include "Proftimer.h"
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
47 #include "Capability.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
69 // Turn off inlining when debugging - it obfuscates things
72 # define STATIC_INLINE static
76 #define USED_WHEN_THREADED_RTS
77 #define USED_WHEN_NON_THREADED_RTS STG_UNUSED
79 #define USED_WHEN_THREADED_RTS STG_UNUSED
80 #define USED_WHEN_NON_THREADED_RTS
86 #define USED_WHEN_SMP STG_UNUSED
89 /* -----------------------------------------------------------------------------
91 * -------------------------------------------------------------------------- */
95 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
96 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
99 In GranSim we have a runnable and a blocked queue for each processor.
100 In order to minimise code changes, new arrays run_queue_hds/tls
101 are created. run_queue_hd is then a shortcut (macro) for
102 run_queue_hds[CurrentProc] (see GranSim.h).
105 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
106 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
107 StgTSO *ccalling_threadss[MAX_PROC];
108 /* We use the same global list of threads (all_threads) in GranSim as in
109 the std RTS (i.e. we are cheating). However, we don't use this list in
110 the GranSim specific code at the moment (so we are only potentially cheating).
115 #if !defined(THREADED_RTS)
116 // Blocked/sleeping threads
117 StgTSO *blocked_queue_hd = NULL;
118 StgTSO *blocked_queue_tl = NULL;
119 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
122 /* Threads blocked on blackholes.
123 * LOCK: sched_mutex+capability, or all capabilities
125 StgTSO *blackhole_queue = NULL;
128 /* The blackhole_queue should be checked for threads to wake up. See
129 * Schedule.h for a more thorough comment.
130 * LOCK: none (doesn't matter if we miss an update)
132 rtsBool blackholes_need_checking = rtsFalse;
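// Typical access pattern (see scheduleCheckBlackHoles() below): test
// the flag without the lock first, then re-test under sched_mutex
// before traversing the queue.  Missing an update is benign; it only
// delays a wakeup until the next check.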
134 /* Linked list of all threads.
135 * Used for detecting garbage collected threads.
136 * LOCK: sched_mutex+capability, or all capabilities
138 StgTSO *all_threads = NULL;
140 /* flag set by signal handler to precipitate a context switch
141 * LOCK: none (just an advisory flag)
143 int context_switch = 0;
145 /* flag that tracks whether we have done any execution in this time slice.
146 * LOCK: currently none, perhaps we should lock (but needs to be
147 * updated in the fast path of the scheduler).
149 nat recent_activity = ACTIVITY_YES;
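// As used below: reset to ACTIVITY_YES each time we run a thread; if
// it has decayed to ACTIVITY_INACTIVE (a whole timeslice with no
// activity), scheduleDetectDeadlock() forces a major GC and then sets
// ACTIVITY_DONE_GC.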
151 /* if this flag is set as well, give up execution
152 * LOCK: none (changes once, from false->true)
154 rtsBool interrupted = rtsFalse;
156 /* Next thread ID to allocate.
159 static StgThreadID next_thread_id = 1;
161 /* The smallest stack size that makes any sense is:
162 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
163 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
164 * + 1 (the closure to enter)
166 * + 1 (spare slot req'd by stg_ap_v_ret)
168 * A thread with this stack will bomb immediately with a stack
169 * overflow, which will increase its stack size.
171 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
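// The trailing + 3 is the sum of the one-word (+ 1) items enumerated
// above.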
177 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
178 * exists - earlier gccs apparently didn't.
184 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
185 * in an MT setting, needed to signal that a worker thread shouldn't hang around
186 * in the scheduler when it is out of work.
188 rtsBool shutting_down_scheduler = rtsFalse;
191 * This mutex protects most of the global scheduler data in
192 * the THREADED_RTS (inc. SMP) runtime.
194 #if defined(THREADED_RTS)
198 #if defined(PARALLEL_HASKELL)
200 rtsTime TimeOfLastYield;
201 rtsBool emitSchedule = rtsTrue;
204 /* -----------------------------------------------------------------------------
205 * static function prototypes
206 * -------------------------------------------------------------------------- */
208 static Capability *schedule (Capability *initialCapability, Task *task);
211 // These functions all encapsulate parts of the scheduler loop, and are
212 // abstracted only to make the structure and control flow of the
213 // scheduler clearer.
215 static void schedulePreLoop (void);
217 static void schedulePushWork(Capability *cap, Task *task);
219 static void scheduleStartSignalHandlers (Capability *cap);
220 static void scheduleCheckBlockedThreads (Capability *cap);
221 static void scheduleCheckBlackHoles (Capability *cap);
222 static void scheduleDetectDeadlock (Capability *cap, Task *task);
224 static StgTSO *scheduleProcessEvent(rtsEvent *event);
226 #if defined(PARALLEL_HASKELL)
227 static StgTSO *scheduleSendPendingMessages(void);
228 static void scheduleActivateSpark(void);
229 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
231 #if defined(PAR) || defined(GRAN)
232 static void scheduleGranParReport(void);
234 static void schedulePostRunThread(void);
235 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
236 static void scheduleHandleStackOverflow( Capability *cap, Task *task,
238 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
239 nat prev_what_next );
240 static void scheduleHandleThreadBlocked( StgTSO *t );
241 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
243 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
244 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
246 static void unblockThread(Capability *cap, StgTSO *tso);
247 static rtsBool checkBlackHoles(Capability *cap);
248 static void AllRoots(evac_fn evac);
250 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
252 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
253 rtsBool stop_at_atomically, StgPtr stop_here);
255 static void deleteThread (Capability *cap, StgTSO *tso);
256 static void deleteRunQueue (Capability *cap);
259 static void printThreadBlockage(StgTSO *tso);
260 static void printThreadStatus(StgTSO *tso);
261 void printThreadQueue(StgTSO *tso);
264 #if defined(PARALLEL_HASKELL)
265 StgTSO * createSparkThread(rtsSpark spark);
266 StgTSO * activateSpark (rtsSpark spark);
270 static char *whatNext_strs[] = {
280 /* -----------------------------------------------------------------------------
281 * Putting a thread on the run queue: different scheduling policies
282 * -------------------------------------------------------------------------- */
285 addToRunQueue( Capability *cap, StgTSO *t )
287 #if defined(PARALLEL_HASKELL)
288 if (RtsFlags.ParFlags.doFairScheduling) {
289 // this does round-robin scheduling; good for concurrency
290 appendToRunQueue(cap,t);
292 // this does unfair scheduling; good for parallelism
293 pushOnRunQueue(cap,t);
296 // this does round-robin scheduling; good for concurrency
297 appendToRunQueue(cap,t);
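// Note: appendToRunQueue adds at the tail of the queue, so runnable
// threads take turns; pushOnRunQueue adds at the head, so the same
// thread tends to keep running (better locality, less fairness).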
301 /* ---------------------------------------------------------------------------
302 Main scheduling loop.
304 We use round-robin scheduling, each thread returning to the
305 scheduler loop when one of these conditions is detected:
308 * timer expires (thread yields)
314 In a GranSim setup this loop iterates over the global event queue,
315 which determines what to do next.
316 Therefore, it's more complicated than either the
317 concurrent or the parallel (GUM) setup.
320 GUM iterates over incoming messages.
321 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
322 and sends out a fish whenever it has nothing to do; in-between
323 doing the actual reductions (shared code below) it processes the
324 incoming messages and deals with delayed operations
325 (see PendingFetches).
326 This is not the ugliest code you could imagine, but it's bloody close.
328 ------------------------------------------------------------------------ */
331 schedule (Capability *initialCapability, Task *task)
335 StgThreadReturnCode ret;
338 #elif defined(PARALLEL_HASKELL)
341 rtsBool receivedFinish = rtsFalse;
343 nat tp_size, sp_size; // stats only
348 #if defined(THREADED_RTS)
349 rtsBool first = rtsTrue;
352 cap = initialCapability;
354 // Pre-condition: this task owns initialCapability.
355 // The sched_mutex is *NOT* held
356 // NB. on return, we still hold a capability.
359 sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
360 task, initialCapability);
365 // -----------------------------------------------------------
366 // Scheduler loop starts here:
368 #if defined(PARALLEL_HASKELL)
369 #define TERMINATION_CONDITION (!receivedFinish)
371 #define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL)
373 #define TERMINATION_CONDITION rtsTrue
376 while (TERMINATION_CONDITION) {
379 /* Choose the processor with the next event */
380 CurrentProc = event->proc;
381 CurrentTSO = event->tso;
384 #if defined(THREADED_RTS)
386 // don't yield the first time, we want a chance to run this
387 // thread for a bit, even if there are others banging at the door.
390 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
392 // Yield the capability to higher-priority tasks if necessary.
393 yieldCapability(&cap, task);
398 schedulePushWork(cap,task);
401 // Check whether we have re-entered the RTS from Haskell without
402 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
404 if (cap->in_haskell) {
405 errorBelch("schedule: re-entered unsafely.\n"
406 " Perhaps a 'foreign import unsafe' should be 'safe'?");
407 stg_exit(EXIT_FAILURE);
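// (How this can happen, sketched as an assumption: in_haskell is set
// below for the duration of StgRun, so a C function entered via
// 'foreign import unsafe' that calls back into Haskell re-enters
// schedule() with the flag still set, and is caught here.)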
411 // Test for interruption. If interrupted==rtsTrue, then either
412 // we received a keyboard interrupt (^C), or the scheduler is
413 // trying to shut down all the tasks (shutting_down_scheduler) in the system.
419 discardSparksCap(cap);
421 if (shutting_down_scheduler) {
422 IF_DEBUG(scheduler, sched_belch("shutting down"));
423 // If we are a worker, just exit. If we're a bound thread
424 // then we will exit below when we've removed our TSO from the run queue.
426 if (task->tso == NULL && emptyRunQueue(cap)) {
430 IF_DEBUG(scheduler, sched_belch("interrupted"));
435 // If the run queue is empty, take a spark and turn it into a thread.
437 if (emptyRunQueue(cap)) {
439 spark = findSpark(cap);
442 sched_belch("turning spark of closure %p into a thread",
443 (StgClosure *)spark));
444 createSparkThread(cap,spark);
450 scheduleStartSignalHandlers(cap);
452 // Only check the black holes here if we've nothing else to do.
453 // During normal execution, the black hole list only gets checked
454 // at GC time, to avoid repeatedly traversing this possibly long
455 // list each time around the scheduler.
456 if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
458 scheduleCheckBlockedThreads(cap);
460 scheduleDetectDeadlock(cap,task);
462 // Normally, the only way we can get here with no threads to
463 // run is if a keyboard interrupt was received during
464 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
465 // Additionally, it is not fatal for the
466 // threaded RTS to reach here with no threads to run.
468 // win32: might be here due to awaitEvent() being abandoned
469 // as a result of a console event having been delivered.
470 if ( emptyRunQueue(cap) ) {
471 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
474 continue; // nothing to do
477 #if defined(PARALLEL_HASKELL)
478 scheduleSendPendingMessages();
479 if (emptyRunQueue(cap) && scheduleActivateSpark())
483 ASSERT(next_fish_to_send_at==0); // i.e. no delayed fishes left!
486 /* If we still have no work, we need to send a FISH to get a spark
488 if (emptyRunQueue(cap)) {
489 if (!scheduleGetRemoteWork(&receivedFinish)) continue;
490 ASSERT(rtsFalse); // should not happen at the moment
492 // from here: non-empty run queue.
493 // TODO: merge the above case with this one; only one call to processMessages()!
494 if (PacketsWaiting()) { /* process incoming messages, if
495 any pending... only in else
496 because getRemoteWork waits for
498 receivedFinish = processMessages();
503 scheduleProcessEvent(event);
507 // Get a thread to run
509 t = popRunQueue(cap);
511 #if defined(GRAN) || defined(PAR)
512 scheduleGranParReport(); // some kind of debugging output
514 // Sanity check the thread we're about to run. This can be
515 // expensive if there is lots of thread switching going on...
516 IF_DEBUG(sanity,checkTSO(t));
519 #if defined(THREADED_RTS)
520 // Check whether we can run this thread in the current task.
521 // If not, we have to pass our capability to the right task.
523 Task *bound = t->bound;
528 sched_belch("### Running thread %d in bound thread",
530 // yes, the Haskell thread is bound to the current native thread
533 sched_belch("### thread %d bound to another OS thread",
535 // no, bound to a different Haskell thread: pass to that thread
536 pushOnRunQueue(cap,t);
540 // The thread we want to run is unbound.
543 sched_belch("### this OS thread cannot run thread %d", t->id));
544 // no, the current native thread is bound to a different
545 // Haskell thread, so pass it to any worker thread
546 pushOnRunQueue(cap,t);
553 cap->r.rCurrentTSO = t;
555 /* context switches are initiated by the timer signal, unless
556 * the user specified "context switch as often as possible", with
559 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
560 && !emptyThreadQueues(cap)) {
566 IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
567 (long)t->id, whatNext_strs[t->what_next]));
569 #if defined(PROFILING)
570 startHeapProfTimer();
573 // ----------------------------------------------------------------------
574 // Run the current thread
576 prev_what_next = t->what_next;
578 errno = t->saved_errno;
579 cap->in_haskell = rtsTrue;
581 recent_activity = ACTIVITY_YES;
583 switch (prev_what_next) {
587 /* Thread already finished, return to scheduler. */
588 ret = ThreadFinished;
594 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
595 cap = regTableToCapability(r);
600 case ThreadInterpret:
601 cap = interpretBCO(cap);
606 barf("schedule: invalid what_next field");
609 cap->in_haskell = rtsFalse;
611 // The TSO might have moved, eg. if it re-entered the RTS and a GC
612 // happened. So find the new location:
613 t = cap->r.rCurrentTSO;
615 // We have run some Haskell code: there might be blackhole-blocked
616 // threads to wake up now.
617 // Lock-free test here should be ok, we're just setting a flag.
618 if ( blackhole_queue != END_TSO_QUEUE ) {
619 blackholes_need_checking = rtsTrue;
622 // And save the current errno in this thread.
623 // XXX: possibly bogus for SMP because this thread might already
624 // be running again, see code below.
625 t->saved_errno = errno;
628 // If ret is ThreadBlocked, and this Task is bound to the TSO that
629 // blocked, we are in limbo - the TSO is now owned by whatever it
630 // is blocked on, and may in fact already have been woken up,
631 // perhaps even on a different Capability. It may be the case
632 // that task->cap != cap.  We had better yield this Capability
633 // immediately and return to normality.
634 if (ret == ThreadBlocked) {
636 debugBelch("--<< thread %d (%s) stopped: blocked\n",
637 t->id, whatNext_strs[t->what_next]));
642 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
644 // ----------------------------------------------------------------------
646 // Costs for the scheduler are assigned to CCS_SYSTEM
647 #if defined(PROFILING)
652 #if defined(THREADED_RTS)
653 IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
654 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
655 IF_DEBUG(scheduler,debugBelch("sched: "););
658 schedulePostRunThread();
660 ready_to_gc = rtsFalse;
664 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
668 scheduleHandleStackOverflow(cap,task,t);
672 if (scheduleHandleYield(cap, t, prev_what_next)) {
673 // shortcut for switching between compiler/interpreter:
679 scheduleHandleThreadBlocked(t);
683 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
684 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
688 barf("schedule: invalid thread return code %d", (int)ret);
691 if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
692 if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
693 } /* end of while() */
695 IF_PAR_DEBUG(verbose,
696 debugBelch("== Leaving schedule() after having received Finish\n"));
699 /* ----------------------------------------------------------------------------
700 * Setting up the scheduler loop
701 * ------------------------------------------------------------------------- */
704 schedulePreLoop(void)
707 /* set up first event to get things going */
708 /* ToDo: assign costs for system setup and init MainTSO ! */
709 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
711 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
714 debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n",
716 G_TSO(CurrentTSO, 5));
718 if (RtsFlags.GranFlags.Light) {
719 /* Save current time; GranSim Light only */
720 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
725 /* -----------------------------------------------------------------------------
728 * Push work to other Capabilities if we have some.
729 * -------------------------------------------------------------------------- */
733 schedulePushWork(Capability *cap USED_WHEN_SMP,
734 Task *task USED_WHEN_SMP)
736 Capability *free_caps[n_capabilities], *cap0;
739 // Check whether we have more threads on our run queue, or sparks
740 // in our pool, that we could hand to another Capability.
741 if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
742 && sparkPoolSizeCap(cap) < 2) {
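// i.e. we have at most one runnable thread and at most one spark:
// nothing to spare for another Capability.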
746 // First grab as many free Capabilities as we can.
747 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
748 cap0 = &capabilities[i];
749 if (cap != cap0 && tryGrabCapability(cap0,task)) {
750 if (!emptyRunQueue(cap0) || cap0->returning_tasks_hd != NULL) {
751 // it already has some work, we just grabbed it at
752 // the wrong moment. Or maybe it's deadlocked!
753 releaseCapability(cap0);
755 free_caps[n_free_caps++] = cap0;
760 // we now have n_free_caps free capabilities stashed in
761 // free_caps[]. Share our run queue equally with them. This is
762 // probably the simplest thing we could do; improvements we might
763 // want to do include:
765 // - giving high priority to moving relatively new threads, on
766 // the grounds that they haven't had time to build up a
767 // working set in the cache on this CPU/Capability.
769 // - giving low priority to moving long-lived threads
771 if (n_free_caps > 0) {
772 StgTSO *prev, *t, *next;
773 rtsBool pushed_to_all;
775 IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
778 pushed_to_all = rtsFalse;
780 if (cap->run_queue_hd != END_TSO_QUEUE) {
781 prev = cap->run_queue_hd;
783 prev->link = END_TSO_QUEUE;
784 for (; t != END_TSO_QUEUE; t = next) {
786 t->link = END_TSO_QUEUE;
787 if (t->what_next == ThreadRelocated
788 || t->bound == task) { // don't move my bound thread
791 } else if (i == n_free_caps) {
792 pushed_to_all = rtsTrue;
798 IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
799 appendToRunQueue(free_caps[i],t);
800 if (t->bound) { t->bound->cap = free_caps[i]; }
804 cap->run_queue_tl = prev;
807 // If there are some free capabilities that we didn't push any
808 // threads to, then try to push a spark to each one.
809 if (!pushed_to_all) {
811 // i is the next free capability to push to
812 for (; i < n_free_caps; i++) {
813 if (emptySparkPoolCap(free_caps[i])) {
814 spark = findSpark(cap);
816 IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
817 newSpark(&(free_caps[i]->r), spark);
823 // release the capabilities
824 for (i = 0; i < n_free_caps; i++) {
825 task->cap = free_caps[i];
826 releaseCapability(free_caps[i]);
829 task->cap = cap; // reset to point to our Capability.
833 /* ----------------------------------------------------------------------------
834 * Start any pending signal handlers
835 * ------------------------------------------------------------------------- */
837 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
839 scheduleStartSignalHandlers(Capability *cap)
841 if (signals_pending()) { // safe outside the lock
842 startSignalHandlers(cap);
847 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
852 /* ----------------------------------------------------------------------------
853 * Check for blocked threads that can be woken up.
854 * ------------------------------------------------------------------------- */
857 scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
859 #if !defined(THREADED_RTS)
861 // Check whether any waiting threads need to be woken up. If the
862 // run queue is empty, and there are no other tasks running, we
863 // can wait indefinitely for something to happen.
865 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
867 awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
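// The argument tells awaitEvent() whether it may block indefinitely:
// that is only safe when we have no runnable threads and no pending
// blackhole checks.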
873 /* ----------------------------------------------------------------------------
874 * Check for threads blocked on BLACKHOLEs that can be woken up
875 * ------------------------------------------------------------------------- */
877 scheduleCheckBlackHoles (Capability *cap)
879 if ( blackholes_need_checking ) // check without the lock first
881 ACQUIRE_LOCK(&sched_mutex);
882 if ( blackholes_need_checking ) {
883 checkBlackHoles(cap);
884 blackholes_need_checking = rtsFalse;
886 RELEASE_LOCK(&sched_mutex);
890 /* ----------------------------------------------------------------------------
891 * Detect deadlock conditions and attempt to resolve them.
892 * ------------------------------------------------------------------------- */
895 scheduleDetectDeadlock (Capability *cap, Task *task)
898 #if defined(PARALLEL_HASKELL)
899 // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
904 * Detect deadlock: when we have no threads to run, there are no
905 * threads blocked, waiting for I/O, or sleeping, and all the
906 * other tasks are waiting for work, we must have a deadlock of
909 if ( emptyThreadQueues(cap) )
911 #if defined(THREADED_RTS)
913 * In the threaded RTS, we only check for deadlock if there
914 * has been no activity in a complete timeslice. This means
915 * we won't eagerly start a full GC just because we don't have
916 * any threads to run currently.
918 if (recent_activity != ACTIVITY_INACTIVE) return;
921 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
923 // Garbage collection can release some new threads due to
924 // either (a) finalizers or (b) threads resurrected because
925 // they are unreachable and will therefore be sent an
926 // exception. Any threads thus released will be immediately
928 scheduleDoGC( cap, task, rtsTrue/*force major GC*/ );
929 recent_activity = ACTIVITY_DONE_GC;
931 if ( !emptyRunQueue(cap) ) return;
933 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
934 /* If we have user-installed signal handlers, then wait
935 * for signals to arrive rather than bombing out with a
938 if ( anyUserHandlers() ) {
940 sched_belch("still deadlocked, waiting for signals..."));
944 if (signals_pending()) {
945 startSignalHandlers(cap);
948 // either we have threads to run, or we were interrupted:
949 ASSERT(!emptyRunQueue(cap) || interrupted);
953 #if !defined(THREADED_RTS)
954 /* Probably a real deadlock. Send the current main thread the
955 * Deadlock exception.
958 switch (task->tso->why_blocked) {
960 case BlockedOnBlackHole:
961 case BlockedOnException:
963 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
966 barf("deadlock: main thread blocked in a strange way");
974 /* ----------------------------------------------------------------------------
975 * Process an event (GRAN only)
976 * ------------------------------------------------------------------------- */
980 scheduleProcessEvent(rtsEvent *event)
984 if (RtsFlags.GranFlags.Light)
985 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
987 /* adjust time based on time-stamp */
988 if (event->time > CurrentTime[CurrentProc] &&
989 event->evttype != ContinueThread)
990 CurrentTime[CurrentProc] = event->time;
992 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
993 if (!RtsFlags.GranFlags.Light)
996 IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
998 /* main event dispatcher in GranSim */
999 switch (event->evttype) {
1000 /* Should just be continuing execution */
1001 case ContinueThread:
1002 IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1003 /* ToDo: check assertion
1004 ASSERT(run_queue_hd != (StgTSO*)NULL &&
1005 run_queue_hd != END_TSO_QUEUE);
1007 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1008 if (!RtsFlags.GranFlags.DoAsyncFetch &&
1009 procStatus[CurrentProc]==Fetching) {
1010 debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1011 CurrentTSO->id, CurrentTSO, CurrentProc);
1014 /* Ignore ContinueThreads for completed threads */
1015 if (CurrentTSO->what_next == ThreadComplete) {
1016 debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
1017 CurrentTSO->id, CurrentTSO, CurrentProc);
1020 /* Ignore ContinueThreads for threads that are being migrated */
1021 if (PROCS(CurrentTSO)==Nowhere) {
1022 debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1023 CurrentTSO->id, CurrentTSO, CurrentProc);
1026 /* The thread should be at the beginning of the run queue */
1027 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
1028 debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1029 CurrentTSO->id, CurrentTSO, CurrentProc);
1030 break; // run the thread anyway
1033 new_event(proc, proc, CurrentTime[proc],
1035 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1037 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1038 break; // now actually run the thread; DaH Qu'vam yImuHbej
1041 do_the_fetchnode(event);
1042 goto next_thread; /* handle next event in event queue */
1045 do_the_globalblock(event);
1046 goto next_thread; /* handle next event in event queue */
1049 do_the_fetchreply(event);
1050 goto next_thread; /* handle next event in event queue */
1052 case UnblockThread: /* Move from the blocked queue to the tail of */
1053 do_the_unblock(event);
1054 goto next_thread; /* handle next event in event queue */
1056 case ResumeThread: /* Move from the blocked queue to the tail of */
1057 /* the runnable queue ( i.e. Qu' SImqa'lu') */
1058 event->tso->gran.blocktime +=
1059 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1060 do_the_startthread(event);
1061 goto next_thread; /* handle next event in event queue */
1064 do_the_startthread(event);
1065 goto next_thread; /* handle next event in event queue */
1068 do_the_movethread(event);
1069 goto next_thread; /* handle next event in event queue */
1072 do_the_movespark(event);
1073 goto next_thread; /* handle next event in event queue */
1076 do_the_findwork(event);
1077 goto next_thread; /* handle next event in event queue */
1080 barf("Illegal event type %u\n", event->evttype);
1083 /* This point was scheduler_loop in the old RTS */
1085 IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1087 TimeOfLastEvent = CurrentTime[CurrentProc];
1088 TimeOfNextEvent = get_time_of_next_event();
1089 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1090 // CurrentTSO = ThreadQueueHd;
1092 IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
1095 if (RtsFlags.GranFlags.Light)
1096 GranSimLight_leave_system(event, &ActiveTSO);
1098 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1101 debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1103 /* in a GranSim setup the TSO stays on the run queue */
1105 /* Take a thread from the run queue. */
1106 POP_RUN_QUEUE(t); // take_off_run_queue(t);
1109 debugBelch("GRAN: About to run current thread, which is\n");
1112 context_switch = 0; // turned on via GranYield, checking events and time slice
1115 DumpGranEvent(GR_SCHEDULE, t));
1117 procStatus[CurrentProc] = Busy;
1121 /* ----------------------------------------------------------------------------
1122 * Send pending messages (PARALLEL_HASKELL only)
1123 * ------------------------------------------------------------------------- */
1125 #if defined(PARALLEL_HASKELL)
1127 scheduleSendPendingMessages(void)
1133 # if defined(PAR) // global Mem.Mgmt., omit for now
1134 if (PendingFetches != END_BF_QUEUE) {
1139 if (RtsFlags.ParFlags.BufferTime) {
1140 // if we use message buffering, we must send away all message
1141 // packets which have become too old...
1147 /* ----------------------------------------------------------------------------
1148 * Activate spark threads (PARALLEL_HASKELL only)
1149 * ------------------------------------------------------------------------- */
1151 #if defined(PARALLEL_HASKELL)
1153 scheduleActivateSpark(void)
1156 ASSERT(emptyRunQueue());
1157 /* We get here if the run queue is empty and want some work.
1158 We try to turn a spark into a thread, and add it to the run queue,
1159 from where it will be picked up in the next iteration of the scheduler
1163 /* :-[ no local threads => look out for local sparks */
1164 /* the spark pool for the current PE */
1165 pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1166 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1167 pool->hd < pool->tl) {
1169 * ToDo: add GC code check that we really have enough heap afterwards!!
1171 * If we're here (no runnable threads) and we have pending
1172 * sparks, we must have a space problem. Get enough space
1173 * to turn one of those pending sparks into a
1177 spark = findSpark(rtsFalse); /* get a spark */
1178 if (spark != (rtsSpark) NULL) {
1179 tso = createThreadFromSpark(spark); /* turn the spark into a thread */
1180 IF_PAR_DEBUG(fish, // schedule,
1181 debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1182 tso->id, tso, advisory_thread_count));
1184 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1185 IF_PAR_DEBUG(fish, // schedule,
1186 debugBelch("==^^ failed to create thread from spark @ %lx\n",
1188 return rtsFalse; /* failed to generate a thread */
1189 } /* otherwise fall through & pick up new tso */
1191 IF_PAR_DEBUG(fish, // schedule,
1192 debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
1193 spark_queue_len(pool)));
1194 return rtsFalse; /* failed to generate a thread */
1196 return rtsTrue; /* success in generating a thread */
1197 } else { /* no more threads permitted or pool empty */
1198 return rtsFalse; /* failed to generateThread */
1201 tso = NULL; // avoid compiler warning only
1202 return rtsFalse; /* dummy in non-PAR setup */
1205 #endif // PARALLEL_HASKELL
1207 /* ----------------------------------------------------------------------------
1208 * Get work from a remote node (PARALLEL_HASKELL only)
1209 * ------------------------------------------------------------------------- */
1211 #if defined(PARALLEL_HASKELL)
1213 scheduleGetRemoteWork(rtsBool *receivedFinish)
1215 ASSERT(emptyRunQueue());
1217 if (RtsFlags.ParFlags.BufferTime) {
1218 IF_PAR_DEBUG(verbose,
1219 debugBelch("...send all pending data,"));
1222 for (i=1; i<=nPEs; i++)
1223 sendImmediately(i); // send all messages away immediately
1227 //++EDEN++ idle() , i.e. send all buffers, wait for work
1228 // suppress fishing in EDEN... just look for incoming messages
1229 // (blocking receive)
1230 IF_PAR_DEBUG(verbose,
1231 debugBelch("...wait for incoming messages...\n"));
1232 *receivedFinish = processMessages(); // blocking receive...
1234 // and reenter scheduling loop after having received something
1235 // (return rtsFalse below)
1237 # else /* activate SPARKS machinery */
1238 /* We get here if we have no work and have tried to activate a local spark,
1239 but still have none. We try to get a remote spark by sending a FISH message.
1240 Thread migration should be added here, and triggered when a sequence of
1241 fishes returns without work. */
1242 delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1244 /* =8-[ no local sparks => look for work on other PEs */
1246 * We really have absolutely no work. Send out a fish
1247 * (there may be some out there already), and wait for
1248 * something to arrive. We clearly can't run any threads
1249 * until a SCHEDULE or RESUME arrives, and so that's what
1250 * we're hoping to see. (Of course, we still have to
1251 * respond to other types of messages.)
1253 rtsTime now = msTime() /*CURRENT_TIME*/;
1254 IF_PAR_DEBUG(verbose,
1255 debugBelch("-- now=%ld\n", now));
1256 IF_PAR_DEBUG(fish, // verbose,
1257 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1258 (last_fish_arrived_at!=0 &&
1259 last_fish_arrived_at+delay > now)) {
1260 debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1261 now, last_fish_arrived_at+delay,
1262 last_fish_arrived_at,
1266 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1267 advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1268 if (last_fish_arrived_at==0 ||
1269 (last_fish_arrived_at+delay <= now)) { // send FISH now!
1270 /* outstandingFishes is set in sendFish, processFish;
1271 avoid flooding system with fishes via delay */
1272 next_fish_to_send_at = 0;
1274 /* ToDo: this should be done in the main scheduling loop to avoid the
1275 busy wait here; not so bad if fish delay is very small */
1276 int iq = 0; // DEBUGGING -- HWL
1277 next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
1278 /* send a fish when ready, but process messages that arrive in the meantime */
1280 if (PacketsWaiting()) {
1282 *receivedFinish = processMessages();
1285 } while (!*receivedFinish || now<next_fish_to_send_at);
1286 // JB: This means the fish could become obsolete, if we receive
1287 // work. Better check for work again?
1288 // last line: while (!receivedFinish || !haveWork || now<...)
1289 // next line: if (receivedFinish || haveWork )
1291 if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1292 return rtsFalse; // NB: this will leave scheduler loop
1293 // immediately after return!
1295 IF_PAR_DEBUG(fish, // verbose,
1296 debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1300 // JB: IMHO, this should all be hidden inside sendFish(...)
1302 sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
1305 // Global statistics: count no. of fishes
1306 if (RtsFlags.ParFlags.ParStats.Global &&
1307 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1308 globalParStats.tot_fish_mess++;
1312 /* delayed fishes must have been sent by now! */
1313 next_fish_to_send_at = 0;
1316 *receivedFinish = processMessages();
1317 # endif /* SPARKS */
1320 /* NB: this function always returns rtsFalse, meaning the scheduler
1321 loop continues with the next iteration;
1323 a rtsTrue return code would mean success in finding work. We enter this
1324 function only if there is no local work, and thus have to send a fish, which
1325 takes time until it arrives back with work; in the meantime we should
1326 process messages in the main loop.
1329 #endif // PARALLEL_HASKELL
1331 /* ----------------------------------------------------------------------------
1332 * PAR/GRAN: Report stats & debugging info(?)
1333 * ------------------------------------------------------------------------- */
1335 #if defined(PAR) || defined(GRAN)
1337 scheduleGranParReport(void)
1339 ASSERT(run_queue_hd != END_TSO_QUEUE);
1341 /* Take a thread from the run queue, if we have work */
1342 POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
1344 /* If this TSO has got its outport closed in the meantime,
1345 * it mustn't be run. Instead, we have to clean it up as if it had finished.
1346 * It has to be marked as TH_DEAD for this purpose.
1347 * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1349 JB: TODO: investigate whether the state change field could be nuked
1350 entirely and replaced by the normal tso state (whatnext
1351 field). All we want to do is to kill tsos from outside.
1354 /* ToDo: write something to the log-file
1355 if (RTSflags.ParFlags.granSimStats && !sameThread)
1356 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1360 /* the spark pool for the current PE */
1361 pool = &(cap.r.rSparks); // cap = (old) MainCap
1364 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1365 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1368 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1369 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1371 if (RtsFlags.ParFlags.ParStats.Full &&
1372 (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1373 (emitSchedule || // forced emit
1374 (t && LastTSO && t->id != LastTSO->id))) {
1376 we are running a different TSO, so write a schedule event to log file
1377 NB: If we use fair scheduling we also have to write a deschedule
1378 event for LastTSO; with unfair scheduling we know that the
1379 previous tso has blocked whenever we switch to another tso, so
1380 we don't need it in GUM for now
1382 IF_PAR_DEBUG(fish, // schedule,
1383 debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1385 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1386 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1387 emitSchedule = rtsFalse;
1392 /* ----------------------------------------------------------------------------
1393 * After running a thread...
1394 * ------------------------------------------------------------------------- */
1397 schedulePostRunThread(void)
1400 /* HACK 675: if the last thread didn't yield, make sure to print a
1401 SCHEDULE event to the log file when StgRunning the next thread, even
1402 if it is the same one as before */
1404 TimeOfLastYield = CURRENT_TIME;
1407 /* some statistics gathering in the parallel case */
1409 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1413 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1414 globalGranStats.tot_heapover++;
1416 globalParStats.tot_heapover++;
1423 DumpGranEvent(GR_DESCHEDULE, t));
1424 globalGranStats.tot_stackover++;
1427 // DumpGranEvent(GR_DESCHEDULE, t);
1428 globalParStats.tot_stackover++;
1432 case ThreadYielding:
1435 DumpGranEvent(GR_DESCHEDULE, t));
1436 globalGranStats.tot_yields++;
1439 // DumpGranEvent(GR_DESCHEDULE, t);
1440 globalParStats.tot_yields++;
1447 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1448 t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
1449 (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1450 if (t->block_info.closure!=(StgClosure*)NULL)
1451 print_bq(t->block_info.closure);
1454 // ??? needed; should emit block before
1456 DumpGranEvent(GR_DESCHEDULE, t));
1457 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1460 ASSERT(procStatus[CurrentProc]==Busy ||
1461 ((procStatus[CurrentProc]==Fetching) &&
1462 (t->block_info.closure!=(StgClosure*)NULL)));
1463 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1464 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1465 procStatus[CurrentProc]==Fetching))
1466 procStatus[CurrentProc] = Idle;
1469 //++PAR++ blockThread() writes the event (change?)
1473 case ThreadFinished:
1477 barf("parGlobalStats: unknown return code");
1483 /* -----------------------------------------------------------------------------
1484 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1485 * -------------------------------------------------------------------------- */
1488 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1490 // did the task ask for a large block?
1491 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1492 // if so, get one and push it on the front of the nursery.
1496 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
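// Worked example (assuming the common 4096-byte BLOCK_SIZE, which is
// a build-time constant, not guaranteed here): a 10000-byte request
// rounds up to 12288 bytes, i.e. blocks = 3.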
1499 debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1500 (long)t->id, whatNext_strs[t->what_next], blocks));
1502 // don't do this if the nursery is (nearly) full; we'll GC first.
1503 if (cap->r.rCurrentNursery->link != NULL ||
1504 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1505 // if the nursery has only one block.
1508 bd = allocGroup( blocks );
1510 cap->r.rNursery->n_blocks += blocks;
1512 // link the new group into the list
1513 bd->link = cap->r.rCurrentNursery;
1514 bd->u.back = cap->r.rCurrentNursery->u.back;
1515 if (cap->r.rCurrentNursery->u.back != NULL) {
1516 cap->r.rCurrentNursery->u.back->link = bd;
1519 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1520 g0s0 == cap->r.rNursery);
1522 cap->r.rNursery->blocks = bd;
1524 cap->r.rCurrentNursery->u.back = bd;
1526 // initialise it as a nursery block. We initialise the
1527 // step, gen_no, and flags field of *every* sub-block in
1528 // this large block, because this is easier than making
1529 // sure that we always find the block head of a large
1530 // block whenever we call Bdescr() (eg. evacuate() and
1531 // isAlive() in the GC would both have to do this, at
1535 for (x = bd; x < bd + blocks; x++) {
1536 x->step = cap->r.rNursery;
1542 // This assert can be a killer if the app is doing lots
1543 // of large block allocations.
1544 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1546 // now update the nursery to point to the new block
1547 cap->r.rCurrentNursery = bd;
1549 // we might be unlucky and have another thread get on the
1550 // run queue before us and steal the large block, but in that
1551 // case the thread will just end up requesting another large block.
1553 pushOnRunQueue(cap,t);
1554 return rtsFalse; /* not actually GC'ing */
1559 debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
1560 (long)t->id, whatNext_strs[t->what_next]));
1562 ASSERT(!is_on_queue(t,CurrentProc));
1563 #elif defined(PARALLEL_HASKELL)
1564 /* Currently we emit a DESCHEDULE event before GC in GUM.
1565 ToDo: either add separate event to distinguish SYSTEM time from rest
1566 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1567 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1568 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1569 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1570 emitSchedule = rtsTrue;
1574 pushOnRunQueue(cap,t);
1576 /* actual GC is done at the end of the while loop in schedule() */
1579 /* -----------------------------------------------------------------------------
1580 * Handle a thread that returned to the scheduler with ThreadStackOverflow
1581 * -------------------------------------------------------------------------- */
1584 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1586 IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n",
1587 (long)t->id, whatNext_strs[t->what_next]));
1588 /* just adjust the stack for this thread, then pop it back
1592 /* enlarge the stack */
1593 StgTSO *new_t = threadStackOverflow(cap, t);
1595 /* The TSO attached to this Task may have moved, so update the
1598 if (task->tso == t) {
1601 pushOnRunQueue(cap,new_t);
1605 /* -----------------------------------------------------------------------------
1606 * Handle a thread that returned to the scheduler with ThreadYielding
1607 * -------------------------------------------------------------------------- */
1610 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1612 // Reset the context switch flag. We don't do this just before
1613 // running the thread, because that would mean we would lose ticks
1614 // during GC, which can lead to unfair scheduling (a thread hogs
1615 // the CPU because the tick always arrives during GC). This way
1616 // penalises threads that do a lot of allocation, but that seems
1617 // better than the alternative.
1620 /* put the thread back on the run queue. Then, if we're ready to
1621 * GC, check whether this is the last task to stop. If so, wake
1622 * up the GC thread. getThread will block during a GC until the
1626 if (t->what_next != prev_what_next) {
1627 debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n",
1628 (long)t->id, whatNext_strs[t->what_next]);
1630 debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1631 (long)t->id, whatNext_strs[t->what_next]);
1636 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1638 ASSERT(t->link == END_TSO_QUEUE);
1640 // Shortcut if we're just switching evaluators: don't bother
1641 // doing stack squeezing (which can be expensive), just run the
1643 if (t->what_next != prev_what_next) {
1648 ASSERT(!is_on_queue(t,CurrentProc));
1651 //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1652 checkThreadQsSanity(rtsTrue));
1656 addToRunQueue(cap,t);
1659 /* add a ContinueThread event to actually process the thread */
1660 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1662 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1664 debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1671 /* -----------------------------------------------------------------------------
1672 * Handle a thread that returned to the scheduler with ThreadBlocked
1673 * -------------------------------------------------------------------------- */
1676 scheduleHandleThreadBlocked( StgTSO *t
1677 #if !defined(GRAN) && !defined(DEBUG)
1684 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
1685 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1686 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1688 // ??? needed; should emit block before
1690 DumpGranEvent(GR_DESCHEDULE, t));
1691 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1694 ASSERT(procStatus[CurrentProc]==Busy ||
1695 ((procStatus[CurrentProc]==Fetching) &&
1696 (t->block_info.closure!=(StgClosure*)NULL)));
1697 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1698 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1699 procStatus[CurrentProc]==Fetching))
1700 procStatus[CurrentProc] = Idle;
1704 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
1705 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1708 if (t->block_info.closure!=(StgClosure*)NULL)
1709 print_bq(t->block_info.closure));
1711 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1714 /* whatever we schedule next, we must log that schedule */
1715 emitSchedule = rtsTrue;
1719 // We don't need to do anything. The thread is blocked, and it
1720 // has tidied up its stack and placed itself on whatever queue
1721 // it needs to be on.
1724 ASSERT(t->why_blocked != NotBlocked);
1725 // This might not be true under SMP: we don't have
1726 // exclusive access to this TSO, so someone might have
1727 // woken it up by now. This actually happens: try
1728 // conc023 +RTS -N2.
1732 debugBelch("--<< thread %d (%s) stopped: ",
1733 t->id, whatNext_strs[t->what_next]);
1734 printThreadBlockage(t);
1737 /* Only for dumping event to log file
1738 ToDo: do I need this in GranSim, too?
1744 /* -----------------------------------------------------------------------------
1745 * Handle a thread that returned to the scheduler with ThreadFinished
1746 * -------------------------------------------------------------------------- */
1749 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1751 /* Need to check whether this was a main thread, and if so,
1752 * return with the return value.
1754 * We also end up here if the thread kills itself with an
1755 * uncaught exception, see Exception.cmm.
1757 IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n",
1758 t->id, whatNext_strs[t->what_next]));
1761 endThread(t, CurrentProc); // clean-up the thread
1762 #elif defined(PARALLEL_HASKELL)
1763 /* For now all are advisory -- HWL */
1764 //if(t->priority==AdvisoryPriority) ??
1765 advisory_thread_count--; // JB: Caution with this counter, buggy!
1768 if(t->dist.priority==RevalPriority)
1772 # if defined(EDENOLD)
1773 // the thread could still have an outport... (BUG)
1774 if (t->eden.outport != -1) {
1775 // delete the outport for the tso which has finished...
1776 IF_PAR_DEBUG(eden_ports,
1777 debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1778 t->eden.outport, t->id));
1781 // thread still in the process (HEAVY BUG! since outport has just been closed...)
1782 if (t->eden.epid != -1) {
1783 IF_PAR_DEBUG(eden_ports,
1784 debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1785 t->id, t->eden.epid));
1786 removeTSOfromProcess(t);
1791 if (RtsFlags.ParFlags.ParStats.Full &&
1792 !RtsFlags.ParFlags.ParStats.Suppressed)
1793 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1795 // t->par only contains statistics: left out for now...
1797 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1798 t->id,t,t->par.sparkname));
1800 #endif // PARALLEL_HASKELL
1803 // Check whether the thread that just completed was a bound
1804 // thread, and if so return with the result.
1806 // There is an assumption here that all thread completion goes
1807 // through this point; we need to make sure that if a thread
1808 // ends up in the ThreadKilled state, that it stays on the run
1809 // queue so it can be dealt with here.
1814 if (t->bound != task) {
1815 #if !defined(THREADED_RTS)
1816 // Must be a bound thread that is not the topmost one. Leave
1817 // it on the run queue until the stack has unwound to the
1818 // point where we can deal with this. Leaving it on the run
1819 // queue also ensures that the garbage collector knows about
1820 // this thread and its return value (it gets dropped from the
1821 // all_threads list so there's no other way to find it).
1822 appendToRunQueue(cap,t);
1825 // this cannot happen in the threaded RTS, because a
1826 // bound thread can only be run by the appropriate Task.
1827 barf("finished bound thread that isn't mine");
1831 ASSERT(task->tso == t);
1833 if (t->what_next == ThreadComplete) {
1835 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1836 *(task->ret) = (StgClosure *)task->tso->sp[1];
1838 task->stat = Success;
1841 *(task->ret) = NULL;
1844 task->stat = Interrupted;
1846 task->stat = Killed;
1850 removeThreadLabel((StgWord)task->tso->id);
1852 return rtsTrue; // tells schedule() to return
1858 /* -----------------------------------------------------------------------------
1859 * Perform a heap census, if PROFILING
1860 * -------------------------------------------------------------------------- */
1863 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1865 #if defined(PROFILING)
1866 // When we have +RTS -i0 and we're heap profiling, do a census at
1867 // every GC. This lets us get repeatable runs for debugging.
1868 if (performHeapProfile ||
1869 (RtsFlags.ProfFlags.profileInterval==0 &&
1870 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1871 GarbageCollect(GetRoots, rtsTrue);
1873 performHeapProfile = rtsFalse;
1874 return rtsTrue; // true <=> we already GC'd
1880 /* -----------------------------------------------------------------------------
1881 * Perform a garbage collection if necessary
1882 * -------------------------------------------------------------------------- */
1885 scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
1889 static volatile StgWord waiting_for_gc;
1890 rtsBool was_waiting;
1895 // In order to GC, there must be no threads running Haskell code.
1896 // Therefore, the GC thread needs to hold *all* the capabilities,
1897 // and release them after the GC has completed.
1899 // This seems to be the simplest way: previous attempts involved
1900 // making all the threads with capabilities give up their
1901 // capabilities and sleep except for the *last* one, which
1902 // actually did the GC. But it's quite hard to arrange for all
1903 // the other tasks to sleep and stay asleep.
1906 was_waiting = cas(&waiting_for_gc, 0, 1);
1909 IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1910 yieldCapability(&cap,task);
1911 } while (waiting_for_gc);
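// Only the task whose cas() saw 0 proceeds to do the GC; the others
// loop above, yielding their capabilities, until waiting_for_gc is
// cleared again below.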
1915 for (i=0; i < n_capabilities; i++) {
1916 IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities));
1917 if (cap != &capabilities[i]) {
1918 Capability *pcap = &capabilities[i];
1919 // we better hope this task doesn't get migrated to
1920 // another Capability while we're waiting for this one.
1921 // It won't, because load balancing happens while we have
1922 // all the Capabilities, but even so it's a slightly
1923 // unsavoury invariant.
1926 waitForReturnCapability(&pcap, task);
1927 if (pcap != &capabilities[i]) {
1928 barf("scheduleDoGC: got the wrong capability");
1933 waiting_for_gc = rtsFalse;
1936 /* Kick any transactions which are invalid back to their
1937 * atomically frames. When next scheduled they will try to
1938 * commit; this commit will fail and they will retry.
1943 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1944 if (t->what_next == ThreadRelocated) {
1947 next = t->global_link;
1948 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1949 if (!stmValidateNestOfTransactions (t -> trec)) {
1950 IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1952 // strip the stack back to the
1953 // ATOMICALLY_FRAME, aborting the (nested)
1954 // transaction, and saving the stack of any
1955 // partially-evaluated thunks on the heap.
1956 raiseAsync_(cap, t, NULL, rtsTrue, NULL);
1959 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
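// Note the NULL exception argument to raiseAsync_ above: nothing is
// actually thrown; the stack is just unwound to the ATOMICALLY_FRAME
// (hence the assertion), so the transaction restarts from scratch.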
1967 // so this happens periodically:
1968 scheduleCheckBlackHoles(cap);
1970 IF_DEBUG(scheduler, printAllThreads());
1972 /* everybody back, start the GC.
1973 * Could do it in this thread, or signal a condition var
1974 * to do it in another thread. Either way, we need to
1975 * broadcast on gc_pending_cond afterward.
1977 #if defined(THREADED_RTS)
1978 IF_DEBUG(scheduler,sched_belch("doing GC"));
1980 GarbageCollect(GetRoots, force_major);
1983 // release our stash of capabilities.
1984 for (i = 0; i < n_capabilities; i++) {
1985 if (cap != &capabilities[i]) {
1986 task->cap = &capabilities[i];
1987 releaseCapability(&capabilities[i]);
1994 /* add a ContinueThread event to continue execution of current thread */
1995 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1997 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1999 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2005 /* ---------------------------------------------------------------------------
2006 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2007 * used by Control.Concurrent for error checking.
2008 * ------------------------------------------------------------------------- */
2011 rtsSupportsBoundThreads(void)
2013 #if defined(THREADED_RTS)
2020 /* ---------------------------------------------------------------------------
2021 * isThreadBound(tso): check whether tso is bound to an OS thread.
2022 * ------------------------------------------------------------------------- */
2025 isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
2027 #if defined(THREADED_RTS)
2028 return (tso->bound != NULL);
2033 /* ---------------------------------------------------------------------------
2034 * Singleton fork(). Do not copy any running threads.
2035 * ------------------------------------------------------------------------- */
2037 #if !defined(mingw32_HOST_OS) && !defined(SMP)
2038 #define FORKPROCESS_PRIMOP_SUPPORTED
2041 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2043 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2046 forkProcess(HsStablePtr *entry
2047 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2052 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2058 IF_DEBUG(scheduler,sched_belch("forking!"));
2060 // ToDo: for SMP, we should probably acquire *all* the capabilities
2065 if (pid) { // parent
2067 // just return the pid
2073 // delete all threads
2074 cap->run_queue_hd = END_TSO_QUEUE;
2075 cap->run_queue_tl = END_TSO_QUEUE;
2077 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2080 // don't allow threads to catch the ThreadKilled exception
2081 deleteThreadImmediately(cap,t);
2084 // wipe the task list
2085 ACQUIRE_LOCK(&sched_mutex);
2086 for (task = all_tasks; task != NULL; task=task->all_link) {
2087 if (task != cap->running_task) discardTask(task);
2089 RELEASE_LOCK(&sched_mutex);
2091 #if defined(THREADED_RTS)
2092 // wipe our spare workers list.
2093 cap->spare_workers = NULL;
2096 cap = rts_evalStableIO(cap, entry, NULL); // run the action
2097 rts_checkSchedStatus("forkProcess",cap);
2100 hs_exit(); // clean up and exit
2101 stg_exit(EXIT_SUCCESS);
2103 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2104 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2109 /* ---------------------------------------------------------------------------
2110 * Delete the threads on the run queue of the current capability.
2111 * ------------------------------------------------------------------------- */
2114 deleteRunQueue (Capability *cap)
2117 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2118 ASSERT(t->what_next != ThreadRelocated);
2120 deleteThread(cap, t);
2124 /* startThread and insertThread are now in GranSim.c -- HWL */
2127 /* -----------------------------------------------------------------------------
2128 Managing the suspended_ccalling_tasks list.
2129 Locks required: sched_mutex
2130 -------------------------------------------------------------------------- */
2133 suspendTask (Capability *cap, Task *task)
2135 ASSERT(task->next == NULL && task->prev == NULL);
2136 task->next = cap->suspended_ccalling_tasks;
2138 if (cap->suspended_ccalling_tasks) {
2139 cap->suspended_ccalling_tasks->prev = task;
2141 cap->suspended_ccalling_tasks = task;
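/* recoverSuspendedTask undoes suspendTask: it unlinks the task from the
 * doubly-linked suspended_ccalling_tasks list, handling the
 * head-of-list case specially.
 */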
2145 recoverSuspendedTask (Capability *cap, Task *task)
2148 task->prev->next = task->next;
2150 ASSERT(cap->suspended_ccalling_tasks == task);
2151 cap->suspended_ccalling_tasks = task->next;
2154 task->next->prev = task->prev;
2156 task->next = task->prev = NULL;
2159 /* ---------------------------------------------------------------------------
2160 * Suspending & resuming Haskell threads.
2162 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2163 * its capability before calling the C function. This allows another
2164 * task to pick up the capability and carry on running Haskell
2165 * threads. It also means that if the C call blocks, it won't lock
 * the whole system.
2168 * The Haskell thread making the C call is put to sleep for the
2169 * duration of the call, on the suspended_ccalling_tasks queue. We
2170 * give out a token to the task, which it can use to resume the thread
2171 * on return from the C function.
2172 * ------------------------------------------------------------------------- */
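/* A sketch of the protocol (names simplified for exposition; the real
 * call sites are emitted by the code generator around safe foreign
 * calls):
 *
 *     tok = suspendThread(BaseReg);   // hand back the Capability
 *     r   = c_function(args);         // may block; other tasks run Haskell
 *     BaseReg = resumeThread(tok);    // reacquire a Capability, wake the TSO
 */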
2175 suspendThread (StgRegTable *reg)
2178 int saved_errno = errno;
2182 /* assume that *reg is a pointer to the StgRegTable part of a Capability. */
2184 cap = regTableToCapability(reg);
2186 task = cap->running_task;
2187 tso = cap->r.rCurrentTSO;
2190 sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2192 // XXX this might not be necessary --SDM
2193 tso->what_next = ThreadRunGHC;
2195 threadPaused(cap,tso);
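// If asynchronous exceptions were not already blocked, block them for
// the duration of the call (BlockedOnCCall); otherwise remember not to
// unblock them when the call returns (BlockedOnCCall_NoUnblockExc).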
2197 if(tso->blocked_exceptions == NULL) {
2198 tso->why_blocked = BlockedOnCCall;
2199 tso->blocked_exceptions = END_TSO_QUEUE;
2201 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2204 // Hand back capability
2205 task->suspended_tso = tso;
2207 ACQUIRE_LOCK(&cap->lock);
2209 suspendTask(cap,task);
2210 cap->in_haskell = rtsFalse;
2211 releaseCapability_(cap);
2213 RELEASE_LOCK(&cap->lock);
2215 #if defined(THREADED_RTS)
2216 /* Preparing to leave the RTS, so ensure there's a native thread/task
2217 waiting to take over. */
2219 IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2222 errno = saved_errno;
2227 resumeThread (void *task_)
2231 int saved_errno = errno;
2235 // Wait for permission to re-enter the RTS with the result.
2236 waitForReturnCapability(&cap,task);
2237 // we might be on a different capability now... but if so, our
2238 // entry on the suspended_ccalling_tasks list will also have been
2241 // Remove the thread from the suspended list
2242 recoverSuspendedTask(cap,task);
2244 tso = task->suspended_tso;
2245 task->suspended_tso = NULL;
2246 tso->link = END_TSO_QUEUE;
2247 IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2249 if (tso->why_blocked == BlockedOnCCall) {
2250 awakenBlockedQueue(cap,tso->blocked_exceptions);
2251 tso->blocked_exceptions = NULL;
2254 /* Reset blocking status */
2255 tso->why_blocked = NotBlocked;
2257 cap->r.rCurrentTSO = tso;
2258 cap->in_haskell = rtsTrue;
2259 errno = saved_errno;
2264 /* ---------------------------------------------------------------------------
2265 * Comparing Thread ids.
2267 * This is used from STG land in the implementation of the
2268 * instances of Eq/Ord for ThreadIds.
2269 * ------------------------------------------------------------------------ */
2272 cmp_thread(StgPtr tso1, StgPtr tso2)
2274 StgThreadID id1 = ((StgTSO *)tso1)->id;
2275 StgThreadID id2 = ((StgTSO *)tso2)->id;
2277 if (id1 < id2) return (-1);
2278 if (id1 > id2) return 1;
  return 0;
2282 /* ---------------------------------------------------------------------------
2283 * Fetching the ThreadID from an StgTSO.
2285 * This is used in the implementation of Show for ThreadIds.
2286 * ------------------------------------------------------------------------ */
2288 rts_getThreadId(StgPtr tso)
2290 return ((StgTSO *)tso)->id;
2295 labelThread(StgPtr tso, char *label)
2300 /* Caveat: Once set, you can only set the thread name to "" */
2301 len = strlen(label)+1;
2302 buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2303 strncpy(buf,label,len);
2304 /* Update will free the old memory for us */
2305 updateThreadLabel(((StgTSO *)tso)->id,buf);
2309 /* ---------------------------------------------------------------------------
2310 Create a new thread.
2312 The new thread starts with the given stack size. Before the
2313 scheduler can run, however, this thread needs to have a closure
2314 (and possibly some arguments) pushed on its stack. See
2315 pushClosure() in Schedule.h.
2317 createGenThread() and createIOThread() (in SchedAPI.h) are
2318 convenient packaged versions of this function.
2320 currently pri (priority) is only used in a GRAN setup -- HWL
2321 ------------------------------------------------------------------------ */
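/* A sketch of how the SchedAPI.h wrappers package this up (simplified;
 * see createIOThread et al. for the real versions):
 *
 *     StgTSO *t = createThread(cap, stack_size);
 *     pushClosure(t, (W_)&stg_ap_v_info);   // apply to the void argument
 *     pushClosure(t, (W_)closure);          // the IO action itself
 *     pushClosure(t, (W_)&stg_enter_info);  // enter the closure on resume
 *     scheduleThread(cap, t);
 */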
2323 /* currently pri (priority) is only used in a GRAN setup -- HWL */
2325 createThread(nat size, StgInt pri)
2328 createThread(Capability *cap, nat size)
2334 /* sched_mutex is *not* required */
2336 /* First check whether we should create a thread at all */
2337 #if defined(PARALLEL_HASKELL)
2338 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2339 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2341 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2342 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2343 return END_TSO_QUEUE;
2349 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2352 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2354 /* catch ridiculously small stack sizes */
2355 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2356 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2359 stack_size = size - TSO_STRUCT_SIZEW;
2361 tso = (StgTSO *)allocateLocal(cap, size);
2362 TICK_ALLOC_TSO(stack_size, 0);
2364 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2366 SET_GRAN_HDR(tso, ThisPE);
2369 // Always start with the compiled code evaluator
2370 tso->what_next = ThreadRunGHC;
2372 tso->why_blocked = NotBlocked;
2373 tso->blocked_exceptions = NULL;
2375 tso->saved_errno = 0;
2378 tso->stack_size = stack_size;
2379 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
                           - TSO_STRUCT_SIZEW;
2381 tso->sp = (P_)&(tso->stack) + stack_size;
2383 tso->trec = NO_TREC;
2386 tso->prof.CCCS = CCS_MAIN;
2389 /* put a stop frame on the stack */
2390 tso->sp -= sizeofW(StgStopFrame);
2391 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2392 tso->link = END_TSO_QUEUE;
2396 /* uses more flexible routine in GranSim */
2397 insertThread(tso, CurrentProc);
2399 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2405 if (RtsFlags.GranFlags.GranSimStats.Full)
2406 DumpGranEvent(GR_START,tso);
2407 #elif defined(PARALLEL_HASKELL)
2408 if (RtsFlags.ParFlags.ParStats.Full)
2409 DumpGranEvent(GR_STARTQ,tso);
2410 /* HACK to avoid SCHEDULE
2414 /* Link the new thread on the global thread list. */
2416 ACQUIRE_LOCK(&sched_mutex);
2417 tso->id = next_thread_id++; // while we have the mutex
2418 tso->global_link = all_threads;
  all_threads = tso;
2420 RELEASE_LOCK(&sched_mutex);
2423 tso->dist.priority = MandatoryPriority; //by default that is...
2427 tso->gran.pri = pri;
2429 tso->gran.magic = TSO_MAGIC; // debugging only
2431 tso->gran.sparkname = 0;
2432 tso->gran.startedat = CURRENT_TIME;
2433 tso->gran.exported = 0;
2434 tso->gran.basicblocks = 0;
2435 tso->gran.allocs = 0;
2436 tso->gran.exectime = 0;
2437 tso->gran.fetchtime = 0;
2438 tso->gran.fetchcount = 0;
2439 tso->gran.blocktime = 0;
2440 tso->gran.blockcount = 0;
2441 tso->gran.blockedat = 0;
2442 tso->gran.globalsparks = 0;
2443 tso->gran.localsparks = 0;
2444 if (RtsFlags.GranFlags.Light)
2445 tso->gran.clock = Now; /* local clock */
2447 tso->gran.clock = 0;
2449 IF_DEBUG(gran,printTSO(tso));
2450 #elif defined(PARALLEL_HASKELL)
2452 tso->par.magic = TSO_MAGIC; // debugging only
2454 tso->par.sparkname = 0;
2455 tso->par.startedat = CURRENT_TIME;
2456 tso->par.exported = 0;
2457 tso->par.basicblocks = 0;
2458 tso->par.allocs = 0;
2459 tso->par.exectime = 0;
2460 tso->par.fetchtime = 0;
2461 tso->par.fetchcount = 0;
2462 tso->par.blocktime = 0;
2463 tso->par.blockcount = 0;
2464 tso->par.blockedat = 0;
2465 tso->par.globalsparks = 0;
2466 tso->par.localsparks = 0;
2470 globalGranStats.tot_threads_created++;
2471 globalGranStats.threads_created_on_PE[CurrentProc]++;
2472 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2473 globalGranStats.tot_sq_probes++;
2474 #elif defined(PARALLEL_HASKELL)
2475 // collect parallel global statistics (currently done together with GC stats)
2476 if (RtsFlags.ParFlags.ParStats.Global &&
2477 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2478 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
2479 globalParStats.tot_threads_created++;
2485 sched_belch("==__ schedule: Created TSO %d (%p) on PE %d;",
2486 tso->id, tso, CurrentProc));
2487 #elif defined(PARALLEL_HASKELL)
2488 IF_PAR_DEBUG(verbose,
2489 sched_belch("==__ schedule: Created TSO %ld (%p); %d threads active",
2490 (long)tso->id, tso, advisory_thread_count));
2492 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
2493 (long)tso->id, (long)tso->stack_size));
2500 all parallel thread creation calls should fall through the following routine.
2503 createThreadFromSpark(rtsSpark spark)
2505 ASSERT(spark != (rtsSpark)NULL);
2506 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2507 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
2509 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2510 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2511 return END_TSO_QUEUE;
2515 tso = createThread(RtsFlags.GcFlags.initialStkSize);
2516 if (tso==END_TSO_QUEUE)
2517 barf("createSparkThread: Cannot create TSO");
2519 tso->priority = AdvisoryPriority;
2521 pushClosure(tso,spark);
2523 advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
2530 Turn a spark into a thread.
2531 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2535 activateSpark (rtsSpark spark)
2539 tso = createSparkThread(spark);
2540 if (RtsFlags.ParFlags.ParStats.Full) {
2541 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2542 IF_PAR_DEBUG(verbose,
2543 debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2544 (StgClosure *)spark, info_type((StgClosure *)spark)));
2546 // ToDo: fwd info on local/global spark to thread -- HWL
2547 // tso->gran.exported = spark->exported;
2548 // tso->gran.locked = !spark->global;
2549 // tso->gran.sparkname = spark->name;
2555 /* ---------------------------------------------------------------------------
2558 * scheduleThread puts a thread on the end of the runnable queue.
2559 * This will usually be done immediately after a thread is created.
2560 * The caller of scheduleThread must create the thread using e.g.
2561 * createThread and push an appropriate closure
2562 * on this thread's stack before the scheduler is invoked.
2563 * ------------------------------------------------------------------------ */
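/* (See the sketch after the createThread documentation above for the
 * createThread / pushClosure / scheduleThread sequence in full.)
 */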
2566 scheduleThread(Capability *cap, StgTSO *tso)
2568 // The thread goes at the *end* of the run-queue, to avoid possible
2569 // starvation of any threads already on the queue.
2570 appendToRunQueue(cap,tso);
2574 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2578 // We already created/initialised the Task
2579 task = cap->running_task;
2581 // This TSO is now a bound thread; make the Task and TSO
2582 // point to each other.
2587 task->stat = NoStatus;
2589 appendToRunQueue(cap,tso);
2591 IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2594 /* GranSim specific init */
2595 CurrentTSO = m->tso; // the TSO to run
2596 procStatus[MainProc] = Busy; // status of main PE
2597 CurrentProc = MainProc; // PE to run it on
2600 cap = schedule(cap,task);
2602 ASSERT(task->stat != NoStatus);
2603 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2605 IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2609 /* ----------------------------------------------------------------------------
2611 * ------------------------------------------------------------------------- */
2613 #if defined(THREADED_RTS)
2615 workerStart(Task *task)
2619 // See startWorkerTask().
2620 ACQUIRE_LOCK(&task->lock);
2622 RELEASE_LOCK(&task->lock);
2624 // set the thread-local pointer to the Task:
2627 // schedule() runs without a lock.
2628 cap = schedule(cap,task);
2630 // On exit from schedule(), we have a Capability.
2631 releaseCapability(cap);
2636 /* ---------------------------------------------------------------------------
2639 * Initialise the scheduler. This resets all the queues - if the
2640 * queues contained any threads, they'll be garbage collected at the
2643 * ------------------------------------------------------------------------ */
2650 for (i=0; i < MAX_PROC; i++) { // the GranSim queue arrays are sized [MAX_PROC]
2651 run_queue_hds[i] = END_TSO_QUEUE;
2652 run_queue_tls[i] = END_TSO_QUEUE;
2653 blocked_queue_hds[i] = END_TSO_QUEUE;
2654 blocked_queue_tls[i] = END_TSO_QUEUE;
2655 ccalling_threadss[i] = END_TSO_QUEUE;
2656 blackhole_queue[i] = END_TSO_QUEUE;
2657 sleeping_queue = END_TSO_QUEUE;
2659 #elif !defined(THREADED_RTS)
2660 blocked_queue_hd = END_TSO_QUEUE;
2661 blocked_queue_tl = END_TSO_QUEUE;
2662 sleeping_queue = END_TSO_QUEUE;
2665 blackhole_queue = END_TSO_QUEUE;
2666 all_threads = END_TSO_QUEUE;
2671 RtsFlags.ConcFlags.ctxtSwitchTicks =
2672 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2674 #if defined(THREADED_RTS)
2675 /* Initialise the mutex and condition variables used by the scheduler. */
2677 initMutex(&sched_mutex);
2680 ACQUIRE_LOCK(&sched_mutex);
2682 /* A capability holds the state a native thread needs in
2683 * order to execute STG code. At least one capability is
2684 * floating around (only SMP builds have more than one).
2690 #if defined(SMP) || defined(PARALLEL_HASKELL)
2696 * Eagerly start one worker to run each Capability, except for
2697 * Capability 0. The idea is that we're probably going to start a
2698 * bound thread on Capability 0 pretty soon, so we don't want a
2699 * worker task hogging it.
2704 for (i = 1; i < n_capabilities; i++) {
2705 cap = &capabilities[i];
2706 ACQUIRE_LOCK(&cap->lock);
2707 startWorkerTask(cap, workerStart);
2708 RELEASE_LOCK(&cap->lock);
2713 RELEASE_LOCK(&sched_mutex);
2717 exitScheduler( void )
2719 interrupted = rtsTrue;
2720 shutting_down_scheduler = rtsTrue;
2722 #if defined(THREADED_RTS)
2727 ACQUIRE_LOCK(&sched_mutex);
2728 task = newBoundTask();
2729 RELEASE_LOCK(&sched_mutex);
2731 for (i = 0; i < n_capabilities; i++) {
2732 shutdownCapability(&capabilities[i], task);
2734 boundTaskExiting(task);
2740 /* ---------------------------------------------------------------------------
2741 Where are the roots that we know about?
2743 - all the threads on the runnable queue
2744 - all the threads on the blocked queue
2745 - all the threads on the sleeping queue
2746 - all the threads currently executing a _ccall_GC
2747 - all the "main threads"
2749 ------------------------------------------------------------------------ */
2751 /* This has to be protected either by the scheduler monitor, or by the
2752 garbage collection monitor (probably the latter).
2757 GetRoots( evac_fn evac )
2764 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2765 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2766 evac((StgClosure **)&run_queue_hds[i]);
2767 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2768 evac((StgClosure **)&run_queue_tls[i]);
2770 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2771 evac((StgClosure **)&blocked_queue_hds[i]);
2772 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2773 evac((StgClosure **)&blocked_queue_tls[i]);
2774 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2775 evac((StgClosure **)&ccalling_threadss[i]);
2782 for (i = 0; i < n_capabilities; i++) {
2783 cap = &capabilities[i];
2784 evac((StgClosure **)&cap->run_queue_hd);
2785 evac((StgClosure **)&cap->run_queue_tl);
2787 for (task = cap->suspended_ccalling_tasks; task != NULL;
2789 evac((StgClosure **)&task->suspended_tso);
2793 #if !defined(THREADED_RTS)
2794 evac((StgClosure **)&blocked_queue_hd);
2795 evac((StgClosure **)&blocked_queue_tl);
2796 evac((StgClosure **)&sleeping_queue);
2800 evac((StgClosure **)&blackhole_queue);
2802 #if defined(SMP) || defined(PARALLEL_HASKELL) || defined(GRAN)
2803 markSparkQueue(evac);
2806 #if defined(RTS_USER_SIGNALS)
2807 // mark the signal handlers (signals should be already blocked)
2808 markSignalHandlers(evac);
2812 /* -----------------------------------------------------------------------------
2815 This is the interface to the garbage collector from Haskell land.
2816 We provide this so that external C code can allocate and garbage
2817 collect when called from Haskell via _ccall_GC.
2819 It might be useful to provide an interface whereby the programmer
2820 can specify more roots (ToDo).
2822 This needs to be protected by the GC condition variable above. KH.
2823 -------------------------------------------------------------------------- */
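/* A sketch of supplying extra roots from client code (my_root and
 * my_get_roots are illustrative names, not part of the RTS):
 *
 *     static StgClosure *my_root;
 *     static void my_get_roots (evac_fn evac) { evac(&my_root); }
 *     ...
 *     performGCWithRoots(my_get_roots);
 */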
2825 static void (*extra_roots)(evac_fn);
2831 // ToDo: we have to grab all the capabilities here.
2832 errorBelch("performGC not supported in threaded RTS (yet)");
2833 stg_exit(EXIT_FAILURE);
2835 /* Obligated to hold this lock upon entry */
2836 GarbageCollect(GetRoots,rtsFalse);
2840 performMajorGC(void)
2843 errorBelch("performMayjorGC not supported in threaded RTS (yet)");
2844 stg_exit(EXIT_FAILURE);
2846 GarbageCollect(GetRoots,rtsTrue);
2850 AllRoots(evac_fn evac)
2852 GetRoots(evac); // the scheduler's roots
2853 extra_roots(evac); // the user's roots
2857 performGCWithRoots(void (*get_roots)(evac_fn))
2860 errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2861 stg_exit(EXIT_FAILURE);
2863 extra_roots = get_roots;
2864 GarbageCollect(AllRoots,rtsFalse);
2867 /* -----------------------------------------------------------------------------
2870 If the thread has reached its maximum stack size, then raise the
2871 StackOverflow exception in the offending thread. Otherwise
2872 relocate the TSO into a larger chunk of memory and adjust its stack
2874 -------------------------------------------------------------------------- */
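/* Worked example (sizes assumed for illustration): a thread with a
 * 1024-word stack that overflows gets a roughly 2048-word stack; we
 * allocate a fresh TSO big enough for the doubled stack (rounded up to
 * whole blocks), copy the TSO header and the live part of the old stack
 * across, and leave the old TSO behind as a ThreadRelocated forwarding
 * pointer. Repeated overflows keep doubling until max_stack_size is
 * reached, at which point the thread gets StackOverflow instead.
 */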
2877 threadStackOverflow(Capability *cap, StgTSO *tso)
2879 nat new_stack_size, stack_words;
2884 IF_DEBUG(sanity,checkTSO(tso));
2885 if (tso->stack_size >= tso->max_stack_size) {
2888 debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2889 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2890 /* If we're debugging, just print out the top of the stack */
2891 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2894 /* Send this thread the StackOverflow exception */
2895 raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2899 /* Try to double the current stack size. If that takes us over the
2900 * maximum stack size for this thread, then use the maximum instead.
2901 * Finally round up so the TSO ends up as a whole number of blocks.
2903 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2904 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2905 TSO_STRUCT_SIZE)/sizeof(W_);
2906 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2907 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2909 IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2911 dest = (StgTSO *)allocate(new_tso_size);
2912 TICK_ALLOC_TSO(new_stack_size,0);
2914 /* copy the TSO block and the old stack into the new area */
2915 memcpy(dest,tso,TSO_STRUCT_SIZE);
2916 stack_words = tso->stack + tso->stack_size - tso->sp;
2917 new_sp = (P_)dest + new_tso_size - stack_words;
2918 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2920 /* relocate the stack pointers... */
2922 dest->stack_size = new_stack_size;
2924 /* Mark the old TSO as relocated. We have to check for relocated
2925 * TSOs in the garbage collector and any primops that deal with TSOs.
2927 * It's important to set the sp value to just beyond the end
2928 * of the stack, so we don't attempt to scavenge any part of the
2931 tso->what_next = ThreadRelocated;
2933 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2934 tso->why_blocked = NotBlocked;
2936 IF_PAR_DEBUG(verbose,
2937 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2938 tso->id, tso, tso->stack_size);
2939 /* If we're debugging, just print out the top of the stack */
2940 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2943 IF_DEBUG(sanity,checkTSO(tso));
2945 IF_DEBUG(scheduler,printTSO(dest));
2951 /* ---------------------------------------------------------------------------
2952 Wake up a queue that was blocked on some resource.
2953 ------------------------------------------------------------------------ */
2957 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2960 #elif defined(PARALLEL_HASKELL)
2962 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2964 /* write RESUME events to log file and
2965 update blocked and fetch time (depending on type of the orig closure) */
2966 if (RtsFlags.ParFlags.ParStats.Full) {
2967 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2968 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2969 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2970 if (emptyRunQueue())
2971 emitSchedule = rtsTrue;
2973 switch (get_itbl(node)->type) {
2975 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2980 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2987 barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2994 StgBlockingQueueElement *
2995 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2998 PEs node_loc, tso_loc;
3000 node_loc = where_is(node); // should be lifted out of loop
3001 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
3002 tso_loc = where_is((StgClosure *)tso);
3003 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3004 /* !fake_fetch => "TSO is on CurrentProc" is the same as IS_LOCAL_TO */
3005 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3006 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3007 // insertThread(tso, node_loc);
3008 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3010 tso, node, (rtsSpark*)NULL);
3011 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
3014 } else { // TSO is remote (actually should be FMBQ)
3015 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3016 RtsFlags.GranFlags.Costs.gunblocktime +
3017 RtsFlags.GranFlags.Costs.latency;
3018 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3020 tso, node, (rtsSpark*)NULL);
3021 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
3024 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3026 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3027 (node_loc==tso_loc ? "Local" : "Global"),
3028 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3029 tso->block_info.closure = NULL;
3030 IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n",
3033 #elif defined(PARALLEL_HASKELL)
3034 StgBlockingQueueElement *
3035 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3037 StgBlockingQueueElement *next;
3039 switch (get_itbl(bqe)->type) {
3041 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3042 /* if it's a TSO just push it onto the run_queue */
3044 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3045 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
3047 unblockCount(bqe, node);
3048 /* reset blocking status after dumping event */
3049 ((StgTSO *)bqe)->why_blocked = NotBlocked;
3053 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3055 bqe->link = (StgBlockingQueueElement *)PendingFetches;
3056 PendingFetches = (StgBlockedFetch *)bqe;
3060 /* can ignore this case in a non-debugging setup;
3061 see comments on RBHSave closures above */
3063 /* check that the closure is an RBHSave closure */
3064 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3065 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3066 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3070 barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3071 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
3075 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3081 unblockOne(Capability *cap, StgTSO *tso)
3085 ASSERT(get_itbl(tso)->type == TSO);
3086 ASSERT(tso->why_blocked != NotBlocked);
3087 tso->why_blocked = NotBlocked;
3089 tso->link = END_TSO_QUEUE;
3091 // We might have just migrated this TSO to our Capability:
3093 tso->bound->cap = cap;
3096 appendToRunQueue(cap,tso);
3098 // we're holding a newly woken thread, make sure we context switch
3099 // quickly so we can migrate it if necessary.
3101 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3108 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3110 StgBlockingQueueElement *bqe;
3115 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3116 node, CurrentProc, CurrentTime[CurrentProc],
3117 CurrentTSO->id, CurrentTSO));
3119 node_loc = where_is(node);
3121 ASSERT(q == END_BQ_QUEUE ||
3122 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
3123 get_itbl(q)->type == CONSTR); // closure (type constructor)
3124 ASSERT(is_unique(node));
3126 /* FAKE FETCH: magically copy the node to the tso's proc;
3127 no Fetch necessary because in reality the node should not have been
3128 moved to the other PE in the first place
3130 if (CurrentProc!=node_loc) {
3132 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3133 node, node_loc, CurrentProc, CurrentTSO->id,
3134 // CurrentTSO, where_is(CurrentTSO),
3135 node->header.gran.procs));
3136 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3138 debugBelch("## new bitmask of node %p is %#x\n",
3139 node, node->header.gran.procs));
3140 if (RtsFlags.GranFlags.GranSimStats.Global) {
3141 globalGranStats.tot_fake_fetches++;
3146 // ToDo: check: ASSERT(CurrentProc==node_loc);
3147 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3150 bqe points to the current element in the queue
3151 next points to the next element in the queue
3153 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
3154 //tso_loc = where_is(tso);
3156 bqe = unblockOne(bqe, node);
3159 /* if this is the BQ of an RBH, we have to put back the info ripped out of
3160 the closure to make room for the anchor of the BQ */
3161 if (bqe!=END_BQ_QUEUE) {
3162 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3164 ASSERT((info_ptr==&RBH_Save_0_info) ||
3165 (info_ptr==&RBH_Save_1_info) ||
3166 (info_ptr==&RBH_Save_2_info));
3168 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3169 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3170 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3173 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3174 node, info_type(node)));
3177 /* statistics gathering */
3178 if (RtsFlags.GranFlags.GranSimStats.Global) {
3179 // globalGranStats.tot_bq_processing_time += bq_processing_time;
3180 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
3181 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
3182 globalGranStats.tot_awbq++; // total no. of bqs awakened
3185 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3186 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3188 #elif defined(PARALLEL_HASKELL)
3190 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3192 StgBlockingQueueElement *bqe;
3194 IF_PAR_DEBUG(verbose,
3195 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3199 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3200 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so let's just return. RFP (BUG?)\n"));
3205 ASSERT(q == END_BQ_QUEUE ||
3206 get_itbl(q)->type == TSO ||
3207 get_itbl(q)->type == BLOCKED_FETCH ||
3208 get_itbl(q)->type == CONSTR);
3211 while (get_itbl(bqe)->type==TSO ||
3212 get_itbl(bqe)->type==BLOCKED_FETCH) {
3213 bqe = unblockOne(bqe, node);
3217 #else /* !GRAN && !PARALLEL_HASKELL */
3220 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3222 if (tso == NULL) return; // hack; see bug #1235728, and comments in
3224 while (tso != END_TSO_QUEUE) {
3225 tso = unblockOne(cap,tso);
3230 /* ---------------------------------------------------------------------------
3232 - usually called inside a signal handler so it mustn't do anything fancy.
3233 ------------------------------------------------------------------------ */
3236 interruptStgRts(void)
3240 #if defined(THREADED_RTS)
3241 prodAllCapabilities();
3245 /* -----------------------------------------------------------------------------
3248 This is for use when we raise an exception in another thread, which
3250 This has nothing to do with the UnblockThread event in GranSim. -- HWL
3251 -------------------------------------------------------------------------- */
3253 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3255 NB: only the type of the blocking queue is different in GranSim and GUM
3256 the operations on the queue-elements are the same
3257 long live polymorphism!
3259 Locks: sched_mutex is held upon entry and exit.
3263 unblockThread(Capability *cap, StgTSO *tso)
3265 StgBlockingQueueElement *t, **last;
3267 switch (tso->why_blocked) {
3270 return; /* not blocked */
3273 // Be careful: nothing to do here! We tell the scheduler that the thread
3274 // is runnable and we leave it to the stack-walking code to abort the
3275 // transaction while unwinding the stack. We should perhaps have a debugging
3276 // test to make sure that this really happens and that the 'zombie' transaction
3277 // does not get committed.
3281 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3283 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3284 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3286 last = (StgBlockingQueueElement **)&mvar->head;
3287 for (t = (StgBlockingQueueElement *)mvar->head;
3289 last = &t->link, last_tso = t, t = t->link) {
3290 if (t == (StgBlockingQueueElement *)tso) {
3291 *last = (StgBlockingQueueElement *)tso->link;
3292 if (mvar->tail == tso) {
3293 mvar->tail = (StgTSO *)last_tso;
3298 barf("unblockThread (MVAR): TSO not found");
3301 case BlockedOnBlackHole:
3302 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3304 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3306 last = &bq->blocking_queue;
3307 for (t = bq->blocking_queue;
3309 last = &t->link, t = t->link) {
3310 if (t == (StgBlockingQueueElement *)tso) {
3311 *last = (StgBlockingQueueElement *)tso->link;
3315 barf("unblockThread (BLACKHOLE): TSO not found");
3318 case BlockedOnException:
3320 StgTSO *target = tso->block_info.tso;
3322 ASSERT(get_itbl(target)->type == TSO);
3324 if (target->what_next == ThreadRelocated) {
3325 target = target->link;
3326 ASSERT(get_itbl(target)->type == TSO);
3329 ASSERT(target->blocked_exceptions != NULL);
3331 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3332 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
3334 last = &t->link, t = t->link) {
3335 ASSERT(get_itbl(t)->type == TSO);
3336 if (t == (StgBlockingQueueElement *)tso) {
3337 *last = (StgBlockingQueueElement *)tso->link;
3341 barf("unblockThread (Exception): TSO not found");
3345 case BlockedOnWrite:
3346 #if defined(mingw32_HOST_OS)
3347 case BlockedOnDoProc:
3350 /* take TSO off blocked_queue */
3351 StgBlockingQueueElement *prev = NULL;
3352 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
3353 prev = t, t = t->link) {
3354 if (t == (StgBlockingQueueElement *)tso) {
3356 blocked_queue_hd = (StgTSO *)t->link;
3357 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3358 blocked_queue_tl = END_TSO_QUEUE;
3361 prev->link = t->link;
3362 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3363 blocked_queue_tl = (StgTSO *)prev;
3366 #if defined(mingw32_HOST_OS)
3367 /* (Cooperatively) signal that the worker thread should abort
3370 abandonWorkRequest(tso->block_info.async_result->reqID);
3375 barf("unblockThread (I/O): TSO not found");
3378 case BlockedOnDelay:
3380 /* take TSO off sleeping_queue */
3381 StgBlockingQueueElement *prev = NULL;
3382 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
3383 prev = t, t = t->link) {
3384 if (t == (StgBlockingQueueElement *)tso) {
3386 sleeping_queue = (StgTSO *)t->link;
3388 prev->link = t->link;
3393 barf("unblockThread (delay): TSO not found");
3397 barf("unblockThread");
3401 tso->link = END_TSO_QUEUE;
3402 tso->why_blocked = NotBlocked;
3403 tso->block_info.closure = NULL;
3404 pushOnRunQueue(cap,tso);
3408 unblockThread(Capability *cap, StgTSO *tso)
3412 /* To avoid locking unnecessarily. */
3413 if (tso->why_blocked == NotBlocked) {
3417 switch (tso->why_blocked) {
3420 // Be careful: nothing to do here! We tell the scheduler that the thread
3421 // is runnable and we leave it to the stack-walking code to abort the
3422 // transaction while unwinding the stack. We should perhaps have a debugging
3423 // test to make sure that this really happens and that the 'zombie' transaction
3424 // does not get committed.
3428 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3430 StgTSO *last_tso = END_TSO_QUEUE;
3431 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3434 for (t = mvar->head; t != END_TSO_QUEUE;
3435 last = &t->link, last_tso = t, t = t->link) {
3438 if (mvar->tail == tso) {
3439 mvar->tail = last_tso;
3444 barf("unblockThread (MVAR): TSO not found");
3447 case BlockedOnBlackHole:
3449 last = &blackhole_queue;
3450 for (t = blackhole_queue; t != END_TSO_QUEUE;
3451 last = &t->link, t = t->link) {
3457 barf("unblockThread (BLACKHOLE): TSO not found");
3460 case BlockedOnException:
3462 StgTSO *target = tso->block_info.tso;
3464 ASSERT(get_itbl(target)->type == TSO);
3466 while (target->what_next == ThreadRelocated) {
3467 target = target->link;
3468 ASSERT(get_itbl(target)->type == TSO);
3471 ASSERT(target->blocked_exceptions != NULL);
3473 last = &target->blocked_exceptions;
3474 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
3475 last = &t->link, t = t->link) {
3476 ASSERT(get_itbl(t)->type == TSO);
3482 barf("unblockThread (Exception): TSO not found");
3485 #if !defined(THREADED_RTS)
3487 case BlockedOnWrite:
3488 #if defined(mingw32_HOST_OS)
3489 case BlockedOnDoProc:
3492 StgTSO *prev = NULL;
3493 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
3494 prev = t, t = t->link) {
3497 blocked_queue_hd = t->link;
3498 if (blocked_queue_tl == t) {
3499 blocked_queue_tl = END_TSO_QUEUE;
3502 prev->link = t->link;
3503 if (blocked_queue_tl == t) {
3504 blocked_queue_tl = prev;
3507 #if defined(mingw32_HOST_OS)
3508 /* (Cooperatively) signal that the worker thread should abort
3511 abandonWorkRequest(tso->block_info.async_result->reqID);
3516 barf("unblockThread (I/O): TSO not found");
3519 case BlockedOnDelay:
3521 StgTSO *prev = NULL;
3522 for (t = sleeping_queue; t != END_TSO_QUEUE;
3523 prev = t, t = t->link) {
3526 sleeping_queue = t->link;
3528 prev->link = t->link;
3533 barf("unblockThread (delay): TSO not found");
3538 barf("unblockThread");
3542 tso->link = END_TSO_QUEUE;
3543 tso->why_blocked = NotBlocked;
3544 tso->block_info.closure = NULL;
3545 appendToRunQueue(cap,tso);
3549 /* -----------------------------------------------------------------------------
3552 * Check the blackhole_queue for threads that can be woken up. We do
3553 * this periodically: before every GC, and whenever the run queue is
3556 * An elegant solution might be to just wake up all the blocked
3557 * threads with awakenBlockedQueue occasionally: they'll go back to
3558 * sleep again if the object is still a BLACKHOLE. Unfortunately this
3559 * doesn't give us a way to tell whether we've actually managed to
3560 * wake up any threads, so we would be busy-waiting.
3562 * -------------------------------------------------------------------------- */
3565 checkBlackHoles (Capability *cap)
3568 rtsBool any_woke_up = rtsFalse;
3571 // blackhole_queue is global:
3572 ASSERT_LOCK_HELD(&sched_mutex);
3574 IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3576 // ASSUMES: sched_mutex
3577 prev = &blackhole_queue;
3578 t = blackhole_queue;
3579 while (t != END_TSO_QUEUE) {
3580 ASSERT(t->why_blocked == BlockedOnBlackHole);
3581 type = get_itbl(t->block_info.closure)->type;
3582 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3583 IF_DEBUG(sanity,checkTSO(t));
3584 t = unblockOne(cap, t);
3585 // urk, the threads migrate to the current capability
3586 // here, but we'd like to keep them on the original one.
3588 any_woke_up = rtsTrue;
3598 /* -----------------------------------------------------------------------------
3601 * The following function implements the magic for raising an
3602 * asynchronous exception in an existing thread.
3604 * We first remove the thread from any queue on which it might be
3605 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
3607 * We strip the stack down to the innermost CATCH_FRAME, building
3608 * thunks in the heap for all the active computations, so they can
3609 * be restarted if necessary. When we reach a CATCH_FRAME, we build
3610 * an application of the handler to the exception, and push it on
3611 * the top of the stack.
3613 * How exactly do we save all the active computations? We create an
3614 * AP_STACK for every UpdateFrame on the stack. Entering one of these
3615 * AP_STACKs pushes everything from the corresponding update frame
3616 * upwards onto the stack. (Actually, it pushes everything up to the
3617 * next update frame plus a pointer to the next AP_STACK object.)
3618 * Entering the next AP_STACK object pushes more onto the stack until we
3619 * reach the last AP_STACK object - at which point the stack should look
3620 * exactly as it did when we killed the TSO and we can continue
3621 * execution by entering the closure on top of the stack.
3623 * We can also kill a thread entirely - this happens if either (a) the
3624 * exception passed to raiseAsync is NULL, or (b) there's no
3625 * CATCH_FRAME on the stack. In either case, we strip the entire
3626 * stack and replace the thread with a zombie.
3628 * ToDo: in SMP mode, this function is only safe if either (a) we hold
3629 * all the Capabilities (eg. in GC), or (b) we own the Capability that
3630 * the TSO is currently blocked on or on the run queue of.
3632 * -------------------------------------------------------------------------- */
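/* Schematically: given a stack
 *
 *     sp -> ... | UPDATE_FRAME u1 | ... | UPDATE_FRAME u2 | ... | CATCH_FRAME
 *
 * we build an AP_STACK from the chunk above u1 and overwrite u1's
 * updatee with an indirection to it, then do the same for u2, and
 * finally leave a thunk that re-raises the exception on top of the
 * CATCH_FRAME, ready to run when the thread is next scheduled.
 */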
3635 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3637 raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3641 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3643 raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3647 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
3648 rtsBool stop_at_atomically, StgPtr stop_here)
3650 StgRetInfoTable *info;
3654 // Thread already dead?
3655 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3660 sched_belch("raising exception in thread %ld.", (long)tso->id));
3662 // Remove it from any blocking queues
3663 unblockThread(cap,tso);
3667 // The stack freezing code assumes there's a closure pointer on
3668 // the top of the stack, so we have to arrange that this is the case...
3670 if (sp[0] == (W_)&stg_enter_info) {
3674 sp[0] = (W_)&stg_dummy_ret_closure;
3678 while (stop_here == NULL || frame < stop_here) {
3680 // 1. Let the top of the stack be the "current closure"
3682 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3685 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3686 // current closure applied to the chunk of stack up to (but not
3687 // including) the update frame. This closure becomes the "current
3688 // closure". Go back to step 2.
3690 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3691 // top of the stack applied to the exception.
3693 // 5. If it's a STOP_FRAME, then kill the thread.
3695 // NB: if we pass an ATOMICALLY_FRAME then abort the associated
3698 info = get_ret_itbl((StgClosure *)frame);
3700 switch (info->i.type) {
3707 // First build an AP_STACK consisting of the stack chunk above the
3708 // current update frame, with the top word on the stack as the
3711 words = frame - sp - 1;
3712 ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3715 ap->fun = (StgClosure *)sp[0];
3717 for(i=0; i < (nat)words; ++i) {
3718 ap->payload[i] = (StgClosure *)*sp++;
3721 SET_HDR(ap,&stg_AP_STACK_info,
3722 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3723 TICK_ALLOC_UP_THK(words+1,0);
3726 debugBelch("sched: Updating ");
3727 printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3728 debugBelch(" with ");
3729 printObj((StgClosure *)ap);
3732 // Replace the updatee with an indirection
3734 // Warning: if we're in a loop, more than one update frame on
3735 // the stack may point to the same object. Be careful not to
3736 // overwrite an IND_OLDGEN in this case, because we'll screw
3737 // up the mutable lists. To be on the safe side, don't
3738 // overwrite any kind of indirection at all. See also
3739 // threadSqueezeStack in GC.c, where we have to make a similar
3742 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3743 // revert the black hole
3744 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3747 sp += sizeofW(StgUpdateFrame) - 1;
3748 sp[0] = (W_)ap; // push onto stack
3750 continue; //no need to bump frame
3754 // We've stripped the entire stack, the thread is now dead.
3755 tso->what_next = ThreadKilled;
3756 tso->sp = frame + sizeofW(StgStopFrame);
3760 // If we find a CATCH_FRAME, and we've got an exception to raise,
3761 // then build the THUNK raise(exception), and leave it on
3762 // top of the CATCH_FRAME ready to enter.
3766 StgCatchFrame *cf = (StgCatchFrame *)frame;
3770 if (exception == NULL) break;
3772 // we've got an exception to raise, so let's pass it to the
3773 // handler in this frame.
3775 raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3776 TICK_ALLOC_SE_THK(1,0);
3777 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3778 raise->payload[0] = exception;
3780 // throw away the stack from Sp up to the CATCH_FRAME.
3784 /* Ensure that async exceptions are blocked now, so we don't get
3785 * a surprise exception before we get around to executing the handler. */
3788 if (tso->blocked_exceptions == NULL) {
3789 tso->blocked_exceptions = END_TSO_QUEUE;
3792 /* Put the newly-built THUNK on top of the stack, ready to execute
3793 * when the thread restarts.
3796 sp[-1] = (W_)&stg_enter_info;
3798 tso->what_next = ThreadRunGHC;
3799 IF_DEBUG(sanity, checkTSO(tso));
3803 case ATOMICALLY_FRAME:
3804 if (stop_at_atomically) {
3805 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3806 stmCondemnTransaction(cap, tso -> trec);
3810 // R1 is not a register: the return convention for IO in
3811 // this case puts the return value on the stack, so we
3812 // need to set up the stack to return to the atomically
3813 // frame properly...
3814 tso->sp = frame - 2;
3815 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3816 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3818 tso->what_next = ThreadRunGHC;
3821 // Not stop_at_atomically... fall through and abort the
3824 case CATCH_RETRY_FRAME:
3825 // IF we find an ATOMICALLY_FRAME then we abort the
3826 // current transaction and propagate the exception. In
3827 // this case (unlike ordinary exceptions) we do not care
3828 // whether the transaction is valid or not because its
3829 // possible validity cannot have caused the exception
3830 // and will not be visible after the abort.
3832 debugBelch("Found atomically block delivering async exception\n"));
3833 StgTRecHeader *trec = tso -> trec;
3834 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3835 stmAbortTransaction(cap, trec);
3836 tso -> trec = outer;
3843 // move on to the next stack frame
3844 frame += stack_frame_sizeW((StgClosure *)frame);
3847 // if we got here, then we stopped at stop_here
3848 ASSERT(stop_here != NULL);
3851 /* -----------------------------------------------------------------------------
3854 This is used for interruption (^C) and forking, and corresponds to
3855 raising an exception but without letting the thread catch the exception.
3857 -------------------------------------------------------------------------- */
3860 deleteThread (Capability *cap, StgTSO *tso)
3862 if (tso->why_blocked != BlockedOnCCall &&
3863 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3864 raiseAsync(cap,tso,NULL);
3868 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3870 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3871 { // for forkProcess only:
3872 // delete thread without giving it a chance to catch the KillThread exception
3874 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3878 if (tso->why_blocked != BlockedOnCCall &&
3879 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3880 unblockThread(cap,tso);
3883 tso->what_next = ThreadKilled;
3887 /* -----------------------------------------------------------------------------
3888 raiseExceptionHelper
3890 This function is called by the raise# primitive, just so that we can
3891 move some of the tricky bits of raising an exception from C-- into
3892 C. Who knows, it might be a useful, reusable thing here too.
3893 -------------------------------------------------------------------------- */
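/* The return value is the type of the frame that tso->sp is left
 * pointing at (e.g. CATCH_FRAME, ATOMICALLY_FRAME or CATCH_STM_FRAME),
 * so the Cmm code for raise# can decide how to enter the handler.
 */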
3896 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3898 Capability *cap = regTableToCapability(reg);
3899 StgThunk *raise_closure = NULL;
3901 StgRetInfoTable *info;
3903 // This closure represents the expression 'raise# E' where E
3904 // is the exception raise. It is used to overwrite all the
3905 // thunks which are currently under evaluataion.
3909 // LDV profiling: stg_raise_info has THUNK as its closure
3910 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3911 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
3912 // 1 does not cause any problem unless profiling is performed.
3913 // However, when LDV profiling goes on, we need to linearly scan
3914 // the small object pool, where raise_closure is stored, so we should
3915 // use MIN_UPD_SIZE.
3917 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3918 // sizeofW(StgClosure)+1);
3922 // Walk up the stack, looking for the catch frame. On the way,
3923 // we update any closures pointed to from update frames with the
3924 // raise closure that we just built.
3928 info = get_ret_itbl((StgClosure *)p);
3929 next = p + stack_frame_sizeW((StgClosure *)p);
3930 switch (info->i.type) {
3933 // Only create raise_closure if we need to.
3934 if (raise_closure == NULL) {
3936 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3937 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3938 raise_closure->payload[0] = exception;
3940 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3944 case ATOMICALLY_FRAME:
3945 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3947 return ATOMICALLY_FRAME;
3953 case CATCH_STM_FRAME:
3954 IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3956 return CATCH_STM_FRAME;
3962 case CATCH_RETRY_FRAME:
3971 /* -----------------------------------------------------------------------------
3972 findRetryFrameHelper
3974 This function is called by the retry# primitive. It traverses the stack
3975 leaving tso->sp referring to the frame which should handle the retry.
3977 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3978 or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).
3980 We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3981 despite the similar implementation.
3983 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3984 not be created within memory transactions.
3985 -------------------------------------------------------------------------- */
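/* For example, in 'atomically (e1 `orElse` e2)': a retry# inside e1
 * stops at the CATCH_RETRY_FRAME, which switches to the e2 alternative;
 * a retry# inside e2 propagates out to the ATOMICALLY_FRAME, which
 * blocks the thread until one of the TVars it read has changed.
 */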
3988 findRetryFrameHelper (StgTSO *tso)
3991 StgRetInfoTable *info;
3995 info = get_ret_itbl((StgClosure *)p);
3996 next = p + stack_frame_sizeW((StgClosure *)p);
3997 switch (info->i.type) {
3999 case ATOMICALLY_FRAME:
4000 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retry\n", p));
4002 return ATOMICALLY_FRAME;
4004 case CATCH_RETRY_FRAME:
4005 IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retry\n", p));
4007 return CATCH_RETRY_FRAME;
4009 case CATCH_STM_FRAME:
4011 ASSERT(info->i.type != CATCH_FRAME);
4012 ASSERT(info->i.type != STOP_FRAME);
4019 /* -----------------------------------------------------------------------------
4020 resurrectThreads is called after garbage collection on the list of
4021 threads found to be garbage. Each of these threads will be woken
4022 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4023 on an MVar, or NonTermination if the thread was blocked on a Black Hole.
4026 Locks: assumes we hold *all* the capabilities.
4027 -------------------------------------------------------------------------- */
4030 resurrectThreads (StgTSO *threads)
4035 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4036 next = tso->global_link;
4037 tso->global_link = all_threads;
  all_threads = tso;
4039 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4041 // For a bound thread, wake it up on the Capability it was last on;
4042 // otherwise use last_free_capability.
4044 cap = tso->bound->cap;
4046 cap = last_free_capability;
4049 switch (tso->why_blocked) {
4051 case BlockedOnException:
4052 /* Called by GC - sched_mutex lock is currently held. */
4053 raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4055 case BlockedOnBlackHole:
4056 raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4059 raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4062 /* This might happen if the thread was blocked on a black hole
4063 * belonging to a thread that we've just woken up (raiseAsync
4064 * can wake up threads, remember...).
4068 barf("resurrectThreads: thread blocked in a strange way");
4073 /* ----------------------------------------------------------------------------
4074 * Debugging: why is a thread blocked
4075 * [Also provides useful information when debugging threaded programs
4076 * at the Haskell source code level, so it is enabled outside of DEBUG. --sof 7/02]
4077 ------------------------------------------------------------------------- */
4081 printThreadBlockage(StgTSO *tso)
4083 switch (tso->why_blocked) {
4085 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4087 case BlockedOnWrite:
4088 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4090 #if defined(mingw32_HOST_OS)
4091 case BlockedOnDoProc:
4092 debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4095 case BlockedOnDelay:
4096 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4099 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4101 case BlockedOnException:
4102 debugBelch("is blocked on delivering an exception to thread %d",
4103 tso->block_info.tso->id);
4105 case BlockedOnBlackHole:
4106 debugBelch("is blocked on a black hole");
4109 debugBelch("is not blocked");
4111 #if defined(PARALLEL_HASKELL)
4113 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4114 tso->block_info.closure, info_type(tso->block_info.closure));
4116 case BlockedOnGA_NoSend:
4117 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4118 tso->block_info.closure, info_type(tso->block_info.closure));
4121 case BlockedOnCCall:
4122 debugBelch("is blocked on an external call");
4124 case BlockedOnCCall_NoUnblockExc:
4125 debugBelch("is blocked on an external call (exceptions were already blocked)");
4128 debugBelch("is blocked on an STM operation");
4131 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4132 tso->why_blocked, tso->id, tso);
4137 printThreadStatus(StgTSO *t)
4139 debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4141 void *label = lookupThreadLabel(t->id);
4142 if (label) debugBelch("[\"%s\"] ",(char *)label);
4144 if (t->what_next == ThreadRelocated) {
4145 debugBelch("has been relocated...\n");
4147 switch (t->what_next) {
4149 debugBelch("has been killed");
4151 case ThreadComplete:
4152 debugBelch("has completed");
4155 printThreadBlockage(t);
4162 printAllThreads(void)
4169 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4170 ullong_format_string(TIME_ON_PROC(CurrentProc),
4171 time_string, rtsFalse/*no commas!*/);
4173 debugBelch("all threads at [%s]:\n", time_string);
4174 # elif defined(PARALLEL_HASKELL)
4175 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4176 ullong_format_string(CURRENT_TIME,
4177 time_string, rtsFalse/*no commas!*/);
4179 debugBelch("all threads at [%s]:\n", time_string);
4181 debugBelch("all threads:\n");
4184 for (i = 0; i < n_capabilities; i++) {
4185 cap = &capabilities[i];
4186 debugBelch("threads on capability %d:\n", cap->no);
4187 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4188 printThreadStatus(t);
4192 debugBelch("other threads:\n");
4193 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4194 if (t->why_blocked != NotBlocked) {
4195 printThreadStatus(t);
4197 if (t->what_next == ThreadRelocated) {
4200 next = t->global_link;
4207 printThreadQueue(StgTSO *t)
4210 for (; t != END_TSO_QUEUE; t = t->link) {
4211 printThreadStatus(t);
4214 debugBelch("%d threads on queue\n", i);
4218 Print a whole blocking queue attached to node (debugging only).
4220 # if defined(PARALLEL_HASKELL)
void
print_bq (StgClosure *node)
{
  StgBlockingQueueElement *bqe;

  ASSERT(node != (StgClosure *)NULL);  // sanity check, before we dereference node

  debugBelch("## BQ of closure %p (%s): ",
	     node, info_type(node));

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
	 get_itbl(node)->type == FETCH_ME_BQ ||
	 get_itbl(node)->type == RBH ||
	 get_itbl(node)->type == MVAR);

  print_bqe(((StgBlockingQueue*)node)->blocking_queue);
}
/*
   Print a whole blocking queue starting with the element bqe.
*/
void
print_bqe (StgBlockingQueueElement *bqe)
{
  rtsBool end;

  /*
     NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure.
  */
  for (end = (bqe == END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link == END_BQ_QUEUE),
       bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);                     // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
	   get_itbl(bqe)->type == BLOCKED_FETCH ||
	   get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure */
    //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
    switch (get_itbl(bqe)->type) {
    case TSO:
      debugBelch(" TSO %u (%p),",
		 ((StgTSO *)bqe)->id, (StgTSO *)bqe);
      break;
    case BLOCKED_FETCH:
      debugBelch(" BF (node=%p, ga=(%x, %d, %x)),",
		 ((StgBlockedFetch *)bqe)->node,
		 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
		 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
		 ((StgBlockedFetch *)bqe)->ga.weight);
      break;
    case CONSTR:
      debugBelch(" %s (IP %p),",
		 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
		  get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
		  get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
		  "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
	   info_type((StgClosure *)bqe)); // , node, info_type(node));
      break;
    }
  } /* for */
  debugBelch("\n");
}
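
/* For illustration (shape inferred from the asserts above): a blocking
 * queue hanging off an RBH looks like
 *
 *   TSO -> BLOCKED_FETCH -> ... -> TSO -> RBH_Save_N
 *
 * where the final RBH_Save_N is the CONSTR at which the traversal above
 * stops; other blocking queues end at END_BQ_QUEUE instead. */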
# elif defined(GRAN)
void
print_bq (StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  PEs node_loc, tso_loc;
  rtsBool end;
  ASSERT(node != (StgClosure *)NULL);  // sanity check, before we dereference node

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
	 get_itbl(node)->type == FETCH_ME_BQ ||
	 get_itbl(node)->type == RBH);

  node_loc = where_is(node);

  debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
	     node, info_type(node), node_loc);
  /*
     NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure.
  */
  for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe == END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link == END_BQ_QUEUE),
       bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);                     // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
	   get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure */
    ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
    tso_loc = where_is((StgClosure *)bqe);
    switch (get_itbl(bqe)->type) {
    case TSO:
      debugBelch(" TSO %d (%p) on [PE %d],",
		 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
      break;
    case CONSTR:
      debugBelch(" %s (IP %p),",
		 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
		  get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
		  get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
		  "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue of %p (%s)",
	   info_type((StgClosure *)bqe), node, info_type(node));
      break;
    }
  } /* for */
  debugBelch("\n");
}
# endif
#if defined(PARALLEL_HASKELL)
/* Return the length of the run queue (debugging only) */
static nat
run_queue_len(void)
{
    nat i;
    StgTSO *tso;

    for (i=0, tso=run_queue_hd;
	 tso != END_TSO_QUEUE;
	 i++, tso=tso->link)
	/* nothing */;

    return i;
}
#endif
void
sched_belch(char *s, ...)
{
    va_list ap;
    va_start(ap, s);
#ifdef THREADED_RTS
    debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
#elif defined(PARALLEL_HASKELL)
    debugBelch("== ");
#else
    debugBelch("sched: ");
#endif
    vdebugBelch(s, ap);
    debugBelch("\n");
    va_end(ap);
}