1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 1998-2005
5 * The scheduler and thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
24 #include "RtsSignals.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
34 #include "Proftimer.h"
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
47 #include "Capability.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
69 // Turn off inlining when debugging - it obfuscates things
72 # define STATIC_INLINE static
75 /* -----------------------------------------------------------------------------
77 * -------------------------------------------------------------------------- */
81 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
82 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
85 In GranSim we have a runnable and a blocked queue for each processor.
86 In order to minimise code changes new arrays run_queue_hds/tls
87 are created. run_queue_hd is then a short cut (macro) for
88 run_queue_hds[CurrentProc] (see GranSim.h).
91 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
92 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
93 StgTSO *ccalling_threadss[MAX_PROC];
94 /* We use the same global list of threads (all_threads) in GranSim as in
95 the std RTS (i.e. we are cheating). However, we don't use this list in
96 the GranSim specific code at the moment (so we are only potentially
101 #if !defined(THREADED_RTS)
102 // Blocked/sleeping threads
103 StgTSO *blocked_queue_hd = NULL;
104 StgTSO *blocked_queue_tl = NULL;
105 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
108 /* Threads blocked on blackholes.
109 * LOCK: sched_mutex+capability, or all capabilities
111 StgTSO *blackhole_queue = NULL;
114 /* The blackhole_queue should be checked for threads to wake up. See
115 * Schedule.h for more thorough comment.
116 * LOCK: none (doesn't matter if we miss an update)
118 rtsBool blackholes_need_checking = rtsFalse;
120 /* Linked list of all threads.
121 * Used for detecting garbage collected threads.
122 * LOCK: sched_mutex+capability, or all capabilities
124 StgTSO *all_threads = NULL;
126 /* flag set by signal handler to precipitate a context switch
127 * LOCK: none (just an advisory flag)
129 int context_switch = 0;
131 /* flag that tracks whether we have done any execution in this time slice.
132 * LOCK: currently none, perhaps we should lock (but needs to be
133 * updated in the fast path of the scheduler).
135 nat recent_activity = ACTIVITY_YES;
137 /* if this flag is set as well, give up execution
138 * LOCK: none (changes once, from false->true)
140 rtsBool sched_state = SCHED_RUNNING;
142 /* Next thread ID to allocate.
145 static StgThreadID next_thread_id = 1;
147 /* The smallest stack size that makes any sense is:
148 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
149 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
150 * + 1 (the closure to enter)
152 * + 1 (spare slot req'd by stg_ap_v_ret)
154 * A thread with this stack will bomb immediately with a stack
155 * overflow, which will increase its stack size.
157 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
163 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
164 * exists - earlier gccs apparently didn't.
170 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
171 * in an MT setting, needed to signal that a worker thread shouldn't hang around
172 * in the scheduler when it is out of work.
174 rtsBool shutting_down_scheduler = rtsFalse;
177 * This mutex protects most of the global scheduler data in
178 * the THREADED_RTS runtime.
180 #if defined(THREADED_RTS)
184 #if defined(PARALLEL_HASKELL)
186 rtsTime TimeOfLastYield;
187 rtsBool emitSchedule = rtsTrue;
190 /* -----------------------------------------------------------------------------
191 * static function prototypes
192 * -------------------------------------------------------------------------- */
194 static Capability *schedule (Capability *initialCapability, Task *task);
197 // These functions all encapsulate parts of the scheduler loop, and are
198 // abstracted only to make the structure and control flow of the
199 // scheduler clearer.
201 static void schedulePreLoop (void);
202 #if defined(THREADED_RTS)
203 static void schedulePushWork(Capability *cap, Task *task);
205 static void scheduleStartSignalHandlers (Capability *cap);
206 static void scheduleCheckBlockedThreads (Capability *cap);
207 static void scheduleCheckBlackHoles (Capability *cap);
208 static void scheduleDetectDeadlock (Capability *cap, Task *task);
210 static StgTSO *scheduleProcessEvent(rtsEvent *event);
212 #if defined(PARALLEL_HASKELL)
213 static StgTSO *scheduleSendPendingMessages(void);
214 static void scheduleActivateSpark(void);
215 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
217 #if defined(PAR) || defined(GRAN)
218 static void scheduleGranParReport(void);
220 static void schedulePostRunThread(void);
221 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
222 static void scheduleHandleStackOverflow( Capability *cap, Task *task,
224 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
225 nat prev_what_next );
226 static void scheduleHandleThreadBlocked( StgTSO *t );
227 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
229 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
230 static Capability *scheduleDoGC(Capability *cap, Task *task,
232 void (*get_roots)(evac_fn));
234 static void unblockThread(Capability *cap, StgTSO *tso);
235 static rtsBool checkBlackHoles(Capability *cap);
236 static void AllRoots(evac_fn evac);
238 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
240 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
241 rtsBool stop_at_atomically, StgPtr stop_here);
243 static void deleteThread (Capability *cap, StgTSO *tso);
244 static void deleteAllThreads (Capability *cap);
247 static void printThreadBlockage(StgTSO *tso);
248 static void printThreadStatus(StgTSO *tso);
249 void printThreadQueue(StgTSO *tso);
252 #if defined(PARALLEL_HASKELL)
253 StgTSO * createSparkThread(rtsSpark spark);
254 StgTSO * activateSpark (rtsSpark spark);
258 static char *whatNext_strs[] = {
268 /* -----------------------------------------------------------------------------
269 * Putting a thread on the run queue: different scheduling policies
270 * -------------------------------------------------------------------------- */
273 addToRunQueue( Capability *cap, StgTSO *t )
275 #if defined(PARALLEL_HASKELL)
276 if (RtsFlags.ParFlags.doFairScheduling) {
277 // this does round-robin scheduling; good for concurrency
278 appendToRunQueue(cap,t);
280 // this does unfair scheduling; good for parallelism
281 pushOnRunQueue(cap,t);
284 // this does round-robin scheduling; good for concurrency
285 appendToRunQueue(cap,t);
289 /* ---------------------------------------------------------------------------
290 Main scheduling loop.
292 We use round-robin scheduling, each thread returning to the
293 scheduler loop when one of these conditions is detected:
296 * timer expires (thread yields)
302 In a GranSim setup this loop iterates over the global event queue.
303 This revolves around the global event queue, which determines what
304 to do next. Therefore, it's more complicated than either the
305 concurrent or the parallel (GUM) setup.
308 GUM iterates over incoming messages.
309 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
310 and sends out a fish whenever it has nothing to do; in-between
311 doing the actual reductions (shared code below) it processes the
312 incoming messages and deals with delayed operations
313 (see PendingFetches).
314 This is not the ugliest code you could imagine, but it's bloody close.
316 ------------------------------------------------------------------------ */
319 schedule (Capability *initialCapability, Task *task)
323 StgThreadReturnCode ret;
326 #elif defined(PARALLEL_HASKELL)
329 rtsBool receivedFinish = rtsFalse;
331 nat tp_size, sp_size; // stats only
336 #if defined(THREADED_RTS)
337 rtsBool first = rtsTrue;
340 cap = initialCapability;
342 // Pre-condition: this task owns initialCapability.
343 // The sched_mutex is *NOT* held
344 // NB. on return, we still hold a capability.
347 sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
348 task, initialCapability);
353 // -----------------------------------------------------------
354 // Scheduler loop starts here:
356 #if defined(PARALLEL_HASKELL)
357 #define TERMINATION_CONDITION (!receivedFinish)
359 #define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL)
361 #define TERMINATION_CONDITION rtsTrue
364 while (TERMINATION_CONDITION) {
367 /* Choose the processor with the next event */
368 CurrentProc = event->proc;
369 CurrentTSO = event->tso;
372 #if defined(THREADED_RTS)
374 // don't yield the first time, we want a chance to run this
375 // thread for a bit, even if there are others banging at the
378 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
380 // Yield the capability to higher-priority tasks if necessary.
381 yieldCapability(&cap, task);
385 #if defined(THREADED_RTS)
386 schedulePushWork(cap,task);
389 // Check whether we have re-entered the RTS from Haskell without
390 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
392 if (cap->in_haskell) {
393 errorBelch("schedule: re-entered unsafely.\n"
394 " Perhaps a 'foreign import unsafe' should be 'safe'?");
395 stg_exit(EXIT_FAILURE);
398 // The interruption / shutdown sequence.
400 // In order to cleanly shut down the runtime, we want to:
401 // * make sure that all main threads return to their callers
402 // with the state 'Interrupted'.
403 // * clean up all OS threads associated with the runtime
404 // * free all memory etc.
406 // So the sequence for ^C goes like this:
408 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
409 // arranges for some Capability to wake up
411 // * all threads in the system are halted, and the zombies are
412 // placed on the run queue for cleaning up. We acquire all
413 // the capabilities in order to delete the threads, this is
414 // done by scheduleDoGC() for convenience (because GC already
415 // needs to acquire all the capabilities). We can't kill
416 // threads involved in foreign calls.
418 // * sched_state := SCHED_INTERRUPTED
420 // * somebody calls shutdownHaskell(), which calls exitScheduler()
422 // * sched_state := SCHED_SHUTTING_DOWN
424 // * all workers exit when the run queue on their capability
425 // drains. All main threads will also exit when their TSO
426 // reaches the head of the run queue and they can return.
428 // * eventually all Capabilities will shut down, and the RTS can
431 // * We might be left with threads blocked in foreign calls,
432 // we should really attempt to kill these somehow (TODO);
434 switch (sched_state) {
437 case SCHED_INTERRUPTING:
438 IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTING"));
439 #if defined(THREADED_RTS)
440 discardSparksCap(cap);
442 /* scheduleDoGC() deletes all the threads */
443 cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
445 case SCHED_INTERRUPTED:
446 IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTED"));
448 case SCHED_SHUTTING_DOWN:
449 IF_DEBUG(scheduler, sched_belch("SCHED_SHUTTING_DOWN"));
450 // If we are a worker, just exit. If we're a bound thread
451 // then we will exit below when we've removed our TSO from
453 if (task->tso == NULL && emptyRunQueue(cap)) {
458 barf("sched_state: %d", sched_state);
461 #if defined(THREADED_RTS)
462 // If the run queue is empty, take a spark and turn it into a thread.
464 if (emptyRunQueue(cap)) {
466 spark = findSpark(cap);
469 sched_belch("turning spark of closure %p into a thread",
470 (StgClosure *)spark));
471 createSparkThread(cap,spark);
475 #endif // THREADED_RTS
477 scheduleStartSignalHandlers(cap);
479 // Only check the black holes here if we've nothing else to do.
480 // During normal execution, the black hole list only gets checked
481 // at GC time, to avoid repeatedly traversing this possibly long
482 // list each time around the scheduler.
483 if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
485 // Any threads that were woken up by other Capabilities get
486 // appended to our run queue.
487 if (!emptyWakeupQueue(cap)) {
488 ACQUIRE_LOCK(&cap->lock);
489 if (emptyRunQueue(cap)) {
490 cap->run_queue_hd = cap->wakeup_queue_hd;
491 cap->run_queue_tl = cap->wakeup_queue_tl;
493 cap->run_queue_tl->link = cap->wakeup_queue_hd;
494 cap->run_queue_tl = cap->wakeup_queue_tl;
496 cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
497 RELEASE_LOCK(&cap->lock);
500 scheduleCheckBlockedThreads(cap);
502 scheduleDetectDeadlock(cap,task);
503 #if defined(THREADED_RTS)
504 cap = task->cap; // reload cap, it might have changed
507 // Normally, the only way we can get here with no threads to
508 // run is if a keyboard interrupt is received during
509 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
510 // Additionally, it is not fatal for the
511 // threaded RTS to reach here with no threads to run.
513 // win32: might be here due to awaitEvent() being abandoned
514 // as a result of a console event having been delivered.
515 if ( emptyRunQueue(cap) ) {
516 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
517 ASSERT(sched_state >= SCHED_INTERRUPTING);
519 continue; // nothing to do
522 #if defined(PARALLEL_HASKELL)
523 scheduleSendPendingMessages();
524 if (emptyRunQueue(cap) && scheduleActivateSpark())
528 ASSERT(next_fish_to_send_at==0); // i.e. no delayed fishes left!
531 /* If we still have no work we need to send a FISH to get a spark
533 if (emptyRunQueue(cap)) {
534 if (!scheduleGetRemoteWork(&receivedFinish)) continue;
535 ASSERT(rtsFalse); // should not happen at the moment
537 // from here: non-empty run queue.
538 // TODO: merge above case with this, only one call processMessages() !
539 if (PacketsWaiting()) { /* process incoming messages, if
540 any pending... only in else
541 because getRemoteWork waits for
543 receivedFinish = processMessages();
548 scheduleProcessEvent(event);
552 // Get a thread to run
554 t = popRunQueue(cap);
556 #if defined(GRAN) || defined(PAR)
557 scheduleGranParReport(); // some kind of debuging output
559 // Sanity check the thread we're about to run. This can be
560 // expensive if there is lots of thread switching going on...
561 IF_DEBUG(sanity,checkTSO(t));
564 #if defined(THREADED_RTS)
565 // Check whether we can run this thread in the current task.
566 // If not, we have to pass our capability to the right task.
568 Task *bound = t->bound;
573 sched_belch("### Running thread %d in bound thread",
575 // yes, the Haskell thread is bound to the current native thread
578 sched_belch("### thread %d bound to another OS thread",
580 // no, bound to a different Haskell thread: pass to that thread
581 pushOnRunQueue(cap,t);
585 // The thread we want to run is unbound.
588 sched_belch("### this OS thread cannot run thread %d", t->id));
589 // no, the current native thread is bound to a different
590 // Haskell thread, so pass it to any worker thread
591 pushOnRunQueue(cap,t);
598 cap->r.rCurrentTSO = t;
600 /* context switches are initiated by the timer signal, unless
601 * the user specified "context switch as often as possible", with
604 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
605 && !emptyThreadQueues(cap)) {
611 IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
612 (long)t->id, whatNext_strs[t->what_next]));
614 #if defined(PROFILING)
615 startHeapProfTimer();
618 // ----------------------------------------------------------------------
619 // Run the current thread
621 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
622 ASSERT(t->cap == cap);
624 prev_what_next = t->what_next;
626 errno = t->saved_errno;
627 cap->in_haskell = rtsTrue;
631 recent_activity = ACTIVITY_YES;
633 switch (prev_what_next) {
637 /* Thread already finished, return to scheduler. */
638 ret = ThreadFinished;
644 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
645 cap = regTableToCapability(r);
650 case ThreadInterpret:
651 cap = interpretBCO(cap);
656 barf("schedule: invalid what_next field");
659 cap->in_haskell = rtsFalse;
661 // The TSO might have moved, eg. if it re-entered the RTS and a GC
662 // happened. So find the new location:
663 t = cap->r.rCurrentTSO;
665 // We have run some Haskell code: there might be blackhole-blocked
666 // threads to wake up now.
667 // Lock-free test here should be ok, we're just setting a flag.
668 if ( blackhole_queue != END_TSO_QUEUE ) {
669 blackholes_need_checking = rtsTrue;
672 // And save the current errno in this thread.
673 // XXX: possibly bogus for SMP because this thread might already
674 // be running again, see code below.
675 t->saved_errno = errno;
677 #if defined(THREADED_RTS)
678 // If ret is ThreadBlocked, and this Task is bound to the TSO that
679 // blocked, we are in limbo - the TSO is now owned by whatever it
680 // is blocked on, and may in fact already have been woken up,
681 // perhaps even on a different Capability. It may be the case
682 // that task->cap != cap. We better yield this Capability
683 // immediately and return to normality.
684 if (ret == ThreadBlocked) {
686 sched_belch("--<< thread %d (%s) stopped: blocked\n",
687 t->id, whatNext_strs[t->what_next]));
692 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
693 ASSERT(t->cap == cap);
695 // ----------------------------------------------------------------------
697 // Costs for the scheduler are assigned to CCS_SYSTEM
698 #if defined(PROFILING)
703 #if defined(THREADED_RTS)
704 IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
705 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
706 IF_DEBUG(scheduler,debugBelch("sched: "););
709 schedulePostRunThread();
711 ready_to_gc = rtsFalse;
715 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
719 scheduleHandleStackOverflow(cap,task,t);
723 if (scheduleHandleYield(cap, t, prev_what_next)) {
724 // shortcut for switching between compiler/interpreter:
730 scheduleHandleThreadBlocked(t);
734 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
735 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
739 barf("schedule: invalid thread return code %d", (int)ret);
742 if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
744 cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
746 } /* end of while() */
748 IF_PAR_DEBUG(verbose,
749 debugBelch("== Leaving schedule() after having received Finish\n"));
752 /* ----------------------------------------------------------------------------
753 * Setting up the scheduler loop
754 * ------------------------------------------------------------------------- */
757 schedulePreLoop(void)
760 /* set up first event to get things going */
761 /* ToDo: assign costs for system setup and init MainTSO ! */
762 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
764 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
767 debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n",
769 G_TSO(CurrentTSO, 5));
771 if (RtsFlags.GranFlags.Light) {
772 /* Save current time; GranSim Light only */
773 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
778 /* -----------------------------------------------------------------------------
781 * Push work to other Capabilities if we have some.
782 * -------------------------------------------------------------------------- */
784 #if defined(THREADED_RTS)
786 schedulePushWork(Capability *cap USED_IF_THREADS,
787 Task *task USED_IF_THREADS)
789 Capability *free_caps[n_capabilities], *cap0;
792 // migration can be turned off with +RTS -qg
793 if (!RtsFlags.ParFlags.migrate) return;
795 // Check whether we have more threads on our run queue, or sparks
796 // in our pool, that we could hand to another Capability.
797 if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
798 && sparkPoolSizeCap(cap) < 2) {
802 // First grab as many free Capabilities as we can.
803 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
804 cap0 = &capabilities[i];
805 if (cap != cap0 && tryGrabCapability(cap0,task)) {
806 if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
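807 // it already has some work, we just grabbed it at
808 // the wrong moment. Or maybe it's deadlocked! NOTE(review): the returning_tasks_hd test above reads cap, not the just-grabbed cap0 -- confirm which was intended.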
809 releaseCapability(cap0);
811 free_caps[n_free_caps++] = cap0;
816 // we now have n_free_caps free capabilities stashed in
817 // free_caps[]. Share our run queue equally with them. This is
818 // probably the simplest thing we could do; improvements we might
819 // want to do include:
821 // - giving high priority to moving relatively new threads, on
822 // the grounds that they haven't had time to build up a
823 // working set in the cache on this CPU/Capability.
825 // - giving low priority to moving long-lived threads
827 if (n_free_caps > 0) {
828 StgTSO *prev, *t, *next;
829 rtsBool pushed_to_all;
831 IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
834 pushed_to_all = rtsFalse;
836 if (cap->run_queue_hd != END_TSO_QUEUE) {
837 prev = cap->run_queue_hd;
839 prev->link = END_TSO_QUEUE;
840 for (; t != END_TSO_QUEUE; t = next) {
842 t->link = END_TSO_QUEUE;
843 if (t->what_next == ThreadRelocated
844 || t->bound == task) { // don't move my bound thread
847 } else if (i == n_free_caps) {
848 pushed_to_all = rtsTrue;
854 IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
855 appendToRunQueue(free_caps[i],t);
856 if (t->bound) { t->bound->cap = free_caps[i]; }
857 t->cap = free_caps[i];
861 cap->run_queue_tl = prev;
864 // If there are some free capabilities that we didn't push any
865 // threads to, then try to push a spark to each one.
866 if (!pushed_to_all) {
868 // i is the next free capability to push to
869 for (; i < n_free_caps; i++) {
870 if (emptySparkPoolCap(free_caps[i])) {
871 spark = findSpark(cap);
873 IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
874 newSpark(&(free_caps[i]->r), spark);
880 // release the capabilities
881 for (i = 0; i < n_free_caps; i++) {
882 task->cap = free_caps[i];
883 releaseCapability(free_caps[i]);
886 task->cap = cap; // reset to point to our Capability.
890 /* ----------------------------------------------------------------------------
891 * Start any pending signal handlers
892 * ------------------------------------------------------------------------- */
894 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
896 scheduleStartSignalHandlers(Capability *cap)
898 if (signals_pending()) { // safe outside the lock
899 startSignalHandlers(cap);
904 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
909 /* ----------------------------------------------------------------------------
910 * Check for blocked threads that can be woken up.
911 * ------------------------------------------------------------------------- */
914 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
916 #if !defined(THREADED_RTS)
918 // Check whether any waiting threads need to be woken up. If the
919 // run queue is empty, and there are no other tasks running, we
920 // can wait indefinitely for something to happen.
922 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
924 awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
930 /* ----------------------------------------------------------------------------
931 * Check for threads blocked on BLACKHOLEs that can be woken up
932 * ------------------------------------------------------------------------- */
934 scheduleCheckBlackHoles (Capability *cap)
936 if ( blackholes_need_checking ) // check without the lock first
938 ACQUIRE_LOCK(&sched_mutex);
939 if ( blackholes_need_checking ) {
940 checkBlackHoles(cap);
941 blackholes_need_checking = rtsFalse;
943 RELEASE_LOCK(&sched_mutex);
947 /* ----------------------------------------------------------------------------
948 * Detect deadlock conditions and attempt to resolve them.
949 * ------------------------------------------------------------------------- */
952 scheduleDetectDeadlock (Capability *cap, Task *task)
955 #if defined(PARALLEL_HASKELL)
956 // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
961 * Detect deadlock: when we have no threads to run, there are no
962 * threads blocked, waiting for I/O, or sleeping, and all the
963 * other tasks are waiting for work, we must have a deadlock of
966 if ( emptyThreadQueues(cap) )
968 #if defined(THREADED_RTS)
970 * In the threaded RTS, we only check for deadlock if there
971 * has been no activity in a complete timeslice. This means
972 * we won't eagerly start a full GC just because we don't have
973 * any threads to run currently.
975 if (recent_activity != ACTIVITY_INACTIVE) return;
978 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
980 // Garbage collection can release some new threads due to
981 // either (a) finalizers or (b) threads resurrected because
982 // they are unreachable and will therefore be sent an
983 // exception. Any threads thus released will be immediately
985 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/, GetRoots);
987 recent_activity = ACTIVITY_DONE_GC;
989 if ( !emptyRunQueue(cap) ) return;
991 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
992 /* If we have user-installed signal handlers, then wait
993 * for signals to arrive rather than bombing out with a
996 if ( anyUserHandlers() ) {
998 sched_belch("still deadlocked, waiting for signals..."));
1002 if (signals_pending()) {
1003 startSignalHandlers(cap);
1006 // either we have threads to run, or we were interrupted:
1007 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1011 #if !defined(THREADED_RTS)
1012 /* Probably a real deadlock. Send the current main thread the
1013 * Deadlock exception.
1016 switch (task->tso->why_blocked) {
1018 case BlockedOnBlackHole:
1019 case BlockedOnException:
1021 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
1024 barf("deadlock: main thread blocked in a strange way");
1032 /* ----------------------------------------------------------------------------
1033 * Process an event (GRAN only)
1034 * ------------------------------------------------------------------------- */
1038 scheduleProcessEvent(rtsEvent *event)
1042 if (RtsFlags.GranFlags.Light)
1043 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1045 /* adjust time based on time-stamp */
1046 if (event->time > CurrentTime[CurrentProc] &&
1047 event->evttype != ContinueThread)
1048 CurrentTime[CurrentProc] = event->time;
1050 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1051 if (!RtsFlags.GranFlags.Light)
1054 IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1056 /* main event dispatcher in GranSim */
1057 switch (event->evttype) {
1058 /* Should just be continuing execution */
1059 case ContinueThread:
1060 IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1061 /* ToDo: check assertion
1062 ASSERT(run_queue_hd != (StgTSO*)NULL &&
1063 run_queue_hd != END_TSO_QUEUE);
1065 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1066 if (!RtsFlags.GranFlags.DoAsyncFetch &&
1067 procStatus[CurrentProc]==Fetching) {
1068 debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1069 CurrentTSO->id, CurrentTSO, CurrentProc);
1072 /* Ignore ContinueThreads for completed threads */
1073 if (CurrentTSO->what_next == ThreadComplete) {
1074 debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
1075 CurrentTSO->id, CurrentTSO, CurrentProc);
1078 /* Ignore ContinueThreads for threads that are being migrated */
1079 if (PROCS(CurrentTSO)==Nowhere) {
1080 debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1081 CurrentTSO->id, CurrentTSO, CurrentProc);
1084 /* The thread should be at the beginning of the run queue */
1085 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
1086 debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1087 CurrentTSO->id, CurrentTSO, CurrentProc);
1088 break; // run the thread anyway
1091 new_event(proc, proc, CurrentTime[proc],
1093 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1095 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1096 break; // now actually run the thread; DaH Qu'vam yImuHbej
1099 do_the_fetchnode(event);
1100 goto next_thread; /* handle next event in event queue */
1103 do_the_globalblock(event);
1104 goto next_thread; /* handle next event in event queue */
1107 do_the_fetchreply(event);
1108 goto next_thread; /* handle next event in event queue */
1110 case UnblockThread: /* Move from the blocked queue to the tail of */
1111 do_the_unblock(event);
1112 goto next_thread; /* handle next event in event queue */
1114 case ResumeThread: /* Move from the blocked queue to the tail of */
1115 /* the runnable queue ( i.e. Qu' SImqa'lu') */
1116 event->tso->gran.blocktime +=
1117 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1118 do_the_startthread(event);
1119 goto next_thread; /* handle next event in event queue */
1122 do_the_startthread(event);
1123 goto next_thread; /* handle next event in event queue */
1126 do_the_movethread(event);
1127 goto next_thread; /* handle next event in event queue */
1130 do_the_movespark(event);
1131 goto next_thread; /* handle next event in event queue */
1134 do_the_findwork(event);
1135 goto next_thread; /* handle next event in event queue */
1138 barf("Illegal event type %u\n", event->evttype);
1141 /* This point was scheduler_loop in the old RTS */
1143 IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1145 TimeOfLastEvent = CurrentTime[CurrentProc];
1146 TimeOfNextEvent = get_time_of_next_event();
1147 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1148 // CurrentTSO = ThreadQueueHd;
1150 IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
1153 if (RtsFlags.GranFlags.Light)
1154 GranSimLight_leave_system(event, &ActiveTSO);
1156 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1159 debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1161 /* in a GranSim setup the TSO stays on the run queue */
1163 /* Take a thread from the run queue. */
1164 POP_RUN_QUEUE(t); // take_off_run_queue(t);
1167 debugBelch("GRAN: About to run current thread, which is\n");
1170 context_switch = 0; // turned on via GranYield, checking events and time slice
1173 DumpGranEvent(GR_SCHEDULE, t));
1175 procStatus[CurrentProc] = Busy;
1179 /* ----------------------------------------------------------------------------
1180 * Send pending messages (PARALLEL_HASKELL only)
 *
 * Takes no arguments. Two checks are visible in this excerpt:
 *  - under PAR, PendingFetches is compared against END_BF_QUEUE;
 *  - when RtsFlags.ParFlags.BufferTime is set (message buffering in use),
 *    message packets that have become too old are dealt with.
 * The statements executed inside each branch are not visible here.
1181 * ------------------------------------------------------------------------- */
1183 #if defined(PARALLEL_HASKELL)
1185 scheduleSendPendingMessages(void)
1191 # if defined(PAR) // global Mem.Mgmt., omit for now
1192 if (PendingFetches != END_BF_QUEUE) {
1197 if (RtsFlags.ParFlags.BufferTime) {
1198 // if we use message buffering, we must send away all message
1199 // packets which have become too old...
1205 /* ----------------------------------------------------------------------------
1206 * Activate spark threads (PARALLEL_HASKELL only)
 *
 * Precondition (asserted): the run queue is empty.
 *
 * A spark is only activated while advisory_thread_count is below
 * RtsFlags.ParFlags.maxThreads and the spark pool is non-empty
 * (pool->hd < pool->tl). In that case findSpark() picks a spark and
 * createThreadFromSpark() turns it into a TSO.
 *
 * Returns rtsTrue when a thread was generated. Returns rtsFalse when
 * findSpark() yields no spark, when createThreadFromSpark() yields
 * END_TSO_QUEUE, or when the thread limit is reached / the pool is empty.
 * The non-PAR variant is a dummy that returns rtsFalse.
1207 * ------------------------------------------------------------------------- */
1209 #if defined(PARALLEL_HASKELL)
1211 scheduleActivateSpark(void)
1214 ASSERT(emptyRunQueue());
1215 /* We get here if the run queue is empty and want some work.
1216 We try to turn a spark into a thread, and add it to the run queue,
1217 from where it will be picked up in the next iteration of the scheduler
1221 /* :-[ no local threads => look out for local sparks */
1222 /* the spark pool for the current PE */
1223 pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1224 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1225 pool->hd < pool->tl) {
1227 * ToDo: add GC code check that we really have enough heap afterwards!!
1229 * If we're here (no runnable threads) and we have pending
1230 * sparks, we must have a space problem. Get enough space
1231 * to turn one of those pending sparks into a
1235 spark = findSpark(rtsFalse); /* get a spark */
1236 if (spark != (rtsSpark) NULL) {
1237 tso = createThreadFromSpark(spark); /* turn the spark into a thread */
1238 IF_PAR_DEBUG(fish, // schedule,
1239 debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1240 tso->id, tso, advisory_thread_count));
1242 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1243 IF_PAR_DEBUG(fish, // schedule,
1244 debugBelch("==^^ failed to create thread from spark @ %lx\n",
1246 return rtsFalse; /* failed to generate a thread */
1247 } /* otherwise fall through & pick-up new tso */
1249 IF_PAR_DEBUG(fish, // schedule,
1250 debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
1251 spark_queue_len(pool)));
1252 return rtsFalse; /* failed to generate a thread */
1254 return rtsTrue; /* success in generating a thread */
1255 } else { /* no more threads permitted or pool empty */
1256 return rtsFalse; /* failed to generateThread */
1259 tso = NULL; // avoid compiler warning only
1260 return rtsFalse; /* dummy in non-PAR setup */
1263 #endif // PARALLEL_HASKELL
1265 /* ----------------------------------------------------------------------------
1266 * Get work from a remote node (PARALLEL_HASKELL only)
 *
 * receivedFinish is an out-parameter: it is assigned the result of
 * processMessages() wherever this function receives messages.
 * Precondition (asserted): the run queue is empty.
 *
 * When RtsFlags.ParFlags.BufferTime is set, sendImmediately() is called
 * for every PE in 1..nPEs before anything else.
 *
 * One build variant only performs a blocking receive via
 * processMessages(). The SPARKS variant sends a FISH, but only while
 * outstandingFishes < RtsFlags.ParFlags.maxFishes and
 * advisory_thread_count < RtsFlags.ParFlags.maxThreads, and not before
 * last_fish_arrived_at + delay (delay is RtsFlags.ParFlags.fishDelay).
 *
 * NOTE(review): the delay loop repeats while
 * `!*receivedFinish || now<next_fish_to_send_at`, i.e. it appears to end
 * only once *receivedFinish is set -- confirm that `||` is intended.
1267 * ------------------------------------------------------------------------- */
1269 #if defined(PARALLEL_HASKELL)
1271 scheduleGetRemoteWork(rtsBool *receivedFinish)
1273 ASSERT(emptyRunQueue());
1275 if (RtsFlags.ParFlags.BufferTime) {
1276 IF_PAR_DEBUG(verbose,
1277 debugBelch("...send all pending data,"));
1280 for (i=1; i<=nPEs; i++)
1281 sendImmediately(i); // send all messages away immediately
1285 //++EDEN++ idle() , i.e. send all buffers, wait for work
1286 // suppress fishing in EDEN... just look for incoming messages
1287 // (blocking receive)
1288 IF_PAR_DEBUG(verbose,
1289 debugBelch("...wait for incoming messages...\n"));
1290 *receivedFinish = processMessages(); // blocking receive...
1292 // and reenter scheduling loop after having received something
1293 // (return rtsFalse below)
1295 # else /* activate SPARKS machinery */
1296 /* We get here, if we have no work, tried to activate a local spark, but still
1297 have no work. We try to get a remote spark, by sending a FISH message.
1298 Thread migration should be added here, and triggered when a sequence of
1299 fishes returns without work. */
1300 delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1302 /* =8-[ no local sparks => look for work on other PEs */
1304 * We really have absolutely no work. Send out a fish
1305 * (there may be some out there already), and wait for
1306 * something to arrive. We clearly can't run any threads
1307 * until a SCHEDULE or RESUME arrives, and so that's what
1308 * we're hoping to see. (Of course, we still have to
1309 * respond to other types of messages.)
1311 rtsTime now = msTime() /*CURRENT_TIME*/;
1312 IF_PAR_DEBUG(verbose,
1313 debugBelch("-- now=%ld\n", now));
1314 IF_PAR_DEBUG(fish, // verbose,
1315 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1316 (last_fish_arrived_at!=0 &&
1317 last_fish_arrived_at+delay > now)) {
1318 debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1319 now, last_fish_arrived_at+delay,
1320 last_fish_arrived_at,
1324 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1325 advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1326 if (last_fish_arrived_at==0 ||
1327 (last_fish_arrived_at+delay <= now)) { // send FISH now!
1328 /* outstandingFishes is set in sendFish, processFish;
1329 avoid flooding system with fishes via delay */
1330 next_fish_to_send_at = 0;
1332 /* ToDo: this should be done in the main scheduling loop to avoid the
1333 busy wait here; not so bad if fish delay is very small */
1334 int iq = 0; // DEBUGGING -- HWL
1335 next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
1336 /* send a fish when ready, but process messages that arrive in the meantime */
1338 if (PacketsWaiting()) {
1340 *receivedFinish = processMessages();
1343 } while (!*receivedFinish || now<next_fish_to_send_at);
1344 // JB: This means the fish could become obsolete, if we receive
1345 // work. Better check for work again?
1346 // last line: while (!receivedFinish || !haveWork || now<...)
1347 // next line: if (receivedFinish || haveWork )
1349 if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1350 return rtsFalse; // NB: this will leave scheduler loop
1351 // immediately after return!
1353 IF_PAR_DEBUG(fish, // verbose,
1354 debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1358 // JB: IMHO, this should all be hidden inside sendFish(...)
1360 sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
1363 // Global statistics: count no. of fishes
1364 if (RtsFlags.ParFlags.ParStats.Global &&
1365 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1366 globalParStats.tot_fish_mess++;
1370 /* delayed fishes must have been sent by now! */
1371 next_fish_to_send_at = 0;
1374 *receivedFinish = processMessages();
1375 # endif /* SPARKS */
1378 /* NB: this function always returns rtsFalse, meaning the scheduler
1379 loop continues with the next iteration;
1381 return code means success in finding work; we enter this function
1382 if there is no local work, thus have to send a fish which takes
1383 time until it arrives with work; in the meantime we should process
1384 messages in the main loop;
1387 #endif // PARALLEL_HASKELL
1389 /* ----------------------------------------------------------------------------
1390 * PAR/GRAN: Report stats & debugging info(?)
 *
 * Precondition (asserted): run_queue_hd is not END_TSO_QUEUE.
 *
 * Pops a thread t off the run queue, prints the number of runnable
 * threads and sparks on the current processor as debug output, and
 * writes a GR_SCHEDULE event through DumpRawGranEvent() when all of
 * these hold: RtsFlags.ParFlags.ParStats.Full is set, t was generated
 * from a spark (t->par.sparkname != 0), and either emitSchedule is set
 * or t differs from LastTSO. emitSchedule is reset to rtsFalse after
 * the event has been written.
1391 * ------------------------------------------------------------------------- */
1393 #if defined(PAR) || defined(GRAN)
1395 scheduleGranParReport(void)
1397 ASSERT(run_queue_hd != END_TSO_QUEUE);
1399 /* Take a thread from the run queue, if we have work */
1400 POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
1402 /* If this TSO has got its outport closed in the meantime,
1403 * it mustn't be run. Instead, we have to clean it up as if it was finished.
1404 * It has to be marked as TH_DEAD for this purpose.
1405 * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1407 JB: TODO: investigate whether state change field could be nuked
1408 entirely and replaced by the normal tso state (whatnext
1409 field). All we want to do is to kill tsos from outside.
1412 /* ToDo: write something to the log-file
1413 if (RTSflags.ParFlags.granSimStats && !sameThread)
1414 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1418 /* the spark pool for the current PE */
1419 pool = &(cap.r.rSparks); // cap = (old) MainCap
1422 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1423 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1426 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1427 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1429 if (RtsFlags.ParFlags.ParStats.Full &&
1430 (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1431 (emitSchedule || // forced emit
1432 (t && LastTSO && t->id != LastTSO->id))) {
1434 we are running a different TSO, so write a schedule event to log file
1435 NB: If we use fair scheduling we also have to write a deschedule
1436 event for LastTSO; with unfair scheduling we know that the
1437 previous tso has blocked whenever we switch to another tso, so
1438 we don't need it in GUM for now
1440 IF_PAR_DEBUG(fish, // schedule,
1441 debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1443 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1444 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1445 emitSchedule = rtsFalse;
1450 /* ----------------------------------------------------------------------------
1451 * After running a thread...
 *
 * Takes no arguments. Statements visible in this excerpt:
 *  - TimeOfLastYield is set to CURRENT_TIME;
 *  - when GRAN, PAR or EDEN is defined, statistics are gathered: the
 *    tot_heapover, tot_stackover and tot_yields counters of
 *    globalGranStats / globalParStats are incremented, and GRAN debug
 *    builds dump a GR_DESCHEDULE event for t;
 *  - for a blocked thread under GRAN, ContinueThread events for t are
 *    pruned and procStatus[CurrentProc] may be set to Idle;
 *  - an unknown return code ends in barf().
 * The switch header and several case labels are not visible here.
1452 * ------------------------------------------------------------------------- */
1455 schedulePostRunThread(void)
1458 /* HACK 675: if the last thread didn't yield, make sure to print a
1459 SCHEDULE event to the log file when StgRunning the next thread, even
1460 if it is the same one as before */
1462 TimeOfLastYield = CURRENT_TIME;
1465 /* some statistics gathering in the parallel case */
1467 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1471 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1472 globalGranStats.tot_heapover++;
1474 globalParStats.tot_heapover++;
1481 DumpGranEvent(GR_DESCHEDULE, t));
1482 globalGranStats.tot_stackover++;
1485 // DumpGranEvent(GR_DESCHEDULE, t);
1486 globalParStats.tot_stackover++;
1490 case ThreadYielding:
1493 DumpGranEvent(GR_DESCHEDULE, t));
1494 globalGranStats.tot_yields++;
1497 // DumpGranEvent(GR_DESCHEDULE, t);
1498 globalParStats.tot_yields++;
1505 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1506 t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
1507 (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1508 if (t->block_info.closure!=(StgClosure*)NULL)
1509 print_bq(t->block_info.closure);
1512 // ??? needed; should emit block before
1514 DumpGranEvent(GR_DESCHEDULE, t));
1515 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1518 ASSERT(procStatus[CurrentProc]==Busy ||
1519 ((procStatus[CurrentProc]==Fetching) &&
1520 (t->block_info.closure!=(StgClosure*)NULL)));
1521 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1522 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1523 procStatus[CurrentProc]==Fetching))
1524 procStatus[CurrentProc] = Idle;
1527 //++PAR++ blockThread() writes the event (change?)
1531 case ThreadFinished:
1535 barf("parGlobalStats: unknown return code");
1541 /* -----------------------------------------------------------------------------
1542 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
 *
 * cap is the Capability the thread was running on; t is the thread.
 *
 * Large-block path: if cap->r.rHpAlloc exceeds BLOCK_SIZE the thread
 * asked for more than one block. Provided the current nursery block
 * still has a successor (or the nursery consists of a single block), a
 * group of `blocks` blocks is obtained from allocGroup(), linked into
 * the nursery list in front of the current nursery block, every
 * sub-block is initialised as a nursery block, and the group becomes
 * cap->r.rCurrentNursery. t is then pushed back on the run queue and
 * rtsFalse is returned, meaning no GC is needed.
 *
 * Otherwise t is pushed back on the run queue and the actual GC is
 * left to the end of the loop in schedule(); the value returned on
 * that path is not visible in this excerpt.
1543 * -------------------------------------------------------------------------- */
1546 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1548 // did the task ask for a large block?
1549 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1550 // if so, get one and push it on the front of the nursery.
1554 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1557 debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1558 (long)t->id, whatNext_strs[t->what_next], blocks));
1560 // don't do this if the nursery is (nearly) full, we'll GC first.
1561 if (cap->r.rCurrentNursery->link != NULL ||
1562 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1563 // if the nursery has only one block.
1566 bd = allocGroup( blocks );
1568 cap->r.rNursery->n_blocks += blocks;
1570 // link the new group into the list
1571 bd->link = cap->r.rCurrentNursery;
1572 bd->u.back = cap->r.rCurrentNursery->u.back;
1573 if (cap->r.rCurrentNursery->u.back != NULL) {
1574 cap->r.rCurrentNursery->u.back->link = bd;
1576 #if !defined(THREADED_RTS)
1577 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1578 g0s0 == cap->r.rNursery);
1580 cap->r.rNursery->blocks = bd;
1582 cap->r.rCurrentNursery->u.back = bd;
1584 // initialise it as a nursery block. We initialise the
1585 // step, gen_no, and flags field of *every* sub-block in
1586 // this large block, because this is easier than making
1587 // sure that we always find the block head of a large
1588 // block whenever we call Bdescr() (eg. evacuate() and
1589 // isAlive() in the GC would both have to do this, at
1593 for (x = bd; x < bd + blocks; x++) {
1594 x->step = cap->r.rNursery;
1600 // This assert can be a killer if the app is doing lots
1601 // of large block allocations.
1602 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1604 // now update the nursery to point to the new block
1605 cap->r.rCurrentNursery = bd;
1607 // we might be unlucky and have another thread get on the
1608 // run queue before us and steal the large block, but in that
1609 // case the thread will just end up requesting another large
1611 pushOnRunQueue(cap,t);
1612 return rtsFalse; /* not actually GC'ing */
1617 debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
1618 (long)t->id, whatNext_strs[t->what_next]));
1620 ASSERT(!is_on_queue(t,CurrentProc));
1621 #elif defined(PARALLEL_HASKELL)
1622 /* Currently we emit a DESCHEDULE event before GC in GUM.
1623 ToDo: either add separate event to distinguish SYSTEM time from rest
1624 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1625 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1626 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1627 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1628 emitSchedule = rtsTrue;
1632 pushOnRunQueue(cap,t);
1634 /* actual GC is done at the end of the while loop in schedule() */
1637 /* -----------------------------------------------------------------------------
1638 * Handle a thread that returned to the scheduler with ThreadStackOverflow
 *
 * cap is the current Capability, task the Task running the scheduler,
 * and t the thread whose stack overflowed.
 *
 * threadStackOverflow() enlarges the stack and returns the TSO to use
 * from now on (new_t), which may differ from t. If task->tso still
 * refers to t it is updated (the assignment itself is not visible in
 * this excerpt). new_t is then pushed onto the run queue.
1639 * -------------------------------------------------------------------------- */
1642 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1644 IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n",
1645 (long)t->id, whatNext_strs[t->what_next]));
1646 /* just adjust the stack for this thread, then pop it back
1650 /* enlarge the stack */
1651 StgTSO *new_t = threadStackOverflow(cap, t);
1653 /* The TSO attached to this Task may have moved, so update the
1656 if (task->tso == t) {
1659 pushOnRunQueue(cap,new_t);
1663 /* -----------------------------------------------------------------------------
1664 * Handle a thread that returned to the scheduler with ThreadYielding
 *
 * cap is the current Capability, t the thread that yielded, and
 * prev_what_next the value t->what_next had before the thread ran.
 *
 * If t->what_next differs from prev_what_next the thread only stopped
 * to switch evaluators; that case takes a shortcut that avoids stack
 * squeezing (the shortcut's body is not visible in this excerpt).
 * Otherwise t is put back on the run queue with addToRunQueue().
 * t->link is asserted to be END_TSO_QUEUE beforehand.
 *
 * Under GRAN a ContinueThread event is additionally created with
 * new_event() so that the thread is actually processed.
1665 * -------------------------------------------------------------------------- */
1668 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1670 // Reset the context switch flag. We don't do this just before
1671 // running the thread, because that would mean we would lose ticks
1672 // during GC, which can lead to unfair scheduling (a thread hogs
1673 // the CPU because the tick always arrives during GC). This way
1674 // penalises threads that do a lot of allocation, but that seems
1675 // better than the alternative.
1678 /* put the thread back on the run queue. Then, if we're ready to
1679 * GC, check whether this is the last task to stop. If so, wake
1680 * up the GC thread. getThread will block during a GC until the
1684 if (t->what_next != prev_what_next) {
1685 debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n",
1686 (long)t->id, whatNext_strs[t->what_next]);
1688 debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1689 (long)t->id, whatNext_strs[t->what_next]);
1694 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1696 ASSERT(t->link == END_TSO_QUEUE);
1698 // Shortcut if we're just switching evaluators: don't bother
1699 // doing stack squeezing (which can be expensive), just run the
1701 if (t->what_next != prev_what_next) {
1706 ASSERT(!is_on_queue(t,CurrentProc));
1709 //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1710 checkThreadQsSanity(rtsTrue));
1714 addToRunQueue(cap,t);
1717 /* add a ContinueThread event to actually process the thread */
1718 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1720 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1722 debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1729 /* -----------------------------------------------------------------------------
1730 * Handle a thread that returned to the scheduler with ThreadBlocked
 *
 * t is the thread that blocked.
 *
 * In the standard RTS nothing needs to be done: the thread has already
 * tidied up its stack and placed itself on the queue it belongs on.
 * Without THREADED_RTS, t->why_blocked is asserted to differ from
 * NotBlocked; with THREADED_RTS that need not hold because another
 * party may already have woken the thread.
 *
 * Under GRAN, ContinueThread events for t are pruned and
 * procStatus[CurrentProc] becomes Idle when the local run queue is
 * empty, unless the processor is Fetching and DoAsyncFetch is off.
 * In the parallel build emitSchedule is set to rtsTrue so that the
 * next scheduling decision gets logged.
1731 * -------------------------------------------------------------------------- */
1734 scheduleHandleThreadBlocked( StgTSO *t
1735 #if !defined(GRAN) && !defined(DEBUG)
1742 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
1743 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1744 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1746 // ??? needed; should emit block before
1748 DumpGranEvent(GR_DESCHEDULE, t));
1749 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1752 ASSERT(procStatus[CurrentProc]==Busy ||
1753 ((procStatus[CurrentProc]==Fetching) &&
1754 (t->block_info.closure!=(StgClosure*)NULL)));
1755 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1756 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1757 procStatus[CurrentProc]==Fetching))
1758 procStatus[CurrentProc] = Idle;
1762 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
1763 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1766 if (t->block_info.closure!=(StgClosure*)NULL)
1767 print_bq(t->block_info.closure));
1769 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1772 /* whatever we schedule next, we must log that schedule */
1773 emitSchedule = rtsTrue;
1777 // We don't need to do anything. The thread is blocked, and it
1778 // has tidied up its stack and placed itself on whatever queue
1779 // it needs to be on.
1781 #if !defined(THREADED_RTS)
1782 ASSERT(t->why_blocked != NotBlocked);
1783 // This might not be true under THREADED_RTS: we don't have
1784 // exclusive access to this TSO, so someone might have
1785 // woken it up by now. This actually happens: try
1786 // conc023 +RTS -N2.
1790 debugBelch("--<< thread %d (%s) stopped: ",
1791 t->id, whatNext_strs[t->what_next]);
1792 printThreadBlockage(t);
1795 /* Only for dumping event to log file
1796 ToDo: do I need this in GranSim, too?
1802 /* -----------------------------------------------------------------------------
1803 * Handle a thread that returned to the scheduler with ThreadFinished
 *
 * cap is the current Capability, task the Task running the scheduler,
 * and t the thread that finished.
 *
 * GRAN / PARALLEL_HASKELL builds first do their own clean-up
 * (endThread(), advisory_thread_count--, end-event logging).
 *
 * If t->bound is not this task: without THREADED_RTS the thread is
 * appended to the run queue again so that it (and its return value)
 * stays reachable until the stack has unwound to its owner; with
 * THREADED_RTS this situation is a fatal error (barf).
 *
 * Otherwise task->tso is asserted to be t and the outcome is recorded:
 *  - t->what_next == ThreadComplete: *task->ret receives
 *    task->tso->sp[1] and task->stat becomes Success;
 *  - any other state: *task->ret receives NULL and task->stat becomes
 *    Interrupted when sched_state >= SCHED_INTERRUPTING, else Killed.
 * The thread label is removed and rtsTrue is returned, which tells
 * schedule() to return. Return values on the other paths are not
 * visible in this excerpt.
1804 * -------------------------------------------------------------------------- */
1807 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1809 /* Need to check whether this was a main thread, and if so,
1810 * return with the return value.
1812 * We also end up here if the thread kills itself with an
1813 * uncaught exception, see Exception.cmm.
1815 IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n",
1816 t->id, whatNext_strs[t->what_next]));
1819 endThread(t, CurrentProc); // clean-up the thread
1820 #elif defined(PARALLEL_HASKELL)
1821 /* For now all are advisory -- HWL */
1822 //if(t->priority==AdvisoryPriority) ??
1823 advisory_thread_count--; // JB: Caution with this counter, buggy!
1826 if(t->dist.priority==RevalPriority)
1830 # if defined(EDENOLD)
1831 // the thread could still have an outport... (BUG)
1832 if (t->eden.outport != -1) {
1833 // delete the outport for the tso which has finished...
1834 IF_PAR_DEBUG(eden_ports,
1835 debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1836 t->eden.outport, t->id));
1839 // thread still in the process (HEAVY BUG! since outport has just been closed...)
1840 if (t->eden.epid != -1) {
1841 IF_PAR_DEBUG(eden_ports,
1842 debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1843 t->id, t->eden.epid));
1844 removeTSOfromProcess(t);
1849 if (RtsFlags.ParFlags.ParStats.Full &&
1850 !RtsFlags.ParFlags.ParStats.Suppressed)
1851 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1853 // t->par only contains statistics: left out for now...
1855 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1856 t->id,t,t->par.sparkname));
1858 #endif // PARALLEL_HASKELL
1861 // Check whether the thread that just completed was a bound
1862 // thread, and if so return with the result.
1864 // There is an assumption here that all thread completion goes
1865 // through this point; we need to make sure that if a thread
1866 // ends up in the ThreadKilled state, that it stays on the run
1867 // queue so it can be dealt with here.
1872 if (t->bound != task) {
1873 #if !defined(THREADED_RTS)
1874 // Must be a bound thread that is not the topmost one. Leave
1875 // it on the run queue until the stack has unwound to the
1876 // point where we can deal with this. Leaving it on the run
1877 // queue also ensures that the garbage collector knows about
1878 // this thread and its return value (it gets dropped from the
1879 // all_threads list so there's no other way to find it).
1880 appendToRunQueue(cap,t);
1883 // this cannot happen in the threaded RTS, because a
1884 // bound thread can only be run by the appropriate Task.
1885 barf("finished bound thread that isn't mine");
1889 ASSERT(task->tso == t);
1891 if (t->what_next == ThreadComplete) {
1893 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1894 *(task->ret) = (StgClosure *)task->tso->sp[1];
1896 task->stat = Success;
1899 *(task->ret) = NULL;
1901 if (sched_state >= SCHED_INTERRUPTING) {
1902 task->stat = Interrupted;
1904 task->stat = Killed;
1908 removeThreadLabel((StgWord)task->tso->id);
1910 return rtsTrue; // tells schedule() to return
1916 /* -----------------------------------------------------------------------------
1917 * Perform a heap census, if PROFILING
 *
 * ready_to_gc tells whether the scheduler is about to garbage collect;
 * it is only consulted when PROFILING is defined.
 *
 * With PROFILING, a census is taken when performHeapProfile is set, or
 * when RtsFlags.ProfFlags.profileInterval is 0, heap profiling is
 * enabled and ready_to_gc holds. In that case black holes are checked
 * on MainCapability, GarbageCollect(GetRoots, rtsTrue) runs, the census
 * is performed, performHeapProfile is cleared, and rtsTrue is returned
 * to signal that a GC has already happened. The value returned when no
 * census is taken is not visible in this excerpt.
1918 * -------------------------------------------------------------------------- */
1921 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1923 #if defined(PROFILING)
1924 // When we have +RTS -i0 and we're heap profiling, do a census at
1925 // every GC. This lets us get repeatable runs for debugging.
1926 if (performHeapProfile ||
1927 (RtsFlags.ProfFlags.profileInterval==0 &&
1928 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1930 // checking black holes is necessary before GC, otherwise
1931 // there may be threads that are unreachable except by the
1932 // blackhole queue, which the GC will consider to be
1934 scheduleCheckBlackHoles(&MainCapability);
1936 IF_DEBUG(scheduler, sched_belch("garbage collecting before heap census"));
1937 GarbageCollect(GetRoots, rtsTrue);
1939 IF_DEBUG(scheduler, sched_belch("performing heap census"));
1942 performHeapProfile = rtsFalse;
1943 return rtsTrue; // true <=> we already GC'd
1949 /* -----------------------------------------------------------------------------
1950 * Perform a garbage collection if necessary
 *
 * cap is the Capability held by the caller, task the calling Task
 * (marked USED_IF_THREADS), and force_major / get_roots are handed to
 * GarbageCollect().
 *
 * Sequence visible in this excerpt:
 *  1. waiting_for_gc is claimed with cas(); when another task is already
 *     trying to GC, this one yields its capability until waiting_for_gc
 *     is clear and returns cap without collecting.
 *  2. Every other capability is acquired with waitForReturnCapability();
 *     waiting_for_gc is then reset.
 *  3. All threads are scanned: a non-blocked thread whose transaction
 *     nest fails stmValidateNestOfTransactions() is stripped back to its
 *     ATOMICALLY_FRAME via raiseAsync_().
 *  4. Black holes are checked (if cap is non-NULL).
 *  5. If sched_state >= SCHED_INTERRUPTING, all threads are deleted and
 *     sched_state becomes SCHED_INTERRUPTED.
 *  6. GarbageCollect(get_roots, force_major) runs.
 *  7. With THREADED_RTS the other capabilities are released again.
 *  8. Under GRAN a ContinueThread event is created for the current thread.
 * The preprocessor guards around steps 1-2 are not visible here;
 * presumably they are THREADED_RTS-only -- TODO confirm.
1951 * -------------------------------------------------------------------------- */
1954 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
1955 rtsBool force_major, void (*get_roots)(evac_fn))
1959 static volatile StgWord waiting_for_gc;
1960 rtsBool was_waiting;
1965 // In order to GC, there must be no threads running Haskell code.
1966 // Therefore, the GC thread needs to hold *all* the capabilities,
1967 // and release them after the GC has completed.
1969 // This seems to be the simplest way: previous attempts involved
1970 // making all the threads with capabilities give up their
1971 // capabilities and sleep except for the *last* one, which
1972 // actually did the GC. But it's quite hard to arrange for all
1973 // the other tasks to sleep and stay asleep.
1976 was_waiting = cas(&waiting_for_gc, 0, 1);
1979 IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1980 if (cap) yieldCapability(&cap,task);
1981 } while (waiting_for_gc);
1982 return cap; // NOTE: task->cap might have changed here
1985 for (i=0; i < n_capabilities; i++) {
1986 IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1987 if (cap != &capabilities[i]) {
1988 Capability *pcap = &capabilities[i];
1989 // we better hope this task doesn't get migrated to
1990 // another Capability while we're waiting for this one.
1991 // It won't, because load balancing happens while we have
1992 // all the Capabilities, but even so it's a slightly
1993 // unsavoury invariant.
1996 waitForReturnCapability(&pcap, task);
1997 if (pcap != &capabilities[i]) {
1998 barf("scheduleDoGC: got the wrong capability");
2003 waiting_for_gc = rtsFalse;
2006 /* Kick any transactions which are invalid back to their
2007 * atomically frames. When next scheduled they will try to
2008 * commit, this commit will fail and they will retry.
2013 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2014 if (t->what_next == ThreadRelocated) {
2017 next = t->global_link;
2018 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2019 if (!stmValidateNestOfTransactions (t -> trec)) {
2020 IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
2022 // strip the stack back to the
2023 // ATOMICALLY_FRAME, aborting the (nested)
2024 // transaction, and saving the stack of any
2025 // partially-evaluated thunks on the heap.
2026 raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
2029 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2037 // so this happens periodically:
2038 if (cap) scheduleCheckBlackHoles(cap);
2040 IF_DEBUG(scheduler, printAllThreads());
2043 * We now have all the capabilities; if we're in an interrupting
2044 * state, then we should take the opportunity to delete all the
2045 * threads in the system.
2047 if (sched_state >= SCHED_INTERRUPTING) {
2048 deleteAllThreads(&capabilities[0]);
2049 sched_state = SCHED_INTERRUPTED;
2052 /* everybody back, start the GC.
2053 * Could do it in this thread, or signal a condition var
2054 * to do it in another thread. Either way, we need to
2055 * broadcast on gc_pending_cond afterward.
2057 #if defined(THREADED_RTS)
2058 IF_DEBUG(scheduler,sched_belch("doing GC"));
2060 GarbageCollect(get_roots, force_major);
2062 #if defined(THREADED_RTS)
2063 // release our stash of capabilities.
2064 for (i = 0; i < n_capabilities; i++) {
2065 if (cap != &capabilities[i]) {
2066 task->cap = &capabilities[i];
2067 releaseCapability(&capabilities[i]);
2078 /* add a ContinueThread event to continue execution of current thread */
2079 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2081 t, (StgClosure*)NULL, (rtsSpark*)NULL);
2083 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2091 /* ---------------------------------------------------------------------------
2092 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2093 * used by Control.Concurrent for error checking.
 *
 * Takes no arguments. The answer is selected at compile time by
 * whether THREADED_RTS is defined; the values returned in each branch
 * are not visible in this excerpt.
2094 * ------------------------------------------------------------------------- */
2097 rtsSupportsBoundThreads(void)
2099 #if defined(THREADED_RTS)
2106 /* ---------------------------------------------------------------------------
2107 * isThreadBound(tso): check whether tso is bound to an OS thread.
 *
 * With THREADED_RTS the result is true exactly when tso->bound is not
 * NULL. tso is marked USED_IF_THREADS, i.e. it is not consulted in the
 * non-threaded build; the value returned there is not visible in this
 * excerpt.
2108 * ------------------------------------------------------------------------- */
2111 isThreadBound(StgTSO* tso USED_IF_THREADS)
2113 #if defined(THREADED_RTS)
2114 return (tso->bound != NULL);
2119 /* ---------------------------------------------------------------------------
2120 * Singleton fork(). Do not copy any running threads.
 *
 * entry is a stable pointer to the action the child runs through
 * rts_evalStableIO().
 *
 * FORKPROCESS_PRIMOP_SUPPORTED is defined on every host except
 * mingw32; without it the function ends in barf().
 *
 * With THREADED_RTS, forking is refused when RtsFlags.ParFlags.nNodes
 * is greater than 1 (error message, then stg_exit(EXIT_FAILURE)).
 *
 * Parent (non-zero pid): just returns the pid.
 *
 * Child: deletes every Haskell thread on all_threads with
 * deleteThread_(), empties the capability's run queue, clears
 * cap->suspended_ccalling_tasks and all_threads, walks all_tasks under
 * sched_mutex to deal with every Task other than cap->running_task
 * (the per-task action is not visible in this excerpt), and with
 * THREADED_RTS clears the spare-worker and returning-task lists. It
 * then runs entry, checks the scheduler status, and terminates via
 * hs_exit() followed by stg_exit(EXIT_SUCCESS).
2121 * ------------------------------------------------------------------------- */
2123 #if !defined(mingw32_HOST_OS)
2124 #define FORKPROCESS_PRIMOP_SUPPORTED
2127 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2129 deleteThread_(Capability *cap, StgTSO *tso);
2132 forkProcess(HsStablePtr *entry
2133 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2138 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2144 #if defined(THREADED_RTS)
2145 if (RtsFlags.ParFlags.nNodes > 1) {
2146 errorBelch("forking not supported with +RTS -N<n> greater than 1");
2147 stg_exit(EXIT_FAILURE);
2151 IF_DEBUG(scheduler,sched_belch("forking!"));
2153 // ToDo: for SMP, we should probably acquire *all* the capabilities
2158 if (pid) { // parent
2160 // just return the pid
2166 // Now, all OS threads except the thread that forked are
2167 // stopped. We need to stop all Haskell threads, including
2168 // those involved in foreign calls. Also we need to delete
2169 // all Tasks, because they correspond to OS threads that are
2172 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2173 if (t->what_next == ThreadRelocated) {
2176 next = t->global_link;
2177 // don't allow threads to catch the ThreadKilled
2178 // exception, but we do want to raiseAsync() because these
2179 // threads may be evaluating thunks that we need later.
2180 deleteThread_(cap,t);
2184 // Empty the run queue. It seems tempting to let all the
2185 // killed threads stay on the run queue as zombies to be
2186 // cleaned up later, but some of them correspond to bound
2187 // threads for which the corresponding Task does not exist.
2188 cap->run_queue_hd = END_TSO_QUEUE;
2189 cap->run_queue_tl = END_TSO_QUEUE;
2191 // Any suspended C-calling Tasks are no more, their OS threads
2193 cap->suspended_ccalling_tasks = NULL;
2195 // Empty the all_threads list. Otherwise, the garbage
2196 // collector may attempt to resurrect some of these threads.
2197 all_threads = END_TSO_QUEUE;
2199 // Wipe the task list, except the current Task.
2200 ACQUIRE_LOCK(&sched_mutex);
2201 for (task = all_tasks; task != NULL; task=task->all_link) {
2202 if (task != cap->running_task) {
2206 RELEASE_LOCK(&sched_mutex);
2208 #if defined(THREADED_RTS)
2209 // Wipe our spare workers list, they no longer exist. New
2210 // workers will be created if necessary.
2211 cap->spare_workers = NULL;
2212 cap->returning_tasks_hd = NULL;
2213 cap->returning_tasks_tl = NULL;
2216 cap = rts_evalStableIO(cap, entry, NULL); // run the action
2217 rts_checkSchedStatus("forkProcess",cap);
2220 hs_exit(); // clean up and exit
2221 stg_exit(EXIT_SUCCESS);
2223 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2224 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2229 /* ---------------------------------------------------------------------------
2230 * Delete all the threads in the system
 *
 * cap is the Capability passed on to deleteThread().
 *
 * Walks all_threads along global_link and calls deleteThread() on each
 * entry; entries whose what_next is ThreadRelocated are special-cased
 * (that handling is not visible in this excerpt). The run queue is
 * deliberately left alone, because it keeps the killed threads --
 * including the main thread(s) -- alive for the scheduler loop.
 *
 * Without THREADED_RTS, blocked_queue_hd and sleeping_queue are
 * asserted to be END_TSO_QUEUE afterwards.
2231 * ------------------------------------------------------------------------- */
2234 deleteAllThreads ( Capability *cap )
2237 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
2238 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2239 if (t->what_next == ThreadRelocated) {
2242 next = t->global_link;
2243 deleteThread(cap,t);
2247 // The run queue now contains a bunch of ThreadKilled threads. We
2248 // must not throw these away: the main thread(s) will be in there
2249 // somewhere, and the main scheduler loop has to deal with it.
2250 // Also, the run queue is the only thing keeping these threads from
2251 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2253 #if !defined(THREADED_RTS)
2254 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2255 ASSERT(sleeping_queue == END_TSO_QUEUE);
2259 /* -----------------------------------------------------------------------------
2260 Managing the suspended_ccalling_tasks list.
2261 Locks required: sched_mutex
2262 -------------------------------------------------------------------------- */
/* Push task onto the front of cap->suspended_ccalling_tasks.
 *
 * The task must be unlinked on entry (next and prev both NULL).  The old
 * head, if any, gets its prev pointer set to task, and task becomes the
 * new head.
 *
 * NOTE(review): the banner above says sched_mutex is required, but the
 * visible caller suspendThread holds cap->lock around this call; confirm
 * which lock actually protects the list.
 */
2265 suspendTask (Capability *cap, Task *task)
2267 ASSERT(task->next == NULL && task->prev == NULL);
2268 task->next = cap->suspended_ccalling_tasks;
2270 if (cap->suspended_ccalling_tasks) {
2271 cap->suspended_ccalling_tasks->prev = task;
2273 cap->suspended_ccalling_tasks = task;
/* Unlink task from cap->suspended_ccalling_tasks.
 *
 * Either the predecessor is re-pointed at the successor, or the task is
 * asserted to be the list head and the head is advanced; the successor
 * then has its prev pointer re-pointed, and finally both link fields of
 * task are cleared.  The conditionals guarding these steps are not
 * visible in this extract.
 */
2277 recoverSuspendedTask (Capability *cap, Task *task)
2280 task->prev->next = task->next;
2282 ASSERT(cap->suspended_ccalling_tasks == task);
2283 cap->suspended_ccalling_tasks = task->next;
2286 task->next->prev = task->prev;
2288 task->next = task->prev = NULL;
2291 /* ---------------------------------------------------------------------------
2292 * Suspending & resuming Haskell threads.
2294 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2295 * its capability before calling the C function. This allows another
2296 * task to pick up the capability and carry on running Haskell
2297 * threads. It also means that if the C call blocks, it won't lock
2300 * The Haskell thread making the C call is put to sleep for the
2301 * duration of the call, on the suspended_ccalling_tasks list. We
2302 * give out a token to the task, which it can use to resume the thread
2303 * on return from the C function.
2304 * ------------------------------------------------------------------------- */
/* Suspend the current Haskell thread before a safe foreign call.
 *
 * reg is converted to its Capability with regTableToCapability.  The
 * running task and the current TSO are taken from that capability.  The
 * TSO is set to ThreadRunGHC, passed to threadPaused, and marked
 * BlockedOnCCall (when blocked_exceptions was NULL, which is then set to
 * END_TSO_QUEUE) or BlockedOnCCall_NoUnblockExc.  The TSO is stored in
 * task->suspended_tso; then, under cap->lock, the task is added to the
 * suspended list, in_haskell is cleared and releaseCapability_ is
 * called.  errno is saved on entry and restored at the end.
 *
 * The return statement is not visible in this extract; resumeThread
 * accepts the Task back as an opaque pointer.
 */
2307 suspendThread (StgRegTable *reg)
2310 int saved_errno = errno;
2314 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2316 cap = regTableToCapability(reg);
2318 task = cap->running_task;
2319 tso = cap->r.rCurrentTSO;
2322 sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2324 // XXX this might not be necessary --SDM
2325 tso->what_next = ThreadRunGHC;
2327 threadPaused(cap,tso);
2329 if(tso->blocked_exceptions == NULL) {
2330 tso->why_blocked = BlockedOnCCall;
2331 tso->blocked_exceptions = END_TSO_QUEUE;
2333 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2336 // Hand back capability
2337 task->suspended_tso = tso;
2339 ACQUIRE_LOCK(&cap->lock);
2341 suspendTask(cap,task);
2342 cap->in_haskell = rtsFalse;
2343 releaseCapability_(cap);
2345 RELEASE_LOCK(&cap->lock);
2347 #if defined(THREADED_RTS)
2348 /* Preparing to leave the RTS, so ensure there's a native thread/task
2349 waiting to take over.
2351 IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2354 errno = saved_errno;
/* Resume a Haskell thread after its safe foreign call has returned.
 *
 * task_ is the opaque pointer for the Task that made the call (the
 * conversion to a Task pointer is not visible in this extract).  The
 * function waits for a capability with waitForReturnCapability, unlinks
 * the task from the suspended list, and takes the TSO back out of
 * task->suspended_tso.  If the TSO was BlockedOnCCall, its
 * blocked_exceptions queue is woken and reset to NULL.  The TSO is then
 * marked NotBlocked and installed as the current TSO of the capability,
 * and in_haskell is set.  errno is saved on entry and restored before
 * the final bookkeeping.
 */
2359 resumeThread (void *task_)
2363 int saved_errno = errno;
2367 // Wait for permission to re-enter the RTS with the result.
2368 waitForReturnCapability(&cap,task);
2369 // we might be on a different capability now... but if so, our
2370 // entry on the suspended_ccalling_tasks list will also have been
2373 // Remove the thread from the suspended list
2374 recoverSuspendedTask(cap,task);
2376 tso = task->suspended_tso;
2377 task->suspended_tso = NULL;
2378 tso->link = END_TSO_QUEUE;
2379 IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2381 if (tso->why_blocked == BlockedOnCCall) {
2382 awakenBlockedQueue(cap,tso->blocked_exceptions);
2383 tso->blocked_exceptions = NULL;
2386 /* Reset blocking status */
2387 tso->why_blocked = NotBlocked;
2389 cap->r.rCurrentTSO = tso;
2390 cap->in_haskell = rtsTrue;
2391 errno = saved_errno;
2393 /* We might have GC'd, mark the TSO dirty again */
2396 IF_DEBUG(sanity, checkTSO(tso));
2401 /* ---------------------------------------------------------------------------
2402 * Comparing Thread ids.
2404 * This is used from STG land in the implementation of the
2405 * instances of Eq/Ord for ThreadIds.
2406 * ------------------------------------------------------------------------ */
/* Three-way comparison of two threads by their id field.
 *
 * Both arguments are reinterpreted as StgTSO pointers.  Returns -1 when
 * the first id is smaller and 1 when it is larger; the result for equal
 * ids is not visible in this extract (presumably 0).
 */
2409 cmp_thread(StgPtr tso1, StgPtr tso2)
2411 StgThreadID id1 = ((StgTSO *)tso1)->id;
2412 StgThreadID id2 = ((StgTSO *)tso2)->id;
2414 if (id1 < id2) return (-1);
2415 if (id1 > id2) return 1;
2419 /* ---------------------------------------------------------------------------
2420 * Fetching the ThreadID from an StgTSO.
2422 * This is used in the implementation of Show for ThreadIds.
2423 * ------------------------------------------------------------------------ */
/* Return the id field of the TSO that tso points to (tso is cast to
 * StgTSO pointer before the field is read). */
2425 rts_getThreadId(StgPtr tso)
2427 return ((StgTSO *)tso)->id;
/* Attach a copy of label to the thread identified by tso.
 *
 * A buffer of strlen(label)+1 bytes is obtained from stgMallocBytes and
 * filled with strncpy; since the bound includes the terminator, the copy
 * is always NUL-terminated.  The buffer is then handed to
 * updateThreadLabel together with the thread id.
 */
2432 labelThread(StgPtr tso, char *label)
2437 /* Caveat: Once set, you can only set the thread name to "" */
2438 len = strlen(label)+1;
2439 buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2440 strncpy(buf,label,len);
2441 /* Update will free the old memory for us */
2442 updateThreadLabel(((StgTSO *)tso)->id,buf);
2446 /* ---------------------------------------------------------------------------
2447 Create a new thread.
2449 The new thread starts with the given stack size. Before the
2450 scheduler can run, however, this thread needs to have a closure
2451 (and possibly some arguments) pushed on its stack. See
2452 pushClosure() in Schedule.h.
2454 createGenThread() and createIOThread() (in SchedAPI.h) are
2455 convenient packaged versions of this function.
2457 currently pri (priority) is only used in a GRAN setup -- HWL
2458 ------------------------------------------------------------------------ */
2460 /* currently pri (priority) is only used in a GRAN setup -- HWL */
/* Allocate and initialise a new TSO.
 *
 * Two signatures are visible: a GranSim one taking (size, pri) and the
 * standard one taking (cap, size).  size is the total TSO size in words;
 * it is raised to MIN_STACK_WORDS + TSO_STRUCT_SIZEW if smaller, and the
 * stack gets size - TSO_STRUCT_SIZEW words.  The TSO is obtained from
 * allocateLocal, starts as ThreadRunGHC / NotBlocked with TSO_DIRTY set,
 * gets a stop frame pushed on its stack, and is given a fresh id from
 * next_thread_id under sched_mutex while being linked onto all_threads.
 * In PARALLEL_HASKELL builds END_TSO_QUEUE is returned instead when
 * advisory_thread_count has reached RtsFlags.ParFlags.maxThreads.
 *
 * Fixes relative to the previous text (debug output only):
 *  - the GranSim debug message has two conversions (%d and %p) but was
 *    given three arguments in the wrong order; it now receives the
 *    thread id and the TSO pointer;
 *  - the PARALLEL_HASKELL debug message printed a (long) argument with
 *    %d; it now uses %ld, matching the standard message below it.
 */
2462 createThread(nat size, StgInt pri)
2465 createThread(Capability *cap, nat size)
2471 /* sched_mutex is *not* required */
2473 /* First check whether we should create a thread at all */
2474 #if defined(PARALLEL_HASKELL)
2475 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2476 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2478 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2479 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2480 return END_TSO_QUEUE;
2486 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2489 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2491 /* catch ridiculously small stack sizes */
2492 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2493 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2496 stack_size = size - TSO_STRUCT_SIZEW;
2498 tso = (StgTSO *)allocateLocal(cap, size);
2499 TICK_ALLOC_TSO(stack_size, 0);
2501 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2503 SET_GRAN_HDR(tso, ThisPE);
2506 // Always start with the compiled code evaluator
2507 tso->what_next = ThreadRunGHC;
2509 tso->why_blocked = NotBlocked;
2510 tso->blocked_exceptions = NULL;
2511 tso->flags = TSO_DIRTY;
2513 tso->saved_errno = 0;
2517 tso->stack_size = stack_size;
2518 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
2520 tso->sp = (P_)&(tso->stack) + stack_size;
2522 tso->trec = NO_TREC;
2525 tso->prof.CCCS = CCS_MAIN;
2528 /* put a stop frame on the stack */
2529 tso->sp -= sizeofW(StgStopFrame);
2530 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2531 tso->link = END_TSO_QUEUE;
2535 /* uses more flexible routine in GranSim */
2536 insertThread(tso, CurrentProc);
2538 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2544 if (RtsFlags.GranFlags.GranSimStats.Full)
2545 DumpGranEvent(GR_START,tso);
2546 #elif defined(PARALLEL_HASKELL)
2547 if (RtsFlags.ParFlags.ParStats.Full)
2548 DumpGranEvent(GR_STARTQ,tso);
2549 /* HACk to avoid SCHEDULE
2553 /* Link the new thread on the global thread list.
2555 ACQUIRE_LOCK(&sched_mutex);
2556 tso->id = next_thread_id++; // while we have the mutex
2557 tso->global_link = all_threads;
2559 RELEASE_LOCK(&sched_mutex);
2562 tso->dist.priority = MandatoryPriority; //by default that is...
2566 tso->gran.pri = pri;
2568 tso->gran.magic = TSO_MAGIC; // debugging only
2570 tso->gran.sparkname = 0;
2571 tso->gran.startedat = CURRENT_TIME;
2572 tso->gran.exported = 0;
2573 tso->gran.basicblocks = 0;
2574 tso->gran.allocs = 0;
2575 tso->gran.exectime = 0;
2576 tso->gran.fetchtime = 0;
2577 tso->gran.fetchcount = 0;
2578 tso->gran.blocktime = 0;
2579 tso->gran.blockcount = 0;
2580 tso->gran.blockedat = 0;
2581 tso->gran.globalsparks = 0;
2582 tso->gran.localsparks = 0;
2583 if (RtsFlags.GranFlags.Light)
2584 tso->gran.clock = Now; /* local clock */
2586 tso->gran.clock = 0;
2588 IF_DEBUG(gran,printTSO(tso));
2589 #elif defined(PARALLEL_HASKELL)
2591 tso->par.magic = TSO_MAGIC; // debugging only
2593 tso->par.sparkname = 0;
2594 tso->par.startedat = CURRENT_TIME;
2595 tso->par.exported = 0;
2596 tso->par.basicblocks = 0;
2597 tso->par.allocs = 0;
2598 tso->par.exectime = 0;
2599 tso->par.fetchtime = 0;
2600 tso->par.fetchcount = 0;
2601 tso->par.blocktime = 0;
2602 tso->par.blockcount = 0;
2603 tso->par.blockedat = 0;
2604 tso->par.globalsparks = 0;
2605 tso->par.localsparks = 0;
2609 globalGranStats.tot_threads_created++;
2610 globalGranStats.threads_created_on_PE[CurrentProc]++;
2611 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2612 globalGranStats.tot_sq_probes++;
2613 #elif defined(PARALLEL_HASKELL)
2614 // collect parallel global statistics (currently done together with GC stats)
2615 if (RtsFlags.ParFlags.ParStats.Global &&
2616 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2617 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
2618 globalParStats.tot_threads_created++;
2624 sched_belch("==__ schedule: Created TSO %d (%p);",
2625 tso->id, tso));
2626 #elif defined(PARALLEL_HASKELL)
2627 IF_PAR_DEBUG(verbose,
2628 sched_belch("==__ schedule: Created TSO %ld (%p); %d threads active",
2629 (long)tso->id, tso, advisory_thread_count));
2631 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
2632 (long)tso->id, (long)tso->stack_size));
2639 all parallel thread creation calls should fall through the following routine.
/* Create a thread that will evaluate the given spark.
 *
 * Refuses (via barf) when advisory_thread_count has reached
 * RtsFlags.ParFlags.maxThreads; otherwise creates a TSO with the initial
 * stack size, sets its priority to AdvisoryPriority, pushes the spark
 * closure onto it and increments advisory_thread_count.
 *
 * NOTE(review): this code looks out of step with the rest of the file:
 *  - createThread is called with a single argument, but both visible
 *    createThread signatures take two;
 *  - the barf messages and the caller activateSpark use the name
 *    createSparkThread rather than createThreadFromSpark;
 *  - the priority is written to tso->priority, whereas createThread
 *    writes tso->dist.priority;
 *  - the return after the first barf is presumably unreachable if barf
 *    does not return; confirm.
 */
2642 createThreadFromSpark(rtsSpark spark)
2644 ASSERT(spark != (rtsSpark)NULL);
2645 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2646 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
2648 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2649 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2650 return END_TSO_QUEUE;
2654 tso = createThread(RtsFlags.GcFlags.initialStkSize);
2655 if (tso==END_TSO_QUEUE)
2656 barf("createSparkThread: Cannot create TSO");
2658 tso->priority = AdvisoryPriority;
2660 pushClosure(tso,spark);
2662 advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
2669 Turn a spark into a thread.
2670 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
/* Turn a spark into a thread by calling createSparkThread(spark); when
 * RtsFlags.ParFlags.ParStats.Full is set, a verbose debug message about
 * the spark closure is emitted.
 *
 * NOTE(review): createSparkThread is not defined in the visible part of
 * the file; the definition just above is named createThreadFromSpark.
 */
2674 activateSpark (rtsSpark spark)
2678 tso = createSparkThread(spark);
2679 if (RtsFlags.ParFlags.ParStats.Full) {
2680 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2681 IF_PAR_DEBUG(verbose,
2682 debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2683 (StgClosure *)spark, info_type((StgClosure *)spark)));
2685 // ToDo: fwd info on local/global spark to thread -- HWL
2686 // tso->gran.exported = spark->exported;
2687 // tso->gran.locked = !spark->global;
2688 // tso->gran.sparkname = spark->name;
2694 /* ---------------------------------------------------------------------------
2697 * scheduleThread puts a thread on the end of the runnable queue.
2698 * This will usually be done immediately after a thread is created.
2699 * The caller of scheduleThread must create the thread using e.g.
2700 * createThread and push an appropriate closure
2701 * on this thread's stack before the scheduler is invoked.
2702 * ------------------------------------------------------------------------ */
/* Make tso runnable on cap by appending it to the run queue of that
 * capability with appendToRunQueue. */
2705 scheduleThread(Capability *cap, StgTSO *tso)
2707 // The thread goes at the *end* of the run-queue, to avoid possible
2708 // starvation of any threads already on the queue.
2709 appendToRunQueue(cap,tso);
/* Run tso as a bound thread on cap and wait for it to finish.
 *
 * Uses the task already running on cap, resets its status to NoStatus,
 * appends tso to the run queue and enters schedule(cap, task).  On
 * return the task status is asserted to be something other than
 * NoStatus.  The lines that tie the Task and TSO together, use ret and
 * return the capability are not visible in this extract.
 *
 * NOTE(review): the GranSim-specific lines refer to m->tso, but no
 * variable m is declared in the visible parameters or lines; confirm
 * whether that code still compiles.
 */
2713 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2717 // We already created/initialised the Task
2718 task = cap->running_task;
2720 // This TSO is now a bound thread; make the Task and TSO
2721 // point to each other.
2727 task->stat = NoStatus;
2729 appendToRunQueue(cap,tso);
2731 IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2734 /* GranSim specific init */
2735 CurrentTSO = m->tso; // the TSO to run
2736 procStatus[MainProc] = Busy; // status of main PE
2737 CurrentProc = MainProc; // PE to run it on
2740 cap = schedule(cap,task);
2742 ASSERT(task->stat != NoStatus);
2743 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2745 IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2749 /* ----------------------------------------------------------------------------
2751 * ------------------------------------------------------------------------- */
2753 #if defined(THREADED_RTS)
/* Entry point for a worker task (THREADED_RTS only).
 *
 * Takes and releases task->lock once (the statement between the two is
 * not visible in this extract), runs schedule(cap, task), and releases
 * the capability that schedule hands back.
 */
2755 workerStart(Task *task)
2759 // See startWorkerTask().
2760 ACQUIRE_LOCK(&task->lock);
2762 RELEASE_LOCK(&task->lock);
2764 // set the thread-local pointer to the Task:
2767 // schedule() runs without a lock.
2768 cap = schedule(cap,task);
2770 // On exit from schedule(), we have a Capability.
2771 releaseCapability(cap);
2776 /* ---------------------------------------------------------------------------
2779 * Initialise the scheduler. This resets all the queues - if the
2780 * queues contained any threads, they'll be garbage collected at the
2783 * ------------------------------------------------------------------------ */
2790 for (i=0; i<=MAX_PROC; i++) {
2791 run_queue_hds[i] = END_TSO_QUEUE;
2792 run_queue_tls[i] = END_TSO_QUEUE;
2793 blocked_queue_hds[i] = END_TSO_QUEUE;
2794 blocked_queue_tls[i] = END_TSO_QUEUE;
2795 ccalling_threadss[i] = END_TSO_QUEUE;
2796 blackhole_queue[i] = END_TSO_QUEUE;
2797 sleeping_queue = END_TSO_QUEUE;
2799 #elif !defined(THREADED_RTS)
2800 blocked_queue_hd = END_TSO_QUEUE;
2801 blocked_queue_tl = END_TSO_QUEUE;
2802 sleeping_queue = END_TSO_QUEUE;
2805 blackhole_queue = END_TSO_QUEUE;
2806 all_threads = END_TSO_QUEUE;
2809 sched_state = SCHED_RUNNING;
2811 RtsFlags.ConcFlags.ctxtSwitchTicks =
2812 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2814 #if defined(THREADED_RTS)
2815 /* Initialise the mutex and condition variables used by
2817 initMutex(&sched_mutex);
2820 ACQUIRE_LOCK(&sched_mutex);
2822 /* A capability holds the state a native thread needs in
2823 * order to execute STG code. At least one capability is
2824 * floating around (only THREADED_RTS builds have more than one).
2830 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2834 #if defined(THREADED_RTS)
2836 * Eagerly start one worker to run each Capability, except for
2837 * Capability 0. The idea is that we're probably going to start a
2838 * bound thread on Capability 0 pretty soon, so we don't want a
2839 * worker task hogging it.
2844 for (i = 1; i < n_capabilities; i++) {
2845 cap = &capabilities[i];
2846 ACQUIRE_LOCK(&cap->lock);
2847 startWorkerTask(cap, workerStart);
2848 RELEASE_LOCK(&cap->lock);
2853 RELEASE_LOCK(&sched_mutex);
/* Shut the scheduler down.
 *
 * In THREADED_RTS builds a bound task is created under sched_mutex.  If
 * sched_state is still below SCHED_INTERRUPTED, it is set to
 * SCHED_INTERRUPTING and scheduleDoGC is run with GetRoots.  The state
 * then becomes SCHED_SHUTTING_DOWN and, in THREADED_RTS builds, every
 * capability is passed to shutdownCapability before the bound task
 * exits.
 */
2857 exitScheduler( void )
2861 #if defined(THREADED_RTS)
2862 ACQUIRE_LOCK(&sched_mutex);
2863 task = newBoundTask();
2864 RELEASE_LOCK(&sched_mutex);
2867 // If we haven't killed all the threads yet, do it now.
2868 if (sched_state < SCHED_INTERRUPTED) {
2869 sched_state = SCHED_INTERRUPTING;
2870 scheduleDoGC(NULL,task,rtsFalse,GetRoots);
2872 sched_state = SCHED_SHUTTING_DOWN;
2874 #if defined(THREADED_RTS)
2878 for (i = 0; i < n_capabilities; i++) {
2879 shutdownCapability(&capabilities[i], task);
2881 boundTaskExiting(task);
2887 /* ---------------------------------------------------------------------------
2888 Where are the roots that we know about?
2890 - all the threads on the runnable queue
2891 - all the threads on the blocked queue
2892 - all the threads on the sleeping queue
2893 - all the threads currently executing a _ccall_GC
2894 - all the "main threads"
2896 ------------------------------------------------------------------------ */
2898 /* This has to be protected either by the scheduler monitor, or by the
2899 garbage collection monitor (probably the latter).
/* Report the scheduler roots to the garbage collector.
 *
 * evac is applied to the address of every root pointer: in GranSim
 * builds the per-processor run, blocked and ccalling queues (skipping
 * entries that are END_TSO_QUEUE or NULL); otherwise the run queue (and
 * in THREADED_RTS builds the wakeup queue) of every capability plus the
 * suspended TSO of every task on its suspended_ccalling_tasks list; in
 * non-threaded builds also the global blocked and sleeping queues.
 * Finally the spark queue and the signal handlers are marked where
 * those features are compiled in.
 *
 * Fix: the GranSim branch tested ccalling_threadss[i] but evacuated
 * ccalling_threads[i], a different identifier from the array that is
 * declared and tested; it now evacuates ccalling_threadss[i].
 *
 * NOTE(review): the GranSim loop bound is i <= RtsFlags.GranFlags.proc;
 * whether proc is a count or a highest index cannot be told from here.
 */
2904 GetRoots( evac_fn evac )
2911 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2912 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2913 evac((StgClosure **)&run_queue_hds[i]);
2914 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2915 evac((StgClosure **)&run_queue_tls[i]);
2917 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2918 evac((StgClosure **)&blocked_queue_hds[i]);
2919 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2920 evac((StgClosure **)&blocked_queue_tls[i]);
2921 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2922 evac((StgClosure **)&ccalling_threadss[i]);
2929 for (i = 0; i < n_capabilities; i++) {
2930 cap = &capabilities[i];
2931 evac((StgClosure **)(void *)&cap->run_queue_hd);
2932 evac((StgClosure **)(void *)&cap->run_queue_tl);
2933 #if defined(THREADED_RTS)
2934 evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2935 evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2937 for (task = cap->suspended_ccalling_tasks; task != NULL;
2939 IF_DEBUG(scheduler,sched_belch("evac'ing suspended TSO %d", task->suspended_tso->id));
2940 evac((StgClosure **)(void *)&task->suspended_tso);
2946 #if !defined(THREADED_RTS)
2947 evac((StgClosure **)(void *)&blocked_queue_hd);
2948 evac((StgClosure **)(void *)&blocked_queue_tl);
2949 evac((StgClosure **)(void *)&sleeping_queue);
2953 // evac((StgClosure **)&blackhole_queue);
2955 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2956 markSparkQueue(evac);
2959 #if defined(RTS_USER_SIGNALS)
2960 // mark the signal handlers (signals should be already blocked)
2961 markSignalHandlers(evac);
2965 /* -----------------------------------------------------------------------------
2968 This is the interface to the garbage collector from Haskell land.
2969 We provide this so that external C code can allocate and garbage
2970 collect when called from Haskell via _ccall_GC.
2972 It might be useful to provide an interface whereby the programmer
2973 can specify more roots (ToDo).
2975 This needs to be protected by the GC condition variable above. KH.
2976 -------------------------------------------------------------------------- */
/* Caller-supplied root-marking callback: set by performGCWithRoots and
 * invoked by AllRoots. */
2978 static void (*extra_roots)(evac_fn);
/* Common implementation behind the performGC entry points.
 *
 * force_major and get_roots are forwarded unchanged to scheduleDoGC.
 * Two paths are visible: one creates a bound task under sched_mutex,
 * runs the collection and then calls boundTaskExiting; the other calls
 * scheduleDoGC directly with the task obtained from myTask().  The
 * condition that selects between them is not visible in this extract.
 */
2981 performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
2983 Task *task = myTask();
2986 ACQUIRE_LOCK(&sched_mutex);
2987 task = newBoundTask();
2988 RELEASE_LOCK(&sched_mutex);
2989 scheduleDoGC(NULL,task,force_major, get_roots);
2990 boundTaskExiting(task);
2992 scheduleDoGC(NULL,task,force_major, get_roots);
/* Body of the plain collection entry point (its signature line is not
 * visible in this extract; presumably performGC): requests a collection
 * with force_major false and the scheduler roots from GetRoots. */
2999 performGC_(rtsFalse, GetRoots);
/* Request a collection with force_major set to rtsTrue, using the
 * scheduler roots from GetRoots. */
3003 performMajorGC(void)
3005 performGC_(rtsTrue, GetRoots);
/* Root callback that marks the scheduler roots via GetRoots and then
 * calls the callback stored in extra_roots.  extra_roots is called
 * without a NULL check, so it must have been set beforehand. */
3009 AllRoots(evac_fn evac)
3011 GetRoots(evac); // the scheduler's roots
3012 extra_roots(evac); // the user's roots
/* Run a collection (force_major false) that also marks caller-supplied
 * roots: get_roots is stored in the file-level extra_roots variable and
 * AllRoots is passed to performGC_ as the root callback.
 *
 * NOTE(review): the callback is held in a single static variable, so
 * overlapping calls would presumably overwrite each other; confirm
 * whether callers are serialised.
 */
3016 performGCWithRoots(void (*get_roots)(evac_fn))
3018 extra_roots = get_roots;
3019 performGC_(rtsFalse, AllRoots);
3022 /* -----------------------------------------------------------------------------
3025 If the thread has reached its maximum stack size, then raise the
3026 StackOverflow exception in the offending thread. Otherwise
3027 relocate the TSO into a larger chunk of memory and adjust its stack
3029 -------------------------------------------------------------------------- */
/* Handle a stack overflow in tso.
 *
 * If stack_size has already reached max_stack_size, the thread is sent
 * the stackOverflow_closure exception through raiseAsync (after some
 * debug output).  Otherwise the stack size is doubled, capped at
 * max_stack_size, rounded up to whole blocks and then passed through
 * round_to_mblocks.  A new TSO of new_tso_size words is obtained from
 * allocate; the TSO header (TSO_STRUCT_SIZE bytes) and the live stack
 * words are copied so that the stack ends at the end of the new
 * object.  The old TSO is then marked ThreadRelocated, its sp is set
 * just past the end of its stack, and why_blocked is reset to
 * NotBlocked.
 *
 * The local declarations, the lines relocating the stack pointer of the
 * new TSO and the return statements are not visible in this extract.
 */
3032 threadStackOverflow(Capability *cap, StgTSO *tso)
3034 nat new_stack_size, stack_words;
3039 IF_DEBUG(sanity,checkTSO(tso));
3040 if (tso->stack_size >= tso->max_stack_size) {
3043 debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
3044 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
3045 /* If we're debugging, just print out the top of the stack */
3046 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
3049 /* Send this thread the StackOverflow exception */
3050 raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
3054 /* Try to double the current stack size. If that takes us over the
3055 * maximum stack size for this thread, then use the maximum instead.
3056 * Finally round up so the TSO ends up as a whole number of blocks.
3058 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
3059 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
3060 TSO_STRUCT_SIZE)/sizeof(W_);
3061 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
3062 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
3064 IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
3066 dest = (StgTSO *)allocate(new_tso_size);
3067 TICK_ALLOC_TSO(new_stack_size,0);
3069 /* copy the TSO block and the old stack into the new area */
3070 memcpy(dest,tso,TSO_STRUCT_SIZE);
3071 stack_words = tso->stack + tso->stack_size - tso->sp;
3072 new_sp = (P_)dest + new_tso_size - stack_words;
3073 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
3075 /* relocate the stack pointers... */
3077 dest->stack_size = new_stack_size;
3079 /* Mark the old TSO as relocated. We have to check for relocated
3080 * TSOs in the garbage collector and any primops that deal with TSOs.
3082 * It's important to set the sp value to just beyond the end
3083 * of the stack, so we don't attempt to scavenge any part of the
3086 tso->what_next = ThreadRelocated;
3088 tso->sp = (P_)&(tso->stack[tso->stack_size]);
3089 tso->why_blocked = NotBlocked;
3091 IF_PAR_DEBUG(verbose,
3092 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
3093 tso->id, tso, tso->stack_size);
3094 /* If we're debugging, just print out the top of the stack */
3095 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
3098 IF_DEBUG(sanity,checkTSO(tso));
3100 IF_DEBUG(scheduler,printTSO(dest));
3106 /* ---------------------------------------------------------------------------
3107 Wake up a queue that was blocked on some resource.
3108 ------------------------------------------------------------------------ */
/* Statistics hook run when a blocking-queue element is woken.
 *
 * Two variants are visible.  The body of the first (GranSim) variant is
 * not visible in this extract.  The PARALLEL_HASKELL variant, when
 * RtsFlags.ParFlags.ParStats.Full is set, dumps a GR_RESUMEQ event for
 * the TSO, sets emitSchedule when the run queue is empty, and adds
 * CURRENT_TIME minus par.blockedat to either par.fetchtime or
 * par.blocktime depending on the closure type of node.  An unexpected
 * closure type ends in barf.  The case labels of the switch are not
 * visible here.
 */
3112 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3115 #elif defined(PARALLEL_HASKELL)
3117 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3119 /* write RESUME events to log file and
3120 update blocked and fetch time (depending on type of the orig closure) */
3121 if (RtsFlags.ParFlags.ParStats.Full) {
3122 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
3123 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
3124 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
3125 if (emptyRunQueue())
3126 emitSchedule = rtsTrue;
3128 switch (get_itbl(node)->type) {
3130 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3135 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3142 barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
/* GranSim variant: wake the TSO at the head of a blocking queue.
 *
 * The locations of node and of the TSO are looked up with where_is.  If
 * the TSO is local to the processors holding node, the local unblock
 * cost (lunblocktime) is charged to CurrentTime[CurrentProc] and an
 * event is scheduled with new_event; otherwise the pack, global unblock
 * and latency costs are charged before the event is scheduled.  In both
 * cases tso->link is reset to END_TSO_QUEUE, and block_info.closure is
 * cleared afterwards.
 *
 * The event-type argument of each new_event call and the statements
 * that compute and return the next queue element are not visible in
 * this extract.
 */
3149 StgBlockingQueueElement *
3150 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3153 PEs node_loc, tso_loc;
3155 node_loc = where_is(node); // should be lifted out of loop
3156 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
3157 tso_loc = where_is((StgClosure *)tso);
3158 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3159 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3160 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3161 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3162 // insertThread(tso, node_loc);
3163 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3165 tso, node, (rtsSpark*)NULL);
3166 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
3169 } else { // TSO is remote (actually should be FMBQ)
3170 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3171 RtsFlags.GranFlags.Costs.gunblocktime +
3172 RtsFlags.GranFlags.Costs.latency;
3173 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3175 tso, node, (rtsSpark*)NULL);
3176 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
3179 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3181 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3182 (node_loc==tso_loc ? "Local" : "Global"),
3183 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3184 tso->block_info.closure = NULL;
3185 IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n",
3188 #elif defined(PARALLEL_HASKELL)
/* PARALLEL_HASKELL variant: wake one element of a blocking queue.
 *
 * Dispatches on the closure type of bqe (the case labels are not
 * visible in this extract; the description follows the existing
 * comments).  A TSO is asserted to be blocked, has its link reset, is
 * appended to the run queue, is passed to unblockCount and is then
 * marked NotBlocked.  A blocked fetch is pushed onto the front of
 * PendingFetches.  The remaining recognised case only asserts that the
 * element carries one of the three RBH_Save info tables.  Anything else
 * ends in barf.
 */
3189 StgBlockingQueueElement *
3190 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3192 StgBlockingQueueElement *next;
3194 switch (get_itbl(bqe)->type) {
3196 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3197 /* if it's a TSO just push it onto the run_queue */
3199 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3200 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
3202 unblockCount(bqe, node);
3203 /* reset blocking status after dumping event */
3204 ((StgTSO *)bqe)->why_blocked = NotBlocked;
3208 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3210 bqe->link = (StgBlockingQueueElement *)PendingFetches;
3211 PendingFetches = (StgBlockedFetch *)bqe;
3215 /* can ignore this case in a non-debugging setup;
3216 see comments on RBHSave closures above */
3218 /* check that the closure is an RBHSave closure */
3219 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3220 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3221 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3225 barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3226 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
3230 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
/* Standard variant: wake a single blocked TSO.
 *
 * The TSO is asserted to be a TSO closure that is currently blocked; it
 * is marked NotBlocked and its link is reset.  If
 * RtsFlags.ParFlags.wakeupMigrate is set or the TSO last ran on cap, it
 * is appended to the run queue of cap (updating the capability recorded
 * in its bound task along the way); otherwise it is handed to
 * wakeupThreadOnCapability for the capability it was last on.
 *
 * The visible caller awakenBlockedQueue assigns the result back to its
 * queue cursor, so the return value is the next queue element; the
 * lines that save and return it are not visible in this extract.
 */
3236 unblockOne(Capability *cap, StgTSO *tso)
3240 ASSERT(get_itbl(tso)->type == TSO);
3241 ASSERT(tso->why_blocked != NotBlocked);
3243 tso->why_blocked = NotBlocked;
3245 tso->link = END_TSO_QUEUE;
3247 if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) {
3248 // We are waking up this thread on the current Capability, which
3249 // might involve migrating it from the Capability it was last on.
3251 ASSERT(tso->bound->cap == tso->cap);
3252 tso->bound->cap = cap;
3255 appendToRunQueue(cap,tso);
3256 // we're holding a newly woken thread, make sure we context switch
3257 // quickly so we can migrate it if necessary.
3260 // we'll try to wake it up on the Capability it was last on.
3261 wakeupThreadOnCapability(tso->cap, tso);
3264 IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
/* GranSim variant: wake every TSO on the blocking queue q of node.
 *
 * The queue is asserted to be empty or to start with a TSO or a CONSTR
 * (RBHSave) closure, and node is asserted to be unique.  If node lives
 * on a processor other than CurrentProc, the processor bitmask in the
 * header of node is extended with CurrentProc (the fake fetch of the
 * existing comments) and, when global statistics are enabled,
 * tot_fake_fetches is incremented.  Elements are then woken one at a
 * time with unblockOne while they are TSOs.  If an element remains
 * afterwards, node is asserted to be an RBH and the two saved payload
 * words of that element are written back into node.  Finally the global
 * queue-length statistics are updated.
 *
 * The local declarations and the initialisation of bqe and len are not
 * visible in this extract.
 */
3271 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3273 StgBlockingQueueElement *bqe;
3278 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3279 node, CurrentProc, CurrentTime[CurrentProc],
3280 CurrentTSO->id, CurrentTSO));
3282 node_loc = where_is(node);
3284 ASSERT(q == END_BQ_QUEUE ||
3285 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
3286 get_itbl(q)->type == CONSTR); // closure (type constructor)
3287 ASSERT(is_unique(node));
3289 /* FAKE FETCH: magically copy the node to the tso's proc;
3290 no Fetch necessary because in reality the node should not have been
3291 moved to the other PE in the first place
3293 if (CurrentProc!=node_loc) {
3295 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3296 node, node_loc, CurrentProc, CurrentTSO->id,
3297 // CurrentTSO, where_is(CurrentTSO),
3298 node->header.gran.procs));
3299 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3301 debugBelch("## new bitmask of node %p is %#x\n",
3302 node, node->header.gran.procs));
3303 if (RtsFlags.GranFlags.GranSimStats.Global) {
3304 globalGranStats.tot_fake_fetches++;
3309 // ToDo: check: ASSERT(CurrentProc==node_loc);
3310 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3313 bqe points to the current element in the queue
3314 next points to the next element in the queue
3316 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
3317 //tso_loc = where_is(tso);
3319 bqe = unblockOne(bqe, node);
3322 /* if this is the BQ of an RBH, we have to put back the info ripped out of
3323 the closure to make room for the anchor of the BQ */
3324 if (bqe!=END_BQ_QUEUE) {
3325 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3327 ASSERT((info_ptr==&RBH_Save_0_info) ||
3328 (info_ptr==&RBH_Save_1_info) ||
3329 (info_ptr==&RBH_Save_2_info));
3331 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3332 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3333 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3336 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3337 node, info_type(node)));
3340 /* statistics gathering */
3341 if (RtsFlags.GranFlags.GranSimStats.Global) {
3342 // globalGranStats.tot_bq_processing_time += bq_processing_time;
3343 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
3344 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
3345 globalGranStats.tot_awbq++; // total no. of bqs awakened
3348 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3349 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3351 #elif defined(PARALLEL_HASKELL)
/* PARALLEL_HASKELL variant: wake the elements of blocking queue q.
 *
 * When q is a CONSTR closure or END_BQ_QUEUE there is nothing to
 * unblock (a debug message is emitted; the early return is not visible
 * in this extract).  Otherwise elements are passed to unblockOne for as
 * long as they are TSO or BLOCKED_FETCH closures.
 *
 * Note that the emptiness test reads the info table of q before
 * comparing q with END_BQ_QUEUE.
 */
3353 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3355 StgBlockingQueueElement *bqe;
3357 IF_PAR_DEBUG(verbose,
3358 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3362 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3363 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3368 ASSERT(q == END_BQ_QUEUE ||
3369 get_itbl(q)->type == TSO ||
3370 get_itbl(q)->type == BLOCKED_FETCH ||
3371 get_itbl(q)->type == CONSTR);
3374 while (get_itbl(bqe)->type==TSO ||
3375 get_itbl(bqe)->type==BLOCKED_FETCH) {
3376 bqe = unblockOne(bqe, node);
3380 #else /* !GRAN && !PARALLEL_HASKELL */
/* Standard variant: wake every TSO on the queue starting at tso.
 *
 * A NULL queue is accepted and ignored.  Otherwise unblockOne is called
 * repeatedly, each call yielding the next element, until END_TSO_QUEUE
 * is reached.
 */
3383 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3385 if (tso == NULL) return; // hack; see bug #1235728, and comments in
3387 while (tso != END_TSO_QUEUE) {
3388 tso = unblockOne(cap,tso);
3393 /* ---------------------------------------------------------------------------
3395 - usually called inside a signal handler so it mustn't do anything fancy.
3396 ------------------------------------------------------------------------ */
/* Request an interrupt of the runtime: sets sched_state to
 * SCHED_INTERRUPTING and, in THREADED_RTS builds, calls
 * prodAllCapabilities. */
3399 interruptStgRts(void)
3401 sched_state = SCHED_INTERRUPTING;
3403 #if defined(THREADED_RTS)
3404 prodAllCapabilities();
3408 /* -----------------------------------------------------------------------------
3411 This is for use when we raise an exception in another thread, which
3413 This has nothing to do with the UnblockThread event in GranSim. -- HWL
3414 -------------------------------------------------------------------------- */
3416 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3418 NB: only the type of the blocking queue is different in GranSim and GUM
3419 the operations on the queue-elements are the same
3420 long live polymorphism!
3422 Locks: sched_mutex is held upon entry and exit.
// unblockThread (variant compiled when GRAN or PARALLEL_HASKELL is defined).
// Dispatches on tso->why_blocked, unlinks `tso` from the queue it is
// waiting on, then resets its blocking state and hands it to
// pushOnRunQueue(cap, tso).  Queues are walked as StgBlockingQueueElement
// lists; each case has a barf("... TSO not found") diagnostic.
//   cap - capability whose run queue receives the thread
//   tso - the thread to unblock
3426 unblockThread(Capability *cap, StgTSO *tso)
3428 StgBlockingQueueElement *t, **last;
3430 switch (tso->why_blocked) {
3433 return; /* not blocked */
3436 // Be careful: nothing to do here! We tell the scheduler that the thread
3437 // is runnable and we leave it to the stack-walking code to abort the
3438 // transaction while unwinding the stack. We should perhaps have a debugging
3439 // test to make sure that this really happens and that the 'zombie' transaction
3440 // does not get committed.
// MVar case: walk the MVar's queue from mvar->head.  `last` points at the
// link field to patch and `last_tso` remembers the previous element so
// that mvar->tail can be repaired when tso was the final entry.
3444 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3446 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3447 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3449 last = (StgBlockingQueueElement **)&mvar->head;
3450 for (t = (StgBlockingQueueElement *)mvar->head;
3452 last = &t->link, last_tso = t, t = t->link) {
3453 if (t == (StgBlockingQueueElement *)tso) {
3454 *last = (StgBlockingQueueElement *)tso->link;
3455 if (mvar->tail == tso) {
3456 mvar->tail = (StgTSO *)last_tso;
3461 barf("unblockThread (MVAR): TSO not found");
// Black hole case: in this variant the queue hangs off the BLACKHOLE_BQ
// closure itself (bq->blocking_queue).
3464 case BlockedOnBlackHole:
3465 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3467 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3469 last = &bq->blocking_queue;
3470 for (t = bq->blocking_queue;
3472 last = &t->link, t = t->link) {
3473 if (t == (StgBlockingQueueElement *)tso) {
3474 *last = (StgBlockingQueueElement *)tso->link;
3478 barf("unblockThread (BLACKHOLE): TSO not found");
// Exception case: tso is queued on the target thread's blocked_exceptions.
// NOTE(review): only a single ThreadRelocated hop is followed here (`if`),
// whereas the non-GRAN unblockThread loops with `while` - confirm whether
// a chain of relocations can occur in this configuration.
3481 case BlockedOnException:
3483 StgTSO *target = tso->block_info.tso;
3485 ASSERT(get_itbl(target)->type == TSO);
3487 if (target->what_next == ThreadRelocated) {
3488 target = target->link;
3489 ASSERT(get_itbl(target)->type == TSO);
3492 ASSERT(target->blocked_exceptions != NULL);
3494 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3495 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
3497 last = &t->link, t = t->link) {
3498 ASSERT(get_itbl(t)->type == TSO);
3499 if (t == (StgBlockingQueueElement *)tso) {
3500 *last = (StgBlockingQueueElement *)tso->link;
3504 barf("unblockThread (Exception): TSO not found");
// I/O cases: the thread sits on the global blocked_queue, which has both a
// head (blocked_queue_hd) and a tail (blocked_queue_tl) pointer to maintain.
3508 case BlockedOnWrite:
3509 #if defined(mingw32_HOST_OS)
3510 case BlockedOnDoProc:
3513 /* take TSO off blocked_queue */
3514 StgBlockingQueueElement *prev = NULL;
3515 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
3516 prev = t, t = t->link) {
3517 if (t == (StgBlockingQueueElement *)tso) {
3519 blocked_queue_hd = (StgTSO *)t->link;
3520 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3521 blocked_queue_tl = END_TSO_QUEUE;
3524 prev->link = t->link;
3525 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3526 blocked_queue_tl = (StgTSO *)prev;
3529 #if defined(mingw32_HOST_OS)
3530 /* (Cooperatively) signal that the worker thread should abort
3533 abandonWorkRequest(tso->block_info.async_result->reqID);
3538 barf("unblockThread (I/O): TSO not found");
3541 case BlockedOnDelay:
3543 /* take TSO off sleeping_queue */
3544 StgBlockingQueueElement *prev = NULL;
3545 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
3546 prev = t, t = t->link) {
3547 if (t == (StgBlockingQueueElement *)tso) {
3549 sleeping_queue = (StgTSO *)t->link;
3551 prev->link = t->link;
3556 barf("unblockThread (delay): TSO not found");
3560 barf("unblockThread");
// Common exit: detach the TSO from any list, mark it NotBlocked, clear
// its block_info and make it runnable on `cap`.
3564 tso->link = END_TSO_QUEUE;
3565 tso->why_blocked = NotBlocked;
3566 tso->block_info.closure = NULL;
3567 pushOnRunQueue(cap,tso);
// unblockThread (standard RTS variant).
// Dispatches on tso->why_blocked, unlinks `tso` from the queue it is
// waiting on, then resets its blocking state and hands it to
// appendToRunQueue(cap, tso).  Queues are plain StgTSO lists terminated by
// END_TSO_QUEUE; each case has a barf("... TSO not found") diagnostic.
//   cap - capability whose run queue receives the thread
//   tso - the thread to unblock
3571 unblockThread(Capability *cap, StgTSO *tso)
3575 /* To avoid locking unnecessarily. */
3576 if (tso->why_blocked == NotBlocked) {
3580 switch (tso->why_blocked) {
3583 // Be careful: nothing to do here! We tell the scheduler that the thread
3584 // is runnable and we leave it to the stack-walking code to abort the
3585 // transaction while unwinding the stack. We should perhaps have a debugging
3586 // test to make sure that this really happens and that the 'zombie' transaction
3587 // does not get committed.
// MVar case: walk the MVar's queue; last_tso tracks the previous element
// so mvar->tail can be repaired when tso was the final entry.
3591 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3593 StgTSO *last_tso = END_TSO_QUEUE;
3594 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3597 for (t = mvar->head; t != END_TSO_QUEUE;
3598 last = &t->link, last_tso = t, t = t->link) {
3601 if (mvar->tail == tso) {
3602 mvar->tail = last_tso;
3607 barf("unblockThread (MVAR): TSO not found");
// Black hole case: in this variant blocked threads live on the single
// global blackhole_queue rather than on the closure.
3610 case BlockedOnBlackHole:
3612 last = &blackhole_queue;
3613 for (t = blackhole_queue; t != END_TSO_QUEUE;
3614 last = &t->link, t = t->link) {
3620 barf("unblockThread (BLACKHOLE): TSO not found");
// Exception case: follow the chain of ThreadRelocated TSOs to the live
// target, then search that target's blocked_exceptions list.
3623 case BlockedOnException:
3625 StgTSO *target = tso->block_info.tso;
3627 ASSERT(get_itbl(target)->type == TSO);
3629 while (target->what_next == ThreadRelocated) {
3630 target = target->link;
3631 ASSERT(get_itbl(target)->type == TSO);
3634 ASSERT(target->blocked_exceptions != NULL);
3636 last = &target->blocked_exceptions;
3637 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
3638 last = &t->link, t = t->link) {
3639 ASSERT(get_itbl(t)->type == TSO);
3645 barf("unblockThread (Exception): TSO not found");
// The I/O and delay cases are only compiled when THREADED_RTS is not
// defined; they operate on the global blocked_queue and sleeping_queue.
3648 #if !defined(THREADED_RTS)
3650 case BlockedOnWrite:
3651 #if defined(mingw32_HOST_OS)
3652 case BlockedOnDoProc:
3655 StgTSO *prev = NULL;
3656 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
3657 prev = t, t = t->link) {
3660 blocked_queue_hd = t->link;
3661 if (blocked_queue_tl == t) {
3662 blocked_queue_tl = END_TSO_QUEUE;
3665 prev->link = t->link;
3666 if (blocked_queue_tl == t) {
3667 blocked_queue_tl = prev;
3670 #if defined(mingw32_HOST_OS)
3671 /* (Cooperatively) signal that the worker thread should abort
3674 abandonWorkRequest(tso->block_info.async_result->reqID);
3679 barf("unblockThread (I/O): TSO not found");
3682 case BlockedOnDelay:
3684 StgTSO *prev = NULL;
3685 for (t = sleeping_queue; t != END_TSO_QUEUE;
3686 prev = t, t = t->link) {
3689 sleeping_queue = t->link;
3691 prev->link = t->link;
3696 barf("unblockThread (delay): TSO not found");
3701 barf("unblockThread");
// Common exit: detach the TSO from any list, mark it NotBlocked, clear
// its block_info and append it to the run queue of `cap`.
3705 tso->link = END_TSO_QUEUE;
3706 tso->why_blocked = NotBlocked;
3707 tso->block_info.closure = NULL;
3708 appendToRunQueue(cap,tso);
3710 // We might have just migrated this TSO to our Capability:
3712 tso->bound->cap = cap;
3718 /* -----------------------------------------------------------------------------
3721 * Check the blackhole_queue for threads that can be woken up. We do
3722 * this periodically: before every GC, and whenever the run queue is
3725 * An elegant solution might be to just wake up all the blocked
3726 * threads with awakenBlockedQueue occasionally: they'll go back to
3727 * sleep again if the object is still a BLACKHOLE. Unfortunately this
3728 * doesn't give us a way to tell whether we've actually managed to
3729 * wake up any threads, so we would be busy-waiting.
3731 * -------------------------------------------------------------------------- */
// checkBlackHoles: scan the global blackhole_queue and wake, via
// unblockOne(cap, t), every thread whose blocking closure is no longer a
// BLACKHOLE or CAF_BLACKHOLE.  any_woke_up records whether at least one
// thread was woken.  The caller must hold sched_mutex (asserted below).
3734 checkBlackHoles (Capability *cap)
3737 rtsBool any_woke_up = rtsFalse;
3740 // blackhole_queue is global:
3741 ASSERT_LOCK_HELD(&sched_mutex);
3743 IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3745 // ASSUMES: sched_mutex
3746 prev = &blackhole_queue;
3747 t = blackhole_queue;
3748 while (t != END_TSO_QUEUE) {
3749 ASSERT(t->why_blocked == BlockedOnBlackHole);
3750 type = get_itbl(t->block_info.closure)->type;
// The closure has been updated (it is no longer any kind of black hole),
// so the thread waiting on it can run again.
3751 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3752 IF_DEBUG(sanity,checkTSO(t));
3753 t = unblockOne(cap, t);
3754 // urk, the threads migrate to the current capability
3755 // here, but we'd like to keep them on the original one.
3757 any_woke_up = rtsTrue;
3767 /* -----------------------------------------------------------------------------
3770 * The following function implements the magic for raising an
3771 * asynchronous exception in an existing thread.
3773 * We first remove the thread from any queue on which it might be
3774 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
3776 * We strip the stack down to the innermost CATCH_FRAME, building
3777 * thunks in the heap for all the active computations, so they can
3778 * be restarted if necessary. When we reach a CATCH_FRAME, we build
3779 * an application of the handler to the exception, and push it on
3780 * the top of the stack.
3782 * How exactly do we save all the active computations? We create an
3783 * AP_STACK for every UpdateFrame on the stack. Entering one of these
3784 * AP_STACKs pushes everything from the corresponding update frame
3785 * upwards onto the stack. (Actually, it pushes everything up to the
3786 * next update frame plus a pointer to the next AP_STACK object.
3787 * Entering the next AP_STACK object pushes more onto the stack until we
3788 * reach the last AP_STACK object - at which point the stack should look
3789 * exactly as it did when we killed the TSO and we can continue
3790 * execution by entering the closure on top of the stack.
3792 * We can also kill a thread entirely - this happens if either (a) the
3793 * exception passed to raiseAsync is NULL, or (b) there's no
3794 * CATCH_FRAME on the stack. In either case, we strip the entire
3795 * stack and replace the thread with a zombie.
3797 * ToDo: in THREADED_RTS mode, this function is only safe if either
3798 * (a) we hold all the Capabilities (eg. in GC, or if there is only
3799 * one Capability), or (b) we own the Capability that the TSO is
3800 * currently blocked on or on the run queue of.
3802 * -------------------------------------------------------------------------- */
// raiseAsync: raise `exception` asynchronously in `tso`.
// Thin wrapper over raiseAsync_ with stop_at_atomically = rtsFalse and
// stop_here = NULL.
3805 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3807 raiseAsync_(cap, tso, exception, rtsFalse, NULL);
// suspendComputation: strip the stack of `tso` up to `stop_here`.
// Thin wrapper over raiseAsync_ with a NULL exception,
// stop_at_atomically = rtsFalse and the given stop_here pointer.
3811 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3813 raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
// raiseAsync_: worker behind raiseAsync and suspendComputation.
//   cap                - capability used for allocation (allocateLocal) and
//                        for unblocking the thread
//   tso                - thread whose stack is unwound; nothing is done when
//                        it is already ThreadComplete or ThreadKilled
//   exception          - exception closure handed to a CATCH_FRAME handler;
//                        when NULL, catch frames are not used as handlers
//   stop_at_atomically - when true, unwinding stops at an ATOMICALLY_FRAME
//                        and the transaction is condemned
//   stop_here          - when non-NULL, unwinding stops once the frame
//                        pointer reaches this address
// The thread is first removed from any blocking queue with unblockThread,
// then its stack is walked frame by frame as described in the numbered
// steps below.
3817 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
3818 rtsBool stop_at_atomically, StgPtr stop_here)
3820 StgRetInfoTable *info;
3824 // Thread already dead?
3825 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3830 sched_belch("raising exception in thread %ld.", (long)tso->id));
3832 // Remove it from any blocking queues
3833 unblockThread(cap,tso);
3835 // mark it dirty; we're about to change its stack.
3840 // The stack freezing code assumes there's a closure pointer on
3841 // the top of the stack, so we have to arrange that this is the case...
3843 if (sp[0] == (W_)&stg_enter_info) {
3847 sp[0] = (W_)&stg_dummy_ret_closure;
3851 while (stop_here == NULL || frame < stop_here) {
3853 // 1. Let the top of the stack be the "current closure"
3855 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3858 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3859 // current closure applied to the chunk of stack up to (but not
3860 // including) the update frame. This closure becomes the "current
3861 // closure". Go back to step 2.
3863 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3864 // top of the stack applied to the exception.
3866 // 5. If it's a STOP_FRAME, then kill the thread.
3868 // NB: if we pass an ATOMICALLY_FRAME then abort the associated
3871 info = get_ret_itbl((StgClosure *)frame);
3873 switch (info->i.type) {
3880 // First build an AP_STACK consisting of the stack chunk above the
3881 // current update frame, with the top word on the stack as the
// `words` counts the stack words between sp and the frame, excluding
// sp[0], which becomes the function of the AP_STACK.
3884 words = frame - sp - 1;
3885 ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3888 ap->fun = (StgClosure *)sp[0];
3890 for(i=0; i < (nat)words; ++i) {
3891 ap->payload[i] = (StgClosure *)*sp++;
3894 SET_HDR(ap,&stg_AP_STACK_info,
3895 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3896 TICK_ALLOC_UP_THK(words+1,0);
3899 debugBelch("sched: Updating ");
3900 printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3901 debugBelch(" with ");
3902 printObj((StgClosure *)ap);
3905 // Replace the updatee with an indirection
3907 // Warning: if we're in a loop, more than one update frame on
3908 // the stack may point to the same object. Be careful not to
3909 // overwrite an IND_OLDGEN in this case, because we'll screw
3910 // up the mutable lists. To be on the safe side, don't
3911 // overwrite any kind of indirection at all. See also
3912 // threadSqueezeStack in GC.c, where we have to make a similar
3915 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3916 // revert the black hole
3917 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
// Pop the update frame and leave the new AP_STACK as the top-of-stack
// "current closure" for the next iteration.
3920 sp += sizeofW(StgUpdateFrame) - 1;
3921 sp[0] = (W_)ap; // push onto stack
3923 continue; //no need to bump frame
3927 // We've stripped the entire stack, the thread is now dead.
3928 tso->what_next = ThreadKilled;
3929 tso->sp = frame + sizeofW(StgStopFrame);
3933 // If we find a CATCH_FRAME, and we've got an exception to raise,
3934 // then build the THUNK raise(exception), and leave it on
3935 // top of the CATCH_FRAME ready to enter.
3939 StgCatchFrame *cf = (StgCatchFrame *)frame;
3943 if (exception == NULL) break;
3945 // we've got an exception to raise, so let's pass it to the
3946 // handler in this frame.
3948 raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3949 TICK_ALLOC_SE_THK(1,0);
3950 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3951 raise->payload[0] = exception;
3953 // throw away the stack from Sp up to the CATCH_FRAME.
3957 /* Ensure that async exceptions are blocked now, so we don't get
3958 * a surprise exception before we get around to executing the
3961 if (tso->blocked_exceptions == NULL) {
3962 tso->blocked_exceptions = END_TSO_QUEUE;
3965 /* Put the newly-built THUNK on top of the stack, ready to execute
3966 * when the thread restarts.
3969 sp[-1] = (W_)&stg_enter_info;
3971 tso->what_next = ThreadRunGHC;
3972 IF_DEBUG(sanity, checkTSO(tso));
3976 case ATOMICALLY_FRAME:
3977 if (stop_at_atomically) {
3978 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3979 stmCondemnTransaction(cap, tso -> trec);
3983 // R1 is not a register: the return convention for IO in
3984 // this case puts the return value on the stack, so we
3985 // need to set up the stack to return to the atomically
3986 // frame properly...
3987 tso->sp = frame - 2;
3988 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3989 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3991 tso->what_next = ThreadRunGHC;
3994 // Not stop_at_atomically... fall through and abort the
3997 case CATCH_RETRY_FRAME:
3998 // If we find an ATOMICALLY_FRAME then we abort the
3999 // current transaction and propagate the exception. In
4000 // this case (unlike ordinary exceptions) we do not care
4001 // whether the transaction is valid or not because its
4002 // possible validity cannot have caused the exception
4003 // and will not be visible after the abort.
4005 debugBelch("Found atomically block delivering async exception\n"));
4006 StgTRecHeader *trec = tso -> trec;
4007 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
4008 stmAbortTransaction(cap, trec);
4009 tso -> trec = outer;
4016 // move on to the next stack frame
4017 frame += stack_frame_sizeW((StgClosure *)frame);
4020 // if we got here, then we stopped at stop_here
4021 ASSERT(stop_here != NULL);
4024 /* -----------------------------------------------------------------------------
4027 This is used for interruption (^C) and forking, and corresponds to
4028 raising an exception but without letting the thread catch the
4030 -------------------------------------------------------------------------- */
// deleteThread: kill `tso` by calling raiseAsync with a NULL exception.
// Threads blocked in a foreign call (BlockedOnCCall or
// BlockedOnCCall_NoUnblockExc) are deliberately left untouched.
4033 deleteThread (Capability *cap, StgTSO *tso)
4035 if (tso->why_blocked != BlockedOnCCall &&
4036 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
4037 raiseAsync(cap,tso,NULL);
4041 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
// deleteThread_: variant of deleteThread that also handles threads blocked
// in foreign calls.  Such threads are passed to unblockOne and marked
// ThreadKilled directly; every other thread goes through deleteThread.
4043 deleteThread_(Capability *cap, StgTSO *tso)
4044 { // for forkProcess only:
4045 // like deleteThread(), but we delete threads in foreign calls, too.
4047 if (tso->why_blocked == BlockedOnCCall ||
4048 tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
4049 unblockOne(cap,tso);
4050 tso->what_next = ThreadKilled;
4052 deleteThread(cap,tso);
4057 /* -----------------------------------------------------------------------------
4058 raiseExceptionHelper
4060 This function is called by the raise# primitive, just so that we can
4061 move some of the tricky bits of raising an exception from C-- into
4062 C. Who knows, it might be a useful reusable thing here too.
4063 -------------------------------------------------------------------------- */
// raiseExceptionHelper: walk the stack of `tso` looking for the frame that
// will handle `exception`.
//   reg       - register table, used only to find the owning Capability
//   tso       - thread whose stack is examined
//   exception - exception closure stored in the lazily-built raise thunk
// Every update frame passed on the way has its updatee overwritten with an
// indirection to a shared `raise# exception` thunk.  The frame type found
// is returned (the visible returns are ATOMICALLY_FRAME and
// CATCH_STM_FRAME).
4066 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
4068 Capability *cap = regTableToCapability(reg);
4069 StgThunk *raise_closure = NULL;
4071 StgRetInfoTable *info;
4073 // This closure represents the expression 'raise# E' where E
4074 // is the exception raised. It is used to overwrite all the
4075 // thunks which are currently under evaluation.
4078 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
4079 // LDV profiling: stg_raise_info has THUNK as its closure
4080 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
4081 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
4082 // 1 does not cause any problem unless profiling is performed.
4083 // However, when LDV profiling goes on, we need to linearly scan
4084 // small object pool, where raise_closure is stored, so we should
4085 // use MIN_UPD_SIZE.
4087 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
4088 // sizeofW(StgClosure)+1);
4092 // Walk up the stack, looking for the catch frame. On the way,
4093 // we update any closures pointed to from update frames with the
4094 // raise closure that we just built.
// `next` is computed before the switch so the walk can advance by one
// whole frame regardless of which case runs.
4098 info = get_ret_itbl((StgClosure *)p);
4099 next = p + stack_frame_sizeW((StgClosure *)p);
4100 switch (info->i.type) {
4103 // Only create raise_closure if we need to.
4104 if (raise_closure == NULL) {
4106 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
4107 SET_HDR(raise_closure, &stg_raise_info, CCCS);
4108 raise_closure->payload[0] = exception;
4110 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
4114 case ATOMICALLY_FRAME:
4115 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
4117 return ATOMICALLY_FRAME;
4123 case CATCH_STM_FRAME:
4124 IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
4126 return CATCH_STM_FRAME;
4132 case CATCH_RETRY_FRAME:
4141 /* -----------------------------------------------------------------------------
4142 findRetryFrameHelper
4144 This function is called by the retry# primitive. It traverses the stack
4145 leaving tso->sp referring to the frame which should handle the retry.
4147 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
4148 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
4150 We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
4151 despite the similar implementation.
4153 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
4154 not be created within memory transactions.
4155 -------------------------------------------------------------------------- */
// findRetryFrameHelper: walk the stack of `tso` one frame at a time until
// a frame that handles retry# is found, and return its type
// (ATOMICALLY_FRAME or CATCH_RETRY_FRAME).  CATCH_STM_FRAMEs have their own
// case; CATCH_FRAME and STOP_FRAME are asserted never to appear.
4158 findRetryFrameHelper (StgTSO *tso)
4161 StgRetInfoTable *info;
4165 info = get_ret_itbl((StgClosure *)p);
4166 next = p + stack_frame_sizeW((StgClosure *)p);
4167 switch (info->i.type) {
4169 case ATOMICALLY_FRAME:
4170 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4172 return ATOMICALLY_FRAME;
4174 case CATCH_RETRY_FRAME:
4175 IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4177 return CATCH_RETRY_FRAME;
4179 case CATCH_STM_FRAME:
4181 ASSERT(info->i.type != CATCH_FRAME);
4182 ASSERT(info->i.type != STOP_FRAME);
4189 /* -----------------------------------------------------------------------------
4190 resurrectThreads is called after garbage collection on the list of
4191 threads found to be garbage. Each of these threads will be woken
4192 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4193 on an MVar, or NonTermination if the thread was blocked on a Black
4196 Locks: assumes we hold *all* the capabilities.
4197 -------------------------------------------------------------------------- */
// resurrectThreads: process the list `threads`, linked through
// global_link.  Each thread is re-linked in front of all_threads and then
// sent an asynchronous exception chosen by its why_blocked state:
// BlockedOnDeadMVar_closure, NonTermination_closure or
// BlockedIndefinitely_closure.  An unexpected state ends in barf().
4200 resurrectThreads (StgTSO *threads)
// `next` is read before global_link is overwritten so the iteration over
// the input list survives the re-linking.
4205 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4206 next = tso->global_link;
4207 tso->global_link = all_threads;
4209 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4211 // Wake up the thread on the Capability it was last on
4214 switch (tso->why_blocked) {
4216 case BlockedOnException:
4217 /* Called by GC - sched_mutex lock is currently held. */
4218 raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4220 case BlockedOnBlackHole:
4221 raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4224 raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4227 /* This might happen if the thread was blocked on a black hole
4228 * belonging to a thread that we've just woken up (raiseAsync
4229 * can wake up threads, remember...).
4233 barf("resurrectThreads: thread blocked in a strange way");
4238 /* ----------------------------------------------------------------------------
4239 * Debugging: why is a thread blocked
4240 * [Also provides useful information when debugging threaded programs
4241 * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4242 ------------------------------------------------------------------------- */
// printThreadBlockage: write a one-phrase description of why `tso` is
// blocked to the debug output, chosen by tso->why_blocked.  An
// unrecognised state is reported with barf(); the TSO address in that
// message is printed with %p (it was previously passed to %d, which does
// not match a pointer argument).
4246 printThreadBlockage(StgTSO *tso)
4248 switch (tso->why_blocked) {
4250 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4252 case BlockedOnWrite:
4253 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4255 #if defined(mingw32_HOST_OS)
4256 case BlockedOnDoProc:
4257 debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4260 case BlockedOnDelay:
4261 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4264 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4266 case BlockedOnException:
4267 debugBelch("is blocked on delivering an exception to thread %d",
4268 tso->block_info.tso->id);
4270 case BlockedOnBlackHole:
4271 debugBelch("is blocked on a black hole");
4274 debugBelch("is not blocked");
4276 #if defined(PARALLEL_HASKELL)
4278 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4279 tso->block_info.closure, info_type(tso->block_info.closure));
4281 case BlockedOnGA_NoSend:
4282 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4283 tso->block_info.closure, info_type(tso->block_info.closure));
4286 case BlockedOnCCall:
4287 debugBelch("is blocked on an external call");
4289 case BlockedOnCCall_NoUnblockExc:
4290 debugBelch("is blocked on an external call (exceptions were already blocked)");
4293 debugBelch("is blocked on an STM operation");
4296 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
4297 tso->why_blocked, tso->id, (void *)tso);
// printThreadStatus: print one debug line for thread `t`: its id and
// address, its label when lookupThreadLabel finds one, and then its state
// (relocated, killed, completed, or the result of printThreadBlockage).
4302 printThreadStatus(StgTSO *t)
4304 debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4306 void *label = lookupThreadLabel(t->id);
4307 if (label) debugBelch("[\"%s\"] ",(char *)label);
4309 if (t->what_next == ThreadRelocated) {
4310 debugBelch("has been relocated...\n");
4312 switch (t->what_next) {
4314 debugBelch("has been killed");
4316 case ThreadComplete:
4317 debugBelch("has completed");
4320 printThreadBlockage(t);
// printAllThreads: debugging dump of every thread.  Prints a heading (with
// a timestamp in the GranSim and PARALLEL_HASKELL builds), then the run
// queue of each capability, then every thread on all_threads whose
// why_blocked is not NotBlocked.
4327 printAllThreads(void)
4334 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4335 ullong_format_string(TIME_ON_PROC(CurrentProc),
4336 time_string, rtsFalse/*no commas!*/);
4338 debugBelch("all threads at [%s]:\n", time_string);
4339 # elif defined(PARALLEL_HASKELL)
4340 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4341 ullong_format_string(CURRENT_TIME,
4342 time_string, rtsFalse/*no commas!*/);
4344 debugBelch("all threads at [%s]:\n", time_string);
4346 debugBelch("all threads:\n");
// Runnable threads, grouped by the capability whose run queue holds them.
4349 for (i = 0; i < n_capabilities; i++) {
4350 cap = &capabilities[i];
4351 debugBelch("threads on capability %d:\n", cap->no);
4352 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4353 printThreadStatus(t);
// Blocked threads, found by walking the global all_threads list.
4357 debugBelch("other threads:\n");
4358 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4359 if (t->why_blocked != NotBlocked) {
4360 printThreadStatus(t);
4362 if (t->what_next == ThreadRelocated) {
4365 next = t->global_link;
// printThreadQueue: call printThreadStatus on every thread of the
// link-chained queue starting at `t`, then print the count `i`.
4372 printThreadQueue(StgTSO *t)
4375 for (; t != END_TSO_QUEUE; t = t->link) {
4376 printThreadStatus(t);
4379 debugBelch("%d threads on queue\n", i);
4383 Print a whole blocking queue attached to node (debugging only).
4385 # if defined(PARALLEL_HASKELL)
// print_bq (PARALLEL_HASKELL variant): print a heading naming `node` and
// its closure type, assert that node is a closure kind that can own a
// blocking queue, then print the queue itself via print_bqe.
4387 print_bq (StgClosure *node)
4389 StgBlockingQueueElement *bqe;
4393 debugBelch("## BQ of closure %p (%s): ",
4394 node, info_type(node));
4396 /* should cover all closures that may have a blocking queue */
4397 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4398 get_itbl(node)->type == FETCH_ME_BQ ||
4399 get_itbl(node)->type == RBH ||
4400 get_itbl(node)->type == MVAR);
4402 ASSERT(node!=(StgClosure*)NULL); // sanity check
4404 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4408 Print a whole blocking queue starting with the element bqe.
// print_bqe: print every element of the blocking queue starting at `bqe`.
// Iteration ends at END_BQ_QUEUE or after an element of closure type
// CONSTR.  Elements of an unexpected closure type are reported with
// barf().  The TSO address is printed with %p (it was previously passed
// to %x, which does not match a pointer argument).
4411 print_bqe (StgBlockingQueueElement *bqe)
4416 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4418 for (end = (bqe==END_BQ_QUEUE);
4419 !end; // iterate until bqe points to a CONSTR
4420 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
4421 bqe = end ? END_BQ_QUEUE : bqe->link) {
4422 ASSERT(bqe != END_BQ_QUEUE); // sanity check
4423 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
4424 /* types of closures that may appear in a blocking queue */
4425 ASSERT(get_itbl(bqe)->type == TSO ||
4426 get_itbl(bqe)->type == BLOCKED_FETCH ||
4427 get_itbl(bqe)->type == CONSTR);
4428 /* only BQs of an RBH end with an RBH_Save closure */
4429 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4431 switch (get_itbl(bqe)->type) {
4433 debugBelch(" TSO %u (%p),",
4434 ((StgTSO *)bqe)->id, (void *)bqe);
4437 debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4438 ((StgBlockedFetch *)bqe)->node,
4439 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4440 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4441 ((StgBlockedFetch *)bqe)->ga.weight);
4444 debugBelch(" %s (IP %p),",
4445 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4446 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4447 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4448 "RBH_Save_?"), get_itbl(bqe));
4451 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4452 info_type((StgClosure *)bqe)); // , node, info_type(node));
4458 # elif defined(GRAN)
// print_bq (GRAN variant): print the blocking queue attached to `node`,
// annotating the node and each TSO with the PE reported by where_is().
// Iteration ends at END_BQ_QUEUE or after an element of closure type
// CONSTR; an unexpected closure type is reported with barf().
4460 print_bq (StgClosure *node)
4462 StgBlockingQueueElement *bqe;
4463 PEs node_loc, tso_loc;
4466 /* should cover all closures that may have a blocking queue */
4467 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4468 get_itbl(node)->type == FETCH_ME_BQ ||
4469 get_itbl(node)->type == RBH);
4471 ASSERT(node!=(StgClosure*)NULL); // sanity check
4472 node_loc = where_is(node);
4474 debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4475 node, info_type(node), node_loc);
4478 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4480 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4481 !end; // iterate until bqe points to a CONSTR
4482 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4483 ASSERT(bqe != END_BQ_QUEUE); // sanity check
4484 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
4485 /* types of closures that may appear in a blocking queue */
4486 ASSERT(get_itbl(bqe)->type == TSO ||
4487 get_itbl(bqe)->type == CONSTR);
4488 /* only BQs of an RBH end with an RBH_Save closure */
4489 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4491 tso_loc = where_is((StgClosure *)bqe);
4492 switch (get_itbl(bqe)->type) {
4494 debugBelch(" TSO %d (%p) on [PE %d],",
4495 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4498 debugBelch(" %s (IP %p),",
4499 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4500 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4501 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4502 "RBH_Save_?"), get_itbl(bqe));
4505 barf("Unexpected closure type %s in blocking queue of %p (%s)",
4506 info_type((StgClosure *)bqe), node, info_type(node));
4514 #if defined(PARALLEL_HASKELL)
4521 for (i=0, tso=run_queue_hd;
4522 tso != END_TSO_QUEUE;
4523 i++, tso=tso->link) {
4532 sched_belch(char *s, ...)
4537 debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4538 #elif defined(PARALLEL_HASKELL)
4541 debugBelch("sched: ");