/* ---------------------------------------------------------------------------
 * (c) The GHC Team, 1998-2005
 * The scheduler and thread-related functionality
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "BlockAlloc.h"
#include "OSThreads.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Exception.h"
#include "RtsSignals.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Proftimer.h"
#if defined(GRAN) || defined(PARALLEL_HASKELL)
# include "GranSimRts.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
#include "Capability.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>

// Turn off inlining when debugging - it obfuscates things
# define STATIC_INLINE static

#define USED_WHEN_THREADED_RTS
#define USED_WHEN_NON_THREADED_RTS STG_UNUSED
#define USED_WHEN_THREADED_RTS STG_UNUSED
#define USED_WHEN_NON_THREADED_RTS
#define USED_WHEN_SMP STG_UNUSED

/* -----------------------------------------------------------------------------
 * -------------------------------------------------------------------------- */

StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;  now in GranSim.c */

   In GranSim we have a runnable and a blocked queue for each processor.
   In order to minimise code changes new arrays run_queue_hds/tls
   are created. run_queue_hd is then a short cut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).

StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?

/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
StgTSO *blackhole_queue = NULL;

/* The blackhole_queue should be checked for threads to wake up.  See
 * Schedule.h for more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
rtsBool blackholes_need_checking = rtsFalse;

/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 * LOCK: sched_mutex+capability, or all capabilities
StgTSO *all_threads = NULL;

/* flag set by signal handler to precipitate a context switch
 * LOCK: none (just an advisory flag)
int context_switch = 0;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
nat recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes once, from false->true)
rtsBool interrupted = rtsFalse;

/* Next thread ID to allocate.
static StgThreadID next_thread_id = 1;

/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
 *  + 1                       (spare slot req'd by stg_ap_v_ret)
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
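
/* Illustrative arithmetic (the constants here are assumptions for the
 * sake of the example, not values taken from this build): the "+ 3"
 * in the macro folds together the one-word items listed above.  If
 * RESERVED_STACK_WORDS were 21 and sizeofW(StgStopFrame) were 2, then
 * MIN_STACK_WORDS would be 21 + 2 + 3 = 26 words -- deliberately too
 * small to run anything, so the thread's first stack check fails and
 * the overflow machinery grows the stack.
 */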
/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 * exists - earlier gccs apparently didn't.
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
rtsBool shutting_down_scheduler = rtsFalse;
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS (inc. SMP) runtime.
#if defined(THREADED_RTS)
#if defined(PARALLEL_HASKELL)
rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.

static void schedulePreLoop (void);
static void schedulePushWork(Capability *cap, Task *task);
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static StgTSO *scheduleProcessEvent(rtsEvent *event);
#if defined(PARALLEL_HASKELL)
static StgTSO *scheduleSendPendingMessages(void);
static void scheduleActivateSpark(void);
static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
#if defined(PAR) || defined(GRAN)
static void scheduleGranParReport(void);
static void schedulePostRunThread(void);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task,
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
                                    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
static void unblockThread(Capability *cap, StgTSO *tso);
static rtsBool checkBlackHoles(Capability *cap);
static void AllRoots(evac_fn evac);
static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
                        rtsBool stop_at_atomically, StgPtr stop_here);
static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteRunQueue (Capability *cap);
static void printThreadBlockage(StgTSO *tso);
static void printThreadStatus(StgTSO *tso);
void printThreadQueue(StgTSO *tso);
#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);

static char *whatNext_strs[] = {

/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */

addToRunQueue( Capability *cap, StgTSO *t )
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) {
        // this does round-robin scheduling; good for concurrency
        appendToRunQueue(cap,t);
        // this does unfair scheduling; good for parallelism
        pushOnRunQueue(cap,t);
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
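
/* Sketch of the two policies above (illustrative comment only; cap and
 * t as in addToRunQueue):
 *
 *     appendToRunQueue(cap,t);  // t joins the TAIL: round-robin, every
 *                               // runnable thread gets a turn (fair,
 *                               // good for concurrency)
 *     pushOnRunQueue(cap,t);    // t joins the HEAD: LIFO, the freshest
 *                               // work runs next (unfair, good for
 *                               // parallelism)
 */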
/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * timer expires (thread yields)

   In a GranSim setup this loop iterates over the global event queue.
   This revolves around the global event queue, which determines what
   to do next. Therefore, it's more complicated than either the
   concurrent or the parallel (GUM) setup.

   GUM iterates over incoming messages.
   It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
   and sends out a fish whenever it has nothing to do; in-between
   doing the actual reductions (shared code below) it processes the
   incoming messages and deals with delayed operations
   (see PendingFetches).
   This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */

schedule (Capability *initialCapability, Task *task)
    StgThreadReturnCode ret;
#elif defined(PARALLEL_HASKELL)
    rtsBool receivedFinish = rtsFalse;
    nat tp_size, sp_size; // stats only
#if defined(THREADED_RTS)
    rtsBool first = rtsTrue;

    cap = initialCapability;

    // Pre-condition: this task owns initialCapability.
    // The sched_mutex is *NOT* held
    // NB. on return, we still hold a capability.

    sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
                task, initialCapability);

    // -----------------------------------------------------------
    // Scheduler loop starts here:

#if defined(PARALLEL_HASKELL)
#define TERMINATION_CONDITION (!receivedFinish)
#define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL)
#define TERMINATION_CONDITION rtsTrue

    while (TERMINATION_CONDITION) {

        /* Choose the processor with the next event */
        CurrentProc = event->proc;
        CurrentTSO = event->tso;

#if defined(THREADED_RTS)
        // don't yield the first time, we want a chance to run this
        // thread for a bit, even if there are others banging at the
            ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
            // Yield the capability to higher-priority tasks if necessary.
            yieldCapability(&cap, task);

        schedulePushWork(cap,task);

        // Check whether we have re-entered the RTS from Haskell without
        // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
        if (cap->in_haskell) {
            errorBelch("schedule: re-entered unsafely.\n"
                       "   Perhaps a 'foreign import unsafe' should be 'safe'?");
            stg_exit(EXIT_FAILURE);

        // Test for interruption.  If interrupted==rtsTrue, then either
        // we received a keyboard interrupt (^C), or the scheduler is
        // trying to shut down all the tasks (shutting_down_scheduler) in
            discardSparksCap(cap);
            if (shutting_down_scheduler) {
                IF_DEBUG(scheduler, sched_belch("shutting down"));
                // If we are a worker, just exit.  If we're a bound thread
                // then we will exit below when we've removed our TSO from
                if (task->tso == NULL && emptyRunQueue(cap)) {
                IF_DEBUG(scheduler, sched_belch("interrupted"));

        // If the run queue is empty, take a spark and turn it into a thread.
        if (emptyRunQueue(cap)) {
            spark = findSpark(cap);
                    sched_belch("turning spark of closure %p into a thread",
                                (StgClosure *)spark));
                createSparkThread(cap,spark);

        scheduleStartSignalHandlers(cap);

        // Only check the black holes here if we've nothing else to do.
        // During normal execution, the black hole list only gets checked
        // at GC time, to avoid repeatedly traversing this possibly long
        // list each time around the scheduler.
        if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

        scheduleCheckBlockedThreads(cap);

        scheduleDetectDeadlock(cap,task);
        // Normally, the only way we can get here with no threads to
        // run is if a keyboard interrupt was received during
        // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
        // Additionally, it is not fatal for the
        // threaded RTS to reach here with no threads to run.
        // win32: might be here due to awaitEvent() being abandoned
        // as a result of a console event having been delivered.
        if ( emptyRunQueue(cap) ) {
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
            continue; // nothing to do

#if defined(PARALLEL_HASKELL)
        scheduleSendPendingMessages();
        if (emptyRunQueue(cap) && scheduleActivateSpark())
        ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!

        /* If we still have no work we need to send a FISH to get a spark
        if (emptyRunQueue(cap)) {
            if (!scheduleGetRemoteWork(&receivedFinish)) continue;
            ASSERT(rtsFalse); // should not happen at the moment
        // from here: non-empty run queue.
        //  TODO: merge above case with this, only one call processMessages() !
        if (PacketsWaiting()) {  /* process incoming messages, if
                                    any pending...  only in else
                                    because getRemoteWork waits for
            receivedFinish = processMessages();

        scheduleProcessEvent(event);

        // Get a thread to run
        t = popRunQueue(cap);

#if defined(GRAN) || defined(PAR)
        scheduleGranParReport(); // some kind of debugging output
        // Sanity check the thread we're about to run.  This can be
        // expensive if there is lots of thread switching going on...
        IF_DEBUG(sanity,checkTSO(t));

#if defined(THREADED_RTS)
        // Check whether we can run this thread in the current task.
        // If not, we have to pass our capability to the right task.
            Task *bound = t->bound;
                    sched_belch("### Running thread %d in bound thread",
                    // yes, the Haskell thread is bound to the current native thread
                    sched_belch("### thread %d bound to another OS thread",
                    // no, bound to a different Haskell thread: pass to that thread
                    pushOnRunQueue(cap,t);
                // The thread we want to run is unbound.
                    sched_belch("### this OS thread cannot run thread %d", t->id));
                    // no, the current native thread is bound to a different
                    // Haskell thread, so pass it to any worker thread
                    pushOnRunQueue(cap,t);

        cap->r.rCurrentTSO = t;

        /* context switches are initiated by the timer signal, unless
         * the user specified "context switch as often as possible", with
        if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
            && !emptyThreadQueues(cap)) {

        IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
                                        (long)t->id, whatNext_strs[t->what_next]));

#if defined(PROFILING)
        startHeapProfTimer();

        // ----------------------------------------------------------------------
        // Run the current thread

        prev_what_next = t->what_next;
        errno = t->saved_errno;
        cap->in_haskell = rtsTrue;

        recent_activity = ACTIVITY_YES;

        switch (prev_what_next) {
            /* Thread already finished, return to scheduler. */
            ret = ThreadFinished;
            r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
            cap = regTableToCapability(r);
        case ThreadInterpret:
            cap = interpretBCO(cap);
            barf("schedule: invalid what_next field");
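        /* Summary of the dispatch above (descriptive comment): a
         * ThreadKilled/ThreadComplete thread returns to the scheduler
         * immediately with ret = ThreadFinished; ThreadRunGHC enters
         * compiled code via StgRun(); ThreadInterpret hands the TSO to
         * interpretBCO().  Each path leaves a StgThreadReturnCode in
         * ret for the switch on ret further down.
         */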
        cap->in_haskell = rtsFalse;

        // The TSO might have moved, eg. if it re-entered the RTS and a GC
        // happened.  So find the new location:
        t = cap->r.rCurrentTSO;

        // We have run some Haskell code: there might be blackhole-blocked
        // threads to wake up now.
        // Lock-free test here should be ok, we're just setting a flag.
        if ( blackhole_queue != END_TSO_QUEUE ) {
            blackholes_need_checking = rtsTrue;

        // And save the current errno in this thread.
        // XXX: possibly bogus for SMP because this thread might already
        // be running again, see code below.
        t->saved_errno = errno;

        // If ret is ThreadBlocked, and this Task is bound to the TSO that
        // blocked, we are in limbo - the TSO is now owned by whatever it
        // is blocked on, and may in fact already have been woken up,
        // perhaps even on a different Capability.  It may be the case
        // that task->cap != cap.  We better yield this Capability
        // immediately and return to normality.
        if (ret == ThreadBlocked) {
                debugBelch("--<< thread %d (%s) stopped: blocked\n",
                           t->id, whatNext_strs[t->what_next]));

        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

        // ----------------------------------------------------------------------

        // Costs for the scheduler are assigned to CCS_SYSTEM
#if defined(PROFILING)

#if defined(THREADED_RTS)
        IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
#elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
        IF_DEBUG(scheduler,debugBelch("sched: "););

        schedulePostRunThread();

        ready_to_gc = rtsFalse;
            ready_to_gc = scheduleHandleHeapOverflow(cap,t);
            scheduleHandleStackOverflow(cap,task,t);
            if (scheduleHandleYield(cap, t, prev_what_next)) {
                // shortcut for switching between compiler/interpreter:
            scheduleHandleThreadBlocked(t);
            if (scheduleHandleThreadFinished(cap, task, t)) return cap;
            ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
            barf("schedule: invalid thread return code %d", (int)ret);

        if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
        if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
    } /* end of while() */

    IF_PAR_DEBUG(verbose,
                 debugBelch("== Leaving schedule() after having received Finish\n"));

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

schedulePreLoop(void)
    /* set up first event to get things going */
    /* ToDo: assign costs for system setup and init MainTSO ! */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

        debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n",
        G_TSO(CurrentTSO, 5));

    if (RtsFlags.GranFlags.Light) {
        /* Save current time; GranSim Light only */
        CurrentTSO->gran.clock = CurrentTime[CurrentProc];

/* -----------------------------------------------------------------------------
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

schedulePushWork(Capability *cap USED_WHEN_SMP,
                 Task *task USED_WHEN_SMP)
    Capability *free_caps[n_capabilities], *cap0;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
        && sparkPoolSizeCap(cap) < 2) {

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
                // it already has some work, we just grabbed it at
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
                free_caps[n_free_caps++] = cap0;

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;
        rtsBool pushed_to_all;

        IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));

        pushed_to_all = rtsFalse;

        if (cap->run_queue_hd != END_TSO_QUEUE) {
            prev = cap->run_queue_hd;
            prev->link = END_TSO_QUEUE;
            for (; t != END_TSO_QUEUE; t = next) {
                t->link = END_TSO_QUEUE;
                if (t->what_next == ThreadRelocated
                    || t->bound == task) { // don't move my bound thread
                } else if (i == n_free_caps) {
                    pushed_to_all = rtsTrue;
                    IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
                    appendToRunQueue(free_caps[i],t);
                    if (t->bound) { t->bound->cap = free_caps[i]; }
            cap->run_queue_tl = prev;

    // If there are some free capabilities that we didn't push any
    // threads to, then try to push a spark to each one.
    if (!pushed_to_all) {
        // i is the next free capability to push to
        for (; i < n_free_caps; i++) {
            if (emptySparkPoolCap(free_caps[i])) {
                spark = findSpark(cap);
                    IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
                    newSpark(&(free_caps[i]->r), spark);

    // release the capabilities
    for (i = 0; i < n_free_caps; i++) {
        task->cap = free_caps[i];
        releaseCapability(free_caps[i]);
    task->cap = cap; // reset to point to our Capability.
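
    /* Worked example (illustrative only): with n_capabilities == 4,
     * suppose this Capability holds several runnable threads and the
     * other three are idle and grabbable.  The loop above deals the
     * surplus threads out round-robin across the free Capabilities,
     * keeping roughly a 1/(n_free_caps+1) share for itself and skipping
     * ThreadRelocated indirections and threads bound to this Task; any
     * Capability that still got nothing is offered a spark instead.
     */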
/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
scheduleStartSignalHandlers(Capability *cap)
    if (signals_pending()) { // safe outside the lock
        startSignalHandlers(cap);
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
#if !defined(THREADED_RTS)
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
        awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );

/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */

scheduleCheckBlackHoles (Capability *cap)
    if ( blackholes_need_checking ) // check without the lock first
        ACQUIRE_LOCK(&sched_mutex);
        if ( blackholes_need_checking ) {
            checkBlackHoles(cap);
            blackholes_need_checking = rtsFalse;
        RELEASE_LOCK(&sched_mutex);
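
/* The shape above is a double-checked flag: the unlocked read lets the
 * common no-work case skip sched_mutex entirely, while re-testing the
 * flag after ACQUIRE_LOCK ensures that only one task walks the
 * blackhole queue even if several observed the flag set simultaneously.
 */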
/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

scheduleDetectDeadlock (Capability *cap, Task *task)
#if defined(PARALLEL_HASKELL)
    // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL

     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
    if ( emptyThreadQueues(cap) )
#if defined(THREADED_RTS)
         * In the threaded RTS, we only check for deadlock if there
         * has been no activity in a complete timeslice.  This means
         * we won't eagerly start a full GC just because we don't have
         * any threads to run currently.
        if (recent_activity != ACTIVITY_INACTIVE) return;

        IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        scheduleDoGC( cap, task, rtsTrue/*force major GC*/ );
        recent_activity = ACTIVITY_DONE_GC;

        if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
        if ( anyUserHandlers() ) {
                sched_belch("still deadlocked, waiting for signals..."));

            if (signals_pending()) {
                startSignalHandlers(cap);
            // either we have threads to run, or we were interrupted:
            ASSERT(!emptyRunQueue(cap) || interrupted);

#if !defined(THREADED_RTS)
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception.
        switch (task->tso->why_blocked) {
        case BlockedOnBlackHole:
        case BlockedOnException:
            raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
            barf("deadlock: main thread blocked in a strange way");
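        /* Concrete illustration (not from the original source): in a
         * single-threaded RTS, a main thread evaluating a
         * self-referential thunk such as "let x = x in x" blocks on its
         * own BLACKHOLE with nothing else runnable or waiting on I/O.
         * The forced major GC above resurrects nothing, so the
         * BlockedOnBlackHole case raises NonTermination in the main
         * thread instead of letting the program hang forever.
         */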
/* ----------------------------------------------------------------------------
 * Process an event (GRAN only)
 * ------------------------------------------------------------------------- */

scheduleProcessEvent(rtsEvent *event)
    if (RtsFlags.GranFlags.Light)
        GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
        CurrentTime[CurrentProc] = event->time;

    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)

    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
        /* Should just be continuing execution */
    case ContinueThread:
        IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
        /* ToDo: check assertion
           ASSERT(run_queue_hd != (StgTSO*)NULL &&
                  run_queue_hd != END_TSO_QUEUE);
        /* Ignore ContinueThreads for fetching threads (if synchr comm) */
        if (!RtsFlags.GranFlags.DoAsyncFetch &&
            procStatus[CurrentProc]==Fetching) {
            debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
        /* Ignore ContinueThreads for completed threads */
        if (CurrentTSO->what_next == ThreadComplete) {
            debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
        /* Ignore ContinueThreads for threads that are being migrated */
        if (PROCS(CurrentTSO)==Nowhere) {
            debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
        /* The thread should be at the beginning of the run queue */
        if (CurrentTSO!=run_queue_hds[CurrentProc]) {
            debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            break; // run the thread anyway
           new_event(proc, proc, CurrentTime[proc],
                     (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
        */ /* Catches superfluous CONTINUEs -- should be unnecessary */
        break; // now actually run the thread; DaH Qu'vam yImuHbej

        do_the_fetchnode(event);
        goto next_thread; /* handle next event in event queue  */

        do_the_globalblock(event);
        goto next_thread; /* handle next event in event queue  */

        do_the_fetchreply(event);
        goto next_thread; /* handle next event in event queue  */

    case UnblockThread:   /* Move from the blocked queue to the tail of */
        do_the_unblock(event);
        goto next_thread; /* handle next event in event queue  */

    case ResumeThread:    /* Move from the blocked queue to the tail of */
        /* the runnable queue ( i.e. Qu' SImqa'lu') */
        event->tso->gran.blocktime +=
            CurrentTime[CurrentProc] - event->tso->gran.blockedat;
        do_the_startthread(event);
        goto next_thread; /* handle next event in event queue  */

        do_the_startthread(event);
        goto next_thread; /* handle next event in event queue  */

        do_the_movethread(event);
        goto next_thread; /* handle next event in event queue  */

        do_the_movespark(event);
        goto next_thread; /* handle next event in event queue  */

        do_the_findwork(event);
        goto next_thread; /* handle next event in event queue  */

        barf("Illegal event type %u\n", event->evttype);

    /* This point was scheduler_loop in the old RTS */
    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",

    if (RtsFlags.GranFlags.Light)
        GranSimLight_leave_system(event, &ActiveTSO);

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
               debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */
    /* Take a thread from the run queue. */
    POP_RUN_QUEUE(t); // take_off_run_queue(t);

               debugBelch("GRAN: About to run current thread, which is\n");

    context_switch = 0; // turned on via GranYield, checking events and time slice

               DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;

/* ----------------------------------------------------------------------------
 * Send pending messages (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
scheduleSendPendingMessages(void)
# if defined(PAR) // global Mem.Mgmt., omit for now
    if (PendingFetches != END_BF_QUEUE) {

    if (RtsFlags.ParFlags.BufferTime) {
        // if we use message buffering, we must send away all message
        // packets which have become too old...

/* ----------------------------------------------------------------------------
 * Activate spark threads (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
scheduleActivateSpark(void)
    ASSERT(emptyRunQueue());
    /* We get here if the run queue is empty and want some work.
       We try to turn a spark into a thread, and add it to the run queue,
       from where it will be picked up in the next iteration of the scheduler

    /* :-[  no local threads => look out for local sparks */
    /* the spark pool for the current PE */
    pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
    if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
        pool->hd < pool->tl) {
         * ToDo: add GC code check that we really have enough heap afterwards!!
         * If we're here (no runnable threads) and we have pending
         * sparks, we must have a space problem.  Get enough space
         * to turn one of those pending sparks into a
        spark = findSpark(rtsFalse);            /* get a spark */
        if (spark != (rtsSpark) NULL) {
            tso = createThreadFromSpark(spark); /* turn the spark into a thread */
            IF_PAR_DEBUG(fish, // schedule,
                         debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
                                    tso->id, tso, advisory_thread_count));

            if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
                IF_PAR_DEBUG(fish, // schedule,
                             debugBelch("==^^ failed to create thread from spark @ %lx\n",
                return rtsFalse; /* failed to generate a thread */
            }               /* otherwise fall through & pick-up new tso */
            IF_PAR_DEBUG(fish, // schedule,
                         debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
                                    spark_queue_len(pool)));
            return rtsFalse; /* failed to generate a thread */
        return rtsTrue;  /* success in generating a thread */
    } else { /* no more threads permitted or pool empty */
        return rtsFalse; /* failed to generateThread */

    tso = NULL; // avoid compiler warning only
    return rtsFalse; /* dummy in non-PAR setup */
#endif // PARALLEL_HASKELL

/* ----------------------------------------------------------------------------
 * Get work from a remote node (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
scheduleGetRemoteWork(rtsBool *receivedFinish)
    ASSERT(emptyRunQueue());

    if (RtsFlags.ParFlags.BufferTime) {
        IF_PAR_DEBUG(verbose,
                     debugBelch("...send all pending data,"));
        for (i=1; i<=nPEs; i++)
            sendImmediately(i); // send all messages away immediately

    //++EDEN++ idle() , i.e. send all buffers, wait for work
    // suppress fishing in EDEN... just look for incoming messages
    // (blocking receive)
    IF_PAR_DEBUG(verbose,
                 debugBelch("...wait for incoming messages...\n"));
    *receivedFinish = processMessages(); // blocking receive...

    // and reenter scheduling loop after having received something
    // (return rtsFalse below)

# else /* activate SPARKS machinery */
    /* We get here, if we have no work, tried to activate a local spark, but still
       have no work. We try to get a remote spark, by sending a FISH message.
       Thread migration should be added here, and triggered when a sequence of
       fishes returns without work. */
    delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);

    /* =8-[  no local sparks => look for work on other PEs */
     * We really have absolutely no work.  Send out a fish
     * (there may be some out there already), and wait for
     * something to arrive.  We clearly can't run any threads
     * until a SCHEDULE or RESUME arrives, and so that's what
     * we're hoping to see.  (Of course, we still have to
     * respond to other types of messages.)
    rtsTime now = msTime() /*CURRENT_TIME*/;
    IF_PAR_DEBUG(verbose,
                 debugBelch("--  now=%ld\n", now));
    IF_PAR_DEBUG(fish, // verbose,
                 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
                     (last_fish_arrived_at!=0 &&
                      last_fish_arrived_at+delay > now)) {
                     debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
                                now, last_fish_arrived_at+delay,
                                last_fish_arrived_at,

    if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
        advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
        if (last_fish_arrived_at==0 ||
            (last_fish_arrived_at+delay <= now)) { // send FISH now!
            /* outstandingFishes is set in sendFish, processFish;
               avoid flooding system with fishes via delay */
            next_fish_to_send_at = 0;
            /* ToDo: this should be done in the main scheduling loop to avoid the
               busy wait here; not so bad if fish delay is very small */
            int iq = 0; // DEBUGGING -- HWL
            next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
            /* send a fish when ready, but process messages that arrive in the meantime */
                if (PacketsWaiting()) {
                    *receivedFinish = processMessages();
            } while (!*receivedFinish || now<next_fish_to_send_at);
            // JB: This means the fish could become obsolete, if we receive
            // work. Better check for work again?
            // last line: while (!receivedFinish || !haveWork || now<...)
            // next line: if (receivedFinish || haveWork )

            if (*receivedFinish) // no need to send a FISH if we are finishing anyway
                return rtsFalse; // NB: this will leave scheduler loop
                                 // immediately after return!

            IF_PAR_DEBUG(fish, // verbose,
                         debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));

        // JB: IMHO, this should all be hidden inside sendFish(...)
        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,

        // Global statistics: count no. of fishes
        if (RtsFlags.ParFlags.ParStats.Global &&
            RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
            globalParStats.tot_fish_mess++;

    /* delayed fishes must have been sent by now! */
    next_fish_to_send_at = 0;

    *receivedFinish = processMessages();
# endif /* SPARKS */

    /* NB: this function always returns rtsFalse, meaning the scheduler
       loop continues with the next iteration;
       return code means success in finding work; we enter this function
       if there is no local work, thus have to send a fish which takes
       time until it arrives with work; in the meantime we should process
       messages in the main loop;
#endif // PARALLEL_HASKELL

/* ----------------------------------------------------------------------------
 * PAR/GRAN: Report stats & debugging info(?)
 * ------------------------------------------------------------------------- */

#if defined(PAR) || defined(GRAN)
scheduleGranParReport(void)
    ASSERT(run_queue_hd != END_TSO_QUEUE);

    /* Take a thread from the run queue, if we have work */
    POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);

    /* If this TSO has got its outport closed in the meantime,
     *   it mustn't be run. Instead, we have to clean it up as if it was finished.
     * It has to be marked as TH_DEAD for this purpose.
     * If it is TH_TERM instead, it is supposed to have finished in the normal way.
       JB: TODO: investigate whether state change field could be nuked
           entirely and replaced by the normal tso state (whatnext
           field). All we want to do is to kill tsos from outside.

    /* ToDo: write something to the log-file
    if (RTSflags.ParFlags.granSimStats && !sameThread)
        DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);

    /* the spark pool for the current PE */
    pool = &(cap.r.rSparks); //  cap = (old) MainCap
                 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
                            run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
                 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
                            run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

    if (RtsFlags.ParFlags.ParStats.Full &&
        (t->par.sparkname != (StgInt)0) && // only log spark generated threads
        (emitSchedule || // forced emit
         (t && LastTSO && t->id != LastTSO->id))) {
           we are running a different TSO, so write a schedule event to log file
           NB: If we use fair scheduling we also have to write a deschedule
               event for LastTSO; with unfair scheduling we know that the
               previous tso has blocked whenever we switch to another tso, so
               we don't need it in GUM for now
        IF_PAR_DEBUG(fish, // schedule,
                     debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
        emitSchedule = rtsFalse;

/* ----------------------------------------------------------------------------
 * After running a thread...
 * ------------------------------------------------------------------------- */

schedulePostRunThread(void)
    /* HACK 675: if the last thread didn't yield, make sure to print a
       SCHEDULE event to the log file when StgRunning the next thread, even
       if it is the same one as before */
    TimeOfLastYield = CURRENT_TIME;

    /* some statistics gathering in the parallel case */
#if defined(GRAN) || defined(PAR) || defined(EDEN)
        IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_heapover++;
        globalParStats.tot_heapover++;
                   DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_stackover++;
        // DumpGranEvent(GR_DESCHEDULE, t);
        globalParStats.tot_stackover++;

    case ThreadYielding:
                   DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_yields++;
        // DumpGranEvent(GR_DESCHEDULE, t);
        globalParStats.tot_yields++;
                   debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
                              t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
                              (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
        if (t->block_info.closure!=(StgClosure*)NULL)
            print_bq(t->block_info.closure);
        // ??? needed; should emit block before
                   DumpGranEvent(GR_DESCHEDULE, t));
        prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
        ASSERT(procStatus[CurrentProc]==Busy ||
               ((procStatus[CurrentProc]==Fetching) &&
                (t->block_info.closure!=(StgClosure*)NULL)));
        if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
            !(!RtsFlags.GranFlags.DoAsyncFetch &&
              procStatus[CurrentProc]==Fetching))
            procStatus[CurrentProc] = Idle;
        //++PAR++  blockThread() writes the event (change?)

    case ThreadFinished:
        barf("parGlobalStats: unknown return code");

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
 * -------------------------------------------------------------------------- */

scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
    // did the task ask for a large block?
    if (cap->r.rHpAlloc > BLOCK_SIZE) {
        // if so, get one and push it on the front of the nursery.
        blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;

                   debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
                              (long)t->id, whatNext_strs[t->what_next], blocks));

        // don't do this if the nursery is (nearly) full, we'll GC first.
        if (cap->r.rCurrentNursery->link != NULL ||
            cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
                                               // if the nursery has only one block.
            bd = allocGroup( blocks );
            cap->r.rNursery->n_blocks += blocks;

            // link the new group into the list
            bd->link = cap->r.rCurrentNursery;
            bd->u.back = cap->r.rCurrentNursery->u.back;
            if (cap->r.rCurrentNursery->u.back != NULL) {
                cap->r.rCurrentNursery->u.back->link = bd;
                ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
                       g0s0 == cap->r.rNursery);
                cap->r.rNursery->blocks = bd;
            cap->r.rCurrentNursery->u.back = bd;

            // initialise it as a nursery block.  We initialise the
            // step, gen_no, and flags field of *every* sub-block in
            // this large block, because this is easier than making
            // sure that we always find the block head of a large
            // block whenever we call Bdescr() (eg. evacuate() and
            // isAlive() in the GC would both have to do this, at
                for (x = bd; x < bd + blocks; x++) {
                    x->step = cap->r.rNursery;

            // This assert can be a killer if the app is doing lots
            // of large block allocations.
            IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));

            // now update the nursery to point to the new block
            cap->r.rCurrentNursery = bd;

            // we might be unlucky and have another thread get on the
            // run queue before us and steal the large block, but in that
            // case the thread will just end up requesting another large
            pushOnRunQueue(cap,t);
            return rtsFalse;  /* not actually GC'ing */
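
            /* Worked example for the arithmetic above (illustrative,
             * assuming the usual 4k BLOCK_SIZE): a request of
             * rHpAlloc = 10000 bytes rounds up to 3 blocks, which are
             * allocated as one group and spliced in front of
             * rCurrentNursery, letting the thread retry its allocation
             * without a GC.
             */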
               debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
                          (long)t->id, whatNext_strs[t->what_next]));

    ASSERT(!is_on_queue(t,CurrentProc));
#elif defined(PARALLEL_HASKELL)
    /* Currently we emit a DESCHEDULE event before GC in GUM.
       ToDo: either add separate event to distinguish SYSTEM time from rest
       or just nuke this DESCHEDULE (and the following SCHEDULE) */
    if (0 && RtsFlags.ParFlags.ParStats.Full) {
        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
        emitSchedule = rtsTrue;

    pushOnRunQueue(cap,t);
    /* actual GC is done at the end of the while loop in schedule() */

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadStackOverflow
 * -------------------------------------------------------------------------- */

scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
    IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n",
                                  (long)t->id, whatNext_strs[t->what_next]));

    /* just adjust the stack for this thread, then pop it back
        /* enlarge the stack */
        StgTSO *new_t = threadStackOverflow(cap, t);

        /* The TSO attached to this Task may have moved, so update the
        if (task->tso == t) {
        pushOnRunQueue(cap,new_t);
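
    /* Note (descriptive, not from the original source): in this era
     * threadStackOverflow returns a fresh, larger TSO and turns the old
     * one into a ThreadRelocated indirection, which is why task->tso
     * must be re-pointed at new_t before the thread goes back on the
     * run queue.
     */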
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadYielding
 * -------------------------------------------------------------------------- */

scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
    // Reset the context switch flag.  We don't do this just before
    // running the thread, because that would mean we would lose ticks
    // during GC, which can lead to unfair scheduling (a thread hogs
    // the CPU because the tick always arrives during GC).  This way
    // penalises threads that do a lot of allocation, but that seems
    // better than the alternative.

    /* put the thread back on the run queue.  Then, if we're ready to
     * GC, check whether this is the last task to stop.  If so, wake
     * up the GC thread.  getThread will block during a GC until the
        if (t->what_next != prev_what_next) {
            debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n",
                       (long)t->id, whatNext_strs[t->what_next]);
            debugBelch("--<< thread %ld (%s) stopped, yielding\n",
                       (long)t->id, whatNext_strs[t->what_next]);

    //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
    ASSERT(t->link == END_TSO_QUEUE);

    // Shortcut if we're just switching evaluators: don't bother
    // doing stack squeezing (which can be expensive), just run the
    if (t->what_next != prev_what_next) {

    ASSERT(!is_on_queue(t,CurrentProc));
    //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
               checkThreadQsSanity(rtsTrue));

    addToRunQueue(cap,t);

    /* add a ContinueThread event to actually process the thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              t, (StgClosure*)NULL, (rtsSpark*)NULL);
               debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadBlocked
 * -------------------------------------------------------------------------- */

scheduleHandleThreadBlocked( StgTSO *t
#if !defined(GRAN) && !defined(DEBUG)
               debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
                          t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
               if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));

    // ??? needed; should emit block before
               DumpGranEvent(GR_DESCHEDULE, t));
    prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
    ASSERT(procStatus[CurrentProc]==Busy ||
           ((procStatus[CurrentProc]==Fetching) &&
            (t->block_info.closure!=(StgClosure*)NULL)));
    if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
        !(!RtsFlags.GranFlags.DoAsyncFetch &&
          procStatus[CurrentProc]==Fetching))
        procStatus[CurrentProc] = Idle;

               debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
                          t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
               if (t->block_info.closure!=(StgClosure*)NULL)
                   print_bq(t->block_info.closure));

    /* Send a fetch (if BlockedOnGA) and dump event to log file */

    /* whatever we schedule next, we must log that schedule */
    emitSchedule = rtsTrue;

    // We don't need to do anything.  The thread is blocked, and it
    // has tidied up its stack and placed itself on whatever queue
    // it needs to be on.

    ASSERT(t->why_blocked != NotBlocked);
    // This might not be true under SMP: we don't have
    // exclusive access to this TSO, so someone might have
    // woken it up by now.  This actually happens: try
    // conc023 +RTS -N2.

               debugBelch("--<< thread %d (%s) stopped: ",
                          t->id, whatNext_strs[t->what_next]);
               printThreadBlockage(t);

    /* Only for dumping event to log file
       ToDo: do I need this in GranSim, too?

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadFinished
 * -------------------------------------------------------------------------- */

scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
    /* Need to check whether this was a main thread, and if so,
     * return with the return value.
     * We also end up here if the thread kills itself with an
     * uncaught exception, see Exception.cmm.
    IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n",
                                  t->id, whatNext_strs[t->what_next]));

    endThread(t, CurrentProc); // clean-up the thread
#elif defined(PARALLEL_HASKELL)
    /* For now all are advisory -- HWL */
    //if(t->priority==AdvisoryPriority) ??
    advisory_thread_count--; // JB: Caution with this counter, buggy!

    if(t->dist.priority==RevalPriority)

# if defined(EDENOLD)
    // the thread could still have an outport... (BUG)
    if (t->eden.outport != -1) {
        // delete the outport for the tso which has finished...
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
                                t->eden.outport, t->id));

    // thread still in the process (HEAVY BUG! since outport has just been closed...)
    if (t->eden.epid != -1) {
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
                                t->id, t->eden.epid));
        removeTSOfromProcess(t);

    if (RtsFlags.ParFlags.ParStats.Full &&
        !RtsFlags.ParFlags.ParStats.Suppressed)
        DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);

    //  t->par only contains statistics: left out for now...
                 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
                            t->id,t,t->par.sparkname));
#endif // PARALLEL_HASKELL

    // Check whether the thread that just completed was a bound
    // thread, and if so return with the result.
    // There is an assumption here that all thread completion goes
    // through this point; we need to make sure that if a thread
    // ends up in the ThreadKilled state, that it stays on the run
    // queue so it can be dealt with here.

        if (t->bound != task) {
#if !defined(THREADED_RTS)
            // Must be a bound thread that is not the topmost one.  Leave
            // it on the run queue until the stack has unwound to the
            // point where we can deal with this.  Leaving it on the run
            // queue also ensures that the garbage collector knows about
            // this thread and its return value (it gets dropped from the
            // all_threads list so there's no other way to find it).
            appendToRunQueue(cap,t);
            // this cannot happen in the threaded RTS, because a
            // bound thread can only be run by the appropriate Task.
            barf("finished bound thread that isn't mine");

        ASSERT(task->tso == t);

        if (t->what_next == ThreadComplete) {
                // NOTE: return val is tso->sp[1] (see StgStartup.hc)
                *(task->ret) = (StgClosure *)task->tso->sp[1];
            task->stat = Success;
                *(task->ret) = NULL;
                task->stat = Interrupted;
                task->stat = Killed;

        removeThreadLabel((StgWord)task->tso->id);
        return rtsTrue; // tells schedule() to return

/* -----------------------------------------------------------------------------
 * Perform a heap census, if PROFILING
 * -------------------------------------------------------------------------- */

scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
#if defined(PROFILING)
    // When we have +RTS -i0 and we're heap profiling, do a census at
    // every GC.  This lets us get repeatable runs for debugging.
    if (performHeapProfile ||
        (RtsFlags.ProfFlags.profileInterval==0 &&
         RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
        GarbageCollect(GetRoots, rtsTrue);
        performHeapProfile = rtsFalse;
        return rtsTrue;  // true <=> we already GC'd

/* -----------------------------------------------------------------------------
 * Perform a garbage collection if necessary
 * -------------------------------------------------------------------------- */

scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
    static volatile StgWord waiting_for_gc;
    rtsBool was_waiting;

    // In order to GC, there must be no threads running Haskell code.
    // Therefore, the GC thread needs to hold *all* the capabilities,
    // and release them after the GC has completed.
    // This seems to be the simplest way: previous attempts involved
    // making all the threads with capabilities give up their
    // capabilities and sleep except for the *last* one, which
    // actually did the GC.  But it's quite hard to arrange for all
    // the other tasks to sleep and stay asleep.

    was_waiting = cas(&waiting_for_gc, 0, 1);
            IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
            yieldCapability(&cap,task);
        } while (waiting_for_gc);
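
    /* The cas() above acts as a one-shot gate: every task racing to GC
     * tries to flip waiting_for_gc from 0 to 1.  Exactly one caller
     * sees the old value 0 and becomes the task that performs the GC;
     * the others observe 1 and spin in yieldCapability() until the
     * winner clears the flag again below.
     */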
    for (i=0; i < n_capabilities; i++) {
        IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities));
1917 if (cap != &capabilities[i]) {
1918 Capability *pcap = &capabilities[i];
1919 // we better hope this task doesn't get migrated to
1920 // another Capability while we're waiting for this one.
1921 // It won't, because load balancing happens while we have
1922 // all the Capabilities, but even so it's a slightly
1923 // unsavoury invariant.
1926 waitForReturnCapability(&pcap, task);
1927 if (pcap != &capabilities[i]) {
1928 barf("scheduleDoGC: got the wrong capability");
1933 waiting_for_gc = rtsFalse;
1936 /* Kick any transactions which are invalid back to their
1937 * atomically frames. When next scheduled they will try to
1938 * commit, this commit will fail and they will retry.
1943 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1944 if (t->what_next == ThreadRelocated) {
1947 next = t->global_link;
1948 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1949 if (!stmValidateNestOfTransactions (t -> trec)) {
1950 IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1952 // strip the stack back to the
1953 // ATOMICALLY_FRAME, aborting the (nested)
1954 // transaction, and saving the stack of any
1955 // partially-evaluated thunks on the heap.
1956 raiseAsync_(cap, t, NULL, rtsTrue, NULL);
1959 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1967 // so this happens periodically:
1968 scheduleCheckBlackHoles(cap);
1970 IF_DEBUG(scheduler, printAllThreads());
1972 /* everybody back, start the GC.
1973 * Could do it in this thread, or signal a condition var
1974 * to do it in another thread. Either way, we need to
1975 * broadcast on gc_pending_cond afterward.
1977 #if defined(THREADED_RTS)
1978 IF_DEBUG(scheduler,sched_belch("doing GC"));
1980 GarbageCollect(GetRoots, force_major);
1983 // release our stash of capabilities.
1984 for (i = 0; i < n_capabilities; i++) {
1985 if (cap != &capabilities[i]) {
1986 task->cap = &capabilities[i];
1987 releaseCapability(&capabilities[i]);
1994 /* add a ContinueThread event to continue execution of current thread */
1995 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1997 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1999 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2005 /* ---------------------------------------------------------------------------
2006 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2007 * used by Control.Concurrent for error checking.
2008 * ------------------------------------------------------------------------- */
2011 rtsSupportsBoundThreads(void)
2013 #if defined(THREADED_RTS)
2020 /* ---------------------------------------------------------------------------
2021 * isThreadBound(tso): check whether tso is bound to an OS thread.
2022 * ------------------------------------------------------------------------- */
2025 isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
2027 #if defined(THREADED_RTS)
2028 return (tso->bound != NULL);
2033 /* ---------------------------------------------------------------------------
2034 * Singleton fork(). Do not copy any running threads.
2035 * ------------------------------------------------------------------------- */
2037 #if !defined(mingw32_HOST_OS) && !defined(SMP)
2038 #define FORKPROCESS_PRIMOP_SUPPORTED
2041 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2043 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2046 forkProcess(HsStablePtr *entry
2047 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2052 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2058 IF_DEBUG(scheduler,sched_belch("forking!"));
2060 // ToDo: for SMP, we should probably acquire *all* the capabilities
2065 if (pid) { // parent
2067 // just return the pid
2073 // delete all threads
2074 cap->run_queue_hd = END_TSO_QUEUE;
2075 cap->run_queue_tl = END_TSO_QUEUE;
2077 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2080 // don't allow threads to catch the ThreadKilled exception
2081 deleteThreadImmediately(cap,t);
2084 // wipe the task list
2085 ACQUIRE_LOCK(&sched_mutex);
2086 for (task = all_tasks; task != NULL; task=task->all_link) {
2087 if (task != cap->running_task) discardTask(task);
2089 RELEASE_LOCK(&sched_mutex);
2091 cap->suspended_ccalling_tasks = NULL;
2093 #if defined(THREADED_RTS)
2094 // wipe our spare workers list.
2095 cap->spare_workers = NULL;
2096 cap->returning_tasks_hd = NULL;
2097 cap->returning_tasks_tl = NULL;
2100 cap = rts_evalStableIO(cap, entry, NULL); // run the action
2101 rts_checkSchedStatus("forkProcess",cap);
2104 hs_exit(); // clean up and exit
2105 stg_exit(EXIT_SUCCESS);
2107 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2108 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2113 /* ---------------------------------------------------------------------------
2114 * Delete the threads on the run queue of the current capability.
2115 * ------------------------------------------------------------------------- */
2118 deleteRunQueue (Capability *cap)
2121 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2122 ASSERT(t->what_next != ThreadRelocated);
2124 deleteThread(cap, t);
2128 /* startThread and insertThread are now in GranSim.c -- HWL */
2131 /* -----------------------------------------------------------------------------
2132 Managing the suspended_ccalling_tasks list.
2133 Locks required: sched_mutex
2134 -------------------------------------------------------------------------- */
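/* Illustrative picture (not code): the list is doubly linked through
 * task->next and task->prev so that recoverSuspendedTask can unlink a
 * task in O(1) even when tasks resume out of order:
 *
 *    cap->suspended_ccalling_tasks --> T3 <--> T2 <--> T1 --> NULL
 *
 * suspendTask pushes at the head; a resuming task may be anywhere in
 * the list.
 */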
2137 suspendTask (Capability *cap, Task *task)
2139 ASSERT(task->next == NULL && task->prev == NULL);
2140 task->next = cap->suspended_ccalling_tasks;
2142 if (cap->suspended_ccalling_tasks) {
2143 cap->suspended_ccalling_tasks->prev = task;
2145 cap->suspended_ccalling_tasks = task;
2149 recoverSuspendedTask (Capability *cap, Task *task)
2152 task->prev->next = task->next;
2154 ASSERT(cap->suspended_ccalling_tasks == task);
2155 cap->suspended_ccalling_tasks = task->next;
2158 task->next->prev = task->prev;
2160 task->next = task->prev = NULL;
2163 /* ---------------------------------------------------------------------------
2164 * Suspending & resuming Haskell threads.
2166 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2167 * its capability before calling the C function. This allows another
2168 * task to pick up the capability and carry on running Haskell
2169 * threads. It also means that if the C call blocks, it won't lock
2172 * The Haskell thread making the C call is put to sleep for the
2173 * duration of the call, on the suspended_ccalling_tasks queue. We
2174 * give out a token to the task, which it can use to resume the thread
2175 * on return from the C function.
2176 * ------------------------------------------------------------------------- */
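/* Illustrative sketch (not part of the RTS): roughly the calling
 * sequence the code generator emits around a "safe" foreign call.
 * my_c_fn and the variable names are hypothetical.
 *
 *    void *token = suspendThread(reg);  // release the Capability
 *    r = my_c_fn(arg);                  // may block; other Haskell
 *                                       // threads keep running
 *    reg = resumeThread(token);         // wait for a Capability back
 */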
2179 suspendThread (StgRegTable *reg)
2182 int saved_errno = errno;
2186 /* assume that *reg is a pointer to the StgRegTable part of a Capability. */
2188 cap = regTableToCapability(reg);
2190 task = cap->running_task;
2191 tso = cap->r.rCurrentTSO;
2194 sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2196 // XXX this might not be necessary --SDM
2197 tso->what_next = ThreadRunGHC;
2199 threadPaused(cap,tso);
2201 if(tso->blocked_exceptions == NULL) {
2202 tso->why_blocked = BlockedOnCCall;
2203 tso->blocked_exceptions = END_TSO_QUEUE;
2205 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2208 // Hand back capability
2209 task->suspended_tso = tso;
2211 ACQUIRE_LOCK(&cap->lock);
2213 suspendTask(cap,task);
2214 cap->in_haskell = rtsFalse;
2215 releaseCapability_(cap);
2217 RELEASE_LOCK(&cap->lock);
2219 #if defined(THREADED_RTS)
2220 /* Preparing to leave the RTS, so ensure there's a native thread/task
2221 waiting to take over.
2223 IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2226 errno = saved_errno;
2231 resumeThread (void *task_)
2235 int saved_errno = errno;
2239 // Wait for permission to re-enter the RTS with the result.
2240 waitForReturnCapability(&cap,task);
2241 // we might be on a different capability now... but if so, our
2242 // entry on the suspended_ccalling_tasks list will also have been
2245 // Remove the thread from the suspended list
2246 recoverSuspendedTask(cap,task);
2248 tso = task->suspended_tso;
2249 task->suspended_tso = NULL;
2250 tso->link = END_TSO_QUEUE;
2251 IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2253 if (tso->why_blocked == BlockedOnCCall) {
2254 awakenBlockedQueue(cap,tso->blocked_exceptions);
2255 tso->blocked_exceptions = NULL;
2258 /* Reset blocking status */
2259 tso->why_blocked = NotBlocked;
2261 cap->r.rCurrentTSO = tso;
2262 cap->in_haskell = rtsTrue;
2263 errno = saved_errno;
2268 /* ---------------------------------------------------------------------------
2269 * Comparing Thread ids.
2271 * This is used from STG land in the implementation of the
2272 * instances of Eq/Ord for ThreadIds.
2273 * ------------------------------------------------------------------------ */
2276 cmp_thread(StgPtr tso1, StgPtr tso2)
2278 StgThreadID id1 = ((StgTSO *)tso1)->id;
2279 StgThreadID id2 = ((StgTSO *)tso2)->id;
2281 if (id1 < id2) return (-1);
2282 if (id1 > id2) return 1;
2286 /* ---------------------------------------------------------------------------
2287 * Fetching the ThreadID from an StgTSO.
2289 * This is used in the implementation of Show for ThreadIds.
2290 * ------------------------------------------------------------------------ */
2292 rts_getThreadId(StgPtr tso)
2294 return ((StgTSO *)tso)->id;
2299 labelThread(StgPtr tso, char *label)
2304 /* Caveat: Once set, you can only set the thread name to "" */
2305 len = strlen(label)+1;
2306 buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2307 strncpy(buf,label,len);
2308 /* Update will free the old memory for us */
2309 updateThreadLabel(((StgTSO *)tso)->id,buf);
2313 /* ---------------------------------------------------------------------------
2314 Create a new thread.
2316 The new thread starts with the given stack size. Before the
2317 scheduler can run, however, this thread needs to have a closure
2318 (and possibly some arguments) pushed on its stack. See
2319 pushClosure() in Schedule.h.
2321 createGenThread() and createIOThread() (in SchedAPI.h) are
2322 convenient packaged versions of this function.
2324 currently pri (priority) is only used in a GRAN setup -- HWL
2325 ------------------------------------------------------------------------ */
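/* Illustrative sketch: this is roughly what the createGenThread() and
 * createIOThread() wrappers in SchedAPI.h do (exact push sequence may
 * differ):
 *
 *    StgTSO *t = createThread(cap, stack_size);
 *    pushClosure(t, (W_)closure);   // the closure to evaluate
 *    scheduleThread(cap, t);        // make it runnable
 */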
2327 /* currently pri (priority) is only used in a GRAN setup -- HWL */
2329 createThread(nat size, StgInt pri)
2332 createThread(Capability *cap, nat size)
2338 /* sched_mutex is *not* required */
2340 /* First check whether we should create a thread at all */
2341 #if defined(PARALLEL_HASKELL)
2342 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2343 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2345 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2346 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2347 return END_TSO_QUEUE;
2353 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2356 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2358 /* catch ridiculously small stack sizes */
2359 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2360 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2363 stack_size = size - TSO_STRUCT_SIZEW;
2365 tso = (StgTSO *)allocateLocal(cap, size);
2366 TICK_ALLOC_TSO(stack_size, 0);
2368 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2370 SET_GRAN_HDR(tso, ThisPE);
2373 // Always start with the compiled code evaluator
2374 tso->what_next = ThreadRunGHC;
2376 tso->why_blocked = NotBlocked;
2377 tso->blocked_exceptions = NULL;
2379 tso->saved_errno = 0;
2382 tso->stack_size = stack_size;
2383 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) - TSO_STRUCT_SIZEW;
2385 tso->sp = (P_)&(tso->stack) + stack_size;
2387 tso->trec = NO_TREC;
2390 tso->prof.CCCS = CCS_MAIN;
2393 /* put a stop frame on the stack */
2394 tso->sp -= sizeofW(StgStopFrame);
2395 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2396 tso->link = END_TSO_QUEUE;
2400 /* uses more flexible routine in GranSim */
2401 insertThread(tso, CurrentProc);
2403 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2409 if (RtsFlags.GranFlags.GranSimStats.Full)
2410 DumpGranEvent(GR_START,tso);
2411 #elif defined(PARALLEL_HASKELL)
2412 if (RtsFlags.ParFlags.ParStats.Full)
2413 DumpGranEvent(GR_STARTQ,tso);
2414 /* HACK to avoid SCHEDULE
2418 /* Link the new thread on the global thread list.
2420 ACQUIRE_LOCK(&sched_mutex);
2421 tso->id = next_thread_id++; // while we have the mutex
2422 tso->global_link = all_threads;
2424 RELEASE_LOCK(&sched_mutex);
2427 tso->dist.priority = MandatoryPriority; //by default that is...
2431 tso->gran.pri = pri;
2433 tso->gran.magic = TSO_MAGIC; // debugging only
2435 tso->gran.sparkname = 0;
2436 tso->gran.startedat = CURRENT_TIME;
2437 tso->gran.exported = 0;
2438 tso->gran.basicblocks = 0;
2439 tso->gran.allocs = 0;
2440 tso->gran.exectime = 0;
2441 tso->gran.fetchtime = 0;
2442 tso->gran.fetchcount = 0;
2443 tso->gran.blocktime = 0;
2444 tso->gran.blockcount = 0;
2445 tso->gran.blockedat = 0;
2446 tso->gran.globalsparks = 0;
2447 tso->gran.localsparks = 0;
2448 if (RtsFlags.GranFlags.Light)
2449 tso->gran.clock = Now; /* local clock */
2451 tso->gran.clock = 0;
2453 IF_DEBUG(gran,printTSO(tso));
2454 #elif defined(PARALLEL_HASKELL)
2456 tso->par.magic = TSO_MAGIC; // debugging only
2458 tso->par.sparkname = 0;
2459 tso->par.startedat = CURRENT_TIME;
2460 tso->par.exported = 0;
2461 tso->par.basicblocks = 0;
2462 tso->par.allocs = 0;
2463 tso->par.exectime = 0;
2464 tso->par.fetchtime = 0;
2465 tso->par.fetchcount = 0;
2466 tso->par.blocktime = 0;
2467 tso->par.blockcount = 0;
2468 tso->par.blockedat = 0;
2469 tso->par.globalsparks = 0;
2470 tso->par.localsparks = 0;
2474 globalGranStats.tot_threads_created++;
2475 globalGranStats.threads_created_on_PE[CurrentProc]++;
2476 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2477 globalGranStats.tot_sq_probes++;
2478 #elif defined(PARALLEL_HASKELL)
2479 // collect parallel global statistics (currently done together with GC stats)
2480 if (RtsFlags.ParFlags.ParStats.Global &&
2481 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2482 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
2483 globalParStats.tot_threads_created++;
2489 sched_belch("==__ schedule: [PE %d] created TSO %d (%p);",
2490 CurrentProc, tso->id, tso));
2491 #elif defined(PARALLEL_HASKELL)
2492 IF_PAR_DEBUG(verbose,
2493 sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2494 (long)tso->id, tso, advisory_thread_count));
2496 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
2497 (long)tso->id, (long)tso->stack_size));
2504 all parallel thread creation calls should fall through the following routine.
2507 createThreadFromSpark(rtsSpark spark)
2509 ASSERT(spark != (rtsSpark)NULL);
2510 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2511 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
2513 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2514 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2515 return END_TSO_QUEUE;
2519 tso = createThread(RtsFlags.GcFlags.initialStkSize);
2520 if (tso==END_TSO_QUEUE)
2521 barf("createSparkThread: Cannot create TSO");
2523 tso->priority = AdvisoryPriority;
2525 pushClosure(tso,spark);
2527 advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
2534 Turn a spark into a thread.
2535 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2539 activateSpark (rtsSpark spark)
2543 tso = createSparkThread(spark);
2544 if (RtsFlags.ParFlags.ParStats.Full) {
2545 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2546 IF_PAR_DEBUG(verbose,
2547 debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2548 (StgClosure *)spark, info_type((StgClosure *)spark)));
2550 // ToDo: fwd info on local/global spark to thread -- HWL
2551 // tso->gran.exported = spark->exported;
2552 // tso->gran.locked = !spark->global;
2553 // tso->gran.sparkname = spark->name;
2559 /* ---------------------------------------------------------------------------
2562 * scheduleThread puts a thread on the end of the runnable queue.
2563 * This will usually be done immediately after a thread is created.
2564 * The caller of scheduleThread must create the thread using e.g.
2565 * createThread and push an appropriate closure
2566 * on this thread's stack before the scheduler is invoked.
2567 * ------------------------------------------------------------------------ */
2570 scheduleThread(Capability *cap, StgTSO *tso)
2572 // The thread goes at the *end* of the run-queue, to avoid possible
2573 // starvation of any threads already on the queue.
2574 appendToRunQueue(cap,tso);
2578 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2582 // We already created/initialised the Task
2583 task = cap->running_task;
2585 // This TSO is now a bound thread; make the Task and TSO
2586 // point to each other.
2591 task->stat = NoStatus;
2593 appendToRunQueue(cap,tso);
2595 IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2598 /* GranSim specific init */
2599 CurrentTSO = m->tso; // the TSO to run
2600 procStatus[MainProc] = Busy; // status of main PE
2601 CurrentProc = MainProc; // PE to run it on
2604 cap = schedule(cap,task);
2606 ASSERT(task->stat != NoStatus);
2607 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2609 IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2613 /* ----------------------------------------------------------------------------
2615 * ------------------------------------------------------------------------- */
2617 #if defined(THREADED_RTS)
2619 workerStart(Task *task)
2623 // See startWorkerTask().
2624 ACQUIRE_LOCK(&task->lock);
2626 RELEASE_LOCK(&task->lock);
2628 // set the thread-local pointer to the Task:
2631 // schedule() runs without a lock.
2632 cap = schedule(cap,task);
2634 // On exit from schedule(), we have a Capability.
2635 releaseCapability(cap);
2640 /* ---------------------------------------------------------------------------
2643 * Initialise the scheduler. This resets all the queues - if the
2644 * queues contained any threads, they'll be garbage collected at the
2647 * ------------------------------------------------------------------------ */
2654 for (i=0; i<MAX_PROC; i++) {  // run_queue_hds[] et al. are sized [MAX_PROC]
2655 run_queue_hds[i] = END_TSO_QUEUE;
2656 run_queue_tls[i] = END_TSO_QUEUE;
2657 blocked_queue_hds[i] = END_TSO_QUEUE;
2658 blocked_queue_tls[i] = END_TSO_QUEUE;
2659 ccalling_threadss[i] = END_TSO_QUEUE;
2660 blackhole_queue[i] = END_TSO_QUEUE;
2661 sleeping_queue = END_TSO_QUEUE;
2663 #elif !defined(THREADED_RTS)
2664 blocked_queue_hd = END_TSO_QUEUE;
2665 blocked_queue_tl = END_TSO_QUEUE;
2666 sleeping_queue = END_TSO_QUEUE;
2669 blackhole_queue = END_TSO_QUEUE;
2670 all_threads = END_TSO_QUEUE;
2675 RtsFlags.ConcFlags.ctxtSwitchTicks =
2676 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2678 #if defined(THREADED_RTS)
2679 /* Initialise the mutex and condition variables used by
2681 initMutex(&sched_mutex);
2684 ACQUIRE_LOCK(&sched_mutex);
2686 /* A capability holds the state a native thread needs in
2687 * order to execute STG code. At least one capability is
2688 * floating around (only SMP builds have more than one).
2694 #if defined(SMP) || defined(PARALLEL_HASKELL)
2700 * Eagerly start one worker to run each Capability, except for
2701 * Capability 0. The idea is that we're probably going to start a
2702 * bound thread on Capability 0 pretty soon, so we don't want a
2703 * worker task hogging it.
2708 for (i = 1; i < n_capabilities; i++) {
2709 cap = &capabilities[i];
2710 ACQUIRE_LOCK(&cap->lock);
2711 startWorkerTask(cap, workerStart);
2712 RELEASE_LOCK(&cap->lock);
2717 RELEASE_LOCK(&sched_mutex);
2721 exitScheduler( void )
2723 interrupted = rtsTrue;
2724 shutting_down_scheduler = rtsTrue;
2726 #if defined(THREADED_RTS)
2731 ACQUIRE_LOCK(&sched_mutex);
2732 task = newBoundTask();
2733 RELEASE_LOCK(&sched_mutex);
2735 for (i = 0; i < n_capabilities; i++) {
2736 shutdownCapability(&capabilities[i], task);
2738 boundTaskExiting(task);
2744 /* ---------------------------------------------------------------------------
2745 Where are the roots that we know about?
2747 - all the threads on the runnable queue
2748 - all the threads on the blocked queue
2749 - all the threads on the sleeping queue
2750 - all the thread currently executing a _ccall_GC
2751 - all the "main threads"
2753 ------------------------------------------------------------------------ */
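/* Illustrative sketch: an evac_fn receives the *address* of a root
 * pointer so the GC can update it if the object moves.  A hypothetical
 * debugging walker could be passed in the same way:
 *
 *    static nat n_roots;                          // hypothetical
 *    static void count_root (StgClosure **root) {
 *        if (*root != NULL) n_roots++;
 *    }
 *    ...
 *    GetRoots(count_root);
 */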
2755 /* This has to be protected either by the scheduler monitor, or by the
2756 garbage collection monitor (probably the latter).
2761 GetRoots( evac_fn evac )
2768 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2769 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2770 evac((StgClosure **)&run_queue_hds[i]);
2771 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2772 evac((StgClosure **)&run_queue_tls[i]);
2774 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2775 evac((StgClosure **)&blocked_queue_hds[i]);
2776 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2777 evac((StgClosure **)&blocked_queue_tls[i]);
2778 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2779 evac((StgClosure **)&ccalling_threadss[i]);
2786 for (i = 0; i < n_capabilities; i++) {
2787 cap = &capabilities[i];
2788 evac((StgClosure **)&cap->run_queue_hd);
2789 evac((StgClosure **)&cap->run_queue_tl);
2791 for (task = cap->suspended_ccalling_tasks; task != NULL;
2793 evac((StgClosure **)&task->suspended_tso);
2797 #if !defined(THREADED_RTS)
2798 evac((StgClosure **)&blocked_queue_hd);
2799 evac((StgClosure **)&blocked_queue_tl);
2800 evac((StgClosure **)&sleeping_queue);
2804 evac((StgClosure **)&blackhole_queue);
2806 #if defined(SMP) || defined(PARALLEL_HASKELL) || defined(GRAN)
2807 markSparkQueue(evac);
2810 #if defined(RTS_USER_SIGNALS)
2811 // mark the signal handlers (signals should be already blocked)
2812 markSignalHandlers(evac);
2816 /* -----------------------------------------------------------------------------
2819 This is the interface to the garbage collector from Haskell land.
2820 We provide this so that external C code can allocate and garbage
2821 collect when called from Haskell via _ccall_GC.
2823 It might be useful to provide an interface whereby the programmer
2824 can specify more roots (ToDo).
2826 This needs to be protected by the GC condition variable above. KH.
2827 -------------------------------------------------------------------------- */
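/* Illustrative sketch: external C code can supply extra roots for one
 * collection via performGCWithRoots().  my_global and my_roots are
 * hypothetical names:
 *
 *    static StgClosure *my_global;
 *    static void my_roots (evac_fn evac) { evac(&my_global); }
 *    ...
 *    performGCWithRoots(my_roots);
 */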
2829 static void (*extra_roots)(evac_fn);
2835 // ToDo: we have to grab all the capabilities here.
2836 errorBelch("performGC not supported in threaded RTS (yet)");
2837 stg_exit(EXIT_FAILURE);
2839 /* Obligated to hold this lock upon entry */
2840 GarbageCollect(GetRoots,rtsFalse);
2844 performMajorGC(void)
2847 errorBelch("performMayjorGC not supported in threaded RTS (yet)");
2848 stg_exit(EXIT_FAILURE);
2850 GarbageCollect(GetRoots,rtsTrue);
2854 AllRoots(evac_fn evac)
2856 GetRoots(evac); // the scheduler's roots
2857 extra_roots(evac); // the user's roots
2861 performGCWithRoots(void (*get_roots)(evac_fn))
2864 errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2865 stg_exit(EXIT_FAILURE);
2867 extra_roots = get_roots;
2868 GarbageCollect(AllRoots,rtsFalse);
2871 /* -----------------------------------------------------------------------------
2874 If the thread has reached its maximum stack size, then raise the
2875 StackOverflow exception in the offending thread. Otherwise
2876 relocate the TSO into a larger chunk of memory and adjust its stack
2878 -------------------------------------------------------------------------- */
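/* Worked example (illustrative numbers): with stack_size = 1024 words
 * and max_stack_size = 65536 words, we first try 2*1024 = 2048 words,
 * add TSO_STRUCT_SIZE, round the whole object up to a block (and
 * mblock) multiple, and subtract TSO_STRUCT_SIZEW again, so the thread
 * ends up with at least 2048 usable stack words, usually a bit more.
 */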
2881 threadStackOverflow(Capability *cap, StgTSO *tso)
2883 nat new_stack_size, stack_words;
2888 IF_DEBUG(sanity,checkTSO(tso));
2889 if (tso->stack_size >= tso->max_stack_size) {
2892 debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2893 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2894 /* If we're debugging, just print out the top of the stack */
2895 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2898 /* Send this thread the StackOverflow exception */
2899 raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2903 /* Try to double the current stack size. If that takes us over the
2904 * maximum stack size for this thread, then use the maximum instead.
2905 * Finally round up so the TSO ends up as a whole number of blocks.
2907 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2908 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2909 TSO_STRUCT_SIZE)/sizeof(W_);
2910 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2911 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2913 IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2915 dest = (StgTSO *)allocate(new_tso_size);
2916 TICK_ALLOC_TSO(new_stack_size,0);
2918 /* copy the TSO block and the old stack into the new area */
2919 memcpy(dest,tso,TSO_STRUCT_SIZE);
2920 stack_words = tso->stack + tso->stack_size - tso->sp;
2921 new_sp = (P_)dest + new_tso_size - stack_words;
2922 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2924 /* relocate the stack pointers... */
2926 dest->stack_size = new_stack_size;
2928 /* Mark the old TSO as relocated. We have to check for relocated
2929 * TSOs in the garbage collector and any primops that deal with TSOs.
2931 * It's important to set the sp value to just beyond the end
2932 * of the stack, so we don't attempt to scavenge any part of the old stack. */
2935 tso->what_next = ThreadRelocated;
2937 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2938 tso->why_blocked = NotBlocked;
2940 IF_PAR_DEBUG(verbose,
2941 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2942 tso->id, tso, tso->stack_size);
2943 /* If we're debugging, just print out the top of the stack */
2944 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2947 IF_DEBUG(sanity,checkTSO(tso));
2949 IF_DEBUG(scheduler,printTSO(dest));
2955 /* ---------------------------------------------------------------------------
2956 Wake up a queue that was blocked on some resource.
2957 ------------------------------------------------------------------------ */
2961 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2964 #elif defined(PARALLEL_HASKELL)
2966 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2968 /* write RESUME events to log file and
2969 update blocked and fetch time (depending on type of the orig closure) */
2970 if (RtsFlags.ParFlags.ParStats.Full) {
2971 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2972 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2973 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2974 if (emptyRunQueue())
2975 emitSchedule = rtsTrue;
2977 switch (get_itbl(node)->type) {
2979 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2984 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2991 barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2998 StgBlockingQueueElement *
2999 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3002 PEs node_loc, tso_loc;
3004 node_loc = where_is(node); // should be lifted out of loop
3005 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
3006 tso_loc = where_is((StgClosure *)tso);
3007 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3008 /* !fake_fetch => the TSO being on CurrentProc is the same as IS_LOCAL_TO */
3009 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3010 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3011 // insertThread(tso, node_loc);
3012 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3014 tso, node, (rtsSpark*)NULL);
3015 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
3018 } else { // TSO is remote (actually should be FMBQ)
3019 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3020 RtsFlags.GranFlags.Costs.gunblocktime +
3021 RtsFlags.GranFlags.Costs.latency;
3022 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3024 tso, node, (rtsSpark*)NULL);
3025 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
3028 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3030 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3031 (node_loc==tso_loc ? "Local" : "Global"),
3032 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3033 tso->block_info.closure = NULL;
3034 IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n",
3037 #elif defined(PARALLEL_HASKELL)
3038 StgBlockingQueueElement *
3039 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3041 StgBlockingQueueElement *next;
3043 switch (get_itbl(bqe)->type) {
3045 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3046 /* if it's a TSO just push it onto the run_queue */
3048 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3049 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
3051 unblockCount(bqe, node);
3052 /* reset blocking status after dumping event */
3053 ((StgTSO *)bqe)->why_blocked = NotBlocked;
3057 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3059 bqe->link = (StgBlockingQueueElement *)PendingFetches;
3060 PendingFetches = (StgBlockedFetch *)bqe;
3064 /* can ignore this case in a non-debugging setup;
3065 see comments on RBHSave closures above */
3067 /* check that the closure is an RBHSave closure */
3068 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3069 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3070 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3074 barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3075 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
3079 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3085 unblockOne(Capability *cap, StgTSO *tso)
3089 ASSERT(get_itbl(tso)->type == TSO);
3090 ASSERT(tso->why_blocked != NotBlocked);
3091 tso->why_blocked = NotBlocked;
3093 tso->link = END_TSO_QUEUE;
3095 // We might have just migrated this TSO to our Capability:
3097 tso->bound->cap = cap;
3100 appendToRunQueue(cap,tso);
3102 // we're holding a newly woken thread, make sure we context switch
3103 // quickly so we can migrate it if necessary.
3105 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3112 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3114 StgBlockingQueueElement *bqe;
3119 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3120 node, CurrentProc, CurrentTime[CurrentProc],
3121 CurrentTSO->id, CurrentTSO));
3123 node_loc = where_is(node);
3125 ASSERT(q == END_BQ_QUEUE ||
3126 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
3127 get_itbl(q)->type == CONSTR); // closure (type constructor)
3128 ASSERT(is_unique(node));
3130 /* FAKE FETCH: magically copy the node to the tso's proc;
3131 no Fetch necessary because in reality the node should not have been
3132 moved to the other PE in the first place
3134 if (CurrentProc!=node_loc) {
3136 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3137 node, node_loc, CurrentProc, CurrentTSO->id,
3138 // CurrentTSO, where_is(CurrentTSO),
3139 node->header.gran.procs));
3140 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3142 debugBelch("## new bitmask of node %p is %#x\n",
3143 node, node->header.gran.procs));
3144 if (RtsFlags.GranFlags.GranSimStats.Global) {
3145 globalGranStats.tot_fake_fetches++;
3150 // ToDo: check: ASSERT(CurrentProc==node_loc);
3151 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3154 bqe points to the current element in the queue
3155 next points to the next element in the queue
3157 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
3158 //tso_loc = where_is(tso);
3160 bqe = unblockOne(bqe, node);
3163 /* if this is the BQ of an RBH, we have to put back the info ripped out of
3164 the closure to make room for the anchor of the BQ */
3165 if (bqe!=END_BQ_QUEUE) {
3166 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3168 ASSERT((info_ptr==&RBH_Save_0_info) ||
3169 (info_ptr==&RBH_Save_1_info) ||
3170 (info_ptr==&RBH_Save_2_info));
3172 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3173 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3174 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3177 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3178 node, info_type(node)));
3181 /* statistics gathering */
3182 if (RtsFlags.GranFlags.GranSimStats.Global) {
3183 // globalGranStats.tot_bq_processing_time += bq_processing_time;
3184 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
3185 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
3186 globalGranStats.tot_awbq++; // total no. of bqs awakened
3189 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3190 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3192 #elif defined(PARALLEL_HASKELL)
3194 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3196 StgBlockingQueueElement *bqe;
3198 IF_PAR_DEBUG(verbose,
3199 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3203 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3204 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3209 ASSERT(q == END_BQ_QUEUE ||
3210 get_itbl(q)->type == TSO ||
3211 get_itbl(q)->type == BLOCKED_FETCH ||
3212 get_itbl(q)->type == CONSTR);
3215 while (get_itbl(bqe)->type==TSO ||
3216 get_itbl(bqe)->type==BLOCKED_FETCH) {
3217 bqe = unblockOne(bqe, node);
3221 #else /* !GRAN && !PARALLEL_HASKELL */
3224 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3226 if (tso == NULL) return; // hack; see bug #1235728, and comments in
3228 while (tso != END_TSO_QUEUE) {
3229 tso = unblockOne(cap,tso);
3234 /* ---------------------------------------------------------------------------
3236 - usually called inside a signal handler so it mustn't do anything fancy.
3237 ------------------------------------------------------------------------ */
3240 interruptStgRts(void)
3244 #if defined(THREADED_RTS)
3245 prodAllCapabilities();
3249 /* -----------------------------------------------------------------------------
3252 This is for use when we raise an exception in another thread, which may be blocked.
3254 This has nothing to do with the UnblockThread event in GranSim. -- HWL
3255 -------------------------------------------------------------------------- */
3257 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3259 NB: only the type of the blocking queue is different in GranSim and GUM
3260 the operations on the queue-elements are the same
3261 long live polymorphism!
3263 Locks: sched_mutex is held upon entry and exit.
3267 unblockThread(Capability *cap, StgTSO *tso)
3269 StgBlockingQueueElement *t, **last;
3271 switch (tso->why_blocked) {
3274 return; /* not blocked */
3277 // Be careful: nothing to do here! We tell the scheduler that the thread
3278 // is runnable and we leave it to the stack-walking code to abort the
3279 // transaction while unwinding the stack. We should perhaps have a debugging
3280 // test to make sure that this really happens and that the 'zombie' transaction
3281 // does not get committed.
3285 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3287 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3288 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3290 last = (StgBlockingQueueElement **)&mvar->head;
3291 for (t = (StgBlockingQueueElement *)mvar->head;
3293 last = &t->link, last_tso = t, t = t->link) {
3294 if (t == (StgBlockingQueueElement *)tso) {
3295 *last = (StgBlockingQueueElement *)tso->link;
3296 if (mvar->tail == tso) {
3297 mvar->tail = (StgTSO *)last_tso;
3302 barf("unblockThread (MVAR): TSO not found");
3305 case BlockedOnBlackHole:
3306 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3308 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3310 last = &bq->blocking_queue;
3311 for (t = bq->blocking_queue;
3313 last = &t->link, t = t->link) {
3314 if (t == (StgBlockingQueueElement *)tso) {
3315 *last = (StgBlockingQueueElement *)tso->link;
3319 barf("unblockThread (BLACKHOLE): TSO not found");
3322 case BlockedOnException:
3324 StgTSO *target = tso->block_info.tso;
3326 ASSERT(get_itbl(target)->type == TSO);
3328 if (target->what_next == ThreadRelocated) {
3329 target = target->link;
3330 ASSERT(get_itbl(target)->type == TSO);
3333 ASSERT(target->blocked_exceptions != NULL);
3335 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3336 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
3338 last = &t->link, t = t->link) {
3339 ASSERT(get_itbl(t)->type == TSO);
3340 if (t == (StgBlockingQueueElement *)tso) {
3341 *last = (StgBlockingQueueElement *)tso->link;
3345 barf("unblockThread (Exception): TSO not found");
3349 case BlockedOnWrite:
3350 #if defined(mingw32_HOST_OS)
3351 case BlockedOnDoProc:
3354 /* take TSO off blocked_queue */
3355 StgBlockingQueueElement *prev = NULL;
3356 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
3357 prev = t, t = t->link) {
3358 if (t == (StgBlockingQueueElement *)tso) {
3360 blocked_queue_hd = (StgTSO *)t->link;
3361 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3362 blocked_queue_tl = END_TSO_QUEUE;
3365 prev->link = t->link;
3366 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3367 blocked_queue_tl = (StgTSO *)prev;
3370 #if defined(mingw32_HOST_OS)
3371 /* (Cooperatively) signal that the worker thread should abort
3374 abandonWorkRequest(tso->block_info.async_result->reqID);
3379 barf("unblockThread (I/O): TSO not found");
3382 case BlockedOnDelay:
3384 /* take TSO off sleeping_queue */
3385 StgBlockingQueueElement *prev = NULL;
3386 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
3387 prev = t, t = t->link) {
3388 if (t == (StgBlockingQueueElement *)tso) {
3390 sleeping_queue = (StgTSO *)t->link;
3392 prev->link = t->link;
3397 barf("unblockThread (delay): TSO not found");
3401 barf("unblockThread");
3405 tso->link = END_TSO_QUEUE;
3406 tso->why_blocked = NotBlocked;
3407 tso->block_info.closure = NULL;
3408 pushOnRunQueue(cap,tso);
3412 unblockThread(Capability *cap, StgTSO *tso)
3416 /* To avoid locking unnecessarily. */
3417 if (tso->why_blocked == NotBlocked) {
3421 switch (tso->why_blocked) {
3424 // Be careful: nothing to do here! We tell the scheduler that the thread
3425 // is runnable and we leave it to the stack-walking code to abort the
3426 // transaction while unwinding the stack. We should perhaps have a debugging
3427 // test to make sure that this really happens and that the 'zombie' transaction
3428 // does not get committed.
3432 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3434 StgTSO *last_tso = END_TSO_QUEUE;
3435 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3438 for (t = mvar->head; t != END_TSO_QUEUE;
3439 last = &t->link, last_tso = t, t = t->link) {
3442 if (mvar->tail == tso) {
3443 mvar->tail = last_tso;
3448 barf("unblockThread (MVAR): TSO not found");
3451 case BlockedOnBlackHole:
3453 last = &blackhole_queue;
3454 for (t = blackhole_queue; t != END_TSO_QUEUE;
3455 last = &t->link, t = t->link) {
3461 barf("unblockThread (BLACKHOLE): TSO not found");
3464 case BlockedOnException:
3466 StgTSO *target = tso->block_info.tso;
3468 ASSERT(get_itbl(target)->type == TSO);
3470 while (target->what_next == ThreadRelocated) {
3471 target = target->link;
3472 ASSERT(get_itbl(target)->type == TSO);
3475 ASSERT(target->blocked_exceptions != NULL);
3477 last = &target->blocked_exceptions;
3478 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
3479 last = &t->link, t = t->link) {
3480 ASSERT(get_itbl(t)->type == TSO);
3486 barf("unblockThread (Exception): TSO not found");
3489 #if !defined(THREADED_RTS)
3491 case BlockedOnWrite:
3492 #if defined(mingw32_HOST_OS)
3493 case BlockedOnDoProc:
3496 StgTSO *prev = NULL;
3497 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
3498 prev = t, t = t->link) {
3501 blocked_queue_hd = t->link;
3502 if (blocked_queue_tl == t) {
3503 blocked_queue_tl = END_TSO_QUEUE;
3506 prev->link = t->link;
3507 if (blocked_queue_tl == t) {
3508 blocked_queue_tl = prev;
3511 #if defined(mingw32_HOST_OS)
3512 /* (Cooperatively) signal that the worker thread should abort
3515 abandonWorkRequest(tso->block_info.async_result->reqID);
3520 barf("unblockThread (I/O): TSO not found");
3523 case BlockedOnDelay:
3525 StgTSO *prev = NULL;
3526 for (t = sleeping_queue; t != END_TSO_QUEUE;
3527 prev = t, t = t->link) {
3530 sleeping_queue = t->link;
3532 prev->link = t->link;
3537 barf("unblockThread (delay): TSO not found");
3542 barf("unblockThread");
3546 tso->link = END_TSO_QUEUE;
3547 tso->why_blocked = NotBlocked;
3548 tso->block_info.closure = NULL;
3549 appendToRunQueue(cap,tso);
3553 /* -----------------------------------------------------------------------------
3556 * Check the blackhole_queue for threads that can be woken up. We do
3557 * this periodically: before every GC, and whenever the run queue is
3560 * An elegant solution might be to just wake up all the blocked
3561 * threads with awakenBlockedQueue occasionally: they'll go back to
3562 * sleep again if the object is still a BLACKHOLE. Unfortunately this
3563 * doesn't give us a way to tell whether we've actually managed to
3564 * wake up any threads, so we would be busy-waiting.
3566 * -------------------------------------------------------------------------- */
3569 checkBlackHoles (Capability *cap)
3572 rtsBool any_woke_up = rtsFalse;
3575 // blackhole_queue is global:
3576 ASSERT_LOCK_HELD(&sched_mutex);
3578 IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3580 // ASSUMES: sched_mutex
3581 prev = &blackhole_queue;
3582 t = blackhole_queue;
3583 while (t != END_TSO_QUEUE) {
3584 ASSERT(t->why_blocked == BlockedOnBlackHole);
3585 type = get_itbl(t->block_info.closure)->type;
3586 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3587 IF_DEBUG(sanity,checkTSO(t));
3588 t = unblockOne(cap, t);
3589 // urk, the threads migrate to the current capability
3590 // here, but we'd like to keep them on the original one.
3592 any_woke_up = rtsTrue;
3602 /* -----------------------------------------------------------------------------
3605 * The following function implements the magic for raising an
3606 * asynchronous exception in an existing thread.
3608 * We first remove the thread from any queue on which it might be
3609 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
3611 * We strip the stack down to the innermost CATCH_FRAME, building
3612 * thunks in the heap for all the active computations, so they can
3613 * be restarted if necessary. When we reach a CATCH_FRAME, we build
3614 * an application of the handler to the exception, and push it on
3615 * the top of the stack.
3617 * How exactly do we save all the active computations? We create an
3618 * AP_STACK for every UpdateFrame on the stack. Entering one of these
3619 * AP_STACKs pushes everything from the corresponding update frame
3620 * upwards onto the stack. (Actually, it pushes everything up to the
3621 * next update frame plus a pointer to the next AP_STACK object.
3622 * Entering the next AP_STACK object pushes more onto the stack until we
3623 * reach the last AP_STACK object - at which point the stack should look
3624 * exactly as it did when we killed the TSO and we can continue
3625 * execution by entering the closure on top of the stack.)
3627 * We can also kill a thread entirely - this happens if either (a) the
3628 * exception passed to raiseAsync is NULL, or (b) there's no
3629 * CATCH_FRAME on the stack. In either case, we strip the entire
3630 * stack and replace the thread with a zombie.
3632 * ToDo: in SMP mode, this function is only safe if either (a) we hold
3633 * all the Capabilities (eg. in GC), or (b) we own the Capability that
3634 * the TSO is currently blocked on or on the run queue of.
3636 * -------------------------------------------------------------------------- */
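/* Schematic example of the transformation described above.  Given a
 * stack (youngest frame first)
 *
 *    sp -> e2 | UPDATE_FRAME u2 | e1 | UPDATE_FRAME u1 | CATCH_FRAME | ...
 *
 * we build AP_STACK a2 from chunk e2 and overwrite u2's updatee with an
 * indirection to a2, then build a1 from (a2, e1) for u1's updatee, and
 * finally leave the handler applied to the exception on top of the
 * CATCH_FRAME.  Entering a1 later replays the suspended computation.
 */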
3639 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3641 raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3645 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3647 raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3651 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
3652 rtsBool stop_at_atomically, StgPtr stop_here)
3654 StgRetInfoTable *info;
3658 // Thread already dead?
3659 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3664 sched_belch("raising exception in thread %ld.", (long)tso->id));
3666 // Remove it from any blocking queues
3667 unblockThread(cap,tso);
3671 // The stack freezing code assumes there's a closure pointer on
3672 // the top of the stack, so we have to arrange that this is the case...
3674 if (sp[0] == (W_)&stg_enter_info) {
3678 sp[0] = (W_)&stg_dummy_ret_closure;
3682 while (stop_here == NULL || frame < stop_here) {
3684 // 1. Let the top of the stack be the "current closure"
3686 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3689 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3690 // current closure applied to the chunk of stack up to (but not
3691 // including) the update frame. This closure becomes the "current
3692 // closure". Go back to step 2.
3694 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3695 // top of the stack applied to the exception.
3697 // 5. If it's a STOP_FRAME, then kill the thread.
3699 // NB: if we pass an ATOMICALLY_FRAME then abort the associated
3702 info = get_ret_itbl((StgClosure *)frame);
3704 switch (info->i.type) {
3711 // First build an AP_STACK consisting of the stack chunk above the
3712 // current update frame, with the top word on the stack as the
3715 words = frame - sp - 1;
3716 ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3719 ap->fun = (StgClosure *)sp[0];
3721 for(i=0; i < (nat)words; ++i) {
3722 ap->payload[i] = (StgClosure *)*sp++;
3725 SET_HDR(ap,&stg_AP_STACK_info,
3726 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3727 TICK_ALLOC_UP_THK(words+1,0);
3730 debugBelch("sched: Updating ");
3731 printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3732 debugBelch(" with ");
3733 printObj((StgClosure *)ap);
3736 // Replace the updatee with an indirection
3738 // Warning: if we're in a loop, more than one update frame on
3739 // the stack may point to the same object. Be careful not to
3740 // overwrite an IND_OLDGEN in this case, because we'll screw
3741 // up the mutable lists. To be on the safe side, don't
3742 // overwrite any kind of indirection at all. See also
3743 // threadSqueezeStack in GC.c, where we have to make a similar
3746 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3747 // revert the black hole
3748 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3751 sp += sizeofW(StgUpdateFrame) - 1;
3752 sp[0] = (W_)ap; // push onto stack
3754 continue; //no need to bump frame
3758 // We've stripped the entire stack, the thread is now dead.
3759 tso->what_next = ThreadKilled;
3760 tso->sp = frame + sizeofW(StgStopFrame);
3764 // If we find a CATCH_FRAME, and we've got an exception to raise,
3765 // then build the THUNK raise(exception), and leave it on
3766 // top of the CATCH_FRAME ready to enter.
3770 StgCatchFrame *cf = (StgCatchFrame *)frame;
3774 if (exception == NULL) break;
3776 // we've got an exception to raise, so let's pass it to the
3777 // handler in this frame.
3779 raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3780 TICK_ALLOC_SE_THK(1,0);
3781 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3782 raise->payload[0] = exception;
3784 // throw away the stack from Sp up to the CATCH_FRAME.
3788 /* Ensure that async exceptions are blocked now, so we don't get
3789 * a surprise exception before we get around to executing the
3792 if (tso->blocked_exceptions == NULL) {
3793 tso->blocked_exceptions = END_TSO_QUEUE;
3796 /* Put the newly-built THUNK on top of the stack, ready to execute
3797 * when the thread restarts.
3800 sp[-1] = (W_)&stg_enter_info;
3802 tso->what_next = ThreadRunGHC;
3803 IF_DEBUG(sanity, checkTSO(tso));
3807 case ATOMICALLY_FRAME:
3808 if (stop_at_atomically) {
3809 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3810 stmCondemnTransaction(cap, tso -> trec);
3814 // R1 is not a register: the return convention for IO in
3815 // this case puts the return value on the stack, so we
3816 // need to set up the stack to return to the atomically
3817 // frame properly...
3818 tso->sp = frame - 2;
3819 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3820 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3822 tso->what_next = ThreadRunGHC;
3825 // Not stop_at_atomically... fall through and abort the
3828 case CATCH_RETRY_FRAME:
3829 // IF we find an ATOMICALLY_FRAME then we abort the
3830 // current transaction and propagate the exception. In
3831 // this case (unlike ordinary exceptions) we do not care
3832 // whether the transaction is valid or not because its
3833 // possible validity cannot have caused the exception
3834 // and will not be visible after the abort.
3836 debugBelch("Found atomically block delivering async exception\n"));
3837 StgTRecHeader *trec = tso -> trec;
3838 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3839 stmAbortTransaction(cap, trec);
3840 tso -> trec = outer;
3847 // move on to the next stack frame
3848 frame += stack_frame_sizeW((StgClosure *)frame);
3851 // if we got here, then we stopped at stop_here
3852 ASSERT(stop_here != NULL);
3855 /* -----------------------------------------------------------------------------
3858 This is used for interruption (^C) and forking, and corresponds to
3859 raising an exception but without letting the thread catch the exception.
3861 -------------------------------------------------------------------------- */
3864 deleteThread (Capability *cap, StgTSO *tso)
3866 if (tso->why_blocked != BlockedOnCCall &&
3867 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3868 raiseAsync(cap,tso,NULL);
3872 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3874 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3875 { // for forkProcess only:
3876 // delete thread without giving it a chance to catch the KillThread exception
3878 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3882 if (tso->why_blocked != BlockedOnCCall &&
3883 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3884 unblockThread(cap,tso);
3887 tso->what_next = ThreadKilled;
3891 /* -----------------------------------------------------------------------------
3892 raiseExceptionHelper
3894 This function is called by the raise# primitive, just so that we can
3895 move some of the tricky bits of raising an exception from C-- into
3896 C. Who knows, it might be a useful reusable thing here too.
3897 -------------------------------------------------------------------------- */
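/* Illustrative sketch (schematic, not the exact C-- text): the raise#
 * primitive calls this helper roughly as
 *
 *    frame_type = raiseExceptionHelper(BaseReg, CurrentTSO, exception);
 *
 * and then dispatches on the returned frame type (CATCH_FRAME,
 * ATOMICALLY_FRAME, CATCH_STM_FRAME, ...) to enter the handler that is
 * now at tso->sp.
 */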
3900 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3902 Capability *cap = regTableToCapability(reg);
3903 StgThunk *raise_closure = NULL;
3905 StgRetInfoTable *info;
3907 // This closure represents the expression 'raise# E' where E
3908 // is the exception raised. It is used to overwrite all the
3909 // thunks which are currently under evaluation.
3913 // LDV profiling: stg_raise_info has THUNK as its closure
3914 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3915 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
3916 // 1 does not cause any problem unless profiling is performed.
3917 // However, when LDV profiling goes on, we need to linearly scan
3918 // the small object pool, where raise_closure is stored, so we should
3919 // use MIN_UPD_SIZE.
3921 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3922 // sizeofW(StgClosure)+1);
3926 // Walk up the stack, looking for the catch frame. On the way,
3927 // we update any closures pointed to from update frames with the
3928 // raise closure that we just built.
3932 info = get_ret_itbl((StgClosure *)p);
3933 next = p + stack_frame_sizeW((StgClosure *)p);
3934 switch (info->i.type) {
3937 // Only create raise_closure if we need to.
3938 if (raise_closure == NULL) {
3940 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3941 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3942 raise_closure->payload[0] = exception;
3944 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3948 case ATOMICALLY_FRAME:
3949 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3951 return ATOMICALLY_FRAME;
3957 case CATCH_STM_FRAME:
3958 IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3960 return CATCH_STM_FRAME;
3966 case CATCH_RETRY_FRAME:
3975 /* -----------------------------------------------------------------------------
3976 findRetryFrameHelper
3978 This function is called by the retry# primitive. It traverses the stack
3979 leaving tso->sp referring to the frame which should handle the retry.
3981 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3982 or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).
3984 We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3985 despite the similar implementation.
3987 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3988 not be created within memory transactions.
3989 -------------------------------------------------------------------------- */
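/* Illustrative sketch (schematic): the retry# primitive calls
 *
 *    frame_type = findRetryFrameHelper(CurrentTSO);
 *
 * and then either waits on the transaction's read set (if it found the
 * enclosing ATOMICALLY_FRAME) or runs the alternative branch of an
 * orElse# (if it found a CATCH_RETRY_FRAME).
 */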
3992 findRetryFrameHelper (StgTSO *tso)
3995 StgRetInfoTable *info;
3999 info = get_ret_itbl((StgClosure *)p);
4000 next = p + stack_frame_sizeW((StgClosure *)p);
4001 switch (info->i.type) {
4003 case ATOMICALLY_FRAME:
4004 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4006 return ATOMICALLY_FRAME;
4008 case CATCH_RETRY_FRAME:
4009 IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4011 return CATCH_RETRY_FRAME;
4013 case CATCH_STM_FRAME:
4015 ASSERT(info->i.type != CATCH_FRAME);
4016 ASSERT(info->i.type != STOP_FRAME);
4023 /* -----------------------------------------------------------------------------
4024 resurrectThreads is called after garbage collection on the list of
4025 threads found to be garbage. Each of these threads will be woken
4026 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4027 on an MVar, or NonTermination if the thread was blocked on a Black
4030 Locks: assumes we hold *all* the capabilities.
4031 -------------------------------------------------------------------------- */
4034 resurrectThreads (StgTSO *threads)
4039 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4040 next = tso->global_link;
4041 tso->global_link = all_threads;
4043 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4045 // Wake up the thread on the Capability it was last on for a
4046 // bound thread, or last_free_capability otherwise.
4048 cap = tso->bound->cap;
4050 cap = last_free_capability;
4053 switch (tso->why_blocked) {
4055 case BlockedOnException:
4056 /* Called by GC - sched_mutex lock is currently held. */
4057 raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4059 case BlockedOnBlackHole:
4060 raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4063 raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4066 /* This might happen if the thread was blocked on a black hole
4067 * belonging to a thread that we've just woken up (raiseAsync
4068 * can wake up threads, remember...).
4072 barf("resurrectThreads: thread blocked in a strange way");
4077 /* ----------------------------------------------------------------------------
4078 * Debugging: why is a thread blocked
4079 * [Also provides useful information when debugging threaded programs
4080 * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4081 ------------------------------------------------------------------------- */
4085 printThreadBlockage(StgTSO *tso)
4087 switch (tso->why_blocked) {
4089 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4091 case BlockedOnWrite:
4092 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4094 #if defined(mingw32_HOST_OS)
4095 case BlockedOnDoProc:
4096 debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4099 case BlockedOnDelay:
4100 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4103 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4105 case BlockedOnException:
4106 debugBelch("is blocked on delivering an exception to thread %d",
4107 tso->block_info.tso->id);
4109 case BlockedOnBlackHole:
4110 debugBelch("is blocked on a black hole");
4113 debugBelch("is not blocked");
4115 #if defined(PARALLEL_HASKELL)
4117 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4118 tso->block_info.closure, info_type(tso->block_info.closure));
4120 case BlockedOnGA_NoSend:
4121 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4122 tso->block_info.closure, info_type(tso->block_info.closure));
4125 case BlockedOnCCall:
4126 debugBelch("is blocked on an external call");
4128 case BlockedOnCCall_NoUnblockExc:
4129 debugBelch("is blocked on an external call (exceptions were already blocked)");
4132 debugBelch("is blocked on an STM operation");
4135 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4136 tso->why_blocked, tso->id, tso);
4141 printThreadStatus(StgTSO *t)
4143 debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4145 void *label = lookupThreadLabel(t->id);
4146 if (label) debugBelch("[\"%s\"] ",(char *)label);
4148 if (t->what_next == ThreadRelocated) {
4149 debugBelch("has been relocated...\n");
4151 switch (t->what_next) {
4153 debugBelch("has been killed");
4155 case ThreadComplete:
4156 debugBelch("has completed");
4159 printThreadBlockage(t);
4166 printAllThreads(void)
4173 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4174 ullong_format_string(TIME_ON_PROC(CurrentProc),
4175 time_string, rtsFalse/*no commas!*/);
4177 debugBelch("all threads at [%s]:\n", time_string);
4178 # elif defined(PARALLEL_HASKELL)
4179 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4180 ullong_format_string(CURRENT_TIME,
4181 time_string, rtsFalse/*no commas!*/);
4183 debugBelch("all threads at [%s]:\n", time_string);
4185 debugBelch("all threads:\n");
4188 for (i = 0; i < n_capabilities; i++) {
4189 cap = &capabilities[i];
4190 debugBelch("threads on capability %d:\n", cap->no);
4191 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4192 printThreadStatus(t);
4196 debugBelch("other threads:\n");
4197 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4198 if (t->why_blocked != NotBlocked) {
4199 printThreadStatus(t);
4201 if (t->what_next == ThreadRelocated) {
4204 next = t->global_link;
4211 printThreadQueue(StgTSO *t)
4214 for (; t != END_TSO_QUEUE; t = t->link) {
4215 printThreadStatus(t);
4218 debugBelch("%d threads on queue\n", i);
/*
   Print a whole blocking queue attached to node (debugging only).
*/
# if defined(PARALLEL_HASKELL)
void print_bqe (StgBlockingQueueElement *bqe); // forward decl; defined below

void
print_bq (StgClosure *node)
{
  ASSERT(node != (StgClosure*)NULL);  // sanity check, before node is dereferenced

  debugBelch("## BQ of closure %p (%s): ",
             node, info_type(node));

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
         get_itbl(node)->type == FETCH_ME_BQ ||
         get_itbl(node)->type == RBH ||
         get_itbl(node)->type == MVAR);

  print_bqe(((StgBlockingQueue*)node)->blocking_queue);
}
/*
   Print a whole blocking queue starting with the element bqe.
*/
void
print_bqe (StgBlockingQueueElement *bqe)
{
  rtsBool end;

  /*
     NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
  */
  for (end = (bqe==END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
       bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);                    // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
           get_itbl(bqe)->type == BLOCKED_FETCH ||
           get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure */
    //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

    switch (get_itbl(bqe)->type) {
    case TSO:
      debugBelch(" TSO %u (%p),",
                 ((StgTSO *)bqe)->id, (void *)((StgTSO *)bqe));
      break;
    case BLOCKED_FETCH:
      debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
                 ((StgBlockedFetch *)bqe)->node,
                 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
                 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
                 ((StgBlockedFetch *)bqe)->ga.weight);
      break;
    case CONSTR:
      debugBelch(" %s (IP %p),",
                 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
                  get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
                  get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
                  "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      // (node is not in scope here, so only the element type is reported)
      barf("Unexpected closure type %s in blocking queue",
           info_type((StgClosure *)bqe));
    }
  } /* for */
  debugBelch("\n");
}
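
/* Sketch (hypothetical helper, mirroring the traversal idiom above): count
 * the entries of a blocking queue using the same termination rule, i.e.
 * stop at END_BQ_QUEUE or at the first CONSTR (an RBH_Save_* sentinel).
 * bq_len is not part of the RTS; it is shown only to isolate the idiom.
 */
nat
bq_len (StgBlockingQueueElement *bqe)
{
  nat i = 0;
  rtsBool end;

  for (end = (bqe==END_BQ_QUEUE);
       !end;
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
       bqe = end ? END_BQ_QUEUE : bqe->link) {
    i++;   // counts TSOs, BLOCKED_FETCHes and the trailing sentinel alike
  }
  return i;
}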
# elif defined(GRAN)
void
print_bq (StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  PEs node_loc, tso_loc;
  rtsBool end;

  ASSERT(node != (StgClosure*)NULL);  // sanity check, before node is dereferenced

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
         get_itbl(node)->type == FETCH_ME_BQ ||
         get_itbl(node)->type == RBH);

  node_loc = where_is(node);

  debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
             node, info_type(node), node_loc);

  /*
     NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
  */
  for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
       bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);                    // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
           get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure */
    ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

    tso_loc = where_is((StgClosure *)bqe);
    switch (get_itbl(bqe)->type) {
    case TSO:
      debugBelch(" TSO %d (%p) on [PE %d],",
                 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
      break;
    case CONSTR:
      debugBelch(" %s (IP %p),",
                 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
                  get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
                  get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
                  "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue of %p (%s)",
           info_type((StgClosure *)bqe), node, info_type(node));
    }
  } /* for */
  debugBelch("\n");
}
# endif /* GRAN */
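
/* Usage note (illustrative): print_bq is meant to be called on a closure
 * known to carry a blocking queue, e.g. from gdb:
 *   (gdb) call print_bq((StgClosure *)some_blackhole)
 * where some_blackhole is a hypothetical convenience variable holding a
 * BLACKHOLE_BQ, FETCH_ME_BQ or RBH closure.
 */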
#if defined(PARALLEL_HASKELL)
/* Return the number of threads on the (parallel) run queue; debugging only. */
static nat
run_queue_len(void)
{
  nat i;
  StgTSO *tso;

  for (i=0, tso=run_queue_hd;
       tso != END_TSO_QUEUE;
       i++, tso=tso->link) {
    /* nothing: the loop header does all the counting */
  }

  return i;
}
#endif

void
sched_belch(char *s, ...)
{
  va_list ap;
  va_start(ap, s);
#ifdef THREADED_RTS
  debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
#elif defined(PARALLEL_HASKELL)
  debugBelch("== ");
#else
  debugBelch("sched: ");
#endif
  vdebugBelch(s, ap);
  debugBelch("\n");
  va_end(ap);
}
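
/* Usage note (illustrative): sched_belch takes a printf-style format and
 * prefixes the current scheduler/task context, e.g.
 *   sched_belch("thread %d yielding", (int)t->id);
 */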