/* ---------------------------------------------------------------------------
 * (c) The GHC Team, 1998-2006
 *
 * The scheduler and thread-related functionality
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "BlockAlloc.h"
#include "OSThreads.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "RtsSignals.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Proftimer.h"

#if defined(GRAN) || defined(PARALLEL_HASKELL)
# include "GranSimRts.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
#endif

#include "Capability.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "RaiseAsync.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
// Turn off inlining when debugging - it obfuscates things
#ifdef DEBUG
# define STATIC_INLINE static
#endif

/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */
StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */

/*
   In GranSim we have a runnable and a blocked queue for each processor.
   In order to minimise code changes, new arrays run_queue_hds/tls
   are created. run_queue_hd is then a shortcut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating). */
#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *blackhole_queue = NULL;

/* The blackhole_queue should be checked for threads to wake up. See
 * Schedule.h for more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
 */
rtsBool blackholes_need_checking = rtsFalse;
/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *all_threads = NULL;

/* flag set by signal handler to precipitate a context switch
 * LOCK: none (just an advisory flag)
 */
int context_switch = 0;
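
/*
 * Illustrative sketch only (not the RTS's actual timer code, and
 * exampleTickHandler is a hypothetical name): the timer interrupt
 * merely sets the advisory flag above, and the scheduler polls it
 * between thread runs, so no locking is required.
 */
#if 0
static void
exampleTickHandler (int sig STG_UNUSED)
{
    // Worst case a race loses one update; the next tick sets it again.
    context_switch = 1;
}
#endif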
/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 */
nat recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (advances monotonically:
 *       SCHED_RUNNING -> SCHED_INTERRUPTING -> SCHED_SHUTTING_DOWN)
 */
rtsBool sched_state = SCHED_RUNNING;
/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 * exists - earlier gccs apparently didn't.
 */

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if defined(PARALLEL_HASKELL)
rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;
#endif

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif
/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.

static void schedulePreLoop (void);
#if defined(THREADED_RTS)
static void schedulePushWork(Capability *cap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
#if defined(GRAN)
static StgTSO *scheduleProcessEvent(rtsEvent *event);
#endif
#if defined(PARALLEL_HASKELL)
static void scheduleSendPendingMessages(void);
static rtsBool scheduleActivateSpark(void);
static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
#endif
#if defined(PAR) || defined(GRAN)
static void scheduleGranParReport(void);
#endif
static void schedulePostRunThread(void);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task,
                                         StgTSO *t );
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
                                    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                             StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
                                rtsBool force_major,
                                void (*get_roots)(evac_fn));

static rtsBool checkBlackHoles(Capability *cap);
static void AllRoots(evac_fn evac);

static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif

#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);
#endif

static char *whatNext_strs[] = {
/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */

STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) {
        // this does round-robin scheduling; good for concurrency
        appendToRunQueue(cap,t);
    } else {
        // this does unfair scheduling; good for parallelism
        pushOnRunQueue(cap,t);
    }
#else
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
#endif
}
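
/*
 * Illustrative sketch only (the real queue operations live in
 * Schedule.h): the run queue is a singly-linked list of TSOs threaded
 * through their link fields, with head and tail pointers kept in the
 * Capability. appendToRunQueue inserts at the tail, so everything
 * already queued runs first (round-robin); pushOnRunQueue inserts at
 * the head, so the new thread runs next (unfair, but it keeps freshly
 * created work hot in the cache).
 */
#if 0
static void
exampleAppend (Capability *cap, StgTSO *t)   // tail insert, O(1)
{
    t->link = END_TSO_QUEUE;
    if (cap->run_queue_hd == END_TSO_QUEUE) {
        cap->run_queue_hd = t;
    } else {
        cap->run_queue_tl->link = t;
    }
    cap->run_queue_tl = t;
}

static void
examplePush (Capability *cap, StgTSO *t)     // head insert, O(1)
{
    t->link = cap->run_queue_hd;
    cap->run_queue_hd = t;
    if (cap->run_queue_tl == END_TSO_QUEUE) {
        cap->run_queue_tl = t;
    }
}
#endif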
/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * timer expires (thread yields)

   In a GranSim setup this loop revolves around the global event queue,
   which determines what to do next. Therefore, it's more complicated
   than either the concurrent or the parallel (GUM) setup.

   GUM iterates over incoming messages.
   It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
   and sends out a fish whenever it has nothing to do; in-between
   doing the actual reductions (shared code below) it processes the
   incoming messages and deals with delayed operations
   (see PendingFetches).
   This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */
static Capability *
schedule (Capability *initialCapability, Task *task)
{
    StgTSO *t;
    Capability *cap;
    StgThreadReturnCode ret;
#if defined(GRAN)
    rtsEvent *event;
#elif defined(PARALLEL_HASKELL)
    rtsBool receivedFinish = rtsFalse;
    nat tp_size, sp_size; // stats only
#endif
    nat prev_what_next;
    rtsBool ready_to_gc;
#if defined(THREADED_RTS)
    rtsBool first = rtsTrue;
#endif

    cap = initialCapability;

    // Pre-condition: this task owns initialCapability.
    // The sched_mutex is *NOT* held
    // NB. on return, we still hold a capability.

    debugTrace (DEBUG_sched,
                "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
                task, initialCapability);
    // -----------------------------------------------------------
    // Scheduler loop starts here:

#if defined(PARALLEL_HASKELL)
#define TERMINATION_CONDITION (!receivedFinish)
#elif defined(GRAN)
#define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL)
#else
#define TERMINATION_CONDITION rtsTrue
#endif

    while (TERMINATION_CONDITION) {

#if defined(GRAN)
        /* Choose the processor with the next event */
        CurrentProc = event->proc;
        CurrentTSO  = event->tso;
#endif
#if defined(THREADED_RTS)
        if (first) {
            // don't yield the first time, we want a chance to run this
            // thread for a bit, even if there are others banging at the
            // door.
            first = rtsFalse;
            ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        } else {
            // Yield the capability to higher-priority tasks if necessary.
            yieldCapability(&cap, task);
        }
#endif

#if defined(THREADED_RTS)
        schedulePushWork(cap,task);
#endif

        // Check whether we have re-entered the RTS from Haskell without
        // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
        // call).
        if (cap->in_haskell) {
            errorBelch("schedule: re-entered unsafely.\n"
                       "   Perhaps a 'foreign import unsafe' should be 'safe'?");
            stg_exit(EXIT_FAILURE);
        }
        // The interruption / shutdown sequence.
        //
        // In order to cleanly shut down the runtime, we want to:
        //   * make sure that all main threads return to their callers
        //     with the state 'Interrupted'.
        //   * clean up all OS threads associated with the runtime
        //   * free all memory etc.
        //
        // So the sequence for ^C goes like this:
        //
        //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
        //     arranges for some Capability to wake up
        //
        //   * all threads in the system are halted, and the zombies are
        //     placed on the run queue for cleaning up. We acquire all
        //     the capabilities in order to delete the threads; this is
        //     done by scheduleDoGC() for convenience (because GC already
        //     needs to acquire all the capabilities). We can't kill
        //     threads involved in foreign calls.
        //
        //   * somebody calls shutdownHaskell(), which calls exitScheduler()
        //
        //   * sched_state := SCHED_SHUTTING_DOWN
        //
        //   * all workers exit when the run queue on their capability
        //     drains. All main threads will also exit when their TSO
        //     reaches the head of the run queue and they can return.
        //
        //   * eventually all Capabilities will shut down, and the RTS can
        //     exit.
        //
        //   * We might be left with threads blocked in foreign calls;
        //     we should really attempt to kill these somehow (TODO).
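        //
        // For illustration only (the real handler lives in the
        // signal-handling code, not here): the ^C handler itself needs
        // to do no more than something like
        //
        //     sched_state = SCHED_INTERRUPTING; // request the sequence above
        //     context_switch = 1;               // make a Capability notice soon
        //
        // and the machinery described above, driven from this loop,
        // takes care of the rest.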
        switch (sched_state) {
        case SCHED_RUNNING:
            break;
        case SCHED_INTERRUPTING:
            debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
#if defined(THREADED_RTS)
            discardSparksCap(cap);
#endif
            /* scheduleDoGC() deletes all the threads */
            cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
            break;
        case SCHED_SHUTTING_DOWN:
            debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
            // If we are a worker, just exit. If we're a bound thread
            // then we will exit below when we've removed our TSO from
            // the run queue.
            if (task->tso == NULL && emptyRunQueue(cap)) {
                return cap;
            }
            break;
        default:
            barf("sched_state: %d", sched_state);
        }
#if defined(THREADED_RTS)
        // If the run queue is empty, take a spark and turn it into a thread.
        {
            if (emptyRunQueue(cap)) {
                StgClosure *spark;
                spark = findSpark(cap);
                if (spark != NULL) {
                    debugTrace(DEBUG_sched,
                               "turning spark of closure %p into a thread",
                               (StgClosure *)spark);
                    createSparkThread(cap,spark);
                }
            }
        }
#endif // THREADED_RTS
        scheduleStartSignalHandlers(cap);

        // Only check the black holes here if we've nothing else to do.
        // During normal execution, the black hole list only gets checked
        // at GC time, to avoid repeatedly traversing this possibly long
        // list each time around the scheduler.
        if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

        scheduleCheckWakeupThreads(cap);

        scheduleCheckBlockedThreads(cap);

        scheduleDetectDeadlock(cap,task);
#if defined(THREADED_RTS)
        cap = task->cap;    // reload cap, it might have changed
#endif
        // Normally, the only way we can get here with no threads to
        // run is if a keyboard interrupt was received during
        // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
        // Additionally, it is not fatal for the
        // threaded RTS to reach here with no threads to run.
        //
        // win32: might be here due to awaitEvent() being abandoned
        // as a result of a console event having been delivered.
        if ( emptyRunQueue(cap) ) {
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
            ASSERT(sched_state >= SCHED_INTERRUPTING);
#endif
            continue; // nothing to do
        }
#if defined(PARALLEL_HASKELL)
        scheduleSendPendingMessages();
        if (emptyRunQueue(cap) && scheduleActivateSpark())
            continue;

        ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!

        /* If we still have no work we need to send a FISH to get a spark
           from another processor */
        if (emptyRunQueue(cap)) {
            if (!scheduleGetRemoteWork(&receivedFinish)) continue;
            ASSERT(rtsFalse); // should not happen at the moment
        }
        // from here: non-empty run queue.
        //  TODO: merge above case with this, only one call processMessages() !
        if (PacketsWaiting()) {  /* process incoming messages, if
                                    any pending...  only in else
                                    because getRemoteWork waits for
                                    messages as well */
            receivedFinish = processMessages();
        }
#endif

#if defined(GRAN)
        scheduleProcessEvent(event);
#endif
        //
        // Get a thread to run
        //
        t = popRunQueue(cap);

#if defined(GRAN) || defined(PAR)
        scheduleGranParReport(); // some kind of debugging output
#else
        // Sanity check the thread we're about to run. This can be
        // expensive if there is lots of thread switching going on...
        IF_DEBUG(sanity,checkTSO(t));
#endif
#if defined(THREADED_RTS)
        // Check whether we can run this thread in the current task.
        // If not, we have to pass our capability to the right task.
        {
            Task *bound = t->bound;

            if (bound) {
                if (bound == task) {
                    debugTrace(DEBUG_sched,
                               "### Running thread %lu in bound thread", (unsigned long)t->id);
                    // yes, the Haskell thread is bound to the current native thread
                } else {
                    debugTrace(DEBUG_sched,
                               "### thread %lu bound to another OS thread", (unsigned long)t->id);
                    // no, bound to a different Haskell thread: pass to that thread
                    pushOnRunQueue(cap,t);
                    continue;
                }
            } else {
                // The thread we want to run is unbound.
                if (task->tso) {
                    debugTrace(DEBUG_sched,
                               "### this OS thread cannot run thread %lu", (unsigned long)t->id);
                    // no, the current native thread is bound to a different
                    // Haskell thread, so pass it to any worker thread
                    pushOnRunQueue(cap,t);
                    continue;
                }
            }
        }
#endif
        cap->r.rCurrentTSO = t;

        /* context switches are initiated by the timer signal, unless
         * the user specified "context switch as often as possible", with
         * +RTS -C0
         */
        if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
            && !emptyThreadQueues(cap)) {
            context_switch = 1;
        }

run_thread:

        debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
                   (long)t->id, whatNext_strs[t->what_next]);

#if defined(PROFILING)
        startHeapProfTimer();
#endif

        // Check for exceptions blocked on this thread
        maybePerformBlockedException (cap, t);

        // ----------------------------------------------------------------------
        // Run the current thread

        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        ASSERT(t->cap == cap);

        prev_what_next = t->what_next;

        errno = t->saved_errno;
        cap->in_haskell = rtsTrue;

        recent_activity = ACTIVITY_YES;
        switch (prev_what_next) {

        case ThreadKilled:
        case ThreadComplete:
            /* Thread already finished, return to scheduler. */
            ret = ThreadFinished;
            break;

        case ThreadRunGHC:
        {
            StgRegTable *r;
            r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
            cap = regTableToCapability(r);
            ret = r->rRet;
            break;
        }

        case ThreadInterpret:
            cap = interpretBCO(cap);
            ret = cap->r.rRet;
            break;

        default:
            barf("schedule: invalid what_next field");
        }

        cap->in_haskell = rtsFalse;

        // The TSO might have moved, eg. if it re-entered the RTS and a GC
        // happened. So find the new location:
        t = cap->r.rCurrentTSO;

        // We have run some Haskell code: there might be blackhole-blocked
        // threads to wake up now.
        // Lock-free test here should be ok, we're just setting a flag.
        if ( blackhole_queue != END_TSO_QUEUE ) {
            blackholes_need_checking = rtsTrue;
        }
        // And save the current errno in this thread.
        // XXX: possibly bogus for SMP because this thread might already
        // be running again, see code below.
        t->saved_errno = errno;

#if defined(THREADED_RTS)
        // If ret is ThreadBlocked, and this Task is bound to the TSO that
        // blocked, we are in limbo - the TSO is now owned by whatever it
        // is blocked on, and may in fact already have been woken up,
        // perhaps even on a different Capability. It may be the case
        // that task->cap != cap. We better yield this Capability
        // immediately and return to normality.
        if (ret == ThreadBlocked) {
            debugTrace(DEBUG_sched,
                       "--<< thread %lu (%s) stopped: blocked",
                       (unsigned long)t->id, whatNext_strs[t->what_next]);
            continue;
        }
#endif

        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        ASSERT(t->cap == cap);
        // ----------------------------------------------------------------------

        // Costs for the scheduler are assigned to CCS_SYSTEM
#if defined(PROFILING)
        stopHeapProfTimer();
        CCCS = CCS_SYSTEM;
#endif

        schedulePostRunThread();

        ready_to_gc = rtsFalse;

        switch (ret) {
        case HeapOverflow:
            ready_to_gc = scheduleHandleHeapOverflow(cap,t);
            break;

        case StackOverflow:
            scheduleHandleStackOverflow(cap,task,t);
            break;

        case ThreadYielding:
            if (scheduleHandleYield(cap, t, prev_what_next)) {
                // shortcut for switching between compiler/interpreter:
                goto run_thread;
            }
            break;

        case ThreadBlocked:
            scheduleHandleThreadBlocked(t);
            break;

        case ThreadFinished:
            if (scheduleHandleThreadFinished(cap, task, t)) return cap;
            ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
            break;

        default:
            barf("schedule: invalid thread return code %d", (int)ret);
        }

        if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
        if (ready_to_gc) {
            cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
        }
    } /* end of while() */
#if defined(PARALLEL_HASKELL)
    debugTrace(PAR_DEBUG_verbose,
               "== Leaving schedule() after having received Finish");
#endif
}

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
#if defined(GRAN)
    /* set up first event to get things going */
    /* ToDo: assign costs for system setup and init MainTSO ! */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              ContinueThread,
              CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

    debugTrace (DEBUG_gran,
                "GRAN: Init CurrentTSO (in schedule) = %p",
                CurrentTSO);
    IF_DEBUG(gran, G_TSO(CurrentTSO, 5));

    if (RtsFlags.GranFlags.Light) {
        /* Save current time; GranSim Light only */
        CurrentTSO->gran.clock = CurrentTime[CurrentProc];
    }
#endif
}
/* -----------------------------------------------------------------------------
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
static void
schedulePushWork(Capability *cap USED_IF_THREADS,
                 Task *task      USED_IF_THREADS)
{
    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;
    // migration can be turned off with +RTS -qg
    if (!RtsFlags.ParFlags.migrate) return;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
        && sparkPoolSizeCap(cap) < 2) {
        return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0) || cap0->returning_tasks_hd != NULL) {
                // it already has some work, we just grabbed it at
                // the wrong moment. Or maybe it's deadlocked!
                releaseCapability(cap0);
            } else {
                free_caps[n_free_caps++] = cap0;
            }
        }
    }
    // we now have n_free_caps free capabilities stashed in
    // free_caps[]. Share our run queue equally with them. This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;
        rtsBool pushed_to_all;

        debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
        i = 0;
        pushed_to_all = rtsFalse;

        if (cap->run_queue_hd != END_TSO_QUEUE) {
            prev = cap->run_queue_hd;
            t = prev->link;
            prev->link = END_TSO_QUEUE;
            for (; t != END_TSO_QUEUE; t = next) {
                next = t->link;
                t->link = END_TSO_QUEUE;
                if (t->what_next == ThreadRelocated
                    || t->bound == task   // don't move my bound thread
                    || tsoLocked(t)) {    // don't move a locked thread
                    prev->link = t;
                    prev = t;
                } else if (i == n_free_caps) {
                    pushed_to_all = rtsTrue;
                    i = 0;
                    // keep one for us
                    prev->link = t;
                    prev = t;
                } else {
                    debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
                    appendToRunQueue(free_caps[i],t);
                    if (t->bound) { t->bound->cap = free_caps[i]; }
                    t->cap = free_caps[i];
                    i++;
                }
            }
            cap->run_queue_tl = prev;
        }
        // If there are some free capabilities that we didn't push any
        // threads to, then try to push a spark to each one.
        if (!pushed_to_all) {
            StgClosure *spark;
            // i is the next free capability to push to
            for (; i < n_free_caps; i++) {
                if (emptySparkPoolCap(free_caps[i])) {
                    spark = findSpark(cap);
                    if (spark != NULL) {
                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
                        newSpark(&(free_caps[i]->r), spark);
                    }
                }
            }
        }

        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseCapability(free_caps[i]);
        }
    }
    task->cap = cap; // reset to point to our Capability.
}
#endif
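
// Design note on schedulePushWork above: threads are dealt out one per
// free capability, wrapping at most once (tracked by pushed_to_all) so
// that we deliberately keep one runnable thread for ourselves; only the
// capabilities that received no thread are then offered a raw spark.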
/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (signals_pending()) { // safe outside the lock
        startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */
static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up. If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
        awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
    }
#endif
}
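
// Note: the rtsBool argument to awaitEvent above says whether we may
// block indefinitely: we only do so when the run queue is empty and no
// black holes need checking, i.e. when there is genuinely nothing else
// to do.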
/* ----------------------------------------------------------------------------
 * Check for threads woken up by other Capabilities
 * ------------------------------------------------------------------------- */
static void
scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    // Any threads that were woken up by other Capabilities get
    // appended to our run queue.
    if (!emptyWakeupQueue(cap)) {
        ACQUIRE_LOCK(&cap->lock);
        if (emptyRunQueue(cap)) {
            cap->run_queue_hd = cap->wakeup_queue_hd;
            cap->run_queue_tl = cap->wakeup_queue_tl;
        } else {
            cap->run_queue_tl->link = cap->wakeup_queue_hd;
            cap->run_queue_tl = cap->wakeup_queue_tl;
        }
        cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
        RELEASE_LOCK(&cap->lock);
    }
#endif
}
/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */
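
// Note: scheduleCheckBlackHoles below uses the classic double-checked
// flag pattern: the first, unlocked read of blackholes_need_checking
// cheaply skips the common case, while the second read under
// sched_mutex makes the actual check-and-clear race-free. A stale
// first read merely delays the wakeup until the next scheduler
// iteration.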
static void
scheduleCheckBlackHoles (Capability *cap)
{
    if ( blackholes_need_checking ) // check without the lock first
    {
        ACQUIRE_LOCK(&sched_mutex);
        if ( blackholes_need_checking ) {
            checkBlackHoles(cap);
            blackholes_need_checking = rtsFalse;
        }
        RELEASE_LOCK(&sched_mutex);
    }
}
/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{
#if defined(PARALLEL_HASKELL)
    // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
    return;
#endif

    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
        /*
         * In the threaded RTS, we only check for deadlock if there
         * has been no activity in a complete timeslice. This means
         * we won't eagerly start a full GC just because we don't have
         * any threads to run currently.
         */
        if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

        debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception. Any threads thus released will be immediately
        // runnable.
        cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/, GetRoots);

        recent_activity = ACTIVITY_DONE_GC;

        if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
         * deadlock.
         */
        if ( anyUserHandlers() ) {
            debugTrace(DEBUG_sched,
                       "still deadlocked, waiting for signals...");

            awaitUserSignals();

            if (signals_pending()) {
                startSignalHandlers(cap);
            }

            // either we have threads to run, or we were interrupted:
            ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
        }
#endif
#if !defined(THREADED_RTS)
        /* Probably a real deadlock. Send the current main thread the
         * NonTermination exception.
         */
        switch (task->tso->why_blocked) {
        case BlockedOnSTM:
        case BlockedOnBlackHole:
        case BlockedOnException:
        case BlockedOnMVar:
            throwToSingleThreaded(cap, task->tso,
                                  (StgClosure *)NonTermination_closure);
            return;
        default:
            barf("deadlock: main thread blocked in a strange way");
        }
#endif
    }
}

/* ----------------------------------------------------------------------------
 * Process an event (GRAN only)
 * ------------------------------------------------------------------------- */
#if defined(GRAN)
static StgTSO *
scheduleProcessEvent(rtsEvent *event)
{
    StgTSO *t;

    if (RtsFlags.GranFlags.Light)
        GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
        CurrentTime[CurrentProc] = event->time;

    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
        handleIdlePEs();

    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
    /* Should just be continuing execution */
    case ContinueThread:
        IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
        /* ToDo: check assertion
           ASSERT(run_queue_hd != (StgTSO*)NULL &&
                  run_queue_hd != END_TSO_QUEUE);
        */
        /* Ignore ContinueThreads for fetching threads (if synchr comm) */
        if (!RtsFlags.GranFlags.DoAsyncFetch &&
            procStatus[CurrentProc]==Fetching) {
            debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            goto next_thread;
        }
        /* Ignore ContinueThreads for completed threads */
        if (CurrentTSO->what_next == ThreadComplete) {
            debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            goto next_thread;
        }
        /* Ignore ContinueThreads for threads that are being migrated */
        if (PROCS(CurrentTSO)==Nowhere) {
            debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            goto next_thread;
        }
        /* The thread should be at the beginning of the run queue */
        if (CurrentTSO!=run_queue_hds[CurrentProc]) {
            debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            break; // run the thread anyway
        }
        /*
          new_event(proc, proc, CurrentTime[proc],
                    (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
        */ /* Catches superfluous CONTINUEs -- should be unnecessary */
        break; // now actually run the thread; DaH Qu'vam yImuHbej
    case FetchNode:
        do_the_fetchnode(event);
        goto next_thread; /* handle next event in event queue  */

    case GlobalBlock:
        do_the_globalblock(event);
        goto next_thread; /* handle next event in event queue  */

    case FetchReply:
        do_the_fetchreply(event);
        goto next_thread; /* handle next event in event queue  */

    case UnblockThread: /* Move from the blocked queue to the tail of */
        do_the_unblock(event);
        goto next_thread; /* handle next event in event queue  */

    case ResumeThread:  /* Move from the blocked queue to the tail of */
        /* the runnable queue ( i.e. Qu' SImqa'lu') */
        event->tso->gran.blocktime +=
            CurrentTime[CurrentProc] - event->tso->gran.blockedat;
        do_the_startthread(event);
        goto next_thread; /* handle next event in event queue  */

    case StartThread:
        do_the_startthread(event);
        goto next_thread; /* handle next event in event queue  */

    case MoveThread:
        do_the_movethread(event);
        goto next_thread; /* handle next event in event queue  */

    case MoveSpark:
        do_the_movespark(event);
        goto next_thread; /* handle next event in event queue  */

    case FindWork:
        do_the_findwork(event);
        goto next_thread; /* handle next event in event queue  */

    default:
        barf("Illegal event type %u\n", event->evttype);
    } /* switch */
    /* This point was scheduler_loop in the old RTS */
next_thread:
    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
                              TimeOfNextEvent));

    if (RtsFlags.GranFlags.Light)
        GranSimLight_leave_system(event, &ActiveTSO);

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    IF_DEBUG(gran,
             debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */
    t = CurrentTSO;
    /* Take a thread from the run queue. */
    POP_RUN_QUEUE(t); // take_off_run_queue(t);

    IF_DEBUG(gran,
             debugBelch("GRAN: About to run current thread, which is\n");
             G_TSO(t,5));

    context_switch = 0; // turned on via GranYield, checking events and time slice

    IF_DEBUG(gran,
             DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;

    return t;
}
#endif // GRAN
/* ----------------------------------------------------------------------------
 * Send pending messages (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static void
scheduleSendPendingMessages(void)
{
# if defined(PAR) // global Mem.Mgmt., omit for now
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }
# endif

    if (RtsFlags.ParFlags.BufferTime) {
        // if we use message buffering, we must send away all message
        // packets which have become too old...
        sendOldBuffers();
    }
}
#endif
/* ----------------------------------------------------------------------------
 * Activate spark threads (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static rtsBool
scheduleActivateSpark(void)
{
#if defined(SPARKS)
    ASSERT(emptyRunQueue());
    /* We get here if the run queue is empty and want some work.
       We try to turn a spark into a thread, and add it to the run queue,
       from where it will be picked up in the next iteration of the scheduler
       loop.
    */

    /* :-[  no local threads => look out for local sparks */
    /* the spark pool for the current PE */
    pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
    if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
        pool->hd < pool->tl) {
        /*
         * ToDo: add GC code check that we really have enough heap afterwards!!
         * If we're here (no runnable threads) and we have pending
         * sparks, we must have a space problem. Get enough space
         * to turn one of those pending sparks into a
         * thread...
         */

        spark = findSpark(rtsFalse);            /* get a spark */
        if (spark != (rtsSpark) NULL) {
            tso = createThreadFromSpark(spark); /* turn the spark into a thread */
            IF_PAR_DEBUG(fish, // schedule,
                         debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
                                    tso->id, tso, advisory_thread_count));

            if (tso==END_TSO_QUEUE) { /* failed to activate spark -> back to loop */
                IF_PAR_DEBUG(fish, // schedule,
                             debugBelch("==^^ failed to create thread from spark @ %lx\n",
                                        spark));
                return rtsFalse; /* failed to generate a thread */
            }                    /* otherwise fall through & pick up new tso */
        } else {
            IF_PAR_DEBUG(fish, // schedule,
                         debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
                                    spark_queue_len(pool)));
            return rtsFalse; /* failed to generate a thread */
        }
        return rtsTrue;  /* success in generating a thread */
    } else { /* no more threads permitted or pool empty */
        return rtsFalse; /* failed to generateThread */
    }
#else
    tso = NULL; // avoid compiler warning only
    return rtsFalse; /* dummy in non-PAR setup */
#endif // SPARKS
}
#endif // PARALLEL_HASKELL
/* ----------------------------------------------------------------------------
 * Get work from a remote node (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static rtsBool
scheduleGetRemoteWork(rtsBool *receivedFinish)
{
    ASSERT(emptyRunQueue());

    if (RtsFlags.ParFlags.BufferTime) {
        IF_PAR_DEBUG(verbose,
                     debugBelch("...send all pending data,"));
        {
            nat i;
            for (i=1; i<=nPEs; i++)
                sendImmediately(i); // send all messages away immediately
        }
    }

# ifndef SPARKS
    //++EDEN++ idle() , i.e. send all buffers, wait for work
    // suppress fishing in EDEN... just look for incoming messages
    // (blocking receive)
    IF_PAR_DEBUG(verbose,
                 debugBelch("...wait for incoming messages...\n"));
    *receivedFinish = processMessages(); // blocking receive...

    // and reenter scheduling loop after having received something
    // (return rtsFalse below)

# else /* activate SPARKS machinery */
    /* We get here, if we have no work, tried to activate a local spark, but still
       have no work. We try to get a remote spark, by sending a FISH message.
       Thread migration should be added here, and triggered when a sequence of
       fishes returns without work. */
    rtsTime delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);

    /* =8-[  no local sparks => look for work on other PEs */
    /*
     * We really have absolutely no work. Send out a fish
     * (there may be some out there already), and wait for
     * something to arrive. We clearly can't run any threads
     * until a SCHEDULE or RESUME arrives, and so that's what
     * we're hoping to see. (Of course, we still have to
     * respond to other types of messages.)
     */
    rtsTime now = msTime() /*CURRENT_TIME*/;
    IF_PAR_DEBUG(verbose,
                 debugBelch("--  now=%ld\n", now));
    IF_PAR_DEBUG(fish, // verbose,
                 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
                     (last_fish_arrived_at!=0 &&
                      last_fish_arrived_at+delay > now)) {
                     debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
                                now, last_fish_arrived_at+delay,
                                last_fish_arrived_at,
                                delay);
                 });

    if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
        advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
        if (last_fish_arrived_at==0 ||
            (last_fish_arrived_at+delay <= now)) { // send FISH now!
            /* outstandingFishes is set in sendFish, processFish;
               avoid flooding system with fishes via delay */
            next_fish_to_send_at = 0;
        } else {
            /* ToDo: this should be done in the main scheduling loop to avoid the
               busy wait here; not so bad if fish delay is very small */
            int iq = 0; // DEBUGGING -- HWL
            next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
            /* send a fish when ready, but process messages that arrive in the meantime */
            do {
                if (PacketsWaiting()) {
                    iq++;
                    *receivedFinish = processMessages();
                }
                now = msTime();
            } while (!*receivedFinish || now<next_fish_to_send_at);
            // JB: This means the fish could become obsolete, if we receive
            // work. Better check for work again?
            // last line: while (!receivedFinish || !haveWork || now<...)
            // next line: if (receivedFinish || haveWork )

            if (*receivedFinish) // no need to send a FISH if we are finishing anyway
                return rtsFalse; // NB: this will leave scheduler loop
                                 // immediately after return!

            IF_PAR_DEBUG(fish, // verbose,
                         debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
        }

        // JB: IMHO, this should all be hidden inside sendFish(...)
        /* pe = choosePE();
           sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
                    NEW_FISH_HUNGER);

           // Global statistics: count no. of fishes
           if (RtsFlags.ParFlags.ParStats.Global &&
               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
               globalParStats.tot_fish_mess++;
           }
        */

        /* delayed fishes must have been sent by now! */
        next_fish_to_send_at = 0;
    }

    *receivedFinish = processMessages();
# endif /* SPARKS */

    return rtsFalse;
    /* NB: this function always returns rtsFalse, meaning the scheduler
       loop continues with the next iteration;
       rationale:
         return code means success in finding work; we enter this function
         if there is no local work, thus have to send a fish which takes
         time until it arrives with work; in the meantime we should process
         messages in the main loop;
    */
}
#endif // PARALLEL_HASKELL
/* ----------------------------------------------------------------------------
 * PAR/GRAN: Report stats & debugging info(?)
 * ------------------------------------------------------------------------- */

#if defined(PAR) || defined(GRAN)
static void
scheduleGranParReport(void)
{
    ASSERT(run_queue_hd != END_TSO_QUEUE);

    /* Take a thread from the run queue, if we have work */
    POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);

    /* If this TSO has got its outport closed in the meantime,
     *   it mustn't be run. Instead, we have to clean it up as if it was finished.
     *   It has to be marked as TH_DEAD for this purpose.
     *   If it is TH_TERM instead, it is supposed to have finished in the normal way.

       JB: TODO: investigate whether state change field could be nuked
           entirely and replaced by the normal tso state (whatnext
           field). All we want to do is to kill tsos from outside.
     */

    /* ToDo: write something to the log-file
       if (RTSflags.ParFlags.granSimStats && !sameThread)
           DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
    */

    /* the spark pool for the current PE */
    pool = &(cap.r.rSparks); //  cap = (old) MainCap

    IF_PAR_DEBUG(verbose,
                 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
                            run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

    if (RtsFlags.ParFlags.ParStats.Full &&
        (t->par.sparkname != (StgInt)0) && // only log spark generated threads
        (emitSchedule || // forced emit
         (t && LastTSO && t->id != LastTSO->id))) {
        /*
           we are running a different TSO, so write a schedule event to log file
           NB: If we use fair scheduling we also have to write a deschedule
               event for LastTSO; with unfair scheduling we know that the
               previous tso has blocked whenever we switch to another tso, so
               we don't need it in GUM for now
        */
        IF_PAR_DEBUG(fish, // schedule,
                     debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));

        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
        emitSchedule = rtsFalse;
    }
}
#endif
/* ----------------------------------------------------------------------------
 * After running a thread...
 * ------------------------------------------------------------------------- */

static void
schedulePostRunThread(void)
{
#if defined(PAR)
    /* HACK 675: if the last thread didn't yield, make sure to print a
       SCHEDULE event to the log file when StgRunning the next thread, even
       if it is the same one as before */
    LastTSO = t;
    TimeOfLastYield = CURRENT_TIME;
#endif

    /* some statistics gathering in the parallel case */

#if defined(GRAN) || defined(PAR) || defined(EDEN)
    switch (ret) {
    case HeapOverflow:
# if defined(GRAN)
        IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_heapover++;
# elif defined(PAR)
        globalParStats.tot_heapover++;
# endif
        break;

    case StackOverflow:
# if defined(GRAN)
        IF_DEBUG(gran,
                 DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_stackover++;
# elif defined(PAR)
        // IF_DEBUG(par,
        // DumpGranEvent(GR_DESCHEDULE, t);
        globalParStats.tot_stackover++;
# endif
        break;

    case ThreadYielding:
# if defined(GRAN)
        IF_DEBUG(gran,
                 DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_yields++;
# elif defined(PAR)
        // IF_DEBUG(par,
        // DumpGranEvent(GR_DESCHEDULE, t);
        globalParStats.tot_yields++;
# endif
        break;

    case ThreadBlocked:
# if defined(GRAN)
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
                   t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
                   (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
        if (t->block_info.closure!=(StgClosure*)NULL)
            print_bq(t->block_info.closure);

        // ??? needed; should emit block before
        IF_DEBUG(gran,
                 DumpGranEvent(GR_DESCHEDULE, t));
        prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t

        ASSERT(procStatus[CurrentProc]==Busy ||
               ((procStatus[CurrentProc]==Fetching) &&
                (t->block_info.closure!=(StgClosure*)NULL)));
        if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
            !(!RtsFlags.GranFlags.DoAsyncFetch &&
              procStatus[CurrentProc]==Fetching))
            procStatus[CurrentProc] = Idle;
# elif defined(PAR)
        //++PAR++ blockThread() writes the event (change?)
# endif
        break;

    case ThreadFinished:
        break;

    default:
        barf("parGlobalStats: unknown return code");
    }
#endif
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
{
    // did the task ask for a large block?
    if (cap->r.rHpAlloc > BLOCK_SIZE) {
        // if so, get one and push it on the front of the nursery.
        bdescr *bd;
        lnat blocks;

        blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
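
        // Worked example (illustrative; BLOCK_SIZE is platform-configured):
        // with BLOCK_SIZE = 4096 and rHpAlloc = 10000, BLOCK_ROUND_UP
        // rounds up to 12288, giving blocks = 12288 / 4096 = 3 contiguous
        // blocks for the request.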
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
                   (long)t->id, whatNext_strs[t->what_next], blocks);

        // don't do this if the nursery is (nearly) full, we'll GC first.
        if (cap->r.rCurrentNursery->link != NULL ||
            cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
                                              // if the nursery has only one block.

            bd = allocGroup( blocks );
            cap->r.rNursery->n_blocks += blocks;

            // link the new group into the list
            bd->link = cap->r.rCurrentNursery;
            bd->u.back = cap->r.rCurrentNursery->u.back;
            if (cap->r.rCurrentNursery->u.back != NULL) {
                cap->r.rCurrentNursery->u.back->link = bd;
            } else {
#if !defined(THREADED_RTS)
                ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
                       g0s0 == cap->r.rNursery);
#endif
                cap->r.rNursery->blocks = bd;
            }
            cap->r.rCurrentNursery->u.back = bd;

            // initialise it as a nursery block. We initialise the
            // step, gen_no, and flags field of *every* sub-block in
            // this large block, because this is easier than making
            // sure that we always find the block head of a large
            // block whenever we call Bdescr() (eg. evacuate() and
            // isAlive() in the GC would both have to do this, at
            // multiple places).
            {
                bdescr *x;
                for (x = bd; x < bd + blocks; x++) {
                    x->step = cap->r.rNursery;
                    x->gen_no = 0;
                    x->flags = 0;
                }
            }

            // This assert can be a killer if the app is doing lots
            // of large block allocations.
            IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));

            // now update the nursery to point to the new block
            cap->r.rCurrentNursery = bd;

            // we might be unlucky and have another thread get on the
            // run queue before us and steal the large block, but in that
            // case the thread will just end up requesting another large
            // block.
            pushOnRunQueue(cap,t);
            return rtsFalse;  /* not actually GC'ing */
        }
    }
    debugTrace(DEBUG_sched,
               "--<< thread %ld (%s) stopped: HeapOverflow\n",
               (long)t->id, whatNext_strs[t->what_next]);

#if defined(GRAN)
    ASSERT(!is_on_queue(t,CurrentProc));
#elif defined(PARALLEL_HASKELL)
    /* Currently we emit a DESCHEDULE event before GC in GUM.
       ToDo: either add separate event to distinguish SYSTEM time from rest
             or just nuke this DESCHEDULE (and the following SCHEDULE) */
    if (0 && RtsFlags.ParFlags.ParStats.Full) {
        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
        emitSchedule = rtsTrue;
    }
#endif

    pushOnRunQueue(cap,t);
    return rtsTrue;
    /* actual GC is done at the end of the while loop in schedule() */
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadStackOverflow
 * -------------------------------------------------------------------------- */

static void
scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
{
    debugTrace (DEBUG_sched,
                "--<< thread %ld (%s) stopped, StackOverflow",
                (long)t->id, whatNext_strs[t->what_next]);

    /* just adjust the stack for this thread, then pop it back
     * on the run queue.
     */
    {
        /* enlarge the stack */
        StgTSO *new_t = threadStackOverflow(cap, t);

        /* The TSO attached to this Task may have moved, so update the
         * pointer to it.
         */
        if (task->tso == t) {
            task->tso = new_t;
        }
        pushOnRunQueue(cap,new_t);
    }
}
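
/*
 * Note: threadStackOverflow returns a *new* TSO because a thread's
 * stack lives inside its TSO: enlarging the stack means allocating a
 * bigger TSO and copying the old one over. That is why every pointer
 * to the old TSO (such as task->tso above) must be updated afterwards.
 */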
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadYielding
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
{
    // Reset the context switch flag. We don't do this just before
    // running the thread, because that would mean we would lose ticks
    // during GC, which can lead to unfair scheduling (a thread hogs
    // the CPU because the tick always arrives during GC). This way
    // penalises threads that do a lot of allocation, but that seems
    // better than the alternative.
    context_switch = 0;

    /* put the thread back on the run queue. Then, if we're ready to
     * GC, check whether this is the last task to stop. If so, wake
     * up the GC thread. getThread will block during a GC until the
     * GC is finished.
     */

    if (t->what_next != prev_what_next) {
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped to switch evaluators",
                   (long)t->id, whatNext_strs[t->what_next]);
    } else {
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped, yielding",
                   (long)t->id, whatNext_strs[t->what_next]);
    }

    IF_DEBUG(sanity,
             //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
             checkTSO(t));
    ASSERT(t->link == END_TSO_QUEUE);

    // Shortcut if we're just switching evaluators: don't bother
    // doing stack squeezing (which can be expensive), just run the
    // thread.
    if (t->what_next != prev_what_next) {
        return rtsTrue;
    }

#if defined(GRAN)
    ASSERT(!is_on_queue(t,CurrentProc));

    IF_DEBUG(sanity,
             //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
             checkThreadQsSanity(rtsTrue));
#endif

    addToRunQueue(cap,t);

#if defined(GRAN)
    /* add a ContinueThread event to actually process the thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              ContinueThread,
              t, (StgClosure*)NULL, (rtsSpark*)NULL);
    IF_GRAN_DEBUG(bq,
                  debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
                  G_EVENTQ(0);
                  G_CURR_THREADQ(0));
#endif

    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadBlocked
 * -------------------------------------------------------------------------- */

static void
scheduleHandleThreadBlocked( StgTSO *t
#if !defined(GRAN) && !defined(DEBUG)
                             STG_UNUSED
#endif
                           )
{
#if defined(GRAN)
    IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
                        t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
             if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));

    // ??? needed; should emit block before
    IF_DEBUG(gran,
             DumpGranEvent(GR_DESCHEDULE, t));
    prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t

    ASSERT(procStatus[CurrentProc]==Busy ||
           ((procStatus[CurrentProc]==Fetching) &&
            (t->block_info.closure!=(StgClosure*)NULL)));
    if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
        !(!RtsFlags.GranFlags.DoAsyncFetch &&
          procStatus[CurrentProc]==Fetching))
        procStatus[CurrentProc] = Idle;

#elif defined(PAR)
    IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
                        t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
    IF_PAR_DEBUG(bq,
                 if (t->block_info.closure!=(StgClosure*)NULL)
                     print_bq(t->block_info.closure));

    /* Send a fetch (if BlockedOnGA) and dump event to log file */
    blockThread(t);

    /* whatever we schedule next, we must log that schedule */
    emitSchedule = rtsTrue;

#else /* !GRAN */

    // We don't need to do anything. The thread is blocked, and it
    // has tidied up its stack and placed itself on whatever queue
    // it needs to be on.

#if !defined(THREADED_RTS)
    ASSERT(t->why_blocked != NotBlocked);
    // This might not be true under THREADED_RTS: we don't have
    // exclusive access to this TSO, so someone might have
    // woken it up by now. This actually happens: try
    // conc023 +RTS -N2.
#endif

#ifdef DEBUG
    if (traceClass(DEBUG_sched)) {
        debugTraceBegin("--<< thread %lu (%s) stopped: ",
                        (unsigned long)t->id, whatNext_strs[t->what_next]);
        printThreadBlockage(t);
        debugTraceEnd();
    }
#endif
#endif

    /* Only for dumping event to log file
       ToDo: do I need this in GranSim, too?
       blockThread(t);
    */
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadFinished
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
{
    /* Need to check whether this was a main thread, and if so,
     * return with the return value.
     *
     * We also end up here if the thread kills itself with an
     * uncaught exception, see Exception.cmm.
     */
    debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
               (unsigned long)t->id, whatNext_strs[t->what_next]);

#if defined(GRAN)
    endThread(t, CurrentProc); // clean-up the thread
#elif defined(PARALLEL_HASKELL)
    /* For now all are advisory -- HWL */
    //if(t->priority==AdvisoryPriority) ??
    advisory_thread_count--; // JB: Caution with this counter, buggy!

# if defined(DIST)
    if(t->dist.priority==RevalPriority)
        FinishReval(t);
# endif

# if defined(EDENOLD)
    // the thread could still have an outport... (BUG)
    if (t->eden.outport != -1) {
        // delete the outport for the tso which has finished...
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
                                t->eden.outport, t->id));
        deleteOPT(t);
    }
    // thread still in the process (HEAVY BUG! since outport has just been closed...)
    if (t->eden.epid != -1) {
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
                                t->id, t->eden.epid));
        removeTSOfromProcess(t);
    }
# endif

# if defined(PAR)
    if (RtsFlags.ParFlags.ParStats.Full &&
        !RtsFlags.ParFlags.ParStats.Suppressed)
        DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);

    // t->par only contains statistics: left out for now...
    IF_PAR_DEBUG(fish,
                 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
                            t->id,t,t->par.sparkname));
# endif
#endif // PARALLEL_HASKELL
    //
    // Check whether the thread that just completed was a bound
    // thread, and if so return with the result.
    //
    // There is an assumption here that all thread completion goes
    // through this point; we need to make sure that if a thread
    // ends up in the ThreadKilled state, that it stays on the run
    // queue so it can be dealt with here.
    //

    if (t->bound) {

        if (t->bound != task) {
#if !defined(THREADED_RTS)
            // Must be a bound thread that is not the topmost one. Leave
            // it on the run queue until the stack has unwound to the
            // point where we can deal with this. Leaving it on the run
            // queue also ensures that the garbage collector knows about
            // this thread and its return value (it gets dropped from the
            // all_threads list so there's no other way to find it).
            appendToRunQueue(cap,t);
            return rtsFalse;
#else
            // this cannot happen in the threaded RTS, because a
            // bound thread can only be run by the appropriate Task.
            barf("finished bound thread that isn't mine");
#endif
        }

        ASSERT(task->tso == t);

        if (t->what_next == ThreadComplete) {
            if (task->ret) {
                // NOTE: return val is tso->sp[1] (see StgStartup.hc)
                *(task->ret) = (StgClosure *)task->tso->sp[1];
            }
            task->stat = Success;
        } else {
            if (task->ret) {
                *(task->ret) = NULL;
            }
            if (sched_state >= SCHED_INTERRUPTING) {
                task->stat = Interrupted;
            } else {
                task->stat = Killed;
            }
        }
#ifdef DEBUG
        removeThreadLabel((StgWord)task->tso->id);
#endif
        return rtsTrue; // tells schedule() to return
    }

    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Perform a heap census, if PROFILING
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
{
#if defined(PROFILING)
    // When we have +RTS -i0 and we're heap profiling, do a census at
    // every GC. This lets us get repeatable runs for debugging.
    if (performHeapProfile ||
        (RtsFlags.ProfFlags.profileInterval==0 &&
         RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {

        // checking black holes is necessary before GC, otherwise
        // there may be threads that are unreachable except by the
        // blackhole queue, which the GC will consider to be
        // deadlocked.
        scheduleCheckBlackHoles(&MainCapability);

        debugTrace(DEBUG_sched, "garbage collecting before heap census");
        GarbageCollect(GetRoots, rtsTrue);

        debugTrace(DEBUG_sched, "performing heap census");
        heapCensus();

        performHeapProfile = rtsFalse;
        return rtsTrue;  // true <=> we already GC'd
    }
#endif
    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Perform a garbage collection if necessary
 * -------------------------------------------------------------------------- */

static Capability *
scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
              rtsBool force_major, void (*get_roots)(evac_fn))
{
    StgTSO *t;
#ifdef THREADED_RTS
    static volatile StgWord waiting_for_gc;
    rtsBool was_waiting;
    nat i;
#endif

#ifdef THREADED_RTS
    // In order to GC, there must be no threads running Haskell code.
    // Therefore, the GC thread needs to hold *all* the capabilities,
    // and release them after the GC has completed.
    //
    // This seems to be the simplest way: previous attempts involved
    // making all the threads with capabilities give up their
    // capabilities and sleep except for the *last* one, which
    // actually did the GC. But it's quite hard to arrange for all
    // the other tasks to sleep and stay asleep.
    //

    was_waiting = cas(&waiting_for_gc, 0, 1);
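    // (cas returns the old value: 0 means we claimed the right to GC,
    // non-zero means another task beat us to it and we should yield
    // until its GC has finished.)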
    if (was_waiting) {
        do {
            debugTrace(DEBUG_sched, "someone else is trying to GC...");
            if (cap) yieldCapability(&cap,task);
        } while (waiting_for_gc);
        return cap;  // NOTE: task->cap might have changed here
    }
    for (i=0; i < n_capabilities; i++) {
        debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
        if (cap != &capabilities[i]) {
            Capability *pcap = &capabilities[i];
            // we better hope this task doesn't get migrated to
            // another Capability while we're waiting for this one.
            // It won't, because load balancing happens while we have
            // all the Capabilities, but even so it's a slightly
            // unsavoury invariant.
            task->cap = pcap;
            waitForReturnCapability(&pcap, task);
            if (pcap != &capabilities[i]) {
                barf("scheduleDoGC: got the wrong capability");
            }
        }
    }

    waiting_for_gc = rtsFalse;
#endif
    /* Kick any transactions which are invalid back to their
     * atomically frames. When next scheduled they will try to
     * commit, this commit will fail and they will retry.
     */
    {
        StgTSO *next;

        for (t = all_threads; t != END_TSO_QUEUE; t = next) {
            if (t->what_next == ThreadRelocated) {
                next = t->link;
            } else {
                next = t->global_link;

                // This is a good place to check for blocked
                // exceptions. It might be the case that a thread is
                // blocked on delivering an exception to a thread that
                // is also blocked - we try to ensure that this
                // doesn't happen in throwTo(), but it's too hard (or
                // impossible) to close all the race holes, so we
                // accept that some might get through and deal with
                // them here. A GC will always happen at some point,
                // even if the system is otherwise deadlocked.
                maybePerformBlockedException (&capabilities[0], t);

                if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
                    if (!stmValidateNestOfTransactions (t -> trec)) {
                        debugTrace(DEBUG_sched | DEBUG_stm,
                                   "trec %p found wasting its time", t);

                        // strip the stack back to the
                        // ATOMICALLY_FRAME, aborting the (nested)
                        // transaction, and saving the stack of any
                        // partially-evaluated thunks on the heap.
                        throwToSingleThreaded_(&capabilities[0], t,
                                               NULL, rtsTrue, NULL);

                        ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
                    }
                }
            }
        }
    }

    // so this happens periodically:
    if (cap) scheduleCheckBlackHoles(cap);

    IF_DEBUG(scheduler, printAllThreads());
    /*
     * We now have all the capabilities; if we're in an interrupting
     * state, then we should take the opportunity to delete all the
     * threads in the system.
     */
    if (sched_state >= SCHED_INTERRUPTING) {
	deleteAllThreads(&capabilities[0]);
	sched_state = SCHED_SHUTTING_DOWN;
    }

    /* everybody back, start the GC.
     * Could do it in this thread, or signal a condition var
     * to do it in another thread.  Either way, we need to
     * broadcast on gc_pending_cond afterward.
     */
#if defined(THREADED_RTS)
    debugTrace(DEBUG_sched, "doing GC");
#endif
    GarbageCollect(get_roots, force_major);

#if defined(THREADED_RTS)
    // release our stash of capabilities.
    for (i = 0; i < n_capabilities; i++) {
	if (cap != &capabilities[i]) {
	    task->cap = &capabilities[i];
	    releaseCapability(&capabilities[i]);
	}
    }
    if (cap) {
	task->cap = cap;
    } else {
	task->cap = NULL;
    }
#endif

#if defined(GRAN)
    /* add a ContinueThread event to continue execution of current thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
	      ContinueThread,
	      t, (StgClosure*)NULL, (rtsSpark*)NULL);
    IF_GRAN_DEBUG(bq,
	debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n"));
#endif

    return cap;
}
/* ---------------------------------------------------------------------------
 * Singleton fork(). Do not copy any running threads.
 * ------------------------------------------------------------------------- */

pid_t
forkProcess(HsStablePtr *entry
#ifndef FORKPROCESS_PRIMOP_SUPPORTED
	    STG_UNUSED
#endif
	   )
{
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
    Task *task;
    pid_t pid;
    StgTSO *t, *next;
    Capability *cap;

#if defined(THREADED_RTS)
    if (RtsFlags.ParFlags.nNodes > 1) {
	errorBelch("forking not supported with +RTS -N<n> greater than 1");
	stg_exit(EXIT_FAILURE);
    }
#endif

    debugTrace(DEBUG_sched, "forking!");

    // ToDo: for SMP, we should probably acquire *all* the capabilities
    cap = rts_lock();

    pid = fork();

    if (pid) { // parent

	// just return the pid
	rts_unlock(cap);
	return pid;

    } else { // child

	// Now, all OS threads except the thread that forked are
	// stopped.  We need to stop all Haskell threads, including
	// those involved in foreign calls.  Also we need to delete
	// all Tasks, because they correspond to OS threads that are
	// now gone.

	for (t = all_threads; t != END_TSO_QUEUE; t = next) {
	    if (t->what_next == ThreadRelocated) {
		next = t->link;
	    } else {
		next = t->global_link;
		// don't allow threads to catch the ThreadKilled
		// exception, but we do want to raiseAsync() because these
		// threads may be evaluating thunks that we need later.
		deleteThread_(cap,t);
	    }
	}

	// Empty the run queue.  It seems tempting to let all the
	// killed threads stay on the run queue as zombies to be
	// cleaned up later, but some of them correspond to bound
	// threads for which the corresponding Task does not exist.
	cap->run_queue_hd = END_TSO_QUEUE;
	cap->run_queue_tl = END_TSO_QUEUE;

	// Any suspended C-calling Tasks are no more, their OS threads
	// don't exist now:
	cap->suspended_ccalling_tasks = NULL;

	// Empty the all_threads list.  Otherwise, the garbage
	// collector may attempt to resurrect some of these threads.
	all_threads = END_TSO_QUEUE;

	// Wipe the task list, except the current Task.
	ACQUIRE_LOCK(&sched_mutex);
	for (task = all_tasks; task != NULL; task=task->all_link) {
	    if (task != cap->running_task) {
		discardTask(task);
	    }
	}
	RELEASE_LOCK(&sched_mutex);

#if defined(THREADED_RTS)
	// Wipe our spare workers list, they no longer exist.  New
	// workers will be created if necessary.
	cap->spare_workers = NULL;
	cap->returning_tasks_hd = NULL;
	cap->returning_tasks_tl = NULL;
#endif

	cap = rts_evalStableIO(cap, entry, NULL);  // run the action
	rts_checkSchedStatus("forkProcess",cap);

	rts_unlock(cap);
	hs_exit();			// clean up and exit
	stg_exit(EXIT_SUCCESS);
    }
#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
    barf("forkProcess#: primop not supported on this platform, sorry!\n");
    return -1;
#endif
}
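
/* Note: this implements the RTS side of the forkProcess# primop (used,
 * for example, by System.Posix.Process.forkProcess).  Only the calling
 * OS thread survives fork(), which is why the child discards every
 * other Task and TSO above before running 'entry' and exiting. */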
/* ---------------------------------------------------------------------------
 * Delete all the threads in the system
 * ------------------------------------------------------------------------- */

static void
deleteAllThreads ( Capability *cap )
{
    // NOTE: only safe to call if we own all capabilities.

    StgTSO *t, *next;

    debugTrace(DEBUG_sched,"deleting all threads");
    for (t = all_threads; t != END_TSO_QUEUE; t = next) {
	if (t->what_next == ThreadRelocated) {
	    next = t->link;
	} else {
	    next = t->global_link;
	    deleteThread(cap,t);
	}
    }

    // The run queue now contains a bunch of ThreadKilled threads.  We
    // must not throw these away: the main thread(s) will be in there
    // somewhere, and the main scheduler loop has to deal with it.
    // Also, the run queue is the only thing keeping these threads from
    // being GC'd, and we don't want the "main thread has been GC'd" panic.

#if !defined(THREADED_RTS)
    ASSERT(blocked_queue_hd == END_TSO_QUEUE);
    ASSERT(sleeping_queue == END_TSO_QUEUE);
#endif
}
/* -----------------------------------------------------------------------------
   Managing the suspended_ccalling_tasks list.
   Locks required: sched_mutex
   -------------------------------------------------------------------------- */

STATIC_INLINE void
suspendTask (Capability *cap, Task *task)
{
    ASSERT(task->next == NULL && task->prev == NULL);
    task->next = cap->suspended_ccalling_tasks;
    task->prev = NULL;
    if (cap->suspended_ccalling_tasks) {
	cap->suspended_ccalling_tasks->prev = task;
    }
    cap->suspended_ccalling_tasks = task;
}

STATIC_INLINE void
recoverSuspendedTask (Capability *cap, Task *task)
{
    if (task->prev) {
	task->prev->next = task->next;
    } else {
	ASSERT(cap->suspended_ccalling_tasks == task);
	cap->suspended_ccalling_tasks = task->next;
    }
    if (task->next) {
	task->next->prev = task->prev;
    }
    task->next = task->prev = NULL;
}
/* ---------------------------------------------------------------------------
 * Suspending & resuming Haskell threads.
 *
 * When making a "safe" call to C (aka _ccall_GC), the task gives back
 * its capability before calling the C function.  This allows another
 * task to pick up the capability and carry on running Haskell
 * threads.  It also means that if the C call blocks, it won't lock
 * the whole system.
 *
 * The Haskell thread making the C call is put to sleep for the
 * duration of the call, on the suspended_ccalling_tasks list.  We
 * give out a token to the task, which it can use to resume the thread
 * on return from the C function.
 * ------------------------------------------------------------------------- */
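
/* An illustrative sketch (not code from this file): the sequence generated
 * for a safe foreign call is essentially
 *
 *     token = suspendThread(BaseReg);   // give up the Capability
 *     r = some_c_function(args);        // may block; others run Haskell
 *     BaseReg = resumeThread(token);    // wait for a Capability, rejoin
 *
 * where 'some_c_function' stands for whatever C function was imported.
 */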
void *
suspendThread (StgRegTable *reg)
{
    Capability *cap;
    int saved_errno = errno;
    Task *task;
    StgTSO *tso;

    /* assume that *reg is a pointer to the StgRegTable part of a Capability.
     */
    cap = regTableToCapability(reg);

    task = cap->running_task;
    tso = cap->r.rCurrentTSO;

    debugTrace(DEBUG_sched,
	       "thread %lu did a safe foreign call",
	       (unsigned long)cap->r.rCurrentTSO->id);

    // XXX this might not be necessary --SDM
    tso->what_next = ThreadRunGHC;

    threadPaused(cap,tso);

    if ((tso->flags & TSO_BLOCKEX) == 0) {
	tso->why_blocked = BlockedOnCCall;
	tso->flags |= TSO_BLOCKEX;
	tso->flags &= ~TSO_INTERRUPTIBLE;
    } else {
	tso->why_blocked = BlockedOnCCall_NoUnblockExc;
    }

    // Hand back capability
    task->suspended_tso = tso;

    ACQUIRE_LOCK(&cap->lock);

    suspendTask(cap,task);
    cap->in_haskell = rtsFalse;
    releaseCapability_(cap);

    RELEASE_LOCK(&cap->lock);

#if defined(THREADED_RTS)
    /* Preparing to leave the RTS, so ensure there's a native thread/task
       waiting to take over.
    */
    debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
#endif

    errno = saved_errno;
    return task;
}
StgRegTable *
resumeThread (void *task_)
{
    StgTSO *tso;
    Capability *cap;
    int saved_errno = errno;
    Task *task = task_;

    cap = task->cap;
    // Wait for permission to re-enter the RTS with the result.
    waitForReturnCapability(&cap,task);
    // we might be on a different capability now... but if so, our
    // entry on the suspended_ccalling_tasks list will also have been
    // migrated.

    // Remove the thread from the suspended list
    recoverSuspendedTask(cap,task);

    tso = task->suspended_tso;
    task->suspended_tso = NULL;
    tso->link = END_TSO_QUEUE;
    debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);

    if (tso->why_blocked == BlockedOnCCall) {
	awakenBlockedExceptionQueue(cap,tso);
	tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
    }

    /* Reset blocking status */
    tso->why_blocked = NotBlocked;

    cap->r.rCurrentTSO = tso;
    cap->in_haskell = rtsTrue;
    errno = saved_errno;

    /* We might have GC'd, mark the TSO dirty again */
    tso->flags |= TSO_DIRTY;

    IF_DEBUG(sanity, checkTSO(tso));

    return &cap->r;
}
/* ---------------------------------------------------------------------------
 * scheduleThread()
 *
 * scheduleThread puts a thread on the end of the runnable queue.
 * This will usually be done immediately after a thread is created.
 * The caller of scheduleThread must create the thread using e.g.
 * createThread and push an appropriate closure
 * on this thread's stack before the scheduler is invoked.
 * ------------------------------------------------------------------------ */

void
scheduleThread(Capability *cap, StgTSO *tso)
{
    // The thread goes at the *end* of the run-queue, to avoid possible
    // starvation of any threads already on the queue.
    appendToRunQueue(cap,tso);
}
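
/* A hypothetical usage sketch ('createIOThread' is the RTS helper used by
 * rts_evalIO; 'ioAction' is assumed to be a closure of IO type):
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  (StgClosure *)ioAction);
 *     scheduleThread(cap, tso);
 *
 * createIOThread pushes the closure onto the new thread's stack, satisfying
 * the requirement above before the scheduler is invoked. */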
void
scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
{
#if defined(THREADED_RTS)
    tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
			      // move this thread from now on.
    cpu %= RtsFlags.ParFlags.nNodes;
    if (cpu == cap->no) {
	appendToRunQueue(cap,tso);
    } else {
	migrateThreadToCapability_lock(&capabilities[cpu],tso);
    }
#else
    appendToRunQueue(cap,tso);
#endif
}
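
/* For example, with +RTS -N4 (so nNodes == 4), scheduleThreadOn(cap, 6, tso)
 * pins tso to capability 6 % 4 == 2: it is appended directly if we already
 * hold capability 2, and migrated there otherwise. */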
Capability *
scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
{
    Task *task;

    // We already created/initialised the Task
    task = cap->running_task;

    // This TSO is now a bound thread; make the Task and TSO
    // point to each other.
    tso->bound = task;
    task->tso = tso;

    task->ret = ret;
    task->stat = NoStatus;

    appendToRunQueue(cap,tso);

    debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);

#if defined(GRAN)
    /* GranSim specific init */
    CurrentTSO = m->tso; // the TSO to run
    procStatus[MainProc] = Busy; // status of main PE
    CurrentProc = MainProc; // PE to run it on
#endif

    cap = schedule(cap,task);

    ASSERT(task->stat != NoStatus);
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
    return cap;
}
/* ----------------------------------------------------------------------------
 * Starting Tasks
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
static void
workerStart(Task *task)
{
    Capability *cap;

    // See startWorkerTask().
    ACQUIRE_LOCK(&task->lock);
    cap = task->cap;
    RELEASE_LOCK(&task->lock);

    // set the thread-local pointer to the Task:
    taskEnter(task);

    // schedule() runs without a lock.
    cap = schedule(cap,task);

    // On exit from schedule(), we have a Capability.
    releaseCapability(cap);
    workerTaskStop(task);
}
#endif
/* ---------------------------------------------------------------------------
 * initScheduler()
 *
 * Initialise the scheduler.  This resets all the queues - if the
 * queues contained any threads, they'll be garbage collected at the
 * next pass.
 * ------------------------------------------------------------------------ */

void
initScheduler(void)
{
#if defined(GRAN)
    nat i;
    for (i=0; i<MAX_PROC; i++) {
	run_queue_hds[i]      = END_TSO_QUEUE;
	run_queue_tls[i]      = END_TSO_QUEUE;
	blocked_queue_hds[i]  = END_TSO_QUEUE;
	blocked_queue_tls[i]  = END_TSO_QUEUE;
	ccalling_threadss[i]  = END_TSO_QUEUE;
	blackhole_queue[i]    = END_TSO_QUEUE;
	sleeping_queue        = END_TSO_QUEUE;
    }
#elif !defined(THREADED_RTS)
    blocked_queue_hd  = END_TSO_QUEUE;
    blocked_queue_tl  = END_TSO_QUEUE;
    sleeping_queue    = END_TSO_QUEUE;
#endif

    blackhole_queue   = END_TSO_QUEUE;
    all_threads       = END_TSO_QUEUE;

    context_switch = 0;
    sched_state    = SCHED_RUNNING;

#if defined(THREADED_RTS)
    /* Initialise the mutex and condition variables used by
     * the scheduler. */
    initMutex(&sched_mutex);
#endif

    ACQUIRE_LOCK(&sched_mutex);

    /* A capability holds the state a native thread needs in
     * order to execute STG code.  At least one capability is
     * floating around (only THREADED_RTS builds have more than one).
     */
    initCapabilities();

    initTaskManager();

#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
    initSparkPools();
#endif

#if defined(THREADED_RTS)
    /*
     * Eagerly start one worker to run each Capability, except for
     * Capability 0.  The idea is that we're probably going to start a
     * bound thread on Capability 0 pretty soon, so we don't want a
     * worker task hogging it.
     */
    {
	nat i;
	Capability *cap;
	for (i = 1; i < n_capabilities; i++) {
	    cap = &capabilities[i];
	    ACQUIRE_LOCK(&cap->lock);
	    startWorkerTask(cap, workerStart);
	    RELEASE_LOCK(&cap->lock);
	}
    }
#endif

    trace(TRACE_sched, "start: %d capabilities", n_capabilities);

    RELEASE_LOCK(&sched_mutex);
}
void
exitScheduler( void )
{
    Task *task = NULL;

#if defined(THREADED_RTS)
    ACQUIRE_LOCK(&sched_mutex);
    task = newBoundTask();
    RELEASE_LOCK(&sched_mutex);
#endif

    // If we haven't killed all the threads yet, do it now.
    if (sched_state < SCHED_SHUTTING_DOWN) {
	sched_state = SCHED_INTERRUPTING;
	scheduleDoGC(NULL,task,rtsFalse,GetRoots);
    }
    sched_state = SCHED_SHUTTING_DOWN;

#if defined(THREADED_RTS)
    {
	nat i;

	for (i = 0; i < n_capabilities; i++) {
	    shutdownCapability(&capabilities[i], task);
	}
	boundTaskExiting(task);
    }
    closeMutex(&sched_mutex);
#endif
}
/* ---------------------------------------------------------------------------
   Where are the roots that we know about?

	- all the threads on the runnable queue
	- all the threads on the blocked queue
	- all the threads on the sleeping queue
	- all the threads currently executing a _ccall_GC
	- all the "main threads"

   ------------------------------------------------------------------------ */

/* This has to be protected either by the scheduler monitor, or by the
   garbage collection monitor (probably the latter).
*/

void
GetRoots( evac_fn evac )
{
    nat i;
    Capability *cap;
    Task *task;

#if defined(GRAN)
    for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
	if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
	    evac((StgClosure **)&run_queue_hds[i]);
	if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
	    evac((StgClosure **)&run_queue_tls[i]);

	if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
	    evac((StgClosure **)&blocked_queue_hds[i]);
	if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
	    evac((StgClosure **)&blocked_queue_tls[i]);
	if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
	    evac((StgClosure **)&ccalling_threadss[i]);
    }

#else /* !GRAN */
    for (i = 0; i < n_capabilities; i++) {
	cap = &capabilities[i];
	evac((StgClosure **)(void *)&cap->run_queue_hd);
	evac((StgClosure **)(void *)&cap->run_queue_tl);
#if defined(THREADED_RTS)
	evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
	evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
#endif
	for (task = cap->suspended_ccalling_tasks; task != NULL;
	     task=task->next) {
	    debugTrace(DEBUG_sched,
		       "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
	    evac((StgClosure **)(void *)&task->suspended_tso);
	}
    }

#if !defined(THREADED_RTS)
    evac((StgClosure **)(void *)&blocked_queue_hd);
    evac((StgClosure **)(void *)&blocked_queue_tl);
    evac((StgClosure **)(void *)&sleeping_queue);
#endif
#endif /* !GRAN */

    // evac((StgClosure **)&blackhole_queue);

#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
    markSparkQueue(evac);
#endif

#if defined(RTS_USER_SIGNALS)
    // mark the signal handlers (signals should be already blocked)
    markSignalHandlers(evac);
#endif
}
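
/* Note that blackhole_queue is deliberately not a source of roots (hence
 * the commented-out evac above): a thread blocked on a black hole stays
 * alive only if it is reachable some other way, which is what lets the GC
 * identify deadlocked threads so that resurrectThreads() (below) can send
 * them an exception. */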
/* -----------------------------------------------------------------------------
   performGC

   This is the interface to the garbage collector from Haskell land.
   We provide this so that external C code can allocate and garbage
   collect when called from Haskell via _ccall_GC.

   It might be useful to provide an interface whereby the programmer
   can specify more roots (ToDo).

   This needs to be protected by the GC condition variable above.  KH.
   -------------------------------------------------------------------------- */
static void (*extra_roots)(evac_fn);

static void
performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
{
    Task *task;
    // We must grab a new Task here, because the existing Task may be
    // associated with a particular Capability, and chained onto the
    // suspended_ccalling_tasks queue.
    ACQUIRE_LOCK(&sched_mutex);
    task = newBoundTask();
    RELEASE_LOCK(&sched_mutex);
    scheduleDoGC(NULL,task,force_major, get_roots);
    boundTaskExiting(task);
}

void
performGC(void)
{
    performGC_(rtsFalse, GetRoots);
}

void
performMajorGC(void)
{
    performGC_(rtsTrue, GetRoots);
}

static void
AllRoots(evac_fn evac)
{
    GetRoots(evac);		// the scheduler's roots
    extra_roots(evac);		// the user's roots
}

void
performGCWithRoots(void (*get_roots)(evac_fn))
{
    extra_roots = get_roots;
    performGC_(rtsFalse, AllRoots);
}
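
/* A hypothetical usage sketch: client code keeping a closure alive across
 * a programmatic GC by registering it as an extra root ('my_root' and
 * 'myRoots' are made-up names; an evac_fn takes an StgClosure **):
 *
 *     static StgClosure *my_root;
 *     static void myRoots(evac_fn evac) { evac(&my_root); }
 *     ...
 *     performGCWithRoots(myRoots);
 */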
/* -----------------------------------------------------------------------------
   Stack overflow

   If the thread has reached its maximum stack size, then raise the
   StackOverflow exception in the offending thread.  Otherwise
   relocate the TSO into a larger chunk of memory and adjust its stack
   size appropriately.
   -------------------------------------------------------------------------- */

static StgTSO *
threadStackOverflow(Capability *cap, StgTSO *tso)
{
    nat new_stack_size, stack_words;
    lnat new_tso_size;
    StgPtr new_sp;
    StgTSO *dest;

    IF_DEBUG(sanity,checkTSO(tso));

    // don't allow throwTo() to modify the blocked_exceptions queue
    // while we are moving the TSO:
    lockClosure((StgClosure *)tso);

    if (tso->stack_size >= tso->max_stack_size) {

	debugTrace(DEBUG_gc,
		   "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
		   (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);

	IF_DEBUG(gc,
		 /* If we're debugging, just print out the top of the stack */
		 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
						  tso->sp+64)));

	// Send this thread the StackOverflow exception
	unlockTSO(tso);
	throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
	return tso;
    }
    /* Try to double the current stack size.  If that takes us over the
     * maximum stack size for this thread, then use the maximum instead.
     * Finally round up so the TSO ends up as a whole number of blocks.
     */
    new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
    new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
					  TSO_STRUCT_SIZE)/sizeof(W_);
    new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
    new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
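
    /* Worked example (assuming 8-byte words and 4Kbyte blocks): a 1000-word
     * stack doubles to 2000 words; 2000*8 bytes plus the TSO header rounds
     * up to 4 blocks (16384 bytes == 2048 words), and after subtracting
     * TSO_STRUCT_SIZEW the thread gets whatever stack fits in the rest. */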
    debugTrace(DEBUG_sched,
	       "increasing stack size from %ld words to %d.",
	       (long)tso->stack_size, new_stack_size);

    dest = (StgTSO *)allocate(new_tso_size);
    TICK_ALLOC_TSO(new_stack_size,0);

    /* copy the TSO block and the old stack into the new area */
    memcpy(dest,tso,TSO_STRUCT_SIZE);
    stack_words = tso->stack + tso->stack_size - tso->sp;
    new_sp = (P_)dest + new_tso_size - stack_words;
    memcpy(new_sp, tso->sp, stack_words * sizeof(W_));

    /* relocate the stack pointers... */
    dest->sp         = new_sp;
    dest->stack_size = new_stack_size;
    /* Mark the old TSO as relocated.  We have to check for relocated
     * TSOs in the garbage collector and any primops that deal with TSOs.
     *
     * It's important to set the sp value to just beyond the end
     * of the stack, so we don't attempt to scavenge any part of the
     * dead TSO's stack.
     */
    tso->what_next = ThreadRelocated;
    tso->link = dest;
    tso->sp = (P_)&(tso->stack[tso->stack_size]);
    tso->why_blocked = NotBlocked;

    IF_PAR_DEBUG(verbose,
		 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
			    tso->id, tso, tso->stack_size);
		 /* If we're debugging, just print out the top of the stack */
		 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
						  tso->sp+64)));

    unlockTSO(dest);
    unlockTSO(tso);

    IF_DEBUG(sanity,checkTSO(dest));
#if 0
    IF_DEBUG(scheduler,printTSO(dest));
#endif

    return dest;
}
/* ---------------------------------------------------------------------------
   Interrupt execution
   - usually called inside a signal handler so it mustn't do anything fancy.
   ------------------------------------------------------------------------ */

void
interruptStgRts(void)
{
    sched_state = SCHED_INTERRUPTING;
    context_switch = 1;
    wakeUpRts();
}
/* -----------------------------------------------------------------------------
   Wake up the RTS

   This function causes at least one OS thread to wake up and run the
   scheduler loop.  It is invoked when the RTS might be deadlocked, or
   an external event has arrived that may need servicing (eg. a
   keyboard interrupt).

   In the single-threaded RTS we don't do anything here; we only have
   one thread anyway, and the event that caused us to want to wake up
   will have interrupted any blocking system call in progress anyway.
   -------------------------------------------------------------------------- */

void
wakeUpRts(void)
{
#if defined(THREADED_RTS)
#if !defined(mingw32_HOST_OS)
    // This forces the IO Manager thread to wakeup, which will
    // in turn ensure that some OS thread wakes up and runs the
    // scheduler loop, which will cause a GC and deadlock check.
    ioManagerWakeup();
#else
    // On Windows this might be safe enough, because we aren't
    // in a signal handler.  Later we should use the IO Manager,
    // though.
    prodOneCapability();
#endif
#endif
}
/* -----------------------------------------------------------------------------
 * checkBlackHoles()
 *
 * Check the blackhole_queue for threads that can be woken up.  We do
 * this periodically: before every GC, and whenever the run queue is
 * empty.
 *
 * An elegant solution might be to just wake up all the blocked
 * threads with awakenBlockedQueue occasionally: they'll go back to
 * sleep again if the object is still a BLACKHOLE.  Unfortunately this
 * doesn't give us a way to tell whether we've actually managed to
 * wake up any threads, so we would be busy-waiting.
 *
 * -------------------------------------------------------------------------- */
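
/* Concretely: a thread on blackhole_queue is blocked on the closure in
 * t->block_info.closure; once that closure has been updated with a value
 * (so its info type is no longer BLACKHOLE or CAF_BLACKHOLE), the scan
 * below unblocks the thread and unlinks it from the queue. */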
static rtsBool
checkBlackHoles (Capability *cap)
{
    StgTSO **prev, *t;
    rtsBool any_woke_up = rtsFalse;
    StgHalfWord type;

    // blackhole_queue is global:
    ASSERT_LOCK_HELD(&sched_mutex);

    debugTrace(DEBUG_sched, "checking threads blocked on black holes");

    // ASSUMES: sched_mutex
    prev = &blackhole_queue;
    t = blackhole_queue;
    while (t != END_TSO_QUEUE) {
	ASSERT(t->why_blocked == BlockedOnBlackHole);
	type = get_itbl(t->block_info.closure)->type;
	if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
	    IF_DEBUG(sanity,checkTSO(t));
	    t = unblockOne(cap, t);
	    // urk, the threads migrate to the current capability
	    // here, but we'd like to keep them on the original one.
	    *prev = t;
	    any_woke_up = rtsTrue;
	} else {
	    prev = &t->link;
	    t = t->link;
	}
    }

    return any_woke_up;
}
/* -----------------------------------------------------------------------------
   Deleting threads

   This is used for interruption (^C) and forking, and corresponds to
   raising an exception but without letting the thread catch the
   exception.
   -------------------------------------------------------------------------- */

static void
deleteThread (Capability *cap, StgTSO *tso)
{
    // NOTE: must only be called on a TSO that we have exclusive
    // access to, because we will call throwToSingleThreaded() below.
    // The TSO must be on the run queue of the Capability we own, or
    // we must own all Capabilities.

    if (tso->why_blocked != BlockedOnCCall &&
	tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
	throwToSingleThreaded(cap,tso,NULL);
    }
}

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
deleteThread_(Capability *cap, StgTSO *tso)
{ // for forkProcess only:
  // like deleteThread(), but we delete threads in foreign calls, too.

    if (tso->why_blocked == BlockedOnCCall ||
	tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
	unblockOne(cap,tso);
	tso->what_next = ThreadKilled;
    } else {
	deleteThread(cap,tso);
    }
}
#endif
/* -----------------------------------------------------------------------------
   raiseExceptionHelper

   This function is called by the raise# primitive, just so that we can
   move some of the tricky bits of raising an exception from C-- into
   C.  Who knows, it might be a useful re-useable thing here too.
   -------------------------------------------------------------------------- */

StgWord
raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
{
    Capability *cap = regTableToCapability(reg);
    StgThunk *raise_closure = NULL;
    StgPtr p, next;
    StgRetInfoTable *info;
    //
    // This closure represents the expression 'raise# E' where E
    // is the exception raised.  It is used to overwrite all the
    // thunks which are currently under evaluation.
    //

    // OLD COMMENT (we don't have MIN_UPD_SIZE now):
    // LDV profiling: stg_raise_info has THUNK as its closure
    // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
    // payload, MIN_UPD_SIZE is more appropriate than 1.  It seems that
    // 1 does not cause any problem unless profiling is performed.
    // However, when LDV profiling goes on, we need to linearly scan
    // small object pool, where raise_closure is stored, so we should
    // use MIN_UPD_SIZE.
    //
    // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
    //                                 sizeofW(StgClosure)+1);
    //

    //
    // Walk up the stack, looking for the catch frame.  On the way,
    // we update any closures pointed to from update frames with the
    // raise closure that we just built.
    //
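    // For example, for a stack that looks (growing downwards) like
    //
    //     sp -> ... UPDATE_FRAME u1 ... UPDATE_FRAME u2 ... CATCH_FRAME
    //
    // each updatee u_i is overwritten with an indirection to one shared
    // thunk for 'raise# exception', and the walk stops with tso->sp
    // pointing at the CATCH_FRAME.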
    p = tso->sp;
    while (1) {
	info = get_ret_itbl((StgClosure *)p);
	next = p + stack_frame_sizeW((StgClosure *)p);
	switch (info->i.type) {

	case UPDATE_FRAME:
	    // Only create raise_closure if we need to.
	    if (raise_closure == NULL) {
		raise_closure =
		    (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
		SET_HDR(raise_closure, &stg_raise_info, CCCS);
		raise_closure->payload[0] = exception;
	    }
	    UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
	    p = next;
	    continue;

	case ATOMICALLY_FRAME:
	    debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
	    tso->sp = p; return ATOMICALLY_FRAME;

	case CATCH_FRAME:
	    tso->sp = p; return CATCH_FRAME;

	case CATCH_STM_FRAME:
	    debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
	    tso->sp = p; return CATCH_STM_FRAME;

	case STOP_FRAME:
	    tso->sp = p; return STOP_FRAME;

	case CATCH_RETRY_FRAME:
	default:
	    p = next;
	    continue;
	}
    }
}
/* -----------------------------------------------------------------------------
   findRetryFrameHelper

   This function is called by the retry# primitive.  It traverses the stack
   leaving tso->sp referring to the frame which should handle the retry.

   This should either be a CATCH_RETRY_FRAME (if the retry# is within an
   orElse#) or an ATOMICALLY_FRAME (if the retry# reaches the top level).

   We skip CATCH_STM_FRAMEs because retries are not considered to be
   exceptions, despite the similar implementation.

   We should not expect to see CATCH_FRAME or STOP_FRAME because those should
   not be created within memory transactions.
   -------------------------------------------------------------------------- */
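
/* Schematic examples of the stacks this function walks (given the frames
 * the STM primops push):
 *
 *   retry inside 'atomically act':
 *       sp -> ... ATOMICALLY_FRAME            => restart the transaction
 *
 *   retry inside 'act1 `orElse` act2':
 *       sp -> ... CATCH_RETRY_FRAME ... ATOMICALLY_FRAME
 *                                             => run the alternative
 */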
StgWord
findRetryFrameHelper (StgTSO *tso)
{
    StgPtr           p, next;
    StgRetInfoTable *info;

    p = tso->sp;
    while (1) {
	info = get_ret_itbl((StgClosure *)p);
	next = p + stack_frame_sizeW((StgClosure *)p);
	switch (info->i.type) {

	case ATOMICALLY_FRAME:
	    debugTrace(DEBUG_stm,
		       "found ATOMICALLY_FRAME at %p during retry", p);
	    tso->sp = p;
	    return ATOMICALLY_FRAME;

	case CATCH_RETRY_FRAME:
	    debugTrace(DEBUG_stm,
		       "found CATCH_RETRY_FRAME at %p during retry", p);
	    tso->sp = p;
	    return CATCH_RETRY_FRAME;

	case CATCH_STM_FRAME:
	default:
	    ASSERT(info->i.type != CATCH_FRAME);
	    ASSERT(info->i.type != STOP_FRAME);
	    p = next;
	    continue;
	}
    }
}
/* -----------------------------------------------------------------------------
   resurrectThreads is called after garbage collection on the list of
   threads found to be garbage.  Each of these threads will be woken
   up and sent a signal: BlockedOnDeadMVar if the thread was blocked
   on an MVar, or NonTermination if the thread was blocked on a Black
   Hole.

   Locks: assumes we hold *all* the capabilities.
   -------------------------------------------------------------------------- */

void
resurrectThreads (StgTSO *threads)
{
    StgTSO *tso, *next;
    Capability *cap;

    for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
	next = tso->global_link;
	tso->global_link = all_threads;
	all_threads = tso;
	debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);

	// Wake up the thread on the Capability it was last on
	cap = tso->cap;

	switch (tso->why_blocked) {
	case BlockedOnMVar:
	case BlockedOnException:
	    /* Called by GC - sched_mutex lock is currently held. */
	    throwToSingleThreaded(cap, tso,
				  (StgClosure *)BlockedOnDeadMVar_closure);
	    break;
	case BlockedOnBlackHole:
	    throwToSingleThreaded(cap, tso,
				  (StgClosure *)NonTermination_closure);
	    break;
	case BlockedOnSTM:
	    throwToSingleThreaded(cap, tso,
				  (StgClosure *)BlockedIndefinitely_closure);
	    break;
	case NotBlocked:
	    /* This might happen if the thread was blocked on a black hole
	     * belonging to a thread that we've just woken up (raiseAsync
	     * can wake up threads, remember...).
	     */
	    continue;
	default:
	    barf("resurrectThreads: thread blocked in a strange way");
	}
    }
}