/* ---------------------------------------------------------------------------
 * (c) The GHC Team, 1998-2005
 *
 * The scheduler and thread-related functionality
 *
 * --------------------------------------------------------------------------*/
#include "PosixSource.h"
#include "BlockAlloc.h"
#include "OSThreads.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Exception.h"
#include "RtsSignals.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Proftimer.h"

#if defined(GRAN) || defined(PARALLEL_HASKELL)
# include "GranSimRts.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
#endif

#include "Capability.h"
#include "AwaitEvent.h"

#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
// Turn off inlining when debugging - it obfuscates things
#ifdef DEBUG
# define STATIC_INLINE static
#else
# define STATIC_INLINE static inline
#endif

#if defined(THREADED_RTS)
#define USED_WHEN_THREADED_RTS
#define USED_WHEN_NON_THREADED_RTS STG_UNUSED
#else
#define USED_WHEN_THREADED_RTS     STG_UNUSED
#define USED_WHEN_NON_THREADED_RTS
#endif

#ifdef SMP
#define USED_WHEN_SMP
#else
#define USED_WHEN_SMP STG_UNUSED
#endif
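/* Illustrative usage (added commentary, not in the original file): these
 * macros annotate parameters that are only read in some build flavours,
 * e.g.
 *
 *     static void schedulePushWork(Capability *cap USED_WHEN_SMP,
 *                                  Task *task      USED_WHEN_SMP);
 *
 * In a non-SMP build they expand to STG_UNUSED, silencing the unused-
 * parameter warning; in an SMP build they expand to nothing.
 */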
/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if defined(GRAN)
StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */

/*
  In GranSim we have a runnable and a blocked queue for each processor.
  In order to minimise code changes new arrays run_queue_hds/tls
  are created. run_queue_hd is then a short cut (macro) for
  run_queue_hds[CurrentProc] (see GranSim.h).
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating). */
#endif
#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *blackhole_queue = NULL;

/* The blackhole_queue should be checked for threads to wake up.  See
 * Schedule.h for more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
 */
rtsBool blackholes_need_checking = rtsFalse;
/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *all_threads = NULL;

/* flag set by signal handler to precipitate a context switch
 * LOCK: none (just an advisory flag)
 */
int context_switch = 0;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 */
nat recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes once, from false->true)
 */
rtsBool interrupted = rtsFalse;

/* Next thread ID to allocate.
 */
static StgThreadID next_thread_id = 1;
/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
 *  + 1                       (stg_ap_v_ret)
 *  + 1                       (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
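/* A worked check of the arithmetic above (added commentary, not in the
 * original build; STATIC_ASSERT is hypothetical here): the literal 3 in
 * MIN_STACK_WORDS stands for the three one-word slots itemised above:
 *
 *   STATIC_ASSERT(MIN_STACK_WORDS ==
 *                 RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 1 + 1 + 1);
 */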
/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 * exists - earlier gccs apparently didn't.
 */
StgTSO dummy_tso;

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;
/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS (including SMP) runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if defined(PARALLEL_HASKELL)
StgTSO *LastTSO;
rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;
#endif
204 /* -----------------------------------------------------------------------------
205 * static function prototypes
206 * -------------------------------------------------------------------------- */
208 static Capability *schedule (Capability *initialCapability, Task *task);
211 // These function all encapsulate parts of the scheduler loop, and are
212 // abstracted only to make the structure and control flow of the
213 // scheduler clearer.
215 static void schedulePreLoop (void);
217 static void schedulePushWork(Capability *cap, Task *task);
219 static void scheduleStartSignalHandlers (Capability *cap);
220 static void scheduleCheckBlockedThreads (Capability *cap);
221 static void scheduleCheckBlackHoles (Capability *cap);
222 static void scheduleDetectDeadlock (Capability *cap, Task *task);
224 static StgTSO *scheduleProcessEvent(rtsEvent *event);
226 #if defined(PARALLEL_HASKELL)
227 static StgTSO *scheduleSendPendingMessages(void);
228 static void scheduleActivateSpark(void);
229 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
231 #if defined(PAR) || defined(GRAN)
232 static void scheduleGranParReport(void);
234 static void schedulePostRunThread(void);
235 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
236 static void scheduleHandleStackOverflow( Capability *cap, Task *task,
238 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
239 nat prev_what_next );
240 static void scheduleHandleThreadBlocked( StgTSO *t );
241 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
243 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
244 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
246 static void unblockThread(Capability *cap, StgTSO *tso);
247 static rtsBool checkBlackHoles(Capability *cap);
248 static void AllRoots(evac_fn evac);
250 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
252 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
253 rtsBool stop_at_atomically);
255 static void deleteThread (Capability *cap, StgTSO *tso);
256 static void deleteRunQueue (Capability *cap);
259 static void printThreadBlockage(StgTSO *tso);
260 static void printThreadStatus(StgTSO *tso);
261 void printThreadQueue(StgTSO *tso);
264 #if defined(PARALLEL_HASKELL)
265 StgTSO * createSparkThread(rtsSpark spark);
266 StgTSO * activateSpark (rtsSpark spark);
static char *whatNext_strs[] = {
  "(unknown)",
  "ThreadRunGHC",
  "ThreadInterpret",
  "ThreadKilled",
  "ThreadRelocated",
  "ThreadComplete"
};
/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */

STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) {
	// this does round-robin scheduling; good for concurrency
	appendToRunQueue(cap,t);
    } else {
	// this does unfair scheduling; good for parallelism
	pushOnRunQueue(cap,t);
    }
#else
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
#endif
}
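/* A sketch (added commentary, not in the original file) of the two
 * disciplines chosen above, using this file's own helpers:
 *
 *     appendToRunQueue(cap,t);  // FIFO: t runs after the existing queue
 *     pushOnRunQueue(cap,t);    // LIFO: t runs next, ahead of older work
 *
 * FIFO gives round-robin fairness between concurrent threads; LIFO keeps
 * a freshly created thread hot in the cache, which favours parallelism.
 */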
/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * timer expires (thread yields)

   In a GranSim setup this loop iterates over the global event queue,
   which determines what to do next.  Therefore, it's more complicated
   than either the concurrent or the parallel (GUM) setup.

   GUM iterates over incoming messages.
   It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
   and sends out a fish whenever it has nothing to do; in-between
   doing the actual reductions (shared code below) it processes the
   incoming messages and deals with delayed operations
   (see PendingFetches).
   This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */
static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
#if defined(GRAN)
  rtsEvent *event;
#elif defined(PARALLEL_HASKELL)
  rtsBool receivedFinish = rtsFalse;
# if defined(DEBUG)
  nat tp_size, sp_size; // stats only
# endif
#endif
  nat prev_what_next;
  rtsBool ready_to_gc;
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;
#endif

  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  IF_DEBUG(scheduler,
	   sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
		       task, initialCapability));
  // -----------------------------------------------------------
  // Scheduler loop starts here:

#if defined(PARALLEL_HASKELL)
#define TERMINATION_CONDITION        (!receivedFinish)
#elif defined(GRAN)
#define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL)
#else
#define TERMINATION_CONDITION        rtsTrue
#endif

  while (TERMINATION_CONDITION) {

#if defined(GRAN)
      /* Choose the processor with the next event */
      CurrentProc = event->proc;
      CurrentTSO = event->tso;
#endif
#if defined(THREADED_RTS)
      if (first) {
	  // don't yield the first time, we want a chance to run this
	  // thread for a bit, even if there are others banging at the
	  // door.
	  first = rtsFalse;
	  ASSERT_CAPABILITY_INVARIANTS(cap,task);
      } else {
	  // Yield the capability to higher-priority tasks if necessary.
	  yieldCapability(&cap, task);
      }
#endif

      schedulePushWork(cap,task);

      // Check whether we have re-entered the RTS from Haskell without
      // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
      // call).
      if (cap->in_haskell) {
	  errorBelch("schedule: re-entered unsafely.\n"
		     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
	  stg_exit(EXIT_FAILURE);
      }
      // Test for interruption.  If interrupted==rtsTrue, then either
      // we received a keyboard interrupt (^C), or the scheduler is
      // trying to shut down all the tasks (shutting_down_scheduler) in
      // the threaded RTS.
      if (interrupted) {
	  deleteRunQueue(cap);
	  if (shutting_down_scheduler) {
	      IF_DEBUG(scheduler, sched_belch("shutting down"));
	      // If we are a worker, just exit.  If we're a bound thread
	      // then we will exit below when we've removed our TSO from
	      // the run queue.
	      if (task->tso == NULL && emptyRunQueue(cap)) {
		  return cap;
	      }
	  } else {
	      IF_DEBUG(scheduler, sched_belch("interrupted"));
	  }
      }
#if defined(not_yet) && defined(SMP)
      //
      // Top up the run queue from our spark pool.  We try to make the
      // number of threads in the run queue equal to the number of
      // free capabilities.
      //
      {
	  StgClosure *spark;
	  if (emptyRunQueue()) {
	      spark = findSpark(rtsFalse);
	      if (spark == NULL) {
		  break; /* no more sparks in the pool */
	      } else {
		  createSparkThread(spark);
		  IF_DEBUG(scheduler,
			   sched_belch("==^^ turning spark of closure %p into a thread",
				       (StgClosure *)spark));
	      }
	  }
      }
#endif // SMP
      scheduleStartSignalHandlers(cap);

      // Only check the black holes here if we've nothing else to do.
      // During normal execution, the black hole list only gets checked
      // at GC time, to avoid repeatedly traversing this possibly long
      // list each time around the scheduler.
      if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

      scheduleCheckBlockedThreads(cap);

      scheduleDetectDeadlock(cap,task);

      // Normally, the only way we can get here with no threads to
      // run is if a keyboard interrupt was received during
      // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
      // Additionally, it is not fatal for the
      // threaded RTS to reach here with no threads to run.
      //
      // win32: might be here due to awaitEvent() being abandoned
      // as a result of a console event having been delivered.
      if ( emptyRunQueue(cap) ) {
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
	  ASSERT(interrupted);
#endif
	  continue; // nothing to do
      }
#if defined(PARALLEL_HASKELL)
      scheduleSendPendingMessages();

      if (emptyRunQueue(cap) && scheduleActivateSpark())
	  continue;

      ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!

      /* If we still have no work we need to send a FISH to get a spark
	 from another PE */
      if (emptyRunQueue(cap)) {
	  if (!scheduleGetRemoteWork(&receivedFinish)) continue;
	  ASSERT(rtsFalse); // should not happen at the moment
      }
      // from here: non-empty run queue.
      //  TODO: merge above case with this, only one call processMessages() !
      if (PacketsWaiting()) {  /* process incoming messages, if
				  any pending...  only in else
				  because getRemoteWork waits for
				  work or (blocking) messages */
	  receivedFinish = processMessages();
      }
#endif

#if defined(GRAN)
      scheduleProcessEvent(event);
#endif
      //
      // Get a thread to run
      //
      t = popRunQueue(cap);

#if defined(GRAN) || defined(PAR)
      scheduleGranParReport(); // some kind of debugging output
#else
      // Sanity check the thread we're about to run.  This can be
      // expensive if there is lots of thread switching going on...
      IF_DEBUG(sanity,checkTSO(t));
#endif
#if defined(THREADED_RTS)
      // Check whether we can run this thread in the current task.
      // If not, we have to pass our capability to the right task.
      {
	  Task *bound = t->bound;

	  if (bound) {
	      if (bound == task) {
		  IF_DEBUG(scheduler,
			   sched_belch("### Running thread %d in bound thread",
				       t->id));
		  // yes, the Haskell thread is bound to the current native thread
	      } else {
		  IF_DEBUG(scheduler,
			   sched_belch("### thread %d bound to another OS thread",
				       t->id));
		  // no, bound to a different Haskell thread: pass to that thread
		  pushOnRunQueue(cap,t);
		  continue;
	      }
	  } else {
	      // The thread we want to run is unbound.
	      if (task->tso) {
		  IF_DEBUG(scheduler,
			   sched_belch("### this OS thread cannot run thread %d", t->id));
		  // no, the current native thread is bound to a different
		  // Haskell thread, so pass it to any worker thread
		  pushOnRunQueue(cap,t);
		  continue;
	      }
	  }
      }
#endif
      cap->r.rCurrentTSO = t;

      /* context switches are initiated by the timer signal, unless
       * the user specified "context switch as often as possible", with
       * +RTS -C0
       */
      if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
	  && !emptyThreadQueues(cap)) {
	  context_switch = 1;
      }

run_thread:

      IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
			      (long)t->id, whatNext_strs[t->what_next]));

#if defined(PROFILING)
      startHeapProfTimer();
#endif
      // ----------------------------------------------------------------------
      // Run the current thread

      prev_what_next = t->what_next;

      errno = t->saved_errno;
      cap->in_haskell = rtsTrue;

      recent_activity = ACTIVITY_YES;

      switch (prev_what_next) {

      case ThreadKilled:
      case ThreadComplete:
	  /* Thread already finished, return to scheduler. */
	  ret = ThreadFinished;
	  break;

      case ThreadRunGHC:
      {
	  StgRegTable *r;
	  r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
	  cap = regTableToCapability(r);
	  ret = r->rRet;
	  break;
      }

      case ThreadInterpret:
	  cap = interpretBCO(cap);
	  ret = cap->r.rRet;
	  break;

      default:
	  barf("schedule: invalid what_next field");
      }
      cap->in_haskell = rtsFalse;

      // The TSO might have moved, eg. if it re-entered the RTS and a GC
      // happened.  So find the new location:
      t = cap->r.rCurrentTSO;

      // We have run some Haskell code: there might be blackhole-blocked
      // threads to wake up now.
      // Lock-free test here should be ok, we're just setting a flag.
      if ( blackhole_queue != END_TSO_QUEUE ) {
	  blackholes_need_checking = rtsTrue;
      }

      // And save the current errno in this thread.
      // XXX: possibly bogus for SMP because this thread might already
      // be running again, see code below.
      t->saved_errno = errno;
      // If ret is ThreadBlocked, and this Task is bound to the TSO that
      // blocked, we are in limbo - the TSO is now owned by whatever it
      // is blocked on, and may in fact already have been woken up,
      // perhaps even on a different Capability.  It may be the case
      // that task->cap != cap.  We better yield this Capability
      // immediately and return to normality.
      if (ret == ThreadBlocked) {
	  IF_DEBUG(scheduler,
		   debugBelch("--<< thread %d (%s) stopped: blocked\n",
			      t->id, whatNext_strs[t->what_next]));
	  continue;
      }

      ASSERT_CAPABILITY_INVARIANTS(cap,task);
      // ----------------------------------------------------------------------

      // Costs for the scheduler are assigned to CCS_SYSTEM
#if defined(PROFILING)
      stopHeapProfTimer();
      CCCS = CCS_SYSTEM;
#endif

#if defined(THREADED_RTS)
      IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
#elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
      IF_DEBUG(scheduler,debugBelch("sched: "););
#endif
      schedulePostRunThread();

      ready_to_gc = rtsFalse;

      switch (ret) {
      case HeapOverflow:
	  ready_to_gc = scheduleHandleHeapOverflow(cap,t);
	  break;

      case StackOverflow:
	  scheduleHandleStackOverflow(cap,task,t);
	  break;

      case ThreadYielding:
	  if (scheduleHandleYield(cap, t, prev_what_next)) {
              // shortcut for switching between compiler/interpreter:
	      goto run_thread;
	  }
	  break;

      case ThreadBlocked:
	  scheduleHandleThreadBlocked(t);
	  break;

      case ThreadFinished:
	  if (scheduleHandleThreadFinished(cap, task, t)) return cap;
	  ASSERT_CAPABILITY_INVARIANTS(cap,task);
	  break;

      default:
	  barf("schedule: invalid thread return code %d", (int)ret);
      }

      if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
      if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
    } /* end of while() */
    IF_PAR_DEBUG(verbose,
		 debugBelch("== Leaving schedule() after having received Finish\n"));
}
/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
#if defined(GRAN)
    /* set up first event to get things going */
    /* ToDo: assign costs for system setup and init MainTSO ! */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
	      ContinueThread,
	      CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

    IF_DEBUG(gran,
	     debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n",
			CurrentTSO);
	     G_TSO(CurrentTSO, 5));

    if (RtsFlags.GranFlags.Light) {
	/* Save current time; GranSim Light only */
	CurrentTSO->gran.clock = CurrentTime[CurrentProc];
    }
#endif
}
/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

static void
schedulePushWork(Capability *cap USED_WHEN_SMP,
		 Task *task      USED_WHEN_SMP)
{
#ifdef SMP
    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // Check whether we have more threads on our run queue that we
    // could hand to another Capability.
    if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {
	return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
	cap0 = &capabilities[i];
	if (cap != cap0 && tryGrabCapability(cap0,task)) {
	    if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
		// it already has some work, we just grabbed it at
		// the wrong moment.  Or maybe it's deadlocked!
		releaseCapability(cap0);
	    } else {
		free_caps[n_free_caps++] = cap0;
	    }
	}
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
	StgTSO *prev, *t, *next;
	IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));

	prev = cap->run_queue_hd;
	t = prev->link;
	prev->link = END_TSO_QUEUE;
	i = 0;
	for (; t != END_TSO_QUEUE; t = next) {
	    next = t->link;
	    t->link = END_TSO_QUEUE;
	    if (t->what_next == ThreadRelocated
		|| t->bound == task) { // don't move my bound thread
		prev->link = t;
		prev = t;
	    } else if (i == n_free_caps) {
		i = 0;
		// keep one for us
		prev->link = t;
		prev = t;
	    } else {
		appendToRunQueue(free_caps[i],t);
		if (t->bound) { t->bound->cap = free_caps[i]; }
		i++;
	    }
	}
	cap->run_queue_tl = prev;

	// release the capabilities
	for (i = 0; i < n_free_caps; i++) {
	    task->cap = free_caps[i];
	    releaseCapability(free_caps[i]);
	}
    }
    task->cap = cap; // reset to point to our Capability.
#endif
}
/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

static void
scheduleStartSignalHandlers(Capability *cap)
{
#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
    if (signals_pending()) { // safe outside the lock
	startSignalHandlers(cap);
    }
#endif
}
/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
	awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
    }
#endif
}
/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlackHoles (Capability *cap)
{
    if ( blackholes_need_checking ) // check without the lock first
    {
	ACQUIRE_LOCK(&sched_mutex);
	if ( blackholes_need_checking ) {
	    checkBlackHoles(cap);
	    blackholes_need_checking = rtsFalse;
	}
	RELEASE_LOCK(&sched_mutex);
    }
}
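/* Note (added commentary, not in the original file): the function above
 * is the classic double-checked pattern.  The first, unlocked read of
 * blackholes_need_checking can race with writers, but that is benign:
 * per the comment on the flag's declaration, it does not matter if we
 * miss an update, because the flag is re-tested on the next pass round
 * the scheduler.  Re-testing under sched_mutex before the potentially
 * expensive checkBlackHoles() walk ensures the work is done only once
 * per setting of the flag.
 */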
/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{
#if defined(PARALLEL_HASKELL)
    // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
    return;
#endif

    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
	/*
	 * In the threaded RTS, we only check for deadlock if there
	 * has been no activity in a complete timeslice.  This means
	 * we won't eagerly start a full GC just because we don't have
	 * any threads to run currently.
	 */
	if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

	IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));

	// Garbage collection can release some new threads due to
	// either (a) finalizers or (b) threads resurrected because
	// they are unreachable and will therefore be sent an
	// exception.  Any threads thus released will be immediately
	// runnable.
	scheduleDoGC( cap, task, rtsTrue/*force major GC*/ );
	recent_activity = ACTIVITY_DONE_GC;

	if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
	/* If we have user-installed signal handlers, then wait
	 * for signals to arrive rather than bombing out with a
	 * deadlock.
	 */
	if ( anyUserHandlers() ) {
	    IF_DEBUG(scheduler,
		     sched_belch("still deadlocked, waiting for signals..."));

	    awaitUserSignals();

	    if (signals_pending()) {
		startSignalHandlers(cap);
	    }

	    // either we have threads to run, or we were interrupted:
	    ASSERT(!emptyRunQueue(cap) || interrupted);

	    return;
	}
#endif

#if !defined(THREADED_RTS)
	/* Probably a real deadlock.  Send the current main thread the
	 * Deadlock exception.
	 */
	if (task->tso) {
	    switch (task->tso->why_blocked) {
	    case BlockedOnSTM:
	    case BlockedOnBlackHole:
	    case BlockedOnException:
	    case BlockedOnMVar:
		raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
		return;
	    default:
		barf("deadlock: main thread blocked in a strange way");
	    }
	}
#endif
    }
}
/* ----------------------------------------------------------------------------
 * Process an event (GRAN only)
 * ------------------------------------------------------------------------- */

#if defined(GRAN)
static StgTSO *
scheduleProcessEvent(rtsEvent *event)
{
    StgTSO *t;

    if (RtsFlags.GranFlags.Light)
      GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
      CurrentTime[CurrentProc] = event->time;

    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
      handleIdlePEs();

    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
      /* Should just be continuing execution */
    case ContinueThread:
      IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
      /* ToDo: check assertion
      ASSERT(run_queue_hd != (StgTSO*)NULL &&
	     run_queue_hd != END_TSO_QUEUE);
      */
      /* Ignore ContinueThreads for fetching threads (if synchr comm) */
      if (!RtsFlags.GranFlags.DoAsyncFetch &&
	  procStatus[CurrentProc]==Fetching) {
	debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
		   CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }
      /* Ignore ContinueThreads for completed threads */
      if (CurrentTSO->what_next == ThreadComplete) {
	debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
		   CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }
      /* Ignore ContinueThreads for threads that are being migrated */
      if (PROCS(CurrentTSO)==Nowhere) {
	debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
		   CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }
      /* The thread should be at the beginning of the run queue */
      if (CurrentTSO!=run_queue_hds[CurrentProc]) {
	debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
		   CurrentTSO->id, CurrentTSO, CurrentProc);
	break; // run the thread anyway
      }
      /*
      new_event(proc, proc, CurrentTime[proc],
		FindWork,
		(StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
      goto next_thread;
      */ /* Catches superfluous CONTINUEs -- should be unnecessary */
      break; // now actually run the thread; DaH Qu'vam yImuHbej

    case FetchNode:
      do_the_fetchnode(event);
      goto next_thread; /* handle next event in event queue  */

    case GlobalBlock:
      do_the_globalblock(event);
      goto next_thread; /* handle next event in event queue  */

    case FetchReply:
      do_the_fetchreply(event);
      goto next_thread; /* handle next event in event queue  */

    case UnblockThread:   /* Move from the blocked queue to the tail of */
      do_the_unblock(event);
      goto next_thread; /* handle next event in event queue  */

    case ResumeThread:  /* Move from the blocked queue to the tail of */
      /* the runnable queue ( i.e. Qu' SImqa'lu') */
      event->tso->gran.blocktime +=
	CurrentTime[CurrentProc] - event->tso->gran.blockedat;
      do_the_startthread(event);
      goto next_thread; /* handle next event in event queue  */

    case StartThread:
      do_the_startthread(event);
      goto next_thread; /* handle next event in event queue  */

    case MoveThread:
      do_the_movethread(event);
      goto next_thread; /* handle next event in event queue  */

    case MoveSpark:
      do_the_movespark(event);
      goto next_thread; /* handle next event in event queue  */

    case FindWork:
      do_the_findwork(event);
      goto next_thread; /* handle next event in event queue  */

    default:
      barf("Illegal event type %u\n", event->evttype);
    } /* switch */

    /* This point was scheduler_loop in the old RTS */

    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
			      TimeOfNextEvent));

    if (RtsFlags.GranFlags.Light)
      GranSimLight_leave_system(event, &ActiveTSO);

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    IF_DEBUG(gran,
	     debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */
    t = CurrentTSO;
    /* Take a thread from the run queue. */
    POP_RUN_QUEUE(t); // take_off_run_queue(t);

    IF_DEBUG(gran,
	     debugBelch("GRAN: About to run current thread, which is\n");
	     G_TSO(t,5));

    context_switch = 0; // turned on via GranYield, checking events and time slice

    IF_DEBUG(gran,
	     DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;

    return t;
}
#endif // GRAN
/* ----------------------------------------------------------------------------
 * Send pending messages (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static StgTSO *
scheduleSendPendingMessages(void)
{
# if defined(PAR) // global Mem.Mgmt., omit for now
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }
# endif

    if (RtsFlags.ParFlags.BufferTime) {
	// if we use message buffering, we must send away all message
	// packets which have become too old...
	sendOldBuffers();
    }
}
#endif
/* ----------------------------------------------------------------------------
 * Activate spark threads (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static rtsBool
scheduleActivateSpark(void)
{
#if defined(SPARKS)
  ASSERT(emptyRunQueue());
/* We get here if the run queue is empty and want some work.
   We try to turn a spark into a thread, and add it to the run queue,
   from where it will be picked up in the next iteration of the scheduler
   loop.
*/

      /* :-[  no local threads => look out for local sparks */
      /* the spark pool for the current PE */
      pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
      if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
	  pool->hd < pool->tl) {
	/*
	 * ToDo: add GC code check that we really have enough heap afterwards!!
	 *
	 * If we're here (no runnable threads) and we have pending
	 * sparks, we must have a space problem.  Get enough space
	 * to turn one of those pending sparks into a
	 * thread...
	 */

	spark = findSpark(rtsFalse);               /* get a spark */
	if (spark != (rtsSpark) NULL) {
	  tso = createThreadFromSpark(spark);      /* turn the spark into a thread */
	  IF_PAR_DEBUG(fish, // schedule,
		       debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
				  tso->id, tso, advisory_thread_count));

	  if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
	    IF_PAR_DEBUG(fish, // schedule,
			 debugBelch("==^^ failed to create thread from spark @ %lx\n",
				    spark));
	    return rtsFalse;  /* failed to generate a thread */
	  }                   /* otherwise fall through & pick-up new tso */
	} else {
	  IF_PAR_DEBUG(fish, // schedule,
		       debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
				  spark_queue_len(pool)));
	  return rtsFalse;    /* failed to generate a thread */
	}
	return rtsTrue;       /* success in generating a thread */
      } else { /* no more threads permitted or pool empty */
	return rtsFalse;      /* failed to generateThread */
      }
#else
  tso = NULL; // avoid compiler warning only
  return rtsFalse;            /* dummy in non-PAR setup */
#endif // SPARKS
}
#endif // PARALLEL_HASKELL
/* ----------------------------------------------------------------------------
 * Get work from a remote node (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static rtsBool
scheduleGetRemoteWork(rtsBool *receivedFinish)
{
  ASSERT(emptyRunQueue());

  if (RtsFlags.ParFlags.BufferTime) {
	IF_PAR_DEBUG(verbose,
	        debugBelch("...send all pending data,"));
	{
	  nat i;
	  for (i=1; i<=nPEs; i++)
	    sendImmediately(i); // send all messages away immediately
	}
  }

# ifndef SPARKS
	//++EDEN++ idle() , i.e. send all buffers, wait for work
	// suppress fishing in EDEN... just look for incoming messages
	// (blocking receive)
  IF_PAR_DEBUG(verbose,
	       debugBelch("...wait for incoming messages...\n"));
  *receivedFinish = processMessages(); // blocking receive...

	// and reenter scheduling loop after having received something
	// (return rtsFalse below)

# else /* activate SPARKS machinery */
/* We get here, if we have no work, tried to activate a local spark, but still
   have no work. We try to get a remote spark, by sending a FISH message.
   Thread migration should be added here, and triggered when a sequence of
   fishes returns without work. */
	delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);

     /* =8-[  no local sparks => look for work on other PEs */
	/*
	 * We really have absolutely no work.  Send out a fish
	 * (there may be some out there already), and wait for
	 * something to arrive.  We clearly can't run any threads
	 * until a SCHEDULE or RESUME arrives, and so that's what
	 * we're hoping to see.  (Of course, we still have to
	 * respond to other types of messages.)
	 */
	rtsTime now = msTime() /*CURRENT_TIME*/;
	IF_PAR_DEBUG(verbose,
		     debugBelch("--  now=%ld\n", now));
	IF_PAR_DEBUG(fish, // verbose,
		     if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
			 (last_fish_arrived_at!=0 &&
			  last_fish_arrived_at+delay > now)) {
		       debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
				  now, last_fish_arrived_at+delay,
				  last_fish_arrived_at,
				  delay);
		     });

	if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
	    advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
	  if (last_fish_arrived_at==0 ||
	      (last_fish_arrived_at+delay <= now)) {           // send FISH now!
	    /* outstandingFishes is set in sendFish, processFish;
	       avoid flooding system with fishes via delay */
	    next_fish_to_send_at = 0;
	  } else {
	    /* ToDo: this should be done in the main scheduling loop to avoid the
	             busy wait here; not so bad if fish delay is very small  */
	    int iq = 0; // DEBUGGING -- HWL
	    next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
	    /* send a fish when ready, but process messages that arrive in the meantime */
	    do {
	      if (PacketsWaiting()) {
		iq++; // DEBUGGING
		*receivedFinish = processMessages();
	      }
	      now = msTime();
	    } while (!*receivedFinish || now<next_fish_to_send_at);
	    // JB: This means the fish could become obsolete, if we receive
	    // work. Better check for work again?
	    // last line: while (!receivedFinish || !haveWork || now<...)
	    // next line: if (receivedFinish || haveWork )

	    if (*receivedFinish) // no need to send a FISH if we are finishing anyway
	      return rtsFalse;   // NB: this will leave scheduler loop
	                         // immediately after return!

	    IF_PAR_DEBUG(fish, // verbose,
			 debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
	  }

	  // JB: IMHO, this should all be hidden inside sendFish(...)
	  pe = choosePE();
	  sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
		   NEW_FISH_HUNGER);

	  // Global statistics: count no. of fishes
	  if (RtsFlags.ParFlags.ParStats.Global &&
	      RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	    globalParStats.tot_fish_mess++;
	  }

	  /* delayed fishes must have been sent by now! */
	  next_fish_to_send_at = 0;
	}

	*receivedFinish = processMessages();
# endif /* SPARKS */

 return rtsFalse;
 /* NB: this function always returns rtsFalse, meaning the scheduler
    loop continues with the next iteration;
    rationale:
      return code means success in finding work; we enter this function
      if there is no local work, thus have to send a fish which takes
      time until it arrives with work; in the meantime we should process
      messages in the main loop;
 */
}
#endif // PARALLEL_HASKELL
/* ----------------------------------------------------------------------------
 * PAR/GRAN: Report stats & debugging info(?)
 * ------------------------------------------------------------------------- */

#if defined(PAR) || defined(GRAN)
static void
scheduleGranParReport(void)
{
  ASSERT(run_queue_hd != END_TSO_QUEUE);

  /* Take a thread from the run queue, if we have work */
  POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);

    /* If this TSO has got its outport closed in the meantime,
     *   it mustn't be run. Instead, we have to clean it up as if it was finished.
     * It has to be marked as TH_DEAD for this purpose.
     * If it is TH_TERM instead, it is supposed to have finished in the normal way.

       JB: TODO: investigate whether state change field could be nuked
	    entirely and replaced by the normal tso state (whatnext
	    field). All we want to do is to kill tsos from outside.
     */

    /* ToDo: write something to the log-file
    if (RTSflags.ParFlags.granSimStats && !sameThread)
        DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
    */

    /* the spark pool for the current PE */
    pool = &(cap.r.rSparks); //  cap = (old) MainCap

    IF_DEBUG(scheduler,
	     debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
			run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

    if (RtsFlags.ParFlags.ParStats.Full &&
	(t->par.sparkname != (StgInt)0) && // only log spark generated threads
	(emitSchedule || // forced emit
	 (t && LastTSO && t->id != LastTSO->id))) {
      /*
	 we are running a different TSO, so write a schedule event to log file
	 NB: If we use fair scheduling we also have to write a deschedule
	     event for LastTSO; with unfair scheduling we know that the
	     previous tso has blocked whenever we switch to another tso, so
	     we don't need it in GUM for now
      */
      IF_PAR_DEBUG(fish, // schedule,
		   debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));

      DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
		       GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
      emitSchedule = rtsFalse;
    }
}
#endif
/* ----------------------------------------------------------------------------
 * After running a thread...
 * ------------------------------------------------------------------------- */

static void
schedulePostRunThread(void)
{
#if defined(PAR)
    /* HACK 675: if the last thread didn't yield, make sure to print a
       SCHEDULE event to the log file when StgRunning the next thread, even
       if it is the same one as before */
    LastTSO = t;
    TimeOfLastYield = CURRENT_TIME;
#endif

  /* some statistics gathering in the parallel case */

#if defined(GRAN) || defined(PAR) || defined(EDEN)
  switch (ret) {
    case HeapOverflow:
# if defined(GRAN)
      IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
      globalGranStats.tot_heapover++;
# elif defined(PAR)
      globalParStats.tot_heapover++;
# endif
      break;

    case StackOverflow:
# if defined(GRAN)
      IF_DEBUG(gran,
	       DumpGranEvent(GR_DESCHEDULE, t));
      globalGranStats.tot_stackover++;
# elif defined(PAR)
      // IF_DEBUG(par,
      // DumpGranEvent(GR_DESCHEDULE, t);
      globalParStats.tot_stackover++;
# endif
      break;

    case ThreadYielding:
# if defined(GRAN)
      IF_DEBUG(gran,
	       DumpGranEvent(GR_DESCHEDULE, t));
      globalGranStats.tot_yields++;
# elif defined(PAR)
      // IF_DEBUG(par,
      // DumpGranEvent(GR_DESCHEDULE, t);
      globalParStats.tot_yields++;
# endif
      break;

    case ThreadBlocked:
# if defined(GRAN)
      IF_DEBUG(scheduler,
	       debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
			  t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
			  (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
	       if (t->block_info.closure!=(StgClosure*)NULL)
		 print_bq(t->block_info.closure);
	       debugBelch("\n"));

      // ??? needed; should emit block before
      IF_DEBUG(gran,
	       DumpGranEvent(GR_DESCHEDULE, t));
      prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
      ASSERT(procStatus[CurrentProc]==Busy ||
	     ((procStatus[CurrentProc]==Fetching) &&
	      (t->block_info.closure!=(StgClosure*)NULL)));
      if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
	  !(!RtsFlags.GranFlags.DoAsyncFetch &&
	    procStatus[CurrentProc]==Fetching))
	procStatus[CurrentProc] = Idle;
# elif defined(PAR)
      //++PAR++  blockThread() writes the event (change?)
# endif
      break;

    case ThreadFinished:
      break;

    default:
      barf("parGlobalStats: unknown return code");
  }
#endif
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
{
    // did the task ask for a large block?
    if (cap->r.rHpAlloc > BLOCK_SIZE) {
	// if so, get one and push it on the front of the nursery.
	bdescr *bd;
	lnat blocks;

	blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;

	IF_DEBUG(scheduler,
		 debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
			    (long)t->id, whatNext_strs[t->what_next], blocks));

	// don't do this if the nursery is (nearly) full, we'll GC first.
	if (cap->r.rCurrentNursery->link != NULL ||
	    cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
	                                       // if the nursery has only one block.

	    bd = allocGroup( blocks );
	    cap->r.rNursery->n_blocks += blocks;

	    // link the new group into the list
	    bd->link = cap->r.rCurrentNursery;
	    bd->u.back = cap->r.rCurrentNursery->u.back;
	    if (cap->r.rCurrentNursery->u.back != NULL) {
		cap->r.rCurrentNursery->u.back->link = bd;
	    } else {
		ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
		       g0s0 == cap->r.rNursery);
		cap->r.rNursery->blocks = bd;
	    }
	    cap->r.rCurrentNursery->u.back = bd;

	    // initialise it as a nursery block.  We initialise the
	    // step, gen_no, and flags field of *every* sub-block in
	    // this large block, because this is easier than making
	    // sure that we always find the block head of a large
	    // block whenever we call Bdescr() (eg. evacuate() and
	    // isAlive() in the GC would both have to do this, at
	    // least).
	    {
		bdescr *x;
		for (x = bd; x < bd + blocks; x++) {
		    x->step = cap->r.rNursery;
		    x->gen_no = 0;
		    x->flags = 0;
		}
	    }

	    // This assert can be a killer if the app is doing lots
	    // of large block allocations.
	    IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));

	    // now update the nursery to point to the new block
	    cap->r.rCurrentNursery = bd;

	    // we might be unlucky and have another thread get on the
	    // run queue before us and steal the large block, but in that
	    // case the thread will just end up requesting another large
	    // block.
	    pushOnRunQueue(cap,t);
	    return rtsFalse;  /* not actually GC'ing */
	}
    }

    IF_DEBUG(scheduler,
	     debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
			(long)t->id, whatNext_strs[t->what_next]));
#if defined(GRAN)
    ASSERT(!is_on_queue(t,CurrentProc));
#elif defined(PARALLEL_HASKELL)
    /* Currently we emit a DESCHEDULE event before GC in GUM.
       ToDo: either add separate event to distinguish SYSTEM time from rest
       or just nuke this DESCHEDULE (and the following SCHEDULE) */
    if (0 && RtsFlags.ParFlags.ParStats.Full) {
	DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
			 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
	emitSchedule = rtsTrue;
    }
#endif

    pushOnRunQueue(cap,t);
    return rtsTrue;
    /* actual GC is done at the end of the while loop in schedule() */
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadStackOverflow
 * -------------------------------------------------------------------------- */

static void
scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
{
    IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n",
				  (long)t->id, whatNext_strs[t->what_next]));
    /* just adjust the stack for this thread, then pop it back
     * on the run queue.
     */
    {
	/* enlarge the stack */
	StgTSO *new_t = threadStackOverflow(cap, t);

	/* The TSO attached to this Task may have moved, so update the
	 * pointer to it.
	 */
	if (task->tso == t) {
	    task->tso = new_t;
	}
	pushOnRunQueue(cap,new_t);
    }
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadYielding
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
{
    // Reset the context switch flag.  We don't do this just before
    // running the thread, because that would mean we would lose ticks
    // during GC, which can lead to unfair scheduling (a thread hogs
    // the CPU because the tick always arrives during GC).  This way
    // penalises threads that do a lot of allocation, but that seems
    // better than the alternative.
    context_switch = 0;

    /* put the thread back on the run queue.  Then, if we're ready to
     * GC, check whether this is the last task to stop.  If so, wake
     * up the GC thread.  getThread will block during a GC until the
     * GC is finished.
     */
    IF_DEBUG(scheduler,
	     if (t->what_next != prev_what_next) {
		 debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n",
			    (long)t->id, whatNext_strs[t->what_next]);
	     } else {
		 debugBelch("--<< thread %ld (%s) stopped, yielding\n",
			    (long)t->id, whatNext_strs[t->what_next]);
	     }
	     );

    IF_DEBUG(sanity,
	     //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
	     checkTSO(t));
    ASSERT(t->link == END_TSO_QUEUE);

    // Shortcut if we're just switching evaluators: don't bother
    // doing stack squeezing (which can be expensive), just run the
    // thread.
    if (t->what_next != prev_what_next) {
	return rtsTrue;
    }

#if defined(GRAN)
    ASSERT(!is_on_queue(t,CurrentProc));

    IF_DEBUG(sanity,
	     //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
	     checkThreadQsSanity(rtsTrue));
#endif

    addToRunQueue(cap,t);

#if defined(GRAN)
    /* add a ContinueThread event to actually process the thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
	      ContinueThread,
	      t, (StgClosure*)NULL, (rtsSpark*)NULL);
    IF_DEBUG(gran,
	     debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
	     G_EVENTQ(0);
	     G_CURR_THREADQ(0));
#endif
    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadBlocked
 * -------------------------------------------------------------------------- */

static void
scheduleHandleThreadBlocked( StgTSO *t
#if !defined(GRAN) && !defined(DEBUG)
    STG_UNUSED
#endif
    )
{
#if defined(GRAN)
    IF_DEBUG(scheduler,
	     debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
			t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
	     if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));

    // ??? needed; should emit block before
    IF_DEBUG(gran,
	     DumpGranEvent(GR_DESCHEDULE, t));
    prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
    ASSERT(procStatus[CurrentProc]==Busy ||
	   ((procStatus[CurrentProc]==Fetching) &&
	    (t->block_info.closure!=(StgClosure*)NULL)));
    if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
	!(!RtsFlags.GranFlags.DoAsyncFetch &&
	  procStatus[CurrentProc]==Fetching))
	procStatus[CurrentProc] = Idle;
#elif defined(PAR)
    IF_DEBUG(scheduler,
	     debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
			t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
    IF_PAR_DEBUG(bq,
		 if (t->block_info.closure!=(StgClosure*)NULL)
		     print_bq(t->block_info.closure));

    /* Send a fetch (if BlockedOnGA) and dump event to log file */
    blockThread(t);

    /* whatever we schedule next, we must log that schedule */
    emitSchedule = rtsTrue;

#else /* !GRAN */

    // We don't need to do anything.  The thread is blocked, and it
    // has tidied up its stack and placed itself on whatever queue
    // it needs to be on.

    ASSERT(t->why_blocked != NotBlocked);
	     // This might not be true under SMP: we don't have
	     // exclusive access to this TSO, so someone might have
	     // woken it up by now.  This actually happens: try
	     // conc023 +RTS -N2.

    IF_DEBUG(scheduler,
	     debugBelch("--<< thread %d (%s) stopped: ",
			t->id, whatNext_strs[t->what_next]);
	     printThreadBlockage(t);
	     debugBelch("\n"));

    /* Only for dumping event to log file
       ToDo: do I need this in GranSim, too?
       blockThread(t);
    */
#endif
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadFinished
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
{
    /* Need to check whether this was a main thread, and if so,
     * return with the return value.
     *
     * We also end up here if the thread kills itself with an
     * uncaught exception, see Exception.cmm.
     */
    IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n",
				  t->id, whatNext_strs[t->what_next]));

#if defined(GRAN)
    endThread(t, CurrentProc); // clean-up the thread
#elif defined(PARALLEL_HASKELL)
    /* For now all are advisory -- HWL */
    //if(t->priority==AdvisoryPriority) ??
    advisory_thread_count--; // JB: Caution with this counter, buggy!

# if defined(DIST)
    if(t->dist.priority==RevalPriority)
	FinishReval(t);
# endif

# if defined(EDENOLD)
    // the thread could still have an outport... (BUG)
    if (t->eden.outport != -1) {
	// delete the outport for the tso which has finished...
	IF_PAR_DEBUG(eden_ports,
		     debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
				t->eden.outport, t->id));
	deleteOPT(t);
    }
    // thread still in the process (HEAVY BUG! since outport has just been closed...)
    if (t->eden.epid != -1) {
	IF_PAR_DEBUG(eden_ports,
		     debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
				t->id, t->eden.epid));
	removeTSOfromProcess(t);
    }
# endif

# if defined(PAR)
    if (RtsFlags.ParFlags.ParStats.Full &&
	!RtsFlags.ParFlags.ParStats.Suppressed)
	DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);

    //  t->par only contains statistics: left out for now...
    IF_PAR_DEBUG(fish,
		 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
			    t->id,t,t->par.sparkname));
# endif
#endif // PARALLEL_HASKELL

    //
    // Check whether the thread that just completed was a bound
    // thread, and if so return with the result.
    //
    // There is an assumption here that all thread completion goes
    // through this point; we need to make sure that if a thread
    // ends up in the ThreadKilled state, that it stays on the run
    // queue so it can be dealt with here.
    //

    if (t->bound) {

	if (t->bound != task) {
#if !defined(THREADED_RTS)
	    // Must be a bound thread that is not the topmost one.  Leave
	    // it on the run queue until the stack has unwound to the
	    // point where we can deal with this.  Leaving it on the run
	    // queue also ensures that the garbage collector knows about
	    // this thread and its return value (it gets dropped from the
	    // all_threads list so there's no other way to find it).
	    appendToRunQueue(cap,t);
	    return rtsFalse;
#else
	    // this cannot happen in the threaded RTS, because a
	    // bound thread can only be run by the appropriate Task.
	    barf("finished bound thread that isn't mine");
#endif
	}

	ASSERT(task->tso == t);

	if (t->what_next == ThreadComplete) {
	    if (task->ret) {
		// NOTE: return val is tso->sp[1] (see StgStartup.hc)
		*(task->ret) = (StgClosure *)task->tso->sp[1];
	    }
	    task->stat = Success;
	} else {
	    if (task->ret) {
		*(task->ret) = NULL;
	    }
	    if (interrupted) {
		task->stat = Interrupted;
	    } else {
		task->stat = Killed;
	    }
	}
#ifdef DEBUG
	removeThreadLabel((StgWord)task->tso->id);
#endif
	return rtsTrue; // tells schedule() to return
    }

    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Perform a heap census, if PROFILING
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
{
#if defined(PROFILING)
    // When we have +RTS -i0 and we're heap profiling, do a census at
    // every GC.  This lets us get repeatable runs for debugging.
    if (performHeapProfile ||
	(RtsFlags.ProfFlags.profileInterval==0 &&
	 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
	GarbageCollect(GetRoots, rtsTrue);
	heapCensus();
	performHeapProfile = rtsFalse;
	return rtsTrue;  // true <=> we already GC'd
    }
#endif
    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Perform a garbage collection if necessary
 * -------------------------------------------------------------------------- */

static void
scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
{
    StgTSO *t;
#ifdef SMP
    static volatile StgWord waiting_for_gc;
    rtsBool was_waiting;
    nat i;
#endif

#ifdef SMP
    // In order to GC, there must be no threads running Haskell code.
    // Therefore, the GC thread needs to hold *all* the capabilities,
    // and release them after the GC has completed.
    //
    // This seems to be the simplest way: previous attempts involved
    // making all the threads with capabilities give up their
    // capabilities and sleep except for the *last* one, which
    // actually did the GC.  But it's quite hard to arrange for all
    // the other tasks to sleep and stay asleep.
    //

    was_waiting = cas(&waiting_for_gc, 0, 1);
    if (was_waiting) {
	do {
	    IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
	    yieldCapability(&cap,task);
	} while (waiting_for_gc);
	return;
    }

    for (i=0; i < n_capabilities; i++) {
	IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities));
	if (cap != &capabilities[i]) {
	    Capability *pcap = &capabilities[i];
	    // we better hope this task doesn't get migrated to
	    // another Capability while we're waiting for this one.
	    // It won't, because load balancing happens while we have
	    // all the Capabilities, but even so it's a slightly
	    // unsavoury invariant.
	    task->cap = pcap;
	    waitForReturnCapability(&pcap, task);
	    if (pcap != &capabilities[i]) {
		barf("scheduleDoGC: got the wrong capability");
	    }
	}
    }

    waiting_for_gc = rtsFalse;
#endif

    /* Kick any transactions which are invalid back to their
     * atomically frames.  When next scheduled they will try to
     * commit, this commit will fail and they will retry.
     */
    {
	StgTSO *next;

	for (t = all_threads; t != END_TSO_QUEUE; t = next) {
	    if (t->what_next == ThreadRelocated) {
		next = t->link;
	    } else {
		next = t->global_link;
		if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
		    if (!stmValidateNestOfTransactions (t -> trec)) {
			IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));

			// strip the stack back to the
			// ATOMICALLY_FRAME, aborting the (nested)
			// transaction, and saving the stack of any
			// partially-evaluated thunks on the heap.
			raiseAsync_(cap, t, NULL, rtsTrue);

			ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
		    }
		}
	    }
	}
    }

    // so this happens periodically:
    scheduleCheckBlackHoles(cap);

    IF_DEBUG(scheduler, printAllThreads());

    /* everybody back, start the GC.
     * Could do it in this thread, or signal a condition var
     * to do it in another thread.  Either way, we need to
     * broadcast on gc_pending_cond afterward.
     */
#if defined(THREADED_RTS)
    IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
    GarbageCollect(GetRoots, force_major);

#if defined(SMP)
    // release our stash of capabilities.
    for (i = 0; i < n_capabilities; i++) {
	if (cap != &capabilities[i]) {
	    task->cap = &capabilities[i];
	    releaseCapability(&capabilities[i]);
	}
    }
    task->cap = cap;
#endif

#if defined(GRAN)
    /* add a ContinueThread event to continue execution of current thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
	      ContinueThread,
	      t, (StgClosure*)NULL, (rtsSpark*)NULL);
    IF_DEBUG(gran,
	     debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
	     G_EVENTQ(0);
	     G_CURR_THREADQ(0));
#endif /* GRAN */
}
/* ---------------------------------------------------------------------------
 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
 * used by Control.Concurrent for error checking.
 * ------------------------------------------------------------------------- */

StgBool
rtsSupportsBoundThreads(void)
{
#if defined(THREADED_RTS)
  return rtsTrue;
#else
  return rtsFalse;
#endif
}

/* ---------------------------------------------------------------------------
 * isThreadBound(tso): check whether tso is bound to an OS thread.
 * ------------------------------------------------------------------------- */

StgBool
isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
{
#if defined(THREADED_RTS)
  return (tso->bound != NULL);
#endif
  return rtsFalse;
}
/* ---------------------------------------------------------------------------
 * Singleton fork(). Do not copy any running threads.
 * ------------------------------------------------------------------------- */

#if !defined(mingw32_HOST_OS) && !defined(SMP)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
deleteThreadImmediately(Capability *cap, StgTSO *tso);
#endif
StgInt
forkProcess(HsStablePtr *entry
#ifndef FORKPROCESS_PRIMOP_SUPPORTED
	    STG_UNUSED
#endif
           )
{
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
    Task *task;
    pid_t pid;
    StgTSO* t,*next;
    Capability *cap;

    IF_DEBUG(scheduler,sched_belch("forking!"));

    // ToDo: for SMP, we should probably acquire *all* the capabilities
    cap = rts_lock();

    pid = fork();

    if (pid) { // parent

	// just return the pid
	rts_unlock(cap);
	return pid;

    } else { // child

	// delete all threads
	cap->run_queue_hd = END_TSO_QUEUE;
	cap->run_queue_tl = END_TSO_QUEUE;

	for (t = all_threads; t != END_TSO_QUEUE; t = next) {
	    next = t->link;

	    // don't allow threads to catch the ThreadKilled exception
	    deleteThreadImmediately(cap,t);
	}

	// wipe the task list
	ACQUIRE_LOCK(&sched_mutex);
	for (task = all_tasks; task != NULL; task=task->all_link) {
	    if (task != cap->running_task) discardTask(task);
	}
	RELEASE_LOCK(&sched_mutex);

#if defined(THREADED_RTS)
	// wipe our spare workers list.
	cap->spare_workers = NULL;
#endif

	cap = rts_evalStableIO(cap, entry, NULL);  // run the action
	rts_checkSchedStatus("forkProcess",cap);

	rts_unlock(cap);
	hs_exit();                      // clean up and exit
	stg_exit(EXIT_SUCCESS);
    }
#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
    barf("forkProcess#: primop not supported on this platform, sorry!\n");
    return -1;
#endif
}
2082 /* ---------------------------------------------------------------------------
2083 * Delete the threads on the run queue of the current capability.
2084 * ------------------------------------------------------------------------- */
2087 deleteRunQueue (Capability *cap)
2090 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2091 ASSERT(t->what_next != ThreadRelocated);
2093 deleteThread(cap, t);
2097 /* startThread and insertThread are now in GranSim.c -- HWL */
2100 /* -----------------------------------------------------------------------------
2101 Managing the suspended_ccalling_tasks list.
2102 Locks required: sched_mutex
2103 -------------------------------------------------------------------------- */
2106 suspendTask (Capability *cap, Task *task)
2108 ASSERT(task->next == NULL && task->prev == NULL);
2109 task->next = cap->suspended_ccalling_tasks;
2111 if (cap->suspended_ccalling_tasks) {
2112 cap->suspended_ccalling_tasks->prev = task;
2114 cap->suspended_ccalling_tasks = task;
2118 recoverSuspendedTask (Capability *cap, Task *task)
2121 task->prev->next = task->next;
2123 ASSERT(cap->suspended_ccalling_tasks == task);
2124 cap->suspended_ccalling_tasks = task->next;
2127 task->next->prev = task->prev;
2129 task->next = task->prev = NULL;
2132 /* ---------------------------------------------------------------------------
2133 * Suspending & resuming Haskell threads.
2135 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2136 * its capability before calling the C function. This allows another
2137 * task to pick up the capability and carry on running Haskell
2138 * threads. It also means that if the C call blocks, it won't lock
2141 * The Haskell thread making the C call is put to sleep for the
2142 * duration of the call, on the suspended_ccalling_tasks queue. We
2143 * give out a token to the task, which it can use to resume the thread
2144 * on return from the C function.
2145 * ------------------------------------------------------------------------- */
2148 suspendThread (StgRegTable *reg)
2151 int saved_errno = errno;
2155 /* assume that *reg is a pointer to the StgRegTable part of a Capability. */
2157 cap = regTableToCapability(reg);
2159 task = cap->running_task;
2160 tso = cap->r.rCurrentTSO;
2163 sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2165 // XXX this might not be necessary --SDM
2166 tso->what_next = ThreadRunGHC;
2170 if(tso->blocked_exceptions == NULL) {
2171 tso->why_blocked = BlockedOnCCall;
2172 tso->blocked_exceptions = END_TSO_QUEUE;
2174 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2177 // Hand back capability
2178 task->suspended_tso = tso;
2180 ACQUIRE_LOCK(&cap->lock);
2182 suspendTask(cap,task);
2183 cap->in_haskell = rtsFalse;
2184 releaseCapability_(cap);
2186 RELEASE_LOCK(&cap->lock);
2188 #if defined(THREADED_RTS)
2189 /* Preparing to leave the RTS, so ensure there's a native thread/task
2190 waiting to take over. */
2192 IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2195 errno = saved_errno;
2200 resumeThread (void *task_)
2204 int saved_errno = errno;
2208 // Wait for permission to re-enter the RTS with the result.
2209 waitForReturnCapability(&cap,task);
2210 // we might be on a different capability now... but if so, our
2211 // entry on the suspended_ccalling_tasks list will also have been migrated.
2214 // Remove the thread from the suspended list
2215 recoverSuspendedTask(cap,task);
2217 tso = task->suspended_tso;
2218 task->suspended_tso = NULL;
2219 tso->link = END_TSO_QUEUE;
2220 IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2222 if (tso->why_blocked == BlockedOnCCall) {
2223 awakenBlockedQueue(cap,tso->blocked_exceptions);
2224 tso->blocked_exceptions = NULL;
2227 /* Reset blocking status */
2228 tso->why_blocked = NotBlocked;
2230 cap->r.rCurrentTSO = tso;
2231 cap->in_haskell = rtsTrue;
2232 errno = saved_errno;
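/* A minimal sketch (disabled; not RTS code) of how a "safe" foreign call is
 * bracketed by suspendThread()/resumeThread(). The compiler-generated stub
 * does essentially this; my_blocking_c_call and the local names are
 * illustrative only.
 */
#if 0
static StgRegTable *
example_safe_call (StgRegTable *reg)
{
    void *token;

    token = suspendThread(reg); // release the Capability; the TSO sleeps
                                // on the suspended_ccalling_tasks list
    my_blocking_c_call();       // may block without stalling other threads
    return resumeThread(token); // reacquire a Capability and wake the TSO
}
#endif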
2237 /* ---------------------------------------------------------------------------
2238 * Comparing Thread ids.
2240 * This is used from STG land in the implementation of the
2241 * instances of Eq/Ord for ThreadIds.
2242 * ------------------------------------------------------------------------ */
2245 cmp_thread(StgPtr tso1, StgPtr tso2)
2247 StgThreadID id1 = ((StgTSO *)tso1)->id;
2248 StgThreadID id2 = ((StgTSO *)tso2)->id;
2250 if (id1 < id2) return (-1);
2251 if (id1 > id2) return 1;
2255 /* ---------------------------------------------------------------------------
2256 * Fetching the ThreadID from an StgTSO.
2258 * This is used in the implementation of Show for ThreadIds.
2259 * ------------------------------------------------------------------------ */
2261 rts_getThreadId(StgPtr tso)
2263 return ((StgTSO *)tso)->id;
2268 labelThread(StgPtr tso, char *label)
2273 /* Caveat: Once set, you can only set the thread name to "" */
2274 len = strlen(label)+1;
2275 buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2276 strncpy(buf,label,len);
2277 /* Update will free the old memory for us */
2278 updateThreadLabel(((StgTSO *)tso)->id,buf);
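/* Usage sketch (disabled; illustrative): since the label is copied into a
 * freshly allocated buffer, the caller's string need not outlive the call.
 */
#if 0
static void
example_label (StgTSO *tso)
{
    labelThread((StgPtr)tso, "my-worker");  // label is copied; the literal
                                            // need not stay alive afterwards
}
#endif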
2282 /* ---------------------------------------------------------------------------
2283 Create a new thread.
2285 The new thread starts with the given stack size. Before the
2286 scheduler can run, however, this thread needs to have a closure
2287 (and possibly some arguments) pushed on its stack. See
2288 pushClosure() in Schedule.h.
2290 createGenThread() and createIOThread() (in SchedAPI.h) are
2291 convenient packaged versions of this function.
2293 currently pri (priority) is only used in a GRAN setup -- HWL
2294 ------------------------------------------------------------------------ */
2296 /* currently pri (priority) is only used in a GRAN setup -- HWL */
2298 createThread(nat size, StgInt pri)
2301 createThread(Capability *cap, nat size)
2307 /* sched_mutex is *not* required */
2309 /* First check whether we should create a thread at all */
2310 #if defined(PARALLEL_HASKELL)
2311 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2312 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2314 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2315 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2316 return END_TSO_QUEUE;
2322 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2325 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2327 /* catch ridiculously small stack sizes */
2328 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2329 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2332 stack_size = size - TSO_STRUCT_SIZEW;
2334 tso = (StgTSO *)allocateLocal(cap, size);
2335 TICK_ALLOC_TSO(stack_size, 0);
2337 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2339 SET_GRAN_HDR(tso, ThisPE);
2342 // Always start with the compiled code evaluator
2343 tso->what_next = ThreadRunGHC;
2345 tso->why_blocked = NotBlocked;
2346 tso->blocked_exceptions = NULL;
2348 tso->saved_errno = 0;
2351 tso->stack_size = stack_size;
2352 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
2354 tso->sp = (P_)&(tso->stack) + stack_size;
2356 tso->trec = NO_TREC;
2359 tso->prof.CCCS = CCS_MAIN;
2362 /* put a stop frame on the stack */
2363 tso->sp -= sizeofW(StgStopFrame);
2364 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2365 tso->link = END_TSO_QUEUE;
2369 /* uses more flexible routine in GranSim */
2370 insertThread(tso, CurrentProc);
2372 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2378 if (RtsFlags.GranFlags.GranSimStats.Full)
2379 DumpGranEvent(GR_START,tso);
2380 #elif defined(PARALLEL_HASKELL)
2381 if (RtsFlags.ParFlags.ParStats.Full)
2382 DumpGranEvent(GR_STARTQ,tso);
2383 /* HACK to avoid SCHEDULE
2387 /* Link the new thread on the global thread list. */
2389 ACQUIRE_LOCK(&sched_mutex);
2390 tso->id = next_thread_id++; // while we have the mutex
2391 tso->global_link = all_threads;
2393 RELEASE_LOCK(&sched_mutex);
2396 tso->dist.priority = MandatoryPriority; //by default that is...
2400 tso->gran.pri = pri;
2402 tso->gran.magic = TSO_MAGIC; // debugging only
2404 tso->gran.sparkname = 0;
2405 tso->gran.startedat = CURRENT_TIME;
2406 tso->gran.exported = 0;
2407 tso->gran.basicblocks = 0;
2408 tso->gran.allocs = 0;
2409 tso->gran.exectime = 0;
2410 tso->gran.fetchtime = 0;
2411 tso->gran.fetchcount = 0;
2412 tso->gran.blocktime = 0;
2413 tso->gran.blockcount = 0;
2414 tso->gran.blockedat = 0;
2415 tso->gran.globalsparks = 0;
2416 tso->gran.localsparks = 0;
2417 if (RtsFlags.GranFlags.Light)
2418 tso->gran.clock = Now; /* local clock */
2420 tso->gran.clock = 0;
2422 IF_DEBUG(gran,printTSO(tso));
2423 #elif defined(PARALLEL_HASKELL)
2425 tso->par.magic = TSO_MAGIC; // debugging only
2427 tso->par.sparkname = 0;
2428 tso->par.startedat = CURRENT_TIME;
2429 tso->par.exported = 0;
2430 tso->par.basicblocks = 0;
2431 tso->par.allocs = 0;
2432 tso->par.exectime = 0;
2433 tso->par.fetchtime = 0;
2434 tso->par.fetchcount = 0;
2435 tso->par.blocktime = 0;
2436 tso->par.blockcount = 0;
2437 tso->par.blockedat = 0;
2438 tso->par.globalsparks = 0;
2439 tso->par.localsparks = 0;
2443 globalGranStats.tot_threads_created++;
2444 globalGranStats.threads_created_on_PE[CurrentProc]++;
2445 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2446 globalGranStats.tot_sq_probes++;
2447 #elif defined(PARALLEL_HASKELL)
2448 // collect parallel global statistics (currently done together with GC stats)
2449 if (RtsFlags.ParFlags.ParStats.Global &&
2450 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2451 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
2452 globalParStats.tot_threads_created++;
2458 sched_belch("==__ schedule: Created TSO %d (%p);",
2459 CurrentProc, tso, tso->id));
2460 #elif defined(PARALLEL_HASKELL)
2461 IF_PAR_DEBUG(verbose,
2462 sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2463 (long)tso->id, tso, advisory_thread_count));
2465 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
2466 (long)tso->id, (long)tso->stack_size));
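/* A minimal usage sketch (disabled; assumes the standard non-GRAN build, a
 * valid Capability, and a closure of the right shape). pushClosure() lives
 * in Schedule.h; createGenThread()/createIOThread() in SchedAPI.h package
 * up exactly this sequence.
 */
#if 0
static void
example_spawn (Capability *cap, StgClosure *fun)
{
    StgTSO *t;
    t = createThread(cap, RtsFlags.GcFlags.initialStkSize); // fresh TSO
    pushClosure(t, (W_)fun);   // seed its stack with the closure to enter
    scheduleThread(cap, t);    // append it to this Capability's run queue
}
#endif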
2473 // all parallel thread creation calls should fall through the following routine.
2476 createThreadFromSpark(rtsSpark spark)
2478 ASSERT(spark != (rtsSpark)NULL);
2479 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2480 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
2482 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2483 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2484 return END_TSO_QUEUE;
2488 tso = createThread(RtsFlags.GcFlags.initialStkSize);
2489 if (tso==END_TSO_QUEUE)
2490 barf("createSparkThread: Cannot create TSO");
2492 tso->priority = AdvisoryPriority;
2494 pushClosure(tso,spark);
2496 advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
2503 // Turn a spark into a thread.
2504 // ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2508 activateSpark (rtsSpark spark)
2512 tso = createSparkThread(spark);
2513 if (RtsFlags.ParFlags.ParStats.Full) {
2514 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2515 IF_PAR_DEBUG(verbose,
2516 debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2517 (StgClosure *)spark, info_type((StgClosure *)spark)));
2519 // ToDo: fwd info on local/global spark to thread -- HWL
2520 // tso->gran.exported = spark->exported;
2521 // tso->gran.locked = !spark->global;
2522 // tso->gran.sparkname = spark->name;
2528 /* ---------------------------------------------------------------------------
2531 * scheduleThread puts a thread on the end of the runnable queue.
2532 * This will usually be done immediately after a thread is created.
2533 * The caller of scheduleThread must create the thread using e.g.
2534 * createThread and push an appropriate closure
2535 * on this thread's stack before the scheduler is invoked.
2536 * ------------------------------------------------------------------------ */
2539 scheduleThread(Capability *cap, StgTSO *tso)
2541 // The thread goes at the *end* of the run-queue, to avoid possible
2542 // starvation of any threads already on the queue.
2543 appendToRunQueue(cap,tso);
2547 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2551 // We already created/initialised the Task
2552 task = cap->running_task;
2554 // This TSO is now a bound thread; make the Task and TSO
2555 // point to each other.
2560 task->stat = NoStatus;
2562 appendToRunQueue(cap,tso);
2564 IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2567 /* GranSim specific init */
2568 CurrentTSO = m->tso; // the TSO to run
2569 procStatus[MainProc] = Busy; // status of main PE
2570 CurrentProc = MainProc; // PE to run it on
2573 cap = schedule(cap,task);
2575 ASSERT(task->stat != NoStatus);
2576 ASSERT_CAPABILITY_INVARIANTS(cap,task);
2578 IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2582 /* ----------------------------------------------------------------------------
2584 * ------------------------------------------------------------------------- */
2586 #if defined(THREADED_RTS)
2588 workerStart(Task *task)
2592 // See startWorkerTask().
2593 ACQUIRE_LOCK(&task->lock);
2595 RELEASE_LOCK(&task->lock);
2597 // set the thread-local pointer to the Task:
2600 // schedule() runs without a lock.
2601 cap = schedule(cap,task);
2603 // On exit from schedule(), we have a Capability.
2604 releaseCapability(cap);
2609 /* ---------------------------------------------------------------------------
2612 * Initialise the scheduler. This resets all the queues - if the
2613 * queues contained any threads, they'll be garbage collected at the
2616 * ------------------------------------------------------------------------ */
2623 for (i=0; i<MAX_PROC; i++) {
2624 run_queue_hds[i] = END_TSO_QUEUE;
2625 run_queue_tls[i] = END_TSO_QUEUE;
2626 blocked_queue_hds[i] = END_TSO_QUEUE;
2627 blocked_queue_tls[i] = END_TSO_QUEUE;
2628 ccalling_threadss[i] = END_TSO_QUEUE;
2629 blackhole_queue[i] = END_TSO_QUEUE;
2630 sleeping_queue = END_TSO_QUEUE;
2632 #elif !defined(THREADED_RTS)
2633 blocked_queue_hd = END_TSO_QUEUE;
2634 blocked_queue_tl = END_TSO_QUEUE;
2635 sleeping_queue = END_TSO_QUEUE;
2638 blackhole_queue = END_TSO_QUEUE;
2639 all_threads = END_TSO_QUEUE;
2644 RtsFlags.ConcFlags.ctxtSwitchTicks =
2645 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2647 #if defined(THREADED_RTS)
2648 /* Initialise the mutex and condition variables used by the scheduler. */
2650 initMutex(&sched_mutex);
2653 ACQUIRE_LOCK(&sched_mutex);
2655 /* A capability holds the state a native thread needs in
2656 * order to execute STG code. At least one capability is
2657 * floating around (only SMP builds have more than one).
2665 * Eagerly start one worker to run each Capability, except for
2666 * Capability 0. The idea is that we're probably going to start a
2667 * bound thread on Capability 0 pretty soon, so we don't want a
2668 * worker task hogging it. */
2673 for (i = 1; i < n_capabilities; i++) {
2674 cap = &capabilities[i];
2675 ACQUIRE_LOCK(&cap->lock);
2676 startWorkerTask(cap, workerStart);
2677 RELEASE_LOCK(&cap->lock);
2682 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2686 RELEASE_LOCK(&sched_mutex);
2690 exitScheduler( void )
2692 interrupted = rtsTrue;
2693 shutting_down_scheduler = rtsTrue;
2695 #if defined(THREADED_RTS)
2700 ACQUIRE_LOCK(&sched_mutex);
2701 task = newBoundTask();
2702 RELEASE_LOCK(&sched_mutex);
2704 for (i = 0; i < n_capabilities; i++) {
2705 shutdownCapability(&capabilities[i], task);
2707 boundTaskExiting(task);
2713 /* ---------------------------------------------------------------------------
2714 Where are the roots that we know about?
2716 - all the threads on the runnable queue
2717 - all the threads on the blocked queue
2718 - all the threads on the sleeping queue
2719 - all the thread currently executing a _ccall_GC
2720 - all the "main threads"
2722 ------------------------------------------------------------------------ */
2724 /* This has to be protected either by the scheduler monitor, or by the
2725 garbage collection monitor (probably the latter). */
2730 GetRoots( evac_fn evac )
2737 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2738 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2739 evac((StgClosure **)&run_queue_hds[i]);
2740 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2741 evac((StgClosure **)&run_queue_tls[i]);
2743 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2744 evac((StgClosure **)&blocked_queue_hds[i]);
2745 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2746 evac((StgClosure **)&blocked_queue_tls[i]);
2747 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2748 evac((StgClosure **)&ccalling_threadss[i]);
2755 for (i = 0; i < n_capabilities; i++) {
2756 cap = &capabilities[i];
2757 evac((StgClosure **)&cap->run_queue_hd);
2758 evac((StgClosure **)&cap->run_queue_tl);
2760 for (task = cap->suspended_ccalling_tasks; task != NULL;
2762 evac((StgClosure **)&task->suspended_tso);
2766 #if !defined(THREADED_RTS)
2767 evac((StgClosure **)&blocked_queue_hd);
2768 evac((StgClosure **)&blocked_queue_tl);
2769 evac((StgClosure **)&sleeping_queue);
2773 evac((StgClosure **)&blackhole_queue);
2775 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2776 markSparkQueue(evac);
2779 #if defined(RTS_USER_SIGNALS)
2780 // mark the signal handlers (signals should be already blocked)
2781 markSignalHandlers(evac);
2785 /* -----------------------------------------------------------------------------
2788 This is the interface to the garbage collector from Haskell land.
2789 We provide this so that external C code can allocate and garbage
2790 collect when called from Haskell via _ccall_GC.
2792 It might be useful to provide an interface whereby the programmer
2793 can specify more roots (ToDo).
2795 This needs to be protected by the GC condition variable above. KH.
2796 -------------------------------------------------------------------------- */
2798 static void (*extra_roots)(evac_fn);
2804 // ToDo: we have to grab all the capabilities here.
2805 errorBelch("performGC not supported in threaded RTS (yet)");
2806 stg_exit(EXIT_FAILURE);
2808 /* Obligated to hold this lock upon entry */
2809 GarbageCollect(GetRoots,rtsFalse);
2813 performMajorGC(void)
2816 errorBelch("performMayjorGC not supported in threaded RTS (yet)");
2817 stg_exit(EXIT_FAILURE);
2819 GarbageCollect(GetRoots,rtsTrue);
2823 AllRoots(evac_fn evac)
2825 GetRoots(evac); // the scheduler's roots
2826 extra_roots(evac); // the user's roots
2830 performGCWithRoots(void (*get_roots)(evac_fn))
2833 errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2834 stg_exit(EXIT_FAILURE);
2836 extra_roots = get_roots;
2837 GarbageCollect(AllRoots,rtsFalse);
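/* Sketch of the extra-roots interface from the caller's side (disabled;
 * my_global and my_roots are illustrative user code, not part of the RTS;
 * evac_fn takes an StgClosure **, as in GetRoots() above):
 */
#if 0
static StgClosure *my_global;

static void
my_roots (evac_fn evac)
{
    evac(&my_global);             // keep my_global alive across the GC
}

static void
example_gc (void)
{
    performGCWithRoots(my_roots); // AllRoots() = GetRoots() + my_roots()
}
#endif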
2840 /* -----------------------------------------------------------------------------
2843 If the thread has reached its maximum stack size, then raise the
2844 StackOverflow exception in the offending thread. Otherwise
2845 relocate the TSO into a larger chunk of memory and adjust its stack
2847 -------------------------------------------------------------------------- */
2850 threadStackOverflow(Capability *cap, StgTSO *tso)
2852 nat new_stack_size, stack_words;
2857 IF_DEBUG(sanity,checkTSO(tso));
2858 if (tso->stack_size >= tso->max_stack_size) {
2861 debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2862 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2863 /* If we're debugging, just print out the top of the stack */
2864 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2867 /* Send this thread the StackOverflow exception */
2868 raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2872 /* Try to double the current stack size. If that takes us over the
2873 * maximum stack size for this thread, then use the maximum instead.
2874 * Finally round up so the TSO ends up as a whole number of blocks.
2876 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2877 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2878 TSO_STRUCT_SIZE)/sizeof(W_);
2879 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2880 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2882 IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2884 dest = (StgTSO *)allocate(new_tso_size);
2885 TICK_ALLOC_TSO(new_stack_size,0);
2887 /* copy the TSO block and the old stack into the new area */
2888 memcpy(dest,tso,TSO_STRUCT_SIZE);
2889 stack_words = tso->stack + tso->stack_size - tso->sp;
2890 new_sp = (P_)dest + new_tso_size - stack_words;
2891 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2893 /* relocate the stack pointers... */
2895 dest->stack_size = new_stack_size;
2897 /* Mark the old TSO as relocated. We have to check for relocated
2898 * TSOs in the garbage collector and any primops that deal with TSOs.
2900 * It's important to set the sp value to just beyond the end
2901 * of the stack, so we don't attempt to scavenge any part of the dead TSO's stack. */
2904 tso->what_next = ThreadRelocated;
2906 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2907 tso->why_blocked = NotBlocked;
2909 IF_PAR_DEBUG(verbose,
2910 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2911 tso->id, tso, tso->stack_size);
2912 /* If we're debugging, just print out the top of the stack */
2913 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2916 IF_DEBUG(sanity,checkTSO(tso));
2918 IF_DEBUG(scheduler,printTSO(dest));
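/* Worked example of the sizing arithmetic above (illustrative numbers): a
 * thread with stack_size = 1024 words and max_stack_size = 65536 words
 * overflows. Then
 *     new_stack_size = stg_min(2 * 1024, 65536) = 2048 words
 * before the block rounding, so the stack roughly doubles on each overflow
 * until it reaches max_stack_size, at which point the thread is sent the
 * StackOverflow exception instead of more space.
 */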
2924 /* ---------------------------------------------------------------------------
2925 Wake up a queue that was blocked on some resource.
2926 ------------------------------------------------------------------------ */
2930 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2933 #elif defined(PARALLEL_HASKELL)
2935 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2937 /* write RESUME events to log file and
2938 update blocked and fetch time (depending on type of the orig closure) */
2939 if (RtsFlags.ParFlags.ParStats.Full) {
2940 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2941 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2942 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2943 if (emptyRunQueue())
2944 emitSchedule = rtsTrue;
2946 switch (get_itbl(node)->type) {
2948 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2953 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2960 barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2967 StgBlockingQueueElement *
2968 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2971 PEs node_loc, tso_loc;
2973 node_loc = where_is(node); // should be lifted out of loop
2974 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2975 tso_loc = where_is((StgClosure *)tso);
2976 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2977 /* !fake_fetch => TSO is on CurrentProc, which is what IS_LOCAL_TO tests */
2978 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2979 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2980 // insertThread(tso, node_loc);
2981 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2983 tso, node, (rtsSpark*)NULL);
2984 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2987 } else { // TSO is remote (actually should be FMBQ)
2988 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2989 RtsFlags.GranFlags.Costs.gunblocktime +
2990 RtsFlags.GranFlags.Costs.latency;
2991 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2993 tso, node, (rtsSpark*)NULL);
2994 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2997 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2999 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3000 (node_loc==tso_loc ? "Local" : "Global"),
3001 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3002 tso->block_info.closure = NULL;
3003 IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n",
3006 #elif defined(PARALLEL_HASKELL)
3007 StgBlockingQueueElement *
3008 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3010 StgBlockingQueueElement *next;
3012 switch (get_itbl(bqe)->type) {
3014 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3015 /* if it's a TSO just push it onto the run_queue */
3017 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3018 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
3020 unblockCount(bqe, node);
3021 /* reset blocking status after dumping event */
3022 ((StgTSO *)bqe)->why_blocked = NotBlocked;
3026 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3028 bqe->link = (StgBlockingQueueElement *)PendingFetches;
3029 PendingFetches = (StgBlockedFetch *)bqe;
3033 /* can ignore this case in a non-debugging setup;
3034 see comments on RBHSave closures above */
3036 /* check that the closure is an RBHSave closure */
3037 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3038 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3039 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3043 barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3044 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
3048 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3054 unblockOne(Capability *cap, StgTSO *tso)
3058 ASSERT(get_itbl(tso)->type == TSO);
3059 ASSERT(tso->why_blocked != NotBlocked);
3060 tso->why_blocked = NotBlocked;
3062 tso->link = END_TSO_QUEUE;
3064 // We might have just migrated this TSO to our Capability:
3066 tso->bound->cap = cap;
3069 appendToRunQueue(cap,tso);
3071 // we're holding a newly woken thread, make sure we context switch
3072 // quickly so we can migrate it if necessary.
3074 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3081 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3083 StgBlockingQueueElement *bqe;
3088 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3089 node, CurrentProc, CurrentTime[CurrentProc],
3090 CurrentTSO->id, CurrentTSO));
3092 node_loc = where_is(node);
3094 ASSERT(q == END_BQ_QUEUE ||
3095 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
3096 get_itbl(q)->type == CONSTR); // closure (type constructor)
3097 ASSERT(is_unique(node));
3099 /* FAKE FETCH: magically copy the node to the tso's proc;
3100 no Fetch necessary because in reality the node should not have been
3101 moved to the other PE in the first place */
3103 if (CurrentProc!=node_loc) {
3105 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3106 node, node_loc, CurrentProc, CurrentTSO->id,
3107 // CurrentTSO, where_is(CurrentTSO),
3108 node->header.gran.procs));
3109 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3111 debugBelch("## new bitmask of node %p is %#x\n",
3112 node, node->header.gran.procs));
3113 if (RtsFlags.GranFlags.GranSimStats.Global) {
3114 globalGranStats.tot_fake_fetches++;
3119 // ToDo: check: ASSERT(CurrentProc==node_loc);
3120 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3123 // bqe points to the current element in the queue
3124 // next points to the next element in the queue
3126 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
3127 //tso_loc = where_is(tso);
3129 bqe = unblockOne(bqe, node);
3132 /* if this is the BQ of an RBH, we have to put back the info ripped out of
3133 the closure to make room for the anchor of the BQ */
3134 if (bqe!=END_BQ_QUEUE) {
3135 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3137 ASSERT((info_ptr==&RBH_Save_0_info) ||
3138 (info_ptr==&RBH_Save_1_info) ||
3139 (info_ptr==&RBH_Save_2_info));
3141 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3142 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3143 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3146 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3147 node, info_type(node)));
3150 /* statistics gathering */
3151 if (RtsFlags.GranFlags.GranSimStats.Global) {
3152 // globalGranStats.tot_bq_processing_time += bq_processing_time;
3153 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
3154 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
3155 globalGranStats.tot_awbq++; // total no. of bqs awakened
3158 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3159 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3161 #elif defined(PARALLEL_HASKELL)
3163 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3165 StgBlockingQueueElement *bqe;
3167 IF_PAR_DEBUG(verbose,
3168 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3172 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3173 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so let's just return. RFP (BUG?)\n"));
3178 ASSERT(q == END_BQ_QUEUE ||
3179 get_itbl(q)->type == TSO ||
3180 get_itbl(q)->type == BLOCKED_FETCH ||
3181 get_itbl(q)->type == CONSTR);
3184 while (get_itbl(bqe)->type==TSO ||
3185 get_itbl(bqe)->type==BLOCKED_FETCH) {
3186 bqe = unblockOne(bqe, node);
3190 #else /* !GRAN && !PARALLEL_HASKELL */
3193 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3195 if (tso == NULL) return; // hack; see bug #1235728, and comments in
3197 while (tso != END_TSO_QUEUE) {
3198 tso = unblockOne(cap,tso);
3203 /* ---------------------------------------------------------------------------
3205 - usually called inside a signal handler so it mustn't do anything fancy.
3206 ------------------------------------------------------------------------ */
3209 interruptStgRts(void)
3213 #if defined(THREADED_RTS)
3214 prodAllCapabilities();
3218 /* -----------------------------------------------------------------------------
3221 This is for use when we raise an exception in another thread, which
3223 This has nothing to do with the UnblockThread event in GranSim. -- HWL
3224 -------------------------------------------------------------------------- */
3226 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3228 // NB: only the type of the blocking queue is different in GranSim and GUM
3229 // the operations on the queue-elements are the same
3230 // long live polymorphism!
3232 // Locks: sched_mutex is held upon entry and exit.
3236 unblockThread(Capability *cap, StgTSO *tso)
3238 StgBlockingQueueElement *t, **last;
3240 switch (tso->why_blocked) {
3243 return; /* not blocked */
3246 // Be careful: nothing to do here! We tell the scheduler that the thread
3247 // is runnable and we leave it to the stack-walking code to abort the
3248 // transaction while unwinding the stack. We should perhaps have a debugging
3249 // test to make sure that this really happens and that the 'zombie' transaction
3250 // does not get committed.
3254 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3256 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3257 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3259 last = (StgBlockingQueueElement **)&mvar->head;
3260 for (t = (StgBlockingQueueElement *)mvar->head;
3262 last = &t->link, last_tso = t, t = t->link) {
3263 if (t == (StgBlockingQueueElement *)tso) {
3264 *last = (StgBlockingQueueElement *)tso->link;
3265 if (mvar->tail == tso) {
3266 mvar->tail = (StgTSO *)last_tso;
3271 barf("unblockThread (MVAR): TSO not found");
3274 case BlockedOnBlackHole:
3275 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3277 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3279 last = &bq->blocking_queue;
3280 for (t = bq->blocking_queue;
3282 last = &t->link, t = t->link) {
3283 if (t == (StgBlockingQueueElement *)tso) {
3284 *last = (StgBlockingQueueElement *)tso->link;
3288 barf("unblockThread (BLACKHOLE): TSO not found");
3291 case BlockedOnException:
3293 StgTSO *target = tso->block_info.tso;
3295 ASSERT(get_itbl(target)->type == TSO);
3297 if (target->what_next == ThreadRelocated) {
3298 target = target->link;
3299 ASSERT(get_itbl(target)->type == TSO);
3302 ASSERT(target->blocked_exceptions != NULL);
3304 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3305 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
3307 last = &t->link, t = t->link) {
3308 ASSERT(get_itbl(t)->type == TSO);
3309 if (t == (StgBlockingQueueElement *)tso) {
3310 *last = (StgBlockingQueueElement *)tso->link;
3314 barf("unblockThread (Exception): TSO not found");
3318 case BlockedOnWrite:
3319 #if defined(mingw32_HOST_OS)
3320 case BlockedOnDoProc:
3323 /* take TSO off blocked_queue */
3324 StgBlockingQueueElement *prev = NULL;
3325 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
3326 prev = t, t = t->link) {
3327 if (t == (StgBlockingQueueElement *)tso) {
3329 blocked_queue_hd = (StgTSO *)t->link;
3330 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3331 blocked_queue_tl = END_TSO_QUEUE;
3334 prev->link = t->link;
3335 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3336 blocked_queue_tl = (StgTSO *)prev;
3339 #if defined(mingw32_HOST_OS)
3340 /* (Cooperatively) signal that the worker thread should abort the request. */
3343 abandonWorkRequest(tso->block_info.async_result->reqID);
3348 barf("unblockThread (I/O): TSO not found");
3351 case BlockedOnDelay:
3353 /* take TSO off sleeping_queue */
3354 StgBlockingQueueElement *prev = NULL;
3355 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
3356 prev = t, t = t->link) {
3357 if (t == (StgBlockingQueueElement *)tso) {
3359 sleeping_queue = (StgTSO *)t->link;
3361 prev->link = t->link;
3366 barf("unblockThread (delay): TSO not found");
3370 barf("unblockThread");
3374 tso->link = END_TSO_QUEUE;
3375 tso->why_blocked = NotBlocked;
3376 tso->block_info.closure = NULL;
3377 pushOnRunQueue(cap,tso);
3381 unblockThread(Capability *cap, StgTSO *tso)
3385 /* To avoid locking unnecessarily. */
3386 if (tso->why_blocked == NotBlocked) {
3390 switch (tso->why_blocked) {
3393 // Be careful: nothing to do here! We tell the scheduler that the thread
3394 // is runnable and we leave it to the stack-walking code to abort the
3395 // transaction while unwinding the stack. We should perhaps have a debugging
3396 // test to make sure that this really happens and that the 'zombie' transaction
3397 // does not get committed.
3401 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3403 StgTSO *last_tso = END_TSO_QUEUE;
3404 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3407 for (t = mvar->head; t != END_TSO_QUEUE;
3408 last = &t->link, last_tso = t, t = t->link) {
3411 if (mvar->tail == tso) {
3412 mvar->tail = last_tso;
3417 barf("unblockThread (MVAR): TSO not found");
3420 case BlockedOnBlackHole:
3422 last = &blackhole_queue;
3423 for (t = blackhole_queue; t != END_TSO_QUEUE;
3424 last = &t->link, t = t->link) {
3430 barf("unblockThread (BLACKHOLE): TSO not found");
3433 case BlockedOnException:
3435 StgTSO *target = tso->block_info.tso;
3437 ASSERT(get_itbl(target)->type == TSO);
3439 while (target->what_next == ThreadRelocated) {
3440 target = target->link;
3441 ASSERT(get_itbl(target)->type == TSO);
3444 ASSERT(target->blocked_exceptions != NULL);
3446 last = &target->blocked_exceptions;
3447 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
3448 last = &t->link, t = t->link) {
3449 ASSERT(get_itbl(t)->type == TSO);
3455 barf("unblockThread (Exception): TSO not found");
3458 #if !defined(THREADED_RTS)
3460 case BlockedOnWrite:
3461 #if defined(mingw32_HOST_OS)
3462 case BlockedOnDoProc:
3465 StgTSO *prev = NULL;
3466 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
3467 prev = t, t = t->link) {
3470 blocked_queue_hd = t->link;
3471 if (blocked_queue_tl == t) {
3472 blocked_queue_tl = END_TSO_QUEUE;
3475 prev->link = t->link;
3476 if (blocked_queue_tl == t) {
3477 blocked_queue_tl = prev;
3480 #if defined(mingw32_HOST_OS)
3481 /* (Cooperatively) signal that the worker thread should abort the request. */
3484 abandonWorkRequest(tso->block_info.async_result->reqID);
3489 barf("unblockThread (I/O): TSO not found");
3492 case BlockedOnDelay:
3494 StgTSO *prev = NULL;
3495 for (t = sleeping_queue; t != END_TSO_QUEUE;
3496 prev = t, t = t->link) {
3499 sleeping_queue = t->link;
3501 prev->link = t->link;
3506 barf("unblockThread (delay): TSO not found");
3511 barf("unblockThread");
3515 tso->link = END_TSO_QUEUE;
3516 tso->why_blocked = NotBlocked;
3517 tso->block_info.closure = NULL;
3518 appendToRunQueue(cap,tso);
3522 /* -----------------------------------------------------------------------------
3525 * Check the blackhole_queue for threads that can be woken up. We do
3526 * this periodically: before every GC, and whenever the run queue is
3529 * An elegant solution might be to just wake up all the blocked
3530 * threads with awakenBlockedQueue occasionally: they'll go back to
3531 * sleep again if the object is still a BLACKHOLE. Unfortunately this
3532 * doesn't give us a way to tell whether we've actually managed to
3533 * wake up any threads, so we would be busy-waiting.
3535 * -------------------------------------------------------------------------- */
3538 checkBlackHoles (Capability *cap)
3541 rtsBool any_woke_up = rtsFalse;
3544 // blackhole_queue is global:
3545 ASSERT_LOCK_HELD(&sched_mutex);
3547 IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3549 // ASSUMES: sched_mutex
3550 prev = &blackhole_queue;
3551 t = blackhole_queue;
3552 while (t != END_TSO_QUEUE) {
3553 ASSERT(t->why_blocked == BlockedOnBlackHole);
3554 type = get_itbl(t->block_info.closure)->type;
3555 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3556 IF_DEBUG(sanity,checkTSO(t));
3557 t = unblockOne(cap, t);
3558 // urk, the threads migrate to the current capability
3559 // here, but we'd like to keep them on the original one.
3561 any_woke_up = rtsTrue;
3571 /* -----------------------------------------------------------------------------
3574 * The following function implements the magic for raising an
3575 * asynchronous exception in an existing thread.
3577 * We first remove the thread from any queue on which it might be
3578 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
3580 * We strip the stack down to the innermost CATCH_FRAME, building
3581 * thunks in the heap for all the active computations, so they can
3582 * be restarted if necessary. When we reach a CATCH_FRAME, we build
3583 * an application of the handler to the exception, and push it on
3584 * the top of the stack.
3586 * How exactly do we save all the active computations? We create an
3587 * AP_STACK for every UpdateFrame on the stack. Entering one of these
3588 * AP_STACKs pushes everything from the corresponding update frame
3589 * upwards onto the stack. (Actually, it pushes everything up to the
3590 * next update frame plus a pointer to the next AP_STACK object.
3591 * Entering the next AP_STACK object pushes more onto the stack until we
3592 * reach the last AP_STACK object - at which point the stack should look
3593 * exactly as it did when we killed the TSO and we can continue
3594 * execution by entering the closure on top of the stack.)
3596 * We can also kill a thread entirely - this happens if either (a) the
3597 * exception passed to raiseAsync is NULL, or (b) there's no
3598 * CATCH_FRAME on the stack. In either case, we strip the entire
3599 * stack and replace the thread with a zombie.
3601 * ToDo: in SMP mode, this function is only safe if either (a) we hold
3602 * all the Capabilities (eg. in GC), or (b) we own the Capability that
3603 * the TSO is currently blocked on or on the run queue of.
3605 * -------------------------------------------------------------------------- */
3608 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3610 raiseAsync_(cap, tso, exception, rtsFalse);
3614 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
3615 rtsBool stop_at_atomically)
3617 StgRetInfoTable *info;
3620 // Thread already dead?
3621 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3626 sched_belch("raising exception in thread %ld.", (long)tso->id));
3628 // Remove it from any blocking queues
3629 unblockThread(cap,tso);
3633 // The stack freezing code assumes there's a closure pointer on
3634 // the top of the stack, so we have to arrange that this is the case...
3636 if (sp[0] == (W_)&stg_enter_info) {
3640 sp[0] = (W_)&stg_dummy_ret_closure;
3646 // 1. Let the top of the stack be the "current closure"
3648 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3651 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3652 // current closure applied to the chunk of stack up to (but not
3653 // including) the update frame. This closure becomes the "current
3654 // closure". Go back to step 2.
3656 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3657 // top of the stack applied to the exception.
3659 // 5. If it's a STOP_FRAME, then kill the thread.
3661 // NB: if we pass an ATOMICALLY_FRAME then abort the associated transaction.
3668 info = get_ret_itbl((StgClosure *)frame);
3670 while (info->i.type != UPDATE_FRAME
3671 && (info->i.type != CATCH_FRAME || exception == NULL)
3672 && info->i.type != STOP_FRAME
3673 && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3675 if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3676 // If we find an ATOMICALLY_FRAME then we abort the
3677 // current transaction and propagate the exception. In
3678 // this case (unlike ordinary exceptions) we do not care
3679 // whether the transaction is valid or not because its
3680 // possible validity cannot have caused the exception
3681 // and will not be visible after the abort.
3683 debugBelch("Found atomically block delivering async exception\n"));
3684 stmAbortTransaction(tso -> trec);
3685 tso -> trec = stmGetEnclosingTRec(tso -> trec);
3687 frame += stack_frame_sizeW((StgClosure *)frame);
3688 info = get_ret_itbl((StgClosure *)frame);
3691 switch (info->i.type) {
3693 case ATOMICALLY_FRAME:
3694 ASSERT(stop_at_atomically);
3695 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3696 stmCondemnTransaction(tso -> trec);
3700 // R1 is not a register: the return convention for IO in
3701 // this case puts the return value on the stack, so we
3702 // need to set up the stack to return to the atomically
3703 // frame properly...
3704 tso->sp = frame - 2;
3705 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3706 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3708 tso->what_next = ThreadRunGHC;
3712 // If we find a CATCH_FRAME, and we've got an exception to raise,
3713 // then build the THUNK raise(exception), and leave it on
3714 // top of the CATCH_FRAME ready to enter.
3718 StgCatchFrame *cf = (StgCatchFrame *)frame;
3722 // we've got an exception to raise, so let's pass it to the
3723 // handler in this frame.
3725 raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3726 TICK_ALLOC_SE_THK(1,0);
3727 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3728 raise->payload[0] = exception;
3730 // throw away the stack from Sp up to the CATCH_FRAME.
3734 /* Ensure that async exceptions are blocked now, so we don't get
3735 * a surprise exception before we get around to executing the handler. */
3738 if (tso->blocked_exceptions == NULL) {
3739 tso->blocked_exceptions = END_TSO_QUEUE;
3742 /* Put the newly-built THUNK on top of the stack, ready to execute
3743 * when the thread restarts. */
3746 sp[-1] = (W_)&stg_enter_info;
3748 tso->what_next = ThreadRunGHC;
3749 IF_DEBUG(sanity, checkTSO(tso));
3758 // First build an AP_STACK consisting of the stack chunk above the
3759 // current update frame, with the top word on the stack as the fun field.
3762 words = frame - sp - 1;
3763 ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3766 ap->fun = (StgClosure *)sp[0];
3768 for(i=0; i < (nat)words; ++i) {
3769 ap->payload[i] = (StgClosure *)*sp++;
3772 SET_HDR(ap,&stg_AP_STACK_info,
3773 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3774 TICK_ALLOC_UP_THK(words+1,0);
3777 debugBelch("sched: Updating ");
3778 printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3779 debugBelch(" with ");
3780 printObj((StgClosure *)ap);
3783 // Replace the updatee with an indirection - happily
3784 // this will also wake up any threads currently
3785 // waiting on the result.
3787 // Warning: if we're in a loop, more than one update frame on
3788 // the stack may point to the same object. Be careful not to
3789 // overwrite an IND_OLDGEN in this case, because we'll screw
3790 // up the mutable lists. To be on the safe side, don't
3791 // overwrite any kind of indirection at all. See also
3792 // threadSqueezeStack in GC.c, where we have to make a similar check.
3795 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3796 // revert the black hole
3797 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3800 sp += sizeofW(StgUpdateFrame) - 1;
3801 sp[0] = (W_)ap; // push onto stack
3806 // We've stripped the entire stack; the thread is now dead.
3807 tso->what_next = ThreadKilled;
3808 tso->sp = frame + sizeofW(StgStopFrame);
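/* Usage sketch (disabled): the two modes of raiseAsync described in the
 * comment above. 'victim' is an illustrative TSO owned by this Capability.
 */
#if 0
static void
example_raise (Capability *cap, StgTSO *victim)
{
    // deliver a catchable asynchronous exception:
    raiseAsync(cap, victim, (StgClosure *)NonTermination_closure);

    // or, with a NULL exception, strip the whole stack and kill the thread:
    raiseAsync(cap, victim, NULL);
}
#endif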
3818 /* -----------------------------------------------------------------------------
3821 This is used for interruption (^C) and forking, and corresponds to
3822 raising an exception but without letting the thread catch the
3824 -------------------------------------------------------------------------- */
3827 deleteThread (Capability *cap, StgTSO *tso)
3829 if (tso->why_blocked != BlockedOnCCall &&
3830 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3831 raiseAsync(cap,tso,NULL);
3835 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3837 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3838 { // for forkProcess only:
3839 // delete thread without giving it a chance to catch the KillThread exception
3841 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3845 if (tso->why_blocked != BlockedOnCCall &&
3846 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3847 unblockThread(cap,tso);
3850 tso->what_next = ThreadKilled;
3854 /* -----------------------------------------------------------------------------
3855 raiseExceptionHelper
3857 This function is called by the raise# primitive, just so that we can
3858 move some of the tricky bits of raising an exception from C-- into
3859 C. Who knows, it might be a useful reusable thing here too.
3860 -------------------------------------------------------------------------- */
3863 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3865 Capability *cap = regTableToCapability(reg);
3866 StgThunk *raise_closure = NULL;
3868 StgRetInfoTable *info;
3870 // This closure represents the expression 'raise# E' where E
3871 // is the exception raised. It is used to overwrite all the
3872 // thunks which are currently under evaluation.
3876 // LDV profiling: stg_raise_info has THUNK as its closure
3877 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3878 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
3879 // 1 does not cause any problem unless profiling is performed.
3880 // However, when LDV profiling goes on, we need to linearly scan
3881 // the small object pool, where raise_closure is stored, so we should
3882 // use MIN_UPD_SIZE.
3884 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3885 // sizeofW(StgClosure)+1);
3889 // Walk up the stack, looking for the catch frame. On the way,
3890 // we update any closures pointed to from update frames with the
3891 // raise closure that we just built.
3895 info = get_ret_itbl((StgClosure *)p);
3896 next = p + stack_frame_sizeW((StgClosure *)p);
3897 switch (info->i.type) {
3900 // Only create raise_closure if we need to.
3901 if (raise_closure == NULL) {
3903 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3904 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3905 raise_closure->payload[0] = exception;
3907 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3911 case ATOMICALLY_FRAME:
3912 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3914 return ATOMICALLY_FRAME;
3920 case CATCH_STM_FRAME:
3921 IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3923 return CATCH_STM_FRAME;
3929 case CATCH_RETRY_FRAME:
3938 /* -----------------------------------------------------------------------------
3939 findRetryFrameHelper
3941 This function is called by the retry# primitive. It traverses the stack
3942 leaving tso->sp referring to the frame which should handle the retry.
3944 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3945 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
3947 We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3948 despite the similar implementation.
3950 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3951 not be created within memory transactions.
3952 -------------------------------------------------------------------------- */
3955 findRetryFrameHelper (StgTSO *tso)
3958 StgRetInfoTable *info;
3962 info = get_ret_itbl((StgClosure *)p);
3963 next = p + stack_frame_sizeW((StgClosure *)p);
3964 switch (info->i.type) {
3966 case ATOMICALLY_FRAME:
3967 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3969 return ATOMICALLY_FRAME;
3971 case CATCH_RETRY_FRAME:
3972 IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3974 return CATCH_RETRY_FRAME;
3976 case CATCH_STM_FRAME:
3978 ASSERT(info->i.type != CATCH_FRAME);
3979 ASSERT(info->i.type != STOP_FRAME);
3986 /* -----------------------------------------------------------------------------
3987 resurrectThreads is called after garbage collection on the list of
3988 threads found to be garbage. Each of these threads will be woken
3989 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3990 on an MVar, or NonTermination if the thread was blocked on a Black Hole.
3993 Locks: assumes we hold *all* the capabilities.
3994 -------------------------------------------------------------------------- */
3997 resurrectThreads (StgTSO *threads)
4002 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4003 next = tso->global_link;
4004 tso->global_link = all_threads;
4006 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4008 // Wake up the thread on the Capability it was last on (for a
4009 // bound thread), or on last_free_capability otherwise.
4011 cap = tso->bound->cap;
4013 cap = last_free_capability;
4016 switch (tso->why_blocked) {
4018 case BlockedOnException:
4019 /* Called by GC - sched_mutex lock is currently held. */
4020 raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4022 case BlockedOnBlackHole:
4023 raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4026 raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4029 /* This might happen if the thread was blocked on a black hole
4030 * belonging to a thread that we've just woken up (raiseAsync
4031 * can wake up threads, remember...). */
4035 barf("resurrectThreads: thread blocked in a strange way");
4040 /* ----------------------------------------------------------------------------
4041 * Debugging: why is a thread blocked
4042 * [Also provides useful information when debugging threaded programs
4043 * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4044 ------------------------------------------------------------------------- */
4048 printThreadBlockage(StgTSO *tso)
4050 switch (tso->why_blocked) {
4052 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4054 case BlockedOnWrite:
4055 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4057 #if defined(mingw32_HOST_OS)
4058 case BlockedOnDoProc:
4059 debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4062 case BlockedOnDelay:
4063 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4066 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4068 case BlockedOnException:
4069 debugBelch("is blocked on delivering an exception to thread %d",
4070 tso->block_info.tso->id);
4072 case BlockedOnBlackHole:
4073 debugBelch("is blocked on a black hole");
4076 debugBelch("is not blocked");
4078 #if defined(PARALLEL_HASKELL)
4080 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4081 tso->block_info.closure, info_type(tso->block_info.closure));
4083 case BlockedOnGA_NoSend:
4084 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4085 tso->block_info.closure, info_type(tso->block_info.closure));
4088 case BlockedOnCCall:
4089 debugBelch("is blocked on an external call");
4091 case BlockedOnCCall_NoUnblockExc:
4092 debugBelch("is blocked on an external call (exceptions were already blocked)");
4095 debugBelch("is blocked on an STM operation");
4098 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4099 tso->why_blocked, tso->id, tso);
4104 printThreadStatus(StgTSO *t)
4106 debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4108 void *label = lookupThreadLabel(t->id);
4109 if (label) debugBelch("[\"%s\"] ",(char *)label);
4111 if (t->what_next == ThreadRelocated) {
4112 debugBelch("has been relocated...\n");
4114 switch (t->what_next) {
4116 debugBelch("has been killed");
4118 case ThreadComplete:
4119 debugBelch("has completed");
4122 printThreadBlockage(t);
4129 printAllThreads(void)
4136 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4137 ullong_format_string(TIME_ON_PROC(CurrentProc),
4138 time_string, rtsFalse/*no commas!*/);
4140 debugBelch("all threads at [%s]:\n", time_string);
4141 # elif defined(PARALLEL_HASKELL)
4142 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4143 ullong_format_string(CURRENT_TIME,
4144 time_string, rtsFalse/*no commas!*/);
4146 debugBelch("all threads at [%s]:\n", time_string);
4148 debugBelch("all threads:\n");
4151 for (i = 0; i < n_capabilities; i++) {
4152 cap = &capabilities[i];
4153 debugBelch("threads on capability %d:\n", cap->no);
4154 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4155 printThreadStatus(t);
4159 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4160 if (t->why_blocked != NotBlocked) {
4161 printThreadStatus(t);
4163 if (t->what_next == ThreadRelocated) {
4166 next = t->global_link;
4173 printThreadQueue(StgTSO *t)
4176 for (; t != END_TSO_QUEUE; t = t->link) {
4177 printThreadStatus(t);
4180 debugBelch("%d threads on queue\n", i);
4184 /* Print a whole blocking queue attached to node (debugging only). */
4186 # if defined(PARALLEL_HASKELL)
4188 print_bq (StgClosure *node)
4190 StgBlockingQueueElement *bqe;
4194 debugBelch("## BQ of closure %p (%s): ",
4195 node, info_type(node));
4197 /* should cover all closures that may have a blocking queue */
4198 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4199 get_itbl(node)->type == FETCH_ME_BQ ||
4200 get_itbl(node)->type == RBH ||
4201 get_itbl(node)->type == MVAR);
4203 ASSERT(node!=(StgClosure*)NULL); // sanity check
4205 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4209 /* Print a whole blocking queue starting with the element bqe. */
4212 print_bqe (StgBlockingQueueElement *bqe)
4217 // NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4219 for (end = (bqe==END_BQ_QUEUE);
4220 !end; // iterate until bqe points to a CONSTR
4221 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
4222 bqe = end ? END_BQ_QUEUE : bqe->link) {
4223 ASSERT(bqe != END_BQ_QUEUE); // sanity check
4224 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
4225 /* types of closures that may appear in a blocking queue */
4226 ASSERT(get_itbl(bqe)->type == TSO ||
4227 get_itbl(bqe)->type == BLOCKED_FETCH ||
4228 get_itbl(bqe)->type == CONSTR);
4229 /* only BQs of an RBH end with an RBH_Save closure */
4230 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4232 switch (get_itbl(bqe)->type) {
4234 debugBelch(" TSO %u (%x),",
4235 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4238 debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4239 ((StgBlockedFetch *)bqe)->node,
4240 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4241 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4242 ((StgBlockedFetch *)bqe)->ga.weight);
4245 debugBelch(" %s (IP %p),",
4246 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4247 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4248 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4249 "RBH_Save_?"), get_itbl(bqe));
4252 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4253 info_type((StgClosure *)bqe)); // , node, info_type(node));
4259 # elif defined(GRAN)
4261 print_bq (StgClosure *node)
4263 StgBlockingQueueElement *bqe;
4264 PEs node_loc, tso_loc;
4267 /* should cover all closures that may have a blocking queue */
4268 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4269 get_itbl(node)->type == FETCH_ME_BQ ||
4270 get_itbl(node)->type == RBH);
4272 ASSERT(node!=(StgClosure*)NULL); // sanity check
4273 node_loc = where_is(node);
4275 debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4276 node, info_type(node), node_loc);
4279 // NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4281 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4282 !end; // iterate until bqe points to a CONSTR
4283 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4284 ASSERT(bqe != END_BQ_QUEUE); // sanity check
4285 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
4286 /* types of closures that may appear in a blocking queue */
4287 ASSERT(get_itbl(bqe)->type == TSO ||
4288 get_itbl(bqe)->type == CONSTR);
4289 /* only BQs of an RBH end with an RBH_Save closure */
4290 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4292 tso_loc = where_is((StgClosure *)bqe);
4293 switch (get_itbl(bqe)->type) {
4295 debugBelch(" TSO %d (%p) on [PE %d],",
4296 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4299 debugBelch(" %s (IP %p),",
4300 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4301 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4302 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4303 "RBH_Save_?"), get_itbl(bqe));
4306 barf("Unexpected closure type %s in blocking queue of %p (%s)",
4307 info_type((StgClosure *)bqe), node, info_type(node));
4315 #if defined(PARALLEL_HASKELL)
4322 for (i=0, tso=run_queue_hd;
4323 tso != END_TSO_QUEUE;
4324 i++, tso=tso->link) {
4333 sched_belch(char *s, ...)
4338 debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4339 #elif defined(PARALLEL_HASKELL)
4342 debugBelch("sched: ");