1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 1998-2006
5 * The scheduler and thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
14 #include "OSThreads.h"
19 #include "StgMiscClosures.h"
20 #include "Interpreter.h"
22 #include "RtsSignals.h"
28 #include "ThreadLabels.h"
29 #include "LdvProfile.h"
32 #include "Proftimer.h"
35 #if defined(GRAN) || defined(PARALLEL_HASKELL)
36 # include "GranSimRts.h"
38 # include "ParallelRts.h"
39 # include "Parallel.h"
40 # include "ParallelDebug.h"
45 #include "Capability.h"
47 #include "AwaitEvent.h"
48 #if defined(mingw32_HOST_OS)
49 #include "win32/IOManager.h"
52 #include "RaiseAsync.h"
54 #include "ThrIOManager.h"
56 #ifdef HAVE_SYS_TYPES_H
57 #include <sys/types.h>
71 // Turn off inlining when debugging - it obfuscates things
74 # define STATIC_INLINE static
77 /* -----------------------------------------------------------------------------
79 * -------------------------------------------------------------------------- */
83 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
84 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
87 In GranSim we have a runnable and a blocked queue for each processor.
88 In order to minimise code changes, new arrays run_queue_hds/tls
89 are created. run_queue_hd is then a shortcut (macro) for
90 run_queue_hds[CurrentProc] (see GranSim.h).
93 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
94 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
95 StgTSO *ccalling_threadss[MAX_PROC];
96 /* We use the same global list of threads (all_threads) in GranSim as in
97 the std RTS (i.e. we are cheating). However, we don't use this list in
98 the GranSim specific code at the moment (so we are only potentially
103 #if !defined(THREADED_RTS)
104 // Blocked/sleeping threads
105 StgTSO *blocked_queue_hd = NULL;
106 StgTSO *blocked_queue_tl = NULL;
107 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
110 /* Threads blocked on blackholes.
111 * LOCK: sched_mutex+capability, or all capabilities
113 StgTSO *blackhole_queue = NULL;
116 /* The blackhole_queue should be checked for threads to wake up. See
117 * Schedule.h for more thorough comment.
118 * LOCK: none (doesn't matter if we miss an update)
120 rtsBool blackholes_need_checking = rtsFalse;
122 /* Linked list of all threads.
123 * Used for detecting garbage collected threads.
124 * LOCK: sched_mutex+capability, or all capabilities
126 StgTSO *all_threads = NULL;
128 /* flag set by signal handler to precipitate a context switch
129 * LOCK: none (just an advisory flag)
131 int context_switch = 0;
133 /* flag that tracks whether we have done any execution in this time slice.
134 * LOCK: currently none, perhaps we should lock (but needs to be
135 * updated in the fast path of the scheduler).
137 nat recent_activity = ACTIVITY_YES;
139 /* scheduler state: give up execution once this passes SCHED_RUNNING
140  * LOCK: none (advances one way only: RUNNING -> INTERRUPTING -> SHUTTING_DOWN)
142 rtsBool sched_state = SCHED_RUNNING;
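/* (The code below relies on these states being numerically ordered,
 *      SCHED_RUNNING < SCHED_INTERRUPTING < SCHED_SHUTTING_DOWN,
 * so that tests such as "sched_state >= SCHED_INTERRUPTING" read as
 * "we are at least interrupting".)
 */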
148 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
149 * exists - earlier gccs apparently didn't.
155 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
156 * in an MT setting, needed to signal that a worker thread shouldn't hang around
157 * in the scheduler when it is out of work.
159 rtsBool shutting_down_scheduler = rtsFalse;
162 * This mutex protects most of the global scheduler data in
163 * the THREADED_RTS runtime.
165 #if defined(THREADED_RTS)
169 #if defined(PARALLEL_HASKELL)
171 rtsTime TimeOfLastYield;
172 rtsBool emitSchedule = rtsTrue;
175 #if !defined(mingw32_HOST_OS)
176 #define FORKPROCESS_PRIMOP_SUPPORTED
179 /* -----------------------------------------------------------------------------
180 * static function prototypes
181 * -------------------------------------------------------------------------- */
183 static Capability *schedule (Capability *initialCapability, Task *task);
186 // These functions all encapsulate parts of the scheduler loop, and are
187 // abstracted only to make the structure and control flow of the
188 // scheduler clearer.
190 static void schedulePreLoop (void);
191 #if defined(THREADED_RTS)
192 static void schedulePushWork(Capability *cap, Task *task);
194 static void scheduleStartSignalHandlers (Capability *cap);
195 static void scheduleCheckBlockedThreads (Capability *cap);
196 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS);
197 static void scheduleCheckBlackHoles (Capability *cap);
198 static void scheduleDetectDeadlock (Capability *cap, Task *task);
200 static StgTSO *scheduleProcessEvent(rtsEvent *event);
202 #if defined(PARALLEL_HASKELL)
203 static StgTSO *scheduleSendPendingMessages(void);
204 static void scheduleActivateSpark(void);
205 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
207 #if defined(PAR) || defined(GRAN)
208 static void scheduleGranParReport(void);
210 static void schedulePostRunThread(void);
211 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
212 static void scheduleHandleStackOverflow( Capability *cap, Task *task,
214 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
215 nat prev_what_next );
216 static void scheduleHandleThreadBlocked( StgTSO *t );
217 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
219 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
220 static Capability *scheduleDoGC(Capability *cap, Task *task,
221 rtsBool force_major);
223 static rtsBool checkBlackHoles(Capability *cap);
225 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
227 static void deleteThread (Capability *cap, StgTSO *tso);
228 static void deleteAllThreads (Capability *cap);
230 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
231 static void deleteThread_(Capability *cap, StgTSO *tso);
234 #if defined(PARALLEL_HASKELL)
235 StgTSO * createSparkThread(rtsSpark spark);
236 StgTSO * activateSpark (rtsSpark spark);
240 static char *whatNext_strs[] = {
250 /* -----------------------------------------------------------------------------
251 * Putting a thread on the run queue: different scheduling policies
252 * -------------------------------------------------------------------------- */
255 addToRunQueue( Capability *cap, StgTSO *t )
257 #if defined(PARALLEL_HASKELL)
258 if (RtsFlags.ParFlags.doFairScheduling) {
259 // this does round-robin scheduling; good for concurrency
260 appendToRunQueue(cap,t);
262 // this does unfair scheduling; good for parallelism
263 pushOnRunQueue(cap,t);
266 // this does round-robin scheduling; good for concurrency
267 appendToRunQueue(cap,t);
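/* For reference, a sketch of the two queue primitives used above,
 * assuming the singly-linked TSO run queue representation from
 * Schedule.h (the authoritative STATIC_INLINE definitions live there):
 * appendToRunQueue adds at the tail (fair, round-robin);
 * pushOnRunQueue adds at the head (unfair, LIFO).
 */
#if 0 /* illustration only */
STATIC_INLINE void
appendToRunQueue (Capability *cap, StgTSO *tso)
{
    ASSERT(tso->link == END_TSO_QUEUE);
    if (cap->run_queue_hd == END_TSO_QUEUE) {
        cap->run_queue_hd = tso;        // queue was empty
    } else {
        cap->run_queue_tl->link = tso;  // link after the current tail
    }
    cap->run_queue_tl = tso;
}

STATIC_INLINE void
pushOnRunQueue (Capability *cap, StgTSO *tso)
{
    tso->link = cap->run_queue_hd;      // new head points at the old head
    cap->run_queue_hd = tso;
    if (cap->run_queue_tl == END_TSO_QUEUE) {
        cap->run_queue_tl = tso;        // queue was empty
    }
}
#endif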
271 /* ---------------------------------------------------------------------------
272 Main scheduling loop.
274 We use round-robin scheduling, each thread returning to the
275 scheduler loop when one of these conditions is detected:
278 * timer expires (thread yields)
284 In a GranSim setup this loop iterates over the global event
285 queue, which determines what to do next. Therefore, it's more
286 complicated than either the
287 concurrent or the parallel (GUM) setup.
290 GUM iterates over incoming messages.
291 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
292 and sends out a fish whenever it has nothing to do; in-between
293 doing the actual reductions (shared code below) it processes the
294 incoming messages and deals with delayed operations
295 (see PendingFetches).
296 This is not the ugliest code you could imagine, but it's bloody close.
298 ------------------------------------------------------------------------ */
301 schedule (Capability *initialCapability, Task *task)
305 StgThreadReturnCode ret;
308 #elif defined(PARALLEL_HASKELL)
311 rtsBool receivedFinish = rtsFalse;
313 nat tp_size, sp_size; // stats only
318 #if defined(THREADED_RTS)
319 rtsBool first = rtsTrue;
322 cap = initialCapability;
324 // Pre-condition: this task owns initialCapability.
325 // The sched_mutex is *NOT* held
326 // NB. on return, we still hold a capability.
328 debugTrace (DEBUG_sched,
329 "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
330 task, initialCapability);
334 // -----------------------------------------------------------
335 // Scheduler loop starts here:
337 #if defined(PARALLEL_HASKELL)
338 #define TERMINATION_CONDITION (!receivedFinish)
340 #define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL)
342 #define TERMINATION_CONDITION rtsTrue
345 while (TERMINATION_CONDITION) {
348 /* Choose the processor with the next event */
349 CurrentProc = event->proc;
350 CurrentTSO = event->tso;
353 #if defined(THREADED_RTS)
355 // don't yield the first time, we want a chance to run this
356 // thread for a bit, even if there are others banging at the
359 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
361 // Yield the capability to higher-priority tasks if necessary.
362 yieldCapability(&cap, task);
366 #if defined(THREADED_RTS)
367 schedulePushWork(cap,task);
370 // Check whether we have re-entered the RTS from Haskell without
371 // going via suspendThread()/resumeThread() (i.e. a 'safe' foreign
373 if (cap->in_haskell) {
374 errorBelch("schedule: re-entered unsafely.\n"
375 " Perhaps a 'foreign import unsafe' should be 'safe'?");
376 stg_exit(EXIT_FAILURE);
379 // The interruption / shutdown sequence.
381 // In order to cleanly shut down the runtime, we want to:
382 // * make sure that all main threads return to their callers
383 // with the state 'Interrupted'.
384 //   * clean up all OS threads associated with the runtime
385 // * free all memory etc.
387 // So the sequence for ^C goes like this:
389 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
390 // arranges for some Capability to wake up
392 // * all threads in the system are halted, and the zombies are
393 // placed on the run queue for cleaning up. We acquire all
394 //     the capabilities in order to delete the threads; this is
395 // done by scheduleDoGC() for convenience (because GC already
396 // needs to acquire all the capabilities). We can't kill
397 // threads involved in foreign calls.
399 // * somebody calls shutdownHaskell(), which calls exitScheduler()
401 // * sched_state := SCHED_SHUTTING_DOWN
403 // * all workers exit when the run queue on their capability
404 // drains. All main threads will also exit when their TSO
405 // reaches the head of the run queue and they can return.
407 // * eventually all Capabilities will shut down, and the RTS can
410 //   * We might be left with threads blocked in foreign calls;
411 // we should really attempt to kill these somehow (TODO);
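// A sketch of the other side of this protocol (an assumption, not
// shown in this file; the actual handler is interruptStgRts()):
// all the ^C handler needs to do is advance sched_state and wake a
// Capability up -- the switch below does the rest:
//
//     sched_state = SCHED_INTERRUPTING;
//     context_switch = 1;   // ask the running thread to come back
//     // ... then wake up a Capability so this loop runs again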
413 switch (sched_state) {
416 case SCHED_INTERRUPTING:
417 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
418 #if defined(THREADED_RTS)
419 discardSparksCap(cap);
421 /* scheduleDoGC() deletes all the threads */
422 cap = scheduleDoGC(cap,task,rtsFalse);
424 case SCHED_SHUTTING_DOWN:
425 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
426 // If we are a worker, just exit. If we're a bound thread
427 // then we will exit below when we've removed our TSO from
429 if (task->tso == NULL && emptyRunQueue(cap)) {
434 barf("sched_state: %d", sched_state);
437 #if defined(THREADED_RTS)
438 // If the run queue is empty, take a spark and turn it into a thread.
440 if (emptyRunQueue(cap)) {
442 spark = findSpark(cap);
444 debugTrace(DEBUG_sched,
445 "turning spark of closure %p into a thread",
446 (StgClosure *)spark);
447 createSparkThread(cap,spark);
451 #endif // THREADED_RTS
453 scheduleStartSignalHandlers(cap);
455 // Only check the black holes here if we've nothing else to do.
456 // During normal execution, the black hole list only gets checked
457 // at GC time, to avoid repeatedly traversing this possibly long
458 // list each time around the scheduler.
459 if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
461 scheduleCheckWakeupThreads(cap);
463 scheduleCheckBlockedThreads(cap);
465 scheduleDetectDeadlock(cap,task);
466 #if defined(THREADED_RTS)
467 cap = task->cap; // reload cap, it might have changed
470 // Normally, the only way we can get here with no threads to
471     // run is if a keyboard interrupt was received during
472 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
473 // Additionally, it is not fatal for the
474 // threaded RTS to reach here with no threads to run.
476 // win32: might be here due to awaitEvent() being abandoned
477 // as a result of a console event having been delivered.
478 if ( emptyRunQueue(cap) ) {
479 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
480 ASSERT(sched_state >= SCHED_INTERRUPTING);
482 continue; // nothing to do
485 #if defined(PARALLEL_HASKELL)
486 scheduleSendPendingMessages();
487 if (emptyRunQueue(cap) && scheduleActivateSpark())
491 ASSERT(next_fish_to_send_at==0); // i.e. no delayed fishes left!
494 /* If we still have no work we need to send a FISH to get a spark
496 if (emptyRunQueue(cap)) {
497 if (!scheduleGetRemoteWork(&receivedFinish)) continue;
498 ASSERT(rtsFalse); // should not happen at the moment
500 // from here: non-empty run queue.
501     // TODO: merge above case with this; only one call to processMessages()!
502 if (PacketsWaiting()) { /* process incoming messages, if
503 any pending... only in else
504 because getRemoteWork waits for
506 receivedFinish = processMessages();
511 scheduleProcessEvent(event);
515 // Get a thread to run
517 t = popRunQueue(cap);
519 #if defined(GRAN) || defined(PAR)
520     scheduleGranParReport(); // some kind of debugging output
522 // Sanity check the thread we're about to run. This can be
523 // expensive if there is lots of thread switching going on...
524 IF_DEBUG(sanity,checkTSO(t));
527 #if defined(THREADED_RTS)
528 // Check whether we can run this thread in the current task.
529 // If not, we have to pass our capability to the right task.
531 Task *bound = t->bound;
535 debugTrace(DEBUG_sched,
536 "### Running thread %lu in bound thread", (unsigned long)t->id);
537 // yes, the Haskell thread is bound to the current native thread
539 debugTrace(DEBUG_sched,
540 "### thread %lu bound to another OS thread", (unsigned long)t->id);
541 // no, bound to a different Haskell thread: pass to that thread
542 pushOnRunQueue(cap,t);
546 // The thread we want to run is unbound.
548 debugTrace(DEBUG_sched,
549 "### this OS thread cannot run thread %lu", (unsigned long)t->id);
550 // no, the current native thread is bound to a different
551 // Haskell thread, so pass it to any worker thread
552 pushOnRunQueue(cap,t);
559 cap->r.rCurrentTSO = t;
561 /* context switches are initiated by the timer signal, unless
562 * the user specified "context switch as often as possible", with
565 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
566 && !emptyThreadQueues(cap)) {
572 debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
573 (long)t->id, whatNext_strs[t->what_next]);
575 #if defined(PROFILING)
576 startHeapProfTimer();
579 // Check for exceptions blocked on this thread
580 maybePerformBlockedException (cap, t);
582 // ----------------------------------------------------------------------
583 // Run the current thread
585 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
586 ASSERT(t->cap == cap);
588 prev_what_next = t->what_next;
590 errno = t->saved_errno;
592 SetLastError(t->saved_winerror);
595 cap->in_haskell = rtsTrue;
599 recent_activity = ACTIVITY_YES;
601 switch (prev_what_next) {
605 /* Thread already finished, return to scheduler. */
606 ret = ThreadFinished;
612 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
613 cap = regTableToCapability(r);
618 case ThreadInterpret:
619 cap = interpretBCO(cap);
624 barf("schedule: invalid what_next field");
627 cap->in_haskell = rtsFalse;
629     // The TSO might have moved, e.g. if it re-entered the RTS and a GC
630 // happened. So find the new location:
631 t = cap->r.rCurrentTSO;
633 // We have run some Haskell code: there might be blackhole-blocked
634 // threads to wake up now.
635 // Lock-free test here should be ok, we're just setting a flag.
636 if ( blackhole_queue != END_TSO_QUEUE ) {
637 blackholes_need_checking = rtsTrue;
640 // And save the current errno in this thread.
641 // XXX: possibly bogus for SMP because this thread might already
642 // be running again, see code below.
643 t->saved_errno = errno;
645 // Similarly for Windows error code
646 t->saved_winerror = GetLastError();
649 #if defined(THREADED_RTS)
650 // If ret is ThreadBlocked, and this Task is bound to the TSO that
651 // blocked, we are in limbo - the TSO is now owned by whatever it
652 // is blocked on, and may in fact already have been woken up,
653 // perhaps even on a different Capability. It may be the case
654     // that task->cap != cap. We had better yield this Capability
655     // immediately and return to normality.
656 if (ret == ThreadBlocked) {
657 debugTrace(DEBUG_sched,
658 "--<< thread %lu (%s) stopped: blocked",
659 (unsigned long)t->id, whatNext_strs[t->what_next]);
664 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
665 ASSERT(t->cap == cap);
667 // ----------------------------------------------------------------------
669 // Costs for the scheduler are assigned to CCS_SYSTEM
670 #if defined(PROFILING)
675 schedulePostRunThread();
677 ready_to_gc = rtsFalse;
681 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
685 scheduleHandleStackOverflow(cap,task,t);
689 if (scheduleHandleYield(cap, t, prev_what_next)) {
690 // shortcut for switching between compiler/interpreter:
696 scheduleHandleThreadBlocked(t);
700 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
701 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
705 barf("schedule: invalid thread return code %d", (int)ret);
708 if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
710 cap = scheduleDoGC(cap,task,rtsFalse);
712 } /* end of while() */
714 debugTrace(PAR_DEBUG_verbose,
715 "== Leaving schedule() after having received Finish");
718 /* ----------------------------------------------------------------------------
719 * Setting up the scheduler loop
720 * ------------------------------------------------------------------------- */
723 schedulePreLoop(void)
726 /* set up first event to get things going */
727 /* ToDo: assign costs for system setup and init MainTSO ! */
728 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
730 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
732 debugTrace (DEBUG_gran,
733 "GRAN: Init CurrentTSO (in schedule) = %p",
735 IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
737 if (RtsFlags.GranFlags.Light) {
738 /* Save current time; GranSim Light only */
739 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
744 /* -----------------------------------------------------------------------------
747 * Push work to other Capabilities if we have some.
748 * -------------------------------------------------------------------------- */
750 #if defined(THREADED_RTS)
752 schedulePushWork(Capability *cap USED_IF_THREADS,
753 Task *task USED_IF_THREADS)
755 Capability *free_caps[n_capabilities], *cap0;
758 // migration can be turned off with +RTS -qg
759 if (!RtsFlags.ParFlags.migrate) return;
761 // Check whether we have more threads on our run queue, or sparks
762 // in our pool, that we could hand to another Capability.
763 if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
764 && sparkPoolSizeCap(cap) < 2) {
768 // First grab as many free Capabilities as we can.
769 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
770 cap0 = &capabilities[i];
771 if (cap != cap0 && tryGrabCapability(cap0,task)) {
772 	    if (!emptyRunQueue(cap0) || cap0->returning_tasks_hd != NULL) {
773 		// it already has some work; we just grabbed it at
774 		// the wrong moment. Or maybe it's deadlocked!
775 releaseCapability(cap0);
777 free_caps[n_free_caps++] = cap0;
782 // we now have n_free_caps free capabilities stashed in
783 // free_caps[]. Share our run queue equally with them. This is
784 // probably the simplest thing we could do; improvements we might
785     // want to make include:
787 // - giving high priority to moving relatively new threads, on
788     //    the grounds that they haven't had time to build up a
789 // working set in the cache on this CPU/Capability.
791 // - giving low priority to moving long-lived threads
793 if (n_free_caps > 0) {
794 StgTSO *prev, *t, *next;
795 rtsBool pushed_to_all;
797 debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
800 pushed_to_all = rtsFalse;
802 if (cap->run_queue_hd != END_TSO_QUEUE) {
803 prev = cap->run_queue_hd;
805 prev->link = END_TSO_QUEUE;
806 for (; t != END_TSO_QUEUE; t = next) {
808 t->link = END_TSO_QUEUE;
809 if (t->what_next == ThreadRelocated
810 || t->bound == task // don't move my bound thread
811 || tsoLocked(t)) { // don't move a locked thread
814 } else if (i == n_free_caps) {
815 pushed_to_all = rtsTrue;
821 debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
822 appendToRunQueue(free_caps[i],t);
823 if (t->bound) { t->bound->cap = free_caps[i]; }
824 t->cap = free_caps[i];
828 cap->run_queue_tl = prev;
831 // If there are some free capabilities that we didn't push any
832 // threads to, then try to push a spark to each one.
833 if (!pushed_to_all) {
835 // i is the next free capability to push to
836 for (; i < n_free_caps; i++) {
837 if (emptySparkPoolCap(free_caps[i])) {
838 spark = findSpark(cap);
840 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
841 newSpark(&(free_caps[i]->r), spark);
847 // release the capabilities
848 for (i = 0; i < n_free_caps; i++) {
849 task->cap = free_caps[i];
850 releaseCapability(free_caps[i]);
853 task->cap = cap; // reset to point to our Capability.
857 /* ----------------------------------------------------------------------------
858 * Start any pending signal handlers
859 * ------------------------------------------------------------------------- */
861 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
863 scheduleStartSignalHandlers(Capability *cap)
865 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
866 // safe outside the lock
867 startSignalHandlers(cap);
872 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
877 /* ----------------------------------------------------------------------------
878 * Check for blocked threads that can be woken up.
879 * ------------------------------------------------------------------------- */
882 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
884 #if !defined(THREADED_RTS)
886 // Check whether any waiting threads need to be woken up. If the
887 // run queue is empty, and there are no other tasks running, we
888 // can wait indefinitely for something to happen.
890 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
892 awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
898 /* ----------------------------------------------------------------------------
899 * Check for threads woken up by other Capabilities
900 * ------------------------------------------------------------------------- */
903 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
905 #if defined(THREADED_RTS)
906 // Any threads that were woken up by other Capabilities get
907 // appended to our run queue.
908 if (!emptyWakeupQueue(cap)) {
909 ACQUIRE_LOCK(&cap->lock);
910 if (emptyRunQueue(cap)) {
911 cap->run_queue_hd = cap->wakeup_queue_hd;
912 cap->run_queue_tl = cap->wakeup_queue_tl;
914 cap->run_queue_tl->link = cap->wakeup_queue_hd;
915 cap->run_queue_tl = cap->wakeup_queue_tl;
917 cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
918 RELEASE_LOCK(&cap->lock);
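	// (Note that cap->lock, not sched_mutex, is the lock that
	// matters here: it is what other Capabilities hold while
	// appending woken threads to our wakeup_queue, so splicing
	// that queue onto the run queue must take the same lock.)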
923 /* ----------------------------------------------------------------------------
924 * Check for threads blocked on BLACKHOLEs that can be woken up
925 * ------------------------------------------------------------------------- */
927 scheduleCheckBlackHoles (Capability *cap)
929 if ( blackholes_need_checking ) // check without the lock first
931 ACQUIRE_LOCK(&sched_mutex);
932 if ( blackholes_need_checking ) {
933 checkBlackHoles(cap);
934 blackholes_need_checking = rtsFalse;
936 RELEASE_LOCK(&sched_mutex);
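    // (A double-checked test: the unlocked read above may be stale,
    // but per the comment on blackholes_need_checking a missed
    // update is harmless, and a stale rtsTrue merely costs one
    // redundant check under sched_mutex.)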
940 /* ----------------------------------------------------------------------------
941 * Detect deadlock conditions and attempt to resolve them.
942 * ------------------------------------------------------------------------- */
945 scheduleDetectDeadlock (Capability *cap, Task *task)
948 #if defined(PARALLEL_HASKELL)
949 // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
954 * Detect deadlock: when we have no threads to run, there are no
955 * threads blocked, waiting for I/O, or sleeping, and all the
956 * other tasks are waiting for work, we must have a deadlock of
959 if ( emptyThreadQueues(cap) )
961 #if defined(THREADED_RTS)
963 * In the threaded RTS, we only check for deadlock if there
964 * has been no activity in a complete timeslice. This means
965 * we won't eagerly start a full GC just because we don't have
966 * any threads to run currently.
968 if (recent_activity != ACTIVITY_INACTIVE) return;
971 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
973 // Garbage collection can release some new threads due to
974 // either (a) finalizers or (b) threads resurrected because
975 // they are unreachable and will therefore be sent an
976 // exception. Any threads thus released will be immediately
978 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
980 recent_activity = ACTIVITY_DONE_GC;
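	// (recent_activity thus moves through three states:
	// ACTIVITY_YES is set whenever we run a thread, it presumably
	// decays to ACTIVITY_INACTIVE after a timeslice with no work,
	// and ACTIVITY_DONE_GC here keeps us from forcing a major GC
	// on every pass while the system stays idle.)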
982 if ( !emptyRunQueue(cap) ) return;
984 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
985 /* If we have user-installed signal handlers, then wait
986      * for signals to arrive rather than bombing out with a
989 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
990 debugTrace(DEBUG_sched,
991 "still deadlocked, waiting for signals...");
995 if (signals_pending()) {
996 startSignalHandlers(cap);
999 // either we have threads to run, or we were interrupted:
1000 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1004 #if !defined(THREADED_RTS)
1005 /* Probably a real deadlock. Send the current main thread the
1006 * Deadlock exception.
1009 switch (task->tso->why_blocked) {
1011 case BlockedOnBlackHole:
1012 case BlockedOnException:
1014 throwToSingleThreaded(cap, task->tso,
1015 (StgClosure *)NonTermination_closure);
1018 barf("deadlock: main thread blocked in a strange way");
1026 /* ----------------------------------------------------------------------------
1027 * Process an event (GRAN only)
1028 * ------------------------------------------------------------------------- */
1032 scheduleProcessEvent(rtsEvent *event)
1036 if (RtsFlags.GranFlags.Light)
1037 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1039 /* adjust time based on time-stamp */
1040 if (event->time > CurrentTime[CurrentProc] &&
1041 event->evttype != ContinueThread)
1042 CurrentTime[CurrentProc] = event->time;
1044 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1045 if (!RtsFlags.GranFlags.Light)
1048 IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1050 /* main event dispatcher in GranSim */
1051 switch (event->evttype) {
1052 /* Should just be continuing execution */
1053 case ContinueThread:
1054 IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1055 /* ToDo: check assertion
1056 ASSERT(run_queue_hd != (StgTSO*)NULL &&
1057 run_queue_hd != END_TSO_QUEUE);
1059       /* Ignore ContinueThreads for fetching threads (if synchronous communication) */
1060 if (!RtsFlags.GranFlags.DoAsyncFetch &&
1061 procStatus[CurrentProc]==Fetching) {
1062 debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1063 CurrentTSO->id, CurrentTSO, CurrentProc);
1066 /* Ignore ContinueThreads for completed threads */
1067 if (CurrentTSO->what_next == ThreadComplete) {
1068 debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
1069 CurrentTSO->id, CurrentTSO, CurrentProc);
1072 /* Ignore ContinueThreads for threads that are being migrated */
1073 if (PROCS(CurrentTSO)==Nowhere) {
1074 debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1075 CurrentTSO->id, CurrentTSO, CurrentProc);
1078 /* The thread should be at the beginning of the run queue */
1079 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
1080 debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1081 CurrentTSO->id, CurrentTSO, CurrentProc);
1082 break; // run the thread anyway
1085 new_event(proc, proc, CurrentTime[proc],
1087 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1089 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1090 break; // now actually run the thread; DaH Qu'vam yImuHbej
1093 do_the_fetchnode(event);
1094 goto next_thread; /* handle next event in event queue */
1097 do_the_globalblock(event);
1098 goto next_thread; /* handle next event in event queue */
1101 do_the_fetchreply(event);
1102 goto next_thread; /* handle next event in event queue */
1104 case UnblockThread: /* Move from the blocked queue to the tail of */
1105 do_the_unblock(event);
1106 goto next_thread; /* handle next event in event queue */
1108 case ResumeThread: /* Move from the blocked queue to the tail of */
1109 /* the runnable queue ( i.e. Qu' SImqa'lu') */
1110 event->tso->gran.blocktime +=
1111 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1112 do_the_startthread(event);
1113 goto next_thread; /* handle next event in event queue */
1116 do_the_startthread(event);
1117 goto next_thread; /* handle next event in event queue */
1120 do_the_movethread(event);
1121 goto next_thread; /* handle next event in event queue */
1124 do_the_movespark(event);
1125 goto next_thread; /* handle next event in event queue */
1128 do_the_findwork(event);
1129 goto next_thread; /* handle next event in event queue */
1132 barf("Illegal event type %u\n", event->evttype);
1135 /* This point was scheduler_loop in the old RTS */
1137 IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1139 TimeOfLastEvent = CurrentTime[CurrentProc];
1140 TimeOfNextEvent = get_time_of_next_event();
1141 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1142 // CurrentTSO = ThreadQueueHd;
1144 IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
1147 if (RtsFlags.GranFlags.Light)
1148 GranSimLight_leave_system(event, &ActiveTSO);
1150 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1153 debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1155 /* in a GranSim setup the TSO stays on the run queue */
1157 /* Take a thread from the run queue. */
1158 POP_RUN_QUEUE(t); // take_off_run_queue(t);
1161 debugBelch("GRAN: About to run current thread, which is\n");
1164 context_switch = 0; // turned on via GranYield, checking events and time slice
1167 DumpGranEvent(GR_SCHEDULE, t));
1169 procStatus[CurrentProc] = Busy;
1173 /* ----------------------------------------------------------------------------
1174 * Send pending messages (PARALLEL_HASKELL only)
1175 * ------------------------------------------------------------------------- */
1177 #if defined(PARALLEL_HASKELL)
1179 scheduleSendPendingMessages(void)
1185 # if defined(PAR) // global Mem.Mgmt., omit for now
1186 if (PendingFetches != END_BF_QUEUE) {
1191 if (RtsFlags.ParFlags.BufferTime) {
1192 // if we use message buffering, we must send away all message
1193 // packets which have become too old...
1199 /* ----------------------------------------------------------------------------
1200 * Activate spark threads (PARALLEL_HASKELL only)
1201 * ------------------------------------------------------------------------- */
1203 #if defined(PARALLEL_HASKELL)
1205 scheduleActivateSpark(void)
1208 ASSERT(emptyRunQueue());
1209   /* We get here if the run queue is empty and we want some work.
1210 We try to turn a spark into a thread, and add it to the run queue,
1211 from where it will be picked up in the next iteration of the scheduler
1215 /* :-[ no local threads => look out for local sparks */
1216 /* the spark pool for the current PE */
1217 pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1218 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1219 pool->hd < pool->tl) {
1221 * ToDo: add GC code check that we really have enough heap afterwards!!
1223 * If we're here (no runnable threads) and we have pending
1224 * sparks, we must have a space problem. Get enough space
1225 * to turn one of those pending sparks into a
1229 spark = findSpark(rtsFalse); /* get a spark */
1230 if (spark != (rtsSpark) NULL) {
1231 tso = createThreadFromSpark(spark); /* turn the spark into a thread */
1232 IF_PAR_DEBUG(fish, // schedule,
1233 debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1234 tso->id, tso, advisory_thread_count));
1236 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1237 IF_PAR_DEBUG(fish, // schedule,
1238 debugBelch("==^^ failed to create thread from spark @ %lx\n",
1240 return rtsFalse; /* failed to generate a thread */
1241 } /* otherwise fall through & pick-up new tso */
1243 IF_PAR_DEBUG(fish, // schedule,
1244 debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
1245 spark_queue_len(pool)));
1246 return rtsFalse; /* failed to generate a thread */
1248 return rtsTrue; /* success in generating a thread */
1249 } else { /* no more threads permitted or pool empty */
1250 return rtsFalse; /* failed to generateThread */
1253 tso = NULL; // avoid compiler warning only
1254 return rtsFalse; /* dummy in non-PAR setup */
1257 #endif // PARALLEL_HASKELL
1259 /* ----------------------------------------------------------------------------
1260 * Get work from a remote node (PARALLEL_HASKELL only)
1261 * ------------------------------------------------------------------------- */
1263 #if defined(PARALLEL_HASKELL)
1265 scheduleGetRemoteWork(rtsBool *receivedFinish)
1267 ASSERT(emptyRunQueue());
1269 if (RtsFlags.ParFlags.BufferTime) {
1270 IF_PAR_DEBUG(verbose,
1271 debugBelch("...send all pending data,"));
1274 for (i=1; i<=nPEs; i++)
1275 sendImmediately(i); // send all messages away immediately
1279 //++EDEN++ idle() , i.e. send all buffers, wait for work
1280 // suppress fishing in EDEN... just look for incoming messages
1281 // (blocking receive)
1282 IF_PAR_DEBUG(verbose,
1283 debugBelch("...wait for incoming messages...\n"));
1284 *receivedFinish = processMessages(); // blocking receive...
1286 // and reenter scheduling loop after having received something
1287 // (return rtsFalse below)
1289 # else /* activate SPARKS machinery */
1290 /* We get here if we have no work and have already tried (and failed) to
1291    activate a local spark. We try to get a remote spark by sending a FISH message.
1292 Thread migration should be added here, and triggered when a sequence of
1293 fishes returns without work. */
1294 delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1296 /* =8-[ no local sparks => look for work on other PEs */
1298 * We really have absolutely no work. Send out a fish
1299 * (there may be some out there already), and wait for
1300 * something to arrive. We clearly can't run any threads
1301 * until a SCHEDULE or RESUME arrives, and so that's what
1302 * we're hoping to see. (Of course, we still have to
1303 * respond to other types of messages.)
1305 rtsTime now = msTime() /*CURRENT_TIME*/;
1306 IF_PAR_DEBUG(verbose,
1307 debugBelch("-- now=%ld\n", now));
1308 IF_PAR_DEBUG(fish, // verbose,
1309 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1310 (last_fish_arrived_at!=0 &&
1311 last_fish_arrived_at+delay > now)) {
1312 debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1313 now, last_fish_arrived_at+delay,
1314 last_fish_arrived_at,
1318 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1319 advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1320 if (last_fish_arrived_at==0 ||
1321 (last_fish_arrived_at+delay <= now)) { // send FISH now!
1322 /* outstandingFishes is set in sendFish, processFish;
1323 avoid flooding system with fishes via delay */
1324 next_fish_to_send_at = 0;
1326 /* ToDo: this should be done in the main scheduling loop to avoid the
1327 busy wait here; not so bad if fish delay is very small */
1328 int iq = 0; // DEBUGGING -- HWL
1329 next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
1330 /* send a fish when ready, but process messages that arrive in the meantime */
1332 if (PacketsWaiting()) {
1334 *receivedFinish = processMessages();
1337 } while (!*receivedFinish || now<next_fish_to_send_at);
1338 // JB: This means the fish could become obsolete, if we receive
1339 // work. Better check for work again?
1340 // last line: while (!receivedFinish || !haveWork || now<...)
1341 // next line: if (receivedFinish || haveWork )
1343 if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1344 return rtsFalse; // NB: this will leave scheduler loop
1345 // immediately after return!
1347 IF_PAR_DEBUG(fish, // verbose,
1348 debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1352 // JB: IMHO, this should all be hidden inside sendFish(...)
1354 sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
1357 // Global statistics: count no. of fishes
1358 if (RtsFlags.ParFlags.ParStats.Global &&
1359 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1360 globalParStats.tot_fish_mess++;
1364 /* delayed fishes must have been sent by now! */
1365 next_fish_to_send_at = 0;
1368 *receivedFinish = processMessages();
1369 # endif /* SPARKS */
1372 /* NB: this function always returns rtsFalse, meaning the scheduler
1373 loop continues with the next iteration;
1375 return code means success in finding work; we enter this function
1376    only if there is no local work, and thus have to send a fish, which takes
1377 time until it arrives with work; in the meantime we should process
1378 messages in the main loop;
1381 #endif // PARALLEL_HASKELL
1383 /* ----------------------------------------------------------------------------
1384 * PAR/GRAN: Report stats & debugging info(?)
1385 * ------------------------------------------------------------------------- */
1387 #if defined(PAR) || defined(GRAN)
1389 scheduleGranParReport(void)
1391 ASSERT(run_queue_hd != END_TSO_QUEUE);
1393 /* Take a thread from the run queue, if we have work */
1394 POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
1396 /* If this TSO has got its outport closed in the meantime,
1397    * it mustn't be run. Instead, we have to clean it up as if it had finished.
1398 * It has to be marked as TH_DEAD for this purpose.
1399 * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1401      JB: TODO: investigate whether the state change field could be nuked
1402 entirely and replaced by the normal tso state (whatnext
1403 field). All we want to do is to kill tsos from outside.
1406 /* ToDo: write something to the log-file
1407 if (RTSflags.ParFlags.granSimStats && !sameThread)
1408 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1412 /* the spark pool for the current PE */
1413 pool = &(cap.r.rSparks); // cap = (old) MainCap
1416 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1417 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1420 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1421 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1423 if (RtsFlags.ParFlags.ParStats.Full &&
1424 (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1425 (emitSchedule || // forced emit
1426 (t && LastTSO && t->id != LastTSO->id))) {
1428 we are running a different TSO, so write a schedule event to log file
1429 NB: If we use fair scheduling we also have to write a deschedule
1430 event for LastTSO; with unfair scheduling we know that the
1431 previous tso has blocked whenever we switch to another tso, so
1432 we don't need it in GUM for now
1434 IF_PAR_DEBUG(fish, // schedule,
1435 debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1437 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1438 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1439 emitSchedule = rtsFalse;
1444 /* ----------------------------------------------------------------------------
1445 * After running a thread...
1446 * ------------------------------------------------------------------------- */
1449 schedulePostRunThread(void)
1452 /* HACK 675: if the last thread didn't yield, make sure to print a
1453 SCHEDULE event to the log file when StgRunning the next thread, even
1454 if it is the same one as before */
1456 TimeOfLastYield = CURRENT_TIME;
1459 /* some statistics gathering in the parallel case */
1461 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1465 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1466 globalGranStats.tot_heapover++;
1468 globalParStats.tot_heapover++;
1475 DumpGranEvent(GR_DESCHEDULE, t));
1476 globalGranStats.tot_stackover++;
1479 // DumpGranEvent(GR_DESCHEDULE, t);
1480 globalParStats.tot_stackover++;
1484 case ThreadYielding:
1487 DumpGranEvent(GR_DESCHEDULE, t));
1488 globalGranStats.tot_yields++;
1491 // DumpGranEvent(GR_DESCHEDULE, t);
1492 globalParStats.tot_yields++;
1498 debugTrace(DEBUG_sched,
1499 "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1500 t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
1501 (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1502 if (t->block_info.closure!=(StgClosure*)NULL)
1503 print_bq(t->block_info.closure);
1506 // ??? needed; should emit block before
1508 DumpGranEvent(GR_DESCHEDULE, t));
1509 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1512 ASSERT(procStatus[CurrentProc]==Busy ||
1513 ((procStatus[CurrentProc]==Fetching) &&
1514 (t->block_info.closure!=(StgClosure*)NULL)));
1515 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1516 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1517 procStatus[CurrentProc]==Fetching))
1518 procStatus[CurrentProc] = Idle;
1521 //++PAR++ blockThread() writes the event (change?)
1525 case ThreadFinished:
1529 barf("parGlobalStats: unknown return code");
1535 /* -----------------------------------------------------------------------------
1536  * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1537 * -------------------------------------------------------------------------- */
1540 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1542 // did the task ask for a large block?
1543 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1544 // if so, get one and push it on the front of the nursery.
1548 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1550 debugTrace(DEBUG_sched,
1551 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1552 (long)t->id, whatNext_strs[t->what_next], blocks);
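	// Worked example (assuming the common 4096-byte BLOCK_SIZE
	// from Constants.h): a request of rHpAlloc = 10000 bytes gives
	//     blocks = BLOCK_ROUND_UP(10000) / BLOCK_SIZE
	//            = 12288 / 4096 = 3,
	// i.e. the smallest whole number of blocks covering the request.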
1554 	// don't do this if the nursery is (nearly) full; we'll GC first.
1555 if (cap->r.rCurrentNursery->link != NULL ||
1556 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1557 // if the nursery has only one block.
1560 bd = allocGroup( blocks );
1562 cap->r.rNursery->n_blocks += blocks;
1564 // link the new group into the list
1565 bd->link = cap->r.rCurrentNursery;
1566 bd->u.back = cap->r.rCurrentNursery->u.back;
1567 if (cap->r.rCurrentNursery->u.back != NULL) {
1568 cap->r.rCurrentNursery->u.back->link = bd;
1570 #if !defined(THREADED_RTS)
1571 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1572 g0s0 == cap->r.rNursery);
1574 cap->r.rNursery->blocks = bd;
1576 cap->r.rCurrentNursery->u.back = bd;
1578 // initialise it as a nursery block. We initialise the
1579 // step, gen_no, and flags field of *every* sub-block in
1580 // this large block, because this is easier than making
1581 // sure that we always find the block head of a large
1582 // block whenever we call Bdescr() (eg. evacuate() and
1583 // isAlive() in the GC would both have to do this, at
1587 for (x = bd; x < bd + blocks; x++) {
1588 x->step = cap->r.rNursery;
1594 // This assert can be a killer if the app is doing lots
1595 // of large block allocations.
1596 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1598 // now update the nursery to point to the new block
1599 cap->r.rCurrentNursery = bd;
1601 // we might be unlucky and have another thread get on the
1602 // run queue before us and steal the large block, but in that
1603 // case the thread will just end up requesting another large
1605 pushOnRunQueue(cap,t);
1606 return rtsFalse; /* not actually GC'ing */
1610 debugTrace(DEBUG_sched,
1611 "--<< thread %ld (%s) stopped: HeapOverflow\n",
1612 (long)t->id, whatNext_strs[t->what_next]);
1615 ASSERT(!is_on_queue(t,CurrentProc));
1616 #elif defined(PARALLEL_HASKELL)
1617 /* Currently we emit a DESCHEDULE event before GC in GUM.
1618 ToDo: either add separate event to distinguish SYSTEM time from rest
1619 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1620 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1621 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1622 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1623 emitSchedule = rtsTrue;
1627 pushOnRunQueue(cap,t);
1629 /* actual GC is done at the end of the while loop in schedule() */
1632 /* -----------------------------------------------------------------------------
1633 * Handle a thread that returned to the scheduler with ThreadStackOverflow
1634 * -------------------------------------------------------------------------- */
1637 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1639 debugTrace (DEBUG_sched,
1640 "--<< thread %ld (%s) stopped, StackOverflow",
1641 (long)t->id, whatNext_strs[t->what_next]);
1643 /* just adjust the stack for this thread, then pop it back
1647 /* enlarge the stack */
1648 StgTSO *new_t = threadStackOverflow(cap, t);
1650 /* The TSO attached to this Task may have moved, so update the
1653 if (task->tso == t) {
1656 pushOnRunQueue(cap,new_t);
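    /* (threadStackOverflow, defined later in this file, is expected
     * to copy the TSO into a new one with a larger stack and leave
     * the old TSO marked ThreadRelocated -- hence both the Task's
     * tso field and the run queue must be repointed at new_t here.) */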
1660 /* -----------------------------------------------------------------------------
1661 * Handle a thread that returned to the scheduler with ThreadYielding
1662 * -------------------------------------------------------------------------- */
1665 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1667 // Reset the context switch flag. We don't do this just before
1668 // running the thread, because that would mean we would lose ticks
1669 // during GC, which can lead to unfair scheduling (a thread hogs
1670 // the CPU because the tick always arrives during GC). This way
1671 // penalises threads that do a lot of allocation, but that seems
1672 // better than the alternative.
1675 /* put the thread back on the run queue. Then, if we're ready to
1676 * GC, check whether this is the last task to stop. If so, wake
1677 * up the GC thread. getThread will block during a GC until the
1681 if (t->what_next != prev_what_next) {
1682 debugTrace(DEBUG_sched,
1683 "--<< thread %ld (%s) stopped to switch evaluators",
1684 (long)t->id, whatNext_strs[t->what_next]);
1686 debugTrace(DEBUG_sched,
1687 "--<< thread %ld (%s) stopped, yielding",
1688 (long)t->id, whatNext_strs[t->what_next]);
1693 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1695 ASSERT(t->link == END_TSO_QUEUE);
1697 // Shortcut if we're just switching evaluators: don't bother
1698 // doing stack squeezing (which can be expensive), just run the
1700 if (t->what_next != prev_what_next) {
1705 ASSERT(!is_on_queue(t,CurrentProc));
1708 //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1709 checkThreadQsSanity(rtsTrue));
1713 addToRunQueue(cap,t);
1716 /* add a ContinueThread event to actually process the thread */
1717 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1719 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1721 debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1728 /* -----------------------------------------------------------------------------
1729 * Handle a thread that returned to the scheduler with ThreadBlocked
1730 * -------------------------------------------------------------------------- */
1733 scheduleHandleThreadBlocked( StgTSO *t
1734 #if !defined(GRAN) && !defined(DEBUG)
1741 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
1742 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1743 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1745 // ??? needed; should emit block before
1747 DumpGranEvent(GR_DESCHEDULE, t));
1748 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1751 ASSERT(procStatus[CurrentProc]==Busy ||
1752 ((procStatus[CurrentProc]==Fetching) &&
1753 (t->block_info.closure!=(StgClosure*)NULL)));
1754 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1755 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1756 procStatus[CurrentProc]==Fetching))
1757 procStatus[CurrentProc] = Idle;
1761 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
1762 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1765 if (t->block_info.closure!=(StgClosure*)NULL)
1766 print_bq(t->block_info.closure));
1768 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1771 /* whatever we schedule next, we must log that schedule */
1772 emitSchedule = rtsTrue;
1776 // We don't need to do anything. The thread is blocked, and it
1777 // has tidied up its stack and placed itself on whatever queue
1778 // it needs to be on.
1780 // ASSERT(t->why_blocked != NotBlocked);
1781 // Not true: for example,
1782 // - in THREADED_RTS, the thread may already have been woken
1783 // up by another Capability. This actually happens: try
1784 // conc023 +RTS -N2.
1785 // - the thread may have woken itself up already, because
1786 // threadPaused() might have raised a blocked throwTo
1787 // exception, see maybePerformBlockedException().
1790 if (traceClass(DEBUG_sched)) {
1791 debugTraceBegin("--<< thread %lu (%s) stopped: ",
1792 (unsigned long)t->id, whatNext_strs[t->what_next]);
1793 printThreadBlockage(t);
1798 /* Only for dumping event to log file
1799 ToDo: do I need this in GranSim, too?
1805 /* -----------------------------------------------------------------------------
1806 * Handle a thread that returned to the scheduler with ThreadFinished
1807 * -------------------------------------------------------------------------- */
1810 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1812 /* Need to check whether this was a main thread, and if so,
1813 * return with the return value.
1815 * We also end up here if the thread kills itself with an
1816 * uncaught exception, see Exception.cmm.
1818 debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
1819 (unsigned long)t->id, whatNext_strs[t->what_next]);
1821   /* Inform HPC (Haskell Program Coverage) that a thread has finished */
1822 hs_hpc_thread_finished_event(t);
1825 endThread(t, CurrentProc); // clean-up the thread
1826 #elif defined(PARALLEL_HASKELL)
1827 /* For now all are advisory -- HWL */
1828 //if(t->priority==AdvisoryPriority) ??
1829 advisory_thread_count--; // JB: Caution with this counter, buggy!
1832 if(t->dist.priority==RevalPriority)
1836 # if defined(EDENOLD)
1837 // the thread could still have an outport... (BUG)
1838 if (t->eden.outport != -1) {
1839 // delete the outport for the tso which has finished...
1840 IF_PAR_DEBUG(eden_ports,
1841 debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1842 t->eden.outport, t->id));
1845 // thread still in the process (HEAVY BUG! since outport has just been closed...)
1846 if (t->eden.epid != -1) {
1847 IF_PAR_DEBUG(eden_ports,
1848 debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1849 t->id, t->eden.epid));
1850 removeTSOfromProcess(t);
1855 if (RtsFlags.ParFlags.ParStats.Full &&
1856 !RtsFlags.ParFlags.ParStats.Suppressed)
1857 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1859 // t->par only contains statistics: left out for now...
1861 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1862 t->id,t,t->par.sparkname));
1864 #endif // PARALLEL_HASKELL
1867 // Check whether the thread that just completed was a bound
1868 // thread, and if so return with the result.
1870 // There is an assumption here that all thread completion goes
1871 // through this point; we need to make sure that if a thread
1872 // ends up in the ThreadKilled state, that it stays on the run
1873 // queue so it can be dealt with here.
1878 if (t->bound != task) {
1879 #if !defined(THREADED_RTS)
1880 // Must be a bound thread that is not the topmost one. Leave
1881 // it on the run queue until the stack has unwound to the
1882 // point where we can deal with this. Leaving it on the run
1883 // queue also ensures that the garbage collector knows about
1884 // this thread and its return value (it gets dropped from the
1885 // all_threads list so there's no other way to find it).
1886 appendToRunQueue(cap,t);
1889 // this cannot happen in the threaded RTS, because a
1890 // bound thread can only be run by the appropriate Task.
1891 barf("finished bound thread that isn't mine");
1895 ASSERT(task->tso == t);
1897 if (t->what_next == ThreadComplete) {
1899 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1900 *(task->ret) = (StgClosure *)task->tso->sp[1];
1902 task->stat = Success;
1905 *(task->ret) = NULL;
1907 if (sched_state >= SCHED_INTERRUPTING) {
1908 task->stat = Interrupted;
1910 task->stat = Killed;
1914 removeThreadLabel((StgWord)task->tso->id);
1916 return rtsTrue; // tells schedule() to return
1922 /* -----------------------------------------------------------------------------
1923 * Perform a heap census, if PROFILING
1924 * -------------------------------------------------------------------------- */
1927 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1929 #if defined(PROFILING)
1930 // When we have +RTS -i0 and we're heap profiling, do a census at
1931 // every GC. This lets us get repeatable runs for debugging.
1932 if (performHeapProfile ||
1933 (RtsFlags.ProfFlags.profileInterval==0 &&
1934 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1936 // checking black holes is necessary before GC, otherwise
1937 // there may be threads that are unreachable except by the
1938 // blackhole queue, which the GC will consider to be
1940 scheduleCheckBlackHoles(&MainCapability);
1942 debugTrace(DEBUG_sched, "garbage collecting before heap census");
1943 GarbageCollect(rtsTrue);
1945 debugTrace(DEBUG_sched, "performing heap census");
1948 performHeapProfile = rtsFalse;
1949 return rtsTrue; // true <=> we already GC'd
1955 /* -----------------------------------------------------------------------------
1956 * Perform a garbage collection if necessary
1957 * -------------------------------------------------------------------------- */
1960 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1964 static volatile StgWord waiting_for_gc;
1965 rtsBool was_waiting;
1970 // In order to GC, there must be no threads running Haskell code.
1971 // Therefore, the GC thread needs to hold *all* the capabilities,
1972 // and release them after the GC has completed.
1974 // This seems to be the simplest way: previous attempts involved
1975 // making all the threads with capabilities give up their
1976 // capabilities and sleep except for the *last* one, which
1977 // actually did the GC. But it's quite hard to arrange for all
1978 // the other tasks to sleep and stay asleep.
1981 was_waiting = cas(&waiting_for_gc, 0, 1);
1984 debugTrace(DEBUG_sched, "someone else is trying to GC...");
1985 if (cap) yieldCapability(&cap,task);
1986 } while (waiting_for_gc);
1987 return cap; // NOTE: task->cap might have changed here
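    // (cas(&waiting_for_gc, 0, 1) atomically stores 1 and returns
    // the old value, so exactly one task reads 0 and becomes the GC
    // leader; every other task lands in the branch above and yields
    // its Capability until the leader clears the flag.)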
1990 for (i=0; i < n_capabilities; i++) {
1991 	debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
1992 if (cap != &capabilities[i]) {
1993 Capability *pcap = &capabilities[i];
1994 // we better hope this task doesn't get migrated to
1995 // another Capability while we're waiting for this one.
1996 // It won't, because load balancing happens while we have
1997 // all the Capabilities, but even so it's a slightly
1998 // unsavoury invariant.
2001 waitForReturnCapability(&pcap, task);
2002 if (pcap != &capabilities[i]) {
2003 barf("scheduleDoGC: got the wrong capability");
2008 waiting_for_gc = rtsFalse;
    /* Kick any transactions which are invalid back to their
     * atomically frames.  When next scheduled they will try to
     * commit, this commit will fail and they will retry.
     */
    {
        StgTSO *next;

        for (t = all_threads; t != END_TSO_QUEUE; t = next) {
            if (t->what_next == ThreadRelocated) {
                next = t->link;
            } else {
                next = t->global_link;

                // This is a good place to check for blocked
                // exceptions.  It might be the case that a thread is
                // blocked on delivering an exception to a thread that
                // is also blocked - we try to ensure that this
                // doesn't happen in throwTo(), but it's too hard (or
                // impossible) to close all the race holes, so we
                // accept that some might get through and deal with
                // them here.  A GC will always happen at some point,
                // even if the system is otherwise deadlocked.
                maybePerformBlockedException (&capabilities[0], t);

                if (t->trec != NO_TREC && t->why_blocked == NotBlocked) {
                    if (!stmValidateNestOfTransactions (t->trec)) {
                        debugTrace(DEBUG_sched | DEBUG_stm,
                                   "trec %p found wasting its time", t);

                        // strip the stack back to the
                        // ATOMICALLY_FRAME, aborting the (nested)
                        // transaction, and saving the stack of any
                        // partially-evaluated thunks on the heap.
                        throwToSingleThreaded_(&capabilities[0], t,
                                               NULL, rtsTrue, NULL);

                        ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
                    }
                }
            }
        }
    }
    // so this happens periodically:
    if (cap) scheduleCheckBlackHoles(cap);

    IF_DEBUG(scheduler, printAllThreads());
    /*
     * We now have all the capabilities; if we're in an interrupting
     * state, then we should take the opportunity to delete all the
     * threads in the system.
     */
    if (sched_state >= SCHED_INTERRUPTING) {
        deleteAllThreads(&capabilities[0]);
        sched_state = SCHED_SHUTTING_DOWN;
    }
    /* everybody back, start the GC.
     * Could do it in this thread, or signal a condition var
     * to do it in another thread.  Either way, we need to
     * broadcast on gc_pending_cond afterward.
     */
#if defined(THREADED_RTS)
    debugTrace(DEBUG_sched, "doing GC");
#endif
    GarbageCollect(force_major);

#if defined(THREADED_RTS)
    // release our stash of capabilities.
    for (i = 0; i < n_capabilities; i++) {
        if (cap != &capabilities[i]) {
            task->cap = &capabilities[i];
            releaseCapability(&capabilities[i]);
        }
    }
    if (cap) {
        task->cap = cap;
    } else {
        task->cap = NULL;
    }
#endif
#if defined(GRAN)
    /* add a ContinueThread event to continue execution of current thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              ContinueThread,
              t, (StgClosure*)NULL, (rtsSpark*)NULL);
    debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
#endif

    return cap;
}
/* ---------------------------------------------------------------------------
 * Singleton fork(). Do not copy any running threads.
 * ------------------------------------------------------------------------- */

pid_t
forkProcess(HsStablePtr *entry
#ifndef FORKPROCESS_PRIMOP_SUPPORTED
            STG_UNUSED
#endif
           )
{
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
    Task *task;
    pid_t pid;
    StgTSO* t,*next;
    Capability *cap;

#if defined(THREADED_RTS)
    if (RtsFlags.ParFlags.nNodes > 1) {
        errorBelch("forking not supported with +RTS -N<n> greater than 1");
        stg_exit(EXIT_FAILURE);
    }
#endif

    debugTrace(DEBUG_sched, "forking!");
    // ToDo: for SMP, we should probably acquire *all* the capabilities
    cap = rts_lock();

    pid = fork();

    if (pid) { // parent

        // just return the pid
        rts_unlock(cap);
        return pid;

    } else { // child
        // Now, all OS threads except the thread that forked are
        // stopped.  We need to stop all Haskell threads, including
        // those involved in foreign calls.  Also we need to delete
        // all Tasks, because they correspond to OS threads that are
        // now gone.

        for (t = all_threads; t != END_TSO_QUEUE; t = next) {
            if (t->what_next == ThreadRelocated) {
                next = t->link;
            } else {
                next = t->global_link;
                // don't allow threads to catch the ThreadKilled
                // exception, but we do want to raiseAsync() because these
                // threads may be evaluating thunks that we need later.
                deleteThread_(cap,t);
            }
        }
        // Empty the run queue.  It seems tempting to let all the
        // killed threads stay on the run queue as zombies to be
        // cleaned up later, but some of them correspond to bound
        // threads for which the corresponding Task does not exist.
        cap->run_queue_hd = END_TSO_QUEUE;
        cap->run_queue_tl = END_TSO_QUEUE;

        // Any suspended C-calling Tasks are no more, their OS threads
        // don't exist now:
        cap->suspended_ccalling_tasks = NULL;

        // Empty the all_threads list.  Otherwise, the garbage
        // collector may attempt to resurrect some of these threads.
        all_threads = END_TSO_QUEUE;

        // Wipe the task list, except the current Task.
        ACQUIRE_LOCK(&sched_mutex);
        for (task = all_tasks; task != NULL; task=task->all_link) {
            if (task != cap->running_task) {
                discardTask(task);
            }
        }
        RELEASE_LOCK(&sched_mutex);
#if defined(THREADED_RTS)
        // Wipe our spare workers list, they no longer exist.  New
        // workers will be created if necessary.
        cap->spare_workers = NULL;
        cap->returning_tasks_hd = NULL;
        cap->returning_tasks_tl = NULL;
#endif

        // On Unix, all timers are reset in the child, so we need to start
        // the timer again.
        startTimer();

        cap = rts_evalStableIO(cap, entry, NULL); // run the action
        rts_checkSchedStatus("forkProcess",cap);

        rts_unlock(cap);
        hs_exit();                      // clean up and exit
        stg_exit(EXIT_SUCCESS);
    }
#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
    barf("forkProcess#: primop not supported on this platform, sorry!\n");
    return -1;
#endif
}
/* ---------------------------------------------------------------------------
 * Delete all the threads in the system
 * ------------------------------------------------------------------------- */

static void
deleteAllThreads ( Capability *cap )
{
    // NOTE: only safe to call if we own all capabilities.

    StgTSO* t, *next;

    debugTrace(DEBUG_sched,"deleting all threads");
    for (t = all_threads; t != END_TSO_QUEUE; t = next) {
        if (t->what_next == ThreadRelocated) {
            next = t->link;
        } else {
            next = t->global_link;
            deleteThread(cap,t);
        }
    }

    // The run queue now contains a bunch of ThreadKilled threads.  We
    // must not throw these away: the main thread(s) will be in there
    // somewhere, and the main scheduler loop has to deal with it.
    // Also, the run queue is the only thing keeping these threads from
    // being GC'd, and we don't want the "main thread has been GC'd" panic.

#if !defined(THREADED_RTS)
    ASSERT(blocked_queue_hd == END_TSO_QUEUE);
    ASSERT(sleeping_queue == END_TSO_QUEUE);
#endif
}
/* -----------------------------------------------------------------------------
   Managing the suspended_ccalling_tasks list.
   Locks required: sched_mutex
   -------------------------------------------------------------------------- */

STATIC_INLINE void
suspendTask (Capability *cap, Task *task)
{
    ASSERT(task->next == NULL && task->prev == NULL);
    task->next = cap->suspended_ccalling_tasks;
    task->prev = NULL;
    if (cap->suspended_ccalling_tasks) {
        cap->suspended_ccalling_tasks->prev = task;
    }
    cap->suspended_ccalling_tasks = task;
}

STATIC_INLINE void
recoverSuspendedTask (Capability *cap, Task *task)
{
    if (task->prev) {
        task->prev->next = task->next;
    } else {
        ASSERT(cap->suspended_ccalling_tasks == task);
        cap->suspended_ccalling_tasks = task->next;
    }
    if (task->next) {
        task->next->prev = task->prev;
    }
    task->next = task->prev = NULL;
}
/* ---------------------------------------------------------------------------
 * Suspending & resuming Haskell threads.
 *
 * When making a "safe" call to C (aka _ccall_GC), the task gives back
 * its capability before calling the C function.  This allows another
 * task to pick up the capability and carry on running Haskell
 * threads.  It also means that if the C call blocks, it won't lock
 * the whole system.
 *
 * The Haskell thread making the C call is put to sleep for the
 * duration of the call, on the suspended_ccalling_threads queue.  We
 * give out a token to the task, which it can use to resume the thread
 * on return from the C function.
 * ------------------------------------------------------------------------- */
void *
suspendThread (StgRegTable *reg)
{
    Capability *cap;
    int saved_errno;
    StgTSO *tso;
    Task *task;
#if mingw32_HOST_OS
    StgWord32 saved_winerror;
#endif

    saved_errno = errno;
#if mingw32_HOST_OS
    saved_winerror = GetLastError();
#endif
    /* assume that *reg is a pointer to the StgRegTable part of a Capability.
     */
    cap = regTableToCapability(reg);

    task = cap->running_task;
    tso = cap->r.rCurrentTSO;
    debugTrace(DEBUG_sched,
               "thread %lu did a safe foreign call",
               (unsigned long)cap->r.rCurrentTSO->id);

    // XXX this might not be necessary --SDM
    tso->what_next = ThreadRunGHC;

    threadPaused(cap,tso);
    if ((tso->flags & TSO_BLOCKEX) == 0) {
        tso->why_blocked = BlockedOnCCall;
        tso->flags |= TSO_BLOCKEX;
        tso->flags &= ~TSO_INTERRUPTIBLE;
    } else {
        tso->why_blocked = BlockedOnCCall_NoUnblockExc;
    }
    // Hand back capability
    task->suspended_tso = tso;

    ACQUIRE_LOCK(&cap->lock);

    suspendTask(cap,task);
    cap->in_haskell = rtsFalse;
    releaseCapability_(cap);

    RELEASE_LOCK(&cap->lock);
#if defined(THREADED_RTS)
    /* Preparing to leave the RTS, so ensure there's a native thread/task
       waiting to take over.
    */
    debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
#endif

    errno = saved_errno;
#if mingw32_HOST_OS
    SetLastError(saved_winerror);
#endif

    return task;
}
StgRegTable *
resumeThread (void *task_)
{
    StgTSO *tso;
    Capability *cap;
    Task *task = task_;
    int saved_errno;
#if mingw32_HOST_OS
    StgWord32 saved_winerror;
#endif

    saved_errno = errno;
#if mingw32_HOST_OS
    saved_winerror = GetLastError();
#endif
    cap = task->cap;
    // Wait for permission to re-enter the RTS with the result.
    waitForReturnCapability(&cap,task);
    // we might be on a different capability now... but if so, our
    // entry on the suspended_ccalling_tasks list will also have been
    // migrated.

    // Remove the thread from the suspended list
    recoverSuspendedTask(cap,task);
    tso = task->suspended_tso;
    task->suspended_tso = NULL;
    tso->link = END_TSO_QUEUE;
    debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);

    if (tso->why_blocked == BlockedOnCCall) {
        awakenBlockedExceptionQueue(cap,tso);
        tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
    }
    /* Reset blocking status */
    tso->why_blocked = NotBlocked;

    cap->r.rCurrentTSO = tso;
    cap->in_haskell = rtsTrue;
    errno = saved_errno;
#if mingw32_HOST_OS
    SetLastError(saved_winerror);
#endif

    /* We might have GC'd, mark the TSO dirty again */
    dirtyTSO(tso);

    IF_DEBUG(sanity, checkTSO(tso));

    return &cap->r;
}
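/* A sketch of how these two entry points pair up (illustrative only:
 * the real calling sequence is emitted by the code generator for a
 * "safe" foreign call; 'some_blocking_c_fn' and the locals are made
 * up for the example):
 */
#if 0
    {
        void *token;
        token = suspendThread(BaseReg);   // give up the Capability
        r = some_blocking_c_fn(args);     // other Tasks may run Haskell now
        BaseReg = resumeThread(token);    // re-acquire a Capability
    }
#endif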
/* ---------------------------------------------------------------------------
 * scheduleThread()
 *
 * scheduleThread puts a thread on the end of the runnable queue.
 * This will usually be done immediately after a thread is created.
 * The caller of scheduleThread must create the thread using e.g.
 * createThread and push an appropriate closure
 * on this thread's stack before the scheduler is invoked.
 * ------------------------------------------------------------------------ */
void
scheduleThread(Capability *cap, StgTSO *tso)
{
    // The thread goes at the *end* of the run-queue, to avoid possible
    // starvation of any threads already on the queue.
    appendToRunQueue(cap,tso);
}
void
scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
{
#if defined(THREADED_RTS)
    tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
                              // move this thread from now on.
    cpu %= RtsFlags.ParFlags.nNodes;
    if (cpu == cap->no) {
        appendToRunQueue(cap,tso);
    } else {
        migrateThreadToCapability_lock(&capabilities[cpu],tso);
    }
#else
    appendToRunQueue(cap,tso);
#endif
}
Capability *
scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
{
    Task *task;

    // We already created/initialised the Task
    task = cap->running_task;

    // This TSO is now a bound thread; make the Task and TSO
    // point to each other.
    tso->bound = task;
    tso->cap = cap;

    task->tso = tso;
    task->ret = ret;
    task->stat = NoStatus;

    appendToRunQueue(cap,tso);

    debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);

#if defined(GRAN)
    /* GranSim specific init */
    CurrentTSO = m->tso;                // the TSO to run
    procStatus[MainProc] = Busy;        // status of main PE
    CurrentProc = MainProc;             // PE to run it on
#endif

    cap = schedule(cap,task);

    ASSERT(task->stat != NoStatus);
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
    return cap;
}
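/* A sketch of the typical caller (illustrative: this is roughly what
 * the rts_evalIO()/rts_lock() family in RtsAPI.c does; createIOThread
 * and 'closure' are assumptions here, not defined in this file):
 */
#if 0
    StgTSO *tso;
    HaskellObj ret;
    tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, closure);
    cap = scheduleWaitThread(tso, &ret, cap); // returns when the thread finishes
#endif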
/* ----------------------------------------------------------------------------
 * Starting Tasks
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
static void
workerStart(Task *task)
{
    Capability *cap;

    // See startWorkerTask().
    ACQUIRE_LOCK(&task->lock);
    cap = task->cap;
    RELEASE_LOCK(&task->lock);

    // set the thread-local pointer to the Task:
    taskEnter(task);

    // schedule() runs without a lock.
    cap = schedule(cap,task);

    // On exit from schedule(), we have a Capability.
    releaseCapability(cap);
    workerTaskStop(task);
}
#endif
/* ---------------------------------------------------------------------------
 * initScheduler()
 *
 * Initialise the scheduler.  This resets all the queues - if the
 * queues contained any threads, they'll be garbage collected at the
 * next pass.
 * ------------------------------------------------------------------------ */

void
initScheduler(void)
{
#if defined(GRAN)
    nat i;
    for (i=0; i < MAX_PROC; i++) {
        run_queue_hds[i]      = END_TSO_QUEUE;
        run_queue_tls[i]      = END_TSO_QUEUE;
        blocked_queue_hds[i]  = END_TSO_QUEUE;
        blocked_queue_tls[i]  = END_TSO_QUEUE;
        ccalling_threadss[i]  = END_TSO_QUEUE;
        blackhole_queue[i]    = END_TSO_QUEUE;
        sleeping_queue        = END_TSO_QUEUE;
    }
#elif !defined(THREADED_RTS)
    blocked_queue_hd  = END_TSO_QUEUE;
    blocked_queue_tl  = END_TSO_QUEUE;
    sleeping_queue    = END_TSO_QUEUE;
#endif

    blackhole_queue   = END_TSO_QUEUE;
    all_threads       = END_TSO_QUEUE;

    context_switch = 0;
    sched_state    = SCHED_RUNNING;
#if defined(THREADED_RTS)
    /* Initialise the mutex and condition variables used by
     * the scheduler. */
    initMutex(&sched_mutex);
#endif

    ACQUIRE_LOCK(&sched_mutex);
    /* A capability holds the state a native thread needs in
     * order to execute STG code.  At least one capability is
     * floating around (only THREADED_RTS builds have more than one).
     */
    initCapabilities();

    initTaskManager();

#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
    initSparkPools();
#endif
#if defined(THREADED_RTS)
    /*
     * Eagerly start one worker to run each Capability, except for
     * Capability 0.  The idea is that we're probably going to start a
     * bound thread on Capability 0 pretty soon, so we don't want a
     * worker task hogging it.
     */
    {
        nat i;
        Capability *cap;
        for (i = 1; i < n_capabilities; i++) {
            cap = &capabilities[i];
            ACQUIRE_LOCK(&cap->lock);
            startWorkerTask(cap, workerStart);
            RELEASE_LOCK(&cap->lock);
        }
    }
#endif

    trace(TRACE_sched, "start: %d capabilities", n_capabilities);

    RELEASE_LOCK(&sched_mutex);
}
void
exitScheduler( void )
{
    Task *task = NULL;

#if defined(THREADED_RTS)
    ACQUIRE_LOCK(&sched_mutex);
    task = newBoundTask();
    RELEASE_LOCK(&sched_mutex);
#endif
    // If we haven't killed all the threads yet, do it now.
    if (sched_state < SCHED_SHUTTING_DOWN) {
        sched_state = SCHED_INTERRUPTING;
        scheduleDoGC(NULL,task,rtsFalse);
    }
    sched_state = SCHED_SHUTTING_DOWN;

#if defined(THREADED_RTS)
    {
        nat i;

        for (i = 0; i < n_capabilities; i++) {
            shutdownCapability(&capabilities[i], task);
        }
        boundTaskExiting(task);
        stopTaskManager();
    }
#endif
#if !defined(THREADED_RTS)
    freeCapability(&MainCapability);
#endif
}
void
freeScheduler( void )
{
    freeTaskManager();
    if (n_capabilities != 1) {
        stgFree(capabilities);
    }
#if defined(THREADED_RTS)
    closeMutex(&sched_mutex);
#endif
}
/* ---------------------------------------------------------------------------
   Where are the roots that we know about?

        - all the threads on the runnable queue
        - all the threads on the blocked queue
        - all the threads on the sleeping queue
        - all the threads currently executing a _ccall_GC
        - all the "main threads"

   ------------------------------------------------------------------------ */

/* This has to be protected either by the scheduler monitor, or by the
   garbage collection monitor (probably the latter).
*/
void
GetRoots( evac_fn evac )
{
    nat i;
    Capability *cap;
    Task *task;

#if defined(GRAN)
    for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
        if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
            evac((StgClosure **)&run_queue_hds[i]);
        if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
            evac((StgClosure **)&run_queue_tls[i]);

        if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
            evac((StgClosure **)&blocked_queue_hds[i]);
        if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
            evac((StgClosure **)&blocked_queue_tls[i]);
        if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
            evac((StgClosure **)&ccalling_threadss[i]);
    }

#else /* !GRAN */
    for (i = 0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        evac((StgClosure **)(void *)&cap->run_queue_hd);
        evac((StgClosure **)(void *)&cap->run_queue_tl);
#if defined(THREADED_RTS)
        evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
        evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
#endif
        for (task = cap->suspended_ccalling_tasks; task != NULL;
             task=task->next) {
            debugTrace(DEBUG_sched,
                       "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
            evac((StgClosure **)(void *)&task->suspended_tso);
        }
    }
#if !defined(THREADED_RTS)
    evac((StgClosure **)(void *)&blocked_queue_hd);
    evac((StgClosure **)(void *)&blocked_queue_tl);
    evac((StgClosure **)(void *)&sleeping_queue);
#endif
#endif /* !GRAN */
2706 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2707 markSparkQueue(evac);
2710 #if defined(RTS_USER_SIGNALS)
2711 // mark the signal handlers (signals should be already blocked)
2712 if (RtsFlags.MiscFlags.install_signal_handlers) {
2713 markSignalHandlers(evac);
/* -----------------------------------------------------------------------------
   performGC

   This is the interface to the garbage collector from Haskell land.
   We provide this so that external C code can allocate and garbage
   collect when called from Haskell via _ccall_GC.
   -------------------------------------------------------------------------- */

static void
performGC_(rtsBool force_major)
{
    Task *task;
    // We must grab a new Task here, because the existing Task may be
    // associated with a particular Capability, and chained onto the
    // suspended_ccalling_tasks queue.
    ACQUIRE_LOCK(&sched_mutex);
    task = newBoundTask();
    RELEASE_LOCK(&sched_mutex);
    scheduleDoGC(NULL,task,force_major);
    boundTaskExiting(task);
}
void
performGC(void)
{
    performGC_(rtsFalse);
}

void
performMajorGC(void)
{
    performGC_(rtsTrue);
}
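/* For example (illustrative): external C code entered via a safe
 * foreign call can force a collection before taking heap measurements:
 */
#if 0
    void my_c_callback(void)    /* hypothetical foreign function */
    {
        performMajorGC();
        /* ... inspect memory use, call back into Haskell, etc. ... */
    }
#endif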
/* -----------------------------------------------------------------------------
   Stack overflow

   If the thread has reached its maximum stack size, then raise the
   StackOverflow exception in the offending thread.  Otherwise
   relocate the TSO into a larger chunk of memory and adjust its stack
   size appropriately.
   -------------------------------------------------------------------------- */
static StgTSO *
threadStackOverflow(Capability *cap, StgTSO *tso)
{
    nat new_stack_size, stack_words;
    lnat new_tso_size;
    StgPtr new_sp;
    StgTSO *dest;

    IF_DEBUG(sanity,checkTSO(tso));

    // don't allow throwTo() to modify the blocked_exceptions queue
    // while we are moving the TSO:
    lockClosure((StgClosure *)tso);
    if (tso->stack_size >= tso->max_stack_size) {

        debugTrace(DEBUG_gc,
                   "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
                   (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);

        IF_DEBUG(gc,
                 /* If we're debugging, just print out the top of the stack */
                 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
                                                  tso->sp+64)));

        // Send this thread the StackOverflow exception
        unlockTSO(tso);
        throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
        return tso;
    }
    /* Try to double the current stack size.  If that takes us over the
     * maximum stack size for this thread, then use the maximum instead.
     * Finally round up so the TSO ends up as a whole number of blocks.
     */
    new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
    new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
                                          TSO_STRUCT_SIZE)/sizeof(W_);
    new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
    new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
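    /* Worked example (illustrative, assuming the usual 4Kbyte BLOCK_SIZE
     * and 4-byte words): a 1000-word stack doubles to 2000 words, i.e.
     * 8000 bytes plus the TSO header; BLOCK_ROUND_UP takes that to three
     * 4K blocks = 3072 words, so the new stack gets 3072 words minus the
     * TSO header.  round_to_mblocks only kicks in for stacks approaching
     * a megablock in size.
     */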
    debugTrace(DEBUG_sched,
               "increasing stack size from %ld words to %d.",
               (long)tso->stack_size, new_stack_size);

    dest = (StgTSO *)allocate(new_tso_size);
    TICK_ALLOC_TSO(new_stack_size,0);

    /* copy the TSO block and the old stack into the new area */
    memcpy(dest,tso,TSO_STRUCT_SIZE);
    stack_words = tso->stack + tso->stack_size - tso->sp;
    new_sp = (P_)dest + new_tso_size - stack_words;
    memcpy(new_sp, tso->sp, stack_words * sizeof(W_));

    /* relocate the stack pointers... */
    dest->sp         = new_sp;
    dest->stack_size = new_stack_size;
    /* Mark the old TSO as relocated.  We have to check for relocated
     * TSOs in the garbage collector and any primops that deal with TSOs.
     *
     * It's important to set the sp value to just beyond the end
     * of the stack, so we don't attempt to scavenge any part of the
     * dead TSO's stack.
     */
    tso->what_next = ThreadRelocated;
    tso->link = dest;
    tso->sp = (P_)&(tso->stack[tso->stack_size]);
    tso->why_blocked = NotBlocked;

    IF_PAR_DEBUG(verbose,
                 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
                            tso->id, tso, tso->stack_size);
                 /* If we're debugging, just print out the top of the stack */
                 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
                                                  tso->sp+64)));
    unlockTSO(dest);
    unlockTSO(tso);

    IF_DEBUG(sanity,checkTSO(dest));
#if 0
    IF_DEBUG(scheduler,printTSO(dest));
#endif

    return dest;
}
/* ---------------------------------------------------------------------------
   Interrupt execution
   - usually called inside a signal handler so it mustn't do anything fancy.
   ------------------------------------------------------------------------ */

void
interruptStgRts(void)
{
    sched_state = SCHED_INTERRUPTING;
    context_switch = 1;
    wakeUpRts();
}
/* -----------------------------------------------------------------------------
   Wake up the RTS

   This function causes at least one OS thread to wake up and run the
   scheduler loop.  It is invoked when the RTS might be deadlocked, or
   an external event has arrived that may need servicing (eg. a
   keyboard interrupt).

   In the single-threaded RTS we don't do anything here; we only have
   one thread anyway, and the event that caused us to want to wake up
   will have interrupted any blocking system call in progress anyway.
   -------------------------------------------------------------------------- */
void
wakeUpRts(void)
{
#if defined(THREADED_RTS)
    // This forces the IO Manager thread to wakeup, which will
    // in turn ensure that some OS thread wakes up and runs the
    // scheduler loop, which will cause a GC and deadlock check.
    ioManagerWakeup();
#endif
}
/* -----------------------------------------------------------------------------
 * checkBlackHoles()
 *
 * Check the blackhole_queue for threads that can be woken up.  We do
 * this periodically: before every GC, and whenever the run queue is
 * empty.
 *
 * An elegant solution might be to just wake up all the blocked
 * threads with awakenBlockedQueue occasionally: they'll go back to
 * sleep again if the object is still a BLACKHOLE.  Unfortunately this
 * doesn't give us a way to tell whether we've actually managed to
 * wake up any threads, so we would be busy-waiting.
 *
 * -------------------------------------------------------------------------- */
static rtsBool
checkBlackHoles (Capability *cap)
{
    StgTSO **prev, *t;
    rtsBool any_woke_up = rtsFalse;
    StgHalfWord type;

    // blackhole_queue is global:
    ASSERT_LOCK_HELD(&sched_mutex);

    debugTrace(DEBUG_sched, "checking threads blocked on black holes");

    // ASSUMES: sched_mutex
    prev = &blackhole_queue;
    t = blackhole_queue;
    while (t != END_TSO_QUEUE) {
        ASSERT(t->why_blocked == BlockedOnBlackHole);
        type = get_itbl(t->block_info.closure)->type;
        if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
            IF_DEBUG(sanity,checkTSO(t));
            t = unblockOne(cap, t);
            // urk, the threads migrate to the current capability
            // here, but we'd like to keep them on the original one.
            *prev = t;
            any_woke_up = rtsTrue;
        } else {
            prev = &t->link;
            t = t->link;
        }
    }

    return any_woke_up;
}
/* -----------------------------------------------------------------------------
   Deleting threads

   This is used for interruption (^C) and forking, and corresponds to
   raising an exception but without letting the thread catch the
   exception.
   -------------------------------------------------------------------------- */

static void
deleteThread (Capability *cap, StgTSO *tso)
{
    // NOTE: must only be called on a TSO that we have exclusive
    // access to, because we will call throwToSingleThreaded() below.
    // The TSO must be on the run queue of the Capability we own, or
    // we must own all Capabilities.

    if (tso->why_blocked != BlockedOnCCall &&
        tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
        throwToSingleThreaded(cap,tso,NULL);
    }
}
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
deleteThread_ (Capability *cap, StgTSO *tso)
{ // for forkProcess only:
  // like deleteThread(), but we delete threads in foreign calls, too.

    if (tso->why_blocked == BlockedOnCCall ||
        tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
        unblockOne(cap,tso);
        tso->what_next = ThreadKilled;
    } else {
        deleteThread(cap,tso);
    }
}
#endif
/* -----------------------------------------------------------------------------
   raiseExceptionHelper

   This function is called by the raise# primitive, just so that we can
   move some of the tricky bits of raising an exception from C-- into
   C.  Who knows, it might be a useful re-usable thing here too.
   -------------------------------------------------------------------------- */
StgWord
raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
{
    Capability *cap = regTableToCapability(reg);
    StgThunk *raise_closure = NULL;
    StgPtr p, next;
    StgRetInfoTable *info;
    //
    // This closure represents the expression 'raise# E' where E
    // is the exception raised.  It is used to overwrite all the
    // thunks which are currently under evaluation.
    //
    // OLD COMMENT (we don't have MIN_UPD_SIZE now):
    // LDV profiling: stg_raise_info has THUNK as its closure
    // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
    // payload, MIN_UPD_SIZE is more appropriate than 1.  It seems that
    // 1 does not cause any problem unless profiling is performed.
    // However, when LDV profiling goes on, we need to linearly scan
    // small object pool, where raise_closure is stored, so we should
    // use MIN_UPD_SIZE.
    //
    // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
    //                                 sizeofW(StgClosure)+1);
    //
    //
    // Walk up the stack, looking for the catch frame.  On the way,
    // we update any closures pointed to from update frames with the
    // raise closure that we just built.
    //
    p = tso->sp;
    while(1) {
        info = get_ret_itbl((StgClosure *)p);
        next = p + stack_frame_sizeW((StgClosure *)p);
        switch (info->i.type) {

        case UPDATE_FRAME:
            // Only create raise_closure if we need to.
            if (raise_closure == NULL) {
                raise_closure =
                    (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
                SET_HDR(raise_closure, &stg_raise_info, CCCS);
                raise_closure->payload[0] = exception;
            }
            UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
            p = next;
            continue;
        case ATOMICALLY_FRAME:
            debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
            tso->sp = p;
            return ATOMICALLY_FRAME;

        case CATCH_FRAME:
            tso->sp = p;
            return CATCH_FRAME;

        case CATCH_STM_FRAME:
            debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
            tso->sp = p;
            return CATCH_STM_FRAME;

        case STOP_FRAME:
            tso->sp = p;
            return STOP_FRAME;

        case CATCH_RETRY_FRAME:
        default:
            p = next;
            continue;
        }
    }
}
/* -----------------------------------------------------------------------------
   findRetryFrameHelper

   This function is called by the retry# primitive.  It traverses the stack
   leaving tso->sp referring to the frame which should handle the retry.

   This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
   or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).

   We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
   create) because retries are not considered to be exceptions, despite the
   similar implementation.

   We should not expect to see CATCH_FRAME or STOP_FRAME because those should
   not be created within memory transactions.
   -------------------------------------------------------------------------- */
StgWord
findRetryFrameHelper (StgTSO *tso)
{
    StgPtr p, next;
    StgRetInfoTable *info;

    p = tso->sp;
    while (1) {
        info = get_ret_itbl((StgClosure *)p);
        next = p + stack_frame_sizeW((StgClosure *)p);
        switch (info->i.type) {
        case ATOMICALLY_FRAME:
            debugTrace(DEBUG_stm,
                       "found ATOMICALLY_FRAME at %p during retry", p);
            tso->sp = p;
            return ATOMICALLY_FRAME;

        case CATCH_RETRY_FRAME:
            debugTrace(DEBUG_stm,
                       "found CATCH_RETRY_FRAME at %p during retry", p);
            tso->sp = p;
            return CATCH_RETRY_FRAME;
        case CATCH_STM_FRAME: {
            StgTRecHeader *trec = tso->trec;
            StgTRecHeader *outer = stmGetEnclosingTRec(trec);
            debugTrace(DEBUG_stm,
                       "found CATCH_STM_FRAME at %p during retry", p);
            debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
            stmAbortTransaction(tso->cap, trec);
            stmFreeAbortedTRec(tso->cap, trec);
            tso->trec = outer;
            p = next;
            continue;
        }

        default:
            ASSERT(info->i.type != CATCH_FRAME);
            ASSERT(info->i.type != STOP_FRAME);
            p = next;
            continue;
        }
    }
}
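/* Sketch of the caller's side (illustrative; the real caller is the
 * retry# primop in C--, so this is only pseudocode for its logic):
 */
#if 0
    switch (findRetryFrameHelper(CurrentTSO)) {
    case CATCH_RETRY_FRAME:
        // retry# inside an orElse#: run the other alternative
        break;
    case ATOMICALLY_FRAME:
        // retry# at the top level: block the thread until one of the
        // TVars it has read is written by another thread
        break;
    }
#endif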
/* -----------------------------------------------------------------------------
   resurrectThreads is called after garbage collection on the list of
   threads found to be garbage.  Each of these threads will be woken
   up and sent a signal: BlockedOnDeadMVar if the thread was blocked
   on an MVar, or NonTermination if the thread was blocked on a Black
   Hole.

   Locks: assumes we hold *all* the capabilities.
   -------------------------------------------------------------------------- */
void
resurrectThreads (StgTSO *threads)
{
    StgTSO *tso, *next;
    Capability *cap;

    for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
        next = tso->global_link;
        tso->global_link = all_threads;
        all_threads = tso;
        debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);

        // Wake up the thread on the Capability it was last on
        cap = tso->cap;
        switch (tso->why_blocked) {
        case BlockedOnMVar:
        case BlockedOnException:
            /* Called by GC - sched_mutex lock is currently held. */
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)BlockedOnDeadMVar_closure);
            break;
        case BlockedOnBlackHole:
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)NonTermination_closure);
            break;
        case BlockedOnSTM:
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)BlockedIndefinitely_closure);
            break;
        case NotBlocked:
            /* This might happen if the thread was blocked on a black hole
             * belonging to a thread that we've just woken up (raiseAsync
             * can wake up threads, remember...).
             */
            continue;
        default:
            barf("resurrectThreads: thread blocked in a strange way");
        }
    }
}