1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 1998-2006
5 * The scheduler and thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
14 #include "OSThreads.h"
19 #include "StgMiscClosures.h"
20 #include "Interpreter.h"
22 #include "RtsSignals.h"
28 #include "ThreadLabels.h"
29 #include "LdvProfile.h"
32 #include "Proftimer.h"
35 #if defined(GRAN) || defined(PARALLEL_HASKELL)
36 # include "GranSimRts.h"
38 # include "ParallelRts.h"
39 # include "Parallel.h"
40 # include "ParallelDebug.h"
45 #include "Capability.h"
47 #include "AwaitEvent.h"
48 #if defined(mingw32_HOST_OS)
49 #include "win32/IOManager.h"
52 #include "RaiseAsync.h"
55 #ifdef HAVE_SYS_TYPES_H
56 #include <sys/types.h>
70 // Turn off inlining when debugging - it obfuscates things
73 # define STATIC_INLINE static
76 /* -----------------------------------------------------------------------------
78 * -------------------------------------------------------------------------- */
// Global scheduler state.
// NOTE(review): the preprocessor guards and some comment delimiters around
// these definitions are not visible in this listing.
82 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
83 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
86 In GranSim we have a runnable and a blocked queue for each processor.
87 In order to minimise code changes new arrays run_queue_hds/tls
88 are created. run_queue_hd is then a short cut (macro) for
89 run_queue_hds[CurrentProc] (see GranSim.h).
// Per-processor queue heads and tails, one slot per processor up to MAX_PROC.
92 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
93 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
94 StgTSO *ccalling_threadss[MAX_PROC];
95 /* We use the same global list of threads (all_threads) in GranSim as in
96 the std RTS (i.e. we are cheating). However, we don't use this list in
97 the GranSim specific code at the moment (so we are only potentially
102 #if !defined(THREADED_RTS)
103 // Blocked/sleeping threads (these queues exist only in the non-threaded RTS)
104 StgTSO *blocked_queue_hd = NULL;
105 StgTSO *blocked_queue_tl = NULL;
106 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
109 /* Threads blocked on blackholes.
110 * LOCK: sched_mutex+capability, or all capabilities
112 StgTSO *blackhole_queue = NULL;
115 /* The blackhole_queue should be checked for threads to wake up. See
116 * Schedule.h for more thorough comment.
117 * LOCK: none (doesn't matter if we miss an update)
119 rtsBool blackholes_need_checking = rtsFalse;
121 /* Linked list of all threads.
122 * Used for detecting garbage collected threads.
123 * LOCK: sched_mutex+capability, or all capabilities
125 StgTSO *all_threads = NULL;
127 /* flag set by signal handler to precipitate a context switch
128 * LOCK: none (just an advisory flag)
130 int context_switch = 0;
132 /* flag that tracks whether we have done any execution in this time slice.
133 * LOCK: currently none, perhaps we should lock (but needs to be
134 * updated in the fast path of the scheduler).
136 nat recent_activity = ACTIVITY_YES;
// Values seen in this file: ACTIVITY_YES (set each time a thread is run),
// ACTIVITY_INACTIVE (tested by scheduleDetectDeadlock) and ACTIVITY_DONE_GC
// (stored after the forced major GC there).
138 /* if this flag is set as well, give up execution
139 * LOCK: none (changes once, from false->true)
141 rtsBool sched_state = SCHED_RUNNING;
// NOTE(review): although declared rtsBool, sched_state is initialised with
// SCHED_RUNNING, switched over SCHED_INTERRUPTING and SCHED_SHUTTING_DOWN in
// schedule(), and compared with >= SCHED_INTERRUPTING, so it holds more than
// two states; the false->true wording above looks stale -- TODO confirm
// against the declaration in Schedule.h.
147 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
148 * exists - earlier gccs apparently didn't.
154 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
155 * in an MT setting, needed to signal that a worker thread shouldn't hang around
156 * in the scheduler when it is out of work.
158 rtsBool shutting_down_scheduler = rtsFalse;
161 * This mutex protects most of the global scheduler data in
162 * the THREADED_RTS runtime.
164 #if defined(THREADED_RTS)
168 #if defined(PARALLEL_HASKELL)
170 rtsTime TimeOfLastYield;
171 rtsBool emitSchedule = rtsTrue;
// FORKPROCESS_PRIMOP_SUPPORTED is defined on every host OS except mingw32;
// it gates the deleteThread_ prototype further down.
174 #if !defined(mingw32_HOST_OS)
175 #define FORKPROCESS_PRIMOP_SUPPORTED
178 /* -----------------------------------------------------------------------------
179 * static function prototypes
180 * -------------------------------------------------------------------------- */
// Forward declarations for the file-local scheduler helpers.
// NOTE(review): some continuation lines and closing preprocessor lines of
// these declarations are not visible in this listing.
182 static Capability *schedule (Capability *initialCapability, Task *task);
185 // These functions all encapsulate parts of the scheduler loop, and are
186 // abstracted only to make the structure and control flow of the
187 // scheduler clearer.
189 static void schedulePreLoop (void);
190 #if defined(THREADED_RTS)
191 static void schedulePushWork(Capability *cap, Task *task);
193 static void scheduleStartSignalHandlers (Capability *cap);
194 static void scheduleCheckBlockedThreads (Capability *cap);
// The definition reads cap only inside its THREADED_RTS section and is
// annotated USED_IF_THREADS; the prototype now carries the same annotation
// (it previously said USED_IF_NOT_THREADS).
195 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS);
196 static void scheduleCheckBlackHoles (Capability *cap);
197 static void scheduleDetectDeadlock (Capability *cap, Task *task);
199 static StgTSO *scheduleProcessEvent(rtsEvent *event);
201 #if defined(PARALLEL_HASKELL)
202 static StgTSO *scheduleSendPendingMessages(void);
203 static void scheduleActivateSpark(void);
204 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
206 #if defined(PAR) || defined(GRAN)
207 static void scheduleGranParReport(void);
209 static void schedulePostRunThread(void);
210 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
211 static void scheduleHandleStackOverflow( Capability *cap, Task *task,
213 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
214 nat prev_what_next );
215 static void scheduleHandleThreadBlocked( StgTSO *t );
216 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
218 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
219 static Capability *scheduleDoGC(Capability *cap, Task *task,
220 rtsBool force_major);
222 static rtsBool checkBlackHoles(Capability *cap);
224 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
226 static void deleteThread (Capability *cap, StgTSO *tso);
227 static void deleteAllThreads (Capability *cap);
// deleteThread_ is only declared where forkProcess is supported.
229 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
230 static void deleteThread_(Capability *cap, StgTSO *tso);
233 #if defined(PARALLEL_HASKELL)
234 StgTSO * createSparkThread(rtsSpark spark);
235 StgTSO * activateSpark (rtsSpark spark);
// Name table indexed by the what_next field of a TSO; used by the debugTrace
// calls in schedule().  The initialiser entries are not visible in this
// listing.
239 static char *whatNext_strs[] = {
249 /* -----------------------------------------------------------------------------
250 * Putting a thread on the run queue: different scheduling policies
251 * -------------------------------------------------------------------------- */
// addToRunQueue(): put thread t on the run queue of cap according to the
// configured scheduling policy.
//
// cap: the Capability whose run queue receives the thread.
// t:   the thread (TSO) to enqueue.
//
// Under PARALLEL_HASKELL the policy is chosen by
// RtsFlags.ParFlags.doFairScheduling: appendToRunQueue when it is set,
// pushOnRunQueue otherwise (the else line itself is not visible in this
// listing).  In all other builds the thread is always appended.
254 addToRunQueue( Capability *cap, StgTSO *t )
256 #if defined(PARALLEL_HASKELL)
257 if (RtsFlags.ParFlags.doFairScheduling) {
258 // this does round-robin scheduling; good for concurrency
259 appendToRunQueue(cap,t);
261 // this does unfair scheduling; good for parallelism
262 pushOnRunQueue(cap,t);
265 // this does round-robin scheduling; good for concurrency
266 appendToRunQueue(cap,t);
270 /* ---------------------------------------------------------------------------
271 Main scheduling loop.
273 We use round-robin scheduling, each thread returning to the
274 scheduler loop when one of these conditions is detected:
277 * timer expires (thread yields)
283 In a GranSim setup this loop iterates over the global event queue.
284 This revolves around the global event queue, which determines what
285 to do next. Therefore, it's more complicated than either the
286 concurrent or the parallel (GUM) setup.
289 GUM iterates over incoming messages.
290 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
291 and sends out a fish whenever it has nothing to do; in-between
292 doing the actual reductions (shared code below) it processes the
293 incoming messages and deals with delayed operations
294 (see PendingFetches).
295 This is not the ugliest code you could imagine, but it's bloody close.
297 ------------------------------------------------------------------------ */
// schedule(): the main scheduler loop.
//
// initialCapability: the Capability owned by the calling task on entry.
// task:              the calling Task; its tso and cap fields are read below.
//
// Each pass of the while (TERMINATION_CONDITION) loop acts on sched_state,
// starts signal handlers, checks black holes, wakeups, blocked threads and
// deadlock, pops a thread from the run queue of cap, runs it (StgRun or
// interpretBCO) and then dispatches on the outcome through the
// scheduleHandle* helpers, after which scheduleDoGC may run.
// The one visible return statement returns cap when
// scheduleHandleThreadFinished reports true.
// NOTE(review): many case labels, braces and else-arms of this function are
// not visible in this listing; the comments cover only the visible lines.
300 schedule (Capability *initialCapability, Task *task)
304 StgThreadReturnCode ret;
307 #elif defined(PARALLEL_HASKELL)
310 rtsBool receivedFinish = rtsFalse;
312 nat tp_size, sp_size; // stats only
317 #if defined(THREADED_RTS)
318 rtsBool first = rtsTrue;
321 cap = initialCapability;
323 // Pre-condition: this task owns initialCapability.
324 // The sched_mutex is *NOT* held
325 // NB. on return, we still hold a capability.
327 debugTrace (DEBUG_sched,
328 "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
329 task, initialCapability);
333 // -----------------------------------------------------------
334 // Scheduler loop starts here:
// TERMINATION_CONDITION is the loop guard; three alternative definitions are
// visible: stop once a Finish has been received (PARALLEL_HASKELL), stop when
// get_next_event() yields no event, or loop forever (rtsTrue).
336 #if defined(PARALLEL_HASKELL)
337 #define TERMINATION_CONDITION (!receivedFinish)
339 #define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL)
341 #define TERMINATION_CONDITION rtsTrue
344 while (TERMINATION_CONDITION) {
347 /* Choose the processor with the next event */
348 CurrentProc = event->proc;
349 CurrentTSO = event->tso;
352 #if defined(THREADED_RTS)
354 // don't yield the first time, we want a chance to run this
355 // thread for a bit, even if there are others banging at the
358 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
360 // Yield the capability to higher-priority tasks if necessary.
361 yieldCapability(&cap, task);
365 #if defined(THREADED_RTS)
366 schedulePushWork(cap,task);
369 // Check whether we have re-entered the RTS from Haskell without
370 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
372 if (cap->in_haskell) {
373 errorBelch("schedule: re-entered unsafely.\n"
374 " Perhaps a 'foreign import unsafe' should be 'safe'?");
375 stg_exit(EXIT_FAILURE);
378 // The interruption / shutdown sequence.
380 // In order to cleanly shut down the runtime, we want to:
381 // * make sure that all main threads return to their callers
382 // with the state 'Interrupted'.
383 // * clean up all OS threads associated with the runtime
384 // * free all memory etc.
386 // So the sequence for ^C goes like this:
388 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
389 // arranges for some Capability to wake up
391 // * all threads in the system are halted, and the zombies are
392 // placed on the run queue for cleaning up. We acquire all
393 // the capabilities in order to delete the threads, this is
394 // done by scheduleDoGC() for convenience (because GC already
395 // needs to acquire all the capabilities). We can't kill
396 // threads involved in foreign calls.
398 // * somebody calls shutdownHaskell(), which calls exitScheduler()
400 // * sched_state := SCHED_SHUTTING_DOWN
402 // * all workers exit when the run queue on their capability
403 // drains. All main threads will also exit when their TSO
404 // reaches the head of the run queue and they can return.
406 // * eventually all Capabilities will shut down, and the RTS can
409 // * We might be left with threads blocked in foreign calls,
410 // we should really attempt to kill these somehow (TODO);
412 switch (sched_state) {
415 case SCHED_INTERRUPTING:
416 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
417 #if defined(THREADED_RTS)
418 discardSparksCap(cap);
420 /* scheduleDoGC() deletes all the threads */
421 cap = scheduleDoGC(cap,task,rtsFalse);
423 case SCHED_SHUTTING_DOWN:
424 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
425 // If we are a worker, just exit. If we're a bound thread
426 // then we will exit below when we've removed our TSO from
428 if (task->tso == NULL && emptyRunQueue(cap)) {
433 barf("sched_state: %d", sched_state);
436 #if defined(THREADED_RTS)
437 // If the run queue is empty, take a spark and turn it into a thread.
439 if (emptyRunQueue(cap)) {
441 spark = findSpark(cap);
443 debugTrace(DEBUG_sched,
444 "turning spark of closure %p into a thread",
445 (StgClosure *)spark);
446 createSparkThread(cap,spark);
450 #endif // THREADED_RTS
452 scheduleStartSignalHandlers(cap);
454 // Only check the black holes here if we've nothing else to do.
455 // During normal execution, the black hole list only gets checked
456 // at GC time, to avoid repeatedly traversing this possibly long
457 // list each time around the scheduler.
458 if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
460 scheduleCheckWakeupThreads(cap);
462 scheduleCheckBlockedThreads(cap);
464 scheduleDetectDeadlock(cap,task);
465 #if defined(THREADED_RTS)
466 cap = task->cap; // reload cap, it might have changed
469 // Normally, the only way we can get here with no threads to
470 // run is if a keyboard interrupt is received during
471 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
472 // Additionally, it is not fatal for the
473 // threaded RTS to reach here with no threads to run.
475 // win32: might be here due to awaitEvent() being abandoned
476 // as a result of a console event having been delivered.
477 if ( emptyRunQueue(cap) ) {
478 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
479 ASSERT(sched_state >= SCHED_INTERRUPTING);
481 continue; // nothing to do
484 #if defined(PARALLEL_HASKELL)
485 scheduleSendPendingMessages();
486 if (emptyRunQueue(cap) && scheduleActivateSpark())
490 ASSERT(next_fish_to_send_at==0); // i.e. no delayed fishes left!
493 /* If we still have no work we need to send a FISH to get a spark
495 if (emptyRunQueue(cap)) {
496 if (!scheduleGetRemoteWork(&receivedFinish)) continue;
497 ASSERT(rtsFalse); // should not happen at the moment
499 // from here: non-empty run queue.
500 // TODO: merge above case with this, only one call processMessages() !
501 if (PacketsWaiting()) { /* process incoming messages, if
502 any pending... only in else
503 because getRemoteWork waits for
505 receivedFinish = processMessages();
510 scheduleProcessEvent(event);
514 // Get a thread to run
516 t = popRunQueue(cap);
518 #if defined(GRAN) || defined(PAR)
519 scheduleGranParReport(); // some kind of debugging output
521 // Sanity check the thread we're about to run. This can be
522 // expensive if there is lots of thread switching going on...
523 IF_DEBUG(sanity,checkTSO(t));
526 #if defined(THREADED_RTS)
527 // Check whether we can run this thread in the current task.
528 // If not, we have to pass our capability to the right task.
530 Task *bound = t->bound;
534 debugTrace(DEBUG_sched,
535 "### Running thread %lu in bound thread", (unsigned long)t->id);
536 // yes, the Haskell thread is bound to the current native thread
538 debugTrace(DEBUG_sched,
539 "### thread %lu bound to another OS thread", (unsigned long)t->id);
540 // no, bound to a different Haskell thread: pass to that thread
541 pushOnRunQueue(cap,t);
545 // The thread we want to run is unbound.
547 debugTrace(DEBUG_sched,
548 "### this OS thread cannot run thread %lu", (unsigned long)t->id);
549 // no, the current native thread is bound to a different
550 // Haskell thread, so pass it to any worker thread
551 pushOnRunQueue(cap,t);
// Record t as the TSO currently running on this Capability; it is read back
// from the same field after the run because the TSO may have moved.
558 cap->r.rCurrentTSO = t;
560 /* context switches are initiated by the timer signal, unless
561 * the user specified "context switch as often as possible", with
564 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
565 && !emptyThreadQueues(cap)) {
571 debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
572 (long)t->id, whatNext_strs[t->what_next]);
574 #if defined(PROFILING)
575 startHeapProfTimer();
578 // Check for exceptions blocked on this thread
579 maybePerformBlockedException (cap, t);
581 // ----------------------------------------------------------------------
582 // Run the current thread
584 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
585 ASSERT(t->cap == cap);
587 prev_what_next = t->what_next;
// Install the errno value saved for this thread, and flag the Capability as
// executing Haskell; the re-entry check near the top of the loop reads
// in_haskell.
589 errno = t->saved_errno;
590 cap->in_haskell = rtsTrue;
594 recent_activity = ACTIVITY_YES;
596 switch (prev_what_next) {
600 /* Thread already finished, return to scheduler. */
601 ret = ThreadFinished;
607 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
608 cap = regTableToCapability(r);
613 case ThreadInterpret:
614 cap = interpretBCO(cap);
619 barf("schedule: invalid what_next field");
622 cap->in_haskell = rtsFalse;
624 // The TSO might have moved, eg. if it re-entered the RTS and a GC
625 // happened. So find the new location:
626 t = cap->r.rCurrentTSO;
628 // We have run some Haskell code: there might be blackhole-blocked
629 // threads to wake up now.
630 // Lock-free test here should be ok, we're just setting a flag.
631 if ( blackhole_queue != END_TSO_QUEUE ) {
632 blackholes_need_checking = rtsTrue;
635 // And save the current errno in this thread.
636 // XXX: possibly bogus for SMP because this thread might already
637 // be running again, see code below.
638 t->saved_errno = errno;
640 #if defined(THREADED_RTS)
641 // If ret is ThreadBlocked, and this Task is bound to the TSO that
642 // blocked, we are in limbo - the TSO is now owned by whatever it
643 // is blocked on, and may in fact already have been woken up,
644 // perhaps even on a different Capability. It may be the case
645 // that task->cap != cap. We better yield this Capability
646 // immediately and return to normality.
647 if (ret == ThreadBlocked) {
648 debugTrace(DEBUG_sched,
649 "--<< thread %lu (%s) stopped: blocked",
650 (unsigned long)t->id, whatNext_strs[t->what_next]);
655 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
656 ASSERT(t->cap == cap);
658 // ----------------------------------------------------------------------
660 // Costs for the scheduler are assigned to CCS_SYSTEM
661 #if defined(PROFILING)
666 schedulePostRunThread();
// Dispatch on why the thread stopped.  The case labels are not visible in
// this listing; the handlers below are called for heap overflow, stack
// overflow, yield, blocked and finished respectively, judging by their names.
668 ready_to_gc = rtsFalse;
672 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
676 scheduleHandleStackOverflow(cap,task,t);
680 if (scheduleHandleYield(cap, t, prev_what_next)) {
681 // shortcut for switching between compiler/interpreter:
687 scheduleHandleThreadBlocked(t);
691 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
692 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
696 barf("schedule: invalid thread return code %d", (int)ret);
699 if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
701 cap = scheduleDoGC(cap,task,rtsFalse);
703 } /* end of while() */
705 debugTrace(PAR_DEBUG_verbose,
706 "== Leaving schedule() after having received Finish");
709 /* ----------------------------------------------------------------------------
710 * Setting up the scheduler loop
711 * ------------------------------------------------------------------------- */
// schedulePreLoop(): one-off setup performed before the scheduler loop.
//
// Takes no arguments.  The visible body is GranSim code: it posts an initial
// event for CurrentTSO on CurrentProc at the current simulated time, traces
// it, and in GranSim-Light mode stamps the clock of the TSO with that time.
// NOTE(review): the guarding preprocessor lines and the event-type argument
// of new_event are not visible in this listing.
714 schedulePreLoop(void)
717 /* set up first event to get things going */
718 /* ToDo: assign costs for system setup and init MainTSO ! */
719 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
721 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
723 debugTrace (DEBUG_gran,
724 "GRAN: Init CurrentTSO (in schedule) = %p",
726 IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
728 if (RtsFlags.GranFlags.Light) {
729 /* Save current time; GranSim Light only */
730 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
735 /* -----------------------------------------------------------------------------
738 * Push work to other Capabilities if we have some.
739 * -------------------------------------------------------------------------- */
741 #if defined(THREADED_RTS)
// schedulePushWork(): hand surplus threads and sparks on cap to other
// Capabilities that are currently idle.
//
// cap:  the Capability owned by the caller; its run queue and spark pool are
//       the source of the work that is shared out.
// task: the calling Task; used to grab and release the other Capabilities
//       and to avoid migrating the thread bound to the caller.
//
// Returns nothing.  Returns at once when migration is switched off.  The
// test on the run queue and spark pool below presumably also leads to an
// early return (that return statement is not visible in this listing).
// NOTE(review): several declarations, else-arms and braces of this function
// are not visible in this listing.
743 schedulePushWork(Capability *cap USED_IF_THREADS,
744 Task *task USED_IF_THREADS)
746 Capability *free_caps[n_capabilities], *cap0;
749 // migration can be turned off with +RTS -qg
750 if (!RtsFlags.ParFlags.migrate) return;
752 // Check whether we have more threads on our run queue, or sparks
753 // in our pool, that we could hand to another Capability.
754 if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
755 && sparkPoolSizeCap(cap) < 2) {
759 // First grab as many free Capabilities as we can.
760 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
761 cap0 = &capabilities[i];
762 if (cap != cap0 && tryGrabCapability(cap0,task)) {
// The idleness test must look at cap0, the Capability just grabbed: it is
// not free if it has runnable threads or tasks waiting to return to it.
// (This used to read cap->returning_tasks_hd, i.e. our own Capability.)
763 if (!emptyRunQueue(cap0) || cap0->returning_tasks_hd != NULL) {
764 // it already has some work, we just grabbed it at
765 // the wrong moment. Or maybe it's deadlocked!
766 releaseCapability(cap0);
768 free_caps[n_free_caps++] = cap0;
773 // we now have n_free_caps free capabilities stashed in
774 // free_caps[]. Share our run queue equally with them. This is
775 // probably the simplest thing we could do; improvements we might
776 // want to do include:
778 // - giving high priority to moving relatively new threads, on
779 // the grounds that they haven't had time to build up a
780 // working set in the cache on this CPU/Capability.
782 // - giving low priority to moving long-lived threads
784 if (n_free_caps > 0) {
785 StgTSO *prev, *t, *next;
786 rtsBool pushed_to_all;
788 debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
791 pushed_to_all = rtsFalse;
// Walk our run queue, keeping the head for ourselves and unlinking each
// later thread so that it can be re-linked either locally or remotely.
793 if (cap->run_queue_hd != END_TSO_QUEUE) {
794 prev = cap->run_queue_hd;
796 prev->link = END_TSO_QUEUE;
797 for (; t != END_TSO_QUEUE; t = next) {
799 t->link = END_TSO_QUEUE;
800 if (t->what_next == ThreadRelocated
801 || t->bound == task // don't move my bound thread
802 || tsoLocked(t)) { // don't move a locked thread
805 } else if (i == n_free_caps) {
806 pushed_to_all = rtsTrue;
812 debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
813 appendToRunQueue(free_caps[i],t);
// A migrated thread, and the Task bound to it if any, now belong to the
// receiving Capability.
814 if (t->bound) { t->bound->cap = free_caps[i]; }
815 t->cap = free_caps[i];
819 cap->run_queue_tl = prev;
822 // If there are some free capabilities that we didn't push any
823 // threads to, then try to push a spark to each one.
824 if (!pushed_to_all) {
826 // i is the next free capability to push to
827 for (; i < n_free_caps; i++) {
828 if (emptySparkPoolCap(free_caps[i])) {
829 spark = findSpark(cap);
831 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
832 newSpark(&(free_caps[i]->r), spark);
838 // release the capabilities
// task->cap is pointed at each Capability in turn before it is released and
// is restored to our own Capability afterwards.
839 for (i = 0; i < n_free_caps; i++) {
840 task->cap = free_caps[i];
841 releaseCapability(free_caps[i]);
844 task->cap = cap; // reset to point to our Capability.
848 /* ----------------------------------------------------------------------------
849 * Start any pending signal handlers
850 * ------------------------------------------------------------------------- */
// scheduleStartSignalHandlers(): start handlers for any pending signals.
//
// cap: the Capability passed on to startSignalHandlers.
//
// Two definitions are visible.  The first, compiled when RTS_USER_SIGNALS is
// defined and the build is either non-threaded or for mingw32, calls
// startSignalHandlers(cap) when signals_pending() is true.  The second marks
// cap as unused; its body is not visible in this listing.
852 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
854 scheduleStartSignalHandlers(Capability *cap)
856 if (signals_pending()) { // safe outside the lock
857 startSignalHandlers(cap);
862 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
867 /* ----------------------------------------------------------------------------
868 * Check for blocked threads that can be woken up.
869 * ------------------------------------------------------------------------- */
// scheduleCheckBlockedThreads(): poll for events that could wake blocked or
// sleeping threads.  The visible body is compiled only without THREADED_RTS.
//
// cap: the Capability whose run queue decides whether the wait may block;
//      unused in the threaded build.
//
// When either the blocked queue or the sleeping queue is non-empty,
// awaitEvent is called; its argument is true only if the run queue of cap is
// empty and no black-hole check is pending.
872 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
874 #if !defined(THREADED_RTS)
876 // Check whether any waiting threads need to be woken up. If the
877 // run queue is empty, and there are no other tasks running, we
878 // can wait indefinitely for something to happen.
880 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
882 awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
888 /* ----------------------------------------------------------------------------
889 * Check for threads woken up by other Capabilities
890 * ------------------------------------------------------------------------- */
// scheduleCheckWakeupThreads(): move the threads on the wakeup queue of cap
// onto its run queue.  The visible body is compiled only with THREADED_RTS.
//
// cap: the Capability whose wakeup queue is drained; unused in the
//      non-threaded build.
//
// The emptiness test is made without the lock; the splice itself runs with
// cap->lock held.  If the run queue is empty the wakeup queue becomes the run
// queue, otherwise it is linked after the current tail (the else line is not
// visible in this listing).  The wakeup queue is then reset to empty.
893 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
895 #if defined(THREADED_RTS)
896 // Any threads that were woken up by other Capabilities get
897 // appended to our run queue.
898 if (!emptyWakeupQueue(cap)) {
899 ACQUIRE_LOCK(&cap->lock);
900 if (emptyRunQueue(cap)) {
901 cap->run_queue_hd = cap->wakeup_queue_hd;
902 cap->run_queue_tl = cap->wakeup_queue_tl;
904 cap->run_queue_tl->link = cap->wakeup_queue_hd;
905 cap->run_queue_tl = cap->wakeup_queue_tl;
907 cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
908 RELEASE_LOCK(&cap->lock);
913 /* ----------------------------------------------------------------------------
914 * Check for threads blocked on BLACKHOLEs that can be woken up
915 * ------------------------------------------------------------------------- */
// scheduleCheckBlackHoles(): run checkBlackHoles when the
// blackholes_need_checking flag is set, then clear the flag.
//
// cap: the Capability passed on to checkBlackHoles.
//
// The flag is tested once without the lock and again after sched_mutex has
// been acquired, so the check itself runs with sched_mutex held.
917 scheduleCheckBlackHoles (Capability *cap)
919 if ( blackholes_need_checking ) // check without the lock first
921 ACQUIRE_LOCK(&sched_mutex);
922 if ( blackholes_need_checking ) {
923 checkBlackHoles(cap);
924 blackholes_need_checking = rtsFalse;
926 RELEASE_LOCK(&sched_mutex);
930 /* ----------------------------------------------------------------------------
931 * Detect deadlock conditions and attempt to resolve them.
932 * ------------------------------------------------------------------------- */
// scheduleDetectDeadlock(): detect a deadlock and try to resolve it.
//
// cap:  the Capability of the caller; reassigned locally from the result of
//       scheduleDoGC.
// task: the calling Task; in the non-threaded build task->tso is the thread
//       that receives the NonTermination exception.
//
// Visible steps when emptyThreadQueues(cap) holds: in the threaded RTS
// return unless recent_activity is ACTIVITY_INACTIVE; force a major GC and
// record ACTIVITY_DONE_GC; return if that produced runnable threads; where
// user signal handlers are supported and installed, wait for signals; in the
// non-threaded RTS raise NonTermination in the main thread, or barf if it is
// blocked in an unexpected way.
// NOTE(review): some comment delimiters, braces and case labels of this
// function are not visible in this listing.
935 scheduleDetectDeadlock (Capability *cap, Task *task)
938 #if defined(PARALLEL_HASKELL)
939 // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
944 * Detect deadlock: when we have no threads to run, there are no
945 * threads blocked, waiting for I/O, or sleeping, and all the
946 * other tasks are waiting for work, we must have a deadlock of
949 if ( emptyThreadQueues(cap) )
951 #if defined(THREADED_RTS)
953 * In the threaded RTS, we only check for deadlock if there
954 * has been no activity in a complete timeslice. This means
955 * we won't eagerly start a full GC just because we don't have
956 * any threads to run currently.
958 if (recent_activity != ACTIVITY_INACTIVE) return;
961 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
963 // Garbage collection can release some new threads due to
964 // either (a) finalizers or (b) threads resurrected because
965 // they are unreachable and will therefore be sent an
966 // exception. Any threads thus released will be immediately
968 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
970 recent_activity = ACTIVITY_DONE_GC;
972 if ( !emptyRunQueue(cap) ) return;
974 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
975 /* If we have user-installed signal handlers, then wait
976 * for signals to arrive rather than bombing out with a
979 if ( anyUserHandlers() ) {
980 debugTrace(DEBUG_sched,
981 "still deadlocked, waiting for signals...");
985 if (signals_pending()) {
986 startSignalHandlers(cap);
989 // either we have threads to run, or we were interrupted:
990 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
994 #if !defined(THREADED_RTS)
995 /* Probably a real deadlock. Send the current main thread the
996 * Deadlock exception.
999 switch (task->tso->why_blocked) {
1001 case BlockedOnBlackHole:
1002 case BlockedOnException:
1004 throwToSingleThreaded(cap, task->tso,
1005 (StgClosure *)NonTermination_closure);
1008 barf("deadlock: main thread blocked in a strange way");
1016 /* ----------------------------------------------------------------------------
1017 * Process an event (GRAN only)
1018 * ------------------------------------------------------------------------- */
// scheduleProcessEvent(): GranSim event dispatcher (GRAN builds only,
// according to the banner above this function).
//
// event: the simulator event to process; its evttype, time and tso fields
//        are read below.
//
// The visible body advances the simulated clock of CurrentProc to the event
// time-stamp (except for ContinueThread), then switches on event->evttype.
// ContinueThread falls out of the switch to run the thread; every other
// visible handler calls the matching do_the_* routine and jumps to
// next_thread.  After the switch the time-slice bookkeeping is updated, a
// thread is popped from the run queue and the processor is marked Busy.
// NOTE(review): most case labels, the next_thread label and the return
// statement are not visible in this listing.
1022 scheduleProcessEvent(rtsEvent *event)
1026 if (RtsFlags.GranFlags.Light)
1027 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1029 /* adjust time based on time-stamp */
1030 if (event->time > CurrentTime[CurrentProc] &&
1031 event->evttype != ContinueThread)
1032 CurrentTime[CurrentProc] = event->time;
1034 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1035 if (!RtsFlags.GranFlags.Light)
1038 IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1040 /* main event dispatcher in GranSim */
1041 switch (event->evttype) {
1042 /* Should just be continuing execution */
1043 case ContinueThread:
1044 IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1045 /* ToDo: check assertion
1046 ASSERT(run_queue_hd != (StgTSO*)NULL &&
1047 run_queue_hd != END_TSO_QUEUE);
1049 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1050 if (!RtsFlags.GranFlags.DoAsyncFetch &&
1051 procStatus[CurrentProc]==Fetching) {
1052 debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1053 CurrentTSO->id, CurrentTSO, CurrentProc);
1056 /* Ignore ContinueThreads for completed threads */
1057 if (CurrentTSO->what_next == ThreadComplete) {
1058 debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
1059 CurrentTSO->id, CurrentTSO, CurrentProc);
1062 /* Ignore ContinueThreads for threads that are being migrated */
1063 if (PROCS(CurrentTSO)==Nowhere) {
1064 debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1065 CurrentTSO->id, CurrentTSO, CurrentProc);
1068 /* The thread should be at the beginning of the run queue */
1069 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
1070 debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1071 CurrentTSO->id, CurrentTSO, CurrentProc);
1072 break; // run the thread anyway
1075 new_event(proc, proc, CurrentTime[proc],
1077 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1079 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1080 break; // now actually run the thread; DaH Qu'vam yImuHbej
1083 do_the_fetchnode(event);
1084 goto next_thread; /* handle next event in event queue */
1087 do_the_globalblock(event);
1088 goto next_thread; /* handle next event in event queue */
1091 do_the_fetchreply(event);
1092 goto next_thread; /* handle next event in event queue */
1094 case UnblockThread: /* Move from the blocked queue to the tail of */
1095 do_the_unblock(event);
1096 goto next_thread; /* handle next event in event queue */
1098 case ResumeThread: /* Move from the blocked queue to the tail of */
1099 /* the runnable queue ( i.e. Qu' SImqa'lu') */
1100 event->tso->gran.blocktime +=
1101 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1102 do_the_startthread(event);
1103 goto next_thread; /* handle next event in event queue */
1106 do_the_startthread(event);
1107 goto next_thread; /* handle next event in event queue */
1110 do_the_movethread(event);
1111 goto next_thread; /* handle next event in event queue */
1114 do_the_movespark(event);
1115 goto next_thread; /* handle next event in event queue */
1118 do_the_findwork(event);
1119 goto next_thread; /* handle next event in event queue */
1122 barf("Illegal event type %u\n", event->evttype);
1125 /* This point was scheduler_loop in the old RTS */
1127 IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1129 TimeOfLastEvent = CurrentTime[CurrentProc];
1130 TimeOfNextEvent = get_time_of_next_event();
1131 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1132 // CurrentTSO = ThreadQueueHd;
1134 IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
1137 if (RtsFlags.GranFlags.Light)
1138 GranSimLight_leave_system(event, &ActiveTSO);
1140 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1143 debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1145 /* in a GranSim setup the TSO stays on the run queue */
1147 /* Take a thread from the run queue. */
1148 POP_RUN_QUEUE(t); // take_off_run_queue(t);
1151 debugBelch("GRAN: About to run current thread, which is\n");
1154 context_switch = 0; // turned on via GranYield, checking events and time slice
1157 DumpGranEvent(GR_SCHEDULE, t));
1159 procStatus[CurrentProc] = Busy;
1163 /* ----------------------------------------------------------------------------
1164 * Send pending messages (PARALLEL_HASKELL only)
1165 * ------------------------------------------------------------------------- */
// scheduleSendPendingMessages(): PARALLEL_HASKELL only.
//
// Takes no arguments; the prototype earlier in this file declares a StgTSO
// pointer result.  The visible body tests PendingFetches against
// END_BF_QUEUE (PAR builds) and RtsFlags.ParFlags.BufferTime; the statements
// guarded by those tests are not visible in this listing.
1167 #if defined(PARALLEL_HASKELL)
1169 scheduleSendPendingMessages(void)
1175 # if defined(PAR) // global Mem.Mgmt., omit for now
1176 if (PendingFetches != END_BF_QUEUE) {
1181 if (RtsFlags.ParFlags.BufferTime) {
1182 // if we use message buffering, we must send away all message
1183 // packets which have become too old...
1189 /* ----------------------------------------------------------------------------
1190 * Activate spark threads (PARALLEL_HASKELL only)
1191 * ------------------------------------------------------------------------- */
// scheduleActivateSpark(): PARALLEL_HASKELL only.  Try to turn a local
// spark into a thread when the run queue is empty.
//
// Takes no arguments.  The visible return statements yield rtsTrue when a
// thread was created from a spark and rtsFalse when the thread limit is
// reached, the pool is empty, no spark is found or thread creation fails.
// NOTE(review): the prototype earlier in this file declares this function
// as returning void, while the body returns rtsBool values and schedule()
// tests the result -- TODO confirm which is intended.
// NOTE(review): emptyRunQueue and findSpark are called here with argument
// lists that differ from their uses in schedule() and schedulePushWork(), and
// cap is accessed with a dot rather than an arrow; this path looks out of
// date -- TODO confirm it still compiles.
1193 #if defined(PARALLEL_HASKELL)
1195 scheduleActivateSpark(void)
1198 ASSERT(emptyRunQueue());
1199 /* We get here if the run queue is empty and want some work.
1200 We try to turn a spark into a thread, and add it to the run queue,
1201 from where it will be picked up in the next iteration of the scheduler
1205 /* :-[ no local threads => look out for local sparks */
1206 /* the spark pool for the current PE */
1207 pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1208 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1209 pool->hd < pool->tl) {
1211 * ToDo: add GC code check that we really have enough heap afterwards!!
1213 * If we're here (no runnable threads) and we have pending
1214 * sparks, we must have a space problem. Get enough space
1215 * to turn one of those pending sparks into a
1219 spark = findSpark(rtsFalse); /* get a spark */
1220 if (spark != (rtsSpark) NULL) {
1221 tso = createThreadFromSpark(spark); /* turn the spark into a thread */
1222 IF_PAR_DEBUG(fish, // schedule,
1223 debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1224 tso->id, tso, advisory_thread_count));
1226 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1227 IF_PAR_DEBUG(fish, // schedule,
1228 debugBelch("==^^ failed to create thread from spark @ %lx\n",
1230 return rtsFalse; /* failed to generate a thread */
1231 } /* otherwise fall through & pick-up new tso */
1233 IF_PAR_DEBUG(fish, // schedule,
1234 debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
1235 spark_queue_len(pool)));
1236 return rtsFalse; /* failed to generate a thread */
1238 return rtsTrue; /* success in generating a thread */
1239 } else { /* no more threads permitted or pool empty */
1240 return rtsFalse; /* failed to generateThread */
1243 tso = NULL; // avoid compiler warning only
1244 return rtsFalse; /* dummy in non-PAR setup */
1247 #endif // PARALLEL_HASKELL
1249 /* ----------------------------------------------------------------------------
1250 * Get work from a remote node (PARALLEL_HASKELL only)
1251 * ------------------------------------------------------------------------- */
// Looks for work on other PEs when there is no local work
// (PARALLEL_HASKELL only). Expects an empty run queue (asserted) and stores
// the result of processMessages() in *receivedFinish. According to the
// closing comment of this function it always returns rtsFalse, so the
// scheduler loop simply carries on with its next iteration.
1253 #if defined(PARALLEL_HASKELL)
1255 scheduleGetRemoteWork(rtsBool *receivedFinish)
1257 ASSERT(emptyRunQueue());
1259 if (RtsFlags.ParFlags.BufferTime) {
1260 IF_PAR_DEBUG(verbose,
1261 debugBelch("...send all pending data,"));
1264 for (i=1; i<=nPEs; i++)
1265 sendImmediately(i); // send all messages away immediately
1269 //++EDEN++ idle() , i.e. send all buffers, wait for work
1270 // suppress fishing in EDEN... just look for incoming messages
1271 // (blocking receive)
1272 IF_PAR_DEBUG(verbose,
1273 debugBelch("...wait for incoming messages...\n"));
1274 *receivedFinish = processMessages(); // blocking receive...
1276 // and reenter scheduling loop after having received something
1277 // (return rtsFalse below)
1279 # else /* activate SPARKS machinery */
1280 /* We get here, if we have no work, tried to activate a local spark, but still
1281 have no work. We try to get a remote spark, by sending a FISH message.
1282 Thread migration should be added here, and triggered when a sequence of
1283 fishes returns without work. */
1284 delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1286 /* =8-[ no local sparks => look for work on other PEs */
1288 * We really have absolutely no work. Send out a fish
1289 * (there may be some out there already), and wait for
1290 * something to arrive. We clearly can't run any threads
1291 * until a SCHEDULE or RESUME arrives, and so that's what
1292 * we're hoping to see. (Of course, we still have to
1293 * respond to other types of messages.)
1295 rtsTime now = msTime() /*CURRENT_TIME*/;
1296 IF_PAR_DEBUG(verbose,
1297 debugBelch("-- now=%ld\n", now));
1298 IF_PAR_DEBUG(fish, // verbose,
1299 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1300 (last_fish_arrived_at!=0 &&
1301 last_fish_arrived_at+delay > now)) {
1302 debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1303 now, last_fish_arrived_at+delay,
1304 last_fish_arrived_at,
// A fish is only considered while both the outstanding-fish limit and the
// advisory thread limit leave room.
1308 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1309 advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1310 if (last_fish_arrived_at==0 ||
1311 (last_fish_arrived_at+delay <= now)) { // send FISH now!
1312 /* outstandingFishes is set in sendFish, processFish;
1313 avoid flooding system with fishes via delay */
1314 next_fish_to_send_at = 0;
1316 /* ToDo: this should be done in the main scheduling loop to avoid the
1317 busy wait here; not so bad if fish delay is very small */
1318 int iq = 0; // DEBUGGING -- HWL
1319 next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
1320 /* send a fish when ready, but process messages that arrive in the meantime */
1322 if (PacketsWaiting()) {
1324 *receivedFinish = processMessages();
// NOTE(review): as written the loop repeats while *receivedFinish is false
// OR the send time has not been reached, so it can only exit once
// *receivedFinish is set, and the check below then always returns before
// the delayed fish is sent. The surrounding comments suggest a logical AND
// was intended. Confirm against the complete loop body.
1327 } while (!*receivedFinish || now<next_fish_to_send_at);
1328 // JB: This means the fish could become obsolete, if we receive
1329 // work. Better check for work again?
1330 // last line: while (!receivedFinish || !haveWork || now<...)
1331 // next line: if (receivedFinish || haveWork )
1333 if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1334 return rtsFalse; // NB: this will leave scheduler loop
1335 // immediately after return!
1337 IF_PAR_DEBUG(fish, // verbose,
1338 debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1342 // JB: IMHO, this should all be hidden inside sendFish(...)
1344 sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
1347 // Global statistics: count no. of fishes
1348 if (RtsFlags.ParFlags.ParStats.Global &&
1349 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1350 globalParStats.tot_fish_mess++;
1354 /* delayed fishes must have been sent by now! */
1355 next_fish_to_send_at = 0;
1358 *receivedFinish = processMessages();
1359 # endif /* SPARKS */
1362 /* NB: this function always returns rtsFalse, meaning the scheduler
1363 loop continues with the next iteration;
1365 return code means success in finding work; we enter this function
1366 if there is no local work, thus have to send a fish which takes
1367 time until it arrives with work; in the meantime we should process
1368 messages in the main loop;
1371 #endif // PARALLEL_HASKELL
1373 /* ----------------------------------------------------------------------------
1374 * PAR/GRAN: Report stats & debugging info(?)
1375 * ------------------------------------------------------------------------- */
// Reporting hook for PAR and GRAN builds. Expects a non-empty run queue
// (asserted), pops the next thread from it, prints thread and spark counts,
// and writes a GR_SCHEDULE event for spark-generated threads when full
// parallel statistics are on and either a different TSO is about to run or
// an emit was forced through emitSchedule (which is then cleared).
1377 #if defined(PAR) || defined(GRAN)
1379 scheduleGranParReport(void)
1381 ASSERT(run_queue_hd != END_TSO_QUEUE);
1383 /* Take a thread from the run queue, if we have work */
1384 POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
1386 /* If this TSO has got its outport closed in the meantime,
1387 * it mustn't be run. Instead, we have to clean it up as if it was finished.
1388 * It has to be marked as TH_DEAD for this purpose.
1389 * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1391 JB: TODO: investigate whether state change field could be nuked
1392 entirely and replaced by the normal tso state (whatnext
1393 field). All we want to do is to kill tsos from outside.
1396 /* ToDo: write something to the log-file
1397 if (RTSflags.ParFlags.granSimStats && !sameThread)
1398 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1402 /* the spark pool for the current PE */
1403 pool = &(cap.r.rSparks); // cap = (old) MainCap
1406 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1407 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1410 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1411 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
// NOTE(review): t->par.sparkname is read before the t && ... test two lines
// further down, so that null check cannot protect this dereference.
// Confirm that t can never be NULL at this point.
1413 if (RtsFlags.ParFlags.ParStats.Full &&
1414 (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1415 (emitSchedule || // forced emit
1416 (t && LastTSO && t->id != LastTSO->id))) {
1418 we are running a different TSO, so write a schedule event to log file
1419 NB: If we use fair scheduling we also have to write a deschedule
1420 event for LastTSO; with unfair scheduling we know that the
1421 previous tso has blocked whenever we switch to another tso, so
1422 we don't need it in GUM for now
1424 IF_PAR_DEBUG(fish, // schedule,
1425 debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1427 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1428 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1429 emitSchedule = rtsFalse;
1434 /* ----------------------------------------------------------------------------
1435 * After running a thread...
1436 * ------------------------------------------------------------------------- */
// Bookkeeping done right after a thread returns to the scheduler.
// Records TimeOfLastYield and, in GRAN / PAR / EDEN builds, bumps the
// heap-overflow, stack-overflow and yield counters in globalGranStats or
// globalParStats, emitting GR_DESCHEDULE events where shown. An unknown
// return code is fatal (barf).
1439 schedulePostRunThread(void)
1442 /* HACK 675: if the last thread didn't yield, make sure to print a
1443 SCHEDULE event to the log file when StgRunning the next thread, even
1444 if it is the same one as before */
1446 TimeOfLastYield = CURRENT_TIME;
1449 /* some statistics gathering in the parallel case */
1451 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1455 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1456 globalGranStats.tot_heapover++;
1458 globalParStats.tot_heapover++;
1465 DumpGranEvent(GR_DESCHEDULE, t));
1466 globalGranStats.tot_stackover++;
1469 // DumpGranEvent(GR_DESCHEDULE, t);
1470 globalParStats.tot_stackover++;
1474 case ThreadYielding:
1477 DumpGranEvent(GR_DESCHEDULE, t));
1478 globalGranStats.tot_yields++;
1481 // DumpGranEvent(GR_DESCHEDULE, t);
1482 globalParStats.tot_yields++;
1488 debugTrace(DEBUG_sched,
1489 "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1490 t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
1491 (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1492 if (t->block_info.closure!=(StgClosure*)NULL)
1493 print_bq(t->block_info.closure);
1496 // ??? needed; should emit block before
1498 DumpGranEvent(GR_DESCHEDULE, t));
1499 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1502 ASSERT(procStatus[CurrentProc]==Busy ||
1503 ((procStatus[CurrentProc]==Fetching) &&
1504 (t->block_info.closure!=(StgClosure*)NULL)));
// Mark the processor Idle when its run queue is empty, unless it is
// Fetching while asynchronous fetching is disabled.
1505 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1506 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1507 procStatus[CurrentProc]==Fetching))
1508 procStatus[CurrentProc] = Idle;
1511 //++PAR++ blockThread() writes the event (change?)
1515 case ThreadFinished:
1519 barf("parGlobalStats: unknown return code");
1525 /* -----------------------------------------------------------------------------
1526 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1527 * -------------------------------------------------------------------------- */
// Handles a thread that stopped because it ran out of heap.
//   cap - capability the thread was running on
//   t   - the thread that stopped
// When the thread asked for more than BLOCK_SIZE (cap->r.rHpAlloc) and the
// nursery is not nearly full, a block group of the needed size is allocated
// and linked into the nursery in front of cap->r.rCurrentNursery; the thread
// goes back on the run queue and rtsFalse is returned (no GC). Otherwise the
// thread is pushed on the run queue and the GC itself happens later in
// schedule().
1530 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1532 // did the task ask for a large block?
1533 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1534 // if so, get one and push it on the front of the nursery.
// Number of blocks needed to hold the rounded-up request.
1538 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1540 debugTrace(DEBUG_sched,
1541 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1542 (long)t->id, whatNext_strs[t->what_next], blocks);
1544 // don't do this if the nursery is (nearly) full, we'll GC first.
1545 if (cap->r.rCurrentNursery->link != NULL ||
1546 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1547 // if the nursery has only one block.
1550 bd = allocGroup( blocks );
1552 cap->r.rNursery->n_blocks += blocks;
1554 // link the new group into the list
1555 bd->link = cap->r.rCurrentNursery;
1556 bd->u.back = cap->r.rCurrentNursery->u.back;
1557 if (cap->r.rCurrentNursery->u.back != NULL) {
1558 cap->r.rCurrentNursery->u.back->link = bd;
1560 #if !defined(THREADED_RTS)
1561 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1562 g0s0 == cap->r.rNursery);
1564 cap->r.rNursery->blocks = bd;
1566 cap->r.rCurrentNursery->u.back = bd;
1568 // initialise it as a nursery block. We initialise the
1569 // step, gen_no, and flags field of *every* sub-block in
1570 // this large block, because this is easier than making
1571 // sure that we always find the block head of a large
1572 // block whenever we call Bdescr() (eg. evacuate() and
1573 // isAlive() in the GC would both have to do this, at
1577 for (x = bd; x < bd + blocks; x++) {
1578 x->step = cap->r.rNursery;
1584 // This assert can be a killer if the app is doing lots
1585 // of large block allocations.
1586 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1588 // now update the nursery to point to the new block
1589 cap->r.rCurrentNursery = bd;
1591 // we might be unlucky and have another thread get on the
1592 // run queue before us and steal the large block, but in that
1593 // case the thread will just end up requesting another large
1595 pushOnRunQueue(cap,t);
1596 return rtsFalse; /* not actually GC'ing */
1600 debugTrace(DEBUG_sched,
1601 "--<< thread %ld (%s) stopped: HeapOverflow\n",
1602 (long)t->id, whatNext_strs[t->what_next]);
1605 ASSERT(!is_on_queue(t,CurrentProc));
1606 #elif defined(PARALLEL_HASKELL)
1607 /* Currently we emit a DESCHEDULE event before GC in GUM.
1608 ToDo: either add separate event to distinguish SYSTEM time from rest
1609 or just nuke this DESCHEDULE (and the following SCHEDULE) */
// The leading 0 keeps this DESCHEDULE logging permanently switched off.
1610 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1611 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1612 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1613 emitSchedule = rtsTrue;
1617 pushOnRunQueue(cap,t);
1619 /* actual GC is done at the end of the while loop in schedule() */
1622 /* -----------------------------------------------------------------------------
1623 * Handle a thread that returned to the scheduler with ThreadStackOverflow
1624 * -------------------------------------------------------------------------- */
// Handles a thread that stopped because its stack overflowed.
//   cap  - capability the thread was running on
//   task - the task running the scheduler; its tso field is compared with t
//   t    - the thread that stopped
// The stack is enlarged through threadStackOverflow, which hands back a TSO
// pointer (new_t) that may differ from t; new_t is what gets pushed back on
// the run queue.
1627 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1629 debugTrace (DEBUG_sched,
1630 "--<< thread %ld (%s) stopped, StackOverflow",
1631 (long)t->id, whatNext_strs[t->what_next]);
1633 /* just adjust the stack for this thread, then pop it back
1637 /* enlarge the stack */
1638 StgTSO *new_t = threadStackOverflow(cap, t);
1640 /* The TSO attached to this Task may have moved, so update the
1643 if (task->tso == t) {
1646 pushOnRunQueue(cap,new_t);
1650 /* -----------------------------------------------------------------------------
1651 * Handle a thread that returned to the scheduler with ThreadYielding
1652 * -------------------------------------------------------------------------- */
// Handles a thread that returned to the scheduler by yielding.
//   cap            - capability the thread was running on
//   t              - the thread that yielded
//   prev_what_next - value of t->what_next before the thread ran; a change
//                    means the thread only stopped to switch evaluators
// The thread is put back on the run queue with addToRunQueue; GRAN builds
// additionally post a new event for it.
1655 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1657 // Reset the context switch flag. We don't do this just before
1658 // running the thread, because that would mean we would lose ticks
1659 // during GC, which can lead to unfair scheduling (a thread hogs
1660 // the CPU because the tick always arrives during GC). This way
1661 // penalises threads that do a lot of allocation, but that seems
1662 // better than the alternative.
1665 /* put the thread back on the run queue. Then, if we're ready to
1666 * GC, check whether this is the last task to stop. If so, wake
1667 * up the GC thread. getThread will block during a GC until the
1671 if (t->what_next != prev_what_next) {
1672 debugTrace(DEBUG_sched,
1673 "--<< thread %ld (%s) stopped to switch evaluators",
1674 (long)t->id, whatNext_strs[t->what_next]);
1676 debugTrace(DEBUG_sched,
1677 "--<< thread %ld (%s) stopped, yielding",
1678 (long)t->id, whatNext_strs[t->what_next]);
1683 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1685 ASSERT(t->link == END_TSO_QUEUE);
1687 // Shortcut if we're just switching evaluators: don't bother
1688 // doing stack squeezing (which can be expensive), just run the
1690 if (t->what_next != prev_what_next) {
1695 ASSERT(!is_on_queue(t,CurrentProc));
1698 //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1699 checkThreadQsSanity(rtsTrue));
1703 addToRunQueue(cap,t);
1706 /* add a ContinueThread event to actually process the thread */
1707 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1709 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1711 debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1718 /* -----------------------------------------------------------------------------
1719 * Handle a thread that returned to the scheduler with ThreadBlocked
1720 * -------------------------------------------------------------------------- */
// Handles a thread that returned to the scheduler because it blocked.
//   t - the blocked thread
// GRAN and PAR builds log the event and update processor status or
// emitSchedule; in the standard RTS nothing has to be done beyond tracing,
// because the thread has already placed itself on the queue it needs to be
// on (see the comment in the body).
1723 scheduleHandleThreadBlocked( StgTSO *t
1724 #if !defined(GRAN) && !defined(DEBUG)
1731 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
1732 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1733 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1735 // ??? needed; should emit block before
1737 DumpGranEvent(GR_DESCHEDULE, t));
1738 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1741 ASSERT(procStatus[CurrentProc]==Busy ||
1742 ((procStatus[CurrentProc]==Fetching) &&
1743 (t->block_info.closure!=(StgClosure*)NULL)));
// Mark the processor Idle when its run queue is empty, unless it is
// Fetching while asynchronous fetching is disabled.
1744 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1745 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1746 procStatus[CurrentProc]==Fetching))
1747 procStatus[CurrentProc] = Idle;
1751 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
1752 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1755 if (t->block_info.closure!=(StgClosure*)NULL)
1756 print_bq(t->block_info.closure));
1758 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1761 /* whatever we schedule next, we must log that schedule */
1762 emitSchedule = rtsTrue;
1766 // We don't need to do anything. The thread is blocked, and it
1767 // has tidied up its stack and placed itself on whatever queue
1768 // it needs to be on.
1770 #if !defined(THREADED_RTS)
1771 ASSERT(t->why_blocked != NotBlocked);
1772 // This might not be true under THREADED_RTS: we don't have
1773 // exclusive access to this TSO, so someone might have
1774 // woken it up by now. This actually happens: try
1775 // conc023 +RTS -N2.
1779 if (traceClass(DEBUG_sched)) {
1780 debugTraceBegin("--<< thread %lu (%s) stopped: ",
1781 (unsigned long)t->id, whatNext_strs[t->what_next]);
1782 printThreadBlockage(t);
1787 /* Only for dumping event to log file
1788 ToDo: do I need this in GranSim, too?
1794 /* -----------------------------------------------------------------------------
1795 * Handle a thread that returned to the scheduler with ThreadFinished
1796 * -------------------------------------------------------------------------- */
// Handles a thread that returned to the scheduler because it finished.
//   cap  - capability the thread ran on (marked STG_UNUSED)
//   task - the task running the scheduler
//   t    - the finished thread
// For a thread bound to this task, the result (or NULL when the thread did
// not complete normally) is stored through task->ret, task->stat is set to
// Success, Interrupted or Killed, the thread label is removed and rtsTrue
// is returned so that schedule() returns. A bound thread belonging to a
// different task is appended to the run queue in the non-threaded RTS and
// is a fatal error in the threaded RTS.
1799 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1801 /* Need to check whether this was a main thread, and if so,
1802 * return with the return value.
1804 * We also end up here if the thread kills itself with an
1805 * uncaught exception, see Exception.cmm.
1807 debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
1808 (unsigned long)t->id, whatNext_strs[t->what_next]);
1811 endThread(t, CurrentProc); // clean-up the thread
1812 #elif defined(PARALLEL_HASKELL)
1813 /* For now all are advisory -- HWL */
1814 //if(t->priority==AdvisoryPriority) ??
1815 advisory_thread_count--; // JB: Caution with this counter, buggy!
1818 if(t->dist.priority==RevalPriority)
1822 # if defined(EDENOLD)
1823 // the thread could still have an outport... (BUG)
1824 if (t->eden.outport != -1) {
1825 // delete the outport for the tso which has finished...
1826 IF_PAR_DEBUG(eden_ports,
1827 debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1828 t->eden.outport, t->id));
1831 // thread still in the process (HEAVY BUG! since outport has just been closed...)
1832 if (t->eden.epid != -1) {
1833 IF_PAR_DEBUG(eden_ports,
1834 debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1835 t->id, t->eden.epid));
1836 removeTSOfromProcess(t);
1841 if (RtsFlags.ParFlags.ParStats.Full &&
1842 !RtsFlags.ParFlags.ParStats.Suppressed)
1843 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1845 // t->par only contains statistics: left out for now...
1847 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1848 t->id,t,t->par.sparkname));
1850 #endif // PARALLEL_HASKELL
1853 // Check whether the thread that just completed was a bound
1854 // thread, and if so return with the result.
1856 // There is an assumption here that all thread completion goes
1857 // through this point; we need to make sure that if a thread
1858 // ends up in the ThreadKilled state, that it stays on the run
1859 // queue so it can be dealt with here.
1864 if (t->bound != task) {
1865 #if !defined(THREADED_RTS)
1866 // Must be a bound thread that is not the topmost one. Leave
1867 // it on the run queue until the stack has unwound to the
1868 // point where we can deal with this. Leaving it on the run
1869 // queue also ensures that the garbage collector knows about
1870 // this thread and its return value (it gets dropped from the
1871 // all_threads list so there's no other way to find it).
1872 appendToRunQueue(cap,t);
1875 // this cannot happen in the threaded RTS, because a
1876 // bound thread can only be run by the appropriate Task.
1877 barf("finished bound thread that isn't mine");
1881 ASSERT(task->tso == t);
1883 if (t->what_next == ThreadComplete) {
1885 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1886 *(task->ret) = (StgClosure *)task->tso->sp[1];
1888 task->stat = Success;
1891 *(task->ret) = NULL;
// Distinguish a scheduler-wide interrupt from an individually killed thread.
1893 if (sched_state >= SCHED_INTERRUPTING) {
1894 task->stat = Interrupted;
1896 task->stat = Killed;
1900 removeThreadLabel((StgWord)task->tso->id);
1902 return rtsTrue; // tells schedule() to return
1908 /* -----------------------------------------------------------------------------
1909 * Perform a heap census, if PROFILING
1910 * -------------------------------------------------------------------------- */
// Performs a heap census when profiling asks for one.
//   ready_to_gc - whether the scheduler is about to garbage collect anyway
//                 (unused outside PROFILING builds)
// In PROFILING builds a census is taken when performHeapProfile is set, or
// when the profile interval is 0 with heap profiling on and ready_to_gc
// true. Black holes are checked and a GC is run first; performHeapProfile
// is then cleared and rtsTrue is returned to signal that a GC already
// happened.
1913 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1915 #if defined(PROFILING)
1916 // When we have +RTS -i0 and we're heap profiling, do a census at
1917 // every GC. This lets us get repeatable runs for debugging.
1918 if (performHeapProfile ||
1919 (RtsFlags.ProfFlags.profileInterval==0 &&
1920 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1922 // checking black holes is necessary before GC, otherwise
1923 // there may be threads that are unreachable except by the
1924 // blackhole queue, which the GC will consider to be
1926 scheduleCheckBlackHoles(&MainCapability);
1928 debugTrace(DEBUG_sched, "garbage collecting before heap census");
1929 GarbageCollect(rtsTrue);
1931 debugTrace(DEBUG_sched, "performing heap census");
1934 performHeapProfile = rtsFalse;
1935 return rtsTrue; // true <=> we already GC'd
1941 /* -----------------------------------------------------------------------------
1942 * Perform a garbage collection if necessary
1943 * -------------------------------------------------------------------------- */
// Performs a garbage collection if necessary.
//   cap         - capability held by the caller (tested for NULL below)
//   task        - the calling task (only used in threaded builds)
//   force_major - passed straight through to GarbageCollect
// A cas on waiting_for_gc picks a single collecting task; a task that loses
// yields its capability until the flag clears and then returns cap. The
// winner acquires every other capability, kicks invalid transactions back
// to their atomically frames, checks black holes, deletes all threads when
// the scheduler is interrupting, runs GarbageCollect and finally releases
// the capabilities it gathered.
1946 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1950 static volatile StgWord waiting_for_gc;
1951 rtsBool was_waiting;
1956 // In order to GC, there must be no threads running Haskell code.
1957 // Therefore, the GC thread needs to hold *all* the capabilities,
1958 // and release them after the GC has completed.
1960 // This seems to be the simplest way: previous attempts involved
1961 // making all the threads with capabilities give up their
1962 // capabilities and sleep except for the *last* one, which
1963 // actually did the GC. But it's quite hard to arrange for all
1964 // the other tasks to sleep and stay asleep.
1967 was_waiting = cas(&waiting_for_gc, 0, 1);
1970 debugTrace(DEBUG_sched, "someone else is trying to GC...");
1971 if (cap) yieldCapability(&cap,task);
1972 } while (waiting_for_gc);
1973 return cap; // NOTE: task->cap might have changed here
// Collect every capability other than the one we already hold.
1976 for (i=0; i < n_capabilities; i++) {
1977 debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1978 if (cap != &capabilities[i]) {
1979 Capability *pcap = &capabilities[i];
1980 // we better hope this task doesn't get migrated to
1981 // another Capability while we're waiting for this one.
1982 // It won't, because load balancing happens while we have
1983 // all the Capabilities, but even so it's a slightly
1984 // unsavoury invariant.
1987 waitForReturnCapability(&pcap, task);
1988 if (pcap != &capabilities[i]) {
1989 barf("scheduleDoGC: got the wrong capability");
1994 waiting_for_gc = rtsFalse;
1997 /* Kick any transactions which are invalid back to their
1998 * atomically frames. When next scheduled they will try to
1999 * commit, this commit will fail and they will retry.
2004 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2005 if (t->what_next == ThreadRelocated) {
2008 next = t->global_link;
2010 // This is a good place to check for blocked
2011 // exceptions. It might be the case that a thread is
2012 // blocked on delivering an exception to a thread that
2013 // is also blocked - we try to ensure that this
2014 // doesn't happen in throwTo(), but it's too hard (or
2015 // impossible) to close all the race holes, so we
2016 // accept that some might get through and deal with
2017 // them here. A GC will always happen at some point,
2018 // even if the system is otherwise deadlocked.
2019 maybePerformBlockedException (&capabilities[0], t);
2021 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2022 if (!stmValidateNestOfTransactions (t -> trec)) {
2023 debugTrace(DEBUG_sched | DEBUG_stm,
2024 "trec %p found wasting its time", t);
2026 // strip the stack back to the
2027 // ATOMICALLY_FRAME, aborting the (nested)
2028 // transaction, and saving the stack of any
2029 // partially-evaluated thunks on the heap.
2030 throwToSingleThreaded_(&capabilities[0], t,
2031 NULL, rtsTrue, NULL);
2034 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2042 // so this happens periodically:
2043 if (cap) scheduleCheckBlackHoles(cap);
2045 IF_DEBUG(scheduler, printAllThreads());
2048 * We now have all the capabilities; if we're in an interrupting
2049 * state, then we should take the opportunity to delete all the
2050 * threads in the system.
2052 if (sched_state >= SCHED_INTERRUPTING) {
2053 deleteAllThreads(&capabilities[0]);
2054 sched_state = SCHED_SHUTTING_DOWN;
2057 /* everybody back, start the GC.
2058 * Could do it in this thread, or signal a condition var
2059 * to do it in another thread. Either way, we need to
2060 * broadcast on gc_pending_cond afterward.
2062 #if defined(THREADED_RTS)
2063 debugTrace(DEBUG_sched, "doing GC");
2065 GarbageCollect(force_major);
2067 #if defined(THREADED_RTS)
2068 // release our stash of capabilities.
2069 for (i = 0; i < n_capabilities; i++) {
2070 if (cap != &capabilities[i]) {
2071 task->cap = &capabilities[i];
2072 releaseCapability(&capabilities[i]);
2083 /* add a ContinueThread event to continue execution of current thread */
2084 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2086 t, (StgClosure*)NULL, (rtsSpark*)NULL);
2088 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2096 /* ---------------------------------------------------------------------------
2097 * Singleton fork(). Do not copy any running threads.
2098 * ------------------------------------------------------------------------- */
// Forks the process without carrying running Haskell threads into the child.
//   entry - stable pointer to the action the child runs (rts_evalStableIO)
// The parent just returns the pid. The child deletes every thread on
// all_threads with deleteThread_, empties the run queue, the
// suspended_ccalling_tasks list and all_threads, wipes every task except the
// current one, runs entry, then calls hs_exit and stg_exit(EXIT_SUCCESS).
// In threaded builds, forking with more than one node configured is a fatal
// error. Without FORKPROCESS_PRIMOP_SUPPORTED the function barfs.
2101 forkProcess(HsStablePtr *entry
2102 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2107 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2113 #if defined(THREADED_RTS)
2114 if (RtsFlags.ParFlags.nNodes > 1) {
2115 errorBelch("forking not supported with +RTS -N<n> greater than 1");
2116 stg_exit(EXIT_FAILURE);
2120 debugTrace(DEBUG_sched, "forking!");
2122 // ToDo: for SMP, we should probably acquire *all* the capabilities
2127 if (pid) { // parent
2129 // just return the pid
2135 // Now, all OS threads except the thread that forked are
2136 // stopped. We need to stop all Haskell threads, including
2137 // those involved in foreign calls. Also we need to delete
2138 // all Tasks, because they correspond to OS threads that are
2141 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2142 if (t->what_next == ThreadRelocated) {
2145 next = t->global_link;
2146 // don't allow threads to catch the ThreadKilled
2147 // exception, but we do want to raiseAsync() because these
2148 // threads may be evaluating thunks that we need later.
2149 deleteThread_(cap,t);
2153 // Empty the run queue. It seems tempting to let all the
2154 // killed threads stay on the run queue as zombies to be
2155 // cleaned up later, but some of them correspond to bound
2156 // threads for which the corresponding Task does not exist.
2157 cap->run_queue_hd = END_TSO_QUEUE;
2158 cap->run_queue_tl = END_TSO_QUEUE;
2160 // Any suspended C-calling Tasks are no more, their OS threads
2162 cap->suspended_ccalling_tasks = NULL;
2164 // Empty the all_threads list. Otherwise, the garbage
2165 // collector may attempt to resurrect some of these threads.
2166 all_threads = END_TSO_QUEUE;
2168 // Wipe the task list, except the current Task.
2169 ACQUIRE_LOCK(&sched_mutex);
2170 for (task = all_tasks; task != NULL; task=task->all_link) {
2171 if (task != cap->running_task) {
2175 RELEASE_LOCK(&sched_mutex);
2177 #if defined(THREADED_RTS)
2178 // Wipe our spare workers list, they no longer exist. New
2179 // workers will be created if necessary.
2180 cap->spare_workers = NULL;
2181 cap->returning_tasks_hd = NULL;
2182 cap->returning_tasks_tl = NULL;
2185 cap = rts_evalStableIO(cap, entry, NULL); // run the action
2186 rts_checkSchedStatus("forkProcess",cap);
2189 hs_exit(); // clean up and exit
2190 stg_exit(EXIT_SUCCESS);
2192 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2193 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2198 /* ---------------------------------------------------------------------------
2199 * Delete all the threads in the system
2200 * ------------------------------------------------------------------------- */
// Calls deleteThread on every TSO reachable from all_threads.
//   cap - capability passed on to deleteThread
// Only safe when the caller owns all capabilities (see the note in the
// body). The killed threads are deliberately left on the run queue. In the
// non-threaded RTS the blocked and sleeping queues are asserted to be empty
// afterwards.
2203 deleteAllThreads ( Capability *cap )
2205 // NOTE: only safe to call if we own all capabilities.
2208 debugTrace(DEBUG_sched,"deleting all threads");
2209 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2210 if (t->what_next == ThreadRelocated) {
2213 next = t->global_link;
2214 deleteThread(cap,t);
2218 // The run queue now contains a bunch of ThreadKilled threads. We
2219 // must not throw these away: the main thread(s) will be in there
2220 // somewhere, and the main scheduler loop has to deal with it.
2221 // Also, the run queue is the only thing keeping these threads from
2222 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2224 #if !defined(THREADED_RTS)
2225 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2226 ASSERT(sleeping_queue == END_TSO_QUEUE);
2230 /* -----------------------------------------------------------------------------
2231 Managing the suspended_ccalling_tasks list.
2232 Locks required: sched_mutex
2233 -------------------------------------------------------------------------- */
// Pushes task onto the front of cap->suspended_ccalling_tasks.
//   cap  - capability owning the list
//   task - task to link in; it must not be linked already (asserted)
2236 suspendTask (Capability *cap, Task *task)
2238 ASSERT(task->next == NULL && task->prev == NULL);
2239 task->next = cap->suspended_ccalling_tasks;
2241 if (cap->suspended_ccalling_tasks) {
2242 cap->suspended_ccalling_tasks->prev = task;
2244 cap->suspended_ccalling_tasks = task;
// Unlinks task from cap->suspended_ccalling_tasks.
//   cap  - capability owning the list
//   task - task to remove
// Neighbouring links are patched around the task, the list head is advanced
// when the task is the head (asserted), and the task's own next and prev
// pointers are cleared.
2248 recoverSuspendedTask (Capability *cap, Task *task)
2251 task->prev->next = task->next;
2253 ASSERT(cap->suspended_ccalling_tasks == task);
2254 cap->suspended_ccalling_tasks = task->next;
2257 task->next->prev = task->prev;
2259 task->next = task->prev = NULL;
2262 /* ---------------------------------------------------------------------------
2263 * Suspending & resuming Haskell threads.
2265 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2266 * its capability before calling the C function. This allows another
2267 * task to pick up the capability and carry on running Haskell
2268 * threads. It also means that if the C call blocks, it won't lock
2271 * The Haskell thread making the C call is put to sleep for the
2272 * duration of the call, and its Task is linked onto the Capability's
2273 * suspended_ccalling_tasks list. We give out a token to the task,
2274 * which it can use to resume the thread on return from the C function.
2275 * ------------------------------------------------------------------------- */
// Called when the current Haskell thread makes a safe foreign call.
// Marks the current TSO as blocked on the call, records it in the running
// Task (task->suspended_tso), puts the Task on the suspended list and
// releases the Capability.
// errno is saved on entry and restored just before returning, so the RTS
// work done here does not disturb the value seen by the caller.
//   reg: register table of the Capability currently running the thread.
2278 suspendThread (StgRegTable *reg)
2281 int saved_errno = errno;
2285 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2287 cap = regTableToCapability(reg);
2289 task = cap->running_task;
2290 tso = cap->r.rCurrentTSO;
2292 debugTrace(DEBUG_sched,
2293 "thread %lu did a safe foreign call",
2294 (unsigned long)cap->r.rCurrentTSO->id);
2296 // XXX this might not be necessary --SDM
2297 tso->what_next = ThreadRunGHC;
2299 threadPaused(cap,tso);
// If TSO_BLOCKEX was clear: set it, clear TSO_INTERRUPTIBLE and record
// BlockedOnCCall, the state that resumeThread() checks before clearing
// both flags again.  Otherwise the thread is recorded as
// BlockedOnCCall_NoUnblockExc.
2301 if ((tso->flags & TSO_BLOCKEX) == 0) {
2302 tso->why_blocked = BlockedOnCCall;
2303 tso->flags |= TSO_BLOCKEX;
2304 tso->flags &= ~TSO_INTERRUPTIBLE;
2306 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2309 // Hand back capability
2310 task->suspended_tso = tso;
// The suspended-task list, the in_haskell flag and the release of the
// Capability are all done while holding cap->lock.
2312 ACQUIRE_LOCK(&cap->lock);
2314 suspendTask(cap,task);
2315 cap->in_haskell = rtsFalse;
2316 releaseCapability_(cap);
2318 RELEASE_LOCK(&cap->lock);
2320 #if defined(THREADED_RTS)
2321 /* Preparing to leave the RTS, so ensure there's a native thread/task
2322 waiting to take over.
2324 debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2327 errno = saved_errno;
// Counterpart of suspendThread(): called on return from a safe foreign
// call.  Waits for a Capability, removes the Task from that Capability's
// suspended list, and reinstates the suspended TSO as the Capability's
// current thread.  errno is saved on entry and restored afterwards.
//   task_: the token handed out when the thread was suspended
//          (NOTE(review): presumably the Task pointer -- confirm).
2332 resumeThread (void *task_)
2336 int saved_errno = errno;
2340 // Wait for permission to re-enter the RTS with the result.
2341 waitForReturnCapability(&cap,task);
2342 // we might be on a different capability now... but if so, our
2343 // entry on the suspended_ccalling_tasks list will also have been
2346 // Remove the thread from the suspended list
2347 recoverSuspendedTask(cap,task);
// Take the TSO back out of the Task and detach it from any queue link.
2349 tso = task->suspended_tso;
2350 task->suspended_tso = NULL;
2351 tso->link = END_TSO_QUEUE;
2352 debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
// BlockedOnCCall means suspendThread() set TSO_BLOCKEX itself, so undo
// that here; BlockedOnCCall_NoUnblockExc leaves the flags untouched.
2354 if (tso->why_blocked == BlockedOnCCall) {
2355 awakenBlockedExceptionQueue(cap,tso);
2356 tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2359 /* Reset blocking status */
2360 tso->why_blocked = NotBlocked;
2362 cap->r.rCurrentTSO = tso;
2363 cap->in_haskell = rtsTrue;
2364 errno = saved_errno;
2366 /* We might have GC'd, mark the TSO dirty again */
2369 IF_DEBUG(sanity, checkTSO(tso));
2374 /* ---------------------------------------------------------------------------
2377 * scheduleThread puts a thread on the end of the runnable queue.
2378 * This will usually be done immediately after a thread is created.
2379 * The caller of scheduleThread must create the thread using e.g.
2380 * createThread and push an appropriate closure
2381 * on this thread's stack before the scheduler is invoked.
2382 * ------------------------------------------------------------------------ */
// Make `tso` runnable on `cap` by appending it to that Capability's
// run queue.
2385 scheduleThread(Capability *cap, StgTSO *tso)
2387 // The thread goes at the *end* of the run-queue, to avoid possible
2388 // starvation of any threads already on the queue.
2389 appendToRunQueue(cap,tso);
// Schedule `tso` on a specific CPU.
//   cap: the Capability owned by the caller.
//   cpu: requested CPU number; only used in THREADED_RTS builds, where it
//        is reduced modulo RtsFlags.ParFlags.nNodes.
//   tso: the thread to schedule.
// With THREADED_RTS the thread is flagged TSO_LOCKED and then either
// appended to the caller's own run queue (when the reduced cpu equals
// cap->no) or handed to capabilities[cpu] via
// migrateThreadToCapability_lock().
2393 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2395 #if defined(THREADED_RTS)
2396 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2397 // move this thread from now on.
2398 cpu %= RtsFlags.ParFlags.nNodes;
2399 if (cpu == cap->no) {
2400 appendToRunQueue(cap,tso);
2402 migrateThreadToCapability_lock(&capabilities[cpu],tso);
2405 appendToRunQueue(cap,tso);
// Run `tso` as a bound thread of the Task currently running on `cap`:
// reset task->stat to NoStatus, append the TSO to the run queue and call
// schedule().  When schedule() returns, task->stat is asserted to have
// been set to something other than NoStatus.
//   ret: marked [out] in the signature.
//        NOTE(review): confirm where the result pointer is recorded.
2410 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2414 // We already created/initialised the Task
2415 task = cap->running_task;
2417 // This TSO is now a bound thread; make the Task and TSO
2418 // point to each other.
2424 task->stat = NoStatus;
2426 appendToRunQueue(cap,tso);
2428 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2431 /* GranSim specific init */
// NOTE(review): `m` is not declared anywhere nearby; the rest of this
// function uses `task`.  Confirm this GranSim-only code still compiles.
2432 CurrentTSO = m->tso; // the TSO to run
2433 procStatus[MainProc] = Busy; // status of main PE
2434 CurrentProc = MainProc; // PE to run it on
// schedule() may hand back a different Capability, hence the reassignment.
2437 cap = schedule(cap,task);
2439 ASSERT(task->stat != NoStatus);
2440 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2442 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2446 /* ----------------------------------------------------------------------------
2448 * ------------------------------------------------------------------------- */
2450 #if defined(THREADED_RTS)
// Entry point of a worker Task (it is the function passed to
// startWorkerTask() by the scheduler initialisation code).
// Takes and immediately releases task->lock, runs the scheduler loop, and
// on exit releases the Capability returned by schedule() and stops the
// worker Task.
2452 workerStart(Task *task)
2456 // See startWorkerTask().
2457 ACQUIRE_LOCK(&task->lock);
2459 RELEASE_LOCK(&task->lock);
2461 // set the thread-local pointer to the Task:
2464 // schedule() runs without a lock.
2465 cap = schedule(cap,task);
2467 // On exit from schedule(), we have a Capability.
2468 releaseCapability(cap);
2469 workerTaskStop(task);
2473 /* ---------------------------------------------------------------------------
2476 * Initialise the scheduler. This resets all the queues - if the
2477 * queues contained any threads, they'll be garbage collected at the
2480 * ------------------------------------------------------------------------ */
2487 for (i=0; i<=MAX_PROC; i++) {
2488 run_queue_hds[i] = END_TSO_QUEUE;
2489 run_queue_tls[i] = END_TSO_QUEUE;
2490 blocked_queue_hds[i] = END_TSO_QUEUE;
2491 blocked_queue_tls[i] = END_TSO_QUEUE;
2492 ccalling_threadss[i] = END_TSO_QUEUE;
2493 blackhole_queue[i] = END_TSO_QUEUE;
2494 sleeping_queue = END_TSO_QUEUE;
2496 #elif !defined(THREADED_RTS)
2497 blocked_queue_hd = END_TSO_QUEUE;
2498 blocked_queue_tl = END_TSO_QUEUE;
2499 sleeping_queue = END_TSO_QUEUE;
2502 blackhole_queue = END_TSO_QUEUE;
2503 all_threads = END_TSO_QUEUE;
2506 sched_state = SCHED_RUNNING;
2508 #if defined(THREADED_RTS)
2509 /* Initialise the mutex and condition variables used by
2511 initMutex(&sched_mutex);
2514 ACQUIRE_LOCK(&sched_mutex);
2516 /* A capability holds the state a native thread needs in
2517 * order to execute STG code. At least one capability is
2518 * floating around (only THREADED_RTS builds have more than one).
2524 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2528 #if defined(THREADED_RTS)
2530 * Eagerly start one worker to run each Capability, except for
2531 * Capability 0. The idea is that we're probably going to start a
2532 * bound thread on Capability 0 pretty soon, so we don't want a
2533 * worker task hogging it.
2538 for (i = 1; i < n_capabilities; i++) {
2539 cap = &capabilities[i];
2540 ACQUIRE_LOCK(&cap->lock);
2541 startWorkerTask(cap, workerStart);
2542 RELEASE_LOCK(&cap->lock);
2547 trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2549 RELEASE_LOCK(&sched_mutex);
// Shut the scheduler down.
// If sched_state has not yet reached SCHED_SHUTTING_DOWN, it is moved to
// SCHED_INTERRUPTING and scheduleDoGC() is called; afterwards the state is
// set to SCHED_SHUTTING_DOWN.  In THREADED_RTS builds a bound Task is
// obtained under sched_mutex, every Capability is shut down with it, and
// the Task is released again with boundTaskExiting().
2553 exitScheduler( void )
2557 #if defined(THREADED_RTS)
2558 ACQUIRE_LOCK(&sched_mutex);
2559 task = newBoundTask();
2560 RELEASE_LOCK(&sched_mutex);
2563 // If we haven't killed all the threads yet, do it now.
2564 if (sched_state < SCHED_SHUTTING_DOWN) {
2565 sched_state = SCHED_INTERRUPTING;
2566 scheduleDoGC(NULL,task,rtsFalse);
2568 sched_state = SCHED_SHUTTING_DOWN;
2570 #if defined(THREADED_RTS)
2574 for (i = 0; i < n_capabilities; i++) {
2575 shutdownCapability(&capabilities[i], task);
2577 boundTaskExiting(task);
2580 closeMutex(&sched_mutex);
2584 /* ---------------------------------------------------------------------------
2585 Where are the roots that we know about?
2587 - all the threads on the runnable queue
2588 - all the threads on the blocked queue
2589 - all the threads on the sleeping queue
2590 - all the threads currently executing a _ccall_GC
2591 - all the "main threads"
2593 ------------------------------------------------------------------------ */
2595 /* This has to be protected either by the scheduler monitor, or by the
2596 garbage collection monitor (probably the latter).
// Report the scheduler's GC roots by calling `evac` on the address of each
// root pointer: the run queues, the wakeup queues (THREADED_RTS), the TSOs
// of Tasks suspended in foreign calls, the global blocked and sleeping
// queues (non-threaded build), the spark queue and the signal handlers.
//   evac: callback invoked with the address of every root.
2601 GetRoots( evac_fn evac )
// GranSim: per-processor queues.  An entry is evacuated only when it is
// neither END_TSO_QUEUE nor NULL.
// NOTE(review): the bound is inclusive; confirm RtsFlags.GranFlags.proc is
// always below MAX_PROC, the declared size of these arrays.
2608 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2609 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2610 evac((StgClosure **)&run_queue_hds[i]);
2611 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2612 evac((StgClosure **)&run_queue_tls[i]);
2614 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2615 evac((StgClosure **)&blocked_queue_hds[i]);
2616 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2617 evac((StgClosure **)&blocked_queue_tls[i]);
// Evacuate the same slot that the guard tests: ccalling_threadss[i].
2618 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2619 evac((StgClosure **)&ccalling_threadss[i]);
// Per-Capability roots.
2626 for (i = 0; i < n_capabilities; i++) {
2627 cap = &capabilities[i];
2628 evac((StgClosure **)(void *)&cap->run_queue_hd);
2629 evac((StgClosure **)(void *)&cap->run_queue_tl);
2630 #if defined(THREADED_RTS)
2631 evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2632 evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
// Threads suspended in a foreign call are reachable only through
// task->suspended_tso on this list.
2634 for (task = cap->suspended_ccalling_tasks; task != NULL;
2636 debugTrace(DEBUG_sched,
2637 "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
2638 evac((StgClosure **)(void *)&task->suspended_tso);
2644 #if !defined(THREADED_RTS)
2645 evac((StgClosure **)(void *)&blocked_queue_hd);
2646 evac((StgClosure **)(void *)&blocked_queue_tl);
2647 evac((StgClosure **)(void *)&sleeping_queue);
2651 // evac((StgClosure **)&blackhole_queue);
2653 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2654 markSparkQueue(evac);
2657 #if defined(RTS_USER_SIGNALS)
2658 // mark the signal handlers (signals should be already blocked)
2659 markSignalHandlers(evac);
2663 /* -----------------------------------------------------------------------------
2666 This is the interface to the garbage collector from Haskell land.
2667 We provide this so that external C code can allocate and garbage
2668 collect when called from Haskell via _ccall_GC.
2669 -------------------------------------------------------------------------- */
// Common worker for the garbage-collection entry points: obtains a fresh
// bound Task under sched_mutex, runs scheduleDoGC() with it and releases
// the Task again.
//   force_major: forwarded unchanged to scheduleDoGC().
2672 performGC_(rtsBool force_major)
2675 // We must grab a new Task here, because the existing Task may be
2676 // associated with a particular Capability, and chained onto the
2677 // suspended_ccalling_tasks queue.
2678 ACQUIRE_LOCK(&sched_mutex);
2679 task = newBoundTask();
2680 RELEASE_LOCK(&sched_mutex);
2681 scheduleDoGC(NULL,task,force_major);
2682 boundTaskExiting(task);
2688 performGC_(rtsFalse);
// Run a garbage collection with force_major set (rtsTrue is passed to
// performGC_()).
2692 performMajorGC(void)
2694 performGC_(rtsTrue);
2697 /* -----------------------------------------------------------------------------
2700 If the thread has reached its maximum stack size, then raise the
2701 StackOverflow exception in the offending thread. Otherwise
2702 relocate the TSO into a larger chunk of memory and adjust its stack
2704 -------------------------------------------------------------------------- */
// Handle a stack overflow in `tso`.
// If the stack has already reached tso->max_stack_size, the thread is sent
// the StackOverflow exception.  Otherwise a larger TSO is allocated (stack
// size doubled, capped at the maximum, then rounded up to whole blocks),
// the TSO header and the live part of the stack are copied into it, and
// the old TSO is marked ThreadRelocated.
//   cap: Capability used when raising the exception.
//   tso: the thread whose stack overflowed.
2707 threadStackOverflow(Capability *cap, StgTSO *tso)
2709 nat new_stack_size, stack_words;
2714 IF_DEBUG(sanity,checkTSO(tso));
2716 // don't allow throwTo() to modify the blocked_exceptions queue
2717 // while we are moving the TSO:
2718 lockClosure((StgClosure *)tso);
2720 if (tso->stack_size >= tso->max_stack_size) {
2722 debugTrace(DEBUG_gc,
2723 "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2724 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2726 /* If we're debugging, just print out the top of the stack */
2727 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2730 // Send this thread the StackOverflow exception
2732 throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2736 /* Try to double the current stack size. If that takes us over the
2737 * maximum stack size for this thread, then use the maximum instead.
2738 * Finally round up so the TSO ends up as a whole number of blocks.
2740 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2741 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2742 TSO_STRUCT_SIZE)/sizeof(W_);
2743 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2744 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
// Both sizes are cast to long and printed with %ld so that each conversion
// specifier matches the type of its argument (new_stack_size is a nat).
2746 debugTrace(DEBUG_sched,
2747 "increasing stack size from %ld words to %ld.",
2748 (long)tso->stack_size, (long)new_stack_size);
2750 dest = (StgTSO *)allocate(new_tso_size);
2751 TICK_ALLOC_TSO(new_stack_size,0);
2753 /* copy the TSO block and the old stack into the new area */
2754 memcpy(dest,tso,TSO_STRUCT_SIZE);
// Only the live part of the stack (from sp to the end) is copied, and it
// is placed at the very end of the new, larger object.
2755 stack_words = tso->stack + tso->stack_size - tso->sp;
2756 new_sp = (P_)dest + new_tso_size - stack_words;
2757 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2759 /* relocate the stack pointers... */
2761 dest->stack_size = new_stack_size;
2763 /* Mark the old TSO as relocated. We have to check for relocated
2764 * TSOs in the garbage collector and any primops that deal with TSOs.
2766 * It's important to set the sp value to just beyond the end
2767 * of the stack, so we don't attempt to scavenge any part of the
2770 tso->what_next = ThreadRelocated;
2772 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2773 tso->why_blocked = NotBlocked;
// Same convention as the trace at the top of the function: id and size are
// cast to long and printed with %ld.
2775 IF_PAR_DEBUG(verbose,
2776 debugBelch("@@ threadStackOverflow of TSO %ld (now at %p): stack size increased to %ld\n",
2777 (long)tso->id, tso, (long)tso->stack_size);
2778 /* If we're debugging, just print out the top of the stack */
2779 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2785 IF_DEBUG(sanity,checkTSO(dest));
2787 IF_DEBUG(scheduler,printTSO(dest));
2793 /* ---------------------------------------------------------------------------
2795 - usually called inside a signal handler so it mustn't do anything fancy.
2796 ------------------------------------------------------------------------ */
// Request an interrupt of the running program by moving the scheduler
// state to SCHED_INTERRUPTING.  Kept to a plain store because this is
// usually called from inside a signal handler.
2799 interruptStgRts(void)
2801 sched_state = SCHED_INTERRUPTING;
2806 /* -----------------------------------------------------------------------------
2809 This function causes at least one OS thread to wake up and run the
2810 scheduler loop. It is invoked when the RTS might be deadlocked, or
2811 an external event has arrived that may need servicing (eg. a
2812 keyboard interrupt).
2814 In the single-threaded RTS we don't do anything here; we only have
2815 one thread anyway, and the event that caused us to want to wake up
2816 will have interrupted any blocking system call in progress anyway.
2817 -------------------------------------------------------------------------- */
2822 #if defined(THREADED_RTS)
2823 #if !defined(mingw32_HOST_OS)
2824 // This forces the IO Manager thread to wakeup, which will
2825 // in turn ensure that some OS thread wakes up and runs the
2826 // scheduler loop, which will cause a GC and deadlock check.
2829 // On Windows this might be safe enough, because we aren't
2830 // in a signal handler. Later we should use the IO Manager,
2832 prodOneCapability();
2837 /* -----------------------------------------------------------------------------
2840 * Check the blackhole_queue for threads that can be woken up. We do
2841 * this periodically: before every GC, and whenever the run queue is
2844 * An elegant solution might be to just wake up all the blocked
2845 * threads with awakenBlockedQueue occasionally: they'll go back to
2846 * sleep again if the object is still a BLACKHOLE. Unfortunately this
2847 * doesn't give us a way to tell whether we've actually managed to
2848 * wake up any threads, so we would be busy-waiting.
2850 * -------------------------------------------------------------------------- */
// Scan the global blackhole_queue and wake, via unblockOne(), every thread
// whose blocking closure is no longer a BLACKHOLE or CAF_BLACKHOLE.
//   cap: Capability passed to unblockOne() for the woken threads.
// any_woke_up records whether at least one thread was woken.
// NOTE(review): presumably this flag is the return value -- confirm.
2853 checkBlackHoles (Capability *cap)
2856 rtsBool any_woke_up = rtsFalse;
2859 // blackhole_queue is global:
2860 ASSERT_LOCK_HELD(&sched_mutex);
2862 debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2864 // ASSUMES: sched_mutex
// `prev` starts out pointing at the queue head pointer itself.
2865 prev = &blackhole_queue;
2866 t = blackhole_queue;
2867 while (t != END_TSO_QUEUE) {
2868 ASSERT(t->why_blocked == BlockedOnBlackHole);
// Look at the current closure type of the object the thread is blocked on.
2869 type = get_itbl(t->block_info.closure)->type;
2870 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2871 IF_DEBUG(sanity,checkTSO(t));
// unblockOne() hands back the TSO with which the scan continues.
2872 t = unblockOne(cap, t);
2873 // urk, the threads migrate to the current capability
2874 // here, but we'd like to keep them on the original one.
2876 any_woke_up = rtsTrue;
2886 /* -----------------------------------------------------------------------------
2889 This is used for interruption (^C) and forking, and corresponds to
2890 raising an exception but without letting the thread catch the
2892 -------------------------------------------------------------------------- */
// Kill `tso` by calling throwToSingleThreaded() with a NULL exception.
// Threads currently blocked in a foreign call (BlockedOnCCall or
// BlockedOnCCall_NoUnblockExc) are skipped.
2895 deleteThread (Capability *cap, StgTSO *tso)
2897 // NOTE: must only be called on a TSO that we have exclusive
2898 // access to, because we will call throwToSingleThreaded() below.
2899 // The TSO must be on the run queue of the Capability we own, or
2900 // we must own all Capabilities.
2902 if (tso->why_blocked != BlockedOnCCall &&
2903 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2904 throwToSingleThreaded(cap,tso,NULL);
2908 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
// Variant of deleteThread() that also disposes of threads blocked in a
// foreign call: such a thread is unblocked with unblockOne() and marked
// ThreadKilled; every other thread goes through deleteThread().
2910 deleteThread_(Capability *cap, StgTSO *tso)
2911 { // for forkProcess only:
2912 // like deleteThread(), but we delete threads in foreign calls, too.
2914 if (tso->why_blocked == BlockedOnCCall ||
2915 tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2916 unblockOne(cap,tso);
2917 tso->what_next = ThreadKilled;
2919 deleteThread(cap,tso);
2924 /* -----------------------------------------------------------------------------
2925 raiseExceptionHelper
2927 This function is called by the raise# primitive, just so that we can
2928 move some of the tricky bits of raising an exception from C-- into
2929 C. Who knows, it might be a useful reusable thing here too.
2930 -------------------------------------------------------------------------- */
// Walk the stack of `tso` frame by frame while raising `exception`.
// Thunks referenced from update frames are overwritten with a
// 'raise# exception' closure, which is allocated lazily on first use.
// The walk stops at a handler-type frame and the type of that frame is
// returned.
//   reg:       register table, used to find the current Capability.
//   tso:       the thread whose stack is walked.
//   exception: the exception closure being raised.
2933 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2935 Capability *cap = regTableToCapability(reg);
2936 StgThunk *raise_closure = NULL;
2938 StgRetInfoTable *info;
2940 // This closure represents the expression 'raise# E' where E
2941 // is the exception raised. It is used to overwrite all the
2942 // thunks which are currently under evaluation.
2945 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2946 // LDV profiling: stg_raise_info has THUNK as its closure
2947 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2948 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2949 // 1 does not cause any problem unless profiling is performed.
2950 // However, when LDV profiling goes on, we need to linearly scan
2951 // small object pool, where raise_closure is stored, so we should
2952 // use MIN_UPD_SIZE.
2954 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2955 // sizeofW(StgClosure)+1);
2959 // Walk up the stack, looking for the catch frame. On the way,
2960 // we update any closures pointed to from update frames with the
2961 // raise closure that we just built.
// `next` is the address of the frame following the one at `p`.
2965 info = get_ret_itbl((StgClosure *)p);
2966 next = p + stack_frame_sizeW((StgClosure *)p);
2967 switch (info->i.type) {
2970 // Only create raise_closure if we need to.
2971 if (raise_closure == NULL) {
2973 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2974 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2975 raise_closure->payload[0] = exception;
// Redirect the thunk under evaluation to the raise closure.
2977 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2981 case ATOMICALLY_FRAME:
2982 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2984 return ATOMICALLY_FRAME;
2990 case CATCH_STM_FRAME:
2991 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2993 return CATCH_STM_FRAME;
2999 case CATCH_RETRY_FRAME:
3008 /* -----------------------------------------------------------------------------
3009 findRetryFrameHelper
3011 This function is called by the retry# primitive. It traverses the stack
3012 leaving tso->sp referring to the frame which should handle the retry.
3014 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3015 or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).
3017 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3018 create) because retries are not considered to be exceptions, despite the
3019 similar implementation.
3021 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3022 not be created within memory transactions.
3023 -------------------------------------------------------------------------- */
// Walk the stack of `tso` looking for the frame that should handle a
// retry#.  Returns ATOMICALLY_FRAME or CATCH_RETRY_FRAME, whichever is
// found first.  A CATCH_STM_FRAME met on the way is not a stopping point:
// its nested transaction is aborted and freed, and tso->trec is reset to
// the enclosing transaction record.
//   tso: the thread executing retry#.
3026 findRetryFrameHelper (StgTSO *tso)
3029 StgRetInfoTable *info;
// `next` is the address of the frame following the one at `p`.
3033 info = get_ret_itbl((StgClosure *)p);
3034 next = p + stack_frame_sizeW((StgClosure *)p);
3035 switch (info->i.type) {
3037 case ATOMICALLY_FRAME:
3038 debugTrace(DEBUG_stm,
3039 "found ATOMICALLY_FRAME at %p during retry", p);
3041 return ATOMICALLY_FRAME;
3043 case CATCH_RETRY_FRAME:
3044 debugTrace(DEBUG_stm,
3045 "found CATCH_RETRY_FRAME at %p during retry", p);
3047 return CATCH_RETRY_FRAME;
3049 case CATCH_STM_FRAME: {
3050 debugTrace(DEBUG_stm,
3051 "found CATCH_STM_FRAME at %p during retry", p);
// Roll back the nested transaction introduced by this frame and continue
// with the enclosing one.
3052 StgTRecHeader *trec = tso -> trec;
3053 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3054 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3055 stmAbortTransaction(tso -> cap, trec);
3056 stmFreeAbortedTRec(tso -> cap, trec);
3057 tso -> trec = outer;
// CATCH_FRAME and STOP_FRAME are not expected inside a memory transaction.
3064 ASSERT(info->i.type != CATCH_FRAME);
3065 ASSERT(info->i.type != STOP_FRAME);
3072 /* -----------------------------------------------------------------------------
3073 resurrectThreads is called after garbage collection on the list of
3074 threads found to be garbage. Each of these threads will be woken
3075 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3076 on an MVar, or NonTermination if the thread was blocked on a Black
3079 Locks: assumes we hold *all* the capabilities.
3080 -------------------------------------------------------------------------- */
3083 resurrectThreads (StgTSO *threads)
3088 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3089 next = tso->global_link;
3090 tso->global_link = all_threads;
3092 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3094 // Wake up the thread on the Capability it was last on
3097 switch (tso->why_blocked) {
3099 case BlockedOnException:
3100 /* Called by GC - sched_mutex lock is currently held. */
3101 throwToSingleThreaded(cap, tso,
3102 (StgClosure *)BlockedOnDeadMVar_closure);
3104 case BlockedOnBlackHole:
3105 throwToSingleThreaded(cap, tso,
3106 (StgClosure *)NonTermination_closure);
3109 throwToSingleThreaded(cap, tso,
3110 (StgClosure *)BlockedIndefinitely_closure);
3113 /* This might happen if the thread was blocked on a black hole
3114 * belonging to a thread that we've just woken up (raiseAsync
3115 * can wake up threads, remember...).
3119 barf("resurrectThreads: thread blocked in a strange way");