1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 1998-2005
5 * The scheduler and thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
24 #include "RtsSignals.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
34 #include "Proftimer.h"
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
47 #include "Capability.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
69 // Turn off inlining when debugging - it obfuscates things
72 # define STATIC_INLINE static
75 /* -----------------------------------------------------------------------------
77 * -------------------------------------------------------------------------- */
81 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
82 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
85 In GranSim we have a runnable and a blocked queue for each processor.
86 In order to minimise code changes new arrays run_queue_hds/tls
87 are created. run_queue_hd is then a short cut (macro) for
88 run_queue_hds[CurrentProc] (see GranSim.h).
91 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
92 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
93 StgTSO *ccalling_threadss[MAX_PROC];
94 /* We use the same global list of threads (all_threads) in GranSim as in
95 the std RTS (i.e. we are cheating). However, we don't use this list in
96 the GranSim specific code at the moment (so we are only potentially
101 #if !defined(THREADED_RTS)
102 // Blocked/sleeping thrads
103 StgTSO *blocked_queue_hd = NULL;
104 StgTSO *blocked_queue_tl = NULL;
105 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
108 /* Threads blocked on blackholes.
109 * LOCK: sched_mutex+capability, or all capabilities
111 StgTSO *blackhole_queue = NULL;
114 /* The blackhole_queue should be checked for threads to wake up. See
115 * Schedule.h for more thorough comment.
116 * LOCK: none (doesn't matter if we miss an update)
118 rtsBool blackholes_need_checking = rtsFalse;
120 /* Linked list of all threads.
121 * Used for detecting garbage collected threads.
122 * LOCK: sched_mutex+capability, or all capabilities
124 StgTSO *all_threads = NULL;
126 /* flag set by signal handler to precipitate a context switch
127 * LOCK: none (just an advisory flag)
129 int context_switch = 0;
131 /* flag that tracks whether we have done any execution in this time slice.
132 * LOCK: currently none, perhaps we should lock (but needs to be
133 * updated in the fast path of the scheduler).
135 nat recent_activity = ACTIVITY_YES;
137 /* if this flag is set as well, give up execution
138 * LOCK: none (changes once, from false->true)
140 rtsBool sched_state = SCHED_RUNNING;
142 /* Next thread ID to allocate.
145 static StgThreadID next_thread_id = 1;
147 /* The smallest stack size that makes any sense is:
148 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
149 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
150 * + 1 (the closure to enter)
152 * + 1 (spare slot req'd by stg_ap_v_ret)
154 * A thread with this stack will bomb immediately with a stack
155 * overflow, which will increase its stack size.
157 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
163 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
164 * exists - earlier gccs apparently didn't.
170 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
171 * in an MT setting, needed to signal that a worker thread shouldn't hang around
172 * in the scheduler when it is out of work.
174 rtsBool shutting_down_scheduler = rtsFalse;
177 * This mutex protects most of the global scheduler data in
178 * the THREADED_RTS runtime.
180 #if defined(THREADED_RTS)
184 #if defined(PARALLEL_HASKELL)
186 rtsTime TimeOfLastYield;
187 rtsBool emitSchedule = rtsTrue;
190 /* -----------------------------------------------------------------------------
191 * static function prototypes
192 * -------------------------------------------------------------------------- */
194 static Capability *schedule (Capability *initialCapability, Task *task);
197 // These function all encapsulate parts of the scheduler loop, and are
198 // abstracted only to make the structure and control flow of the
199 // scheduler clearer.
201 static void schedulePreLoop (void);
202 #if defined(THREADED_RTS)
203 static void schedulePushWork(Capability *cap, Task *task);
205 static void scheduleStartSignalHandlers (Capability *cap);
206 static void scheduleCheckBlockedThreads (Capability *cap);
207 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
208 static void scheduleCheckBlackHoles (Capability *cap);
209 static void scheduleDetectDeadlock (Capability *cap, Task *task);
211 static StgTSO *scheduleProcessEvent(rtsEvent *event);
213 #if defined(PARALLEL_HASKELL)
214 static StgTSO *scheduleSendPendingMessages(void);
215 static void scheduleActivateSpark(void);
216 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
218 #if defined(PAR) || defined(GRAN)
219 static void scheduleGranParReport(void);
221 static void schedulePostRunThread(void);
222 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
223 static void scheduleHandleStackOverflow( Capability *cap, Task *task,
225 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
226 nat prev_what_next );
227 static void scheduleHandleThreadBlocked( StgTSO *t );
228 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
230 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
231 static Capability *scheduleDoGC(Capability *cap, Task *task,
233 void (*get_roots)(evac_fn));
235 static void unblockThread(Capability *cap, StgTSO *tso);
236 static rtsBool checkBlackHoles(Capability *cap);
237 static void AllRoots(evac_fn evac);
239 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
241 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
242 rtsBool stop_at_atomically, StgPtr stop_here);
244 static void deleteThread (Capability *cap, StgTSO *tso);
245 static void deleteAllThreads (Capability *cap);
248 static void printThreadBlockage(StgTSO *tso);
249 static void printThreadStatus(StgTSO *tso);
250 void printThreadQueue(StgTSO *tso);
253 #if defined(PARALLEL_HASKELL)
254 StgTSO * createSparkThread(rtsSpark spark);
255 StgTSO * activateSpark (rtsSpark spark);
259 static char *whatNext_strs[] = {
269 /* -----------------------------------------------------------------------------
270 * Putting a thread on the run queue: different scheduling policies
271 * -------------------------------------------------------------------------- */
274 addToRunQueue( Capability *cap, StgTSO *t )
276 #if defined(PARALLEL_HASKELL)
277 if (RtsFlags.ParFlags.doFairScheduling) {
278 // this does round-robin scheduling; good for concurrency
279 appendToRunQueue(cap,t);
281 // this does unfair scheduling; good for parallelism
282 pushOnRunQueue(cap,t);
285 // this does round-robin scheduling; good for concurrency
286 appendToRunQueue(cap,t);
290 /* ---------------------------------------------------------------------------
291 Main scheduling loop.
293 We use round-robin scheduling, each thread returning to the
294 scheduler loop when one of these conditions is detected:
297 * timer expires (thread yields)
303 In a GranSim setup this loop iterates over the global event queue.
304 This revolves around the global event queue, which determines what
305 to do next. Therefore, it's more complicated than either the
306 concurrent or the parallel (GUM) setup.
309 GUM iterates over incoming messages.
310 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
311 and sends out a fish whenever it has nothing to do; in-between
312 doing the actual reductions (shared code below) it processes the
313 incoming messages and deals with delayed operations
314 (see PendingFetches).
315 This is not the ugliest code you could imagine, but it's bloody close.
317 ------------------------------------------------------------------------ */
320 schedule (Capability *initialCapability, Task *task)
324 StgThreadReturnCode ret;
327 #elif defined(PARALLEL_HASKELL)
330 rtsBool receivedFinish = rtsFalse;
332 nat tp_size, sp_size; // stats only
337 #if defined(THREADED_RTS)
338 rtsBool first = rtsTrue;
341 cap = initialCapability;
343 // Pre-condition: this task owns initialCapability.
344 // The sched_mutex is *NOT* held
345 // NB. on return, we still hold a capability.
348 sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
349 task, initialCapability);
354 // -----------------------------------------------------------
355 // Scheduler loop starts here:
357 #if defined(PARALLEL_HASKELL)
358 #define TERMINATION_CONDITION (!receivedFinish)
360 #define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL)
362 #define TERMINATION_CONDITION rtsTrue
365 while (TERMINATION_CONDITION) {
368 /* Choose the processor with the next event */
369 CurrentProc = event->proc;
370 CurrentTSO = event->tso;
373 #if defined(THREADED_RTS)
375 // don't yield the first time, we want a chance to run this
376 // thread for a bit, even if there are others banging at the
379 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
381 // Yield the capability to higher-priority tasks if necessary.
382 yieldCapability(&cap, task);
386 #if defined(THREADED_RTS)
387 schedulePushWork(cap,task);
390 // Check whether we have re-entered the RTS from Haskell without
391 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
393 if (cap->in_haskell) {
394 errorBelch("schedule: re-entered unsafely.\n"
395 " Perhaps a 'foreign import unsafe' should be 'safe'?");
396 stg_exit(EXIT_FAILURE);
399 // The interruption / shutdown sequence.
401 // In order to cleanly shut down the runtime, we want to:
402 // * make sure that all main threads return to their callers
403 // with the state 'Interrupted'.
404 // * clean up all OS threads assocated with the runtime
405 // * free all memory etc.
407 // So the sequence for ^C goes like this:
409 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
410 // arranges for some Capability to wake up
412 // * all threads in the system are halted, and the zombies are
413 // placed on the run queue for cleaning up. We acquire all
414 // the capabilities in order to delete the threads, this is
415 // done by scheduleDoGC() for convenience (because GC already
416 // needs to acquire all the capabilities). We can't kill
417 // threads involved in foreign calls.
419 // * somebody calls shutdownHaskell(), which calls exitScheduler()
421 // * sched_state := SCHED_SHUTTING_DOWN
423 // * all workers exit when the run queue on their capability
424 // drains. All main threads will also exit when their TSO
425 // reaches the head of the run queue and they can return.
427 // * eventually all Capabilities will shut down, and the RTS can
430 // * We might be left with threads blocked in foreign calls,
431 // we should really attempt to kill these somehow (TODO);
433 switch (sched_state) {
436 case SCHED_INTERRUPTING:
437 IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTING"));
438 #if defined(THREADED_RTS)
439 discardSparksCap(cap);
441 /* scheduleDoGC() deletes all the threads */
442 cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
444 case SCHED_SHUTTING_DOWN:
445 IF_DEBUG(scheduler, sched_belch("SCHED_SHUTTING_DOWN"));
446 // If we are a worker, just exit. If we're a bound thread
447 // then we will exit below when we've removed our TSO from
449 if (task->tso == NULL && emptyRunQueue(cap)) {
454 barf("sched_state: %d", sched_state);
457 #if defined(THREADED_RTS)
458 // If the run queue is empty, take a spark and turn it into a thread.
460 if (emptyRunQueue(cap)) {
462 spark = findSpark(cap);
465 sched_belch("turning spark of closure %p into a thread",
466 (StgClosure *)spark));
467 createSparkThread(cap,spark);
471 #endif // THREADED_RTS
473 scheduleStartSignalHandlers(cap);
475 // Only check the black holes here if we've nothing else to do.
476 // During normal execution, the black hole list only gets checked
477 // at GC time, to avoid repeatedly traversing this possibly long
478 // list each time around the scheduler.
479 if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
481 scheduleCheckWakeupThreads(cap);
483 scheduleCheckBlockedThreads(cap);
485 scheduleDetectDeadlock(cap,task);
486 #if defined(THREADED_RTS)
487 cap = task->cap; // reload cap, it might have changed
490 // Normally, the only way we can get here with no threads to
491 // run is if a keyboard interrupt received during
492 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
493 // Additionally, it is not fatal for the
494 // threaded RTS to reach here with no threads to run.
496 // win32: might be here due to awaitEvent() being abandoned
497 // as a result of a console event having been delivered.
498 if ( emptyRunQueue(cap) ) {
499 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
500 ASSERT(sched_state >= SCHED_INTERRUPTING);
502 continue; // nothing to do
505 #if defined(PARALLEL_HASKELL)
506 scheduleSendPendingMessages();
507 if (emptyRunQueue(cap) && scheduleActivateSpark())
511 ASSERT(next_fish_to_send_at==0); // i.e. no delayed fishes left!
514 /* If we still have no work we need to send a FISH to get a spark
516 if (emptyRunQueue(cap)) {
517 if (!scheduleGetRemoteWork(&receivedFinish)) continue;
518 ASSERT(rtsFalse); // should not happen at the moment
520 // from here: non-empty run queue.
521 // TODO: merge above case with this, only one call processMessages() !
522 if (PacketsWaiting()) { /* process incoming messages, if
523 any pending... only in else
524 because getRemoteWork waits for
526 receivedFinish = processMessages();
531 scheduleProcessEvent(event);
535 // Get a thread to run
537 t = popRunQueue(cap);
539 #if defined(GRAN) || defined(PAR)
540 scheduleGranParReport(); // some kind of debuging output
542 // Sanity check the thread we're about to run. This can be
543 // expensive if there is lots of thread switching going on...
544 IF_DEBUG(sanity,checkTSO(t));
547 #if defined(THREADED_RTS)
548 // Check whether we can run this thread in the current task.
549 // If not, we have to pass our capability to the right task.
551 Task *bound = t->bound;
556 sched_belch("### Running thread %d in bound thread",
558 // yes, the Haskell thread is bound to the current native thread
561 sched_belch("### thread %d bound to another OS thread",
563 // no, bound to a different Haskell thread: pass to that thread
564 pushOnRunQueue(cap,t);
568 // The thread we want to run is unbound.
571 sched_belch("### this OS thread cannot run thread %d", t->id));
572 // no, the current native thread is bound to a different
573 // Haskell thread, so pass it to any worker thread
574 pushOnRunQueue(cap,t);
581 cap->r.rCurrentTSO = t;
583 /* context switches are initiated by the timer signal, unless
584 * the user specified "context switch as often as possible", with
587 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
588 && !emptyThreadQueues(cap)) {
594 IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
595 (long)t->id, whatNext_strs[t->what_next]));
597 #if defined(PROFILING)
598 startHeapProfTimer();
601 // ----------------------------------------------------------------------
602 // Run the current thread
604 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
605 ASSERT(t->cap == cap);
607 prev_what_next = t->what_next;
609 errno = t->saved_errno;
610 cap->in_haskell = rtsTrue;
614 recent_activity = ACTIVITY_YES;
616 switch (prev_what_next) {
620 /* Thread already finished, return to scheduler. */
621 ret = ThreadFinished;
627 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
628 cap = regTableToCapability(r);
633 case ThreadInterpret:
634 cap = interpretBCO(cap);
639 barf("schedule: invalid what_next field");
642 cap->in_haskell = rtsFalse;
644 // The TSO might have moved, eg. if it re-entered the RTS and a GC
645 // happened. So find the new location:
646 t = cap->r.rCurrentTSO;
648 // We have run some Haskell code: there might be blackhole-blocked
649 // threads to wake up now.
650 // Lock-free test here should be ok, we're just setting a flag.
651 if ( blackhole_queue != END_TSO_QUEUE ) {
652 blackholes_need_checking = rtsTrue;
655 // And save the current errno in this thread.
656 // XXX: possibly bogus for SMP because this thread might already
657 // be running again, see code below.
658 t->saved_errno = errno;
660 #if defined(THREADED_RTS)
661 // If ret is ThreadBlocked, and this Task is bound to the TSO that
662 // blocked, we are in limbo - the TSO is now owned by whatever it
663 // is blocked on, and may in fact already have been woken up,
664 // perhaps even on a different Capability. It may be the case
665 // that task->cap != cap. We better yield this Capability
666 // immediately and return to normaility.
667 if (ret == ThreadBlocked) {
669 sched_belch("--<< thread %d (%s) stopped: blocked\n",
670 t->id, whatNext_strs[t->what_next]));
675 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
676 ASSERT(t->cap == cap);
678 // ----------------------------------------------------------------------
680 // Costs for the scheduler are assigned to CCS_SYSTEM
681 #if defined(PROFILING)
686 #if defined(THREADED_RTS)
687 IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
688 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
689 IF_DEBUG(scheduler,debugBelch("sched: "););
692 schedulePostRunThread();
694 ready_to_gc = rtsFalse;
698 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
702 scheduleHandleStackOverflow(cap,task,t);
706 if (scheduleHandleYield(cap, t, prev_what_next)) {
707 // shortcut for switching between compiler/interpreter:
713 scheduleHandleThreadBlocked(t);
717 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
718 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
722 barf("schedule: invalid thread return code %d", (int)ret);
725 if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
727 cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
729 } /* end of while() */
731 IF_PAR_DEBUG(verbose,
732 debugBelch("== Leaving schedule() after having received Finish\n"));
735 /* ----------------------------------------------------------------------------
736 * Setting up the scheduler loop
737 * ------------------------------------------------------------------------- */
740 schedulePreLoop(void)
743 /* set up first event to get things going */
744 /* ToDo: assign costs for system setup and init MainTSO ! */
745 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
747 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
750 debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n",
752 G_TSO(CurrentTSO, 5));
754 if (RtsFlags.GranFlags.Light) {
755 /* Save current time; GranSim Light only */
756 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
761 /* -----------------------------------------------------------------------------
764 * Push work to other Capabilities if we have some.
765 * -------------------------------------------------------------------------- */
767 #if defined(THREADED_RTS)
769 schedulePushWork(Capability *cap USED_IF_THREADS,
770 Task *task USED_IF_THREADS)
772 Capability *free_caps[n_capabilities], *cap0;
775 // migration can be turned off with +RTS -qg
776 if (!RtsFlags.ParFlags.migrate) return;
778 // Check whether we have more threads on our run queue, or sparks
779 // in our pool, that we could hand to another Capability.
780 if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
781 && sparkPoolSizeCap(cap) < 2) {
785 // First grab as many free Capabilities as we can.
786 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
787 cap0 = &capabilities[i];
788 if (cap != cap0 && tryGrabCapability(cap0,task)) {
789 if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
790 // it already has some work, we just grabbed it at
791 // the wrong moment. Or maybe it's deadlocked!
792 releaseCapability(cap0);
794 free_caps[n_free_caps++] = cap0;
799 // we now have n_free_caps free capabilities stashed in
800 // free_caps[]. Share our run queue equally with them. This is
801 // probably the simplest thing we could do; improvements we might
802 // want to do include:
804 // - giving high priority to moving relatively new threads, on
805 // the gournds that they haven't had time to build up a
806 // working set in the cache on this CPU/Capability.
808 // - giving low priority to moving long-lived threads
810 if (n_free_caps > 0) {
811 StgTSO *prev, *t, *next;
812 rtsBool pushed_to_all;
814 IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
817 pushed_to_all = rtsFalse;
819 if (cap->run_queue_hd != END_TSO_QUEUE) {
820 prev = cap->run_queue_hd;
822 prev->link = END_TSO_QUEUE;
823 for (; t != END_TSO_QUEUE; t = next) {
825 t->link = END_TSO_QUEUE;
826 if (t->what_next == ThreadRelocated
827 || t->bound == task // don't move my bound thread
828 || tsoLocked(t)) { // don't move a locked thread
831 } else if (i == n_free_caps) {
832 pushed_to_all = rtsTrue;
838 IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
839 appendToRunQueue(free_caps[i],t);
840 if (t->bound) { t->bound->cap = free_caps[i]; }
841 t->cap = free_caps[i];
845 cap->run_queue_tl = prev;
848 // If there are some free capabilities that we didn't push any
849 // threads to, then try to push a spark to each one.
850 if (!pushed_to_all) {
852 // i is the next free capability to push to
853 for (; i < n_free_caps; i++) {
854 if (emptySparkPoolCap(free_caps[i])) {
855 spark = findSpark(cap);
857 IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
858 newSpark(&(free_caps[i]->r), spark);
864 // release the capabilities
865 for (i = 0; i < n_free_caps; i++) {
866 task->cap = free_caps[i];
867 releaseCapability(free_caps[i]);
870 task->cap = cap; // reset to point to our Capability.
874 /* ----------------------------------------------------------------------------
875 * Start any pending signal handlers
876 * ------------------------------------------------------------------------- */
878 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
880 scheduleStartSignalHandlers(Capability *cap)
882 if (signals_pending()) { // safe outside the lock
883 startSignalHandlers(cap);
888 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
893 /* ----------------------------------------------------------------------------
894 * Check for blocked threads that can be woken up.
895 * ------------------------------------------------------------------------- */
898 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
900 #if !defined(THREADED_RTS)
902 // Check whether any waiting threads need to be woken up. If the
903 // run queue is empty, and there are no other tasks running, we
904 // can wait indefinitely for something to happen.
906 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
908 awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
914 /* ----------------------------------------------------------------------------
915 * Check for threads woken up by other Capabilities
916 * ------------------------------------------------------------------------- */
919 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
921 #if defined(THREADED_RTS)
922 // Any threads that were woken up by other Capabilities get
923 // appended to our run queue.
924 if (!emptyWakeupQueue(cap)) {
925 ACQUIRE_LOCK(&cap->lock);
926 if (emptyRunQueue(cap)) {
927 cap->run_queue_hd = cap->wakeup_queue_hd;
928 cap->run_queue_tl = cap->wakeup_queue_tl;
930 cap->run_queue_tl->link = cap->wakeup_queue_hd;
931 cap->run_queue_tl = cap->wakeup_queue_tl;
933 cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
934 RELEASE_LOCK(&cap->lock);
939 /* ----------------------------------------------------------------------------
940 * Check for threads blocked on BLACKHOLEs that can be woken up
941 * ------------------------------------------------------------------------- */
943 scheduleCheckBlackHoles (Capability *cap)
945 if ( blackholes_need_checking ) // check without the lock first
947 ACQUIRE_LOCK(&sched_mutex);
948 if ( blackholes_need_checking ) {
949 checkBlackHoles(cap);
950 blackholes_need_checking = rtsFalse;
952 RELEASE_LOCK(&sched_mutex);
956 /* ----------------------------------------------------------------------------
957 * Detect deadlock conditions and attempt to resolve them.
958 * ------------------------------------------------------------------------- */
961 scheduleDetectDeadlock (Capability *cap, Task *task)
964 #if defined(PARALLEL_HASKELL)
965 // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
970 * Detect deadlock: when we have no threads to run, there are no
971 * threads blocked, waiting for I/O, or sleeping, and all the
972 * other tasks are waiting for work, we must have a deadlock of
975 if ( emptyThreadQueues(cap) )
977 #if defined(THREADED_RTS)
979 * In the threaded RTS, we only check for deadlock if there
980 * has been no activity in a complete timeslice. This means
981 * we won't eagerly start a full GC just because we don't have
982 * any threads to run currently.
984 if (recent_activity != ACTIVITY_INACTIVE) return;
987 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
989 // Garbage collection can release some new threads due to
990 // either (a) finalizers or (b) threads resurrected because
991 // they are unreachable and will therefore be sent an
992 // exception. Any threads thus released will be immediately
994 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/, GetRoots);
996 recent_activity = ACTIVITY_DONE_GC;
998 if ( !emptyRunQueue(cap) ) return;
1000 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
1001 /* If we have user-installed signal handlers, then wait
1002 * for signals to arrive rather then bombing out with a
1005 if ( anyUserHandlers() ) {
1007 sched_belch("still deadlocked, waiting for signals..."));
1011 if (signals_pending()) {
1012 startSignalHandlers(cap);
1015 // either we have threads to run, or we were interrupted:
1016 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1020 #if !defined(THREADED_RTS)
1021 /* Probably a real deadlock. Send the current main thread the
1022 * Deadlock exception.
1025 switch (task->tso->why_blocked) {
1027 case BlockedOnBlackHole:
1028 case BlockedOnException:
1030 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
1033 barf("deadlock: main thread blocked in a strange way");
1041 /* ----------------------------------------------------------------------------
1042 * Process an event (GRAN only)
1043 * ------------------------------------------------------------------------- */
1047 scheduleProcessEvent(rtsEvent *event)
1051 if (RtsFlags.GranFlags.Light)
1052 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1054 /* adjust time based on time-stamp */
1055 if (event->time > CurrentTime[CurrentProc] &&
1056 event->evttype != ContinueThread)
1057 CurrentTime[CurrentProc] = event->time;
1059 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1060 if (!RtsFlags.GranFlags.Light)
1063 IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1065 /* main event dispatcher in GranSim */
1066 switch (event->evttype) {
1067 /* Should just be continuing execution */
1068 case ContinueThread:
1069 IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1070 /* ToDo: check assertion
1071 ASSERT(run_queue_hd != (StgTSO*)NULL &&
1072 run_queue_hd != END_TSO_QUEUE);
1074 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1075 if (!RtsFlags.GranFlags.DoAsyncFetch &&
1076 procStatus[CurrentProc]==Fetching) {
1077 debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1078 CurrentTSO->id, CurrentTSO, CurrentProc);
1081 /* Ignore ContinueThreads for completed threads */
1082 if (CurrentTSO->what_next == ThreadComplete) {
1083 debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
1084 CurrentTSO->id, CurrentTSO, CurrentProc);
1087 /* Ignore ContinueThreads for threads that are being migrated */
1088 if (PROCS(CurrentTSO)==Nowhere) {
1089 debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1090 CurrentTSO->id, CurrentTSO, CurrentProc);
1093 /* The thread should be at the beginning of the run queue */
1094 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
1095 debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1096 CurrentTSO->id, CurrentTSO, CurrentProc);
1097 break; // run the thread anyway
1100 new_event(proc, proc, CurrentTime[proc],
1102 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1104 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1105 break; // now actually run the thread; DaH Qu'vam yImuHbej
1108 do_the_fetchnode(event);
1109 goto next_thread; /* handle next event in event queue */
1112 do_the_globalblock(event);
1113 goto next_thread; /* handle next event in event queue */
1116 do_the_fetchreply(event);
1117 goto next_thread; /* handle next event in event queue */
1119 case UnblockThread: /* Move from the blocked queue to the tail of */
1120 do_the_unblock(event);
1121 goto next_thread; /* handle next event in event queue */
1123 case ResumeThread: /* Move from the blocked queue to the tail of */
1124 /* the runnable queue ( i.e. Qu' SImqa'lu') */
1125 event->tso->gran.blocktime +=
1126 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1127 do_the_startthread(event);
1128 goto next_thread; /* handle next event in event queue */
1131 do_the_startthread(event);
1132 goto next_thread; /* handle next event in event queue */
1135 do_the_movethread(event);
1136 goto next_thread; /* handle next event in event queue */
1139 do_the_movespark(event);
1140 goto next_thread; /* handle next event in event queue */
1143 do_the_findwork(event);
1144 goto next_thread; /* handle next event in event queue */
1147 barf("Illegal event type %u\n", event->evttype);
1150 /* This point was scheduler_loop in the old RTS */
1152 IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1154 TimeOfLastEvent = CurrentTime[CurrentProc];
1155 TimeOfNextEvent = get_time_of_next_event();
1156 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1157 // CurrentTSO = ThreadQueueHd;
1159 IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
1162 if (RtsFlags.GranFlags.Light)
1163 GranSimLight_leave_system(event, &ActiveTSO);
1165 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1168 debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1170 /* in a GranSim setup the TSO stays on the run queue */
1172 /* Take a thread from the run queue. */
1173 POP_RUN_QUEUE(t); // take_off_run_queue(t);
1176 debugBelch("GRAN: About to run current thread, which is\n");
1179 context_switch = 0; // turned on via GranYield, checking events and time slice
1182 DumpGranEvent(GR_SCHEDULE, t));
1184 procStatus[CurrentProc] = Busy;
1188 /* ----------------------------------------------------------------------------
1189 * Send pending messages (PARALLEL_HASKELL only)
1190 * ------------------------------------------------------------------------- */
1192 #if defined(PARALLEL_HASKELL)
1194 scheduleSendPendingMessages(void)
1200 # if defined(PAR) // global Mem.Mgmt., omit for now
1201 if (PendingFetches != END_BF_QUEUE) {
1206 if (RtsFlags.ParFlags.BufferTime) {
1207 // if we use message buffering, we must send away all message
1208 // packets which have become too old...
1214 /* ----------------------------------------------------------------------------
1215 * Activate spark threads (PARALLEL_HASKELL only)
1216 * ------------------------------------------------------------------------- */
1218 #if defined(PARALLEL_HASKELL)
1220 scheduleActivateSpark(void)
1223 ASSERT(emptyRunQueue());
1224 /* We get here if the run queue is empty and want some work.
1225 We try to turn a spark into a thread, and add it to the run queue,
1226 from where it will be picked up in the next iteration of the scheduler
1230 /* :-[ no local threads => look out for local sparks */
1231 /* the spark pool for the current PE */
1232 pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1233 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1234 pool->hd < pool->tl) {
1236 * ToDo: add GC code check that we really have enough heap afterwards!!
1238 * If we're here (no runnable threads) and we have pending
1239 * sparks, we must have a space problem. Get enough space
1240 * to turn one of those pending sparks into a
1244 spark = findSpark(rtsFalse); /* get a spark */
1245 if (spark != (rtsSpark) NULL) {
1246 tso = createThreadFromSpark(spark); /* turn the spark into a thread */
1247 IF_PAR_DEBUG(fish, // schedule,
1248 debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1249 tso->id, tso, advisory_thread_count));
1251 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1252 IF_PAR_DEBUG(fish, // schedule,
1253 debugBelch("==^^ failed to create thread from spark @ %lx\n",
1255 return rtsFalse; /* failed to generate a thread */
1256 } /* otherwise fall through & pick-up new tso */
1258 IF_PAR_DEBUG(fish, // schedule,
1259 debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
1260 spark_queue_len(pool)));
1261 return rtsFalse; /* failed to generate a thread */
1263 return rtsTrue; /* success in generating a thread */
1264 } else { /* no more threads permitted or pool empty */
1265 return rtsFalse; /* failed to generateThread */
1268 tso = NULL; // avoid compiler warning only
1269 return rtsFalse; /* dummy in non-PAR setup */
1272 #endif // PARALLEL_HASKELL
1274 /* ----------------------------------------------------------------------------
1275 * Get work from a remote node (PARALLEL_HASKELL only)
1276 * ------------------------------------------------------------------------- */
1278 #if defined(PARALLEL_HASKELL)
1280 scheduleGetRemoteWork(rtsBool *receivedFinish)
1282 ASSERT(emptyRunQueue());
1284 if (RtsFlags.ParFlags.BufferTime) {
1285 IF_PAR_DEBUG(verbose,
1286 debugBelch("...send all pending data,"));
1289 for (i=1; i<=nPEs; i++)
1290 sendImmediately(i); // send all messages away immediately
1294 //++EDEN++ idle() , i.e. send all buffers, wait for work
1295 // suppress fishing in EDEN... just look for incoming messages
1296 // (blocking receive)
1297 IF_PAR_DEBUG(verbose,
1298 debugBelch("...wait for incoming messages...\n"));
1299 *receivedFinish = processMessages(); // blocking receive...
1301 // and reenter scheduling loop after having received something
1302 // (return rtsFalse below)
1304 # else /* activate SPARKS machinery */
1305 /* We get here, if we have no work, tried to activate a local spark, but still
1306 have no work. We try to get a remote spark, by sending a FISH message.
1307 Thread migration should be added here, and triggered when a sequence of
1308 fishes returns without work. */
1309 delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1311 /* =8-[ no local sparks => look for work on other PEs */
1313 * We really have absolutely no work. Send out a fish
1314 * (there may be some out there already), and wait for
1315 * something to arrive. We clearly can't run any threads
1316 * until a SCHEDULE or RESUME arrives, and so that's what
1317 * we're hoping to see. (Of course, we still have to
1318 * respond to other types of messages.)
1320 rtsTime now = msTime() /*CURRENT_TIME*/;
1321 IF_PAR_DEBUG(verbose,
1322 debugBelch("-- now=%ld\n", now));
1323 IF_PAR_DEBUG(fish, // verbose,
1324 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1325 (last_fish_arrived_at!=0 &&
1326 last_fish_arrived_at+delay > now)) {
1327 debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1328 now, last_fish_arrived_at+delay,
1329 last_fish_arrived_at,
1333 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1334 advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1335 if (last_fish_arrived_at==0 ||
1336 (last_fish_arrived_at+delay <= now)) { // send FISH now!
1337 /* outstandingFishes is set in sendFish, processFish;
1338 avoid flooding system with fishes via delay */
1339 next_fish_to_send_at = 0;
1341 /* ToDo: this should be done in the main scheduling loop to avoid the
1342 busy wait here; not so bad if fish delay is very small */
1343 int iq = 0; // DEBUGGING -- HWL
1344 next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
1345 /* send a fish when ready, but process messages that arrive in the meantime */
1347 if (PacketsWaiting()) {
1349 *receivedFinish = processMessages();
1352 } while (!*receivedFinish || now<next_fish_to_send_at);
1353 // JB: This means the fish could become obsolete, if we receive
1354 // work. Better check for work again?
1355 // last line: while (!receivedFinish || !haveWork || now<...)
1356 // next line: if (receivedFinish || haveWork )
1358 if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1359 return rtsFalse; // NB: this will leave scheduler loop
1360 // immediately after return!
1362 IF_PAR_DEBUG(fish, // verbose,
1363 debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1367 // JB: IMHO, this should all be hidden inside sendFish(...)
1369 sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
1372 // Global statistics: count no. of fishes
1373 if (RtsFlags.ParFlags.ParStats.Global &&
1374 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1375 globalParStats.tot_fish_mess++;
1379 /* delayed fishes must have been sent by now! */
1380 next_fish_to_send_at = 0;
1383 *receivedFinish = processMessages();
1384 # endif /* SPARKS */
1387 /* NB: this function always returns rtsFalse, meaning the scheduler
1388 loop continues with the next iteration;
1390 return code means success in finding work; we enter this function
1391 if there is no local work, thus have to send a fish which takes
1392 time until it arrives with work; in the meantime we should process
1393 messages in the main loop;
1396 #endif // PARALLEL_HASKELL
1398 /* ----------------------------------------------------------------------------
1399 * PAR/GRAN: Report stats & debugging info(?)
1400 * ------------------------------------------------------------------------- */
1402 #if defined(PAR) || defined(GRAN)
1404 scheduleGranParReport(void)
1406 ASSERT(run_queue_hd != END_TSO_QUEUE);
1408 /* Take a thread from the run queue, if we have work */
1409 POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
1411 /* If this TSO has got its outport closed in the meantime,
1412 * it mustn't be run. Instead, we have to clean it up as if it was finished.
1413 * It has to be marked as TH_DEAD for this purpose.
1414 * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1416 JB: TODO: investigate wether state change field could be nuked
1417 entirely and replaced by the normal tso state (whatnext
1418 field). All we want to do is to kill tsos from outside.
1421 /* ToDo: write something to the log-file
1422 if (RTSflags.ParFlags.granSimStats && !sameThread)
1423 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1427 /* the spark pool for the current PE */
1428 pool = &(cap.r.rSparks); // cap = (old) MainCap
1431 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1432 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1435 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1436 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1438 if (RtsFlags.ParFlags.ParStats.Full &&
1439 (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1440 (emitSchedule || // forced emit
1441 (t && LastTSO && t->id != LastTSO->id))) {
1443 we are running a different TSO, so write a schedule event to log file
1444 NB: If we use fair scheduling we also have to write a deschedule
1445 event for LastTSO; with unfair scheduling we know that the
1446 previous tso has blocked whenever we switch to another tso, so
1447 we don't need it in GUM for now
1449 IF_PAR_DEBUG(fish, // schedule,
1450 debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1452 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1453 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1454 emitSchedule = rtsFalse;
1459 /* ----------------------------------------------------------------------------
1460 * After running a thread...
1461 * ------------------------------------------------------------------------- */
1464 schedulePostRunThread(void)
1467 /* HACK 675: if the last thread didn't yield, make sure to print a
1468 SCHEDULE event to the log file when StgRunning the next thread, even
1469 if it is the same one as before */
1471 TimeOfLastYield = CURRENT_TIME;
1474 /* some statistics gathering in the parallel case */
1476 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1480 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1481 globalGranStats.tot_heapover++;
1483 globalParStats.tot_heapover++;
1490 DumpGranEvent(GR_DESCHEDULE, t));
1491 globalGranStats.tot_stackover++;
1494 // DumpGranEvent(GR_DESCHEDULE, t);
1495 globalParStats.tot_stackover++;
1499 case ThreadYielding:
1502 DumpGranEvent(GR_DESCHEDULE, t));
1503 globalGranStats.tot_yields++;
1506 // DumpGranEvent(GR_DESCHEDULE, t);
1507 globalParStats.tot_yields++;
1514 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1515 t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
1516 (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1517 if (t->block_info.closure!=(StgClosure*)NULL)
1518 print_bq(t->block_info.closure);
1521 // ??? needed; should emit block before
1523 DumpGranEvent(GR_DESCHEDULE, t));
1524 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1527 ASSERT(procStatus[CurrentProc]==Busy ||
1528 ((procStatus[CurrentProc]==Fetching) &&
1529 (t->block_info.closure!=(StgClosure*)NULL)));
1530 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1531 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1532 procStatus[CurrentProc]==Fetching))
1533 procStatus[CurrentProc] = Idle;
1536 //++PAR++ blockThread() writes the event (change?)
1540 case ThreadFinished:
1544 barf("parGlobalStats: unknown return code");
1550 /* -----------------------------------------------------------------------------
1551 * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1552 * -------------------------------------------------------------------------- */
1555 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1557 // did the task ask for a large block?
1558 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1559 // if so, get one and push it on the front of the nursery.
1563 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1566 debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1567 (long)t->id, whatNext_strs[t->what_next], blocks));
1569 // don't do this if the nursery is (nearly) full, we'll GC first.
1570 if (cap->r.rCurrentNursery->link != NULL ||
1571 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1572 // if the nursery has only one block.
1575 bd = allocGroup( blocks );
1577 cap->r.rNursery->n_blocks += blocks;
1579 // link the new group into the list
1580 bd->link = cap->r.rCurrentNursery;
1581 bd->u.back = cap->r.rCurrentNursery->u.back;
1582 if (cap->r.rCurrentNursery->u.back != NULL) {
1583 cap->r.rCurrentNursery->u.back->link = bd;
1585 #if !defined(THREADED_RTS)
1586 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1587 g0s0 == cap->r.rNursery);
1589 cap->r.rNursery->blocks = bd;
1591 cap->r.rCurrentNursery->u.back = bd;
1593 // initialise it as a nursery block. We initialise the
1594 // step, gen_no, and flags field of *every* sub-block in
1595 // this large block, because this is easier than making
1596 // sure that we always find the block head of a large
1597 // block whenever we call Bdescr() (eg. evacuate() and
1598 // isAlive() in the GC would both have to do this, at
1602 for (x = bd; x < bd + blocks; x++) {
1603 x->step = cap->r.rNursery;
1609 // This assert can be a killer if the app is doing lots
1610 // of large block allocations.
1611 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1613 // now update the nursery to point to the new block
1614 cap->r.rCurrentNursery = bd;
1616 // we might be unlucky and have another thread get on the
1617 // run queue before us and steal the large block, but in that
1618 // case the thread will just end up requesting another large
1620 pushOnRunQueue(cap,t);
1621 return rtsFalse; /* not actually GC'ing */
1626 debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
1627 (long)t->id, whatNext_strs[t->what_next]));
1629 ASSERT(!is_on_queue(t,CurrentProc));
1630 #elif defined(PARALLEL_HASKELL)
1631 /* Currently we emit a DESCHEDULE event before GC in GUM.
1632 ToDo: either add separate event to distinguish SYSTEM time from rest
1633 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1634 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1635 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1636 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1637 emitSchedule = rtsTrue;
1641 pushOnRunQueue(cap,t);
1643 /* actual GC is done at the end of the while loop in schedule() */
1646 /* -----------------------------------------------------------------------------
1647 * Handle a thread that returned to the scheduler with ThreadStackOverflow
1648 * -------------------------------------------------------------------------- */
1651 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1653 IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n",
1654 (long)t->id, whatNext_strs[t->what_next]));
1655 /* just adjust the stack for this thread, then pop it back
1659 /* enlarge the stack */
1660 StgTSO *new_t = threadStackOverflow(cap, t);
1662 /* The TSO attached to this Task may have moved, so update the
1665 if (task->tso == t) {
1668 pushOnRunQueue(cap,new_t);
1672 /* -----------------------------------------------------------------------------
1673 * Handle a thread that returned to the scheduler with ThreadYielding
1674 * -------------------------------------------------------------------------- */
1677 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1679 // Reset the context switch flag. We don't do this just before
1680 // running the thread, because that would mean we would lose ticks
1681 // during GC, which can lead to unfair scheduling (a thread hogs
1682 // the CPU because the tick always arrives during GC). This way
1683 // penalises threads that do a lot of allocation, but that seems
1684 // better than the alternative.
1687 /* put the thread back on the run queue. Then, if we're ready to
1688 * GC, check whether this is the last task to stop. If so, wake
1689 * up the GC thread. getThread will block during a GC until the
1693 if (t->what_next != prev_what_next) {
1694 debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n",
1695 (long)t->id, whatNext_strs[t->what_next]);
1697 debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1698 (long)t->id, whatNext_strs[t->what_next]);
1703 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1705 ASSERT(t->link == END_TSO_QUEUE);
1707 // Shortcut if we're just switching evaluators: don't bother
1708 // doing stack squeezing (which can be expensive), just run the
1710 if (t->what_next != prev_what_next) {
1715 ASSERT(!is_on_queue(t,CurrentProc));
1718 //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1719 checkThreadQsSanity(rtsTrue));
1723 addToRunQueue(cap,t);
1726 /* add a ContinueThread event to actually process the thread */
1727 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1729 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1731 debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1738 /* -----------------------------------------------------------------------------
1739 * Handle a thread that returned to the scheduler with ThreadBlocked
1740 * -------------------------------------------------------------------------- */
1743 scheduleHandleThreadBlocked( StgTSO *t
1744 #if !defined(GRAN) && !defined(DEBUG)
1751 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
1752 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1753 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1755 // ??? needed; should emit block before
1757 DumpGranEvent(GR_DESCHEDULE, t));
1758 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1761 ASSERT(procStatus[CurrentProc]==Busy ||
1762 ((procStatus[CurrentProc]==Fetching) &&
1763 (t->block_info.closure!=(StgClosure*)NULL)));
1764 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1765 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1766 procStatus[CurrentProc]==Fetching))
1767 procStatus[CurrentProc] = Idle;
1771 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
1772 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1775 if (t->block_info.closure!=(StgClosure*)NULL)
1776 print_bq(t->block_info.closure));
1778 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1781 /* whatever we schedule next, we must log that schedule */
1782 emitSchedule = rtsTrue;
1786 // We don't need to do anything. The thread is blocked, and it
1787 // has tidied up its stack and placed itself on whatever queue
1788 // it needs to be on.
1790 #if !defined(THREADED_RTS)
1791 ASSERT(t->why_blocked != NotBlocked);
1792 // This might not be true under THREADED_RTS: we don't have
1793 // exclusive access to this TSO, so someone might have
1794 // woken it up by now. This actually happens: try
1795 // conc023 +RTS -N2.
1799 debugBelch("--<< thread %d (%s) stopped: ",
1800 t->id, whatNext_strs[t->what_next]);
1801 printThreadBlockage(t);
1804 /* Only for dumping event to log file
1805 ToDo: do I need this in GranSim, too?
1811 /* -----------------------------------------------------------------------------
1812 * Handle a thread that returned to the scheduler with ThreadFinished
1813 * -------------------------------------------------------------------------- */
1816 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1818 /* Need to check whether this was a main thread, and if so,
1819 * return with the return value.
1821 * We also end up here if the thread kills itself with an
1822 * uncaught exception, see Exception.cmm.
1824 IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n",
1825 t->id, whatNext_strs[t->what_next]));
1828 endThread(t, CurrentProc); // clean-up the thread
1829 #elif defined(PARALLEL_HASKELL)
1830 /* For now all are advisory -- HWL */
1831 //if(t->priority==AdvisoryPriority) ??
1832 advisory_thread_count--; // JB: Caution with this counter, buggy!
1835 if(t->dist.priority==RevalPriority)
1839 # if defined(EDENOLD)
1840 // the thread could still have an outport... (BUG)
1841 if (t->eden.outport != -1) {
1842 // delete the outport for the tso which has finished...
1843 IF_PAR_DEBUG(eden_ports,
1844 debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1845 t->eden.outport, t->id));
1848 // thread still in the process (HEAVY BUG! since outport has just been closed...)
1849 if (t->eden.epid != -1) {
1850 IF_PAR_DEBUG(eden_ports,
1851 debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1852 t->id, t->eden.epid));
1853 removeTSOfromProcess(t);
1858 if (RtsFlags.ParFlags.ParStats.Full &&
1859 !RtsFlags.ParFlags.ParStats.Suppressed)
1860 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1862 // t->par only contains statistics: left out for now...
1864 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1865 t->id,t,t->par.sparkname));
1867 #endif // PARALLEL_HASKELL
1870 // Check whether the thread that just completed was a bound
1871 // thread, and if so return with the result.
1873 // There is an assumption here that all thread completion goes
1874 // through this point; we need to make sure that if a thread
1875 // ends up in the ThreadKilled state, that it stays on the run
1876 // queue so it can be dealt with here.
1881 if (t->bound != task) {
1882 #if !defined(THREADED_RTS)
1883 // Must be a bound thread that is not the topmost one. Leave
1884 // it on the run queue until the stack has unwound to the
1885 // point where we can deal with this. Leaving it on the run
1886 // queue also ensures that the garbage collector knows about
1887 // this thread and its return value (it gets dropped from the
1888 // all_threads list so there's no other way to find it).
1889 appendToRunQueue(cap,t);
1892 // this cannot happen in the threaded RTS, because a
1893 // bound thread can only be run by the appropriate Task.
1894 barf("finished bound thread that isn't mine");
1898 ASSERT(task->tso == t);
1900 if (t->what_next == ThreadComplete) {
1902 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1903 *(task->ret) = (StgClosure *)task->tso->sp[1];
1905 task->stat = Success;
1908 *(task->ret) = NULL;
1910 if (sched_state >= SCHED_INTERRUPTING) {
1911 task->stat = Interrupted;
1913 task->stat = Killed;
1917 removeThreadLabel((StgWord)task->tso->id);
1919 return rtsTrue; // tells schedule() to return
1925 /* -----------------------------------------------------------------------------
1926 * Perform a heap census, if PROFILING
1927 * -------------------------------------------------------------------------- */
1930 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1932 #if defined(PROFILING)
1933 // When we have +RTS -i0 and we're heap profiling, do a census at
1934 // every GC. This lets us get repeatable runs for debugging.
1935 if (performHeapProfile ||
1936 (RtsFlags.ProfFlags.profileInterval==0 &&
1937 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1939 // checking black holes is necessary before GC, otherwise
1940 // there may be threads that are unreachable except by the
1941 // blackhole queue, which the GC will consider to be
1943 scheduleCheckBlackHoles(&MainCapability);
1945 IF_DEBUG(scheduler, sched_belch("garbage collecting before heap census"));
1946 GarbageCollect(GetRoots, rtsTrue);
1948 IF_DEBUG(scheduler, sched_belch("performing heap census"));
1951 performHeapProfile = rtsFalse;
1952 return rtsTrue; // true <=> we already GC'd
1958 /* -----------------------------------------------------------------------------
1959 * Perform a garbage collection if necessary
1960 * -------------------------------------------------------------------------- */
1963 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
1964 rtsBool force_major, void (*get_roots)(evac_fn))
1968 static volatile StgWord waiting_for_gc;
1969 rtsBool was_waiting;
1974 // In order to GC, there must be no threads running Haskell code.
1975 // Therefore, the GC thread needs to hold *all* the capabilities,
1976 // and release them after the GC has completed.
1978 // This seems to be the simplest way: previous attempts involved
1979 // making all the threads with capabilities give up their
1980 // capabilities and sleep except for the *last* one, which
1981 // actually did the GC. But it's quite hard to arrange for all
1982 // the other tasks to sleep and stay asleep.
1985 was_waiting = cas(&waiting_for_gc, 0, 1);
1988 IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1989 if (cap) yieldCapability(&cap,task);
1990 } while (waiting_for_gc);
1991 return cap; // NOTE: task->cap might have changed here
1994 for (i=0; i < n_capabilities; i++) {
1995 IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1996 if (cap != &capabilities[i]) {
1997 Capability *pcap = &capabilities[i];
1998 // we better hope this task doesn't get migrated to
1999 // another Capability while we're waiting for this one.
2000 // It won't, because load balancing happens while we have
2001 // all the Capabilities, but even so it's a slightly
2002 // unsavoury invariant.
2005 waitForReturnCapability(&pcap, task);
2006 if (pcap != &capabilities[i]) {
2007 barf("scheduleDoGC: got the wrong capability");
2012 waiting_for_gc = rtsFalse;
2015 /* Kick any transactions which are invalid back to their
2016 * atomically frames. When next scheduled they will try to
2017 * commit, this commit will fail and they will retry.
2022 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2023 if (t->what_next == ThreadRelocated) {
2026 next = t->global_link;
2027 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2028 if (!stmValidateNestOfTransactions (t -> trec)) {
2029 IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
2031 // strip the stack back to the
2032 // ATOMICALLY_FRAME, aborting the (nested)
2033 // transaction, and saving the stack of any
2034 // partially-evaluated thunks on the heap.
2035 raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
2038 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2046 // so this happens periodically:
2047 if (cap) scheduleCheckBlackHoles(cap);
2049 IF_DEBUG(scheduler, printAllThreads());
2052 * We now have all the capabilities; if we're in an interrupting
2053 * state, then we should take the opportunity to delete all the
2054 * threads in the system.
2056 if (sched_state >= SCHED_INTERRUPTING) {
2057 deleteAllThreads(&capabilities[0]);
2058 sched_state = SCHED_SHUTTING_DOWN;
2061 /* everybody back, start the GC.
2062 * Could do it in this thread, or signal a condition var
2063 * to do it in another thread. Either way, we need to
2064 * broadcast on gc_pending_cond afterward.
2066 #if defined(THREADED_RTS)
2067 IF_DEBUG(scheduler,sched_belch("doing GC"));
2069 GarbageCollect(get_roots, force_major);
2071 #if defined(THREADED_RTS)
2072 // release our stash of capabilities.
2073 for (i = 0; i < n_capabilities; i++) {
2074 if (cap != &capabilities[i]) {
2075 task->cap = &capabilities[i];
2076 releaseCapability(&capabilities[i]);
2087 /* add a ContinueThread event to continue execution of current thread */
2088 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2090 t, (StgClosure*)NULL, (rtsSpark*)NULL);
2092 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2100 /* ---------------------------------------------------------------------------
2101 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2102 * used by Control.Concurrent for error checking.
2103 * ------------------------------------------------------------------------- */
2106 rtsSupportsBoundThreads(void)
2108 #if defined(THREADED_RTS)
2115 /* ---------------------------------------------------------------------------
2116 * isThreadBound(tso): check whether tso is bound to an OS thread.
2117 * ------------------------------------------------------------------------- */
2120 isThreadBound(StgTSO* tso USED_IF_THREADS)
2122 #if defined(THREADED_RTS)
2123 return (tso->bound != NULL);
2128 /* ---------------------------------------------------------------------------
2129 * Singleton fork(). Do not copy any running threads.
2130 * ------------------------------------------------------------------------- */
2132 #if !defined(mingw32_HOST_OS)
2133 #define FORKPROCESS_PRIMOP_SUPPORTED
2136 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2138 deleteThread_(Capability *cap, StgTSO *tso);
2141 forkProcess(HsStablePtr *entry
2142 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2147 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2153 #if defined(THREADED_RTS)
2154 if (RtsFlags.ParFlags.nNodes > 1) {
2155 errorBelch("forking not supported with +RTS -N<n> greater than 1");
2156 stg_exit(EXIT_FAILURE);
2160 IF_DEBUG(scheduler,sched_belch("forking!"));
2162 // ToDo: for SMP, we should probably acquire *all* the capabilities
2167 if (pid) { // parent
2169 // just return the pid
2175 // Now, all OS threads except the thread that forked are
2176 // stopped. We need to stop all Haskell threads, including
2177 // those involved in foreign calls. Also we need to delete
2178 // all Tasks, because they correspond to OS threads that are
2181 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2182 if (t->what_next == ThreadRelocated) {
2185 next = t->global_link;
2186 // don't allow threads to catch the ThreadKilled
2187 // exception, but we do want to raiseAsync() because these
2188 // threads may be evaluating thunks that we need later.
2189 deleteThread_(cap,t);
2193 // Empty the run queue. It seems tempting to let all the
2194 // killed threads stay on the run queue as zombies to be
2195 // cleaned up later, but some of them correspond to bound
2196 // threads for which the corresponding Task does not exist.
2197 cap->run_queue_hd = END_TSO_QUEUE;
2198 cap->run_queue_tl = END_TSO_QUEUE;
2200 // Any suspended C-calling Tasks are no more, their OS threads
2202 cap->suspended_ccalling_tasks = NULL;
2204 // Empty the all_threads list. Otherwise, the garbage
2205 // collector may attempt to resurrect some of these threads.
2206 all_threads = END_TSO_QUEUE;
2208 // Wipe the task list, except the current Task.
2209 ACQUIRE_LOCK(&sched_mutex);
2210 for (task = all_tasks; task != NULL; task=task->all_link) {
2211 if (task != cap->running_task) {
2215 RELEASE_LOCK(&sched_mutex);
2217 #if defined(THREADED_RTS)
2218 // Wipe our spare workers list, they no longer exist. New
2219 // workers will be created if necessary.
2220 cap->spare_workers = NULL;
2221 cap->returning_tasks_hd = NULL;
2222 cap->returning_tasks_tl = NULL;
2225 cap = rts_evalStableIO(cap, entry, NULL); // run the action
2226 rts_checkSchedStatus("forkProcess",cap);
2229 hs_exit(); // clean up and exit
2230 stg_exit(EXIT_SUCCESS);
2232 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2233 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2238 /* ---------------------------------------------------------------------------
2239 * Delete all the threads in the system
2240 * ------------------------------------------------------------------------- */
2243 deleteAllThreads ( Capability *cap )
2246 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
2247 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2248 if (t->what_next == ThreadRelocated) {
2251 next = t->global_link;
2252 deleteThread(cap,t);
2256 // The run queue now contains a bunch of ThreadKilled threads. We
2257 // must not throw these away: the main thread(s) will be in there
2258 // somewhere, and the main scheduler loop has to deal with it.
2259 // Also, the run queue is the only thing keeping these threads from
2260 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2262 #if !defined(THREADED_RTS)
2263 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2264 ASSERT(sleeping_queue == END_TSO_QUEUE);
2268 /* -----------------------------------------------------------------------------
2269 Managing the suspended_ccalling_tasks list.
2270 Locks required: sched_mutex
2271 -------------------------------------------------------------------------- */
2274 suspendTask (Capability *cap, Task *task)
2276 ASSERT(task->next == NULL && task->prev == NULL);
2277 task->next = cap->suspended_ccalling_tasks;
2279 if (cap->suspended_ccalling_tasks) {
2280 cap->suspended_ccalling_tasks->prev = task;
2282 cap->suspended_ccalling_tasks = task;
2286 recoverSuspendedTask (Capability *cap, Task *task)
2289 task->prev->next = task->next;
2291 ASSERT(cap->suspended_ccalling_tasks == task);
2292 cap->suspended_ccalling_tasks = task->next;
2295 task->next->prev = task->prev;
2297 task->next = task->prev = NULL;
2300 /* ---------------------------------------------------------------------------
2301 * Suspending & resuming Haskell threads.
2303 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2304 * its capability before calling the C function. This allows another
2305 * task to pick up the capability and carry on running Haskell
2306 * threads. It also means that if the C call blocks, it won't lock
2309 * The Haskell thread making the C call is put to sleep for the
2310 * duration of the call, on the susepended_ccalling_threads queue. We
2311 * give out a token to the task, which it can use to resume the thread
2312 * on return from the C function.
2313 * ------------------------------------------------------------------------- */
2316 suspendThread (StgRegTable *reg)
2319 int saved_errno = errno;
2323 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2325 cap = regTableToCapability(reg);
2327 task = cap->running_task;
2328 tso = cap->r.rCurrentTSO;
2331 sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2333 // XXX this might not be necessary --SDM
2334 tso->what_next = ThreadRunGHC;
2336 threadPaused(cap,tso);
2338 if(tso->blocked_exceptions == NULL) {
2339 tso->why_blocked = BlockedOnCCall;
2340 tso->blocked_exceptions = END_TSO_QUEUE;
2342 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2345 // Hand back capability
2346 task->suspended_tso = tso;
2348 ACQUIRE_LOCK(&cap->lock);
2350 suspendTask(cap,task);
2351 cap->in_haskell = rtsFalse;
2352 releaseCapability_(cap);
2354 RELEASE_LOCK(&cap->lock);
2356 #if defined(THREADED_RTS)
2357 /* Preparing to leave the RTS, so ensure there's a native thread/task
2358 waiting to take over.
2360 IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2363 errno = saved_errno;
2368 resumeThread (void *task_)
2372 int saved_errno = errno;
2376 // Wait for permission to re-enter the RTS with the result.
2377 waitForReturnCapability(&cap,task);
2378 // we might be on a different capability now... but if so, our
2379 // entry on the suspended_ccalling_tasks list will also have been
2382 // Remove the thread from the suspended list
2383 recoverSuspendedTask(cap,task);
2385 tso = task->suspended_tso;
2386 task->suspended_tso = NULL;
2387 tso->link = END_TSO_QUEUE;
2388 IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2390 if (tso->why_blocked == BlockedOnCCall) {
2391 awakenBlockedQueue(cap,tso->blocked_exceptions);
2392 tso->blocked_exceptions = NULL;
2395 /* Reset blocking status */
2396 tso->why_blocked = NotBlocked;
2398 cap->r.rCurrentTSO = tso;
2399 cap->in_haskell = rtsTrue;
2400 errno = saved_errno;
2402 /* We might have GC'd, mark the TSO dirty again */
2405 IF_DEBUG(sanity, checkTSO(tso));
2410 /* ---------------------------------------------------------------------------
2411 * Comparing Thread ids.
2413 * This is used from STG land in the implementation of the
2414 * instances of Eq/Ord for ThreadIds.
2415 * ------------------------------------------------------------------------ */
2418 cmp_thread(StgPtr tso1, StgPtr tso2)
2420 StgThreadID id1 = ((StgTSO *)tso1)->id;
2421 StgThreadID id2 = ((StgTSO *)tso2)->id;
2423 if (id1 < id2) return (-1);
2424 if (id1 > id2) return 1;
2428 /* ---------------------------------------------------------------------------
2429 * Fetching the ThreadID from an StgTSO.
2431 * This is used in the implementation of Show for ThreadIds.
2432 * ------------------------------------------------------------------------ */
2434 rts_getThreadId(StgPtr tso)
2436 return ((StgTSO *)tso)->id;
2441 labelThread(StgPtr tso, char *label)
2446 /* Caveat: Once set, you can only set the thread name to "" */
2447 len = strlen(label)+1;
2448 buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2449 strncpy(buf,label,len);
2450 /* Update will free the old memory for us */
2451 updateThreadLabel(((StgTSO *)tso)->id,buf);
2455 /* ---------------------------------------------------------------------------
2456 Create a new thread.
2458 The new thread starts with the given stack size. Before the
2459 scheduler can run, however, this thread needs to have a closure
2460 (and possibly some arguments) pushed on its stack. See
2461 pushClosure() in Schedule.h.
2463 createGenThread() and createIOThread() (in SchedAPI.h) are
2464 convenient packaged versions of this function.
2466 currently pri (priority) is only used in a GRAN setup -- HWL
2467 ------------------------------------------------------------------------ */
2469 /* currently pri (priority) is only used in a GRAN setup -- HWL */
2471 createThread(nat size, StgInt pri)
2474 createThread(Capability *cap, nat size)
2480 /* sched_mutex is *not* required */
2482 /* First check whether we should create a thread at all */
2483 #if defined(PARALLEL_HASKELL)
2484 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2485 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2487 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2488 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2489 return END_TSO_QUEUE;
2495 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2498 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2500 /* catch ridiculously small stack sizes */
2501 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2502 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2505 stack_size = size - TSO_STRUCT_SIZEW;
2507 tso = (StgTSO *)allocateLocal(cap, size);
2508 TICK_ALLOC_TSO(stack_size, 0);
2510 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2512 SET_GRAN_HDR(tso, ThisPE);
2515 // Always start with the compiled code evaluator
2516 tso->what_next = ThreadRunGHC;
2518 tso->why_blocked = NotBlocked;
2519 tso->blocked_exceptions = NULL;
2520 tso->flags = TSO_DIRTY;
2522 tso->saved_errno = 0;
2526 tso->stack_size = stack_size;
2527 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
2529 tso->sp = (P_)&(tso->stack) + stack_size;
2531 tso->trec = NO_TREC;
2534 tso->prof.CCCS = CCS_MAIN;
2537 /* put a stop frame on the stack */
2538 tso->sp -= sizeofW(StgStopFrame);
2539 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2540 tso->link = END_TSO_QUEUE;
2544 /* uses more flexible routine in GranSim */
2545 insertThread(tso, CurrentProc);
2547 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2553 if (RtsFlags.GranFlags.GranSimStats.Full)
2554 DumpGranEvent(GR_START,tso);
2555 #elif defined(PARALLEL_HASKELL)
2556 if (RtsFlags.ParFlags.ParStats.Full)
2557 DumpGranEvent(GR_STARTQ,tso);
2558 /* HACk to avoid SCHEDULE
2562 /* Link the new thread on the global thread list.
2564 ACQUIRE_LOCK(&sched_mutex);
2565 tso->id = next_thread_id++; // while we have the mutex
2566 tso->global_link = all_threads;
2568 RELEASE_LOCK(&sched_mutex);
2571 tso->dist.priority = MandatoryPriority; //by default that is...
2575 tso->gran.pri = pri;
2577 tso->gran.magic = TSO_MAGIC; // debugging only
2579 tso->gran.sparkname = 0;
2580 tso->gran.startedat = CURRENT_TIME;
2581 tso->gran.exported = 0;
2582 tso->gran.basicblocks = 0;
2583 tso->gran.allocs = 0;
2584 tso->gran.exectime = 0;
2585 tso->gran.fetchtime = 0;
2586 tso->gran.fetchcount = 0;
2587 tso->gran.blocktime = 0;
2588 tso->gran.blockcount = 0;
2589 tso->gran.blockedat = 0;
2590 tso->gran.globalsparks = 0;
2591 tso->gran.localsparks = 0;
2592 if (RtsFlags.GranFlags.Light)
2593 tso->gran.clock = Now; /* local clock */
2595 tso->gran.clock = 0;
2597 IF_DEBUG(gran,printTSO(tso));
2598 #elif defined(PARALLEL_HASKELL)
2600 tso->par.magic = TSO_MAGIC; // debugging only
2602 tso->par.sparkname = 0;
2603 tso->par.startedat = CURRENT_TIME;
2604 tso->par.exported = 0;
2605 tso->par.basicblocks = 0;
2606 tso->par.allocs = 0;
2607 tso->par.exectime = 0;
2608 tso->par.fetchtime = 0;
2609 tso->par.fetchcount = 0;
2610 tso->par.blocktime = 0;
2611 tso->par.blockcount = 0;
2612 tso->par.blockedat = 0;
2613 tso->par.globalsparks = 0;
2614 tso->par.localsparks = 0;
2618 globalGranStats.tot_threads_created++;
2619 globalGranStats.threads_created_on_PE[CurrentProc]++;
2620 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2621 globalGranStats.tot_sq_probes++;
2622 #elif defined(PARALLEL_HASKELL)
2623 // collect parallel global statistics (currently done together with GC stats)
2624 if (RtsFlags.ParFlags.ParStats.Global &&
2625 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2626 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
2627 globalParStats.tot_threads_created++;
2633 sched_belch("==__ schedule: Created TSO %d (%p);",
2634 CurrentProc, tso, tso->id));
2635 #elif defined(PARALLEL_HASKELL)
2636 IF_PAR_DEBUG(verbose,
2637 sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2638 (long)tso->id, tso, advisory_thread_count));
2640 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
2641 (long)tso->id, (long)tso->stack_size));
2648 all parallel thread creation calls should fall through the following routine.
2651 createThreadFromSpark(rtsSpark spark)
2653 ASSERT(spark != (rtsSpark)NULL);
2654 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2655 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
2657 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2658 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2659 return END_TSO_QUEUE;
2663 tso = createThread(RtsFlags.GcFlags.initialStkSize);
2664 if (tso==END_TSO_QUEUE)
2665 barf("createSparkThread: Cannot create TSO");
2667 tso->priority = AdvisoryPriority;
2669 pushClosure(tso,spark);
2671 advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
2678 Turn a spark into a thread.
2679 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2683 activateSpark (rtsSpark spark)
2687 tso = createSparkThread(spark);
2688 if (RtsFlags.ParFlags.ParStats.Full) {
2689 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2690 IF_PAR_DEBUG(verbose,
2691 debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2692 (StgClosure *)spark, info_type((StgClosure *)spark)));
2694 // ToDo: fwd info on local/global spark to thread -- HWL
2695 // tso->gran.exported = spark->exported;
2696 // tso->gran.locked = !spark->global;
2697 // tso->gran.sparkname = spark->name;
2703 /* ---------------------------------------------------------------------------
2706 * scheduleThread puts a thread on the end of the runnable queue.
2707 * This will usually be done immediately after a thread is created.
2708 * The caller of scheduleThread must create the thread using e.g.
2709 * createThread and push an appropriate closure
2710 * on this thread's stack before the scheduler is invoked.
2711 * ------------------------------------------------------------------------ */
2714 scheduleThread(Capability *cap, StgTSO *tso)
2716 // The thread goes at the *end* of the run-queue, to avoid possible
2717 // starvation of any threads already on the queue.
2718 appendToRunQueue(cap,tso);
2722 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2724 #if defined(THREADED_RTS)
2725 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2726 // move this thread from now on.
2727 cpu %= RtsFlags.ParFlags.nNodes;
2728 if (cpu == cap->no) {
2729 appendToRunQueue(cap,tso);
2731 Capability *target_cap = &capabilities[cpu];
2733 tso->bound->cap = target_cap;
2735 tso->cap = target_cap;
2736 wakeupThreadOnCapability(target_cap,tso);
2739 appendToRunQueue(cap,tso);
2744 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2748 // We already created/initialised the Task
2749 task = cap->running_task;
2751 // This TSO is now a bound thread; make the Task and TSO
2752 // point to each other.
2758 task->stat = NoStatus;
2760 appendToRunQueue(cap,tso);
2762 IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2765 /* GranSim specific init */
2766 CurrentTSO = m->tso; // the TSO to run
2767 procStatus[MainProc] = Busy; // status of main PE
2768 CurrentProc = MainProc; // PE to run it on
2771 cap = schedule(cap,task);
2773 ASSERT(task->stat != NoStatus);
2774 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2776 IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2780 /* ----------------------------------------------------------------------------
2782 * ------------------------------------------------------------------------- */
2784 #if defined(THREADED_RTS)
2786 workerStart(Task *task)
2790 // See startWorkerTask().
2791 ACQUIRE_LOCK(&task->lock);
2793 RELEASE_LOCK(&task->lock);
2795 // set the thread-local pointer to the Task:
2798 // schedule() runs without a lock.
2799 cap = schedule(cap,task);
2801 // On exit from schedule(), we have a Capability.
2802 releaseCapability(cap);
2803 workerTaskStop(task);
2807 /* ---------------------------------------------------------------------------
2810 * Initialise the scheduler. This resets all the queues - if the
2811 * queues contained any threads, they'll be garbage collected at the
2814 * ------------------------------------------------------------------------ */
2821 for (i=0; i<=MAX_PROC; i++) {
2822 run_queue_hds[i] = END_TSO_QUEUE;
2823 run_queue_tls[i] = END_TSO_QUEUE;
2824 blocked_queue_hds[i] = END_TSO_QUEUE;
2825 blocked_queue_tls[i] = END_TSO_QUEUE;
2826 ccalling_threadss[i] = END_TSO_QUEUE;
2827 blackhole_queue[i] = END_TSO_QUEUE;
2828 sleeping_queue = END_TSO_QUEUE;
2830 #elif !defined(THREADED_RTS)
2831 blocked_queue_hd = END_TSO_QUEUE;
2832 blocked_queue_tl = END_TSO_QUEUE;
2833 sleeping_queue = END_TSO_QUEUE;
2836 blackhole_queue = END_TSO_QUEUE;
2837 all_threads = END_TSO_QUEUE;
2840 sched_state = SCHED_RUNNING;
2842 RtsFlags.ConcFlags.ctxtSwitchTicks =
2843 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2845 #if defined(THREADED_RTS)
2846 /* Initialise the mutex and condition variables used by
2848 initMutex(&sched_mutex);
2851 ACQUIRE_LOCK(&sched_mutex);
2853 /* A capability holds the state a native thread needs in
2854 * order to execute STG code. At least one capability is
2855 * floating around (only THREADED_RTS builds have more than one).
2861 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2865 #if defined(THREADED_RTS)
2867 * Eagerly start one worker to run each Capability, except for
2868 * Capability 0. The idea is that we're probably going to start a
2869 * bound thread on Capability 0 pretty soon, so we don't want a
2870 * worker task hogging it.
2875 for (i = 1; i < n_capabilities; i++) {
2876 cap = &capabilities[i];
2877 ACQUIRE_LOCK(&cap->lock);
2878 startWorkerTask(cap, workerStart);
2879 RELEASE_LOCK(&cap->lock);
2884 RELEASE_LOCK(&sched_mutex);
2888 exitScheduler( void )
2892 #if defined(THREADED_RTS)
2893 ACQUIRE_LOCK(&sched_mutex);
2894 task = newBoundTask();
2895 RELEASE_LOCK(&sched_mutex);
2898 // If we haven't killed all the threads yet, do it now.
2899 if (sched_state < SCHED_SHUTTING_DOWN) {
2900 sched_state = SCHED_INTERRUPTING;
2901 scheduleDoGC(NULL,task,rtsFalse,GetRoots);
2903 sched_state = SCHED_SHUTTING_DOWN;
2905 #if defined(THREADED_RTS)
2909 for (i = 0; i < n_capabilities; i++) {
2910 shutdownCapability(&capabilities[i], task);
2912 boundTaskExiting(task);
2918 /* ---------------------------------------------------------------------------
2919 Where are the roots that we know about?
2921 - all the threads on the runnable queue
2922 - all the threads on the blocked queue
2923 - all the threads on the sleeping queue
2924 - all the thread currently executing a _ccall_GC
2925 - all the "main threads"
2927 ------------------------------------------------------------------------ */
2929 /* This has to be protected either by the scheduler monitor, or by the
2930 garbage collection monitor (probably the latter).
2935 GetRoots( evac_fn evac )
2942 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2943 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2944 evac((StgClosure **)&run_queue_hds[i]);
2945 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2946 evac((StgClosure **)&run_queue_tls[i]);
2948 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2949 evac((StgClosure **)&blocked_queue_hds[i]);
2950 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2951 evac((StgClosure **)&blocked_queue_tls[i]);
2952 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2953 evac((StgClosure **)&ccalling_threads[i]);
2960 for (i = 0; i < n_capabilities; i++) {
2961 cap = &capabilities[i];
2962 evac((StgClosure **)(void *)&cap->run_queue_hd);
2963 evac((StgClosure **)(void *)&cap->run_queue_tl);
2964 #if defined(THREADED_RTS)
2965 evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2966 evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2968 for (task = cap->suspended_ccalling_tasks; task != NULL;
2970 IF_DEBUG(scheduler,sched_belch("evac'ing suspended TSO %d", task->suspended_tso->id));
2971 evac((StgClosure **)(void *)&task->suspended_tso);
2977 #if !defined(THREADED_RTS)
2978 evac((StgClosure **)(void *)&blocked_queue_hd);
2979 evac((StgClosure **)(void *)&blocked_queue_tl);
2980 evac((StgClosure **)(void *)&sleeping_queue);
2984 // evac((StgClosure **)&blackhole_queue);
2986 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2987 markSparkQueue(evac);
2990 #if defined(RTS_USER_SIGNALS)
2991 // mark the signal handlers (signals should be already blocked)
2992 markSignalHandlers(evac);
2996 /* -----------------------------------------------------------------------------
2999 This is the interface to the garbage collector from Haskell land.
3000 We provide this so that external C code can allocate and garbage
3001 collect when called from Haskell via _ccall_GC.
3003 It might be useful to provide an interface whereby the programmer
3004 can specify more roots (ToDo).
3006 This needs to be protected by the GC condition variable above. KH.
3007 -------------------------------------------------------------------------- */
3009 static void (*extra_roots)(evac_fn);
3012 performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
3015 // We must grab a new Task here, because the existing Task may be
3016 // associated with a particular Capability, and chained onto the
3017 // suspended_ccalling_tasks queue.
3018 ACQUIRE_LOCK(&sched_mutex);
3019 task = newBoundTask();
3020 RELEASE_LOCK(&sched_mutex);
3021 scheduleDoGC(NULL,task,force_major, get_roots);
3022 boundTaskExiting(task);
3028 performGC_(rtsFalse, GetRoots);
3032 performMajorGC(void)
3034 performGC_(rtsTrue, GetRoots);
3038 AllRoots(evac_fn evac)
3040 GetRoots(evac); // the scheduler's roots
3041 extra_roots(evac); // the user's roots
3045 performGCWithRoots(void (*get_roots)(evac_fn))
3047 extra_roots = get_roots;
3048 performGC_(rtsFalse, AllRoots);
3051 /* -----------------------------------------------------------------------------
3054 If the thread has reached its maximum stack size, then raise the
3055 StackOverflow exception in the offending thread. Otherwise
3056 relocate the TSO into a larger chunk of memory and adjust its stack
3058 -------------------------------------------------------------------------- */
3061 threadStackOverflow(Capability *cap, StgTSO *tso)
3063 nat new_stack_size, stack_words;
3068 IF_DEBUG(sanity,checkTSO(tso));
3069 if (tso->stack_size >= tso->max_stack_size) {
3072 debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
3073 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
3074 /* If we're debugging, just print out the top of the stack */
3075 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
3078 /* Send this thread the StackOverflow exception */
3079 raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
3083 /* Try to double the current stack size. If that takes us over the
3084 * maximum stack size for this thread, then use the maximum instead.
3085 * Finally round up so the TSO ends up as a whole number of blocks.
3087 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
3088 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
3089 TSO_STRUCT_SIZE)/sizeof(W_);
3090 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
3091 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
3093 IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
3095 dest = (StgTSO *)allocate(new_tso_size);
3096 TICK_ALLOC_TSO(new_stack_size,0);
3098 /* copy the TSO block and the old stack into the new area */
3099 memcpy(dest,tso,TSO_STRUCT_SIZE);
3100 stack_words = tso->stack + tso->stack_size - tso->sp;
3101 new_sp = (P_)dest + new_tso_size - stack_words;
3102 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
3104 /* relocate the stack pointers... */
3106 dest->stack_size = new_stack_size;
3108 /* Mark the old TSO as relocated. We have to check for relocated
3109 * TSOs in the garbage collector and any primops that deal with TSOs.
3111 * It's important to set the sp value to just beyond the end
3112 * of the stack, so we don't attempt to scavenge any part of the
3115 tso->what_next = ThreadRelocated;
3117 tso->sp = (P_)&(tso->stack[tso->stack_size]);
3118 tso->why_blocked = NotBlocked;
3120 IF_PAR_DEBUG(verbose,
3121 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
3122 tso->id, tso, tso->stack_size);
3123 /* If we're debugging, just print out the top of the stack */
3124 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
3127 IF_DEBUG(sanity,checkTSO(tso));
3129 IF_DEBUG(scheduler,printTSO(dest));
3135 /* ---------------------------------------------------------------------------
3136 Wake up a queue that was blocked on some resource.
3137 ------------------------------------------------------------------------ */
3141 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3144 #elif defined(PARALLEL_HASKELL)
3146 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3148 /* write RESUME events to log file and
3149 update blocked and fetch time (depending on type of the orig closure) */
3150 if (RtsFlags.ParFlags.ParStats.Full) {
3151 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
3152 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
3153 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
3154 if (emptyRunQueue())
3155 emitSchedule = rtsTrue;
3157 switch (get_itbl(node)->type) {
3159 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3164 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3171 barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
3178 StgBlockingQueueElement *
3179 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3182 PEs node_loc, tso_loc;
3184 node_loc = where_is(node); // should be lifted out of loop
3185 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
3186 tso_loc = where_is((StgClosure *)tso);
3187 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3188 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3189 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3190 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3191 // insertThread(tso, node_loc);
3192 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3194 tso, node, (rtsSpark*)NULL);
3195 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
3198 } else { // TSO is remote (actually should be FMBQ)
3199 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3200 RtsFlags.GranFlags.Costs.gunblocktime +
3201 RtsFlags.GranFlags.Costs.latency;
3202 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3204 tso, node, (rtsSpark*)NULL);
3205 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
3208 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3210 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3211 (node_loc==tso_loc ? "Local" : "Global"),
3212 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3213 tso->block_info.closure = NULL;
3214 IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n",
3217 #elif defined(PARALLEL_HASKELL)
3218 StgBlockingQueueElement *
3219 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3221 StgBlockingQueueElement *next;
3223 switch (get_itbl(bqe)->type) {
3225 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3226 /* if it's a TSO just push it onto the run_queue */
3228 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3229 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
3231 unblockCount(bqe, node);
3232 /* reset blocking status after dumping event */
3233 ((StgTSO *)bqe)->why_blocked = NotBlocked;
3237 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3239 bqe->link = (StgBlockingQueueElement *)PendingFetches;
3240 PendingFetches = (StgBlockedFetch *)bqe;
3244 /* can ignore this case in a non-debugging setup;
3245 see comments on RBHSave closures above */
3247 /* check that the closure is an RBHSave closure */
3248 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3249 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3250 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3254 barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3255 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
3259 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3265 unblockOne(Capability *cap, StgTSO *tso)
3269 ASSERT(get_itbl(tso)->type == TSO);
3270 ASSERT(tso->why_blocked != NotBlocked);
3272 tso->why_blocked = NotBlocked;
3274 tso->link = END_TSO_QUEUE;
3276 #if defined(THREADED_RTS)
3277 if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) {
3278 // We are waking up this thread on the current Capability, which
3279 // might involve migrating it from the Capability it was last on.
3281 ASSERT(tso->bound->cap == tso->cap);
3282 tso->bound->cap = cap;
3285 appendToRunQueue(cap,tso);
3286 // we're holding a newly woken thread, make sure we context switch
3287 // quickly so we can migrate it if necessary.
3290 // we'll try to wake it up on the Capability it was last on.
3291 wakeupThreadOnCapability(tso->cap, tso);
3294 appendToRunQueue(cap,tso);
3298 IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
3305 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3307 StgBlockingQueueElement *bqe;
3312 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3313 node, CurrentProc, CurrentTime[CurrentProc],
3314 CurrentTSO->id, CurrentTSO));
3316 node_loc = where_is(node);
3318 ASSERT(q == END_BQ_QUEUE ||
3319 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
3320 get_itbl(q)->type == CONSTR); // closure (type constructor)
3321 ASSERT(is_unique(node));
3323 /* FAKE FETCH: magically copy the node to the tso's proc;
3324 no Fetch necessary because in reality the node should not have been
3325 moved to the other PE in the first place
3327 if (CurrentProc!=node_loc) {
3329 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3330 node, node_loc, CurrentProc, CurrentTSO->id,
3331 // CurrentTSO, where_is(CurrentTSO),
3332 node->header.gran.procs));
3333 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3335 debugBelch("## new bitmask of node %p is %#x\n",
3336 node, node->header.gran.procs));
3337 if (RtsFlags.GranFlags.GranSimStats.Global) {
3338 globalGranStats.tot_fake_fetches++;
3343 // ToDo: check: ASSERT(CurrentProc==node_loc);
3344 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3347 bqe points to the current element in the queue
3348 next points to the next element in the queue
3350 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
3351 //tso_loc = where_is(tso);
3353 bqe = unblockOne(bqe, node);
3356 /* if this is the BQ of an RBH, we have to put back the info ripped out of
3357 the closure to make room for the anchor of the BQ */
3358 if (bqe!=END_BQ_QUEUE) {
3359 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3361 ASSERT((info_ptr==&RBH_Save_0_info) ||
3362 (info_ptr==&RBH_Save_1_info) ||
3363 (info_ptr==&RBH_Save_2_info));
3365 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3366 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3367 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3370 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3371 node, info_type(node)));
3374 /* statistics gathering */
3375 if (RtsFlags.GranFlags.GranSimStats.Global) {
3376 // globalGranStats.tot_bq_processing_time += bq_processing_time;
3377 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
3378 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
3379 globalGranStats.tot_awbq++; // total no. of bqs awakened
3382 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3383 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3385 #elif defined(PARALLEL_HASKELL)
3387 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3389 StgBlockingQueueElement *bqe;
3391 IF_PAR_DEBUG(verbose,
3392 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3396 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3397 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3402 ASSERT(q == END_BQ_QUEUE ||
3403 get_itbl(q)->type == TSO ||
3404 get_itbl(q)->type == BLOCKED_FETCH ||
3405 get_itbl(q)->type == CONSTR);
3408 while (get_itbl(bqe)->type==TSO ||
3409 get_itbl(bqe)->type==BLOCKED_FETCH) {
3410 bqe = unblockOne(bqe, node);
3414 #else /* !GRAN && !PARALLEL_HASKELL */
3417 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3419 if (tso == NULL) return; // hack; see bug #1235728, and comments in
3421 while (tso != END_TSO_QUEUE) {
3422 tso = unblockOne(cap,tso);
3427 /* ---------------------------------------------------------------------------
3429 - usually called inside a signal handler so it mustn't do anything fancy.
3430 ------------------------------------------------------------------------ */
3433 interruptStgRts(void)
3435 sched_state = SCHED_INTERRUPTING;
3437 #if defined(THREADED_RTS)
3438 prodAllCapabilities();
3442 /* -----------------------------------------------------------------------------
3445 This is for use when we raise an exception in another thread, which
3447 This has nothing to do with the UnblockThread event in GranSim. -- HWL
3448 -------------------------------------------------------------------------- */
3450 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3452 NB: only the type of the blocking queue is different in GranSim and GUM
3453 the operations on the queue-elements are the same
3454 long live polymorphism!
3456 Locks: sched_mutex is held upon entry and exit.
3460 unblockThread(Capability *cap, StgTSO *tso)
3462 StgBlockingQueueElement *t, **last;
3464 switch (tso->why_blocked) {
3467 return; /* not blocked */
3470 // Be careful: nothing to do here! We tell the scheduler that the thread
3471 // is runnable and we leave it to the stack-walking code to abort the
3472 // transaction while unwinding the stack. We should perhaps have a debugging
3473 // test to make sure that this really happens and that the 'zombie' transaction
3474 // does not get committed.
3478 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3480 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3481 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3483 last = (StgBlockingQueueElement **)&mvar->head;
3484 for (t = (StgBlockingQueueElement *)mvar->head;
3486 last = &t->link, last_tso = t, t = t->link) {
3487 if (t == (StgBlockingQueueElement *)tso) {
3488 *last = (StgBlockingQueueElement *)tso->link;
3489 if (mvar->tail == tso) {
3490 mvar->tail = (StgTSO *)last_tso;
3495 barf("unblockThread (MVAR): TSO not found");
3498 case BlockedOnBlackHole:
3499 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3501 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3503 last = &bq->blocking_queue;
3504 for (t = bq->blocking_queue;
3506 last = &t->link, t = t->link) {
3507 if (t == (StgBlockingQueueElement *)tso) {
3508 *last = (StgBlockingQueueElement *)tso->link;
3512 barf("unblockThread (BLACKHOLE): TSO not found");
3515 case BlockedOnException:
3517 StgTSO *target = tso->block_info.tso;
3519 ASSERT(get_itbl(target)->type == TSO);
3521 if (target->what_next == ThreadRelocated) {
3522 target = target->link;
3523 ASSERT(get_itbl(target)->type == TSO);
3526 ASSERT(target->blocked_exceptions != NULL);
3528 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3529 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
3531 last = &t->link, t = t->link) {
3532 ASSERT(get_itbl(t)->type == TSO);
3533 if (t == (StgBlockingQueueElement *)tso) {
3534 *last = (StgBlockingQueueElement *)tso->link;
3538 barf("unblockThread (Exception): TSO not found");
3542 case BlockedOnWrite:
3543 #if defined(mingw32_HOST_OS)
3544 case BlockedOnDoProc:
3547 /* take TSO off blocked_queue */
3548 StgBlockingQueueElement *prev = NULL;
3549 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
3550 prev = t, t = t->link) {
3551 if (t == (StgBlockingQueueElement *)tso) {
3553 blocked_queue_hd = (StgTSO *)t->link;
3554 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3555 blocked_queue_tl = END_TSO_QUEUE;
3558 prev->link = t->link;
3559 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3560 blocked_queue_tl = (StgTSO *)prev;
3563 #if defined(mingw32_HOST_OS)
3564 /* (Cooperatively) signal that the worker thread should abort
3567 abandonWorkRequest(tso->block_info.async_result->reqID);
3572 barf("unblockThread (I/O): TSO not found");
3575 case BlockedOnDelay:
3577 /* take TSO off sleeping_queue */
3578 StgBlockingQueueElement *prev = NULL;
3579 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
3580 prev = t, t = t->link) {
3581 if (t == (StgBlockingQueueElement *)tso) {
3583 sleeping_queue = (StgTSO *)t->link;
3585 prev->link = t->link;
3590 barf("unblockThread (delay): TSO not found");
3594 barf("unblockThread");
3598 tso->link = END_TSO_QUEUE;
3599 tso->why_blocked = NotBlocked;
3600 tso->block_info.closure = NULL;
3601 pushOnRunQueue(cap,tso);
3605 unblockThread(Capability *cap, StgTSO *tso)
3609 /* To avoid locking unnecessarily. */
3610 if (tso->why_blocked == NotBlocked) {
3614 switch (tso->why_blocked) {
3617 // Be careful: nothing to do here! We tell the scheduler that the thread
3618 // is runnable and we leave it to the stack-walking code to abort the
3619 // transaction while unwinding the stack. We should perhaps have a debugging
3620 // test to make sure that this really happens and that the 'zombie' transaction
3621 // does not get committed.
3625 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3627 StgTSO *last_tso = END_TSO_QUEUE;
3628 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3631 for (t = mvar->head; t != END_TSO_QUEUE;
3632 last = &t->link, last_tso = t, t = t->link) {
3635 if (mvar->tail == tso) {
3636 mvar->tail = last_tso;
3641 barf("unblockThread (MVAR): TSO not found");
3644 case BlockedOnBlackHole:
3646 last = &blackhole_queue;
3647 for (t = blackhole_queue; t != END_TSO_QUEUE;
3648 last = &t->link, t = t->link) {
3654 barf("unblockThread (BLACKHOLE): TSO not found");
3657 case BlockedOnException:
3659 StgTSO *target = tso->block_info.tso;
3661 ASSERT(get_itbl(target)->type == TSO);
3663 while (target->what_next == ThreadRelocated) {
3664 target = target->link;
3665 ASSERT(get_itbl(target)->type == TSO);
3668 ASSERT(target->blocked_exceptions != NULL);
3670 last = &target->blocked_exceptions;
3671 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
3672 last = &t->link, t = t->link) {
3673 ASSERT(get_itbl(t)->type == TSO);
3679 barf("unblockThread (Exception): TSO not found");
3682 #if !defined(THREADED_RTS)
3684 case BlockedOnWrite:
3685 #if defined(mingw32_HOST_OS)
3686 case BlockedOnDoProc:
3689 StgTSO *prev = NULL;
3690 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
3691 prev = t, t = t->link) {
3694 blocked_queue_hd = t->link;
3695 if (blocked_queue_tl == t) {
3696 blocked_queue_tl = END_TSO_QUEUE;
3699 prev->link = t->link;
3700 if (blocked_queue_tl == t) {
3701 blocked_queue_tl = prev;
3704 #if defined(mingw32_HOST_OS)
3705 /* (Cooperatively) signal that the worker thread should abort
3708 abandonWorkRequest(tso->block_info.async_result->reqID);
3713 barf("unblockThread (I/O): TSO not found");
3716 case BlockedOnDelay:
3718 StgTSO *prev = NULL;
3719 for (t = sleeping_queue; t != END_TSO_QUEUE;
3720 prev = t, t = t->link) {
3723 sleeping_queue = t->link;
3725 prev->link = t->link;
3730 barf("unblockThread (delay): TSO not found");
3735 barf("unblockThread");
3739 tso->link = END_TSO_QUEUE;
3740 tso->why_blocked = NotBlocked;
3741 tso->block_info.closure = NULL;
3742 appendToRunQueue(cap,tso);
3744 // We might have just migrated this TSO to our Capability:
3746 tso->bound->cap = cap;
3752 /* -----------------------------------------------------------------------------
3755 * Check the blackhole_queue for threads that can be woken up. We do
3756 * this periodically: before every GC, and whenever the run queue is
3759 * An elegant solution might be to just wake up all the blocked
3760 * threads with awakenBlockedQueue occasionally: they'll go back to
3761 * sleep again if the object is still a BLACKHOLE. Unfortunately this
3762 * doesn't give us a way to tell whether we've actually managed to
3763 * wake up any threads, so we would be busy-waiting.
3765 * -------------------------------------------------------------------------- */
3768 checkBlackHoles (Capability *cap)
3771 rtsBool any_woke_up = rtsFalse;
3774 // blackhole_queue is global:
3775 ASSERT_LOCK_HELD(&sched_mutex);
3777 IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3779 // ASSUMES: sched_mutex
3780 prev = &blackhole_queue;
3781 t = blackhole_queue;
3782 while (t != END_TSO_QUEUE) {
3783 ASSERT(t->why_blocked == BlockedOnBlackHole);
3784 type = get_itbl(t->block_info.closure)->type;
3785 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3786 IF_DEBUG(sanity,checkTSO(t));
3787 t = unblockOne(cap, t);
3788 // urk, the threads migrate to the current capability
3789 // here, but we'd like to keep them on the original one.
3791 any_woke_up = rtsTrue;
3801 /* -----------------------------------------------------------------------------
3804 * The following function implements the magic for raising an
3805 * asynchronous exception in an existing thread.
3807 * We first remove the thread from any queue on which it might be
3808 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
3810 * We strip the stack down to the innermost CATCH_FRAME, building
3811 * thunks in the heap for all the active computations, so they can
3812 * be restarted if necessary. When we reach a CATCH_FRAME, we build
3813 * an application of the handler to the exception, and push it on
3814 * the top of the stack.
3816 * How exactly do we save all the active computations? We create an
3817 * AP_STACK for every UpdateFrame on the stack. Entering one of these
3818 * AP_STACKs pushes everything from the corresponding update frame
3819 * upwards onto the stack. (Actually, it pushes everything up to the
3820 * next update frame plus a pointer to the next AP_STACK object.
3821 * Entering the next AP_STACK object pushes more onto the stack until we
3822 * reach the last AP_STACK object - at which point the stack should look
3823 * exactly as it did when we killed the TSO and we can continue
3824 * execution by entering the closure on top of the stack.
3826 * We can also kill a thread entirely - this happens if either (a) the
3827 * exception passed to raiseAsync is NULL, or (b) there's no
3828 * CATCH_FRAME on the stack. In either case, we strip the entire
3829 * stack and replace the thread with a zombie.
3831 * ToDo: in THREADED_RTS mode, this function is only safe if either
3832 * (a) we hold all the Capabilities (eg. in GC, or if there is only
3833 * one Capability), or (b) we own the Capability that the TSO is
3834 * currently blocked on or on the run queue of.
3836 * -------------------------------------------------------------------------- */
3839 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3841 raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3845 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3847 raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3851 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
3852 rtsBool stop_at_atomically, StgPtr stop_here)
3854 StgRetInfoTable *info;
3858 // Thread already dead?
3859 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3864 sched_belch("raising exception in thread %ld.", (long)tso->id));
3866 // Remove it from any blocking queues
3867 unblockThread(cap,tso);
3869 // mark it dirty; we're about to change its stack.
3874 // The stack freezing code assumes there's a closure pointer on
3875 // the top of the stack, so we have to arrange that this is the case...
3877 if (sp[0] == (W_)&stg_enter_info) {
3881 sp[0] = (W_)&stg_dummy_ret_closure;
3885 while (stop_here == NULL || frame < stop_here) {
3887 // 1. Let the top of the stack be the "current closure"
3889 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3892 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3893 // current closure applied to the chunk of stack up to (but not
3894 // including) the update frame. This closure becomes the "current
3895 // closure". Go back to step 2.
3897 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3898 // top of the stack applied to the exception.
3900 // 5. If it's a STOP_FRAME, then kill the thread.
3902 // NB: if we pass an ATOMICALLY_FRAME then abort the associated
3905 info = get_ret_itbl((StgClosure *)frame);
3907 switch (info->i.type) {
3914 // First build an AP_STACK consisting of the stack chunk above the
3915 // current update frame, with the top word on the stack as the
3918 words = frame - sp - 1;
3919 ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3922 ap->fun = (StgClosure *)sp[0];
3924 for(i=0; i < (nat)words; ++i) {
3925 ap->payload[i] = (StgClosure *)*sp++;
3928 SET_HDR(ap,&stg_AP_STACK_info,
3929 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3930 TICK_ALLOC_UP_THK(words+1,0);
3933 debugBelch("sched: Updating ");
3934 printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3935 debugBelch(" with ");
3936 printObj((StgClosure *)ap);
3939 // Replace the updatee with an indirection
3941 // Warning: if we're in a loop, more than one update frame on
3942 // the stack may point to the same object. Be careful not to
3943 // overwrite an IND_OLDGEN in this case, because we'll screw
3944 // up the mutable lists. To be on the safe side, don't
3945 // overwrite any kind of indirection at all. See also
3946 // threadSqueezeStack in GC.c, where we have to make a similar
3949 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3950 // revert the black hole
3951 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3954 sp += sizeofW(StgUpdateFrame) - 1;
3955 sp[0] = (W_)ap; // push onto stack
3957 continue; //no need to bump frame
3961 // We've stripped the entire stack, the thread is now dead.
3962 tso->what_next = ThreadKilled;
3963 tso->sp = frame + sizeofW(StgStopFrame);
3967 // If we find a CATCH_FRAME, and we've got an exception to raise,
3968 // then build the THUNK raise(exception), and leave it on
3969 // top of the CATCH_FRAME ready to enter.
3973 StgCatchFrame *cf = (StgCatchFrame *)frame;
3977 if (exception == NULL) break;
3979 // we've got an exception to raise, so let's pass it to the
3980 // handler in this frame.
3982 raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3983 TICK_ALLOC_SE_THK(1,0);
3984 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3985 raise->payload[0] = exception;
3987 // throw away the stack from Sp up to the CATCH_FRAME.
3991 /* Ensure that async excpetions are blocked now, so we don't get
3992 * a surprise exception before we get around to executing the
3995 if (tso->blocked_exceptions == NULL) {
3996 tso->blocked_exceptions = END_TSO_QUEUE;
3999 /* Put the newly-built THUNK on top of the stack, ready to execute
4000 * when the thread restarts.
4003 sp[-1] = (W_)&stg_enter_info;
4005 tso->what_next = ThreadRunGHC;
4006 IF_DEBUG(sanity, checkTSO(tso));
4010 case ATOMICALLY_FRAME:
4011 if (stop_at_atomically) {
4012 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
4013 stmCondemnTransaction(cap, tso -> trec);
4017 // R1 is not a register: the return convention for IO in
4018 // this case puts the return value on the stack, so we
4019 // need to set up the stack to return to the atomically
4020 // frame properly...
4021 tso->sp = frame - 2;
4022 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
4023 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
4025 tso->what_next = ThreadRunGHC;
4028 // Not stop_at_atomically... fall through and abort the
4031 case CATCH_RETRY_FRAME:
4032 // IF we find an ATOMICALLY_FRAME then we abort the
4033 // current transaction and propagate the exception. In
4034 // this case (unlike ordinary exceptions) we do not care
4035 // whether the transaction is valid or not because its
4036 // possible validity cannot have caused the exception
4037 // and will not be visible after the abort.
4039 debugBelch("Found atomically block delivering async exception\n"));
4040 StgTRecHeader *trec = tso -> trec;
4041 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
4042 stmAbortTransaction(cap, trec);
4043 tso -> trec = outer;
4050 // move on to the next stack frame
4051 frame += stack_frame_sizeW((StgClosure *)frame);
4054 // if we got here, then we stopped at stop_here
4055 ASSERT(stop_here != NULL);
4058 /* -----------------------------------------------------------------------------
4061 This is used for interruption (^C) and forking, and corresponds to
4062 raising an exception but without letting the thread catch the
4064 -------------------------------------------------------------------------- */
4067 deleteThread (Capability *cap, StgTSO *tso)
4069 if (tso->why_blocked != BlockedOnCCall &&
4070 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
4071 raiseAsync(cap,tso,NULL);
4075 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
4077 deleteThread_(Capability *cap, StgTSO *tso)
4078 { // for forkProcess only:
4079 // like deleteThread(), but we delete threads in foreign calls, too.
4081 if (tso->why_blocked == BlockedOnCCall ||
4082 tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
4083 unblockOne(cap,tso);
4084 tso->what_next = ThreadKilled;
4086 deleteThread(cap,tso);
4091 /* -----------------------------------------------------------------------------
4092 raiseExceptionHelper
4094 This function is called by the raise# primitve, just so that we can
4095 move some of the tricky bits of raising an exception from C-- into
4096 C. Who knows, it might be a useful re-useable thing here too.
4097 -------------------------------------------------------------------------- */
4100 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
4102 Capability *cap = regTableToCapability(reg);
4103 StgThunk *raise_closure = NULL;
4105 StgRetInfoTable *info;
4107 // This closure represents the expression 'raise# E' where E
4108 // is the exception raise. It is used to overwrite all the
4109 // thunks which are currently under evaluataion.
4112 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
4113 // LDV profiling: stg_raise_info has THUNK as its closure
4114 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
4115 // payload, MIN_UPD_SIZE is more approprate than 1. It seems that
4116 // 1 does not cause any problem unless profiling is performed.
4117 // However, when LDV profiling goes on, we need to linearly scan
4118 // small object pool, where raise_closure is stored, so we should
4119 // use MIN_UPD_SIZE.
4121 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
4122 // sizeofW(StgClosure)+1);
4126 // Walk up the stack, looking for the catch frame. On the way,
4127 // we update any closures pointed to from update frames with the
4128 // raise closure that we just built.
4132 info = get_ret_itbl((StgClosure *)p);
4133 next = p + stack_frame_sizeW((StgClosure *)p);
4134 switch (info->i.type) {
4137 // Only create raise_closure if we need to.
4138 if (raise_closure == NULL) {
4140 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
4141 SET_HDR(raise_closure, &stg_raise_info, CCCS);
4142 raise_closure->payload[0] = exception;
4144 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
4148 case ATOMICALLY_FRAME:
4149 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
4151 return ATOMICALLY_FRAME;
4157 case CATCH_STM_FRAME:
4158 IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
4160 return CATCH_STM_FRAME;
4166 case CATCH_RETRY_FRAME:
4175 /* -----------------------------------------------------------------------------
4176 findRetryFrameHelper
4178 This function is called by the retry# primitive. It traverses the stack
4179 leaving tso->sp referring to the frame which should handle the retry.
4181 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
4182 or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).
4184 We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
4185 despite the similar implementation.
4187 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
4188 not be created within memory transactions.
4189 -------------------------------------------------------------------------- */
4192 findRetryFrameHelper (StgTSO *tso)
4195 StgRetInfoTable *info;
4199 info = get_ret_itbl((StgClosure *)p);
4200 next = p + stack_frame_sizeW((StgClosure *)p);
4201 switch (info->i.type) {
4203 case ATOMICALLY_FRAME:
4204 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4206 return ATOMICALLY_FRAME;
4208 case CATCH_RETRY_FRAME:
4209 IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4211 return CATCH_RETRY_FRAME;
4213 case CATCH_STM_FRAME:
4215 ASSERT(info->i.type != CATCH_FRAME);
4216 ASSERT(info->i.type != STOP_FRAME);
4223 /* -----------------------------------------------------------------------------
4224 resurrectThreads is called after garbage collection on the list of
4225 threads found to be garbage. Each of these threads will be woken
4226 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4227 on an MVar, or NonTermination if the thread was blocked on a Black
4230 Locks: assumes we hold *all* the capabilities.
4231 -------------------------------------------------------------------------- */
4234 resurrectThreads (StgTSO *threads)
4239 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4240 next = tso->global_link;
4241 tso->global_link = all_threads;
4243 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4245 // Wake up the thread on the Capability it was last on
4248 switch (tso->why_blocked) {
4250 case BlockedOnException:
4251 /* Called by GC - sched_mutex lock is currently held. */
4252 raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4254 case BlockedOnBlackHole:
4255 raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4258 raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4261 /* This might happen if the thread was blocked on a black hole
4262 * belonging to a thread that we've just woken up (raiseAsync
4263 * can wake up threads, remember...).
4267 barf("resurrectThreads: thread blocked in a strange way");
4272 /* ----------------------------------------------------------------------------
4273 * Debugging: why is a thread blocked
4274 * [Also provides useful information when debugging threaded programs
4275 * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4276 ------------------------------------------------------------------------- */
4280 printThreadBlockage(StgTSO *tso)
4282 switch (tso->why_blocked) {
4284 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4286 case BlockedOnWrite:
4287 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4289 #if defined(mingw32_HOST_OS)
4290 case BlockedOnDoProc:
4291 debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4294 case BlockedOnDelay:
4295 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4298 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4300 case BlockedOnException:
4301 debugBelch("is blocked on delivering an exception to thread %d",
4302 tso->block_info.tso->id);
4304 case BlockedOnBlackHole:
4305 debugBelch("is blocked on a black hole");
4308 debugBelch("is not blocked");
4310 #if defined(PARALLEL_HASKELL)
4312 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4313 tso->block_info.closure, info_type(tso->block_info.closure));
4315 case BlockedOnGA_NoSend:
4316 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4317 tso->block_info.closure, info_type(tso->block_info.closure));
4320 case BlockedOnCCall:
4321 debugBelch("is blocked on an external call");
4323 case BlockedOnCCall_NoUnblockExc:
4324 debugBelch("is blocked on an external call (exceptions were already blocked)");
4327 debugBelch("is blocked on an STM operation");
4330 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4331 tso->why_blocked, tso->id, tso);
4336 printThreadStatus(StgTSO *t)
4338 debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4340 void *label = lookupThreadLabel(t->id);
4341 if (label) debugBelch("[\"%s\"] ",(char *)label);
4343 if (t->what_next == ThreadRelocated) {
4344 debugBelch("has been relocated...\n");
4346 switch (t->what_next) {
4348 debugBelch("has been killed");
4350 case ThreadComplete:
4351 debugBelch("has completed");
4354 printThreadBlockage(t);
4361 printAllThreads(void)
4368 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4369 ullong_format_string(TIME_ON_PROC(CurrentProc),
4370 time_string, rtsFalse/*no commas!*/);
4372 debugBelch("all threads at [%s]:\n", time_string);
4373 # elif defined(PARALLEL_HASKELL)
4374 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4375 ullong_format_string(CURRENT_TIME,
4376 time_string, rtsFalse/*no commas!*/);
4378 debugBelch("all threads at [%s]:\n", time_string);
4380 debugBelch("all threads:\n");
4383 for (i = 0; i < n_capabilities; i++) {
4384 cap = &capabilities[i];
4385 debugBelch("threads on capability %d:\n", cap->no);
4386 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4387 printThreadStatus(t);
4391 debugBelch("other threads:\n");
4392 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4393 if (t->why_blocked != NotBlocked) {
4394 printThreadStatus(t);
4396 if (t->what_next == ThreadRelocated) {
4399 next = t->global_link;
4406 printThreadQueue(StgTSO *t)
4409 for (; t != END_TSO_QUEUE; t = t->link) {
4410 printThreadStatus(t);
4413 debugBelch("%d threads on queue\n", i);
4417 Print a whole blocking queue attached to node (debugging only).
4419 # if defined(PARALLEL_HASKELL)
4421 print_bq (StgClosure *node)
4423 StgBlockingQueueElement *bqe;
4427 debugBelch("## BQ of closure %p (%s): ",
4428 node, info_type(node));
4430 /* should cover all closures that may have a blocking queue */
4431 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4432 get_itbl(node)->type == FETCH_ME_BQ ||
4433 get_itbl(node)->type == RBH ||
4434 get_itbl(node)->type == MVAR);
4436 ASSERT(node!=(StgClosure*)NULL); // sanity check
4438 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4442 Print a whole blocking queue starting with the element bqe.
4445 print_bqe (StgBlockingQueueElement *bqe)
4450 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4452 for (end = (bqe==END_BQ_QUEUE);
4453 !end; // iterate until bqe points to a CONSTR
4454 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
4455 bqe = end ? END_BQ_QUEUE : bqe->link) {
4456 ASSERT(bqe != END_BQ_QUEUE); // sanity check
4457 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
4458 /* types of closures that may appear in a blocking queue */
4459 ASSERT(get_itbl(bqe)->type == TSO ||
4460 get_itbl(bqe)->type == BLOCKED_FETCH ||
4461 get_itbl(bqe)->type == CONSTR);
4462 /* only BQs of an RBH end with an RBH_Save closure */
4463 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4465 switch (get_itbl(bqe)->type) {
4467 debugBelch(" TSO %u (%x),",
4468 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4471 debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4472 ((StgBlockedFetch *)bqe)->node,
4473 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4474 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4475 ((StgBlockedFetch *)bqe)->ga.weight);
4478 debugBelch(" %s (IP %p),",
4479 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4480 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4481 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4482 "RBH_Save_?"), get_itbl(bqe));
4485 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4486 info_type((StgClosure *)bqe)); // , node, info_type(node));
4492 # elif defined(GRAN)
4494 print_bq (StgClosure *node)
4496 StgBlockingQueueElement *bqe;
4497 PEs node_loc, tso_loc;
4500 /* should cover all closures that may have a blocking queue */
4501 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4502 get_itbl(node)->type == FETCH_ME_BQ ||
4503 get_itbl(node)->type == RBH);
4505 ASSERT(node!=(StgClosure*)NULL); // sanity check
4506 node_loc = where_is(node);
4508 debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4509 node, info_type(node), node_loc);
4512 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4514 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4515 !end; // iterate until bqe points to a CONSTR
4516 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4517 ASSERT(bqe != END_BQ_QUEUE); // sanity check
4518 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
4519 /* types of closures that may appear in a blocking queue */
4520 ASSERT(get_itbl(bqe)->type == TSO ||
4521 get_itbl(bqe)->type == CONSTR);
4522 /* only BQs of an RBH end with an RBH_Save closure */
4523 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4525 tso_loc = where_is((StgClosure *)bqe);
4526 switch (get_itbl(bqe)->type) {
4528 debugBelch(" TSO %d (%p) on [PE %d],",
4529 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4532 debugBelch(" %s (IP %p),",
4533 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4534 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4535 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4536 "RBH_Save_?"), get_itbl(bqe));
4539 barf("Unexpected closure type %s in blocking queue of %p (%s)",
4540 info_type((StgClosure *)bqe), node, info_type(node));
4548 #if defined(PARALLEL_HASKELL)
4555 for (i=0, tso=run_queue_hd;
4556 tso != END_TSO_QUEUE;
4557 i++, tso=tso->link) {
4566 sched_belch(char *s, ...)
4571 debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4572 #elif defined(PARALLEL_HASKELL)
4575 debugBelch("sched: ");