1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 1998-2006
5 * The scheduler and thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
15 #include "OSThreads.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
23 #include "RtsSignals.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
32 #include "Proftimer.h"
34 #if defined(GRAN) || defined(PARALLEL_HASKELL)
35 # include "GranSimRts.h"
37 # include "ParallelRts.h"
38 # include "Parallel.h"
39 # include "ParallelDebug.h"
44 #include "Capability.h"
46 #include "AwaitEvent.h"
47 #if defined(mingw32_HOST_OS)
48 #include "win32/IOManager.h"
51 #include "RaiseAsync.h"
53 #include "ThrIOManager.h"
55 #ifdef HAVE_SYS_TYPES_H
56 #include <sys/types.h>
70 // Turn off inlining when debugging - it obfuscates things
73 # define STATIC_INLINE static
76 /* -----------------------------------------------------------------------------
78 * -------------------------------------------------------------------------- */
82 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
83 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
86 In GranSim we have a runnable and a blocked queue for each processor.
87 In order to minimise code changes new arrays run_queue_hds/tls
88 are created. run_queue_hd is then a short cut (macro) for
89 run_queue_hds[CurrentProc] (see GranSim.h).
92 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
93 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
94 StgTSO *ccalling_threadss[MAX_PROC];
95 /* We use the same global list of threads (all_threads) in GranSim as in
96 the std RTS (i.e. we are cheating). However, we don't use this list in
97 the GranSim specific code at the moment (so we are only potentially
102 #if !defined(THREADED_RTS)
104 // Blocked/sleeping threads
104 StgTSO *blocked_queue_hd = NULL;
105 StgTSO *blocked_queue_tl = NULL;
106 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
109 /* Threads blocked on blackholes.
110 * LOCK: sched_mutex+capability, or all capabilities
112 StgTSO *blackhole_queue = NULL;
115 /* The blackhole_queue should be checked for threads to wake up. See
116 * Schedule.h for more thorough comment.
117 * LOCK: none (doesn't matter if we miss an update)
119 rtsBool blackholes_need_checking = rtsFalse;
121 /* Linked list of all threads.
122 * Used for detecting garbage collected threads.
123 * LOCK: sched_mutex+capability, or all capabilities
125 StgTSO *all_threads = NULL;
127 /* flag set by signal handler to precipitate a context switch
128 * LOCK: none (just an advisory flag)
130 int context_switch = 0;
132 /* flag that tracks whether we have done any execution in this time slice.
133 * LOCK: currently none, perhaps we should lock (but needs to be
134 * updated in the fast path of the scheduler).
136 nat recent_activity = ACTIVITY_YES;
138 /* if this flag is set as well, give up execution
139 * LOCK: none (changes once, from false->true)
141 rtsBool sched_state = SCHED_RUNNING;
147 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
148 * exists - earlier gccs apparently didn't.
154 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
155 * in an MT setting, needed to signal that a worker thread shouldn't hang around
156 * in the scheduler when it is out of work.
158 rtsBool shutting_down_scheduler = rtsFalse;
161 * This mutex protects most of the global scheduler data in
162 * the THREADED_RTS runtime.
164 #if defined(THREADED_RTS)
168 #if defined(PARALLEL_HASKELL)
170 rtsTime TimeOfLastYield;
171 rtsBool emitSchedule = rtsTrue;
174 #if !defined(mingw32_HOST_OS)
175 #define FORKPROCESS_PRIMOP_SUPPORTED
178 /* -----------------------------------------------------------------------------
179 * static function prototypes
180 * -------------------------------------------------------------------------- */
182 static Capability *schedule (Capability *initialCapability, Task *task);
185 // These function all encapsulate parts of the scheduler loop, and are
186 // abstracted only to make the structure and control flow of the
187 // scheduler clearer.
189 static void schedulePreLoop (void);
190 #if defined(THREADED_RTS)
191 static void schedulePushWork(Capability *cap, Task *task);
193 static void scheduleStartSignalHandlers (Capability *cap);
194 static void scheduleCheckBlockedThreads (Capability *cap);
195 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
196 static void scheduleCheckBlackHoles (Capability *cap);
197 static void scheduleDetectDeadlock (Capability *cap, Task *task);
199 static StgTSO *scheduleProcessEvent(rtsEvent *event);
201 #if defined(PARALLEL_HASKELL)
202 static StgTSO *scheduleSendPendingMessages(void);
203 static void scheduleActivateSpark(void);
204 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
206 #if defined(PAR) || defined(GRAN)
207 static void scheduleGranParReport(void);
209 static void schedulePostRunThread(void);
210 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
211 static void scheduleHandleStackOverflow( Capability *cap, Task *task,
213 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
214 nat prev_what_next );
215 static void scheduleHandleThreadBlocked( StgTSO *t );
216 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
218 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
219 static Capability *scheduleDoGC(Capability *cap, Task *task,
220 rtsBool force_major);
222 static rtsBool checkBlackHoles(Capability *cap);
224 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
226 static void deleteThread (Capability *cap, StgTSO *tso);
227 static void deleteAllThreads (Capability *cap);
229 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
230 static void deleteThread_(Capability *cap, StgTSO *tso);
233 #if defined(PARALLEL_HASKELL)
234 StgTSO * createSparkThread(rtsSpark spark);
235 StgTSO * activateSpark (rtsSpark spark);
239 static char *whatNext_strs[] = {
249 /* -----------------------------------------------------------------------------
250 * Putting a thread on the run queue: different scheduling policies
251 * -------------------------------------------------------------------------- */
// Put thread t on cap's run queue according to the scheduling policy.
//   cap - capability whose run queue receives the thread
//   t   - thread (TSO) to enqueue
// Under PARALLEL_HASKELL the policy is chosen at run time by
// RtsFlags.ParFlags.doFairScheduling: append (round-robin) when set,
// push on the front otherwise.
// NOTE(review): the final appendToRunQueue() call is presumably the
// non-PARALLEL_HASKELL branch; the #else/#endif lines are not visible here.
254 addToRunQueue( Capability *cap, StgTSO *t )
256 #if defined(PARALLEL_HASKELL)
257 if (RtsFlags.ParFlags.doFairScheduling) {
258 // this does round-robin scheduling; good for concurrency
259 appendToRunQueue(cap,t);
261 // this does unfair scheduling; good for parallelism
262 pushOnRunQueue(cap,t);
265 // this does round-robin scheduling; good for concurrency
266 appendToRunQueue(cap,t);
270 /* ---------------------------------------------------------------------------
271 Main scheduling loop.
273 We use round-robin scheduling, each thread returning to the
274 scheduler loop when one of these conditions is detected:
277 * timer expires (thread yields)
283 In a GranSim setup this loop iterates over the global event queue.
284 This revolves around the global event queue, which determines what
285 to do next. Therefore, it's more complicated than either the
286 concurrent or the parallel (GUM) setup.
289 GUM iterates over incoming messages.
290 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
291 and sends out a fish whenever it has nothing to do; in-between
292 doing the actual reductions (shared code below) it processes the
293 incoming messages and deals with delayed operations
294 (see PendingFetches).
295 This is not the ugliest code you could imagine, but it's bloody close.
297 ------------------------------------------------------------------------ */
// Main scheduler loop for one Task.
//   initialCapability - capability the task owns on entry (see the
//                       pre-condition comment below)
//   task              - the Task running this loop
// Each iteration handles the interrupt/shutdown state, gathers runnable
// threads (sparks, signal handlers, black holes, wakeups, blocked threads,
// deadlock detection), pops one thread off cap's run queue, runs it, and
// then dispatches on the code the thread returned with.
// Returns the Capability held when the loop is left; the only return
// visible here is the one taken when scheduleHandleThreadFinished()
// reports true.
300 schedule (Capability *initialCapability, Task *task)
304 StgThreadReturnCode ret;
307 #elif defined(PARALLEL_HASKELL)
310 rtsBool receivedFinish = rtsFalse;
312 nat tp_size, sp_size; // stats only
317 #if defined(THREADED_RTS)
318 rtsBool first = rtsTrue;
321 cap = initialCapability;
323 // Pre-condition: this task owns initialCapability.
324 // The sched_mutex is *NOT* held
325 // NB. on return, we still hold a capability.
327 debugTrace (DEBUG_sched,
328 "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
329 task, initialCapability);
333 // -----------------------------------------------------------
334 // Scheduler loop starts here:
// TERMINATION_CONDITION is the loop guard. Three alternative definitions
// follow: run until a Finish has been received (PARALLEL_HASKELL), run
// while get_next_event() yields an event, or loop forever.
// NOTE(review): the #elif/#else lines choosing between the last two are
// not visible here.
336 #if defined(PARALLEL_HASKELL)
337 #define TERMINATION_CONDITION (!receivedFinish)
339 #define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL)
341 #define TERMINATION_CONDITION rtsTrue
344 while (TERMINATION_CONDITION) {
347 /* Choose the processor with the next event */
348 CurrentProc = event->proc;
349 CurrentTSO = event->tso;
352 #if defined(THREADED_RTS)
354 // don't yield the first time, we want a chance to run this
355 // thread for a bit, even if there are others banging at the
358 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
360 // Yield the capability to higher-priority tasks if necessary.
361 yieldCapability(&cap, task);
365 #if defined(THREADED_RTS)
366 schedulePushWork(cap,task);
369 // Check whether we have re-entered the RTS from Haskell without
370 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
372 if (cap->in_haskell) {
373 errorBelch("schedule: re-entered unsafely.\n"
374 " Perhaps a 'foreign import unsafe' should be 'safe'?");
375 stg_exit(EXIT_FAILURE);
378 // The interruption / shutdown sequence.
380 // In order to cleanly shut down the runtime, we want to:
381 // * make sure that all main threads return to their callers
382 // with the state 'Interrupted'.
383 // * clean up all OS threads associated with the runtime
384 // * free all memory etc.
386 // So the sequence for ^C goes like this:
388 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
389 // arranges for some Capability to wake up
391 // * all threads in the system are halted, and the zombies are
392 // placed on the run queue for cleaning up. We acquire all
393 // the capabilities in order to delete the threads, this is
394 // done by scheduleDoGC() for convenience (because GC already
395 // needs to acquire all the capabilities). We can't kill
396 // threads involved in foreign calls.
398 // * somebody calls shutdownHaskell(), which calls exitScheduler()
400 // * sched_state := SCHED_SHUTTING_DOWN
402 // * all workers exit when the run queue on their capability
403 // drains. All main threads will also exit when their TSO
404 // reaches the head of the run queue and they can return.
406 // * eventually all Capabilities will shut down, and the RTS can
409 // * We might be left with threads blocked in foreign calls,
410 // we should really attempt to kill these somehow (TODO);
412 switch (sched_state) {
415 case SCHED_INTERRUPTING:
416 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
417 #if defined(THREADED_RTS)
418 discardSparksCap(cap);
420 /* scheduleDoGC() deletes all the threads */
421 cap = scheduleDoGC(cap,task,rtsFalse);
423 case SCHED_SHUTTING_DOWN:
424 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
425 // If we are a worker, just exit. If we're a bound thread
426 // then we will exit below when we've removed our TSO from
428 if (task->tso == NULL && emptyRunQueue(cap)) {
433 barf("sched_state: %d", sched_state);
436 #if defined(THREADED_RTS)
437 // If the run queue is empty, take a spark and turn it into a thread.
439 if (emptyRunQueue(cap)) {
441 spark = findSpark(cap);
443 debugTrace(DEBUG_sched,
444 "turning spark of closure %p into a thread",
445 (StgClosure *)spark);
446 createSparkThread(cap,spark);
450 #endif // THREADED_RTS
452 scheduleStartSignalHandlers(cap);
454 // Only check the black holes here if we've nothing else to do.
455 // During normal execution, the black hole list only gets checked
456 // at GC time, to avoid repeatedly traversing this possibly long
457 // list each time around the scheduler.
458 if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
460 scheduleCheckWakeupThreads(cap);
462 scheduleCheckBlockedThreads(cap);
464 scheduleDetectDeadlock(cap,task);
465 #if defined(THREADED_RTS)
466 cap = task->cap; // reload cap, it might have changed
469 // Normally, the only way we can get here with no threads to
470 // run is if a keyboard interrupt received during
471 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
472 // Additionally, it is not fatal for the
473 // threaded RTS to reach here with no threads to run.
475 // win32: might be here due to awaitEvent() being abandoned
476 // as a result of a console event having been delivered.
477 if ( emptyRunQueue(cap) ) {
478 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
479 ASSERT(sched_state >= SCHED_INTERRUPTING);
481 continue; // nothing to do
484 #if defined(PARALLEL_HASKELL)
485 scheduleSendPendingMessages();
486 if (emptyRunQueue(cap) && scheduleActivateSpark())
490 ASSERT(next_fish_to_send_at==0); // i.e. no delayed fishes left!
493 /* If we still have no work we need to send a FISH to get a spark
495 if (emptyRunQueue(cap)) {
496 if (!scheduleGetRemoteWork(&receivedFinish)) continue;
497 ASSERT(rtsFalse); // should not happen at the moment
499 // from here: non-empty run queue.
500 // TODO: merge above case with this, only one call processMessages() !
501 if (PacketsWaiting()) { /* process incoming messages, if
502 any pending... only in else
503 because getRemoteWork waits for
505 receivedFinish = processMessages();
510 scheduleProcessEvent(event);
514 // Get a thread to run
516 t = popRunQueue(cap);
518 #if defined(GRAN) || defined(PAR)
519 scheduleGranParReport(); // some kind of debugging output
521 // Sanity check the thread we're about to run. This can be
522 // expensive if there is lots of thread switching going on...
523 IF_DEBUG(sanity,checkTSO(t));
526 #if defined(THREADED_RTS)
527 // Check whether we can run this thread in the current task.
528 // If not, we have to pass our capability to the right task.
530 Task *bound = t->bound;
534 debugTrace(DEBUG_sched,
535 "### Running thread %lu in bound thread", (unsigned long)t->id);
536 // yes, the Haskell thread is bound to the current native thread
538 debugTrace(DEBUG_sched,
539 "### thread %lu bound to another OS thread", (unsigned long)t->id);
540 // no, bound to a different Haskell thread: pass to that thread
541 pushOnRunQueue(cap,t);
545 // The thread we want to run is unbound.
547 debugTrace(DEBUG_sched,
548 "### this OS thread cannot run thread %lu", (unsigned long)t->id);
549 // no, the current native thread is bound to a different
550 // Haskell thread, so pass it to any worker thread
551 pushOnRunQueue(cap,t);
// Record t as the thread currently running on this capability.
558 cap->r.rCurrentTSO = t;
560 /* context switches are initiated by the timer signal, unless
561 * the user specified "context switch as often as possible", with
564 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
565 && !emptyThreadQueues(cap)) {
571 debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
572 (long)t->id, whatNext_strs[t->what_next]);
574 startHeapProfTimer();
576 // Check for exceptions blocked on this thread
577 maybePerformBlockedException (cap, t);
579 // ----------------------------------------------------------------------
580 // Run the current thread
582 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
583 ASSERT(t->cap == cap);
// Snapshot what_next before running: it selects how the thread is run
// below and is later handed to scheduleHandleYield().
585 prev_what_next = t->what_next;
// Restore the per-thread errno (and, where applicable, the Windows
// last-error code); both are saved back into the TSO after the run.
587 errno = t->saved_errno;
589 SetLastError(t->saved_winerror);
// Flag the capability as executing Haskell code. Finding this flag set at
// the top of the loop is treated as an unsafe re-entry (see above).
592 cap->in_haskell = rtsTrue;
596 #if defined(THREADED_RTS)
597 if (recent_activity == ACTIVITY_DONE_GC) {
598 // ACTIVITY_DONE_GC means we turned off the timer signal to
599 // conserve power (see #1623). Re-enable it here.
601 prev = xchg(&recent_activity, ACTIVITY_YES);
602 if (prev == ACTIVITY_DONE_GC) {
606 recent_activity = ACTIVITY_YES;
610 switch (prev_what_next) {
614 /* Thread already finished, return to scheduler. */
615 ret = ThreadFinished;
621 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
622 cap = regTableToCapability(r);
627 case ThreadInterpret:
628 cap = interpretBCO(cap);
633 barf("schedule: invalid what_next field");
636 cap->in_haskell = rtsFalse;
638 // The TSO might have moved, eg. if it re-entered the RTS and a GC
639 // happened. So find the new location:
640 t = cap->r.rCurrentTSO;
642 // We have run some Haskell code: there might be blackhole-blocked
643 // threads to wake up now.
644 // Lock-free test here should be ok, we're just setting a flag.
645 if ( blackhole_queue != END_TSO_QUEUE ) {
646 blackholes_need_checking = rtsTrue;
649 // And save the current errno in this thread.
650 // XXX: possibly bogus for SMP because this thread might already
651 // be running again, see code below.
652 t->saved_errno = errno;
654 // Similarly for Windows error code
655 t->saved_winerror = GetLastError();
658 #if defined(THREADED_RTS)
659 // If ret is ThreadBlocked, and this Task is bound to the TSO that
660 // blocked, we are in limbo - the TSO is now owned by whatever it
661 // is blocked on, and may in fact already have been woken up,
662 // perhaps even on a different Capability. It may be the case
663 // that task->cap != cap. We better yield this Capability
664 // immediately and return to normality.
665 if (ret == ThreadBlocked) {
666 debugTrace(DEBUG_sched,
667 "--<< thread %lu (%s) stopped: blocked",
668 (unsigned long)t->id, whatNext_strs[t->what_next]);
673 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
674 ASSERT(t->cap == cap);
676 // ----------------------------------------------------------------------
678 // Costs for the scheduler are assigned to CCS_SYSTEM
680 #if defined(PROFILING)
684 schedulePostRunThread();
// Post-run handling. The handlers below are presumably the arms of a
// switch on ret (the switch/case lines are not visible here); the barf()
// further down reports an unrecognised return code.
686 ready_to_gc = rtsFalse;
690 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
694 scheduleHandleStackOverflow(cap,task,t);
698 if (scheduleHandleYield(cap, t, prev_what_next)) {
699 // shortcut for switching between compiler/interpreter:
705 scheduleHandleThreadBlocked(t);
709 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
710 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
714 barf("schedule: invalid thread return code %d", (int)ret);
// Collect if the heap-overflow handler asked for it or a heap profile is
// due; scheduleDoGC() may hand back a different capability.
717 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
718 cap = scheduleDoGC(cap,task,rtsFalse);
720 } /* end of while() */
722 debugTrace(PAR_DEBUG_verbose,
723 "== Leaving schedule() after having received Finish");
726 /* ----------------------------------------------------------------------------
727 * Setting up the scheduler loop
728 * ------------------------------------------------------------------------- */
// Setup performed before the scheduler loop starts.
// The visible body is GranSim-specific: it queues an initial event for
// CurrentProc/CurrentTSO, traces the initial TSO, and in GranSim-Light mode
// stamps the TSO's clock with the current simulated time.
// NOTE(review): the preprocessor guard around this body (presumably GRAN)
// and the event-type argument of new_event() are not visible here.
731 schedulePreLoop(void)
734 /* set up first event to get things going */
735 /* ToDo: assign costs for system setup and init MainTSO ! */
736 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
738 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
740 debugTrace (DEBUG_gran,
741 "GRAN: Init CurrentTSO (in schedule) = %p",
743 IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
745 if (RtsFlags.GranFlags.Light) {
746 /* Save current time; GranSim Light only */
747 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
752 /* -----------------------------------------------------------------------------
755 * Push work to other Capabilities if we have some.
756 * -------------------------------------------------------------------------- */
758 #if defined(THREADED_RTS)
// Share surplus work on cap with other, currently idle Capabilities.
//   cap  - the capability this task owns and whose work may be given away
//   task - the calling task; used to grab/release the other capabilities
// Steps:
//   1. Bail out if migration is disabled, or if cap has at most one
//      runnable thread and fewer than two sparks.
//   2. Try to grab every other capability; keep those that are really idle
//      in free_caps[] and release the rest immediately.
//   3. Walk cap's run queue (keeping the head for ourselves) and hand
//      movable threads to the free capabilities; relocated, locked and
//      our-own-bound threads stay on cap.
//   4. If some free capability received no thread, try to give it a spark.
//   5. Release all grabbed capabilities and restore task->cap.
760 schedulePushWork(Capability *cap USED_IF_THREADS,
761 Task *task USED_IF_THREADS)
763 Capability *free_caps[n_capabilities], *cap0;
766 // migration can be turned off with +RTS -qg
767 if (!RtsFlags.ParFlags.migrate) return;
769 // Check whether we have more threads on our run queue, or sparks
770 // in our pool, that we could hand to another Capability.
771 if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
772 && sparkPoolSizeCap(cap) < 2) {
776 // First grab as many free Capabilities as we can.
777 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
778 cap0 = &capabilities[i];
779 if (cap != cap0 && tryGrabCapability(cap0,task)) {
// Test the capability we just grabbed (cap0), not our own: a task
// returning to cap0 means cap0 is not really idle. The previous test of
// cap->returning_tasks_hd was loop-invariant and said nothing about cap0.
780 if (!emptyRunQueue(cap0) || cap0->returning_tasks_hd != NULL) {
781 // it already has some work, we just grabbed it at
782 // the wrong moment. Or maybe it's deadlocked!
783 releaseCapability(cap0);
785 free_caps[n_free_caps++] = cap0;
790 // we now have n_free_caps free capabilities stashed in
791 // free_caps[]. Share our run queue equally with them. This is
792 // probably the simplest thing we could do; improvements we might
793 // want to do include:
795 // - giving high priority to moving relatively new threads, on
796 // the grounds that they haven't had time to build up a
797 // working set in the cache on this CPU/Capability.
799 // - giving low priority to moving long-lived threads
801 if (n_free_caps > 0) {
802 StgTSO *prev, *t, *next;
803 rtsBool pushed_to_all;
805 debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
808 pushed_to_all = rtsFalse;
810 if (cap->run_queue_hd != END_TSO_QUEUE) {
// prev tracks the tail of the part of the queue that stays on cap; the
// head is detached from the rest, which is then redistributed one thread
// at a time.
811 prev = cap->run_queue_hd;
813 prev->link = END_TSO_QUEUE;
814 for (; t != END_TSO_QUEUE; t = next) {
816 t->link = END_TSO_QUEUE;
817 if (t->what_next == ThreadRelocated
818 || t->bound == task // don't move my bound thread
819 || tsoLocked(t)) { // don't move a locked thread
822 } else if (i == n_free_caps) {
823 pushed_to_all = rtsTrue;
829 debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
830 appendToRunQueue(free_caps[i],t);
// A migrated thread (and the task it is bound to, if any) now belongs to
// the receiving capability.
831 if (t->bound) { t->bound->cap = free_caps[i]; }
832 t->cap = free_caps[i];
836 cap->run_queue_tl = prev;
839 // If there are some free capabilities that we didn't push any
840 // threads to, then try to push a spark to each one.
841 if (!pushed_to_all) {
843 // i is the next free capability to push to
844 for (; i < n_free_caps; i++) {
845 if (emptySparkPoolCap(free_caps[i])) {
846 spark = findSpark(cap);
848 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
849 newSpark(&(free_caps[i]->r), spark);
855 // release the capabilities
856 for (i = 0; i < n_free_caps; i++) {
// task->cap is pointed at each capability just before releasing it.
// NOTE(review): presumably releaseCapability() expects the releasing task
// to own the capability -- confirm.
857 task->cap = free_caps[i];
858 releaseCapability(free_caps[i]);
861 task->cap = cap; // reset to point to our Capability.
865 /* ----------------------------------------------------------------------------
866 * Start any pending signal handlers
867 * ------------------------------------------------------------------------- */
// Start any pending signal handlers.
// Two definitions are visible. The first, compiled when RTS_USER_SIGNALS is
// defined and THREADED_RTS is not, calls startSignalHandlers(cap) if signal
// handler installation is enabled and signals are pending. The second takes
// an unused cap.
// NOTE(review): the second is presumably the #else branch with an empty
// body; the #else/#endif and brace lines are not visible here.
869 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
871 scheduleStartSignalHandlers(Capability *cap)
873 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
874 // safe outside the lock
875 startSignalHandlers(cap);
880 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
885 /* ----------------------------------------------------------------------------
886 * Check for blocked threads that can be woken up.
887 * ------------------------------------------------------------------------- */
// Non-threaded RTS only: poll or wait for blocked and sleeping threads.
//   cap - capability whose run queue decides whether we may block
// If either the blocked queue or the sleeping queue is non-empty,
// awaitEvent() is called. Its argument is true only when cap's run queue is
// empty and no black holes need checking.
// NOTE(review): a true argument presumably means "block until an event
// arrives", per the comment below -- confirm against awaitEvent().
890 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
892 #if !defined(THREADED_RTS)
894 // Check whether any waiting threads need to be woken up. If the
895 // run queue is empty, and there are no other tasks running, we
896 // can wait indefinitely for something to happen.
898 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
900 awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
906 /* ----------------------------------------------------------------------------
907 * Check for threads woken up by other Capabilities
908 * ------------------------------------------------------------------------- */
// Threaded RTS only: move every thread on cap's wakeup queue to the end of
// cap's run queue, then reset the wakeup queue to empty.
//   cap - capability whose wakeup queue is drained
// The initial emptiness test is made without the lock; the splice itself is
// done while holding cap->lock.
// NOTE(review): the forward declaration of this function annotates cap with
// USED_IF_NOT_THREADS while this definition uses USED_IF_THREADS; the
// body only uses cap under THREADED_RTS, so the definition looks right.
911 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
913 #if defined(THREADED_RTS)
914 // Any threads that were woken up by other Capabilities get
915 // appended to our run queue.
916 if (!emptyWakeupQueue(cap)) {
917 ACQUIRE_LOCK(&cap->lock);
918 if (emptyRunQueue(cap)) {
// Run queue empty: the wakeup queue simply becomes the run queue.
919 cap->run_queue_hd = cap->wakeup_queue_hd;
920 cap->run_queue_tl = cap->wakeup_queue_tl;
// Otherwise link the wakeup queue after the current run-queue tail.
922 cap->run_queue_tl->link = cap->wakeup_queue_hd;
923 cap->run_queue_tl = cap->wakeup_queue_tl;
925 cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
926 RELEASE_LOCK(&cap->lock);
931 /* ----------------------------------------------------------------------------
932 * Check for threads blocked on BLACKHOLEs that can be woken up
933 * ------------------------------------------------------------------------- */
// Run checkBlackHoles(cap) if the blackholes_need_checking flag is set,
// then clear the flag.
//   cap - capability passed through to checkBlackHoles()
// The flag is tested once without sched_mutex as a cheap filter and again
// after taking the lock. The value returned by checkBlackHoles() is
// discarded here.
935 scheduleCheckBlackHoles (Capability *cap)
937 if ( blackholes_need_checking ) // check without the lock first
939 ACQUIRE_LOCK(&sched_mutex);
940 if ( blackholes_need_checking ) {
941 checkBlackHoles(cap);
942 blackholes_need_checking = rtsFalse;
944 RELEASE_LOCK(&sched_mutex);
948 /* ----------------------------------------------------------------------------
949 * Detect deadlock conditions and attempt to resolve them.
950 * ------------------------------------------------------------------------- */
// Detect a deadlock (no thread on any of cap's queues) and try to resolve it.
//   cap  - capability to inspect; reassigned locally from scheduleDoGC()
//   task - current task; its TSO is the one sent the exception below
// When all thread queues are empty a major GC is forced, since that may
// release threads. If the run queue is still empty afterwards, the
// non-threaded RTS either waits for signals (when user handlers are
// installed) or throws NonTermination to task->tso for the blocked states
// listed in the switch. In the threaded RTS nothing is done unless
// recent_activity == ACTIVITY_INACTIVE.
// The updated cap is not returned; the caller in schedule() reloads it from
// task->cap in the threaded RTS.
953 scheduleDetectDeadlock (Capability *cap, Task *task)
956 #if defined(PARALLEL_HASKELL)
957 // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
962 * Detect deadlock: when we have no threads to run, there are no
963 * threads blocked, waiting for I/O, or sleeping, and all the
964 * other tasks are waiting for work, we must have a deadlock of
967 if ( emptyThreadQueues(cap) )
969 #if defined(THREADED_RTS)
971 * In the threaded RTS, we only check for deadlock if there
972 * has been no activity in a complete timeslice. This means
973 * we won't eagerly start a full GC just because we don't have
974 * any threads to run currently.
976 if (recent_activity != ACTIVITY_INACTIVE) return;
979 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
981 // Garbage collection can release some new threads due to
982 // either (a) finalizers or (b) threads resurrected because
983 // they are unreachable and will therefore be sent an
984 // exception. Any threads thus released will be immediately
986 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
988 recent_activity = ACTIVITY_DONE_GC;
989 // disable timer signals (see #1623)
992 if ( !emptyRunQueue(cap) ) return;
994 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
995 /* If we have user-installed signal handlers, then wait
996 * for signals to arrive rather than bombing out with a
999 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
1000 debugTrace(DEBUG_sched,
1001 "still deadlocked, waiting for signals...");
1005 if (signals_pending()) {
1006 startSignalHandlers(cap);
1009 // either we have threads to run, or we were interrupted:
1010 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1014 #if !defined(THREADED_RTS)
1015 /* Probably a real deadlock. Send the current main thread the
1016 * Deadlock exception.
1019 switch (task->tso->why_blocked) {
1021 case BlockedOnBlackHole:
1022 case BlockedOnException:
1024 throwToSingleThreaded(cap, task->tso,
1025 (StgClosure *)NonTermination_closure);
1028 barf("deadlock: main thread blocked in a strange way");
1036 /* ----------------------------------------------------------------------------
1037 * Process an event (GRAN only)
1038 * ------------------------------------------------------------------------- */
// GranSim event dispatcher: process one simulator event.
//   event - the event to handle; its evttype selects the action
// Advances the simulated clock of CurrentProc to the event's time stamp
// (except for ContinueThread events), then switches on the event type.
// ContinueThread breaks out of the switch so that a thread gets run; every
// other visible case calls the matching do_the_*() handler and jumps to
// next_thread. After the switch the time-slice bookkeeping is updated and a
// thread is popped from the run queue.
// NOTE(review): the prototype declares a StgTSO* result, but neither the
// return statement nor the next_thread label is visible here. Several case
// labels and the statements following the "ignoring ContinueThread"
// messages are also missing from view.
1042 scheduleProcessEvent(rtsEvent *event)
1046 if (RtsFlags.GranFlags.Light)
1047 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1049 /* adjust time based on time-stamp */
1050 if (event->time > CurrentTime[CurrentProc] &&
1051 event->evttype != ContinueThread)
1052 CurrentTime[CurrentProc] = event->time;
1054 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1055 if (!RtsFlags.GranFlags.Light)
1058 IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1060 /* main event dispatcher in GranSim */
1061 switch (event->evttype) {
1062 /* Should just be continuing execution */
1063 case ContinueThread:
1064 IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1065 /* ToDo: check assertion
1066 ASSERT(run_queue_hd != (StgTSO*)NULL &&
1067 run_queue_hd != END_TSO_QUEUE);
1069 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1070 if (!RtsFlags.GranFlags.DoAsyncFetch &&
1071 procStatus[CurrentProc]==Fetching) {
1072 debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1073 CurrentTSO->id, CurrentTSO, CurrentProc);
1076 /* Ignore ContinueThreads for completed threads */
1077 if (CurrentTSO->what_next == ThreadComplete) {
1078 debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
1079 CurrentTSO->id, CurrentTSO, CurrentProc);
1082 /* Ignore ContinueThreads for threads that are being migrated */
1083 if (PROCS(CurrentTSO)==Nowhere) {
1084 debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1085 CurrentTSO->id, CurrentTSO, CurrentProc);
1088 /* The thread should be at the beginning of the run queue */
1089 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
1090 debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1091 CurrentTSO->id, CurrentTSO, CurrentProc);
1092 break; // run the thread anyway
1095 new_event(proc, proc, CurrentTime[proc],
1097 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1099 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1100 break; // now actually run the thread; DaH Qu'vam yImuHbej
// The remaining cases each delegate to a do_the_*() handler and then skip
// to the next event rather than running a thread.
1103 do_the_fetchnode(event);
1104 goto next_thread; /* handle next event in event queue */
1107 do_the_globalblock(event);
1108 goto next_thread; /* handle next event in event queue */
1111 do_the_fetchreply(event);
1112 goto next_thread; /* handle next event in event queue */
1114 case UnblockThread: /* Move from the blocked queue to the tail of */
1115 do_the_unblock(event);
1116 goto next_thread; /* handle next event in event queue */
1118 case ResumeThread: /* Move from the blocked queue to the tail of */
1119 /* the runnable queue ( i.e. Qu' SImqa'lu') */
// Charge the time spent blocked to the thread before restarting it.
1120 event->tso->gran.blocktime +=
1121 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1122 do_the_startthread(event);
1123 goto next_thread; /* handle next event in event queue */
1126 do_the_startthread(event);
1127 goto next_thread; /* handle next event in event queue */
1130 do_the_movethread(event);
1131 goto next_thread; /* handle next event in event queue */
1134 do_the_movespark(event);
1135 goto next_thread; /* handle next event in event queue */
1138 do_the_findwork(event);
1139 goto next_thread; /* handle next event in event queue */
1142 barf("Illegal event type %u\n", event->evttype);
1145 /* This point was scheduler_loop in the old RTS */
1147 IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1149 TimeOfLastEvent = CurrentTime[CurrentProc];
1150 TimeOfNextEvent = get_time_of_next_event();
1151 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1152 // CurrentTSO = ThreadQueueHd;
1154 IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
1157 if (RtsFlags.GranFlags.Light)
1158 GranSimLight_leave_system(event, &ActiveTSO);
1160 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1163 debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1165 /* in a GranSim setup the TSO stays on the run queue */
1167 /* Take a thread from the run queue. */
1168 POP_RUN_QUEUE(t); // take_off_run_queue(t);
1171 debugBelch("GRAN: About to run current thread, which is\n");
1174 context_switch = 0; // turned on via GranYield, checking events and time slice
1177 DumpGranEvent(GR_SCHEDULE, t));
1179 procStatus[CurrentProc] = Busy;
1183 /* ----------------------------------------------------------------------------
1184 * Send pending messages (PARALLEL_HASKELL only)
1185 * ------------------------------------------------------------------------- */
1187 #if defined(PARALLEL_HASKELL)
// PARALLEL_HASKELL only: send messages that are waiting to go out.
// Only the two guarding conditions are visible here: pending fetches
// (PAR builds) and the message-buffering time limit
// (RtsFlags.ParFlags.BufferTime).
// NOTE(review): the actions taken under each condition and the return
// statement are not visible; the prototype declares a StgTSO* result.
1189 scheduleSendPendingMessages(void)
1195 # if defined(PAR) // global Mem.Mgmt., omit for now
1196 if (PendingFetches != END_BF_QUEUE) {
1201 if (RtsFlags.ParFlags.BufferTime) {
1202 // if we use message buffering, we must send away all message
1203 // packets which have become too old...
1209 /* ----------------------------------------------------------------------------
1210 * Activate spark threads (PARALLEL_HASKELL only)
1211 * ------------------------------------------------------------------------- */
1213 #if defined(PARALLEL_HASKELL)
// Tries to turn a local spark into a runnable thread.
// Precondition (asserted): the run queue is empty.
// A spark is only looked for when advisory_thread_count is below
// RtsFlags.ParFlags.maxThreads and the spark pool of cap is non-empty
// (pool->hd < pool->tl). findSpark() supplies the spark and
// createThreadFromSpark() converts it into a TSO.
// Returns: rtsTrue when a thread was generated; rtsFalse when no spark was
// found, when thread creation yielded END_TSO_QUEUE, or when the thread limit
// is reached or the pool is empty. The final visible path (marked as the
// non-PAR setup) returns rtsFalse unconditionally.
1215 scheduleActivateSpark(void)
1218 ASSERT(emptyRunQueue());
1219 /* We get here if the run queue is empty and want some work.
1220 We try to turn a spark into a thread, and add it to the run queue,
1221 from where it will be picked up in the next iteration of the scheduler
1225 /* :-[ no local threads => look out for local sparks */
1226 /* the spark pool for the current PE */
1227 pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1228 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1229 pool->hd < pool->tl) {
1231 * ToDo: add GC code check that we really have enough heap afterwards!!
1233 * If we're here (no runnable threads) and we have pending
1234 * sparks, we must have a space problem. Get enough space
1235 * to turn one of those pending sparks into a
1239 spark = findSpark(rtsFalse); /* get a spark */
1240 if (spark != (rtsSpark) NULL) {
1241 tso = createThreadFromSpark(spark); /* turn the spark into a thread */
1242 IF_PAR_DEBUG(fish, // schedule,
1243 debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1244 tso->id, tso, advisory_thread_count));
1246 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1247 IF_PAR_DEBUG(fish, // schedule,
1248 debugBelch("==^^ failed to create thread from spark @ %lx\n",
1250 return rtsFalse; /* failed to generate a thread */
1251 } /* otherwise fall through & pick-up new tso */
1253 IF_PAR_DEBUG(fish, // schedule,
1254 debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
1255 spark_queue_len(pool)));
1256 return rtsFalse; /* failed to generate a thread */
1258 return rtsTrue; /* success in generating a thread */
1259 } else { /* no more threads permitted or pool empty */
1260 return rtsFalse; /* failed to generateThread */
1263 tso = NULL; // avoid compiler warning only
1264 return rtsFalse; /* dummy in non-PAR setup */
1267 #endif // PARALLEL_HASKELL
1269 /* ----------------------------------------------------------------------------
1270 * Get work from a remote node (PARALLEL_HASKELL only)
1271 * ------------------------------------------------------------------------- */
1273 #if defined(PARALLEL_HASKELL)
// Tries to obtain work from other PEs when there is no local work.
// Precondition (asserted): the run queue is empty.
//  - When RtsFlags.ParFlags.BufferTime is set, all buffered messages are
//    pushed out first via sendImmediately(i) for i in 1..nPEs.
//  - In the branch ahead of the SPARKS machinery (labelled EDEN by the
//    existing comments) no fish is sent: processMessages() is called and its
//    result is stored in *receivedFinish.
//  - In the SPARKS branch a FISH is sent with sendFish() provided that
//    outstandingFishes < maxFishes and advisory_thread_count < maxThreads,
//    either at once or after waiting until last_fish_arrived_at + delay while
//    processing incoming packets. processMessages() is then called again.
// receivedFinish: out-parameter, assigned from the result of processMessages().
// Returns: rtsFalse on the one visible return path; the existing trailing
// comment states that the function always returns rtsFalse.
// NOTE(review): the wait loop ends in
//     while (!*receivedFinish || now<next_fish_to_send_at)
// With || the loop can only exit once *receivedFinish is set AND the send
// time has passed, so the early return that follows is always taken and the
// delayed fish is never sent. && looks like the intended operator -- confirm.
// NOTE(review): delay = (fishDelay != 0 ? fishDelay : 0) equals fishDelay.
1275 scheduleGetRemoteWork(rtsBool *receivedFinish)
1277 ASSERT(emptyRunQueue());
1279 if (RtsFlags.ParFlags.BufferTime) {
1280 IF_PAR_DEBUG(verbose,
1281 debugBelch("...send all pending data,"));
1284 for (i=1; i<=nPEs; i++)
1285 sendImmediately(i); // send all messages away immediately
1289 //++EDEN++ idle() , i.e. send all buffers, wait for work
1290 // suppress fishing in EDEN... just look for incoming messages
1291 // (blocking receive)
1292 IF_PAR_DEBUG(verbose,
1293 debugBelch("...wait for incoming messages...\n"));
1294 *receivedFinish = processMessages(); // blocking receive...
1296 // and reenter scheduling loop after having received something
1297 // (return rtsFalse below)
1299 # else /* activate SPARKS machinery */
1300 /* We get here, if we have no work, tried to activate a local spark, but still
1301 have no work. We try to get a remote spark, by sending a FISH message.
1302 Thread migration should be added here, and triggered when a sequence of
1303 fishes returns without work. */
1304 delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1306 /* =8-[ no local sparks => look for work on other PEs */
1308 * We really have absolutely no work. Send out a fish
1309 * (there may be some out there already), and wait for
1310 * something to arrive. We clearly can't run any threads
1311 * until a SCHEDULE or RESUME arrives, and so that's what
1312 * we're hoping to see. (Of course, we still have to
1313 * respond to other types of messages.)
1315 rtsTime now = msTime() /*CURRENT_TIME*/;
1316 IF_PAR_DEBUG(verbose,
1317 debugBelch("-- now=%ld\n", now));
1318 IF_PAR_DEBUG(fish, // verbose,
1319 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1320 (last_fish_arrived_at!=0 &&
1321 last_fish_arrived_at+delay > now)) {
1322 debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1323 now, last_fish_arrived_at+delay,
1324 last_fish_arrived_at,
1328 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1329 advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1330 if (last_fish_arrived_at==0 ||
1331 (last_fish_arrived_at+delay <= now)) { // send FISH now!
1332 /* outstandingFishes is set in sendFish, processFish;
1333 avoid flooding system with fishes via delay */
1334 next_fish_to_send_at = 0;
1336 /* ToDo: this should be done in the main scheduling loop to avoid the
1337 busy wait here; not so bad if fish delay is very small */
1338 int iq = 0; // DEBUGGING -- HWL
1339 next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
1340 /* send a fish when ready, but process messages that arrive in the meantime */
1342 if (PacketsWaiting()) {
1344 *receivedFinish = processMessages();
1347 } while (!*receivedFinish || now<next_fish_to_send_at);
1348 // JB: This means the fish could become obsolete, if we receive
1349 // work. Better check for work again?
1350 // last line: while (!receivedFinish || !haveWork || now<...)
1351 // next line: if (receivedFinish || haveWork )
1353 if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1354 return rtsFalse; // NB: this will leave scheduler loop
1355 // immediately after return!
1357 IF_PAR_DEBUG(fish, // verbose,
1358 debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1362 // JB: IMHO, this should all be hidden inside sendFish(...)
1364 sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
1367 // Global statistics: count no. of fishes
1368 if (RtsFlags.ParFlags.ParStats.Global &&
1369 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1370 globalParStats.tot_fish_mess++;
1374 /* delayed fishes must have been sent by now! */
1375 next_fish_to_send_at = 0;
1378 *receivedFinish = processMessages();
1379 # endif /* SPARKS */
1382 /* NB: this function always returns rtsFalse, meaning the scheduler
1383 loop continues with the next iteration;
1385 return code means success in finding work; we enter this function
1386 if there is no local work, thus have to send a fish which takes
1387 time until it arrives with work; in the meantime we should process
1388 messages in the main loop;
1391 #endif // PARALLEL_HASKELL
1393 /* ----------------------------------------------------------------------------
1394 * PAR/GRAN: Report stats & debugging info(?)
1395 * ------------------------------------------------------------------------- */
1397 #if defined(PAR) || defined(GRAN)
// Statistics and debug reporting for PAR and GRAN builds.
// Precondition (asserted): run_queue_hd is not END_TSO_QUEUE.
// Visible effects: pops a thread t with POP_RUN_QUEUE, prints the number of
// threads and sparks on the current processor, and emits a GR_SCHEDULE event
// through DumpRawGranEvent (then clears emitSchedule) when all of these hold:
//   - RtsFlags.ParFlags.ParStats.Full is set,
//   - t->par.sparkname is non-zero (t was generated from a spark),
//   - emitSchedule is set, or t differs from LastTSO by id.
// NOTE(review): t->par.sparkname is read before the (t && LastTSO ...) test,
// so that NULL check on t is either redundant or comes too late -- confirm.
1399 scheduleGranParReport(void)
1401 ASSERT(run_queue_hd != END_TSO_QUEUE);
1403 /* Take a thread from the run queue, if we have work */
1404 POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
1406 /* If this TSO has got its outport closed in the meantime,
1407 * it mustn't be run. Instead, we have to clean it up as if it was finished.
1408 * It has to be marked as TH_DEAD for this purpose.
1409 * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1411 JB: TODO: investigate whether state change field could be nuked
1412 entirely and replaced by the normal tso state (whatnext
1413 field). All we want to do is to kill tsos from outside.
1416 /* ToDo: write something to the log-file
1417 if (RTSflags.ParFlags.granSimStats && !sameThread)
1418 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1422 /* the spark pool for the current PE */
1423 pool = &(cap.r.rSparks); // cap = (old) MainCap
1426 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1427 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1430 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1431 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1433 if (RtsFlags.ParFlags.ParStats.Full &&
1434 (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1435 (emitSchedule || // forced emit
1436 (t && LastTSO && t->id != LastTSO->id))) {
1438 we are running a different TSO, so write a schedule event to log file
1439 NB: If we use fair scheduling we also have to write a deschedule
1440 event for LastTSO; with unfair scheduling we know that the
1441 previous tso has blocked whenever we switch to another tso, so
1442 we don't need it in GUM for now
1444 IF_PAR_DEBUG(fish, // schedule,
1445 debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1447 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1448 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1449 emitSchedule = rtsFalse;
1454 /* ----------------------------------------------------------------------------
1455 * After running a thread...
1456 * ------------------------------------------------------------------------- */
// Bookkeeping performed after a thread has returned to the scheduler.
// Visible effects:
//   - TimeOfLastYield is set to CURRENT_TIME;
//   - under GRAN, PAR or EDEN the counters tot_heapover, tot_stackover and
//     tot_yields of globalGranStats / globalParStats are incremented and
//     GR_DESCHEDULE events are dumped;
//   - for a blocked thread the blocking closure is traced, ContinueThread
//     events for t are pruned with prune_eventq, and the current processor is
//     marked Idle when its run queue is empty and it is not in a synchronous
//     fetch;
//   - an unknown return code ends in barf().
// NOTE(review): the switch expression and most case labels are not visible in
// this view, so the pairing of counters with return codes is only shown for
// ThreadYielding -- confirm the rest against the full source.
1459 schedulePostRunThread(void)
1462 /* HACK 675: if the last thread didn't yield, make sure to print a
1463 SCHEDULE event to the log file when StgRunning the next thread, even
1464 if it is the same one as before */
1466 TimeOfLastYield = CURRENT_TIME;
1469 /* some statistics gathering in the parallel case */
1471 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1475 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1476 globalGranStats.tot_heapover++;
1478 globalParStats.tot_heapover++;
1485 DumpGranEvent(GR_DESCHEDULE, t));
1486 globalGranStats.tot_stackover++;
1489 // DumpGranEvent(GR_DESCHEDULE, t);
1490 globalParStats.tot_stackover++;
1494 case ThreadYielding:
1497 DumpGranEvent(GR_DESCHEDULE, t));
1498 globalGranStats.tot_yields++;
1501 // DumpGranEvent(GR_DESCHEDULE, t);
1502 globalParStats.tot_yields++;
1508 debugTrace(DEBUG_sched,
1509 "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1510 t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
1511 (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1512 if (t->block_info.closure!=(StgClosure*)NULL)
1513 print_bq(t->block_info.closure);
1516 // ??? needed; should emit block before
1518 DumpGranEvent(GR_DESCHEDULE, t));
1519 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1522 ASSERT(procStatus[CurrentProc]==Busy ||
1523 ((procStatus[CurrentProc]==Fetching) &&
1524 (t->block_info.closure!=(StgClosure*)NULL)));
1525 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1526 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1527 procStatus[CurrentProc]==Fetching))
1528 procStatus[CurrentProc] = Idle;
1531 //++PAR++ blockThread() writes the event (change?)
1535 case ThreadFinished:
1539 barf("parGlobalStats: unknown return code");
1545 /* -----------------------------------------------------------------------------
1546 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1547 * -------------------------------------------------------------------------- */
// Handles a thread t that stopped on capability cap because of a heap
// overflow.
// Large-block request (cap->r.rHpAlloc > BLOCK_SIZE): unless the nursery is
// (nearly) full, a group of blocks is obtained with allocGroup(), inserted
// immediately before the current nursery block, every sub-block descriptor is
// initialised, the group becomes the current nursery block, t is pushed back
// on the run queue and rtsFalse is returned (no GC required).
// Otherwise t is re-queued, with addToRunQueue when a context switch is
// pending and with pushOnRunQueue when not, and the collection itself is left
// to the caller (see the closing comment).
// cap: capability whose nursery and run queue are updated.
// t:   the thread that ran out of heap.
// NOTE(review): the return statement of the GC path is not visible here.
1550 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1552 // did the task ask for a large block?
1553 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1554 // if so, get one and push it on the front of the nursery.
1558 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1560 debugTrace(DEBUG_sched,
1561 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1562 (long)t->id, whatNext_strs[t->what_next], blocks);
1564 // don't do this if the nursery is (nearly) full, we'll GC first.
1565 if (cap->r.rCurrentNursery->link != NULL ||
1566 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1567 // if the nursery has only one block.
1570 bd = allocGroup( blocks );
1572 cap->r.rNursery->n_blocks += blocks;
1574 // link the new group into the list
1575 bd->link = cap->r.rCurrentNursery;
1576 bd->u.back = cap->r.rCurrentNursery->u.back;
1577 if (cap->r.rCurrentNursery->u.back != NULL) {
1578 cap->r.rCurrentNursery->u.back->link = bd;
1580 #if !defined(THREADED_RTS)
1581 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1582 g0s0 == cap->r.rNursery);
1584 cap->r.rNursery->blocks = bd;
1586 cap->r.rCurrentNursery->u.back = bd;
1588 // initialise it as a nursery block. We initialise the
1589 // step, gen_no, and flags field of *every* sub-block in
1590 // this large block, because this is easier than making
1591 // sure that we always find the block head of a large
1592 // block whenever we call Bdescr() (eg. evacuate() and
1593 // isAlive() in the GC would both have to do this, at
1597 for (x = bd; x < bd + blocks; x++) {
1598 x->step = cap->r.rNursery;
1604 // This assert can be a killer if the app is doing lots
1605 // of large block allocations.
1606 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1608 // now update the nursery to point to the new block
1609 cap->r.rCurrentNursery = bd;
1611 // we might be unlucky and have another thread get on the
1612 // run queue before us and steal the large block, but in that
1613 // case the thread will just end up requesting another large
1615 pushOnRunQueue(cap,t);
1616 return rtsFalse; /* not actually GC'ing */
1620 debugTrace(DEBUG_sched,
1621 "--<< thread %ld (%s) stopped: HeapOverflow\n",
1622 (long)t->id, whatNext_strs[t->what_next]);
1625 ASSERT(!is_on_queue(t,CurrentProc));
1626 #elif defined(PARALLEL_HASKELL)
1627 /* Currently we emit a DESCHEDULE event before GC in GUM.
1628 ToDo: either add separate event to distinguish SYSTEM time from rest
1629 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1630 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1631 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1632 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1633 emitSchedule = rtsTrue;
1637 if (context_switch) {
1638 // Sometimes we miss a context switch, e.g. when calling
1639 // primitives in a tight loop, MAYBE_GC() doesn't check the
1640 // context switch flag, and we end up waiting for a GC.
1641 // See #1984, and concurrent/should_run/1984
1643 addToRunQueue(cap,t);
1645 pushOnRunQueue(cap,t);
1648 /* actual GC is done at the end of the while loop in schedule() */
1651 /* -----------------------------------------------------------------------------
1652 * Handle a thread that returned to the scheduler with ThreadStackOverflow
1653 * -------------------------------------------------------------------------- */
// Handles a thread t that stopped because of a stack overflow.
// The stack is enlarged with threadStackOverflow(cap, t), whose result new_t
// is the TSO to continue with (per the existing comment the TSO may have
// moved). When the task was attached to the old TSO (task->tso == t) its
// pointer needs updating, and new_t is pushed on the run queue of cap.
// cap:  capability owning the run queue.
// task: task whose tso pointer may refer to t.
// t:    the thread that overflowed its stack.
// NOTE(review): the statement that updates task->tso is not visible in this
// view -- confirm against the full source.
1656 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1658 debugTrace (DEBUG_sched,
1659 "--<< thread %ld (%s) stopped, StackOverflow",
1660 (long)t->id, whatNext_strs[t->what_next]);
1662 /* just adjust the stack for this thread, then pop it back
1666 /* enlarge the stack */
1667 StgTSO *new_t = threadStackOverflow(cap, t);
1669 /* The TSO attached to this Task may have moved, so update the
1672 if (task->tso == t) {
1675 pushOnRunQueue(cap,new_t);
1679 /* -----------------------------------------------------------------------------
1680 * Handle a thread that returned to the scheduler with ThreadYielding
1681 * -------------------------------------------------------------------------- */
// Handles a thread t that returned to the scheduler by yielding.
// A change of t->what_next relative to prev_what_next means the thread only
// stopped to switch evaluators: this is traced separately and takes a
// shortcut that, per the existing comment, skips stack squeezing. In the
// ordinary case t is put back with addToRunQueue(cap, t). The GranSim variant
// additionally registers an event for t through new_event().
// cap:            capability owning the run queue.
// t:              the yielding thread; t->link must be END_TSO_QUEUE (asserted).
// prev_what_next: value of t->what_next before the thread was run.
// NOTE(review): the return statements and the reset of the context switch
// flag mentioned in the first comment are not visible in this view.
1684 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1686 // Reset the context switch flag. We don't do this just before
1687 // running the thread, because that would mean we would lose ticks
1688 // during GC, which can lead to unfair scheduling (a thread hogs
1689 // the CPU because the tick always arrives during GC). This way
1690 // penalises threads that do a lot of allocation, but that seems
1691 // better than the alternative.
1694 /* put the thread back on the run queue. Then, if we're ready to
1695 * GC, check whether this is the last task to stop. If so, wake
1696 * up the GC thread. getThread will block during a GC until the
1700 if (t->what_next != prev_what_next) {
1701 debugTrace(DEBUG_sched,
1702 "--<< thread %ld (%s) stopped to switch evaluators",
1703 (long)t->id, whatNext_strs[t->what_next]);
1705 debugTrace(DEBUG_sched,
1706 "--<< thread %ld (%s) stopped, yielding",
1707 (long)t->id, whatNext_strs[t->what_next]);
1712 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1714 ASSERT(t->link == END_TSO_QUEUE);
1716 // Shortcut if we're just switching evaluators: don't bother
1717 // doing stack squeezing (which can be expensive), just run the
1719 if (t->what_next != prev_what_next) {
1724 ASSERT(!is_on_queue(t,CurrentProc));
1727 //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1728 checkThreadQsSanity(rtsTrue));
1732 addToRunQueue(cap,t);
1735 /* add a ContinueThread event to actually process the thread */
1736 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1738 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1740 debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1747 /* -----------------------------------------------------------------------------
1748 * Handle a thread that returned to the scheduler with ThreadBlocked
1749 * -------------------------------------------------------------------------- */
// Handles a thread t that returned to the scheduler in the blocked state.
// Three build variants are visible:
//   - GranSim: prints the blocking queue of t, dumps a GR_DESCHEDULE event,
//     prunes pending ContinueThread events for t, and marks the current
//     processor Idle when its run queue is empty and it is not in a
//     synchronous fetch;
//   - PAR: prints the blocking queue and sets emitSchedule so that the next
//     schedule is logged;
//   - standard RTS: nothing has to be done, as the existing comment explains;
//     with DEBUG_sched tracing enabled the thread id, its what_next state
//     and its blockage (printThreadBlockage) are printed.
// t: the blocked thread. The parameter is conditionally annotated via the
// preprocessor test that follows it; the annotation itself is not visible.
1752 scheduleHandleThreadBlocked( StgTSO *t
1753 #if !defined(GRAN) && !defined(DEBUG)
1760 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
1761 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1762 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1764 // ??? needed; should emit block before
1766 DumpGranEvent(GR_DESCHEDULE, t));
1767 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1770 ASSERT(procStatus[CurrentProc]==Busy ||
1771 ((procStatus[CurrentProc]==Fetching) &&
1772 (t->block_info.closure!=(StgClosure*)NULL)));
1773 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1774 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1775 procStatus[CurrentProc]==Fetching))
1776 procStatus[CurrentProc] = Idle;
1780 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
1781 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1784 if (t->block_info.closure!=(StgClosure*)NULL)
1785 print_bq(t->block_info.closure));
1787 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1790 /* whatever we schedule next, we must log that schedule */
1791 emitSchedule = rtsTrue;
1795 // We don't need to do anything. The thread is blocked, and it
1796 // has tidied up its stack and placed itself on whatever queue
1797 // it needs to be on.
1799 // ASSERT(t->why_blocked != NotBlocked);
1800 // Not true: for example,
1801 // - in THREADED_RTS, the thread may already have been woken
1802 // up by another Capability. This actually happens: try
1803 // conc023 +RTS -N2.
1804 // - the thread may have woken itself up already, because
1805 // threadPaused() might have raised a blocked throwTo
1806 // exception, see maybePerformBlockedException().
1809 if (traceClass(DEBUG_sched)) {
1810 debugTraceBegin("--<< thread %lu (%s) stopped: ",
1811 (unsigned long)t->id, whatNext_strs[t->what_next]);
1812 printThreadBlockage(t);
1817 /* Only for dumping event to log file
1818 ToDo: do I need this in GranSim, too?
1824 /* -----------------------------------------------------------------------------
1825 * Handle a thread that returned to the scheduler with ThreadFinished
1826 * -------------------------------------------------------------------------- */
// Handles a thread t that has finished (normally or through an uncaught
// exception, per the first comment).
// GranSim and PARALLEL_HASKELL variants first do their own clean-up
// (endThread, advisory_thread_count--, Eden port and process removal, end
// event dump).
// For a bound thread:
//   - when t->bound is not the calling task, the non-threaded RTS appends t
//     back on the run queue and the threaded RTS treats it as fatal (barf);
//   - otherwise task->tso must be t (asserted) and the outcome is recorded:
//       what_next == ThreadComplete -> *(task->ret) = tso->sp[1], stat Success
//       any other state             -> *(task->ret) = NULL, stat Interrupted
//                                      when sched_state >= SCHED_INTERRUPTING,
//                                      else stat Killed
//     after which the thread label is removed and rtsTrue is returned, which
//     tells schedule() to return.
// cap:  capability (unused in some configurations, see STG_UNUSED).
// task: the task running the scheduler.
// t:    the finished thread.
// NOTE(review): the test that t is bound at all, any guards around the
// writes through task->ret, and the remaining return statements are not
// visible in this view.
1829 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1831 /* Need to check whether this was a main thread, and if so,
1832 * return with the return value.
1834 * We also end up here if the thread kills itself with an
1835 * uncaught exception, see Exception.cmm.
1837 debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
1838 (unsigned long)t->id, whatNext_strs[t->what_next]);
1841 endThread(t, CurrentProc); // clean-up the thread
1842 #elif defined(PARALLEL_HASKELL)
1843 /* For now all are advisory -- HWL */
1844 //if(t->priority==AdvisoryPriority) ??
1845 advisory_thread_count--; // JB: Caution with this counter, buggy!
1848 if(t->dist.priority==RevalPriority)
1852 # if defined(EDENOLD)
1853 // the thread could still have an outport... (BUG)
1854 if (t->eden.outport != -1) {
1855 // delete the outport for the tso which has finished...
1856 IF_PAR_DEBUG(eden_ports,
1857 debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1858 t->eden.outport, t->id));
1861 // thread still in the process (HEAVY BUG! since outport has just been closed...)
1862 if (t->eden.epid != -1) {
1863 IF_PAR_DEBUG(eden_ports,
1864 debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1865 t->id, t->eden.epid));
1866 removeTSOfromProcess(t);
1871 if (RtsFlags.ParFlags.ParStats.Full &&
1872 !RtsFlags.ParFlags.ParStats.Suppressed)
1873 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1875 // t->par only contains statistics: left out for now...
1877 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1878 t->id,t,t->par.sparkname));
1880 #endif // PARALLEL_HASKELL
1883 // Check whether the thread that just completed was a bound
1884 // thread, and if so return with the result.
1886 // There is an assumption here that all thread completion goes
1887 // through this point; we need to make sure that if a thread
1888 // ends up in the ThreadKilled state, that it stays on the run
1889 // queue so it can be dealt with here.
1894 if (t->bound != task) {
1895 #if !defined(THREADED_RTS)
1896 // Must be a bound thread that is not the topmost one. Leave
1897 // it on the run queue until the stack has unwound to the
1898 // point where we can deal with this. Leaving it on the run
1899 // queue also ensures that the garbage collector knows about
1900 // this thread and its return value (it gets dropped from the
1901 // all_threads list so there's no other way to find it).
1902 appendToRunQueue(cap,t);
1905 // this cannot happen in the threaded RTS, because a
1906 // bound thread can only be run by the appropriate Task.
1907 barf("finished bound thread that isn't mine");
1911 ASSERT(task->tso == t);
1913 if (t->what_next == ThreadComplete) {
1915 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1916 *(task->ret) = (StgClosure *)task->tso->sp[1];
1918 task->stat = Success;
1921 *(task->ret) = NULL;
1923 if (sched_state >= SCHED_INTERRUPTING) {
1924 task->stat = Interrupted;
1926 task->stat = Killed;
1930 removeThreadLabel((StgWord)task->tso->id);
1932 return rtsTrue; // tells schedule() to return
1938 /* -----------------------------------------------------------------------------
1939 * Perform a heap census
1940 * -------------------------------------------------------------------------- */
// Decides whether a heap census should be taken.
// The visible condition holds when performHeapProfile is set, or when the
// profiling interval is 0, heap profiling is enabled and ready_to_gc is true
// (the +RTS -i0 case described below).
// ready_to_gc: whether a GC is about to happen; marked STG_UNUSED because
// some configurations do not read it.
// NOTE(review): the return statements are not visible in this view.
1943 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1945 // When we have +RTS -i0 and we're heap profiling, do a census at
1946 // every GC. This lets us get repeatable runs for debugging.
1947 if (performHeapProfile ||
1948 (RtsFlags.ProfFlags.profileInterval==0 &&
1949 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1956 /* -----------------------------------------------------------------------------
1957 * Perform a garbage collection if necessary
1958 * -------------------------------------------------------------------------- */
// Performs a garbage collection, first gathering every capability.
// Visible steps, in order:
//   1. cas(&waiting_for_gc, 0, 1) records in was_waiting whether another
//      task already claimed the collection. The path that traces that
//      someone else is trying to GC yields the capability until
//      waiting_for_gc is cleared and then returns cap.
//   2. Every capability other than cap is acquired with
//      waitForReturnCapability(); receiving a different one is fatal (barf).
//      waiting_for_gc is then cleared.
//   3. all_threads is walked through global_link: blocked exceptions are
//      delivered with maybePerformBlockedException(), and a runnable thread
//      whose transaction nest fails stmValidateNestOfTransactions() is
//      stripped back to its ATOMICALLY_FRAME via throwToSingleThreaded_().
//   4. Black holes are checked when cap is non-NULL.
//   5. When sched_state >= SCHED_INTERRUPTING all threads are deleted and
//      the state moves to SCHED_SHUTTING_DOWN.
//   6. GarbageCollect() runs as a major collection when force_major is set
//      or a heap census is due; performHeapProfile is reset afterwards.
//   7. The capabilities acquired in step 2 are released again.
//   8. GranSim registers an event for the current thread with new_event().
// cap:         capability held by the caller; may change in step 1.
// task:        the calling task (used in threaded builds only).
// force_major: request a major collection.
// Returns: cap on the yield path of step 1.
// NOTE(review): the conditional around step 1 and the final return are not
// visible in this view -- confirm against the full source.
1961 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1964 rtsBool heap_census;
1966 static volatile StgWord waiting_for_gc;
1967 rtsBool was_waiting;
1972 // In order to GC, there must be no threads running Haskell code.
1973 // Therefore, the GC thread needs to hold *all* the capabilities,
1974 // and release them after the GC has completed.
1976 // This seems to be the simplest way: previous attempts involved
1977 // making all the threads with capabilities give up their
1978 // capabilities and sleep except for the *last* one, which
1979 // actually did the GC. But it's quite hard to arrange for all
1980 // the other tasks to sleep and stay asleep.
1983 was_waiting = cas(&waiting_for_gc, 0, 1);
1986 debugTrace(DEBUG_sched, "someone else is trying to GC...");
1987 if (cap) yieldCapability(&cap,task);
1988 } while (waiting_for_gc);
1989 return cap; // NOTE: task->cap might have changed here
1992 for (i=0; i < n_capabilities; i++) {
1993 debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1994 if (cap != &capabilities[i]) {
1995 Capability *pcap = &capabilities[i];
1996 // we better hope this task doesn't get migrated to
1997 // another Capability while we're waiting for this one.
1998 // It won't, because load balancing happens while we have
1999 // all the Capabilities, but even so it's a slightly
2000 // unsavoury invariant.
2003 waitForReturnCapability(&pcap, task);
2004 if (pcap != &capabilities[i]) {
2005 barf("scheduleDoGC: got the wrong capability");
2010 waiting_for_gc = rtsFalse;
2013 /* Kick any transactions which are invalid back to their
2014 * atomically frames. When next scheduled they will try to
2015 * commit, this commit will fail and they will retry.
2020 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2021 if (t->what_next == ThreadRelocated) {
2024 next = t->global_link;
2026 // This is a good place to check for blocked
2027 // exceptions. It might be the case that a thread is
2028 // blocked on delivering an exception to a thread that
2029 // is also blocked - we try to ensure that this
2030 // doesn't happen in throwTo(), but it's too hard (or
2031 // impossible) to close all the race holes, so we
2032 // accept that some might get through and deal with
2033 // them here. A GC will always happen at some point,
2034 // even if the system is otherwise deadlocked.
2035 maybePerformBlockedException (&capabilities[0], t);
2037 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2038 if (!stmValidateNestOfTransactions (t -> trec)) {
2039 debugTrace(DEBUG_sched | DEBUG_stm,
2040 "trec %p found wasting its time", t);
2042 // strip the stack back to the
2043 // ATOMICALLY_FRAME, aborting the (nested)
2044 // transaction, and saving the stack of any
2045 // partially-evaluated thunks on the heap.
2046 throwToSingleThreaded_(&capabilities[0], t,
2047 NULL, rtsTrue, NULL);
2050 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2058 // so this happens periodically:
2059 if (cap) scheduleCheckBlackHoles(cap);
2061 IF_DEBUG(scheduler, printAllThreads());
2064 * We now have all the capabilities; if we're in an interrupting
2065 * state, then we should take the opportunity to delete all the
2066 * threads in the system.
2068 if (sched_state >= SCHED_INTERRUPTING) {
2069 deleteAllThreads(&capabilities[0]);
2070 sched_state = SCHED_SHUTTING_DOWN;
2073 heap_census = scheduleNeedHeapProfile(rtsTrue);
2075 /* everybody back, start the GC.
2076 * Could do it in this thread, or signal a condition var
2077 * to do it in another thread. Either way, we need to
2078 * broadcast on gc_pending_cond afterward.
2080 #if defined(THREADED_RTS)
2081 debugTrace(DEBUG_sched, "doing GC");
2083 GarbageCollect(force_major || heap_census);
2086 debugTrace(DEBUG_sched, "performing heap census");
2088 performHeapProfile = rtsFalse;
2091 #if defined(THREADED_RTS)
2092 // release our stash of capabilities.
2093 for (i = 0; i < n_capabilities; i++) {
2094 if (cap != &capabilities[i]) {
2095 task->cap = &capabilities[i];
2096 releaseCapability(&capabilities[i]);
2107 /* add a ContinueThread event to continue execution of current thread */
2108 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2110 t, (StgClosure*)NULL, (rtsSpark*)NULL);
2112 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2120 /* ---------------------------------------------------------------------------
2121 * Singleton fork(). Do not copy any running threads.
2122 * ------------------------------------------------------------------------- */
// Forks the process and runs the Haskell action behind entry in the child
// only; no other running threads are carried over (see the section banner).
// Visible behaviour when FORKPROCESS_PRIMOP_SUPPORTED is defined:
//   - threaded RTS with RtsFlags.ParFlags.nNodes > 1: reports an error and
//     exits with EXIT_FAILURE;
//   - sched_mutex, cap->lock and cap->running_task->lock are taken before
//     the fork so that no protected structure is mid-update in the child;
//   - parent (pid non-zero): releases the three locks; per the existing
//     comment it then returns the pid;
//   - past the parent branch: the three locks are re-created with initMutex
//     in threaded builds, every thread on all_threads is killed with
//     deleteThread_(), the run queue, the suspended_ccalling_tasks list and
//     all_threads are emptied, the lock of every task other than the current
//     one is re-created, the spare worker and returning task lists are
//     cleared, the action is run with rts_evalStableIO() and the process
//     ends through hs_exit() and stg_exit(EXIT_SUCCESS).
// Without FORKPROCESS_PRIMOP_SUPPORTED the function ends in barf().
// entry: stable pointer to the action evaluated in the child.
// NOTE(review): the fork call itself, the handling of a failed fork and the
// timer restart mentioned near the end are not visible in this view.
2125 forkProcess(HsStablePtr *entry
2126 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2131 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2137 #if defined(THREADED_RTS)
2138 if (RtsFlags.ParFlags.nNodes > 1) {
2139 errorBelch("forking not supported with +RTS -N<n> greater than 1");
2140 stg_exit(EXIT_FAILURE);
2144 debugTrace(DEBUG_sched, "forking!");
2146 // ToDo: for SMP, we should probably acquire *all* the capabilities
2149 // no funny business: hold locks while we fork, otherwise if some
2150 // other thread is holding a lock when the fork happens, the data
2151 // structure protected by the lock will forever be in an
2152 // inconsistent state in the child. See also #1391.
2153 ACQUIRE_LOCK(&sched_mutex);
2154 ACQUIRE_LOCK(&cap->lock);
2155 ACQUIRE_LOCK(&cap->running_task->lock);
2159 if (pid) { // parent
2161 RELEASE_LOCK(&sched_mutex);
2162 RELEASE_LOCK(&cap->lock);
2163 RELEASE_LOCK(&cap->running_task->lock);
2165 // just return the pid
2171 #if defined(THREADED_RTS)
// Past the parent branch: the locks taken before the fork are re-created
// here instead of being released.
2172 initMutex(&sched_mutex);
2173 initMutex(&cap->lock);
2174 initMutex(&cap->running_task->lock);
2177 // Now, all OS threads except the thread that forked are
2178 // stopped. We need to stop all Haskell threads, including
2179 // those involved in foreign calls. Also we need to delete
2180 // all Tasks, because they correspond to OS threads that are
2183 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2184 if (t->what_next == ThreadRelocated) {
2187 next = t->global_link;
2188 // don't allow threads to catch the ThreadKilled
2189 // exception, but we do want to raiseAsync() because these
2190 // threads may be evaluating thunks that we need later.
2191 deleteThread_(cap,t);
2195 // Empty the run queue. It seems tempting to let all the
2196 // killed threads stay on the run queue as zombies to be
2197 // cleaned up later, but some of them correspond to bound
2198 // threads for which the corresponding Task does not exist.
2199 cap->run_queue_hd = END_TSO_QUEUE;
2200 cap->run_queue_tl = END_TSO_QUEUE;
2202 // Any suspended C-calling Tasks are no more, their OS threads
2204 cap->suspended_ccalling_tasks = NULL;
2206 // Empty the all_threads list. Otherwise, the garbage
2207 // collector may attempt to resurrect some of these threads.
2208 all_threads = END_TSO_QUEUE;
2210 // Wipe the task list, except the current Task.
2211 ACQUIRE_LOCK(&sched_mutex);
2212 for (task = all_tasks; task != NULL; task=task->all_link) {
2213 if (task != cap->running_task) {
2214 #if defined(THREADED_RTS)
2215 initMutex(&task->lock); // see #1391
2220 RELEASE_LOCK(&sched_mutex);
2222 #if defined(THREADED_RTS)
2223 // Wipe our spare workers list, they no longer exist. New
2224 // workers will be created if necessary.
2225 cap->spare_workers = NULL;
2226 cap->returning_tasks_hd = NULL;
2227 cap->returning_tasks_tl = NULL;
2230 // On Unix, all timers are reset in the child, so we need to start
2235 cap = rts_evalStableIO(cap, entry, NULL); // run the action
2236 rts_checkSchedStatus("forkProcess",cap);
2239 hs_exit(); // clean up and exit
2240 stg_exit(EXIT_SUCCESS);
2242 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2243 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2248 /* ---------------------------------------------------------------------------
2249 * Delete all the threads in the system
2250 * ------------------------------------------------------------------------- */
// Deletes every thread in the system.
// Walks all_threads through global_link and calls deleteThread(cap, t) on
// each entry; entries whose what_next is ThreadRelocated are handled by a
// separate branch whose body is not visible in this view.
// The killed threads are deliberately left on the run queue (see the comment
// below): the main scheduler loop still has to process them and the run
// queue keeps them alive across GC.
// In the non-threaded RTS, blocked_queue_hd and sleeping_queue are asserted
// to be empty afterwards.
// cap: capability passed on to deleteThread(). Per the note below, the
// caller must own all capabilities.
2253 deleteAllThreads ( Capability *cap )
2255 // NOTE: only safe to call if we own all capabilities.
2258 debugTrace(DEBUG_sched,"deleting all threads");
2259 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2260 if (t->what_next == ThreadRelocated) {
2263 next = t->global_link;
2264 deleteThread(cap,t);
2268 // The run queue now contains a bunch of ThreadKilled threads. We
2269 // must not throw these away: the main thread(s) will be in there
2270 // somewhere, and the main scheduler loop has to deal with it.
2271 // Also, the run queue is the only thing keeping these threads from
2272 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2274 #if !defined(THREADED_RTS)
2275 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2276 ASSERT(sleeping_queue == END_TSO_QUEUE);
2280 /* -----------------------------------------------------------------------------
2281 Managing the suspended_ccalling_tasks list.
2282 Locks required: sched_mutex
2283 -------------------------------------------------------------------------- */
// suspendTask: push 'task' onto the front of cap->suspended_ccalling_tasks,
// a list linked through the Task's next/prev fields.
// Precondition (asserted): 'task' is not currently linked into any list.
// NOTE(review): the caller visible in suspendThread() holds cap->lock, not
// sched_mutex, around this call -- the "Locks required" line above may be
// stale; confirm.
2286 suspendTask (Capability *cap, Task *task)
2288 ASSERT(task->next == NULL && task->prev == NULL);
2289 task->next = cap->suspended_ccalling_tasks;
// If the list was non-empty, the old head must point back at the new head.
2291 if (cap->suspended_ccalling_tasks) {
2292 cap->suspended_ccalling_tasks->prev = task;
2294 cap->suspended_ccalling_tasks = task;
// recoverSuspendedTask: unlink 'task' from cap->suspended_ccalling_tasks and
// clear its next/prev links so it can be suspended again later.
// NOTE(review): the conditions guarding the prev/next updates are not visible
// in this listing; presumably the first pair of statements distinguishes an
// interior node from the list head (the ASSERT checks that 'task' is the
// head before the head pointer is advanced) -- confirm against the full file.
2298 recoverSuspendedTask (Capability *cap, Task *task)
2301 task->prev->next = task->next;
2303 ASSERT(cap->suspended_ccalling_tasks == task);
2304 cap->suspended_ccalling_tasks = task->next;
2307 task->next->prev = task->prev;
// Leave the Task fully unlinked, matching the precondition asserted by
// suspendTask().
2309 task->next = task->prev = NULL;
2312 /* ---------------------------------------------------------------------------
2313 * Suspending & resuming Haskell threads.
2315 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2316 * its capability before calling the C function. This allows another
2317 * task to pick up the capability and carry on running Haskell
2318 * threads. It also means that if the C call blocks, it won't lock
2321 * The Haskell thread making the C call is put to sleep for the
2322 * duration of the call, on the suspended_ccalling_tasks queue. We
2323 * give out a token to the task, which it can use to resume the thread
2324 * on return from the C function.
2325 * ------------------------------------------------------------------------- */
// suspendThread: called with the register table of the running Capability.
// Records the current TSO in the running Task, links the Task onto the
// Capability's suspended list and releases the Capability.
2328 suspendThread (StgRegTable *reg)
2335 StgWord32 saved_winerror;
// errno (and the value from GetLastError(), where that call is used) is
// saved on entry and restored at the end of the function.
2338 saved_errno = errno;
2340 saved_winerror = GetLastError();
2343 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2345 cap = regTableToCapability(reg);
2347 task = cap->running_task;
2348 tso = cap->r.rCurrentTSO;
2350 debugTrace(DEBUG_sched,
2351 "thread %lu did a safe foreign call",
2352 (unsigned long)cap->r.rCurrentTSO->id);
2354 // XXX this might not be necessary --SDM
2355 tso->what_next = ThreadRunGHC;
2357 threadPaused(cap,tso);
// If TSO_BLOCKEX was clear, mark the thread BlockedOnCCall, set TSO_BLOCKEX
// and clear TSO_INTERRUPTIBLE; resumeThread() undoes this only for the
// BlockedOnCCall case. The BlockedOnCCall_NoUnblockExc assignment presumably
// belongs to the branch where TSO_BLOCKEX was already set (the 'else' line
// is not visible in this listing).
2359 if ((tso->flags & TSO_BLOCKEX) == 0) {
2360 tso->why_blocked = BlockedOnCCall;
2361 tso->flags |= TSO_BLOCKEX;
2362 tso->flags &= ~TSO_INTERRUPTIBLE;
2364 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2367 // Hand back capability
2368 task->suspended_tso = tso;
// The list update, the in_haskell flag and the release of the Capability
// are all performed while holding cap->lock.
2370 ACQUIRE_LOCK(&cap->lock);
2372 suspendTask(cap,task);
2373 cap->in_haskell = rtsFalse;
2374 releaseCapability_(cap);
2376 RELEASE_LOCK(&cap->lock);
2378 #if defined(THREADED_RTS)
2379 /* Preparing to leave the RTS, so ensure there's a native thread/task
2380 waiting to take over.
2382 debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2385 errno = saved_errno;
2387 SetLastError(saved_winerror);
// resumeThread: counterpart of suspendThread(). Waits for a Capability,
// removes the Task from that Capability's suspended list, and makes the
// Task's suspended TSO the Capability's current thread again.
// errno (and the GetLastError() value, where used) is saved on entry and
// restored before the end of the function.
2393 resumeThread (void *task_)
2400 StgWord32 saved_winerror;
2403 saved_errno = errno;
2405 saved_winerror = GetLastError();
2409 // Wait for permission to re-enter the RTS with the result.
2410 waitForReturnCapability(&cap,task);
2411 // we might be on a different capability now... but if so, our
2412 // entry on the suspended_ccalling_tasks list will also have been
2415 // Remove the thread from the suspended list
2416 recoverSuspendedTask(cap,task);
// Take ownership of the TSO back from the Task and detach it from any queue.
2418 tso = task->suspended_tso;
2419 task->suspended_tso = NULL;
2420 tso->link = END_TSO_QUEUE;
2421 debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
// Only the BlockedOnCCall case wakes the blocked-exception queue and clears
// TSO_BLOCKEX / TSO_INTERRUPTIBLE; a thread suspended with
// BlockedOnCCall_NoUnblockExc keeps its flags as they are.
2423 if (tso->why_blocked == BlockedOnCCall) {
2424 awakenBlockedExceptionQueue(cap,tso);
2425 tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2428 /* Reset blocking status */
2429 tso->why_blocked = NotBlocked;
2431 cap->r.rCurrentTSO = tso;
2432 cap->in_haskell = rtsTrue;
2433 errno = saved_errno;
2435 SetLastError(saved_winerror);
2438 /* We might have GC'd, mark the TSO dirty again */
2441 IF_DEBUG(sanity, checkTSO(tso));
2446 /* ---------------------------------------------------------------------------
2449 * scheduleThread puts a thread on the end of the runnable queue.
2450 * This will usually be done immediately after a thread is created.
2451 * The caller of scheduleThread must create the thread using e.g.
2452 * createThread and push an appropriate closure
2453 * on this thread's stack before the scheduler is invoked.
2454 * ------------------------------------------------------------------------ */
// cap: the Capability whose run queue receives the thread.
// tso: the thread to enqueue.
2457 scheduleThread(Capability *cap, StgTSO *tso)
2459 // The thread goes at the *end* of the run-queue, to avoid possible
2460 // starvation of any threads already on the queue.
2461 appendToRunQueue(cap,tso);
// scheduleThreadOn: schedule 'tso' on the Capability selected by 'cpu'.
// In the threaded RTS the thread is flagged TSO_LOCKED, 'cpu' is reduced
// modulo RtsFlags.ParFlags.nNodes, and the thread is either appended to the
// run queue of 'cap' (when 'cap' is the selected Capability) or handed to
// the selected Capability via migrateThreadToCapability_lock().
// The last appendToRunQueue() call is presumably the non-threaded branch
// (the #else / #endif lines are not visible in this listing) -- confirm.
2465 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2467 #if defined(THREADED_RTS)
2468 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2469 // move this thread from now on.
2470 cpu %= RtsFlags.ParFlags.nNodes;
2471 if (cpu == cap->no) {
2472 appendToRunQueue(cap,tso);
2474 migrateThreadToCapability_lock(&capabilities[cpu],tso);
2477 appendToRunQueue(cap,tso);
// scheduleWaitThread: run 'tso' as a bound thread of the Task currently
// running on 'cap'. The thread is appended to the run queue, schedule() is
// entered, and on return the Task is asserted to have a status other than
// NoStatus. 'ret' is annotated as an out-parameter; how it is stored is not
// visible in this listing.
2482 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2486 // We already created/initialised the Task
2487 task = cap->running_task;
2489 // This TSO is now a bound thread; make the Task and TSO
2490 // point to each other.
// The status starts as NoStatus; the assertion after schedule() checks
// that it has been changed by then.
2496 task->stat = NoStatus;
2498 appendToRunQueue(cap,tso);
2500 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
// NOTE(review): 'm' below is not declared in any visible line of this
// function -- the GranSim-specific code may be stale; confirm.
2503 /* GranSim specific init */
2504 CurrentTSO = m->tso; // the TSO to run
2505 procStatus[MainProc] = Busy; // status of main PE
2506 CurrentProc = MainProc; // PE to run it on
// schedule() may hand back a different Capability, so 'cap' is reassigned.
2509 cap = schedule(cap,task);
2511 ASSERT(task->stat != NoStatus);
2512 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2514 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2518 /* ----------------------------------------------------------------------------
2520 * ------------------------------------------------------------------------- */
// workerStart (threaded RTS only): body of a worker Task. It is the function
// handed to startWorkerTask() when workers are started eagerly during
// scheduler initialisation. It runs schedule() and, once that returns,
// releases the Capability and stops the Task.
2522 #if defined(THREADED_RTS)
2524 workerStart(Task *task)
2528 // See startWorkerTask().
// The Task's lock is acquired and released again here; the statement(s)
// between the two calls are not visible in this listing.
2529 ACQUIRE_LOCK(&task->lock);
2531 RELEASE_LOCK(&task->lock);
2533 // set the thread-local pointer to the Task:
2536 // schedule() runs without a lock.
2537 cap = schedule(cap,task);
2539 // On exit from schedule(), we have a Capability.
2540 releaseCapability(cap);
2541 workerTaskStop(task);
2545 /* ---------------------------------------------------------------------------
2548 * Initialise the scheduler. This resets all the queues - if the
2549 * queues contained any threads, they'll be garbage collected at the
2552 * ------------------------------------------------------------------------ */
2559 for (i=0; i<=MAX_PROC; i++) {
2560 run_queue_hds[i] = END_TSO_QUEUE;
2561 run_queue_tls[i] = END_TSO_QUEUE;
2562 blocked_queue_hds[i] = END_TSO_QUEUE;
2563 blocked_queue_tls[i] = END_TSO_QUEUE;
2564 ccalling_threadss[i] = END_TSO_QUEUE;
2565 blackhole_queue[i] = END_TSO_QUEUE;
2566 sleeping_queue = END_TSO_QUEUE;
2568 #elif !defined(THREADED_RTS)
2569 blocked_queue_hd = END_TSO_QUEUE;
2570 blocked_queue_tl = END_TSO_QUEUE;
2571 sleeping_queue = END_TSO_QUEUE;
2574 blackhole_queue = END_TSO_QUEUE;
2575 all_threads = END_TSO_QUEUE;
2578 sched_state = SCHED_RUNNING;
2579 recent_activity = ACTIVITY_YES;
2581 #if defined(THREADED_RTS)
2582 /* Initialise the mutex and condition variables used by
2584 initMutex(&sched_mutex);
2587 ACQUIRE_LOCK(&sched_mutex);
2589 /* A capability holds the state a native thread needs in
2590 * order to execute STG code. At least one capability is
2591 * floating around (only THREADED_RTS builds have more than one).
2597 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2601 #if defined(THREADED_RTS)
2603 * Eagerly start one worker to run each Capability, except for
2604 * Capability 0. The idea is that we're probably going to start a
2605 * bound thread on Capability 0 pretty soon, so we don't want a
2606 * worker task hogging it.
2611 for (i = 1; i < n_capabilities; i++) {
2612 cap = &capabilities[i];
2613 ACQUIRE_LOCK(&cap->lock);
2614 startWorkerTask(cap, workerStart);
2615 RELEASE_LOCK(&cap->lock);
2620 trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2622 RELEASE_LOCK(&sched_mutex);
// wait_foreign: passed through unchanged to shutdownCapability() for every
// Capability. It is only used in the THREADED_RTS branch below, hence the
// 'unused' attribute when THREADED_RTS is not defined.
2627 rtsBool wait_foreign
2628 #if !defined(THREADED_RTS)
2629 __attribute__((unused))
2632 /* see Capability.c, shutdownCapability() */
// In the threaded RTS a bound Task is obtained under sched_mutex; it is
// used for the shutdown work below.
2636 #if defined(THREADED_RTS)
2637 ACQUIRE_LOCK(&sched_mutex);
2638 task = newBoundTask();
2639 RELEASE_LOCK(&sched_mutex);
2642 // If we haven't killed all the threads yet, do it now.
2643 if (sched_state < SCHED_SHUTTING_DOWN) {
2644 sched_state = SCHED_INTERRUPTING;
2645 scheduleDoGC(NULL,task,rtsFalse);
2647 sched_state = SCHED_SHUTTING_DOWN;
// Threaded RTS: shut down each Capability in index order, then retire the
// bound Task.
2649 #if defined(THREADED_RTS)
2653 for (i = 0; i < n_capabilities; i++) {
2654 shutdownCapability(&capabilities[i], task, wait_foreign);
2656 boundTaskExiting(task);
// NOTE(review): presumably the non-threaded branch (the #else line is not
// visible in this listing) -- confirm.
2660 freeCapability(&MainCapability);
// freeScheduler: release scheduler-owned resources: the 'capabilities' array
// (only when n_capabilities != 1) and, in the threaded RTS, sched_mutex.
// NOTE(review): presumably the array is not heap-allocated when there is a
// single Capability (the shutdown code frees &MainCapability separately) --
// confirm against the code that allocates 'capabilities'.
2665 freeScheduler( void )
2668 if (n_capabilities != 1) {
2669 stgFree(capabilities);
2671 #if defined(THREADED_RTS)
2672 closeMutex(&sched_mutex);
2676 /* ---------------------------------------------------------------------------
2677 Where are the roots that we know about?
2679 - all the threads on the runnable queue
2680 - all the threads on the blocked queue
2681 - all the threads on the sleeping queue
2682 - all the threads currently executing a _ccall_GC
2683 - all the "main threads"
2685 ------------------------------------------------------------------------ */
2687 /* This has to be protected either by the scheduler monitor, or by the
2688 garbage collection monitor (probably the latter).
// GetRoots: apply 'evac' to the address of every scheduler-owned root
// pointer, so the collector can update each one in place.
2693 GetRoots( evac_fn evac )
// GranSim build: visit the per-processor queue heads and tails. Entries that
// are END_TSO_QUEUE or NULL are skipped.
// NOTE(review): the bound is inclusive of RtsFlags.GranFlags.proc; confirm
// that this index is always below MAX_PROC, the declared array size.
2700 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2701 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2702 evac((StgClosure **)&run_queue_hds[i]);
2703 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2704 evac((StgClosure **)&run_queue_tls[i]);
2706 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2707 evac((StgClosure **)&blocked_queue_hds[i]);
2708 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2709 evac((StgClosure **)&blocked_queue_tls[i]);
// Evacuate the same per-processor slot that the guard tests
// (ccalling_threadss, the array declared alongside the other queues).
2710 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2711 evac((StgClosure **)&ccalling_threadss[i]);
// Per-Capability roots: the run queue, the wakeup queue (threaded RTS only)
// and the TSO of every Task on the suspended_ccalling_tasks list.
2718 for (i = 0; i < n_capabilities; i++) {
2719 cap = &capabilities[i];
2720 evac((StgClosure **)(void *)&cap->run_queue_hd);
2721 evac((StgClosure **)(void *)&cap->run_queue_tl);
2722 #if defined(THREADED_RTS)
2723 evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2724 evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2726 for (task = cap->suspended_ccalling_tasks; task != NULL;
2728 debugTrace(DEBUG_sched,
2729 "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
2730 evac((StgClosure **)(void *)&task->suspended_tso);
// Non-threaded RTS: the global blocked and sleeping queues are roots too.
2736 #if !defined(THREADED_RTS)
2737 evac((StgClosure **)(void *)&blocked_queue_hd);
2738 evac((StgClosure **)(void *)&blocked_queue_tl);
2739 evac((StgClosure **)(void *)&sleeping_queue);
2743 // evac((StgClosure **)&blackhole_queue);
2745 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2746 markSparkQueue(evac);
2749 #if defined(RTS_USER_SIGNALS)
2750 // mark the signal handlers (signals should be already blocked)
2751 if (RtsFlags.MiscFlags.install_signal_handlers) {
2752 markSignalHandlers(evac);
2757 /* -----------------------------------------------------------------------------
2760 This is the interface to the garbage collector from Haskell land.
2761 We provide this so that external C code can allocate and garbage
2762 collect when called from Haskell via _ccall_GC.
2763 -------------------------------------------------------------------------- */
// performGC_: shared implementation. Obtains a fresh bound Task under
// sched_mutex, runs scheduleDoGC() with 'force_major' passed through, then
// retires the Task.
2766 performGC_(rtsBool force_major)
2769 // We must grab a new Task here, because the existing Task may be
2770 // associated with a particular Capability, and chained onto the
2771 // suspended_ccalling_tasks queue.
2772 ACQUIRE_LOCK(&sched_mutex);
2773 task = newBoundTask();
2774 RELEASE_LOCK(&sched_mutex);
2775 scheduleDoGC(NULL,task,force_major);
2776 boundTaskExiting(task);
// Wrapper body that requests a collection without forcing a major one
// (the wrapper's name line is not visible in this listing).
2782 performGC_(rtsFalse);
// performMajorGC: wrapper that forces a major collection.
2786 performMajorGC(void)
2788 performGC_(rtsTrue);
2791 /* -----------------------------------------------------------------------------
2794 If the thread has reached its maximum stack size, then raise the
2795 StackOverflow exception in the offending thread. Otherwise
2796 relocate the TSO into a larger chunk of memory and adjust its stack
2798 -------------------------------------------------------------------------- */
// cap: Capability used for allocation and for raising the exception.
// tso: the thread whose stack has overflowed.
2801 threadStackOverflow(Capability *cap, StgTSO *tso)
2803 nat new_stack_size, stack_words;
2808 IF_DEBUG(sanity,checkTSO(tso));
2810 // don't allow throwTo() to modify the blocked_exceptions queue
2811 // while we are moving the TSO:
2812 lockClosure((StgClosure *)tso);
2814 if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2815 // NB. never raise a StackOverflow exception if the thread is
2816 // inside Control.Exception.block. It is impractical to protect
2817 // against stack overflow exceptions, since virtually anything
2818 // can raise one (even 'catch'), so this is the only sensible
2819 // thing to do here. See bug #767.
2821 debugTrace(DEBUG_gc,
2822 "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2823 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2825 /* If we're debugging, just print out the top of the stack */
2826 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2829 // Send this thread the StackOverflow exception
2831 throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2835 /* Try to double the current stack size. If that takes us over the
2836 * maximum stack size for this thread, then use the maximum instead.
2837 * Finally round up so the TSO ends up as a whole number of blocks.
2839 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2840 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2841 TSO_STRUCT_SIZE)/sizeof(W_);
2842 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
// After rounding, the stack size is recomputed as whatever remains of the
// object once the TSO header words are subtracted.
2843 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2845 debugTrace(DEBUG_sched,
2846 "increasing stack size from %ld words to %d.",
2847 (long)tso->stack_size, new_stack_size);
2849 dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2850 TICK_ALLOC_TSO(new_stack_size,0);
2852 /* copy the TSO block and the old stack into the new area */
2853 memcpy(dest,tso,TSO_STRUCT_SIZE);
// stack_words is the number of words between the old sp and the end of the
// old stack; they are copied so that they end exactly at the end of the
// new object.
2854 stack_words = tso->stack + tso->stack_size - tso->sp;
2855 new_sp = (P_)dest + new_tso_size - stack_words;
2856 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2858 /* relocate the stack pointers... */
2860 dest->stack_size = new_stack_size;
2862 /* Mark the old TSO as relocated. We have to check for relocated
2863 * TSOs in the garbage collector and any primops that deal with TSOs.
2865 * It's important to set the sp value to just beyond the end
2866 * of the stack, so we don't attempt to scavenge any part of the
2869 tso->what_next = ThreadRelocated;
2871 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2872 tso->why_blocked = NotBlocked;
2874 IF_PAR_DEBUG(verbose,
2875 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2876 tso->id, tso, tso->stack_size);
2877 /* If we're debugging, just print out the top of the stack */
2878 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2884 IF_DEBUG(sanity,checkTSO(dest));
2886 IF_DEBUG(scheduler,printTSO(dest));
2892 /* ---------------------------------------------------------------------------
2894 - usually called inside a signal handler so it mustn't do anything fancy.
2895 ------------------------------------------------------------------------ */
// interruptStgRts: request an interrupt by setting the global sched_state to
// SCHED_INTERRUPTING.
2898 interruptStgRts(void)
2900 sched_state = SCHED_INTERRUPTING;
2905 /* -----------------------------------------------------------------------------
2908 This function causes at least one OS thread to wake up and run the
2909 scheduler loop. It is invoked when the RTS might be deadlocked, or
2910 an external event has arrived that may need servicing (e.g. a
2911 keyboard interrupt).
2913 In the single-threaded RTS we don't do anything here; we only have
2914 one thread anyway, and the event that caused us to want to wake up
2915 will have interrupted any blocking system call in progress anyway.
2916 -------------------------------------------------------------------------- */
2921 #if defined(THREADED_RTS)
2922 // This forces the IO Manager thread to wakeup, which will
2923 // in turn ensure that some OS thread wakes up and runs the
2924 // scheduler loop, which will cause a GC and deadlock check.
2929 /* -----------------------------------------------------------------------------
2932 * Check the blackhole_queue for threads that can be woken up. We do
2933 * this periodically: before every GC, and whenever the run queue is
2936 * An elegant solution might be to just wake up all the blocked
2937 * threads with awakenBlockedQueue occasionally: they'll go back to
2938 * sleep again if the object is still a BLACKHOLE. Unfortunately this
2939 * doesn't give us a way to tell whether we've actually managed to
2940 * wake up any threads, so we would be busy-waiting.
2942 * -------------------------------------------------------------------------- */
// cap: the Capability passed to unblockOne() for each thread that is woken.
// any_woke_up starts as rtsFalse and is set to rtsTrue when a thread is
// unblocked.
2945 checkBlackHoles (Capability *cap)
2948 rtsBool any_woke_up = rtsFalse;
2951 // blackhole_queue is global:
2952 ASSERT_LOCK_HELD(&sched_mutex);
2954 debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2956 // ASSUMES: sched_mutex
2957 prev = &blackhole_queue;
2958 t = blackhole_queue;
2959 while (t != END_TSO_QUEUE) {
2960 ASSERT(t->why_blocked == BlockedOnBlackHole);
// A thread can be woken once the closure it is blocked on is no longer a
// BLACKHOLE or CAF_BLACKHOLE. The value returned by unblockOne() becomes
// the next thread examined by the loop.
2961 type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2962 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2963 IF_DEBUG(sanity,checkTSO(t));
2964 t = unblockOne(cap, t);
2965 // urk, the threads migrate to the current capability
2966 // here, but we'd like to keep them on the original one.
2968 any_woke_up = rtsTrue;
2978 /* -----------------------------------------------------------------------------
2981 This is used for interruption (^C) and forking, and corresponds to
2982 raising an exception but without letting the thread catch the
2984 -------------------------------------------------------------------------- */
2987 deleteThread (Capability *cap, StgTSO *tso)
2989 // NOTE: must only be called on a TSO that we have exclusive
2990 // access to, because we will call throwToSingleThreaded() below.
2991 // The TSO must be on the run queue of the Capability we own, or
2992 // we must own all Capabilities.
// Threads that are inside a foreign call (BlockedOnCCall or
// BlockedOnCCall_NoUnblockExc) are left untouched; every other thread
// receives throwToSingleThreaded() with a NULL exception argument.
2994 if (tso->why_blocked != BlockedOnCCall &&
2995 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2996 throwToSingleThreaded(cap,tso,NULL);
3000 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3002 deleteThread_(Capability *cap, StgTSO *tso)
3003 { // for forkProcess only:
3004 // like deleteThread(), but we delete threads in foreign calls, too.
// A thread that is inside a foreign call is unblocked and marked
// ThreadKilled directly. Presumably every other thread goes through
// deleteThread() (the 'else' line is not visible in this listing).
3006 if (tso->why_blocked == BlockedOnCCall ||
3007 tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
3008 unblockOne(cap,tso);
3009 tso->what_next = ThreadKilled;
3011 deleteThread(cap,tso);
3016 /* -----------------------------------------------------------------------------
3017 raiseExceptionHelper
3019 This function is called by the raise# primitive, just so that we can
3020 move some of the tricky bits of raising an exception from C-- into
3021 C. Who knows, it might be a useful reusable thing here too.
3022 -------------------------------------------------------------------------- */
// reg: register table of the running Capability (converted with
// regTableToCapability()).
// tso: the thread whose stack is walked.
// exception: the exception closure, stored as the payload of the raise
// closure.
// The visible cases return the type of the frame at which the walk stops.
3025 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3027 Capability *cap = regTableToCapability(reg);
3028 StgThunk *raise_closure = NULL;
3030 StgRetInfoTable *info;
3032 // This closure represents the expression 'raise# E' where E
3033 // is the exception raised. It is used to overwrite all the
3034 // thunks which are currently under evaluation.
3037 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
3038 // LDV profiling: stg_raise_info has THUNK as its closure
3039 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3040 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
3041 // 1 does not cause any problem unless profiling is performed.
3042 // However, when LDV profiling goes on, we need to linearly scan
3043 // small object pool, where raise_closure is stored, so we should
3044 // use MIN_UPD_SIZE.
3046 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3047 // sizeofW(StgClosure)+1);
3051 // Walk up the stack, looking for the catch frame. On the way,
3052 // we update any closures pointed to from update frames with the
3053 // raise closure that we just built.
3057 info = get_ret_itbl((StgClosure *)p);
3058 next = p + stack_frame_sizeW((StgClosure *)p);
3059 switch (info->i.type) {
// raise_closure starts out NULL and is allocated lazily, the first time it
// is needed; later frames reuse the same closure.
3062 // Only create raise_closure if we need to.
3063 if (raise_closure == NULL) {
3065 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3066 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3067 raise_closure->payload[0] = exception;
3069 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3073 case ATOMICALLY_FRAME:
3074 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
3076 return ATOMICALLY_FRAME;
3082 case CATCH_STM_FRAME:
3083 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3085 return CATCH_STM_FRAME;
3091 case CATCH_RETRY_FRAME:
3100 /* -----------------------------------------------------------------------------
3101 findRetryFrameHelper
3103 This function is called by the retry# primitive. It traverses the stack
3104 leaving tso->sp referring to the frame which should handle the retry.
3106 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3107 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
3109 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3110 create) because retries are not considered to be exceptions, despite the
3111 similar implementation.
3113 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3114 not be created within memory transactions.
3115 -------------------------------------------------------------------------- */
// tso: the thread whose stack is walked.
// The visible cases return ATOMICALLY_FRAME or CATCH_RETRY_FRAME, i.e. the
// type of the frame that will handle the retry.
3118 findRetryFrameHelper (StgTSO *tso)
3121 StgRetInfoTable *info;
3125 info = get_ret_itbl((StgClosure *)p);
3126 next = p + stack_frame_sizeW((StgClosure *)p);
3127 switch (info->i.type) {
3129 case ATOMICALLY_FRAME:
3130 debugTrace(DEBUG_stm,
3131 "found ATOMICALLY_FRAME at %p during retry", p);
3133 return ATOMICALLY_FRAME;
// NOTE(review): "retrry" in the trace text below looks like a typo for
// "retry"; it is runtime output and has been left unchanged here.
3135 case CATCH_RETRY_FRAME:
3136 debugTrace(DEBUG_stm,
3137 "found CATCH_RETRY_FRAME at %p during retrry", p);
3139 return CATCH_RETRY_FRAME;
// A CATCH_STM_FRAME is not a stopping point: the thread's current (nested)
// transaction record is aborted and freed, and the enclosing record becomes
// the thread's current one.
3141 case CATCH_STM_FRAME: {
3142 StgTRecHeader *trec = tso -> trec;
3143 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3144 debugTrace(DEBUG_stm,
3145 "found CATCH_STM_FRAME at %p during retry", p);
3146 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3147 stmAbortTransaction(tso -> cap, trec);
3148 stmFreeAbortedTRec(tso -> cap, trec);
3149 tso -> trec = outer;
// Any other frame type is checked against the two kinds that should never
// appear inside a memory transaction.
3156 ASSERT(info->i.type != CATCH_FRAME);
3157 ASSERT(info->i.type != STOP_FRAME);
3164 /* -----------------------------------------------------------------------------
3165 resurrectThreads is called after garbage collection on the list of
3166 threads found to be garbage. Each of these threads will be woken
3167 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3168 on an MVar, or NonTermination if the thread was blocked on a Black
3171 Locks: assumes we hold *all* the capabilities.
3172 -------------------------------------------------------------------------- */
3175 resurrectThreads (StgTSO *threads)
3180 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3181 next = tso->global_link;
3182 tso->global_link = all_threads;
3184 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3186 // Wake up the thread on the Capability it was last on
3189 switch (tso->why_blocked) {
3191 case BlockedOnException:
3192 /* Called by GC - sched_mutex lock is currently held. */
3193 throwToSingleThreaded(cap, tso,
3194 (StgClosure *)BlockedOnDeadMVar_closure);
3196 case BlockedOnBlackHole:
3197 throwToSingleThreaded(cap, tso,
3198 (StgClosure *)NonTermination_closure);
3201 throwToSingleThreaded(cap, tso,
3202 (StgClosure *)BlockedIndefinitely_closure);
3205 /* This might happen if the thread was blocked on a black hole
3206 * belonging to a thread that we've just woken up (raiseAsync
3207 * can wake up threads, remember...).
3211 barf("resurrectThreads: thread blocked in a strange way");