1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 1998-2005
5 * The scheduler and thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
24 #include "RtsSignals.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
34 #include "Proftimer.h"
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
47 #include "Capability.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
55 #ifdef HAVE_SYS_TYPES_H
56 #include <sys/types.h>
70 // Turn off inlining when debugging - it obfuscates things
73 # define STATIC_INLINE static
76 /* -----------------------------------------------------------------------------
78 * -------------------------------------------------------------------------- */
82 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
83 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
86 In GranSim we have a runnable and a blocked queue for each processor.
   In order to minimise code changes, new arrays run_queue_hds/tls
   are created. run_queue_hd is then a shortcut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).
92 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
93 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
94 StgTSO *ccalling_threadss[MAX_PROC];
95 /* We use the same global list of threads (all_threads) in GranSim as in
96 the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating). */
102 #if !defined(THREADED_RTS)
// Blocked/sleeping threads
104 StgTSO *blocked_queue_hd = NULL;
105 StgTSO *blocked_queue_tl = NULL;
106 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
109 /* Threads blocked on blackholes.
110 * LOCK: sched_mutex+capability, or all capabilities
112 StgTSO *blackhole_queue = NULL;
/* The blackhole_queue should be checked for threads to wake up. See
 * Schedule.h for a more thorough comment.
117 * LOCK: none (doesn't matter if we miss an update)
119 rtsBool blackholes_need_checking = rtsFalse;
121 /* Linked list of all threads.
122 * Used for detecting garbage collected threads.
123 * LOCK: sched_mutex+capability, or all capabilities
125 StgTSO *all_threads = NULL;
127 /* flag set by signal handler to precipitate a context switch
128 * LOCK: none (just an advisory flag)
130 int context_switch = 0;
132 /* flag that tracks whether we have done any execution in this time slice.
133 * LOCK: currently none, perhaps we should lock (but needs to be
134 * updated in the fast path of the scheduler).
136 nat recent_activity = ACTIVITY_YES;
138 /* if this flag is set as well, give up execution
139 * LOCK: none (changes once, from false->true)
141 rtsBool sched_state = SCHED_RUNNING;
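/* Note on the life cycle (as set up in the code below): sched_state
 * only ever advances in one direction,
 *   SCHED_RUNNING -> SCHED_INTERRUPTING -> SCHED_SHUTTING_DOWN
 * SCHED_INTERRUPTING is set by the interrupt handler, and
 * SCHED_SHUTTING_DOWN by scheduleDoGC() once all threads are deleted
 * (see the shutdown sequence described in schedule()).
 */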
/* Next thread ID to allocate.
 * LOCK: sched_mutex
 */
146 static StgThreadID next_thread_id = 1;
148 /* The smallest stack size that makes any sense is:
149 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)  (the stg_stop_thread_info frame)
 *  + 1                      (the closure to enter)
 *  + 1                      (stg_ap_v_ret)
 *  + 1                      (spare slot req'd by stg_ap_v_ret)
155 * A thread with this stack will bomb immediately with a stack
156 * overflow, which will increase its stack size.
158 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
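/* An illustrative sketch (not part of the build; createThread() and
 * scheduleThread() are assumed from the rest of the RTS): a thread
 * created with the minimum stack bombs on its first entry and has its
 * stack grown by threadStackOverflow(), e.g.
 *
 *     StgTSO *tso = createThread(cap, MIN_STACK_WORDS);
 *     scheduleThread(cap, tso);  // first run takes the overflow path
 */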
164 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
165 * exists - earlier gccs apparently didn't.
171 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
172 * in an MT setting, needed to signal that a worker thread shouldn't hang around
173 * in the scheduler when it is out of work.
175 rtsBool shutting_down_scheduler = rtsFalse;
178 * This mutex protects most of the global scheduler data in
179 * the THREADED_RTS runtime.
181 #if defined(THREADED_RTS)
185 #if defined(PARALLEL_HASKELL)
187 rtsTime TimeOfLastYield;
188 rtsBool emitSchedule = rtsTrue;
191 /* -----------------------------------------------------------------------------
192 * static function prototypes
193 * -------------------------------------------------------------------------- */
195 static Capability *schedule (Capability *initialCapability, Task *task);
// These functions all encapsulate parts of the scheduler loop, and are
199 // abstracted only to make the structure and control flow of the
200 // scheduler clearer.
202 static void schedulePreLoop (void);
203 #if defined(THREADED_RTS)
204 static void schedulePushWork(Capability *cap, Task *task);
206 static void scheduleStartSignalHandlers (Capability *cap);
207 static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS);
209 static void scheduleCheckBlackHoles (Capability *cap);
210 static void scheduleDetectDeadlock (Capability *cap, Task *task);
212 static StgTSO *scheduleProcessEvent(rtsEvent *event);
214 #if defined(PARALLEL_HASKELL)
215 static StgTSO *scheduleSendPendingMessages(void);
216 static void scheduleActivateSpark(void);
217 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
219 #if defined(PAR) || defined(GRAN)
220 static void scheduleGranParReport(void);
222 static void schedulePostRunThread(void);
223 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task,
                                         StgTSO *t );
226 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
227 nat prev_what_next );
228 static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                             StgTSO *t );
231 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
                                rtsBool force_major,
                                void (*get_roots)(evac_fn));
236 static void unblockThread(Capability *cap, StgTSO *tso);
237 static rtsBool checkBlackHoles(Capability *cap);
238 static void AllRoots(evac_fn evac);
240 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
242 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
243 rtsBool stop_at_atomically, StgPtr stop_here);
245 static void deleteThread (Capability *cap, StgTSO *tso);
246 static void deleteAllThreads (Capability *cap);
249 static void printThreadBlockage(StgTSO *tso);
250 static void printThreadStatus(StgTSO *tso);
251 void printThreadQueue(StgTSO *tso);
254 #if defined(PARALLEL_HASKELL)
255 StgTSO * createSparkThread(rtsSpark spark);
256 StgTSO * activateSpark (rtsSpark spark);
260 static char *whatNext_strs[] = {
270 /* -----------------------------------------------------------------------------
271 * Putting a thread on the run queue: different scheduling policies
272 * -------------------------------------------------------------------------- */
static void
addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) {
        // this does round-robin scheduling; good for concurrency
        appendToRunQueue(cap,t);
    } else {
        // this does unfair scheduling; good for parallelism
        pushOnRunQueue(cap,t);
    }
#else
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
#endif
}
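/* Design note: appendToRunQueue() adds at the tail, so runnable
 * threads take turns (round-robin); pushOnRunQueue() adds at the
 * head, which is unfair but favours finishing the current parallel
 * computation and its sparks first.
 */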
291 /* ---------------------------------------------------------------------------
292 Main scheduling loop.
294 We use round-robin scheduling, each thread returning to the
295 scheduler loop when one of these conditions is detected:
298 * timer expires (thread yields)
   In a GranSim setup this loop iterates over the global event queue,
   which determines what to do next. Therefore, it's more complicated
   than either the concurrent or the parallel (GUM) setup.
310 GUM iterates over incoming messages.
311 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
312 and sends out a fish whenever it has nothing to do; in-between
313 doing the actual reductions (shared code below) it processes the
314 incoming messages and deals with delayed operations
315 (see PendingFetches).
316 This is not the ugliest code you could imagine, but it's bloody close.
318 ------------------------------------------------------------------------ */
321 schedule (Capability *initialCapability, Task *task)
325 StgThreadReturnCode ret;
328 #elif defined(PARALLEL_HASKELL)
331 rtsBool receivedFinish = rtsFalse;
333 nat tp_size, sp_size; // stats only
338 #if defined(THREADED_RTS)
339 rtsBool first = rtsTrue;
342 cap = initialCapability;
344 // Pre-condition: this task owns initialCapability.
345 // The sched_mutex is *NOT* held
346 // NB. on return, we still hold a capability.
348 debugTrace (DEBUG_sched,
349 "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
350 task, initialCapability);
354 // -----------------------------------------------------------
355 // Scheduler loop starts here:
357 #if defined(PARALLEL_HASKELL)
358 #define TERMINATION_CONDITION (!receivedFinish)
360 #define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL)
362 #define TERMINATION_CONDITION rtsTrue
365 while (TERMINATION_CONDITION) {
368 /* Choose the processor with the next event */
369 CurrentProc = event->proc;
370 CurrentTSO = event->tso;
373 #if defined(THREADED_RTS)
	// don't yield the first time, we want a chance to run this
	// thread for a bit, even if there are others banging at the
	// door.
379 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
381 // Yield the capability to higher-priority tasks if necessary.
382 yieldCapability(&cap, task);
386 #if defined(THREADED_RTS)
387 schedulePushWork(cap,task);
    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
393 if (cap->in_haskell) {
394 errorBelch("schedule: re-entered unsafely.\n"
395 " Perhaps a 'foreign import unsafe' should be 'safe'?");
396 stg_exit(EXIT_FAILURE);
399 // The interruption / shutdown sequence.
401 // In order to cleanly shut down the runtime, we want to:
402 // * make sure that all main threads return to their callers
403 // with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
405 // * free all memory etc.
407 // So the sequence for ^C goes like this:
409 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
410 // arranges for some Capability to wake up
412 // * all threads in the system are halted, and the zombies are
413 // placed on the run queue for cleaning up. We acquire all
    //     the capabilities in order to delete the threads; this is
415 // done by scheduleDoGC() for convenience (because GC already
416 // needs to acquire all the capabilities). We can't kill
417 // threads involved in foreign calls.
419 // * somebody calls shutdownHaskell(), which calls exitScheduler()
421 // * sched_state := SCHED_SHUTTING_DOWN
423 // * all workers exit when the run queue on their capability
424 // drains. All main threads will also exit when their TSO
425 // reaches the head of the run queue and they can return.
427 // * eventually all Capabilities will shut down, and the RTS can
430 // * We might be left with threads blocked in foreign calls,
431 // we should really attempt to kill these somehow (TODO);
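    //
    // For orientation, a minimal sketch of the handler side of this
    // protocol (the real handler lives with the signal-handling code
    // and may do more, e.g. waking a Capability):
    //
    //     void interruptStgRts (void)
    //     {
    //         sched_state = SCHED_INTERRUPTING;
    //         context_switch = 1;   // make the running thread yield
    //     }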
433 switch (sched_state) {
436 case SCHED_INTERRUPTING:
437 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
438 #if defined(THREADED_RTS)
439 discardSparksCap(cap);
441 /* scheduleDoGC() deletes all the threads */
442 cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
444 case SCHED_SHUTTING_DOWN:
445 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
446 // If we are a worker, just exit. If we're a bound thread
447 // then we will exit below when we've removed our TSO from
449 if (task->tso == NULL && emptyRunQueue(cap)) {
454 barf("sched_state: %d", sched_state);
457 #if defined(THREADED_RTS)
458 // If the run queue is empty, take a spark and turn it into a thread.
460 if (emptyRunQueue(cap)) {
462 spark = findSpark(cap);
464 debugTrace(DEBUG_sched,
465 "turning spark of closure %p into a thread",
466 (StgClosure *)spark);
467 createSparkThread(cap,spark);
471 #endif // THREADED_RTS
473 scheduleStartSignalHandlers(cap);
475 // Only check the black holes here if we've nothing else to do.
476 // During normal execution, the black hole list only gets checked
477 // at GC time, to avoid repeatedly traversing this possibly long
478 // list each time around the scheduler.
479 if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
481 scheduleCheckWakeupThreads(cap);
483 scheduleCheckBlockedThreads(cap);
485 scheduleDetectDeadlock(cap,task);
486 #if defined(THREADED_RTS)
487 cap = task->cap; // reload cap, it might have changed
490 // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt was received during
492 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
493 // Additionally, it is not fatal for the
494 // threaded RTS to reach here with no threads to run.
496 // win32: might be here due to awaitEvent() being abandoned
497 // as a result of a console event having been delivered.
498 if ( emptyRunQueue(cap) ) {
499 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
500 ASSERT(sched_state >= SCHED_INTERRUPTING);
502 continue; // nothing to do
505 #if defined(PARALLEL_HASKELL)
506 scheduleSendPendingMessages();
507 if (emptyRunQueue(cap) && scheduleActivateSpark())
511 ASSERT(next_fish_to_send_at==0); // i.e. no delayed fishes left!
514 /* If we still have no work we need to send a FISH to get a spark
516 if (emptyRunQueue(cap)) {
517 if (!scheduleGetRemoteWork(&receivedFinish)) continue;
518 ASSERT(rtsFalse); // should not happen at the moment
520 // from here: non-empty run queue.
    // TODO: merge the above case with this one; only one call to processMessages()!
522 if (PacketsWaiting()) { /* process incoming messages, if
523 any pending... only in else
                               because getRemoteWork waits for
                               messages as well */
526 receivedFinish = processMessages();
531 scheduleProcessEvent(event);
535 // Get a thread to run
537 t = popRunQueue(cap);
539 #if defined(GRAN) || defined(PAR)
    scheduleGranParReport(); // some kind of debugging output
542 // Sanity check the thread we're about to run. This can be
543 // expensive if there is lots of thread switching going on...
544 IF_DEBUG(sanity,checkTSO(t));
547 #if defined(THREADED_RTS)
548 // Check whether we can run this thread in the current task.
549 // If not, we have to pass our capability to the right task.
551 Task *bound = t->bound;
555 debugTrace(DEBUG_sched,
556 "### Running thread %d in bound thread", t->id);
557 // yes, the Haskell thread is bound to the current native thread
559 debugTrace(DEBUG_sched,
560 "### thread %d bound to another OS thread", t->id);
561 // no, bound to a different Haskell thread: pass to that thread
562 pushOnRunQueue(cap,t);
566 // The thread we want to run is unbound.
568 debugTrace(DEBUG_sched,
569 "### this OS thread cannot run thread %d", t->id);
570 // no, the current native thread is bound to a different
571 // Haskell thread, so pass it to any worker thread
572 pushOnRunQueue(cap,t);
579 cap->r.rCurrentTSO = t;
    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
        && !emptyThreadQueues(cap)) {
        context_switch = 1;
    }
592 debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
593 (long)t->id, whatNext_strs[t->what_next]);
595 #if defined(PROFILING)
596 startHeapProfTimer();
599 // ----------------------------------------------------------------------
600 // Run the current thread
602 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
603 ASSERT(t->cap == cap);
605 prev_what_next = t->what_next;
607 errno = t->saved_errno;
608 cap->in_haskell = rtsTrue;
612 recent_activity = ACTIVITY_YES;
614 switch (prev_what_next) {
618 /* Thread already finished, return to scheduler. */
619 ret = ThreadFinished;
625 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
626 cap = regTableToCapability(r);
631 case ThreadInterpret:
632 cap = interpretBCO(cap);
637 barf("schedule: invalid what_next field");
640 cap->in_haskell = rtsFalse;
642 // The TSO might have moved, eg. if it re-entered the RTS and a GC
643 // happened. So find the new location:
644 t = cap->r.rCurrentTSO;
646 // We have run some Haskell code: there might be blackhole-blocked
647 // threads to wake up now.
648 // Lock-free test here should be ok, we're just setting a flag.
649 if ( blackhole_queue != END_TSO_QUEUE ) {
650 blackholes_need_checking = rtsTrue;
653 // And save the current errno in this thread.
654 // XXX: possibly bogus for SMP because this thread might already
655 // be running again, see code below.
656 t->saved_errno = errno;
658 #if defined(THREADED_RTS)
659 // If ret is ThreadBlocked, and this Task is bound to the TSO that
660 // blocked, we are in limbo - the TSO is now owned by whatever it
661 // is blocked on, and may in fact already have been woken up,
662 // perhaps even on a different Capability. It may be the case
663 // that task->cap != cap. We better yield this Capability
    // immediately and return to normality.
665 if (ret == ThreadBlocked) {
666 debugTrace(DEBUG_sched,
667 "--<< thread %d (%s) stopped: blocked",
668 t->id, whatNext_strs[t->what_next]);
673 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
674 ASSERT(t->cap == cap);
676 // ----------------------------------------------------------------------
678 // Costs for the scheduler are assigned to CCS_SYSTEM
679 #if defined(PROFILING)
684 schedulePostRunThread();
686 ready_to_gc = rtsFalse;
690 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
694 scheduleHandleStackOverflow(cap,task,t);
698 if (scheduleHandleYield(cap, t, prev_what_next)) {
699 // shortcut for switching between compiler/interpreter:
705 scheduleHandleThreadBlocked(t);
709 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
710 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
714 barf("schedule: invalid thread return code %d", (int)ret);
717 if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
719 cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
721 } /* end of while() */
723 debugTrace(PAR_DEBUG_verbose,
724 "== Leaving schedule() after having received Finish");
727 /* ----------------------------------------------------------------------------
728 * Setting up the scheduler loop
729 * ------------------------------------------------------------------------- */
732 schedulePreLoop(void)
735 /* set up first event to get things going */
736 /* ToDo: assign costs for system setup and init MainTSO ! */
737 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
739 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
741 debugTrace (DEBUG_gran,
742 "GRAN: Init CurrentTSO (in schedule) = %p",
744 IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
746 if (RtsFlags.GranFlags.Light) {
747 /* Save current time; GranSim Light only */
748 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
753 /* -----------------------------------------------------------------------------
756 * Push work to other Capabilities if we have some.
757 * -------------------------------------------------------------------------- */
759 #if defined(THREADED_RTS)
761 schedulePushWork(Capability *cap USED_IF_THREADS,
762 Task *task USED_IF_THREADS)
764 Capability *free_caps[n_capabilities], *cap0;
767 // migration can be turned off with +RTS -qg
768 if (!RtsFlags.ParFlags.migrate) return;
770 // Check whether we have more threads on our run queue, or sparks
771 // in our pool, that we could hand to another Capability.
772 if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
773 && sparkPoolSizeCap(cap) < 2) {
777 // First grab as many free Capabilities as we can.
778 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
779 cap0 = &capabilities[i];
780 if (cap != cap0 && tryGrabCapability(cap0,task)) {
	    if (!emptyRunQueue(cap0) || cap0->returning_tasks_hd != NULL) {
782 // it already has some work, we just grabbed it at
783 // the wrong moment. Or maybe it's deadlocked!
784 releaseCapability(cap0);
786 free_caps[n_free_caps++] = cap0;
791 // we now have n_free_caps free capabilities stashed in
792 // free_caps[]. Share our run queue equally with them. This is
793 // probably the simplest thing we could do; improvements we might
794 // want to do include:
796 // - giving high priority to moving relatively new threads, on
    //  the grounds that they haven't had time to build up a
798 // working set in the cache on this CPU/Capability.
800 // - giving low priority to moving long-lived threads
802 if (n_free_caps > 0) {
803 StgTSO *prev, *t, *next;
804 rtsBool pushed_to_all;
806 debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
809 pushed_to_all = rtsFalse;
811 if (cap->run_queue_hd != END_TSO_QUEUE) {
	    prev = cap->run_queue_hd;
	    t = prev->link;
	    prev->link = END_TSO_QUEUE;
	    for (; t != END_TSO_QUEUE; t = next) {
		next = t->link;
		t->link = END_TSO_QUEUE;
818 if (t->what_next == ThreadRelocated
819 || t->bound == task // don't move my bound thread
820 || tsoLocked(t)) { // don't move a locked thread
823 } else if (i == n_free_caps) {
824 pushed_to_all = rtsTrue;
830 debugTrace(DEBUG_sched, "pushing thread %d to capability %d", t->id, free_caps[i]->no);
831 appendToRunQueue(free_caps[i],t);
832 if (t->bound) { t->bound->cap = free_caps[i]; }
833 t->cap = free_caps[i];
837 cap->run_queue_tl = prev;
840 // If there are some free capabilities that we didn't push any
841 // threads to, then try to push a spark to each one.
842 if (!pushed_to_all) {
844 // i is the next free capability to push to
845 for (; i < n_free_caps; i++) {
846 if (emptySparkPoolCap(free_caps[i])) {
847 spark = findSpark(cap);
849 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
850 newSpark(&(free_caps[i]->r), spark);
856 // release the capabilities
857 for (i = 0; i < n_free_caps; i++) {
858 task->cap = free_caps[i];
859 releaseCapability(free_caps[i]);
862 task->cap = cap; // reset to point to our Capability.
866 /* ----------------------------------------------------------------------------
867 * Start any pending signal handlers
868 * ------------------------------------------------------------------------- */
870 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
872 scheduleStartSignalHandlers(Capability *cap)
874 if (signals_pending()) { // safe outside the lock
875 startSignalHandlers(cap);
880 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
885 /* ----------------------------------------------------------------------------
886 * Check for blocked threads that can be woken up.
887 * ------------------------------------------------------------------------- */
890 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
892 #if !defined(THREADED_RTS)
894 // Check whether any waiting threads need to be woken up. If the
895 // run queue is empty, and there are no other tasks running, we
896 // can wait indefinitely for something to happen.
898 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
900 awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
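	// The rtsBool passed to awaitEvent() says whether it may block
	// indefinitely: we only allow that when nothing is runnable and
	// no blackholes need checking; otherwise we must poll and return.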
906 /* ----------------------------------------------------------------------------
907 * Check for threads woken up by other Capabilities
908 * ------------------------------------------------------------------------- */
911 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
913 #if defined(THREADED_RTS)
914 // Any threads that were woken up by other Capabilities get
915 // appended to our run queue.
916 if (!emptyWakeupQueue(cap)) {
917 ACQUIRE_LOCK(&cap->lock);
918 if (emptyRunQueue(cap)) {
919 cap->run_queue_hd = cap->wakeup_queue_hd;
920 cap->run_queue_tl = cap->wakeup_queue_tl;
922 cap->run_queue_tl->link = cap->wakeup_queue_hd;
923 cap->run_queue_tl = cap->wakeup_queue_tl;
925 cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
926 RELEASE_LOCK(&cap->lock);
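	// NB. the splice above is O(1): only the head/tail pointers are
	// touched, the queue itself is never walked. cap->lock is needed
	// because other Capabilities write to our wakeup queue.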
931 /* ----------------------------------------------------------------------------
932 * Check for threads blocked on BLACKHOLEs that can be woken up
933 * ------------------------------------------------------------------------- */
935 scheduleCheckBlackHoles (Capability *cap)
937 if ( blackholes_need_checking ) // check without the lock first
939 ACQUIRE_LOCK(&sched_mutex);
940 if ( blackholes_need_checking ) {
941 checkBlackHoles(cap);
942 blackholes_need_checking = rtsFalse;
944 RELEASE_LOCK(&sched_mutex);
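    // The double test of blackholes_need_checking is the usual
    // double-checked flag idiom: the unlocked read avoids taking
    // sched_mutex in the common case, and the re-test under the lock
    // ensures only one Capability walks the queue. Missing a concurrent
    // update is harmless (see the LOCK note on the flag above).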
948 /* ----------------------------------------------------------------------------
949 * Detect deadlock conditions and attempt to resolve them.
950 * ------------------------------------------------------------------------- */
953 scheduleDetectDeadlock (Capability *cap, Task *task)
956 #if defined(PARALLEL_HASKELL)
957 // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
962 * Detect deadlock: when we have no threads to run, there are no
963 * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
967 if ( emptyThreadQueues(cap) )
969 #if defined(THREADED_RTS)
971 * In the threaded RTS, we only check for deadlock if there
972 * has been no activity in a complete timeslice. This means
973 * we won't eagerly start a full GC just because we don't have
974 * any threads to run currently.
976 if (recent_activity != ACTIVITY_INACTIVE) return;
979 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
981 // Garbage collection can release some new threads due to
982 // either (a) finalizers or (b) threads resurrected because
983 // they are unreachable and will therefore be sent an
984 // exception. Any threads thus released will be immediately
986 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/, GetRoots);
988 recent_activity = ACTIVITY_DONE_GC;
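	// recent_activity protocol (a sketch; the timer side lives in
	// the timer code): running threads set ACTIVITY_YES, the timer
	// decays it towards ACTIVITY_INACTIVE after an idle timeslice,
	// and we park it at ACTIVITY_DONE_GC here so the forced GC is
	// not repeated on every subsequent iteration.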
990 if ( !emptyRunQueue(cap) ) return;
992 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
993 /* If we have user-installed signal handlers, then wait
	 * for signals to arrive rather than bombing out with a
	 * deadlock.
	 */
997 if ( anyUserHandlers() ) {
998 debugTrace(DEBUG_sched,
999 "still deadlocked, waiting for signals...");
1003 if (signals_pending()) {
1004 startSignalHandlers(cap);
1007 // either we have threads to run, or we were interrupted:
1008 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1012 #if !defined(THREADED_RTS)
1013 /* Probably a real deadlock. Send the current main thread the
1014 * Deadlock exception.
1017 switch (task->tso->why_blocked) {
1019 case BlockedOnBlackHole:
1020 case BlockedOnException:
1022 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
1025 barf("deadlock: main thread blocked in a strange way");
1033 /* ----------------------------------------------------------------------------
1034 * Process an event (GRAN only)
1035 * ------------------------------------------------------------------------- */
1039 scheduleProcessEvent(rtsEvent *event)
1043 if (RtsFlags.GranFlags.Light)
1044 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1046 /* adjust time based on time-stamp */
1047 if (event->time > CurrentTime[CurrentProc] &&
1048 event->evttype != ContinueThread)
1049 CurrentTime[CurrentProc] = event->time;
1051 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1052 if (!RtsFlags.GranFlags.Light)
1055 IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1057 /* main event dispatcher in GranSim */
1058 switch (event->evttype) {
1059 /* Should just be continuing execution */
1060 case ContinueThread:
1061 IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1062 /* ToDo: check assertion
1063 ASSERT(run_queue_hd != (StgTSO*)NULL &&
1064 run_queue_hd != END_TSO_QUEUE);
1066 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1067 if (!RtsFlags.GranFlags.DoAsyncFetch &&
1068 procStatus[CurrentProc]==Fetching) {
1069 debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1070 CurrentTSO->id, CurrentTSO, CurrentProc);
1073 /* Ignore ContinueThreads for completed threads */
1074 if (CurrentTSO->what_next == ThreadComplete) {
1075 debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
1076 CurrentTSO->id, CurrentTSO, CurrentProc);
1079 /* Ignore ContinueThreads for threads that are being migrated */
1080 if (PROCS(CurrentTSO)==Nowhere) {
1081 debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1082 CurrentTSO->id, CurrentTSO, CurrentProc);
1085 /* The thread should be at the beginning of the run queue */
1086 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
1087 debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1088 CurrentTSO->id, CurrentTSO, CurrentProc);
1089 break; // run the thread anyway
1092 new_event(proc, proc, CurrentTime[proc],
1094 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1096 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1097 break; // now actually run the thread; DaH Qu'vam yImuHbej
1100 do_the_fetchnode(event);
1101 goto next_thread; /* handle next event in event queue */
1104 do_the_globalblock(event);
1105 goto next_thread; /* handle next event in event queue */
1108 do_the_fetchreply(event);
1109 goto next_thread; /* handle next event in event queue */
    case UnblockThread: /* Move from the blocked queue to the tail of the runnable queue */
1112 do_the_unblock(event);
1113 goto next_thread; /* handle next event in event queue */
1115 case ResumeThread: /* Move from the blocked queue to the tail of */
1116 /* the runnable queue ( i.e. Qu' SImqa'lu') */
1117 event->tso->gran.blocktime +=
1118 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1119 do_the_startthread(event);
1120 goto next_thread; /* handle next event in event queue */
1123 do_the_startthread(event);
1124 goto next_thread; /* handle next event in event queue */
1127 do_the_movethread(event);
1128 goto next_thread; /* handle next event in event queue */
1131 do_the_movespark(event);
1132 goto next_thread; /* handle next event in event queue */
1135 do_the_findwork(event);
1136 goto next_thread; /* handle next event in event queue */
1139 barf("Illegal event type %u\n", event->evttype);
1142 /* This point was scheduler_loop in the old RTS */
1144 IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1146 TimeOfLastEvent = CurrentTime[CurrentProc];
1147 TimeOfNextEvent = get_time_of_next_event();
1148 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1149 // CurrentTSO = ThreadQueueHd;
1151 IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
1154 if (RtsFlags.GranFlags.Light)
1155 GranSimLight_leave_system(event, &ActiveTSO);
1157 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1160 debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1162 /* in a GranSim setup the TSO stays on the run queue */
1164 /* Take a thread from the run queue. */
1165 POP_RUN_QUEUE(t); // take_off_run_queue(t);
1168 debugBelch("GRAN: About to run current thread, which is\n");
1171 context_switch = 0; // turned on via GranYield, checking events and time slice
1174 DumpGranEvent(GR_SCHEDULE, t));
1176 procStatus[CurrentProc] = Busy;
1180 /* ----------------------------------------------------------------------------
1181 * Send pending messages (PARALLEL_HASKELL only)
1182 * ------------------------------------------------------------------------- */
1184 #if defined(PARALLEL_HASKELL)
1186 scheduleSendPendingMessages(void)
1192 # if defined(PAR) // global Mem.Mgmt., omit for now
1193 if (PendingFetches != END_BF_QUEUE) {
1198 if (RtsFlags.ParFlags.BufferTime) {
1199 // if we use message buffering, we must send away all message
1200 // packets which have become too old...
1206 /* ----------------------------------------------------------------------------
1207 * Activate spark threads (PARALLEL_HASKELL only)
1208 * ------------------------------------------------------------------------- */
1210 #if defined(PARALLEL_HASKELL)
1212 scheduleActivateSpark(void)
1215 ASSERT(emptyRunQueue());
1216 /* We get here if the run queue is empty and want some work.
1217 We try to turn a spark into a thread, and add it to the run queue,
     from where it will be picked up in the next iteration of the scheduler
     loop. */
1222 /* :-[ no local threads => look out for local sparks */
1223 /* the spark pool for the current PE */
1224 pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1225 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1226 pool->hd < pool->tl) {
1228 * ToDo: add GC code check that we really have enough heap afterwards!!
1230 * If we're here (no runnable threads) and we have pending
1231 * sparks, we must have a space problem. Get enough space
	 * to turn one of those pending sparks into a thread.
1236 spark = findSpark(rtsFalse); /* get a spark */
1237 if (spark != (rtsSpark) NULL) {
1238 tso = createThreadFromSpark(spark); /* turn the spark into a thread */
1239 IF_PAR_DEBUG(fish, // schedule,
1240 debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1241 tso->id, tso, advisory_thread_count));
1243 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1244 IF_PAR_DEBUG(fish, // schedule,
1245 debugBelch("==^^ failed to create thread from spark @ %lx\n",
1247 return rtsFalse; /* failed to generate a thread */
1248 } /* otherwise fall through & pick-up new tso */
1250 IF_PAR_DEBUG(fish, // schedule,
1251 debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
1252 spark_queue_len(pool)));
1253 return rtsFalse; /* failed to generate a thread */
1255 return rtsTrue; /* success in generating a thread */
1256 } else { /* no more threads permitted or pool empty */
1257 return rtsFalse; /* failed to generateThread */
1260 tso = NULL; // avoid compiler warning only
1261 return rtsFalse; /* dummy in non-PAR setup */
1264 #endif // PARALLEL_HASKELL
1266 /* ----------------------------------------------------------------------------
1267 * Get work from a remote node (PARALLEL_HASKELL only)
1268 * ------------------------------------------------------------------------- */
1270 #if defined(PARALLEL_HASKELL)
1272 scheduleGetRemoteWork(rtsBool *receivedFinish)
1274 ASSERT(emptyRunQueue());
1276 if (RtsFlags.ParFlags.BufferTime) {
1277 IF_PAR_DEBUG(verbose,
1278 debugBelch("...send all pending data,"));
1281 for (i=1; i<=nPEs; i++)
1282 sendImmediately(i); // send all messages away immediately
1286 //++EDEN++ idle() , i.e. send all buffers, wait for work
1287 // suppress fishing in EDEN... just look for incoming messages
1288 // (blocking receive)
1289 IF_PAR_DEBUG(verbose,
1290 debugBelch("...wait for incoming messages...\n"));
1291 *receivedFinish = processMessages(); // blocking receive...
1293 // and reenter scheduling loop after having received something
1294 // (return rtsFalse below)
1296 # else /* activate SPARKS machinery */
1297 /* We get here, if we have no work, tried to activate a local spark, but still
1298 have no work. We try to get a remote spark, by sending a FISH message.
1299 Thread migration should be added here, and triggered when a sequence of
1300 fishes returns without work. */
1301 delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1303 /* =8-[ no local sparks => look for work on other PEs */
1305 * We really have absolutely no work. Send out a fish
1306 * (there may be some out there already), and wait for
1307 * something to arrive. We clearly can't run any threads
1308 * until a SCHEDULE or RESUME arrives, and so that's what
1309 * we're hoping to see. (Of course, we still have to
1310 * respond to other types of messages.)
1312 rtsTime now = msTime() /*CURRENT_TIME*/;
1313 IF_PAR_DEBUG(verbose,
1314 debugBelch("-- now=%ld\n", now));
1315 IF_PAR_DEBUG(fish, // verbose,
1316 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1317 (last_fish_arrived_at!=0 &&
1318 last_fish_arrived_at+delay > now)) {
1319 debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1320 now, last_fish_arrived_at+delay,
1321 last_fish_arrived_at,
1325 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1326 advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1327 if (last_fish_arrived_at==0 ||
1328 (last_fish_arrived_at+delay <= now)) { // send FISH now!
1329 /* outstandingFishes is set in sendFish, processFish;
1330 avoid flooding system with fishes via delay */
1331 next_fish_to_send_at = 0;
1333 /* ToDo: this should be done in the main scheduling loop to avoid the
1334 busy wait here; not so bad if fish delay is very small */
1335 int iq = 0; // DEBUGGING -- HWL
1336 next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
1337 /* send a fish when ready, but process messages that arrive in the meantime */
1339 if (PacketsWaiting()) {
1341 *receivedFinish = processMessages();
1344 } while (!*receivedFinish || now<next_fish_to_send_at);
1345 // JB: This means the fish could become obsolete, if we receive
1346 // work. Better check for work again?
1347 // last line: while (!receivedFinish || !haveWork || now<...)
1348 // next line: if (receivedFinish || haveWork )
1350 if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1351 return rtsFalse; // NB: this will leave scheduler loop
1352 // immediately after return!
1354 IF_PAR_DEBUG(fish, // verbose,
1355 debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1359 // JB: IMHO, this should all be hidden inside sendFish(...)
1361 sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
1364 // Global statistics: count no. of fishes
1365 if (RtsFlags.ParFlags.ParStats.Global &&
1366 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1367 globalParStats.tot_fish_mess++;
1371 /* delayed fishes must have been sent by now! */
1372 next_fish_to_send_at = 0;
1375 *receivedFinish = processMessages();
1376 # endif /* SPARKS */
1379 /* NB: this function always returns rtsFalse, meaning the scheduler
     loop continues with the next iteration;
     rationale:
       return code means success in finding work; we enter this function
1383 if there is no local work, thus have to send a fish which takes
1384 time until it arrives with work; in the meantime we should process
1385 messages in the main loop;
1388 #endif // PARALLEL_HASKELL
1390 /* ----------------------------------------------------------------------------
1391 * PAR/GRAN: Report stats & debugging info(?)
1392 * ------------------------------------------------------------------------- */
1394 #if defined(PAR) || defined(GRAN)
1396 scheduleGranParReport(void)
1398 ASSERT(run_queue_hd != END_TSO_QUEUE);
1400 /* Take a thread from the run queue, if we have work */
1401 POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
1403 /* If this TSO has got its outport closed in the meantime,
1404 * it mustn't be run. Instead, we have to clean it up as if it was finished.
1405 * It has to be marked as TH_DEAD for this purpose.
1406 * If it is TH_TERM instead, it is supposed to have finished in the normal way.
    JB: TODO: investigate whether the state change field could be nuked
1409 entirely and replaced by the normal tso state (whatnext
1410 field). All we want to do is to kill tsos from outside.
1413 /* ToDo: write something to the log-file
1414 if (RTSflags.ParFlags.granSimStats && !sameThread)
1415 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1419 /* the spark pool for the current PE */
1420 pool = &(cap.r.rSparks); // cap = (old) MainCap
1423 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1424 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1427 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1428 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1430 if (RtsFlags.ParFlags.ParStats.Full &&
1431 (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1432 (emitSchedule || // forced emit
1433 (t && LastTSO && t->id != LastTSO->id))) {
1435 we are running a different TSO, so write a schedule event to log file
1436 NB: If we use fair scheduling we also have to write a deschedule
1437 event for LastTSO; with unfair scheduling we know that the
1438 previous tso has blocked whenever we switch to another tso, so
1439 we don't need it in GUM for now
1441 IF_PAR_DEBUG(fish, // schedule,
1442 debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1444 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1445 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1446 emitSchedule = rtsFalse;
1451 /* ----------------------------------------------------------------------------
1452 * After running a thread...
1453 * ------------------------------------------------------------------------- */
1456 schedulePostRunThread(void)
1459 /* HACK 675: if the last thread didn't yield, make sure to print a
1460 SCHEDULE event to the log file when StgRunning the next thread, even
1461 if it is the same one as before */
1463 TimeOfLastYield = CURRENT_TIME;
1466 /* some statistics gathering in the parallel case */
1468 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1472 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1473 globalGranStats.tot_heapover++;
1475 globalParStats.tot_heapover++;
1482 DumpGranEvent(GR_DESCHEDULE, t));
1483 globalGranStats.tot_stackover++;
1486 // DumpGranEvent(GR_DESCHEDULE, t);
1487 globalParStats.tot_stackover++;
1491 case ThreadYielding:
1494 DumpGranEvent(GR_DESCHEDULE, t));
1495 globalGranStats.tot_yields++;
1498 // DumpGranEvent(GR_DESCHEDULE, t);
1499 globalParStats.tot_yields++;
1505 debugTrace(DEBUG_sched,
1506 "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1507 t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
1508 (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1509 if (t->block_info.closure!=(StgClosure*)NULL)
1510 print_bq(t->block_info.closure);
1513 // ??? needed; should emit block before
1515 DumpGranEvent(GR_DESCHEDULE, t));
1516 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1519 ASSERT(procStatus[CurrentProc]==Busy ||
1520 ((procStatus[CurrentProc]==Fetching) &&
1521 (t->block_info.closure!=(StgClosure*)NULL)));
1522 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1523 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1524 procStatus[CurrentProc]==Fetching))
1525 procStatus[CurrentProc] = Idle;
1528 //++PAR++ blockThread() writes the event (change?)
1532 case ThreadFinished:
1536 barf("parGlobalStats: unknown return code");
1542 /* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1544 * -------------------------------------------------------------------------- */
1547 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1549 // did the task ask for a large block?
1550 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1551 // if so, get one and push it on the front of the nursery.
1555 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
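	// Worked example, assuming the usual 4Kbyte BLOCK_SIZE: a
	// request of rHpAlloc = 10000 bytes rounds up to 12288, giving
	// blocks = 3 contiguous blocks from the block allocator.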
1557 debugTrace(DEBUG_sched,
1558 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1559 (long)t->id, whatNext_strs[t->what_next], blocks);
1561 // don't do this if the nursery is (nearly) full, we'll GC first.
1562 if (cap->r.rCurrentNursery->link != NULL ||
1563 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1564 // if the nursery has only one block.
1567 bd = allocGroup( blocks );
1569 cap->r.rNursery->n_blocks += blocks;
1571 // link the new group into the list
1572 bd->link = cap->r.rCurrentNursery;
1573 bd->u.back = cap->r.rCurrentNursery->u.back;
1574 if (cap->r.rCurrentNursery->u.back != NULL) {
1575 cap->r.rCurrentNursery->u.back->link = bd;
1577 #if !defined(THREADED_RTS)
1578 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1579 g0s0 == cap->r.rNursery);
1581 cap->r.rNursery->blocks = bd;
1583 cap->r.rCurrentNursery->u.back = bd;
1585 // initialise it as a nursery block. We initialise the
1586 // step, gen_no, and flags field of *every* sub-block in
1587 // this large block, because this is easier than making
1588 // sure that we always find the block head of a large
1589 // block whenever we call Bdescr() (eg. evacuate() and
1590 // isAlive() in the GC would both have to do this, at
1594 for (x = bd; x < bd + blocks; x++) {
1595 x->step = cap->r.rNursery;
1601 // This assert can be a killer if the app is doing lots
1602 // of large block allocations.
1603 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1605 // now update the nursery to point to the new block
1606 cap->r.rCurrentNursery = bd;
1608 // we might be unlucky and have another thread get on the
1609 // run queue before us and steal the large block, but in that
	    // case the thread will just end up requesting another large
	    // block.
1612 pushOnRunQueue(cap,t);
1613 return rtsFalse; /* not actually GC'ing */
1617 debugTrace(DEBUG_sched,
1618 "--<< thread %ld (%s) stopped: HeapOverflow\n",
1619 (long)t->id, whatNext_strs[t->what_next]);
1622 ASSERT(!is_on_queue(t,CurrentProc));
1623 #elif defined(PARALLEL_HASKELL)
1624 /* Currently we emit a DESCHEDULE event before GC in GUM.
1625 ToDo: either add separate event to distinguish SYSTEM time from rest
1626 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1627 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1628 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1629 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1630 emitSchedule = rtsTrue;
1634 pushOnRunQueue(cap,t);
1636 /* actual GC is done at the end of the while loop in schedule() */
1639 /* -----------------------------------------------------------------------------
1640 * Handle a thread that returned to the scheduler with ThreadStackOverflow
1641 * -------------------------------------------------------------------------- */
1644 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1646 debugTrace (DEBUG_sched,
1647 "--<< thread %ld (%s) stopped, StackOverflow\n",
1648 (long)t->id, whatNext_strs[t->what_next]);
    /* just adjust the stack for this thread, then pop it back
     * on the run queue.
     */
1654 /* enlarge the stack */
1655 StgTSO *new_t = threadStackOverflow(cap, t);
1657 /* The TSO attached to this Task may have moved, so update the
1660 if (task->tso == t) {
1663 pushOnRunQueue(cap,new_t);
1667 /* -----------------------------------------------------------------------------
1668 * Handle a thread that returned to the scheduler with ThreadYielding
1669 * -------------------------------------------------------------------------- */
1672 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1674 // Reset the context switch flag. We don't do this just before
1675 // running the thread, because that would mean we would lose ticks
1676 // during GC, which can lead to unfair scheduling (a thread hogs
1677 // the CPU because the tick always arrives during GC). This way
1678 // penalises threads that do a lot of allocation, but that seems
1679 // better than the alternative.
1682 /* put the thread back on the run queue. Then, if we're ready to
1683 * GC, check whether this is the last task to stop. If so, wake
     * up the GC thread. getThread will block during a GC until the
     * GC is finished.
     */
1688 if (t->what_next != prev_what_next) {
1689 debugTrace(DEBUG_sched,
1690 "--<< thread %ld (%s) stopped to switch evaluators\n",
1691 (long)t->id, whatNext_strs[t->what_next]);
1693 debugTrace(DEBUG_sched,
1694 "--<< thread %ld (%s) stopped, yielding\n",
1695 (long)t->id, whatNext_strs[t->what_next]);
1700 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1702 ASSERT(t->link == END_TSO_QUEUE);
1704 // Shortcut if we're just switching evaluators: don't bother
1705 // doing stack squeezing (which can be expensive), just run the
1707 if (t->what_next != prev_what_next) {
1712 ASSERT(!is_on_queue(t,CurrentProc));
1715 //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1716 checkThreadQsSanity(rtsTrue));
1720 addToRunQueue(cap,t);
1723 /* add a ContinueThread event to actually process the thread */
1724 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1726 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1728 debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1735 /* -----------------------------------------------------------------------------
1736 * Handle a thread that returned to the scheduler with ThreadBlocked
1737 * -------------------------------------------------------------------------- */
1740 scheduleHandleThreadBlocked( StgTSO *t
1741 #if !defined(GRAN) && !defined(DEBUG)
1748 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
1749 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1750 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1752 // ??? needed; should emit block before
1754 DumpGranEvent(GR_DESCHEDULE, t));
1755 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1758 ASSERT(procStatus[CurrentProc]==Busy ||
1759 ((procStatus[CurrentProc]==Fetching) &&
1760 (t->block_info.closure!=(StgClosure*)NULL)));
1761 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1762 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1763 procStatus[CurrentProc]==Fetching))
1764 procStatus[CurrentProc] = Idle;
1768 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
1769 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1772 if (t->block_info.closure!=(StgClosure*)NULL)
1773 print_bq(t->block_info.closure));
1775 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1778 /* whatever we schedule next, we must log that schedule */
1779 emitSchedule = rtsTrue;
1783 // We don't need to do anything. The thread is blocked, and it
1784 // has tidied up its stack and placed itself on whatever queue
1785 // it needs to be on.
1787 #if !defined(THREADED_RTS)
1788 ASSERT(t->why_blocked != NotBlocked);
1789 // This might not be true under THREADED_RTS: we don't have
1790 // exclusive access to this TSO, so someone might have
1791 // woken it up by now. This actually happens: try
1792 // conc023 +RTS -N2.
1796 if (traceClass(DEBUG_sched)) {
1797 debugTraceBegin("--<< thread %d (%s) stopped: ",
1798 t->id, whatNext_strs[t->what_next]);
1799 printThreadBlockage(t);
1804 /* Only for dumping event to log file
1805 ToDo: do I need this in GranSim, too?
1811 /* -----------------------------------------------------------------------------
1812 * Handle a thread that returned to the scheduler with ThreadFinished
1813 * -------------------------------------------------------------------------- */
1816 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1818 /* Need to check whether this was a main thread, and if so,
1819 * return with the return value.
1821 * We also end up here if the thread kills itself with an
1822 * uncaught exception, see Exception.cmm.
1824 debugTrace(DEBUG_sched, "--++ thread %d (%s) finished",
1825 t->id, whatNext_strs[t->what_next]);
1828 endThread(t, CurrentProc); // clean-up the thread
1829 #elif defined(PARALLEL_HASKELL)
1830 /* For now all are advisory -- HWL */
1831 //if(t->priority==AdvisoryPriority) ??
1832 advisory_thread_count--; // JB: Caution with this counter, buggy!
1835 if(t->dist.priority==RevalPriority)
1839 # if defined(EDENOLD)
1840 // the thread could still have an outport... (BUG)
1841 if (t->eden.outport != -1) {
1842 // delete the outport for the tso which has finished...
1843 IF_PAR_DEBUG(eden_ports,
1844 debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1845 t->eden.outport, t->id));
1848 // thread still in the process (HEAVY BUG! since outport has just been closed...)
1849 if (t->eden.epid != -1) {
1850 IF_PAR_DEBUG(eden_ports,
1851 debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1852 t->id, t->eden.epid));
1853 removeTSOfromProcess(t);
1858 if (RtsFlags.ParFlags.ParStats.Full &&
1859 !RtsFlags.ParFlags.ParStats.Suppressed)
1860 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1862 // t->par only contains statistics: left out for now...
1864 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1865 t->id,t,t->par.sparkname));
1867 #endif // PARALLEL_HASKELL
1870 // Check whether the thread that just completed was a bound
1871 // thread, and if so return with the result.
1873 // There is an assumption here that all thread completion goes
1874 // through this point; we need to make sure that if a thread
1875 // ends up in the ThreadKilled state, that it stays on the run
1876 // queue so it can be dealt with here.
1881 if (t->bound != task) {
1882 #if !defined(THREADED_RTS)
1883 // Must be a bound thread that is not the topmost one. Leave
1884 // it on the run queue until the stack has unwound to the
1885 // point where we can deal with this. Leaving it on the run
1886 // queue also ensures that the garbage collector knows about
1887 // this thread and its return value (it gets dropped from the
1888 // all_threads list so there's no other way to find it).
1889 appendToRunQueue(cap,t);
1892 // this cannot happen in the threaded RTS, because a
1893 // bound thread can only be run by the appropriate Task.
1894 barf("finished bound thread that isn't mine");
1898 ASSERT(task->tso == t);
1900 if (t->what_next == ThreadComplete) {
1902 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1903 *(task->ret) = (StgClosure *)task->tso->sp[1];
1905 task->stat = Success;
1908 *(task->ret) = NULL;
1910 if (sched_state >= SCHED_INTERRUPTING) {
1911 task->stat = Interrupted;
1913 task->stat = Killed;
1917 removeThreadLabel((StgWord)task->tso->id);
1919 return rtsTrue; // tells schedule() to return
1925 /* -----------------------------------------------------------------------------
1926 * Perform a heap census, if PROFILING
1927 * -------------------------------------------------------------------------- */
1930 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1932 #if defined(PROFILING)
1933 // When we have +RTS -i0 and we're heap profiling, do a census at
1934 // every GC. This lets us get repeatable runs for debugging.
1935 if (performHeapProfile ||
1936 (RtsFlags.ProfFlags.profileInterval==0 &&
1937 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1939 // checking black holes is necessary before GC, otherwise
1940 // there may be threads that are unreachable except by the
	// blackhole queue, which the GC will consider to be
	// deadlocked.
1943 scheduleCheckBlackHoles(&MainCapability);
1945 debugTrace(DEBUG_sched, "garbage collecting before heap census");
1946 GarbageCollect(GetRoots, rtsTrue);
1948 debugTrace(DEBUG_sched, "performing heap census");
1951 performHeapProfile = rtsFalse;
1952 return rtsTrue; // true <=> we already GC'd
1958 /* -----------------------------------------------------------------------------
1959 * Perform a garbage collection if necessary
1960 * -------------------------------------------------------------------------- */
1963 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
1964 rtsBool force_major, void (*get_roots)(evac_fn))
1968 static volatile StgWord waiting_for_gc;
1969 rtsBool was_waiting;
1974 // In order to GC, there must be no threads running Haskell code.
1975 // Therefore, the GC thread needs to hold *all* the capabilities,
1976 // and release them after the GC has completed.
1978 // This seems to be the simplest way: previous attempts involved
1979 // making all the threads with capabilities give up their
1980 // capabilities and sleep except for the *last* one, which
1981 // actually did the GC. But it's quite hard to arrange for all
1982 // the other tasks to sleep and stay asleep.
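    //
    // Protocol sketch: cas() returns the old value, so exactly one
    // task flips waiting_for_gc from 0 to 1 and becomes the GC leader;
    // every other task yields its Capability below until the leader
    // clears the flag again after grabbing all the Capabilities.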
    was_waiting = cas(&waiting_for_gc, 0, 1);
    if (was_waiting) {
	do {
	    debugTrace(DEBUG_sched, "someone else is trying to GC...");
	    if (cap) yieldCapability(&cap,task);
	} while (waiting_for_gc);
	return cap;  // NOTE: task->cap might have changed here
    }
1994 for (i=0; i < n_capabilities; i++) {
	debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
1996 if (cap != &capabilities[i]) {
1997 Capability *pcap = &capabilities[i];
1998 // we better hope this task doesn't get migrated to
1999 // another Capability while we're waiting for this one.
2000 // It won't, because load balancing happens while we have
2001 // all the Capabilities, but even so it's a slightly
2002 // unsavoury invariant.
2005 waitForReturnCapability(&pcap, task);
2006 if (pcap != &capabilities[i]) {
2007 barf("scheduleDoGC: got the wrong capability");
2012 waiting_for_gc = rtsFalse;
2015 /* Kick any transactions which are invalid back to their
2016 * atomically frames. When next scheduled they will try to
2017 * commit; this commit will fail and they will retry.
2022 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2023 if (t->what_next == ThreadRelocated) {
2026 next = t->global_link;
2027 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2028 if (!stmValidateNestOfTransactions (t -> trec)) {
2029 debugTrace(DEBUG_sched | DEBUG_stm,
2030 "trec %p found wasting its time", t);
2032 // strip the stack back to the
2033 // ATOMICALLY_FRAME, aborting the (nested)
2034 // transaction, and saving the stack of any
2035 // partially-evaluated thunks on the heap.
2036 raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
2039 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2047 // so this happens periodically:
2048 if (cap) scheduleCheckBlackHoles(cap);
2050 IF_DEBUG(scheduler, printAllThreads());
2053 * We now have all the capabilities; if we're in an interrupting
2054 * state, then we should take the opportunity to delete all the
2055 * threads in the system.
2057 if (sched_state >= SCHED_INTERRUPTING) {
2058 deleteAllThreads(&capabilities[0]);
2059 sched_state = SCHED_SHUTTING_DOWN;
2062 /* everybody back, start the GC.
2063 * Could do it in this thread, or signal a condition var
2064 * to do it in another thread. Either way, we need to
2065 * broadcast on gc_pending_cond afterward.
2067 #if defined(THREADED_RTS)
2068 debugTrace(DEBUG_sched, "doing GC");
2070 GarbageCollect(get_roots, force_major);
2072 #if defined(THREADED_RTS)
2073 // release our stash of capabilities.
2074 for (i = 0; i < n_capabilities; i++) {
2075 if (cap != &capabilities[i]) {
2076 task->cap = &capabilities[i];
2077 releaseCapability(&capabilities[i]);
2088 /* add a ContinueThread event to continue execution of current thread */
2089 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2091 t, (StgClosure*)NULL, (rtsSpark*)NULL);
2093 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
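/* The leader-election idiom used by scheduleDoGC() above, in miniature.
 * cas(p,o,n) atomically stores n into *p if *p == o and returns the
 * value *p held beforehand, so exactly one task sees 0 and becomes the
 * GC leader; every other task yields its Capability until the flag is
 * cleared again.  Sketch only (hypothetical flag name), never compiled: */
#if 0
static volatile StgWord gc_leader_flag = 0;

static rtsBool
tryBecomeGCLeader (void)
{
    // rtsTrue for at most one of any set of concurrent callers
    return cas(&gc_leader_flag, 0, 1) == 0;
}
#endif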
2101 /* ---------------------------------------------------------------------------
2102 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2103 * used by Control.Concurrent for error checking.
2104 * ------------------------------------------------------------------------- */
2107 rtsSupportsBoundThreads(void)
2109 #if defined(THREADED_RTS)
2116 /* ---------------------------------------------------------------------------
2117 * isThreadBound(tso): check whether tso is bound to an OS thread.
2118 * ------------------------------------------------------------------------- */
2121 isThreadBound(StgTSO* tso USED_IF_THREADS)
2123 #if defined(THREADED_RTS)
2124 return (tso->bound != NULL);
2129 /* ---------------------------------------------------------------------------
2130 * Singleton fork(). Do not copy any running threads.
2131 * ------------------------------------------------------------------------- */
2133 #if !defined(mingw32_HOST_OS)
2134 #define FORKPROCESS_PRIMOP_SUPPORTED
2137 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2139 deleteThread_(Capability *cap, StgTSO *tso);
2142 forkProcess(HsStablePtr *entry
2143 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2148 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2154 #if defined(THREADED_RTS)
2155 if (RtsFlags.ParFlags.nNodes > 1) {
2156 errorBelch("forking not supported with +RTS -N<n> greater than 1");
2157 stg_exit(EXIT_FAILURE);
2161 debugTrace(DEBUG_sched, "forking!");
2163 // ToDo: for SMP, we should probably acquire *all* the capabilities
2168 if (pid) { // parent
2170 // just return the pid
2176 // Now, all OS threads except the thread that forked are
2177 // stopped. We need to stop all Haskell threads, including
2178 // those involved in foreign calls. Also we need to delete
2179 // all Tasks, because they correspond to OS threads that are now gone.
2182 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2183 if (t->what_next == ThreadRelocated) {
2186 next = t->global_link;
2187 // don't allow threads to catch the ThreadKilled
2188 // exception, but we do want to raiseAsync() because these
2189 // threads may be evaluating thunks that we need later.
2190 deleteThread_(cap,t);
2194 // Empty the run queue. It seems tempting to let all the
2195 // killed threads stay on the run queue as zombies to be
2196 // cleaned up later, but some of them correspond to bound
2197 // threads for which the corresponding Task does not exist.
2198 cap->run_queue_hd = END_TSO_QUEUE;
2199 cap->run_queue_tl = END_TSO_QUEUE;
2201 // Any suspended C-calling Tasks are no more, their OS threads don't exist now.
2203 cap->suspended_ccalling_tasks = NULL;
2205 // Empty the all_threads list. Otherwise, the garbage
2206 // collector may attempt to resurrect some of these threads.
2207 all_threads = END_TSO_QUEUE;
2209 // Wipe the task list, except the current Task.
2210 ACQUIRE_LOCK(&sched_mutex);
2211 for (task = all_tasks; task != NULL; task=task->all_link) {
2212 if (task != cap->running_task) {
2216 RELEASE_LOCK(&sched_mutex);
2218 #if defined(THREADED_RTS)
2219 // Wipe our spare workers list, they no longer exist. New
2220 // workers will be created if necessary.
2221 cap->spare_workers = NULL;
2222 cap->returning_tasks_hd = NULL;
2223 cap->returning_tasks_tl = NULL;
2226 cap = rts_evalStableIO(cap, entry, NULL); // run the action
2227 rts_checkSchedStatus("forkProcess",cap);
2230 hs_exit(); // clean up and exit
2231 stg_exit(EXIT_SUCCESS);
2233 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2234 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2239 /* ---------------------------------------------------------------------------
2240 * Delete all the threads in the system
2241 * ------------------------------------------------------------------------- */
2244 deleteAllThreads ( Capability *cap )
2247 debugTrace(DEBUG_sched,"deleting all threads");
2248 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2249 if (t->what_next == ThreadRelocated) {
2252 next = t->global_link;
2253 deleteThread(cap,t);
2257 // The run queue now contains a bunch of ThreadKilled threads. We
2258 // must not throw these away: the main thread(s) will be in there
2259 // somewhere, and the main scheduler loop has to deal with it.
2260 // Also, the run queue is the only thing keeping these threads from
2261 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2263 #if !defined(THREADED_RTS)
2264 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2265 ASSERT(sleeping_queue == END_TSO_QUEUE);
2269 /* -----------------------------------------------------------------------------
2270 Managing the suspended_ccalling_tasks list.
2271 Locks required: sched_mutex
2272 -------------------------------------------------------------------------- */
2275 suspendTask (Capability *cap, Task *task)
2277 ASSERT(task->next == NULL && task->prev == NULL);
2278 task->next = cap->suspended_ccalling_tasks;
2280 if (cap->suspended_ccalling_tasks) {
2281 cap->suspended_ccalling_tasks->prev = task;
2283 cap->suspended_ccalling_tasks = task;
2287 recoverSuspendedTask (Capability *cap, Task *task)
2290 task->prev->next = task->next;
2292 ASSERT(cap->suspended_ccalling_tasks == task);
2293 cap->suspended_ccalling_tasks = task->next;
2296 task->next->prev = task->prev;
2298 task->next = task->prev = NULL;
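/* The intrusive doubly-linked-list idiom used by suspendTask() and
 * recoverSuspendedTask() above, in isolation (hypothetical Node type;
 * #if 0, so it is never compiled): */
#if 0
typedef struct Node_ {
    struct Node_ *next;
    struct Node_ *prev;
} Node;

static void
pushFront (Node **head, Node *n)
{
    n->prev = NULL;
    n->next = *head;
    if (*head != NULL) { (*head)->prev = n; }
    *head = n;
}

static void
unlinkNode (Node **head, Node *n)
{
    if (n->prev != NULL) { n->prev->next = n->next; }
    else                 { *head = n->next; }          // n was the head
    if (n->next != NULL) { n->next->prev = n->prev; }
    n->next = n->prev = NULL;
}
#endif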
2301 /* ---------------------------------------------------------------------------
2302 * Suspending & resuming Haskell threads.
2304 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2305 * its capability before calling the C function. This allows another
2306 * task to pick up the capability and carry on running Haskell
2307 * threads. It also means that if the C call blocks, it won't lock the whole system.
2310 * The Haskell thread making the C call is put to sleep for the
2311 duration of the call, on the suspended_ccalling_threads queue. We
2312 * give out a token to the task, which it can use to resume the thread
2313 * on return from the C function.
2314 * ------------------------------------------------------------------------- */
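/* For orientation, the code generated around a "safe" foreign call is
 * essentially the following (a sketch, not the literal generated code;
 * 'foo' stands for the foreign function being called):
 *
 *     token = suspendThread(BaseReg);  // give up the Capability
 *     r = foo(a, b);                   // the C call proper; may block
 *     BaseReg = resumeThread(token);   // re-acquire a Capability
 *
 * The token is just the Task, as the implementations below show.
 */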
2317 suspendThread (StgRegTable *reg)
2320 int saved_errno = errno;
2324 /* assume that *reg is a pointer to the StgRegTable part of a Capability. */
2326 cap = regTableToCapability(reg);
2328 task = cap->running_task;
2329 tso = cap->r.rCurrentTSO;
2331 debugTrace(DEBUG_sched,
2332 "thread %d did a safe foreign call",
2333 cap->r.rCurrentTSO->id);
2335 // XXX this might not be necessary --SDM
2336 tso->what_next = ThreadRunGHC;
2338 threadPaused(cap,tso);
2340 if(tso->blocked_exceptions == NULL) {
2341 tso->why_blocked = BlockedOnCCall;
2342 tso->blocked_exceptions = END_TSO_QUEUE;
2344 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2347 // Hand back capability
2348 task->suspended_tso = tso;
2350 ACQUIRE_LOCK(&cap->lock);
2352 suspendTask(cap,task);
2353 cap->in_haskell = rtsFalse;
2354 releaseCapability_(cap);
2356 RELEASE_LOCK(&cap->lock);
2358 #if defined(THREADED_RTS)
2359 /* Preparing to leave the RTS, so ensure there's a native thread/task
2360 waiting to take over.
2362 debugTrace(DEBUG_sched, "thread %d: leaving RTS", tso->id);
2365 errno = saved_errno;
2370 resumeThread (void *task_)
2374 int saved_errno = errno;
2378 // Wait for permission to re-enter the RTS with the result.
2379 waitForReturnCapability(&cap,task);
2380 // we might be on a different capability now... but if so, our
2381 // entry on the suspended_ccalling_tasks list will also have been migrated.
2384 // Remove the thread from the suspended list
2385 recoverSuspendedTask(cap,task);
2387 tso = task->suspended_tso;
2388 task->suspended_tso = NULL;
2389 tso->link = END_TSO_QUEUE;
2390 debugTrace(DEBUG_sched, "thread %d: re-entering RTS", tso->id);
2392 if (tso->why_blocked == BlockedOnCCall) {
2393 awakenBlockedQueue(cap,tso->blocked_exceptions);
2394 tso->blocked_exceptions = NULL;
2397 /* Reset blocking status */
2398 tso->why_blocked = NotBlocked;
2400 cap->r.rCurrentTSO = tso;
2401 cap->in_haskell = rtsTrue;
2402 errno = saved_errno;
2404 /* We might have GC'd, mark the TSO dirty again */
2407 IF_DEBUG(sanity, checkTSO(tso));
2412 /* ---------------------------------------------------------------------------
2413 * Comparing Thread ids.
2415 * This is used from STG land in the implementation of the
2416 * instances of Eq/Ord for ThreadIds.
2417 * ------------------------------------------------------------------------ */
2420 cmp_thread(StgPtr tso1, StgPtr tso2)
2422 StgThreadID id1 = ((StgTSO *)tso1)->id;
2423 StgThreadID id2 = ((StgTSO *)tso2)->id;
2425 if (id1 < id2) return (-1);
2426 if (id1 > id2) return 1;
2430 /* ---------------------------------------------------------------------------
2431 * Fetching the ThreadID from an StgTSO.
2433 * This is used in the implementation of Show for ThreadIds.
2434 * ------------------------------------------------------------------------ */
2436 rts_getThreadId(StgPtr tso)
2438 return ((StgTSO *)tso)->id;
2443 labelThread(StgPtr tso, char *label)
2448 /* Caveat: Once set, you can only set the thread name to "" */
2449 len = strlen(label)+1;
2450 buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2451 strncpy(buf,label,len);
2452 /* Update will free the old memory for us */
2453 updateThreadLabel(((StgTSO *)tso)->id,buf);
2457 /* ---------------------------------------------------------------------------
2458 Create a new thread.
2460 The new thread starts with the given stack size. Before the
2461 scheduler can run, however, this thread needs to have a closure
2462 (and possibly some arguments) pushed on its stack. See
2463 pushClosure() in Schedule.h.
2465 createGenThread() and createIOThread() (in SchedAPI.h) are
2466 convenient packaged versions of this function.
2468 currently pri (priority) is only used in a GRAN setup -- HWL
2469 ------------------------------------------------------------------------ */
2471 /* currently pri (priority) is only used in a GRAN setup -- HWL */
2473 createThread(nat size, StgInt pri)
2476 createThread(Capability *cap, nat size)
2482 /* sched_mutex is *not* required */
2484 /* First check whether we should create a thread at all */
2485 #if defined(PARALLEL_HASKELL)
2486 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2487 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2489 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2490 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2491 return END_TSO_QUEUE;
2497 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2500 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2502 /* catch ridiculously small stack sizes */
2503 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2504 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2507 stack_size = size - TSO_STRUCT_SIZEW;
2509 tso = (StgTSO *)allocateLocal(cap, size);
2510 TICK_ALLOC_TSO(stack_size, 0);
2512 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2514 SET_GRAN_HDR(tso, ThisPE);
2517 // Always start with the compiled code evaluator
2518 tso->what_next = ThreadRunGHC;
2520 tso->why_blocked = NotBlocked;
2521 tso->blocked_exceptions = NULL;
2522 tso->flags = TSO_DIRTY;
2524 tso->saved_errno = 0;
2528 tso->stack_size = stack_size;
2529 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
2531 tso->sp = (P_)&(tso->stack) + stack_size;
2533 tso->trec = NO_TREC;
2536 tso->prof.CCCS = CCS_MAIN;
2539 /* put a stop frame on the stack */
2540 tso->sp -= sizeofW(StgStopFrame);
2541 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2542 tso->link = END_TSO_QUEUE;
2546 /* uses more flexible routine in GranSim */
2547 insertThread(tso, CurrentProc);
2549 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2555 if (RtsFlags.GranFlags.GranSimStats.Full)
2556 DumpGranEvent(GR_START,tso);
2557 #elif defined(PARALLEL_HASKELL)
2558 if (RtsFlags.ParFlags.ParStats.Full)
2559 DumpGranEvent(GR_STARTQ,tso);
2560 /* HACK to avoid SCHEDULE
2564 /* Link the new thread on the global thread list.
2566 ACQUIRE_LOCK(&sched_mutex);
2567 tso->id = next_thread_id++; // while we have the mutex
2568 tso->global_link = all_threads;
2570 RELEASE_LOCK(&sched_mutex);
2573 tso->dist.priority = MandatoryPriority; //by default that is...
2577 tso->gran.pri = pri;
2579 tso->gran.magic = TSO_MAGIC; // debugging only
2581 tso->gran.sparkname = 0;
2582 tso->gran.startedat = CURRENT_TIME;
2583 tso->gran.exported = 0;
2584 tso->gran.basicblocks = 0;
2585 tso->gran.allocs = 0;
2586 tso->gran.exectime = 0;
2587 tso->gran.fetchtime = 0;
2588 tso->gran.fetchcount = 0;
2589 tso->gran.blocktime = 0;
2590 tso->gran.blockcount = 0;
2591 tso->gran.blockedat = 0;
2592 tso->gran.globalsparks = 0;
2593 tso->gran.localsparks = 0;
2594 if (RtsFlags.GranFlags.Light)
2595 tso->gran.clock = Now; /* local clock */
2597 tso->gran.clock = 0;
2599 IF_DEBUG(gran,printTSO(tso));
2600 #elif defined(PARALLEL_HASKELL)
2602 tso->par.magic = TSO_MAGIC; // debugging only
2604 tso->par.sparkname = 0;
2605 tso->par.startedat = CURRENT_TIME;
2606 tso->par.exported = 0;
2607 tso->par.basicblocks = 0;
2608 tso->par.allocs = 0;
2609 tso->par.exectime = 0;
2610 tso->par.fetchtime = 0;
2611 tso->par.fetchcount = 0;
2612 tso->par.blocktime = 0;
2613 tso->par.blockcount = 0;
2614 tso->par.blockedat = 0;
2615 tso->par.globalsparks = 0;
2616 tso->par.localsparks = 0;
2620 globalGranStats.tot_threads_created++;
2621 globalGranStats.threads_created_on_PE[CurrentProc]++;
2622 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2623 globalGranStats.tot_sq_probes++;
2624 #elif defined(PARALLEL_HASKELL)
2625 // collect parallel global statistics (currently done together with GC stats)
2626 if (RtsFlags.ParFlags.ParStats.Global &&
2627 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2628 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
2629 globalParStats.tot_threads_created++;
2634 debugTrace(GRAN_DEBUG_pri,
2635 "==__ schedule: Created TSO %d (%p);",
2636 CurrentProc, tso, tso->id);
2637 #elif defined(PARALLEL_HASKELL)
2638 debugTrace(PAR_DEBUG_verbose,
2639 "==__ schedule: Created TSO %d (%p); %d threads active",
2640 (long)tso->id, tso, advisory_thread_count);
2642 debugTrace(DEBUG_sched,
2643 "created thread %ld, stack size = %lx words",
2644 (long)tso->id, (long)tso->stack_size);
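/* A sketch of typical use, following the pattern of createIOThread() in
 * SchedAPI.h: make the TSO, push a closure for it to enter, then queue
 * it.  'action' stands for a closure of type IO (); illustrative only,
 * hence #if 0. */
#if 0
static void
startIOThread (Capability *cap, StgClosure *action)
{
    StgTSO *tso = createThread(cap, RtsFlags.GcFlags.initialStkSize);
    pushClosure(tso, (W_)&stg_ap_v_info);   // apply the action to void
    pushClosure(tso, (W_)action);           // the closure to run
    pushClosure(tso, (W_)&stg_enter_info);  // start by entering it
    scheduleThread(cap, tso);               // onto the run queue
}
#endif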
2651 all parallel thread creation calls should fall through the following routine.
2654 createThreadFromSpark(rtsSpark spark)
2656 ASSERT(spark != (rtsSpark)NULL);
2657 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2658 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
2660 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2661 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2662 return END_TSO_QUEUE;
2666 tso = createThread(RtsFlags.GcFlags.initialStkSize);
2667 if (tso==END_TSO_QUEUE)
2668 barf("createSparkThread: Cannot create TSO");
2670 tso->priority = AdvisoryPriority;
2672 pushClosure(tso,spark);
2674 advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
2681 Turn a spark into a thread.
2682 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2686 activateSpark (rtsSpark spark)
2690 tso = createSparkThread(spark);
2691 if (RtsFlags.ParFlags.ParStats.Full) {
2692 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2693 IF_PAR_DEBUG(verbose,
2694 debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2695 (StgClosure *)spark, info_type((StgClosure *)spark)));
2697 // ToDo: fwd info on local/global spark to thread -- HWL
2698 // tso->gran.exported = spark->exported;
2699 // tso->gran.locked = !spark->global;
2700 // tso->gran.sparkname = spark->name;
2706 /* ---------------------------------------------------------------------------
2709 * scheduleThread puts a thread on the end of the runnable queue.
2710 * This will usually be done immediately after a thread is created.
2711 * The caller of scheduleThread must create the thread using e.g.
2712 * createThread and push an appropriate closure
2713 * on this thread's stack before the scheduler is invoked.
2714 * ------------------------------------------------------------------------ */
2717 scheduleThread(Capability *cap, StgTSO *tso)
2719 // The thread goes at the *end* of the run-queue, to avoid possible
2720 // starvation of any threads already on the queue.
2721 appendToRunQueue(cap,tso);
2725 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2727 #if defined(THREADED_RTS)
2728 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2729 // move this thread from now on.
2730 cpu %= RtsFlags.ParFlags.nNodes;
2731 if (cpu == cap->no) {
2732 appendToRunQueue(cap,tso);
2734 Capability *target_cap = &capabilities[cpu];
2736 tso->bound->cap = target_cap;
2738 tso->cap = target_cap;
2739 wakeupThreadOnCapability(target_cap,tso);
2742 appendToRunQueue(cap,tso);
2747 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2751 // We already created/initialised the Task
2752 task = cap->running_task;
2754 // This TSO is now a bound thread; make the Task and TSO
2755 // point to each other.
2761 task->stat = NoStatus;
2763 appendToRunQueue(cap,tso);
2765 debugTrace(DEBUG_sched, "new bound thread (%d)", tso->id);
2768 /* GranSim specific init */
2769 CurrentTSO = m->tso; // the TSO to run
2770 procStatus[MainProc] = Busy; // status of main PE
2771 CurrentProc = MainProc; // PE to run it on
2774 cap = schedule(cap,task);
2776 ASSERT(task->stat != NoStatus);
2777 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2779 debugTrace(DEBUG_sched, "bound thread (%d) finished", task->tso->id);
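/* How the RTS API drives a bound thread to completion: rts_evalIO() in
 * RtsAPI.c boils down to roughly the following (sketch; details and
 * error handling elided, and the exact signatures are as assumed here): */
#if 0
static Capability *
evalIOSketch (Capability *cap, HaskellObj p, /*out*/ HaskellObj *ret)
{
    StgTSO *tso = createStrictIOThread(cap, RtsFlags.GcFlags.initialStkSize, p);
    return scheduleWaitThread(tso, ret, cap);  // blocks until it finishes
}
#endif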
2783 /* ----------------------------------------------------------------------------
2785 * ------------------------------------------------------------------------- */
2787 #if defined(THREADED_RTS)
2789 workerStart(Task *task)
2793 // See startWorkerTask().
2794 ACQUIRE_LOCK(&task->lock);
2796 RELEASE_LOCK(&task->lock);
2798 // set the thread-local pointer to the Task:
2801 // schedule() runs without a lock.
2802 cap = schedule(cap,task);
2804 // On exit from schedule(), we have a Capability.
2805 releaseCapability(cap);
2806 workerTaskStop(task);
2810 /* ---------------------------------------------------------------------------
2813 * Initialise the scheduler. This resets all the queues - if the
2814 * queues contained any threads, they'll be garbage collected at the
2817 * ------------------------------------------------------------------------ */
2824 for (i=0; i < MAX_PROC; i++) {  // the arrays below have MAX_PROC elements
2825 run_queue_hds[i] = END_TSO_QUEUE;
2826 run_queue_tls[i] = END_TSO_QUEUE;
2827 blocked_queue_hds[i] = END_TSO_QUEUE;
2828 blocked_queue_tls[i] = END_TSO_QUEUE;
2829 ccalling_threadss[i] = END_TSO_QUEUE;
2830 blackhole_queue[i] = END_TSO_QUEUE;
2831 sleeping_queue = END_TSO_QUEUE;
2833 #elif !defined(THREADED_RTS)
2834 blocked_queue_hd = END_TSO_QUEUE;
2835 blocked_queue_tl = END_TSO_QUEUE;
2836 sleeping_queue = END_TSO_QUEUE;
2839 blackhole_queue = END_TSO_QUEUE;
2840 all_threads = END_TSO_QUEUE;
2843 sched_state = SCHED_RUNNING;
2845 RtsFlags.ConcFlags.ctxtSwitchTicks =
2846 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2848 #if defined(THREADED_RTS)
2849 /* Initialise the mutex and condition variables used by
2851 initMutex(&sched_mutex);
2854 ACQUIRE_LOCK(&sched_mutex);
2856 /* A capability holds the state a native thread needs in
2857 * order to execute STG code. At least one capability is
2858 * floating around (only THREADED_RTS builds have more than one).
2864 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2868 #if defined(THREADED_RTS)
2870 * Eagerly start one worker to run each Capability, except for
2871 * Capability 0. The idea is that we're probably going to start a
2872 * bound thread on Capability 0 pretty soon, so we don't want a
2873 * worker task hogging it.
2878 for (i = 1; i < n_capabilities; i++) {
2879 cap = &capabilities[i];
2880 ACQUIRE_LOCK(&cap->lock);
2881 startWorkerTask(cap, workerStart);
2882 RELEASE_LOCK(&cap->lock);
2887 trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2889 RELEASE_LOCK(&sched_mutex);
2893 exitScheduler( void )
2897 #if defined(THREADED_RTS)
2898 ACQUIRE_LOCK(&sched_mutex);
2899 task = newBoundTask();
2900 RELEASE_LOCK(&sched_mutex);
2903 // If we haven't killed all the threads yet, do it now.
2904 if (sched_state < SCHED_SHUTTING_DOWN) {
2905 sched_state = SCHED_INTERRUPTING;
2906 scheduleDoGC(NULL,task,rtsFalse,GetRoots);
2908 sched_state = SCHED_SHUTTING_DOWN;
2910 #if defined(THREADED_RTS)
2914 for (i = 0; i < n_capabilities; i++) {
2915 shutdownCapability(&capabilities[i], task);
2917 boundTaskExiting(task);
2923 /* ---------------------------------------------------------------------------
2924 Where are the roots that we know about?
2926 - all the threads on the runnable queue
2927 - all the threads on the blocked queue
2928 - all the threads on the sleeping queue
2929 - all the threads currently executing a _ccall_GC
2930 - all the "main threads"
2932 ------------------------------------------------------------------------ */
2934 /* This has to be protected either by the scheduler monitor, or by the
2935 garbage collection monitor (probably the latter).
2940 GetRoots( evac_fn evac )
2947 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2948 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2949 evac((StgClosure **)&run_queue_hds[i]);
2950 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2951 evac((StgClosure **)&run_queue_tls[i]);
2953 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2954 evac((StgClosure **)&blocked_queue_hds[i]);
2955 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2956 evac((StgClosure **)&blocked_queue_tls[i]);
2957 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2958 evac((StgClosure **)&ccalling_threadss[i]);
2965 for (i = 0; i < n_capabilities; i++) {
2966 cap = &capabilities[i];
2967 evac((StgClosure **)(void *)&cap->run_queue_hd);
2968 evac((StgClosure **)(void *)&cap->run_queue_tl);
2969 #if defined(THREADED_RTS)
2970 evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2971 evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2973 for (task = cap->suspended_ccalling_tasks; task != NULL;
2975 debugTrace(DEBUG_sched,
2976 "evac'ing suspended TSO %d", task->suspended_tso->id);
2977 evac((StgClosure **)(void *)&task->suspended_tso);
2983 #if !defined(THREADED_RTS)
2984 evac((StgClosure **)(void *)&blocked_queue_hd);
2985 evac((StgClosure **)(void *)&blocked_queue_tl);
2986 evac((StgClosure **)(void *)&sleeping_queue);
2990 // evac((StgClosure **)&blackhole_queue);
2992 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2993 markSparkQueue(evac);
2996 #if defined(RTS_USER_SIGNALS)
2997 // mark the signal handlers (signals should be already blocked)
2998 markSignalHandlers(evac);
3002 /* -----------------------------------------------------------------------------
3005 This is the interface to the garbage collector from Haskell land.
3006 We provide this so that external C code can allocate and garbage
3007 collect when called from Haskell via _ccall_GC.
3009 It might be useful to provide an interface whereby the programmer
3010 can specify more roots (ToDo).
3012 This needs to be protected by the GC condition variable above. KH.
3013 -------------------------------------------------------------------------- */
3015 static void (*extra_roots)(evac_fn);
3018 performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
3021 // We must grab a new Task here, because the existing Task may be
3022 // associated with a particular Capability, and chained onto the
3023 // suspended_ccalling_tasks queue.
3024 ACQUIRE_LOCK(&sched_mutex);
3025 task = newBoundTask();
3026 RELEASE_LOCK(&sched_mutex);
3027 scheduleDoGC(NULL,task,force_major, get_roots);
3028 boundTaskExiting(task);
3034 performGC_(rtsFalse, GetRoots);
3038 performMajorGC(void)
3040 performGC_(rtsTrue, GetRoots);
3044 AllRoots(evac_fn evac)
3046 GetRoots(evac); // the scheduler's roots
3047 extra_roots(evac); // the user's roots
3051 performGCWithRoots(void (*get_roots)(evac_fn))
3053 extra_roots = get_roots;
3054 performGC_(rtsFalse, AllRoots);
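/* A sketch of a client of performGCWithRoots(): keep a C-side global
 * closure alive by evacuating it as an extra root.  'my_root' is a
 * hypothetical client variable; illustrative only, hence #if 0. */
#if 0
static StgClosure *my_root = NULL;

static void
myExtraRoots (evac_fn evac)
{
    if (my_root != NULL) {
        evac(&my_root);   // the GC may update my_root to point at the
                          // object's new location
    }
}

static void
collectKeepingMyRoot (void)
{
    performGCWithRoots(myExtraRoots);
}
#endif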
3057 /* -----------------------------------------------------------------------------
3060 If the thread has reached its maximum stack size, then raise the
3061 StackOverflow exception in the offending thread. Otherwise
3062 relocate the TSO into a larger chunk of memory and adjust its stack
3064 -------------------------------------------------------------------------- */
3067 threadStackOverflow(Capability *cap, StgTSO *tso)
3069 nat new_stack_size, stack_words;
3074 IF_DEBUG(sanity,checkTSO(tso));
3075 if (tso->stack_size >= tso->max_stack_size) {
3077 debugTrace(DEBUG_gc,
3078 "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
3079 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
3081 /* If we're debugging, just print out the top of the stack */
3082 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
3085 /* Send this thread the StackOverflow exception */
3086 raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
3090 /* Try to double the current stack size. If that takes us over the
3091 * maximum stack size for this thread, then use the maximum instead.
3092 * Finally round up so the TSO ends up as a whole number of blocks.
3094 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
3095 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
3096 TSO_STRUCT_SIZE)/sizeof(W_);
3097 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
3098 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
3100 debugTrace(DEBUG_sched,
3101 "increasing stack size from %ld words to %d.\n",
3102 (long)tso->stack_size, new_stack_size);
3104 dest = (StgTSO *)allocate(new_tso_size);
3105 TICK_ALLOC_TSO(new_stack_size,0);
3107 /* copy the TSO block and the old stack into the new area */
3108 memcpy(dest,tso,TSO_STRUCT_SIZE);
3109 stack_words = tso->stack + tso->stack_size - tso->sp;
3110 new_sp = (P_)dest + new_tso_size - stack_words;
3111 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
3113 /* relocate the stack pointers... */
3115 dest->stack_size = new_stack_size;
3117 /* Mark the old TSO as relocated. We have to check for relocated
3118 * TSOs in the garbage collector and any primops that deal with TSOs.
3120 * It's important to set the sp value to just beyond the end
3121 * of the stack, so we don't attempt to scavenge any part of the
3124 tso->what_next = ThreadRelocated;
3126 tso->sp = (P_)&(tso->stack[tso->stack_size]);
3127 tso->why_blocked = NotBlocked;
3129 IF_PAR_DEBUG(verbose,
3130 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
3131 tso->id, tso, tso->stack_size);
3132 /* If we're debugging, just print out the top of the stack */
3133 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
3136 IF_DEBUG(sanity,checkTSO(tso));
3138 IF_DEBUG(scheduler,printTSO(dest));
3144 /* ---------------------------------------------------------------------------
3145 Wake up a queue that was blocked on some resource.
3146 ------------------------------------------------------------------------ */
3150 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3153 #elif defined(PARALLEL_HASKELL)
3155 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3157 /* write RESUME events to log file and
3158 update blocked and fetch time (depending on type of the orig closure) */
3159 if (RtsFlags.ParFlags.ParStats.Full) {
3160 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
3161 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
3162 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
3163 if (emptyRunQueue())
3164 emitSchedule = rtsTrue;
3166 switch (get_itbl(node)->type) {
3168 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3173 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3180 barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
3187 StgBlockingQueueElement *
3188 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3191 PEs node_loc, tso_loc;
3193 node_loc = where_is(node); // should be lifted out of loop
3194 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
3195 tso_loc = where_is((StgClosure *)tso);
3196 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3197 /* !fake_fetch => TSO being on CurrentProc is the same as IS_LOCAL_TO */
3198 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3199 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3200 // insertThread(tso, node_loc);
3201 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3203 tso, node, (rtsSpark*)NULL);
3204 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
3207 } else { // TSO is remote (actually should be FMBQ)
3208 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3209 RtsFlags.GranFlags.Costs.gunblocktime +
3210 RtsFlags.GranFlags.Costs.latency;
3211 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3213 tso, node, (rtsSpark*)NULL);
3214 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
3217 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3219 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3220 (node_loc==tso_loc ? "Local" : "Global"),
3221 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3222 tso->block_info.closure = NULL;
3223 debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)\n",
3226 #elif defined(PARALLEL_HASKELL)
3227 StgBlockingQueueElement *
3228 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3230 StgBlockingQueueElement *next;
3232 switch (get_itbl(bqe)->type) {
3234 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3235 /* if it's a TSO just push it onto the run_queue */
3237 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3238 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
3240 unblockCount(bqe, node);
3241 /* reset blocking status after dumping event */
3242 ((StgTSO *)bqe)->why_blocked = NotBlocked;
3246 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3248 bqe->link = (StgBlockingQueueElement *)PendingFetches;
3249 PendingFetches = (StgBlockedFetch *)bqe;
3253 /* can ignore this case in a non-debugging setup;
3254 see comments on RBHSave closures above */
3256 /* check that the closure is an RBHSave closure */
3257 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3258 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3259 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3263 barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3264 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
3268 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3274 unblockOne(Capability *cap, StgTSO *tso)
3278 ASSERT(get_itbl(tso)->type == TSO);
3279 ASSERT(tso->why_blocked != NotBlocked);
3281 tso->why_blocked = NotBlocked;
3283 tso->link = END_TSO_QUEUE;
3285 #if defined(THREADED_RTS)
3286 if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) {
3287 // We are waking up this thread on the current Capability, which
3288 // might involve migrating it from the Capability it was last on.
3290 ASSERT(tso->bound->cap == tso->cap);
3291 tso->bound->cap = cap;
3294 appendToRunQueue(cap,tso);
3295 // we're holding a newly woken thread, make sure we context switch
3296 // quickly so we can migrate it if necessary.
3299 // we'll try to wake it up on the Capability it was last on.
3300 wakeupThreadOnCapability(tso->cap, tso);
3303 appendToRunQueue(cap,tso);
3307 debugTrace(DEBUG_sched,
3308 "waking up thread %ld on cap %d",
3309 (long)tso->id, tso->cap->no);
3317 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3319 StgBlockingQueueElement *bqe;
3324 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3325 node, CurrentProc, CurrentTime[CurrentProc],
3326 CurrentTSO->id, CurrentTSO));
3328 node_loc = where_is(node);
3330 ASSERT(q == END_BQ_QUEUE ||
3331 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
3332 get_itbl(q)->type == CONSTR); // closure (type constructor)
3333 ASSERT(is_unique(node));
3335 /* FAKE FETCH: magically copy the node to the tso's proc;
3336 no Fetch necessary because in reality the node should not have been
3337 moved to the other PE in the first place
3339 if (CurrentProc!=node_loc) {
3341 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3342 node, node_loc, CurrentProc, CurrentTSO->id,
3343 // CurrentTSO, where_is(CurrentTSO),
3344 node->header.gran.procs));
3345 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3347 debugBelch("## new bitmask of node %p is %#x\n",
3348 node, node->header.gran.procs));
3349 if (RtsFlags.GranFlags.GranSimStats.Global) {
3350 globalGranStats.tot_fake_fetches++;
3355 // ToDo: check: ASSERT(CurrentProc==node_loc);
3356 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3359 bqe points to the current element in the queue
3360 next points to the next element in the queue
3362 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
3363 //tso_loc = where_is(tso);
3365 bqe = unblockOne(bqe, node);
3368 /* if this is the BQ of an RBH, we have to put back the info ripped out of
3369 the closure to make room for the anchor of the BQ */
3370 if (bqe!=END_BQ_QUEUE) {
3371 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3373 ASSERT((info_ptr==&RBH_Save_0_info) ||
3374 (info_ptr==&RBH_Save_1_info) ||
3375 (info_ptr==&RBH_Save_2_info));
3377 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3378 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3379 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3382 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3383 node, info_type(node)));
3386 /* statistics gathering */
3387 if (RtsFlags.GranFlags.GranSimStats.Global) {
3388 // globalGranStats.tot_bq_processing_time += bq_processing_time;
3389 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
3390 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
3391 globalGranStats.tot_awbq++; // total no. of bqs awakened
3394 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3395 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3397 #elif defined(PARALLEL_HASKELL)
3399 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3401 StgBlockingQueueElement *bqe;
3403 IF_PAR_DEBUG(verbose,
3404 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3408 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3409 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so let's just return. RFP (BUG?)\n"));
3414 ASSERT(q == END_BQ_QUEUE ||
3415 get_itbl(q)->type == TSO ||
3416 get_itbl(q)->type == BLOCKED_FETCH ||
3417 get_itbl(q)->type == CONSTR);
3420 while (get_itbl(bqe)->type==TSO ||
3421 get_itbl(bqe)->type==BLOCKED_FETCH) {
3422 bqe = unblockOne(bqe, node);
3426 #else /* !GRAN && !PARALLEL_HASKELL */
3429 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3431 if (tso == NULL) return; // hack; see bug #1235728, and comments in
3433 while (tso != END_TSO_QUEUE) {
3434 tso = unblockOne(cap,tso);
3439 /* ---------------------------------------------------------------------------
3441 - usually called inside a signal handler so it mustn't do anything fancy.
3442 ------------------------------------------------------------------------ */
3445 interruptStgRts(void)
3447 sched_state = SCHED_INTERRUPTING;
3452 /* -----------------------------------------------------------------------------
3455 This function causes at least one OS thread to wake up and run the
3456 scheduler loop. It is invoked when the RTS might be deadlocked, or
3457 an external event has arrived that may need servicing (eg. a
3458 keyboard interrupt).
3460 In the single-threaded RTS we don't do anything here; we only have
3461 one thread anyway, and the event that caused us to want to wake up
3462 will have interrupted any blocking system call in progress anyway.
3463 -------------------------------------------------------------------------- */
3468 #if defined(THREADED_RTS)
3469 #if !defined(mingw32_HOST_OS)
3470 // This forces the IO Manager thread to wake up, which will
3471 // in turn ensure that some OS thread wakes up and runs the
3472 // scheduler loop, which will cause a GC and deadlock check.
3475 // On Windows this might be safe enough, because we aren't
3476 // in a signal handler. Later we should use the IO Manager, though.
3478 prodOneCapability();
3483 /* -----------------------------------------------------------------------------
3486 This is for use when we raise an exception in another thread, which
3488 This has nothing to do with the UnblockThread event in GranSim. -- HWL
3489 -------------------------------------------------------------------------- */
3491 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3493 NB: only the type of the blocking queue is different in GranSim and GUM
3494 the operations on the queue-elements are the same
3495 long live polymorphism!
3497 Locks: sched_mutex is held upon entry and exit.
3501 unblockThread(Capability *cap, StgTSO *tso)
3503 StgBlockingQueueElement *t, **last;
3505 switch (tso->why_blocked) {
3508 return; /* not blocked */
3511 // Be careful: nothing to do here! We tell the scheduler that the thread
3512 // is runnable and we leave it to the stack-walking code to abort the
3513 // transaction while unwinding the stack. We should perhaps have a debugging
3514 // test to make sure that this really happens and that the 'zombie' transaction
3515 // does not get committed.
3519 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3521 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3522 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3524 last = (StgBlockingQueueElement **)&mvar->head;
3525 for (t = (StgBlockingQueueElement *)mvar->head;
3527 last = &t->link, last_tso = t, t = t->link) {
3528 if (t == (StgBlockingQueueElement *)tso) {
3529 *last = (StgBlockingQueueElement *)tso->link;
3530 if (mvar->tail == tso) {
3531 mvar->tail = (StgTSO *)last_tso;
3536 barf("unblockThread (MVAR): TSO not found");
3539 case BlockedOnBlackHole:
3540 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3542 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3544 last = &bq->blocking_queue;
3545 for (t = bq->blocking_queue;
3547 last = &t->link, t = t->link) {
3548 if (t == (StgBlockingQueueElement *)tso) {
3549 *last = (StgBlockingQueueElement *)tso->link;
3553 barf("unblockThread (BLACKHOLE): TSO not found");
3556 case BlockedOnException:
3558 StgTSO *target = tso->block_info.tso;
3560 ASSERT(get_itbl(target)->type == TSO);
3562 if (target->what_next == ThreadRelocated) {
3563 target = target->link;
3564 ASSERT(get_itbl(target)->type == TSO);
3567 ASSERT(target->blocked_exceptions != NULL);
3569 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3570 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
3572 last = &t->link, t = t->link) {
3573 ASSERT(get_itbl(t)->type == TSO);
3574 if (t == (StgBlockingQueueElement *)tso) {
3575 *last = (StgBlockingQueueElement *)tso->link;
3579 barf("unblockThread (Exception): TSO not found");
3583 case BlockedOnWrite:
3584 #if defined(mingw32_HOST_OS)
3585 case BlockedOnDoProc:
3588 /* take TSO off blocked_queue */
3589 StgBlockingQueueElement *prev = NULL;
3590 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
3591 prev = t, t = t->link) {
3592 if (t == (StgBlockingQueueElement *)tso) {
3594 blocked_queue_hd = (StgTSO *)t->link;
3595 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3596 blocked_queue_tl = END_TSO_QUEUE;
3599 prev->link = t->link;
3600 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3601 blocked_queue_tl = (StgTSO *)prev;
3604 #if defined(mingw32_HOST_OS)
3605 /* (Cooperatively) signal that the worker thread should abort the request. */
3608 abandonWorkRequest(tso->block_info.async_result->reqID);
3613 barf("unblockThread (I/O): TSO not found");
3616 case BlockedOnDelay:
3618 /* take TSO off sleeping_queue */
3619 StgBlockingQueueElement *prev = NULL;
3620 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
3621 prev = t, t = t->link) {
3622 if (t == (StgBlockingQueueElement *)tso) {
3624 sleeping_queue = (StgTSO *)t->link;
3626 prev->link = t->link;
3631 barf("unblockThread (delay): TSO not found");
3635 barf("unblockThread");
3639 tso->link = END_TSO_QUEUE;
3640 tso->why_blocked = NotBlocked;
3641 tso->block_info.closure = NULL;
3642 pushOnRunQueue(cap,tso);
3646 unblockThread(Capability *cap, StgTSO *tso)
3650 /* To avoid locking unnecessarily. */
3651 if (tso->why_blocked == NotBlocked) {
3655 switch (tso->why_blocked) {
3658 // Be careful: nothing to do here! We tell the scheduler that the thread
3659 // is runnable and we leave it to the stack-walking code to abort the
3660 // transaction while unwinding the stack. We should perhaps have a debugging
3661 // test to make sure that this really happens and that the 'zombie' transaction
3662 // does not get committed.
3666 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3668 StgTSO *last_tso = END_TSO_QUEUE;
3669 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3672 for (t = mvar->head; t != END_TSO_QUEUE;
3673 last = &t->link, last_tso = t, t = t->link) {
3676 if (mvar->tail == tso) {
3677 mvar->tail = last_tso;
3682 barf("unblockThread (MVAR): TSO not found");
3685 case BlockedOnBlackHole:
3687 last = &blackhole_queue;
3688 for (t = blackhole_queue; t != END_TSO_QUEUE;
3689 last = &t->link, t = t->link) {
3695 barf("unblockThread (BLACKHOLE): TSO not found");
3698 case BlockedOnException:
3700 StgTSO *target = tso->block_info.tso;
3702 ASSERT(get_itbl(target)->type == TSO);
3704 while (target->what_next == ThreadRelocated) {
3705 target = target->link;
3706 ASSERT(get_itbl(target)->type == TSO);
3709 ASSERT(target->blocked_exceptions != NULL);
3711 last = &target->blocked_exceptions;
3712 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
3713 last = &t->link, t = t->link) {
3714 ASSERT(get_itbl(t)->type == TSO);
3720 barf("unblockThread (Exception): TSO not found");
3723 #if !defined(THREADED_RTS)
3725 case BlockedOnWrite:
3726 #if defined(mingw32_HOST_OS)
3727 case BlockedOnDoProc:
3730 StgTSO *prev = NULL;
3731 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
3732 prev = t, t = t->link) {
3735 blocked_queue_hd = t->link;
3736 if (blocked_queue_tl == t) {
3737 blocked_queue_tl = END_TSO_QUEUE;
3740 prev->link = t->link;
3741 if (blocked_queue_tl == t) {
3742 blocked_queue_tl = prev;
3745 #if defined(mingw32_HOST_OS)
3746 /* (Cooperatively) signal that the worker thread should abort the request. */
3749 abandonWorkRequest(tso->block_info.async_result->reqID);
3754 barf("unblockThread (I/O): TSO not found");
3757 case BlockedOnDelay:
3759 StgTSO *prev = NULL;
3760 for (t = sleeping_queue; t != END_TSO_QUEUE;
3761 prev = t, t = t->link) {
3764 sleeping_queue = t->link;
3766 prev->link = t->link;
3771 barf("unblockThread (delay): TSO not found");
3776 barf("unblockThread");
3780 tso->link = END_TSO_QUEUE;
3781 tso->why_blocked = NotBlocked;
3782 tso->block_info.closure = NULL;
3783 appendToRunQueue(cap,tso);
3785 // We might have just migrated this TSO to our Capability:
3787 tso->bound->cap = cap;
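/* The removal idiom used throughout unblockThread() above, in
 * isolation: walking the queue with a pointer-to-pointer ('last') makes
 * the splice a single assignment, with no special case for the head of
 * the list.  Hypothetical Elem type; illustrative only, hence #if 0. */
#if 0
typedef struct Elem_ { struct Elem_ *link; } Elem;

static void
removeFromQueue (Elem **head, Elem *victim)
{
    Elem *t, **last;
    for (last = head, t = *head; t != NULL; last = &t->link, t = t->link) {
        if (t == victim) {
            *last = t->link;   // splices out head and middle alike
            victim->link = NULL;
            return;
        }
    }
    barf("removeFromQueue: element not found");
}
#endif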
3793 /* -----------------------------------------------------------------------------
3796 * Check the blackhole_queue for threads that can be woken up. We do
3797 * this periodically: before every GC, and whenever the run queue is empty.
3800 * An elegant solution might be to just wake up all the blocked
3801 * threads with awakenBlockedQueue occasionally: they'll go back to
3802 * sleep again if the object is still a BLACKHOLE. Unfortunately this
3803 * doesn't give us a way to tell whether we've actually managed to
3804 * wake up any threads, so we would be busy-waiting.
3806 * -------------------------------------------------------------------------- */
3809 checkBlackHoles (Capability *cap)
3812 rtsBool any_woke_up = rtsFalse;
3815 // blackhole_queue is global:
3816 ASSERT_LOCK_HELD(&sched_mutex);
3818 debugTrace(DEBUG_sched, "checking threads blocked on black holes");
3820 // ASSUMES: sched_mutex
3821 prev = &blackhole_queue;
3822 t = blackhole_queue;
3823 while (t != END_TSO_QUEUE) {
3824 ASSERT(t->why_blocked == BlockedOnBlackHole);
3825 type = get_itbl(t->block_info.closure)->type;
3826 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3827 IF_DEBUG(sanity,checkTSO(t));
3828 t = unblockOne(cap, t);
3829 // urk, the threads migrate to the current capability
3830 // here, but we'd like to keep them on the original one.
3832 any_woke_up = rtsTrue;
3842 /* -----------------------------------------------------------------------------
3845 * The following function implements the magic for raising an
3846 * asynchronous exception in an existing thread.
3848 * We first remove the thread from any queue on which it might be
3849 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
3851 * We strip the stack down to the innermost CATCH_FRAME, building
3852 * thunks in the heap for all the active computations, so they can
3853 * be restarted if necessary. When we reach a CATCH_FRAME, we build
3854 * an application of the handler to the exception, and push it on
3855 * the top of the stack.
3857 * How exactly do we save all the active computations? We create an
3858 * AP_STACK for every UpdateFrame on the stack. Entering one of these
3859 * AP_STACKs pushes everything from the corresponding update frame
3860 * upwards onto the stack. (Actually, it pushes everything up to the
3861 * next update frame plus a pointer to the next AP_STACK object.
3862 * Entering the next AP_STACK object pushes more onto the stack until we
3863 * reach the last AP_STACK object - at which point the stack should look
3864 * exactly as it did when we killed the TSO and we can continue
3865 * execution by entering the closure on top of the stack.)
3867 * We can also kill a thread entirely - this happens if either (a) the
3868 * exception passed to raiseAsync is NULL, or (b) there's no
3869 * CATCH_FRAME on the stack. In either case, we strip the entire
3870 * stack and replace the thread with a zombie.
3872 * ToDo: in THREADED_RTS mode, this function is only safe if either
3873 * (a) we hold all the Capabilities (eg. in GC, or if there is only
3874 * one Capability), or (b) we own the Capability that the TSO is
3875 * currently blocked on or on the run queue of.
3877 * -------------------------------------------------------------------------- */
3880 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3882 raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3886 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3888 raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3892 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
3893 rtsBool stop_at_atomically, StgPtr stop_here)
3895 StgRetInfoTable *info;
3899 // Thread already dead?
3900 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3904 debugTrace(DEBUG_sched,
3905 "raising exception in thread %ld.", (long)tso->id);
3907 // Remove it from any blocking queues
3908 unblockThread(cap,tso);
3910 // mark it dirty; we're about to change its stack.
3915 // The stack freezing code assumes there's a closure pointer on
3916 // the top of the stack, so we have to arrange that this is the case...
3918 if (sp[0] == (W_)&stg_enter_info) {
3922 sp[0] = (W_)&stg_dummy_ret_closure;
3926 while (stop_here == NULL || frame < stop_here) {
3928 // 1. Let the top of the stack be the "current closure"
3930 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3933 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3934 // current closure applied to the chunk of stack up to (but not
3935 // including) the update frame. This closure becomes the "current
3936 // closure". Go back to step 2.
3938 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3939 // top of the stack applied to the exception.
3941 // 5. If it's a STOP_FRAME, then kill the thread.
3943 // NB: if we pass an ATOMICALLY_FRAME then abort the associated
3946 info = get_ret_itbl((StgClosure *)frame);
3948 switch (info->i.type) {
3955 // First build an AP_STACK consisting of the stack chunk above the
3956 // current update frame, with the top word on the stack as the fun field.
3959 words = frame - sp - 1;
3960 ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3963 ap->fun = (StgClosure *)sp[0];
3965 for(i=0; i < (nat)words; ++i) {
3966 ap->payload[i] = (StgClosure *)*sp++;
3969 SET_HDR(ap,&stg_AP_STACK_info,
3970 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3971 TICK_ALLOC_UP_THK(words+1,0);
3973 //IF_DEBUG(scheduler,
3974 // debugBelch("sched: Updating ");
3975 // printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3976 // debugBelch(" with ");
3977 // printObj((StgClosure *)ap);
3980 // Replace the updatee with an indirection
3982 // Warning: if we're in a loop, more than one update frame on
3983 // the stack may point to the same object. Be careful not to
3984 // overwrite an IND_OLDGEN in this case, because we'll screw
3985 // up the mutable lists. To be on the safe side, don't
3986 // overwrite any kind of indirection at all. See also
3987 // threadSqueezeStack in GC.c, where we have to make a similar check.
3990 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3991 // revert the black hole
3992 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3995 sp += sizeofW(StgUpdateFrame) - 1;
3996 sp[0] = (W_)ap; // push onto stack
3998 continue; //no need to bump frame
4002 // We've stripped the entire stack, the thread is now dead.
4003 tso->what_next = ThreadKilled;
4004 tso->sp = frame + sizeofW(StgStopFrame);
4008 // If we find a CATCH_FRAME, and we've got an exception to raise,
4009 // then build the THUNK raise(exception), and leave it on
4010 // top of the CATCH_FRAME ready to enter.
4014 StgCatchFrame *cf = (StgCatchFrame *)frame;
4018 if (exception == NULL) break;
4020 // we've got an exception to raise, so let's pass it to the
4021 // handler in this frame.
4023 raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
4024 TICK_ALLOC_SE_THK(1,0);
4025 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
4026 raise->payload[0] = exception;
4028 // throw away the stack from Sp up to the CATCH_FRAME.
4032 /* Ensure that async exceptions are blocked now, so we don't get
4033 * a surprise exception before we get around to executing the handler.
4036 if (tso->blocked_exceptions == NULL) {
4037 tso->blocked_exceptions = END_TSO_QUEUE;
4040 /* Put the newly-built THUNK on top of the stack, ready to execute
4041 * when the thread restarts.
4044 sp[-1] = (W_)&stg_enter_info;
4046 tso->what_next = ThreadRunGHC;
4047 IF_DEBUG(sanity, checkTSO(tso));
4051 case ATOMICALLY_FRAME:
4052 if (stop_at_atomically) {
4053 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
4054 stmCondemnTransaction(cap, tso -> trec);
4058 // R1 is not a register: the return convention for IO in
4059 // this case puts the return value on the stack, so we
4060 // need to set up the stack to return to the atomically
4061 // frame properly...
4062 tso->sp = frame - 2;
4063 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
4064 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
4066 tso->what_next = ThreadRunGHC;
4069 // Not stop_at_atomically... fall through and abort the transaction.
4072 case CATCH_RETRY_FRAME:
4073 // If we find an ATOMICALLY_FRAME then we abort the
4074 // current transaction and propagate the exception. In
4075 // this case (unlike ordinary exceptions) we do not care
4076 // whether the transaction is valid or not because its
4077 // possible validity cannot have caused the exception
4078 // and will not be visible after the abort.
4079 debugTrace(DEBUG_stm,
4080 "found atomically block delivering async exception");
4082 StgTRecHeader *trec = tso -> trec;
4083 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
4084 stmAbortTransaction(cap, trec);
4085 tso -> trec = outer;
4092 // move on to the next stack frame
4093 frame += stack_frame_sizeW((StgClosure *)frame);
4096 // if we got here, then we stopped at stop_here
4097 ASSERT(stop_here != NULL);
4100 /* -----------------------------------------------------------------------------
4103 This is used for interruption (^C) and forking, and corresponds to
4104 raising an exception but without letting the thread catch the
4106 -------------------------------------------------------------------------- */
4109 deleteThread (Capability *cap, StgTSO *tso)
4111 if (tso->why_blocked != BlockedOnCCall &&
4112 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
4113 raiseAsync(cap,tso,NULL);
4117 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
4119 deleteThread_(Capability *cap, StgTSO *tso)
4120 { // for forkProcess only:
4121 // like deleteThread(), but we delete threads in foreign calls, too.
4123 if (tso->why_blocked == BlockedOnCCall ||
4124 tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
4125 unblockOne(cap,tso);
4126 tso->what_next = ThreadKilled;
4128 deleteThread(cap,tso);
4133 /* -----------------------------------------------------------------------------
4134 raiseExceptionHelper
4136 This function is called by the raise# primitive, just so that we can
4137 move some of the tricky bits of raising an exception from C-- into
4138 C. Who knows, it might be a useful reusable thing here too.
4139 -------------------------------------------------------------------------- */
4142 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
4144 Capability *cap = regTableToCapability(reg);
4145 StgThunk *raise_closure = NULL;
4147 StgRetInfoTable *info;
4149 // This closure represents the expression 'raise# E' where E
4150 // is the exception raise. It is used to overwrite all the
4151 // thunks which are currently under evaluataion.
4154 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
4155 // LDV profiling: stg_raise_info has THUNK as its closure
4156 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
4157 // payload, MIN_UPD_SIZE is more approprate than 1. It seems that
4158 // 1 does not cause any problem unless profiling is performed.
4159 // However, when LDV profiling goes on, we need to linearly scan
4160 // small object pool, where raise_closure is stored, so we should
4161 // use MIN_UPD_SIZE.
4163 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
4164 // sizeofW(StgClosure)+1);
4168 // Walk up the stack, looking for the catch frame. On the way,
4169 // we update any closures pointed to from update frames with the
4170 // raise closure that we just built.
4174 info = get_ret_itbl((StgClosure *)p);
4175 next = p + stack_frame_sizeW((StgClosure *)p);
4176 switch (info->i.type) {
4179 // Only create raise_closure if we need to.
4180 if (raise_closure == NULL) {
4182 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
4183 SET_HDR(raise_closure, &stg_raise_info, CCCS);
4184 raise_closure->payload[0] = exception;
4186 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
4190 case ATOMICALLY_FRAME:
4191 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
4193 return ATOMICALLY_FRAME;
4199 case CATCH_STM_FRAME:
4200 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
4202 return CATCH_STM_FRAME;
4208 case CATCH_RETRY_FRAME:
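/* [Illustrative sketch, not part of the original source: one way a caller
 * such as the raise# primitive could dispatch on the frame type returned
 * by raiseExceptionHelper().  The comments describe assumed behaviour; the
 * real dispatch lives in the C-- code for raise#.]
 */
#if 0
    switch (raiseExceptionHelper(reg, tso, exception)) {
    case CATCH_FRAME:      // run the handler stored in the frame
        break;
    case CATCH_STM_FRAME:  // abort the enclosing transaction, then handle
        break;
    case ATOMICALLY_FRAME: // exception escaped an atomically block
        break;
    case STOP_FRAME:       // no handler: the thread dies unhandled
        break;
    }
#endif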
/* -----------------------------------------------------------------------------
   findRetryFrameHelper

   This function is called by the retry# primitive.  It traverses the stack
   leaving tso->sp referring to the frame which should handle the retry.

   This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
   or an ATOMICALLY_FRAME (if the retry# reaches the top level).

   We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
   despite the similar implementation.

   We should not expect to see CATCH_FRAME or STOP_FRAME because those should
   not be created within memory transactions.
   -------------------------------------------------------------------------- */

StgWord
findRetryFrameHelper (StgTSO *tso)
{
    StgPtr           p, next;
    StgRetInfoTable *info;

    p = tso->sp;
    while (1) {
        info = get_ret_itbl((StgClosure *)p);
        next = p + stack_frame_sizeW((StgClosure *)p);
        switch (info->i.type) {

        case ATOMICALLY_FRAME:
            debugTrace(DEBUG_stm,
                       "found ATOMICALLY_FRAME at %p during retry", p);
            tso->sp = p;
            return ATOMICALLY_FRAME;

        case CATCH_RETRY_FRAME:
            debugTrace(DEBUG_stm,
                       "found CATCH_RETRY_FRAME at %p during retry", p);
            tso->sp = p;
            return CATCH_RETRY_FRAME;

        case CATCH_STM_FRAME:
        default:
            ASSERT(info->i.type != CATCH_FRAME);
            ASSERT(info->i.type != STOP_FRAME);
            p = next;
            continue;
        }
    }
}
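/* [Illustrative sketch, not part of the original source: how the retry#
 * primitive might act on findRetryFrameHelper()'s result, per the comment
 * above.  Names and control flow here are assumptions; the real logic is
 * in the C-- code for retry#.]
 */
#if 0
    switch (findRetryFrameHelper(tso)) {
    case CATCH_RETRY_FRAME: // retry# inside orElse#: run the alternative
        break;
    case ATOMICALLY_FRAME:  // top-level retry: block until a TVar read
        break;              // by the transaction is written
    }
#endif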
/* -----------------------------------------------------------------------------
   resurrectThreads is called after garbage collection on the list of
   threads found to be garbage.  Each of these threads will be woken
   up and sent a signal: BlockedOnDeadMVar if the thread was blocked
   on an MVar, or NonTermination if the thread was blocked on a Black
   Hole.

   Locks: assumes we hold *all* the capabilities.
   -------------------------------------------------------------------------- */

void
resurrectThreads (StgTSO *threads)
{
    StgTSO *tso, *next;
    Capability *cap;

    for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
        next = tso->global_link;
        tso->global_link = all_threads;
        all_threads = tso;
        debugTrace(DEBUG_sched, "resurrecting thread %d", tso->id);

        // Wake up the thread on the Capability it was last on
        cap = tso->cap;

        switch (tso->why_blocked) {
        case BlockedOnMVar:
        case BlockedOnException:
            /* Called by GC - sched_mutex lock is currently held. */
            raiseAsync(cap, tso, (StgClosure *)BlockedOnDeadMVar_closure);
            break;
        case BlockedOnBlackHole:
            raiseAsync(cap, tso, (StgClosure *)NonTermination_closure);
            break;
        case BlockedOnSTM:
            raiseAsync(cap, tso, (StgClosure *)BlockedIndefinitely_closure);
            break;
        case NotBlocked:
            /* This might happen if the thread was blocked on a black hole
             * belonging to a thread that we've just woken up (raiseAsync
             * can wake up threads, remember...).
             */
            continue;
        default:
            barf("resurrectThreads: thread blocked in a strange way");
        }
    }
}
/* ----------------------------------------------------------------------------
 * Debugging: why is a thread blocked
 * [Also provides useful information when debugging threaded programs
 *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
   ------------------------------------------------------------------------- */

static void
printThreadBlockage(StgTSO *tso)
{
    switch (tso->why_blocked) {
    case BlockedOnRead:
        debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
        break;
    case BlockedOnWrite:
        debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
        break;
#if defined(mingw32_HOST_OS)
    case BlockedOnDoProc:
        debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
        break;
#endif
    case BlockedOnDelay:
        debugBelch("is blocked until %ld", (long)(tso->block_info.target));
        break;
    case BlockedOnMVar:
        debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
        break;
    case BlockedOnException:
        debugBelch("is blocked on delivering an exception to thread %d",
                   tso->block_info.tso->id);
        break;
    case BlockedOnBlackHole:
        debugBelch("is blocked on a black hole");
        break;
    case NotBlocked:
        debugBelch("is not blocked");
        break;
#if defined(PARALLEL_HASKELL)
    case BlockedOnGA:
        debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
                   tso->block_info.closure, info_type(tso->block_info.closure));
        break;
    case BlockedOnGA_NoSend:
        debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
                   tso->block_info.closure, info_type(tso->block_info.closure));
        break;
#endif
    case BlockedOnCCall:
        debugBelch("is blocked on an external call");
        break;
    case BlockedOnCCall_NoUnblockExc:
        debugBelch("is blocked on an external call (exceptions were already blocked)");
        break;
    case BlockedOnSTM:
        debugBelch("is blocked on an STM operation");
        break;
    default:
        barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
             tso->why_blocked, tso->id, (void *)tso);
    }
}
void printThreadStatus(StgTSO *t)
{
    debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
    {
        void *label = lookupThreadLabel(t->id);
        if (label) debugBelch("[\"%s\"] ",(char *)label);
    }
    if (t->what_next == ThreadRelocated) {
        debugBelch("has been relocated...\n");
    } else {
        switch (t->what_next) {
        case ThreadKilled:
            debugBelch("has been killed");
            break;
        case ThreadComplete:
            debugBelch("has completed");
            break;
        default:
            printThreadBlockage(t);
        }
        debugBelch("\n");
    }
}

void printAllThreads(void)
{
    StgTSO *t, *next;
    nat i;
    Capability *cap;
# if defined(GRAN)
    char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
    ullong_format_string(TIME_ON_PROC(CurrentProc),
                         time_string, rtsFalse/*no commas!*/);
    debugBelch("all threads at [%s]:\n", time_string);
# elif defined(PARALLEL_HASKELL)
    char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
    ullong_format_string(CURRENT_TIME,
                         time_string, rtsFalse/*no commas!*/);
    debugBelch("all threads at [%s]:\n", time_string);
# else
    debugBelch("all threads:\n");
# endif

    for (i = 0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        debugBelch("threads on capability %d:\n", cap->no);
        for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
            printThreadStatus(t);
        }
    }

    debugBelch("other threads:\n");
    for (t = all_threads; t != END_TSO_QUEUE; t = next) {
        if (t->why_blocked != NotBlocked) {
            printThreadStatus(t);
        }
        if (t->what_next == ThreadRelocated) {
            next = t->link;
        } else {
            next = t->global_link;
        }
    }
}

void printThreadQueue(StgTSO *t)
{
    nat i = 0;
    for (; t != END_TSO_QUEUE; t = t->link) {
        printThreadStatus(t);
        i++;
    }
    debugBelch("%d threads on queue\n", i);
}
/*
   Print a whole blocking queue attached to node (debugging only).
*/
# if defined(PARALLEL_HASKELL)
void
print_bq (StgClosure *node)
{
    StgBlockingQueueElement *bqe;

    debugBelch("## BQ of closure %p (%s): ",
               node, info_type(node));

    /* should cover all closures that may have a blocking queue */
    ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
           get_itbl(node)->type == FETCH_ME_BQ ||
           get_itbl(node)->type == RBH ||
           get_itbl(node)->type == MVAR);

    ASSERT(node!=(StgClosure*)NULL);  // sanity check

    print_bqe(((StgBlockingQueue*)node)->blocking_queue);
}

/*
   Print a whole blocking queue starting with the element bqe.
*/
void
print_bqe (StgBlockingQueueElement *bqe)
{
    rtsBool end;

    /*
       NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
    */
    for (end = (bqe==END_BQ_QUEUE);
         !end; // iterate until bqe points to a CONSTR
         end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
         bqe = end ? END_BQ_QUEUE : bqe->link) {
        ASSERT(bqe != END_BQ_QUEUE);                    // sanity check
        ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
        /* types of closures that may appear in a blocking queue */
        ASSERT(get_itbl(bqe)->type == TSO ||
               get_itbl(bqe)->type == BLOCKED_FETCH ||
               get_itbl(bqe)->type == CONSTR);
        /* only BQs of an RBH end with an RBH_Save closure */
        //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

        switch (get_itbl(bqe)->type) {
        case TSO:
            debugBelch(" TSO %u (%p),",
                       ((StgTSO *)bqe)->id, (void *)((StgTSO *)bqe));
            break;
        case BLOCKED_FETCH:
            debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
                       ((StgBlockedFetch *)bqe)->node,
                       ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
                       ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
                       ((StgBlockedFetch *)bqe)->ga.weight);
            break;
        case CONSTR:
            debugBelch(" %s (IP %p),",
                       (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
                        get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
                        get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
                        "RBH_Save_?"), get_itbl(bqe));
            break;
        default:
            barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
                 info_type((StgClosure *)bqe)); // , node, info_type(node));
        }
    }
}
# elif defined(GRAN)
void
print_bq (StgClosure *node)
{
    StgBlockingQueueElement *bqe;
    PEs node_loc, tso_loc;
    rtsBool end;

    /* should cover all closures that may have a blocking queue */
    ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
           get_itbl(node)->type == FETCH_ME_BQ ||
           get_itbl(node)->type == RBH);

    ASSERT(node!=(StgClosure*)NULL);  // sanity check
    node_loc = where_is(node);

    debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
               node, info_type(node), node_loc);

    /*
       NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
    */
    for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
         !end; // iterate until bqe points to a CONSTR
         end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
        ASSERT(bqe != END_BQ_QUEUE);                    // sanity check
        ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
        /* types of closures that may appear in a blocking queue */
        ASSERT(get_itbl(bqe)->type == TSO ||
               get_itbl(bqe)->type == CONSTR);
        /* only BQs of an RBH end with an RBH_Save closure */
        ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

        tso_loc = where_is((StgClosure *)bqe);
        switch (get_itbl(bqe)->type) {
        case TSO:
            debugBelch(" TSO %d (%p) on [PE %d],",
                       ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
            break;
        case CONSTR:
            debugBelch(" %s (IP %p),",
                       (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
                        get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
                        get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
                        "RBH_Save_?"), get_itbl(bqe));
            break;
        default:
            barf("Unexpected closure type %s in blocking queue of %p (%s)",
                 info_type((StgClosure *)bqe), node, info_type(node));
        }
    }
}
# endif
#if defined(PARALLEL_HASKELL)
    for (i=0, tso=run_queue_hd;
         tso != END_TSO_QUEUE;
         i++, tso=tso->link) {