/* ---------------------------------------------------------------------------
 * (c) The GHC Team, 1998-2006
 * The scheduler and thread-related functionality
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "OSThreads.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "RtsSignals.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Proftimer.h"
#if defined(GRAN) || defined(PARALLEL_HASKELL)
# include "GranSimRts.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
#endif

#include "Capability.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "RaiseAsync.h"
#include "ThrIOManager.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif

// Turn off inlining when debugging - it obfuscates things
#ifdef DEBUG
# define STATIC_INLINE static
#else
# define STATIC_INLINE static inline
#endif

/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */
#if defined(GRAN)
StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;   now in GranSim.c */

/*
   In GranSim we have a runnable and a blocked queue for each processor.
   In order to minimise code changes new arrays run_queue_hds/tls
   are created. run_queue_hd is then a short cut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
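/* A minimal sketch of the shortcut macros described above; illustrative
 * only -- the real definitions live in GranSim.h, not here:
 */
#if 0
#define run_queue_hd      (run_queue_hds[CurrentProc])
#define run_queue_tl      (run_queue_tls[CurrentProc])
#define blocked_queue_hd  (blocked_queue_hds[CurrentProc])
#endif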
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating). */
#endif

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *blackhole_queue = NULL;

/* The blackhole_queue should be checked for threads to wake up.  See
 * Schedule.h for more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
 */
rtsBool blackholes_need_checking = rtsFalse;
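/* An illustrative sketch (condensed from code later in this file) of the
 * lock-free protocol on this flag: the producer sets it without a lock,
 * and the consumer re-checks it under sched_mutex before acting:
 */
#if 0
    // producer, after running some Haskell code (see schedule()):
    if (blackhole_queue != END_TSO_QUEUE) {
        blackholes_need_checking = rtsTrue;     // missed updates are benign
    }
    // consumer (see scheduleCheckBlackHoles()):
    if (blackholes_need_checking) {             // racy read first...
        ACQUIRE_LOCK(&sched_mutex);
        if (blackholes_need_checking) {         // ...then re-check under the lock
            checkBlackHoles(cap);
            blackholes_need_checking = rtsFalse;
        }
        RELEASE_LOCK(&sched_mutex);
    }
#endif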
/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *all_threads = NULL;
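/* A sketch of the traversal idiom used on this list (see scheduleDoGC):
 * threads chain through global_link, except that a ThreadRelocated TSO
 * forwards through its link field instead:
 */
#if 0
    StgTSO *t, *next;
    for (t = all_threads; t != END_TSO_QUEUE; t = next) {
        next = (t->what_next == ThreadRelocated) ? t->link : t->global_link;
        // ... inspect t here ...
    }
#endif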
/* flag set by signal handler to precipitate a context switch
 * LOCK: none (just an advisory flag)
 */
int context_switch = 0;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 */
nat recent_activity = ACTIVITY_YES;
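/* A sketch of the transitions this flag goes through, assembled from its
 * uses in this file (the timer handler that sets ACTIVITY_INACTIVE lives
 * outside this file):
 */
#if 0
    recent_activity = ACTIVITY_YES;      // just before running a thread
    if (recent_activity != ACTIVITY_INACTIVE) return;  // deadlock check bails out
    recent_activity = ACTIVITY_DONE_GC;  // after a deadlock-forced major GC
#endif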
/* if this flag is set as well, give up execution
 * LOCK: none (changes once, from false->true)
 */
rtsBool sched_state = SCHED_RUNNING;

/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 * exists - earlier gccs apparently didn't.
 */
StgTSO dummy_tso;

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if defined(PARALLEL_HASKELL)
StgTSO *LastTSO;
rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;
#endif

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif
/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

//
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
static void schedulePreLoop (void);
#if defined(THREADED_RTS)
static void schedulePushWork(Capability *cap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
#if defined(GRAN)
static StgTSO *scheduleProcessEvent(rtsEvent *event);
#endif
#if defined(PARALLEL_HASKELL)
static StgTSO *scheduleSendPendingMessages(void);
static rtsBool scheduleActivateSpark(void);
static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
#endif
#if defined(PAR) || defined(GRAN)
static void scheduleGranParReport(void);
#endif
static void schedulePostRunThread(void);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task,
                                         StgTSO *t );
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
                                    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                             StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
                                rtsBool force_major);

static rtsBool checkBlackHoles(Capability *cap);

static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif

#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);
#endif

#ifdef DEBUG
static char *whatNext_strs[] = {
  "(unknown)",
  "ThreadRunGHC",
  "ThreadInterpret",
  "ThreadKilled",
  "ThreadRelocated",
  "ThreadComplete"
};
#endif
/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */

STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) {
        // this does round-robin scheduling; good for concurrency
        appendToRunQueue(cap,t);
    } else {
        // this does unfair scheduling; good for parallelism
        pushOnRunQueue(cap,t);
    }
#else
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
#endif
}
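/* A hypothetical illustration (thread names t1/t2 are assumed, not from
 * this file) of how the two policies order the queue: append gives FIFO,
 * round-robin behaviour; push gives LIFO, run-the-new-work-first behaviour:
 */
#if 0
    appendToRunQueue(cap, t1);   // run queue: <older threads>, t1
    pushOnRunQueue(cap, t2);     // run queue: t2, <older threads>, t1
#endif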
/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   In a GranSim setup this loop iterates over the global event queue.
   This revolves around the global event queue, which determines what
   to do next. Therefore, it's more complicated than either the
   concurrent or the parallel (GUM) setup.

   GUM iterates over incoming messages.
   It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
   and sends out a fish whenever it has nothing to do; in-between
   doing the actual reductions (shared code below) it processes the
   incoming messages and deals with delayed operations
   (see PendingFetches).
   This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */
static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
#if defined(GRAN)
  rtsEvent *event;
#elif defined(PARALLEL_HASKELL)
  rtsBool receivedFinish = rtsFalse;
# if defined(DEBUG)
  nat tp_size, sp_size; // stats only
# endif
#endif
  nat prev_what_next;
  rtsBool ready_to_gc;
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;
#endif

  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched,
              "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
              task, initialCapability);

  // -----------------------------------------------------------
  // Scheduler loop starts here:

#if defined(PARALLEL_HASKELL)
#define TERMINATION_CONDITION        (!receivedFinish)
#elif defined(GRAN)
#define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL)
#else
#define TERMINATION_CONDITION        rtsTrue
#endif
  while (TERMINATION_CONDITION) {

#if defined(GRAN)
      /* Choose the processor with the next event */
      CurrentProc = event->proc;
      CurrentTSO  = event->tso;
#endif

#if defined(THREADED_RTS)
      if (first) {
          // don't yield the first time, we want a chance to run this
          // thread for a bit, even if there are others banging at the
          // door.
          first = rtsFalse;
          ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
      } else {
          // Yield the capability to higher-priority tasks if necessary.
          yieldCapability(&cap, task);
      }
#endif

#if defined(THREADED_RTS)
      schedulePushWork(cap,task);
#endif

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
    if (cap->in_haskell) {
          errorBelch("schedule: re-entered unsafely.\n"
                     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
          stg_exit(EXIT_FAILURE);
    }
    // The interruption / shutdown sequence.
    //
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
    //   * free all memory etc.
    //
    // So the sequence for ^C goes like this:
    //
    //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
    //     arranges for some Capability to wake up
    //
    //   * all threads in the system are halted, and the zombies are
    //     placed on the run queue for cleaning up.  We acquire all
    //     the capabilities in order to delete the threads, this is
    //     done by scheduleDoGC() for convenience (because GC already
    //     needs to acquire all the capabilities).  We can't kill
    //     threads involved in foreign calls.
    //
    //   * somebody calls shutdownHaskell(), which calls exitScheduler()
    //
    //   * sched_state := SCHED_SHUTTING_DOWN
    //
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
    //
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
    //   * We might be left with threads blocked in foreign calls,
    //     we should really attempt to kill these somehow (TODO);
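    // A compressed sketch of the state progression just described; the
    // assignments shown are the ones visible in this file (the ^C handler
    // itself lives elsewhere):
#if 0
    // ^C handler:                              sched_state = SCHED_INTERRUPTING;
    // scheduleDoGC(), after deleteAllThreads:  sched_state = SCHED_SHUTTING_DOWN;
    // each worker then returns once:           task->tso == NULL && emptyRunQueue(cap)
#endif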
    switch (sched_state) {
    case SCHED_RUNNING:
        break;
    case SCHED_INTERRUPTING:
        debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
#if defined(THREADED_RTS)
        discardSparksCap(cap);
#endif
        /* scheduleDoGC() deletes all the threads */
        cap = scheduleDoGC(cap,task,rtsFalse);
        break;
    case SCHED_SHUTTING_DOWN:
        debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
        // If we are a worker, just exit.  If we're a bound thread
        // then we will exit below when we've removed our TSO from
        // the run queue.
        if (task->tso == NULL && emptyRunQueue(cap)) {
            return cap;
        }
        break;
    default:
        barf("sched_state: %d", sched_state);
    }

#if defined(THREADED_RTS)
    // If the run queue is empty, take a spark and turn it into a thread.
    if (emptyRunQueue(cap)) {
        StgClosure *spark;
        spark = findSpark(cap);
        if (spark != NULL) {
            debugTrace(DEBUG_sched,
                       "turning spark of closure %p into a thread",
                       (StgClosure *)spark);
            createSparkThread(cap,spark);
        }
    }
#endif // THREADED_RTS
    scheduleStartSignalHandlers(cap);

    // Only check the black holes here if we've nothing else to do.
    // During normal execution, the black hole list only gets checked
    // at GC time, to avoid repeatedly traversing this possibly long
    // list each time around the scheduler.
    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

    scheduleCheckWakeupThreads(cap);

    scheduleCheckBlockedThreads(cap);

    scheduleDetectDeadlock(cap,task);
#if defined(THREADED_RTS)
    cap = task->cap;    // reload cap, it might have changed
#endif

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt is received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.
    if ( emptyRunQueue(cap) ) {
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
        ASSERT(sched_state >= SCHED_INTERRUPTING);
#endif
        continue; // nothing to do
    }
#if defined(PARALLEL_HASKELL)
    scheduleSendPendingMessages();

    if (emptyRunQueue(cap) && scheduleActivateSpark())
        continue; // a spark was turned into a runnable thread

    ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!

    /* If we still have no work we need to send a FISH to get a spark
       from another processor */
    if (emptyRunQueue(cap)) {
        if (!scheduleGetRemoteWork(&receivedFinish)) continue;
        ASSERT(rtsFalse); // should not happen at the moment
    }
    // from here: non-empty run queue.
    //  TODO: merge above case with this, only one call processMessages() !
    if (PacketsWaiting()) {  /* process incoming messages, if
                                any pending...  only in else
                                because getRemoteWork waits for
                                messages as well */
        receivedFinish = processMessages();
    }
#endif

#if defined(GRAN)
    scheduleProcessEvent(event);
#endif

    //
    // Get a thread to run
    //
    t = popRunQueue(cap);

#if defined(GRAN) || defined(PAR)
    scheduleGranParReport(); // some kind of debugging output
#endif

    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));
#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
        Task *bound = t->bound;

        if (bound) {
            if (bound == task) {
                debugTrace(DEBUG_sched,
                           "### Running thread %lu in bound thread", (unsigned long)t->id);
                // yes, the Haskell thread is bound to the current native thread
            } else {
                debugTrace(DEBUG_sched,
                           "### thread %lu bound to another OS thread", (unsigned long)t->id);
                // no, bound to a different Haskell thread: pass to that thread
                pushOnRunQueue(cap,t);
                continue;
            }
        } else {
            // The thread we want to run is unbound.
            if (task->tso) {
                debugTrace(DEBUG_sched,
                           "### this OS thread cannot run thread %lu", (unsigned long)t->id);
                // no, the current native thread is bound to a different
                // Haskell thread, so pass it to any worker thread
                pushOnRunQueue(cap,t);
                continue;
            }
        }
    }
#endif

    cap->r.rCurrentTSO = t;

    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
        && !emptyThreadQueues(cap)) {
        context_switch = 1;
    }
run_thread:

    debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
               (long)t->id, whatNext_strs[t->what_next]);

#if defined(PROFILING)
    startHeapProfTimer();
#endif

    // Check for exceptions blocked on this thread
    maybePerformBlockedException (cap, t);

    // ----------------------------------------------------------------------
    // Run the current thread

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    prev_what_next = t->what_next;

    errno = t->saved_errno;
#if defined(mingw32_HOST_OS)
    SetLastError(t->saved_winerror);
#endif

    cap->in_haskell = rtsTrue;

    recent_activity = ACTIVITY_YES;
    switch (prev_what_next) {

    case ThreadKilled:
    case ThreadComplete:
        /* Thread already finished, return to scheduler. */
        ret = ThreadFinished;
        break;

    case ThreadRunGHC:
    {
        StgRegTable *r;
        r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
        cap = regTableToCapability(r);
        ret = r->rRet;
        break;
    }

    case ThreadInterpret:
        cap = interpretBCO(cap);
        ret = cap->r.rRet;
        break;

    default:
        barf("schedule: invalid what_next field");
    }

    cap->in_haskell = rtsFalse;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

    // We have run some Haskell code: there might be blackhole-blocked
    // threads to wake up now.
    // Lock-free test here should be ok, we're just setting a flag.
    if ( blackhole_queue != END_TSO_QUEUE ) {
        blackholes_need_checking = rtsTrue;
    }
    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;
#if defined(mingw32_HOST_OS)
    // Similarly for Windows error code
    t->saved_winerror = GetLastError();
#endif

#if defined(THREADED_RTS)
    // If ret is ThreadBlocked, and this Task is bound to the TSO that
    // blocked, we are in limbo - the TSO is now owned by whatever it
    // is blocked on, and may in fact already have been woken up,
    // perhaps even on a different Capability.  It may be the case
    // that task->cap != cap.  We better yield this Capability
    // immediately and return to normality.
    if (ret == ThreadBlocked) {
        debugTrace(DEBUG_sched,
                   "--<< thread %lu (%s) stopped: blocked",
                   (unsigned long)t->id, whatNext_strs[t->what_next]);
        continue;
    }
#endif
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    // ----------------------------------------------------------------------

    // Costs for the scheduler are assigned to CCS_SYSTEM
#if defined(PROFILING)
    stopHeapProfTimer();
    CCCS = CCS_SYSTEM;
#endif

    schedulePostRunThread();

    ready_to_gc = rtsFalse;

    switch (ret) {
    case HeapOverflow:
        ready_to_gc = scheduleHandleHeapOverflow(cap,t);
        break;

    case StackOverflow:
        scheduleHandleStackOverflow(cap,task,t);
        break;

    case ThreadYielding:
        if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:
            goto run_thread;
        }
        break;

    case ThreadBlocked:
        scheduleHandleThreadBlocked(t);
        break;

    case ThreadFinished:
        if (scheduleHandleThreadFinished(cap, task, t)) return cap;
        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        break;

    default:
        barf("schedule: invalid thread return code %d", (int)ret);
    }

    if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
    if (ready_to_gc) {
        cap = scheduleDoGC(cap,task,rtsFalse);
    }
  } /* end of while() */

#if defined(PARALLEL_HASKELL)
  debugTrace(PAR_DEBUG_verbose,
             "== Leaving schedule() after having received Finish");
#endif
}
/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
#if defined(GRAN)
    /* set up first event to get things going */
    /* ToDo: assign costs for system setup and init MainTSO ! */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              ContinueThread,
              CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

    debugTrace (DEBUG_gran,
                "GRAN: Init CurrentTSO (in schedule) = %p",
                CurrentTSO);
    IF_DEBUG(gran, G_TSO(CurrentTSO, 5));

    if (RtsFlags.GranFlags.Light) {
        /* Save current time; GranSim Light only */
        CurrentTSO->gran.clock = CurrentTime[CurrentProc];
    }
#endif
}
/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
static void
schedulePushWork(Capability *cap USED_IF_THREADS,
                 Task *task      USED_IF_THREADS)
{
    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // migration can be turned off with +RTS -qg
    if (!RtsFlags.ParFlags.migrate) return;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
        && sparkPoolSizeCap(cap) < 2) {
        return;
    }
    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
                // it already has some work, we just grabbed it at
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
            } else {
                free_caps[n_free_caps++] = cap0;
            }
        }
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads
    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;
        rtsBool pushed_to_all;

        debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);

        i = 0;
        pushed_to_all = rtsFalse;

        if (cap->run_queue_hd != END_TSO_QUEUE) {
            prev = cap->run_queue_hd;
            t = prev->link;
            prev->link = END_TSO_QUEUE;
            for (; t != END_TSO_QUEUE; t = next) {
                next = t->link;
                t->link = END_TSO_QUEUE;
                if (t->what_next == ThreadRelocated
                    || t->bound == task // don't move my bound thread
                    || tsoLocked(t)) {  // don't move a locked thread
                    prev->link = t;
                    prev = t;
                } else if (i == n_free_caps) {
                    pushed_to_all = rtsTrue;
                    i = 0;
                    // keep one for us
                    prev->link = t;
                    prev = t;
                } else {
                    debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
                    appendToRunQueue(free_caps[i],t);
                    if (t->bound) { t->bound->cap = free_caps[i]; }
                    t->cap = free_caps[i];
                    i++;
                }
            }
            cap->run_queue_tl = prev;
        }
        // If there are some free capabilities that we didn't push any
        // threads to, then try to push a spark to each one.
        if (!pushed_to_all) {
            StgClosure *spark;
            // i is the next free capability to push to
            for (; i < n_free_caps; i++) {
                if (emptySparkPoolCap(free_caps[i])) {
                    spark = findSpark(cap);
                    if (spark != NULL) {
                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
                        newSpark(&(free_caps[i]->r), spark);
                    }
                }
            }
        }

        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseCapability(free_caps[i]);
        }
    }
    task->cap = cap; // reset to point to our Capability.
}
#endif
/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (signals_pending()) { // safe outside the lock
        startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
        awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
    }
#endif
}
/* ----------------------------------------------------------------------------
 * Check for threads woken up by other Capabilities
 * ------------------------------------------------------------------------- */

static void
scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    // Any threads that were woken up by other Capabilities get
    // appended to our run queue.
    if (!emptyWakeupQueue(cap)) {
        ACQUIRE_LOCK(&cap->lock);
        if (emptyRunQueue(cap)) {
            cap->run_queue_hd = cap->wakeup_queue_hd;
            cap->run_queue_tl = cap->wakeup_queue_tl;
        } else {
            cap->run_queue_tl->link = cap->wakeup_queue_hd;
            cap->run_queue_tl = cap->wakeup_queue_tl;
        }
        cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
        RELEASE_LOCK(&cap->lock);
    }
#endif
}
/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */
static void
scheduleCheckBlackHoles (Capability *cap)
{
    if ( blackholes_need_checking ) // check without the lock first
    {
        ACQUIRE_LOCK(&sched_mutex);
        if ( blackholes_need_checking ) {
            checkBlackHoles(cap);
            blackholes_need_checking = rtsFalse;
        }
        RELEASE_LOCK(&sched_mutex);
    }
}
/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{
#if defined(PARALLEL_HASKELL)
    // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
    return;
#endif

    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
        /*
         * In the threaded RTS, we only check for deadlock if there
         * has been no activity in a complete timeslice.  This means
         * we won't eagerly start a full GC just because we don't have
         * any threads to run currently.
         */
        if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

        debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // runnable.
        cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);

        recent_activity = ACTIVITY_DONE_GC;

        if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
         * deadlock.
         */
        if ( anyUserHandlers() ) {
            debugTrace(DEBUG_sched,
                       "still deadlocked, waiting for signals...");

            awaitUserSignals();

            if (signals_pending()) {
                startSignalHandlers(cap);
            }

            // either we have threads to run, or we were interrupted:
            ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
        }
#endif

#if !defined(THREADED_RTS)
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception.
         */
        if (task->tso) {
            switch (task->tso->why_blocked) {
            case BlockedOnSTM:
            case BlockedOnBlackHole:
            case BlockedOnException:
            case BlockedOnMVar:
                throwToSingleThreaded(cap, task->tso,
                                      (StgClosure *)NonTermination_closure);
                return;
            default:
                barf("deadlock: main thread blocked in a strange way");
            }
        }
#endif
    }
}
/* ----------------------------------------------------------------------------
 * Process an event (GRAN only)
 * ------------------------------------------------------------------------- */

#if defined(GRAN)
static StgTSO *
scheduleProcessEvent(rtsEvent *event)
{
    StgTSO *t;

    if (RtsFlags.GranFlags.Light)
      GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
      CurrentTime[CurrentProc] = event->time;

    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
      handleIdlePEs();

    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
      /* Should just be continuing execution */
    case ContinueThread:
      IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
      /* ToDo: check assertion
      ASSERT(run_queue_hd != (StgTSO*)NULL &&
             run_queue_hd != END_TSO_QUEUE);
      */
      /* Ignore ContinueThreads for fetching threads (if synchr comm) */
      if (!RtsFlags.GranFlags.DoAsyncFetch &&
          procStatus[CurrentProc]==Fetching) {
        debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
              CurrentTSO->id, CurrentTSO, CurrentProc);
        goto next_thread;
      }
      /* Ignore ContinueThreads for completed threads */
      if (CurrentTSO->what_next == ThreadComplete) {
        debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
              CurrentTSO->id, CurrentTSO, CurrentProc);
        goto next_thread;
      }
      /* Ignore ContinueThreads for threads that are being migrated */
      if (PROCS(CurrentTSO)==Nowhere) {
        debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
              CurrentTSO->id, CurrentTSO, CurrentProc);
        goto next_thread;
      }
      /* The thread should be at the beginning of the run queue */
      if (CurrentTSO!=run_queue_hds[CurrentProc]) {
        debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
              CurrentTSO->id, CurrentTSO, CurrentProc);
        break; // run the thread anyway
      }
      /*
      new_event(proc, proc, CurrentTime[proc],
                FindWork,
                (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
      goto next_thread;
      */ /* Catches superfluous CONTINUEs -- should be unnecessary */
      break; // now actually run the thread; DaH Qu'vam yImuHbej
    case FetchNode:
      do_the_fetchnode(event);
      goto next_thread; /* handle next event in event queue  */

    case GlobalBlock:
      do_the_globalblock(event);
      goto next_thread; /* handle next event in event queue  */

    case FetchReply:
      do_the_fetchreply(event);
      goto next_thread; /* handle next event in event queue  */

    case UnblockThread: /* Move from the blocked queue to the tail of */
      do_the_unblock(event);
      goto next_thread; /* handle next event in event queue  */

    case ResumeThread: /* Move from the blocked queue to the tail of */
      /* the runnable queue ( i.e. Qu' SImqa'lu') */
      event->tso->gran.blocktime +=
        CurrentTime[CurrentProc] - event->tso->gran.blockedat;
      do_the_startthread(event);
      goto next_thread; /* handle next event in event queue  */

    case StartThread:
      do_the_startthread(event);
      goto next_thread; /* handle next event in event queue  */

    case MoveThread:
      do_the_movethread(event);
      goto next_thread; /* handle next event in event queue  */

    case MoveSpark:
      do_the_movespark(event);
      goto next_thread; /* handle next event in event queue  */

    case FindWork:
      do_the_findwork(event);
      goto next_thread; /* handle next event in event queue  */

    default:
      barf("Illegal event type %u\n", event->evttype);
    } /* switch */
    /* This point was scheduler_loop in the old RTS */

    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
                              TimeOfNextEvent));

    if (RtsFlags.GranFlags.Light)
      GranSimLight_leave_system(event, &ActiveTSO);

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    IF_DEBUG(gran,
             debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */
    t = CurrentTSO;
    /* Take a thread from the run queue. */
    POP_RUN_QUEUE(t); // take_off_run_queue(t);

    IF_DEBUG(gran,
             debugBelch("GRAN: About to run current thread, which is\n");
             G_TSO(t,5));

    context_switch = 0; // turned on via GranYield, checking events and time slice

    IF_DEBUG(gran,
             DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;

    return t;
}
#endif // GRAN
/* ----------------------------------------------------------------------------
 * Send pending messages (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static StgTSO *
scheduleSendPendingMessages(void)
{
# if defined(PAR) // global Mem.Mgmt., omit for now
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }
# endif

    if (RtsFlags.ParFlags.BufferTime) {
        // if we use message buffering, we must send away all message
        // packets which have become too old...
        sendOldBuffers();
    }
}
#endif
/* ----------------------------------------------------------------------------
 * Activate spark threads (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static rtsBool
scheduleActivateSpark(void)
{
#if defined(SPARKS)
  rtsSpark spark;
  StgTSO *tso;
  StgSparkPool *pool;

  ASSERT(emptyRunQueue());
  /* We get here if the run queue is empty and want some work.
     We try to turn a spark into a thread, and add it to the run queue,
     from where it will be picked up in the next iteration of the scheduler
     loop.
  */

  /* :-[  no local threads => look out for local sparks */
  /* the spark pool for the current PE */
  pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
  if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
      pool->hd < pool->tl) {
    /*
     * ToDo: add GC code check that we really have enough heap afterwards!!
     * If we're here (no runnable threads) and we have pending
     * sparks, we must have a space problem.  Get enough space
     * to turn one of those pending sparks into a
     * thread...
     */

    spark = findSpark(rtsFalse);            /* get a spark */
    if (spark != (rtsSpark) NULL) {
      tso = createThreadFromSpark(spark);   /* turn the spark into a thread */
      IF_PAR_DEBUG(fish, // schedule,
                   debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
                              tso->id, tso, advisory_thread_count));

      if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
        IF_PAR_DEBUG(fish, // schedule,
                     debugBelch("==^^ failed to create thread from spark @ %lx\n",
                                spark));
        return rtsFalse; /* failed to generate a thread */
      }                  /* otherwise fall through & pick-up new tso */
    } else {
      IF_PAR_DEBUG(fish, // schedule,
                   debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
                              spark_queue_len(pool)));
      return rtsFalse; /* failed to generate a thread */
    }
    return rtsTrue;  /* success in generating a thread */
  } else { /* no more threads permitted or pool empty */
    return rtsFalse; /* failed to generateThread */
  }
#else
  tso = NULL; // avoid compiler warning only
  return rtsFalse; /* dummy in non-PAR setup */
#endif // SPARKS
}
#endif // PARALLEL_HASKELL
/* ----------------------------------------------------------------------------
 * Get work from a remote node (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static rtsBool
scheduleGetRemoteWork(rtsBool *receivedFinish)
{
  ASSERT(emptyRunQueue());

  if (RtsFlags.ParFlags.BufferTime) {
        IF_PAR_DEBUG(verbose,
                     debugBelch("...send all pending data,"));
        {
          nat i;
          for (i=1; i<=nPEs; i++)
            sendImmediately(i); // send all messages away immediately
        }
  }

# ifndef SPARKS
        //++EDEN++ idle() , i.e. send all buffers, wait for work
        // suppress fishing in EDEN... just look for incoming messages
        // (blocking receive)
        IF_PAR_DEBUG(verbose,
                     debugBelch("...wait for incoming messages...\n"));
        *receivedFinish = processMessages(); // blocking receive...

        // and reenter scheduling loop after having received something
        // (return rtsFalse below)

# else /* activate SPARKS machinery */
/* We get here, if we have no work, tried to activate a local spark, but still
   have no work. We try to get a remote spark, by sending a FISH message.
   Thread migration should be added here, and triggered when a sequence of
   fishes returns without work. */
        delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
        /* =8-[  no local sparks => look for work on other PEs */
        /*
         * We really have absolutely no work.  Send out a fish
         * (there may be some out there already), and wait for
         * something to arrive.  We clearly can't run any threads
         * until a SCHEDULE or RESUME arrives, and so that's what
         * we're hoping to see.  (Of course, we still have to
         * respond to other types of messages.)
         */
        rtsTime now = msTime() /*CURRENT_TIME*/;
        IF_PAR_DEBUG(verbose,
                     debugBelch("--  now=%ld\n", now));
        IF_PAR_DEBUG(fish, // verbose,
             if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
                 (last_fish_arrived_at!=0 &&
                  last_fish_arrived_at+delay > now)) {
               debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
                     now, last_fish_arrived_at+delay,
                     last_fish_arrived_at,
                     delay);
             });
        if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
            advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
          if (last_fish_arrived_at==0 ||
              (last_fish_arrived_at+delay <= now)) {   // send FISH now!
            /* outstandingFishes is set in sendFish, processFish;
               avoid flooding system with fishes via delay */
            next_fish_to_send_at = 0;
          } else {
            /* ToDo: this should be done in the main scheduling loop to avoid the
               busy wait here; not so bad if fish delay is very small */
            int iq = 0; // DEBUGGING -- HWL
            next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
            /* send a fish when ready, but process messages that arrive in the meantime */
            do {
              if (PacketsWaiting()) {
                iq++; // DEBUGGING
                *receivedFinish = processMessages();
              }
              now = msTime();
            } while (!*receivedFinish || now<next_fish_to_send_at);
            // JB: This means the fish could become obsolete, if we receive
            // work. Better check for work again?
            // last line: while (!receivedFinish || !haveWork || now<...)
            // next line: if (receivedFinish || haveWork )

            if (*receivedFinish) // no need to send a FISH if we are finishing anyway
              return rtsFalse;   // NB: this will leave scheduler loop
                                 // immediately after return!

            IF_PAR_DEBUG(fish, // verbose,
               debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
          }
          // JB: IMHO, this should all be hidden inside sendFish(...)
          /* pe = choosePE();
             sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
                      NEW_FISH_HUNGER);

             // Global statistics: count no. of fishes
             if (RtsFlags.ParFlags.ParStats.Global &&
                 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
               globalParStats.tot_fish_mess++;
             }
          */

          /* delayed fishes must have been sent by now! */
          next_fish_to_send_at = 0;
        }

        *receivedFinish = processMessages();
# endif /* SPARKS */

        return rtsFalse;
        /* NB: this function always returns rtsFalse, meaning the scheduler
           loop continues with the next iteration;
           rationale:
             return code means success in finding work; we enter this function
             if there is no local work, thus have to send a fish which takes
             time until it arrives with work; in the meantime we should process
             messages in the main loop;
        */
}
#endif // PARALLEL_HASKELL
/* ----------------------------------------------------------------------------
 * PAR/GRAN: Report stats & debugging info(?)
 * ------------------------------------------------------------------------- */

#if defined(PAR) || defined(GRAN)
static void
scheduleGranParReport(void)
{
  ASSERT(run_queue_hd != END_TSO_QUEUE);

  /* Take a thread from the run queue, if we have work */
  POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);

    /* If this TSO has got its outport closed in the meantime,
     *   it mustn't be run. Instead, we have to clean it up as if it was finished.
     * It has to be marked as TH_DEAD for this purpose.
     * If it is TH_TERM instead, it is supposed to have finished in the normal way.

       JB: TODO: investigate whether state change field could be nuked
           entirely and replaced by the normal tso state (whatnext
           field). All we want to do is to kill tsos from outside.
     */

    /* ToDo: write something to the log-file
    if (RTSflags.ParFlags.granSimStats && !sameThread)
        DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
    */

    /* the spark pool for the current PE */
    pool = &(cap.r.rSparks); //  cap = (old) MainCap

    IF_DEBUG(scheduler,
             debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
                        run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
    if (RtsFlags.ParFlags.ParStats.Full &&
        (t->par.sparkname != (StgInt)0) && // only log spark generated threads
        (emitSchedule || // forced emit
         (t && LastTSO && t->id != LastTSO->id))) {
      /*
         we are running a different TSO, so write a schedule event to log file
         NB: If we use fair scheduling we also have to write a deschedule
             event for LastTSO; with unfair scheduling we know that the
             previous tso has blocked whenever we switch to another tso, so
             we don't need it in GUM for now
      */
      IF_PAR_DEBUG(fish, // schedule,
                   debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));

      DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                       GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
      emitSchedule = rtsFalse;
    }
}
#endif
/* ----------------------------------------------------------------------------
 * After running a thread...
 * ------------------------------------------------------------------------- */

static void
schedulePostRunThread(void)
{
#if defined(PAR)
    /* HACK 675: if the last thread didn't yield, make sure to print a
       SCHEDULE event to the log file when StgRunning the next thread, even
       if it is the same one as before */
    LastTSO = t;
    TimeOfLastYield = CURRENT_TIME;
#endif
    /* some statistics gathering in the parallel case */

#if defined(GRAN) || defined(PAR) || defined(EDEN)
    switch (ret) {
    case HeapOverflow:
# if defined(GRAN)
        IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_heapover++;
# elif defined(PAR)
        globalParStats.tot_heapover++;
# endif
        break;

    case StackOverflow:
# if defined(GRAN)
        IF_DEBUG(gran,
                 DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_stackover++;
# elif defined(PAR)
        // IF_DEBUG(par,
        // DumpGranEvent(GR_DESCHEDULE, t);
        globalParStats.tot_stackover++;
# endif
        break;

    case ThreadYielding:
# if defined(GRAN)
        IF_DEBUG(gran,
                 DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_yields++;
# elif defined(PAR)
        // IF_DEBUG(par,
        // DumpGranEvent(GR_DESCHEDULE, t);
        globalParStats.tot_yields++;
# endif
        break;

    case ThreadBlocked:
# if defined(GRAN)
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
                   t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
                   (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
        if (t->block_info.closure!=(StgClosure*)NULL)
            print_bq(t->block_info.closure);

        // ??? needed; should emit block before
        IF_DEBUG(gran,
                 DumpGranEvent(GR_DESCHEDULE, t));
        prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
        /*
          ngoq Dogh!
          ASSERT(procStatus[CurrentProc]==Busy ||
                 ((procStatus[CurrentProc]==Fetching) &&
                  (t->block_info.closure!=(StgClosure*)NULL)));
          if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
              !(!RtsFlags.GranFlags.DoAsyncFetch &&
                procStatus[CurrentProc]==Fetching))
              procStatus[CurrentProc] = Idle;
        */
# elif defined(PAR)
        //++PAR++  blockThread() writes the event (change?)
# endif
        break;

    case ThreadFinished:
        break;

    default:
        barf("parGlobalStats: unknown return code");
        break;
    }
#endif
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
{
    // did the task ask for a large block?
    if (cap->r.rHpAlloc > BLOCK_SIZE) {
        // if so, get one and push it on the front of the nursery.
        bdescr *bd;
        lnat blocks;

        blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;

        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
                   (long)t->id, whatNext_strs[t->what_next], blocks);

        // don't do this if the nursery is (nearly) full, we'll GC first.
        if (cap->r.rCurrentNursery->link != NULL ||
            cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
                                               // if the nursery has only one block.

            bd = allocGroup( blocks );
            cap->r.rNursery->n_blocks += blocks;

            // link the new group into the list
            bd->link = cap->r.rCurrentNursery;
            bd->u.back = cap->r.rCurrentNursery->u.back;
            if (cap->r.rCurrentNursery->u.back != NULL) {
                cap->r.rCurrentNursery->u.back->link = bd;
            } else {
#if !defined(THREADED_RTS)
                ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
                       g0s0 == cap->r.rNursery);
#endif
                cap->r.rNursery->blocks = bd;
            }
            cap->r.rCurrentNursery->u.back = bd;

            // initialise it as a nursery block.  We initialise the
            // step, gen_no, and flags field of *every* sub-block in
            // this large block, because this is easier than making
            // sure that we always find the block head of a large
            // block whenever we call Bdescr() (eg. evacuate() and
            // isAlive() in the GC would both have to do this, at
            // least).
            {
                bdescr *x;
                for (x = bd; x < bd + blocks; x++) {
                    x->step = cap->r.rNursery;
                    x->gen_no = 0;
                    x->flags = 0;
                }
            }

            // This assert can be a killer if the app is doing lots
            // of large block allocations.
            IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));

            // now update the nursery to point to the new block
            cap->r.rCurrentNursery = bd;

            // we might be unlucky and have another thread get on the
            // run queue before us and steal the large block, but in that
            // case the thread will just end up requesting another large
            // block.
            pushOnRunQueue(cap,t);
            return rtsFalse;  /* not actually GC'ing */
        }
    }

    debugTrace(DEBUG_sched,
               "--<< thread %ld (%s) stopped: HeapOverflow\n",
               (long)t->id, whatNext_strs[t->what_next]);

#if defined(GRAN)
    ASSERT(!is_on_queue(t,CurrentProc));
#elif defined(PARALLEL_HASKELL)
    /* Currently we emit a DESCHEDULE event before GC in GUM.
       ToDo: either add separate event to distinguish SYSTEM time from rest
       or just nuke this DESCHEDULE (and the following SCHEDULE) */
    if (0 && RtsFlags.ParFlags.ParStats.Full) {
        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
        emitSchedule = rtsTrue;
    }
#endif

    pushOnRunQueue(cap,t);
    return rtsTrue;
    /* actual GC is done at the end of the while loop in schedule() */
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadStackOverflow
 * -------------------------------------------------------------------------- */

static void
scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
{
    debugTrace (DEBUG_sched,
                "--<< thread %ld (%s) stopped, StackOverflow",
                (long)t->id, whatNext_strs[t->what_next]);

    /* just adjust the stack for this thread, then pop it back
     * on the run queue.
     */
    {
        /* enlarge the stack */
        StgTSO *new_t = threadStackOverflow(cap, t);

        /* The TSO attached to this Task may have moved, so update the
         * pointer to it.
         */
        if (task->tso == t) {
            task->tso = new_t;
        }
        pushOnRunQueue(cap,new_t);
    }
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadYielding
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
{
    // Reset the context switch flag.  We don't do this just before
    // running the thread, because that would mean we would lose ticks
    // during GC, which can lead to unfair scheduling (a thread hogs
    // the CPU because the tick always arrives during GC).  This way
    // penalises threads that do a lot of allocation, but that seems
    // better than the alternative.
    context_switch = 0;

    /* put the thread back on the run queue.  Then, if we're ready to
     * GC, check whether this is the last task to stop.  If so, wake
     * up the GC thread.  getThread will block during a GC until the
     * GC is finished.
     */
#ifdef DEBUG
    if (t->what_next != prev_what_next) {
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped to switch evaluators",
                   (long)t->id, whatNext_strs[t->what_next]);
    } else {
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped, yielding",
                   (long)t->id, whatNext_strs[t->what_next]);
    }
#endif

    IF_DEBUG(sanity,
             //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
             checkTSO(t));
    ASSERT(t->link == END_TSO_QUEUE);

    // Shortcut if we're just switching evaluators: don't bother
    // doing stack squeezing (which can be expensive), just run the
    // thread.
    if (t->what_next != prev_what_next) {
        return rtsTrue;
    }

#if defined(GRAN)
    ASSERT(!is_on_queue(t,CurrentProc));

    IF_DEBUG(sanity,
             //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
             checkThreadQsSanity(rtsTrue));
#endif

    addToRunQueue(cap,t);

#if defined(GRAN)
    /* add a ContinueThread event to actually process the thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              ContinueThread,
              t, (StgClosure*)NULL, (rtsSpark*)NULL);
    IF_GRAN_DEBUG(eventq,
                  debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
                  G_EVENTQ(0);
                  G_CURR_THREADQ(0));
#endif

    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadBlocked
 * -------------------------------------------------------------------------- */

static void
scheduleHandleThreadBlocked( StgTSO *t
#if !defined(GRAN) && !defined(DEBUG)
                             STG_UNUSED
#endif
                             )
{
#if defined(GRAN)
    IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
                        t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
             if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));

    // ??? needed; should emit block before
    IF_DEBUG(gran,
             DumpGranEvent(GR_DESCHEDULE, t));
    prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
    /*
      ngoq Dogh!
      ASSERT(procStatus[CurrentProc]==Busy ||
             ((procStatus[CurrentProc]==Fetching) &&
              (t->block_info.closure!=(StgClosure*)NULL)));
      if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
          !(!RtsFlags.GranFlags.DoAsyncFetch &&
            procStatus[CurrentProc]==Fetching))
          procStatus[CurrentProc] = Idle;
    */
#elif defined(PAR)
    IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
                        t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
    IF_PAR_DEBUG(bq,
                 if (t->block_info.closure!=(StgClosure*)NULL)
                     print_bq(t->block_info.closure));

    /* Send a fetch (if BlockedOnGA) and dump event to log file */
    blockThread(t);

    /* whatever we schedule next, we must log that schedule */
    emitSchedule = rtsTrue;

#else /* !GRAN */

    // We don't need to do anything.  The thread is blocked, and it
    // has tidied up its stack and placed itself on whatever queue
    // it needs to be on.

    // ASSERT(t->why_blocked != NotBlocked);
    // Not true: for example,
    //    - in THREADED_RTS, the thread may already have been woken
    //      up by another Capability.  This actually happens: try
    //      conc023 +RTS -N2.
    //    - the thread may have woken itself up already, because
    //      threadPaused() might have raised a blocked throwTo
    //      exception, see maybePerformBlockedException().

#ifdef DEBUG
    if (traceClass(DEBUG_sched)) {
        debugTraceBegin("--<< thread %lu (%s) stopped: ",
                        (unsigned long)t->id, whatNext_strs[t->what_next]);
        printThreadBlockage(t);
        debugTraceEnd();
    }
#endif

    /* Only for dumping event to log file
       ToDo: do I need this in GranSim, too?
       blockThread(t);
    */
#endif
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadFinished
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
{
    /* Need to check whether this was a main thread, and if so,
     * return with the return value.
     *
     * We also end up here if the thread kills itself with an
     * uncaught exception, see Exception.cmm.
     */
    debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
               (unsigned long)t->id, whatNext_strs[t->what_next]);

    /* Inform the Hpc that a thread has finished */
    hs_hpc_thread_finished_event(t);

#if defined(GRAN)
    endThread(t, CurrentProc); // clean-up the thread
#elif defined(PARALLEL_HASKELL)
    /* For now all are advisory -- HWL */
    //if(t->priority==AdvisoryPriority) ??
    advisory_thread_count--; // JB: Caution with this counter, buggy!

# if defined(DIST)
    if(t->dist.priority==RevalPriority)
        FinishReval(t);
# endif

# if defined(EDENOLD)
    // the thread could still have an outport... (BUG)
    if (t->eden.outport != -1) {
        // delete the outport for the tso which has finished...
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
                                t->eden.outport, t->id));
    }
    // thread still in the process (HEAVY BUG! since outport has just been closed...)
    if (t->eden.epid != -1) {
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
                                t->id, t->eden.epid));
        removeTSOfromProcess(t);
    }
# endif

# if defined(PAR)
    if (RtsFlags.ParFlags.ParStats.Full &&
        !RtsFlags.ParFlags.ParStats.Suppressed)
        DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);

    //  t->par only contains statistics: left out for now...
    IF_PAR_DEBUG(fish,
                 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
                            t->id,t,t->par.sparkname));
# endif
#endif // PARALLEL_HASKELL

    //
    // Check whether the thread that just completed was a bound
    // thread, and if so return with the result.
    //
    // There is an assumption here that all thread completion goes
    // through this point; we need to make sure that if a thread
    // ends up in the ThreadKilled state, that it stays on the run
    // queue so it can be dealt with here.
    //

    if (t->bound) {

        if (t->bound != task) {
#if !defined(THREADED_RTS)
            // Must be a bound thread that is not the topmost one.  Leave
            // it on the run queue until the stack has unwound to the
            // point where we can deal with this.  Leaving it on the run
            // queue also ensures that the garbage collector knows about
            // this thread and its return value (it gets dropped from the
            // all_threads list so there's no other way to find it).
            appendToRunQueue(cap,t);
            return rtsFalse;
#else
            // this cannot happen in the threaded RTS, because a
            // bound thread can only be run by the appropriate Task.
            barf("finished bound thread that isn't mine");
#endif
        }

        ASSERT(task->tso == t);

        if (t->what_next == ThreadComplete) {
            if (task->ret) {
                // NOTE: return val is tso->sp[1] (see StgStartup.hc)
                *(task->ret) = (StgClosure *)task->tso->sp[1];
            }
            task->stat = Success;
        } else {
            if (task->ret) {
                *(task->ret) = NULL;
            }
            if (sched_state >= SCHED_INTERRUPTING) {
                task->stat = Interrupted;
            } else {
                task->stat = Killed;
            }
        }
#ifdef DEBUG
        removeThreadLabel((StgWord)task->tso->id);
#endif
        return rtsTrue; // tells schedule() to return
    }

    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Perform a heap census, if PROFILING
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
{
#if defined(PROFILING)
    // When we have +RTS -i0 and we're heap profiling, do a census at
    // every GC.  This lets us get repeatable runs for debugging.
    if (performHeapProfile ||
        (RtsFlags.ProfFlags.profileInterval==0 &&
         RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {

        // checking black holes is necessary before GC, otherwise
        // there may be threads that are unreachable except by the
        // blackhole queue, which the GC will consider to be
        // deadlocked.
        scheduleCheckBlackHoles(&MainCapability);

        debugTrace(DEBUG_sched, "garbage collecting before heap census");
        GarbageCollect(rtsTrue);

        debugTrace(DEBUG_sched, "performing heap census");
        heapCensus();

        performHeapProfile = rtsFalse;
        return rtsTrue;  // true <=> we already GC'd
    }
#endif
    return rtsFalse;
}
1954 /* -----------------------------------------------------------------------------
1955 * Perform a garbage collection if necessary
1956 * -------------------------------------------------------------------------- */
1959 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1963 static volatile StgWord waiting_for_gc;
1964 rtsBool was_waiting;
1969 // In order to GC, there must be no threads running Haskell code.
1970 // Therefore, the GC thread needs to hold *all* the capabilities,
1971 // and release them after the GC has completed.
1973 // This seems to be the simplest way: previous attempts involved
1974 // making all the threads with capabilities give up their
1975 // capabilities and sleep except for the *last* one, which
1976 // actually did the GC. But it's quite hard to arrange for all
1977 // the other tasks to sleep and stay asleep.
1980 was_waiting = cas(&waiting_for_gc, 0, 1);
1983 debugTrace(DEBUG_sched, "someone else is trying to GC...");
1984 if (cap) yieldCapability(&cap,task);
1985 } while (waiting_for_gc);
1986 return cap; // NOTE: task->cap might have changed here
    for (i=0; i < n_capabilities; i++) {
        debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
        if (cap != &capabilities[i]) {
            Capability *pcap = &capabilities[i];
            // we better hope this task doesn't get migrated to
            // another Capability while we're waiting for this one.
            // It won't, because load balancing happens while we have
            // all the Capabilities, but even so it's a slightly
            // unsavoury invariant.
            waitForReturnCapability(&pcap, task);
            if (pcap != &capabilities[i]) {
                barf("scheduleDoGC: got the wrong capability");
            }
        }
    }

    waiting_for_gc = rtsFalse;
    /* Kick any transactions which are invalid back to their
     * atomically frames.  When next scheduled they will try to
     * commit, this commit will fail and they will retry.
     */
    for (t = all_threads; t != END_TSO_QUEUE; t = next) {
        if (t->what_next == ThreadRelocated) {
            next = t->link;
        } else {
            next = t->global_link;

            // This is a good place to check for blocked
            // exceptions.  It might be the case that a thread is
            // blocked on delivering an exception to a thread that
            // is also blocked - we try to ensure that this
            // doesn't happen in throwTo(), but it's too hard (or
            // impossible) to close all the race holes, so we
            // accept that some might get through and deal with
            // them here.  A GC will always happen at some point,
            // even if the system is otherwise deadlocked.
            maybePerformBlockedException (&capabilities[0], t);

            if (t->trec != NO_TREC && t->why_blocked == NotBlocked) {
                if (!stmValidateNestOfTransactions (t->trec)) {
                    debugTrace(DEBUG_sched | DEBUG_stm,
                               "trec %p found wasting its time", t->trec);

                    // strip the stack back to the
                    // ATOMICALLY_FRAME, aborting the (nested)
                    // transaction, and saving the stack of any
                    // partially-evaluated thunks on the heap.
                    throwToSingleThreaded_(&capabilities[0], t,
                                           NULL, rtsTrue, NULL);

                    ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
                }
            }
        }
    }
    // check the blackhole queue for threads that may have become
    // runnable again; this happens periodically:
    if (cap) scheduleCheckBlackHoles(cap);

    IF_DEBUG(scheduler, printAllThreads());

    /*
     * We now have all the capabilities; if we're in an interrupting
     * state, then we should take the opportunity to delete all the
     * threads in the system.
     */
    if (sched_state >= SCHED_INTERRUPTING) {
        deleteAllThreads(&capabilities[0]);
        sched_state = SCHED_SHUTTING_DOWN;
    }

    /* everybody back, start the GC.
     * Could do it in this thread, or signal a condition var
     * to do it in another thread.  Either way, we need to
     * broadcast on gc_pending_cond afterward.
     */
#if defined(THREADED_RTS)
    debugTrace(DEBUG_sched, "doing GC");
#endif
    GarbageCollect(force_major);

#if defined(THREADED_RTS)
    // release our stash of capabilities.
    for (i = 0; i < n_capabilities; i++) {
        if (cap != &capabilities[i]) {
            task->cap = &capabilities[i];
            releaseCapability(&capabilities[i]);
        }
    }
    if (cap) {
        task->cap = cap;
    } else {
        task->cap = NULL;
    }
#endif

#if defined(GRAN)
    /* add a ContinueThread event to continue execution of current thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              ContinueThread,
              t, (StgClosure*)NULL, (rtsSpark*)NULL);
    debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
#endif

    return cap;
}
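/* Illustrative sketch (not part of the original source): the shape of the
 * CAS gate that elects a single GC leader above.  cas(p, old, new)
 * atomically stores new into *p only if *p == old, and returns the value
 * it found there.  tryToBecomeGCLeader() is a hypothetical name.
 */
#if 0
static volatile StgWord gate = 0;

static rtsBool tryToBecomeGCLeader (void)
{
    /* exactly one caller sees 0 here; everyone else sees 1 and must
       yield its Capability until the leader resets the gate to 0 */
    return cas(&gate, 0, 1) == 0;
}
#endif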
/* ---------------------------------------------------------------------------
 * Singleton fork(). Do not copy any running threads.
 * ------------------------------------------------------------------------- */

pid_t
forkProcess(HsStablePtr *entry
#ifndef FORKPROCESS_PRIMOP_SUPPORTED
            STG_UNUSED
#endif
           )
{
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
    pid_t pid;
    StgTSO *t, *next;
    Capability *cap;
    Task *task;

#if defined(THREADED_RTS)
    if (RtsFlags.ParFlags.nNodes > 1) {
        errorBelch("forking not supported with +RTS -N<n> greater than 1");
        stg_exit(EXIT_FAILURE);
    }
#endif

    debugTrace(DEBUG_sched, "forking!");

    // ToDo: for SMP, we should probably acquire *all* the capabilities
    cap = rts_lock();

    pid = fork();

    if (pid) { // parent

        // just return the pid
        rts_unlock(cap);
        return pid;

    } else { // child
        // Now, all OS threads except the thread that forked are
        // stopped.  We need to stop all Haskell threads, including
        // those involved in foreign calls.  Also we need to delete
        // all Tasks, because they correspond to OS threads that are
        // now gone.

        for (t = all_threads; t != END_TSO_QUEUE; t = next) {
            if (t->what_next == ThreadRelocated) {
                next = t->link;
            } else {
                next = t->global_link;
                // don't allow threads to catch the ThreadKilled
                // exception, but we do want to raiseAsync() because these
                // threads may be evaluating thunks that we need later.
                deleteThread_(cap,t);
            }
        }
        // Empty the run queue.  It seems tempting to let all the
        // killed threads stay on the run queue as zombies to be
        // cleaned up later, but some of them correspond to bound
        // threads for which the corresponding Task does not exist.
        cap->run_queue_hd = END_TSO_QUEUE;
        cap->run_queue_tl = END_TSO_QUEUE;

        // Any suspended C-calling Tasks are no more, their OS threads
        // no longer exist.
        cap->suspended_ccalling_tasks = NULL;

        // Empty the all_threads list.  Otherwise, the garbage
        // collector may attempt to resurrect some of these threads.
        all_threads = END_TSO_QUEUE;

        // Wipe the task list, except the current Task.
        ACQUIRE_LOCK(&sched_mutex);
        for (task = all_tasks; task != NULL; task=task->all_link) {
            if (task != cap->running_task) {
                discardTask(task);
            }
        }
        RELEASE_LOCK(&sched_mutex);
#if defined(THREADED_RTS)
        // Wipe our spare workers list, they no longer exist.  New
        // workers will be created if necessary.
        cap->spare_workers = NULL;
        cap->returning_tasks_hd = NULL;
        cap->returning_tasks_tl = NULL;
#endif

        cap = rts_evalStableIO(cap, entry, NULL);  // run the action
        rts_checkSchedStatus("forkProcess",cap);

        hs_exit();                      // clean up and exit
        stg_exit(EXIT_SUCCESS);
    }
#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
    barf("forkProcess#: primop not supported on this platform, sorry!\n");
    return -1;
#endif
}
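/* Illustrative sketch (not part of the original source): the POSIX
 * behaviour forkProcess has to cope with.  fork() duplicates only the
 * calling OS thread into the child, which is why the child above must
 * discard every other Task and Haskell thread. */
#if 0
#include <unistd.h>

static void forkSemanticsSketch (void)
{
    pid_t pid = fork();
    if (pid == 0) {
        /* child: all other OS threads vanished mid-flight; any state
           they owned (locks, Tasks, suspended foreign calls) is
           unusable and must be discarded */
    } else {
        /* parent: unaffected; it just remembers the child's pid */
    }
}
#endif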
/* ---------------------------------------------------------------------------
 * Delete all the threads in the system
 * ------------------------------------------------------------------------- */

static void
deleteAllThreads ( Capability *cap )
{
    // NOTE: only safe to call if we own all capabilities.

    StgTSO *t, *next;

    debugTrace(DEBUG_sched,"deleting all threads");
    for (t = all_threads; t != END_TSO_QUEUE; t = next) {
        if (t->what_next == ThreadRelocated) {
            next = t->link;
        } else {
            next = t->global_link;
            deleteThread(cap,t);
        }
    }

    // The run queue now contains a bunch of ThreadKilled threads.  We
    // must not throw these away: the main thread(s) will be in there
    // somewhere, and the main scheduler loop has to deal with it.
    // Also, the run queue is the only thing keeping these threads from
    // being GC'd, and we don't want the "main thread has been GC'd" panic.

#if !defined(THREADED_RTS)
    ASSERT(blocked_queue_hd == END_TSO_QUEUE);
    ASSERT(sleeping_queue == END_TSO_QUEUE);
#endif
}
/* -----------------------------------------------------------------------------
   Managing the suspended_ccalling_tasks list.
   Locks required: sched_mutex
   -------------------------------------------------------------------------- */

static void
suspendTask (Capability *cap, Task *task)
{
    ASSERT(task->next == NULL && task->prev == NULL);
    task->next = cap->suspended_ccalling_tasks;
    task->prev = NULL;
    if (cap->suspended_ccalling_tasks) {
        cap->suspended_ccalling_tasks->prev = task;
    }
    cap->suspended_ccalling_tasks = task;
}

static void
recoverSuspendedTask (Capability *cap, Task *task)
{
    if (task->prev) {
        task->prev->next = task->next;
    } else {
        ASSERT(cap->suspended_ccalling_tasks == task);
        cap->suspended_ccalling_tasks = task->next;
    }
    if (task->next) {
        task->next->prev = task->prev;
    }
    task->next = task->prev = NULL;
}
/* ---------------------------------------------------------------------------
 * Suspending & resuming Haskell threads.
 *
 * When making a "safe" call to C (aka _ccall_GC), the task gives back
 * its capability before calling the C function.  This allows another
 * task to pick up the capability and carry on running Haskell
 * threads.  It also means that if the C call blocks, it won't lock
 * the whole system.
 *
 * The Haskell thread making the C call is put to sleep for the
 * duration of the call, on the suspended_ccalling_tasks queue.  We
 * give out a token to the task, which it can use to resume the thread
 * on return from the C function.
 * ------------------------------------------------------------------------- */
void *
suspendThread (StgRegTable *reg)
{
    Capability *cap;
    int saved_errno;
    StgTSO *tso;
    Task *task;
#if mingw32_HOST_OS
    StgWord32 saved_winerror;
#endif

    saved_errno = errno;
#if mingw32_HOST_OS
    saved_winerror = GetLastError();
#endif

    /* assume that *reg is a pointer to the StgRegTable part of a Capability.
     */
    cap = regTableToCapability(reg);

    task = cap->running_task;
    tso = cap->r.rCurrentTSO;

    debugTrace(DEBUG_sched,
               "thread %lu did a safe foreign call",
               (unsigned long)cap->r.rCurrentTSO->id);

    // XXX this might not be necessary --SDM
    tso->what_next = ThreadRunGHC;

    threadPaused(cap,tso);

    if ((tso->flags & TSO_BLOCKEX) == 0) {
        tso->why_blocked = BlockedOnCCall;
        tso->flags |= TSO_BLOCKEX;
        tso->flags &= ~TSO_INTERRUPTIBLE;
    } else {
        tso->why_blocked = BlockedOnCCall_NoUnblockExc;
    }

    // Hand back capability
    task->suspended_tso = tso;

#if defined(THREADED_RTS)
    ACQUIRE_LOCK(&cap->lock);
#endif

    suspendTask(cap,task);
    cap->in_haskell = rtsFalse;
    releaseCapability_(cap);

#if defined(THREADED_RTS)
    RELEASE_LOCK(&cap->lock);

    /* Preparing to leave the RTS, so ensure there's a native thread/task
       waiting to take over.
    */
    debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
#endif

    errno = saved_errno;
#if mingw32_HOST_OS
    SetLastError(saved_winerror);
#endif
    return task;
}
StgRegTable *
resumeThread (void *task_)
{
    StgTSO *tso;
    Capability *cap;
    Task *task = task_;
    int saved_errno;
#if mingw32_HOST_OS
    StgWord32 saved_winerror;
#endif

    saved_errno = errno;
#if mingw32_HOST_OS
    saved_winerror = GetLastError();
#endif

    cap = task->cap;
    // Wait for permission to re-enter the RTS with the result.
    waitForReturnCapability(&cap,task);
    // we might be on a different capability now... but if so, our
    // entry on the suspended_ccalling_tasks list will also have been
    // migrated.

    // Remove the thread from the suspended list
    recoverSuspendedTask(cap,task);

    tso = task->suspended_tso;
    task->suspended_tso = NULL;
    tso->link = END_TSO_QUEUE;
    debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);

    if (tso->why_blocked == BlockedOnCCall) {
        awakenBlockedExceptionQueue(cap,tso);
        tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
    }

    /* Reset blocking status */
    tso->why_blocked = NotBlocked;

    cap->r.rCurrentTSO = tso;
    cap->in_haskell = rtsTrue;
    errno = saved_errno;
#if mingw32_HOST_OS
    SetLastError(saved_winerror);
#endif

    /* We might have GC'd, mark the TSO dirty again */
    tso->flags |= TSO_DIRTY;

    IF_DEBUG(sanity, checkTSO(tso));

    return &cap->r;
}
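/* Illustrative sketch (not part of the original source): the shape of a
 * "safe" foreign call as generated code would use the two functions
 * above.  The token handed out by suspendThread() is the Task, and goes
 * back to resumeThread() when the call returns.  blockingForeignCall()
 * is a hypothetical C function. */
#if 0
static StgRegTable *safeCallSketch (StgRegTable *reg)
{
    void *token = suspendThread(reg);   /* give up the Capability */
    blockingForeignCall();              /* Haskell keeps running meanwhile */
    return resumeThread(token);         /* reacquire a Capability */
}
#endif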
/* ---------------------------------------------------------------------------
 * scheduleThread puts a thread on the end of the runnable queue.
 * This will usually be done immediately after a thread is created.
 * The caller of scheduleThread must create the thread using e.g.
 * createThread and push an appropriate closure
 * on this thread's stack before the scheduler is invoked.
 * ------------------------------------------------------------------------ */

void
scheduleThread(Capability *cap, StgTSO *tso)
{
    // The thread goes at the *end* of the run-queue, to avoid possible
    // starvation of any threads already on the queue.
    appendToRunQueue(cap,tso);
}
void
scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
{
#if defined(THREADED_RTS)
    tso->flags |= TSO_LOCKED;  // we requested explicit affinity; don't
                               // move this thread from now on.
    cpu %= RtsFlags.ParFlags.nNodes;
    if (cpu == cap->no) {
        appendToRunQueue(cap,tso);
    } else {
        migrateThreadToCapability_lock(&capabilities[cpu],tso);
    }
#else
    appendToRunQueue(cap,tso);
#endif
}
Capability *
scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
{
    Task *task;

    // We already created/initialised the Task
    task = cap->running_task;

    // This TSO is now a bound thread; make the Task and TSO
    // point to each other.
    tso->bound = task;
    tso->cap = cap;

    task->tso = tso;
    task->ret = ret;
    task->stat = NoStatus;

    appendToRunQueue(cap,tso);

    debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);

#if defined(GRAN)
    /* GranSim specific init */
    CurrentTSO = m->tso;                // the TSO to run
    procStatus[MainProc] = Busy;        // status of main PE
    CurrentProc = MainProc;             // PE to run it on
#endif

    cap = schedule(cap,task);

    ASSERT(task->stat != NoStatus);
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
    return cap;
}
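/* Illustrative sketch (not part of the original source): how the RTS API
 * drives a bound thread through scheduleWaitThread(), modelled on
 * rts_evalIO() in RtsAPI.c.  It assumes createStrictIOThread() with this
 * signature and that scheduleWaitThread() hands back the (possibly
 * migrated) Capability. */
#if 0
static Capability *
runBoundSketch (Capability *cap, StgClosure *p, /*[out]*/ HaskellObj *ret)
{
    StgTSO *tso = createStrictIOThread(cap, RtsFlags.GcFlags.initialStkSize, p);
    cap = scheduleWaitThread(tso, ret, cap);  /* returns when the thread is done */
    return cap;
}
#endif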
/* ----------------------------------------------------------------------------
 * Starting Tasks
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
static void
workerStart(Task *task)
{
    Capability *cap;

    // See startWorkerTask().
    ACQUIRE_LOCK(&task->lock);
    cap = task->cap;
    RELEASE_LOCK(&task->lock);

    // set the thread-local pointer to the Task:
    taskEnter(task);

    // schedule() runs without a lock.
    cap = schedule(cap,task);

    // On exit from schedule(), we have a Capability.
    releaseCapability(cap);
    workerTaskStop(task);
}
#endif
/* ---------------------------------------------------------------------------
 * initScheduler()
 *
 * Initialise the scheduler.  This resets all the queues - if the
 * queues contained any threads, they'll be garbage collected at the
 * next pass.
 * ------------------------------------------------------------------------ */

void
initScheduler(void)
{
#if defined(GRAN)
  nat i;

  for (i=0; i < MAX_PROC; i++) {
    run_queue_hds[i]      = END_TSO_QUEUE;
    run_queue_tls[i]      = END_TSO_QUEUE;
    blocked_queue_hds[i]  = END_TSO_QUEUE;
    blocked_queue_tls[i]  = END_TSO_QUEUE;
    ccalling_threadss[i]  = END_TSO_QUEUE;
    blackhole_queue[i]    = END_TSO_QUEUE;
    sleeping_queue        = END_TSO_QUEUE;
  }
#elif !defined(THREADED_RTS)
  blocked_queue_hd  = END_TSO_QUEUE;
  blocked_queue_tl  = END_TSO_QUEUE;
  sleeping_queue    = END_TSO_QUEUE;
#endif

  blackhole_queue   = END_TSO_QUEUE;
  all_threads       = END_TSO_QUEUE;

  context_switch = 0;
  sched_state    = SCHED_RUNNING;

#if defined(THREADED_RTS)
  /* Initialise the mutex and condition variables used by
   * the scheduler. */
  initMutex(&sched_mutex);
#endif

  ACQUIRE_LOCK(&sched_mutex);

  /* A capability holds the state a native thread needs in
   * order to execute STG code.  At least one capability is
   * floating around (only THREADED_RTS builds have more than one).
   */
  initCapabilities();

#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
  initTaskManager();
#endif

#if defined(THREADED_RTS)
  /*
   * Eagerly start one worker to run each Capability, except for
   * Capability 0.  The idea is that we're probably going to start a
   * bound thread on Capability 0 pretty soon, so we don't want a
   * worker task hogging it.
   */
  {
      nat i;
      Capability *cap;

      for (i = 1; i < n_capabilities; i++) {
          cap = &capabilities[i];
          ACQUIRE_LOCK(&cap->lock);
          startWorkerTask(cap, workerStart);
          RELEASE_LOCK(&cap->lock);
      }
  }
#endif

  trace(TRACE_sched, "start: %d capabilities", n_capabilities);

  RELEASE_LOCK(&sched_mutex);
}
void
exitScheduler( void )
{
    Task *task = NULL;

#if defined(THREADED_RTS)
    ACQUIRE_LOCK(&sched_mutex);
    task = newBoundTask();
    RELEASE_LOCK(&sched_mutex);
#endif

    // If we haven't killed all the threads yet, do it now.
    if (sched_state < SCHED_SHUTTING_DOWN) {
        sched_state = SCHED_INTERRUPTING;
        scheduleDoGC(NULL,task,rtsFalse);
    }
    sched_state = SCHED_SHUTTING_DOWN;

#if defined(THREADED_RTS)
    {
        nat i;

        for (i = 0; i < n_capabilities; i++) {
            shutdownCapability(&capabilities[i], task);
        }
        boundTaskExiting(task);
    }
#else
    freeCapability(&MainCapability);
#endif
}

void
freeScheduler( void )
{
    freeTaskManager();
    if (n_capabilities != 1) {
        stgFree(capabilities);
    }
#if defined(THREADED_RTS)
    closeMutex(&sched_mutex);
#endif
}
/* ---------------------------------------------------------------------------
   Where are the roots that we know about?

        - all the threads on the runnable queue
        - all the threads on the blocked queue
        - all the threads on the sleeping queue
        - all the threads currently executing a _ccall_GC
        - all the "main threads"

   ------------------------------------------------------------------------ */

/* This has to be protected either by the scheduler monitor, or by the
   garbage collection monitor (probably the latter).
*/
void
GetRoots( evac_fn evac )
{
    nat i;
    Capability *cap;
    Task *task;

#if defined(GRAN)
    for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
        if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
            evac((StgClosure **)&run_queue_hds[i]);
        if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
            evac((StgClosure **)&run_queue_tls[i]);

        if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
            evac((StgClosure **)&blocked_queue_hds[i]);
        if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
            evac((StgClosure **)&blocked_queue_tls[i]);
        if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
            evac((StgClosure **)&ccalling_threadss[i]);
    }

#else /* !GRAN */

    for (i = 0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        evac((StgClosure **)(void *)&cap->run_queue_hd);
        evac((StgClosure **)(void *)&cap->run_queue_tl);
#if defined(THREADED_RTS)
        evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
        evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
#endif
        for (task = cap->suspended_ccalling_tasks; task != NULL;
             task=task->next) {
            debugTrace(DEBUG_sched,
                       "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
            evac((StgClosure **)(void *)&task->suspended_tso);
        }
    }

#if !defined(THREADED_RTS)
    evac((StgClosure **)(void *)&blocked_queue_hd);
    evac((StgClosure **)(void *)&blocked_queue_tl);
    evac((StgClosure **)(void *)&sleeping_queue);
#endif
#endif /* !GRAN */

    // deliberately not a root: unreachable threads blocked on black
    // holes are detected as garbage and woken up by resurrectThreads().
    // evac((StgClosure **)&blackhole_queue);

#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
    markSparkQueue(evac);
#endif

#if defined(RTS_USER_SIGNALS)
    // mark the signal handlers (signals should be already blocked)
    markSignalHandlers(evac);
#endif
}
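/* Illustrative sketch (not part of the original source): the shape of the
 * evac_fn callback that GetRoots() is handed.  The real callback is the
 * garbage collector's evacuate routine; the body below is a stand-in. */
#if 0
static void evacSketch (StgClosure **root)
{
    /* a real evac_fn copies *root into to-space and overwrites *root
       with the closure's new address, updating the root in place */
}
/* usage: GetRoots(evacSketch); */
#endif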
/* -----------------------------------------------------------------------------
   performGC

   This is the interface to the garbage collector from Haskell land.
   We provide this so that external C code can allocate and garbage
   collect when called from Haskell via _ccall_GC.
   -------------------------------------------------------------------------- */

static void
performGC_(rtsBool force_major)
{
    Task *task;

    // We must grab a new Task here, because the existing Task may be
    // associated with a particular Capability, and chained onto the
    // suspended_ccalling_tasks queue.
    ACQUIRE_LOCK(&sched_mutex);
    task = newBoundTask();
    RELEASE_LOCK(&sched_mutex);

    scheduleDoGC(NULL,task,force_major);
    boundTaskExiting(task);
}

void
performGC(void)
{
    performGC_(rtsFalse);
}

void
performMajorGC(void)
{
    performGC_(rtsTrue);
}
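/* Illustrative sketch (not part of the original source): external C code,
 * running outside Haskell (e.g. during a safe foreign call), can request a
 * collection through the entry points above.  flushUnreachableData() is a
 * hypothetical caller. */
#if 0
extern void performGC (void);
extern void performMajorGC (void);

static void flushUnreachableData (void)
{
    performMajorGC();  /* borrows a fresh bound Task, GCs, and returns */
}
#endif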
/* -----------------------------------------------------------------------------
   Stack overflow

   If the thread has reached its maximum stack size, then raise the
   StackOverflow exception in the offending thread.  Otherwise
   relocate the TSO into a larger chunk of memory and adjust its stack
   size appropriately.
   -------------------------------------------------------------------------- */

StgTSO *
threadStackOverflow(Capability *cap, StgTSO *tso)
{
    nat new_stack_size, stack_words;
    lnat new_tso_size;
    StgPtr new_sp;
    StgTSO *dest;

    IF_DEBUG(sanity,checkTSO(tso));

    // don't allow throwTo() to modify the blocked_exceptions queue
    // while we are moving the TSO:
    lockClosure((StgClosure *)tso);

    if (tso->stack_size >= tso->max_stack_size) {

        debugTrace(DEBUG_gc,
                   "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
                   (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);

        /* If we're debugging, just print out the top of the stack */
        printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
                                         tso->sp+64));

        // Send this thread the StackOverflow exception
        unlockTSO(tso);
        throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
        return tso;
    }

    /* Try to double the current stack size.  If that takes us over the
     * maximum stack size for this thread, then use the maximum instead.
     * Finally round up so the TSO ends up as a whole number of blocks.
     */
    new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
    new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
                                          TSO_STRUCT_SIZE)/sizeof(W_);
    new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
    new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;

    debugTrace(DEBUG_sched,
               "increasing stack size from %ld words to %d.",
               (long)tso->stack_size, new_stack_size);

    dest = (StgTSO *)allocate(new_tso_size);
    TICK_ALLOC_TSO(new_stack_size,0);

    /* copy the TSO block and the old stack into the new area */
    memcpy(dest,tso,TSO_STRUCT_SIZE);
    stack_words = tso->stack + tso->stack_size - tso->sp;
    new_sp = (P_)dest + new_tso_size - stack_words;
    memcpy(new_sp, tso->sp, stack_words * sizeof(W_));

    /* relocate the stack pointers... */
    dest->sp         = new_sp;
    dest->stack_size = new_stack_size;

    /* Mark the old TSO as relocated.  We have to check for relocated
     * TSOs in the garbage collector and any primops that deal with TSOs.
     *
     * It's important to set the sp value to just beyond the end
     * of the stack, so we don't attempt to scavenge any part of the
     * dead TSO's stack.
     */
    tso->what_next = ThreadRelocated;
    tso->link = dest;
    tso->sp = (P_)&(tso->stack[tso->stack_size]);
    tso->why_blocked = NotBlocked;

    IF_PAR_DEBUG(verbose,
                 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
                            tso->id, tso, tso->stack_size);
                 /* If we're debugging, just print out the top of the stack */
                 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
                                                  tso->sp+64)));

    unlockTSO(dest);
    unlockTSO(tso);

    IF_DEBUG(sanity,checkTSO(dest));

    IF_DEBUG(scheduler,printTSO(dest));

    return dest;
}
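/* Illustrative sketch (not part of the original source): the sizing rule
 * used above, with simplified stand-ins for the block-allocator macros.
 * WORDS_PER_BLOCK and the rounding here are hypothetical; the real code
 * uses BLOCK_ROUND_UP and round_to_mblocks. */
#if 0
#define WORDS_PER_BLOCK 4096  /* hypothetical block size in words */

static unsigned long growStackSketch (unsigned long stack_size_w,
                                      unsigned long max_stack_size_w,
                                      unsigned long tso_header_w)
{
    unsigned long new_stack_w, new_tso_w;

    /* double, but never beyond the per-thread maximum */
    new_stack_w = stack_size_w * 2;
    if (new_stack_w > max_stack_size_w) new_stack_w = max_stack_size_w;

    /* round the whole TSO (header + stack) up to whole blocks */
    new_tso_w = new_stack_w + tso_header_w;
    new_tso_w = (new_tso_w + WORDS_PER_BLOCK - 1) / WORDS_PER_BLOCK
                * WORDS_PER_BLOCK;

    /* the stack gets whatever is left of the rounded-up TSO, so the
       TSO always fills a whole number of blocks */
    return new_tso_w - tso_header_w;
}
#endif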
/* ---------------------------------------------------------------------------
   interruptStgRts
   - usually called inside a signal handler so it mustn't do anything fancy.
   ------------------------------------------------------------------------ */

void
interruptStgRts(void)
{
    sched_state = SCHED_INTERRUPTING;
    context_switch = 1;
    wakeUpRts();
}

/* -----------------------------------------------------------------------------
   wakeUpRts

   This function causes at least one OS thread to wake up and run the
   scheduler loop.  It is invoked when the RTS might be deadlocked, or
   an external event has arrived that may need servicing (eg. a
   keyboard interrupt).

   In the single-threaded RTS we don't do anything here; we only have
   one thread anyway, and the event that caused us to want to wake up
   will have interrupted any blocking system call in progress anyway.
   -------------------------------------------------------------------------- */

void
wakeUpRts(void)
{
#if defined(THREADED_RTS)
    // This forces the IO Manager thread to wakeup, which will
    // in turn ensure that some OS thread wakes up and runs the
    // scheduler loop, which will cause a GC and deadlock check.
    ioManagerWakeup();
#endif
}
/* -----------------------------------------------------------------------------
 * checkBlackHoles()
 *
 * Check the blackhole_queue for threads that can be woken up.  We do
 * this periodically: before every GC, and whenever the run queue is
 * empty.
 *
 * An elegant solution might be to just wake up all the blocked
 * threads with awakenBlockedQueue occasionally: they'll go back to
 * sleep again if the object is still a BLACKHOLE.  Unfortunately this
 * doesn't give us a way to tell whether we've actually managed to
 * wake up any threads, so we would be busy-waiting.
 *
 * -------------------------------------------------------------------------- */

static rtsBool
checkBlackHoles (Capability *cap)
{
    StgTSO **prev, *t;
    rtsBool any_woke_up = rtsFalse;
    StgHalfWord type;

    // blackhole_queue is global:
    ASSERT_LOCK_HELD(&sched_mutex);

    debugTrace(DEBUG_sched, "checking threads blocked on black holes");

    // ASSUMES: sched_mutex
    prev = &blackhole_queue;
    t = blackhole_queue;
    while (t != END_TSO_QUEUE) {
        ASSERT(t->why_blocked == BlockedOnBlackHole);
        type = get_itbl(t->block_info.closure)->type;
        if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
            IF_DEBUG(sanity,checkTSO(t));
            t = unblockOne(cap, t);
            // urk, the threads migrate to the current capability
            // here, but we'd like to keep them on the original one.
            *prev = t;
            any_woke_up = rtsTrue;
        } else {
            prev = &t->link;
            t = t->link;
        }
    }

    return any_woke_up;
}
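/* Illustrative sketch (not part of the original source): the
 * pointer-to-pointer traversal idiom used above, which unlinks a node in
 * O(1) with no special case for the list head.  shouldWake() is a
 * hypothetical predicate. */
#if 0
static void filterQueueSketch (StgTSO **head)
{
    StgTSO **prev = head;
    StgTSO *t;

    for (t = *head; t != END_TSO_QUEUE; t = *prev) {
        if (shouldWake(t)) {
            *prev = t->link;      /* unlink t; prev stays where it is */
        } else {
            prev = &t->link;      /* keep t; advance the unlink point */
        }
    }
}
#endif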
/* -----------------------------------------------------------------------------
   Deleting threads

   This is used for interruption (^C) and forking, and corresponds to
   raising an exception but without letting the thread catch the
   exception.
   -------------------------------------------------------------------------- */

static void
deleteThread (Capability *cap, StgTSO *tso)
{
    // NOTE: must only be called on a TSO that we have exclusive
    // access to, because we will call throwToSingleThreaded() below.
    // The TSO must be on the run queue of the Capability we own, or
    // we must own all Capabilities.

    if (tso->why_blocked != BlockedOnCCall &&
        tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
        throwToSingleThreaded(cap,tso,NULL);
    }
}

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
deleteThread_(Capability *cap, StgTSO *tso)
{ // for forkProcess only:
  // like deleteThread(), but we delete threads in foreign calls, too.

    if (tso->why_blocked == BlockedOnCCall ||
        tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
        unblockOne(cap,tso);
        tso->what_next = ThreadKilled;
    } else {
        deleteThread(cap,tso);
    }
}
#endif
/* -----------------------------------------------------------------------------
   raiseExceptionHelper

   This function is called by the raise# primitive, just so that we can
   move some of the tricky bits of raising an exception from C-- into
   C.  Who knows, it might be a useful re-usable thing here too.
   -------------------------------------------------------------------------- */

StgWord
raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
{
    Capability *cap = regTableToCapability(reg);
    StgThunk *raise_closure = NULL;
    StgPtr p, next;
    StgRetInfoTable *info;
    //
    // This closure represents the expression 'raise# E' where E
    // is the exception raised.  It is used to overwrite all the
    // thunks which are currently under evaluation.
    //

    // OLD COMMENT (we don't have MIN_UPD_SIZE now):
    // LDV profiling: stg_raise_info has THUNK as its closure
    // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
    // payload, MIN_UPD_SIZE is more appropriate than 1.  It seems that
    // 1 does not cause any problem unless profiling is performed.
    // However, when LDV profiling goes on, we need to linearly scan
    // small object pool, where raise_closure is stored, so we should
    // use MIN_UPD_SIZE.
    //
    // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
    //                                  sizeofW(StgClosure)+1);
    //

    //
    // Walk up the stack, looking for the catch frame.  On the way,
    // we update any closures pointed to from update frames with the
    // raise closure that we just built.
    //
    p = tso->sp;
    while(1) {
        info = get_ret_itbl((StgClosure *)p);
        next = p + stack_frame_sizeW((StgClosure *)p);
        switch (info->i.type) {

        case UPDATE_FRAME:
            // Only create raise_closure if we need to.
            if (raise_closure == NULL) {
                raise_closure =
                    (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
                SET_HDR(raise_closure, &stg_raise_info, CCCS);
                raise_closure->payload[0] = exception;
            }
            UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
            p = next;
            continue;

        case ATOMICALLY_FRAME:
            debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
            tso->sp = p;
            return ATOMICALLY_FRAME;

        case CATCH_FRAME:
            tso->sp = p;
            return CATCH_FRAME;

        case CATCH_STM_FRAME:
            debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
            tso->sp = p;
            return CATCH_STM_FRAME;

        case CATCH_RETRY_FRAME:
        default:
            p = next;
            continue;
        }
    }
}
/* -----------------------------------------------------------------------------
   findRetryFrameHelper

   This function is called by the retry# primitive.  It traverses the stack
   leaving tso->sp referring to the frame which should handle the retry.

   This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
   or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).

   We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
   create) because retries are not considered to be exceptions, despite the
   similar implementation.

   We should not expect to see CATCH_FRAME or STOP_FRAME because those should
   not be created within memory transactions.
   -------------------------------------------------------------------------- */

StgWord
findRetryFrameHelper (StgTSO *tso)
{
    StgPtr p, next;
    StgRetInfoTable *info;

    p = tso->sp;
    while (1) {
        info = get_ret_itbl((StgClosure *)p);
        next = p + stack_frame_sizeW((StgClosure *)p);
        switch (info->i.type) {

        case ATOMICALLY_FRAME:
            debugTrace(DEBUG_stm,
                       "found ATOMICALLY_FRAME at %p during retry", p);
            tso->sp = p;
            return ATOMICALLY_FRAME;

        case CATCH_RETRY_FRAME:
            debugTrace(DEBUG_stm,
                       "found CATCH_RETRY_FRAME at %p during retry", p);
            tso->sp = p;
            return CATCH_RETRY_FRAME;

        case CATCH_STM_FRAME: {
            StgTRecHeader *trec = tso->trec;
            StgTRecHeader *outer = stmGetEnclosingTRec(trec);
            debugTrace(DEBUG_stm,
                       "found CATCH_STM_FRAME at %p during retry", p);
            debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
            stmAbortTransaction(tso->cap, trec);
            stmFreeAbortedTRec(tso->cap, trec);
            tso->trec = outer;
            p = next;
            continue;
        }

        default:
            ASSERT(info->i.type != CATCH_FRAME);
            ASSERT(info->i.type != STOP_FRAME);
            p = next;
            continue;
        }
    }
}
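/* Illustrative sketch (not part of the original source): the frame-by-frame
 * stack walk shared by raiseExceptionHelper and findRetryFrameHelper.  Each
 * frame's size comes from its info table, so the walk is just repeated
 * pointer bumping; the real walkers also handle the other frame types. */
#if 0
static StgWord findFrameSketch (StgTSO *tso)
{
    StgPtr p = tso->sp;

    while (1) {
        StgRetInfoTable *info = get_ret_itbl((StgClosure *)p);
        if (info->i.type == ATOMICALLY_FRAME ||
            info->i.type == CATCH_RETRY_FRAME) {
            tso->sp = p;                          /* leave sp at the frame */
            return info->i.type;
        }
        p += stack_frame_sizeW((StgClosure *)p);  /* next frame up */
    }
}
#endif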
/* -----------------------------------------------------------------------------
   resurrectThreads is called after garbage collection on the list of
   threads found to be garbage.  Each of these threads will be woken
   up and sent a signal: BlockedOnDeadMVar if the thread was blocked
   on an MVar, or NonTermination if the thread was blocked on a Black
   Hole.

   Locks: assumes we hold *all* the capabilities.
   -------------------------------------------------------------------------- */

void
resurrectThreads (StgTSO *threads)
{
    StgTSO *tso, *next;
    Capability *cap;

    for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
        next = tso->global_link;
        tso->global_link = all_threads;
        all_threads = tso;
        debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);

        // Wake up the thread on the Capability it was last on
        cap = tso->cap;

        switch (tso->why_blocked) {
        case BlockedOnMVar:
        case BlockedOnException:
            /* Called by GC - sched_mutex lock is currently held. */
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)BlockedOnDeadMVar_closure);
            break;
        case BlockedOnBlackHole:
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)NonTermination_closure);
            break;
        case BlockedOnSTM:
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)BlockedIndefinitely_closure);
            break;
        case NotBlocked:
            /* This might happen if the thread was blocked on a black hole
             * belonging to a thread that we've just woken up (raiseAsync
             * can wake up threads, remember...).
             */
            continue;
        default:
            barf("resurrectThreads: thread blocked in a strange way");
        }
    }
}