/* ---------------------------------------------------------------------------
 * (c) The GHC Team, 1998-2006
 *
 * The scheduler and thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#define KEEP_LOCKCLOSURE
#include "OSThreads.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "RtsSignals.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Proftimer.h"
#if defined(GRAN) || defined(PARALLEL_HASKELL)
# include "GranSimRts.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
#endif
#include "Capability.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "RaiseAsync.h"
#include "ThrIOManager.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
// Turn off inlining when debugging - it obfuscates things
#if defined(DEBUG)
# define STATIC_INLINE static
#else
# define STATIC_INLINE static inline
#endif
/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */

/*
  In GranSim we have a runnable and a blocked queue for each processor.
  In order to minimise code changes, new arrays run_queue_hds/tls
  are created.  run_queue_hd is then a shortcut (macro) for
  run_queue_hds[CurrentProc] (see GranSim.h).
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating).  However, we don't use this list in
   the GranSim-specific code at the moment (so we are only potentially
   cheating). */
#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *blackhole_queue = NULL;

/* The blackhole_queue should be checked for threads to wake up.  See
 * Schedule.h for more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
 */
rtsBool blackholes_need_checking = rtsFalse;
/* flag set by signal handler to precipitate a context switch
 * LOCK: none (just an advisory flag)
 */
int context_switch = 0;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 */
nat recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes once, from false->true)
 */
rtsBool sched_state = SCHED_RUNNING;

/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 * exists - earlier gccs apparently didn't.
 */
StgTSO dummy_tso;

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;
/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if defined(PARALLEL_HASKELL)
rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;
#endif

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif
/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

//
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
static void schedulePreLoop (void);
#if defined(THREADED_RTS)
static void schedulePushWork(Capability *cap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
#if defined(GRAN)
static StgTSO *scheduleProcessEvent(rtsEvent *event);
#endif
#if defined(PARALLEL_HASKELL)
static void scheduleSendPendingMessages(void);
static rtsBool scheduleActivateSpark(void);
static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
#endif
#if defined(PAR) || defined(GRAN)
static void scheduleGranParReport(void);
#endif
static void schedulePostRunThread(void);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task,
                                         StgTSO *t );
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
                                    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                             StgTSO *t );
static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
                                rtsBool force_major);

static rtsBool checkBlackHoles(Capability *cap);

static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif

#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);
#endif
static char *whatNext_strs[] = {
    [0]               = "(unknown)",
    [ThreadRunGHC]    = "ThreadRunGHC",
    [ThreadInterpret] = "ThreadInterpret",
    [ThreadKilled]    = "ThreadKilled",
    [ThreadRelocated] = "ThreadRelocated",
    [ThreadComplete]  = "ThreadComplete"
};
/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */

STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) {
        // this does round-robin scheduling; good for concurrency
        appendToRunQueue(cap,t);
    } else {
        // this does unfair scheduling; good for parallelism
        pushOnRunQueue(cap,t);
    }
#else
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
#endif
}
/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   In a GranSim setup this loop iterates over the global event queue.
   This revolves around the global event queue, which determines what
   to do next.  Therefore, it's more complicated than either the
   concurrent or the parallel (GUM) setup.

   GUM iterates over incoming messages.
   It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
   and sends out a fish whenever it has nothing to do; in-between
   doing the actual reductions (shared code below) it processes the
   incoming messages and deals with delayed operations
   (see PendingFetches).
   This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */
static Capability *
schedule (Capability *initialCapability, Task *task)
{
    StgTSO *t;
    Capability *cap;
    StgThreadReturnCode ret;
#if defined(GRAN)
    rtsEvent *event;
#elif defined(PARALLEL_HASKELL)
    rtsBool receivedFinish = rtsFalse;
    nat tp_size, sp_size; // stats only
#endif
    nat prev_what_next;
    rtsBool ready_to_gc;
#if defined(THREADED_RTS)
    rtsBool first = rtsTrue;
#endif

    cap = initialCapability;

    // Pre-condition: this task owns initialCapability.
    // The sched_mutex is *NOT* held
    // NB. on return, we still hold a capability.

    debugTrace (DEBUG_sched,
                "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
                task, initialCapability);

    // -----------------------------------------------------------
    // Scheduler loop starts here:

#if defined(PARALLEL_HASKELL)
#define TERMINATION_CONDITION        (!receivedFinish)
#elif defined(GRAN)
#define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL)
#else
#define TERMINATION_CONDITION        rtsTrue
#endif

    while (TERMINATION_CONDITION) {
#if defined(GRAN)
        /* Choose the processor with the next event */
        CurrentProc = event->proc;
        CurrentTSO  = event->tso;
#endif

#if defined(THREADED_RTS)
        if (first) {
            // don't yield the first time, we want a chance to run this
            // thread for a bit, even if there are others banging at the
            // door.
            first = rtsFalse;
            ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        } else {
            // Yield the capability to higher-priority tasks if necessary.
            yieldCapability(&cap, task);
        }
#endif

#if defined(THREADED_RTS)
        schedulePushWork(cap,task);
#endif

        // Check whether we have re-entered the RTS from Haskell without
        // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
        // call).
        if (cap->in_haskell) {
            errorBelch("schedule: re-entered unsafely.\n"
                       "   Perhaps a 'foreign import unsafe' should be 'safe'?");
            stg_exit(EXIT_FAILURE);
        }
        // The interruption / shutdown sequence.
        //
        // In order to cleanly shut down the runtime, we want to:
        //   * make sure that all main threads return to their callers
        //     with the state 'Interrupted'.
        //   * clean up all OS threads associated with the runtime
        //   * free all memory etc.
        //
        // So the sequence for ^C goes like this:
        //
        //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
        //     arranges for some Capability to wake up
        //
        //   * all threads in the system are halted, and the zombies are
        //     placed on the run queue for cleaning up.  We acquire all
        //     the capabilities in order to delete the threads, this is
        //     done by scheduleDoGC() for convenience (because GC already
        //     needs to acquire all the capabilities).  We can't kill
        //     threads involved in foreign calls.
        //
        //   * somebody calls shutdownHaskell(), which calls exitScheduler()
        //
        //   * sched_state := SCHED_SHUTTING_DOWN
        //
        //   * all workers exit when the run queue on their capability
        //     drains.  All main threads will also exit when their TSO
        //     reaches the head of the run queue and they can return.
        //
        //   * eventually all Capabilities will shut down, and the RTS can
        //     exit.
        //
        //   * We might be left with threads blocked in foreign calls,
        //     we should really attempt to kill these somehow (TODO);
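        //
        // An illustrative summary of the progression described above
        // (state names are the real ones from this file; the transition
        // annotations paraphrase the text, they are not code):
        //
        //   SCHED_RUNNING
        //     --(^C handler wakes a Capability)-->
        //   SCHED_INTERRUPTING
        //     --(scheduleDoGC() deletes threads; exitScheduler())-->
        //   SCHED_SHUTTING_DOWN
        //     --(run queues drain; workers and main threads return)-->
        //   RTS exit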
        switch (sched_state) {
        case SCHED_RUNNING:
            break;
        case SCHED_INTERRUPTING:
            debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
#if defined(THREADED_RTS)
            discardSparksCap(cap);
#endif
            /* scheduleDoGC() deletes all the threads */
            cap = scheduleDoGC(cap,task,rtsFalse);
            break;
        case SCHED_SHUTTING_DOWN:
            debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
            // If we are a worker, just exit.  If we're a bound thread
            // then we will exit below when we've removed our TSO from
            // the run queue.
            if (task->tso == NULL && emptyRunQueue(cap)) {
                return cap;
            }
            break;
        default:
            barf("sched_state: %d", sched_state);
        }
#if defined(THREADED_RTS)
        // If the run queue is empty, take a spark and turn it into a thread.
        {
            if (emptyRunQueue(cap)) {
                StgClosure *spark;
                spark = findSpark(cap);
                if (spark != NULL) {
                    debugTrace(DEBUG_sched,
                               "turning spark of closure %p into a thread",
                               (StgClosure *)spark);
                    createSparkThread(cap,spark);
                }
            }
        }
#endif // THREADED_RTS
        scheduleStartSignalHandlers(cap);

        // Only check the black holes here if we've nothing else to do.
        // During normal execution, the black hole list only gets checked
        // at GC time, to avoid repeatedly traversing this possibly long
        // list each time around the scheduler.
        if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

        scheduleCheckWakeupThreads(cap);

        scheduleCheckBlockedThreads(cap);

        scheduleDetectDeadlock(cap,task);
#if defined(THREADED_RTS)
        cap = task->cap;    // reload cap, it might have changed
#endif

        // Normally, the only way we can get here with no threads to
        // run is if a keyboard interrupt was received during
        // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
        // Additionally, it is not fatal for the threaded RTS to reach
        // here with no threads to run.
        //
        // win32: might be here due to awaitEvent() being abandoned
        // as a result of a console event having been delivered.
        if ( emptyRunQueue(cap) ) {
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
            ASSERT(sched_state >= SCHED_INTERRUPTING);
#endif
            continue; // nothing to do
        }
#if defined(PARALLEL_HASKELL)
        scheduleSendPendingMessages();
        if (emptyRunQueue(cap) && scheduleActivateSpark())
            continue;

        ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!

        /* If we still have no work we need to send a FISH to get a spark
           from another PE */
        if (emptyRunQueue(cap)) {
            if (!scheduleGetRemoteWork(&receivedFinish)) continue;
            ASSERT(rtsFalse); // should not happen at the moment
        }
        // from here: non-empty run queue.
        //  TODO: merge above case with this, only one call processMessages() !
        if (PacketsWaiting()) {  /* process incoming messages, if
                                    any pending...  only in else
                                    because getRemoteWork waits for
                                    messages as well */
            receivedFinish = processMessages();
        }
#endif

#if defined(GRAN)
        scheduleProcessEvent(event);
#endif
        //
        // Get a thread to run
        //
        t = popRunQueue(cap);

#if defined(GRAN) || defined(PAR)
        scheduleGranParReport(); // some kind of debugging output
#else
        // Sanity check the thread we're about to run.  This can be
        // expensive if there is lots of thread switching going on...
        IF_DEBUG(sanity,checkTSO(t));
#endif
#if defined(THREADED_RTS)
        // Check whether we can run this thread in the current task.
        // If not, we have to pass our capability to the right task.
        {
            Task *bound = t->bound;

            if (bound) {
                if (bound == task) {
                    debugTrace(DEBUG_sched,
                               "### Running thread %lu in bound thread", (unsigned long)t->id);
                    // yes, the Haskell thread is bound to the current native thread
                } else {
                    debugTrace(DEBUG_sched,
                               "### thread %lu bound to another OS thread", (unsigned long)t->id);
                    // no, bound to a different Haskell thread: pass to that thread
                    pushOnRunQueue(cap,t);
                    continue;
                }
            } else {
                // The thread we want to run is unbound.
                if (task->tso) {
                    debugTrace(DEBUG_sched,
                               "### this OS thread cannot run thread %lu", (unsigned long)t->id);
                    // no, the current native thread is bound to a different
                    // Haskell thread, so pass it to any worker thread
                    pushOnRunQueue(cap,t);
                    continue;
                }
            }
        }
#endif

        cap->r.rCurrentTSO = t;
        /* context switches are initiated by the timer signal, unless
         * the user specified "context switch as often as possible", with
         * +RTS -C0
         */
        if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
            && !emptyThreadQueues(cap)) {
            context_switch = 1;
        }

run_thread:

        debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
                   (long)t->id, whatNext_strs[t->what_next]);

        startHeapProfTimer();

        // Check for exceptions blocked on this thread
        maybePerformBlockedException (cap, t);
        // ----------------------------------------------------------------------
        // Run the current thread

        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        ASSERT(t->cap == cap);

        prev_what_next = t->what_next;

        errno = t->saved_errno;
#if defined(mingw32_HOST_OS)
        SetLastError(t->saved_winerror);
#endif

        cap->in_haskell = rtsTrue;

#if defined(THREADED_RTS)
        if (recent_activity == ACTIVITY_DONE_GC) {
            // ACTIVITY_DONE_GC means we turned off the timer signal to
            // conserve power (see #1623).  Re-enable it here.
            nat prev;
            prev = xchg((P_)&recent_activity, ACTIVITY_YES);
            if (prev == ACTIVITY_DONE_GC) {
                startTimer();
            }
        } else {
            recent_activity = ACTIVITY_YES;
        }
#endif

        switch (prev_what_next) {

        case ThreadKilled:
        case ThreadComplete:
            /* Thread already finished, return to scheduler. */
            ret = ThreadFinished;
            break;

        case ThreadRunGHC:
        {
            StgRegTable *r;
            r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
            cap = regTableToCapability(r);
            ret = r->rRet;
            break;
        }

        case ThreadInterpret:
            cap = interpretBCO(cap);
            ret = cap->r.rRet;
            break;

        default:
            barf("schedule: invalid what_next field");
        }
        cap->in_haskell = rtsFalse;

        // The TSO might have moved, eg. if it re-entered the RTS and a GC
        // happened.  So find the new location:
        t = cap->r.rCurrentTSO;

        // We have run some Haskell code: there might be blackhole-blocked
        // threads to wake up now.
        // Lock-free test here should be ok, we're just setting a flag.
        if ( blackhole_queue != END_TSO_QUEUE ) {
            blackholes_need_checking = rtsTrue;
        }

        // And save the current errno in this thread.
        // XXX: possibly bogus for SMP because this thread might already
        // be running again, see code below.
        t->saved_errno = errno;
#if defined(mingw32_HOST_OS)
        // Similarly for Windows error code
        t->saved_winerror = GetLastError();
#endif
#if defined(THREADED_RTS)
        // If ret is ThreadBlocked, and this Task is bound to the TSO that
        // blocked, we are in limbo - the TSO is now owned by whatever it
        // is blocked on, and may in fact already have been woken up,
        // perhaps even on a different Capability.  It may be the case
        // that task->cap != cap.  We better yield this Capability
        // immediately and return to normality.
        if (ret == ThreadBlocked) {
            debugTrace(DEBUG_sched,
                       "--<< thread %lu (%s) stopped: blocked",
                       (unsigned long)t->id, whatNext_strs[t->what_next]);
            continue;
        }
#endif

        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        ASSERT(t->cap == cap);
        // ----------------------------------------------------------------------

        // Costs for the scheduler are assigned to CCS_SYSTEM
        stopHeapProfTimer();
#if defined(PROFILING)
        CCCS = CCS_SYSTEM;
#endif

        schedulePostRunThread();

        t = threadStackUnderflow(task,t);

        ready_to_gc = rtsFalse;

        switch (ret) {
        case HeapOverflow:
            ready_to_gc = scheduleHandleHeapOverflow(cap,t);
            break;

        case StackOverflow:
            scheduleHandleStackOverflow(cap,task,t);
            break;

        case ThreadYielding:
            if (scheduleHandleYield(cap, t, prev_what_next)) {
                // shortcut for switching between compiler/interpreter:
                goto run_thread;
            }
            break;

        case ThreadBlocked:
            scheduleHandleThreadBlocked(t);
            break;

        case ThreadFinished:
            if (scheduleHandleThreadFinished(cap, task, t)) return cap;
            ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
            break;

        default:
            barf("schedule: invalid thread return code %d", (int)ret);
        }

        if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
            cap = scheduleDoGC(cap,task,rtsFalse);
        }
    } /* end of while() */
}
/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
#if defined(GRAN)
    /* set up first event to get things going */
    /* ToDo: assign costs for system setup and init MainTSO ! */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              ContinueThread,
              CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

    debugTrace (DEBUG_gran,
                "GRAN: Init CurrentTSO (in schedule) = %p",
                CurrentTSO);
    IF_DEBUG(gran, G_TSO(CurrentTSO, 5));

    if (RtsFlags.GranFlags.Light) {
        /* Save current time; GranSim Light only */
        CurrentTSO->gran.clock = CurrentTime[CurrentProc];
    }
#endif
}
/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
static void
schedulePushWork(Capability *cap USED_IF_THREADS,
                 Task *task      USED_IF_THREADS)
{
    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // migration can be turned off with +RTS -qg
    if (!RtsFlags.ParFlags.migrate) return;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
        && sparkPoolSizeCap(cap) < 2) {
        return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0) || cap0->returning_tasks_hd != NULL) {
                // it already has some work, we just grabbed it at
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
            } else {
                free_caps[n_free_caps++] = cap0;
            }
        }
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;
        rtsBool pushed_to_all;

        debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);

        i = 0;
        pushed_to_all = rtsFalse;

        if (cap->run_queue_hd != END_TSO_QUEUE) {
            prev = cap->run_queue_hd;
            t = prev->_link;
            prev->_link = END_TSO_QUEUE;
            for (; t != END_TSO_QUEUE; t = next) {
                next = t->_link;
                t->_link = END_TSO_QUEUE;
                if (t->what_next == ThreadRelocated
                    || t->bound == task   // don't move my bound thread
                    || tsoLocked(t)) {    // don't move a locked thread
                    setTSOLink(cap, prev, t);
                    prev = t;
                } else if (i == n_free_caps) {
                    pushed_to_all = rtsTrue;
                    i = 0;
                    // keep one for us:
                    setTSOLink(cap, prev, t);
                    prev = t;
                } else {
                    debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
                    appendToRunQueue(free_caps[i],t);
                    if (t->bound) { t->bound->cap = free_caps[i]; }
                    t->cap = free_caps[i];
                    i++;
                }
            }
            cap->run_queue_tl = prev;
        }

        // If there are some free capabilities that we didn't push any
        // threads to, then try to push a spark to each one.
        if (!pushed_to_all) {
            StgClosure *spark;
            // i is the next free capability to push to
            for (; i < n_free_caps; i++) {
                if (emptySparkPoolCap(free_caps[i])) {
                    spark = findSpark(cap);
                    if (spark != NULL) {
                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
                        newSpark(&(free_caps[i]->r), spark);
                    }
                }
            }
        }

        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseCapability(free_caps[i]);
        }
    }
    task->cap = cap; // reset to point to our Capability.
}
#endif
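
/* A worked example of the sharing policy above (numbers hypothetical):
 * with 9 unlocked, unbound threads on our run queue and n_free_caps = 2,
 * the loop deals threads out in turn, keeping every third one for
 * ourselves once i wraps around (at which point pushed_to_all becomes
 * rtsTrue), so each of the three Capabilities ends up with roughly three
 * threads.  Bound, locked, and relocated threads are skipped and stay on
 * our own queue. */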
/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
        // safe outside the lock
        startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif
/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
        awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
    }
#endif
}
/* ----------------------------------------------------------------------------
 * Check for threads woken up by other Capabilities
 * ------------------------------------------------------------------------- */

static void
scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    // Any threads that were woken up by other Capabilities get
    // appended to our run queue.
    if (!emptyWakeupQueue(cap)) {
        ACQUIRE_LOCK(&cap->lock);
        if (emptyRunQueue(cap)) {
            cap->run_queue_hd = cap->wakeup_queue_hd;
            cap->run_queue_tl = cap->wakeup_queue_tl;
        } else {
            setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
            cap->run_queue_tl = cap->wakeup_queue_tl;
        }
        cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
        RELEASE_LOCK(&cap->lock);
    }
#endif
}
/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlackHoles (Capability *cap)
{
    if ( blackholes_need_checking ) // check without the lock first
    {
        ACQUIRE_LOCK(&sched_mutex);
        if ( blackholes_need_checking ) {
            checkBlackHoles(cap);
            blackholes_need_checking = rtsFalse;
        }
        RELEASE_LOCK(&sched_mutex);
    }
}
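
/* The shape of the check above is a common double-checked pattern: an
 * unlocked peek at an advisory flag, then a re-check under sched_mutex.
 * A minimal generic sketch of the same idea (flag, mutex, and doWork are
 * placeholders, not RTS names):
 */
#if 0
if (flag) {                   // cheap racy peek; a missed update is fine
    ACQUIRE_LOCK(&mutex);     // because the flag is only advisory and is
    if (flag) {               // re-set on the next pass round the loop
        doWork();
        flag = rtsFalse;
    }
    RELEASE_LOCK(&mutex);
}
#endif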
/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{
#if defined(PARALLEL_HASKELL)
    // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
    return;
#endif

    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
        /*
         * In the threaded RTS, we only check for deadlock if there
         * has been no activity in a complete timeslice.  This means
         * we won't eagerly start a full GC just because we don't have
         * any threads to run currently.
         */
        if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

        debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // woken up and checked for the deadlock condition below.
        cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);

        recent_activity = ACTIVITY_DONE_GC;
        // disable timer signals (see #1623)
        stopTimer();

        if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
         * deadlock.
         */
        if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
            debugTrace(DEBUG_sched,
                       "still deadlocked, waiting for signals...");

            awaitUserSignals();

            if (signals_pending()) {
                startSignalHandlers(cap);
            }

            // either we have threads to run, or we were interrupted:
            ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
        }
#endif

#if !defined(THREADED_RTS)
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception.
         */
        if (task->tso) {
            switch (task->tso->why_blocked) {
            case BlockedOnSTM:
            case BlockedOnBlackHole:
            case BlockedOnException:
            case BlockedOnMVar:
                throwToSingleThreaded(cap, task->tso,
                                      (StgClosure *)NonTermination_closure);
                return;
            default:
                barf("deadlock: main thread blocked in a strange way");
            }
        }
#endif
    }
}
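
/* Concretely (an assumed example, not from this file): in a non-threaded
 * program whose main thread does a takeMVar on an MVar that no other
 * thread can ever fill, the code above first forces a major GC; if the
 * MVar is unreachable the blocked thread is resurrected and sent an
 * exception by the GC, and otherwise the switch above throws it
 * NonTermination rather than letting the process hang silently. */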
/* ----------------------------------------------------------------------------
 * Process an event (GRAN only)
 * ------------------------------------------------------------------------- */

#if defined(GRAN)
static StgTSO *
scheduleProcessEvent(rtsEvent *event)
{
    StgTSO *t;

    if (RtsFlags.GranFlags.Light)
        GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
        CurrentTime[CurrentProc] = event->time;

    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
        handleIdlePEs();

    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
    /* Should just be continuing execution */
    case ContinueThread:
        IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
        /* ToDo: check assertion
           ASSERT(run_queue_hd != (StgTSO*)NULL &&
                  run_queue_hd != END_TSO_QUEUE);
        */
        /* Ignore ContinueThreads for fetching threads (if synchr comm) */
        if (!RtsFlags.GranFlags.DoAsyncFetch &&
            procStatus[CurrentProc]==Fetching) {
            debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            goto next_thread;
        }
        /* Ignore ContinueThreads for completed threads */
        if (CurrentTSO->what_next == ThreadComplete) {
            debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            goto next_thread;
        }
        /* Ignore ContinueThreads for threads that are being migrated */
        if (PROCS(CurrentTSO)==Nowhere) {
            debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            goto next_thread;
        }
        /* The thread should be at the beginning of the run queue */
        if (CurrentTSO!=run_queue_hds[CurrentProc]) {
            debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            break; // run the thread anyway
        }
        /*
        new_event(proc, proc, CurrentTime[proc],
                  FindWork,
                  (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
        goto next_thread;
        */ /* Catches superfluous CONTINUEs -- should be unnecessary */
        break; // now actually run the thread; DaH Qu'vam yImuHbej

    case FetchNode:
        do_the_fetchnode(event);
        goto next_thread;           /* handle next event in event queue  */

    case GlobalBlock:
        do_the_globalblock(event);
        goto next_thread;           /* handle next event in event queue  */

    case FetchReply:
        do_the_fetchreply(event);
        goto next_thread;           /* handle next event in event queue  */

    case UnblockThread:   /* Move from the blocked queue to the tail of */
        do_the_unblock(event);
        goto next_thread;           /* handle next event in event queue  */

    case ResumeThread:   /* Move from the blocked queue to the tail of */
        /* the runnable queue ( i.e. Qu' SImqa'lu') */
        event->tso->gran.blocktime +=
            CurrentTime[CurrentProc] - event->tso->gran.blockedat;
        do_the_startthread(event);
        goto next_thread;           /* handle next event in event queue  */

    case StartThread:
        do_the_startthread(event);
        goto next_thread;           /* handle next event in event queue  */

    case MoveThread:
        do_the_movethread(event);
        goto next_thread;           /* handle next event in event queue  */

    case MoveSpark:
        do_the_movespark(event);
        goto next_thread;           /* handle next event in event queue  */

    case FindWork:
        do_the_findwork(event);
        goto next_thread;           /* handle next event in event queue  */

    default:
        barf("Illegal event type %u\n", event->evttype);
    } /* switch */

    /* This point was scheduler_loop in the old RTS */

    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
                              TimeOfNextEvent));

    if (RtsFlags.GranFlags.Light)
        GranSimLight_leave_system(event, &ActiveTSO);

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    IF_DEBUG(gran,
             debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */
    t = CurrentTSO;
    /* Take a thread from the run queue. */
    POP_RUN_QUEUE(t); // take_off_run_queue(t);

    IF_DEBUG(gran,
             debugBelch("GRAN: About to run current thread, which is\n");
             G_TSO(t,5));

    context_switch = 0; // turned on via GranYield, checking events and time slice

    IF_DEBUG(gran,
             DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;

    return t;
}
#endif // GRAN
/* ----------------------------------------------------------------------------
 * Send pending messages (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static void
scheduleSendPendingMessages(void)
{
# if defined(PAR) // global Mem.Mgmt., omit for now
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }
# endif

    if (RtsFlags.ParFlags.BufferTime) {
        // if we use message buffering, we must send away all message
        // packets which have become too old...
        sendOldBuffers();
    }
}
#endif
/* ----------------------------------------------------------------------------
 * Activate spark threads (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static rtsBool
scheduleActivateSpark(void)
{
#if defined(SPARKS)
    ASSERT(emptyRunQueue());
    /* We get here if the run queue is empty and want some work.
       We try to turn a spark into a thread, and add it to the run queue,
       from where it will be picked up in the next iteration of the scheduler
       loop.
    */

    /* :-[  no local threads => look out for local sparks */
    /* the spark pool for the current PE */
    pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
    if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
        pool->hd < pool->tl) {
        /*
         * ToDo: add GC code check that we really have enough heap afterwards!!
         *
         * If we're here (no runnable threads) and we have pending
         * sparks, we must have a space problem.  Get enough space
         * to turn one of those pending sparks into a
         * thread...
         */

        spark = findSpark(rtsFalse);                /* get a spark */
        if (spark != (rtsSpark) NULL) {
            tso = createThreadFromSpark(spark);     /* turn the spark into a thread */
            IF_PAR_DEBUG(fish, // schedule,
                         debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
                                    tso->id, tso, advisory_thread_count));

            if (tso==END_TSO_QUEUE) { /* failed to activate spark -> back to loop */
                IF_PAR_DEBUG(fish, // schedule,
                             debugBelch("==^^ failed to create thread from spark @ %lx\n",
                                        spark));
                return rtsFalse; /* failed to generate a thread */
            }                    /* otherwise fall through & pick up new tso */
        } else {
            IF_PAR_DEBUG(fish, // schedule,
                         debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
                                    spark_queue_len(pool)));
            return rtsFalse;     /* failed to generate a thread */
        }
        return rtsTrue;          /* success in generating a thread */
    } else {                     /* no more threads permitted or pool empty */
        return rtsFalse;         /* failed to generateThread */
    }
#else
    tso = NULL; // avoid compiler warning only
    return rtsFalse; /* dummy in non-PAR setup */
#endif // SPARKS
}
#endif // PARALLEL_HASKELL
/* ----------------------------------------------------------------------------
 * Get work from a remote node (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static rtsBool
scheduleGetRemoteWork(rtsBool *receivedFinish)
{
    ASSERT(emptyRunQueue());

    if (RtsFlags.ParFlags.BufferTime) {
        IF_PAR_DEBUG(verbose,
                     debugBelch("...send all pending data,"));
        {
            nat i;
            for (i=1; i<=nPEs; i++)
                sendImmediately(i); // send all messages away immediately
        }
    }

# ifndef SPARKS
    //++EDEN++ idle() , i.e. send all buffers, wait for work
    // suppress fishing in EDEN... just look for incoming messages
    // (blocking receive)
    IF_PAR_DEBUG(verbose,
                 debugBelch("...wait for incoming messages...\n"));
    *receivedFinish = processMessages(); // blocking receive...

    // and reenter scheduling loop after having received something
    // (return rtsFalse below)

# else /* activate SPARKS machinery */
    /* We get here, if we have no work, tried to activate a local spark, but still
       have no work.  We try to get a remote spark, by sending a FISH message.
       Thread migration should be added here, and triggered when a sequence of
       fishes returns without work. */
    delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);

    /* =8-[  no local sparks => look for work on other PEs */
    /*
     * We really have absolutely no work.  Send out a fish
     * (there may be some out there already), and wait for
     * something to arrive.  We clearly can't run any threads
     * until a SCHEDULE or RESUME arrives, and so that's what
     * we're hoping to see.  (Of course, we still have to
     * respond to other types of messages.)
     */
    rtsTime now = msTime() /*CURRENT_TIME*/;
    IF_PAR_DEBUG(verbose,
                 debugBelch("--  now=%ld\n", now));
    IF_PAR_DEBUG(fish, // verbose,
                 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
                     (last_fish_arrived_at!=0 &&
                      last_fish_arrived_at+delay > now)) {
                     debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
                                now, last_fish_arrived_at+delay,
                                last_fish_arrived_at,
                                delay);
                 });

    if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
        advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
        if (last_fish_arrived_at==0 ||
            (last_fish_arrived_at+delay <= now)) { // send FISH now!
            /* outstandingFishes is set in sendFish, processFish;
               avoid flooding system with fishes via delay */
            next_fish_to_send_at = 0;
        } else {
            /* ToDo: this should be done in the main scheduling loop to avoid the
               busy wait here; not so bad if fish delay is very small */
            int iq = 0; // DEBUGGING -- HWL
            next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
            /* send a fish when ready, but process messages that arrive in the meantime */
            do {
                if (PacketsWaiting()) {
                    iq++; // count message-processing rounds while waiting
                    *receivedFinish = processMessages();
                }
                now = msTime();
            } while (!*receivedFinish || now<next_fish_to_send_at);
            // JB: This means the fish could become obsolete, if we receive
            // work. Better check for work again?
            // last line: while (!receivedFinish || !haveWork || now<...)
            // next line: if (receivedFinish || haveWork )

            if (*receivedFinish) // no need to send a FISH if we are finishing anyway
                return rtsFalse; // NB: this will leave scheduler loop
                                 // immediately after return!

            IF_PAR_DEBUG(fish, // verbose,
                         debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
        }

        // JB: IMHO, this should all be hidden inside sendFish(...)
        pe = choosePE();
        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
                 NEW_FISH_HUNGER);

        // Global statistics: count no. of fishes
        if (RtsFlags.ParFlags.ParStats.Global &&
            RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
            globalParStats.tot_fish_mess++;
        }

        /* delayed fishes must have been sent by now! */
        next_fish_to_send_at = 0;
    }

    *receivedFinish = processMessages();
# endif /* SPARKS */

    return rtsFalse;
    /* NB: this function always returns rtsFalse, meaning the scheduler
       loop continues with the next iteration;
       rationale:
         return code means success in finding work; we enter this function
         if there is no local work, thus have to send a fish which takes
         time until it arrives with work; in the meantime we should process
         messages in the main loop;
    */
}
#endif // PARALLEL_HASKELL
/* ----------------------------------------------------------------------------
 * PAR/GRAN: Report stats & debugging info(?)
 * ------------------------------------------------------------------------- */

#if defined(PAR) || defined(GRAN)
static void
scheduleGranParReport(void)
{
    ASSERT(run_queue_hd != END_TSO_QUEUE);

    /* Take a thread from the run queue, if we have work */
    POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);

    /* If this TSO has got its outport closed in the meantime,
     *   it mustn't be run. Instead, we have to clean it up as if it was finished.
     * It has to be marked as TH_DEAD for this purpose.
     * If it is TH_TERM instead, it is supposed to have finished in the normal way.

       JB: TODO: investigate whether state change field could be nuked
           entirely and replaced by the normal tso state (whatnext
           field).  All we want to do is to kill tsos from outside.
     */

    /* ToDo: write something to the log-file
       if (RTSflags.ParFlags.granSimStats && !sameThread)
           DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
    */

    /* the spark pool for the current PE */
    pool = &(cap.r.rSparks); //  cap = (old) MainCap

#if defined(GRAN)
    IF_DEBUG(gran,
             debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
                        run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
#elif defined(PAR)
    IF_PAR_DEBUG(fish,
                 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
                            run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

    if (RtsFlags.ParFlags.ParStats.Full &&
        (t->par.sparkname != (StgInt)0) && // only log spark generated threads
        (emitSchedule || // forced emit
         (t && LastTSO && t->id != LastTSO->id))) {
        /*
          we are running a different TSO, so write a schedule event to log file
          NB: If we use fair scheduling we also have to write a deschedule
              event for LastTSO; with unfair scheduling we know that the
              previous tso has blocked whenever we switch to another tso, so
              we don't need it in GUM for now
        */
        IF_PAR_DEBUG(fish, // schedule,
                     debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));

        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
        emitSchedule = rtsFalse;
    }
#endif
}
#endif
/* ----------------------------------------------------------------------------
 * After running a thread...
 * ------------------------------------------------------------------------- */

static void
schedulePostRunThread(void)
{
#if defined(PAR)
    /* HACK 675: if the last thread didn't yield, make sure to print a
       SCHEDULE event to the log file when StgRunning the next thread, even
       if it is the same one as before */
    LastTSO = t;
    TimeOfLastYield = CURRENT_TIME;
#endif

    /* some statistics gathering in the parallel case */

#if defined(GRAN) || defined(PAR) || defined(EDEN)
    switch (ret) {
    case HeapOverflow:
# if defined(GRAN)
        IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_heapover++;
# elif defined(PAR)
        globalParStats.tot_heapover++;
# endif
        break;

    case StackOverflow:
# if defined(GRAN)
        IF_DEBUG(gran,
                 DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_stackover++;
# elif defined(PAR)
        // IF_DEBUG(par,
        //          DumpGranEvent(GR_DESCHEDULE, t);
        globalParStats.tot_stackover++;
# endif
        break;

    case ThreadYielding:
# if defined(GRAN)
        IF_DEBUG(gran,
                 DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_yields++;
# elif defined(PAR)
        // IF_DEBUG(par,
        //          DumpGranEvent(GR_DESCHEDULE, t);
        globalParStats.tot_yields++;
# endif
        break;

    case ThreadBlocked:
# if defined(GRAN)
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
                   t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
                   (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
        if (t->block_info.closure!=(StgClosure*)NULL)
            print_bq(t->block_info.closure);

        /* ??? needed; should emit block before */
        IF_DEBUG(gran,
                 DumpGranEvent(GR_DESCHEDULE, t));
        prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t

        ASSERT(procStatus[CurrentProc]==Busy ||
               ((procStatus[CurrentProc]==Fetching) &&
                (t->block_info.closure!=(StgClosure*)NULL)));
        if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
            !(!RtsFlags.GranFlags.DoAsyncFetch &&
              procStatus[CurrentProc]==Fetching))
            procStatus[CurrentProc] = Idle;
# elif defined(PAR)
        //++PAR++  blockThread() writes the event (change?)
# endif
        break;

    case ThreadFinished:
        break;

    default:
        barf("parGlobalStats: unknown return code");
    }
#endif
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
{
    // did the task ask for a large block?
    if (cap->r.rHpAlloc > BLOCK_SIZE) {
        // if so, get one and push it on the front of the nursery.
        bdescr *bd;
        lnat blocks;

        blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;

        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
                   (long)t->id, whatNext_strs[t->what_next], blocks);

        // don't do this if the nursery is (nearly) full, we'll GC first.
        if (cap->r.rCurrentNursery->link != NULL ||
            cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
                                               // if the nursery has only one block.

            bd = allocGroup( blocks );
            cap->r.rNursery->n_blocks += blocks;

            // link the new group into the list
            bd->link = cap->r.rCurrentNursery;
            bd->u.back = cap->r.rCurrentNursery->u.back;
            if (cap->r.rCurrentNursery->u.back != NULL) {
                cap->r.rCurrentNursery->u.back->link = bd;
            } else {
#if !defined(THREADED_RTS)
                ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
                       g0s0 == cap->r.rNursery);
#endif
                cap->r.rNursery->blocks = bd;
            }
            cap->r.rCurrentNursery->u.back = bd;

            // initialise it as a nursery block.  We initialise the
            // step, gen_no, and flags field of *every* sub-block in
            // this large block, because this is easier than making
            // sure that we always find the block head of a large
            // block whenever we call Bdescr() (eg. evacuate() and
            // isAlive() in the GC would both have to do this, at
            // least).
            {
                bdescr *x;
                for (x = bd; x < bd + blocks; x++) {
                    x->step = cap->r.rNursery;
                    x->gen_no = 0;
                    x->flags = 0;
                }
            }

            // This assert can be a killer if the app is doing lots
            // of large block allocations.
            IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));

            // now update the nursery to point to the new block
            cap->r.rCurrentNursery = bd;

            // we might be unlucky and have another thread get on the
            // run queue before us and steal the large block, but in that
            // case the thread will just end up requesting another large
            // block.
            pushOnRunQueue(cap,t);
            return rtsFalse;  /* not actually GC'ing */
        }
    }

    debugTrace(DEBUG_sched,
               "--<< thread %ld (%s) stopped: HeapOverflow\n",
               (long)t->id, whatNext_strs[t->what_next]);

#if defined(GRAN)
    ASSERT(!is_on_queue(t,CurrentProc));
#elif defined(PARALLEL_HASKELL)
    /* Currently we emit a DESCHEDULE event before GC in GUM.
       ToDo: either add separate event to distinguish SYSTEM time from rest
             or just nuke this DESCHEDULE (and the following SCHEDULE) */
    if (0 && RtsFlags.ParFlags.ParStats.Full) {
        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
        emitSchedule = rtsTrue;
    }
#endif

    if (context_switch) {
        // Sometimes we miss a context switch, e.g. when calling
        // primitives in a tight loop, MAYBE_GC() doesn't check the
        // context switch flag, and we end up waiting for a GC.
        // See #1984, and concurrent/should_run/1984
        context_switch = 0;
        addToRunQueue(cap,t);
    } else {
        pushOnRunQueue(cap,t);
    }
    return rtsTrue;
    /* actual GC is done at the end of the while loop in schedule() */
}
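
/* A worked example for the large-block path above (sizes assumed:
 * BLOCK_SIZE = 4096): a request with rHpAlloc = 10000 rounds up via
 * BLOCK_ROUND_UP to 12288, so blocks = 3; a 3-block group is allocated,
 * linked in front of rCurrentNursery, each sub-block's step/gen_no/flags
 * fields initialised, and the thread is pushed back on the run queue to
 * retry its allocation without forcing a GC. */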
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadStackOverflow
 * -------------------------------------------------------------------------- */

static void
scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
{
    debugTrace (DEBUG_sched,
                "--<< thread %ld (%s) stopped, StackOverflow",
                (long)t->id, whatNext_strs[t->what_next]);

    /* just adjust the stack for this thread, then pop it back
     * on the run queue.
     */
    {
        /* enlarge the stack */
        StgTSO *new_t = threadStackOverflow(cap, t);

        /* The TSO attached to this Task may have moved, so update the
         * pointer to it.
         */
        if (task->tso == t) {
            task->tso = new_t;
        }
        pushOnRunQueue(cap,new_t);
    }
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadYielding
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
{
    // Reset the context switch flag.  We don't do this just before
    // running the thread, because that would mean we would lose ticks
    // during GC, which can lead to unfair scheduling (a thread hogs
    // the CPU because the tick always arrives during GC).  This way
    // penalises threads that do a lot of allocation, but that seems
    // better than the alternative.
    context_switch = 0;

    /* put the thread back on the run queue.  Then, if we're ready to
     * GC, check whether this is the last task to stop.  If so, wake
     * up the GC thread.  getThread will block during a GC until the
     * GC is finished.
     */

    if (t->what_next != prev_what_next) {
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped to switch evaluators",
                   (long)t->id, whatNext_strs[t->what_next]);
    } else {
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped, yielding",
                   (long)t->id, whatNext_strs[t->what_next]);
    }

    IF_DEBUG(sanity,
             //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
             checkTSO(t));
    ASSERT(t->_link == END_TSO_QUEUE);

    // Shortcut if we're just switching evaluators: don't bother
    // doing stack squeezing (which can be expensive), just run the
    // thread.
    if (t->what_next != prev_what_next) {
        return rtsTrue;
    }

#if defined(GRAN)
    ASSERT(!is_on_queue(t,CurrentProc));

    IF_DEBUG(sanity,
             //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
             checkThreadQsSanity(rtsTrue));
#endif

    addToRunQueue(cap,t);

#if defined(GRAN)
    /* add a ContinueThread event to actually process the thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              ContinueThread,
              t, (StgClosure*)NULL, (rtsSpark*)NULL);
    IF_DEBUG(gran,
             debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n"));
#endif

    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadBlocked
 * -------------------------------------------------------------------------- */

static void
scheduleHandleThreadBlocked( StgTSO *t
#if !defined(GRAN) && !defined(DEBUG)
                             STG_UNUSED
#endif
                             )
{
#if defined(GRAN)
    IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
                        t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
             if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));

    // ??? needed; should emit block before
    IF_DEBUG(gran,
             DumpGranEvent(GR_DESCHEDULE, t));
    prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t

    ASSERT(procStatus[CurrentProc]==Busy ||
           ((procStatus[CurrentProc]==Fetching) &&
            (t->block_info.closure!=(StgClosure*)NULL)));
    if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
        !(!RtsFlags.GranFlags.DoAsyncFetch &&
          procStatus[CurrentProc]==Fetching))
        procStatus[CurrentProc] = Idle;

#elif defined(PAR)
    IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
                        t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
    IF_PAR_DEBUG(bq,
                 if (t->block_info.closure!=(StgClosure*)NULL)
                     print_bq(t->block_info.closure));

    /* Send a fetch (if BlockedOnGA) and dump event to log file */
    blockThread(t);

    /* whatever we schedule next, we must log that schedule */
    emitSchedule = rtsTrue;

#else /* !GRAN */

    // We don't need to do anything.  The thread is blocked, and it
    // has tidied up its stack and placed itself on whatever queue
    // it needs to be on.

    // ASSERT(t->why_blocked != NotBlocked);
    // Not true: for example,
    //    - in THREADED_RTS, the thread may already have been woken
    //      up by another Capability.  This actually happens: try
    //      conc023 +RTS -N2.
    //    - the thread may have woken itself up already, because
    //      threadPaused() might have raised a blocked throwTo
    //      exception, see maybePerformBlockedException().

#ifdef DEBUG
    if (traceClass(DEBUG_sched)) {
        debugTraceBegin("--<< thread %lu (%s) stopped: ",
                        (unsigned long)t->id, whatNext_strs[t->what_next]);
        printThreadBlockage(t);
        debugTraceEnd();
    }
#endif

    /* Only for dumping event to log file
       ToDo: do I need this in GranSim, too?
       blockThread(t);
    */
#endif
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadFinished
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
{
    /* Need to check whether this was a main thread, and if so,
     * return with the return value.
     *
     * We also end up here if the thread kills itself with an
     * uncaught exception, see Exception.cmm.
     */
    debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
               (unsigned long)t->id, whatNext_strs[t->what_next]);

#if defined(GRAN)
    endThread(t, CurrentProc); // clean-up the thread
#elif defined(PARALLEL_HASKELL)
    /* For now all are advisory -- HWL */
    //if(t->priority==AdvisoryPriority) ??
    advisory_thread_count--; // JB: Caution with this counter, buggy!

# if defined(DIST)
    if(t->dist.priority==RevalPriority)
        FinishReval(t);
# endif

# if defined(EDENOLD)
    // the thread could still have an outport... (BUG)
    if (t->eden.outport != -1) {
        // delete the outport for the tso which has finished...
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
                                t->eden.outport, t->id));
        deleteOPT(t);
    }
    // thread still in the process (HEAVY BUG! since outport has just been closed...)
    if (t->eden.epid != -1) {
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
                                t->id, t->eden.epid));
        removeTSOfromProcess(t);
    }
# endif

# if defined(PAR)
    if (RtsFlags.ParFlags.ParStats.Full &&
        !RtsFlags.ParFlags.ParStats.Suppressed)
        DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);

    //  t->par only contains statistics: left out for now...
    IF_PAR_DEBUG(fish,
                 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
                            t->id,t,t->par.sparkname));
# endif
#endif // PARALLEL_HASKELL

    //
    // Check whether the thread that just completed was a bound
    // thread, and if so return with the result.
    //
    // There is an assumption here that all thread completion goes
    // through this point; we need to make sure that if a thread
    // ends up in the ThreadKilled state, that it stays on the run
    // queue so it can be dealt with here.
    //

    if (t->bound) {

        if (t->bound != task) {
#if !defined(THREADED_RTS)
            // Must be a bound thread that is not the topmost one.  Leave
            // it on the run queue until the stack has unwound to the
            // point where we can deal with this.  Leaving it on the run
            // queue also ensures that the garbage collector knows about
            // this thread and its return value (it gets dropped from the
            // step->threads list so there's no other way to find it).
            appendToRunQueue(cap,t);
            return rtsFalse;
#else
            // this cannot happen in the threaded RTS, because a
            // bound thread can only be run by the appropriate Task.
            barf("finished bound thread that isn't mine");
#endif
        }

        ASSERT(task->tso == t);

        if (t->what_next == ThreadComplete) {
            if (task->ret) {
                // NOTE: return val is tso->sp[1] (see StgStartup.hc)
                *(task->ret) = (StgClosure *)task->tso->sp[1];
            }
            task->stat = Success;
        } else {
            if (task->ret) {
                *(task->ret) = NULL;
            }
            if (sched_state >= SCHED_INTERRUPTING) {
                task->stat = Interrupted;
            } else {
                task->stat = Killed;
            }
        }
#ifdef DEBUG
        removeThreadLabel((StgWord)task->tso->id);
#endif
        return rtsTrue; // tells schedule() to return
    }

    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Perform a heap census
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
{
    // When we have +RTS -i0 and we're heap profiling, do a census at
    // every GC.  This lets us get repeatable runs for debugging.
    if (performHeapProfile ||
        (RtsFlags.ProfFlags.profileInterval==0 &&
         RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
        return rtsTrue;
    } else {
        return rtsFalse;
    }
}
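
/* Example (the flag names are the standard RTS ones; the exact invocation
 * is an assumption): running a profiled build with "+RTS -hc -i0" sets
 * doHeapProfile and a zero profileInterval, so the test above returns
 * rtsTrue at every GC and the census in scheduleDoGC() runs each time --
 * handy for getting repeatable heap profiles when debugging. */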
/* -----------------------------------------------------------------------------
 * Perform a garbage collection if necessary
 * -------------------------------------------------------------------------- */

static Capability *
scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
{
    StgTSO *t;
    rtsBool heap_census;
#ifdef THREADED_RTS
    static volatile StgWord waiting_for_gc;
    rtsBool was_waiting;
    nat i;
#endif

#ifdef THREADED_RTS
    // In order to GC, there must be no threads running Haskell code.
    // Therefore, the GC thread needs to hold *all* the capabilities,
    // and release them after the GC has completed.
    //
    // This seems to be the simplest way: previous attempts involved
    // making all the threads with capabilities give up their
    // capabilities and sleep except for the *last* one, which
    // actually did the GC.  But it's quite hard to arrange for all
    // the other tasks to sleep and stay asleep.
    //

    was_waiting = cas(&waiting_for_gc, 0, 1);
    if (was_waiting) {
        do {
            debugTrace(DEBUG_sched, "someone else is trying to GC...");
            if (cap) yieldCapability(&cap,task);
        } while (waiting_for_gc);
        return cap;  // NOTE: task->cap might have changed here
    }
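
    /* The cas() above is a one-shot leader election: whoever swings
     * waiting_for_gc from 0 to 1 becomes the GC leader and goes on to
     * collect every Capability; everyone else sees was_waiting != 0 and
     * spins in yieldCapability() until the leader clears the flag.  A
     * generic sketch of the pattern (names hypothetical):
     *
     *     if (cas(&flag, 0, 1) == 0) { do_the_work(); flag = 0; }
     *     else                       { wait_until(flag == 0);   }
     */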
    for (i=0; i < n_capabilities; i++) {
        debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
        if (cap != &capabilities[i]) {
            Capability *pcap = &capabilities[i];
            // we better hope this task doesn't get migrated to
            // another Capability while we're waiting for this one.
            // It won't, because load balancing happens while we have
            // all the Capabilities, but even so it's a slightly
            // unsavoury invariant.
            task->cap = pcap;
            waitForReturnCapability(&pcap, task);
            if (pcap != &capabilities[i]) {
                barf("scheduleDoGC: got the wrong capability");
            }
        }
    }

    waiting_for_gc = rtsFalse;
#endif
2007 /* Kick any transactions which are invalid back to their
2008 * atomically frames. When next scheduled they will try to
2009 * commit; this commit will fail and they will retry. */
2015 for (s = 0; s < total_steps; s++) {
2016 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
2017 if (t->what_next == ThreadRelocated) {
2020 next = t->global_link;
2022 // This is a good place to check for blocked
2023 // exceptions. It might be the case that a thread is
2024 // blocked on delivering an exception to a thread that
2025 // is also blocked - we try to ensure that this
2026 // doesn't happen in throwTo(), but it's too hard (or
2027 // impossible) to close all the race holes, so we
2028 // accept that some might get through and deal with
2029 // them here. A GC will always happen at some point,
2030 // even if the system is otherwise deadlocked.
2031 maybePerformBlockedException (&capabilities[0], t);
2033 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2034 if (!stmValidateNestOfTransactions (t -> trec)) {
2035 debugTrace(DEBUG_sched | DEBUG_stm,
2036 "trec %p found wasting its time", t);
2038 // strip the stack back to the
2039 // ATOMICALLY_FRAME, aborting the (nested)
2040 // transaction, and saving the stack of any
2041 // partially-evaluated thunks on the heap.
2042 throwToSingleThreaded_(&capabilities[0], t,
2043 NULL, rtsTrue, NULL);
2046 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2055 // so this happens periodically:
2056 if (cap) scheduleCheckBlackHoles(cap);
2058 IF_DEBUG(scheduler, printAllThreads());
2061 /* We now have all the capabilities; if we're in an interrupting
2062 * state, then we should take the opportunity to delete all the
2063 * threads in the system. */
2065 if (sched_state >= SCHED_INTERRUPTING) {
2066 deleteAllThreads(&capabilities[0]);
2067 sched_state = SCHED_SHUTTING_DOWN;
2070 heap_census = scheduleNeedHeapProfile(rtsTrue);
2072 /* everybody back, start the GC.
2073 * Could do it in this thread, or signal a condition var
2074 * to do it in another thread. Either way, we need to
2075 * broadcast on gc_pending_cond afterward. */
2077 #if defined(THREADED_RTS)
2078 debugTrace(DEBUG_sched, "doing GC");
2080 GarbageCollect(force_major || heap_census);
2083 debugTrace(DEBUG_sched, "performing heap census");
2085 performHeapProfile = rtsFalse;
2088 #if defined(THREADED_RTS)
2089 // release our stash of capabilities.
2090 for (i = 0; i < n_capabilities; i++) {
2091 if (cap != &capabilities[i]) {
2092 task->cap = &capabilities[i];
2093 releaseCapability(&capabilities[i]);
2104 /* add a ContinueThread event to continue execution of current thread */
2105 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2107 t, (StgClosure*)NULL, (rtsSpark*)NULL);
2109 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2117 /* ---------------------------------------------------------------------------
2118 * Singleton fork(). Do not copy any running threads.
2119 * ------------------------------------------------------------------------- */
2122 forkProcess(HsStablePtr *entry
2123 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2128 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2135 #if defined(THREADED_RTS)
2136 if (RtsFlags.ParFlags.nNodes > 1) {
2137 errorBelch("forking not supported with +RTS -N<n> greater than 1");
2138 stg_exit(EXIT_FAILURE);
2142 debugTrace(DEBUG_sched, "forking!");
2144 // ToDo: for SMP, we should probably acquire *all* the capabilities
2147 // no funny business: hold locks while we fork, otherwise if some
2148 // other thread is holding a lock when the fork happens, the data
2149 // structure protected by the lock will forever be in an
2150 // inconsistent state in the child. See also #1391.
2151 ACQUIRE_LOCK(&sched_mutex);
2152 ACQUIRE_LOCK(&cap->lock);
2153 ACQUIRE_LOCK(&cap->running_task->lock);
2157 if (pid) { // parent
2159 RELEASE_LOCK(&sched_mutex);
2160 RELEASE_LOCK(&cap->lock);
2161 RELEASE_LOCK(&cap->running_task->lock);
2163 // just return the pid
2169 #if defined(THREADED_RTS)
2170 initMutex(&sched_mutex);
2171 initMutex(&cap->lock);
2172 initMutex(&cap->running_task->lock);
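// The lock-across-fork pattern used above, as a minimal sketch (with
// a hypothetical mutex 'm', not the real RTS state):
//
//     ACQUIRE_LOCK(&m);
//     pid = fork();
//     if (pid) { RELEASE_LOCK(&m); }  // parent: data is consistent
//     else     { initMutex(&m); }     // child: old lock state is
//                                     // meaningless, so re-initialise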
2175 // Now, all OS threads except the thread that forked are
2176 // stopped. We need to stop all Haskell threads, including
2177 // those involved in foreign calls. Also we need to delete
2178 // all Tasks, because they correspond to OS threads that are now gone.
2181 for (s = 0; s < total_steps; s++) {
2182 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
2183 if (t->what_next == ThreadRelocated) {
2186 next = t->global_link;
2187 // don't allow threads to catch the ThreadKilled
2188 // exception, but we do want to raiseAsync() because these
2189 // threads may be evaluating thunks that we need later.
2190 deleteThread_(cap,t);
2195 // Empty the run queue. It seems tempting to let all the
2196 // killed threads stay on the run queue as zombies to be
2197 // cleaned up later, but some of them correspond to bound
2198 // threads for which the corresponding Task does not exist.
2199 cap->run_queue_hd = END_TSO_QUEUE;
2200 cap->run_queue_tl = END_TSO_QUEUE;
2202 // Any suspended C-calling Tasks are no more; their OS threads no longer exist.
2204 cap->suspended_ccalling_tasks = NULL;
2206 // Empty the threads lists. Otherwise, the garbage
2207 // collector may attempt to resurrect some of these threads.
2208 for (s = 0; s < total_steps; s++) {
2209 all_steps[s].threads = END_TSO_QUEUE;
2212 // Wipe the task list, except the current Task.
2213 ACQUIRE_LOCK(&sched_mutex);
2214 for (task = all_tasks; task != NULL; task=task->all_link) {
2215 if (task != cap->running_task) {
2216 #if defined(THREADED_RTS)
2217 initMutex(&task->lock); // see #1391
2222 RELEASE_LOCK(&sched_mutex);
2224 #if defined(THREADED_RTS)
2225 // Wipe our spare workers list; they no longer exist. New
2226 // workers will be created if necessary.
2227 cap->spare_workers = NULL;
2228 cap->returning_tasks_hd = NULL;
2229 cap->returning_tasks_tl = NULL;
2232 // On Unix, all timers are reset in the child, so we need to start the timer again.
2237 cap = rts_evalStableIO(cap, entry, NULL); // run the action
2238 rts_checkSchedStatus("forkProcess",cap);
2241 hs_exit(); // clean up and exit
2242 stg_exit(EXIT_SUCCESS);
2244 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2245 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2250 /* ---------------------------------------------------------------------------
2251 * Delete all the threads in the system
2252 * ------------------------------------------------------------------------- */
2255 deleteAllThreads ( Capability *cap )
2257 // NOTE: only safe to call if we own all capabilities.
2262 debugTrace(DEBUG_sched,"deleting all threads");
2263 for (s = 0; s < total_steps; s++) {
2264 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
2265 if (t->what_next == ThreadRelocated) {
2268 next = t->global_link;
2269 deleteThread(cap,t);
2274 // The run queue now contains a bunch of ThreadKilled threads. We
2275 // must not throw these away: the main thread(s) will be in there
2276 // somewhere, and the main scheduler loop has to deal with it.
2277 // Also, the run queue is the only thing keeping these threads from
2278 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2280 #if !defined(THREADED_RTS)
2281 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2282 ASSERT(sleeping_queue == END_TSO_QUEUE);
2286 /* -----------------------------------------------------------------------------
2287 Managing the suspended_ccalling_tasks list.
2288 Locks required: sched_mutex
2289 -------------------------------------------------------------------------- */
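/* The list is doubly linked through task->next and task->prev, so that
 * recoverSuspendedTask() can unlink an entry from the middle of the
 * list in O(1), without walking from the head. */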
2292 suspendTask (Capability *cap, Task *task)
2294 ASSERT(task->next == NULL && task->prev == NULL);
2295 task->next = cap->suspended_ccalling_tasks;
2297 if (cap->suspended_ccalling_tasks) {
2298 cap->suspended_ccalling_tasks->prev = task;
2300 cap->suspended_ccalling_tasks = task;
2304 recoverSuspendedTask (Capability *cap, Task *task)
2307 task->prev->next = task->next;
2309 ASSERT(cap->suspended_ccalling_tasks == task);
2310 cap->suspended_ccalling_tasks = task->next;
2313 task->next->prev = task->prev;
2315 task->next = task->prev = NULL;
2318 /* ---------------------------------------------------------------------------
2319 * Suspending & resuming Haskell threads.
2321 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2322 * its capability before calling the C function. This allows another
2323 * task to pick up the capability and carry on running Haskell
2324 * threads. It also means that if the C call blocks, it won't lock the whole system.
2327 * The Haskell thread making the C call is put to sleep for the
2328 * duration of the call, on the suspended_ccalling_tasks list. We
2329 * give out a token to the task, which it can use to resume the thread
2330 * on return from the C function.
2331 * ------------------------------------------------------------------------- */
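/* As a rough sketch (not the literal code the compiler emits), a safe
 * foreign call is bracketed like this:
 *
 *     token = suspendThread(BaseReg);  // give up the Capability
 *     r     = foreign_fn(args);        // may block; other Haskell
 *                                      //   threads keep running
 *     BaseReg = resumeThread(token);   // re-acquire a Capability
 */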
2334 suspendThread (StgRegTable *reg)
2341 StgWord32 saved_winerror;
2344 saved_errno = errno;
2346 saved_winerror = GetLastError();
2349 /* assume that *reg is a pointer to the StgRegTable part of a Capability. */
2351 cap = regTableToCapability(reg);
2353 task = cap->running_task;
2354 tso = cap->r.rCurrentTSO;
2356 debugTrace(DEBUG_sched,
2357 "thread %lu did a safe foreign call",
2358 (unsigned long)cap->r.rCurrentTSO->id);
2360 // XXX this might not be necessary --SDM
2361 tso->what_next = ThreadRunGHC;
2363 threadPaused(cap,tso);
2365 if ((tso->flags & TSO_BLOCKEX) == 0) {
2366 tso->why_blocked = BlockedOnCCall;
2367 tso->flags |= TSO_BLOCKEX;
2368 tso->flags &= ~TSO_INTERRUPTIBLE;
2370 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2373 // Hand back capability
2374 task->suspended_tso = tso;
2376 ACQUIRE_LOCK(&cap->lock);
2378 suspendTask(cap,task);
2379 cap->in_haskell = rtsFalse;
2380 releaseCapability_(cap);
2382 RELEASE_LOCK(&cap->lock);
2384 #if defined(THREADED_RTS)
2385 /* Preparing to leave the RTS, so ensure there's a native thread/task
2386 waiting to take over. */
2388 debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2391 errno = saved_errno;
2393 SetLastError(saved_winerror);
2399 resumeThread (void *task_)
2406 StgWord32 saved_winerror;
2409 saved_errno = errno;
2411 saved_winerror = GetLastError();
2415 // Wait for permission to re-enter the RTS with the result.
2416 waitForReturnCapability(&cap,task);
2417 // we might be on a different capability now... but if so, our
2418 // entry on the suspended_ccalling_tasks list will also have been migrated.
2421 // Remove the thread from the suspended list
2422 recoverSuspendedTask(cap,task);
2424 tso = task->suspended_tso;
2425 task->suspended_tso = NULL;
2426 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2427 debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2429 if (tso->why_blocked == BlockedOnCCall) {
2430 awakenBlockedExceptionQueue(cap,tso);
2431 tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2434 /* Reset blocking status */
2435 tso->why_blocked = NotBlocked;
2437 cap->r.rCurrentTSO = tso;
2438 cap->in_haskell = rtsTrue;
2439 errno = saved_errno;
2441 SetLastError(saved_winerror);
2444 /* We might have GC'd, mark the TSO dirty again */
2447 IF_DEBUG(sanity, checkTSO(tso));
2452 /* ---------------------------------------------------------------------------
2455 * scheduleThread puts a thread on the end of the runnable queue.
2456 * This will usually be done immediately after a thread is created.
2457 * The caller of scheduleThread must create the thread using e.g.
2458 * createThread and push an appropriate closure
2459 * on this thread's stack before the scheduler is invoked.
2460 * ------------------------------------------------------------------------ */
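/* A minimal usage sketch, assuming the usual createIOThread() helper
 * for wrapping an IO action (see Threads.c):
 *
 *     StgTSO *t = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                (StgClosure *)action);
 *     scheduleThread(cap, t);
 */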
2463 scheduleThread(Capability *cap, StgTSO *tso)
2465 // The thread goes at the *end* of the run-queue, to avoid possible
2466 // starvation of any threads already on the queue.
2467 appendToRunQueue(cap,tso);
2471 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2473 #if defined(THREADED_RTS)
2474 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2475 // move this thread from now on.
2476 cpu %= RtsFlags.ParFlags.nNodes;
2477 if (cpu == cap->no) {
2478 appendToRunQueue(cap,tso);
2480 migrateThreadToCapability_lock(&capabilities[cpu],tso);
2483 appendToRunQueue(cap,tso);
2488 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2492 // We already created/initialised the Task
2493 task = cap->running_task;
2495 // This TSO is now a bound thread; make the Task and TSO
2496 // point to each other.
2502 task->stat = NoStatus;
2504 appendToRunQueue(cap,tso);
2506 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2509 /* GranSim specific init */
2510 CurrentTSO = m->tso; // the TSO to run
2511 procStatus[MainProc] = Busy; // status of main PE
2512 CurrentProc = MainProc; // PE to run it on
2515 cap = schedule(cap,task);
2517 ASSERT(task->stat != NoStatus);
2518 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2520 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2524 /* ----------------------------------------------------------------------------
2526 * ------------------------------------------------------------------------- */
2528 #if defined(THREADED_RTS)
2530 workerStart(Task *task)
2534 // See startWorkerTask().
2535 ACQUIRE_LOCK(&task->lock);
2537 RELEASE_LOCK(&task->lock);
2539 // set the thread-local pointer to the Task:
2542 // schedule() runs without a lock.
2543 cap = schedule(cap,task);
2545 // On exit from schedule(), we have a Capability.
2546 releaseCapability(cap);
2547 workerTaskStop(task);
2551 /* ---------------------------------------------------------------------------
2554 * Initialise the scheduler. This resets all the queues - if the
2555 * queues contained any threads, they'll be garbage collected at the next GC.
2558 * ------------------------------------------------------------------------ */
2565 for (i=0; i < MAX_PROC; i++) { // arrays are sized MAX_PROC, so '<', not '<='
2566 run_queue_hds[i] = END_TSO_QUEUE;
2567 run_queue_tls[i] = END_TSO_QUEUE;
2568 blocked_queue_hds[i] = END_TSO_QUEUE;
2569 blocked_queue_tls[i] = END_TSO_QUEUE;
2570 ccalling_threadss[i] = END_TSO_QUEUE;
2571 blackhole_queue[i] = END_TSO_QUEUE;
2572 sleeping_queue = END_TSO_QUEUE;
2574 #elif !defined(THREADED_RTS)
2575 blocked_queue_hd = END_TSO_QUEUE;
2576 blocked_queue_tl = END_TSO_QUEUE;
2577 sleeping_queue = END_TSO_QUEUE;
2580 blackhole_queue = END_TSO_QUEUE;
2583 sched_state = SCHED_RUNNING;
2584 recent_activity = ACTIVITY_YES;
2586 #if defined(THREADED_RTS)
2587 /* Initialise the mutex and condition variables used by the scheduler. */
2589 initMutex(&sched_mutex);
2592 ACQUIRE_LOCK(&sched_mutex);
2594 /* A capability holds the state a native thread needs in
2595 * order to execute STG code. At least one capability is
2596 * floating around (only THREADED_RTS builds have more than one). */
2602 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2606 #if defined(THREADED_RTS)
2608 * Eagerly start one worker to run each Capability, except for
2609 * Capability 0. The idea is that we're probably going to start a
2610 * bound thread on Capability 0 pretty soon, so we don't want a
2611 * worker task hogging it.
2616 for (i = 1; i < n_capabilities; i++) {
2617 cap = &capabilities[i];
2618 ACQUIRE_LOCK(&cap->lock);
2619 startWorkerTask(cap, workerStart);
2620 RELEASE_LOCK(&cap->lock);
2625 trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2627 RELEASE_LOCK(&sched_mutex);
2632 rtsBool wait_foreign
2633 #if !defined(THREADED_RTS)
2634 __attribute__((unused))
2637 /* see Capability.c, shutdownCapability() */
2641 #if defined(THREADED_RTS)
2642 ACQUIRE_LOCK(&sched_mutex);
2643 task = newBoundTask();
2644 RELEASE_LOCK(&sched_mutex);
2647 // If we haven't killed all the threads yet, do it now.
2648 if (sched_state < SCHED_SHUTTING_DOWN) {
2649 sched_state = SCHED_INTERRUPTING;
2650 scheduleDoGC(NULL,task,rtsFalse);
2652 sched_state = SCHED_SHUTTING_DOWN;
2654 #if defined(THREADED_RTS)
2658 for (i = 0; i < n_capabilities; i++) {
2659 shutdownCapability(&capabilities[i], task, wait_foreign);
2661 boundTaskExiting(task);
2665 freeCapability(&MainCapability);
2670 freeScheduler( void )
2673 if (n_capabilities != 1) {
2674 stgFree(capabilities);
2676 #if defined(THREADED_RTS)
2677 closeMutex(&sched_mutex);
2681 /* -----------------------------------------------------------------------------
2684 This is the interface to the garbage collector from Haskell land.
2685 We provide this so that external C code can allocate and garbage
2686 collect when called from Haskell via _ccall_GC.
2687 -------------------------------------------------------------------------- */
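/* Illustrative use from foreign code linked against the RTS (assuming
 * the declarations exported alongside the RTS API):
 *
 *     performGC();        // request a collection (not necessarily major)
 *     performMajorGC();   // force a collection of all generations
 */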
2690 performGC_(rtsBool force_major)
2693 // We must grab a new Task here, because the existing Task may be
2694 // associated with a particular Capability, and chained onto the
2695 // suspended_ccalling_tasks queue.
2696 ACQUIRE_LOCK(&sched_mutex);
2697 task = newBoundTask();
2698 RELEASE_LOCK(&sched_mutex);
2699 scheduleDoGC(NULL,task,force_major);
2700 boundTaskExiting(task);
2706 performGC_(rtsFalse);
2710 performMajorGC(void)
2712 performGC_(rtsTrue);
2715 /* -----------------------------------------------------------------------------
2718 If the thread has reached its maximum stack size, then raise the
2719 StackOverflow exception in the offending thread. Otherwise
2720 relocate the TSO into a larger chunk of memory and adjust its stack size appropriately.
2722 -------------------------------------------------------------------------- */
2725 threadStackOverflow(Capability *cap, StgTSO *tso)
2727 nat new_stack_size, stack_words;
2732 IF_DEBUG(sanity,checkTSO(tso));
2734 // don't allow throwTo() to modify the blocked_exceptions queue
2735 // while we are moving the TSO:
2736 lockClosure((StgClosure *)tso);
2738 if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2739 // NB. never raise a StackOverflow exception if the thread is
2740 // inside Control.Exception.block. It is impractical to protect
2741 // against stack overflow exceptions, since virtually anything
2742 // can raise one (even 'catch'), so this is the only sensible
2743 // thing to do here. See bug #767.
2745 debugTrace(DEBUG_gc,
2746 "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2747 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2749 /* If we're debugging, just print out the top of the stack */
2750 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2753 // Send this thread the StackOverflow exception
2755 throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2759 /* Try to double the current stack size. If that takes us over the
2760 * maximum stack size for this thread, then use the maximum instead.
2761 * Finally round up so the TSO ends up as a whole number of blocks.
2763 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2764 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2765 TSO_STRUCT_SIZE)/sizeof(W_);
2766 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2767 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
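/* Worked example (illustrative numbers only): a 4096-word stack that
 * overflows doubles to 8192 words; TSO_STRUCT_SIZE is added, the total
 * is rounded up to whole blocks and then made mblock-friendly, and
 * finally TSO_STRUCT_SIZEW is subtracted again to recover the usable
 * stack size. */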
2769 debugTrace(DEBUG_sched,
2770 "increasing stack size from %ld words to %d.",
2771 (long)tso->stack_size, new_stack_size);
2773 dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2774 TICK_ALLOC_TSO(new_stack_size,0);
2776 /* copy the TSO block and the old stack into the new area */
2777 memcpy(dest,tso,TSO_STRUCT_SIZE);
2778 stack_words = tso->stack + tso->stack_size - tso->sp;
2779 new_sp = (P_)dest + new_tso_size - stack_words;
2780 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2782 /* relocate the stack pointers... */
2784 dest->stack_size = new_stack_size;
2786 /* Mark the old TSO as relocated. We have to check for relocated
2787 * TSOs in the garbage collector and any primops that deal with TSOs.
2789 * It's important to set the sp value to just beyond the end
2790 * of the stack, so we don't attempt to scavenge any part of the dead TSO's stack. */
2793 tso->what_next = ThreadRelocated;
2794 setTSOLink(cap,tso,dest);
2795 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2796 tso->why_blocked = NotBlocked;
2798 IF_PAR_DEBUG(verbose,
2799 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2800 tso->id, tso, tso->stack_size);
2801 /* If we're debugging, just print out the top of the stack */
2802 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2808 IF_DEBUG(sanity,checkTSO(dest));
2810 IF_DEBUG(scheduler,printTSO(dest));
2817 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2819 bdescr *bd, *new_bd;
2820 lnat new_tso_size_w, tso_size_w;
2823 tso_size_w = tso_sizeW(tso);
2825 if (tso_size_w < MBLOCK_SIZE_W ||
2826 (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4)
2831 // don't allow throwTo() to modify the blocked_exceptions queue
2832 // while we are moving the TSO:
2833 lockClosure((StgClosure *)tso);
2835 new_tso_size_w = round_to_mblocks(tso_size_w/2);
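// Illustrative example: a TSO occupying two megablocks whose live
// stack fits in under a quarter of that space is halved here;
// splitLargeBlock() below then shrinks the block group to the new
// size, and new_bd describes the portion the TSO keeps.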
2837 debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2838 tso->id, tso_size_w, new_tso_size_w);
2840 bd = Bdescr((StgPtr)tso);
2841 new_bd = splitLargeBlock(bd, new_tso_size_w / BLOCK_SIZE_W);
2843 new_tso = (StgTSO *)new_bd->start;
2844 memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2845 new_tso->stack_size = new_tso_size_w - TSO_STRUCT_SIZEW;
2847 tso->what_next = ThreadRelocated;
2848 tso->_link = new_tso; // no write barrier reqd: same generation
2850 // The TSO attached to this Task may have moved, so update the Task's pointer to it.
2852 if (task->tso == tso) {
2853 task->tso = new_tso;
2859 IF_DEBUG(sanity,checkTSO(new_tso));
2864 /* ---------------------------------------------------------------------------
2866 - usually called inside a signal handler so it mustn't do anything fancy.
2867 ------------------------------------------------------------------------ */
2870 interruptStgRts(void)
2872 sched_state = SCHED_INTERRUPTING;
2877 /* -----------------------------------------------------------------------------
2880 This function causes at least one OS thread to wake up and run the
2881 scheduler loop. It is invoked when the RTS might be deadlocked, or
2882 an external event has arrived that may need servicing (e.g. a
2883 keyboard interrupt).
2885 In the single-threaded RTS we don't do anything here; we only have
2886 one thread anyway, and the event that caused us to want to wake up
2887 will have interrupted any blocking system call in progress anyway.
2888 -------------------------------------------------------------------------- */
2893 #if defined(THREADED_RTS)
2894 // This forces the IO Manager thread to wake up, which will
2895 // in turn ensure that some OS thread wakes up and runs the
2896 // scheduler loop, which will cause a GC and deadlock check.
2901 /* -----------------------------------------------------------------------------
2904 * Check the blackhole_queue for threads that can be woken up. We do
2905 * this periodically: before every GC, and whenever the run queue is empty.
2908 * An elegant solution might be to just wake up all the blocked
2909 * threads with awakenBlockedQueue occasionally: they'll go back to
2910 * sleep again if the object is still a BLACKHOLE. Unfortunately this
2911 * doesn't give us a way to tell whether we've actually managed to
2912 * wake up any threads, so we would be busy-waiting.
2914 * -------------------------------------------------------------------------- */
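/* Hence the loop below keeps a 'prev' link while walking the queue: a
 * thread is woken precisely when the closure it blocked on is no
 * longer a BLACKHOLE or CAF_BLACKHOLE (i.e. it has been updated with a
 * value), and unblockOne() hands back the next thread to examine. */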
2917 checkBlackHoles (Capability *cap)
2920 rtsBool any_woke_up = rtsFalse;
2923 // blackhole_queue is global:
2924 ASSERT_LOCK_HELD(&sched_mutex);
2926 debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2928 // ASSUMES: sched_mutex
2929 prev = &blackhole_queue;
2930 t = blackhole_queue;
2931 while (t != END_TSO_QUEUE) {
2932 ASSERT(t->why_blocked == BlockedOnBlackHole);
2933 type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2934 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2935 IF_DEBUG(sanity,checkTSO(t));
2936 t = unblockOne(cap, t);
2937 // urk, the threads migrate to the current capability
2938 // here, but we'd like to keep them on the original one.
2940 any_woke_up = rtsTrue;
2950 /* -----------------------------------------------------------------------------
2953 This is used for interruption (^C) and forking, and corresponds to
2954 raising an exception but without letting the thread catch the exception.
2956 -------------------------------------------------------------------------- */
2959 deleteThread (Capability *cap, StgTSO *tso)
2961 // NOTE: must only be called on a TSO that we have exclusive
2962 // access to, because we will call throwToSingleThreaded() below.
2963 // The TSO must be on the run queue of the Capability we own, or
2964 // we must own all Capabilities.
2966 if (tso->why_blocked != BlockedOnCCall &&
2967 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2968 throwToSingleThreaded(cap,tso,NULL);
2972 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2974 deleteThread_(Capability *cap, StgTSO *tso)
2975 { // for forkProcess only:
2976 // like deleteThread(), but we delete threads in foreign calls, too.
2978 if (tso->why_blocked == BlockedOnCCall ||
2979 tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2980 unblockOne(cap,tso);
2981 tso->what_next = ThreadKilled;
2983 deleteThread(cap,tso);
2988 /* -----------------------------------------------------------------------------
2989 raiseExceptionHelper
2991 This function is called by the raise# primitive, just so that we can
2992 move some of the tricky bits of raising an exception from C-- into
2993 C. Who knows, it might be a useful reusable thing here too.
2994 -------------------------------------------------------------------------- */
2997 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2999 Capability *cap = regTableToCapability(reg);
3000 StgThunk *raise_closure = NULL;
3002 StgRetInfoTable *info;
3004 // This closure represents the expression 'raise# E' where E
3005 // is the exception raised. It is used to overwrite all the
3006 // thunks which are currently under evaluation.
3009 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
3010 // LDV profiling: stg_raise_info has THUNK as its closure
3011 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3012 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
3013 // 1 does not cause any problem unless profiling is performed.
3014 // However, when LDV profiling goes on, we need to linearly scan
3015 // small object pool, where raise_closure is stored, so we should
3016 // use MIN_UPD_SIZE.
3018 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3019 // sizeofW(StgClosure)+1);
3023 // Walk up the stack, looking for the catch frame. On the way,
3024 // we update any closures pointed to from update frames with the
3025 // raise closure that we just built.
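// Each step of the walk relies on the standard stack-frame layout:
// the word at p is an info pointer, so get_ret_itbl() recovers the
// frame's info table and stack_frame_sizeW() its total size, which
// gives the address of the next frame up the stack.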
3029 info = get_ret_itbl((StgClosure *)p);
3030 next = p + stack_frame_sizeW((StgClosure *)p);
3031 switch (info->i.type) {
3034 // Only create raise_closure if we need to.
3035 if (raise_closure == NULL) {
3037 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3038 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3039 raise_closure->payload[0] = exception;
3041 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3045 case ATOMICALLY_FRAME:
3046 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
3048 return ATOMICALLY_FRAME;
3054 case CATCH_STM_FRAME:
3055 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3057 return CATCH_STM_FRAME;
3063 case CATCH_RETRY_FRAME:
3072 /* -----------------------------------------------------------------------------
3073 findRetryFrameHelper
3075 This function is called by the retry# primitive. It traverses the stack
3076 leaving tso->sp referring to the frame which should handle the retry.
3078 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3079 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
3081 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3082 create) because retries are not considered to be exceptions, despite the
3083 similar implementation.
3085 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3086 not be created within memory transactions.
3087 -------------------------------------------------------------------------- */
3090 findRetryFrameHelper (StgTSO *tso)
3093 StgRetInfoTable *info;
3097 info = get_ret_itbl((StgClosure *)p);
3098 next = p + stack_frame_sizeW((StgClosure *)p);
3099 switch (info->i.type) {
3101 case ATOMICALLY_FRAME:
3102 debugTrace(DEBUG_stm,
3103 "found ATOMICALLY_FRAME at %p during retry", p);
3105 return ATOMICALLY_FRAME;
3107 case CATCH_RETRY_FRAME:
3108 debugTrace(DEBUG_stm,
3109 "found CATCH_RETRY_FRAME at %p during retrry", p);
3111 return CATCH_RETRY_FRAME;
3113 case CATCH_STM_FRAME: {
3114 StgTRecHeader *trec = tso -> trec;
3115 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3116 debugTrace(DEBUG_stm,
3117 "found CATCH_STM_FRAME at %p during retry", p);
3118 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3119 stmAbortTransaction(tso -> cap, trec);
3120 stmFreeAbortedTRec(tso -> cap, trec);
3121 tso -> trec = outer;
3128 ASSERT(info->i.type != CATCH_FRAME);
3129 ASSERT(info->i.type != STOP_FRAME);
3136 /* -----------------------------------------------------------------------------
3137 resurrectThreads is called after garbage collection on the list of
3138 threads found to be garbage. Each of these threads will be woken
3139 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3140 on an MVar, or NonTermination if the thread was blocked on a Black Hole.
3143 Locks: assumes we hold *all* the capabilities.
3144 -------------------------------------------------------------------------- */
3147 resurrectThreads (StgTSO *threads)
3153 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3154 next = tso->global_link;
3156 step = Bdescr((P_)tso)->step;
3157 tso->global_link = step->threads;
3158 step->threads = tso;
3160 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3162 // Wake up the thread on the Capability it was last on
3165 switch (tso->why_blocked) {
3167 case BlockedOnException:
3168 /* Called by GC - sched_mutex lock is currently held. */
3169 throwToSingleThreaded(cap, tso,
3170 (StgClosure *)BlockedOnDeadMVar_closure);
3172 case BlockedOnBlackHole:
3173 throwToSingleThreaded(cap, tso,
3174 (StgClosure *)NonTermination_closure);
3177 throwToSingleThreaded(cap, tso,
3178 (StgClosure *)BlockedIndefinitely_closure);
3181 /* This might happen if the thread was blocked on a black hole
3182 * belonging to a thread that we've just woken up (raiseAsync
3183 * can wake up threads, remember...).
3187 barf("resurrectThreads: thread blocked in a strange way");