1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 1998-2006
5 * The scheduler and thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
15 #include "OSThreads.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
23 #include "RtsSignals.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
32 #include "Proftimer.h"
34 #if defined(GRAN) || defined(PARALLEL_HASKELL)
35 # include "GranSimRts.h"
37 # include "ParallelRts.h"
38 # include "Parallel.h"
39 # include "ParallelDebug.h"
44 #include "Capability.h"
46 #include "AwaitEvent.h"
47 #if defined(mingw32_HOST_OS)
48 #include "win32/IOManager.h"
51 #include "RaiseAsync.h"
53 #include "ThrIOManager.h"
55 #ifdef HAVE_SYS_TYPES_H
56 #include <sys/types.h>
70 // Turn off inlining when debugging - it obfuscates things
73 # define STATIC_INLINE static
76 /* -----------------------------------------------------------------------------
78 * -------------------------------------------------------------------------- */
82 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
83 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
86 In GranSim we have a runnable and a blocked queue for each processor.
87 In order to minimise code changes new arrays run_queue_hds/tls
88 are created. run_queue_hd is then a short cut (macro) for
89 run_queue_hds[CurrentProc] (see GranSim.h).
92 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
93 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
94 StgTSO *ccalling_threadss[MAX_PROC];
95 /* We use the same global list of threads (all_threads) in GranSim as in
96 the std RTS (i.e. we are cheating). However, we don't use this list in
97 the GranSim specific code at the moment (so we are only potentially
102 #if !defined(THREADED_RTS)
104 // Blocked/sleeping threads
104 StgTSO *blocked_queue_hd = NULL;
105 StgTSO *blocked_queue_tl = NULL;
106 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
109 /* Threads blocked on blackholes.
110 * LOCK: sched_mutex+capability, or all capabilities
112 StgTSO *blackhole_queue = NULL;
115 /* The blackhole_queue should be checked for threads to wake up. See
116 * Schedule.h for more thorough comment.
117 * LOCK: none (doesn't matter if we miss an update)
119 rtsBool blackholes_need_checking = rtsFalse;
121 /* flag set by signal handler to precipitate a context switch
122 * LOCK: none (just an advisory flag)
124 int context_switch = 0;
126 /* flag that tracks whether we have done any execution in this time slice.
127 * LOCK: currently none, perhaps we should lock (but needs to be
128 * updated in the fast path of the scheduler).
130 nat recent_activity = ACTIVITY_YES;
132 /* if this flag is set as well, give up execution
133 * LOCK: none (changes once, from false->true)
135 rtsBool sched_state = SCHED_RUNNING;
141 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
142 * exists - earlier gccs apparently didn't.
148 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
149 * in an MT setting, needed to signal that a worker thread shouldn't hang around
150 * in the scheduler when it is out of work.
152 rtsBool shutting_down_scheduler = rtsFalse;
155 * This mutex protects most of the global scheduler data in
156 * the THREADED_RTS runtime.
158 #if defined(THREADED_RTS)
162 #if defined(PARALLEL_HASKELL)
164 rtsTime TimeOfLastYield;
165 rtsBool emitSchedule = rtsTrue;
168 #if !defined(mingw32_HOST_OS)
169 #define FORKPROCESS_PRIMOP_SUPPORTED
172 /* -----------------------------------------------------------------------------
173 * static function prototypes
174 * -------------------------------------------------------------------------- */
176 static Capability *schedule (Capability *initialCapability, Task *task);
179 // These functions all encapsulate parts of the scheduler loop, and are
180 // abstracted only to make the structure and control flow of the
181 // scheduler clearer.
183 static void schedulePreLoop (void);
184 #if defined(THREADED_RTS)
185 static void schedulePushWork(Capability *cap, Task *task);
187 static void scheduleStartSignalHandlers (Capability *cap);
188 static void scheduleCheckBlockedThreads (Capability *cap);
189 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS);
190 static void scheduleCheckBlackHoles (Capability *cap);
191 static void scheduleDetectDeadlock (Capability *cap, Task *task);
193 static StgTSO *scheduleProcessEvent(rtsEvent *event);
195 #if defined(PARALLEL_HASKELL)
196 static StgTSO *scheduleSendPendingMessages(void);
197 static void scheduleActivateSpark(void);
198 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
200 #if defined(PAR) || defined(GRAN)
201 static void scheduleGranParReport(void);
203 static void schedulePostRunThread(StgTSO *t);
204 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
205 static void scheduleHandleStackOverflow( Capability *cap, Task *task,
207 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
208 nat prev_what_next );
209 static void scheduleHandleThreadBlocked( StgTSO *t );
210 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
212 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
213 static Capability *scheduleDoGC(Capability *cap, Task *task,
214 rtsBool force_major);
216 static rtsBool checkBlackHoles(Capability *cap);
218 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
219 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
221 static void deleteThread (Capability *cap, StgTSO *tso);
222 static void deleteAllThreads (Capability *cap);
224 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
225 static void deleteThread_(Capability *cap, StgTSO *tso);
228 #if defined(PARALLEL_HASKELL)
229 StgTSO * createSparkThread(rtsSpark spark);
230 StgTSO * activateSpark (rtsSpark spark);
234 static char *whatNext_strs[] = {
244 /* -----------------------------------------------------------------------------
245 * Putting a thread on the run queue: different scheduling policies
246 * -------------------------------------------------------------------------- */
249 addToRunQueue( Capability *cap, StgTSO *t )
251 #if defined(PARALLEL_HASKELL)
252 if (RtsFlags.ParFlags.doFairScheduling) {
253 // this does round-robin scheduling; good for concurrency
254 appendToRunQueue(cap,t);
256 // this does unfair scheduling; good for parallelism
257 pushOnRunQueue(cap,t);
260 // this does round-robin scheduling; good for concurrency
261 appendToRunQueue(cap,t);
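// The two queue operations differ only in where the thread lands:
// appendToRunQueue adds t at the tail, so every other runnable thread
// gets a turn first (fair, round-robin), whereas pushOnRunQueue puts t
// at the head, so the current computation resumes as soon as possible
// (unfair, but usually better for parallel throughput).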
265 /* ---------------------------------------------------------------------------
266 Main scheduling loop.
268 We use round-robin scheduling, each thread returning to the
269 scheduler loop when one of these conditions is detected:
272 * timer expires (thread yields)
278 In a GranSim setup this loop iterates over the global event queue.
279 This revolves around the global event queue, which determines what
280 to do next. Therefore, it's more complicated than either the
281 concurrent or the parallel (GUM) setup.
284 GUM iterates over incoming messages.
285 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
286 and sends out a fish whenever it has nothing to do; in-between
287 doing the actual reductions (shared code below) it processes the
288 incoming messages and deals with delayed operations
289 (see PendingFetches).
290 This is not the ugliest code you could imagine, but it's bloody close.
292 ------------------------------------------------------------------------ */
295 schedule (Capability *initialCapability, Task *task)
299 StgThreadReturnCode ret;
302 #elif defined(PARALLEL_HASKELL)
305 rtsBool receivedFinish = rtsFalse;
307 nat tp_size, sp_size; // stats only
312 #if defined(THREADED_RTS)
313 rtsBool first = rtsTrue;
316 cap = initialCapability;
318 // Pre-condition: this task owns initialCapability.
319 // The sched_mutex is *NOT* held
320 // NB. on return, we still hold a capability.
322 debugTrace (DEBUG_sched,
323 "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
324 task, initialCapability);
328 // -----------------------------------------------------------
329 // Scheduler loop starts here:
331 #if defined(PARALLEL_HASKELL)
332 #define TERMINATION_CONDITION (!receivedFinish)
334 #define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL)
336 #define TERMINATION_CONDITION rtsTrue
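// The three definitions above choose the loop condition for the three
// builds: GUM keeps going until a Finish message has been received,
// GranSim keeps going while the global event queue is non-empty, and
// the ordinary RTS loops until a return inside the body exits
// schedule() explicitly.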
339 while (TERMINATION_CONDITION) {
342 /* Choose the processor with the next event */
343 CurrentProc = event->proc;
344 CurrentTSO = event->tso;
347 #if defined(THREADED_RTS)
349 // don't yield the first time, we want a chance to run this
350 // thread for a bit, even if there are others banging at the
353 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
355 // Yield the capability to higher-priority tasks if necessary.
356 yieldCapability(&cap, task);
360 #if defined(THREADED_RTS)
361 schedulePushWork(cap,task);
364 // Check whether we have re-entered the RTS from Haskell without
365 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
367 if (cap->in_haskell) {
368 errorBelch("schedule: re-entered unsafely.\n"
369 " Perhaps a 'foreign import unsafe' should be 'safe'?");
370 stg_exit(EXIT_FAILURE);
373 // The interruption / shutdown sequence.
375 // In order to cleanly shut down the runtime, we want to:
376 // * make sure that all main threads return to their callers
377 // with the state 'Interrupted'.
378 // * clean up all OS threads associated with the runtime
379 // * free all memory etc.
381 // So the sequence for ^C goes like this:
383 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
384 // arranges for some Capability to wake up
386 // * all threads in the system are halted, and the zombies are
387 // placed on the run queue for cleaning up. We acquire all
388 // the capabilities in order to delete the threads, this is
389 // done by scheduleDoGC() for convenience (because GC already
390 // needs to acquire all the capabilities). We can't kill
391 // threads involved in foreign calls.
393 // * somebody calls shutdownHaskell(), which calls exitScheduler()
395 // * sched_state := SCHED_SHUTTING_DOWN
397 // * all workers exit when the run queue on their capability
398 // drains. All main threads will also exit when their TSO
399 // reaches the head of the run queue and they can return.
401 // * eventually all Capabilities will shut down, and the RTS can
404 // * We might be left with threads blocked in foreign calls,
405 // we should really attempt to kill these somehow (TODO);
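//
// To summarise, sched_state only ever moves forwards:
//
//      SCHED_RUNNING -> SCHED_INTERRUPTING -> SCHED_SHUTTING_DOWN
//
// which is why the tests elsewhere in this file can use >= comparisons
// on it.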
407 switch (sched_state) {
410 case SCHED_INTERRUPTING:
411 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
412 #if defined(THREADED_RTS)
413 discardSparksCap(cap);
415 /* scheduleDoGC() deletes all the threads */
416 cap = scheduleDoGC(cap,task,rtsFalse);
418 case SCHED_SHUTTING_DOWN:
419 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
420 // If we are a worker, just exit. If we're a bound thread
421 // then we will exit below when we've removed our TSO from
423 if (task->tso == NULL && emptyRunQueue(cap)) {
428 barf("sched_state: %d", sched_state);
431 #if defined(THREADED_RTS)
432 // If the run queue is empty, take a spark and turn it into a thread.
434 if (emptyRunQueue(cap)) {
436 spark = findSpark(cap);
438 debugTrace(DEBUG_sched,
439 "turning spark of closure %p into a thread",
440 (StgClosure *)spark);
441 createSparkThread(cap,spark);
445 #endif // THREADED_RTS
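// (A spark is no more than a pointer to an unevaluated closure,
// recorded by par; createSparkThread wraps it in a fresh TSO so that
// an otherwise idle Capability has something to run.)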
447 scheduleStartSignalHandlers(cap);
449 // Only check the black holes here if we've nothing else to do.
450 // During normal execution, the black hole list only gets checked
451 // at GC time, to avoid repeatedly traversing this possibly long
452 // list each time around the scheduler.
453 if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
455 scheduleCheckWakeupThreads(cap);
457 scheduleCheckBlockedThreads(cap);
459 scheduleDetectDeadlock(cap,task);
460 #if defined(THREADED_RTS)
461 cap = task->cap; // reload cap, it might have changed
464 // Normally, the only way we can get here with no threads to
465 // run is if a keyboard interrupt was received during
466 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
467 // Additionally, it is not fatal for the
468 // threaded RTS to reach here with no threads to run.
470 // win32: might be here due to awaitEvent() being abandoned
471 // as a result of a console event having been delivered.
472 if ( emptyRunQueue(cap) ) {
473 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
474 ASSERT(sched_state >= SCHED_INTERRUPTING);
476 continue; // nothing to do
479 #if defined(PARALLEL_HASKELL)
480 scheduleSendPendingMessages();
481 if (emptyRunQueue(cap) && scheduleActivateSpark())
485 ASSERT(next_fish_to_send_at==0); // i.e. no delayed fishes left!
488 /* If we still have no work we need to send a FISH to get a spark
490 if (emptyRunQueue(cap)) {
491 if (!scheduleGetRemoteWork(&receivedFinish)) continue;
492 ASSERT(rtsFalse); // should not happen at the moment
494 // from here: non-empty run queue.
495 // TODO: merge above case with this, only one call processMessages() !
496 if (PacketsWaiting()) { /* process incoming messages, if
497 any pending... only in else
498 because getRemoteWork waits for
500 receivedFinish = processMessages();
505 scheduleProcessEvent(event);
509 // Get a thread to run
511 t = popRunQueue(cap);
513 #if defined(GRAN) || defined(PAR)
514 scheduleGranParReport(); // some kind of debugging output
516 // Sanity check the thread we're about to run. This can be
517 // expensive if there is lots of thread switching going on...
518 IF_DEBUG(sanity,checkTSO(t));
521 #if defined(THREADED_RTS)
522 // Check whether we can run this thread in the current task.
523 // If not, we have to pass our capability to the right task.
525 Task *bound = t->bound;
529 debugTrace(DEBUG_sched,
530 "### Running thread %lu in bound thread", (unsigned long)t->id);
531 // yes, the Haskell thread is bound to the current native thread
533 debugTrace(DEBUG_sched,
534 "### thread %lu bound to another OS thread", (unsigned long)t->id);
535 // no, bound to a different Haskell thread: pass to that thread
536 pushOnRunQueue(cap,t);
540 // The thread we want to run is unbound.
542 debugTrace(DEBUG_sched,
543 "### this OS thread cannot run thread %lu", (unsigned long)t->id);
544 // no, the current native thread is bound to a different
545 // Haskell thread, so pass it to any worker thread
546 pushOnRunQueue(cap,t);
553 /* context switches are initiated by the timer signal, unless
554 * the user specified "context switch as often as possible", with
557 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
558 && !emptyThreadQueues(cap)) {
564 // CurrentTSO is the thread to run. t might be different if we
565 // loop back to run_thread, so make sure to set CurrentTSO after
567 cap->r.rCurrentTSO = t;
569 debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
570 (long)t->id, whatNext_strs[t->what_next]);
572 startHeapProfTimer();
574 // Check for exceptions blocked on this thread
575 maybePerformBlockedException (cap, t);
577 // ----------------------------------------------------------------------
578 // Run the current thread
580 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
581 ASSERT(t->cap == cap);
583 prev_what_next = t->what_next;
585 errno = t->saved_errno;
587 SetLastError(t->saved_winerror);
590 cap->in_haskell = rtsTrue;
594 #if defined(THREADED_RTS)
595 if (recent_activity == ACTIVITY_DONE_GC) {
596 // ACTIVITY_DONE_GC means we turned off the timer signal to
597 // conserve power (see #1623). Re-enable it here.
599 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
600 if (prev == ACTIVITY_DONE_GC) {
604 recent_activity = ACTIVITY_YES;
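// Dispatch on what the thread was doing when it last stopped: an
// already-finished thread falls straight through with ThreadFinished,
// compiled code re-enters Haskell via StgRun(stg_returnToStackTop),
// and byte-code goes through interpretBCO.  Each case hands back a
// (possibly different) Capability.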
608 switch (prev_what_next) {
612 /* Thread already finished, return to scheduler. */
613 ret = ThreadFinished;
619 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
620 cap = regTableToCapability(r);
625 case ThreadInterpret:
626 cap = interpretBCO(cap);
631 barf("schedule: invalid what_next field");
634 cap->in_haskell = rtsFalse;
636 // The TSO might have moved, eg. if it re-entered the RTS and a GC
637 // happened. So find the new location:
638 t = cap->r.rCurrentTSO;
640 // We have run some Haskell code: there might be blackhole-blocked
641 // threads to wake up now.
642 // Lock-free test here should be ok, we're just setting a flag.
643 if ( blackhole_queue != END_TSO_QUEUE ) {
644 blackholes_need_checking = rtsTrue;
647 // And save the current errno in this thread.
648 // XXX: possibly bogus for SMP because this thread might already
649 // be running again, see code below.
650 t->saved_errno = errno;
652 // Similarly for Windows error code
653 t->saved_winerror = GetLastError();
656 #if defined(THREADED_RTS)
657 // If ret is ThreadBlocked, and this Task is bound to the TSO that
658 // blocked, we are in limbo - the TSO is now owned by whatever it
659 // is blocked on, and may in fact already have been woken up,
660 // perhaps even on a different Capability. It may be the case
661 // that task->cap != cap. We better yield this Capability
662 // immediately and return to normality.
663 if (ret == ThreadBlocked) {
664 debugTrace(DEBUG_sched,
665 "--<< thread %lu (%s) stopped: blocked",
666 (unsigned long)t->id, whatNext_strs[t->what_next]);
671 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
672 ASSERT(t->cap == cap);
674 // ----------------------------------------------------------------------
676 // Costs for the scheduler are assigned to CCS_SYSTEM
678 #if defined(PROFILING)
682 schedulePostRunThread(t);
684 t = threadStackUnderflow(task,t);
686 ready_to_gc = rtsFalse;
690 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
694 scheduleHandleStackOverflow(cap,task,t);
698 if (scheduleHandleYield(cap, t, prev_what_next)) {
699 // shortcut for switching between compiler/interpreter:
705 scheduleHandleThreadBlocked(t);
709 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
710 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
714 barf("schedule: invalid thread return code %d", (int)ret);
717 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
718 cap = scheduleDoGC(cap,task,rtsFalse);
720 } /* end of while() */
723 /* ----------------------------------------------------------------------------
724 * Setting up the scheduler loop
725 * ------------------------------------------------------------------------- */
728 schedulePreLoop(void)
731 /* set up first event to get things going */
732 /* ToDo: assign costs for system setup and init MainTSO ! */
733 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
735 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
737 debugTrace (DEBUG_gran,
738 "GRAN: Init CurrentTSO (in schedule) = %p",
740 IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
742 if (RtsFlags.GranFlags.Light) {
743 /* Save current time; GranSim Light only */
744 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
749 /* -----------------------------------------------------------------------------
752 * Push work to other Capabilities if we have some.
753 * -------------------------------------------------------------------------- */
755 #if defined(THREADED_RTS)
757 schedulePushWork(Capability *cap USED_IF_THREADS,
758 Task *task USED_IF_THREADS)
760 Capability *free_caps[n_capabilities], *cap0;
763 // migration can be turned off with +RTS -qg
764 if (!RtsFlags.ParFlags.migrate) return;
766 // Check whether we have more threads on our run queue, or sparks
767 // in our pool, that we could hand to another Capability.
768 if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
769 && sparkPoolSizeCap(cap) < 2) {
773 // First grab as many free Capabilities as we can.
774 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
775 cap0 = &capabilities[i];
776 if (cap != cap0 && tryGrabCapability(cap0,task)) {
777 if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
778 // it already has some work, we just grabbed it at
779 // the wrong moment. Or maybe it's deadlocked!
780 releaseCapability(cap0);
782 free_caps[n_free_caps++] = cap0;
787 // we now have n_free_caps free capabilities stashed in
788 // free_caps[]. Share our run queue equally with them. This is
789 // probably the simplest thing we could do; improvements we might
790 // want to do include:
792 // - giving high priority to moving relatively new threads, on
793 // the grounds that they haven't had time to build up a
794 // working set in the cache on this CPU/Capability.
796 // - giving low priority to moving long-lived threads
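//
// Roughly, the loop below deals the runnable threads out one per free
// Capability, skipping any TSO that is bound to this Task, locked to
// its Capability, or merely a ThreadRelocated forwarding pointer;
// whatever cannot be moved is relinked back onto our own run queue.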
798 if (n_free_caps > 0) {
799 StgTSO *prev, *t, *next;
800 rtsBool pushed_to_all;
802 debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
805 pushed_to_all = rtsFalse;
807 if (cap->run_queue_hd != END_TSO_QUEUE) {
808 prev = cap->run_queue_hd;
810 prev->_link = END_TSO_QUEUE;
811 for (; t != END_TSO_QUEUE; t = next) {
813 t->_link = END_TSO_QUEUE;
814 if (t->what_next == ThreadRelocated
815 || t->bound == task // don't move my bound thread
816 || tsoLocked(t)) { // don't move a locked thread
817 setTSOLink(cap, prev, t);
819 } else if (i == n_free_caps) {
820 pushed_to_all = rtsTrue;
823 setTSOLink(cap, prev, t);
826 debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
827 appendToRunQueue(free_caps[i],t);
828 if (t->bound) { t->bound->cap = free_caps[i]; }
829 t->cap = free_caps[i];
833 cap->run_queue_tl = prev;
836 // If there are some free capabilities that we didn't push any
837 // threads to, then try to push a spark to each one.
838 if (!pushed_to_all) {
840 // i is the next free capability to push to
841 for (; i < n_free_caps; i++) {
842 if (emptySparkPoolCap(free_caps[i])) {
843 spark = findSpark(cap);
845 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
846 newSpark(&(free_caps[i]->r), spark);
852 // release the capabilities
853 for (i = 0; i < n_free_caps; i++) {
854 task->cap = free_caps[i];
855 releaseCapability(free_caps[i]);
858 task->cap = cap; // reset to point to our Capability.
862 /* ----------------------------------------------------------------------------
863 * Start any pending signal handlers
864 * ------------------------------------------------------------------------- */
866 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
868 scheduleStartSignalHandlers(Capability *cap)
870 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
871 // safe outside the lock
872 startSignalHandlers(cap);
877 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
882 /* ----------------------------------------------------------------------------
883 * Check for blocked threads that can be woken up.
884 * ------------------------------------------------------------------------- */
887 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
889 #if !defined(THREADED_RTS)
891 // Check whether any waiting threads need to be woken up. If the
892 // run queue is empty, and there are no other tasks running, we
893 // can wait indefinitely for something to happen.
895 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
897 awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
903 /* ----------------------------------------------------------------------------
904 * Check for threads woken up by other Capabilities
905 * ------------------------------------------------------------------------- */
908 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
910 #if defined(THREADED_RTS)
911 // Any threads that were woken up by other Capabilities get
912 // appended to our run queue.
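// Schematically, the splice below does:
//
//    run_queue:   t1 -> t2 -> END      wakeup_queue: t3 -> t4 -> END
//    afterwards:  t1 -> t2 -> t3 -> t4 -> END    (wakeup queue empty)
//
// cap->lock is needed because other Capabilities append to the wakeup
// queue; the run queue itself is private to this Capability.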
913 if (!emptyWakeupQueue(cap)) {
914 ACQUIRE_LOCK(&cap->lock);
915 if (emptyRunQueue(cap)) {
916 cap->run_queue_hd = cap->wakeup_queue_hd;
917 cap->run_queue_tl = cap->wakeup_queue_tl;
919 setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
920 cap->run_queue_tl = cap->wakeup_queue_tl;
922 cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
923 RELEASE_LOCK(&cap->lock);
928 /* ----------------------------------------------------------------------------
929 * Check for threads blocked on BLACKHOLEs that can be woken up
930 * ------------------------------------------------------------------------- */
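/* The flag is tested twice, once outside and once inside sched_mutex,
 * so that the common nothing-to-do case stays lock-free while two
 * Capabilities racing here still traverse the queue only once per
 * wake-up. */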
932 scheduleCheckBlackHoles (Capability *cap)
934 if ( blackholes_need_checking ) // check without the lock first
936 ACQUIRE_LOCK(&sched_mutex);
937 if ( blackholes_need_checking ) {
938 checkBlackHoles(cap);
939 blackholes_need_checking = rtsFalse;
941 RELEASE_LOCK(&sched_mutex);
945 /* ----------------------------------------------------------------------------
946 * Detect deadlock conditions and attempt to resolve them.
947 * ------------------------------------------------------------------------- */
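/* The recovery strategy below, in order: force a major GC (finalizers
 * and resurrected blocked threads may create new work); in the
 * non-threaded RTS, wait for any user-installed signal handlers to
 * fire; and finally throw an asynchronous exception (e.g.
 * NonTermination) at the blocked main thread rather than hanging
 * silently. */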
950 scheduleDetectDeadlock (Capability *cap, Task *task)
953 #if defined(PARALLEL_HASKELL)
954 // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
959 * Detect deadlock: when we have no threads to run, there are no
960 * threads blocked, waiting for I/O, or sleeping, and all the
961 * other tasks are waiting for work, we must have a deadlock of
964 if ( emptyThreadQueues(cap) )
966 #if defined(THREADED_RTS)
968 * In the threaded RTS, we only check for deadlock if there
969 * has been no activity in a complete timeslice. This means
970 * we won't eagerly start a full GC just because we don't have
971 * any threads to run currently.
973 if (recent_activity != ACTIVITY_INACTIVE) return;
976 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
978 // Garbage collection can release some new threads due to
979 // either (a) finalizers or (b) threads resurrected because
980 // they are unreachable and will therefore be sent an
981 // exception. Any threads thus released will be immediately
983 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
985 recent_activity = ACTIVITY_DONE_GC;
986 // disable timer signals (see #1623)
989 if ( !emptyRunQueue(cap) ) return;
991 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
992 /* If we have user-installed signal handlers, then wait
993 * for signals to arrive rather than bombing out with a
996 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
997 debugTrace(DEBUG_sched,
998 "still deadlocked, waiting for signals...");
1002 if (signals_pending()) {
1003 startSignalHandlers(cap);
1006 // either we have threads to run, or we were interrupted:
1007 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1013 #if !defined(THREADED_RTS)
1014 /* Probably a real deadlock. Send the current main thread the
1015 * Deadlock exception.
1018 switch (task->tso->why_blocked) {
1020 case BlockedOnBlackHole:
1021 case BlockedOnException:
1023 throwToSingleThreaded(cap, task->tso,
1024 (StgClosure *)nonTermination_closure);
1027 barf("deadlock: main thread blocked in a strange way");
1035 /* ----------------------------------------------------------------------------
1036 * Process an event (GRAN only)
1037 * ------------------------------------------------------------------------- */
1041 scheduleProcessEvent(rtsEvent *event)
1045 if (RtsFlags.GranFlags.Light)
1046 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1048 /* adjust time based on time-stamp */
1049 if (event->time > CurrentTime[CurrentProc] &&
1050 event->evttype != ContinueThread)
1051 CurrentTime[CurrentProc] = event->time;
1053 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1054 if (!RtsFlags.GranFlags.Light)
1057 IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1059 /* main event dispatcher in GranSim */
1060 switch (event->evttype) {
1061 /* Should just be continuing execution */
1062 case ContinueThread:
1063 IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1064 /* ToDo: check assertion
1065 ASSERT(run_queue_hd != (StgTSO*)NULL &&
1066 run_queue_hd != END_TSO_QUEUE);
1068 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1069 if (!RtsFlags.GranFlags.DoAsyncFetch &&
1070 procStatus[CurrentProc]==Fetching) {
1071 debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1072 CurrentTSO->id, CurrentTSO, CurrentProc);
1075 /* Ignore ContinueThreads for completed threads */
1076 if (CurrentTSO->what_next == ThreadComplete) {
1077 debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
1078 CurrentTSO->id, CurrentTSO, CurrentProc);
1081 /* Ignore ContinueThreads for threads that are being migrated */
1082 if (PROCS(CurrentTSO)==Nowhere) {
1083 debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1084 CurrentTSO->id, CurrentTSO, CurrentProc);
1087 /* The thread should be at the beginning of the run queue */
1088 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
1089 debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1090 CurrentTSO->id, CurrentTSO, CurrentProc);
1091 break; // run the thread anyway
1094 new_event(proc, proc, CurrentTime[proc],
1096 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1098 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1099 break; // now actually run the thread; DaH Qu'vam yImuHbej
1102 do_the_fetchnode(event);
1103 goto next_thread; /* handle next event in event queue */
1106 do_the_globalblock(event);
1107 goto next_thread; /* handle next event in event queue */
1110 do_the_fetchreply(event);
1111 goto next_thread; /* handle next event in event queue */
1113 case UnblockThread: /* Move from the blocked queue to the tail of */
1114 do_the_unblock(event);
1115 goto next_thread; /* handle next event in event queue */
1117 case ResumeThread: /* Move from the blocked queue to the tail of */
1118 /* the runnable queue ( i.e. Qu' SImqa'lu') */
1119 event->tso->gran.blocktime +=
1120 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1121 do_the_startthread(event);
1122 goto next_thread; /* handle next event in event queue */
1125 do_the_startthread(event);
1126 goto next_thread; /* handle next event in event queue */
1129 do_the_movethread(event);
1130 goto next_thread; /* handle next event in event queue */
1133 do_the_movespark(event);
1134 goto next_thread; /* handle next event in event queue */
1137 do_the_findwork(event);
1138 goto next_thread; /* handle next event in event queue */
1141 barf("Illegal event type %u\n", event->evttype);
1144 /* This point was scheduler_loop in the old RTS */
1146 IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1148 TimeOfLastEvent = CurrentTime[CurrentProc];
1149 TimeOfNextEvent = get_time_of_next_event();
1150 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1151 // CurrentTSO = ThreadQueueHd;
1153 IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
1156 if (RtsFlags.GranFlags.Light)
1157 GranSimLight_leave_system(event, &ActiveTSO);
1159 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1162 debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1164 /* in a GranSim setup the TSO stays on the run queue */
1166 /* Take a thread from the run queue. */
1167 POP_RUN_QUEUE(t); // take_off_run_queue(t);
1170 debugBelch("GRAN: About to run current thread, which is\n");
1173 context_switch = 0; // turned on via GranYield, checking events and time slice
1176 DumpGranEvent(GR_SCHEDULE, t));
1178 procStatus[CurrentProc] = Busy;
1182 /* ----------------------------------------------------------------------------
1183 * Send pending messages (PARALLEL_HASKELL only)
1184 * ------------------------------------------------------------------------- */
1186 #if defined(PARALLEL_HASKELL)
1188 scheduleSendPendingMessages(void)
1194 # if defined(PAR) // global Mem.Mgmt., omit for now
1195 if (PendingFetches != END_BF_QUEUE) {
1200 if (RtsFlags.ParFlags.BufferTime) {
1201 // if we use message buffering, we must send away all message
1202 // packets which have become too old...
1208 /* ----------------------------------------------------------------------------
1209 * Activate spark threads (PARALLEL_HASKELL only)
1210 * ------------------------------------------------------------------------- */
1212 #if defined(PARALLEL_HASKELL)
1214 scheduleActivateSpark(void)
1217 ASSERT(emptyRunQueue());
1218 /* We get here if the run queue is empty and want some work.
1219 We try to turn a spark into a thread, and add it to the run queue,
1220 from where it will be picked up in the next iteration of the scheduler
1224 /* :-[ no local threads => look out for local sparks */
1225 /* the spark pool for the current PE */
1226 pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1227 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1228 pool->hd < pool->tl) {
1230 * ToDo: add GC code check that we really have enough heap afterwards!!
1232 * If we're here (no runnable threads) and we have pending
1233 * sparks, we must have a space problem. Get enough space
1234 * to turn one of those pending sparks into a
1238 spark = findSpark(rtsFalse); /* get a spark */
1239 if (spark != (rtsSpark) NULL) {
1240 tso = createThreadFromSpark(spark); /* turn the spark into a thread */
1241 IF_PAR_DEBUG(fish, // schedule,
1242 debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1243 tso->id, tso, advisory_thread_count));
1245 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1246 IF_PAR_DEBUG(fish, // schedule,
1247 debugBelch("==^^ failed to create thread from spark @ %lx\n",
1249 return rtsFalse; /* failed to generate a thread */
1250 } /* otherwise fall through & pick-up new tso */
1252 IF_PAR_DEBUG(fish, // schedule,
1253 debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
1254 spark_queue_len(pool)));
1255 return rtsFalse; /* failed to generate a thread */
1257 return rtsTrue; /* success in generating a thread */
1258 } else { /* no more threads permitted or pool empty */
1259 return rtsFalse; /* failed to generateThread */
1262 tso = NULL; // avoid compiler warning only
1263 return rtsFalse; /* dummy in non-PAR setup */
1266 #endif // PARALLEL_HASKELL
1268 /* ----------------------------------------------------------------------------
1269 * Get work from a remote node (PARALLEL_HASKELL only)
1270 * ------------------------------------------------------------------------- */
1272 #if defined(PARALLEL_HASKELL)
1274 scheduleGetRemoteWork(rtsBool *receivedFinish)
1276 ASSERT(emptyRunQueue());
1278 if (RtsFlags.ParFlags.BufferTime) {
1279 IF_PAR_DEBUG(verbose,
1280 debugBelch("...send all pending data,"));
1283 for (i=1; i<=nPEs; i++)
1284 sendImmediately(i); // send all messages away immediately
1288 //++EDEN++ idle() , i.e. send all buffers, wait for work
1289 // suppress fishing in EDEN... just look for incoming messages
1290 // (blocking receive)
1291 IF_PAR_DEBUG(verbose,
1292 debugBelch("...wait for incoming messages...\n"));
1293 *receivedFinish = processMessages(); // blocking receive...
1295 // and reenter scheduling loop after having received something
1296 // (return rtsFalse below)
1298 # else /* activate SPARKS machinery */
1299 /* We get here, if we have no work, tried to activate a local spark, but still
1300 have no work. We try to get a remote spark, by sending a FISH message.
1301 Thread migration should be added here, and triggered when a sequence of
1302 fishes returns without work. */
1303 delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1305 /* =8-[ no local sparks => look for work on other PEs */
1307 * We really have absolutely no work. Send out a fish
1308 * (there may be some out there already), and wait for
1309 * something to arrive. We clearly can't run any threads
1310 * until a SCHEDULE or RESUME arrives, and so that's what
1311 * we're hoping to see. (Of course, we still have to
1312 * respond to other types of messages.)
1314 rtsTime now = msTime() /*CURRENT_TIME*/;
1315 IF_PAR_DEBUG(verbose,
1316 debugBelch("-- now=%ld\n", now));
1317 IF_PAR_DEBUG(fish, // verbose,
1318 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1319 (last_fish_arrived_at!=0 &&
1320 last_fish_arrived_at+delay > now)) {
1321 debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1322 now, last_fish_arrived_at+delay,
1323 last_fish_arrived_at,
1327 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1328 advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1329 if (last_fish_arrived_at==0 ||
1330 (last_fish_arrived_at+delay <= now)) { // send FISH now!
1331 /* outstandingFishes is set in sendFish, processFish;
1332 avoid flooding system with fishes via delay */
1333 next_fish_to_send_at = 0;
1335 /* ToDo: this should be done in the main scheduling loop to avoid the
1336 busy wait here; not so bad if fish delay is very small */
1337 int iq = 0; // DEBUGGING -- HWL
1338 next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
1339 /* send a fish when ready, but process messages that arrive in the meantime */
1341 if (PacketsWaiting()) {
1343 *receivedFinish = processMessages();
1346 } while (!*receivedFinish || now<next_fish_to_send_at);
1347 // JB: This means the fish could become obsolete, if we receive
1348 // work. Better check for work again?
1349 // last line: while (!receivedFinish || !haveWork || now<...)
1350 // next line: if (receivedFinish || haveWork )
1352 if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1353 return rtsFalse; // NB: this will leave scheduler loop
1354 // immediately after return!
1356 IF_PAR_DEBUG(fish, // verbose,
1357 debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1361 // JB: IMHO, this should all be hidden inside sendFish(...)
1363 sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
1366 // Global statistics: count no. of fishes
1367 if (RtsFlags.ParFlags.ParStats.Global &&
1368 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1369 globalParStats.tot_fish_mess++;
1373 /* delayed fishes must have been sent by now! */
1374 next_fish_to_send_at = 0;
1377 *receivedFinish = processMessages();
1378 # endif /* SPARKS */
1381 /* NB: this function always returns rtsFalse, meaning the scheduler
1382 loop continues with the next iteration;
1384 return code means success in finding work; we enter this function
1385 if there is no local work, thus have to send a fish which takes
1386 time until it arrives with work; in the meantime we should process
1387 messages in the main loop;
1390 #endif // PARALLEL_HASKELL
1392 /* ----------------------------------------------------------------------------
1393 * PAR/GRAN: Report stats & debugging info(?)
1394 * ------------------------------------------------------------------------- */
1396 #if defined(PAR) || defined(GRAN)
1398 scheduleGranParReport(void)
1400 ASSERT(run_queue_hd != END_TSO_QUEUE);
1402 /* Take a thread from the run queue, if we have work */
1403 POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
1405 /* If this TSO has got its outport closed in the meantime,
1406 * it mustn't be run. Instead, we have to clean it up as if it was finished.
1407 * It has to be marked as TH_DEAD for this purpose.
1408 * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1410 JB: TODO: investigate whether state change field could be nuked
1411 entirely and replaced by the normal tso state (whatnext
1412 field). All we want to do is to kill tsos from outside.
1415 /* ToDo: write something to the log-file
1416 if (RTSflags.ParFlags.granSimStats && !sameThread)
1417 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1421 /* the spark pool for the current PE */
1422 pool = &(cap.r.rSparks); // cap = (old) MainCap
1425 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1426 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1429 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1430 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1432 if (RtsFlags.ParFlags.ParStats.Full &&
1433 (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1434 (emitSchedule || // forced emit
1435 (t && LastTSO && t->id != LastTSO->id))) {
1437 we are running a different TSO, so write a schedule event to log file
1438 NB: If we use fair scheduling we also have to write a deschedule
1439 event for LastTSO; with unfair scheduling we know that the
1440 previous tso has blocked whenever we switch to another tso, so
1441 we don't need it in GUM for now
1443 IF_PAR_DEBUG(fish, // schedule,
1444 debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1446 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1447 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1448 emitSchedule = rtsFalse;
1453 /* ----------------------------------------------------------------------------
1454 * After running a thread...
1455 * ------------------------------------------------------------------------- */
1458 schedulePostRunThread (StgTSO *t)
1460 // We have to be able to catch transactions that are in an
1461 // infinite loop as a result of seeing an inconsistent view of
1465 // [a,b] <- mapM readTVar [ta,tb]
1466 // when (a == b) loop
1468 // and a is never equal to b given a consistent view of memory.
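// The validation is piggy-backed on the thread's return to the
// scheduler, so a transaction spinning like this gets caught at its
// next heap check or timer-driven yield rather than looping forever.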
1470 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1471 if (!stmValidateNestOfTransactions (t -> trec)) {
1472 debugTrace(DEBUG_sched | DEBUG_stm,
1473 "trec %p found wasting its time", t);
1475 // strip the stack back to the
1476 // ATOMICALLY_FRAME, aborting the (nested)
1477 // transaction, and saving the stack of any
1478 // partially-evaluated thunks on the heap.
1479 throwToSingleThreaded_(&capabilities[0], t,
1480 NULL, rtsTrue, NULL);
1482 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1487 /* HACK 675: if the last thread didn't yield, make sure to print a
1488 SCHEDULE event to the log file when StgRunning the next thread, even
1489 if it is the same one as before */
1491 TimeOfLastYield = CURRENT_TIME;
1494 /* some statistics gathering in the parallel case */
1496 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1500 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1501 globalGranStats.tot_heapover++;
1503 globalParStats.tot_heapover++;
1510 DumpGranEvent(GR_DESCHEDULE, t));
1511 globalGranStats.tot_stackover++;
1514 // DumpGranEvent(GR_DESCHEDULE, t);
1515 globalParStats.tot_stackover++;
1519 case ThreadYielding:
1522 DumpGranEvent(GR_DESCHEDULE, t));
1523 globalGranStats.tot_yields++;
1526 // DumpGranEvent(GR_DESCHEDULE, t);
1527 globalParStats.tot_yields++;
1533 debugTrace(DEBUG_sched,
1534 "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1535 t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
1536 (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1537 if (t->block_info.closure!=(StgClosure*)NULL)
1538 print_bq(t->block_info.closure);
1541 // ??? needed; should emit block before
1543 DumpGranEvent(GR_DESCHEDULE, t));
1544 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1547 ASSERT(procStatus[CurrentProc]==Busy ||
1548 ((procStatus[CurrentProc]==Fetching) &&
1549 (t->block_info.closure!=(StgClosure*)NULL)));
1550 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1551 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1552 procStatus[CurrentProc]==Fetching))
1553 procStatus[CurrentProc] = Idle;
1556 //++PAR++ blockThread() writes the event (change?)
1560 case ThreadFinished:
1564 barf("parGlobalStats: unknown return code");
1570 /* -----------------------------------------------------------------------------
1571 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1572 * -------------------------------------------------------------------------- */
1575 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1577 // did the task ask for a large block?
1578 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1579 // if so, get one and push it on the front of the nursery.
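// For example, with the usual 4Kbyte block size a 10000-byte rHpAlloc
// rounds up to three blocks; the group allocated below is linked in
// front of rCurrentNursery so the thread finds the space immediately
// when it retries the allocation.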
1583 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1585 debugTrace(DEBUG_sched,
1586 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1587 (long)t->id, whatNext_strs[t->what_next], blocks);
1589 // don't do this if the nursery is (nearly) full, we'll GC first.
1590 if (cap->r.rCurrentNursery->link != NULL ||
1591 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1592 // if the nursery has only one block.
1595 bd = allocGroup( blocks );
1597 cap->r.rNursery->n_blocks += blocks;
1599 // link the new group into the list
1600 bd->link = cap->r.rCurrentNursery;
1601 bd->u.back = cap->r.rCurrentNursery->u.back;
1602 if (cap->r.rCurrentNursery->u.back != NULL) {
1603 cap->r.rCurrentNursery->u.back->link = bd;
1605 #if !defined(THREADED_RTS)
1606 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1607 g0s0 == cap->r.rNursery);
1609 cap->r.rNursery->blocks = bd;
1611 cap->r.rCurrentNursery->u.back = bd;
1613 // initialise it as a nursery block. We initialise the
1614 // step, gen_no, and flags field of *every* sub-block in
1615 // this large block, because this is easier than making
1616 // sure that we always find the block head of a large
1617 // block whenever we call Bdescr() (eg. evacuate() and
1618 // isAlive() in the GC would both have to do this, at
1622 for (x = bd; x < bd + blocks; x++) {
1623 x->step = cap->r.rNursery;
1629 // This assert can be a killer if the app is doing lots
1630 // of large block allocations.
1631 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1633 // now update the nursery to point to the new block
1634 cap->r.rCurrentNursery = bd;
1636 // we might be unlucky and have another thread get on the
1637 // run queue before us and steal the large block, but in that
1638 // case the thread will just end up requesting another large
1640 pushOnRunQueue(cap,t);
1641 return rtsFalse; /* not actually GC'ing */
1645 debugTrace(DEBUG_sched,
1646 "--<< thread %ld (%s) stopped: HeapOverflow",
1647 (long)t->id, whatNext_strs[t->what_next]);
1650 ASSERT(!is_on_queue(t,CurrentProc));
1651 #elif defined(PARALLEL_HASKELL)
1652 /* Currently we emit a DESCHEDULE event before GC in GUM.
1653 ToDo: either add separate event to distinguish SYSTEM time from rest
1654 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1655 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1656 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1657 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1658 emitSchedule = rtsTrue;
1662 if (context_switch) {
1663 // Sometimes we miss a context switch, e.g. when calling
1664 // primitives in a tight loop, MAYBE_GC() doesn't check the
1665 // context switch flag, and we end up waiting for a GC.
1666 // See #1984, and concurrent/should_run/1984
1668 addToRunQueue(cap,t);
1670 pushOnRunQueue(cap,t);
1673 /* actual GC is done at the end of the while loop in schedule() */
1676 /* -----------------------------------------------------------------------------
1677 * Handle a thread that returned to the scheduler with ThreadStackOverflow
1678 * -------------------------------------------------------------------------- */
1681 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1683 debugTrace (DEBUG_sched,
1684 "--<< thread %ld (%s) stopped, StackOverflow",
1685 (long)t->id, whatNext_strs[t->what_next]);
1687 /* just adjust the stack for this thread, then pop it back
1691 /* enlarge the stack */
1692 StgTSO *new_t = threadStackOverflow(cap, t);
1694 /* The TSO attached to this Task may have moved, so update the
1697 if (task->tso == t) {
1700 pushOnRunQueue(cap,new_t);
1704 /* -----------------------------------------------------------------------------
1705 * Handle a thread that returned to the scheduler with ThreadYielding
1706 * -------------------------------------------------------------------------- */
1709 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1711 // Reset the context switch flag. We don't do this just before
1712 // running the thread, because that would mean we would lose ticks
1713 // during GC, which can lead to unfair scheduling (a thread hogs
1714 // the CPU because the tick always arrives during GC). This way
1715 // penalises threads that do a lot of allocation, but that seems
1716 // better than the alternative.
1719 /* put the thread back on the run queue. Then, if we're ready to
1720 * GC, check whether this is the last task to stop. If so, wake
1721 * up the GC thread. getThread will block during a GC until the
1725 if (t->what_next != prev_what_next) {
1726 debugTrace(DEBUG_sched,
1727 "--<< thread %ld (%s) stopped to switch evaluators",
1728 (long)t->id, whatNext_strs[t->what_next]);
1730 debugTrace(DEBUG_sched,
1731 "--<< thread %ld (%s) stopped, yielding",
1732 (long)t->id, whatNext_strs[t->what_next]);
1737 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1739 ASSERT(t->_link == END_TSO_QUEUE);
1741 // Shortcut if we're just switching evaluators: don't bother
1742 // doing stack squeezing (which can be expensive), just run the
1744 if (t->what_next != prev_what_next) {
1749 ASSERT(!is_on_queue(t,CurrentProc));
1752 //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1753 checkThreadQsSanity(rtsTrue));
1757 addToRunQueue(cap,t);
1760 /* add a ContinueThread event to actually process the thread */
1761 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1763 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1765 debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1772 /* -----------------------------------------------------------------------------
1773 * Handle a thread that returned to the scheduler with ThreadBlocked
1774 * -------------------------------------------------------------------------- */
1777 scheduleHandleThreadBlocked( StgTSO *t
1778 #if !defined(GRAN) && !defined(DEBUG)
1785 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
1786 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1787 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1789 // ??? needed; should emit block before
1791 DumpGranEvent(GR_DESCHEDULE, t));
1792 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1795 ASSERT(procStatus[CurrentProc]==Busy ||
1796 ((procStatus[CurrentProc]==Fetching) &&
1797 (t->block_info.closure!=(StgClosure*)NULL)));
1798 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1799 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1800 procStatus[CurrentProc]==Fetching))
1801 procStatus[CurrentProc] = Idle;
1805 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
1806 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1809 if (t->block_info.closure!=(StgClosure*)NULL)
1810 print_bq(t->block_info.closure));
1812 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1815 /* whatever we schedule next, we must log that schedule */
1816 emitSchedule = rtsTrue;
1820 // We don't need to do anything. The thread is blocked, and it
1821 // has tidied up its stack and placed itself on whatever queue
1822 // it needs to be on.
1824 // ASSERT(t->why_blocked != NotBlocked);
1825 // Not true: for example,
1826 // - in THREADED_RTS, the thread may already have been woken
1827 // up by another Capability. This actually happens: try
1828 // conc023 +RTS -N2.
1829 // - the thread may have woken itself up already, because
1830 // threadPaused() might have raised a blocked throwTo
1831 // exception, see maybePerformBlockedException().
1834 if (traceClass(DEBUG_sched)) {
1835 debugTraceBegin("--<< thread %lu (%s) stopped: ",
1836 (unsigned long)t->id, whatNext_strs[t->what_next]);
1837 printThreadBlockage(t);
1842 /* Only for dumping event to log file
1843 ToDo: do I need this in GranSim, too?
1849 /* -----------------------------------------------------------------------------
1850 * Handle a thread that returned to the scheduler with ThreadFinished
1851 * -------------------------------------------------------------------------- */
1854 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1856 /* Need to check whether this was a main thread, and if so,
1857 * return with the return value.
1859 * We also end up here if the thread kills itself with an
1860 * uncaught exception, see Exception.cmm.
1862 debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
1863 (unsigned long)t->id, whatNext_strs[t->what_next]);
1866 endThread(t, CurrentProc); // clean-up the thread
1867 #elif defined(PARALLEL_HASKELL)
1868 /* For now all are advisory -- HWL */
1869 //if(t->priority==AdvisoryPriority) ??
1870 advisory_thread_count--; // JB: Caution with this counter, buggy!
1873 if(t->dist.priority==RevalPriority)
1877 # if defined(EDENOLD)
1878 // the thread could still have an outport... (BUG)
1879 if (t->eden.outport != -1) {
1880 // delete the outport for the tso which has finished...
1881 IF_PAR_DEBUG(eden_ports,
1882 debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1883 t->eden.outport, t->id));
1886 // thread still in the process (HEAVY BUG! since outport has just been closed...)
1887 if (t->eden.epid != -1) {
1888 IF_PAR_DEBUG(eden_ports,
1889 debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1890 t->id, t->eden.epid));
1891 removeTSOfromProcess(t);
1896 if (RtsFlags.ParFlags.ParStats.Full &&
1897 !RtsFlags.ParFlags.ParStats.Suppressed)
1898 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1900 // t->par only contains statistics: left out for now...
1902 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1903 t->id,t,t->par.sparkname));
1905 #endif // PARALLEL_HASKELL
1908 // Check whether the thread that just completed was a bound
1909 // thread, and if so return with the result.
1911 // There is an assumption here that all thread completion goes
1912 // through this point; we need to make sure that if a thread
1913 // ends up in the ThreadKilled state, that it stays on the run
1914 // queue so it can be dealt with here.
1919 if (t->bound != task) {
1920 #if !defined(THREADED_RTS)
1921 // Must be a bound thread that is not the topmost one. Leave
1922 // it on the run queue until the stack has unwound to the
1923 // point where we can deal with this. Leaving it on the run
1924 // queue also ensures that the garbage collector knows about
1925 // this thread and its return value (it gets dropped from the
1926 // step->threads list so there's no other way to find it).
1927 appendToRunQueue(cap,t);
1930 // this cannot happen in the threaded RTS, because a
1931 // bound thread can only be run by the appropriate Task.
1932 barf("finished bound thread that isn't mine");
1936 ASSERT(task->tso == t);
1938 if (t->what_next == ThreadComplete) {
1940 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1941 *(task->ret) = (StgClosure *)task->tso->sp[1];
1943 task->stat = Success;
1946 *(task->ret) = NULL;
1948 if (sched_state >= SCHED_INTERRUPTING) {
1949 task->stat = Interrupted;
1951 task->stat = Killed;
1955 removeThreadLabel((StgWord)task->tso->id);
1957 return rtsTrue; // tells schedule() to return
1963 /* -----------------------------------------------------------------------------
1964 * Perform a heap census
1965 * -------------------------------------------------------------------------- */
1968 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1970 // When we have +RTS -i0 and we're heap profiling, do a census at
1971 // every GC. This lets us get repeatable runs for debugging.
1972 if (performHeapProfile ||
1973 (RtsFlags.ProfFlags.profileInterval==0 &&
1974 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1981 /* -----------------------------------------------------------------------------
1982 * Perform a garbage collection if necessary
1983 * -------------------------------------------------------------------------- */
1986 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1989 rtsBool heap_census;
1991 static volatile StgWord waiting_for_gc;
1992 rtsBool was_waiting;
1997 // In order to GC, there must be no threads running Haskell code.
1998 // Therefore, the GC thread needs to hold *all* the capabilities,
1999 // and release them after the GC has completed.
2001 // This seems to be the simplest way: previous attempts involved
2002 // making all the threads with capabilities give up their
2003 // capabilities and sleep except for the *last* one, which
2004 // actually did the GC. But it's quite hard to arrange for all
2005 // the other tasks to sleep and stay asleep.
2008 was_waiting = cas(&waiting_for_gc, 0, 1);
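// cas() returns the previous value of waiting_for_gc: zero means we won
// the race and will lead this GC; non-zero means another task is already
// gathering the capabilities, so we just wait for it below.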
2011 debugTrace(DEBUG_sched, "someone else is trying to GC...");
2012 if (cap) yieldCapability(&cap,task);
2013 } while (waiting_for_gc);
2014 return cap; // NOTE: task->cap might have changed here
2017 for (i=0; i < n_capabilities; i++) {
2018 debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
2019 if (cap != &capabilities[i]) {
2020 Capability *pcap = &capabilities[i];
2021 // we better hope this task doesn't get migrated to
2022 // another Capability while we're waiting for this one.
2023 // It won't, because load balancing happens while we have
2024 // all the Capabilities, but even so it's a slightly
2025 // unsavoury invariant.
2028 waitForReturnCapability(&pcap, task);
2029 if (pcap != &capabilities[i]) {
2030 barf("scheduleDoGC: got the wrong capability");
2035 waiting_for_gc = rtsFalse;
2038 // so this happens periodically:
2039 if (cap) scheduleCheckBlackHoles(cap);
2041 IF_DEBUG(scheduler, printAllThreads());
2044 * We now have all the capabilities; if we're in an interrupting
2045 * state, then we should take the opportunity to delete all the
2046 * threads in the system.
2048 if (sched_state >= SCHED_INTERRUPTING) {
2049 deleteAllThreads(&capabilities[0]);
2050 sched_state = SCHED_SHUTTING_DOWN;
2053 heap_census = scheduleNeedHeapProfile(rtsTrue);
2055 /* everybody back, start the GC.
2056 * Could do it in this thread, or signal a condition var
2057 * to do it in another thread. Either way, we need to
2058 * broadcast on gc_pending_cond afterward.
2060 #if defined(THREADED_RTS)
2061 debugTrace(DEBUG_sched, "doing GC");
2063 GarbageCollect(force_major || heap_census);
2066 debugTrace(DEBUG_sched, "performing heap census");
2068 performHeapProfile = rtsFalse;
2071 #if defined(THREADED_RTS)
2072 // release our stash of capabilities.
2073 for (i = 0; i < n_capabilities; i++) {
2074 if (cap != &capabilities[i]) {
2075 task->cap = &capabilities[i];
2076 releaseCapability(&capabilities[i]);
2087 /* add a ContinueThread event to continue execution of current thread */
2088 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2090 t, (StgClosure*)NULL, (rtsSpark*)NULL);
2092 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2100 /* ---------------------------------------------------------------------------
2101 * Singleton fork(). Do not copy any running threads.
2102 * ------------------------------------------------------------------------- */
2105 forkProcess(HsStablePtr *entry
2106 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2111 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2118 #if defined(THREADED_RTS)
2119 if (RtsFlags.ParFlags.nNodes > 1) {
2120 errorBelch("forking not supported with +RTS -N<n> greater than 1");
2121 stg_exit(EXIT_FAILURE);
2125 debugTrace(DEBUG_sched, "forking!");
2127 // ToDo: for SMP, we should probably acquire *all* the capabilities
2130 // no funny business: hold locks while we fork, otherwise if some
2131 // other thread is holding a lock when the fork happens, the data
2132 // structure protected by the lock will forever be in an
2133 // inconsistent state in the child. See also #1391.
2134 ACQUIRE_LOCK(&sched_mutex);
2135 ACQUIRE_LOCK(&cap->lock);
2136 ACQUIRE_LOCK(&cap->running_task->lock);
2140 if (pid) { // parent
2142 RELEASE_LOCK(&sched_mutex);
2143 RELEASE_LOCK(&cap->lock);
2144 RELEASE_LOCK(&cap->running_task->lock);
2146 // just return the pid
2152 #if defined(THREADED_RTS)
2153 initMutex(&sched_mutex);
2154 initMutex(&cap->lock);
2155 initMutex(&cap->running_task->lock);
2158 // Now, all OS threads except the thread that forked are
2159 // stopped. We need to stop all Haskell threads, including
2160 // those involved in foreign calls. Also we need to delete
2161 // all Tasks, because they correspond to OS threads that are now gone.
2164 for (s = 0; s < total_steps; s++) {
2165 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
2166 if (t->what_next == ThreadRelocated) {
2169 next = t->global_link;
2170 // don't allow threads to catch the ThreadKilled
2171 // exception, but we do want to raiseAsync() because these
2172 // threads may be evaluating thunks that we need later.
2173 deleteThread_(cap,t);
2178 // Empty the run queue. It seems tempting to let all the
2179 // killed threads stay on the run queue as zombies to be
2180 // cleaned up later, but some of them correspond to bound
2181 // threads for which the corresponding Task does not exist.
2182 cap->run_queue_hd = END_TSO_QUEUE;
2183 cap->run_queue_tl = END_TSO_QUEUE;
2185 // Any suspended C-calling Tasks are no more; their OS threads no longer exist.
2187 cap->suspended_ccalling_tasks = NULL;
2189 // Empty the threads lists. Otherwise, the garbage
2190 // collector may attempt to resurrect some of these threads.
2191 for (s = 0; s < total_steps; s++) {
2192 all_steps[s].threads = END_TSO_QUEUE;
2195 // Wipe the task list, except the current Task.
2196 ACQUIRE_LOCK(&sched_mutex);
2197 for (task = all_tasks; task != NULL; task=task->all_link) {
2198 if (task != cap->running_task) {
2199 #if defined(THREADED_RTS)
2200 initMutex(&task->lock); // see #1391
2205 RELEASE_LOCK(&sched_mutex);
2207 #if defined(THREADED_RTS)
2208 // Wipe our spare workers list, they no longer exist. New
2209 // workers will be created if necessary.
2210 cap->spare_workers = NULL;
2211 cap->returning_tasks_hd = NULL;
2212 cap->returning_tasks_tl = NULL;
2215 // On Unix, all timers are reset in the child, so we need to start the timer again.
2220 cap = rts_evalStableIO(cap, entry, NULL); // run the action
2221 rts_checkSchedStatus("forkProcess",cap);
2224 hs_exit(); // clean up and exit
2225 stg_exit(EXIT_SUCCESS);
2227 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2228 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2233 /* ---------------------------------------------------------------------------
2234 * Delete all the threads in the system
2235 * ------------------------------------------------------------------------- */
2238 deleteAllThreads ( Capability *cap )
2240 // NOTE: only safe to call if we own all capabilities.
2245 debugTrace(DEBUG_sched,"deleting all threads");
2246 for (s = 0; s < total_steps; s++) {
2247 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
2248 if (t->what_next == ThreadRelocated) {
2251 next = t->global_link;
2252 deleteThread(cap,t);
2257 // The run queue now contains a bunch of ThreadKilled threads. We
2258 // must not throw these away: the main thread(s) will be in there
2259 // somewhere, and the main scheduler loop has to deal with it.
2260 // Also, the run queue is the only thing keeping these threads from
2261 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2263 #if !defined(THREADED_RTS)
2264 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2265 ASSERT(sleeping_queue == END_TSO_QUEUE);
2269 /* -----------------------------------------------------------------------------
2270 Managing the suspended_ccalling_tasks list.
2271 Locks required: sched_mutex
2272 -------------------------------------------------------------------------- */
2275 suspendTask (Capability *cap, Task *task)
2277 ASSERT(task->next == NULL && task->prev == NULL);
2278 task->next = cap->suspended_ccalling_tasks;
2280 if (cap->suspended_ccalling_tasks) {
2281 cap->suspended_ccalling_tasks->prev = task;
2283 cap->suspended_ccalling_tasks = task;
2287 recoverSuspendedTask (Capability *cap, Task *task)
2290 task->prev->next = task->next;
2292 ASSERT(cap->suspended_ccalling_tasks == task);
2293 cap->suspended_ccalling_tasks = task->next;
2296 task->next->prev = task->prev;
2298 task->next = task->prev = NULL;
2301 /* ---------------------------------------------------------------------------
2302 * Suspending & resuming Haskell threads.
2304 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2305 * its capability before calling the C function. This allows another
2306 * task to pick up the capability and carry on running Haskell
2307 * threads. It also means that if the C call blocks, it won't lock
2310 * The Haskell thread making the C call is put to sleep for the
2311 * duration of the call, on the suspended_ccalling_tasks queue. We
2312 * give out a token to the task, which it can use to resume the thread
2313 * on return from the C function.
2314 * ------------------------------------------------------------------------- */
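/* A rough sketch (illustrative pseudo-code only; the token's exact type is
 * omitted) of what the compiler arranges around a safe foreign call:
 *
 *     token   = suspendThread(BaseReg);   // hand the Capability back
 *     result  = foreign_fn(args);         // may block; other Capabilities
 *                                         // keep running Haskell threads
 *     BaseReg = resumeThread(token);      // reacquire a Capability and
 *                                         // reattach the suspended TSO
 */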
2317 suspendThread (StgRegTable *reg)
2324 StgWord32 saved_winerror;
2327 saved_errno = errno;
2329 saved_winerror = GetLastError();
2332 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2334 cap = regTableToCapability(reg);
2336 task = cap->running_task;
2337 tso = cap->r.rCurrentTSO;
2339 debugTrace(DEBUG_sched,
2340 "thread %lu did a safe foreign call",
2341 (unsigned long)cap->r.rCurrentTSO->id);
2343 // XXX this might not be necessary --SDM
2344 tso->what_next = ThreadRunGHC;
2346 threadPaused(cap,tso);
2348 if ((tso->flags & TSO_BLOCKEX) == 0) {
2349 tso->why_blocked = BlockedOnCCall;
2350 tso->flags |= TSO_BLOCKEX;
2351 tso->flags &= ~TSO_INTERRUPTIBLE;
2353 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2356 // Hand back capability
2357 task->suspended_tso = tso;
2359 ACQUIRE_LOCK(&cap->lock);
2361 suspendTask(cap,task);
2362 cap->in_haskell = rtsFalse;
2363 releaseCapability_(cap);
2365 RELEASE_LOCK(&cap->lock);
2367 #if defined(THREADED_RTS)
2368 /* Preparing to leave the RTS, so ensure there's a native thread/task
2369 waiting to take over.
2371 debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2374 errno = saved_errno;
2376 SetLastError(saved_winerror);
2382 resumeThread (void *task_)
2389 StgWord32 saved_winerror;
2392 saved_errno = errno;
2394 saved_winerror = GetLastError();
2398 // Wait for permission to re-enter the RTS with the result.
2399 waitForReturnCapability(&cap,task);
2400 // we might be on a different capability now... but if so, our
2401 // entry on the suspended_ccalling_tasks list will also have been migrated.
2404 // Remove the thread from the suspended list
2405 recoverSuspendedTask(cap,task);
2407 tso = task->suspended_tso;
2408 task->suspended_tso = NULL;
2409 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2410 debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2412 if (tso->why_blocked == BlockedOnCCall) {
2413 awakenBlockedExceptionQueue(cap,tso);
2414 tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2417 /* Reset blocking status */
2418 tso->why_blocked = NotBlocked;
2420 cap->r.rCurrentTSO = tso;
2421 cap->in_haskell = rtsTrue;
2422 errno = saved_errno;
2424 SetLastError(saved_winerror);
2427 /* We might have GC'd, mark the TSO dirty again */
2430 IF_DEBUG(sanity, checkTSO(tso));
2435 /* ---------------------------------------------------------------------------
2438 * scheduleThread puts a thread on the end of the runnable queue.
2439 * This will usually be done immediately after a thread is created.
2440 * The caller of scheduleThread must create the thread using e.g.
2441 * createThread and push an appropriate closure
2442 * on this thread's stack before the scheduler is invoked.
2443 * ------------------------------------------------------------------------ */
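/* A rough usage sketch (illustrative only; this paraphrases what the
 * rts_eval* entry points do rather than quoting them):
 *
 *     tso = createThread(cap, RtsFlags.GcFlags.initialStkSize);
 *     ... push the closure to evaluate (and its entry frame) on tso->sp ...
 *     scheduleWaitThread(tso, &ret, cap);    // bound thread: run and wait
 *         -- or --
 *     scheduleThread(cap, tso);              // unbound: just enqueue it
 */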
2446 scheduleThread(Capability *cap, StgTSO *tso)
2448 // The thread goes at the *end* of the run-queue, to avoid possible
2449 // starvation of any threads already on the queue.
2450 appendToRunQueue(cap,tso);
2454 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2456 #if defined(THREADED_RTS)
2457 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2458 // move this thread from now on.
2459 cpu %= RtsFlags.ParFlags.nNodes;
2460 if (cpu == cap->no) {
2461 appendToRunQueue(cap,tso);
2463 migrateThreadToCapability_lock(&capabilities[cpu],tso);
2466 appendToRunQueue(cap,tso);
2471 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2475 // We already created/initialised the Task
2476 task = cap->running_task;
2478 // This TSO is now a bound thread; make the Task and TSO
2479 // point to each other.
2485 task->stat = NoStatus;
2487 appendToRunQueue(cap,tso);
2489 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2492 /* GranSim specific init */
2493 CurrentTSO = m->tso; // the TSO to run
2494 procStatus[MainProc] = Busy; // status of main PE
2495 CurrentProc = MainProc; // PE to run it on
2498 cap = schedule(cap,task);
2500 ASSERT(task->stat != NoStatus);
2501 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2503 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2507 /* ----------------------------------------------------------------------------
2509 * ------------------------------------------------------------------------- */
2511 #if defined(THREADED_RTS)
2513 workerStart(Task *task)
2517 // See startWorkerTask().
2518 ACQUIRE_LOCK(&task->lock);
2520 RELEASE_LOCK(&task->lock);
2522 // set the thread-local pointer to the Task:
2525 // schedule() runs without a lock.
2526 cap = schedule(cap,task);
2528 // On exit from schedule(), we have a Capability.
2529 releaseCapability(cap);
2530 workerTaskStop(task);
2534 /* ---------------------------------------------------------------------------
2537 * Initialise the scheduler. This resets all the queues - if the
2538 * queues contained any threads, they'll be garbage collected at the next GC.
2541 * ------------------------------------------------------------------------ */
2548 for (i=0; i < MAX_PROC; i++) {
2549 run_queue_hds[i] = END_TSO_QUEUE;
2550 run_queue_tls[i] = END_TSO_QUEUE;
2551 blocked_queue_hds[i] = END_TSO_QUEUE;
2552 blocked_queue_tls[i] = END_TSO_QUEUE;
2553 ccalling_threadss[i] = END_TSO_QUEUE;
2554 blackhole_queue[i] = END_TSO_QUEUE;
2555 sleeping_queue = END_TSO_QUEUE;
2557 #elif !defined(THREADED_RTS)
2558 blocked_queue_hd = END_TSO_QUEUE;
2559 blocked_queue_tl = END_TSO_QUEUE;
2560 sleeping_queue = END_TSO_QUEUE;
2563 blackhole_queue = END_TSO_QUEUE;
2566 sched_state = SCHED_RUNNING;
2567 recent_activity = ACTIVITY_YES;
2569 #if defined(THREADED_RTS)
2570 /* Initialise the mutex and condition variables used by the scheduler. */
2572 initMutex(&sched_mutex);
2575 ACQUIRE_LOCK(&sched_mutex);
2577 /* A capability holds the state a native thread needs in
2578 * order to execute STG code. At least one capability is
2579 * floating around (only THREADED_RTS builds have more than one).
2585 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2589 #if defined(THREADED_RTS)
2591 * Eagerly start one worker to run each Capability, except for
2592 * Capability 0. The idea is that we're probably going to start a
2593 * bound thread on Capability 0 pretty soon, so we don't want a
2594 * worker task hogging it.
2599 for (i = 1; i < n_capabilities; i++) {
2600 cap = &capabilities[i];
2601 ACQUIRE_LOCK(&cap->lock);
2602 startWorkerTask(cap, workerStart);
2603 RELEASE_LOCK(&cap->lock);
2608 trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2610 RELEASE_LOCK(&sched_mutex);
2615 rtsBool wait_foreign
2616 #if !defined(THREADED_RTS)
2617 __attribute__((unused))
2620 /* see Capability.c, shutdownCapability() */
2624 #if defined(THREADED_RTS)
2625 ACQUIRE_LOCK(&sched_mutex);
2626 task = newBoundTask();
2627 RELEASE_LOCK(&sched_mutex);
2630 // If we haven't killed all the threads yet, do it now.
2631 if (sched_state < SCHED_SHUTTING_DOWN) {
2632 sched_state = SCHED_INTERRUPTING;
2633 scheduleDoGC(NULL,task,rtsFalse);
2635 sched_state = SCHED_SHUTTING_DOWN;
2637 #if defined(THREADED_RTS)
2641 for (i = 0; i < n_capabilities; i++) {
2642 shutdownCapability(&capabilities[i], task, wait_foreign);
2644 boundTaskExiting(task);
2648 freeCapability(&MainCapability);
2653 freeScheduler( void )
2656 if (n_capabilities != 1) {
2657 stgFree(capabilities);
2659 #if defined(THREADED_RTS)
2660 closeMutex(&sched_mutex);
2664 /* -----------------------------------------------------------------------------
2667 This is the interface to the garbage collector from Haskell land.
2668 We provide this so that external C code can allocate and garbage
2669 collect when called from Haskell via _ccall_GC.
2670 -------------------------------------------------------------------------- */
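/* A rough usage sketch (illustrative only; the calling function below is
 * hypothetical, not part of the RTS): */
#if 0
static void releaseBuffersAndCollect (void)
{
    /* ... drop large C-side structures that were keeping heap data alive ... */
    performMajorGC();   /* defined below as performGC_(rtsTrue) */
}
#endif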
2673 performGC_(rtsBool force_major)
2676 // We must grab a new Task here, because the existing Task may be
2677 // associated with a particular Capability, and chained onto the
2678 // suspended_ccalling_tasks queue.
2679 ACQUIRE_LOCK(&sched_mutex);
2680 task = newBoundTask();
2681 RELEASE_LOCK(&sched_mutex);
2682 scheduleDoGC(NULL,task,force_major);
2683 boundTaskExiting(task);
2689 performGC_(rtsFalse);
2693 performMajorGC(void)
2695 performGC_(rtsTrue);
2698 /* -----------------------------------------------------------------------------
2701 If the thread has reached its maximum stack size, then raise the
2702 StackOverflow exception in the offending thread. Otherwise
2703 relocate the TSO into a larger chunk of memory and adjust its stack
2705 -------------------------------------------------------------------------- */
2708 threadStackOverflow(Capability *cap, StgTSO *tso)
2710 nat new_stack_size, stack_words;
2715 IF_DEBUG(sanity,checkTSO(tso));
2717 // don't allow throwTo() to modify the blocked_exceptions queue
2718 // while we are moving the TSO:
2719 lockClosure((StgClosure *)tso);
2721 if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2722 // NB. never raise a StackOverflow exception if the thread is
2723 // inside Control.Exception.block. It is impractical to protect
2724 // against stack overflow exceptions, since virtually anything
2725 // can raise one (even 'catch'), so this is the only sensible
2726 // thing to do here. See bug #767.
2728 debugTrace(DEBUG_gc,
2729 "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2730 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2732 /* If we're debugging, just print out the top of the stack */
2733 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2736 // Send this thread the StackOverflow exception
2738 throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2742 /* Try to double the current stack size. If that takes us over the
2743 * maximum stack size for this thread, then use the maximum instead.
2744 * Finally round up so the TSO ends up as a whole number of blocks.
2746 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2747 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2748 TSO_STRUCT_SIZE)/sizeof(W_);
2749 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2750 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
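// To recap: double the stack, clamp it to max_stack_size, add the TSO
// header and round up to whole blocks, make the result megablock-friendly,
// then recompute the usable stack words from that total.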
2752 debugTrace(DEBUG_sched,
2753 "increasing stack size from %ld words to %d.",
2754 (long)tso->stack_size, new_stack_size);
2756 dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2757 TICK_ALLOC_TSO(new_stack_size,0);
2759 /* copy the TSO block and the old stack into the new area */
2760 memcpy(dest,tso,TSO_STRUCT_SIZE);
2761 stack_words = tso->stack + tso->stack_size - tso->sp;
2762 new_sp = (P_)dest + new_tso_size - stack_words;
2763 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2765 /* relocate the stack pointers... */
2767 dest->stack_size = new_stack_size;
2769 /* Mark the old TSO as relocated. We have to check for relocated
2770 * TSOs in the garbage collector and any primops that deal with TSOs.
2772 * It's important to set the sp value to just beyond the end
2773 * of the stack, so we don't attempt to scavenge any part of the old TSO's stack.
2776 tso->what_next = ThreadRelocated;
2777 setTSOLink(cap,tso,dest);
2778 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2779 tso->why_blocked = NotBlocked;
2781 IF_PAR_DEBUG(verbose,
2782 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2783 tso->id, tso, tso->stack_size);
2784 /* If we're debugging, just print out the top of the stack */
2785 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2791 IF_DEBUG(sanity,checkTSO(dest));
2793 IF_DEBUG(scheduler,printTSO(dest));
2800 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2802 bdescr *bd, *new_bd;
2803 lnat new_tso_size_w, tso_size_w;
2806 tso_size_w = tso_sizeW(tso);
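// Only shrink when the TSO spans at least a megablock and no more than
// a quarter of the stack is actually in use (see the guard below).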
2808 if (tso_size_w < MBLOCK_SIZE_W ||
2809 (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4)
2814 // don't allow throwTo() to modify the blocked_exceptions queue
2815 // while we are moving the TSO:
2816 lockClosure((StgClosure *)tso);
2818 new_tso_size_w = round_to_mblocks(tso_size_w/2);
2820 debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2821 tso->id, tso_size_w, new_tso_size_w);
2823 bd = Bdescr((StgPtr)tso);
2824 new_bd = splitLargeBlock(bd, new_tso_size_w / BLOCK_SIZE_W);
2825 new_bd->free = bd->free;
2826 bd->free = bd->start + TSO_STRUCT_SIZEW;
2828 new_tso = (StgTSO *)new_bd->start;
2829 memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2830 new_tso->stack_size = new_tso_size_w - TSO_STRUCT_SIZEW;
2832 tso->what_next = ThreadRelocated;
2833 tso->_link = new_tso; // no write barrier reqd: same generation
2835 // The TSO attached to this Task may have moved, so update the pointer to it.
2837 if (task->tso == tso) {
2838 task->tso = new_tso;
2844 IF_DEBUG(sanity,checkTSO(new_tso));
2849 /* ---------------------------------------------------------------------------
2851 - usually called inside a signal handler so it mustn't do anything fancy.
2852 ------------------------------------------------------------------------ */
2855 interruptStgRts(void)
2857 sched_state = SCHED_INTERRUPTING;
2862 /* -----------------------------------------------------------------------------
2865 This function causes at least one OS thread to wake up and run the
2866 scheduler loop. It is invoked when the RTS might be deadlocked, or
2867 an external event has arrived that may need servicing (eg. a
2868 keyboard interrupt).
2870 In the single-threaded RTS we don't do anything here; we only have
2871 one thread anyway, and the event that caused us to want to wake up
2872 will have interrupted any blocking system call in progress anyway.
2873 -------------------------------------------------------------------------- */
2878 #if defined(THREADED_RTS)
2879 // This forces the IO Manager thread to wake up, which will
2880 // in turn ensure that some OS thread wakes up and runs the
2881 // scheduler loop, which will cause a GC and deadlock check.
2886 /* -----------------------------------------------------------------------------
2889 * Check the blackhole_queue for threads that can be woken up. We do
2890 * this periodically: before every GC, and whenever the run queue is empty.
2893 * An elegant solution might be to just wake up all the blocked
2894 * threads with awakenBlockedQueue occasionally: they'll go back to
2895 * sleep again if the object is still a BLACKHOLE. Unfortunately this
2896 * doesn't give us a way to tell whether we've actually managed to
2897 * wake up any threads, so we would be busy-waiting.
2899 * -------------------------------------------------------------------------- */
2902 checkBlackHoles (Capability *cap)
2905 rtsBool any_woke_up = rtsFalse;
2908 // blackhole_queue is global:
2909 ASSERT_LOCK_HELD(&sched_mutex);
2911 debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2913 // ASSUMES: sched_mutex
2914 prev = &blackhole_queue;
2915 t = blackhole_queue;
2916 while (t != END_TSO_QUEUE) {
2917 ASSERT(t->why_blocked == BlockedOnBlackHole);
2918 type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2919 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2920 IF_DEBUG(sanity,checkTSO(t));
2921 t = unblockOne(cap, t);
2922 // urk, the threads migrate to the current capability
2923 // here, but we'd like to keep them on the original one.
2925 any_woke_up = rtsTrue;
2935 /* -----------------------------------------------------------------------------
2938 This is used for interruption (^C) and forking, and corresponds to
2939 raising an exception but without letting the thread catch the
2941 -------------------------------------------------------------------------- */
2944 deleteThread (Capability *cap, StgTSO *tso)
2946 // NOTE: must only be called on a TSO that we have exclusive
2947 // access to, because we will call throwToSingleThreaded() below.
2948 // The TSO must be on the run queue of the Capability we own, or
2949 // we must own all Capabilities.
2951 if (tso->why_blocked != BlockedOnCCall &&
2952 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2953 throwToSingleThreaded(cap,tso,NULL);
2957 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2959 deleteThread_(Capability *cap, StgTSO *tso)
2960 { // for forkProcess only:
2961 // like deleteThread(), but we delete threads in foreign calls, too.
2963 if (tso->why_blocked == BlockedOnCCall ||
2964 tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2965 unblockOne(cap,tso);
2966 tso->what_next = ThreadKilled;
2968 deleteThread(cap,tso);
2973 /* -----------------------------------------------------------------------------
2974 raiseExceptionHelper
2976 This function is called by the raise# primitive, just so that we can
2977 move some of the tricky bits of raising an exception from C-- into
2978 C. Who knows, it might be a useful reusable thing here too.
2979 -------------------------------------------------------------------------- */
2982 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2984 Capability *cap = regTableToCapability(reg);
2985 StgThunk *raise_closure = NULL;
2987 StgRetInfoTable *info;
2989 // This closure represents the expression 'raise# E' where E
2990 // is the exception being raised. It is used to overwrite all the
2991 // thunks which are currently under evaluation.
2994 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2995 // LDV profiling: stg_raise_info has THUNK as its closure
2996 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2997 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2998 // 1 does not cause any problem unless profiling is performed.
2999 // However, when LDV profiling goes on, we need to linearly scan
3000 // the small object pool, where raise_closure is stored, so we should
3001 // use MIN_UPD_SIZE.
3003 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3004 // sizeofW(StgClosure)+1);
3008 // Walk up the stack, looking for the catch frame. On the way,
3009 // we update any closures pointed to from update frames with the
3010 // raise closure that we just built.
3014 info = get_ret_itbl((StgClosure *)p);
3015 next = p + stack_frame_sizeW((StgClosure *)p);
3016 switch (info->i.type) {
3019 // Only create raise_closure if we need to.
3020 if (raise_closure == NULL) {
3022 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3023 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3024 raise_closure->payload[0] = exception;
3026 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3030 case ATOMICALLY_FRAME:
3031 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
3033 return ATOMICALLY_FRAME;
3039 case CATCH_STM_FRAME:
3040 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3042 return CATCH_STM_FRAME;
3048 case CATCH_RETRY_FRAME:
3057 /* -----------------------------------------------------------------------------
3058 findRetryFrameHelper
3060 This function is called by the retry# primitive. It traverses the stack
3061 leaving tso->sp referring to the frame which should handle the retry.
3063 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3064 or an ATOMICALLY_FRAME (if the retry# reaches the top level).
3066 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3067 create) because retries are not considered to be exceptions, despite the
3068 similar implementation.
3070 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3071 not be created within memory transactions.
3072 -------------------------------------------------------------------------- */
3075 findRetryFrameHelper (StgTSO *tso)
3078 StgRetInfoTable *info;
3082 info = get_ret_itbl((StgClosure *)p);
3083 next = p + stack_frame_sizeW((StgClosure *)p);
3084 switch (info->i.type) {
3086 case ATOMICALLY_FRAME:
3087 debugTrace(DEBUG_stm,
3088 "found ATOMICALLY_FRAME at %p during retry", p);
3090 return ATOMICALLY_FRAME;
3092 case CATCH_RETRY_FRAME:
3093 debugTrace(DEBUG_stm,
3094 "found CATCH_RETRY_FRAME at %p during retrry", p);
3096 return CATCH_RETRY_FRAME;
3098 case CATCH_STM_FRAME: {
3099 StgTRecHeader *trec = tso -> trec;
3100 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3101 debugTrace(DEBUG_stm,
3102 "found CATCH_STM_FRAME at %p during retry", p);
3103 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3104 stmAbortTransaction(tso -> cap, trec);
3105 stmFreeAbortedTRec(tso -> cap, trec);
3106 tso -> trec = outer;
3113 ASSERT(info->i.type != CATCH_FRAME);
3114 ASSERT(info->i.type != STOP_FRAME);
3121 /* -----------------------------------------------------------------------------
3122 resurrectThreads is called after garbage collection on the list of
3123 threads found to be garbage. Each of these threads will be woken
3124 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3125 on an MVar, or NonTermination if the thread was blocked on a Black Hole.
3128 Locks: assumes we hold *all* the capabilities.
3129 -------------------------------------------------------------------------- */
3132 resurrectThreads (StgTSO *threads)
3138 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3139 next = tso->global_link;
3141 step = Bdescr((P_)tso)->step;
3142 tso->global_link = step->threads;
3143 step->threads = tso;
3145 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3147 // Wake up the thread on the Capability it was last on
3150 switch (tso->why_blocked) {
3152 case BlockedOnException:
3153 /* Called by GC - sched_mutex lock is currently held. */
3154 throwToSingleThreaded(cap, tso,
3155 (StgClosure *)BlockedOnDeadMVar_closure);
3157 case BlockedOnBlackHole:
3158 throwToSingleThreaded(cap, tso,
3159 (StgClosure *)nonTermination_closure);
3162 throwToSingleThreaded(cap, tso,
3163 (StgClosure *)BlockedIndefinitely_closure);
3166 /* This might happen if the thread was blocked on a black hole
3167 * belonging to a thread that we've just woken up (raiseAsync
3168 * can wake up threads, remember...).
3172 barf("resurrectThreads: thread blocked in a strange way");
3177 /* -----------------------------------------------------------------------------
3178 performPendingThrowTos is called after garbage collection, and
3179 passed a list of threads that were found to have pending throwTos
3180 (tso->blocked_exceptions was not empty), and were blocked.
3181 Normally this doesn't happen, because we would deliver the
3182 exception directly if the target thread is blocked, but there are
3183 small windows where it might occur on a multiprocessor (see
3186 NB. we must be holding all the capabilities at this point, just
3187 like resurrectThreads().
3188 -------------------------------------------------------------------------- */
3191 performPendingThrowTos (StgTSO *threads)
3197 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3198 next = tso->global_link;
3200 step = Bdescr((P_)tso)->step;
3201 tso->global_link = step->threads;
3202 step->threads = tso;
3204 debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
3207 maybePerformBlockedException(cap, tso);