1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 1998-2006
5 * The scheduler and thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
14 #include "OSThreads.h"
19 #include "StgMiscClosures.h"
20 #include "Interpreter.h"
22 #include "RtsSignals.h"
28 #include "ThreadLabels.h"
29 #include "LdvProfile.h"
32 #include "Proftimer.h"
35 #if defined(GRAN) || defined(PARALLEL_HASKELL)
36 # include "GranSimRts.h"
38 # include "ParallelRts.h"
39 # include "Parallel.h"
40 # include "ParallelDebug.h"
45 #include "Capability.h"
47 #include "AwaitEvent.h"
48 #if defined(mingw32_HOST_OS)
49 #include "win32/IOManager.h"
52 #include "RaiseAsync.h"
54 #include "ThrIOManager.h"
56 #ifdef HAVE_SYS_TYPES_H
57 #include <sys/types.h>
71 // Turn off inlining when debugging - it obfuscates things
74 # define STATIC_INLINE static
77 /* -----------------------------------------------------------------------------
79 * -------------------------------------------------------------------------- */
83 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
84 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
87 In GranSim we have a runnable and a blocked queue for each processor.
88 In order to minimise code changes new arrays run_queue_hds/tls
89 are created. run_queue_hd is then a short cut (macro) for
90 run_queue_hds[CurrentProc] (see GranSim.h).
93 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
94 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
95 StgTSO *ccalling_threadss[MAX_PROC];
96 /* We use the same global list of threads (all_threads) in GranSim as in
97 the std RTS (i.e. we are cheating). However, we don't use this list in
98 the GranSim specific code at the moment (so we are only potentially
103 #if !defined(THREADED_RTS)
104 // Blocked/sleeping threads
105 StgTSO *blocked_queue_hd = NULL;
106 StgTSO *blocked_queue_tl = NULL;
107 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
110 /* Threads blocked on blackholes.
111 * LOCK: sched_mutex+capability, or all capabilities
113 StgTSO *blackhole_queue = NULL;
116 /* The blackhole_queue should be checked for threads to wake up. See
117 * Schedule.h for more thorough comment.
118 * LOCK: none (doesn't matter if we miss an update)
120 rtsBool blackholes_need_checking = rtsFalse;
122 /* Linked list of all threads.
123 * Used for detecting garbage collected threads.
124 * LOCK: sched_mutex+capability, or all capabilities
126 StgTSO *all_threads = NULL;
128 /* flag set by signal handler to precipitate a context switch
129 * LOCK: none (just an advisory flag)
131 int context_switch = 0;
133 /* flag that tracks whether we have done any execution in this time slice.
134 * LOCK: currently none, perhaps we should lock (but needs to be
135 * updated in the fast path of the scheduler).
137 nat recent_activity = ACTIVITY_YES;
139 /* if this flag is set as well, give up execution
140 * LOCK: none (changes once, from false->true)
142 rtsBool sched_state = SCHED_RUNNING;
148 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
149 * exists - earlier gccs apparently didn't.
155 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
156 * in an MT setting, needed to signal that a worker thread shouldn't hang around
157 * in the scheduler when it is out of work.
159 rtsBool shutting_down_scheduler = rtsFalse;
162 * This mutex protects most of the global scheduler data in
163 * the THREADED_RTS runtime.
165 #if defined(THREADED_RTS)
169 #if defined(PARALLEL_HASKELL)
171 rtsTime TimeOfLastYield;
172 rtsBool emitSchedule = rtsTrue;
175 #if !defined(mingw32_HOST_OS)
176 #define FORKPROCESS_PRIMOP_SUPPORTED
179 /* -----------------------------------------------------------------------------
180 * static function prototypes
181 * -------------------------------------------------------------------------- */
183 static Capability *schedule (Capability *initialCapability, Task *task);
186 // These function all encapsulate parts of the scheduler loop, and are
187 // abstracted only to make the structure and control flow of the
188 // scheduler clearer.
190 static void schedulePreLoop (void);
191 #if defined(THREADED_RTS)
192 static void schedulePushWork(Capability *cap, Task *task);
194 static void scheduleStartSignalHandlers (Capability *cap);
195 static void scheduleCheckBlockedThreads (Capability *cap);
// cap is only used by the THREADED_RTS body of this function, so the
// annotation matches the one on its definition (USED_IF_THREADS).
196 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS);
197 static void scheduleCheckBlackHoles (Capability *cap);
198 static void scheduleDetectDeadlock (Capability *cap, Task *task);
200 static StgTSO *scheduleProcessEvent(rtsEvent *event);
202 #if defined(PARALLEL_HASKELL)
203 static StgTSO *scheduleSendPendingMessages(void);
204 static void scheduleActivateSpark(void);
205 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
207 #if defined(PAR) || defined(GRAN)
208 static void scheduleGranParReport(void);
210 static void schedulePostRunThread(void);
211 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
212 static void scheduleHandleStackOverflow( Capability *cap, Task *task,
214 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
215 nat prev_what_next );
216 static void scheduleHandleThreadBlocked( StgTSO *t );
217 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
219 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
220 static Capability *scheduleDoGC(Capability *cap, Task *task,
221 rtsBool force_major);
223 static rtsBool checkBlackHoles(Capability *cap);
225 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
227 static void deleteThread (Capability *cap, StgTSO *tso);
228 static void deleteAllThreads (Capability *cap);
230 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
231 static void deleteThread_(Capability *cap, StgTSO *tso);
234 #if defined(PARALLEL_HASKELL)
235 StgTSO * createSparkThread(rtsSpark spark);
236 StgTSO * activateSpark (rtsSpark spark);
240 static char *whatNext_strs[] = {
250 /* -----------------------------------------------------------------------------
251 * Putting a thread on the run queue: different scheduling policies
252 * -------------------------------------------------------------------------- */
// Put thread t on cap's run queue according to the scheduling policy.
// Under PARALLEL_HASKELL the choice depends on
// RtsFlags.ParFlags.doFairScheduling: appendToRunQueue() when it is set,
// pushOnRunQueue() otherwise.  The last appendToRunQueue() call is
// presumably the non-PARALLEL_HASKELL branch -- TODO confirm, the
// preprocessor lines separating the branches are not visible here.
255 addToRunQueue( Capability *cap, StgTSO *t )
257 #if defined(PARALLEL_HASKELL)
258 if (RtsFlags.ParFlags.doFairScheduling) {
259 // this does round-robin scheduling; good for concurrency
260 appendToRunQueue(cap,t);
262 // this does unfair scheduling; good for parallelism
263 pushOnRunQueue(cap,t);
266 // this does round-robin scheduling; good for concurrency
267 appendToRunQueue(cap,t);
271 /* ---------------------------------------------------------------------------
272 Main scheduling loop.
274 We use round-robin scheduling, each thread returning to the
275 scheduler loop when one of these conditions is detected:
278 * timer expires (thread yields)
284 In a GranSim setup this loop iterates over the global event queue.
285 This revolves around the global event queue, which determines what
286 to do next. Therefore, it's more complicated than either the
287 concurrent or the parallel (GUM) setup.
290 GUM iterates over incoming messages.
291 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
292 and sends out a fish whenever it has nothing to do; in-between
293 doing the actual reductions (shared code below) it processes the
294 incoming messages and deals with delayed operations
295 (see PendingFetches).
296 This is not the ugliest code you could imagine, but it's bloody close.
298 ------------------------------------------------------------------------ */
// Scheduler main loop for one Task.
//   initialCapability - the Capability this task owns on entry
//   task              - the calling Task
// Returns the Capability held when the loop is left; the visible return
// is taken when scheduleHandleThreadFinished() returns true.
// Each iteration: handle the interrupt/shutdown state, gather runnable
// threads (sparks, wakeups, blocked threads, deadlock check), pop one
// thread off the run queue, run it, then act on its return code and
// possibly garbage collect.
301 schedule (Capability *initialCapability, Task *task)
305 StgThreadReturnCode ret;
308 #elif defined(PARALLEL_HASKELL)
311 rtsBool receivedFinish = rtsFalse;
313 nat tp_size, sp_size; // stats only
318 #if defined(THREADED_RTS)
319 rtsBool first = rtsTrue;
322 cap = initialCapability;
324 // Pre-condition: this task owns initialCapability.
325 // The sched_mutex is *NOT* held
326 // NB. on return, we still hold a capability.
328 debugTrace (DEBUG_sched,
329 "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
330 task, initialCapability);
334 // -----------------------------------------------------------
335 // Scheduler loop starts here:
337 #if defined(PARALLEL_HASKELL)
338 #define TERMINATION_CONDITION (!receivedFinish)
340 #define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL)
342 #define TERMINATION_CONDITION rtsTrue
345 while (TERMINATION_CONDITION) {
348 /* Choose the processor with the next event */
349 CurrentProc = event->proc;
350 CurrentTSO = event->tso;
353 #if defined(THREADED_RTS)
355 // don't yield the first time, we want a chance to run this
356 // thread for a bit, even if there are others banging at the
359 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
361 // Yield the capability to higher-priority tasks if necessary.
362 yieldCapability(&cap, task);
366 #if defined(THREADED_RTS)
367 schedulePushWork(cap,task);
370 // Check whether we have re-entered the RTS from Haskell without
371 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
373 if (cap->in_haskell) {
374 errorBelch("schedule: re-entered unsafely.\n"
375 " Perhaps a 'foreign import unsafe' should be 'safe'?");
376 stg_exit(EXIT_FAILURE);
379 // The interruption / shutdown sequence.
381 // In order to cleanly shut down the runtime, we want to:
382 // * make sure that all main threads return to their callers
383 // with the state 'Interrupted'.
384 // * clean up all OS threads associated with the runtime
385 // * free all memory etc.
387 // So the sequence for ^C goes like this:
389 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
390 // arranges for some Capability to wake up
392 // * all threads in the system are halted, and the zombies are
393 // placed on the run queue for cleaning up. We acquire all
394 // the capabilities in order to delete the threads, this is
395 // done by scheduleDoGC() for convenience (because GC already
396 // needs to acquire all the capabilities). We can't kill
397 // threads involved in foreign calls.
399 // * somebody calls shutdownHaskell(), which calls exitScheduler()
401 // * sched_state := SCHED_SHUTTING_DOWN
403 // * all workers exit when the run queue on their capability
404 // drains. All main threads will also exit when their TSO
405 // reaches the head of the run queue and they can return.
407 // * eventually all Capabilities will shut down, and the RTS can
410 // * We might be left with threads blocked in foreign calls,
411 // we should really attempt to kill these somehow (TODO);
413 switch (sched_state) {
416 case SCHED_INTERRUPTING:
417 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
418 #if defined(THREADED_RTS)
419 discardSparksCap(cap);
421 /* scheduleDoGC() deletes all the threads */
422 cap = scheduleDoGC(cap,task,rtsFalse);
424 case SCHED_SHUTTING_DOWN:
425 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
426 // If we are a worker, just exit. If we're a bound thread
427 // then we will exit below when we've removed our TSO from
429 if (task->tso == NULL && emptyRunQueue(cap)) {
434 barf("sched_state: %d", sched_state);
437 #if defined(THREADED_RTS)
438 // If the run queue is empty, take a spark and turn it into a thread.
440 if (emptyRunQueue(cap)) {
442 spark = findSpark(cap);
444 debugTrace(DEBUG_sched,
445 "turning spark of closure %p into a thread",
446 (StgClosure *)spark);
447 createSparkThread(cap,spark);
451 #endif // THREADED_RTS
453 scheduleStartSignalHandlers(cap);
455 // Only check the black holes here if we've nothing else to do.
456 // During normal execution, the black hole list only gets checked
457 // at GC time, to avoid repeatedly traversing this possibly long
458 // list each time around the scheduler.
459 if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
461 scheduleCheckWakeupThreads(cap);
463 scheduleCheckBlockedThreads(cap);
465 scheduleDetectDeadlock(cap,task);
466 #if defined(THREADED_RTS)
467 cap = task->cap; // reload cap, it might have changed
470 // Normally, the only way we can get here with no threads to
471 // run is if a keyboard interrupt is received during
472 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
473 // Additionally, it is not fatal for the
474 // threaded RTS to reach here with no threads to run.
476 // win32: might be here due to awaitEvent() being abandoned
477 // as a result of a console event having been delivered.
478 if ( emptyRunQueue(cap) ) {
479 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
480 ASSERT(sched_state >= SCHED_INTERRUPTING);
482 continue; // nothing to do
485 #if defined(PARALLEL_HASKELL)
486 scheduleSendPendingMessages();
487 if (emptyRunQueue(cap) && scheduleActivateSpark())
491 ASSERT(next_fish_to_send_at==0); // i.e. no delayed fishes left!
494 /* If we still have no work we need to send a FISH to get a spark
496 if (emptyRunQueue(cap)) {
497 if (!scheduleGetRemoteWork(&receivedFinish)) continue;
498 ASSERT(rtsFalse); // should not happen at the moment
500 // from here: non-empty run queue.
501 // TODO: merge above case with this, only one call processMessages() !
502 if (PacketsWaiting()) { /* process incoming messages, if
503 any pending... only in else
504 because getRemoteWork waits for
506 receivedFinish = processMessages();
511 scheduleProcessEvent(event);
515 // Get a thread to run
517 t = popRunQueue(cap);
519 #if defined(GRAN) || defined(PAR)
520 scheduleGranParReport(); // some kind of debugging output
522 // Sanity check the thread we're about to run. This can be
523 // expensive if there is lots of thread switching going on...
524 IF_DEBUG(sanity,checkTSO(t));
527 #if defined(THREADED_RTS)
528 // Check whether we can run this thread in the current task.
529 // If not, we have to pass our capability to the right task.
531 Task *bound = t->bound;
535 debugTrace(DEBUG_sched,
536 "### Running thread %lu in bound thread", (unsigned long)t->id);
537 // yes, the Haskell thread is bound to the current native thread
539 debugTrace(DEBUG_sched,
540 "### thread %lu bound to another OS thread", (unsigned long)t->id);
541 // no, bound to a different Haskell thread: pass to that thread
542 pushOnRunQueue(cap,t);
546 // The thread we want to run is unbound.
548 debugTrace(DEBUG_sched,
549 "### this OS thread cannot run thread %lu", (unsigned long)t->id);
550 // no, the current native thread is bound to a different
551 // Haskell thread, so pass it to any worker thread
552 pushOnRunQueue(cap,t);
559 cap->r.rCurrentTSO = t;
561 /* context switches are initiated by the timer signal, unless
562 * the user specified "context switch as often as possible", with
565 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
566 && !emptyThreadQueues(cap)) {
572 debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
573 (long)t->id, whatNext_strs[t->what_next]);
575 #if defined(PROFILING)
576 startHeapProfTimer();
579 // Check for exceptions blocked on this thread
580 maybePerformBlockedException (cap, t);
582 // ----------------------------------------------------------------------
583 // Run the current thread
585 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
586 ASSERT(t->cap == cap);
588 prev_what_next = t->what_next;
590 errno = t->saved_errno;
591 cap->in_haskell = rtsTrue;
595 recent_activity = ACTIVITY_YES;
597 switch (prev_what_next) {
601 /* Thread already finished, return to scheduler. */
602 ret = ThreadFinished;
608 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
609 cap = regTableToCapability(r);
614 case ThreadInterpret:
615 cap = interpretBCO(cap);
620 barf("schedule: invalid what_next field");
623 cap->in_haskell = rtsFalse;
625 // The TSO might have moved, eg. if it re-entered the RTS and a GC
626 // happened. So find the new location:
627 t = cap->r.rCurrentTSO;
629 // We have run some Haskell code: there might be blackhole-blocked
630 // threads to wake up now.
631 // Lock-free test here should be ok, we're just setting a flag.
632 if ( blackhole_queue != END_TSO_QUEUE ) {
633 blackholes_need_checking = rtsTrue;
636 // And save the current errno in this thread.
637 // XXX: possibly bogus for SMP because this thread might already
638 // be running again, see code below.
639 t->saved_errno = errno;
641 #if defined(THREADED_RTS)
642 // If ret is ThreadBlocked, and this Task is bound to the TSO that
643 // blocked, we are in limbo - the TSO is now owned by whatever it
644 // is blocked on, and may in fact already have been woken up,
645 // perhaps even on a different Capability. It may be the case
646 // that task->cap != cap. We better yield this Capability
647 // immediately and return to normality.
648 if (ret == ThreadBlocked) {
649 debugTrace(DEBUG_sched,
650 "--<< thread %lu (%s) stopped: blocked",
651 (unsigned long)t->id, whatNext_strs[t->what_next]);
656 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
657 ASSERT(t->cap == cap);
659 // ----------------------------------------------------------------------
661 // Costs for the scheduler are assigned to CCS_SYSTEM
662 #if defined(PROFILING)
667 schedulePostRunThread();
669 ready_to_gc = rtsFalse;
673 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
677 scheduleHandleStackOverflow(cap,task,t);
681 if (scheduleHandleYield(cap, t, prev_what_next)) {
682 // shortcut for switching between compiler/interpreter:
688 scheduleHandleThreadBlocked(t);
692 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
693 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
697 barf("schedule: invalid thread return code %d", (int)ret);
700 if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
702 cap = scheduleDoGC(cap,task,rtsFalse);
704 } /* end of while() */
706 debugTrace(PAR_DEBUG_verbose,
707 "== Leaving schedule() after having received Finish");
710 /* ----------------------------------------------------------------------------
711 * Setting up the scheduler loop
712 * ------------------------------------------------------------------------- */
// One-off setup performed before the scheduler loop starts.
// The visible body posts the first event for CurrentTSO on CurrentProc
// and, when RtsFlags.GranFlags.Light is set, stores the current time in
// the TSO's gran.clock.  Presumably all of this is GRAN-only -- TODO
// confirm, the enclosing preprocessor guard is not visible here.
715 schedulePreLoop(void)
718 /* set up first event to get things going */
719 /* ToDo: assign costs for system setup and init MainTSO ! */
720 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
722 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
724 debugTrace (DEBUG_gran,
725 "GRAN: Init CurrentTSO (in schedule) = %p",
727 IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
729 if (RtsFlags.GranFlags.Light) {
730 /* Save current time; GranSim Light only */
731 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
736 /* -----------------------------------------------------------------------------
739 * Push work to other Capabilities if we have some.
740 * -------------------------------------------------------------------------- */
742 #if defined(THREADED_RTS)
// Hand surplus work on this Capability to other, idle Capabilities.
//   cap  - the Capability owned by the caller
//   task - the calling Task
// Returns at once when migration is disabled, or when cap has at most one
// runnable thread and fewer than two sparks.  Otherwise it grabs every
// other Capability it can, keeps only those with no work of their own,
// distributes run-queue threads among them (threads that are relocated,
// bound to this task, or locked are not moved), then offers sparks to any
// free Capability that still has an empty spark pool, and finally
// releases the grabbed Capabilities and restores task->cap.
744 schedulePushWork(Capability *cap USED_IF_THREADS,
745 Task *task USED_IF_THREADS)
747 Capability *free_caps[n_capabilities], *cap0;
750 // migration can be turned off with +RTS -qg
751 if (!RtsFlags.ParFlags.migrate) return;
753 // Check whether we have more threads on our run queue, or sparks
754 // in our pool, that we could hand to another Capability.
755 if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
756 && sparkPoolSizeCap(cap) < 2) {
760 // First grab as many free Capabilities as we can.
761 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
762 cap0 = &capabilities[i];
763 if (cap != cap0 && tryGrabCapability(cap0,task)) {
// Inspect the Capability we just grabbed (cap0), not our own: the
// question is whether cap0 already has threads to run or tasks
// waiting to return to it.
764 if (!emptyRunQueue(cap0) || cap0->returning_tasks_hd != NULL) {
765 // it already has some work, we just grabbed it at
766 // the wrong moment. Or maybe it's deadlocked!
767 releaseCapability(cap0);
769 free_caps[n_free_caps++] = cap0;
774 // we now have n_free_caps free capabilities stashed in
775 // free_caps[]. Share our run queue equally with them. This is
776 // probably the simplest thing we could do; improvements we might
777 // want to do include:
779 // - giving high priority to moving relatively new threads, on
780 // the grounds that they haven't had time to build up a
781 // working set in the cache on this CPU/Capability.
783 // - giving low priority to moving long-lived threads
785 if (n_free_caps > 0) {
786 StgTSO *prev, *t, *next;
787 rtsBool pushed_to_all;
789 debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
792 pushed_to_all = rtsFalse;
794 if (cap->run_queue_hd != END_TSO_QUEUE) {
795 prev = cap->run_queue_hd;
797 prev->link = END_TSO_QUEUE;
798 for (; t != END_TSO_QUEUE; t = next) {
800 t->link = END_TSO_QUEUE;
801 if (t->what_next == ThreadRelocated
802 || t->bound == task // don't move my bound thread
803 || tsoLocked(t)) { // don't move a locked thread
806 } else if (i == n_free_caps) {
807 pushed_to_all = rtsTrue;
813 debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
814 appendToRunQueue(free_caps[i],t);
815 if (t->bound) { t->bound->cap = free_caps[i]; }
816 t->cap = free_caps[i];
820 cap->run_queue_tl = prev;
823 // If there are some free capabilities that we didn't push any
824 // threads to, then try to push a spark to each one.
825 if (!pushed_to_all) {
827 // i is the next free capability to push to
828 for (; i < n_free_caps; i++) {
829 if (emptySparkPoolCap(free_caps[i])) {
830 spark = findSpark(cap);
832 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
833 newSpark(&(free_caps[i]->r), spark);
839 // release the capabilities
840 for (i = 0; i < n_free_caps; i++) {
841 task->cap = free_caps[i];
842 releaseCapability(free_caps[i]);
845 task->cap = cap; // reset to point to our Capability.
849 /* ----------------------------------------------------------------------------
850 * Start any pending signal handlers
851 * ------------------------------------------------------------------------- */
853 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
// Start handlers for any signals that are pending.
// This first definition is the one built when RTS_USER_SIGNALS is defined
// and THREADED_RTS is not; it calls startSignalHandlers(cap) only if
// signals_pending() reports something.
855 scheduleStartSignalHandlers(Capability *cap)
857 if (signals_pending()) { // safe outside the lock
858 startSignalHandlers(cap);
// Alternative definition with cap unused; presumably the fallback for the
// other configurations with an empty body -- TODO confirm, its body is not
// visible here.
863 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
868 /* ----------------------------------------------------------------------------
869 * Check for blocked threads that can be woken up.
870 * ------------------------------------------------------------------------- */
// Non-threaded RTS only: if blocked_queue_hd or sleeping_queue is
// non-empty, call awaitEvent().  Its argument is true only when cap's run
// queue is empty and no blackhole check is pending; presumably that tells
// awaitEvent() it may block -- TODO confirm against awaitEvent().
873 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
875 #if !defined(THREADED_RTS)
877 // Check whether any waiting threads need to be woken up. If the
878 // run queue is empty, and there are no other tasks running, we
879 // can wait indefinitely for something to happen.
881 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
883 awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
889 /* ----------------------------------------------------------------------------
890 * Check for threads woken up by other Capabilities
891 * ------------------------------------------------------------------------- */
// THREADED_RTS only: move every thread on cap's wakeup queue to the end
// of cap's run queue.  The transfer is done while holding cap->lock, and
// the wakeup queue is reset to empty afterwards.
894 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
896 #if defined(THREADED_RTS)
897 // Any threads that were woken up by other Capabilities get
898 // appended to our run queue.
899 if (!emptyWakeupQueue(cap)) {
900 ACQUIRE_LOCK(&cap->lock);
901 if (emptyRunQueue(cap)) {
// run queue empty: the wakeup queue simply becomes the run queue
902 cap->run_queue_hd = cap->wakeup_queue_hd;
903 cap->run_queue_tl = cap->wakeup_queue_tl;
// run queue non-empty: link the wakeup queue after the current tail
905 cap->run_queue_tl->link = cap->wakeup_queue_hd;
906 cap->run_queue_tl = cap->wakeup_queue_tl;
908 cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
909 RELEASE_LOCK(&cap->lock);
914 /* ----------------------------------------------------------------------------
915 * Check for threads blocked on BLACKHOLEs that can be woken up
916 * ------------------------------------------------------------------------- */
// Run checkBlackHoles(cap) if blackholes_need_checking is set.
// The flag is tested once without the lock as a cheap filter, then again
// while holding sched_mutex before the check runs and the flag is cleared.
918 scheduleCheckBlackHoles (Capability *cap)
920 if ( blackholes_need_checking ) // check without the lock first
922 ACQUIRE_LOCK(&sched_mutex);
923 if ( blackholes_need_checking ) {
924 checkBlackHoles(cap);
925 blackholes_need_checking = rtsFalse;
927 RELEASE_LOCK(&sched_mutex);
931 /* ----------------------------------------------------------------------------
932 * Detect deadlock conditions and attempt to resolve them.
933 * ------------------------------------------------------------------------- */
// Deadlock detection and recovery.
//   cap  - the caller's Capability
//   task - the calling Task
// When emptyThreadQueues(cap) holds, a major GC is forced, since that may
// release threads; the function returns if the run queue is non-empty
// afterwards.  In THREADED_RTS nothing is done unless recent_activity is
// ACTIVITY_INACTIVE.  In the non-threaded RTS, if user signal handlers
// are installed the code waits for signals; otherwise it throws
// NonTermination to task->tso for the blocked states listed in the
// switch, and calls barf() for a state it does not expect.
936 scheduleDetectDeadlock (Capability *cap, Task *task)
939 #if defined(PARALLEL_HASKELL)
940 // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
945 * Detect deadlock: when we have no threads to run, there are no
946 * threads blocked, waiting for I/O, or sleeping, and all the
947 * other tasks are waiting for work, we must have a deadlock of
950 if ( emptyThreadQueues(cap) )
952 #if defined(THREADED_RTS)
954 * In the threaded RTS, we only check for deadlock if there
955 * has been no activity in a complete timeslice. This means
956 * we won't eagerly start a full GC just because we don't have
957 * any threads to run currently.
959 if (recent_activity != ACTIVITY_INACTIVE) return;
962 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
964 // Garbage collection can release some new threads due to
965 // either (a) finalizers or (b) threads resurrected because
966 // they are unreachable and will therefore be sent an
967 // exception. Any threads thus released will be immediately
969 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
971 recent_activity = ACTIVITY_DONE_GC;
973 if ( !emptyRunQueue(cap) ) return;
975 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
976 /* If we have user-installed signal handlers, then wait
977 * for signals to arrive rather than bombing out with a
980 if ( anyUserHandlers() ) {
981 debugTrace(DEBUG_sched,
982 "still deadlocked, waiting for signals...");
986 if (signals_pending()) {
987 startSignalHandlers(cap);
990 // either we have threads to run, or we were interrupted:
991 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
995 #if !defined(THREADED_RTS)
996 /* Probably a real deadlock. Send the current main thread the
997 * Deadlock exception.
1000 switch (task->tso->why_blocked) {
1002 case BlockedOnBlackHole:
1003 case BlockedOnException:
1005 throwToSingleThreaded(cap, task->tso,
1006 (StgClosure *)NonTermination_closure);
1009 barf("deadlock: main thread blocked in a strange way");
1017 /* ----------------------------------------------------------------------------
1018 * Process an event (GRAN only)
1019 * ------------------------------------------------------------------------- */
// GranSim only: handle one event taken from the global event queue.
//   event - the event to process
// CurrentTime[CurrentProc] is advanced to the event's timestamp unless
// the event is a ContinueThread.  The switch on event->evttype then
// either falls out to run the current thread (ContinueThread) or calls
// the matching do_the_*() handler and jumps to next_thread to take the
// next event.  An unknown event type is fatal (barf).
1023 scheduleProcessEvent(rtsEvent *event)
1027 if (RtsFlags.GranFlags.Light)
1028 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1030 /* adjust time based on time-stamp */
1031 if (event->time > CurrentTime[CurrentProc] &&
1032 event->evttype != ContinueThread)
1033 CurrentTime[CurrentProc] = event->time;
1035 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1036 if (!RtsFlags.GranFlags.Light)
1039 IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1041 /* main event dispatcher in GranSim */
1042 switch (event->evttype) {
1043 /* Should just be continuing execution */
1044 case ContinueThread:
1045 IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1046 /* ToDo: check assertion
1047 ASSERT(run_queue_hd != (StgTSO*)NULL &&
1048 run_queue_hd != END_TSO_QUEUE);
1050 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1051 if (!RtsFlags.GranFlags.DoAsyncFetch &&
1052 procStatus[CurrentProc]==Fetching) {
1053 debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1054 CurrentTSO->id, CurrentTSO, CurrentProc);
1057 /* Ignore ContinueThreads for completed threads */
1058 if (CurrentTSO->what_next == ThreadComplete) {
1059 debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
1060 CurrentTSO->id, CurrentTSO, CurrentProc);
1063 /* Ignore ContinueThreads for threads that are being migrated */
1064 if (PROCS(CurrentTSO)==Nowhere) {
1065 debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1066 CurrentTSO->id, CurrentTSO, CurrentProc);
1069 /* The thread should be at the beginning of the run queue */
1070 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
1071 debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1072 CurrentTSO->id, CurrentTSO, CurrentProc);
1073 break; // run the thread anyway
1076 new_event(proc, proc, CurrentTime[proc],
1078 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1080 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1081 break; // now actually run the thread; DaH Qu'vam yImuHbej
1084 do_the_fetchnode(event);
1085 goto next_thread; /* handle next event in event queue */
1088 do_the_globalblock(event);
1089 goto next_thread; /* handle next event in event queue */
1092 do_the_fetchreply(event);
1093 goto next_thread; /* handle next event in event queue */
1095 case UnblockThread: /* Move from the blocked queue to the tail of */
1096 do_the_unblock(event);
1097 goto next_thread; /* handle next event in event queue */
1099 case ResumeThread: /* Move from the blocked queue to the tail of */
1100 /* the runnable queue ( i.e. Qu' SImqa'lu') */
1101 event->tso->gran.blocktime +=
1102 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1103 do_the_startthread(event);
1104 goto next_thread; /* handle next event in event queue */
1107 do_the_startthread(event);
1108 goto next_thread; /* handle next event in event queue */
1111 do_the_movethread(event);
1112 goto next_thread; /* handle next event in event queue */
1115 do_the_movespark(event);
1116 goto next_thread; /* handle next event in event queue */
1119 do_the_findwork(event);
1120 goto next_thread; /* handle next event in event queue */
1123 barf("Illegal event type %u\n", event->evttype);
1126 /* This point was scheduler_loop in the old RTS */
1128 IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1130 TimeOfLastEvent = CurrentTime[CurrentProc];
1131 TimeOfNextEvent = get_time_of_next_event();
1132 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1133 // CurrentTSO = ThreadQueueHd;
1135 IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
1138 if (RtsFlags.GranFlags.Light)
1139 GranSimLight_leave_system(event, &ActiveTSO);
1141 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1144 debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1146 /* in a GranSim setup the TSO stays on the run queue */
1148 /* Take a thread from the run queue. */
1149 POP_RUN_QUEUE(t); // take_off_run_queue(t);
1152 debugBelch("GRAN: About to run current thread, which is\n");
1155 context_switch = 0; // turned on via GranYield, checking events and time slice
1158 DumpGranEvent(GR_SCHEDULE, t));
1160 procStatus[CurrentProc] = Busy;
1164 /* ----------------------------------------------------------------------------
1165 * Send pending messages (PARALLEL_HASKELL only)
1166 * ------------------------------------------------------------------------- */
1168 #if defined(PARALLEL_HASKELL)
// PARALLEL_HASKELL only: deal with outgoing communication before looking
// for work.  In PAR builds it tests whether PendingFetches is non-empty,
// and when RtsFlags.ParFlags.BufferTime is set it handles buffered
// message packets that have become too old.  The calls made inside each
// branch are not visible here -- TODO confirm what is actually sent.
1170 scheduleSendPendingMessages(void)
1176 # if defined(PAR) // global Mem.Mgmt., omit for now
1177 if (PendingFetches != END_BF_QUEUE) {
1182 if (RtsFlags.ParFlags.BufferTime) {
1183 // if we use message buffering, we must send away all message
1184 // packets which have become too old...
1190 /* ----------------------------------------------------------------------------
1191 * Activate spark threads (PARALLEL_HASKELL only)
1192 * ------------------------------------------------------------------------- */
1194 #if defined(PARALLEL_HASKELL)
// PARALLEL_HASKELL only: try to turn a local spark into a thread when the
// run queue is empty.
// Returns rtsTrue when a thread was created from a spark, and rtsFalse
// when the thread limit (RtsFlags.ParFlags.maxThreads) is reached, the
// pool is empty, no usable spark is found, or thread creation fails.
// NOTE(review): this body calls emptyRunQueue() with no argument and
// findSpark(rtsFalse), and reads cap.r.rSparks, although the function
// takes no parameters and other call sites in this file use
// emptyRunQueue(cap) and findSpark(cap) with a Capability pointer.
// Unless cap is a global or macro in this configuration, this path may
// no longer build -- confirm.
1196 scheduleActivateSpark(void)
1199 ASSERT(emptyRunQueue());
1200 /* We get here if the run queue is empty and want some work.
1201 We try to turn a spark into a thread, and add it to the run queue,
1202 from where it will be picked up in the next iteration of the scheduler
1206 /* :-[ no local threads => look out for local sparks */
1207 /* the spark pool for the current PE */
1208 pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1209 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1210 pool->hd < pool->tl) {
1212 * ToDo: add GC code check that we really have enough heap afterwards!!
1214 * If we're here (no runnable threads) and we have pending
1215 * sparks, we must have a space problem. Get enough space
1216 * to turn one of those pending sparks into a
1220 spark = findSpark(rtsFalse); /* get a spark */
1221 if (spark != (rtsSpark) NULL) {
1222 tso = createThreadFromSpark(spark); /* turn the spark into a thread */
1223 IF_PAR_DEBUG(fish, // schedule,
1224 debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1225 tso->id, tso, advisory_thread_count));
1227 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1228 IF_PAR_DEBUG(fish, // schedule,
1229 debugBelch("==^^ failed to create thread from spark @ %lx\n",
1231 return rtsFalse; /* failed to generate a thread */
1232 } /* otherwise fall through & pick-up new tso */
1234 IF_PAR_DEBUG(fish, // schedule,
1235 debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
1236 spark_queue_len(pool)));
1237 return rtsFalse; /* failed to generate a thread */
1239 return rtsTrue; /* success in generating a thread */
1240 } else { /* no more threads permitted or pool empty */
1241 return rtsFalse; /* failed to generateThread */
1244 tso = NULL; // avoid compiler warning only
1245 return rtsFalse; /* dummy in non-PAR setup */
1248 #endif // PARALLEL_HASKELL
1250 /* ----------------------------------------------------------------------------
1251 * Get work from a remote node (PARALLEL_HASKELL only)
1252 * ------------------------------------------------------------------------- */
1254 #if defined(PARALLEL_HASKELL)
// scheduleGetRemoteWork: ask other PEs for work when nothing can run locally.
// Precondition: the run queue is empty (asserted).
// receivedFinish: out-parameter, overwritten with the result of
//   processMessages().
// With RtsFlags.ParFlags.BufferTime set, all buffered messages are first
// pushed out with sendImmediately() for PEs 1..nPEs.
// EDEN variant: no fishing, just a blocking processMessages().
// SPARKS variant: a FISH is sent with sendFish() only while
// outstandingFishes < maxFishes and advisory_thread_count < maxThreads; it
// goes out at once if no fish has arrived yet or fishDelay has elapsed since
// last_fish_arrived_at, otherwise after a busy wait that keeps processing
// incoming packets. Afterwards processMessages() is called again.
// Returns rtsFalse on the paths shown (see the NB at the end).
// NOTE(review): the busy-wait loop repeats while
// (!*receivedFinish || now<next_fish_to_send_at), so as written it cannot
// end before *receivedFinish is set, and the following test then returns
// without sending the delayed fish. The JB remark below questions the same
// condition - confirm the intended operator.
1256 scheduleGetRemoteWork(rtsBool *receivedFinish)
1258 ASSERT(emptyRunQueue());
1260 if (RtsFlags.ParFlags.BufferTime) {
1261 IF_PAR_DEBUG(verbose,
1262 debugBelch("...send all pending data,"));
1265 for (i=1; i<=nPEs; i++)
1266 sendImmediately(i); // send all messages away immediately
1270 //++EDEN++ idle() , i.e. send all buffers, wait for work
1271 // suppress fishing in EDEN... just look for incoming messages
1272 // (blocking receive)
1273 IF_PAR_DEBUG(verbose,
1274 debugBelch("...wait for incoming messages...\n"));
1275 *receivedFinish = processMessages(); // blocking receive...
1277 // and reenter scheduling loop after having received something
1278 // (return rtsFalse below)
1280 # else /* activate SPARKS machinery */
1281 /* We get here, if we have no work, tried to activate a local spark, but still
1282 have no work. We try to get a remote spark, by sending a FISH message.
1283 Thread migration should be added here, and triggered when a sequence of
1284 fishes returns without work. */
1285 delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1287 /* =8-[ no local sparks => look for work on other PEs */
1289 * We really have absolutely no work. Send out a fish
1290 * (there may be some out there already), and wait for
1291 * something to arrive. We clearly can't run any threads
1292 * until a SCHEDULE or RESUME arrives, and so that's what
1293 * we're hoping to see. (Of course, we still have to
1294 * respond to other types of messages.)
1296 rtsTime now = msTime() /*CURRENT_TIME*/;
1297 IF_PAR_DEBUG(verbose,
1298 debugBelch("-- now=%ld\n", now));
1299 IF_PAR_DEBUG(fish, // verbose,
1300 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1301 (last_fish_arrived_at!=0 &&
1302 last_fish_arrived_at+delay > now)) {
1303 debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1304 now, last_fish_arrived_at+delay,
1305 last_fish_arrived_at,
1309 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1310 advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1311 if (last_fish_arrived_at==0 ||
1312 (last_fish_arrived_at+delay <= now)) { // send FISH now!
1313 /* outstandingFishes is set in sendFish, processFish;
1314 avoid flooding system with fishes via delay */
1315 next_fish_to_send_at = 0;
1317 /* ToDo: this should be done in the main scheduling loop to avoid the
1318 busy wait here; not so bad if fish delay is very small */
1319 int iq = 0; // DEBUGGING -- HWL
1320 next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
1321 /* send a fish when ready, but process messages that arrive in the meantime */
1323 if (PacketsWaiting()) {
1325 *receivedFinish = processMessages();
1328 } while (!*receivedFinish || now<next_fish_to_send_at);
1329 // JB: This means the fish could become obsolete, if we receive
1330 // work. Better check for work again?
1331 // last line: while (!receivedFinish || !haveWork || now<...)
1332 // next line: if (receivedFinish || haveWork )
1334 if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1335 return rtsFalse; // NB: this will leave scheduler loop
1336 // immediately after return!
1338 IF_PAR_DEBUG(fish, // verbose,
1339 debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1343 // JB: IMHO, this should all be hidden inside sendFish(...)
1345 sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
1348 // Global statistics: count no. of fishes
1349 if (RtsFlags.ParFlags.ParStats.Global &&
1350 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1351 globalParStats.tot_fish_mess++;
1355 /* delayed fishes must have been sent by now! */
1356 next_fish_to_send_at = 0;
1359 *receivedFinish = processMessages();
1360 # endif /* SPARKS */
1363 /* NB: this function always returns rtsFalse, meaning the scheduler
1364 loop continues with the next iteration;
1366 return code means success in finding work; we enter this function
1367 if there is no local work, thus have to send a fish which takes
1368 time until it arrives with work; in the meantime we should process
1369 messages in the main loop;
1372 #endif // PARALLEL_HASKELL
1374 /* ----------------------------------------------------------------------------
1375 * PAR/GRAN: Report stats & debugging info(?)
1376 * ------------------------------------------------------------------------- */
1378 #if defined(PAR) || defined(GRAN)
// scheduleGranParReport: PAR/GRAN-only logging step run with a non-empty run
// queue (asserted). Pops a thread t with POP_RUN_QUEUE, prints the thread
// and spark counts of the current PE in the debug output, and - when
// RtsFlags.ParFlags.ParStats.Full is on and t was created from a spark
// (t->par.sparkname != 0) - dumps a GR_SCHEDULE event if emitSchedule is
// set or t differs from LastTSO; emitSchedule is then reset to rtsFalse.
// NOTE(review): t->par.sparkname is dereferenced before the (t && LastTSO)
// null test in the same condition - confirm t can never be NULL here.
1380 scheduleGranParReport(void)
1382 ASSERT(run_queue_hd != END_TSO_QUEUE);
1384 /* Take a thread from the run queue, if we have work */
1385 POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
1387 /* If this TSO has got its outport closed in the meantime,
1388 * it mustn't be run. Instead, we have to clean it up as if it was finished.
1389 * It has to be marked as TH_DEAD for this purpose.
1390 * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1392 JB: TODO: investigate whether state change field could be nuked
1393 entirely and replaced by the normal tso state (whatnext
1394 field). All we want to do is to kill tsos from outside.
1397 /* ToDo: write something to the log-file
1398 if (RTSflags.ParFlags.granSimStats && !sameThread)
1399 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1403 /* the spark pool for the current PE */
1404 pool = &(cap.r.rSparks); // cap = (old) MainCap
1407 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1408 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1411 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1412 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1414 if (RtsFlags.ParFlags.ParStats.Full &&
1415 (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1416 (emitSchedule || // forced emit
1417 (t && LastTSO && t->id != LastTSO->id))) {
1419 we are running a different TSO, so write a schedule event to log file
1420 NB: If we use fair scheduling we also have to write a deschedule
1421 event for LastTSO; with unfair scheduling we know that the
1422 previous tso has blocked whenever we switch to another tso, so
1423 we don't need it in GUM for now
1425 IF_PAR_DEBUG(fish, // schedule,
1426 debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1428 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1429 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1430 emitSchedule = rtsFalse;
1435 /* ----------------------------------------------------------------------------
1436 * After running a thread...
1437 * ------------------------------------------------------------------------- */
// schedulePostRunThread: bookkeeping done after a thread has run.
// Records CURRENT_TIME in TimeOfLastYield and, in GRAN/PAR/EDEN builds,
// gathers statistics per return code: tot_heapover, tot_stackover and
// tot_yields are counted in globalGranStats or globalParStats, with
// GR_DESCHEDULE events dumped on the GranSim side. For a thread that stopped
// to block, GranSim also prunes its ContinueThread events (prune_eventq) and
// sets procStatus[CurrentProc] to Idle when the local run queue is empty
// and the processor is not in a synchronous fetch. An unknown return code
// ends in barf().
1440 schedulePostRunThread(void)
1443 /* HACK 675: if the last thread didn't yield, make sure to print a
1444 SCHEDULE event to the log file when StgRunning the next thread, even
1445 if it is the same one as before */
1447 TimeOfLastYield = CURRENT_TIME;
1450 /* some statistics gathering in the parallel case */
1452 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1456 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1457 globalGranStats.tot_heapover++;
1459 globalParStats.tot_heapover++;
1466 DumpGranEvent(GR_DESCHEDULE, t));
1467 globalGranStats.tot_stackover++;
1470 // DumpGranEvent(GR_DESCHEDULE, t);
1471 globalParStats.tot_stackover++;
1475 case ThreadYielding:
1478 DumpGranEvent(GR_DESCHEDULE, t));
1479 globalGranStats.tot_yields++;
1482 // DumpGranEvent(GR_DESCHEDULE, t);
1483 globalParStats.tot_yields++;
1489 debugTrace(DEBUG_sched,
1490 "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1491 t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
1492 (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1493 if (t->block_info.closure!=(StgClosure*)NULL)
1494 print_bq(t->block_info.closure);
1497 // ??? needed; should emit block before
1499 DumpGranEvent(GR_DESCHEDULE, t));
1500 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1503 ASSERT(procStatus[CurrentProc]==Busy ||
1504 ((procStatus[CurrentProc]==Fetching) &&
1505 (t->block_info.closure!=(StgClosure*)NULL)));
1506 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1507 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1508 procStatus[CurrentProc]==Fetching))
1509 procStatus[CurrentProc] = Idle;
1512 //++PAR++ blockThread() writes the event (change?)
1516 case ThreadFinished:
1520 barf("parGlobalStats: unknown return code");
1526 /* -----------------------------------------------------------------------------
1527 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1528 * -------------------------------------------------------------------------- */
// scheduleHandleHeapOverflow: handle a thread that stopped on heap overflow.
// cap: Capability whose nursery is inspected and possibly extended.
// t:   the thread that stopped.
// If the request (cap->r.rHpAlloc) is larger than BLOCK_SIZE and the nursery
// is not (nearly) full, a block group big enough for the request is
// allocated with allocGroup(), linked in front of rCurrentNursery, made the
// current nursery block, and t goes back on the run queue; rtsFalse is
// returned because no GC is needed.
// In every other case t is pushed back on the run queue and the GC itself
// is left to the end of the loop in schedule().
1531 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1533 // did the task ask for a large block?
1534 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1535 // if so, get one and push it on the front of the nursery.
1539 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1541 debugTrace(DEBUG_sched,
1542 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1543 (long)t->id, whatNext_strs[t->what_next], blocks);
1545 // don't do this if the nursery is (nearly) full, we'll GC first.
1546 if (cap->r.rCurrentNursery->link != NULL ||
1547 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1548 // if the nursery has only one block.
1551 bd = allocGroup( blocks );
1553 cap->r.rNursery->n_blocks += blocks;
1555 // link the new group into the list
1556 bd->link = cap->r.rCurrentNursery;
1557 bd->u.back = cap->r.rCurrentNursery->u.back;
1558 if (cap->r.rCurrentNursery->u.back != NULL) {
1559 cap->r.rCurrentNursery->u.back->link = bd;
1561 #if !defined(THREADED_RTS)
1562 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1563 g0s0 == cap->r.rNursery);
1565 cap->r.rNursery->blocks = bd;
1567 cap->r.rCurrentNursery->u.back = bd;
1569 // initialise it as a nursery block. We initialise the
1570 // step, gen_no, and flags field of *every* sub-block in
1571 // this large block, because this is easier than making
1572 // sure that we always find the block head of a large
1573 // block whenever we call Bdescr() (eg. evacuate() and
1574 // isAlive() in the GC would both have to do this, at
1578 for (x = bd; x < bd + blocks; x++) {
1579 x->step = cap->r.rNursery;
1585 // This assert can be a killer if the app is doing lots
1586 // of large block allocations.
1587 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1589 // now update the nursery to point to the new block
1590 cap->r.rCurrentNursery = bd;
1592 // we might be unlucky and have another thread get on the
1593 // run queue before us and steal the large block, but in that
1594 // case the thread will just end up requesting another large
1596 pushOnRunQueue(cap,t);
1597 return rtsFalse; /* not actually GC'ing */
1601 debugTrace(DEBUG_sched,
1602 "--<< thread %ld (%s) stopped: HeapOverflow\n",
1603 (long)t->id, whatNext_strs[t->what_next]);
1606 ASSERT(!is_on_queue(t,CurrentProc));
1607 #elif defined(PARALLEL_HASKELL)
1608 /* Currently we emit a DESCHEDULE event before GC in GUM.
1609 ToDo: either add separate event to distinguish SYSTEM time from rest
1610 or just nuke this DESCHEDULE (and the following SCHEDULE) */
// the constant 0 keeps this DESCHEDULE dump switched off
1611 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1612 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1613 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1614 emitSchedule = rtsTrue;
1618 pushOnRunQueue(cap,t);
1620 /* actual GC is done at the end of the while loop in schedule() */
1623 /* -----------------------------------------------------------------------------
1624 * Handle a thread that returned to the scheduler with ThreadStackOverflow
1625 * -------------------------------------------------------------------------- */
// scheduleHandleStackOverflow: handle a thread that stopped on stack overflow.
// cap:  Capability the thread runs on.
// task: Task whose tso field is compared with t, because enlarging the
//       stack may move the TSO.
// t:    the thread that overflowed.
// threadStackOverflow() returns the (possibly new) TSO, which is put back on
// the run queue with pushOnRunQueue().
1628 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1630 debugTrace (DEBUG_sched,
1631 "--<< thread %ld (%s) stopped, StackOverflow",
1632 (long)t->id, whatNext_strs[t->what_next]);
1634 /* just adjust the stack for this thread, then pop it back
1638 /* enlarge the stack */
1639 StgTSO *new_t = threadStackOverflow(cap, t);
1641 /* The TSO attached to this Task may have moved, so update the
1644 if (task->tso == t) {
1647 pushOnRunQueue(cap,new_t);
1651 /* -----------------------------------------------------------------------------
1652 * Handle a thread that returned to the scheduler with ThreadYielding
1653 * -------------------------------------------------------------------------- */
// scheduleHandleYield: handle a thread that returned with ThreadYielding.
// cap:            Capability the thread ran on.
// t:              the thread that yielded.
// prev_what_next: value t->what_next is compared against; a difference
//                 means the thread only stopped to switch evaluators.
// Traces the reason, asserts that t is not linked into a queue
// (t->link == END_TSO_QUEUE) and puts t back with addToRunQueue(). GranSim
// additionally creates a ContinueThread event via new_event().
1656 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1658 // Reset the context switch flag. We don't do this just before
1659 // running the thread, because that would mean we would lose ticks
1660 // during GC, which can lead to unfair scheduling (a thread hogs
1661 // the CPU because the tick always arrives during GC). This way
1662 // penalises threads that do a lot of allocation, but that seems
1663 // better than the alternative.
1666 /* put the thread back on the run queue. Then, if we're ready to
1667 * GC, check whether this is the last task to stop. If so, wake
1668 * up the GC thread. getThread will block during a GC until the
1672 if (t->what_next != prev_what_next) {
1673 debugTrace(DEBUG_sched,
1674 "--<< thread %ld (%s) stopped to switch evaluators",
1675 (long)t->id, whatNext_strs[t->what_next]);
1677 debugTrace(DEBUG_sched,
1678 "--<< thread %ld (%s) stopped, yielding",
1679 (long)t->id, whatNext_strs[t->what_next]);
1684 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1686 ASSERT(t->link == END_TSO_QUEUE);
1688 // Shortcut if we're just switching evaluators: don't bother
1689 // doing stack squeezing (which can be expensive), just run the
1691 if (t->what_next != prev_what_next) {
1696 ASSERT(!is_on_queue(t,CurrentProc));
1699 //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1700 checkThreadQsSanity(rtsTrue));
1704 addToRunQueue(cap,t);
1707 /* add a ContinueThread event to actually process the thread */
1708 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1710 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1712 debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1719 /* -----------------------------------------------------------------------------
1720 * Handle a thread that returned to the scheduler with ThreadBlocked
1721 * -------------------------------------------------------------------------- */
// scheduleHandleThreadBlocked: handle a thread that returned with
// ThreadBlocked.
// t: the blocked thread.
// The standard RTS has nothing to do except tracing, since the thread has
// already placed itself on the queue it needs to be on. GranSim dumps a
// GR_DESCHEDULE event, prunes ContinueThread events for t and may set the
// current processor to Idle; the parallel variant sets emitSchedule so that
// the next schedule is logged.
1724 scheduleHandleThreadBlocked( StgTSO *t
1725 #if !defined(GRAN) && !defined(DEBUG)
1732 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
1733 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1734 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1736 // ??? needed; should emit block before
1738 DumpGranEvent(GR_DESCHEDULE, t));
1739 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1742 ASSERT(procStatus[CurrentProc]==Busy ||
1743 ((procStatus[CurrentProc]==Fetching) &&
1744 (t->block_info.closure!=(StgClosure*)NULL)));
1745 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1746 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1747 procStatus[CurrentProc]==Fetching))
1748 procStatus[CurrentProc] = Idle;
1752 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
1753 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1756 if (t->block_info.closure!=(StgClosure*)NULL)
1757 print_bq(t->block_info.closure));
1759 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1762 /* whatever we schedule next, we must log that schedule */
1763 emitSchedule = rtsTrue;
1767 // We don't need to do anything. The thread is blocked, and it
1768 // has tidied up its stack and placed itself on whatever queue
1769 // it needs to be on.
1771 // ASSERT(t->why_blocked != NotBlocked);
1772 // Not true: for example,
1773 // - in THREADED_RTS, the thread may already have been woken
1774 // up by another Capability. This actually happens: try
1775 // conc023 +RTS -N2.
1776 // - the thread may have woken itself up already, because
1777 // threadPaused() might have raised a blocked throwTo
1778 // exception, see maybePerformBlockedException().
1781 if (traceClass(DEBUG_sched)) {
1782 debugTraceBegin("--<< thread %lu (%s) stopped: ",
1783 (unsigned long)t->id, whatNext_strs[t->what_next]);
1784 printThreadBlockage(t);
1789 /* Only for dumping event to log file
1790 ToDo: do I need this in GranSim, too?
1796 /* -----------------------------------------------------------------------------
1797 * Handle a thread that returned to the scheduler with ThreadFinished
1798 * -------------------------------------------------------------------------- */
// scheduleHandleThreadFinished: handle a thread that returned with
// ThreadFinished.
// cap:  Capability; annotated STG_UNUSED although the non-threaded branch
//       passes it to appendToRunQueue().
// task: the Task running this scheduler instance.
// t:    the thread that finished.
// After notifying Hpc and doing the GranSim / parallel clean-up, a thread
// whose bound Task (t->bound) is not task is re-queued (non-threaded RTS) or
// treated as fatal (threaded RTS). Otherwise the result is stored through
// task->ret (sp[1] on ThreadComplete, NULL otherwise), task->stat becomes
// Success, Interrupted or Killed, and rtsTrue is returned so that
// schedule() returns.
1801 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1803 /* Need to check whether this was a main thread, and if so,
1804 * return with the return value.
1806 * We also end up here if the thread kills itself with an
1807 * uncaught exception, see Exception.cmm.
1809 debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
1810 (unsigned long)t->id, whatNext_strs[t->what_next]);
1812 /* Inform the Hpc that a thread has finished */
1813 hs_hpc_thread_finished_event(t);
1816 endThread(t, CurrentProc); // clean-up the thread
1817 #elif defined(PARALLEL_HASKELL)
1818 /* For now all are advisory -- HWL */
1819 //if(t->priority==AdvisoryPriority) ??
1820 advisory_thread_count--; // JB: Caution with this counter, buggy!
1823 if(t->dist.priority==RevalPriority)
1827 # if defined(EDENOLD)
1828 // the thread could still have an outport... (BUG)
1829 if (t->eden.outport != -1) {
1830 // delete the outport for the tso which has finished...
1831 IF_PAR_DEBUG(eden_ports,
1832 debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1833 t->eden.outport, t->id));
1836 // thread still in the process (HEAVY BUG! since outport has just been closed...)
1837 if (t->eden.epid != -1) {
1838 IF_PAR_DEBUG(eden_ports,
1839 debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1840 t->id, t->eden.epid));
1841 removeTSOfromProcess(t);
1846 if (RtsFlags.ParFlags.ParStats.Full &&
1847 !RtsFlags.ParFlags.ParStats.Suppressed)
1848 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1850 // t->par only contains statistics: left out for now...
1852 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1853 t->id,t,t->par.sparkname));
1855 #endif // PARALLEL_HASKELL
1858 // Check whether the thread that just completed was a bound
1859 // thread, and if so return with the result.
1861 // There is an assumption here that all thread completion goes
1862 // through this point; we need to make sure that if a thread
1863 // ends up in the ThreadKilled state, that it stays on the run
1864 // queue so it can be dealt with here.
1869 if (t->bound != task) {
1870 #if !defined(THREADED_RTS)
1871 // Must be a bound thread that is not the topmost one. Leave
1872 // it on the run queue until the stack has unwound to the
1873 // point where we can deal with this. Leaving it on the run
1874 // queue also ensures that the garbage collector knows about
1875 // this thread and its return value (it gets dropped from the
1876 // all_threads list so there's no other way to find it).
1877 appendToRunQueue(cap,t);
1880 // this cannot happen in the threaded RTS, because a
1881 // bound thread can only be run by the appropriate Task.
1882 barf("finished bound thread that isn't mine");
1886 ASSERT(task->tso == t);
1888 if (t->what_next == ThreadComplete) {
1890 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1891 *(task->ret) = (StgClosure *)task->tso->sp[1];
1893 task->stat = Success;
1896 *(task->ret) = NULL;
1898 if (sched_state >= SCHED_INTERRUPTING) {
1899 task->stat = Interrupted;
1901 task->stat = Killed;
1905 removeThreadLabel((StgWord)task->tso->id);
1907 return rtsTrue; // tells schedule() to return
1913 /* -----------------------------------------------------------------------------
1914 * Perform a heap census, if PROFILING
1915 * -------------------------------------------------------------------------- */
// scheduleDoHeapProfile: PROFILING-only heap census.
// ready_to_gc: only consulted when profileInterval is 0 and heap profiling
//              is on, so that a census is taken at every GC.
// A census is triggered by performHeapProfile or by the condition above.
// Black holes are checked and a GarbageCollect(rtsTrue) is done first;
// performHeapProfile is then cleared and rtsTrue returned, meaning a GC has
// already happened.
1918 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1920 #if defined(PROFILING)
1921 // When we have +RTS -i0 and we're heap profiling, do a census at
1922 // every GC. This lets us get repeatable runs for debugging.
1923 if (performHeapProfile ||
1924 (RtsFlags.ProfFlags.profileInterval==0 &&
1925 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1927 // checking black holes is necessary before GC, otherwise
1928 // there may be threads that are unreachable except by the
1929 // blackhole queue, which the GC will consider to be
1931 scheduleCheckBlackHoles(&MainCapability);
1933 debugTrace(DEBUG_sched, "garbage collecting before heap census");
1934 GarbageCollect(rtsTrue);
1936 debugTrace(DEBUG_sched, "performing heap census");
1939 performHeapProfile = rtsFalse;
1940 return rtsTrue; // true <=> we already GC'd
1946 /* -----------------------------------------------------------------------------
1947 * Perform a garbage collection if necessary
1948 * -------------------------------------------------------------------------- */
// scheduleDoGC: run a garbage collection from the scheduler.
// cap:         Capability held by the caller (tested against NULL below).
// task:        current Task; only used in the threaded RTS.
// force_major: passed straight to GarbageCollect().
// Threaded RTS: waiting_for_gc is claimed with cas(); a task that finds
// another one already trying to GC yields its Capability until the flag is
// clear and returns cap. The claiming task collects every other Capability
// with waitForReturnCapability() and releases them again after the GC.
// Before collecting, every thread on all_threads gets
// maybePerformBlockedException(), threads with an invalid transaction are
// sent back to their atomically frame, black holes are checked, and - if
// sched_state >= SCHED_INTERRUPTING - all threads are deleted and the state
// moves to SCHED_SHUTTING_DOWN.
1951 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1955 static volatile StgWord waiting_for_gc;
1956 rtsBool was_waiting;
1961 // In order to GC, there must be no threads running Haskell code.
1962 // Therefore, the GC thread needs to hold *all* the capabilities,
1963 // and release them after the GC has completed.
1965 // This seems to be the simplest way: previous attempts involved
1966 // making all the threads with capabilities give up their
1967 // capabilities and sleep except for the *last* one, which
1968 // actually did the GC. But it's quite hard to arrange for all
1969 // the other tasks to sleep and stay asleep.
1972 was_waiting = cas(&waiting_for_gc, 0, 1);
1975 debugTrace(DEBUG_sched, "someone else is trying to GC...");
1976 if (cap) yieldCapability(&cap,task);
1977 } while (waiting_for_gc);
1978 return cap; // NOTE: task->cap might have changed here
1981 for (i=0; i < n_capabilities; i++) {
1982 debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1983 if (cap != &capabilities[i]) {
1984 Capability *pcap = &capabilities[i];
1985 // we better hope this task doesn't get migrated to
1986 // another Capability while we're waiting for this one.
1987 // It won't, because load balancing happens while we have
1988 // all the Capabilities, but even so it's a slightly
1989 // unsavoury invariant.
1992 waitForReturnCapability(&pcap, task);
1993 if (pcap != &capabilities[i]) {
1994 barf("scheduleDoGC: got the wrong capability");
// clear the flag; tasks looping on waiting_for_gc above stop yielding once
// it is zero
1999 waiting_for_gc = rtsFalse;
2002 /* Kick any transactions which are invalid back to their
2003 * atomically frames. When next scheduled they will try to
2004 * commit, this commit will fail and they will retry.
2009 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2010 if (t->what_next == ThreadRelocated) {
2013 next = t->global_link;
2015 // This is a good place to check for blocked
2016 // exceptions. It might be the case that a thread is
2017 // blocked on delivering an exception to a thread that
2018 // is also blocked - we try to ensure that this
2019 // doesn't happen in throwTo(), but it's too hard (or
2020 // impossible) to close all the race holes, so we
2021 // accept that some might get through and deal with
2022 // them here. A GC will always happen at some point,
2023 // even if the system is otherwise deadlocked.
2024 maybePerformBlockedException (&capabilities[0], t);
2026 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2027 if (!stmValidateNestOfTransactions (t -> trec)) {
2028 debugTrace(DEBUG_sched | DEBUG_stm,
2029 "trec %p found wasting its time", t);
2031 // strip the stack back to the
2032 // ATOMICALLY_FRAME, aborting the (nested)
2033 // transaction, and saving the stack of any
2034 // partially-evaluated thunks on the heap.
2035 throwToSingleThreaded_(&capabilities[0], t,
2036 NULL, rtsTrue, NULL);
2039 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2047 // so this happens periodically:
2048 if (cap) scheduleCheckBlackHoles(cap);
2050 IF_DEBUG(scheduler, printAllThreads());
2053 * We now have all the capabilities; if we're in an interrupting
2054 * state, then we should take the opportunity to delete all the
2055 * threads in the system.
2057 if (sched_state >= SCHED_INTERRUPTING) {
2058 deleteAllThreads(&capabilities[0]);
2059 sched_state = SCHED_SHUTTING_DOWN;
2062 /* everybody back, start the GC.
2063 * Could do it in this thread, or signal a condition var
2064 * to do it in another thread. Either way, we need to
2065 * broadcast on gc_pending_cond afterward.
2067 #if defined(THREADED_RTS)
2068 debugTrace(DEBUG_sched, "doing GC");
2070 GarbageCollect(force_major);
2072 #if defined(THREADED_RTS)
2073 // release our stash of capabilities.
2074 for (i = 0; i < n_capabilities; i++) {
2075 if (cap != &capabilities[i]) {
2076 task->cap = &capabilities[i];
2077 releaseCapability(&capabilities[i]);
2088 /* add a ContinueThread event to continue execution of current thread */
2089 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2091 t, (StgClosure*)NULL, (rtsSpark*)NULL);
2093 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2101 /* ---------------------------------------------------------------------------
2102 * Singleton fork(). Do not copy any running threads.
2103 * ------------------------------------------------------------------------- */
// forkProcess: fork the process and run an IO action in the child only.
// entry: stable pointer handed to rts_evalStableIO() in the child.
// Threaded RTS with RtsFlags.ParFlags.nNodes > 1: reports an error and exits.
// Parent (pid != 0): just returns the pid.
// Child: deletes every Haskell thread with deleteThread_(), empties the run
// queue, the suspended_ccalling_tasks list and all_threads, goes over the
// task list under sched_mutex skipping the current Task, clears the
// worker / returning-task lists (threaded RTS), runs entry, then calls
// hs_exit() and stg_exit(EXIT_SUCCESS).
// Without FORKPROCESS_PRIMOP_SUPPORTED the function only calls barf().
2106 forkProcess(HsStablePtr *entry
2107 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2112 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2118 #if defined(THREADED_RTS)
2119 if (RtsFlags.ParFlags.nNodes > 1) {
2120 errorBelch("forking not supported with +RTS -N<n> greater than 1");
2121 stg_exit(EXIT_FAILURE);
2125 debugTrace(DEBUG_sched, "forking!");
2127 // ToDo: for SMP, we should probably acquire *all* the capabilities
2132 if (pid) { // parent
2134 // just return the pid
2140 // Now, all OS threads except the thread that forked are
2141 // stopped. We need to stop all Haskell threads, including
2142 // those involved in foreign calls. Also we need to delete
2143 // all Tasks, because they correspond to OS threads that are
2146 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2147 if (t->what_next == ThreadRelocated) {
2150 next = t->global_link;
2151 // don't allow threads to catch the ThreadKilled
2152 // exception, but we do want to raiseAsync() because these
2153 // threads may be evaluating thunks that we need later.
2154 deleteThread_(cap,t);
2158 // Empty the run queue. It seems tempting to let all the
2159 // killed threads stay on the run queue as zombies to be
2160 // cleaned up later, but some of them correspond to bound
2161 // threads for which the corresponding Task does not exist.
2162 cap->run_queue_hd = END_TSO_QUEUE;
2163 cap->run_queue_tl = END_TSO_QUEUE;
2165 // Any suspended C-calling Tasks are no more, their OS threads
2167 cap->suspended_ccalling_tasks = NULL;
2169 // Empty the all_threads list. Otherwise, the garbage
2170 // collector may attempt to resurrect some of these threads.
2171 all_threads = END_TSO_QUEUE;
2173 // Wipe the task list, except the current Task.
2174 ACQUIRE_LOCK(&sched_mutex);
2175 for (task = all_tasks; task != NULL; task=task->all_link) {
2176 if (task != cap->running_task) {
2180 RELEASE_LOCK(&sched_mutex);
2182 #if defined(THREADED_RTS)
2183 // Wipe our spare workers list, they no longer exist. New
2184 // workers will be created if necessary.
2185 cap->spare_workers = NULL;
2186 cap->returning_tasks_hd = NULL;
2187 cap->returning_tasks_tl = NULL;
2190 cap = rts_evalStableIO(cap, entry, NULL); // run the action
2191 rts_checkSchedStatus("forkProcess",cap);
2194 hs_exit(); // clean up and exit
2195 stg_exit(EXIT_SUCCESS);
2197 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2198 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2203 /* ---------------------------------------------------------------------------
2204 * Delete all the threads in the system
2205 * ------------------------------------------------------------------------- */
// deleteAllThreads: call deleteThread() on every thread in all_threads.
// cap: Capability passed to deleteThread().
// Only safe when the caller owns all capabilities. The killed threads are
// deliberately left on the run queue. In the non-threaded RTS the blocked
// and sleeping queues are asserted to be empty afterwards.
2208 deleteAllThreads ( Capability *cap )
2210 // NOTE: only safe to call if we own all capabilities.
2213 debugTrace(DEBUG_sched,"deleting all threads");
2214 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2215 if (t->what_next == ThreadRelocated) {
2218 next = t->global_link;
2219 deleteThread(cap,t);
2223 // The run queue now contains a bunch of ThreadKilled threads. We
2224 // must not throw these away: the main thread(s) will be in there
2225 // somewhere, and the main scheduler loop has to deal with it.
2226 // Also, the run queue is the only thing keeping these threads from
2227 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2229 #if !defined(THREADED_RTS)
2230 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2231 ASSERT(sleeping_queue == END_TSO_QUEUE);
2235 /* -----------------------------------------------------------------------------
2236 Managing the suspended_ccalling_tasks list.
2237 Locks required: sched_mutex
2238 -------------------------------------------------------------------------- */
// suspendTask: put task at the head of cap->suspended_ccalling_tasks.
// cap:  Capability owning the list.
// task: Task to insert; must not be linked yet (next and prev are NULL).
// The list is doubly linked through task->next / task->prev.
// NOTE(review): the banner above names sched_mutex as the required lock,
// while suspendThread() calls this holding cap->lock - confirm which one
// is current.
2241 suspendTask (Capability *cap, Task *task)
2243 ASSERT(task->next == NULL && task->prev == NULL);
2244 task->next = cap->suspended_ccalling_tasks;
2246 if (cap->suspended_ccalling_tasks) {
2247 cap->suspended_ccalling_tasks->prev = task;
2249 cap->suspended_ccalling_tasks = task;
// recoverSuspendedTask: unlink task from cap->suspended_ccalling_tasks.
// cap:  Capability owning the list.
// task: Task to remove. Its predecessor (or the list head, which is asserted
//       to be task) and its successor are re-pointed around it, then
//       task->next and task->prev are reset to NULL.
2253 recoverSuspendedTask (Capability *cap, Task *task)
2256 task->prev->next = task->next;
2258 ASSERT(cap->suspended_ccalling_tasks == task);
2259 cap->suspended_ccalling_tasks = task->next;
2262 task->next->prev = task->prev;
2264 task->next = task->prev = NULL;
2267 /* ---------------------------------------------------------------------------
2268 * Suspending & resuming Haskell threads.
2270 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2271 * its capability before calling the C function. This allows another
2272 * task to pick up the capability and carry on running Haskell
2273 * threads. It also means that if the C call blocks, it won't lock
2276 * The Haskell thread making the C call is put to sleep for the
2277 * duration of the call, on the suspended_ccalling_tasks queue. We
2278 * give out a token to the task, which it can use to resume the thread
2279 * on return from the C function.
2280 * ------------------------------------------------------------------------- */
// suspendThread: called with the register table of the current Capability
// when a Haskell thread makes a safe foreign call. It marks the current TSO
// as blocked on a C call, records it in task->suspended_tso, puts the task on
// the capability's suspended list and releases the capability.
// errno is saved on entry and restored at the end.
// NOTE(review): this listing omits some lines (declarations, braces, the
// else keyword, the return statement); comments cover visible lines only.
2283 suspendThread (StgRegTable *reg)
2286 int saved_errno = errno;
2290 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2292 cap = regTableToCapability(reg);
2294 task = cap->running_task;
2295 tso = cap->r.rCurrentTSO;
2297 debugTrace(DEBUG_sched,
2298 "thread %lu did a safe foreign call",
2299 (unsigned long)cap->r.rCurrentTSO->id);
2301 // XXX this might not be necessary --SDM
2302 tso->what_next = ThreadRunGHC;
2304 threadPaused(cap,tso);
// If TSO_BLOCKEX was not already set, set it (and clear TSO_INTERRUPTIBLE)
// and record BlockedOnCCall so resumeThread() knows to clear it again.
2306 if ((tso->flags & TSO_BLOCKEX) == 0) {
2307 tso->why_blocked = BlockedOnCCall;
2308 tso->flags |= TSO_BLOCKEX;
2309 tso->flags &= ~TSO_INTERRUPTIBLE;
// Presumably the else branch: the flag was already set -- TODO confirm.
2311 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2314 // Hand back capability
2315 task->suspended_tso = tso;
2317 ACQUIRE_LOCK(&cap->lock);
2319 suspendTask(cap,task);
2320 cap->in_haskell = rtsFalse;
2321 releaseCapability_(cap);
2323 RELEASE_LOCK(&cap->lock);
2325 #if defined(THREADED_RTS)
2326 /* Preparing to leave the RTS, so ensure there's a native thread/task
2327 waiting to take over.
2329 debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2332 errno = saved_errno;
// resumeThread: counterpart of suspendThread(). Waits for a capability,
// removes the task from that capability's suspended list, and reinstates
// task->suspended_tso as the capability's current TSO.
// errno is saved on entry and restored before the final bookkeeping.
// NOTE(review): this listing omits some lines (declarations, the cast of
// task_, the return statement); comments cover visible lines only.
2337 resumeThread (void *task_)
2341 int saved_errno = errno;
2345 // Wait for permission to re-enter the RTS with the result.
2346 waitForReturnCapability(&cap,task);
2347 // we might be on a different capability now... but if so, our
2348 // entry on the suspended_ccalling_tasks list will also have been
2351 // Remove the thread from the suspended list
2352 recoverSuspendedTask(cap,task);
2354 tso = task->suspended_tso;
2355 task->suspended_tso = NULL;
2356 tso->link = END_TSO_QUEUE;
2357 debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
// BlockedOnCCall means suspendThread() set TSO_BLOCKEX itself, so undo that
// here and wake anything queued via awakenBlockedExceptionQueue().
2359 if (tso->why_blocked == BlockedOnCCall) {
2360 awakenBlockedExceptionQueue(cap,tso);
2361 tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2364 /* Reset blocking status */
2365 tso->why_blocked = NotBlocked;
2367 cap->r.rCurrentTSO = tso;
2368 cap->in_haskell = rtsTrue;
2369 errno = saved_errno;
2371 /* We might have GC'd, mark the TSO dirty again */
2374 IF_DEBUG(sanity, checkTSO(tso));
2379 /* ---------------------------------------------------------------------------
2382 * scheduleThread puts a thread on the end of the runnable queue.
2383 * This will usually be done immediately after a thread is created.
2384 * The caller of scheduleThread must create the thread using e.g.
2385 * createThread and push an appropriate closure
2386 * on this thread's stack before the scheduler is invoked.
2387 * ------------------------------------------------------------------------ */
// scheduleThread: appends `tso` to the run queue of `cap`.
2390 scheduleThread(Capability *cap, StgTSO *tso)
2392 // The thread goes at the *end* of the run-queue, to avoid possible
2393 // starvation of any threads already on the queue.
2394 appendToRunQueue(cap,tso);
// scheduleThreadOn: schedules `tso` on the capability selected by `cpu`.
// In the threaded RTS the TSO is flagged TSO_LOCKED, `cpu` is reduced modulo
// RtsFlags.ParFlags.nNodes, and the thread is either appended to this
// capability's run queue (when cpu == cap->no) or handed to
// migrateThreadToCapability_lock() for capabilities[cpu].
// NOTE(review): the else/#else/#endif lines are missing from this listing;
// the final appendToRunQueue is presumably the non-threaded branch.
2398 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2400 #if defined(THREADED_RTS)
2401 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2402 // move this thread from now on.
2403 cpu %= RtsFlags.ParFlags.nNodes;
2404 if (cpu == cap->no) {
2405 appendToRunQueue(cap,tso);
2407 migrateThreadToCapability_lock(&capabilities[cpu],tso);
2410 appendToRunQueue(cap,tso);
// scheduleWaitThread: queues `tso` on `cap` as a bound thread for the
// capability's running task, then runs schedule() until that task has a
// status other than NoStatus (asserted after schedule() returns).
// NOTE(review): this listing omits some lines (declarations, the Task/TSO
// binding assignments, the return); comments cover visible lines only.
2415 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2419 // We already created/initialised the Task
2420 task = cap->running_task;
2422 // This TSO is now a bound thread; make the Task and TSO
2423 // point to each other.
2429 task->stat = NoStatus;
2431 appendToRunQueue(cap,tso);
2433 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2436 /* GranSim specific init */
// NOTE(review): `m` is not declared in the visible lines of this function;
// this GranSim-only code may be stale -- confirm before enabling it.
2437 CurrentTSO = m->tso; // the TSO to run
2438 procStatus[MainProc] = Busy; // status of main PE
2439 CurrentProc = MainProc; // PE to run it on
// schedule() may hand back a different Capability, hence the reassignment.
2442 cap = schedule(cap,task);
2444 ASSERT(task->stat != NoStatus);
2445 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2447 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2451 /* ----------------------------------------------------------------------------
2453 * ------------------------------------------------------------------------- */
// workerStart (threaded RTS only): entry routine for a worker task. It
// synchronises on task->lock, runs schedule(), and on return releases the
// capability it holds and stops the worker task.
// NOTE(review): this listing omits some lines (how `cap` is obtained, the
// thread-local Task assignment); comments cover visible lines only.
2455 #if defined(THREADED_RTS)
2457 workerStart(Task *task)
2461 // See startWorkerTask().
2462 ACQUIRE_LOCK(&task->lock);
2464 RELEASE_LOCK(&task->lock);
2466 // set the thread-local pointer to the Task:
2469 // schedule() runs without a lock.
2470 cap = schedule(cap,task);
2472 // On exit from schedule(), we have a Capability.
2473 releaseCapability(cap);
2474 workerTaskStop(task);
2478 /* ---------------------------------------------------------------------------
2481 * Initialise the scheduler. This resets all the queues - if the
2482 * queues contained any threads, they'll be garbage collected at the
2485 * ------------------------------------------------------------------------ */
2492 for (i=0; i<=MAX_PROC; i++) {
2493 run_queue_hds[i] = END_TSO_QUEUE;
2494 run_queue_tls[i] = END_TSO_QUEUE;
2495 blocked_queue_hds[i] = END_TSO_QUEUE;
2496 blocked_queue_tls[i] = END_TSO_QUEUE;
2497 ccalling_threadss[i] = END_TSO_QUEUE;
2498 blackhole_queue[i] = END_TSO_QUEUE;
2499 sleeping_queue = END_TSO_QUEUE;
2501 #elif !defined(THREADED_RTS)
2502 blocked_queue_hd = END_TSO_QUEUE;
2503 blocked_queue_tl = END_TSO_QUEUE;
2504 sleeping_queue = END_TSO_QUEUE;
2507 blackhole_queue = END_TSO_QUEUE;
2508 all_threads = END_TSO_QUEUE;
2511 sched_state = SCHED_RUNNING;
2513 #if defined(THREADED_RTS)
2514 /* Initialise the mutex and condition variables used by
2516 initMutex(&sched_mutex);
2519 ACQUIRE_LOCK(&sched_mutex);
2521 /* A capability holds the state a native thread needs in
2522 * order to execute STG code. At least one capability is
2523 * floating around (only THREADED_RTS builds have more than one).
2529 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2533 #if defined(THREADED_RTS)
2535 * Eagerly start one worker to run each Capability, except for
2536 * Capability 0. The idea is that we're probably going to start a
2537 * bound thread on Capability 0 pretty soon, so we don't want a
2538 * worker task hogging it.
2543 for (i = 1; i < n_capabilities; i++) {
2544 cap = &capabilities[i];
2545 ACQUIRE_LOCK(&cap->lock);
2546 startWorkerTask(cap, workerStart);
2547 RELEASE_LOCK(&cap->lock);
2552 trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2554 RELEASE_LOCK(&sched_mutex);
// exitScheduler: shuts the scheduler down. If shutdown has not started yet
// it sets sched_state to SCHED_INTERRUPTING and runs scheduleDoGC(), then
// moves to SCHED_SHUTTING_DOWN. In the threaded RTS it shuts down every
// Capability using a freshly created bound task.
// NOTE(review): this listing omits some lines (declarations, #else/#endif);
// the final freeCapability(&MainCapability) is presumably the non-threaded
// branch -- TODO confirm.
2558 exitScheduler( void )
2562 #if defined(THREADED_RTS)
2563 ACQUIRE_LOCK(&sched_mutex);
2564 task = newBoundTask();
2565 RELEASE_LOCK(&sched_mutex);
2568 // If we haven't killed all the threads yet, do it now.
2569 if (sched_state < SCHED_SHUTTING_DOWN) {
2570 sched_state = SCHED_INTERRUPTING;
2571 scheduleDoGC(NULL,task,rtsFalse);
2573 sched_state = SCHED_SHUTTING_DOWN;
2575 #if defined(THREADED_RTS)
2579 for (i = 0; i < n_capabilities; i++) {
2580 shutdownCapability(&capabilities[i], task);
2582 boundTaskExiting(task);
2586 freeCapability(&MainCapability);
// freeScheduler: releases scheduler resources. The capabilities array is
// freed with stgFree() only when n_capabilities != 1 (presumably the single
// capability case is not heap-allocated -- TODO confirm), and in the
// threaded RTS sched_mutex is closed.
2591 freeScheduler( void )
2594 if (n_capabilities != 1) {
2595 stgFree(capabilities);
2597 #if defined(THREADED_RTS)
2598 closeMutex(&sched_mutex);
2602 /* ---------------------------------------------------------------------------
2603 Where are the roots that we know about?
2605 - all the threads on the runnable queue
2606 - all the threads on the blocked queue
2607 - all the threads on the sleeping queue
2608 - all the threads currently executing a _ccall_GC
2609 - all the "main threads"
2611 ------------------------------------------------------------------------ */
2613 /* This has to be protected either by the scheduler monitor, or by the
2614 garbage collection monitor (probably the latter).
// GetRoots: applies `evac` to every scheduler-owned root pointer: the
// per-capability run queues (plus wakeup queues and suspended TSOs in the
// threaded RTS), the blocked/sleeping queues in the non-threaded RTS, the
// spark queue, and the signal handlers.
// NOTE(review): this listing omits some lines (declarations, #if/#else/#endif,
// braces); comments cover visible lines only.
2619 GetRoots( evac_fn evac )
// GranSim variant: evacuate each processor's queue heads and tails, skipping
// entries that are END_TSO_QUEUE or NULL.
2626 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2627 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2628 evac((StgClosure **)&run_queue_hds[i]);
2629 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2630 evac((StgClosure **)&run_queue_tls[i]);
2632 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2633 evac((StgClosure **)&blocked_queue_hds[i]);
2634 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2635 evac((StgClosure **)&blocked_queue_tls[i]);
// Evacuate the same array that is tested; the previous code tested
// ccalling_threadss[i] but passed &ccalling_threads[i].
2636 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2637 evac((StgClosure **)&ccalling_threadss[i]);
// Standard variant: evacuate the queues hanging off every Capability.
2644 for (i = 0; i < n_capabilities; i++) {
2645 cap = &capabilities[i];
2646 evac((StgClosure **)(void *)&cap->run_queue_hd);
2647 evac((StgClosure **)(void *)&cap->run_queue_tl);
2648 #if defined(THREADED_RTS)
2649 evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2650 evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
// TSOs suspended in foreign calls are reachable only through their Task.
2652 for (task = cap->suspended_ccalling_tasks; task != NULL;
2654 debugTrace(DEBUG_sched,
2655 "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
2656 evac((StgClosure **)(void *)&task->suspended_tso);
2662 #if !defined(THREADED_RTS)
2663 evac((StgClosure **)(void *)&blocked_queue_hd);
2664 evac((StgClosure **)(void *)&blocked_queue_tl);
2665 evac((StgClosure **)(void *)&sleeping_queue);
2669 // evac((StgClosure **)&blackhole_queue);
2671 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2672 markSparkQueue(evac);
2675 #if defined(RTS_USER_SIGNALS)
2676 // mark the signal handlers (signals should be already blocked)
2677 markSignalHandlers(evac);
2681 /* -----------------------------------------------------------------------------
2684 This is the interface to the garbage collector from Haskell land.
2685 We provide this so that external C code can allocate and garbage
2686 collect when called from Haskell via _ccall_GC.
2687 -------------------------------------------------------------------------- */
// performGC_: runs a garbage collection via scheduleDoGC() on behalf of
// external code. `force_major` is passed straight through to scheduleDoGC().
// A new bound task is created under sched_mutex for the duration of the
// collection and released with boundTaskExiting() afterwards.
2690 performGC_(rtsBool force_major)
2693 // We must grab a new Task here, because the existing Task may be
2694 // associated with a particular Capability, and chained onto the
2695 // suspended_ccalling_tasks queue.
2696 ACQUIRE_LOCK(&sched_mutex);
2697 task = newBoundTask();
2698 RELEASE_LOCK(&sched_mutex);
2699 scheduleDoGC(NULL,task,force_major);
2700 boundTaskExiting(task);
// Wrapper body: requests a collection without forcing a major GC.
// NOTE(review): the enclosing function header (presumably performGC) is
// missing from this listing -- TODO confirm.
2706 performGC_(rtsFalse);
// performMajorGC: requests a collection with force_major set to rtsTrue.
2710 performMajorGC(void)
2712 performGC_(rtsTrue);
2715 /* -----------------------------------------------------------------------------
2718 If the thread has reached its maximum stack size, then raise the
2719 StackOverflow exception in the offending thread. Otherwise
2720 relocate the TSO into a larger chunk of memory and adjust its stack
2722 -------------------------------------------------------------------------- */
// threadStackOverflow: handles a TSO that has run out of stack. If the stack
// is already at max_stack_size the thread is sent the StackOverflow
// exception; otherwise a larger TSO is allocated, the TSO header and live
// stack are copied into it, and the old TSO is marked ThreadRelocated.
// NOTE(review): this listing omits some lines (declarations, braces, return
// statements, unlock calls, pointer fix-ups); comments cover visible lines only.
2725 threadStackOverflow(Capability *cap, StgTSO *tso)
2727 nat new_stack_size, stack_words;
2732 IF_DEBUG(sanity,checkTSO(tso));
2734 // don't allow throwTo() to modify the blocked_exceptions queue
2735 // while we are moving the TSO:
2736 lockClosure((StgClosure *)tso);
// Already at the limit: cannot grow, so raise the exception instead.
2738 if (tso->stack_size >= tso->max_stack_size) {
2740 debugTrace(DEBUG_gc,
2741 "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2742 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2744 /* If we're debugging, just print out the top of the stack */
2745 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2748 // Send this thread the StackOverflow exception
2750 throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2754 /* Try to double the current stack size. If that takes us over the
2755 * maximum stack size for this thread, then use the maximum instead.
2756 * Finally round up so the TSO ends up as a whole number of blocks.
2758 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2759 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2760 TSO_STRUCT_SIZE)/sizeof(W_);
2761 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
// Rounding may have enlarged the TSO, so recompute the usable stack size.
2762 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
// NOTE(review): new_stack_size is declared nat but printed with %d below;
// confirm that nat is int-compatible for this format.
2764 debugTrace(DEBUG_sched,
2765 "increasing stack size from %ld words to %d.",
2766 (long)tso->stack_size, new_stack_size);
2768 dest = (StgTSO *)allocate(new_tso_size);
2769 TICK_ALLOC_TSO(new_stack_size,0);
2771 /* copy the TSO block and the old stack into the new area */
2772 memcpy(dest,tso,TSO_STRUCT_SIZE);
// Only the words between sp and the end of the old stack are copied, and
// they are placed at the very end of the new TSO.
2773 stack_words = tso->stack + tso->stack_size - tso->sp;
2774 new_sp = (P_)dest + new_tso_size - stack_words;
2775 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2777 /* relocate the stack pointers... */
2779 dest->stack_size = new_stack_size;
2781 /* Mark the old TSO as relocated. We have to check for relocated
2782 * TSOs in the garbage collector and any primops that deal with TSOs.
2784 * It's important to set the sp value to just beyond the end
2785 * of the stack, so we don't attempt to scavenge any part of the
2788 tso->what_next = ThreadRelocated;
2790 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2791 tso->why_blocked = NotBlocked;
2793 IF_PAR_DEBUG(verbose,
2794 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2795 tso->id, tso, tso->stack_size);
2796 /* If we're debugging, just print out the top of the stack */
2797 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2803 IF_DEBUG(sanity,checkTSO(dest));
2805 IF_DEBUG(scheduler,printTSO(dest));
2811 /* ---------------------------------------------------------------------------
2813 - usually called inside a signal handler so it mustn't do anything fancy.
2814 ------------------------------------------------------------------------ */
// interruptStgRts: requests an interrupt by setting sched_state to
// SCHED_INTERRUPTING.
// NOTE(review): any further statements of this function are missing from
// this listing.
2817 interruptStgRts(void)
2819 sched_state = SCHED_INTERRUPTING;
2824 /* -----------------------------------------------------------------------------
2827 This function causes at least one OS thread to wake up and run the
2828 scheduler loop. It is invoked when the RTS might be deadlocked, or
2829 an external event has arrived that may need servicing (eg. a
2830 keyboard interrupt).
2832 In the single-threaded RTS we don't do anything here; we only have
2833 one thread anyway, and the event that caused us to want to wake up
2834 will have interrupted any blocking system call in progress anyway.
2835 -------------------------------------------------------------------------- */
2840 #if defined(THREADED_RTS)
2841 // This forces the IO Manager thread to wakeup, which will
2842 // in turn ensure that some OS thread wakes up and runs the
2843 // scheduler loop, which will cause a GC and deadlock check.
2848 /* -----------------------------------------------------------------------------
2851 * Check the blackhole_queue for threads that can be woken up. We do
2852 * this periodically: before every GC, and whenever the run queue is
2855 * An elegant solution might be to just wake up all the blocked
2856 * threads with awakenBlockedQueue occasionally: they'll go back to
2857 * sleep again if the object is still a BLACKHOLE. Unfortunately this
2858 * doesn't give us a way to tell whether we've actually managed to
2859 * wake up any threads, so we would be busy-waiting.
2861 * -------------------------------------------------------------------------- */
// checkBlackHoles: scans the global blackhole_queue and unblocks every TSO
// whose blocking closure is no longer a BLACKHOLE or CAF_BLACKHOLE.
// any_woke_up records whether at least one thread was unblocked (presumably
// it is the return value -- TODO confirm; the return is not in this listing).
// The caller must hold sched_mutex (asserted below).
// NOTE(review): this listing omits some lines (declarations, queue relinking
// through `prev`, the else branch); comments cover visible lines only.
2864 checkBlackHoles (Capability *cap)
2867 rtsBool any_woke_up = rtsFalse;
2870 // blackhole_queue is global:
2871 ASSERT_LOCK_HELD(&sched_mutex);
2873 debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2875 // ASSUMES: sched_mutex
2876 prev = &blackhole_queue;
2877 t = blackhole_queue;
2878 while (t != END_TSO_QUEUE) {
2879 ASSERT(t->why_blocked == BlockedOnBlackHole);
2880 type = get_itbl(t->block_info.closure)->type;
// The closure has been updated, so the thread can run again.
2881 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2882 IF_DEBUG(sanity,checkTSO(t));
// unblockOne() returns the TSO to continue the scan from.
2883 t = unblockOne(cap, t);
2884 // urk, the threads migrate to the current capability
2885 // here, but we'd like to keep them on the original one.
2887 any_woke_up = rtsTrue;
2897 /* -----------------------------------------------------------------------------
2900 This is used for interruption (^C) and forking, and corresponds to
2901 raising an exception but without letting the thread catch the
2903 -------------------------------------------------------------------------- */
// deleteThread: kills `tso` by calling throwToSingleThreaded() with a NULL
// exception, unless the thread is currently blocked in a foreign call
// (BlockedOnCCall or BlockedOnCCall_NoUnblockExc), in which case it is left
// untouched.
2906 deleteThread (Capability *cap, StgTSO *tso)
2908 // NOTE: must only be called on a TSO that we have exclusive
2909 // access to, because we will call throwToSingleThreaded() below.
2910 // The TSO must be on the run queue of the Capability we own, or
2911 // we must own all Capabilities.
2913 if (tso->why_blocked != BlockedOnCCall &&
2914 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2915 throwToSingleThreaded(cap,tso,NULL);
// deleteThread_ (only when FORKPROCESS_PRIMOP_SUPPORTED): variant of
// deleteThread() that also handles threads blocked in foreign calls, by
// unblocking them and setting what_next to ThreadKilled.
// NOTE(review): the else keyword before the deleteThread() call is missing
// from this listing; it is presumably the fallback branch.
2919 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2921 deleteThread_(Capability *cap, StgTSO *tso)
2922 { // for forkProcess only:
2923 // like deleteThread(), but we delete threads in foreign calls, too.
2925 if (tso->why_blocked == BlockedOnCCall ||
2926 tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2927 unblockOne(cap,tso);
2928 tso->what_next = ThreadKilled;
2930 deleteThread(cap,tso);
2935 /* -----------------------------------------------------------------------------
2936 raiseExceptionHelper
2938 This function is called by the raise# primitive, just so that we can
2939 move some of the tricky bits of raising an exception from C-- into
2940 C. Who knows, it might be a useful reusable thing here too.
2941 -------------------------------------------------------------------------- */
// raiseExceptionHelper: walks the stack frames of `tso` looking for a frame
// that handles the exception. Update frames met on the way have their
// updatee overwritten with a lazily allocated 'raise# exception' thunk.
// The visible cases return the frame type that stopped the walk
// (ATOMICALLY_FRAME, CATCH_STM_FRAME).
// NOTE(review): this listing omits many lines (declarations, the loop
// header, case labels, sp updates, remaining cases); comments cover visible
// lines only.
2944 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2946 Capability *cap = regTableToCapability(reg);
2947 StgThunk *raise_closure = NULL;
2949 StgRetInfoTable *info;
2951 // This closure represents the expression 'raise# E' where E
2952 // is the exception raised. It is used to overwrite all the
2953 // thunks which are currently under evaluation.
2956 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2957 // LDV profiling: stg_raise_info has THUNK as its closure
2958 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2959 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2960 // 1 does not cause any problem unless profiling is performed.
2961 // However, when LDV profiling goes on, we need to linearly scan
2962 // small object pool, where raise_closure is stored, so we should
2963 // use MIN_UPD_SIZE.
2965 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2966 // sizeofW(StgClosure)+1);
2970 // Walk up the stack, looking for the catch frame. On the way,
2971 // we update any closures pointed to from update frames with the
2972 // raise closure that we just built.
2976 info = get_ret_itbl((StgClosure *)p);
// Precompute the address of the following frame from this frame's size.
2977 next = p + stack_frame_sizeW((StgClosure *)p);
2978 switch (info->i.type) {
2981 // Only create raise_closure if we need to.
2982 if (raise_closure == NULL) {
2984 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2985 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2986 raise_closure->payload[0] = exception;
2988 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2992 case ATOMICALLY_FRAME:
2993 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2995 return ATOMICALLY_FRAME;
3001 case CATCH_STM_FRAME:
3002 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3004 return CATCH_STM_FRAME;
3010 case CATCH_RETRY_FRAME:
3019 /* -----------------------------------------------------------------------------
3020 findRetryFrameHelper
3022 This function is called by the retry# primitive. It traverses the stack
3023 leaving tso->sp referring to the frame which should handle the retry.
3025 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3026 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
3028 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3029 create) because retries are not considered to be exceptions, despite the
3030 similar implementation.
3032 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3033 not be created within memory transactions.
3034 -------------------------------------------------------------------------- */
// findRetryFrameHelper: walks the stack frames of `tso` and returns the type
// of the frame that should handle a retry: ATOMICALLY_FRAME or
// CATCH_RETRY_FRAME. A CATCH_STM_FRAME on the way has its nested transaction
// aborted and freed, and tso->trec is reset to the enclosing transaction
// record. CATCH_FRAME and STOP_FRAME are asserted never to occur.
// NOTE(review): this listing omits some lines (declarations, the loop
// header, sp updates, the default label); comments cover visible lines only.
3037 findRetryFrameHelper (StgTSO *tso)
3040 StgRetInfoTable *info;
3044 info = get_ret_itbl((StgClosure *)p);
3045 next = p + stack_frame_sizeW((StgClosure *)p);
3046 switch (info->i.type) {
3048 case ATOMICALLY_FRAME:
3049 debugTrace(DEBUG_stm,
3050 "found ATOMICALLY_FRAME at %p during retry", p);
3052 return ATOMICALLY_FRAME;
// NOTE(review): "retrry" in the trace message below looks like a typo for
// "retry"; left unchanged because it is a runtime string.
3054 case CATCH_RETRY_FRAME:
3055 debugTrace(DEBUG_stm,
3056 "found CATCH_RETRY_FRAME at %p during retrry", p);
3058 return CATCH_RETRY_FRAME;
3060 case CATCH_STM_FRAME: {
3061 debugTrace(DEBUG_stm,
3062 "found CATCH_STM_FRAME at %p during retry", p);
// Roll back the nested transaction and continue with its parent.
3063 StgTRecHeader *trec = tso -> trec;
3064 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3065 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3066 stmAbortTransaction(tso -> cap, trec);
3067 stmFreeAbortedTRec(tso -> cap, trec);
3068 tso -> trec = outer;
3075 ASSERT(info->i.type != CATCH_FRAME);
3076 ASSERT(info->i.type != STOP_FRAME);
3083 /* -----------------------------------------------------------------------------
3084 resurrectThreads is called after garbage collection on the list of
3085 threads found to be garbage. Each of these threads will be woken
3086 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3087 on an MVar, or NonTermination if the thread was blocked on a Black
3090 Locks: assumes we hold *all* the capabilities.
3091 -------------------------------------------------------------------------- */
3094 resurrectThreads (StgTSO *threads)
3099 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3100 next = tso->global_link;
3101 tso->global_link = all_threads;
3103 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3105 // Wake up the thread on the Capability it was last on
3108 switch (tso->why_blocked) {
3110 case BlockedOnException:
3111 /* Called by GC - sched_mutex lock is currently held. */
3112 throwToSingleThreaded(cap, tso,
3113 (StgClosure *)BlockedOnDeadMVar_closure);
3115 case BlockedOnBlackHole:
3116 throwToSingleThreaded(cap, tso,
3117 (StgClosure *)NonTermination_closure);
3120 throwToSingleThreaded(cap, tso,
3121 (StgClosure *)BlockedIndefinitely_closure);
3124 /* This might happen if the thread was blocked on a black hole
3125 * belonging to a thread that we've just woken up (raiseAsync
3126 * can wake up threads, remember...).
3130 barf("resurrectThreads: thread blocked in a strange way");