/* ---------------------------------------------------------------------------
 * (c) The GHC Team, 1998-2006
 * The scheduler and thread-related functionality
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#define KEEP_LOCKCLOSURE
#include "OSThreads.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "RtsSignals.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Proftimer.h"
#if defined(GRAN) || defined(PARALLEL_HASKELL)
# include "GranSimRts.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
#endif
#include "Capability.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "RaiseAsync.h"
#include "ThrIOManager.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif

// Turn off inlining when debugging - it obfuscates things
#ifdef DEBUG
# undef  STATIC_INLINE
# define STATIC_INLINE static
#endif

/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if defined(GRAN)
StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */

/*
  In GranSim we have a runnable and a blocked queue for each processor.
  In order to minimise code changes new arrays run_queue_hds/tls
  are created. run_queue_hd is then a shortcut (macro) for
  run_queue_hds[CurrentProc] (see GranSim.h).
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating). */
#endif

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *blackhole_queue = NULL;

/* The blackhole_queue should be checked for threads to wake up.  See
 * Schedule.h for more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
 */
rtsBool blackholes_need_checking = rtsFalse;

/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *all_threads = NULL;

/* flag set by signal handler to precipitate a context switch
 * LOCK: none (just an advisory flag)
 */
int context_switch = 0;
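
/* Illustrative sketch (editor's addition, not RTS code): a timer tick only
 * needs to store to the advisory flag; the scheduler polls it between
 * thread runs, so no lock is taken on either side.  Assuming a
 * hypothetical tick hook:
 *
 *     static void sketch_timer_tick (void)
 *     {
 *         context_switch = 1;   // picked up at the next safe point
 *     }
 *
 * Losing a race on this flag merely delays the switch by one tick, which
 * is why "LOCK: none" is acceptable here.
 */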

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 */
nat recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes once, from false->true)
 */
rtsBool sched_state = SCHED_RUNNING;

/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 * exists - earlier gccs apparently didn't.
 */

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if defined(PARALLEL_HASKELL)
StgTSO *LastTSO;
rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;
#endif

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

//
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
static void schedulePreLoop (void);
#if defined(THREADED_RTS)
static void schedulePushWork(Capability *cap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
#if defined(GRAN)
static StgTSO *scheduleProcessEvent(rtsEvent *event);
#endif
#if defined(PARALLEL_HASKELL)
static void scheduleSendPendingMessages(void);
static rtsBool scheduleActivateSpark(void);
static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
#endif
#if defined(PAR) || defined(GRAN)
static void scheduleGranParReport(void);
#endif
static void schedulePostRunThread(void);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task,
                                         StgTSO *t );
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
                                    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                             StgTSO *t );
static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
                                rtsBool force_major);

static rtsBool checkBlackHoles(Capability *cap);

static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif

#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);
#endif

static char *whatNext_strs[] = {

/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */

STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) {
        // this does round-robin scheduling; good for concurrency
        appendToRunQueue(cap,t);
    } else {
        // this does unfair scheduling; good for parallelism
        pushOnRunQueue(cap,t);
    }
#else
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
#endif
}
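
/* Illustrative sketch (editor's addition, not RTS code): the two policies
 * above differ only in which end of the run queue receives the thread.  A
 * stand-alone model with a head/tail singly-linked queue, using
 * hypothetical names:
 *
 *     typedef struct Node_ { struct Node_ *link; } Node;
 *     static Node *q_hd = NULL, *q_tl = NULL;
 *
 *     static void sketch_append (Node *t) {  // round-robin: FIFO, fair
 *         t->link = NULL;
 *         if (q_hd == NULL) { q_hd = t; } else { q_tl->link = t; }
 *         q_tl = t;
 *     }
 *
 *     static void sketch_push (Node *t) {    // unfair: LIFO, runs soonest
 *         t->link = q_hd;
 *         q_hd = t;
 *         if (q_tl == NULL) { q_tl = t; }
 *     }
 *
 * FIFO gives every thread a regular share of the CPU (good for
 * concurrency); LIFO keeps the most recently created work hot in the
 * cache (good for parallelism).
 */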

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * timer expires (thread yields)

   In a GranSim setup this loop iterates over the global event queue,
   which determines what to do next.  Therefore, it's more complicated
   than either the concurrent or the parallel (GUM) setup.

   GUM iterates over incoming messages.
   It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
   and sends out a fish whenever it has nothing to do; in-between
   doing the actual reductions (shared code below) it processes the
   incoming messages and deals with delayed operations
   (see PendingFetches).
   This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */

static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
#if defined(GRAN)
  rtsEvent *event;
#elif defined(PARALLEL_HASKELL)
  rtsBool receivedFinish = rtsFalse;
  nat tp_size, sp_size;  // stats only
#endif
  nat prev_what_next;
  rtsBool ready_to_gc;
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;
#endif

  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched,
              "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
              task, initialCapability);

  // -----------------------------------------------------------
  // Scheduler loop starts here:

#if defined(PARALLEL_HASKELL)
#define TERMINATION_CONDITION        (!receivedFinish)
#elif defined(GRAN)
#define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL)
#else
#define TERMINATION_CONDITION        rtsTrue
#endif

  while (TERMINATION_CONDITION) {

#if defined(GRAN)
      /* Choose the processor with the next event */
      CurrentProc = event->proc;
      CurrentTSO = event->tso;
#endif

#if defined(THREADED_RTS)
      if (first) {
          // don't yield the first time, we want a chance to run this
          // thread for a bit, even if there are others banging at the
          // door.
          first = rtsFalse;
          ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
      } else {
          // Yield the capability to higher-priority tasks if necessary.
          yieldCapability(&cap, task);
      }
#endif

#if defined(THREADED_RTS)
      schedulePushWork(cap,task);
#endif

      // Check whether we have re-entered the RTS from Haskell without
      // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
      // call).
      if (cap->in_haskell) {
          errorBelch("schedule: re-entered unsafely.\n"
                     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
          stg_exit(EXIT_FAILURE);
      }

      // The interruption / shutdown sequence.
      //
      // In order to cleanly shut down the runtime, we want to:
      //   * make sure that all main threads return to their callers
      //     with the state 'Interrupted'.
      //   * clean up all OS threads associated with the runtime
      //   * free all memory etc.
      //
      // So the sequence for ^C goes like this:
      //
      //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
      //     arranges for some Capability to wake up
      //
      //   * all threads in the system are halted, and the zombies are
      //     placed on the run queue for cleaning up.  We acquire all
      //     the capabilities in order to delete the threads, this is
      //     done by scheduleDoGC() for convenience (because GC already
      //     needs to acquire all the capabilities).  We can't kill
      //     threads involved in foreign calls.
      //
      //   * somebody calls shutdownHaskell(), which calls exitScheduler()
      //
      //   * sched_state := SCHED_SHUTTING_DOWN
      //
      //   * all workers exit when the run queue on their capability
      //     drains.  All main threads will also exit when their TSO
      //     reaches the head of the run queue and they can return.
      //
      //   * eventually all Capabilities will shut down, and the RTS can
      //     exit.
      //
      //   * We might be left with threads blocked in foreign calls,
      //     we should really attempt to kill these somehow (TODO);
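
      /* Illustrative sketch (editor's addition, not RTS code): the shutdown
       * protocol above is a one-way state machine; a minimal model of the
       * legal transitions, using hypothetical helper names:
       *
       *     // SCHED_RUNNING -> SCHED_INTERRUPTING -> SCHED_SHUTTING_DOWN
       *     static void sketch_interrupt (void)  // e.g. from the ^C handler
       *     {
       *         if (sched_state < SCHED_INTERRUPTING)
       *             sched_state = SCHED_INTERRUPTING;
       *     }
       *
       *     static void sketch_shutdown (void)   // e.g. from exitScheduler()
       *     {
       *         sched_state = SCHED_SHUTTING_DOWN;
       *     }
       *
       * The state only ever advances, which is why readers may test it
       * without taking a lock.
       */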

      switch (sched_state) {
      case SCHED_RUNNING:
          break;
      case SCHED_INTERRUPTING:
          debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
#if defined(THREADED_RTS)
          discardSparksCap(cap);
#endif
          /* scheduleDoGC() deletes all the threads */
          cap = scheduleDoGC(cap,task,rtsFalse);
          break;
      case SCHED_SHUTTING_DOWN:
          debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
          // If we are a worker, just exit.  If we're a bound thread
          // then we will exit below when we've removed our TSO from
          // the run queue.
          if (task->tso == NULL && emptyRunQueue(cap)) {
              return cap;
          }
          break;
      default:
          barf("sched_state: %d", sched_state);
      }

#if defined(THREADED_RTS)
      // If the run queue is empty, take a spark and turn it into a thread.
      if (emptyRunQueue(cap)) {
          StgClosure *spark;
          spark = findSpark(cap);
          if (spark != NULL) {
              debugTrace(DEBUG_sched,
                         "turning spark of closure %p into a thread",
                         (StgClosure *)spark);
              createSparkThread(cap,spark);
          }
      }
#endif // THREADED_RTS

      scheduleStartSignalHandlers(cap);

      // Only check the black holes here if we've nothing else to do.
      // During normal execution, the black hole list only gets checked
      // at GC time, to avoid repeatedly traversing this possibly long
      // list each time around the scheduler.
      if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

      scheduleCheckWakeupThreads(cap);

      scheduleCheckBlockedThreads(cap);

      scheduleDetectDeadlock(cap,task);
#if defined(THREADED_RTS)
      cap = task->cap;    // reload cap, it might have changed
#endif

      // Normally, the only way we can get here with no threads to
      // run is if a keyboard interrupt was received during
      // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
      // Additionally, it is not fatal for the
      // threaded RTS to reach here with no threads to run.
      //
      // win32: might be here due to awaitEvent() being abandoned
      // as a result of a console event having been delivered.
      if ( emptyRunQueue(cap) ) {
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
          ASSERT(sched_state >= SCHED_INTERRUPTING);
#endif
          continue; // nothing to do
      }

#if defined(PARALLEL_HASKELL)
      scheduleSendPendingMessages();

      if (emptyRunQueue(cap) && scheduleActivateSpark())
          continue; // a spark was turned into a thread; start from the top

      ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!

      /* If we still have no work we need to send a FISH to get a spark
         from another PE */
      if (emptyRunQueue(cap)) {
          if (!scheduleGetRemoteWork(&receivedFinish)) continue;
          ASSERT(rtsFalse); // should not happen at the moment
      }
      // from here: non-empty run queue.
      //  TODO: merge above case with this, only one call processMessages() !
      if (PacketsWaiting()) {  /* process incoming messages, if
                                  any pending...  only in else
                                  because getRemoteWork waits for
                                  messages as well */
          receivedFinish = processMessages();
      }
#endif

#if defined(GRAN)
      scheduleProcessEvent(event);
#endif

      //
      // Get a thread to run
      //
      t = popRunQueue(cap);

#if defined(GRAN) || defined(PAR)
      scheduleGranParReport(); // some kind of debugging output
#endif

      // Sanity check the thread we're about to run.  This can be
      // expensive if there is lots of thread switching going on...
      IF_DEBUG(sanity,checkTSO(t));

#if defined(THREADED_RTS)
      // Check whether we can run this thread in the current task.
      // If not, we have to pass our capability to the right task.
      {
          Task *bound = t->bound;

          if (bound) {
              if (bound == task) {
                  debugTrace(DEBUG_sched,
                             "### Running thread %lu in bound thread", (unsigned long)t->id);
                  // yes, the Haskell thread is bound to the current native thread
              } else {
                  debugTrace(DEBUG_sched,
                             "### thread %lu bound to another OS thread", (unsigned long)t->id);
                  // no, bound to a different Haskell thread: pass to that thread
                  pushOnRunQueue(cap,t);
                  continue;
              }
          } else {
              // The thread we want to run is unbound.
              if (task->tso) {
                  debugTrace(DEBUG_sched,
                             "### this OS thread cannot run thread %lu", (unsigned long)t->id);
                  // no, the current native thread is bound to a different
                  // Haskell thread, so pass it to any worker thread
                  pushOnRunQueue(cap,t);
                  continue;
              }
          }
      }
#endif

      cap->r.rCurrentTSO = t;

      /* context switches are initiated by the timer signal, unless
       * the user specified "context switch as often as possible", with
       * +RTS -C0
       */
      if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
          && !emptyThreadQueues(cap)) {
          context_switch = 1;
      }

run_thread:

      debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
                 (long)t->id, whatNext_strs[t->what_next]);

      startHeapProfTimer();

      // Check for exceptions blocked on this thread
      maybePerformBlockedException (cap, t);

      // ----------------------------------------------------------------------
      // Run the current thread

      ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
      ASSERT(t->cap == cap);

      prev_what_next = t->what_next;

      errno = t->saved_errno;
#if mingw32_HOST_OS
      SetLastError(t->saved_winerror);
#endif

      cap->in_haskell = rtsTrue;

#if defined(THREADED_RTS)
      if (recent_activity == ACTIVITY_DONE_GC) {
          // ACTIVITY_DONE_GC means we turned off the timer signal to
          // conserve power (see #1623).  Re-enable it here.
          nat prev;
          prev = xchg(&recent_activity, ACTIVITY_YES);
          if (prev == ACTIVITY_DONE_GC) {
              startTimer();
          }
      } else {
          recent_activity = ACTIVITY_YES;
      }
#endif

      switch (prev_what_next) {

      case ThreadKilled:
      case ThreadComplete:
          /* Thread already finished, return to scheduler. */
          ret = ThreadFinished;
          break;

      case ThreadRunGHC:
      {
          StgRegTable *r;
          r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
          cap = regTableToCapability(r);
          ret = r->rRet;
          break;
      }

      case ThreadInterpret:
          cap = interpretBCO(cap);
          ret = cap->r.rRet;
          break;

      default:
          barf("schedule: invalid what_next field");
      }

      cap->in_haskell = rtsFalse;

      // The TSO might have moved, eg. if it re-entered the RTS and a GC
      // happened.  So find the new location:
      t = cap->r.rCurrentTSO;

      // We have run some Haskell code: there might be blackhole-blocked
      // threads to wake up now.
      // Lock-free test here should be ok, we're just setting a flag.
      if ( blackhole_queue != END_TSO_QUEUE ) {
          blackholes_need_checking = rtsTrue;
      }

      // And save the current errno in this thread.
      // XXX: possibly bogus for SMP because this thread might already
      // be running again, see code below.
      t->saved_errno = errno;
#if mingw32_HOST_OS
      // Similarly for Windows error code
      t->saved_winerror = GetLastError();
#endif

#if defined(THREADED_RTS)
      // If ret is ThreadBlocked, and this Task is bound to the TSO that
      // blocked, we are in limbo - the TSO is now owned by whatever it
      // is blocked on, and may in fact already have been woken up,
      // perhaps even on a different Capability.  It may be the case
      // that task->cap != cap.  We better yield this Capability
      // immediately and return to normality.
      if (ret == ThreadBlocked) {
          debugTrace(DEBUG_sched,
                     "--<< thread %lu (%s) stopped: blocked",
                     (unsigned long)t->id, whatNext_strs[t->what_next]);
          continue;
      }
#endif

      ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
      ASSERT(t->cap == cap);

      // ----------------------------------------------------------------------

      // Costs for the scheduler are assigned to CCS_SYSTEM
      stopHeapProfTimer();
#if defined(PROFILING)
      CCCS = CCS_SYSTEM;
#endif

      schedulePostRunThread();

      ready_to_gc = rtsFalse;

      switch (ret) {
      case HeapOverflow:
          ready_to_gc = scheduleHandleHeapOverflow(cap,t);
          break;

      case StackOverflow:
          scheduleHandleStackOverflow(cap,task,t);
          break;

      case ThreadYielding:
          if (scheduleHandleYield(cap, t, prev_what_next)) {
              // shortcut for switching between compiler/interpreter:
              goto run_thread;
          }
          break;

      case ThreadBlocked:
          scheduleHandleThreadBlocked(t);
          break;

      case ThreadFinished:
          if (scheduleHandleThreadFinished(cap, task, t)) return cap;
          ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
          break;

      default:
          barf("schedule: invalid thread return code %d", (int)ret);
      }

      if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
          cap = scheduleDoGC(cap,task,rtsFalse);
      }
  } /* end of while() */

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
#if defined(GRAN)
    /* set up first event to get things going */
    /* ToDo: assign costs for system setup and init MainTSO ! */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              ContinueThread,
              CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

    debugTrace (DEBUG_gran,
                "GRAN: Init CurrentTSO (in schedule) = %p",
                CurrentTSO);
    IF_DEBUG(gran, G_TSO(CurrentTSO, 5));

    if (RtsFlags.GranFlags.Light) {
        /* Save current time; GranSim Light only */
        CurrentTSO->gran.clock = CurrentTime[CurrentProc];
    }
#endif
}

/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
static void
schedulePushWork(Capability *cap USED_IF_THREADS,
                 Task *task      USED_IF_THREADS)
{
    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // migration can be turned off with +RTS -qg
    if (!RtsFlags.ParFlags.migrate) return;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
        && sparkPoolSizeCap(cap) < 2) {
        return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
                // it already has some work, we just grabbed it at
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
            } else {
                free_caps[n_free_caps++] = cap0;
            }
        }
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;
        rtsBool pushed_to_all;

        debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);

        i = 0;
        pushed_to_all = rtsFalse;

        if (cap->run_queue_hd != END_TSO_QUEUE) {
            prev = cap->run_queue_hd;
            t = prev->link;
            prev->link = END_TSO_QUEUE;
            for (; t != END_TSO_QUEUE; t = next) {
                next = t->link;
                t->link = END_TSO_QUEUE;
                if (t->what_next == ThreadRelocated
                    || t->bound == task // don't move my bound thread
                    || tsoLocked(t)) {  // don't move a locked thread
                    prev->link = t;
                    prev = t;
                } else if (i == n_free_caps) {
                    pushed_to_all = rtsTrue;
                    i = 0;
                    // keep one for us
                    prev->link = t;
                    prev = t;
                } else {
                    debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
                    appendToRunQueue(free_caps[i],t);
                    if (t->bound) { t->bound->cap = free_caps[i]; }
                    t->cap = free_caps[i];
                    i++;
                }
            }
            cap->run_queue_tl = prev;
        }

        // If there are some free capabilities that we didn't push any
        // threads to, then try to push a spark to each one.
        if (!pushed_to_all) {
            StgClosure *spark;
            // i is the next free capability to push to
            for (; i < n_free_caps; i++) {
                if (emptySparkPoolCap(free_caps[i])) {
                    spark = findSpark(cap);
                    if (spark != NULL) {
                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
                        newSpark(&(free_caps[i]->r), spark);
                    }
                }
            }
        }

        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseCapability(free_caps[i]);
        }
    }
    task->cap = cap; // reset to point to our Capability.
}
#endif

/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
        // safe outside the lock
        startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
        awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
    }
#endif
}

/* ----------------------------------------------------------------------------
 * Check for threads woken up by other Capabilities
 * ------------------------------------------------------------------------- */

static void
scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    // Any threads that were woken up by other Capabilities get
    // appended to our run queue.
    if (!emptyWakeupQueue(cap)) {
        ACQUIRE_LOCK(&cap->lock);
        if (emptyRunQueue(cap)) {
            cap->run_queue_hd = cap->wakeup_queue_hd;
            cap->run_queue_tl = cap->wakeup_queue_tl;
        } else {
            cap->run_queue_tl->link = cap->wakeup_queue_hd;
            cap->run_queue_tl = cap->wakeup_queue_tl;
        }
        cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
        RELEASE_LOCK(&cap->lock);
    }
#endif
}

/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */
static void
scheduleCheckBlackHoles (Capability *cap)
{
    if ( blackholes_need_checking ) // check without the lock first
    {
        ACQUIRE_LOCK(&sched_mutex);
        if ( blackholes_need_checking ) {
            checkBlackHoles(cap);
            blackholes_need_checking = rtsFalse;
        }
        RELEASE_LOCK(&sched_mutex);
    }
}
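
/* Illustrative sketch (editor's addition, not RTS code): the function above
 * is an instance of the double-checked flag pattern.  A self-contained
 * model using pthreads, with hypothetical names:
 *
 *     #include <pthread.h>
 *
 *     static int work_pending = 0;               // advisory flag
 *     static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
 *
 *     static void sketch_check (void)
 *     {
 *         if (work_pending) {                    // cheap unlocked test
 *             pthread_mutex_lock(&m);
 *             if (work_pending) {                // re-test under the lock
 *                 // ... do the expensive scan here ...
 *                 work_pending = 0;
 *             }
 *             pthread_mutex_unlock(&m);
 *         }
 *     }
 *
 * A stale read of the flag is harmless: at worst we take the lock and find
 * nothing to do, or we miss one update and catch it on the next pass
 * (cf. "LOCK: none (doesn't matter if we miss an update)" above).
 */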

/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{
#if defined(PARALLEL_HASKELL)
    // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
    return;
#endif

    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
        /*
         * In the threaded RTS, we only check for deadlock if there
         * has been no activity in a complete timeslice.  This means
         * we won't eagerly start a full GC just because we don't have
         * any threads to run currently.
         */
        if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

        debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // runnable.
        cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);

        recent_activity = ACTIVITY_DONE_GC;
        // disable timer signals (see #1623)
        stopTimer();

        if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
         * deadlock.
         */
        if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
            debugTrace(DEBUG_sched,
                       "still deadlocked, waiting for signals...");

            awaitUserSignals();

            if (signals_pending()) {
                startSignalHandlers(cap);
            }

            // either we have threads to run, or we were interrupted:
            ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);

            return;
        }
#endif

#if !defined(THREADED_RTS)
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception.
         */
        switch (task->tso->why_blocked) {
        case BlockedOnSTM:
        case BlockedOnBlackHole:
        case BlockedOnException:
        case BlockedOnMVar:
            throwToSingleThreaded(cap, task->tso,
                                  (StgClosure *)NonTermination_closure);
            return;
        default:
            barf("deadlock: main thread blocked in a strange way");
        }
#endif
    }
}

/* ----------------------------------------------------------------------------
 * Process an event (GRAN only)
 * ------------------------------------------------------------------------- */

#if defined(GRAN)
static StgTSO *
scheduleProcessEvent(rtsEvent *event)
{
    StgTSO *t;

    if (RtsFlags.GranFlags.Light)
      GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
      CurrentTime[CurrentProc] = event->time;

    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
      handleIdlePEs();

    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
      /* Should just be continuing execution */
    case ContinueThread:
      IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
      /* ToDo: check assertion
      ASSERT(run_queue_hd != (StgTSO*)NULL &&
             run_queue_hd != END_TSO_QUEUE);
      */
      /* Ignore ContinueThreads for fetching threads (if synchr comm) */
      if (!RtsFlags.GranFlags.DoAsyncFetch &&
          procStatus[CurrentProc]==Fetching) {
        debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
                   CurrentTSO->id, CurrentTSO, CurrentProc);
        goto next_thread;
      }
      /* Ignore ContinueThreads for completed threads */
      if (CurrentTSO->what_next == ThreadComplete) {
        debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
                   CurrentTSO->id, CurrentTSO, CurrentProc);
        goto next_thread;
      }
      /* Ignore ContinueThreads for threads that are being migrated */
      if (PROCS(CurrentTSO)==Nowhere) {
        debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
                   CurrentTSO->id, CurrentTSO, CurrentProc);
        goto next_thread;
      }
      /* The thread should be at the beginning of the run queue */
      if (CurrentTSO!=run_queue_hds[CurrentProc]) {
        debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
                   CurrentTSO->id, CurrentTSO, CurrentProc);
        break; // run the thread anyway
      }
      /*
      new_event(proc, proc, CurrentTime[proc],
                FindWork,
                (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
      goto next_thread;
      */ /* Catches superfluous CONTINUEs -- should be unnecessary */
      break; // now actually run the thread; DaH Qu'vam yImuHbej

    case FetchNode:
      do_the_fetchnode(event);
      goto next_thread; /* handle next event in event queue  */

    case GlobalBlock:
      do_the_globalblock(event);
      goto next_thread; /* handle next event in event queue  */

    case FetchReply:
      do_the_fetchreply(event);
      goto next_thread; /* handle next event in event queue  */

    case UnblockThread: /* Move from the blocked queue to the tail of */
      do_the_unblock(event);
      goto next_thread; /* handle next event in event queue  */

    case ResumeThread: /* Move from the blocked queue to the tail of */
      /* the runnable queue ( i.e. Qu' SImqa'lu') */
      event->tso->gran.blocktime +=
        CurrentTime[CurrentProc] - event->tso->gran.blockedat;
      do_the_startthread(event);
      goto next_thread; /* handle next event in event queue  */

    case StartThread:
      do_the_startthread(event);
      goto next_thread; /* handle next event in event queue  */

    case MoveThread:
      do_the_movethread(event);
      goto next_thread; /* handle next event in event queue  */

    case MoveSpark:
      do_the_movespark(event);
      goto next_thread; /* handle next event in event queue  */

    case FindWork:
      do_the_findwork(event);
      goto next_thread; /* handle next event in event queue  */

    default:
      barf("Illegal event type %u\n", event->evttype);
    } /* switch */

    /* This point was scheduler_loop in the old RTS */

  next_thread:
    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
                              TimeOfNextEvent));

    if (RtsFlags.GranFlags.Light)
      GranSimLight_leave_system(event, &ActiveTSO);

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    IF_DEBUG(gran,
             debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */
    /* Take a thread from the run queue. */
    POP_RUN_QUEUE(t); // take_off_run_queue(t);

    IF_DEBUG(gran,
             debugBelch("GRAN: About to run current thread, which is\n");
             G_TSO(t,5));

    context_switch = 0; // turned on via GranYield, checking events and time slice

    IF_DEBUG(gran,
             DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;

    return t;
}
#endif // GRAN

/* ----------------------------------------------------------------------------
 * Send pending messages (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static void
scheduleSendPendingMessages(void)
{

# if defined(PAR) // global Mem.Mgmt., omit for now
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }
# endif

    if (RtsFlags.ParFlags.BufferTime) {
        // if we use message buffering, we must send away all message
        // packets which have become too old...
        sendOldBuffers();
    }
}
#endif

/* ----------------------------------------------------------------------------
 * Activate spark threads (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static rtsBool
scheduleActivateSpark(void)
{
#if defined(SPARKS)
  ASSERT(emptyRunQueue());
/* We get here if the run queue is empty and want some work.
   We try to turn a spark into a thread, and add it to the run queue,
   from where it will be picked up in the next iteration of the scheduler
   loop.
*/

  /* :-[  no local threads => look out for local sparks */
  /* the spark pool for the current PE */
  pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
  if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
      pool->hd < pool->tl) {
    /*
     * ToDo: add GC code check that we really have enough heap afterwards!!
     * If we're here (no runnable threads) and we have pending
     * sparks, we must have a space problem.  Get enough space
     * to turn one of those pending sparks into a
     * thread...
     */

    spark = findSpark(rtsFalse);              /* get a spark */
    if (spark != (rtsSpark) NULL) {
      tso = createThreadFromSpark(spark);     /* turn the spark into a thread */
      IF_PAR_DEBUG(fish, // schedule,
                   debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
                              tso->id, tso, advisory_thread_count));

      if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
        IF_PAR_DEBUG(fish, // schedule,
                     debugBelch("==^^ failed to create thread from spark @ %lx\n",
                                spark));
        return rtsFalse; /* failed to generate a thread */
      }               /* otherwise fall through & pick-up new tso */
    } else {
      IF_PAR_DEBUG(fish, // schedule,
                   debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
                              spark_queue_len(pool)));
      return rtsFalse; /* failed to generate a thread */
    }
    return rtsTrue;  /* success in generating a thread */
  } else { /* no more threads permitted or pool empty */
    return rtsFalse; /* failed to generateThread */
  }
#else
  tso = NULL; // avoid compiler warning only
  return rtsFalse; /* dummy in non-PAR setup */
#endif // SPARKS
}
#endif // PARALLEL_HASKELL

/* ----------------------------------------------------------------------------
 * Get work from a remote node (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static rtsBool
scheduleGetRemoteWork(rtsBool *receivedFinish)
{
  ASSERT(emptyRunQueue());

  if (RtsFlags.ParFlags.BufferTime) {
        IF_PAR_DEBUG(verbose,
                debugBelch("...send all pending data,"));
        {
          nat i;
          for (i=1; i<=nPEs; i++)
            sendImmediately(i); // send all messages away immediately
        }
  }

# ifndef SPARKS
        //++EDEN++ idle() , i.e. send all buffers, wait for work
        // suppress fishing in EDEN... just look for incoming messages
        // (blocking receive)
        IF_PAR_DEBUG(verbose,
                     debugBelch("...wait for incoming messages...\n"));
        *receivedFinish = processMessages(); // blocking receive...

        // and re-enter scheduling loop after having received something
        // (return rtsFalse below)

# else /* activate SPARKS machinery */
/* We get here, if we have no work, tried to activate a local spark, but still
   have no work. We try to get a remote spark, by sending a FISH message.
   Thread migration should be added here, and triggered when a sequence of
   fishes returns without work. */
        delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);

        /* =8-[  no local sparks => look for work on other PEs */
        /*
         * We really have absolutely no work.  Send out a fish
         * (there may be some out there already), and wait for
         * something to arrive.  We clearly can't run any threads
         * until a SCHEDULE or RESUME arrives, and so that's what
         * we're hoping to see.  (Of course, we still have to
         * respond to other types of messages.)
         */
        rtsTime now = msTime() /*CURRENT_TIME*/;
        IF_PAR_DEBUG(verbose,
                     debugBelch("--  now=%ld\n", now));
        IF_PAR_DEBUG(fish, // verbose,
             if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
                 (last_fish_arrived_at!=0 &&
                  last_fish_arrived_at+delay > now)) {
               debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
                          now, last_fish_arrived_at+delay,
                          last_fish_arrived_at,
                          delay);
             });

        if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
            advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
          if (last_fish_arrived_at==0 ||
              (last_fish_arrived_at+delay <= now)) {  // send FISH now!
            /* outstandingFishes is set in sendFish, processFish;
               avoid flooding system with fishes via delay */
            next_fish_to_send_at = 0;
          } else {
            /* ToDo: this should be done in the main scheduling loop to avoid the
               busy wait here; not so bad if fish delay is very small  */
            int iq = 0; // DEBUGGING -- HWL
            next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
            /* send a fish when ready, but process messages that arrive in the meantime */
            do {
              if (PacketsWaiting()) {
                iq++; // DEBUGGING
                *receivedFinish = processMessages();
              }
              now = msTime();
            } while (!*receivedFinish || now<next_fish_to_send_at);
            // JB: This means the fish could become obsolete, if we receive
            // work. Better check for work again?
            // last line: while (!receivedFinish || !haveWork || now<...)
            // next line: if (receivedFinish || haveWork )

            if (*receivedFinish) // no need to send a FISH if we are finishing anyway
              return rtsFalse;   // NB: this will leave scheduler loop
                                 // immediately after return!

            IF_PAR_DEBUG(fish, // verbose,
                 debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
          }

          // JB: IMHO, this should all be hidden inside sendFish(...)
          sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
                   NEW_FISH_HUNGER);

          // Global statistics: count no. of fishes
          if (RtsFlags.ParFlags.ParStats.Global &&
              RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
            globalParStats.tot_fish_mess++;
          }

          /* delayed fishes must have been sent by now! */
          next_fish_to_send_at = 0;
        }

        *receivedFinish = processMessages();
# endif /* SPARKS */

  return rtsFalse;
  /* NB: this function always returns rtsFalse, meaning the scheduler
     loop continues with the next iteration;
     return code means success in finding work; we enter this function
     if there is no local work, thus have to send a fish which takes
     time until it arrives with work; in the meantime we should process
     messages in the main loop;
  */
}
#endif // PARALLEL_HASKELL

/* ----------------------------------------------------------------------------
 * PAR/GRAN: Report stats & debugging info(?)
 * ------------------------------------------------------------------------- */

#if defined(PAR) || defined(GRAN)
static void
scheduleGranParReport(void)
{
  ASSERT(run_queue_hd != END_TSO_QUEUE);

  /* Take a thread from the run queue, if we have work */
  POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);

  /* If this TSO has got its outport closed in the meantime,
   *   it mustn't be run. Instead, we have to clean it up as if it was finished.
   * It has to be marked as TH_DEAD for this purpose.
   * If it is TH_TERM instead, it is supposed to have finished in the normal way.

     JB: TODO: investigate whether state change field could be nuked
         entirely and replaced by the normal tso state (whatnext
         field). All we want to do is to kill tsos from outside.
   */

  /* ToDo: write something to the log-file
  if (RTSflags.ParFlags.granSimStats && !sameThread)
    DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
  */

  /* the spark pool for the current PE */
  pool = &(cap.r.rSparks); //  cap = (old) MainCap

  IF_DEBUG(scheduler,
           debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
                      run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

  IF_PAR_DEBUG(verbose,
           debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
                      run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

  if (RtsFlags.ParFlags.ParStats.Full &&
      (t->par.sparkname != (StgInt)0) && // only log spark generated threads
      (emitSchedule || // forced emit
       (t && LastTSO && t->id != LastTSO->id))) {
    /*
       we are running a different TSO, so write a schedule event to log file
       NB: If we use fair scheduling we also have to write a deschedule
           event for LastTSO; with unfair scheduling we know that the
           previous tso has blocked whenever we switch to another tso, so
           we don't need it in GUM for now
    */
    IF_PAR_DEBUG(fish, // schedule,
                 debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));

    DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                     GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
    emitSchedule = rtsFalse;
  }
}
#endif

/* ----------------------------------------------------------------------------
 * After running a thread...
 * ------------------------------------------------------------------------- */

static void
schedulePostRunThread(void)
{
#if defined(PARALLEL_HASKELL)
  /* HACK 675: if the last thread didn't yield, make sure to print a
     SCHEDULE event to the log file when StgRunning the next thread, even
     if it is the same one as before */
  LastTSO = t;
  TimeOfLastYield = CURRENT_TIME;
#endif

  /* some statistics gathering in the parallel case */

#if defined(GRAN) || defined(PAR) || defined(EDEN)
  switch (ret) {
  case HeapOverflow:
# if defined(GRAN)
    IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
    globalGranStats.tot_heapover++;
# elif defined(PAR)
    globalParStats.tot_heapover++;
# endif
    break;

  case StackOverflow:
# if defined(GRAN)
    IF_DEBUG(gran,
             DumpGranEvent(GR_DESCHEDULE, t));
    globalGranStats.tot_stackover++;
# elif defined(PAR)
    // IF_DEBUG(par,
    // DumpGranEvent(GR_DESCHEDULE, t);
    globalParStats.tot_stackover++;
# endif
    break;

  case ThreadYielding:
# if defined(GRAN)
    IF_DEBUG(gran,
             DumpGranEvent(GR_DESCHEDULE, t));
    globalGranStats.tot_yields++;
# elif defined(PAR)
    // IF_DEBUG(par,
    // DumpGranEvent(GR_DESCHEDULE, t);
    globalParStats.tot_yields++;
# endif
    break;

  case ThreadBlocked:
# if defined(GRAN)
    debugTrace(DEBUG_sched,
               "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
               t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
               (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
    if (t->block_info.closure!=(StgClosure*)NULL)
      print_bq(t->block_info.closure);

    // ??? needed; should emit block before
    IF_DEBUG(gran,
             DumpGranEvent(GR_DESCHEDULE, t));
    prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
    ASSERT(procStatus[CurrentProc]==Busy ||
           ((procStatus[CurrentProc]==Fetching) &&
            (t->block_info.closure!=(StgClosure*)NULL)));
    if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
        !(!RtsFlags.GranFlags.DoAsyncFetch &&
          procStatus[CurrentProc]==Fetching))
      procStatus[CurrentProc] = Idle;
# elif defined(PAR)
    //++PAR++  blockThread() writes the event (change?)
# endif
    break;

  case ThreadFinished:
    break;

  default:
    barf("parGlobalStats: unknown return code");
  }
#endif
}

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
{
    // did the task ask for a large block?
    if (cap->r.rHpAlloc > BLOCK_SIZE) {
        // if so, get one and push it on the front of the nursery.
        bdescr *bd;
        lnat blocks;

        blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;

        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
                   (long)t->id, whatNext_strs[t->what_next], blocks);

        // don't do this if the nursery is (nearly) full, we'll GC first.
        if (cap->r.rCurrentNursery->link != NULL ||
            cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
                                               // if the nursery has only one block.

            bd = allocGroup( blocks );
            cap->r.rNursery->n_blocks += blocks;

            // link the new group into the list
            bd->link = cap->r.rCurrentNursery;
            bd->u.back = cap->r.rCurrentNursery->u.back;
            if (cap->r.rCurrentNursery->u.back != NULL) {
                cap->r.rCurrentNursery->u.back->link = bd;
            } else {
#if !defined(THREADED_RTS)
                ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
                       g0s0 == cap->r.rNursery);
#endif
                cap->r.rNursery->blocks = bd;
            }
            cap->r.rCurrentNursery->u.back = bd;

            // initialise it as a nursery block.  We initialise the
            // step, gen_no, and flags field of *every* sub-block in
            // this large block, because this is easier than making
            // sure that we always find the block head of a large
            // block whenever we call Bdescr() (eg. evacuate() and
            // isAlive() in the GC would both have to do this, at
            // least).
            {
                bdescr *x;
                for (x = bd; x < bd + blocks; x++) {
                    x->step = cap->r.rNursery;
                    x->gen_no = 0;
                    x->flags = 0;
                }
            }

            // This assert can be a killer if the app is doing lots
            // of large block allocations.
            IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));

            // now update the nursery to point to the new block
            cap->r.rCurrentNursery = bd;

            // we might be unlucky and have another thread get on the
            // run queue before us and steal the large block, but in that
            // case the thread will just end up requesting another large
            // block.
            pushOnRunQueue(cap,t);
            return rtsFalse;  /* not actually GC'ing */
        }
    }

    debugTrace(DEBUG_sched,
               "--<< thread %ld (%s) stopped: HeapOverflow\n",
               (long)t->id, whatNext_strs[t->what_next]);

#if defined(GRAN)
    ASSERT(!is_on_queue(t,CurrentProc));
#elif defined(PARALLEL_HASKELL)
    /* Currently we emit a DESCHEDULE event before GC in GUM.
       ToDo: either add separate event to distinguish SYSTEM time from rest
             or just nuke this DESCHEDULE (and the following SCHEDULE) */
    if (0 && RtsFlags.ParFlags.ParStats.Full) {
        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
        emitSchedule = rtsTrue;
    }
#endif

    if (context_switch) {
        // Sometimes we miss a context switch, e.g. when calling
        // primitives in a tight loop, MAYBE_GC() doesn't check the
        // context switch flag, and we end up waiting for a GC.
        // See #1984, and concurrent/should_run/1984
        context_switch = 0;
        addToRunQueue(cap,t);
    } else {
        pushOnRunQueue(cap,t);
    }
    return rtsTrue;
    /* actual GC is done at the end of the while loop in schedule() */
}

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadStackOverflow
 * -------------------------------------------------------------------------- */

static void
scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
{
    debugTrace (DEBUG_sched,
                "--<< thread %ld (%s) stopped, StackOverflow",
                (long)t->id, whatNext_strs[t->what_next]);

    /* just adjust the stack for this thread, then pop it back
     * on the run queue.
     */
    {
        /* enlarge the stack */
        StgTSO *new_t = threadStackOverflow(cap, t);

        /* The TSO attached to this Task may have moved, so update the
         * pointer to it.
         */
        if (task->tso == t) {
            task->tso = new_t;
        }
        pushOnRunQueue(cap,new_t);
    }
}
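
/* Illustrative sketch (editor's addition, not RTS code): threadStackOverflow
 * (defined later in this file) grows a stack by relocation; conceptually it
 * is like this stand-alone model, with hypothetical types and names, and
 * assuming <stdlib.h>/<string.h>:
 *
 *     typedef struct { size_t size, used; long *words; } SketchStack;
 *
 *     static SketchStack *sketch_grow (SketchStack *old)
 *     {
 *         SketchStack *new = malloc(sizeof(SketchStack));
 *         new->size  = old->size * 2;       // simple doubling policy
 *         new->used  = old->used;
 *         new->words = malloc(new->size * sizeof(long));
 *         // copy the live portion; the old object is now dead
 *         memcpy(new->words, old->words, old->used * sizeof(long));
 *         return new;                       // caller must re-point at this
 *     }
 *
 * The task->tso update above is the same caller obligation: after
 * relocation, every pointer to the old object must be updated.
 */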

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadYielding
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
{
    // Reset the context switch flag.  We don't do this just before
    // running the thread, because that would mean we would lose ticks
    // during GC, which can lead to unfair scheduling (a thread hogs
    // the CPU because the tick always arrives during GC).  This way
    // penalises threads that do a lot of allocation, but that seems
    // better than the alternative.
    context_switch = 0;

    /* put the thread back on the run queue.  Then, if we're ready to
     * GC, check whether this is the last task to stop.  If so, wake
     * up the GC thread.  getThread will block during a GC until the
     * GC is finished.
     */
#ifdef DEBUG
    if (t->what_next != prev_what_next) {
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped to switch evaluators",
                   (long)t->id, whatNext_strs[t->what_next]);
    } else {
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped, yielding",
                   (long)t->id, whatNext_strs[t->what_next]);
    }
#endif

    IF_DEBUG(sanity,
             //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
             checkTSO(t));
    ASSERT(t->link == END_TSO_QUEUE);

    // Shortcut if we're just switching evaluators: don't bother
    // doing stack squeezing (which can be expensive), just run the
    // thread.
    if (t->what_next != prev_what_next) {
        return rtsTrue;
    }

#if defined(GRAN)
    ASSERT(!is_on_queue(t,CurrentProc));

    IF_DEBUG(sanity,
             //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
             checkThreadQsSanity(rtsTrue));
#endif

    addToRunQueue(cap,t);

#if defined(GRAN)
    /* add a ContinueThread event to actually process the thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              ContinueThread,
              t, (StgClosure*)NULL, (rtsSpark*)NULL);
    IF_DEBUG(gran,
             debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n"));
#endif

    return rtsFalse;
}

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadBlocked
 * -------------------------------------------------------------------------- */

static void
scheduleHandleThreadBlocked( StgTSO *t
#if !defined(GRAN) && !defined(DEBUG)
                             STG_UNUSED
#endif
                             )
{
#if defined(GRAN)
    IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
                        t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
             if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));

    // ??? needed; should emit block before
    IF_DEBUG(gran,
             DumpGranEvent(GR_DESCHEDULE, t));
    prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
    ASSERT(procStatus[CurrentProc]==Busy ||
           ((procStatus[CurrentProc]==Fetching) &&
            (t->block_info.closure!=(StgClosure*)NULL)));
    if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
        !(!RtsFlags.GranFlags.DoAsyncFetch &&
          procStatus[CurrentProc]==Fetching))
        procStatus[CurrentProc] = Idle;
#elif defined(PAR)
    IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
                        t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
    IF_PAR_DEBUG(bq,
                 if (t->block_info.closure!=(StgClosure*)NULL)
                     print_bq(t->block_info.closure));

    /* Send a fetch (if BlockedOnGA) and dump event to log file */
    blockThread(t);

    /* whatever we schedule next, we must log that schedule */
    emitSchedule = rtsTrue;

#else /* !GRAN */

    // We don't need to do anything.  The thread is blocked, and it
    // has tidied up its stack and placed itself on whatever queue
    // it needs to be on.

    // ASSERT(t->why_blocked != NotBlocked);
    // Not true: for example,
    //    - in THREADED_RTS, the thread may already have been woken
    //      up by another Capability.  This actually happens: try
    //      conc023 +RTS -N2.
    //    - the thread may have woken itself up already, because
    //      threadPaused() might have raised a blocked throwTo
    //      exception, see maybePerformBlockedException().

#ifdef DEBUG
    if (traceClass(DEBUG_sched)) {
        debugTraceBegin("--<< thread %lu (%s) stopped: ",
                        (unsigned long)t->id, whatNext_strs[t->what_next]);
        printThreadBlockage(t);
        debugTraceEnd();
    }
#endif

    /* Only for dumping event to log file
       ToDo: do I need this in GranSim, too?
       blockThread(t);
    */
#endif
}

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadFinished
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
{
    /* Need to check whether this was a main thread, and if so,
     * return with the return value.
     *
     * We also end up here if the thread kills itself with an
     * uncaught exception, see Exception.cmm.
     */
    debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
               (unsigned long)t->id, whatNext_strs[t->what_next]);

#if defined(GRAN)
    endThread(t, CurrentProc); // clean-up the thread
#elif defined(PARALLEL_HASKELL)
    /* For now all are advisory -- HWL */
    //if(t->priority==AdvisoryPriority) ??
    advisory_thread_count--; // JB: Caution with this counter, buggy!

# if defined(DIST)
    if(t->dist.priority==RevalPriority)
        FinishReval(t);
# endif

# if defined(EDENOLD)
    // the thread could still have an outport... (BUG)
    if (t->eden.outport != -1) {
        // delete the outport for the tso which has finished...
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
                                t->eden.outport, t->id));
    }
    // thread still in the process (HEAVY BUG! since outport has just been closed...)
    if (t->eden.epid != -1) {
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
                                t->id, t->eden.epid));
        removeTSOfromProcess(t);
    }
# endif

    if (RtsFlags.ParFlags.ParStats.Full &&
        !RtsFlags.ParFlags.ParStats.Suppressed)
        DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);

    //  t->par only contains statistics: left out for now...
    IF_PAR_DEBUG(fish,
                 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
                            t->id,t,t->par.sparkname));
#endif // PARALLEL_HASKELL

    //
    // Check whether the thread that just completed was a bound
    // thread, and if so return with the result.
    //
    // There is an assumption here that all thread completion goes
    // through this point; we need to make sure that if a thread
    // ends up in the ThreadKilled state, that it stays on the run
    // queue so it can be dealt with here.
    //

    if (t->bound) {

        if (t->bound != task) {
#if !defined(THREADED_RTS)
            // Must be a bound thread that is not the topmost one.  Leave
            // it on the run queue until the stack has unwound to the
            // point where we can deal with this.  Leaving it on the run
            // queue also ensures that the garbage collector knows about
            // this thread and its return value (it gets dropped from the
            // all_threads list so there's no other way to find it).
            appendToRunQueue(cap,t);
            return rtsFalse;
#else
            // this cannot happen in the threaded RTS, because a
            // bound thread can only be run by the appropriate Task.
            barf("finished bound thread that isn't mine");
#endif
        }

        ASSERT(task->tso == t);

        if (t->what_next == ThreadComplete) {
            if (task->ret) {
                // NOTE: return val is tso->sp[1] (see StgStartup.hc)
                *(task->ret) = (StgClosure *)task->tso->sp[1];
            }
            task->stat = Success;
        } else {
            if (task->ret) {
                *(task->ret) = NULL;
            }
            if (sched_state >= SCHED_INTERRUPTING) {
                task->stat = Interrupted;
            } else {
                task->stat = Killed;
            }
        }
#ifdef DEBUG
        removeThreadLabel((StgWord)task->tso->id);
#endif
        return rtsTrue; // tells schedule() to return
    }

    return rtsFalse;
}

/* -----------------------------------------------------------------------------
 * Perform a heap census
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
{
    // When we have +RTS -i0 and we're heap profiling, do a census at
    // every GC.  This lets us get repeatable runs for debugging.
    if (performHeapProfile ||
        (RtsFlags.ProfFlags.profileInterval==0 &&
         RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
        return rtsTrue;
    } else {
        return rtsFalse;
    }
}

/* -----------------------------------------------------------------------------
 * Perform a garbage collection if necessary
 * -------------------------------------------------------------------------- */

static Capability *
scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
{
    rtsBool heap_census;
#ifdef THREADED_RTS
    static volatile StgWord waiting_for_gc;
    rtsBool was_waiting;
    nat i;
#endif

#ifdef THREADED_RTS
    // In order to GC, there must be no threads running Haskell code.
    // Therefore, the GC thread needs to hold *all* the capabilities,
    // and release them after the GC has completed.
    //
    // This seems to be the simplest way: previous attempts involved
    // making all the threads with capabilities give up their
    // capabilities and sleep except for the *last* one, which
    // actually did the GC.  But it's quite hard to arrange for all
    // the other tasks to sleep and stay asleep.
    //
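    // Illustrative sketch (editor's addition, not RTS code): the cas()
    // below is a leader election.  A self-contained model of the idea,
    // assuming a hypothetical atomic compare-and-swap with the same
    // contract (returns the *old* value):
    //
    //     static volatile int gc_leader = 0;
    //
    //     static void sketch_try_gc (void)
    //     {
    //         if (sketch_cas(&gc_leader, 0, 1) != 0) {
    //             // lost the race: someone else is the GC leader;
    //             // spin/yield until they finish, then return.
    //             while (gc_leader) { /* yield */ }
    //             return;
    //         }
    //         // won the race: do the GC, then open the gate again.
    //         gc_leader = 0;
    //     }
    //
    // Exactly one caller sees the old value 0 and becomes the leader;
    // everyone else waits, which is what the loop below does.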
    was_waiting = cas(&waiting_for_gc, 0, 1);
    if (was_waiting) {
        do {
            debugTrace(DEBUG_sched, "someone else is trying to GC...");
            if (cap) yieldCapability(&cap,task);
        } while (waiting_for_gc);
        return cap;  // NOTE: task->cap might have changed here
    }

    for (i=0; i < n_capabilities; i++) {
        debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
        if (cap != &capabilities[i]) {
            Capability *pcap = &capabilities[i];
            // we better hope this task doesn't get migrated to
            // another Capability while we're waiting for this one.
            // It won't, because load balancing happens while we have
            // all the Capabilities, but even so it's a slightly
            // unsavoury invariant.
            task->cap = pcap;
            waitForReturnCapability(&pcap, task);
            if (pcap != &capabilities[i]) {
                barf("scheduleDoGC: got the wrong capability");
            }
        }
    }

    waiting_for_gc = rtsFalse;
#endif
2010 /* Kick any transactions which are invalid back to their
2011 * atomically frames. When next scheduled they will try to
2012 * commit; the commit will fail and they will retry.
2017 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2018 if (t->what_next == ThreadRelocated) {
2021 next = t->global_link;
2023 // This is a good place to check for blocked
2024 // exceptions. It might be the case that a thread is
2025 // blocked on delivering an exception to a thread that
2026 // is also blocked - we try to ensure that this
2027 // doesn't happen in throwTo(), but it's too hard (or
2028 // impossible) to close all the race holes, so we
2029 // accept that some might get through and deal with
2030 // them here. A GC will always happen at some point,
2031 // even if the system is otherwise deadlocked.
2032 maybePerformBlockedException (&capabilities[0], t);
2034 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2035 if (!stmValidateNestOfTransactions (t -> trec)) {
2036 debugTrace(DEBUG_sched | DEBUG_stm,
2037 "trec %p found wasting its time", t);
2039 // strip the stack back to the
2040 // ATOMICALLY_FRAME, aborting the (nested)
2041 // transaction, and saving the stack of any
2042 // partially-evaluated thunks on the heap.
2043 throwToSingleThreaded_(&capabilities[0], t,
2044 NULL, rtsTrue, NULL);
2047 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2055 // The blackhole_queue should be checked for threads to wake up from time to time, so this happens periodically:
2056 if (cap) scheduleCheckBlackHoles(cap);
2058 IF_DEBUG(scheduler, printAllThreads());
2061 * We now have all the capabilities; if we're in an interrupting
2062 * state, then we should take the opportunity to delete all the
2063 * threads in the system.
2065 if (sched_state >= SCHED_INTERRUPTING) {
2066 deleteAllThreads(&capabilities[0]);
2067 sched_state = SCHED_SHUTTING_DOWN;
2070 heap_census = scheduleNeedHeapProfile(rtsTrue);
2072 /* everybody back, start the GC.
2073 * Could do it in this thread, or signal a condition var
2074 * to do it in another thread. Either way, we need to
2075 * broadcast on gc_pending_cond afterward.
2077 #if defined(THREADED_RTS)
2078 debugTrace(DEBUG_sched, "doing GC");
2080 GarbageCollect(force_major || heap_census);
2083 debugTrace(DEBUG_sched, "performing heap census");
2085 performHeapProfile = rtsFalse;
2088 #if defined(THREADED_RTS)
2089 // release our stash of capabilities.
2090 for (i = 0; i < n_capabilities; i++) {
2091 if (cap != &capabilities[i]) {
2092 task->cap = &capabilities[i];
2093 releaseCapability(&capabilities[i]);
2104 /* add a ContinueThread event to continue execution of current thread */
2105 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2107 t, (StgClosure*)NULL, (rtsSpark*)NULL);
2109 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2117 /* ---------------------------------------------------------------------------
2118 * Singleton fork(). Do not copy any running threads.
2119 * ------------------------------------------------------------------------- */
2122 forkProcess(HsStablePtr *entry
2123 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2128 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2134 #if defined(THREADED_RTS)
2135 if (RtsFlags.ParFlags.nNodes > 1) {
2136 errorBelch("forking not supported with +RTS -N<n> greater than 1");
2137 stg_exit(EXIT_FAILURE);
2141 debugTrace(DEBUG_sched, "forking!");
2143 // ToDo: for SMP, we should probably acquire *all* the capabilities
2146 // no funny business: hold locks while we fork, otherwise if some
2147 // other thread is holding a lock when the fork happens, the data
2148 // structure protected by the lock will forever be in an
2149 // inconsistent state in the child. See also #1391.
2150 ACQUIRE_LOCK(&sched_mutex);
2151 ACQUIRE_LOCK(&cap->lock);
2152 ACQUIRE_LOCK(&cap->running_task->lock);
2156 if (pid) { // parent
2158 RELEASE_LOCK(&sched_mutex);
2159 RELEASE_LOCK(&cap->lock);
2160 RELEASE_LOCK(&cap->running_task->lock);
2162 // just return the pid
2168 #if defined(THREADED_RTS)
2169 initMutex(&sched_mutex);
2170 initMutex(&cap->lock);
2171 initMutex(&cap->running_task->lock);
2174 // Now, all OS threads except the thread that forked are
2175 // stopped. We need to stop all Haskell threads, including
2176 // those involved in foreign calls. Also we need to delete
2177 // all Tasks, because they correspond to OS threads that are now gone.
2180 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2181 if (t->what_next == ThreadRelocated) {
2184 next = t->global_link;
2185 // don't allow threads to catch the ThreadKilled
2186 // exception, but we do want to raiseAsync() because these
2187 // threads may be evaluating thunks that we need later.
2188 deleteThread_(cap,t);
2192 // Empty the run queue. It seems tempting to let all the
2193 // killed threads stay on the run queue as zombies to be
2194 // cleaned up later, but some of them correspond to bound
2195 // threads for which the corresponding Task does not exist.
2196 cap->run_queue_hd = END_TSO_QUEUE;
2197 cap->run_queue_tl = END_TSO_QUEUE;
2199 // Any suspended C-calling Tasks are no more; their OS threads no longer exist.
2201 cap->suspended_ccalling_tasks = NULL;
2203 // Empty the all_threads list. Otherwise, the garbage
2204 // collector may attempt to resurrect some of these threads.
2205 all_threads = END_TSO_QUEUE;
2207 // Wipe the task list, except the current Task.
2208 ACQUIRE_LOCK(&sched_mutex);
2209 for (task = all_tasks; task != NULL; task=task->all_link) {
2210 if (task != cap->running_task) {
2211 #if defined(THREADED_RTS)
2212 initMutex(&task->lock); // see #1391
2217 RELEASE_LOCK(&sched_mutex);
2219 #if defined(THREADED_RTS)
2220 // Wipe our spare workers list; they no longer exist. New
2221 // workers will be created if necessary.
2222 cap->spare_workers = NULL;
2223 cap->returning_tasks_hd = NULL;
2224 cap->returning_tasks_tl = NULL;
2227 // On Unix, all timers are reset in the child, so we need to start the timer again.
2232 cap = rts_evalStableIO(cap, entry, NULL); // run the action
2233 rts_checkSchedStatus("forkProcess",cap);
2236 hs_exit(); // clean up and exit
2237 stg_exit(EXIT_SUCCESS);
2239 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2240 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2245 /* ---------------------------------------------------------------------------
2246 * Delete all the threads in the system
2247 * ------------------------------------------------------------------------- */
2250 deleteAllThreads ( Capability *cap )
2252 // NOTE: only safe to call if we own all capabilities.
2255 debugTrace(DEBUG_sched,"deleting all threads");
2256 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2257 if (t->what_next == ThreadRelocated) {
2260 next = t->global_link;
2261 deleteThread(cap,t);
2265 // The run queue now contains a bunch of ThreadKilled threads. We
2266 // must not throw these away: the main thread(s) will be in there
2267 // somewhere, and the main scheduler loop has to deal with it.
2268 // Also, the run queue is the only thing keeping these threads from
2269 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2271 #if !defined(THREADED_RTS)
2272 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2273 ASSERT(sleeping_queue == END_TSO_QUEUE);
2277 /* -----------------------------------------------------------------------------
2278 Managing the suspended_ccalling_tasks list.
2279 Locks required: sched_mutex
2280 -------------------------------------------------------------------------- */
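/* The suspended_ccalling_tasks list is doubly linked through
 * task->next and task->prev, with the Capability pointing at the head.
 * Schematically (illustrative only):
 *
 *     cap->suspended_ccalling_tasks --> T1 <--> T2 <--> T3 --> NULL
 *
 * suspendTask() pushes onto the front; recoverSuspendedTask() can
 * unlink a Task from anywhere in the list.
 */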
2283 suspendTask (Capability *cap, Task *task)
2285 ASSERT(task->next == NULL && task->prev == NULL);
2286 task->next = cap->suspended_ccalling_tasks;
2288 if (cap->suspended_ccalling_tasks) {
2289 cap->suspended_ccalling_tasks->prev = task;
2291 cap->suspended_ccalling_tasks = task;
2295 recoverSuspendedTask (Capability *cap, Task *task)
2298 task->prev->next = task->next;
2300 ASSERT(cap->suspended_ccalling_tasks == task);
2301 cap->suspended_ccalling_tasks = task->next;
2304 task->next->prev = task->prev;
2306 task->next = task->prev = NULL;
2309 /* ---------------------------------------------------------------------------
2310 * Suspending & resuming Haskell threads.
2312 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2313 * its capability before calling the C function. This allows another
2314 * task to pick up the capability and carry on running Haskell
2315 * threads. It also means that if the C call blocks, it won't lock the whole system.
2318 * The Haskell thread making the C call is put to sleep for the
2319 * duration of the call, on the suspended_ccalling_tasks queue. We
2320 * give out a token to the task, which it can use to resume the thread
2321 * on return from the C function.
2322 * ------------------------------------------------------------------------- */
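/* A hedged sketch of how a safe foreign call is bracketed by the
 * compiled code (the generated code differs in detail; foo() is a
 * hypothetical foreign function):
 *
 *     token = suspendThread(&cap->r);  // give up the Capability
 *     r = foo(arg);                    // the (possibly blocking) C call
 *     resumeThread(token);             // reacquire a Capability
 *
 * The token handed out by suspendThread() is the Task, which
 * resumeThread() uses to find the suspended TSO again.
 */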
2325 suspendThread (StgRegTable *reg)
2332 StgWord32 saved_winerror;
2335 saved_errno = errno;
2337 saved_winerror = GetLastError();
2340 /* assume that *reg is a pointer to the StgRegTable part of a Capability. */
2342 cap = regTableToCapability(reg);
2344 task = cap->running_task;
2345 tso = cap->r.rCurrentTSO;
2347 debugTrace(DEBUG_sched,
2348 "thread %lu did a safe foreign call",
2349 (unsigned long)cap->r.rCurrentTSO->id);
2351 // XXX this might not be necessary --SDM
2352 tso->what_next = ThreadRunGHC;
2354 threadPaused(cap,tso);
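// Record how to restore the blocked-exceptions state when the call
// returns: if exceptions were not already blocked, block them for
// the duration of the call (BlockedOnCCall); otherwise leave the
// flags alone (BlockedOnCCall_NoUnblockExc).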
2356 if ((tso->flags & TSO_BLOCKEX) == 0) {
2357 tso->why_blocked = BlockedOnCCall;
2358 tso->flags |= TSO_BLOCKEX;
2359 tso->flags &= ~TSO_INTERRUPTIBLE;
2361 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2364 // Hand back capability
2365 task->suspended_tso = tso;
2367 ACQUIRE_LOCK(&cap->lock);
2369 suspendTask(cap,task);
2370 cap->in_haskell = rtsFalse;
2371 releaseCapability_(cap);
2373 RELEASE_LOCK(&cap->lock);
2375 #if defined(THREADED_RTS)
2376 /* Preparing to leave the RTS, so ensure there's a native thread/task
2377 waiting to take over.
2379 debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2382 errno = saved_errno;
2384 SetLastError(saved_winerror);
2390 resumeThread (void *task_)
2397 StgWord32 saved_winerror;
2400 saved_errno = errno;
2402 saved_winerror = GetLastError();
2406 // Wait for permission to re-enter the RTS with the result.
2407 waitForReturnCapability(&cap,task);
2408 // we might be on a different capability now... but if so, our
2409 // entry on the suspended_ccalling_tasks list will also have been migrated.
2412 // Remove the thread from the suspended list
2413 recoverSuspendedTask(cap,task);
2415 tso = task->suspended_tso;
2416 task->suspended_tso = NULL;
2417 tso->link = END_TSO_QUEUE;
2418 debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2420 if (tso->why_blocked == BlockedOnCCall) {
2421 awakenBlockedExceptionQueue(cap,tso);
2422 tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2425 /* Reset blocking status */
2426 tso->why_blocked = NotBlocked;
2428 cap->r.rCurrentTSO = tso;
2429 cap->in_haskell = rtsTrue;
2430 errno = saved_errno;
2432 SetLastError(saved_winerror);
2435 /* We might have GC'd; mark the TSO dirty again */
2438 IF_DEBUG(sanity, checkTSO(tso));
2443 /* ---------------------------------------------------------------------------
2446 * scheduleThread puts a thread on the end of the runnable queue.
2447 * This will usually be done immediately after a thread is created.
2448 * The caller of scheduleThread must create the thread using e.g.
2449 * createThread and push an appropriate closure
2450 * on this thread's stack before the scheduler is invoked.
2451 * ------------------------------------------------------------------------ */
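/* Illustrative only (rts_evalIO() and friends in RtsAPI.c are the
 * real entry points): scheduling an IO action on a fresh bound
 * thread might look like
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  (StgClosure *)action);
 *     scheduleWaitThread(tso, &ret, cap);  // run it and wait for the result
 */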
2454 scheduleThread(Capability *cap, StgTSO *tso)
2456 // The thread goes at the *end* of the run-queue, to avoid possible
2457 // starvation of any threads already on the queue.
2458 appendToRunQueue(cap,tso);
2462 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2464 #if defined(THREADED_RTS)
2465 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2466 // move this thread from now on.
2467 cpu %= RtsFlags.ParFlags.nNodes;
2468 if (cpu == cap->no) {
2469 appendToRunQueue(cap,tso);
2471 migrateThreadToCapability_lock(&capabilities[cpu],tso);
2474 appendToRunQueue(cap,tso);
2479 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2483 // We already created/initialised the Task
2484 task = cap->running_task;
2486 // This TSO is now a bound thread; make the Task and TSO
2487 // point to each other.
2493 task->stat = NoStatus;
2495 appendToRunQueue(cap,tso);
2497 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2500 /* GranSim specific init */
2501 CurrentTSO = m->tso; // the TSO to run
2502 procStatus[MainProc] = Busy; // status of main PE
2503 CurrentProc = MainProc; // PE to run it on
2506 cap = schedule(cap,task);
2508 ASSERT(task->stat != NoStatus);
2509 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2511 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2515 /* ----------------------------------------------------------------------------
2517 * ------------------------------------------------------------------------- */
2519 #if defined(THREADED_RTS)
2521 workerStart(Task *task)
2525 // See startWorkerTask().
2526 ACQUIRE_LOCK(&task->lock);
2528 RELEASE_LOCK(&task->lock);
2530 // set the thread-local pointer to the Task:
2533 // schedule() runs without a lock.
2534 cap = schedule(cap,task);
2536 // On exit from schedule(), we have a Capability.
2537 releaseCapability(cap);
2538 workerTaskStop(task);
2542 /* ---------------------------------------------------------------------------
2545 * Initialise the scheduler. This resets all the queues - if the
2546 * queues contained any threads, they'll be garbage collected at the next GC.
2549 * ------------------------------------------------------------------------ */
2556 for (i=0; i < MAX_PROC; i++) {
2557 run_queue_hds[i] = END_TSO_QUEUE;
2558 run_queue_tls[i] = END_TSO_QUEUE;
2559 blocked_queue_hds[i] = END_TSO_QUEUE;
2560 blocked_queue_tls[i] = END_TSO_QUEUE;
2561 ccalling_threadss[i] = END_TSO_QUEUE;
2562 blackhole_queue[i] = END_TSO_QUEUE;
2563 sleeping_queue = END_TSO_QUEUE;
2565 #elif !defined(THREADED_RTS)
2566 blocked_queue_hd = END_TSO_QUEUE;
2567 blocked_queue_tl = END_TSO_QUEUE;
2568 sleeping_queue = END_TSO_QUEUE;
2571 blackhole_queue = END_TSO_QUEUE;
2572 all_threads = END_TSO_QUEUE;
2575 sched_state = SCHED_RUNNING;
2576 recent_activity = ACTIVITY_YES;
2578 #if defined(THREADED_RTS)
2579 /* Initialise the mutex and condition variables used by the scheduler. */
2581 initMutex(&sched_mutex);
2584 ACQUIRE_LOCK(&sched_mutex);
2586 /* A capability holds the state a native thread needs in
2587 * order to execute STG code. At least one capability is
2588 * floating around (only THREADED_RTS builds have more than one).
2594 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2598 #if defined(THREADED_RTS)
2600 * Eagerly start one worker to run each Capability, except for
2601 * Capability 0. The idea is that we're probably going to start a
2602 * bound thread on Capability 0 pretty soon, so we don't want a
2603 * worker task hogging it.
2608 for (i = 1; i < n_capabilities; i++) {
2609 cap = &capabilities[i];
2610 ACQUIRE_LOCK(&cap->lock);
2611 startWorkerTask(cap, workerStart);
2612 RELEASE_LOCK(&cap->lock);
2617 trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2619 RELEASE_LOCK(&sched_mutex);
2624 rtsBool wait_foreign
2625 #if !defined(THREADED_RTS)
2626 __attribute__((unused))
2629 /* see Capability.c, shutdownCapability() */
2633 #if defined(THREADED_RTS)
2634 ACQUIRE_LOCK(&sched_mutex);
2635 task = newBoundTask();
2636 RELEASE_LOCK(&sched_mutex);
2639 // If we haven't killed all the threads yet, do it now.
2640 if (sched_state < SCHED_SHUTTING_DOWN) {
2641 sched_state = SCHED_INTERRUPTING;
2642 scheduleDoGC(NULL,task,rtsFalse);
2644 sched_state = SCHED_SHUTTING_DOWN;
2646 #if defined(THREADED_RTS)
2650 for (i = 0; i < n_capabilities; i++) {
2651 shutdownCapability(&capabilities[i], task, wait_foreign);
2653 boundTaskExiting(task);
2657 freeCapability(&MainCapability);
2662 freeScheduler( void )
2665 if (n_capabilities != 1) {
2666 stgFree(capabilities);
2668 #if defined(THREADED_RTS)
2669 closeMutex(&sched_mutex);
2673 /* -----------------------------------------------------------------------------
2676 This is the interface to the garbage collector from Haskell land.
2677 We provide this so that external C code can allocate and garbage
2678 collect when called from Haskell via _ccall_GC.
2679 -------------------------------------------------------------------------- */
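/* For example (hedged), external C code that wants a collection
 * right away can simply call:
 *
 *     performMajorGC();   // forces a major collection
 */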
2682 performGC_(rtsBool force_major)
2685 // We must grab a new Task here, because the existing Task may be
2686 // associated with a particular Capability, and chained onto the
2687 // suspended_ccalling_tasks queue.
2688 ACQUIRE_LOCK(&sched_mutex);
2689 task = newBoundTask();
2690 RELEASE_LOCK(&sched_mutex);
2691 scheduleDoGC(NULL,task,force_major);
2692 boundTaskExiting(task);
2698 performGC_(rtsFalse);
2702 performMajorGC(void)
2704 performGC_(rtsTrue);
2707 /* -----------------------------------------------------------------------------
2710 If the thread has reached its maximum stack size, then raise the
2711 StackOverflow exception in the offending thread. Otherwise
2712 relocate the TSO into a larger chunk of memory and adjust its stack size appropriately.
2714 -------------------------------------------------------------------------- */
2717 threadStackOverflow(Capability *cap, StgTSO *tso)
2719 nat new_stack_size, stack_words;
2724 IF_DEBUG(sanity,checkTSO(tso));
2726 // don't allow throwTo() to modify the blocked_exceptions queue
2727 // while we are moving the TSO:
2728 lockClosure((StgClosure *)tso);
2730 if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2731 // NB. never raise a StackOverflow exception if the thread is
2732 // inside Control.Exception.block. It is impractical to protect
2733 // against stack overflow exceptions, since virtually anything
2734 // can raise one (even 'catch'), so this is the only sensible
2735 // thing to do here. See bug #767.
2737 debugTrace(DEBUG_gc,
2738 "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2739 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2741 /* If we're debugging, just print out the top of the stack */
2742 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2745 // Send this thread the StackOverflow exception
2747 throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2751 /* Try to double the current stack size. If that takes us over the
2752 * maximum stack size for this thread, then use the maximum instead.
2753 * Finally round up so the TSO ends up as a whole number of blocks.
2755 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2756 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2757 TSO_STRUCT_SIZE)/sizeof(W_);
2758 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2759 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
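// Worked example (illustrative, assuming 4k blocks and 8-byte words):
// a 2048-word stack doubles to 4096 words; adding TSO_STRUCT_SIZE and
// rounding up to a whole number of blocks gives new_tso_size, and
// whatever remains after subtracting the header back off becomes the
// new (slightly larger than 4096-word) stack.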
2761 debugTrace(DEBUG_sched,
2762 "increasing stack size from %ld words to %d.",
2763 (long)tso->stack_size, new_stack_size);
2765 dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2766 TICK_ALLOC_TSO(new_stack_size,0);
2768 /* copy the TSO block and the old stack into the new area */
2769 memcpy(dest,tso,TSO_STRUCT_SIZE);
2770 stack_words = tso->stack + tso->stack_size - tso->sp;
2771 new_sp = (P_)dest + new_tso_size - stack_words;
2772 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2774 /* relocate the stack pointers... */
2776 dest->stack_size = new_stack_size;
2778 /* Mark the old TSO as relocated. We have to check for relocated
2779 * TSOs in the garbage collector and any primops that deal with TSOs.
2781 * It's important to set the sp value to just beyond the end
2782 * of the stack, so we don't attempt to scavenge any part of the dead TSO's stack.
2785 tso->what_next = ThreadRelocated;
2787 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2788 tso->why_blocked = NotBlocked;
2790 IF_PAR_DEBUG(verbose,
2791 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2792 tso->id, tso, tso->stack_size);
2793 /* If we're debugging, just print out the top of the stack */
2794 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2800 IF_DEBUG(sanity,checkTSO(dest));
2802 IF_DEBUG(scheduler,printTSO(dest));
2808 /* ---------------------------------------------------------------------------
2810 - usually called inside a signal handler so it mustn't do anything fancy.
2811 ------------------------------------------------------------------------ */
2814 interruptStgRts(void)
2816 sched_state = SCHED_INTERRUPTING;
2821 /* -----------------------------------------------------------------------------
2824 This function causes at least one OS thread to wake up and run the
2825 scheduler loop. It is invoked when the RTS might be deadlocked, or
2826 an external event has arrived that may need servicing (eg. a
2827 keyboard interrupt).
2829 In the single-threaded RTS we don't do anything here; we only have
2830 one thread anyway, and the event that caused us to want to wake up
2831 will have interrupted any blocking system call in progress anyway.
2832 -------------------------------------------------------------------------- */
2837 #if defined(THREADED_RTS)
2838 // This forces the IO Manager thread to wake up, which will
2839 // in turn ensure that some OS thread wakes up and runs the
2840 // scheduler loop, which will cause a GC and deadlock check.
2845 /* -----------------------------------------------------------------------------
2848 * Check the blackhole_queue for threads that can be woken up. We do
2849 * this periodically: before every GC, and whenever the run queue is empty.
2852 * An elegant solution might be to just wake up all the blocked
2853 * threads with awakenBlockedQueue occasionally: they'll go back to
2854 * sleep again if the object is still a BLACKHOLE. Unfortunately this
2855 * doesn't give us a way to tell whether we've actually managed to
2856 * wake up any threads, so we would be busy-waiting.
2858 * -------------------------------------------------------------------------- */
2861 checkBlackHoles (Capability *cap)
2864 rtsBool any_woke_up = rtsFalse;
2867 // blackhole_queue is global:
2868 ASSERT_LOCK_HELD(&sched_mutex);
2870 debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2872 // ASSUMES: sched_mutex
2873 prev = &blackhole_queue;
2874 t = blackhole_queue;
2875 while (t != END_TSO_QUEUE) {
2876 ASSERT(t->why_blocked == BlockedOnBlackHole);
2877 type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2878 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2879 IF_DEBUG(sanity,checkTSO(t));
2880 t = unblockOne(cap, t);
2881 // urk, the threads migrate to the current capability
2882 // here, but we'd like to keep them on the original one.
2884 any_woke_up = rtsTrue;
2894 /* -----------------------------------------------------------------------------
2897 This is used for interruption (^C) and forking, and corresponds to
2898 raising an exception but without letting the thread catch the exception.
2900 -------------------------------------------------------------------------- */
2903 deleteThread (Capability *cap, StgTSO *tso)
2905 // NOTE: must only be called on a TSO that we have exclusive
2906 // access to, because we will call throwToSingleThreaded() below.
2907 // The TSO must be on the run queue of the Capability we own, or
2908 // we must own all Capabilities.
2910 if (tso->why_blocked != BlockedOnCCall &&
2911 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2912 throwToSingleThreaded(cap,tso,NULL);
2916 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2918 deleteThread_(Capability *cap, StgTSO *tso)
2919 { // for forkProcess only:
2920 // like deleteThread(), but we delete threads in foreign calls, too.
2922 if (tso->why_blocked == BlockedOnCCall ||
2923 tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2924 unblockOne(cap,tso);
2925 tso->what_next = ThreadKilled;
2927 deleteThread(cap,tso);
2932 /* -----------------------------------------------------------------------------
2933 raiseExceptionHelper
2935 This function is called by the raise# primitive, just so that we can
2936 move some of the tricky bits of raising an exception from C-- into
2937 C. Who knows, it might be a useful re-usable thing here too.
2938 -------------------------------------------------------------------------- */
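/* (The caller is the Cmm code for raise#; schematically, and hedged:
 *
 *     frame_type = raiseExceptionHelper(BaseReg, CurrentTSO, exception);
 *     // ...then dispatch on the returned frame type
 * )
 */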
2941 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2943 Capability *cap = regTableToCapability(reg);
2944 StgThunk *raise_closure = NULL;
2946 StgRetInfoTable *info;
2948 // This closure represents the expression 'raise# E' where E
2949 // is the exception raised. It is used to overwrite all the
2950 // thunks which are currently under evaluation.
2953 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2954 // LDV profiling: stg_raise_info has THUNK as its closure
2955 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2956 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2957 // 1 does not cause any problem unless profiling is performed.
2958 // However, when LDV profiling goes on, we need to linearly scan
2959 // small object pool, where raise_closure is stored, so we should
2960 // use MIN_UPD_SIZE.
2962 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2963 // sizeofW(StgClosure)+1);
2967 // Walk up the stack, looking for the catch frame. On the way,
2968 // we update any closures pointed to from update frames with the
2969 // raise closure that we just built.
2973 info = get_ret_itbl((StgClosure *)p);
2974 next = p + stack_frame_sizeW((StgClosure *)p);
2975 switch (info->i.type) {
2978 // Only create raise_closure if we need to.
2979 if (raise_closure == NULL) {
2981 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2982 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2983 raise_closure->payload[0] = exception;
2985 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2989 case ATOMICALLY_FRAME:
2990 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2992 return ATOMICALLY_FRAME;
2998 case CATCH_STM_FRAME:
2999 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3001 return CATCH_STM_FRAME;
3007 case CATCH_RETRY_FRAME:
3016 /* -----------------------------------------------------------------------------
3017 findRetryFrameHelper
3019 This function is called by the retry# primitive. It traverses the stack
3020 leaving tso->sp referring to the frame which should handle the retry.
3022 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3023 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
3025 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3026 create) because retries are not considered to be exceptions, despite the
3027 similar implementation.
3029 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3030 not be created within memory transactions.
3031 -------------------------------------------------------------------------- */
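/* (Similarly called from the Cmm code for retry#, roughly:
 *
 *     frame_type = findRetryFrameHelper(CurrentTSO);
 *     // tso->sp now points at the frame that handles the retry
 * )
 */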
3034 findRetryFrameHelper (StgTSO *tso)
3037 StgRetInfoTable *info;
3041 info = get_ret_itbl((StgClosure *)p);
3042 next = p + stack_frame_sizeW((StgClosure *)p);
3043 switch (info->i.type) {
3045 case ATOMICALLY_FRAME:
3046 debugTrace(DEBUG_stm,
3047 "found ATOMICALLY_FRAME at %p during retry", p);
3049 return ATOMICALLY_FRAME;
3051 case CATCH_RETRY_FRAME:
3052 debugTrace(DEBUG_stm,
3053 "found CATCH_RETRY_FRAME at %p during retrry", p);
3055 return CATCH_RETRY_FRAME;
3057 case CATCH_STM_FRAME: {
3058 StgTRecHeader *trec = tso -> trec;
3059 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3060 debugTrace(DEBUG_stm,
3061 "found CATCH_STM_FRAME at %p during retry", p);
3062 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3063 stmAbortTransaction(tso -> cap, trec);
3064 stmFreeAbortedTRec(tso -> cap, trec);
3065 tso -> trec = outer;
3072 ASSERT(info->i.type != CATCH_FRAME);
3073 ASSERT(info->i.type != STOP_FRAME);
3080 /* -----------------------------------------------------------------------------
3081 resurrectThreads is called after garbage collection on the list of
3082 threads found to be garbage. Each of these threads will be woken
3083 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3084 * on an MVar, or NonTermination if the thread was blocked on a Black Hole.
3087 Locks: assumes we hold *all* the capabilities.
3088 -------------------------------------------------------------------------- */
3091 resurrectThreads (StgTSO *threads)
3096 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3097 next = tso->global_link;
3098 tso->global_link = all_threads;
3100 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3102 // Wake up the thread on the Capability it was last on
3105 switch (tso->why_blocked) {
3106 case BlockedOnMVar:
3107 case BlockedOnException:
3108 /* Called by GC - sched_mutex lock is currently held. */
3109 throwToSingleThreaded(cap, tso,
3110 (StgClosure *)BlockedOnDeadMVar_closure);
3112 case BlockedOnBlackHole:
3113 throwToSingleThreaded(cap, tso,
3114 (StgClosure *)NonTermination_closure);
3117 throwToSingleThreaded(cap, tso,
3118 (StgClosure *)BlockedIndefinitely_closure);
3121 /* This might happen if the thread was blocked on a black hole
3122 * belonging to a thread that we've just woken up (raiseAsync
3123 * can wake up threads, remember...).
3127 barf("resurrectThreads: thread blocked in a strange way");