1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 1998-2006
5 * The scheduler and thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
14 #include "OSThreads.h"
19 #include "StgMiscClosures.h"
20 #include "Interpreter.h"
22 #include "RtsSignals.h"
28 #include "ThreadLabels.h"
29 #include "LdvProfile.h"
31 #include "Proftimer.h"
33 #if defined(GRAN) || defined(PARALLEL_HASKELL)
34 # include "GranSimRts.h"
36 # include "ParallelRts.h"
37 # include "Parallel.h"
38 # include "ParallelDebug.h"
43 #include "Capability.h"
45 #include "AwaitEvent.h"
46 #if defined(mingw32_HOST_OS)
47 #include "win32/IOManager.h"
50 #include "RaiseAsync.h"
52 #include "ThrIOManager.h"
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
69 // Turn off inlining when debugging - it obfuscates things
72 # define STATIC_INLINE static
75 /* -----------------------------------------------------------------------------
77 * -------------------------------------------------------------------------- */
// Global scheduler state.  Most entries carry a "LOCK:" note saying which
// lock, if any, protects them.
81 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
82 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
85 In GranSim we have a runnable and a blocked queue for each processor.
86 In order to minimise code changes new arrays run_queue_hds/tls
87 are created. run_queue_hd is then a short cut (macro) for
88 run_queue_hds[CurrentProc] (see GranSim.h).
91 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
92 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
93 StgTSO *ccalling_threadss[MAX_PROC];
94 /* We use the same global list of threads (all_threads) in GranSim as in
95 the std RTS (i.e. we are cheating). However, we don't use this list in
96 the GranSim specific code at the moment (so we are only potentially
101 #if !defined(THREADED_RTS)
102 // Blocked/sleeping threads
103 StgTSO *blocked_queue_hd = NULL;
104 StgTSO *blocked_queue_tl = NULL;
105 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
108 /* Threads blocked on blackholes.
109 * LOCK: sched_mutex+capability, or all capabilities
111 StgTSO *blackhole_queue = NULL;
114 /* The blackhole_queue should be checked for threads to wake up. See
115 * Schedule.h for more thorough comment.
116 * LOCK: none (doesn't matter if we miss an update)
118 rtsBool blackholes_need_checking = rtsFalse;
120 /* Linked list of all threads.
121 * Used for detecting garbage collected threads.
122 * LOCK: sched_mutex+capability, or all capabilities
124 StgTSO *all_threads = NULL;
126 /* flag set by signal handler to precipitate a context switch
127 * LOCK: none (just an advisory flag)
129 int context_switch = 0;
131 /* flag that tracks whether we have done any execution in this time slice.
132 * LOCK: currently none, perhaps we should lock (but needs to be
133 * updated in the fast path of the scheduler).
135 nat recent_activity = ACTIVITY_YES;
137 /* Current scheduler state: SCHED_RUNNING, SCHED_INTERRUPTING or SCHED_SHUTTING_DOWN.
138 * LOCK: none (the states are ordered; schedule() tests sched_state >= SCHED_INTERRUPTING)
// NOTE(review): this variable holds SCHED_* constants, not a boolean, despite
// its rtsBool type; any type change presumably has to be mirrored in the
// matching extern declaration elsewhere (Schedule.h?) - confirm first.
140 rtsBool sched_state = SCHED_RUNNING;
146 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
147 * exists - earlier gccs apparently didn't.
153 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
154 * in an MT setting, needed to signal that a worker thread shouldn't hang around
155 * in the scheduler when it is out of work.
157 rtsBool shutting_down_scheduler = rtsFalse;
160 * This mutex protects most of the global scheduler data in
161 * the THREADED_RTS runtime.
163 #if defined(THREADED_RTS)
167 #if defined(PARALLEL_HASKELL)
169 rtsTime TimeOfLastYield;
170 rtsBool emitSchedule = rtsTrue;
// forkProcess# support: enabled on every platform except mingw32.
173 #if !defined(mingw32_HOST_OS)
174 #define FORKPROCESS_PRIMOP_SUPPORTED
177 /* -----------------------------------------------------------------------------
178 * static function prototypes
179 * -------------------------------------------------------------------------- */
181 static Capability *schedule (Capability *initialCapability, Task *task);
184 // These functions all encapsulate parts of the scheduler loop, and are
185 // abstracted only to make the structure and control flow of the
186 // scheduler clearer.
188 static void schedulePreLoop (void);
189 #if defined(THREADED_RTS)
190 static void schedulePushWork(Capability *cap, Task *task);
192 static void scheduleStartSignalHandlers (Capability *cap);
193 static void scheduleCheckBlockedThreads (Capability *cap);
194 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
// NOTE(review): the definition of scheduleCheckWakeupThreads() marks its
// parameter USED_IF_THREADS (cap is only used under THREADED_RTS);
// USED_IF_NOT_THREADS on this prototype looks like a copy/paste slip - confirm.
195 static void scheduleCheckBlackHoles (Capability *cap);
196 static void scheduleDetectDeadlock (Capability *cap, Task *task);
198 static StgTSO *scheduleProcessEvent(rtsEvent *event);
200 #if defined(PARALLEL_HASKELL)
201 static StgTSO *scheduleSendPendingMessages(void);
202 static void scheduleActivateSpark(void);
// NOTE(review): scheduleActivateSpark() is declared void here, but its
// definition returns rtsTrue/rtsFalse and schedule() tests its result in an
// if; the return type presumably should be rtsBool - confirm.
203 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
205 #if defined(PAR) || defined(GRAN)
206 static void scheduleGranParReport(void);
208 static void schedulePostRunThread(void);
209 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
210 static void scheduleHandleStackOverflow( Capability *cap, Task *task,
212 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
213 nat prev_what_next );
214 static void scheduleHandleThreadBlocked( StgTSO *t );
215 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
217 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
218 static Capability *scheduleDoGC(Capability *cap, Task *task,
219 rtsBool force_major);
221 static rtsBool checkBlackHoles(Capability *cap);
223 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
225 static void deleteThread (Capability *cap, StgTSO *tso);
226 static void deleteAllThreads (Capability *cap);
228 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
229 static void deleteThread_(Capability *cap, StgTSO *tso);
232 #if defined(PARALLEL_HASKELL)
233 StgTSO * createSparkThread(rtsSpark spark);
234 StgTSO * activateSpark (rtsSpark spark);
// Printable names for the what_next field, used in scheduler debug traces.
238 static char *whatNext_strs[] = {
248 /* -----------------------------------------------------------------------------
249 * Putting a thread on the run queue: different scheduling policies
250 * -------------------------------------------------------------------------- */
253 addToRunQueue( Capability *cap, StgTSO *t )
255 #if defined(PARALLEL_HASKELL)
256 if (RtsFlags.ParFlags.doFairScheduling) {
257 // this does round-robin scheduling; good for concurrency
258 appendToRunQueue(cap,t);
260 // this does unfair scheduling; good for parallelism
261 pushOnRunQueue(cap,t);
264 // this does round-robin scheduling; good for concurrency
265 appendToRunQueue(cap,t);
269 /* ---------------------------------------------------------------------------
270 Main scheduling loop.
272 We use round-robin scheduling, each thread returning to the
273 scheduler loop when one of these conditions is detected:
276 * timer expires (thread yields)
282 In a GranSim setup this loop iterates over the global event queue.
283 This revolves around the global event queue, which determines what
284 to do next. Therefore, it's more complicated than either the
285 concurrent or the parallel (GUM) setup.
288 GUM iterates over incoming messages.
289 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
290 and sends out a fish whenever it has nothing to do; in-between
291 doing the actual reductions (shared code below) it processes the
292 incoming messages and deals with delayed operations
293 (see PendingFetches).
294 This is not the ugliest code you could imagine, but it's bloody close.
296 ------------------------------------------------------------------------ */
// schedule(): the main scheduler loop.
//   initialCapability - the Capability owned by task on entry (see the
//                       pre-condition notes below).
//   task              - the Task running this loop.
// Each iteration handles interruption/shutdown, spark and signal
// housekeeping, wakes blocked threads and checks for deadlock, then pops a
// thread from cap's run queue, runs it (StgRun() for compiled code,
// interpretBCO() for byte-code) and acts on its StgThreadReturnCode,
// running a GC when requested.  Returns the Capability the task holds at
// that point, e.g. when scheduleHandleThreadFinished() returns rtsTrue.
299 schedule (Capability *initialCapability, Task *task)
303 StgThreadReturnCode ret;
306 #elif defined(PARALLEL_HASKELL)
309 rtsBool receivedFinish = rtsFalse;
311 nat tp_size, sp_size; // stats only
316 #if defined(THREADED_RTS)
317 rtsBool first = rtsTrue;
320 cap = initialCapability;
322 // Pre-condition: this task owns initialCapability.
323 // The sched_mutex is *NOT* held
324 // NB. on return, we still hold a capability.
326 debugTrace (DEBUG_sched,
327 "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
328 task, initialCapability);
332 // -----------------------------------------------------------
333 // Scheduler loop starts here:
335 #if defined(PARALLEL_HASKELL)
336 #define TERMINATION_CONDITION (!receivedFinish)
338 #define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL)
340 #define TERMINATION_CONDITION rtsTrue
// TERMINATION_CONDITION (defined above per build flavour) decides whether
// to keep looping; its fallback definition is simply rtsTrue.
343 while (TERMINATION_CONDITION) {
346 /* Choose the processor with the next event */
347 CurrentProc = event->proc;
348 CurrentTSO = event->tso;
351 #if defined(THREADED_RTS)
353 // don't yield the first time, we want a chance to run this
354 // thread for a bit, even if there are others banging at the
357 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
359 // Yield the capability to higher-priority tasks if necessary.
360 yieldCapability(&cap, task);
364 #if defined(THREADED_RTS)
365 schedulePushWork(cap,task);
368 // Check whether we have re-entered the RTS from Haskell without
369 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
371 if (cap->in_haskell) {
372 errorBelch("schedule: re-entered unsafely.\n"
373 " Perhaps a 'foreign import unsafe' should be 'safe'?");
374 stg_exit(EXIT_FAILURE);
377 // The interruption / shutdown sequence.
379 // In order to cleanly shut down the runtime, we want to:
380 // * make sure that all main threads return to their callers
381 // with the state 'Interrupted'.
382 // * clean up all OS threads associated with the runtime
383 // * free all memory etc.
385 // So the sequence for ^C goes like this:
387 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
388 // arranges for some Capability to wake up
390 // * all threads in the system are halted, and the zombies are
391 // placed on the run queue for cleaning up. We acquire all
392 // the capabilities in order to delete the threads, this is
393 // done by scheduleDoGC() for convenience (because GC already
394 // needs to acquire all the capabilities). We can't kill
395 // threads involved in foreign calls.
397 // * somebody calls shutdownHaskell(), which calls exitScheduler()
399 // * sched_state := SCHED_SHUTTING_DOWN
401 // * all workers exit when the run queue on their capability
402 // drains. All main threads will also exit when their TSO
403 // reaches the head of the run queue and they can return.
405 // * eventually all Capabilities will shut down, and the RTS can
408 // * We might be left with threads blocked in foreign calls,
409 // we should really attempt to kill these somehow (TODO);
411 switch (sched_state) {
414 case SCHED_INTERRUPTING:
415 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
416 #if defined(THREADED_RTS)
417 discardSparksCap(cap);
419 /* scheduleDoGC() deletes all the threads */
420 cap = scheduleDoGC(cap,task,rtsFalse);
422 case SCHED_SHUTTING_DOWN:
423 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
424 // If we are a worker, just exit. If we're a bound thread
425 // then we will exit below when we've removed our TSO from
427 if (task->tso == NULL && emptyRunQueue(cap)) {
432 barf("sched_state: %d", sched_state);
435 #if defined(THREADED_RTS)
436 // If the run queue is empty, take a spark and turn it into a thread.
438 if (emptyRunQueue(cap)) {
440 spark = findSpark(cap);
442 debugTrace(DEBUG_sched,
443 "turning spark of closure %p into a thread",
444 (StgClosure *)spark);
445 createSparkThread(cap,spark);
449 #endif // THREADED_RTS
451 scheduleStartSignalHandlers(cap);
453 // Only check the black holes here if we've nothing else to do.
454 // During normal execution, the black hole list only gets checked
455 // at GC time, to avoid repeatedly traversing this possibly long
456 // list each time around the scheduler.
457 if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
459 scheduleCheckWakeupThreads(cap);
461 scheduleCheckBlockedThreads(cap);
463 scheduleDetectDeadlock(cap,task);
464 #if defined(THREADED_RTS)
465 cap = task->cap; // reload cap, it might have changed
468 // Normally, the only way we can get here with no threads to
469 // run is if a keyboard interrupt received during
470 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
471 // Additionally, it is not fatal for the
472 // threaded RTS to reach here with no threads to run.
474 // win32: might be here due to awaitEvent() being abandoned
475 // as a result of a console event having been delivered.
476 if ( emptyRunQueue(cap) ) {
477 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
478 ASSERT(sched_state >= SCHED_INTERRUPTING);
480 continue; // nothing to do
483 #if defined(PARALLEL_HASKELL)
484 scheduleSendPendingMessages();
485 if (emptyRunQueue(cap) && scheduleActivateSpark())
489 ASSERT(next_fish_to_send_at==0); // i.e. no delayed fishes left!
492 /* If we still have no work we need to send a FISH to get a spark
494 if (emptyRunQueue(cap)) {
495 if (!scheduleGetRemoteWork(&receivedFinish)) continue;
496 ASSERT(rtsFalse); // should not happen at the moment
498 // from here: non-empty run queue.
499 // TODO: merge above case with this, only one call processMessages() !
500 if (PacketsWaiting()) { /* process incoming messages, if
501 any pending... only in else
502 because getRemoteWork waits for
504 receivedFinish = processMessages();
509 scheduleProcessEvent(event);
513 // Get a thread to run
515 t = popRunQueue(cap);
517 #if defined(GRAN) || defined(PAR)
518 scheduleGranParReport(); // some kind of debugging output
520 // Sanity check the thread we're about to run. This can be
521 // expensive if there is lots of thread switching going on...
522 IF_DEBUG(sanity,checkTSO(t));
525 #if defined(THREADED_RTS)
526 // Check whether we can run this thread in the current task.
527 // If not, we have to pass our capability to the right task.
529 Task *bound = t->bound;
533 debugTrace(DEBUG_sched,
534 "### Running thread %lu in bound thread", (unsigned long)t->id);
535 // yes, the Haskell thread is bound to the current native thread
537 debugTrace(DEBUG_sched,
538 "### thread %lu bound to another OS thread", (unsigned long)t->id);
539 // no, bound to a different Haskell thread: pass to that thread
540 pushOnRunQueue(cap,t);
544 // The thread we want to run is unbound.
546 debugTrace(DEBUG_sched,
547 "### this OS thread cannot run thread %lu", (unsigned long)t->id);
548 // no, the current native thread is bound to a different
549 // Haskell thread, so pass it to any worker thread
550 pushOnRunQueue(cap,t);
557 cap->r.rCurrentTSO = t;
559 /* context switches are initiated by the timer signal, unless
560 * the user specified "context switch as often as possible", with
563 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
564 && !emptyThreadQueues(cap)) {
570 debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
571 (long)t->id, whatNext_strs[t->what_next]);
573 startHeapProfTimer();
575 // Check for exceptions blocked on this thread
576 maybePerformBlockedException (cap, t);
578 // ----------------------------------------------------------------------
579 // Run the current thread
581 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
582 ASSERT(t->cap == cap);
584 prev_what_next = t->what_next;
586 errno = t->saved_errno;
588 SetLastError(t->saved_winerror);
590 cap->in_haskell = rtsTrue;
595 recent_activity = ACTIVITY_YES;
// Run the thread according to how it last stopped: compiled code via
// StgRun(), byte-code via interpretBCO(); a thread that has already
// finished is not run at all (ret = ThreadFinished).
597 switch (prev_what_next) {
601 /* Thread already finished, return to scheduler. */
602 ret = ThreadFinished;
608 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
609 cap = regTableToCapability(r);
614 case ThreadInterpret:
615 cap = interpretBCO(cap);
620 barf("schedule: invalid what_next field");
623 cap->in_haskell = rtsFalse;
625 // The TSO might have moved, eg. if it re-entered the RTS and a GC
626 // happened. So find the new location:
627 t = cap->r.rCurrentTSO;
629 // We have run some Haskell code: there might be blackhole-blocked
630 // threads to wake up now.
631 // Lock-free test here should be ok, we're just setting a flag.
632 if ( blackhole_queue != END_TSO_QUEUE ) {
633 blackholes_need_checking = rtsTrue;
636 // And save the current errno in this thread.
637 // XXX: possibly bogus for SMP because this thread might already
638 // be running again, see code below.
639 t->saved_errno = errno;
641 // Similarly for Windows error code
642 t->saved_winerror = GetLastError();
645 #if defined(THREADED_RTS)
646 // If ret is ThreadBlocked, and this Task is bound to the TSO that
647 // blocked, we are in limbo - the TSO is now owned by whatever it
648 // is blocked on, and may in fact already have been woken up,
649 // perhaps even on a different Capability. It may be the case
650 // that task->cap != cap. We better yield this Capability
651 // immediately and return to normality.
652 if (ret == ThreadBlocked) {
653 debugTrace(DEBUG_sched,
654 "--<< thread %lu (%s) stopped: blocked",
655 (unsigned long)t->id, whatNext_strs[t->what_next]);
660 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
661 ASSERT(t->cap == cap);
663 // ----------------------------------------------------------------------
665 // Costs for the scheduler are assigned to CCS_SYSTEM
667 #if defined(PROFILING)
671 schedulePostRunThread();
// Act on the thread's return code; the heap-overflow handler may request
// a GC through ready_to_gc.
673 ready_to_gc = rtsFalse;
677 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
681 scheduleHandleStackOverflow(cap,task,t);
685 if (scheduleHandleYield(cap, t, prev_what_next)) {
686 // shortcut for switching between compiler/interpreter:
692 scheduleHandleThreadBlocked(t);
696 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
697 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
701 barf("schedule: invalid thread return code %d", (int)ret);
// GC when a handler asked for it, or when scheduleNeedHeapProfile() says so.
704 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
705 cap = scheduleDoGC(cap,task,rtsFalse);
707 } /* end of while() */
709 debugTrace(PAR_DEBUG_verbose,
710 "== Leaving schedule() after having received Finish");
713 /* ----------------------------------------------------------------------------
714 * Setting up the scheduler loop
715 * ------------------------------------------------------------------------- */
// schedulePreLoop(): one-off setup before the scheduler loop starts.
// The visible body is GranSim code: it posts the first event for
// CurrentTSO on CurrentProc with new_event() and, in GranSim-Light mode,
// stamps CurrentTSO->gran.clock with the current time.
// NOTE(review): the guarding #if (presumably GRAN) is not visible in this
// excerpt - confirm.
718 schedulePreLoop(void)
721 /* set up first event to get things going */
722 /* ToDo: assign costs for system setup and init MainTSO ! */
723 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
725 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
727 debugTrace (DEBUG_gran,
728 "GRAN: Init CurrentTSO (in schedule) = %p",
730 IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
732 if (RtsFlags.GranFlags.Light) {
733 /* Save current time; GranSim Light only */
734 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 *
 * THREADED_RTS only.  If this Capability has at least two runnable threads,
 * or at least two sparks, grab every other Capability that is idle (empty
 * run queue and no tasks waiting to return to it).  Then deal our run-queue
 * threads out to those Capabilities in turn.  After each round, one thread
 * is kept for ourselves.  The head of our queue, our own bound thread,
 * locked threads and ThreadRelocated TSOs always stay here.  Free
 * Capabilities that received no thread are offered one spark each.
 * Every grabbed Capability is released again, and task->cap is reset to
 * cap before returning.
 *
 * Migration is disabled entirely when RtsFlags.ParFlags.migrate is off.
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
static void
schedulePushWork(Capability *cap USED_IF_THREADS,
                 Task *task      USED_IF_THREADS)
{
    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // migration can be turned off with +RTS -qg
    if (!RtsFlags.ParFlags.migrate) return;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
        && sparkPoolSizeCap(cap) < 2) {
        return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && tryGrabCapability(cap0,task)) {
            // Judge the Capability we just grabbed (cap0), not our own.
            // If cap0 already has runnable threads, or tasks queued to
            // return to it, we grabbed it at the wrong moment (or it is
            // deadlocked), so hand it straight back.
            if (!emptyRunQueue(cap0) || cap0->returning_tasks_hd != NULL) {
                releaseCapability(cap0);
            } else {
                free_caps[n_free_caps++] = cap0;
            }
        }
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;
        rtsBool pushed_to_all;

        debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);

        i = 0;
        pushed_to_all = rtsFalse;

        if (cap->run_queue_hd != END_TSO_QUEUE) {
            // The head thread always stays here; walk the rest of the
            // queue, rebuilding our own queue behind prev as we go.
            prev = cap->run_queue_hd;
            t = prev->link;
            prev->link = END_TSO_QUEUE;
            for (; t != END_TSO_QUEUE; t = next) {
                next = t->link;
                t->link = END_TSO_QUEUE;
                if (t->what_next == ThreadRelocated
                    || t->bound == task // don't move my bound thread
                    || tsoLocked(t)) {  // don't move a locked thread
                    prev->link = t;
                    prev = t;
                } else if (i == n_free_caps) {
                    // every free Capability got a thread this round:
                    // keep this one for us and start a new round
                    pushed_to_all = rtsTrue;
                    i = 0;
                    prev->link = t;
                    prev = t;
                } else {
                    debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
                    appendToRunQueue(free_caps[i],t);
                    if (t->bound) { t->bound->cap = free_caps[i]; }
                    t->cap = free_caps[i];
                    i++;
                }
            }
            cap->run_queue_tl = prev;
        }

        // If there are some free capabilities that we didn't push any
        // threads to, then try to push a spark to each one.
        if (!pushed_to_all) {
            StgClosure *spark;
            // i is the next free capability to push to
            for (; i < n_free_caps; i++) {
                if (emptySparkPoolCap(free_caps[i])) {
                    spark = findSpark(cap);
                    if (spark != NULL) {
                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
                        newSpark(&(free_caps[i]->r), spark);
                    }
                }
            }
        }

        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseCapability(free_caps[i]);
        }
    }
    task->cap = cap; // reset to point to our Capability.
}
#endif /* THREADED_RTS */
852 /* ----------------------------------------------------------------------------
853 * Start any pending signal handlers
854 * ------------------------------------------------------------------------- */
856 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
858 scheduleStartSignalHandlers(Capability *cap)
860 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
861 // safe outside the lock
862 startSignalHandlers(cap);
867 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
872 /* ----------------------------------------------------------------------------
873 * Check for blocked threads that can be woken up.
874 * ------------------------------------------------------------------------- */
877 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
879 #if !defined(THREADED_RTS)
881 // Check whether any waiting threads need to be woken up. If the
882 // run queue is empty, and there are no other tasks running, we
883 // can wait indefinitely for something to happen.
885 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
887 awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
893 /* ----------------------------------------------------------------------------
894 * Check for threads woken up by other Capabilities
895 * ------------------------------------------------------------------------- */
898 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
900 #if defined(THREADED_RTS)
901 // Any threads that were woken up by other Capabilities get
902 // appended to our run queue.
903 if (!emptyWakeupQueue(cap)) {
904 ACQUIRE_LOCK(&cap->lock);
905 if (emptyRunQueue(cap)) {
906 cap->run_queue_hd = cap->wakeup_queue_hd;
907 cap->run_queue_tl = cap->wakeup_queue_tl;
909 cap->run_queue_tl->link = cap->wakeup_queue_hd;
910 cap->run_queue_tl = cap->wakeup_queue_tl;
912 cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
913 RELEASE_LOCK(&cap->lock);
918 /* ----------------------------------------------------------------------------
919 * Check for threads blocked on BLACKHOLEs that can be woken up
920 * ------------------------------------------------------------------------- */
922 scheduleCheckBlackHoles (Capability *cap)
924 if ( blackholes_need_checking ) // check without the lock first
926 ACQUIRE_LOCK(&sched_mutex);
927 if ( blackholes_need_checking ) {
928 checkBlackHoles(cap);
929 blackholes_need_checking = rtsFalse;
931 RELEASE_LOCK(&sched_mutex);
935 /* ----------------------------------------------------------------------------
936 * Detect deadlock conditions and attempt to resolve them.
937 * ------------------------------------------------------------------------- */
// scheduleDetectDeadlock(): called from schedule() on every iteration.
// If cap has no threads to run and none blocked, waiting for I/O or
// sleeping (emptyThreadQueues), it forces a major GC, which may release
// threads.  If the run queue is still empty afterwards:
//   - the non-threaded RTS waits for user signals when handlers are
//     installed;
//   - otherwise it throws NonTermination to the main thread (task->tso)
//     if that thread is blocked on a black hole or an exception.
// In the threaded RTS the check only runs after a whole timeslice with no
// activity (recent_activity == ACTIVITY_INACTIVE).  Not implemented for
// PARALLEL_HASKELL (GUM).
940 scheduleDetectDeadlock (Capability *cap, Task *task)
943 #if defined(PARALLEL_HASKELL)
944 // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
949 * Detect deadlock: when we have no threads to run, there are no
950 * threads blocked, waiting for I/O, or sleeping, and all the
951 * other tasks are waiting for work, we must have a deadlock of
954 if ( emptyThreadQueues(cap) )
956 #if defined(THREADED_RTS)
958 * In the threaded RTS, we only check for deadlock if there
959 * has been no activity in a complete timeslice. This means
960 * we won't eagerly start a full GC just because we don't have
961 * any threads to run currently.
963 if (recent_activity != ACTIVITY_INACTIVE) return;
966 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
968 // Garbage collection can release some new threads due to
969 // either (a) finalizers or (b) threads resurrected because
970 // they are unreachable and will therefore be sent an
971 // exception. Any threads thus released will be immediately
973 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
// Record that the deadlock GC has been done for this idle period.
975 recent_activity = ACTIVITY_DONE_GC;
977 if ( !emptyRunQueue(cap) ) return;
979 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
980 /* If we have user-installed signal handlers, then wait
981 * for signals to arrive rather than bombing out with a
984 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
985 debugTrace(DEBUG_sched,
986 "still deadlocked, waiting for signals...");
990 if (signals_pending()) {
991 startSignalHandlers(cap);
994 // either we have threads to run, or we were interrupted:
995 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
999 #if !defined(THREADED_RTS)
1000 /* Probably a real deadlock. Send the current main thread the
1001 * Deadlock exception.
// NOTE(review): the comment above mentions a Deadlock exception, but the
// visible cases throw NonTermination_closure; other case labels may be in
// lines not shown in this excerpt - confirm the comment is still accurate.
1004 switch (task->tso->why_blocked) {
1006 case BlockedOnBlackHole:
1007 case BlockedOnException:
1009 throwToSingleThreaded(cap, task->tso,
1010 (StgClosure *)NonTermination_closure);
1013 barf("deadlock: main thread blocked in a strange way");
1021 /* ----------------------------------------------------------------------------
1022 * Process an event (GRAN only)
1023 * ------------------------------------------------------------------------- */
// scheduleProcessEvent(): GranSim (GRAN) event dispatcher.  schedule()
// calls it with the event fetched by TERMINATION_CONDITION.  It:
//   - advances CurrentTime[CurrentProc] to the event's time-stamp, except
//     for ContinueThread events;
//   - switches on event->evttype.  ContinueThread breaks out so that
//     CurrentTSO runs; the other event kinds go to the matching do_the_*()
//     routine;
//   - updates TimeOfLastEvent, TimeOfNextEvent and EndOfTimeSlice, takes a
//     thread with POP_RUN_QUEUE and marks CurrentProc Busy.
// NOTE(review): the handlers use `goto next_thread`, but no next_thread
// label is visible in this function, and a goto cannot leave it; this
// GRAN-only code may no longer compile - confirm.
1027 scheduleProcessEvent(rtsEvent *event)
1031 if (RtsFlags.GranFlags.Light)
1032 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1034 /* adjust time based on time-stamp */
1035 if (event->time > CurrentTime[CurrentProc] &&
1036 event->evttype != ContinueThread)
1037 CurrentTime[CurrentProc] = event->time;
1039 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1040 if (!RtsFlags.GranFlags.Light)
1043 IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1045 /* main event dispatcher in GranSim */
1046 switch (event->evttype) {
1047 /* Should just be continuing execution */
1048 case ContinueThread:
1049 IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1050 /* ToDo: check assertion
1051 ASSERT(run_queue_hd != (StgTSO*)NULL &&
1052 run_queue_hd != END_TSO_QUEUE);
1054 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1055 if (!RtsFlags.GranFlags.DoAsyncFetch &&
1056 procStatus[CurrentProc]==Fetching) {
1057 debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1058 CurrentTSO->id, CurrentTSO, CurrentProc);
1061 /* Ignore ContinueThreads for completed threads */
1062 if (CurrentTSO->what_next == ThreadComplete) {
1063 debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
1064 CurrentTSO->id, CurrentTSO, CurrentProc);
1067 /* Ignore ContinueThreads for threads that are being migrated */
1068 if (PROCS(CurrentTSO)==Nowhere) {
1069 debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1070 CurrentTSO->id, CurrentTSO, CurrentProc);
1073 /* The thread should be at the beginning of the run queue */
1074 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
1075 debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1076 CurrentTSO->id, CurrentTSO, CurrentProc);
1077 break; // run the thread anyway
1080 new_event(proc, proc, CurrentTime[proc],
1082 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1084 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1085 break; // now actually run the thread; DaH Qu'vam yImuHbej
1088 do_the_fetchnode(event);
1089 goto next_thread; /* handle next event in event queue */
1092 do_the_globalblock(event);
1093 goto next_thread; /* handle next event in event queue */
1096 do_the_fetchreply(event);
1097 goto next_thread; /* handle next event in event queue */
1099 case UnblockThread: /* Move from the blocked queue to the tail of */
1100 do_the_unblock(event);
1101 goto next_thread; /* handle next event in event queue */
1103 case ResumeThread: /* Move from the blocked queue to the tail of */
1104 /* the runnable queue ( i.e. Qu' SImqa'lu') */
1105 event->tso->gran.blocktime +=
1106 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1107 do_the_startthread(event);
1108 goto next_thread; /* handle next event in event queue */
1111 do_the_startthread(event);
1112 goto next_thread; /* handle next event in event queue */
1115 do_the_movethread(event);
1116 goto next_thread; /* handle next event in event queue */
1119 do_the_movespark(event);
1120 goto next_thread; /* handle next event in event queue */
1123 do_the_findwork(event);
1124 goto next_thread; /* handle next event in event queue */
1127 barf("Illegal event type %u\n", event->evttype);
1130 /* This point was scheduler_loop in the old RTS */
1132 IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1134 TimeOfLastEvent = CurrentTime[CurrentProc];
1135 TimeOfNextEvent = get_time_of_next_event();
1136 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1137 // CurrentTSO = ThreadQueueHd;
1139 IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
1142 if (RtsFlags.GranFlags.Light)
1143 GranSimLight_leave_system(event, &ActiveTSO);
1145 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1148 debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1150 /* in a GranSim setup the TSO stays on the run queue */
1152 /* Take a thread from the run queue. */
1153 POP_RUN_QUEUE(t); // take_off_run_queue(t);
1156 debugBelch("GRAN: About to run current thread, which is\n");
1159 context_switch = 0; // turned on via GranYield, checking events and time slice
1162 DumpGranEvent(GR_SCHEDULE, t));
1164 procStatus[CurrentProc] = Busy;
1168 /* ----------------------------------------------------------------------------
1169 * Send pending messages (PARALLEL_HASKELL only)
1170 * ------------------------------------------------------------------------- */
1172 #if defined(PARALLEL_HASKELL)
// scheduleSendPendingMessages(): PARALLEL_HASKELL only; called from
// schedule() before it looks for a thread to run.  Under PAR it checks
// PendingFetches.  With message buffering (RtsFlags.ParFlags.BufferTime)
// it sends away message packets that have become too old.  The calls that
// do the actual sending are not visible in this excerpt.
1174 scheduleSendPendingMessages(void)
1180 # if defined(PAR) // global Mem.Mgmt., omit for now
1181 if (PendingFetches != END_BF_QUEUE) {
1186 if (RtsFlags.ParFlags.BufferTime) {
1187 // if we use message buffering, we must send away all message
1188 // packets which have become too old...
1194 /* ----------------------------------------------------------------------------
1195 * Activate spark threads (PARALLEL_HASKELL only)
1196 * ------------------------------------------------------------------------- */
1198 #if defined(PARALLEL_HASKELL)
1200 scheduleActivateSpark(void)
1203 ASSERT(emptyRunQueue());
1204 /* We get here if the run queue is empty and want some work.
1205 We try to turn a spark into a thread, and add it to the run queue,
1206 from where it will be picked up in the next iteration of the scheduler
1210 /* :-[ no local threads => look out for local sparks */
1211 /* the spark pool for the current PE */
1212 pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1213 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1214 pool->hd < pool->tl) {
1216 * ToDo: add GC code check that we really have enough heap afterwards!!
1218 * If we're here (no runnable threads) and we have pending
1219 * sparks, we must have a space problem. Get enough space
1220 * to turn one of those pending sparks into a
1224 spark = findSpark(rtsFalse); /* get a spark */
1225 if (spark != (rtsSpark) NULL) {
1226 tso = createThreadFromSpark(spark); /* turn the spark into a thread */
1227 IF_PAR_DEBUG(fish, // schedule,
1228 debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1229 tso->id, tso, advisory_thread_count));
1231 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1232 IF_PAR_DEBUG(fish, // schedule,
1233 debugBelch("==^^ failed to create thread from spark @ %lx\n",
1235 return rtsFalse; /* failed to generate a thread */
1236 } /* otherwise fall through & pick-up new tso */
1238 IF_PAR_DEBUG(fish, // schedule,
1239 debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
1240 spark_queue_len(pool)));
1241 return rtsFalse; /* failed to generate a thread */
1243 return rtsTrue; /* success in generating a thread */
1244 } else { /* no more threads permitted or pool empty */
1245 return rtsFalse; /* failed to generateThread */
1248 tso = NULL; // avoid compiler warning only
1249 return rtsFalse; /* dummy in non-PAR setup */
1252 #endif // PARALLEL_HASKELL
1254 /* ----------------------------------------------------------------------------
1255 * Get work from a remote node (PARALLEL_HASKELL only)
1256 * ------------------------------------------------------------------------- */
1258 #if defined(PARALLEL_HASKELL)
1260 scheduleGetRemoteWork(rtsBool *receivedFinish)
1262 ASSERT(emptyRunQueue());
1264 if (RtsFlags.ParFlags.BufferTime) {
1265 IF_PAR_DEBUG(verbose,
1266 debugBelch("...send all pending data,"));
1269 for (i=1; i<=nPEs; i++)
1270 sendImmediately(i); // send all messages away immediately
1274 //++EDEN++ idle() , i.e. send all buffers, wait for work
1275 // suppress fishing in EDEN... just look for incoming messages
1276 // (blocking receive)
1277 IF_PAR_DEBUG(verbose,
1278 debugBelch("...wait for incoming messages...\n"));
1279 *receivedFinish = processMessages(); // blocking receive...
1281 // and reenter scheduling loop after having received something
1282 // (return rtsFalse below)
1284 # else /* activate SPARKS machinery */
1285 /* We get here, if we have no work, tried to activate a local spark, but still
1286 have no work. We try to get a remote spark, by sending a FISH message.
1287 Thread migration should be added here, and triggered when a sequence of
1288 fishes returns without work. */
1289 delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1291 /* =8-[ no local sparks => look for work on other PEs */
1293 * We really have absolutely no work. Send out a fish
1294 * (there may be some out there already), and wait for
1295 * something to arrive. We clearly can't run any threads
1296 * until a SCHEDULE or RESUME arrives, and so that's what
1297 * we're hoping to see. (Of course, we still have to
1298 * respond to other types of messages.)
1300 rtsTime now = msTime() /*CURRENT_TIME*/;
1301 IF_PAR_DEBUG(verbose,
1302 debugBelch("-- now=%ld\n", now));
1303 IF_PAR_DEBUG(fish, // verbose,
1304 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1305 (last_fish_arrived_at!=0 &&
1306 last_fish_arrived_at+delay > now)) {
1307 debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1308 now, last_fish_arrived_at+delay,
1309 last_fish_arrived_at,
1313 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1314 advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1315 if (last_fish_arrived_at==0 ||
1316 (last_fish_arrived_at+delay <= now)) { // send FISH now!
1317 /* outstandingFishes is set in sendFish, processFish;
1318 avoid flooding system with fishes via delay */
1319 next_fish_to_send_at = 0;
1321 /* ToDo: this should be done in the main scheduling loop to avoid the
1322 busy wait here; not so bad if fish delay is very small */
1323 int iq = 0; // DEBUGGING -- HWL
1324 next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
1325 /* send a fish when ready, but process messages that arrive in the meantime */
1327 if (PacketsWaiting()) {
1329 *receivedFinish = processMessages();
1332 } while (!*receivedFinish || now<next_fish_to_send_at);
1333 // JB: This means the fish could become obsolete, if we receive
1334 // work. Better check for work again?
1335 // last line: while (!receivedFinish || !haveWork || now<...)
1336 // next line: if (receivedFinish || haveWork )
1338 if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1339 return rtsFalse; // NB: this will leave scheduler loop
1340 // immediately after return!
1342 IF_PAR_DEBUG(fish, // verbose,
1343 debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1347 // JB: IMHO, this should all be hidden inside sendFish(...)
1349 sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
1352 // Global statistics: count no. of fishes
1353 if (RtsFlags.ParFlags.ParStats.Global &&
1354 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1355 globalParStats.tot_fish_mess++;
1359 /* delayed fishes must have been sent by now! */
1360 next_fish_to_send_at = 0;
1363 *receivedFinish = processMessages();
1364 # endif /* SPARKS */
1367 /* NB: this function always returns rtsFalse, meaning the scheduler
1368 loop continues with the next iteration;
1370 return code means success in finding work; we enter this function
1371 if there is no local work, thus have to send a fish which takes
1372 time until it arrives with work; in the meantime we should process
1373 messages in the main loop;
1376 #endif // PARALLEL_HASKELL
1378 /* ----------------------------------------------------------------------------
1379 * PAR/GRAN: Report stats & debugging info(?)
1380 * ------------------------------------------------------------------------- */
1382 #if defined(PAR) || defined(GRAN)
1384 scheduleGranParReport(void)
1386 ASSERT(run_queue_hd != END_TSO_QUEUE);
1388 /* Take a thread from the run queue, if we have work */
1389 POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
1391 /* If this TSO has got its outport closed in the meantime,
1392 * it mustn't be run. Instead, we have to clean it up as if it was finished.
1393 * It has to be marked as TH_DEAD for this purpose.
1394 * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1396 JB: TODO: investigate wether state change field could be nuked
1397 entirely and replaced by the normal tso state (whatnext
1398 field). All we want to do is to kill tsos from outside.
1401 /* ToDo: write something to the log-file
1402 if (RTSflags.ParFlags.granSimStats && !sameThread)
1403 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1407 /* the spark pool for the current PE */
1408 pool = &(cap.r.rSparks); // cap = (old) MainCap
1411 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1412 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1415 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1416 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1418 if (RtsFlags.ParFlags.ParStats.Full &&
1419 (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1420 (emitSchedule || // forced emit
1421 (t && LastTSO && t->id != LastTSO->id))) {
1423 we are running a different TSO, so write a schedule event to log file
1424 NB: If we use fair scheduling we also have to write a deschedule
1425 event for LastTSO; with unfair scheduling we know that the
1426 previous tso has blocked whenever we switch to another tso, so
1427 we don't need it in GUM for now
1429 IF_PAR_DEBUG(fish, // schedule,
1430 debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1432 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1433 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1434 emitSchedule = rtsFalse;
1439 /* ----------------------------------------------------------------------------
1440 * After running a thread...
1441 * ------------------------------------------------------------------------- */
1444 schedulePostRunThread(void)
1447 /* HACK 675: if the last thread didn't yield, make sure to print a
1448 SCHEDULE event to the log file when StgRunning the next thread, even
1449 if it is the same one as before */
1451 TimeOfLastYield = CURRENT_TIME;
1454 /* some statistics gathering in the parallel case */
1456 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1460 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1461 globalGranStats.tot_heapover++;
1463 globalParStats.tot_heapover++;
1470 DumpGranEvent(GR_DESCHEDULE, t));
1471 globalGranStats.tot_stackover++;
1474 // DumpGranEvent(GR_DESCHEDULE, t);
1475 globalParStats.tot_stackover++;
1479 case ThreadYielding:
1482 DumpGranEvent(GR_DESCHEDULE, t));
1483 globalGranStats.tot_yields++;
1486 // DumpGranEvent(GR_DESCHEDULE, t);
1487 globalParStats.tot_yields++;
1493 debugTrace(DEBUG_sched,
1494 "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1495 t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
1496 (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1497 if (t->block_info.closure!=(StgClosure*)NULL)
1498 print_bq(t->block_info.closure);
1501 // ??? needed; should emit block before
1503 DumpGranEvent(GR_DESCHEDULE, t));
1504 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1507 ASSERT(procStatus[CurrentProc]==Busy ||
1508 ((procStatus[CurrentProc]==Fetching) &&
1509 (t->block_info.closure!=(StgClosure*)NULL)));
1510 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1511 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1512 procStatus[CurrentProc]==Fetching))
1513 procStatus[CurrentProc] = Idle;
1516 //++PAR++ blockThread() writes the event (change?)
1520 case ThreadFinished:
1524 barf("parGlobalStats: unknown return code");
1530 /* -----------------------------------------------------------------------------
1531 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1532 * -------------------------------------------------------------------------- */
1535 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1537 // did the task ask for a large block?
1538 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1539 // if so, get one and push it on the front of the nursery.
1543 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1545 debugTrace(DEBUG_sched,
1546 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1547 (long)t->id, whatNext_strs[t->what_next], blocks);
1549 // don't do this if the nursery is (nearly) full, we'll GC first.
1550 if (cap->r.rCurrentNursery->link != NULL ||
1551 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1552 // if the nursery has only one block.
1555 bd = allocGroup( blocks );
1557 cap->r.rNursery->n_blocks += blocks;
1559 // link the new group into the list
1560 bd->link = cap->r.rCurrentNursery;
1561 bd->u.back = cap->r.rCurrentNursery->u.back;
1562 if (cap->r.rCurrentNursery->u.back != NULL) {
1563 cap->r.rCurrentNursery->u.back->link = bd;
1565 #if !defined(THREADED_RTS)
1566 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1567 g0s0 == cap->r.rNursery);
1569 cap->r.rNursery->blocks = bd;
1571 cap->r.rCurrentNursery->u.back = bd;
1573 // initialise it as a nursery block. We initialise the
1574 // step, gen_no, and flags field of *every* sub-block in
1575 // this large block, because this is easier than making
1576 // sure that we always find the block head of a large
1577 // block whenever we call Bdescr() (eg. evacuate() and
1578 // isAlive() in the GC would both have to do this, at
1582 for (x = bd; x < bd + blocks; x++) {
1583 x->step = cap->r.rNursery;
1589 // This assert can be a killer if the app is doing lots
1590 // of large block allocations.
1591 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1593 // now update the nursery to point to the new block
1594 cap->r.rCurrentNursery = bd;
1596 // we might be unlucky and have another thread get on the
1597 // run queue before us and steal the large block, but in that
1598 // case the thread will just end up requesting another large
1600 pushOnRunQueue(cap,t);
1601 return rtsFalse; /* not actually GC'ing */
1605 debugTrace(DEBUG_sched,
1606 "--<< thread %ld (%s) stopped: HeapOverflow\n",
1607 (long)t->id, whatNext_strs[t->what_next]);
1610 ASSERT(!is_on_queue(t,CurrentProc));
1611 #elif defined(PARALLEL_HASKELL)
1612 /* Currently we emit a DESCHEDULE event before GC in GUM.
1613 ToDo: either add separate event to distinguish SYSTEM time from rest
1614 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1615 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1616 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1617 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1618 emitSchedule = rtsTrue;
1622 pushOnRunQueue(cap,t);
1624 /* actual GC is done at the end of the while loop in schedule() */
1627 /* -----------------------------------------------------------------------------
1628 * Handle a thread that returned to the scheduler with ThreadStackOverflow
1629 * -------------------------------------------------------------------------- */
1632 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1634 debugTrace (DEBUG_sched,
1635 "--<< thread %ld (%s) stopped, StackOverflow",
1636 (long)t->id, whatNext_strs[t->what_next]);
1638 /* just adjust the stack for this thread, then pop it back
1642 /* enlarge the stack */
1643 StgTSO *new_t = threadStackOverflow(cap, t);
1645 /* The TSO attached to this Task may have moved, so update the
1648 if (task->tso == t) {
1651 pushOnRunQueue(cap,new_t);
1655 /* -----------------------------------------------------------------------------
1656 * Handle a thread that returned to the scheduler with ThreadYielding
1657 * -------------------------------------------------------------------------- */
1660 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1662 // Reset the context switch flag. We don't do this just before
1663 // running the thread, because that would mean we would lose ticks
1664 // during GC, which can lead to unfair scheduling (a thread hogs
1665 // the CPU because the tick always arrives during GC). This way
1666 // penalises threads that do a lot of allocation, but that seems
1667 // better than the alternative.
1670 /* put the thread back on the run queue. Then, if we're ready to
1671 * GC, check whether this is the last task to stop. If so, wake
1672 * up the GC thread. getThread will block during a GC until the
1676 if (t->what_next != prev_what_next) {
1677 debugTrace(DEBUG_sched,
1678 "--<< thread %ld (%s) stopped to switch evaluators",
1679 (long)t->id, whatNext_strs[t->what_next]);
1681 debugTrace(DEBUG_sched,
1682 "--<< thread %ld (%s) stopped, yielding",
1683 (long)t->id, whatNext_strs[t->what_next]);
1688 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1690 ASSERT(t->link == END_TSO_QUEUE);
1692 // Shortcut if we're just switching evaluators: don't bother
1693 // doing stack squeezing (which can be expensive), just run the
1695 if (t->what_next != prev_what_next) {
1700 ASSERT(!is_on_queue(t,CurrentProc));
1703 //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1704 checkThreadQsSanity(rtsTrue));
1708 addToRunQueue(cap,t);
1711 /* add a ContinueThread event to actually process the thread */
1712 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1714 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1716 debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1723 /* -----------------------------------------------------------------------------
1724 * Handle a thread that returned to the scheduler with ThreadBlocked
1725 * -------------------------------------------------------------------------- */
1728 scheduleHandleThreadBlocked( StgTSO *t
1729 #if !defined(GRAN) && !defined(DEBUG)
1736 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
1737 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1738 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1740 // ??? needed; should emit block before
1742 DumpGranEvent(GR_DESCHEDULE, t));
1743 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1746 ASSERT(procStatus[CurrentProc]==Busy ||
1747 ((procStatus[CurrentProc]==Fetching) &&
1748 (t->block_info.closure!=(StgClosure*)NULL)));
1749 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1750 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1751 procStatus[CurrentProc]==Fetching))
1752 procStatus[CurrentProc] = Idle;
1756 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
1757 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1760 if (t->block_info.closure!=(StgClosure*)NULL)
1761 print_bq(t->block_info.closure));
1763 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1766 /* whatever we schedule next, we must log that schedule */
1767 emitSchedule = rtsTrue;
1771 // We don't need to do anything. The thread is blocked, and it
1772 // has tidied up its stack and placed itself on whatever queue
1773 // it needs to be on.
1775 // ASSERT(t->why_blocked != NotBlocked);
1776 // Not true: for example,
1777 // - in THREADED_RTS, the thread may already have been woken
1778 // up by another Capability. This actually happens: try
1779 // conc023 +RTS -N2.
1780 // - the thread may have woken itself up already, because
1781 // threadPaused() might have raised a blocked throwTo
1782 // exception, see maybePerformBlockedException().
1785 if (traceClass(DEBUG_sched)) {
1786 debugTraceBegin("--<< thread %lu (%s) stopped: ",
1787 (unsigned long)t->id, whatNext_strs[t->what_next]);
1788 printThreadBlockage(t);
1793 /* Only for dumping event to log file
1794 ToDo: do I need this in GranSim, too?
1800 /* -----------------------------------------------------------------------------
1801 * Handle a thread that returned to the scheduler with ThreadFinished
1802 * -------------------------------------------------------------------------- */
1805 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1807 /* Need to check whether this was a main thread, and if so,
1808 * return with the return value.
1810 * We also end up here if the thread kills itself with an
1811 * uncaught exception, see Exception.cmm.
1813 debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
1814 (unsigned long)t->id, whatNext_strs[t->what_next]);
1816 /* Inform the Hpc that a thread has finished */
1817 hs_hpc_thread_finished_event(t);
1820 endThread(t, CurrentProc); // clean-up the thread
1821 #elif defined(PARALLEL_HASKELL)
1822 /* For now all are advisory -- HWL */
1823 //if(t->priority==AdvisoryPriority) ??
1824 advisory_thread_count--; // JB: Caution with this counter, buggy!
1827 if(t->dist.priority==RevalPriority)
1831 # if defined(EDENOLD)
1832 // the thread could still have an outport... (BUG)
1833 if (t->eden.outport != -1) {
1834 // delete the outport for the tso which has finished...
1835 IF_PAR_DEBUG(eden_ports,
1836 debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1837 t->eden.outport, t->id));
1840 // thread still in the process (HEAVY BUG! since outport has just been closed...)
1841 if (t->eden.epid != -1) {
1842 IF_PAR_DEBUG(eden_ports,
1843 debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1844 t->id, t->eden.epid));
1845 removeTSOfromProcess(t);
1850 if (RtsFlags.ParFlags.ParStats.Full &&
1851 !RtsFlags.ParFlags.ParStats.Suppressed)
1852 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1854 // t->par only contains statistics: left out for now...
1856 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1857 t->id,t,t->par.sparkname));
1859 #endif // PARALLEL_HASKELL
1862 // Check whether the thread that just completed was a bound
1863 // thread, and if so return with the result.
1865 // There is an assumption here that all thread completion goes
1866 // through this point; we need to make sure that if a thread
1867 // ends up in the ThreadKilled state, that it stays on the run
1868 // queue so it can be dealt with here.
1873 if (t->bound != task) {
1874 #if !defined(THREADED_RTS)
1875 // Must be a bound thread that is not the topmost one. Leave
1876 // it on the run queue until the stack has unwound to the
1877 // point where we can deal with this. Leaving it on the run
1878 // queue also ensures that the garbage collector knows about
1879 // this thread and its return value (it gets dropped from the
1880 // all_threads list so there's no other way to find it).
1881 appendToRunQueue(cap,t);
1884 // this cannot happen in the threaded RTS, because a
1885 // bound thread can only be run by the appropriate Task.
1886 barf("finished bound thread that isn't mine");
1890 ASSERT(task->tso == t);
1892 if (t->what_next == ThreadComplete) {
1894 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1895 *(task->ret) = (StgClosure *)task->tso->sp[1];
1897 task->stat = Success;
1900 *(task->ret) = NULL;
1902 if (sched_state >= SCHED_INTERRUPTING) {
1903 task->stat = Interrupted;
1905 task->stat = Killed;
1909 removeThreadLabel((StgWord)task->tso->id);
1911 return rtsTrue; // tells schedule() to return
1917 /* -----------------------------------------------------------------------------
1918 * Perform a heap census
1919 * -------------------------------------------------------------------------- */
1922 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1924 // When we have +RTS -i0 and we're heap profiling, do a census at
1925 // every GC. This lets us get repeatable runs for debugging.
1926 if (performHeapProfile ||
1927 (RtsFlags.ProfFlags.profileInterval==0 &&
1928 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1935 /* -----------------------------------------------------------------------------
1936 * Perform a garbage collection if necessary
1937 * -------------------------------------------------------------------------- */
1940 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1943 rtsBool heap_census;
1945 static volatile StgWord waiting_for_gc;
1946 rtsBool was_waiting;
1951 // In order to GC, there must be no threads running Haskell code.
1952 // Therefore, the GC thread needs to hold *all* the capabilities,
1953 // and release them after the GC has completed.
1955 // This seems to be the simplest way: previous attempts involved
1956 // making all the threads with capabilities give up their
1957 // capabilities and sleep except for the *last* one, which
1958 // actually did the GC. But it's quite hard to arrange for all
1959 // the other tasks to sleep and stay asleep.
1962 was_waiting = cas(&waiting_for_gc, 0, 1);
1965 debugTrace(DEBUG_sched, "someone else is trying to GC...");
1966 if (cap) yieldCapability(&cap,task);
1967 } while (waiting_for_gc);
1968 return cap; // NOTE: task->cap might have changed here
1971 for (i=0; i < n_capabilities; i++) {
1972 debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1973 if (cap != &capabilities[i]) {
1974 Capability *pcap = &capabilities[i];
1975 // we better hope this task doesn't get migrated to
1976 // another Capability while we're waiting for this one.
1977 // It won't, because load balancing happens while we have
1978 // all the Capabilities, but even so it's a slightly
1979 // unsavoury invariant.
1982 waitForReturnCapability(&pcap, task);
1983 if (pcap != &capabilities[i]) {
1984 barf("scheduleDoGC: got the wrong capability");
1989 waiting_for_gc = rtsFalse;
1992 /* Kick any transactions which are invalid back to their
1993 * atomically frames. When next scheduled they will try to
1994 * commit, this commit will fail and they will retry.
1999 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2000 if (t->what_next == ThreadRelocated) {
2003 next = t->global_link;
2005 // This is a good place to check for blocked
2006 // exceptions. It might be the case that a thread is
2007 // blocked on delivering an exception to a thread that
2008 // is also blocked - we try to ensure that this
2009 // doesn't happen in throwTo(), but it's too hard (or
2010 // impossible) to close all the race holes, so we
2011 // accept that some might get through and deal with
2012 // them here. A GC will always happen at some point,
2013 // even if the system is otherwise deadlocked.
2014 maybePerformBlockedException (&capabilities[0], t);
2016 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2017 if (!stmValidateNestOfTransactions (t -> trec)) {
2018 debugTrace(DEBUG_sched | DEBUG_stm,
2019 "trec %p found wasting its time", t);
2021 // strip the stack back to the
2022 // ATOMICALLY_FRAME, aborting the (nested)
2023 // transaction, and saving the stack of any
2024 // partially-evaluated thunks on the heap.
2025 throwToSingleThreaded_(&capabilities[0], t,
2026 NULL, rtsTrue, NULL);
2029 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2037 // so this happens periodically:
2038 if (cap) scheduleCheckBlackHoles(cap);
2040 IF_DEBUG(scheduler, printAllThreads());
2043 * We now have all the capabilities; if we're in an interrupting
2044 * state, then we should take the opportunity to delete all the
2045 * threads in the system.
2047 if (sched_state >= SCHED_INTERRUPTING) {
2048 deleteAllThreads(&capabilities[0]);
2049 sched_state = SCHED_SHUTTING_DOWN;
2052 heap_census = scheduleNeedHeapProfile(rtsTrue);
2054 /* everybody back, start the GC.
2055 * Could do it in this thread, or signal a condition var
2056 * to do it in another thread. Either way, we need to
2057 * broadcast on gc_pending_cond afterward.
2059 #if defined(THREADED_RTS)
2060 debugTrace(DEBUG_sched, "doing GC");
2062 GarbageCollect(force_major || heap_census);
2065 debugTrace(DEBUG_sched, "performing heap census");
2067 performHeapProfile = rtsFalse;
2070 #if defined(THREADED_RTS)
2071 // release our stash of capabilities.
2072 for (i = 0; i < n_capabilities; i++) {
2073 if (cap != &capabilities[i]) {
2074 task->cap = &capabilities[i];
2075 releaseCapability(&capabilities[i]);
2086 /* add a ContinueThread event to continue execution of current thread */
2087 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2089 t, (StgClosure*)NULL, (rtsSpark*)NULL);
2091 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2099 /* ---------------------------------------------------------------------------
2100 * Singleton fork(). Do not copy any running threads.
2101 * ------------------------------------------------------------------------- */
2104 forkProcess(HsStablePtr *entry
2105 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2110 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2116 #if defined(THREADED_RTS)
2117 if (RtsFlags.ParFlags.nNodes > 1) {
2118 errorBelch("forking not supported with +RTS -N<n> greater than 1");
2119 stg_exit(EXIT_FAILURE);
2123 debugTrace(DEBUG_sched, "forking!");
2125 // ToDo: for SMP, we should probably acquire *all* the capabilities
2130 if (pid) { // parent
2132 // just return the pid
2138 // Now, all OS threads except the thread that forked are
2139 // stopped. We need to stop all Haskell threads, including
2140 // those involved in foreign calls. Also we need to delete
2141 // all Tasks, because they correspond to OS threads that are
2144 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2145 if (t->what_next == ThreadRelocated) {
2148 next = t->global_link;
2149 // don't allow threads to catch the ThreadKilled
2150 // exception, but we do want to raiseAsync() because these
2151 // threads may be evaluating thunks that we need later.
2152 deleteThread_(cap,t);
2156 // Empty the run queue. It seems tempting to let all the
2157 // killed threads stay on the run queue as zombies to be
2158 // cleaned up later, but some of them correspond to bound
2159 // threads for which the corresponding Task does not exist.
2160 cap->run_queue_hd = END_TSO_QUEUE;
2161 cap->run_queue_tl = END_TSO_QUEUE;
2163 // Any suspended C-calling Tasks are no more, their OS threads
2165 cap->suspended_ccalling_tasks = NULL;
2167 // Empty the all_threads list. Otherwise, the garbage
2168 // collector may attempt to resurrect some of these threads.
2169 all_threads = END_TSO_QUEUE;
2171 // Wipe the task list, except the current Task.
2172 ACQUIRE_LOCK(&sched_mutex);
2173 for (task = all_tasks; task != NULL; task=task->all_link) {
2174 if (task != cap->running_task) {
2178 RELEASE_LOCK(&sched_mutex);
2180 #if defined(THREADED_RTS)
2181 // Wipe our spare workers list, they no longer exist. New
2182 // workers will be created if necessary.
2183 cap->spare_workers = NULL;
2184 cap->returning_tasks_hd = NULL;
2185 cap->returning_tasks_tl = NULL;
2188 // On Unix, all timers are reset in the child, so we need to start
2192 cap = rts_evalStableIO(cap, entry, NULL); // run the action
2193 rts_checkSchedStatus("forkProcess",cap);
2196 hs_exit(); // clean up and exit
2197 stg_exit(EXIT_SUCCESS);
2199 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2200 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2205 /* ---------------------------------------------------------------------------
2206 * Delete all the threads in the system
2207 * ------------------------------------------------------------------------- */
2210 deleteAllThreads ( Capability *cap )
2212 // NOTE: only safe to call if we own all capabilities.
2215 debugTrace(DEBUG_sched,"deleting all threads");
2216 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2217 if (t->what_next == ThreadRelocated) {
2220 next = t->global_link;
2221 deleteThread(cap,t);
2225 // The run queue now contains a bunch of ThreadKilled threads. We
2226 // must not throw these away: the main thread(s) will be in there
2227 // somewhere, and the main scheduler loop has to deal with it.
2228 // Also, the run queue is the only thing keeping these threads from
2229 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2231 #if !defined(THREADED_RTS)
2232 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2233 ASSERT(sleeping_queue == END_TSO_QUEUE);
2237 /* -----------------------------------------------------------------------------
2238 Managing the suspended_ccalling_tasks list.
2239 Locks required: sched_mutex
2240 -------------------------------------------------------------------------- */
2243 suspendTask (Capability *cap, Task *task)
2245 ASSERT(task->next == NULL && task->prev == NULL);
2246 task->next = cap->suspended_ccalling_tasks;
2248 if (cap->suspended_ccalling_tasks) {
2249 cap->suspended_ccalling_tasks->prev = task;
2251 cap->suspended_ccalling_tasks = task;
2255 recoverSuspendedTask (Capability *cap, Task *task)
2258 task->prev->next = task->next;
2260 ASSERT(cap->suspended_ccalling_tasks == task);
2261 cap->suspended_ccalling_tasks = task->next;
2264 task->next->prev = task->prev;
2266 task->next = task->prev = NULL;
2269 /* ---------------------------------------------------------------------------
2270 * Suspending & resuming Haskell threads.
2272 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2273 * its capability before calling the C function. This allows another
2274 * task to pick up the capability and carry on running Haskell
2275 * threads. It also means that if the C call blocks, it won't lock
2278 * The Haskell thread making the C call is put to sleep for the
2279 * duration of the call, on the susepended_ccalling_threads queue. We
2280 * give out a token to the task, which it can use to resume the thread
2281 * on return from the C function.
2282 * ------------------------------------------------------------------------- */
2285 suspendThread (StgRegTable *reg)
2292 StgWord32 saved_winerror;
2295 saved_errno = errno;
2297 saved_winerror = GetLastError();
2300 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2302 cap = regTableToCapability(reg);
2304 task = cap->running_task;
2305 tso = cap->r.rCurrentTSO;
2307 debugTrace(DEBUG_sched,
2308 "thread %lu did a safe foreign call",
2309 (unsigned long)cap->r.rCurrentTSO->id);
2311 // XXX this might not be necessary --SDM
2312 tso->what_next = ThreadRunGHC;
2314 threadPaused(cap,tso);
2316 if ((tso->flags & TSO_BLOCKEX) == 0) {
2317 tso->why_blocked = BlockedOnCCall;
2318 tso->flags |= TSO_BLOCKEX;
2319 tso->flags &= ~TSO_INTERRUPTIBLE;
2321 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2324 // Hand back capability
2325 task->suspended_tso = tso;
2327 ACQUIRE_LOCK(&cap->lock);
2329 suspendTask(cap,task);
2330 cap->in_haskell = rtsFalse;
2331 releaseCapability_(cap);
2333 RELEASE_LOCK(&cap->lock);
2335 #if defined(THREADED_RTS)
2336 /* Preparing to leave the RTS, so ensure there's a native thread/task
2337 waiting to take over.
2339 debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2342 errno = saved_errno;
2344 SetLastError(saved_winerror);
2350 resumeThread (void *task_)
2357 StgWord32 saved_winerror;
2360 saved_errno = errno;
2362 saved_winerror = GetLastError();
2366 // Wait for permission to re-enter the RTS with the result.
2367 waitForReturnCapability(&cap,task);
2368 // we might be on a different capability now... but if so, our
2369 // entry on the suspended_ccalling_tasks list will also have been
2372 // Remove the thread from the suspended list
2373 recoverSuspendedTask(cap,task);
2375 tso = task->suspended_tso;
2376 task->suspended_tso = NULL;
2377 tso->link = END_TSO_QUEUE;
2378 debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2380 if (tso->why_blocked == BlockedOnCCall) {
2381 awakenBlockedExceptionQueue(cap,tso);
2382 tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2385 /* Reset blocking status */
2386 tso->why_blocked = NotBlocked;
2388 cap->r.rCurrentTSO = tso;
2389 cap->in_haskell = rtsTrue;
2390 errno = saved_errno;
2392 SetLastError(saved_winerror);
2395 /* We might have GC'd, mark the TSO dirty again */
2398 IF_DEBUG(sanity, checkTSO(tso));
2403 /* ---------------------------------------------------------------------------
2406 * scheduleThread puts a thread on the end of the runnable queue.
2407 * This will usually be done immediately after a thread is created.
2408 * The caller of scheduleThread must create the thread using e.g.
2409 * createThread and push an appropriate closure
2410 * on this thread's stack before the scheduler is invoked.
2411 * ------------------------------------------------------------------------ */
2414 scheduleThread(Capability *cap, StgTSO *tso)
2416 // The thread goes at the *end* of the run-queue, to avoid possible
2417 // starvation of any threads already on the queue.
2418 appendToRunQueue(cap,tso);
2422 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2424 #if defined(THREADED_RTS)
2425 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2426 // move this thread from now on.
2427 cpu %= RtsFlags.ParFlags.nNodes;
2428 if (cpu == cap->no) {
2429 appendToRunQueue(cap,tso);
2431 migrateThreadToCapability_lock(&capabilities[cpu],tso);
2434 appendToRunQueue(cap,tso);
2439 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2443 // We already created/initialised the Task
2444 task = cap->running_task;
2446 // This TSO is now a bound thread; make the Task and TSO
2447 // point to each other.
2453 task->stat = NoStatus;
2455 appendToRunQueue(cap,tso);
2457 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2460 /* GranSim specific init */
2461 CurrentTSO = m->tso; // the TSO to run
2462 procStatus[MainProc] = Busy; // status of main PE
2463 CurrentProc = MainProc; // PE to run it on
2466 cap = schedule(cap,task);
2468 ASSERT(task->stat != NoStatus);
2469 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2471 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2475 /* ----------------------------------------------------------------------------
2477 * ------------------------------------------------------------------------- */
2479 #if defined(THREADED_RTS)
2481 workerStart(Task *task)
2485 // See startWorkerTask().
2486 ACQUIRE_LOCK(&task->lock);
2488 RELEASE_LOCK(&task->lock);
2490 // set the thread-local pointer to the Task:
2493 // schedule() runs without a lock.
2494 cap = schedule(cap,task);
2496 // On exit from schedule(), we have a Capability.
2497 releaseCapability(cap);
2498 workerTaskStop(task);
2502 /* ---------------------------------------------------------------------------
2505 * Initialise the scheduler. This resets all the queues - if the
2506 * queues contained any threads, they'll be garbage collected at the
2509 * ------------------------------------------------------------------------ */
2516 for (i=0; i<=MAX_PROC; i++) {
2517 run_queue_hds[i] = END_TSO_QUEUE;
2518 run_queue_tls[i] = END_TSO_QUEUE;
2519 blocked_queue_hds[i] = END_TSO_QUEUE;
2520 blocked_queue_tls[i] = END_TSO_QUEUE;
2521 ccalling_threadss[i] = END_TSO_QUEUE;
2522 blackhole_queue[i] = END_TSO_QUEUE;
2523 sleeping_queue = END_TSO_QUEUE;
2525 #elif !defined(THREADED_RTS)
2526 blocked_queue_hd = END_TSO_QUEUE;
2527 blocked_queue_tl = END_TSO_QUEUE;
2528 sleeping_queue = END_TSO_QUEUE;
2531 blackhole_queue = END_TSO_QUEUE;
2532 all_threads = END_TSO_QUEUE;
2535 sched_state = SCHED_RUNNING;
2537 #if defined(THREADED_RTS)
2538 /* Initialise the mutex and condition variables used by
2540 initMutex(&sched_mutex);
2543 ACQUIRE_LOCK(&sched_mutex);
2545 /* A capability holds the state a native thread needs in
2546 * order to execute STG code. At least one capability is
2547 * floating around (only THREADED_RTS builds have more than one).
2553 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2557 #if defined(THREADED_RTS)
2559 * Eagerly start one worker to run each Capability, except for
2560 * Capability 0. The idea is that we're probably going to start a
2561 * bound thread on Capability 0 pretty soon, so we don't want a
2562 * worker task hogging it.
2567 for (i = 1; i < n_capabilities; i++) {
2568 cap = &capabilities[i];
2569 ACQUIRE_LOCK(&cap->lock);
2570 startWorkerTask(cap, workerStart);
2571 RELEASE_LOCK(&cap->lock);
2576 trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2578 RELEASE_LOCK(&sched_mutex);
2582 exitScheduler( void )
2586 #if defined(THREADED_RTS)
2587 ACQUIRE_LOCK(&sched_mutex);
2588 task = newBoundTask();
2589 RELEASE_LOCK(&sched_mutex);
2592 // If we haven't killed all the threads yet, do it now.
2593 if (sched_state < SCHED_SHUTTING_DOWN) {
2594 sched_state = SCHED_INTERRUPTING;
2595 scheduleDoGC(NULL,task,rtsFalse);
2597 sched_state = SCHED_SHUTTING_DOWN;
2599 #if defined(THREADED_RTS)
2603 for (i = 0; i < n_capabilities; i++) {
2604 shutdownCapability(&capabilities[i], task);
2606 boundTaskExiting(task);
2610 freeCapability(&MainCapability);
2615 freeScheduler( void )
2618 if (n_capabilities != 1) {
2619 stgFree(capabilities);
2621 #if defined(THREADED_RTS)
2622 closeMutex(&sched_mutex);
2626 /* ---------------------------------------------------------------------------
2627 Where are the roots that we know about?
2629 - all the threads on the runnable queue
2630 - all the threads on the blocked queue
2631 - all the threads on the sleeping queue
2632 - all the thread currently executing a _ccall_GC
2633 - all the "main threads"
2635 ------------------------------------------------------------------------ */
2637 /* This has to be protected either by the scheduler monitor, or by the
2638 garbage collection monitor (probably the latter).
2643 GetRoots( evac_fn evac )
2650 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2651 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2652 evac((StgClosure **)&run_queue_hds[i]);
2653 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2654 evac((StgClosure **)&run_queue_tls[i]);
2656 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2657 evac((StgClosure **)&blocked_queue_hds[i]);
2658 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2659 evac((StgClosure **)&blocked_queue_tls[i]);
2660 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2661 evac((StgClosure **)&ccalling_threads[i]);
2668 for (i = 0; i < n_capabilities; i++) {
2669 cap = &capabilities[i];
2670 evac((StgClosure **)(void *)&cap->run_queue_hd);
2671 evac((StgClosure **)(void *)&cap->run_queue_tl);
2672 #if defined(THREADED_RTS)
2673 evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2674 evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2676 for (task = cap->suspended_ccalling_tasks; task != NULL;
2678 debugTrace(DEBUG_sched,
2679 "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
2680 evac((StgClosure **)(void *)&task->suspended_tso);
2686 #if !defined(THREADED_RTS)
2687 evac((StgClosure **)(void *)&blocked_queue_hd);
2688 evac((StgClosure **)(void *)&blocked_queue_tl);
2689 evac((StgClosure **)(void *)&sleeping_queue);
2693 // evac((StgClosure **)&blackhole_queue);
2695 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2696 markSparkQueue(evac);
2699 #if defined(RTS_USER_SIGNALS)
2700 // mark the signal handlers (signals should be already blocked)
2701 if (RtsFlags.MiscFlags.install_signal_handlers) {
2702 markSignalHandlers(evac);
2707 /* -----------------------------------------------------------------------------
2710 This is the interface to the garbage collector from Haskell land.
2711 We provide this so that external C code can allocate and garbage
2712 collect when called from Haskell via _ccall_GC.
2713 -------------------------------------------------------------------------- */
2716 performGC_(rtsBool force_major)
2719 // We must grab a new Task here, because the existing Task may be
2720 // associated with a particular Capability, and chained onto the
2721 // suspended_ccalling_tasks queue.
2722 ACQUIRE_LOCK(&sched_mutex);
2723 task = newBoundTask();
2724 RELEASE_LOCK(&sched_mutex);
2725 scheduleDoGC(NULL,task,force_major);
2726 boundTaskExiting(task);
2732 performGC_(rtsFalse);
2736 performMajorGC(void)
2738 performGC_(rtsTrue);
2741 /* -----------------------------------------------------------------------------
2744 If the thread has reached its maximum stack size, then raise the
2745 StackOverflow exception in the offending thread. Otherwise
2746 relocate the TSO into a larger chunk of memory and adjust its stack
2748 -------------------------------------------------------------------------- */
2751 threadStackOverflow(Capability *cap, StgTSO *tso)
2753 nat new_stack_size, stack_words;
2758 IF_DEBUG(sanity,checkTSO(tso));
2760 // don't allow throwTo() to modify the blocked_exceptions queue
2761 // while we are moving the TSO:
2762 lockClosure((StgClosure *)tso);
2764 if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2765 // NB. never raise a StackOverflow exception if the thread is
2766 // inside Control.Exceptino.block. It is impractical to protect
2767 // against stack overflow exceptions, since virtually anything
2768 // can raise one (even 'catch'), so this is the only sensible
2769 // thing to do here. See bug #767.
2771 debugTrace(DEBUG_gc,
2772 "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2773 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2775 /* If we're debugging, just print out the top of the stack */
2776 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2779 // Send this thread the StackOverflow exception
2781 throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2785 /* Try to double the current stack size. If that takes us over the
2786 * maximum stack size for this thread, then use the maximum instead.
2787 * Finally round up so the TSO ends up as a whole number of blocks.
2789 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2790 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2791 TSO_STRUCT_SIZE)/sizeof(W_);
2792 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2793 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2795 debugTrace(DEBUG_sched,
2796 "increasing stack size from %ld words to %d.",
2797 (long)tso->stack_size, new_stack_size);
2799 dest = (StgTSO *)allocate(new_tso_size);
2800 TICK_ALLOC_TSO(new_stack_size,0);
2802 /* copy the TSO block and the old stack into the new area */
2803 memcpy(dest,tso,TSO_STRUCT_SIZE);
2804 stack_words = tso->stack + tso->stack_size - tso->sp;
2805 new_sp = (P_)dest + new_tso_size - stack_words;
2806 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2808 /* relocate the stack pointers... */
2810 dest->stack_size = new_stack_size;
2812 /* Mark the old TSO as relocated. We have to check for relocated
2813 * TSOs in the garbage collector and any primops that deal with TSOs.
2815 * It's important to set the sp value to just beyond the end
2816 * of the stack, so we don't attempt to scavenge any part of the
2819 tso->what_next = ThreadRelocated;
2821 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2822 tso->why_blocked = NotBlocked;
2824 IF_PAR_DEBUG(verbose,
2825 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2826 tso->id, tso, tso->stack_size);
2827 /* If we're debugging, just print out the top of the stack */
2828 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2834 IF_DEBUG(sanity,checkTSO(dest));
2836 IF_DEBUG(scheduler,printTSO(dest));
2842 /* ---------------------------------------------------------------------------
2844 - usually called inside a signal handler so it mustn't do anything fancy.
2845 ------------------------------------------------------------------------ */
2848 interruptStgRts(void)
2850 sched_state = SCHED_INTERRUPTING;
2855 /* -----------------------------------------------------------------------------
2858 This function causes at least one OS thread to wake up and run the
2859 scheduler loop. It is invoked when the RTS might be deadlocked, or
2860 an external event has arrived that may need servicing (eg. a
2861 keyboard interrupt).
2863 In the single-threaded RTS we don't do anything here; we only have
2864 one thread anyway, and the event that caused us to want to wake up
2865 will have interrupted any blocking system call in progress anyway.
2866 -------------------------------------------------------------------------- */
2871 #if defined(THREADED_RTS)
2872 // This forces the IO Manager thread to wakeup, which will
2873 // in turn ensure that some OS thread wakes up and runs the
2874 // scheduler loop, which will cause a GC and deadlock check.
2879 /* -----------------------------------------------------------------------------
2882 * Check the blackhole_queue for threads that can be woken up. We do
2883 * this periodically: before every GC, and whenever the run queue is
2886 * An elegant solution might be to just wake up all the blocked
2887 * threads with awakenBlockedQueue occasionally: they'll go back to
2888 * sleep again if the object is still a BLACKHOLE. Unfortunately this
2889 * doesn't give us a way to tell whether we've actually managed to
2890 * wake up any threads, so we would be busy-waiting.
2892 * -------------------------------------------------------------------------- */
2895 checkBlackHoles (Capability *cap)
2898 rtsBool any_woke_up = rtsFalse;
2901 // blackhole_queue is global:
2902 ASSERT_LOCK_HELD(&sched_mutex);
2904 debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2906 // ASSUMES: sched_mutex
2907 prev = &blackhole_queue;
2908 t = blackhole_queue;
2909 while (t != END_TSO_QUEUE) {
2910 ASSERT(t->why_blocked == BlockedOnBlackHole);
2911 type = get_itbl(t->block_info.closure)->type;
2912 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2913 IF_DEBUG(sanity,checkTSO(t));
2914 t = unblockOne(cap, t);
2915 // urk, the threads migrate to the current capability
2916 // here, but we'd like to keep them on the original one.
2918 any_woke_up = rtsTrue;
2928 /* -----------------------------------------------------------------------------
2931 This is used for interruption (^C) and forking, and corresponds to
2932 raising an exception but without letting the thread catch the
2934 -------------------------------------------------------------------------- */
2937 deleteThread (Capability *cap, StgTSO *tso)
2939 // NOTE: must only be called on a TSO that we have exclusive
2940 // access to, because we will call throwToSingleThreaded() below.
2941 // The TSO must be on the run queue of the Capability we own, or
2942 // we must own all Capabilities.
2944 if (tso->why_blocked != BlockedOnCCall &&
2945 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2946 throwToSingleThreaded(cap,tso,NULL);
2950 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2952 deleteThread_(Capability *cap, StgTSO *tso)
2953 { // for forkProcess only:
2954 // like deleteThread(), but we delete threads in foreign calls, too.
2956 if (tso->why_blocked == BlockedOnCCall ||
2957 tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2958 unblockOne(cap,tso);
2959 tso->what_next = ThreadKilled;
2961 deleteThread(cap,tso);
2966 /* -----------------------------------------------------------------------------
2967 raiseExceptionHelper
2969 This function is called by the raise# primitve, just so that we can
2970 move some of the tricky bits of raising an exception from C-- into
2971 C. Who knows, it might be a useful re-useable thing here too.
2972 -------------------------------------------------------------------------- */
2975 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2977 Capability *cap = regTableToCapability(reg);
2978 StgThunk *raise_closure = NULL;
2980 StgRetInfoTable *info;
2982 // This closure represents the expression 'raise# E' where E
2983 // is the exception raise. It is used to overwrite all the
2984 // thunks which are currently under evaluataion.
2987 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2988 // LDV profiling: stg_raise_info has THUNK as its closure
2989 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2990 // payload, MIN_UPD_SIZE is more approprate than 1. It seems that
2991 // 1 does not cause any problem unless profiling is performed.
2992 // However, when LDV profiling goes on, we need to linearly scan
2993 // small object pool, where raise_closure is stored, so we should
2994 // use MIN_UPD_SIZE.
2996 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2997 // sizeofW(StgClosure)+1);
3001 // Walk up the stack, looking for the catch frame. On the way,
3002 // we update any closures pointed to from update frames with the
3003 // raise closure that we just built.
3007 info = get_ret_itbl((StgClosure *)p);
3008 next = p + stack_frame_sizeW((StgClosure *)p);
3009 switch (info->i.type) {
3012 // Only create raise_closure if we need to.
3013 if (raise_closure == NULL) {
3015 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3016 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3017 raise_closure->payload[0] = exception;
3019 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3023 case ATOMICALLY_FRAME:
3024 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
3026 return ATOMICALLY_FRAME;
3032 case CATCH_STM_FRAME:
3033 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3035 return CATCH_STM_FRAME;
3041 case CATCH_RETRY_FRAME:
3050 /* -----------------------------------------------------------------------------
3051 findRetryFrameHelper
3053 This function is called by the retry# primitive. It traverses the stack
3054 leaving tso->sp referring to the frame which should handle the retry.
3056 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3057 or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).
3059 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3060 create) because retries are not considered to be exceptions, despite the
3061 similar implementation.
3063 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3064 not be created within memory transactions.
3065 -------------------------------------------------------------------------- */
3068 findRetryFrameHelper (StgTSO *tso)
3071 StgRetInfoTable *info;
3075 info = get_ret_itbl((StgClosure *)p);
3076 next = p + stack_frame_sizeW((StgClosure *)p);
3077 switch (info->i.type) {
3079 case ATOMICALLY_FRAME:
3080 debugTrace(DEBUG_stm,
3081 "found ATOMICALLY_FRAME at %p during retry", p);
3083 return ATOMICALLY_FRAME;
3085 case CATCH_RETRY_FRAME:
3086 debugTrace(DEBUG_stm,
3087 "found CATCH_RETRY_FRAME at %p during retrry", p);
3089 return CATCH_RETRY_FRAME;
3091 case CATCH_STM_FRAME: {
3092 debugTrace(DEBUG_stm,
3093 "found CATCH_STM_FRAME at %p during retry", p);
3094 StgTRecHeader *trec = tso -> trec;
3095 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3096 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3097 stmAbortTransaction(tso -> cap, trec);
3098 stmFreeAbortedTRec(tso -> cap, trec);
3099 tso -> trec = outer;
3106 ASSERT(info->i.type != CATCH_FRAME);
3107 ASSERT(info->i.type != STOP_FRAME);
3114 /* -----------------------------------------------------------------------------
3115 resurrectThreads is called after garbage collection on the list of
3116 threads found to be garbage. Each of these threads will be woken
3117 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3118 on an MVar, or NonTermination if the thread was blocked on a Black
3121 Locks: assumes we hold *all* the capabilities.
3122 -------------------------------------------------------------------------- */
3125 resurrectThreads (StgTSO *threads)
3130 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3131 next = tso->global_link;
3132 tso->global_link = all_threads;
3134 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3136 // Wake up the thread on the Capability it was last on
3139 switch (tso->why_blocked) {
3141 case BlockedOnException:
3142 /* Called by GC - sched_mutex lock is currently held. */
3143 throwToSingleThreaded(cap, tso,
3144 (StgClosure *)BlockedOnDeadMVar_closure);
3146 case BlockedOnBlackHole:
3147 throwToSingleThreaded(cap, tso,
3148 (StgClosure *)NonTermination_closure);
3151 throwToSingleThreaded(cap, tso,
3152 (StgClosure *)BlockedIndefinitely_closure);
3155 /* This might happen if the thread was blocked on a black hole
3156 * belonging to a thread that we've just woken up (raiseAsync
3157 * can wake up threads, remember...).
3161 barf("resurrectThreads: thread blocked in a strange way");