/* ---------------------------------------------------------------------------
 * (c) The GHC Team, 1998-2005
 * The scheduler and thread-related functionality
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "BlockAlloc.h"
#include "OSThreads.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Exception.h"
#include "RtsSignals.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Proftimer.h"

#if defined(GRAN) || defined(PARALLEL_HASKELL)
# include "GranSimRts.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
#endif

#include "Capability.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif

// Turn off inlining when debugging - it obfuscates things
# define STATIC_INLINE static

#if defined(THREADED_RTS)
#define USED_WHEN_THREADED_RTS
#define USED_WHEN_NON_THREADED_RTS STG_UNUSED
#else
#define USED_WHEN_THREADED_RTS STG_UNUSED
#define USED_WHEN_NON_THREADED_RTS
#endif

#define USED_WHEN_SMP STG_UNUSED

/* -----------------------------------------------------------------------------
 * -------------------------------------------------------------------------- */

StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */

/* In GranSim we have a runnable and a blocked queue for each processor.
   In order to minimise code changes new arrays run_queue_hds/tls
   are created. run_queue_hd is then a short cut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h). */

StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?

/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *blackhole_queue = NULL;

/* The blackhole_queue should be checked for threads to wake up. See
 * Schedule.h for more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
 */
rtsBool blackholes_need_checking = rtsFalse;

/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *all_threads = NULL;

/* flag set by signal handler to precipitate a context switch
 * LOCK: none (just an advisory flag)
 */
int context_switch = 0;
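
/* A minimal sketch (assumed, not from this file) of how the flag gets
 * set: the timer tick handler performs a plain store, and the scheduler
 * polls the flag between thread runs, which is why no lock is needed.
 * The handler name here is illustrative; see Timer.c for the real one.
 *
 *     static void handle_tick (int unused STG_UNUSED) {
 *         context_switch = 1;   // advisory; picked up on the next poll
 *     }
 */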

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 */
nat recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes once, from false->true)
 */
rtsBool interrupted = rtsFalse;

/* Next thread ID to allocate.
 */
static StgThreadID next_thread_id = 1;

/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
 *  + 1                       (stg_ap_v_ret)
 *  + 1                       (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
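
/* An illustrative sketch (assumed, not from this file) of how a
 * thread-creation routine would use MIN_STACK_WORDS: clamp the requested
 * size so even a tiny request gets a stack that can take the
 * overflow-and-grow path described above. 'requested_words' is a
 * hypothetical parameter name.
 *
 *     nat size_w = requested_words;
 *     if (size_w < MIN_STACK_WORDS) {
 *         size_w = MIN_STACK_WORDS;   // room for stop frame + closure
 *     }
 */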

/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 * exists - earlier gccs apparently didn't.
 */

/* Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS (including SMP) runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex = INIT_MUTEX_VAR;
#endif

#if defined(PARALLEL_HASKELL)
rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;
#endif

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.

static void schedulePreLoop (void);
static void schedulePushWork(Capability *cap, Task *task);
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static StgTSO *scheduleProcessEvent(rtsEvent *event);
#if defined(PARALLEL_HASKELL)
static StgTSO *scheduleSendPendingMessages(void);
static rtsBool scheduleActivateSpark(void);
static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
#endif
#if defined(PAR) || defined(GRAN)
static void scheduleGranParReport(void);
#endif
static void schedulePostRunThread(void);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task,
                                         StgTSO *t );
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
                                    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                             StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);

static void unblockThread(Capability *cap, StgTSO *tso);
static rtsBool checkBlackHoles(Capability *cap);
static void AllRoots(evac_fn evac);

static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);

static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
                        rtsBool stop_at_atomically);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteRunQueue (Capability *cap);

static void printThreadBlockage(StgTSO *tso);
static void printThreadStatus(StgTSO *tso);
void printThreadQueue(StgTSO *tso);

#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);
#endif

static char *whatNext_strs[] = {

/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */

STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) {
        // this does round-robin scheduling; good for concurrency
        appendToRunQueue(cap,t);
    } else {
        // this does unfair scheduling; good for parallelism
        pushOnRunQueue(cap,t);
    }
#else
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
#endif
}
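
/* The two primitives used above differ only in which end of the run
 * queue they touch. A minimal sketch, assuming the singly-linked queue
 * with head/tail pointers used throughout this file (the real
 * definitions live elsewhere in the RTS):
 *
 *     // append at the tail: round-robin, fair to existing threads
 *     void appendToRunQueue (Capability *cap, StgTSO *tso) {
 *         ASSERT(tso->link == END_TSO_QUEUE);
 *         if (cap->run_queue_hd == END_TSO_QUEUE) {
 *             cap->run_queue_hd = tso;
 *         } else {
 *             cap->run_queue_tl->link = tso;
 *         }
 *         cap->run_queue_tl = tso;
 *     }
 *
 *     // push at the head: the thread runs next; good for parallelism
 *     void pushOnRunQueue (Capability *cap, StgTSO *tso) {
 *         tso->link = cap->run_queue_hd;
 *         cap->run_queue_hd = tso;
 *         if (cap->run_queue_tl == END_TSO_QUEUE) {
 *             cap->run_queue_tl = tso;
 *         }
 *     }
 */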

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * timer expires (thread yields)

   In a GranSim setup this loop revolves around the global event
   queue, which determines what to do next. Therefore, it's more
   complicated than either the concurrent or the parallel (GUM) setup.

   GUM iterates over incoming messages.
   It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
   and sends out a fish whenever it has nothing to do; in-between
   doing the actual reductions (shared code below) it processes the
   incoming messages and deals with delayed operations
   (see PendingFetches).
   This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */
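
/* Stripped of the #ifdefs, one iteration of the loop below has roughly
 * this shape (an illustrative outline, not the actual code):
 *
 *     while (TERMINATION_CONDITION) {
 *         yieldCapability(&cap, task);       // housekeeping, maybe yield
 *         t = popRunQueue(cap);              // pick the next thread
 *         ret = run t via StgRun/interpretBCO;
 *         switch (ret) {                     // why did it stop?
 *             heap overflow / stack overflow / yielding /
 *             blocked / finished ...
 *         }
 *         if (ready_to_gc) scheduleDoGC(cap, task, rtsFalse);
 *     }
 */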

static Capability *
schedule (Capability *initialCapability, Task *task)
{
    StgThreadReturnCode ret;
#elif defined(PARALLEL_HASKELL)
    rtsBool receivedFinish = rtsFalse;
    nat tp_size, sp_size;  // stats only

#if defined(THREADED_RTS)
    rtsBool first = rtsTrue;
#endif

    cap = initialCapability;

    // Pre-condition: this task owns initialCapability.
    // The sched_mutex is *NOT* held
    // NB. on return, we still hold a capability.

    sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
                task, initialCapability);

    // -----------------------------------------------------------
    // Scheduler loop starts here:

#if defined(PARALLEL_HASKELL)
#define TERMINATION_CONDITION        (!receivedFinish)
#elif defined(GRAN)
#define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL)
#else
#define TERMINATION_CONDITION        rtsTrue
#endif

    while (TERMINATION_CONDITION) {

        /* Choose the processor with the next event */
        CurrentProc = event->proc;
        CurrentTSO = event->tso;

#if defined(THREADED_RTS)
        // don't yield the first time, we want a chance to run this
        // thread for a bit, even if there are others banging at the

        ASSERT_CAPABILITY_INVARIANTS(cap,task);

        // Yield the capability to higher-priority tasks if necessary.
        yieldCapability(&cap, task);
#endif

        schedulePushWork(cap,task);

        // Check whether we have re-entered the RTS from Haskell without
        // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
        if (cap->in_haskell) {
            errorBelch("schedule: re-entered unsafely.\n"
                       "   Perhaps a 'foreign import unsafe' should be 'safe'?");
            stg_exit(EXIT_FAILURE);
        }

        // Test for interruption. If interrupted==rtsTrue, then either
        // we received a keyboard interrupt (^C), or the scheduler is
        // trying to shut down all the tasks (shutting_down_scheduler) in
        if (shutting_down_scheduler) {
            IF_DEBUG(scheduler, sched_belch("shutting down"));
            // If we are a worker, just exit. If we're a bound thread
            // then we will exit below when we've removed our TSO from
            if (task->tso == NULL && emptyRunQueue(cap)) {

        IF_DEBUG(scheduler, sched_belch("interrupted"));

#if defined(not_yet) && defined(SMP)
        // Top up the run queue from our spark pool. We try to make the
        // number of threads in the run queue equal to the number of
        // free capabilities.
        if (emptyRunQueue()) {
            spark = findSpark(rtsFalse);
                break; /* no more sparks in the pool */
            createSparkThread(spark);
            sched_belch("==^^ turning spark of closure %p into a thread",
                        (StgClosure *)spark));
#endif

        scheduleStartSignalHandlers(cap);

        // Only check the black holes here if we've nothing else to do.
        // During normal execution, the black hole list only gets checked
        // at GC time, to avoid repeatedly traversing this possibly long
        // list each time around the scheduler.
        if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

        scheduleCheckBlockedThreads(cap);

        scheduleDetectDeadlock(cap,task);

        // Normally, the only way we can get here with no threads to
        // run is if a keyboard interrupt is received during
        // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
        // Additionally, it is not fatal for the
        // threaded RTS to reach here with no threads to run.
        //
        // win32: might be here due to awaitEvent() being abandoned
        // as a result of a console event having been delivered.
        if ( emptyRunQueue(cap) ) {
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
            continue; // nothing to do
#endif

#if defined(PARALLEL_HASKELL)
        scheduleSendPendingMessages();
        if (emptyRunQueue(cap) && scheduleActivateSpark())

        ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!

        /* If we still have no work we need to send a FISH to get a spark
        if (emptyRunQueue(cap)) {
            if (!scheduleGetRemoteWork(&receivedFinish)) continue;
            ASSERT(rtsFalse); // should not happen at the moment

        // from here: non-empty run queue.
        // TODO: merge above case with this, only one call processMessages() !
        if (PacketsWaiting()) {  /* process incoming messages, if
                                    any pending...  only in else
                                    because getRemoteWork waits for
            receivedFinish = processMessages();

        // Get a thread to run
        t = popRunQueue(cap);

#if defined(GRAN) || defined(PAR)
        scheduleGranParReport(); // some kind of debugging output
#endif

        // Sanity check the thread we're about to run. This can be
        // expensive if there is lots of thread switching going on...
        IF_DEBUG(sanity,checkTSO(t));

#if defined(THREADED_RTS)
        // Check whether we can run this thread in the current task.
        // If not, we have to pass our capability to the right task.
        Task *bound = t->bound;

        sched_belch("### Running thread %d in bound thread",
        // yes, the Haskell thread is bound to the current native thread

        sched_belch("### thread %d bound to another OS thread",
        // no, bound to a different Haskell thread: pass to that thread
        pushOnRunQueue(cap,t);

        // The thread we want to run is unbound.
        sched_belch("### this OS thread cannot run thread %d", t->id));
        // no, the current native thread is bound to a different
        // Haskell thread, so pass it to any worker thread
        pushOnRunQueue(cap,t);

        cap->r.rCurrentTSO = t;

        /* context switches are initiated by the timer signal, unless
         * the user specified "context switch as often as possible", with
         */
        if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
            && !emptyThreadQueues(cap)) {

        IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
                                        (long)t->id, whatNext_strs[t->what_next]));

#if defined(PROFILING)
        startHeapProfTimer();
#endif

        // ----------------------------------------------------------------------
        // Run the current thread

        prev_what_next = t->what_next;

        errno = t->saved_errno;
        cap->in_haskell = rtsTrue;

        recent_activity = ACTIVITY_YES;

        switch (prev_what_next) {

            /* Thread already finished, return to scheduler. */
            ret = ThreadFinished;

            r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
            cap = regTableToCapability(r);

        case ThreadInterpret:
            cap = interpretBCO(cap);

            barf("schedule: invalid what_next field");
        }

        cap->in_haskell = rtsFalse;

        // The TSO might have moved, eg. if it re-entered the RTS and a GC
        // happened. So find the new location:
        t = cap->r.rCurrentTSO;

        // If ret is ThreadBlocked, and this Task is bound to the TSO that
        // blocked, we are in limbo - the TSO is now owned by whatever it
        // is blocked on, and may in fact already have been woken up,
        // perhaps even on a different Capability. It may be the case
        // that task->cap != cap. We'd better yield this Capability
        // immediately and return to normality.
        if (ret == ThreadBlocked) {
            debugBelch("--<< thread %d (%s) stopped: blocked\n",
                       t->id, whatNext_strs[t->what_next]));

        ASSERT_CAPABILITY_INVARIANTS(cap,task);

        // And save the current errno in this thread.
        t->saved_errno = errno;

        // ----------------------------------------------------------------------

        // Costs for the scheduler are assigned to CCS_SYSTEM
#if defined(PROFILING)

        // We have run some Haskell code: there might be blackhole-blocked
        // threads to wake up now.
        // Lock-free test here should be ok, we're just setting a flag.
        if ( blackhole_queue != END_TSO_QUEUE ) {
            blackholes_need_checking = rtsTrue;

#if defined(THREADED_RTS)
        IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
#elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
        IF_DEBUG(scheduler,debugBelch("sched: "););
#endif

        schedulePostRunThread();

        ready_to_gc = rtsFalse;

            ready_to_gc = scheduleHandleHeapOverflow(cap,t);

            scheduleHandleStackOverflow(cap,task,t);

            if (scheduleHandleYield(cap, t, prev_what_next)) {
                // shortcut for switching between compiler/interpreter:

            scheduleHandleThreadBlocked(t);

            if (scheduleHandleThreadFinished(cap, task, t)) return cap;
            ASSERT_CAPABILITY_INVARIANTS(cap,task);

            barf("schedule: invalid thread return code %d", (int)ret);

        if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
        if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
    } /* end of while() */

    IF_PAR_DEBUG(verbose,
                 debugBelch("== Leaving schedule() after having received Finish\n"));

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
    /* set up first event to get things going */
    /* ToDo: assign costs for system setup and init MainTSO ! */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

    debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n",
    G_TSO(CurrentTSO, 5));

    if (RtsFlags.GranFlags.Light) {
        /* Save current time; GranSim Light only */
        CurrentTSO->gran.clock = CurrentTime[CurrentProc];

/* -----------------------------------------------------------------------------
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

static void
schedulePushWork(Capability *cap USED_WHEN_SMP,
                 Task *task USED_WHEN_SMP)
{
    Capability *free_caps[n_capabilities], *cap0;

    // Check whether we have more threads on our run queue that we
    // could hand to another Capability.
    if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
                // it already has some work, we just grabbed it at
                // the wrong moment. Or maybe it's deadlocked!
                releaseCapability(cap0);
            } else {
                free_caps[n_free_caps++] = cap0;

    // we now have n_free_caps free capabilities stashed in
    // free_caps[]. Share our run queue equally with them. This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;
        IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));

        prev = cap->run_queue_hd;
        prev->link = END_TSO_QUEUE;
        for (; t != END_TSO_QUEUE; t = next) {
            t->link = END_TSO_QUEUE;
            if (t->what_next == ThreadRelocated
                || t->bound == task) { // don't move my bound thread
            } else if (i == n_free_caps) {
            appendToRunQueue(free_caps[i],t);
            if (t->bound) { t->bound->cap = free_caps[i]; }
        cap->run_queue_tl = prev;

        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseCapability(free_caps[i]);
        }
    }
    task->cap = cap; // reset to point to our Capability.

/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

static void
scheduleStartSignalHandlers(Capability *cap)
{
#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
    if (signals_pending()) { // safe outside the lock
        startSignalHandlers(cap);

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up. If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
        awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );

/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlackHoles (Capability *cap)
{
    if ( blackholes_need_checking ) // check without the lock first
    {
        ACQUIRE_LOCK(&sched_mutex);
        if ( blackholes_need_checking ) {
            checkBlackHoles(cap);
            blackholes_need_checking = rtsFalse;
        }
        RELEASE_LOCK(&sched_mutex);

/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{
#if defined(PARALLEL_HASKELL)
    // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
#endif

    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
        /*
         * In the threaded RTS, we only check for deadlock if there
         * has been no activity in a complete timeslice. This means
         * we won't eagerly start a full GC just because we don't have
         * any threads to run currently.
         */
        if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

        IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception. Any threads thus released will be immediately
        scheduleDoGC( cap, task, rtsTrue/*force major GC*/ );
        recent_activity = ACTIVITY_DONE_GC;

        if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
         */
        if ( anyUserHandlers() ) {
            sched_belch("still deadlocked, waiting for signals..."));

            if (signals_pending()) {
                startSignalHandlers(cap);

            // either we have threads to run, or we were interrupted:
            ASSERT(!emptyRunQueue(cap) || interrupted);

#if !defined(THREADED_RTS)
        /* Probably a real deadlock. Send the current main thread the
         * Deadlock exception.
         */
        switch (task->tso->why_blocked) {

        case BlockedOnBlackHole:
        case BlockedOnException:
            raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);

            barf("deadlock: main thread blocked in a strange way");

/* ----------------------------------------------------------------------------
 * Process an event (GRAN only)
 * ------------------------------------------------------------------------- */

scheduleProcessEvent(rtsEvent *event)
{
    if (RtsFlags.GranFlags.Light)
        GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
        CurrentTime[CurrentProc] = event->time;

    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)

    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
        /* Should just be continuing execution */
        IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
        /* ToDo: check assertion
           ASSERT(run_queue_hd != (StgTSO*)NULL &&
                  run_queue_hd != END_TSO_QUEUE);
        */
        /* Ignore ContinueThreads for fetching threads (if synchr comm) */
        if (!RtsFlags.GranFlags.DoAsyncFetch &&
            procStatus[CurrentProc]==Fetching) {
            debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);

        /* Ignore ContinueThreads for completed threads */
        if (CurrentTSO->what_next == ThreadComplete) {
            debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);

        /* Ignore ContinueThreads for threads that are being migrated */
        if (PROCS(CurrentTSO)==Nowhere) {
            debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);

        /* The thread should be at the beginning of the run queue */
        if (CurrentTSO!=run_queue_hds[CurrentProc]) {
            debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            break; // run the thread anyway

        /*
        new_event(proc, proc, CurrentTime[proc],
                  (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
        */ /* Catches superfluous CONTINUEs -- should be unnecessary */
        break; // now actually run the thread; DaH Qu'vam yImuHbej

        do_the_fetchnode(event);
        goto next_thread;             /* handle next event in event queue  */

        do_the_globalblock(event);
        goto next_thread;             /* handle next event in event queue  */

        do_the_fetchreply(event);
        goto next_thread;             /* handle next event in event queue  */

    case UnblockThread:   /* Move from the blocked queue to the tail of */
        do_the_unblock(event);
        goto next_thread;             /* handle next event in event queue  */

    case ResumeThread:    /* Move from the blocked queue to the tail of */
        /* the runnable queue ( i.e. Qu' SImqa'lu') */
        event->tso->gran.blocktime +=
            CurrentTime[CurrentProc] - event->tso->gran.blockedat;
        do_the_startthread(event);
        goto next_thread;             /* handle next event in event queue  */

        do_the_startthread(event);
        goto next_thread;             /* handle next event in event queue  */

        do_the_movethread(event);
        goto next_thread;             /* handle next event in event queue  */

        do_the_movespark(event);
        goto next_thread;             /* handle next event in event queue  */

        do_the_findwork(event);
        goto next_thread;             /* handle next event in event queue  */

        barf("Illegal event type %u\n", event->evttype);

    /* This point was scheduler_loop in the old RTS */

    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",

    if (RtsFlags.GranFlags.Light)
        GranSimLight_leave_system(event, &ActiveTSO);

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */

    /* Take a thread from the run queue. */
    POP_RUN_QUEUE(t); // take_off_run_queue(t);

    debugBelch("GRAN: About to run current thread, which is\n");

    context_switch = 0; // turned on via GranYield, checking events and time slice

    DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;

/* ----------------------------------------------------------------------------
 * Send pending messages (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static StgTSO *
scheduleSendPendingMessages(void)
{
# if defined(PAR) // global Mem.Mgmt., omit for now
    if (PendingFetches != END_BF_QUEUE) {

    if (RtsFlags.ParFlags.BufferTime) {
        // if we use message buffering, we must send away all message
        // packets which have become too old...

/* ----------------------------------------------------------------------------
 * Activate spark threads (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static rtsBool
scheduleActivateSpark(void)
{
    ASSERT(emptyRunQueue());
    /* We get here if the run queue is empty and we want some work.
       We try to turn a spark into a thread, and add it to the run queue,
       from where it will be picked up in the next iteration of the scheduler

    /* :-[  no local threads => look out for local sparks */
    /* the spark pool for the current PE */
    pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
    if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
        pool->hd < pool->tl) {

        /*
         * ToDo: add GC code check that we really have enough heap afterwards!!
         *
         * If we're here (no runnable threads) and we have pending
         * sparks, we must have a space problem. Get enough space
         * to turn one of those pending sparks into a
         */

        spark = findSpark(rtsFalse);                /* get a spark */
        if (spark != (rtsSpark) NULL) {
            tso = createThreadFromSpark(spark);     /* turn the spark into a thread */
            IF_PAR_DEBUG(fish, // schedule,
                         debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
                                    tso->id, tso, advisory_thread_count));

            if (tso==END_TSO_QUEUE) {   /* failed to activate spark->back to loop */
                IF_PAR_DEBUG(fish, // schedule,
                             debugBelch("==^^ failed to create thread from spark @ %lx\n",
                return rtsFalse;        /* failed to generate a thread */
            }                           /* otherwise fall through & pick-up new tso */
        } else {
            IF_PAR_DEBUG(fish, // schedule,
                         debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
                                    spark_queue_len(pool)));
            return rtsFalse;            /* failed to generate a thread */

        return rtsTrue;                 /* success in generating a thread */
    } else {                            /* no more threads permitted or pool empty */
        return rtsFalse;                /* failed to generateThread */

    tso = NULL; // avoid compiler warning only
    return rtsFalse;                    /* dummy in non-PAR setup */

#endif // PARALLEL_HASKELL

/* ----------------------------------------------------------------------------
 * Get work from a remote node (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static rtsBool
scheduleGetRemoteWork(rtsBool *receivedFinish)
{
    ASSERT(emptyRunQueue());

    if (RtsFlags.ParFlags.BufferTime) {
        IF_PAR_DEBUG(verbose,
                     debugBelch("...send all pending data,"));

        for (i=1; i<=nPEs; i++)
            sendImmediately(i); // send all messages away immediately

    //++EDEN++ idle() , i.e. send all buffers, wait for work
    // suppress fishing in EDEN... just look for incoming messages
    // (blocking receive)
    IF_PAR_DEBUG(verbose,
                 debugBelch("...wait for incoming messages...\n"));
    *receivedFinish = processMessages(); // blocking receive...

    // and reenter scheduling loop after having received something
    // (return rtsFalse below)

# else /* activate SPARKS machinery */
    /* We get here if we have no work and have already tried to activate
       a local spark, but still have no work. We try to get a remote
       spark, by sending a FISH message. Thread migration should be added
       here, and triggered when a sequence of fishes returns without work. */
    delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);

    /* =8-[  no local sparks => look for work on other PEs */
    /*
     * We really have absolutely no work. Send out a fish
     * (there may be some out there already), and wait for
     * something to arrive. We clearly can't run any threads
     * until a SCHEDULE or RESUME arrives, and so that's what
     * we're hoping to see. (Of course, we still have to
     * respond to other types of messages.)
     */
    rtsTime now = msTime() /*CURRENT_TIME*/;
    IF_PAR_DEBUG(verbose,
                 debugBelch("-- now=%ld\n", now));
    IF_PAR_DEBUG(fish, // verbose,
                 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
                     (last_fish_arrived_at!=0 &&
                      last_fish_arrived_at+delay > now)) {
                     debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
                                now, last_fish_arrived_at+delay,
                                last_fish_arrived_at,

    if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
        advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
        if (last_fish_arrived_at==0 ||
            (last_fish_arrived_at+delay <= now)) { // send FISH now!
            /* outstandingFishes is set in sendFish, processFish;
               avoid flooding system with fishes via delay */
            next_fish_to_send_at = 0;

            /* ToDo: this should be done in the main scheduling loop to avoid the
               busy wait here; not so bad if fish delay is very small */
            int iq = 0; // DEBUGGING -- HWL
            next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
            /* send a fish when ready, but process messages that arrive in the meantime */

                if (PacketsWaiting()) {
                    *receivedFinish = processMessages();

            } while (!*receivedFinish || now<next_fish_to_send_at);
            // JB: This means the fish could become obsolete, if we receive
            // work. Better check for work again?
            // last line: while (!receivedFinish || !haveWork || now<...)
            // next line: if (receivedFinish || haveWork )

            if (*receivedFinish) // no need to send a FISH if we are finishing anyway
                return rtsFalse; // NB: this will leave scheduler loop
                                 // immediately after return!

            IF_PAR_DEBUG(fish, // verbose,
                         debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));

        // JB: IMHO, this should all be hidden inside sendFish(...)

        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,

        // Global statistics: count no. of fishes
        if (RtsFlags.ParFlags.ParStats.Global &&
            RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
            globalParStats.tot_fish_mess++;

    /* delayed fishes must have been sent by now! */
    next_fish_to_send_at = 0;

    *receivedFinish = processMessages();
# endif /* SPARKS */

    /* NB: this function always returns rtsFalse, meaning the scheduler
       loop continues with the next iteration;

       return code means success in finding work; we enter this function
       if there is no local work, thus have to send a fish which takes
       time until it arrives with work; in the meantime we should process
       messages in the main loop;
    */

#endif // PARALLEL_HASKELL

/* ----------------------------------------------------------------------------
 * PAR/GRAN: Report stats & debugging info(?)
 * ------------------------------------------------------------------------- */

#if defined(PAR) || defined(GRAN)
static void
scheduleGranParReport(void)
{
    ASSERT(run_queue_hd != END_TSO_QUEUE);

    /* Take a thread from the run queue, if we have work */
    POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);

    /* If this TSO has got its outport closed in the meantime,
     * it mustn't be run. Instead, we have to clean it up as if it was finished.
     * It has to be marked as TH_DEAD for this purpose.
     * If it is TH_TERM instead, it is supposed to have finished in the normal way.

       JB: TODO: investigate whether state change field could be nuked
       entirely and replaced by the normal tso state (what_next
       field). All we want to do is to kill tsos from outside.

    /* ToDo: write something to the log-file
       if (RTSflags.ParFlags.granSimStats && !sameThread)
       DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);

    /* the spark pool for the current PE */
    pool = &(cap.r.rSparks); //  cap = (old) MainCap

    debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
               run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

    debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
               run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

    if (RtsFlags.ParFlags.ParStats.Full &&
        (t->par.sparkname != (StgInt)0) && // only log spark generated threads
        (emitSchedule || // forced emit
         (t && LastTSO && t->id != LastTSO->id))) {
        /*
          we are running a different TSO, so write a schedule event to log file
          NB: If we use fair scheduling we also have to write a deschedule
          event for LastTSO; with unfair scheduling we know that the
          previous tso has blocked whenever we switch to another tso, so
          we don't need it in GUM for now
        */
        IF_PAR_DEBUG(fish, // schedule,
                     debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));

        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
        emitSchedule = rtsFalse;

/* ----------------------------------------------------------------------------
 * After running a thread...
 * ------------------------------------------------------------------------- */

static void
schedulePostRunThread(void)
{
    /* HACK 675: if the last thread didn't yield, make sure to print a
       SCHEDULE event to the log file when StgRunning the next thread, even
       if it is the same one as before */
    TimeOfLastYield = CURRENT_TIME;

    /* some statistics gathering in the parallel case */

#if defined(GRAN) || defined(PAR) || defined(EDEN)

        IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_heapover++;
        globalParStats.tot_heapover++;

                 DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_stackover++;
        // DumpGranEvent(GR_DESCHEDULE, t);
        globalParStats.tot_stackover++;

    case ThreadYielding:
                 DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_yields++;
        // DumpGranEvent(GR_DESCHEDULE, t);
        globalParStats.tot_yields++;

        debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
                   t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
                   (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
        if (t->block_info.closure!=(StgClosure*)NULL)
            print_bq(t->block_info.closure);

        // ??? needed; should emit block before
                 DumpGranEvent(GR_DESCHEDULE, t));
        prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t

        ASSERT(procStatus[CurrentProc]==Busy ||
               ((procStatus[CurrentProc]==Fetching) &&
                (t->block_info.closure!=(StgClosure*)NULL)));
        if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
            !(!RtsFlags.GranFlags.DoAsyncFetch &&
              procStatus[CurrentProc]==Fetching))
            procStatus[CurrentProc] = Idle;

        //++PAR++  blockThread() writes the event (change?)

    case ThreadFinished:

        barf("parGlobalStats: unknown return code");

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
{
    // did the task ask for a large block?
    if (cap->r.rHpAlloc > BLOCK_SIZE) {
        // if so, get one and push it on the front of the nursery.

        blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;

        debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
                   (long)t->id, whatNext_strs[t->what_next], blocks));

        // don't do this if the nursery is (nearly) full, we'll GC first.
        if (cap->r.rCurrentNursery->link != NULL ||
            cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
                                               // if the nursery has only one block.

            bd = allocGroup( blocks );
            cap->r.rNursery->n_blocks += blocks;

            // link the new group into the list
            bd->link = cap->r.rCurrentNursery;
            bd->u.back = cap->r.rCurrentNursery->u.back;
            if (cap->r.rCurrentNursery->u.back != NULL) {
                cap->r.rCurrentNursery->u.back->link = bd;

                ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
                       g0s0 == cap->r.rNursery);
                cap->r.rNursery->blocks = bd;

            cap->r.rCurrentNursery->u.back = bd;

            // initialise it as a nursery block. We initialise the
            // step, gen_no, and flags field of *every* sub-block in
            // this large block, because this is easier than making
            // sure that we always find the block head of a large
            // block whenever we call Bdescr() (eg. evacuate() and
            // isAlive() in the GC would both have to do this, at

            for (x = bd; x < bd + blocks; x++) {
                x->step = cap->r.rNursery;

            // This assert can be a killer if the app is doing lots
            // of large block allocations.
            IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));

            // now update the nursery to point to the new block
            cap->r.rCurrentNursery = bd;

            // we might be unlucky and have another thread get on the
            // run queue before us and steal the large block, but in that
            // case the thread will just end up requesting another large
            pushOnRunQueue(cap,t);
            return rtsFalse;  /* not actually GC'ing */

    debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
               (long)t->id, whatNext_strs[t->what_next]));

#if defined(GRAN)
    ASSERT(!is_on_queue(t,CurrentProc));
#elif defined(PARALLEL_HASKELL)
    /* Currently we emit a DESCHEDULE event before GC in GUM.
       ToDo: either add separate event to distinguish SYSTEM time from rest
       or just nuke this DESCHEDULE (and the following SCHEDULE) */
    if (0 && RtsFlags.ParFlags.ParStats.Full) {
        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
        emitSchedule = rtsTrue;
    }
#endif

    pushOnRunQueue(cap,t);
    /* actual GC is done at the end of the while loop in schedule() */

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadStackOverflow
 * -------------------------------------------------------------------------- */

static void
scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
{
    IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n",
                                  (long)t->id, whatNext_strs[t->what_next]));

    /* just adjust the stack for this thread, then pop it back
     */

    /* enlarge the stack */
    StgTSO *new_t = threadStackOverflow(cap, t);

    /* The TSO attached to this Task may have moved, so update the
     */
    if (task->tso == t) {

    pushOnRunQueue(cap,new_t);

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadYielding
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
{
    // Reset the context switch flag. We don't do this just before
    // running the thread, because that would mean we would lose ticks
    // during GC, which can lead to unfair scheduling (a thread hogs
    // the CPU because the tick always arrives during GC). This way
    // penalises threads that do a lot of allocation, but that seems
    // better than the alternative.

    /* put the thread back on the run queue. Then, if we're ready to
     * GC, check whether this is the last task to stop. If so, wake
     * up the GC thread. getThread will block during a GC until the
     */
    if (t->what_next != prev_what_next) {
        debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n",
                   (long)t->id, whatNext_strs[t->what_next]);
    } else {
        debugBelch("--<< thread %ld (%s) stopped, yielding\n",
                   (long)t->id, whatNext_strs[t->what_next]);

    //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
    ASSERT(t->link == END_TSO_QUEUE);

    // Shortcut if we're just switching evaluators: don't bother
    // doing stack squeezing (which can be expensive), just run the
    if (t->what_next != prev_what_next) {

    ASSERT(!is_on_queue(t,CurrentProc));

    //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
    checkThreadQsSanity(rtsTrue));

    addToRunQueue(cap,t);

    /* add a ContinueThread event to actually process the thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              t, (StgClosure*)NULL, (rtsSpark*)NULL);
    debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadBlocked
 * -------------------------------------------------------------------------- */

static void
scheduleHandleThreadBlocked( StgTSO *t
#if !defined(GRAN) && !defined(DEBUG)

    debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
               t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
    if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));

    // ??? needed; should emit block before
             DumpGranEvent(GR_DESCHEDULE, t));
    prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t

    ASSERT(procStatus[CurrentProc]==Busy ||
           ((procStatus[CurrentProc]==Fetching) &&
            (t->block_info.closure!=(StgClosure*)NULL)));
    if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
        !(!RtsFlags.GranFlags.DoAsyncFetch &&
          procStatus[CurrentProc]==Fetching))
        procStatus[CurrentProc] = Idle;

    debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
               t->id, t, whatNext_strs[t->what_next], t->block_info.closure));

    if (t->block_info.closure!=(StgClosure*)NULL)
        print_bq(t->block_info.closure));

    /* Send a fetch (if BlockedOnGA) and dump event to log file */

    /* whatever we schedule next, we must log that schedule */
    emitSchedule = rtsTrue;

    // We don't need to do anything. The thread is blocked, and it
    // has tidied up its stack and placed itself on whatever queue
    // it needs to be on.

    ASSERT(t->why_blocked != NotBlocked);
    // This might not be true under SMP: we don't have
    // exclusive access to this TSO, so someone might have
    // woken it up by now. This actually happens: try
    // conc023 +RTS -N2.

    debugBelch("--<< thread %d (%s) stopped: ",
               t->id, whatNext_strs[t->what_next]);
    printThreadBlockage(t);

    /* Only for dumping event to log file
       ToDo: do I need this in GranSim, too?

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadFinished
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
{
    /* Need to check whether this was a main thread, and if so,
     * return with the return value.
     *
     * We also end up here if the thread kills itself with an
     * uncaught exception, see Exception.cmm.
     */
    IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n",
                                  t->id, whatNext_strs[t->what_next]));

#if defined(GRAN)
    endThread(t, CurrentProc); // clean-up the thread
#elif defined(PARALLEL_HASKELL)
    /* For now all are advisory -- HWL */
    //if(t->priority==AdvisoryPriority) ??
    advisory_thread_count--; // JB: Caution with this counter, buggy!

    if(t->dist.priority==RevalPriority)

# if defined(EDENOLD)
    // the thread could still have an outport... (BUG)
    if (t->eden.outport != -1) {
        // delete the outport for the tso which has finished...
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
                                t->eden.outport, t->id));

    // thread still in the process (HEAVY BUG! since outport has just been closed...)
    if (t->eden.epid != -1) {
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
                                t->id, t->eden.epid));
        removeTSOfromProcess(t);

    if (RtsFlags.ParFlags.ParStats.Full &&
        !RtsFlags.ParFlags.ParStats.Suppressed)
        DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);

    //  t->par only contains statistics: left out for now...
    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
               t->id,t,t->par.sparkname));
#endif // PARALLEL_HASKELL

    //
    // Check whether the thread that just completed was a bound
    // thread, and if so return with the result.
    //
    // There is an assumption here that all thread completion goes
    // through this point; we need to make sure that if a thread
    // ends up in the ThreadKilled state, that it stays on the run
    // queue so it can be dealt with here.
    //

    if (t->bound != task) {
#if !defined(THREADED_RTS)
        // Must be a bound thread that is not the topmost one. Leave
        // it on the run queue until the stack has unwound to the
        // point where we can deal with this. Leaving it on the run
        // queue also ensures that the garbage collector knows about
        // this thread and its return value (it gets dropped from the
        // all_threads list so there's no other way to find it).
        appendToRunQueue(cap,t);
#else
        // this cannot happen in the threaded RTS, because a
        // bound thread can only be run by the appropriate Task.
        barf("finished bound thread that isn't mine");
#endif

    ASSERT(task->tso == t);

    if (t->what_next == ThreadComplete) {

        // NOTE: return val is tso->sp[1] (see StgStartup.hc)
        *(task->ret) = (StgClosure *)task->tso->sp[1];

        task->stat = Success;

        *(task->ret) = NULL;

        task->stat = Interrupted;

        task->stat = Killed;

    removeThreadLabel((StgWord)task->tso->id);

    return rtsTrue; // tells schedule() to return

/* -----------------------------------------------------------------------------
 * Perform a heap census, if PROFILING
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
{
#if defined(PROFILING)
    // When we have +RTS -i0 and we're heap profiling, do a census at
    // every GC. This lets us get repeatable runs for debugging.
    if (performHeapProfile ||
        (RtsFlags.ProfFlags.profileInterval==0 &&
         RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
        GarbageCollect(GetRoots, rtsTrue);

        performHeapProfile = rtsFalse;
        return rtsTrue;  // true <=> we already GC'd

/* -----------------------------------------------------------------------------
 * Perform a garbage collection if necessary
 * -------------------------------------------------------------------------- */

static void
scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
{
    static volatile StgWord waiting_for_gc;
    rtsBool was_waiting;

    //
    // In order to GC, there must be no threads running Haskell code.
    // Therefore, the GC thread needs to hold *all* the capabilities,
    // and release them after the GC has completed.
    //
    // This seems to be the simplest way: previous attempts involved
    // making all the threads with capabilities give up their
    // capabilities and sleep except for the *last* one, which
    // actually did the GC. But it's quite hard to arrange for all
    // the other tasks to sleep and stay asleep.
    //

    was_waiting = cas(&waiting_for_gc, 0, 1);

        IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
        yieldCapability(&cap,task);
    } while (waiting_for_gc);

    for (i=0; i < n_capabilities; i++) {
        IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities));
        if (cap != &capabilities[i]) {
            Capability *pcap = &capabilities[i];
            // we better hope this task doesn't get migrated to
            // another Capability while we're waiting for this one.
            // It won't, because load balancing happens while we have
            // all the Capabilities, but even so it's a slightly
            // unsavoury invariant.

            waitForReturnCapability(&pcap, task);
            if (pcap != &capabilities[i]) {
                barf("scheduleDoGC: got the wrong capability");

    waiting_for_gc = rtsFalse;

    /* Kick any transactions which are invalid back to their
     * atomically frames. When next scheduled they will try to
     * commit; this commit will fail and they will retry.
     */

    for (t = all_threads; t != END_TSO_QUEUE; t = next) {
        if (t->what_next == ThreadRelocated) {

        next = t->global_link;
        if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
            if (!stmValidateNestOfTransactions (t -> trec)) {
                IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));

                // strip the stack back to the
                // ATOMICALLY_FRAME, aborting the (nested)
                // transaction, and saving the stack of any
                // partially-evaluated thunks on the heap.
                raiseAsync_(cap, t, NULL, rtsTrue);

                ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);

    // so this happens periodically:
    scheduleCheckBlackHoles(cap);

    IF_DEBUG(scheduler, printAllThreads());

    /* everybody back, start the GC.
     * Could do it in this thread, or signal a condition var
     * to do it in another thread. Either way, we need to
     * broadcast on gc_pending_cond afterward.
     */
#if defined(THREADED_RTS)
    IF_DEBUG(scheduler,sched_belch("doing GC"));

    GarbageCollect(GetRoots, force_major);

    // release our stash of capabilities.
    for (i = 0; i < n_capabilities; i++) {
        if (cap != &capabilities[i]) {
            task->cap = &capabilities[i];
            releaseCapability(&capabilities[i]);

    /* add a ContinueThread event to continue execution of current thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              t, (StgClosure*)NULL, (rtsSpark*)NULL);
    debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");

/* ---------------------------------------------------------------------------
 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
 * used by Control.Concurrent for error checking.
 * ------------------------------------------------------------------------- */

rtsSupportsBoundThreads(void)
{
#if defined(THREADED_RTS)

/* ---------------------------------------------------------------------------
 * isThreadBound(tso): check whether tso is bound to an OS thread.
 * ------------------------------------------------------------------------- */

isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
{
#if defined(THREADED_RTS)
    return (tso->bound != NULL);

/* ---------------------------------------------------------------------------
 * Singleton fork(). Do not copy any running threads.
 * ------------------------------------------------------------------------- */
2008 #if !defined(mingw32_HOST_OS) && !defined(SMP)
2009 #define FORKPROCESS_PRIMOP_SUPPORTED
2012 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2014 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2017 forkProcess(HsStablePtr *entry
2018 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2023 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2029 IF_DEBUG(scheduler,sched_belch("forking!"));
2031 // ToDo: for SMP, we should probably acquire *all* the capabilities
2036 if (pid) { // parent
2038 // just return the pid
2044 // delete all threads
2045 cap->run_queue_hd = END_TSO_QUEUE;
2046 cap->run_queue_tl = END_TSO_QUEUE;
2048 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2051 // don't allow threads to catch the ThreadKilled exception
2052 deleteThreadImmediately(cap,t);
2055 // wipe the main thread list
2056 while ((task = all_tasks) != NULL) {
2057 all_tasks = task->all_link;
2061 cap = rts_evalStableIO(cap, entry, NULL); // run the action
2062 rts_checkSchedStatus("forkProcess",cap);
2065 hs_exit(); // clean up and exit
2066 stg_exit(EXIT_SUCCESS);
2068 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2069 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2074 /* ---------------------------------------------------------------------------
2075 * Delete the threads on the run queue of the current capability.
2076 * ------------------------------------------------------------------------- */
2079 deleteRunQueue (Capability *cap)
2082 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2083 ASSERT(t->what_next != ThreadRelocated);
2085 deleteThread(cap, t);
2089 /* startThread and insertThread are now in GranSim.c -- HWL */
2092 /* -----------------------------------------------------------------------------
2093 Managing the suspended_ccalling_tasks list.
2094 Locks required: sched_mutex
2095 -------------------------------------------------------------------------- */
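/* The list is doubly linked through task->next and task->prev, with the
 * most recently suspended task at the front, e.g.
 *
 *   cap->suspended_ccalling_tasks --> T3 <-> T2 <-> T1 --> NULL
 *
 * suspendTask() pushes at the front; recoverSuspendedTask() must be able
 * to unlink from any position, because tasks can return from their C
 * calls in any order.
 */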
2098 suspendTask (Capability *cap, Task *task)
2100 ASSERT(task->next == NULL && task->prev == NULL);
2101 task->next = cap->suspended_ccalling_tasks;
2103 if (cap->suspended_ccalling_tasks) {
2104 cap->suspended_ccalling_tasks->prev = task;
2106 cap->suspended_ccalling_tasks = task;
2110 recoverSuspendedTask (Capability *cap, Task *task)
2113 task->prev->next = task->next;
2115 ASSERT(cap->suspended_ccalling_tasks == task);
2116 cap->suspended_ccalling_tasks = task->next;
2119 task->next->prev = task->prev;
2121 task->next = task->prev = NULL;
2124 /* ---------------------------------------------------------------------------
2125 * Suspending & resuming Haskell threads.
2127 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2128 * its capability before calling the C function. This allows another
2129 * task to pick up the capability and carry on running Haskell
2130 * threads. It also means that if the C call blocks, it won't lock the whole system.
2133 * The Haskell thread making the C call is put to sleep for the
2134 * duration of the call, on the suspended_ccalling_threads queue. We
2135 * give out a token to the task, which it can use to resume the thread
2136 * on return from the C function.
2137 * ------------------------------------------------------------------------- */
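/* In outline, the compiler-generated call sequence looks like this (a
 * sketch only; foo/x/r stand for the real foreign function, its argument
 * and its result):
 *
 *   token = suspendThread(BaseReg);  // give back the Capability
 *   r = foo(x);                      // may block; other tasks run Haskell
 *   BaseReg = resumeThread(token);   // reacquire a Capability, wake TSO
 */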
2140 suspendThread (StgRegTable *reg)
2143 int saved_errno = errno;
2147 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2149 cap = regTableToCapability(reg);
2151 task = cap->running_task;
2152 tso = cap->r.rCurrentTSO;
2155 sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2157 // XXX this might not be necessary --SDM
2158 tso->what_next = ThreadRunGHC;
2162 if(tso->blocked_exceptions == NULL) {
2163 tso->why_blocked = BlockedOnCCall;
2164 tso->blocked_exceptions = END_TSO_QUEUE;
2166 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2169 // Hand back capability
2170 task->suspended_tso = tso;
2172 ACQUIRE_LOCK(&cap->lock);
2174 suspendTask(cap,task);
2175 cap->in_haskell = rtsFalse;
2176 releaseCapability_(cap);
2178 RELEASE_LOCK(&cap->lock);
2180 #if defined(THREADED_RTS)
2181 /* Preparing to leave the RTS, so ensure there's a native thread/task
2182 waiting to take over.
2184 IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2187 errno = saved_errno;
2192 resumeThread (void *task_)
2196 int saved_errno = errno;
2200 // Wait for permission to re-enter the RTS with the result.
2201 waitForReturnCapability(&cap,task);
2202 // we might be on a different capability now... but if so, our
2203 // entry on the suspended_ccalling_tasks list will also have been migrated.
2206 // Remove the thread from the suspended list
2207 recoverSuspendedTask(cap,task);
2209 tso = task->suspended_tso;
2210 task->suspended_tso = NULL;
2211 tso->link = END_TSO_QUEUE;
2212 IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2214 if (tso->why_blocked == BlockedOnCCall) {
2215 awakenBlockedQueue(cap,tso->blocked_exceptions);
2216 tso->blocked_exceptions = NULL;
2219 /* Reset blocking status */
2220 tso->why_blocked = NotBlocked;
2222 cap->r.rCurrentTSO = tso;
2223 cap->in_haskell = rtsTrue;
2224 errno = saved_errno;
2229 /* ---------------------------------------------------------------------------
2230 * Comparing Thread ids.
2232 * This is used from STG land in the implementation of the
2233 * instances of Eq/Ord for ThreadIds.
2234 * ------------------------------------------------------------------------ */
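/* For example, cmp_thread(t1,t2) < 0 exactly when t1's id was allocated
 * before t2's; ids come from next_thread_id and increase monotonically,
 * so this yields a stable total order.
 */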
2237 cmp_thread(StgPtr tso1, StgPtr tso2)
2239 StgThreadID id1 = ((StgTSO *)tso1)->id;
2240 StgThreadID id2 = ((StgTSO *)tso2)->id;
2242 if (id1 < id2) return (-1);
2243 if (id1 > id2) return 1;
2247 /* ---------------------------------------------------------------------------
2248 * Fetching the ThreadID from an StgTSO.
2250 * This is used in the implementation of Show for ThreadIds.
2251 * ------------------------------------------------------------------------ */
2253 rts_getThreadId(StgPtr tso)
2255 return ((StgTSO *)tso)->id;
2260 labelThread(StgPtr tso, char *label)
2265 /* Caveat: once set, a thread's label can only be overwritten (e.g. with ""), never removed */
2266 len = strlen(label)+1;
2267 buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2268 strncpy(buf,label,len);
2269 /* Update will free the old memory for us */
2270 updateThreadLabel(((StgTSO *)tso)->id,buf);
2274 /* ---------------------------------------------------------------------------
2275 Create a new thread.
2277 The new thread starts with the given stack size. Before the
2278 scheduler can run, however, this thread needs to have a closure
2279 (and possibly some arguments) pushed on its stack. See
2280 pushClosure() in Schedule.h.
2282 createGenThread() and createIOThread() (in SchedAPI.h) are
2283 convenient packaged versions of this function.
2285 currently pri (priority) is only used in a GRAN setup -- HWL
2286 ------------------------------------------------------------------------ */
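/* A typical call sequence, as packaged up by createGenThread() (a sketch;
 * 'closure' stands for whatever the caller wants the thread to run):
 *
 *   tso = createThread(cap, stack_size);
 *   pushClosure(tso, (W_)closure);   // set up the thread's stack
 *   scheduleThread(cap, tso);        // make it runnable
 */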
2290 createThread(nat size, StgInt pri)
2293 createThread(Capability *cap, nat size)
2299 /* sched_mutex is *not* required */
2301 /* First check whether we should create a thread at all */
2302 #if defined(PARALLEL_HASKELL)
2303 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2304 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2306 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2307 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2308 return END_TSO_QUEUE;
2314 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2317 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2319 /* catch ridiculously small stack sizes */
2320 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2321 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2324 stack_size = size - TSO_STRUCT_SIZEW;
2326 tso = (StgTSO *)allocateLocal(cap, size);
2327 TICK_ALLOC_TSO(stack_size, 0);
2329 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2331 SET_GRAN_HDR(tso, ThisPE);
2334 // Always start with the compiled code evaluator
2335 tso->what_next = ThreadRunGHC;
2337 tso->why_blocked = NotBlocked;
2338 tso->blocked_exceptions = NULL;
2340 tso->saved_errno = 0;
2343 tso->stack_size = stack_size;
2344 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
2346 tso->sp = (P_)&(tso->stack) + stack_size;
2348 tso->trec = NO_TREC;
2351 tso->prof.CCCS = CCS_MAIN;
2354 /* put a stop frame on the stack */
2355 tso->sp -= sizeofW(StgStopFrame);
2356 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2357 tso->link = END_TSO_QUEUE;
2361 /* uses more flexible routine in GranSim */
2362 insertThread(tso, CurrentProc);
2364 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2370 if (RtsFlags.GranFlags.GranSimStats.Full)
2371 DumpGranEvent(GR_START,tso);
2372 #elif defined(PARALLEL_HASKELL)
2373 if (RtsFlags.ParFlags.ParStats.Full)
2374 DumpGranEvent(GR_STARTQ,tso);
2375 /* HACK to avoid SCHEDULE
2379 /* Link the new thread on the global thread list.
2381 ACQUIRE_LOCK(&sched_mutex);
2382 tso->id = next_thread_id++; // while we have the mutex
2383 tso->global_link = all_threads;
2385 RELEASE_LOCK(&sched_mutex);
2388 tso->dist.priority = MandatoryPriority; //by default that is...
2392 tso->gran.pri = pri;
2394 tso->gran.magic = TSO_MAGIC; // debugging only
2396 tso->gran.sparkname = 0;
2397 tso->gran.startedat = CURRENT_TIME;
2398 tso->gran.exported = 0;
2399 tso->gran.basicblocks = 0;
2400 tso->gran.allocs = 0;
2401 tso->gran.exectime = 0;
2402 tso->gran.fetchtime = 0;
2403 tso->gran.fetchcount = 0;
2404 tso->gran.blocktime = 0;
2405 tso->gran.blockcount = 0;
2406 tso->gran.blockedat = 0;
2407 tso->gran.globalsparks = 0;
2408 tso->gran.localsparks = 0;
2409 if (RtsFlags.GranFlags.Light)
2410 tso->gran.clock = Now; /* local clock */
2412 tso->gran.clock = 0;
2414 IF_DEBUG(gran,printTSO(tso));
2415 #elif defined(PARALLEL_HASKELL)
2417 tso->par.magic = TSO_MAGIC; // debugging only
2419 tso->par.sparkname = 0;
2420 tso->par.startedat = CURRENT_TIME;
2421 tso->par.exported = 0;
2422 tso->par.basicblocks = 0;
2423 tso->par.allocs = 0;
2424 tso->par.exectime = 0;
2425 tso->par.fetchtime = 0;
2426 tso->par.fetchcount = 0;
2427 tso->par.blocktime = 0;
2428 tso->par.blockcount = 0;
2429 tso->par.blockedat = 0;
2430 tso->par.globalsparks = 0;
2431 tso->par.localsparks = 0;
2435 globalGranStats.tot_threads_created++;
2436 globalGranStats.threads_created_on_PE[CurrentProc]++;
2437 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2438 globalGranStats.tot_sq_probes++;
2439 #elif defined(PARALLEL_HASKELL)
2440 // collect parallel global statistics (currently done together with GC stats)
2441 if (RtsFlags.ParFlags.ParStats.Global &&
2442 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2443 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
2444 globalParStats.tot_threads_created++;
2450 sched_belch("==__ schedule: [PE %d] Created TSO %ld (%p);",
2451 CurrentProc, (long)tso->id, tso));
2452 #elif defined(PARALLEL_HASKELL)
2453 IF_PAR_DEBUG(verbose,
2454 sched_belch("==__ schedule: Created TSO %ld (%p); %d threads active",
2455 (long)tso->id, tso, advisory_thread_count));
2457 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
2458 (long)tso->id, (long)tso->stack_size));
2465 all parallel thread creation calls should fall through the following routine.
2468 createThreadFromSpark(rtsSpark spark)
2470 ASSERT(spark != (rtsSpark)NULL);
2471 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2472 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
2474 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2475 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2476 return END_TSO_QUEUE;
2480 tso = createThread(RtsFlags.GcFlags.initialStkSize);
2481 if (tso==END_TSO_QUEUE)
2482 barf("createSparkThread: Cannot create TSO");
2484 tso->priority = AdvisoryPriority;
2486 pushClosure(tso,spark);
2488 advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
2495 Turn a spark into a thread.
2496 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2500 activateSpark (rtsSpark spark)
2504 tso = createSparkThread(spark);
2505 if (RtsFlags.ParFlags.ParStats.Full) {
2506 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2507 IF_PAR_DEBUG(verbose,
2508 debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2509 (StgClosure *)spark, info_type((StgClosure *)spark)));
2511 // ToDo: fwd info on local/global spark to thread -- HWL
2512 // tso->gran.exported = spark->exported;
2513 // tso->gran.locked = !spark->global;
2514 // tso->gran.sparkname = spark->name;
2520 /* ---------------------------------------------------------------------------
2523 * scheduleThread puts a thread on the end of the runnable queue.
2524 * This will usually be done immediately after a thread is created.
2525 * The caller of scheduleThread must create the thread using e.g.
2526 * createThread and push an appropriate closure
2527 * on this thread's stack before the scheduler is invoked.
2528 * ------------------------------------------------------------------------ */
2531 scheduleThread(Capability *cap, StgTSO *tso)
2533 // The thread goes at the *end* of the run-queue, to avoid possible
2534 // starvation of any threads already on the queue.
2535 appendToRunQueue(cap,tso);
2539 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2543 // We already created/initialised the Task
2544 task = cap->running_task;
2546 // This TSO is now a bound thread; make the Task and TSO
2547 // point to each other.
2552 task->stat = NoStatus;
2554 appendToRunQueue(cap,tso);
2556 IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2559 /* GranSim specific init */
2560 CurrentTSO = m->tso; // the TSO to run
2561 procStatus[MainProc] = Busy; // status of main PE
2562 CurrentProc = MainProc; // PE to run it on
2565 cap = schedule(cap,task);
2567 ASSERT(task->stat != NoStatus);
2568 ASSERT_CAPABILITY_INVARIANTS(cap,task);
2570 IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2574 /* ----------------------------------------------------------------------------
2575 * Workers
2576 * ------------------------------------------------------------------------- */
2578 #if defined(THREADED_RTS)
2580 workerStart(Task *task)
2584 // See startWorkerTask().
2585 ACQUIRE_LOCK(&task->lock);
2587 RELEASE_LOCK(&task->lock);
2589 // set the thread-local pointer to the Task:
2592 // schedule() runs without a lock.
2593 cap = schedule(cap,task);
2595 // On exit from schedule(), we have a Capability.
2596 releaseCapability(cap);
2601 /* ---------------------------------------------------------------------------
2604 * Initialise the scheduler. This resets all the queues - if the
2605 * queues contained any threads, they'll be garbage collected at the next GC.
2608 * ------------------------------------------------------------------------ */
2615 for (i=0; i<MAX_PROC; i++) {
2616 run_queue_hds[i] = END_TSO_QUEUE;
2617 run_queue_tls[i] = END_TSO_QUEUE;
2618 blocked_queue_hds[i] = END_TSO_QUEUE;
2619 blocked_queue_tls[i] = END_TSO_QUEUE;
2620 ccalling_threadss[i] = END_TSO_QUEUE;
2621 blackhole_queue[i] = END_TSO_QUEUE;
2622 sleeping_queue = END_TSO_QUEUE;
2624 #elif !defined(THREADED_RTS)
2625 blocked_queue_hd = END_TSO_QUEUE;
2626 blocked_queue_tl = END_TSO_QUEUE;
2627 sleeping_queue = END_TSO_QUEUE;
2630 blackhole_queue = END_TSO_QUEUE;
2631 all_threads = END_TSO_QUEUE;
2636 RtsFlags.ConcFlags.ctxtSwitchTicks =
2637 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2639 #if defined(THREADED_RTS)
2640 /* Initialise the mutex and condition variables used by
2642 initMutex(&sched_mutex);
2645 ACQUIRE_LOCK(&sched_mutex);
2647 /* A capability holds the state a native thread needs in
2648 * order to execute STG code. At least one capability is
2649 * floating around (only SMP builds have more than one).
2657 * Eagerly start one worker to run each Capability, except for
2658 * Capability 0. The idea is that we're probably going to start a
2659 * bound thread on Capability 0 pretty soon, so we don't want a
2660 * worker task hogging it.
2665 for (i = 1; i < n_capabilities; i++) {
2666 cap = &capabilities[i];
2667 ACQUIRE_LOCK(&cap->lock);
2668 startWorkerTask(cap, workerStart);
2669 RELEASE_LOCK(&cap->lock);
2674 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2678 RELEASE_LOCK(&sched_mutex);
2682 exitScheduler( void )
2684 interrupted = rtsTrue;
2685 shutting_down_scheduler = rtsTrue;
2687 #if defined(THREADED_RTS)
2692 ACQUIRE_LOCK(&sched_mutex);
2693 task = newBoundTask();
2694 RELEASE_LOCK(&sched_mutex);
2696 for (i = 0; i < n_capabilities; i++) {
2697 shutdownCapability(&capabilities[i], task);
2699 boundTaskExiting(task);
2705 /* ---------------------------------------------------------------------------
2706 Where are the roots that we know about?
2708 - all the threads on the runnable queue
2709 - all the threads on the blocked queue
2710 - all the threads on the sleeping queue
2711 - all the threads currently executing a _ccall_GC
2712 - all the "main threads"
2714 ------------------------------------------------------------------------ */
2716 /* This has to be protected either by the scheduler monitor, or by the
2717 garbage collection monitor (probably the latter).
2722 GetRoots( evac_fn evac )
2729 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2730 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2731 evac((StgClosure **)&run_queue_hds[i]);
2732 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2733 evac((StgClosure **)&run_queue_tls[i]);
2735 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2736 evac((StgClosure **)&blocked_queue_hds[i]);
2737 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2738 evac((StgClosure **)&blocked_queue_tls[i]);
2739 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2740 evac((StgClosure **)&ccalling_threadss[i]);
2747 for (i = 0; i < n_capabilities; i++) {
2748 cap = &capabilities[i];
2749 evac((StgClosure **)&cap->run_queue_hd);
2750 evac((StgClosure **)&cap->run_queue_tl);
2752 for (task = cap->suspended_ccalling_tasks; task != NULL;
2754 evac((StgClosure **)&task->suspended_tso);
2758 #if !defined(THREADED_RTS)
2759 evac((StgClosure **)&blocked_queue_hd);
2760 evac((StgClosure **)&blocked_queue_tl);
2761 evac((StgClosure **)&sleeping_queue);
2765 evac((StgClosure **)&blackhole_queue);
2767 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2768 markSparkQueue(evac);
2771 #if defined(RTS_USER_SIGNALS)
2772 // mark the signal handlers (signals should be already blocked)
2773 markSignalHandlers(evac);
2777 /* -----------------------------------------------------------------------------
2780 This is the interface to the garbage collector from Haskell land.
2781 We provide this so that external C code can allocate and garbage
2782 collect when called from Haskell via _ccall_GC.
2784 It might be useful to provide an interface whereby the programmer
2785 can specify more roots (ToDo).
2787 This needs to be protected by the GC condition variable above. KH.
2788 -------------------------------------------------------------------------- */
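/* A sketch of supplying extra roots (my_root and my_get_roots are
 * illustrative names, not part of the RTS):
 *
 *   static StgClosure *my_root;
 *
 *   static void my_get_roots (evac_fn evac) {
 *       evac(&my_root);              // keep my_root alive across the GC
 *   }
 *
 *   ... performGCWithRoots(my_get_roots); ...
 */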
2790 static void (*extra_roots)(evac_fn);
2796 // ToDo: we have to grab all the capabilities here.
2797 errorBelch("performGC not supported in threaded RTS (yet)");
2798 stg_exit(EXIT_FAILURE);
2800 /* Obligated to hold this lock upon entry */
2801 GarbageCollect(GetRoots,rtsFalse);
2805 performMajorGC(void)
2808 errorBelch("performMajorGC not supported in threaded RTS (yet)");
2809 stg_exit(EXIT_FAILURE);
2811 GarbageCollect(GetRoots,rtsTrue);
2815 AllRoots(evac_fn evac)
2817 GetRoots(evac); // the scheduler's roots
2818 extra_roots(evac); // the user's roots
2822 performGCWithRoots(void (*get_roots)(evac_fn))
2825 errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2826 stg_exit(EXIT_FAILURE);
2828 extra_roots = get_roots;
2829 GarbageCollect(AllRoots,rtsFalse);
2832 /* -----------------------------------------------------------------------------
2835 If the thread has reached its maximum stack size, then raise the
2836 StackOverflow exception in the offending thread. Otherwise
2837 relocate the TSO into a larger chunk of memory and adjust its stack size appropriately.
2839 -------------------------------------------------------------------------- */
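/* Example of the sizing arithmetic below: a thread with a 4k-word stack
 * and an 8k-word maximum gets min(2*4k, 8k) = 8k words, which is then
 * rounded up to whole blocks (and made MBLOCK-friendly) before the TSO
 * is copied into the new space.
 */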
2842 threadStackOverflow(Capability *cap, StgTSO *tso)
2844 nat new_stack_size, stack_words;
2849 IF_DEBUG(sanity,checkTSO(tso));
2850 if (tso->stack_size >= tso->max_stack_size) {
2853 debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2854 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2855 /* If we're debugging, just print out the top of the stack */
2856 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2859 /* Send this thread the StackOverflow exception */
2860 raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2864 /* Try to double the current stack size. If that takes us over the
2865 * maximum stack size for this thread, then use the maximum instead.
2866 * Finally round up so the TSO ends up as a whole number of blocks.
2868 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2869 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2870 TSO_STRUCT_SIZE)/sizeof(W_);
2871 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2872 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2874 IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2876 dest = (StgTSO *)allocate(new_tso_size);
2877 TICK_ALLOC_TSO(new_stack_size,0);
2879 /* copy the TSO block and the old stack into the new area */
2880 memcpy(dest,tso,TSO_STRUCT_SIZE);
2881 stack_words = tso->stack + tso->stack_size - tso->sp;
2882 new_sp = (P_)dest + new_tso_size - stack_words;
2883 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2885 /* relocate the stack pointers... */
2887 dest->stack_size = new_stack_size;
2889 /* Mark the old TSO as relocated. We have to check for relocated
2890 * TSOs in the garbage collector and any primops that deal with TSOs.
2892 * It's important to set the sp value to just beyond the end
2893 * of the stack, so we don't attempt to scavenge any part of the dead TSO's stack.
2896 tso->what_next = ThreadRelocated;
2898 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2899 tso->why_blocked = NotBlocked;
2901 IF_PAR_DEBUG(verbose,
2902 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2903 tso->id, tso, tso->stack_size);
2904 /* If we're debugging, just print out the top of the stack */
2905 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2908 IF_DEBUG(sanity,checkTSO(tso));
2910 IF_DEBUG(scheduler,printTSO(dest));
2916 /* ---------------------------------------------------------------------------
2917 Wake up a queue that was blocked on some resource.
2918 ------------------------------------------------------------------------ */
2922 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2925 #elif defined(PARALLEL_HASKELL)
2927 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2929 /* write RESUME events to log file and
2930 update blocked and fetch time (depending on type of the orig closure) */
2931 if (RtsFlags.ParFlags.ParStats.Full) {
2932 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2933 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2934 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2935 if (emptyRunQueue())
2936 emitSchedule = rtsTrue;
2938 switch (get_itbl(node)->type) {
2940 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2945 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2952 barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2959 StgBlockingQueueElement *
2960 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2963 PEs node_loc, tso_loc;
2965 node_loc = where_is(node); // should be lifted out of loop
2966 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2967 tso_loc = where_is((StgClosure *)tso);
2968 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2969 /* !fake_fetch => the TSO being on CurrentProc is the same as IS_LOCAL_TO */
2970 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2971 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2972 // insertThread(tso, node_loc);
2973 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2975 tso, node, (rtsSpark*)NULL);
2976 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2979 } else { // TSO is remote (actually should be FMBQ)
2980 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2981 RtsFlags.GranFlags.Costs.gunblocktime +
2982 RtsFlags.GranFlags.Costs.latency;
2983 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2985 tso, node, (rtsSpark*)NULL);
2986 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2989 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2991 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2992 (node_loc==tso_loc ? "Local" : "Global"),
2993 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2994 tso->block_info.closure = NULL;
2995 IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n",
2998 #elif defined(PARALLEL_HASKELL)
2999 StgBlockingQueueElement *
3000 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3002 StgBlockingQueueElement *next;
3004 switch (get_itbl(bqe)->type) {
3006 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3007 /* if it's a TSO just push it onto the run_queue */
3009 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3010 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
3012 unblockCount(bqe, node);
3013 /* reset blocking status after dumping event */
3014 ((StgTSO *)bqe)->why_blocked = NotBlocked;
3018 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3020 bqe->link = (StgBlockingQueueElement *)PendingFetches;
3021 PendingFetches = (StgBlockedFetch *)bqe;
3025 /* can ignore this case in a non-debugging setup;
3026 see comments on RBHSave closures above */
3028 /* check that the closure is an RBHSave closure */
3029 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3030 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3031 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3035 barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3036 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
3040 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3046 unblockOne(Capability *cap, StgTSO *tso)
3050 ASSERT(get_itbl(tso)->type == TSO);
3051 ASSERT(tso->why_blocked != NotBlocked);
3052 tso->why_blocked = NotBlocked;
3054 tso->link = END_TSO_QUEUE;
3056 // We might have just migrated this TSO to our Capability:
3058 tso->bound->cap = cap;
3061 appendToRunQueue(cap,tso);
3063 // we're holding a newly woken thread, make sure we context switch
3064 // quickly so we can migrate it if necessary.
3066 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3073 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3075 StgBlockingQueueElement *bqe;
3080 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3081 node, CurrentProc, CurrentTime[CurrentProc],
3082 CurrentTSO->id, CurrentTSO));
3084 node_loc = where_is(node);
3086 ASSERT(q == END_BQ_QUEUE ||
3087 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
3088 get_itbl(q)->type == CONSTR); // closure (type constructor)
3089 ASSERT(is_unique(node));
3091 /* FAKE FETCH: magically copy the node to the tso's proc;
3092 no Fetch necessary because in reality the node should not have been
3093 moved to the other PE in the first place
3095 if (CurrentProc!=node_loc) {
3097 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3098 node, node_loc, CurrentProc, CurrentTSO->id,
3099 // CurrentTSO, where_is(CurrentTSO),
3100 node->header.gran.procs));
3101 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3103 debugBelch("## new bitmask of node %p is %#x\n",
3104 node, node->header.gran.procs));
3105 if (RtsFlags.GranFlags.GranSimStats.Global) {
3106 globalGranStats.tot_fake_fetches++;
3111 // ToDo: check: ASSERT(CurrentProc==node_loc);
3112 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3115 bqe points to the current element in the queue
3116 next points to the next element in the queue
3118 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
3119 //tso_loc = where_is(tso);
3121 bqe = unblockOne(bqe, node);
3124 /* if this is the BQ of an RBH, we have to put back the info ripped out of
3125 the closure to make room for the anchor of the BQ */
3126 if (bqe!=END_BQ_QUEUE) {
3127 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3129 ASSERT((info_ptr==&RBH_Save_0_info) ||
3130 (info_ptr==&RBH_Save_1_info) ||
3131 (info_ptr==&RBH_Save_2_info));
3133 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3134 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3135 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3138 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3139 node, info_type(node)));
3142 /* statistics gathering */
3143 if (RtsFlags.GranFlags.GranSimStats.Global) {
3144 // globalGranStats.tot_bq_processing_time += bq_processing_time;
3145 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
3146 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
3147 globalGranStats.tot_awbq++; // total no. of bqs awakened
3150 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3151 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3153 #elif defined(PARALLEL_HASKELL)
3155 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3157 StgBlockingQueueElement *bqe;
3159 IF_PAR_DEBUG(verbose,
3160 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3164 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3165 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so let's just return. RFP (BUG?)\n"));
3170 ASSERT(q == END_BQ_QUEUE ||
3171 get_itbl(q)->type == TSO ||
3172 get_itbl(q)->type == BLOCKED_FETCH ||
3173 get_itbl(q)->type == CONSTR);
3176 while (get_itbl(bqe)->type==TSO ||
3177 get_itbl(bqe)->type==BLOCKED_FETCH) {
3178 bqe = unblockOne(bqe, node);
3182 #else /* !GRAN && !PARALLEL_HASKELL */
3185 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3187 if (tso == NULL) return; // hack; see bug #1235728, and comments in
3189 while (tso != END_TSO_QUEUE) {
3190 tso = unblockOne(cap,tso);
3195 /* ---------------------------------------------------------------------------
3197 - usually called inside a signal handler so it mustn't do anything fancy.
3198 ------------------------------------------------------------------------ */
3201 interruptStgRts(void)
3205 #if defined(THREADED_RTS)
3206 prodAllCapabilities();
3210 /* -----------------------------------------------------------------------------
3213 This is for use when we raise an exception in another thread, which
3215 This has nothing to do with the UnblockThread event in GranSim. -- HWL
3216 -------------------------------------------------------------------------- */
3218 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3220 NB: only the type of the blocking queue is different in GranSim and GUM;
3221 the operations on the queue elements are the same.
3222 Long live polymorphism!
3224 Locks: sched_mutex is held upon entry and exit.
3228 unblockThread(Capability *cap, StgTSO *tso)
3230 StgBlockingQueueElement *t, **last;
3232 switch (tso->why_blocked) {
3235 return; /* not blocked */
3238 // Be careful: nothing to do here! We tell the scheduler that the thread
3239 // is runnable and we leave it to the stack-walking code to abort the
3240 // transaction while unwinding the stack. We should perhaps have a debugging
3241 // test to make sure that this really happens and that the 'zombie' transaction
3242 // does not get committed.
3246 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3248 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3249 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3251 last = (StgBlockingQueueElement **)&mvar->head;
3252 for (t = (StgBlockingQueueElement *)mvar->head;
3254 last = &t->link, last_tso = t, t = t->link) {
3255 if (t == (StgBlockingQueueElement *)tso) {
3256 *last = (StgBlockingQueueElement *)tso->link;
3257 if (mvar->tail == tso) {
3258 mvar->tail = (StgTSO *)last_tso;
3263 barf("unblockThread (MVAR): TSO not found");
3266 case BlockedOnBlackHole:
3267 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3269 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3271 last = &bq->blocking_queue;
3272 for (t = bq->blocking_queue;
3274 last = &t->link, t = t->link) {
3275 if (t == (StgBlockingQueueElement *)tso) {
3276 *last = (StgBlockingQueueElement *)tso->link;
3280 barf("unblockThread (BLACKHOLE): TSO not found");
3283 case BlockedOnException:
3285 StgTSO *target = tso->block_info.tso;
3287 ASSERT(get_itbl(target)->type == TSO);
3289 if (target->what_next == ThreadRelocated) {
3290 target = target->link;
3291 ASSERT(get_itbl(target)->type == TSO);
3294 ASSERT(target->blocked_exceptions != NULL);
3296 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3297 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
3299 last = &t->link, t = t->link) {
3300 ASSERT(get_itbl(t)->type == TSO);
3301 if (t == (StgBlockingQueueElement *)tso) {
3302 *last = (StgBlockingQueueElement *)tso->link;
3306 barf("unblockThread (Exception): TSO not found");
3310 case BlockedOnWrite:
3311 #if defined(mingw32_HOST_OS)
3312 case BlockedOnDoProc:
3315 /* take TSO off blocked_queue */
3316 StgBlockingQueueElement *prev = NULL;
3317 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
3318 prev = t, t = t->link) {
3319 if (t == (StgBlockingQueueElement *)tso) {
3321 blocked_queue_hd = (StgTSO *)t->link;
3322 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3323 blocked_queue_tl = END_TSO_QUEUE;
3326 prev->link = t->link;
3327 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3328 blocked_queue_tl = (StgTSO *)prev;
3331 #if defined(mingw32_HOST_OS)
3332 /* (Cooperatively) signal that the worker thread should abort the request. */
3335 abandonWorkRequest(tso->block_info.async_result->reqID);
3340 barf("unblockThread (I/O): TSO not found");
3343 case BlockedOnDelay:
3345 /* take TSO off sleeping_queue */
3346 StgBlockingQueueElement *prev = NULL;
3347 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
3348 prev = t, t = t->link) {
3349 if (t == (StgBlockingQueueElement *)tso) {
3351 sleeping_queue = (StgTSO *)t->link;
3353 prev->link = t->link;
3358 barf("unblockThread (delay): TSO not found");
3362 barf("unblockThread");
3366 tso->link = END_TSO_QUEUE;
3367 tso->why_blocked = NotBlocked;
3368 tso->block_info.closure = NULL;
3369 pushOnRunQueue(cap,tso);
3373 unblockThread(Capability *cap, StgTSO *tso)
3377 /* To avoid locking unnecessarily. */
3378 if (tso->why_blocked == NotBlocked) {
3382 switch (tso->why_blocked) {
3385 // Be careful: nothing to do here! We tell the scheduler that the thread
3386 // is runnable and we leave it to the stack-walking code to abort the
3387 // transaction while unwinding the stack. We should perhaps have a debugging
3388 // test to make sure that this really happens and that the 'zombie' transaction
3389 // does not get committed.
3393 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3395 StgTSO *last_tso = END_TSO_QUEUE;
3396 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3399 for (t = mvar->head; t != END_TSO_QUEUE;
3400 last = &t->link, last_tso = t, t = t->link) {
3403 if (mvar->tail == tso) {
3404 mvar->tail = last_tso;
3409 barf("unblockThread (MVAR): TSO not found");
3412 case BlockedOnBlackHole:
3414 last = &blackhole_queue;
3415 for (t = blackhole_queue; t != END_TSO_QUEUE;
3416 last = &t->link, t = t->link) {
3422 barf("unblockThread (BLACKHOLE): TSO not found");
3425 case BlockedOnException:
3427 StgTSO *target = tso->block_info.tso;
3429 ASSERT(get_itbl(target)->type == TSO);
3431 while (target->what_next == ThreadRelocated) {
3432 target = target->link;
3433 ASSERT(get_itbl(target)->type == TSO);
3436 ASSERT(target->blocked_exceptions != NULL);
3438 last = &target->blocked_exceptions;
3439 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
3440 last = &t->link, t = t->link) {
3441 ASSERT(get_itbl(t)->type == TSO);
3447 barf("unblockThread (Exception): TSO not found");
3450 #if !defined(THREADED_RTS)
3452 case BlockedOnWrite:
3453 #if defined(mingw32_HOST_OS)
3454 case BlockedOnDoProc:
3457 StgTSO *prev = NULL;
3458 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
3459 prev = t, t = t->link) {
3462 blocked_queue_hd = t->link;
3463 if (blocked_queue_tl == t) {
3464 blocked_queue_tl = END_TSO_QUEUE;
3467 prev->link = t->link;
3468 if (blocked_queue_tl == t) {
3469 blocked_queue_tl = prev;
3472 #if defined(mingw32_HOST_OS)
3473 /* (Cooperatively) signal that the worker thread should abort the request. */
3476 abandonWorkRequest(tso->block_info.async_result->reqID);
3481 barf("unblockThread (I/O): TSO not found");
3484 case BlockedOnDelay:
3486 StgTSO *prev = NULL;
3487 for (t = sleeping_queue; t != END_TSO_QUEUE;
3488 prev = t, t = t->link) {
3491 sleeping_queue = t->link;
3493 prev->link = t->link;
3498 barf("unblockThread (delay): TSO not found");
3503 barf("unblockThread");
3507 tso->link = END_TSO_QUEUE;
3508 tso->why_blocked = NotBlocked;
3509 tso->block_info.closure = NULL;
3510 appendToRunQueue(cap,tso);
3514 /* -----------------------------------------------------------------------------
3517 * Check the blackhole_queue for threads that can be woken up. We do
3518 * this periodically: before every GC, and whenever the run queue is
3521 * An elegant solution might be to just wake up all the blocked
3522 * threads with awakenBlockedQueue occasionally: they'll go back to
3523 * sleep again if the object is still a BLACKHOLE. Unfortunately this
3524 * doesn't give us a way to tell whether we've actually managed to
3525 * wake up any threads, so we would be busy-waiting.
3527 * -------------------------------------------------------------------------- */
3530 checkBlackHoles (Capability *cap)
3533 rtsBool any_woke_up = rtsFalse;
3536 // blackhole_queue is global:
3537 ASSERT_LOCK_HELD(&sched_mutex);
3539 IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3541 // ASSUMES: sched_mutex
3542 prev = &blackhole_queue;
3543 t = blackhole_queue;
3544 while (t != END_TSO_QUEUE) {
3545 ASSERT(t->why_blocked == BlockedOnBlackHole);
3546 type = get_itbl(t->block_info.closure)->type;
3547 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3548 IF_DEBUG(sanity,checkTSO(t));
3549 t = unblockOne(cap, t);
3550 // urk, the threads migrate to the current capability
3551 // here, but we'd like to keep them on the original one.
3553 any_woke_up = rtsTrue;
3563 /* -----------------------------------------------------------------------------
3566 * The following function implements the magic for raising an
3567 * asynchronous exception in an existing thread.
3569 * We first remove the thread from any queue on which it might be
3570 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
3572 * We strip the stack down to the innermost CATCH_FRAME, building
3573 * thunks in the heap for all the active computations, so they can
3574 * be restarted if necessary. When we reach a CATCH_FRAME, we build
3575 * an application of the handler to the exception, and push it on
3576 * the top of the stack.
3578 * How exactly do we save all the active computations? We create an
3579 * AP_STACK for every UpdateFrame on the stack. Entering one of these
3580 * AP_STACKs pushes everything from the corresponding update frame
3581 * upwards onto the stack. (Actually, it pushes everything up to the
3582 * next update frame plus a pointer to the next AP_STACK object.)
3583 * Entering the next AP_STACK object pushes more onto the stack until we
3584 * reach the last AP_STACK object - at which point the stack should look
3585 * exactly as it did when we killed the TSO and we can continue
3586 * execution by entering the closure on top of the stack.
3588 * We can also kill a thread entirely - this happens if either (a) the
3589 * exception passed to raiseAsync is NULL, or (b) there's no
3590 * CATCH_FRAME on the stack. In either case, we strip the entire
3591 * stack and replace the thread with a zombie.
3593 * ToDo: in SMP mode, this function is only safe if either (a) we hold
3594 * all the Capabilities (eg. in GC), or (b) we own the Capability that
3595 * the TSO is currently blocked on or on the run queue of.
3597 * -------------------------------------------------------------------------- */
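/* For example, if the stack between the top and the innermost CATCH_FRAME
 * contains a single update frame:
 *
 *    sp -> e1 e2 ... | UPDATE_FRAME u | ... | CATCH_FRAME | ...
 *
 * then e1,e2,... are packed into an AP_STACK, u's updatee is overwritten
 * with an indirection to that AP_STACK (incidentally waking any threads
 * blocked on it), and finally a raise(exception) thunk is left on top of
 * the CATCH_FRAME, ready to enter.
 */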
3600 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3602 raiseAsync_(cap, tso, exception, rtsFalse);
3606 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
3607 rtsBool stop_at_atomically)
3609 StgRetInfoTable *info;
3612 // Thread already dead?
3613 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3618 sched_belch("raising exception in thread %ld.", (long)tso->id));
3620 // Remove it from any blocking queues
3621 unblockThread(cap,tso);
3625 // The stack freezing code assumes there's a closure pointer on
3626 // the top of the stack, so we have to arrange that this is the case...
3628 if (sp[0] == (W_)&stg_enter_info) {
3632 sp[0] = (W_)&stg_dummy_ret_closure;
3638 // 1. Let the top of the stack be the "current closure"
3640 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3643 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3644 // current closure applied to the chunk of stack up to (but not
3645 // including) the update frame. This closure becomes the "current
3646 // closure". Go back to step 2.
3648 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3649 // top of the stack applied to the exception.
3651 // 5. If it's a STOP_FRAME, then kill the thread.
3653 // NB: if we pass an ATOMICALLY_FRAME then abort the associated
3660 info = get_ret_itbl((StgClosure *)frame);
3662 while (info->i.type != UPDATE_FRAME
3663 && (info->i.type != CATCH_FRAME || exception == NULL)
3664 && info->i.type != STOP_FRAME
3665 && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3667 if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3668 // If we find an ATOMICALLY_FRAME then we abort the
3669 // current transaction and propagate the exception. In
3670 // this case (unlike ordinary exceptions) we do not care
3671 // whether the transaction is valid or not because its
3672 // possible validity cannot have caused the exception
3673 // and will not be visible after the abort.
3675 debugBelch("Found atomically block delivering async exception\n"));
3676 stmAbortTransaction(tso -> trec);
3677 tso -> trec = stmGetEnclosingTRec(tso -> trec);
3679 frame += stack_frame_sizeW((StgClosure *)frame);
3680 info = get_ret_itbl((StgClosure *)frame);
3683 switch (info->i.type) {
3685 case ATOMICALLY_FRAME:
3686 ASSERT(stop_at_atomically);
3687 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3688 stmCondemnTransaction(tso -> trec);
3692 // R1 is not a register: the return convention for IO in
3693 // this case puts the return value on the stack, so we
3694 // need to set up the stack to return to the atomically
3695 // frame properly...
3696 tso->sp = frame - 2;
3697 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3698 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3700 tso->what_next = ThreadRunGHC;
3704 // If we find a CATCH_FRAME, and we've got an exception to raise,
3705 // then build the THUNK raise(exception), and leave it on
3706 // top of the CATCH_FRAME ready to enter.
3710 StgCatchFrame *cf = (StgCatchFrame *)frame;
3714 // we've got an exception to raise, so let's pass it to the
3715 // handler in this frame.
3717 raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3718 TICK_ALLOC_SE_THK(1,0);
3719 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3720 raise->payload[0] = exception;
3722 // throw away the stack from Sp up to the CATCH_FRAME.
3726 /* Ensure that async exceptions are blocked now, so we don't get
3727 * a surprise exception before we get around to executing the handler.
3730 if (tso->blocked_exceptions == NULL) {
3731 tso->blocked_exceptions = END_TSO_QUEUE;
3734 /* Put the newly-built THUNK on top of the stack, ready to execute
3735 * when the thread restarts.
3738 sp[-1] = (W_)&stg_enter_info;
3740 tso->what_next = ThreadRunGHC;
3741 IF_DEBUG(sanity, checkTSO(tso));
3750 // First build an AP_STACK consisting of the stack chunk above the
3751 // current update frame, with the top word on the stack as the fun field.
3754 words = frame - sp - 1;
3755 ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3758 ap->fun = (StgClosure *)sp[0];
3760 for(i=0; i < (nat)words; ++i) {
3761 ap->payload[i] = (StgClosure *)*sp++;
3764 SET_HDR(ap,&stg_AP_STACK_info,
3765 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3766 TICK_ALLOC_UP_THK(words+1,0);
3769 debugBelch("sched: Updating ");
3770 printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3771 debugBelch(" with ");
3772 printObj((StgClosure *)ap);
3775 // Replace the updatee with an indirection - happily
3776 // this will also wake up any threads currently
3777 // waiting on the result.
3779 // Warning: if we're in a loop, more than one update frame on
3780 // the stack may point to the same object. Be careful not to
3781 // overwrite an IND_OLDGEN in this case, because we'll screw
3782 // up the mutable lists. To be on the safe side, don't
3783 // overwrite any kind of indirection at all. See also
3784 // threadSqueezeStack in GC.c, where we have to make a similar check.
3787 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3788 // revert the black hole
3789 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3792 sp += sizeofW(StgUpdateFrame) - 1;
3793 sp[0] = (W_)ap; // push onto stack
3798 // We've stripped the entire stack, the thread is now dead.
3799 sp += sizeofW(StgStopFrame);
3800 tso->what_next = ThreadKilled;
3811 /* -----------------------------------------------------------------------------
3814 This is used for interruption (^C) and forking, and corresponds to
3815 raising an exception but without letting the thread catch the exception.
3817 -------------------------------------------------------------------------- */
3820 deleteThread (Capability *cap, StgTSO *tso)
3822 if (tso->why_blocked != BlockedOnCCall &&
3823 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3824 raiseAsync(cap,tso,NULL);
3828 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3830 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3831 { // for forkProcess only:
3832 // delete thread without giving it a chance to catch the KillThread exception
3834 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3838 if (tso->why_blocked != BlockedOnCCall &&
3839 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3840 unblockThread(cap,tso);
3843 tso->what_next = ThreadKilled;
3847 /* -----------------------------------------------------------------------------
3848 raiseExceptionHelper
3850 This function is called by the raise# primitive, just so that we can
3851 move some of the tricky bits of raising an exception from C-- into
3852 C. Who knows, it might be a useful re-useable thing here too.
3853 -------------------------------------------------------------------------- */
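/* In outline: on return, tso->sp has been adjusted to point to the frame
 * that will handle the exception, and the return value tells the C--
 * caller what kind of frame it found (e.g. ATOMICALLY_FRAME or
 * CATCH_STM_FRAME below).
 */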
3856 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3858 Capability *cap = regTableToCapability(reg);
3859 StgThunk *raise_closure = NULL;
3861 StgRetInfoTable *info;
3863 // This closure represents the expression 'raise# E' where E
3864 // is the exception raised. It is used to overwrite all the
3865 // thunks which are currently under evaluation.
3869 // LDV profiling: stg_raise_info has THUNK as its closure
3870 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3871 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
3872 // 1 does not cause any problem unless profiling is performed.
3873 // However, when LDV profiling goes on, we need to linearly scan
3874 // small object pool, where raise_closure is stored, so we should
3875 // use MIN_UPD_SIZE.
3877 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3878 // sizeofW(StgClosure)+1);
3882 // Walk up the stack, looking for the catch frame. On the way,
3883 // we update any closures pointed to from update frames with the
3884 // raise closure that we just built.
3888 info = get_ret_itbl((StgClosure *)p);
3889 next = p + stack_frame_sizeW((StgClosure *)p);
3890 switch (info->i.type) {
3893 // Only create raise_closure if we need to.
3894 if (raise_closure == NULL) {
3896 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3897 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3898 raise_closure->payload[0] = exception;
3900 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3904 case ATOMICALLY_FRAME:
3905 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3907 return ATOMICALLY_FRAME;
3913 case CATCH_STM_FRAME:
3914 IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3916 return CATCH_STM_FRAME;
3922 case CATCH_RETRY_FRAME:
3931 /* -----------------------------------------------------------------------------
3932 findRetryFrameHelper
3934 This function is called by the retry# primitive. It traverses the stack
3935 leaving tso->sp referring to the frame which should handle the retry.
3937 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3938 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
3940 We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3941 despite the similar implementation.
3943 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3944 not be created within memory transactions.
3945 -------------------------------------------------------------------------- */
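/* For example, in  atomically (e1 `orElse` e2)  a retry# inside e1 stops
 * at the CATCH_RETRY_FRAME pushed by orElse#, whereas a retry# outside
 * any orElse travels all the way up to the ATOMICALLY_FRAME.
 */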
3948 findRetryFrameHelper (StgTSO *tso)
3951 StgRetInfoTable *info;
3955 info = get_ret_itbl((StgClosure *)p);
3956 next = p + stack_frame_sizeW((StgClosure *)p);
3957 switch (info->i.type) {
3959 case ATOMICALLY_FRAME:
3960 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retry\n", p));
3962 return ATOMICALLY_FRAME;
3964 case CATCH_RETRY_FRAME:
3965 IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retry\n", p));
3967 return CATCH_RETRY_FRAME;
3969 case CATCH_STM_FRAME:
3971 ASSERT(info->i.type != CATCH_FRAME);
3972 ASSERT(info->i.type != STOP_FRAME);
3979 /* -----------------------------------------------------------------------------
3980 resurrectThreads is called after garbage collection on the list of
3981 threads found to be garbage. Each of these threads will be woken
3982 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3983 on an MVar, or NonTermination if the thread was blocked on a Black Hole.
3986 Locks: assumes we hold *all* the capabilities.
3987 -------------------------------------------------------------------------- */
3990 resurrectThreads (StgTSO *threads)
3995 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3996 next = tso->global_link;
3997 tso->global_link = all_threads;
3999 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4001 // Wake up the thread on the Capability it was last on for a
4002 // bound thread, or last_free_capability otherwise.
4004 cap = tso->bound->cap;
4006 cap = last_free_capability;
4009 switch (tso->why_blocked) {
4011 case BlockedOnException:
4012 /* Called by GC - sched_mutex lock is currently held. */
4013 raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4015 case BlockedOnBlackHole:
4016 raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4019 raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4022 /* This might happen if the thread was blocked on a black hole
4023 * belonging to a thread that we've just woken up (raiseAsync
4024 * can wake up threads, remember...).
4028 barf("resurrectThreads: thread blocked in a strange way");
4033 /* ----------------------------------------------------------------------------
4034 * Debugging: why is a thread blocked
4035 * [Also provides useful information when debugging threaded programs
4036 * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4037 ------------------------------------------------------------------------- */
4041 printThreadBlockage(StgTSO *tso)
4043 switch (tso->why_blocked) {
4045 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4047 case BlockedOnWrite:
4048 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4050 #if defined(mingw32_HOST_OS)
4051 case BlockedOnDoProc:
4052 debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4055 case BlockedOnDelay:
4056 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4059 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4061 case BlockedOnException:
4062 debugBelch("is blocked on delivering an exception to thread %d",
4063 tso->block_info.tso->id);
4065 case BlockedOnBlackHole:
4066 debugBelch("is blocked on a black hole");
4069 debugBelch("is not blocked");
4071 #if defined(PARALLEL_HASKELL)
4073 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4074 tso->block_info.closure, info_type(tso->block_info.closure));
4076 case BlockedOnGA_NoSend:
4077 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4078 tso->block_info.closure, info_type(tso->block_info.closure));
4081 case BlockedOnCCall:
4082 debugBelch("is blocked on an external call");
4084 case BlockedOnCCall_NoUnblockExc:
4085 debugBelch("is blocked on an external call (exceptions were already blocked)");
4088 debugBelch("is blocked on an STM operation");
4091 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
4092 tso->why_blocked, tso->id, tso);
4097 printThreadStatus(StgTSO *t)
4099 debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4101 void *label = lookupThreadLabel(t->id);
4102 if (label) debugBelch("[\"%s\"] ",(char *)label);
4104 if (t->what_next == ThreadRelocated) {
4105 debugBelch("has been relocated...\n");
4107 switch (t->what_next) {
4109 debugBelch("has been killed");
4111 case ThreadComplete:
4112 debugBelch("has completed");
4115 printThreadBlockage(t);
4122 printAllThreads(void)
4129 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4130 ullong_format_string(TIME_ON_PROC(CurrentProc),
4131 time_string, rtsFalse/*no commas!*/);
4133 debugBelch("all threads at [%s]:\n", time_string);
4134 # elif defined(PARALLEL_HASKELL)
4135 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4136 ullong_format_string(CURRENT_TIME,
4137 time_string, rtsFalse/*no commas!*/);
4139 debugBelch("all threads at [%s]:\n", time_string);
4141 debugBelch("all threads:\n");
4144 for (i = 0; i < n_capabilities; i++) {
4145 cap = &capabilities[i];
4146 debugBelch("threads on capability %d:\n", cap->no);
4147 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4148 printThreadStatus(t);
4152 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4153 if (t->why_blocked != NotBlocked) {
4154 printThreadStatus(t);
4156 if (t->what_next == ThreadRelocated) {
4159 next = t->global_link;
4166 printThreadQueue(StgTSO *t)
4169 for (; t != END_TSO_QUEUE; t = t->link) {
4170 printThreadStatus(t);
4173 debugBelch("%d threads on queue\n", i);
4177 Print a whole blocking queue attached to node (debugging only).
4179 # if defined(PARALLEL_HASKELL)
4181 print_bq (StgClosure *node)
4183 StgBlockingQueueElement *bqe;
4187 debugBelch("## BQ of closure %p (%s): ",
4188 node, info_type(node));
4190 /* should cover all closures that may have a blocking queue */
4191 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4192 get_itbl(node)->type == FETCH_ME_BQ ||
4193 get_itbl(node)->type == RBH ||
4194 get_itbl(node)->type == MVAR);
4196 ASSERT(node!=(StgClosure*)NULL); // sanity check
4198 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4202 Print a whole blocking queue starting with the element bqe.
4205 print_bqe (StgBlockingQueueElement *bqe)
4210 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4212 for (end = (bqe==END_BQ_QUEUE);
4213 !end; // iterate until bqe points to a CONSTR
4214 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
4215 bqe = end ? END_BQ_QUEUE : bqe->link) {
4216 ASSERT(bqe != END_BQ_QUEUE); // sanity check
4217 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
4218 /* types of closures that may appear in a blocking queue */
4219 ASSERT(get_itbl(bqe)->type == TSO ||
4220 get_itbl(bqe)->type == BLOCKED_FETCH ||
4221 get_itbl(bqe)->type == CONSTR);
4222 /* only BQs of an RBH end with an RBH_Save closure */
4223 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4225 switch (get_itbl(bqe)->type) {
4227 debugBelch(" TSO %u (%p),",
4228 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4231 debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4232 ((StgBlockedFetch *)bqe)->node,
4233 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4234 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4235 ((StgBlockedFetch *)bqe)->ga.weight);
4238 debugBelch(" %s (IP %p),",
4239 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4240 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4241 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4242 "RBH_Save_?"), get_itbl(bqe));
4245 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4246 info_type((StgClosure *)bqe)); // , node, info_type(node));
4252 # elif defined(GRAN)
4254 print_bq (StgClosure *node)
4256 StgBlockingQueueElement *bqe;
4257 PEs node_loc, tso_loc;
4260 /* should cover all closures that may have a blocking queue */
4261 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4262 get_itbl(node)->type == FETCH_ME_BQ ||
4263 get_itbl(node)->type == RBH);
4265 ASSERT(node!=(StgClosure*)NULL); // sanity check
4266 node_loc = where_is(node);
4268 debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4269 node, info_type(node), node_loc);
4272 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4274 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4275 !end; // iterate until bqe points to a CONSTR
4276 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4277 ASSERT(bqe != END_BQ_QUEUE); // sanity check
4278 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
4279 /* types of closures that may appear in a blocking queue */
4280 ASSERT(get_itbl(bqe)->type == TSO ||
4281 get_itbl(bqe)->type == CONSTR);
4282 /* only BQs of an RBH end with an RBH_Save closure */
4283 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4285 tso_loc = where_is((StgClosure *)bqe);
4286 switch (get_itbl(bqe)->type) {
4288 debugBelch(" TSO %d (%p) on [PE %d],",
4289 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4292 debugBelch(" %s (IP %p),",
4293 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4294 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4295 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4296 "RBH_Save_?"), get_itbl(bqe));
4299 barf("Unexpected closure type %s in blocking queue of %p (%s)",
4300 info_type((StgClosure *)bqe), node, info_type(node));
4308 #if defined(PARALLEL_HASKELL)
4315 for (i=0, tso=run_queue_hd;
4316 tso != END_TSO_QUEUE;
4317 i++, tso=tso->link) {
4326 sched_belch(char *s, ...)
4331 debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4332 #elif defined(PARALLEL_HASKELL)
4335 debugBelch("sched: ");